Merged

Changes from all commits (46 commits)
d149370
Add streaming support for Responses API
enyst Oct 19, 2025
d331abf
Document LLM streaming refactor plan
enyst Oct 20, 2025
e31b728
Refactor streaming chunk model and visualizer
enyst Oct 20, 2025
3983ce4
Merge remote-tracking branch 'upstream/main' into streaming-responses
enyst Oct 21, 2025
a341d0e
Merge remote-tracking branch 'upstream/main' into streaming-responses
enyst Oct 21, 2025
031fcf1
Merge remote-tracking branch 'upstream/main' into streaming-responses
enyst Oct 23, 2025
287c9c2
Merge branch 'main' into streaming-responses
enyst Oct 23, 2025
21bcaa5
Merge main branch into streaming-responses
openhands-agent Nov 14, 2025
f920696
Merge branch 'main' into streaming-responses
enyst Nov 20, 2025
a65dbda
Simplify streaming visualizer and always-persist streaming panels
enyst Nov 20, 2025
27f9653
Merge main into streaming-responses and resolve conflicts
openhands-agent Nov 25, 2025
dbbd0cf
Fix merge conflicts and type errors after merging main
openhands-agent Nov 25, 2025
7ac405d
Fix circular import and update tests for streaming API
openhands-agent Nov 25, 2025
847eaaa
Trigger CI re-run
openhands-agent Nov 25, 2025
80c06f7
remove md
xingyaoww Nov 26, 2025
9859171
rename example
xingyaoww Nov 26, 2025
71fce09
make LLMStreamChunk a basemodel
xingyaoww Nov 26, 2025
6a67bac
clean up some merges
xingyaoww Nov 26, 2025
ab8961a
simplify local convo and remove streaming event since that's probably…
xingyaoww Nov 26, 2025
fa57f08
update the right init
xingyaoww Nov 26, 2025
66e2092
rm streaming visualizer
xingyaoww Nov 26, 2025
9d1914c
some attempt to simplify
xingyaoww Nov 26, 2025
2491734
revert facts
xingyaoww Nov 26, 2025
777f4de
remove extra tests
xingyaoww Nov 26, 2025
db995d8
implement chat completion streaming
xingyaoww Nov 26, 2025
06cf551
fix
xingyaoww Nov 26, 2025
95622ba
fix chunk
xingyaoww Nov 26, 2025
f7a07fa
simplify example
xingyaoww Nov 26, 2025
df87e8e
get streaming example to work!
xingyaoww Nov 26, 2025
d7734c6
ignore warnings
xingyaoww Nov 26, 2025
5b6a58b
Fix failing tests and pre-commit checks for streaming implementation
openhands-agent Nov 26, 2025
38e2fd6
update streaming example
xingyaoww Nov 26, 2025
f34ccd1
Merge branch 'main' into xw/completions-streaming
xingyaoww Nov 26, 2025
7e7fd35
remove unused metadata
xingyaoww Nov 26, 2025
7f8cd32
Update openhands-sdk/openhands/sdk/conversation/impl/local_conversati…
xingyaoww Nov 26, 2025
f223f05
revert loop
xingyaoww Nov 26, 2025
767741e
Merge commit '7f8cd32533928a41f00e06675d003d3b2c34cc92' into xw/compl…
xingyaoww Nov 26, 2025
39db2f3
move imports
xingyaoww Nov 26, 2025
1753bbc
Revert "move imports"
xingyaoww Nov 26, 2025
48584ab
add a comment
xingyaoww Nov 26, 2025
8ee4341
report example cost
xingyaoww Nov 26, 2025
cd1bbb0
revert tests for responses API which is not implemented yet
xingyaoww Nov 26, 2025
ca4418e
Fix failing tests to match streaming implementation
openhands-agent Nov 26, 2025
c7819bf
Replace Responses API streaming tests with Chat Completion streaming …
openhands-agent Nov 26, 2025
ccfb3e6
Merge branch 'main' into xw/completions-streaming
xingyaoww Nov 26, 2025
8be913e
Remove unnecessary metadata mocking from test_agent_utils
openhands-agent Nov 26, 2025
131 changes: 131 additions & 0 deletions examples/01_standalone_sdk/29_llm_streaming.py
@@ -0,0 +1,131 @@
import os
import sys
from typing import Literal

from pydantic import SecretStr

from openhands.sdk import (
    Conversation,
    get_logger,
)
from openhands.sdk.llm import LLM
from openhands.sdk.llm.streaming import ModelResponseStream
from openhands.tools.preset.default import get_default_agent


logger = get_logger(__name__)


api_key = os.getenv("LLM_API_KEY") or os.getenv("OPENAI_API_KEY")
if not api_key:
    raise RuntimeError("Set LLM_API_KEY or OPENAI_API_KEY in your environment.")

model = os.getenv("LLM_MODEL", "anthropic/claude-sonnet-4-5-20250929")
base_url = os.getenv("LLM_BASE_URL")
llm = LLM(
    model=model,
    api_key=SecretStr(api_key),
    base_url=base_url,
    usage_id="stream-demo",
    stream=True,
)

agent = get_default_agent(llm=llm, cli_mode=True)


# Define streaming states
StreamingState = Literal["thinking", "content", "tool_name", "tool_args"]
# Track state across on_token calls for boundary detection
_current_state: StreamingState | None = None


[Inline review thread on on_token]

Collaborator: Oh, I think this maybe belongs in the visualizer? Otherwise it doesn't work for anything else; every client would need to rewrite this.

Collaborator (author): I think getting streaming supported in the visualizer is probably a bit too advanced for now: we'd need to figure out edge cases and come up with a standard data structure for streaming responses, since different models may return different things and LiteLLM did not unify this. In this PR I was mainly hoping to get the scaffold / initial MVP for streaming, without going too deep into the rabbit hole, to keep the PR size and scope reasonable 🤣

Maybe we can do a visualizer in a later PR; in the meantime it's good to at least have some level of streaming ability.

Collaborator: Yes! I was actually thinking about this in the back of my mind, and I think that is totally the way to go. Let's get this in and take it from here.

The essential structure is fine, and as it is, I think it unlocks some potential for client developers to build further or improve.

def on_token(chunk: ModelResponseStream) -> None:
"""
Handle all types of streaming tokens including content,
tool calls, and thinking blocks with dynamic boundary detection.
"""
global _current_state

choices = chunk.choices
for choice in choices:
delta = choice.delta
if delta is not None:
# Handle thinking blocks (reasoning content)
reasoning_content = getattr(delta, "reasoning_content", None)
if isinstance(reasoning_content, str) and reasoning_content:
if _current_state != "thinking":
if _current_state is not None:
sys.stdout.write("\n")
sys.stdout.write("THINKING: ")
_current_state = "thinking"
sys.stdout.write(reasoning_content)
sys.stdout.flush()

# Handle regular content
content = getattr(delta, "content", None)
if isinstance(content, str) and content:
if _current_state != "content":
if _current_state is not None:
sys.stdout.write("\n")
sys.stdout.write("CONTENT: ")
_current_state = "content"
sys.stdout.write(content)
sys.stdout.flush()

# Handle tool calls
tool_calls = getattr(delta, "tool_calls", None)
if tool_calls:
for tool_call in tool_calls:
tool_name = (
tool_call.function.name if tool_call.function.name else ""
)
tool_args = (
tool_call.function.arguments
if tool_call.function.arguments
else ""
)
if tool_name:
if _current_state != "tool_name":
if _current_state is not None:
sys.stdout.write("\n")
sys.stdout.write("TOOL NAME: ")
_current_state = "tool_name"
sys.stdout.write(tool_name)
sys.stdout.flush()
if tool_args:
if _current_state != "tool_args":
if _current_state is not None:
sys.stdout.write("\n")
sys.stdout.write("TOOL ARGS: ")
_current_state = "tool_args"
sys.stdout.write(tool_args)
sys.stdout.flush()


conversation = Conversation(
    agent=agent,
    workspace=os.getcwd(),
    token_callbacks=[on_token],
)

story_prompt = (
"Tell me a long story about LLM streaming, write it a file, "
"make sure it has multiple paragraphs. "
)
conversation.send_message(story_prompt)
print("Token Streaming:")
print("-" * 100 + "\n")
conversation.run()

cleanup_prompt = (
"Thank you. Please delete the streaming story file now that I've read it, "
"then confirm the deletion."
)
conversation.send_message(cleanup_prompt)
print("Token Streaming:")
print("-" * 100 + "\n")
conversation.run()

# Report cost
cost = llm.metrics.accumulated_cost
print(f"EXAMPLE_COST: {cost}")
4 changes: 4 additions & 0 deletions openhands-sdk/openhands/sdk/__init__.py
@@ -21,11 +21,13 @@
    LLM,
    ImageContent,
    LLMRegistry,
    LLMStreamChunk,
    Message,
    RedactedThinkingBlock,
    RegistryEvent,
    TextContent,
    ThinkingBlock,
    TokenCallbackType,
)
from openhands.sdk.logger import get_logger
from openhands.sdk.mcp import (
@@ -58,6 +60,8 @@
__all__ = [
"LLM",
"LLMRegistry",
"LLMStreamChunk",
"TokenCallbackType",
"ConversationStats",
"RegistryEvent",
"Message",
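With these exports in place, client code can import the streaming types from the package root. A small sketch; which of LLMStreamChunk or ModelResponseStream a token callback actually receives is not shown in this diff, so treat the annotation as an assumption:

from openhands.sdk import LLMStreamChunk, TokenCallbackType


def log_chunk(chunk: LLMStreamChunk) -> None:
    # Forward or inspect the raw streaming chunk here.
    print(type(chunk).__name__)


callback: TokenCallbackType = log_chunk  # assumed to satisfy the public alias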
7 changes: 6 additions & 1 deletion openhands-sdk/openhands/sdk/agent/agent.py
@@ -13,6 +13,7 @@
from openhands.sdk.conversation import (
    ConversationCallbackType,
    ConversationState,
    ConversationTokenCallbackType,
    LocalConversation,
)
from openhands.sdk.conversation.state import ConversationExecutionStatus
@@ -135,6 +136,7 @@ def step(
        self,
        conversation: LocalConversation,
        on_event: ConversationCallbackType,
        on_token: ConversationTokenCallbackType | None = None,
    ) -> None:
        state = conversation.state
        # Check for pending actions (implicit confirmation)
@@ -167,7 +169,10 @@

        try:
            llm_response = make_llm_completion(
-                self.llm, _messages, tools=list(self.tools_map.values())
+                self.llm,
+                _messages,
+                tools=list(self.tools_map.values()),
+                on_token=on_token,
            )
        except FunctionCallValidationError as e:
            logger.warning(f"LLM generated malformed function call: {e}")
9 changes: 8 additions & 1 deletion openhands-sdk/openhands/sdk/agent/base.py
@@ -20,7 +20,10 @@

if TYPE_CHECKING:
    from openhands.sdk.conversation import ConversationState, LocalConversation
-    from openhands.sdk.conversation.types import ConversationCallbackType
+    from openhands.sdk.conversation.types import (
+        ConversationCallbackType,
+        ConversationTokenCallbackType,
+    )


logger = get_logger(__name__)
@@ -239,6 +242,7 @@ def step(
        self,
        conversation: "LocalConversation",
        on_event: "ConversationCallbackType",
        on_token: "ConversationTokenCallbackType | None" = None,
    ) -> None:
        """Taking a step in the conversation.

@@ -250,6 +254,9 @@
        4.1 If conversation is finished, set state.execution_status to FINISHED
        4.2 Otherwise, just return, Conversation will kick off the next step

        If the underlying LLM supports streaming, partial deltas are forwarded to
        ``on_token`` before the full response is returned.

        NOTE: state will be mutated in-place.
        """

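Since on_token defaults to None, existing callers of step keep working. A minimal sketch of driving one step with both callbacks, assuming an agent and a LocalConversation named conversation already exist:

# Sketch: one agent step with an event callback and a token callback.
def on_event(event) -> None:
    print(f"[event] {type(event).__name__}")


def on_token(chunk) -> None:
    print("[token]", end="", flush=True)


agent.step(conversation, on_event=on_event, on_token=on_token)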
5 changes: 5 additions & 0 deletions openhands-sdk/openhands/sdk/agent/utils.py
@@ -12,6 +12,7 @@

from openhands.sdk.context.condenser.base import CondenserBase
from openhands.sdk.context.view import View
from openhands.sdk.conversation.types import ConversationTokenCallbackType
from openhands.sdk.event.base import Event, LLMConvertibleEvent
from openhands.sdk.event.condenser import Condensation
from openhands.sdk.llm import LLM, LLMResponse, Message
@@ -182,13 +183,15 @@ def make_llm_completion(
    llm: LLM,
    messages: list[Message],
    tools: list[ToolDefinition] | None = None,
    on_token: ConversationTokenCallbackType | None = None,
) -> LLMResponse:
    """Make an LLM completion call with the provided messages and tools.

    Args:
        llm: The LLM instance to use for completion
        messages: The messages to send to the LLM
        tools: Optional list of tools to provide to the LLM
        on_token: Optional callback for streaming token updates

    Returns:
        LLMResponse from the LLM completion call
@@ -200,10 +203,12 @@
            include=None,
            store=False,
            add_security_risk_prediction=True,
            on_token=on_token,
        )
    else:
        return llm.completion(
            messages=messages,
            tools=tools or [],
            add_security_risk_prediction=True,
            on_token=on_token,
        )
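
A hedged sketch of calling this helper directly with a streaming callback; the Message/TextContent construction follows the exports shown earlier in this diff, but the exact message shape is an assumption:

from openhands.sdk import Message, TextContent
from openhands.sdk.agent.utils import make_llm_completion


def print_text(chunk) -> None:
    # Print only text deltas; tool-call chunks are ignored for brevity.
    for choice in chunk.choices:
        delta = choice.delta
        if delta is not None and getattr(delta, "content", None):
            print(delta.content, end="", flush=True)


# `llm` is an LLM configured with stream=True, as in the example above.
response = make_llm_completion(
    llm,
    messages=[Message(role="user", content=[TextContent(text="Say hi.")])],
    tools=[],
    on_token=print_text,
)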
6 changes: 5 additions & 1 deletion openhands-sdk/openhands/sdk/conversation/__init__.py
@@ -11,7 +11,10 @@
    ConversationState,
)
from openhands.sdk.conversation.stuck_detector import StuckDetector
-from openhands.sdk.conversation.types import ConversationCallbackType
+from openhands.sdk.conversation.types import (
+    ConversationCallbackType,
+    ConversationTokenCallbackType,
+)
from openhands.sdk.conversation.visualizer import (
    ConversationVisualizerBase,
    DefaultConversationVisualizer,
@@ -24,6 +27,7 @@
"ConversationState",
"ConversationExecutionStatus",
"ConversationCallbackType",
"ConversationTokenCallbackType",
"DefaultConversationVisualizer",
"ConversationVisualizerBase",
"SecretRegistry",
21 changes: 15 additions & 6 deletions openhands-sdk/openhands/sdk/conversation/base.py
@@ -1,12 +1,16 @@
from abc import ABC, abstractmethod
from collections.abc import Iterable, Mapping
from pathlib import Path
-from typing import TYPE_CHECKING, Protocol
+from typing import TYPE_CHECKING, Protocol, TypeVar, cast

from openhands.sdk.conversation.conversation_stats import ConversationStats
from openhands.sdk.conversation.events_list_base import EventsListBase
from openhands.sdk.conversation.secret_registry import SecretValue
-from openhands.sdk.conversation.types import ConversationCallbackType, ConversationID
+from openhands.sdk.conversation.types import (
+    ConversationCallbackType,
+    ConversationID,
+    ConversationTokenCallbackType,
+)
from openhands.sdk.llm.llm import LLM
from openhands.sdk.llm.message import Message
from openhands.sdk.observability.laminar import (
@@ -27,6 +31,13 @@
    from openhands.sdk.conversation.state import ConversationExecutionStatus


CallbackType = TypeVar(
"CallbackType",
ConversationCallbackType,
ConversationTokenCallbackType,
)


class ConversationStateProtocol(Protocol):
"""Protocol defining the interface for conversation state objects."""

@@ -235,9 +246,7 @@ def ask_agent(self, question: str) -> str:
        ...

    @staticmethod
-    def compose_callbacks(
-        callbacks: Iterable[ConversationCallbackType],
-    ) -> ConversationCallbackType:
+    def compose_callbacks(callbacks: Iterable[CallbackType]) -> CallbackType:
"""Compose multiple callbacks into a single callback function.

        Args:
@@ -252,4 +261,4 @@ def composed(event) -> None:
            if cb:
                cb(event)

-        return composed
+        return cast(CallbackType, composed)
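
Because the TypeVar is constrained to both callback kinds, compose_callbacks now composes event callbacks and token callbacks alike without losing the static type. A small sketch:

from openhands.sdk.conversation.base import BaseConversation


def log_chunk(chunk) -> None:
    print(".", end="", flush=True)


def buffer_chunk(chunk) -> None:
    pass  # e.g. append the delta to an in-memory buffer


# The composed callable has the same callback type as its inputs, so it can be
# passed anywhere a single ConversationTokenCallbackType is expected.
combined = BaseConversation.compose_callbacks([log_chunk, buffer_chunk])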
11 changes: 10 additions & 1 deletion openhands-sdk/openhands/sdk/conversation/conversation.py
@@ -4,7 +4,11 @@
from openhands.sdk.agent.base import AgentBase
from openhands.sdk.conversation.base import BaseConversation
from openhands.sdk.conversation.secret_registry import SecretValue
-from openhands.sdk.conversation.types import ConversationCallbackType, ConversationID
+from openhands.sdk.conversation.types import (
+    ConversationCallbackType,
+    ConversationID,
+    ConversationTokenCallbackType,
+)
from openhands.sdk.conversation.visualizer import (
    ConversationVisualizerBase,
    DefaultConversationVisualizer,
@@ -49,6 +53,7 @@ def __new__(
        persistence_dir: str | Path | None = None,
        conversation_id: ConversationID | None = None,
        callbacks: list[ConversationCallbackType] | None = None,
        token_callbacks: list[ConversationTokenCallbackType] | None = None,
        max_iteration_per_run: int = 500,
        stuck_detection: bool = True,
        visualizer: (
@@ -65,6 +70,7 @@
        workspace: RemoteWorkspace,
        conversation_id: ConversationID | None = None,
        callbacks: list[ConversationCallbackType] | None = None,
        token_callbacks: list[ConversationTokenCallbackType] | None = None,
        max_iteration_per_run: int = 500,
        stuck_detection: bool = True,
        visualizer: (
@@ -81,6 +87,7 @@
        persistence_dir: str | Path | None = None,
        conversation_id: ConversationID | None = None,
        callbacks: list[ConversationCallbackType] | None = None,
        token_callbacks: list[ConversationTokenCallbackType] | None = None,
        max_iteration_per_run: int = 500,
        stuck_detection: bool = True,
        visualizer: (
@@ -104,6 +111,7 @@
            agent=agent,
            conversation_id=conversation_id,
            callbacks=callbacks,
            token_callbacks=token_callbacks,
            max_iteration_per_run=max_iteration_per_run,
            stuck_detection=stuck_detection,
            visualizer=visualizer,
@@ -115,6 +123,7 @@
            agent=agent,
            conversation_id=conversation_id,
            callbacks=callbacks,
            token_callbacks=token_callbacks,
            max_iteration_per_run=max_iteration_per_run,
            stuck_detection=stuck_detection,
            visualizer=visualizer,