diff --git a/nerve/agent/engine.py b/nerve/agent/engine.py index f48c948..2b6de13 100644 --- a/nerve/agent/engine.py +++ b/nerve/agent/engine.py @@ -1427,6 +1427,11 @@ async def run( self.sessions.mark_running(session_id) if channel is not None: self._active_channel[session_id] = channel + # Mark the turn as in flight so the finally below can + # detect "ended without sending done/stopped/error" and + # ship a synthetic done. Clearing happens automatically + # when a terminal event is broadcast. + broadcaster.mark_turn_open(session_id) # Notify all connected clients that this session started running await broadcaster.broadcast("__global__", { "type": "session_running", @@ -1442,6 +1447,28 @@ async def run( finally: self.sessions.mark_not_running(session_id) self._active_channel.pop(session_id, None) + # Backstop: if _run_inner exited without broadcasting + # done/stopped/error (post-stream DB exception, hung + # CLI cancelled by an outer mechanism, etc.), the + # frontend never learned the turn ended and is still + # showing "thinking..." even though the server has + # cleared is_running. Ship a synthetic done so the + # streaming UI exits cleanly. + if broadcaster.is_turn_open(session_id): + logger.warning( + "Session %s ended without a terminal event " + "(done/stopped/error); sending synthetic done " + "so the frontend exits streaming state", + session_id, + ) + try: + await broadcaster.broadcast_done(session_id) + except Exception as e: + logger.warning( + "Synthetic done broadcast failed for %s: %s", + session_id, e, + ) + broadcaster.clear_turn_open(session_id) broadcaster.stop_buffering(session_id) # Notify all connected clients that this session stopped await broadcaster.broadcast("__global__", { diff --git a/nerve/agent/streaming.py b/nerve/agent/streaming.py index ca0fa3c..84f1202 100644 --- a/nerve/agent/streaming.py +++ b/nerve/agent/streaming.py @@ -50,6 +50,18 @@ def __init__(self, max_buffer_size: int = MAX_BUFFER_SIZE): # Per-session event buffer for reconnect replay self._session_buffers: dict[str, list[dict[str, Any]]] = {} self._max_buffer_size = max_buffer_size + # Per-session "turn is in flight" flag. Set by mark_turn_open() + # at the start of a run, cleared automatically when broadcast() + # ships a terminal event ("done", "stopped", "error"). Used by + # the engine's run() finally as a backstop: if the flag is still + # set after _run_inner exits, no terminal event was sent (post- + # stream exception, hung CLI cancelled externally, etc.) and we + # need to ship a synthetic "done" so the frontend exits its + # streaming UI state. Without this, the chat detail stays on + # "thinking..." forever even though the server has cleared + # is_running and the session card has dropped out of the + # "Running" sidebar group. + self._open_turns: set[str] = set() async def register(self, session_id: str, callback_id: str, callback: BroadcastCallback) -> None: """Register a broadcast listener for a session.""" @@ -72,6 +84,11 @@ async def unregister(self, session_id: str, callback_id: str) -> None: async def broadcast(self, session_id: str, message: dict[str, Any]) -> None: """Send a message to all listeners of a session. Also buffers if active.""" + # Terminal events close the open-turn flag so the engine's + # backstop in run() knows no synthetic "done" is needed. + if message.get("type") in ("done", "stopped", "error"): + self._open_turns.discard(session_id) + # Buffer events during active streaming if session_id in self._session_buffers: buf = self._session_buffers[session_id] @@ -89,6 +106,21 @@ async def broadcast(self, session_id: str, message: dict[str, Any]) -> None: except Exception as e: logger.warning("Broadcast to %s failed: %s", callback_id, e) + # --- Turn tracking (engine backstop) --- + + def mark_turn_open(self, session_id: str) -> None: + """Mark a turn as in flight. Cleared when a terminal event is + broadcast (done/stopped/error) or via clear_turn_open().""" + self._open_turns.add(session_id) + + def is_turn_open(self, session_id: str) -> bool: + """Whether a turn is in flight (no terminal event sent yet).""" + return session_id in self._open_turns + + def clear_turn_open(self, session_id: str) -> None: + """Force-clear the open-turn flag without broadcasting.""" + self._open_turns.discard(session_id) + # --- Buffering for reconnect replay --- def start_buffering(self, session_id: str) -> None: diff --git a/tests/test_streaming.py b/tests/test_streaming.py index 34fa8a1..5499224 100644 --- a/tests/test_streaming.py +++ b/tests/test_streaming.py @@ -108,6 +108,72 @@ async def test_no_buffering_when_not_started(self): assert bc.get_buffer("s1") == [] +@pytest.mark.asyncio +class TestOpenTurnTracking: + """The engine's run() finally relies on open-turn tracking to ship a + synthetic ``done`` when ``_run_inner`` exits without sending a + terminal event. Without it, the chat detail stays on + "thinking..." forever even though the server has cleared + is_running and dropped the session out of the sidebar's "Running" + group. + """ + + async def test_mark_open_and_terminal_clears(self): + bc = StreamBroadcaster() + assert not bc.is_turn_open("s1") + + bc.mark_turn_open("s1") + assert bc.is_turn_open("s1") + + # Each terminal type should clear the flag + for terminal in ("done", "stopped", "error"): + bc.mark_turn_open("s1") + await bc.broadcast("s1", {"type": terminal}) + assert not bc.is_turn_open("s1"), ( + f"{terminal!r} should clear the open-turn flag" + ) + + async def test_non_terminal_does_not_clear(self): + bc = StreamBroadcaster() + bc.mark_turn_open("s1") + + # Streaming events must not close the turn + for nonterminal in ( + {"type": "token", "content": "hi"}, + {"type": "thinking", "content": "..."}, + {"type": "tool_use", "tool": "Bash"}, + {"type": "tool_result", "tool_use_id": "x"}, + ): + await bc.broadcast("s1", nonterminal) + assert bc.is_turn_open("s1"), ( + f"{nonterminal['type']} should not clear the open-turn flag" + ) + + async def test_clear_turn_open_explicit(self): + bc = StreamBroadcaster() + bc.mark_turn_open("s1") + assert bc.is_turn_open("s1") + bc.clear_turn_open("s1") + assert not bc.is_turn_open("s1") + + async def test_open_turns_isolated_per_session(self): + bc = StreamBroadcaster() + bc.mark_turn_open("s1") + bc.mark_turn_open("s2") + await bc.broadcast("s1", {"type": "done"}) + assert not bc.is_turn_open("s1") + assert bc.is_turn_open("s2") + + async def test_broadcast_done_helper_clears_open_turn(self): + # The engine backstop calls broadcaster.broadcast_done() to ship + # the synthetic terminal event. Verify the helper closes the + # turn so a second call wouldn't double-broadcast. + bc = StreamBroadcaster() + bc.mark_turn_open("s1") + await bc.broadcast_done("s1", usage={"input_tokens": 1}) + assert not bc.is_turn_open("s1") + + @pytest.mark.asyncio class TestBoundedBuffers: """Test that buffers are bounded to max_buffer_size.""" diff --git a/web/src/stores/handlers/sessionHandlers.ts b/web/src/stores/handlers/sessionHandlers.ts index c0ed119..bdc9453 100644 --- a/web/src/stores/handlers/sessionHandlers.ts +++ b/web/src/stores/handlers/sessionHandlers.ts @@ -178,6 +178,32 @@ export function handleSessionRunning( updates.streamingBlocks = []; updates.agentStatus = { state: 'thinking' }; } + // Defensive: server says the run ended but the frontend is still in + // streaming mode. This happens when done/stopped/error never made + // it to the client (lost WS message during reconnect, post-stream + // exception on the server before broadcast_done fired, etc.). + // Without this branch the chat detail stays on "thinking..." while + // the sidebar entry has already dropped out of the "Running" group, + // which looks like the chat is stuck between steps. The server's + // backstop in engine.run() ships a synthetic done in most of those + // cases; this is belt-and-suspenders for when even that signal is + // missed. + if (msg.session_id === s.activeSession && !msg.is_running && s.isStreaming) { + const finalBlocks = s.streamingBlocks.map(b => + b.type === 'tool_call' && b.status === 'running' + ? { ...b, status: 'complete' as const } + : b, + ); + if (finalBlocks.length > 0) { + updates.messages = [ + ...s.messages, + { role: 'assistant' as const, blocks: finalBlocks }, + ]; + } + updates.streamingBlocks = []; + updates.isStreaming = false; + updates.agentStatus = { state: 'idle' }; + } return updates; }); }