Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 25 additions & 0 deletions packages/ui/src/lib/connection-status.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
import assert from "node:assert/strict"
import { describe, it } from "node:test"
import { deriveDisplayConnectionStatus } from "./connection-status.ts"

describe("deriveDisplayConnectionStatus", () => {
it("overlays connecting while transport is down for connected instances", () => {
assert.equal(deriveDisplayConnectionStatus("connected", "disconnected"), "connecting")
})

it("restores previous connected status when transport reconnects", () => {
assert.equal(deriveDisplayConnectionStatus("connected", "connected"), "connected")
})

it("preserves disconnected instance status while transport is down", () => {
assert.equal(deriveDisplayConnectionStatus("disconnected", "disconnected"), "disconnected")
})

it("preserves error instance status while transport is down", () => {
assert.equal(deriveDisplayConnectionStatus("error", "disconnected"), "error")
})

it("does not clear legitimate instance connecting status after transport opens", () => {
assert.equal(deriveDisplayConnectionStatus("connecting", "connected"), "connecting")
})
})
19 changes: 19 additions & 0 deletions packages/ui/src/lib/connection-status.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
import type { InstanceStreamStatus } from "../../../server/src/api-types"
import type { WorkspaceEventTransportStatus } from "./event-transport"

export type ConnectionStatus = InstanceStreamStatus

export function deriveDisplayConnectionStatus(
instanceStatus: ConnectionStatus | null,
workspaceTransportStatus: WorkspaceEventTransportStatus,
): ConnectionStatus | null {
if (instanceStatus === "disconnected" || instanceStatus === "error") {
return instanceStatus
}

if (workspaceTransportStatus !== "connected") {
return "connecting"
}

return instanceStatus
}
14 changes: 12 additions & 2 deletions packages/ui/src/lib/event-transport.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,20 +15,30 @@ export interface WorkspaceEventTransportCallbacks {
onBatch: (events: WorkspaceEventPayload[]) => void
onError?: () => void
onOpen?: () => void
onStatus?: (status: WorkspaceEventTransportStatus) => void
onPing?: (payload: { ts?: number }) => void
}

export type WorkspaceEventTransportStatus = "connecting" | "connected" | "disconnected"

export interface WorkspaceEventConnection {
disconnect: () => void
}

async function connectBrowserWorkspaceEvents(
callbacks: WorkspaceEventTransportCallbacks,
): Promise<WorkspaceEventConnection> {
const notifyDisconnected = () => {
callbacks.onStatus?.("disconnected")
callbacks.onError?.()
}
const source = serverApi.connectEvents((event) => {
callbacks.onBatch([event])
}, callbacks.onError, callbacks.onPing)
source.onopen = () => callbacks.onOpen?.()
}, notifyDisconnected, callbacks.onPing)
source.onopen = () => {
callbacks.onStatus?.("connected")
callbacks.onOpen?.()
}
return {
disconnect() {
source.close()
Expand Down
18 changes: 17 additions & 1 deletion packages/ui/src/lib/native/desktop-events.test.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import assert from "node:assert/strict"
import { describe, it } from "node:test"
import { createTerminalErrorNotifier } from "./desktop-events.ts"
import { createTerminalErrorNotifier, mapDesktopEventTransportStatus } from "./desktop-events.ts"

describe("createTerminalErrorNotifier", () => {
it("calls onError once for repeated terminal notifications", () => {
Expand All @@ -17,3 +17,19 @@ describe("createTerminalErrorNotifier", () => {
assert.equal(errors, 1)
})
})

describe("mapDesktopEventTransportStatus", () => {
it("maps native connected state to shared connected state", () => {
assert.equal(mapDesktopEventTransportStatus("connected"), "connected")
})

it("maps native connecting state to shared connecting state", () => {
assert.equal(mapDesktopEventTransportStatus("connecting"), "connecting")
})

it("maps native transient failures to shared disconnected state", () => {
assert.equal(mapDesktopEventTransportStatus("disconnected"), "disconnected")
assert.equal(mapDesktopEventTransportStatus("error"), "disconnected")
assert.equal(mapDesktopEventTransportStatus("unauthorized"), "disconnected")
})
})
17 changes: 16 additions & 1 deletion packages/ui/src/lib/native/desktop-events.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,14 @@ import type { WorkspaceEventPayload } from "../../../../server/src/api-types"
import type {
DesktopEventsStartResult,
DesktopEventTransportStartOptions,
DesktopEventTransportState,
DesktopEventTransportStatusPayload,
} from "../event-transport-contract"
import type { WorkspaceEventConnection, WorkspaceEventTransportCallbacks } from "../event-transport"
import type {
WorkspaceEventConnection,
WorkspaceEventTransportCallbacks,
WorkspaceEventTransportStatus,
} from "../event-transport"
import { getLogger } from "../logger"

const log = getLogger("sse")
Expand All @@ -27,6 +32,14 @@ export function createTerminalErrorNotifier(callbacks: Pick<WorkspaceEventTransp
}
}

export function mapDesktopEventTransportStatus(
state: DesktopEventTransportState,
): WorkspaceEventTransportStatus {
if (state === "connected") return "connected"
if (state === "connecting") return "connecting"
return "disconnected"
}

export async function connectTauriWorkspaceEvents(
callbacks: WorkspaceEventTransportCallbacks,
options: DesktopEventTransportStartOptions,
Expand Down Expand Up @@ -59,6 +72,8 @@ export async function connectTauriWorkspaceEvents(
const handleStatusPayload = (payload: DesktopEventTransportStatusPayload) => {
if (!payload || !matchesGeneration(payload.generation)) return

callbacks.onStatus?.(mapDesktopEventTransportStatus(payload.state))

if (payload.state === "connected" && !opened) {
opened = true
callbacks.onOpen?.()
Expand Down
24 changes: 23 additions & 1 deletion packages/ui/src/lib/server-events.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,11 @@ import { batch as solidBatch } from "solid-js"
import type { WorkspaceEventPayload, WorkspaceEventType } from "../../../server/src/api-types"
import { serverApi } from "./api-client"
import { getClientIdentity } from "./client-identity"
import { connectWorkspaceEvents, type WorkspaceEventConnection } from "./event-transport"
import {
connectWorkspaceEvents,
type WorkspaceEventConnection,
type WorkspaceEventTransportStatus,
} from "./event-transport"
import { getLogger } from "./logger"
import { retryWithBackoff, isRetryableError } from "./retry-utils"

Expand All @@ -21,6 +25,7 @@ function logSse(message: string, context?: Record<string, unknown>) {
class ServerEvents {
private handlers = new Map<WorkspaceEventType | "*", Set<(event: WorkspaceEventPayload) => void>>()
private openHandlers = new Set<() => void>()
private statusHandlers = new Set<(status: WorkspaceEventTransportStatus) => void>()
private connection: WorkspaceEventConnection | null = null
private connectGeneration = 0
private retryDelay = RETRY_BASE_DELAY
Expand Down Expand Up @@ -50,6 +55,12 @@ class ServerEvents {
}
this.scheduleReconnect()
},
onStatus: (status) => {
if (generation !== this.connectGeneration) {
return
}
this.emitTransportStatus(status)
},
onOpen: () => {
if (generation !== this.connectGeneration) {
return
Expand Down Expand Up @@ -105,6 +116,8 @@ class ServerEvents {
this.connection = null
}

this.emitTransportStatus("disconnected")

logSse("Events stream disconnected, scheduling reconnect", { delayMs: this.retryDelay })
this.retryTimer = setTimeout(() => {
this.retryTimer = null
Expand Down Expand Up @@ -140,6 +153,10 @@ class ServerEvents {
})
}

private emitTransportStatus(status: WorkspaceEventTransportStatus) {
this.statusHandlers.forEach((handler) => handler(status))
}

on(type: WorkspaceEventType | "*", handler: (event: WorkspaceEventPayload) => void): () => void {
if (!this.handlers.has(type)) {
this.handlers.set(type, new Set())
Expand All @@ -154,6 +171,11 @@ class ServerEvents {
return () => this.openHandlers.delete(handler)
}

onTransportStatus(handler: (status: WorkspaceEventTransportStatus) => void): () => void {
this.statusHandlers.add(handler)
return () => this.statusHandlers.delete(handler)
}

restart(reason = "manual restart"): void {
this.retryDelay = RETRY_BASE_DELAY
this.clearReconnectTimer()
Expand Down
15 changes: 11 additions & 4 deletions packages/ui/src/lib/sse-manager.ts
Original file line number Diff line number Diff line change
Expand Up @@ -24,13 +24,14 @@ import type {
} from "@opencode-ai/sdk/v2"
import type { LegacyPermissionAskedEvent, LegacyPermissionRepliedEvent } from "../types/permission"
import { serverEvents } from "./server-events"
import type { WorkspaceEventTransportStatus } from "./event-transport"
import type {
BackgroundProcess,
InstanceStreamEvent,
InstanceStreamStatus,
WorkspaceEventPayload,
} from "../../../server/src/api-types"
import { getLogger } from "./logger"
import { deriveDisplayConnectionStatus, type ConnectionStatus } from "./connection-status"

const log = getLogger("sse")

Expand Down Expand Up @@ -95,12 +96,13 @@ type SSEEvent =
| ServerInstanceDisposedEvent
| { type: string; properties?: Record<string, unknown> }

type ConnectionStatus = InstanceStreamStatus

const [connectionStatus, setConnectionStatus] = createSignal<Map<string, ConnectionStatus>>(new Map())
const [transportStatus, setTransportStatus] = createSignal<WorkspaceEventTransportStatus>("connecting")

class SSEManager {
constructor() {
log.info("sseManager initialized: listening for SSE disconnect and reconnect")

serverEvents.on("instance.eventStatus", (event) => {
const payload = event as InstanceStatusPayload
this.updateConnectionStatus(payload.instanceId, payload.status)
Expand All @@ -118,6 +120,11 @@ class SSEManager {
this.updateConnectionStatus(payload.instanceId, "connected")
this.handleEvent(payload.instanceId, payload.event as SSEEvent)
})

serverEvents.onTransportStatus((status) => {
log.info("SSE transport status changed", { status })
setTransportStatus(status)
})
}

seedStatus(instanceId: string, status: ConnectionStatus) {
Expand Down Expand Up @@ -240,7 +247,7 @@ class SSEManager {
onConnectionLost?: (instanceId: string, reason: string) => void | Promise<void>

getStatus(instanceId: string): ConnectionStatus | null {
return connectionStatus().get(instanceId) ?? null
return deriveDisplayConnectionStatus(connectionStatus().get(instanceId) ?? null, transportStatus())
}

getStatuses() {
Expand Down
Loading