diff --git a/.gitignore b/.gitignore index da436cd090..283f626207 100644 --- a/.gitignore +++ b/.gitignore @@ -242,3 +242,7 @@ python/dotnet-ref # Generated filtered solution files (created by eng/scripts/New-FilteredSolution.ps1) dotnet/filtered-*.slnx **/*.lscache + +# Local tool state +.omc/ +.omx/ diff --git a/python/packages/core/agent_framework/_workflows/_agent.py b/python/packages/core/agent_framework/_workflows/_agent.py index 2fd3f35213..111ef61e3b 100644 --- a/python/packages/core/agent_framework/_workflows/_agent.py +++ b/python/packages/core/agent_framework/_workflows/_agent.py @@ -528,6 +528,7 @@ def _convert_workflow_events_to_agent_response( raw_representations.append(output_event) else: data = output_event.data + if isinstance(data, AgentResponseUpdate): # We cannot support AgentResponseUpdate in non-streaming mode. This is because the message # sequence cannot be guaranteed when there are streaming updates in between non-streaming @@ -628,16 +629,23 @@ def _convert_workflow_event_to_agent_response_updates( A list of AgentResponseUpdate objects. Empty list if the event is not relevant. """ if event.type == "output": - # Convert workflow output to agent response updates. - # Handle different data types appropriately. data = event.data executor_id = event.executor_id if isinstance(data, AgentResponseUpdate): - # Pass through AgentResponseUpdate directly (streaming from AgentExecutor) - if not data.author_name: - data.author_name = executor_id - return [data] + # Construct a fresh AgentResponseUpdate so we don't mutate a payload + # that AgentExecutor still holds a reference to in its `updates` list. 
+ return [ + AgentResponseUpdate( + contents=list(data.contents), + role=data.role, + author_name=data.author_name or executor_id, + response_id=data.response_id, + message_id=data.message_id, + created_at=data.created_at, + raw_representation=data.raw_representation, + ) + ] if isinstance(data, AgentResponse): # Convert each message in AgentResponse to an AgentResponseUpdate updates: list[AgentResponseUpdate] = [] diff --git a/python/packages/core/agent_framework/_workflows/_agent_executor.py b/python/packages/core/agent_framework/_workflows/_agent_executor.py index 626a02199b..9b16b1f291 100644 --- a/python/packages/core/agent_framework/_workflows/_agent_executor.py +++ b/python/packages/core/agent_framework/_workflows/_agent_executor.py @@ -156,8 +156,9 @@ def __init__( the agent run. - "custom": use the provided context_filter function to determine which messages to include as context for the agent run. - context_filter: An optional function for filtering conversation context when context_mode is set - to "custom". + context_filter: A function that takes the full conversation (list of Messages) as input and returns + a filtered list of Messages to be used as context for the agent run. This is required + if context_mode is set to "custom". 
""" # Prefer provided id; else use agent.name if present; else generate deterministic prefix exec_id = id or resolve_agent_id(agent) diff --git a/python/packages/core/agent_framework/_workflows/_workflow_executor.py b/python/packages/core/agent_framework/_workflows/_workflow_executor.py index afb6145251..847b44863c 100644 --- a/python/packages/core/agent_framework/_workflows/_workflow_executor.py +++ b/python/packages/core/agent_framework/_workflows/_workflow_executor.py @@ -361,7 +361,7 @@ def can_handle(self, message: WorkflowMessage) -> bool: return any(is_instance_of(message.data, input_type) for input_type in self.workflow.input_types) @handler - async def process_workflow(self, input_data: object, ctx: WorkflowContext[Any]) -> None: + async def process_workflow(self, input_data: object, ctx: WorkflowContext[Any, Any]) -> None: """Execute the sub-workflow with raw input data. This handler starts a new sub-workflow execution. When the sub-workflow @@ -428,7 +428,7 @@ async def process_workflow(self, input_data: object, ctx: WorkflowContext[Any]) async def handle_message_wrapped_request_response( self, response: SubWorkflowResponseMessage, - ctx: WorkflowContext[Any], + ctx: WorkflowContext[Any, Any], ) -> None: """Handle response from parent for a forwarded request. 
diff --git a/python/packages/core/tests/workflow/test_workflow_kwargs.py b/python/packages/core/tests/workflow/test_workflow_kwargs.py index bba808f87a..9c664c6ac2 100644 --- a/python/packages/core/tests/workflow/test_workflow_kwargs.py +++ b/python/packages/core/tests/workflow/test_workflow_kwargs.py @@ -232,16 +232,18 @@ def simple_selector(state: GroupChatState) -> str: async def test_kwargs_stored_in_state() -> None: """Test that function_invocation_kwargs are stored in State with the correct key.""" - from agent_framework import Executor, WorkflowContext, handler + from typing_extensions import Never + + from agent_framework import AgentResponse, Executor, WorkflowContext, handler stored_kwargs: dict[str, Any] | None = None class _StateInspector(Executor): @handler - async def inspect(self, msgs: list[Message], ctx: WorkflowContext[list[Message]]) -> None: + async def inspect(self, msgs: list[Message], ctx: WorkflowContext[Never, AgentResponse]) -> None: nonlocal stored_kwargs stored_kwargs = ctx.get_state(WORKFLOW_RUN_KWARGS_KEY) - await ctx.send_message(msgs) + await ctx.yield_output(AgentResponse(messages=msgs)) inspector = _StateInspector(id="inspector") workflow = SequentialBuilder(participants=[inspector]).build() @@ -256,16 +258,18 @@ async def inspect(self, msgs: list[Message], ctx: WorkflowContext[list[Message]] async def test_empty_kwargs_stored_as_empty_dict() -> None: """Test that empty kwargs are stored as empty dict in State.""" - from agent_framework import Executor, WorkflowContext, handler + from typing_extensions import Never + + from agent_framework import AgentResponse, Executor, WorkflowContext, handler stored_kwargs: Any = "NOT_CHECKED" class _StateChecker(Executor): @handler - async def check(self, msgs: list[Message], ctx: WorkflowContext[list[Message]]) -> None: + async def check(self, msgs: list[Message], ctx: WorkflowContext[Never, AgentResponse]) -> None: nonlocal stored_kwargs stored_kwargs = ctx.get_state(WORKFLOW_RUN_KWARGS_KEY) 
- await ctx.send_message(msgs) + await ctx.yield_output(AgentResponse(messages=msgs)) checker = _StateChecker(id="checker") workflow = SequentialBuilder(participants=[checker]).build() @@ -695,7 +699,9 @@ async def test_subworkflow_kwargs_accessible_via_state() -> None: Verifies that WORKFLOW_RUN_KWARGS_KEY is populated in the subworkflow's State with kwargs from the parent workflow. """ - from agent_framework import Executor, WorkflowContext, handler + from typing_extensions import Never + + from agent_framework import AgentResponse, Executor, WorkflowContext, handler from agent_framework._workflows._workflow_executor import WorkflowExecutor captured_kwargs_from_state: list[dict[str, Any]] = [] @@ -704,10 +710,10 @@ class _StateReader(Executor): """Executor that reads kwargs from State for verification.""" @handler - async def read_kwargs(self, msgs: list[Message], ctx: WorkflowContext[list[Message]]) -> None: + async def read_kwargs(self, msgs: list[Message], ctx: WorkflowContext[Never, AgentResponse]) -> None: kwargs_from_state = ctx.get_state(WORKFLOW_RUN_KWARGS_KEY) captured_kwargs_from_state.append(kwargs_from_state or {}) - await ctx.send_message(msgs) + await ctx.yield_output(AgentResponse(messages=msgs)) # Build inner workflow with State reader state_reader = _StateReader(id="state_reader") diff --git a/python/packages/foundry/tests/foundry/test_foundry_embedding_client.py b/python/packages/foundry/tests/foundry/test_foundry_embedding_client.py index 664123637d..f005a737dc 100644 --- a/python/packages/foundry/tests/foundry/test_foundry_embedding_client.py +++ b/python/packages/foundry/tests/foundry/test_foundry_embedding_client.py @@ -303,6 +303,7 @@ def _foundry_integration_tests_enabled() -> bool: class TestFoundryEmbeddingIntegration: """Integration tests requiring a live Foundry inference endpoint.""" + @pytest.mark.skip(reason="Flaky in merge queue, blocking unrelated PRs. 
Tracked in #5553.") @pytest.mark.flaky @pytest.mark.integration @skip_if_foundry_inference_integration_tests_disabled diff --git a/python/packages/foundry_hosting/tests/test_responses_int.py b/python/packages/foundry_hosting/tests/test_responses_int.py index e64976989b..24c590f25c 100644 --- a/python/packages/foundry_hosting/tests/test_responses_int.py +++ b/python/packages/foundry_hosting/tests/test_responses_int.py @@ -559,6 +559,7 @@ async def test_tool_call_streaming(self, server_with_tools: ResponsesHostServer) class TestOptions: """Verify chat options are passed through to the model.""" + @pytest.mark.skip(reason="Flaky in merge queue, blocking unrelated PRs. Tracked in #5553.") @pytest.mark.flaky @pytest.mark.integration @skip_if_foundry_hosting_integration_tests_disabled diff --git a/python/packages/orchestrations/agent_framework_orchestrations/_base_group_chat_orchestrator.py b/python/packages/orchestrations/agent_framework_orchestrations/_base_group_chat_orchestrator.py index 86c85cc079..a4108b23f0 100644 --- a/python/packages/orchestrations/agent_framework_orchestrations/_base_group_chat_orchestrator.py +++ b/python/packages/orchestrations/agent_framework_orchestrations/_base_group_chat_orchestrator.py @@ -12,7 +12,7 @@ from dataclasses import dataclass from typing import Any, ClassVar, TypeAlias -from agent_framework._types import Message +from agent_framework._types import AgentResponse, AgentResponseUpdate, Message from agent_framework._workflows._agent_executor import AgentExecutor, AgentExecutorRequest, AgentExecutorResponse from agent_framework._workflows._events import WorkflowEvent from agent_framework._workflows._executor import Executor, handler @@ -351,8 +351,10 @@ async def _check_termination(self) -> bool: result = await result return result - async def _check_terminate_and_yield(self, ctx: WorkflowContext[Never, list[Message]]) -> bool: - """Check termination conditions and yield completion if met. 
+ async def _check_terminate_and_yield( + self, ctx: WorkflowContext[Never, AgentResponse | AgentResponseUpdate] + ) -> bool: + """Check termination conditions and yield the completion message if met. Args: ctx: Workflow context for yielding output @@ -362,12 +364,37 @@ async def _check_terminate_and_yield(self, ctx: WorkflowContext[Never, list[Mess """ terminate = await self._check_termination() if terminate: - self._append_messages([self._create_completion_message(self.TERMINATION_CONDITION_MET_MESSAGE)]) - await ctx.yield_output(self._full_conversation) + completion_message = self._create_completion_message(self.TERMINATION_CONDITION_MET_MESSAGE) + self._append_messages([completion_message]) + await self._yield_completion(ctx, completion_message) return True return False + async def _yield_completion( + self, + ctx: WorkflowContext[Never, AgentResponse | AgentResponseUpdate], + completion_message: Message, + ) -> None: + """Yield a synthesized terminal completion message in the right shape for the run mode. + + Mode-aware to mirror ``AgentExecutor`` semantics: + - Streaming (``ctx.is_streaming()``): yield a single ``AgentResponseUpdate`` so the + ``output`` event stream stays uniformly per-chunk. + - Non-streaming: yield the full ``AgentResponse``. + """ + if ctx.is_streaming(): + await ctx.yield_output( + AgentResponseUpdate( + contents=list(completion_message.contents), + role=completion_message.role, + author_name=completion_message.author_name, + message_id=completion_message.message_id, + ) + ) + else: + await ctx.yield_output(AgentResponse(messages=[completion_message])) + def _create_completion_message(self, message: str) -> Message: """Create a standardized completion message. @@ -490,8 +517,10 @@ def _check_round_limit(self) -> bool: return False - async def _check_round_limit_and_yield(self, ctx: WorkflowContext[Never, list[Message]]) -> bool: - """Check round limit and yield completion if reached. 
+ async def _check_round_limit_and_yield( + self, ctx: WorkflowContext[Never, AgentResponse | AgentResponseUpdate] + ) -> bool: + """Check round limit and yield the max-rounds completion message if reached. Args: ctx: Workflow context for yielding output @@ -501,8 +530,9 @@ async def _check_round_limit_and_yield(self, ctx: WorkflowContext[Never, list[Me """ reach_max_rounds = self._check_round_limit() if reach_max_rounds: - self._append_messages([self._create_completion_message(self.MAX_ROUNDS_MET_MESSAGE)]) - await ctx.yield_output(self._full_conversation) + completion_message = self._create_completion_message(self.MAX_ROUNDS_MET_MESSAGE) + self._append_messages([completion_message]) + await self._yield_completion(ctx, completion_message) return True return False diff --git a/python/packages/orchestrations/agent_framework_orchestrations/_concurrent.py b/python/packages/orchestrations/agent_framework_orchestrations/_concurrent.py index d73b7e322b..e1a931019a 100644 --- a/python/packages/orchestrations/agent_framework_orchestrations/_concurrent.py +++ b/python/packages/orchestrations/agent_framework_orchestrations/_concurrent.py @@ -6,7 +6,7 @@ from collections.abc import Callable, Sequence from typing import Any -from agent_framework import Message, SupportsAgentRun +from agent_framework import AgentResponse, Message, SupportsAgentRun from agent_framework._workflows._agent_executor import AgentExecutor, AgentExecutorRequest, AgentExecutorResponse from agent_framework._workflows._agent_utils import resolve_agent_id from agent_framework._workflows._checkpoint import CheckpointStorage @@ -71,18 +71,20 @@ async def from_messages( class _AggregateAgentConversations(Executor): - """Aggregates agent responses and completes with combined ChatMessages. + """Aggregates agent responses and completes with a single AgentResponse. - Emits a list[Message] shaped as: - [ single_user_prompt?, agent1_final_assistant, agent2_final_assistant, ... 
] + Emits an `AgentResponse` whose `messages` are the final assistant message from each + participant (one message per agent), in deterministic participant order matching + the fan-in `sources` configuration. The user prompt is intentionally not included — + that is part of the input, not the answer. - - Extracts a single user prompt (first user message seen across results). - - For each result, selects the final assistant message (prefers agent_response.messages). - - Avoids duplicating the same user message per agent. + For each participant the final assistant message is sourced from + `r.agent_response.messages`, falling back to scanning `r.full_conversation` for + pathological executors that did not populate the response. """ @handler - async def aggregate(self, results: list[AgentExecutorResponse], ctx: WorkflowContext[Never, list[Message]]) -> None: + async def aggregate(self, results: list[AgentExecutorResponse], ctx: WorkflowContext[Never, AgentResponse]) -> None: if not results: logger.error("Concurrent aggregator received empty results list") raise ValueError("Aggregation failed: no results provided") @@ -91,12 +93,10 @@ def _is_role(msg: Any, role: str) -> bool: r = getattr(msg, "role", None) if r is None: return False - # Normalize both r and role to lowercase strings for comparison r_str = str(r).lower() if isinstance(r, str) or hasattr(r, "__str__") else r role_str = str(role).lower() return r_str == role_str - prompt_message: Message | None = None assistant_replies: list[Message] = [] for r in results: @@ -107,10 +107,6 @@ def _is_role(msg: Any, role: str) -> bool: f"{len(resp_messages)} response msgs, {len(r.full_conversation)} conversation msgs" ) - # Capture a single user prompt (first encountered across any conversation) - if prompt_message is None: - prompt_message = next((m for m in r.full_conversation if _is_role(m, "user")), None) - # Pick the final assistant message from the response; fallback to conversation search final_assistant = next((m 
for m in reversed(resp_messages) if _is_role(m, "assistant")), None) if final_assistant is None: @@ -127,14 +123,7 @@ def _is_role(msg: Any, role: str) -> bool: logger.error(f"Aggregation failed: no assistant replies found across {len(results)} results") raise RuntimeError("Aggregation failed: no assistant replies found") - output: list[Message] = [] - if prompt_message is not None: - output.append(prompt_message) - else: - logger.warning("No user prompt found in any conversation; emitting assistants only") - output.extend(assistant_replies) - - await ctx.yield_output(output) + await ctx.yield_output(AgentResponse(messages=assistant_replies)) class _CallbackAggregator(Executor): @@ -190,7 +179,8 @@ class ConcurrentBuilder: from agent_framework_orchestrations import ConcurrentBuilder - # Minimal: use default aggregator (returns list[Message]) + # Minimal: use default aggregator (yields one AgentResponse with one assistant + # message per participant) workflow = ConcurrentBuilder(participants=[agent1, agent2, agent3]).build() @@ -222,8 +212,9 @@ def __init__( Args: participants: Sequence of agent or executor instances to run in parallel. checkpoint_storage: Optional checkpoint storage for enabling workflow state persistence. - intermediate_outputs: If True, enables intermediate outputs from agent participants - before aggregation. + intermediate_outputs: If True, every participant's `yield_output` surfaces as a + workflow `output` event in addition to the aggregator's. By default + (False) only the aggregator's output surfaces. """ self._participants: list[SupportsAgentRun | Executor] = [] self._aggregator: Executor | None = None @@ -383,7 +374,7 @@ def build(self) -> Workflow: - If request info is enabled, the orchestration emits a request info event with outputs from all participants before sending the outputs to the aggregator - Aggregator yields output and the workflow becomes idle. 
The output is either: - - list[Message] (default aggregator: one user + one assistant per agent) + - AgentResponse (default aggregator: one assistant message per participant) - custom payload from the provided aggregator Returns: diff --git a/python/packages/orchestrations/agent_framework_orchestrations/_group_chat.py b/python/packages/orchestrations/agent_framework_orchestrations/_group_chat.py index 4f1c2f832a..9f7e011252 100644 --- a/python/packages/orchestrations/agent_framework_orchestrations/_group_chat.py +++ b/python/packages/orchestrations/agent_framework_orchestrations/_group_chat.py @@ -29,7 +29,7 @@ from dataclasses import dataclass from typing import Any, ClassVar, cast -from agent_framework import Agent, AgentSession, Message, SupportsAgentRun +from agent_framework import Agent, AgentResponse, AgentResponseUpdate, AgentSession, Message, SupportsAgentRun from agent_framework._workflows._agent_executor import AgentExecutor, AgentExecutorRequest, AgentExecutorResponse from agent_framework._workflows._agent_utils import resolve_agent_id from agent_framework._workflows._checkpoint import CheckpointStorage @@ -169,7 +169,9 @@ async def _handle_messages( """Initialize orchestrator state and start the conversation loop.""" self._append_messages(messages) # Termination condition will also be applied to the input messages - if await self._check_terminate_and_yield(cast(WorkflowContext[Never, list[Message]], ctx)): + if await self._check_terminate_and_yield( + cast(WorkflowContext[Never, AgentResponse | AgentResponseUpdate], ctx) + ): return next_speaker = await self._get_next_speaker() @@ -198,9 +200,13 @@ async def _handle_response( messages = clean_conversation_for_handoff(messages) self._append_messages(messages) - if await self._check_terminate_and_yield(cast(WorkflowContext[Never, list[Message]], ctx)): + if await self._check_terminate_and_yield( + cast(WorkflowContext[Never, AgentResponse | AgentResponseUpdate], ctx) + ): return - if await 
self._check_round_limit_and_yield(cast(WorkflowContext[Never, list[Message]], ctx)): + if await self._check_round_limit_and_yield( + cast(WorkflowContext[Never, AgentResponse | AgentResponseUpdate], ctx) + ): return next_speaker = await self._get_next_speaker() @@ -332,13 +338,15 @@ async def _handle_messages( """Initialize orchestrator state and start the conversation loop.""" self._append_messages(messages) # Termination condition will also be applied to the input messages - if await self._check_terminate_and_yield(cast(WorkflowContext[Never, list[Message]], ctx)): + if await self._check_terminate_and_yield( + cast(WorkflowContext[Never, AgentResponse | AgentResponseUpdate], ctx) + ): return agent_orchestration_output = await self._invoke_agent() if await self._check_agent_terminate_and_yield( agent_orchestration_output, - cast(WorkflowContext[Never, list[Message]], ctx), + cast(WorkflowContext[Never, AgentResponse | AgentResponseUpdate], ctx), ): return @@ -366,15 +374,19 @@ async def _handle_response( # Remove tool-related content to prevent API errors from empty messages messages = clean_conversation_for_handoff(messages) self._append_messages(messages) - if await self._check_terminate_and_yield(cast(WorkflowContext[Never, list[Message]], ctx)): + if await self._check_terminate_and_yield( + cast(WorkflowContext[Never, AgentResponse | AgentResponseUpdate], ctx) + ): return - if await self._check_round_limit_and_yield(cast(WorkflowContext[Never, list[Message]], ctx)): + if await self._check_round_limit_and_yield( + cast(WorkflowContext[Never, AgentResponse | AgentResponseUpdate], ctx) + ): return agent_orchestration_output = await self._invoke_agent() if await self._check_agent_terminate_and_yield( agent_orchestration_output, - cast(WorkflowContext[Never, list[Message]], ctx), + cast(WorkflowContext[Never, AgentResponse | AgentResponseUpdate], ctx), ): return @@ -522,9 +534,9 @@ async def _invoke_agent_helper(conversation: list[Message]) -> AgentOrchestratio 
async def _check_agent_terminate_and_yield( self, agent_orchestration_output: AgentOrchestrationOutput, - ctx: WorkflowContext[Never, list[Message]], + ctx: WorkflowContext[Never, AgentResponse | AgentResponseUpdate], ) -> bool: - """Check if the agent requested termination and yield completion if so. + """Yield the orchestrator's completion if termination was requested. Args: agent_orchestration_output: Output from the orchestrator agent @@ -536,8 +548,9 @@ async def _check_agent_terminate_and_yield( final_message = ( agent_orchestration_output.final_message or "The conversation has been terminated by the agent." ) - self._append_messages([self._create_completion_message(final_message)]) - await ctx.yield_output(self._full_conversation) + completion_message = self._create_completion_message(final_message) + self._append_messages([completion_message]) + await self._yield_completion(ctx, completion_message) return True return False @@ -622,7 +635,9 @@ def __init__( True to terminate the conversation, False to continue. max_rounds: Optional maximum number of orchestrator rounds to prevent infinite conversations. checkpoint_storage: Optional checkpoint storage for enabling workflow state persistence. - intermediate_outputs: If True, enables intermediate outputs from agent participants. + intermediate_outputs: If True, every participant's `yield_output` surfaces as a + workflow `output` event in addition to the orchestrator's. By default (False) + only the orchestrator's output surfaces. 
""" self._participants: dict[str, SupportsAgentRun | Executor] = {} self._participant_factories: list[Callable[[], SupportsAgentRun | Executor]] = [] @@ -643,8 +658,7 @@ def __init__( self._request_info_enabled: bool = False self._request_info_filter: set[str] = set() - # Intermediate outputs - self._intermediate_outputs = intermediate_outputs + self._intermediate_outputs: bool = intermediate_outputs if participants is None and participant_factories is None: raise ValueError("Either participants or participant_factories must be provided.") diff --git a/python/packages/orchestrations/agent_framework_orchestrations/_handoff.py b/python/packages/orchestrations/agent_framework_orchestrations/_handoff.py index c3e156096c..f555ab89b0 100644 --- a/python/packages/orchestrations/agent_framework_orchestrations/_handoff.py +++ b/python/packages/orchestrations/agent_framework_orchestrations/_handoff.py @@ -352,7 +352,7 @@ async def _run_agent_and_emit(self, ctx: WorkflowContext[Any, Any]) -> None: self._full_conversation.extend(self._cache.copy()) # Check termination condition before running the agent - if await self._check_terminate_and_yield(ctx): + if await self._should_terminate(): return # Run the agent @@ -410,7 +410,7 @@ async def _run_agent_and_emit(self, ctx: WorkflowContext[Any, Any]) -> None: # Re-evaluate termination after appending and broadcasting this response. # Without this check, workflows that become terminal due to the latest assistant # message would still emit request_info and require an unnecessary extra resume. - if await self._check_terminate_and_yield(ctx): + if await self._should_terminate(): return # Handle case where no handoff was requested @@ -447,10 +447,10 @@ async def handle_response( response: The user's response messages ctx: The workflow context - If the response is empty, it indicates termination of the handoff workflow. + If the response is empty, the handoff workflow terminates. 
Per-agent responses + already surfaced as `output` events; no terminal yield is needed. """ if not response: - await ctx.yield_output(self._full_conversation) return # Broadcast the user response to all other agents @@ -520,14 +520,12 @@ def _is_handoff_requested(self, response: AgentResponse) -> tuple[str, Message] return None - async def _check_terminate_and_yield(self, ctx: WorkflowContext[Any, Any]) -> bool: - """Check termination conditions and yield completion if met. + async def _should_terminate(self) -> bool: + """Pure predicate: return True iff the configured termination condition is satisfied. - Args: - ctx: Workflow context for yielding output - - Returns: - True if termination condition met and output yielded, False otherwise + Per-agent responses already surface as `output` events as agents speak, so the + handoff workflow has no terminal yield to make — this method only decides whether + the workflow should stop iterating. """ if self._termination_condition is None: return False @@ -535,12 +533,7 @@ async def _check_terminate_and_yield(self, ctx: WorkflowContext[Any, Any]) -> bo terminated = self._termination_condition(self._full_conversation) if inspect.isawaitable(terminated): terminated = await terminated - - if terminated: - await ctx.yield_output(self._full_conversation) - return True - - return False + return bool(terminated) @override async def on_checkpoint_save(self) -> dict[str, Any]: @@ -577,13 +570,15 @@ class HandoffBuilder: tool injection, and middleware — capabilities only available on ``Agent``. Outputs: - The final conversation history as a list of Message once the group chat completes. + Each agent's response surfaces as a workflow `output` event as it speaks; there is no + synthetic terminal event. Consumers iterating events see per-agent ``AgentResponse`` (or + ``AgentResponseUpdate`` while streaming) in conversation order. 
The workflow returns to + idle once the termination condition is met (or the user terminates an interactive run). Note: 1. Agents in handoff workflows must be ``Agent`` instances and support local tool calls. - 2. Handoff doesn't support intermediate outputs from agents. All outputs are returned as - they become available. This is because agents in handoff workflows are not considered - sub-agents of a central orchestrator, thus all outputs are directly emitted. + 2. Because each agent's response is itself a workflow output, handoff has no separate + "intermediate outputs" channel — every per-agent response is the primary output. """ def __init__( diff --git a/python/packages/orchestrations/agent_framework_orchestrations/_magentic.py b/python/packages/orchestrations/agent_framework_orchestrations/_magentic.py index 80031cd726..7f1854f914 100644 --- a/python/packages/orchestrations/agent_framework_orchestrations/_magentic.py +++ b/python/packages/orchestrations/agent_framework_orchestrations/_magentic.py @@ -14,6 +14,7 @@ from agent_framework import ( AgentResponse, + AgentResponseUpdate, AgentSession, Message, SupportsAgentRun, @@ -1057,7 +1058,9 @@ async def _run_inner_loop_helper( if self._magentic_context is None: raise RuntimeError("Context not initialized") # Check limits first - within_limits = await self._check_within_limits_or_complete(cast(WorkflowContext[Never, list[Message]], ctx)) + within_limits = await self._check_within_limits_or_complete( + cast(WorkflowContext[Never, AgentResponse | AgentResponseUpdate], ctx) + ) if not within_limits: return @@ -1092,7 +1095,7 @@ async def _run_inner_loop_helper( # Check for task completion if self._progress_ledger.is_request_satisfied.answer: logger.info("Magentic Orchestrator: Task completed") - await self._prepare_final_answer(cast(WorkflowContext[Never, list[Message]], ctx)) + await self._prepare_final_answer(cast(WorkflowContext[Never, AgentResponse | AgentResponseUpdate], ctx)) return # Check for stalling or 
looping @@ -1116,7 +1119,7 @@ async def _run_inner_loop_helper( if next_speaker not in self._participant_registry.participants: logger.warning(f"Invalid next speaker: {next_speaker}") - await self._prepare_final_answer(cast(WorkflowContext[Never, list[Message]], ctx)) + await self._prepare_final_answer(cast(WorkflowContext[Never, AgentResponse | AgentResponseUpdate], ctx)) return # Add instruction to conversation (assistant guidance) @@ -1192,20 +1195,25 @@ async def _run_outer_loop( # Start inner loop await self._run_inner_loop(ctx) - async def _prepare_final_answer(self, ctx: WorkflowContext[Never, list[Message]]) -> None: - """Prepare the final answer using the manager.""" + async def _prepare_final_answer(self, ctx: WorkflowContext[Never, AgentResponse | AgentResponseUpdate]) -> None: + """Yield the manager's synthesized final answer. + + Mode-aware: streaming -> ``AgentResponseUpdate``, non-streaming → ``AgentResponse``. + See ``BaseGroupChatOrchestrator._yield_completion``. + """ if self._magentic_context is None: raise RuntimeError("Context not initialized") logger.info("Magentic Orchestrator: Preparing final answer") final_answer = await self._manager.prepare_final_answer(self._magentic_context.clone(deep=True)) - # Emit a completed event for the workflow - await ctx.yield_output([final_answer]) + await self._yield_completion(ctx, final_answer) self._terminated = True - async def _check_within_limits_or_complete(self, ctx: WorkflowContext[Never, list[Message]]) -> bool: + async def _check_within_limits_or_complete( + self, ctx: WorkflowContext[Never, AgentResponse | AgentResponseUpdate] + ) -> bool: """Check if orchestrator is within operational limits. If limits are exceeded, yield a termination message and mark the workflow as terminated. 
@@ -1229,15 +1237,12 @@ async def _check_within_limits_or_complete(self, ctx: WorkflowContext[Never, lis limit_type = "round" if hit_round_limit else "reset" logger.error(f"Magentic Orchestrator: Max {limit_type} count reached") - # Yield the full conversation with an indication of termination due to limits - await ctx.yield_output([ - *self._magentic_context.chat_history, - Message( - role="assistant", - contents=[f"Workflow terminated due to reaching maximum {limit_type} count."], - author_name=MAGENTIC_MANAGER_NAME, - ), - ]) + termination_message = Message( + role="assistant", + contents=[f"Workflow terminated due to reaching maximum {limit_type} count."], + author_name=MAGENTIC_MANAGER_NAME, + ) + await self._yield_completion(ctx, termination_message) self._terminated = True return False @@ -1427,7 +1432,9 @@ def __init__( max_round_count: Max total coordination rounds. None means unlimited. enable_plan_review: If True, requires human approval of the initial plan before proceeding. checkpoint_storage: Optional checkpoint storage for enabling workflow state persistence. - intermediate_outputs: If True, enables intermediate outputs from agent participants. + intermediate_outputs: If True, every participant's `yield_output` surfaces as a + workflow `output` event in addition to the orchestrator's. By default (False) + only the orchestrator's output surfaces. 
""" self._participants: dict[str, SupportsAgentRun | Executor] = {} @@ -1440,7 +1447,6 @@ def __init__( self._checkpoint_storage: CheckpointStorage | None = checkpoint_storage - # Intermediate outputs self._intermediate_outputs = intermediate_outputs self._set_participants(participants) diff --git a/python/packages/orchestrations/agent_framework_orchestrations/_orchestration_request_info.py b/python/packages/orchestrations/agent_framework_orchestrations/_orchestration_request_info.py index e78d1bef14..23e382aa13 100644 --- a/python/packages/orchestrations/agent_framework_orchestrations/_orchestration_request_info.py +++ b/python/packages/orchestrations/agent_framework_orchestrations/_orchestration_request_info.py @@ -4,7 +4,7 @@ from typing import Literal from agent_framework._agents import SupportsAgentRun -from agent_framework._types import Message +from agent_framework._types import AgentResponse, Message from agent_framework._workflows._agent_executor import AgentExecutor, AgentExecutorRequest, AgentExecutorResponse from agent_framework._workflows._agent_utils import resolve_agent_id from agent_framework._workflows._executor import Executor, handler @@ -86,7 +86,13 @@ def approve() -> "AgentRequestInfoResponse": class AgentRequestInfoExecutor(Executor): - """Executor for gathering request info from users to assist agents.""" + """Executor for gathering request info from users to assist agents. + + On approval (caller returned no follow-up messages), yields the original + ``AgentExecutorResponse`` so downstream ``AgentExecutor`` participants can consume it + via their ``from_response`` handler — i.e., the inner workflow's output type matches the + chain currency used between Sequential participants. 
+ """ @handler async def request_info(self, agent_response: AgentExecutorResponse, ctx: WorkflowContext) -> None: @@ -109,6 +115,56 @@ async def handle_request_info_response( await ctx.yield_output(original_request) +class _TerminalAgentRequestInfoExecutor(Executor): + """Sibling of ``AgentRequestInfoExecutor`` used when ``AgentApprovalExecutor`` is the workflow's terminator. + + This exists because: + - The established orchestration contract is that every orchestration's terminal + ``output`` event carries an ``AgentResponse``. That is the user-facing promise — e.g., + ``workflow.as_agent().run(prompt)`` returns an ``AgentResponse``. + - ``AgentRequestInfoExecutor`` yields ``AgentExecutorResponse`` because that is the chain + currency between Sequential participants: the next ``AgentExecutor`` consumes + ``AgentExecutorResponse`` via its ``from_response`` handler. That is correct when + ``AgentApprovalExecutor`` is *intermediate*. + - When ``AgentApprovalExecutor`` is the *terminator* (``allow_direct_output=True``), the + inner yield flows straight through ``WorkflowExecutor`` to the outer workflow's terminal + output. Yielding ``AgentExecutorResponse`` there would surface ``AgentExecutorResponse`` + as the workflow's terminal output — violating the orchestration contract. + + Used in place of ``AgentRequestInfoExecutor`` inside the terminator-mode inner workflow + built by ``AgentApprovalExecutor._build_workflow`` when ``allow_direct_output=True``. + + Translation belongs here — at the source of the yield in the orchestrations package — + rather than at the ``WorkflowExecutor`` boundary in core, because core has no opinion + about the orchestration's ``AgentResponse`` contract. + + Note: not a subclass of ``AgentRequestInfoExecutor``. The two classes have different + terminal yield contracts (``AgentExecutorResponse`` vs. ``AgentResponse``), and + ``WorkflowContext``'s output type parameter is invariant — so a subclass override would + be type-incompatible. 
They are siblings sharing only a small ``request_info`` handler. + """ + + @handler + async def request_info(self, agent_response: AgentExecutorResponse, ctx: WorkflowContext) -> None: + """Handle the agent's response and gather additional info from users.""" + await ctx.request_info(agent_response, AgentRequestInfoResponse) + + @response_handler + async def handle_request_info_response( + self, + original_request: AgentExecutorResponse, + response: AgentRequestInfoResponse, + ctx: WorkflowContext[AgentExecutorRequest, AgentResponse], + ) -> None: + """Process the additional info provided by users; yield ``AgentResponse`` on approval.""" + if response.messages: + # User provided additional messages, further iterate on agent response + await ctx.send_message(AgentExecutorRequest(messages=response.messages, should_respond=True)) + else: + # No additional info, approve and surface the wrapped AgentResponse to the parent. + await ctx.yield_output(original_request.agent_response) + + class AgentApprovalExecutor(WorkflowExecutor): """Executor for enabling scenarios requiring agent approval in an orchestration. @@ -122,22 +178,47 @@ def __init__( self, agent: SupportsAgentRun, context_mode: Literal["full", "last_agent", "custom"] | None = None, + *, + allow_direct_output: bool = False, ) -> None: """Initialize the AgentApprovalExecutor. Args: agent: The agent protocol to use for generating responses. context_mode: The mode for providing context to the agent. + allow_direct_output: When True, the inner agent's response is yielded as the + wrapping workflow's output (rather than forwarded as a message to a + downstream participant). Set this when this executor is the workflow's + terminator — so the user-approved final response surfaces as a workflow + ``output`` event. 
""" self._context_mode: Literal["full", "last_agent", "custom"] | None = context_mode self._description = agent.description - super().__init__(workflow=self._build_workflow(agent), id=resolve_agent_id(agent), propagate_request=True) + super().__init__( + workflow=self._build_workflow(agent, terminal=allow_direct_output), + id=resolve_agent_id(agent), + propagate_request=True, + allow_direct_output=allow_direct_output, + ) + + def _build_workflow(self, agent: SupportsAgentRun, *, terminal: bool) -> Workflow: + """Build the internal workflow for the AgentApprovalExecutor. - def _build_workflow(self, agent: SupportsAgentRun) -> Workflow: - """Build the internal workflow for the AgentApprovalExecutor.""" - agent_executor = AgentExecutor(agent, context_mode=self._context_mode) - request_info_executor = AgentRequestInfoExecutor(id="agent_request_info_executor") + Picks the right ``AgentRequestInfoExecutor`` variant for the role this approval flow + plays in the outer workflow: + + - Intermediate (``terminal=False``): inner workflow yields ``AgentExecutorResponse`` + so the next outer ``AgentExecutor`` participant can consume it via ``from_response``. + - Terminator (``terminal=True``): inner workflow yields ``AgentResponse`` so the outer + workflow's terminal output matches the orchestration contract. 
+ """ + agent_executor = AgentExecutor( + agent, + context_mode=self._context_mode, + ) + request_info_cls = _TerminalAgentRequestInfoExecutor if terminal else AgentRequestInfoExecutor + request_info_executor = request_info_cls(id="agent_request_info_executor") return ( WorkflowBuilder(start_executor=agent_executor) diff --git a/python/packages/orchestrations/agent_framework_orchestrations/_sequential.py b/python/packages/orchestrations/agent_framework_orchestrations/_sequential.py index bda7f194ab..36d4f23f49 100644 --- a/python/packages/orchestrations/agent_framework_orchestrations/_sequential.py +++ b/python/packages/orchestrations/agent_framework_orchestrations/_sequential.py @@ -2,50 +2,24 @@ """Sequential builder for agent/executor workflows with shared conversation context. -This module provides a high-level, agent-focused API to assemble a sequential -workflow where: -- Participants are provided as SupportsAgentRun or Executor instances via `participants=[...]` -- A shared conversation context (list[Message]) is passed along the chain -- Agents append their assistant messages to the context -- Custom executors can transform or summarize and return a refined context -- The workflow finishes with the final context produced by the last participant - -Typical wiring: - input -> _InputToConversation -> participant1 -> (agent? -> _ResponseToConversation) -> - ... -> participantN -> _EndWithConversation - -Notes: -- Participants can mix SupportsAgentRun and Executor objects -- Agents are auto-wrapped by WorkflowBuilder as AgentExecutor (unless already wrapped) -- AgentExecutor produces AgentExecutorResponse; _ResponseToConversation converts this to list[Message] -- Non-agent executors must define a handler that consumes `list[Message]` and sends back - the updated `list[Message]` via their workflow context - -Why include the small internal adapter executors? 
-- Input normalization ("input-conversation"): ensures the workflow always starts with a - `list[Message]` regardless of whether callers pass a `str`, a single `Message`, - or a list. This keeps the first hop strongly typed and avoids boilerplate in participants. -- Agent response adaptation ("to-conversation:"): agents (via AgentExecutor) - emit `AgentExecutorResponse`. The adapter converts that to a `list[Message]` - using `full_conversation` so original prompts aren't lost when chaining. -- Result output ("end"): yields the final conversation list and the workflow becomes idle - giving a consistent terminal payload shape for both agents and custom executors. - -These adapters are first-class executors by design so they are type-checked at edges, -observable (ExecutorInvoke/Completed events), and easily testable/reusable. Their IDs are -deterministic and self-describing (for example, "to-conversation:writer") to reduce event-log -confusion and to mirror how the concurrent builder uses explicit dispatcher/aggregator nodes. +Participants (SupportsAgentRun or Executor instances) run in order, sharing a +conversation along the chain. Agents append their assistant messages; custom executors +transform and return a refined `list[Message]`. + +Wiring: input -> _InputToConversation -> participant1 -> ... -> participantN + +The workflow's final `output` event is the last participant's `yield_output(...)`. For +agent terminators that is an `AgentResponse` (or per-chunk `AgentResponseUpdate`s when +streaming). For custom-executor terminators, the executor itself yields whatever it +produces — by convention an `AgentResponse` so downstream consumers see a uniform shape. 
""" import logging from collections.abc import Sequence -from typing import Any, Literal +from typing import Literal from agent_framework import Message, SupportsAgentRun -from agent_framework._workflows._agent_executor import ( - AgentExecutor, - AgentExecutorResponse, -) +from agent_framework._workflows._agent_executor import AgentExecutor from agent_framework._workflows._agent_utils import resolve_agent_id from agent_framework._workflows._checkpoint import CheckpointStorage from agent_framework._workflows._executor import ( @@ -78,34 +52,6 @@ async def from_messages(self, messages: list[str | Message], ctx: WorkflowContex await ctx.send_message(normalize_messages_input(messages)) -class _EndWithConversation(Executor): - """Terminates the workflow by emitting the final conversation context.""" - - @handler - async def end_with_messages( - self, - conversation: list[Message], - ctx: WorkflowContext[Any, list[Message]], - ) -> None: - """Handler for ending with a list of Message. - - This is used when the last participant is a custom executor. - """ - await ctx.yield_output(list(conversation)) - - @handler - async def end_with_agent_executor_response( - self, - response: AgentExecutorResponse, - ctx: WorkflowContext[Any, list[Message] | None], - ) -> None: - """Handle case where last participant is an agent. - - The agent is wrapped by AgentExecutor and emits AgentExecutorResponse. - """ - await ctx.yield_output(response.full_conversation) - - class SequentialBuilder: r"""High-level builder for sequential agent/executor workflows with shared context. @@ -155,7 +101,9 @@ def __init__( chain_only_agent_responses: If True, only agent responses are chained between agents. By default, the full conversation context is passed to the next agent. This also applies to Executor -> Agent transitions if the executor sends `AgentExecutorResponse`. - intermediate_outputs: If True, enables intermediate outputs from agent participants. 
+ intermediate_outputs: If True, every participant's `yield_output` surfaces as a + workflow `output` event in addition to the terminator's. By default (False) only + the last participant's output surfaces. """ self._participants: list[SupportsAgentRun | Executor] = [] self._checkpoint_storage: CheckpointStorage | None = checkpoint_storage @@ -225,7 +173,14 @@ def with_request_info( return self def _resolve_participants(self) -> list[Executor]: - """Resolve participant instances into Executor objects.""" + """Resolve participant instances into Executor objects. + + Wraps `SupportsAgentRun` participants as `AgentExecutor` (or `AgentApprovalExecutor` + when request-info is enabled for that participant). The last participant, when wrapped + as `AgentApprovalExecutor`, is constructed with `allow_direct_output=True` so the + approved response surfaces as the workflow's output event instead of being forwarded + as a message that has nowhere to go. + """ if not self._participants: raise ValueError("No participants provided. Pass participants to the constructor.") @@ -235,8 +190,9 @@ def _resolve_participants(self) -> list[Executor]: "last_agent" if self._chain_only_agent_responses else None ) + last_idx = len(participants) - 1 executors: list[Executor] = [] - for p in participants: + for idx, p in enumerate(participants): if isinstance(p, Executor): executors.append(p) elif isinstance(p, SupportsAgentRun): @@ -244,7 +200,13 @@ def _resolve_participants(self) -> list[Executor]: not self._request_info_filter or resolve_agent_id(p) in self._request_info_filter ): # Handle request info enabled agents - executors.append(AgentApprovalExecutor(p, context_mode=context_mode)) + executors.append( + AgentApprovalExecutor( + p, + context_mode=context_mode, + allow_direct_output=(idx == last_idx), + ) + ) else: executors.append(AgentExecutor(p, context_mode=context_mode)) else: @@ -256,17 +218,18 @@ def build(self) -> Workflow: """Build and validate the sequential workflow. 
Wiring pattern: - - _InputToConversation normalizes the initial input into list[Message] - - For each participant in order: - - If Agent (or AgentExecutor): pass conversation to the agent, then optionally - route through a request info interceptor, then convert response to conversation - via _ResponseToConversation - - Else (custom Executor): pass conversation directly to the executor - - _EndWithConversation yields the final conversation and the workflow becomes idle + - `_InputToConversation` normalizes the initial input into `list[Message]`. + - Each participant runs in order: + - `AgentExecutor`: receives the conversation / `AgentExecutorResponse` and + forwards an `AgentExecutorResponse` downstream. + - Custom `Executor`: receives `list[Message]` and forwards `list[Message]`. + If used as the terminator, it must call `ctx.yield_output(AgentResponse(...))` + instead of `ctx.send_message(...)` — its yield becomes the workflow's output. + - The last participant is registered as the workflow's `output_executor`, so the + terminator's own `yield_output` is the workflow's terminal output (`AgentResponse`, + or per-chunk `AgentResponseUpdate` when streaming). 
""" - # Internal nodes input_conv = _InputToConversation(id="input-conversation") - end = _EndWithConversation(id="end") # Resolve participants and participant factories to executors participants: list[Executor] = self._resolve_participants() @@ -274,15 +237,12 @@ def build(self) -> Workflow: builder = WorkflowBuilder( start_executor=input_conv, checkpoint_storage=self._checkpoint_storage, - output_executors=[end] if not self._intermediate_outputs else None, + output_executors=[participants[-1]] if not self._intermediate_outputs else None, ) - # Start of the chain is the input normalizer prior: Executor | SupportsAgentRun = input_conv for p in participants: builder.add_edge(prior, p) prior = p - # Terminate with the final conversation - builder.add_edge(prior, end) return builder.build() diff --git a/python/packages/orchestrations/tests/test_concurrent.py b/python/packages/orchestrations/tests/test_concurrent.py index 7d9a2bc534..e6160f39db 100644 --- a/python/packages/orchestrations/tests/test_concurrent.py +++ b/python/packages/orchestrations/tests/test_concurrent.py @@ -1,14 +1,21 @@ # Copyright (c) Microsoft. All rights reserved. -from typing import Any, cast +from collections.abc import AsyncIterable, Awaitable +from typing import Any, Literal, cast, overload import pytest from agent_framework import ( AgentExecutorRequest, AgentExecutorResponse, AgentResponse, + AgentResponseUpdate, + AgentRunInputs, + AgentSession, + BaseAgent, + Content, Executor, Message, + ResponseStream, WorkflowContext, WorkflowRunState, handler, @@ -49,36 +56,26 @@ def test_concurrent_builder_rejects_duplicate_executors() -> None: ConcurrentBuilder(participants=[a, b]) -async def test_concurrent_default_aggregator_emits_single_user_and_assistants() -> None: - # Three synthetic agent executors +async def test_concurrent_default_aggregator_emits_assistants_only() -> None: + """Default aggregator yields a single AgentResponse with one assistant message per participant. 
+ + The user prompt is intentionally not included — that belongs in the input, not the answer. + """ e1 = _FakeAgentExec("agentA", "Alpha") e2 = _FakeAgentExec("agentB", "Beta") e3 = _FakeAgentExec("agentC", "Gamma") wf = ConcurrentBuilder(participants=[e1, e2, e3]).build() - completed = False - output: list[Message] | None = None - async for ev in wf.run("prompt: hello world", stream=True): - if ev.type == "status" and ev.state == WorkflowRunState.IDLE: - completed = True - elif ev.type == "output": - output = cast(list[Message], ev.data) - if completed and output is not None: - break - - assert completed - assert output is not None - messages: list[Message] = output + output_events = [ev for ev in await wf.run("prompt: hello world") if ev.type == "output"] + assert len(output_events) == 1 + response = output_events[0].data + assert isinstance(response, AgentResponse) - # Expect one user message + one assistant message per participant - assert len(messages) == 1 + 3 - assert messages[0].role == "user" - assert "hello world" in messages[0].text - - assistant_texts = {m.text for m in messages[1:]} - assert assistant_texts == {"Alpha", "Beta", "Gamma"} - assert all(m.role == "assistant" for m in messages[1:]) + # Exactly one assistant message per participant; no user prompt. 
+ assert len(response.messages) == 3 + assert all(m.role == "assistant" for m in response.messages) + assert {m.text for m in response.messages} == {"Alpha", "Beta", "Gamma"} async def test_concurrent_custom_aggregator_callback_is_used() -> None: @@ -215,7 +212,7 @@ async def test_concurrent_checkpoint_resume_round_trip() -> None: wf = ConcurrentBuilder(participants=list(participants), checkpoint_storage=storage).build() - baseline_output: list[Message] | None = None + baseline_output: AgentResponse | None = None async for ev in wf.run("checkpoint concurrent", stream=True): if ev.type == "output": baseline_output = ev.data # type: ignore[assignment] @@ -236,7 +233,7 @@ async def test_concurrent_checkpoint_resume_round_trip() -> None: ) wf_resume = ConcurrentBuilder(participants=list(resumed_participants), checkpoint_storage=storage).build() - resumed_output: list[Message] | None = None + resumed_output: AgentResponse | None = None async for ev in wf_resume.run(checkpoint_id=resume_checkpoint.checkpoint_id, stream=True): if ev.type == "output": resumed_output = ev.data # type: ignore[assignment] @@ -247,8 +244,8 @@ async def test_concurrent_checkpoint_resume_round_trip() -> None: break assert resumed_output is not None - assert [m.role for m in resumed_output] == [m.role for m in baseline_output] - assert [m.text for m in resumed_output] == [m.text for m in baseline_output] + assert [m.role for m in resumed_output.messages] == [m.role for m in baseline_output.messages] + assert [m.text for m in resumed_output.messages] == [m.text for m in baseline_output.messages] async def test_concurrent_checkpoint_runtime_only() -> None: @@ -258,7 +255,7 @@ async def test_concurrent_checkpoint_runtime_only() -> None: agents = [_FakeAgentExec(id="agent1", reply_text="A1"), _FakeAgentExec(id="agent2", reply_text="A2")] wf = ConcurrentBuilder(participants=agents).build() - baseline_output: list[Message] | None = None + baseline_output: AgentResponse | None = None async for ev in 
wf.run("runtime checkpoint test", checkpoint_storage=storage, stream=True): if ev.type == "output": baseline_output = ev.data # type: ignore[assignment] @@ -278,7 +275,7 @@ async def test_concurrent_checkpoint_runtime_only() -> None: resumed_agents = [_FakeAgentExec(id="agent1", reply_text="A1"), _FakeAgentExec(id="agent2", reply_text="A2")] wf_resume = ConcurrentBuilder(participants=resumed_agents).build() - resumed_output: list[Message] | None = None + resumed_output: AgentResponse | None = None async for ev in wf_resume.run( checkpoint_id=resume_checkpoint.checkpoint_id, checkpoint_storage=storage, stream=True ): @@ -291,7 +288,7 @@ async def test_concurrent_checkpoint_runtime_only() -> None: break assert resumed_output is not None - assert [m.role for m in resumed_output] == [m.role for m in baseline_output] + assert [m.role for m in resumed_output.messages] == [m.role for m in baseline_output.messages] async def test_concurrent_checkpoint_runtime_overrides_buildtime() -> None: @@ -334,3 +331,46 @@ async def test_concurrent_builder_reusable_after_build_with_participants() -> No assert builder._participants[0] is e1 # type: ignore assert builder._participants[1] is e2 # type: ignore + + +class _EchoAgent(BaseAgent): + """Simple agent that appends a single assistant message with its name.""" + + @overload + def run( + self, + messages: AgentRunInputs | None = ..., + *, + stream: Literal[False] = ..., + session: AgentSession | None = ..., + **kwargs: Any, + ) -> Awaitable[AgentResponse[Any]]: ... + @overload + def run( + self, + messages: AgentRunInputs | None = ..., + *, + stream: Literal[True], + session: AgentSession | None = ..., + **kwargs: Any, + ) -> ResponseStream[AgentResponseUpdate, AgentResponse[Any]]: ... 
+ + def run( + self, + messages: AgentRunInputs | None = None, + *, + stream: bool = False, + session: AgentSession | None = None, + **kwargs: Any, + ) -> Awaitable[AgentResponse[Any]] | ResponseStream[AgentResponseUpdate, AgentResponse[Any]]: + if stream: + + async def _stream() -> AsyncIterable[AgentResponseUpdate]: + yield AgentResponseUpdate(contents=[Content.from_text(text=f"{self.name} reply")]) + + return ResponseStream(_stream(), finalizer=AgentResponse.from_updates) + + async def _run() -> AgentResponse: + return AgentResponse(messages=[Message("assistant", [f"{self.name} reply"])]) + + return _run() diff --git a/python/packages/orchestrations/tests/test_group_chat.py b/python/packages/orchestrations/tests/test_group_chat.py index 2118de5ba7..50f58e781a 100644 --- a/python/packages/orchestrations/tests/test_group_chat.py +++ b/python/packages/orchestrations/tests/test_group_chat.py @@ -238,18 +238,16 @@ async def test_group_chat_builder_basic_flow() -> None: orchestrator_name="manager", ).build() - outputs: list[list[Message]] = [] + updates: list[AgentResponseUpdate] = [] async for event in workflow.run("coordinate task", stream=True): - if event.type == "output": - data = event.data - if isinstance(data, list): - outputs.append(cast(list[Message], data)) + if event.type == "output" and isinstance(event.data, AgentResponseUpdate): + updates.append(event.data) - assert len(outputs) == 1 - assert len(outputs[0]) >= 1 - # Check that both agents contributed - authors = {msg.author_name for msg in outputs[0] if msg.author_name in ["alpha", "beta"]} - assert len(authors) == 2 + # Exactly one terminal `output` event = the orchestrator's completion AgentResponseUpdate + # (mode-aware: streaming yields a single update chunk for the synthesized message). + assert len(updates) == 1 + # The completion message is authored by the orchestrator. 
+ assert updates[0].author_name == "manager" async def test_group_chat_as_agent_accepts_conversation() -> None: @@ -283,18 +281,16 @@ async def test_agent_manager_handles_concatenated_json_output() -> None: orchestrator_agent=manager, ).build() - outputs: list[list[Message]] = [] + updates: list[AgentResponseUpdate] = [] async for event in workflow.run("coordinate task", stream=True): - if event.type == "output": - data = event.data - if isinstance(data, list): - outputs.append(cast(list[Message], data)) + if event.type == "output" and isinstance(event.data, AgentResponseUpdate): + updates.append(event.data) - assert outputs - conversation = outputs[-1] - assert any(msg.author_name == "agent" and msg.text == "worker response" for msg in conversation) - assert conversation[-1].author_name == manager.name - assert conversation[-1].text == "concatenated manager final" + assert updates + final_update = updates[-1] + # Terminal update is the orchestrator's completion message. + assert final_update.author_name == manager.name + assert final_update.text == "concatenated manager final" # Comprehensive tests for group chat functionality @@ -400,20 +396,14 @@ def selector(state: GroupChatState) -> str: selection_func=selector, ).build() - outputs: list[list[Message]] = [] + updates: list[AgentResponseUpdate] = [] async for event in workflow.run("test task", stream=True): - if event.type == "output": - data = event.data - if isinstance(data, list): - outputs.append(cast(list[Message], data)) - - # Should have terminated due to max_rounds, expect at least one output - assert len(outputs) >= 1 - # The final message in the conversation should be about round limit - conversation = outputs[-1] - assert len(conversation) >= 1 - final_output = conversation[-1] - assert "maximum number of rounds" in final_output.text.lower() + if event.type == "output" and isinstance(event.data, AgentResponseUpdate): + updates.append(event.data) + + # Exactly one terminal output event = 
orchestrator's max-rounds completion update. + assert len(updates) == 1 + assert "maximum number of rounds" in (updates[0].text or "").lower() async def test_termination_condition_halts_conversation(self) -> None: """Test that a custom termination condition stops the workflow.""" @@ -433,20 +423,89 @@ def termination_condition(conversation: list[Message]) -> bool: selection_func=selector, ).build() - outputs: list[list[Message]] = [] + updates: list[AgentResponseUpdate] = [] + async for event in workflow.run("test task", stream=True): + if event.type == "output" and isinstance(event.data, AgentResponseUpdate): + updates.append(event.data) + + assert updates, "Expected termination to yield output" + # Terminal update is the orchestrator's completion message only. + assert "termination condition" in (updates[-1].text or "").lower() + + async def test_termination_yields_update_in_streaming(self) -> None: + """In streaming mode, the orchestrator's terminal completion surfaces as `AgentResponseUpdate`. + + Mirrors AgentExecutor's mode-aware behavior: streaming workflows produce per-chunk + `AgentResponseUpdate` events; the synthesized termination message is logically a + single chunk, so it should be a single `AgentResponseUpdate`. 
+ """ + + def selector(state: GroupChatState) -> str: + return "agent" + + def termination_condition(conversation: list[Message]) -> bool: + replies = [msg for msg in conversation if msg.role == "assistant" and msg.author_name == "agent"] + return len(replies) >= 2 + + workflow = GroupChatBuilder( + participants=[StubAgent("agent", "response")], + termination_condition=termination_condition, + selection_func=selector, + ).build() + + terminal: AgentResponseUpdate | None = None async for event in workflow.run("test task", stream=True): if event.type == "output": - data = event.data - if isinstance(data, list): - outputs.append(cast(list[Message], data)) - - assert outputs, "Expected termination to yield output" - conversation = outputs[-1] - agent_replies = [msg for msg in conversation if msg.author_name == "agent" and msg.role == "assistant"] - assert len(agent_replies) == 2 - final_output = conversation[-1] - # The orchestrator uses its ID as author_name by default - assert "termination condition" in final_output.text.lower() + terminal = event.data # last output event wins + + assert isinstance(terminal, AgentResponseUpdate), ( + f"Expected AgentResponseUpdate in streaming mode, got {type(terminal).__name__}" + ) + assert "termination condition" in (terminal.text or "").lower() + + async def test_termination_yields_response_in_non_streaming(self) -> None: + """In non-streaming mode, the orchestrator's terminal completion surfaces as `AgentResponse`.""" + + def selector(state: GroupChatState) -> str: + return "agent" + + def termination_condition(conversation: list[Message]) -> bool: + replies = [msg for msg in conversation if msg.role == "assistant" and msg.author_name == "agent"] + return len(replies) >= 2 + + workflow = GroupChatBuilder( + participants=[StubAgent("agent", "response")], + termination_condition=termination_condition, + selection_func=selector, + ).build() + + events = await workflow.run("test task") + outputs = [ev for ev in events if ev.type == 
"output"] + assert len(outputs) == 1 + assert isinstance(outputs[0].data, AgentResponse) + assert "termination condition" in outputs[0].data.messages[-1].text.lower() + + async def test_max_rounds_yields_update_in_streaming(self) -> None: + """Max-rounds completion in streaming mode surfaces as `AgentResponseUpdate`.""" + + def selector(state: GroupChatState) -> str: + return "agent" + + workflow = GroupChatBuilder( + participants=[StubAgent("agent", "response")], + max_rounds=2, + selection_func=selector, + ).build() + + terminal: AgentResponseUpdate | None = None + async for event in workflow.run("test task", stream=True): + if event.type == "output": + terminal = event.data + + assert isinstance(terminal, AgentResponseUpdate), ( + f"Expected AgentResponseUpdate in streaming mode, got {type(terminal).__name__}" + ) + assert "maximum number of rounds" in (terminal.text or "").lower() async def test_termination_condition_agent_manager_finalizes(self) -> None: """Test that termination condition with agent orchestrator produces default termination message.""" @@ -459,17 +518,15 @@ async def test_termination_condition_agent_manager_finalizes(self) -> None: orchestrator_agent=manager, ).build() - outputs: list[list[Message]] = [] + updates: list[AgentResponseUpdate] = [] async for event in workflow.run("test task", stream=True): - if event.type == "output": - data = event.data - if isinstance(data, list): - outputs.append(cast(list[Message], data)) + if event.type == "output" and isinstance(event.data, AgentResponseUpdate): + updates.append(event.data) - assert outputs, "Expected termination to yield output" - conversation = outputs[-1] - assert conversation[-1].text == BaseGroupChatOrchestrator.TERMINATION_CONDITION_MET_MESSAGE - assert conversation[-1].author_name == manager.name + assert updates, "Expected termination to yield output" + final_update = updates[-1] + assert final_update.text == BaseGroupChatOrchestrator.TERMINATION_CONDITION_MET_MESSAGE + assert 
final_update.author_name == manager.name async def test_unknown_participant_error(self) -> None: """Test that unknown participant selection raises error.""" @@ -505,14 +562,12 @@ def selector(state: GroupChatState) -> str: selection_func=selector, ).build() - outputs: list[list[Message]] = [] + updates: list[AgentResponseUpdate] = [] async for event in workflow.run("test task", stream=True): - if event.type == "output": - data = event.data - if isinstance(data, list): - outputs.append(cast(list[Message], data)) + if event.type == "output" and isinstance(event.data, AgentResponseUpdate): + updates.append(event.data) - assert len(outputs) == 1 # Should complete normally + assert len(updates) == 1 # Should complete normally class TestConversationHandling: @@ -546,14 +601,12 @@ def selector(state: GroupChatState) -> str: workflow = GroupChatBuilder(participants=[agent], max_rounds=1, selection_func=selector).build() - outputs: list[list[Message]] = [] + updates: list[AgentResponseUpdate] = [] async for event in workflow.run("test string", stream=True): - if event.type == "output": - data = event.data - if isinstance(data, list): - outputs.append(cast(list[Message], data)) + if event.type == "output" and isinstance(event.data, AgentResponseUpdate): + updates.append(event.data) - assert len(outputs) == 1 + assert len(updates) == 1 async def test_handle_chat_message_input(self) -> None: """Test handling Message input directly.""" @@ -569,14 +622,12 @@ def selector(state: GroupChatState) -> str: workflow = GroupChatBuilder(participants=[agent], max_rounds=1, selection_func=selector).build() - outputs: list[list[Message]] = [] + updates: list[AgentResponseUpdate] = [] async for event in workflow.run(task_message, stream=True): - if event.type == "output": - data = event.data - if isinstance(data, list): - outputs.append(cast(list[Message], data)) + if event.type == "output" and isinstance(event.data, AgentResponseUpdate): + updates.append(event.data) - assert len(outputs) 
== 1 + assert len(updates) == 1 async def test_handle_conversation_list_input(self) -> None: """Test handling conversation list preserves context.""" @@ -595,14 +646,12 @@ def selector(state: GroupChatState) -> str: workflow = GroupChatBuilder(participants=[agent], max_rounds=1, selection_func=selector).build() - outputs: list[list[Message]] = [] + updates: list[AgentResponseUpdate] = [] async for event in workflow.run(conversation, stream=True): - if event.type == "output": - data = event.data - if isinstance(data, list): - outputs.append(cast(list[Message], data)) + if event.type == "output" and isinstance(event.data, AgentResponseUpdate): + updates.append(event.data) - assert len(outputs) == 1 + assert len(updates) == 1 class TestRoundLimitEnforcement: @@ -625,20 +674,14 @@ def selector(state: GroupChatState) -> str: selection_func=selector, ).build() - outputs: list[list[Message]] = [] + updates: list[AgentResponseUpdate] = [] async for event in workflow.run("test", stream=True): - if event.type == "output": - data = event.data - if isinstance(data, list): - outputs.append(cast(list[Message], data)) - - # Should have at least one output (the round limit message) - assert len(outputs) >= 1 - # The last message in the conversation should be about round limit - conversation = outputs[-1] - assert len(conversation) >= 1 - final_output = conversation[-1] - assert "maximum number of rounds" in final_output.text.lower() + if event.type == "output" and isinstance(event.data, AgentResponseUpdate): + updates.append(event.data) + + # Exactly one terminal output event = orchestrator's max-rounds completion update. 
+ assert len(updates) == 1 + assert "maximum number of rounds" in (updates[0].text or "").lower() async def test_round_limit_in_ingest_participant_message(self) -> None: """Test round limit enforcement after participant response.""" @@ -658,20 +701,14 @@ def selector(state: GroupChatState) -> str: selection_func=selector, ).build() - outputs: list[list[Message]] = [] + updates: list[AgentResponseUpdate] = [] async for event in workflow.run("test", stream=True): - if event.type == "output": - data = event.data - if isinstance(data, list): - outputs.append(cast(list[Message], data)) + if event.type == "output" and isinstance(event.data, AgentResponseUpdate): + updates.append(event.data) - # Should have at least one output (the round limit message) - assert len(outputs) >= 1 - # The last message in the conversation should be about round limit - conversation = outputs[-1] - assert len(conversation) >= 1 - final_output = conversation[-1] - assert "maximum number of rounds" in final_output.text.lower() + # Exactly one terminal output event = orchestrator's max-rounds completion update. 
+ assert len(updates) == 1 + assert "maximum number of rounds" in (updates[0].text or "").lower() async def test_group_chat_checkpoint_runtime_only() -> None: @@ -684,17 +721,17 @@ async def test_group_chat_checkpoint_runtime_only() -> None: wf = GroupChatBuilder(participants=[agent_a, agent_b], max_rounds=2, selection_func=selector).build() - baseline_output: list[Message] | None = None + baseline_update: AgentResponseUpdate | None = None async for ev in wf.run("runtime checkpoint test", checkpoint_storage=storage, stream=True): - if ev.type == "output": - baseline_output = cast(list[Message], ev.data) if isinstance(ev.data, list) else None # type: ignore + if ev.type == "output" and isinstance(ev.data, AgentResponseUpdate): + baseline_update = ev.data if ev.type == "status" and ev.state in ( WorkflowRunState.IDLE, WorkflowRunState.IDLE_WITH_PENDING_REQUESTS, ): break - assert baseline_output is not None + assert baseline_update is not None checkpoints = await storage.list_checkpoints(workflow_name=wf.name) assert len(checkpoints) > 0, "Runtime-only checkpointing should have created checkpoints" @@ -720,17 +757,17 @@ async def test_group_chat_checkpoint_runtime_overrides_buildtime() -> None: checkpoint_storage=buildtime_storage, selection_func=selector, ).build() - baseline_output: list[Message] | None = None + baseline_update: AgentResponseUpdate | None = None async for ev in wf.run("override test", checkpoint_storage=runtime_storage, stream=True): - if ev.type == "output": - baseline_output = cast(list[Message], ev.data) if isinstance(ev.data, list) else None # type: ignore + if ev.type == "output" and isinstance(ev.data, AgentResponseUpdate): + baseline_update = ev.data if ev.type == "status" and ev.state in ( WorkflowRunState.IDLE, WorkflowRunState.IDLE_WITH_PENDING_REQUESTS, ): break - assert baseline_output is not None + assert baseline_update is not None buildtime_checkpoints = await buildtime_storage.list_checkpoints(workflow_name=wf.name) 
runtime_checkpoints = await runtime_storage.list_checkpoints(workflow_name=wf.name) @@ -974,14 +1011,11 @@ def agent_factory() -> Agent: outputs.append(event) assert len(outputs) == 1 - # The DynamicManagerAgent terminates after second call with final_message - final_messages = outputs[0].data - assert isinstance(final_messages, list) - assert any( - msg.text == "dynamic manager final" - for msg in cast(list[Message], final_messages) - if msg.author_name == "dynamic_manager" - ) + # Streaming mode: terminal yield is AgentResponseUpdate. The DynamicManagerAgent + # terminates after second call with final_message. + final_update = outputs[0].data + assert isinstance(final_update, AgentResponseUpdate) + assert final_update.text == "dynamic manager final" def test_group_chat_with_orchestrator_factory_returning_base_orchestrator(): diff --git a/python/packages/orchestrations/tests/test_handoff.py b/python/packages/orchestrations/tests/test_handoff.py index a512d9b9df..33eed34406 100644 --- a/python/packages/orchestrations/tests/test_handoff.py +++ b/python/packages/orchestrations/tests/test_handoff.py @@ -9,6 +9,8 @@ import pytest from agent_framework import ( Agent, + AgentResponse, + AgentResponseUpdate, ChatResponse, ChatResponseUpdate, Content, @@ -856,10 +858,15 @@ async def test_autonomous_mode_yields_output_without_user_request(): outputs = [ev for ev in events if ev.type == "output"] assert outputs, "Autonomous mode should yield a workflow output" - final_conversation = outputs[-1].data - assert isinstance(final_conversation, list) - conversation_list = cast(list[Message], final_conversation) - assert any(msg.role == "assistant" and (msg.text or "").startswith("specialist reply") for msg in conversation_list) + # Per-agent activity surfaces as `output` events from each HandoffAgentExecutor as they + # speak. Handoff has no orchestrator that produces a separate "answer" — the conversation + # IS the result. 
In streaming mode payloads are AgentResponseUpdate; combined text should + # contain the specialist's reply. + payloads = [ev.data for ev in outputs if isinstance(ev.data, (AgentResponse, AgentResponseUpdate))] + combined = " ".join( + getattr(p, "text", None) or " ".join(m.text for m in getattr(p, "messages", [])) for p in payloads + ) + assert "specialist reply" in combined async def test_autonomous_mode_resumes_user_input_on_turn_limit(): @@ -923,14 +930,10 @@ async def async_termination(conv: list[Message]) -> bool: stream=True, responses={requests[-1].request_id: [Message(role="user", contents=["Second user message"])]} ) ) - outputs = [ev for ev in events if ev.type == "output"] - assert len(outputs) == 1 - - final_conversation = outputs[0].data - assert isinstance(final_conversation, list) - final_conv_list = cast(list[Message], final_conversation) - user_messages = [msg for msg in final_conv_list if msg.role == "user"] - assert len(user_messages) == 2 + # Resume run terminates without further agent activity once the second user message + # satisfies the termination condition. The workflow returns to idle cleanly. + idle_states = [ev for ev in events if ev.type == "status" and ev.state == WorkflowRunState.IDLE] + assert idle_states, "Workflow should become idle after termination" assert termination_call_count > 0 @@ -990,8 +993,9 @@ async def _get() -> ChatResponse: outputs = [event for event in events if event.type == "output"] assert outputs - conversation_outputs = [event for event in outputs if isinstance(event.data, list)] - assert len(conversation_outputs) == 1 + # Per-agent activity surfaces as output events (AgentResponseUpdate in streaming mode). 
+ agent_payloads = [event for event in outputs if isinstance(event.data, (AgentResponse, AgentResponseUpdate))] + assert len(agent_payloads) >= 1 async def test_tool_choice_preserved_from_agent_config(): diff --git a/python/packages/orchestrations/tests/test_magentic.py b/python/packages/orchestrations/tests/test_magentic.py index b87de8c6f4..0389fad94e 100644 --- a/python/packages/orchestrations/tests/test_magentic.py +++ b/python/packages/orchestrations/tests/test_magentic.py @@ -190,24 +190,82 @@ async def test_magentic_builder_returns_workflow_and_runs() -> None: assert isinstance(workflow, Workflow) - outputs: list[Message] = [] + updates: list[AgentResponseUpdate] = [] orchestrator_event_count = 0 async for event in workflow.run("compose summary", stream=True): - if event.type == "output": - msg = event.data - if isinstance(msg, list): - outputs.extend(cast(list[Message], msg)) + if event.type == "output" and isinstance(event.data, AgentResponseUpdate): + updates.append(event.data) elif event.type == "magentic_orchestrator": orchestrator_event_count += 1 - assert outputs, "Expected a final output message" - assert len(outputs) >= 1 - final = outputs[-1] + assert updates, "Expected a final output update" + final = updates[-1] assert final.text == manager.FINAL_ANSWER assert final.author_name == manager.name assert orchestrator_event_count > 0, "Expected orchestrator events to be emitted" +async def test_magentic_final_answer_yields_update_in_streaming() -> None: + """In streaming mode, Magentic's manager final-answer surfaces as `AgentResponseUpdate`. + + Mirrors AgentExecutor's mode-aware behavior: streaming workflows produce per-chunk + `AgentResponseUpdate` events; the synthesized final answer is logically a single chunk, + so it surfaces as a single `AgentResponseUpdate`. 
+ """ + manager = FakeManager() + workflow = MagenticBuilder( + participants=[StubAgent(manager.next_speaker_name, "first draft")], + manager=manager, + ).build() + + terminal: AgentResponseUpdate | None = None + async for event in workflow.run("compose summary", stream=True): + if event.type == "output": + terminal = event.data + + assert isinstance(terminal, AgentResponseUpdate), ( + f"Expected AgentResponseUpdate in streaming mode, got {type(terminal).__name__}" + ) + assert terminal.text == manager.FINAL_ANSWER + assert terminal.author_name == manager.name + + +async def test_magentic_final_answer_yields_response_in_non_streaming() -> None: + """In non-streaming mode, Magentic's manager final-answer surfaces as `AgentResponse`.""" + manager = FakeManager() + workflow = MagenticBuilder( + participants=[StubAgent(manager.next_speaker_name, "first draft")], + manager=manager, + ).build() + + events = await workflow.run("compose summary") + outputs = [ev for ev in events if ev.type == "output"] + assert len(outputs) == 1 + assert isinstance(outputs[0].data, AgentResponse) + assert outputs[0].data.messages[-1].text == manager.FINAL_ANSWER + + +async def test_magentic_limit_termination_yields_update_in_streaming() -> None: + """In streaming mode, Magentic's round-limit termination surfaces as `AgentResponseUpdate`.""" + manager = FakeManager(max_round_count=1) + workflow = MagenticBuilder( + participants=[DummyExec(name=manager.next_speaker_name)], + manager=manager, + ).build() + + terminal: AgentResponseUpdate | None = None + async for event in workflow.run("round limit test", stream=True): + if event.type == "output": + terminal = event.data + + assert isinstance(terminal, AgentResponseUpdate), ( + f"Expected AgentResponseUpdate in streaming mode, got {type(terminal).__name__}" + ) + # Either the final answer OR the round-limit termination message — both are valid terminal states + # for max_round_count=1; the precise one depends on FakeManager's progression. 
+ assert terminal.text + + async def test_magentic_as_agent_does_not_accept_conversation() -> None: manager = FakeManager() writer = StubAgent(manager.next_speaker_name, "summary response") @@ -250,7 +308,7 @@ async def test_magentic_workflow_plan_review_approval_to_completion(): assert isinstance(req_event.data, MagenticPlanReviewRequest) completed = False - output: list[Message] | None = None + output: AgentResponseUpdate | None = None async for ev in wf.run(stream=True, responses={req_event.request_id: req_event.data.approve()}): if ev.type == "status" and ev.state == WorkflowRunState.IDLE: completed = True @@ -261,8 +319,8 @@ async def test_magentic_workflow_plan_review_approval_to_completion(): assert completed assert output is not None - assert isinstance(output, list) - assert all(isinstance(msg, Message) for msg in output) + # Streaming mode: terminal output is AgentResponseUpdate. + assert isinstance(output, AgentResponseUpdate) async def test_magentic_plan_review_with_revise(): @@ -333,14 +391,12 @@ async def test_magentic_orchestrator_round_limit_produces_partial_result(): None, ) assert idle_status is not None - # Check that we got workflow output via WorkflowEvent with type "output" + # Streaming mode: terminal output is AgentResponseUpdate. 
output_event = next((e for e in events if e.type == "output"), None) assert output_event is not None data = output_event.data - assert isinstance(data, list) - assert len(data) > 0 # type: ignore - assert data[-1].role == "assistant" # type: ignore - assert all(isinstance(msg, Message) for msg in data) # type: ignore + assert isinstance(data, AgentResponseUpdate) + assert data.role == "assistant" async def test_magentic_checkpoint_resume_round_trip(): @@ -578,7 +634,7 @@ async def _collect_agent_responses_setup(participant: SupportsAgentRun) -> list[ # Run a bounded stream to allow one invoke and then completion events: list[WorkflowEvent] = [] - async for ev in wf.run("task", stream=True): # plan review disabled + async for ev in wf.run("task", stream=True): events.append(ev) # Capture streaming updates (type="output" with AgentResponseUpdate data) if ev.type == "output" and isinstance(ev.data, AgentResponseUpdate): @@ -753,11 +809,9 @@ async def test_magentic_stall_and_reset_reach_limits(): assert idle_status is not None output_event = next((e for e in events if e.type == "output"), None) assert output_event is not None - assert isinstance(output_event.data, list) - assert all(isinstance(msg, Message) for msg in output_event.data) # type: ignore - assert len(output_event.data) > 0 # type: ignore - assert output_event.data[-1].text is not None # type: ignore - assert output_event.data[-1].text == "Workflow terminated due to reaching maximum reset count." # type: ignore + # Streaming mode: terminal output is AgentResponseUpdate. + assert isinstance(output_event.data, AgentResponseUpdate) + assert output_event.data.text == "Workflow terminated due to reaching maximum reset count." 
async def test_magentic_checkpoint_runtime_only() -> None: diff --git a/python/packages/orchestrations/tests/test_sequential.py b/python/packages/orchestrations/tests/test_sequential.py index 0f000ef254..d119a20120 100644 --- a/python/packages/orchestrations/tests/test_sequential.py +++ b/python/packages/orchestrations/tests/test_sequential.py @@ -22,6 +22,7 @@ ) from agent_framework._workflows._checkpoint import InMemoryCheckpointStorage from agent_framework.orchestrations import SequentialBuilder +from typing_extensions import Never class _EchoAgent(BaseAgent): @@ -67,16 +68,20 @@ async def _run() -> AgentResponse: return _run() -class _SummarizerExec(Executor): - """Custom executor that summarizes by appending a short assistant message.""" +class _SummarizerTerminator(Executor): + """Custom-executor terminator that yields a synthesized summary as the workflow's final answer.""" @handler - async def summarize(self, agent_response: AgentExecutorResponse, ctx: WorkflowContext[list[Message]]) -> None: + async def summarize( + self, + agent_response: AgentExecutorResponse, + ctx: WorkflowContext[Never, AgentResponse], + ) -> None: conversation = agent_response.full_conversation or [] user_texts = [m.text for m in conversation if m.role == "user"] agents = [m.author_name or m.role for m in conversation if m.role == "assistant"] summary = Message("assistant", [f"Summary of users:{len(user_texts)} agents:{len(agents)}"]) - await ctx.send_message(list(conversation) + [summary]) + await ctx.yield_output(AgentResponse(messages=[summary])) class _InvalidExecutor(Executor): @@ -98,58 +103,91 @@ def test_sequential_builder_validation_rejects_invalid_executor() -> None: SequentialBuilder(participants=[_EchoAgent(id="agent1", name="A1"), _InvalidExecutor(id="invalid")]).build() -async def test_sequential_agents_append_to_context() -> None: +async def test_sequential_streaming_yields_only_last_agent_updates() -> None: + """Streaming mode surfaces only the last agent's 
AgentResponseUpdate chunks as outputs. + + Intermediate agents do NOT emit `output` events; only the last agent (the workflow's + output_executor) emits chunks of the final answer. + """ a1 = _EchoAgent(id="agent1", name="A1") a2 = _EchoAgent(id="agent2", name="A2") wf = SequentialBuilder(participants=[a1, a2]).build() completed = False - output: list[Message] | None = None + update_events: list[AgentResponseUpdate] = [] async for ev in wf.run("hello sequential", stream=True): if ev.type == "status" and ev.state == WorkflowRunState.IDLE: completed = True elif ev.type == "output": - output = ev.data # type: ignore[assignment] - if completed and output is not None: + update_events.append(ev.data) # type: ignore[arg-type] + if completed: break assert completed - assert output is not None - assert isinstance(output, list) - msgs: list[Message] = output - assert len(msgs) == 3 - assert msgs[0].role == "user" and "hello sequential" in msgs[0].text - assert msgs[1].role == "assistant" and (msgs[1].author_name == "A1" or True) - assert msgs[2].role == "assistant" and (msgs[2].author_name == "A2" or True) - assert "A1 reply" in msgs[1].text - assert "A2 reply" in msgs[2].text + # Only the last agent's streaming chunks surface as `output` events. 
+ assert update_events, "Expected at least one streaming update from the last agent" + for upd in update_events: + assert isinstance(upd, AgentResponseUpdate) + combined_text = "".join(u.text for u in update_events if hasattr(u, "text")) + assert "A2 reply" in combined_text + assert "A1 reply" not in combined_text + + +async def test_sequential_non_streaming_yields_only_last_agent_response() -> None: + """Non-streaming mode emits a single `output` event with the last agent's AgentResponse.""" + a1 = _EchoAgent(id="agent1", name="A1") + a2 = _EchoAgent(id="agent2", name="A2") + + wf = SequentialBuilder(participants=[a1, a2]).build() + + output_events = [ev for ev in await wf.run("hello sequential") if ev.type == "output"] + assert len(output_events) == 1 + response = output_events[0].data + assert isinstance(response, AgentResponse) + assert all(m.role == "assistant" for m in response.messages) + combined = " ".join(m.text for m in response.messages) + assert "A2 reply" in combined + assert "A1 reply" not in combined + + +async def test_sequential_as_agent_returns_only_last_agent_response() -> None: + """`workflow.as_agent().run(prompt)` returns ONLY the last agent's messages — not the user + input or earlier agents' replies. This is the core fix for the orchestration-as-agent + output contract.""" + a1 = _EchoAgent(id="agent1", name="A1") + a2 = _EchoAgent(id="agent2", name="A2") + + agent = SequentialBuilder(participants=[a1, a2]).build().as_agent() + response = await agent.run("hello as_agent") + + assert isinstance(response, AgentResponse) + # Only the last agent's reply — no user prompt, no agent1 messages. + combined = " ".join(m.text for m in response.messages) + assert "A2 reply" in combined + assert "A1 reply" not in combined + assert "hello as_agent" not in combined async def test_sequential_with_custom_executor_summary() -> None: + """A custom-executor terminator yields its own AgentResponse — that becomes the workflow output. 
+ + Custom executors used as the terminator must call `ctx.yield_output(AgentResponse(...))` + directly (rather than `ctx.send_message(list[Message])` like an intermediate executor would), + because the terminator IS the workflow's output executor. + """ a1 = _EchoAgent(id="agent1", name="A1") - summarizer = _SummarizerExec(id="summarizer") + summarizer = _SummarizerTerminator(id="summarizer") wf = SequentialBuilder(participants=[a1, summarizer]).build() - completed = False - output: list[Message] | None = None - async for ev in wf.run("topic X", stream=True): - if ev.type == "status" and ev.state == WorkflowRunState.IDLE: - completed = True - elif ev.type == "output": - output = ev.data - if completed and output is not None: - break - - assert completed - assert output is not None - msgs: list[Message] = output - # Expect: [user, A1 reply, summary] - assert len(msgs) == 3 - assert msgs[0].role == "user" - assert msgs[1].role == "assistant" and "A1 reply" in msgs[1].text - assert msgs[2].role == "assistant" and msgs[2].text.startswith("Summary of users:") + output_events = [ev for ev in await wf.run("topic X") if ev.type == "output"] + assert len(output_events) == 1 + response = output_events[0].data + assert isinstance(response, AgentResponse) + assert len(response.messages) == 1 + assert response.messages[0].role == "assistant" + assert response.messages[0].text.startswith("Summary of users:") async def test_sequential_checkpoint_resume_round_trip() -> None: @@ -158,14 +196,14 @@ async def test_sequential_checkpoint_resume_round_trip() -> None: initial_agents = (_EchoAgent(id="agent1", name="A1"), _EchoAgent(id="agent2", name="A2")) wf = SequentialBuilder(participants=list(initial_agents), checkpoint_storage=storage).build() - baseline_output: list[Message] | None = None + baseline_updates: list[AgentResponseUpdate] = [] async for ev in wf.run("checkpoint sequential", stream=True): if ev.type == "output": - baseline_output = ev.data # type: ignore[assignment] + 
baseline_updates.append(ev.data) # type: ignore[arg-type] if ev.type == "status" and ev.state == WorkflowRunState.IDLE: break - assert baseline_output is not None + assert baseline_updates checkpoints = await storage.list_checkpoints(workflow_name=wf.name) assert checkpoints @@ -175,19 +213,20 @@ async def test_sequential_checkpoint_resume_round_trip() -> None: resumed_agents = (_EchoAgent(id="agent1", name="A1"), _EchoAgent(id="agent2", name="A2")) wf_resume = SequentialBuilder(participants=list(resumed_agents), checkpoint_storage=storage).build() - resumed_output: list[Message] | None = None + resumed_updates: list[AgentResponseUpdate] = [] async for ev in wf_resume.run(checkpoint_id=resume_checkpoint.checkpoint_id, stream=True): if ev.type == "output": - resumed_output = ev.data # type: ignore[assignment] + resumed_updates.append(ev.data) # type: ignore[arg-type] if ev.type == "status" and ev.state in ( WorkflowRunState.IDLE, WorkflowRunState.IDLE_WITH_PENDING_REQUESTS, ): break - assert resumed_output is not None - assert [m.role for m in resumed_output] == [m.role for m in baseline_output] - assert [m.text for m in resumed_output] == [m.text for m in baseline_output] + assert resumed_updates + baseline_text = "".join(u.text for u in baseline_updates if hasattr(u, "text")) + resumed_text = "".join(u.text for u in resumed_updates if hasattr(u, "text")) + assert baseline_text == resumed_text async def test_sequential_checkpoint_runtime_only() -> None: @@ -197,14 +236,14 @@ async def test_sequential_checkpoint_runtime_only() -> None: agents = (_EchoAgent(id="agent1", name="A1"), _EchoAgent(id="agent2", name="A2")) wf = SequentialBuilder(participants=list(agents)).build() - baseline_output: list[Message] | None = None + baseline_updates: list[AgentResponseUpdate] = [] async for ev in wf.run("runtime checkpoint test", checkpoint_storage=storage, stream=True): if ev.type == "output": - baseline_output = ev.data # type: ignore[assignment] + 
baseline_updates.append(ev.data) # type: ignore[arg-type] if ev.type == "status" and ev.state == WorkflowRunState.IDLE: break - assert baseline_output is not None + assert baseline_updates checkpoints = await storage.list_checkpoints(workflow_name=wf.name) assert checkpoints @@ -214,21 +253,22 @@ async def test_sequential_checkpoint_runtime_only() -> None: resumed_agents = (_EchoAgent(id="agent1", name="A1"), _EchoAgent(id="agent2", name="A2")) wf_resume = SequentialBuilder(participants=list(resumed_agents)).build() - resumed_output: list[Message] | None = None + resumed_updates: list[AgentResponseUpdate] = [] async for ev in wf_resume.run( checkpoint_id=resume_checkpoint.checkpoint_id, checkpoint_storage=storage, stream=True ): if ev.type == "output": - resumed_output = ev.data # type: ignore[assignment] + resumed_updates.append(ev.data) # type: ignore[arg-type] if ev.type == "status" and ev.state in ( WorkflowRunState.IDLE, WorkflowRunState.IDLE_WITH_PENDING_REQUESTS, ): break - assert resumed_output is not None - assert [m.role for m in resumed_output] == [m.role for m in baseline_output] - assert [m.text for m in resumed_output] == [m.text for m in baseline_output] + assert resumed_updates + baseline_text = "".join(u.text for u in baseline_updates if hasattr(u, "text")) + resumed_text = "".join(u.text for u in resumed_updates if hasattr(u, "text")) + assert baseline_text == resumed_text async def test_sequential_checkpoint_runtime_overrides_buildtime() -> None: @@ -390,3 +430,47 @@ async def test_chain_only_agent_responses_three_agents() -> None: # a3 should see only A2's reply assert len(a3.last_messages) == 1 assert a3.last_messages[0].role == "assistant" and "A2 reply" in (a3.last_messages[0].text or "") + + +# --------------------------------------------------------------------------- +# with_request_info tests +# --------------------------------------------------------------------------- + + +async def 
test_sequential_request_info_last_participant_emits_output() -> None: + """When the last participant is wrapped via with_request_info(), the workflow + still emits a terminal output event after approval. + + This exercises the _EndWithConversation.end_with_agent_executor_response path + that converts the AgentApprovalExecutor's forwarded AgentExecutorResponse into + the workflow's final AgentResponse output. + """ + from agent_framework_orchestrations._orchestration_request_info import AgentRequestInfoResponse + + a1 = _EchoAgent(id="agent1", name="A1") + a2 = _EchoAgent(id="agent2", name="A2") + + wf = SequentialBuilder(participants=[a1, a2]).with_request_info().build() + + # First run: collect request_info events for both agents + request_events: list[Any] = [] + async for ev in wf.run("hello with approval", stream=True): + if ev.type == "request_info" and isinstance(ev.data, AgentExecutorResponse): + request_events.append(ev) + + # Approve each agent in sequence until the workflow completes + while request_events: + responses = {req.request_id: AgentRequestInfoResponse.approve() for req in request_events} + request_events = [] + output_events: list[Any] = [] + async for ev in wf.run(stream=True, responses=responses): + if ev.type == "request_info" and isinstance(ev.data, AgentExecutorResponse): + request_events.append(ev) + elif ev.type == "output": + output_events.append(ev) + + # The workflow must produce a terminal output with the last agent's response. 
+ assert len(output_events) == 1 + response = output_events[0].data + assert isinstance(response, AgentResponse) + assert any("A2 reply" in m.text for m in response.messages) diff --git a/python/samples/03-workflows/agents/sequential_workflow_as_agent.py b/python/samples/03-workflows/agents/sequential_workflow_as_agent.py index 52de975173..120bd448aa 100644 --- a/python/samples/03-workflows/agents/sequential_workflow_as_agent.py +++ b/python/samples/03-workflows/agents/sequential_workflow_as_agent.py @@ -26,8 +26,8 @@ You can safely ignore them when focusing on agent progress. Prerequisites: -- FOUNDRY_PROJECT_ENDPOINT must be your Azure AI Foundry Agent Service (V2) project endpoint. -- FOUNDRY_MODEL must be set to your Azure OpenAI model deployment name. +- FOUNDRY_PROJECT_ENDPOINT must be set to the Azure Foundry project endpoint. +- FOUNDRY_MODEL must be set to the model name for the Foundry chat client. """ @@ -68,28 +68,18 @@ async def main() -> None: """ Sample Output: - ===== Final Conversation ===== + ===== Conversation ===== ------------------------------------------------------------ - 01 [user] - Write a tagline for a budget-friendly eBike. - ------------------------------------------------------------ - 02 [writer] - Ride farther, spend less—your affordable eBike adventure starts here. - ------------------------------------------------------------ - 03 [reviewer] - This tagline clearly communicates affordability and the benefit of extended travel, making it - appealing to budget-conscious consumers. It has a friendly and motivating tone, though it could - be slightly shorter for more punch. Overall, a strong and effective suggestion! - - ===== as_agent() Conversation ===== - ------------------------------------------------------------ - 01 [writer] - Go electric, save big—your affordable ride awaits! - ------------------------------------------------------------ - 02 [reviewer] + 01 [reviewer] Catchy and straightforward! 
The tagline clearly emphasizes both the electric aspect and the affordability of the eBike. It's inviting and actionable. For even more impact, consider making it slightly shorter: "Go electric, save big." Overall, this is an effective and appealing suggestion for a budget-friendly eBike. + + Note: + `workflow.as_agent()` returns ONLY the final agent's response (the "answer") — the prior agents' work + is not included in the response. To observe intermediate agents while running as an agent, build with + `SequentialBuilder(participants=[...], intermediate_outputs=True)`; the intermediate replies are then + surfaced as `data` events and merged into the AgentResponse. """ diff --git a/python/samples/03-workflows/orchestrations/sequential_custom_executors.py b/python/samples/03-workflows/orchestrations/sequential_custom_executors.py index a4fb2d602b..b907c24cdb 100644 --- a/python/samples/03-workflows/orchestrations/sequential_custom_executors.py +++ b/python/samples/03-workflows/orchestrations/sequential_custom_executors.py @@ -2,11 +2,11 @@ import asyncio import os -from typing import Any from agent_framework import ( Agent, AgentExecutorResponse, + AgentResponse, Executor, Message, WorkflowContext, @@ -16,6 +16,7 @@ from agent_framework.orchestrations import SequentialBuilder from azure.identity import AzureCliCredential from dotenv import load_dotenv +from typing_extensions import Never # Load environment variables from .env file load_dotenv() @@ -25,13 +26,14 @@ This demonstrates how SequentialBuilder chains participants with a shared conversation context (list[Message]). An agent produces content; a custom -executor appends a compact summary to the conversation. The workflow completes -after all participants have executed in sequence, and the final output contains -the complete conversation. +executor synthesizes a compact summary and yields it as the workflow's terminal +output. 
Custom executor contract: -- Provide at least one @handler accepting AgentExecutorResponse and a WorkflowContext[list[Message]] -- Emit the updated conversation via ctx.send_message([...]) +- Intermediate custom executors: handle the message type from the prior participant + and forward `list[Message]` via `ctx.send_message(...)` for the next participant. +- Terminator custom executors: handle the message type from the prior participant and + yield the workflow's final answer as an `AgentResponse` via `ctx.yield_output(...)`. Prerequisites: - FOUNDRY_PROJECT_ENDPOINT must be your Azure AI Foundry Agent Service (V2) project endpoint. @@ -41,27 +43,29 @@ class Summarizer(Executor): - """Simple summarizer: consumes full conversation and appends an assistant summary.""" + """Terminator custom executor: synthesizes a one-line summary as the workflow's final answer.""" @handler - async def summarize(self, agent_response: AgentExecutorResponse, ctx: WorkflowContext[list[Message]]) -> None: - """Append a summary message to a copy of the full conversation. - - Note: A custom executor must be able to handle the message type from the prior participant, and produce - the message type expected by the next participant. In this case, the prior participant is an agent thus - the input is AgentExecutorResponse (an agent will be wrapped in an AgentExecutor, which produces - `AgentExecutorResponse`). If the next participant is also an agent or this is the final participant, - the output must be `list[Message]`. + async def summarize( + self, + agent_response: AgentExecutorResponse, + ctx: WorkflowContext[Never, AgentResponse], + ) -> None: + """Yield a terminal AgentResponse containing the summary. + + The prior participant is an agent, which is wrapped in an `AgentExecutor` that + produces `AgentExecutorResponse`. 
As the last participant in the sequential workflow, + this executor calls `ctx.yield_output(AgentResponse(...))` so its output becomes the + workflow's terminal output (rather than being forwarded to a downstream participant). """ if not agent_response.full_conversation: - await ctx.send_message([Message("assistant", ["No conversation to summarize."])]) + await ctx.yield_output(AgentResponse(messages=[Message("assistant", ["No conversation to summarize."])])) return users = sum(1 for m in agent_response.full_conversation if m.role == "user") assistants = sum(1 for m in agent_response.full_conversation if m.role == "assistant") summary = Message("assistant", [f"Summary -> users:{users} assistants:{assistants}"]) - final_conversation = list(agent_response.full_conversation) + [summary] - await ctx.send_message(final_conversation) + await ctx.yield_output(AgentResponse(messages=[summary])) async def main() -> None: @@ -81,33 +85,20 @@ async def main() -> None: summarizer = Summarizer(id="summarizer") workflow = SequentialBuilder(participants=[content, summarizer]).build() - # 3) Run workflow and extract final conversation + # 3) Run workflow and extract the final summary events = await workflow.run("Explain the benefits of budget eBikes for commuters.") outputs = events.get_outputs() if outputs: - print("===== Final Conversation =====") - messages: list[Message] | Any = outputs[0] - for i, msg in enumerate(messages, start=1): - name = msg.author_name or ("assistant" if msg.role == "assistant" else "user") - print(f"{'-' * 60}\n{i:02d} [{name}]\n{msg.text}") + print("===== Final Summary =====") + final: AgentResponse = outputs[0] + for msg in final.messages: + print(msg.text) """ Sample Output: - ------------------------------------------------------------ - 01 [user] - Explain the benefits of budget eBikes for commuters. 
- ------------------------------------------------------------ - 02 [content] - Budget eBikes offer commuters an affordable, eco-friendly alternative to cars and public transport. - Their electric assistance reduces physical strain and allows riders to cover longer distances quickly, - minimizing travel time and fatigue. Budget models are low-cost to maintain and operate, making them accessible - for a wider range of people. Additionally, eBikes help reduce traffic congestion and carbon emissions, - supporting greener urban environments. Overall, budget eBikes provide cost-effective, efficient, and - sustainable transportation for daily commuting needs. - ------------------------------------------------------------ - 03 [assistant] + ===== Final Summary ===== Summary -> users:1 assistants:1 """ diff --git a/python/uv.lock b/python/uv.lock index 4e60e5cfd6..8fb84afa44 100644 --- a/python/uv.lock +++ b/python/uv.lock @@ -585,7 +585,7 @@ dependencies = [ [package.metadata] requires-dist = [ { name = "agent-framework-core", editable = "packages/core" }, - { name = "github-copilot-sdk", marker = "python_full_version >= '3.11'", specifier = "<=0.2.1,>=0.2.1" }, + { name = "github-copilot-sdk", marker = "python_full_version >= '3.11'", specifier = ">=0.2.1,<=0.2.1" }, ] [[package]]