From 06bd1d48e66b376ebfeb302446a61d75bc7b84cb Mon Sep 17 00:00:00 2001 From: Peter Wielander Date: Tue, 26 May 2026 18:08:29 +0200 Subject: [PATCH 01/10] feat(core,world-vercel): fence event writes against a stale snapshot MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The elapsed-wait scan now snapshots the loaded events' tail eventId and passes it as `lastKnownEventId` on each `wait_completed` write, so a concurrent `resumeHook` that has already advanced the canonical log is detected — the server's CAS rejects the write, we surface it as the existing `EntityConflictError`, and the next iteration re-replays against the fresh event list (mirroring the duplicate-wait fall-through that was already there). `resumeHook` sends `asOfTimestamp` (Date.now() at call time) so the server resolves the fence to the highest eventId strictly before resume time — no client-side event pre-read needed. Plumbed through `CreateEventParams` on `@workflow/world` so future worlds can forward as-is. Co-Authored-By: Claude Opus 4.7 (1M context) --- .changeset/event-write-occ-fence.md | 13 ++++++++++ packages/core/src/runtime.ts | 33 +++++++++++++++++++++--- packages/core/src/runtime/resume-hook.ts | 10 +++++-- packages/world-vercel/src/events.ts | 6 +++++ packages/world/src/events.ts | 14 ++++++++++ 5 files changed, 70 insertions(+), 6 deletions(-) create mode 100644 .changeset/event-write-occ-fence.md diff --git a/.changeset/event-write-occ-fence.md b/.changeset/event-write-occ-fence.md new file mode 100644 index 0000000000..ab4b7decc1 --- /dev/null +++ b/.changeset/event-write-occ-fence.md @@ -0,0 +1,13 @@ +--- +"@workflow/core": patch +"@workflow/world": patch +"@workflow/world-vercel": patch +--- + +Add an optimistic-concurrency fence to event writes that talk to workflow-server. 
+ +- The elapsed-wait scan now passes `lastKnownEventId` snapshotted from the loaded events when committing `wait_completed`, so a stale-snapshot tick can't slip a sleep-branch event past a freshly-committed `hook_received`. +- `resumeHook` sends `asOfTimestamp` with the new `hook_received` event so the server-side fence is anchored at the resume call's wall-clock without paying for a client-side event pre-read. +- The `CreateEventParams` shape on `@workflow/world` grows two optional fields (`lastKnownEventId`, `asOfTimestamp`) that worlds may forward as-is. + +Conflict surfaces as the existing `EntityConflictError`, which the runtime already reloads-and-continues on. diff --git a/packages/core/src/runtime.ts b/packages/core/src/runtime.ts index 1aa36e32ad..f7a4e2c304 100644 --- a/packages/core/src/runtime.ts +++ b/packages/core/src/runtime.ts @@ -796,18 +796,43 @@ export function workflowEntrypoint( }, })); + // Snapshot the loaded events' tail eventId as the OCC + // fence. If a concurrent writer (e.g. `resumeHook`) + // committed something between our load and these + // writes, the server's CAS will reject and we'll + // surface that as `EntityConflictError` — same handling + // as a duplicate wait completion (skip + continue, + // we'll re-replay the next iteration with fresh events). + let fenceEventId: string | undefined = + events.length > 0 + ? events[events.length - 1].eventId + : undefined; for (const waitEvent of waitsToComplete) { try { - await world.events.create(runId, waitEvent, { - requestId, - }); + const result = await world.events.create( + runId, + waitEvent, + { + requestId, + ...(fenceEventId + ? { lastKnownEventId: fenceEventId } + : {}), + } + ); + // Advance the local fence so the next wait_completed + // (or subsequent write) chains off the just-committed + // event, not the snapshot tail. 
+ if (result.event) { + fenceEventId = result.event.eventId; + } } catch (err) { if (EntityConflictError.is(err)) { runtimeLogger.info( - 'Wait already completed, skipping', + 'Wait already completed or fence conflict, skipping', { workflowRunId: runId, correlationId: waitEvent.correlationId, + fenceEventId, } ); continue; diff --git a/packages/core/src/runtime/resume-hook.ts b/packages/core/src/runtime/resume-hook.ts index 0787d4d4e5..85381ec6a8 100644 --- a/packages/core/src/runtime/resume-hook.ts +++ b/packages/core/src/runtime/resume-hook.ts @@ -156,7 +156,13 @@ export async function resumeHook( }) ); - // Create a hook_received event with the payload + // Create a hook_received event with the payload. `asOfTimestamp` + // anchors the OCC fence: the server resolves it to the highest + // eventId strictly before `now` and uses that as the expected + // previous fence, so this `hook_received` is guaranteed to land + // after anything the caller could have observed at resume time — + // no client-side event pre-load required. + const asOfTimestamp = Date.now(); await world.events.create( hook.runId, { @@ -168,7 +174,7 @@ export async function resumeHook( payload: dehydratedPayload, }, }, - { v1Compat } + { v1Compat, asOfTimestamp } ); span?.setAttributes({ diff --git a/packages/world-vercel/src/events.ts b/packages/world-vercel/src/events.ts index 433cedcc7c..a896e04330 100644 --- a/packages/world-vercel/src/events.ts +++ b/packages/world-vercel/src/events.ts @@ -457,6 +457,12 @@ async function createWorkflowRunEventInner( ...data, remoteRefBehavior, ...(params?.requestId ? { vercelId: params.requestId } : {}), + ...(params?.lastKnownEventId + ? { lastKnownEventId: params.lastKnownEventId } + : {}), + ...(params?.asOfTimestamp !== undefined + ? 
{ asOfTimestamp: params.asOfTimestamp } + : {}), }, config, schema: EventResultResolveWireSchema, diff --git a/packages/world/src/events.ts b/packages/world/src/events.ts index a541ac0563..3a33e2bfc0 100644 --- a/packages/world/src/events.ts +++ b/packages/world/src/events.ts @@ -391,6 +391,20 @@ export interface CreateEventParams { resolveData?: ResolveData; /** Request ID (x-vercel-id when on Vercel) for correlating request logs with workflow events. */ requestId?: string; + /** + * OCC fence: when set, the event write is rejected with a conflict + * unless the run's materialized `lastKnownEventId` equals this value. + * Lets the runtime stop a stale-snapshot tick from advancing the log. + */ + lastKnownEventId?: string; + /** + * OCC fence (alternative form): unix-ms cutoff. Server resolves to the + * highest eventId strictly before this timestamp and uses that as the + * expected fence. Lets `resumeHook` fence `hook_received` after anything + * the caller could have observed without paying for a separate read. + * Ignored when `lastKnownEventId` is also set. + */ + asOfTimestamp?: number; } /** From e65e9b06c9bbbe7d900086572852cb623313d695 Mon Sep 17 00:00:00 2001 From: Peter Wielander Date: Tue, 26 May 2026 18:20:49 +0200 Subject: [PATCH 02/10] resume-hook: drop asOfTimestamp fence (let hook_received always append) --- packages/core/src/runtime/resume-hook.ts | 15 +++++++-------- 1 file changed, 7 insertions(+), 8 deletions(-) diff --git a/packages/core/src/runtime/resume-hook.ts b/packages/core/src/runtime/resume-hook.ts index 85381ec6a8..c89aec4621 100644 --- a/packages/core/src/runtime/resume-hook.ts +++ b/packages/core/src/runtime/resume-hook.ts @@ -156,13 +156,12 @@ export async function resumeHook( }) ); - // Create a hook_received event with the payload. 
`asOfTimestamp` - // anchors the OCC fence: the server resolves it to the highest - // eventId strictly before `now` and uses that as the expected - // previous fence, so this `hook_received` is guaranteed to land - // after anything the caller could have observed at resume time — - // no client-side event pre-load required. - const asOfTimestamp = Date.now(); + // Append `hook_received` unconditionally — ULID ordering already + // places this write after anything committed before us. We do + // NOT send `lastKnownEventId` here: a fence would only ever + // reject the hook in favor of an unrelated concurrent write, + // which would lose the user's hook signal. Stale-snapshot + // protection lives on the *tick* writes that consume hooks. await world.events.create( hook.runId, { @@ -174,7 +173,7 @@ export async function resumeHook( payload: dehydratedPayload, }, }, - { v1Compat, asOfTimestamp } + { v1Compat } ); span?.setAttributes({ From db3cf6292c3a8f8eee75030cf706d9ceab33894b Mon Sep 17 00:00:00 2001 From: Peter Wielander Date: Tue, 26 May 2026 20:36:44 +0200 Subject: [PATCH 03/10] runtime: retry in-place on fence conflict (avoid thunder-herd) --- packages/core/src/runtime.ts | 117 +++++++++++++++++++++++++---------- 1 file changed, 86 insertions(+), 31 deletions(-) diff --git a/packages/core/src/runtime.ts b/packages/core/src/runtime.ts index f7a4e2c304..90d87688f6 100644 --- a/packages/core/src/runtime.ts +++ b/packages/core/src/runtime.ts @@ -798,46 +798,101 @@ export function workflowEntrypoint( // Snapshot the loaded events' tail eventId as the OCC // fence. If a concurrent writer (e.g. `resumeHook`) - // committed something between our load and these - // writes, the server's CAS will reject and we'll - // surface that as `EntityConflictError` — same handling - // as a duplicate wait completion (skip + continue, - // we'll re-replay the next iteration with fresh events). 
+ // committed something between our load and this write, + // the server's CAS rejects and we retry *in-place* + // with a freshly-loaded fence rather than throwing + // the whole tick away. Falling back to queue + // redelivery thunder-herds — every redelivery spawns + // another concurrent tick which fences-conflicts + // again, and workflows stall in `running`. let fenceEventId: string | undefined = events.length > 0 ? events[events.length - 1].eventId : undefined; + const MAX_FENCE_RETRIES = 5; for (const waitEvent of waitsToComplete) { - try { - const result = await world.events.create( - runId, - waitEvent, - { - requestId, - ...(fenceEventId - ? { lastKnownEventId: fenceEventId } - : {}), - } - ); - // Advance the local fence so the next wait_completed - // (or subsequent write) chains off the just-committed - // event, not the snapshot tail. - if (result.event) { - fenceEventId = result.event.eventId; - } - } catch (err) { - if (EntityConflictError.is(err)) { - runtimeLogger.info( - 'Wait already completed or fence conflict, skipping', + let attempts = 0; + let written = false; + while (!written) { + try { + const result = await world.events.create( + runId, + waitEvent, { - workflowRunId: runId, - correlationId: waitEvent.correlationId, - fenceEventId, + requestId, + ...(fenceEventId + ? 
{ lastKnownEventId: fenceEventId } + : {}), } ); - continue; + if (result.event) { + fenceEventId = result.event.eventId; + } + written = true; + } catch (err) { + if (!EntityConflictError.is(err)) { + throw err; + } + const isFenceConflict = /fence conflict/i.test( + err.message + ); + const isDuplicateWait = /workflow wait/i.test( + err.message + ); + if (isDuplicateWait) { + runtimeLogger.info( + 'Wait already completed, skipping', + { + workflowRunId: runId, + correlationId: waitEvent.correlationId, + } + ); + break; + } + if (!isFenceConflict) { + throw err; + } + attempts += 1; + if (attempts > MAX_FENCE_RETRIES) { + runtimeLogger.warn( + 'Wait completion gave up after fence retries; falling back to queue redelivery', + { + workflowRunId: runId, + correlationId: waitEvent.correlationId, + attempts, + } + ); + throw err; + } + const loaded = eventsCursor + ? await loadWorkflowRunEvents(runId, eventsCursor) + : await loadWorkflowRunEvents(runId); + if (eventsCursor) { + for (const e of loaded.events) { + if ( + !events.some((x) => x.eventId === e.eventId) + ) { + events.push(e); + } + } + eventsCursor = loaded.cursor ?? 
eventsCursor; + } else { + events = loaded.events; + eventsCursor = loaded.cursor; + } + const alreadyCompleted = events.some( + (e) => + e.eventType === 'wait_completed' && + e.correlationId === waitEvent.correlationId + ); + if (alreadyCompleted) { + break; + } + fenceEventId = events[events.length - 1]?.eventId; + await new Promise((r) => + setTimeout(r, 25 * attempts) + ); } - throw err; } } From 1fca755fadce9e855a7858b4719ad5f460b9bbc2 Mon Sep 17 00:00:00 2001 From: Peter Wielander Date: Tue, 26 May 2026 21:25:46 +0200 Subject: [PATCH 04/10] runtime: treat all non-fence wait conflicts as "already completed" MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The earlier revision filtered duplicate-wait conflicts by a workflow-server-specific error message ("Workflow wait ..."), which meant world-local's "Wait \"...\" already completed" (and any other world's duplicate-wait error shape) fell through and bubbled the EntityConflictError out of the elapsed-wait scan. abortHookOrdering e2e suites started failing as a result. Invert the filter: only the fence-conflict message (a workflow-server- only error) drives the retry path. Anything else is the pre-OCC "skip and continue" shape — matches the original behavior across all world implementations. Co-Authored-By: Claude Opus 4.7 (1M context) --- packages/core/src/runtime.ts | 17 ++++++++++------- 1 file changed, 10 insertions(+), 7 deletions(-) diff --git a/packages/core/src/runtime.ts b/packages/core/src/runtime.ts index 90d87688f6..69f0d8db14 100644 --- a/packages/core/src/runtime.ts +++ b/packages/core/src/runtime.ts @@ -833,13 +833,19 @@ export function workflowEntrypoint( if (!EntityConflictError.is(err)) { throw err; } + // Fence conflicts surface a specific error + // message from workflow-server. 
Anything + // else (workflow-server "Workflow wait …", + // world-local 'Wait "…" already completed', + // and any other world's duplicate-wait + // shape) is the existing + // wait-already-completed conflict — skip + // and continue, matching pre-OCC behavior + // across worlds. const isFenceConflict = /fence conflict/i.test( err.message ); - const isDuplicateWait = /workflow wait/i.test( - err.message - ); - if (isDuplicateWait) { + if (!isFenceConflict) { runtimeLogger.info( 'Wait already completed, skipping', { @@ -849,9 +855,6 @@ export function workflowEntrypoint( ); break; } - if (!isFenceConflict) { - throw err; - } attempts += 1; if (attempts > MAX_FENCE_RETRIES) { runtimeLogger.warn( From 1e69c82b3bbf433a56f731c1673506d55a93306d Mon Sep 17 00:00:00 2001 From: Peter Wielander Date: Wed, 27 May 2026 09:59:56 +0200 Subject: [PATCH 05/10] world-vercel: forward fence params in the lazy event-create branch The lazy-refs branch of createWorkflowRunEventInner forgot to thread `lastKnownEventId` and `asOfTimestamp` into the request body, so the fence was silently dropped for any event whose type went through the lazy path (i.e., not in `eventsNeedingResolve`). The resolve branch already had the forwarding. Caught by Vercel Agent Review. Co-Authored-By: Claude Opus 4.7 (1M context) --- packages/world-vercel/src/events.ts | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/packages/world-vercel/src/events.ts b/packages/world-vercel/src/events.ts index a896e04330..48a417f7e1 100644 --- a/packages/world-vercel/src/events.ts +++ b/packages/world-vercel/src/events.ts @@ -488,6 +488,12 @@ async function createWorkflowRunEventInner( ...data, remoteRefBehavior, ...(params?.requestId ? { vercelId: params.requestId } : {}), + ...(params?.lastKnownEventId + ? { lastKnownEventId: params.lastKnownEventId } + : {}), + ...(params?.asOfTimestamp !== undefined + ? 
{ asOfTimestamp: params.asOfTimestamp } + : {}), }, config, schema: EventResultLazyWireSchema, From ec7cad1bd2b0910160a7af4ccb815e3929a28c76 Mon Sep 17 00:00:00 2001 From: Peter Wielander Date: Wed, 27 May 2026 11:53:40 +0200 Subject: [PATCH 06/10] Apply suggestions from code review Co-authored-by: Peter Wielander Signed-off-by: Peter Wielander --- .changeset/event-write-occ-fence.md | 8 +------ packages/core/src/runtime.ts | 29 ++++++++++-------------- packages/core/src/runtime/resume-hook.ts | 11 ++++----- packages/world/src/events.ts | 17 +++++++------- 4 files changed, 27 insertions(+), 38 deletions(-) diff --git a/.changeset/event-write-occ-fence.md b/.changeset/event-write-occ-fence.md index ab4b7decc1..df5ca57ec9 100644 --- a/.changeset/event-write-occ-fence.md +++ b/.changeset/event-write-occ-fence.md @@ -4,10 +4,4 @@ "@workflow/world-vercel": patch --- -Add an optimistic-concurrency fence to event writes that talk to workflow-server. - -- The elapsed-wait scan now passes `lastKnownEventId` snapshotted from the loaded events when committing `wait_completed`, so a stale-snapshot tick can't slip a sleep-branch event past a freshly-committed `hook_received`. -- `resumeHook` sends `asOfTimestamp` with the new `hook_received` event so the server-side fence is anchored at the resume call's wall-clock without paying for a client-side event pre-read. -- The `CreateEventParams` shape on `@workflow/world` grows two optional fields (`lastKnownEventId`, `asOfTimestamp`) that worlds may forward as-is. - -Conflict surfaces as the existing `EntityConflictError`, which the runtime already reloads-and-continues on. +Add optional `lastKnownEventId` and `asOfTimestamp` params to `events.create`, which the World can use to do optimistic concurrency control fencing. Conflict surfaces as existing `EntityConflictError`, which the runtime already reloads-and-continues on. 
diff --git a/packages/core/src/runtime.ts b/packages/core/src/runtime.ts index 69f0d8db14..e204eb9e56 100644 --- a/packages/core/src/runtime.ts +++ b/packages/core/src/runtime.ts @@ -796,15 +796,12 @@ export function workflowEntrypoint( }, })); - // Snapshot the loaded events' tail eventId as the OCC - // fence. If a concurrent writer (e.g. `resumeHook`) - // committed something between our load and this write, - // the server's CAS rejects and we retry *in-place* - // with a freshly-loaded fence rather than throwing - // the whole tick away. Falling back to queue - // redelivery thunder-herds — every redelivery spawns - // another concurrent tick which fences-conflicts - // again, and workflows stall in `running`. + // The last known event ID is used for optimistic concurrency + // control, being provided to the World, which will reject if any + // other events arrive between our event log read and event write. + // On fail, we retry *in-place* with a freshly-loaded fence rather than + // terminating the invocation. Falling back to queue redelivery directly + // could cause retry-storms under high load. let fenceEventId: string | undefined = events.length > 0 ? events[events.length - 1].eventId @@ -834,14 +831,12 @@ export function workflowEntrypoint( throw err; } // Fence conflicts surface a specific error - // message from workflow-server. Anything - // else (workflow-server "Workflow wait …", - // world-local 'Wait "…" already completed', - // and any other world's duplicate-wait - // shape) is the existing - // wait-already-completed conflict — skip - // and continue, matching pre-OCC behavior - // across worlds. + // message from workflow-server. + // Most 409s will simply exit since we assume a separate + // invocation is active. This should hold true for fence conflicts + // too, but to guarantee correctness, will be re-tried here directly. + // TODO: We can remove the retry here after extensive validation. + // The cost is low in the meantime. 
const isFenceConflict = /fence conflict/i.test( err.message ); diff --git a/packages/core/src/runtime/resume-hook.ts b/packages/core/src/runtime/resume-hook.ts index c89aec4621..c6722f822a 100644 --- a/packages/core/src/runtime/resume-hook.ts +++ b/packages/core/src/runtime/resume-hook.ts @@ -156,12 +156,11 @@ export async function resumeHook( }) ); - // Append `hook_received` unconditionally — ULID ordering already - // places this write after anything committed before us. We do - // NOT send `lastKnownEventId` here: a fence would only ever - // reject the hook in favor of an unrelated concurrent write, - // which would lose the user's hook signal. Stale-snapshot - // protection lives on the *tick* writes that consume hooks. + // Create a hook_received event with the payload + // + // From a concurrency control perspective, this is done unconditionally. + // Any other event creations or invocations use the `lastKnownEventId` + // fence to ensure hook_received being added won't cause ordering issues. await world.events.create( hook.runId, { diff --git a/packages/world/src/events.ts b/packages/world/src/events.ts index 3a33e2bfc0..0d385175c7 100644 --- a/packages/world/src/events.ts +++ b/packages/world/src/events.ts @@ -392,17 +392,18 @@ export interface CreateEventParams { /** Request ID (x-vercel-id when on Vercel) for correlating request logs with workflow events. */ requestId?: string; /** - * OCC fence: when set, the event write is rejected with a conflict - * unless the run's materialized `lastKnownEventId` equals this value. - * Lets the runtime stop a stale-snapshot tick from advancing the log. + * Optimistic concurrency control fence: when set, the event write is + * rejected with a conflict unless the run's materialized `lastKnownEventId` + * equals this value. Lets the runtime stop an invocation operating on a stale + * event log snapshot from advancing the log. */ lastKnownEventId?: string; /** - * OCC fence (alternative form): unix-ms cutoff. 
Server resolves to the - * highest eventId strictly before this timestamp and uses that as the - * expected fence. Lets `resumeHook` fence `hook_received` after anything - * the caller could have observed without paying for a separate read. - * Ignored when `lastKnownEventId` is also set. + * Optimistic concurrency control fence (alternative form), see above for reasoning. + * This is a unix-ms cutoff. Server resolves to the highest eventId strictly before this + * timestamp and uses that as the expected fence. Lets `resumeHook` fence + * `hook_received` after anything the caller could have observed without paying + * for a separate read. Ignored when `lastKnownEventId` is also set. */ asOfTimestamp?: number; } From b5c567c91addaec3bb70ccf58fdd7193bae499cc Mon Sep 17 00:00:00 2001 From: Nathan Rajlich Date: Thu, 28 May 2026 12:15:46 -0700 Subject: [PATCH 07/10] [core] Extend OCC fence to all branch-decision writes (#2132) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * [DEBUG] Trace replay event log and step/hook/sleep assignments Temporary diagnostic instrumentation for investigating intermittent CorruptedEventLogError 'step consumer mismatch' failures. Emits console.log lines tagged 'WF_TRACE' at four points: - runWorkflow start: dumps the full event array the replay will consume (eventIds, types, correlationIds, stepNames) plus a sha256 digest - step/hook/sleep subscribe: per-replay correlationId -> name assignment - step consumer mismatch: structured record of the failure including the event index in the SDK's view of the log - runWorkflow end: completed | failed | suspended Used to diff successive replays of the same runId and confirm whether the SDK actually sees the same event array each time. * [DEBUG] Extend OCC fence to all branch-decision writes Peter's PR #2113 fences `wait_completed` writes from the elapsed-wait scan. 
This commit extends the fence to every other write whose outcome depends on a branch decision the workflow VM made from its loaded event log — per the table @VaguelySerious himself laid out in his PR comment: suspension-handler.ts: - step_created (the smoking gun on wrun_01KSPS7XEGHF4A6WYF4DB03D40) - hook_created - hook_disposed - wait_created runtime.ts terminal writes: - run_completed - run_failed `hook_received` is deliberately NOT fenced (Peter's reasoning preserved verbatim: fencing the user's signal would drop it on contention; stale-snapshot protection belongs on the writes that consume hooks, not the ones that deliver them). The fence value is the load-time tail of the events array passed into `runWorkflow`. `suspension-handler` receives the fence + cursor from the runtime and reloads on conflict; the runtime's terminal writes read the cursor directly. The new `__fenced-write.ts` helper encapsulates the retry loop so we don't have to copy/paste Peter's pattern six times. It's named with the leading-underscore convention to flag it as throwaway diagnostic code, matching `__debug-replay-trace.ts`. * [DEBUG] Point at Peter's workflow-server PR 447 preview + map HTTP 412 Two changes both needed for the extended-fence test loop to actually exercise the OCC code path on the server: 1. Hardcode WORKFLOW_SERVER_URL_OVERRIDE to https://workflow-server-83nn57dvc.vercel.sh (preview deployment of workflow-server PR 447, branch alias workflow-server-git-peter-event-write-cas.vercel.sh). The previous preview at workflow-server-7pxaxn4d4.vercel.sh was Pranay's monotonic-append PR 456 — different fix, doesn't implement the CAS the SDK side now sends. 2. Map HTTP 412 → EntityConflictError in the world-vercel error mapper. 
workflow-server PR 447 returns 412 with a 'fence conflict' message for EventLogFenceConflictError; the SDK's existing fence-retry loops (Peter's wait_completed scan + the new ones in suspension-handler and runtime terminal writes) match on /fence conflict/i against the message of an EntityConflictError. Without this mapping the 412 falls through to WorkflowWorldError and the regex match never fires. * fix(core): chain fences for replay-created events * test: clear workflow server debug override * fix(core): scope step dispatch to owners * fix(core): recover wait-raced step dispatch * Remove diagnostic instrumentation, rename fenced-write helper Strip the WF_TRACE replay tracing that was used to diagnose the CORRUPTED_EVENT_LOG race — it's served its purpose now that the fix is in. Specifically: - Delete packages/core/src/__debug-replay-trace.ts and its 8 call sites in workflow.ts, step.ts, workflow/hook.ts, workflow/sleep.ts. - Drop the matching [DEBUG] inline narrative comments at each call site. - Rename packages/core/src/runtime/__fenced-write.ts → fenced-write.ts (the leading-underscore convention marked it as throwaway diagnostic code; the helper is intended to stay). - Trim the file header on fenced-write.ts and the related narrative comment in suspension-handler.ts to drop the failing-runId / PR-number references that only made sense in the debug context. No behavioral change. typecheck clean (0 errors); 1014/1014 unit tests pass (same as parent commit 77f057a75). * world-vercel: always preserve fence-conflict marker on HTTP 412 Address Copilot review on PR 2132 (https://github.com/vercel/workflow/pull/2132#discussion_r3319911635). The fence-retry loop in runtime/fenced-write.ts detects OCC conflicts via /fence conflict/i.test(err.message). 
The 412 branch was relying on the server's JSON body to populate that message via errorData.message, but parseResponseBody().catch(() => ({})) swallows JSON parse failures silently — so any non-JSON 412 response (CDN HTML, gateway timeout page, intermediate proxy error) would surface as EntityConflictError(" /endpoint -> HTTP 412: Precondition Failed"), the regex would miss it, and the retry loop would mis-classify the conflict as terminal. Prefix the message with `fence conflict:` whenever the parsed body didn't already carry the marker, so the retry detection is robust to response-body parse failures. Tests: world-vercel 69/69 pass. --------- Co-authored-by: Peter Wielander --- .changeset/chilly-fences-chain.md | 5 + .changeset/quick-local-queues.md | 5 + .../docs/v4/changelog/eager-processing.mdx | 4 +- .../docs/v5/changelog/eager-processing.mdx | 4 +- packages/core/src/runtime.ts | 329 +++++++++-- packages/core/src/runtime/fenced-write.ts | 181 ++++++ .../src/runtime/suspension-handler.test.ts | 143 +++++ .../core/src/runtime/suspension-handler.ts | 546 +++++++++++------- packages/core/src/step.ts | 1 + packages/core/src/workflow/hook.ts | 1 + packages/world-local/src/queue.test.ts | 31 + packages/world-local/src/queue.ts | 22 + .../src/inline-batches-debug.mts | 5 +- packages/world-vercel/src/utils.ts | 14 + 14 files changed, 1043 insertions(+), 248 deletions(-) create mode 100644 .changeset/chilly-fences-chain.md create mode 100644 .changeset/quick-local-queues.md create mode 100644 packages/core/src/runtime/fenced-write.ts create mode 100644 packages/core/src/runtime/suspension-handler.test.ts diff --git a/.changeset/chilly-fences-chain.md b/.changeset/chilly-fences-chain.md new file mode 100644 index 0000000000..7e58facf55 --- /dev/null +++ b/.changeset/chilly-fences-chain.md @@ -0,0 +1,5 @@ +--- +"@workflow/core": patch +--- + +Chain replay-created events and owner-scope inline-capable step dispatch to avoid stale or duplicate replay writes. 
diff --git a/.changeset/quick-local-queues.md b/.changeset/quick-local-queues.md new file mode 100644 index 0000000000..b019a5800c --- /dev/null +++ b/.changeset/quick-local-queues.md @@ -0,0 +1,5 @@ +--- +"@workflow/world-local": patch +--- + +Preserve completed local queue idempotency keys to avoid duplicate step dispatches. diff --git a/docs/content/docs/v4/changelog/eager-processing.mdx b/docs/content/docs/v4/changelog/eager-processing.mdx index 9be4e0cb1b..3ab42e0340 100644 --- a/docs/content/docs/v4/changelog/eager-processing.mdx +++ b/docs/content/docs/v4/changelog/eager-processing.mdx @@ -115,9 +115,9 @@ The design enforces a simple invariant: **exactly one handler owns each step, an 1. **Atomic `step_created`** --- the world's `events.create('step_created', correlationId=X)` is serialized per-correlationId. Exactly one concurrent caller succeeds; the rest receive `EntityConflictError`. In production worlds (Postgres, Vercel) this is enforced at the SQL/DB layer. In `world-local`, a per-step in-process async mutex in `packages/world-local/src/storage/events-storage.ts` wraps every step lifecycle event's check-and-write so the same guarantee holds for dev. 2. **Suspension handler reports ownership** --- `handleSuspension()` returns `createdStepCorrelationIds: Set`, populated only for `step_created` writes that actually succeeded (not those that caught 409). 3. **Inline execution is gated on ownership** --- the runtime loop in `packages/core/src/runtime.ts` picks its inline step from `pendingSteps.filter(s => createdStepCorrelationIds.has(s.correlationId))`. A handler that didn't win any `step_created` race performs no inline execution. -4. **Queueing is unconditional** --- for every pending step except the one being inline-executed, the handler enqueues a background step message with `idempotencyKey: correlationId`. 
This matches V1's enqueue pattern and is what makes crash recovery work: if a prior handler wrote `step_created` but crashed before enqueueing, a later handler (from flow-message redelivery or `reenqueueActiveRuns`) will enqueue the orphaned step. Concurrent handlers' redundant enqueues dedupe on the idempotency key. +4. **Queueing is owner-scoped when inline execution is possible** --- for every owned pending step except the one being inline-executed, the handler enqueues a background step message with `idempotencyKey: correlationId`. A non-owner does not queue steps it merely observed from a stale replay, so it cannot race the owner's inline execution. When a wait is also pending, no handler executes steps inline, so every pending step is queueable and queue idempotency handles duplicates. If the workflow queue message itself is redelivered, the handler also recovers pre-existing `step_created` / `step_retrying` entries that have not reached `step_started`, covering the crash window after `step_created` but before queue dispatch. -Together these give: every `step_created` event has exactly one inline executor (possibly zero if the inline path was skipped due to crash) **and** at least one queued dispatch (from whichever handler first reaches the suspension path after the `step_created` is visible). Step bodies are never executed concurrently, and `step_started` events never land in the log after `step_completed` for the same step. Event-log replay sees clean subscriber-matched sequences. With this invariant in place, the earlier `onUnconsumedEvent` skip logic for step/hook/wait lifecycle events was removed — any unconsumed event now immediately fatals as a corrupted event log (its original purpose before the V2 work). +Together these give: every `step_created` event has exactly one inline executor (possibly zero if the inline path was skipped due to crash) **and** every non-inline owned step gets a queued dispatch from the owner. 
Step bodies are never executed concurrently, and `step_started` events never land in the log after `step_completed` for the same step. Event-log replay sees clean subscriber-matched sequences. With this invariant in place, the earlier `onUnconsumedEvent` skip logic for step/hook/wait lifecycle events was removed — any unconsumed event now immediately fatals as a corrupted event log (its original purpose before the V2 work). **Retry semantics are preserved**: the per-step mutex in `world-local` only rejects `step_started` when the step is already in a *terminal* state (`completed` / `failed`). A step that is currently running (status=`running`) still accepts a second `step_started` write with an incremented attempt counter — this is how queue redelivery after a SIGKILL mid-execution legitimately re-runs the step. The previously-documented "attempt counter inflation" failure mode is therefore no longer reachable via the concurrent-inline path; see "Concurrent `step_started` Inflating Attempt Counter" below for the complementary executor-side guard that still catches edge cases (e.g., postgres retries under high contention). diff --git a/docs/content/docs/v5/changelog/eager-processing.mdx b/docs/content/docs/v5/changelog/eager-processing.mdx index 141e656c81..2e80fe7d31 100644 --- a/docs/content/docs/v5/changelog/eager-processing.mdx +++ b/docs/content/docs/v5/changelog/eager-processing.mdx @@ -115,9 +115,9 @@ The design enforces a simple invariant: **exactly one handler owns each step, an 1. **Atomic `step_created`** --- the world's `events.create('step_created', correlationId=X)` is serialized per-correlationId. Exactly one concurrent caller succeeds; the rest receive `EntityConflictError`. In production worlds (Postgres, Vercel) this is enforced at the SQL/DB layer. In `world-local`, a per-step in-process async mutex in `packages/world-local/src/storage/events-storage.ts` wraps every step lifecycle event's check-and-write so the same guarantee holds for dev. 2. 
**Suspension handler reports ownership** --- `handleSuspension()` returns `createdStepCorrelationIds: Set`, populated only for `step_created` writes that actually succeeded (not those that caught 409). 3. **Inline execution is gated on ownership** --- the runtime loop in `packages/core/src/runtime.ts` picks its inline step from `pendingSteps.filter(s => createdStepCorrelationIds.has(s.correlationId))`. A handler that didn't win any `step_created` race performs no inline execution. -4. **Queueing is unconditional** --- for every pending step except the one being inline-executed, the handler enqueues a background step message with `idempotencyKey: correlationId`. This matches V1's enqueue pattern and is what makes crash recovery work: if a prior handler wrote `step_created` but crashed before enqueueing, a later handler (from flow-message redelivery or `reenqueueActiveRuns`) will enqueue the orphaned step. Concurrent handlers' redundant enqueues dedupe on the idempotency key. +4. **Queueing is owner-scoped when inline execution is possible** --- for every owned pending step except the one being inline-executed, the handler enqueues a background step message with `idempotencyKey: correlationId`. A non-owner does not queue steps it merely observed from a stale replay, so it cannot race the owner's inline execution. When a wait is also pending, no handler executes steps inline, so every pending step is queueable and queue idempotency handles duplicates. If the workflow queue message itself is redelivered, the handler also recovers pre-existing `step_created` / `step_retrying` entries that have not reached `step_started`, covering the crash window after `step_created` but before queue dispatch. -Together these give: every `step_created` event has exactly one inline executor (possibly zero if the inline path was skipped due to crash) **and** at least one queued dispatch (from whichever handler first reaches the suspension path after the `step_created` is visible). 
Step bodies are never executed concurrently, and `step_started` events never land in the log after `step_completed` for the same step. Event-log replay sees clean subscriber-matched sequences. With this invariant in place, the earlier `onUnconsumedEvent` skip logic for step/hook/wait lifecycle events was removed — any unconsumed event now immediately fatals as a corrupted event log (its original purpose before the V2 work). +Together these give: every `step_created` event has exactly one inline executor (possibly zero if the inline path was skipped due to crash) **and** every non-inline owned step gets a queued dispatch from the owner. Step bodies are never executed concurrently, and `step_started` events never land in the log after `step_completed` for the same step. Event-log replay sees clean subscriber-matched sequences. With this invariant in place, the earlier `onUnconsumedEvent` skip logic for step/hook/wait lifecycle events was removed — any unconsumed event now immediately fatals as a corrupted event log (its original purpose before the V2 work). **Retry semantics are preserved**: the per-step mutex in `world-local` only rejects `step_started` when the step is already in a *terminal* state (`completed` / `failed`). A step that is currently running (status=`running`) still accepts a second `step_started` write with an incremented attempt counter — this is how queue redelivery after a SIGKILL mid-execution legitimately re-runs the step. The previously-documented "attempt counter inflation" failure mode is therefore no longer reachable via the concurrent-inline path; see "Concurrent `step_started` Inflating Attempt Counter" below for the complementary executor-side guard that still catches edge cases (e.g., postgres retries under high contention). 
diff --git a/packages/core/src/runtime.ts b/packages/core/src/runtime.ts index e204eb9e56..4fe5bf85e6 100644 --- a/packages/core/src/runtime.ts +++ b/packages/core/src/runtime.ts @@ -19,6 +19,7 @@ import { classifyRunError, isWorldContractError } from './classify-error.js'; import { describeError } from './describe-error.js'; import { WorkflowSuspension } from './global.js'; import { runtimeLogger } from './logger.js'; +import { fencedEventCreate } from './runtime/fenced-write.js'; import { MAX_QUEUE_DELIVERIES } from './runtime/constants.js'; import { getQueueOverhead, @@ -964,22 +965,111 @@ export function workflowEntrypoint( replayMs: Date.now() - replayStart, }); - // Workflow completed + // Workflow completed. + // + // Fence the `run_completed` write against the + // load-time tail of `events`: if a concurrent + // replay with a fresher view has already written + // its own terminal event (run_completed / run_failed), + // CAS rejects this write and we treat the run as + // already-terminal (same shape as the existing + // EntityConflictError / RunExpiredError branches). try { - await world.events.create( + let runCompletedFence = + events.length > 0 + ? events[events.length - 1].eventId + : undefined; + try { + const terminalFenceRefresh = eventsCursor + ? await loadWorkflowRunEvents(runId, eventsCursor) + : await loadWorkflowRunEvents(runId); + if (terminalFenceRefresh.events.length > 0) { + const existingIds = new Set( + events.map((e) => e.eventId) + ); + for (const event of terminalFenceRefresh.events) { + if (!existingIds.has(event.eventId)) { + events.push(event); + } + } + runCompletedFence = + terminalFenceRefresh.events[ + terminalFenceRefresh.events.length - 1 + ].eventId; + } + eventsCursor = + terminalFenceRefresh.cursor ?? 
eventsCursor; + cachedEvents = events; + } catch (refreshErr) { + runtimeLogger.debug( + 'Failed to refresh terminal fence before completing workflow run', + { + workflowRunId: runId, + message: + refreshErr instanceof Error + ? refreshErr.message + : String(refreshErr), + } + ); + } + const terminalEventAfterReplay = events.find( + (e) => + e.eventType === 'run_completed' || + e.eventType === 'run_failed' || + e.eventType === 'run_cancelled' + ); + if (terminalEventAfterReplay) { + runtimeLogger.debug( + 'Run completed by concurrent handler after replay, exiting', + { + workflowRunId: runId, + eventType: terminalEventAfterReplay.eventType, + } + ); + return; + } + const writeResult = await fencedEventCreate({ + world, runId, - { + event: { eventType: 'run_completed', specVersion: SPEC_VERSION_CURRENT, eventData: { output: result }, }, - { requestId } - ); + requestId, + fenceEventId: runCompletedFence, + onConflictRefresh: async () => { + // Terminal-state idempotency: if any + // terminal run event landed since our load, + // abort. Don't retry — once a run is in a + // terminal state, our write is wrong. + const loaded = eventsCursor + ? await loadWorkflowRunEvents(runId, eventsCursor) + : await loadWorkflowRunEvents(runId); + eventsCursor = loaded.cursor ?? eventsCursor; + const reachedTerminal = loaded.events.some( + (e) => + e.eventType === 'run_completed' || + e.eventType === 'run_failed' || + e.eventType === 'run_cancelled' + ); + if (reachedTerminal) return { kind: 'abort' }; + const fresh = + loaded.events[loaded.events.length - 1] + ?.eventId ?? 
runCompletedFence; + return { kind: 'retry', fenceEventId: fresh }; + }, + onEntityConflict: () => 'abort', + }); + if (!writeResult.written) { + runtimeLogger.info( + 'Tried completing workflow run, but run has already finished.', + { workflowRunId: runId } + ); + return; + } } catch (err) { - if ( - EntityConflictError.is(err) || - RunExpiredError.is(err) - ) { + if (RunExpiredError.is(err)) { runtimeLogger.info( 'Tried completing workflow run, but run has already finished.', { workflowRunId: runId, message: err.message } @@ -1013,14 +1103,31 @@ export function workflowEntrypoint( runtimeLogger.debug(suspensionMessage); } - // V2: handle suspension without queuing steps + // V2: handle suspension without queuing steps. + // + // Pass the load-time tail eventId + cursor so the + // handler can fence its branch-decision writes + // (step_created, hook_created, hook_disposed, + // wait_created) against this snapshot of the log. + // This extends OCC fencing to the other writes whose + // outcome depends on a branch decision the workflow VM + // made from the loaded log. const suspensionStart = Date.now(); + // `cachedEvents` mirrors `events` (assigned right + // before runWorkflow above) and is visible in the + // outer catch scope where `events` is not. + const suspensionFenceEventId = + cachedEvents && cachedEvents.length > 0 + ? cachedEvents[cachedEvents.length - 1].eventId + : undefined; const suspensionResult = await handleSuspension({ suspension: err, world, run: workflowRun, span, requestId, + fenceEventId: suspensionFenceEventId, + eventsCursor, }); runtimeLogger.debug('Suspension handled', { workflowRunId: runId, @@ -1071,10 +1178,10 @@ export function workflowEntrypoint( // the step, replay still picks the step because // wait_completed is only created on the *next* loop // iteration, which doesn't run until the step - // finishes. 
Queueing every step in this case lets - // the wait timeout drive a continuation in parallel, - // matching V1's behavior where each step ran in a - // separate function invocation. + // finishes. Queueing every owned step in this case + // lets the wait timeout drive a continuation in + // parallel, matching V1's behavior where each step + // ran in a separate function invocation. const inlineStep: | (typeof pendingSteps)[number] | undefined = @@ -1082,19 +1189,73 @@ export function workflowEntrypoint( ? ownedPendingSteps[0] : undefined; - // Queue every pending step except the one we're - // executing inline. This mirrors V1's unconditional - // enqueue-with-idempotency pattern and is what makes - // crash recovery work: if a prior handler wrote - // step_created events but crashed before enqueuing, - // a later handler (e.g., from flow-message - // redelivery or reenqueueActiveRuns) will enqueue - // the orphaned steps. In the happy path with a - // single owner, concurrent handlers' queue attempts - // dedupe on correlationId. Skipping the inline step - // avoids a queue handler racing against our own - // inline executor. - for (const step of pendingSteps) { + // Queue only steps this handler owns when inline + // execution is possible. A non-owner may have a + // stale replay view where another handler just + // created a step and is about to execute it inline; + // queueing that step here would bypass the ownership + // invariant and can run the step body twice. + // + // When a wait is pending, no handler executes a step + // inline: all step work is queue-driven so the wait + // timeout can race it. In that branch it is safe, and + // important for crash recovery, to enqueue every + // pending step and let queue idempotency dedupe. + // + // Recovery exception: if this workflow queue message + // is itself being redelivered, pick up pre-existing + // step_created / step_retrying events from the loaded + // log that never reached step_started. 
This covers the + // crash window after a handler writes step_created but + // before it queues the step. + const recoverablePendingStepCorrelationIds = + new Set(); + if (metadata.attempt > 1 && cachedEvents) { + const latestStepEvents = new Map(); + for (const event of cachedEvents) { + if ( + event.correlationId && + (event.eventType === 'step_created' || + event.eventType === 'step_retrying' || + event.eventType === 'step_started' || + event.eventType === 'step_completed' || + event.eventType === 'step_failed') + ) { + latestStepEvents.set( + event.correlationId, + event.eventType + ); + } + } + for (const [ + correlationId, + eventType, + ] of latestStepEvents) { + if ( + eventType === 'step_created' || + eventType === 'step_retrying' + ) { + recoverablePendingStepCorrelationIds.add( + correlationId + ); + } + } + } + + const queueablePendingSteps = + suspensionResult.timeoutSeconds !== undefined + ? pendingSteps + : pendingSteps.filter( + (step) => + suspensionResult.createdStepCorrelationIds.has( + step.correlationId + ) || + recoverablePendingStepCorrelationIds.has( + step.correlationId + ) + ); + + for (const step of queueablePendingSteps) { if ( inlineStep && step.correlationId === inlineStep.correlationId @@ -1119,7 +1280,8 @@ export function workflowEntrypoint( } // Nothing to execute inline — we already queued all - // pending steps above, exit and let the queue drive. + // steps this handler can safely dispatch, exit and + // let the queue drive. if (!inlineStep) { if (suspensionResult.timeoutSeconds !== undefined) { return { @@ -1278,10 +1440,79 @@ export function workflowEntrypoint( // Serialize the original thrown value so its full // type identity and custom properties round-trip // through the event log. + // + // Fenced for the same reason as run_completed: + // don't let a stale-view replay paper over a + // concurrent terminal write. 
try { - await world.events.create( + // `cachedEvents` mirrors the local `events` + // (which lives in the try-block above); see + // the suspension catch for the same trick. + let runFailedEvents = cachedEvents ?? []; + let runFailedFence = + runFailedEvents.length > 0 + ? runFailedEvents[runFailedEvents.length - 1] + .eventId + : undefined; + try { + const terminalFenceRefresh = eventsCursor + ? await loadWorkflowRunEvents(runId, eventsCursor) + : await loadWorkflowRunEvents(runId); + if (eventsCursor) { + if (terminalFenceRefresh.events.length > 0) { + const existingIds = new Set( + runFailedEvents.map((e) => e.eventId) + ); + for (const event of terminalFenceRefresh.events) { + if (!existingIds.has(event.eventId)) { + runFailedEvents.push(event); + } + } + } + } else { + runFailedEvents = terminalFenceRefresh.events; + } + if (terminalFenceRefresh.events.length > 0) { + runFailedFence = + terminalFenceRefresh.events[ + terminalFenceRefresh.events.length - 1 + ].eventId; + } + eventsCursor = + terminalFenceRefresh.cursor ?? eventsCursor; + cachedEvents = runFailedEvents; + } catch (refreshErr) { + runtimeLogger.debug( + 'Failed to refresh terminal fence before failing workflow run', + { + workflowRunId: runId, + message: + refreshErr instanceof Error + ? 
refreshErr.message + : String(refreshErr), + } + ); + } + const terminalEventAfterReplay = runFailedEvents.find( + (e) => + e.eventType === 'run_completed' || + e.eventType === 'run_failed' || + e.eventType === 'run_cancelled' + ); + if (terminalEventAfterReplay) { + runtimeLogger.debug( + 'Run completed by concurrent handler after replay, exiting', + { + workflowRunId: runId, + eventType: terminalEventAfterReplay.eventType, + } + ); + return; + } + const writeResult = await fencedEventCreate({ + world, runId, - { + event: { eventType: 'run_failed', specVersion: SPEC_VERSION_CURRENT, eventData: { @@ -1293,13 +1524,39 @@ export function workflowEntrypoint( errorCode, }, }, - { requestId } - ); + requestId, + fenceEventId: runFailedFence, + onConflictRefresh: async () => { + const loaded = eventsCursor + ? await loadWorkflowRunEvents( + runId, + eventsCursor + ) + : await loadWorkflowRunEvents(runId); + eventsCursor = loaded.cursor ?? eventsCursor; + const reachedTerminal = loaded.events.some( + (e) => + e.eventType === 'run_completed' || + e.eventType === 'run_failed' || + e.eventType === 'run_cancelled' + ); + if (reachedTerminal) return { kind: 'abort' }; + const fresh = + loaded.events[loaded.events.length - 1] + ?.eventId ?? 
runFailedFence; + return { kind: 'retry', fenceEventId: fresh }; + }, + onEntityConflict: () => 'abort', + }); + if (!writeResult.written) { + runtimeLogger.info( + 'Tried failing workflow run, but run has already finished.', + { workflowRunId: runId } + ); + return; + } } catch (failErr) { - if ( - EntityConflictError.is(failErr) || - RunExpiredError.is(failErr) - ) { + if (RunExpiredError.is(failErr)) { runtimeLogger.info( 'Tried failing workflow run, but run has already finished.', { diff --git a/packages/core/src/runtime/fenced-write.ts b/packages/core/src/runtime/fenced-write.ts new file mode 100644 index 0000000000..91d1a308c1 --- /dev/null +++ b/packages/core/src/runtime/fenced-write.ts @@ -0,0 +1,181 @@ +/** + * Helper for "branch-decision" event writes that need OCC fencing. + * + * Applies the same fence-and-retry pattern the elapsed-wait scan uses for + * `wait_completed` to every other write whose outcome depends on a branch + * decision the workflow VM made from its loaded event log: + * + * suspension-handler.ts: + * - step_created + * - hook_created + * - hook_disposed + * - wait_created + * + * runtime.ts terminal writes: + * - run_completed + * - run_failed + * + * `hook_received` is deliberately NOT fenced: fencing the user's signal + * would drop it on contention; stale-snapshot protection belongs on the + * writes that consume hooks, not the writes that deliver them. + * + * Each fenced write is retried up to MAX_FENCE_RETRIES times after the + * initial attempt. On a fence + * conflict, the caller is expected to refresh `fenceEventId` from a fresh + * event log read. The `onConflictRefresh` callback gives the caller a + * chance to do that, and to decide whether to give up (e.g. when an + * idempotency check confirms the write is no longer needed). 
+ */ +import { EntityConflictError } from '@workflow/errors'; +import type { CreateEventRequest, World } from '@workflow/world'; +import { runtimeLogger } from '../logger.js'; + +const MAX_FENCE_RETRIES = 5; + +/** + * Returns true when an EntityConflictError carries the fence-conflict + * shape. Anything else with a 409/410/etc shape is some other kind of + * conflict (entity already exists, run already terminal, hook token taken) + * that the caller's existing handlers want to keep dealing with. + * + * TODO: switch to a typed error class once the wire format exposes one. + */ +function isFenceConflict(err: unknown): boolean { + return ( + EntityConflictError.is(err) && + typeof err.message === 'string' && + /fence conflict/i.test(err.message) + ); +} + +export interface FencedWriteParams { + world: World; + runId: string; + event: CreateEventRequest; + requestId?: string; + /** + * Caller-provided fence value. Pass `undefined` for the first attempt + * if no fence has been observed yet (e.g. on a fresh resume); the + * helper will then issue an unfenced write — which still atomically + * advances `run.lastKnownEventId` on the server side so future fenced + * writers see the materialized value. + * + * After a successful fenced write, the caller should update its + * tracked fence to the returned `newFenceEventId`. + */ + fenceEventId: string | undefined; + /** + * Called when the server rejects the write with a fence conflict. + * Implementations should: + * 1. Reload events from the cursor. + * 2. Check whether the write is still necessary (e.g. wait_completed + * idempotency — if the event already exists in the reloaded log, + * return `{ kind: 'abort' }` so we don't retry pointlessly). + * 3. Return `{ kind: 'retry', fenceEventId: newTailEventId }` to retry + * against the new tail, or `{ kind: 'abort' }` to give up. + * + * Receives the attempt number (1-indexed) so backoff can be tuned by + * the caller if it wants. 
+ */ + onConflictRefresh: ( + attempt: number + ) => Promise< + { kind: 'retry'; fenceEventId: string | undefined } | { kind: 'abort' } + >; + /** + * Called when the server rejects with a *non-fence* EntityConflictError + * (e.g. the entity already exists because a concurrent handler beat us + * to it). Returning `'abort'` is the typical answer — the existing + * handlers in suspension-handler / runtime already log + skip. Returning + * `'rethrow'` re-throws so the caller can deal with it. + */ + onEntityConflict: (err: EntityConflictError) => 'abort' | 'rethrow'; +} + +export interface FencedWriteResult { + /** Whether the event was actually written (false = aborted via dedup). */ + written: boolean; + /** + * eventId of the newly-written event (when `written` is true), so + * the caller can advance its tracked fence value. + */ + newFenceEventId?: string; + /** + * The server's response event, when `written` is true. Allows callers + * to read fields like the resolved eventType (e.g. `hook_conflict` + * instead of `hook_created` when the server detected a token clash). + */ + event?: { eventType: string; eventId: string }; +} + +/** + * Issues `world.events.create(runId, event, { requestId, lastKnownEventId })` + * with up to MAX_FENCE_RETRIES retries on fence conflict. + * + * On any non-fence EntityConflictError, defers to `onEntityConflict` for + * the abort-vs-rethrow decision (preserves the existing + * "EntityConflictError → log and skip" behavior for callers that want it). + */ +export async function fencedEventCreate( + params: FencedWriteParams +): Promise { + const { + world, + runId, + event, + requestId, + onConflictRefresh, + onEntityConflict, + } = params; + let fenceEventId = params.fenceEventId; + let attempt = 0; + // biome-ignore lint/correctness/noConstantCondition: bounded by MAX_FENCE_RETRIES + while (true) { + try { + const result = await world.events.create(runId, event, { + requestId, + ...(fenceEventId ? 
{ lastKnownEventId: fenceEventId } : {}), + }); + return { + written: true, + newFenceEventId: result.event?.eventId ?? fenceEventId, + event: result.event + ? { + eventType: result.event.eventType, + eventId: result.event.eventId, + } + : undefined, + }; + } catch (err) { + if (isFenceConflict(err)) { + attempt += 1; + if (attempt > MAX_FENCE_RETRIES) { + runtimeLogger.warn( + 'Branch-decision write gave up after fence retries', + { + workflowRunId: runId, + eventType: event.eventType, + correlationId: event.correlationId, + attempts: attempt, + } + ); + throw err; + } + const decision = await onConflictRefresh(attempt); + if (decision.kind === 'abort') { + return { written: false }; + } + fenceEventId = decision.fenceEventId; + await new Promise((r) => setTimeout(r, 25 * attempt)); + continue; + } + if (EntityConflictError.is(err)) { + const decision = onEntityConflict(err); + if (decision === 'abort') { + return { written: false }; + } + throw err; + } + throw err; + } + } +} diff --git a/packages/core/src/runtime/suspension-handler.test.ts b/packages/core/src/runtime/suspension-handler.test.ts new file mode 100644 index 0000000000..7f78e16747 --- /dev/null +++ b/packages/core/src/runtime/suspension-handler.test.ts @@ -0,0 +1,143 @@ +import { EntityConflictError } from '@workflow/errors'; +import type { + CreateEventParams, + CreateEventRequest, + Event, + WorkflowRun, + World, +} from '@workflow/world'; +import { describe, expect, it, vi } from 'vitest'; +import { WorkflowSuspension } from '../global.js'; +import { handleSuspension } from './suspension-handler.js'; + +describe('handleSuspension', () => { + it('chains the local event-log fence across writes from the same suspension', async () => { + const runId = 'wrun_test_fence_chain'; + let serverTip = 'evnt_000'; + let sequence = 0; + const writes: Array<{ + eventType: string; + correlationId?: string; + lastKnownEventId?: string; + }> = []; + + const world = { + getEncryptionKeyForRun: vi.fn(async () => 
undefined), + events: { + create: vi.fn( + async ( + _runId: string, + event: CreateEventRequest, + params?: CreateEventParams + ) => { + if ( + params?.lastKnownEventId !== undefined && + params.lastKnownEventId !== serverTip + ) { + throw new EntityConflictError( + `fence conflict: expected ${params.lastKnownEventId}, current ${serverTip}` + ); + } + + const eventId = `evnt_${String(++sequence).padStart(3, '0')}`; + writes.push({ + eventType: event.eventType, + correlationId: event.correlationId, + lastKnownEventId: params?.lastKnownEventId, + }); + serverTip = eventId; + + return { + event: { + ...event, + eventId, + createdAt: new Date(), + } as Event, + }; + } + ), + }, + streams: { + write: vi.fn(async () => undefined), + close: vi.fn(async () => undefined), + }, + } as unknown as World; + + const suspension = new WorkflowSuspension( + new Map([ + [ + 'hook-user', + { + type: 'hook', + correlationId: 'hook_user', + token: 'user-token', + }, + ], + [ + 'hook-abort', + { + type: 'hook', + correlationId: 'hook_abort', + token: 'abrt_abort', + isSystem: true, + abortRequested: true, + abortReason: 'cancelled', + }, + ], + [ + 'step', + { + type: 'step', + correlationId: 'step_1', + stepName: 'do work', + args: [], + }, + ], + [ + 'wait', + { + type: 'wait', + correlationId: 'wait_1', + resumeAt: new Date(Date.now() + 10_000), + }, + ], + ]), + globalThis + ); + + await handleSuspension({ + suspension, + world, + run: { runId } as WorkflowRun, + fenceEventId: 'evnt_000', + }); + + expect(writes).toEqual([ + { + eventType: 'hook_created', + correlationId: 'hook_user', + lastKnownEventId: 'evnt_000', + }, + { + eventType: 'hook_created', + correlationId: 'hook_abort', + lastKnownEventId: 'evnt_001', + }, + { + eventType: 'hook_received', + correlationId: 'hook_abort', + lastKnownEventId: undefined, + }, + { + eventType: 'step_created', + correlationId: 'step_1', + lastKnownEventId: 'evnt_003', + }, + { + eventType: 'wait_created', + correlationId: 'wait_1', + 
lastKnownEventId: 'evnt_004', + }, + ]); + }); +}); diff --git a/packages/core/src/runtime/suspension-handler.ts b/packages/core/src/runtime/suspension-handler.ts index 8f084ed446..982de0e743 100644 --- a/packages/core/src/runtime/suspension-handler.ts +++ b/packages/core/src/runtime/suspension-handler.ts @@ -1,5 +1,4 @@ import type { Span } from '@opentelemetry/api'; -import { waitUntil } from '@vercel/functions'; import { EntityConflictError, HookNotFoundError, @@ -7,6 +6,7 @@ import { } from '@workflow/errors'; import { type CreateEventRequest, + type Event, type SerializedData, SPEC_VERSION_CURRENT, type WorkflowRun, @@ -23,6 +23,8 @@ import { runtimeLogger } from '../logger.js'; import { dehydrateStepArguments } from '../serialization.js'; import * as Attribute from '../telemetry/semantic-conventions.js'; import { getAbortStreamIdFromToken } from '../util.js'; +import { fencedEventCreate } from './fenced-write.js'; +import { loadWorkflowRunEvents } from './helpers.js'; export interface SuspensionHandlerParams { suspension: WorkflowSuspension; @@ -30,6 +32,25 @@ export interface SuspensionHandlerParams { run: WorkflowRun; span?: Span; requestId?: string; + /** + * Caller's most recent view of the event log (tail eventId), used as + * the OCC fence on every branch-decision write below. Pass `undefined` + * when the caller has no events loaded yet; the writes will then be + * unfenced (still atomically advance `run.lastKnownEventId` on the + * server side so future fenced writers chain off the new value). + * + * Conceptually identical to the elapsed-wait scan fence. See + * `fenced-write.ts` for the rationale for extending it to + * step/wait/hook `_created` and `hook_disposed`. + */ + fenceEventId?: string; + /** + * Caller's events-cursor token (from `loadWorkflowRunEvents`). Used to + * refresh the fence after a CAS conflict — the handler pulls fresh + * events from this cursor, takes the new tail as the next fence, and + * retries. 
+ */ + eventsCursor?: string | null; } /** @@ -61,7 +82,8 @@ export interface SuspensionHandlerResult { * * Processing order: * 1. Hooks are processed first to prevent race conditions with webhook receivers - * 2. Step events and wait events are created in parallel + * 2. Writes that advance the event-log fence are issued in DB-order from this + * replay turn, so the next local write chains off the previous event ID. */ export async function handleSuspension({ suspension, @@ -69,8 +91,40 @@ export async function handleSuspension({ run, span, requestId, + fenceEventId: initialFenceEventId, + eventsCursor: initialEventsCursor, }: SuspensionHandlerParams): Promise { const runId = run.runId; + + // Per-suspension shared fence state. Each successful fenced write advances + // `fenceEventId` so subsequent writes from this handler chain off it. + // Branch-decision event writes must not race each other locally: the server + // fence is a single-tip CAS, so parallel sibling writes from one replay turn + // would force self-conflicts against the same starting fence. + let fenceEventId = initialFenceEventId; + let eventsCursor = initialEventsCursor; + + /** + * Reloads events from the cursor and returns the new tail as a fresh + * fence. Also returns the freshly-loaded events so callers can run + * idempotency checks (e.g. "is this `wait_created` already in the log? + * → abort the write"). + * + * NOTE: when `eventsCursor` is unset the reload is a full re-read of + * the run's log, matching the elapsed-wait scan fallback. + */ + async function refreshFence(): Promise<{ + fenceEventId: string | undefined; + loadedEvents: Event[]; + }> { + const loaded = eventsCursor + ? await loadWorkflowRunEvents(runId, eventsCursor) + : await loadWorkflowRunEvents(runId); + eventsCursor = loaded.cursor ?? eventsCursor; + const tail = + loaded.events[loaded.events.length - 1]?.eventId ?? 
fenceEventId; + return { fenceEventId: tail, loadedEvents: loaded.events }; + } // Separate queue items by type const stepItems = suspension.steps.filter( (item): item is StepInvocationQueueItem => item.type === 'step' @@ -119,89 +173,126 @@ export async function handleSuspension({ ); // Process hooks first to prevent race conditions with webhook receivers. - // All hook creations run in parallel. // Track any hook conflicts that occur — these are returned to the caller // so the V2 handler can re-invoke immediately. let hasHookConflict = false; - if (hookEvents.length > 0) { - await Promise.all( - hookEvents.map(async (hookEvent) => { - try { - const result = await world.events.create(runId, hookEvent, { - requestId, - }); - // Check if the world returned a hook_conflict event instead of hook_created. - // The hook_conflict event is stored in the event log and will be replayed - // on the next workflow invocation, causing the hook's promise to reject. - // Note: hook events always create an event (legacy runs throw, not return undefined) - if (result.event!.eventType === 'hook_conflict') { - hasHookConflict = true; + for (const hookEvent of hookEvents) { + try { + const writeResult = await fencedEventCreate({ + world, + runId, + event: hookEvent, + requestId, + fenceEventId, + onConflictRefresh: async () => { + const { fenceEventId: fresh, loadedEvents } = await refreshFence(); + // Idempotency: if the hook was already created (by us in a + // previous attempt that 412'd then succeeded server-side, + // or by a concurrent handler), don't retry. 
+ const alreadyCreated = loadedEvents.some( + (e) => + (e.eventType === 'hook_created' || + e.eventType === 'hook_conflict') && + e.correlationId === hookEvent.correlationId + ); + if (alreadyCreated) { + return { kind: 'abort' }; } - } catch (err) { - if (EntityConflictError.is(err) || RunExpiredError.is(err)) { - runtimeLogger.info( - 'Workflow run already completed, skipping hook', - { - workflowRunId: runId, - message: err.message, - } - ); - } else { - throw err; + return { kind: 'retry', fenceEventId: fresh }; + }, + onEntityConflict: () => 'abort', + }); + if (writeResult.newFenceEventId) { + fenceEventId = writeResult.newFenceEventId; + } + if (!writeResult.written) { + // Already created concurrently — surface as info, same shape + // as the pre-fence "EntityConflictError → skip" branch did. + runtimeLogger.info( + 'Workflow run already completed or hook already created, skipping', + { + workflowRunId: runId, + correlationId: hookEvent.correlationId, } - } - }) - ); + ); + continue; + } + // Preserve the "world resolved hook_created → hook_conflict" + // short-circuit. The hook_conflict event lives in the log and + // will be replayed; the immediate signal lets the caller + // re-invoke right away rather than waiting on the queue. + if (writeResult.event?.eventType === 'hook_conflict') { + hasHookConflict = true; + } + } catch (err) { + if (RunExpiredError.is(err)) { + runtimeLogger.info('Workflow run already completed, skipping hook', { + workflowRunId: runId, + message: err.message, + }); + } else { + throw err; + } + } } // Process hook disposals — these release hook tokens for reuse by other workflows. 
- if (hooksNeedingDisposal.length > 0) { - await Promise.all( - hooksNeedingDisposal.map(async (queueItem) => { - const hookDisposedEvent: CreateEventRequest = { - eventType: 'hook_disposed' as const, - specVersion: SPEC_VERSION_CURRENT, - correlationId: queueItem.correlationId, - eventData: { - token: queueItem.token, - }, - }; - try { - await world.events.create(runId, hookDisposedEvent, { requestId }); - } catch (err) { - if (EntityConflictError.is(err)) { - // Hook was already disposed by a concurrent invocation — safe to skip - runtimeLogger.info( - 'Hook already disposed, skipping duplicate disposal', - { - workflowRunId: runId, - correlationId: queueItem.correlationId, - message: err.message, - } - ); - } else if (RunExpiredError.is(err)) { - runtimeLogger.info( - 'Workflow run already completed, skipping hook disposal', - { - workflowRunId: runId, - correlationId: queueItem.correlationId, - message: err.message, - } - ); - } else if (HookNotFoundError.is(err)) { - // Hook may have already been disposed or never created - runtimeLogger.info('Hook not found for disposal, continuing', { - workflowRunId: runId, - correlationId: queueItem.correlationId, - message: err.message, - }); - } else { - throw err; + for (const queueItem of hooksNeedingDisposal) { + const hookDisposedEvent: CreateEventRequest = { + eventType: 'hook_disposed' as const, + specVersion: SPEC_VERSION_CURRENT, + correlationId: queueItem.correlationId, + eventData: { + token: queueItem.token, + }, + }; + try { + const writeResult = await fencedEventCreate({ + world, + runId, + event: hookDisposedEvent, + requestId, + fenceEventId, + onConflictRefresh: async () => { + const { fenceEventId: fresh, loadedEvents } = await refreshFence(); + // Idempotency: if already disposed, abort. 
+ const alreadyDisposed = loadedEvents.some( + (e) => + e.eventType === 'hook_disposed' && + e.correlationId === queueItem.correlationId + ); + if (alreadyDisposed) { + return { kind: 'abort' }; } - } - }) - ); + return { kind: 'retry', fenceEventId: fresh }; + }, + onEntityConflict: () => 'abort', + }); + if (writeResult.newFenceEventId) { + fenceEventId = writeResult.newFenceEventId; + } + } catch (err) { + if (RunExpiredError.is(err)) { + runtimeLogger.info( + 'Workflow run already completed, skipping hook disposal', + { + workflowRunId: runId, + correlationId: queueItem.correlationId, + message: err.message, + } + ); + } else if (HookNotFoundError.is(err)) { + // Hook may have already been disposed or never created + runtimeLogger.info('Hook not found for disposal, continuing', { + workflowRunId: runId, + correlationId: queueItem.correlationId, + message: err.message, + }); + } else { + throw err; + } + } } // Process abort requests — resume the hook with abort payload and write stream packet @@ -209,76 +300,73 @@ export async function handleSuspension({ (item) => item.abortRequested && !item.disposed ); - if (hooksNeedingAbort.length > 0) { - await Promise.all( - hooksNeedingAbort.map(async (queueItem) => { - try { - // Dehydrate the abort payload for storage - const abortPayload = await dehydrateStepArguments( - { aborted: true, reason: queueItem.abortReason }, - runId, - encryptionKey, - suspension.globalThis - ); + for (const queueItem of hooksNeedingAbort) { + try { + // Dehydrate the abort payload for storage + const abortPayload = await dehydrateStepArguments( + { aborted: true, reason: queueItem.abortReason }, + runId, + encryptionKey, + suspension.globalThis + ); - // Create hook_received event with abort payload - await world.events.create(runId, { - eventType: 'hook_received' as const, - specVersion: SPEC_VERSION_CURRENT, - correlationId: queueItem.correlationId, - eventData: { - token: queueItem.token, - payload: abortPayload, - }, - }); + // Create 
hook_received event with abort payload. This event is + // deliberately unfenced because it represents signal delivery, but + // successful writes still advance the server's run.lastKnownEventId. + const abortEventResult = await world.events.create(runId, { + eventType: 'hook_received' as const, + specVersion: SPEC_VERSION_CURRENT, + correlationId: queueItem.correlationId, + eventData: { + token: queueItem.token, + payload: abortPayload, + }, + }); + if (abortEventResult.event?.eventId) { + fenceEventId = abortEventResult.event.eventId; + } - // Write stream cancellation packet for real-time step propagation. - // Reuse the same dehydrated payload as the hook event so the reason - // round-trips through `dehydrateStepArguments` / `hydrateStepArguments` - // (handles DOMException, custom errors, encryption, etc.) instead of - // bare JSON.stringify which loses type information and drops undefined. - // streamName is set on the queue item at controller construction time - // (see workflow/abort-controller.ts). - try { - const streamName = getAbortStreamIdFromToken(queueItem.token); - await world.streams.write( - runId, - streamName, - abortPayload as Uint8Array - ); - await world.streams.close(runId, streamName); - } catch { - // Best-effort stream write — hook event provides the durable fallback - runtimeLogger.debug( - 'Failed to write abort stream packet, hook event will provide fallback', - { - workflowRunId: runId, - correlationId: queueItem.correlationId, - } - ); - } - } catch (err) { - if (EntityConflictError.is(err) || RunExpiredError.is(err)) { - runtimeLogger.info( - 'Workflow run already completed, skipping abort', - { - workflowRunId: runId, - correlationId: queueItem.correlationId, - message: err.message, - } - ); - } else { - throw err; + // Write stream cancellation packet for real-time step propagation. 
+ // Reuse the same dehydrated payload as the hook event so the reason + // round-trips through `dehydrateStepArguments` / `hydrateStepArguments` + // (handles DOMException, custom errors, encryption, etc.) instead of + // bare JSON.stringify which loses type information and drops undefined. + // streamName is set on the queue item at controller construction time + // (see workflow/abort-controller.ts). + try { + const streamName = getAbortStreamIdFromToken(queueItem.token); + await world.streams.write( + runId, + streamName, + abortPayload as Uint8Array + ); + await world.streams.close(runId, streamName); + } catch { + // Best-effort stream write — hook event provides the durable fallback + runtimeLogger.debug( + 'Failed to write abort stream packet, hook event will provide fallback', + { + workflowRunId: runId, + correlationId: queueItem.correlationId, } - } - }) - ); + ); + } + } catch (err) { + if (EntityConflictError.is(err) || RunExpiredError.is(err)) { + runtimeLogger.info('Workflow run already completed, skipping abort', { + workflowRunId: runId, + correlationId: queueItem.correlationId, + message: err.message, + }); + } else { + throw err; + } + } } // Create step events for steps that don't have them yet. // Unlike V1, we do NOT queue step messages from here — the caller // decides which steps to execute inline vs. queue to background. - // Wait events are also created in parallel below. const stepsNeedingCreation = new Set( stepItems .filter((queueItem) => !queueItem.hasCreatedEvent) @@ -291,90 +379,134 @@ export async function handleSuspension({ // racing with concurrent handlers on step execution. const createdStepCorrelationIds = new Set(); - const ops: Promise[] = []; + const ops: Array<() => Promise> = []; - // Steps: create step_created events (no queuing — V2 returns pending steps to caller) + // Steps: create step_created events (no queuing — V2 returns pending + // steps to caller). 
+ // + // Fenced: under concurrent replay, two invocations with diverging + // event-log snapshots can pick different branch decisions for the same + // deterministic correlationId (e.g. a hook/sleep race that one replay + // saw resolve to "wake" and another to "sleep"). The fence rejects the + // stale-view writer so only the authoritative replay's step_created + // lands; the other invocation reloads and retries against the new tail. for (const queueItem of stepItems) { if (stepsNeedingCreation.has(queueItem.correlationId)) { - ops.push( - (async () => { - const dehydratedInput = await dehydrateStepArguments( + ops.push(async () => { + const dehydratedInput = await dehydrateStepArguments( + { + args: queueItem.args, + closureVars: queueItem.closureVars, + thisVal: queueItem.thisVal, + }, + runId, + encryptionKey, + suspension.globalThis + ); + const stepEvent: CreateEventRequest = { + eventType: 'step_created' as const, + specVersion: SPEC_VERSION_CURRENT, + correlationId: queueItem.correlationId, + eventData: { + stepName: queueItem.stepName, + input: dehydratedInput as SerializedData, + }, + }; + const writeResult = await fencedEventCreate({ + world, + runId, + event: stepEvent, + requestId, + fenceEventId, + onConflictRefresh: async () => { + const { fenceEventId: fresh, loadedEvents } = await refreshFence(); + // Idempotency: if step_created for this correlationId is + // already in the log, abort. Distinct from the + // EntityConflictError-on-duplicate-stepId case (that's + // handled by onEntityConflict below) — this branch + // catches the case where _our_ stale-snapshot CAS lost + // to a concurrent writer for the same correlationId. 
+ const alreadyCreated = loadedEvents.some( + (e) => + e.eventType === 'step_created' && + e.correlationId === queueItem.correlationId + ); + if (alreadyCreated) { + return { kind: 'abort' }; + } + return { kind: 'retry', fenceEventId: fresh }; + }, + onEntityConflict: () => 'abort', + }); + if (writeResult.newFenceEventId) { + fenceEventId = writeResult.newFenceEventId; + } + if (writeResult.written) { + createdStepCorrelationIds.add(queueItem.correlationId); + } else { + runtimeLogger.info( + 'Step already exists (post-fence-conflict), continuing', { - args: queueItem.args, - closureVars: queueItem.closureVars, - thisVal: queueItem.thisVal, - }, - runId, - encryptionKey, - suspension.globalThis - ); - const stepEvent: CreateEventRequest = { - eventType: 'step_created' as const, - specVersion: SPEC_VERSION_CURRENT, - correlationId: queueItem.correlationId, - eventData: { - stepName: queueItem.stepName, - input: dehydratedInput as SerializedData, - }, - }; - try { - await world.events.create(runId, stepEvent, { requestId }); - createdStepCorrelationIds.add(queueItem.correlationId); - } catch (err) { - if (EntityConflictError.is(err)) { - runtimeLogger.info('Step already exists, continuing', { - workflowRunId: runId, - correlationId: queueItem.correlationId, - message: err.message, - }); - } else { - throw err; + workflowRunId: runId, + correlationId: queueItem.correlationId, } - } - })() - ); + ); + } + }); } } - // Create wait events (same as V1) + // Create wait events. Same fencing rationale as `step_created`: a + // stale-view replay can otherwise call `sleep(...)` on a code path + // that the authoritative replay doesn't take, landing a `wait_created` + // that future replays will see as an orphan. 
for (const queueItem of waitItems) { if (!queueItem.hasCreatedEvent) { - ops.push( - (async () => { - const waitEvent: CreateEventRequest = { - eventType: 'wait_created' as const, - specVersion: SPEC_VERSION_CURRENT, - correlationId: queueItem.correlationId, - eventData: { - resumeAt: queueItem.resumeAt, - }, - }; - try { - await world.events.create(runId, waitEvent, { requestId }); - } catch (err) { - if (EntityConflictError.is(err)) { - runtimeLogger.info('Wait already exists, continuing', { - workflowRunId: runId, - correlationId: queueItem.correlationId, - message: err.message, - }); - } else { - throw err; + ops.push(async () => { + const waitEvent: CreateEventRequest = { + eventType: 'wait_created' as const, + specVersion: SPEC_VERSION_CURRENT, + correlationId: queueItem.correlationId, + eventData: { + resumeAt: queueItem.resumeAt, + }, + }; + const writeResult = await fencedEventCreate({ + world, + runId, + event: waitEvent, + requestId, + fenceEventId, + onConflictRefresh: async () => { + const { fenceEventId: fresh, loadedEvents } = await refreshFence(); + const alreadyCreated = loadedEvents.some( + (e) => + e.eventType === 'wait_created' && + e.correlationId === queueItem.correlationId + ); + if (alreadyCreated) { + return { kind: 'abort' }; } - } - })() - ); + return { kind: 'retry', fenceEventId: fresh }; + }, + onEntityConflict: () => 'abort', + }); + if (writeResult.newFenceEventId) { + fenceEventId = writeResult.newFenceEventId; + } + if (!writeResult.written) { + runtimeLogger.info('Wait already exists, continuing', { + workflowRunId: runId, + correlationId: queueItem.correlationId, + }); + } + }); } } - waitUntil( - Promise.all(ops).catch((opErr) => { - const isAbortError = - opErr?.name === 'AbortError' || opErr?.name === 'ResponseAborted'; - if (!isAbortError) throw opErr; - }) - ); - await Promise.all(ops); + for (const op of ops) { + await op(); + } // Calculate minimum timeout from waits const now = Date.now(); diff --git 
a/packages/core/src/step.ts b/packages/core/src/step.ts index c9e1c6c5d1..8d02fd89a3 100644 --- a/packages/core/src/step.ts +++ b/packages/core/src/step.ts @@ -50,6 +50,7 @@ export function createUseStep(ctx: WorkflowOrchestratorContext) { stepName, args, }); + ctx.eventsConsumer.subscribe((event) => { if (!event) { // We've reached the end of the events, so this step has either not been run or is currently running. diff --git a/packages/core/src/workflow/hook.ts b/packages/core/src/workflow/hook.ts index 1cbc7bd39d..cb6d699e43 100644 --- a/packages/core/src/workflow/hook.ts +++ b/packages/core/src/workflow/hook.ts @@ -44,6 +44,7 @@ export function createCreateHook(ctx: WorkflowOrchestratorContext) { let conflictErrorRef: HookConflictError | null = null; webhookLogger.debug('Hook consumer setup', { correlationId, token }); + ctx.eventsConsumer.subscribe((event) => { // If there are no events and there are promises waiting, // it means the hook has been awaited, but an incoming payload has not yet been received. 
diff --git a/packages/world-local/src/queue.test.ts b/packages/world-local/src/queue.test.ts index 3874f63648..cd0a6e8da2 100644 --- a/packages/world-local/src/queue.test.ts +++ b/packages/world-local/src/queue.test.ts @@ -169,6 +169,37 @@ describe('queue timeout re-enqueue', () => { expect(mockSetTimeout).not.toHaveBeenCalled(); }); + it('dedupes idempotency keys after the handler completes', async () => { + let callCount = 0; + const handler = localQueue.createQueueHandler('__wkf_step_', async () => { + callCount++; + return undefined; + }); + + localQueue.registerHandler('__wkf_step_', handler); + + const first = await localQueue.queue( + '__wkf_step_test' as any, + stepPayload, + { idempotencyKey: stepPayload.stepId } + ); + + await vi.waitFor(() => { + expect(callCount).toBe(1); + }); + + const second = await localQueue.queue( + '__wkf_step_test' as any, + stepPayload, + { idempotencyKey: stepPayload.stepId } + ); + + await new Promise((resolve) => setImmediate(resolve)); + + expect(second.messageId).toBe(first.messageId); + expect(callCount).toBe(1); + }); + it('logs actionable guidance for detached ArrayBuffer proxy failures', async () => { const consoleError = vi .spyOn(console, 'error') diff --git a/packages/world-local/src/queue.ts b/packages/world-local/src/queue.ts index 7ac0277dad..96dcf64b8e 100644 --- a/packages/world-local/src/queue.ts +++ b/packages/world-local/src/queue.ts @@ -54,6 +54,8 @@ const WORKFLOW_LOCAL_QUEUE_CONCURRENCY = parseInt(process.env.WORKFLOW_LOCAL_QUEUE_CONCURRENCY ?? '0', 10) || DEFAULT_CONCURRENCY_LIMIT; +const COMPLETED_IDEMPOTENCY_CACHE_LIMIT = 10_000; + export type DirectHandler = (req: Request) => Promise; export type LocalQueue = Queue & { @@ -123,9 +125,21 @@ export function createQueue(config: Partial): LocalQueue { * that we don't queue the same message multiple times */ const inflightMessages = new Map(); + const completedMessages = new Map(); /** Direct in-process handlers by queue prefix, bypassing HTTP when set. 
*/ const directHandlers = new Map(); + function markMessageCompleted(idempotencyKey: string, messageId: MessageId) { + completedMessages.delete(idempotencyKey); + completedMessages.set(idempotencyKey, messageId); + if (completedMessages.size > COMPLETED_IDEMPOTENCY_CACHE_LIMIT) { + const oldestKey = completedMessages.keys().next().value; + if (oldestKey) { + completedMessages.delete(oldestKey); + } + } + } + const queue: Queue['queue'] = async (queueName, message, opts) => { const cleanup = [] as (() => void)[]; @@ -134,6 +148,11 @@ export function createQueue(config: Partial): LocalQueue { if (existing) { return { messageId: existing }; } + + const completed = completedMessages.get(opts.idempotencyKey); + if (completed) { + return { messageId: completed }; + } } const body = transport.serialize(message); @@ -224,6 +243,9 @@ export function createQueue(config: Partial): LocalQueue { continue; } } catch {} + if (opts?.idempotencyKey) { + markMessageCompleted(opts.idempotencyKey, messageId); + } return; } diff --git a/packages/world-testing/src/inline-batches-debug.mts b/packages/world-testing/src/inline-batches-debug.mts index bf7a356cb3..32c72440c9 100644 --- a/packages/world-testing/src/inline-batches-debug.mts +++ b/packages/world-testing/src/inline-batches-debug.mts @@ -1,5 +1,5 @@ -import { expect, test, vi } from 'vitest'; import { hydrateWorkflowReturnValue } from '@workflow/core/serialization'; +import { expect, test, vi } from 'vitest'; import { createFetcher, startServer } from './util.mjs'; /** @@ -258,6 +258,9 @@ export function inlineBatchesDebug(world: string) { // Sanity: the run did complete. Everything else is diagnostic. expect(run.status).toBe('completed'); + expect(stepsSkippedAlreadyDone).toBe(0); + expect(stepCompletedRaces).toBe(0); + expect(unconsumedEventSkips).toBe(0); // Hard assertion: no step should have hit max retries for this workflow. 
expect(maxDeliveries).toBe(0); } diff --git a/packages/world-vercel/src/utils.ts b/packages/world-vercel/src/utils.ts index 8acefbaa69..eb1f1994b4 100644 --- a/packages/world-vercel/src/utils.ts +++ b/packages/world-vercel/src/utils.ts @@ -383,6 +383,20 @@ export async function makeRequest({ if (response.status === 409) { throwWithTrace(new EntityConflictError(defaultMessage)); } + if (response.status === 412) { + // OCC fence conflict. Surface as EntityConflictError so the + // existing conflict branching downstream applies, and ensure the + // message carries a "fence conflict" marker even when the + // response body didn't parse — the fence-retry loop in + // runtime/fenced-write.ts dispatches off that marker, and + // upstream error pages (CDN HTML, gateway timeouts, etc.) + // shouldn't be allowed to demote a fence conflict to a generic + // EntityConflictError that callers treat as terminal. + const fenceMessage = /fence conflict/i.test(defaultMessage) + ? defaultMessage + : `fence conflict: ${defaultMessage}`; + throwWithTrace(new EntityConflictError(fenceMessage)); + } if (response.status === 410) { throwWithTrace(new RunExpiredError(defaultMessage)); } From e5cd6869805f6d9e87d5b4df964f534a0ba02e9f Mon Sep 17 00:00:00 2001 From: Nathan Rajlich Date: Thu, 28 May 2026 12:28:35 -0700 Subject: [PATCH 08/10] core: address review feedback on fence-conflict handling - Log each fence-conflict retry at info level (datadog visibility for the retry path between the first conflict and the give-up warning). - Keep prior fence when a create response is missing `event`; emit a warn instead of silently advancing to a value we didn't observe on the wire. The schema marks `event` as optional for legacy compat; in practice creates always return it, but the type permits drift. - Replace `events.some(…)` inside the reload-merge loop with a Set-based dedup so the retry path is O(n + m) instead of O(n × m). - Drop `asOfTimestamp` from CreateEventParams.
The original motivation was `resumeHook`-style writes; the runtime keeps `hook_received` unfenced (fencing the user signal would drop it on contention) so nothing in this PR exercises the param. Reintroduce when a real caller appears. Addresses inline review comments on #2113. --- .changeset/event-write-occ-fence.md | 2 +- packages/core/src/runtime.ts | 58 +++++++++++++++++++---- packages/core/src/runtime/fenced-write.ts | 27 +++++++++++ packages/world-vercel/src/events.ts | 6 --- packages/world/src/events.ts | 8 ---- 5 files changed, 76 insertions(+), 25 deletions(-) diff --git a/.changeset/event-write-occ-fence.md b/.changeset/event-write-occ-fence.md index df5ca57ec9..09e21caeb7 100644 --- a/.changeset/event-write-occ-fence.md +++ b/.changeset/event-write-occ-fence.md @@ -4,4 +4,4 @@ "@workflow/world-vercel": patch --- -Add optional `lastKnownEventId` and `asOfTimestamp` params to `events.create`, which the World can use to do optimisti concurrency control fencing. Conflict surfaces as existing `EntityConflictError`, which the runtime already reloads-and-continues on. +Add optional `lastKnownEventId` param to `events.create`, which the World can use to do optimistic concurrency control fencing on branch-decision event writes. Conflict surfaces as existing `EntityConflictError`, which the runtime retries in place against a freshly-loaded fence. diff --git a/packages/core/src/runtime.ts b/packages/core/src/runtime.ts index 4fe5bf85e6..8d2e21d9d1 100644 --- a/packages/core/src/runtime.ts +++ b/packages/core/src/runtime.ts @@ -823,25 +823,47 @@ export function workflowEntrypoint( : {}), } ); + // The server response schema marks `event` as + // optional for legacy compatibility. 
In practice + // creates always return the persisted event, but + // if the response is missing it we leave the + // fence at its prior value (a stale fence will + // simply force one more fence-and-reload cycle + // on the next iteration; we never silently + // advance to a value we didn't actually observe + // on the wire). if (result.event) { fenceEventId = result.event.eventId; + } else { + runtimeLogger.warn( + 'wait_completed write missing event in response; keeping prior fence', + { + workflowRunId: runId, + correlationId: waitEvent.correlationId, + fenceEventId, + } + ); } written = true; } catch (err) { if (!EntityConflictError.is(err)) { throw err; } - // Fence conflicts surface a specific error - // message from workflow-server. - // Most 409s will simply exit since we assume a separate - // invocation is active. This should hold true for fence conflicts - // too, but to guarantee correctness, will be re-tried here directly. - // TODO: We can remove the retry here after extensive validation. - // The cost is low in the meantime. + // Fence-conflict detection is anchored on + // packages/world-vercel/src/utils.ts mapping + // HTTP 412 to EntityConflictError with a + // guaranteed `fence conflict:` prefix on the + // message. The status-based marker is enforced + // client-side, so this regex can't silently + // regress against a server wording change. const isFenceConflict = /fence conflict/i.test( err.message ); if (!isFenceConflict) { + // Non-fence 409s mean the wait was already + // completed by a concurrent invocation (or + // never existed). Both cases are terminal for + // this wait — log and move on. 
runtimeLogger.info( 'Wait already completed, skipping', { @@ -852,6 +874,15 @@ export function workflowEntrypoint( break; } attempts += 1; + runtimeLogger.info( + 'wait_completed fence conflict; reloading and retrying', + { + workflowRunId: runId, + correlationId: waitEvent.correlationId, + attempt: attempts, + maxAttempts: MAX_FENCE_RETRIES, + } + ); if (attempts > MAX_FENCE_RETRIES) { runtimeLogger.warn( 'Wait completion gave up after fence retries; falling back to queue redelivery', @@ -867,11 +898,18 @@ export function workflowEntrypoint( ? await loadWorkflowRunEvents(runId, eventsCursor) : await loadWorkflowRunEvents(runId); if (eventsCursor) { + // Build a Set of existing eventIds once per + // reload so the merge is O(n + m) rather + // than O(n × m). Matters on the retry path, + // which is exactly where the log will be + // longest. + const existingIds = new Set( + events.map((e) => e.eventId) + ); for (const e of loaded.events) { - if ( - !events.some((x) => x.eventId === e.eventId) - ) { + if (!existingIds.has(e.eventId)) { events.push(e); + existingIds.add(e.eventId); } } eventsCursor = loaded.cursor ?? eventsCursor; diff --git a/packages/core/src/runtime/fenced-write.ts b/packages/core/src/runtime/fenced-write.ts index 91d1a308c1..c51ae41a44 100644 --- a/packages/core/src/runtime/fenced-write.ts +++ b/packages/core/src/runtime/fenced-write.ts @@ -135,6 +135,23 @@ export async function fencedEventCreate( requestId, ...(fenceEventId ? { lastKnownEventId: fenceEventId } : {}), }); + // The server response schema marks `event` as optional for legacy + // compatibility. In practice creates always return the persisted + // event, but if it's missing we leave the caller's fence at its + // prior value rather than silently advancing to a value we didn't + // observe on the wire. A stale fence on the next call will simply + // surface one extra fence-and-reload cycle. 
+ if (!result.event) { + runtimeLogger.warn( + 'Branch-decision write missing event in response; keeping prior fence', + { + workflowRunId: runId, + eventType: event.eventType, + correlationId: event.correlationId, + fenceEventId, + } + ); + } return { written: true, newFenceEventId: result.event?.eventId ?? fenceEventId, @@ -148,6 +165,16 @@ export async function fencedEventCreate( } catch (err) { if (isFenceConflict(err)) { attempt += 1; + runtimeLogger.info( + 'Branch-decision write fence conflict; reloading and retrying', + { + workflowRunId: runId, + eventType: event.eventType, + correlationId: event.correlationId, + attempt, + maxAttempts: MAX_FENCE_RETRIES, + } + ); if (attempt > MAX_FENCE_RETRIES) { runtimeLogger.warn( 'Branch-decision write gave up after fence retries', diff --git a/packages/world-vercel/src/events.ts b/packages/world-vercel/src/events.ts index 48a417f7e1..b76e7e2271 100644 --- a/packages/world-vercel/src/events.ts +++ b/packages/world-vercel/src/events.ts @@ -460,9 +460,6 @@ async function createWorkflowRunEventInner( ...(params?.lastKnownEventId ? { lastKnownEventId: params.lastKnownEventId } : {}), - ...(params?.asOfTimestamp !== undefined - ? { asOfTimestamp: params.asOfTimestamp } - : {}), }, config, schema: EventResultResolveWireSchema, @@ -491,9 +488,6 @@ async function createWorkflowRunEventInner( ...(params?.lastKnownEventId ? { lastKnownEventId: params.lastKnownEventId } : {}), - ...(params?.asOfTimestamp !== undefined - ? { asOfTimestamp: params.asOfTimestamp } - : {}), }, config, schema: EventResultLazyWireSchema, diff --git a/packages/world/src/events.ts b/packages/world/src/events.ts index 0d385175c7..dab8ba89d7 100644 --- a/packages/world/src/events.ts +++ b/packages/world/src/events.ts @@ -398,14 +398,6 @@ export interface CreateEventParams { * event log snapshot from advancing the log. */ lastKnownEventId?: string; - /** - * Optimistic concurrency control fence (alternative form), see above for reasoning. 
- * This is a unix-ms cutoff. Server resolves to the highest eventId strictly before this - * timestamp and uses that as the expected fence. Lets `resumeHook` fence - * `hook_received` after anything the caller could have observed without paying - * for a separate read. Ignored when `lastKnownEventId` is also set. - */ - asOfTimestamp?: number; } /** From 43dd837ae25ee9a06b3ed0584e1a9f9925a75c35 Mon Sep 17 00:00:00 2001 From: Nathan Rajlich Date: Thu, 28 May 2026 15:17:37 -0700 Subject: [PATCH 09/10] core: classify EntityConflictError + friends as RUNTIME_ERROR MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The four `WorkflowWorldError` subclasses surfaced by the runtime's own calls into the world layer represent infrastructure-level conditions, not user-code failures: - `EntityConflictError`: CAS rejection on event writes (409 / 412), including OCC fence conflicts that exhausted the in-place retry budget in `fenced-write.ts`. - `RunExpiredError`: 410 — run was cleaned up or already terminal. - `TooEarlyError`: 425 — retry-after timestamp not yet reached. - `ThrottleError`: 429 — rate limited by the workflow backend. When any of these reach `classifyRunError` the runtime's own retry logic has already exhausted (otherwise the error would have been swallowed upstream). The truthful classification is `RUNTIME_ERROR`, not `USER_ERROR`. Same shape as the existing entries (`WorkflowRuntimeError`, `WorkflowNotRegisteredError`, `StepNotRegisteredError`). The bare `WorkflowWorldError` parent stays out of the runtime list: it can also surface from user-code `fetch` calls into the workflow API, where `USER_ERROR` is the correct attribution (see the existing "WorkflowWorldError with status 500" test). Caught during the stress validation of #2113 + workflow-server#447 end-to-end: fence-conflict retries exhausting under the 180-way hook race were surfacing as `USER_ERROR`, masking an obvious infra condition as user code. 
Tests added for each of the four subclasses. --- packages/core/src/classify-error.test.ts | 32 ++++++++++++++++++++++++ packages/core/src/classify-error.ts | 21 ++++++++++++++++ 2 files changed, 53 insertions(+) diff --git a/packages/core/src/classify-error.test.ts b/packages/core/src/classify-error.test.ts index 89bc6acb83..4b0cd573bb 100644 --- a/packages/core/src/classify-error.test.ts +++ b/packages/core/src/classify-error.test.ts @@ -1,7 +1,11 @@ import { CorruptedEventLogError, + EntityConflictError, HookConflictError, RUN_ERROR_CODES, + RunExpiredError, + ThrottleError, + TooEarlyError, WorkflowNotRegisteredError, WorkflowRuntimeError, WorkflowWorldError, @@ -87,4 +91,32 @@ describe('classifyRunError', () => { RUN_ERROR_CODES.USER_ERROR ); }); + + it('classifies EntityConflictError as RUNTIME_ERROR (fence-conflict / 409 from runtime CAS write)', () => { + expect( + classifyRunError( + new EntityConflictError( + 'fence conflict: Stale request: event-log fence conflict' + ) + ) + ).toBe(RUN_ERROR_CODES.RUNTIME_ERROR); + }); + + it('classifies RunExpiredError as RUNTIME_ERROR (410 on runtime operation)', () => { + expect(classifyRunError(new RunExpiredError('run gone'))).toBe( + RUN_ERROR_CODES.RUNTIME_ERROR + ); + }); + + it('classifies TooEarlyError as RUNTIME_ERROR (425 retry-after-not-reached)', () => { + expect( + classifyRunError(new TooEarlyError('too early', { retryAfter: 5 })) + ).toBe(RUN_ERROR_CODES.RUNTIME_ERROR); + }); + + it('classifies ThrottleError as RUNTIME_ERROR (429 rate limited)', () => { + expect( + classifyRunError(new ThrottleError('throttled', { retryAfter: 1 })) + ).toBe(RUN_ERROR_CODES.RUNTIME_ERROR); + }); }); diff --git a/packages/core/src/classify-error.ts b/packages/core/src/classify-error.ts index 6d1da5031a..8c08a798f4 100644 --- a/packages/core/src/classify-error.ts +++ b/packages/core/src/classify-error.ts @@ -1,8 +1,12 @@ import { CorruptedEventLogError, + EntityConflictError, RUN_ERROR_CODES, type RunErrorCode, + 
RunExpiredError, StepNotRegisteredError, + ThrottleError, + TooEarlyError, WorkflowNotRegisteredError, WorkflowRuntimeError, WorkflowWorldError, @@ -20,11 +24,28 @@ const WORLD_CONTRACT_ERROR_CODES = new Set([ * not enough — we have to enumerate every concrete subclass we want to * recognize. Keep in sync with the `WorkflowRuntimeError` class hierarchy * in `@workflow/errors`. + * + * The `WorkflowWorldError` subclasses below (`EntityConflictError`, + * `RunExpiredError`, `TooEarlyError`, `ThrottleError`) are infrastructure- + * level conditions surfaced by the runtime's own calls into the world + * (CAS rejections, run cleanup, retry-after, rate limits). The runtime + * normally retries past them; if they reach `classifyRunError`, the + * runtime's retry budget exhausted — that's a transient infra failure, + * not user code, so `RUNTIME_ERROR` is the truthful classification. + * + * The bare `WorkflowWorldError` parent is *not* in this list: it can + * also surface from user-code `fetch` calls into the workflow API, which + * should remain `USER_ERROR` (see the test case on line ~43 of + * `classify-error.test.ts`). */ const RUNTIME_ERROR_CHECKS = [ WorkflowRuntimeError.is, StepNotRegisteredError.is, WorkflowNotRegisteredError.is, + EntityConflictError.is, + RunExpiredError.is, + TooEarlyError.is, + ThrottleError.is, ]; /** From 98c97417fde0df67e91a3f0aef36e7b42bf104f1 Mon Sep 17 00:00:00 2001 From: Nathan Rajlich Date: Thu, 28 May 2026 16:05:30 -0700 Subject: [PATCH 10/10] core: bail on fence conflict instead of retrying in place MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit When a fenced event write rejects with EventLogFenceConflictError, the SDK previously retried up to MAX_FENCE_RETRIES = 5 times against a freshly-loaded tail, with linear backoff. Under stress this behaved poorly in two ways: 1. The retry loop spins against an ever-changing tail. Under high contention (e.g. 
a hook flood triggering many concurrent ticks), exhausting the budget throws an EntityConflictError, which surfaces as run_failed — a transient infra condition mis-classified as a terminal failure. 2. The retries amplified the server-side stuck-fence pattern (run.lastKnownEventId advancing past a non-existent eventId due to the patch-then-PUT non-atomicity documented in c06d6ce of workflow-server). Every retry that hit the same stale fence wasted compute and prolonged the affected window. Switch to bail-on-conflict: on fence conflict, fencedEventCreate returns {written: false} immediately. No retry, no throw, no re-enqueue. A fence conflict means another invocation has the canonical view of the event log — the canonical invocation is responsible for whatever progress the workflow needs, and the losing invocation just exits cleanly. This matches the existing workflow-server comment ('the @workflow/core suspension handler swallows it') and the original design intent. Net change: ~250 lines removed from runtime.ts + fenced-write.ts + suspension-handler.ts. The custom retry loop in the wait_completed elapsed-wait scan is also folded into fencedEventCreate. Behavioral effects: - USER_ERROR / RUNTIME_ERROR run failures from exhausted fence retries are eliminated. Fence conflicts no longer mark runs as failed. - Higher hook-payload throughput under stress: the 180-way race is no longer amplified by 5x retries per losing lambda. - The server-side stuck-fence window (patch-then-PUT non-atomicity) is unchanged — that needs to be addressed in workflow-server, not here. But the SDK no longer makes it worse by spinning. Tests: 1018 core tests pass. The previously-tested 'fence-conflict retries and reloads' behavior is removed; the replacement behavior ('fence-conflict returns {written:false} once') is exercised implicitly via the suspension-handler integration tests. 
--- .changeset/event-write-occ-fence.md | 2 +- packages/core/src/runtime.ts | 205 ++++-------------- packages/core/src/runtime/fenced-write.ts | 203 ++++++++--------- .../core/src/runtime/suspension-handler.ts | 91 -------- 4 files changed, 127 insertions(+), 374 deletions(-) diff --git a/.changeset/event-write-occ-fence.md b/.changeset/event-write-occ-fence.md index 09e21caeb7..ecc03d2daa 100644 --- a/.changeset/event-write-occ-fence.md +++ b/.changeset/event-write-occ-fence.md @@ -4,4 +4,4 @@ "@workflow/world-vercel": patch --- -Add optional `lastKnownEventId` param to `events.create`, which the World can use to do optimistic concurrency control fencing on branch-decision event writes. Conflict surfaces as existing `EntityConflictError`, which the runtime retries in place against a freshly-loaded fence. +Add optional `lastKnownEventId` param to `events.create`, which the World can use to do optimistic concurrency control fencing on branch-decision event writes. Fence conflict surfaces as `EntityConflictError`, which the runtime treats as a signal that another invocation has the canonical view of the event log: the current write is dropped (no retry, no `run_failed`) and the canonical invocation is left to make progress. diff --git a/packages/core/src/runtime.ts b/packages/core/src/runtime.ts index 8d2e21d9d1..a389f50a63 100644 --- a/packages/core/src/runtime.ts +++ b/packages/core/src/runtime.ts @@ -800,136 +800,42 @@ export function workflowEntrypoint( // The last known event ID is used for optimistic concurrency // control, being provided to the World, which will reject if any // other events arrive between our event log read and event write. - // On fail, we retry *in-place* with a freshly-loaded fence rather than - // terminating the invocation. Falling back to queue redelivery directly - // could cause retry-storms under high load. 
+ // On fence conflict, bail the write — another invocation has + // the canonical view of the log; trying to retry in place + // here only spins against a moving target and (under stress) + // amplifies the stuck-fence pattern caused by the server-side + // patch-then-PUT non-atomicity. See `runtime/fenced-write.ts` + // for the full reasoning. let fenceEventId: string | undefined = events.length > 0 ? events[events.length - 1].eventId : undefined; - const MAX_FENCE_RETRIES = 5; for (const waitEvent of waitsToComplete) { - let attempts = 0; - let written = false; - while (!written) { - try { - const result = await world.events.create( - runId, - waitEvent, - { - requestId, - ...(fenceEventId - ? { lastKnownEventId: fenceEventId } - : {}), - } - ); - // The server response schema marks `event` as - // optional for legacy compatibility. In practice - // creates always return the persisted event, but - // if the response is missing it we leave the - // fence at its prior value (a stale fence will - // simply force one more fence-and-reload cycle - // on the next iteration; we never silently - // advance to a value we didn't actually observe - // on the wire). - if (result.event) { - fenceEventId = result.event.eventId; - } else { - runtimeLogger.warn( - 'wait_completed write missing event in response; keeping prior fence', - { - workflowRunId: runId, - correlationId: waitEvent.correlationId, - fenceEventId, - } - ); - } - written = true; - } catch (err) { - if (!EntityConflictError.is(err)) { - throw err; - } - // Fence-conflict detection is anchored on - // packages/world-vercel/src/utils.ts mapping - // HTTP 412 to EntityConflictError with a - // guaranteed `fence conflict:` prefix on the - // message. The status-based marker is enforced - // client-side, so this regex can't silently - // regress against a server wording change. 
- const isFenceConflict = /fence conflict/i.test( - err.message - ); - if (!isFenceConflict) { - // Non-fence 409s mean the wait was already - // completed by a concurrent invocation (or - // never existed). Both cases are terminal for - // this wait — log and move on. - runtimeLogger.info( - 'Wait already completed, skipping', - { - workflowRunId: runId, - correlationId: waitEvent.correlationId, - } - ); - break; - } - attempts += 1; - runtimeLogger.info( - 'wait_completed fence conflict; reloading and retrying', - { - workflowRunId: runId, - correlationId: waitEvent.correlationId, - attempt: attempts, - maxAttempts: MAX_FENCE_RETRIES, - } - ); - if (attempts > MAX_FENCE_RETRIES) { - runtimeLogger.warn( - 'Wait completion gave up after fence retries; falling back to queue redelivery', - { - workflowRunId: runId, - correlationId: waitEvent.correlationId, - attempts, - } - ); - throw err; - } - const loaded = eventsCursor - ? await loadWorkflowRunEvents(runId, eventsCursor) - : await loadWorkflowRunEvents(runId); - if (eventsCursor) { - // Build a Set of existing eventIds once per - // reload so the merge is O(n + m) rather - // than O(n × m). Matters on the retry path, - // which is exactly where the log will be - // longest. - const existingIds = new Set( - events.map((e) => e.eventId) - ); - for (const e of loaded.events) { - if (!existingIds.has(e.eventId)) { - events.push(e); - existingIds.add(e.eventId); - } - } - eventsCursor = loaded.cursor ?? 
eventsCursor; - } else { - events = loaded.events; - eventsCursor = loaded.cursor; - } - const alreadyCompleted = events.some( - (e) => - e.eventType === 'wait_completed' && - e.correlationId === waitEvent.correlationId - ); - if (alreadyCompleted) { - break; + const writeResult = await fencedEventCreate({ + world, + runId, + event: waitEvent, + requestId, + fenceEventId, + onEntityConflict: () => 'abort', + }); + if (writeResult.newFenceEventId) { + fenceEventId = writeResult.newFenceEventId; + } + if (!writeResult.written) { + // Either fence conflict (canonical replay + // elsewhere) or the wait was already completed + // by a concurrent invocation. Both terminal + // for this invocation's attempt; continue with + // remaining waitsToComplete using the latest + // fence we have observed. + runtimeLogger.info( + 'Wait completion skipped (fence conflict or already completed)', + { + workflowRunId: runId, + correlationId: waitEvent.correlationId, } - fenceEventId = events[events.length - 1]?.eventId; - await new Promise((r) => - setTimeout(r, 25 * attempts) - ); - } + ); } } @@ -1076,32 +982,16 @@ export function workflowEntrypoint( }, requestId, fenceEventId: runCompletedFence, - onConflictRefresh: async () => { - // Terminal-state idempotency: if any - // terminal run event landed since our load, - // abort. Don't retry — once a run is in a - // terminal state, our write is wrong. - const loaded = eventsCursor - ? await loadWorkflowRunEvents(runId, eventsCursor) - : await loadWorkflowRunEvents(runId); - eventsCursor = loaded.cursor ?? eventsCursor; - const reachedTerminal = loaded.events.some( - (e) => - e.eventType === 'run_completed' || - e.eventType === 'run_failed' || - e.eventType === 'run_cancelled' - ); - if (reachedTerminal) return { kind: 'abort' }; - const fresh = - loaded.events[loaded.events.length - 1] - ?.eventId ?? 
runCompletedFence; - return { kind: 'retry', fenceEventId: fresh }; - }, onEntityConflict: () => 'abort', }); if (!writeResult.written) { + // `written: false` covers both fence conflict + // (another invocation has the canonical view) + // and entity-conflict abort (run already + // terminal). Either way the run is done from + // this invocation's perspective. runtimeLogger.info( - 'Tried completing workflow run, but run has already finished.', + 'Tried completing workflow run, but run has already finished or another invocation is canonical.', { workflowRunId: runId } ); return; @@ -1165,7 +1055,6 @@ export function workflowEntrypoint( span, requestId, fenceEventId: suspensionFenceEventId, - eventsCursor, }); runtimeLogger.debug('Suspension handled', { workflowRunId: runId, @@ -1564,31 +1453,11 @@ export function workflowEntrypoint( }, requestId, fenceEventId: runFailedFence, - onConflictRefresh: async () => { - const loaded = eventsCursor - ? await loadWorkflowRunEvents( - runId, - eventsCursor - ) - : await loadWorkflowRunEvents(runId); - eventsCursor = loaded.cursor ?? eventsCursor; - const reachedTerminal = loaded.events.some( - (e) => - e.eventType === 'run_completed' || - e.eventType === 'run_failed' || - e.eventType === 'run_cancelled' - ); - if (reachedTerminal) return { kind: 'abort' }; - const fresh = - loaded.events[loaded.events.length - 1] - ?.eventId ?? 
runFailedFence; - return { kind: 'retry', fenceEventId: fresh }; - }, onEntityConflict: () => 'abort', }); if (!writeResult.written) { runtimeLogger.info( - 'Tried failing workflow run, but run has already finished.', + 'Tried failing workflow run, but run has already finished or another invocation is canonical.', { workflowRunId: runId } ); return; diff --git a/packages/core/src/runtime/fenced-write.ts b/packages/core/src/runtime/fenced-write.ts index c51ae41a44..ddd7b0eb1c 100644 --- a/packages/core/src/runtime/fenced-write.ts +++ b/packages/core/src/runtime/fenced-write.ts @@ -1,9 +1,8 @@ /** * Helper for "branch-decision" event writes that need OCC fencing. * - * Applies the same fence-and-retry pattern the elapsed-wait scan uses for - * `wait_completed` to every other write whose outcome depends on a branch - * decision the workflow VM made from its loaded event log: + * Used by the writes whose outcome depends on a branch decision the + * workflow VM made from its loaded event log: * * suspension-handler.ts: * - step_created @@ -19,18 +18,35 @@ * would drop it on contention; stale-snapshot protection belongs on the * writes that consume hooks, not the writes that deliver them. * - * Each fenced write tries up to MAX_FENCE_RETRIES times. On a fence - * conflict, the caller is expected to refresh `fenceEventId` from a fresh - * event log read. The `onConflictRefresh` callback gives the caller a - * chance to do that, and to decide whether to give up (e.g. when an - * idempotency check confirms the write is no longer needed). + * On a fence conflict, this helper **bails the current write** rather + * than retrying in place. A fence conflict means some other invocation + * has already advanced the event log past our snapshot, so the work + * implied by our snapshot is either already done or will be done by + * whoever advanced the log. Retrying in place caused two problems: + * + * 1. Under high contention (e.g. 
a hook flood firing many concurrent + * ticks), the retry loop became a stuck-fence spin: lambdas would + * spend their budget retrying against an ever-changing tail and + * either succeed by luck or exhaust MAX_FENCE_RETRIES and throw. + * A thrown EntityConflictError surfaces as `run_failed` (a + * transient infra issue mis-classified as terminal failure). + * + * 2. The retries-against-the-same-fence shape, paired with the + * server-side patch-then-PUT non-atomicity, made stuck-fence runs + * measurably worse: every retry attempt against an already-advanced + * fence is wasted compute, and the spin keeps the run stuck + * longer in the affected-runs-per-stress-cycle window. + * + * The simpler model matches Peter's existing workflow-server comment + * ("the @workflow/core suspension handler swallows it"): a fence + * conflict signals "another replay is canonical; this one isn't" — + * exit cleanly, trust the canonical replay, no error, no retry, no + * re-enqueue. */ import { EntityConflictError } from '@workflow/errors'; import type { CreateEventRequest, World } from '@workflow/world'; import { runtimeLogger } from '../logger.js'; -const MAX_FENCE_RETRIES = 5; - /** * Returns true when an EntityConflictError carries the fence-conflict * shape. Anything else with a 409/410/etc shape is some other kind of @@ -58,29 +74,8 @@ export interface FencedWriteParams { * helper will then issue an unfenced write — which still atomically * advances `run.lastKnownEventId` on the server side so future fenced * writers see the materialized value. - * - * After a successful fenced write, the caller should update its - * tracked fence to the returned `newFenceEventId`. */ fenceEventId: string | undefined; - /** - * Called when the server rejects the write with a fence conflict. - * Implementations should: - * 1. Reload events from the cursor. - * 2. Check whether the write is still necessary (e.g. 
wait_completed - * idempotency — if the event already exists in the reloaded log, - * return `'abort'` so we don't retry pointlessly). - * 3. Return `{ kind: 'retry', fenceEventId: }` to retry - * against the new tail, or `{ kind: 'abort' }` to give up. - * - * Receives the attempt number (1-indexed) so backoff can be tuned by - * the caller if it wants. - */ - onConflictRefresh: ( - attempt: number - ) => Promise< - { kind: 'retry'; fenceEventId: string | undefined } | { kind: 'abort' } - >; /** * Called when the server rejects with a *non-fence* EntityConflictError * (e.g. the entity already exists because a concurrent handler beat us @@ -92,7 +87,15 @@ export interface FencedWriteParams { } export interface FencedWriteResult { - /** Whether the event was actually written (false = aborted via dedup). */ + /** + * Whether the event was actually written. + * + * `false` covers two cases the caller usually wants to treat the same + * way (no further action required for this write): + * - Fence conflict — another invocation has the canonical view. + * - Non-fence EntityConflictError with `onEntityConflict: 'abort'` + * (entity already exists, run already terminal, etc.). + */ written: boolean; /** * eventId of the newly-written event (when `written` is true), so @@ -109,7 +112,9 @@ export interface FencedWriteResult { /** * Issues `world.events.create(runId, event, { requestId, lastKnownEventId })` - * with up to MAX_FENCE_RETRIES retries on fence conflict. + * once. On fence conflict, returns `{ written: false }` immediately + * (no retry, no throw, no re-enqueue) — see the file-level comment for + * the reasoning. 
* * On any non-fence EntityConflictError, defers to `onEntityConflict` for * the abort-vs-rethrow decision (preserves the existing @@ -118,91 +123,61 @@ export interface FencedWriteResult { export async function fencedEventCreate( params: FencedWriteParams ): Promise { - const { - world, - runId, - event, - requestId, - onConflictRefresh, - onEntityConflict, - } = params; - let fenceEventId = params.fenceEventId; - let attempt = 0; - // biome-ignore lint/correctness/noConstantCondition: bounded by MAX_FENCE_RETRIES - while (true) { - try { - const result = await world.events.create(runId, event, { - requestId, - ...(fenceEventId ? { lastKnownEventId: fenceEventId } : {}), - }); - // The server response schema marks `event` as optional for legacy - // compatibility. In practice creates always return the persisted - // event, but if it's missing we leave the caller's fence at its - // prior value rather than silently advancing to a value we didn't - // observe on the wire. A stale fence on the next call will simply - // surface one extra fence-and-reload cycle. - if (!result.event) { - runtimeLogger.warn( - 'Branch-decision write missing event in response; keeping prior fence', - { - workflowRunId: runId, - eventType: event.eventType, - correlationId: event.correlationId, - fenceEventId, - } - ); - } - return { - written: true, - newFenceEventId: result.event?.eventId ?? fenceEventId, - event: result.event - ? 
{ - eventType: result.event.eventType, - eventId: result.event.eventId, - } - : undefined, - }; - } catch (err) { - if (isFenceConflict(err)) { - attempt += 1; - runtimeLogger.info( - 'Branch-decision write fence conflict; reloading and retrying', - { - workflowRunId: runId, - eventType: event.eventType, - correlationId: event.correlationId, - attempt, - maxAttempts: MAX_FENCE_RETRIES, - } - ); - if (attempt > MAX_FENCE_RETRIES) { - runtimeLogger.warn( - 'Branch-decision write gave up after fence retries', - { - workflowRunId: runId, - eventType: event.eventType, - correlationId: event.correlationId, - attempts: attempt, - } - ); - throw err; - } - const decision = await onConflictRefresh(attempt); - if (decision.kind === 'abort') { - return { written: false }; + const { world, runId, event, requestId, fenceEventId, onEntityConflict } = + params; + try { + const result = await world.events.create(runId, event, { + requestId, + ...(fenceEventId ? { lastKnownEventId: fenceEventId } : {}), + }); + // The server response schema marks `event` as optional for legacy + // compatibility. In practice creates always return the persisted + // event, but if it's missing we keep the caller's fence at its + // prior value rather than silently advancing to a value we didn't + // observe on the wire. + if (!result.event) { + runtimeLogger.warn( + 'Branch-decision write missing event in response; keeping prior fence', + { + workflowRunId: runId, + eventType: event.eventType, + correlationId: event.correlationId, + fenceEventId, } - fenceEventId = decision.fenceEventId; - await new Promise((r) => setTimeout(r, 25 * attempt)); - continue; - } - if (EntityConflictError.is(err)) { - const decision = onEntityConflict(err); - if (decision === 'abort') { - return { written: false }; + ); + } + return { + written: true, + newFenceEventId: result.event?.eventId ?? fenceEventId, + event: result.event + ? 
{ + eventType: result.event.eventType, + eventId: result.event.eventId, + } + : undefined, + }; + } catch (err) { + if (isFenceConflict(err)) { + // Another invocation has the canonical view of the event log. + // Bail out cleanly: no retry, no throw. The canonical invocation + // is responsible for whatever progress the workflow needs. + runtimeLogger.info( + 'Branch-decision write fence conflict; yielding to canonical replay', + { + workflowRunId: runId, + eventType: event.eventType, + correlationId: event.correlationId, } - throw err; + ); + return { written: false }; + } + if (EntityConflictError.is(err)) { + const decision = onEntityConflict(err); + if (decision === 'abort') { + return { written: false }; } throw err; } + throw err; } } diff --git a/packages/core/src/runtime/suspension-handler.ts b/packages/core/src/runtime/suspension-handler.ts index 982de0e743..1be5a8af3a 100644 --- a/packages/core/src/runtime/suspension-handler.ts +++ b/packages/core/src/runtime/suspension-handler.ts @@ -6,7 +6,6 @@ import { } from '@workflow/errors'; import { type CreateEventRequest, - type Event, type SerializedData, SPEC_VERSION_CURRENT, type WorkflowRun, @@ -24,7 +23,6 @@ import { dehydrateStepArguments } from '../serialization.js'; import * as Attribute from '../telemetry/semantic-conventions.js'; import { getAbortStreamIdFromToken } from '../util.js'; import { fencedEventCreate } from './fenced-write.js'; -import { loadWorkflowRunEvents } from './helpers.js'; export interface SuspensionHandlerParams { suspension: WorkflowSuspension; @@ -44,13 +42,6 @@ export interface SuspensionHandlerParams { * step/wait/hook `_created` and `hook_disposed`. */ fenceEventId?: string; - /** - * Caller's events-cursor token (from `loadWorkflowRunEvents`). Used to - * refresh the fence after a CAS conflict — the handler pulls fresh - * events from this cursor, takes the new tail as the next fence, and - * retries. 
- */ - eventsCursor?: string | null; } /** @@ -92,7 +83,6 @@ export async function handleSuspension({ span, requestId, fenceEventId: initialFenceEventId, - eventsCursor: initialEventsCursor, }: SuspensionHandlerParams): Promise { const runId = run.runId; @@ -102,29 +92,7 @@ export async function handleSuspension({ // fence is a single-tip CAS, so parallel sibling writes from one replay turn // would force self-conflicts against the same starting fence. let fenceEventId = initialFenceEventId; - let eventsCursor = initialEventsCursor; - /** - * Reloads events from the cursor and returns the new tail as a fresh - * fence. Also returns the freshly-loaded events so callers can run - * idempotency checks (e.g. "is this `wait_created` already in the log? - * → abort the write"). - * - * NOTE: when `eventsCursor` is unset the reload is a full re-read of - * the run's log, matching the elapsed-wait scan fallback. - */ - async function refreshFence(): Promise<{ - fenceEventId: string | undefined; - loadedEvents: Event[]; - }> { - const loaded = eventsCursor - ? await loadWorkflowRunEvents(runId, eventsCursor) - : await loadWorkflowRunEvents(runId); - eventsCursor = loaded.cursor ?? eventsCursor; - const tail = - loaded.events[loaded.events.length - 1]?.eventId ?? fenceEventId; - return { fenceEventId: tail, loadedEvents: loaded.events }; - } // Separate queue items by type const stepItems = suspension.steps.filter( (item): item is StepInvocationQueueItem => item.type === 'step' @@ -185,22 +153,6 @@ export async function handleSuspension({ event: hookEvent, requestId, fenceEventId, - onConflictRefresh: async () => { - const { fenceEventId: fresh, loadedEvents } = await refreshFence(); - // Idempotency: if the hook was already created (by us in a - // previous attempt that 412'd then succeeded server-side, - // or by a concurrent handler), don't retry. 
- const alreadyCreated = loadedEvents.some( - (e) => - (e.eventType === 'hook_created' || - e.eventType === 'hook_conflict') && - e.correlationId === hookEvent.correlationId - ); - if (alreadyCreated) { - return { kind: 'abort' }; - } - return { kind: 'retry', fenceEventId: fresh }; - }, onEntityConflict: () => 'abort', }); if (writeResult.newFenceEventId) { @@ -254,19 +206,6 @@ export async function handleSuspension({ event: hookDisposedEvent, requestId, fenceEventId, - onConflictRefresh: async () => { - const { fenceEventId: fresh, loadedEvents } = await refreshFence(); - // Idempotency: if already disposed, abort. - const alreadyDisposed = loadedEvents.some( - (e) => - e.eventType === 'hook_disposed' && - e.correlationId === queueItem.correlationId - ); - if (alreadyDisposed) { - return { kind: 'abort' }; - } - return { kind: 'retry', fenceEventId: fresh }; - }, onEntityConflict: () => 'abort', }); if (writeResult.newFenceEventId) { @@ -418,24 +357,6 @@ export async function handleSuspension({ event: stepEvent, requestId, fenceEventId, - onConflictRefresh: async () => { - const { fenceEventId: fresh, loadedEvents } = await refreshFence(); - // Idempotency: if step_created for this correlationId is - // already in the log, abort. Distinct from the - // EntityConflictError-on-duplicate-stepId case (that's - // handled by onEntityConflict below) — this branch - // catches the case where _our_ stale-snapshot CAS lost - // to a concurrent writer for the same correlationId. 
- const alreadyCreated = loadedEvents.some( - (e) => - e.eventType === 'step_created' && - e.correlationId === queueItem.correlationId - ); - if (alreadyCreated) { - return { kind: 'abort' }; - } - return { kind: 'retry', fenceEventId: fresh }; - }, onEntityConflict: () => 'abort', }); if (writeResult.newFenceEventId) { @@ -477,18 +398,6 @@ export async function handleSuspension({ event: waitEvent, requestId, fenceEventId, - onConflictRefresh: async () => { - const { fenceEventId: fresh, loadedEvents } = await refreshFence(); - const alreadyCreated = loadedEvents.some( - (e) => - e.eventType === 'wait_created' && - e.correlationId === queueItem.correlationId - ); - if (alreadyCreated) { - return { kind: 'abort' }; - } - return { kind: 'retry', fenceEventId: fresh }; - }, onEntityConflict: () => 'abort', }); if (writeResult.newFenceEventId) {