Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
36 changes: 35 additions & 1 deletion docs/realtime/guide.md
Original file line number Diff line number Diff line change
Expand Up @@ -244,7 +244,11 @@ Bare `RealtimeAgent` handoffs are auto-wrapped, and `realtime_handoff(...)` lets

### Guardrails

Realtime agents support output guardrails on agent responses and input guardrails on function-tool calls. Output guardrails run on debounced transcript accumulation rather than on every partial token, and they emit `guardrail_tripped` instead of raising an exception.
Realtime agents support output guardrails on agent responses and input guardrails on the user's
transcribed audio. (Function-tool calls have their own, separate tool input guardrails, which are a
distinct feature from the transcript input guardrails described here.) Output guardrails run on
debounced transcript accumulation rather than on every partial token, and they emit
`guardrail_tripped` instead of raising an exception.

```python
from agents.guardrail import GuardrailFunctionOutput, OutputGuardrail
Expand All @@ -270,6 +274,36 @@ triggered guardrail so the model can produce a replacement response. Your audio
listen for `audio_interrupted` and stop local playback immediately, because guardrails run on
debounced transcript text and some audio may already be buffered when the tripwire fires.

Realtime agents also support **input guardrails** that run on the user's transcribed audio. Configure
them via `RealtimeAgent.input_guardrails` or `RealtimeRunConfig["input_guardrails"]`; the two lists
are combined for each turn, and a guardrail object that appears in both lists runs only once. Input
guardrails run once on the completed user transcript (the `input_audio_transcription_completed`
event). When one trips, the session emits an `input_guardrail_tripped` event, forces
`response.cancel`, and sends a follow-up user message that names the triggered guardrail.

```python
from agents.guardrail import GuardrailFunctionOutput, InputGuardrail


def no_jailbreak(context, agent, user_input):
    """Trip when the user's transcript mentions "jailbreak", ignoring case."""
    mentions_jailbreak = "jailbreak" in user_input.lower()
    return GuardrailFunctionOutput(
        tripwire_triggered=mentions_jailbreak,
        output_info=None,
    )


# Attach the guardrail to the agent through its ``input_guardrails`` list.
agent = RealtimeAgent(
    name="Assistant",
    instructions="...",
    input_guardrails=[InputGuardrail(guardrail_function=no_jailbreak)],
)
```

Two limitations are worth noting. Input guardrails only run on transcribed audio, so text sent
through `session.send_message()` is not checked. And because guardrails run in a background task,
the forced cancel reliably interrupts a response that is already in flight, but a response created
in the narrow window after the guardrail resolves may not be cancelled.

## SIP and telephony

The Python SDK includes a first-class SIP attach flow via [`OpenAIRealtimeSIPModel`][agents.realtime.openai_realtime.OpenAIRealtimeSIPModel].
Expand Down
1 change: 1 addition & 0 deletions docs/ref/realtime/events.md
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@

### Guardrail Events
::: agents.realtime.events.RealtimeGuardrailTripped
::: agents.realtime.events.RealtimeInputGuardrailTripped

### History Events
::: agents.realtime.events.RealtimeHistoryAdded
Expand Down
4 changes: 4 additions & 0 deletions examples/realtime/app/server.py
Original file line number Diff line number Diff line change
Expand Up @@ -194,6 +194,10 @@ async def _serialize_event(self, event: RealtimeSessionEvent) -> dict[str, Any]:
base_event["guardrail_results"] = [
{"name": result.guardrail.name} for result in event.guardrail_results
]
elif event.type == "input_guardrail_tripped":
base_event["guardrail_results"] = [
{"name": result.guardrail.name} for result in event.guardrail_results
]
elif event.type == "raw_model_event":
base_event["raw_model_event"] = {
"type": event.data.type,
Expand Down
2 changes: 2 additions & 0 deletions src/agents/realtime/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
RealtimeHandoffEvent,
RealtimeHistoryAdded,
RealtimeHistoryUpdated,
RealtimeInputGuardrailTripped,
RealtimeRawModelEvent,
RealtimeSessionEvent,
RealtimeToolApprovalRequired,
Expand Down Expand Up @@ -132,6 +133,7 @@
"RealtimeHandoffEvent",
"RealtimeHistoryAdded",
"RealtimeHistoryUpdated",
"RealtimeInputGuardrailTripped",
"RealtimeRawModelEvent",
"RealtimeSessionEvent",
"RealtimeToolApprovalRequired",
Expand Down
9 changes: 8 additions & 1 deletion src/agents/realtime/agent.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
from agents.prompts import Prompt

from ..agent import AgentBase
from ..guardrail import OutputGuardrail
from ..guardrail import InputGuardrail, OutputGuardrail
from ..handoffs import Handoff
from ..lifecycle import AgentHooksBase, RunHooksBase
from ..logger import logger
Expand Down Expand Up @@ -79,6 +79,13 @@ class RealtimeAgent(AgentBase, Generic[TContext]):
"""A class that receives callbacks on various lifecycle events for this agent.
"""

input_guardrails: list[InputGuardrail[TContext]] = field(default_factory=list)
"""A list of checks that run on the user's transcribed audio input. They run once on the
completed user transcript and, when tripped, force a cancel of the in-progress response. This
reliably interrupts a response that is already in flight, but a response created after the
guardrail resolves may not be interrupted. Text input sent via `send_message` is not checked.
"""

def __post_init__(self) -> None:
if not isinstance(self.name, str):
raise TypeError(f"RealtimeAgent name must be a string, got {type(self.name).__name__}")
Expand Down
5 changes: 4 additions & 1 deletion src/agents/realtime/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@

from agents.prompts import Prompt

from ..guardrail import OutputGuardrail
from ..guardrail import InputGuardrail, OutputGuardrail
from ..handoffs import Handoff
from ..model_settings import ToolChoice
from ..run_config import ToolErrorFormatter
Expand Down Expand Up @@ -279,6 +279,9 @@ class RealtimeRunConfig(TypedDict):
tool_error_formatter: NotRequired[ToolErrorFormatter]
"""Optional callback that formats tool error messages returned to the model."""

input_guardrails: NotRequired[list[InputGuardrail[Any]]]
"""List of input guardrails to run on the user's transcribed audio input."""

# TODO (rm) Add history audio storage config


Expand Down
25 changes: 24 additions & 1 deletion src/agents/realtime/events.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
from dataclasses import dataclass
from typing import Any, Literal, TypeAlias

from ..guardrail import OutputGuardrailResult
from ..guardrail import InputGuardrailResult, OutputGuardrailResult
from ..run_context import RunContextWrapper
from ..tool import Tool
from .agent import RealtimeAgent
Expand Down Expand Up @@ -243,6 +243,28 @@ class RealtimeGuardrailTripped:
type: Literal["guardrail_tripped"] = "guardrail_tripped"


@dataclass
class RealtimeInputGuardrailTripped:
    """Emitted when an input guardrail trips on the user's transcribed input.

    On a trip the session forces a cancel of the in-progress response, which reliably
    interrupts a response that is already in flight. Guardrails run in a background task, so a
    response created in the narrow window after the guardrail resolves, but before the cancel
    can take effect, may not be interrupted.
    """

    guardrail_results: list[InputGuardrailResult]
    """Results of the input guardrails that tripped."""

    message: str
    """The user transcript that tripped the guardrail."""

    info: RealtimeEventInfo
    """Information shared by all events, such as the context."""

    type: Literal["input_guardrail_tripped"] = "input_guardrail_tripped"


@dataclass
class RealtimeInputAudioTimeoutTriggered:
"""Called when the model detects a period of inactivity/silence from the user."""
Expand All @@ -268,6 +290,7 @@ class RealtimeInputAudioTimeoutTriggered:
| RealtimeHistoryUpdated
| RealtimeHistoryAdded
| RealtimeGuardrailTripped
| RealtimeInputGuardrailTripped
| RealtimeInputAudioTimeoutTriggered
)
"""An event emitted by the realtime session."""
110 changes: 110 additions & 0 deletions src/agents/realtime/session.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
)
from ..agent import Agent
from ..exceptions import ToolInputGuardrailTripwireTriggered, UserError
from ..guardrail import InputGuardrail, InputGuardrailResult
from ..handoffs import Handoff
from ..items import ToolApprovalItem
from ..logger import logger
Expand All @@ -43,6 +44,7 @@
RealtimeHistoryAdded,
RealtimeHistoryUpdated,
RealtimeInputAudioTimeoutTriggered,
RealtimeInputGuardrailTripped,
RealtimeRawModelEvent,
RealtimeSessionEvent,
RealtimeToolApprovalRequired,
Expand Down Expand Up @@ -202,6 +204,8 @@ def __init__(

# Guardrails state tracking
self._interrupted_response_ids: set[str] = set()
# User item_ids for which an input guardrail has already interrupted the response.
self._interrupted_input_item_ids: set[str] = set()
self._item_transcripts: dict[str, str] = {} # item_id -> accumulated transcript
self._item_guardrail_run_counts: dict[str, int] = {} # item_id -> run count
self._debounce_text_length = self._run_config.get("guardrails_settings", {}).get(
Expand Down Expand Up @@ -365,6 +369,10 @@ async def on_event(self, event: RealtimeModelEvent) -> None:
await self._put_event(
RealtimeHistoryUpdated(info=self._event_info, history=self._history)
)
# Run input guardrails on the finalized user transcript. The transcription completes
# around the time the server begins generating a response, so we mirror the
# output-guardrail trip behavior and force a response cancel when a guardrail trips.
self._enqueue_input_guardrail_task(event.transcript, event.item_id)
elif event.type == "input_audio_timeout_triggered":
await self._put_event(
RealtimeInputAudioTimeoutTriggered(
Expand Down Expand Up @@ -1263,6 +1271,81 @@ async def _run_output_guardrails(self, text: str, response_id: str) -> bool:

return False

async def _run_input_guardrails(
    self,
    text: str,
    item_id: str,
    agent: RealtimeAgent,
    input_guardrails: list[InputGuardrail[Any]],
) -> bool:
    """Run input guardrails on a completed user transcript and interrupt on the first trip.

    All guardrails start concurrently. As soon as one reports a tripwire, guardrails that are
    still running are cancelled instead of awaited, so a slow guardrail cannot delay the forced
    response cancel. A guardrail that raises is logged and skipped.

    ``agent`` and ``input_guardrails`` are snapshotted when the transcription event is handled
    so that a concurrent ``update_agent()`` or handoff cannot swap in a different agent's
    guardrails before this background task runs.

    Args:
        text: The finalized user transcript to check.
        item_id: ID of the user item the transcript belongs to. Used to make sure the response
            is interrupted at most once per user item.
        agent: The agent snapshot handed to each guardrail.
        input_guardrails: The guardrails to run, as snapshotted by the caller.

    Returns:
        True if a guardrail tripped and this call interrupted the response. False if nothing
        tripped, there was nothing to run, or the item had already been interrupted.
    """
    # If we've already interrupted the response for this user item, skip.
    if not input_guardrails or item_id in self._interrupted_input_item_ids:
        return False

    async def _run_one(guardrail: InputGuardrail[Any]) -> InputGuardrailResult | None:
        try:
            return await guardrail.run(
                # TODO (rm) Remove this cast, it's wrong
                cast(Agent[Any], agent),
                text,
                self._context_wrapper,
            )
        except Exception as exc:
            logger.warning(
                "Input guardrail %r raised %s: %s; skipping it.",
                guardrail.get_name(),
                type(exc).__name__,
                exc,
            )
            logger.debug("Input guardrail failure details.", exc_info=True)
            return None

    tasks = [asyncio.create_task(_run_one(guardrail)) for guardrail in input_guardrails]
    try:
        # Consume results in completion order and stop at the first tripwire. Waiting for every
        # guardrail (as ``asyncio.gather`` does) would let the unsafe turn keep generating until
        # the slowest guardrail returned.
        for next_finished in asyncio.as_completed(tasks):
            outcome = await next_finished
            if outcome is not None and outcome.output.tripwire_triggered:
                break
    finally:
        # Runs on a trip and also when this task itself is cancelled (e.g. on close()).
        # Cancellation is requested but not awaited, so a guardrail that is slow to unwind
        # cannot hold up the forced cancel below.
        for task in tasks:
            if not task.done():
                task.cancel()

    # Report every guardrail that had already finished and tripped, in configured order.
    # Tasks that were just cancelled are not ``done()`` yet, so they are excluded here.
    finished_results = [
        task.result() for task in tasks if task.done() and not task.cancelled()
    ]
    triggered_results = [
        result
        for result in finished_results
        if result is not None and result.output.tripwire_triggered
    ]
    if not triggered_results:
        return False

    # Double-check: another task may have interrupted this user item while we were awaiting.
    if item_id in self._interrupted_input_item_ids:
        return False

    # Mark as interrupted immediately (before any awaits) to minimize the race window.
    self._interrupted_input_item_ids.add(item_id)

    # Emit input guardrail tripped event.
    await self._put_event(
        RealtimeInputGuardrailTripped(
            guardrail_results=triggered_results,
            message=text,
            info=self._event_info,
        )
    )

    # Interrupt the model, forcing a cancel of any in-progress response.
    await self._model.send_event(RealtimeModelSendInterrupt(force_response_cancel=True))

    # Send guardrail triggered message.
    guardrail_names = [result.guardrail.get_name() for result in triggered_results]
    await self._model.send_event(
        RealtimeModelSendUserInput(
            user_input=f"input guardrail triggered: {', '.join(guardrail_names)}"
        )
    )

    return True

def _enqueue_guardrail_task(self, text: str, response_id: str) -> None:
# Runs the guardrails in a separate task to avoid blocking the main loop

Expand All @@ -1272,6 +1355,33 @@ def _enqueue_guardrail_task(self, text: str, response_id: str) -> None:
# Add callback to remove completed tasks and handle exceptions
task.add_done_callback(self._on_guardrail_task_done)

def _enqueue_input_guardrail_task(self, text: str, item_id: str) -> None:
# Snapshot the active agent and its guardrails now; a later update_agent()/handoff must not
# change which guardrails run against this transcript.
agent = self._current_agent
combined_guardrails = agent.input_guardrails + self._run_config.get("input_guardrails", [])

seen_ids: set[int] = set()
input_guardrails: list[InputGuardrail[Any]] = []
for guardrail in combined_guardrails:
guardrail_id = id(guardrail)
if guardrail_id not in seen_ids:
input_guardrails.append(guardrail)
seen_ids.add(guardrail_id)

# Skip creating a no-op task when no input guardrails are configured.
if not input_guardrails:
return

# Runs the input guardrails in a separate task to avoid blocking the main loop.
task = asyncio.create_task(
self._run_input_guardrails(text, item_id, agent, input_guardrails)
)
# Reuse the shared guardrail task set + done callback so completed tasks are removed,
# exceptions surface as events, and close() cancels any still-running task.
self._guardrail_tasks.add(task)
task.add_done_callback(self._on_guardrail_task_done)

def _on_guardrail_task_done(self, task: asyncio.Task[Any]) -> None:
"""Handle completion of a guardrail task."""
# Remove from tracking set
Expand Down
Loading