
Commit e31b728

Refactor streaming chunk model and visualizer

1 parent d331abf

File tree: 12 files changed (+424 -164 lines)

README.md

Lines changed: 4 additions & 4 deletions

````diff
@@ -145,17 +145,17 @@ llm = registry.get("default")
 
 You can receive incremental deltas from the Responses API by supplying a token
 callback when constructing a conversation. Each callback receives an
-``LLMStreamEvent`` describing the delta.
+``LLMStreamChunk`` describing the delta.
 
 ```python
 from pathlib import Path
-from openhands.sdk import Conversation, LLMStreamEvent
+from openhands.sdk import Conversation, LLMStreamChunk
 
 log_dir = Path("logs/stream")
 log_dir.mkdir(parents=True, exist_ok=True)
 
-def on_token(event: LLMStreamEvent) -> None:
-    print(event.text or event.arguments or "", end="", flush=True)
+def on_token(event: LLMStreamChunk) -> None:
+    print(event.text_delta or event.arguments_delta or "", end="", flush=True)
 
 conversation = Conversation(agent=agent, token_callbacks=[on_token])
 conversation.send_message("Summarize the benefits of token streaming.")
````

examples/01_standalone_sdk/24_responses_streaming.py

Lines changed: 28 additions & 10 deletions

````diff
@@ -15,23 +15,34 @@
 
 from pydantic import SecretStr
 
-from openhands.sdk import Conversation, LLMStreamEvent, get_logger
+from openhands.sdk import (
+    Conversation,
+    ConversationCallbackType,
+    LLMStreamChunk,
+    get_logger,
+)
+from openhands.sdk.conversation.visualizer import create_streaming_visualizer
 from openhands.sdk.llm import LLM
 from openhands.tools.preset.default import get_default_agent
 
 
+PRINT_STREAM_TO_STDOUT = False
+
+
 logger = get_logger(__name__)
 LOG_DIR = Path("logs/stream")
 
 
-def _serialize_event(event: LLMStreamEvent) -> dict[str, Any]:
+def _serialize_event(event: LLMStreamChunk) -> dict[str, Any]:
     record = {
         "type": event.type,
-        "text": event.text,
-        "arguments": event.arguments,
+        "part_kind": event.part_kind,
+        "text": event.text_delta,
+        "arguments": event.arguments_delta,
         "output_index": event.output_index,
         "content_index": event.content_index,
         "item_id": event.item_id,
+        "response_id": event.response_id,
         "is_final": event.is_final,
     }
     return record
@@ -58,21 +69,28 @@ def main() -> None:
     LOG_DIR.mkdir(parents=True, exist_ok=True)
     log_path = LOG_DIR / f"responses_stream_{timestamp}.jsonl"
 
-    def on_token(event: LLMStreamEvent) -> None:
+    def on_token(event: LLMStreamChunk) -> None:
         record = _serialize_event(event)
         with log_path.open("a", encoding="utf-8") as fp:
             fp.write(json.dumps(record) + "\n")
 
-        stream_chunk = event.text or event.arguments
-        if stream_chunk:
-            print(stream_chunk, end="", flush=True)
-        if event.is_final:
+        delta = event.text_delta or event.arguments_delta
+        if delta and PRINT_STREAM_TO_STDOUT:
+            print(delta, end="", flush=True)
+        if event.is_final and event.part_kind == "status" and PRINT_STREAM_TO_STDOUT:
             print("\n--- stream complete ---")
 
+    callbacks: list[ConversationCallbackType] = []
+    if not PRINT_STREAM_TO_STDOUT:
+        streaming_visualizer = create_streaming_visualizer()
+        callbacks.append(streaming_visualizer.on_event)
+
     conversation = Conversation(
         agent=agent,
         workspace=os.getcwd(),
         token_callbacks=[on_token],
+        callbacks=callbacks or None,
+        visualize=False,
     )
 
     story_prompt = (
@@ -83,7 +101,7 @@ def on_token(event: LLMStreamEvent) -> None:
     conversation.run()
 
     cleanup_prompt = (
-        "Thank you. Please delete streaming_story.md now that I've read it, "
+        "Thank you. Please delete the streaming story file now that I've read it, "
         "then confirm the deletion."
     )
     conversation.send_message(cleanup_prompt)
````
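Because each chunk is appended to the JSONL log as a flat record, the stream can be reassembled offline. The sketch below is illustrative only, assuming the record fields produced by `_serialize_event` above; the grouping key and the concrete file name are not part of this commit.

```python
import json
from collections import defaultdict
from pathlib import Path

# Hypothetical log file written by the example above (timestamp elided).
log_path = Path("logs/stream/responses_stream_<timestamp>.jsonl")

# Concatenate deltas per (response_id, output_index), mirroring the record
# layout produced by _serialize_event.
buffers: dict[tuple[str | None, int | None], list[str]] = defaultdict(list)
for line in log_path.read_text(encoding="utf-8").splitlines():
    record = json.loads(line)
    delta = record.get("text") or record.get("arguments")
    if delta:
        buffers[(record.get("response_id"), record.get("output_index"))].append(delta)

for key, parts in buffers.items():
    print(key, "".join(parts))
```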

llm_streaming_refactor_plan.md

Lines changed: 13 additions & 16 deletions

````diff
@@ -74,16 +74,12 @@ Keeping the raw LiteLLM payload inside each `LLMStreamChunk` means we do **not**
 
 ## Visualization strategy
 
-1. **Track a hierarchy per conversation event.** When a LiteLLM stream begins we emit a placeholder `MessageEvent` (assistant message) or `ActionEvent` (function call). Each `LLMStreamChunk` should include a `response_id`/`item_id` so we can map to the owning conversation event:
-   - `output_text` → existing `MessageEvent` for the assistant response.
-   - `reasoning_summary_*` → reasoning area inside `MessageEvent`.
-   - `function_call_arguments_*` → arguments area inside `ActionEvent`.
-2. **Use `Live` per section.** For each unique `(conversation_event_id, part_kind, item_id)` create a Rich `Live` instance that updates with concatenated text. When the part is terminal, stop the `Live` and leave the final text in place.
-3. **Avoid newlines unless emitted by the model.** We'll join chunks using plain string concatenation and only add newline characters when the delta contains `\n` or when we intentionally insert separators (e.g., between tool JSON arguments).
-4. **Segregate sections:**
-   - `Reasoning:` header per `MessageEvent`. Each new reasoning item gets its own Live line under that message.
-   - `Assistant:` body for natural language output, appended inside the message panel.
-   - `Function Arguments:` block under each action panel, streaming JSON incrementally.
+We will leave the existing `ConversationVisualizer` untouched for default/legacy usage and introduce a new `StreamingConversationVisualizer` that renders deltas directly inside the final panels:
+
+1. **Create/update per-response panels.** The first chunk for a `(response_id, output_index)` pair creates (or reuses) a panel for the assistant message or tool call and immediately starts streaming into it.
+2. **Route text into semantic sections.** Assistant text, reasoning summaries, function-call arguments, tool output, and refusals each update their own section inside the panel.
+3. **Use Rich `Live` when interactive.** In a real terminal we keep the panel on screen and update it in place; when the console is not interactive (tests, logging) we fall back to static updates.
+4. **Leave the panel in place when finished.** When the final chunk arrives we stop updating but keep the panel visible; the subsequent `MessageEvent`/`ActionEvent` is suppressed to avoid duplicate re-rendering.
 
 ## Implementation roadmap
 
@@ -95,11 +91,11 @@ Keeping the raw LiteLLM payload inside each `LLMStreamChunk` means we do **not**
    - When we enqueue the initial `MessageEvent`/`ActionEvent`, cache a lookup (e.g., `inflight_streams[(response_id, output_index)] = conversation_event_id`).
    - Update `LocalConversation` token callback wrapper to attach the resolved conversation event ID onto the `LLMStreamChunk` before emitting/persisting.
 
-3. **Visualizer rewrite**
-   - Maintain `self._stream_views[(conversation_event_id, part_kind, item_id)] = LiveState` where `LiveState` wraps buffer, style, and a `Live` instance.
-   - On streaming updates: update buffer, `live.update(Text(buffer, style=...))` without printing newlines.
-   - On final chunk: stop `Live`, render final static text, and optionally record in conversation state for playback.
-   - Ensure replay (when visualizer processes stored events) converts stored parts into final text as well.
+3. **Streaming visualizer**
+   - Implement `StreamingConversationVisualizer` with lightweight session tracking (keyed by response/output) that owns Rich panels for streaming sections.
+   - Stream updates into the same panel that will remain visible after completion; use `Live` only when running in an interactive terminal.
+   - Suppress duplicate rendering when the final `MessageEvent`/`ActionEvent` arrives, since the streamed panel already contains the content.
+   - Provide a factory helper (e.g., `create_streaming_visualizer`) for callers that want the streaming experience.
 
 4. **Persistence / tests**
    - Update tests to ensure:
@@ -117,5 +113,6 @@ Keeping the raw LiteLLM payload inside each `LLMStreamChunk` means we do **not**
 - [ ] Refactor classifier to output `LLMStreamChunk` objects with clear `part_kind`.
 - [ ] Track in-flight conversation events so parts know their owner.
 - [ ] Replace print-based visualizer streaming with `Live` blocks per section.
-- [ ] Extend unit tests to cover multiple messages, reasoning segments, and tool calls.
+- [ ] Extend unit tests to cover multiple messages, reasoning segments, tool calls, and the new streaming visualizer.
+- [ ] Update the standalone streaming example to wire in the streaming visualizer helper.
 - [ ] Manually validate with long streaming example to confirm smooth in-place updates.
````
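The strategy in this plan maps onto Rich's `Live`/`Panel` primitives. Below is a minimal sketch of the per-`(response_id, output_index)` session tracking it describes, assuming Rich's public API; the class and method names are illustrative and are not the shipped `StreamingConversationVisualizer`.

```python
from dataclasses import dataclass, field

from rich.console import Console
from rich.live import Live
from rich.panel import Panel
from rich.text import Text


@dataclass
class _StreamSession:
    """Accumulated section buffers for one (response_id, output_index) stream."""

    sections: dict[str, str] = field(default_factory=dict)  # part_kind -> text so far


class StreamingPanelSketch:
    """Illustrative only: route LLMStreamChunk-like deltas into a live panel."""

    def __init__(self) -> None:
        self._console = Console()
        self._sessions: dict[tuple[str | None, int | None], _StreamSession] = {}
        self._live: Live | None = None

    def on_chunk(self, chunk) -> None:
        # Key streams the way the plan suggests: one session per response/output pair.
        key = (chunk.response_id, chunk.output_index)
        session = self._sessions.setdefault(key, _StreamSession())

        delta = chunk.text_delta or chunk.arguments_delta or ""
        if delta:
            session.sections[chunk.part_kind] = (
                session.sections.get(chunk.part_kind, "") + delta
            )

        # Only use Live (in-place updates) when attached to a real terminal.
        if self._live is None and self._console.is_interactive:
            self._live = Live(console=self._console, refresh_per_second=8)
            self._live.start()

        panel = self._render(session)
        if self._live is not None:
            self._live.update(panel)
        else:
            # Non-interactive fallback: static re-print per chunk.
            self._console.print(panel)

        if chunk.is_final and self._live is not None:
            # Leave the finished panel on screen and stop in-place updates.
            self._live.stop()
            self._live = None

    def _render(self, session: _StreamSession) -> Panel:
        body = Text()
        for part_kind, buffer in session.sections.items():
            body.append(f"{part_kind}:\n", style="bold")
            body.append(buffer + "\n")
        return Panel(body, title="streaming response")
```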

openhands/sdk/__init__.py

Lines changed: 12 additions & 2 deletions

````diff
@@ -13,14 +13,20 @@
     RemoteConversation,
 )
 from openhands.sdk.conversation.conversation_stats import ConversationStats
+from openhands.sdk.conversation.visualizer import (
+    ConversationVisualizer,
+    StreamingConversationVisualizer,
+    create_default_visualizer,
+    create_streaming_visualizer,
+)
 from openhands.sdk.event import Event, LLMConvertibleEvent
 from openhands.sdk.event.llm_convertible import MessageEvent
 from openhands.sdk.io import FileStore, LocalFileStore
 from openhands.sdk.llm import (
     LLM,
     ImageContent,
     LLMRegistry,
-    LLMStreamEvent,
+    LLMStreamChunk,
     Message,
     RedactedThinkingBlock,
     RegistryEvent,
@@ -60,9 +66,13 @@
 __all__ = [
     "LLM",
     "LLMRegistry",
-    "LLMStreamEvent",
+    "LLMStreamChunk",
     "TokenCallbackType",
     "ConversationStats",
+    "ConversationVisualizer",
+    "StreamingConversationVisualizer",
+    "create_default_visualizer",
+    "create_streaming_visualizer",
     "RegistryEvent",
     "Message",
     "TextContent",
````

openhands/sdk/conversation/impl/local_conversation.py

Lines changed: 4 additions & 4 deletions

````diff
@@ -20,7 +20,7 @@
     StreamingDeltaEvent,
     UserRejectObservation,
 )
-from openhands.sdk.llm import LLM, LLMStreamEvent, Message, TextContent
+from openhands.sdk.llm import LLM, LLMStreamChunk, Message, TextContent
 from openhands.sdk.llm.llm_registry import LLMRegistry
 from openhands.sdk.logger import get_logger
 from openhands.sdk.security.confirmation_policy import (
@@ -129,15 +129,15 @@ def _composed(event):
             _compose_token_callbacks(token_callbacks) if token_callbacks else None
         )
 
-        def _handle_stream_event(stream_event: LLMStreamEvent) -> None:
+        def _handle_stream_event(stream_chunk: LLMStreamChunk) -> None:
             try:
                 self._on_event(
-                    StreamingDeltaEvent(source="agent", stream_event=stream_event)
+                    StreamingDeltaEvent(source="agent", stream_chunk=stream_chunk)
                 )
             except Exception:
                 logger.exception("stream_event_processing_error", exc_info=True)
             if user_token_callback:
-                user_token_callback(stream_event)
+                user_token_callback(stream_chunk)
 
         self._on_token = _handle_stream_event
````
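The second hunk references a `_compose_token_callbacks` helper whose body is not part of this diff; below is a minimal sketch of what such a composer might look like, assuming all callbacks share the `LLMStreamChunk -> None` signature. The names here are illustrative.

```python
from collections.abc import Callable, Sequence

from openhands.sdk import LLMStreamChunk

# Illustrative alias; the SDK exports a TokenCallbackType, but its exact
# definition is not shown in this diff.
TokenCallback = Callable[[LLMStreamChunk], None]


def _compose_token_callbacks(callbacks: Sequence[TokenCallback]) -> TokenCallback:
    """Fan a single stream chunk out to every registered callback (sketch)."""

    def _composed(event: LLMStreamChunk) -> None:
        for callback in callbacks:
            callback(event)

    return _composed
```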
