From cf5498197b8a2253c84bcd3398f68259e161f580 Mon Sep 17 00:00:00 2001 From: Shijie Sheng Date: Thu, 20 Nov 2025 14:45:27 -0800 Subject: [PATCH 1/5] feat(worker): return workflow completed result on decision processing Signed-off-by: Shijie Sheng --- cadence/_internal/workflow/context.py | 12 +- cadence/_internal/workflow/workflow_engine.py | 248 +++++++----------- .../_internal/workflow/workflow_intance.py | 41 ++- cadence/workflow.py | 4 +- .../test_workflow_engine_integration.py | 75 ------ 5 files changed, 142 insertions(+), 238 deletions(-) diff --git a/cadence/_internal/workflow/context.py b/cadence/_internal/workflow/context.py index d3c926f..43448fe 100644 --- a/cadence/_internal/workflow/context.py +++ b/cadence/_internal/workflow/context.py @@ -1,6 +1,7 @@ +from contextlib import contextmanager from datetime import timedelta from math import ceil -from typing import Optional, Any, Unpack, Type, cast +from typing import Iterator, Optional, Any, Unpack, Type, cast from cadence._internal.workflow.statemachine.decision_manager import DecisionManager from cadence._internal.workflow.decisions_helper import DecisionsHelper @@ -15,13 +16,12 @@ class Context(WorkflowContext): def __init__( self, info: WorkflowInfo, - decision_helper: DecisionsHelper, decision_manager: DecisionManager, ): self._info = info self._replay_mode = True self._replay_current_time_milliseconds: Optional[int] = None - self._decision_helper = decision_helper + self._decision_helper = DecisionsHelper() self._decision_manager = decision_manager def info(self) -> WorkflowInfo: @@ -110,6 +110,12 @@ def get_replay_current_time_milliseconds(self) -> Optional[int]: """Get the current replay time in milliseconds.""" return self._replay_current_time_milliseconds + @contextmanager + def _activate(self) -> Iterator["Context"]: + token = WorkflowContext._var.set(self) + yield self + WorkflowContext._var.reset(token) + def _round_to_nearest_second(delta: timedelta) -> timedelta: return timedelta(seconds=ceil(delta.total_seconds())) diff --git a/cadence/_internal/workflow/workflow_engine.py b/cadence/_internal/workflow/workflow_engine.py index e624613..3a16e56 100644 --- a/cadence/_internal/workflow/workflow_engine.py +++ b/cadence/_internal/workflow/workflow_engine.py @@ -1,15 +1,16 @@ -import asyncio import logging from dataclasses import dataclass -from typing import Any, Optional from cadence._internal.workflow.context import Context -from cadence._internal.workflow.decisions_helper import DecisionsHelper from cadence._internal.workflow.decision_events_iterator import DecisionEventsIterator -from cadence._internal.workflow.deterministic_event_loop import DeterministicEventLoop from cadence._internal.workflow.statemachine.decision_manager import DecisionManager from cadence._internal.workflow.workflow_intance import WorkflowInstance -from cadence.api.v1.decision_pb2 import Decision +from cadence.api.v1.common_pb2 import Payload +from cadence.api.v1.decision_pb2 import ( + CompleteWorkflowExecutionDecisionAttributes, + Decision, +) +from cadence.api.v1.history_pb2 import WorkflowExecutionStartedEventAttributes from cadence.api.v1.service_worker_pb2 import PollForDecisionTaskResponse from cadence.workflow import WorkflowDefinition, WorkflowInfo @@ -23,12 +24,13 @@ class DecisionResult: class WorkflowEngine: def __init__(self, info: WorkflowInfo, workflow_definition: WorkflowDefinition): - self._workflow_instance = WorkflowInstance(workflow_definition) - self._decision_manager = DecisionManager() - self._decisions_helper = DecisionsHelper() - self._context = Context(info, self._decisions_helper, self._decision_manager) - self._loop = DeterministicEventLoop() - self._task: Optional[asyncio.Task] = None + self._workflow_instance = WorkflowInstance( + workflow_definition, info.data_converter + ) + self._decision_manager = ( + DecisionManager() + ) # TODO: remove this stateful object and use the context instead + self._context = Context(info, self._decision_manager) def process_decision( self, decision_task: PollForDecisionTaskResponse @@ -46,54 +48,58 @@ def process_decision( DecisionResult containing the list of decisions """ try: - # Log decision task processing start with full context (matches Java ReplayDecisionTaskHandler) - logger.info( - "Processing decision task for workflow", - extra={ - "workflow_type": self._context.info().workflow_type, - "workflow_id": self._context.info().workflow_id, - "run_id": self._context.info().workflow_run_id, - "started_event_id": decision_task.started_event_id, - "attempt": decision_task.attempt, - }, - ) - # Activate workflow context for the entire decision processing - with self._context._activate(): + with self._context._activate() as ctx: + # Log decision task processing start with full context (matches Java ReplayDecisionTaskHandler) + logger.info( + "Processing decision task for workflow", + extra={ + "workflow_type": ctx.info().workflow_type, + "workflow_id": ctx.info().workflow_id, + "run_id": ctx.info().workflow_run_id, + "started_event_id": decision_task.started_event_id, + "attempt": decision_task.attempt, + }, + ) + # Create DecisionEventsIterator for structured event processing events_iterator = DecisionEventsIterator( - decision_task, self._context.info().workflow_events + decision_task, ctx.info().workflow_events ) # Process decision events using iterator-driven approach - self._process_decision_events(events_iterator, decision_task) + self._process_decision_events(ctx, events_iterator, decision_task) # Collect all pending decisions from state machines decisions = self._decision_manager.collect_pending_decisions() - # Log decision task completion with metrics (matches Java ReplayDecisionTaskHandler) - logger.debug( - "Decision task completed", - extra={ - "workflow_type": self._context.info().workflow_type, - "workflow_id": self._context.info().workflow_id, - "run_id": self._context.info().workflow_run_id, - "started_event_id": decision_task.started_event_id, - "decisions_count": len(decisions), - "replay_mode": self._context.is_replay_mode(), - }, - ) + # complete workflow if it is done + try: + if self._workflow_instance.is_done(): + result = self._workflow_instance.get_result() + decisions.append( + Decision( + complete_workflow_execution_decision_attributes=CompleteWorkflowExecutionDecisionAttributes( + result=result + ) + ) + ) + return DecisionResult(decisions=decisions) - return DecisionResult(decisions=decisions) + except Exception: + # TODO: handle CancellationError + # TODO: handle WorkflowError + # TODO: handle unknown error, fail decision task and try again instead of breaking the engine + raise except Exception as e: # Log decision task failure with full context (matches Java ReplayDecisionTaskHandler) logger.error( "Decision task processing failed", extra={ - "workflow_type": self._context.info().workflow_type, - "workflow_id": self._context.info().workflow_id, - "run_id": self._context.info().workflow_run_id, + "workflow_type": ctx.info().workflow_type, + "workflow_id": ctx.info().workflow_id, + "run_id": ctx.info().workflow_run_id, "started_event_id": decision_task.started_event_id, "attempt": decision_task.attempt, "error_type": type(e).__name__, @@ -104,10 +110,11 @@ def process_decision( raise def is_done(self) -> bool: - return self._task is not None and self._task.done() + return self._workflow_instance.is_done() def _process_decision_events( self, + ctx: Context, events_iterator: DecisionEventsIterator, decision_task: PollForDecisionTaskResponse, ) -> None: @@ -131,7 +138,7 @@ def _process_decision_events( logger.debug( "Processing decision events batch", extra={ - "workflow_id": self._context.info().workflow_id, + "workflow_id": ctx.info().workflow_id, "events_count": len(decision_events.get_events()), "markers_count": len(decision_events.get_markers()), "replay_mode": decision_events.is_replay(), @@ -140,109 +147,55 @@ def _process_decision_events( ) # Update context with replay information - self._context.set_replay_mode(decision_events.is_replay()) + ctx.set_replay_mode(decision_events.is_replay()) if decision_events.replay_current_time_milliseconds: - self._context.set_replay_current_time_milliseconds( + ctx.set_replay_current_time_milliseconds( decision_events.replay_current_time_milliseconds ) - # Phase 1: Process markers first for deterministic replay - for marker_event in decision_events.get_markers(): - try: - logger.debug( - "Processing marker event", - extra={ - "workflow_id": self._context.info().workflow_id, - "marker_name": getattr( - marker_event, "marker_name", "unknown" - ), - "event_id": getattr(marker_event, "event_id", None), - "replay_mode": self._context.is_replay_mode(), - }, - ) - # Process through state machines (DecisionsHelper now delegates to DecisionManager) - self._decision_manager.handle_history_event(marker_event) - except Exception as e: - # Warning for unexpected markers (matches Java ClockDecisionContext) - logger.warning( - "Unexpected marker event encountered", - extra={ - "workflow_id": self._context.info().workflow_id, - "marker_name": getattr( - marker_event, "marker_name", "unknown" - ), - "event_id": getattr(marker_event, "event_id", None), - "error_type": type(e).__name__, - }, - exc_info=True, - ) - - # Phase 2: Process regular events to update workflow state - for event in decision_events.get_events(): - try: - logger.debug( - "Processing history event", - extra={ - "workflow_id": self._context.info().workflow_id, - "event_type": getattr(event, "event_type", "unknown"), - "event_id": getattr(event, "event_id", None), - "replay_mode": self._context.is_replay_mode(), - }, - ) - # Process through state machines (DecisionsHelper now delegates to DecisionManager) - self._decision_manager.handle_history_event(event) - except Exception as e: - logger.warning( - "Error processing history event", - extra={ - "workflow_id": self._context.info().workflow_id, - "event_type": getattr(event, "event_type", "unknown"), - "event_id": getattr(event, "event_id", None), - "error_type": type(e).__name__, - }, - exc_info=True, - ) - - # Phase 3: Execute workflow logic - self._execute_workflow_once(decision_task) - - def _execute_workflow_once( - self, decision_task: PollForDecisionTaskResponse - ) -> None: - """ - Execute the workflow function to generate new decisions. + # Phase 1: Process markers first + for marker_event in decision_events.markers: + logger.debug( + "Processing marker event", + extra={ + "workflow_id": ctx.info().workflow_id, + "marker_name": getattr(marker_event, "marker_name", "unknown"), + "event_id": getattr(marker_event, "event_id", None), + "replay_mode": ctx.is_replay_mode(), + }, + ) + # Process through state machines (DecisionsHelper now delegates to DecisionManager) + self._decision_manager.handle_history_event(marker_event) - This blocks until the workflow schedules an activity or completes. + # Phase 2: Process regular input events + for event in decision_events.input: + logger.debug( + "Processing history event", + extra={ + "workflow_id": ctx.info().workflow_id, + "event_type": getattr(event, "event_type", "unknown"), + "event_id": getattr(event, "event_id", None), + "replay_mode": ctx.is_replay_mode(), + }, + ) + # Process through state machines (DecisionsHelper now delegates to DecisionManager) + self._decision_manager.handle_history_event(event) - Args: - decision_task: The decision task containing workflow context - """ - try: - # Extract workflow input from history - if self._task is None: - workflow_input = self._extract_workflow_input(decision_task) - self._task = self._loop.create_task( - self._workflow_instance.run(workflow_input) + # Phase 3: Execute workflow logic + if not self._workflow_instance.is_started(): + self._workflow_instance.start( + self._extract_workflow_input(decision_task) ) - self._loop.run_until_yield() + self._workflow_instance.run_once() - except Exception as e: - logger.error( - "Error executing workflow function", - extra={ - "workflow_type": self._context.info().workflow_type, - "workflow_id": self._context.info().workflow_id, - "run_id": self._context.info().workflow_run_id, - "error_type": type(e).__name__, - }, - exc_info=True, - ) - raise + # Phase 4: update state machine with output events + for event in decision_events.output: + self._decision_manager.handle_history_event(event) def _extract_workflow_input( self, decision_task: PollForDecisionTaskResponse - ) -> Any: + ) -> Payload: """ Extract workflow input from the decision task history. @@ -253,26 +206,15 @@ def _extract_workflow_input( The workflow input data, or None if not found """ if not decision_task.history or not hasattr(decision_task.history, "events"): - logger.warning("No history events found in decision task") - return None + raise ValueError("No history events found in decision task") # Look for WorkflowExecutionStarted event for event in decision_task.history.events: if hasattr(event, "workflow_execution_started_event_attributes"): - started_attrs = event.workflow_execution_started_event_attributes + started_attrs: WorkflowExecutionStartedEventAttributes = ( + event.workflow_execution_started_event_attributes + ) if started_attrs and hasattr(started_attrs, "input"): - # Deserialize the input using the client's data converter - try: - # Use from_data method with a single type hint of None (no type conversion) - input_data_list = self._context.data_converter().from_data( - started_attrs.input, [None] - ) - input_data = input_data_list[0] if input_data_list else None - logger.debug(f"Extracted workflow input: {input_data}") - return input_data - except Exception as e: - logger.warning(f"Failed to deserialize workflow input: {e}") - return None - - logger.warning("No WorkflowExecutionStarted event found in history") - return None + return started_attrs.input + + raise ValueError("No WorkflowExecutionStarted event found in history") diff --git a/cadence/_internal/workflow/workflow_intance.py b/cadence/_internal/workflow/workflow_intance.py index 833b499..5f3205d 100644 --- a/cadence/_internal/workflow/workflow_intance.py +++ b/cadence/_internal/workflow/workflow_intance.py @@ -1,11 +1,42 @@ +from asyncio import Task +from typing import Optional +from cadence._internal.workflow.deterministic_event_loop import DeterministicEventLoop +from cadence.api.v1.common_pb2 import Payload +from cadence.data_converter import DataConverter from cadence.workflow import WorkflowDefinition class WorkflowInstance: - def __init__(self, workflow_definition: WorkflowDefinition): + def __init__( + self, workflow_definition: WorkflowDefinition, data_converter: DataConverter + ): self._definition = workflow_definition - self._instance = workflow_definition.cls().__init__() + self._data_converter = data_converter + self._instance = ( + workflow_definition.cls().__init__() + ) # construct a new workflow object + self._loop = DeterministicEventLoop() + self._task: Optional[Task] = None - async def run(self, *args): - run_method = self._definition.get_run_method(self._instance) - return run_method(*args) + def start(self, input: Payload): + if self._task is None: + run_method = self._definition.get_run_method(self._instance) + workflow_input = self._data_converter.from_data(input, [None]) + self._task = self._loop.create_task(run_method(*workflow_input)) + + def is_started(self) -> bool: + return self._task is not None + + def run_once(self): + self._loop.run_until_yield() + + def is_done(self) -> bool: + return self._task is not None and self._task.done() + + # TODO: consider cache result to avoid multiple data conversions + def get_result(self) -> Payload: + if self._task is None: + raise RuntimeError("Workflow is not started yet") + result = self._task.result() + # TODO: handle result with multiple outputs + return self._data_converter.to_data([result]) diff --git a/cadence/workflow.py b/cadence/workflow.py index 68911e3..be42a2b 100644 --- a/cadence/workflow.py +++ b/cadence/workflow.py @@ -193,9 +193,9 @@ async def execute_activity( ) -> ResultType: ... @contextmanager - def _activate(self) -> Iterator[None]: + def _activate(self) -> Iterator["WorkflowContext"]: token = WorkflowContext._var.set(self) - yield None + yield self WorkflowContext._var.reset(token) @staticmethod diff --git a/tests/cadence/_internal/workflow/test_workflow_engine_integration.py b/tests/cadence/_internal/workflow/test_workflow_engine_integration.py index c1c585c..2c6bceb 100644 --- a/tests/cadence/_internal/workflow/test_workflow_engine_integration.py +++ b/tests/cadence/_internal/workflow/test_workflow_engine_integration.py @@ -140,81 +140,6 @@ def test_process_decision_error_handling( assert isinstance(result, DecisionResult) assert len(result.decisions) == 0 - @pytest.mark.asyncio - async def test_extract_workflow_input_success( - self, workflow_engine: "WorkflowEngine", mock_client, decision_task - ): - """Test successful workflow input extraction.""" - - # Extract workflow input - input_data = workflow_engine._extract_workflow_input(decision_task) - - # Verify the input was extracted - assert input_data == "test-input" - mock_client.data_converter.from_data.assert_called_once() - - @pytest.mark.asyncio - async def test_extract_workflow_input_no_history( - self, workflow_engine, mock_client - ): - """Test workflow input extraction with no history.""" - decision_task = PollForDecisionTaskResponse() - decision_task.task_token = b"test-task-token" - # No history set - - # Extract workflow input - input_data = workflow_engine._extract_workflow_input(decision_task) - - # Verify no input was extracted - assert input_data is None - - @pytest.mark.asyncio - async def test_extract_workflow_input_no_started_event( - self, workflow_engine, mock_client - ): - """Test workflow input extraction with no WorkflowExecutionStarted event.""" - # Create a decision task with no started event - decision_task = PollForDecisionTaskResponse() - decision_task.task_token = b"test-task-token" - - # Create workflow execution - workflow_execution = WorkflowExecution() - workflow_execution.workflow_id = "test-workflow" - workflow_execution.run_id = "test-run" - decision_task.workflow_execution.CopyFrom(workflow_execution) - - # Create workflow type - workflow_type_obj = WorkflowType() - workflow_type_obj.name = "test_workflow" - decision_task.workflow_type.CopyFrom(workflow_type_obj) - - # Create history with no events - history = History() - decision_task.history.CopyFrom(history) - - # Extract workflow input - input_data = workflow_engine._extract_workflow_input(decision_task) - - # Verify no input was extracted - assert input_data is None - - @pytest.mark.asyncio - async def test_extract_workflow_input_deserialization_error( - self, workflow_engine, mock_client, decision_task - ): - """Test workflow input extraction with deserialization error.""" - - # Mock data converter to raise an exception - mock_client.data_converter.from_data = AsyncMock( - side_effect=Exception("Deserialization error") - ) - - # Extract workflow input - input_data = workflow_engine._extract_workflow_input(decision_task) - - # Verify no input was extracted due to error - assert input_data is None - def test_process_decision_with_query_results( self, workflow_engine, mock_client, decision_task ): From 9d944468fbff7201c1d5609bd4568459cb2b649d Mon Sep 17 00:00:00 2001 From: Shijie Sheng Date: Fri, 21 Nov 2025 09:52:18 -0800 Subject: [PATCH 2/5] fix until WorkflowDefinition Signed-off-by: Shijie Sheng --- .../workflow/decision_events_iterator.py | 3 - cadence/_internal/workflow/workflow_engine.py | 47 ++--- .../_internal/workflow/workflow_intance.py | 6 +- cadence/worker/_decision_task_handler.py | 3 +- cadence/workflow.py | 17 +- .../workflow/test_decision_events_iterator.py | 89 +--------- .../workflow/test_workflow_engine.py | 64 +++++++ .../test_workflow_engine_integration.py | 161 ------------------ tests/cadence/_internal/workflow/utils.py | 60 +++++++ .../worker/test_decision_task_handler.py | 5 +- .../test_decision_task_handler_integration.py | 2 +- .../worker/test_task_handler_integration.py | 4 +- 12 files changed, 167 insertions(+), 294 deletions(-) create mode 100644 tests/cadence/_internal/workflow/test_workflow_engine.py delete mode 100644 tests/cadence/_internal/workflow/test_workflow_engine_integration.py create mode 100644 tests/cadence/_internal/workflow/utils.py diff --git a/cadence/_internal/workflow/decision_events_iterator.py b/cadence/_internal/workflow/decision_events_iterator.py index 1d6e0f2..a28c0b5 100644 --- a/cadence/_internal/workflow/decision_events_iterator.py +++ b/cadence/_internal/workflow/decision_events_iterator.py @@ -11,7 +11,6 @@ from cadence._internal.workflow.history_event_iterator import HistoryEventsIterator from cadence.api.v1.history_pb2 import HistoryEvent -from cadence.api.v1.service_worker_pb2 import PollForDecisionTaskResponse @dataclass @@ -44,10 +43,8 @@ class DecisionEventsIterator(Iterator[DecisionEvents]): def __init__( self, - decision_task: PollForDecisionTaskResponse, events: List[HistoryEvent], ): - self._decision_task = decision_task self._events: HistoryEventsIterator = HistoryEventsIterator(events) self._next_decision_event_id: Optional[int] = None self._replay_current_time_milliseconds: Optional[int] = None diff --git a/cadence/_internal/workflow/workflow_engine.py b/cadence/_internal/workflow/workflow_engine.py index 3a16e56..ef9f0f0 100644 --- a/cadence/_internal/workflow/workflow_engine.py +++ b/cadence/_internal/workflow/workflow_engine.py @@ -1,5 +1,6 @@ import logging from dataclasses import dataclass +from typing import List from cadence._internal.workflow.context import Context from cadence._internal.workflow.decision_events_iterator import DecisionEventsIterator @@ -10,7 +11,10 @@ CompleteWorkflowExecutionDecisionAttributes, Decision, ) -from cadence.api.v1.history_pb2 import WorkflowExecutionStartedEventAttributes +from cadence.api.v1.history_pb2 import ( + HistoryEvent, + WorkflowExecutionStartedEventAttributes, +) from cadence.api.v1.service_worker_pb2 import PollForDecisionTaskResponse from cadence.workflow import WorkflowDefinition, WorkflowInfo @@ -33,7 +37,8 @@ def __init__(self, info: WorkflowInfo, workflow_definition: WorkflowDefinition): self._context = Context(info, self._decision_manager) def process_decision( - self, decision_task: PollForDecisionTaskResponse + self, + events: List[HistoryEvent], ) -> DecisionResult: """ Process a decision task and generate decisions using DecisionEventsIterator. @@ -57,18 +62,14 @@ def process_decision( "workflow_type": ctx.info().workflow_type, "workflow_id": ctx.info().workflow_id, "run_id": ctx.info().workflow_run_id, - "started_event_id": decision_task.started_event_id, - "attempt": decision_task.attempt, }, ) # Create DecisionEventsIterator for structured event processing - events_iterator = DecisionEventsIterator( - decision_task, ctx.info().workflow_events - ) + events_iterator = DecisionEventsIterator(events) # Process decision events using iterator-driven approach - self._process_decision_events(ctx, events_iterator, decision_task) + self._process_decision_events(ctx, events_iterator) # Collect all pending decisions from state machines decisions = self._decision_manager.collect_pending_decisions() @@ -100,8 +101,6 @@ def process_decision( "workflow_type": ctx.info().workflow_type, "workflow_id": ctx.info().workflow_id, "run_id": ctx.info().workflow_run_id, - "started_event_id": decision_task.started_event_id, - "attempt": decision_task.attempt, "error_type": type(e).__name__, }, exc_info=True, @@ -116,7 +115,6 @@ def _process_decision_events( self, ctx: Context, events_iterator: DecisionEventsIterator, - decision_task: PollForDecisionTaskResponse, ) -> None: """ Process decision events using the iterator-driven approach similar to Java client. @@ -139,15 +137,14 @@ def _process_decision_events( "Processing decision events batch", extra={ "workflow_id": ctx.info().workflow_id, - "events_count": len(decision_events.get_events()), - "markers_count": len(decision_events.get_markers()), - "replay_mode": decision_events.is_replay(), + "markers_count": len(decision_events.markers), + "replay_mode": decision_events.replay, "replay_time": decision_events.replay_current_time_milliseconds, }, ) # Update context with replay information - ctx.set_replay_mode(decision_events.is_replay()) + ctx.set_replay_mode(decision_events.replay) if decision_events.replay_current_time_milliseconds: ctx.set_replay_current_time_milliseconds( decision_events.replay_current_time_milliseconds @@ -161,7 +158,7 @@ def _process_decision_events( "workflow_id": ctx.info().workflow_id, "marker_name": getattr(marker_event, "marker_name", "unknown"), "event_id": getattr(marker_event, "event_id", None), - "replay_mode": ctx.is_replay_mode(), + "replay_mode": decision_events.replay, }, ) # Process through state machines (DecisionsHelper now delegates to DecisionManager) @@ -175,18 +172,24 @@ def _process_decision_events( "workflow_id": ctx.info().workflow_id, "event_type": getattr(event, "event_type", "unknown"), "event_id": getattr(event, "event_id", None), - "replay_mode": ctx.is_replay_mode(), + "replay_mode": decision_events.replay, }, ) + # start workflow on workflow started event + if ( + event.WhichOneof("attributes") + == "workflow_execution_started_event_attributes" + ): + started_attrs: WorkflowExecutionStartedEventAttributes = ( + event.workflow_execution_started_event_attributes + ) + if started_attrs and hasattr(started_attrs, "input"): + self._workflow_instance.start(started_attrs.input) + # Process through state machines (DecisionsHelper now delegates to DecisionManager) self._decision_manager.handle_history_event(event) # Phase 3: Execute workflow logic - if not self._workflow_instance.is_started(): - self._workflow_instance.start( - self._extract_workflow_input(decision_task) - ) - self._workflow_instance.run_once() # Phase 4: update state machine with output events diff --git a/cadence/_internal/workflow/workflow_intance.py b/cadence/_internal/workflow/workflow_intance.py index 5f3205d..719dd15 100644 --- a/cadence/_internal/workflow/workflow_intance.py +++ b/cadence/_internal/workflow/workflow_intance.py @@ -12,16 +12,14 @@ def __init__( ): self._definition = workflow_definition self._data_converter = data_converter - self._instance = ( - workflow_definition.cls().__init__() - ) # construct a new workflow object + self._instance = workflow_definition.cls() # construct a new workflow object self._loop = DeterministicEventLoop() self._task: Optional[Task] = None def start(self, input: Payload): if self._task is None: run_method = self._definition.get_run_method(self._instance) - workflow_input = self._data_converter.from_data(input, [None]) + workflow_input = self._data_converter.from_data(input, []) self._task = self._loop.create_task(run_method(*workflow_input)) def is_started(self) -> bool: diff --git a/cadence/worker/_decision_task_handler.py b/cadence/worker/_decision_task_handler.py index 3103787..e40bb13 100644 --- a/cadence/worker/_decision_task_handler.py +++ b/cadence/worker/_decision_task_handler.py @@ -121,7 +121,6 @@ async def _handle_task_implementation( workflow_run_id=run_id, workflow_task_list=self.task_list, data_converter=self._client.data_converter, - workflow_events=workflow_events, ) # Use thread-safe cache to get or create workflow engine @@ -136,7 +135,7 @@ async def _handle_task_implementation( self._workflow_engines[cache_key] = workflow_engine decision_result = await asyncio.get_running_loop().run_in_executor( - self._executor, workflow_engine.process_decision, task + self._executor, workflow_engine.process_decision, workflow_events ) # Clean up completed workflows from cache to prevent memory leaks diff --git a/cadence/workflow.py b/cadence/workflow.py index be42a2b..a8b257f 100644 --- a/cadence/workflow.py +++ b/cadence/workflow.py @@ -5,7 +5,6 @@ from datetime import timedelta from typing import ( Callable, - List, cast, Optional, Union, @@ -15,10 +14,10 @@ Type, Unpack, Any, + Generic, ) import inspect -from cadence.api.v1.history_pb2 import HistoryEvent from cadence.data_converter import DataConverter ResultType = TypeVar("ResultType") @@ -44,6 +43,7 @@ async def execute_activity( T = TypeVar("T", bound=Callable[..., Any]) +C = TypeVar("C") class WorkflowDefinitionOptions(TypedDict, total=False): @@ -52,7 +52,7 @@ class WorkflowDefinitionOptions(TypedDict, total=False): name: str -class WorkflowDefinition: +class WorkflowDefinition(Generic[C]): """ Definition of a workflow class with metadata. @@ -60,8 +60,8 @@ class WorkflowDefinition: Provides type safety and metadata for workflow classes. """ - def __init__(self, cls: Type, name: str, run_method_name: str): - self._cls = cls + def __init__(self, cls: Type[C], name: str, run_method_name: str): + self._cls: Type[C] = cls self._name = name self._run_method_name = run_method_name @@ -71,7 +71,7 @@ def name(self) -> str: return self._name @property - def cls(self) -> Type: + def cls(self) -> Type[C]: """Get the workflow class.""" return self._cls @@ -151,7 +151,7 @@ def decorator(f: T) -> T: raise ValueError(f"Workflow run method '{f.__name__}' must be async") # Attach metadata to the function - f._workflow_run = True # type: ignore + setattr(f, "_workflow_run", None) return f # Support both @workflow.run and @workflow.run() @@ -163,14 +163,13 @@ def decorator(f: T) -> T: return decorator(func) -@dataclass +@dataclass(frozen=True) class WorkflowInfo: workflow_type: str workflow_domain: str workflow_id: str workflow_run_id: str workflow_task_list: str - workflow_events: List[HistoryEvent] data_converter: DataConverter diff --git a/tests/cadence/_internal/workflow/test_decision_events_iterator.py b/tests/cadence/_internal/workflow/test_decision_events_iterator.py index 2268421..16591f5 100644 --- a/tests/cadence/_internal/workflow/test_decision_events_iterator.py +++ b/tests/cadence/_internal/workflow/test_decision_events_iterator.py @@ -4,15 +4,12 @@ """ import pytest -from typing import List -from cadence.api.v1.history_pb2 import HistoryEvent, History -from cadence.api.v1.service_worker_pb2 import PollForDecisionTaskResponse -from cadence.api.v1.common_pb2 import WorkflowExecution from cadence._internal.workflow.decision_events_iterator import ( DecisionEventsIterator, ) +from tests.cadence._internal.workflow.utils import create_mock_history_event class TestDecisionEventsIterator: @@ -95,8 +92,7 @@ class TestDecisionEventsIterator: ) def test_successful_cases(self, name, event_types, expected): events = create_mock_history_event(event_types) - decision_task = create_mock_decision_task(events) - iterator = DecisionEventsIterator(decision_task, events) + iterator = DecisionEventsIterator(events) batches = [decision_events for decision_events in iterator] assert len(expected) == len(batches) @@ -108,84 +104,3 @@ def test_successful_cases(self, name, event_types, expected): assert batch.replay == expect["replay"] assert batch.replay_current_time_milliseconds == expect["replay_time"] assert batch.next_decision_event_id == expect["next_decision_event_id"] - - -def create_mock_history_event(event_types: List[str]) -> List[HistoryEvent]: - events = [] - for i, event_type in enumerate(event_types): - event = HistoryEvent() - event.event_id = i + 1 - event.event_time.FromMilliseconds((i + 1) * 1000) - - # Set the appropriate attribute based on event type - if event_type == "decision_task_started": - event.decision_task_started_event_attributes.SetInParent() - elif event_type == "decision_task_completed": - event.decision_task_completed_event_attributes.SetInParent() - elif event_type == "decision_task_failed": - event.decision_task_failed_event_attributes.SetInParent() - elif event_type == "decision_task_timed_out": - event.decision_task_timed_out_event_attributes.SetInParent() - elif event_type == "marker_recorded": - event.marker_recorded_event_attributes.SetInParent() - elif event_type == "activity_scheduled": - event.activity_task_scheduled_event_attributes.SetInParent() - elif event_type == "activity_started": - event.activity_task_started_event_attributes.SetInParent() - elif event_type == "activity_completed": - event.activity_task_completed_event_attributes.SetInParent() - elif event_type == "activity_failed": - event.activity_task_failed_event_attributes.SetInParent() - elif event_type == "activity_timed_out": - event.activity_task_timed_out_event_attributes.SetInParent() - elif event_type == "activity_cancel_requested": - event.activity_task_cancel_requested_event_attributes.SetInParent() - elif event_type == "request_cancel_activity_task_failed": - event.request_cancel_activity_task_failed_event_attributes.SetInParent() - elif event_type == "activity_canceled": - event.activity_task_canceled_event_attributes.SetInParent() - elif event_type == "timer_started": - event.timer_started_event_attributes.SetInParent() - elif event_type == "timer_fired": - event.timer_fired_event_attributes.SetInParent() - elif event_type == "timer_canceled": - event.timer_canceled_event_attributes.SetInParent() - elif event_type == "cancel_timer_failed": - event.cancel_timer_failed_event_attributes.SetInParent() - elif event_type == "request_cancel_external_workflow_execution_initiated": - event.request_cancel_external_workflow_execution_initiated_event_attributes.SetInParent() - elif event_type == "request_cancel_external_workflow_execution_failed": - event.request_cancel_external_workflow_execution_failed_event_attributes.SetInParent() - elif event_type == "external_workflow_execution_cancel_requested": - event.external_workflow_execution_cancel_requested_event_attributes.SetInParent() - elif event_type == "workflow_execution_started": - event.workflow_execution_started_event_attributes.SetInParent() - elif event_type == "workflow_execution_completed": - event.workflow_execution_completed_event_attributes.SetInParent() - - events.append(event) - - return events - - -def create_mock_decision_task( - events: List[HistoryEvent], next_page_token: bytes = None -) -> PollForDecisionTaskResponse: - """Create a mock decision task for testing.""" - task = PollForDecisionTaskResponse() - - # Mock history - history = History() - history.events.extend(events) - task.history.CopyFrom(history) - - # Mock workflow execution - workflow_execution = WorkflowExecution() - workflow_execution.workflow_id = "test-workflow" - workflow_execution.run_id = "test-run" - task.workflow_execution.CopyFrom(workflow_execution) - - if next_page_token: - task.next_page_token = next_page_token - - return task diff --git a/tests/cadence/_internal/workflow/test_workflow_engine.py b/tests/cadence/_internal/workflow/test_workflow_engine.py new file mode 100644 index 0000000..c1e1f43 --- /dev/null +++ b/tests/cadence/_internal/workflow/test_workflow_engine.py @@ -0,0 +1,64 @@ +#!/usr/bin/env python3 +from typing import List +import pytest +from cadence.api.v1.history_pb2 import ( + HistoryEvent, +) +from cadence._internal.workflow.workflow_engine import WorkflowEngine +from cadence import workflow +from cadence.data_converter import DefaultDataConverter +from cadence.workflow import WorkflowInfo, WorkflowDefinition, WorkflowDefinitionOptions +from tests.cadence._internal.workflow.utils import create_mock_history_event + + +class TestWorkflow: + @workflow.run + async def echo(self, input_data): + return f"echo: {input_data}" + + +class TestWorkflowEngine: + """Unit tests for WorkflowEngine.""" + + @pytest.fixture + def echo_workflow_definition(self) -> WorkflowDefinition: + """Create a mock workflow definition.""" + workflow_opts = WorkflowDefinitionOptions(name="test_workflow") + return WorkflowDefinition.wrap(TestWorkflow, workflow_opts) + + @pytest.fixture + def simple_workflow_events(self) -> List[HistoryEvent]: + return create_mock_history_event( + event_types=[ + "workflow_execution_started", + "decision_task_scheduled", + "decision_task_started", + "decision_task_completed", + "workflow_execution_completed", + ] + ) + + def test_process_simple_workflow( + self, + echo_workflow_definition: WorkflowDefinition, + simple_workflow_events: List[HistoryEvent], + ): + workflow_engine = create_workflow_engine(echo_workflow_definition) + decision_result = workflow_engine.process_decision(simple_workflow_events[:3]) + + assert len(decision_result.decisions) == 1 + + +def create_workflow_engine(workflow_definition: WorkflowDefinition) -> WorkflowEngine: + """Create workflow engine.""" + return WorkflowEngine( + info=WorkflowInfo( + workflow_type="test_workflow", + workflow_domain="test-domain", + workflow_id="test-workflow-id", + workflow_run_id="test-run-id", + workflow_task_list="test-task-list", + data_converter=DefaultDataConverter(), + ), + workflow_definition=workflow_definition, + ) diff --git a/tests/cadence/_internal/workflow/test_workflow_engine_integration.py b/tests/cadence/_internal/workflow/test_workflow_engine_integration.py deleted file mode 100644 index 2c6bceb..0000000 --- a/tests/cadence/_internal/workflow/test_workflow_engine_integration.py +++ /dev/null @@ -1,161 +0,0 @@ -#!/usr/bin/env python3 -""" -Integration tests for WorkflowEngine. -""" - -import pytest -from unittest.mock import Mock, AsyncMock, patch -from cadence.api.v1.service_worker_pb2 import PollForDecisionTaskResponse -from cadence.api.v1.common_pb2 import Payload, WorkflowExecution, WorkflowType -from cadence.api.v1.history_pb2 import ( - History, - HistoryEvent, - WorkflowExecutionStartedEventAttributes, -) -from cadence._internal.workflow.workflow_engine import WorkflowEngine, DecisionResult -from cadence import workflow -from cadence.workflow import WorkflowInfo, WorkflowDefinition, WorkflowDefinitionOptions -from cadence.client import Client - - -class TestWorkflowEngineIntegration: - """Integration tests for WorkflowEngine.""" - - @pytest.fixture - def mock_client(self): - """Create a mock Cadence client.""" - client = Mock(spec=Client) - client.domain = "test-domain" - client.data_converter = Mock() - client.data_converter.from_data = Mock(return_value=["test-input"]) - return client - - @pytest.fixture - def workflow_info(self, mock_client, decision_task): - """Create workflow info.""" - return WorkflowInfo( - workflow_type="test_workflow", - workflow_domain="test-domain", - workflow_id="test-workflow-id", - workflow_run_id="test-run-id", - workflow_task_list="test-task-list", - workflow_events=decision_task.history.events, - data_converter=mock_client.data_converter, - ) - - @pytest.fixture - def decision_task(self): - return self.create_mock_decision_task() - - @pytest.fixture - def mock_workflow_definition(self): - """Create a mock workflow definition.""" - - class TestWorkflow: - @workflow.run - async def weird_name(self, input_data): - return f"processed: {input_data}" - - workflow_opts = WorkflowDefinitionOptions(name="test_workflow") - return WorkflowDefinition.wrap(TestWorkflow, workflow_opts) - - @pytest.fixture - def workflow_engine(self, workflow_info, mock_workflow_definition): - """Create a WorkflowEngine instance.""" - return WorkflowEngine( - info=workflow_info, - workflow_definition=mock_workflow_definition, - ) - - def create_mock_decision_task( - self, - workflow_id="test-workflow", - run_id="test-run", - workflow_type="test_workflow", - ): - """Create a mock decision task with history.""" - # Create workflow execution - workflow_execution = WorkflowExecution() - workflow_execution.workflow_id = workflow_id - workflow_execution.run_id = run_id - - # Create workflow type - workflow_type_obj = WorkflowType() - workflow_type_obj.name = workflow_type - - # Create workflow execution started event - started_event = WorkflowExecutionStartedEventAttributes() - input_payload = Payload(data=b'"test-input"') - started_event.input.CopyFrom(input_payload) - - history_event = HistoryEvent() - history_event.workflow_execution_started_event_attributes.CopyFrom( - started_event - ) - - # Create history - history = History() - history.events.append(history_event) - - # Create decision task - decision_task = PollForDecisionTaskResponse() - decision_task.task_token = b"test-task-token" - decision_task.workflow_execution.CopyFrom(workflow_execution) - decision_task.workflow_type.CopyFrom(workflow_type_obj) - decision_task.history.CopyFrom(history) - - return decision_task - - def test_process_decision_success(self, workflow_engine, decision_task): - """Test successful decision processing.""" - - # Mock the decision manager to return some decisions - with patch.object( - workflow_engine._decision_manager, - "collect_pending_decisions", - return_value=[Mock()], - ): - # Process the decision - result = workflow_engine.process_decision(decision_task) - - # Verify the result - assert isinstance(result, DecisionResult) - assert len(result.decisions) == 1 - - def test_process_decision_error_handling( - self, workflow_engine, mock_client, decision_task - ): - """Test decision processing error handling.""" - - # Mock the decision manager to raise an exception - with patch.object( - workflow_engine._decision_manager, - "handle_history_event", - side_effect=Exception("Test error"), - ): - # Process the decision - result = workflow_engine.process_decision(decision_task) - - # Verify error handling - should return empty decisions - assert isinstance(result, DecisionResult) - assert len(result.decisions) == 0 - - def test_process_decision_with_query_results( - self, workflow_engine, mock_client, decision_task - ): - """Test decision processing with query results.""" - - # Mock the decision manager to return decisions with query results - mock_decisions = [Mock()] - - with patch.object( - workflow_engine._decision_manager, - "collect_pending_decisions", - return_value=mock_decisions, - ): - # Process the decision - result = workflow_engine.process_decision(decision_task) - - # Verify the result - assert isinstance(result, DecisionResult) - assert len(result.decisions) == 1 diff --git a/tests/cadence/_internal/workflow/utils.py b/tests/cadence/_internal/workflow/utils.py new file mode 100644 index 0000000..9ea9d3a --- /dev/null +++ b/tests/cadence/_internal/workflow/utils.py @@ -0,0 +1,60 @@ +from typing import List +from cadence.api.v1.history_pb2 import HistoryEvent + + +def create_mock_history_event(event_types: List[str]) -> List[HistoryEvent]: + events = [] + for i, event_type in enumerate(event_types): + event = HistoryEvent() + event.event_id = i + 1 + event.event_time.FromMilliseconds((i + 1) * 1000) + + # Set the appropriate attribute based on event type + if event_type == "decision_task_started": + event.decision_task_started_event_attributes.SetInParent() + elif event_type == "decision_task_completed": + event.decision_task_completed_event_attributes.SetInParent() + elif event_type == "decision_task_failed": + event.decision_task_failed_event_attributes.SetInParent() + elif event_type == "decision_task_timed_out": + event.decision_task_timed_out_event_attributes.SetInParent() + elif event_type == "marker_recorded": + event.marker_recorded_event_attributes.SetInParent() + elif event_type == "activity_scheduled": + event.activity_task_scheduled_event_attributes.SetInParent() + elif event_type == "activity_started": + event.activity_task_started_event_attributes.SetInParent() + elif event_type == "activity_completed": + event.activity_task_completed_event_attributes.SetInParent() + elif event_type == "activity_failed": + event.activity_task_failed_event_attributes.SetInParent() + elif event_type == "activity_timed_out": + event.activity_task_timed_out_event_attributes.SetInParent() + elif event_type == "activity_cancel_requested": + event.activity_task_cancel_requested_event_attributes.SetInParent() + elif event_type == "request_cancel_activity_task_failed": + event.request_cancel_activity_task_failed_event_attributes.SetInParent() + elif event_type == "activity_canceled": + event.activity_task_canceled_event_attributes.SetInParent() + elif event_type == "timer_started": + event.timer_started_event_attributes.SetInParent() + elif event_type == "timer_fired": + event.timer_fired_event_attributes.SetInParent() + elif event_type == "timer_canceled": + event.timer_canceled_event_attributes.SetInParent() + elif event_type == "cancel_timer_failed": + event.cancel_timer_failed_event_attributes.SetInParent() + elif event_type == "request_cancel_external_workflow_execution_initiated": + event.request_cancel_external_workflow_execution_initiated_event_attributes.SetInParent() + elif event_type == "request_cancel_external_workflow_execution_failed": + event.request_cancel_external_workflow_execution_failed_event_attributes.SetInParent() + elif event_type == "external_workflow_execution_cancel_requested": + event.external_workflow_execution_cancel_requested_event_attributes.SetInParent() + elif event_type == "workflow_execution_started": + event.workflow_execution_started_event_attributes.SetInParent() + elif event_type == "workflow_execution_completed": + event.workflow_execution_completed_event_attributes.SetInParent() + + events.append(event) + + return events diff --git a/tests/cadence/worker/test_decision_task_handler.py b/tests/cadence/worker/test_decision_task_handler.py index 12ae705..6301695 100644 --- a/tests/cadence/worker/test_decision_task_handler.py +++ b/tests/cadence/worker/test_decision_task_handler.py @@ -86,7 +86,7 @@ def test_initialization(self, mock_client, mock_registry): @pytest.mark.asyncio async def test_handle_task_implementation_success( - self, handler: DecisionTaskHandler, sample_decision_task, mock_registry + self, handler: DecisionTaskHandler, sample_decision_task: PollForDecisionTaskResponse, mock_registry ): """Test successful decision task handling.""" @@ -117,7 +117,7 @@ async def run(self): mock_registry.get_workflow.assert_called_once_with("TestWorkflow") # Verify workflow engine was created and used - mock_engine.process_decision.assert_called_once_with(sample_decision_task) + mock_engine.process_decision.assert_called_once_with(sample_decision_task.history.events) # Verify response was sent handler._client.worker_stub.RespondDecisionTaskCompleted.assert_called_once() @@ -448,7 +448,6 @@ async def run(self): "workflow_run_id": "test_run_id", "workflow_task_list": "test_task_list", "data_converter": handler._client.data_converter, - "workflow_events": [], } # Verify WorkflowEngine was created with correct parameters diff --git a/tests/cadence/worker/test_decision_task_handler_integration.py b/tests/cadence/worker/test_decision_task_handler_integration.py index 768c962..49358df 100644 --- a/tests/cadence/worker/test_decision_task_handler_integration.py +++ b/tests/cadence/worker/test_decision_task_handler_integration.py @@ -124,7 +124,7 @@ async def test_handle_decision_task_success( await decision_task_handler._handle_task_implementation(decision_task) # Verify the workflow engine was called - mock_engine.process_decision.assert_called_once_with(decision_task) + mock_engine.process_decision.assert_called_once_with(decision_task.history.events) # Verify the response was sent mock_client.worker_stub.RespondDecisionTaskCompleted.assert_called_once() diff --git a/tests/cadence/worker/test_task_handler_integration.py b/tests/cadence/worker/test_task_handler_integration.py index e7e72d1..c28f5c4 100644 --- a/tests/cadence/worker/test_task_handler_integration.py +++ b/tests/cadence/worker/test_task_handler_integration.py @@ -95,7 +95,7 @@ async def run(self): # Verify the complete flow mock_registry.get_workflow.assert_called_once_with("TestWorkflow") - mock_engine.process_decision.assert_called_once_with(sample_decision_task) + mock_engine.process_decision.assert_called_once_with(sample_decision_task.history.events) handler._client.worker_stub.RespondDecisionTaskCompleted.assert_called_once() @pytest.mark.asyncio @@ -277,7 +277,7 @@ async def run(self): # Verify engine was created and used mock_engine_class.assert_called_once() - mock_engine.process_decision.assert_called_once_with(sample_decision_task) + mock_engine.process_decision.assert_called_once_with(sample_decision_task.history.events) @pytest.mark.asyncio async def test_error_handling_with_context_cleanup( From c8d58bc5bfc86ffc300fc65c540223c12e4f9cde Mon Sep 17 00:00:00 2001 From: Shijie Sheng Date: Fri, 21 Nov 2025 12:26:37 -0800 Subject: [PATCH 3/5] fix some workflow instance bugs Signed-off-by: Shijie Sheng --- .../_internal/workflow/workflow_intance.py | 5 +- .../workflow/test_workflow_engine.py | 52 +++++++++++++++---- .../worker/test_decision_task_handler.py | 9 +++- .../test_decision_task_handler_integration.py | 4 +- .../worker/test_task_handler_integration.py | 8 ++- 5 files changed, 60 insertions(+), 18 deletions(-) diff --git a/cadence/_internal/workflow/workflow_intance.py b/cadence/_internal/workflow/workflow_intance.py index 719dd15..ef7887a 100644 --- a/cadence/_internal/workflow/workflow_intance.py +++ b/cadence/_internal/workflow/workflow_intance.py @@ -1,5 +1,5 @@ from asyncio import Task -from typing import Optional +from typing import Any, Optional from cadence._internal.workflow.deterministic_event_loop import DeterministicEventLoop from cadence.api.v1.common_pb2 import Payload from cadence.data_converter import DataConverter @@ -19,7 +19,8 @@ def __init__( def start(self, input: Payload): if self._task is None: run_method = self._definition.get_run_method(self._instance) - workflow_input = self._data_converter.from_data(input, []) + # TODO handle multiple inputs + workflow_input = self._data_converter.from_data(input, [Any]) self._task = self._loop.create_task(run_method(*workflow_input)) def is_started(self) -> bool: diff --git a/tests/cadence/_internal/workflow/test_workflow_engine.py b/tests/cadence/_internal/workflow/test_workflow_engine.py index c1e1f43..64d1229 100644 --- a/tests/cadence/_internal/workflow/test_workflow_engine.py +++ b/tests/cadence/_internal/workflow/test_workflow_engine.py @@ -1,14 +1,19 @@ #!/usr/bin/env python3 from typing import List import pytest +from cadence.api.v1.common_pb2 import Payload from cadence.api.v1.history_pb2 import ( + DecisionTaskCompletedEventAttributes, + DecisionTaskScheduledEventAttributes, + DecisionTaskStartedEventAttributes, HistoryEvent, + WorkflowExecutionCompletedEventAttributes, + WorkflowExecutionStartedEventAttributes, ) from cadence._internal.workflow.workflow_engine import WorkflowEngine from cadence import workflow from cadence.data_converter import DefaultDataConverter from cadence.workflow import WorkflowInfo, WorkflowDefinition, WorkflowDefinitionOptions -from tests.cadence._internal.workflow.utils import create_mock_history_event class TestWorkflow: @@ -28,15 +33,36 @@ def echo_workflow_definition(self) -> WorkflowDefinition: @pytest.fixture def simple_workflow_events(self) -> List[HistoryEvent]: - return create_mock_history_event( - event_types=[ - "workflow_execution_started", - "decision_task_scheduled", - "decision_task_started", - "decision_task_completed", - "workflow_execution_completed", - ] - ) + return [ + HistoryEvent( + event_id=1, + workflow_execution_started_event_attributes=WorkflowExecutionStartedEventAttributes( + input=Payload(data=b'"test-input"') + ), + ), + HistoryEvent( + event_id=2, + decision_task_scheduled_event_attributes=DecisionTaskScheduledEventAttributes(), + ), + HistoryEvent( + event_id=3, + decision_task_started_event_attributes=DecisionTaskStartedEventAttributes( + scheduled_event_id=2 + ), + ), + HistoryEvent( + event_id=4, + decision_task_completed_event_attributes=DecisionTaskCompletedEventAttributes( + scheduled_event_id=2, + ), + ), + HistoryEvent( + event_id=5, + workflow_execution_completed_event_attributes=WorkflowExecutionCompletedEventAttributes( + result=Payload(data=b'"echo: test-input"') + ), + ), + ] def test_process_simple_workflow( self, @@ -45,8 +71,12 @@ def test_process_simple_workflow( ): workflow_engine = create_workflow_engine(echo_workflow_definition) decision_result = workflow_engine.process_decision(simple_workflow_events[:3]) - assert len(decision_result.decisions) == 1 + assert decision_result.decisions[ + 0 + ].complete_workflow_execution_decision_attributes.result == Payload( + data=b'"echo: test-input"' + ) def create_workflow_engine(workflow_definition: WorkflowDefinition) -> WorkflowEngine: diff --git a/tests/cadence/worker/test_decision_task_handler.py b/tests/cadence/worker/test_decision_task_handler.py index 6301695..45e6541 100644 --- a/tests/cadence/worker/test_decision_task_handler.py +++ b/tests/cadence/worker/test_decision_task_handler.py @@ -86,7 +86,10 @@ def test_initialization(self, mock_client, mock_registry): @pytest.mark.asyncio async def test_handle_task_implementation_success( - self, handler: DecisionTaskHandler, sample_decision_task: PollForDecisionTaskResponse, mock_registry + self, + handler: DecisionTaskHandler, + sample_decision_task: PollForDecisionTaskResponse, + mock_registry, ): """Test successful decision task handling.""" @@ -117,7 +120,9 @@ async def run(self): mock_registry.get_workflow.assert_called_once_with("TestWorkflow") # Verify workflow engine was created and used - mock_engine.process_decision.assert_called_once_with(sample_decision_task.history.events) + mock_engine.process_decision.assert_called_once_with( + sample_decision_task.history.events + ) # Verify response was sent handler._client.worker_stub.RespondDecisionTaskCompleted.assert_called_once() diff --git a/tests/cadence/worker/test_decision_task_handler_integration.py b/tests/cadence/worker/test_decision_task_handler_integration.py index 49358df..71217b1 100644 --- a/tests/cadence/worker/test_decision_task_handler_integration.py +++ b/tests/cadence/worker/test_decision_task_handler_integration.py @@ -124,7 +124,9 @@ async def test_handle_decision_task_success( await decision_task_handler._handle_task_implementation(decision_task) # Verify the workflow engine was called - mock_engine.process_decision.assert_called_once_with(decision_task.history.events) + mock_engine.process_decision.assert_called_once_with( + decision_task.history.events + ) # Verify the response was sent mock_client.worker_stub.RespondDecisionTaskCompleted.assert_called_once() diff --git a/tests/cadence/worker/test_task_handler_integration.py b/tests/cadence/worker/test_task_handler_integration.py index c28f5c4..920f53b 100644 --- a/tests/cadence/worker/test_task_handler_integration.py +++ b/tests/cadence/worker/test_task_handler_integration.py @@ -95,7 +95,9 @@ async def run(self): # Verify the complete flow mock_registry.get_workflow.assert_called_once_with("TestWorkflow") - mock_engine.process_decision.assert_called_once_with(sample_decision_task.history.events) + mock_engine.process_decision.assert_called_once_with( + sample_decision_task.history.events + ) handler._client.worker_stub.RespondDecisionTaskCompleted.assert_called_once() @pytest.mark.asyncio @@ -277,7 +279,9 @@ async def run(self): # Verify engine was created and used mock_engine_class.assert_called_once() - mock_engine.process_decision.assert_called_once_with(sample_decision_task.history.events) + mock_engine.process_decision.assert_called_once_with( + sample_decision_task.history.events + ) @pytest.mark.asyncio async def test_error_handling_with_context_cleanup( From c338014fe1334b0bebe4f636ea103ddf2f0931c6 Mon Sep 17 00:00:00 2001 From: Shijie Sheng Date: Fri, 21 Nov 2025 12:54:39 -0800 Subject: [PATCH 4/5] revert unwanted changes Signed-off-by: Shijie Sheng --- .../workflow/test_decision_events_iterator.py | 61 ++++++++++++++++++- tests/cadence/_internal/workflow/utils.py | 60 ------------------ 2 files changed, 60 insertions(+), 61 deletions(-) delete mode 100644 tests/cadence/_internal/workflow/utils.py diff --git a/tests/cadence/_internal/workflow/test_decision_events_iterator.py b/tests/cadence/_internal/workflow/test_decision_events_iterator.py index 16591f5..36c93b9 100644 --- a/tests/cadence/_internal/workflow/test_decision_events_iterator.py +++ b/tests/cadence/_internal/workflow/test_decision_events_iterator.py @@ -3,13 +3,14 @@ Tests for Decision Events Iterator. """ +from typing import List import pytest from cadence._internal.workflow.decision_events_iterator import ( DecisionEventsIterator, ) -from tests.cadence._internal.workflow.utils import create_mock_history_event +from cadence.api.v1.history_pb2 import HistoryEvent class TestDecisionEventsIterator: @@ -104,3 +105,61 @@ def test_successful_cases(self, name, event_types, expected): assert batch.replay == expect["replay"] assert batch.replay_current_time_milliseconds == expect["replay_time"] assert batch.next_decision_event_id == expect["next_decision_event_id"] + + +def create_mock_history_event(event_types: List[str]) -> List[HistoryEvent]: + events = [] + for i, event_type in enumerate(event_types): + event = HistoryEvent() + event.event_id = i + 1 + event.event_time.FromMilliseconds((i + 1) * 1000) + + # Set the appropriate attribute based on event type + if event_type == "decision_task_started": + event.decision_task_started_event_attributes.SetInParent() + elif event_type == "decision_task_completed": + event.decision_task_completed_event_attributes.SetInParent() + elif event_type == "decision_task_failed": + event.decision_task_failed_event_attributes.SetInParent() + elif event_type == "decision_task_timed_out": + event.decision_task_timed_out_event_attributes.SetInParent() + elif event_type == "marker_recorded": + event.marker_recorded_event_attributes.SetInParent() + elif event_type == "activity_scheduled": + event.activity_task_scheduled_event_attributes.SetInParent() + elif event_type == "activity_started": + event.activity_task_started_event_attributes.SetInParent() + elif event_type == "activity_completed": + event.activity_task_completed_event_attributes.SetInParent() + elif event_type == "activity_failed": + event.activity_task_failed_event_attributes.SetInParent() + elif event_type == "activity_timed_out": + event.activity_task_timed_out_event_attributes.SetInParent() + elif event_type == "activity_cancel_requested": + event.activity_task_cancel_requested_event_attributes.SetInParent() + elif event_type == "request_cancel_activity_task_failed": + event.request_cancel_activity_task_failed_event_attributes.SetInParent() + elif event_type == "activity_canceled": + event.activity_task_canceled_event_attributes.SetInParent() + elif event_type == "timer_started": + event.timer_started_event_attributes.SetInParent() + elif event_type == "timer_fired": + event.timer_fired_event_attributes.SetInParent() + elif event_type == "timer_canceled": + event.timer_canceled_event_attributes.SetInParent() + elif event_type == "cancel_timer_failed": + event.cancel_timer_failed_event_attributes.SetInParent() + elif event_type == "request_cancel_external_workflow_execution_initiated": + event.request_cancel_external_workflow_execution_initiated_event_attributes.SetInParent() + elif event_type == "request_cancel_external_workflow_execution_failed": + event.request_cancel_external_workflow_execution_failed_event_attributes.SetInParent() + elif event_type == "external_workflow_execution_cancel_requested": + event.external_workflow_execution_cancel_requested_event_attributes.SetInParent() + elif event_type == "workflow_execution_started": + event.workflow_execution_started_event_attributes.SetInParent() + elif event_type == "workflow_execution_completed": + event.workflow_execution_completed_event_attributes.SetInParent() + + events.append(event) + + return events diff --git a/tests/cadence/_internal/workflow/utils.py b/tests/cadence/_internal/workflow/utils.py deleted file mode 100644 index 9ea9d3a..0000000 --- a/tests/cadence/_internal/workflow/utils.py +++ /dev/null @@ -1,60 +0,0 @@ -from typing import List -from cadence.api.v1.history_pb2 import HistoryEvent - - -def create_mock_history_event(event_types: List[str]) -> List[HistoryEvent]: - events = [] - for i, event_type in enumerate(event_types): - event = HistoryEvent() - event.event_id = i + 1 - event.event_time.FromMilliseconds((i + 1) * 1000) - - # Set the appropriate attribute based on event type - if event_type == "decision_task_started": - event.decision_task_started_event_attributes.SetInParent() - elif event_type == "decision_task_completed": - event.decision_task_completed_event_attributes.SetInParent() - elif event_type == "decision_task_failed": - event.decision_task_failed_event_attributes.SetInParent() - elif event_type == "decision_task_timed_out": - event.decision_task_timed_out_event_attributes.SetInParent() - elif event_type == "marker_recorded": - event.marker_recorded_event_attributes.SetInParent() - elif event_type == "activity_scheduled": - event.activity_task_scheduled_event_attributes.SetInParent() - elif event_type == "activity_started": - event.activity_task_started_event_attributes.SetInParent() - elif event_type == "activity_completed": - event.activity_task_completed_event_attributes.SetInParent() - elif event_type == "activity_failed": - event.activity_task_failed_event_attributes.SetInParent() - elif event_type == "activity_timed_out": - event.activity_task_timed_out_event_attributes.SetInParent() - elif event_type == "activity_cancel_requested": - event.activity_task_cancel_requested_event_attributes.SetInParent() - elif event_type == "request_cancel_activity_task_failed": - event.request_cancel_activity_task_failed_event_attributes.SetInParent() - elif event_type == "activity_canceled": - event.activity_task_canceled_event_attributes.SetInParent() - elif event_type == "timer_started": - event.timer_started_event_attributes.SetInParent() - elif event_type == "timer_fired": - event.timer_fired_event_attributes.SetInParent() - elif event_type == "timer_canceled": - event.timer_canceled_event_attributes.SetInParent() - elif event_type == "cancel_timer_failed": - event.cancel_timer_failed_event_attributes.SetInParent() - elif event_type == "request_cancel_external_workflow_execution_initiated": - event.request_cancel_external_workflow_execution_initiated_event_attributes.SetInParent() - elif event_type == "request_cancel_external_workflow_execution_failed": - event.request_cancel_external_workflow_execution_failed_event_attributes.SetInParent() - elif event_type == "external_workflow_execution_cancel_requested": - event.external_workflow_execution_cancel_requested_event_attributes.SetInParent() - elif event_type == "workflow_execution_started": - event.workflow_execution_started_event_attributes.SetInParent() - elif event_type == "workflow_execution_completed": - event.workflow_execution_completed_event_attributes.SetInParent() - - events.append(event) - - return events From 26407e0e6c47ed9d17b991c761df55f751c8ac2a Mon Sep 17 00:00:00 2001 From: Shijie Sheng Date: Fri, 21 Nov 2025 12:59:08 -0800 Subject: [PATCH 5/5] remove is_started method Signed-off-by: Shijie Sheng --- cadence/_internal/workflow/workflow_intance.py | 3 --- 1 file changed, 3 deletions(-) diff --git a/cadence/_internal/workflow/workflow_intance.py b/cadence/_internal/workflow/workflow_intance.py index ef7887a..ae94780 100644 --- a/cadence/_internal/workflow/workflow_intance.py +++ b/cadence/_internal/workflow/workflow_intance.py @@ -23,9 +23,6 @@ def start(self, input: Payload): workflow_input = self._data_converter.from_data(input, [Any]) self._task = self._loop.create_task(run_method(*workflow_input)) - def is_started(self) -> bool: - return self._task is not None - def run_once(self): self._loop.run_until_yield()