88from openhands .sdk .conversation .state import AgentExecutionStatus , ConversationState
99from openhands .sdk .conversation .stuck_detector import StuckDetector
1010from openhands .sdk .conversation .title_utils import generate_conversation_title
11- from openhands .sdk .conversation .types import ConversationCallbackType , ConversationID
11+ from openhands .sdk .conversation .types import (
12+ ConversationCallbackType ,
13+ ConversationID ,
14+ ConversationTokenCallbackType ,
15+ )
1216from openhands .sdk .conversation .visualizer import create_default_visualizer
1317from openhands .sdk .event import (
1418 MessageEvent ,
1519 PauseEvent ,
20+ StreamingDeltaEvent ,
1621 UserRejectObservation ,
1722)
18- from openhands .sdk .llm import LLM , Message , TextContent
23+ from openhands .sdk .llm import LLM , LLMStreamEvent , Message , TextContent
1924from openhands .sdk .llm .llm_registry import LLMRegistry
2025from openhands .sdk .logger import get_logger
2126from openhands .sdk .security .confirmation_policy import (
@@ -35,6 +40,7 @@ def __init__(
3540 persistence_dir : str | None = None ,
3641 conversation_id : ConversationID | None = None ,
3742 callbacks : list [ConversationCallbackType ] | None = None ,
43+ token_callbacks : list [ConversationTokenCallbackType ] | None = None ,
3844 max_iteration_per_run : int = 500 ,
3945 stuck_detection : bool = True ,
4046 visualize : bool = True ,
@@ -110,6 +116,31 @@ def _default_callback(e):
110116 for llm in list (self .agent .get_all_llms ()):
111117 self .llm_registry .add (llm )
112118
119+ def _compose_token_callbacks (
120+ callbacks : list [ConversationTokenCallbackType ],
121+ ) -> ConversationTokenCallbackType :
122+ def _composed (event ):
123+ for cb in callbacks :
124+ cb (event )
125+
126+ return _composed
127+
128+ user_token_callback = (
129+ _compose_token_callbacks (token_callbacks ) if token_callbacks else None
130+ )
131+
132+ def _handle_stream_event (stream_event : LLMStreamEvent ) -> None :
133+ try :
134+ self ._on_event (
135+ StreamingDeltaEvent (source = "agent" , stream_event = stream_event )
136+ )
137+ except Exception :
138+ logger .exception ("stream_event_processing_error" , exc_info = True )
139+ if user_token_callback :
140+ user_token_callback (stream_event )
141+
142+ self ._on_token = _handle_stream_event
143+
113144 # Initialize secrets if provided
114145 if secrets :
115146 # Convert dict[str, str] to dict[str, SecretValue]
@@ -242,7 +273,11 @@ def run(self) -> None:
242273 self ._state .agent_status = AgentExecutionStatus .RUNNING
243274
244275 # step must mutate the SAME state object
245- self .agent .step (self ._state , on_event = self ._on_event )
276+ self .agent .step (
277+ self ._state ,
278+ on_event = self ._on_event ,
279+ on_token = self ._on_token ,
280+ )
246281 iteration += 1
247282
248283 # Check for non-finished terminal conditions
0 commit comments