From d149370fb1a220e0bf3b8525364409ac69e7e64b Mon Sep 17 00:00:00 2001 From: Engel Nyst Date: Sun, 19 Oct 2025 17:45:28 +0200 Subject: [PATCH 01/15] Add streaming support for Responses API --- README.md | 25 ++ .../24_responses_streaming.py | 96 ++++++++ openhands/sdk/__init__.py | 4 + openhands/sdk/agent/agent.py | 9 +- openhands/sdk/agent/base.py | 9 +- openhands/sdk/conversation/__init__.py | 6 +- openhands/sdk/conversation/conversation.py | 11 +- .../conversation/impl/local_conversation.py | 41 +++- .../conversation/impl/remote_conversation.py | 13 +- openhands/sdk/conversation/types.py | 5 + openhands/sdk/conversation/visualizer.py | 82 ++++++- openhands/sdk/event/__init__.py | 2 + openhands/sdk/event/streaming.py | 33 +++ openhands/sdk/llm/__init__.py | 3 + openhands/sdk/llm/llm.py | 215 +++++++++++++++++- openhands/sdk/llm/streaming.py | 35 +++ .../sdk/conversation/test_streaming_events.py | 183 +++++++++++++++ .../llm/test_responses_parsing_and_kwargs.py | 87 ++++++- 18 files changed, 837 insertions(+), 22 deletions(-) create mode 100644 examples/01_standalone_sdk/24_responses_streaming.py create mode 100644 openhands/sdk/event/streaming.py create mode 100644 openhands/sdk/llm/streaming.py create mode 100644 tests/sdk/conversation/test_streaming_events.py diff --git a/README.md b/README.md index f4e0cf30d5..7d7f689933 100644 --- a/README.md +++ b/README.md @@ -141,6 +141,31 @@ registry.add("default", llm) llm = registry.get("default") ``` +### Streaming Responses + +You can receive incremental deltas from the Responses API by supplying a token +callback when constructing a conversation. Each callback receives an +``LLMStreamEvent`` describing the delta. + +```python +from pathlib import Path +from openhands.sdk import Conversation, LLMStreamEvent + +log_dir = Path("logs/stream") +log_dir.mkdir(parents=True, exist_ok=True) + +def on_token(event: LLMStreamEvent) -> None: + print(event.text or event.arguments or "", end="", flush=True) + +conversation = Conversation(agent=agent, token_callbacks=[on_token]) +conversation.send_message("Summarize the benefits of token streaming.") +conversation.run() +``` + +See `examples/01_standalone_sdk/24_responses_streaming.py` for a complete +example that also persists each delta as JSON in `./logs/stream/`. + + ### Tools Tools provide agents with capabilities to interact with the environment. The SDK includes several built-in tools: diff --git a/examples/01_standalone_sdk/24_responses_streaming.py b/examples/01_standalone_sdk/24_responses_streaming.py new file mode 100644 index 0000000000..9a87cbe4ba --- /dev/null +++ b/examples/01_standalone_sdk/24_responses_streaming.py @@ -0,0 +1,96 @@ +"""Streaming Responses API example. + +This demonstrates how to enable token streaming for the Responses API path, +log streaming deltas to ``./logs/stream/`` as JSON, and print the streamed text +incrementally to the terminal. 
+""" + +from __future__ import annotations + +import datetime as _dt +import json +import os +from pathlib import Path +from typing import Any + +from pydantic import SecretStr + +from openhands.sdk import Conversation, LLMStreamEvent, get_logger +from openhands.sdk.llm import LLM +from openhands.tools.preset.default import get_default_agent + + +logger = get_logger(__name__) +LOG_DIR = Path("logs/stream") + + +def _serialize_event(event: LLMStreamEvent) -> dict[str, Any]: + record = { + "type": event.type, + "text": event.text, + "arguments": event.arguments, + "output_index": event.output_index, + "content_index": event.content_index, + "item_id": event.item_id, + "is_final": event.is_final, + } + return record + + +def main() -> None: + api_key = os.getenv("LLM_API_KEY") or os.getenv("OPENAI_API_KEY") + if not api_key: + raise RuntimeError("Set LLM_API_KEY or OPENAI_API_KEY in your environment.") + + model = os.getenv("LLM_MODEL", "openhands/gpt-5-codex") + base_url = os.getenv("LLM_BASE_URL") + + llm = LLM( + model=model, + api_key=SecretStr(api_key), + base_url=base_url, + service_id="stream-demo", + ) + + agent = get_default_agent(llm=llm, cli_mode=True) + + timestamp = _dt.datetime.utcnow().strftime("%Y%m%d-%H%M%S") + LOG_DIR.mkdir(parents=True, exist_ok=True) + log_path = LOG_DIR / f"responses_stream_{timestamp}.jsonl" + + def on_token(event: LLMStreamEvent) -> None: + record = _serialize_event(event) + with log_path.open("a", encoding="utf-8") as fp: + fp.write(json.dumps(record) + "\n") + + stream_chunk = event.text or event.arguments + if stream_chunk: + print(stream_chunk, end="", flush=True) + if event.is_final: + print("\n--- stream complete ---") + + conversation = Conversation( + agent=agent, + workspace=os.getcwd(), + token_callbacks=[on_token], + ) + + story_prompt = ( + "Tell me a long story about LLM streaming, make sure it has multiple " + "paragraphs. Then write it on disk using a tool call." + ) + conversation.send_message(story_prompt) + conversation.run() + + cleanup_prompt = ( + "Thank you. Please delete streaming_story.md now that I've read it, " + "then confirm the deletion." 
+ ) + conversation.send_message(cleanup_prompt) + conversation.run() + + logger.info("Stream log written to %s", log_path) + + +if __name__ == "__main__": + main() diff --git a/openhands/sdk/__init__.py b/openhands/sdk/__init__.py index 07ec5f48a1..6d0c771eab 100644 --- a/openhands/sdk/__init__.py +++ b/openhands/sdk/__init__.py @@ -20,11 +20,13 @@ LLM, ImageContent, LLMRegistry, + LLMStreamEvent, Message, RedactedThinkingBlock, RegistryEvent, TextContent, ThinkingBlock, + TokenCallbackType, ) from openhands.sdk.logger import get_logger from openhands.sdk.mcp import ( @@ -58,6 +60,8 @@ __all__ = [ "LLM", "LLMRegistry", + "LLMStreamEvent", + "TokenCallbackType", "ConversationStats", "RegistryEvent", "Message", diff --git a/openhands/sdk/agent/agent.py b/openhands/sdk/agent/agent.py index 38378eba5e..d75e7ca834 100644 --- a/openhands/sdk/agent/agent.py +++ b/openhands/sdk/agent/agent.py @@ -5,7 +5,11 @@ import openhands.sdk.security.risk as risk from openhands.sdk.agent.base import AgentBase from openhands.sdk.context.view import View -from openhands.sdk.conversation import ConversationCallbackType, ConversationState +from openhands.sdk.conversation import ( + ConversationCallbackType, + ConversationState, + ConversationTokenCallbackType, +) from openhands.sdk.conversation.state import AgentExecutionStatus from openhands.sdk.event import ( ActionEvent, @@ -133,6 +137,7 @@ def step( self, state: ConversationState, on_event: ConversationCallbackType, + on_token: ConversationTokenCallbackType | None = None, ) -> None: # Check for pending actions (implicit confirmation) # and execute them before sampling new actions. @@ -182,6 +187,7 @@ def step( store=False, add_security_risk_prediction=self._add_security_risk_prediction, metadata=self.llm.metadata, + on_token=on_token, ) else: llm_response = self.llm.completion( @@ -189,6 +195,7 @@ def step( tools=list(self.tools_map.values()), extra_body={"metadata": self.llm.metadata}, add_security_risk_prediction=self._add_security_risk_prediction, + on_token=on_token, ) except Exception as e: # If there is a condenser registered and the exception is a context window diff --git a/openhands/sdk/agent/base.py b/openhands/sdk/agent/base.py index a5e52809a0..8913b70b03 100644 --- a/openhands/sdk/agent/base.py +++ b/openhands/sdk/agent/base.py @@ -22,7 +22,10 @@ if TYPE_CHECKING: from openhands.sdk.conversation.state import ConversationState - from openhands.sdk.conversation.types import ConversationCallbackType + from openhands.sdk.conversation.types import ( + ConversationCallbackType, + ConversationTokenCallbackType, + ) logger = get_logger(__name__) @@ -236,6 +239,7 @@ def step( self, state: "ConversationState", on_event: "ConversationCallbackType", + on_token: "ConversationTokenCallbackType | None" = None, ) -> None: """Taking a step in the conversation. @@ -247,6 +251,9 @@ def step( 4.1 If conversation is finished, set state.agent_status to FINISHED 4.2 Otherwise, just return, Conversation will kick off the next step + If the underlying LLM supports streaming, partial deltas are forwarded to + ``on_token`` before the full response is returned. + NOTE: state will be mutated in-place. 
""" diff --git a/openhands/sdk/conversation/__init__.py b/openhands/sdk/conversation/__init__.py index a9213fa94a..56dcae42e6 100644 --- a/openhands/sdk/conversation/__init__.py +++ b/openhands/sdk/conversation/__init__.py @@ -7,7 +7,10 @@ from openhands.sdk.conversation.secrets_manager import SecretsManager from openhands.sdk.conversation.state import ConversationState from openhands.sdk.conversation.stuck_detector import StuckDetector -from openhands.sdk.conversation.types import ConversationCallbackType +from openhands.sdk.conversation.types import ( + ConversationCallbackType, + ConversationTokenCallbackType, +) from openhands.sdk.conversation.visualizer import ConversationVisualizer @@ -16,6 +19,7 @@ "BaseConversation", "ConversationState", "ConversationCallbackType", + "ConversationTokenCallbackType", "ConversationVisualizer", "SecretsManager", "StuckDetector", diff --git a/openhands/sdk/conversation/conversation.py b/openhands/sdk/conversation/conversation.py index 03e76e8b45..3227254610 100644 --- a/openhands/sdk/conversation/conversation.py +++ b/openhands/sdk/conversation/conversation.py @@ -3,7 +3,11 @@ from openhands.sdk.agent.base import AgentBase from openhands.sdk.conversation.base import BaseConversation from openhands.sdk.conversation.secrets_manager import SecretValue -from openhands.sdk.conversation.types import ConversationCallbackType, ConversationID +from openhands.sdk.conversation.types import ( + ConversationCallbackType, + ConversationID, + ConversationTokenCallbackType, +) from openhands.sdk.logger import get_logger from openhands.sdk.workspace import LocalWorkspace, RemoteWorkspace @@ -32,6 +36,7 @@ def __new__( persistence_dir: str | None = None, conversation_id: ConversationID | None = None, callbacks: list[ConversationCallbackType] | None = None, + token_callbacks: list[ConversationTokenCallbackType] | None = None, max_iteration_per_run: int = 500, stuck_detection: bool = True, visualize: bool = True, @@ -46,6 +51,7 @@ def __new__( workspace: RemoteWorkspace, conversation_id: ConversationID | None = None, callbacks: list[ConversationCallbackType] | None = None, + token_callbacks: list[ConversationTokenCallbackType] | None = None, max_iteration_per_run: int = 500, stuck_detection: bool = True, visualize: bool = True, @@ -60,6 +66,7 @@ def __new__( persistence_dir: str | None = None, conversation_id: ConversationID | None = None, callbacks: list[ConversationCallbackType] | None = None, + token_callbacks: list[ConversationTokenCallbackType] | None = None, max_iteration_per_run: int = 500, stuck_detection: bool = True, visualize: bool = True, @@ -81,6 +88,7 @@ def __new__( agent=agent, conversation_id=conversation_id, callbacks=callbacks, + token_callbacks=token_callbacks, max_iteration_per_run=max_iteration_per_run, stuck_detection=stuck_detection, visualize=visualize, @@ -92,6 +100,7 @@ def __new__( agent=agent, conversation_id=conversation_id, callbacks=callbacks, + token_callbacks=token_callbacks, max_iteration_per_run=max_iteration_per_run, stuck_detection=stuck_detection, visualize=visualize, diff --git a/openhands/sdk/conversation/impl/local_conversation.py b/openhands/sdk/conversation/impl/local_conversation.py index 2ea670f5cc..46366ecf57 100644 --- a/openhands/sdk/conversation/impl/local_conversation.py +++ b/openhands/sdk/conversation/impl/local_conversation.py @@ -8,14 +8,19 @@ from openhands.sdk.conversation.state import AgentExecutionStatus, ConversationState from openhands.sdk.conversation.stuck_detector import StuckDetector from 
openhands.sdk.conversation.title_utils import generate_conversation_title -from openhands.sdk.conversation.types import ConversationCallbackType, ConversationID +from openhands.sdk.conversation.types import ( + ConversationCallbackType, + ConversationID, + ConversationTokenCallbackType, +) from openhands.sdk.conversation.visualizer import create_default_visualizer from openhands.sdk.event import ( MessageEvent, PauseEvent, + StreamingDeltaEvent, UserRejectObservation, ) -from openhands.sdk.llm import LLM, Message, TextContent +from openhands.sdk.llm import LLM, LLMStreamEvent, Message, TextContent from openhands.sdk.llm.llm_registry import LLMRegistry from openhands.sdk.logger import get_logger from openhands.sdk.security.confirmation_policy import ( @@ -35,6 +40,7 @@ def __init__( persistence_dir: str | None = None, conversation_id: ConversationID | None = None, callbacks: list[ConversationCallbackType] | None = None, + token_callbacks: list[ConversationTokenCallbackType] | None = None, max_iteration_per_run: int = 500, stuck_detection: bool = True, visualize: bool = True, @@ -110,6 +116,31 @@ def _default_callback(e): for llm in list(self.agent.get_all_llms()): self.llm_registry.add(llm) + def _compose_token_callbacks( + callbacks: list[ConversationTokenCallbackType], + ) -> ConversationTokenCallbackType: + def _composed(event): + for cb in callbacks: + cb(event) + + return _composed + + user_token_callback = ( + _compose_token_callbacks(token_callbacks) if token_callbacks else None + ) + + def _handle_stream_event(stream_event: LLMStreamEvent) -> None: + try: + self._on_event( + StreamingDeltaEvent(source="agent", stream_event=stream_event) + ) + except Exception: + logger.exception("stream_event_processing_error", exc_info=True) + if user_token_callback: + user_token_callback(stream_event) + + self._on_token = _handle_stream_event + # Initialize secrets if provided if secrets: # Convert dict[str, str] to dict[str, SecretValue] @@ -242,7 +273,11 @@ def run(self) -> None: self._state.agent_status = AgentExecutionStatus.RUNNING # step must mutate the SAME state object - self.agent.step(self._state, on_event=self._on_event) + self.agent.step( + self._state, + on_event=self._on_event, + on_token=self._on_token, + ) iteration += 1 # Check for non-finished terminal conditions diff --git a/openhands/sdk/conversation/impl/remote_conversation.py b/openhands/sdk/conversation/impl/remote_conversation.py index 8b0e8710aa..33462617dd 100644 --- a/openhands/sdk/conversation/impl/remote_conversation.py +++ b/openhands/sdk/conversation/impl/remote_conversation.py @@ -15,7 +15,11 @@ from openhands.sdk.conversation.events_list_base import EventsListBase from openhands.sdk.conversation.secrets_manager import SecretValue from openhands.sdk.conversation.state import AgentExecutionStatus -from openhands.sdk.conversation.types import ConversationCallbackType, ConversationID +from openhands.sdk.conversation.types import ( + ConversationCallbackType, + ConversationID, + ConversationTokenCallbackType, +) from openhands.sdk.conversation.visualizer import create_default_visualizer from openhands.sdk.event.base import Event from openhands.sdk.event.conversation_state import ( @@ -378,6 +382,7 @@ def __init__( workspace: RemoteWorkspace, conversation_id: ConversationID | None = None, callbacks: list[ConversationCallbackType] | None = None, + token_callbacks: list[ConversationTokenCallbackType] | None = None, max_iteration_per_run: int = 500, stuck_detection: bool = True, visualize: bool = False, @@ -398,6 +403,12 @@ 
def __init__( stuck_detection: Whether to enable stuck detection on server visualize: Whether to enable the default visualizer callback """ + if token_callbacks: + logger.warning( + "Token streaming callbacks are not yet supported for remote " + "conversations; they will be ignored." + ) + self.agent = agent self._callbacks = callbacks or [] self.max_iteration_per_run = max_iteration_per_run diff --git a/openhands/sdk/conversation/types.py b/openhands/sdk/conversation/types.py index d10b085666..f84c4080c3 100644 --- a/openhands/sdk/conversation/types.py +++ b/openhands/sdk/conversation/types.py @@ -2,9 +2,14 @@ from collections.abc import Callable from openhands.sdk.event.base import Event +from openhands.sdk.llm.streaming import TokenCallbackType ConversationCallbackType = Callable[[Event], None] +"""Type alias for event callback functions.""" + +ConversationTokenCallbackType = TokenCallbackType +"""Callback type invoked for streaming LLM deltas.""" ConversationID = uuid.UUID """Type alias for conversation IDs.""" diff --git a/openhands/sdk/conversation/visualizer.py b/openhands/sdk/conversation/visualizer.py index b6bf61e1fe..75e4b5adcc 100644 --- a/openhands/sdk/conversation/visualizer.py +++ b/openhands/sdk/conversation/visualizer.py @@ -1,7 +1,8 @@ import re -from typing import TYPE_CHECKING +from typing import TYPE_CHECKING, Any from rich.console import Console +from rich.live import Live from rich.panel import Panel from rich.text import Text @@ -11,11 +12,13 @@ MessageEvent, ObservationEvent, PauseEvent, + StreamingDeltaEvent, SystemPromptEvent, UserRejectObservation, ) from openhands.sdk.event.base import Event from openhands.sdk.event.condenser import Condensation +from openhands.sdk.llm.streaming import StreamChannel if TYPE_CHECKING: @@ -47,6 +50,15 @@ r"\*(.*?)\*": "italic", } +STREAM_CHANNEL_HEADERS: dict[StreamChannel, tuple[str, str]] = { + "assistant_message": ("Assistant", _ACTION_COLOR), + "reasoning_summary": ("Reasoning", _THOUGHT_COLOR), + "function_call_arguments": ("Function Arguments", _ACTION_COLOR), + "refusal": ("Refusal", _ERROR_COLOR), + "tool_call_output": ("Tool Output", _ACTION_COLOR), +} + + _PANEL_PADDING = (1, 1) @@ -75,11 +87,21 @@ def __init__( """ self._console = Console() self._skip_user_messages = skip_user_messages - self._highlight_patterns: dict[str, str] = highlight_regex or {} + base_patterns = dict(DEFAULT_HIGHLIGHT_REGEX) + if highlight_regex: + base_patterns.update(highlight_regex) + self._highlight_patterns = base_patterns self._conversation_stats = conversation_stats + self._stream_state: dict[ + tuple[StreamChannel, int | None, str | None], dict[str, Any] + ] = {} def on_event(self, event: Event) -> None: """Main event handler that displays events with Rich formatting.""" + if isinstance(event, StreamingDeltaEvent): + self._render_streaming_event(event) + return + panel = self._create_event_panel(event) if panel: self._console.print(panel) @@ -107,6 +129,54 @@ def _apply_highlighting(self, text: Text) -> Text: return highlighted + def _render_streaming_event(self, event: StreamingDeltaEvent) -> None: + stream = event.stream_event + channel = stream.channel + + if channel == "status": + return + + header, color = STREAM_CHANNEL_HEADERS.get(channel, ("Streaming", "cyan")) + key = (channel, stream.output_index, stream.item_id) + state = self._stream_state.setdefault( + key, + { + "header_printed": False, + "buffer": "", + "header": header, + "color": color, + "live": None, + }, + ) + + if not state["header_printed"]: + 
self._console.print(Text(f"{header}:", style=f"bold {color}")) + state["header_printed"] = True + + delta_text = stream.text or stream.arguments + if delta_text: + state["buffer"] += delta_text + + live: Live | None = state.get("live") + if live is None: + live = Live( + Text(state["buffer"], style=str(color)), + console=self._console, + refresh_per_second=24, + transient=False, + ) + live.start() + state["live"] = live + else: + live.update(Text(state["buffer"], style=str(color))) + + if stream.is_final: + live = state.get("live") + if live is not None: + live.stop() + self._console.print() + self._stream_state.pop(key, None) + def _create_event_panel(self, event: Event) -> Panel | None: """Create a Rich Panel for the event with appropriate styling.""" # Use the event's visualize property for content @@ -163,6 +233,14 @@ def _create_event_panel(self, event: Event) -> Panel | None: padding=_PANEL_PADDING, expand=True, ) + elif isinstance(event, StreamingDeltaEvent): + return Panel( + event.visualize, + title="[bold cyan]Streaming Delta[/bold cyan]", + border_style="cyan", + padding=_PANEL_PADDING, + expand=True, + ) elif isinstance(event, MessageEvent): if ( self._skip_user_messages diff --git a/openhands/sdk/event/__init__.py b/openhands/sdk/event/__init__.py index 578afcbb8b..8ad6582274 100644 --- a/openhands/sdk/event/__init__.py +++ b/openhands/sdk/event/__init__.py @@ -14,6 +14,7 @@ SystemPromptEvent, UserRejectObservation, ) +from openhands.sdk.event.streaming import StreamingDeltaEvent from openhands.sdk.event.types import EventID, ToolCallID from openhands.sdk.event.user_action import PauseEvent @@ -28,6 +29,7 @@ "MessageEvent", "AgentErrorEvent", "UserRejectObservation", + "StreamingDeltaEvent", "PauseEvent", "Condensation", "CondensationRequest", diff --git a/openhands/sdk/event/streaming.py b/openhands/sdk/event/streaming.py new file mode 100644 index 0000000000..1c6bd9233d --- /dev/null +++ b/openhands/sdk/event/streaming.py @@ -0,0 +1,33 @@ +from __future__ import annotations + +from pydantic import Field +from rich.text import Text + +from openhands.sdk.event.base import Event +from openhands.sdk.event.types import SourceType +from openhands.sdk.llm.streaming import LLMStreamEvent, StreamChannel + + +class StreamingDeltaEvent(Event): + """Event emitted for each incremental LLM streaming delta.""" + + source: SourceType = Field(default="agent") + stream_event: LLMStreamEvent + + @property + def channel(self) -> StreamChannel: + return self.stream_event.channel + + @property + def visualize(self) -> Text: + content = Text() + content.append(f"Channel: {self.stream_event.channel}\n", style="bold") + + if self.stream_event.text: + content.append(self.stream_event.text) + elif self.stream_event.arguments: + content.append(self.stream_event.arguments) + else: + content.append("[no streaming content]") + + return content diff --git a/openhands/sdk/llm/__init__.py b/openhands/sdk/llm/__init__.py index fabed357d1..f5d47aefc4 100644 --- a/openhands/sdk/llm/__init__.py +++ b/openhands/sdk/llm/__init__.py @@ -12,6 +12,7 @@ content_to_str, ) from openhands.sdk.llm.router import RouterLLM +from openhands.sdk.llm.streaming import LLMStreamEvent, TokenCallbackType from openhands.sdk.llm.utils.metrics import Metrics, MetricsSnapshot from openhands.sdk.llm.utils.unverified_models import ( UNVERIFIED_MODELS_EXCLUDING_BEDROCK, @@ -34,6 +35,8 @@ "RedactedThinkingBlock", "ReasoningItemModel", "content_to_str", + "LLMStreamEvent", + "TokenCallbackType", "Metrics", "MetricsSnapshot", 
"VERIFIED_MODELS", diff --git a/openhands/sdk/llm/llm.py b/openhands/sdk/llm/llm.py index d0c45b516e..95093927fe 100644 --- a/openhands/sdk/llm/llm.py +++ b/openhands/sdk/llm/llm.py @@ -50,7 +50,11 @@ Timeout as LiteLLMTimeout, ) from litellm.responses.main import responses as litellm_responses -from litellm.types.llms.openai import ResponsesAPIResponse +from litellm.types.llms.openai import ( + ResponsesAPIResponse, + ResponsesAPIStreamEvents, + ResponsesAPIStreamingResponse, +) from litellm.types.utils import ModelResponse from litellm.utils import ( create_pretrained_tokenizer, @@ -67,6 +71,11 @@ Message, ) from openhands.sdk.llm.mixins.non_native_fc import NonNativeToolCallingMixin +from openhands.sdk.llm.streaming import ( + LLMStreamEvent, + StreamChannel, + TokenCallbackType, +) from openhands.sdk.llm.utils.metrics import Metrics, MetricsSnapshot from openhands.sdk.llm.utils.model_features import get_features from openhands.sdk.llm.utils.retry_mixin import RetryMixin @@ -89,6 +98,22 @@ LLMNoResponseError, ) +RESPONSES_COMPLETION_EVENT_TYPES = { + ResponsesAPIStreamEvents.RESPONSE_COMPLETED.value, + ResponsesAPIStreamEvents.RESPONSE_FAILED.value, + ResponsesAPIStreamEvents.RESPONSE_INCOMPLETE.value, +} +RESPONSES_FINAL_EVENT_TYPES = RESPONSES_COMPLETION_EVENT_TYPES | { + ResponsesAPIStreamEvents.FUNCTION_CALL_ARGUMENTS_DONE.value, + ResponsesAPIStreamEvents.MCP_CALL_ARGUMENTS_DONE.value, + ResponsesAPIStreamEvents.OUTPUT_TEXT_DONE.value, + ResponsesAPIStreamEvents.REFUSAL_DONE.value, + ResponsesAPIStreamEvents.OUTPUT_ITEM_DONE.value, + ResponsesAPIStreamEvents.MCP_CALL_COMPLETED.value, + ResponsesAPIStreamEvents.MCP_CALL_FAILED.value, + ResponsesAPIStreamEvents.ERROR.value, +} + class LLM(BaseModel, RetryMixin, NonNativeToolCallingMixin): """Refactored LLM: simple `completion()`, centralized Telemetry, tiny helpers.""" @@ -385,6 +410,7 @@ def completion( tools: Sequence[ToolBase] | None = None, _return_metrics: bool = False, add_security_risk_prediction: bool = False, + on_token: TokenCallbackType | None = None, **kwargs, ) -> LLMResponse: """Single entry point for LLM completion. @@ -392,8 +418,8 @@ def completion( Normalize → (maybe) mock tools → transport → postprocess. """ # Check if streaming is requested - if kwargs.get("stream", False): - raise ValueError("Streaming is not supported") + if on_token is not None or kwargs.get("stream", False): + raise ValueError("Streaming is not supported for completion API yet") # 1) serialize messages formatted_messages = self.format_messages_for_llm(messages) @@ -507,16 +533,24 @@ def responses( store: bool | None = None, _return_metrics: bool = False, add_security_risk_prediction: bool = False, + on_token: TokenCallbackType | None = None, **kwargs, ) -> LLMResponse: """Alternative invocation path using OpenAI Responses API via LiteLLM. Maps Message[] -> (instructions, input[]) and returns LLMResponse. - Non-stream only for v1. + Streaming is enabled when ``on_token`` is provided. 
""" - # Streaming not yet supported - if kwargs.get("stream", False): - raise ValueError("Streaming is not supported for Responses API yet") + user_requested_stream = bool(kwargs.get("stream", False)) + if user_requested_stream and on_token is None: + raise ValueError( + "Streaming for Responses API requires an on_token callback" + ) + + if on_token is not None: + kwargs["stream"] = True + else: + kwargs.pop("stream", None) # Build instructions + input list using dedicated Responses formatter instructions, input_items = self.format_messages_for_responses(messages) @@ -561,7 +595,7 @@ def responses( retry_multiplier=self.retry_multiplier, retry_listener=self.retry_listener, ) - def _one_attempt(**retry_kwargs) -> ResponsesAPIResponse: + def _one_attempt(**retry_kwargs): final_kwargs = {**call_kwargs, **retry_kwargs} with self._litellm_modify_params_ctx(self.modify_params): with warnings.catch_warnings(): @@ -584,16 +618,24 @@ def _one_attempt(**retry_kwargs) -> ResponsesAPIResponse: seed=self.seed, **final_kwargs, ) + if self._is_responses_stream_result(ret): + return ret + assert isinstance(ret, ResponsesAPIResponse), ( f"Expected ResponsesAPIResponse, got {type(ret)}" ) # telemetry (latency, cost). Token usage mapping we handle after. assert self._telemetry is not None + self._telemetry.on_response(ret) return ret try: - resp: ResponsesAPIResponse = _one_attempt() + raw_resp = _one_attempt() + if self._is_responses_stream_result(raw_resp): + resp = self._consume_responses_stream(raw_resp, on_token=on_token) + else: + resp = cast(ResponsesAPIResponse, raw_resp) # Parse output -> Message (typed) # Cast to a typed sequence @@ -615,9 +657,162 @@ def _one_attempt(**retry_kwargs) -> ResponsesAPIResponse: self._telemetry.on_error(e) raise + @staticmethod + def _is_responses_stream_result(candidate: Any) -> bool: + if isinstance(candidate, ResponsesAPIResponse): + return False + return ( + hasattr(candidate, "__iter__") + and (hasattr(candidate, "__next__") or hasattr(candidate, "__aiter__")) + and hasattr(candidate, "finished") + ) + + def _consume_responses_stream( + self, + stream: Any, + *, + on_token: TokenCallbackType | None, + ) -> ResponsesAPIResponse: + final_response: ResponsesAPIResponse | None = None + for chunk in stream: + event = self._stream_event_from_responses_chunk(chunk) + if event is not None and on_token is not None: + on_token(event) + + if event is not None and event.type in RESPONSES_COMPLETION_EVENT_TYPES: + response_candidate = self._get_chunk_attr(chunk, "response") + if isinstance(response_candidate, ResponsesAPIResponse): + final_response = response_candidate + + if final_response is None: + completion_event = getattr(stream, "completed_response", None) + if completion_event is not None: + response_candidate = self._get_chunk_attr(completion_event, "response") + if isinstance(response_candidate, ResponsesAPIResponse): + final_response = response_candidate + + if final_response is None: + raise LLMNoResponseError( + "Streaming ended without a completion event from the provider." 
+ ) + + assert self._telemetry is not None + self._telemetry.on_response(final_response) + return final_response + + def _stream_event_from_responses_chunk( + self, chunk: ResponsesAPIStreamingResponse | Any + ) -> LLMStreamEvent | None: + event_type_obj = self._get_chunk_attr(chunk, "type") + if event_type_obj is None: + return None + + if isinstance(event_type_obj, ResponsesAPIStreamEvents): + event_value = event_type_obj.value + else: + event_value = str(event_type_obj) + + event = LLMStreamEvent( + type=event_value, + output_index=self._get_chunk_attr(chunk, "output_index"), + content_index=self._get_chunk_attr(chunk, "content_index"), + item_id=self._get_chunk_attr(chunk, "item_id"), + raw=chunk, + ) + + if event_value in RESPONSES_FINAL_EVENT_TYPES: + event.is_final = True + + text_value = self._get_chunk_text(chunk) + arguments_value = self._get_chunk_arguments(chunk) + channel: StreamChannel = "unknown" + + if event_value in { + ResponsesAPIStreamEvents.OUTPUT_TEXT_DELTA.value, + ResponsesAPIStreamEvents.OUTPUT_TEXT_DONE.value, + }: + channel = "assistant_message" + elif event_value in { + ResponsesAPIStreamEvents.REASONING_SUMMARY_TEXT_DELTA.value, + }: + channel = "reasoning_summary" + elif event_value in { + ResponsesAPIStreamEvents.FUNCTION_CALL_ARGUMENTS_DELTA.value, + ResponsesAPIStreamEvents.FUNCTION_CALL_ARGUMENTS_DONE.value, + ResponsesAPIStreamEvents.MCP_CALL_ARGUMENTS_DELTA.value, + ResponsesAPIStreamEvents.MCP_CALL_ARGUMENTS_DONE.value, + }: + channel = "function_call_arguments" + elif event_value in { + ResponsesAPIStreamEvents.REFUSAL_DELTA.value, + ResponsesAPIStreamEvents.REFUSAL_DONE.value, + }: + channel = "refusal" + elif event_value in { + ResponsesAPIStreamEvents.RESPONSE_CREATED.value, + ResponsesAPIStreamEvents.RESPONSE_IN_PROGRESS.value, + ResponsesAPIStreamEvents.RESPONSE_COMPLETED.value, + ResponsesAPIStreamEvents.RESPONSE_FAILED.value, + ResponsesAPIStreamEvents.RESPONSE_INCOMPLETE.value, + ResponsesAPIStreamEvents.OUTPUT_ITEM_ADDED.value, + ResponsesAPIStreamEvents.OUTPUT_ITEM_DONE.value, + ResponsesAPIStreamEvents.RESPONSE_PART_ADDED.value, + ResponsesAPIStreamEvents.CONTENT_PART_ADDED.value, + ResponsesAPIStreamEvents.CONTENT_PART_DONE.value, + ResponsesAPIStreamEvents.FILE_SEARCH_CALL_IN_PROGRESS.value, + ResponsesAPIStreamEvents.FILE_SEARCH_CALL_SEARCHING.value, + ResponsesAPIStreamEvents.FILE_SEARCH_CALL_COMPLETED.value, + ResponsesAPIStreamEvents.MCP_CALL_IN_PROGRESS.value, + ResponsesAPIStreamEvents.MCP_CALL_COMPLETED.value, + ResponsesAPIStreamEvents.MCP_CALL_FAILED.value, + ResponsesAPIStreamEvents.WEB_SEARCH_CALL_IN_PROGRESS.value, + ResponsesAPIStreamEvents.WEB_SEARCH_CALL_SEARCHING.value, + ResponsesAPIStreamEvents.WEB_SEARCH_CALL_COMPLETED.value, + ResponsesAPIStreamEvents.ERROR.value, + "response.reasoning_summary_part.added", + }: + channel = "status" + + event.channel = channel + + if channel in {"assistant_message", "reasoning_summary", "refusal"}: + if text_value: + event.text = text_value + if channel == "function_call_arguments": + if arguments_value: + event.arguments = arguments_value + + return event + + @staticmethod + def _get_chunk_attr(chunk: Any, attr: str, default: Any = None) -> Any: + if hasattr(chunk, attr): + return getattr(chunk, attr) + if isinstance(chunk, dict): + return chunk.get(attr, default) + return default + + def _get_chunk_text(self, chunk: Any) -> str | None: + text = self._get_chunk_attr(chunk, "delta") + if not isinstance(text, str) or text == "": + text = self._get_chunk_attr(chunk, "text") + if (text is 
None or text == "") and self._get_chunk_attr(chunk, "part"): + part = self._get_chunk_attr(chunk, "part") + text = self._get_chunk_attr(part, "text") + if isinstance(text, str) and text: + return text + return None + + def _get_chunk_arguments(self, chunk: Any) -> str | None: + arguments = self._get_chunk_attr(chunk, "arguments") + if not isinstance(arguments, str) or arguments == "": + arguments = self._get_chunk_attr(chunk, "delta") + if isinstance(arguments, str) and arguments: + return arguments + return None + # ========================================================================= # Transport + helpers - # ========================================================================= def _transport_call( self, *, messages: list[dict[str, Any]], **kwargs ) -> ModelResponse: diff --git a/openhands/sdk/llm/streaming.py b/openhands/sdk/llm/streaming.py new file mode 100644 index 0000000000..6e1b1eb9cd --- /dev/null +++ b/openhands/sdk/llm/streaming.py @@ -0,0 +1,35 @@ +from __future__ import annotations + +from collections.abc import Callable +from dataclasses import dataclass +from typing import Any, Literal + + +StreamChannel = Literal[ + "assistant_message", + "reasoning_summary", + "function_call_arguments", + "tool_call_output", + "refusal", + "system", + "status", + "unknown", +] + + +@dataclass(slots=True) +class LLMStreamEvent: + """Represents a streaming delta emitted by an LLM provider.""" + + type: str + channel: StreamChannel = "unknown" + text: str | None = None + arguments: str | None = None + output_index: int | None = None + content_index: int | None = None + item_id: str | None = None + is_final: bool = False + raw: Any | None = None + + +TokenCallbackType = Callable[[LLMStreamEvent], None] diff --git a/tests/sdk/conversation/test_streaming_events.py b/tests/sdk/conversation/test_streaming_events.py new file mode 100644 index 0000000000..3bab2551a4 --- /dev/null +++ b/tests/sdk/conversation/test_streaming_events.py @@ -0,0 +1,183 @@ +from __future__ import annotations + +from litellm.responses.main import mock_responses_api_response +from rich.console import Console + +from openhands.sdk import Conversation +from openhands.sdk.agent import Agent +from openhands.sdk.event import MessageEvent, StreamingDeltaEvent +from openhands.sdk.llm import LLM, LLMResponse, LLMStreamEvent +from openhands.sdk.llm.message import Message, TextContent +from openhands.sdk.llm.utils.metrics import MetricsSnapshot + + +class FakeStreamingLLM(LLM): + def __init__(self) -> None: + super().__init__(model="test-stream", service_id="test-stream") + self._stream_events = [ + LLMStreamEvent( + type="response.output_text.delta", + channel="assistant_message", + text="Hello", + output_index=0, + content_index=0, + item_id="item-1", + ), + LLMStreamEvent( + type="response.output_text.delta", + channel="assistant_message", + text=" world", + output_index=0, + content_index=0, + item_id="item-1", + ), + LLMStreamEvent( + type="response.output_text.done", + channel="assistant_message", + is_final=True, + output_index=0, + content_index=0, + item_id="item-1", + ), + LLMStreamEvent( + type="response.completed", + channel="status", + is_final=True, + output_index=0, + content_index=0, + item_id="item-1", + ), + ] + + def uses_responses_api(self) -> bool: # pragma: no cover - simple override + return True + + def responses( + self, + messages, + tools=None, + include=None, + store=None, + _return_metrics=False, + add_security_risk_prediction=False, + on_token=None, + **kwargs, + ): + if on_token: + for event in 
self._stream_events: + on_token(event) + + message = Message( + role="assistant", + content=[TextContent(text="Hello world")], + ) + snapshot = MetricsSnapshot( + model_name=self.metrics.model_name, + accumulated_cost=self.metrics.accumulated_cost, + max_budget_per_task=self.metrics.max_budget_per_task, + accumulated_token_usage=self.metrics.accumulated_token_usage, + ) + raw_response = mock_responses_api_response("Hello world") + if self._telemetry: + self._telemetry.on_response(raw_response) + return LLMResponse(message=message, metrics=snapshot, raw_response=raw_response) + + +def test_streaming_events_persist_and_dispatch(tmp_path): + llm = FakeStreamingLLM() + agent = Agent(llm=llm, tools=[]) + + tokens: list[LLMStreamEvent] = [] + callback_events = [] + + def token_cb(event: LLMStreamEvent) -> None: + tokens.append(event) + + def recorder(event) -> None: + callback_events.append(event) + + conversation = Conversation( + agent=agent, + workspace=str(tmp_path), + callbacks=[recorder], + token_callbacks=[token_cb], + visualize=False, + ) + + conversation.send_message("Say hello") + conversation.run() + + stream_events = [ + event + for event in conversation.state.events + if isinstance(event, StreamingDeltaEvent) + ] + + assert len(stream_events) == len(llm._stream_events) + assert [evt.stream_event.type for evt in stream_events] == [ + evt.type for evt in llm._stream_events + ] + assert [evt.stream_event.channel for evt in stream_events[:3]] == [ + "assistant_message", + "assistant_message", + "assistant_message", + ] + assert stream_events[-2].stream_event.is_final is True + assert stream_events[-2].stream_event.channel == "assistant_message" + assert stream_events[-1].stream_event.channel == "status" + + assert [evt.type for evt in tokens] == [evt.type for evt in llm._stream_events] + + stream_indices = [ + idx + for idx, event in enumerate(callback_events) + if isinstance(event, StreamingDeltaEvent) + ] + final_message_index = next( + idx + for idx, event in enumerate(callback_events) + if isinstance(event, MessageEvent) and event.source == "agent" + ) + + assert stream_indices # streaming events received via callbacks + assert all(idx < final_message_index for idx in stream_indices) + + +def test_visualizer_streaming_renders_incremental_text(): + from openhands.sdk.conversation.visualizer import ConversationVisualizer + + viz = ConversationVisualizer() + viz._console = Console(record=True) + + reasoning_start = LLMStreamEvent( + type="response.reasoning_summary_text.delta", + channel="reasoning_summary", + text="Think", + output_index=0, + content_index=0, + item_id="reasoning-1", + ) + reasoning_continue = LLMStreamEvent( + type="response.reasoning_summary_text.delta", + channel="reasoning_summary", + text=" deeply", + output_index=0, + content_index=0, + item_id="reasoning-1", + ) + reasoning_end = LLMStreamEvent( + type="response.reasoning_summary_text.delta", + channel="reasoning_summary", + is_final=True, + output_index=0, + content_index=0, + item_id="reasoning-1", + ) + + viz.on_event(StreamingDeltaEvent(source="agent", stream_event=reasoning_start)) + viz.on_event(StreamingDeltaEvent(source="agent", stream_event=reasoning_continue)) + viz.on_event(StreamingDeltaEvent(source="agent", stream_event=reasoning_end)) + + output = viz._console.export_text() + assert "Reasoning:" in output + assert "Think deeply" in output diff --git a/tests/sdk/llm/test_responses_parsing_and_kwargs.py b/tests/sdk/llm/test_responses_parsing_and_kwargs.py index 81ee1f2ce4..ffda3c95e9 100644 --- 
a/tests/sdk/llm/test_responses_parsing_and_kwargs.py +++ b/tests/sdk/llm/test_responses_parsing_and_kwargs.py @@ -1,6 +1,13 @@ +from types import SimpleNamespace from unittest.mock import patch -from litellm.types.llms.openai import ResponseAPIUsage, ResponsesAPIResponse +import pytest +from litellm.responses.main import mock_responses_api_response +from litellm.types.llms.openai import ( + ResponseAPIUsage, + ResponsesAPIResponse, + ResponsesAPIStreamEvents, +) from openai.types.responses.response_function_tool_call import ResponseFunctionToolCall from openai.types.responses.response_output_message import ResponseOutputMessage from openai.types.responses.response_output_text import ResponseOutputText @@ -9,7 +16,7 @@ Summary, ) -from openhands.sdk.llm.llm import LLM +from openhands.sdk.llm import LLM from openhands.sdk.llm.message import Message, ReasoningItemModel, TextContent @@ -116,3 +123,79 @@ def test_llm_responses_end_to_end(mock_responses_call): ] # Telemetry should have recorded usage (one entry) assert len(llm._telemetry.metrics.token_usages) == 1 # type: ignore[attr-defined] + + +@patch("openhands.sdk.llm.llm.litellm_responses") +def test_llm_responses_streaming_invokes_token_callback(mock_responses_call): + llm = LLM(model="gpt-5-mini") + sys = Message(role="system", content=[TextContent(text="inst")]) + user = Message(role="user", content=[TextContent(text="hi")]) + + final_resp = mock_responses_api_response("Streaming hello") + + delta_event = SimpleNamespace( + type=ResponsesAPIStreamEvents.OUTPUT_TEXT_DELTA, + delta="Streaming ", + output_index=0, + content_index=0, + item_id="item-1", + ) + completion_event = SimpleNamespace( + type=ResponsesAPIStreamEvents.RESPONSE_COMPLETED, + response=final_resp, + ) + + class DummyStream: + def __init__(self, events): + self._events = events + self._index = 0 + self.finished = False + self.completed_response = None + + def __iter__(self): + return self + + def __next__(self): + if self._index >= len(self._events): + self.finished = True + raise StopIteration + event = self._events[self._index] + self._index += 1 + if ( + getattr(event, "type", None) + == ResponsesAPIStreamEvents.RESPONSE_COMPLETED + ): + self.completed_response = event + return event + + stream = DummyStream([delta_event, completion_event]) + mock_responses_call.return_value = stream + + captured = [] + + def on_token(event): + captured.append(event) + + result = llm.responses([sys, user], on_token=on_token) + + assert [evt.type for evt in captured] == [ + ResponsesAPIStreamEvents.OUTPUT_TEXT_DELTA.value, + ResponsesAPIStreamEvents.RESPONSE_COMPLETED.value, + ] + assert captured[0].text == "Streaming " + assert captured[1].is_final is True + assert result.message.role == "assistant" + assert "Streaming hello" in "".join( + c.text for c in result.message.content if isinstance(c, TextContent) + ) + assert stream.finished is True + assert len(llm._telemetry.metrics.token_usages) == 1 # type: ignore[attr-defined] + + +def test_llm_responses_stream_requires_callback(): + llm = LLM(model="gpt-5-mini") + sys = Message(role="system", content=[TextContent(text="inst")]) + user = Message(role="user", content=[TextContent(text="hi")]) + + with pytest.raises(ValueError, match="requires an on_token callback"): + llm.responses([sys, user], stream=True) From d331abfc5ae945585ec8bae5a81bfe0dbb7ac47c Mon Sep 17 00:00:00 2001 From: Engel Nyst Date: Mon, 20 Oct 2025 10:53:29 +0200 Subject: [PATCH 02/15] Document LLM streaming refactor plan --- llm_streaming_refactor_plan.md | 121 
+++++++++++++++++++++++++++++++++ 1 file changed, 121 insertions(+) create mode 100644 llm_streaming_refactor_plan.md diff --git a/llm_streaming_refactor_plan.md b/llm_streaming_refactor_plan.md new file mode 100644 index 0000000000..3272d97d90 --- /dev/null +++ b/llm_streaming_refactor_plan.md @@ -0,0 +1,121 @@ +# LLM Streaming Refactor Plan + +## Observed LiteLLM stream event types + +LiteLLM emits `ResponsesAPIStreamEvents` values while streaming. The current enum and their string payloads are: + +- `response.created` +- `response.in_progress` +- `response.completed` +- `response.failed` +- `response.incomplete` +- `response.output_item.added` +- `response.output_item.done` +- `response.output_text.delta` +- `response.output_text.done` +- `response.output_text.annotation.added` +- `response.reasoning_summary_text.delta` +- `response.reasoning_summary_part.added` +- `response.function_call_arguments.delta` +- `response.function_call_arguments.done` +- `response.mcp_call_arguments.delta` +- `response.mcp_call_arguments.done` +- `response.mcp_call.in_progress` +- `response.mcp_call.completed` +- `response.mcp_call.failed` +- `response.mcp_list_tools.in_progress` +- `response.mcp_list_tools.completed` +- `response.mcp_list_tools.failed` +- `response.file_search_call.in_progress` +- `response.file_search_call.searching` +- `response.file_search_call.completed` +- `response.web_search_call.in_progress` +- `response.web_search_call.searching` +- `response.web_search_call.completed` +- `response.refusal.delta` +- `response.refusal.done` +- `error` +- `response.content_part.added` +- `response.content_part.done` + +These events conceptually fall into buckets we care about for visualization and higher-level semantics: + +| Category | Events | Notes | +| --- | --- | --- | +| **Lifecycle / status** | created, in_progress, completed, failed, incomplete, *_call.* events, output_item.added/done, content_part.added/done, error | remind our UI but typically not shown inline | +| **Assistant text** | output_text.delta, output_text.done, output_text.annotation.added | forms "Message" body | +| **Reasoning summary** | reasoning_summary_part.added, reasoning_summary_text.delta | feed into Reasoning blobs | +| **Function / tool arguments** | function_call_arguments.delta/done, mcp_call_arguments.delta/done | update Action sections | +| **Refusal** | refusal.delta/done | render special refusal text | + +## Problems to resolve + +1. **Streaming display duplicates content and forces line breaks.** We currently print each delta as its own Rich print call with `end=""`, but Live panels aren’t used and the console injects newlines between `print` calls, so output becomes `word\nword\n...`. +2. **No per-message aggregation.** All reasoning deltas accumulate into a single global area, so later messages overwrite earlier context. We need separate buffers per "logical container" (assistant message, reasoning summary, function call) associated with the owning `LLMConvertibleEvent` (e.g., `MessageEvent`, `ActionEvent`). +3. **Naming collision / clarity.** LiteLLM "events" clash with our own domain events. We should introduce a distinct abstraction, e.g. `LLMStreamChunk`, that wraps metadata about channel, indices, and owning response item. +4. **Persistence & replay.** We still want to persist raw stream parts for clients, but the visualizer should rebuild high-level fragments from these parts when replaying history. 
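+
+To make problem (2) concrete, here is a minimal, illustrative sketch of per-container buffering keyed the way the deltas arrive. The names (`StreamKey`, `StreamBuffers`, `part_kind`) are placeholders for this plan, not existing SDK or LiteLLM APIs:
+
+```python
+from dataclasses import dataclass, field
+
+
+@dataclass(frozen=True)
+class StreamKey:
+    """Identifies one logical streaming container (assistant message, reasoning item, tool call)."""
+
+    item_id: str | None
+    output_index: int | None
+    part_kind: str  # e.g. "assistant", "reasoning", "function_arguments"
+
+
+@dataclass
+class StreamBuffers:
+    """Accumulates deltas per container instead of in one global buffer."""
+
+    _buffers: dict[StreamKey, str] = field(default_factory=dict)
+
+    def add_delta(self, key: StreamKey, delta: str) -> str:
+        # Plain concatenation: newlines appear only if the model emitted them.
+        self._buffers[key] = self._buffers.get(key, "") + delta
+        return self._buffers[key]
+
+    def finish(self, key: StreamKey) -> str:
+        # Pop the finished container so later items cannot append to it.
+        return self._buffers.pop(key, "")
+```
+
+Each `(item_id, output_index, part_kind)` combination gets its own buffer, so a second assistant message or a new reasoning item never overwrites or extends an earlier one.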
+ +## Proposed model hierarchy + +``` +LLMStreamChunk (renamed from LLMStreamEvent) +├── part_kind: Literal["assistant", "reasoning", "function_arguments", "refusal", "status", "tool_output"] +├── text_delta: str | None +├── arguments_delta: str | None +├── response_index: int | None +├── item_id: str | None +├── chunk_type: str # raw LiteLLM value +├── is_terminal: bool +├── raw_chunk: Any # original LiteLLM payload retained for logging/replay +└── origin_metadata: dict[str, Any] +``` + +Keeping the raw LiteLLM payload inside each `LLMStreamChunk` means we do **not** need a separate envelope structure; logging can simply serialize the chunk directly. + +## Visualization strategy + +1. **Track a hierarchy per conversation event.** When a LiteLLM stream begins we emit a placeholder `MessageEvent` (assistant message) or `ActionEvent` (function call). Each `LLMStreamChunk` should include a `response_id`/`item_id` so we can map to the owning conversation event: + - `output_text` → existing `MessageEvent` for the assistant response. + - `reasoning_summary_*` → reasoning area inside `MessageEvent`. + - `function_call_arguments_*` → arguments area inside `ActionEvent`. +2. **Use `Live` per section.** For each unique `(conversation_event_id, part_kind, item_id)` create a Rich `Live` instance that updates with concatenated text. When the part is terminal, stop the `Live` and leave the final text in place. +3. **Avoid newlines unless emitted by the model.** We’ll join chunks using plain string concatenation and only add newline characters when the delta contains `\n` or when we intentionally insert separators (e.g., between tool JSON arguments). +4. **Segregate sections:** + - `Reasoning:` header per `MessageEvent`. Each new reasoning item gets its own Live line under that message. + - `Assistant:` body for natural language output, appended inside the message panel. + - `Function Arguments:` block under each action panel, streaming JSON incrementally. + +## Implementation roadmap + +1. **Data model adjustments** + - Rename the existing `LLMStreamEvent` class to `LLMStreamChunk` and extend it with richer fields: `part_kind`, `response_index`, `conversation_event_id` (populated later), `raw_chunk`, etc. + - Create helper to classify LiteLLM chunks into `LLMStreamChunk` instances (including mapping item IDs to owning role/time). + +2. **Conversation state integration** + - When we enqueue the initial `MessageEvent`/`ActionEvent`, cache a lookup (e.g., `inflight_streams[(response_id, output_index)] = conversation_event_id`). + - Update `LocalConversation` token callback wrapper to attach the resolved conversation event ID onto the `LLMStreamChunk` before emitting/persisting. + +3. **Visualizer rewrite** + - Maintain `self._stream_views[(conversation_event_id, part_kind, item_id)] = LiveState` where `LiveState` wraps buffer, style, and a `Live` instance. + - On streaming updates: update buffer, `live.update(Text(buffer, style=...))` without printing newlines. + - On final chunk: stop `Live`, render final static text, and optionally record in conversation state for playback. + - Ensure replay (when visualizer processes stored events) converts stored parts into final text as well. + +4. **Persistence / tests** + - Update tests to ensure: + - Multiple output_text deltas produce contiguous text without duplicates or extra newlines. + - Separate reasoning items create separate entries under one message event. + - Function call arguments stream into their own block. 
+ - Add snapshot/log assertions to confirm persisted JSONL remains unchanged for downstream clients. + +5. **Documentation & naming cleanup** + - Decide on final terminology (`LLMStreamChunk`, `StreamItem`, etc.) and update code comments accordingly. + - Document the classification table for future maintainers. + +## Next actions + +- [ ] Refactor classifier to output `LLMStreamChunk` objects with clear `part_kind`. +- [ ] Track in-flight conversation events so parts know their owner. +- [ ] Replace print-based visualizer streaming with `Live` blocks per section. +- [ ] Extend unit tests to cover multiple messages, reasoning segments, and tool calls. +- [ ] Manually validate with long streaming example to confirm smooth in-place updates. From e31b728bf654e4830c2fd94199469342c31c4d49 Mon Sep 17 00:00:00 2001 From: Engel Nyst Date: Mon, 20 Oct 2025 14:03:26 +0200 Subject: [PATCH 03/15] Refactor streaming chunk model and visualizer --- README.md | 8 +- .../24_responses_streaming.py | 38 +- llm_streaming_refactor_plan.md | 29 +- openhands/sdk/__init__.py | 14 +- .../conversation/impl/local_conversation.py | 8 +- openhands/sdk/conversation/visualizer.py | 332 +++++++++++++++--- openhands/sdk/event/streaming.py | 18 +- openhands/sdk/llm/__init__.py | 4 +- openhands/sdk/llm/llm.py | 50 +-- openhands/sdk/llm/streaming.py | 15 +- .../sdk/conversation/test_streaming_events.py | 70 ++-- .../llm/test_responses_parsing_and_kwargs.py | 2 +- 12 files changed, 424 insertions(+), 164 deletions(-) diff --git a/README.md b/README.md index 7d7f689933..00cb3cb918 100644 --- a/README.md +++ b/README.md @@ -145,17 +145,17 @@ llm = registry.get("default") You can receive incremental deltas from the Responses API by supplying a token callback when constructing a conversation. Each callback receives an -``LLMStreamEvent`` describing the delta. +``LLMStreamChunk`` describing the delta. 
```python from pathlib import Path -from openhands.sdk import Conversation, LLMStreamEvent +from openhands.sdk import Conversation, LLMStreamChunk log_dir = Path("logs/stream") log_dir.mkdir(parents=True, exist_ok=True) -def on_token(event: LLMStreamEvent) -> None: - print(event.text or event.arguments or "", end="", flush=True) +def on_token(event: LLMStreamChunk) -> None: + print(event.text_delta or event.arguments_delta or "", end="", flush=True) conversation = Conversation(agent=agent, token_callbacks=[on_token]) conversation.send_message("Summarize the benefits of token streaming.") diff --git a/examples/01_standalone_sdk/24_responses_streaming.py b/examples/01_standalone_sdk/24_responses_streaming.py index 9a87cbe4ba..75b4f602c8 100644 --- a/examples/01_standalone_sdk/24_responses_streaming.py +++ b/examples/01_standalone_sdk/24_responses_streaming.py @@ -15,23 +15,34 @@ from pydantic import SecretStr -from openhands.sdk import Conversation, LLMStreamEvent, get_logger +from openhands.sdk import ( + Conversation, + ConversationCallbackType, + LLMStreamChunk, + get_logger, +) +from openhands.sdk.conversation.visualizer import create_streaming_visualizer from openhands.sdk.llm import LLM from openhands.tools.preset.default import get_default_agent +PRINT_STREAM_TO_STDOUT = False + + logger = get_logger(__name__) LOG_DIR = Path("logs/stream") -def _serialize_event(event: LLMStreamEvent) -> dict[str, Any]: +def _serialize_event(event: LLMStreamChunk) -> dict[str, Any]: record = { "type": event.type, - "text": event.text, - "arguments": event.arguments, + "part_kind": event.part_kind, + "text": event.text_delta, + "arguments": event.arguments_delta, "output_index": event.output_index, "content_index": event.content_index, "item_id": event.item_id, + "response_id": event.response_id, "is_final": event.is_final, } return record @@ -58,21 +69,28 @@ def main() -> None: LOG_DIR.mkdir(parents=True, exist_ok=True) log_path = LOG_DIR / f"responses_stream_{timestamp}.jsonl" - def on_token(event: LLMStreamEvent) -> None: + def on_token(event: LLMStreamChunk) -> None: record = _serialize_event(event) with log_path.open("a", encoding="utf-8") as fp: fp.write(json.dumps(record) + "\n") - stream_chunk = event.text or event.arguments - if stream_chunk: - print(stream_chunk, end="", flush=True) - if event.is_final: + delta = event.text_delta or event.arguments_delta + if delta and PRINT_STREAM_TO_STDOUT: + print(delta, end="", flush=True) + if event.is_final and event.part_kind == "status" and PRINT_STREAM_TO_STDOUT: print("\n--- stream complete ---") + callbacks: list[ConversationCallbackType] = [] + if not PRINT_STREAM_TO_STDOUT: + streaming_visualizer = create_streaming_visualizer() + callbacks.append(streaming_visualizer.on_event) + conversation = Conversation( agent=agent, workspace=os.getcwd(), token_callbacks=[on_token], + callbacks=callbacks or None, + visualize=False, ) story_prompt = ( @@ -83,7 +101,7 @@ def on_token(event: LLMStreamEvent) -> None: conversation.run() cleanup_prompt = ( - "Thank you. Please delete streaming_story.md now that I've read it, " + "Thank you. Please delete the streaming story file now that I've read it, " "then confirm the deletion." 
) conversation.send_message(cleanup_prompt) diff --git a/llm_streaming_refactor_plan.md b/llm_streaming_refactor_plan.md index 3272d97d90..fce33f67ac 100644 --- a/llm_streaming_refactor_plan.md +++ b/llm_streaming_refactor_plan.md @@ -74,16 +74,12 @@ Keeping the raw LiteLLM payload inside each `LLMStreamChunk` means we do **not** ## Visualization strategy -1. **Track a hierarchy per conversation event.** When a LiteLLM stream begins we emit a placeholder `MessageEvent` (assistant message) or `ActionEvent` (function call). Each `LLMStreamChunk` should include a `response_id`/`item_id` so we can map to the owning conversation event: - - `output_text` → existing `MessageEvent` for the assistant response. - - `reasoning_summary_*` → reasoning area inside `MessageEvent`. - - `function_call_arguments_*` → arguments area inside `ActionEvent`. -2. **Use `Live` per section.** For each unique `(conversation_event_id, part_kind, item_id)` create a Rich `Live` instance that updates with concatenated text. When the part is terminal, stop the `Live` and leave the final text in place. -3. **Avoid newlines unless emitted by the model.** We’ll join chunks using plain string concatenation and only add newline characters when the delta contains `\n` or when we intentionally insert separators (e.g., between tool JSON arguments). -4. **Segregate sections:** - - `Reasoning:` header per `MessageEvent`. Each new reasoning item gets its own Live line under that message. - - `Assistant:` body for natural language output, appended inside the message panel. - - `Function Arguments:` block under each action panel, streaming JSON incrementally. +We will leave the existing `ConversationVisualizer` untouched for default/legacy usage and introduce a new `StreamingConversationVisualizer` that renders deltas directly inside the final panels: + +1. **Create/update per-response panels.** The first chunk for a `(response_id, output_index)` pair creates (or reuses) a panel for the assistant message or tool call and immediately starts streaming into it. +2. **Route text into semantic sections.** Assistant text, reasoning summaries, function-call arguments, tool output, and refusals each update their own section inside the panel. +3. **Use Rich `Live` when interactive.** In a real terminal we keep the panel on screen and update it in place; when the console is not interactive (tests, logging) we fall back to static updates. +4. **Leave the panel in place when finished.** When the final chunk arrives we stop updating but keep the panel visible; the subsequent `MessageEvent`/`ActionEvent` is suppressed to avoid duplicate re-rendering. ## Implementation roadmap @@ -95,11 +91,11 @@ Keeping the raw LiteLLM payload inside each `LLMStreamChunk` means we do **not** - When we enqueue the initial `MessageEvent`/`ActionEvent`, cache a lookup (e.g., `inflight_streams[(response_id, output_index)] = conversation_event_id`). - Update `LocalConversation` token callback wrapper to attach the resolved conversation event ID onto the `LLMStreamChunk` before emitting/persisting. -3. **Visualizer rewrite** - - Maintain `self._stream_views[(conversation_event_id, part_kind, item_id)] = LiveState` where `LiveState` wraps buffer, style, and a `Live` instance. - - On streaming updates: update buffer, `live.update(Text(buffer, style=...))` without printing newlines. - - On final chunk: stop `Live`, render final static text, and optionally record in conversation state for playback. 
- - Ensure replay (when visualizer processes stored events) converts stored parts into final text as well. +3. **Streaming visualizer** + - Implement `StreamingConversationVisualizer` with lightweight session tracking (keyed by response/output) that owns Rich panels for streaming sections. + - Stream updates into the same panel that will remain visible after completion; use `Live` only when running in an interactive terminal. + - Suppress duplicate rendering when the final `MessageEvent`/`ActionEvent` arrives, since the streamed panel already contains the content. + - Provide a factory helper (e.g., `create_streaming_visualizer`) for callers that want the streaming experience. 4. **Persistence / tests** - Update tests to ensure: @@ -117,5 +113,6 @@ Keeping the raw LiteLLM payload inside each `LLMStreamChunk` means we do **not** - [ ] Refactor classifier to output `LLMStreamChunk` objects with clear `part_kind`. - [ ] Track in-flight conversation events so parts know their owner. - [ ] Replace print-based visualizer streaming with `Live` blocks per section. -- [ ] Extend unit tests to cover multiple messages, reasoning segments, and tool calls. +- [ ] Extend unit tests to cover multiple messages, reasoning segments, tool calls, and the new streaming visualizer. +- [ ] Update the standalone streaming example to wire in the streaming visualizer helper. - [ ] Manually validate with long streaming example to confirm smooth in-place updates. diff --git a/openhands/sdk/__init__.py b/openhands/sdk/__init__.py index 6d0c771eab..4faf23e952 100644 --- a/openhands/sdk/__init__.py +++ b/openhands/sdk/__init__.py @@ -13,6 +13,12 @@ RemoteConversation, ) from openhands.sdk.conversation.conversation_stats import ConversationStats +from openhands.sdk.conversation.visualizer import ( + ConversationVisualizer, + StreamingConversationVisualizer, + create_default_visualizer, + create_streaming_visualizer, +) from openhands.sdk.event import Event, LLMConvertibleEvent from openhands.sdk.event.llm_convertible import MessageEvent from openhands.sdk.io import FileStore, LocalFileStore @@ -20,7 +26,7 @@ LLM, ImageContent, LLMRegistry, - LLMStreamEvent, + LLMStreamChunk, Message, RedactedThinkingBlock, RegistryEvent, @@ -60,9 +66,13 @@ __all__ = [ "LLM", "LLMRegistry", - "LLMStreamEvent", + "LLMStreamChunk", "TokenCallbackType", "ConversationStats", + "ConversationVisualizer", + "StreamingConversationVisualizer", + "create_default_visualizer", + "create_streaming_visualizer", "RegistryEvent", "Message", "TextContent", diff --git a/openhands/sdk/conversation/impl/local_conversation.py b/openhands/sdk/conversation/impl/local_conversation.py index 46366ecf57..15611c07a0 100644 --- a/openhands/sdk/conversation/impl/local_conversation.py +++ b/openhands/sdk/conversation/impl/local_conversation.py @@ -20,7 +20,7 @@ StreamingDeltaEvent, UserRejectObservation, ) -from openhands.sdk.llm import LLM, LLMStreamEvent, Message, TextContent +from openhands.sdk.llm import LLM, LLMStreamChunk, Message, TextContent from openhands.sdk.llm.llm_registry import LLMRegistry from openhands.sdk.logger import get_logger from openhands.sdk.security.confirmation_policy import ( @@ -129,15 +129,15 @@ def _composed(event): _compose_token_callbacks(token_callbacks) if token_callbacks else None ) - def _handle_stream_event(stream_event: LLMStreamEvent) -> None: + def _handle_stream_event(stream_chunk: LLMStreamChunk) -> None: try: self._on_event( - StreamingDeltaEvent(source="agent", stream_event=stream_event) + StreamingDeltaEvent(source="agent", 
stream_chunk=stream_chunk) ) except Exception: logger.exception("stream_event_processing_error", exc_info=True) if user_token_callback: - user_token_callback(stream_event) + user_token_callback(stream_chunk) self._on_token = _handle_stream_event diff --git a/openhands/sdk/conversation/visualizer.py b/openhands/sdk/conversation/visualizer.py index 75e4b5adcc..2734ea7161 100644 --- a/openhands/sdk/conversation/visualizer.py +++ b/openhands/sdk/conversation/visualizer.py @@ -1,7 +1,7 @@ import re from typing import TYPE_CHECKING, Any -from rich.console import Console +from rich.console import Console, Group from rich.live import Live from rich.panel import Panel from rich.text import Text @@ -18,11 +18,13 @@ ) from openhands.sdk.event.base import Event from openhands.sdk.event.condenser import Condensation -from openhands.sdk.llm.streaming import StreamChannel +from openhands.sdk.llm.llm import RESPONSES_COMPLETION_EVENT_TYPES +from openhands.sdk.llm.streaming import StreamPartKind if TYPE_CHECKING: from openhands.sdk.conversation.conversation_stats import ConversationStats + from openhands.sdk.llm.streaming import LLMStreamChunk # These are external inputs @@ -50,16 +52,137 @@ r"\*(.*?)\*": "italic", } -STREAM_CHANNEL_HEADERS: dict[StreamChannel, tuple[str, str]] = { - "assistant_message": ("Assistant", _ACTION_COLOR), - "reasoning_summary": ("Reasoning", _THOUGHT_COLOR), - "function_call_arguments": ("Function Arguments", _ACTION_COLOR), +_PANEL_PADDING = (1, 1) +_SECTION_CONFIG: dict[str, tuple[str, str]] = { + "reasoning": ("Reasoning", _THOUGHT_COLOR), + "assistant": ("Assistant", _ACTION_COLOR), + "function_arguments": ("Function Arguments", _ACTION_COLOR), + "tool_output": ("Tool Output", _ACTION_COLOR), "refusal": ("Refusal", _ERROR_COLOR), - "tool_call_output": ("Tool Output", _ACTION_COLOR), } +_SESSION_CONFIG: dict[str, tuple[str, str]] = { + "message": ( + f"[bold {_MESSAGE_ASSISTANT_COLOR}]Message from Agent (streaming)" # type: ignore[str-format] + f"[/bold {_MESSAGE_ASSISTANT_COLOR}]", + _MESSAGE_ASSISTANT_COLOR, + ), + "action": ( + f"[bold {_ACTION_COLOR}]Agent Action (streaming)[/bold {_ACTION_COLOR}]", + _ACTION_COLOR, + ), +} + +_SECTION_ORDER = [ + "reasoning", + "assistant", + "function_arguments", + "tool_output", + "refusal", +] -_PANEL_PADDING = (1, 1) + +class _StreamSection: + def __init__(self, header: str, style: str) -> None: + self.header = header + self.style = style + self.content: str = "" + + +class _StreamSession: + def __init__( + self, + *, + console: Console, + session_type: str, + response_id: str | None, + output_index: int | None, + use_live: bool, + ) -> None: + self._console = console + self._session_type = session_type + self._response_id = response_id + self._output_index = output_index + self._use_live = use_live + self._sections: dict[str, _StreamSection] = {} + self._order: list[str] = [] + self._live: Live | None = None + self._last_renderable: Panel | None = None + + @property + def response_id(self) -> str | None: + return self._response_id + + def append_text(self, section_key: str, text: str | None) -> None: + if not text: + return + header, style = _SECTION_CONFIG.get(section_key, (section_key.title(), "cyan")) + section = self._sections.get(section_key) + if section is None: + section = _StreamSection(header, style) + self._sections[section_key] = section + self._order.append(section_key) + self._order.sort( + key=lambda key: _SECTION_ORDER.index(key) + if key in _SECTION_ORDER + else len(_SECTION_ORDER) + ) + section.content += text + 
self._update() + + def finish(self, *, persist: bool) -> None: + renderable = self._render_panel() + if self._use_live: + if self._live is not None: + self._live.stop() + self._live = None + if persist: + self._console.print(renderable) + self._console.print() + else: + self._console.print() + else: + if persist: + self._console.print(renderable) + self._console.print() + + def _update(self) -> None: + renderable = self._render_panel() + if self._use_live: + if self._live is None: + self._live = Live( + renderable, + console=self._console, + refresh_per_second=24, + transient=True, + ) + self._live.start() + else: + self._live.update(renderable) + else: + self._last_renderable = renderable + + def _render_panel(self) -> Panel: + body_parts: list[Any] = [] + for key in self._order: + section = self._sections[key] + if not section.content: + continue + body_parts.append(Text(f"{section.header}:", style=f"bold {section.style}")) + body_parts.append(Text(section.content, style=section.style)) + if not body_parts: + body_parts.append(Text("[streaming...]", style="dim")) + + title, border_style = _SESSION_CONFIG.get( + self._session_type, ("[bold cyan]Streaming[/bold cyan]", "cyan") + ) + return Panel( + Group(*body_parts), + title=title, + border_style=border_style, + padding=_PANEL_PADDING, + expand=True, + ) class ConversationVisualizer: @@ -92,9 +215,8 @@ def __init__( base_patterns.update(highlight_regex) self._highlight_patterns = base_patterns self._conversation_stats = conversation_stats - self._stream_state: dict[ - tuple[StreamChannel, int | None, str | None], dict[str, Any] - ] = {} + self._use_live = self._console.is_terminal + self._stream_sessions: dict[tuple[str, int, str], _StreamSession] = {} def on_event(self, event: Event) -> None: """Main event handler that displays events with Rich formatting.""" @@ -102,7 +224,7 @@ def on_event(self, event: Event) -> None: self._render_streaming_event(event) return - panel = self._create_event_panel(event) + panel = self._create_event_panel(event) # pyright: ignore[reportAttributeAccessIssue] if panel: self._console.print(panel) self._console.print() # Add spacing between events @@ -130,52 +252,138 @@ def _apply_highlighting(self, text: Text) -> Text: return highlighted def _render_streaming_event(self, event: StreamingDeltaEvent) -> None: - stream = event.stream_event - channel = stream.channel + self._handle_stream_chunk(event.stream_chunk, persist_on_finish=False) - if channel == "status": + def _handle_stream_chunk( + self, stream_chunk: "LLMStreamChunk", *, persist_on_finish: bool + ) -> None: + if stream_chunk.part_kind == "status": + if ( + stream_chunk.type in RESPONSES_COMPLETION_EVENT_TYPES + or stream_chunk.is_final + ): + self._finish_stream_sessions( + stream_chunk.response_id, persist=persist_on_finish + ) return - header, color = STREAM_CHANNEL_HEADERS.get(channel, ("Streaming", "cyan")) - key = (channel, stream.output_index, stream.item_id) - state = self._stream_state.setdefault( - key, - { - "header_printed": False, - "buffer": "", - "header": header, - "color": color, - "live": None, - }, - ) - - if not state["header_printed"]: - self._console.print(Text(f"{header}:", style=f"bold {color}")) - state["header_printed"] = True - - delta_text = stream.text or stream.arguments - if delta_text: - state["buffer"] += delta_text + session_type = self._session_type_for_part(stream_chunk.part_kind) + if session_type is None: + return - live: Live | None = state.get("live") - if live is None: - live = Live( - Text(state["buffer"], 
style=str(color)), + key = self._make_stream_session_key(stream_chunk, session_type) + session = self._stream_sessions.get(key) + if session is None: + session = _StreamSession( console=self._console, - refresh_per_second=24, - transient=False, + session_type=session_type, + response_id=stream_chunk.response_id, + output_index=stream_chunk.output_index, + use_live=self._use_live, ) - live.start() - state["live"] = live + self._stream_sessions[key] = session + + section_key = self._section_key_for_part(stream_chunk.part_kind) + session.append_text( + section_key, stream_chunk.text_delta or stream_chunk.arguments_delta + ) + + if stream_chunk.is_final: + if persist_on_finish: + self._finish_session_by_key(key, persist=True) + else: + if not self._use_live: + self._finish_session_by_key(key, persist=False) + elif stream_chunk.response_id is None: + self._finish_session_by_key(key, persist=False) + + def _session_type_for_part(self, part_kind: StreamPartKind) -> str | None: + if part_kind in {"assistant_message", "reasoning_summary", "refusal"}: + return "message" + if part_kind in {"function_call_arguments", "tool_call_output"}: + return "action" + return None + + def _section_key_for_part(self, part_kind: StreamPartKind) -> str: + if part_kind == "assistant_message": + return "assistant" + if part_kind == "reasoning_summary": + return "reasoning" + if part_kind == "function_call_arguments": + return "function_arguments" + if part_kind == "tool_call_output": + return "tool_output" + if part_kind == "refusal": + return "refusal" + return "assistant" + + def _make_stream_session_key( + self, chunk: "LLMStreamChunk", session_type: str + ) -> tuple[str, int, str]: + response_key = ( + chunk.response_id + or f"unknown::{chunk.item_id or chunk.output_index or chunk.type}" + ) + output_index = chunk.output_index if chunk.output_index is not None else 0 + return (response_key, output_index, session_type) + + def _finish_stream_sessions( + self, response_id: str | None, *, persist: bool + ) -> None: + if not self._stream_sessions: + return + if response_id is None: + keys = list(self._stream_sessions.keys()) else: - live.update(Text(state["buffer"], style=str(color))) + keys = [ + key + for key, session in self._stream_sessions.items() + if session.response_id == response_id + ] + if not keys: + keys = list(self._stream_sessions.keys()) + for key in keys: + self._finish_session_by_key(key, persist=persist) + + def _finish_session_by_key( + self, key: tuple[str, int, str], *, persist: bool + ) -> None: + session = self._stream_sessions.pop(key, None) + if session is not None: + session.finish(persist=persist) + + +class StreamingConversationVisualizer(ConversationVisualizer): + """Streaming-focused visualizer that renders deltas in-place.""" + + def __init__( + self, + highlight_regex: dict[str, str] | None = None, + skip_user_messages: bool = False, + conversation_stats: "ConversationStats | None" = None, + ) -> None: + super().__init__( + highlight_regex=highlight_regex, + skip_user_messages=skip_user_messages, + conversation_stats=conversation_stats, + ) - if stream.is_final: - live = state.get("live") - if live is not None: - live.stop() - self._console.print() - self._stream_state.pop(key, None) + def on_event(self, event: Event) -> None: + if isinstance(event, StreamingDeltaEvent): + self._handle_stream_chunk(event.stream_chunk, persist_on_finish=True) + return + + if self._should_skip_event(event): + return + + super().on_event(event) + + def _should_skip_event(self, event: Event) -> bool: + if 
isinstance(event, MessageEvent) and event.source == "agent": + return True + if isinstance(event, ActionEvent) and event.source == "agent": + return True + return False def _create_event_panel(self, event: Event) -> Panel | None: """Create a Rich Panel for the event with appropriate styling.""" @@ -233,14 +441,6 @@ def _create_event_panel(self, event: Event) -> Panel | None: padding=_PANEL_PADDING, expand=True, ) - elif isinstance(event, StreamingDeltaEvent): - return Panel( - event.visualize, - title="[bold cyan]Streaming Delta[/bold cyan]", - border_style="cyan", - padding=_PANEL_PADDING, - expand=True, - ) elif isinstance(event, MessageEvent): if ( self._skip_user_messages @@ -376,3 +576,19 @@ def create_default_visualizer( conversation_stats=conversation_stats, **kwargs, ) + + +def create_streaming_visualizer( + highlight_regex: dict[str, str] | None = None, + conversation_stats: "ConversationStats | None" = None, + **kwargs, +) -> StreamingConversationVisualizer: + """Create a streaming-aware visualizer instance.""" + + return StreamingConversationVisualizer( + highlight_regex=DEFAULT_HIGHLIGHT_REGEX + if highlight_regex is None + else highlight_regex, + conversation_stats=conversation_stats, + **kwargs, + ) diff --git a/openhands/sdk/event/streaming.py b/openhands/sdk/event/streaming.py index 1c6bd9233d..f90534985b 100644 --- a/openhands/sdk/event/streaming.py +++ b/openhands/sdk/event/streaming.py @@ -5,28 +5,28 @@ from openhands.sdk.event.base import Event from openhands.sdk.event.types import SourceType -from openhands.sdk.llm.streaming import LLMStreamEvent, StreamChannel +from openhands.sdk.llm.streaming import LLMStreamChunk, StreamPartKind class StreamingDeltaEvent(Event): """Event emitted for each incremental LLM streaming delta.""" source: SourceType = Field(default="agent") - stream_event: LLMStreamEvent + stream_chunk: LLMStreamChunk @property - def channel(self) -> StreamChannel: - return self.stream_event.channel + def part_kind(self) -> StreamPartKind: + return self.stream_chunk.part_kind @property def visualize(self) -> Text: content = Text() - content.append(f"Channel: {self.stream_event.channel}\n", style="bold") + content.append(f"Part: {self.stream_chunk.part_kind}\n", style="bold") - if self.stream_event.text: - content.append(self.stream_event.text) - elif self.stream_event.arguments: - content.append(self.stream_event.arguments) + if self.stream_chunk.text_delta: + content.append(self.stream_chunk.text_delta) + elif self.stream_chunk.arguments_delta: + content.append(self.stream_chunk.arguments_delta) else: content.append("[no streaming content]") diff --git a/openhands/sdk/llm/__init__.py b/openhands/sdk/llm/__init__.py index f5d47aefc4..63d8d437e6 100644 --- a/openhands/sdk/llm/__init__.py +++ b/openhands/sdk/llm/__init__.py @@ -12,7 +12,7 @@ content_to_str, ) from openhands.sdk.llm.router import RouterLLM -from openhands.sdk.llm.streaming import LLMStreamEvent, TokenCallbackType +from openhands.sdk.llm.streaming import LLMStreamChunk, TokenCallbackType from openhands.sdk.llm.utils.metrics import Metrics, MetricsSnapshot from openhands.sdk.llm.utils.unverified_models import ( UNVERIFIED_MODELS_EXCLUDING_BEDROCK, @@ -35,7 +35,7 @@ "RedactedThinkingBlock", "ReasoningItemModel", "content_to_str", - "LLMStreamEvent", + "LLMStreamChunk", "TokenCallbackType", "Metrics", "MetricsSnapshot", diff --git a/openhands/sdk/llm/llm.py b/openhands/sdk/llm/llm.py index 95093927fe..721ec87f24 100644 --- a/openhands/sdk/llm/llm.py +++ b/openhands/sdk/llm/llm.py @@ -72,8 +72,8 @@ 
) from openhands.sdk.llm.mixins.non_native_fc import NonNativeToolCallingMixin from openhands.sdk.llm.streaming import ( - LLMStreamEvent, - StreamChannel, + LLMStreamChunk, + StreamPartKind, TokenCallbackType, ) from openhands.sdk.llm.utils.metrics import Metrics, MetricsSnapshot @@ -702,7 +702,7 @@ def _consume_responses_stream( def _stream_event_from_responses_chunk( self, chunk: ResponsesAPIStreamingResponse | Any - ) -> LLMStreamEvent | None: + ) -> LLMStreamChunk | None: event_type_obj = self._get_chunk_attr(chunk, "type") if event_type_obj is None: return None @@ -712,42 +712,43 @@ def _stream_event_from_responses_chunk( else: event_value = str(event_type_obj) - event = LLMStreamEvent( + stream_chunk = LLMStreamChunk( type=event_value, output_index=self._get_chunk_attr(chunk, "output_index"), content_index=self._get_chunk_attr(chunk, "content_index"), item_id=self._get_chunk_attr(chunk, "item_id"), - raw=chunk, + raw_chunk=chunk, + response_id=self._get_chunk_response_id(chunk), ) if event_value in RESPONSES_FINAL_EVENT_TYPES: - event.is_final = True + stream_chunk.is_final = True text_value = self._get_chunk_text(chunk) arguments_value = self._get_chunk_arguments(chunk) - channel: StreamChannel = "unknown" + part_kind: StreamPartKind = "unknown" if event_value in { ResponsesAPIStreamEvents.OUTPUT_TEXT_DELTA.value, ResponsesAPIStreamEvents.OUTPUT_TEXT_DONE.value, }: - channel = "assistant_message" + part_kind = "assistant_message" elif event_value in { ResponsesAPIStreamEvents.REASONING_SUMMARY_TEXT_DELTA.value, }: - channel = "reasoning_summary" + part_kind = "reasoning_summary" elif event_value in { ResponsesAPIStreamEvents.FUNCTION_CALL_ARGUMENTS_DELTA.value, ResponsesAPIStreamEvents.FUNCTION_CALL_ARGUMENTS_DONE.value, ResponsesAPIStreamEvents.MCP_CALL_ARGUMENTS_DELTA.value, ResponsesAPIStreamEvents.MCP_CALL_ARGUMENTS_DONE.value, }: - channel = "function_call_arguments" + part_kind = "function_call_arguments" elif event_value in { ResponsesAPIStreamEvents.REFUSAL_DELTA.value, ResponsesAPIStreamEvents.REFUSAL_DONE.value, }: - channel = "refusal" + part_kind = "refusal" elif event_value in { ResponsesAPIStreamEvents.RESPONSE_CREATED.value, ResponsesAPIStreamEvents.RESPONSE_IN_PROGRESS.value, @@ -771,18 +772,27 @@ def _stream_event_from_responses_chunk( ResponsesAPIStreamEvents.ERROR.value, "response.reasoning_summary_part.added", }: - channel = "status" + part_kind = "status" - event.channel = channel + stream_chunk.part_kind = part_kind - if channel in {"assistant_message", "reasoning_summary", "refusal"}: + if part_kind in {"assistant_message", "reasoning_summary", "refusal"}: if text_value: - event.text = text_value - if channel == "function_call_arguments": - if arguments_value: - event.arguments = arguments_value - - return event + stream_chunk.text_delta = text_value + if part_kind == "function_call_arguments" and arguments_value: + stream_chunk.arguments_delta = arguments_value + + return stream_chunk + + def _get_chunk_response_id(self, chunk: Any) -> str | None: + response = self._get_chunk_attr(chunk, "response") + response_id = getattr(response, "id", None) if response is not None else None + if isinstance(response_id, str) and response_id: + return response_id + response_id = self._get_chunk_attr(chunk, "response_id") + if isinstance(response_id, str) and response_id: + return response_id + return None @staticmethod def _get_chunk_attr(chunk: Any, attr: str, default: Any = None) -> Any: diff --git a/openhands/sdk/llm/streaming.py b/openhands/sdk/llm/streaming.py index 
6e1b1eb9cd..5a5dcdac0f 100644 --- a/openhands/sdk/llm/streaming.py +++ b/openhands/sdk/llm/streaming.py @@ -5,7 +5,7 @@ from typing import Any, Literal -StreamChannel = Literal[ +StreamPartKind = Literal[ "assistant_message", "reasoning_summary", "function_call_arguments", @@ -18,18 +18,19 @@ @dataclass(slots=True) -class LLMStreamEvent: +class LLMStreamChunk: """Represents a streaming delta emitted by an LLM provider.""" type: str - channel: StreamChannel = "unknown" - text: str | None = None - arguments: str | None = None + part_kind: StreamPartKind = "unknown" + text_delta: str | None = None + arguments_delta: str | None = None output_index: int | None = None content_index: int | None = None item_id: str | None = None + response_id: str | None = None is_final: bool = False - raw: Any | None = None + raw_chunk: Any | None = None -TokenCallbackType = Callable[[LLMStreamEvent], None] +TokenCallbackType = Callable[[LLMStreamChunk], None] diff --git a/tests/sdk/conversation/test_streaming_events.py b/tests/sdk/conversation/test_streaming_events.py index 3bab2551a4..b71d6dcf1c 100644 --- a/tests/sdk/conversation/test_streaming_events.py +++ b/tests/sdk/conversation/test_streaming_events.py @@ -6,7 +6,7 @@ from openhands.sdk import Conversation from openhands.sdk.agent import Agent from openhands.sdk.event import MessageEvent, StreamingDeltaEvent -from openhands.sdk.llm import LLM, LLMResponse, LLMStreamEvent +from openhands.sdk.llm import LLM, LLMResponse, LLMStreamChunk from openhands.sdk.llm.message import Message, TextContent from openhands.sdk.llm.utils.metrics import MetricsSnapshot @@ -15,37 +15,41 @@ class FakeStreamingLLM(LLM): def __init__(self) -> None: super().__init__(model="test-stream", service_id="test-stream") self._stream_events = [ - LLMStreamEvent( + LLMStreamChunk( type="response.output_text.delta", - channel="assistant_message", - text="Hello", + part_kind="assistant_message", + text_delta="Hello", output_index=0, content_index=0, item_id="item-1", + response_id="resp-test", ), - LLMStreamEvent( + LLMStreamChunk( type="response.output_text.delta", - channel="assistant_message", - text=" world", + part_kind="assistant_message", + text_delta=" world", output_index=0, content_index=0, item_id="item-1", + response_id="resp-test", ), - LLMStreamEvent( + LLMStreamChunk( type="response.output_text.done", - channel="assistant_message", + part_kind="assistant_message", is_final=True, output_index=0, content_index=0, item_id="item-1", + response_id="resp-test", ), - LLMStreamEvent( + LLMStreamChunk( type="response.completed", - channel="status", + part_kind="status", is_final=True, output_index=0, content_index=0, item_id="item-1", + response_id="resp-test", ), ] @@ -87,10 +91,10 @@ def test_streaming_events_persist_and_dispatch(tmp_path): llm = FakeStreamingLLM() agent = Agent(llm=llm, tools=[]) - tokens: list[LLMStreamEvent] = [] + tokens: list[LLMStreamChunk] = [] callback_events = [] - def token_cb(event: LLMStreamEvent) -> None: + def token_cb(event: LLMStreamChunk) -> None: tokens.append(event) def recorder(event) -> None: @@ -114,17 +118,17 @@ def recorder(event) -> None: ] assert len(stream_events) == len(llm._stream_events) - assert [evt.stream_event.type for evt in stream_events] == [ + assert [evt.stream_chunk.type for evt in stream_events] == [ evt.type for evt in llm._stream_events ] - assert [evt.stream_event.channel for evt in stream_events[:3]] == [ + assert [evt.stream_chunk.part_kind for evt in stream_events[:3]] == [ "assistant_message", "assistant_message", 
"assistant_message", ] - assert stream_events[-2].stream_event.is_final is True - assert stream_events[-2].stream_event.channel == "assistant_message" - assert stream_events[-1].stream_event.channel == "status" + assert stream_events[-2].stream_chunk.is_final is True + assert stream_events[-2].stream_chunk.part_kind == "assistant_message" + assert stream_events[-1].stream_chunk.part_kind == "status" assert [evt.type for evt in tokens] == [evt.type for evt in llm._stream_events] @@ -144,39 +148,43 @@ def recorder(event) -> None: def test_visualizer_streaming_renders_incremental_text(): - from openhands.sdk.conversation.visualizer import ConversationVisualizer + from openhands.sdk.conversation.visualizer import create_streaming_visualizer - viz = ConversationVisualizer() + viz = create_streaming_visualizer() viz._console = Console(record=True) + viz._use_live = viz._console.is_terminal - reasoning_start = LLMStreamEvent( + reasoning_start = LLMStreamChunk( type="response.reasoning_summary_text.delta", - channel="reasoning_summary", - text="Think", + part_kind="reasoning_summary", + text_delta="Think", output_index=0, content_index=0, item_id="reasoning-1", + response_id="resp-test", ) - reasoning_continue = LLMStreamEvent( + reasoning_continue = LLMStreamChunk( type="response.reasoning_summary_text.delta", - channel="reasoning_summary", - text=" deeply", + part_kind="reasoning_summary", + text_delta=" deeply", output_index=0, content_index=0, item_id="reasoning-1", + response_id="resp-test", ) - reasoning_end = LLMStreamEvent( + reasoning_end = LLMStreamChunk( type="response.reasoning_summary_text.delta", - channel="reasoning_summary", + part_kind="reasoning_summary", is_final=True, output_index=0, content_index=0, item_id="reasoning-1", + response_id="resp-test", ) - viz.on_event(StreamingDeltaEvent(source="agent", stream_event=reasoning_start)) - viz.on_event(StreamingDeltaEvent(source="agent", stream_event=reasoning_continue)) - viz.on_event(StreamingDeltaEvent(source="agent", stream_event=reasoning_end)) + viz.on_event(StreamingDeltaEvent(source="agent", stream_chunk=reasoning_start)) + viz.on_event(StreamingDeltaEvent(source="agent", stream_chunk=reasoning_continue)) + viz.on_event(StreamingDeltaEvent(source="agent", stream_chunk=reasoning_end)) output = viz._console.export_text() assert "Reasoning:" in output diff --git a/tests/sdk/llm/test_responses_parsing_and_kwargs.py b/tests/sdk/llm/test_responses_parsing_and_kwargs.py index ffda3c95e9..f6f6be5ccf 100644 --- a/tests/sdk/llm/test_responses_parsing_and_kwargs.py +++ b/tests/sdk/llm/test_responses_parsing_and_kwargs.py @@ -182,7 +182,7 @@ def on_token(event): ResponsesAPIStreamEvents.OUTPUT_TEXT_DELTA.value, ResponsesAPIStreamEvents.RESPONSE_COMPLETED.value, ] - assert captured[0].text == "Streaming " + assert captured[0].text_delta == "Streaming " assert captured[1].is_final is True assert result.message.role == "assistant" assert "Streaming hello" in "".join( From a65dbda064a7a82fc3d12fbceeaf649192ca0b00 Mon Sep 17 00:00:00 2001 From: enyst Date: Thu, 20 Nov 2025 16:52:27 +0000 Subject: [PATCH 04/15] Simplify streaming visualizer and always-persist streaming panels Co-authored-by: openhands --- .../sdk/conversation/streaming_visualizer.py | 37 +--- openhands-sdk/openhands/sdk/llm/streaming.py | 1 - .../test_conversation_streaming_visualizer.py | 191 ++++++++++++++++++ .../sdk/conversation/test_streaming_events.py | 75 +------ 4 files changed, 207 insertions(+), 97 deletions(-) create mode 100644 
tests/sdk/conversation/local/test_conversation_streaming_visualizer.py diff --git a/openhands-sdk/openhands/sdk/conversation/streaming_visualizer.py b/openhands-sdk/openhands/sdk/conversation/streaming_visualizer.py index 9c08d01113..dfc7e2b702 100644 --- a/openhands-sdk/openhands/sdk/conversation/streaming_visualizer.py +++ b/openhands-sdk/openhands/sdk/conversation/streaming_visualizer.py @@ -6,7 +6,9 @@ from rich.panel import Panel from rich.text import Text -from openhands.sdk.conversation.visualizer import ConversationVisualizer +from openhands.sdk.conversation.visualizer.default import ( + DefaultConversationVisualizer, +) from openhands.sdk.event import ActionEvent, MessageEvent, StreamingDeltaEvent from openhands.sdk.event.base import Event from openhands.sdk.llm.llm import RESPONSES_COMPLETION_EVENT_TYPES @@ -14,7 +16,6 @@ if TYPE_CHECKING: - from openhands.sdk.conversation.conversation_stats import ConversationStats from openhands.sdk.llm.streaming import LLMStreamChunk @@ -44,7 +45,6 @@ "reasoning": ("Reasoning", _THOUGHT_COLOR), "assistant": ("Assistant", _ACTION_COLOR), "function_arguments": ("Function Arguments", _ACTION_COLOR), - "tool_output": ("Tool Output", _ACTION_COLOR), "refusal": ("Refusal", _ERROR_COLOR), } @@ -64,7 +64,6 @@ "reasoning", "assistant", "function_arguments", - "tool_output", "refusal", ] @@ -172,7 +171,7 @@ def _render_panel(self) -> Panel: ) -class StreamingConversationVisualizer(ConversationVisualizer): +class StreamingConversationVisualizer(DefaultConversationVisualizer): """Streaming-focused visualizer that renders deltas in-place.""" requires_streaming: bool = True @@ -181,19 +180,17 @@ def __init__( self, highlight_regex: dict[str, str] | None = None, skip_user_messages: bool = False, - conversation_stats: "ConversationStats | None" = None, ) -> None: super().__init__( highlight_regex=highlight_regex, skip_user_messages=skip_user_messages, - conversation_stats=conversation_stats, ) self._use_live: bool = self._console.is_terminal self._stream_sessions: dict[tuple[str, int, str], _StreamSession] = {} def on_event(self, event: Event) -> None: if isinstance(event, StreamingDeltaEvent): - self._handle_stream_chunk(event.stream_chunk, persist_on_finish=True) + self._handle_stream_chunk(event.stream_chunk) return if self._should_skip_event(event): @@ -201,17 +198,13 @@ def on_event(self, event: Event) -> None: super().on_event(event) - def _handle_stream_chunk( - self, stream_chunk: "LLMStreamChunk", *, persist_on_finish: bool - ) -> None: + def _handle_stream_chunk(self, stream_chunk: "LLMStreamChunk") -> None: if stream_chunk.part_kind == "status": if ( stream_chunk.type in RESPONSES_COMPLETION_EVENT_TYPES or stream_chunk.is_final ): - self._finish_stream_sessions( - stream_chunk.response_id, persist=persist_on_finish - ) + self._finish_stream_sessions(stream_chunk.response_id, persist=True) return session_type = self._session_type_for_part(stream_chunk.part_kind) @@ -236,18 +229,12 @@ def _handle_stream_chunk( ) if stream_chunk.is_final: - if persist_on_finish: - self._finish_session_by_key(key, persist=True) - else: - if not self._use_live: - self._finish_session_by_key(key, persist=False) - elif stream_chunk.response_id is None: - self._finish_session_by_key(key, persist=False) + self._finish_session_by_key(key, persist=True) def _session_type_for_part(self, part_kind: StreamPartKind) -> str | None: if part_kind in {"assistant_message", "reasoning_summary", "refusal"}: return "message" - if part_kind in {"function_call_arguments", 
"tool_call_output"}: + if part_kind in {"function_call_arguments"}: return "action" return None @@ -258,8 +245,6 @@ def _section_key_for_part(self, part_kind: StreamPartKind) -> str: return "reasoning" if part_kind == "function_call_arguments": return "function_arguments" - if part_kind == "tool_call_output": - return "tool_output" if part_kind == "refusal": return "refusal" return "assistant" @@ -313,7 +298,7 @@ def _create_event_panel(self, event: Event) -> Panel | None: padding=_PANEL_PADDING, expand=True, ) - return super()._create_event_panel(event) + return None def _should_skip_event(self, event: Event) -> bool: if isinstance(event, MessageEvent) and event.source == "agent": @@ -325,7 +310,6 @@ def _should_skip_event(self, event: Event) -> bool: def create_streaming_visualizer( highlight_regex: dict[str, str] | None = None, - conversation_stats: "ConversationStats | None" = None, **kwargs, ) -> StreamingConversationVisualizer: """Create a streaming-aware visualizer instance.""" @@ -334,6 +318,5 @@ def create_streaming_visualizer( highlight_regex=DEFAULT_HIGHLIGHT_REGEX if highlight_regex is None else highlight_regex, - conversation_stats=conversation_stats, **kwargs, ) diff --git a/openhands-sdk/openhands/sdk/llm/streaming.py b/openhands-sdk/openhands/sdk/llm/streaming.py index 5a5dcdac0f..9daf3736a5 100644 --- a/openhands-sdk/openhands/sdk/llm/streaming.py +++ b/openhands-sdk/openhands/sdk/llm/streaming.py @@ -9,7 +9,6 @@ "assistant_message", "reasoning_summary", "function_call_arguments", - "tool_call_output", "refusal", "system", "status", diff --git a/tests/sdk/conversation/local/test_conversation_streaming_visualizer.py b/tests/sdk/conversation/local/test_conversation_streaming_visualizer.py new file mode 100644 index 0000000000..80ee3a63ae --- /dev/null +++ b/tests/sdk/conversation/local/test_conversation_streaming_visualizer.py @@ -0,0 +1,191 @@ +from __future__ import annotations + +from litellm.responses.main import mock_responses_api_response +from rich.console import Console + +from openhands.sdk import Conversation +from openhands.sdk.agent import Agent +from openhands.sdk.conversation.streaming_visualizer import ( + StreamingConversationVisualizer, +) +from openhands.sdk.event import MessageEvent, StreamingDeltaEvent +from openhands.sdk.llm import LLM, LLMResponse, LLMStreamChunk +from openhands.sdk.llm.message import Message, TextContent +from openhands.sdk.llm.utils.metrics import MetricsSnapshot + + +class FakeStreamingLLM(LLM): + def __init__(self) -> None: + super().__init__(model="test-stream", usage_id="test-stream") + self._stream_events: list[LLMStreamChunk] = [ + LLMStreamChunk( + type="response.output_text.delta", + part_kind="assistant_message", + text_delta="Hello", + output_index=0, + content_index=0, + item_id="item-1", + response_id="resp-test", + ), + LLMStreamChunk( + type="response.output_text.delta", + part_kind="assistant_message", + text_delta=" world", + output_index=0, + content_index=0, + item_id="item-1", + response_id="resp-test", + ), + LLMStreamChunk( + type="response.output_text.done", + part_kind="assistant_message", + is_final=True, + output_index=0, + content_index=0, + item_id="item-1", + response_id="resp-test", + ), + LLMStreamChunk( + type="response.completed", + part_kind="status", + is_final=True, + output_index=0, + content_index=0, + item_id="item-1", + response_id="resp-test", + ), + ] + + def uses_responses_api(self) -> bool: # pragma: no cover - simple override + return True + + def responses( + self, + messages, + tools=None, 
+ include=None, + store=None, + _return_metrics=False, + add_security_risk_prediction=False, + on_token=None, + **kwargs, + ): + if on_token: + for event in self._stream_events: + on_token(event) + + message = Message( + role="assistant", + content=[TextContent(text="Hello world")], + ) + snapshot = MetricsSnapshot( + model_name=self.metrics.model_name, + accumulated_cost=self.metrics.accumulated_cost, + max_budget_per_task=self.metrics.max_budget_per_task, + accumulated_token_usage=self.metrics.accumulated_token_usage, + ) + raw_response = mock_responses_api_response("Hello world") + if self._telemetry: + self._telemetry.on_response(raw_response) + return LLMResponse(message=message, metrics=snapshot, raw_response=raw_response) + + +def test_streaming_events_persist_and_dispatch(tmp_path): + llm = FakeStreamingLLM() + agent = Agent(llm=llm, tools=[]) + + tokens: list[LLMStreamChunk] = [] + callback_events = [] + + def token_cb(event: LLMStreamChunk) -> None: + tokens.append(event) + + def recorder(event) -> None: + callback_events.append(event) + + conversation = Conversation( + agent=agent, + workspace=str(tmp_path), + callbacks=[recorder], + token_callbacks=[token_cb], + ) + + conversation.send_message("Say hello") + conversation.run() + + stream_events = [ + event + for event in conversation.state.events + if isinstance(event, StreamingDeltaEvent) + ] + + assert len(stream_events) == len(llm._stream_events) + assert [evt.stream_chunk.type for evt in stream_events] == [ + evt.type for evt in llm._stream_events + ] + assert [evt.stream_chunk.part_kind for evt in stream_events[:3]] == [ + "assistant_message", + "assistant_message", + "assistant_message", + ] + assert stream_events[-2].stream_chunk.is_final is True + assert stream_events[-2].stream_chunk.part_kind == "assistant_message" + assert stream_events[-1].stream_chunk.part_kind == "status" + + assert [evt.type for evt in tokens] == [evt.type for evt in llm._stream_events] + + stream_indices = [ + idx + for idx, event in enumerate(callback_events) + if isinstance(event, StreamingDeltaEvent) + ] + final_message_index = next( + idx + for idx, event in enumerate(callback_events) + if isinstance(event, MessageEvent) and event.source == "agent" + ) + + assert stream_indices # streaming events received via callbacks + assert all(idx < final_message_index for idx in stream_indices) + + +def test_visualizer_streaming_renders_incremental_text(): + viz = StreamingConversationVisualizer() + viz._console = Console(record=True) + viz._use_live = viz._console.is_terminal + + reasoning_start = LLMStreamChunk( + type="response.reasoning_summary_text.delta", + part_kind="reasoning_summary", + text_delta="Think", + output_index=0, + content_index=0, + item_id="reasoning-1", + response_id="resp-test", + ) + reasoning_continue = LLMStreamChunk( + type="response.reasoning_summary_text.delta", + part_kind="reasoning_summary", + text_delta=" deeply", + output_index=0, + content_index=0, + item_id="reasoning-1", + response_id="resp-test", + ) + reasoning_end = LLMStreamChunk( + type="response.reasoning_summary_text.delta", + part_kind="reasoning_summary", + is_final=True, + output_index=0, + content_index=0, + item_id="reasoning-1", + response_id="resp-test", + ) + + viz.on_event(StreamingDeltaEvent(source="agent", stream_chunk=reasoning_start)) + viz.on_event(StreamingDeltaEvent(source="agent", stream_chunk=reasoning_continue)) + viz.on_event(StreamingDeltaEvent(source="agent", stream_chunk=reasoning_end)) + + output = viz._console.export_text() + assert 
"Reasoning:" in output + assert "Think deeply" in output diff --git a/tests/sdk/conversation/test_streaming_events.py b/tests/sdk/conversation/test_streaming_events.py index e1a26962d1..81f6a97cf5 100644 --- a/tests/sdk/conversation/test_streaming_events.py +++ b/tests/sdk/conversation/test_streaming_events.py @@ -3,9 +3,10 @@ from litellm.responses.main import mock_responses_api_response from rich.console import Console -from openhands.sdk import Conversation -from openhands.sdk.agent import Agent -from openhands.sdk.event import MessageEvent, StreamingDeltaEvent +from openhands.sdk.conversation.streaming_visualizer import ( + StreamingConversationVisualizer, +) +from openhands.sdk.event import StreamingDeltaEvent from openhands.sdk.llm import LLM, LLMResponse, LLMStreamChunk from openhands.sdk.llm.message import Message, TextContent from openhands.sdk.llm.utils.metrics import MetricsSnapshot @@ -13,7 +14,7 @@ class FakeStreamingLLM(LLM): def __init__(self) -> None: - super().__init__(model="test-stream", service_id="test-stream") + super().__init__(model="test-stream", usage_id="test-stream") self._stream_events: list[LLMStreamChunk] = [ LLMStreamChunk( type="response.output_text.delta", @@ -87,72 +88,8 @@ def responses( return LLMResponse(message=message, metrics=snapshot, raw_response=raw_response) -def test_streaming_events_persist_and_dispatch(tmp_path): - llm = FakeStreamingLLM() - agent = Agent(llm=llm, tools=[]) - - tokens: list[LLMStreamChunk] = [] - callback_events = [] - - def token_cb(event: LLMStreamChunk) -> None: - tokens.append(event) - - def recorder(event) -> None: - callback_events.append(event) - - conversation = Conversation( - agent=agent, - workspace=str(tmp_path), - callbacks=[recorder], - token_callbacks=[token_cb], - visualize=False, - ) - - conversation.send_message("Say hello") - conversation.run() - - stream_events = [ - event - for event in conversation.state.events - if isinstance(event, StreamingDeltaEvent) - ] - - assert len(stream_events) == len(llm._stream_events) - assert [evt.stream_chunk.type for evt in stream_events] == [ - evt.type for evt in llm._stream_events - ] - assert [evt.stream_chunk.part_kind for evt in stream_events[:3]] == [ - "assistant_message", - "assistant_message", - "assistant_message", - ] - assert stream_events[-2].stream_chunk.is_final is True - assert stream_events[-2].stream_chunk.part_kind == "assistant_message" - assert stream_events[-1].stream_chunk.part_kind == "status" - - assert [evt.type for evt in tokens] == [evt.type for evt in llm._stream_events] - - stream_indices = [ - idx - for idx, event in enumerate(callback_events) - if isinstance(event, StreamingDeltaEvent) - ] - final_message_index = next( - idx - for idx, event in enumerate(callback_events) - if isinstance(event, MessageEvent) and event.source == "agent" - ) - - assert stream_indices # streaming events received via callbacks - assert all(idx < final_message_index for idx in stream_indices) - - def test_visualizer_streaming_renders_incremental_text(): - from openhands.sdk.conversation.streaming_visualizer import ( - create_streaming_visualizer, - ) - - viz = create_streaming_visualizer() + viz = StreamingConversationVisualizer() viz._console = Console(record=True) viz._use_live = viz._console.is_terminal From dbbd0cf88df00410e7436db816bf123ce356a904 Mon Sep 17 00:00:00 2001 From: openhands Date: Tue, 25 Nov 2025 23:30:36 +0000 Subject: [PATCH 05/15] Fix merge conflicts and type errors after merging main - Add on_token parameter to all agent step() method 
signatures - Import ConversationTokenCallbackType where needed - Fix LLM router to pass on_token parameter to underlying LLM - Fix example 24_responses_streaming.py (service_id -> usage_id, visualize -> visualizer) - All pre-commit checks now passing Co-authored-by: openhands --- examples/01_standalone_sdk/24_responses_streaming.py | 5 +++-- openhands-sdk/openhands/sdk/llm/router/base.py | 3 +++ tests/cross/test_registry_directories.py | 10 ++++++++-- .../local/test_conversation_default_callback.py | 10 ++++++++-- tests/sdk/conversation/local/test_conversation_id.py | 10 ++++++++-- .../local/test_conversation_send_message.py | 10 ++++++++-- .../test_run_exception_includes_conversation_id.py | 11 ++++++++++- .../conversation/local/test_state_serialization.py | 11 ++++++++++- 8 files changed, 58 insertions(+), 12 deletions(-) diff --git a/examples/01_standalone_sdk/24_responses_streaming.py b/examples/01_standalone_sdk/24_responses_streaming.py index d787e600f8..da4430676d 100644 --- a/examples/01_standalone_sdk/24_responses_streaming.py +++ b/examples/01_standalone_sdk/24_responses_streaming.py @@ -22,6 +22,7 @@ get_logger, ) from openhands.sdk.conversation.streaming_visualizer import create_streaming_visualizer +from openhands.sdk.conversation.visualizer import DefaultConversationVisualizer from openhands.sdk.llm import LLM from openhands.tools.preset.default import get_default_agent @@ -60,7 +61,7 @@ def main() -> None: model=model, api_key=SecretStr(api_key), base_url=base_url, - service_id="stream-demo", + usage_id="stream-demo", ) agent = get_default_agent(llm=llm, cli_mode=True) @@ -90,7 +91,7 @@ def on_token(event: LLMStreamChunk) -> None: workspace=os.getcwd(), token_callbacks=[on_token], callbacks=callbacks or None, - visualize=False, + visualizer=None if callbacks else DefaultConversationVisualizer, ) story_prompt = ( diff --git a/openhands-sdk/openhands/sdk/llm/router/base.py b/openhands-sdk/openhands/sdk/llm/router/base.py index cd908255e6..20a680c259 100644 --- a/openhands-sdk/openhands/sdk/llm/router/base.py +++ b/openhands-sdk/openhands/sdk/llm/router/base.py @@ -10,6 +10,7 @@ from openhands.sdk.llm.llm import LLM from openhands.sdk.llm.llm_response import LLMResponse from openhands.sdk.llm.message import Message +from openhands.sdk.llm.streaming import TokenCallbackType from openhands.sdk.logger import get_logger from openhands.sdk.tool.tool import ToolDefinition @@ -52,6 +53,7 @@ def completion( tools: Sequence[ToolDefinition] | None = None, return_metrics: bool = False, add_security_risk_prediction: bool = False, + on_token: TokenCallbackType | None = None, **kwargs, ) -> LLMResponse: """ @@ -70,6 +72,7 @@ def completion( tools=tools, _return_metrics=return_metrics, add_security_risk_prediction=add_security_risk_prediction, + on_token=on_token, **kwargs, ) diff --git a/tests/cross/test_registry_directories.py b/tests/cross/test_registry_directories.py index 505c250b3e..d4549b872d 100644 --- a/tests/cross/test_registry_directories.py +++ b/tests/cross/test_registry_directories.py @@ -10,7 +10,10 @@ from openhands.sdk.agent.base import AgentBase from openhands.sdk.conversation import Conversation, LocalConversation from openhands.sdk.conversation.state import ConversationState -from openhands.sdk.conversation.types import ConversationCallbackType +from openhands.sdk.conversation.types import ( + ConversationCallbackType, + ConversationTokenCallbackType, +) from openhands.sdk.event.llm_convertible import SystemPromptEvent from openhands.sdk.llm import LLM, TextContent from 
openhands.sdk.tool.registry import resolve_tool @@ -38,7 +41,10 @@ def init_state( on_event(event) def step( - self, conversation: LocalConversation, on_event: ConversationCallbackType + self, + conversation: LocalConversation, + on_event: ConversationCallbackType, + on_token: ConversationTokenCallbackType | None = None, ) -> None: pass diff --git a/tests/sdk/conversation/local/test_conversation_default_callback.py b/tests/sdk/conversation/local/test_conversation_default_callback.py index edaf7b0b57..c56b6b9610 100644 --- a/tests/sdk/conversation/local/test_conversation_default_callback.py +++ b/tests/sdk/conversation/local/test_conversation_default_callback.py @@ -3,7 +3,10 @@ from openhands.sdk.agent.base import AgentBase from openhands.sdk.conversation import Conversation, LocalConversation from openhands.sdk.conversation.state import ConversationState -from openhands.sdk.conversation.types import ConversationCallbackType +from openhands.sdk.conversation.types import ( + ConversationCallbackType, + ConversationTokenCallbackType, +) from openhands.sdk.event.llm_convertible import MessageEvent, SystemPromptEvent from openhands.sdk.llm import LLM, Message, TextContent @@ -24,7 +27,10 @@ def init_state( on_event(event) def step( - self, conversation: LocalConversation, on_event: ConversationCallbackType + self, + conversation: LocalConversation, + on_event: ConversationCallbackType, + on_token: ConversationTokenCallbackType | None = None, ) -> None: on_event( MessageEvent( diff --git a/tests/sdk/conversation/local/test_conversation_id.py b/tests/sdk/conversation/local/test_conversation_id.py index bd9f9285ce..721100b048 100644 --- a/tests/sdk/conversation/local/test_conversation_id.py +++ b/tests/sdk/conversation/local/test_conversation_id.py @@ -5,7 +5,10 @@ from openhands.sdk.agent.base import AgentBase from openhands.sdk.conversation import Conversation, LocalConversation from openhands.sdk.conversation.state import ConversationState -from openhands.sdk.conversation.types import ConversationCallbackType +from openhands.sdk.conversation.types import ( + ConversationCallbackType, + ConversationTokenCallbackType, +) from openhands.sdk.event.llm_convertible import SystemPromptEvent from openhands.sdk.llm import LLM, TextContent from openhands.sdk.security.confirmation_policy import AlwaysConfirm, NeverConfirm @@ -27,7 +30,10 @@ def init_state( on_event(event) def step( - self, conversation: LocalConversation, on_event: ConversationCallbackType + self, + conversation: LocalConversation, + on_event: ConversationCallbackType, + on_token: ConversationTokenCallbackType | None = None, ) -> None: pass diff --git a/tests/sdk/conversation/local/test_conversation_send_message.py b/tests/sdk/conversation/local/test_conversation_send_message.py index 74409dd10d..e19f87c334 100644 --- a/tests/sdk/conversation/local/test_conversation_send_message.py +++ b/tests/sdk/conversation/local/test_conversation_send_message.py @@ -3,7 +3,10 @@ from openhands.sdk.agent.base import AgentBase from openhands.sdk.conversation import Conversation, LocalConversation from openhands.sdk.conversation.state import ConversationState -from openhands.sdk.conversation.types import ConversationCallbackType +from openhands.sdk.conversation.types import ( + ConversationCallbackType, + ConversationTokenCallbackType, +) from openhands.sdk.event.llm_convertible import MessageEvent, SystemPromptEvent from openhands.sdk.llm import LLM, Message, TextContent @@ -24,7 +27,10 @@ def init_state( on_event(event) def step( - self, conversation: 
LocalConversation, on_event: ConversationCallbackType + self, + conversation: LocalConversation, + on_event: ConversationCallbackType, + on_token: ConversationTokenCallbackType | None = None, ) -> None: on_event( MessageEvent( diff --git a/tests/sdk/conversation/local/test_run_exception_includes_conversation_id.py b/tests/sdk/conversation/local/test_run_exception_includes_conversation_id.py index 1c56bdcf21..3d01218f8d 100644 --- a/tests/sdk/conversation/local/test_run_exception_includes_conversation_id.py +++ b/tests/sdk/conversation/local/test_run_exception_includes_conversation_id.py @@ -5,11 +5,20 @@ from openhands.sdk.agent.base import AgentBase from openhands.sdk.conversation import Conversation from openhands.sdk.conversation.exceptions import ConversationRunError +from openhands.sdk.conversation.types import ( + ConversationCallbackType, + ConversationTokenCallbackType, +) from openhands.sdk.llm import LLM class FailingAgent(AgentBase): - def step(self, conversation, on_event): # noqa: D401, ARG002 + def step( + self, + conversation, + on_event: ConversationCallbackType, + on_token: ConversationTokenCallbackType | None = None, + ): # noqa: D401, ARG002 """Intentionally fail to simulate an unexpected runtime error.""" raise ValueError("boom") diff --git a/tests/sdk/conversation/local/test_state_serialization.py b/tests/sdk/conversation/local/test_state_serialization.py index 0c068a391a..1b289356bd 100644 --- a/tests/sdk/conversation/local/test_state_serialization.py +++ b/tests/sdk/conversation/local/test_state_serialization.py @@ -15,6 +15,10 @@ ConversationExecutionStatus, ConversationState, ) +from openhands.sdk.conversation.types import ( + ConversationCallbackType, + ConversationTokenCallbackType, +) from openhands.sdk.event.llm_convertible import MessageEvent, SystemPromptEvent from openhands.sdk.llm import LLM, Message, TextContent from openhands.sdk.llm.llm_registry import RegistryEvent @@ -438,7 +442,12 @@ def __init__(self): def init_state(self, state, on_event): pass - def step(self, conversation, on_event): + def step( + self, + conversation, + on_event: ConversationCallbackType, + on_token: ConversationTokenCallbackType | None = None, + ): pass llm = LLM(model="gpt-4o-mini", api_key=SecretStr("test-key"), usage_id="test-llm") From 7ac405db06327297863c971b5193d48226c88d07 Mon Sep 17 00:00:00 2001 From: openhands Date: Tue, 25 Nov 2025 23:47:45 +0000 Subject: [PATCH 06/15] Fix circular import and update tests for streaming API - Fix circular import between agent/utils.py and conversation modules by using lazy imports - Update test_agent_utils.py to include new streaming parameters (on_token, metadata, extra_body) - All tests now passing Co-authored-by: openhands --- openhands-sdk/openhands/sdk/agent/utils.py | 2 +- .../conversation/impl/local_conversation.py | 3 ++- tests/sdk/agent/test_agent_utils.py | 23 +++++++++++++++++++ 3 files changed, 26 insertions(+), 2 deletions(-) diff --git a/openhands-sdk/openhands/sdk/agent/utils.py b/openhands-sdk/openhands/sdk/agent/utils.py index b35581538c..1bf43da611 100644 --- a/openhands-sdk/openhands/sdk/agent/utils.py +++ b/openhands-sdk/openhands/sdk/agent/utils.py @@ -12,7 +12,7 @@ from openhands.sdk.context.condenser.base import CondenserBase from openhands.sdk.context.view import View -from openhands.sdk.conversation import ConversationTokenCallbackType +from openhands.sdk.conversation.types import ConversationTokenCallbackType from openhands.sdk.event.base import Event, LLMConvertibleEvent from openhands.sdk.event.condenser 
import Condensation from openhands.sdk.llm import LLM, LLMResponse, Message diff --git a/openhands-sdk/openhands/sdk/conversation/impl/local_conversation.py b/openhands-sdk/openhands/sdk/conversation/impl/local_conversation.py index 047edf50f1..9abf20afe7 100644 --- a/openhands-sdk/openhands/sdk/conversation/impl/local_conversation.py +++ b/openhands-sdk/openhands/sdk/conversation/impl/local_conversation.py @@ -4,7 +4,6 @@ from pathlib import Path from openhands.sdk.agent.base import AgentBase -from openhands.sdk.agent.utils import make_llm_completion, prepare_llm_messages from openhands.sdk.context.prompts.prompt import render_template from openhands.sdk.conversation.base import BaseConversation from openhands.sdk.conversation.exceptions import ConversationRunError @@ -514,6 +513,8 @@ def ask_agent(self, question: str) -> str: Returns: A string response from the agent """ + from openhands.sdk.agent.utils import make_llm_completion, prepare_llm_messages + template_dir = ( Path(__file__).parent.parent.parent / "context" / "prompts" / "templates" ) diff --git a/tests/sdk/agent/test_agent_utils.py b/tests/sdk/agent/test_agent_utils.py index 238d76118b..f4e3045a30 100644 --- a/tests/sdk/agent/test_agent_utils.py +++ b/tests/sdk/agent/test_agent_utils.py @@ -27,6 +27,7 @@ def mock_llm(): """Create a mock LLM for testing.""" llm = Mock(spec=LLM) llm.uses_responses_api.return_value = False + llm.metadata = {} return llm @@ -277,7 +278,9 @@ def test_make_llm_completion_with_completion_api(mock_llm, sample_messages): mock_llm.completion.assert_called_once_with( messages=sample_messages, tools=[], + extra_body={"metadata": {}}, add_security_risk_prediction=True, + on_token=None, ) mock_llm.responses.assert_not_called() @@ -301,6 +304,8 @@ def test_make_llm_completion_with_responses_api(mock_llm, sample_messages): include=None, store=False, add_security_risk_prediction=True, + metadata={}, + on_token=None, ) mock_llm.completion.assert_not_called() @@ -323,7 +328,9 @@ def test_make_llm_completion_with_tools_completion_api( mock_llm.completion.assert_called_once_with( messages=sample_messages, tools=sample_tools, + extra_body={"metadata": {}}, add_security_risk_prediction=True, + on_token=None, ) @@ -348,6 +355,8 @@ def test_make_llm_completion_with_tools_responses_api( include=None, store=False, add_security_risk_prediction=True, + metadata={}, + on_token=None, ) @@ -366,7 +375,9 @@ def test_make_llm_completion_with_none_tools(mock_llm, sample_messages): mock_llm.completion.assert_called_once_with( messages=sample_messages, tools=[], + extra_body={"metadata": {}}, add_security_risk_prediction=True, + on_token=None, ) @@ -385,7 +396,9 @@ def test_make_llm_completion_with_empty_tools_list(mock_llm, sample_messages): mock_llm.completion.assert_called_once_with( messages=sample_messages, tools=[], + extra_body={"metadata": {}}, add_security_risk_prediction=True, + on_token=None, ) @@ -404,7 +417,9 @@ def test_make_llm_completion_empty_messages(mock_llm): mock_llm.completion.assert_called_once_with( messages=[], tools=[], + extra_body={"metadata": {}}, add_security_risk_prediction=True, + on_token=None, ) @@ -440,7 +455,9 @@ def test_prepare_llm_messages_and_make_llm_completion_integration( mock_llm.completion.assert_called_once_with( messages=sample_messages, tools=[], + extra_body={"metadata": {}}, add_security_risk_prediction=True, + on_token=None, ) @@ -449,6 +466,7 @@ def test_make_llm_completion_api_selection(): # Test completion API selection mock_llm = Mock(spec=LLM) 
mock_llm.uses_responses_api.return_value = False + mock_llm.metadata = {} mock_response = Mock(spec=LLMResponse) mock_llm.completion.return_value = mock_response @@ -466,12 +484,15 @@ def test_make_llm_completion_api_selection(): mock_llm.completion.assert_called_once_with( messages=messages, tools=[], + extra_body={"metadata": {}}, add_security_risk_prediction=True, + on_token=None, ) mock_llm.responses.assert_not_called() # Reset mocks and test responses API selection mock_llm.reset_mock() + mock_llm.metadata = {} mock_llm.uses_responses_api.return_value = True mock_llm.responses.return_value = mock_response @@ -485,5 +506,7 @@ def test_make_llm_completion_api_selection(): include=None, store=False, add_security_risk_prediction=True, + metadata={}, + on_token=None, ) mock_llm.completion.assert_not_called() From 847eaaafc55c75db2744995f05e261da43ea3a78 Mon Sep 17 00:00:00 2001 From: openhands Date: Tue, 25 Nov 2025 23:49:33 +0000 Subject: [PATCH 07/15] Trigger CI re-run Co-authored-by: openhands From 80c06f7411d37a0a92d1871852aa4b8ac7079176 Mon Sep 17 00:00:00 2001 From: Xingyao Wang Date: Tue, 25 Nov 2025 21:03:02 -0500 Subject: [PATCH 08/15] remove md --- llm_streaming_refactor_plan.md | 118 --------------------------------- 1 file changed, 118 deletions(-) delete mode 100644 llm_streaming_refactor_plan.md diff --git a/llm_streaming_refactor_plan.md b/llm_streaming_refactor_plan.md deleted file mode 100644 index fce33f67ac..0000000000 --- a/llm_streaming_refactor_plan.md +++ /dev/null @@ -1,118 +0,0 @@ -# LLM Streaming Refactor Plan - -## Observed LiteLLM stream event types - -LiteLLM emits `ResponsesAPIStreamEvents` values while streaming. The current enum and their string payloads are: - -- `response.created` -- `response.in_progress` -- `response.completed` -- `response.failed` -- `response.incomplete` -- `response.output_item.added` -- `response.output_item.done` -- `response.output_text.delta` -- `response.output_text.done` -- `response.output_text.annotation.added` -- `response.reasoning_summary_text.delta` -- `response.reasoning_summary_part.added` -- `response.function_call_arguments.delta` -- `response.function_call_arguments.done` -- `response.mcp_call_arguments.delta` -- `response.mcp_call_arguments.done` -- `response.mcp_call.in_progress` -- `response.mcp_call.completed` -- `response.mcp_call.failed` -- `response.mcp_list_tools.in_progress` -- `response.mcp_list_tools.completed` -- `response.mcp_list_tools.failed` -- `response.file_search_call.in_progress` -- `response.file_search_call.searching` -- `response.file_search_call.completed` -- `response.web_search_call.in_progress` -- `response.web_search_call.searching` -- `response.web_search_call.completed` -- `response.refusal.delta` -- `response.refusal.done` -- `error` -- `response.content_part.added` -- `response.content_part.done` - -These events conceptually fall into buckets we care about for visualization and higher-level semantics: - -| Category | Events | Notes | -| --- | --- | --- | -| **Lifecycle / status** | created, in_progress, completed, failed, incomplete, *_call.* events, output_item.added/done, content_part.added/done, error | remind our UI but typically not shown inline | -| **Assistant text** | output_text.delta, output_text.done, output_text.annotation.added | forms "Message" body | -| **Reasoning summary** | reasoning_summary_part.added, reasoning_summary_text.delta | feed into Reasoning blobs | -| **Function / tool arguments** | function_call_arguments.delta/done, mcp_call_arguments.delta/done | update 
Action sections | -| **Refusal** | refusal.delta/done | render special refusal text | - -## Problems to resolve - -1. **Streaming display duplicates content and forces line breaks.** We currently print each delta as its own Rich print call with `end=""`, but Live panels aren’t used and the console injects newlines between `print` calls, so output becomes `word\nword\n...`. -2. **No per-message aggregation.** All reasoning deltas accumulate into a single global area, so later messages overwrite earlier context. We need separate buffers per "logical container" (assistant message, reasoning summary, function call) associated with the owning `LLMConvertibleEvent` (e.g., `MessageEvent`, `ActionEvent`). -3. **Naming collision / clarity.** LiteLLM "events" clash with our own domain events. We should introduce a distinct abstraction, e.g. `LLMStreamChunk`, that wraps metadata about channel, indices, and owning response item. -4. **Persistence & replay.** We still want to persist raw stream parts for clients, but the visualizer should rebuild high-level fragments from these parts when replaying history. - -## Proposed model hierarchy - -``` -LLMStreamChunk (renamed from LLMStreamEvent) -├── part_kind: Literal["assistant", "reasoning", "function_arguments", "refusal", "status", "tool_output"] -├── text_delta: str | None -├── arguments_delta: str | None -├── response_index: int | None -├── item_id: str | None -├── chunk_type: str # raw LiteLLM value -├── is_terminal: bool -├── raw_chunk: Any # original LiteLLM payload retained for logging/replay -└── origin_metadata: dict[str, Any] -``` - -Keeping the raw LiteLLM payload inside each `LLMStreamChunk` means we do **not** need a separate envelope structure; logging can simply serialize the chunk directly. - -## Visualization strategy - -We will leave the existing `ConversationVisualizer` untouched for default/legacy usage and introduce a new `StreamingConversationVisualizer` that renders deltas directly inside the final panels: - -1. **Create/update per-response panels.** The first chunk for a `(response_id, output_index)` pair creates (or reuses) a panel for the assistant message or tool call and immediately starts streaming into it. -2. **Route text into semantic sections.** Assistant text, reasoning summaries, function-call arguments, tool output, and refusals each update their own section inside the panel. -3. **Use Rich `Live` when interactive.** In a real terminal we keep the panel on screen and update it in place; when the console is not interactive (tests, logging) we fall back to static updates. -4. **Leave the panel in place when finished.** When the final chunk arrives we stop updating but keep the panel visible; the subsequent `MessageEvent`/`ActionEvent` is suppressed to avoid duplicate re-rendering. - -## Implementation roadmap - -1. **Data model adjustments** - - Rename the existing `LLMStreamEvent` class to `LLMStreamChunk` and extend it with richer fields: `part_kind`, `response_index`, `conversation_event_id` (populated later), `raw_chunk`, etc. - - Create helper to classify LiteLLM chunks into `LLMStreamChunk` instances (including mapping item IDs to owning role/time). - -2. **Conversation state integration** - - When we enqueue the initial `MessageEvent`/`ActionEvent`, cache a lookup (e.g., `inflight_streams[(response_id, output_index)] = conversation_event_id`). - - Update `LocalConversation` token callback wrapper to attach the resolved conversation event ID onto the `LLMStreamChunk` before emitting/persisting. - -3. 
**Streaming visualizer** - - Implement `StreamingConversationVisualizer` with lightweight session tracking (keyed by response/output) that owns Rich panels for streaming sections. - - Stream updates into the same panel that will remain visible after completion; use `Live` only when running in an interactive terminal. - - Suppress duplicate rendering when the final `MessageEvent`/`ActionEvent` arrives, since the streamed panel already contains the content. - - Provide a factory helper (e.g., `create_streaming_visualizer`) for callers that want the streaming experience. - -4. **Persistence / tests** - - Update tests to ensure: - - Multiple output_text deltas produce contiguous text without duplicates or extra newlines. - - Separate reasoning items create separate entries under one message event. - - Function call arguments stream into their own block. - - Add snapshot/log assertions to confirm persisted JSONL remains unchanged for downstream clients. - -5. **Documentation & naming cleanup** - - Decide on final terminology (`LLMStreamChunk`, `StreamItem`, etc.) and update code comments accordingly. - - Document the classification table for future maintainers. - -## Next actions - -- [ ] Refactor classifier to output `LLMStreamChunk` objects with clear `part_kind`. -- [ ] Track in-flight conversation events so parts know their owner. -- [ ] Replace print-based visualizer streaming with `Live` blocks per section. -- [ ] Extend unit tests to cover multiple messages, reasoning segments, tool calls, and the new streaming visualizer. -- [ ] Update the standalone streaming example to wire in the streaming visualizer helper. -- [ ] Manually validate with long streaming example to confirm smooth in-place updates. From 9859171c854ba5fce2b0b946eb72425e147c206d Mon Sep 17 00:00:00 2001 From: Xingyao Wang Date: Tue, 25 Nov 2025 21:12:36 -0500 Subject: [PATCH 09/15] rename example --- FACTS.txt | 5 +++++ .../{24_responses_streaming.py => 29_responses_streaming.py} | 0 2 files changed, 5 insertions(+) create mode 100644 FACTS.txt rename examples/01_standalone_sdk/{24_responses_streaming.py => 29_responses_streaming.py} (100%) diff --git a/FACTS.txt b/FACTS.txt new file mode 100644 index 0000000000..e76122e165 --- /dev/null +++ b/FACTS.txt @@ -0,0 +1,5 @@ +1. The OpenHands Software Agent SDK is a set of Python and REST APIs for building agents that work with code, supporting tasks from simple README generation to complex multi-agent refactors and rewrites. + +2. The SDK supports multiple workspace environments - agents can either use the local machine as their workspace or run inside ephemeral workspaces (e.g., in Docker or Kubernetes) using the Agent Server. + +3. The project is organized into multiple sub-packages including openhands-sdk, openhands-tools, openhands-workspace, and openhands-agent-server, and powers production applications like the OpenHands CLI and OpenHands Cloud. 
diff --git a/examples/01_standalone_sdk/24_responses_streaming.py b/examples/01_standalone_sdk/29_responses_streaming.py similarity index 100% rename from examples/01_standalone_sdk/24_responses_streaming.py rename to examples/01_standalone_sdk/29_responses_streaming.py From 71fce094a3f211603bb8978412324c7dfb031160 Mon Sep 17 00:00:00 2001 From: Xingyao Wang Date: Tue, 25 Nov 2025 21:13:25 -0500 Subject: [PATCH 10/15] make LLMStreamChunk a basemodel --- openhands-sdk/openhands/sdk/llm/streaming.py | 8 +++----- 1 file changed, 3 insertions(+), 5 deletions(-) diff --git a/openhands-sdk/openhands/sdk/llm/streaming.py b/openhands-sdk/openhands/sdk/llm/streaming.py index 9daf3736a5..f2468d0bf3 100644 --- a/openhands-sdk/openhands/sdk/llm/streaming.py +++ b/openhands-sdk/openhands/sdk/llm/streaming.py @@ -1,9 +1,8 @@ -from __future__ import annotations - from collections.abc import Callable -from dataclasses import dataclass from typing import Any, Literal +from pydantic import BaseModel + StreamPartKind = Literal[ "assistant_message", @@ -16,8 +15,7 @@ ] -@dataclass(slots=True) -class LLMStreamChunk: +class LLMStreamChunk(BaseModel): """Represents a streaming delta emitted by an LLM provider.""" type: str From 6a67bac50eac4d22ae2da8ef22a2babaf83685cf Mon Sep 17 00:00:00 2001 From: Xingyao Wang Date: Tue, 25 Nov 2025 21:16:57 -0500 Subject: [PATCH 11/15] clean up some merges --- openhands-sdk/openhands/sdk/agent/agent.py | 1 - openhands-sdk/openhands/sdk/agent/utils.py | 8 ++------ 2 files changed, 2 insertions(+), 7 deletions(-) diff --git a/openhands-sdk/openhands/sdk/agent/agent.py b/openhands-sdk/openhands/sdk/agent/agent.py index 76e8b6236a..036fa6bd4f 100644 --- a/openhands-sdk/openhands/sdk/agent/agent.py +++ b/openhands-sdk/openhands/sdk/agent/agent.py @@ -172,7 +172,6 @@ def step( self.llm, _messages, tools=list(self.tools_map.values()), - add_security_risk_prediction=True, on_token=on_token, ) except FunctionCallValidationError as e: diff --git a/openhands-sdk/openhands/sdk/agent/utils.py b/openhands-sdk/openhands/sdk/agent/utils.py index 1bf43da611..68f5f27cd5 100644 --- a/openhands-sdk/openhands/sdk/agent/utils.py +++ b/openhands-sdk/openhands/sdk/agent/utils.py @@ -183,7 +183,6 @@ def make_llm_completion( llm: LLM, messages: list[Message], tools: list[ToolDefinition] | None = None, - add_security_risk_prediction: bool = True, on_token: ConversationTokenCallbackType | None = None, ) -> LLMResponse: """Make an LLM completion call with the provided messages and tools. 
@@ -192,7 +191,6 @@ def make_llm_completion( llm: The LLM instance to use for completion messages: The messages to send to the LLM tools: Optional list of tools to provide to the LLM - add_security_risk_prediction: Whether to add security risk prediction on_token: Optional callback for streaming token updates Returns: @@ -204,15 +202,13 @@ def make_llm_completion( tools=tools or [], include=None, store=False, - add_security_risk_prediction=add_security_risk_prediction, - metadata=llm.metadata, + add_security_risk_prediction=True, on_token=on_token, ) else: return llm.completion( messages=messages, tools=tools or [], - extra_body={"metadata": llm.metadata}, - add_security_risk_prediction=add_security_risk_prediction, + add_security_risk_prediction=True, on_token=on_token, ) From ab8961a0c062c0f2c13d40a33479a50844c6e5ae Mon Sep 17 00:00:00 2001 From: Xingyao Wang Date: Tue, 25 Nov 2025 21:39:28 -0500 Subject: [PATCH 12/15] simplify local convo and remove streaming event since that's probably not necessary --- .../openhands/sdk/conversation/base.py | 21 +++++-- .../conversation/impl/local_conversation.py | 63 ++++--------------- openhands-sdk/openhands/sdk/event/__init__.py | 4 -- .../openhands/sdk/event/streaming.py | 33 ---------- 4 files changed, 26 insertions(+), 95 deletions(-) delete mode 100644 openhands-sdk/openhands/sdk/event/streaming.py diff --git a/openhands-sdk/openhands/sdk/conversation/base.py b/openhands-sdk/openhands/sdk/conversation/base.py index 79079a7025..57f2f1280e 100644 --- a/openhands-sdk/openhands/sdk/conversation/base.py +++ b/openhands-sdk/openhands/sdk/conversation/base.py @@ -1,12 +1,16 @@ from abc import ABC, abstractmethod from collections.abc import Iterable, Mapping from pathlib import Path -from typing import TYPE_CHECKING, Protocol +from typing import TYPE_CHECKING, Protocol, TypeVar, cast from openhands.sdk.conversation.conversation_stats import ConversationStats from openhands.sdk.conversation.events_list_base import EventsListBase from openhands.sdk.conversation.secret_registry import SecretValue -from openhands.sdk.conversation.types import ConversationCallbackType, ConversationID +from openhands.sdk.conversation.types import ( + ConversationCallbackType, + ConversationID, + ConversationTokenCallbackType, +) from openhands.sdk.llm.llm import LLM from openhands.sdk.llm.message import Message from openhands.sdk.observability.laminar import ( @@ -27,6 +31,13 @@ from openhands.sdk.conversation.state import ConversationExecutionStatus +CallbackType = TypeVar( + "CallbackType", + ConversationCallbackType, + ConversationTokenCallbackType, +) + + class ConversationStateProtocol(Protocol): """Protocol defining the interface for conversation state objects.""" @@ -235,9 +246,7 @@ def ask_agent(self, question: str) -> str: ... @staticmethod - def compose_callbacks( - callbacks: Iterable[ConversationCallbackType], - ) -> ConversationCallbackType: + def compose_callbacks(callbacks: Iterable[CallbackType]) -> CallbackType: """Compose multiple callbacks into a single callback function. 
Args: @@ -252,4 +261,4 @@ def composed(event) -> None: if cb: cb(event) - return composed + return cast(CallbackType, composed) diff --git a/openhands-sdk/openhands/sdk/conversation/impl/local_conversation.py b/openhands-sdk/openhands/sdk/conversation/impl/local_conversation.py index 9abf20afe7..12a30ca79d 100644 --- a/openhands-sdk/openhands/sdk/conversation/impl/local_conversation.py +++ b/openhands-sdk/openhands/sdk/conversation/impl/local_conversation.py @@ -26,11 +26,10 @@ from openhands.sdk.event import ( MessageEvent, PauseEvent, - StreamingDeltaEvent, UserRejectObservation, ) from openhands.sdk.event.conversation_error import ConversationErrorEvent -from openhands.sdk.llm import LLM, LLMStreamChunk, Message, TextContent +from openhands.sdk.llm import LLM, Message, TextContent from openhands.sdk.llm.llm_registry import LLMRegistry from openhands.sdk.logger import get_logger from openhands.sdk.observability.laminar import observe @@ -50,13 +49,12 @@ class LocalConversation(BaseConversation): _state: ConversationState _visualizer: ConversationVisualizerBase | None _on_event: ConversationCallbackType + _on_token: ConversationTokenCallbackType | None max_iteration_per_run: int _stuck_detector: StuckDetector | None llm_registry: LLMRegistry _cleanup_initiated: bool - _on_token: ConversationTokenCallbackType | None - def __init__( self, agent: AgentBase, @@ -151,6 +149,12 @@ def _default_callback(e): self._visualizer = None self._on_event = BaseConversation.compose_callbacks(composed_list) + self._on_token = ( + BaseConversation.compose_callbacks(token_callbacks) + if token_callbacks + else None + ) + self.max_iteration_per_run = max_iteration_per_run # Initialize stuck detector @@ -165,43 +169,6 @@ def _default_callback(e): for llm in list(self.agent.get_all_llms()): self.llm_registry.add(llm) - def _compose_token_callbacks( - callbacks: list[ConversationTokenCallbackType], - ) -> ConversationTokenCallbackType: - def _composed(event): - for cb in callbacks: - cb(event) - - return _composed - - user_token_callback = ( - _compose_token_callbacks(token_callbacks) if token_callbacks else None - ) - - def _handle_stream_event(stream_chunk: LLMStreamChunk) -> None: - try: - self._on_event( - StreamingDeltaEvent(source="agent", stream_chunk=stream_chunk) - ) - except Exception: - logger.exception("stream_event_processing_error", exc_info=True) - if user_token_callback: - user_token_callback(stream_chunk) - - streaming_enabled = user_token_callback is not None - - if callbacks: - for cb in callbacks: - owner = getattr(cb, "__self__", None) - if owner is not None and getattr(owner, "requires_streaming", False): - streaming_enabled = True - break - - if self._visualizer and getattr(self._visualizer, "requires_streaming", False): - streaming_enabled = True - - self._on_token = _handle_stream_event if streaming_enabled else None - # Initialize secrets if provided if secrets: # Convert dict[str, str] to dict[str, SecretValue] @@ -350,17 +317,9 @@ def run(self) -> None: ConversationExecutionStatus.RUNNING ) - if self._on_token is not None: - self.agent.step( - self, - on_event=self._on_event, - on_token=self._on_token, - ) - else: - self.agent.step( - self, - on_event=self._on_event, - ) + self.agent.step( + self, on_event=self._on_event, on_token=self._on_token + ) iteration += 1 # Check for non-finished terminal conditions diff --git a/openhands-sdk/openhands/sdk/event/__init__.py b/openhands-sdk/openhands/sdk/event/__init__.py index 60bfbf89f4..9e4346e1dc 100644 --- 
a/openhands-sdk/openhands/sdk/event/__init__.py +++ b/openhands-sdk/openhands/sdk/event/__init__.py @@ -5,7 +5,6 @@ CondensationSummaryEvent, ) from openhands.sdk.event.conversation_state import ConversationStateUpdateEvent -from openhands.sdk.event.llm_completion_log import LLMCompletionLogEvent from openhands.sdk.event.llm_convertible import ( ActionEvent, AgentErrorEvent, @@ -15,7 +14,6 @@ SystemPromptEvent, UserRejectObservation, ) -from openhands.sdk.event.streaming import StreamingDeltaEvent from openhands.sdk.event.token import TokenEvent from openhands.sdk.event.types import EventID, ToolCallID from openhands.sdk.event.user_action import PauseEvent @@ -32,13 +30,11 @@ "MessageEvent", "AgentErrorEvent", "UserRejectObservation", - "StreamingDeltaEvent", "PauseEvent", "Condensation", "CondensationRequest", "CondensationSummaryEvent", "ConversationStateUpdateEvent", - "LLMCompletionLogEvent", "EventID", "ToolCallID", ] diff --git a/openhands-sdk/openhands/sdk/event/streaming.py b/openhands-sdk/openhands/sdk/event/streaming.py deleted file mode 100644 index f90534985b..0000000000 --- a/openhands-sdk/openhands/sdk/event/streaming.py +++ /dev/null @@ -1,33 +0,0 @@ -from __future__ import annotations - -from pydantic import Field -from rich.text import Text - -from openhands.sdk.event.base import Event -from openhands.sdk.event.types import SourceType -from openhands.sdk.llm.streaming import LLMStreamChunk, StreamPartKind - - -class StreamingDeltaEvent(Event): - """Event emitted for each incremental LLM streaming delta.""" - - source: SourceType = Field(default="agent") - stream_chunk: LLMStreamChunk - - @property - def part_kind(self) -> StreamPartKind: - return self.stream_chunk.part_kind - - @property - def visualize(self) -> Text: - content = Text() - content.append(f"Part: {self.stream_chunk.part_kind}\n", style="bold") - - if self.stream_chunk.text_delta: - content.append(self.stream_chunk.text_delta) - elif self.stream_chunk.arguments_delta: - content.append(self.stream_chunk.arguments_delta) - else: - content.append("[no streaming content]") - - return content From fa57f0857d0a7d03ffbdf929196f195ef5e3bfb8 Mon Sep 17 00:00:00 2001 From: Xingyao Wang Date: Tue, 25 Nov 2025 21:40:39 -0500 Subject: [PATCH 13/15] update the right init --- openhands-sdk/openhands/sdk/event/__init__.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/openhands-sdk/openhands/sdk/event/__init__.py b/openhands-sdk/openhands/sdk/event/__init__.py index 9e4346e1dc..f2ae2ea5e3 100644 --- a/openhands-sdk/openhands/sdk/event/__init__.py +++ b/openhands-sdk/openhands/sdk/event/__init__.py @@ -5,6 +5,7 @@ CondensationSummaryEvent, ) from openhands.sdk.event.conversation_state import ConversationStateUpdateEvent +from openhands.sdk.event.llm_completion_log import LLMCompletionLogEvent from openhands.sdk.event.llm_convertible import ( ActionEvent, AgentErrorEvent, @@ -35,6 +36,7 @@ "CondensationRequest", "CondensationSummaryEvent", "ConversationStateUpdateEvent", + "LLMCompletionLogEvent", "EventID", "ToolCallID", ] From 66e209293bf7dce475494839135113a54626c623 Mon Sep 17 00:00:00 2001 From: Xingyao Wang Date: Tue, 25 Nov 2025 21:41:06 -0500 Subject: [PATCH 14/15] rm streaming visualizer --- .../sdk/conversation/streaming_visualizer.py | 322 ------------------ 1 file changed, 322 deletions(-) delete mode 100644 openhands-sdk/openhands/sdk/conversation/streaming_visualizer.py diff --git a/openhands-sdk/openhands/sdk/conversation/streaming_visualizer.py 
b/openhands-sdk/openhands/sdk/conversation/streaming_visualizer.py deleted file mode 100644 index dfc7e2b702..0000000000 --- a/openhands-sdk/openhands/sdk/conversation/streaming_visualizer.py +++ /dev/null @@ -1,322 +0,0 @@ -from dataclasses import dataclass -from typing import TYPE_CHECKING, Any - -from rich.console import Console, Group -from rich.live import Live -from rich.panel import Panel -from rich.text import Text - -from openhands.sdk.conversation.visualizer.default import ( - DefaultConversationVisualizer, -) -from openhands.sdk.event import ActionEvent, MessageEvent, StreamingDeltaEvent -from openhands.sdk.event.base import Event -from openhands.sdk.llm.llm import RESPONSES_COMPLETION_EVENT_TYPES -from openhands.sdk.llm.streaming import StreamPartKind - - -if TYPE_CHECKING: - from openhands.sdk.llm.streaming import LLMStreamChunk - - -# These are external inputs -_OBSERVATION_COLOR = "yellow" -_THOUGHT_COLOR = "bright_black" -_ERROR_COLOR = "red" -# These are agent actions -_ACTION_COLOR = "blue" -_MESSAGE_ASSISTANT_COLOR = _ACTION_COLOR - -DEFAULT_HIGHLIGHT_REGEX = { - r"^Reasoning:": f"bold {_THOUGHT_COLOR}", - r"^Thought:": f"bold {_THOUGHT_COLOR}", - r"^Action:": f"bold {_ACTION_COLOR}", - r"^Arguments:": f"bold {_ACTION_COLOR}", - r"^Tool:": f"bold {_OBSERVATION_COLOR}", - r"^Result:": f"bold {_OBSERVATION_COLOR}", - r"^Rejection Reason:": f"bold {_ERROR_COLOR}", - # Markdown-style - r"\*\*(.*?)\*\*": "bold", - r"\*(.*?)\*": "italic", -} - -_PANEL_PADDING = (1, 1) -_SECTION_CONFIG: dict[str, tuple[str, str]] = { - "reasoning": ("Reasoning", _THOUGHT_COLOR), - "assistant": ("Assistant", _ACTION_COLOR), - "function_arguments": ("Function Arguments", _ACTION_COLOR), - "refusal": ("Refusal", _ERROR_COLOR), -} - -_SESSION_CONFIG: dict[str, tuple[str, str]] = { - "message": ( - f"[bold {_MESSAGE_ASSISTANT_COLOR}]Message from Agent (streaming)" # type: ignore[str-format] - f"[/bold {_MESSAGE_ASSISTANT_COLOR}]", - _MESSAGE_ASSISTANT_COLOR, - ), - "action": ( - f"[bold {_ACTION_COLOR}]Agent Action (streaming)[/bold {_ACTION_COLOR}]", - _ACTION_COLOR, - ), -} - -_SECTION_ORDER = [ - "reasoning", - "assistant", - "function_arguments", - "refusal", -] - - -@dataclass(slots=True) -class _StreamSection: - header: str - style: str - content: str = "" - - -class _StreamSession: - def __init__( - self, - *, - console: Console, - session_type: str, - response_id: str | None, - output_index: int | None, - use_live: bool, - ) -> None: - self._console: Console = console - self._session_type: str = session_type - self._response_id: str | None = response_id - self._output_index: int | None = output_index - self._use_live: bool = use_live - self._sections: dict[str, _StreamSection] = {} - self._order: list[str] = [] - self._live: Live | None = None - self._last_renderable: Panel | None = None - - @property - def response_id(self) -> str | None: - return self._response_id - - def append_text(self, section_key: str, text: str | None) -> None: - if not text: - return - header, style = _SECTION_CONFIG.get(section_key, (section_key.title(), "cyan")) - section = self._sections.get(section_key) - if section is None: - section = _StreamSection(header, style) - self._sections[section_key] = section - self._order.append(section_key) - self._order.sort( - key=lambda key: _SECTION_ORDER.index(key) - if key in _SECTION_ORDER - else len(_SECTION_ORDER) - ) - section.content += text - self._update() - - def finish(self, *, persist: bool) -> None: - renderable = self._render_panel() - if self._use_live: - if 
self._live is not None: - self._live.stop() - self._live = None - if persist: - self._console.print(renderable) - self._console.print() - else: - self._console.print() - else: - if persist: - self._console.print(renderable) - self._console.print() - - def _update(self) -> None: - renderable = self._render_panel() - if self._use_live: - if self._live is None: - self._live = Live( - renderable, - console=self._console, - refresh_per_second=24, - transient=True, - ) - self._live.start() - else: - self._live.update(renderable) - else: - self._last_renderable = renderable - - def _render_panel(self) -> Panel: - body_parts: list[Any] = [] - for key in self._order: - section = self._sections[key] - if not section.content: - continue - body_parts.append(Text(f"{section.header}:", style=f"bold {section.style}")) - body_parts.append(Text(section.content, style=section.style)) - if not body_parts: - body_parts.append(Text("[streaming...]", style="dim")) - - title, border_style = _SESSION_CONFIG.get( - self._session_type, ("[bold cyan]Streaming[/bold cyan]", "cyan") - ) - return Panel( - Group(*body_parts), - title=title, - border_style=border_style, - padding=_PANEL_PADDING, - expand=True, - ) - - -class StreamingConversationVisualizer(DefaultConversationVisualizer): - """Streaming-focused visualizer that renders deltas in-place.""" - - requires_streaming: bool = True - - def __init__( - self, - highlight_regex: dict[str, str] | None = None, - skip_user_messages: bool = False, - ) -> None: - super().__init__( - highlight_regex=highlight_regex, - skip_user_messages=skip_user_messages, - ) - self._use_live: bool = self._console.is_terminal - self._stream_sessions: dict[tuple[str, int, str], _StreamSession] = {} - - def on_event(self, event: Event) -> None: - if isinstance(event, StreamingDeltaEvent): - self._handle_stream_chunk(event.stream_chunk) - return - - if self._should_skip_event(event): - return - - super().on_event(event) - - def _handle_stream_chunk(self, stream_chunk: "LLMStreamChunk") -> None: - if stream_chunk.part_kind == "status": - if ( - stream_chunk.type in RESPONSES_COMPLETION_EVENT_TYPES - or stream_chunk.is_final - ): - self._finish_stream_sessions(stream_chunk.response_id, persist=True) - return - - session_type = self._session_type_for_part(stream_chunk.part_kind) - if session_type is None: - return - - key = self._make_stream_session_key(stream_chunk, session_type) - session = self._stream_sessions.get(key) - if session is None: - session = _StreamSession( - console=self._console, - session_type=session_type, - response_id=stream_chunk.response_id, - output_index=stream_chunk.output_index, - use_live=self._use_live, - ) - self._stream_sessions[key] = session - - section_key = self._section_key_for_part(stream_chunk.part_kind) - session.append_text( - section_key, stream_chunk.text_delta or stream_chunk.arguments_delta - ) - - if stream_chunk.is_final: - self._finish_session_by_key(key, persist=True) - - def _session_type_for_part(self, part_kind: StreamPartKind) -> str | None: - if part_kind in {"assistant_message", "reasoning_summary", "refusal"}: - return "message" - if part_kind in {"function_call_arguments"}: - return "action" - return None - - def _section_key_for_part(self, part_kind: StreamPartKind) -> str: - if part_kind == "assistant_message": - return "assistant" - if part_kind == "reasoning_summary": - return "reasoning" - if part_kind == "function_call_arguments": - return "function_arguments" - if part_kind == "refusal": - return "refusal" - return "assistant" - - 
def _make_stream_session_key( - self, chunk: "LLMStreamChunk", session_type: str - ) -> tuple[str, int, str]: - response_key = ( - chunk.response_id - or f"unknown::{chunk.item_id or chunk.output_index or chunk.type}" - ) - output_index = chunk.output_index if chunk.output_index is not None else 0 - return (response_key, output_index, session_type) - - def _finish_stream_sessions( - self, response_id: str | None, *, persist: bool - ) -> None: - if not self._stream_sessions: - return - if response_id is None: - keys = list(self._stream_sessions.keys()) - else: - keys = [ - key - for key, session in self._stream_sessions.items() - if session.response_id == response_id - ] - if not keys: - keys = list(self._stream_sessions.keys()) - for key in keys: - self._finish_session_by_key(key, persist=persist) - - def _finish_session_by_key( - self, key: tuple[str, int, str], *, persist: bool - ) -> None: - session = self._stream_sessions.pop(key, None) - if session is not None: - session.finish(persist=persist) - - def _create_event_panel(self, event: Event) -> Panel | None: - if isinstance(event, StreamingDeltaEvent): - content = event.visualize - if not content.plain.strip(): - return None - if self._highlight_patterns: - content = self._apply_highlighting(content) - return Panel( - content, - title="[bold cyan]Streaming Delta[/bold cyan]", - border_style="cyan", - padding=_PANEL_PADDING, - expand=True, - ) - return None - - def _should_skip_event(self, event: Event) -> bool: - if isinstance(event, MessageEvent) and event.source == "agent": - return True - if isinstance(event, ActionEvent) and event.source == "agent": - return True - return False - - -def create_streaming_visualizer( - highlight_regex: dict[str, str] | None = None, - **kwargs, -) -> StreamingConversationVisualizer: - """Create a streaming-aware visualizer instance.""" - - return StreamingConversationVisualizer( - highlight_regex=DEFAULT_HIGHLIGHT_REGEX - if highlight_regex is None - else highlight_regex, - **kwargs, - ) From 9d1914cd2c39e57cf930dc78d273e35cd645bd3c Mon Sep 17 00:00:00 2001 From: Xingyao Wang Date: Tue, 25 Nov 2025 21:55:14 -0500 Subject: [PATCH 15/15] some attempt to simplify --- openhands-sdk/openhands/sdk/llm/llm.py | 30 +++++++++++--------------- 1 file changed, 13 insertions(+), 17 deletions(-) diff --git a/openhands-sdk/openhands/sdk/llm/llm.py b/openhands-sdk/openhands/sdk/llm/llm.py index 8d35f8c7e1..9628cafdd6 100644 --- a/openhands-sdk/openhands/sdk/llm/llm.py +++ b/openhands-sdk/openhands/sdk/llm/llm.py @@ -211,6 +211,14 @@ class LLM(BaseModel, RetryMixin, NonNativeToolCallingMixin): ) ollama_base_url: str | None = Field(default=None) + stream: bool = Field( + default=False, + description=( + "Enable streaming responses from the LLM. " + "When enabled, the provided `on_token` callback in .completions " + "and .responses will be invoked for each chunk of tokens." + ), + ) drop_params: bool = Field(default=True) modify_params: bool = Field( default=True, @@ -290,15 +298,6 @@ class LLM(BaseModel, RetryMixin, NonNativeToolCallingMixin): "telemetry, and spend tracking." ), ) - metadata: dict[str, Any] = Field( - default_factory=dict, - description=( - "Additional metadata for the LLM instance. 
" - "Example structure: " - "{'trace_version': '1.0.0', 'tags': ['model:gpt-4', 'agent:my-agent'], " - "'session_id': 'session-123', 'trace_user_id': 'user-456'}" - ), - ) litellm_extra_body: dict[str, Any] = Field( default_factory=dict, description=( @@ -504,8 +503,8 @@ def completion( >>> print(response.content) """ # Check if streaming is requested - if on_token is not None or kwargs.get("stream", False): - raise ValueError("Streaming is not supported for completion API yet") + if kwargs.get("stream", False) or self.stream or on_token is not None: + raise ValueError("Streaming is not supported in completion() method") # 1) serialize messages formatted_messages = self.format_messages_for_llm(messages) @@ -631,18 +630,14 @@ def responses( """Alternative invocation path using OpenAI Responses API via LiteLLM. Maps Message[] -> (instructions, input[]) and returns LLMResponse. - Streaming is enabled when ``on_token`` is provided. """ - user_requested_stream = bool(kwargs.get("stream", False)) - if user_requested_stream and on_token is None: + enable_streaming = bool(kwargs.get("stream", False)) or self.stream + if enable_streaming and on_token is None: raise ValueError( "Streaming for Responses API requires an on_token callback" ) - if on_token is not None: kwargs["stream"] = True - else: - kwargs.pop("stream", None) # Build instructions + input list using dedicated Responses formatter instructions, input_items = self.format_messages_for_responses(messages) @@ -922,6 +917,7 @@ def _get_chunk_arguments(self, chunk: Any) -> str | None: # ========================================================================= # Transport + helpers + # ========================================================================= def _transport_call( self, *, messages: list[dict[str, Any]], **kwargs ) -> ModelResponse: