From 77796461305673697ad02b7f17f3b7649126ed65 Mon Sep 17 00:00:00 2001 From: Helge Tesdal Date: Mon, 27 Apr 2026 10:04:02 +0200 Subject: [PATCH] perf(run-events): unblock bus dispatch with runFork in subscriber callbacks MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Bus.subscribeCallback wraps the callback's return value in Effect.tryPromise(() => Promise.resolve().then(() => callback(msg))) inside a runForEach loop, so a Promise-returning callback (such as Effect.runPromise) serializes handler completion per subscription — each descendant question/permission event waits for the previous handler's reject/reply round trip before dispatch advances. Replace Effect.runPromise with Effect.runFork. runFork returns a Fiber synchronously (non-thenable), so the bus's tryPromise resolves immediately and the next event dispatches without waiting. Handler defects no longer surface through Bus.on's tryPromise.catch, so wrap the forked effect with Effect.tapCause + log.error. Drops the per-event Promise wrapper allocation and unblocks concurrent dispatch for long-running subagent loops with many simultaneous descendants. Addresses audit finding F7 (Opus diamond review, 2026-04-22). --- packages/opencode/src/cli/cmd/run-events.ts | 47 +++++++++++++++++-- packages/opencode/test/cli/run-events.test.ts | 46 ++++++++++++++++++ 2 files changed, 90 insertions(+), 3 deletions(-) diff --git a/packages/opencode/src/cli/cmd/run-events.ts b/packages/opencode/src/cli/cmd/run-events.ts index bb1c36c32101..791a157902ac 100644 --- a/packages/opencode/src/cli/cmd/run-events.ts +++ b/packages/opencode/src/cli/cmd/run-events.ts @@ -1,4 +1,4 @@ -import { Effect, Option } from "effect" +import { Cause, Effect, Fiber, Option } from "effect" import { Bus } from "@/bus" import { Permission } from "@/permission" import { Question } from "@/question" @@ -86,8 +86,44 @@ export const make = Effect.fn("RunEvents.make")(function* (config: Config) { } } + // bus.subscribeCallback wraps the callback in an Effect.tryPromise-based + // subscription handler, so a Promise-returning callback (like Effect.runPromise) + // serializes handler completion per subscription. runFork returns a Fiber + // synchronously (non-thenable), unblocking dispatch so descendant question/ + // permission events are processed concurrently — important for long-running + // subagent loops with many simultaneous descendants. Defects inside the forked + // fiber do not surface through that subscription callback wrapper, so log them + // here instead. Track in-flight fibers so unsubscribe() can interrupt them and + // bound handler work to the RunEvents lifecycle. + const inflight = new Set>() + let closed = false + const fork = (effect: Effect.Effect) => { + if (closed) { + // unsubscribe() already ran but bus subscription teardown is async, so + // a late callback can still reach fork(). Skip starting the handler + // entirely so no side effects (bump, reject, reply) leak past teardown. + // Returning undefined (not a Promise) still unblocks the bus dispatch + // wrapper without spawning a no-op fiber. + return + } + const fiber = Effect.runFork( + effect.pipe( + Effect.tapCause((cause) => + Cause.hasInterruptsOnly(cause) + ? Effect.void + : Effect.sync(() => log.error("handler failed", { cause })), + ), + ), + ) + inflight.add(fiber) + // Register cleanup outside the forked effect to avoid a TDZ/race between + // synchronous fiber completion and inflight.add — Fiber.await observes + // completion regardless of how fast the fiber runs. + Effect.runFork(Fiber.await(fiber).pipe(Effect.ensuring(Effect.sync(() => inflight.delete(fiber))))) + } + const unsubQuestion = yield* bus.subscribeCallback(Question.Event.Asked, (evt) => - Effect.runPromise( + fork( Effect.gen(function* () { const mine = yield* isDescendant(evt.properties.sessionID) if (!mine) return @@ -98,7 +134,7 @@ export const make = Effect.fn("RunEvents.make")(function* (config: Config) { ) const unsubPermission = yield* bus.subscribeCallback(Permission.Event.Asked, (evt) => - Effect.runPromise( + fork( Effect.gen(function* () { const mine = yield* isDescendant(evt.properties.sessionID) if (!mine) return @@ -113,8 +149,13 @@ export const make = Effect.fn("RunEvents.make")(function* (config: Config) { ) const unsubscribe = () => { + closed = true unsubQuestion() unsubPermission() + inflight.forEach((fiber) => Effect.runFork(Fiber.interrupt(fiber))) + // Don't clear() — let the per-fiber Fiber.await observers remove entries + // as their interrupts settle, so any stragglers caught by the closed-flag + // branch above still get cleaned up correctly. } return { stats, unsubscribe } satisfies Handle diff --git a/packages/opencode/test/cli/run-events.test.ts b/packages/opencode/test/cli/run-events.test.ts index 5b5d2cb15d11..ad0ebb804916 100644 --- a/packages/opencode/test/cli/run-events.test.ts +++ b/packages/opencode/test/cli/run-events.test.ts @@ -523,4 +523,50 @@ describe("cli/run-events", () => { }), ), ) + + // Lifecycle test for F7 fiber-tracking: after unsubscribe(), late-arriving + // bus callbacks must not produce new auto-rejects. This exercises both the + // bus unsubscription path and the `closed` flag in fork() that prevents + // late callbacks from forking new handler fibers after teardown has begun. + it.live("does not auto-reject question.asked after unsubscribe()", () => + provideTmpdirInstance(() => + Effect.gen(function* () { + const question = yield* Question.Service + const rootSessionID = SessionID.make("ses_root_post_unsub_000000000000") + const handler = yield* RunEvents.make({ + rootSessionID, + skipPermissions: false, + jsonMode: false, + }) + yield* Effect.sync(() => handler.unsubscribe()) + + // Ask after unsubscribe — without the bus subscription, no auto-reject + // handler runs and the question stays pending. + const fiber = yield* question + .ask({ + sessionID: rootSessionID, + questions: [{ question: "post?", header: "h", options: [{ label: "x", description: "x" }] }], + }) + .pipe(Effect.forkScoped) + + const pending = yield* pollForLength(() => question.list(), 1) + expect(pending[0].sessionID).toBe(rootSessionID) + expect(handler.stats.autoRejectedQuestions).toBe(0) + + // Give any late async bus callbacks a chance to run, then verify the + // question is still pending and no auto-reject occurred. + yield* Effect.sleep("50 millis") + const stillPending = yield* question.list() + expect(stillPending).toHaveLength(1) + expect(stillPending[0].id).toBe(pending[0].id) + expect(stillPending[0].sessionID).toBe(rootSessionID) + expect(handler.stats.autoRejectedQuestions).toBe(0) + + // Manually clear the pending question so the test can finish. + yield* question.reject(pending[0].id) + const exit = yield* Fiber.await(fiber) + expect(Exit.isFailure(exit)).toBe(true) + }), + ), + ) })