From 6efcd2099160b132b7f1fdd3b0365230d536be16 Mon Sep 17 00:00:00 2001 From: Keaton Hoskins Date: Sat, 28 Feb 2026 15:14:29 +0800 Subject: [PATCH 1/5] fix: resolve 10 memory leak sources across multiple subsystems Fixes subagent session deallocation, delta string cap, tool output on compact, SSE buffer guard, MCP OAuth transport TTL, pending permissions cleanup, TUI event listener cleanup, LSP diagnostics cap, and AbortSignal listener. Original work from PR #14650 by @kryptobaseddev Co-authored-by: Keaton Hoskins --- .../src/context/global-sync/event-reducer.ts | 7 ++++- packages/opencode/src/cli/cmd/tui/app.tsx | 23 +++++++++----- .../cli/cmd/tui/component/prompt/index.tsx | 3 +- .../src/cli/cmd/tui/routes/session/index.tsx | 4 ++- packages/opencode/src/lsp/client.ts | 12 +++++++ packages/opencode/src/mcp/index.ts | 31 ++++++++++++++++--- packages/opencode/src/permission/next.ts | 14 +++++++++ packages/opencode/src/session/compaction.ts | 5 +++ packages/opencode/src/session/index.ts | 1 + packages/opencode/src/tool/task.ts | 6 ++++ .../js/src/gen/core/serverSentEvents.gen.ts | 6 ++++ packages/sdk/js/src/server.ts | 12 ++++--- .../src/v2/gen/core/serverSentEvents.gen.ts | 6 ++++ packages/sdk/js/src/v2/server.ts | 12 ++++--- 14 files changed, 120 insertions(+), 22 deletions(-) diff --git a/packages/app/src/context/global-sync/event-reducer.ts b/packages/app/src/context/global-sync/event-reducer.ts index 241dfb14d7d..433c9f661a8 100644 --- a/packages/app/src/context/global-sync/event-reducer.ts +++ b/packages/app/src/context/global-sync/event-reducer.ts @@ -252,7 +252,12 @@ export function applyDirectoryEvent(input: { const part = draft[result.index] const field = props.field as keyof typeof part const existing = part[field] as string | undefined - ;(part[field] as string) = (existing ?? "") + props.delta + const MAX_PART_STRING_LENGTH = 1_048_576 // 1 MB per part field + const combined = (existing ?? "") + props.delta + ;(part[field] as string) = + combined.length > MAX_PART_STRING_LENGTH + ? combined.slice(combined.length - MAX_PART_STRING_LENGTH) + : combined }), ) break diff --git a/packages/opencode/src/cli/cmd/tui/app.tsx b/packages/opencode/src/cli/cmd/tui/app.tsx index 97c910a47d4..01ae2678369 100644 --- a/packages/opencode/src/cli/cmd/tui/app.tsx +++ b/packages/opencode/src/cli/cmd/tui/app.tsx @@ -3,7 +3,7 @@ import { Clipboard } from "@tui/util/clipboard" import { Selection } from "@tui/util/selection" import { MouseButton, TextAttributes } from "@opentui/core" import { RouteProvider, useRoute } from "@tui/context/route" -import { Switch, Match, createEffect, untrack, ErrorBoundary, createSignal, onMount, batch, Show, on } from "solid-js" +import { Switch, Match, createEffect, untrack, ErrorBoundary, createSignal, onMount, onCleanup, batch, Show, on } from "solid-js" import { win32DisableProcessedInput, win32FlushInputBuffer, win32InstallCtrlCGuard } from "./win32" import { Installation } from "@/installation" import { Flag } from "@/flag/flag" @@ -673,11 +673,11 @@ function App() { } }) - sdk.event.on(TuiEvent.CommandExecute.type, (evt) => { + const unsub1 = sdk.event.on(TuiEvent.CommandExecute.type, (evt) => { command.trigger(evt.properties.command) }) - sdk.event.on(TuiEvent.ToastShow.type, (evt) => { + const unsub2 = sdk.event.on(TuiEvent.ToastShow.type, (evt) => { toast.show({ title: evt.properties.title, message: evt.properties.message, @@ -686,14 +686,14 @@ function App() { }) }) - sdk.event.on(TuiEvent.SessionSelect.type, (evt) => { + const unsub3 = sdk.event.on(TuiEvent.SessionSelect.type, (evt) => { route.navigate({ type: "session", sessionID: evt.properties.sessionID, }) }) - sdk.event.on(SessionApi.Event.Deleted.type, (evt) => { + const unsub4 = sdk.event.on(SessionApi.Event.Deleted.type, (evt) => { if (route.data.type === "session" && route.data.sessionID === evt.properties.info.id) { route.navigate({ type: "home" }) toast.show({ @@ -703,7 +703,7 @@ function App() { } }) - sdk.event.on(SessionApi.Event.Error.type, (evt) => { + const unsub5 = sdk.event.on(SessionApi.Event.Error.type, (evt) => { const error = evt.properties.error if (error && typeof error === "object" && error.name === "MessageAbortedError") return const message = (() => { @@ -725,7 +725,7 @@ function App() { }) }) - sdk.event.on(Installation.Event.UpdateAvailable.type, (evt) => { + const unsub6 = sdk.event.on(Installation.Event.UpdateAvailable.type, (evt) => { toast.show({ variant: "info", title: "Update Available", @@ -734,6 +734,15 @@ function App() { }) }) + onCleanup(() => { + unsub1() + unsub2() + unsub3() + unsub4() + unsub5() + unsub6() + }) + return ( { + const unsubPromptAppend = sdk.event.on(TuiEvent.PromptAppend.type, (evt) => { if (!input || input.isDestroyed) return input.insertText(evt.properties.text) setTimeout(() => { @@ -107,6 +107,7 @@ export function Prompt(props: PromptProps) { renderer.requestRender() }, 0) }) + onCleanup(unsubPromptAppend) createEffect(() => { if (props.disabled) input.cursorColor = theme.backgroundElement diff --git a/packages/opencode/src/cli/cmd/tui/routes/session/index.tsx b/packages/opencode/src/cli/cmd/tui/routes/session/index.tsx index 31401836766..4e9a9a063fc 100644 --- a/packages/opencode/src/cli/cmd/tui/routes/session/index.tsx +++ b/packages/opencode/src/cli/cmd/tui/routes/session/index.tsx @@ -7,6 +7,7 @@ import { For, Match, on, + onCleanup, Show, Switch, useContext, @@ -208,7 +209,7 @@ export function Session() { }) let lastSwitch: string | undefined = undefined - sdk.event.on("message.part.updated", (evt) => { + const unsubPartUpdated = sdk.event.on("message.part.updated", (evt) => { const part = evt.properties.part if (part.type !== "tool") return if (part.sessionID !== route.sessionID) return @@ -223,6 +224,7 @@ export function Session() { lastSwitch = part.id } }) + onCleanup(unsubPartUpdated) let scroll: ScrollBoxRenderable let prompt: PromptRef diff --git a/packages/opencode/src/lsp/client.ts b/packages/opencode/src/lsp/client.ts index 084ccf831ee..8f0adad97f0 100644 --- a/packages/opencode/src/lsp/client.ts +++ b/packages/opencode/src/lsp/client.ts @@ -48,6 +48,7 @@ export namespace LSPClient { new StreamMessageWriter(input.server.process.stdin as any), ) + const MAX_DIAGNOSTICS_FILES = 2_000 const diagnostics = new Map() connection.onNotification("textDocument/publishDiagnostics", (params) => { const filePath = Filesystem.normalizePath(fileURLToPath(params.uri)) @@ -56,6 +57,11 @@ export namespace LSPClient { count: params.diagnostics.length, }) const exists = diagnostics.has(filePath) + if (!exists && diagnostics.size >= MAX_DIAGNOSTICS_FILES) { + // Evict the oldest entry to stay within the size limit + const oldest = diagnostics.keys().next().value + if (oldest !== undefined) diagnostics.delete(oldest) + } diagnostics.set(filePath, params.diagnostics) if (!exists && input.serverID === "typescript") return Bus.publish(Event.Diagnostics, { path: filePath, serverID: input.serverID }) @@ -132,6 +138,7 @@ export namespace LSPClient { }) } + const MAX_OPEN_FILES = 1_000 const files: { [path: string]: number } = {} @@ -191,6 +198,11 @@ export namespace LSPClient { log.info("textDocument/didOpen", input) diagnostics.delete(input.path) + // Evict oldest tracked file if we're at the limit + if (Object.keys(files).length >= MAX_OPEN_FILES) { + const oldest = Object.keys(files)[0] + if (oldest) delete files[oldest] + } await connection.sendNotification("textDocument/didOpen", { textDocument: { uri: pathToFileURL(input.path).href, diff --git a/packages/opencode/src/mcp/index.ts b/packages/opencode/src/mcp/index.ts index 3c29fe03d30..92ea0c3b3ac 100644 --- a/packages/opencode/src/mcp/index.ts +++ b/packages/opencode/src/mcp/index.ts @@ -150,6 +150,27 @@ export namespace MCP { // Store transports for OAuth servers to allow finishing auth type TransportWithAuth = StreamableHTTPClientTransport | SSEClientTransport const pendingOAuthTransports = new Map() + const pendingOAuthTimers = new Map>() + const OAUTH_TRANSPORT_TTL_MS = 5 * 60 * 1_000 // 5 minutes + + function setPendingOAuthTransport(key: string, transport: TransportWithAuth) { + const existing = pendingOAuthTimers.get(key) + if (existing) clearTimeout(existing) + pendingOAuthTransports.set(key, transport) + const timer = setTimeout(() => { + pendingOAuthTransports.delete(key) + pendingOAuthTimers.delete(key) + log.info("evicted stale pending OAuth transport", { key }) + }, OAUTH_TRANSPORT_TTL_MS) + pendingOAuthTimers.set(key, timer) + } + + function deletePendingOAuthTransport(key: string) { + const timer = pendingOAuthTimers.get(key) + if (timer) clearTimeout(timer) + pendingOAuthTimers.delete(key) + pendingOAuthTransports.delete(key) + } // Prompt cache types type PromptInfo = Awaited>["prompts"][number] @@ -205,6 +226,8 @@ export namespace MCP { }), ), ) + for (const timer of pendingOAuthTimers.values()) clearTimeout(timer) + pendingOAuthTimers.clear() pendingOAuthTransports.clear() }, ) @@ -378,7 +401,7 @@ export namespace MCP { }).catch((e) => log.debug("failed to show toast", { error: e })) } else { // Store transport for later finishAuth call - pendingOAuthTransports.set(key, transport) + setPendingOAuthTransport(key, transport) status = { status: "needs_auth" as const } // Show toast for needs_auth Bus.publish(TuiEvent.ToastShow, { @@ -772,7 +795,7 @@ export namespace MCP { } catch (error) { if (error instanceof UnauthorizedError && capturedUrl) { // Store transport for finishAuth - pendingOAuthTransports.set(mcpName, transport) + setPendingOAuthTransport(mcpName, transport) return { authorizationUrl: capturedUrl.toString() } } throw error @@ -879,7 +902,7 @@ export namespace MCP { } // Re-add the MCP server to establish connection - pendingOAuthTransports.delete(mcpName) + deletePendingOAuthTransport(mcpName) const result = await add(mcpName, mcpConfig) const statusRecord = result.status as Record @@ -899,7 +922,7 @@ export namespace MCP { export async function removeAuth(mcpName: string): Promise { await McpAuth.remove(mcpName) McpOAuthCallback.cancelPending(mcpName) - pendingOAuthTransports.delete(mcpName) + deletePendingOAuthTransport(mcpName) await McpAuth.clearOAuthState(mcpName) log.info("removed oauth credentials", { mcpName }) } diff --git a/packages/opencode/src/permission/next.ts b/packages/opencode/src/permission/next.ts index 1e1df62a3ce..67eb7d9415a 100644 --- a/packages/opencode/src/permission/next.ts +++ b/packages/opencode/src/permission/next.ts @@ -283,4 +283,18 @@ export namespace PermissionNext { const s = await state() return Object.values(s.pending).map((x) => x.info) } + + export async function clearSession(sessionID: string) { + const s = await state() + for (const [id, pending] of Object.entries(s.pending)) { + if (pending.info.sessionID !== sessionID) continue + delete s.pending[id] + pending.reject(new RejectedError()) + Bus.publish(Event.Replied, { + sessionID, + requestID: id, + reply: "reject", + }) + } + } } diff --git a/packages/opencode/src/session/compaction.ts b/packages/opencode/src/session/compaction.ts index 9245426057c..bc1f7bde7fc 100644 --- a/packages/opencode/src/session/compaction.ts +++ b/packages/opencode/src/session/compaction.ts @@ -91,6 +91,11 @@ export namespace SessionCompaction { for (const part of toPrune) { if (part.state.status === "completed") { part.state.time.compacted = Date.now() + // Clear tool output and attachments from both DB and in-memory store. + // toModelMessages already substitutes "[Old tool result content cleared]" + // for compacted parts, so this aligns stored data with model behavior. + part.state.output = "[compacted]" + part.state.attachments = undefined await Session.updatePart(part) } } diff --git a/packages/opencode/src/session/index.ts b/packages/opencode/src/session/index.ts index 22de477f8d1..abf552d7028 100644 --- a/packages/opencode/src/session/index.ts +++ b/packages/opencode/src/session/index.ts @@ -653,6 +653,7 @@ export namespace Session { await remove(child.id) } await unshare(sessionID).catch(() => {}) + await PermissionNext.clearSession(sessionID).catch(() => {}) // CASCADE delete handles messages and parts automatically Database.use((db) => { db.delete(SessionTable).where(eq(SessionTable.id, sessionID)).run() diff --git a/packages/opencode/src/tool/task.ts b/packages/opencode/src/tool/task.ts index 8c8cf827aba..94cb91f6bf3 100644 --- a/packages/opencode/src/tool/task.ts +++ b/packages/opencode/src/tool/task.ts @@ -152,6 +152,12 @@ export const TaskTool = Tool.define("task", async (ctx) => { "", ].join("\n") + // Clean up subagent session to free in-memory state (messages, parts, + // event listeners). The task output has already been captured above. + // If the LLM later tries to resume via task_id, Session.get() will + // fail gracefully and a fresh session will be created instead. + Session.remove(session.id).catch(() => {}) + return { title: params.description, metadata: { diff --git a/packages/sdk/js/src/gen/core/serverSentEvents.gen.ts b/packages/sdk/js/src/gen/core/serverSentEvents.gen.ts index 8f7fac549d2..88cc283fafd 100644 --- a/packages/sdk/js/src/gen/core/serverSentEvents.gen.ts +++ b/packages/sdk/js/src/gen/core/serverSentEvents.gen.ts @@ -119,10 +119,16 @@ export const createSseClient = ({ signal.addEventListener("abort", abortHandler) + const MAX_BUFFER_SIZE = 10_485_760 // 10 MB try { while (true) { const { done, value } = await reader.read() if (done) break + if (buffer.length + value.length > MAX_BUFFER_SIZE) { + throw new Error( + `SSE buffer overflow: accumulated ${buffer.length + value.length} bytes (limit ${MAX_BUFFER_SIZE})`, + ) + } buffer += value const chunks = buffer.split("\n\n") diff --git a/packages/sdk/js/src/server.ts b/packages/sdk/js/src/server.ts index 174131ccfd5..5006859c5c4 100644 --- a/packages/sdk/js/src/server.ts +++ b/packages/sdk/js/src/server.ts @@ -75,10 +75,14 @@ export async function createOpencodeServer(options?: ServerOptions) { reject(error) }) if (options.signal) { - options.signal.addEventListener("abort", () => { - clearTimeout(id) - reject(new Error("Aborted")) - }) + options.signal.addEventListener( + "abort", + () => { + clearTimeout(id) + reject(new Error("Aborted")) + }, + { once: true }, + ) } }) diff --git a/packages/sdk/js/src/v2/gen/core/serverSentEvents.gen.ts b/packages/sdk/js/src/v2/gen/core/serverSentEvents.gen.ts index 056a8125932..c3eb5c3f388 100644 --- a/packages/sdk/js/src/v2/gen/core/serverSentEvents.gen.ts +++ b/packages/sdk/js/src/v2/gen/core/serverSentEvents.gen.ts @@ -146,10 +146,16 @@ export const createSseClient = ({ signal.addEventListener("abort", abortHandler) + const MAX_BUFFER_SIZE = 10_485_760 // 10 MB try { while (true) { const { done, value } = await reader.read() if (done) break + if (buffer.length + value.length > MAX_BUFFER_SIZE) { + throw new Error( + `SSE buffer overflow: accumulated ${buffer.length + value.length} bytes (limit ${MAX_BUFFER_SIZE})`, + ) + } buffer += value // Normalize line endings: CRLF -> LF, then CR -> LF buffer = buffer.replace(/\r\n/g, "\n").replace(/\r/g, "\n") diff --git a/packages/sdk/js/src/v2/server.ts b/packages/sdk/js/src/v2/server.ts index 174131ccfd5..5006859c5c4 100644 --- a/packages/sdk/js/src/v2/server.ts +++ b/packages/sdk/js/src/v2/server.ts @@ -75,10 +75,14 @@ export async function createOpencodeServer(options?: ServerOptions) { reject(error) }) if (options.signal) { - options.signal.addEventListener("abort", () => { - clearTimeout(id) - reject(new Error("Aborted")) - }) + options.signal.addEventListener( + "abort", + () => { + clearTimeout(id) + reject(new Error("Aborted")) + }, + { once: true }, + ) } }) From 2e0f23206c5c77692022d323b3a4c9682b2e37cd Mon Sep 17 00:00:00 2001 From: Colin Walters Date: Sat, 28 Feb 2026 15:14:38 +0800 Subject: [PATCH 2/5] =?UTF-8?q?fix:=20stream=20large=20bash=20output=20to?= =?UTF-8?q?=20tmpfile=20to=20prevent=20O(n=C2=B2)=20memory=20growth?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Stream command output directly to a temp file when it exceeds threshold, avoiding memory bloat for commands with huge output. Adds output_filter param for regex-based line filtering. Original work from PR #8953 by @cgwalters Co-authored-by: Colin Walters --- .../src/cli/cmd/tui/routes/session/index.tsx | 12 ++ packages/opencode/src/cli/cmd/uninstall.ts | 8 +- packages/opencode/src/session/prompt.ts | 102 +++++---- packages/opencode/src/tool/bash.ts | 100 +++++++-- packages/opencode/src/tool/bash.txt | 1 + packages/opencode/src/tool/truncation.ts | 183 ++++++++++++++++ packages/opencode/src/util/format.ts | 7 + packages/opencode/test/tool/bash.test.ts | 198 +++++++++++++++++- 8 files changed, 556 insertions(+), 55 deletions(-) diff --git a/packages/opencode/src/cli/cmd/tui/routes/session/index.tsx b/packages/opencode/src/cli/cmd/tui/routes/session/index.tsx index 4e9a9a063fc..e4783a04825 100644 --- a/packages/opencode/src/cli/cmd/tui/routes/session/index.tsx +++ b/packages/opencode/src/cli/cmd/tui/routes/session/index.tsx @@ -32,6 +32,7 @@ import { Prompt, type PromptRef } from "@tui/component/prompt" import type { AssistantMessage, Part, ToolPart, UserMessage, TextPart, ReasoningPart } from "@opencode-ai/sdk/v2" import { useLocal } from "@tui/context/local" import { Locale } from "@/util/locale" +import { formatSize } from "@/util/format" import type { Tool } from "@/tool/tool" import type { ReadTool } from "@/tool/read" import type { WriteTool } from "@/tool/write" @@ -1739,6 +1740,14 @@ function Bash(props: ToolProps) { return [...lines().slice(0, 10), "…"].join("\n") }) + const filterInfo = createMemo(() => { + if (!props.metadata.filtered) return undefined + const total = formatSize(props.metadata.totalBytes ?? 0) + const omitted = formatSize(props.metadata.omittedBytes ?? 0) + const matches = props.metadata.matchCount ?? 0 + return `Filtered: ${matches} match${matches === 1 ? "" : "es"} from ${total} (${omitted} omitted)` + }) + const workdirDisplay = createMemo(() => { const workdir = props.input.workdir if (!workdir || workdir === ".") return undefined @@ -1778,6 +1787,9 @@ function Bash(props: ToolProps) { {limited()} + + {filterInfo()} + {expanded() ? "Click to collapse" : "Click to expand"} diff --git a/packages/opencode/src/cli/cmd/uninstall.ts b/packages/opencode/src/cli/cmd/uninstall.ts index 3d8e7e3f75e..2f39b661094 100644 --- a/packages/opencode/src/cli/cmd/uninstall.ts +++ b/packages/opencode/src/cli/cmd/uninstall.ts @@ -3,6 +3,7 @@ import { UI } from "../ui" import * as prompts from "@clack/prompts" import { Installation } from "../../installation" import { Global } from "../../global" +import { formatSize } from "../../util/format" import { $ } from "bun" import fs from "fs/promises" import path from "path" @@ -340,13 +341,6 @@ async function getDirectorySize(dir: string): Promise { return total } -function formatSize(bytes: number): string { - if (bytes < 1024) return `${bytes} B` - if (bytes < 1024 * 1024) return `${(bytes / 1024).toFixed(1)} KB` - if (bytes < 1024 * 1024 * 1024) return `${(bytes / (1024 * 1024)).toFixed(1)} MB` - return `${(bytes / (1024 * 1024 * 1024)).toFixed(1)} GB` -} - function shortenPath(p: string): string { const home = os.homedir() if (p.startsWith(home)) { diff --git a/packages/opencode/src/session/prompt.ts b/packages/opencode/src/session/prompt.ts index 75bd3c9dfac..a8f93ff0dac 100644 --- a/packages/opencode/src/session/prompt.ts +++ b/packages/opencode/src/session/prompt.ts @@ -29,7 +29,7 @@ import { ReadTool } from "../tool/read" import { FileTime } from "../file/time" import { Flag } from "../flag/flag" import { ulid } from "ulid" -import { spawn } from "child_process" + import { Command } from "../command" import { $, fileURLToPath, pathToFileURL } from "bun" import { ConfigMarkdown } from "../config/markdown" @@ -44,7 +44,8 @@ import { SessionStatus } from "./status" import { LLM } from "./llm" import { iife } from "@/util/iife" import { Shell } from "@/shell/shell" -import { Truncate } from "@/tool/truncation" +import { Truncate, StreamingOutput } from "@/tool/truncation" +import { spawn } from "child_process" // @ts-ignore globalThis.AI_SDK_LOG_WARNINGS = false @@ -1623,40 +1624,32 @@ NOTE: At any point in time through this workflow you should feel free to ask the { cwd, sessionID: input.sessionID, callID: part.callID }, { env: {} }, ) + const streaming = new StreamingOutput() + const proc = spawn(shell, args, { cwd, detached: process.platform !== "win32", - stdio: ["ignore", "pipe", "pipe"], env: { ...process.env, ...shellEnv.env, TERM: "dumb", }, + stdio: ["ignore", "pipe", "pipe"], }) - let output = "" - - proc.stdout?.on("data", (chunk) => { - output += chunk.toString() + const append = (chunk: Buffer) => { + const preview = streaming.append(chunk) if (part.state.status === "running") { part.state.metadata = { - output: output, + output: preview, description: "", } Session.updatePart(part) } - }) + } - proc.stderr?.on("data", (chunk) => { - output += chunk.toString() - if (part.state.status === "running") { - part.state.metadata = { - output: output, - description: "", - } - Session.updatePart(part) - } - }) + proc.stdout?.on("data", append) + proc.stderr?.on("data", append) let aborted = false let exited = false @@ -1675,33 +1668,72 @@ NOTE: At any point in time through this workflow you should feel free to ask the abort.addEventListener("abort", abortHandler, { once: true }) - await new Promise((resolve) => { - proc.on("close", () => { - exited = true + await new Promise((resolve, reject) => { + const cleanup = () => { abort.removeEventListener("abort", abortHandler) + } + + proc.once("exit", () => { + exited = true + cleanup() + resolve() + }) + + proc.once("error", (error) => { + exited = true + cleanup() + reject(error) + }) + + proc.once("close", () => { + exited = true + cleanup() resolve() }) }) + streaming.close() + if (aborted) { - output += "\n\n" + ["", "User aborted the command", ""].join("\n") + streaming.appendMetadata("\n\n" + ["", "User aborted the command", ""].join("\n")) } + msg.time.completed = Date.now() await Session.updateMessage(msg) + if (part.state.status === "running") { - part.state = { - status: "completed", - time: { - ...part.state.time, - end: Date.now(), - }, - input: part.state.input, - title: "", - metadata: { + if (streaming.truncated) { + part.state = { + status: "completed", + time: { + ...part.state.time, + end: Date.now(), + }, + input: part.state.input, + title: "", + metadata: { + output: `[output streamed to file: ${streaming.totalBytes} bytes]`, + description: "", + outputPath: streaming.outputPath, + }, + output: streaming.finalize(), + } + } else { + const output = streaming.inMemoryOutput + part.state = { + status: "completed", + time: { + ...part.state.time, + end: Date.now(), + }, + input: part.state.input, + title: "", + metadata: { + output, + description: "", + }, output, - description: "", - }, - output, + } } await Session.updatePart(part) } diff --git a/packages/opencode/src/tool/bash.ts b/packages/opencode/src/tool/bash.ts index 0751f789b7d..78790485662 100644 --- a/packages/opencode/src/tool/bash.ts +++ b/packages/opencode/src/tool/bash.ts @@ -9,13 +9,13 @@ import { lazy } from "@/util/lazy" import { Language } from "web-tree-sitter" import { $ } from "bun" -import { Filesystem } from "@/util/filesystem" import { fileURLToPath } from "url" import { Flag } from "@/flag/flag.ts" import { Shell } from "@/shell/shell" +import { Filesystem } from "@/util/filesystem" import { BashArity } from "@/permission/arity" -import { Truncate } from "./truncation" +import { Truncate, StreamingOutput } from "./truncation" import { Plugin } from "@/plugin" const MAX_METADATA_LENGTH = 30_000 @@ -23,6 +23,19 @@ const DEFAULT_TIMEOUT = Flag.OPENCODE_EXPERIMENTAL_BASH_DEFAULT_TIMEOUT_MS || 2 export const log = Log.create({ service: "bash-tool" }) +export interface BashMetadata { + output: string + exit: number | null + description: string + truncated?: boolean + outputPath?: string + filtered?: boolean + filterPattern?: string + matchCount?: number + totalBytes?: number + omittedBytes?: number +} + const resolveWasm = (asset: string) => { if (asset.startsWith("file://")) return fileURLToPath(asset) if (asset.startsWith("/") || /^[a-z]:/i.test(asset)) return asset @@ -69,6 +82,12 @@ export const BashTool = Tool.define("bash", async () => { `The working directory to run the command in. Defaults to ${Instance.directory}. Use this instead of 'cd' commands.`, ) .optional(), + output_filter: z + .string() + .describe( + `Optional regex pattern to filter output. When set, full output streams to a file while lines matching the pattern are returned inline. Useful for build commands where you only care about warnings/errors. Example: "^(warning|error|WARN|ERROR):.*" to capture compiler diagnostics. The regex is matched against each line.`, + ) + .optional(), description: z .string() .describe( @@ -81,6 +100,16 @@ export const BashTool = Tool.define("bash", async () => { throw new Error(`Invalid timeout value: ${params.timeout}. Timeout must be a positive number.`) } const timeout = params.timeout ?? DEFAULT_TIMEOUT + + // Parse output_filter regex if provided + let filter: RegExp | undefined + if (params.output_filter) { + try { + filter = new RegExp(params.output_filter) + } catch (e) { + throw new Error(`Invalid output_filter regex: ${params.output_filter}. ${e}`) + } + } const tree = await parser().then((p) => p.parse(params.command)) if (!tree) { throw new Error("Failed to parse command") @@ -169,6 +198,8 @@ export const BashTool = Tool.define("bash", async () => { { cwd, sessionID: ctx.sessionID, callID: ctx.callID }, { env: {} }, ) + const streaming = new StreamingOutput({ filter }) + const proc = spawn(params.command, { shell, cwd, @@ -180,8 +211,6 @@ export const BashTool = Tool.define("bash", async () => { detached: process.platform !== "win32", }) - let output = "" - // Initialize metadata with empty output ctx.metadata({ metadata: { @@ -191,11 +220,12 @@ export const BashTool = Tool.define("bash", async () => { }) const append = (chunk: Buffer) => { - output += chunk.toString() + const preview = streaming.append(chunk) + const display = + preview.length > MAX_METADATA_LENGTH ? preview.slice(0, MAX_METADATA_LENGTH) + "\n\n..." : preview ctx.metadata({ metadata: { - // truncate the metadata to avoid GIANT blobs of data (has nothing to do w/ what agent can access) - output: output.length > MAX_METADATA_LENGTH ? output.slice(0, MAX_METADATA_LENGTH) + "\n\n..." : output, + output: display, description: params.description, }, }) @@ -244,29 +274,75 @@ export const BashTool = Tool.define("bash", async () => { cleanup() reject(error) }) + + proc.once("close", () => { + exited = true + cleanup() + resolve() + }) }) - const resultMetadata: string[] = [] + streaming.close() + const resultMetadata: string[] = [] if (timedOut) { resultMetadata.push(`bash tool terminated command after exceeding timeout ${timeout} ms`) } - if (aborted) { resultMetadata.push("User aborted the command") } - if (resultMetadata.length > 0) { - output += "\n\n\n" + resultMetadata.join("\n") + "\n" + streaming.appendMetadata("\n\n\n" + resultMetadata.join("\n") + "\n") } + // If using filter, return filtered lines + if (streaming.hasFilter) { + const output = streaming.truncated + ? `${streaming.filteredOutput}\n${streaming.finalize(params.output_filter)}` + : streaming.finalize(params.output_filter) + + return { + title: params.description, + metadata: { + output: streaming.filteredOutput || `[no matches for filter: ${params.output_filter}]`, + exit: proc.exitCode, + description: params.description, + truncated: streaming.truncated, + outputPath: streaming.outputPath, + filtered: true, + filterPattern: params.output_filter, + matchCount: streaming.matchCount, + totalBytes: streaming.totalBytes, + omittedBytes: streaming.omittedBytes, + } as BashMetadata, + output, + } + } + + // If we streamed to a file (threshold exceeded), return truncated result + if (streaming.truncated) { + return { + title: params.description, + metadata: { + output: `[output streamed to file: ${streaming.totalBytes} bytes]`, + exit: proc.exitCode, + description: params.description, + truncated: true, + outputPath: streaming.outputPath, + totalBytes: streaming.totalBytes, + } as BashMetadata, + output: streaming.finalize(), + } + } + + const output = streaming.inMemoryOutput return { title: params.description, metadata: { output: output.length > MAX_METADATA_LENGTH ? output.slice(0, MAX_METADATA_LENGTH) + "\n\n..." : output, exit: proc.exitCode, description: params.description, - }, + } as BashMetadata, output, } }, diff --git a/packages/opencode/src/tool/bash.txt b/packages/opencode/src/tool/bash.txt index 47e9378e755..2015c8e6f88 100644 --- a/packages/opencode/src/tool/bash.txt +++ b/packages/opencode/src/tool/bash.txt @@ -25,6 +25,7 @@ Usage notes: - You can specify an optional timeout in milliseconds. If not specified, commands will time out after 120000ms (2 minutes). - It is very helpful if you write a clear, concise description of what this command does in 5-10 words. - If the output exceeds ${maxLines} lines or ${maxBytes} bytes, it will be truncated and the full output will be written to a file. You can use Read with offset/limit to read specific sections or Grep to search the full content. Because of this, you do NOT need to use `head`, `tail`, or other truncation commands to limit output - just run the command directly. + - For build commands (make, cargo build, npm run build, tsc, etc.) that produce lots of output where you only care about warnings/errors, use the `output_filter` parameter with a regex like "^(warning|error|WARN|ERROR):". This streams full output to a file while returning only matching lines inline, saving you from having to grep the output afterward. - Avoid using Bash with the `find`, `grep`, `cat`, `head`, `tail`, `sed`, `awk`, or `echo` commands, unless explicitly instructed or when these commands are truly necessary for the task. Instead, always prefer using the dedicated tools for these commands: - File search: Use Glob (NOT find or ls) diff --git a/packages/opencode/src/tool/truncation.ts b/packages/opencode/src/tool/truncation.ts index 58b0cc13d9a..3819d909fbd 100644 --- a/packages/opencode/src/tool/truncation.ts +++ b/packages/opencode/src/tool/truncation.ts @@ -1,4 +1,5 @@ import fs from "fs/promises" +import fsSync from "fs" import path from "path" import { Global } from "../global" import { Identifier } from "../id/id" @@ -7,6 +8,188 @@ import type { Agent } from "../agent/agent" import { Scheduler } from "../scheduler" import { Filesystem } from "../util/filesystem" import { Glob } from "../util/glob" +import { Log } from "../util/log" + +const log = Log.create({ service: "truncation" }) + +export interface StreamingOutputOptions { + threshold?: number + /** Optional regex to filter output lines. Matching lines are collected separately. */ + filter?: RegExp +} + +/** + * Streaming output accumulator that spills to disk when threshold is exceeded. + * Avoids O(n²) memory growth from string concatenation. + * + * Optionally supports line filtering - when a filter regex is provided, matching + * lines are collected separately while full output still streams to file. + */ +export class StreamingOutput { + private output = "" + private outputBytes = 0 + private streamFile: { fd: number; path: string } | undefined + private streamedBytes = 0 + private threshold: number + private filter?: RegExp + private filtered = "" + private filteredBytes = 0 + private filteredCount = 0 + private lineBuffer = "" + + constructor(options: StreamingOutputOptions = {}) { + this.threshold = options.threshold ?? Truncate.MAX_BYTES + this.filter = options.filter + } + + /** Append a chunk of output. Returns the current preview string. */ + append(chunk: Buffer): string { + const text = chunk.toString() + this.outputBytes += chunk.length + + // Spill to file when threshold exceeded + if (!this.streamFile && this.outputBytes > this.threshold) { + this.streamFile = this.createStreamFile() + } + + if (this.streamFile) { + fsSync.writeSync(this.streamFile.fd, text) + this.streamedBytes += Buffer.byteLength(text, "utf-8") + } else { + this.output += text + } + + // Process filter if active + if (this.filter) { + this.lineBuffer += text + const lines = this.lineBuffer.split("\n") + this.lineBuffer = lines.pop() || "" + for (const line of lines) { + if (this.filter.test(line)) { + const entry = line + "\n" + this.filtered += entry + this.filteredBytes += Buffer.byteLength(entry, "utf-8") + this.filteredCount++ + } + } + } + + return this.preview() + } + + /** Get current preview - either full output, streaming indicator, or filter status */ + preview(): string { + if (this.filter) { + if (this.filtered) return this.filtered + return `[filtering: ${this.outputBytes} bytes, ${this.matchCount} matches...]\n` + } + if (this.streamFile) { + return `[streaming to file: ${this.streamedBytes} bytes written...]\n` + } + return this.output + } + + /** Whether output was streamed to file */ + get truncated(): boolean { + return this.streamFile !== undefined + } + + /** Total bytes written */ + get totalBytes(): number { + return this.streamFile ? this.streamedBytes : this.outputBytes + } + + /** Path to output file (if streaming) */ + get outputPath(): string | undefined { + return this.streamFile?.path + } + + /** Get the in-memory output (only valid if not truncated) */ + get inMemoryOutput(): string { + return this.output + } + + /** Get filtered output (only when filter is active) */ + get filteredOutput(): string { + return this.filtered + } + + /** Number of lines matching the filter */ + get matchCount(): number { + return this.filteredCount + } + + /** Bytes omitted by filtering */ + get omittedBytes(): number { + return this.totalBytes - this.filteredBytes + } + + /** Whether a filter is active */ + get hasFilter(): boolean { + return this.filter !== undefined + } + + /** Close the stream file if open. Call this after command completes. */ + close(): void { + // Process any remaining content in line buffer + if (this.filter && this.lineBuffer) { + if (this.filter.test(this.lineBuffer)) { + const entry = this.lineBuffer + "\n" + this.filtered += entry + this.filteredBytes += Buffer.byteLength(entry, "utf-8") + this.filteredCount++ + } + } + if (this.streamFile) { + fsSync.closeSync(this.streamFile.fd) + } + } + + /** Append metadata to output (either in memory or to file) */ + appendMetadata(text: string): void { + if (this.streamFile) { + fsSync.appendFileSync(this.streamFile.path, text) + } else { + this.output += text + } + } + + /** Get final output string (for non-truncated) or hint message (for truncated) */ + finalize(filterPattern?: string): string { + if (this.filter) { + if (this.streamFile) { + return `Filtered ${this.matchCount} matching lines from ${this.totalBytes} bytes of output.\nFull output saved to: ${this.streamFile.path}\nUse Grep to search or Read with offset/limit to view specific sections.\nNote: This file will be deleted after a few more commands. Copy it if you need to preserve it.` + } + return this.filtered || `[no matches for filter: ${filterPattern}]` + } + if (this.streamFile) { + return `The command output was ${this.streamedBytes} bytes and was truncated (inline limit: ${this.threshold} bytes).\nFull output saved to: ${this.streamFile.path}\nUse Grep to search the full content or Read with offset/limit to view specific sections.\nNote: This file will be deleted after a few more commands. Copy it if you need to preserve it.` + } + return this.output + } + + private createStreamFile(): { fd: number; path: string } | undefined { + let fd: number = -1 + try { + const dir = Truncate.DIR + fsSync.mkdirSync(dir, { recursive: true }) + Truncate.cleanup().catch(() => {}) + const filepath = path.join(dir, Identifier.ascending("tool")) + fd = fsSync.openSync(filepath, "w") + // Write existing buffered output to file + if (this.output) { + fsSync.writeSync(fd, this.output) + this.streamedBytes += Buffer.byteLength(this.output, "utf-8") + } + this.output = "" // Clear memory buffer + return { fd, path: filepath } + } catch (e) { + if (fd >= 0) fsSync.closeSync(fd) + log.warn("failed to create stream file, continuing in memory", { error: e }) + return undefined + } + } +} export namespace Truncate { export const MAX_LINES = 2000 diff --git a/packages/opencode/src/util/format.ts b/packages/opencode/src/util/format.ts index 4ae62eac450..c105fcdb738 100644 --- a/packages/opencode/src/util/format.ts +++ b/packages/opencode/src/util/format.ts @@ -1,3 +1,10 @@ +export function formatSize(bytes: number): string { + if (bytes < 1024) return `${bytes} B` + if (bytes < 1024 * 1024) return `${(bytes / 1024).toFixed(1)} KB` + if (bytes < 1024 * 1024 * 1024) return `${(bytes / (1024 * 1024)).toFixed(1)} MB` + return `${(bytes / (1024 * 1024 * 1024)).toFixed(1)} GB` +} + export function formatDuration(secs: number) { if (secs <= 0) return "" if (secs < 60) return `${secs}s` diff --git a/packages/opencode/test/tool/bash.test.ts b/packages/opencode/test/tool/bash.test.ts index ac93016927a..1331b7849bd 100644 --- a/packages/opencode/test/tool/bash.test.ts +++ b/packages/opencode/test/tool/bash.test.ts @@ -349,7 +349,8 @@ describe("tool.bash truncation", () => { ) expect((result.metadata as any).truncated).toBe(true) expect(result.output).toContain("truncated") - expect(result.output).toContain("The tool call succeeded but the output was truncated") + // Streaming truncation uses different message format + expect(result.output).toContain("Full output saved to:") }, }) }) @@ -399,4 +400,199 @@ describe("tool.bash truncation", () => { }, }) }) + + test("streams to file during execution for very large output", async () => { + await Instance.provide({ + directory: projectRoot, + fn: async () => { + const bash = await BashTool.init() + // Generate 100KB of data - well over the 50KB threshold + const byteCount = Truncate.MAX_BYTES * 2 + const result = await bash.execute( + { + command: `head -c ${byteCount} /dev/zero | tr '\\0' 'x'`, + description: "Generate large streaming output", + }, + ctx, + ) + expect((result.metadata as any).truncated).toBe(true) + const filepath = (result.metadata as any).outputPath + expect(filepath).toBeTruthy() + + // Verify the full output was saved + const saved = await Bun.file(filepath).text() + expect(saved.length).toBe(byteCount) + expect(saved[0]).toBe("x") + expect(saved[byteCount - 1]).toBe("x") + }, + }) + }) + + test("preserves exit code when streaming to file", async () => { + await Instance.provide({ + directory: projectRoot, + fn: async () => { + const bash = await BashTool.init() + const byteCount = Truncate.MAX_BYTES * 2 + const result = await bash.execute( + { + command: `head -c ${byteCount} /dev/zero | tr '\\0' 'x'; exit 42`, + description: "Generate large output with non-zero exit", + }, + ctx, + ) + expect((result.metadata as any).truncated).toBe(true) + expect((result.metadata as any).exit).toBe(42) + }, + }) + }) + + test("streams stderr to file", async () => { + await Instance.provide({ + directory: projectRoot, + fn: async () => { + const bash = await BashTool.init() + const byteCount = Truncate.MAX_BYTES * 2 + const result = await bash.execute( + { + command: `head -c ${byteCount} /dev/zero | tr '\\0' 'e' >&2`, + description: "Generate large stderr output", + }, + ctx, + ) + expect((result.metadata as any).truncated).toBe(true) + const filepath = (result.metadata as any).outputPath + expect(filepath).toBeTruthy() + + const saved = await Bun.file(filepath).text() + expect(saved.length).toBe(byteCount) + expect(saved[0]).toBe("e") + }, + }) + }) + + test("output message contains file path when streaming", async () => { + await Instance.provide({ + directory: projectRoot, + fn: async () => { + const bash = await BashTool.init() + const byteCount = Truncate.MAX_BYTES * 2 + const result = await bash.execute( + { + command: `head -c ${byteCount} /dev/zero | tr '\\0' 'x'`, + description: "Check output message format", + }, + ctx, + ) + const filepath = (result.metadata as any).outputPath + expect(result.output).toContain("Full output saved to:") + expect(result.output).toContain(filepath) + }, + }) + }) + + test("output_filter captures matching lines in memory for small output", async () => { + await Instance.provide({ + directory: projectRoot, + fn: async () => { + const bash = await BashTool.init() + // Small build output with warnings/errors - stays in memory + const result = await bash.execute( + { + command: `echo "compiling..."; echo "warning: unused variable"; echo "done"; echo "error: type mismatch"`, + output_filter: "^(warning|error):", + description: "Build with filter (small)", + }, + ctx, + ) + + // Should NOT be truncated (small output stays in memory) + expect((result.metadata as any).truncated).toBe(false) + expect((result.metadata as any).filtered).toBe(true) + expect((result.metadata as any).matchCount).toBe(2) + + // The output should contain only the filtered lines + expect(result.output).toContain("warning: unused variable") + expect(result.output).toContain("error: type mismatch") + expect(result.output).not.toContain("compiling...") + expect(result.output).not.toContain("done") + }, + }) + }) + + test("output_filter streams to file when output exceeds threshold", async () => { + await Instance.provide({ + directory: projectRoot, + fn: async () => { + const bash = await BashTool.init() + // Generate large output that exceeds threshold + const byteCount = Truncate.MAX_BYTES * 2 + const result = await bash.execute( + { + command: `head -c ${byteCount} /dev/zero | tr '\\0' 'x'; echo ""; echo "warning: this is a warning"`, + output_filter: "^warning:", + description: "Build with filter (large)", + }, + ctx, + ) + + // Should be truncated (large output spills to file) + expect((result.metadata as any).truncated).toBe(true) + expect((result.metadata as any).filtered).toBe(true) + const filepath = (result.metadata as any).outputPath + expect(filepath).toBeTruthy() + + // The inline output should contain only the filtered line + expect(result.output).toContain("warning: this is a warning") + expect(result.output).toContain("Filtered 1 matching line") + + // The full output file should contain everything + const saved = await Bun.file(filepath).text() + expect(saved.length).toBeGreaterThan(byteCount) + expect(saved).toContain("warning: this is a warning") + }, + }) + }) + + test("output_filter with no matches returns empty filtered output", async () => { + await Instance.provide({ + directory: projectRoot, + fn: async () => { + const bash = await BashTool.init() + const result = await bash.execute( + { + command: `echo "all good"; echo "no problems here"`, + output_filter: "^(warning|error):", + description: "Build with no matches", + }, + ctx, + ) + + // Small output stays in memory, no truncation + expect((result.metadata as any).truncated).toBe(false) + expect((result.metadata as any).filtered).toBe(true) + expect((result.metadata as any).matchCount).toBe(0) + expect(result.output).toContain("[no matches for filter:") + }, + }) + }) + + test("invalid output_filter regex throws error", async () => { + await Instance.provide({ + directory: projectRoot, + fn: async () => { + const bash = await BashTool.init() + await expect( + bash.execute( + { + command: `echo test`, + output_filter: "[invalid(regex", + description: "Invalid regex test", + }, + ctx, + ), + ).rejects.toThrow("Invalid output_filter regex") + }, + }) + }) }) From ff830d949f62be783190294ef909ba11819debde Mon Sep 17 00:00:00 2001 From: root Date: Sat, 28 Feb 2026 15:14:50 +0800 Subject: [PATCH 3/5] fix: port AsyncQueue termination, bus cleanup, PTY chunking, and shutdown disposal Additional memory leak fixes for queue termination, event bus cleanup, PTY output chunking, and proper shutdown disposal. Co-authored-by: Keaton Hoskins Co-authored-by: Colin Walters --- packages/opencode/src/bus/index.ts | 20 ++++++++++--------- packages/opencode/src/index.ts | 7 +++++++ packages/opencode/src/pty/index.ts | 25 +++++++++++++---------- packages/opencode/src/util/queue.ts | 31 ++++++++++++++++++++++++++--- 4 files changed, 61 insertions(+), 22 deletions(-) diff --git a/packages/opencode/src/bus/index.ts b/packages/opencode/src/bus/index.ts index edb093f1974..1cf85d5d971 100644 --- a/packages/opencode/src/bus/index.ts +++ b/packages/opencode/src/bus/index.ts @@ -25,16 +25,18 @@ export namespace Bus { }, async (entry) => { const wildcard = entry.subscriptions.get("*") - if (!wildcard) return - const event = { - type: InstanceDisposed.type, - properties: { - directory: Instance.directory, - }, - } - for (const sub of [...wildcard]) { - sub(event) + if (wildcard) { + const event = { + type: InstanceDisposed.type, + properties: { + directory: Instance.directory, + }, + } + for (const sub of [...wildcard]) { + sub(event) + } } + entry.subscriptions.clear() }, ) diff --git a/packages/opencode/src/index.ts b/packages/opencode/src/index.ts index 9af79278c06..69975a39454 100644 --- a/packages/opencode/src/index.ts +++ b/packages/opencode/src/index.ts @@ -33,6 +33,7 @@ import path from "path" import { Global } from "./global" import { JsonMigration } from "./storage/json-migration" import { Database } from "./storage/db" +import { Instance } from "./project/instance" process.on("unhandledRejection", (e) => { Log.Default.error("rejection", { @@ -202,6 +203,12 @@ try { } process.exitCode = 1 } finally { + // Dispose all instance-scoped resources (LSP clients, bus subscriptions, PTY sessions, etc.) + try { + await Instance.disposeAll() + } catch { + // best-effort cleanup + } // Some subprocesses don't react properly to SIGTERM and similar signals. // Most notably, some docker-container-based MCP servers don't handle such signals unless // run using `docker run --init`. diff --git a/packages/opencode/src/pty/index.ts b/packages/opencode/src/pty/index.ts index dee3fbc5429..a6cfb3b305e 100644 --- a/packages/opencode/src/pty/index.ts +++ b/packages/opencode/src/pty/index.ts @@ -84,7 +84,8 @@ export namespace Pty { interface ActiveSession { info: Info process: IPty - buffer: string + bufferChunks: string[] + bufferLen: number bufferCursor: number cursor: number subscribers: Map @@ -161,7 +162,8 @@ export namespace Pty { const session: ActiveSession = { info, process: ptyProcess, - buffer: "", + bufferChunks: [], + bufferLen: 0, bufferCursor: 0, cursor: 0, subscribers: new Map(), @@ -188,11 +190,13 @@ export namespace Pty { } } - session.buffer += chunk - if (session.buffer.length <= BUFFER_LIMIT) return - const excess = session.buffer.length - BUFFER_LIMIT - session.buffer = session.buffer.slice(excess) - session.bufferCursor += excess + session.bufferChunks.push(chunk) + session.bufferLen += chunk.length + while (session.bufferLen > BUFFER_LIMIT && session.bufferChunks.length > 1) { + const removed = session.bufferChunks.shift()! + session.bufferLen -= removed.length + session.bufferCursor += removed.length + } }) ptyProcess.onExit(({ exitCode }) => { log.info("session exited", { id, exitCode }) @@ -285,11 +289,12 @@ export namespace Pty { cursor === -1 ? end : typeof cursor === "number" && Number.isSafeInteger(cursor) ? Math.max(0, cursor) : 0 const data = (() => { - if (!session.buffer) return "" + if (session.bufferChunks.length === 0) return "" if (from >= end) return "" + const combined = session.bufferChunks.join("") const offset = Math.max(0, from - start) - if (offset >= session.buffer.length) return "" - return session.buffer.slice(offset) + if (offset >= combined.length) return "" + return combined.slice(offset) })() if (data) { diff --git a/packages/opencode/src/util/queue.ts b/packages/opencode/src/util/queue.ts index a1af53fe8f0..daf1cc4d49b 100644 --- a/packages/opencode/src/util/queue.ts +++ b/packages/opencode/src/util/queue.ts @@ -1,20 +1,45 @@ +const DONE = Symbol("queue.done") + export class AsyncQueue implements AsyncIterable { private queue: T[] = [] - private resolvers: ((value: T) => void)[] = [] + private resolvers: ((value: T | typeof DONE) => void)[] = [] + private closed = false push(item: T) { + if (this.closed) return const resolve = this.resolvers.shift() if (resolve) resolve(item) else this.queue.push(item) } - async next(): Promise { + close() { + if (this.closed) return + this.closed = true + for (const resolve of this.resolvers) { + resolve(DONE) + } + this.resolvers.length = 0 + this.queue.length = 0 + } + + drain(): T[] { + const items = [...this.queue] + this.queue.length = 0 + return items + } + + async next(): Promise { + if (this.closed) return DONE if (this.queue.length > 0) return this.queue.shift()! return new Promise((resolve) => this.resolvers.push(resolve)) } async *[Symbol.asyncIterator]() { - while (true) yield await this.next() + while (!this.closed) { + const value = await this.next() + if (value === DONE) return + yield value as T + } } } From 763998b18610d5fd0e2eec908ea9334079ec5655 Mon Sep 17 00:00:00 2001 From: root Date: Sat, 28 Feb 2026 16:01:58 +0800 Subject: [PATCH 4/5] fix: skip Unix-specific bash tests on Windows --- packages/opencode/test/tool/bash.test.ts | 10 ++++++++++ 1 file changed, 10 insertions(+) diff --git a/packages/opencode/test/tool/bash.test.ts b/packages/opencode/test/tool/bash.test.ts index 1331b7849bd..1b7d082f2aa 100644 --- a/packages/opencode/test/tool/bash.test.ts +++ b/packages/opencode/test/tool/bash.test.ts @@ -402,6 +402,8 @@ describe("tool.bash truncation", () => { }) test("streams to file during execution for very large output", async () => { + // Skip on Windows - uses /dev/zero and head -c which are Unix-specific + if (process.platform === "win32") return await Instance.provide({ directory: projectRoot, fn: async () => { @@ -429,6 +431,8 @@ describe("tool.bash truncation", () => { }) test("preserves exit code when streaming to file", async () => { + // Skip on Windows - uses /dev/zero and head -c which are Unix-specific + if (process.platform === "win32") return await Instance.provide({ directory: projectRoot, fn: async () => { @@ -448,6 +452,8 @@ describe("tool.bash truncation", () => { }) test("streams stderr to file", async () => { + // Skip on Windows - uses /dev/zero and head -c which are Unix-specific + if (process.platform === "win32") return await Instance.provide({ directory: projectRoot, fn: async () => { @@ -472,6 +478,8 @@ describe("tool.bash truncation", () => { }) test("output message contains file path when streaming", async () => { + // Skip on Windows - uses /dev/zero and head -c which are Unix-specific + if (process.platform === "win32") return await Instance.provide({ directory: projectRoot, fn: async () => { @@ -521,6 +529,8 @@ describe("tool.bash truncation", () => { }) test("output_filter streams to file when output exceeds threshold", async () => { + // Skip on Windows - uses /dev/zero and head -c which are Unix-specific + if (process.platform === "win32") return await Instance.provide({ directory: projectRoot, fn: async () => { From ab406e549cfeb2c3945c0ab1aac96bfe0f4f24b8 Mon Sep 17 00:00:00 2001 From: root Date: Sat, 28 Feb 2026 16:20:45 +0800 Subject: [PATCH 5/5] fix: make output_filter small-output test shell-safe on Windows --- packages/opencode/test/tool/bash.test.ts | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/packages/opencode/test/tool/bash.test.ts b/packages/opencode/test/tool/bash.test.ts index 1b7d082f2aa..13560f6e144 100644 --- a/packages/opencode/test/tool/bash.test.ts +++ b/packages/opencode/test/tool/bash.test.ts @@ -507,7 +507,8 @@ describe("tool.bash truncation", () => { // Small build output with warnings/errors - stays in memory const result = await bash.execute( { - command: `echo "compiling..."; echo "warning: unused variable"; echo "done"; echo "error: type mismatch"`, + command: + "bun -e \"console.log('compiling...');console.log('warning: unused variable');console.log('done');console.log('error: type mismatch')\"", output_filter: "^(warning|error):", description: "Build with filter (small)", },