Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
51 changes: 38 additions & 13 deletions packages/bcode-laminar/src/plugin.ts
Original file line number Diff line number Diff line change
Expand Up @@ -81,24 +81,45 @@ export const LaminarPlugin: Plugin = ({ client }) => {
// was historically being left un-ended and never exported. The host calls
// this hook from its top-level finally before forceFlush, so span.end()
// here gets its export drained by the host's existing forceFlush race.
shutdown: () => {
// Use console.error not client.app.log — the SDK server may already be
// torn down by shutdown time, and the host log is async via HTTP which
// doesn't honor the sync shutdown contract. v4-worker captures stderr
// into bcode-output-<runId>.log so this lands in the cloud artifact.
shutdown: async () => {
// End any still-open turn spans synchronously, then drain the inner
// BatchSpanProcessor. The host awaits this Promise with a bounded
// race so a wedged exporter cannot hang `process.exit()`.
//
// This runs AFTER the Effect runtime has torn down. The bus-based
// session.idle / server.instance.disposed handlers may have already
// emptied `sessionCurrentTurnSpan` and unregistered the global
// TracerProvider via `sdk.shutdown()`. Either way, the BSP itself
// still has its queue intact and can drain — we hold a direct ref
// to `processor` via closure.
//
// stderr writes go to v4-worker's bcode-output-<runId>.log so cloud
// verification can see whether this path executed. Temporary, will
// be removed once headless V4 telemetry is settled.
const sessionIds = Object.keys(sessionCurrentTurnSpan)
console.error(`[bcode-laminar] shutdown invoked: ending ${sessionIds.length} turn span(s)`)
process.stderr.write(`[bcode-laminar] shutdown: ending ${sessionIds.length} open turn span(s)\n`)
for (const sessionId of sessionIds) {
const span = sessionCurrentTurnSpan[sessionId]
if (!span) continue
try {
span.end()
} catch (err) {
console.error(`[bcode-laminar] span.end() threw for session ${sessionId}: ${(err as Error).message}`)
process.stderr.write(
`[bcode-laminar] span.end threw for session ${sessionId}: ${(err as Error).message}\n`,
)
}
delete sessionCurrentTurnSpan[sessionId]
}
console.error(`[bcode-laminar] shutdown complete`)
process.stderr.write(`[bcode-laminar] shutdown: forceFlush start\n`)
const start = Date.now()
try {
await processor.forceFlush()
process.stderr.write(`[bcode-laminar] shutdown: forceFlush done in ${Date.now() - start}ms\n`)
} catch (err) {
process.stderr.write(
`[bcode-laminar] shutdown: forceFlush threw after ${Date.now() - start}ms: ${(err as Error).message}\n`,
)
}
},
event: async ({ event }) => {
switch (event.type) {
Expand All @@ -113,16 +134,20 @@ export const LaminarPlugin: Plugin = ({ client }) => {
break
}
case "server.instance.disposed": {
// End any turn spans still open so they're flushed before shutdown.
// End any turn spans still open so they're queued before the host
// calls our `shutdown` hook.
//
// Do NOT call `sdk.shutdown()` here. It unregisters the global
// TracerProvider AND closes the BatchSpanProcessor — both
// observed to fire mid-await in headless `bcode run` mode, after
// which the `shutdown` hook's `processor.forceFlush()` is a
// no-op and turn spans are silently dropped. The async `shutdown`
// hook is now the single drain point; this handler just ends spans.
for (const [sessionId, span] of Object.entries(sessionCurrentTurnSpan)) {
span.end()
delete sessionCurrentTurnSpan[sessionId]
}
for (const key of Object.keys(subagentSessionIds)) delete subagentSessionIds[key]
// sdk.shutdown() drains the inner BatchSpanProcessor and exporter
// and removes the global TracerProvider; explicit processor.shutdown()
// is redundant but harmless.
await sdk.shutdown().catch(() => {})
break
}
case "session.created":
Expand Down
63 changes: 29 additions & 34 deletions packages/opencode/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,6 @@ import { PluginCommand } from "./cli/cmd/plug"
import { Heap } from "./cli/heap"
import { drizzle } from "drizzle-orm/bun-sqlite"
import { ensureProcessMetadata } from "@opencode-ai/core/util/opencode-process"
import { trace } from "@opentelemetry/api"
import { isRecord } from "@/util/record"

const processMetadata = ensureProcessMetadata("main")
Expand Down Expand Up @@ -257,49 +256,45 @@ try {
// (see packages/opencode/src/plugin/index.ts pluginShutdownHooks).
// The import is wrapped so a module-load failure can't strand the process
// before process.exit() below.
// Plugin shutdown hooks are the single drain point. Each hook may return
// a Promise (e.g. bcode-laminar returning its BatchSpanProcessor's
// forceFlush) which we await with a 3s budget so a wedged exporter can't
// hang `process.exit()`. The previous fallback that called
// `trace.getTracerProvider().forceFlush()` was a no-op — the OTel API
// proxy provider doesn't expose forceFlush, and the bus-driven
// `server.instance.disposed` handler had often already called
// `sdk.shutdown()` and unregistered the global by the time we got here.
//
// stderr writes go to v4-worker's bcode-output-<runId>.log so cloud
// verification can see whether this path executed and how long it took.
// Will be removed once headless V4 telemetry is fully settled.
try {
const { pluginShutdownHooks } = await import("./plugin")
// Diagnostic via stderr: v4-worker captures into bcode-output-<runId>.log
// and we need to know whether this branch is even reached + how many hooks
// ran. Remove once V4 telemetry verification is settled.
process.stderr.write(`[bcode] shutdown: invoking ${pluginShutdownHooks.size} plugin shutdown hook(s)\n`)
let invoked = 0
for (const hook of pluginShutdownHooks) {
try {
hook()
invoked++
} catch (err) {
Log.Default.error("plugin shutdown hook failed", { error: err })
process.stderr.write(`[bcode] shutdown: hook threw: ${(err as Error).message}\n`)
}
}
process.stderr.write(`[bcode] shutdown: invoked ${invoked}/${pluginShutdownHooks.size} hook(s) successfully\n`)
} catch (err) {
Log.Default.error("plugin shutdown import failed", { error: err })
process.stderr.write(`[bcode] shutdown: import failed: ${(err as Error).message}\n`)
}
// Drain any registered OTel span processors (e.g. bcode-laminar) before
// exiting so the just-ended turn spans actually hit the wire. Bounded with
// a 3 s race so a wedged exporter cannot hang bcode on exit. Generic to any
// OTel-based plugin, not laminar-specific.
const provider = trace.getTracerProvider() as { forceFlush?: () => Promise<void> }
if (provider.forceFlush) {
process.stderr.write(`[bcode] shutdown: forceFlush starting\n`)
const hooks = Array.from(pluginShutdownHooks)
process.stderr.write(`[bcode] shutdown: invoking ${hooks.length} plugin shutdown hook(s)\n`)
const start = Date.now()
await Promise.race([
provider.forceFlush().catch((err: Error) => {
process.stderr.write(`[bcode] shutdown: forceFlush rejected: ${err.message}\n`)
}),
Promise.allSettled(
hooks.map((hook) =>
Promise.resolve()
.then(hook)
.catch((err: Error) => {
process.stderr.write(`[bcode] shutdown: hook threw: ${err.message}\n`)
Log.Default.error("plugin shutdown hook failed", { error: err })
}),
),
),
new Promise<void>((resolve) =>
setTimeout(() => {
process.stderr.write(`[bcode] shutdown: forceFlush timed out after 3000ms\n`)
process.stderr.write(`[bcode] shutdown: timed out after 3000ms\n`)
resolve()
}, 3000),
),
])
process.stderr.write(`[bcode] shutdown: forceFlush done in ${Date.now() - start}ms\n`)
} else {
process.stderr.write(`[bcode] shutdown: no forceFlush on global provider\n`)
process.stderr.write(`[bcode] shutdown: done in ${Date.now() - start}ms\n`)
} catch (err) {
Log.Default.error("plugin shutdown import failed", { error: err })
process.stderr.write(`[bcode] shutdown: import failed: ${(err as Error).message}\n`)
}
// Some subprocesses don't react properly to SIGTERM and similar signals.
// Most notably, some docker-container-based MCP servers don't handle such signals unless
Expand Down
19 changes: 12 additions & 7 deletions packages/plugin/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -223,14 +223,19 @@ export interface Hooks {
event?: (input: { event: Event }) => Promise<void>
config?: (input: Config) => Promise<void>
/**
* Synchronous shutdown hook invoked once per process before
* `process.exit()`, after the event loop has finished its last task and
* before the host's OTel span exporter drain. Use this to end any
* still-open OTel spans your plugin created — async work is not honored
* here, but ending a span (`span.end()`) is synchronous and the host's
* `forceFlush` runs right after this hook.
* Shutdown hook invoked once per process before `process.exit()`. Runs
* AFTER the Effect runtime has torn down (so bus event handlers can no
* longer be relied upon) and BEFORE `process.exit()`.
*
* Returning a Promise lets the host drain async work — e.g. an OTel span
* exporter's `forceFlush()` — before the process actually exits. The host
* awaits all hooks together and races them against one shared bounded
* timeout (not a per-hook budget), so a wedged hook cannot hang shutdown.
*
* Use this to end any still-open spans your plugin created and, if you
* own a span exporter, return its `forceFlush()` Promise.
*/
shutdown?: () => void
shutdown?: () => void | Promise<void>
tool?: {
[key: string]: ToolDefinition
}
Expand Down
Loading