From f3db021f31cbe7fdf547b4e6e40b1050a3f78c24 Mon Sep 17 00:00:00 2001 From: Tao Chen Date: Fri, 7 Nov 2025 15:20:55 -0800 Subject: [PATCH 1/7] Handle agent user input request in AgentExecutor --- .../_workflows/_agent_executor.py | 180 +++++++--- .../tests/workflow/test_agent_executor.py | 4 + .../test_agent_executor_tool_calls.py | 245 ++++++++++++- .../agents_with_approval_requests.py | 333 ++++++++++++++++++ 4 files changed, 716 insertions(+), 46 deletions(-) create mode 100644 python/samples/getting_started/workflows/human-in-the-loop/agents_with_approval_requests.py diff --git a/python/packages/core/agent_framework/_workflows/_agent_executor.py b/python/packages/core/agent_framework/_workflows/_agent_executor.py index 0c05abbb69..6882bfe749 100644 --- a/python/packages/core/agent_framework/_workflows/_agent_executor.py +++ b/python/packages/core/agent_framework/_workflows/_agent_executor.py @@ -2,11 +2,14 @@ import logging from dataclasses import dataclass -from typing import Any +from typing import Any, cast + +from agent_framework import FunctionApprovalRequestContent, FunctionApprovalResponseContent from .._agents import AgentProtocol, ChatAgent from .._threads import AgentThread from .._types import AgentRunResponse, AgentRunResponseUpdate, ChatMessage +from ._checkpoint_encoding import decode_checkpoint_value, encode_checkpoint_value from ._conversation_state import encode_chat_messages from ._events import ( AgentRunEvent, @@ -14,6 +17,7 @@ ) from ._executor import Executor, handler from ._message_utils import normalize_messages_input +from ._request_info_mixin import response_handler from ._workflow_context import WorkflowContext logger = logging.getLogger(__name__) @@ -83,6 +87,8 @@ def __init__( super().__init__(exec_id) self._agent = agent self._agent_thread = agent_thread or self._agent.get_new_thread() + self._pending_agent_requests: dict[str, FunctionApprovalRequestContent] = {} + self._pending_responses_to_agent: list[FunctionApprovalResponseContent] = [] self._output_response = output_response self._cache: list[ChatMessage] = [] @@ -93,50 +99,6 @@ def workflow_output_types(self) -> list[type[Any]]: return [AgentRunResponse] return [] - async def _run_agent_and_emit(self, ctx: WorkflowContext[AgentExecutorResponse, AgentRunResponse]) -> None: - """Execute the underlying agent, emit events, and enqueue response. - - Checks ctx.is_streaming() to determine whether to emit incremental AgentRunUpdateEvent - events (streaming mode) or a single AgentRunEvent (non-streaming mode). - """ - if ctx.is_streaming(): - # Streaming mode: emit incremental updates - updates: list[AgentRunResponseUpdate] = [] - async for update in self._agent.run_stream( - self._cache, - thread=self._agent_thread, - ): - updates.append(update) - await ctx.add_event(AgentRunUpdateEvent(self.id, update)) - - if isinstance(self._agent, ChatAgent): - response_format = self._agent.chat_options.response_format - response = AgentRunResponse.from_agent_run_response_updates( - updates, - output_format_type=response_format, - ) - else: - response = AgentRunResponse.from_agent_run_response_updates(updates) - else: - # Non-streaming mode: use run() and emit single event - response = await self._agent.run( - self._cache, - thread=self._agent_thread, - ) - await ctx.add_event(AgentRunEvent(self.id, response)) - - if self._output_response: - await ctx.yield_output(response) - - # Always construct a full conversation snapshot from inputs (cache) - # plus agent outputs (agent_run_response.messages). Do not mutate - # response.messages so AgentRunEvent remains faithful to the raw output. - full_conversation: list[ChatMessage] = list(self._cache) + list(response.messages) - - agent_response = AgentExecutorResponse(self.id, response, full_conversation=full_conversation) - await ctx.send_message(agent_response) - self._cache.clear() - @handler async def run( self, request: AgentExecutorRequest, ctx: WorkflowContext[AgentExecutorResponse, AgentRunResponse] @@ -192,6 +154,31 @@ async def from_messages( self._cache = normalize_messages_input(messages) await self._run_agent_and_emit(ctx) + @response_handler + async def handle_user_input_response( + self, + original_request: FunctionApprovalRequestContent, + response: FunctionApprovalResponseContent, + ctx: WorkflowContext[AgentExecutorResponse, AgentRunResponse], + ) -> None: + """Handle user input responses for function approvals during agent execution. + + This will hold the executor's execution until all pending user input requests are resolved. + + Args: + original_request: The original function approval request sent by the agent. + response: The user's response to the function approval request. + ctx: The workflow context for emitting events and outputs. + """ + self._pending_responses_to_agent.append(response) + self._pending_agent_requests.pop(original_request.id, None) + + if not self._pending_agent_requests: + # All pending requests have been resolved; resume agent execution + self._cache = normalize_messages_input(ChatMessage(role="user", contents=self._pending_responses_to_agent)) + self._pending_responses_to_agent.clear() + await self._run_agent_and_emit(ctx) + async def snapshot_state(self) -> dict[str, Any]: """Capture current executor state for checkpointing. @@ -226,6 +213,8 @@ async def snapshot_state(self) -> dict[str, Any]: return { "cache": encode_chat_messages(self._cache), "agent_thread": serialized_thread, + "pending_agent_requests": encode_checkpoint_value(self._pending_agent_requests), + "pending_responses_to_agent": encode_checkpoint_value(self._pending_responses_to_agent), } async def restore_state(self, state: dict[str, Any]) -> None: @@ -258,7 +247,108 @@ async def restore_state(self, state: dict[str, Any]) -> None: else: self._agent_thread = self._agent.get_new_thread() + pending_requests_payload = state.get("pending_agent_requests") + if pending_requests_payload: + self._pending_agent_requests = decode_checkpoint_value(pending_requests_payload) + + pending_responses_payload = state.get("pending_responses_to_agent") + if pending_responses_payload: + self._pending_responses_to_agent = decode_checkpoint_value(pending_responses_payload) + def reset(self) -> None: """Reset the internal cache of the executor.""" logger.debug("AgentExecutor %s: Resetting cache", self.id) self._cache.clear() + + async def _run_agent_and_emit(self, ctx: WorkflowContext[AgentExecutorResponse, AgentRunResponse]) -> None: + """Execute the underlying agent, emit events, and enqueue response. + + Checks ctx.is_streaming() to determine whether to emit incremental AgentRunUpdateEvent + events (streaming mode) or a single AgentRunEvent (non-streaming mode). + """ + if ctx.is_streaming(): + # Streaming mode: emit incremental updates + response = await self._run_agent_streaming(cast(WorkflowContext, ctx)) + else: + # Non-streaming mode: use run() and emit single event + response = await self._run_agent(cast(WorkflowContext, ctx)) + + if response is None: + # Agent did not complete (e.g., waiting for user input); do not emit response + return + + if self._output_response: + await ctx.yield_output(response) + + # Always construct a full conversation snapshot from inputs (cache) + # plus agent outputs (agent_run_response.messages). Do not mutate + # response.messages so AgentRunEvent remains faithful to the raw output. + full_conversation: list[ChatMessage] = list(self._cache) + list(response.messages) + + agent_response = AgentExecutorResponse(self.id, response, full_conversation=full_conversation) + await ctx.send_message(agent_response) + self._cache.clear() + + async def _run_agent(self, ctx: WorkflowContext) -> AgentRunResponse | None: + """Execute the underlying agent in non-streaming mode. + + Args: + ctx: The workflow context for emitting events. + + Returns: + The complete AgentRunResponse, or None if waiting for user input. + """ + response = await self._agent.run( + self._cache, + thread=self._agent_thread, + ) + await ctx.add_event(AgentRunEvent(self.id, response)) + + # Handle any user input requests + if response.user_input_requests: + for user_input_request in response.user_input_requests: + self._pending_agent_requests[user_input_request.id] = user_input_request + await ctx.request_info(user_input_request, FunctionApprovalResponseContent) + return None + + return response + + async def _run_agent_streaming(self, ctx: WorkflowContext) -> AgentRunResponse | None: + """Execute the underlying agent in streaming mode and collect the full response. + + Args: + ctx: The workflow context for emitting events. + + Returns: + The complete AgentRunResponse, or None if waiting for user input. + """ + updates: list[AgentRunResponseUpdate] = [] + user_input_requests: list[FunctionApprovalRequestContent] = [] + async for update in self._agent.run_stream( + self._cache, + thread=self._agent_thread, + ): + updates.append(update) + await ctx.add_event(AgentRunUpdateEvent(self.id, update)) + + if update.user_input_requests: + user_input_requests.extend(update.user_input_requests) + + # Build the final AgentRunResponse from the collected updates + if isinstance(self._agent, ChatAgent): + response_format = self._agent.chat_options.response_format + response = AgentRunResponse.from_agent_run_response_updates( + updates, + output_format_type=response_format, + ) + else: + response = AgentRunResponse.from_agent_run_response_updates(updates) + + # Handle any user input requests after the streaming completes + if user_input_requests: + for user_input_request in user_input_requests: + self._pending_agent_requests[user_input_request.id] = user_input_request + await ctx.request_info(user_input_request, FunctionApprovalResponseContent) + return None + + return response diff --git a/python/packages/core/tests/workflow/test_agent_executor.py b/python/packages/core/tests/workflow/test_agent_executor.py index 3bda2fcaad..a339265665 100644 --- a/python/packages/core/tests/workflow/test_agent_executor.py +++ b/python/packages/core/tests/workflow/test_agent_executor.py @@ -111,6 +111,10 @@ async def test_agent_executor_checkpoint_stores_and_restores_state() -> None: chat_store_state = thread_state["chat_message_store_state"] # type: ignore[index] assert "messages" in chat_store_state, "Message store state should include messages" + # Verify checkpoint contains pending requests from agents and responses to be sent + assert "pending_agent_requests" in restore_checkpoint.shared_state + assert "pending_responses_to_agent" in restore_checkpoint.shared_state + # Create a new agent and executor for restoration # This simulates starting from a fresh state and restoring from checkpoint restored_agent = _CountingAgent(id="test_agent", name="TestAgent") diff --git a/python/packages/core/tests/workflow/test_agent_executor_tool_calls.py b/python/packages/core/tests/workflow/test_agent_executor_tool_calls.py index 8124f6253d..b0729d45ab 100644 --- a/python/packages/core/tests/workflow/test_agent_executor_tool_calls.py +++ b/python/packages/core/tests/workflow/test_agent_executor_tool_calls.py @@ -3,21 +3,32 @@ """Tests for AgentExecutor handling of tool calls and results in streaming mode.""" from collections.abc import AsyncIterable -from typing import Any +from typing import Any, Never from agent_framework import ( AgentExecutor, + AgentExecutorResponse, AgentRunResponse, AgentRunResponseUpdate, AgentRunUpdateEvent, AgentThread, BaseAgent, + ChatAgent, ChatMessage, + ChatResponse, + ChatResponseUpdate, + FunctionApprovalRequestContent, FunctionCallContent, FunctionResultContent, + RequestInfoEvent, Role, TextContent, WorkflowBuilder, + WorkflowContext, + WorkflowOutputEvent, + ai_function, + executor, + use_function_invocation, ) @@ -120,3 +131,235 @@ async def test_agent_executor_emits_tool_calls_in_streaming_mode() -> None: assert events[3].data is not None assert isinstance(events[3].data.contents[0], TextContent) assert "sunny" in events[3].data.contents[0].text + + +@ai_function(approval_mode="always_require") +def mock_tool_requiring_approval(query: str) -> str: + """Mock tool that requires approval before execution.""" + return f"Executed tool with query: {query}" + + +@use_function_invocation +class MockChatClient: + """Simple implementation of a chat client.""" + + def __init__(self, parallel_request: bool = False) -> None: + self.additional_properties: dict[str, Any] = {} + self._iteration: int = 0 + self._parallel_request: bool = parallel_request + + async def get_response( + self, + messages: str | ChatMessage | list[str] | list[ChatMessage], + **kwargs: Any, + ) -> ChatResponse: + if self._iteration == 0: + if self._parallel_request: + response = ChatResponse( + messages=ChatMessage( + role="assistant", + contents=[ + FunctionCallContent( + call_id="1", name="mock_tool_requiring_approval", arguments='{"query": "test"}' + ), + FunctionCallContent( + call_id="2", name="mock_tool_requiring_approval", arguments='{"query": "test"}' + ), + ], + ) + ) + else: + response = ChatResponse( + messages=ChatMessage( + role="assistant", + contents=[ + FunctionCallContent( + call_id="1", name="mock_tool_requiring_approval", arguments='{"query": "test"}' + ) + ], + ) + ) + else: + response = ChatResponse(messages=ChatMessage(role="assistant", text="Tool executed successfully.")) + + self._iteration += 1 + return response + + async def get_streaming_response( + self, + messages: str | ChatMessage | list[str] | list[ChatMessage], + **kwargs: Any, + ) -> AsyncIterable[ChatResponseUpdate]: + if self._iteration == 0: + if self._parallel_request: + yield ChatResponseUpdate( + contents=[ + FunctionCallContent( + call_id="1", name="mock_tool_requiring_approval", arguments='{"query": "test"}' + ), + FunctionCallContent( + call_id="2", name="mock_tool_requiring_approval", arguments='{"query": "test"}' + ), + ], + role="assistant", + ) + else: + yield ChatResponseUpdate( + contents=[ + FunctionCallContent( + call_id="1", name="mock_tool_requiring_approval", arguments='{"query": "test"}' + ) + ], + role="assistant", + ) + else: + yield ChatResponseUpdate(text=TextContent(text="Tool executed "), role="assistant") + yield ChatResponseUpdate(contents=[TextContent(text="successfully.")], role="assistant") + + self._iteration += 1 + + +@executor(id="test_executor") +async def test_executor(agent_executor_response: AgentExecutorResponse, ctx: WorkflowContext[Never, str]) -> None: + await ctx.yield_output(agent_executor_response.agent_run_response.text) + + +async def test_agent_executor_tool_call_with_approval() -> None: + """Test that AgentExecutor handles tool calls requiring approval.""" + # Arrange + agent = ChatAgent( + chat_client=MockChatClient(), + name="ApprovalAgent", + tools=[mock_tool_requiring_approval], + ) + + workflow = WorkflowBuilder().set_start_executor(agent).add_edge(agent, test_executor).build() + + # Act + events = await workflow.run("Invoke tool requiring approval") + + # Assert + assert len(events.get_request_info_events()) == 1 + approval_request = events.get_request_info_events()[0] + assert isinstance(approval_request.data, FunctionApprovalRequestContent) + assert approval_request.data.function_call.name == "mock_tool_requiring_approval" + assert approval_request.data.function_call.arguments == '{"query": "test"}' + + # Act + events = await workflow.send_responses({approval_request.request_id: approval_request.data.create_response(True)}) + + # Assert + final_response = events.get_outputs() + assert len(final_response) == 1 + assert final_response[0] == "Tool executed successfully." + + +async def test_agent_executor_tool_call_with_approval_streaming() -> None: + """Test that AgentExecutor handles tool calls requiring approval in streaming mode.""" + # Arrange + agent = ChatAgent( + chat_client=MockChatClient(), + name="ApprovalAgent", + tools=[mock_tool_requiring_approval], + ) + + workflow = WorkflowBuilder().set_start_executor(agent).add_edge(agent, test_executor).build() + + # Act + request_info_events: list[RequestInfoEvent] = [] + async for event in workflow.run_stream("Invoke tool requiring approval"): + if isinstance(event, RequestInfoEvent): + request_info_events.append(event) + + # Assert + assert len(request_info_events) == 1 + approval_request = request_info_events[0] + assert isinstance(approval_request.data, FunctionApprovalRequestContent) + assert approval_request.data.function_call.name == "mock_tool_requiring_approval" + assert approval_request.data.function_call.arguments == '{"query": "test"}' + + # Act + output: str | None = None + async for event in workflow.send_responses_streaming({ + approval_request.request_id: approval_request.data.create_response(True) + }): + if isinstance(event, WorkflowOutputEvent): + output = event.data + + # Assert + assert output is not None + assert output == "Tool executed successfully." + + +async def test_agent_executor_parallel_tool_call_with_approval() -> None: + """Test that AgentExecutor handles parallel tool calls requiring approval.""" + # Arrange + agent = ChatAgent( + chat_client=MockChatClient(parallel_request=True), + name="ApprovalAgent", + tools=[mock_tool_requiring_approval], + ) + + workflow = WorkflowBuilder().set_start_executor(agent).add_edge(agent, test_executor).build() + + # Act + events = await workflow.run("Invoke tool requiring approval") + + # Assert + assert len(events.get_request_info_events()) == 2 + for approval_request in events.get_request_info_events(): + assert isinstance(approval_request.data, FunctionApprovalRequestContent) + assert approval_request.data.function_call.name == "mock_tool_requiring_approval" + assert approval_request.data.function_call.arguments == '{"query": "test"}' + + # Act + responses = { + approval_request.request_id: approval_request.data.create_response(True) # type: ignore + for approval_request in events.get_request_info_events() + } + events = await workflow.send_responses(responses) + + # Assert + final_response = events.get_outputs() + assert len(final_response) == 1 + assert final_response[0] == "Tool executed successfully." + + +async def test_agent_executor_parallel_tool_call_with_approval_streaming() -> None: + """Test that AgentExecutor handles parallel tool calls requiring approval in streaming mode.""" + # Arrange + agent = ChatAgent( + chat_client=MockChatClient(parallel_request=True), + name="ApprovalAgent", + tools=[mock_tool_requiring_approval], + ) + + workflow = WorkflowBuilder().set_start_executor(agent).add_edge(agent, test_executor).build() + + # Act + request_info_events: list[RequestInfoEvent] = [] + async for event in workflow.run_stream("Invoke tool requiring approval"): + if isinstance(event, RequestInfoEvent): + request_info_events.append(event) + + # Assert + assert len(request_info_events) == 2 + for approval_request in request_info_events: + assert isinstance(approval_request.data, FunctionApprovalRequestContent) + assert approval_request.data.function_call.name == "mock_tool_requiring_approval" + assert approval_request.data.function_call.arguments == '{"query": "test"}' + + # Act + responses = { + approval_request.request_id: approval_request.data.create_response(True) # type: ignore + for approval_request in request_info_events + } + + output: str | None = None + async for event in workflow.send_responses_streaming(responses): + if isinstance(event, WorkflowOutputEvent): + output = event.data + + # Assert + assert output is not None + assert output == "Tool executed successfully." diff --git a/python/samples/getting_started/workflows/human-in-the-loop/agents_with_approval_requests.py b/python/samples/getting_started/workflows/human-in-the-loop/agents_with_approval_requests.py new file mode 100644 index 0000000000..d5b914455b --- /dev/null +++ b/python/samples/getting_started/workflows/human-in-the-loop/agents_with_approval_requests.py @@ -0,0 +1,333 @@ +# Copyright (c) Microsoft. All rights reserved. + +import asyncio +import json +from dataclasses import dataclass +from typing import Annotated, Never + +from agent_framework import ( + AgentExecutorResponse, + ChatMessage, + Executor, + FunctionApprovalRequestContent, + FunctionApprovalResponseContent, + WorkflowBuilder, + WorkflowContext, + ai_function, + executor, + handler, +) +from agent_framework.openai import OpenAIChatClient + +""" +Sample: Agents in a workflow with AI functions requiring approval + +This sample creates a workflow that automatically reply to incoming emails. +If historical email data is needed, it uses an AI function to read the data, +which requires human approval before execution. + +This sample works as follows: +1. An incoming email is received by the workflow. +2. The EmailPreprocessor executor preprocesses the email, adding special notes if the sender is important. +3. The preprocessed email is sent to the Email Writer agent, which generates a response. +4. If the agent needs to read historical email data, it calls the read_historical_email_data AI function, + which triggers an approval request. +5. The sample automatically approves the request for demonstration purposes. +6. Once approved, the AI function executes and returns the historical email data to the agent. +7. The agent uses the historical data to compose a comprehensive email response. +8. The response is sent to the conclude_workflow_executor, which yields the final response. + +Purpose: +Show how to integrate AI functions with approval requests into a workflow. + +Demonstrate: +- Creating AI functions that require approval before execution. +- Building a workflow that includes an agent and executors. +- Handling approval requests during workflow execution. + +Prerequisites: +- Azure AI Agent Service configured, along with the required environment variables. +- Authentication via azure-identity. Use AzureCliCredential and run az login before executing the sample. +- Basic familiarity with WorkflowBuilder, edges, events, RequestInfoEvent, and streaming runs. +""" + + +@ai_function +def get_current_date() -> str: + """Get the current date in YYYY-MM-DD format.""" + # For demonstration purposes, we return a fixed date. + return "2025-11-07" + + +@ai_function +def get_team_members_email_addresses() -> list[dict[str, str]]: + """Get the email addresses of team members.""" + # In a real implementation, this might query a database or directory service. + return [ + { + "name": "Alice", + "email": "alice@contoso.com", + "position": "Software Engineer", + "manager": "John Doe", + }, + { + "name": "Bob", + "email": "bob@contoso.com", + "position": "Product Manager", + "manager": "John Doe", + }, + { + "name": "Charlie", + "email": "charlie@contoso.com", + "position": "Senior Software Engineer", + "manager": "John Doe", + }, + { + "name": "Mike", + "email": "mike@contoso.com", + "position": "Principal Software Engineer Manager", + "manager": "VP of Engineering", + }, + ] + + +@ai_function +def get_my_information() -> dict[str, str]: + """Get my personal information.""" + return { + "name": "John Doe", + "email": "john@contoso.com", + "position": "Software Engineer Manager", + "manager": "Mike", + } + + +@ai_function(approval_mode="always_require") +async def read_historical_email_data( + email_address: Annotated[str, "The email address to read historical data from"], + start_date: Annotated[str, "The start date in YYYY-MM-DD format"], + end_date: Annotated[str, "The end date in YYYY-MM-DD format"], +) -> list[dict[str, str]]: + """Read historical email data for a given email address and date range.""" + historical_data = { + "alice@contoso.com": [ + { + "from": "alice@contoso.com", + "to": "john@contoso.com", + "date": "2025-11-05", + "subject": "Bug Bash Results", + "body": "We just completed the bug bash and found a few issues that need immediate attention.", + }, + { + "from": "alice@contoso.com", + "to": "john@contoso.com", + "date": "2025-11-03", + "subject": "Code Freeze", + "body": "We are entering code freeze starting tomorrow.", + }, + ], + "bob@contoso.com": [ + { + "from": "bob@contoso.com", + "to": "john@contoso.com", + "date": "2025-11-04", + "subject": "Team Outing", + "body": "Don't forget about the team outing this Friday!", + }, + { + "from": "bob@contoso.com", + "to": "john@contoso.com", + "date": "2025-11-02", + "subject": "Requirements Update", + "body": "The requirements for the new feature have been updated. Please review them.", + }, + ], + "charlie@contoso.com": [ + { + "from": "charlie@contoso.com", + "to": "john@contoso.com", + "date": "2025-11-05", + "subject": "Project Update", + "body": "The bug bash went well. A few critical bugs but should be fixed by the end of the week.", + }, + { + "from": "charlie@contoso.com", + "to": "john@contoso.com", + "date": "2025-11-06", + "subject": "Code Review", + "body": "Please review my latest code changes.", + }, + ], + } + + emails = historical_data.get(email_address, []) + return [email for email in emails if start_date <= email["date"] <= end_date] + + +@ai_function(approval_mode="always_require") +async def send_email( + to: Annotated[str, "The recipient email address"], + subject: Annotated[str, "The email subject"], + body: Annotated[str, "The email body"], +) -> str: + """Send an email.""" + await asyncio.sleep(1) # Simulate sending email + return "Email successfully sent." + + +@dataclass +class Email: + sender: str + subject: str + body: str + + +class EmailPreprocessor(Executor): + def __init__(self, special_email_addresses: set[str]) -> None: + super().__init__(id="email_preprocessor") + self.special_email_addresses = special_email_addresses + + @handler + async def preprocess(self, email: Email, ctx: WorkflowContext[str]) -> None: + """Preprocess the incoming email.""" + message = str(email) + if email.sender in self.special_email_addresses: + note = ( + "Pay special attention to this sender. This email is very important. " + "Gather relevant information from all previous emails within my team before responding." + ) + message = f"{note}\n\n{message}" + + await ctx.send_message(message) + + +@executor(id="conclude_workflow_executor") +async def conclude_workflow( + email_response: AgentExecutorResponse, + ctx: WorkflowContext[Never, str], +) -> None: + """Conclude the workflow by yielding the final email response.""" + await ctx.yield_output(email_response.agent_run_response.text) + + +async def main() -> None: + chat_client = OpenAIChatClient() + email_writer = chat_client.create_agent( + name="Email Writer", + instructions=("You are an excellent email assistant. You respond to incoming emails."), + tools=[ + read_historical_email_data, + send_email, + get_current_date, + get_team_members_email_addresses, + get_my_information, + ], + ) + + email_preprocessor = EmailPreprocessor(special_email_addresses={"mike@contoso.com"}) + workflow = ( + WorkflowBuilder() + .set_start_executor(email_preprocessor) + .add_edge(email_preprocessor, email_writer) + .add_edge(email_writer, conclude_workflow) + .build() + ) + + # Simulate an incoming email + incoming_email = Email( + sender="mike@contoso.com", + subject="Important: Project Update", + body="Please provide your team's status update on the project since last week.", + ) + + responses: dict[str, FunctionApprovalResponseContent] = {} + output: list[ChatMessage] | None = None + while True: + if responses: + events = await workflow.send_responses(responses) + responses.clear() + else: + events = await workflow.run(incoming_email) + + request_info_events = events.get_request_info_events() + for request_info_event in request_info_events: + if not isinstance(request_info_event.data, FunctionApprovalRequestContent): + raise ValueError(f"Unexpected request info content type: {type(request_info_event.data)}") + + arguments = json.dumps( + json.loads(request_info_event.data.function_call.arguments) + if isinstance(request_info_event.data.function_call.arguments, str) + else request_info_event.data.function_call.arguments, + indent=2, + ) + print( + f"Received approval request for function: {request_info_event.data.function_call.name} " + f"with args:\n{arguments}" + ) + print("Performing automatic approval for demo purposes...") + responses[request_info_event.request_id] = request_info_event.data.create_response(approved=True) + + if outputs := events.get_outputs(): + output = outputs[0] + break + + if not output: + raise RuntimeError("Workflow did not produce any output event.") + + print("Final email response conversation:") + print(output) + + """ + Sample Output: + Received approval request for function: read_historical_email_data with args: + { + "email_address": "alice@contoso.com", + "start_date": "2025-10-31", + "end_date": "2025-11-07" + } + Performing automatic approval for demo purposes... + Received approval request for function: read_historical_email_data with args: + { + "email_address": "bob@contoso.com", + "start_date": "2025-10-31", + "end_date": "2025-11-07" + } + Performing automatic approval for demo purposes... + Received approval request for function: read_historical_email_data with args: + { + "email_address": "charlie@contoso.com", + "start_date": "2025-10-31", + "end_date": "2025-11-07" + } + Performing automatic approval for demo purposes... + Received approval request for function: send_email with args: + { + "to": "mike@contoso.com", + "subject": "Team's Status Update on the Project", + "body": " + Hi Mike, + + Here's the status update from our team: + - **Bug Bash and Code Freeze:** + - We recently completed a bug bash, during which several issues were identified. Alice and Charlie are working on fixing these critical bugs, and we anticipate resolving them by the end of this week. + - We have entered a code freeze as of November 4, 2025. + + - **Requirements Update:** + - Bob has updated the requirements for a new feature, and all team members are reviewing these changes to ensure alignment. + + - **Ongoing Reviews:** + - Charlie has submitted his latest code changes for review to ensure they meet our quality standards. + + Please let me know if you need more detailed information or have any questions. + + Best regards, + John" + } + Performing automatic approval for demo purposes... + Final email response conversation: + I've sent the status update to Mike with the relevant information from the team. Let me know if there's anything else you need + """ # noqa: E501 + + +if __name__ == "__main__": + asyncio.run(main()) From 2929306138451fda905c8beb0fb894564ea8942c Mon Sep 17 00:00:00 2001 From: Tao Chen Date: Fri, 7 Nov 2025 15:40:43 -0800 Subject: [PATCH 2/7] fix test --- python/packages/core/tests/workflow/test_agent_executor.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/python/packages/core/tests/workflow/test_agent_executor.py b/python/packages/core/tests/workflow/test_agent_executor.py index a339265665..77fd969f12 100644 --- a/python/packages/core/tests/workflow/test_agent_executor.py +++ b/python/packages/core/tests/workflow/test_agent_executor.py @@ -112,8 +112,8 @@ async def test_agent_executor_checkpoint_stores_and_restores_state() -> None: assert "messages" in chat_store_state, "Message store state should include messages" # Verify checkpoint contains pending requests from agents and responses to be sent - assert "pending_agent_requests" in restore_checkpoint.shared_state - assert "pending_responses_to_agent" in restore_checkpoint.shared_state + assert "pending_agent_requests" in executor_state + assert "pending_responses_to_agent" in executor_state # Create a new agent and executor for restoration # This simulates starting from a fresh state and restoring from checkpoint From 9024888c5c6bef123bab11dfb79a49a21ea1e25e Mon Sep 17 00:00:00 2001 From: Tao Chen Date: Fri, 7 Nov 2025 15:49:08 -0800 Subject: [PATCH 3/7] Address comments --- .../human-in-the-loop/agents_with_approval_requests.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/python/samples/getting_started/workflows/human-in-the-loop/agents_with_approval_requests.py b/python/samples/getting_started/workflows/human-in-the-loop/agents_with_approval_requests.py index d5b914455b..88b8437ffb 100644 --- a/python/samples/getting_started/workflows/human-in-the-loop/agents_with_approval_requests.py +++ b/python/samples/getting_started/workflows/human-in-the-loop/agents_with_approval_requests.py @@ -22,7 +22,7 @@ """ Sample: Agents in a workflow with AI functions requiring approval -This sample creates a workflow that automatically reply to incoming emails. +This sample creates a workflow that automatically replies to incoming emails. If historical email data is needed, it uses an AI function to read the data, which requires human approval before execution. From 2b1495310a94101d549c92706ca7c63839763ff2 Mon Sep 17 00:00:00 2001 From: Tao Chen Date: Fri, 7 Nov 2025 15:55:31 -0800 Subject: [PATCH 4/7] Fix tests --- .../human-in-the-loop/agents_with_approval_requests.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/python/samples/getting_started/workflows/human-in-the-loop/agents_with_approval_requests.py b/python/samples/getting_started/workflows/human-in-the-loop/agents_with_approval_requests.py index 88b8437ffb..a7cae0a1b4 100644 --- a/python/samples/getting_started/workflows/human-in-the-loop/agents_with_approval_requests.py +++ b/python/samples/getting_started/workflows/human-in-the-loop/agents_with_approval_requests.py @@ -3,7 +3,7 @@ import asyncio import json from dataclasses import dataclass -from typing import Annotated, Never +from typing import Annotated from agent_framework import ( AgentExecutorResponse, @@ -18,6 +18,7 @@ handler, ) from agent_framework.openai import OpenAIChatClient +from typing_extensions import Never """ Sample: Agents in a workflow with AI functions requiring approval From 4c5c133ce14d9205231aa12d982fea90c44f12b9 Mon Sep 17 00:00:00 2001 From: Tao Chen Date: Fri, 7 Nov 2025 16:04:43 -0800 Subject: [PATCH 5/7] Fix tests --- .../core/tests/workflow/test_agent_executor_tool_calls.py | 4 +++- .../human-in-the-loop/agents_with_approval_requests.py | 3 +-- 2 files changed, 4 insertions(+), 3 deletions(-) diff --git a/python/packages/core/tests/workflow/test_agent_executor_tool_calls.py b/python/packages/core/tests/workflow/test_agent_executor_tool_calls.py index b0729d45ab..a7849120b0 100644 --- a/python/packages/core/tests/workflow/test_agent_executor_tool_calls.py +++ b/python/packages/core/tests/workflow/test_agent_executor_tool_calls.py @@ -3,7 +3,9 @@ """Tests for AgentExecutor handling of tool calls and results in streaming mode.""" from collections.abc import AsyncIterable -from typing import Any, Never +from typing import Any + +from typing_extensions import Never from agent_framework import ( AgentExecutor, diff --git a/python/samples/getting_started/workflows/human-in-the-loop/agents_with_approval_requests.py b/python/samples/getting_started/workflows/human-in-the-loop/agents_with_approval_requests.py index a7cae0a1b4..88b8437ffb 100644 --- a/python/samples/getting_started/workflows/human-in-the-loop/agents_with_approval_requests.py +++ b/python/samples/getting_started/workflows/human-in-the-loop/agents_with_approval_requests.py @@ -3,7 +3,7 @@ import asyncio import json from dataclasses import dataclass -from typing import Annotated +from typing import Annotated, Never from agent_framework import ( AgentExecutorResponse, @@ -18,7 +18,6 @@ handler, ) from agent_framework.openai import OpenAIChatClient -from typing_extensions import Never """ Sample: Agents in a workflow with AI functions requiring approval From 6e1b28ace6194b157ecfe5e07053da1b856696e6 Mon Sep 17 00:00:00 2001 From: Tao Chen Date: Mon, 10 Nov 2025 16:36:34 -0800 Subject: [PATCH 6/7] Address comments --- .../agent_framework/_workflows/_agent_executor.py | 1 + python/samples/README.md | 1 + python/samples/getting_started/workflows/README.md | 1 + .../agents_with_approval_requests.py | 14 +++++++++++++- 4 files changed, 16 insertions(+), 1 deletion(-) diff --git a/python/packages/core/agent_framework/_workflows/_agent_executor.py b/python/packages/core/agent_framework/_workflows/_agent_executor.py index 6882bfe749..8fa85b7f84 100644 --- a/python/packages/core/agent_framework/_workflows/_agent_executor.py +++ b/python/packages/core/agent_framework/_workflows/_agent_executor.py @@ -275,6 +275,7 @@ async def _run_agent_and_emit(self, ctx: WorkflowContext[AgentExecutorResponse, if response is None: # Agent did not complete (e.g., waiting for user input); do not emit response + logger.info("AgentExecutor %s: Agent did not complete, awaiting user input", self.id) return if self._output_response: diff --git a/python/samples/README.md b/python/samples/README.md index f70a390892..f29a9fa6b6 100644 --- a/python/samples/README.md +++ b/python/samples/README.md @@ -281,6 +281,7 @@ This directory contains samples demonstrating the capabilities of Microsoft Agen | File | Description | |------|-------------| | [`getting_started/workflows/human-in-the-loop/guessing_game_with_human_input.py`](./getting_started/workflows/human-in-the-loop/guessing_game_with_human_input.py) | Sample: Human in the loop guessing game | +| [`getting_started/workflows/human-in-the-loop/agents_with_approval_requests.py`](./getting_started/workflows/human-in-the-loop/agents_with_approval_requests.py) | Sample: Agents with Approval Requests in Workflows | ### Observability diff --git a/python/samples/getting_started/workflows/README.md b/python/samples/getting_started/workflows/README.md index 49cbb81e21..1def803373 100644 --- a/python/samples/getting_started/workflows/README.md +++ b/python/samples/getting_started/workflows/README.md @@ -78,6 +78,7 @@ Once comfortable with these, explore the rest of the samples below. |---|---|---| | Human-In-The-Loop (Guessing Game) | [human-in-the-loop/guessing_game_with_human_input.py](./human-in-the-loop/guessing_game_with_human_input.py) | Interactive request/response prompts with a human | | Azure Agents Tool Feedback Loop | [agents/azure_chat_agents_tool_calls_with_feedback.py](./agents/azure_chat_agents_tool_calls_with_feedback.py) | Two-agent workflow that streams tool calls and pauses for human guidance between passes | +| Agents with Approval Requests in Workflows | [human-in-the-loop/agents_with_approval_requests.py](./human-in-the-loop/agents_with_approval_requests.py) | Agents that create approval requests during workflow execution and wait for human approval to proceed | ### observability diff --git a/python/samples/getting_started/workflows/human-in-the-loop/agents_with_approval_requests.py b/python/samples/getting_started/workflows/human-in-the-loop/agents_with_approval_requests.py index 88b8437ffb..3a7bec9a1e 100644 --- a/python/samples/getting_started/workflows/human-in-the-loop/agents_with_approval_requests.py +++ b/python/samples/getting_started/workflows/human-in-the-loop/agents_with_approval_requests.py @@ -211,10 +211,12 @@ async def conclude_workflow( async def main() -> None: + # Create the agent and executors chat_client = OpenAIChatClient() email_writer = chat_client.create_agent( name="Email Writer", instructions=("You are an excellent email assistant. You respond to incoming emails."), + # tools with `approval_mode="always_require"` will trigger approval requests tools=[ read_historical_email_data, send_email, @@ -223,8 +225,9 @@ async def main() -> None: get_my_information, ], ) - email_preprocessor = EmailPreprocessor(special_email_addresses={"mike@contoso.com"}) + + # Build the workflow workflow = ( WorkflowBuilder() .set_start_executor(email_preprocessor) @@ -251,9 +254,11 @@ async def main() -> None: request_info_events = events.get_request_info_events() for request_info_event in request_info_events: + # We should only expect FunctionApprovalRequestContent in this sample if not isinstance(request_info_event.data, FunctionApprovalRequestContent): raise ValueError(f"Unexpected request info content type: {type(request_info_event.data)}") + # Pretty print the function call details arguments = json.dumps( json.loads(request_info_event.data.function_call.arguments) if isinstance(request_info_event.data.function_call.arguments, str) @@ -264,10 +269,17 @@ async def main() -> None: f"Received approval request for function: {request_info_event.data.function_call.name} " f"with args:\n{arguments}" ) + + # For demo purposes, we automatically approve the request + # The expected response type of the request is `FunctionApprovalResponseContent`, + # which can be created via `create_response` method on the request content print("Performing automatic approval for demo purposes...") responses[request_info_event.request_id] = request_info_event.data.create_response(approved=True) + # Once we get an output event, we can conclude the workflow + # Outputs can only be produced by the conclude_workflow_executor in this sample if outputs := events.get_outputs(): + # We expect only one output from the conclude_workflow_executor output = outputs[0] break From 7ba941ca585927255cbc8157e0a97ddb7deceaae Mon Sep 17 00:00:00 2001 From: Tao Chen Date: Tue, 11 Nov 2025 09:32:55 -0800 Subject: [PATCH 7/7] Address comments --- .../human-in-the-loop/agents_with_approval_requests.py | 7 +------ 1 file changed, 1 insertion(+), 6 deletions(-) diff --git a/python/samples/getting_started/workflows/human-in-the-loop/agents_with_approval_requests.py b/python/samples/getting_started/workflows/human-in-the-loop/agents_with_approval_requests.py index 3a7bec9a1e..a51088e886 100644 --- a/python/samples/getting_started/workflows/human-in-the-loop/agents_with_approval_requests.py +++ b/python/samples/getting_started/workflows/human-in-the-loop/agents_with_approval_requests.py @@ -259,12 +259,7 @@ async def main() -> None: raise ValueError(f"Unexpected request info content type: {type(request_info_event.data)}") # Pretty print the function call details - arguments = json.dumps( - json.loads(request_info_event.data.function_call.arguments) - if isinstance(request_info_event.data.function_call.arguments, str) - else request_info_event.data.function_call.arguments, - indent=2, - ) + arguments = json.dumps(request_info_event.data.function_call.parse_arguments(), indent=2) print( f"Received approval request for function: {request_info_event.data.function_call.name} " f"with args:\n{arguments}"