From ecab316640352ff75c9b2bf6cb5bb2dceddd3fe6 Mon Sep 17 00:00:00 2001 From: Hameed Kunkanoor Date: Tue, 19 May 2026 16:00:46 +0530 Subject: [PATCH 1/2] Persist hosted MCP call/results as canonical mcp_call output --- .../_responses.py | 167 ++++++++++++--- .../packages/foundry_hosting/pyproject.toml | 2 +- .../foundry_hosting/tests/test_responses.py | 197 ++++++++++++++++++ 3 files changed, 335 insertions(+), 31 deletions(-) diff --git a/python/packages/foundry_hosting/agent_framework_foundry_hosting/_responses.py b/python/packages/foundry_hosting/agent_framework_foundry_hosting/_responses.py index c34c65538c..dc14ba9203 100644 --- a/python/packages/foundry_hosting/agent_framework_foundry_hosting/_responses.py +++ b/python/packages/foundry_hosting/agent_framework_foundry_hosting/_responses.py @@ -363,14 +363,13 @@ async def _handle_inner_agent( # Run the agent in non-streaming mode response = await self._agent.run(stream=False, **run_kwargs) # type: ignore[reportUnknownMemberType] - for message in response.messages: - for content in message.contents: - async for item in _to_outputs( - response_event_stream, - content, - approval_storage=self._approval_storage, - ): - yield item + response_contents = [content for message in response.messages for content in message.contents] + async for item in _to_outputs_for_contents( + response_event_stream, + response_contents, + approval_storage=self._approval_storage, + ): + yield item yield response_event_stream.emit_completed() return @@ -503,10 +502,9 @@ async def _handle_inner_workflow( checkpoint_storage=write_storage, ) - for message in response.messages: - for content in message.contents: - async for item in _to_outputs(response_event_stream, content): - yield item + response_contents = [content for message in response.messages for content in message.contents] + async for item in _to_outputs_for_contents(response_event_stream, response_contents): + yield item await self._delete_not_latest_checkpoints(write_storage, self._agent.workflow.name) yield response_event_stream.emit_completed() @@ -610,7 +608,7 @@ def handle(self, content: Content) -> Generator[ResponseStreamEvent]: yield self._fc_builder.emit_arguments_delta(args_str) elif content.type == "mcp_server_tool_call" and content.tool_name: - key = f"{content.server_name or 'default'}::{content.tool_name}" + key = content.call_id or f"{content.server_name or 'default'}::{content.tool_name}" if self._active_type != "mcp_server_tool_call" or self._active_id != key: yield from self._close() yield from self._open_mcp_call(content) @@ -619,6 +617,24 @@ def handle(self, content: Content) -> Generator[ResponseStreamEvent]: if self._mcp_builder is not None: yield self._mcp_builder.emit_arguments_delta(args_str) + elif ( + content.type == "mcp_server_tool_result" + and self._active_type == "mcp_server_tool_call" + and self._mcp_builder is not None + and content.call_id is not None + and content.call_id == self._mcp_builder.item_id + ): + accumulated = "".join(self._accumulated) + yield self._mcp_builder.emit_arguments_done(accumulated) + yield self._mcp_builder.emit_completed() + yield self._mcp_builder.emit_done(output=_stringify_mcp_output(content.output)) + self._mcp_builder = None + self._active_type = None + self._active_id = None + self._accumulated.clear() + self.needs_async = False + return + else: yield from self._close() self.needs_async = True @@ -658,9 +674,10 @@ def _open_mcp_call(self, content: Content) -> Generator[ResponseStreamEvent]: self._mcp_builder = self._stream.add_output_item_mcp_call( server_label=content.server_name or "default", name=content.tool_name or "", + item_id=content.call_id, ) self._active_type = "mcp_server_tool_call" - self._active_id = f"{content.server_name or 'default'}::{content.tool_name}" + self._active_id = content.call_id or f"{content.server_name or 'default'}::{content.tool_name}" yield self._mcp_builder.emit_added() def _close(self) -> Generator[ResponseStreamEvent]: @@ -808,16 +825,19 @@ async def _item_to_message(item: Item, *, approval_storage: ApprovalStorage | No if item.type == "mcp_call": mcp = cast(ItemMcpToolCall, item) + contents = [ + Content.from_mcp_server_tool_call( + mcp.id, + mcp.name, + server_name=mcp.server_label, + arguments=mcp.arguments, + ) + ] + if getattr(mcp, "output", None) is not None: + contents.append(Content.from_mcp_server_tool_result(call_id=mcp.id, output=mcp.output)) return Message( role="assistant", - contents=[ - Content.from_mcp_server_tool_call( - mcp.id, - mcp.name, - server_name=mcp.server_label, - arguments=mcp.arguments, - ) - ], + contents=contents, ) if item.type == "mcp_approval_request": @@ -1078,16 +1098,19 @@ async def _output_item_to_message(item: OutputItem, *, approval_storage: Approva if item.type == "mcp_call": mcp = cast(OutputItemMcpToolCall, item) + contents = [ + Content.from_mcp_server_tool_call( + mcp.id, + mcp.name, + server_name=mcp.server_label, + arguments=mcp.arguments, + ) + ] + if getattr(mcp, "output", None) is not None: + contents.append(Content.from_mcp_server_tool_result(call_id=mcp.id, output=mcp.output)) return Message( role="assistant", - contents=[ - Content.from_mcp_server_tool_call( - mcp.id, - mcp.name, - server_name=mcp.server_label, - arguments=mcp.arguments, - ) - ], + contents=contents, ) if item.type == "mcp_approval_request": @@ -1455,6 +1478,7 @@ async def _to_outputs( mcp_call = stream.add_output_item_mcp_call( server_label=content.server_name or "default", name=content.tool_name or "", + item_id=content.call_id, ) yield mcp_call.emit_added() async for event in mcp_call.aarguments(_arguments_to_str(content.arguments)): @@ -1529,4 +1553,87 @@ async def _to_outputs( logger.warning(f"Content type '{content.type}' is not supported yet. This is usually safe to ignore.") +def _stringify_mcp_output(output: Any) -> str: + """Convert hosted MCP output payloads into the string shape expected by mcp_call.output.""" + if output is None: + return "" + if isinstance(output, str): + return output + if isinstance(output, Sequence) and not isinstance(output, (str, bytes, bytearray)): + parts: list[str] = [] + for entry in output: + if isinstance(entry, Content) and entry.type == "text": + parts.append(entry.text or "") + continue + if isinstance(entry, Mapping): + text = cast(Any, entry).get("text") + if isinstance(text, str): + parts.append(text) + continue + parts.append(json.dumps(entry, default=str)) + continue + parts.append(str(entry)) + return "".join(parts) + if isinstance(output, Mapping): + return json.dumps(output, default=str) + return str(output) + + +def _emit_completed_mcp_call( + stream: ResponseEventStream, + call_content: Content, + *, + arguments: str, + output: str, +) -> Generator[ResponseStreamEvent]: + """Emit a single completed MCP call item carrying both arguments and output.""" + mcp_call = stream.add_output_item_mcp_call( + server_label=call_content.server_name or "default", + name=call_content.tool_name or "", + item_id=call_content.call_id, + ) + yield mcp_call.emit_added() + yield mcp_call.emit_arguments_done(arguments) + yield mcp_call.emit_completed() + yield mcp_call.emit_done(output=output) + + +async def _to_outputs_for_contents( + stream: ResponseEventStream, + contents: Sequence[Content], + *, + approval_storage: ApprovalStorage | None = None, +) -> AsyncIterator[ResponseStreamEvent]: + """Convert a sequence of contents to output events with hosted-MCP call/result coalescing.""" + pending_mcp_call: Content | None = None + + for content in contents: + if pending_mcp_call is not None: + if content.type == "mcp_server_tool_result" and content.call_id == pending_mcp_call.call_id: + for event in _emit_completed_mcp_call( + stream, + pending_mcp_call, + arguments=_arguments_to_str(pending_mcp_call.arguments), + output=_stringify_mcp_output(content.output), + ): + yield event + pending_mcp_call = None + continue + + async for event in _to_outputs(stream, pending_mcp_call, approval_storage=approval_storage): + yield event + pending_mcp_call = None + + if content.type == "mcp_server_tool_call" and content.call_id: + pending_mcp_call = content + continue + + async for event in _to_outputs(stream, content, approval_storage=approval_storage): + yield event + + if pending_mcp_call is not None: + async for event in _to_outputs(stream, pending_mcp_call, approval_storage=approval_storage): + yield event + + # endregion diff --git a/python/packages/foundry_hosting/pyproject.toml b/python/packages/foundry_hosting/pyproject.toml index 35a218a6c1..15f6cda0e6 100644 --- a/python/packages/foundry_hosting/pyproject.toml +++ b/python/packages/foundry_hosting/pyproject.toml @@ -25,7 +25,7 @@ classifiers = [ dependencies = [ "agent-framework-core>=1.4.0,<2", "azure-ai-agentserver-core>=2.0.0b3,<3", - "azure-ai-agentserver-responses>=1.0.0b5,<2", + "azure-ai-agentserver-responses>=1.0.0b6,<2", "azure-ai-agentserver-invocations>=1.0.0b3,<2", ] diff --git a/python/packages/foundry_hosting/tests/test_responses.py b/python/packages/foundry_hosting/tests/test_responses.py index e4d545d6d7..35428a75ab 100644 --- a/python/packages/foundry_hosting/tests/test_responses.py +++ b/python/packages/foundry_hosting/tests/test_responses.py @@ -251,6 +251,50 @@ async def test_function_call_and_result(self) -> None: assert "function_call_output" in types assert "message" in types + async def test_hosted_mcp_call_and_result_persist_as_single_mcp_call(self) -> None: + agent = _make_agent( + response=AgentResponse( + messages=[ + Message( + role="assistant", + contents=[ + Content.from_mcp_server_tool_call( + call_id="mcp_abc123", + tool_name="search", + server_name="api_specs", + arguments='{"q": "cats"}', + ) + ], + ), + Message( + role="tool", + contents=[ + Content.from_mcp_server_tool_result( + call_id="mcp_abc123", + output=[Content.from_text(text="found 10 cats")], + ) + ], + ), + Message(role="assistant", contents=[Content.from_text("I found 10 cats!")]), + ] + ) + ) + server = _make_server(agent) + resp = await _post(server, stream=False) + + assert resp.status_code == 200 + body = resp.json() + assert body["status"] == "completed" + + types = [item["type"] for item in body["output"]] + assert "mcp_call" in types + assert "custom_tool_call_output" not in types + + mcp_items = [item for item in body["output"] if item["type"] == "mcp_call"] + assert len(mcp_items) == 1 + assert mcp_items[0]["id"] == "mcp_abc123" + assert mcp_items[0]["output"] == "found 10 cats" + async def test_reasoning_content(self) -> None: agent = _make_agent( response=AgentResponse( @@ -578,6 +622,53 @@ async def test_mcp_tool_call_streaming(self) -> None: assert "response.output_item.added" in types assert "response.output_item.done" in types + async def test_mcp_tool_call_and_result_streaming_emit_single_completed_mcp_call(self) -> None: + agent = _make_agent( + stream_updates=[ + AgentResponseUpdate( + contents=[ + Content.from_mcp_server_tool_call( + call_id="mcp_abc123", + tool_name="search", + server_name="api_specs", + arguments='{"q":', + ) + ], + role="assistant", + ), + AgentResponseUpdate( + contents=[ + Content.from_mcp_server_tool_call( + call_id="mcp_abc123", + tool_name="search", + server_name="api_specs", + arguments=' "cats"}', + ) + ], + role="assistant", + ), + AgentResponseUpdate( + contents=[ + Content.from_mcp_server_tool_result( + call_id="mcp_abc123", + output=[Content.from_text(text="found 10 cats")], + ) + ], + role="tool", + ), + ] + ) + server = _make_server(agent) + resp = await _post(server, stream=True) + + assert resp.status_code == 200 + events = _parse_sse_events(resp.text) + done_events = [e for e in events if e["event"] == "response.output_item.done"] + assert len(done_events) == 1 + assert done_events[0]["data"]["item"]["type"] == "mcp_call" + assert done_events[0]["data"]["item"]["id"] == "mcp_abc123" + assert done_events[0]["data"]["item"]["output"] == "found 10 cats" + # endregion @@ -681,6 +772,24 @@ async def test_mcp_call(self) -> None: assert msg.contents[0].server_name == "my_server" assert msg.contents[0].tool_name == "search" + async def test_mcp_call_with_output_reconstructs_mcp_result_content(self) -> None: + from azure.ai.agentserver.responses.models import OutputItemMcpToolCall + + item = OutputItemMcpToolCall({ + "type": "mcp_call", + "id": "mcp-1", + "server_label": "my_server", + "name": "search", + "arguments": '{"q": "test"}', + "output": "found 10 cats", + }) + msg = await _output_item_to_message(item) + assert msg.role == "assistant" + assert len(msg.contents) == 2 + assert msg.contents[0].type == "mcp_server_tool_call" + assert msg.contents[1].type == "mcp_server_tool_result" + assert msg.contents[1].output == "found 10 cats" + async def test_mcp_approval_request(self) -> None: from azure.ai.agentserver.responses.models import OutputItemMcpApprovalRequest @@ -1150,6 +1259,25 @@ async def test_mcp_call(self) -> None: assert msg.contents[0].server_name == "my_server" assert msg.contents[0].tool_name == "search" + async def test_mcp_call_with_output_reconstructs_mcp_result_content(self) -> None: + from azure.ai.agentserver.responses.models import ItemMcpToolCall + + item = ItemMcpToolCall({ + "type": "mcp_call", + "id": "mcp-1", + "server_label": "my_server", + "name": "search", + "arguments": '{"q": "test"}', + "output": "found 10 cats", + }) + msg = await _item_to_message(item) + assert msg is not None + assert msg.role == "assistant" + assert len(msg.contents) == 2 + assert msg.contents[0].type == "mcp_server_tool_call" + assert msg.contents[1].type == "mcp_server_tool_result" + assert msg.contents[1].output == "found 10 cats" + async def test_mcp_approval_request(self) -> None: from azure.ai.agentserver.responses.models import ItemMcpApprovalRequest @@ -1898,6 +2026,75 @@ async def test_multi_turn_function_call_in_history(self) -> None: assert len(fc_contents) >= 1 assert fc_contents[0].name == "search" + async def test_hosted_mcp_call_round_trip_does_not_orphan_function_call_output(self) -> None: + """Turn 1 produces hosted MCP call + result, turn 2 must replay both without orphaning output.""" + agent = _make_multi_response_agent([ + AgentResponse( + messages=[ + Message( + role="assistant", + contents=[ + Content.from_mcp_server_tool_call( + call_id="mcp_abc123", + tool_name="search", + server_name="api_specs", + arguments='{"q": "cats"}', + ) + ], + ), + Message( + role="tool", + contents=[ + Content.from_mcp_server_tool_result( + call_id="mcp_abc123", + output=[Content.from_text(text="found 10 cats")], + ) + ], + ), + Message(role="assistant", contents=[Content.from_text("I found 10 cats!")]), + ] + ), + AgentResponse(messages=[Message(role="assistant", contents=[Content.from_text("Here are more details")])]), + ]) + server = _make_server(agent) + + resp1 = await _post(server, input_text="Search for cats", stream=False) + assert resp1.status_code == 200 + response_id = resp1.json()["id"] + + types1 = [item["type"] for item in resp1.json()["output"]] + assert "mcp_call" in types1 + assert "custom_tool_call_output" not in types1 + + resp2 = await _post_json( + server, + { + "model": "test-model", + "input": "Tell me more", + "stream": False, + "previous_response_id": response_id, + }, + ) + assert resp2.status_code == 200 + assert resp2.json()["status"] == "completed" + + second_call_messages = agent.run.call_args_list[1].kwargs["messages"] + mcp_call_contents = [ + c for m in second_call_messages for c in m.contents if c.type == "mcp_server_tool_call" + ] + mcp_result_contents = [ + c for m in second_call_messages for c in m.contents if c.type == "mcp_server_tool_result" + ] + function_result_contents = [ + c for m in second_call_messages for c in m.contents if c.type == "function_result" + ] + + assert len(mcp_call_contents) >= 1 + assert len(mcp_result_contents) >= 1 + assert all((c.call_id or "") != "mcp_abc123" for c in function_result_contents) + assert any((c.call_id or "") == "mcp_abc123" for c in mcp_call_contents) + assert any((c.call_id or "") == "mcp_abc123" for c in mcp_result_contents) + async def test_multi_turn_reasoning_in_history(self) -> None: """Turn 1 produces reasoning + text, turn 2 sees them in history.""" agent = _make_multi_response_agent([ From aa4cd2ceeaaa7191f8f6e5e77256d80aaed87417 Mon Sep 17 00:00:00 2001 From: Hameed Kunkanoor <41198503+Hameedkunkanoor@users.noreply.github.com> Date: Tue, 19 May 2026 17:49:32 +0530 Subject: [PATCH 2/2] Potential fix for pull request finding Co-authored-by: Copilot Autofix powered by AI <175728472+Copilot@users.noreply.github.com> --- .../_responses.py | 16 ++++++---------- 1 file changed, 6 insertions(+), 10 deletions(-) diff --git a/python/packages/foundry_hosting/agent_framework_foundry_hosting/_responses.py b/python/packages/foundry_hosting/agent_framework_foundry_hosting/_responses.py index dc14ba9203..7f07d293e4 100644 --- a/python/packages/foundry_hosting/agent_framework_foundry_hosting/_responses.py +++ b/python/packages/foundry_hosting/agent_framework_foundry_hosting/_responses.py @@ -1559,23 +1559,19 @@ def _stringify_mcp_output(output: Any) -> str: return "" if isinstance(output, str): return output + if isinstance(output, Mapping): + text = cast(Any, output).get("text") + if isinstance(text, str): + return text + return json.dumps(output, default=str) if isinstance(output, Sequence) and not isinstance(output, (str, bytes, bytearray)): parts: list[str] = [] for entry in output: if isinstance(entry, Content) and entry.type == "text": parts.append(entry.text or "") continue - if isinstance(entry, Mapping): - text = cast(Any, entry).get("text") - if isinstance(text, str): - parts.append(text) - continue - parts.append(json.dumps(entry, default=str)) - continue - parts.append(str(entry)) + parts.append(_stringify_mcp_output(entry)) return "".join(parts) - if isinstance(output, Mapping): - return json.dumps(output, default=str) return str(output)