Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 4 additions & 4 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 2 additions & 2 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,8 @@ edition = "2024"

[workspace.dependencies]
async-trait = "0.1.89"
code0-flow = { version = "0.0.32" }
tucana = { version = "0.0.68" }
code0-flow = { version = "0.0.33" }
tucana = { version = "0.0.70" }
Comment thread
raphael-goetz marked this conversation as resolved.
tokio = { version = "1.44.1", features = ["rt-multi-thread", "signal"] }
log = "0.4.27"
futures-lite = "2.6.0"
Expand Down
8 changes: 7 additions & 1 deletion crates/taurus-core/src/runtime/engine.rs
Original file line number Diff line number Diff line change
Expand Up @@ -181,6 +181,7 @@ mod tests {
value: Some(NodeValue {
value: Some(node_value::Value::LiteralValue(value)),
}),
cast: None,
}
}

Expand All @@ -189,8 +190,11 @@ mod tests {
database_id,
runtime_parameter_id: runtime_parameter_id.to_string(),
value: Some(NodeValue {
value: Some(node_value::Value::NodeFunctionId(node_id)),
value: Some(node_value::Value::SubFlow(unimplemented!(
"Taurus needs to handle SubFlows (issue nr #184)"
))),
Comment on lines 192 to +195
Copy link

Copilot AI Apr 28, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This test helper constructs a NodeParameter by executing unimplemented!(), which will panic immediately when the test builds its graph (before the engine even runs). Please construct a real node_value::Value::SubFlow representing a thunk/sub-flow (or refactor tests to the new parameter encoding) so the tests can execute.

Copilot uses AI. Check for mistakes.
}),
cast: None,
}
}

Expand All @@ -208,6 +212,7 @@ mod tests {
paths: Vec::new(),
})),
}),
cast: None,
}
}

Expand Down Expand Up @@ -276,6 +281,7 @@ mod tests {
paths: Vec::new(),
})),
}),
cast: None,
}
}

Expand Down
2 changes: 1 addition & 1 deletion crates/taurus-core/src/runtime/engine/compiler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,7 @@ pub fn compile_flow(
let arg = match value {
node_value::Value::LiteralValue(v) => CompiledArg::Literal(v.clone()),
node_value::Value::ReferenceValue(r) => CompiledArg::Reference(r.clone()),
node_value::Value::NodeFunctionId(id) => CompiledArg::DeferredNode(*id),
node_value::Value::SubFlow(_sub_flow) => unimplemented!("Taurus needs to handle SubFlows (issue nr #184)"),
};
Comment thread
raphael-goetz marked this conversation as resolved.

parameters.push(CompiledParameter {
Expand Down
6 changes: 3 additions & 3 deletions crates/taurus-core/src/runtime/engine/executor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ use std::cell::RefCell;
use std::collections::HashMap;

use futures_lite::future::block_on;
use tucana::aquila::ExecutionRequest;
use tucana::aquila::ActionExecutionRequest;
use tucana::shared::reference_value::Target;
use tucana::shared::value::Kind;
use tucana::shared::{Struct, Value};
Expand Down Expand Up @@ -485,7 +485,7 @@ impl<'a> EngineExecutor<'a> {
&self,
node: &CompiledNode,
values: Vec<Value>,
) -> Result<ExecutionRequest, RuntimeError> {
) -> Result<ActionExecutionRequest, RuntimeError> {
if node.parameters.len() != values.len() {
return Err(RuntimeError::new(
"T-CORE-000005",
Expand All @@ -499,7 +499,7 @@ impl<'a> EngineExecutor<'a> {
fields.insert(parameter.runtime_parameter_id.clone(), value);
}

Ok(ExecutionRequest {
Ok(ActionExecutionRequest {
execution_identifier: Uuid::new_v4().to_string(),
function_identifier: node.handler_id.clone(),
parameters: Some(Struct { fields }),
Expand Down
5 changes: 3 additions & 2 deletions crates/taurus-core/src/runtime/remote/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,15 +4,16 @@
//! trait without coupling the core engine to a specific transport.
use async_trait::async_trait;
use tucana::{aquila::ExecutionRequest, shared::Value};
use tucana::{aquila::ActionExecutionRequest, shared::Value};

use crate::types::errors::runtime_error::RuntimeError;

pub struct RemoteExecution {
/// Remote service identifier to route the call.
pub target_service: String,
/// Execution request payload expected by the remote runtime.
pub request: ExecutionRequest,
pub request: ActionExecutionRequest,

}

#[async_trait]
Expand Down
26 changes: 4 additions & 22 deletions crates/taurus-provider/src/providers/remote/nats_remote_runtime.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ use prost::Message;
use taurus_core::runtime::remote::{RemoteExecution, RemoteRuntime};
use taurus_core::types::errors::runtime_error::RuntimeError;
use tonic::async_trait;
use tucana::aquila::ExecutionResult;
use tucana::aquila::ActionExecutionResponse;
use tucana::shared::Value;

pub struct NATSRemoteRuntime {
Expand Down Expand Up @@ -42,8 +42,8 @@ impl RemoteRuntime for NATSRemoteRuntime {
}
};

let decode_result = ExecutionResult::decode(message.payload);
let execution_result = match decode_result {
let decode_result = ActionExecutionResponse::decode(message.payload);
let _execution_result = match decode_result {
Ok(r) => r,
Comment on lines +45 to 47
Copy link

Copilot AI Apr 28, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

execute_remote decodes an ActionExecutionResponse but then discards it and never produces a Result<Value, RuntimeError> based on the response. This currently makes all remote executions unusable; please map the decoded response to Ok(Value) / Err(RuntimeError) (including handling missing/failed results) instead of ignoring it.

Copilot uses AI. Check for mistakes.
Err(err) => {
log::error!(
Expand All @@ -58,24 +58,6 @@ impl RemoteRuntime for NATSRemoteRuntime {
}
};

match execution_result.result {
Some(result) => match result {
tucana::aquila::execution_result::Result::Success(value) => Ok(value),
tucana::aquila::execution_result::Result::Error(err) => {
let code = err.code.to_string();
let description = match err.description {
Some(string) => string,
None => "Unknown Error".to_string(),
};
let error = RuntimeError::new(code, "RemoteExecutionError", description);
Err(error)
}
},
None => Err(RuntimeError::new(
"T-PROV-000003",
"RemoteRuntimeExeption",
"Result of Remote Response was empty.",
)),
}
unimplemented!("Taurus needs to handle text executions (issue nr #185)")
Copy link

Copilot AI Apr 28, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Leaving unimplemented!() in this runtime path will panic any time a remote node executes. Please replace this with actual response handling (or, at minimum, return a structured RuntimeError with an appropriate stable code/category) so production flows don't crash.

Suggested change
unimplemented!("Taurus needs to handle text executions (issue nr #185)")
Err(RuntimeError::new(
"T-PROV-000003",
"RemoteRuntimeUnsupportedResponse",
"Remote runtime response handling is not implemented for this execution type.",
))

Copilot uses AI. Check for mistakes.
}
}
18 changes: 1 addition & 17 deletions crates/taurus/src/app/mod.rs
Original file line number Diff line number Diff line change
@@ -1,18 +1,16 @@
mod worker;

use std::time::Duration;

use code0_flow::flow_config::load_env_file;
use code0_flow::flow_config::mode::Mode::DYNAMIC;
use code0_flow::flow_service::FlowUpdateService;
use std::time::Duration;
use taurus_core::runtime::engine::ExecutionEngine;
use taurus_provider::providers::emitter::nats_emitter::NATSRespondEmitter;
use taurus_provider::providers::remote::nats_remote_runtime::NATSRemoteRuntime;
use tokio::signal;
use tokio::task::JoinHandle;
use tokio::time::sleep;
use tonic_health::pb::health_server::HealthServer;
use tucana::shared::{RuntimeFeature, Translation};

use crate::client::runtime_status::TaurusRuntimeStatusService;
use crate::client::runtime_usage::TaurusRuntimeUsageService;
Expand Down Expand Up @@ -114,7 +112,6 @@ async fn setup_dynamic_services_if_needed(
config.aquila_url.clone(),
config.aquila_token.clone(),
"taurus".into(),
runtime_features(),
)
.await,
);
Expand Down Expand Up @@ -152,19 +149,6 @@ async fn push_definitions_until_success(config: &Config) {
}
}

fn runtime_features() -> Vec<RuntimeFeature> {
vec![RuntimeFeature {
name: vec![Translation {
code: "en-US".to_string(),
content: "Runtime".to_string(),
}],
description: vec![Translation {
code: "en-US".to_string(),
content: "Will execute incoming flows.".to_string(),
}],
}]
}

async fn update_stopped_status(runtime_status_service: Option<&TaurusRuntimeStatusService>) {
if let Some(status_service) = runtime_status_service {
status_service
Expand Down
21 changes: 4 additions & 17 deletions crates/taurus/src/client/runtime_status.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,37 +9,25 @@ use tucana::{
RuntimeStatusUpdateRequest, runtime_status_service_client::RuntimeStatusServiceClient,
runtime_status_update_request::Status,
},
shared::{ExecutionRuntimeStatus, RuntimeFeature},
shared::ExecutionRuntimeStatus,
};

pub struct TaurusRuntimeStatusService {
channel: Channel,
identifier: String,
features: Vec<RuntimeFeature>,
aquila_token: String,
}

impl TaurusRuntimeStatusService {
pub async fn from_url(
aquila_url: String,
aquila_token: String,
identifier: String,
features: Vec<RuntimeFeature>,
) -> Self {
pub async fn from_url(aquila_url: String, aquila_token: String, identifier: String) -> Self {
let channel = create_channel_with_retry("Aquila", aquila_url).await;
Self::new(channel, aquila_token, identifier, features)
Self::new(channel, aquila_token, identifier)
}

pub fn new(
channel: Channel,
aquila_token: String,
identifier: String,
features: Vec<RuntimeFeature>,
) -> Self {
pub fn new(channel: Channel, aquila_token: String, identifier: String) -> Self {
TaurusRuntimeStatusService {
channel,
identifier,
features,
aquila_token,
}
}
Expand Down Expand Up @@ -68,7 +56,6 @@ impl TaurusRuntimeStatusService {
status: status.into(),
timestamp: timestamp as i64,
identifier: self.identifier.clone(),
features: self.features.clone(),
})),
},
);
Expand Down
Loading