AGT-2182: Add inference bargein and examples #4771
chenghao-mou wants to merge 32 commits into main
Conversation
Co-authored-by: Chenghao Mou <chenghao.mou@livekit.io>
Co-authored-by: Théo Monnom <theo.monnom@outlook.com>
```python
except Exception as e:
    raise APIError(f"error during interruption prediction: {e}") from e
```
🟡 InterruptionHttpStream.predict double-wraps APIError exceptions
When an error occurs inside the async with self._session.post(...) response processing (e.g., resp.raise_for_status() fails or JSON parsing fails), the inner except Exception at line 744 catches it and raises an APIError. This APIError then propagates out of the async with block and the outer try. Since APIError is not asyncio.TimeoutError or aiohttp.ClientError, it is NOT caught by line 747, but IS caught by the generic except Exception at line 749, which wraps it in yet another APIError.
Root Cause and Impact
The exception handling structure has three nested layers:
1. Inner try/except Exception (lines 723-746) - catches response processing errors, raises APIError
2. Outer except (asyncio.TimeoutError, aiohttp.ClientError) (line 747) - catches connection/timeout errors
3. Outer except Exception (line 749) - intended as a catch-all, but also catches the APIError from step 1
Since APIError(Exception) is a subclass of Exception, the APIError raised at line 746 escapes the async with block and hits the outer except Exception at line 749. The result is a double-wrapped error message like:
"error during interruption prediction: error during interruption prediction: 404 Not Found"
Additionally, the body parameter from the inner APIError (which contains the response text for debugging) is lost in the re-wrapping.
Impact: Confusing error messages in logs and loss of the response body diagnostic information when interruption prediction HTTP calls fail.
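A minimal, self-contained reproduction of the wrapping (stand-in APIError and a plain RuntimeError in place of the aiohttp failure; a sketch of the structure, not the PR's actual code):

```python
class APIError(Exception):
    pass

def predict() -> None:
    try:
        try:
            raise RuntimeError("404 Not Found")  # stands in for resp.raise_for_status() failing
        except Exception as e:
            raise APIError(f"error during interruption prediction: {e}") from e
    except (TimeoutError,):  # stands in for (asyncio.TimeoutError, aiohttp.ClientError)
        raise
    except Exception as e:  # also catches the APIError raised just above
        raise APIError(f"error during interruption prediction: {e}") from e

try:
    predict()
except APIError as e:
    print(e)
    # error during interruption prediction: error during interruption prediction: 404 Not Found
```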
Suggested change:

```diff
- except Exception as e:
-     raise APIError(f"error during interruption prediction: {e}") from e
+ except APIError:
+     raise
+ except Exception as e:
+     raise APIError(f"error during interruption prediction: {e}") from e
```
```python
interruption_mode: NotGivenOr[Literal["adaptive", "vad", False]] = NOT_GIVEN
if allow_interruptions is False:
    interruption_mode = False
```
🔴 allow_interruptions=True (explicit) is silently ignored in TurnHandlingConfig.migrate
When a user explicitly passes allow_interruptions=True to Agent.__init__ or AgentTask.__init__, the TurnHandlingConfig.migrate method only checks if allow_interruptions is False: (line 141). When allow_interruptions=True, the interruption_mode stays as NOT_GIVEN, which means the InterruptionConfig.mode defaults to NOT_GIVEN. Later in Agent.__init__ at agent.py:95, is_given(turn_handling.interruption_cfg.mode) returns False, so self._allow_interruptions stays NOT_GIVEN — the explicit True is lost.
Detailed Explanation
The old API allowed allow_interruptions=True to be explicitly set on an Agent, which would override the session's default. With the new migration code:
1. Agent.__init__ calls TurnHandlingConfig.migrate(allow_interruptions=True)
2. In migrate, allow_interruptions is False → False, so interruption_mode stays NOT_GIVEN
3. InterruptionConfig is created with mode=NOT_GIVEN
4. Back in Agent.__init__: is_given(turn_handling.interruption_cfg.mode) → False
5. self._allow_interruptions stays NOT_GIVEN instead of being set to True
This means an Agent(allow_interruptions=True) no longer overrides a session's allow_interruptions=False. The fix should also set interruption_mode when allow_interruptions is True.
Impact: Agents that explicitly set allow_interruptions=True to override a session-level allow_interruptions=False will no longer work correctly — the session's setting will take precedence.
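A small sketch of the behavior described above, with simplified stand-ins for NOT_GIVEN, is_given, and migrate (not the library's real implementations):

```python
# Stand-in sentinel mirroring the NOT_GIVEN pattern described above.
class _NotGiven:
    def __repr__(self) -> str:
        return "NOT_GIVEN"

NOT_GIVEN = _NotGiven()

def is_given(value) -> bool:
    return not isinstance(value, _NotGiven)

def migrate(allow_interruptions=NOT_GIVEN):
    interruption_mode = NOT_GIVEN
    if allow_interruptions is False:  # only the explicit-False case is handled
        interruption_mode = False
    return interruption_mode

print(migrate(allow_interruptions=True))   # NOT_GIVEN -- the explicit True is lost
print(migrate(allow_interruptions=False))  # False
```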
Suggested change:

```diff
  interruption_mode: NotGivenOr[Literal["adaptive", "vad", False]] = NOT_GIVEN
- if allow_interruptions is False:
-     interruption_mode = False
+ if is_given(allow_interruptions):
+     if allow_interruptions is False:
+         interruption_mode = False
+     else:
+         interruption_mode = "adaptive"
```
It is intentional so that it can be resolved dynamically to either VAD or adaptive interruption.
```python
async def _send_task() -> None:
    nonlocal overlap_speech_started
    nonlocal cache
    async for data in data_chan:
        if self._overlap_speech_started_at is None:
            continue
        resp = await self.predict(data)
        created_at = resp["created_at"]
        cache[created_at] = entry = InterruptionCacheEntry(
            created_at=created_at,
            total_duration=(time.perf_counter_ns() - created_at) / 1e9,
            speech_input=data,
            detection_delay=time.time() - self._overlap_speech_started_at,
            probabilities=resp["probabilities"],
            prediction_duration=resp["prediction_duration"],
            is_interruption=resp["is_bargein"],
        )
        if overlap_speech_started and entry.is_interruption:
            logger.debug("user interruption detected")
            if self._user_speech_span:
                self._update_user_speech_span(self._user_speech_span, entry)
                self._user_speech_span = None
            ev = InterruptionEvent(
                type="user_interruption_detected",
                timestamp=time.time(),
                overlap_speech_started_at=self._overlap_speech_started_at,
                is_interruption=entry.is_interruption,
                speech_input=entry.speech_input,
                probabilities=entry.probabilities,
                total_duration=entry.get_total_duration(),
                prediction_duration=entry.get_prediction_duration(),
                detection_delay=entry.get_detection_delay(),
                probability=entry.get_probability(),
            )
            self._event_ch.send_nowait(ev)
            self._model.emit("user_interruption_detected", ev)
            overlap_speech_started = False
```
🟡 Shared mutable overlap_speech_started between concurrent tasks in InterruptionHttpStream without synchronization
In InterruptionHttpStream._run(), the overlap_speech_started boolean is shared between _forward_data and _send_task via nonlocal. _forward_data writes to it (lines 566, 590, 631, 699) and _send_task reads it (line 680). Since _send_task contains await self.predict(data) (line 669), the event loop can switch between the two tasks between the await and the subsequent read of overlap_speech_started at line 680.
Root Cause
The specific scenario is:
1. _forward_data sets overlap_speech_started = True (line 590)
2. _send_task picks up data and calls await self.predict(data) (line 669)
3. While predict is awaiting, _forward_data processes an _OverlapSpeechEndedSentinel and sets overlap_speech_started = False (line 631), emitting a user_non_interruption_detected event
4. predict returns with is_interruption=True
5. _send_task checks overlap_speech_started (line 680) — it's now False, so the interruption is silently dropped
This is a race condition that could cause missed interruption detections. However, since this is async (not threaded) and the window is narrow (only during the HTTP request), the practical impact is limited to edge cases where overlap speech ends exactly during an in-flight prediction.
Impact: Occasional missed interruption detections when overlap speech ends during an in-flight HTTP prediction request.
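A toy asyncio reproduction of the interleaving (fake_predict stands in for the HTTP call; none of these names are the PR's code):

```python
import asyncio

# The shared flag is cleared while the fake prediction is awaiting,
# so the positive result is dropped.
overlap_speech_started = False

async def fake_predict() -> bool:
    await asyncio.sleep(0.1)  # stands in for the in-flight HTTP request
    return True               # is_interruption=True

async def send_task() -> None:
    result = await fake_predict()
    if overlap_speech_started and result:
        print("interruption emitted")
    else:
        print("interruption dropped")  # the flag flipped during the await

async def forward_data() -> None:
    global overlap_speech_started
    overlap_speech_started = True   # overlap speech starts
    await asyncio.sleep(0.05)
    overlap_speech_started = False  # overlap speech ends mid-prediction

async def main() -> None:
    await asyncio.gather(send_task(), forward_data())

asyncio.run(main())  # prints "interruption dropped"
```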
```python
endpointing_kwargs = {}
# allow not given values for agent to inherit from session
endpointing_kwargs["min_delay"] = min_endpointing_delay
endpointing_kwargs["max_delay"] = max_endpointing_delay
```
🟡 endpointing_kwargs dict is always non-empty, overriding EndpointingConfig defaults with NOT_GIVEN
In TurnHandlingConfig.migrate(), endpointing_kwargs always gets two keys set (lines 146-147), so the if endpointing_kwargs: check at line 166 is always True. When min_endpointing_delay and max_endpointing_delay are both NOT_GIVEN, this creates EndpointingConfig(min_delay=NOT_GIVEN, max_delay=NOT_GIVEN), which overrides the class defaults of 0.5 and 3.0.
Root Cause
The EndpointingConfig class defines:
```python
min_delay: NotGivenOr[float] = 0.5
max_delay: NotGivenOr[float] = 3.0
```
But migrate() always passes these explicitly:
```python
endpointing_kwargs["min_delay"] = min_endpointing_delay  # could be NOT_GIVEN
endpointing_kwargs["max_delay"] = max_endpointing_delay  # could be NOT_GIVEN
```
This means EndpointingConfig(min_delay=NOT_GIVEN, max_delay=NOT_GIVEN) is created, which sets the fields to NOT_GIVEN instead of using the defaults 0.5 and 3.0.
For AgentSession, this is mitigated because the caller already provides fallback defaults (0.5 and 3.0). For Agent and AgentTask, NOT_GIVEN is intentional (agents inherit from session). So the practical impact is limited, but the endpointing_kwargs dict check is misleading — it will never be empty.
Impact: Low — the callers already handle this correctly, but the code is misleading.
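A short sketch of the default-override, using a stand-in sentinel and a simplified EndpointingConfig rather than the real class:

```python
from dataclasses import dataclass

NOT_GIVEN = object()  # stand-in sentinel

@dataclass
class EndpointingConfig:
    min_delay: object = 0.5
    max_delay: object = 3.0

endpointing_kwargs = {"min_delay": NOT_GIVEN, "max_delay": NOT_GIVEN}
if endpointing_kwargs:  # always truthy: both keys are set unconditionally
    cfg = EndpointingConfig(**endpointing_kwargs)

print(cfg.min_delay is NOT_GIVEN)  # True -- the 0.5 default never applies
```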
AgentSession uses the default value when it is not given. This is expected per the comment.
```python
def on_interruption(self, ev: inference.InterruptionEvent) -> None:
    # restore interruption by audio activity
    self._restore_interruption_by_audio_activity()
    self._interrupt_by_audio_activity()
    if self._audio_recognition:
        self._audio_recognition.on_end_of_agent_speech(
            ignore_user_transcript_until=ev.overlap_speech_started_at or ev.timestamp
        )
```
🔴 Duplicate on_end_of_agent_speech call in on_interruption sets wrong transcript ignore cutoff
When adaptive interruption detection fires with use_pause=True, _ignore_user_transcript_until is set to time.time() instead of the correct ev.overlap_speech_started_at, causing user transcripts from the overlap period to be incorrectly dropped.
Root Cause
In on_interruption (agent_activity.py:1381-1388):
```python
def on_interruption(self, ev):
    self._restore_interruption_by_audio_activity()
    self._interrupt_by_audio_activity()  # ← first call to on_end_of_agent_speech
    if self._audio_recognition:
        self._audio_recognition.on_end_of_agent_speech(
            ignore_user_transcript_until=ev.overlap_speech_started_at or ev.timestamp  # ← second call
        )
```
When _interrupt_by_audio_activity() follows the use_pause path (agent_activity.py:1298-1305), it calls on_end_of_agent_speech(ignore_user_transcript_until=time.time()) internally. This sets _agent_speaking=False and _ignore_user_transcript_until=time.time().
When the second call from on_interruption runs with the correct ev.overlap_speech_started_at, _agent_speaking is already False, so the body is skipped and the better value is never applied.
The _flush_held_transcripts task (scheduled by the first call) then uses time.time() as the cutoff. Since time.time() is later than ev.overlap_speech_started_at (the overlap started earlier), transcripts whose end_time + input_started_at < time.time() are dropped — including the user's actual interruption speech during the overlap period.
Impact: When adaptive interruption detection fires and resume_false_interruption=True with false_interruption_timeout set, user transcripts from the overlap period (typically 100-500ms) can be silently dropped, causing the agent to miss what the user said during their interruption.
Prompt for agents
In agent_activity.py's on_interruption method (around line 1381), the call to _interrupt_by_audio_activity() also calls on_end_of_agent_speech internally (when use_pause=True) with time.time() as the cutoff. This prevents the subsequent on_end_of_agent_speech call with the correct ev.overlap_speech_started_at from taking effect because _agent_speaking is already False.
To fix this, the on_interruption method should pass the correct ignore_user_transcript_until value BEFORE calling _interrupt_by_audio_activity, or _interrupt_by_audio_activity should accept an optional ignore_user_transcript_until parameter to use instead of time.time(). For example, you could refactor on_interruption to:
1. Call _restore_interruption_by_audio_activity()
2. Call on_end_of_agent_speech(ignore_user_transcript_until=ev.overlap_speech_started_at or ev.timestamp) first
3. Then call _interrupt_by_audio_activity() but skip its internal on_end_of_agent_speech call
Alternatively, add an optional parameter to _interrupt_by_audio_activity like ignore_user_transcript_until: float | None = None, and pass ev.overlap_speech_started_at when calling from on_interruption, then use that value instead of time.time() in the use_pause path.
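A hedged sketch of that second option, using a minimal stand-in class rather than the real agent_activity.py code:

```python
import time

# Thread an optional cutoff through _interrupt_by_audio_activity so its
# use_pause path does not clobber the overlap start time with time.time().
class _ActivitySketch:
    def __init__(self) -> None:
        self.last_cutoff: float | None = None

    def _on_end_of_agent_speech(self, ignore_user_transcript_until: float) -> None:
        self.last_cutoff = ignore_user_transcript_until

    def _interrupt_by_audio_activity(
        self, ignore_user_transcript_until: float | None = None
    ) -> None:
        # use_pause path: prefer the caller-supplied cutoff over time.time()
        cutoff = (
            ignore_user_transcript_until
            if ignore_user_transcript_until is not None
            else time.time()
        )
        self._on_end_of_agent_speech(ignore_user_transcript_until=cutoff)

    def on_interruption(self, overlap_speech_started_at: float) -> None:
        self._interrupt_by_audio_activity(
            ignore_user_transcript_until=overlap_speech_started_at
        )

activity = _ActivitySketch()
activity.on_interruption(overlap_speech_started_at=123.45)
print(activity.last_cutoff)  # 123.45 -- not time.time()
```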
```python
If the event has no timestamps, we assume it is the same as the next valid event.
"""
if not self._interruption_enabled:
    return
```
🟡 _flush_held_transcripts leaks buffered events when _ignore_user_transcript_until is reset before the flush task runs
When adaptive interruption detection is enabled, _flush_held_transcripts returns early without clearing _transcript_buffer if _ignore_user_transcript_until is NOT_GIVEN. This causes stale transcript events to accumulate in the buffer.
Root Cause and Race Condition
The race condition occurs between on_end_of_agent_speech and incoming STT events:
1. on_end_of_agent_speech at livekit-agents/livekit/agents/voice/audio_recognition.py:289 sets _ignore_user_transcript_until and creates a flush task via asyncio.create_task(self._flush_held_transcripts())
2. self._agent_speaking = False runs synchronously at line 293
3. Before the flush task executes, a START_OF_SPEECH STT event arrives
4. _should_hold_stt_event at line 362 sees START_OF_SPEECH + not agent_speaking → resets _ignore_user_transcript_until = NOT_GIVEN
5. The flush task finally runs, but _flush_held_transcripts at line 302 does if not is_given(self._ignore_user_transcript_until): return — it returns early without clearing the buffer
After this, _transcript_buffer still contains stale events. Every subsequent call to _on_stt_event hits the elif self._transcript_buffer: check at line 614, re-calling _flush_held_transcripts which again returns early. The buffer is only cleaned when update_stt(None) / update_stt(stt) is called in clear_user_turn.
Impact: Valid user transcript events from during overlap speech are silently dropped rather than being emitted. Additionally, every subsequent STT event makes an unnecessary (no-op) call to _flush_held_transcripts until the STT is reset.
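A toy demonstration of the ordering: code that runs after asyncio.create_task() can reset the sentinel before the created task executes, so the early return leaves the buffer untouched (simplified stand-ins, not the audio_recognition.py implementation):

```python
import asyncio

state = {"cutoff": 1.0, "buffer": ["held transcript"]}

async def flush() -> None:
    if state["cutoff"] is None:  # sentinel was reset before we ran
        return                   # buffer is never cleared -> stale events remain
    state["buffer"].clear()

async def main() -> None:
    task = asyncio.create_task(flush())
    state["cutoff"] = None       # START_OF_SPEECH handler resets the sentinel first
    await task
    print(state["buffer"])       # ['held transcript'] -- still buffered

asyncio.run(main())
```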
Suggested change:

```diff
  If the event has no timestamps, we assume it is the same as the next valid event.
  """
  if not self._interruption_enabled:
      return
+ if not is_given(self._ignore_user_transcript_until):
+     self._transcript_buffer.clear()
+     return
  if not self._transcript_buffer:
```