diff --git a/.changeset/chilly-fences-chain.md b/.changeset/chilly-fences-chain.md new file mode 100644 index 0000000000..7e58facf55 --- /dev/null +++ b/.changeset/chilly-fences-chain.md @@ -0,0 +1,5 @@ +--- +"@workflow/core": patch +--- + +Chain replay-created events and owner-scope inline-capable step dispatch to avoid stale or duplicate replay writes. diff --git a/.changeset/event-write-occ-fence.md b/.changeset/event-write-occ-fence.md new file mode 100644 index 0000000000..ecc03d2daa --- /dev/null +++ b/.changeset/event-write-occ-fence.md @@ -0,0 +1,7 @@ +--- +"@workflow/core": patch +"@workflow/world": patch +"@workflow/world-vercel": patch +--- + +Add optional `lastKnownEventId` param to `events.create`, which the World can use to do optimistic concurrency control fencing on branch-decision event writes. Fence conflict surfaces as `EntityConflictError`, which the runtime treats as a signal that another invocation has the canonical view of the event log: the current write is dropped (no retry, no `run_failed`) and the canonical invocation is left to make progress. diff --git a/.changeset/quick-local-queues.md b/.changeset/quick-local-queues.md new file mode 100644 index 0000000000..b019a5800c --- /dev/null +++ b/.changeset/quick-local-queues.md @@ -0,0 +1,5 @@ +--- +"@workflow/world-local": patch +--- + +Preserve completed local queue idempotency keys to avoid duplicate step dispatches. diff --git a/docs/content/docs/v4/changelog/eager-processing.mdx b/docs/content/docs/v4/changelog/eager-processing.mdx index 9be4e0cb1b..3ab42e0340 100644 --- a/docs/content/docs/v4/changelog/eager-processing.mdx +++ b/docs/content/docs/v4/changelog/eager-processing.mdx @@ -115,9 +115,9 @@ The design enforces a simple invariant: **exactly one handler owns each step, an 1. **Atomic `step_created`** --- the world's `events.create('step_created', correlationId=X)` is serialized per-correlationId. Exactly one concurrent caller succeeds; the rest receive `EntityConflictError`. 
In production worlds (Postgres, Vercel) this is enforced at the SQL/DB layer. In `world-local`, a per-step in-process async mutex in `packages/world-local/src/storage/events-storage.ts` wraps every step lifecycle event's check-and-write so the same guarantee holds for dev. 2. **Suspension handler reports ownership** --- `handleSuspension()` returns `createdStepCorrelationIds: Set<string>`, populated only for `step_created` writes that actually succeeded (not those that caught 409). 3. **Inline execution is gated on ownership** --- the runtime loop in `packages/core/src/runtime.ts` picks its inline step from `pendingSteps.filter(s => createdStepCorrelationIds.has(s.correlationId))`. A handler that didn't win any `step_created` race performs no inline execution. -4. **Queueing is unconditional** --- for every pending step except the one being inline-executed, the handler enqueues a background step message with `idempotencyKey: correlationId`. This matches V1's enqueue pattern and is what makes crash recovery work: if a prior handler wrote `step_created` but crashed before enqueueing, a later handler (from flow-message redelivery or `reenqueueActiveRuns`) will enqueue the orphaned step. Concurrent handlers' redundant enqueues dedupe on the idempotency key. +4. **Queueing is owner-scoped when inline execution is possible** --- for every owned pending step except the one being inline-executed, the handler enqueues a background step message with `idempotencyKey: correlationId`. A non-owner does not queue steps it merely observed from a stale replay, so it cannot race the owner's inline execution. When a wait is also pending, no handler executes steps inline, so every pending step is queueable and queue idempotency handles duplicates. If the workflow queue message itself is redelivered, the handler also recovers pre-existing `step_created` / `step_retrying` entries that have not reached `step_started`, covering the crash window after `step_created` but before queue dispatch. 
-Together these give: every `step_created` event has exactly one inline executor (possibly zero if the inline path was skipped due to crash) **and** at least one queued dispatch (from whichever handler first reaches the suspension path after the `step_created` is visible). Step bodies are never executed concurrently, and `step_started` events never land in the log after `step_completed` for the same step. Event-log replay sees clean subscriber-matched sequences. With this invariant in place, the earlier `onUnconsumedEvent` skip logic for step/hook/wait lifecycle events was removed — any unconsumed event now immediately fatals as a corrupted event log (its original purpose before the V2 work). +Together these give: every `step_created` event has exactly one inline executor (possibly zero if the inline path was skipped due to crash) **and** every non-inline owned step gets a queued dispatch from the owner. Step bodies are never executed concurrently, and `step_started` events never land in the log after `step_completed` for the same step. Event-log replay sees clean subscriber-matched sequences. With this invariant in place, the earlier `onUnconsumedEvent` skip logic for step/hook/wait lifecycle events was removed — any unconsumed event now immediately fatals as a corrupted event log (its original purpose before the V2 work). **Retry semantics are preserved**: the per-step mutex in `world-local` only rejects `step_started` when the step is already in a *terminal* state (`completed` / `failed`). A step that is currently running (status=`running`) still accepts a second `step_started` write with an incremented attempt counter — this is how queue redelivery after a SIGKILL mid-execution legitimately re-runs the step. 
The previously-documented "attempt counter inflation" failure mode is therefore no longer reachable via the concurrent-inline path; see "Concurrent `step_started` Inflating Attempt Counter" below for the complementary executor-side guard that still catches edge cases (e.g., postgres retries under high contention). diff --git a/docs/content/docs/v5/changelog/eager-processing.mdx b/docs/content/docs/v5/changelog/eager-processing.mdx index 141e656c81..2e80fe7d31 100644 --- a/docs/content/docs/v5/changelog/eager-processing.mdx +++ b/docs/content/docs/v5/changelog/eager-processing.mdx @@ -115,9 +115,9 @@ The design enforces a simple invariant: **exactly one handler owns each step, an 1. **Atomic `step_created`** --- the world's `events.create('step_created', correlationId=X)` is serialized per-correlationId. Exactly one concurrent caller succeeds; the rest receive `EntityConflictError`. In production worlds (Postgres, Vercel) this is enforced at the SQL/DB layer. In `world-local`, a per-step in-process async mutex in `packages/world-local/src/storage/events-storage.ts` wraps every step lifecycle event's check-and-write so the same guarantee holds for dev. 2. **Suspension handler reports ownership** --- `handleSuspension()` returns `createdStepCorrelationIds: Set<string>`, populated only for `step_created` writes that actually succeeded (not those that caught 409). 3. **Inline execution is gated on ownership** --- the runtime loop in `packages/core/src/runtime.ts` picks its inline step from `pendingSteps.filter(s => createdStepCorrelationIds.has(s.correlationId))`. A handler that didn't win any `step_created` race performs no inline execution. -4. **Queueing is unconditional** --- for every pending step except the one being inline-executed, the handler enqueues a background step message with `idempotencyKey: correlationId`. 
This matches V1's enqueue pattern and is what makes crash recovery work: if a prior handler wrote `step_created` but crashed before enqueueing, a later handler (from flow-message redelivery or `reenqueueActiveRuns`) will enqueue the orphaned step. Concurrent handlers' redundant enqueues dedupe on the idempotency key. +4. **Queueing is owner-scoped when inline execution is possible** --- for every owned pending step except the one being inline-executed, the handler enqueues a background step message with `idempotencyKey: correlationId`. A non-owner does not queue steps it merely observed from a stale replay, so it cannot race the owner's inline execution. When a wait is also pending, no handler executes steps inline, so every pending step is queueable and queue idempotency handles duplicates. If the workflow queue message itself is redelivered, the handler also recovers pre-existing `step_created` / `step_retrying` entries that have not reached `step_started`, covering the crash window after `step_created` but before queue dispatch. -Together these give: every `step_created` event has exactly one inline executor (possibly zero if the inline path was skipped due to crash) **and** at least one queued dispatch (from whichever handler first reaches the suspension path after the `step_created` is visible). Step bodies are never executed concurrently, and `step_started` events never land in the log after `step_completed` for the same step. Event-log replay sees clean subscriber-matched sequences. With this invariant in place, the earlier `onUnconsumedEvent` skip logic for step/hook/wait lifecycle events was removed — any unconsumed event now immediately fatals as a corrupted event log (its original purpose before the V2 work). +Together these give: every `step_created` event has exactly one inline executor (possibly zero if the inline path was skipped due to crash) **and** every non-inline owned step gets a queued dispatch from the owner. 
Step bodies are never executed concurrently, and `step_started` events never land in the log after `step_completed` for the same step. Event-log replay sees clean subscriber-matched sequences. With this invariant in place, the earlier `onUnconsumedEvent` skip logic for step/hook/wait lifecycle events was removed — any unconsumed event now immediately fatals as a corrupted event log (its original purpose before the V2 work). **Retry semantics are preserved**: the per-step mutex in `world-local` only rejects `step_started` when the step is already in a *terminal* state (`completed` / `failed`). A step that is currently running (status=`running`) still accepts a second `step_started` write with an incremented attempt counter — this is how queue redelivery after a SIGKILL mid-execution legitimately re-runs the step. The previously-documented "attempt counter inflation" failure mode is therefore no longer reachable via the concurrent-inline path; see "Concurrent `step_started` Inflating Attempt Counter" below for the complementary executor-side guard that still catches edge cases (e.g., postgres retries under high contention). 
diff --git a/packages/core/src/classify-error.test.ts b/packages/core/src/classify-error.test.ts index 89bc6acb83..4b0cd573bb 100644 --- a/packages/core/src/classify-error.test.ts +++ b/packages/core/src/classify-error.test.ts @@ -1,7 +1,11 @@ import { CorruptedEventLogError, + EntityConflictError, HookConflictError, RUN_ERROR_CODES, + RunExpiredError, + ThrottleError, + TooEarlyError, WorkflowNotRegisteredError, WorkflowRuntimeError, WorkflowWorldError, @@ -87,4 +91,32 @@ describe('classifyRunError', () => { RUN_ERROR_CODES.USER_ERROR ); }); + + it('classifies EntityConflictError as RUNTIME_ERROR (fence-conflict / 409 from runtime CAS write)', () => { + expect( + classifyRunError( + new EntityConflictError( + 'fence conflict: Stale request: event-log fence conflict' + ) + ) + ).toBe(RUN_ERROR_CODES.RUNTIME_ERROR); + }); + + it('classifies RunExpiredError as RUNTIME_ERROR (410 on runtime operation)', () => { + expect(classifyRunError(new RunExpiredError('run gone'))).toBe( + RUN_ERROR_CODES.RUNTIME_ERROR + ); + }); + + it('classifies TooEarlyError as RUNTIME_ERROR (425 retry-after-not-reached)', () => { + expect( + classifyRunError(new TooEarlyError('too early', { retryAfter: 5 })) + ).toBe(RUN_ERROR_CODES.RUNTIME_ERROR); + }); + + it('classifies ThrottleError as RUNTIME_ERROR (429 rate limited)', () => { + expect( + classifyRunError(new ThrottleError('throttled', { retryAfter: 1 })) + ).toBe(RUN_ERROR_CODES.RUNTIME_ERROR); + }); }); diff --git a/packages/core/src/classify-error.ts b/packages/core/src/classify-error.ts index 6d1da5031a..8c08a798f4 100644 --- a/packages/core/src/classify-error.ts +++ b/packages/core/src/classify-error.ts @@ -1,8 +1,12 @@ import { CorruptedEventLogError, + EntityConflictError, RUN_ERROR_CODES, type RunErrorCode, + RunExpiredError, StepNotRegisteredError, + ThrottleError, + TooEarlyError, WorkflowNotRegisteredError, WorkflowRuntimeError, WorkflowWorldError, @@ -20,11 +24,28 @@ const WORLD_CONTRACT_ERROR_CODES = new Set([ * not enough 
— we have to enumerate every concrete subclass we want to * recognize. Keep in sync with the `WorkflowRuntimeError` class hierarchy * in `@workflow/errors`. + * + * The `WorkflowWorldError` subclasses below (`EntityConflictError`, + * `RunExpiredError`, `TooEarlyError`, `ThrottleError`) are infrastructure- + * level conditions surfaced by the runtime's own calls into the world + * (CAS rejections, run cleanup, retry-after, rate limits). The runtime + * normally retries past them; if they reach `classifyRunError`, the + * runtime's retry budget exhausted — that's a transient infra failure, + * not user code, so `RUNTIME_ERROR` is the truthful classification. + * + * The bare `WorkflowWorldError` parent is *not* in this list: it can + * also surface from user-code `fetch` calls into the workflow API, which + * should remain `USER_ERROR` (see the test case on line ~43 of + * `classify-error.test.ts`). */ const RUNTIME_ERROR_CHECKS = [ WorkflowRuntimeError.is, StepNotRegisteredError.is, WorkflowNotRegisteredError.is, + EntityConflictError.is, + RunExpiredError.is, + TooEarlyError.is, + ThrottleError.is, ]; /** diff --git a/packages/core/src/runtime.ts b/packages/core/src/runtime.ts index 1aa36e32ad..a389f50a63 100644 --- a/packages/core/src/runtime.ts +++ b/packages/core/src/runtime.ts @@ -19,6 +19,7 @@ import { classifyRunError, isWorldContractError } from './classify-error.js'; import { describeError } from './describe-error.js'; import { WorkflowSuspension } from './global.js'; import { runtimeLogger } from './logger.js'; +import { fencedEventCreate } from './runtime/fenced-write.js'; import { MAX_QUEUE_DELIVERIES } from './runtime/constants.js'; import { getQueueOverhead, @@ -796,23 +797,45 @@ export function workflowEntrypoint( }, })); + // The last known event ID is used for optimistic concurrency + // control, being provided to the World, which will reject if any + // other events arrive between our event log read and event write. 
+ // On fence conflict, bail the write — another invocation has + // the canonical view of the log; trying to retry in place + // here only spins against a moving target and (under stress) + // amplifies the stuck-fence pattern caused by the server-side + // patch-then-PUT non-atomicity. See `runtime/fenced-write.ts` + // for the full reasoning. + let fenceEventId: string | undefined = + events.length > 0 + ? events[events.length - 1].eventId + : undefined; for (const waitEvent of waitsToComplete) { - try { - await world.events.create(runId, waitEvent, { - requestId, - }); - } catch (err) { - if (EntityConflictError.is(err)) { - runtimeLogger.info( - 'Wait already completed, skipping', - { - workflowRunId: runId, - correlationId: waitEvent.correlationId, - } - ); - continue; - } - throw err; + const writeResult = await fencedEventCreate({ + world, + runId, + event: waitEvent, + requestId, + fenceEventId, + onEntityConflict: () => 'abort', + }); + if (writeResult.newFenceEventId) { + fenceEventId = writeResult.newFenceEventId; + } + if (!writeResult.written) { + // Either fence conflict (canonical replay + // elsewhere) or the wait was already completed + // by a concurrent invocation. Both terminal + // for this invocation's attempt; continue with + // remaining waitsToComplete using the latest + // fence we have observed. + runtimeLogger.info( + 'Wait completion skipped (fence conflict or already completed)', + { + workflowRunId: runId, + correlationId: waitEvent.correlationId, + } + ); } } @@ -886,22 +909,95 @@ export function workflowEntrypoint( replayMs: Date.now() - replayStart, }); - // Workflow completed + // Workflow completed. 
+ // + // Fence the `run_completed` write against the + // load-time tail of `events`: if a concurrent + // replay with a fresher view has already written + // its own terminal event (run_completed / run_failed), + // CAS rejects this write and we treat the run as + // already-terminal (same shape as the existing + // EntityConflictError / RunExpiredError branches). try { - await world.events.create( + let runCompletedFence = + events.length > 0 + ? events[events.length - 1].eventId + : undefined; + try { + const terminalFenceRefresh = eventsCursor + ? await loadWorkflowRunEvents(runId, eventsCursor) + : await loadWorkflowRunEvents(runId); + if (terminalFenceRefresh.events.length > 0) { + const existingIds = new Set( + events.map((e) => e.eventId) + ); + for (const event of terminalFenceRefresh.events) { + if (!existingIds.has(event.eventId)) { + events.push(event); + } + } + runCompletedFence = + terminalFenceRefresh.events[ + terminalFenceRefresh.events.length - 1 + ].eventId; + } + eventsCursor = + terminalFenceRefresh.cursor ?? eventsCursor; + cachedEvents = events; + } catch (refreshErr) { + runtimeLogger.debug( + 'Failed to refresh terminal fence before completing workflow run', + { + workflowRunId: runId, + message: + refreshErr instanceof Error + ? 
refreshErr.message + : String(refreshErr), + } + ); + } + const terminalEventAfterReplay = events.find( + (e) => + e.eventType === 'run_completed' || + e.eventType === 'run_failed' || + e.eventType === 'run_cancelled' + ); + if (terminalEventAfterReplay) { + runtimeLogger.debug( + 'Run completed by concurrent handler after replay, exiting', + { + workflowRunId: runId, + eventType: terminalEventAfterReplay.eventType, + } + ); + return; + } + const writeResult = await fencedEventCreate({ + world, runId, - { + event: { eventType: 'run_completed', specVersion: SPEC_VERSION_CURRENT, eventData: { output: result }, }, - { requestId } - ); + requestId, + fenceEventId: runCompletedFence, + onEntityConflict: () => 'abort', + }); + if (!writeResult.written) { + // `written: false` covers both fence conflict + // (another invocation has the canonical view) + // and entity-conflict abort (run already + // terminal). Either way the run is done from + // this invocation's perspective. + runtimeLogger.info( + 'Tried completing workflow run, but run has already finished or another invocation is canonical.', + { workflowRunId: runId } + ); + return; + } } catch (err) { - if ( - EntityConflictError.is(err) || - RunExpiredError.is(err) - ) { + if (RunExpiredError.is(err)) { runtimeLogger.info( 'Tried completing workflow run, but run has already finished.', { workflowRunId: runId, message: err.message } @@ -935,14 +1031,30 @@ export function workflowEntrypoint( runtimeLogger.debug(suspensionMessage); } - // V2: handle suspension without queuing steps + // V2: handle suspension without queuing steps. + // + // Pass the load-time tail eventId + cursor so the + // handler can fence its branch-decision writes + // (step_created, hook_created, hook_disposed, + // wait_created) against this snapshot of the log. + // This extends OCC fencing to the other writes whose + // outcome depends on a branch decision the workflow VM + // made from the loaded log. 
const suspensionStart = Date.now(); + // `cachedEvents` mirrors `events` (assigned right + // before runWorkflow above) and is visible in the + // outer catch scope where `events` is not. + const suspensionFenceEventId = + cachedEvents && cachedEvents.length > 0 + ? cachedEvents[cachedEvents.length - 1].eventId + : undefined; const suspensionResult = await handleSuspension({ suspension: err, world, run: workflowRun, span, requestId, + fenceEventId: suspensionFenceEventId, }); runtimeLogger.debug('Suspension handled', { workflowRunId: runId, @@ -993,10 +1105,10 @@ export function workflowEntrypoint( // the step, replay still picks the step because // wait_completed is only created on the *next* loop // iteration, which doesn't run until the step - // finishes. Queueing every step in this case lets - // the wait timeout drive a continuation in parallel, - // matching V1's behavior where each step ran in a - // separate function invocation. + // finishes. Queueing every owned step in this case + // lets the wait timeout drive a continuation in + // parallel, matching V1's behavior where each step + // ran in a separate function invocation. const inlineStep: | (typeof pendingSteps)[number] | undefined = @@ -1004,19 +1116,73 @@ export function workflowEntrypoint( ? ownedPendingSteps[0] : undefined; - // Queue every pending step except the one we're - // executing inline. This mirrors V1's unconditional - // enqueue-with-idempotency pattern and is what makes - // crash recovery work: if a prior handler wrote - // step_created events but crashed before enqueuing, - // a later handler (e.g., from flow-message - // redelivery or reenqueueActiveRuns) will enqueue - // the orphaned steps. In the happy path with a - // single owner, concurrent handlers' queue attempts - // dedupe on correlationId. Skipping the inline step - // avoids a queue handler racing against our own - // inline executor. 
- for (const step of pendingSteps) { + // Queue only steps this handler owns when inline + // execution is possible. A non-owner may have a + // stale replay view where another handler just + // created a step and is about to execute it inline; + // queueing that step here would bypass the ownership + // invariant and can run the step body twice. + // + // When a wait is pending, no handler executes a step + // inline: all step work is queue-driven so the wait + // timeout can race it. In that branch it is safe, and + // important for crash recovery, to enqueue every + // pending step and let queue idempotency dedupe. + // + // Recovery exception: if this workflow queue message + // is itself being redelivered, pick up pre-existing + // step_created / step_retrying events from the loaded + // log that never reached step_started. This covers the + // crash window after a handler writes step_created but + // before it queues the step. + const recoverablePendingStepCorrelationIds = + new Set(); + if (metadata.attempt > 1 && cachedEvents) { + const latestStepEvents = new Map(); + for (const event of cachedEvents) { + if ( + event.correlationId && + (event.eventType === 'step_created' || + event.eventType === 'step_retrying' || + event.eventType === 'step_started' || + event.eventType === 'step_completed' || + event.eventType === 'step_failed') + ) { + latestStepEvents.set( + event.correlationId, + event.eventType + ); + } + } + for (const [ + correlationId, + eventType, + ] of latestStepEvents) { + if ( + eventType === 'step_created' || + eventType === 'step_retrying' + ) { + recoverablePendingStepCorrelationIds.add( + correlationId + ); + } + } + } + + const queueablePendingSteps = + suspensionResult.timeoutSeconds !== undefined + ? 
pendingSteps + : pendingSteps.filter( + (step) => + suspensionResult.createdStepCorrelationIds.has( + step.correlationId + ) || + recoverablePendingStepCorrelationIds.has( + step.correlationId + ) + ); + + for (const step of queueablePendingSteps) { if ( inlineStep && step.correlationId === inlineStep.correlationId @@ -1041,7 +1207,8 @@ export function workflowEntrypoint( } // Nothing to execute inline — we already queued all - // pending steps above, exit and let the queue drive. + // steps this handler can safely dispatch, exit and + // let the queue drive. if (!inlineStep) { if (suspensionResult.timeoutSeconds !== undefined) { return { @@ -1200,10 +1367,79 @@ export function workflowEntrypoint( // Serialize the original thrown value so its full // type identity and custom properties round-trip // through the event log. + // + // Fenced for the same reason as run_completed: + // don't let a stale-view replay paper over a + // concurrent terminal write. try { - await world.events.create( + // `cachedEvents` mirrors the local `events` + // (which lives in the try-block above); see + // the suspension catch for the same trick. + let runFailedEvents = cachedEvents ?? []; + let runFailedFence = + runFailedEvents.length > 0 + ? runFailedEvents[runFailedEvents.length - 1] + .eventId + : undefined; + try { + const terminalFenceRefresh = eventsCursor + ? 
await loadWorkflowRunEvents(runId, eventsCursor) + : await loadWorkflowRunEvents(runId); + if (eventsCursor) { + if (terminalFenceRefresh.events.length > 0) { + const existingIds = new Set( + runFailedEvents.map((e) => e.eventId) + ); + for (const event of terminalFenceRefresh.events) { + if (!existingIds.has(event.eventId)) { + runFailedEvents.push(event); + } + } + } + } else { + runFailedEvents = terminalFenceRefresh.events; + } + if (terminalFenceRefresh.events.length > 0) { + runFailedFence = + terminalFenceRefresh.events[ + terminalFenceRefresh.events.length - 1 + ].eventId; + } + eventsCursor = + terminalFenceRefresh.cursor ?? eventsCursor; + cachedEvents = runFailedEvents; + } catch (refreshErr) { + runtimeLogger.debug( + 'Failed to refresh terminal fence before failing workflow run', + { + workflowRunId: runId, + message: + refreshErr instanceof Error + ? refreshErr.message + : String(refreshErr), + } + ); + } + const terminalEventAfterReplay = runFailedEvents.find( + (e) => + e.eventType === 'run_completed' || + e.eventType === 'run_failed' || + e.eventType === 'run_cancelled' + ); + if (terminalEventAfterReplay) { + runtimeLogger.debug( + 'Run completed by concurrent handler after replay, exiting', + { + workflowRunId: runId, + eventType: terminalEventAfterReplay.eventType, + } + ); + return; + } + const writeResult = await fencedEventCreate({ + world, runId, - { + event: { eventType: 'run_failed', specVersion: SPEC_VERSION_CURRENT, eventData: { @@ -1215,13 +1451,19 @@ export function workflowEntrypoint( errorCode, }, }, - { requestId } - ); + requestId, + fenceEventId: runFailedFence, + onEntityConflict: () => 'abort', + }); + if (!writeResult.written) { + runtimeLogger.info( + 'Tried failing workflow run, but run has already finished or another invocation is canonical.', + { workflowRunId: runId } + ); + return; + } } catch (failErr) { - if ( - EntityConflictError.is(failErr) || - RunExpiredError.is(failErr) - ) { + if (RunExpiredError.is(failErr)) { 
runtimeLogger.info( 'Tried failing workflow run, but run has already finished.', { diff --git a/packages/core/src/runtime/fenced-write.ts b/packages/core/src/runtime/fenced-write.ts new file mode 100644 index 0000000000..ddd7b0eb1c --- /dev/null +++ b/packages/core/src/runtime/fenced-write.ts @@ -0,0 +1,183 @@ +/** + * Helper for "branch-decision" event writes that need OCC fencing. + * + * Used by the writes whose outcome depends on a branch decision the + * workflow VM made from its loaded event log: + * + * suspension-handler.ts: + * - step_created + * - hook_created + * - hook_disposed + * - wait_created + * + * runtime.ts terminal writes: + * - run_completed + * - run_failed + * + * `hook_received` is deliberately NOT fenced: fencing the user's signal + * would drop it on contention; stale-snapshot protection belongs on the + * writes that consume hooks, not the writes that deliver them. + * + * On a fence conflict, this helper **bails the current write** rather + * than retrying in place. A fence conflict means some other invocation + * has already advanced the event log past our snapshot, so the work + * implied by our snapshot is either already done or will be done by + * whoever advanced the log. Retrying in place caused two problems: + * + * 1. Under high contention (e.g. a hook flood firing many concurrent + * ticks), the retry loop became a stuck-fence spin: lambdas would + * spend their budget retrying against an ever-changing tail and + * either succeed by luck or exhaust MAX_FENCE_RETRIES and throw. + * A thrown EntityConflictError surfaces as `run_failed` (a + * transient infra issue mis-classified as terminal failure). + * + * 2. The retries-against-the-same-fence shape, paired with the + * server-side patch-then-PUT non-atomicity, made stuck-fence runs + * measurably worse: every retry attempt against an already-advanced + * fence is wasted compute, and the spin keeps the run stuck + * longer in the affected-runs-per-stress-cycle window. 
+ * + * The simpler model matches Peter's existing workflow-server comment + * ("the @workflow/core suspension handler swallows it"): a fence + * conflict signals "another replay is canonical; this one isn't" — + * exit cleanly, trust the canonical replay, no error, no retry, no + * re-enqueue. + */ +import { EntityConflictError } from '@workflow/errors'; +import type { CreateEventRequest, World } from '@workflow/world'; +import { runtimeLogger } from '../logger.js'; + +/** + * Returns true when an EntityConflictError carries the fence-conflict + * shape. Anything else with a 409/410/etc shape is some other kind of + * conflict (entity already exists, run already terminal, hook token taken) + * that the caller's existing handlers want to keep dealing with. + * + * TODO: switch to a typed error class once the wire format exposes one. + */ +function isFenceConflict(err: unknown): boolean { + return ( + EntityConflictError.is(err) && + typeof err.message === 'string' && + /fence conflict/i.test(err.message) + ); +} + +export interface FencedWriteParams { + world: World; + runId: string; + event: CreateEventRequest; + requestId?: string; + /** + * Caller-provided fence value. Pass `undefined` for the first attempt + * if no fence has been observed yet (e.g. on a fresh resume); the + * helper will then issue an unfenced write — which still atomically + * advances `run.lastKnownEventId` on the server side so future fenced + * writers see the materialized value. + */ + fenceEventId: string | undefined; + /** + * Called when the server rejects with a *non-fence* EntityConflictError + * (e.g. the entity already exists because a concurrent handler beat us + * to it). Returning `'abort'` is the typical answer — the existing + * handlers in suspension-handler / runtime already log + skip. Returning + * `'rethrow'` re-throws so the caller can deal with it. 
+ */ + onEntityConflict: (err: EntityConflictError) => 'abort' | 'rethrow'; +} + +export interface FencedWriteResult { + /** + * Whether the event was actually written. + * + * `false` covers two cases the caller usually wants to treat the same + * way (no further action required for this write): + * - Fence conflict — another invocation has the canonical view. + * - Non-fence EntityConflictError with `onEntityConflict: 'abort'` + * (entity already exists, run already terminal, etc.). + */ + written: boolean; + /** + * eventId of the newly-written event (when `written` is true), so + * the caller can advance its tracked fence value. + */ + newFenceEventId?: string; + /** + * The server's response event, when `written` is true. Allows callers + * to read fields like the resolved eventType (e.g. `hook_conflict` + * instead of `hook_created` when the server detected a token clash). + */ + event?: { eventType: string; eventId: string }; +} + +/** + * Issues `world.events.create(runId, event, { requestId, lastKnownEventId })` + * once. On fence conflict, returns `{ written: false }` immediately + * (no retry, no throw, no re-enqueue) — see the file-level comment for + * the reasoning. + * + * On any non-fence EntityConflictError, defers to `onEntityConflict` for + * the abort-vs-rethrow decision (preserves the existing + * "EntityConflictError → log and skip" behavior for callers that want it). + */ +export async function fencedEventCreate( + params: FencedWriteParams +): Promise<FencedWriteResult> { + const { world, runId, event, requestId, fenceEventId, onEntityConflict } = + params; + try { + const result = await world.events.create(runId, event, { + requestId, + ...(fenceEventId ? { lastKnownEventId: fenceEventId } : {}), + }); + // The server response schema marks `event` as optional for legacy + // compatibility. 
In practice creates always return the persisted + // event, but if it's missing we keep the caller's fence at its + // prior value rather than silently advancing to a value we didn't + // observe on the wire. + if (!result.event) { + runtimeLogger.warn( + 'Branch-decision write missing event in response; keeping prior fence', + { + workflowRunId: runId, + eventType: event.eventType, + correlationId: event.correlationId, + fenceEventId, + } + ); + } + return { + written: true, + newFenceEventId: result.event?.eventId ?? fenceEventId, + event: result.event + ? { + eventType: result.event.eventType, + eventId: result.event.eventId, + } + : undefined, + }; + } catch (err) { + if (isFenceConflict(err)) { + // Another invocation has the canonical view of the event log. + // Bail out cleanly: no retry, no throw. The canonical invocation + // is responsible for whatever progress the workflow needs. + runtimeLogger.info( + 'Branch-decision write fence conflict; yielding to canonical replay', + { + workflowRunId: runId, + eventType: event.eventType, + correlationId: event.correlationId, + } + ); + return { written: false }; + } + if (EntityConflictError.is(err)) { + const decision = onEntityConflict(err); + if (decision === 'abort') { + return { written: false }; + } + throw err; + } + throw err; + } +} diff --git a/packages/core/src/runtime/resume-hook.ts b/packages/core/src/runtime/resume-hook.ts index 0787d4d4e5..c6722f822a 100644 --- a/packages/core/src/runtime/resume-hook.ts +++ b/packages/core/src/runtime/resume-hook.ts @@ -156,7 +156,11 @@ export async function resumeHook( }) ); - // Create a hook_received event with the payload + // Create a hook_received event with the payload + // + // From a concurrency control perspective, this is done unconditionally. + // Any other event creations or invocations use the `lastKnownEventId` + // fence to ensure hook_received being added won't cause ordering issues. 
await world.events.create( hook.runId, { diff --git a/packages/core/src/runtime/suspension-handler.test.ts b/packages/core/src/runtime/suspension-handler.test.ts new file mode 100644 index 0000000000..7f78e16747 --- /dev/null +++ b/packages/core/src/runtime/suspension-handler.test.ts @@ -0,0 +1,143 @@ +import { EntityConflictError } from '@workflow/errors'; +import type { + CreateEventParams, + CreateEventRequest, + Event, + WorkflowRun, + World, +} from '@workflow/world'; +import { describe, expect, it, vi } from 'vitest'; +import { WorkflowSuspension } from '../global.js'; +import { handleSuspension } from './suspension-handler.js'; + +describe('handleSuspension', () => { + it('chains the local event-log fence across writes from the same suspension', async () => { + const runId = 'wrun_test_fence_chain'; + let serverTip = 'evnt_000'; + let sequence = 0; + const writes: Array<{ + eventType: string; + correlationId?: string; + lastKnownEventId?: string; + }> = []; + + const world = { + getEncryptionKeyForRun: vi.fn(async () => undefined), + events: { + create: vi.fn( + async ( + _runId: string, + event: CreateEventRequest, + params?: CreateEventParams + ) => { + if ( + params?.lastKnownEventId !== undefined && + params.lastKnownEventId !== serverTip + ) { + throw new EntityConflictError( + `fence conflict: expected ${params.lastKnownEventId}, current ${serverTip}` + ); + } + + const eventId = `evnt_${String(++sequence).padStart(3, '0')}`; + writes.push({ + eventType: event.eventType, + correlationId: event.correlationId, + lastKnownEventId: params?.lastKnownEventId, + }); + serverTip = eventId; + + return { + event: { + ...event, + eventId, + createdAt: new Date(), + } as Event, + }; + } + ), + }, + streams: { + write: vi.fn(async () => undefined), + close: vi.fn(async () => undefined), + }, + } as unknown as World; + + const suspension = new WorkflowSuspension( + new Map([ + [ + 'hook-user', + { + type: 'hook', + correlationId: 'hook_user', + token: 'user-token', 
+ }, + ], + [ + 'hook-abort', + { + type: 'hook', + correlationId: 'hook_abort', + token: 'abrt_abort', + isSystem: true, + abortRequested: true, + abortReason: 'cancelled', + }, + ], + [ + 'step', + { + type: 'step', + correlationId: 'step_1', + stepName: 'do work', + args: [], + }, + ], + [ + 'wait', + { + type: 'wait', + correlationId: 'wait_1', + resumeAt: new Date(Date.now() + 10_000), + }, + ], + ]), + globalThis + ); + + await handleSuspension({ + suspension, + world, + run: { runId } as WorkflowRun, + fenceEventId: 'evnt_000', + }); + + expect(writes).toEqual([ + { + eventType: 'hook_created', + correlationId: 'hook_user', + lastKnownEventId: 'evnt_000', + }, + { + eventType: 'hook_created', + correlationId: 'hook_abort', + lastKnownEventId: 'evnt_001', + }, + { + eventType: 'hook_received', + correlationId: 'hook_abort', + lastKnownEventId: undefined, + }, + { + eventType: 'step_created', + correlationId: 'step_1', + lastKnownEventId: 'evnt_003', + }, + { + eventType: 'wait_created', + correlationId: 'wait_1', + lastKnownEventId: 'evnt_004', + }, + ]); + }); +}); diff --git a/packages/core/src/runtime/suspension-handler.ts b/packages/core/src/runtime/suspension-handler.ts index 8f084ed446..1be5a8af3a 100644 --- a/packages/core/src/runtime/suspension-handler.ts +++ b/packages/core/src/runtime/suspension-handler.ts @@ -1,5 +1,4 @@ import type { Span } from '@opentelemetry/api'; -import { waitUntil } from '@vercel/functions'; import { EntityConflictError, HookNotFoundError, @@ -23,6 +22,7 @@ import { runtimeLogger } from '../logger.js'; import { dehydrateStepArguments } from '../serialization.js'; import * as Attribute from '../telemetry/semantic-conventions.js'; import { getAbortStreamIdFromToken } from '../util.js'; +import { fencedEventCreate } from './fenced-write.js'; export interface SuspensionHandlerParams { suspension: WorkflowSuspension; @@ -30,6 +30,18 @@ export interface SuspensionHandlerParams { run: WorkflowRun; span?: Span; requestId?: string; + 
/** + * Caller's most recent view of the event log (tail eventId), used as + * the OCC fence on every branch-decision write below. Pass `undefined` + * when the caller has no events loaded yet; the writes will then be + * unfenced (they still atomically advance `run.lastKnownEventId` on the + * server side so future fenced writers chain off the new value). + * + * Conceptually identical to the elapsed-wait scan fence. See + * `fenced-write.ts` for the rationale for extending it to + * step/wait/hook `_created` and `hook_disposed`. + */ + fenceEventId?: string; } /** @@ -61,7 +73,8 @@ export interface SuspensionHandlerResult { * * Processing order: * 1. Hooks are processed first to prevent race conditions with webhook receivers - * 2. Step events and wait events are created in parallel + * 2. Writes that advance the event-log fence are issued in DB-order from this + * replay turn, so the next local write chains off the previous event ID. */ export async function handleSuspension({ suspension, @@ -69,8 +82,17 @@ export async function handleSuspension({ run, span, requestId, + fenceEventId: initialFenceEventId, }: SuspensionHandlerParams): Promise<SuspensionHandlerResult> { const runId = run.runId; + + // Per-suspension shared fence state. Each successful fenced write advances + // `fenceEventId` so subsequent writes from this handler chain off it. + // Branch-decision event writes must not race each other locally: the server + // fence is a single-tip CAS, so parallel sibling writes from one replay turn + // would force self-conflicts against the same starting fence. + let fenceEventId = initialFenceEventId; + // Separate queue items by type const stepItems = suspension.steps.filter( (item): item is StepInvocationQueueItem => item.type === 'step' @@ -119,89 +141,97 @@ export async function handleSuspension({ ); // Process hooks first to prevent race conditions with webhook receivers. - // All hook creations run in parallel.
// Track any hook conflicts that occur — these are returned to the caller // so the V2 handler can re-invoke immediately. let hasHookConflict = false; - if (hookEvents.length > 0) { - await Promise.all( - hookEvents.map(async (hookEvent) => { - try { - const result = await world.events.create(runId, hookEvent, { - requestId, - }); - // Check if the world returned a hook_conflict event instead of hook_created. - // The hook_conflict event is stored in the event log and will be replayed - // on the next workflow invocation, causing the hook's promise to reject. - // Note: hook events always create an event (legacy runs throw, not return undefined) - if (result.event!.eventType === 'hook_conflict') { - hasHookConflict = true; - } - } catch (err) { - if (EntityConflictError.is(err) || RunExpiredError.is(err)) { - runtimeLogger.info( - 'Workflow run already completed, skipping hook', - { - workflowRunId: runId, - message: err.message, - } - ); - } else { - throw err; + for (const hookEvent of hookEvents) { + try { + const writeResult = await fencedEventCreate({ + world, + runId, + event: hookEvent, + requestId, + fenceEventId, + onEntityConflict: () => 'abort', + }); + if (writeResult.newFenceEventId) { + fenceEventId = writeResult.newFenceEventId; + } + if (!writeResult.written) { + // Already created concurrently — surface as info, same shape + // as the pre-fence "EntityConflictError → skip" branch did. + runtimeLogger.info( + 'Workflow run already completed or hook already created, skipping', + { + workflowRunId: runId, + correlationId: hookEvent.correlationId, } - } - }) - ); + ); + continue; + } + // Preserve the "world resolved hook_created → hook_conflict" + // short-circuit. The hook_conflict event lives in the log and + // will be replayed; the immediate signal lets the caller + // re-invoke right away rather than waiting on the queue. 
+ if (writeResult.event?.eventType === 'hook_conflict') { + hasHookConflict = true; + } + } catch (err) { + if (RunExpiredError.is(err)) { + runtimeLogger.info('Workflow run already completed, skipping hook', { + workflowRunId: runId, + message: err.message, + }); + } else { + throw err; + } + } } // Process hook disposals — these release hook tokens for reuse by other workflows. - if (hooksNeedingDisposal.length > 0) { - await Promise.all( - hooksNeedingDisposal.map(async (queueItem) => { - const hookDisposedEvent: CreateEventRequest = { - eventType: 'hook_disposed' as const, - specVersion: SPEC_VERSION_CURRENT, - correlationId: queueItem.correlationId, - eventData: { - token: queueItem.token, - }, - }; - try { - await world.events.create(runId, hookDisposedEvent, { requestId }); - } catch (err) { - if (EntityConflictError.is(err)) { - // Hook was already disposed by a concurrent invocation — safe to skip - runtimeLogger.info( - 'Hook already disposed, skipping duplicate disposal', - { - workflowRunId: runId, - correlationId: queueItem.correlationId, - message: err.message, - } - ); - } else if (RunExpiredError.is(err)) { - runtimeLogger.info( - 'Workflow run already completed, skipping hook disposal', - { - workflowRunId: runId, - correlationId: queueItem.correlationId, - message: err.message, - } - ); - } else if (HookNotFoundError.is(err)) { - // Hook may have already been disposed or never created - runtimeLogger.info('Hook not found for disposal, continuing', { - workflowRunId: runId, - correlationId: queueItem.correlationId, - message: err.message, - }); - } else { - throw err; + for (const queueItem of hooksNeedingDisposal) { + const hookDisposedEvent: CreateEventRequest = { + eventType: 'hook_disposed' as const, + specVersion: SPEC_VERSION_CURRENT, + correlationId: queueItem.correlationId, + eventData: { + token: queueItem.token, + }, + }; + try { + const writeResult = await fencedEventCreate({ + world, + runId, + event: hookDisposedEvent, + requestId, + 
fenceEventId, + onEntityConflict: () => 'abort', + }); + if (writeResult.newFenceEventId) { + fenceEventId = writeResult.newFenceEventId; + } + } catch (err) { + if (RunExpiredError.is(err)) { + runtimeLogger.info( + 'Workflow run already completed, skipping hook disposal', + { + workflowRunId: runId, + correlationId: queueItem.correlationId, + message: err.message, } - } - }) - ); + ); + } else if (HookNotFoundError.is(err)) { + // Hook may have already been disposed or never created + runtimeLogger.info('Hook not found for disposal, continuing', { + workflowRunId: runId, + correlationId: queueItem.correlationId, + message: err.message, + }); + } else { + throw err; + } + } } // Process abort requests — resume the hook with abort payload and write stream packet @@ -209,76 +239,73 @@ export async function handleSuspension({ (item) => item.abortRequested && !item.disposed ); - if (hooksNeedingAbort.length > 0) { - await Promise.all( - hooksNeedingAbort.map(async (queueItem) => { - try { - // Dehydrate the abort payload for storage - const abortPayload = await dehydrateStepArguments( - { aborted: true, reason: queueItem.abortReason }, - runId, - encryptionKey, - suspension.globalThis - ); + for (const queueItem of hooksNeedingAbort) { + try { + // Dehydrate the abort payload for storage + const abortPayload = await dehydrateStepArguments( + { aborted: true, reason: queueItem.abortReason }, + runId, + encryptionKey, + suspension.globalThis + ); - // Create hook_received event with abort payload - await world.events.create(runId, { - eventType: 'hook_received' as const, - specVersion: SPEC_VERSION_CURRENT, - correlationId: queueItem.correlationId, - eventData: { - token: queueItem.token, - payload: abortPayload, - }, - }); + // Create hook_received event with abort payload. This event is + // deliberately unfenced because it represents signal delivery, but + // successful writes still advance the server's run.lastKnownEventId. 
+ const abortEventResult = await world.events.create(runId, { + eventType: 'hook_received' as const, + specVersion: SPEC_VERSION_CURRENT, + correlationId: queueItem.correlationId, + eventData: { + token: queueItem.token, + payload: abortPayload, + }, + }); + if (abortEventResult.event?.eventId) { + fenceEventId = abortEventResult.event.eventId; + } - // Write stream cancellation packet for real-time step propagation. - // Reuse the same dehydrated payload as the hook event so the reason - // round-trips through `dehydrateStepArguments` / `hydrateStepArguments` - // (handles DOMException, custom errors, encryption, etc.) instead of - // bare JSON.stringify which loses type information and drops undefined. - // streamName is set on the queue item at controller construction time - // (see workflow/abort-controller.ts). - try { - const streamName = getAbortStreamIdFromToken(queueItem.token); - await world.streams.write( - runId, - streamName, - abortPayload as Uint8Array - ); - await world.streams.close(runId, streamName); - } catch { - // Best-effort stream write — hook event provides the durable fallback - runtimeLogger.debug( - 'Failed to write abort stream packet, hook event will provide fallback', - { - workflowRunId: runId, - correlationId: queueItem.correlationId, - } - ); - } - } catch (err) { - if (EntityConflictError.is(err) || RunExpiredError.is(err)) { - runtimeLogger.info( - 'Workflow run already completed, skipping abort', - { - workflowRunId: runId, - correlationId: queueItem.correlationId, - message: err.message, - } - ); - } else { - throw err; + // Write stream cancellation packet for real-time step propagation. + // Reuse the same dehydrated payload as the hook event so the reason + // round-trips through `dehydrateStepArguments` / `hydrateStepArguments` + // (handles DOMException, custom errors, encryption, etc.) instead of + // bare JSON.stringify which loses type information and drops undefined. 
+ // streamName is set on the queue item at controller construction time + // (see workflow/abort-controller.ts). + try { + const streamName = getAbortStreamIdFromToken(queueItem.token); + await world.streams.write( + runId, + streamName, + abortPayload as Uint8Array + ); + await world.streams.close(runId, streamName); + } catch { + // Best-effort stream write — hook event provides the durable fallback + runtimeLogger.debug( + 'Failed to write abort stream packet, hook event will provide fallback', + { + workflowRunId: runId, + correlationId: queueItem.correlationId, } - } - }) - ); + ); + } + } catch (err) { + if (EntityConflictError.is(err) || RunExpiredError.is(err)) { + runtimeLogger.info('Workflow run already completed, skipping abort', { + workflowRunId: runId, + correlationId: queueItem.correlationId, + message: err.message, + }); + } else { + throw err; + } + } } // Create step events for steps that don't have them yet. // Unlike V1, we do NOT queue step messages from here — the caller // decides which steps to execute inline vs. queue to background. - // Wait events are also created in parallel below. const stepsNeedingCreation = new Set( stepItems .filter((queueItem) => !queueItem.hasCreatedEvent) @@ -291,90 +318,104 @@ export async function handleSuspension({ // racing with concurrent handlers on step execution. const createdStepCorrelationIds = new Set(); - const ops: Promise[] = []; + const ops: Array<() => Promise> = []; - // Steps: create step_created events (no queuing — V2 returns pending steps to caller) + // Steps: create step_created events (no queuing — V2 returns pending + // steps to caller). + // + // Fenced: under concurrent replay, two invocations with diverging + // event-log snapshots can pick different branch decisions for the same + // deterministic correlationId (e.g. a hook/sleep race that one replay + // saw resolve to "wake" and another to "sleep"). 
The fence rejects the + // stale-view writer so only the authoritative replay's step_created + // lands; the stale invocation drops its write (no retry) and yields to + // the canonical replay. for (const queueItem of stepItems) { if (stepsNeedingCreation.has(queueItem.correlationId)) { - ops.push( - (async () => { - const dehydratedInput = await dehydrateStepArguments( + ops.push(async () => { + const dehydratedInput = await dehydrateStepArguments( + { + args: queueItem.args, + closureVars: queueItem.closureVars, + thisVal: queueItem.thisVal, + }, + runId, + encryptionKey, + suspension.globalThis + ); + const stepEvent: CreateEventRequest = { + eventType: 'step_created' as const, + specVersion: SPEC_VERSION_CURRENT, + correlationId: queueItem.correlationId, + eventData: { + stepName: queueItem.stepName, + input: dehydratedInput as SerializedData, + }, + }; + const writeResult = await fencedEventCreate({ + world, + runId, + event: stepEvent, + requestId, + fenceEventId, + onEntityConflict: () => 'abort', + }); + if (writeResult.newFenceEventId) { + fenceEventId = writeResult.newFenceEventId; + } + if (writeResult.written) { + createdStepCorrelationIds.add(queueItem.correlationId); + } else { + runtimeLogger.info( + 'Step already exists (post-fence-conflict), continuing', { - args: queueItem.args, - closureVars: queueItem.closureVars, - thisVal: queueItem.thisVal, - }, - runId, - encryptionKey, - suspension.globalThis - ); - const stepEvent: CreateEventRequest = { - eventType: 'step_created' as const, - specVersion: SPEC_VERSION_CURRENT, - correlationId: queueItem.correlationId, - eventData: { - stepName: queueItem.stepName, - input: dehydratedInput as SerializedData, - }, - }; - try { - await world.events.create(runId, stepEvent, { requestId }); - createdStepCorrelationIds.add(queueItem.correlationId); - } catch (err) { - if (EntityConflictError.is(err)) { - runtimeLogger.info('Step already exists, continuing', { - workflowRunId: runId, - correlationId: queueItem.correlationId, - message:
err.message, - }); - } else { - throw err; + workflowRunId: runId, + correlationId: queueItem.correlationId, } - } - })() - ); + ); + } + }); } } - // Create wait events (same as V1) + // Create wait events. Same fencing rationale as `step_created`: a + // stale-view replay can otherwise call `sleep(...)` on a code path + // that the authoritative replay doesn't take, landing a `wait_created` + // that future replays will see as an orphan. for (const queueItem of waitItems) { if (!queueItem.hasCreatedEvent) { - ops.push( - (async () => { - const waitEvent: CreateEventRequest = { - eventType: 'wait_created' as const, - specVersion: SPEC_VERSION_CURRENT, + ops.push(async () => { + const waitEvent: CreateEventRequest = { + eventType: 'wait_created' as const, + specVersion: SPEC_VERSION_CURRENT, + correlationId: queueItem.correlationId, + eventData: { + resumeAt: queueItem.resumeAt, + }, + }; + const writeResult = await fencedEventCreate({ + world, + runId, + event: waitEvent, + requestId, + fenceEventId, + onEntityConflict: () => 'abort', + }); + if (writeResult.newFenceEventId) { + fenceEventId = writeResult.newFenceEventId; + } + if (!writeResult.written) { + runtimeLogger.info('Wait already exists, continuing', { + workflowRunId: runId, correlationId: queueItem.correlationId, - eventData: { - resumeAt: queueItem.resumeAt, - }, - }; - try { - await world.events.create(runId, waitEvent, { requestId }); - } catch (err) { - if (EntityConflictError.is(err)) { - runtimeLogger.info('Wait already exists, continuing', { - workflowRunId: runId, - correlationId: queueItem.correlationId, - message: err.message, - }); - } else { - throw err; - } - } - })() - ); + }); + } + }); } } - waitUntil( - Promise.all(ops).catch((opErr) => { - const isAbortError = - opErr?.name === 'AbortError' || opErr?.name === 'ResponseAborted'; - if (!isAbortError) throw opErr; - }) - ); - await Promise.all(ops); + for (const op of ops) { + await op(); + } // Calculate minimum timeout from waits const 
now = Date.now(); diff --git a/packages/core/src/step.ts b/packages/core/src/step.ts index c9e1c6c5d1..8d02fd89a3 100644 --- a/packages/core/src/step.ts +++ b/packages/core/src/step.ts @@ -50,6 +50,7 @@ export function createUseStep(ctx: WorkflowOrchestratorContext) { stepName, args, }); + ctx.eventsConsumer.subscribe((event) => { if (!event) { // We've reached the end of the events, so this step has either not been run or is currently running. diff --git a/packages/core/src/workflow/hook.ts b/packages/core/src/workflow/hook.ts index 1cbc7bd39d..cb6d699e43 100644 --- a/packages/core/src/workflow/hook.ts +++ b/packages/core/src/workflow/hook.ts @@ -44,6 +44,7 @@ export function createCreateHook(ctx: WorkflowOrchestratorContext) { let conflictErrorRef: HookConflictError | null = null; webhookLogger.debug('Hook consumer setup', { correlationId, token }); + ctx.eventsConsumer.subscribe((event) => { // If there are no events and there are promises waiting, // it means the hook has been awaited, but an incoming payload has not yet been received. 
diff --git a/packages/world-local/src/queue.test.ts b/packages/world-local/src/queue.test.ts index 3874f63648..cd0a6e8da2 100644 --- a/packages/world-local/src/queue.test.ts +++ b/packages/world-local/src/queue.test.ts @@ -169,6 +169,37 @@ describe('queue timeout re-enqueue', () => { expect(mockSetTimeout).not.toHaveBeenCalled(); }); + it('dedupes idempotency keys after the handler completes', async () => { + let callCount = 0; + const handler = localQueue.createQueueHandler('__wkf_step_', async () => { + callCount++; + return undefined; + }); + + localQueue.registerHandler('__wkf_step_', handler); + + const first = await localQueue.queue( + '__wkf_step_test' as any, + stepPayload, + { idempotencyKey: stepPayload.stepId } + ); + + await vi.waitFor(() => { + expect(callCount).toBe(1); + }); + + const second = await localQueue.queue( + '__wkf_step_test' as any, + stepPayload, + { idempotencyKey: stepPayload.stepId } + ); + + await new Promise((resolve) => setImmediate(resolve)); + + expect(second.messageId).toBe(first.messageId); + expect(callCount).toBe(1); + }); + it('logs actionable guidance for detached ArrayBuffer proxy failures', async () => { const consoleError = vi .spyOn(console, 'error') diff --git a/packages/world-local/src/queue.ts b/packages/world-local/src/queue.ts index 7ac0277dad..96dcf64b8e 100644 --- a/packages/world-local/src/queue.ts +++ b/packages/world-local/src/queue.ts @@ -54,6 +54,8 @@ const WORKFLOW_LOCAL_QUEUE_CONCURRENCY = parseInt(process.env.WORKFLOW_LOCAL_QUEUE_CONCURRENCY ?? '0', 10) || DEFAULT_CONCURRENCY_LIMIT; +const COMPLETED_IDEMPOTENCY_CACHE_LIMIT = 10_000; + export type DirectHandler = (req: Request) => Promise; export type LocalQueue = Queue & { @@ -123,9 +125,21 @@ export function createQueue(config: Partial): LocalQueue { * that we don't queue the same message multiple times */ const inflightMessages = new Map(); + const completedMessages = new Map(); /** Direct in-process handlers by queue prefix, bypassing HTTP when set. 
*/ const directHandlers = new Map(); + function markMessageCompleted(idempotencyKey: string, messageId: MessageId) { + completedMessages.delete(idempotencyKey); + completedMessages.set(idempotencyKey, messageId); + if (completedMessages.size > COMPLETED_IDEMPOTENCY_CACHE_LIMIT) { + const oldestKey = completedMessages.keys().next().value; + if (oldestKey) { + completedMessages.delete(oldestKey); + } + } + } + const queue: Queue['queue'] = async (queueName, message, opts) => { const cleanup = [] as (() => void)[]; @@ -134,6 +148,11 @@ export function createQueue(config: Partial): LocalQueue { if (existing) { return { messageId: existing }; } + + const completed = completedMessages.get(opts.idempotencyKey); + if (completed) { + return { messageId: completed }; + } } const body = transport.serialize(message); @@ -224,6 +243,9 @@ export function createQueue(config: Partial): LocalQueue { continue; } } catch {} + if (opts?.idempotencyKey) { + markMessageCompleted(opts.idempotencyKey, messageId); + } return; } diff --git a/packages/world-testing/src/inline-batches-debug.mts b/packages/world-testing/src/inline-batches-debug.mts index bf7a356cb3..32c72440c9 100644 --- a/packages/world-testing/src/inline-batches-debug.mts +++ b/packages/world-testing/src/inline-batches-debug.mts @@ -1,5 +1,5 @@ -import { expect, test, vi } from 'vitest'; import { hydrateWorkflowReturnValue } from '@workflow/core/serialization'; +import { expect, test, vi } from 'vitest'; import { createFetcher, startServer } from './util.mjs'; /** @@ -258,6 +258,9 @@ export function inlineBatchesDebug(world: string) { // Sanity: the run did complete. Everything else is diagnostic. expect(run.status).toBe('completed'); + expect(stepsSkippedAlreadyDone).toBe(0); + expect(stepCompletedRaces).toBe(0); + expect(unconsumedEventSkips).toBe(0); // Hard assertion: no step should have hit max retries for this workflow. 
expect(maxDeliveries).toBe(0); } diff --git a/packages/world-vercel/src/events.ts b/packages/world-vercel/src/events.ts index 433cedcc7c..b76e7e2271 100644 --- a/packages/world-vercel/src/events.ts +++ b/packages/world-vercel/src/events.ts @@ -457,6 +457,9 @@ async function createWorkflowRunEventInner( ...data, remoteRefBehavior, ...(params?.requestId ? { vercelId: params.requestId } : {}), + ...(params?.lastKnownEventId + ? { lastKnownEventId: params.lastKnownEventId } + : {}), }, config, schema: EventResultResolveWireSchema, @@ -482,6 +485,9 @@ async function createWorkflowRunEventInner( ...data, remoteRefBehavior, ...(params?.requestId ? { vercelId: params.requestId } : {}), + ...(params?.lastKnownEventId + ? { lastKnownEventId: params.lastKnownEventId } + : {}), }, config, schema: EventResultLazyWireSchema, diff --git a/packages/world-vercel/src/utils.ts b/packages/world-vercel/src/utils.ts index 8acefbaa69..eb1f1994b4 100644 --- a/packages/world-vercel/src/utils.ts +++ b/packages/world-vercel/src/utils.ts @@ -383,6 +383,20 @@ export async function makeRequest({ if (response.status === 409) { throwWithTrace(new EntityConflictError(defaultMessage)); } + if (response.status === 412) { + // OCC fence conflict. Surface as EntityConflictError so the + // existing conflict branching downstream applies, and ensure the + // message carries a "fence conflict" marker even when the + // response body didn't parse. The `isFenceConflict` check in + // runtime/fenced-write.ts dispatches off that marker, and + // upstream error pages (CDN HTML, gateway timeouts, etc.) + // shouldn't be allowed to demote a fence conflict to a generic + // EntityConflictError that callers treat as terminal. + const fenceMessage = /fence conflict/i.test(defaultMessage) + ?
defaultMessage + : `fence conflict: ${defaultMessage}`; + throwWithTrace(new EntityConflictError(fenceMessage)); + } if (response.status === 410) { throwWithTrace(new RunExpiredError(defaultMessage)); } diff --git a/packages/world/src/events.ts b/packages/world/src/events.ts index a541ac0563..dab8ba89d7 100644 --- a/packages/world/src/events.ts +++ b/packages/world/src/events.ts @@ -391,6 +391,13 @@ export interface CreateEventParams { resolveData?: ResolveData; /** Request ID (x-vercel-id when on Vercel) for correlating request logs with workflow events. */ requestId?: string; + /** + * Optimistic concurrency control fence: when set, the event write is + * rejected with a conflict unless the run's materialized `lastKnownEventId` + * equals this value. Lets the runtime stop an invocation operating on a stale + * event log snapshot from advancing the log. + */ + lastKnownEventId?: string; } /**