diff --git a/apps/web/src/components/ChatView.browser.tsx b/apps/web/src/components/ChatView.browser.tsx index 0d612534e5..4704cdb824 100644 --- a/apps/web/src/components/ChatView.browser.tsx +++ b/apps/web/src/components/ChatView.browser.tsx @@ -1773,6 +1773,36 @@ describe("ChatView timeline estimator parity (full app)", () => { } }); + it("restores the connecting composer state without showing the stop button", async () => { + const mounted = await mountChatView({ + viewport: DEFAULT_VIEWPORT, + snapshot: createSnapshotForTargetUser({ + targetMessageId: "msg-user-connecting-state" as MessageId, + targetText: "connecting state target", + sessionStatus: "starting", + }), + }); + + try { + const connectingButton = await waitForElement( + () => document.querySelector('button[aria-label="Connecting"]'), + "Unable to find connecting send button.", + ); + expect(connectingButton.disabled).toBe(true); + expect( + document.querySelector('button[aria-label="Stop generation"]'), + ).toBeNull(); + + const composerEditor = await waitForElement( + () => document.querySelector('[contenteditable="false"]'), + "Unable to find disabled composer editor.", + ); + expect(composerEditor).toBeTruthy(); + } finally { + await mounted.cleanup(); + } + }); + it("hides the archive action when the pointer leaves a thread row", async () => { const mounted = await mountChatView({ viewport: DEFAULT_VIEWPORT, diff --git a/apps/web/src/components/ChatView.logic.ts b/apps/web/src/components/ChatView.logic.ts index 0a27fb203e..0e24d8cc16 100644 --- a/apps/web/src/components/ChatView.logic.ts +++ b/apps/web/src/components/ChatView.logic.ts @@ -75,8 +75,6 @@ export function collectUserMessageBlobPreviewUrls(message: ChatMessage): string[ return previewUrls; } -export type SendPhase = "idle" | "preparing-worktree" | "sending-turn"; - export interface PullRequestDialogState { initialReference: string | null; key: number; diff --git a/apps/web/src/components/ChatView.tsx b/apps/web/src/components/ChatView.tsx index fbd332354a..76f0516f10 100644 --- a/apps/web/src/components/ChatView.tsx +++ b/apps/web/src/components/ChatView.tsx @@ -175,7 +175,6 @@ import { readFileAsDataUrl, revokeBlobPreviewUrl, revokeUserMessagePreviewUrls, - SendPhase, } from "./ChatView.logic"; import { useLocalStorage } from "~/hooks/useLocalStorage"; @@ -246,9 +245,15 @@ interface PendingPullRequestSetupRequest { } export default function ChatView({ threadId }: ChatViewProps) { + const beginThreadSend = useStore((store) => store.beginThreadSend); + const clearThreadSend = useStore((store) => store.clearThreadSend); + const markThreadSendSettled = useStore((store) => store.markThreadSendSettled); + const moveThreadSend = useStore((store) => store.moveThreadSend); const threads = useStore((store) => store.threads); const projects = useStore((store) => store.projects); const markThreadVisited = useStore((store) => store.markThreadVisited); + const pendingThreadSend = useStore((store) => store.pendingThreadSendById[threadId] ?? null); + const setThreadSendPhase = useStore((store) => store.setThreadSendPhase); const syncServerReadModel = useStore((store) => store.syncServerReadModel); const setStoreThreadError = useStore((store) => store.setError); const setStoreThreadBranch = useStore((store) => store.setThreadBranch); @@ -325,14 +330,10 @@ export default function ChatView({ threadId }: ChatViewProps) { const [expandedImage, setExpandedImage] = useState(null); const [optimisticUserMessages, setOptimisticUserMessages] = useState([]); const optimisticUserMessagesRef = useRef(optimisticUserMessages); - optimisticUserMessagesRef.current = optimisticUserMessages; const composerTerminalContextsRef = useRef(composerTerminalContexts); const [localDraftErrorsByThreadId, setLocalDraftErrorsByThreadId] = useState< Record >({}); - const [sendPhase, setSendPhase] = useState("idle"); - const [sendStartedAt, setSendStartedAt] = useState(null); - const [isConnecting, _setIsConnecting] = useState(false); const [isRevertingCheckpoint, setIsRevertingCheckpoint] = useState(false); const [respondingRequestIds, setRespondingRequestIds] = useState([]); const [respondingUserInputRequestIds, setRespondingUserInputRequestIds] = useState< @@ -664,14 +665,18 @@ export default function ChatView({ threadId }: ChatViewProps) { ); const selectedModelForPicker = selectedModel; const phase = derivePhase(activeThread?.session ?? null); - const isSendBusy = sendPhase !== "idle"; - const isPreparingWorktree = sendPhase === "preparing-worktree"; - const isWorking = phase === "running" || isSendBusy || isConnecting || isRevertingCheckpoint; + const activeSendStartedAt = pendingThreadSend?.startedAt ?? null; + const isSendBusy = pendingThreadSend !== null; + const isPreparingWorktree = + pendingThreadSend?.phase === "preparing-worktree" && createWorktreeMutation.isPending; + const isConnecting = phase === "connecting"; + const showConnectingState = isConnecting && !isSendBusy; + const isWorking = phase === "running" || isSendBusy || isRevertingCheckpoint; const nowIso = new Date(nowTick).toISOString(); const activeWorkStartedAt = deriveActiveWorkStartedAt( activeLatestTurn, activeThread?.session ?? null, - sendStartedAt, + activeSendStartedAt, ); const threadActivities = activeThread?.activities ?? EMPTY_ACTIVITIES; const workLogEntries = useMemo( @@ -1138,9 +1143,6 @@ export default function ChatView({ threadId }: ChatViewProps) { null, [composerHighlightedItemId, composerMenuItems], ); - composerMenuOpenRef.current = composerMenuOpen; - composerMenuItemsRef.current = composerMenuItems; - activeComposerMenuItemRef.current = activeComposerMenuItem; const nonPersistedComposerImageIdSet = useMemo( () => new Set(nonPersistedComposerImageIds), [nonPersistedComposerImageIds], @@ -1980,6 +1982,16 @@ export default function ChatView({ threadId }: ChatViewProps) { composerTerminalContextsRef.current = composerTerminalContexts; }, [composerTerminalContexts]); + useEffect(() => { + optimisticUserMessagesRef.current = optimisticUserMessages; + }, [optimisticUserMessages]); + + useEffect(() => { + composerMenuOpenRef.current = composerMenuOpen; + composerMenuItemsRef.current = composerMenuItems; + activeComposerMenuItemRef.current = activeComposerMenuItem; + }, [activeComposerMenuItem, composerMenuItems, composerMenuOpen]); + useEffect(() => { if (!activeThread?.id) return; if (activeThread.messages.length === 0) { @@ -2020,8 +2032,7 @@ export default function ChatView({ threadId }: ChatViewProps) { } return []; }); - setSendPhase("idle"); - setSendStartedAt(null); + sendInFlightRef.current = false; setComposerHighlightedItemId(null); setComposerCursor(collapseExpandedComposerCursor(promptRef.current, promptRef.current.length)); setComposerTrigger(detectComposerTrigger(promptRef.current, promptRef.current.length)); @@ -2152,45 +2163,14 @@ export default function ChatView({ threadId }: ChatViewProps) { : "local"; useEffect(() => { - if (phase !== "running") return; + if (phase !== "running" && !isSendBusy) return; const timer = window.setInterval(() => { setNowTick(Date.now()); }, 1000); return () => { window.clearInterval(timer); }; - }, [phase]); - - const beginSendPhase = useCallback((nextPhase: Exclude) => { - setSendStartedAt((current) => current ?? new Date().toISOString()); - setSendPhase(nextPhase); - }, []); - - const resetSendPhase = useCallback(() => { - setSendPhase("idle"); - setSendStartedAt(null); - }, []); - - useEffect(() => { - if (sendPhase === "idle") { - return; - } - if ( - phase === "running" || - activePendingApproval !== null || - activePendingUserInput !== null || - activeThread?.error - ) { - resetSendPhase(); - } - }, [ - activePendingApproval, - activePendingUserInput, - activeThread?.error, - phase, - resetSendPhase, - sendPhase, - ]); + }, [isSendBusy, phase]); useEffect(() => { if (!activeThreadId) return; @@ -2410,7 +2390,7 @@ export default function ChatView({ threadId }: ChatViewProps) { const api = readNativeApi(); if (!api || !activeThread || isRevertingCheckpoint) return; - if (phase === "running" || isSendBusy || isConnecting) { + if (phase === "running" || isSendBusy) { setThreadError(activeThread.id, "Interrupt the current turn before reverting checkpoints."); return; } @@ -2443,13 +2423,13 @@ export default function ChatView({ threadId }: ChatViewProps) { } setIsRevertingCheckpoint(false); }, - [activeThread, isConnecting, isRevertingCheckpoint, isSendBusy, phase, setThreadError], + [activeThread, isRevertingCheckpoint, isSendBusy, phase, setThreadError], ); const onSend = async (e?: { preventDefault: () => void }) => { e?.preventDefault(); const api = readNativeApi(); - if (!api || !activeThread || isSendBusy || isConnecting || sendInFlightRef.current) return; + if (!api || !activeThread || isSendBusy || sendInFlightRef.current) return; if (activePendingProgress) { onAdvanceActivePendingUserInput(); return; @@ -2528,8 +2508,13 @@ export default function ChatView({ threadId }: ChatViewProps) { return; } + const sendStartedAt = new Date().toISOString(); sendInFlightRef.current = true; - beginSendPhase(baseBranchForWorktree ? "preparing-worktree" : "sending-turn"); + beginThreadSend( + threadIdForSend, + baseBranchForWorktree ? "preparing-worktree" : "sending-turn", + sendStartedAt, + ); const composerImagesSnapshot = [...composerImages]; const composerTerminalContextsSnapshot = [...sendableComposerTerminalContexts]; @@ -2600,10 +2585,9 @@ export default function ChatView({ threadId }: ChatViewProps) { let turnStartSucceeded = false; let nextThreadBranch = activeThread.branch; let nextThreadWorktreePath = activeThread.worktreePath; - await (async () => { + try { // On first message: lock in branch + create worktree if needed. if (baseBranchForWorktree) { - beginSendPhase("preparing-worktree"); const newBranch = buildTemporaryWorktreeBranchName(); const result = await createWorktreeMutation.mutateAsync({ cwd: activeProject.cwd, @@ -2678,10 +2662,8 @@ export default function ChatView({ threadId }: ChatViewProps) { let shouldRunSetupScript = false; if (isServerThread) { shouldRunSetupScript = true; - } else { - if (createdServerThreadForLocalDraft) { - shouldRunSetupScript = true; - } + } else if (createdServerThreadForLocalDraft) { + shouldRunSetupScript = true; } if (shouldRunSetupScript) { const setupScriptOptions: Parameters[1] = { @@ -2715,7 +2697,9 @@ export default function ChatView({ threadId }: ChatViewProps) { }); } - beginSendPhase("sending-turn"); + if (baseBranchForWorktree) { + setThreadSendPhase(threadIdForSend, "sending-turn"); + } const turnAttachments = await turnAttachmentsPromise; await api.orchestration.dispatchCommand({ type: "thread.turn.start", @@ -2734,7 +2718,8 @@ export default function ChatView({ threadId }: ChatViewProps) { createdAt: messageCreatedAt, }); turnStartSucceeded = true; - })().catch(async (err: unknown) => { + markThreadSendSettled(threadIdForSend); + } catch (err: unknown) { if (createdServerThreadForLocalDraft && !turnStartSucceeded) { await api.orchestration .dispatchCommand({ @@ -2769,11 +2754,9 @@ export default function ChatView({ threadId }: ChatViewProps) { threadIdForSend, err instanceof Error ? err.message : "Failed to send message.", ); - }); - sendInFlightRef.current = false; - if (!turnStartSucceeded) { - resetSendPhase(); + clearThreadSend(threadIdForSend); } + sendInFlightRef.current = false; }; const onInterrupt = async () => { @@ -2943,14 +2926,7 @@ export default function ChatView({ threadId }: ChatViewProps) { interactionMode: "default" | "plan"; }) => { const api = readNativeApi(); - if ( - !api || - !activeThread || - !isServerThread || - isSendBusy || - isConnecting || - sendInFlightRef.current - ) { + if (!api || !activeThread || !isServerThread || isSendBusy || sendInFlightRef.current) { return; } @@ -2971,7 +2947,7 @@ export default function ChatView({ threadId }: ChatViewProps) { }); sendInFlightRef.current = true; - beginSendPhase("sending-turn"); + beginThreadSend(threadIdForSend, "sending-turn", new Date().toISOString()); setThreadError(threadIdForSend, null); setOptimisticUserMessages((existing) => [ ...existing, @@ -3023,6 +2999,7 @@ export default function ChatView({ threadId }: ChatViewProps) { : {}), createdAt: messageCreatedAt, }); + markThreadSendSettled(threadIdForSend); // Optimistically open the plan sidebar when implementing (not refining). // "default" mode here means the agent is executing the plan, which produces // step-tracking activities that the sidebar will display. @@ -3030,34 +3007,33 @@ export default function ChatView({ threadId }: ChatViewProps) { planSidebarDismissedForTurnRef.current = null; setPlanSidebarOpen(true); } - sendInFlightRef.current = false; } catch (err) { setOptimisticUserMessages((existing) => existing.filter((message) => message.id !== messageIdForSend), ); + clearThreadSend(threadIdForSend); setThreadError( threadIdForSend, err instanceof Error ? err.message : "Failed to send plan follow-up.", ); - sendInFlightRef.current = false; - resetSendPhase(); } + sendInFlightRef.current = false; }, [ activeThread, activeProposedPlan, - beginSendPhase, forceStickToBottom, - isConnecting, isSendBusy, isServerThread, persistThreadSettingsForNextTurn, - resetSendPhase, runtimeMode, selectedPromptEffort, selectedModelSelection, selectedProvider, selectedProviderModels, + beginThreadSend, + clearThreadSend, + markThreadSendSettled, setComposerDraftInteractionMode, setThreadError, selectedModel, @@ -3073,7 +3049,6 @@ export default function ChatView({ threadId }: ChatViewProps) { !activeProposedPlan || !isServerThread || isSendBusy || - isConnecting || sendInFlightRef.current ) { return; @@ -3094,14 +3069,10 @@ export default function ChatView({ threadId }: ChatViewProps) { const nextThreadModelSelection: ModelSelection = selectedModelSelection; sendInFlightRef.current = true; - beginSendPhase("sending-turn"); - const finish = () => { - sendInFlightRef.current = false; - resetSendPhase(); - }; + beginThreadSend(activeThread.id, "sending-turn", createdAt); - await api.orchestration - .dispatchCommand({ + try { + await api.orchestration.dispatchCommand({ type: "thread.create", commandId: newCommandId(), threadId: nextThreadId, @@ -3113,72 +3084,75 @@ export default function ChatView({ threadId }: ChatViewProps) { branch: activeThread.branch, worktreePath: activeThread.worktreePath, createdAt, - }) - .then(() => { - return api.orchestration.dispatchCommand({ - type: "thread.turn.start", + }); + + await api.orchestration.dispatchCommand({ + type: "thread.turn.start", + commandId: newCommandId(), + threadId: nextThreadId, + message: { + messageId: newMessageId(), + role: "user", + text: outgoingImplementationPrompt, + attachments: [], + }, + modelSelection: selectedModelSelection, + titleSeed: nextThreadTitle, + runtimeMode, + interactionMode: "default", + createdAt, + }); + + moveThreadSend(activeThread.id, nextThreadId); + markThreadSendSettled(nextThreadId); + const snapshot = await api.orchestration.getSnapshot(); + syncServerReadModel(snapshot); + // Signal that the plan sidebar should open on the new thread. + planSidebarOpenOnNextThreadRef.current = true; + await navigate({ + to: "/$threadId", + params: { threadId: nextThreadId }, + }); + } catch (err) { + clearThreadSend(activeThread.id); + clearThreadSend(nextThreadId); + await api.orchestration + .dispatchCommand({ + type: "thread.delete", commandId: newCommandId(), threadId: nextThreadId, - message: { - messageId: newMessageId(), - role: "user", - text: outgoingImplementationPrompt, - attachments: [], - }, - modelSelection: selectedModelSelection, - titleSeed: nextThreadTitle, - runtimeMode, - interactionMode: "default", - createdAt, - }); - }) - .then(() => api.orchestration.getSnapshot()) - .then((snapshot) => { - syncServerReadModel(snapshot); - // Signal that the plan sidebar should open on the new thread. - planSidebarOpenOnNextThreadRef.current = true; - return navigate({ - to: "/$threadId", - params: { threadId: nextThreadId }, - }); - }) - .catch(async (err) => { - await api.orchestration - .dispatchCommand({ - type: "thread.delete", - commandId: newCommandId(), - threadId: nextThreadId, - }) - .catch(() => undefined); - await api.orchestration - .getSnapshot() - .then((snapshot) => { - syncServerReadModel(snapshot); - }) - .catch(() => undefined); - toastManager.add({ - type: "error", - title: "Could not start implementation thread", - description: - err instanceof Error ? err.message : "An error occurred while creating the new thread.", - }); - }) - .then(finish, finish); + }) + .catch(() => undefined); + await api.orchestration + .getSnapshot() + .then((snapshot) => { + syncServerReadModel(snapshot); + }) + .catch(() => undefined); + toastManager.add({ + type: "error", + title: "Could not start implementation thread", + description: + err instanceof Error ? err.message : "An error occurred while creating the new thread.", + }); + } + sendInFlightRef.current = false; }, [ activeProject, activeProposedPlan, activeThread, - beginSendPhase, - isConnecting, isSendBusy, isServerThread, navigate, - resetSendPhase, runtimeMode, selectedPromptEffort, selectedModelSelection, selectedProvider, selectedProviderModels, + beginThreadSend, + clearThreadSend, + markThreadSendSettled, + moveThreadSend, syncServerReadModel, selectedModel, ]); @@ -3846,7 +3820,7 @@ export default function ChatView({ threadId }: ChatViewProps) { ? "Ask for follow-up changes or attach images" : "Ask anything, @tag files/folders, or use / to show available commands" } - disabled={isConnecting || isComposerApprovalState} + disabled={showConnectingState || isComposerApprovalState} /> @@ -4071,9 +4045,13 @@ export default function ChatView({ threadId }: ChatViewProps) { type="submit" size="sm" className="h-9 rounded-full px-4 sm:h-8" - disabled={isSendBusy || isConnecting} + disabled={isSendBusy || showConnectingState} > - {isConnecting || isSendBusy ? "Sending..." : "Refine"} + {showConnectingState + ? "Connecting..." + : isSendBusy + ? "Sending..." + : "Refine"} ) : (
@@ -4081,9 +4059,13 @@ export default function ChatView({ threadId }: ChatViewProps) { type="submit" size="sm" className="h-9 rounded-l-full rounded-r-none px-4 sm:h-8" - disabled={isSendBusy || isConnecting} + disabled={isSendBusy || showConnectingState} > - {isConnecting || isSendBusy ? "Sending..." : "Implement"} + {showConnectingState + ? "Connecting..." + : isSendBusy + ? "Sending..." + : "Implement"} } > @@ -4101,7 +4083,7 @@ export default function ChatView({ threadId }: ChatViewProps) { void onImplementPlanInNewThread()} > Implement in a new thread @@ -4115,19 +4097,21 @@ export default function ChatView({ threadId }: ChatViewProps) { type="submit" className="flex h-9 w-9 enabled:cursor-pointer items-center justify-center rounded-full bg-primary/90 text-primary-foreground transition-all duration-150 hover:bg-primary hover:scale-105 disabled:pointer-events-none disabled:opacity-30 disabled:hover:scale-100 sm:h-8 sm:w-8" disabled={ - isSendBusy || isConnecting || !composerSendState.hasSendableContent + isSendBusy || + showConnectingState || + !composerSendState.hasSendableContent } aria-label={ - isConnecting - ? "Connecting" - : isPreparingWorktree - ? "Preparing worktree" + isPreparingWorktree + ? "Preparing worktree" + : showConnectingState + ? "Connecting" : isSendBusy ? "Sending" : "Send message" } > - {isConnecting || isSendBusy ? ( + {showConnectingState || isSendBusy ? ( - {row.createdAt - ? `Working for ${formatWorkingTimer(row.createdAt, nowIso) ?? "0s"}` - : "Working..."} + {(() => { + const elapsed = row.createdAt ? formatWorkingTimer(row.createdAt, nowIso) : null; + return elapsed ? `Working for ${elapsed}` : "Working..."; + })()}
@@ -643,6 +644,9 @@ function formatWorkingTimer(startIso: string, endIso: string): string | null { } const elapsedSeconds = Math.max(0, Math.floor((endedAtMs - startedAtMs) / 1000)); + if (elapsedSeconds < 1) { + return null; + } if (elapsedSeconds < 60) { return `${elapsedSeconds}s`; } diff --git a/apps/web/src/store.test.ts b/apps/web/src/store.test.ts index db62bad523..038b4d7f99 100644 --- a/apps/web/src/store.test.ts +++ b/apps/web/src/store.test.ts @@ -7,7 +7,16 @@ import { } from "@t3tools/contracts"; import { describe, expect, it } from "vitest"; -import { markThreadUnread, reorderProjects, syncServerReadModel, type AppState } from "./store"; +import { + beginThreadSend, + clearThreadSend, + markThreadUnread, + markThreadSendSettled, + reorderProjects, + setThreadSendPhase, + syncServerReadModel, + type AppState, +} from "./store"; import { DEFAULT_INTERACTION_MODE, DEFAULT_RUNTIME_MODE, type Thread } from "./types"; function makeThread(overrides: Partial = {}): Thread { @@ -39,6 +48,7 @@ function makeThread(overrides: Partial = {}): Thread { function makeState(thread: Thread): AppState { return { + pendingThreadSendById: {}, projects: [ { id: ProjectId.makeUnsafe("project-1"), @@ -171,6 +181,7 @@ describe("store pure functions", () => { const project2 = ProjectId.makeUnsafe("project-2"); const project3 = ProjectId.makeUnsafe("project-3"); const state: AppState = { + pendingThreadSendById: {}, projects: [ { id: project1, @@ -292,6 +303,7 @@ describe("store read model sync", () => { const project2 = ProjectId.makeUnsafe("project-2"); const project3 = ProjectId.makeUnsafe("project-3"); const initialState: AppState = { + pendingThreadSendById: {}, projects: [ { id: project2, @@ -346,4 +358,149 @@ describe("store read model sync", () => { expect(next.projects.map((project) => project.id)).toEqual([project2, project1, project3]); }); + + it("keeps pending sends until the server catches up to the send", () => { + const threadId = ThreadId.makeUnsafe("thread-1"); + const initialState = beginThreadSend( + makeState(makeThread()), + threadId, + "sending-turn", + "2026-02-27T00:01:00.000Z", + ); + + const next = syncServerReadModel( + initialState, + makeReadModel( + makeReadModelThread({ + latestTurn: { + turnId: TurnId.makeUnsafe("turn-1"), + state: "completed", + requestedAt: "2026-02-27T00:00:59.000Z", + startedAt: "2026-02-27T00:01:01.000Z", + completedAt: "2026-02-27T00:01:03.000Z", + assistantMessageId: null, + }, + }), + ), + ); + + expect(next.pendingThreadSendById[threadId]).toEqual({ + phase: "sending-turn", + startedAt: "2026-02-27T00:01:00.000Z", + requestSettled: false, + }); + }); + + it("does not clear pending sends when the turn appears before the request settles", () => { + const threadId = ThreadId.makeUnsafe("thread-1"); + const initialState = beginThreadSend( + makeState(makeThread()), + threadId, + "sending-turn", + "2026-02-27T00:01:00.000Z", + ); + + const next = syncServerReadModel( + initialState, + makeReadModel( + makeReadModelThread({ + latestTurn: { + turnId: TurnId.makeUnsafe("turn-1"), + state: "completed", + requestedAt: "2026-02-27T00:01:01.000Z", + startedAt: "2026-02-27T00:01:02.000Z", + completedAt: "2026-02-27T00:01:03.000Z", + assistantMessageId: null, + }, + }), + ), + ); + + expect(next.pendingThreadSendById[threadId]).toEqual({ + phase: "sending-turn", + startedAt: "2026-02-27T00:01:00.000Z", + requestSettled: false, + }); + }); + + it("clears pending sends once the server session starts running", () => { + const threadId = ThreadId.makeUnsafe("thread-1"); + const initialState = beginThreadSend( + makeState(makeThread()), + threadId, + "sending-turn", + "2026-02-27T00:01:00.000Z", + ); + + const next = syncServerReadModel( + initialState, + makeReadModel( + makeReadModelThread({ + session: { + threadId, + status: "running", + providerName: "codex", + runtimeMode: "full-access", + activeTurnId: TurnId.makeUnsafe("turn-1"), + lastError: null, + updatedAt: "2026-02-27T00:01:02.000Z", + }, + }), + ), + ); + + expect(next.pendingThreadSendById[threadId]).toBeUndefined(); + }); + + it("clears settled pending sends once the turn has caught up", () => { + const threadId = ThreadId.makeUnsafe("thread-1"); + const initialState = markThreadSendSettled( + beginThreadSend( + makeState(makeThread()), + threadId, + "sending-turn", + "2026-02-27T00:01:00.000Z", + ), + threadId, + ); + + const next = syncServerReadModel( + initialState, + makeReadModel( + makeReadModelThread({ + latestTurn: { + turnId: TurnId.makeUnsafe("turn-1"), + state: "completed", + requestedAt: "2026-02-27T00:01:01.000Z", + startedAt: "2026-02-27T00:01:02.000Z", + completedAt: "2026-02-27T00:01:03.000Z", + assistantMessageId: null, + }, + }), + ), + ); + + expect(next.pendingThreadSendById[threadId]).toBeUndefined(); + }); +}); + +describe("store pending thread send helpers", () => { + it("updates and clears pending thread send state", () => { + const threadId = ThreadId.makeUnsafe("thread-1"); + const startedState = beginThreadSend( + makeState(makeThread()), + threadId, + "preparing-worktree", + "2026-02-27T00:01:00.000Z", + ); + const sendingState = setThreadSendPhase(startedState, threadId, "sending-turn"); + const settledState = markThreadSendSettled(sendingState, threadId); + const clearedState = clearThreadSend(settledState, threadId); + + expect(startedState.pendingThreadSendById[threadId]?.phase).toBe("preparing-worktree"); + expect(startedState.pendingThreadSendById[threadId]?.requestSettled).toBe(false); + expect(sendingState.pendingThreadSendById[threadId]?.phase).toBe("sending-turn"); + expect(settledState.pendingThreadSendById[threadId]?.requestSettled).toBe(true); + expect(clearedState.pendingThreadSendById[threadId]).toBeUndefined(); + }); }); diff --git a/apps/web/src/store.ts b/apps/web/src/store.ts index a5beb5b1bf..6919533726 100644 --- a/apps/web/src/store.ts +++ b/apps/web/src/store.ts @@ -7,12 +7,19 @@ import { } from "@t3tools/contracts"; import { resolveModelSlugForProvider } from "@t3tools/shared/model"; import { create } from "zustand"; -import { type ChatMessage, type Project, type Thread } from "./types"; +import { + type ChatMessage, + type PendingThreadSend, + type PendingThreadSendPhase, + type Project, + type Thread, +} from "./types"; import { Debouncer } from "@tanstack/react-pacer"; // ── State ──────────────────────────────────────────────────────────── export interface AppState { + pendingThreadSendById: Record; projects: Project[]; threads: Thread[]; threadsHydrated: boolean; @@ -32,6 +39,7 @@ const LEGACY_PERSISTED_STATE_KEYS = [ ] as const; const initialState: AppState = { + pendingThreadSendById: {}, projects: [], threads: [], threadsHydrated: false, @@ -111,6 +119,71 @@ function updateThread( return changed ? next : threads; } +function hasLatestTurnCaughtUpToPendingSend( + latestTurnRequestedAt: string | null, + pendingSendStartedAt: string, +): boolean { + if (latestTurnRequestedAt === null) { + return false; + } + const latestTurnRequestedAtMs = Date.parse(latestTurnRequestedAt); + const pendingSendStartedAtMs = Date.parse(pendingSendStartedAt); + if (Number.isNaN(latestTurnRequestedAtMs) || Number.isNaN(pendingSendStartedAtMs)) { + return false; + } + return latestTurnRequestedAtMs >= pendingSendStartedAtMs; +} + +function reconcilePendingThreadSends( + pendingThreadSendById: AppState["pendingThreadSendById"], + readModel: OrchestrationReadModel, +): AppState["pendingThreadSendById"] { + const readModelThreadById = new Map( + readModel.threads.map((thread) => [thread.id, thread] as const), + ); + let changed = false; + const nextPendingThreadSendById: AppState["pendingThreadSendById"] = {}; + + for (const [threadId, pendingSend] of Object.entries(pendingThreadSendById) as Array< + [ThreadId, PendingThreadSend | undefined] + >) { + if (!pendingSend) { + continue; + } + const thread = readModelThreadById.get(threadId); + if (!thread) { + nextPendingThreadSendById[threadId] = pendingSend; + continue; + } + + const authoritativeSendStarted = thread.session?.status === "running"; + const authoritativeSendFailed = thread.session?.lastError != null; + const latestTurnCaughtUp = hasLatestTurnCaughtUpToPendingSend( + thread.latestTurn?.requestedAt ?? null, + pendingSend.startedAt, + ); + + if ( + authoritativeSendStarted || + authoritativeSendFailed || + (pendingSend.requestSettled && latestTurnCaughtUp) + ) { + changed = true; + continue; + } + + nextPendingThreadSendById[threadId] = pendingSend; + } + + if ( + !changed && + Object.keys(nextPendingThreadSendById).length === Object.keys(pendingThreadSendById).length + ) { + return pendingThreadSendById; + } + return nextPendingThreadSendById; +} + function mapProjectsFromReadModel( incoming: OrchestrationReadModel["projects"], previous: Project[], @@ -319,12 +392,121 @@ export function syncServerReadModel(state: AppState, readModel: OrchestrationRea }); return { ...state, + pendingThreadSendById: reconcilePendingThreadSends(state.pendingThreadSendById, readModel), projects, threads, threadsHydrated: true, }; } +export function beginThreadSend( + state: AppState, + threadId: ThreadId, + phase: PendingThreadSendPhase, + startedAt: string, +): AppState { + const existing = state.pendingThreadSendById[threadId]; + if ( + existing?.phase === phase && + existing.startedAt === startedAt && + existing.requestSettled === false + ) { + return state; + } + return { + ...state, + pendingThreadSendById: { + ...state.pendingThreadSendById, + [threadId]: { phase, startedAt, requestSettled: false }, + }, + }; +} + +export function setThreadSendPhase( + state: AppState, + threadId: ThreadId, + phase: PendingThreadSendPhase, +): AppState { + const existing = state.pendingThreadSendById[threadId]; + if (!existing || existing.phase === phase) { + return state; + } + return { + ...state, + pendingThreadSendById: { + ...state.pendingThreadSendById, + [threadId]: { + ...existing, + phase, + }, + }, + }; +} + +export function markThreadSendSettled(state: AppState, threadId: ThreadId): AppState { + const existing = state.pendingThreadSendById[threadId]; + if (!existing) { + return state; + } + const thread = state.threads.find((candidate) => candidate.id === threadId) ?? null; + const authoritativeSendStarted = thread?.session?.orchestrationStatus === "running"; + const authoritativeSendFailed = thread?.session?.lastError != null; + const latestTurnCaughtUp = hasLatestTurnCaughtUpToPendingSend( + thread?.latestTurn?.requestedAt ?? null, + existing.startedAt, + ); + + if (authoritativeSendStarted || authoritativeSendFailed || latestTurnCaughtUp) { + return clearThreadSend(state, threadId); + } + if (existing.requestSettled) { + return state; + } + return { + ...state, + pendingThreadSendById: { + ...state.pendingThreadSendById, + [threadId]: { + ...existing, + requestSettled: true, + }, + }, + }; +} + +export function clearThreadSend(state: AppState, threadId: ThreadId): AppState { + if (!(threadId in state.pendingThreadSendById)) { + return state; + } + const pendingThreadSendById = { ...state.pendingThreadSendById }; + delete pendingThreadSendById[threadId]; + return { + ...state, + pendingThreadSendById, + }; +} + +export function moveThreadSend( + state: AppState, + fromThreadId: ThreadId, + toThreadId: ThreadId, +): AppState { + if (fromThreadId === toThreadId) { + return state; + } + const pendingSend = state.pendingThreadSendById[fromThreadId]; + if (!pendingSend) { + return state; + } + const pendingThreadSendById = { ...state.pendingThreadSendById }; + delete pendingThreadSendById[fromThreadId]; + pendingThreadSendById[toThreadId] = pendingSend; + return { + ...state, + pendingThreadSendById, + }; +} + export function markThreadVisited( state: AppState, threadId: ThreadId, @@ -425,6 +607,10 @@ export function setThreadBranch( // ── Zustand store ──────────────────────────────────────────────────── interface AppStore extends AppState { + beginThreadSend: (threadId: ThreadId, phase: PendingThreadSendPhase, startedAt: string) => void; + clearThreadSend: (threadId: ThreadId) => void; + markThreadSendSettled: (threadId: ThreadId) => void; + moveThreadSend: (fromThreadId: ThreadId, toThreadId: ThreadId) => void; syncServerReadModel: (readModel: OrchestrationReadModel) => void; markThreadVisited: (threadId: ThreadId, visitedAt?: string) => void; markThreadUnread: (threadId: ThreadId) => void; @@ -432,11 +618,18 @@ interface AppStore extends AppState { setProjectExpanded: (projectId: Project["id"], expanded: boolean) => void; reorderProjects: (draggedProjectId: Project["id"], targetProjectId: Project["id"]) => void; setError: (threadId: ThreadId, error: string | null) => void; + setThreadSendPhase: (threadId: ThreadId, phase: PendingThreadSendPhase) => void; setThreadBranch: (threadId: ThreadId, branch: string | null, worktreePath: string | null) => void; } export const useStore = create((set) => ({ ...readPersistedState(), + beginThreadSend: (threadId, phase, startedAt) => + set((state) => beginThreadSend(state, threadId, phase, startedAt)), + clearThreadSend: (threadId) => set((state) => clearThreadSend(state, threadId)), + markThreadSendSettled: (threadId) => set((state) => markThreadSendSettled(state, threadId)), + moveThreadSend: (fromThreadId, toThreadId) => + set((state) => moveThreadSend(state, fromThreadId, toThreadId)), syncServerReadModel: (readModel) => set((state) => syncServerReadModel(state, readModel)), markThreadVisited: (threadId, visitedAt) => set((state) => markThreadVisited(state, threadId, visitedAt)), @@ -447,6 +640,8 @@ export const useStore = create((set) => ({ reorderProjects: (draggedProjectId, targetProjectId) => set((state) => reorderProjects(state, draggedProjectId, targetProjectId)), setError: (threadId, error) => set((state) => setError(state, threadId, error)), + setThreadSendPhase: (threadId, phase) => + set((state) => setThreadSendPhase(state, threadId, phase)), setThreadBranch: (threadId, branch, worktreePath) => set((state) => setThreadBranch(state, threadId, branch, worktreePath)), })); diff --git a/apps/web/src/types.ts b/apps/web/src/types.ts index e6cb1efea6..e10ec374dc 100644 --- a/apps/web/src/types.ts +++ b/apps/web/src/types.ts @@ -16,6 +16,7 @@ import type { } from "@t3tools/contracts"; export type SessionPhase = "disconnected" | "connecting" | "ready" | "running"; +export type PendingThreadSendPhase = "preparing-worktree" | "sending-turn"; export const DEFAULT_RUNTIME_MODE: RuntimeMode = "full-access"; export const DEFAULT_INTERACTION_MODE: ProviderInteractionMode = "default"; @@ -120,3 +121,9 @@ export interface ThreadSession { lastError?: string; orchestrationStatus: OrchestrationSessionStatus; } + +export interface PendingThreadSend { + phase: PendingThreadSendPhase; + startedAt: string; + requestSettled: boolean; +}