Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -952,9 +952,54 @@ async def _run_connection(self) -> None:
self._reconnect_event.clear()
return

# Race condition fix: if audio task finished first,
# give message task up to 30s to receive the final transcript
# from Sarvam before cancelling it. Without this, FIRST_COMPLETED
# cancels the message task before the transcript arrives, causing
# silent transcript loss. This is needed because long TTS outputs
# accumulate large audio buffers that Sarvam needs time to process.
if self._audio_task in done and self._message_task not in done:
audio_exc = self._audio_task.exception()
if audio_exc is not None:
self._logger.warning(
"Audio task failed, skipping transcript wait",
extra=self._build_log_context(),
)
Copy link

Copilot AI Feb 12, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If _message_task completes during the extra 30s wait, it won’t be in the original done set, so any exception from _process_messages can be missed. After the wait, explicitly check/propagate self._message_task.exception() (or update the done set) so failures still surface like they do for tasks in done.

Suggested change
)
)
# Ensure any exception from the message task is propagated
# by including it in the completed tasks set.
done = done | {self._message_task}

Copilot uses AI. Check for mistakes.
else:
self._logger.info(
"Audio task completed, waiting up to 30s for transcript",
extra=self._build_log_context(),
)
try:
await asyncio.wait([self._message_task], timeout=30.0)
except asyncio.CancelledError:
raise
except Exception:
self._logger.exception(
"Error while waiting for transcript task",
extra=self._build_log_context(),
)
if self._message_task.done():
self._logger.info(
"Transcript received from Sarvam",
extra=self._build_log_context(),
)
exc = self._message_task.exception()
if exc is not None:
if isinstance(exc, BaseException):
raise exc
else:
raise RuntimeError(f"Task failed with non-BaseException: {exc}")
else:
self._logger.warning(
"Transcript timeout (30s) — transcript may be lost",
extra=self._build_log_context(),
)

# Cancel remaining tasks using LiveKit's utility
if pending:
await utils.aio.cancel_and_wait(*pending)
remaining = [t for t in pending if not t.done()]
if remaining:
await utils.aio.cancel_and_wait(*remaining)

# Check for exceptions in completed tasks
for task in done:
Expand Down