From d337838382df51ad541724feef1dd1cf31b6dc63 Mon Sep 17 00:00:00 2001 From: Quinn Klassen Date: Wed, 14 Jan 2026 13:34:12 -0800 Subject: [PATCH 1/6] Add support for more timeouts to Nexus operations Add schedule-to-start and start-to-close for Nexus operations --- temporalio/worker/_interceptor.py | 2 ++ temporalio/worker/_workflow_instance.py | 12 ++++++++ temporalio/workflow.py | 40 +++++++++++++++++++++++++ 3 files changed, 54 insertions(+) diff --git a/temporalio/worker/_interceptor.py b/temporalio/worker/_interceptor.py index d0d83da20..b3a0a12cc 100644 --- a/temporalio/worker/_interceptor.py +++ b/temporalio/worker/_interceptor.py @@ -303,6 +303,8 @@ class StartNexusOperationInput(Generic[InputT, OutputT]): operation: nexusrpc.Operation[InputT, OutputT] | str | Callable[..., Any] input: InputT schedule_to_close_timeout: timedelta | None + schedule_to_start_timeout: timedelta | None + start_to_close_timeout: timedelta | None cancellation_type: temporalio.workflow.NexusOperationCancellationType headers: Mapping[str, str] | None summary: str | None diff --git a/temporalio/worker/_workflow_instance.py b/temporalio/worker/_workflow_instance.py index 26169f257..3b1c3e878 100644 --- a/temporalio/worker/_workflow_instance.py +++ b/temporalio/worker/_workflow_instance.py @@ -1596,6 +1596,8 @@ async def workflow_start_nexus_operation( input: Any, output_type: type[OutputT] | None, schedule_to_close_timeout: timedelta | None, + schedule_to_start_timeout: timedelta | None, + start_to_close_timeout: timedelta | None, cancellation_type: temporalio.workflow.NexusOperationCancellationType, headers: Mapping[str, str] | None, summary: str | None, @@ -1609,6 +1611,8 @@ async def workflow_start_nexus_operation( input=input, output_type=output_type, schedule_to_close_timeout=schedule_to_close_timeout, + schedule_to_start_timeout=schedule_to_start_timeout, + start_to_close_timeout=start_to_close_timeout, cancellation_type=cancellation_type, headers=headers, summary=summary, @@ -3340,6 +3344,14 @@ def _apply_schedule_command(self) -> None: v.schedule_to_close_timeout.FromTimedelta( self._input.schedule_to_close_timeout ) + if self._input.schedule_to_start_timeout is not None: + v.schedule_to_start_timeout.FromTimedelta( + self._input.schedule_to_start_timeout + ) + if self._input.start_to_close_timeout is not None: + v.start_to_close_timeout.FromTimedelta( + self._input.start_to_close_timeout + ) v.cancellation_type = cast( temporalio.bridge.proto.nexus.NexusOperationCancellationType.ValueType, int(self._input.cancellation_type), diff --git a/temporalio/workflow.py b/temporalio/workflow.py index 441dac6be..ec0d0cac4 100644 --- a/temporalio/workflow.py +++ b/temporalio/workflow.py @@ -5418,6 +5418,8 @@ async def start_operation( *, output_type: type[OutputT] | None = None, schedule_to_close_timeout: timedelta | None = None, + schedule_to_start_timeout: timedelta | None = None, + start_to_close_timeout: timedelta | None = None, cancellation_type: NexusOperationCancellationType = NexusOperationCancellationType.WAIT_COMPLETED, headers: Mapping[str, str] | None = None, summary: str | None = None, @@ -5433,6 +5435,8 @@ async def start_operation( *, output_type: type[OutputT] | None = None, schedule_to_close_timeout: timedelta | None = None, + schedule_to_start_timeout: timedelta | None = None, + start_to_close_timeout: timedelta | None = None, cancellation_type: NexusOperationCancellationType = NexusOperationCancellationType.WAIT_COMPLETED, headers: Mapping[str, str] | None = None, summary: str | None = None, @@ -5451,6 +5455,8 @@ async def start_operation( *, output_type: type[OutputT] | None = None, schedule_to_close_timeout: timedelta | None = None, + schedule_to_start_timeout: timedelta | None = None, + start_to_close_timeout: timedelta | None = None, cancellation_type: NexusOperationCancellationType = NexusOperationCancellationType.WAIT_COMPLETED, headers: Mapping[str, str] | None = None, summary: str | None = None, @@ -5469,6 +5475,8 @@ async def start_operation( *, output_type: type[OutputT] | None = None, schedule_to_close_timeout: timedelta | None = None, + schedule_to_start_timeout: timedelta | None = None, + start_to_close_timeout: timedelta | None = None, cancellation_type: NexusOperationCancellationType = NexusOperationCancellationType.WAIT_COMPLETED, headers: Mapping[str, str] | None = None, summary: str | None = None, @@ -5487,6 +5495,8 @@ async def start_operation( *, output_type: type[OutputT] | None = None, schedule_to_close_timeout: timedelta | None = None, + schedule_to_start_timeout: timedelta | None = None, + start_to_close_timeout: timedelta | None = None, cancellation_type: NexusOperationCancellationType = NexusOperationCancellationType.WAIT_COMPLETED, headers: Mapping[str, str] | None = None, summary: str | None = None, @@ -5504,6 +5514,8 @@ async def start_operation( *, output_type: type[OutputT] | None = None, schedule_to_close_timeout: timedelta | None = None, + schedule_to_start_timeout: timedelta | None = None, + start_to_close_timeout: timedelta | None = None, cancellation_type: NexusOperationCancellationType = NexusOperationCancellationType.WAIT_COMPLETED, headers: Mapping[str, str] | None = None, summary: str | None = None, @@ -5517,6 +5529,8 @@ async def start_operation( *, output_type: type[OutputT] | None = None, schedule_to_close_timeout: timedelta | None = None, + schedule_to_start_timeout: timedelta | None = None, + start_to_close_timeout: timedelta | None = None, cancellation_type: NexusOperationCancellationType = NexusOperationCancellationType.WAIT_COMPLETED, headers: Mapping[str, str] | None = None, summary: str | None = None, @@ -5528,6 +5542,8 @@ async def start_operation( input: The Nexus operation input. output_type: The Nexus operation output type. schedule_to_close_timeout: Timeout for the entire operation attempt. + schedule_to_start_timeout: Timeout for the operation to be started. + start_to_close_timeout: Timeout for async operations to complete after starting. headers: Headers to send with the Nexus HTTP request. Returns: @@ -5548,6 +5564,8 @@ async def execute_operation( *, output_type: type[OutputT] | None = None, schedule_to_close_timeout: timedelta | None = None, + schedule_to_start_timeout: timedelta | None = None, + start_to_close_timeout: timedelta | None = None, cancellation_type: NexusOperationCancellationType = NexusOperationCancellationType.WAIT_COMPLETED, headers: Mapping[str, str] | None = None, summary: str | None = None, @@ -5563,6 +5581,8 @@ async def execute_operation( *, output_type: type[OutputT] | None = None, schedule_to_close_timeout: timedelta | None = None, + schedule_to_start_timeout: timedelta | None = None, + start_to_close_timeout: timedelta | None = None, cancellation_type: NexusOperationCancellationType = NexusOperationCancellationType.WAIT_COMPLETED, headers: Mapping[str, str] | None = None, summary: str | None = None, @@ -5581,6 +5601,8 @@ async def execute_operation( *, output_type: type[OutputT] | None = None, schedule_to_close_timeout: timedelta | None = None, + schedule_to_start_timeout: timedelta | None = None, + start_to_close_timeout: timedelta | None = None, cancellation_type: NexusOperationCancellationType = NexusOperationCancellationType.WAIT_COMPLETED, headers: Mapping[str, str] | None = None, summary: str | None = None, @@ -5599,6 +5621,8 @@ async def execute_operation( *, output_type: type[OutputT] | None = None, schedule_to_close_timeout: timedelta | None = None, + schedule_to_start_timeout: timedelta | None = None, + start_to_close_timeout: timedelta | None = None, cancellation_type: NexusOperationCancellationType = NexusOperationCancellationType.WAIT_COMPLETED, headers: Mapping[str, str] | None = None, summary: str | None = None, @@ -5617,6 +5641,8 @@ async def execute_operation( *, output_type: type[OutputT] | None = None, schedule_to_close_timeout: timedelta | None = None, + schedule_to_start_timeout: timedelta | None = None, + start_to_close_timeout: timedelta | None = None, cancellation_type: NexusOperationCancellationType = NexusOperationCancellationType.WAIT_COMPLETED, headers: Mapping[str, str] | None = None, summary: str | None = None, @@ -5635,6 +5661,8 @@ async def execute_operation( *, output_type: type[OutputT] | None = None, schedule_to_close_timeout: timedelta | None = None, + schedule_to_start_timeout: timedelta | None = None, + start_to_close_timeout: timedelta | None = None, cancellation_type: NexusOperationCancellationType = NexusOperationCancellationType.WAIT_COMPLETED, headers: Mapping[str, str] | None = None, summary: str | None = None, @@ -5648,6 +5676,8 @@ async def execute_operation( *, output_type: type[OutputT] | None = None, schedule_to_close_timeout: timedelta | None = None, + schedule_to_start_timeout: timedelta | None = None, + start_to_close_timeout: timedelta | None = None, cancellation_type: NexusOperationCancellationType = NexusOperationCancellationType.WAIT_COMPLETED, headers: Mapping[str, str] | None = None, summary: str | None = None, @@ -5659,6 +5689,8 @@ async def execute_operation( input: The Nexus operation input. output_type: The Nexus operation output type. schedule_to_close_timeout: Timeout for the entire operation attempt. + schedule_to_start_timeout: Timeout for the operation to be started. + start_to_close_timeout: Timeout for async operations to complete after starting. headers: Headers to send with the Nexus HTTP request. Returns: @@ -5701,6 +5733,8 @@ async def start_operation( *, output_type: type[OutputT] | None = None, schedule_to_close_timeout: timedelta | None = None, + schedule_to_start_timeout: timedelta | None = None, + start_to_close_timeout: timedelta | None = None, cancellation_type: NexusOperationCancellationType = NexusOperationCancellationType.WAIT_COMPLETED, headers: Mapping[str, str] | None = None, summary: str | None = None, @@ -5713,6 +5747,8 @@ async def start_operation( input=input, output_type=output_type, schedule_to_close_timeout=schedule_to_close_timeout, + schedule_to_start_timeout=schedule_to_start_timeout, + start_to_close_timeout=start_to_close_timeout, cancellation_type=cancellation_type, headers=headers, summary=summary, @@ -5726,6 +5762,8 @@ async def execute_operation( *, output_type: type[OutputT] | None = None, schedule_to_close_timeout: timedelta | None = None, + schedule_to_start_timeout: timedelta | None = None, + start_to_close_timeout: timedelta | None = None, cancellation_type: NexusOperationCancellationType = NexusOperationCancellationType.WAIT_COMPLETED, headers: Mapping[str, str] | None = None, summary: str | None = None, @@ -5735,6 +5773,8 @@ async def execute_operation( input, output_type=output_type, schedule_to_close_timeout=schedule_to_close_timeout, + schedule_to_start_timeout=schedule_to_start_timeout, + start_to_close_timeout=start_to_close_timeout, cancellation_type=cancellation_type, headers=headers, summary=summary, From 2100a072b3fc4eb4a3b0b4e4d08a87f0d68d7381 Mon Sep 17 00:00:00 2001 From: Quinn Klassen Date: Wed, 14 Jan 2026 13:41:03 -0800 Subject: [PATCH 2/6] fix format --- temporalio/worker/_workflow_instance.py | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/temporalio/worker/_workflow_instance.py b/temporalio/worker/_workflow_instance.py index 3b1c3e878..9c22c05ce 100644 --- a/temporalio/worker/_workflow_instance.py +++ b/temporalio/worker/_workflow_instance.py @@ -3349,9 +3349,7 @@ def _apply_schedule_command(self) -> None: self._input.schedule_to_start_timeout ) if self._input.start_to_close_timeout is not None: - v.start_to_close_timeout.FromTimedelta( - self._input.start_to_close_timeout - ) + v.start_to_close_timeout.FromTimedelta(self._input.start_to_close_timeout) v.cancellation_type = cast( temporalio.bridge.proto.nexus.NexusOperationCancellationType.ValueType, int(self._input.cancellation_type), From 8d0939dd0e28da98ceee84ed0248413ce10f6984 Mon Sep 17 00:00:00 2001 From: Quinn Klassen Date: Wed, 14 Jan 2026 13:47:51 -0800 Subject: [PATCH 3/6] fic workflow ABC --- temporalio/workflow.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/temporalio/workflow.py b/temporalio/workflow.py index ec0d0cac4..8c7b47db7 100644 --- a/temporalio/workflow.py +++ b/temporalio/workflow.py @@ -856,6 +856,8 @@ async def workflow_start_nexus_operation( input: Any, output_type: type[OutputT] | None, schedule_to_close_timeout: timedelta | None, + schedule_to_start_timeout: timedelta | None, + start_to_close_timeout: timedelta | None, cancellation_type: temporalio.workflow.NexusOperationCancellationType, headers: Mapping[str, str] | None, summary: str | None, From c1e512ee47a377df530b34d6671fad68f2f7d1fe Mon Sep 17 00:00:00 2001 From: Quinn Klassen Date: Tue, 17 Feb 2026 13:21:35 -0800 Subject: [PATCH 4/6] Add test --- tests/nexus/test_workflow_caller_errors.py | 150 +++++++++++++++++++++ 1 file changed, 150 insertions(+) diff --git a/tests/nexus/test_workflow_caller_errors.py b/tests/nexus/test_workflow_caller_errors.py index 2cfc3a36d..b3519c6ad 100644 --- a/tests/nexus/test_workflow_caller_errors.py +++ b/tests/nexus/test_workflow_caller_errors.py @@ -30,6 +30,7 @@ ApplicationError, NexusOperationError, TimeoutError, + TimeoutType, ) from temporalio.testing import WorkflowEnvironment from temporalio.worker import Worker @@ -323,6 +324,155 @@ async def test_error_raised_by_timeout_of_nexus_start_operation( assert capturer.find_log("unexpected cancellation reason") is None +# Schedule to start timeout test +@service_handler +class ScheduleToStartTimeoutTestService: + @sync_operation + async def expect_schedule_to_start_timeout( + self, ctx: StartOperationContext, _input: None + ) -> None: + try: + await asyncio.wait_for(ctx.task_cancellation.wait_until_cancelled(), 1) + except asyncio.TimeoutError: + raise ApplicationError("expected cancel", non_retryable=True) + + +@workflow.defn +class ScheduleToStartTimeoutTestCallerWorkflow: + @workflow.init + def __init__(self): + self.nexus_client = workflow.create_nexus_client( + service=ScheduleToStartTimeoutTestService, + endpoint=make_nexus_endpoint_name(workflow.info().task_queue), + ) + + @workflow.run + async def run(self) -> None: + await self.nexus_client.execute_operation( + ScheduleToStartTimeoutTestService.expect_schedule_to_start_timeout, + None, + output_type=None, + schedule_to_start_timeout=timedelta(seconds=0.1), + ) + + +async def test_error_raised_by_schedule_to_start_timeout_of_nexus_operation( + client: Client, env: WorkflowEnvironment +): + if env.supports_time_skipping: + pytest.skip("Nexus tests don't work with time-skipping server") + + task_queue = str(uuid.uuid4()) + async with Worker( + client, + nexus_service_handlers=[ScheduleToStartTimeoutTestService()], + workflows=[ScheduleToStartTimeoutTestCallerWorkflow], + task_queue=task_queue, + nexus_task_executor=concurrent.futures.ThreadPoolExecutor(), + ): + await create_nexus_endpoint(task_queue, client) + try: + await client.execute_workflow( + ScheduleToStartTimeoutTestCallerWorkflow.run, + id=str(uuid.uuid4()), + task_queue=task_queue, + ) + except Exception as err: + assert isinstance(err, WorkflowFailureError) + assert isinstance(err.__cause__, NexusOperationError) + assert isinstance(err.__cause__.__cause__, TimeoutError) + timeout_err = err.__cause__.__cause__ + assert timeout_err.type == TimeoutType.SCHEDULE_TO_START + else: + pytest.fail( + "Expected exception due to schedule to start timeout of nexus operation" + ) + + +# Start to close timeout test + + +class OperationThatExpectsStartToCloseTimeoutAsync(OperationHandler[None, None]): + async def start( + self, ctx: StartOperationContext, input: None + ) -> StartOperationResultAsync: + return StartOperationResultAsync("fake-token") + + async def cancel(self, ctx: CancelOperationContext, token: str) -> None: + pass + + +class OperationThatExpectsStartToCloseTimeoutSync(OperationHandler[None, None]): + async def start( + self, ctx: StartOperationContext, input: None + ) -> StartOperationResultAsync: + return StartOperationResultAsync("fake-token") + + async def cancel(self, ctx: CancelOperationContext, token: str) -> None: + pass + + +@service_handler +class StartToCloseTimeoutTestService: + @operation_handler + def expect_start_to_close_timeout(self) -> OperationHandler[None, None]: + return OperationThatExpectsStartToCloseTimeoutAsync() + + +@workflow.defn +class StartToCloseTimeoutTestCallerWorkflow: + @workflow.init + def __init__( + self, + ): + self.nexus_client = workflow.create_nexus_client( + service=StartToCloseTimeoutTestService, + endpoint=make_nexus_endpoint_name(workflow.info().task_queue), + ) + + @workflow.run + async def run(self) -> None: + op_handle = await self.nexus_client.start_operation( + StartToCloseTimeoutTestService.expect_start_to_close_timeout, + None, + start_to_close_timeout=timedelta(seconds=0.1), + ) + await op_handle + + +async def test_error_raised_by_start_to_close_timeout_of_nexus_operation( + client: Client, env: WorkflowEnvironment +): + if env.supports_time_skipping: + pytest.skip("Nexus tests don't work with time-skipping server") + + task_queue = str(uuid.uuid4()) + async with Worker( + client, + nexus_service_handlers=[StartToCloseTimeoutTestService()], + workflows=[StartToCloseTimeoutTestCallerWorkflow], + task_queue=task_queue, + nexus_task_executor=concurrent.futures.ThreadPoolExecutor(), + ): + await create_nexus_endpoint(task_queue, client) + try: + await client.execute_workflow( + StartToCloseTimeoutTestCallerWorkflow.run, + id=str(uuid.uuid4()), + task_queue=task_queue, + ) + except Exception as err: + assert isinstance(err, WorkflowFailureError) + assert isinstance(err.__cause__, NexusOperationError) + timeout_err = err.__cause__.__cause__ + assert isinstance(timeout_err, TimeoutError) + assert timeout_err.type == TimeoutType.START_TO_CLOSE + else: + pytest.fail( + "Expected exception due to start to close timeout of nexus operation" + ) + + # Cancellation timeout test From c7b0ff0859700421ebf4f4f654119acac3c8ca2d Mon Sep 17 00:00:00 2001 From: Quinn Klassen Date: Wed, 18 Feb 2026 09:23:20 -0800 Subject: [PATCH 5/6] Remove unused class --- tests/nexus/test_workflow_caller_errors.py | 10 ---------- 1 file changed, 10 deletions(-) diff --git a/tests/nexus/test_workflow_caller_errors.py b/tests/nexus/test_workflow_caller_errors.py index b3519c6ad..47e37cbab 100644 --- a/tests/nexus/test_workflow_caller_errors.py +++ b/tests/nexus/test_workflow_caller_errors.py @@ -402,16 +402,6 @@ async def cancel(self, ctx: CancelOperationContext, token: str) -> None: pass -class OperationThatExpectsStartToCloseTimeoutSync(OperationHandler[None, None]): - async def start( - self, ctx: StartOperationContext, input: None - ) -> StartOperationResultAsync: - return StartOperationResultAsync("fake-token") - - async def cancel(self, ctx: CancelOperationContext, token: str) -> None: - pass - - @service_handler class StartToCloseTimeoutTestService: @operation_handler From a78070708e4a6f6dbba4b3326b0c4b5b1ea08ebd Mon Sep 17 00:00:00 2001 From: Quinn Klassen Date: Thu, 19 Feb 2026 12:35:26 -0800 Subject: [PATCH 6/6] Fix rebase --- tests/nexus/test_workflow_caller_errors.py | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/tests/nexus/test_workflow_caller_errors.py b/tests/nexus/test_workflow_caller_errors.py index 47e37cbab..17f39ad39 100644 --- a/tests/nexus/test_workflow_caller_errors.py +++ b/tests/nexus/test_workflow_caller_errors.py @@ -370,7 +370,9 @@ async def test_error_raised_by_schedule_to_start_timeout_of_nexus_operation( task_queue=task_queue, nexus_task_executor=concurrent.futures.ThreadPoolExecutor(), ): - await create_nexus_endpoint(task_queue, client) + await env.create_nexus_endpoint( + make_nexus_endpoint_name(task_queue), task_queue + ) try: await client.execute_workflow( ScheduleToStartTimeoutTestCallerWorkflow.run, @@ -444,7 +446,9 @@ async def test_error_raised_by_start_to_close_timeout_of_nexus_operation( task_queue=task_queue, nexus_task_executor=concurrent.futures.ThreadPoolExecutor(), ): - await create_nexus_endpoint(task_queue, client) + await env.create_nexus_endpoint( + make_nexus_endpoint_name(task_queue), task_queue + ) try: await client.execute_workflow( StartToCloseTimeoutTestCallerWorkflow.run,