Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions livekit-agents/livekit/agents/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
APIStatusError,
APITimeoutError,
AssignmentTimeoutError,
UnexpectedModelBehavior,
create_api_error_from_http,
)
from .job import (
Expand Down Expand Up @@ -92,6 +93,7 @@
ModelSettings,
RecordingOptions,
RunContext,
RunOutputOptions,
SessionUsageUpdatedEvent,
SpeechCreatedEvent,
UserInputTranscribedEvent,
Expand Down Expand Up @@ -202,12 +204,14 @@ def __getattr__(name: str) -> typing.Any:
"SimulationVerdict",
"AgentSession",
"RecordingOptions",
"RunOutputOptions",
"text_transforms",
"AgentEvent",
"ModelSettings",
"Agent",
"AgentTask",
"AssignmentTimeoutError",
"UnexpectedModelBehavior",
"APIConnectionError",
"APIError",
"APIStatusError",
Expand Down
6 changes: 6 additions & 0 deletions livekit-agents/livekit/agents/_exceptions.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,12 @@
from __future__ import annotations


class UnexpectedModelBehavior(RuntimeError):
    """Raised when the model behaves in a way the run cannot recover from.

    For example, a run with an ``output_type`` ends without the expected
    output after exhausting its retries.
    """


class AssignmentTimeoutError(Exception):
"""Raised when accepting a job but not receiving an assignment within the specified timeout.
The server may have chosen another worker to handle this job."""
Expand Down
8 changes: 7 additions & 1 deletion livekit-agents/livekit/agents/voice/__init__.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,11 @@
from . import io, run_result
from .agent import Agent, AgentTask, ModelSettings
from .agent_session import AgentSession, RecordingOptions, VoiceActivityVideoSampler
from .agent_session import (
AgentSession,
RecordingOptions,
RunOutputOptions,
VoiceActivityVideoSampler,
)
from .events import (
AgentEvent,
AgentFalseInterruptionEvent,
Expand Down Expand Up @@ -30,6 +35,7 @@
__all__ = [
"AgentSession",
"RecordingOptions",
"RunOutputOptions",
"VoiceActivityVideoSampler",
"Agent",
"ModelSettings",
Expand Down
29 changes: 28 additions & 1 deletion livekit-agents/livekit/agents/voice/agent_session.py
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,25 @@
from .transcription.text_transforms import TextTransforms


class RunOutputOptions(TypedDict, total=False):
    """Structured-output behavior for :meth:`AgentSession.run`.

    Every key is optional (``total=False``). Can be passed as a plain dict::

        sess.run(
            user_input=...,
            output_type=MyOutput,
            output_options={"max_retries": 2, "retry_instructions": "Call submit_result."},
        )
    """

    max_retries: int
    """Re-prompts when a run ends without its ``output_type``, before raising
    UnexpectedModelBehavior. Defaults to ``2``."""
    retry_instructions: str
    """Override the built-in retry prompt."""


class RecordingOptions(TypedDict, total=False):
"""Granular control over which recording features are active.

Expand Down Expand Up @@ -592,11 +611,19 @@ def run(
user_input: str,
input_modality: Literal["text", "audio"] = "text",
output_type: type[Run_T] | None = None,
output_options: RunOutputOptions | None = None,

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should this include `NOT_GIVEN`, so that `None` can be used to disable the retry behavior? Otherwise we have to pass `{"max_retries": 0}` to disable it explicitly.

) -> RunResult[Run_T]:
if self._global_run_state is not None and not self._global_run_state.done():
raise RuntimeError("nested runs are not supported")

run_state = RunResult(user_input=user_input, output_type=output_type)
output_options = output_options or RunOutputOptions()
run_state = RunResult(
user_input=user_input,
output_type=output_type,
output_retries=output_options.get("max_retries", 2),

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nitpick: we could follow the `_resolve*` pattern here so that the default value(s) are explicit.

output_retry_instructions=output_options.get("retry_instructions"),
session=self,
)
self._global_run_state = run_state
self.generate_reply(user_input=user_input, input_modality=input_modality)
return run_state
Expand Down
52 changes: 50 additions & 2 deletions livekit-agents/livekit/agents/voice/run_result.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,10 +29,17 @@

if TYPE_CHECKING:
from .agent import Agent
from .agent_session import AgentSession


lk_evals_verbose = int(os.getenv("LIVEKIT_EVALS_VERBOSE", 0))

_OUTPUT_RETRY_PROMPT = (
"Plain text responses are not permitted, call the appropriate function "
"to provide your final output."
Comment thread
devin-ai-integration[bot] marked this conversation as resolved.
)


Run_T = TypeVar("Run_T")


Expand Down Expand Up @@ -66,12 +73,23 @@ class AgentHandoffEvent:


class RunResult(Generic[Run_T]):
def __init__(self, *, user_input: str | None = None, output_type: type[Run_T] | None) -> None:
def __init__(
self,
*,
user_input: str | None = None,
output_type: type[Run_T] | None,
output_retries: int = 1,
output_retry_instructions: str | None = None,

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nitpick: should we just pass the output options here, so that default values and their resolution stay in one place?

session: AgentSession | None = None,
) -> None:
self._handles: set[SpeechHandle | asyncio.Task] = set()

self._done_fut = asyncio.Future[None]()
self._user_input = user_input
self._output_type = output_type
self._output_retries = output_retries
self._output_retry_instructions = output_retry_instructions or _OUTPUT_RETRY_PROMPT
self._session = session
self._recorded_items: list[RunEvent] = []
self._final_output: Run_T | None = None

Expand Down Expand Up @@ -213,8 +231,14 @@ def _mark_done(self) -> None:
final_output = self.__last_speech_handle._maybe_run_final_output
if not isinstance(final_output, BaseException):
if self._output_type and not isinstance(final_output, self._output_type):
# only the no-output case is retryable: a completed task is
# one-shot, so a wrong type cannot change on a retry
if final_output is None and self._maybe_retry_output():
return
from .._exceptions import UnexpectedModelBehavior

self._done_fut.set_exception(
RuntimeError(
UnexpectedModelBehavior(
f"Expected output of type {self._output_type.__name__}, "
f"got {type(final_output).__name__}"
)
Comment thread
devin-ai-integration[bot] marked this conversation as resolved.
Comment thread
devin-ai-integration[bot] marked this conversation as resolved.
Expand All @@ -225,6 +249,30 @@ def _mark_done(self) -> None:
else:
self._done_fut.set_exception(final_output)

def _maybe_retry_output(self) -> bool:
    """Re-prompt the model when the run ended without the expected output type.

    Consumes one unit of the remaining retry budget and asks the session to
    generate another reply using the configured retry instructions.

    Returns:
        True when a retry was scheduled; False when the retry budget is
        exhausted, no session is attached, or scheduling the retry raised.
    """
    if self._output_retries <= 0 or self._session is None:
        return False
    self._output_retries -= 1

    from ..log import logger

    log_extra = {"output_type": self._output_type.__name__ if self._output_type else None}
    try:
        # generate_reply attaches the new handle to this run state (it is
        # still the session's active run); instructions inject as a
        # per-turn system message instead of a fake user message.
        self._session.generate_reply(instructions=self._output_retry_instructions)
    except Exception:
        # an unhandled exception here would leave the run future
        # unresolved; fall through to UnexpectedModelBehavior instead, but
        # record the traceback so the underlying cause is not lost
        logger.exception("failed to schedule output retry", extra=log_extra)
        return False
    logger.warning(
        "run ended without the expected output type, retrying",
        extra=log_extra,
    )
    return True

def _find_insertion_index(self, *, created_at: float) -> int:
"""
Returns the index to insert an item by creation time.
Expand Down
Loading