Conversation
📝 WalkthroughWalkthroughAdds a Telnyx plugin and example voice agent: streaming STT and TTS backends, plugin registration, session management utilities, project packaging, and an examples/voice_agents/telnyx_voice_agent.py demonstrating RTC session wiring, VAD prewarm, function tools, metrics collection, and a CLI entrypoint. Changes
Sequence Diagram(s)sequenceDiagram
participant Client as AgentServer/Agent
participant STT as Telnyx STT
participant Stream as SpeechStream
participant WS as Telnyx WebSocket
Client->>STT: recognize(audio_stream)
STT->>Stream: create stream()
Stream->>WS: connect (wss + auth)
loop sending audio
Client->>Stream: push(audio_frame)
Stream->>WS: send WAV header / chunk
end
Client->>Stream: end_input
loop receive events
WS-->>Stream: JSON event (interim/final)
Stream->>Client: emit INTERIM/FINAL transcript
end
WS-->>Stream: stream_finished
Stream->>STT: close()
sequenceDiagram
participant Client as AgentServer/Agent
participant TTS as Telnyx TTS
participant Stream as SynthesizeStream
participant WS as Telnyx WebSocket
participant Decoder as MP3 Decoder
Client->>TTS: stream()
TTS->>Stream: new SynthesizeStream()
loop send text segments
Client->>Stream: push_text(segment)
end
Client->>Stream: flush()
Stream->>WS: connect (wss + auth) and send text
loop receive audio chunks
WS-->>Stream: JSON with base64 MP3
Stream->>Decoder: feed MP3 bytes
Decoder-->>Stream: PCM frames
Stream->>Client: emit PCM frame
end
WS-->>Stream: stream_finished
Stream->>TTS: close()
Estimated code review effort🎯 4 (Complex) | ⏱️ ~50 minutes Possibly related PRs
Suggested reviewers
Poem
🚥 Pre-merge checks | ✅ 2 | ❌ 1❌ Failed checks (1 warning)
✅ Passed checks (2 passed)
✏️ Tip: You can configure your own custom pre-merge checks in the settings. ✨ Finishing touches
🧪 Generate unit tests (beta)
Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out. Comment |
There was a problem hiding this comment.
Actionable comments posted: 4
🤖 Fix all issues with AI agents
In `@livekit-plugins/livekit-plugins-telnyx/livekit/plugins/telnyx/stt.py`:
- Around line 221-225: The current debug log prints the entire STT payload
(logger.debug("Telnyx STT received: %s", data)), which can expose PII; change
the logging in the WebSocket TEXT handler to avoid dumping raw transcripts by
creating a redacted summary before logging: inspect the parsed data (variable
data from json.loads(msg.data) used before calling _process_stream_event),
replace or omit fields like "transcript", "text", "alternatives", or "raw" with
either a redaction token (e.g. "[REDACTED]") or metadata such as their lengths,
and then call logger.debug with that summary (only metadata/keys/timestamps),
not the full payload. Ensure _process_stream_event continues to receive the
original data unmodified.
- Around line 203-219: The recv_task can falsely raise APIStatusError because
closing_ws is only set after the 1s sleep and close; set closing_ws = True
before awaiting the delay/closing so the recv_task sees the flag if the server
closes the connection in response to our shutdown. Update the shutdown sequence
in the function that calls ws.close() (the block that currently does await
asyncio.sleep(1.0); closing_ws = True; await ws.close()) to assign closing_ws =
True first, then await asyncio.sleep(1.0) and finally await ws.close(), so
recv_task (which checks closing_ws) will treat server-side closes as expected
shutdowns.
In `@livekit-plugins/livekit-plugins-telnyx/livekit/plugins/telnyx/tts.py`:
- Around line 125-134: The generic exception handler in _run currently wraps all
errors from _run_ws as APIConnectionError, losing APIStatusError and
APITimeoutError details; change the except Exception as e block in _run to
re-raise those known API exceptions (APITimeoutError, APIStatusError) untouched
(raise) and only wrap other unknown exceptions as APIConnectionError (raise
APIConnectionError() from e), so that _run preserves diagnostics and retry
semantics coming from _run_ws.
- Around line 139-212: The code currently calls
output_emitter.start_segment(segment_id=segment_id) before the WebSocket work
but only calls output_emitter.end_segment() after the try/except that can be
bypassed on exceptions; move the end_segment() call into a finally that always
runs (alongside the existing decoder.aclose() cleanup) so that
output_emitter.end_segment() executes regardless of WS/connect/handler failures
(update references around ensure_session().ws_connect, the ws async-with block,
and the finally that currently awaits decoder.aclose()).
🧹 Nitpick comments (2)
livekit-plugins/livekit-plugins-telnyx/livekit/plugins/telnyx/stt.py (1)
44-301: Add Google-style docstrings for the public STT surface.
STT,SpeechStream, and_create_streaming_wav_headerare public/central APIs but lack Google-style docstrings. Please add concise docstrings with Args/Returns where applicable.
As per coding guidelines: Use Google-style docstrings.livekit-plugins/livekit-plugins-telnyx/livekit/plugins/telnyx/tts.py (1)
37-212: Add Google-style docstrings for the public TTS surface.
TTSandSynthesizeStreamare public APIs but lack Google-style docstrings. Please add docstrings with Args/Returns where applicable.
As per coding guidelines: Use Google-style docstrings.
📜 Review details
Configuration used: Organization UI
Review profile: CHILL
Plan: Pro
⛔ Files ignored due to path filters (1)
uv.lockis excluded by!**/*.lock
📒 Files selected for processing (5)
livekit-plugins/livekit-plugins-telnyx/livekit/plugins/telnyx/__init__.pylivekit-plugins/livekit-plugins-telnyx/livekit/plugins/telnyx/py.typedlivekit-plugins/livekit-plugins-telnyx/livekit/plugins/telnyx/stt.pylivekit-plugins/livekit-plugins-telnyx/livekit/plugins/telnyx/tts.pylivekit-plugins/livekit-plugins-telnyx/pyproject.toml
🚧 Files skipped from review as they are similar to previous changes (1)
- livekit-plugins/livekit-plugins-telnyx/pyproject.toml
🧰 Additional context used
📓 Path-based instructions (1)
**/*.py
📄 CodeRabbit inference engine (AGENTS.md)
**/*.py: Format code with ruff
Run ruff linter and auto-fix issues
Run mypy type checker in strict mode
Maintain line length of 100 characters maximum
Ensure Python 3.9+ compatibility
Use Google-style docstrings
Files:
livekit-plugins/livekit-plugins-telnyx/livekit/plugins/telnyx/tts.pylivekit-plugins/livekit-plugins-telnyx/livekit/plugins/telnyx/__init__.pylivekit-plugins/livekit-plugins-telnyx/livekit/plugins/telnyx/stt.py
🧠 Learnings (5)
📚 Learning: 2026-01-30T12:53:12.738Z
Learnt from: milanperovic
Repo: livekit/agents PR: 4660
File: livekit-plugins/livekit-plugins-personaplex/livekit/plugins/personaplex/__init__.py:19-21
Timestamp: 2026-01-30T12:53:12.738Z
Learning: In plugin __init__.py files under the livekit-plugins or similar plugin directories, place internal imports (for example, from .log import logger) after the __all__ definition. These imports are used for plugin registration and are not part of the public API. This pattern is used across plugins (e.g., openai, deepgram, ultravox) and helps avoid E402 violations while keeping the public API surface clean.
Applied to files:
livekit-plugins/livekit-plugins-telnyx/livekit/plugins/telnyx/tts.pylivekit-plugins/livekit-plugins-telnyx/livekit/plugins/telnyx/__init__.pylivekit-plugins/livekit-plugins-telnyx/livekit/plugins/telnyx/stt.py
📚 Learning: 2026-01-16T07:44:56.353Z
Learnt from: CR
Repo: livekit/agents PR: 0
File: AGENTS.md:0-0
Timestamp: 2026-01-16T07:44:56.353Z
Learning: Applies to **/*.py : Run ruff linter and auto-fix issues
Applied to files:
livekit-plugins/livekit-plugins-telnyx/livekit/plugins/telnyx/tts.pylivekit-plugins/livekit-plugins-telnyx/livekit/plugins/telnyx/stt.py
📚 Learning: 2026-01-16T07:44:56.353Z
Learnt from: CR
Repo: livekit/agents PR: 0
File: AGENTS.md:0-0
Timestamp: 2026-01-16T07:44:56.353Z
Learning: Applies to **/*.py : Format code with ruff
Applied to files:
livekit-plugins/livekit-plugins-telnyx/livekit/plugins/telnyx/tts.pylivekit-plugins/livekit-plugins-telnyx/livekit/plugins/telnyx/stt.py
📚 Learning: 2026-01-16T07:44:56.353Z
Learnt from: CR
Repo: livekit/agents PR: 0
File: AGENTS.md:0-0
Timestamp: 2026-01-16T07:44:56.353Z
Learning: Implement Model Interface Pattern for STT, TTS, LLM, and Realtime models with provider-agnostic interfaces, fallback adapters for resilience, and stream adapters for different streaming patterns
Applied to files:
livekit-plugins/livekit-plugins-telnyx/livekit/plugins/telnyx/tts.pylivekit-plugins/livekit-plugins-telnyx/livekit/plugins/telnyx/stt.py
📚 Learning: 2026-01-16T07:44:56.353Z
Learnt from: CR
Repo: livekit/agents PR: 0
File: AGENTS.md:0-0
Timestamp: 2026-01-16T07:44:56.353Z
Learning: Follow the Plugin System pattern where plugins in livekit-plugins/ are separate packages registered via the Plugin base class
Applied to files:
livekit-plugins/livekit-plugins-telnyx/livekit/plugins/telnyx/__init__.py
🧬 Code graph analysis (3)
livekit-plugins/livekit-plugins-telnyx/livekit/plugins/telnyx/tts.py (6)
livekit-agents/livekit/agents/_exceptions.py (3)
APIConnectionError(84-88)APIStatusError(45-81)APITimeoutError(91-95)livekit-agents/livekit/agents/types.py (1)
APIConnectOptions(54-88)livekit-plugins/livekit-plugins-telnyx/livekit/plugins/telnyx/common.py (4)
SessionManager(22-36)get_api_key(15-19)close(33-36)ensure_session(27-31)livekit-agents/livekit/agents/tts/tts.py (3)
TTSCapabilities(47-51)num_channels(121-122)exception(213-214)livekit-agents/livekit/agents/utils/aio/channel.py (1)
Chan(49-178)livekit-agents/livekit/agents/utils/codecs/decoder.py (1)
AudioStreamDecoder(119-339)
livekit-plugins/livekit-plugins-telnyx/livekit/plugins/telnyx/__init__.py (3)
livekit-plugins/livekit-plugins-telnyx/livekit/plugins/telnyx/stt.py (1)
STT(44-132)livekit-plugins/livekit-plugins-telnyx/livekit/plugins/telnyx/tts.py (1)
TTS(37-87)livekit-agents/livekit/agents/plugin.py (2)
Plugin(13-56)register_plugin(31-36)
livekit-plugins/livekit-plugins-telnyx/livekit/plugins/telnyx/stt.py (5)
livekit-agents/livekit/agents/types.py (1)
APIConnectOptions(54-88)livekit-plugins/livekit-plugins-telnyx/livekit/plugins/telnyx/common.py (4)
SessionManager(22-36)get_api_key(15-19)close(33-36)ensure_session(27-31)livekit-agents/livekit/agents/stt/stt.py (4)
SpeechEvent(70-74)SpeechEventType(32-49)SpeechData(53-61)RecognizeStream(252-469)livekit-agents/livekit/agents/utils/log.py (1)
log_exceptions(9-41)livekit-agents/livekit/agents/utils/audio.py (1)
AudioByteStream(41-157)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (3)
- GitHub Check: type-check (3.9)
- GitHub Check: unit-tests
- GitHub Check: type-check (3.13)
🔇 Additional comments (1)
livekit-plugins/livekit-plugins-telnyx/livekit/plugins/telnyx/__init__.py (1)
1-18: LGTM — plugin registration and exports are wired correctly.
✏️ Tip: You can disable this entire section by setting review_details to false in your review settings.
livekit-plugins/livekit-plugins-telnyx/livekit/plugins/telnyx/stt.py
Outdated
Show resolved
Hide resolved
| segment_id = utils.shortuuid() | ||
| output_emitter.start_segment(segment_id=segment_id) | ||
|
|
||
| url = f"{self._tts._opts.base_url}?voice={self._tts._opts.voice}" | ||
| headers = {"Authorization": f"Bearer {self._tts._opts.api_key}"} | ||
|
|
||
| decoder = utils.codecs.AudioStreamDecoder( | ||
| sample_rate=SAMPLE_RATE, | ||
| num_channels=NUM_CHANNELS, | ||
| format="audio/mp3", | ||
| ) | ||
|
|
||
| async def send_task(ws: aiohttp.ClientWebSocketResponse) -> None: | ||
| await ws.send_str(json.dumps({"text": " "})) | ||
| self._mark_started() | ||
| await ws.send_str(json.dumps({"text": text})) | ||
| await ws.send_str(json.dumps({"text": ""})) | ||
|
|
||
| async def recv_task(ws: aiohttp.ClientWebSocketResponse) -> None: | ||
| async for msg in ws: | ||
| if msg.type == aiohttp.WSMsgType.TEXT: | ||
| try: | ||
| data = json.loads(msg.data) | ||
| audio_data = data.get("audio") | ||
| if audio_data: | ||
| audio_bytes = base64.b64decode(audio_data) | ||
| if audio_bytes: | ||
| decoder.push(audio_bytes) | ||
| except json.JSONDecodeError: | ||
| logger.warning("Telnyx TTS: Received invalid JSON") | ||
|
|
||
| elif msg.type in ( | ||
| aiohttp.WSMsgType.CLOSE, | ||
| aiohttp.WSMsgType.CLOSED, | ||
| aiohttp.WSMsgType.CLOSING, | ||
| ): | ||
| break | ||
| elif msg.type == aiohttp.WSMsgType.ERROR: | ||
| logger.error(f"Telnyx TTS WebSocket error: {ws.exception()}") | ||
| break | ||
|
|
||
| decoder.end_input() | ||
|
|
||
| async def decode_task() -> None: | ||
| async for frame in decoder: | ||
| output_emitter.push(frame.data.tobytes()) | ||
|
|
||
| try: | ||
| ws = await asyncio.wait_for( | ||
| self._tts._session_manager.ensure_session().ws_connect(url, headers=headers), | ||
| self._conn_options.timeout, | ||
| ) | ||
| async with ws: | ||
| tasks = [ | ||
| asyncio.create_task(send_task(ws)), | ||
| asyncio.create_task(recv_task(ws)), | ||
| asyncio.create_task(decode_task()), | ||
| ] | ||
| try: | ||
| await asyncio.gather(*tasks) | ||
| finally: | ||
| await utils.aio.gracefully_cancel(*tasks) | ||
| except asyncio.TimeoutError: | ||
| raise APITimeoutError() from None | ||
| except aiohttp.ClientResponseError as e: | ||
| raise APIStatusError( | ||
| message=e.message, status_code=e.status, request_id=None, body=None | ||
| ) from None | ||
| except Exception as e: | ||
| raise APIConnectionError() from e | ||
| finally: | ||
| await decoder.aclose() | ||
|
|
||
| output_emitter.end_segment() |
There was a problem hiding this comment.
Ensure end_segment() is emitted even on WS failures.
start_segment() is called before the WS connect. If an exception is raised, end_segment() is skipped and downstream consumers may wait indefinitely. Move end_segment() into a finally block.
🧹 Always end the segment
- try:
- ws = await asyncio.wait_for(
- self._tts._session_manager.ensure_session().ws_connect(url, headers=headers),
- self._conn_options.timeout,
- )
- async with ws:
- tasks = [
- asyncio.create_task(send_task(ws)),
- asyncio.create_task(recv_task(ws)),
- asyncio.create_task(decode_task()),
- ]
- try:
- await asyncio.gather(*tasks)
- finally:
- await utils.aio.gracefully_cancel(*tasks)
- except asyncio.TimeoutError:
- raise APITimeoutError() from None
- except aiohttp.ClientResponseError as e:
- raise APIStatusError(
- message=e.message, status_code=e.status, request_id=None, body=None
- ) from None
- except Exception as e:
- raise APIConnectionError() from e
- finally:
- await decoder.aclose()
-
- output_emitter.end_segment()
+ try:
+ ws = await asyncio.wait_for(
+ self._tts._session_manager.ensure_session().ws_connect(url, headers=headers),
+ self._conn_options.timeout,
+ )
+ async with ws:
+ tasks = [
+ asyncio.create_task(send_task(ws)),
+ asyncio.create_task(recv_task(ws)),
+ asyncio.create_task(decode_task()),
+ ]
+ try:
+ await asyncio.gather(*tasks)
+ finally:
+ await utils.aio.gracefully_cancel(*tasks)
+ except asyncio.TimeoutError:
+ raise APITimeoutError() from None
+ except aiohttp.ClientResponseError as e:
+ raise APIStatusError(
+ message=e.message, status_code=e.status, request_id=None, body=None
+ ) from None
+ except Exception as e:
+ raise APIConnectionError() from e
+ finally:
+ await decoder.aclose()
+ output_emitter.end_segment()🤖 Prompt for AI Agents
In `@livekit-plugins/livekit-plugins-telnyx/livekit/plugins/telnyx/tts.py` around
lines 139 - 212, The code currently calls
output_emitter.start_segment(segment_id=segment_id) before the WebSocket work
but only calls output_emitter.end_segment() after the try/except that can be
bypassed on exceptions; move the end_segment() call into a finally that always
runs (alongside the existing decoder.aclose() cleanup) so that
output_emitter.end_segment() executes regardless of WS/connect/handler failures
(update references around ensure_session().ws_connect, the ws async-with block,
and the finally that currently awaits decoder.aclose()).
|
@theomonnom Hi Théo, would love it if you or someone from your team could take a look at this PR and help review/approve it so we can get it merged and make the integration official. We’re contributing this from Telnyx and are excited to formally plug into the LiveKit ecosystem. There are already several companies in the ecosystem here (e.g. Twilio, Deepgram, Gladia), and we think this fits naturally alongside those and makes the platform more complete. Happy to make any changes or iterate based on feedback - just let us know |
| params = { | ||
| "transcription_engine": opts.transcription_engine, | ||
| "language": self._language, | ||
| "input_format": "wav", | ||
| } |
There was a problem hiding this comment.
🟡 interim_results STT option is stored but never sent to the Telnyx API
The interim_results parameter is accepted in the STT constructor (line 50) and stored in _STTOptions (line 67), but it is never included in the WebSocket connection parameters sent to the Telnyx API.
Click to expand
Issue Details
In _connect_ws (lines 248-254), the query parameters only include:
params = {
"transcription_engine": opts.transcription_engine,
"language": self._language,
"input_format": "wav",
}The opts.interim_results value is never used, meaning users who set interim_results=False expecting to only receive final transcripts will still receive interim results from the API (if Telnyx supports this parameter).
Impact
Users cannot control whether interim results are returned from the STT stream, despite the option being exposed in the API. The STTCapabilities is correctly set with the interim_results value, but the actual API behavior won't match this capability declaration.
Was this helpful? React with 👍 or 👎 to provide feedback.
9d9adaf to
cba6d85
Compare
Ensures output_emitter.end_segment() is always called even when exceptions occur during WebSocket handling. Fixes segment count mismatch error on TTS failures. Addresses review comment from Devin AI.
Wraps stream usage in try/finally to ensure aclose() is called, preventing WebSocket connection leaks. Addresses review comment from Devin AI.
- STT: redact transcript content from debug logs to avoid PII exposure - TTS: use lazy % formatting instead of f-string in logger.error
81652eb to
aa6f2e0
Compare
|
The failing CI checks (unit-tests, type-check) appear to be pre-existing issues in the main codebase, not related to this Telnyx plugin PR. The plugin-specific code passes ruff and has been E2E tested locally. |
There was a problem hiding this comment.
let's remove this example. it doesn't offer any additional value to readers other than the new addition of the model.. which will be available on our docs site
| for frame in audio_bstream.flush(): | ||
| await ws.send_bytes(frame.data.tobytes()) | ||
|
|
||
| await asyncio.sleep(1.0) |
There was a problem hiding this comment.
why is it necessary to sleep here?
There was a problem hiding this comment.
It is not, I have removed this. Thanks for the review.
…TS fixes - Remove example file (per David) - Fix pyproject.toml alphabetical ordering - TTS: preserve APIStatusError/APITimeoutError (don't wrap as generic APIConnectionError) - TTS: recreate _segments_ch for each retry attempt
37f715f to
86091b4
Compare
- STT: don't close WebSocket before server responds with transcript - TTS: initialize _segments_ch in _run() so it exists before first use
| except APIConnectionError: | ||
| raise | ||
| except Exception as e: | ||
| raise APIConnectionError() from e |
There was a problem hiding this comment.
🔴 TTS _run wraps APIStatusError from _run_ws in APIConnectionError, breaking status-code–based error handling
When _run_ws raises an APIStatusError (e.g. from aiohttp.ClientResponseError at line 207-209), that exception propagates through _run_segments into the outer _run method's asyncio.gather. The except chain in _run (lines 127-138) handles asyncio.TimeoutError, aiohttp.ClientResponseError, and APIConnectionError explicitly, but has no clause for APIStatusError. Since APIStatusError extends APIError (not APIConnectionError), it falls through to the catch-all except Exception as e at line 137 and gets re-wrapped as a generic APIConnectionError.
Root Cause and Impact
The exception hierarchy is: APIStatusError(APIError) and APIConnectionError(APIError) — they are siblings, not parent/child. So except APIConnectionError at line 135 does not catch APIStatusError.
The base class _main_task in livekit-agents/livekit/agents/tts/tts.py:476-479 relies on isinstance(e, APIStatusError) checks:
if isinstance(e, APIStatusError) and e.status_code == 499:
return # close gracefully without raisingWhen the original APIStatusError is wrapped in APIConnectionError, this isinstance check fails. A 499 (Client Closed Request) will not be handled gracefully — it will be retried and eventually raised as a hard failure instead of silently returning. Additionally, the HTTP status code and request ID are lost, degrading error diagnostics.
| except APIConnectionError: | |
| raise | |
| except Exception as e: | |
| raise APIConnectionError() from e | |
| except APIConnectionError: | |
| raise | |
| except APIStatusError: | |
| raise | |
| except Exception as e: | |
| raise APIConnectionError() from e | |
Was this helpful? React with 👍 or 👎 to provide feedback.
PR Description
Summary
Adds support for Telnyx as a supported vendor for Speech-to-Text (STT) and Text-to-Speech (TTS) within the LiveKit Agents framework. This includes a new dedicated plugin package and an example voice agent demonstrating the integration.
New Features
examples/voice_agents/telnyx_voice_agent.pyusing Telnyx for both STT and TTS alongside OpenAI for reasoning.Implementation Details
livekit-plugins-telnyxpackage underlivekit-plugins/.SessionManagerincommon.pyto handle sharedaiohttpclient sessions and API key resolution.pyproject.tomlworkspace.Documentation
Summary by CodeRabbit
New Features
Integrations
Chores
✏️ Tip: You can customize this high-level summary in your review settings.