perf(run-events): unblock bus dispatch with runFork in subscriber callbacks (F7)#10
perf(run-events): unblock bus dispatch with runFork in subscriber callbacks (F7)#10tesdal wants to merge 1 commit intophase-ab-basefrom
Conversation
|
Hey! Your PR title Please update it to start with one of:
Where See CONTRIBUTING.md for details. |
There was a problem hiding this comment.
Pull request overview
This PR addresses audit finding F7 by preventing Bus.subscribeCallback handlers in RunEvents from serializing event dispatch when the subscriber callback returns a Promise (via Effect.runPromise). It switches the handlers to fork work synchronously so the bus dispatch loop can continue without waiting for each handler to complete.
Changes:
- Replace
Effect.runPromise(...)with arunFork-based helper in theQuestion.Event.AskedandPermission.Event.Askedsubscribers to avoid per-subscription serialization. - Add
Effect.tapCause(...)logging to surface handler failures/defects that no longer propagate through the bus callback wrapper. - Deduplicate handler launch logic via a local
forkhelper.
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
| // Bus.subscribeCallback (src/bus/index.ts:126-152) wraps the callback return | ||
| // in `Effect.tryPromise(() => Promise.resolve().then(() => callback(msg)))` | ||
| // inside `runForEach`, so a Promise-returning callback (like Effect.runPromise) | ||
| // serializes handler completion per subscription. runFork returns a Fiber | ||
| // synchronously (non-thenable), unblocking dispatch so descendant question/ | ||
| // permission events are processed concurrently — important for long-running | ||
| // subagent loops with many simultaneous descendants. Defects inside the fiber | ||
| // no longer surface through Bus.on's tryPromise.catch, so log them here. |
There was a problem hiding this comment.
The explanatory comment references “Bus.on's tryPromise.catch”, but the implementation is an internal on() helper using Effect.tryPromise({ try, catch }) (no .catch method and no exported Bus.on). Tweaking the wording to match the actual code (or avoiding hardcoded line ranges) would reduce confusion/drift when bus/index.ts changes.
| // Bus.subscribeCallback (src/bus/index.ts:126-152) wraps the callback return | |
| // in `Effect.tryPromise(() => Promise.resolve().then(() => callback(msg)))` | |
| // inside `runForEach`, so a Promise-returning callback (like Effect.runPromise) | |
| // serializes handler completion per subscription. runFork returns a Fiber | |
| // synchronously (non-thenable), unblocking dispatch so descendant question/ | |
| // permission events are processed concurrently — important for long-running | |
| // subagent loops with many simultaneous descendants. Defects inside the fiber | |
| // no longer surface through Bus.on's tryPromise.catch, so log them here. | |
| // bus.subscribeCallback wraps the callback in an Effect.tryPromise-based | |
| // subscription handler, so a Promise-returning callback (like Effect.runPromise) | |
| // serializes handler completion per subscription. runFork returns a Fiber | |
| // synchronously (non-thenable), unblocking dispatch so descendant question/ | |
| // permission events are processed concurrently — important for long-running | |
| // subagent loops with many simultaneous descendants. Defects inside the forked | |
| // fiber do not surface through that subscription callback wrapper, so log them | |
| // here instead. |
| const fork = (effect: Effect.Effect<void>) => | ||
| Effect.runFork( | ||
| effect.pipe(Effect.tapCause((cause) => Effect.sync(() => log.error("handler failed", { cause })))), | ||
| ) |
There was a problem hiding this comment.
fork() spawns detached fibers via Effect.runFork, but unsubscribe() only removes the bus subscriptions; any already-forked handler fibers will continue running and may still reject/reply after unsubscribe() (and potentially keep work alive during shutdown). Consider tracking the returned fibers and interrupting them in unsubscribe(), or forking them into a scope that you close on unsubscribe so handler work is reliably bounded to the RunEvents lifecycle.
5b5228f to
856363c
Compare
There was a problem hiding this comment.
Pull request overview
Copilot reviewed 1 out of 1 changed files in this pull request and generated 1 comment.
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
| effect.pipe( | ||
| Effect.tapCause((cause) => Effect.sync(() => log.error("handler failed", { cause }))), | ||
| Effect.ensuring(Effect.sync(() => inflight.delete(fiber))), | ||
| ), |
There was a problem hiding this comment.
Effect.tapCause will also run when the forked fiber is interrupted (e.g. via unsubscribe()), so normal shutdown will be logged as handler failed. Consider filtering out interrupt-only causes (e.g. Cause.hasInterruptsOnly(cause)) so only real handler failures/defects are logged.
856363c to
cea916b
Compare
There was a problem hiding this comment.
Pull request overview
Copilot reviewed 1 out of 1 changed files in this pull request and generated 3 comments.
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
| Effect.ensuring(Effect.sync(() => inflight.delete(fiber))), | ||
| ), | ||
| ) | ||
| inflight.add(fiber) |
There was a problem hiding this comment.
fork() adds the fiber to inflight after starting it, but removal is wired via Effect.ensuring inside the forked effect. If the handler effect completes synchronously (or fails fast), the ensuring finalizer can run before inflight.add(fiber) executes, leaving a completed fiber permanently stuck in inflight (and potentially triggering a TDZ ReferenceError if the finalizer runs before fiber is assigned). Consider registering cleanup in a way that cannot race with insertion (e.g., add to the set immediately after creating the fiber, then attach a completion watcher such as awaiting the fiber and deleting it, or using a fiber observer API if available).
| Effect.ensuring(Effect.sync(() => inflight.delete(fiber))), | |
| ), | |
| ) | |
| inflight.add(fiber) | |
| ), | |
| ) | |
| inflight.add(fiber) | |
| Effect.runFork( | |
| Fiber.await(fiber).pipe( | |
| Effect.ensuring(Effect.sync(() => inflight.delete(fiber))), | |
| ), | |
| ) |
| const unsubscribe = () => { | ||
| unsubQuestion() | ||
| unsubPermission() | ||
| inflight.forEach((fiber) => Effect.runFork(Fiber.interrupt(fiber))) | ||
| inflight.clear() |
There was a problem hiding this comment.
unsubscribe() calls unsubQuestion() / unsubPermission() but those ultimately close the Bus subscription scope asynchronously (see Bus.on uses bridge.fork(Scope.close(...))). That means callbacks may still be mid-flight and can still call fork(...) while unsubscribe() is iterating/clearing inflight, causing fibers to escape interruption and outlive the RunEvents lifecycle. Consider adding a local closed flag checked by fork() (skip/interrupt immediately when closed) and avoid inflight.clear() so late-arriving fibers still get cleaned up by the completion handler.
| @@ -98,7 +123,7 @@ export const make = Effect.fn("RunEvents.make")(function* (config: Config) { | |||
| ) | |||
|
|
|||
| const unsubPermission = yield* bus.subscribeCallback(Permission.Event.Asked, (evt) => | |||
| Effect.runPromise( | |||
| fork( | |||
| Effect.gen(function* () { | |||
| const mine = yield* isDescendant(evt.properties.sessionID) | |||
| if (!mine) return | |||
| @@ -115,6 +140,8 @@ export const make = Effect.fn("RunEvents.make")(function* (config: Config) { | |||
| const unsubscribe = () => { | |||
| unsubQuestion() | |||
| unsubPermission() | |||
| inflight.forEach((fiber) => Effect.runFork(Fiber.interrupt(fiber))) | |||
| inflight.clear() | |||
| } | |||
There was a problem hiding this comment.
This change introduces new lifecycle behavior (forking handlers + tracking/interrupting inflight fibers + logging via tapCause) but the existing tests don’t appear to assert that unsubscribe() actually interrupts long-running in-flight handlers, or that inflight is cleaned up after handler completion. Adding a focused test that publishes many events with an intentionally blocking handler and verifies dispatch is unblocked + fibers are interrupted on unsubscribe would help prevent regressions of the audit finding.
cea916b to
0696184
Compare
There was a problem hiding this comment.
Pull request overview
Copilot reviewed 2 out of 2 changed files in this pull request and generated 1 comment.
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
| const fiber = Effect.runFork( | ||
| effect.pipe( | ||
| Effect.tapCause((cause) => | ||
| Cause.hasInterruptsOnly(cause) | ||
| ? Effect.void | ||
| : Effect.sync(() => log.error("handler failed", { cause })), | ||
| ), | ||
| ), | ||
| ) | ||
| if (closed) { | ||
| // unsubscribe() already ran but bus subscription teardown is async, so | ||
| // a late callback can still reach fork(). Interrupt immediately. | ||
| Effect.runFork(Fiber.interrupt(fiber)) | ||
| return fiber |
There was a problem hiding this comment.
fork() starts the handler fiber before checking closed. If a bus callback arrives after unsubscribe() sets closed = true, this can still run side effects (e.g., bumping stats / rejecting) before the subsequent interrupt executes, which undermines the guarantee implied by the closed flag and the added lifecycle test. Consider checking closed before calling Effect.runFork(effect) and returning a no-op (or a trivially interrupted fiber) so that post-unsubscribe callbacks cannot start handler work at all.
0696184 to
4d0ea5f
Compare
There was a problem hiding this comment.
Pull request overview
Copilot reviewed 2 out of 2 changed files in this pull request and generated 2 comments.
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
| // unsubscribe() already ran but bus subscription teardown is async, so | ||
| // a late callback can still reach fork(). Skip starting the handler | ||
| // entirely so no side effects (bump, reject, reply) leak past teardown. | ||
| return Effect.runFork(Effect.void) |
There was a problem hiding this comment.
In the closed branch, fork() says it will “skip starting the handler entirely”, but it still spawns a no-op fiber via Effect.runFork(Effect.void). Since the bus wrapper only needs a non-Promise return to unblock dispatch, returning undefined/void here would avoid creating fibers after teardown and better match the intent/comment.
| return Effect.runFork(Effect.void) | |
| return |
| const pending = yield* pollForLength(() => question.list(), 1) | ||
| expect(pending[0].sessionID).toBe(rootSessionID) | ||
| expect(handler.stats.autoRejectedQuestions).toBe(0) | ||
|
|
There was a problem hiding this comment.
This test can pass even if a late bus callback auto-rejects after the initial stats assertion (e.g., unsubscribe teardown is async). To make it reliably assert the intended lifecycle behavior, add a short wait and re-check that the question is still pending and autoRejectedQuestions is still 0 before manually rejecting.
| // Give any late async bus callbacks a chance to run, then verify the | |
| // question is still pending and no auto-reject occurred. | |
| yield* Effect.sleep("50 millis") | |
| const stillPending = yield* question.list() | |
| expect(stillPending).toHaveLength(1) | |
| expect(stillPending[0].id).toBe(pending[0].id) | |
| expect(stillPending[0].sessionID).toBe(rootSessionID) | |
| expect(handler.stats.autoRejectedQuestions).toBe(0) |
4d0ea5f to
e184720
Compare
There was a problem hiding this comment.
Pull request overview
Copilot reviewed 2 out of 2 changed files in this pull request and generated 1 comment.
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
| // bus unsubscription path and the `closed` flag in fork() that interrupts | ||
| // any handler reaching it after teardown has begun. |
There was a problem hiding this comment.
The test comment says the closed flag in fork() “interrupts” late handlers, but the implementation actually skips starting the handler when closed is true (no interrupt occurs in that branch). Consider rewording to avoid misleading future readers (e.g., “prevents late callbacks from forking new handler fibers after teardown begins”).
| // bus unsubscription path and the `closed` flag in fork() that interrupts | |
| // any handler reaching it after teardown has begun. | |
| // bus unsubscription path and the `closed` flag in fork() that prevents | |
| // late callbacks from forking new handler fibers after teardown has begun. |
…lbacks Bus.subscribeCallback wraps the callback's return value in Effect.tryPromise(() => Promise.resolve().then(() => callback(msg))) inside a runForEach loop, so a Promise-returning callback (such as Effect.runPromise) serializes handler completion per subscription — each descendant question/permission event waits for the previous handler's reject/reply round trip before dispatch advances. Replace Effect.runPromise with Effect.runFork. runFork returns a Fiber synchronously (non-thenable), so the bus's tryPromise resolves immediately and the next event dispatches without waiting. Handler defects no longer surface through Bus.on's tryPromise.catch, so wrap the forked effect with Effect.tapCause + log.error. Drops the per-event Promise wrapper allocation and unblocks concurrent dispatch for long-running subagent loops with many simultaneous descendants. Addresses audit finding F7 (Opus diamond review, 2026-04-22).
e184720 to
7779646
Compare
There was a problem hiding this comment.
Pull request overview
Copilot reviewed 2 out of 2 changed files in this pull request and generated no new comments.
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
|
F7 review complete after 7 Copilot rounds. All comments resolved. Merging local/audit-f7-runevents-runtime into local/integration-v2 via --no-ff. |
PR: #10 (closed, review-only) Diamond: codex-5.3 + Opus both APPROVED Copilot: 7 rounds, R7 clean (no new comments)
Summary
Addresses audit finding F7 (Opus diamond review, 2026-04-22) on the opencode subagent-hang hardening branch.
packages/opencode/src/cli/cmd/run-events.tswas invokingEffect.runPromise(Effect.gen(...))inside each bus subscriber callback.Bus.subscribeCallback(src/bus/index.ts:126-152) wraps the callback's return value inEffect.tryPromise(() => Promise.resolve().then(() => callback(msg)))insiderunForEach, so aPromise-returning callback serializes handler completion per subscription — each descendant question/permission event waits for the previous handler's reject/reply round-trip before dispatch advances.Change
Effect.runPromise→Effect.runForkin bothQuestion.Event.AskedandPermission.Event.Askedsubscribers.runForkreturns a Fiber synchronously (non-thenable), so the bus'stryPromiseresolves immediately and the next event dispatches without waiting.Effect.tapCause((cause) => log.error("handler failed", { cause }))because handler defects no longer surface throughBus.on'stryPromise.catch.forkhelper to deduplicate the two near-identical handler shapes.Diamond review
Both
codex-5.3(spec/correctness) andgeneral(Opus, quality/design) reviewers were run.Effect.tapCausedefect logging added. Opus also caught a 130-char line-length violation (repo limit 120) which was wrapped via amend.Verification
bun typecheckclean (frompackages/opencode/).bun test test/cli/run-events.test.ts→ 12 pass, 0 fail.This is review-only — will be closed and
--no-ffmerged intolocal/integration-v2after Copilot iteration.