-
Notifications
You must be signed in to change notification settings - Fork 0
perf(run-events): unblock bus dispatch with runFork in subscriber callbacks (F7) #10
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -1,4 +1,4 @@ | ||
| import { Effect, Option } from "effect" | ||
| import { Cause, Effect, Fiber, Option } from "effect" | ||
| import { Bus } from "@/bus" | ||
| import { Permission } from "@/permission" | ||
| import { Question } from "@/question" | ||
|
|
@@ -86,8 +86,44 @@ export const make = Effect.fn("RunEvents.make")(function* (config: Config) { | |
| } | ||
| } | ||
|
|
||
| // bus.subscribeCallback wraps the callback in an Effect.tryPromise-based | ||
| // subscription handler, so a Promise-returning callback (like Effect.runPromise) | ||
| // serializes handler completion per subscription. runFork returns a Fiber | ||
| // synchronously (non-thenable), unblocking dispatch so descendant question/ | ||
| // permission events are processed concurrently — important for long-running | ||
| // subagent loops with many simultaneous descendants. Defects inside the forked | ||
| // fiber do not surface through that subscription callback wrapper, so log them | ||
| // here instead. Track in-flight fibers so unsubscribe() can interrupt them and | ||
| // bound handler work to the RunEvents lifecycle. | ||
| const inflight = new Set<Fiber.Fiber<void>>() | ||
| let closed = false | ||
| const fork = (effect: Effect.Effect<void>) => { | ||
| if (closed) { | ||
| // unsubscribe() already ran but bus subscription teardown is async, so | ||
| // a late callback can still reach fork(). Skip starting the handler | ||
| // entirely so no side effects (bump, reject, reply) leak past teardown. | ||
| // Returning undefined (not a Promise) still unblocks the bus dispatch | ||
| // wrapper without spawning a no-op fiber. | ||
| return | ||
| } | ||
| const fiber = Effect.runFork( | ||
| effect.pipe( | ||
| Effect.tapCause((cause) => | ||
| Cause.hasInterruptsOnly(cause) | ||
| ? Effect.void | ||
| : Effect.sync(() => log.error("handler failed", { cause })), | ||
| ), | ||
| ), | ||
| ) | ||
| inflight.add(fiber) | ||
| // Register cleanup outside the forked effect to avoid a TDZ/race between | ||
| // synchronous fiber completion and inflight.add — Fiber.await observes | ||
| // completion regardless of how fast the fiber runs. | ||
| Effect.runFork(Fiber.await(fiber).pipe(Effect.ensuring(Effect.sync(() => inflight.delete(fiber))))) | ||
| } | ||
|
|
||
| const unsubQuestion = yield* bus.subscribeCallback(Question.Event.Asked, (evt) => | ||
| Effect.runPromise( | ||
| fork( | ||
| Effect.gen(function* () { | ||
| const mine = yield* isDescendant(evt.properties.sessionID) | ||
| if (!mine) return | ||
|
|
@@ -98,7 +134,7 @@ export const make = Effect.fn("RunEvents.make")(function* (config: Config) { | |
| ) | ||
|
|
||
| const unsubPermission = yield* bus.subscribeCallback(Permission.Event.Asked, (evt) => | ||
| Effect.runPromise( | ||
| fork( | ||
| Effect.gen(function* () { | ||
| const mine = yield* isDescendant(evt.properties.sessionID) | ||
| if (!mine) return | ||
|
|
@@ -113,8 +149,13 @@ export const make = Effect.fn("RunEvents.make")(function* (config: Config) { | |
| ) | ||
|
|
||
| const unsubscribe = () => { | ||
| closed = true | ||
| unsubQuestion() | ||
| unsubPermission() | ||
| inflight.forEach((fiber) => Effect.runFork(Fiber.interrupt(fiber))) | ||
| // Don't clear() — let the per-fiber Fiber.await observers remove entries | ||
| // as their interrupts settle, so any stragglers caught by the closed-flag | ||
| // branch above still get cleaned up correctly. | ||
| } | ||
|
Comment on lines
89
to
159
|
||
|
|
||
| return { stats, unsubscribe } satisfies Handle | ||
|
|
||
| Original file line number | Diff line number | Diff line change | ||||||||||||||||||||
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
|
|
@@ -523,4 +523,50 @@ describe("cli/run-events", () => { | |||||||||||||||||||||
| }), | ||||||||||||||||||||||
| ), | ||||||||||||||||||||||
| ) | ||||||||||||||||||||||
|
|
||||||||||||||||||||||
| // Lifecycle test for F7 fiber-tracking: after unsubscribe(), late-arriving | ||||||||||||||||||||||
| // bus callbacks must not produce new auto-rejects. This exercises both the | ||||||||||||||||||||||
| // bus unsubscription path and the `closed` flag in fork() that prevents | ||||||||||||||||||||||
| // late callbacks from forking new handler fibers after teardown has begun. | ||||||||||||||||||||||
| it.live("does not auto-reject question.asked after unsubscribe()", () => | ||||||||||||||||||||||
| provideTmpdirInstance(() => | ||||||||||||||||||||||
| Effect.gen(function* () { | ||||||||||||||||||||||
| const question = yield* Question.Service | ||||||||||||||||||||||
| const rootSessionID = SessionID.make("ses_root_post_unsub_000000000000") | ||||||||||||||||||||||
| const handler = yield* RunEvents.make({ | ||||||||||||||||||||||
| rootSessionID, | ||||||||||||||||||||||
| skipPermissions: false, | ||||||||||||||||||||||
| jsonMode: false, | ||||||||||||||||||||||
| }) | ||||||||||||||||||||||
| yield* Effect.sync(() => handler.unsubscribe()) | ||||||||||||||||||||||
|
|
||||||||||||||||||||||
| // Ask after unsubscribe — without the bus subscription, no auto-reject | ||||||||||||||||||||||
| // handler runs and the question stays pending. | ||||||||||||||||||||||
| const fiber = yield* question | ||||||||||||||||||||||
| .ask({ | ||||||||||||||||||||||
| sessionID: rootSessionID, | ||||||||||||||||||||||
| questions: [{ question: "post?", header: "h", options: [{ label: "x", description: "x" }] }], | ||||||||||||||||||||||
| }) | ||||||||||||||||||||||
| .pipe(Effect.forkScoped) | ||||||||||||||||||||||
|
|
||||||||||||||||||||||
| const pending = yield* pollForLength(() => question.list(), 1) | ||||||||||||||||||||||
| expect(pending[0].sessionID).toBe(rootSessionID) | ||||||||||||||||||||||
| expect(handler.stats.autoRejectedQuestions).toBe(0) | ||||||||||||||||||||||
|
|
||||||||||||||||||||||
|
||||||||||||||||||||||
| // Give any late async bus callbacks a chance to run, then verify the | |
| // question is still pending and no auto-reject occurred. | |
| yield* Effect.sleep("50 millis") | |
| const stillPending = yield* question.list() | |
| expect(stillPending).toHaveLength(1) | |
| expect(stillPending[0].id).toBe(pending[0].id) | |
| expect(stillPending[0].sessionID).toBe(rootSessionID) | |
| expect(handler.stats.autoRejectedQuestions).toBe(0) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Effect.tapCausewill also run when the forked fiber is interrupted (e.g. viaunsubscribe()), so normal shutdown will be logged ashandler failed. Consider filtering out interrupt-only causes (e.g.Cause.hasInterruptsOnly(cause)) so only real handler failures/defects are logged.