diff --git a/crates/sdk-core-c-bridge/Cargo.toml b/crates/sdk-core-c-bridge/Cargo.toml index 30df33eed..ea4e9d4d9 100644 --- a/crates/sdk-core-c-bridge/Cargo.toml +++ b/crates/sdk-core-c-bridge/Cargo.toml @@ -55,4 +55,5 @@ thiserror = { workspace = true } cbindgen = { version = "0.29", default-features = false } [features] +antithesis_assertions = ["temporalio-sdk-core/antithesis_assertions"] xz2-static = ["xz2/static"] diff --git a/crates/sdk-core/Cargo.toml b/crates/sdk-core/Cargo.toml index c57a32ef2..c4d6d964f 100644 --- a/crates/sdk-core/Cargo.toml +++ b/crates/sdk-core/Cargo.toml @@ -27,9 +27,11 @@ tokio-console = ["console-subscriber"] ephemeral-server = ["dep:flate2", "dep:reqwest", "dep:tar", "dep:zip"] debug-plugin = ["dep:reqwest"] test-utilities = ["dep:assert_matches", "dep:bimap"] +antithesis_assertions = ["dep:antithesis_sdk"] [dependencies] anyhow = "1.0" +antithesis_sdk = { version = "0.2.1", optional = true, default-features = false, features = ["full"] } assert_matches = { version = "1.5", optional = true } bimap = { version = "0.6.3", optional = true } async-trait = "0.1" diff --git a/crates/sdk-core/src/abstractions.rs b/crates/sdk-core/src/abstractions.rs index f246ab40a..d6f6956af 100644 --- a/crates/sdk-core/src/abstractions.rs +++ b/crates/sdk-core/src/abstractions.rs @@ -423,10 +423,23 @@ impl OwnedMeteredSemPermit { pub(crate) struct UsedMeteredSemPermit(#[allow(dead_code)] OwnedMeteredSemPermit); macro_rules! dbg_panic { - ($($arg:tt)*) => { - error!($($arg)*); - debug_assert!(false, $($arg)*); - }; + ($($arg:tt)*) => {{ + let message = format!($($arg)*); + error!("{}", message); + + #[cfg(feature = "antithesis_assertions")] + crate::antithesis::assert_always!( + false, + "dbg_panic invariant triggered", + ::serde_json::json!({ + "message": message, + "file": file!(), + "line": line!(), + "module": module_path!(), + }) + ); + debug_assert!(false, "{}", message); + }}; } pub(crate) use dbg_panic; diff --git a/crates/sdk-core/src/antithesis.rs b/crates/sdk-core/src/antithesis.rs new file mode 100644 index 000000000..750dd10b0 --- /dev/null +++ b/crates/sdk-core/src/antithesis.rs @@ -0,0 +1,60 @@ +//! Antithesis SDK integration for invariant testing. +//! +//! This module provides assertion macros that integrate with the Antithesis +//! testing platform to detect invariant violations during fuzz testing. + +use std::sync::OnceLock; + +/// Ensure Antithesis is initialized exactly once. +pub(crate) fn ensure_init() { + static INIT: OnceLock<()> = OnceLock::new(); + INIT.get_or_init(|| { + ::antithesis_sdk::antithesis_init(); + }); +} + +/// Assert that a condition is always true during Antithesis fuzz testing. +/// Use `false` as the condition to log an invariant violation. +macro_rules! assert_always { + ($condition:expr, $message:literal, $details:expr) => {{ + $crate::antithesis::ensure_init(); + let details: ::serde_json::Value = $details; + ::antithesis_sdk::assert_always!($condition, $message, &details); + }}; + ($condition:expr, $message:literal) => {{ + $crate::antithesis::ensure_init(); + ::antithesis_sdk::assert_always!($condition, $message); + }}; +} + +/// Assert that a condition is sometimes true during Antithesis fuzz testing. +/// This checks that the condition occurs at least once across the entire test session. +macro_rules! assert_sometimes { + ($condition:expr, $message:literal, $details:expr) => {{ + $crate::antithesis::ensure_init(); + let details: ::serde_json::Value = $details; + ::antithesis_sdk::assert_sometimes!($condition, $message, &details); + }}; + ($condition:expr, $message:literal) => {{ + $crate::antithesis::ensure_init(); + ::antithesis_sdk::assert_sometimes!($condition, $message); + }}; +} + +/// Assert that a code location is unreachable during Antithesis fuzz testing. +/// Use this for code paths that should never be reached (bugs, invariant violations). +macro_rules! assert_unreachable { + ($message:literal, $details:expr) => {{ + $crate::antithesis::ensure_init(); + let details: ::serde_json::Value = $details; + ::antithesis_sdk::assert_unreachable!($message, &details); + }}; + ($message:literal) => {{ + $crate::antithesis::ensure_init(); + ::antithesis_sdk::assert_unreachable!($message); + }}; +} + +pub(crate) use assert_always; +pub(crate) use assert_sometimes; +pub(crate) use assert_unreachable; diff --git a/crates/sdk-core/src/lib.rs b/crates/sdk-core/src/lib.rs index 1832f4a9c..663af8fd8 100644 --- a/crates/sdk-core/src/lib.rs +++ b/crates/sdk-core/src/lib.rs @@ -12,6 +12,8 @@ extern crate tracing; extern crate core; mod abstractions; +#[cfg(feature = "antithesis_assertions")] +mod antithesis; #[cfg(feature = "debug-plugin")] pub mod debug_client; #[cfg(feature = "ephemeral-server")] diff --git a/crates/sdk-core/src/retry_logic.rs b/crates/sdk-core/src/retry_logic.rs index b4eacbb4d..723ccd7c4 100644 --- a/crates/sdk-core/src/retry_logic.rs +++ b/crates/sdk-core/src/retry_logic.rs @@ -71,6 +71,15 @@ impl ValidatedRetryPolicy { application_failure: Option<&ApplicationFailureInfo>, ) -> Option { if self.maximum_attempts > 0 && attempt_number.get() >= self.maximum_attempts { + #[cfg(feature = "antithesis_assertions")] + crate::antithesis::assert_sometimes!( + true, + "Retry maximum_attempts limit reached", + ::serde_json::json!({ + "attempt": attempt_number.get(), + "maximum_attempts": self.maximum_attempts + }) + ); return None; } @@ -78,6 +87,15 @@ impl ValidatedRetryPolicy { .map(|f| f.non_retryable) .unwrap_or_default(); if non_retryable { + #[cfg(feature = "antithesis_assertions")] + crate::antithesis::assert_sometimes!( + true, + "Non-retryable application failure encountered", + ::serde_json::json!({ + "attempt": attempt_number.get(), + "error_type": application_failure.map(|f| &f.r#type) + }) + ); return None; } diff --git a/crates/sdk-core/src/worker/workflow/machines/activity_state_machine.rs b/crates/sdk-core/src/worker/workflow/machines/activity_state_machine.rs index 03ae9627f..33ea7a963 100644 --- a/crates/sdk-core/src/worker/workflow/machines/activity_state_machine.rs +++ b/crates/sdk-core/src/worker/workflow/machines/activity_state_machine.rs @@ -7,7 +7,7 @@ use super::{ use crate::{ abstractions::dbg_panic, internal_flags::CoreInternalFlags, - worker::workflow::{InternalFlagsRef, machines::HistEventData}, + worker::workflow::{InternalFlagsRef, fatal, machines::HistEventData, nondeterminism}, }; use std::convert::{TryFrom, TryInto}; use temporalio_common::protos::{ @@ -173,6 +173,16 @@ impl ActivityMachine { | ActivityMachineState::TimedOut(_) ) { // Ignore attempted cancels in terminal states + #[cfg(feature = "antithesis_assertions")] + crate::antithesis::assert_always!( + true, + "Activity cancel ignored in terminal state", + ::serde_json::json!({ + "seq": self.shared_state.attrs.seq, + "state": format!("{}", self.state()) + }) + ); + debug!( "Attempted to cancel already resolved activity (seq {})", self.shared_state.attrs.seq @@ -193,7 +203,10 @@ impl ActivityMachine { ActivityMachineCommand::Cancel(details) => { vec![self.create_cancelation_resolve(details).into()] } - x => panic!("Invalid cancel event response {x:?}"), + x => { + dbg_panic!("Invalid cancel event response {x:?}"); + panic!("Invalid cancel event response {x:?}"); + } }) .collect(); Ok(res) @@ -223,9 +236,7 @@ impl TryFrom for ActivityMachineEvents { last_task_in_history, }) } else { - return Err(WFMachinesError::Fatal(format!( - "Activity scheduled attributes were unset: {e}" - ))); + return Err(fatal!("Activity scheduled attributes were unset: {e}")); } } EventType::ActivityTaskStarted => Self::ActivityTaskStarted(e.event_id), @@ -236,9 +247,7 @@ impl TryFrom for ActivityMachineEvents { { Self::ActivityTaskCompleted(attrs) } else { - return Err(WFMachinesError::Fatal(format!( - "Activity completion attributes were unset: {e}" - ))); + return Err(fatal!("Activity completion attributes were unset: {e}")); } } EventType::ActivityTaskFailed => { @@ -247,9 +256,7 @@ impl TryFrom for ActivityMachineEvents { { Self::ActivityTaskFailed(attrs) } else { - return Err(WFMachinesError::Fatal(format!( - "Activity failure attributes were unset: {e}" - ))); + return Err(fatal!("Activity failure attributes were unset: {e}")); } } EventType::ActivityTaskTimedOut => { @@ -258,9 +265,7 @@ impl TryFrom for ActivityMachineEvents { { Self::ActivityTaskTimedOut(attrs) } else { - return Err(WFMachinesError::Fatal(format!( - "Activity timeout attributes were unset: {e}" - ))); + return Err(fatal!("Activity timeout attributes were unset: {e}")); } } EventType::ActivityTaskCancelRequested => Self::ActivityTaskCancelRequested, @@ -270,15 +275,13 @@ impl TryFrom for ActivityMachineEvents { { Self::ActivityTaskCanceled(attrs) } else { - return Err(WFMachinesError::Fatal(format!( - "Activity cancellation attributes were unset: {e}" - ))); + return Err(fatal!("Activity cancellation attributes were unset: {e}")); } } _ => { - return Err(WFMachinesError::Nondeterminism(format!( + return Err(nondeterminism!( "Activity machine does not handle this event: {e}" - ))); + )); } }) } @@ -375,18 +378,20 @@ impl ScheduleCommandCreated { sched_dat.last_task_in_history, ) { if sched_dat.act_id != dat.attrs.activity_id { - return TransitionResult::Err(WFMachinesError::Nondeterminism(format!( + return TransitionResult::Err(nondeterminism!( "Activity id of scheduled event '{}' does not \ match activity id of activity command '{}'", - sched_dat.act_id, dat.attrs.activity_id - ))); + sched_dat.act_id, + dat.attrs.activity_id + )); } if sched_dat.act_type != dat.attrs.activity_type { - return TransitionResult::Err(WFMachinesError::Nondeterminism(format!( + return TransitionResult::Err(nondeterminism!( "Activity type of scheduled event '{}' does not \ match activity type of activity command '{}'", - sched_dat.act_type, dat.attrs.activity_type - ))); + sched_dat.act_type, + dat.attrs.activity_type + )); } } dat.scheduled_event_id = sched_dat.event_id; @@ -427,6 +432,15 @@ impl ScheduledEventRecorded { dat: &mut SharedState, attrs: ActivityTaskTimedOutEventAttributes, ) -> ActivityMachineTransition { + #[cfg(feature = "antithesis_assertions")] + crate::antithesis::assert_sometimes!( + true, + "Activity timed out before starting", + ::serde_json::json!({ + "activity_id": dat.attrs.activity_id, + "state": "ScheduledEventRecorded" + }) + ); notify_lang_activity_timed_out(dat, attrs) } @@ -434,12 +448,27 @@ impl ScheduledEventRecorded { self, dat: &mut SharedState, ) -> ActivityMachineTransition { + #[cfg(feature = "antithesis_assertions")] + crate::antithesis::assert_sometimes!( + true, + "Activity cancelled in scheduled state", + ::serde_json::json!({ + "activity_id": dat.attrs.activity_id, + "cancellation_type": format!("{:?}", dat.cancellation_type) + }) + ); create_request_cancel_activity_task_command( dat, ScheduledActivityCancelCommandCreated::default(), ) } pub(super) fn on_abandoned(self) -> ActivityMachineTransition { + #[cfg(feature = "antithesis_assertions")] + crate::antithesis::assert_sometimes!( + true, + "Activity abandoned", + ::serde_json::json!({"mode": "Abandon"}) + ); notify_lang_activity_cancelled(None) } } @@ -452,6 +481,14 @@ impl Started { self, attrs: ActivityTaskCompletedEventAttributes, ) -> ActivityMachineTransition { + #[cfg(feature = "antithesis_assertions")] + crate::antithesis::assert_sometimes!( + true, + "Activity completed successfully", + ::serde_json::json!({ + "has_result": attrs.result.is_some() + }) + ); ActivityMachineTransition::ok( vec![ActivityMachineCommand::Complete(attrs.result)], Completed::default(), @@ -462,6 +499,16 @@ impl Started { dat: &mut SharedState, attrs: ActivityTaskFailedEventAttributes, ) -> ActivityMachineTransition { + #[cfg(feature = "antithesis_assertions")] + crate::antithesis::assert_sometimes!( + true, + "Activity task failed", + ::serde_json::json!({ + "activity_id": dat.attrs.activity_id, + "activity_type": dat.attrs.activity_type, + "retry_state": attrs.retry_state + }) + ); ActivityMachineTransition::ok( vec![ActivityMachineCommand::Fail(new_failure(dat, attrs))], Failed::default(), @@ -473,6 +520,15 @@ impl Started { dat: &mut SharedState, attrs: ActivityTaskTimedOutEventAttributes, ) -> ActivityMachineTransition { + #[cfg(feature = "antithesis_assertions")] + crate::antithesis::assert_sometimes!( + true, + "Activity timed out after starting", + ::serde_json::json!({ + "activity_id": dat.attrs.activity_id, + "state": "Started" + }) + ); notify_lang_activity_timed_out(dat, attrs) } @@ -480,12 +536,27 @@ impl Started { self, dat: &mut SharedState, ) -> ActivityMachineTransition { + #[cfg(feature = "antithesis_assertions")] + crate::antithesis::assert_sometimes!( + true, + "Activity cancelled after started", + ::serde_json::json!({ + "activity_id": dat.attrs.activity_id, + "cancellation_type": format!("{:?}", dat.cancellation_type) + }) + ); create_request_cancel_activity_task_command( dat, StartedActivityCancelCommandCreated::default(), ) } pub(super) fn on_abandoned(self) -> ActivityMachineTransition { + #[cfg(feature = "antithesis_assertions")] + crate::antithesis::assert_sometimes!( + true, + "Activity abandoned in started state", + ::serde_json::json!({"state": "Started"}) + ); notify_lang_activity_cancelled(None) } } @@ -502,6 +573,14 @@ impl ScheduledActivityCancelEventRecorded { dat: &mut SharedState, attrs: ActivityTaskCanceledEventAttributes, ) -> ActivityMachineTransition { + #[cfg(feature = "antithesis_assertions")] + crate::antithesis::assert_sometimes!( + true, + "Activity cancellation completed", + ::serde_json::json!({ + "has_details": attrs.details.is_some() + }) + ); notify_if_not_already_cancelled(dat, |_| notify_lang_activity_cancelled(Some(attrs))) } @@ -532,6 +611,14 @@ impl StartedActivityCancelEventRecorded { dat: &mut SharedState, attrs: ActivityTaskCompletedEventAttributes, ) -> ActivityMachineTransition { + #[cfg(feature = "antithesis_assertions")] + crate::antithesis::assert_sometimes!( + true, + "Activity completed despite cancel request", + ::serde_json::json!({ + "cancellation_type": format!("{:?}", dat.cancellation_type) + }) + ); notify_if_not_already_cancelled(dat, |_| { TransitionResult::commands(vec![ActivityMachineCommand::Complete(attrs.result)]) }) @@ -541,6 +628,14 @@ impl StartedActivityCancelEventRecorded { dat: &mut SharedState, attrs: ActivityTaskFailedEventAttributes, ) -> ActivityMachineTransition { + #[cfg(feature = "antithesis_assertions")] + crate::antithesis::assert_sometimes!( + true, + "Activity failed despite cancel request", + ::serde_json::json!({ + "cancellation_type": format!("{:?}", dat.cancellation_type) + }) + ); notify_if_not_already_cancelled(dat, |dat| { TransitionResult::commands(vec![ActivityMachineCommand::Fail(new_failure(dat, attrs))]) }) @@ -647,12 +742,21 @@ impl Canceled { ) -> ActivityMachineTransition { // Abandoned activities might start anyway. Ignore the result. if dat.cancellation_type == ActivityCancellationType::Abandon { + #[cfg(feature = "antithesis_assertions")] + crate::antithesis::assert_always!( + true, + "Abandoned activity can start after cancellation", + ::serde_json::json!({ + "seq_num": seq_num, + "cancellation_type": "Abandon" + }) + ); TransitionResult::default() } else { - TransitionResult::Err(WFMachinesError::Nondeterminism(format!( + TransitionResult::Err(nondeterminism!( "Non-Abandon cancel mode activities cannot be started after being cancelled. \ Seq: {seq_num:?}" - ))) + )) } } pub(super) fn on_activity_task_completed( @@ -662,11 +766,19 @@ impl Canceled { ) -> ActivityMachineTransition { // Abandoned activities might complete anyway. Ignore the result. if dat.cancellation_type == ActivityCancellationType::Abandon { + #[cfg(feature = "antithesis_assertions")] + crate::antithesis::assert_always!( + true, + "Abandoned activity can complete after cancellation", + ::serde_json::json!({ + "activity_id": dat.attrs.activity_id + }) + ); TransitionResult::default() } else { - TransitionResult::Err(WFMachinesError::Nondeterminism(format!( + TransitionResult::Err(nondeterminism!( "Non-Abandon cancel mode activities cannot be completed after being cancelled: {attrs:?}" - ))) + )) } } } @@ -792,9 +904,7 @@ fn convert_payloads( result: Option, ) -> Result, WFMachinesError> { result.map(TryInto::try_into).transpose().map_err(|pe| { - WFMachinesError::Fatal(format!( - "Not exactly one payload in activity result ({pe}) for event: {event_info:?}" - )) + fatal!("Not exactly one payload in activity result ({pe}) for event: {event_info:?}") }) } diff --git a/crates/sdk-core/src/worker/workflow/machines/cancel_external_state_machine.rs b/crates/sdk-core/src/worker/workflow/machines/cancel_external_state_machine.rs index 4e308e4a4..2250da8af 100644 --- a/crates/sdk-core/src/worker/workflow/machines/cancel_external_state_machine.rs +++ b/crates/sdk-core/src/worker/workflow/machines/cancel_external_state_machine.rs @@ -2,7 +2,7 @@ use super::{ EventInfo, NewMachineWithCommand, OnEventWrapper, StateMachine, TransitionResult, WFMachinesAdapter, WFMachinesError, fsm, workflow_machines::MachineResponse, }; -use crate::worker::workflow::machines::HistEventData; +use crate::worker::workflow::{fatal, machines::HistEventData, nondeterminism}; use std::convert::TryFrom; use temporalio_common::protos::{ coresdk::{ @@ -153,15 +153,15 @@ impl TryFrom for CancelExternalMachineEvents { if let Some(history_event::Attributes::RequestCancelExternalWorkflowExecutionFailedEventAttributes(attrs)) = e.attributes { Self::RequestCancelExternalWorkflowExecutionFailed(attrs.cause()) } else { - return Err(WFMachinesError::Fatal(format!( + return Err(fatal!( "Cancelworkflow failed attributes were unset: {e}" - ))); + )); } } _ => { - return Err(WFMachinesError::Nondeterminism(format!( + return Err(nondeterminism!( "Cancel external WF machine does not handle this event: {e}" - ))) + )) } }) } diff --git a/crates/sdk-core/src/worker/workflow/machines/cancel_nexus_op_state_machine.rs b/crates/sdk-core/src/worker/workflow/machines/cancel_nexus_op_state_machine.rs index 898965b78..502030bbe 100644 --- a/crates/sdk-core/src/worker/workflow/machines/cancel_nexus_op_state_machine.rs +++ b/crates/sdk-core/src/worker/workflow/machines/cancel_nexus_op_state_machine.rs @@ -3,7 +3,7 @@ use super::{ workflow_machines::MachineResponse, }; use super::{StateMachine, TransitionResult, fsm}; -use crate::worker::workflow::machines::HistEventData; +use crate::worker::workflow::{machines::HistEventData, nondeterminism, fatal}; use std::convert::TryFrom; use temporalio_common::protos::{ coresdk::workflow_activation::ResolveCancelNexusOperation, @@ -76,9 +76,9 @@ impl TryFrom for CancelNexusOpMachineEvents { Ok(match e.event_type() { EventType::NexusOperationCancelRequested => Self::NexusOpCancelRequested, _ => { - return Err(WFMachinesError::Nondeterminism(format!( + return Err(nondeterminism!( "Cancel external WF machine does not handle this event: {e}" - ))); + )); } }) } diff --git a/crates/sdk-core/src/worker/workflow/machines/cancel_workflow_state_machine.rs b/crates/sdk-core/src/worker/workflow/machines/cancel_workflow_state_machine.rs index 5de44b996..30331c4e9 100644 --- a/crates/sdk-core/src/worker/workflow/machines/cancel_workflow_state_machine.rs +++ b/crates/sdk-core/src/worker/workflow/machines/cancel_workflow_state_machine.rs @@ -2,7 +2,7 @@ use super::{ EventInfo, NewMachineWithCommand, OnEventWrapper, StateMachine, TransitionResult, WFMachinesAdapter, WFMachinesError, fsm, workflow_machines::MachineResponse, }; -use crate::worker::workflow::machines::HistEventData; +use crate::worker::workflow::{machines::HistEventData, nondeterminism}; use std::convert::TryFrom; use temporalio_common::protos::{ coresdk::workflow_commands::CancelWorkflowExecution, @@ -67,9 +67,9 @@ impl TryFrom for CancelWorkflowMachineEvents { Ok(match EventType::try_from(e.event_type) { Ok(EventType::WorkflowExecutionCanceled) => Self::WorkflowExecutionCanceled, _ => { - return Err(WFMachinesError::Nondeterminism(format!( + return Err(nondeterminism!( "Cancel workflow machine does not handle this event: {e}" - ))); + )); } }) } diff --git a/crates/sdk-core/src/worker/workflow/machines/child_workflow_state_machine.rs b/crates/sdk-core/src/worker/workflow/machines/child_workflow_state_machine.rs index d87325141..e70a7ba76 100644 --- a/crates/sdk-core/src/worker/workflow/machines/child_workflow_state_machine.rs +++ b/crates/sdk-core/src/worker/workflow/machines/child_workflow_state_machine.rs @@ -3,8 +3,9 @@ use super::{ WFMachinesAdapter, WFMachinesError, fsm, workflow_machines::MachineResponse, }; use crate::{ + abstractions::dbg_panic, internal_flags::CoreInternalFlags, - worker::workflow::{InternalFlagsRef, machines::HistEventData}, + worker::workflow::{InternalFlagsRef, fatal, machines::HistEventData, nondeterminism}, }; use itertools::Itertools; use std::{ @@ -133,10 +134,9 @@ pub(super) struct Cancelled { } fn completion_of_not_abandoned_err() -> WFMachinesError { - WFMachinesError::Nondeterminism( + nondeterminism!( "Child workflows which don't have the ABANDON cancellation type cannot complete after \ being cancelled." - .to_string(), ) } @@ -145,10 +145,9 @@ impl Cancelled { self, ) -> ChildWorkflowMachineTransition { if self.seen_cancelled_event { - ChildWorkflowMachineTransition::Err(WFMachinesError::Fatal( + ChildWorkflowMachineTransition::Err(fatal!( "Child workflow has already seen a ChildWorkflowExecutionCanceledEvent, and now \ another is being applied! This is a bug, please report." - .to_string(), )) } else { ChildWorkflowMachineTransition::ok( @@ -233,18 +232,20 @@ impl StartCommandCreated { event_dat.last_task_in_history, ) { if event_dat.wf_id != state.workflow_id { - return TransitionResult::Err(WFMachinesError::Nondeterminism(format!( + return TransitionResult::Err(nondeterminism!( "Child workflow id of scheduled event '{}' does not \ match child workflow id of command '{}'", - event_dat.wf_id, state.workflow_id - ))); + event_dat.wf_id, + state.workflow_id + )); } if event_dat.wf_type != state.workflow_type { - return TransitionResult::Err(WFMachinesError::Nondeterminism(format!( + return TransitionResult::Err(nondeterminism!( "Child workflow type of scheduled event '{}' does not \ match child workflow type of command '{}'", - event_dat.wf_type, state.workflow_type - ))); + event_dat.wf_type, + state.workflow_type + )); } } state.initiated_event_id = event_dat.event_id; @@ -496,7 +497,10 @@ impl ChildWorkflowMachine { | c @ ChildWorkflowCommand::IssueCancelAfterStarted { .. } => { self.adapt_response(c, None) } - x => panic!("Invalid cancel event response {x:?}"), + x => { + dbg_panic!("Invalid cancel event response {x:?}"); + panic!("Invalid cancel event response {x:?}"); + } }) .flatten_ok() .try_collect()?; @@ -529,8 +533,8 @@ impl TryFrom for ChildWorkflowMachineEvents { last_task_in_history, }) } else { - return Err(WFMachinesError::Fatal( - "StartChildWorkflowExecutionInitiated attributes were unset".to_string(), + return Err(fatal!( + "StartChildWorkflowExecutionInitiated attributes were unset" )); } } @@ -543,14 +547,12 @@ impl TryFrom for ChildWorkflowMachineEvents { { Self::StartChildWorkflowExecutionFailed( StartChildWorkflowExecutionFailedCause::try_from(cause).map_err(|_| { - WFMachinesError::Fatal( - "Invalid StartChildWorkflowExecutionFailedCause".to_string(), - ) + fatal!("Invalid StartChildWorkflowExecutionFailedCause") })?, ) } else { - return Err(WFMachinesError::Fatal( - "StartChildWorkflowExecutionFailed attributes were unset".to_string(), + return Err(fatal!( + "StartChildWorkflowExecutionFailed attributes were unset" )); } } @@ -569,9 +571,8 @@ impl TryFrom for ChildWorkflowMachineEvents { started_event_id: e.event_id, }) } else { - return Err(WFMachinesError::Fatal( + return Err(fatal!( "ChildWorkflowExecutionStarted attributes were unset or malformed" - .to_string(), )); } } @@ -584,9 +585,8 @@ impl TryFrom for ChildWorkflowMachineEvents { { Self::ChildWorkflowExecutionCompleted(result) } else { - return Err(WFMachinesError::Fatal( + return Err(fatal!( "ChildWorkflowExecutionCompleted attributes were unset or malformed" - .to_string(), )); } } @@ -597,9 +597,7 @@ impl TryFrom for ChildWorkflowMachineEvents { { Self::ChildWorkflowExecutionFailed(attrs) } else { - return Err(WFMachinesError::Fatal( - "ChildWorkflowExecutionFailed attributes were unset".to_string(), - )); + return Err(fatal!("ChildWorkflowExecutionFailed attributes were unset")); } } Ok(EventType::ChildWorkflowExecutionTimedOut) => { @@ -609,9 +607,8 @@ impl TryFrom for ChildWorkflowMachineEvents { { Self::ChildWorkflowExecutionTimedOut(atts.retry_state()) } else { - return Err(WFMachinesError::Fatal( + return Err(fatal!( "ChildWorkflowExecutionTimedOut attributes were unset or malformed" - .to_string(), )); } } @@ -620,9 +617,9 @@ impl TryFrom for ChildWorkflowMachineEvents { } Ok(EventType::ChildWorkflowExecutionCanceled) => Self::ChildWorkflowExecutionCancelled, _ => { - return Err(WFMachinesError::Nondeterminism(format!( + return Err(nondeterminism!( "Child workflow machine does not handle this event: {e:?}" - ))); + )); } }) } @@ -769,9 +766,7 @@ fn convert_payloads( result: Option, ) -> Result, WFMachinesError> { result.map(TryInto::try_into).transpose().map_err(|pe| { - WFMachinesError::Fatal(format!( - "Not exactly one payload in child workflow result ({pe}) for event: {event_info:?}" - )) + fatal!("Not exactly one payload in child workflow result ({pe}) for event: {event_info:?}") }) } diff --git a/crates/sdk-core/src/worker/workflow/machines/complete_workflow_state_machine.rs b/crates/sdk-core/src/worker/workflow/machines/complete_workflow_state_machine.rs index cbdc9d015..2d6579b03 100644 --- a/crates/sdk-core/src/worker/workflow/machines/complete_workflow_state_machine.rs +++ b/crates/sdk-core/src/worker/workflow/machines/complete_workflow_state_machine.rs @@ -2,7 +2,10 @@ use super::{ EventInfo, NewMachineWithCommand, OnEventWrapper, StateMachine, TransitionResult, WFMachinesAdapter, WFMachinesError, fsm, workflow_machines::MachineResponse, }; -use crate::worker::workflow::machines::HistEventData; +use crate::{ + abstractions::dbg_panic, + worker::workflow::{machines::HistEventData, nondeterminism}, +}; use std::convert::TryFrom; use temporalio_common::protos::{ coresdk::workflow_commands::CompleteWorkflowExecution, @@ -41,7 +44,10 @@ pub(super) fn complete_workflow(attribs: CompleteWorkflowExecution) -> NewMachin .pop() { Some(CompleteWFCommand::AddCommand(c)) => c, - _ => panic!("complete wf machine on_schedule must produce command"), + unexpected => { + dbg_panic!("complete wf machine on_schedule must produce command: {unexpected:?}"); + panic!("complete wf machine on_schedule must produce command"); + } }; NewMachineWithCommand { command: add_cmd, @@ -57,9 +63,9 @@ impl TryFrom for CompleteWorkflowMachineEvents { Ok(match e.event_type() { EventType::WorkflowExecutionCompleted => Self::WorkflowExecutionCompleted, _ => { - return Err(WFMachinesError::Nondeterminism(format!( + return Err(nondeterminism!( "Complete workflow machine does not handle this event: {e}" - ))); + )); } }) } diff --git a/crates/sdk-core/src/worker/workflow/machines/continue_as_new_workflow_state_machine.rs b/crates/sdk-core/src/worker/workflow/machines/continue_as_new_workflow_state_machine.rs index becf3b19e..148203457 100644 --- a/crates/sdk-core/src/worker/workflow/machines/continue_as_new_workflow_state_machine.rs +++ b/crates/sdk-core/src/worker/workflow/machines/continue_as_new_workflow_state_machine.rs @@ -2,7 +2,7 @@ use super::{ EventInfo, MachineResponse, NewMachineWithCommand, OnEventWrapper, StateMachine, TransitionResult, WFMachinesAdapter, WFMachinesError, fsm, }; -use crate::worker::workflow::machines::HistEventData; +use crate::worker::workflow::{machines::HistEventData, nondeterminism}; use std::convert::TryFrom; use temporalio_common::protos::{ coresdk::workflow_commands::ContinueAsNewWorkflowExecution, @@ -73,9 +73,9 @@ impl TryFrom for ContinueAsNewWorkflowMachineEvents { Ok(match e.event_type() { EventType::WorkflowExecutionContinuedAsNew => Self::WorkflowExecutionContinuedAsNew, _ => { - return Err(WFMachinesError::Nondeterminism(format!( + return Err(nondeterminism!( "Continue as new workflow machine does not handle this event: {e}" - ))); + )); } }) } diff --git a/crates/sdk-core/src/worker/workflow/machines/fail_workflow_state_machine.rs b/crates/sdk-core/src/worker/workflow/machines/fail_workflow_state_machine.rs index b61009eb2..290ee9278 100644 --- a/crates/sdk-core/src/worker/workflow/machines/fail_workflow_state_machine.rs +++ b/crates/sdk-core/src/worker/workflow/machines/fail_workflow_state_machine.rs @@ -2,7 +2,7 @@ use super::{ EventInfo, NewMachineWithCommand, OnEventWrapper, StateMachine, TransitionResult, WFMachinesAdapter, WFMachinesError, fsm, workflow_machines::MachineResponse, }; -use crate::worker::workflow::machines::HistEventData; +use crate::worker::workflow::{machines::HistEventData, nondeterminism}; use std::convert::TryFrom; use temporalio_common::protos::{ coresdk::workflow_commands::FailWorkflowExecution, @@ -84,9 +84,9 @@ impl TryFrom for FailWorkflowMachineEvents { Ok(match e.event_type() { EventType::WorkflowExecutionFailed => Self::WorkflowExecutionFailed, _ => { - return Err(WFMachinesError::Nondeterminism(format!( + return Err(nondeterminism!( "Fail workflow machine does not handle this event: {e}" - ))); + )); } }) } diff --git a/crates/sdk-core/src/worker/workflow/machines/local_activity_state_machine.rs b/crates/sdk-core/src/worker/workflow/machines/local_activity_state_machine.rs index ef8873533..6a12e11dd 100644 --- a/crates/sdk-core/src/worker/workflow/machines/local_activity_state_machine.rs +++ b/crates/sdk-core/src/worker/workflow/machines/local_activity_state_machine.rs @@ -8,8 +8,9 @@ use crate::{ worker::{ LocalActivityExecutionResult, workflow::{ - InternalFlagsRef, + InternalFlagsRef, fatal, machines::{HistEventData, activity_state_machine::activity_fail_info}, + nondeterminism, }, }, }; @@ -162,8 +163,8 @@ pub(super) fn new_local_activity( } } else { if maybe_pre_resolved.is_some() { - return Err(WFMachinesError::Nondeterminism( - "Local activity cannot be created as pre-resolved while not replaying".to_string(), + return Err(nondeterminism!( + "Local activity cannot be created as pre-resolved while not replaying" )); } Executing {}.into() @@ -210,10 +211,10 @@ impl LocalActivityMachine { LocalActivityMachineState::ResultNotified(_) => Ok(false), LocalActivityMachineState::WaitingMarkerEvent(_) => Ok(true), LocalActivityMachineState::WaitingMarkerEventPreResolved(_) => Ok(true), - _ => Err(WFMachinesError::Fatal(format!( + _ => Err(fatal!( "Attempted to check for LA marker handling in invalid state {}", self.state() - ))), + )), } } @@ -241,12 +242,12 @@ impl LocalActivityMachine { let mut res = OnEventWrapper::on_event_mut(self, LocalActivityMachineEvents::StartedNonReplayWFT) .map_err(|e| match e { - MachineError::InvalidTransition => WFMachinesError::Fatal(format!( + MachineError::InvalidTransition => fatal!( "Invalid transition while notifying local activity (seq {})\ of non-replay-wft-started in {}", self.shared_state.attrs.seq, self.state(), - )), + ), MachineError::Underlying(e) => e, })?; let res = res.pop().expect("Always produces one response"); @@ -295,12 +296,12 @@ impl LocalActivityMachine { LocalActivityMachineEvents::HandleResult(dat) }; let res = OnEventWrapper::on_event_mut(self, evt).map_err(|e| match e { - MachineError::InvalidTransition => WFMachinesError::Fatal(format!( + MachineError::InvalidTransition => fatal!( "Invalid transition resolving local activity (seq {}, from marker: {}) in {}", self.shared_state.attrs.seq, from_marker, self.state(), - )), + ), MachineError::Underlying(e) => e, })?; @@ -542,16 +543,16 @@ impl ResultNotified { dat: CompleteLocalActivityData, ) -> LocalActivityMachineTransition { if self.result_type == ResultType::Completed && dat.result.is_err() { - return TransitionResult::Err(WFMachinesError::Nondeterminism(format!( + return TransitionResult::Err(nondeterminism!( "Local activity (seq {}) completed successfully locally, but history said \ it failed!", shared.attrs.seq - ))); + )); } else if self.result_type == ResultType::Failed && dat.result.is_ok() { - return TransitionResult::Err(WFMachinesError::Nondeterminism(format!( + return TransitionResult::Err(nondeterminism!( "Local activity (seq {}) failed locally, but history said it completed!", shared.attrs.seq - ))); + )); } verify_marker_dat!(shared, &dat, TransitionResult::default()) } @@ -811,15 +812,15 @@ impl TryFrom for LocalActivityMachineEvents { fn try_from(e: HistEventData) -> Result { let e = e.event; if e.event_type() != EventType::MarkerRecorded { - return Err(WFMachinesError::Nondeterminism(format!( + return Err(nondeterminism!( "Local activity machine cannot handle this event: {e}" - ))); + )); } match e.into_local_activity_marker_details() { Some(marker_dat) => Ok(LocalActivityMachineEvents::MarkerRecorded(marker_dat)), - _ => Err(WFMachinesError::Nondeterminism( - "Local activity machine encountered an unparsable marker".to_string(), + _ => Err(nondeterminism!( + "Local activity machine encountered an unparsable marker" )), } } @@ -830,11 +831,12 @@ fn verify_marker_data_matches( dat: &CompleteLocalActivityData, ) -> Result<(), WFMachinesError> { if shared.attrs.seq != dat.marker_dat.seq { - return Err(WFMachinesError::Nondeterminism(format!( + return Err(nondeterminism!( "Local activity marker data has sequence number {} but matched against LA \ command with sequence number {}", - dat.marker_dat.seq, shared.attrs.seq - ))); + dat.marker_dat.seq, + shared.attrs.seq + )); } // Here we use whether or not we were replaying when we _first invoked_ the LA, because we // are always replaying when we see the marker recorded event, and that would make this check @@ -844,18 +846,20 @@ fn verify_marker_data_matches( !shared.replaying_when_invoked, ) { if dat.marker_dat.activity_id != shared.attrs.activity_id { - return Err(WFMachinesError::Nondeterminism(format!( + return Err(nondeterminism!( "Activity id of recorded marker '{}' does not \ match activity id of local activity command '{}'", - dat.marker_dat.activity_id, shared.attrs.activity_id - ))); + dat.marker_dat.activity_id, + shared.attrs.activity_id + )); } if dat.marker_dat.activity_type != shared.attrs.activity_type { - return Err(WFMachinesError::Nondeterminism(format!( + return Err(nondeterminism!( "Activity type of recorded marker '{}' does not \ match activity type of local activity command '{}'", - dat.marker_dat.activity_type, shared.attrs.activity_type - ))); + dat.marker_dat.activity_type, + shared.attrs.activity_type + )); } } diff --git a/crates/sdk-core/src/worker/workflow/machines/mod.rs b/crates/sdk-core/src/worker/workflow/machines/mod.rs index d46f5b3d9..bc5e3fcb9 100644 --- a/crates/sdk-core/src/worker/workflow/machines/mod.rs +++ b/crates/sdk-core/src/worker/workflow/machines/mod.rs @@ -26,7 +26,10 @@ mod transition_coverage; pub(crate) use temporalio_common::fsm_trait::MachineError; pub(crate) use workflow_machines::{MachinesWFTResponseContent, WorkflowMachines}; -use crate::{telemetry::VecDisplayer, worker::workflow::WFMachinesError}; +use crate::{ + telemetry::VecDisplayer, + worker::workflow::{WFMachinesError, fatal, nondeterminism}, +}; use activity_state_machine::ActivityMachine; use cancel_external_state_machine::CancelExternalMachine; use cancel_workflow_state_machine::CancelWorkflowMachine; @@ -127,21 +130,19 @@ where if let Ok(converted_command) = command_type.try_into() { match OnEventWrapper::on_event_mut(self, converted_command) { Ok(c) => process_machine_commands(self, c, None), - Err(MachineError::InvalidTransition) => { - Err(WFMachinesError::Nondeterminism(format!( - "Unexpected command producing an invalid transition {:?} in state {}", - command_type, - self.state() - ))) - } + Err(MachineError::InvalidTransition) => Err(nondeterminism!( + "Unexpected command producing an invalid transition {:?} in state {}", + command_type, + self.state() + )), Err(MachineError::Underlying(e)) => Err(e.into()), } } else { - Err(WFMachinesError::Nondeterminism(format!( + Err(nondeterminism!( "Unexpected command {:?} generated by a {:?} machine", command_type, self.name() - ))) + )) } } @@ -163,12 +164,12 @@ where match OnEventWrapper::on_event_mut(self, converted_event) { Ok(c) => process_machine_commands(self, c, Some(event_info)), - Err(MachineError::InvalidTransition) => Err(WFMachinesError::Fatal(format!( + Err(MachineError::InvalidTransition) => Err(fatal!( "{} in state {} says the transition is invalid during event {:?}", self.name(), self.state(), event_info - ))), + )), Err(MachineError::Underlying(e)) => Err(e.into()), } } diff --git a/crates/sdk-core/src/worker/workflow/machines/nexus_operation_state_machine.rs b/crates/sdk-core/src/worker/workflow/machines/nexus_operation_state_machine.rs index e15cbced8..b8c744982 100644 --- a/crates/sdk-core/src/worker/workflow/machines/nexus_operation_state_machine.rs +++ b/crates/sdk-core/src/worker/workflow/machines/nexus_operation_state_machine.rs @@ -5,6 +5,7 @@ use crate::worker::workflow::{ EventInfo, HistEventData, NewMachineWithCommand, OnEventWrapper, WFMachinesAdapter, workflow_machines::MachineResponse, }, + nondeterminism, }; use itertools::Itertools; use temporalio_common::protos::{ @@ -392,10 +393,9 @@ pub(super) struct TimedOut; pub(super) struct Cancelled; fn completion_of_not_abandoned_err() -> WFMachinesError { - WFMachinesError::Nondeterminism( + nondeterminism!( "Nexus operation which don't have the ABANDON cancellation type cannot complete after \ being cancelled." - .to_string(), ) } @@ -455,8 +455,8 @@ impl TryFrom for NexusOperationMachineEvents { event_id: e.event_id, }) } else { - return Err(WFMachinesError::Nondeterminism( - "NexusOperationScheduled attributes were unset or malformed".to_string(), + return Err(nondeterminism!( + "NexusOperationScheduled attributes were unset or malformed" )); } } @@ -466,8 +466,8 @@ impl TryFrom for NexusOperationMachineEvents { { Self::NexusOperationStarted(sa) } else { - return Err(WFMachinesError::Nondeterminism( - "NexusOperationStarted attributes were unset or malformed".to_string(), + return Err(nondeterminism!( + "NexusOperationStarted attributes were unset or malformed" )); } } @@ -477,8 +477,8 @@ impl TryFrom for NexusOperationMachineEvents { { Self::NexusOperationCompleted(ca) } else { - return Err(WFMachinesError::Nondeterminism( - "NexusOperationCompleted attributes were unset or malformed".to_string(), + return Err(nondeterminism!( + "NexusOperationCompleted attributes were unset or malformed" )); } } @@ -488,8 +488,8 @@ impl TryFrom for NexusOperationMachineEvents { { Self::NexusOperationFailed(fa) } else { - return Err(WFMachinesError::Nondeterminism( - "NexusOperationFailed attributes were unset or malformed".to_string(), + return Err(nondeterminism!( + "NexusOperationFailed attributes were unset or malformed" )); } } @@ -499,8 +499,8 @@ impl TryFrom for NexusOperationMachineEvents { { Self::NexusOperationCanceled(ca) } else { - return Err(WFMachinesError::Nondeterminism( - "NexusOperationCanceled attributes were unset or malformed".to_string(), + return Err(nondeterminism!( + "NexusOperationCanceled attributes were unset or malformed" )); } } @@ -510,8 +510,8 @@ impl TryFrom for NexusOperationMachineEvents { { Self::NexusOperationTimedOut(toa) } else { - return Err(WFMachinesError::Nondeterminism( - "NexusOperationTimedOut attributes were unset or malformed".to_string(), + return Err(nondeterminism!( + "NexusOperationTimedOut attributes were unset or malformed" )); } } @@ -525,9 +525,8 @@ impl TryFrom for NexusOperationMachineEvents { { Self::NexusOperationCancelRequestCompleted(attrs) } else { - return Err(WFMachinesError::Nondeterminism( + return Err(nondeterminism!( "NexusOperationCancelRequestCompleted attributes were unset or malformed" - .to_string(), )); } } @@ -540,16 +539,15 @@ impl TryFrom for NexusOperationMachineEvents { { Self::NexusOperationCancelRequestFailed(attrs) } else { - return Err(WFMachinesError::Nondeterminism( + return Err(nondeterminism!( "NexusOperationCancelRequestFailed attributes were unset or malformed" - .to_string(), )); } } _ => { - return Err(WFMachinesError::Nondeterminism(format!( + return Err(nondeterminism!( "Nexus operation machine does not handle this event: {e:?}" - ))); + )); } }) } diff --git a/crates/sdk-core/src/worker/workflow/machines/patch_state_machine.rs b/crates/sdk-core/src/worker/workflow/machines/patch_state_machine.rs index 94105f7a3..6c6f14821 100644 --- a/crates/sdk-core/src/worker/workflow/machines/patch_state_machine.rs +++ b/crates/sdk-core/src/worker/workflow/machines/patch_state_machine.rs @@ -25,10 +25,11 @@ use crate::{ internal_flags::CoreInternalFlags, protosext::HistoryEventExt, worker::workflow::{ - InternalFlagsRef, + InternalFlagsRef, fatal, machines::{ HistEventData, upsert_search_attributes_state_machine::MAX_SEARCH_ATTR_PAYLOAD_SIZE, }, + nondeterminism, }, }; use anyhow::Context; @@ -129,7 +130,7 @@ pub(super) fn has_change<'a>( let mut serialized = all_ids .as_json_payload() .context("Could not serialize search attribute value for patch machine") - .map_err(|e| WFMachinesError::Fatal(e.to_string()))?; + .map_err(|e| fatal!("{}", e))?; if serialized.data.len() >= MAX_SEARCH_ATTR_PAYLOAD_SIZE { warn!( @@ -219,10 +220,11 @@ impl Notified { id: String, ) -> PatchMachineTransition { if id != dat.patch_id { - return TransitionResult::Err(WFMachinesError::Nondeterminism(format!( + return TransitionResult::Err(nondeterminism!( "Change id {} does not match expected id {}", - id, dat.patch_id - ))); + id, + dat.patch_id + )); } TransitionResult::default() } @@ -256,9 +258,9 @@ impl TryFrom for PatchMachineEvents { let e = e.event; match e.get_patch_marker_details() { Some((id, _)) => Ok(Self::MarkerRecorded(id)), - _ => Err(WFMachinesError::Nondeterminism(format!( + _ => Err(nondeterminism!( "Change machine cannot handle this event: {e}" - ))), + )), } } } diff --git a/crates/sdk-core/src/worker/workflow/machines/signal_external_state_machine.rs b/crates/sdk-core/src/worker/workflow/machines/signal_external_state_machine.rs index db128d5ed..548c198b3 100644 --- a/crates/sdk-core/src/worker/workflow/machines/signal_external_state_machine.rs +++ b/crates/sdk-core/src/worker/workflow/machines/signal_external_state_machine.rs @@ -2,7 +2,7 @@ use super::{ EventInfo, MachineError, NewMachineWithCommand, OnEventWrapper, StateMachine, TransitionResult, WFMachinesAdapter, WFMachinesError, fsm, workflow_machines::MachineResponse, }; -use crate::worker::workflow::machines::HistEventData; +use crate::worker::workflow::{fatal, machines::HistEventData, nondeterminism}; use std::convert::TryFrom; use temporalio_common::protos::{ coresdk::{ @@ -70,8 +70,8 @@ pub(super) fn new_external_signal( ) -> Result { let (workflow_execution, only_child) = match attrs.target { None => { - return Err(WFMachinesError::Fatal( - "Signal external workflow command had empty target field".to_string(), + return Err(fatal!( + "Signal external workflow command had empty target field" )); } Some(sig_we::Target::ChildWorkflowId(wfid)) => ( @@ -195,15 +195,13 @@ impl TryFrom for SignalExternalMachineEvents { { Self::SignalExternalWorkflowExecutionFailed(attrs.cause()) } else { - return Err(WFMachinesError::Fatal(format!( - "Signal workflow failed attributes were unset: {e}" - ))); + return Err(fatal!("Signal workflow failed attributes were unset: {e}")); } } _ => { - return Err(WFMachinesError::Nondeterminism(format!( + return Err(nondeterminism!( "Signal external WF machine does not handle this event: {e}" - ))); + )); } }) } diff --git a/crates/sdk-core/src/worker/workflow/machines/timer_state_machine.rs b/crates/sdk-core/src/worker/workflow/machines/timer_state_machine.rs index 32fd70702..f193fd41d 100644 --- a/crates/sdk-core/src/worker/workflow/machines/timer_state_machine.rs +++ b/crates/sdk-core/src/worker/workflow/machines/timer_state_machine.rs @@ -4,7 +4,7 @@ use super::{ EventInfo, MachineError, NewMachineWithCommand, OnEventWrapper, StateMachine, TransitionResult, WFMachinesAdapter, fsm, workflow_machines::MachineResponse, }; -use crate::worker::workflow::{WFMachinesError, machines::HistEventData}; +use crate::worker::workflow::{WFMachinesError, fatal, machines::HistEventData, nondeterminism}; use std::convert::TryFrom; use temporalio_common::protos::{ coresdk::{ @@ -120,15 +120,13 @@ impl TryFrom for TimerMachineEvents { { Self::TimerFired(attrs) } else { - return Err(WFMachinesError::Fatal(format!( - "Timer fired attribs were unset: {e}" - ))); + return Err(fatal!("Timer fired attribs were unset: {e}")); } } _ => { - return Err(WFMachinesError::Nondeterminism(format!( + return Err(nondeterminism!( "Timer machine does not handle this event: {e}" - ))); + )); } }) } @@ -192,6 +190,14 @@ impl StartCommandCreated { pub(super) fn on_cancel(self, dat: &mut SharedState) -> TimerMachineTransition { dat.cancelled_before_sent = true; + + #[cfg(feature = "antithesis_assertions")] + crate::antithesis::assert_sometimes!( + true, + "Timer cancelled before sent to server", + ::serde_json::json!({"timer_seq": dat.attrs.seq}) + ); + TransitionResult::default() } } @@ -208,10 +214,11 @@ impl StartCommandRecorded { if dat.attrs.seq.to_string() == attrs.timer_id { TransitionResult::ok(vec![TimerMachineCommand::Complete], Fired::default()) } else { - TransitionResult::Err(WFMachinesError::Nondeterminism(format!( + TransitionResult::Err(nondeterminism!( "Timer fired event did not have expected timer id {}, it was {}!", - dat.attrs.seq, attrs.timer_id - ))) + dat.attrs.seq, + attrs.timer_id + )) } } @@ -219,6 +226,13 @@ impl StartCommandRecorded { self, dat: &mut SharedState, ) -> TimerMachineTransition { + #[cfg(feature = "antithesis_assertions")] + crate::antithesis::assert_sometimes!( + true, + "Timer cancelled after started", + ::serde_json::json!({"timer_seq": dat.attrs.seq}) + ); + TransitionResult::ok( vec![TimerMachineCommand::IssueCancelCmd( CancelTimer { seq: dat.attrs.seq }.into(), diff --git a/crates/sdk-core/src/worker/workflow/machines/update_state_machine.rs b/crates/sdk-core/src/worker/workflow/machines/update_state_machine.rs index 998e318ac..b3222355b 100644 --- a/crates/sdk-core/src/worker/workflow/machines/update_state_machine.rs +++ b/crates/sdk-core/src/worker/workflow/machines/update_state_machine.rs @@ -4,7 +4,11 @@ use super::{ }; use crate::{ protosext::protocol_messages::UpdateRequest, - worker::workflow::machines::{HistEventData, NewMachineWithResponse}, + worker::workflow::{ + fatal, + machines::{HistEventData, NewMachineWithResponse}, + nondeterminism, + }, }; use itertools::Itertools; use prost::EncodeError; @@ -121,10 +125,10 @@ impl UpdateMachine { ) -> Result, WFMachinesError> { let cmds = match resp.response { None => { - return Err(WFMachinesError::Fatal(format!( + return Err(fatal!( "Update response for update {} had an empty result, this is a lang layer bug.", &self.shared_state.meta.update_id - ))); + )); } Some(update_response::Response::Accepted(_)) => { self.on_event(UpdateMachineEvents::Accept) @@ -137,11 +141,11 @@ impl UpdateMachine { } } .map_err(|e| match e { - MachineError::InvalidTransition => WFMachinesError::Nondeterminism(format!( + MachineError::InvalidTransition => nondeterminism!( "Invalid transition while handling update response (id {}) in state {}", &self.shared_state.meta.update_id, self.state(), - )), + ), MachineError::Underlying(e) => e, })?; cmds.into_iter() @@ -174,9 +178,9 @@ impl UpdateMachine { outgoing_id: String, msg: UpdateMsg, ) -> Result { - let accept_body = msg.pack().map_err(|e| { - WFMachinesError::Fatal(format!("Failed to serialize update response: {e:?}")) - })?; + let accept_body = msg + .pack() + .map_err(|e| fatal!("Failed to serialize update response: {e:?}"))?; Ok(MachineResponse::IssueNewMessage(ProtocolMessage { id: outgoing_id.clone(), protocol_instance_id: self.shared_state.instance_id.clone(), @@ -223,9 +227,9 @@ impl TryFrom for UpdateMachineEvents { UpdateMachineEvents::WorkflowExecutionUpdateCompleted } _ => { - return Err(WFMachinesError::Nondeterminism(format!( + return Err(nondeterminism!( "Update machine does not handle this event: {e}" - ))); + )); } }) } diff --git a/crates/sdk-core/src/worker/workflow/machines/workflow_machines.rs b/crates/sdk-core/src/worker/workflow/machines/workflow_machines.rs index 23411f008..dd7ba43f2 100644 --- a/crates/sdk-core/src/worker/workflow/machines/workflow_machines.rs +++ b/crates/sdk-core/src/worker/workflow/machines/workflow_machines.rs @@ -25,7 +25,7 @@ use crate::{ workflow::{ CommandID, DrivenWorkflow, HistoryUpdate, InternalFlagsRef, LocalResolution, OutgoingJob, RunBasics, WFCommand, WFCommandVariant, WFMachinesError, - WorkflowStartedInfo, + WorkflowStartedInfo, fatal, history_update::NextWFT, machines::{ HistEventData, activity_state_machine::ActivityMachine, @@ -35,6 +35,7 @@ use crate::{ update_state_machine::UpdateMachine, upsert_search_attributes_state_machine::upsert_search_attrs_internal, }, + nondeterminism, }, }, }; @@ -246,11 +247,11 @@ macro_rules! cancel_machine { let machine = if let Machines::$machine_variant(m) = $self.machine_mut(m_key) { m } else { - return Err(WFMachinesError::Nondeterminism(format!( + return Err(nondeterminism!( "Machine was not a {} when it should have been during cancellation: {:?}", stringify!($machine_variant), $cmd_id - ))); + )); }; let machine_resps = machine.$cancel_method($($args),*)?; $self.process_machine_responses(m_key, machine_resps)? @@ -366,10 +367,10 @@ impl WorkflowMachines { } self.process_machine_responses(mk, resps)?; } else { - return Err(WFMachinesError::Nondeterminism(format!( + return Err(nondeterminism!( "Command matching activity with seq num {seq} existed but was not a \ local activity!" - ))); + )); } self.local_activity_data.done_executing(seq); } @@ -573,10 +574,9 @@ impl WorkflowMachines { (evts, has_final_event) } NextWFT::NeedFetch => { - return Err(WFMachinesError::Fatal( + return Err(fatal!( "Need to fetch history events to continue applying workflow task, but this \ should be prevented ahead of time! This is a Core SDK bug." - .to_string(), )); } }; @@ -649,10 +649,11 @@ impl WorkflowMachines { while let Some(event) = history.next() { let eid = event.event_id; if eid != self.last_processed_event + 1 { - return Err(WFMachinesError::Fatal(format!( + return Err(fatal!( "History is out of order. Last processed event: {}, event id: {}", - self.last_processed_event, eid - ))); + self.last_processed_event, + eid + )); } let next_event = history.peek(); @@ -782,9 +783,7 @@ impl WorkflowMachines { self.local_activity_data.insert_peeked_marker(la_dat); } } else { - return Err(WFMachinesError::Fatal(format!( - "Local activity marker was unparsable: {e:?}" - ))); + return Err(fatal!("Local activity marker was unparsable: {e:?}")); } } else if let Some( history_event::Attributes::WorkflowExecutionUpdateAcceptedEventAttributes(ref atts), @@ -856,10 +855,9 @@ impl WorkflowMachines { let are_more_events = next_event.is_some() || !event_dat.current_task_is_last_in_history; return if are_more_events { - Err(WFMachinesError::Fatal( + Err(fatal!( "Machines were fed a history which has an event after workflow execution was \ terminated!" - .to_string(), )) } else { Ok(EventHandlingOutcome::Normal) @@ -867,9 +865,9 @@ impl WorkflowMachines { } if event.event_type() == EventType::Unspecified || event.attributes.is_none() { return if !event.worker_may_ignore { - Err(WFMachinesError::Fatal(format!( + Err(fatal!( "Event type is unspecified! This history is invalid. Event detail: {event:?}" - ))) + )) } else { debug!("Event is ignorable"); Ok(EventHandlingOutcome::SkipEvent { @@ -894,10 +892,10 @@ impl WorkflowMachines { .machines_by_event_id .get(&initial_cmd_id) .ok_or_else(|| { - WFMachinesError::Nondeterminism(format!( + nondeterminism!( "During event handling, this event had an initial command ID but we \ could not find a matching command for it: {event:?}" - )) + ) })?; self.submachine_handle_event(*mkey, event_dat)?; } else { @@ -924,9 +922,9 @@ impl WorkflowMachines { let event = &event_dat.event; if event.is_local_activity_marker() { - let deets = event.extract_local_activity_marker_data().ok_or_else(|| { - WFMachinesError::Fatal(format!("Local activity marker was unparsable: {event:?}")) - })?; + let deets = event + .extract_local_activity_marker_data() + .ok_or_else(|| fatal!("Local activity marker was unparsable: {event:?}"))?; let cmdid = CommandID::LocalActivity(deets.seq); let mkey = self.get_machine_key(cmdid)?; if let Machines::LocalActivityMachine(lam) = self.machine(mkey) { @@ -935,10 +933,10 @@ impl WorkflowMachines { return Ok(EventHandlingOutcome::Normal); } } else { - return Err(WFMachinesError::Fatal(format!( + return Err(fatal!( "Encountered local activity marker but the associated machine was of the \ wrong type! {event:?}" - ))); + )); } } @@ -959,9 +957,7 @@ impl WorkflowMachines { let command = if let Some(c) = maybe_command { c } else { - return Err(WFMachinesError::Nondeterminism(format!( - "No command scheduled for event {event}" - ))); + return Err(nondeterminism!("No command scheduled for event {event}")); }; let canceled_before_sent = self @@ -1008,9 +1004,9 @@ impl WorkflowMachines { attrs, ); } else { - return Err(WFMachinesError::Fatal(format!( + return Err(fatal!( "WorkflowExecutionStarted event did not have appropriate attributes: {event_dat}" - ))); + )); } } Ok(EventType::WorkflowTaskScheduled) => { @@ -1044,9 +1040,9 @@ impl WorkflowMachines { } } _ => { - return Err(WFMachinesError::Fatal(format!( + return Err(fatal!( "The event is not a non-stateful event, but we tried to handle it as one: {event_dat}" - ))); + )); } } Ok(()) @@ -1082,13 +1078,12 @@ impl WorkflowMachines { match message.body { IncomingProtocolMessageBody::UpdateRequest(ur) => { - let seq_id = if let SequencingId::EventId(eid) = message - .sequencing_id - .ok_or_else(|| WFMachinesError::Fatal(SEQIDERR.to_string()))? + let seq_id = if let SequencingId::EventId(eid) = + message.sequencing_id.ok_or_else(|| fatal!("{SEQIDERR}"))? { eid } else { - return Err(WFMachinesError::Fatal(SEQIDERR.to_string())); + return Err(fatal!("{SEQIDERR}")); }; let um = UpdateMachine::init( message.id, @@ -1213,9 +1208,9 @@ impl WorkflowMachines { ); } c => { - return Err(WFMachinesError::Fatal(format!( + return Err(fatal!( "A machine requested to create a new command of an unsupported type: {c:?}" - ))); + )); } }, MachineResponse::IssueFakeLocalActivityMarker(seq) => { @@ -1360,9 +1355,7 @@ impl WorkflowMachines { let seq = attrs.seq; let attrs: ValidScheduleLA = ValidScheduleLA::from_schedule_la(attrs, cmd.metadata).map_err(|e| { - WFMachinesError::Fatal(format!( - "Invalid schedule local activity request (seq {seq}): {e}" - )) + fatal!("Invalid schedule local activity request (seq {seq}): {e}") })?; let (la, mach_resp) = new_local_activity( attrs, @@ -1470,10 +1463,7 @@ impl WorkflowMachines { } WFCommandVariant::RequestCancelExternalWorkflow(attrs) => { let we = attrs.workflow_execution.ok_or_else(|| { - WFMachinesError::Fatal( - "Cancel external workflow command had no workflow_execution field" - .to_string(), - ) + fatal!("Cancel external workflow command had no workflow_execution field") })?; self.add_cmd_to_wf_task( new_external_cancel( @@ -1521,11 +1511,11 @@ impl WorkflowMachines { let m = if let Machines::UpdateMachine(m) = self.machine_mut(m_key) { m } else { - return Err(WFMachinesError::Nondeterminism(format!( + return Err(nondeterminism!( "Tried to handle an update response for \ update with instance id {} but it was not found!", &ur.protocol_instance_id - ))); + )); }; let resps = m.handle_response(ur)?; self.process_machine_responses(m_key, resps)?; @@ -1553,9 +1543,10 @@ impl WorkflowMachines { } fn get_machine_key(&self, id: CommandID) -> Result { - Ok(*self.id_to_machine.get(&id).ok_or_else(|| { - WFMachinesError::Nondeterminism(format!("Missing associated machine for {id:?}")) - })?) + Ok(*self + .id_to_machine + .get(&id) + .ok_or_else(|| nondeterminism!("Missing associated machine for {id:?}"))?) } fn get_machine_by_msg(&self, protocol_instance_id: &str) -> Result { @@ -1563,9 +1554,7 @@ impl WorkflowMachines { .machines_by_protocol_instance_id .get(protocol_instance_id) .ok_or_else(|| { - WFMachinesError::Fatal(format!( - "Missing associated machine for protocol message {protocol_instance_id}" - )) + fatal!("Missing associated machine for protocol message {protocol_instance_id}") })?) } @@ -1758,10 +1747,10 @@ fn patch_marker_handling( debug!("Deprecated patch marker tried against non-patch machine, skipping."); skip_one_or_two_events(next_event) } else { - Err(WFMachinesError::Nondeterminism(format!( + Err(nondeterminism!( "Non-deprecated patch marker encountered for change {patch_name}, but there is \ no corresponding change command!" - ))) + )) } } } else if patch_machine.is_some() { diff --git a/crates/sdk-core/src/worker/workflow/machines/workflow_task_state_machine.rs b/crates/sdk-core/src/worker/workflow/machines/workflow_task_state_machine.rs index 2ad275e30..f82d3008b 100644 --- a/crates/sdk-core/src/worker/workflow/machines/workflow_task_state_machine.rs +++ b/crates/sdk-core/src/worker/workflow/machines/workflow_task_state_machine.rs @@ -4,7 +4,7 @@ use super::{ EventInfo, StateMachine, TransitionResult, WFMachinesAdapter, WFMachinesError, fsm, workflow_machines::MachineResponse, }; -use crate::worker::workflow::machines::HistEventData; +use crate::worker::workflow::{fatal, machines::HistEventData, nondeterminism}; use std::{ convert::{TryFrom, TryInto}, time::SystemTime, @@ -67,10 +67,9 @@ impl WFMachinesAdapter for WorkflowTaskMachine { let (event_id, event_type) = if let Some(ei) = event_info { (ei.event_id, ei.event_type) } else { - return Err(WFMachinesError::Fatal( + return Err(fatal!( "WF Task machine should never issue a task started trigger \ command in response to non-history events" - .to_string(), )); }; @@ -102,16 +101,15 @@ impl TryFrom for WorkflowTaskMachineEvents { match time.try_into() { Ok(t) => t, Err(_) => { - return Err(WFMachinesError::Fatal( + return Err(fatal!( "Workflow task started event timestamp was inconvertible" - .to_string(), )); } } } else { - return Err(WFMachinesError::Fatal(format!( + return Err(fatal!( "Workflow task started event must contain timestamp: {e}" - ))); + )); }; WFTStartedDat { started_event_id: e.event_id, @@ -137,15 +135,13 @@ impl TryFrom for WorkflowTaskMachineEvents { }, }) } else { - return Err(WFMachinesError::Fatal(format!( - "Workflow task failed is missing attributes: {e}" - ))); + return Err(fatal!("Workflow task failed is missing attributes: {e}")); } } _ => { - return Err(WFMachinesError::Nondeterminism(format!( + return Err(nondeterminism!( "Event does not apply to a wf task machine: {e}" - ))); + )); } }) } diff --git a/crates/sdk-core/src/worker/workflow/managed_run.rs b/crates/sdk-core/src/worker/workflow/managed_run.rs index 79cb5c55c..7d10b6d6c 100644 --- a/crates/sdk-core/src/worker/workflow/managed_run.rs +++ b/crates/sdk-core/src/worker/workflow/managed_run.rs @@ -243,10 +243,10 @@ impl ManagedRun { let r = self.wfm.get_next_activation()?; if r.jobs.is_empty() { return Err(RunUpdateErr { - source: WFMachinesError::Fatal(format!( + source: crate::worker::workflow::fatal!( "Machines created for {} with no jobs", self.wfm.machines.run_id - )), + ), complete_resp_chan: None, }); } diff --git a/crates/sdk-core/src/worker/workflow/mod.rs b/crates/sdk-core/src/worker/workflow/mod.rs index 61253b53d..31f86f430 100644 --- a/crates/sdk-core/src/worker/workflow/mod.rs +++ b/crates/sdk-core/src/worker/workflow/mod.rs @@ -1433,7 +1433,79 @@ pub(crate) enum WFMachinesError { Fatal(String), } +/// Helper macro to create Nondeterminism errors with automatic assertion +/// +/// Usage: `nondeterminism!("Activity id mismatch: {} vs {}", id1, id2)` +macro_rules! nondeterminism { + ($($arg:tt)*) => {{ + let msg = format!($($arg)*); + #[cfg(feature = "antithesis_assertions")] + $crate::antithesis::assert_unreachable!( + "Nondeterminism error detected", + ::serde_json::json!({ + "error_message": &msg, + "error_type": "Nondeterminism", + "location": format!("{}:{}", file!(), line!()) + }) + ); + WFMachinesError::Nondeterminism(msg) + }}; +} +pub(crate) use nondeterminism; + +/// Helper macro to create Fatal errors with automatic assertion +/// +/// Usage: `fatal!("Invalid state transition: {:?}", state)` +macro_rules! fatal { + ($($arg:tt)*) => {{ + let msg = format!($($arg)*); + #[cfg(feature = "antithesis_assertions")] + $crate::antithesis::assert_unreachable!( + "Fatal error detected", + ::serde_json::json!({ + "error_message": &msg, + "error_type": "Fatal", + "location": format!("{}:{}", file!(), line!()) + }) + ); + WFMachinesError::Fatal(msg) + }}; +} +pub(crate) use fatal; + impl WFMachinesError { + /// Create a new Nondeterminism error with assertion instrumentation + pub(crate) fn nondeterminism(message: impl Into) -> Self { + let msg = message.into(); + + #[cfg(feature = "antithesis_assertions")] + crate::antithesis::assert_unreachable!( + "Nondeterminism error detected", + ::serde_json::json!({ + "error_message": msg, + "error_type": "Nondeterminism" + }) + ); + + WFMachinesError::Nondeterminism(msg) + } + + /// Create a new Fatal error with assertion instrumentation + pub(crate) fn fatal(message: impl Into) -> Self { + let msg = message.into(); + + #[cfg(feature = "antithesis_assertions")] + crate::antithesis::assert_unreachable!( + "Fatal error detected", + ::serde_json::json!({ + "error_message": msg, + "error_type": "Fatal" + }) + ); + + WFMachinesError::Fatal(msg) + } + fn evict_reason(&self) -> EvictionReason { match self { WFMachinesError::Nondeterminism(_) => EvictionReason::Nondeterminism, @@ -1459,7 +1531,7 @@ impl From> for WFMachinesError { match v { MachineError::InvalidTransition => { // TODO: Get states back - WFMachinesError::Nondeterminism("Invalid transition in state machine".to_string()) + WFMachinesError::nondeterminism("Invalid transition in state machine") } MachineError::Underlying(e) => e, } @@ -1468,13 +1540,13 @@ impl From> for WFMachinesError { impl From for WFMachinesError { fn from(_: TimestampError) -> Self { - Self::Fatal("Could not decode timestamp".to_string()) + Self::fatal("Could not decode timestamp") } } impl From for WFMachinesError { fn from(value: anyhow::Error) -> Self { - WFMachinesError::Fatal(value.to_string()) + WFMachinesError::fatal(value.to_string()) } } diff --git a/crates/sdk/Cargo.toml b/crates/sdk/Cargo.toml index 29a5d93e5..acab33f0d 100644 --- a/crates/sdk/Cargo.toml +++ b/crates/sdk/Cargo.toml @@ -42,5 +42,9 @@ version = "0.1" path = "../client" version = "0.1" +[features] +default = [] +antithesis_assertions = ["temporalio-sdk-core/antithesis_assertions"] + [lints] workspace = true