Python: add voice live agent #3071
base: main
Conversation
Pull request overview
This PR adds a new Python package for voice-based agent interactions using the Azure Voice Live SDK, enabling real-time voice conversations with streaming audio, voice activity detection, and function calling support.
Key Changes
- Introduces a `VoiceLiveAgent` class for real-time voice interactions with Azure OpenAI
- Implements streaming audio capabilities with server-side voice activity detection (VAD)
- Adds a WebSocket handler for browser-based voice interfaces
Reviewed changes
Copilot reviewed 11 out of 11 changed files in this pull request and generated 31 comments.
| File | Description |
|---|---|
| pyproject.toml | Package configuration defining dependencies and metadata for the azure-voice-live package |
| streaming_voice_chat.py | Example demonstrating streaming voice chat with microphone input, audio playback, and interruption support |
| websocket_handler.py | WebSocket handler bridging browser connections to Azure Voice Live SDK |
| _web/__init__.py | Web module initialization exposing VoiceWebSocketHandler |
| _voice_live_session.py | Session management for Azure Voice Live WebSocket connections |
| _voice_live_agent.py | Core VoiceLiveAgent implementation with streaming and function calling support |
| _types.py | Type definitions for AudioContent and VoiceOptions |
| _event_processor.py | Event processor converting Azure Voice Live events to Agent Framework updates |
| _audio_utils.py | Audio encoding/decoding utilities for PCM16 format handling |
| __init__.py | Package initialization exporting main public classes |
| README.md | Documentation describing package features and installation instructions |
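The `_audio_utils.py` helpers themselves are not quoted in this review, but the described PCM16 handling can be sketched independently. Assuming the package base64-encodes raw little-endian 16-bit PCM for JSON transport (the function names below are illustrative, not the PR's actual API):

```python
import base64

def encode_audio_chunk(pcm16_bytes: bytes) -> str:
    """Encode raw PCM16 audio bytes to a base64 string for JSON transport."""
    return base64.b64encode(pcm16_bytes).decode("ascii")

def decode_audio_chunk(b64_audio: str) -> bytes:
    """Decode a base64 audio string back into raw PCM16 bytes."""
    return base64.b64decode(b64_audio)

# Round-trip a short chunk of 16-bit silence (100 samples)
silence = b"\x00\x00" * 100
assert decode_audio_chunk(encode_audio_chunk(silence)) == silence
```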
```python
        except Exception as e:
            print(f"\n⚠️ Error playing audio: {e}")
```
Copilot AI commented on Jan 4, 2026:
The error handling uses a broad `except Exception` clause that catches all exceptions. The error is printed but not logged, and execution continues, potentially masking critical issues.
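A minimal sketch of the fix the reviewer is pointing at, using the standard `logging` module so failures carry tracebacks and can be filtered by handlers (the `play_chunk` helper is hypothetical, not code from this PR):

```python
import logging

logger = logging.getLogger(__name__)

def play_chunk(stream, data: bytes) -> None:
    """Write one audio chunk to the output stream, logging playback failures.

    Hypothetical helper: `stream` is anything with a write(bytes) method.
    """
    try:
        stream.write(data)
    except Exception:
        # logger.exception records the message plus the full traceback
        # at ERROR level, instead of printing and losing the stack
        logger.exception("Error playing audio chunk")
```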
````python
                print(f"\n[DEBUG] Function call detected: {props.get('name')}")
                # Store pending function call to execute after response_done
                pending_function_call = {
                    "call_id": props.get("call_id"),
                    "name": props.get("name"),
                    "arguments": props.get("arguments")
                }

            # When response is done, execute any pending function call
            elif event_type == "response_complete" and pending_function_call:
                print(f"[DEBUG] Response done, executing pending function call")
                # Execute function and create new response
                asyncio.create_task(self._handle_function_call_after_response(
                    pending_function_call["call_id"],
                    pending_function_call["name"],
                    pending_function_call["arguments"]
                ))
                pending_function_call = None

            yield update

    async def _handle_function_call_after_response(self, call_id: str, name: str, arguments: str) -> None:
        """Handle function call execution after response is done, then trigger new response.

        Args:
            call_id: Function call ID
            name: Function name
            arguments: JSON string of arguments
        """
        import json

        print(f"[DEBUG] Executing function: {name} with call_id={call_id}, args={arguments}")

        try:
            # Parse arguments
            args_dict = json.loads(arguments) if arguments else {}

            # Find the function
            function = None
            for tool in self._tools:
                if tool.name == name:
                    function = tool
                    break

            if not function:
                result = f"Error: Function '{name}' not found"
                print(f"[DEBUG] Function not found: {name}")
            else:
                # Execute the function
                print(f"[DEBUG] Calling function {name} with args: {args_dict}")
                result = await function(**args_dict)
                print(f"[DEBUG] Function {name} returned: {result}")

            # Send result back
            print(f"[DEBUG] Sending function result for call_id={call_id}")
            await self._session.send_function_result(call_id, str(result))
            print(f"[DEBUG] Function result sent successfully")

            # Now trigger a new response to process the function result
            # This is safe because we waited for RESPONSE_DONE
            print(f"[DEBUG] Creating new response to process function result")
            await self._session.create_response()
            print(f"[DEBUG] New response created")

        except Exception as e:
            error_msg = f"Error executing {name}: {e}"
            print(f"[DEBUG] Exception in function execution: {e}")
            import traceback
            traceback.print_exc()
            try:
                await self._session.send_function_result(call_id, error_msg)
                await self._session.create_response()
            except Exception as e2:
                print(f"[DEBUG] Failed to send error result: {e2}")

    async def cancel_response(self) -> None:
        """Cancel the ongoing agent response.

        This is used for interruption handling - when the user starts speaking
        while the agent is responding, call this to stop the agent's response.

        Raises:
            RuntimeError: If session is not connected

        Example:
            ```python
            await agent.connect()

            # If user interrupts, cancel the response
            if user_started_speaking and agent_is_speaking:
                await agent.cancel_response()

            await agent.disconnect()
            ```
        """
        if not self._session:
            raise RuntimeError("Must call connect() before canceling response")

        await self._session.cancel_response()

    def _build_session_config(self) -> Any:
        """Build Azure Voice Live session configuration.

        Returns:
            RequestSession configuration object
        """
        from azure.ai.voicelive.models import (
            AzureStandardVoice,
            InputAudioFormat,
            Modality,
            OutputAudioFormat,
            RequestSession,
            ServerVad,
        )

        # Configure VAD
        turn_detection = None
        if self._enable_vad:
            turn_detection = ServerVad(
                threshold=self._vad_threshold,
                prefix_padding_ms=self._vad_prefix_padding_ms,
                silence_duration_ms=self._vad_silence_duration_ms,
            )

        # Configure transcription
        input_audio_transcription = None
        if self._input_audio_transcription:
            input_audio_transcription = {"model": "whisper-1"}

        # Build session config
        return RequestSession(
            modalities=[Modality.TEXT, Modality.AUDIO],
            instructions=self._instructions,
            voice=AzureStandardVoice(name=self._voice),
            input_audio_format=InputAudioFormat.PCM16,
            output_audio_format=OutputAudioFormat.PCM16,
            input_audio_transcription=input_audio_transcription,
            turn_detection=turn_detection,
            tools=self._convert_tools_to_azure_format(),
            temperature=self._temperature,
            max_response_output_tokens=self._max_response_tokens,
        )

    def _convert_tools_to_azure_format(self) -> list[Any]:
        """Convert AIFunction tools to Azure Voice Live format.

        Returns:
            List of FunctionTool objects in Azure format
        """
        from azure.ai.voicelive.models import FunctionTool

        azure_tools = []

        for tool in self._tools:
            # Get the JSON schema from the tool
            if hasattr(tool, 'to_json_schema_spec'):
                schema = tool.to_json_schema_spec()

                # Extract function details from schema
                func_spec = schema.get('function', {})

                # Create Azure FunctionTool using dict-style assignment
                azure_tool = FunctionTool()
                azure_tool['type'] = 'function'
                azure_tool['name'] = func_spec.get('name', tool.name)
                azure_tool['description'] = func_spec.get('description', '')
                azure_tool['parameters'] = func_spec.get('parameters', {})

                azure_tools.append(azure_tool)
                print(f"[DEBUG] Tool converted: {tool.name} -> {dict(azure_tool)}")
            else:
                # Fallback for non-AIFunction tools
                azure_tool = FunctionTool()
                azure_tool['type'] = 'function'
                azure_tool['name'] = getattr(tool, 'name', 'unknown')
                azure_tool['description'] = getattr(tool, 'description', '')
                azure_tool['parameters'] = tool.parameters() if callable(getattr(tool, 'parameters', None)) else {}

                azure_tools.append(azure_tool)
                print(f"[DEBUG] Tool converted (fallback): {dict(azure_tool)}")

        print(f"[DEBUG] Total tools converted: {len(azure_tools)}")
        return azure_tools
````
Copilot AI commented on Jan 4, 2026:
Debug print statements should be removed or replaced with proper logging before production use. These print statements are scattered throughout the codebase and can clutter output.
```python
        self, session: VoiceLiveSession, input: str | bytes
    ) -> AsyncIterable[AgentRunResponseUpdate]:
        """Internal method to stream with a given session."""
        # Send input
        if isinstance(input, str):
            await session.send_text(input)
            await session.create_response()
        else:
            await session.send_audio(input, commit=self._enable_vad)
```
Copilot AI commented on Jan 4, 2026:
The function accepts 'input' as a parameter name which shadows the built-in Python function. Consider renaming to 'user_input', 'query', or 'message' to avoid confusion.
Suggested change:

```diff
-        self, session: VoiceLiveSession, input: str | bytes
-    ) -> AsyncIterable[AgentRunResponseUpdate]:
-        """Internal method to stream with a given session."""
-        # Send input
-        if isinstance(input, str):
-            await session.send_text(input)
-            await session.create_response()
-        else:
-            await session.send_audio(input, commit=self._enable_vad)
+        self, session: VoiceLiveSession, user_input: str | bytes
+    ) -> AsyncIterable[AgentRunResponseUpdate]:
+        """Internal method to stream with a given session."""
+        # Send input
+        if isinstance(user_input, str):
+            await session.send_text(user_input)
+            await session.create_response()
+        else:
+            await session.send_audio(user_input, commit=self._enable_vad)
```
```python
        except Exception as e:
            error_msg = f"Error executing {name}: {e}"
            print(f"[DEBUG] Exception in function execution: {e}")
            import traceback
            traceback.print_exc()
            try:
                await self._session.send_function_result(call_id, error_msg)
                await self._session.create_response()
            except Exception as e2:
                print(f"[DEBUG] Failed to send error result: {e2}")
```
Copilot AI commented on Jan 4, 2026:
The error handling uses a broad `except Exception` clause that catches and prints all exceptions with traceback. This should use proper logging and potentially re-raise critical exceptions rather than continuing execution.
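One way to follow that advice, sketched here rather than taken from the PR: log with traceback, return an error string for ordinary tool failures, but re-raise errors that leave the session unusable (treating `ConnectionError` as the "critical" class is an assumption for illustration):

```python
import asyncio
import logging

logger = logging.getLogger(__name__)

async def run_tool_safely(tool, **kwargs):
    """Run one tool call; log failures, but re-raise session-fatal errors.

    Ordinary tool errors become a string result the model can read back;
    ConnectionError propagates so the caller can tear the session down.
    """
    try:
        return await tool(**kwargs)
    except ConnectionError:
        # Session is unusable: record the traceback, then let the caller
        # handle teardown rather than continuing silently
        logger.exception("Session lost while executing tool")
        raise
    except Exception as e:
        logger.exception("Tool execution failed")
        return f"Error executing tool: {e}"
```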
```python
    except Exception as e:
        error_msg = f"Expert agent error: {str(e)}"
        print(f" ❌ [Expert Agent] {error_msg}")
        import traceback
        traceback.print_exc()
        return error_msg
```
Copilot AI commented on Jan 4, 2026:
The error handling uses a broad `except Exception` clause that catches and prints all exceptions with traceback. This should use proper logging and potentially re-raise critical exceptions rather than continuing execution.
```python
class AudioPlayer:
    """Handles real-time audio playback with interruption support."""

    def __init__(self):
        self.audio = pyaudio.PyAudio()
        self.stream = None
        self.queue = queue.Queue()
        self.playing = False
        self.chunks_played = 0
```
Copilot AI commented on Jan 4, 2026:
The class lacks proper documentation. According to guidelines, all public classes should have docstrings explaining their purpose and usage.
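A sketch of what such a docstring could look like, on a simplified stand-in for the class (the PyAudio setup is omitted so the example stays self-contained; attribute descriptions are inferred from `__init__` above):

```python
import queue

class AudioPlayer:
    """Play PCM16 audio chunks in real time, with interruption support.

    Chunks are pushed onto an internal FIFO queue by the event loop and
    drained by a playback thread; clearing the queue stops an interrupted
    response immediately.

    Attributes:
        stream: The open output stream, or None before playback starts.
        queue: FIFO of raw PCM16 byte chunks awaiting playback.
        playing: Whether the playback loop is currently active.
        chunks_played: Count of chunks written to the output stream.
    """

    def __init__(self) -> None:
        # PyAudio initialization omitted in this sketch
        self.stream = None
        self.queue = queue.Queue()
        self.playing = False
        self.chunks_played = 0
```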
```python
            print(f"Error receiving from browser: {e}")

    async def _send_to_browser(self, websocket: Any, session: VoiceLiveSession) -> None:
        """Receive from Azure and forward to browser.

        Args:
            websocket: FastAPI WebSocket instance
            session: VoiceLiveSession instance
        """
        try:
            from azure.ai.voicelive.models import ServerEventType

            async for event in session._connection:
                event_type = event.type

                if event_type == ServerEventType.RESPONSE_AUDIO_DELTA:
                    # Send audio chunk to browser
                    if hasattr(event, "delta") and event.delta:
                        await websocket.send_bytes(event.delta)

                elif event_type == ServerEventType.RESPONSE_AUDIO_TRANSCRIPT_DELTA:
                    # Send transcript delta
                    if hasattr(event, "delta") and event.delta:
                        await websocket.send_json({"type": "transcript", "text": event.delta})

                elif event_type == ServerEventType.INPUT_AUDIO_BUFFER_SPEECH_STARTED:
                    # User started speaking
                    await websocket.send_json({"type": "speech_started"})

                elif event_type == ServerEventType.INPUT_AUDIO_BUFFER_SPEECH_STOPPED:
                    # User stopped speaking
                    await websocket.send_json({"type": "speech_stopped"})

                elif event_type == ServerEventType.RESPONSE_CREATED:
                    # Response started
                    response_id = event.response.id if hasattr(event.response, "id") else None
                    await websocket.send_json({"type": "response_started", "response_id": response_id})

                elif event_type == ServerEventType.RESPONSE_DONE:
                    # Response complete
                    await websocket.send_json({"type": "response_complete"})

                elif event_type == ServerEventType.ERROR:
                    # Error event
                    error_msg = str(event.error) if hasattr(event, "error") else "Unknown error"
                    await websocket.send_json({"type": "error", "message": error_msg})

        except Exception as e:
            print(f"Error sending to browser: {e}")
```
Copilot AI commented on Jan 4, 2026:
Debug print statements should be removed or replaced with proper logging before production use. These print statements are scattered throughout the codebase and can clutter output.
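Separately from the logging point, the long `elif` chain in `_send_to_browser` can be expressed as a lookup table. A sketch with plain dicts standing in for SDK events (the event-type strings here are illustrative, not the SDK's enum values):

```python
# Map event-type names to functions that build the JSON message for the browser
HANDLERS = {
    "speech_started": lambda event: {"type": "speech_started"},
    "speech_stopped": lambda event: {"type": "speech_stopped"},
    "transcript_delta": lambda event: {"type": "transcript", "text": event["delta"]},
    "response_done": lambda event: {"type": "response_complete"},
}

def to_browser_message(event: dict):
    """Translate one server event into a browser message, or None if unhandled."""
    handler = HANDLERS.get(event["type"])
    return handler(event) if handler else None

assert to_browser_message({"type": "speech_started"}) == {"type": "speech_started"}
assert to_browser_message({"type": "transcript_delta", "delta": "hi"}) == {"type": "transcript", "text": "hi"}
assert to_browser_message({"type": "unknown"}) is None
```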
```python
        except Exception as e:
            print(f"\n❌ Playback loop error: {e}")
            import traceback
            traceback.print_exc()
```
Copilot AI commented on Jan 4, 2026:
The error handling uses a broad `except Exception` clause that catches and prints all exceptions with traceback. This should use proper logging and potentially re-raise critical exceptions rather than continuing execution.
```toml
dependencies = [
    "agent-framework>=0.1.0",
    "azure-ai-voicelive",  # Azure Voice Live SDK (preview)
```
Copilot AI commented on Jan 4, 2026:
The comment uses an incorrect comment format. Python sample code should use standard comment syntax without special markdown formatting.
Suggested change:

```diff
-    "azure-ai-voicelive",  # Azure Voice Live SDK (preview)
+    "azure-ai-voicelive",
```
```python
import base64
import wave
from typing import BinaryIO
```
Copilot AI commented on Jan 4, 2026:
Import of 'BinaryIO' is not used.
Suggested change:

```diff
-from typing import BinaryIO
```
@szhaomsft While we appreciate the initiative to add this, we are working on a more broadly compatible design for Voice Live and other realtime services; see the WIP here: https://github.com/eavanvalkenburg/agent-framework/blob/voice_agents/docs/decisions/00XX-realtime-agents.md. We first want to decide how to abstract this as a whole before we ship something that we would then have to change significantly.
Motivation and Context
Add a voice live agent.