From fccb45fa3bfe2e87bc11959a4084ecce1e6fa303 Mon Sep 17 00:00:00 2001 From: Aditya Singh Date: Wed, 13 May 2026 20:47:36 -0700 Subject: [PATCH] fix(realtime): coalesce response.create across parallel tool calls When the Realtime model emits multiple function_call items in a single response, each completing tool task previously fired its own RealtimeModelSendToolOutput(start_response=True). The two response.create events race the API and the second one is rejected with conversation_already_has_active_response, so the model never speaks for the rest of the turn. Track tool calls per response_id and drive a single response.create from the last completing call (or directly from turn_ended if all tools finish before the response is done). Refs #1168 --- src/agents/realtime/model_events.py | 6 + src/agents/realtime/openai_realtime.py | 13 +- src/agents/realtime/session.py | 87 ++++++++++- tests/realtime/test_openai_realtime.py | 3 + tests/realtime/test_session.py | 196 +++++++++++++++++++++++++ 5 files changed, 300 insertions(+), 5 deletions(-) diff --git a/src/agents/realtime/model_events.py b/src/agents/realtime/model_events.py index 7715f98c12..bd8bb99c12 100644 --- a/src/agents/realtime/model_events.py +++ b/src/agents/realtime/model_events.py @@ -28,6 +28,9 @@ class RealtimeModelToolCallEvent: id: str | None = None previous_item_id: str | None = None + response_id: str | None = None + """The ID of the model response that produced this tool call.""" + type: Literal["function_call"] = "function_call" @@ -143,6 +146,9 @@ class RealtimeModelTurnStartedEvent: class RealtimeModelTurnEndedEvent: """Triggered when the model finishes generating a response for a turn.""" + response_id: str | None = None + """The ID of the model response that just finished, if available.""" + type: Literal["turn_ended"] = "turn_ended" diff --git a/src/agents/realtime/openai_realtime.py b/src/agents/realtime/openai_realtime.py index 6b986c6edc..df88646d25 100644 --- a/src/agents/realtime/openai_realtime.py +++ b/src/agents/realtime/openai_realtime.py @@ -926,7 +926,9 @@ async def _handle_audio_delta(self, parsed: ResponseAudioDeltaEvent) -> None: ) ) - async def _handle_output_item(self, item: ConversationItem) -> None: + async def _handle_output_item( + self, item: ConversationItem, response_id: str | None = None + ) -> None: """Handle response output item events (function calls and messages).""" if item.type == "function_call" and item.status == "completed": tool_call = RealtimeToolCallItem( @@ -948,6 +950,7 @@ async def _handle_output_item(self, item: ConversationItem) -> None: name=item.name or "", arguments=item.arguments or "", id=item.id or "", + response_id=response_id, ) ) elif item.type == "message": @@ -1149,7 +1152,9 @@ async def _handle_ws_event(self, event: dict[str, Any]): await self._emit_event(RealtimeModelTurnStartedEvent()) elif parsed.type == "response.done": await self._mark_response_done() - await self._emit_event(RealtimeModelTurnEndedEvent()) + await self._emit_event( + RealtimeModelTurnEndedEvent(response_id=getattr(parsed.response, "id", None)) + ) elif parsed.type == "session.created": await self._send_tracing_config(self._tracing_config) self._update_created_session(parsed.session) @@ -1209,7 +1214,9 @@ async def _handle_ws_event(self, event: dict[str, Any]): parsed.type == "response.output_item.added" or parsed.type == "response.output_item.done" ): - await self._handle_output_item(parsed.item) + await self._handle_output_item( + parsed.item, response_id=getattr(parsed, "response_id", None) + ) elif parsed.type == "input_audio_buffer.timeout_triggered": await self._emit_event( RealtimeModelInputAudioTimeoutTriggeredEvent( diff --git a/src/agents/realtime/session.py b/src/agents/realtime/session.py index f424b5b9d5..1cb14522ca 100644 --- a/src/agents/realtime/session.py +++ b/src/agents/realtime/session.py @@ -66,6 +66,7 @@ from .model_inputs import ( RealtimeModelSendAudio, RealtimeModelSendInterrupt, + RealtimeModelSendRawMessage, RealtimeModelSendSessionUpdate, RealtimeModelSendToolOutput, RealtimeModelSendUserInput, @@ -176,6 +177,14 @@ def __init__( self._tool_call_tasks: set[asyncio.Task[Any]] = set() self._async_tool_calls: bool = bool(self._run_config.get("async_tool_calls", True)) + # Coalesce response.create across parallel tool calls emitted in the same model response. + # Each entry tracks call_ids still pending and whether the model has signalled the end + # of the turn (response.done). Only the last completing tool call for a finished turn + # requests a new response so the OpenAI Realtime API never sees overlapping + # response.create events that would error with conversation_already_has_active_response. + self._tool_calls_by_response: dict[str, set[str]] = {} + self._turn_ended_responses: set[str] = set() + @property def model(self) -> RealtimeModel: """Access the underlying model for adding listeners or other direct interaction.""" @@ -278,6 +287,7 @@ async def on_event(self, event: RealtimeModelEvent) -> None: await self._put_event(RealtimeError(info=self._event_info, error=event.error)) elif event.type == "function_call": agent_snapshot = self._current_agent + self._register_pending_tool_call(event) if self._async_tool_calls: self._enqueue_tool_call_task(event, agent_snapshot) else: @@ -429,6 +439,8 @@ async def on_event(self, event: RealtimeModelEvent) -> None: self._item_transcripts.clear() self._item_guardrail_run_counts.clear() + await self._mark_turn_ended(event.response_id) + await self._put_event( RealtimeAgentEndEvent( agent=self._current_agent, @@ -556,11 +568,12 @@ async def _send_tool_rejection( tool=tool, call_id=event.call_id, ) + start_response = self._consume_pending_tool_call(event) await self._model.send_event( RealtimeModelSendToolOutput( tool_call=event, output=rejection_message, - start_response=True, + start_response=start_response, ) ) @@ -700,11 +713,12 @@ async def _handle_tool_call( arguments=event.arguments, ) + start_response = self._consume_pending_tool_call(event) await self._model.send_event( RealtimeModelSendToolOutput( tool_call=event, output=_serialize_tool_output(result), - start_response=True, + start_response=start_response, ) ) @@ -763,6 +777,7 @@ async def _handle_tool_call( # Then send tool output to complete the handoff (this triggers a new response) transfer_message = handoff.get_transfer_message(result) + self._consume_pending_tool_call(event) await self._model.send_event( RealtimeModelSendToolOutput( tool_call=event, @@ -772,6 +787,7 @@ async def _handle_tool_call( ) else: error_message = f"Tool {event.name} not found" + self._consume_pending_tool_call(event) await self._model.send_event( RealtimeModelSendToolOutput( tool_call=event, @@ -1065,6 +1081,71 @@ def _enqueue_tool_call_task( self._tool_call_tasks.add(task) task.add_done_callback(self._on_tool_call_task_done) + def _register_pending_tool_call(self, event: RealtimeModelToolCallEvent) -> None: + """Record a tool call so we can coalesce response.create per model response.""" + response_id = event.response_id + if not response_id: + return + pending = self._tool_calls_by_response.setdefault(response_id, set()) + pending.add(event.call_id) + + def _consume_pending_tool_call(self, event: RealtimeModelToolCallEvent) -> bool: + """Mark a tool call as complete and return whether it should trigger response.create. + + Returns True if this is the last pending tool call for a turn that has already ended, + meaning the caller should request a new response. Otherwise returns False so the + Realtime API never receives a response.create while another response is still active + or while other tool outputs for the same turn are still in flight. + """ + response_id = event.response_id + if not response_id: + # Without a response id we can't coalesce, so fall back to the historical behavior + # of always requesting a response after every tool output. + return True + + pending = self._tool_calls_by_response.get(response_id) + if pending is None: + # The function_call event arrived without going through on_event, or the bookkeeping + # has already been cleared. Be conservative and request a response. + return True + + pending.discard(event.call_id) + if pending: + # Other tool outputs for the same turn are still in flight. + return False + + if response_id not in self._turn_ended_responses: + # The model hasn't finished emitting this turn yet, so more tool calls may still + # arrive. Wait for turn_ended to drive response.create. + return False + + self._tool_calls_by_response.pop(response_id, None) + self._turn_ended_responses.discard(response_id) + return True + + async def _mark_turn_ended(self, response_id: str | None) -> None: + """Record that the model finished a turn and trigger response.create if needed.""" + if not response_id: + return + pending = self._tool_calls_by_response.get(response_id) + if pending is None: + # No tool calls were tracked for this response, nothing to coalesce. + return + if pending: + # Tool outputs are still in flight; the last one to finish will request the response. + self._turn_ended_responses.add(response_id) + return + + # All tool outputs have already been sent with start_response=False, so we have to + # request a response ourselves now that the turn is complete. + self._tool_calls_by_response.pop(response_id, None) + self._turn_ended_responses.discard(response_id) + await self._model.send_event( + RealtimeModelSendRawMessage( + message={"type": "response.create"}, + ) + ) + def _on_tool_call_task_done(self, task: asyncio.Task[Any]) -> None: self._tool_call_tasks.discard(task) @@ -1117,6 +1198,8 @@ async def _cleanup(self) -> None: # Clear pending approval tracking self._pending_tool_calls.clear() + self._tool_calls_by_response.clear() + self._turn_ended_responses.clear() # Mark as closed self._closed = True diff --git a/tests/realtime/test_openai_realtime.py b/tests/realtime/test_openai_realtime.py index 4ebc2aa9a3..86258be573 100644 --- a/tests/realtime/test_openai_realtime.py +++ b/tests/realtime/test_openai_realtime.py @@ -1692,6 +1692,9 @@ async def test_handle_tool_call_event_success(self, model): assert tool_call_emitted.name == "get_weather" assert tool_call_emitted.arguments == '{"location": "San Francisco"}' assert tool_call_emitted.call_id == "call_123" + # response_id must be propagated so RealtimeSession can coalesce parallel tool + # calls per model response and avoid overlapping response.create events. + assert tool_call_emitted.response_id == "resp_789" @pytest.mark.asyncio async def test_audio_timing_calculation_accuracy(self, model): diff --git a/tests/realtime/test_session.py b/tests/realtime/test_session.py index 67cf717aa5..f5d8fdda34 100644 --- a/tests/realtime/test_session.py +++ b/tests/realtime/test_session.py @@ -56,6 +56,7 @@ from agents.realtime.model_inputs import ( RealtimeModelSendAudio, RealtimeModelSendInterrupt, + RealtimeModelSendRawMessage, RealtimeModelSendSessionUpdate, RealtimeModelSendUserInput, ) @@ -1850,6 +1851,201 @@ async def test_mixed_tool_types_filtering(self, mock_model, mock_agent): assert sent_output == "result2" +class TestParallelToolCallCoalescing: + """Regression tests for #1168: parallel tool calls must not race response.create.""" + + async def _wait_for_tool_call_tasks(self, session: RealtimeSession) -> None: + if not session._tool_call_tasks: + return + await asyncio.gather(*session._tool_call_tasks, return_exceptions=True) + + @pytest.mark.asyncio + async def test_parallel_tool_calls_emit_single_response_create(self, mock_model, mock_agent): + """When the model emits two tool calls in one response, only the last + completing tool call should request a new response. This avoids overlapping + response.create events that the Realtime API rejects with + conversation_already_has_active_response. + """ + + tool_one = _set_default_timeout_fields(Mock(spec=FunctionTool)) + tool_one.name = "tool_one" + tool_one.on_invoke_tool = AsyncMock(return_value="result_one") + tool_one.needs_approval = False + + tool_two = _set_default_timeout_fields(Mock(spec=FunctionTool)) + tool_two.name = "tool_two" + tool_two.on_invoke_tool = AsyncMock(return_value="result_two") + tool_two.needs_approval = False + + mock_agent.get_all_tools.return_value = [tool_one, tool_two] + + session = RealtimeSession(mock_model, mock_agent, None) + + first_call = RealtimeModelToolCallEvent( + name="tool_one", + call_id="call_one", + arguments="{}", + response_id="resp_abc", + ) + second_call = RealtimeModelToolCallEvent( + name="tool_two", + call_id="call_two", + arguments="{}", + response_id="resp_abc", + ) + + await session.on_event(first_call) + await session.on_event(second_call) + await session.on_event(RealtimeModelTurnEndedEvent(response_id="resp_abc")) + + await self._wait_for_tool_call_tasks(session) + + assert len(mock_model.sent_tool_outputs) == 2 + start_response_flags = [ + start_response for _, _, start_response in mock_model.sent_tool_outputs + ] + raw_response_create_count = sum( + 1 + for event in mock_model.sent_events + if isinstance(event, RealtimeModelSendRawMessage) + and event.message.get("type") == "response.create" + ) + # Exactly one response.create should reach the model. It can either ride along + # with the last tool output (start_response=True) or be sent as an explicit + # response.create once both tools are done and the turn has ended. + assert start_response_flags.count(True) + raw_response_create_count == 1 + + # No leftover bookkeeping after both tools complete. + assert session._tool_calls_by_response == {} + assert session._turn_ended_responses == set() + + @pytest.mark.asyncio + async def test_turn_ended_after_tools_complete_triggers_response_create( + self, mock_model, mock_agent + ): + """When all parallel tool calls finish before turn_ended arrives, the session + must explicitly request a response so the model actually speaks. + """ + + tool_one = _set_default_timeout_fields(Mock(spec=FunctionTool)) + tool_one.name = "tool_one" + tool_one.on_invoke_tool = AsyncMock(return_value="result_one") + tool_one.needs_approval = False + + tool_two = _set_default_timeout_fields(Mock(spec=FunctionTool)) + tool_two.name = "tool_two" + tool_two.on_invoke_tool = AsyncMock(return_value="result_two") + tool_two.needs_approval = False + + mock_agent.get_all_tools.return_value = [tool_one, tool_two] + + session = RealtimeSession( + mock_model, mock_agent, None, run_config={"async_tool_calls": False} + ) + + first_call = RealtimeModelToolCallEvent( + name="tool_one", + call_id="call_one", + arguments="{}", + response_id="resp_def", + ) + second_call = RealtimeModelToolCallEvent( + name="tool_two", + call_id="call_two", + arguments="{}", + response_id="resp_def", + ) + + await session.on_event(first_call) + await session.on_event(second_call) + + assert len(mock_model.sent_tool_outputs) == 2 + # Neither tool output should have requested a response yet because the + # session has not seen turn_ended for this response_id. + assert all(start_response is False for _, _, start_response in mock_model.sent_tool_outputs) + + raw_messages_before = [ + event + for event in mock_model.sent_events + if isinstance(event, RealtimeModelSendRawMessage) + ] + assert raw_messages_before == [] + + await session.on_event(RealtimeModelTurnEndedEvent(response_id="resp_def")) + + raw_messages_after = [ + event + for event in mock_model.sent_events + if isinstance(event, RealtimeModelSendRawMessage) + ] + assert len(raw_messages_after) == 1 + assert raw_messages_after[0].message["type"] == "response.create" + + assert session._tool_calls_by_response == {} + assert session._turn_ended_responses == set() + + @pytest.mark.asyncio + async def test_single_tool_call_still_requests_response( + self, mock_model, mock_agent, mock_function_tool + ): + """A single tool call must still request a response so the model continues. + + Whether the response.create is bundled into the tool output event + (start_response=True) or sent as a separate raw message after turn_ended is an + implementation detail; the contract is that exactly one response.create reaches + the Realtime API per turn that contained tool calls. + """ + mock_agent.get_all_tools.return_value = [mock_function_tool] + + session = RealtimeSession( + mock_model, mock_agent, None, run_config={"async_tool_calls": False} + ) + + tool_call = RealtimeModelToolCallEvent( + name="test_function", + call_id="call_solo", + arguments="{}", + response_id="resp_solo", + ) + + await session.on_event(tool_call) + await session.on_event(RealtimeModelTurnEndedEvent(response_id="resp_solo")) + + assert len(mock_model.sent_tool_outputs) == 1 + _, _, start_response = mock_model.sent_tool_outputs[0] + + raw_response_create_count = sum( + 1 + for event in mock_model.sent_events + if isinstance(event, RealtimeModelSendRawMessage) + and event.message.get("type") == "response.create" + ) + response_create_requests = (1 if start_response else 0) + raw_response_create_count + assert response_create_requests == 1 + + @pytest.mark.asyncio + async def test_tool_call_without_response_id_falls_back_to_immediate_response( + self, mock_model, mock_agent, mock_function_tool + ): + """Tool call events that omit response_id must keep the legacy behavior of + requesting a response immediately after each tool output.""" + mock_agent.get_all_tools.return_value = [mock_function_tool] + + session = RealtimeSession( + mock_model, mock_agent, None, run_config={"async_tool_calls": False} + ) + + tool_call = RealtimeModelToolCallEvent( + name="test_function", call_id="call_legacy", arguments="{}" + ) + + await session.on_event(tool_call) + + assert len(mock_model.sent_tool_outputs) == 1 + _, _, start_response = mock_model.sent_tool_outputs[0] + assert start_response is True + + class TestGuardrailFunctionality: """Test suite for output guardrail functionality in RealtimeSession"""