Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -363,14 +363,13 @@ async def _handle_inner_agent(
# Run the agent in non-streaming mode
response = await self._agent.run(stream=False, **run_kwargs) # type: ignore[reportUnknownMemberType]

for message in response.messages:
for content in message.contents:
async for item in _to_outputs(
response_event_stream,
content,
approval_storage=self._approval_storage,
):
yield item
response_contents = [content for message in response.messages for content in message.contents]
async for item in _to_outputs_for_contents(
response_event_stream,
response_contents,
approval_storage=self._approval_storage,
):
yield item

yield response_event_stream.emit_completed()
return
Expand Down Expand Up @@ -503,10 +502,9 @@ async def _handle_inner_workflow(
checkpoint_storage=write_storage,
)

for message in response.messages:
for content in message.contents:
async for item in _to_outputs(response_event_stream, content):
yield item
response_contents = [content for message in response.messages for content in message.contents]
async for item in _to_outputs_for_contents(response_event_stream, response_contents):
yield item

await self._delete_not_latest_checkpoints(write_storage, self._agent.workflow.name)
yield response_event_stream.emit_completed()
Expand Down Expand Up @@ -610,7 +608,7 @@ def handle(self, content: Content) -> Generator[ResponseStreamEvent]:
yield self._fc_builder.emit_arguments_delta(args_str)

elif content.type == "mcp_server_tool_call" and content.tool_name:
key = f"{content.server_name or 'default'}::{content.tool_name}"
key = content.call_id or f"{content.server_name or 'default'}::{content.tool_name}"
if self._active_type != "mcp_server_tool_call" or self._active_id != key:
yield from self._close()
yield from self._open_mcp_call(content)
Expand All @@ -619,6 +617,24 @@ def handle(self, content: Content) -> Generator[ResponseStreamEvent]:
if self._mcp_builder is not None:
yield self._mcp_builder.emit_arguments_delta(args_str)

elif (
content.type == "mcp_server_tool_result"
and self._active_type == "mcp_server_tool_call"
and self._mcp_builder is not None
and content.call_id is not None
and content.call_id == self._mcp_builder.item_id
):
accumulated = "".join(self._accumulated)
yield self._mcp_builder.emit_arguments_done(accumulated)
yield self._mcp_builder.emit_completed()
yield self._mcp_builder.emit_done(output=_stringify_mcp_output(content.output))
self._mcp_builder = None
self._active_type = None
self._active_id = None
self._accumulated.clear()
self.needs_async = False
return

Comment on lines +624 to +637
else:
yield from self._close()
self.needs_async = True
Expand Down Expand Up @@ -658,9 +674,10 @@ def _open_mcp_call(self, content: Content) -> Generator[ResponseStreamEvent]:
self._mcp_builder = self._stream.add_output_item_mcp_call(
server_label=content.server_name or "default",
name=content.tool_name or "",
item_id=content.call_id,
)
self._active_type = "mcp_server_tool_call"
self._active_id = f"{content.server_name or 'default'}::{content.tool_name}"
self._active_id = content.call_id or f"{content.server_name or 'default'}::{content.tool_name}"
yield self._mcp_builder.emit_added()

def _close(self) -> Generator[ResponseStreamEvent]:
Expand Down Expand Up @@ -808,16 +825,19 @@ async def _item_to_message(item: Item, *, approval_storage: ApprovalStorage | No

if item.type == "mcp_call":
mcp = cast(ItemMcpToolCall, item)
contents = [
Content.from_mcp_server_tool_call(
mcp.id,
mcp.name,
server_name=mcp.server_label,
arguments=mcp.arguments,
)
]
if getattr(mcp, "output", None) is not None:
contents.append(Content.from_mcp_server_tool_result(call_id=mcp.id, output=mcp.output))
return Message(
role="assistant",
contents=[
Content.from_mcp_server_tool_call(
mcp.id,
mcp.name,
server_name=mcp.server_label,
arguments=mcp.arguments,
)
],
contents=contents,
)

if item.type == "mcp_approval_request":
Expand Down Expand Up @@ -1078,16 +1098,19 @@ async def _output_item_to_message(item: OutputItem, *, approval_storage: Approva

if item.type == "mcp_call":
mcp = cast(OutputItemMcpToolCall, item)
contents = [
Content.from_mcp_server_tool_call(
mcp.id,
mcp.name,
server_name=mcp.server_label,
arguments=mcp.arguments,
)
]
if getattr(mcp, "output", None) is not None:
contents.append(Content.from_mcp_server_tool_result(call_id=mcp.id, output=mcp.output))
return Message(
role="assistant",
contents=[
Content.from_mcp_server_tool_call(
mcp.id,
mcp.name,
server_name=mcp.server_label,
arguments=mcp.arguments,
)
],
contents=contents,
)

if item.type == "mcp_approval_request":
Expand Down Expand Up @@ -1455,6 +1478,7 @@ async def _to_outputs(
mcp_call = stream.add_output_item_mcp_call(
server_label=content.server_name or "default",
name=content.tool_name or "",
item_id=content.call_id,
)
yield mcp_call.emit_added()
async for event in mcp_call.aarguments(_arguments_to_str(content.arguments)):
Expand Down Expand Up @@ -1529,4 +1553,83 @@ async def _to_outputs(
logger.warning(f"Content type '{content.type}' is not supported yet. This is usually safe to ignore.")


def _stringify_mcp_output(output: Any) -> str:
"""Convert hosted MCP output payloads into the string shape expected by mcp_call.output."""
if output is None:
return ""
if isinstance(output, str):
return output
if isinstance(output, Mapping):
text = cast(Any, output).get("text")
if isinstance(text, str):
return text
return json.dumps(output, default=str)
if isinstance(output, Sequence) and not isinstance(output, (str, bytes, bytearray)):
parts: list[str] = []
for entry in output:
if isinstance(entry, Content) and entry.type == "text":
parts.append(entry.text or "")
continue
parts.append(_stringify_mcp_output(entry))
return "".join(parts)
return str(output)


def _emit_completed_mcp_call(
stream: ResponseEventStream,
call_content: Content,
*,
arguments: str,
output: str,
) -> Generator[ResponseStreamEvent]:
"""Emit a single completed MCP call item carrying both arguments and output."""
mcp_call = stream.add_output_item_mcp_call(
server_label=call_content.server_name or "default",
name=call_content.tool_name or "",
item_id=call_content.call_id,
)
yield mcp_call.emit_added()
yield mcp_call.emit_arguments_done(arguments)
yield mcp_call.emit_completed()
yield mcp_call.emit_done(output=output)


async def _to_outputs_for_contents(
stream: ResponseEventStream,
contents: Sequence[Content],
*,
approval_storage: ApprovalStorage | None = None,
) -> AsyncIterator[ResponseStreamEvent]:
"""Convert a sequence of contents to output events with hosted-MCP call/result coalescing."""
pending_mcp_call: Content | None = None

for content in contents:
if pending_mcp_call is not None:
if content.type == "mcp_server_tool_result" and content.call_id == pending_mcp_call.call_id:
for event in _emit_completed_mcp_call(
Comment on lines +1607 to +1609
stream,
pending_mcp_call,
arguments=_arguments_to_str(pending_mcp_call.arguments),
output=_stringify_mcp_output(content.output),
):
yield event
pending_mcp_call = None
continue

async for event in _to_outputs(stream, pending_mcp_call, approval_storage=approval_storage):
yield event
pending_mcp_call = None

if content.type == "mcp_server_tool_call" and content.call_id:
pending_mcp_call = content
continue

async for event in _to_outputs(stream, content, approval_storage=approval_storage):
yield event

if pending_mcp_call is not None:
async for event in _to_outputs(stream, pending_mcp_call, approval_storage=approval_storage):
yield event


# endregion
2 changes: 1 addition & 1 deletion python/packages/foundry_hosting/pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ classifiers = [
dependencies = [
"agent-framework-core>=1.4.0,<2",
"azure-ai-agentserver-core>=2.0.0b3,<3",
"azure-ai-agentserver-responses>=1.0.0b5,<2",
"azure-ai-agentserver-responses>=1.0.0b6,<2",
"azure-ai-agentserver-invocations>=1.0.0b3,<2",
]

Expand Down
Loading
Loading