diff --git a/.changeset/ready-pens-hug.md b/.changeset/ready-pens-hug.md
new file mode 100644
index 0000000000..a845151cc8
--- /dev/null
+++ b/.changeset/ready-pens-hug.md
@@ -0,0 +1,2 @@
+---
+---
diff --git a/.github/scripts/render-event-log-race-repro-results.js b/.github/scripts/render-event-log-race-repro-results.js
new file mode 100644
index 0000000000..cf0922833a
--- /dev/null
+++ b/.github/scripts/render-event-log-race-repro-results.js
@@ -0,0 +1,405 @@
+#!/usr/bin/env node
+
+const fs = require('node:fs');
+
+const args = process.argv.slice(2);
+let resultsPath = 'event-log-race-repro-results.json';
+let runUrl = '';
+let previousCommentPath = '';
+let timestamp = new Date().toISOString();
+let runAttempt = '';
+let check = false;
+
+for (let index = 0; index < args.length; index += 1) {
+ const arg = args[index];
+ if (arg === '--run-url' && args[index + 1]) {
+ runUrl = args[index + 1];
+ index += 1;
+ } else if (arg === '--previous-comment' && args[index + 1]) {
+ previousCommentPath = args[index + 1];
+ index += 1;
+ } else if (arg === '--timestamp' && args[index + 1]) {
+ timestamp = args[index + 1];
+ index += 1;
+ } else if (arg === '--run-attempt' && args[index + 1]) {
+ runAttempt = args[index + 1];
+ index += 1;
+ } else if (arg === '--check') {
+ check = true;
+ } else if (!arg.startsWith('--')) {
+ resultsPath = arg;
+ }
+}
+
+const historyMarkerStart = '';
+
+// Every outcome bucket a run can be counted under, in the order used for the
+// rows/columns of the rendered tables. emptyDistribution() creates one zeroed
+// counter per entry, and every bucket except 'completed' counts as a failure
+// in nonCompletedCount().
+const orderedOutcomes = [
+ 'completed',
+ 'CORRUPTED_EVENT_LOG',
+ 'USER_ERROR',
+ 'RUNTIME_ERROR',
+ 'stuck',
+ 'other',
+];
+
+function emptyDistribution() {
+ return Object.fromEntries(orderedOutcomes.map((outcome) => [outcome, 0]));
+}
+
+function loadResults() {
+ if (!fs.existsSync(resultsPath)) {
+ return null;
+ }
+ return JSON.parse(fs.readFileSync(resultsPath, 'utf8'));
+}
+
+function loadPreviousComment() {
+ if (!previousCommentPath || !fs.existsSync(previousCommentPath)) {
+ return '';
+ }
+ return fs.readFileSync(previousCommentPath, 'utf8');
+}
+
+function loadHistory(previousComment) {
+ if (!previousComment) {
+ return [];
+ }
+
+ const historyPattern = new RegExp(
+ `${historyMarkerStart}\\n([\\s\\S]*?)\\n${historyMarkerEnd}`
+ );
+ const match = previousComment.match(historyPattern);
+ if (!match) {
+ return [];
+ }
+
+ try {
+ const history = JSON.parse(match[1]);
+ return Array.isArray(history) ? history : [];
+ } catch {
+ return [];
+ }
+}
+
+function summarize(results) {
+ const distribution = emptyDistribution();
+ for (const result of results) {
+ distribution[result.outcome] = (distribution[result.outcome] ?? 0) + 1;
+ }
+ return distribution;
+}
+
+function summarizeByScenario(results) {
+ const byScenario = {};
+ for (const result of results) {
+ const scenario = result.scenario ?? 'unknown';
+ byScenario[scenario] ??= emptyDistribution();
+ byScenario[scenario][result.outcome] =
+ (byScenario[scenario][result.outcome] ?? 0) + 1;
+ }
+ return byScenario;
+}
+
+function nonCompletedCount(distribution) {
+ return orderedOutcomes
+ .filter((outcome) => outcome !== 'completed')
+ .reduce((sum, outcome) => sum + (distribution[outcome] ?? 0), 0);
+}
+
+function compactTimestamp(value) {
+ const parsed = Date.parse(value);
+ if (Number.isNaN(parsed)) {
+ return value;
+ }
+ return `${new Date(parsed).toISOString().replace('T', ' ').slice(0, 16)} UTC`;
+}
+
+function markdownLink(label, href) {
+ return href ? `[${label}](${href})` : label;
+}
+
+function renderRunHeader(entry) {
+ const links = [
+ entry.runUrl ? markdownLink('logs', entry.runUrl) : '',
+ entry.deploymentUrl ? markdownLink('deploy', entry.deploymentUrl) : '',
+ ].filter(Boolean);
+ const attemptSuffix = entry.runAttempt ? ` #${entry.runAttempt}` : '';
+ return `${compactTimestamp(entry.timestamp)}${attemptSuffix}
${links.join(' / ')}`;
+}
+
+function renderCount(value) {
+ return String(value ?? 0);
+}
+
+function renderResult(entry) {
+ if (entry.missingResults) {
+ return 'missing result file';
+ }
+ return entry.failedCount === 0
+ ? 'all completed'
+ : `${entry.failedCount}/${entry.total} non-completed`;
+}
+
+function renderConfig(entry) {
+ const config = entry.config ?? {};
+ const attempts = config.attempts ? `${config.attempts} runs` : '';
+ const scenarios = [
+ config.hookSleepAttempts ? `hook ${config.hookSleepAttempts}` : '',
+ config.stepFanoutAttempts ? `fanout ${config.stepFanoutAttempts}` : '',
+ config.stepSleepRaceAttempts ? `race ${config.stepSleepRaceAttempts}` : '',
+ ]
+ .filter(Boolean)
+ .join(', ');
+ const concurrency = config.concurrency ? `c${config.concurrency}` : '';
+ const stepConcurrency = config.stepConcurrency
+ ? `step c${config.stepConcurrency}`
+ : '';
+ const iterations = config.iterations ? `${config.iterations} iters` : '';
+ return [attempts, scenarios, concurrency, stepConcurrency, iterations]
+ .filter(Boolean)
+ .join(' / ');
+}
+
+function renderTiming(entry) {
+ const config = entry.config ?? {};
+ return [
+ config.sleepMs ? `sleep ${config.sleepMs}ms` : '',
+ config.resumeDelayMs || config.resumeJitterMs
+ ? `resume ${config.resumeDelayMs ?? 0}+${config.resumeJitterMs ?? 0}ms`
+ : '',
+ config.runTimeoutMs ? `timeout ${config.runTimeoutMs}ms` : '',
+ ]
+ .filter(Boolean)
+ .join(' / ');
+}
+
+function compactConfig(config = {}) {
+ return {
+ attempts: config.attempts,
+ hookSleepAttempts: config.hookSleepAttempts,
+ stepFanoutAttempts: config.stepFanoutAttempts,
+ stepSleepRaceAttempts: config.stepSleepRaceAttempts,
+ concurrency: config.concurrency,
+ stepConcurrency: config.stepConcurrency,
+ iterations: config.iterations,
+ sleepMs: config.sleepMs,
+ resumeDelayMs: config.resumeDelayMs,
+ resumeJitterMs: config.resumeJitterMs,
+ runTimeoutMs: config.runTimeoutMs,
+ stepFanoutRounds: config.stepFanoutRounds,
+ stepFanoutWidth: config.stepFanoutWidth,
+ stepRaceRounds: config.stepRaceRounds,
+ };
+}
+
+function compactHistoryEntry(entry, keepFailures = false) {
+ return {
+ timestamp: entry.timestamp,
+ runAttempt: entry.runAttempt,
+ runUrl: entry.runUrl,
+ deploymentUrl: entry.deploymentUrl,
+ missingResults: entry.missingResults,
+ distribution: entry.distribution ?? emptyDistribution(),
+ scenarioDistribution: entry.scenarioDistribution ?? {},
+ failedCount: entry.failedCount ?? 0,
+ total: entry.total ?? 0,
+ config: compactConfig(entry.config),
+ failing: keepFailures ? (entry.failing ?? []) : [],
+ truncatedFailingCount: keepFailures
+ ? (entry.truncatedFailingCount ?? 0)
+ : 0,
+ };
+}
+
+function buildEntry(resultsFile) {
+ if (!resultsFile) {
+ return {
+ timestamp,
+ runAttempt,
+ runUrl,
+ deploymentUrl: '',
+ missingResults: true,
+ distribution: emptyDistribution(),
+ failedCount: 1,
+ total: 0,
+ config: {},
+ scenarioDistribution: {},
+ failing: [],
+ };
+ }
+
+ const results = resultsFile.results ?? [];
+ const distribution = resultsFile.distribution ?? summarize(results);
+ const scenarioDistribution =
+ resultsFile.scenarioDistribution ?? summarizeByScenario(results);
+ const failedCount = nonCompletedCount(distribution);
+ const total = orderedOutcomes.reduce(
+ (sum, outcome) => sum + (distribution[outcome] ?? 0),
+ 0
+ );
+ const failing = results
+ .filter((result) => result.outcome !== 'completed')
+ .slice(0, 20)
+ .map((result) => ({
+ attempt: result.attempt,
+ scenario: result.scenario,
+ outcome: result.outcome,
+ status: result.status,
+ errorCode: result.errorCode,
+ runId: result.runId,
+ dashboardUrl: result.dashboardUrl,
+ }));
+
+ return {
+ timestamp,
+ runAttempt,
+ runUrl,
+ deploymentUrl: resultsFile.deploymentUrl,
+ missingResults: false,
+ distribution,
+ scenarioDistribution,
+ failedCount,
+ total,
+ config: compactConfig(resultsFile.config),
+ failing,
+ truncatedFailingCount: Math.max(
+ 0,
+ results.filter((result) => result.outcome !== 'completed').length -
+ failing.length
+ ),
+ };
+}
+
+function appendHistory(history, entry) {
+ const key = `${entry.runUrl || entry.timestamp}#${entry.runAttempt}`;
+ const nextHistory = history
+ .filter(
+ (historyEntry) =>
+ `${historyEntry.runUrl || historyEntry.timestamp}#${historyEntry.runAttempt}` !==
+ key
+ )
+ .map((historyEntry) => compactHistoryEntry(historyEntry));
+ nextHistory.push(compactHistoryEntry(entry, true));
+ return nextHistory;
+}
+
+function renderHistoryTable(history) {
+ console.log('### Run History\n');
+ console.log(`| Metric | ${history.map(renderRunHeader).join(' | ')} |`);
+ console.log(`|:--|${history.map(() => ':--').join('|')}|`);
+ console.log(`| Result | ${history.map(renderResult).join(' | ')} |`);
+ console.log(`| Total | ${history.map((entry) => entry.total).join(' | ')} |`);
+ for (const outcome of orderedOutcomes) {
+ console.log(
+ `| ${outcome} | ${history
+ .map((entry) => renderCount(entry.distribution?.[outcome]))
+ .join(' | ')} |`
+ );
+ }
+ console.log(`| Config | ${history.map(renderConfig).join(' | ')} |`);
+ console.log(`| Timing | ${history.map(renderTiming).join(' | ')} |`);
+ console.log('');
+}
+
+function renderLatestScenarioBreakdown(entry) {
+ if (entry.missingResults) {
+ return;
+ }
+
+ const scenarioEntries = Object.entries(entry.scenarioDistribution ?? {});
+ if (scenarioEntries.length === 0) {
+ return;
+ }
+
+ console.log('### Latest Scenario Breakdown\n');
+ console.log(`| Scenario | Total | ${orderedOutcomes.join(' | ')} |`);
+ console.log(`|:--|--:|${orderedOutcomes.map(() => '--:').join('|')}|`);
+ for (const [scenario, distribution] of scenarioEntries) {
+ const total = orderedOutcomes.reduce(
+ (sum, outcome) => sum + (distribution[outcome] ?? 0),
+ 0
+ );
+ console.log(
+ `| ${scenario} | ${total} | ${orderedOutcomes
+ .map((outcome) => renderCount(distribution[outcome]))
+ .join(' | ')} |`
+ );
+ }
+ console.log('');
+}
+
+function renderLatestFailures(entry) {
+ if (entry.missingResults) {
+ return;
+ }
+
+ if (entry.failing.length === 0) {
+ return;
+ }
+
+ console.log('### Latest Non-Completed Runs\n');
+ console.log('| Scenario | Attempt | Outcome | Status | Error code | Run |');
+ console.log('|:--|--:|:--|:--|:--|:--|');
+ for (const result of entry.failing) {
+ const run =
+ result.dashboardUrl && result.runId
+ ? `[${result.runId}](${result.dashboardUrl})`
+ : (result.runId ?? '');
+ console.log(
+ `| ${result.scenario ?? ''} | ${result.attempt} | ${result.outcome} | ${result.status ?? ''} | ${result.errorCode ?? ''} | ${run} |`
+ );
+ }
+ if (entry.truncatedFailingCount > 0) {
+ console.log(
+ `\nShowing 20 of ${entry.failing.length + entry.truncatedFailingCount} non-completed runs.`
+ );
+ }
+ console.log('');
+}
+
+function render(resultsFile, previousComment) {
+ const history = appendHistory(
+ loadHistory(previousComment),
+ buildEntry(resultsFile)
+ );
+ const latest = history[history.length - 1];
+
+ console.log('');
+ console.log('## Event Log Race Repro\n');
+ console.log(
+ latest.missingResults
+ ? 'No result file was produced by the latest repro job.'
+ : latest.failedCount === 0
+ ? 'The latest repro job completed all runs cleanly.'
+ : `${latest.failedCount} of ${latest.total} latest repro runs did not complete cleanly.`
+ );
+ console.log('');
+ console.log(historyMarkerStart);
+ console.log(JSON.stringify(history));
+ console.log(historyMarkerEnd);
+ console.log('');
+
+ renderHistoryTable(history);
+ renderLatestScenarioBreakdown(latest);
+ renderLatestFailures(latest);
+}
+
+const resultsFile = loadResults();
+const previousComment = loadPreviousComment();
+
+if (!resultsFile) {
+ if (!check) {
+ render(null, previousComment);
+ }
+ process.exit(check ? 1 : 0);
+}
+
+if (!check) {
+ render(resultsFile, previousComment);
+ process.exit(0);
+}
+
+const distribution =
+ resultsFile.distribution ?? summarize(resultsFile.results ?? []);
+process.exit(nonCompletedCount(distribution) > 0 ? 1 : 0);
diff --git a/.github/workflows/event-log-race-repro.yml b/.github/workflows/event-log-race-repro.yml
new file mode 100644
index 0000000000..6991c8e11e
--- /dev/null
+++ b/.github/workflows/event-log-race-repro.yml
@@ -0,0 +1,168 @@
+name: Event Log Race Repro
+
+on:
+ pull_request:
+ branches: [main, stable]
+ types: [opened, reopened, synchronize, labeled]
+ workflow_dispatch:
+ inputs:
+ attempts:
+ description: 'Number of hook/sleep workflow runs to start'
+ required: false
+ default: '1500'
+ step_fanout_attempts:
+ description: 'Number of parallel-step fanout workflow runs to start'
+ required: false
+ default: '250'
+ step_sleep_race_attempts:
+ description: 'Number of step/sleep race workflow runs to start'
+ required: false
+ default: '250'
+ concurrency:
+ description: 'Number of concurrent workflow runs'
+ required: false
+ default: '50'
+ step_concurrency:
+ description: 'Number of concurrent step-heavy workflow runs'
+ required: false
+ default: '50'
+ iterations:
+ description: 'Hook/sleep race iterations per workflow run'
+ required: false
+ default: '5'
+ run_timeout_ms:
+ description: 'Per-run timeout before classifying as stuck'
+ required: false
+ default: '150000'
+ sleep_ms:
+ description: 'Workflow sleep duration in the hook/sleep race'
+ required: false
+ default: '5000'
+ resume_delay_ms:
+ description: 'Base delay before resuming the hook'
+ required: false
+ default: '15000'
+ resume_jitter_ms:
+ description: 'Additional random hook resume delay'
+ required: false
+ default: '10000'
+
+concurrency:
+ group: ${{ github.workflow }}-${{ github.ref }}
+ cancel-in-progress: ${{ github.ref != 'refs/heads/main' && github.ref != 'refs/heads/stable' }}
+
+jobs:
+ event-log-race-repro:
+ name: Event Log Race Repro
+ runs-on: ubuntu-latest
+ timeout-minutes: 120
+ if: ${{ github.event_name == 'workflow_dispatch' || contains(github.event.pull_request.labels.*.name, 'event-log-race-repro') }}
+ permissions:
+ contents: read
+ deployments: read
+ statuses: read
+ id-token: write
+ issues: write
+ pull-requests: write
+ env:
+ TURBO_TOKEN: ${{ secrets.TURBO_TOKEN }}
+ TURBO_TEAM: ${{ vars.TURBO_TEAM }}
+ WORKFLOW_PUBLIC_MANIFEST: '1'
+ WORKFLOW_NEXT_LAZY_DISCOVERY: '0'
+
+ steps:
+ - name: Checkout Repo
+ uses: actions/checkout@v4
+ with:
+ ref: ${{ github.event.pull_request.head.sha || github.sha }}
+
+ - name: Setup environment
+ uses: ./.github/actions/setup-workflow-dev
+ with:
+ build-packages: 'false'
+
+ - name: Build CLI
+ run: pnpm turbo run build --filter='@workflow/cli'
+
+ - name: Wait for Vercel deployment
+ id: waitForDeployment
+ uses: vercel/wait-for-deployment-action@0e2b0c5c5cce31f1648108aeec56467187aca037
+ with:
+ project-slug: example-nextjs-workflow-turbopack
+ timeout: 1000
+ check-interval: 15
+ environment: ${{ github.ref == 'refs/heads/main' && 'production' || 'preview' }}
+
+ - name: Run event log race repro
+ id: repro
+ continue-on-error: true
+ run: pnpm run test:e2e:event-log-race-repro --reporter=default
+ env:
+ NODE_OPTIONS: "--enable-source-maps"
+ DEPLOYMENT_URL: ${{ steps.waitForDeployment.outputs.deployment-url }}
+ VERCEL_DEPLOYMENT_ID: ${{ steps.waitForDeployment.outputs.deployment-id }}
+ APP_NAME: nextjs-turbopack
+ WORKFLOW_VERCEL_ENV: ${{ github.ref == 'refs/heads/main' && 'production' || 'preview' }}
+ WORKFLOW_VERCEL_AUTH_TOKEN: ${{ secrets.VERCEL_LABS_TOKEN }}
+ WORKFLOW_VERCEL_TEAM: "team_nO2mCG4W8IxPIeKoSsqwAxxB"
+ WORKFLOW_VERCEL_PROJECT: "prj_yjkM7UdHliv8bfxZ1sMJQf1pMpdi"
+ WORKFLOW_VERCEL_PROJECT_SLUG: example-nextjs-workflow-turbopack
+ VERCEL_WORKFLOW_SERVER_URL: ${{ github.ref != 'refs/heads/main' && secrets.VERCEL_WORKFLOW_SERVER_URL || '' }}
+ EVENT_LOG_RACE_REPRO_ATTEMPTS: ${{ inputs.attempts || '1500' }}
+ EVENT_LOG_RACE_REPRO_STEP_FANOUT_ATTEMPTS: ${{ inputs.step_fanout_attempts || '250' }}
+ EVENT_LOG_RACE_REPRO_STEP_SLEEP_RACE_ATTEMPTS: ${{ inputs.step_sleep_race_attempts || '250' }}
+ EVENT_LOG_RACE_REPRO_CONCURRENCY: ${{ inputs.concurrency || '50' }}
+ EVENT_LOG_RACE_REPRO_STEP_CONCURRENCY: ${{ inputs.step_concurrency || '50' }}
+ EVENT_LOG_RACE_REPRO_ITERATIONS: ${{ inputs.iterations || '5' }}
+ EVENT_LOG_RACE_REPRO_RUN_TIMEOUT_MS: ${{ inputs.run_timeout_ms || '150000' }}
+ EVENT_LOG_RACE_REPRO_SLEEP_MS: ${{ inputs.sleep_ms || '5000' }}
+ EVENT_LOG_RACE_REPRO_RESUME_DELAY_MS: ${{ inputs.resume_delay_ms || '15000' }}
+ EVENT_LOG_RACE_REPRO_RESUME_JITTER_MS: ${{ inputs.resume_jitter_ms || '10000' }}
+
+ # Saves the body of the last PR comment selected by the jq filter (or an
+ # empty string when nothing is selected) to a file. The "Render repro
+ # summary" step passes that file as --previous-comment so the renderer can
+ # carry the run history forward.
+ # NOTE(review): the needle in contains("") is empty, so the filter appears to
+ # select every comment and [-1] picks whichever comes last in the response -
+ # confirm this was meant to look for a marker of the repro comment.
+ # NOTE(review): only one page of up to 100 comments is requested; verify this
+ # is enough for long-lived PRs.
+ - name: Fetch previous repro comment
+ if: always() && github.event_name == 'pull_request'
+ env:
+ GH_TOKEN: ${{ github.token }}
+ run: |
+ gh api \
+ "repos/${{ github.repository }}/issues/${{ github.event.pull_request.number }}/comments?per_page=100" \
+ --jq '[.[] | select(.body | contains(""))][-1].body // ""' \
+ > event-log-race-repro-previous-comment.md
+
+ - name: Render repro summary
+ if: always()
+ run: |
+ previous_comment_args=()
+ if [ -f event-log-race-repro-previous-comment.md ]; then
+ previous_comment_args=(--previous-comment event-log-race-repro-previous-comment.md)
+ fi
+
+ node .github/scripts/render-event-log-race-repro-results.js \
+ event-log-race-repro-results.json \
+ --run-url "${{ github.server_url }}/${{ github.repository }}/actions/runs/${{ github.run_id }}" \
+ --run-attempt "${{ github.run_attempt }}" \
+ --timestamp "$(date -u +'%Y-%m-%dT%H:%M:%SZ')" \
+ "${previous_comment_args[@]}" \
+ | tee event-log-race-repro-summary.md >> "$GITHUB_STEP_SUMMARY"
+
+ - name: Update PR comment
+ if: always() && github.event_name == 'pull_request'
+ uses: marocchino/sticky-pull-request-comment@773744901bac0e8cbb5a0dc842800d45e9b2b405 # v2.9.4
+ with:
+ header: event-log-race-repro-results
+ path: event-log-race-repro-summary.md
+
+ - name: Upload repro results
+ if: always()
+ uses: actions/upload-artifact@v4
+ with:
+ name: event-log-race-repro-results
+ path: |
+ event-log-race-repro-results.json
+ event-log-race-repro-summary.md
+ retention-days: 7
+ if-no-files-found: ignore
+
+ - name: Fail on corrupted, failed, or stuck runs
+ if: always()
+ run: node .github/scripts/render-event-log-race-repro-results.js event-log-race-repro-results.json --check
diff --git a/package.json b/package.json
index 7cf5becf35..4440b0b8e7 100644
--- a/package.json
+++ b/package.json
@@ -34,6 +34,7 @@
"clean": "turbo clean",
"typecheck": "turbo typecheck",
"test:e2e": "vitest run packages/core/e2e/e2e.test.ts packages/core/e2e/e2e-agent.test.ts",
+ "test:e2e:event-log-race-repro": "vitest run packages/core/e2e/event-log-race-repro.test.ts",
"test:e2e:nextjs-webpack:staged": "node scripts/test-staged-nextjs-webpack.mjs",
"test:docs": "pnpm --filter @workflow/docs-typecheck test:docs",
"bench": "vitest bench packages/core/e2e/bench.bench.ts",
diff --git a/packages/core/e2e/event-log-race-repro.test.ts b/packages/core/e2e/event-log-race-repro.test.ts
new file mode 100644
index 0000000000..ce58ca7080
--- /dev/null
+++ b/packages/core/e2e/event-log-race-repro.test.ts
@@ -0,0 +1,881 @@
+import fs from 'node:fs';
+import path from 'node:path';
+import { setTimeout as sleep } from 'node:timers/promises';
+import { WorkflowRunFailedError } from '@workflow/errors';
+import { beforeAll, describe, expect, test } from 'vitest';
+import type { Run } from '../src/runtime';
+import {
+ getHookByToken,
+ getWorld,
+ start as rawStart,
+ resumeHook,
+} from '../src/runtime';
+import { getWorkflowMetadata, setupWorld, trackRun } from './utils';
+
+// Base URL of the deployment under test. The whole test file refuses to load
+// without it.
+const deploymentUrl = process.env.DEPLOYMENT_URL;
+if (!deploymentUrl) {
+ throw new Error('`DEPLOYMENT_URL` environment variable is not set');
+}
+
+// Location of the results JSON, resolved against the current working directory.
+const RESULT_PATH = path.resolve(
+ process.cwd(),
+ 'event-log-race-repro-results.json'
+);
+// Workflow source file that contains the repro workflows started by this test.
+const WORKFLOW_FILE = 'workflows/101_hook_sleep_repro.ts';
+
+/** Identifies which repro scenario a run belongs to. */
+type Scenario =
+ | 'hook-sleep'
+ | 'step-fanout'
+ | 'step-sleep-race-step-biased'
+ | 'step-sleep-race-sleep-biased';
+
+/**
+ * Bucket a finished (or abandoned) run is counted under. The three upper-case
+ * members are error codes passed through by classifyFailure(); 'stuck' is used
+ * when a run does not reach a terminal status before the run timeout.
+ */
+type Outcome =
+ | 'completed'
+ | 'CORRUPTED_EVENT_LOG'
+ | 'USER_ERROR'
+ | 'RUNTIME_ERROR'
+ | 'stuck'
+ | 'other';
+
+/**
+ * Tunable parameters of the repro. The `config` constant fills every field
+ * from an `EVENT_LOG_RACE_REPRO_*` environment variable with a default.
+ * Fields ending in `Ms` are durations in milliseconds.
+ */
+interface ReproConfig {
+ hookSleepAttempts: number;
+ stepFanoutAttempts: number;
+ stepSleepRaceAttempts: number;
+ concurrency: number;
+ stepConcurrency: number;
+ iterations: number;
+ sleepMs: number;
+ resumeDelayMs: number;
+ resumeJitterMs: number;
+ runTimeoutMs: number;
+ hookTimeoutMs: number;
+ sleepBranchWaitCount: number;
+ sleepBranchWaitMs: number;
+ sleepBranchWaitSpacingMs: number;
+ returnOnWake: boolean;
+ drainDelayMs: number;
+ finalDelayMs: number;
+ stepFanoutRounds: number;
+ stepFanoutWidth: number;
+ stepFanoutDelayMs: number;
+ stepFanoutDelayJitterMs: number;
+ stepFanoutAggregateDelayMs: number;
+ stepFanoutBetweenRoundSleepMs: number;
+ stepRaceRounds: number;
+ stepRaceStepWinDelayMs: number;
+ stepRaceStepWinSleepMs: number;
+ stepRaceSleepWinDelayMs: number;
+ stepRaceSleepWinSleepMs: number;
+ stepRacePostSleepMs: number;
+}
+
+/**
+ * Result record of one attempted workflow run.
+ *
+ * `runId` and `dashboardUrl` are absent when the attempt failed before a run
+ * id was known; `errorCode` / `errorMessage` are only set for non-clean
+ * outcomes.
+ */
+interface ReproRunResult {
+ attempt: number;
+ scenario: Scenario;
+ token: string;
+ runId?: string;
+ outcome: Outcome;
+ status?: string;
+ errorCode?: string;
+ errorMessage?: string;
+ durationMs: number;
+ dashboardUrl?: string;
+}
+
+/**
+ * Reads a numeric setting from the environment.
+ *
+ * @param name environment variable to read
+ * @param fallback value used when the variable is unset, empty or not a
+ *   finite number
+ * @returns the parsed number or `fallback`
+ */
+function envNumber(name: string, fallback: number) {
+  const raw = process.env[name];
+  if (raw) {
+    const parsed = Number(raw);
+    if (Number.isFinite(parsed)) {
+      return parsed;
+    }
+  }
+  return fallback;
+}
+
+/**
+ * Reads a boolean setting from the environment. Accepts `1` / `true` and
+ * `0` / `false`.
+ *
+ * @param name environment variable to read
+ * @param fallback value used when the variable is unset, empty or holds any
+ *   other text
+ * @returns the parsed flag or `fallback`
+ */
+function envBoolean(name: string, fallback: boolean) {
+  switch (process.env[name]) {
+    case '1':
+    case 'true':
+      return true;
+    case '0':
+    case 'false':
+      return false;
+    default:
+      return fallback;
+  }
+}
+
+// Effective repro configuration, read once at module load. Every field can be
+// overridden through its EVENT_LOG_RACE_REPRO_* environment variable; the
+// second argument of each call is the default used otherwise.
+const config: ReproConfig = {
+ hookSleepAttempts: envNumber('EVENT_LOG_RACE_REPRO_ATTEMPTS', 1500),
+ stepFanoutAttempts: envNumber(
+ 'EVENT_LOG_RACE_REPRO_STEP_FANOUT_ATTEMPTS',
+ 250
+ ),
+ stepSleepRaceAttempts: envNumber(
+ 'EVENT_LOG_RACE_REPRO_STEP_SLEEP_RACE_ATTEMPTS',
+ 250
+ ),
+ concurrency: envNumber('EVENT_LOG_RACE_REPRO_CONCURRENCY', 50),
+ stepConcurrency: envNumber('EVENT_LOG_RACE_REPRO_STEP_CONCURRENCY', 50),
+ iterations: envNumber('EVENT_LOG_RACE_REPRO_ITERATIONS', 5),
+ sleepMs: envNumber('EVENT_LOG_RACE_REPRO_SLEEP_MS', 5000),
+ resumeDelayMs: envNumber('EVENT_LOG_RACE_REPRO_RESUME_DELAY_MS', 15_000),
+ resumeJitterMs: envNumber('EVENT_LOG_RACE_REPRO_RESUME_JITTER_MS', 10_000),
+ runTimeoutMs: envNumber('EVENT_LOG_RACE_REPRO_RUN_TIMEOUT_MS', 150_000),
+ hookTimeoutMs: envNumber('EVENT_LOG_RACE_REPRO_HOOK_TIMEOUT_MS', 60_000),
+ sleepBranchWaitCount: envNumber(
+ 'EVENT_LOG_RACE_REPRO_SLEEP_BRANCH_WAIT_COUNT',
+ 2
+ ),
+ sleepBranchWaitMs: envNumber(
+ 'EVENT_LOG_RACE_REPRO_SLEEP_BRANCH_WAIT_MS',
+ 1000
+ ),
+ sleepBranchWaitSpacingMs: envNumber(
+ 'EVENT_LOG_RACE_REPRO_SLEEP_BRANCH_WAIT_SPACING_MS',
+ 250
+ ),
+ returnOnWake: envBoolean('EVENT_LOG_RACE_REPRO_RETURN_ON_WAKE', true),
+ drainDelayMs: envNumber('EVENT_LOG_RACE_REPRO_DRAIN_DELAY_MS', 0),
+ finalDelayMs: envNumber('EVENT_LOG_RACE_REPRO_FINAL_DELAY_MS', 0),
+ stepFanoutRounds: envNumber('EVENT_LOG_RACE_REPRO_STEP_FANOUT_ROUNDS', 4),
+ stepFanoutWidth: envNumber('EVENT_LOG_RACE_REPRO_STEP_FANOUT_WIDTH', 4),
+ stepFanoutDelayMs: envNumber('EVENT_LOG_RACE_REPRO_STEP_FANOUT_DELAY_MS', 0),
+ stepFanoutDelayJitterMs: envNumber(
+ 'EVENT_LOG_RACE_REPRO_STEP_FANOUT_DELAY_JITTER_MS',
+ 75
+ ),
+ stepFanoutAggregateDelayMs: envNumber(
+ 'EVENT_LOG_RACE_REPRO_STEP_FANOUT_AGGREGATE_DELAY_MS',
+ 0
+ ),
+ stepFanoutBetweenRoundSleepMs: envNumber(
+ 'EVENT_LOG_RACE_REPRO_STEP_FANOUT_BETWEEN_ROUND_SLEEP_MS',
+ 250
+ ),
+ stepRaceRounds: envNumber('EVENT_LOG_RACE_REPRO_STEP_RACE_ROUNDS', 4),
+ stepRaceStepWinDelayMs: envNumber(
+ 'EVENT_LOG_RACE_REPRO_STEP_RACE_STEP_WIN_DELAY_MS',
+ 0
+ ),
+ stepRaceStepWinSleepMs: envNumber(
+ 'EVENT_LOG_RACE_REPRO_STEP_RACE_STEP_WIN_SLEEP_MS',
+ 5000
+ ),
+ stepRaceSleepWinDelayMs: envNumber(
+ 'EVENT_LOG_RACE_REPRO_STEP_RACE_SLEEP_WIN_DELAY_MS',
+ 5000
+ ),
+ stepRaceSleepWinSleepMs: envNumber(
+ 'EVENT_LOG_RACE_REPRO_STEP_RACE_SLEEP_WIN_SLEEP_MS',
+ 500
+ ),
+ stepRacePostSleepMs: envNumber(
+ 'EVENT_LOG_RACE_REPRO_STEP_RACE_POST_SLEEP_MS',
+ 1500
+ ),
+};
+
+/**
+ * Starts a workflow run and registers it with the e2e run tracker under a
+ * scenario-specific test name.
+ *
+ * @param scenario repro scenario the run belongs to
+ * @param workflowFn name of the workflow function, recorded with the run
+ * @param workflow workflow metadata handed to the runtime's `start`
+ * @param args arguments passed to the workflow
+ * @returns the handle of the started run
+ */
+async function start(
+  scenario: Scenario,
+  workflowFn: string,
+  workflow: { workflowId: string },
+  args: unknown[]
+): Promise<Run<unknown>> {
+  const run = await rawStart(workflow, args);
+  trackRun(run, {
+    testName: `event-log-race-repro:${scenario}`,
+    workflowFile: WORKFLOW_FILE,
+    workflowFn,
+  });
+  return run;
+}
+
+/**
+ * Polls every 250 ms until the hook registered under `token` exists and
+ * belongs to `runId`, giving up after `config.hookTimeoutMs`.
+ *
+ * @param token hook token to look up
+ * @param runId run the hook is expected to belong to
+ * @returns the matching hook
+ * @throws the last lookup or ownership error seen, or a generic timeout error
+ *   when no `Error` was recorded
+ */
+async function waitForHook(token: string, runId: string) {
+  const deadline = Date.now() + config.hookTimeoutMs;
+  let lastError: unknown;
+
+  while (Date.now() < deadline) {
+    try {
+      const candidate = await getHookByToken(token);
+      if (candidate.runId !== runId) {
+        // Recorded by the catch below; polling continues.
+        throw new Error(
+          `Hook ${token} belonged to ${candidate.runId}, expected ${runId}`
+        );
+      }
+      return candidate;
+    } catch (err) {
+      lastError = err;
+    }
+    await sleep(250);
+  }
+
+  if (lastError instanceof Error) {
+    throw lastError;
+  }
+  throw new Error(`Timed out waiting for hook ${token}`);
+}
+
+/**
+ * Builds the observability dashboard URL of a run.
+ *
+ * @param runId id of the workflow run
+ * @returns the URL, or `undefined` when `WORKFLOW_VERCEL_PROJECT_SLUG` or
+ *   `WORKFLOW_VERCEL_ENV` is not set
+ */
+function getDashboardUrl(runId: string): string | undefined {
+  const {
+    WORKFLOW_VERCEL_PROJECT_SLUG: projectSlug,
+    WORKFLOW_VERCEL_ENV: env,
+  } = process.env;
+
+  if (projectSlug && env) {
+    // Anything other than "production" is linked as a preview environment.
+    const environment = env === 'production' ? 'production' : 'preview';
+    return `https://vercel.com/vercel-labs/${projectSlug}/observability/workflows/runs/${runId}?environment=${environment}`;
+  }
+  return undefined;
+}
+
+/**
+ * Maps the error code of a failed run to an outcome bucket.
+ *
+ * @param errorCode error code reported for the run, if any
+ * @returns the code itself for the three codes tracked individually,
+ *   otherwise `'other'`
+ */
+function classifyFailure(errorCode: string | undefined): Outcome {
+  switch (errorCode) {
+    case 'CORRUPTED_EVENT_LOG':
+    case 'USER_ERROR':
+    case 'RUNTIME_ERROR':
+      return errorCode;
+    default:
+      return 'other';
+  }
+}
+
+/**
+ * Checks whether a workflow return value records at least one `'wake'` branch.
+ *
+ * @param value return value of the workflow, shape unchecked
+ * @returns true when `value.branches` is an array containing an object whose
+ *   `branch` property equals `'wake'`
+ */
+function hasWakeBranch(value: unknown) {
+  if (!value || typeof value !== 'object' || !('branches' in value)) {
+    return false;
+  }
+
+  const { branches } = value as { branches?: unknown };
+  if (!Array.isArray(branches)) {
+    return false;
+  }
+
+  for (const branch of branches) {
+    if (
+      branch &&
+      typeof branch === 'object' &&
+      'branch' in branch &&
+      branch.branch === 'wake'
+    ) {
+      return true;
+    }
+  }
+  return false;
+}
+
+/**
+ * Type guard for "any non-null object", giving callers safe access to
+ * properties as `unknown`. Arrays also satisfy this guard.
+ *
+ * @param value value to test
+ * @returns true when `value` is a non-null object
+ */
+function isRecord(value: unknown): value is Record<string, unknown> {
+  return !!value && typeof value === 'object';
+}
+
+/**
+ * Validates the return value of the step-fanout workflow against the
+ * configured number of rounds and fanout width.
+ *
+ * @param value return value of the workflow, shape unchecked
+ * @returns a description of the first problem found, or `undefined` when the
+ *   value is well-formed
+ */
+function validateFanoutReturn(value: unknown): string | undefined {
+  if (!isRecord(value)) {
+    return 'Run returned a non-object value.';
+  }
+
+  const { roundRecords: records } = value;
+  if (!Array.isArray(records)) {
+    return 'Run did not return roundRecords.';
+  }
+  if (records.length !== config.stepFanoutRounds) {
+    return `Expected ${config.stepFanoutRounds} fanout rounds, got ${records.length}.`;
+  }
+
+  for (let index = 0; index < records.length; index += 1) {
+    const record: unknown = records[index];
+    if (!isRecord(record)) {
+      return `Round ${index} record was not an object.`;
+    }
+    // Rounds must come back in order, each aggregating the full width.
+    if (record.round !== index) {
+      return `Round ${index} returned unexpected round ${String(record.round)}.`;
+    }
+    if (record.count !== config.stepFanoutWidth) {
+      return `Round ${index} aggregated ${String(record.count)} steps instead of ${config.stepFanoutWidth}.`;
+    }
+    if (typeof record.checksum !== 'string') {
+      return `Round ${index} did not return a checksum.`;
+    }
+  }
+  return undefined;
+}
+
+/**
+ * Validates the return value of the step/sleep race workflow against the
+ * configured number of race rounds.
+ *
+ * @param value return value of the workflow, shape unchecked
+ * @returns a description of the first problem found, or `undefined` when the
+ *   value is well-formed
+ */
+function validateStepRaceReturn(value: unknown): string | undefined {
+  if (!isRecord(value)) {
+    return 'Run returned a non-object value.';
+  }
+
+  const { branches } = value;
+  if (!Array.isArray(branches)) {
+    return 'Run did not return branches.';
+  }
+  if (branches.length !== config.stepRaceRounds) {
+    return `Expected ${config.stepRaceRounds} race rounds, got ${branches.length}.`;
+  }
+
+  for (let index = 0; index < branches.length; index += 1) {
+    const branch: unknown = branches[index];
+    if (!isRecord(branch)) {
+      return `Race round ${index} record was not an object.`;
+    }
+    if (branch.round !== index) {
+      return `Race round ${index} returned unexpected round ${String(branch.round)}.`;
+    }
+    if (branch.branch !== 'sleep' && branch.branch !== 'step') {
+      return `Race round ${index} took unexpected ${String(branch.branch)} branch.`;
+    }
+
+    // The marker recorded for the round must name the same branch.
+    const { marker } = branch;
+    if (!isRecord(marker)) {
+      return `Race round ${index} marker was not an object.`;
+    }
+    if (marker.branch !== branch.branch) {
+      return `Race round ${index} marker branch ${String(marker.branch)} did not match returned branch ${String(branch.branch)}.`;
+    }
+  }
+  return undefined;
+}
+
+/**
+ * Polls a run once per second until it reaches a terminal status or the run
+ * timeout (measured from `startedAt`) expires.
+ *
+ * Outcome mapping: `completed` -> 'completed'; `failed` -> classified by error
+ * code; `cancelled` -> 'other' with error code `CANCELLED`; timeout -> 'stuck'
+ * with the last status seen.
+ *
+ * The returned record carries placeholders for `attempt` (-1) and `token`
+ * (''), because this function does not know them; callers overwrite both.
+ *
+ * @param run handle of the run to watch
+ * @param startedAt epoch milliseconds at which the attempt started
+ * @param scenario scenario recorded in the result
+ * @returns the result record for the run
+ */
+async function pollTerminalRun(
+  run: Run<unknown>,
+  startedAt: number,
+  scenario: Scenario
+): Promise<ReproRunResult> {
+  const world = await getWorld();
+  const deadline = startedAt + config.runTimeoutMs;
+  let lastStatus: string | undefined;
+
+  // Single place that assembles the result, so all exits share one shape.
+  const toResult = (
+    fields: Pick<ReproRunResult, 'outcome' | 'status' | 'errorCode'>
+  ): ReproRunResult => ({
+    attempt: -1,
+    scenario,
+    token: '',
+    runId: run.runId,
+    ...fields,
+    durationMs: Date.now() - startedAt,
+    dashboardUrl: getDashboardUrl(run.runId),
+  });
+
+  while (Date.now() < deadline) {
+    const runData = await world.runs.get(run.runId);
+    lastStatus = runData.status;
+
+    switch (runData.status) {
+      case 'completed':
+        return toResult({ outcome: 'completed', status: runData.status });
+      case 'failed':
+        return toResult({
+          outcome: classifyFailure(runData.errorCode),
+          status: runData.status,
+          errorCode: runData.errorCode,
+        });
+      case 'cancelled':
+        return toResult({
+          outcome: 'other',
+          status: runData.status,
+          errorCode: 'CANCELLED',
+        });
+      default:
+        // Not terminal yet; keep polling.
+        break;
+    }
+
+    await sleep(1000);
+  }
+
+  return toResult({ outcome: 'stuck', status: lastStatus });
+}
+
+/**
+ * Awaits `promise`, but rejects with `new Error(message)` if it has not
+ * settled within `timeoutMs`.
+ *
+ * The timer is cancelled as soon as the race is decided. Without that, every
+ * call would leave a timer pending for the full timeout even when `promise`
+ * settled immediately, and this helper is called once or twice per attempt.
+ *
+ * @param promise operation to wait for
+ * @param timeoutMs maximum time to wait, in milliseconds
+ * @param message message of the error raised on timeout
+ * @returns the value `promise` resolves to
+ * @throws whatever `promise` rejects with, or the timeout error
+ */
+async function withTimeout<T>(
+  promise: Promise<T>,
+  timeoutMs: number,
+  message: string
+) {
+  const controller = new AbortController();
+  // Aborting makes the sleep reject; Promise.race has already subscribed to
+  // `timeout`, so that late rejection is handled and never surfaces.
+  const timeout = sleep(timeoutMs, undefined, {
+    signal: controller.signal,
+  }).then(() => {
+    throw new Error(message);
+  });
+
+  try {
+    return await Promise.race([promise, timeout]);
+  } finally {
+    controller.abort();
+  }
+}
+
+/**
+ * Runs one attempt of the hook/sleep scenario: starts the workflow, waits for
+ * its hook, resumes the hook after a randomized delay while polling the run,
+ * and checks that a completed run actually took the hook "wake" branch.
+ *
+ * Never throws: every failure is turned into a result record.
+ *
+ * @param attempt sequence number of the attempt, recorded in the result
+ * @returns the result record of the attempt
+ */
+async function runHookSleepAttempt(attempt: number): Promise<ReproRunResult> {
+  const scenario: Scenario = 'hook-sleep';
+  const startedAt = Date.now();
+  const token = `event-log-race-${scenario}-${Date.now()}-${attempt}-${Math.random()
+    .toString(36)
+    .slice(2)}`;
+
+  try {
+    const workflow = await getWorkflowMetadata(
+      deploymentUrl,
+      WORKFLOW_FILE,
+      'hookSleepReproWorkflow'
+    );
+    const run = await start(scenario, 'hookSleepReproWorkflow', workflow, [
+      {
+        token,
+        iterations: config.iterations,
+        sleepMs: config.sleepMs,
+        returnOnWake: config.returnOnWake,
+        drainDelayMs: config.drainDelayMs,
+        finalDelayMs: config.finalDelayMs,
+        sleepBranchWaitCount: config.sleepBranchWaitCount,
+        sleepBranchWaitMs: config.sleepBranchWaitMs,
+        sleepBranchWaitSpacingMs: config.sleepBranchWaitSpacingMs,
+      },
+    ]);
+
+    const hook = await waitForHook(token, run.runId);
+    const jitter =
+      config.resumeJitterMs > 0
+        ? Math.floor(Math.random() * config.resumeJitterMs)
+        : 0;
+    const resumeDelayMs = config.resumeDelayMs + jitter;
+    // Resume in the background while the run is being polled below.
+    const resumePromise = sleep(resumeDelayMs).then(() =>
+      resumeHook(hook, { attempt, sentAt: Date.now() })
+    );
+    // Mark the background promise as handled right away. If polling throws
+    // before the promise is awaited, a later resume failure would otherwise
+    // surface as an unhandled rejection. The rejection is still observed by
+    // the explicit await further down.
+    resumePromise.catch(() => {});
+
+    const runResult = await pollTerminalRun(run, startedAt, scenario);
+
+    // Wait for the resume to settle, but never longer than 30 seconds.
+    let resumeFailure: { reason: unknown } | undefined;
+    try {
+      await withTimeout(
+        resumePromise,
+        30_000,
+        `Timed out resuming hook ${token} for run ${run.runId}`
+      );
+    } catch (reason) {
+      resumeFailure = { reason };
+    }
+
+    if (runResult.outcome === 'completed') {
+      // A run that completed although the resume failed did not exercise the
+      // race, so it does not count as a clean completion.
+      if (resumeFailure) {
+        return {
+          ...runResult,
+          attempt,
+          scenario,
+          token,
+          outcome: 'other',
+          errorCode: 'HOOK_RESUME_FAILED',
+          errorMessage: String(resumeFailure.reason),
+        };
+      }
+
+      const returnValue = await withTimeout(
+        run.returnValue,
+        30_000,
+        `Timed out reading return value for run ${run.runId}`
+      );
+      if (!hasWakeBranch(returnValue)) {
+        return {
+          ...runResult,
+          attempt,
+          scenario,
+          token,
+          outcome: 'other',
+          errorCode: 'NO_WAKE_BRANCH',
+          errorMessage: 'Run completed without taking the hook wake branch.',
+        };
+      }
+    }
+
+    return {
+      ...runResult,
+      attempt,
+      scenario,
+      token,
+      errorMessage:
+        runResult.errorMessage ??
+        (resumeFailure ? String(resumeFailure.reason) : undefined),
+    };
+  } catch (err) {
+    if (WorkflowRunFailedError.is(err)) {
+      return {
+        attempt,
+        scenario,
+        token,
+        runId: err.runId,
+        outcome: classifyFailure(err.errorCode),
+        status: 'failed',
+        errorCode: err.errorCode,
+        errorMessage: err.message,
+        durationMs: Date.now() - startedAt,
+        dashboardUrl: getDashboardUrl(err.runId),
+      };
+    }
+
+    // Anything else (metadata lookup, start, hook wait, polling, ...) is
+    // recorded as an unclassified failure of this attempt.
+    return {
+      attempt,
+      scenario,
+      token,
+      outcome: 'other',
+      errorMessage: err instanceof Error ? err.message : String(err),
+      durationMs: Date.now() - startedAt,
+    };
+  }
+}
+
+async function resumeGate( // Looks up the hook a started run registered under `token` and resumes it once.
+ scenario: Scenario,
+ attempt: number,
+ token: string,
+ runId: string
+) {
+ const hook = await waitForHook(token, runId); // helper defined outside this view; presumably resolves once the hook exists (TODO confirm)
+ await resumeHook(hook, { attempt, scenario, sentAt: Date.now() }); // payload: attempt number, scenario name, send timestamp
+}
+
+async function runStepFanoutAttempt(attempt: number): Promise<ReproRunResult> { // Runs one 'step-fanout' attempt; errors inside the try are converted into result objects.
+ const scenario: Scenario = 'step-fanout';
+ const startedAt = Date.now();
+ const token = `event-log-race-${scenario}-${Date.now()}-${attempt}-${Math.random()
+ .toString(36)
+ .slice(2)}`;
+
+ try {
+ const workflow = await getWorkflowMetadata(
+ deploymentUrl,
+ WORKFLOW_FILE,
+ 'stepFanoutReplayReproWorkflow'
+ );
+ const run = await start(
+ scenario,
+ 'stepFanoutReplayReproWorkflow',
+ workflow,
+ [
+ { // single workflow argument, populated from the module-level `config`
+ aggregateDelayMs: config.stepFanoutAggregateDelayMs,
+ betweenRoundSleepMs: config.stepFanoutBetweenRoundSleepMs,
+ rounds: config.stepFanoutRounds,
+ stepDelayJitterMs: config.stepFanoutDelayJitterMs,
+ stepDelayMs: config.stepFanoutDelayMs,
+ token,
+ width: config.stepFanoutWidth,
+ },
+ ]
+ );
+
+ await resumeGate(scenario, attempt, token, run.runId); // the workflow awaits one hook event before its first round
+
+ const runResult = await pollTerminalRun(run, startedAt, scenario);
+ if (runResult.outcome === 'completed') { // a completed run must still return a valid fan-out payload
+ const returnValue = await withTimeout(
+ run.returnValue,
+ 30_000,
+ `Timed out reading return value for run ${run.runId}`
+ );
+ const validationError = validateFanoutReturn(returnValue);
+ if (validationError) {
+ return { // downgrade the attempt to 'other' with a dedicated error code
+ ...runResult,
+ attempt,
+ scenario,
+ token,
+ outcome: 'other',
+ errorCode: 'BAD_FANOUT_RETURN',
+ errorMessage: validationError,
+ };
+ }
+ }
+
+ return { // pass the polled result through, tagged with attempt metadata
+ ...runResult,
+ attempt,
+ scenario,
+ token,
+ };
+ } catch (err) {
+ if (WorkflowRunFailedError.is(err)) { // run failure: classify by the error code carried on the error
+ return {
+ attempt,
+ scenario,
+ token,
+ runId: err.runId,
+ outcome: classifyFailure(err.errorCode),
+ status: 'failed',
+ errorCode: err.errorCode,
+ errorMessage: err.message,
+ durationMs: Date.now() - startedAt,
+ dashboardUrl: getDashboardUrl(err.runId),
+ };
+ }
+
+ return { // any other error (including withTimeout rejections) is reported as 'other'
+ attempt,
+ scenario,
+ token,
+ outcome: 'other',
+ errorMessage: err instanceof Error ? err.message : String(err),
+ durationMs: Date.now() - startedAt,
+ };
+ }
+}
+
+async function runStepSleepRaceAttempt( // Runs one step-vs-sleep race attempt; `bias` selects which timing pair from `config` is used.
+ attempt: number,
+ bias: 'sleep' | 'step'
+): Promise<ReproRunResult> {
+ const scenario: Scenario =
+ bias === 'step'
+ ? 'step-sleep-race-step-biased'
+ : 'step-sleep-race-sleep-biased';
+ const startedAt = Date.now();
+ const token = `event-log-race-${scenario}-${Date.now()}-${attempt}-${Math.random()
+ .toString(36)
+ .slice(2)}`;
+
+ try {
+ const workflow = await getWorkflowMetadata(
+ deploymentUrl,
+ WORKFLOW_FILE,
+ 'stepSleepRaceReproWorkflow'
+ );
+ const run = await start(scenario, 'stepSleepRaceReproWorkflow', workflow, [
+ { // single workflow argument; sleep/step timings depend on `bias`
+ postRaceSleepMs: config.stepRacePostSleepMs,
+ rounds: config.stepRaceRounds,
+ sleepMs:
+ bias === 'step'
+ ? config.stepRaceStepWinSleepMs
+ : config.stepRaceSleepWinSleepMs,
+ stepDelayMs:
+ bias === 'step'
+ ? config.stepRaceStepWinDelayMs
+ : config.stepRaceSleepWinDelayMs,
+ token,
+ },
+ ]);
+
+ await resumeGate(scenario, attempt, token, run.runId); // the workflow awaits one hook event before its first round
+
+ const runResult = await pollTerminalRun(run, startedAt, scenario);
+ if (runResult.outcome === 'completed') { // a completed run must still return a valid race payload
+ const returnValue = await withTimeout(
+ run.returnValue,
+ 30_000,
+ `Timed out reading return value for run ${run.runId}`
+ );
+ const validationError = validateStepRaceReturn(returnValue);
+ if (validationError) {
+ return { // downgrade the attempt to 'other' with a dedicated error code
+ ...runResult,
+ attempt,
+ scenario,
+ token,
+ outcome: 'other',
+ errorCode: 'BAD_STEP_RACE_RETURN',
+ errorMessage: validationError,
+ };
+ }
+ }
+
+ return { // pass the polled result through, tagged with attempt metadata
+ ...runResult,
+ attempt,
+ scenario,
+ token,
+ };
+ } catch (err) {
+ if (WorkflowRunFailedError.is(err)) { // run failure: classify by the error code carried on the error
+ return {
+ attempt,
+ scenario,
+ token,
+ runId: err.runId,
+ outcome: classifyFailure(err.errorCode),
+ status: 'failed',
+ errorCode: err.errorCode,
+ errorMessage: err.message,
+ durationMs: Date.now() - startedAt,
+ dashboardUrl: getDashboardUrl(err.runId),
+ };
+ }
+
+ return { // any other error (including withTimeout rejections) is reported as 'other'
+ attempt,
+ scenario,
+ token,
+ outcome: 'other',
+ errorMessage: err instanceof Error ? err.message : String(err),
+ durationMs: Date.now() - startedAt,
+ };
+ }
+}
+
+async function mapLimit<T, R>( // Maps `items` through `fn` with at most `limit` calls in flight; results keep input order.
+ items: T[],
+ limit: number,
+ fn: (item: T) => Promise<R>
+) {
+ const results: R[] = new Array(items.length);
+ let index = 0; // shared cursor: each worker claims the next unprocessed index
+
+ async function worker() {
+ while (index < items.length) {
+ const currentIndex = index;
+ index += 1; // claim happens before the await, so no two workers take the same index
+ const item = items[currentIndex];
+ if (item === undefined) { // undefined entries are skipped and leave a hole in `results`
+ continue;
+ }
+ results[currentIndex] = await fn(item);
+ }
+ }
+ const workerCount = Math.min(Math.max(1, Math.floor(limit) || 1), items.length); // a limit of 0, negative or NaN falls back to 1 so non-empty input is always processed
+ await Promise.all(
+ Array.from({ length: workerCount }, () => worker())
+ );
+ return results;
+}
+
+function summarize(results: ReproRunResult[]) { // Counts results per outcome.
+ return results.reduce<Record<ReproRunResult['outcome'], number>>(
+ (acc, result) => {
+ acc[result.outcome] += 1;
+ return acc;
+ },
+ { // every outcome starts at 0 so absent outcomes still appear in the output
+ completed: 0,
+ CORRUPTED_EVENT_LOG: 0,
+ USER_ERROR: 0,
+ RUNTIME_ERROR: 0,
+ stuck: 0,
+ other: 0,
+ }
+ );
+}
+
+function summarizeByScenario(results: ReproRunResult[]) { // Counts results per scenario, then per outcome.
+ return results.reduce<Record<Scenario, Record<ReproRunResult['outcome'], number>>>(
+ (acc, result) => {
+ acc[result.scenario][result.outcome] += 1;
+ return acc;
+ },
+ { // one zeroed counter table per scenario name
+ 'hook-sleep': {
+ completed: 0,
+ CORRUPTED_EVENT_LOG: 0,
+ USER_ERROR: 0,
+ RUNTIME_ERROR: 0,
+ stuck: 0,
+ other: 0,
+ },
+ 'step-fanout': {
+ completed: 0,
+ CORRUPTED_EVENT_LOG: 0,
+ USER_ERROR: 0,
+ RUNTIME_ERROR: 0,
+ stuck: 0,
+ other: 0,
+ },
+ 'step-sleep-race-step-biased': {
+ completed: 0,
+ CORRUPTED_EVENT_LOG: 0,
+ USER_ERROR: 0,
+ RUNTIME_ERROR: 0,
+ stuck: 0,
+ other: 0,
+ },
+ 'step-sleep-race-sleep-biased': {
+ completed: 0,
+ CORRUPTED_EVENT_LOG: 0,
+ USER_ERROR: 0,
+ RUNTIME_ERROR: 0,
+ stuck: 0,
+ other: 0,
+ },
+ }
+ );
+}
+
+function buildResultConfig(results: ReproRunResult[]) { // Returns a copy of `config` with an `attempts` field set to the number of results.
+ return {
+ ...config,
+ attempts: results.length, // placed after the spread, so it overrides any `attempts` already on `config`
+ };
+}
+
+function writeResults(results: ReproRunResult[]) { // Synchronously writes the run report as pretty-printed JSON to RESULT_PATH.
+ fs.writeFileSync(
+ RESULT_PATH,
+ JSON.stringify(
+ {
+ completedAt: new Date().toISOString(), // time of writing, not of the last run
+ deploymentUrl,
+ config: buildResultConfig(results),
+ distribution: summarize(results), // counts per outcome
+ scenarioDistribution: summarizeByScenario(results), // counts per scenario and outcome
+ results,
+ },
+ null,
+ 2
+ )
+ );
+}
+
+async function runScenario( // Runs `run` for attempt numbers 1..attempts with bounded concurrency.
+ attempts: number,
+ concurrency: number,
+ run: (attempt: number) => Promise<ReproRunResult>
+) {
+ if (attempts <= 0) { // nothing to run; also avoids Array.from with a negative length
+ return [];
+ }
+ const attemptNumbers = Array.from(
+ { length: attempts },
+ (_, index) => index + 1 // attempt numbers are 1-based
+ );
+ return await mapLimit(attemptNumbers, concurrency, run);
+}
+
+const testTimeoutMs = // sum of per-scenario budgets: runTimeoutMs times the number of concurrency-sized batches, plus slack
+ config.runTimeoutMs *
+ Math.ceil(config.hookSleepAttempts / config.concurrency) + // hook-sleep batches
+ config.runTimeoutMs *
+ Math.ceil(config.stepFanoutAttempts / config.stepConcurrency) + // step-fanout batches
+ config.runTimeoutMs *
+ Math.ceil(
+ Math.ceil(config.stepSleepRaceAttempts / 2) / config.stepConcurrency // step-biased half (rounded up)
+ ) +
+ config.runTimeoutMs *
+ Math.ceil(
+ Math.floor(config.stepSleepRaceAttempts / 2) / config.stepConcurrency // sleep-biased half (rounded down)
+ ) +
+ 60_000; // fixed 60 s of extra headroom; NOTE(review): a concurrency of 0 makes the divisions yield Infinity/NaN
+
+describe('event log race repro', () => { // Single test that runs all four scenarios one after another and requires every attempt to complete.
+ beforeAll(() => {
+ setupWorld(deploymentUrl);
+ });
+
+ test(
+ 'event log races do not corrupt, stall, or take stale branches',
+ { timeout: testTimeoutMs },
+ async () => {
+ const stepBiasedAttempts = Math.ceil(config.stepSleepRaceAttempts / 2); // odd totals give the extra attempt to the step-biased half
+ const sleepBiasedAttempts = Math.floor(config.stepSleepRaceAttempts / 2);
+ const results = [ // each runScenario is awaited in turn, so scenarios do not overlap
+ ...(await runScenario(
+ config.hookSleepAttempts,
+ config.concurrency,
+ runHookSleepAttempt
+ )),
+ ...(await runScenario(
+ config.stepFanoutAttempts,
+ config.stepConcurrency,
+ runStepFanoutAttempt
+ )),
+ ...(await runScenario(
+ stepBiasedAttempts,
+ config.stepConcurrency,
+ (attempt) => runStepSleepRaceAttempt(attempt, 'step')
+ )),
+ ...(await runScenario(
+ sleepBiasedAttempts,
+ config.stepConcurrency,
+ (attempt) => runStepSleepRaceAttempt(attempt, 'sleep')
+ )),
+ ];
+ writeResults(results); // written before the assertion so the report exists even when the test fails
+
+ const nonCompleted = results.filter(
+ (result) => result.outcome !== 'completed'
+ );
+ expect(nonCompleted).toEqual([]); // on failure the diff lists every non-completed attempt
+ }
+ );
+});
diff --git a/workbench/nextjs-turbopack/workflows/101_hook_sleep_repro.ts b/workbench/nextjs-turbopack/workflows/101_hook_sleep_repro.ts
new file mode 100644
index 0000000000..13370a33d4
--- /dev/null
+++ b/workbench/nextjs-turbopack/workflows/101_hook_sleep_repro.ts
@@ -0,0 +1,372 @@
+import { createHook, getWorkflowMetadata, sleep } from 'workflow';
+
+interface ReproInput { // Input of hookSleepReproWorkflow; defaults below are the ones that workflow applies.
+ token: string; // hook token passed to createHook
+ iterations?: number; // loop iterations; default 2
+ sleepMs?: number; // sleep raced against the hook read; default 5000
+ returnOnWake?: boolean; // return right after the first wake branch; default false
+ finalDelayMs?: number; // delay of the optional final step; default 0 (step skipped)
+ drainDelayMs?: number; // delay inside the drain step; default 0
+ sleepBranchWaitCount?: number; // extra parallel sleeps after a sleep branch; default 0
+ sleepBranchWaitMs?: number; // base duration of those sleeps; defaults to sleepMs
+ sleepBranchWaitSpacingMs?: number; // added per extra sleep index; default 0
+}
+
+interface WakePayload { // fields: attempt number and send timestamp
+ attempt: number;
+ sentAt: number;
+}
+
+interface GatePayload { // fields: attempt number, scenario name and send timestamp
+ attempt: number;
+ scenario: string;
+ sentAt: number;
+}
+
+interface RaceBranchRecord { // One entry of the `branches` array returned by hookSleepReproWorkflow.
+ branch: 'sleep' | 'wake'; // which side of the race won this iteration
+ iteration: number;
+ drained?: unknown; // drain step result; only set on wake branches
+ event?: IteratorResult<WakePayload>; // hook event; only set on wake branches
+}
+
+type RaceBranch = // discriminated on `kind`: sleep carries no data, hook carries the iterator result
+ | { kind: 'sleep' }
+ | { kind: 'hook'; event: IteratorResult<WakePayload> };
+
+interface StepFanoutInput { // Input of stepFanoutReplayReproWorkflow; defaults below are the ones that workflow applies.
+ token: string; // hook token passed to createHook
+ rounds?: number; // fan-out rounds; default 4
+ width?: number; // parallel steps per round; default 4
+ stepDelayMs?: number; // base delay per fan-out step; default 0
+ stepDelayJitterMs?: number; // extra delay added when (round + index) is odd; default 50
+ aggregateDelayMs?: number; // delay inside the aggregate step; default 0
+ betweenRoundSleepMs?: number; // sleep after each round; default 0 (no sleep)
+}
+
+interface FanoutStepResult { // Return value of fanoutStep.
+ runId: string;
+ round: number;
+ index: number;
+ completedAt: number; // Date.now() taken inside the step
+}
+
+interface FanoutRoundRecord { // Return value of aggregateFanoutStep.
+ runId: string;
+ round: number;
+ count: number; // number of values aggregated
+ checksum: string; // comma-joined "round:index" pairs in input order
+ completedAt: number; // Date.now() taken inside the step
+}
+
+interface StepSleepRaceInput { // Input of stepSleepRaceReproWorkflow; defaults below are the ones that workflow applies.
+ token: string; // hook token passed to createHook
+ rounds?: number; // race rounds; default 4
+ stepDelayMs?: number; // delay inside the raced step; default 0
+ sleepMs?: number; // sleep raced against the step; default 1000
+ postRaceSleepMs?: number; // sleep after each round; default 0 (no sleep)
+}
+
+type StepSleepRaceBranch = // discriminated on `kind`: sleep carries no data, step carries the step result
+ | { kind: 'sleep' }
+ | { kind: 'step'; value: StepRaceResult };
+
+interface StepRaceResult { // Return value of racedStep.
+ runId: string;
+ round: number;
+ completedAt: number; // Date.now() taken inside the step
+}
+
+async function syncStep(input: { runId: string; iteration: number }) { // Echoes the input with a `syncedAt` timestamp.
+ 'use step'; // directive string; presumably marks this function as a workflow step (TODO confirm)
+ return { ...input, syncedAt: Date.now() };
+}
+
+async function drainStep(input: { // Optionally waits `delayMs`, then echoes the input with a `drainedAt` timestamp.
+ delayMs: number;
+ runId: string;
+ iteration: number;
+}) {
+ 'use step'; // directive string; presumably marks this function as a workflow step (TODO confirm)
+ if (input.delayMs > 0) { // zero or negative delay skips the timer entirely
+ await new Promise((resolve) => setTimeout(resolve, input.delayMs));
+ }
+ return { ...input, drainedAt: Date.now() };
+}
+
+async function finalStep(input: { delayMs: number; runId: string }) { // Optionally waits `delayMs`, then echoes the input with a `finishedAt` timestamp.
+ 'use step'; // directive string; presumably marks this function as a workflow step (TODO confirm)
+ if (input.delayMs > 0) { // zero or negative delay skips the timer entirely
+ await new Promise((resolve) => setTimeout(resolve, input.delayMs));
+ }
+ return { ...input, finishedAt: Date.now() };
+}
+
+async function fanoutStep(input: { // One fan-out unit: optionally waits `delayMs`, then reports its coordinates.
+ delayMs: number;
+ runId: string;
+ round: number;
+ index: number;
+}): Promise<FanoutStepResult> {
+ 'use step'; // directive string; presumably marks this function as a workflow step (TODO confirm)
+ if (input.delayMs > 0) { // zero or negative delay skips the timer entirely
+ await new Promise((resolve) => setTimeout(resolve, input.delayMs));
+ }
+ return { // delayMs is intentionally not echoed back
+ runId: input.runId,
+ round: input.round,
+ index: input.index,
+ completedAt: Date.now(),
+ };
+}
+
+async function aggregateFanoutStep(input: { // Summarises one round's fan-out results into a count and an order-sensitive checksum.
+ delayMs: number;
+ runId: string;
+ round: number;
+ values: FanoutStepResult[];
+}): Promise<FanoutRoundRecord> {
+ 'use step'; // directive string; presumably marks this function as a workflow step (TODO confirm)
+ if (input.delayMs > 0) { // zero or negative delay skips the timer entirely
+ await new Promise((resolve) => setTimeout(resolve, input.delayMs));
+ }
+ return {
+ runId: input.runId,
+ round: input.round,
+ count: input.values.length,
+ checksum: input.values // "round:index" pairs joined by commas, in the order received
+ .map((value) => `${value.round}:${value.index}`)
+ .join(','),
+ completedAt: Date.now(),
+ };
+}
+
+async function racedStep(input: { // The step side of the step-vs-sleep race: optionally waits `delayMs`, then reports its round.
+ delayMs: number;
+ runId: string;
+ round: number;
+}): Promise<StepRaceResult> {
+ 'use step'; // directive string; presumably marks this function as a workflow step (TODO confirm)
+ if (input.delayMs > 0) { // zero or negative delay skips the timer entirely
+ await new Promise((resolve) => setTimeout(resolve, input.delayMs));
+ }
+ return { // delayMs is intentionally not echoed back
+ runId: input.runId,
+ round: input.round,
+ completedAt: Date.now(),
+ };
+}
+
+async function stepBranchMarkerStep(input: { runId: string; round: number }) { // Echoes the input tagged with branch 'step' and a timestamp.
+ 'use step'; // directive string; presumably marks this function as a workflow step (TODO confirm)
+ return { ...input, branch: 'step' as const, markedAt: Date.now() };
+}
+
+async function sleepBranchMarkerStep(input: { runId: string; round: number }) { // Echoes the input tagged with branch 'sleep' and a timestamp.
+ 'use step'; // directive string; presumably marks this function as a workflow step (TODO confirm)
+ return { ...input, branch: 'sleep' as const, markedAt: Date.now() };
+}
+
+// biome-ignore lint/complexity/noExcessiveCognitiveComplexity: Keep this close to the shared repro shape.
+export async function hookSleepReproWorkflow(input: ReproInput) { // Races a hook read against a sleep on each iteration and records which side won.
+ 'use workflow';
+
+ const metadata = getWorkflowMetadata();
+ const hook = createHook<WakePayload>({ token: input.token });
+ const iterator = hook[Symbol.asyncIterator]();
+
+ const iterations = input.iterations ?? 2;
+ const sleepMs = input.sleepMs ?? 5000;
+ const returnOnWake = input.returnOnWake ?? false;
+ const finalDelayMs = input.finalDelayMs ?? 0;
+ const sleepBranchWaitCount = input.sleepBranchWaitCount ?? 0;
+ const sleepBranchWaitMs = input.sleepBranchWaitMs ?? sleepMs; // extra sleeps default to the race sleep duration
+ const sleepBranchWaitSpacingMs = input.sleepBranchWaitSpacingMs ?? 0;
+
+ const branches: RaceBranchRecord[] = [];
+ let pendingHookRead: Promise<IteratorResult<WakePayload>> | undefined; // hook read that lost a race and is still outstanding
+
+ try {
+ for (let iteration = 0; iteration < iterations; iteration += 1) {
+ await syncStep({ runId: metadata.workflowRunId, iteration });
+
+ pendingHookRead ??= iterator.next(); // reuse the outstanding read instead of starting a second one
+
+ const result = await Promise.race([
+ pendingHookRead.then((event) => ({ kind: 'hook' as const, event })),
+ sleep(sleepMs).then(() => ({ kind: 'sleep' as const })),
+ ]);
+
+ if (result.kind === 'sleep') { // sleep won: record it, optionally run extra sleeps, keep the hook read pending
+ branches.push({ branch: 'sleep', iteration });
+
+ if (sleepBranchWaitCount > 0) {
+ const waits = [];
+ for (let index = 0; index < sleepBranchWaitCount; index += 1) {
+ waits.push(
+ sleep(sleepBranchWaitMs + index * sleepBranchWaitSpacingMs) // each extra sleep is `spacing` ms longer than the previous
+ );
+ }
+ await Promise.all(waits);
+ }
+
+ continue;
+ }
+
+ pendingHookRead = undefined; // hook won: the read is consumed, the next iteration starts a fresh one
+
+ const drained = await drainStep({
+ delayMs: input.drainDelayMs ?? 0,
+ runId: metadata.workflowRunId,
+ iteration,
+ });
+
+ branches.push({
+ branch: 'wake',
+ drained,
+ event: result.event,
+ iteration,
+ });
+
+ if (returnOnWake) { // stop at the first wake, after the optional final step
+ if (finalDelayMs > 0) {
+ await finalStep({
+ delayMs: finalDelayMs,
+ runId: metadata.workflowRunId,
+ });
+ }
+
+ return { branches, runId: metadata.workflowRunId, sleepMs };
+ }
+ }
+
+ if (finalDelayMs > 0) {
+ await finalStep({ delayMs: finalDelayMs, runId: metadata.workflowRunId });
+ }
+
+ return { branches, runId: metadata.workflowRunId, sleepMs };
+ } finally {
+ hook.dispose(); // runs on every exit path, including early return and thrown errors
+ }
+}
+
+export async function stepFanoutReplayReproWorkflow(input: StepFanoutInput) { // After one hook event, runs `rounds` rounds of `width` parallel steps, aggregating each round.
+ 'use workflow';
+
+ const metadata = getWorkflowMetadata();
+ const hook = createHook({ token: input.token }); // NOTE(review): the payload type argument may have been lost here; GatePayload looks like the intended one
+ const iterator = hook[Symbol.asyncIterator]();
+
+ const rounds = input.rounds ?? 4;
+ const width = input.width ?? 4;
+ const stepDelayMs = input.stepDelayMs ?? 0;
+ const stepDelayJitterMs = input.stepDelayJitterMs ?? 50;
+ const aggregateDelayMs = input.aggregateDelayMs ?? 0;
+ const betweenRoundSleepMs = input.betweenRoundSleepMs ?? 0;
+ const roundRecords: FanoutRoundRecord[] = [];
+
+ try {
+ await iterator.next(); // gate: wait for one hook event; its value is discarded
+
+ for (let round = 0; round < rounds; round += 1) {
+ const values = await Promise.all(
+ Array.from({ length: width }, (_, index) =>
+ fanoutStep({
+ delayMs: stepDelayMs + ((round + index) % 2) * stepDelayJitterMs, // deterministic: jitter is added only when round + index is odd
+ index,
+ round,
+ runId: metadata.workflowRunId,
+ })
+ )
+ );
+
+ roundRecords.push(
+ await aggregateFanoutStep({
+ delayMs: aggregateDelayMs,
+ round,
+ runId: metadata.workflowRunId,
+ values,
+ })
+ );
+
+ if (betweenRoundSleepMs > 0) { // also sleeps after the last round
+ await sleep(betweenRoundSleepMs);
+ }
+ }
+
+ return {
+ runId: metadata.workflowRunId,
+ roundRecords,
+ rounds,
+ width,
+ };
+ } finally {
+ hook.dispose(); // runs on every exit path, including thrown errors
+ }
+}
+
+export async function stepSleepRaceReproWorkflow(input: StepSleepRaceInput) { // After one hook event, races a step against a sleep each round and records the winner.
+ 'use workflow';
+
+ const metadata = getWorkflowMetadata();
+ const hook = createHook({ token: input.token }); // NOTE(review): the payload type argument may have been lost here; GatePayload looks like the intended one
+ const iterator = hook[Symbol.asyncIterator]();
+
+ const rounds = input.rounds ?? 4;
+ const stepDelayMs = input.stepDelayMs ?? 0;
+ const sleepMs = input.sleepMs ?? 1000;
+ const postRaceSleepMs = input.postRaceSleepMs ?? 0;
+ const branches: Array<{
+ branch: 'sleep' | 'step';
+ marker: unknown;
+ round: number;
+ value?: StepRaceResult;
+ }> = [];
+
+ try {
+ await iterator.next(); // gate: wait for one hook event; its value is discarded
+
+ for (let round = 0; round < rounds; round += 1) {
+ const result = await Promise.race([
+ racedStep({
+ delayMs: stepDelayMs,
+ round,
+ runId: metadata.workflowRunId,
+ }).then((value) => ({ kind: 'step' as const, value })),
+ sleep(sleepMs).then(() => ({ kind: 'sleep' as const })),
+ ]);
+
+ const marker = // run a distinct marker step for whichever side won
+ result.kind === 'step'
+ ? await stepBranchMarkerStep({
+ round,
+ runId: metadata.workflowRunId,
+ })
+ : await sleepBranchMarkerStep({
+ round,
+ runId: metadata.workflowRunId,
+ });
+
+ branches.push({
+ branch: result.kind,
+ marker,
+ round,
+ value: result.kind === 'step' ? result.value : undefined, // step result is only kept when the step won
+ });
+
+ if (postRaceSleepMs > 0) { // also sleeps after the last round
+ await sleep(postRaceSleepMs);
+ }
+ }
+
+ return {
+ branches,
+ runId: metadata.workflowRunId,
+ rounds,
+ sleepMs,
+ stepDelayMs,
+ };
+ } finally {
+ hook.dispose(); // runs on every exit path, including thrown errors
+ }
+}