Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions src/agents/realtime/model_events.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,9 @@ class RealtimeModelToolCallEvent:
id: str | None = None
previous_item_id: str | None = None

response_id: str | None = None
"""The ID of the model response that produced this tool call."""

type: Literal["function_call"] = "function_call"


Expand Down Expand Up @@ -143,6 +146,9 @@ class RealtimeModelTurnStartedEvent:
class RealtimeModelTurnEndedEvent:
"""Triggered when the model finishes generating a response for a turn."""

response_id: str | None = None
"""The ID of the model response that just finished, if available."""

type: Literal["turn_ended"] = "turn_ended"


Expand Down
13 changes: 10 additions & 3 deletions src/agents/realtime/openai_realtime.py
Original file line number Diff line number Diff line change
Expand Up @@ -926,7 +926,9 @@ async def _handle_audio_delta(self, parsed: ResponseAudioDeltaEvent) -> None:
)
)

async def _handle_output_item(self, item: ConversationItem) -> None:
async def _handle_output_item(
self, item: ConversationItem, response_id: str | None = None
) -> None:
"""Handle response output item events (function calls and messages)."""
if item.type == "function_call" and item.status == "completed":
tool_call = RealtimeToolCallItem(
Expand All @@ -948,6 +950,7 @@ async def _handle_output_item(self, item: ConversationItem) -> None:
name=item.name or "",
arguments=item.arguments or "",
id=item.id or "",
response_id=response_id,
)
)
elif item.type == "message":
Expand Down Expand Up @@ -1149,7 +1152,9 @@ async def _handle_ws_event(self, event: dict[str, Any]):
await self._emit_event(RealtimeModelTurnStartedEvent())
elif parsed.type == "response.done":
await self._mark_response_done()
await self._emit_event(RealtimeModelTurnEndedEvent())
await self._emit_event(
RealtimeModelTurnEndedEvent(response_id=getattr(parsed.response, "id", None))
)
elif parsed.type == "session.created":
await self._send_tracing_config(self._tracing_config)
self._update_created_session(parsed.session)
Expand Down Expand Up @@ -1209,7 +1214,9 @@ async def _handle_ws_event(self, event: dict[str, Any]):
parsed.type == "response.output_item.added"
or parsed.type == "response.output_item.done"
):
await self._handle_output_item(parsed.item)
await self._handle_output_item(
parsed.item, response_id=getattr(parsed, "response_id", None)
)
elif parsed.type == "input_audio_buffer.timeout_triggered":
await self._emit_event(
RealtimeModelInputAudioTimeoutTriggeredEvent(
Expand Down
87 changes: 85 additions & 2 deletions src/agents/realtime/session.py
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@
from .model_inputs import (
RealtimeModelSendAudio,
RealtimeModelSendInterrupt,
RealtimeModelSendRawMessage,
RealtimeModelSendSessionUpdate,
RealtimeModelSendToolOutput,
RealtimeModelSendUserInput,
Expand Down Expand Up @@ -176,6 +177,14 @@ def __init__(
self._tool_call_tasks: set[asyncio.Task[Any]] = set()
self._async_tool_calls: bool = bool(self._run_config.get("async_tool_calls", True))

# Coalesce response.create across parallel tool calls emitted in the same model response.
# Each entry tracks call_ids still pending and whether the model has signalled the end
# of the turn (response.done). Only the last completing tool call for a finished turn
# requests a new response so the OpenAI Realtime API never sees overlapping
# response.create events that would error with conversation_already_has_active_response.
self._tool_calls_by_response: dict[str, set[str]] = {}
self._turn_ended_responses: set[str] = set()

@property
def model(self) -> RealtimeModel:
"""Access the underlying model for adding listeners or other direct interaction."""
Expand Down Expand Up @@ -278,6 +287,7 @@ async def on_event(self, event: RealtimeModelEvent) -> None:
await self._put_event(RealtimeError(info=self._event_info, error=event.error))
elif event.type == "function_call":
agent_snapshot = self._current_agent
self._register_pending_tool_call(event)
if self._async_tool_calls:
self._enqueue_tool_call_task(event, agent_snapshot)
else:
Expand Down Expand Up @@ -429,6 +439,8 @@ async def on_event(self, event: RealtimeModelEvent) -> None:
self._item_transcripts.clear()
self._item_guardrail_run_counts.clear()

await self._mark_turn_ended(event.response_id)

await self._put_event(
RealtimeAgentEndEvent(
agent=self._current_agent,
Expand Down Expand Up @@ -556,11 +568,12 @@ async def _send_tool_rejection(
tool=tool,
call_id=event.call_id,
)
start_response = self._consume_pending_tool_call(event)
await self._model.send_event(
RealtimeModelSendToolOutput(
tool_call=event,
output=rejection_message,
start_response=True,
start_response=start_response,
)
)

Expand Down Expand Up @@ -700,11 +713,12 @@ async def _handle_tool_call(
arguments=event.arguments,
)

start_response = self._consume_pending_tool_call(event)
await self._model.send_event(
RealtimeModelSendToolOutput(
tool_call=event,
output=_serialize_tool_output(result),
start_response=True,
start_response=start_response,
)
)

Expand Down Expand Up @@ -763,6 +777,7 @@ async def _handle_tool_call(

# Then send tool output to complete the handoff (this triggers a new response)
transfer_message = handoff.get_transfer_message(result)
self._consume_pending_tool_call(event)
await self._model.send_event(
RealtimeModelSendToolOutput(
tool_call=event,
Expand All @@ -772,6 +787,7 @@ async def _handle_tool_call(
)
else:
error_message = f"Tool {event.name} not found"
self._consume_pending_tool_call(event)
await self._model.send_event(
RealtimeModelSendToolOutput(
tool_call=event,
Expand Down Expand Up @@ -1065,6 +1081,71 @@ def _enqueue_tool_call_task(
self._tool_call_tasks.add(task)
task.add_done_callback(self._on_tool_call_task_done)

def _register_pending_tool_call(self, event: RealtimeModelToolCallEvent) -> None:
"""Record a tool call so we can coalesce response.create per model response."""
response_id = event.response_id
if not response_id:
return
pending = self._tool_calls_by_response.setdefault(response_id, set())
pending.add(event.call_id)

def _consume_pending_tool_call(self, event: RealtimeModelToolCallEvent) -> bool:
"""Mark a tool call as complete and return whether it should trigger response.create.

Returns True if this is the last pending tool call for a turn that has already ended,
meaning the caller should request a new response. Otherwise returns False so the
Realtime API never receives a response.create while another response is still active
or while other tool outputs for the same turn are still in flight.
"""
response_id = event.response_id
if not response_id:
# Without a response id we can't coalesce, so fall back to the historical behavior
# of always requesting a response after every tool output.
return True

pending = self._tool_calls_by_response.get(response_id)
if pending is None:
# The function_call event arrived without going through on_event, or the bookkeeping
# has already been cleared. Be conservative and request a response.
return True

pending.discard(event.call_id)
if pending:
# Other tool outputs for the same turn are still in flight.
return False

if response_id not in self._turn_ended_responses:
# The model hasn't finished emitting this turn yet, so more tool calls may still
# arrive. Wait for turn_ended to drive response.create.
return False

self._tool_calls_by_response.pop(response_id, None)
self._turn_ended_responses.discard(response_id)
return True

async def _mark_turn_ended(self, response_id: str | None) -> None:
"""Record that the model finished a turn and trigger response.create if needed."""
if not response_id:
return
pending = self._tool_calls_by_response.get(response_id)
if pending is None:
# No tool calls were tracked for this response, nothing to coalesce.
return
if pending:
# Tool outputs are still in flight; the last one to finish will request the response.
self._turn_ended_responses.add(response_id)
return

# All tool outputs have already been sent with start_response=False, so we have to
# request a response ourselves now that the turn is complete.
self._tool_calls_by_response.pop(response_id, None)
self._turn_ended_responses.discard(response_id)
await self._model.send_event(
RealtimeModelSendRawMessage(
message={"type": "response.create"},
)
)

def _on_tool_call_task_done(self, task: asyncio.Task[Any]) -> None:
self._tool_call_tasks.discard(task)

Expand Down Expand Up @@ -1117,6 +1198,8 @@ async def _cleanup(self) -> None:

# Clear pending approval tracking
self._pending_tool_calls.clear()
self._tool_calls_by_response.clear()
self._turn_ended_responses.clear()

# Mark as closed
self._closed = True
Expand Down
3 changes: 3 additions & 0 deletions tests/realtime/test_openai_realtime.py
Original file line number Diff line number Diff line change
Expand Up @@ -1692,6 +1692,9 @@ async def test_handle_tool_call_event_success(self, model):
assert tool_call_emitted.name == "get_weather"
assert tool_call_emitted.arguments == '{"location": "San Francisco"}'
assert tool_call_emitted.call_id == "call_123"
# response_id must be propagated so RealtimeSession can coalesce parallel tool
# calls per model response and avoid overlapping response.create events.
assert tool_call_emitted.response_id == "resp_789"

@pytest.mark.asyncio
async def test_audio_timing_calculation_accuracy(self, model):
Expand Down
Loading
Loading