Commit 5f86198

Refactor streaming chunk model and visualizer

1 parent d331abf

11 files changed (+319, -146 lines)


README.md

Lines changed: 4 additions & 4 deletions
```diff
@@ -145,17 +145,17 @@ llm = registry.get("default")
 
 You can receive incremental deltas from the Responses API by supplying a token
 callback when constructing a conversation. Each callback receives an
-``LLMStreamEvent`` describing the delta.
+``LLMStreamChunk`` describing the delta.
 
 ```python
 from pathlib import Path
-from openhands.sdk import Conversation, LLMStreamEvent
+from openhands.sdk import Conversation, LLMStreamChunk
 
 log_dir = Path("logs/stream")
 log_dir.mkdir(parents=True, exist_ok=True)
 
-def on_token(event: LLMStreamEvent) -> None:
-    print(event.text or event.arguments or "", end="", flush=True)
+def on_token(event: LLMStreamChunk) -> None:
+    print(event.text_delta or event.arguments_delta or "", end="", flush=True)
 
 conversation = Conversation(agent=agent, token_callbacks=[on_token])
 conversation.send_message("Summarize the benefits of token streaming.")
```
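Taken together, the fields this commit reads (`text_delta`, `arguments_delta`, `part_kind`) and serializes suggest roughly the following shape for the renamed chunk type. A minimal sketch, assuming a Pydantic model; the actual definition in `openhands.sdk.llm` is not part of this excerpt:

```python
from pydantic import BaseModel


# Hypothetical reconstruction of the renamed chunk model; field names are
# taken from this diff, while types and defaults are assumptions.
class LLMStreamChunk(BaseModel):
    type: str                           # raw Responses API event type
    part_kind: str                      # "status" appears in the example; other kinds assumed
    text_delta: str | None = None       # incremental assistant text
    arguments_delta: str | None = None  # incremental tool-call arguments
    output_index: int | None = None     # position of the output item
    content_index: int | None = None    # position of the content part within the item
    item_id: str | None = None          # identifier of the output item
    response_id: str | None = None      # identifier of the overall response
    is_final: bool = False              # True on the terminal chunk of a part
```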

examples/01_standalone_sdk/24_responses_streaming.py

Lines changed: 15 additions & 10 deletions
```diff
@@ -15,23 +15,28 @@
 
 from pydantic import SecretStr
 
-from openhands.sdk import Conversation, LLMStreamEvent, get_logger
+from openhands.sdk import Conversation, LLMStreamChunk, get_logger
 from openhands.sdk.llm import LLM
 from openhands.tools.preset.default import get_default_agent
 
 
+PRINT_STREAM_TO_STDOUT = False
+
+
 logger = get_logger(__name__)
 LOG_DIR = Path("logs/stream")
 
 
-def _serialize_event(event: LLMStreamEvent) -> dict[str, Any]:
+def _serialize_event(event: LLMStreamChunk) -> dict[str, Any]:
     record = {
         "type": event.type,
-        "text": event.text,
-        "arguments": event.arguments,
+        "part_kind": event.part_kind,
+        "text": event.text_delta,
+        "arguments": event.arguments_delta,
         "output_index": event.output_index,
         "content_index": event.content_index,
         "item_id": event.item_id,
+        "response_id": event.response_id,
         "is_final": event.is_final,
     }
     return record
@@ -58,15 +63,15 @@ def main() -> None:
     LOG_DIR.mkdir(parents=True, exist_ok=True)
     log_path = LOG_DIR / f"responses_stream_{timestamp}.jsonl"
 
-    def on_token(event: LLMStreamEvent) -> None:
+    def on_token(event: LLMStreamChunk) -> None:
         record = _serialize_event(event)
         with log_path.open("a", encoding="utf-8") as fp:
             fp.write(json.dumps(record) + "\n")
 
-        stream_chunk = event.text or event.arguments
-        if stream_chunk:
-            print(stream_chunk, end="", flush=True)
-        if event.is_final:
+        delta = event.text_delta or event.arguments_delta
+        if delta and PRINT_STREAM_TO_STDOUT:
+            print(delta, end="", flush=True)
+        if event.is_final and event.part_kind == "status" and PRINT_STREAM_TO_STDOUT:
             print("\n--- stream complete ---")
 
     conversation = Conversation(
@@ -83,7 +88,7 @@ def on_token(event: LLMStreamEvent) -> None:
     conversation.run()
 
     cleanup_prompt = (
-        "Thank you. Please delete streaming_story.md now that I've read it, "
+        "Thank you. Please delete the streaming story file now that I've read it, "
         "then confirm the deletion."
     )
     conversation.send_message(cleanup_prompt)
```
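With `PRINT_STREAM_TO_STDOUT` now defaulting to `False`, the JSONL log becomes the primary record of the stream. A minimal sketch for replaying logs written by `_serialize_event`, assuming only the directory, file pattern, and record keys shown above:

```python
import json
from pathlib import Path

# Replay every stream log the example wrote. The directory and file pattern
# mirror the example's LOG_DIR and log_path; record keys match _serialize_event.
for log_path in sorted(Path("logs/stream").glob("responses_stream_*.jsonl")):
    for line in log_path.read_text(encoding="utf-8").splitlines():
        record = json.loads(line)
        delta = record.get("text") or record.get("arguments")
        if delta:
            print(delta, end="", flush=True)
    print(f"\n--- replayed {log_path.name} ---")
```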

openhands/sdk/__init__.py

Lines changed: 2 additions & 2 deletions
```diff
@@ -20,7 +20,7 @@
     LLM,
     ImageContent,
     LLMRegistry,
-    LLMStreamEvent,
+    LLMStreamChunk,
     Message,
     RedactedThinkingBlock,
     RegistryEvent,
@@ -60,7 +60,7 @@
 __all__ = [
     "LLM",
     "LLMRegistry",
-    "LLMStreamEvent",
+    "LLMStreamChunk",
     "TokenCallbackType",
     "ConversationStats",
     "RegistryEvent",
```

openhands/sdk/conversation/impl/local_conversation.py

Lines changed: 4 additions & 4 deletions
```diff
@@ -20,7 +20,7 @@
     StreamingDeltaEvent,
     UserRejectObservation,
 )
-from openhands.sdk.llm import LLM, LLMStreamEvent, Message, TextContent
+from openhands.sdk.llm import LLM, LLMStreamChunk, Message, TextContent
 from openhands.sdk.llm.llm_registry import LLMRegistry
 from openhands.sdk.logger import get_logger
 from openhands.sdk.security.confirmation_policy import (
@@ -129,15 +129,15 @@ def _composed(event):
             _compose_token_callbacks(token_callbacks) if token_callbacks else None
         )
 
-        def _handle_stream_event(stream_event: LLMStreamEvent) -> None:
+        def _handle_stream_event(stream_chunk: LLMStreamChunk) -> None:
             try:
                 self._on_event(
-                    StreamingDeltaEvent(source="agent", stream_event=stream_event)
+                    StreamingDeltaEvent(source="agent", stream_chunk=stream_chunk)
                 )
             except Exception:
                 logger.exception("stream_event_processing_error", exc_info=True)
             if user_token_callback:
-                user_token_callback(stream_event)
+                user_token_callback(stream_chunk)
 
         self._on_token = _handle_stream_event
 
```
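`_compose_token_callbacks` is referenced here but not defined in this excerpt; a plausible sketch of such a composer, assuming the exported `TokenCallbackType` is a plain callable over chunks:

```python
from collections.abc import Callable

from openhands.sdk import LLMStreamChunk

# Assumed alias: the SDK exports TokenCallbackType, but its definition is
# not visible in this diff.
TokenCallbackType = Callable[[LLMStreamChunk], None]


def _compose_token_callbacks(callbacks: list[TokenCallbackType]) -> TokenCallbackType:
    """Fan one chunk out to every registered callback, in order."""

    def composed(chunk: LLMStreamChunk) -> None:
        for callback in callbacks:
            callback(chunk)

    return composed
```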
