diff --git a/packages/app/src/context/global-sync/event-reducer.ts b/packages/app/src/context/global-sync/event-reducer.ts index 241dfb14d7d..433c9f661a8 100644 --- a/packages/app/src/context/global-sync/event-reducer.ts +++ b/packages/app/src/context/global-sync/event-reducer.ts @@ -252,7 +252,12 @@ export function applyDirectoryEvent(input: { const part = draft[result.index] const field = props.field as keyof typeof part const existing = part[field] as string | undefined - ;(part[field] as string) = (existing ?? "") + props.delta + const MAX_PART_STRING_LENGTH = 1_048_576 // 1 MB per part field + const combined = (existing ?? "") + props.delta + ;(part[field] as string) = + combined.length > MAX_PART_STRING_LENGTH + ? combined.slice(combined.length - MAX_PART_STRING_LENGTH) + : combined }), ) break diff --git a/packages/opencode/src/bus/index.ts b/packages/opencode/src/bus/index.ts index edb093f1974..1cf85d5d971 100644 --- a/packages/opencode/src/bus/index.ts +++ b/packages/opencode/src/bus/index.ts @@ -25,16 +25,18 @@ export namespace Bus { }, async (entry) => { const wildcard = entry.subscriptions.get("*") - if (!wildcard) return - const event = { - type: InstanceDisposed.type, - properties: { - directory: Instance.directory, - }, - } - for (const sub of [...wildcard]) { - sub(event) + if (wildcard) { + const event = { + type: InstanceDisposed.type, + properties: { + directory: Instance.directory, + }, + } + for (const sub of [...wildcard]) { + sub(event) + } } + entry.subscriptions.clear() }, ) diff --git a/packages/opencode/src/cli/cmd/tui/app.tsx b/packages/opencode/src/cli/cmd/tui/app.tsx index 97c910a47d4..01ae2678369 100644 --- a/packages/opencode/src/cli/cmd/tui/app.tsx +++ b/packages/opencode/src/cli/cmd/tui/app.tsx @@ -3,7 +3,7 @@ import { Clipboard } from "@tui/util/clipboard" import { Selection } from "@tui/util/selection" import { MouseButton, TextAttributes } from "@opentui/core" import { RouteProvider, useRoute } from "@tui/context/route" -import { Switch, Match, createEffect, untrack, ErrorBoundary, createSignal, onMount, batch, Show, on } from "solid-js" +import { Switch, Match, createEffect, untrack, ErrorBoundary, createSignal, onMount, onCleanup, batch, Show, on } from "solid-js" import { win32DisableProcessedInput, win32FlushInputBuffer, win32InstallCtrlCGuard } from "./win32" import { Installation } from "@/installation" import { Flag } from "@/flag/flag" @@ -673,11 +673,11 @@ function App() { } }) - sdk.event.on(TuiEvent.CommandExecute.type, (evt) => { + const unsub1 = sdk.event.on(TuiEvent.CommandExecute.type, (evt) => { command.trigger(evt.properties.command) }) - sdk.event.on(TuiEvent.ToastShow.type, (evt) => { + const unsub2 = sdk.event.on(TuiEvent.ToastShow.type, (evt) => { toast.show({ title: evt.properties.title, message: evt.properties.message, @@ -686,14 +686,14 @@ function App() { }) }) - sdk.event.on(TuiEvent.SessionSelect.type, (evt) => { + const unsub3 = sdk.event.on(TuiEvent.SessionSelect.type, (evt) => { route.navigate({ type: "session", sessionID: evt.properties.sessionID, }) }) - sdk.event.on(SessionApi.Event.Deleted.type, (evt) => { + const unsub4 = sdk.event.on(SessionApi.Event.Deleted.type, (evt) => { if (route.data.type === "session" && route.data.sessionID === evt.properties.info.id) { route.navigate({ type: "home" }) toast.show({ @@ -703,7 +703,7 @@ function App() { } }) - sdk.event.on(SessionApi.Event.Error.type, (evt) => { + const unsub5 = sdk.event.on(SessionApi.Event.Error.type, (evt) => { const error = evt.properties.error if (error && typeof error === "object" && error.name === "MessageAbortedError") return const message = (() => { @@ -725,7 +725,7 @@ function App() { }) }) - sdk.event.on(Installation.Event.UpdateAvailable.type, (evt) => { + const unsub6 = sdk.event.on(Installation.Event.UpdateAvailable.type, (evt) => { toast.show({ variant: "info", title: "Update Available", @@ -734,6 +734,15 @@ function App() { }) }) + onCleanup(() => { + unsub1() + unsub2() + unsub3() + unsub4() + unsub5() + unsub6() + }) + return ( { + const unsubPromptAppend = sdk.event.on(TuiEvent.PromptAppend.type, (evt) => { if (!input || input.isDestroyed) return input.insertText(evt.properties.text) setTimeout(() => { @@ -107,6 +107,7 @@ export function Prompt(props: PromptProps) { renderer.requestRender() }, 0) }) + onCleanup(unsubPromptAppend) createEffect(() => { if (props.disabled) input.cursorColor = theme.backgroundElement diff --git a/packages/opencode/src/cli/cmd/tui/routes/session/index.tsx b/packages/opencode/src/cli/cmd/tui/routes/session/index.tsx index 31401836766..e4783a04825 100644 --- a/packages/opencode/src/cli/cmd/tui/routes/session/index.tsx +++ b/packages/opencode/src/cli/cmd/tui/routes/session/index.tsx @@ -7,6 +7,7 @@ import { For, Match, on, + onCleanup, Show, Switch, useContext, @@ -31,6 +32,7 @@ import { Prompt, type PromptRef } from "@tui/component/prompt" import type { AssistantMessage, Part, ToolPart, UserMessage, TextPart, ReasoningPart } from "@opencode-ai/sdk/v2" import { useLocal } from "@tui/context/local" import { Locale } from "@/util/locale" +import { formatSize } from "@/util/format" import type { Tool } from "@/tool/tool" import type { ReadTool } from "@/tool/read" import type { WriteTool } from "@/tool/write" @@ -208,7 +210,7 @@ export function Session() { }) let lastSwitch: string | undefined = undefined - sdk.event.on("message.part.updated", (evt) => { + const unsubPartUpdated = sdk.event.on("message.part.updated", (evt) => { const part = evt.properties.part if (part.type !== "tool") return if (part.sessionID !== route.sessionID) return @@ -223,6 +225,7 @@ export function Session() { lastSwitch = part.id } }) + onCleanup(unsubPartUpdated) let scroll: ScrollBoxRenderable let prompt: PromptRef @@ -1737,6 +1740,14 @@ function Bash(props: ToolProps) { return [...lines().slice(0, 10), "…"].join("\n") }) + const filterInfo = createMemo(() => { + if (!props.metadata.filtered) return undefined + const total = formatSize(props.metadata.totalBytes ?? 0) + const omitted = formatSize(props.metadata.omittedBytes ?? 0) + const matches = props.metadata.matchCount ?? 0 + return `Filtered: ${matches} match${matches === 1 ? "" : "es"} from ${total} (${omitted} omitted)` + }) + const workdirDisplay = createMemo(() => { const workdir = props.input.workdir if (!workdir || workdir === ".") return undefined @@ -1776,6 +1787,9 @@ function Bash(props: ToolProps) { {limited()} + + {filterInfo()} + {expanded() ? "Click to collapse" : "Click to expand"} diff --git a/packages/opencode/src/cli/cmd/uninstall.ts b/packages/opencode/src/cli/cmd/uninstall.ts index 3d8e7e3f75e..2f39b661094 100644 --- a/packages/opencode/src/cli/cmd/uninstall.ts +++ b/packages/opencode/src/cli/cmd/uninstall.ts @@ -3,6 +3,7 @@ import { UI } from "../ui" import * as prompts from "@clack/prompts" import { Installation } from "../../installation" import { Global } from "../../global" +import { formatSize } from "../../util/format" import { $ } from "bun" import fs from "fs/promises" import path from "path" @@ -340,13 +341,6 @@ async function getDirectorySize(dir: string): Promise { return total } -function formatSize(bytes: number): string { - if (bytes < 1024) return `${bytes} B` - if (bytes < 1024 * 1024) return `${(bytes / 1024).toFixed(1)} KB` - if (bytes < 1024 * 1024 * 1024) return `${(bytes / (1024 * 1024)).toFixed(1)} MB` - return `${(bytes / (1024 * 1024 * 1024)).toFixed(1)} GB` -} - function shortenPath(p: string): string { const home = os.homedir() if (p.startsWith(home)) { diff --git a/packages/opencode/src/index.ts b/packages/opencode/src/index.ts index 9af79278c06..69975a39454 100644 --- a/packages/opencode/src/index.ts +++ b/packages/opencode/src/index.ts @@ -33,6 +33,7 @@ import path from "path" import { Global } from "./global" import { JsonMigration } from "./storage/json-migration" import { Database } from "./storage/db" +import { Instance } from "./project/instance" process.on("unhandledRejection", (e) => { Log.Default.error("rejection", { @@ -202,6 +203,12 @@ try { } process.exitCode = 1 } finally { + // Dispose all instance-scoped resources (LSP clients, bus subscriptions, PTY sessions, etc.) + try { + await Instance.disposeAll() + } catch { + // best-effort cleanup + } // Some subprocesses don't react properly to SIGTERM and similar signals. // Most notably, some docker-container-based MCP servers don't handle such signals unless // run using `docker run --init`. diff --git a/packages/opencode/src/lsp/client.ts b/packages/opencode/src/lsp/client.ts index 084ccf831ee..8f0adad97f0 100644 --- a/packages/opencode/src/lsp/client.ts +++ b/packages/opencode/src/lsp/client.ts @@ -48,6 +48,7 @@ export namespace LSPClient { new StreamMessageWriter(input.server.process.stdin as any), ) + const MAX_DIAGNOSTICS_FILES = 2_000 const diagnostics = new Map() connection.onNotification("textDocument/publishDiagnostics", (params) => { const filePath = Filesystem.normalizePath(fileURLToPath(params.uri)) @@ -56,6 +57,11 @@ export namespace LSPClient { count: params.diagnostics.length, }) const exists = diagnostics.has(filePath) + if (!exists && diagnostics.size >= MAX_DIAGNOSTICS_FILES) { + // Evict the oldest entry to stay within the size limit + const oldest = diagnostics.keys().next().value + if (oldest !== undefined) diagnostics.delete(oldest) + } diagnostics.set(filePath, params.diagnostics) if (!exists && input.serverID === "typescript") return Bus.publish(Event.Diagnostics, { path: filePath, serverID: input.serverID }) @@ -132,6 +138,7 @@ export namespace LSPClient { }) } + const MAX_OPEN_FILES = 1_000 const files: { [path: string]: number } = {} @@ -191,6 +198,11 @@ export namespace LSPClient { log.info("textDocument/didOpen", input) diagnostics.delete(input.path) + // Evict oldest tracked file if we're at the limit + if (Object.keys(files).length >= MAX_OPEN_FILES) { + const oldest = Object.keys(files)[0] + if (oldest) delete files[oldest] + } await connection.sendNotification("textDocument/didOpen", { textDocument: { uri: pathToFileURL(input.path).href, diff --git a/packages/opencode/src/mcp/index.ts b/packages/opencode/src/mcp/index.ts index 3c29fe03d30..92ea0c3b3ac 100644 --- a/packages/opencode/src/mcp/index.ts +++ b/packages/opencode/src/mcp/index.ts @@ -150,6 +150,27 @@ export namespace MCP { // Store transports for OAuth servers to allow finishing auth type TransportWithAuth = StreamableHTTPClientTransport | SSEClientTransport const pendingOAuthTransports = new Map() + const pendingOAuthTimers = new Map>() + const OAUTH_TRANSPORT_TTL_MS = 5 * 60 * 1_000 // 5 minutes + + function setPendingOAuthTransport(key: string, transport: TransportWithAuth) { + const existing = pendingOAuthTimers.get(key) + if (existing) clearTimeout(existing) + pendingOAuthTransports.set(key, transport) + const timer = setTimeout(() => { + pendingOAuthTransports.delete(key) + pendingOAuthTimers.delete(key) + log.info("evicted stale pending OAuth transport", { key }) + }, OAUTH_TRANSPORT_TTL_MS) + pendingOAuthTimers.set(key, timer) + } + + function deletePendingOAuthTransport(key: string) { + const timer = pendingOAuthTimers.get(key) + if (timer) clearTimeout(timer) + pendingOAuthTimers.delete(key) + pendingOAuthTransports.delete(key) + } // Prompt cache types type PromptInfo = Awaited>["prompts"][number] @@ -205,6 +226,8 @@ export namespace MCP { }), ), ) + for (const timer of pendingOAuthTimers.values()) clearTimeout(timer) + pendingOAuthTimers.clear() pendingOAuthTransports.clear() }, ) @@ -378,7 +401,7 @@ export namespace MCP { }).catch((e) => log.debug("failed to show toast", { error: e })) } else { // Store transport for later finishAuth call - pendingOAuthTransports.set(key, transport) + setPendingOAuthTransport(key, transport) status = { status: "needs_auth" as const } // Show toast for needs_auth Bus.publish(TuiEvent.ToastShow, { @@ -772,7 +795,7 @@ export namespace MCP { } catch (error) { if (error instanceof UnauthorizedError && capturedUrl) { // Store transport for finishAuth - pendingOAuthTransports.set(mcpName, transport) + setPendingOAuthTransport(mcpName, transport) return { authorizationUrl: capturedUrl.toString() } } throw error @@ -879,7 +902,7 @@ export namespace MCP { } // Re-add the MCP server to establish connection - pendingOAuthTransports.delete(mcpName) + deletePendingOAuthTransport(mcpName) const result = await add(mcpName, mcpConfig) const statusRecord = result.status as Record @@ -899,7 +922,7 @@ export namespace MCP { export async function removeAuth(mcpName: string): Promise { await McpAuth.remove(mcpName) McpOAuthCallback.cancelPending(mcpName) - pendingOAuthTransports.delete(mcpName) + deletePendingOAuthTransport(mcpName) await McpAuth.clearOAuthState(mcpName) log.info("removed oauth credentials", { mcpName }) } diff --git a/packages/opencode/src/permission/next.ts b/packages/opencode/src/permission/next.ts index 1e1df62a3ce..67eb7d9415a 100644 --- a/packages/opencode/src/permission/next.ts +++ b/packages/opencode/src/permission/next.ts @@ -283,4 +283,18 @@ export namespace PermissionNext { const s = await state() return Object.values(s.pending).map((x) => x.info) } + + export async function clearSession(sessionID: string) { + const s = await state() + for (const [id, pending] of Object.entries(s.pending)) { + if (pending.info.sessionID !== sessionID) continue + delete s.pending[id] + pending.reject(new RejectedError()) + Bus.publish(Event.Replied, { + sessionID, + requestID: id, + reply: "reject", + }) + } + } } diff --git a/packages/opencode/src/pty/index.ts b/packages/opencode/src/pty/index.ts index dee3fbc5429..a6cfb3b305e 100644 --- a/packages/opencode/src/pty/index.ts +++ b/packages/opencode/src/pty/index.ts @@ -84,7 +84,8 @@ export namespace Pty { interface ActiveSession { info: Info process: IPty - buffer: string + bufferChunks: string[] + bufferLen: number bufferCursor: number cursor: number subscribers: Map @@ -161,7 +162,8 @@ export namespace Pty { const session: ActiveSession = { info, process: ptyProcess, - buffer: "", + bufferChunks: [], + bufferLen: 0, bufferCursor: 0, cursor: 0, subscribers: new Map(), @@ -188,11 +190,13 @@ export namespace Pty { } } - session.buffer += chunk - if (session.buffer.length <= BUFFER_LIMIT) return - const excess = session.buffer.length - BUFFER_LIMIT - session.buffer = session.buffer.slice(excess) - session.bufferCursor += excess + session.bufferChunks.push(chunk) + session.bufferLen += chunk.length + while (session.bufferLen > BUFFER_LIMIT && session.bufferChunks.length > 1) { + const removed = session.bufferChunks.shift()! + session.bufferLen -= removed.length + session.bufferCursor += removed.length + } }) ptyProcess.onExit(({ exitCode }) => { log.info("session exited", { id, exitCode }) @@ -285,11 +289,12 @@ export namespace Pty { cursor === -1 ? end : typeof cursor === "number" && Number.isSafeInteger(cursor) ? Math.max(0, cursor) : 0 const data = (() => { - if (!session.buffer) return "" + if (session.bufferChunks.length === 0) return "" if (from >= end) return "" + const combined = session.bufferChunks.join("") const offset = Math.max(0, from - start) - if (offset >= session.buffer.length) return "" - return session.buffer.slice(offset) + if (offset >= combined.length) return "" + return combined.slice(offset) })() if (data) { diff --git a/packages/opencode/src/session/compaction.ts b/packages/opencode/src/session/compaction.ts index 9245426057c..bc1f7bde7fc 100644 --- a/packages/opencode/src/session/compaction.ts +++ b/packages/opencode/src/session/compaction.ts @@ -91,6 +91,11 @@ export namespace SessionCompaction { for (const part of toPrune) { if (part.state.status === "completed") { part.state.time.compacted = Date.now() + // Clear tool output and attachments from both DB and in-memory store. + // toModelMessages already substitutes "[Old tool result content cleared]" + // for compacted parts, so this aligns stored data with model behavior. + part.state.output = "[compacted]" + part.state.attachments = undefined await Session.updatePart(part) } } diff --git a/packages/opencode/src/session/index.ts b/packages/opencode/src/session/index.ts index 22de477f8d1..abf552d7028 100644 --- a/packages/opencode/src/session/index.ts +++ b/packages/opencode/src/session/index.ts @@ -653,6 +653,7 @@ export namespace Session { await remove(child.id) } await unshare(sessionID).catch(() => {}) + await PermissionNext.clearSession(sessionID).catch(() => {}) // CASCADE delete handles messages and parts automatically Database.use((db) => { db.delete(SessionTable).where(eq(SessionTable.id, sessionID)).run() diff --git a/packages/opencode/src/session/prompt.ts b/packages/opencode/src/session/prompt.ts index 75bd3c9dfac..a8f93ff0dac 100644 --- a/packages/opencode/src/session/prompt.ts +++ b/packages/opencode/src/session/prompt.ts @@ -29,7 +29,7 @@ import { ReadTool } from "../tool/read" import { FileTime } from "../file/time" import { Flag } from "../flag/flag" import { ulid } from "ulid" -import { spawn } from "child_process" + import { Command } from "../command" import { $, fileURLToPath, pathToFileURL } from "bun" import { ConfigMarkdown } from "../config/markdown" @@ -44,7 +44,8 @@ import { SessionStatus } from "./status" import { LLM } from "./llm" import { iife } from "@/util/iife" import { Shell } from "@/shell/shell" -import { Truncate } from "@/tool/truncation" +import { Truncate, StreamingOutput } from "@/tool/truncation" +import { spawn } from "child_process" // @ts-ignore globalThis.AI_SDK_LOG_WARNINGS = false @@ -1623,40 +1624,32 @@ NOTE: At any point in time through this workflow you should feel free to ask the { cwd, sessionID: input.sessionID, callID: part.callID }, { env: {} }, ) + const streaming = new StreamingOutput() + const proc = spawn(shell, args, { cwd, detached: process.platform !== "win32", - stdio: ["ignore", "pipe", "pipe"], env: { ...process.env, ...shellEnv.env, TERM: "dumb", }, + stdio: ["ignore", "pipe", "pipe"], }) - let output = "" - - proc.stdout?.on("data", (chunk) => { - output += chunk.toString() + const append = (chunk: Buffer) => { + const preview = streaming.append(chunk) if (part.state.status === "running") { part.state.metadata = { - output: output, + output: preview, description: "", } Session.updatePart(part) } - }) + } - proc.stderr?.on("data", (chunk) => { - output += chunk.toString() - if (part.state.status === "running") { - part.state.metadata = { - output: output, - description: "", - } - Session.updatePart(part) - } - }) + proc.stdout?.on("data", append) + proc.stderr?.on("data", append) let aborted = false let exited = false @@ -1675,33 +1668,72 @@ NOTE: At any point in time through this workflow you should feel free to ask the abort.addEventListener("abort", abortHandler, { once: true }) - await new Promise((resolve) => { - proc.on("close", () => { - exited = true + await new Promise((resolve, reject) => { + const cleanup = () => { abort.removeEventListener("abort", abortHandler) + } + + proc.once("exit", () => { + exited = true + cleanup() + resolve() + }) + + proc.once("error", (error) => { + exited = true + cleanup() + reject(error) + }) + + proc.once("close", () => { + exited = true + cleanup() resolve() }) }) + streaming.close() + if (aborted) { - output += "\n\n" + ["", "User aborted the command", ""].join("\n") + streaming.appendMetadata("\n\n" + ["", "User aborted the command", ""].join("\n")) } + msg.time.completed = Date.now() await Session.updateMessage(msg) + if (part.state.status === "running") { - part.state = { - status: "completed", - time: { - ...part.state.time, - end: Date.now(), - }, - input: part.state.input, - title: "", - metadata: { + if (streaming.truncated) { + part.state = { + status: "completed", + time: { + ...part.state.time, + end: Date.now(), + }, + input: part.state.input, + title: "", + metadata: { + output: `[output streamed to file: ${streaming.totalBytes} bytes]`, + description: "", + outputPath: streaming.outputPath, + }, + output: streaming.finalize(), + } + } else { + const output = streaming.inMemoryOutput + part.state = { + status: "completed", + time: { + ...part.state.time, + end: Date.now(), + }, + input: part.state.input, + title: "", + metadata: { + output, + description: "", + }, output, - description: "", - }, - output, + } } await Session.updatePart(part) } diff --git a/packages/opencode/src/tool/bash.ts b/packages/opencode/src/tool/bash.ts index 0751f789b7d..78790485662 100644 --- a/packages/opencode/src/tool/bash.ts +++ b/packages/opencode/src/tool/bash.ts @@ -9,13 +9,13 @@ import { lazy } from "@/util/lazy" import { Language } from "web-tree-sitter" import { $ } from "bun" -import { Filesystem } from "@/util/filesystem" import { fileURLToPath } from "url" import { Flag } from "@/flag/flag.ts" import { Shell } from "@/shell/shell" +import { Filesystem } from "@/util/filesystem" import { BashArity } from "@/permission/arity" -import { Truncate } from "./truncation" +import { Truncate, StreamingOutput } from "./truncation" import { Plugin } from "@/plugin" const MAX_METADATA_LENGTH = 30_000 @@ -23,6 +23,19 @@ const DEFAULT_TIMEOUT = Flag.OPENCODE_EXPERIMENTAL_BASH_DEFAULT_TIMEOUT_MS || 2 export const log = Log.create({ service: "bash-tool" }) +export interface BashMetadata { + output: string + exit: number | null + description: string + truncated?: boolean + outputPath?: string + filtered?: boolean + filterPattern?: string + matchCount?: number + totalBytes?: number + omittedBytes?: number +} + const resolveWasm = (asset: string) => { if (asset.startsWith("file://")) return fileURLToPath(asset) if (asset.startsWith("/") || /^[a-z]:/i.test(asset)) return asset @@ -69,6 +82,12 @@ export const BashTool = Tool.define("bash", async () => { `The working directory to run the command in. Defaults to ${Instance.directory}. Use this instead of 'cd' commands.`, ) .optional(), + output_filter: z + .string() + .describe( + `Optional regex pattern to filter output. When set, full output streams to a file while lines matching the pattern are returned inline. Useful for build commands where you only care about warnings/errors. Example: "^(warning|error|WARN|ERROR):.*" to capture compiler diagnostics. The regex is matched against each line.`, + ) + .optional(), description: z .string() .describe( @@ -81,6 +100,16 @@ export const BashTool = Tool.define("bash", async () => { throw new Error(`Invalid timeout value: ${params.timeout}. Timeout must be a positive number.`) } const timeout = params.timeout ?? DEFAULT_TIMEOUT + + // Parse output_filter regex if provided + let filter: RegExp | undefined + if (params.output_filter) { + try { + filter = new RegExp(params.output_filter) + } catch (e) { + throw new Error(`Invalid output_filter regex: ${params.output_filter}. ${e}`) + } + } const tree = await parser().then((p) => p.parse(params.command)) if (!tree) { throw new Error("Failed to parse command") @@ -169,6 +198,8 @@ export const BashTool = Tool.define("bash", async () => { { cwd, sessionID: ctx.sessionID, callID: ctx.callID }, { env: {} }, ) + const streaming = new StreamingOutput({ filter }) + const proc = spawn(params.command, { shell, cwd, @@ -180,8 +211,6 @@ export const BashTool = Tool.define("bash", async () => { detached: process.platform !== "win32", }) - let output = "" - // Initialize metadata with empty output ctx.metadata({ metadata: { @@ -191,11 +220,12 @@ export const BashTool = Tool.define("bash", async () => { }) const append = (chunk: Buffer) => { - output += chunk.toString() + const preview = streaming.append(chunk) + const display = + preview.length > MAX_METADATA_LENGTH ? preview.slice(0, MAX_METADATA_LENGTH) + "\n\n..." : preview ctx.metadata({ metadata: { - // truncate the metadata to avoid GIANT blobs of data (has nothing to do w/ what agent can access) - output: output.length > MAX_METADATA_LENGTH ? output.slice(0, MAX_METADATA_LENGTH) + "\n\n..." : output, + output: display, description: params.description, }, }) @@ -244,29 +274,75 @@ export const BashTool = Tool.define("bash", async () => { cleanup() reject(error) }) + + proc.once("close", () => { + exited = true + cleanup() + resolve() + }) }) - const resultMetadata: string[] = [] + streaming.close() + const resultMetadata: string[] = [] if (timedOut) { resultMetadata.push(`bash tool terminated command after exceeding timeout ${timeout} ms`) } - if (aborted) { resultMetadata.push("User aborted the command") } - if (resultMetadata.length > 0) { - output += "\n\n\n" + resultMetadata.join("\n") + "\n" + streaming.appendMetadata("\n\n\n" + resultMetadata.join("\n") + "\n") } + // If using filter, return filtered lines + if (streaming.hasFilter) { + const output = streaming.truncated + ? `${streaming.filteredOutput}\n${streaming.finalize(params.output_filter)}` + : streaming.finalize(params.output_filter) + + return { + title: params.description, + metadata: { + output: streaming.filteredOutput || `[no matches for filter: ${params.output_filter}]`, + exit: proc.exitCode, + description: params.description, + truncated: streaming.truncated, + outputPath: streaming.outputPath, + filtered: true, + filterPattern: params.output_filter, + matchCount: streaming.matchCount, + totalBytes: streaming.totalBytes, + omittedBytes: streaming.omittedBytes, + } as BashMetadata, + output, + } + } + + // If we streamed to a file (threshold exceeded), return truncated result + if (streaming.truncated) { + return { + title: params.description, + metadata: { + output: `[output streamed to file: ${streaming.totalBytes} bytes]`, + exit: proc.exitCode, + description: params.description, + truncated: true, + outputPath: streaming.outputPath, + totalBytes: streaming.totalBytes, + } as BashMetadata, + output: streaming.finalize(), + } + } + + const output = streaming.inMemoryOutput return { title: params.description, metadata: { output: output.length > MAX_METADATA_LENGTH ? output.slice(0, MAX_METADATA_LENGTH) + "\n\n..." : output, exit: proc.exitCode, description: params.description, - }, + } as BashMetadata, output, } }, diff --git a/packages/opencode/src/tool/bash.txt b/packages/opencode/src/tool/bash.txt index 47e9378e755..2015c8e6f88 100644 --- a/packages/opencode/src/tool/bash.txt +++ b/packages/opencode/src/tool/bash.txt @@ -25,6 +25,7 @@ Usage notes: - You can specify an optional timeout in milliseconds. If not specified, commands will time out after 120000ms (2 minutes). - It is very helpful if you write a clear, concise description of what this command does in 5-10 words. - If the output exceeds ${maxLines} lines or ${maxBytes} bytes, it will be truncated and the full output will be written to a file. You can use Read with offset/limit to read specific sections or Grep to search the full content. Because of this, you do NOT need to use `head`, `tail`, or other truncation commands to limit output - just run the command directly. + - For build commands (make, cargo build, npm run build, tsc, etc.) that produce lots of output where you only care about warnings/errors, use the `output_filter` parameter with a regex like "^(warning|error|WARN|ERROR):". This streams full output to a file while returning only matching lines inline, saving you from having to grep the output afterward. - Avoid using Bash with the `find`, `grep`, `cat`, `head`, `tail`, `sed`, `awk`, or `echo` commands, unless explicitly instructed or when these commands are truly necessary for the task. Instead, always prefer using the dedicated tools for these commands: - File search: Use Glob (NOT find or ls) diff --git a/packages/opencode/src/tool/task.ts b/packages/opencode/src/tool/task.ts index 8c8cf827aba..94cb91f6bf3 100644 --- a/packages/opencode/src/tool/task.ts +++ b/packages/opencode/src/tool/task.ts @@ -152,6 +152,12 @@ export const TaskTool = Tool.define("task", async (ctx) => { "", ].join("\n") + // Clean up subagent session to free in-memory state (messages, parts, + // event listeners). The task output has already been captured above. + // If the LLM later tries to resume via task_id, Session.get() will + // fail gracefully and a fresh session will be created instead. + Session.remove(session.id).catch(() => {}) + return { title: params.description, metadata: { diff --git a/packages/opencode/src/tool/truncation.ts b/packages/opencode/src/tool/truncation.ts index 58b0cc13d9a..3819d909fbd 100644 --- a/packages/opencode/src/tool/truncation.ts +++ b/packages/opencode/src/tool/truncation.ts @@ -1,4 +1,5 @@ import fs from "fs/promises" +import fsSync from "fs" import path from "path" import { Global } from "../global" import { Identifier } from "../id/id" @@ -7,6 +8,188 @@ import type { Agent } from "../agent/agent" import { Scheduler } from "../scheduler" import { Filesystem } from "../util/filesystem" import { Glob } from "../util/glob" +import { Log } from "../util/log" + +const log = Log.create({ service: "truncation" }) + +export interface StreamingOutputOptions { + threshold?: number + /** Optional regex to filter output lines. Matching lines are collected separately. */ + filter?: RegExp +} + +/** + * Streaming output accumulator that spills to disk when threshold is exceeded. + * Avoids O(n²) memory growth from string concatenation. + * + * Optionally supports line filtering - when a filter regex is provided, matching + * lines are collected separately while full output still streams to file. + */ +export class StreamingOutput { + private output = "" + private outputBytes = 0 + private streamFile: { fd: number; path: string } | undefined + private streamedBytes = 0 + private threshold: number + private filter?: RegExp + private filtered = "" + private filteredBytes = 0 + private filteredCount = 0 + private lineBuffer = "" + + constructor(options: StreamingOutputOptions = {}) { + this.threshold = options.threshold ?? Truncate.MAX_BYTES + this.filter = options.filter + } + + /** Append a chunk of output. Returns the current preview string. */ + append(chunk: Buffer): string { + const text = chunk.toString() + this.outputBytes += chunk.length + + // Spill to file when threshold exceeded + if (!this.streamFile && this.outputBytes > this.threshold) { + this.streamFile = this.createStreamFile() + } + + if (this.streamFile) { + fsSync.writeSync(this.streamFile.fd, text) + this.streamedBytes += Buffer.byteLength(text, "utf-8") + } else { + this.output += text + } + + // Process filter if active + if (this.filter) { + this.lineBuffer += text + const lines = this.lineBuffer.split("\n") + this.lineBuffer = lines.pop() || "" + for (const line of lines) { + if (this.filter.test(line)) { + const entry = line + "\n" + this.filtered += entry + this.filteredBytes += Buffer.byteLength(entry, "utf-8") + this.filteredCount++ + } + } + } + + return this.preview() + } + + /** Get current preview - either full output, streaming indicator, or filter status */ + preview(): string { + if (this.filter) { + if (this.filtered) return this.filtered + return `[filtering: ${this.outputBytes} bytes, ${this.matchCount} matches...]\n` + } + if (this.streamFile) { + return `[streaming to file: ${this.streamedBytes} bytes written...]\n` + } + return this.output + } + + /** Whether output was streamed to file */ + get truncated(): boolean { + return this.streamFile !== undefined + } + + /** Total bytes written */ + get totalBytes(): number { + return this.streamFile ? this.streamedBytes : this.outputBytes + } + + /** Path to output file (if streaming) */ + get outputPath(): string | undefined { + return this.streamFile?.path + } + + /** Get the in-memory output (only valid if not truncated) */ + get inMemoryOutput(): string { + return this.output + } + + /** Get filtered output (only when filter is active) */ + get filteredOutput(): string { + return this.filtered + } + + /** Number of lines matching the filter */ + get matchCount(): number { + return this.filteredCount + } + + /** Bytes omitted by filtering */ + get omittedBytes(): number { + return this.totalBytes - this.filteredBytes + } + + /** Whether a filter is active */ + get hasFilter(): boolean { + return this.filter !== undefined + } + + /** Close the stream file if open. Call this after command completes. */ + close(): void { + // Process any remaining content in line buffer + if (this.filter && this.lineBuffer) { + if (this.filter.test(this.lineBuffer)) { + const entry = this.lineBuffer + "\n" + this.filtered += entry + this.filteredBytes += Buffer.byteLength(entry, "utf-8") + this.filteredCount++ + } + } + if (this.streamFile) { + fsSync.closeSync(this.streamFile.fd) + } + } + + /** Append metadata to output (either in memory or to file) */ + appendMetadata(text: string): void { + if (this.streamFile) { + fsSync.appendFileSync(this.streamFile.path, text) + } else { + this.output += text + } + } + + /** Get final output string (for non-truncated) or hint message (for truncated) */ + finalize(filterPattern?: string): string { + if (this.filter) { + if (this.streamFile) { + return `Filtered ${this.matchCount} matching lines from ${this.totalBytes} bytes of output.\nFull output saved to: ${this.streamFile.path}\nUse Grep to search or Read with offset/limit to view specific sections.\nNote: This file will be deleted after a few more commands. Copy it if you need to preserve it.` + } + return this.filtered || `[no matches for filter: ${filterPattern}]` + } + if (this.streamFile) { + return `The command output was ${this.streamedBytes} bytes and was truncated (inline limit: ${this.threshold} bytes).\nFull output saved to: ${this.streamFile.path}\nUse Grep to search the full content or Read with offset/limit to view specific sections.\nNote: This file will be deleted after a few more commands. Copy it if you need to preserve it.` + } + return this.output + } + + private createStreamFile(): { fd: number; path: string } | undefined { + let fd: number = -1 + try { + const dir = Truncate.DIR + fsSync.mkdirSync(dir, { recursive: true }) + Truncate.cleanup().catch(() => {}) + const filepath = path.join(dir, Identifier.ascending("tool")) + fd = fsSync.openSync(filepath, "w") + // Write existing buffered output to file + if (this.output) { + fsSync.writeSync(fd, this.output) + this.streamedBytes += Buffer.byteLength(this.output, "utf-8") + } + this.output = "" // Clear memory buffer + return { fd, path: filepath } + } catch (e) { + if (fd >= 0) fsSync.closeSync(fd) + log.warn("failed to create stream file, continuing in memory", { error: e }) + return undefined + } + } +} export namespace Truncate { export const MAX_LINES = 2000 diff --git a/packages/opencode/src/util/format.ts b/packages/opencode/src/util/format.ts index 4ae62eac450..c105fcdb738 100644 --- a/packages/opencode/src/util/format.ts +++ b/packages/opencode/src/util/format.ts @@ -1,3 +1,10 @@ +export function formatSize(bytes: number): string { + if (bytes < 1024) return `${bytes} B` + if (bytes < 1024 * 1024) return `${(bytes / 1024).toFixed(1)} KB` + if (bytes < 1024 * 1024 * 1024) return `${(bytes / (1024 * 1024)).toFixed(1)} MB` + return `${(bytes / (1024 * 1024 * 1024)).toFixed(1)} GB` +} + export function formatDuration(secs: number) { if (secs <= 0) return "" if (secs < 60) return `${secs}s` diff --git a/packages/opencode/src/util/queue.ts b/packages/opencode/src/util/queue.ts index a1af53fe8f0..daf1cc4d49b 100644 --- a/packages/opencode/src/util/queue.ts +++ b/packages/opencode/src/util/queue.ts @@ -1,20 +1,45 @@ +const DONE = Symbol("queue.done") + export class AsyncQueue implements AsyncIterable { private queue: T[] = [] - private resolvers: ((value: T) => void)[] = [] + private resolvers: ((value: T | typeof DONE) => void)[] = [] + private closed = false push(item: T) { + if (this.closed) return const resolve = this.resolvers.shift() if (resolve) resolve(item) else this.queue.push(item) } - async next(): Promise { + close() { + if (this.closed) return + this.closed = true + for (const resolve of this.resolvers) { + resolve(DONE) + } + this.resolvers.length = 0 + this.queue.length = 0 + } + + drain(): T[] { + const items = [...this.queue] + this.queue.length = 0 + return items + } + + async next(): Promise { + if (this.closed) return DONE if (this.queue.length > 0) return this.queue.shift()! return new Promise((resolve) => this.resolvers.push(resolve)) } async *[Symbol.asyncIterator]() { - while (true) yield await this.next() + while (!this.closed) { + const value = await this.next() + if (value === DONE) return + yield value as T + } } } diff --git a/packages/opencode/test/tool/bash.test.ts b/packages/opencode/test/tool/bash.test.ts index ac93016927a..13560f6e144 100644 --- a/packages/opencode/test/tool/bash.test.ts +++ b/packages/opencode/test/tool/bash.test.ts @@ -349,7 +349,8 @@ describe("tool.bash truncation", () => { ) expect((result.metadata as any).truncated).toBe(true) expect(result.output).toContain("truncated") - expect(result.output).toContain("The tool call succeeded but the output was truncated") + // Streaming truncation uses different message format + expect(result.output).toContain("Full output saved to:") }, }) }) @@ -399,4 +400,210 @@ describe("tool.bash truncation", () => { }, }) }) + + test("streams to file during execution for very large output", async () => { + // Skip on Windows - uses /dev/zero and head -c which are Unix-specific + if (process.platform === "win32") return + await Instance.provide({ + directory: projectRoot, + fn: async () => { + const bash = await BashTool.init() + // Generate 100KB of data - well over the 50KB threshold + const byteCount = Truncate.MAX_BYTES * 2 + const result = await bash.execute( + { + command: `head -c ${byteCount} /dev/zero | tr '\\0' 'x'`, + description: "Generate large streaming output", + }, + ctx, + ) + expect((result.metadata as any).truncated).toBe(true) + const filepath = (result.metadata as any).outputPath + expect(filepath).toBeTruthy() + + // Verify the full output was saved + const saved = await Bun.file(filepath).text() + expect(saved.length).toBe(byteCount) + expect(saved[0]).toBe("x") + expect(saved[byteCount - 1]).toBe("x") + }, + }) + }) + + test("preserves exit code when streaming to file", async () => { + // Skip on Windows - uses /dev/zero and head -c which are Unix-specific + if (process.platform === "win32") return + await Instance.provide({ + directory: projectRoot, + fn: async () => { + const bash = await BashTool.init() + const byteCount = Truncate.MAX_BYTES * 2 + const result = await bash.execute( + { + command: `head -c ${byteCount} /dev/zero | tr '\\0' 'x'; exit 42`, + description: "Generate large output with non-zero exit", + }, + ctx, + ) + expect((result.metadata as any).truncated).toBe(true) + expect((result.metadata as any).exit).toBe(42) + }, + }) + }) + + test("streams stderr to file", async () => { + // Skip on Windows - uses /dev/zero and head -c which are Unix-specific + if (process.platform === "win32") return + await Instance.provide({ + directory: projectRoot, + fn: async () => { + const bash = await BashTool.init() + const byteCount = Truncate.MAX_BYTES * 2 + const result = await bash.execute( + { + command: `head -c ${byteCount} /dev/zero | tr '\\0' 'e' >&2`, + description: "Generate large stderr output", + }, + ctx, + ) + expect((result.metadata as any).truncated).toBe(true) + const filepath = (result.metadata as any).outputPath + expect(filepath).toBeTruthy() + + const saved = await Bun.file(filepath).text() + expect(saved.length).toBe(byteCount) + expect(saved[0]).toBe("e") + }, + }) + }) + + test("output message contains file path when streaming", async () => { + // Skip on Windows - uses /dev/zero and head -c which are Unix-specific + if (process.platform === "win32") return + await Instance.provide({ + directory: projectRoot, + fn: async () => { + const bash = await BashTool.init() + const byteCount = Truncate.MAX_BYTES * 2 + const result = await bash.execute( + { + command: `head -c ${byteCount} /dev/zero | tr '\\0' 'x'`, + description: "Check output message format", + }, + ctx, + ) + const filepath = (result.metadata as any).outputPath + expect(result.output).toContain("Full output saved to:") + expect(result.output).toContain(filepath) + }, + }) + }) + + test("output_filter captures matching lines in memory for small output", async () => { + await Instance.provide({ + directory: projectRoot, + fn: async () => { + const bash = await BashTool.init() + // Small build output with warnings/errors - stays in memory + const result = await bash.execute( + { + command: + "bun -e \"console.log('compiling...');console.log('warning: unused variable');console.log('done');console.log('error: type mismatch')\"", + output_filter: "^(warning|error):", + description: "Build with filter (small)", + }, + ctx, + ) + + // Should NOT be truncated (small output stays in memory) + expect((result.metadata as any).truncated).toBe(false) + expect((result.metadata as any).filtered).toBe(true) + expect((result.metadata as any).matchCount).toBe(2) + + // The output should contain only the filtered lines + expect(result.output).toContain("warning: unused variable") + expect(result.output).toContain("error: type mismatch") + expect(result.output).not.toContain("compiling...") + expect(result.output).not.toContain("done") + }, + }) + }) + + test("output_filter streams to file when output exceeds threshold", async () => { + // Skip on Windows - uses /dev/zero and head -c which are Unix-specific + if (process.platform === "win32") return + await Instance.provide({ + directory: projectRoot, + fn: async () => { + const bash = await BashTool.init() + // Generate large output that exceeds threshold + const byteCount = Truncate.MAX_BYTES * 2 + const result = await bash.execute( + { + command: `head -c ${byteCount} /dev/zero | tr '\\0' 'x'; echo ""; echo "warning: this is a warning"`, + output_filter: "^warning:", + description: "Build with filter (large)", + }, + ctx, + ) + + // Should be truncated (large output spills to file) + expect((result.metadata as any).truncated).toBe(true) + expect((result.metadata as any).filtered).toBe(true) + const filepath = (result.metadata as any).outputPath + expect(filepath).toBeTruthy() + + // The inline output should contain only the filtered line + expect(result.output).toContain("warning: this is a warning") + expect(result.output).toContain("Filtered 1 matching line") + + // The full output file should contain everything + const saved = await Bun.file(filepath).text() + expect(saved.length).toBeGreaterThan(byteCount) + expect(saved).toContain("warning: this is a warning") + }, + }) + }) + + test("output_filter with no matches returns empty filtered output", async () => { + await Instance.provide({ + directory: projectRoot, + fn: async () => { + const bash = await BashTool.init() + const result = await bash.execute( + { + command: `echo "all good"; echo "no problems here"`, + output_filter: "^(warning|error):", + description: "Build with no matches", + }, + ctx, + ) + + // Small output stays in memory, no truncation + expect((result.metadata as any).truncated).toBe(false) + expect((result.metadata as any).filtered).toBe(true) + expect((result.metadata as any).matchCount).toBe(0) + expect(result.output).toContain("[no matches for filter:") + }, + }) + }) + + test("invalid output_filter regex throws error", async () => { + await Instance.provide({ + directory: projectRoot, + fn: async () => { + const bash = await BashTool.init() + await expect( + bash.execute( + { + command: `echo test`, + output_filter: "[invalid(regex", + description: "Invalid regex test", + }, + ctx, + ), + ).rejects.toThrow("Invalid output_filter regex") + }, + }) + }) }) diff --git a/packages/sdk/js/src/gen/core/serverSentEvents.gen.ts b/packages/sdk/js/src/gen/core/serverSentEvents.gen.ts index 8f7fac549d2..88cc283fafd 100644 --- a/packages/sdk/js/src/gen/core/serverSentEvents.gen.ts +++ b/packages/sdk/js/src/gen/core/serverSentEvents.gen.ts @@ -119,10 +119,16 @@ export const createSseClient = ({ signal.addEventListener("abort", abortHandler) + const MAX_BUFFER_SIZE = 10_485_760 // 10 MB try { while (true) { const { done, value } = await reader.read() if (done) break + if (buffer.length + value.length > MAX_BUFFER_SIZE) { + throw new Error( + `SSE buffer overflow: accumulated ${buffer.length + value.length} bytes (limit ${MAX_BUFFER_SIZE})`, + ) + } buffer += value const chunks = buffer.split("\n\n") diff --git a/packages/sdk/js/src/server.ts b/packages/sdk/js/src/server.ts index 174131ccfd5..5006859c5c4 100644 --- a/packages/sdk/js/src/server.ts +++ b/packages/sdk/js/src/server.ts @@ -75,10 +75,14 @@ export async function createOpencodeServer(options?: ServerOptions) { reject(error) }) if (options.signal) { - options.signal.addEventListener("abort", () => { - clearTimeout(id) - reject(new Error("Aborted")) - }) + options.signal.addEventListener( + "abort", + () => { + clearTimeout(id) + reject(new Error("Aborted")) + }, + { once: true }, + ) } }) diff --git a/packages/sdk/js/src/v2/gen/core/serverSentEvents.gen.ts b/packages/sdk/js/src/v2/gen/core/serverSentEvents.gen.ts index 056a8125932..c3eb5c3f388 100644 --- a/packages/sdk/js/src/v2/gen/core/serverSentEvents.gen.ts +++ b/packages/sdk/js/src/v2/gen/core/serverSentEvents.gen.ts @@ -146,10 +146,16 @@ export const createSseClient = ({ signal.addEventListener("abort", abortHandler) + const MAX_BUFFER_SIZE = 10_485_760 // 10 MB try { while (true) { const { done, value } = await reader.read() if (done) break + if (buffer.length + value.length > MAX_BUFFER_SIZE) { + throw new Error( + `SSE buffer overflow: accumulated ${buffer.length + value.length} bytes (limit ${MAX_BUFFER_SIZE})`, + ) + } buffer += value // Normalize line endings: CRLF -> LF, then CR -> LF buffer = buffer.replace(/\r\n/g, "\n").replace(/\r/g, "\n") diff --git a/packages/sdk/js/src/v2/server.ts b/packages/sdk/js/src/v2/server.ts index 174131ccfd5..5006859c5c4 100644 --- a/packages/sdk/js/src/v2/server.ts +++ b/packages/sdk/js/src/v2/server.ts @@ -75,10 +75,14 @@ export async function createOpencodeServer(options?: ServerOptions) { reject(error) }) if (options.signal) { - options.signal.addEventListener("abort", () => { - clearTimeout(id) - reject(new Error("Aborted")) - }) + options.signal.addEventListener( + "abort", + () => { + clearTimeout(id) + reject(new Error("Aborted")) + }, + { once: true }, + ) } })