From 970a4d667f918c950a837b286ea80dcecc7d9f7e Mon Sep 17 00:00:00 2001 From: Raphael Date: Thu, 18 Jun 2026 20:47:09 +0200 Subject: [PATCH] fix: correct async impl for nats --- crates/taurus-core/src/runtime/engine.rs | 304 ++++++++++++++++-- .../src/runtime/engine/compiler.rs | 36 ++- .../taurus-core/src/runtime/engine/emitter.rs | 4 +- .../src/runtime/engine/executor.rs | 192 +++++++++-- .../taurus-core/src/runtime/functions/http.rs | 7 +- crates/taurus-core/src/runtime/remote/mod.rs | 2 +- .../providers/remote/nats_remote_runtime.rs | 77 ++++- crates/taurus/src/app/mod.rs | 7 +- crates/taurus/src/app/worker.rs | 27 +- docs/errors.md | 1 + 10 files changed, 580 insertions(+), 77 deletions(-) diff --git a/crates/taurus-core/src/runtime/engine.rs b/crates/taurus-core/src/runtime/engine.rs index dafd116..fef2aa8 100644 --- a/crates/taurus-core/src/runtime/engine.rs +++ b/crates/taurus-core/src/runtime/engine.rs @@ -8,6 +8,7 @@ mod emitter; mod executor; mod model; +use futures_lite::future::block_on; use tucana::shared::{ExecutionFlow, NodeExecutionResult, NodeFunction, Value}; use crate::handler::registry::FunctionStore; @@ -59,13 +60,33 @@ impl ExecutionEngine { respond_emitter: Option<&dyn RespondEmitter>, with_trace: bool, ) -> (Signal, ExitReason) { - let report = self.execute_flow_with_execution_id_report( + let report = block_on(self.execute_flow_with_execution_id_report_async( ExecutionId::new_v4(), flow, remote, respond_emitter, with_trace, - ); + )); + (report.signal, report.exit_reason) + } + + /// Execute an `ExecutionFlow` asynchronously. 
+ pub async fn execute_flow_async( + &self, + flow: ExecutionFlow, + remote: Option<&dyn RemoteRuntime>, + respond_emitter: Option<&dyn RespondEmitter>, + with_trace: bool, + ) -> (Signal, ExitReason) { + let report = self + .execute_flow_with_execution_id_report_async( + ExecutionId::new_v4(), + flow, + remote, + respond_emitter, + with_trace, + ) + .await; (report.signal, report.exit_reason) } @@ -78,13 +99,34 @@ impl ExecutionEngine { respond_emitter: Option<&dyn RespondEmitter>, with_trace: bool, ) -> (Signal, ExitReason) { - let report = self.execute_flow_with_execution_id_report( + let report = block_on(self.execute_flow_with_execution_id_report_async( execution_id, flow, remote, respond_emitter, with_trace, - ); + )); + (report.signal, report.exit_reason) + } + + /// Execute an `ExecutionFlow` asynchronously with a caller-provided execution id. + pub async fn execute_flow_with_execution_id_async( + &self, + execution_id: ExecutionId, + flow: ExecutionFlow, + remote: Option<&dyn RemoteRuntime>, + respond_emitter: Option<&dyn RespondEmitter>, + with_trace: bool, + ) -> (Signal, ExitReason) { + let report = self + .execute_flow_with_execution_id_report_async( + execution_id, + flow, + remote, + respond_emitter, + with_trace, + ) + .await; (report.signal, report.exit_reason) } @@ -96,13 +138,31 @@ impl ExecutionEngine { respond_emitter: Option<&dyn RespondEmitter>, with_trace: bool, ) -> EngineExecutionReport { - self.execute_flow_with_execution_id_report( + block_on(self.execute_flow_with_execution_id_report_async( + ExecutionId::new_v4(), + flow, + remote, + respond_emitter, + with_trace, + )) + } + + /// Execute an `ExecutionFlow` asynchronously and return per-node execution results. 
+ pub async fn execute_flow_report_async( + &self, + flow: ExecutionFlow, + remote: Option<&dyn RemoteRuntime>, + respond_emitter: Option<&dyn RespondEmitter>, + with_trace: bool, + ) -> EngineExecutionReport { + self.execute_flow_with_execution_id_report_async( ExecutionId::new_v4(), flow, remote, respond_emitter, with_trace, ) + .await } /// Execute an `ExecutionFlow` with a caller-provided execution id and return per-node results. @@ -114,7 +174,25 @@ impl ExecutionEngine { respond_emitter: Option<&dyn RespondEmitter>, with_trace: bool, ) -> EngineExecutionReport { - self.execute_graph_with_execution_id_report( + block_on(self.execute_flow_with_execution_id_report_async( + execution_id, + flow, + remote, + respond_emitter, + with_trace, + )) + } + + /// Execute an `ExecutionFlow` asynchronously with a caller-provided execution id and return per-node results. + pub async fn execute_flow_with_execution_id_report_async( + &self, + execution_id: ExecutionId, + flow: ExecutionFlow, + remote: Option<&dyn RemoteRuntime>, + respond_emitter: Option<&dyn RespondEmitter>, + with_trace: bool, + ) -> EngineExecutionReport { + self.execute_graph_with_execution_id_report_async( execution_id, flow.starting_node_id, flow.node_functions, @@ -123,6 +201,7 @@ impl ExecutionEngine { respond_emitter, with_trace, ) + .await } /// Execute a graph described by node list and start node. @@ -135,7 +214,7 @@ impl ExecutionEngine { respond_emitter: Option<&dyn RespondEmitter>, with_trace: bool, ) -> (Signal, ExitReason) { - let report = self.execute_graph_with_execution_id_report( + let report = block_on(self.execute_graph_with_execution_id_report_async( ExecutionId::new_v4(), start_node_id, node_functions, @@ -143,7 +222,31 @@ impl ExecutionEngine { remote, respond_emitter, with_trace, - ); + )); + (report.signal, report.exit_reason) + } + + /// Execute a graph asynchronously. 
+ pub async fn execute_graph_async( + &self, + start_node_id: i64, + node_functions: Vec, + flow_input: Option, + remote: Option<&dyn RemoteRuntime>, + respond_emitter: Option<&dyn RespondEmitter>, + with_trace: bool, + ) -> (Signal, ExitReason) { + let report = self + .execute_graph_with_execution_id_report_async( + ExecutionId::new_v4(), + start_node_id, + node_functions, + flow_input, + remote, + respond_emitter, + with_trace, + ) + .await; (report.signal, report.exit_reason) } @@ -158,7 +261,7 @@ impl ExecutionEngine { respond_emitter: Option<&dyn RespondEmitter>, with_trace: bool, ) -> (Signal, ExitReason) { - let report = self.execute_graph_with_execution_id_report( + let report = block_on(self.execute_graph_with_execution_id_report_async( execution_id, start_node_id, node_functions, @@ -166,7 +269,32 @@ impl ExecutionEngine { remote, respond_emitter, with_trace, - ); + )); + (report.signal, report.exit_reason) + } + + /// Execute a graph asynchronously with a caller-provided execution id. + pub async fn execute_graph_with_execution_id_async( + &self, + execution_id: ExecutionId, + start_node_id: i64, + node_functions: Vec, + flow_input: Option, + remote: Option<&dyn RemoteRuntime>, + respond_emitter: Option<&dyn RespondEmitter>, + with_trace: bool, + ) -> (Signal, ExitReason) { + let report = self + .execute_graph_with_execution_id_report_async( + execution_id, + start_node_id, + node_functions, + flow_input, + remote, + respond_emitter, + with_trace, + ) + .await; (report.signal, report.exit_reason) } @@ -180,7 +308,28 @@ impl ExecutionEngine { respond_emitter: Option<&dyn RespondEmitter>, with_trace: bool, ) -> EngineExecutionReport { - self.execute_graph_with_execution_id_report( + block_on(self.execute_graph_with_execution_id_report_async( + ExecutionId::new_v4(), + start_node_id, + node_functions, + flow_input, + remote, + respond_emitter, + with_trace, + )) + } + + /// Execute a graph asynchronously and return per-node execution results. 
+ pub async fn execute_graph_report_async( + &self, + start_node_id: i64, + node_functions: Vec, + flow_input: Option, + remote: Option<&dyn RemoteRuntime>, + respond_emitter: Option<&dyn RespondEmitter>, + with_trace: bool, + ) -> EngineExecutionReport { + self.execute_graph_with_execution_id_report_async( ExecutionId::new_v4(), start_node_id, node_functions, @@ -189,6 +338,7 @@ impl ExecutionEngine { respond_emitter, with_trace, ) + .await } /// Execute a graph with a caller-provided execution id and return per-node results. @@ -201,6 +351,28 @@ impl ExecutionEngine { remote: Option<&dyn RemoteRuntime>, respond_emitter: Option<&dyn RespondEmitter>, with_trace: bool, + ) -> EngineExecutionReport { + block_on(self.execute_graph_with_execution_id_report_async( + execution_id, + start_node_id, + node_functions, + flow_input, + remote, + respond_emitter, + with_trace, + )) + } + + /// Execute a graph asynchronously with a caller-provided execution id and return per-node results. + pub async fn execute_graph_with_execution_id_report_async( + &self, + execution_id: ExecutionId, + start_node_id: i64, + node_functions: Vec, + flow_input: Option, + remote: Option<&dyn RemoteRuntime>, + respond_emitter: Option<&dyn RespondEmitter>, + with_trace: bool, ) -> EngineExecutionReport { if let Some(emitter) = respond_emitter { emitter.emit(execution_id, EmitType::StartingExec, null_value()); @@ -235,7 +407,8 @@ impl ExecutionEngine { execution_id, respond_emitter, with_trace, - ); + ) + .await; if with_trace && let Some(trace_run) = trace_run { println!( "{}", @@ -271,7 +444,7 @@ mod tests { use crate::runtime::remote::{RemoteExecution, RemoteRuntime}; use crate::types::exit_reason::ExitReason; use async_trait::async_trait; - use std::cell::RefCell; + use std::sync::{Arc, Mutex}; use std::time::Duration; use tucana::shared::{ InputType, ListValue, NodeExecutionResult, NodeParameter, NodeValue, ReferenceValue, @@ -454,15 +627,23 @@ mod tests { #[derive(Clone)] struct 
StubRemoteRuntime { result: NodeExecutionResult, + target_services: Option>>>, } #[async_trait] impl RemoteRuntime for StubRemoteRuntime { async fn execute_remote( &self, - _execution: RemoteExecution, + execution: RemoteExecution, ) -> Result { + if let Some(target_services) = &self.target_services { + target_services + .lock() + .expect("target service recorder should not be poisoned") + .push(execution.target_service); + } + Ok(self.result.clone()) } } @@ -1053,6 +1234,7 @@ mod tests { id: Some(node_execution_result::Id::NodeId(99)), result: None, }, + target_services: None, }; let mut remote_node = node( 1, @@ -1085,6 +1267,67 @@ mod tests { } } + #[test] + fn remote_execution_strips_action_prefix_from_definition_source() { + let engine = ExecutionEngine::new(); + let target_services = Arc::new(Mutex::new(Vec::new())); + let remote = StubRemoteRuntime { + result: NodeExecutionResult { + started_at: 1, + finished_at: 2, + parameter_results: Vec::new(), + id: Some(node_execution_result::Id::NodeId(99)), + result: Some(node_execution_result::Result::Success(string_value("ok"))), + }, + target_services: Some(Arc::clone(&target_services)), + }; + let mut remote_node = node( + 1, + "remote::stripped_service", + vec![literal_param(100, "payload", int_value(20))], + None, + ); + remote_node.definition_source = Some("action.example".to_string()); + + let report = + engine.execute_graph_report(1, vec![remote_node], None, Some(&remote), None, false); + + assert_eq!(report.exit_reason, ExitReason::Success); + assert_eq!( + *target_services + .lock() + .expect("target service recorder should not be poisoned"), + vec!["example".to_string()] + ); + } + + #[test] + fn remote_execution_rejects_empty_action_definition_source() { + let engine = ExecutionEngine::new(); + let mut remote_node = node( + 1, + "remote::empty_service", + vec![literal_param(100, "payload", int_value(20))], + None, + ); + remote_node.definition_source = Some("action.".to_string()); + + let report = 
engine.execute_graph_report(1, vec![remote_node], None, None, None, false); + + assert_eq!(report.exit_reason, ExitReason::Failure); + assert!(report.node_execution_results.is_empty()); + match report.signal { + Signal::Failure(err) => { + assert_eq!(err.code, "T-CORE-000106"); + assert_eq!(err.category, "FlowCompileError"); + } + other => panic!( + "expected invalid definition_source failure, got {:?}", + other + ), + } + } + #[test] fn node_execution_result_tracks_actual_node_duration() { let mut handlers = FunctionStore::new(); @@ -1287,9 +1530,12 @@ mod tests { #[test] fn emitter_emits_start_and_finish_for_successful_execution() { let engine = ExecutionEngine::new(); - let events = RefCell::new(Vec::::new()); + let events = Mutex::new(Vec::::new()); let emitter = |_execution_id, emit_type: EmitType, _value: Value| { - events.borrow_mut().push(emit_type); + events + .lock() + .expect("event recorder should not be poisoned") + .push(emit_type); }; let add_node = node( @@ -1306,7 +1552,9 @@ mod tests { engine.execute_graph(1, vec![add_node], None, None, Some(&emitter), false); assert_eq!(reason, ExitReason::Success); assert_eq!( - *events.borrow(), + *events + .lock() + .expect("event recorder should not be poisoned"), vec![EmitType::StartingExec, EmitType::FinishedExec] ); } @@ -1314,9 +1562,12 @@ mod tests { #[test] fn emitter_emits_ongoing_for_intermediate_respond() { let engine = ExecutionEngine::new(); - let events = RefCell::new(Vec::::new()); + let events = Mutex::new(Vec::::new()); let emitter = |_execution_id, emit_type: EmitType, _value: Value| { - events.borrow_mut().push(emit_type); + events + .lock() + .expect("event recorder should not be poisoned") + .push(emit_type); }; let respond_node = node( @@ -1349,7 +1600,9 @@ mod tests { ); assert_eq!(reason, ExitReason::Success); assert_eq!( - *events.borrow(), + *events + .lock() + .expect("event recorder should not be poisoned"), vec![ EmitType::StartingExec, EmitType::OngoingExec, @@ -1361,9 +1614,12 @@ 
mod tests { #[test] fn emitter_emits_failed_for_runtime_failure() { let engine = ExecutionEngine::new(); - let events = RefCell::new(Vec::::new()); + let events = Mutex::new(Vec::::new()); let emitter = |_execution_id, emit_type: EmitType, _value: Value| { - events.borrow_mut().push(emit_type); + events + .lock() + .expect("event recorder should not be poisoned") + .push(emit_type); }; let invalid_add_node = node(1, "std::number::add", vec![], None); @@ -1372,7 +1628,9 @@ mod tests { engine.execute_graph(1, vec![invalid_add_node], None, None, Some(&emitter), false); assert_eq!(reason, ExitReason::Failure); assert_eq!( - *events.borrow(), + *events + .lock() + .expect("event recorder should not be poisoned"), vec![EmitType::StartingExec, EmitType::FailedExec] ); } diff --git a/crates/taurus-core/src/runtime/engine/compiler.rs b/crates/taurus-core/src/runtime/engine/compiler.rs index 47d7eb4..1f039ce 100644 --- a/crates/taurus-core/src/runtime/engine/compiler.rs +++ b/crates/taurus-core/src/runtime/engine/compiler.rs @@ -35,6 +35,10 @@ pub enum CompileError { node_id: i64, parameter_index: usize, }, + EmptyRemoteService { + node_id: i64, + definition_source: String, + }, } impl CompileError { @@ -88,6 +92,17 @@ impl CompileError { node_id, parameter_index ), ), + CompileError::EmptyRemoteService { + node_id, + definition_source, + } => RuntimeError::new( + "T-CORE-000106", + "FlowCompileError", + format!( + "Node {} definition_source '{}' does not contain a remote service name", + node_id, definition_source + ), + ), } } } @@ -134,7 +149,7 @@ pub fn compile_flow( None => None, }; - let execution_target = execution_target_for(&node); + let execution_target = execution_target_for(node_id, &node)?; let mut parameters = Vec::with_capacity(node.parameters.len()); for (parameter_index, parameter) in node.parameters.iter().enumerate() { @@ -198,12 +213,21 @@ pub fn compile_flow( }) } -fn execution_target_for(node: &NodeFunction) -> NodeExecutionTarget { +fn 
execution_target_for( + node_id: i64, + node: &NodeFunction, +) -> Result { match node.definition_source.as_deref() { - None | Some("") | Some("taurus") => NodeExecutionTarget::Local, - Some(source) if source.starts_with("draco") => NodeExecutionTarget::Local, - Some(service) => NodeExecutionTarget::Remote { - service: service.to_string(), + None | Some("") | Some("taurus") => Ok(NodeExecutionTarget::Local), + Some(source) if source.starts_with("draco") => Ok(NodeExecutionTarget::Local), + Some(service) => match service.strip_prefix("action.").unwrap_or(service) { + "" => Err(CompileError::EmptyRemoteService { + node_id, + definition_source: service.to_string(), + }), + service => Ok(NodeExecutionTarget::Remote { + service: service.to_string(), + }), }, } } diff --git a/crates/taurus-core/src/runtime/engine/emitter.rs b/crates/taurus-core/src/runtime/engine/emitter.rs index e88600f..ab41951 100644 --- a/crates/taurus-core/src/runtime/engine/emitter.rs +++ b/crates/taurus-core/src/runtime/engine/emitter.rs @@ -22,13 +22,13 @@ pub enum EmitType { } /// Callback interface for streaming execution lifecycle events. -pub trait RespondEmitter { +pub trait RespondEmitter: Send + Sync { fn emit(&self, execution_id: ExecutionId, emit_type: EmitType, value: Value); } impl RespondEmitter for F where - F: Fn(ExecutionId, EmitType, Value) + ?Sized, + F: Fn(ExecutionId, EmitType, Value) + Send + Sync + ?Sized, { fn emit(&self, execution_id: ExecutionId, emit_type: EmitType, value: Value) { self(execution_id, emit_type, value); diff --git a/crates/taurus-core/src/runtime/engine/executor.rs b/crates/taurus-core/src/runtime/engine/executor.rs index ce3dbd5..91902b0 100644 --- a/crates/taurus-core/src/runtime/engine/executor.rs +++ b/crates/taurus-core/src/runtime/engine/executor.rs @@ -1,9 +1,8 @@ //! Runtime engine execution loop for compiled flow plans. 
-use std::cell::RefCell; use std::collections::HashMap; +use std::sync::Mutex; -use futures_lite::future::block_on; use tucana::aquila::ActionExecutionRequest; use tucana::shared::node_execution_result::Result as TucanaNodeResult; use tucana::shared::reference_value::Target; @@ -30,7 +29,7 @@ use crate::time::now_unix_micros; use crate::types::errors::runtime_error::RuntimeError; use crate::types::signal::Signal; -pub fn execute_compiled( +pub async fn execute_compiled( flow: &CompiledFlow, handlers: &FunctionStore, value_store: &mut ValueStore, @@ -40,7 +39,7 @@ pub fn execute_compiled( with_trace: bool, ) -> (Signal, Option) { // Keep trace allocation fully optional so the hot path stays lean when tracing is disabled. - let tracer = with_trace.then(RefCell::default); + let tracer = with_trace.then(Mutex::default); let executor = EngineExecutor { flow, handlers, @@ -50,8 +49,10 @@ pub fn execute_compiled( tracer: tracer.as_ref(), }; - let result = executor.execute_from_index(flow.start_idx, value_store); - let trace = tracer.and_then(|collector| collector.into_inner().take_run()); + let result = executor + .execute_from_index(flow.start_idx, value_store) + .await; + let trace = tracer.and_then(|collector| collector.into_inner().ok()?.take_run()); (result.signal, trace) } @@ -82,11 +83,11 @@ struct EngineExecutor<'a> { remote: Option<&'a dyn RemoteRuntime>, execution_id: ExecutionId, respond_emitter: Option<&'a dyn RespondEmitter>, - tracer: Option<&'a RefCell>, + tracer: Option<&'a Mutex>, } impl<'a> EngineExecutor<'a> { - fn execute_from_index( + async fn execute_from_index( &self, start_idx: usize, value_store: &mut ValueStore, @@ -99,7 +100,7 @@ impl<'a> EngineExecutor<'a> { loop { let node_id = self.flow.nodes[current_idx].id; let next_idx = self.flow.nodes[current_idx].next_idx; - let result = self.execute_single_node(current_idx, value_store); + let result = self.execute_single_node(current_idx, value_store).await; if call_root_frame.is_none() { 
call_root_frame = result.frame_id; @@ -156,10 +157,77 @@ impl<'a> EngineExecutor<'a> { } } + fn execute_from_index_sync( + &self, + start_idx: usize, + value_store: &mut ValueStore, + ) -> ExecutionResult { + // Synchronous thunk execution is retained for local handler callbacks. + let mut current_idx = start_idx; + let mut call_root_frame = None; + let mut previous_frame = None; + + loop { + let node_id = self.flow.nodes[current_idx].id; + let next_idx = self.flow.nodes[current_idx].next_idx; + let result = self.execute_single_node_sync(current_idx, value_store); + + if call_root_frame.is_none() { + call_root_frame = result.frame_id; + } + if let (Some(prev), Some(current)) = (previous_frame, result.frame_id) { + self.trace_link_child(prev, current, EdgeKind::Next); + } + if let Some(frame) = result.frame_id { + previous_frame = Some(frame); + } + + match result.signal { + Signal::Success(value) => match next_idx { + Some(next) => current_idx = next, + None => { + return ExecutionResult { + signal: Signal::Success(value), + root_frame: call_root_frame, + }; + } + }, + Signal::Respond(value) => { + if let Some(emitter) = self.respond_emitter { + emitter.emit(self.execution_id, EmitType::OngoingExec, value.clone()); + } + + value_store.insert_success_with_timing( + node_id, + value.clone(), + result.parameter_results, + result.started_at, + result.finished_at, + ); + match next_idx { + Some(next) => current_idx = next, + None => { + return ExecutionResult { + signal: Signal::Success(value), + root_frame: call_root_frame, + }; + } + } + } + other => { + return ExecutionResult { + signal: other, + root_frame: call_root_frame, + }; + } + } + } + } + fn execute_from_node_id(&self, node_id: i64, value_store: &mut ValueStore) -> ExecutionResult { // Used by thunk execution (callbacks, branch blocks, eager parameter nodes). 
match self.flow.node_idx_by_id.get(&node_id).copied() { - Some(idx) => self.execute_from_index(idx, value_store), + Some(idx) => self.execute_from_index_sync(idx, value_store), None => ExecutionResult { signal: Signal::Failure(RuntimeError::new( "T-CORE-000001", @@ -267,7 +335,11 @@ impl<'a> EngineExecutor<'a> { } } - fn execute_single_node(&self, node_idx: usize, value_store: &mut ValueStore) -> NodeResult { + async fn execute_single_node( + &self, + node_idx: usize, + value_store: &mut ValueStore, + ) -> NodeResult { let node = &self.flow.nodes[node_idx]; // InputType references resolve against the currently running node. value_store.set_current_node_id(node.id); @@ -297,7 +369,9 @@ impl<'a> EngineExecutor<'a> { } NodeExecutionTarget::Remote { service } => { let started_at = now_unix_micros(); - let signal = self.execute_remote_node(node, service, value_store, frame_id); + let signal = self + .execute_remote_node(node, service, value_store, frame_id) + .await; let finished_at = now_unix_micros(); NodeResult { signal, @@ -313,6 +387,65 @@ impl<'a> EngineExecutor<'a> { result } + fn execute_single_node_sync( + &self, + node_idx: usize, + value_store: &mut ValueStore, + ) -> NodeResult { + let node = &self.flow.nodes[node_idx]; + value_store.set_current_node_id(node.id); + + let frame_id = self.trace_enter(node, value_store); + let result = match &node.execution_target { + NodeExecutionTarget::Local => { + let started_at = now_unix_micros(); + let executed = self.execute_local_node(node, value_store, frame_id); + let finished_at = now_unix_micros(); + let parameter_results = executed.parameter_results; + let signal = self.commit_result( + node.id, + executed.signal, + parameter_results.clone(), + started_at, + finished_at, + value_store, + ); + NodeResult { + signal, + frame_id, + parameter_results, + started_at, + finished_at, + } + } + NodeExecutionTarget::Remote { .. 
} => { + let started_at = now_unix_micros(); + let signal = self.commit_result( + node.id, + Signal::Failure(RuntimeError::new( + "T-CORE-000004", + "RemoteRuntimeRequiresAsyncExecution", + "Remote runtime nodes cannot be executed from a synchronous thunk callback", + )), + Vec::new(), + started_at, + now_unix_micros(), + value_store, + ); + NodeResult { + signal, + frame_id, + parameter_results: Vec::new(), + started_at, + finished_at: now_unix_micros(), + } + } + }; + self.trace_exit(frame_id, &result.signal, value_store); + + result + } + fn execute_local_node( &self, node: &CompiledNode, @@ -369,7 +502,7 @@ impl<'a> EngineExecutor<'a> { } } - fn execute_remote_node( + async fn execute_remote_node( &self, node: &CompiledNode, service: &str, @@ -438,10 +571,13 @@ impl<'a> EngineExecutor<'a> { } }; - match block_on(remote_runtime.execute_remote(RemoteExecution { - target_service: service.to_string(), - request, - })) { + match remote_runtime + .execute_remote(RemoteExecution { + target_service: service.to_string(), + request, + }) + .await + { Ok(result) => self.commit_remote_result( node.id, result, @@ -859,7 +995,8 @@ impl<'a> EngineExecutor<'a> { ) -> Option { self.tracer.map(|tracer| { tracer - .borrow_mut() + .lock() + .expect("trace collector should not be poisoned") .enter_node(node_id, function_name, value_store.trace_snapshot()) }) } @@ -888,19 +1025,26 @@ impl<'a> EngineExecutor<'a> { Signal::Stop => Outcome::Stop, }; tracer - .borrow_mut() + .lock() + .expect("trace collector should not be poisoned") .exit_node(frame_id, outcome, value_store.trace_snapshot()); } fn trace_record_arg(&self, frame_id: Option, arg: ArgTrace) { if let (Some(frame_id), Some(tracer)) = (frame_id, self.tracer) { - tracer.borrow_mut().record_arg(frame_id, arg); + tracer + .lock() + .expect("trace collector should not be poisoned") + .record_arg(frame_id, arg); } } fn trace_link_child(&self, parent: u64, child: u64, edge: EdgeKind) { if let Some(tracer) = self.tracer { - 
tracer.borrow_mut().link_child(parent, child, edge); + tracer + .lock() + .expect("trace collector should not be poisoned") + .link_child(parent, child, edge); } } @@ -913,7 +1057,8 @@ impl<'a> EngineExecutor<'a> { ) { if let (Some(frame_id), Some(tracer)) = (frame_id, self.tracer) { tracer - .borrow_mut() + .lock() + .expect("trace collector should not be poisoned") .mark_thunk(frame_id, arg_index, eager, executed); } } @@ -921,7 +1066,8 @@ impl<'a> EngineExecutor<'a> { fn trace_mark_thunk_executed(&self, frame_id: Option, thunk: &Thunk) { if let (Some(frame_id), Some(tracer)) = (frame_id, self.tracer) { tracer - .borrow_mut() + .lock() + .expect("trace collector should not be poisoned") .mark_thunk_executed(frame_id, thunk.trace_target().as_str()); } } diff --git a/crates/taurus-core/src/runtime/functions/http.rs b/crates/taurus-core/src/runtime/functions/http.rs index 265567c..b76eb58 100644 --- a/crates/taurus-core/src/runtime/functions/http.rs +++ b/crates/taurus-core/src/runtime/functions/http.rs @@ -297,9 +297,10 @@ fn decode_response_payload(response: http::Response) -> Result(&text) { - return Ok(from_json_value(json)); - } + && let Ok(json) = serde_json::from_str::(&text) + { + return Ok(from_json_value(json)); + } return Ok(text.to_value()); } diff --git a/crates/taurus-core/src/runtime/remote/mod.rs b/crates/taurus-core/src/runtime/remote/mod.rs index f94c3ec..1cbd36d 100644 --- a/crates/taurus-core/src/runtime/remote/mod.rs +++ b/crates/taurus-core/src/runtime/remote/mod.rs @@ -16,7 +16,7 @@ pub struct RemoteExecution { } #[async_trait] -pub trait RemoteRuntime { +pub trait RemoteRuntime: Send + Sync { /// Execute a remote node invocation and return its resulting value. 
async fn execute_remote( &self, diff --git a/crates/taurus-provider/src/providers/remote/nats_remote_runtime.rs b/crates/taurus-provider/src/providers/remote/nats_remote_runtime.rs index 5861964..d8546d3 100644 --- a/crates/taurus-provider/src/providers/remote/nats_remote_runtime.rs +++ b/crates/taurus-provider/src/providers/remote/nats_remote_runtime.rs @@ -1,4 +1,7 @@ +use std::time::Duration; + use async_nats::Client; +use futures_lite::StreamExt; use prost::Message; use taurus_core::runtime::remote::{RemoteExecution, RemoteRuntime}; use taurus_core::types::errors::runtime_error::RuntimeError; @@ -10,6 +13,8 @@ pub struct NATSRemoteRuntime { client: Client, } +const REMOTE_EXECUTION_RESULT_TIMEOUT: Duration = Duration::from_secs(30); + impl NATSRemoteRuntime { pub fn new(client: Client) -> Self { NATSRemoteRuntime { client } @@ -29,12 +34,76 @@ impl RemoteRuntime for NATSRemoteRuntime { let payload = execution.request.encode_to_vec(); log::info!("Request Remote Runtime Execution with topic: : {}", topic); - let res = self.client.request(topic, payload.into()).await; - let message = match res { - Ok(r) => r, + let inbox = self.client.new_inbox(); + let mut sub = match self.client.subscribe(inbox.clone()).await { + Ok(sub) => sub, + Err(err) => { + log::error!( + "RemoteRuntimeException: failed to subscribe to NATS reply inbox: {}", + err + ); + return Err(RuntimeError::new( + "T-PROV-000001", + "RemoteRuntimeException", + "Failed to receive any response messages from a remote runtime.", + )); + } + }; + if let Err(err) = self + .client + .publish_with_reply(topic, inbox, payload.into()) + .await + { + log::error!( + "RemoteRuntimeException: failed to publish NATS request: {}", + err + ); + return Err(RuntimeError::new( + "T-PROV-000001", + "RemoteRuntimeException", + "Failed to receive any response messages from a remote runtime.", + )); + } + match tokio::time::timeout(REMOTE_EXECUTION_RESULT_TIMEOUT, self.client.flush()).await { + Ok(Ok(())) => {} + 
Ok(Err(err)) => { + log::error!( + "RemoteRuntimeException: failed to flush NATS request: {}", + err + ); + return Err(RuntimeError::new( + "T-PROV-000001", + "RemoteRuntimeException", + "Failed to receive any response messages from a remote runtime.", + )); + } + Err(err) => { + log::error!( + "RemoteRuntimeException: failed to flush NATS request before timeout: {}", + err + ); + return Err(RuntimeError::new( + "T-PROV-000001", + "RemoteRuntimeException", + "Failed to receive any response messages from a remote runtime.", + )); + } + } + + let message = match tokio::time::timeout(REMOTE_EXECUTION_RESULT_TIMEOUT, sub.next()).await + { + Ok(Some(message)) => message, + Ok(None) => { + log::error!("RemoteRuntimeException: NATS reply subscription closed"); + return Err(RuntimeError::new( + "T-PROV-000001", + "RemoteRuntimeException", + "Failed to receive any response messages from a remote runtime.", + )); + } Err(err) => { log::error!( - "RemoteRuntimeException: failed to handle NATS message: {}", + "RemoteRuntimeException: failed to receive NATS response before timeout: {}", err ); return Err(RuntimeError::new( diff --git a/crates/taurus/src/app/mod.rs b/crates/taurus/src/app/mod.rs index 768c189..552e3d9 100644 --- a/crates/taurus/src/app/mod.rs +++ b/crates/taurus/src/app/mod.rs @@ -51,9 +51,10 @@ pub async fn run() { if let Some(handle) = runtime_status_heartbeat_task.take() { handle.abort(); if let Err(err) = handle.await - && !err.is_cancelled() { - log::warn!("Runtime status heartbeat task ended unexpectedly: {}", err); - } + && !err.is_cancelled() + { + log::warn!("Runtime status heartbeat task ended unexpectedly: {}", err); + } } update_stopped_status(runtime_status_service.as_ref()).await; diff --git a/crates/taurus/src/app/worker.rs b/crates/taurus/src/app/worker.rs index e6b0c6a..1dc095e 100644 --- a/crates/taurus/src/app/worker.rs +++ b/crates/taurus/src/app/worker.rs @@ -116,7 +116,8 @@ async fn process_execution_message( engine, Some(nats_remote), 
Some(&respond_emitter), - ); + ) + .await; log::debug!( "Flow {} execution completed; no direct reply message published", flow_id @@ -157,7 +158,7 @@ struct FlowRunResult { runtime_usage: RuntimeUsage, } -fn execute_flow( +async fn execute_flow( execution_id: ExecutionId, flow: ExecutionFlow, engine: &ExecutionEngine, @@ -168,13 +169,15 @@ fn execute_flow( let start = Instant::now(); let flow_id = flow.flow_id; let input = flow.input_value.clone(); - let report = engine.execute_flow_with_execution_id_report( - execution_id, - flow, - remote, - respond_emitter, - true, - ); + let report = engine + .execute_flow_with_execution_id_report_async( + execution_id, + flow, + remote, + respond_emitter, + true, + ) + .await; let finished_at = now_unix_micros(); let duration_micros = start.elapsed().as_micros() as i64; @@ -334,14 +337,14 @@ mod tests { } } - #[test] - fn execute_flow_reports_microsecond_timestamps_and_duration() { + #[tokio::test] + async fn execute_flow_reports_microsecond_timestamps_and_duration() { let execution_id = ExecutionId::new_v4(); let fixture = load_fixture("flows/01_return_object.json"); let flow = execution_flow_from_fixture(fixture); let engine = ExecutionEngine::new(); - let run_result = execute_flow(execution_id, flow, &engine, None, None); + let run_result = execute_flow(execution_id, flow, &engine, None, None).await; println!( "started_at={} finished_at={} delta={} runtime_usage.duration={}", diff --git a/docs/errors.md b/docs/errors.md index 0d70622..088ec5f 100644 --- a/docs/errors.md +++ b/docs/errors.md @@ -30,6 +30,7 @@ This document is the canonical catalog for runtime error codes emitted by Taurus | `T-CORE-000103` | Compiler | Flow compilation failed because a `next` edge points to a missing node. | `next_node_id` references unknown node id. | `runtime/engine/compiler.rs` | | `T-CORE-000104` | Compiler | Flow compilation failed because a parameter is structurally incomplete. | Parameter has no value payload in IR. 
| `runtime/engine/compiler.rs` | | `T-CORE-000105` | Compiler | Flow compilation failed because a sub-flow parameter is missing its execution reference. | `sub_flow.execution_reference` is absent. | `runtime/engine/compiler.rs` | +| `T-CORE-000106` | Compiler | Flow compilation failed because a remote definition source does not contain a service name. | `definition_source` is exactly `action.`, so no service name remains after stripping the `action.` prefix. | `runtime/engine/compiler.rs` | | `T-CORE-000107` | Engine | Function sub-flow execution failed because a required setting value is missing. | A non-optional sub-flow setting has no callback input value and no default value. | `runtime/engine/executor.rs` | | `T-CORE-000201` | Handler | Handler argument arity contract was violated before function execution began. | `args!`/`no_args!` macro expected different argument count. | `handler/macros.rs` | | `T-CORE-000202` | Handler | Handler argument type conversion failed during typed extraction. | `TryFromArgument` expected type does not match provided argument. | `handler/argument.rs` |