From 789e1ea76140998b2365729e8f4aa8e419e1cad8 Mon Sep 17 00:00:00 2001
From: Alex Fedotyev <61838744+alex-fedotyev@users.noreply.github.com>
Date: Wed, 13 May 2026 02:51:10 +0000
Subject: [PATCH] fix(chat): close two WS chat-glitch gaps not covered by #63-#67

User-reported glitch: "I ask for something and there is no response,
then I ask again and it answers to the previous question." Five
reliability PRs (#63 shorthand-schema, #64 synthetic done, #65 stale
sdk_session_id, #66 idle timeout, #67 sticky session) each close one
underlying cause. Two gaps remain that none of those PRs cover.

Gap 1: client-side send silently drops payloads.

web/src/api/websocket.ts checked readyState === OPEN and no-op'd
otherwise. The 3s reconnect window leaves a hole: send() returns to the
caller and chatStore.sendMessage has already optimistically appended the
user message and set isStreaming=true. The user thinks the agent is
thinking but the message never reached the server, so the next reply
lands against a stale prompt.

Track readyState explicitly. CONNECTING or reconnect-scheduled now
queues the payload (bounded to 5 entries; oldest evicted) and flushes
from onopen. CLOSED-without-reconnect and CLOSING return 'dropped' so
the caller can revert. chatStore.sendMessage pops the optimistic user
message on 'dropped' and surfaces an inline assistant error so the user
can retry.

Gap 2: gateway initial-bind never replayed the broadcaster buffer.

The switch_session handler already shipped session_status with
buffered_events on session switch, but the initial-connect handshake at
server.py:286-311 didn't. Reload mid-turn (or a transient 3s WS drop)
and the in-flight stream was lost from the client's view even though the
events sat in broadcaster._session_buffers waiting to be replayed.

Lift the duplicated send-status construction into _send_session_status
and call it from both branches.
Initial-bind gates on broadcaster.is_buffering so idle sessions stay
silent; switch_session calls unconditionally so the client refreshes
is_running/status on every selection.

The frontend handleSessionStatus already restores streamingBlocks,
panels, todos, and interaction state from the buffer (handled by #69),
so this is purely additive at the gateway.

Tests:
- 9 new tests in tests/test_gateway_ws.py covering the helper output,
  the initial-bind gate, the switch_session regression path, and a
  load-fidelity check for buffer ordering.
- Full pytest run: 444 pass, 2 skip, 2 pre-existing failures unrelated
  (test_bootstrap docker-env detection and test_cli_upgrade docker mode,
  both noted in notes/repo-conventions/nerve.md).
- web/ tsc --noEmit clean, npm run build clean.

Out of scope (followups, not blocking):
- Stale-listener cleanup on swallowed send_json exceptions
  (server.py:298-301).
- Application-level message_received ack from engine after
  sessions.add_message.
- _session_locks TTL on session archive.
---
 nerve/gateway/server.py     |  56 +++++++--
 tests/test_gateway_ws.py    | 244 ++++++++++++++++++++++++++++++++++++
 web/src/api/websocket.ts    |  50 +++++++-
 web/src/stores/chatStore.ts |  24 +++-
 4 files changed, 360 insertions(+), 14 deletions(-)
 create mode 100644 tests/test_gateway_ws.py

diff --git a/nerve/gateway/server.py b/nerve/gateway/server.py
index ed928bf..1c907ed 100644
--- a/nerve/gateway/server.py
+++ b/nerve/gateway/server.py
@@ -53,6 +53,32 @@ def get_engine() -> AgentEngine:
     return _engine
 
 
+async def _send_session_status(
+    websocket: WebSocket,
+    session_id: str,
+    is_running: bool,
+    session_record: dict | None,
+) -> None:
+    """Send a ``session_status`` event to the freshly-bound listener.
+
+    Called from the initial WS handshake (only when a turn is in flight so an
+    idle client doesn't get a no-op message) and from ``switch_session``
+    (always, to refresh client-side ``is_running``/``status``). When the
+    session is running, the accumulated stream buffer is attached so the
+    client can rebuild ``streamingBlocks``, panels, todos, and interaction
+    state without waiting for new events.
+    """
+    status_msg: dict = {
+        "type": "session_status",
+        "session_id": session_id,
+        "is_running": is_running,
+        "status": session_record.get("status") if session_record else "unknown",
+    }
+    if is_running:
+        status_msg["buffered_events"] = broadcaster.get_buffer(session_id)
+    await websocket.send_json(status_msg)
+
+
 @asynccontextmanager
 async def lifespan(app: FastAPI):
     """Application lifespan — initialize DB, engine, channels on startup."""
@@ -310,6 +336,18 @@ async def ws_broadcast(session_id: str, message: dict):
             "session_id": active_session,
         })
 
+        # If a turn is mid-flight (page reload, transient WS drop, sticky
+        # reconnect after a network blip), replay the broadcaster buffer so
+        # the freshly-bound listener can rebuild the in-flight stream
+        # without waiting for new events. Idle sessions get nothing here;
+        # they hydrate via REST + the existing ``session_switched`` event.
+        if broadcaster.is_buffering(active_session):
+            is_running = _engine.is_session_running(active_session)
+            session_record = await _engine.db.get_session(active_session)
+            await _send_session_status(
+                websocket, active_session, is_running, session_record,
+            )
+
         try:
             while True:
                 data = await websocket.receive_json()
@@ -368,18 +406,16 @@ async def ws_broadcast(session_id: str, message: dict):
                     # Persist channel mapping so next page load resumes this session
                     await router.switch_session("web:default", new_session)
 
-                    # Send session status (running/idle + buffered events for reconnect)
+                    # Send session status (running/idle + buffered events for
+                    # reconnect). Unlike the initial-bind branch, we always
+                    # ship a status here so the client can flip its
+                    # ``isStreaming`` / ``status`` for the newly-selected
+                    # session even when the session is idle.
                     is_running = _engine.is_session_running(new_session)
                     session_record = await _engine.db.get_session(new_session)
-                    status_msg: dict = {
-                        "type": "session_status",
-                        "session_id": new_session,
-                        "is_running": is_running,
-                        "status": session_record.get("status") if session_record else "unknown",
-                    }
-                    if is_running:
-                        status_msg["buffered_events"] = broadcaster.get_buffer(new_session)
-                    await websocket.send_json(status_msg)
+                    await _send_session_status(
+                        websocket, new_session, is_running, session_record,
+                    )
 
                     await websocket.send_json({
                         "type": "session_switched",
diff --git a/tests/test_gateway_ws.py b/tests/test_gateway_ws.py
new file mode 100644
index 0000000..b00301e
--- /dev/null
+++ b/tests/test_gateway_ws.py
@@ -0,0 +1,244 @@
+"""Tests for nerve.gateway.server WebSocket handshake buffer replay.
+
+The initial WS handshake replays the broadcaster buffer when a turn is in
+flight, and stays silent when the session is idle. The existing
+``switch_session`` path is covered too so the refactor onto
+``_send_session_status`` doesn't regress.
+"""
+
+from __future__ import annotations
+
+import pytest
+
+from nerve.agent.streaming import StreamBroadcaster, broadcaster as _global_broadcaster
+from nerve.gateway.server import _send_session_status
+
+
+class FakeWebSocket:
+    """Minimal WebSocket stand-in that captures ``send_json`` payloads."""
+
+    def __init__(self) -> None:
+        self.sent: list[dict] = []
+
+    async def send_json(self, payload: dict) -> None:
+        self.sent.append(payload)
+
+
+@pytest.fixture(autouse=True)
+def _reset_broadcaster_buffers():
+    """Clear the module-global broadcaster between tests.
+
+    Tests poke ``_global_broadcaster.start_buffering`` directly because the
+    helper reads ``broadcaster.get_buffer`` off the module global, not a
+    parameter. Reset before and after so a failing test can't leak state.
+    """
+    _global_broadcaster._session_buffers.clear()
+    yield
+    _global_broadcaster._session_buffers.clear()
+
+
+# ---------------------------------------------------------------------------
+# _send_session_status helper
+# ---------------------------------------------------------------------------
+
+
+@pytest.mark.asyncio
+class TestSendSessionStatus:
+    """Unit tests for the shared helper called from both WS branches."""
+
+    async def test_running_session_attaches_buffered_events(self):
+        ws = FakeWebSocket()
+        session_id = "sess-running"
+        _global_broadcaster.start_buffering(session_id)
+        await _global_broadcaster.broadcast(session_id, {
+            "type": "token", "session_id": session_id, "content": "hello ",
+        })
+        await _global_broadcaster.broadcast(session_id, {
+            "type": "token", "session_id": session_id, "content": "world",
+        })
+
+        await _send_session_status(
+            ws, session_id, is_running=True,
+            session_record={"status": "active"},
+        )
+
+        assert len(ws.sent) == 1
+        msg = ws.sent[0]
+        assert msg["type"] == "session_status"
+        assert msg["session_id"] == session_id
+        assert msg["is_running"] is True
+        assert msg["status"] == "active"
+        assert "buffered_events" in msg
+        contents = [e["content"] for e in msg["buffered_events"]]
+        assert contents == ["hello ", "world"]
+
+    async def test_idle_session_omits_buffered_events(self):
+        ws = FakeWebSocket()
+        # No start_buffering: the buffer is empty / absent.
+        await _send_session_status(
+            ws, "sess-idle", is_running=False,
+            session_record={"status": "active"},
+        )
+
+        assert len(ws.sent) == 1
+        msg = ws.sent[0]
+        assert msg["is_running"] is False
+        assert msg["status"] == "active"
+        # buffered_events MUST be absent when not running; the frontend
+        # gates buffer replay on its presence, not on length.
+        assert "buffered_events" not in msg
+
+    async def test_missing_session_record_uses_unknown_status(self):
+        ws = FakeWebSocket()
+        await _send_session_status(
+            ws, "sess-gone", is_running=False, session_record=None,
+        )
+
+        assert ws.sent[0]["status"] == "unknown"
+
+    async def test_running_session_with_empty_buffer_still_attaches_list(self):
+        """is_running gates ``buffered_events``; an empty list is still a signal.
+
+        Frontend code branches on ``msg.buffered_events !== undefined``;
+        shipping an empty list tells the client "this session is running
+        but the stream has produced nothing yet" so it can flip
+        ``isStreaming`` without inventing fake blocks.
+        """
+        ws = FakeWebSocket()
+        session_id = "sess-running-empty"
+        _global_broadcaster.start_buffering(session_id)
+
+        await _send_session_status(
+            ws, session_id, is_running=True,
+            session_record={"status": "active"},
+        )
+
+        msg = ws.sent[0]
+        assert msg["buffered_events"] == []
+
+
+# ---------------------------------------------------------------------------
+# Initial-bind handshake (AC15)
+#
+# The actual handler is a closure inside ``create_app`` so it can't be unit-
+# tested without spinning a full FastAPI app + lifespan. We re-exercise the
+# *same logic* it runs (``is_buffering`` gate + helper invocation) so that a
+# regression in either guard or call shape fails this test.
+# ---------------------------------------------------------------------------
+
+
+@pytest.mark.asyncio
+class TestInitialBindReplay:
+    """Mirror of the handshake's ``is_buffering`` gate and helper call."""
+    async def _simulate_initial_bind(
+        self,
+        ws: FakeWebSocket,
+        session_id: str,
+        is_running: bool,
+        session_record: dict | None,
+    ) -> None:
+        """Mirror the gate + helper call from the WS handshake."""
+        if _global_broadcaster.is_buffering(session_id):
+            await _send_session_status(
+                ws, session_id, is_running, session_record,
+            )
+
+    async def test_replays_when_turn_in_flight(self):
+        ws = FakeWebSocket()
+        session_id = "sess-in-flight"
+        _global_broadcaster.start_buffering(session_id)
+        await _global_broadcaster.broadcast(session_id, {
+            "type": "tool_use", "session_id": session_id,
+            "tool": "Read", "input": {"file_path": "/x"},
+        })
+
+        await self._simulate_initial_bind(
+            ws, session_id, is_running=True,
+            session_record={"status": "active"},
+        )
+
+        assert len(ws.sent) == 1
+        msg = ws.sent[0]
+        assert msg["type"] == "session_status"
+        assert msg["is_running"] is True
+        assert msg["buffered_events"][0]["tool"] == "Read"
+
+    async def test_no_replay_when_session_idle(self):
+        ws = FakeWebSocket()
+        # No start_buffering: handshake guard must short-circuit.
+        await self._simulate_initial_bind(
+            ws, "sess-idle", is_running=False,
+            session_record={"status": "active"},
+        )
+
+        assert ws.sent == []
+
+
+# ---------------------------------------------------------------------------
+# switch_session regression guard (existing behaviour)
+# ---------------------------------------------------------------------------
+
+
+@pytest.mark.asyncio
+class TestSwitchSessionStillReplays:
+    """``switch_session`` ALWAYS sends ``session_status`` (running or idle)."""
+
+    async def _simulate_switch_session(
+        self,
+        ws: FakeWebSocket,
+        new_session: str,
+        is_running: bool,
+        session_record: dict | None,
+    ) -> None:
+        await _send_session_status(
+            ws, new_session, is_running, session_record,
+        )
+
+    async def test_running_target_replays_buffer(self):
+        ws = FakeWebSocket()
+        session_id = "sess-switch-running"
+        _global_broadcaster.start_buffering(session_id)
+        await _global_broadcaster.broadcast(session_id, {
+            "type": "token", "session_id": session_id, "content": "x",
+        })
+
+        await self._simulate_switch_session(
+            ws, session_id, is_running=True,
+            session_record={"status": "active"},
+        )
+
+        assert ws.sent[0]["is_running"] is True
+        assert ws.sent[0]["buffered_events"] == [
+            {"type": "token", "session_id": session_id, "content": "x"},
+        ]
+
+    async def test_idle_target_still_sends_status(self):
+        ws = FakeWebSocket()
+        await self._simulate_switch_session(
+            ws, "sess-switch-idle", is_running=False,
+            session_record={"status": "active"},
+        )
+
+        assert len(ws.sent) == 1
+        assert ws.sent[0]["is_running"] is False
+        assert "buffered_events" not in ws.sent[0]
+
+
+# ---------------------------------------------------------------------------
+# Buffer fidelity: large stream survives intact through replay
+# ---------------------------------------------------------------------------
+
+
+@pytest.mark.asyncio
+async def test_replay_preserves_event_order_under_load():
+    """``get_buffer`` must return events in arrival order with no truncation."""
+    bc = StreamBroadcaster(max_buffer_size=100)
+    bc.start_buffering("sess-load")
+    for i in range(50):
+        await bc.broadcast("sess-load", {
+            "type": "token", "session_id": "sess-load", "content": f"#{i}",
+        })
+
+    snapshot = bc.get_buffer("sess-load")
+    assert len(snapshot) == 50
+    assert [e["content"] for e in snapshot] == [f"#{i}" for i in range(50)]
diff --git a/web/src/api/websocket.ts b/web/src/api/websocket.ts
index 8c50873..62e59cf 100644
--- a/web/src/api/websocket.ts
+++ b/web/src/api/websocket.ts
@@ -29,12 +29,29 @@ export type WSMessage =
 
 type MessageHandler = (msg: WSMessage) => void;
 
+/**
+ * Outcome of a send attempt.
+ * - 'sent': payload was written to the open socket.
+ * - 'queued': socket isn't open yet (CONNECTING or reconnect scheduled);
+ *   payload is buffered and will flush on the next `onopen`.
+ * - 'dropped': socket is closed with no reconnect pending (or in CLOSING);
+ *   the payload was discarded. Caller should surface an error.
+ */
+export type SendStatus = 'sent' | 'queued' | 'dropped';
+
+// Cap the pending queue so a long disconnect with fast typing doesn't grow
+// memory without bound. Five slots is enough to hold a normal user's burst
+// during the 3-second reconnect window; the oldest is dropped and the new
+// payload wins. The caller still gets 'queued' for the surviving payload.
+const MAX_PENDING = 5;
+
 export class NerveWebSocket {
   private ws: WebSocket | null = null;
   private handlers: Set = new Set();
   private reconnectTimer: ReturnType | null = null;
   private pingInterval: ReturnType | null = null;
   private _connected = false;
+  private _pending: Record[] = [];
 
   get connected() {
     return this._connected;
@@ -53,6 +70,7 @@ export class NerveWebSocket {
     this.ws.onopen = () => {
       this._connected = true;
       this.startPing();
+      this.flushPending();
     };
 
     this.ws.onmessage = (event) => {
@@ -81,20 +99,34 @@ export class NerveWebSocket {
     this.ws?.close();
     this.ws = null;
     this._connected = false;
+    this._pending = [];
   }
 
-  send(data: Record) {
+  send(data: Record): SendStatus {
     if (this.ws?.readyState === WebSocket.OPEN) {
       this.ws.send(JSON.stringify(data));
+      return 'sent';
+    }
+    // Queue while the socket is mid-handshake or a reconnect is scheduled.
+    // The drain happens in `onopen` once the new socket is OPEN.
+    const connecting = this.ws?.readyState === WebSocket.CONNECTING;
+    if (connecting || this.reconnectTimer !== null) {
+      if (this._pending.length >= MAX_PENDING) {
+        this._pending.shift();
+      }
+      this._pending.push(data);
+      return 'queued';
     }
+    // CLOSING, CLOSED without reconnect, or no socket: caller must handle.
+    return 'dropped';
   }
 
-  sendMessage(content: string, sessionId: string, fileIds?: string[]) {
+  sendMessage(content: string, sessionId: string, fileIds?: string[]): SendStatus {
     const msg: Record = { type: 'message', content, session_id: sessionId };
     if (fileIds && fileIds.length > 0) {
       msg.file_ids = fileIds;
     }
-    this.send(msg);
+    return this.send(msg);
   }
 
   switchSession(sessionId: string) {
@@ -137,6 +169,18 @@ export class NerveWebSocket {
     }
   }
 
+  private flushPending() {
+    // Drain oldest-first. Each payload is removed from the queue before it
+    // is written, so a throwing send can't double-send it on a later flush.
+    // If the socket stops being OPEN mid-drain, the remaining payloads stay
+    // queued for the next `onopen` instead of being silently discarded
+    // after the caller was already told 'queued'.
+    while (this._pending.length > 0 && this.ws?.readyState === WebSocket.OPEN) {
+      const data = this._pending.shift();
+      this.ws.send(JSON.stringify(data));
+    }
+  }
+
   private scheduleReconnect() {
     if (this.reconnectTimer) return;
     this.reconnectTimer = setTimeout(() => {
diff --git a/web/src/stores/chatStore.ts b/web/src/stores/chatStore.ts
index c58a4f9..51b5d44 100644
--- a/web/src/stores/chatStore.ts
+++ b/web/src/stores/chatStore.ts
@@ -437,13 +437,35 @@ export const useChatStore = create((set, get) => ({
         blocks.push({ type: 'image', url: img.url, filename: img.filename, media_type: img.media_type });
       }
     }
+    // Optimistic update: append the user message, flip to streaming. If the
+    // socket isn't open, send() returns 'queued' (will flush on reconnect)
+    // or 'dropped' (revert below).
     set((state) => ({
       messages: [...state.messages, { role: 'user', blocks }],
       streamingBlocks: [],
       isStreaming: true,
       agentStatus: { state: 'thinking' },
     }));
-    ws.sendMessage(content, session, fileIds);
+    const status = ws.sendMessage(content, session, fileIds);
+    if (status === 'dropped') {
+      // The message could not reach the server. Revert the optimistic
+      // state and surface the failure inline so the user knows to retry.
+      set((state) => ({
+        messages: [
+          ...state.messages.slice(0, -1),
+          {
+            role: 'assistant' as const,
+            blocks: [{
+              type: 'text',
+              content: 'Error: Message could not be sent. The connection is closed; please retry.',
+            }],
+          },
+        ],
+        streamingBlocks: [],
+        isStreaming: false,
+        agentStatus: { state: 'idle' },
+      }));
+    }
   },
 
   stopSession: () => {