Commit db1963e

Add streaming support for Responses API
1 parent 4ffaa97 commit db1963e

File tree: 18 files changed, +824 −22 lines

18 files changed

+824
-22
lines changed

README.md

Lines changed: 25 additions & 0 deletions

@@ -141,6 +141,31 @@ registry.add("default", llm)
 llm = registry.get("default")
 ```
 
+### Streaming Responses
+
+You can receive incremental deltas from the Responses API by supplying a token
+callback when constructing a conversation. Each callback receives an
+``LLMStreamEvent`` describing the delta.
+
+```python
+from pathlib import Path
+from openhands.sdk import Conversation, LLMStreamEvent
+
+log_dir = Path("logs/stream")
+log_dir.mkdir(parents=True, exist_ok=True)
+
+def on_token(event: LLMStreamEvent) -> None:
+    print(event.text or event.arguments or "", end="", flush=True)
+
+conversation = Conversation(agent=agent, token_callbacks=[on_token])
+conversation.send_message("Summarize the benefits of token streaming.")
+conversation.run()
+```
+
+See `examples/01_standalone_sdk/24_responses_streaming.py` for a complete
+example that also persists each delta as JSON in `./logs/stream/`.
+
 ### Tools
 
 Tools provide agents with capabilities to interact with the environment. The SDK includes several built-in tools:
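
Beyond the token-callback path shown in the README, this commit also routes every delta through the conversation's regular event stream: `local_conversation.py` (below) wraps each `LLMStreamEvent` in a `StreamingDeltaEvent` before dispatching it via `_on_event`. A minimal sketch of consuming deltas that way, assuming `StreamingDeltaEvent` exposes the wrapped delta as its `stream_event` field, as the constructor call in this commit suggests:

```python
from openhands.sdk import Conversation
from openhands.sdk.event import StreamingDeltaEvent

def on_event(event) -> None:
    # Ordinary conversation callbacks see every event; filter for deltas.
    if isinstance(event, StreamingDeltaEvent):
        delta = event.stream_event  # the wrapped LLMStreamEvent
        print(delta.text or delta.arguments or "", end="", flush=True)

conversation = Conversation(agent=agent, callbacks=[on_event])
```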
examples/01_standalone_sdk/24_responses_streaming.py

Lines changed: 98 additions & 0 deletions

@@ -0,0 +1,98 @@
+"""Streaming Responses API example.
+
+This demonstrates how to enable token streaming for the Responses API path,
+log streaming deltas to ``./logs/stream/`` as JSON, and print the streamed text
+incrementally to the terminal.
+"""
+
+from __future__ import annotations
+
+import datetime as _dt
+import json
+import os
+from pathlib import Path
+from typing import Any
+
+from pydantic import SecretStr
+
+from openhands.sdk import Conversation, LLMStreamEvent, get_logger
+from openhands.sdk.llm import LLM
+from openhands.tools.preset.default import get_default_agent
+
+
+logger = get_logger(__name__)
+LOG_DIR = Path("logs/stream")
+
+
+def _serialize_event(event: LLMStreamEvent) -> dict[str, Any]:
+    record = {
+        "type": event.type,
+        "text": event.text,
+        "arguments": event.arguments,
+        "output_index": event.output_index,
+        "content_index": event.content_index,
+        "item_id": event.item_id,
+        "is_final": event.is_final,
+    }
+    return record
+
+
+def main() -> None:
+    api_key = os.getenv("LLM_API_KEY") or os.getenv("OPENAI_API_KEY")
+    if not api_key:
+        raise RuntimeError("Set LLM_API_KEY or OPENAI_API_KEY in your environment.")
+
+    model = os.getenv("LLM_MODEL", "openhands/gpt-5-codex")
+    base_url = os.getenv("LLM_BASE_URL")
+
+    llm = LLM(
+        model=model,
+        api_key=SecretStr(api_key),
+        base_url=base_url,
+        service_id="stream-demo",
+    )
+
+    agent = get_default_agent(llm=llm, cli_mode=True)
+
+    timestamp = _dt.datetime.utcnow().strftime("%Y%m%d-%H%M%S")
+    LOG_DIR.mkdir(parents=True, exist_ok=True)
+    log_path = LOG_DIR / f"responses_stream_{timestamp}.jsonl"
+
+    def on_token(event: LLMStreamEvent) -> None:
+        record = _serialize_event(event)
+        with log_path.open("a", encoding="utf-8") as fp:
+            fp.write(json.dumps(record) + "\n")
+
+        stream_chunk = event.text or event.arguments
+        if stream_chunk:
+            print(stream_chunk, end="", flush=True)
+        if event.is_final:
+            print("\n--- stream complete ---")
+
+    conversation = Conversation(
+        agent=agent,
+        workspace=os.getcwd(),
+        token_callbacks=[on_token],
+    )
+
+    story_prompt = (
+        "Compose a vivid, many-paragraph story about a developer discovering "
+        "the value of streaming token updates. Stream the narrative as you "
+        "write it, and when you complete the story, save the full text to a "
+        "file named 'streaming_story.md'."
+    )
+    conversation.send_message(story_prompt)
+    conversation.run()
+
+    cleanup_prompt = (
+        "Thank you. Please delete streaming_story.md now that I've read it, "
+        "then confirm the deletion."
+    )
+    conversation.send_message(cleanup_prompt)
+    conversation.run()
+
+    logger.info("Stream log written to %s", log_path)
+
+
+if __name__ == "__main__":
+    main()
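
A usage note on the example above: because each delta is appended as one JSON object per line, the log can be replayed offline. A small sketch, assuming at least one `responses_stream_*.jsonl` file already exists under `logs/stream/` (the keys match `_serialize_event`):

```python
import json
from pathlib import Path

# Pick the most recent stream log produced by the example above.
log_path = sorted(Path("logs/stream").glob("responses_stream_*.jsonl"))[-1]
with log_path.open(encoding="utf-8") as fp:
    for line in fp:
        record = json.loads(line)
        # Each record typically carries either a text delta or a
        # tool-call arguments fragment, mirroring the on_token printer.
        print(record["text"] or record["arguments"] or "", end="", flush=True)
```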

openhands/sdk/__init__.py

Lines changed: 4 additions & 0 deletions

@@ -20,11 +20,13 @@
     LLM,
     ImageContent,
     LLMRegistry,
+    LLMStreamEvent,
     Message,
     RedactedThinkingBlock,
     RegistryEvent,
     TextContent,
     ThinkingBlock,
+    TokenCallbackType,
 )
 from openhands.sdk.logger import get_logger
 from openhands.sdk.mcp import (
@@ -58,6 +60,8 @@
 __all__ = [
     "LLM",
     "LLMRegistry",
+    "LLMStreamEvent",
+    "TokenCallbackType",
     "ConversationStats",
     "RegistryEvent",
     "Message",

openhands/sdk/agent/agent.py

Lines changed: 8 additions & 1 deletion

@@ -5,7 +5,11 @@
 import openhands.sdk.security.risk as risk
 from openhands.sdk.agent.base import AgentBase
 from openhands.sdk.context.view import View
-from openhands.sdk.conversation import ConversationCallbackType, ConversationState
+from openhands.sdk.conversation import (
+    ConversationCallbackType,
+    ConversationState,
+    ConversationTokenCallbackType,
+)
 from openhands.sdk.conversation.state import AgentExecutionStatus
 from openhands.sdk.event import (
     ActionEvent,
@@ -133,6 +137,7 @@ def step(
         self,
         state: ConversationState,
         on_event: ConversationCallbackType,
+        on_token: ConversationTokenCallbackType | None = None,
     ) -> None:
         # Check for pending actions (implicit confirmation)
         # and execute them before sampling new actions.
@@ -182,13 +187,15 @@ def step(
                     store=False,
                     add_security_risk_prediction=self._add_security_risk_prediction,
                     metadata=self.llm.metadata,
+                    on_token=on_token,
                 )
             else:
                 llm_response = self.llm.completion(
                     messages=_messages,
                     tools=list(self.tools_map.values()),
                     extra_body={"metadata": self.llm.metadata},
                     add_security_risk_prediction=self._add_security_risk_prediction,
+                    on_token=on_token,
                 )
         except Exception as e:
             # If there is a condenser registered and the exception is a context window

openhands/sdk/agent/base.py

Lines changed: 8 additions & 1 deletion

@@ -22,7 +22,10 @@
 
 if TYPE_CHECKING:
     from openhands.sdk.conversation.state import ConversationState
-    from openhands.sdk.conversation.types import ConversationCallbackType
+    from openhands.sdk.conversation.types import (
+        ConversationCallbackType,
+        ConversationTokenCallbackType,
+    )
 
 logger = get_logger(__name__)
 
@@ -236,6 +239,7 @@ def step(
         self,
         state: "ConversationState",
         on_event: "ConversationCallbackType",
+        on_token: "ConversationTokenCallbackType | None" = None,
     ) -> None:
         """Taking a step in the conversation.
 
@@ -247,6 +251,9 @@ def step(
         4.1 If conversation is finished, set state.agent_status to FINISHED
         4.2 Otherwise, just return, Conversation will kick off the next step
 
+        If the underlying LLM supports streaming, partial deltas are forwarded to
+        ``on_token`` before the full response is returned.
+
         NOTE: state will be mutated in-place.
         """
openhands/sdk/conversation/__init__.py

Lines changed: 5 additions & 1 deletion

@@ -7,7 +7,10 @@
 from openhands.sdk.conversation.secrets_manager import SecretsManager
 from openhands.sdk.conversation.state import ConversationState
 from openhands.sdk.conversation.stuck_detector import StuckDetector
-from openhands.sdk.conversation.types import ConversationCallbackType
+from openhands.sdk.conversation.types import (
+    ConversationCallbackType,
+    ConversationTokenCallbackType,
+)
 from openhands.sdk.conversation.visualizer import ConversationVisualizer
 
 
@@ -16,6 +19,7 @@
     "BaseConversation",
     "ConversationState",
     "ConversationCallbackType",
+    "ConversationTokenCallbackType",
     "ConversationVisualizer",
     "SecretsManager",
     "StuckDetector",

openhands/sdk/conversation/conversation.py

Lines changed: 10 additions & 1 deletion

@@ -3,7 +3,11 @@
 from openhands.sdk.agent.base import AgentBase
 from openhands.sdk.conversation.base import BaseConversation
 from openhands.sdk.conversation.secrets_manager import SecretValue
-from openhands.sdk.conversation.types import ConversationCallbackType, ConversationID
+from openhands.sdk.conversation.types import (
+    ConversationCallbackType,
+    ConversationID,
+    ConversationTokenCallbackType,
+)
 from openhands.sdk.logger import get_logger
 from openhands.sdk.workspace import LocalWorkspace, RemoteWorkspace
 
@@ -32,6 +36,7 @@ def __new__(
         persistence_dir: str | None = None,
         conversation_id: ConversationID | None = None,
         callbacks: list[ConversationCallbackType] | None = None,
+        token_callbacks: list[ConversationTokenCallbackType] | None = None,
         max_iteration_per_run: int = 500,
         stuck_detection: bool = True,
         visualize: bool = True,
@@ -46,6 +51,7 @@ def __new__(
         workspace: RemoteWorkspace,
         conversation_id: ConversationID | None = None,
         callbacks: list[ConversationCallbackType] | None = None,
+        token_callbacks: list[ConversationTokenCallbackType] | None = None,
         max_iteration_per_run: int = 500,
         stuck_detection: bool = True,
         visualize: bool = True,
@@ -60,6 +66,7 @@ def __new__(
         persistence_dir: str | None = None,
         conversation_id: ConversationID | None = None,
         callbacks: list[ConversationCallbackType] | None = None,
+        token_callbacks: list[ConversationTokenCallbackType] | None = None,
        max_iteration_per_run: int = 500,
         stuck_detection: bool = True,
         visualize: bool = True,
@@ -81,6 +88,7 @@ def __new__(
             agent=agent,
             conversation_id=conversation_id,
             callbacks=callbacks,
+            token_callbacks=token_callbacks,
             max_iteration_per_run=max_iteration_per_run,
             stuck_detection=stuck_detection,
             visualize=visualize,
@@ -92,6 +100,7 @@ def __new__(
             agent=agent,
             conversation_id=conversation_id,
             callbacks=callbacks,
+            token_callbacks=token_callbacks,
             max_iteration_per_run=max_iteration_per_run,
             stuck_detection=stuck_detection,
             visualize=visualize,

openhands/sdk/conversation/impl/local_conversation.py

Lines changed: 38 additions & 3 deletions

@@ -8,14 +8,19 @@
 from openhands.sdk.conversation.state import AgentExecutionStatus, ConversationState
 from openhands.sdk.conversation.stuck_detector import StuckDetector
 from openhands.sdk.conversation.title_utils import generate_conversation_title
-from openhands.sdk.conversation.types import ConversationCallbackType, ConversationID
+from openhands.sdk.conversation.types import (
+    ConversationCallbackType,
+    ConversationID,
+    ConversationTokenCallbackType,
+)
 from openhands.sdk.conversation.visualizer import create_default_visualizer
 from openhands.sdk.event import (
     MessageEvent,
     PauseEvent,
+    StreamingDeltaEvent,
     UserRejectObservation,
 )
-from openhands.sdk.llm import LLM, Message, TextContent
+from openhands.sdk.llm import LLM, LLMStreamEvent, Message, TextContent
 from openhands.sdk.llm.llm_registry import LLMRegistry
 from openhands.sdk.logger import get_logger
 from openhands.sdk.security.confirmation_policy import (
@@ -35,6 +40,7 @@ def __init__(
         persistence_dir: str | None = None,
         conversation_id: ConversationID | None = None,
         callbacks: list[ConversationCallbackType] | None = None,
+        token_callbacks: list[ConversationTokenCallbackType] | None = None,
         max_iteration_per_run: int = 500,
         stuck_detection: bool = True,
         visualize: bool = True,
@@ -110,6 +116,31 @@ def _default_callback(e):
         for llm in list(self.agent.get_all_llms()):
             self.llm_registry.add(llm)
 
+        def _compose_token_callbacks(
+            callbacks: list[ConversationTokenCallbackType],
+        ) -> ConversationTokenCallbackType:
+            def _composed(event):
+                for cb in callbacks:
+                    cb(event)
+
+            return _composed
+
+        user_token_callback = (
+            _compose_token_callbacks(token_callbacks) if token_callbacks else None
+        )
+
+        def _handle_stream_event(stream_event: LLMStreamEvent) -> None:
+            try:
+                self._on_event(
+                    StreamingDeltaEvent(source="agent", stream_event=stream_event)
+                )
+            except Exception:
+                logger.exception("stream_event_processing_error", exc_info=True)
+            if user_token_callback:
+                user_token_callback(stream_event)
+
+        self._on_token = _handle_stream_event
+
         # Initialize secrets if provided
         if secrets:
             # Convert dict[str, str] to dict[str, SecretValue]
@@ -242,7 +273,11 @@ def run(self) -> None:
                 self._state.agent_status = AgentExecutionStatus.RUNNING
 
                 # step must mutate the SAME state object
-                self.agent.step(self._state, on_event=self._on_event)
+                self.agent.step(
+                    self._state,
+                    on_event=self._on_event,
+                    on_token=self._on_token,
+                )
                 iteration += 1
 
                 # Check for non-finished terminal conditions
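
`_compose_token_callbacks` above fans a single stream event out to every registered token callback in registration order. A standalone sketch of that pattern, with `str` events standing in for `LLMStreamEvent`:

```python
from typing import Callable

def compose(callbacks: list[Callable[[str], None]]) -> Callable[[str], None]:
    def composed(event: str) -> None:
        # Invoke every callback with the same event, preserving order.
        for cb in callbacks:
            cb(event)
    return composed

seen: list[str] = []
fanout = compose([lambda e: seen.append(f"first:{e}"),
                  lambda e: seen.append(f"second:{e}")])
fanout("delta")
assert seen == ["first:delta", "second:delta"]
```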
