diff --git a/apps/server/src/git/Services/TextGeneration.ts b/apps/server/src/git/Services/TextGeneration.ts index 0df2fff62c..3f6b7a43ae 100644 --- a/apps/server/src/git/Services/TextGeneration.ts +++ b/apps/server/src/git/Services/TextGeneration.ts @@ -13,7 +13,7 @@ import type { ChatAttachment, ModelSelection } from "@t3tools/contracts"; import type { TextGenerationError } from "../Errors.ts"; /** Providers that support git text generation (commit messages, PR content, branch names). */ -export type TextGenerationProvider = "codex" | "claudeAgent"; +export type TextGenerationProvider = "codex" | "claudeAgent" | "droid"; export interface CommitMessageGenerationInput { cwd: string; diff --git a/apps/server/src/orchestration/Layers/ProviderRuntimeIngestion.test.ts b/apps/server/src/orchestration/Layers/ProviderRuntimeIngestion.test.ts index 529eae2444..27e921ca13 100644 --- a/apps/server/src/orchestration/Layers/ProviderRuntimeIngestion.test.ts +++ b/apps/server/src/orchestration/Layers/ProviderRuntimeIngestion.test.ts @@ -1325,7 +1325,7 @@ describe("ProviderRuntimeIngestion", () => { }); it("buffers assistant deltas by default until completion", async () => { - const harness = await createHarness(); + const harness = await createHarness({ serverSettings: { enableAssistantStreaming: false } }); const now = new Date().toISOString(); harness.emit({ @@ -1486,6 +1486,243 @@ describe("ProviderRuntimeIngestion", () => { expect(finalMessage?.streaming).toBe(false); }); + it("stitches Droid assistant token fragments back into spaced text", async () => { + const harness = await createHarness({ serverSettings: { enableAssistantStreaming: true } }); + const now = new Date().toISOString(); + + harness.emit({ + type: "turn.started", + eventId: asEventId("evt-turn-started-droid-spacing"), + provider: "droid", + createdAt: now, + threadId: asThreadId("thread-1"), + turnId: asTurnId("turn-droid-spacing"), + }); + await waitForThread( + harness.engine, + (thread) => + thread.session?.status === "running" && + thread.session?.activeTurnId === "turn-droid-spacing", + ); + + harness.emit({ + type: "content.delta", + eventId: asEventId("evt-message-delta-droid-spacing-1"), + provider: "droid", + createdAt: now, + threadId: asThreadId("thread-1"), + turnId: asTurnId("turn-droid-spacing"), + itemId: asItemId("item-droid-spacing"), + payload: { + streamKind: "assistant_text", + delta: "I'm", + }, + }); + harness.emit({ + type: "content.delta", + eventId: asEventId("evt-message-delta-droid-spacing-2"), + provider: "droid", + createdAt: now, + threadId: asThreadId("thread-1"), + turnId: asTurnId("turn-droid-spacing"), + itemId: asItemId("item-droid-spacing"), + payload: { + streamKind: "assistant_text", + delta: "Droid, an AI software engineering agent", + }, + }); + harness.emit({ + type: "content.delta", + eventId: asEventId("evt-message-delta-droid-spacing-3"), + provider: "droid", + createdAt: now, + threadId: asThreadId("thread-1"), + turnId: asTurnId("turn-droid-spacing"), + itemId: asItemId("item-droid-spacing"), + payload: { + streamKind: "assistant_text", + delta: "built by Factory.", + }, + }); + harness.emit({ + type: "content.delta", + eventId: asEventId("evt-message-delta-droid-spacing-4"), + provider: "droid", + createdAt: now, + threadId: asThreadId("thread-1"), + turnId: asTurnId("turn-droid-spacing"), + itemId: asItemId("item-droid-spacing"), + payload: { + streamKind: "assistant_text", + delta: "Right", + }, + }); + harness.emit({ + type: "content.delta", + eventId: asEventId("evt-message-delta-droid-spacing-5"), + provider: "droid", + createdAt: now, + threadId: asThreadId("thread-1"), + turnId: asTurnId("turn-droid-spacing"), + itemId: asItemId("item-droid-spacing"), + payload: { + streamKind: "assistant_text", + delta: "now I'm sitting in your", + }, + }); + harness.emit({ + type: "content.delta", + eventId: asEventId("evt-message-delta-droid-spacing-6"), + provider: "droid", + createdAt: now, + threadId: asThreadId("thread-1"), + turnId: asTurnId("turn-droid-spacing"), + itemId: asItemId("item-droid-spacing"), + payload: { + streamKind: "assistant_text", + delta: "`apps", + }, + }); + harness.emit({ + type: "content.delta", + eventId: asEventId("evt-message-delta-droid-spacing-7"), + provider: "droid", + createdAt: now, + threadId: asThreadId("thread-1"), + turnId: asTurnId("turn-droid-spacing"), + itemId: asItemId("item-droid-spacing"), + payload: { + streamKind: "assistant_text", + delta: "/server` directory.", + }, + }); + + const liveThread = await waitForThread(harness.engine, (entry) => + entry.messages.some( + (message: ProviderRuntimeTestMessage) => + message.id === "assistant:item-droid-spacing" && message.streaming, + ), + ); + const liveMessage = liveThread.messages.find( + (entry: ProviderRuntimeTestMessage) => entry.id === "assistant:item-droid-spacing", + ); + expect(liveMessage?.streaming).toBe(true); + + harness.emit({ + type: "item.completed", + eventId: asEventId("evt-message-completed-droid-spacing"), + provider: "droid", + createdAt: now, + threadId: asThreadId("thread-1"), + turnId: asTurnId("turn-droid-spacing"), + itemId: asItemId("item-droid-spacing"), + payload: { + itemType: "assistant_message", + status: "completed", + }, + }); + + const finalThread = await waitForThread(harness.engine, (entry) => + entry.messages.some( + (message: ProviderRuntimeTestMessage) => + message.id === "assistant:item-droid-spacing" && !message.streaming, + ), + ); + const finalMessage = finalThread.messages.find( + (entry: ProviderRuntimeTestMessage) => entry.id === "assistant:item-droid-spacing", + ); + expect(finalMessage?.text).toBe( + "I'm Droid, an AI software engineering agent built by Factory. Right now I'm sitting in your `apps/server` directory.", + ); + expect(finalMessage?.streaming).toBe(false); + }); + + it("does not inject spaces into open double-backtick Droid inline code spans", async () => { + const harness = await createHarness({ serverSettings: { enableAssistantStreaming: true } }); + const now = new Date().toISOString(); + + harness.emit({ + type: "turn.started", + eventId: asEventId("evt-turn-started-droid-double-backtick"), + provider: "droid", + createdAt: now, + threadId: asThreadId("thread-1"), + turnId: asTurnId("turn-droid-double-backtick"), + }); + await waitForThread( + harness.engine, + (thread) => + thread.session?.status === "running" && + thread.session?.activeTurnId === "turn-droid-double-backtick", + ); + + harness.emit({ + type: "content.delta", + eventId: asEventId("evt-message-delta-droid-double-backtick-1"), + provider: "droid", + createdAt: now, + threadId: asThreadId("thread-1"), + turnId: asTurnId("turn-droid-double-backtick"), + itemId: asItemId("item-droid-double-backtick"), + payload: { + streamKind: "assistant_text", + delta: "Here is ``", + }, + }); + harness.emit({ + type: "content.delta", + eventId: asEventId("evt-message-delta-droid-double-backtick-2"), + provider: "droid", + createdAt: now, + threadId: asThreadId("thread-1"), + turnId: asTurnId("turn-droid-double-backtick"), + itemId: asItemId("item-droid-double-backtick"), + payload: { + streamKind: "assistant_text", + delta: "code span", + }, + }); + harness.emit({ + type: "content.delta", + eventId: asEventId("evt-message-delta-droid-double-backtick-3"), + provider: "droid", + createdAt: now, + threadId: asThreadId("thread-1"), + turnId: asTurnId("turn-droid-double-backtick"), + itemId: asItemId("item-droid-double-backtick"), + payload: { + streamKind: "assistant_text", + delta: "`` done", + }, + }); + + harness.emit({ + type: "item.completed", + eventId: asEventId("evt-message-completed-droid-double-backtick"), + provider: "droid", + createdAt: now, + threadId: asThreadId("thread-1"), + turnId: asTurnId("turn-droid-double-backtick"), + itemId: asItemId("item-droid-double-backtick"), + payload: { + itemType: "assistant_message", + status: "completed", + }, + }); + + const finalThread = await waitForThread(harness.engine, (entry) => + entry.messages.some( + (message: ProviderRuntimeTestMessage) => + message.id === "assistant:item-droid-double-backtick" && !message.streaming, + ), + ); + const finalMessage = finalThread.messages.find( + (entry: ProviderRuntimeTestMessage) => entry.id === "assistant:item-droid-double-backtick", + ); + expect(finalMessage?.text).toBe("Here is ``code span`` done"); + expect(finalMessage?.streaming).toBe(false); + }); + it("spills oversized buffered deltas and still finalizes full assistant text", async () => { const harness = await createHarness(); const now = new Date().toISOString(); @@ -1967,7 +2204,9 @@ describe("ProviderRuntimeIngestion", () => { : undefined; expect(toolUpdate?.kind).toBe("tool.updated"); expect(toolUpdatePayload?.itemType).toBe("command_execution"); + expect(toolUpdatePayload?.title).toBe("Run tests"); expect(toolUpdatePayload?.status).toBe("in_progress"); + expect(toolUpdatePayload?.detail).toBe("bun test"); const warning = thread.activities.find( (activity: ProviderRuntimeTestActivity) => activity.id === "evt-runtime-warning", diff --git a/apps/server/src/orchestration/Layers/ProviderRuntimeIngestion.ts b/apps/server/src/orchestration/Layers/ProviderRuntimeIngestion.ts index b479d1c28a..11a7b66e5f 100644 --- a/apps/server/src/orchestration/Layers/ProviderRuntimeIngestion.ts +++ b/apps/server/src/orchestration/Layers/ProviderRuntimeIngestion.ts @@ -83,6 +83,71 @@ function normalizeProposedPlanMarkdown(planMarkdown: string | undefined): string return trimmed; } +function hasUnclosedInlineCodeFence(value: string): boolean { + let openFenceLength: number | null = null; + + for (const match of value.matchAll(/`+/g)) { + const fenceLength = match[0].length; + if (openFenceLength === null) { + openFenceLength = fenceLength; + continue; + } + if (fenceLength === openFenceLength) { + openFenceLength = null; + } + } + + return openFenceLength !== null; +} + +function normalizeAssistantDelta(input: { + provider: ProviderRuntimeEvent["provider"]; + previousText: string; + delta: string; +}): string { + if (input.provider !== "droid") { + return input.delta; + } + if (input.previousText.length === 0 || input.delta.length === 0) { + return input.delta; + } + if (/\s$/.test(input.previousText) || /^\s/.test(input.delta)) { + return input.delta; + } + if (hasUnclosedInlineCodeFence(input.previousText)) { + return input.delta; + } + + const previousLast = input.previousText.at(-1) ?? ""; + const nextFirst = input.delta[0] ?? ""; + + if (["'", '"', ",", ".", ";", ":", "!", "?", "%", ")", "]", "}", "/"].includes(nextFirst)) { + return input.delta; + } + if (["(", "[", "{", "/", "-", "_"].includes(previousLast)) { + return input.delta; + } + + const previousEndsWordLike = /[\p{L}\p{N}]$/u.test(input.previousText); + const nextStartsWordLike = /^[\p{L}\p{N}]/u.test(input.delta); + + if (previousEndsWordLike && (nextStartsWordLike || nextFirst === "`")) { + return ` ${input.delta}`; + } + if ([".", "!", "?", ":", ";"].includes(previousLast) && nextStartsWordLike) { + return ` ${input.delta}`; + } + if ( + previousLast === "`" && + !hasUnclosedInlineCodeFence(input.previousText) && + nextStartsWordLike + ) { + return ` ${input.delta}`; + } + + return input.delta; +} + function proposedPlanIdForTurn(threadId: ThreadId, turnId: TurnId): string { return `plan:${threadId}:turn:${turnId}`; } @@ -441,6 +506,7 @@ function runtimeEventToActivities( summary: event.payload.title ?? "Tool updated", payload: { itemType: event.payload.itemType, + ...(event.payload.title ? { title: event.payload.title } : {}), ...(event.payload.status ? { status: event.payload.status } : {}), ...(event.payload.detail ? { detail: truncateDetail(event.payload.detail) } : {}), ...(event.payload.data !== undefined ? { data: event.payload.data } : {}), @@ -464,7 +530,9 @@ function runtimeEventToActivities( summary: event.payload.title ?? "Tool", payload: { itemType: event.payload.itemType, + ...(event.payload.title ? { title: event.payload.title } : {}), ...(event.payload.detail ? { detail: truncateDetail(event.payload.detail) } : {}), + ...(event.payload.data !== undefined ? { data: event.payload.data } : {}), }, turnId: toTurnId(event.turnId) ?? null, ...maybeSequence, @@ -485,7 +553,9 @@ function runtimeEventToActivities( summary: `${event.payload.title ?? "Tool"} started`, payload: { itemType: event.payload.itemType, + ...(event.payload.title ? { title: event.payload.title } : {}), ...(event.payload.detail ? { detail: truncateDetail(event.payload.detail) } : {}), + ...(event.payload.data !== undefined ? { data: event.payload.data } : {}), }, turnId: toTurnId(event.turnId) ?? null, ...maybeSequence, @@ -617,6 +687,11 @@ const make = Effect.fn("make")(function* () { const clearBufferedAssistantText = (messageId: MessageId) => Cache.invalidate(bufferedAssistantTextByMessageId, messageId); + const peekBufferedAssistantText = (messageId: MessageId) => + Cache.getOption(bufferedAssistantTextByMessageId, messageId).pipe( + Effect.map((existingText) => Option.getOrElse(existingText, () => "")), + ); + const appendBufferedProposedPlan = (planId: string, delta: string, createdAt: string) => Cache.getOption(bufferedProposedPlanById, planId).pipe( Effect.flatMap((existingEntry) => { @@ -1003,6 +1078,15 @@ const make = Effect.fn("make")(function* () { const assistantMessageId = MessageId.makeUnsafe( `assistant:${event.itemId ?? event.turnId ?? event.eventId}`, ); + const existingAssistantMessage = thread.messages.find( + (entry) => entry.id === assistantMessageId, + ); + const bufferedAssistantText = yield* peekBufferedAssistantText(assistantMessageId); + const normalizedAssistantDelta = normalizeAssistantDelta({ + provider: event.provider, + previousText: `${existingAssistantMessage?.text ?? ""}${bufferedAssistantText}`, + delta: assistantDelta, + }); const turnId = toTurnId(event.turnId); if (turnId) { yield* rememberAssistantMessageId(thread.id, turnId, assistantMessageId); @@ -1013,7 +1097,10 @@ const make = Effect.fn("make")(function* () { (settings) => (settings.enableAssistantStreaming ? "streaming" : "buffered"), ); if (assistantDeliveryMode === "buffered") { - const spillChunk = yield* appendBufferedAssistantText(assistantMessageId, assistantDelta); + const spillChunk = yield* appendBufferedAssistantText( + assistantMessageId, + normalizedAssistantDelta, + ); if (spillChunk.length > 0) { yield* orchestrationEngine.dispatch({ type: "thread.message.assistant.delta", @@ -1031,7 +1118,7 @@ const make = Effect.fn("make")(function* () { commandId: providerCommandId(event, "assistant-delta"), threadId: thread.id, messageId: assistantMessageId, - delta: assistantDelta, + delta: normalizedAssistantDelta, ...(turnId ? { turnId } : {}), createdAt: now, }); diff --git a/apps/server/src/provider/Layers/DroidAdapter.ts b/apps/server/src/provider/Layers/DroidAdapter.ts new file mode 100644 index 0000000000..abe31beead --- /dev/null +++ b/apps/server/src/provider/Layers/DroidAdapter.ts @@ -0,0 +1,403 @@ +/** + * DroidAdapterLive - ACP (Agent Client Protocol) provider adapter. + * + * Spawns `droid exec --output-format acp` as a child process per session and + * speaks JSON-RPC 2.0 over stdio. Maps ACP session/update notifications into + * canonical ProviderRuntimeEvent vocabulary. + * + * @module DroidAdapterLive + */ +import { + type ProviderRuntimeEvent, + type ProviderSession, + type ProviderTurnStartResult, + EventId, + RuntimeItemId, + ThreadId, + TurnId, +} from "@t3tools/contracts"; +import { DateTime, Effect, Layer, Queue, Random, Stream } from "effect"; + +import { + ProviderAdapterValidationError, + ProviderAdapterRequestError, + ProviderAdapterSessionNotFoundError, + type ProviderAdapterError, +} from "../Errors.ts"; +import { DroidAdapter, type DroidAdapterShape } from "../Services/DroidAdapter.ts"; +import { + type AcpSessionState, + createAcpRemoteSession, + initializeAcpSession, + sendAcpRequest, + spawnAcpProcessSession, + stopAcpProcessSession, + wireAcpProcessMessages, +} from "../acpCore.ts"; +import { makeAcpRuntimeBridge } from "../acpRuntimeBridge.ts"; +import { ServerSettingsService } from "../../serverSettings.ts"; + +const PROVIDER = "droid" as const; +const DROID_PREFERRED_ENABLED_TOOLS = ["task-cli"] as const; + +function getDroidAutoLevel(runtimeMode: ProviderSession["runtimeMode"]): string | undefined { + return runtimeMode === "full-access" ? "high" : undefined; +} + +function getDroidReasoningEffort(input: { + modelSelection?: { provider: string; options?: Record | undefined } | undefined; +}): string | undefined { + if (input.modelSelection?.provider !== PROVIDER) { + return undefined; + } + const effort = input.modelSelection.options?.effort; + return typeof effort === "string" && effort.length > 0 ? effort : undefined; +} + +function toParamsRecord(value: unknown): Record { + return value !== null && typeof value === "object" && !Array.isArray(value) + ? (value as Record) + : {}; +} + +// ── Layer Implementation ──────────────────────────────────────────── + +export const DroidAdapterLive = Layer.effect( + DroidAdapter, + Effect.gen(function* () { + const sessions = new Map(); + const runtimeEventQueue = yield* Queue.unbounded(); + const serverSettingsService = yield* ServerSettingsService; + + const nowIso = Effect.map(DateTime.now, DateTime.formatIso); + const nextEventId = Effect.map(Random.nextUUIDv4, (id) => EventId.makeUnsafe(id)); + const nextTurnId = Effect.map(Random.nextUUIDv4, (id) => TurnId.makeUnsafe(id)); + const nextItemId = Effect.map(Random.nextUUIDv4, (id) => RuntimeItemId.makeUnsafe(id)); + const makeStamp = () => Effect.all({ eventId: nextEventId, createdAt: nowIso }); + + const offerEvent = (event: ProviderRuntimeEvent): Effect.Effect => + Queue.offer(runtimeEventQueue, event).pipe(Effect.asVoid); + + const getSession = ( + threadId: ThreadId, + ): Effect.Effect => { + const session = sessions.get(threadId); + if (!session) { + return Effect.fail( + new ProviderAdapterSessionNotFoundError({ provider: PROVIDER, threadId }), + ); + } + return Effect.succeed(session); + }; + + const acpRuntimeBridge = makeAcpRuntimeBridge({ + provider: PROVIDER, + logLabel: "[DroidAdapter] session/update", + makeStamp, + nextItemId, + offerEvent, + }); + const { + emitSessionStarted, + emitTurnStarted, + emitTurnCompleted, + emitRuntimeError, + emitSessionExited, + handleSessionUpdate, + closeOpenToolCallsForTurn, + completeOpenStreamItemsForTurn, + clearSessionState, + } = acpRuntimeBridge; + + const runDetachedEffect = ( + effect: Effect.Effect, + label: string, + metadata: Record, + ): void => { + Effect.runPromise(effect).catch((cause) => { + console.error(`[DroidAdapter] ${label} failed`, { ...metadata, cause }); + }); + }; + + const finalizeSession = (session: AcpSessionState) => + Effect.gen(function* () { + if (sessions.get(session.threadId) !== session) { + return; + } + if (session.activeTurnId) { + yield* completeOpenStreamItemsForTurn(session.threadId, session.activeTurnId); + yield* closeOpenToolCallsForTurn(session.threadId, session.activeTurnId, "failed"); + } + session.activeTurnId = null; + session.status = "closed"; + yield* clearSessionState(session.threadId); + sessions.delete(session.threadId); + yield* emitSessionExited(session.threadId); + }); + + const settleTurnState = ( + session: AcpSessionState, + turnId: TurnId, + status: ProviderSession["status"], + ): Effect.Effect => + Effect.sync(() => { + if (session.activeTurnId !== turnId) { + return; + } + session.activeTurnId = null; + session.status = status; + }); + + // ── Adapter interface ─────────────────────────────────────────── + + const startSession: DroidAdapterShape["startSession"] = (input) => + Effect.gen(function* () { + const settings = yield* serverSettingsService.getSettings.pipe(Effect.orDie); + const binaryPath = settings.providers.droid.binaryPath; + const cwd = input.cwd ?? process.cwd(); + const runtimeMode = input.runtimeMode ?? "full-access"; + + const model = + input.modelSelection?.provider === PROVIDER ? input.modelSelection.model : undefined; + const reasoningEffort = getDroidReasoningEffort(input); + const autoLevel = getDroidAutoLevel(runtimeMode); + const args = ["exec", "--output-format", "acp"]; + if (model) { + args.push("--model", model); + } + if (reasoningEffort) { + args.push("--reasoning-effort", reasoningEffort); + } + if (autoLevel) { + args.push("--auto", autoLevel); + } + if (autoLevel === "high") { + args.push("--enabled-tools", DROID_PREFERRED_ENABLED_TOOLS.join(",")); + } + const session = yield* spawnAcpProcessSession({ + provider: PROVIDER, + threadId: input.threadId, + binaryPath, + args, + cwd, + runtimeMode, + ...(input.modelSelection?.model ? { model: input.modelSelection.model } : {}), + }); + + wireAcpProcessMessages({ + session, + onNotification: (_method, params) => handleSessionUpdate(session, toParamsRecord(params)), + onUnhandledNotification: (method) => + Effect.logDebug("[DroidAdapter] unhandled notification", { + method, + threadId: session.threadId, + }), + onUnhandledMessage: (message) => + Effect.logDebug("[DroidAdapter] unhandled message", { + hasMethod: "method" in message, + hasId: "id" in message, + threadId: session.threadId, + }), + onExit: () => finalizeSession(session), + }); + + yield* Effect.gen(function* () { + yield* initializeAcpSession({ + provider: PROVIDER, + session, + clientName: "t3-code", + clientVersion: "0.1.0", + }); + yield* createAcpRemoteSession({ provider: PROVIDER, session, cwd }); + }).pipe( + Effect.catch((error) => + Effect.gen(function* () { + stopAcpProcessSession(session); + yield* clearSessionState(session.threadId); + return yield* error; + }), + ), + ); + + sessions.set(input.threadId, session); + session.status = "ready"; + yield* emitSessionStarted(input.threadId); + + return { + provider: PROVIDER, + status: "ready", + runtimeMode, + cwd, + model: input.modelSelection?.model, + threadId: input.threadId, + createdAt: session.createdAt, + updatedAt: session.createdAt, + } satisfies ProviderSession; + }); + + const sendTurn: DroidAdapterShape["sendTurn"] = (input) => + Effect.gen(function* () { + const session = yield* getSession(input.threadId); + if (session.activeTurnId) { + return yield* new ProviderAdapterValidationError({ + provider: PROVIDER, + operation: "sendTurn", + issue: "Cannot start a new Droid turn while another turn is still running.", + }); + } + const turnId = yield* nextTurnId; + session.activeTurnId = turnId; + session.status = "running"; + + yield* emitTurnStarted( + input.threadId, + turnId, + session.model, + getDroidReasoningEffort(input), + ); + + // Build prompt content blocks + const promptBlocks: Array<{ type: string; text?: string }> = []; + if (input.input) { + promptBlocks.push({ type: "text", text: input.input }); + } + + // Fire session/prompt asynchronously -- the ACP response closes the turn. + runDetachedEffect( + Effect.gen(function* () { + const promptStart = Date.now(); + yield* Effect.tryPromise({ + try: () => + sendAcpRequest(session, "session/prompt", { + sessionId: session.acpSessionId, + prompt: promptBlocks, + }), + catch: (cause) => + new ProviderAdapterRequestError({ + provider: PROVIDER, + method: "session/prompt", + detail: cause instanceof Error ? cause.message : String(cause), + cause, + }), + }); + + yield* Effect.logDebug("[DroidAdapter] session/prompt resolved", { + threadId: session.threadId, + turnId, + elapsedSec: ((Date.now() - promptStart) / 1000).toFixed(1), + }); + + yield* completeOpenStreamItemsForTurn(session.threadId, turnId); + yield* closeOpenToolCallsForTurn(session.threadId, turnId, "completed"); + + yield* settleTurnState(session, turnId, "ready"); + yield* emitTurnCompleted(session.threadId, turnId, "completed"); + }).pipe( + Effect.catch((error) => + Effect.gen(function* () { + yield* completeOpenStreamItemsForTurn(session.threadId, turnId); + yield* closeOpenToolCallsForTurn(session.threadId, turnId, "failed"); + yield* settleTurnState(session, turnId, "ready"); + yield* emitRuntimeError(session.threadId, turnId, error.message); + yield* emitTurnCompleted(session.threadId, turnId, "failed", error.message); + }), + ), + ), + "session/prompt", + { threadId: session.threadId, turnId }, + ); + + return { + threadId: input.threadId, + turnId, + } satisfies ProviderTurnStartResult; + }); + + const interruptTurn: DroidAdapterShape["interruptTurn"] = (threadId) => + Effect.gen(function* () { + const session = yield* getSession(threadId); + if (session.acpSessionId) { + yield* Effect.tryPromise({ + try: () => + sendAcpRequest(session, "session/cancel", { + sessionId: session.acpSessionId, + }), + catch: () => undefined, + }).pipe(Effect.ignore); + } + }); + + const respondToRequest: DroidAdapterShape["respondToRequest"] = () => Effect.void; + + const respondToUserInput: DroidAdapterShape["respondToUserInput"] = () => Effect.void; + + const stopSession: DroidAdapterShape["stopSession"] = (threadId) => + Effect.gen(function* () { + const session = sessions.get(threadId); + if (!session) return; + stopAcpProcessSession(session); + yield* finalizeSession(session); + }); + + const listSessions: DroidAdapterShape["listSessions"] = () => + Effect.sync(() => { + const now = new Date().toISOString(); + return Array.from(sessions.values()).map( + (s): ProviderSession => + Object.assign( + { + provider: PROVIDER, + status: s.status, + runtimeMode: s.runtimeMode, + cwd: s.cwd, + model: s.model, + threadId: s.threadId, + createdAt: s.createdAt, + updatedAt: now, + }, + s.activeTurnId ? { activeTurnId: s.activeTurnId } : {}, + ), + ); + }); + + const hasSession: DroidAdapterShape["hasSession"] = (threadId) => + Effect.sync(() => sessions.has(threadId)); + + const readThread: DroidAdapterShape["readThread"] = (threadId) => + Effect.succeed({ threadId, turns: [] }); + + const rollbackThread: DroidAdapterShape["rollbackThread"] = (threadId) => + Effect.succeed({ threadId, turns: [] }); + + const stopAll: DroidAdapterShape["stopAll"] = () => + Effect.gen(function* () { + for (const threadId of Array.from(sessions.keys())) { + const session = sessions.get(threadId); + if (!session) { + continue; + } + stopAcpProcessSession(session); + yield* finalizeSession(session); + } + }); + + return { + provider: PROVIDER, + capabilities: { sessionModelSwitch: "restart-session" }, + startSession, + sendTurn, + interruptTurn, + respondToRequest, + respondToUserInput, + stopSession, + listSessions, + hasSession, + readThread, + rollbackThread, + stopAll, + get streamEvents() { + return Stream.fromQueue(runtimeEventQueue); + }, + } satisfies DroidAdapterShape; + }), +); diff --git a/apps/server/src/provider/Layers/DroidProvider.ts b/apps/server/src/provider/Layers/DroidProvider.ts new file mode 100644 index 0000000000..ded06177e6 --- /dev/null +++ b/apps/server/src/provider/Layers/DroidProvider.ts @@ -0,0 +1,309 @@ +import type { DroidSettings, ModelCapabilities, ServerProviderModel } from "@t3tools/contracts"; +import { Effect, Equal, Layer, Option, Result, Stream } from "effect"; +import { ChildProcess, ChildProcessSpawner } from "effect/unstable/process"; + +import { + buildServerProvider, + DEFAULT_TIMEOUT_MS, + detailFromResult, + isCommandMissingCause, + parseGenericCliVersion, + providerModelsFromSettings, + spawnAndCollect, +} from "../providerSnapshot"; +import { makeManagedServerProvider } from "../makeManagedServerProvider"; +import { DroidProvider } from "../Services/DroidProvider"; +import { ServerSettingsService } from "../../serverSettings"; + +const PROVIDER = "droid" as const; +const FALLBACK_MODELS: ReadonlyArray = [ + { slug: "claude-opus-4-6", name: "Claude Opus 4.6", isCustom: false, capabilities: null }, +]; + +const MODEL_LINE_RE = /^\s{2}(\S+)\s{2,}(.+)$/; +const MODEL_DETAILS_LINE_RE = + /^\s*-\s(.+?): supports reasoning: (Yes|No); supported: \[([^\]]*)\]; default: (\S+)$/; + +function toEffortLabel(value: string): string { + switch (value) { + case "xhigh": + return "Extra High"; + case "none": + case "off": + return "Off"; + default: + return value + .split("-") + .map((segment) => segment.charAt(0).toUpperCase() + segment.slice(1)) + .join(" "); + } +} + +function parseCapabilitiesFromHelp(helpText: string): Map { + const capabilitiesByName = new Map(); + let inSection = false; + + for (const line of helpText.split("\n")) { + if (line.startsWith("Model details:")) { + inSection = true; + continue; + } + if (!inSection) continue; + if (line.trim() === "") continue; + if (!line.trimStart().startsWith("- ")) { + break; + } + + const match = MODEL_DETAILS_LINE_RE.exec(line); + if (!match?.[1] || !match[2] || !match[3] || !match[4]) continue; + + const name = match[1].trim(); + const supportsReasoning = match[2] === "Yes"; + const supportedValues = match[3] + .split(",") + .map((value) => value.trim()) + .filter((value) => value.length > 0); + const defaultValue = match[4].trim(); + + capabilitiesByName.set(name, { + reasoningEffortLevels: supportsReasoning + ? supportedValues.map((value) => + value === defaultValue + ? { value, label: toEffortLabel(value), isDefault: true } + : { value, label: toEffortLabel(value) }, + ) + : [], + supportsFastMode: false, + supportsThinkingToggle: false, + contextWindowOptions: [], + promptInjectedEffortLevels: [], + }); + } + + return capabilitiesByName; +} + +function normalizeCapabilityKey(value: string): string { + return value.toLowerCase().replace(/[^a-z0-9]+/g, ""); +} + +function resolveCapabilitiesForModel( + slug: string, + name: string, + capabilitiesByName: Map, +): ModelCapabilities | null { + const direct = capabilitiesByName.get(name); + if (direct) { + return direct; + } + + const normalizedSlug = normalizeCapabilityKey(slug.replace(/^custom:/, "")); + const normalizedName = normalizeCapabilityKey(name); + + for (const [candidateName, capabilities] of capabilitiesByName.entries()) { + const normalizedCandidate = normalizeCapabilityKey(candidateName); + if ( + normalizedCandidate === normalizedName || + normalizedCandidate === normalizedSlug || + normalizedName.includes(normalizedCandidate) || + normalizedCandidate.includes(normalizedName) || + normalizedSlug.includes(normalizedCandidate) || + normalizedCandidate.includes(normalizedSlug) + ) { + return capabilities; + } + } + + return null; +} + +function parseModelsFromHelp(helpText: string): ReadonlyArray { + const models: ServerProviderModel[] = []; + const capabilitiesByName = parseCapabilitiesFromHelp(helpText); + let inSection = false; + + for (const line of helpText.split("\n")) { + if (line.startsWith("Available Models:")) { + inSection = true; + continue; + } + if (line.startsWith("Custom Models:")) { + inSection = true; + continue; + } + if (inSection && line.trim() === "") { + inSection = false; + continue; + } + if (!inSection) continue; + + const match = MODEL_LINE_RE.exec(line); + if (!match?.[1] || !match[2]) continue; + const slug = match[1]; + let name = match[2].trim(); + if (name.includes("[Deprecated]")) continue; + if (name.includes("(default)")) name = name.replace("(default)", "").trim(); + models.push({ + slug, + name, + isCustom: slug.startsWith("custom:"), + capabilities: resolveCapabilitiesForModel(slug, name, capabilitiesByName), + }); + } + + return models.length > 0 ? models : FALLBACK_MODELS; +} + +const runDroidCommand = (args: ReadonlyArray) => + Effect.gen(function* () { + const droidSettings = yield* Effect.service(ServerSettingsService).pipe( + Effect.flatMap((service) => service.getSettings), + Effect.map((settings) => settings.providers.droid), + ); + const command = ChildProcess.make(droidSettings.binaryPath, [...args], { + shell: process.platform === "win32", + }); + return yield* spawnAndCollect(droidSettings.binaryPath, command); + }); + +export const checkDroidProviderStatus = Effect.gen(function* () { + const droidSettings = yield* Effect.service(ServerSettingsService).pipe( + Effect.flatMap((service) => service.getSettings), + Effect.map((settings) => settings.providers.droid), + ); + const checkedAt = new Date().toISOString(); + const disabledModels = providerModelsFromSettings( + FALLBACK_MODELS, + PROVIDER, + droidSettings.customModels, + ); + + if (!droidSettings.enabled) { + return buildServerProvider({ + provider: PROVIDER, + enabled: false, + checkedAt, + models: disabledModels, + probe: { + installed: false, + version: null, + status: "warning", + auth: { status: "unknown" }, + message: "Droid is disabled in T3 Code settings.", + }, + }); + } + + const versionProbe = yield* runDroidCommand(["--version"]).pipe( + Effect.timeoutOption(DEFAULT_TIMEOUT_MS), + Effect.result, + ); + + if (Result.isFailure(versionProbe)) { + const error = versionProbe.failure; + return buildServerProvider({ + provider: PROVIDER, + enabled: droidSettings.enabled, + checkedAt, + models: disabledModels, + probe: { + installed: !isCommandMissingCause(error), + version: null, + status: "error", + auth: { status: "unknown" }, + message: isCommandMissingCause(error) + ? "Droid CLI (`droid`) is not installed or not on PATH." + : `Failed to execute Droid CLI health check: ${error instanceof Error ? error.message : String(error)}.`, + }, + }); + } + + if (Option.isNone(versionProbe.success)) { + return buildServerProvider({ + provider: PROVIDER, + enabled: droidSettings.enabled, + checkedAt, + models: disabledModels, + probe: { + installed: true, + version: null, + status: "error", + auth: { status: "unknown" }, + message: "Droid CLI is installed but timed out while running.", + }, + }); + } + + const version = Option.getOrThrow(versionProbe.success); + const parsedVersion = parseGenericCliVersion(`${version.stdout}\n${version.stderr}`); + + if (version.code !== 0) { + const detail = detailFromResult(version); + return buildServerProvider({ + provider: PROVIDER, + enabled: droidSettings.enabled, + checkedAt, + models: disabledModels, + probe: { + installed: true, + version: parsedVersion, + status: "error", + auth: { status: "unknown" }, + message: detail + ? `Droid CLI is installed but failed to run. ${detail}` + : "Droid CLI is installed but failed to run.", + }, + }); + } + + const helpProbe = yield* runDroidCommand(["exec", "--help"]).pipe( + Effect.timeoutOption(DEFAULT_TIMEOUT_MS), + Effect.result, + ); + + let builtInModels: ReadonlyArray = FALLBACK_MODELS; + if (Result.isSuccess(helpProbe) && Option.isSome(helpProbe.success)) { + const helpResult = Option.getOrThrow(helpProbe.success); + builtInModels = parseModelsFromHelp(`${helpResult.stdout}\n${helpResult.stderr}`); + } + + const models = providerModelsFromSettings(builtInModels, PROVIDER, droidSettings.customModels); + + return buildServerProvider({ + provider: PROVIDER, + enabled: droidSettings.enabled, + checkedAt, + models, + probe: { + installed: true, + version: parsedVersion, + status: "ready", + auth: { status: "authenticated" }, + }, + }); +}); + +export const DroidProviderLive = Layer.effect( + DroidProvider, + Effect.gen(function* () { + const serverSettings = yield* ServerSettingsService; + const spawner = yield* ChildProcessSpawner.ChildProcessSpawner; + + const checkProvider = checkDroidProviderStatus.pipe( + Effect.provideService(ServerSettingsService, serverSettings), + Effect.provideService(ChildProcessSpawner.ChildProcessSpawner, spawner), + ); + + return yield* makeManagedServerProvider({ + getSettings: serverSettings.getSettings.pipe( + Effect.map((settings) => settings.providers.droid), + Effect.orDie, + ), + streamSettings: serverSettings.streamChanges.pipe( + Stream.map((settings) => settings.providers.droid), + ), + haveSettingsChanged: (previous, next) => !Equal.equals(previous, next), + checkProvider, + }); + }), +); diff --git a/apps/server/src/provider/Layers/ProviderAdapterRegistry.test.ts b/apps/server/src/provider/Layers/ProviderAdapterRegistry.test.ts index db0293f0fe..007d05ed82 100644 --- a/apps/server/src/provider/Layers/ProviderAdapterRegistry.test.ts +++ b/apps/server/src/provider/Layers/ProviderAdapterRegistry.test.ts @@ -6,6 +6,7 @@ import { Effect, Layer, Stream } from "effect"; import { ClaudeAdapter, ClaudeAdapterShape } from "../Services/ClaudeAdapter.ts"; import { CodexAdapter, CodexAdapterShape } from "../Services/CodexAdapter.ts"; +import { DroidAdapter, DroidAdapterShape } from "../Services/DroidAdapter.ts"; import { ProviderAdapterRegistry } from "../Services/ProviderAdapterRegistry.ts"; import { ProviderAdapterRegistryLive } from "./ProviderAdapterRegistry.ts"; import { ProviderUnsupportedError } from "../Errors.ts"; @@ -45,6 +46,23 @@ const fakeClaudeAdapter: ClaudeAdapterShape = { streamEvents: Stream.empty, }; +const fakeDroidAdapter: DroidAdapterShape = { + provider: "droid", + capabilities: { sessionModelSwitch: "restart-session" }, + startSession: vi.fn(), + sendTurn: vi.fn(), + interruptTurn: vi.fn(), + respondToRequest: vi.fn(), + respondToUserInput: vi.fn(), + stopSession: vi.fn(), + listSessions: vi.fn(), + hasSession: vi.fn(), + readThread: vi.fn(), + rollbackThread: vi.fn(), + stopAll: vi.fn(), + streamEvents: Stream.empty, +}; + const layer = it.layer( Layer.mergeAll( Layer.provide( @@ -52,6 +70,7 @@ const layer = it.layer( Layer.mergeAll( Layer.succeed(CodexAdapter, fakeCodexAdapter), Layer.succeed(ClaudeAdapter, fakeClaudeAdapter), + Layer.succeed(DroidAdapter, fakeDroidAdapter), ), ), NodeServices.layer, @@ -68,7 +87,7 @@ layer("ProviderAdapterRegistryLive", (it) => { assert.equal(claude, fakeClaudeAdapter); const providers = yield* registry.listProviders(); - assert.deepEqual(providers, ["codex", "claudeAgent"]); + assert.deepEqual(providers, ["codex", "claudeAgent", "droid"]); }), ); diff --git a/apps/server/src/provider/Layers/ProviderAdapterRegistry.ts b/apps/server/src/provider/Layers/ProviderAdapterRegistry.ts index 23ef8d1b9b..9822089db2 100644 --- a/apps/server/src/provider/Layers/ProviderAdapterRegistry.ts +++ b/apps/server/src/provider/Layers/ProviderAdapterRegistry.ts @@ -17,6 +17,7 @@ import { } from "../Services/ProviderAdapterRegistry.ts"; import { ClaudeAdapter } from "../Services/ClaudeAdapter.ts"; import { CodexAdapter } from "../Services/CodexAdapter.ts"; +import { DroidAdapter } from "../Services/DroidAdapter.ts"; export interface ProviderAdapterRegistryLiveOptions { readonly adapters?: ReadonlyArray>; @@ -27,7 +28,7 @@ const makeProviderAdapterRegistry = (options?: ProviderAdapterRegistryLiveOption const adapters = options?.adapters !== undefined ? options.adapters - : [yield* CodexAdapter, yield* ClaudeAdapter]; + : [yield* CodexAdapter, yield* ClaudeAdapter, yield* DroidAdapter]; const byProvider = new Map(adapters.map((adapter) => [adapter.provider, adapter])); const getByProvider: ProviderAdapterRegistryShape["getByProvider"] = (provider) => { diff --git a/apps/server/src/provider/Layers/ProviderRegistry.ts b/apps/server/src/provider/Layers/ProviderRegistry.ts index 1e66ce8ff5..fd8c856c2b 100644 --- a/apps/server/src/provider/Layers/ProviderRegistry.ts +++ b/apps/server/src/provider/Layers/ProviderRegistry.ts @@ -8,17 +8,21 @@ import { Effect, Equal, Layer, PubSub, Ref, Stream } from "effect"; import { ClaudeProviderLive } from "./ClaudeProvider"; import { CodexProviderLive } from "./CodexProvider"; +import { DroidProviderLive } from "./DroidProvider"; import type { ClaudeProviderShape } from "../Services/ClaudeProvider"; import { ClaudeProvider } from "../Services/ClaudeProvider"; import type { CodexProviderShape } from "../Services/CodexProvider"; import { CodexProvider } from "../Services/CodexProvider"; +import type { DroidProviderShape } from "../Services/DroidProvider"; +import { DroidProvider } from "../Services/DroidProvider"; import { ProviderRegistry, type ProviderRegistryShape } from "../Services/ProviderRegistry"; const loadProviders = ( codexProvider: CodexProviderShape, claudeProvider: ClaudeProviderShape, -): Effect.Effect => - Effect.all([codexProvider.getSnapshot, claudeProvider.getSnapshot], { + droidProvider: DroidProviderShape, +): Effect.Effect => + Effect.all([codexProvider.getSnapshot, claudeProvider.getSnapshot, droidProvider.getSnapshot], { concurrency: "unbounded", }); @@ -32,18 +36,19 @@ export const ProviderRegistryLive = Layer.effect( Effect.gen(function* () { const codexProvider = yield* CodexProvider; const claudeProvider = yield* ClaudeProvider; + const droidProvider = yield* DroidProvider; const changesPubSub = yield* Effect.acquireRelease( PubSub.unbounded>(), PubSub.shutdown, ); const providersRef = yield* Ref.make>( - yield* loadProviders(codexProvider, claudeProvider), + yield* loadProviders(codexProvider, claudeProvider, droidProvider), ); const syncProviders = (options?: { readonly publish?: boolean }) => Effect.gen(function* () { const previousProviders = yield* Ref.get(providersRef); - const providers = yield* loadProviders(codexProvider, claudeProvider); + const providers = yield* loadProviders(codexProvider, claudeProvider, droidProvider); yield* Ref.set(providersRef, providers); if (options?.publish !== false && haveProvidersChanged(previousProviders, providers)) { @@ -59,6 +64,9 @@ export const ProviderRegistryLive = Layer.effect( yield* Stream.runForEach(claudeProvider.streamChanges, () => syncProviders()).pipe( Effect.forkScoped, ); + yield* Stream.runForEach(droidProvider.streamChanges, () => syncProviders()).pipe( + Effect.forkScoped, + ); return { getProviders: syncProviders({ publish: false }).pipe( @@ -74,10 +82,14 @@ export const ProviderRegistryLive = Layer.effect( case "claudeAgent": yield* claudeProvider.refresh; break; + case "droid": + yield* droidProvider.refresh; + break; default: - yield* Effect.all([codexProvider.refresh, claudeProvider.refresh], { - concurrency: "unbounded", - }); + yield* Effect.all( + [codexProvider.refresh, claudeProvider.refresh, droidProvider.refresh], + { concurrency: "unbounded" }, + ); break; } return yield* syncProviders(); @@ -90,4 +102,8 @@ export const ProviderRegistryLive = Layer.effect( }, } satisfies ProviderRegistryShape; }), -).pipe(Layer.provideMerge(CodexProviderLive), Layer.provideMerge(ClaudeProviderLive)); +).pipe( + Layer.provideMerge(CodexProviderLive), + Layer.provideMerge(ClaudeProviderLive), + Layer.provideMerge(DroidProviderLive), +); diff --git a/apps/server/src/provider/Layers/ProviderSessionDirectory.test.ts b/apps/server/src/provider/Layers/ProviderSessionDirectory.test.ts index d23b247f21..6408aeddd6 100644 --- a/apps/server/src/provider/Layers/ProviderSessionDirectory.test.ts +++ b/apps/server/src/provider/Layers/ProviderSessionDirectory.test.ts @@ -163,6 +163,30 @@ it.layer(makeDirectoryLayer(SqlitePersistenceMemory))("ProviderSessionDirectoryL } })); + it("reads persisted droid bindings", () => + Effect.gen(function* () { + const directory = yield* ProviderSessionDirectory; + const runtimeRepository = yield* ProviderSessionRuntimeRepository; + const threadId = ThreadId.makeUnsafe("thread-droid"); + + yield* runtimeRepository.upsert({ + threadId, + providerName: "droid", + adapterKey: "droid", + runtimeMode: "full-access", + status: "running", + lastSeenAt: new Date().toISOString(), + resumeCursor: null, + runtimePayload: null, + }); + + const binding = yield* directory.getBinding(threadId); + assertSome(binding, { + threadId, + provider: "droid", + }); + })); + it("rehydrates persisted mappings across layer restart", () => Effect.gen(function* () { const tempDir = fs.mkdtempSync(path.join(os.tmpdir(), "t3-provider-directory-")); diff --git a/apps/server/src/provider/Layers/ProviderSessionDirectory.ts b/apps/server/src/provider/Layers/ProviderSessionDirectory.ts index 961c63d696..75f4b455aa 100644 --- a/apps/server/src/provider/Layers/ProviderSessionDirectory.ts +++ b/apps/server/src/provider/Layers/ProviderSessionDirectory.ts @@ -22,7 +22,7 @@ function decodeProviderKind( providerName: string, operation: string, ): Effect.Effect { - if (providerName === "codex" || providerName === "claudeAgent") { + if (providerName === "codex" || providerName === "claudeAgent" || providerName === "droid") { return Effect.succeed(providerName); } return Effect.fail( diff --git a/apps/server/src/provider/Services/DroidAdapter.ts b/apps/server/src/provider/Services/DroidAdapter.ts new file mode 100644 index 0000000000..9d845a6a3e --- /dev/null +++ b/apps/server/src/provider/Services/DroidAdapter.ts @@ -0,0 +1,21 @@ +/** + * DroidAdapter - Droid/ACP implementation of the generic provider adapter contract. + * + * This service owns ACP protocol semantics (JSON-RPC 2.0 over stdio) and emits + * canonical provider runtime events. It does not perform cross-provider routing, + * shared event fan-out, or checkpoint orchestration. + * + * @module DroidAdapter + */ +import { ServiceMap } from "effect"; + +import type { ProviderAdapterError } from "../Errors.ts"; +import type { ProviderAdapterShape } from "./ProviderAdapter.ts"; + +export interface DroidAdapterShape extends ProviderAdapterShape { + readonly provider: "droid"; +} + +export class DroidAdapter extends ServiceMap.Service()( + "t3/provider/Services/DroidAdapter", +) {} diff --git a/apps/server/src/provider/Services/DroidProvider.ts b/apps/server/src/provider/Services/DroidProvider.ts new file mode 100644 index 0000000000..364ebb7563 --- /dev/null +++ b/apps/server/src/provider/Services/DroidProvider.ts @@ -0,0 +1,9 @@ +import { ServiceMap } from "effect"; + +import type { ServerProviderShape } from "./ServerProvider"; + +export interface DroidProviderShape extends ServerProviderShape {} + +export class DroidProvider extends ServiceMap.Service()( + "t3/provider/Services/DroidProvider", +) {} diff --git a/apps/server/src/provider/acpCore.ts b/apps/server/src/provider/acpCore.ts new file mode 100644 index 0000000000..d3317f730b --- /dev/null +++ b/apps/server/src/provider/acpCore.ts @@ -0,0 +1,280 @@ +import type { ProviderKind, ProviderSession, ThreadId, TurnId } from "@t3tools/contracts"; +import { Effect, Schema } from "effect"; +import * as ChildProcess from "node:child_process"; +import * as readline from "node:readline"; + +import { ProviderAdapterProcessError, ProviderAdapterRequestError } from "./Errors.ts"; + +export const ACP_VERSION = 1; + +export interface JsonRpcRequest { + readonly jsonrpc: "2.0"; + readonly id: number; + readonly method: string; + readonly params?: unknown; +} + +export interface JsonRpcNotification { + readonly jsonrpc: "2.0"; + readonly method: string; + readonly params?: unknown; +} + +export interface JsonRpcResponse { + readonly jsonrpc: "2.0"; + readonly id: number; + readonly result?: unknown; + readonly error?: { readonly code: number; readonly message: string; readonly data?: unknown }; +} + +export type JsonRpcMessage = JsonRpcRequest | JsonRpcNotification | JsonRpcResponse; + +export interface AcpSessionState { + readonly threadId: ThreadId; + readonly process: ChildProcess.ChildProcess; + readonly rl: readline.Interface; + readonly pendingRequests: Map< + number, + { + resolve: (value: unknown) => void; + reject: (error: Error) => void; + } + >; + nextId: number; + acpSessionId: string | null; + activeTurnId: TurnId | null; + status: ProviderSession["status"]; + runtimeMode: ProviderSession["runtimeMode"]; + cwd: string | undefined; + readonly model?: string; + createdAt: string; +} + +function runDetachedEffect(effect: Effect.Effect, label: string): void { + Effect.runPromise(effect).catch((cause) => { + console.error(`[acpCore] ${label} failed`, cause); + }); +} + +function isResponse(message: JsonRpcMessage): message is JsonRpcResponse { + return "id" in message && !("method" in message); +} + +function isNotification(message: JsonRpcMessage): message is JsonRpcNotification { + return "method" in message && !("id" in message); +} + +function sendMessage( + session: AcpSessionState, + message: JsonRpcRequest | JsonRpcNotification, +): void { + session.process.stdin?.write(`${JSON.stringify(message)}\n`); +} + +export function sendAcpRequest( + session: AcpSessionState, + method: string, + params?: unknown, +): Promise { + const id = session.nextId++; + return new Promise((resolve, reject) => { + session.pendingRequests.set(id, { resolve, reject }); + sendMessage(session, { jsonrpc: "2.0", id, method, params }); + }); +} + +export function sendAcpNotification( + session: AcpSessionState, + method: string, + params?: unknown, +): void { + sendMessage(session, { jsonrpc: "2.0", method, params }); +} + +export function spawnAcpProcessSession(input: { + provider: ProviderKind; + threadId: ThreadId; + binaryPath: string; + args: ReadonlyArray; + cwd: string; + runtimeMode: ProviderSession["runtimeMode"]; + model?: string; +}): Effect.Effect { + return Effect.tryPromise({ + try: async () => { + const child = ChildProcess.spawn(input.binaryPath, [...input.args], { + stdio: ["pipe", "pipe", "pipe"], + cwd: input.cwd, + env: { ...process.env }, + }); + + if (!child.stdin || !child.stdout) { + child.kill(); + throw new ProviderAdapterProcessError({ + provider: input.provider, + threadId: input.threadId, + detail: "Failed to spawn ACP process: missing stdio", + }); + } + + await new Promise((resolve, reject) => { + const onError = (cause: Error) => { + child.off("spawn", onSpawn); + reject(cause); + }; + const onSpawn = () => { + child.off("error", onError); + resolve(); + }; + + child.once("error", onError); + child.once("spawn", onSpawn); + }); + + return { + threadId: input.threadId, + process: child, + rl: readline.createInterface({ input: child.stdout, crlfDelay: Infinity }), + pendingRequests: new Map(), + nextId: 1, + acpSessionId: null, + activeTurnId: null, + status: "connecting", + runtimeMode: input.runtimeMode, + cwd: input.cwd, + ...(input.model ? { model: input.model } : {}), + createdAt: new Date().toISOString(), + } satisfies AcpSessionState; + }, + catch: (cause) => + Schema.is(ProviderAdapterProcessError)(cause) + ? cause + : new ProviderAdapterProcessError({ + provider: input.provider, + threadId: input.threadId, + detail: cause instanceof Error ? cause.message : String(cause), + cause, + }), + }); +} + +export function wireAcpProcessMessages(input: { + session: AcpSessionState; + onNotification: (method: string, params: unknown) => Effect.Effect; + onUnhandledNotification?: (method: string) => Effect.Effect; + onUnhandledMessage?: (message: JsonRpcMessage) => Effect.Effect; + onExit?: () => Effect.Effect; +}): void { + input.session.rl.on("line", (line: string) => { + if (!line.trim()) { + return; + } + + let message: JsonRpcMessage; + try { + message = JSON.parse(line) as JsonRpcMessage; + } catch { + return; + } + + if (isResponse(message)) { + const pending = input.session.pendingRequests.get(message.id); + if (!pending) { + return; + } + input.session.pendingRequests.delete(message.id); + if (message.error) { + pending.reject(new Error(message.error.message)); + } else { + pending.resolve(message.result); + } + return; + } + + if (isNotification(message)) { + if (message.method === "session/update") { + runDetachedEffect(input.onNotification(message.method, message.params), "session/update"); + return; + } + if (input.onUnhandledNotification) { + runDetachedEffect(input.onUnhandledNotification(message.method), "unhandled notification"); + } + return; + } + + if (input.onUnhandledMessage) { + runDetachedEffect(input.onUnhandledMessage(message), "unhandled message"); + } + }); + + input.session.process.on("exit", () => { + input.session.status = "closed"; + for (const [, pending] of input.session.pendingRequests) { + pending.reject(new Error("ACP process exited")); + } + input.session.pendingRequests.clear(); + if (input.onExit) { + runDetachedEffect(input.onExit(), "process exit"); + } + }); + + input.session.process.stderr?.on("data", () => {}); +} + +export function initializeAcpSession(input: { + provider: ProviderKind; + session: AcpSessionState; + clientName: string; + clientVersion: string; + capabilities?: Record; +}): Effect.Effect | undefined, ProviderAdapterProcessError> { + return Effect.tryPromise({ + try: async () => { + const result = (await sendAcpRequest(input.session, "initialize", { + protocolVersion: ACP_VERSION, + client_info: { name: input.clientName, version: input.clientVersion }, + capabilities: input.capabilities ?? {}, + })) as Record | undefined; + sendAcpNotification(input.session, "initialized"); + return result; + }, + catch: (cause) => + new ProviderAdapterProcessError({ + provider: input.provider, + threadId: input.session.threadId, + detail: `ACP initialization failed: ${cause instanceof Error ? cause.message : String(cause)}`, + cause, + }), + }); +} + +export function createAcpRemoteSession(input: { + provider: ProviderKind; + session: AcpSessionState; + cwd: string; + mcpServers?: ReadonlyArray; +}): Effect.Effect<{ sessionId?: string } | undefined, ProviderAdapterRequestError> { + return Effect.tryPromise({ + try: async () => { + const result = (await sendAcpRequest(input.session, "session/new", { + cwd: input.cwd, + mcpServers: input.mcpServers ?? [], + })) as { sessionId?: string } | undefined; + input.session.acpSessionId = result?.sessionId ?? null; + return result; + }, + catch: (cause) => + new ProviderAdapterRequestError({ + provider: input.provider, + method: "session/new", + detail: cause instanceof Error ? cause.message : String(cause), + cause, + }), + }); +} + +export function stopAcpProcessSession(session: AcpSessionState): void { + session.rl.close(); + session.process.kill(); + session.status = "closed"; +} diff --git a/apps/server/src/provider/acpRuntimeBridge.test.ts b/apps/server/src/provider/acpRuntimeBridge.test.ts new file mode 100644 index 0000000000..8f093daff5 --- /dev/null +++ b/apps/server/src/provider/acpRuntimeBridge.test.ts @@ -0,0 +1,120 @@ +import { EventId, RuntimeItemId, ThreadId, TurnId } from "@t3tools/contracts"; +import { Effect } from "effect"; +import { describe, expect, it } from "vitest"; + +import { makeAcpRuntimeBridge } from "./acpRuntimeBridge.ts"; + +describe("acpRuntimeBridge", () => { + it("preserves whitespace in assistant text deltas", async () => { + const events: Array = []; + let stampCount = 0; + let itemCount = 0; + const bridge = makeAcpRuntimeBridge({ + provider: "droid", + logLabel: "[test]", + makeStamp: () => + Effect.succeed({ + eventId: EventId.makeUnsafe(`evt-${++stampCount}`), + createdAt: "2026-03-30T00:00:00.000Z", + }), + nextItemId: Effect.sync(() => RuntimeItemId.makeUnsafe(`item-${++itemCount}`)), + offerEvent: (event) => Effect.sync(() => void events.push(event)), + }); + + const session = { + threadId: ThreadId.makeUnsafe("thread-acp-spaces"), + activeTurnId: TurnId.makeUnsafe("turn-acp-spaces"), + }; + + await Effect.runPromise( + bridge.handleSessionUpdate(session, { + update: { + sessionUpdate: "agent_message_chunk", + content: { type: "text", text: "Here" }, + }, + }), + ); + await Effect.runPromise( + bridge.handleSessionUpdate(session, { + update: { + sessionUpdate: "agent_message_chunk", + content: { type: "text", text: "'s what the subagent found" }, + }, + }), + ); + + const deltas = events.flatMap((event) => + typeof event === "object" && + event !== null && + "type" in event && + event.type === "content.delta" && + "payload" in event && + typeof event.payload === "object" && + event.payload !== null && + "delta" in event.payload + ? [event.payload.delta] + : [], + ); + + expect(deltas).toEqual(["Here", "'s what the subagent found"]); + expect(deltas.join("")).toBe("Here's what the subagent found"); + }); + + it("preserves whitespace when summarizing tool call output", async () => { + const events: Array = []; + let stampCount = 0; + let itemCount = 0; + const bridge = makeAcpRuntimeBridge({ + provider: "droid", + logLabel: "[test]", + makeStamp: () => + Effect.succeed({ + eventId: EventId.makeUnsafe(`evt-${++stampCount}`), + createdAt: "2026-03-30T00:00:00.000Z", + }), + nextItemId: Effect.sync(() => RuntimeItemId.makeUnsafe(`item-${++itemCount}`)), + offerEvent: (event) => Effect.sync(() => void events.push(event)), + }); + + const session = { + threadId: ThreadId.makeUnsafe("thread-acp-tool-spaces"), + activeTurnId: TurnId.makeUnsafe("turn-acp-tool-spaces"), + }; + + await Effect.runPromise( + bridge.handleSessionUpdate(session, { + update: { + sessionUpdate: "tool_call", + toolCallId: "tool-call-1", + title: "Task", + rawInput: { subagent_type: "explore", description: "Read files" }, + }, + }), + ); + await Effect.runPromise( + bridge.handleSessionUpdate(session, { + update: { + sessionUpdate: "tool_call_update", + toolCallId: "tool-call-1", + content: [ + { text: "Here's" }, + { text: " what the subagent found:" }, + { text: "\n- package.json: The t3 package" }, + ], + }, + }), + ); + + const updatedEvent = events.find( + (event) => + typeof event === "object" && + event !== null && + "type" in event && + event.type === "item.updated", + ) as { payload: { detail?: string } } | undefined; + + expect(updatedEvent?.payload.detail).toBe( + "Here's what the subagent found:\n- package.json: The t3 package", + ); + }); +}); diff --git a/apps/server/src/provider/acpRuntimeBridge.ts b/apps/server/src/provider/acpRuntimeBridge.ts new file mode 100644 index 0000000000..a3d25b592f --- /dev/null +++ b/apps/server/src/provider/acpRuntimeBridge.ts @@ -0,0 +1,613 @@ +import type { + CanonicalItemType, + EventId, + ProviderKind, + ProviderRuntimeEvent, + RuntimeContentStreamKind, + RuntimeItemId, + ThreadId, + TurnId, +} from "@t3tools/contracts"; +import { RuntimeTaskId } from "@t3tools/contracts"; +import { Effect, Random } from "effect"; + +import { classifyToolItemType, summarizeToolRequest, titleForTool } from "./toolCallMetadata.ts"; + +export interface AcpRuntimeSessionState { + readonly threadId: ThreadId; + activeTurnId: TurnId | null; +} + +interface AcpToolCallState { + readonly key: string; + readonly threadId: ThreadId; + readonly turnId: TurnId; + readonly itemId: RuntimeItemId; + readonly taskId: RuntimeTaskId | null; + readonly itemType: CanonicalItemType; + readonly title: string; + readonly toolName: string; + readonly input: unknown; + readonly taskDescription: string; + detail?: string; + lastTaskSummary?: string; +} + +function asRecord(value: unknown): Record | undefined { + return value !== null && typeof value === "object" && !Array.isArray(value) + ? (value as Record) + : undefined; +} + +function asTrimmedString(value: unknown): string | undefined { + if (typeof value !== "string") { + return undefined; + } + const trimmed = value.trim(); + return trimmed.length > 0 ? trimmed : undefined; +} + +function asNonEmptyString(value: unknown): string | undefined { + return typeof value === "string" && value.length > 0 ? value : undefined; +} + +function inferAcpToolName(title: unknown, rawInput: unknown): string { + const input = asRecord(rawInput); + const candidates = [ + title, + input?.toolName, + input?.tool_name, + input?.name, + input?.tool, + input?.type, + ]; + + for (const candidate of candidates) { + const value = asTrimmedString(candidate); + if (value) { + return value; + } + } + + if (asTrimmedString(input?.subagent_type)) { + return "Task"; + } + + return "Tool"; +} + +function summarizeAcpToolInput(toolName: string, rawInput: unknown): string | undefined { + const input = asRecord(rawInput); + if (input) { + return summarizeToolRequest(toolName, input); + } + + if (rawInput === undefined) { + return undefined; + } + + const serialized = JSON.stringify(rawInput); + if (!serialized || serialized === "{}") { + return undefined; + } + if (serialized.length <= 400) { + return `${toolName}: ${serialized}`; + } + return `${toolName}: ${serialized.slice(0, 397)}...`; +} + +function taskDescriptionFromToolInput( + toolName: string, + rawInput: unknown, + detail?: string, +): string { + const input = asRecord(rawInput); + return ( + asTrimmedString(input?.description) ?? + asTrimmedString(input?.prompt) ?? + asTrimmedString(input?.instructions) ?? + detail ?? + `${titleForTool(classifyToolItemType(toolName, input))} in progress` + ); +} + +function collectAcpToolOutputDeltas( + rawOutput: unknown, + content: ReadonlyArray | undefined, +): string[] { + const deltas: string[] = []; + const rawOutputText = asNonEmptyString(asRecord(rawOutput)?.text); + if (rawOutputText) { + deltas.push(rawOutputText); + } + for (const chunk of content ?? []) { + const chunkRecord = asRecord(chunk); + if (!chunkRecord) { + continue; + } + const directText = asNonEmptyString(chunkRecord.text); + if (directText) { + deltas.push(directText); + continue; + } + const nestedText = asNonEmptyString(asRecord(chunkRecord.content)?.text); + if (nestedText) { + deltas.push(nestedText); + } + } + return deltas; +} + +function toolCallKey(threadId: ThreadId, toolCallId: string): string { + return `${threadId}:tc:${toolCallId}`; +} + +function toolCallData(toolName: string, input: unknown): { toolName: string; input?: unknown } { + return { + toolName, + ...(input !== undefined ? { input } : {}), + }; +} + +export function makeAcpRuntimeBridge(input: { + provider: ProviderKind; + logLabel: string; + makeStamp: () => Effect.Effect<{ eventId: EventId; createdAt: string }>; + nextItemId: Effect.Effect; + offerEvent: (event: ProviderRuntimeEvent) => Effect.Effect; +}) { + const runtimeItemIds = new Map(); + const toolCalls = new Map(); + + const runtimeEvent = (event: unknown): ProviderRuntimeEvent => event as ProviderRuntimeEvent; + + const emitContentDelta = ( + threadId: ThreadId, + turnId: TurnId, + itemId: RuntimeItemId, + text: string, + streamKind: RuntimeContentStreamKind, + ) => + Effect.gen(function* () { + const stamp = yield* input.makeStamp(); + yield* input.offerEvent( + runtimeEvent({ + type: "content.delta", + eventId: stamp.eventId, + provider: input.provider, + threadId, + turnId, + itemId, + createdAt: stamp.createdAt, + payload: { streamKind, delta: text }, + }), + ); + }); + + const emitItemEvent = ( + type: "item.started" | "item.updated" | "item.completed", + threadId: ThreadId, + turnId: TurnId, + itemId: RuntimeItemId, + itemType: string, + status: "inProgress" | "completed" | "failed", + metadata?: { title?: string; detail?: string; data?: unknown }, + ) => + Effect.gen(function* () { + const stamp = yield* input.makeStamp(); + yield* input.offerEvent( + runtimeEvent({ + type, + eventId: stamp.eventId, + provider: input.provider, + threadId, + turnId, + itemId, + createdAt: stamp.createdAt, + payload: { + itemType, + status, + ...(metadata?.title ? { title: metadata.title } : {}), + ...(metadata?.detail ? { detail: metadata.detail } : {}), + ...(metadata?.data !== undefined ? { data: metadata.data } : {}), + }, + }), + ); + }); + + const emitTaskEvent = ( + type: "task.started" | "task.progress" | "task.completed", + threadId: ThreadId, + turnId: TurnId, + taskId: RuntimeTaskId, + payload: Record, + ) => + Effect.gen(function* () { + const stamp = yield* input.makeStamp(); + yield* input.offerEvent( + runtimeEvent({ + type, + eventId: stamp.eventId, + provider: input.provider, + threadId, + turnId, + createdAt: stamp.createdAt, + payload: { taskId, ...payload }, + }), + ); + }); + + const emitToolCallCompletion = ( + state: AcpToolCallState, + status: "completed" | "failed", + summary?: string, + ) => + Effect.gen(function* () { + const detail = summary ?? state.detail; + yield* emitItemEvent( + "item.completed", + state.threadId, + state.turnId, + state.itemId, + state.itemType, + status, + { + title: state.title, + ...(detail ? { detail } : {}), + data: toolCallData(state.toolName, state.input), + }, + ); + if (state.taskId) { + yield* emitTaskEvent("task.completed", state.threadId, state.turnId, state.taskId, { + status, + ...((summary ?? state.lastTaskSummary) + ? { summary: summary ?? state.lastTaskSummary } + : {}), + }); + } + runtimeItemIds.delete(state.key); + toolCalls.delete(state.key); + }); + + const closeOpenToolCallsForTurn = ( + threadId: ThreadId, + turnId: TurnId, + status: "completed" | "failed", + ) => + Effect.forEach( + Array.from(toolCalls.values()).filter( + (state) => state.threadId === threadId && state.turnId === turnId, + ), + (state) => emitToolCallCompletion(state, status), + { discard: true }, + ); + + const clearSessionState = (threadId: ThreadId) => + Effect.sync(() => { + const prefix = `${threadId}:`; + for (const key of runtimeItemIds.keys()) { + if (key.startsWith(prefix)) { + runtimeItemIds.delete(key); + } + } + for (const [key, state] of toolCalls.entries()) { + if (state.threadId === threadId) { + toolCalls.delete(key); + } + } + }); + + const completeOpenStreamItemsForTurn = (threadId: ThreadId, turnId: TurnId) => + Effect.gen(function* () { + const keys = [ + { key: `${threadId}:${turnId}:assistant`, itemType: "assistant_message" }, + { key: `${threadId}:${turnId}:thinking`, itemType: "reasoning" }, + ] as const; + + for (const entry of keys) { + const itemId = runtimeItemIds.get(entry.key); + if (!itemId) { + continue; + } + yield* emitItemEvent( + "item.completed", + threadId, + turnId, + itemId, + entry.itemType, + "completed", + ); + runtimeItemIds.delete(entry.key); + } + }); + + const handleSessionUpdate = (session: AcpRuntimeSessionState, params: Record) => + Effect.gen(function* () { + const threadId = session.threadId; + const turnId = session.activeTurnId; + if (!turnId) { + return; + } + + const update = asRecord(params.update); + if (!update) { + return; + } + + const kind = asTrimmedString(update.sessionUpdate); + + yield* Effect.logDebug(input.logLabel, { + kind, + threadId, + turnId, + ts: new Date().toISOString(), + ...(kind === "tool_call" || kind === "tool_call_update" + ? { + toolCallId: update.toolCallId ?? "", + status: update.status ?? "", + title: update.title ?? "", + } + : {}), + }); + + if (kind === "agent_message_chunk") { + const content = asRecord(update.content); + const contentType = asTrimmedString(content?.type); + const text = asNonEmptyString(content?.text); + if (contentType === "text" && text !== undefined) { + const key = `${threadId}:${turnId}:assistant`; + let itemId = runtimeItemIds.get(key); + if (!itemId) { + itemId = yield* input.nextItemId; + runtimeItemIds.set(key, itemId); + yield* emitItemEvent( + "item.started", + threadId, + turnId, + itemId, + "assistant_message", + "inProgress", + ); + } + yield* emitContentDelta(threadId, turnId, itemId, text, "assistant_text"); + } else if (contentType === "thinking" && text !== undefined) { + const key = `${threadId}:${turnId}:thinking`; + let itemId = runtimeItemIds.get(key); + if (!itemId) { + itemId = yield* input.nextItemId; + runtimeItemIds.set(key, itemId); + yield* emitItemEvent( + "item.started", + threadId, + turnId, + itemId, + "reasoning", + "inProgress", + ); + } + yield* emitContentDelta(threadId, turnId, itemId, text, "reasoning_text"); + } + return; + } + + if (kind === "tool_call") { + const tcId = asTrimmedString(update.toolCallId) ?? (yield* Random.nextUUIDv4); + const key = toolCallKey(threadId, tcId); + const rawInput = update.rawInput; + const inputRecord = asRecord(rawInput); + const toolName = inferAcpToolName(update.title, rawInput); + const itemType = classifyToolItemType(toolName, inputRecord); + const title = titleForTool(itemType); + const detail = summarizeAcpToolInput(toolName, rawInput); + + let state = toolCalls.get(key); + if (!state) { + const itemId = yield* input.nextItemId; + const taskId = + itemType === "collab_agent_tool_call" ? RuntimeTaskId.makeUnsafe(tcId) : null; + state = { + key, + threadId, + turnId, + itemId, + taskId, + itemType, + title, + toolName, + input: rawInput, + taskDescription: taskDescriptionFromToolInput(toolName, rawInput, detail), + ...(detail ? { detail } : {}), + } satisfies AcpToolCallState; + toolCalls.set(key, state); + runtimeItemIds.set(key, itemId); + yield* emitItemEvent("item.started", threadId, turnId, itemId, itemType, "inProgress", { + title, + ...(detail ? { detail } : {}), + data: toolCallData(toolName, rawInput), + }); + if (taskId) { + yield* emitTaskEvent("task.started", threadId, turnId, taskId, { + description: state.taskDescription, + ...(asTrimmedString(inputRecord?.subagent_type) + ? { taskType: asTrimmedString(inputRecord?.subagent_type) } + : {}), + }); + } + } + + if (update.status === "completed" || update.status === "failed") { + yield* emitToolCallCompletion(state, update.status); + } + return; + } + + if (kind === "tool_call_update") { + const tcId = asTrimmedString(update.toolCallId); + if (!tcId) { + return; + } + const key = toolCallKey(threadId, tcId); + const state = toolCalls.get(key); + if (!state) { + return; + } + + const deltas = collectAcpToolOutputDeltas( + update.rawOutput, + Array.isArray(update.content) ? update.content : undefined, + ); + const summary = deltas.join(""); + if (summary.length > 0) { + state.detail = summary; + yield* emitItemEvent( + "item.updated", + threadId, + turnId, + state.itemId, + state.itemType, + "inProgress", + { + title: state.title, + detail: summary, + data: toolCallData(state.toolName, state.input), + }, + ); + if (state.taskId) { + state.lastTaskSummary = summary; + yield* emitTaskEvent("task.progress", threadId, turnId, state.taskId, { + description: state.taskDescription, + summary, + lastToolName: state.toolName, + }); + } + } + + if (update.status === "completed" || update.status === "failed") { + yield* emitToolCallCompletion(state, update.status, summary || undefined); + } + return; + } + + if ((kind === "status" || kind === "error") && asTrimmedString(update.message)) { + const stamp = yield* input.makeStamp(); + yield* input.offerEvent( + runtimeEvent({ + type: "runtime.error", + eventId: stamp.eventId, + provider: input.provider, + threadId, + turnId, + createdAt: stamp.createdAt, + payload: { + class: "provider_error", + message: asTrimmedString(update.message), + }, + }), + ); + } + }); + + const emitSessionStarted = (threadId: ThreadId) => + Effect.gen(function* () { + const stamp = yield* input.makeStamp(); + yield* input.offerEvent( + runtimeEvent({ + type: "session.started", + eventId: stamp.eventId, + provider: input.provider, + threadId, + createdAt: stamp.createdAt, + payload: {}, + }), + ); + }); + + const emitTurnStarted = (threadId: ThreadId, turnId: TurnId, model?: string, effort?: string) => + Effect.gen(function* () { + const stamp = yield* input.makeStamp(); + yield* input.offerEvent( + runtimeEvent({ + type: "turn.started", + eventId: stamp.eventId, + provider: input.provider, + threadId, + turnId, + createdAt: stamp.createdAt, + payload: { + ...(model ? { model } : {}), + ...(effort ? { effort } : {}), + }, + }), + ); + }); + + const emitTurnCompleted = ( + threadId: ThreadId, + turnId: TurnId, + state: "completed" | "failed", + errorMessage?: string, + ) => + Effect.gen(function* () { + const stamp = yield* input.makeStamp(); + yield* input.offerEvent( + runtimeEvent({ + type: "turn.completed", + eventId: stamp.eventId, + provider: input.provider, + threadId, + turnId, + createdAt: stamp.createdAt, + payload: { + state, + ...(errorMessage ? { errorMessage } : {}), + }, + }), + ); + }); + + const emitRuntimeError = (threadId: ThreadId, turnId: TurnId | undefined, message: string) => + Effect.gen(function* () { + const stamp = yield* input.makeStamp(); + yield* input.offerEvent( + runtimeEvent({ + type: "runtime.error", + eventId: stamp.eventId, + provider: input.provider, + threadId, + turnId, + createdAt: stamp.createdAt, + payload: { class: "provider_error", message }, + }), + ); + }); + + const emitSessionExited = (threadId: ThreadId, reason?: string) => + Effect.gen(function* () { + const stamp = yield* input.makeStamp(); + yield* input.offerEvent( + runtimeEvent({ + type: "session.exited", + eventId: stamp.eventId, + provider: input.provider, + threadId, + createdAt: stamp.createdAt, + payload: { + ...(reason ? { reason } : {}), + exitKind: "graceful", + }, + }), + ); + }); + + return { + emitSessionStarted, + emitTurnStarted, + emitTurnCompleted, + emitRuntimeError, + emitSessionExited, + handleSessionUpdate, + closeOpenToolCallsForTurn, + completeOpenStreamItemsForTurn, + clearSessionState, + }; +} diff --git a/apps/server/src/provider/toolCallMetadata.test.ts b/apps/server/src/provider/toolCallMetadata.test.ts new file mode 100644 index 0000000000..794d49ab50 --- /dev/null +++ b/apps/server/src/provider/toolCallMetadata.test.ts @@ -0,0 +1,16 @@ +import { describe, expect, it } from "vitest"; + +import { classifyToolItemType, summarizeToolRequest, titleForTool } from "./toolCallMetadata.ts"; + +describe("toolCallMetadata", () => { + it("classifies task-style tool calls with subagent metadata as collaboration agents", () => { + expect(classifyToolItemType("Task", { subagent_type: "code-reviewer" })).toBe( + "collab_agent_tool_call", + ); + expect(titleForTool("collab_agent_tool_call")).toBe("Subagent task"); + }); + + it("summarizes command-style tool requests using the command text", () => { + expect(summarizeToolRequest("Bash", { command: "git status" })).toBe("Bash: git status"); + }); +}); diff --git a/apps/server/src/provider/toolCallMetadata.ts b/apps/server/src/provider/toolCallMetadata.ts new file mode 100644 index 0000000000..f91ea5d430 --- /dev/null +++ b/apps/server/src/provider/toolCallMetadata.ts @@ -0,0 +1,95 @@ +import type { CanonicalItemType } from "@t3tools/contracts"; + +function asTrimmedString(value: unknown): string | undefined { + if (typeof value !== "string") { + return undefined; + } + const trimmed = value.trim(); + return trimmed.length > 0 ? trimmed : undefined; +} + +export function classifyToolItemType( + toolName: string, + input?: Record, +): CanonicalItemType { + if (asTrimmedString(input?.subagent_type)) { + return "collab_agent_tool_call"; + } + + const normalized = toolName.toLowerCase(); + if (normalized.includes("agent")) { + return "collab_agent_tool_call"; + } + if ( + normalized === "task" || + normalized === "agent" || + normalized.includes("subagent") || + normalized.includes("sub-agent") + ) { + return "collab_agent_tool_call"; + } + if ( + normalized.includes("bash") || + normalized.includes("command") || + normalized.includes("shell") || + normalized.includes("terminal") + ) { + return "command_execution"; + } + if ( + normalized.includes("edit") || + normalized.includes("write") || + normalized.includes("file") || + normalized.includes("patch") || + normalized.includes("replace") || + normalized.includes("create") || + normalized.includes("delete") + ) { + return "file_change"; + } + if (normalized.includes("mcp")) { + return "mcp_tool_call"; + } + if (normalized.includes("websearch") || normalized.includes("web search")) { + return "web_search"; + } + if (normalized.includes("image")) { + return "image_view"; + } + return "dynamic_tool_call"; +} + +export function summarizeToolRequest(toolName: string, input: Record): string { + const commandValue = input.command ?? input.cmd; + const command = typeof commandValue === "string" ? commandValue : undefined; + if (command && command.trim().length > 0) { + return `${toolName}: ${command.trim().slice(0, 400)}`; + } + + const serialized = JSON.stringify(input); + if (serialized.length <= 400) { + return `${toolName}: ${serialized}`; + } + return `${toolName}: ${serialized.slice(0, 397)}...`; +} + +export function titleForTool(itemType: CanonicalItemType): string { + switch (itemType) { + case "command_execution": + return "Command run"; + case "file_change": + return "File change"; + case "mcp_tool_call": + return "MCP tool call"; + case "collab_agent_tool_call": + return "Subagent task"; + case "web_search": + return "Web search"; + case "image_view": + return "Image view"; + case "dynamic_tool_call": + return "Tool call"; + default: + return "Item"; + } +} diff --git a/apps/server/src/serverLayers.ts b/apps/server/src/serverLayers.ts index a8c1a13f7f..edf8634b3f 100644 --- a/apps/server/src/serverLayers.ts +++ b/apps/server/src/serverLayers.ts @@ -19,6 +19,7 @@ import { RuntimeReceiptBusLive } from "./orchestration/Layers/RuntimeReceiptBus" import { ProviderUnsupportedError } from "./provider/Errors"; import { makeClaudeAdapterLive } from "./provider/Layers/ClaudeAdapter"; import { makeCodexAdapterLive } from "./provider/Layers/CodexAdapter"; +import { DroidAdapterLive } from "./provider/Layers/DroidAdapter"; import { ProviderAdapterRegistryLive } from "./provider/Layers/ProviderAdapterRegistry"; import { makeProviderServiceLive } from "./provider/Layers/ProviderService"; import { ProviderSessionDirectoryLive } from "./provider/Layers/ProviderSessionDirectory"; @@ -81,6 +82,7 @@ export function makeServerProviderLayer(): Layer.Layer< const adapterRegistryLayer = ProviderAdapterRegistryLive.pipe( Layer.provide(codexAdapterLayer), Layer.provide(claudeAdapterLayer), + Layer.provide(DroidAdapterLive), Layer.provideMerge(providerSessionDirectoryLayer), ); return makeProviderServiceLive( diff --git a/apps/web/src/components/ChatView.tsx b/apps/web/src/components/ChatView.tsx index 7562f845e2..468dcaf04a 100644 --- a/apps/web/src/components/ChatView.tsx +++ b/apps/web/src/components/ChatView.tsx @@ -129,7 +129,7 @@ import { resolveSelectableProvider, } from "../providerModels"; import { useSettings } from "../hooks/useSettings"; -import { resolveAppModelSelection } from "../modelSelection"; +import { buildModelSelection, resolveAppModelSelection } from "../modelSelection"; import { isTerminalFocused } from "../lib/terminalFocus"; import { type ComposerImageAttachment, @@ -815,11 +815,7 @@ export default function ChatView({ threadId }: ChatViewProps) { const selectedPromptEffort = composerProviderState.promptEffort; const selectedModelOptionsForDispatch = composerProviderState.modelOptionsForDispatch; const selectedModelSelection = useMemo( - () => ({ - provider: selectedProvider, - model: selectedModel, - ...(selectedModelOptionsForDispatch ? { options: selectedModelOptionsForDispatch } : {}), - }), + () => buildModelSelection(selectedProvider, selectedModel, selectedModelOptionsForDispatch), [selectedModel, selectedModelOptionsForDispatch, selectedProvider], ); const selectedModelForPicker = selectedModel; @@ -1178,6 +1174,7 @@ export default function ChatView({ threadId }: ChatViewProps) { codex: providerStatuses.find((provider) => provider.provider === "codex")?.models ?? [], claudeAgent: providerStatuses.find((provider) => provider.provider === "claudeAgent")?.models ?? [], + droid: providerStatuses.find((provider) => provider.provider === "droid")?.models ?? [], }), [providerStatuses], ); @@ -2758,14 +2755,13 @@ export default function ChatView({ threadId }: ChatViewProps) { } } const title = truncate(titleSeed); - const threadCreateModelSelection: ModelSelection = { - provider: selectedProvider, - model: - selectedModel || + const threadCreateModelSelection = buildModelSelection( + selectedProvider, + selectedModel || activeProject.defaultModelSelection?.model || DEFAULT_MODEL_BY_PROVIDER.codex, - ...(selectedModelSelection.options ? { options: selectedModelSelection.options } : {}), - }; + selectedModelSelection.options, + ); if (isLocalDraftThread) { await api.orchestration.dispatchCommand({ diff --git a/apps/web/src/components/Icons.tsx b/apps/web/src/components/Icons.tsx index 7d210fa173..6cd86fc43e 100644 --- a/apps/web/src/components/Icons.tsx +++ b/apps/web/src/components/Icons.tsx @@ -296,6 +296,15 @@ export const AntigravityIcon: Icon = (props) => ( ); +export const DroidIcon: Icon = (props) => ( + + + +); + export const OpenCodeIcon: Icon = (props) => ( diff --git a/apps/web/src/components/KeybindingsToast.browser.tsx b/apps/web/src/components/KeybindingsToast.browser.tsx index 224bd2f887..88a4e357e1 100644 --- a/apps/web/src/components/KeybindingsToast.browser.tsx +++ b/apps/web/src/components/KeybindingsToast.browser.tsx @@ -57,12 +57,13 @@ function createBaseServerConfig(): ServerConfig { ], availableEditors: [], settings: { - enableAssistantStreaming: false, + enableAssistantStreaming: true, defaultThreadEnvMode: "local" as const, textGenerationModelSelection: { provider: "codex" as const, model: "gpt-5.4-mini" }, providers: { codex: { enabled: true, binaryPath: "", homePath: "", customModels: [] }, claudeAgent: { enabled: true, binaryPath: "", customModels: [] }, + droid: { enabled: false, binaryPath: "", customModels: [] }, }, }, }; diff --git a/apps/web/src/components/chat/MessageCopyButton.browser.tsx b/apps/web/src/components/chat/MessageCopyButton.browser.tsx new file mode 100644 index 0000000000..050b43b82f --- /dev/null +++ b/apps/web/src/components/chat/MessageCopyButton.browser.tsx @@ -0,0 +1,41 @@ +import { page } from "vitest/browser"; +import { afterEach, describe, expect, it, vi } from "vitest"; +import { render } from "vitest-browser-react"; + +import { MessageCopyButton } from "./MessageCopyButton"; + +describe("MessageCopyButton", () => { + afterEach(() => { + document.body.innerHTML = ""; + vi.restoreAllMocks(); + }); + + it("falls back to execCommand when the Clipboard API rejects", async () => { + const originalClipboard = navigator.clipboard; + const originalExecCommand = document.execCommand.bind(document); + + Object.defineProperty(navigator, "clipboard", { + configurable: true, + value: { + writeText: vi.fn().mockRejectedValue(new Error("denied")), + }, + }); + const execCommand = vi.fn().mockReturnValue(true); + document.execCommand = execCommand; + + try { + await render(); + await page.getByRole("button").click(); + + await vi.waitFor(() => { + expect(execCommand).toHaveBeenCalledWith("copy"); + }); + } finally { + Object.defineProperty(navigator, "clipboard", { + configurable: true, + value: originalClipboard, + }); + document.execCommand = originalExecCommand; + } + }); +}); diff --git a/apps/web/src/components/chat/ProviderModelPicker.tsx b/apps/web/src/components/chat/ProviderModelPicker.tsx index 565a9d399d..346389f058 100644 --- a/apps/web/src/components/chat/ProviderModelPicker.tsx +++ b/apps/web/src/components/chat/ProviderModelPicker.tsx @@ -18,7 +18,7 @@ import { MenuSubTrigger, MenuTrigger, } from "../ui/menu"; -import { ClaudeAI, CursorIcon, Gemini, Icon, OpenAI, OpenCodeIcon } from "../Icons"; +import { ClaudeAI, CursorIcon, DroidIcon, Gemini, Icon, OpenAI, OpenCodeIcon } from "../Icons"; import { cn } from "~/lib/utils"; import { getProviderSnapshot } from "../../providerModels"; @@ -33,6 +33,7 @@ function isAvailableProviderOption(option: (typeof PROVIDER_OPTIONS)[number]): o const PROVIDER_ICON_BY_PROVIDER: Record = { codex: OpenAI, claudeAgent: ClaudeAI, + droid: DroidIcon, cursor: CursorIcon, }; diff --git a/apps/web/src/components/chat/TraitsPicker.browser.tsx b/apps/web/src/components/chat/TraitsPicker.browser.tsx index 74c22e6431..10d6412692 100644 --- a/apps/web/src/components/chat/TraitsPicker.browser.tsx +++ b/apps/web/src/components/chat/TraitsPicker.browser.tsx @@ -237,6 +237,42 @@ describe("TraitsPicker (Claude)", () => { }); }); + it("hides the trigger when no trait section can render", async () => { + const host = document.createElement("div"); + document.body.append(host); + const screen = await render( + {}} + />, + { container: host }, + ); + + try { + expect(document.querySelector("button")).toBeNull(); + } finally { + await screen.unmount(); + host.remove(); + } + }); + it("shows fast mode controls for Opus", async () => { await using _ = await mountClaudePicker(); diff --git a/apps/web/src/components/chat/TraitsPicker.tsx b/apps/web/src/components/chat/TraitsPicker.tsx index 061594ad53..b1849143a9 100644 --- a/apps/web/src/components/chat/TraitsPicker.tsx +++ b/apps/web/src/components/chat/TraitsPicker.tsx @@ -1,6 +1,7 @@ import { type ClaudeModelOptions, type CodexModelOptions, + type DroidModelOptions, type ProviderKind, type ProviderModelOptions, type ServerProviderModel, @@ -52,6 +53,9 @@ function getRawEffort( if (provider === "codex") { return trimOrNull((modelOptions as CodexModelOptions | undefined)?.reasoningEffort); } + if (provider === "droid") { + return trimOrNull((modelOptions as DroidModelOptions | undefined)?.effort); + } return trimOrNull((modelOptions as ClaudeModelOptions | undefined)?.effort); } @@ -73,9 +77,30 @@ function buildNextOptions( if (provider === "codex") { return { ...(modelOptions as CodexModelOptions | undefined), ...patch } as CodexModelOptions; } + if (provider === "droid") { + return { ...(modelOptions as DroidModelOptions | undefined), ...patch } as DroidModelOptions; + } return { ...(modelOptions as ClaudeModelOptions | undefined), ...patch } as ClaudeModelOptions; } +function hasVisibleTraitSections(input: { + effort: string | null; + thinkingEnabled: boolean | null; + supportsFastMode: boolean; + contextWindowOptions: ReadonlyArray<{ + value: string; + label: string; + isDefault?: boolean | undefined; + }>; +}): boolean { + return ( + input.effort !== null || + input.thinkingEnabled !== null || + input.supportsFastMode || + input.contextWindowOptions.length > 1 + ); +} + function getSelectedTraits( provider: ProviderKind, models: ReadonlyArray, @@ -138,6 +163,31 @@ function getSelectedTraits( }; } +export function shouldRenderTraitsPicker(input: { + provider: ProviderKind; + models: ReadonlyArray; + model: string | null | undefined; + prompt: string; + modelOptions: ProviderOptions | null | undefined; + allowPromptInjectedEffort?: boolean; +}): boolean { + const { effort, thinkingEnabled, caps, contextWindowOptions } = getSelectedTraits( + input.provider, + input.models, + input.model, + input.prompt, + input.modelOptions, + input.allowPromptInjectedEffort ?? true, + ); + + return hasVisibleTraitSections({ + effort, + thinkingEnabled, + supportsFastMode: caps.supportsFastMode, + contextWindowOptions, + }); +} + export interface TraitsMenuContentProps { provider: ProviderKind; models: ReadonlyArray; @@ -221,7 +271,14 @@ export const TraitsMenuContent = memo(function TraitsMenuContentImpl({ ], ); - if (effort === null && thinkingEnabled === null && contextWindowOptions.length <= 1) { + if ( + !hasVisibleTraitSections({ + effort, + thinkingEnabled, + supportsFastMode: caps.supportsFastMode, + contextWindowOptions, + }) + ) { return null; } @@ -366,6 +423,16 @@ export const TraitsPicker = memo(function TraitsPicker({ .join(" · "); const isCodexStyle = provider === "codex"; + const hasVisibleTraits = hasVisibleTraitSections({ + effort, + thinkingEnabled, + supportsFastMode: caps.supportsFastMode, + contextWindowOptions, + }); + + if (!hasVisibleTraits) { + return null; + } return ( {isCodexStyle ? ( - {triggerLabel} + {triggerLabel || "Traits"} ) : ( <> - {triggerLabel} + {triggerLabel || "Traits"}