From dfb9f73afa6941281a6732eb98dd5e791c56561c Mon Sep 17 00:00:00 2001 From: Bartek Wolowiec Date: Wed, 6 May 2026 12:45:18 +0000 Subject: [PATCH] High level architecture documentation. --- src/a2a/server/agent_execution/active_task.py | 34 +++++++++++++++++++ 1 file changed, 34 insertions(+) diff --git a/src/a2a/server/agent_execution/active_task.py b/src/a2a/server/agent_execution/active_task.py index 268dd1999..fe8fca38c 100644 --- a/src/a2a/server/agent_execution/active_task.py +++ b/src/a2a/server/agent_execution/active_task.py @@ -1,4 +1,38 @@ # ruff: noqa: TRY301, SLF001 +"""Active Task execution and data flow architecture. + +This module manages the lifecycle, execution, and data flow of an active A2A task. + +High-Level Architecture: +- `AgentExecutor`: An interface with two main methods, `execute(...)` and `cancel(...)`, + responsible for running the core agent logic. +- `ActiveTask`: Coordinates the execution. It runs a main agent loop in `ActiveTask._run_producer()`. + +Data Flow and Event Handling: +1. Producer (`ActiveTask._run_producer`): + - Listens for incoming requests from `ActiveTask._request_queue`. + - Acquires `ActiveTask._request_lock` to ensure requests are processed sequentially. + - Calls `AgentExecutor.execute()` passing `ActiveTask._event_queue_agent` as the primary communication channel. + - Enqueues internal lifecycle events (e.g., `_RequestStarted`, `_RequestCompleted`) and + exceptions to the same `ActiveTask._event_queue_agent`. + +2. Consumer (`EventConsumer`): + - Consumes events from `ActiveTask._event_queue_agent`. + - Processes all events, updating task state and the database (`TaskManager`). + - Upon receiving `_RequestCompleted`, it releases `ActiveTask._request_lock`, making the producer + ready to process the next queued request. + - Propagates relevant agent updates and state changes to `ActiveTask._event_queue_subscribers`. + - Exception Handling: Re-raises exceptions dequeued from the agent queue, or raises its own validation + errors (e.g., 'Received Message object in task mode.'). In case of failure, it updates the task + state to failed and propagates the exception to `ActiveTask._event_queue_subscribers`. + +3. Subscribers (`ActiveTask.subscribe`): + - `ActiveTask._event_queue_subscribers` is not consumed directly. Instead, it is consumed by tapped + queues created within the `ActiveTask.subscribe()` method. + - The `ActiveTask.subscribe()` method is also used to actively run requests, creating a temporary + subscription that yields events and automatically finishes once the request is completed. +""" + from __future__ import annotations import asyncio