Add Google Dialogflow CX plugin for LiveKit Agents#4800
Add Google Dialogflow CX plugin for LiveKit Agents#4800
Conversation
Implements a new LiveKit Agents plugin that integrates Google Dialogflow CX as an LLM provider in STT -> LLM -> TTS pipelines. Dialogflow CX is an intent-based conversational AI engine that returns full responses (no streaming). The plugin includes: - LLM class wrapping SessionsAsyncClient with detectIntent API - DialogflowLLMStream emitting single ChatChunk responses - Error mapping from Google API exceptions to LiveKit error types - Dialogflow metadata (intent confidence, match type, page) in ChoiceDelta.extra - Custom payload forwarding for structured Dialogflow responses - Example voice agent demonstrating usage https://claude.ai/code/session_01HTkStArHmt7uy1zX51pEvp
22 tests covering: - LLM instantiation, env var fallbacks, validation - chat() method returning correct LLMStream subclass - Full _run() flow: response extraction, usage emission, metadata - Custom payload forwarding from Dialogflow responses - Session path construction with/without environment_id - Session ID passthrough via extra_kwargs - Latest user message extraction from multi-turn context - Language code propagation - Error mapping for all Google API exception types https://claude.ai/code/session_01HTkStArHmt7uy1zX51pEvp
|
|
| async def aclose(self) -> None: | ||
| pass |
There was a problem hiding this comment.
🔴 aclose() is a no-op, leaking the gRPC client/channel
The LLM.aclose() method at line 150-151 does nothing (pass), but the SessionsAsyncClient created at line 113 opens a gRPC channel that should be closed when the LLM is no longer needed.
Resource Leak Details
The SessionsAsyncClient from google-cloud-dialogflow-cx establishes a gRPC transport/channel on creation. Without closing it, the channel remains open indefinitely. In a long-running AgentServer, if LLM instances are created and discarded (e.g., per-session agents as shown in the example at examples/voice_agents/dialogflow_agent.py:48), each will leak a gRPC channel.
The base class llm.LLM defines aclose() at livekit-agents/livekit/agents/llm/llm.py:144 and it is called via __aexit__ at line 155. The Dialogflow plugin should close self._client in its override.
Impact: gRPC channel and associated resources (file descriptors, memory) are never released, which can cause resource exhaustion in long-running processes.
| async def aclose(self) -> None: | |
| pass | |
| async def aclose(self) -> None: | |
| await self._client.transport.close() | |
Was this helpful? React with 👍 or 👎 to provide feedback.
| except (InvalidArgument, NotFound, PermissionDenied) as e: | ||
| raise APIStatusError( | ||
| f"dialogflow: {type(e).__name__}", | ||
| status_code=400, | ||
| body=str(e), | ||
| request_id=request_id, | ||
| retryable=False, | ||
| ) from e |
There was a problem hiding this comment.
🟡 NotFound and PermissionDenied exceptions mapped to wrong HTTP status code 400
NotFound and PermissionDenied exceptions are caught together with InvalidArgument and all mapped to status_code=400. NotFound should be 404 and PermissionDenied should be 403.
Incorrect Status Code Mapping
At lines 223-230, the code catches (InvalidArgument, NotFound, PermissionDenied) as a group and assigns status_code=400 to all of them:
except (InvalidArgument, NotFound, PermissionDenied) as e:
raise APIStatusError(
f"dialogflow: {type(e).__name__}",
status_code=400, # wrong for NotFound (404) and PermissionDenied (403)
...
)While retryable=False is explicitly set (so retryability is unaffected), the status code is used in APIStatusError.__str__ (at livekit-agents/livekit/agents/_exceptions.py:74-81) and exposed for observability/logging. Incorrect status codes make debugging harder — a NotFound error appearing as a 400 obscures whether the agent ID is wrong vs. the request payload being invalid.
Impact: Misleading diagnostic information in logs and error messages; developers cannot distinguish between a bad request and a missing resource from the status code alone.
| except (InvalidArgument, NotFound, PermissionDenied) as e: | |
| raise APIStatusError( | |
| f"dialogflow: {type(e).__name__}", | |
| status_code=400, | |
| body=str(e), | |
| request_id=request_id, | |
| retryable=False, | |
| ) from e | |
| except InvalidArgument as e: | |
| raise APIStatusError( | |
| f"dialogflow: {type(e).__name__}", | |
| status_code=400, | |
| body=str(e), | |
| request_id=request_id, | |
| retryable=False, | |
| ) from e | |
| except NotFound as e: | |
| raise APIStatusError( | |
| f"dialogflow: {type(e).__name__}", | |
| status_code=404, | |
| body=str(e), | |
| request_id=request_id, | |
| retryable=False, | |
| ) from e | |
| except PermissionDenied as e: | |
| raise APIStatusError( | |
| f"dialogflow: {type(e).__name__}", | |
| status_code=403, | |
| body=str(e), | |
| request_id=request_id, | |
| retryable=False, | |
| ) from e |
Was this helpful? React with 👍 or 👎 to provide feedback.
Summary
This PR introduces a new LiveKit Agents plugin that integrates Google Dialogflow CX as an LLM provider. Dialogflow CX is an intent-based conversational AI engine where all conversation logic is configured in the Dialogflow CX console rather than through agent instructions.
Key Changes
New plugin package:
livekit-plugins-dialogflowwith full LLM integrationLLMclass implementing the LiveKitllm.LLMinterface for Dialogflow CXDialogflowLLMStreamclass handling request/response streamingGOOGLE_CLOUD_PROJECT,GOOGLE_CLOUD_LOCATION) with explicit parameter overridesCore functionality:
Session management:
extra_kwargsfor multi-turn conversationsComprehensive test suite (416 lines):
Documentation and examples:
dialogflow_agent.py) demonstrating STT → Dialogflow → TTS pipelineNotable Implementation Details
ChatChunk.google-cloud-dialogflow-cxSDK.https://claude.ai/code/session_01HTkStArHmt7uy1zX51pEvp