From 246c914c6f12ef9dfcdb367cde4239e577e0b314 Mon Sep 17 00:00:00 2001 From: Alex Fedotyev <61838744+alex-fedotyev@users.noreply.github.com> Date: Tue, 5 May 2026 04:57:47 +0000 Subject: [PATCH] agent/streaming: ship synthetic done when a turn ends without one Symptom: chats appear stuck mid-step. The session card drops out of the sidebar's "Running" group and into the time-grouped list, but the chat detail keeps showing the "thinking..." spinner because isStreaming was never reset. The previous c439473 fix (tool schemas aborting agent turns) closed one of the underlying causes. Three remaining cases also leave _run_inner without emitting a terminal event: 1. Post-stream exception. After receive_response yields ResultMessage the loop exits cleanly, but the ~80 lines after that in _run_inner contain unguarded post-stream work (add_message, mark_active, update_session_metadata, record_turn_usage). If any of those raise, the exception propagates up to engine.run() before broadcast_done ever fires. 2. Hung CLI cancelled externally. claude_agent_sdk's receive_response documents "If no ResultMessage is received, the iterator continues indefinitely." When the CLI wedges and an outer mechanism cancels the asyncio task, the CancelledError handler sends "stopped" on the normal path, but the WS message can be lost mid-reconnect. 3. Lost WebSocket message during reconnect. The frontend handler had no symmetry: handleSessionRunning enters streaming on is_running:true but never exits on is_running:false, so a missed done/stopped/error leaves isStreaming wedged on. Server fix: StreamBroadcaster gains an _open_turns set with mark_turn_open / is_turn_open / clear_turn_open. broadcast() auto- clears the flag when a terminal event ships (done/stopped/error). engine.run() calls mark_turn_open before _run_inner; its finally checks is_turn_open and ships a synthetic broadcast_done if still set. Single backstop covers all three cases at the source. Frontend fix: handleSessionRunning gets a symmetric exit branch. When the active session reports is_running:false while isStreaming is true, it commits any in-flight streaming blocks to messages, clears streamingBlocks, and resets agentStatus to idle. Belt-and-suspenders for the case where even the synthetic done is missed (e.g. WS reconnect drops the done event but redelivers session_running:false). Adds 5 tests in tests/test_streaming.py::TestOpenTurnTracking covering: each terminal type clears the flag, non-terminal types don't, explicit clear_turn_open, multi-session isolation, and the broadcast_done helper closes the turn. --- nerve/agent/engine.py | 27 +++++++++ nerve/agent/streaming.py | 32 +++++++++++ tests/test_streaming.py | 66 ++++++++++++++++++++++ web/src/stores/handlers/sessionHandlers.ts | 26 +++++++++ 4 files changed, 151 insertions(+) diff --git a/nerve/agent/engine.py b/nerve/agent/engine.py index f48c948..2b6de13 100644 --- a/nerve/agent/engine.py +++ b/nerve/agent/engine.py @@ -1427,6 +1427,11 @@ async def run( self.sessions.mark_running(session_id) if channel is not None: self._active_channel[session_id] = channel + # Mark the turn as in flight so the finally below can + # detect "ended without sending done/stopped/error" and + # ship a synthetic done. Clearing happens automatically + # when a terminal event is broadcast. + broadcaster.mark_turn_open(session_id) # Notify all connected clients that this session started running await broadcaster.broadcast("__global__", { "type": "session_running", @@ -1442,6 +1447,28 @@ async def run( finally: self.sessions.mark_not_running(session_id) self._active_channel.pop(session_id, None) + # Backstop: if _run_inner exited without broadcasting + # done/stopped/error (post-stream DB exception, hung + # CLI cancelled by an outer mechanism, etc.), the + # frontend never learned the turn ended and is still + # showing "thinking..." even though the server has + # cleared is_running. Ship a synthetic done so the + # streaming UI exits cleanly. + if broadcaster.is_turn_open(session_id): + logger.warning( + "Session %s ended without a terminal event " + "(done/stopped/error); sending synthetic done " + "so the frontend exits streaming state", + session_id, + ) + try: + await broadcaster.broadcast_done(session_id) + except Exception as e: + logger.warning( + "Synthetic done broadcast failed for %s: %s", + session_id, e, + ) + broadcaster.clear_turn_open(session_id) broadcaster.stop_buffering(session_id) # Notify all connected clients that this session stopped await broadcaster.broadcast("__global__", { diff --git a/nerve/agent/streaming.py b/nerve/agent/streaming.py index ca0fa3c..84f1202 100644 --- a/nerve/agent/streaming.py +++ b/nerve/agent/streaming.py @@ -50,6 +50,18 @@ def __init__(self, max_buffer_size: int = MAX_BUFFER_SIZE): # Per-session event buffer for reconnect replay self._session_buffers: dict[str, list[dict[str, Any]]] = {} self._max_buffer_size = max_buffer_size + # Per-session "turn is in flight" flag. Set by mark_turn_open() + # at the start of a run, cleared automatically when broadcast() + # ships a terminal event ("done", "stopped", "error"). Used by + # the engine's run() finally as a backstop: if the flag is still + # set after _run_inner exits, no terminal event was sent (post- + # stream exception, hung CLI cancelled externally, etc.) and we + # need to ship a synthetic "done" so the frontend exits its + # streaming UI state. Without this, the chat detail stays on + # "thinking..." forever even though the server has cleared + # is_running and the session card has dropped out of the + # "Running" sidebar group. + self._open_turns: set[str] = set() async def register(self, session_id: str, callback_id: str, callback: BroadcastCallback) -> None: """Register a broadcast listener for a session.""" @@ -72,6 +84,11 @@ async def unregister(self, session_id: str, callback_id: str) -> None: async def broadcast(self, session_id: str, message: dict[str, Any]) -> None: """Send a message to all listeners of a session. Also buffers if active.""" + # Terminal events close the open-turn flag so the engine's + # backstop in run() knows no synthetic "done" is needed. + if message.get("type") in ("done", "stopped", "error"): + self._open_turns.discard(session_id) + # Buffer events during active streaming if session_id in self._session_buffers: buf = self._session_buffers[session_id] @@ -89,6 +106,21 @@ async def broadcast(self, session_id: str, message: dict[str, Any]) -> None: except Exception as e: logger.warning("Broadcast to %s failed: %s", callback_id, e) + # --- Turn tracking (engine backstop) --- + + def mark_turn_open(self, session_id: str) -> None: + """Mark a turn as in flight. Cleared when a terminal event is + broadcast (done/stopped/error) or via clear_turn_open().""" + self._open_turns.add(session_id) + + def is_turn_open(self, session_id: str) -> bool: + """Whether a turn is in flight (no terminal event sent yet).""" + return session_id in self._open_turns + + def clear_turn_open(self, session_id: str) -> None: + """Force-clear the open-turn flag without broadcasting.""" + self._open_turns.discard(session_id) + # --- Buffering for reconnect replay --- def start_buffering(self, session_id: str) -> None: diff --git a/tests/test_streaming.py b/tests/test_streaming.py index 34fa8a1..5499224 100644 --- a/tests/test_streaming.py +++ b/tests/test_streaming.py @@ -108,6 +108,72 @@ async def test_no_buffering_when_not_started(self): assert bc.get_buffer("s1") == [] +@pytest.mark.asyncio +class TestOpenTurnTracking: + """The engine's run() finally relies on open-turn tracking to ship a + synthetic ``done`` when ``_run_inner`` exits without sending a + terminal event. Without it, the chat detail stays on + "thinking..." forever even though the server has cleared + is_running and dropped the session out of the sidebar's "Running" + group. + """ + + async def test_mark_open_and_terminal_clears(self): + bc = StreamBroadcaster() + assert not bc.is_turn_open("s1") + + bc.mark_turn_open("s1") + assert bc.is_turn_open("s1") + + # Each terminal type should clear the flag + for terminal in ("done", "stopped", "error"): + bc.mark_turn_open("s1") + await bc.broadcast("s1", {"type": terminal}) + assert not bc.is_turn_open("s1"), ( + f"{terminal!r} should clear the open-turn flag" + ) + + async def test_non_terminal_does_not_clear(self): + bc = StreamBroadcaster() + bc.mark_turn_open("s1") + + # Streaming events must not close the turn + for nonterminal in ( + {"type": "token", "content": "hi"}, + {"type": "thinking", "content": "..."}, + {"type": "tool_use", "tool": "Bash"}, + {"type": "tool_result", "tool_use_id": "x"}, + ): + await bc.broadcast("s1", nonterminal) + assert bc.is_turn_open("s1"), ( + f"{nonterminal['type']} should not clear the open-turn flag" + ) + + async def test_clear_turn_open_explicit(self): + bc = StreamBroadcaster() + bc.mark_turn_open("s1") + assert bc.is_turn_open("s1") + bc.clear_turn_open("s1") + assert not bc.is_turn_open("s1") + + async def test_open_turns_isolated_per_session(self): + bc = StreamBroadcaster() + bc.mark_turn_open("s1") + bc.mark_turn_open("s2") + await bc.broadcast("s1", {"type": "done"}) + assert not bc.is_turn_open("s1") + assert bc.is_turn_open("s2") + + async def test_broadcast_done_helper_clears_open_turn(self): + # The engine backstop calls broadcaster.broadcast_done() to ship + # the synthetic terminal event. Verify the helper closes the + # turn so a second call wouldn't double-broadcast. + bc = StreamBroadcaster() + bc.mark_turn_open("s1") + await bc.broadcast_done("s1", usage={"input_tokens": 1}) + assert not bc.is_turn_open("s1") + + @pytest.mark.asyncio class TestBoundedBuffers: """Test that buffers are bounded to max_buffer_size.""" diff --git a/web/src/stores/handlers/sessionHandlers.ts b/web/src/stores/handlers/sessionHandlers.ts index c0ed119..bdc9453 100644 --- a/web/src/stores/handlers/sessionHandlers.ts +++ b/web/src/stores/handlers/sessionHandlers.ts @@ -178,6 +178,32 @@ export function handleSessionRunning( updates.streamingBlocks = []; updates.agentStatus = { state: 'thinking' }; } + // Defensive: server says the run ended but the frontend is still in + // streaming mode. This happens when done/stopped/error never made + // it to the client (lost WS message during reconnect, post-stream + // exception on the server before broadcast_done fired, etc.). + // Without this branch the chat detail stays on "thinking..." while + // the sidebar entry has already dropped out of the "Running" group, + // which looks like the chat is stuck between steps. The server's + // backstop in engine.run() ships a synthetic done in most of those + // cases; this is belt-and-suspenders for when even that signal is + // missed. + if (msg.session_id === s.activeSession && !msg.is_running && s.isStreaming) { + const finalBlocks = s.streamingBlocks.map(b => + b.type === 'tool_call' && b.status === 'running' + ? { ...b, status: 'complete' as const } + : b, + ); + if (finalBlocks.length > 0) { + updates.messages = [ + ...s.messages, + { role: 'assistant' as const, blocks: finalBlocks }, + ]; + } + updates.streamingBlocks = []; + updates.isStreaming = false; + updates.agentStatus = { state: 'idle' }; + } return updates; }); }