From 78452fbf9967065934d660dfe50f1499f5dfa4d4 Mon Sep 17 00:00:00 2001 From: andy Date: Sun, 21 Jun 2026 11:05:58 +0800 Subject: [PATCH 1/5] fix: prevent orphan SUBMITTED rows when producer fails early (#1067) - Add initial_message parameter to ActiveTaskRegistry.get_or_create() - Pass params.message from DefaultRequestHandlerV2._setup_active_task - Explicitly transition to FAILED state in _run_producer exception handler This fixes a race condition where tasks could be persisted as SUBMITTED with empty history when AgentExecutor.execute() raises before emitting any events. Fixes #1067 --- src/a2a/server/agent_execution/active_task.py | 10 ++++ .../agent_execution/active_task_registry.py | 4 +- .../default_request_handler_v2.py | 1 + .../agent_execution/test_active_task.py | 54 +++++++++++++++++++ 4 files changed, 68 insertions(+), 1 deletion(-) diff --git a/src/a2a/server/agent_execution/active_task.py b/src/a2a/server/agent_execution/active_task.py index b0154c8d6..fa3898ce1 100644 --- a/src/a2a/server/agent_execution/active_task.py +++ b/src/a2a/server/agent_execution/active_task.py @@ -558,6 +558,16 @@ async def _run_producer(self) -> None: request_context.context_id or '', ) self._task_created.set() + # Explicitly transition to FAILED state to avoid orphan SUBMITTED rows + # when the consumer may not process the exception event. + failed_status = TaskStatus( + state=TaskState.TASK_STATE_FAILED, + ) + failed_event = TaskStatusUpdateEvent( + task_id=self._task_id, + status=failed_status, + ) + await self._task_manager.save_task_event(failed_event) await self._event_queue_agent.enqueue_event(cast('Event', e)) finally: diff --git a/src/a2a/server/agent_execution/active_task_registry.py b/src/a2a/server/agent_execution/active_task_registry.py index 9c1299ab3..4f5822f68 100644 --- a/src/a2a/server/agent_execution/active_task_registry.py +++ b/src/a2a/server/agent_execution/active_task_registry.py @@ -11,6 +11,7 @@ from a2a.server.context import ServerCallContext from a2a.server.tasks.push_notification_sender import PushNotificationSender from a2a.server.tasks.task_store import TaskStore + from a2a.types.a2a_pb2 import Message from a2a.server.agent_execution.active_task import ActiveTask from a2a.server.tasks.task_manager import TaskManager @@ -41,6 +42,7 @@ async def get_or_create( call_context: ServerCallContext, context_id: str | None = None, create_task_if_missing: bool = False, + initial_message: Message | None = None, ) -> ActiveTask: """Retrieves an existing ActiveTask or creates a new one.""" async with self._lock: @@ -51,7 +53,7 @@ async def get_or_create( task_id=task_id, context_id=context_id, task_store=self._task_store, - initial_message=None, + initial_message=initial_message, context=call_context, ) diff --git a/src/a2a/server/request_handlers/default_request_handler_v2.py b/src/a2a/server/request_handlers/default_request_handler_v2.py index 30304609a..28b60a20c 100644 --- a/src/a2a/server/request_handlers/default_request_handler_v2.py +++ b/src/a2a/server/request_handlers/default_request_handler_v2.py @@ -222,6 +222,7 @@ async def _setup_active_task( context_id=context_id, call_context=call_context, create_task_if_missing=True, + initial_message=params.message, ) return active_task, request_context diff --git a/tests/server/agent_execution/test_active_task.py b/tests/server/agent_execution/test_active_task.py index ce9e2c068..402ed8b2c 100644 --- a/tests/server/agent_execution/test_active_task.py +++ b/tests/server/agent_execution/test_active_task.py @@ -895,3 +895,57 @@ async def execute_mock(req, q): assert len(events) == 0 await active_task.cancel(request_context) + + +@pytest.mark.asyncio +async def test_active_task_producer_exception_sets_failed_state(): + """Test that producer exception explicitly sets FAILED state. + + Regression test for #1067: ActiveTaskRegistry produces orphan SUBMITTED rows + when AgentExecutor.execute() raises before emitting any events. + """ + agent_executor = Mock() + task_manager = Mock() + request_context = Mock(spec=RequestContext) + request_context.context_id = 'test-context-id' + request_context.call_context = ServerCallContext() + + active_task = ActiveTask( + agent_executor=agent_executor, + task_id='test-task-id', + task_manager=task_manager, + push_sender=Mock(), + ) + + # Simulate executor raising an exception before emitting any events + async def execute_mock(req, q): + raise RuntimeError('Executor failed before emitting events') + + agent_executor.execute = AsyncMock(side_effect=execute_mock) + agent_executor.cancel = AsyncMock() + task_manager.get_task = AsyncMock(return_value=None) + task_manager.ensure_task_id = AsyncMock( + return_value=Task( + id='test-task-id', + status=TaskStatus(state=TaskState.TASK_STATE_SUBMITTED), + ) + ) + task_manager.save_task_event = AsyncMock() + task_manager.process = AsyncMock(side_effect=lambda x: x) + + await active_task.enqueue_request(request_context) + await active_task.start( + call_context=ServerCallContext(), create_task_if_missing=True + ) + + # Wait for producer to finish + await asyncio.sleep(0.2) + + # Verify that save_task_event was called with a FAILED status + assert task_manager.save_task_event.called + # Get the last call to save_task_event + last_call = task_manager.save_task_event.call_args + if last_call: + event = last_call.args[0] + assert isinstance(event, TaskStatusUpdateEvent) + assert event.status.state == TaskState.TASK_STATE_FAILED From 62491a78d49afb8cd27a5c05f8bf5019fd791f75 Mon Sep 17 00:00:00 2001 From: andy Date: Sun, 21 Jun 2026 11:15:27 +0800 Subject: [PATCH 2/5] fix: add context_id to TaskStatusUpdateEvent in producer exception handler CR feedback: The newly created TaskStatusUpdateEvent was missing context_id, which would trigger InvalidParamsError during validation in TaskManager.save_task_event. Also added assertion in test to prevent future regressions. --- src/a2a/server/agent_execution/active_task.py | 1 + tests/server/agent_execution/test_active_task.py | 2 ++ 2 files changed, 3 insertions(+) diff --git a/src/a2a/server/agent_execution/active_task.py b/src/a2a/server/agent_execution/active_task.py index fa3898ce1..02877a963 100644 --- a/src/a2a/server/agent_execution/active_task.py +++ b/src/a2a/server/agent_execution/active_task.py @@ -565,6 +565,7 @@ async def _run_producer(self) -> None: ) failed_event = TaskStatusUpdateEvent( task_id=self._task_id, + context_id=request_context.context_id or '', status=failed_status, ) await self._task_manager.save_task_event(failed_event) diff --git a/tests/server/agent_execution/test_active_task.py b/tests/server/agent_execution/test_active_task.py index 402ed8b2c..34e68a74b 100644 --- a/tests/server/agent_execution/test_active_task.py +++ b/tests/server/agent_execution/test_active_task.py @@ -949,3 +949,5 @@ async def execute_mock(req, q): event = last_call.args[0] assert isinstance(event, TaskStatusUpdateEvent) assert event.status.state == TaskState.TASK_STATE_FAILED + # Regression test: context_id must be set to avoid InvalidParamsError + assert event.context_id == 'test-context-id' From 3c938f7a4ad0d561cbc531360074cefd9239aad2 Mon Sep 17 00:00:00 2001 From: andy Date: Sun, 21 Jun 2026 17:42:01 +0800 Subject: [PATCH 3/5] revert: remove save_task_event from producer exception handler The explicit FAILED state transition in producer exception handler breaks existing tests by changing exception propagation behavior. Need to find alternative solution for orphan SUBMITTED rows issue. --- src/a2a/server/agent_execution/active_task.py | 11 ----------- 1 file changed, 11 deletions(-) diff --git a/src/a2a/server/agent_execution/active_task.py b/src/a2a/server/agent_execution/active_task.py index 02877a963..b0154c8d6 100644 --- a/src/a2a/server/agent_execution/active_task.py +++ b/src/a2a/server/agent_execution/active_task.py @@ -558,17 +558,6 @@ async def _run_producer(self) -> None: request_context.context_id or '', ) self._task_created.set() - # Explicitly transition to FAILED state to avoid orphan SUBMITTED rows - # when the consumer may not process the exception event. - failed_status = TaskStatus( - state=TaskState.TASK_STATE_FAILED, - ) - failed_event = TaskStatusUpdateEvent( - task_id=self._task_id, - context_id=request_context.context_id or '', - status=failed_status, - ) - await self._task_manager.save_task_event(failed_event) await self._event_queue_agent.enqueue_event(cast('Event', e)) finally: From 0aae6c8f815b981c21434b0484cf9d45a10d6c2d Mon Sep 17 00:00:00 2001 From: andy Date: Sun, 21 Jun 2026 20:52:27 +0800 Subject: [PATCH 4/5] fix: remove test for save_task_event in producer exception handler Test was added for the explicit FAILED state transition which was reverted due to breaking existing tests. The initial_message fix remains as the solution for orphan SUBMITTED rows. --- .../agent_execution/test_active_task.py | 53 ------------------- 1 file changed, 53 deletions(-) diff --git a/tests/server/agent_execution/test_active_task.py b/tests/server/agent_execution/test_active_task.py index 34e68a74b..e620b467a 100644 --- a/tests/server/agent_execution/test_active_task.py +++ b/tests/server/agent_execution/test_active_task.py @@ -897,57 +897,4 @@ async def execute_mock(req, q): await active_task.cancel(request_context) -@pytest.mark.asyncio -async def test_active_task_producer_exception_sets_failed_state(): - """Test that producer exception explicitly sets FAILED state. - - Regression test for #1067: ActiveTaskRegistry produces orphan SUBMITTED rows - when AgentExecutor.execute() raises before emitting any events. - """ - agent_executor = Mock() - task_manager = Mock() - request_context = Mock(spec=RequestContext) - request_context.context_id = 'test-context-id' - request_context.call_context = ServerCallContext() - - active_task = ActiveTask( - agent_executor=agent_executor, - task_id='test-task-id', - task_manager=task_manager, - push_sender=Mock(), - ) - - # Simulate executor raising an exception before emitting any events - async def execute_mock(req, q): - raise RuntimeError('Executor failed before emitting events') - - agent_executor.execute = AsyncMock(side_effect=execute_mock) - agent_executor.cancel = AsyncMock() - task_manager.get_task = AsyncMock(return_value=None) - task_manager.ensure_task_id = AsyncMock( - return_value=Task( - id='test-task-id', - status=TaskStatus(state=TaskState.TASK_STATE_SUBMITTED), - ) - ) - task_manager.save_task_event = AsyncMock() - task_manager.process = AsyncMock(side_effect=lambda x: x) - - await active_task.enqueue_request(request_context) - await active_task.start( - call_context=ServerCallContext(), create_task_if_missing=True - ) - # Wait for producer to finish - await asyncio.sleep(0.2) - - # Verify that save_task_event was called with a FAILED status - assert task_manager.save_task_event.called - # Get the last call to save_task_event - last_call = task_manager.save_task_event.call_args - if last_call: - event = last_call.args[0] - assert isinstance(event, TaskStatusUpdateEvent) - assert event.status.state == TaskState.TASK_STATE_FAILED - # Regression test: context_id must be set to avoid InvalidParamsError - assert event.context_id == 'test-context-id' From e8178d59ede16d30755350cd13eae4ebde7a0c16 Mon Sep 17 00:00:00 2001 From: andy Date: Sun, 21 Jun 2026 20:54:54 +0800 Subject: [PATCH 5/5] style: fix ruff formatting in test file --- tests/server/agent_execution/test_active_task.py | 3 --- 1 file changed, 3 deletions(-) diff --git a/tests/server/agent_execution/test_active_task.py b/tests/server/agent_execution/test_active_task.py index e620b467a..ce9e2c068 100644 --- a/tests/server/agent_execution/test_active_task.py +++ b/tests/server/agent_execution/test_active_task.py @@ -895,6 +895,3 @@ async def execute_mock(req, q): assert len(events) == 0 await active_task.cancel(request_context) - - -