Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions .changeset/chilly-fences-chain.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"@workflow/core": patch
---

Chain replay-created events and owner-scope inline-capable step dispatch to avoid stale or duplicate replay writes.
7 changes: 7 additions & 0 deletions .changeset/event-write-occ-fence.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
---
"@workflow/core": patch
"@workflow/world": patch
"@workflow/world-vercel": patch
---

Add optional `lastKnownEventId` param to `events.create`, which the World can use to do optimistic concurrency control fencing on branch-decision event writes. Fence conflict surfaces as `EntityConflictError`, which the runtime treats as a signal that another invocation has the canonical view of the event log: the current write is dropped (no retry, no `run_failed`) and the canonical invocation is left to make progress.
5 changes: 5 additions & 0 deletions .changeset/quick-local-queues.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"@workflow/world-local": patch
---

Preserve completed local queue idempotency keys to avoid duplicate step dispatches.
4 changes: 2 additions & 2 deletions docs/content/docs/v4/changelog/eager-processing.mdx
Original file line number Diff line number Diff line change
Expand Up @@ -115,9 +115,9 @@ The design enforces a simple invariant: **exactly one handler owns each step, an
1. **Atomic `step_created`** --- the world's `events.create('step_created', correlationId=X)` is serialized per-correlationId. Exactly one concurrent caller succeeds; the rest receive `EntityConflictError`. In production worlds (Postgres, Vercel) this is enforced at the SQL/DB layer. In `world-local`, a per-step in-process async mutex in `packages/world-local/src/storage/events-storage.ts` wraps every step lifecycle event's check-and-write so the same guarantee holds for dev.
2. **Suspension handler reports ownership** --- `handleSuspension()` returns `createdStepCorrelationIds: Set<string>`, populated only for `step_created` writes that actually succeeded (not those that caught 409).
3. **Inline execution is gated on ownership** --- the runtime loop in `packages/core/src/runtime.ts` picks its inline step from `pendingSteps.filter(s => createdStepCorrelationIds.has(s.correlationId))`. A handler that didn't win any `step_created` race performs no inline execution.
4. **Queueing is unconditional** --- for every pending step except the one being inline-executed, the handler enqueues a background step message with `idempotencyKey: correlationId`. This matches V1's enqueue pattern and is what makes crash recovery work: if a prior handler wrote `step_created` but crashed before enqueueing, a later handler (from flow-message redelivery or `reenqueueActiveRuns`) will enqueue the orphaned step. Concurrent handlers' redundant enqueues dedupe on the idempotency key.
4. **Queueing is owner-scoped when inline execution is possible** --- for every owned pending step except the one being inline-executed, the handler enqueues a background step message with `idempotencyKey: correlationId`. A non-owner does not queue steps it merely observed from a stale replay, so it cannot race the owner's inline execution. When a wait is also pending, no handler executes steps inline, so every pending step is queueable and queue idempotency handles duplicates. If the workflow queue message itself is redelivered, the handler also recovers pre-existing `step_created` / `step_retrying` entries that have not reached `step_started`, covering the crash window after `step_created` but before queue dispatch.

Together these give: every `step_created` event has exactly one inline executor (possibly zero if the inline path was skipped due to crash) **and** at least one queued dispatch (from whichever handler first reaches the suspension path after the `step_created` is visible). Step bodies are never executed concurrently, and `step_started` events never land in the log after `step_completed` for the same step. Event-log replay sees clean subscriber-matched sequences. With this invariant in place, the earlier `onUnconsumedEvent` skip logic for step/hook/wait lifecycle events was removed — any unconsumed event now immediately fatals as a corrupted event log (its original purpose before the V2 work).
Together these give: every `step_created` event has exactly one inline executor (possibly zero if the inline path was skipped due to crash) **and** every non-inline owned step gets a queued dispatch from the owner. Step bodies are never executed concurrently, and `step_started` events never land in the log after `step_completed` for the same step. Event-log replay sees clean subscriber-matched sequences. With this invariant in place, the earlier `onUnconsumedEvent` skip logic for step/hook/wait lifecycle events was removed — any unconsumed event now immediately fatals as a corrupted event log (its original purpose before the V2 work).

**Retry semantics are preserved**: the per-step mutex in `world-local` only rejects `step_started` when the step is already in a *terminal* state (`completed` / `failed`). A step that is currently running (status=`running`) still accepts a second `step_started` write with an incremented attempt counter — this is how queue redelivery after a SIGKILL mid-execution legitimately re-runs the step. The previously-documented "attempt counter inflation" failure mode is therefore no longer reachable via the concurrent-inline path; see "Concurrent `step_started` Inflating Attempt Counter" below for the complementary executor-side guard that still catches edge cases (e.g., postgres retries under high contention).

Expand Down
4 changes: 2 additions & 2 deletions docs/content/docs/v5/changelog/eager-processing.mdx
Original file line number Diff line number Diff line change
Expand Up @@ -115,9 +115,9 @@ The design enforces a simple invariant: **exactly one handler owns each step, an
1. **Atomic `step_created`** --- the world's `events.create('step_created', correlationId=X)` is serialized per-correlationId. Exactly one concurrent caller succeeds; the rest receive `EntityConflictError`. In production worlds (Postgres, Vercel) this is enforced at the SQL/DB layer. In `world-local`, a per-step in-process async mutex in `packages/world-local/src/storage/events-storage.ts` wraps every step lifecycle event's check-and-write so the same guarantee holds for dev.
2. **Suspension handler reports ownership** --- `handleSuspension()` returns `createdStepCorrelationIds: Set<string>`, populated only for `step_created` writes that actually succeeded (not those that caught 409).
3. **Inline execution is gated on ownership** --- the runtime loop in `packages/core/src/runtime.ts` picks its inline step from `pendingSteps.filter(s => createdStepCorrelationIds.has(s.correlationId))`. A handler that didn't win any `step_created` race performs no inline execution.
4. **Queueing is unconditional** --- for every pending step except the one being inline-executed, the handler enqueues a background step message with `idempotencyKey: correlationId`. This matches V1's enqueue pattern and is what makes crash recovery work: if a prior handler wrote `step_created` but crashed before enqueueing, a later handler (from flow-message redelivery or `reenqueueActiveRuns`) will enqueue the orphaned step. Concurrent handlers' redundant enqueues dedupe on the idempotency key.
4. **Queueing is owner-scoped when inline execution is possible** --- for every owned pending step except the one being inline-executed, the handler enqueues a background step message with `idempotencyKey: correlationId`. A non-owner does not queue steps it merely observed from a stale replay, so it cannot race the owner's inline execution. When a wait is also pending, no handler executes steps inline, so every pending step is queueable and queue idempotency handles duplicates. If the workflow queue message itself is redelivered, the handler also recovers pre-existing `step_created` / `step_retrying` entries that have not reached `step_started`, covering the crash window after `step_created` but before queue dispatch.

Together these give: every `step_created` event has exactly one inline executor (possibly zero if the inline path was skipped due to crash) **and** at least one queued dispatch (from whichever handler first reaches the suspension path after the `step_created` is visible). Step bodies are never executed concurrently, and `step_started` events never land in the log after `step_completed` for the same step. Event-log replay sees clean subscriber-matched sequences. With this invariant in place, the earlier `onUnconsumedEvent` skip logic for step/hook/wait lifecycle events was removed — any unconsumed event now immediately fatals as a corrupted event log (its original purpose before the V2 work).
Together these give: every `step_created` event has exactly one inline executor (possibly zero if the inline path was skipped due to crash) **and** every non-inline owned step gets a queued dispatch from the owner. Step bodies are never executed concurrently, and `step_started` events never land in the log after `step_completed` for the same step. Event-log replay sees clean subscriber-matched sequences. With this invariant in place, the earlier `onUnconsumedEvent` skip logic for step/hook/wait lifecycle events was removed — any unconsumed event now immediately fatals as a corrupted event log (its original purpose before the V2 work).

**Retry semantics are preserved**: the per-step mutex in `world-local` only rejects `step_started` when the step is already in a *terminal* state (`completed` / `failed`). A step that is currently running (status=`running`) still accepts a second `step_started` write with an incremented attempt counter — this is how queue redelivery after a SIGKILL mid-execution legitimately re-runs the step. The previously-documented "attempt counter inflation" failure mode is therefore no longer reachable via the concurrent-inline path; see "Concurrent `step_started` Inflating Attempt Counter" below for the complementary executor-side guard that still catches edge cases (e.g., postgres retries under high contention).

Expand Down
32 changes: 32 additions & 0 deletions packages/core/src/classify-error.test.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,11 @@
import {
CorruptedEventLogError,
EntityConflictError,
HookConflictError,
RUN_ERROR_CODES,
RunExpiredError,
ThrottleError,
TooEarlyError,
WorkflowNotRegisteredError,
WorkflowRuntimeError,
WorkflowWorldError,
Expand Down Expand Up @@ -87,4 +91,32 @@ describe('classifyRunError', () => {
RUN_ERROR_CODES.USER_ERROR
);
});

it('classifies EntityConflictError as RUNTIME_ERROR (fence-conflict / 409 from runtime CAS write)', () => {
expect(
classifyRunError(
new EntityConflictError(
'fence conflict: Stale request: event-log fence conflict'
)
)
).toBe(RUN_ERROR_CODES.RUNTIME_ERROR);
});

it('classifies RunExpiredError as RUNTIME_ERROR (410 on runtime operation)', () => {
expect(classifyRunError(new RunExpiredError('run gone'))).toBe(
RUN_ERROR_CODES.RUNTIME_ERROR
);
});

it('classifies TooEarlyError as RUNTIME_ERROR (425 retry-after-not-reached)', () => {
expect(
classifyRunError(new TooEarlyError('too early', { retryAfter: 5 }))
).toBe(RUN_ERROR_CODES.RUNTIME_ERROR);
});

it('classifies ThrottleError as RUNTIME_ERROR (429 rate limited)', () => {
expect(
classifyRunError(new ThrottleError('throttled', { retryAfter: 1 }))
).toBe(RUN_ERROR_CODES.RUNTIME_ERROR);
});
});
21 changes: 21 additions & 0 deletions packages/core/src/classify-error.ts
Original file line number Diff line number Diff line change
@@ -1,8 +1,12 @@
import {
CorruptedEventLogError,
EntityConflictError,
RUN_ERROR_CODES,
type RunErrorCode,
RunExpiredError,
StepNotRegisteredError,
ThrottleError,
TooEarlyError,
WorkflowNotRegisteredError,
WorkflowRuntimeError,
WorkflowWorldError,
Expand All @@ -20,11 +24,28 @@ const WORLD_CONTRACT_ERROR_CODES = new Set([
* not enough — we have to enumerate every concrete subclass we want to
* recognize. Keep in sync with the `WorkflowRuntimeError` class hierarchy
* in `@workflow/errors`.
*
* The `WorkflowWorldError` subclasses below (`EntityConflictError`,
* `RunExpiredError`, `TooEarlyError`, `ThrottleError`) are infrastructure-
* level conditions surfaced by the runtime's own calls into the world
* (CAS rejections, run cleanup, retry-after, rate limits). The runtime
* normally retries past them; if they reach `classifyRunError`, the
* runtime's retry budget exhausted — that's a transient infra failure,
* not user code, so `RUNTIME_ERROR` is the truthful classification.
*
* The bare `WorkflowWorldError` parent is *not* in this list: it can
* also surface from user-code `fetch` calls into the workflow API, which
* should remain `USER_ERROR` (see the test case on line ~43 of
* `classify-error.test.ts`).
*/
const RUNTIME_ERROR_CHECKS = [
WorkflowRuntimeError.is,
StepNotRegisteredError.is,
WorkflowNotRegisteredError.is,
EntityConflictError.is,
RunExpiredError.is,
TooEarlyError.is,
ThrottleError.is,
];

/**
Expand Down
Loading
Loading