From b654a4c225587065cba4829e7f65802d18f6843b Mon Sep 17 00:00:00 2001 From: Peter Wielander Date: Fri, 29 May 2026 15:25:31 +0200 Subject: [PATCH 01/10] test: add event log race repro ci --- .changeset/ready-pens-hug.md | 2 + .../render-event-log-race-repro-results.js | 146 ++++++ .github/workflows/event-log-race-repro.yml | 127 +++++ package.json | 1 + .../core/e2e/event-log-race-repro.test.ts | 447 ++++++++++++++++++ .../workflows/101_hook_sleep_repro.ts | 137 ++++++ 6 files changed, 860 insertions(+) create mode 100644 .changeset/ready-pens-hug.md create mode 100644 .github/scripts/render-event-log-race-repro-results.js create mode 100644 .github/workflows/event-log-race-repro.yml create mode 100644 packages/core/e2e/event-log-race-repro.test.ts create mode 100644 workbench/nextjs-turbopack/workflows/101_hook_sleep_repro.ts diff --git a/.changeset/ready-pens-hug.md b/.changeset/ready-pens-hug.md new file mode 100644 index 0000000000..a845151cc8 --- /dev/null +++ b/.changeset/ready-pens-hug.md @@ -0,0 +1,2 @@ +--- +--- diff --git a/.github/scripts/render-event-log-race-repro-results.js b/.github/scripts/render-event-log-race-repro-results.js new file mode 100644 index 0000000000..366be40ed9 --- /dev/null +++ b/.github/scripts/render-event-log-race-repro-results.js @@ -0,0 +1,146 @@ +#!/usr/bin/env node + +const fs = require('node:fs'); + +const args = process.argv.slice(2); +let resultsPath = 'event-log-race-repro-results.json'; +let runUrl = ''; +let check = false; + +for (let index = 0; index < args.length; index += 1) { + const arg = args[index]; + if (arg === '--run-url' && args[index + 1]) { + runUrl = args[index + 1]; + index += 1; + } else if (arg === '--check') { + check = true; + } else if (!arg.startsWith('--')) { + resultsPath = arg; + } +} + +const orderedOutcomes = [ + 'completed', + 'CORRUPTED_EVENT_LOG', + 'USER_ERROR', + 'RUNTIME_ERROR', + 'stuck', + 'other', +]; + +function emptyDistribution() { + return Object.fromEntries(orderedOutcomes.map((outcome) => [outcome, 0])); +} + +function loadResults() { + if (!fs.existsSync(resultsPath)) { + return null; + } + return JSON.parse(fs.readFileSync(resultsPath, 'utf8')); +} + +function summarize(results) { + const distribution = emptyDistribution(); + for (const result of results) { + distribution[result.outcome] = (distribution[result.outcome] ?? 0) + 1; + } + return distribution; +} + +function nonCompletedCount(distribution) { + return orderedOutcomes + .filter((outcome) => outcome !== 'completed') + .reduce((sum, outcome) => sum + (distribution[outcome] ?? 0), 0); +} + +function renderMissing() { + console.log(''); + console.log('## Event Log Race Repro\n'); + console.log('No result file was produced by the repro job.\n'); + if (runUrl) { + console.log(`Workflow run: ${runUrl}`); + } +} + +// biome-ignore lint/complexity/noExcessiveCognitiveComplexity: This is a small markdown renderer for one CI comment. +function render(resultsFile) { + const results = resultsFile.results ?? []; + const distribution = resultsFile.distribution ?? summarize(results); + const failedCount = nonCompletedCount(distribution); + const total = orderedOutcomes.reduce( + (sum, outcome) => sum + (distribution[outcome] ?? 0), + 0 + ); + + console.log(''); + console.log('## Event Log Race Repro\n'); + console.log( + failedCount === 0 + ? 'All repro runs completed.' + : `${failedCount} of ${total} repro runs did not complete cleanly.` + ); + console.log(''); + + console.log('| Outcome | Count |'); + console.log('|:--|--:|'); + for (const outcome of orderedOutcomes) { + console.log(`| ${outcome} | ${distribution[outcome] ?? 0} |`); + } + console.log(''); + + const config = resultsFile.config ?? {}; + console.log('### Configuration\n'); + console.log('| Setting | Value |'); + console.log('|:--|:--|'); + console.log(`| Attempts | ${config.attempts ?? ''} |`); + console.log(`| Concurrency | ${config.concurrency ?? ''} |`); + console.log(`| Sleep | ${config.sleepMs ?? ''}ms |`); + console.log(`| Resume delay | ${config.resumeDelayMs ?? ''}ms |`); + console.log(`| Resume jitter | ${config.resumeJitterMs ?? ''}ms |`); + console.log(`| Run timeout | ${config.runTimeoutMs ?? ''}ms |`); + if (resultsFile.deploymentUrl) { + console.log(`| Deployment | ${resultsFile.deploymentUrl} |`); + } + console.log(''); + + const failing = results.filter((result) => result.outcome !== 'completed'); + if (failing.length > 0) { + console.log('### Non-Completed Runs\n'); + console.log('| Attempt | Outcome | Status | Error code | Run |'); + console.log('|--:|:--|:--|:--|:--|'); + for (const result of failing.slice(0, 20)) { + const run = + result.dashboardUrl && result.runId + ? `[${result.runId}](${result.dashboardUrl})` + : (result.runId ?? ''); + console.log( + `| ${result.attempt} | ${result.outcome} | ${result.status ?? ''} | ${result.errorCode ?? ''} | ${run} |` + ); + } + if (failing.length > 20) { + console.log(`\nShowing 20 of ${failing.length} non-completed runs.`); + } + console.log(''); + } + + if (runUrl) { + console.log(`Workflow run: ${runUrl}`); + } +} + +const resultsFile = loadResults(); + +if (!resultsFile) { + if (!check) { + renderMissing(); + } + process.exit(check ? 1 : 0); +} + +if (!check) { + render(resultsFile); +} + +const distribution = + resultsFile.distribution ?? summarize(resultsFile.results ?? []); +process.exit(nonCompletedCount(distribution) > 0 ? 1 : 0); diff --git a/.github/workflows/event-log-race-repro.yml b/.github/workflows/event-log-race-repro.yml new file mode 100644 index 0000000000..ad7d1a2ee4 --- /dev/null +++ b/.github/workflows/event-log-race-repro.yml @@ -0,0 +1,127 @@ +name: Event Log Race Repro + +on: + pull_request: + branches: [main, stable] + types: [opened, reopened, synchronize, labeled] + workflow_dispatch: + inputs: + attempts: + description: 'Number of workflow runs to start' + required: false + default: '60' + concurrency: + description: 'Number of concurrent workflow runs' + required: false + default: '12' + run_timeout_ms: + description: 'Per-run timeout before classifying as stuck' + required: false + default: '150000' + sleep_ms: + description: 'Workflow sleep duration in the hook/sleep race' + required: false + default: '5000' + resume_delay_ms: + description: 'Base delay before resuming the hook' + required: false + default: '5000' + resume_jitter_ms: + description: 'Additional random hook resume delay' + required: false + default: '5000' + +concurrency: + group: ${{ github.workflow }}-${{ github.ref }} + cancel-in-progress: ${{ github.ref != 'refs/heads/main' && github.ref != 'refs/heads/stable' }} + +jobs: + event-log-race-repro: + name: Event Log Race Repro + runs-on: ubuntu-latest + timeout-minutes: 90 + if: ${{ github.event_name == 'workflow_dispatch' || contains(github.event.pull_request.labels.*.name, 'event-log-race-repro') }} + permissions: + contents: read + deployments: read + statuses: read + id-token: write + issues: write + pull-requests: write + env: + TURBO_TOKEN: ${{ secrets.TURBO_TOKEN }} + TURBO_TEAM: ${{ vars.TURBO_TEAM }} + WORKFLOW_PUBLIC_MANIFEST: '1' + WORKFLOW_NEXT_LAZY_DISCOVERY: '0' + + steps: + - name: Checkout Repo + uses: actions/checkout@v4 + with: + ref: ${{ github.event.pull_request.head.sha || github.sha }} + + - name: Setup environment + uses: ./.github/actions/setup-workflow-dev + with: + build-packages: 'false' + + - name: Wait for Vercel deployment + id: waitForDeployment + uses: vercel/wait-for-deployment-action@0e2b0c5c5cce31f1648108aeec56467187aca037 + with: + project-slug: example-nextjs-workflow-turbopack + timeout: 1000 + check-interval: 15 + environment: ${{ github.ref == 'refs/heads/main' && 'production' || 'preview' }} + + - name: Run event log race repro + id: repro + continue-on-error: true + run: pnpm run test:e2e:event-log-race-repro --reporter=default + env: + NODE_OPTIONS: "--enable-source-maps" + DEPLOYMENT_URL: ${{ steps.waitForDeployment.outputs.deployment-url }} + VERCEL_DEPLOYMENT_ID: ${{ steps.waitForDeployment.outputs.deployment-id }} + APP_NAME: nextjs-turbopack + WORKFLOW_VERCEL_ENV: ${{ github.ref == 'refs/heads/main' && 'production' || 'preview' }} + WORKFLOW_VERCEL_AUTH_TOKEN: ${{ secrets.VERCEL_LABS_TOKEN }} + WORKFLOW_VERCEL_TEAM: "team_nO2mCG4W8IxPIeKoSsqwAxxB" + WORKFLOW_VERCEL_PROJECT: "prj_yjkM7UdHliv8bfxZ1sMJQf1pMpdi" + WORKFLOW_VERCEL_PROJECT_SLUG: example-nextjs-workflow-turbopack + VERCEL_WORKFLOW_SERVER_URL: ${{ github.ref != 'refs/heads/main' && secrets.VERCEL_WORKFLOW_SERVER_URL || '' }} + EVENT_LOG_RACE_REPRO_ATTEMPTS: ${{ inputs.attempts || '60' }} + EVENT_LOG_RACE_REPRO_CONCURRENCY: ${{ inputs.concurrency || '12' }} + EVENT_LOG_RACE_REPRO_RUN_TIMEOUT_MS: ${{ inputs.run_timeout_ms || '150000' }} + EVENT_LOG_RACE_REPRO_SLEEP_MS: ${{ inputs.sleep_ms || '5000' }} + EVENT_LOG_RACE_REPRO_RESUME_DELAY_MS: ${{ inputs.resume_delay_ms || '5000' }} + EVENT_LOG_RACE_REPRO_RESUME_JITTER_MS: ${{ inputs.resume_jitter_ms || '5000' }} + + - name: Render repro summary + if: always() + run: | + node .github/scripts/render-event-log-race-repro-results.js \ + event-log-race-repro-results.json \ + --run-url "${{ github.server_url }}/${{ github.repository }}/actions/runs/${{ github.run_id }}" \ + | tee event-log-race-repro-summary.md >> "$GITHUB_STEP_SUMMARY" + + - name: Update PR comment + if: always() && github.event_name == 'pull_request' + uses: marocchino/sticky-pull-request-comment@773744901bac0e8cbb5a0dc842800d45e9b2b405 # v2.9.4 + with: + header: event-log-race-repro-results + path: event-log-race-repro-summary.md + + - name: Upload repro results + if: always() + uses: actions/upload-artifact@v4 + with: + name: event-log-race-repro-results + path: | + event-log-race-repro-results.json + event-log-race-repro-summary.md + retention-days: 7 + if-no-files-found: ignore + + - name: Fail on corrupted, failed, or stuck runs + if: always() + run: node .github/scripts/render-event-log-race-repro-results.js event-log-race-repro-results.json --check diff --git a/package.json b/package.json index 7cf5becf35..4440b0b8e7 100644 --- a/package.json +++ b/package.json @@ -34,6 +34,7 @@ "clean": "turbo clean", "typecheck": "turbo typecheck", "test:e2e": "vitest run packages/core/e2e/e2e.test.ts packages/core/e2e/e2e-agent.test.ts", + "test:e2e:event-log-race-repro": "vitest run packages/core/e2e/event-log-race-repro.test.ts", "test:e2e:nextjs-webpack:staged": "node scripts/test-staged-nextjs-webpack.mjs", "test:docs": "pnpm --filter @workflow/docs-typecheck test:docs", "bench": "vitest bench packages/core/e2e/bench.bench.ts", diff --git a/packages/core/e2e/event-log-race-repro.test.ts b/packages/core/e2e/event-log-race-repro.test.ts new file mode 100644 index 0000000000..542a4d8c10 --- /dev/null +++ b/packages/core/e2e/event-log-race-repro.test.ts @@ -0,0 +1,447 @@ +import fs from 'node:fs'; +import path from 'node:path'; +import { setTimeout as sleep } from 'node:timers/promises'; +import { WorkflowRunFailedError } from '@workflow/errors'; +import { beforeAll, describe, expect, test } from 'vitest'; +import type { Run } from '../src/runtime'; +import { + getHookByToken, + getWorld, + start as rawStart, + resumeHook, +} from '../src/runtime'; +import { getWorkflowMetadata, setupWorld, trackRun } from './utils'; + +const deploymentUrl = process.env.DEPLOYMENT_URL; +if (!deploymentUrl) { + throw new Error('`DEPLOYMENT_URL` environment variable is not set'); +} + +const RESULT_PATH = path.resolve( + process.cwd(), + 'event-log-race-repro-results.json' +); + +type Outcome = + | 'completed' + | 'CORRUPTED_EVENT_LOG' + | 'USER_ERROR' + | 'RUNTIME_ERROR' + | 'stuck' + | 'other'; + +interface ReproConfig { + attempts: number; + concurrency: number; + iterations: number; + sleepMs: number; + resumeDelayMs: number; + resumeJitterMs: number; + runTimeoutMs: number; + hookTimeoutMs: number; + sleepBranchWaitCount: number; + sleepBranchWaitMs: number; + sleepBranchWaitSpacingMs: number; + returnOnWake: boolean; + drainDelayMs: number; + finalDelayMs: number; +} + +interface ReproRunResult { + attempt: number; + token: string; + runId?: string; + outcome: Outcome; + status?: string; + errorCode?: string; + errorMessage?: string; + durationMs: number; + dashboardUrl?: string; +} + +function envNumber(name: string, fallback: number) { + const raw = process.env[name]; + if (!raw) return fallback; + const parsed = Number(raw); + return Number.isFinite(parsed) ? parsed : fallback; +} + +function envBoolean(name: string, fallback: boolean) { + const raw = process.env[name]; + if (!raw) return fallback; + if (raw === '1' || raw === 'true') return true; + if (raw === '0' || raw === 'false') return false; + return fallback; +} + +const config: ReproConfig = { + attempts: envNumber('EVENT_LOG_RACE_REPRO_ATTEMPTS', 60), + concurrency: envNumber('EVENT_LOG_RACE_REPRO_CONCURRENCY', 12), + iterations: envNumber('EVENT_LOG_RACE_REPRO_ITERATIONS', 3), + sleepMs: envNumber('EVENT_LOG_RACE_REPRO_SLEEP_MS', 5000), + resumeDelayMs: envNumber('EVENT_LOG_RACE_REPRO_RESUME_DELAY_MS', 5000), + resumeJitterMs: envNumber('EVENT_LOG_RACE_REPRO_RESUME_JITTER_MS', 5000), + runTimeoutMs: envNumber('EVENT_LOG_RACE_REPRO_RUN_TIMEOUT_MS', 150_000), + hookTimeoutMs: envNumber('EVENT_LOG_RACE_REPRO_HOOK_TIMEOUT_MS', 60_000), + sleepBranchWaitCount: envNumber( + 'EVENT_LOG_RACE_REPRO_SLEEP_BRANCH_WAIT_COUNT', + 2 + ), + sleepBranchWaitMs: envNumber( + 'EVENT_LOG_RACE_REPRO_SLEEP_BRANCH_WAIT_MS', + 1000 + ), + sleepBranchWaitSpacingMs: envNumber( + 'EVENT_LOG_RACE_REPRO_SLEEP_BRANCH_WAIT_SPACING_MS', + 250 + ), + returnOnWake: envBoolean('EVENT_LOG_RACE_REPRO_RETURN_ON_WAKE', true), + drainDelayMs: envNumber('EVENT_LOG_RACE_REPRO_DRAIN_DELAY_MS', 0), + finalDelayMs: envNumber('EVENT_LOG_RACE_REPRO_FINAL_DELAY_MS', 0), +}; + +async function start( + ...args: Parameters> +): Promise> { + const run = await rawStart(...args); + trackRun(run, { + testName: 'event-log-race-repro', + workflowFile: 'workflows/101_hook_sleep_repro.ts', + workflowFn: 'hookSleepReproWorkflow', + }); + return run; +} + +async function waitForHook(token: string, runId: string) { + const deadline = Date.now() + config.hookTimeoutMs; + let lastError: unknown; + while (Date.now() < deadline) { + try { + const hook = await getHookByToken(token); + if (hook.runId === runId) { + return hook; + } + lastError = new Error( + `Hook ${token} belonged to ${hook.runId}, expected ${runId}` + ); + } catch (err) { + lastError = err; + } + await sleep(250); + } + throw lastError instanceof Error + ? lastError + : new Error(`Timed out waiting for hook ${token}`); +} + +function getDashboardUrl(runId: string): string | undefined { + const projectSlug = process.env.WORKFLOW_VERCEL_PROJECT_SLUG; + const env = process.env.WORKFLOW_VERCEL_ENV; + if (!projectSlug || !env) return undefined; + + const environment = env === 'production' ? 'production' : 'preview'; + return `https://vercel.com/vercel-labs/${projectSlug}/observability/workflows/runs/${runId}?environment=${environment}`; +} + +function classifyFailure(errorCode: string | undefined): Outcome { + if ( + errorCode === 'CORRUPTED_EVENT_LOG' || + errorCode === 'USER_ERROR' || + errorCode === 'RUNTIME_ERROR' + ) { + return errorCode; + } + return 'other'; +} + +function hasWakeBranch(value: unknown) { + if (!value || typeof value !== 'object' || !('branches' in value)) { + return false; + } + const branches = (value as { branches?: unknown }).branches; + return ( + Array.isArray(branches) && + branches.some( + (branch) => + branch && + typeof branch === 'object' && + 'branch' in branch && + branch.branch === 'wake' + ) + ); +} + +async function pollTerminalRun( + run: Run, + startedAt: number +): Promise { + const world = await getWorld(); + const deadline = startedAt + config.runTimeoutMs; + let lastStatus: string | undefined; + + while (Date.now() < deadline) { + const runData = await world.runs.get(run.runId); + lastStatus = runData.status; + + if (runData.status === 'completed') { + return { + attempt: -1, + token: '', + runId: run.runId, + outcome: 'completed', + status: runData.status, + durationMs: Date.now() - startedAt, + dashboardUrl: getDashboardUrl(run.runId), + }; + } + + if (runData.status === 'failed') { + return { + attempt: -1, + token: '', + runId: run.runId, + outcome: classifyFailure(runData.errorCode), + status: runData.status, + errorCode: runData.errorCode, + durationMs: Date.now() - startedAt, + dashboardUrl: getDashboardUrl(run.runId), + }; + } + + if (runData.status === 'cancelled') { + return { + attempt: -1, + token: '', + runId: run.runId, + outcome: 'other', + status: runData.status, + errorCode: 'CANCELLED', + durationMs: Date.now() - startedAt, + dashboardUrl: getDashboardUrl(run.runId), + }; + } + + await sleep(1000); + } + + return { + attempt: -1, + token: '', + runId: run.runId, + outcome: 'stuck', + status: lastStatus, + durationMs: Date.now() - startedAt, + dashboardUrl: getDashboardUrl(run.runId), + }; +} + +async function withTimeout( + promise: Promise, + timeoutMs: number, + message: string +) { + const timeout = sleep(timeoutMs).then(() => { + throw new Error(message); + }); + return await Promise.race([promise, timeout]); +} + +async function runAttempt(attempt: number): Promise { + const startedAt = Date.now(); + const token = `event-log-race-${Date.now()}-${attempt}-${Math.random() + .toString(36) + .slice(2)}`; + + try { + const workflow = await getWorkflowMetadata( + deploymentUrl, + 'workflows/101_hook_sleep_repro.ts', + 'hookSleepReproWorkflow' + ); + const run = await start(workflow, [ + { + token, + iterations: config.iterations, + sleepMs: config.sleepMs, + returnOnWake: config.returnOnWake, + drainDelayMs: config.drainDelayMs, + finalDelayMs: config.finalDelayMs, + sleepBranchWaitCount: config.sleepBranchWaitCount, + sleepBranchWaitMs: config.sleepBranchWaitMs, + sleepBranchWaitSpacingMs: config.sleepBranchWaitSpacingMs, + }, + ]); + + const hook = await waitForHook(token, run.runId); + const jitter = + config.resumeJitterMs > 0 + ? Math.floor(Math.random() * config.resumeJitterMs) + : 0; + const resumeDelayMs = config.resumeDelayMs + jitter; + const resumePromise = sleep(resumeDelayMs).then(() => + resumeHook(hook, { attempt, sentAt: Date.now() }) + ); + + const runResult = await pollTerminalRun(run, startedAt); + const resumeResult = await Promise.allSettled([ + withTimeout( + resumePromise, + 30_000, + `Timed out resuming hook ${token} for run ${run.runId}` + ), + ]); + + const resumeFailure = resumeResult.find( + (result) => result.status === 'rejected' + ); + if (runResult.outcome === 'completed') { + if (resumeFailure?.status === 'rejected') { + return { + ...runResult, + attempt, + token, + outcome: 'other', + errorCode: 'HOOK_RESUME_FAILED', + errorMessage: String(resumeFailure.reason), + }; + } + + const returnValue = await withTimeout( + run.returnValue, + 30_000, + `Timed out reading return value for run ${run.runId}` + ); + if (!hasWakeBranch(returnValue)) { + return { + ...runResult, + attempt, + token, + outcome: 'other', + errorCode: 'NO_WAKE_BRANCH', + errorMessage: 'Run completed without taking the hook wake branch.', + }; + } + } + + return { + ...runResult, + attempt, + token, + errorMessage: + runResult.errorMessage ?? + (resumeFailure?.status === 'rejected' + ? String(resumeFailure.reason) + : undefined), + }; + } catch (err) { + if (WorkflowRunFailedError.is(err)) { + return { + attempt, + token, + runId: err.runId, + outcome: classifyFailure(err.errorCode), + status: 'failed', + errorCode: err.errorCode, + errorMessage: err.message, + durationMs: Date.now() - startedAt, + dashboardUrl: getDashboardUrl(err.runId), + }; + } + + return { + attempt, + token, + outcome: 'other', + errorMessage: err instanceof Error ? err.message : String(err), + durationMs: Date.now() - startedAt, + }; + } +} + +async function mapLimit( + items: T[], + limit: number, + fn: (item: T) => Promise +) { + const results: R[] = new Array(items.length); + let index = 0; + + async function worker() { + while (index < items.length) { + const currentIndex = index; + index += 1; + const item = items[currentIndex]; + if (item === undefined) { + continue; + } + results[currentIndex] = await fn(item); + } + } + + await Promise.all( + Array.from({ length: Math.min(limit, items.length) }, () => worker()) + ); + return results; +} + +function summarize(results: ReproRunResult[]) { + return results.reduce>( + (acc, result) => { + acc[result.outcome] += 1; + return acc; + }, + { + completed: 0, + CORRUPTED_EVENT_LOG: 0, + USER_ERROR: 0, + RUNTIME_ERROR: 0, + stuck: 0, + other: 0, + } + ); +} + +function writeResults(results: ReproRunResult[]) { + fs.writeFileSync( + RESULT_PATH, + JSON.stringify( + { + completedAt: new Date().toISOString(), + deploymentUrl, + config, + distribution: summarize(results), + results, + }, + null, + 2 + ) + ); +} + +const testTimeoutMs = + config.runTimeoutMs * Math.ceil(config.attempts / config.concurrency) + + 60_000; + +describe('event log race repro', () => { + beforeAll(() => { + setupWorld(deploymentUrl); + }); + + test( + 'hook/sleep race does not corrupt or stall runs', + { timeout: testTimeoutMs }, + async () => { + const attempts = Array.from( + { length: config.attempts }, + (_, index) => index + 1 + ); + const results = await mapLimit(attempts, config.concurrency, runAttempt); + writeResults(results); + + const nonCompleted = results.filter( + (result) => result.outcome !== 'completed' + ); + expect(nonCompleted).toEqual([]); + } + ); +}); diff --git a/workbench/nextjs-turbopack/workflows/101_hook_sleep_repro.ts b/workbench/nextjs-turbopack/workflows/101_hook_sleep_repro.ts new file mode 100644 index 0000000000..c118b26049 --- /dev/null +++ b/workbench/nextjs-turbopack/workflows/101_hook_sleep_repro.ts @@ -0,0 +1,137 @@ +import { createHook, getWorkflowMetadata, sleep } from 'workflow'; + +interface ReproInput { + token: string; + iterations?: number; + sleepMs?: number; + returnOnWake?: boolean; + finalDelayMs?: number; + drainDelayMs?: number; + sleepBranchWaitCount?: number; + sleepBranchWaitMs?: number; + sleepBranchWaitSpacingMs?: number; +} + +interface WakePayload { + attempt: number; + sentAt: number; +} + +interface RaceBranchRecord { + branch: 'sleep' | 'wake'; + iteration: number; + drained?: unknown; + event?: IteratorResult; +} + +type RaceBranch = + | { kind: 'sleep' } + | { kind: 'hook'; event: IteratorResult }; + +async function syncStep(input: { runId: string; iteration: number }) { + 'use step'; + return { ...input, syncedAt: Date.now() }; +} + +async function drainStep(input: { + delayMs: number; + runId: string; + iteration: number; +}) { + 'use step'; + if (input.delayMs > 0) { + await new Promise((resolve) => setTimeout(resolve, input.delayMs)); + } + return { ...input, drainedAt: Date.now() }; +} + +async function finalStep(input: { delayMs: number; runId: string }) { + 'use step'; + if (input.delayMs > 0) { + await new Promise((resolve) => setTimeout(resolve, input.delayMs)); + } + return { ...input, finishedAt: Date.now() }; +} + +// biome-ignore lint/complexity/noExcessiveCognitiveComplexity: Keep this close to the shared repro shape. +export async function hookSleepReproWorkflow(input: ReproInput) { + 'use workflow'; + + const metadata = getWorkflowMetadata(); + const hook = createHook({ token: input.token }); + const iterator = hook[Symbol.asyncIterator](); + + const iterations = input.iterations ?? 2; + const sleepMs = input.sleepMs ?? 5000; + const returnOnWake = input.returnOnWake ?? false; + const finalDelayMs = input.finalDelayMs ?? 0; + const sleepBranchWaitCount = input.sleepBranchWaitCount ?? 0; + const sleepBranchWaitMs = input.sleepBranchWaitMs ?? sleepMs; + const sleepBranchWaitSpacingMs = input.sleepBranchWaitSpacingMs ?? 0; + + const branches: RaceBranchRecord[] = []; + let pendingHookRead: Promise> | undefined; + + try { + for (let iteration = 0; iteration < iterations; iteration += 1) { + await syncStep({ runId: metadata.workflowRunId, iteration }); + + pendingHookRead ??= iterator.next(); + + const result = await Promise.race([ + pendingHookRead.then((event) => ({ kind: 'hook' as const, event })), + sleep(sleepMs).then(() => ({ kind: 'sleep' as const })), + ]); + + if (result.kind === 'sleep') { + branches.push({ branch: 'sleep', iteration }); + + if (sleepBranchWaitCount > 0) { + const waits = []; + for (let index = 0; index < sleepBranchWaitCount; index += 1) { + waits.push( + sleep(sleepBranchWaitMs + index * sleepBranchWaitSpacingMs) + ); + } + await Promise.all(waits); + } + + continue; + } + + pendingHookRead = undefined; + + const drained = await drainStep({ + delayMs: input.drainDelayMs ?? 0, + runId: metadata.workflowRunId, + iteration, + }); + + branches.push({ + branch: 'wake', + drained, + event: result.event, + iteration, + }); + + if (returnOnWake) { + if (finalDelayMs > 0) { + await finalStep({ + delayMs: finalDelayMs, + runId: metadata.workflowRunId, + }); + } + + return { branches, runId: metadata.workflowRunId, sleepMs }; + } + } + + if (finalDelayMs > 0) { + await finalStep({ delayMs: finalDelayMs, runId: metadata.workflowRunId }); + } + + return { branches, runId: metadata.workflowRunId, sleepMs }; + } finally { + hook.dispose(); + } +} From fce70e9f41c1b8ca6b0259fc2404f249f11ece8d Mon Sep 17 00:00:00 2001 From: Peter Wielander Date: Fri, 29 May 2026 15:30:58 +0200 Subject: [PATCH 02/10] ci: build cli before event log race repro --- .github/workflows/event-log-race-repro.yml | 3 +++ 1 file changed, 3 insertions(+) diff --git a/.github/workflows/event-log-race-repro.yml b/.github/workflows/event-log-race-repro.yml index ad7d1a2ee4..5c10f2d2be 100644 --- a/.github/workflows/event-log-race-repro.yml +++ b/.github/workflows/event-log-race-repro.yml @@ -65,6 +65,9 @@ jobs: with: build-packages: 'false' + - name: Build CLI + run: pnpm turbo run build --filter='@workflow/cli' + - name: Wait for Vercel deployment id: waitForDeployment uses: vercel/wait-for-deployment-action@0e2b0c5c5cce31f1648108aeec56467187aca037 From 0a740f1db8cf0b6143dd8a2774f206b3cfc585b1 Mon Sep 17 00:00:00 2001 From: Peter Wielander Date: Fri, 29 May 2026 15:37:47 +0200 Subject: [PATCH 03/10] test: target stale hook before step repro --- .../core/e2e/event-log-race-repro.test.ts | 26 +++++++++++---- .../workflows/101_hook_sleep_repro.ts | 32 +++++++++++++++++++ 2 files changed, 51 insertions(+), 7 deletions(-) diff --git a/packages/core/e2e/event-log-race-repro.test.ts b/packages/core/e2e/event-log-race-repro.test.ts index 542a4d8c10..12b0bcc0ab 100644 --- a/packages/core/e2e/event-log-race-repro.test.ts +++ b/packages/core/e2e/event-log-race-repro.test.ts @@ -21,6 +21,8 @@ const RESULT_PATH = path.resolve( process.cwd(), 'event-log-race-repro-results.json' ); +const WORKFLOW_FILE = 'workflows/101_hook_sleep_repro.ts'; +const WORKFLOW_FN = 'hookSleepStepReproWorkflow'; type Outcome = | 'completed' @@ -106,8 +108,8 @@ async function start( const run = await rawStart(...args); trackRun(run, { testName: 'event-log-race-repro', - workflowFile: 'workflows/101_hook_sleep_repro.ts', - workflowFn: 'hookSleepReproWorkflow', + workflowFile: WORKFLOW_FILE, + workflowFn: WORKFLOW_FN, }); return run; } @@ -171,6 +173,16 @@ function hasWakeBranch(value: unknown) { ); } +function hasHookEvent(value: unknown) { + return ( + !!value && + typeof value === 'object' && + 'event' in value && + !!value.event && + typeof value.event === 'object' + ); +} + async function pollTerminalRun( run: Run, startedAt: number @@ -255,8 +267,8 @@ async function runAttempt(attempt: number): Promise { try { const workflow = await getWorkflowMetadata( deploymentUrl, - 'workflows/101_hook_sleep_repro.ts', - 'hookSleepReproWorkflow' + WORKFLOW_FILE, + WORKFLOW_FN ); const run = await start(workflow, [ { @@ -311,14 +323,14 @@ async function runAttempt(attempt: number): Promise { 30_000, `Timed out reading return value for run ${run.runId}` ); - if (!hasWakeBranch(returnValue)) { + if (!hasWakeBranch(returnValue) && !hasHookEvent(returnValue)) { return { ...runResult, attempt, token, outcome: 'other', - errorCode: 'NO_WAKE_BRANCH', - errorMessage: 'Run completed without taking the hook wake branch.', + errorCode: 'NO_HOOK_EVENT', + errorMessage: 'Run completed without consuming the hook event.', }; } } diff --git a/workbench/nextjs-turbopack/workflows/101_hook_sleep_repro.ts b/workbench/nextjs-turbopack/workflows/101_hook_sleep_repro.ts index c118b26049..fcb3e79aee 100644 --- a/workbench/nextjs-turbopack/workflows/101_hook_sleep_repro.ts +++ b/workbench/nextjs-turbopack/workflows/101_hook_sleep_repro.ts @@ -135,3 +135,35 @@ export async function hookSleepReproWorkflow(input: ReproInput) { hook.dispose(); } } + +export async function hookSleepStepReproWorkflow(input: ReproInput) { + 'use workflow'; + + const metadata = getWorkflowMetadata(); + const hook = createHook({ token: input.token }); + const iterator = hook[Symbol.asyncIterator](); + const sleepMs = input.sleepMs ?? 5000; + const finalDelayMs = input.finalDelayMs ?? 0; + + try { + await sleep(sleepMs); + + const synced = await syncStep({ + runId: metadata.workflowRunId, + iteration: 0, + }); + + const event = await iterator.next(); + + if (finalDelayMs > 0) { + await finalStep({ + delayMs: finalDelayMs, + runId: metadata.workflowRunId, + }); + } + + return { event, runId: metadata.workflowRunId, sleepMs, synced }; + } finally { + hook.dispose(); + } +} From 8dc1d3da976f5abd3a025c563e5b9a8f1f861d18 Mon Sep 17 00:00:00 2001 From: Peter Wielander Date: Fri, 29 May 2026 15:43:04 +0200 Subject: [PATCH 04/10] ci: run event log repro against public backend --- .github/workflows/event-log-race-repro.yml | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/.github/workflows/event-log-race-repro.yml b/.github/workflows/event-log-race-repro.yml index 5c10f2d2be..964e82c3b0 100644 --- a/.github/workflows/event-log-race-repro.yml +++ b/.github/workflows/event-log-race-repro.yml @@ -91,7 +91,9 @@ jobs: WORKFLOW_VERCEL_TEAM: "team_nO2mCG4W8IxPIeKoSsqwAxxB" WORKFLOW_VERCEL_PROJECT: "prj_yjkM7UdHliv8bfxZ1sMJQf1pMpdi" WORKFLOW_VERCEL_PROJECT_SLUG: example-nextjs-workflow-turbopack - VERCEL_WORKFLOW_SERVER_URL: ${{ github.ref != 'refs/heads/main' && secrets.VERCEL_WORKFLOW_SERVER_URL || '' }} + # This repro should validate the public workflow-server behavior for + # the SDK revision under test, not a protected server preview. + VERCEL_WORKFLOW_SERVER_URL: '' EVENT_LOG_RACE_REPRO_ATTEMPTS: ${{ inputs.attempts || '60' }} EVENT_LOG_RACE_REPRO_CONCURRENCY: ${{ inputs.concurrency || '12' }} EVENT_LOG_RACE_REPRO_RUN_TIMEOUT_MS: ${{ inputs.run_timeout_ms || '150000' }} From e7972e994b67d138d5d22e2a11bd2e5c27eaac3c Mon Sep 17 00:00:00 2001 From: Peter Wielander Date: Fri, 29 May 2026 15:45:36 +0200 Subject: [PATCH 05/10] ci: keep workflow server override for repro --- .github/workflows/event-log-race-repro.yml | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/.github/workflows/event-log-race-repro.yml b/.github/workflows/event-log-race-repro.yml index 964e82c3b0..5c10f2d2be 100644 --- a/.github/workflows/event-log-race-repro.yml +++ b/.github/workflows/event-log-race-repro.yml @@ -91,9 +91,7 @@ jobs: WORKFLOW_VERCEL_TEAM: "team_nO2mCG4W8IxPIeKoSsqwAxxB" WORKFLOW_VERCEL_PROJECT: "prj_yjkM7UdHliv8bfxZ1sMJQf1pMpdi" WORKFLOW_VERCEL_PROJECT_SLUG: example-nextjs-workflow-turbopack - # This repro should validate the public workflow-server behavior for - # the SDK revision under test, not a protected server preview. - VERCEL_WORKFLOW_SERVER_URL: '' + VERCEL_WORKFLOW_SERVER_URL: ${{ github.ref != 'refs/heads/main' && secrets.VERCEL_WORKFLOW_SERVER_URL || '' }} EVENT_LOG_RACE_REPRO_ATTEMPTS: ${{ inputs.attempts || '60' }} EVENT_LOG_RACE_REPRO_CONCURRENCY: ${{ inputs.concurrency || '12' }} EVENT_LOG_RACE_REPRO_RUN_TIMEOUT_MS: ${{ inputs.run_timeout_ms || '150000' }} From c74c315671f5318d22a02a835643c5cdfc3b4ea6 Mon Sep 17 00:00:00 2001 From: Peter Wielander Date: Fri, 29 May 2026 15:46:34 +0200 Subject: [PATCH 06/10] test: increase event log repro sample size --- .github/workflows/event-log-race-repro.yml | 8 ++++---- packages/core/e2e/event-log-race-repro.test.ts | 4 ++-- 2 files changed, 6 insertions(+), 6 deletions(-) diff --git a/.github/workflows/event-log-race-repro.yml b/.github/workflows/event-log-race-repro.yml index 5c10f2d2be..0f6c9b35be 100644 --- a/.github/workflows/event-log-race-repro.yml +++ b/.github/workflows/event-log-race-repro.yml @@ -9,11 +9,11 @@ on: attempts: description: 'Number of workflow runs to start' required: false - default: '60' + default: '1500' concurrency: description: 'Number of concurrent workflow runs' required: false - default: '12' + default: '50' run_timeout_ms: description: 'Per-run timeout before classifying as stuck' required: false @@ -92,8 +92,8 @@ jobs: WORKFLOW_VERCEL_PROJECT: "prj_yjkM7UdHliv8bfxZ1sMJQf1pMpdi" WORKFLOW_VERCEL_PROJECT_SLUG: example-nextjs-workflow-turbopack VERCEL_WORKFLOW_SERVER_URL: ${{ github.ref != 'refs/heads/main' && secrets.VERCEL_WORKFLOW_SERVER_URL || '' }} - EVENT_LOG_RACE_REPRO_ATTEMPTS: ${{ inputs.attempts || '60' }} - EVENT_LOG_RACE_REPRO_CONCURRENCY: ${{ inputs.concurrency || '12' }} + EVENT_LOG_RACE_REPRO_ATTEMPTS: ${{ inputs.attempts || '1500' }} + EVENT_LOG_RACE_REPRO_CONCURRENCY: ${{ inputs.concurrency || '50' }} EVENT_LOG_RACE_REPRO_RUN_TIMEOUT_MS: ${{ inputs.run_timeout_ms || '150000' }} EVENT_LOG_RACE_REPRO_SLEEP_MS: ${{ inputs.sleep_ms || '5000' }} EVENT_LOG_RACE_REPRO_RESUME_DELAY_MS: ${{ inputs.resume_delay_ms || '5000' }} diff --git a/packages/core/e2e/event-log-race-repro.test.ts b/packages/core/e2e/event-log-race-repro.test.ts index 12b0bcc0ab..70e6484433 100644 --- a/packages/core/e2e/event-log-race-repro.test.ts +++ b/packages/core/e2e/event-log-race-repro.test.ts @@ -77,8 +77,8 @@ function envBoolean(name: string, fallback: boolean) { } const config: ReproConfig = { - attempts: envNumber('EVENT_LOG_RACE_REPRO_ATTEMPTS', 60), - concurrency: envNumber('EVENT_LOG_RACE_REPRO_CONCURRENCY', 12), + attempts: envNumber('EVENT_LOG_RACE_REPRO_ATTEMPTS', 1500), + concurrency: envNumber('EVENT_LOG_RACE_REPRO_CONCURRENCY', 50), iterations: envNumber('EVENT_LOG_RACE_REPRO_ITERATIONS', 3), sleepMs: envNumber('EVENT_LOG_RACE_REPRO_SLEEP_MS', 5000), resumeDelayMs: envNumber('EVENT_LOG_RACE_REPRO_RESUME_DELAY_MS', 5000), From a9cac4fc01c98ad01dc8b20e01529196acab7d8b Mon Sep 17 00:00:00 2001 From: Peter Wielander Date: Fri, 29 May 2026 16:08:09 +0200 Subject: [PATCH 07/10] test: target hook sleep stale step repro --- .../render-event-log-race-repro-results.js | 1 + .github/workflows/event-log-race-repro.yml | 13 +++++--- .../core/e2e/event-log-race-repro.test.ts | 24 ++++---------- .../workflows/101_hook_sleep_repro.ts | 32 ------------------- 4 files changed, 17 insertions(+), 53 deletions(-) diff --git a/.github/scripts/render-event-log-race-repro-results.js b/.github/scripts/render-event-log-race-repro-results.js index 366be40ed9..8d40d4746c 100644 --- a/.github/scripts/render-event-log-race-repro-results.js +++ b/.github/scripts/render-event-log-race-repro-results.js @@ -94,6 +94,7 @@ function render(resultsFile) { console.log('|:--|:--|'); console.log(`| Attempts | ${config.attempts ?? ''} |`); console.log(`| Concurrency | ${config.concurrency ?? ''} |`); + console.log(`| Iterations | ${config.iterations ?? ''} |`); console.log(`| Sleep | ${config.sleepMs ?? ''}ms |`); console.log(`| Resume delay | ${config.resumeDelayMs ?? ''}ms |`); console.log(`| Resume jitter | ${config.resumeJitterMs ?? ''}ms |`); diff --git a/.github/workflows/event-log-race-repro.yml b/.github/workflows/event-log-race-repro.yml index 0f6c9b35be..ea2448b344 100644 --- a/.github/workflows/event-log-race-repro.yml +++ b/.github/workflows/event-log-race-repro.yml @@ -14,6 +14,10 @@ on: description: 'Number of concurrent workflow runs' required: false default: '50' + iterations: + description: 'Hook/sleep race iterations per workflow run' + required: false + default: '5' run_timeout_ms: description: 'Per-run timeout before classifying as stuck' required: false @@ -25,11 +29,11 @@ on: resume_delay_ms: description: 'Base delay before resuming the hook' required: false - default: '5000' + default: '15000' resume_jitter_ms: description: 'Additional random hook resume delay' required: false - default: '5000' + default: '10000' concurrency: group: ${{ github.workflow }}-${{ github.ref }} @@ -94,10 +98,11 @@ jobs: VERCEL_WORKFLOW_SERVER_URL: ${{ github.ref != 'refs/heads/main' && secrets.VERCEL_WORKFLOW_SERVER_URL || '' }} EVENT_LOG_RACE_REPRO_ATTEMPTS: ${{ inputs.attempts || '1500' }} EVENT_LOG_RACE_REPRO_CONCURRENCY: ${{ inputs.concurrency || '50' }} + EVENT_LOG_RACE_REPRO_ITERATIONS: ${{ inputs.iterations || '5' }} EVENT_LOG_RACE_REPRO_RUN_TIMEOUT_MS: ${{ inputs.run_timeout_ms || '150000' }} EVENT_LOG_RACE_REPRO_SLEEP_MS: ${{ inputs.sleep_ms || '5000' }} - EVENT_LOG_RACE_REPRO_RESUME_DELAY_MS: ${{ inputs.resume_delay_ms || '5000' }} - EVENT_LOG_RACE_REPRO_RESUME_JITTER_MS: ${{ inputs.resume_jitter_ms || '5000' }} + EVENT_LOG_RACE_REPRO_RESUME_DELAY_MS: ${{ inputs.resume_delay_ms || '15000' }} + EVENT_LOG_RACE_REPRO_RESUME_JITTER_MS: ${{ inputs.resume_jitter_ms || '10000' }} - name: Render repro summary if: always() diff --git a/packages/core/e2e/event-log-race-repro.test.ts b/packages/core/e2e/event-log-race-repro.test.ts index 70e6484433..9084725b87 100644 --- a/packages/core/e2e/event-log-race-repro.test.ts +++ b/packages/core/e2e/event-log-race-repro.test.ts @@ -22,7 +22,7 @@ const RESULT_PATH = path.resolve( 'event-log-race-repro-results.json' ); const WORKFLOW_FILE = 'workflows/101_hook_sleep_repro.ts'; -const WORKFLOW_FN = 'hookSleepStepReproWorkflow'; +const WORKFLOW_FN = 'hookSleepReproWorkflow'; type Outcome = | 'completed' @@ -79,10 +79,10 @@ function envBoolean(name: string, fallback: boolean) { const config: ReproConfig = { attempts: envNumber('EVENT_LOG_RACE_REPRO_ATTEMPTS', 1500), concurrency: envNumber('EVENT_LOG_RACE_REPRO_CONCURRENCY', 50), - iterations: envNumber('EVENT_LOG_RACE_REPRO_ITERATIONS', 3), + iterations: envNumber('EVENT_LOG_RACE_REPRO_ITERATIONS', 5), sleepMs: envNumber('EVENT_LOG_RACE_REPRO_SLEEP_MS', 5000), - resumeDelayMs: envNumber('EVENT_LOG_RACE_REPRO_RESUME_DELAY_MS', 5000), - resumeJitterMs: envNumber('EVENT_LOG_RACE_REPRO_RESUME_JITTER_MS', 5000), + resumeDelayMs: envNumber('EVENT_LOG_RACE_REPRO_RESUME_DELAY_MS', 15_000), + resumeJitterMs: envNumber('EVENT_LOG_RACE_REPRO_RESUME_JITTER_MS', 10_000), runTimeoutMs: envNumber('EVENT_LOG_RACE_REPRO_RUN_TIMEOUT_MS', 150_000), hookTimeoutMs: envNumber('EVENT_LOG_RACE_REPRO_HOOK_TIMEOUT_MS', 60_000), sleepBranchWaitCount: envNumber( @@ -173,16 +173,6 @@ function hasWakeBranch(value: unknown) { ); } -function hasHookEvent(value: unknown) { - return ( - !!value && - typeof value === 'object' && - 'event' in value && - !!value.event && - typeof value.event === 'object' - ); -} - async function pollTerminalRun( run: Run, startedAt: number @@ -323,14 +313,14 @@ async function runAttempt(attempt: number): Promise { 30_000, `Timed out reading return value for run ${run.runId}` ); - if (!hasWakeBranch(returnValue) && !hasHookEvent(returnValue)) { + if (!hasWakeBranch(returnValue)) { return { ...runResult, attempt, token, outcome: 'other', - errorCode: 'NO_HOOK_EVENT', - errorMessage: 'Run completed without consuming the hook event.', + errorCode: 'NO_WAKE_BRANCH', + errorMessage: 'Run completed without taking the hook wake branch.', }; } } diff --git a/workbench/nextjs-turbopack/workflows/101_hook_sleep_repro.ts b/workbench/nextjs-turbopack/workflows/101_hook_sleep_repro.ts index fcb3e79aee..c118b26049 100644 --- a/workbench/nextjs-turbopack/workflows/101_hook_sleep_repro.ts +++ b/workbench/nextjs-turbopack/workflows/101_hook_sleep_repro.ts @@ -135,35 +135,3 @@ export async function hookSleepReproWorkflow(input: ReproInput) { hook.dispose(); } } - -export async function hookSleepStepReproWorkflow(input: ReproInput) { - 'use workflow'; - - const metadata = getWorkflowMetadata(); - const hook = createHook({ token: input.token }); - const iterator = hook[Symbol.asyncIterator](); - const sleepMs = input.sleepMs ?? 5000; - const finalDelayMs = input.finalDelayMs ?? 0; - - try { - await sleep(sleepMs); - - const synced = await syncStep({ - runId: metadata.workflowRunId, - iteration: 0, - }); - - const event = await iterator.next(); - - if (finalDelayMs > 0) { - await finalStep({ - delayMs: finalDelayMs, - runId: metadata.workflowRunId, - }); - } - - return { event, runId: metadata.workflowRunId, sleepMs, synced }; - } finally { - hook.dispose(); - } -} From 3e139ac6de8668fc06715e7c4b9fc6c979aad762 Mon Sep 17 00:00:00 2001 From: Peter Wielander Date: Fri, 29 May 2026 16:54:17 +0200 Subject: [PATCH 08/10] ci: preserve event log repro history --- .../render-event-log-race-repro-results.js | 304 ++++++++++++++---- .github/workflows/event-log-race-repro.yml | 18 ++ 2 files changed, 267 insertions(+), 55 deletions(-) diff --git a/.github/scripts/render-event-log-race-repro-results.js b/.github/scripts/render-event-log-race-repro-results.js index 8d40d4746c..0fa95a9c63 100644 --- a/.github/scripts/render-event-log-race-repro-results.js +++ b/.github/scripts/render-event-log-race-repro-results.js @@ -5,6 +5,9 @@ const fs = require('node:fs'); const args = process.argv.slice(2); let resultsPath = 'event-log-race-repro-results.json'; let runUrl = ''; +let previousCommentPath = ''; +let timestamp = new Date().toISOString(); +let runAttempt = ''; let check = false; for (let index = 0; index < args.length; index += 1) { @@ -12,6 +15,15 @@ for (let index = 0; index < args.length; index += 1) { if (arg === '--run-url' && args[index + 1]) { runUrl = args[index + 1]; index += 1; + } else if (arg === '--previous-comment' && args[index + 1]) { + previousCommentPath = args[index + 1]; + index += 1; + } else if (arg === '--timestamp' && args[index + 1]) { + timestamp = args[index + 1]; + index += 1; + } else if (arg === '--run-attempt' && args[index + 1]) { + runAttempt = args[index + 1]; + index += 1; } else if (arg === '--check') { check = true; } else if (!arg.startsWith('--')) { @@ -19,6 +31,9 @@ for (let index = 0; index < args.length; index += 1) { } } +const historyMarkerStart = ''; + const orderedOutcomes = [ 'completed', 'CORRUPTED_EVENT_LOG', @@ -39,6 +54,34 @@ function loadResults() { return JSON.parse(fs.readFileSync(resultsPath, 'utf8')); } +function loadPreviousComment() { + if (!previousCommentPath || !fs.existsSync(previousCommentPath)) { + return ''; + } + return fs.readFileSync(previousCommentPath, 'utf8'); +} + +function loadHistory(previousComment) { + if (!previousComment) { + return []; + } + + const historyPattern = new RegExp( + `${historyMarkerStart}\\n([\\s\\S]*?)\\n${historyMarkerEnd}` + ); + const match = previousComment.match(historyPattern); + if (!match) { + return []; + } + + try { + const history = JSON.parse(match[1]); + return Array.isArray(history) ? history : []; + } catch { + return []; + } +} + function summarize(results) { const distribution = emptyDistribution(); for (const result of results) { @@ -53,17 +96,107 @@ function nonCompletedCount(distribution) { .reduce((sum, outcome) => sum + (distribution[outcome] ?? 0), 0); } -function renderMissing() { - console.log(''); - console.log('## Event Log Race Repro\n'); - console.log('No result file was produced by the repro job.\n'); - if (runUrl) { - console.log(`Workflow run: ${runUrl}`); +function compactTimestamp(value) { + const parsed = Date.parse(value); + if (Number.isNaN(parsed)) { + return value; } + return `${new Date(parsed).toISOString().replace('T', ' ').slice(0, 16)} UTC`; +} + +function markdownLink(label, href) { + return href ? `[${label}](${href})` : label; +} + +function renderRunHeader(entry) { + const links = [ + entry.runUrl ? markdownLink('logs', entry.runUrl) : '', + entry.deploymentUrl ? markdownLink('deploy', entry.deploymentUrl) : '', + ].filter(Boolean); + const attemptSuffix = entry.runAttempt ? ` #${entry.runAttempt}` : ''; + return `${compactTimestamp(entry.timestamp)}${attemptSuffix}
${links.join(' / ')}`; +} + +function renderCount(value) { + return String(value ?? 0); +} + +function renderResult(entry) { + if (entry.missingResults) { + return 'missing result file'; + } + return entry.failedCount === 0 + ? 'all completed' + : `${entry.failedCount}/${entry.total} non-completed`; +} + +function renderConfig(entry) { + const config = entry.config ?? {}; + const attempts = config.attempts ? `${config.attempts} runs` : ''; + const concurrency = config.concurrency ? `c${config.concurrency}` : ''; + const iterations = config.iterations ? `${config.iterations} iters` : ''; + return [attempts, concurrency, iterations].filter(Boolean).join(' / '); +} + +function renderTiming(entry) { + const config = entry.config ?? {}; + return [ + config.sleepMs ? `sleep ${config.sleepMs}ms` : '', + config.resumeDelayMs || config.resumeJitterMs + ? `resume ${config.resumeDelayMs ?? 0}+${config.resumeJitterMs ?? 0}ms` + : '', + config.runTimeoutMs ? `timeout ${config.runTimeoutMs}ms` : '', + ] + .filter(Boolean) + .join(' / '); } -// biome-ignore lint/complexity/noExcessiveCognitiveComplexity: This is a small markdown renderer for one CI comment. -function render(resultsFile) { +function compactConfig(config = {}) { + return { + attempts: config.attempts, + concurrency: config.concurrency, + iterations: config.iterations, + sleepMs: config.sleepMs, + resumeDelayMs: config.resumeDelayMs, + resumeJitterMs: config.resumeJitterMs, + runTimeoutMs: config.runTimeoutMs, + }; +} + +function compactHistoryEntry(entry, keepFailures = false) { + return { + timestamp: entry.timestamp, + runAttempt: entry.runAttempt, + runUrl: entry.runUrl, + deploymentUrl: entry.deploymentUrl, + missingResults: entry.missingResults, + distribution: entry.distribution ?? emptyDistribution(), + failedCount: entry.failedCount ?? 0, + total: entry.total ?? 0, + config: compactConfig(entry.config), + failing: keepFailures ? (entry.failing ?? []) : [], + truncatedFailingCount: keepFailures + ? (entry.truncatedFailingCount ?? 0) + : 0, + }; +} + +function buildEntry(resultsFile) { + if (!resultsFile) { + return { + timestamp, + runAttempt, + runUrl, + deploymentUrl: '', + missingResults: true, + distribution: emptyDistribution(), + failedCount: 1, + total: 0, + config: {}, + failing: [], + }; + } + const results = resultsFile.results ?? []; const distribution = resultsFile.distribution ?? summarize(results); const failedCount = nonCompletedCount(distribution); @@ -71,75 +204,136 @@ function render(resultsFile) { (sum, outcome) => sum + (distribution[outcome] ?? 0), 0 ); + const failing = results + .filter((result) => result.outcome !== 'completed') + .slice(0, 20) + .map((result) => ({ + attempt: result.attempt, + outcome: result.outcome, + status: result.status, + errorCode: result.errorCode, + runId: result.runId, + dashboardUrl: result.dashboardUrl, + })); - console.log(''); - console.log('## Event Log Race Repro\n'); - console.log( - failedCount === 0 - ? 'All repro runs completed.' - : `${failedCount} of ${total} repro runs did not complete cleanly.` - ); - console.log(''); + return { + timestamp, + runAttempt, + runUrl, + deploymentUrl: resultsFile.deploymentUrl, + missingResults: false, + distribution, + failedCount, + total, + config: compactConfig(resultsFile.config), + failing, + truncatedFailingCount: Math.max( + 0, + results.filter((result) => result.outcome !== 'completed').length - + failing.length + ), + }; +} - console.log('| Outcome | Count |'); - console.log('|:--|--:|'); +function appendHistory(history, entry) { + const key = `${entry.runUrl || entry.timestamp}#${entry.runAttempt}`; + const nextHistory = history + .filter( + (historyEntry) => + `${historyEntry.runUrl || historyEntry.timestamp}#${historyEntry.runAttempt}` !== + key + ) + .map((historyEntry) => compactHistoryEntry(historyEntry)); + nextHistory.push(compactHistoryEntry(entry, true)); + return nextHistory; +} + +function renderHistoryTable(history) { + console.log('### Run History\n'); + console.log(`| Metric | ${history.map(renderRunHeader).join(' | ')} |`); + console.log(`|:--|${history.map(() => ':--').join('|')}|`); + console.log(`| Result | ${history.map(renderResult).join(' | ')} |`); + console.log(`| Total | ${history.map((entry) => entry.total).join(' | ')} |`); for (const outcome of orderedOutcomes) { - console.log(`| ${outcome} | ${distribution[outcome] ?? 0} |`); + console.log( + `| ${outcome} | ${history + .map((entry) => renderCount(entry.distribution?.[outcome])) + .join(' | ')} |` + ); } + console.log(`| Config | ${history.map(renderConfig).join(' | ')} |`); + console.log(`| Timing | ${history.map(renderTiming).join(' | ')} |`); console.log(''); +} - const config = resultsFile.config ?? {}; - console.log('### Configuration\n'); - console.log('| Setting | Value |'); - console.log('|:--|:--|'); - console.log(`| Attempts | ${config.attempts ?? ''} |`); - console.log(`| Concurrency | ${config.concurrency ?? ''} |`); - console.log(`| Iterations | ${config.iterations ?? ''} |`); - console.log(`| Sleep | ${config.sleepMs ?? ''}ms |`); - console.log(`| Resume delay | ${config.resumeDelayMs ?? ''}ms |`); - console.log(`| Resume jitter | ${config.resumeJitterMs ?? ''}ms |`); - console.log(`| Run timeout | ${config.runTimeoutMs ?? ''}ms |`); - if (resultsFile.deploymentUrl) { - console.log(`| Deployment | ${resultsFile.deploymentUrl} |`); +function renderLatestFailures(entry) { + if (entry.missingResults) { + return; } - console.log(''); - const failing = results.filter((result) => result.outcome !== 'completed'); - if (failing.length > 0) { - console.log('### Non-Completed Runs\n'); - console.log('| Attempt | Outcome | Status | Error code | Run |'); - console.log('|--:|:--|:--|:--|:--|'); - for (const result of failing.slice(0, 20)) { - const run = - result.dashboardUrl && result.runId - ? `[${result.runId}](${result.dashboardUrl})` - : (result.runId ?? ''); - console.log( - `| ${result.attempt} | ${result.outcome} | ${result.status ?? ''} | ${result.errorCode ?? ''} | ${run} |` - ); - } - if (failing.length > 20) { - console.log(`\nShowing 20 of ${failing.length} non-completed runs.`); - } - console.log(''); + if (entry.failing.length === 0) { + return; } - if (runUrl) { - console.log(`Workflow run: ${runUrl}`); + console.log('### Latest Non-Completed Runs\n'); + console.log('| Attempt | Outcome | Status | Error code | Run |'); + console.log('|--:|:--|:--|:--|:--|'); + for (const result of entry.failing) { + const run = + result.dashboardUrl && result.runId + ? `[${result.runId}](${result.dashboardUrl})` + : (result.runId ?? ''); + console.log( + `| ${result.attempt} | ${result.outcome} | ${result.status ?? ''} | ${result.errorCode ?? ''} | ${run} |` + ); + } + if (entry.truncatedFailingCount > 0) { + console.log( + `\nShowing 20 of ${entry.failing.length + entry.truncatedFailingCount} non-completed runs.` + ); } + console.log(''); +} + +function render(resultsFile, previousComment) { + const history = appendHistory( + loadHistory(previousComment), + buildEntry(resultsFile) + ); + const latest = history[history.length - 1]; + + console.log(''); + console.log('## Event Log Race Repro\n'); + console.log( + latest.missingResults + ? 'No result file was produced by the latest repro job.' + : latest.failedCount === 0 + ? 'The latest repro job completed all runs cleanly.' + : `${latest.failedCount} of ${latest.total} latest repro runs did not complete cleanly.` + ); + console.log(''); + console.log(historyMarkerStart); + console.log(JSON.stringify(history)); + console.log(historyMarkerEnd); + console.log(''); + + renderHistoryTable(history); + renderLatestFailures(latest); } const resultsFile = loadResults(); +const previousComment = loadPreviousComment(); if (!resultsFile) { if (!check) { - renderMissing(); + render(null, previousComment); } process.exit(check ? 1 : 0); } if (!check) { - render(resultsFile); + render(resultsFile, previousComment); + process.exit(0); } const distribution = diff --git a/.github/workflows/event-log-race-repro.yml b/.github/workflows/event-log-race-repro.yml index ea2448b344..5eac5d157f 100644 --- a/.github/workflows/event-log-race-repro.yml +++ b/.github/workflows/event-log-race-repro.yml @@ -104,12 +104,30 @@ jobs: EVENT_LOG_RACE_REPRO_RESUME_DELAY_MS: ${{ inputs.resume_delay_ms || '15000' }} EVENT_LOG_RACE_REPRO_RESUME_JITTER_MS: ${{ inputs.resume_jitter_ms || '10000' }} + - name: Fetch previous repro comment + if: always() && github.event_name == 'pull_request' + env: + GH_TOKEN: ${{ github.token }} + run: | + gh api \ + "repos/${{ github.repository }}/issues/${{ github.event.pull_request.number }}/comments?per_page=100" \ + --jq '[.[] | select(.body | contains(""))][-1].body // ""' \ + > event-log-race-repro-previous-comment.md + - name: Render repro summary if: always() run: | + previous_comment_args=() + if [ -f event-log-race-repro-previous-comment.md ]; then + previous_comment_args=(--previous-comment event-log-race-repro-previous-comment.md) + fi + node .github/scripts/render-event-log-race-repro-results.js \ event-log-race-repro-results.json \ --run-url "${{ github.server_url }}/${{ github.repository }}/actions/runs/${{ github.run_id }}" \ + --run-attempt "${{ github.run_attempt }}" \ + --timestamp "$(date -u +'%Y-%m-%dT%H:%M:%SZ')" \ + "${previous_comment_args[@]}" \ | tee event-log-race-repro-summary.md >> "$GITHUB_STEP_SUMMARY" - name: Update PR comment From ac85f83aa7dbb7d9a3346a62065b65e3f8936a97 Mon Sep 17 00:00:00 2001 From: Peter Wielander Date: Fri, 29 May 2026 17:50:10 +0200 Subject: [PATCH 09/10] test: broaden event log race repro coverage --- .../render-event-log-race-repro-results.js | 72 ++- .github/workflows/event-log-race-repro.yml | 19 +- .../core/e2e/event-log-race-repro.test.ts | 476 +++++++++++++++++- .../workflows/101_hook_sleep_repro.ts | 229 +++++++++ 4 files changed, 767 insertions(+), 29 deletions(-) diff --git a/.github/scripts/render-event-log-race-repro-results.js b/.github/scripts/render-event-log-race-repro-results.js index 0fa95a9c63..cf0922833a 100644 --- a/.github/scripts/render-event-log-race-repro-results.js +++ b/.github/scripts/render-event-log-race-repro-results.js @@ -90,6 +90,17 @@ function summarize(results) { return distribution; } +function summarizeByScenario(results) { + const byScenario = {}; + for (const result of results) { + const scenario = result.scenario ?? 'unknown'; + byScenario[scenario] ??= emptyDistribution(); + byScenario[scenario][result.outcome] = + (byScenario[scenario][result.outcome] ?? 0) + 1; + } + return byScenario; +} + function nonCompletedCount(distribution) { return orderedOutcomes .filter((outcome) => outcome !== 'completed') @@ -133,9 +144,21 @@ function renderResult(entry) { function renderConfig(entry) { const config = entry.config ?? {}; const attempts = config.attempts ? `${config.attempts} runs` : ''; + const scenarios = [ + config.hookSleepAttempts ? `hook ${config.hookSleepAttempts}` : '', + config.stepFanoutAttempts ? `fanout ${config.stepFanoutAttempts}` : '', + config.stepSleepRaceAttempts ? `race ${config.stepSleepRaceAttempts}` : '', + ] + .filter(Boolean) + .join(', '); const concurrency = config.concurrency ? `c${config.concurrency}` : ''; + const stepConcurrency = config.stepConcurrency + ? `step c${config.stepConcurrency}` + : ''; const iterations = config.iterations ? `${config.iterations} iters` : ''; - return [attempts, concurrency, iterations].filter(Boolean).join(' / '); + return [attempts, scenarios, concurrency, stepConcurrency, iterations] + .filter(Boolean) + .join(' / '); } function renderTiming(entry) { @@ -154,12 +177,19 @@ function renderTiming(entry) { function compactConfig(config = {}) { return { attempts: config.attempts, + hookSleepAttempts: config.hookSleepAttempts, + stepFanoutAttempts: config.stepFanoutAttempts, + stepSleepRaceAttempts: config.stepSleepRaceAttempts, concurrency: config.concurrency, + stepConcurrency: config.stepConcurrency, iterations: config.iterations, sleepMs: config.sleepMs, resumeDelayMs: config.resumeDelayMs, resumeJitterMs: config.resumeJitterMs, runTimeoutMs: config.runTimeoutMs, + stepFanoutRounds: config.stepFanoutRounds, + stepFanoutWidth: config.stepFanoutWidth, + stepRaceRounds: config.stepRaceRounds, }; } @@ -171,6 +201,7 @@ function compactHistoryEntry(entry, keepFailures = false) { deploymentUrl: entry.deploymentUrl, missingResults: entry.missingResults, distribution: entry.distribution ?? emptyDistribution(), + scenarioDistribution: entry.scenarioDistribution ?? {}, failedCount: entry.failedCount ?? 0, total: entry.total ?? 0, config: compactConfig(entry.config), @@ -193,12 +224,15 @@ function buildEntry(resultsFile) { failedCount: 1, total: 0, config: {}, + scenarioDistribution: {}, failing: [], }; } const results = resultsFile.results ?? []; const distribution = resultsFile.distribution ?? summarize(results); + const scenarioDistribution = + resultsFile.scenarioDistribution ?? summarizeByScenario(results); const failedCount = nonCompletedCount(distribution); const total = orderedOutcomes.reduce( (sum, outcome) => sum + (distribution[outcome] ?? 0), @@ -209,6 +243,7 @@ function buildEntry(resultsFile) { .slice(0, 20) .map((result) => ({ attempt: result.attempt, + scenario: result.scenario, outcome: result.outcome, status: result.status, errorCode: result.errorCode, @@ -223,6 +258,7 @@ function buildEntry(resultsFile) { deploymentUrl: resultsFile.deploymentUrl, missingResults: false, distribution, + scenarioDistribution, failedCount, total, config: compactConfig(resultsFile.config), @@ -266,6 +302,33 @@ function renderHistoryTable(history) { console.log(''); } +function renderLatestScenarioBreakdown(entry) { + if (entry.missingResults) { + return; + } + + const scenarioEntries = Object.entries(entry.scenarioDistribution ?? {}); + if (scenarioEntries.length === 0) { + return; + } + + console.log('### Latest Scenario Breakdown\n'); + console.log(`| Scenario | Total | ${orderedOutcomes.join(' | ')} |`); + console.log(`|:--|--:|${orderedOutcomes.map(() => '--:').join('|')}|`); + for (const [scenario, distribution] of scenarioEntries) { + const total = orderedOutcomes.reduce( + (sum, outcome) => sum + (distribution[outcome] ?? 0), + 0 + ); + console.log( + `| ${scenario} | ${total} | ${orderedOutcomes + .map((outcome) => renderCount(distribution[outcome])) + .join(' | ')} |` + ); + } + console.log(''); +} + function renderLatestFailures(entry) { if (entry.missingResults) { return; @@ -276,15 +339,15 @@ function renderLatestFailures(entry) { } console.log('### Latest Non-Completed Runs\n'); - console.log('| Attempt | Outcome | Status | Error code | Run |'); - console.log('|--:|:--|:--|:--|:--|'); + console.log('| Scenario | Attempt | Outcome | Status | Error code | Run |'); + console.log('|:--|--:|:--|:--|:--|:--|'); for (const result of entry.failing) { const run = result.dashboardUrl && result.runId ? `[${result.runId}](${result.dashboardUrl})` : (result.runId ?? ''); console.log( - `| ${result.attempt} | ${result.outcome} | ${result.status ?? ''} | ${result.errorCode ?? ''} | ${run} |` + `| ${result.scenario ?? ''} | ${result.attempt} | ${result.outcome} | ${result.status ?? ''} | ${result.errorCode ?? ''} | ${run} |` ); } if (entry.truncatedFailingCount > 0) { @@ -318,6 +381,7 @@ function render(resultsFile, previousComment) { console.log(''); renderHistoryTable(history); + renderLatestScenarioBreakdown(latest); renderLatestFailures(latest); } diff --git a/.github/workflows/event-log-race-repro.yml b/.github/workflows/event-log-race-repro.yml index 5eac5d157f..6991c8e11e 100644 --- a/.github/workflows/event-log-race-repro.yml +++ b/.github/workflows/event-log-race-repro.yml @@ -7,13 +7,25 @@ on: workflow_dispatch: inputs: attempts: - description: 'Number of workflow runs to start' + description: 'Number of hook/sleep workflow runs to start' required: false default: '1500' + step_fanout_attempts: + description: 'Number of parallel-step fanout workflow runs to start' + required: false + default: '250' + step_sleep_race_attempts: + description: 'Number of step/sleep race workflow runs to start' + required: false + default: '250' concurrency: description: 'Number of concurrent workflow runs' required: false default: '50' + step_concurrency: + description: 'Number of concurrent step-heavy workflow runs' + required: false + default: '50' iterations: description: 'Hook/sleep race iterations per workflow run' required: false @@ -43,7 +55,7 @@ jobs: event-log-race-repro: name: Event Log Race Repro runs-on: ubuntu-latest - timeout-minutes: 90 + timeout-minutes: 120 if: ${{ github.event_name == 'workflow_dispatch' || contains(github.event.pull_request.labels.*.name, 'event-log-race-repro') }} permissions: contents: read @@ -97,7 +109,10 @@ jobs: WORKFLOW_VERCEL_PROJECT_SLUG: example-nextjs-workflow-turbopack VERCEL_WORKFLOW_SERVER_URL: ${{ github.ref != 'refs/heads/main' && secrets.VERCEL_WORKFLOW_SERVER_URL || '' }} EVENT_LOG_RACE_REPRO_ATTEMPTS: ${{ inputs.attempts || '1500' }} + EVENT_LOG_RACE_REPRO_STEP_FANOUT_ATTEMPTS: ${{ inputs.step_fanout_attempts || '250' }} + EVENT_LOG_RACE_REPRO_STEP_SLEEP_RACE_ATTEMPTS: ${{ inputs.step_sleep_race_attempts || '250' }} EVENT_LOG_RACE_REPRO_CONCURRENCY: ${{ inputs.concurrency || '50' }} + EVENT_LOG_RACE_REPRO_STEP_CONCURRENCY: ${{ inputs.step_concurrency || '50' }} EVENT_LOG_RACE_REPRO_ITERATIONS: ${{ inputs.iterations || '5' }} EVENT_LOG_RACE_REPRO_RUN_TIMEOUT_MS: ${{ inputs.run_timeout_ms || '150000' }} EVENT_LOG_RACE_REPRO_SLEEP_MS: ${{ inputs.sleep_ms || '5000' }} diff --git a/packages/core/e2e/event-log-race-repro.test.ts b/packages/core/e2e/event-log-race-repro.test.ts index 9084725b87..c1cae295ee 100644 --- a/packages/core/e2e/event-log-race-repro.test.ts +++ b/packages/core/e2e/event-log-race-repro.test.ts @@ -22,7 +22,12 @@ const RESULT_PATH = path.resolve( 'event-log-race-repro-results.json' ); const WORKFLOW_FILE = 'workflows/101_hook_sleep_repro.ts'; -const WORKFLOW_FN = 'hookSleepReproWorkflow'; + +type Scenario = + | 'hook-sleep' + | 'step-fanout' + | 'step-sleep-race-step-wins' + | 'step-sleep-race-sleep-wins'; type Outcome = | 'completed' @@ -33,8 +38,11 @@ type Outcome = | 'other'; interface ReproConfig { - attempts: number; + hookSleepAttempts: number; + stepFanoutAttempts: number; + stepSleepRaceAttempts: number; concurrency: number; + stepConcurrency: number; iterations: number; sleepMs: number; resumeDelayMs: number; @@ -47,10 +55,23 @@ interface ReproConfig { returnOnWake: boolean; drainDelayMs: number; finalDelayMs: number; + stepFanoutRounds: number; + stepFanoutWidth: number; + stepFanoutDelayMs: number; + stepFanoutDelayJitterMs: number; + stepFanoutAggregateDelayMs: number; + stepFanoutBetweenRoundSleepMs: number; + stepRaceRounds: number; + stepRaceStepWinDelayMs: number; + stepRaceStepWinSleepMs: number; + stepRaceSleepWinDelayMs: number; + stepRaceSleepWinSleepMs: number; + stepRacePostSleepMs: number; } interface ReproRunResult { attempt: number; + scenario: Scenario; token: string; runId?: string; outcome: Outcome; @@ -77,8 +98,17 @@ function envBoolean(name: string, fallback: boolean) { } const config: ReproConfig = { - attempts: envNumber('EVENT_LOG_RACE_REPRO_ATTEMPTS', 1500), + hookSleepAttempts: envNumber('EVENT_LOG_RACE_REPRO_ATTEMPTS', 1500), + stepFanoutAttempts: envNumber( + 'EVENT_LOG_RACE_REPRO_STEP_FANOUT_ATTEMPTS', + 250 + ), + stepSleepRaceAttempts: envNumber( + 'EVENT_LOG_RACE_REPRO_STEP_SLEEP_RACE_ATTEMPTS', + 250 + ), concurrency: envNumber('EVENT_LOG_RACE_REPRO_CONCURRENCY', 50), + stepConcurrency: envNumber('EVENT_LOG_RACE_REPRO_STEP_CONCURRENCY', 50), iterations: envNumber('EVENT_LOG_RACE_REPRO_ITERATIONS', 5), sleepMs: envNumber('EVENT_LOG_RACE_REPRO_SLEEP_MS', 5000), resumeDelayMs: envNumber('EVENT_LOG_RACE_REPRO_RESUME_DELAY_MS', 15_000), @@ -100,16 +130,55 @@ const config: ReproConfig = { returnOnWake: envBoolean('EVENT_LOG_RACE_REPRO_RETURN_ON_WAKE', true), drainDelayMs: envNumber('EVENT_LOG_RACE_REPRO_DRAIN_DELAY_MS', 0), finalDelayMs: envNumber('EVENT_LOG_RACE_REPRO_FINAL_DELAY_MS', 0), + stepFanoutRounds: envNumber('EVENT_LOG_RACE_REPRO_STEP_FANOUT_ROUNDS', 4), + stepFanoutWidth: envNumber('EVENT_LOG_RACE_REPRO_STEP_FANOUT_WIDTH', 4), + stepFanoutDelayMs: envNumber('EVENT_LOG_RACE_REPRO_STEP_FANOUT_DELAY_MS', 0), + stepFanoutDelayJitterMs: envNumber( + 'EVENT_LOG_RACE_REPRO_STEP_FANOUT_DELAY_JITTER_MS', + 75 + ), + stepFanoutAggregateDelayMs: envNumber( + 'EVENT_LOG_RACE_REPRO_STEP_FANOUT_AGGREGATE_DELAY_MS', + 0 + ), + stepFanoutBetweenRoundSleepMs: envNumber( + 'EVENT_LOG_RACE_REPRO_STEP_FANOUT_BETWEEN_ROUND_SLEEP_MS', + 250 + ), + stepRaceRounds: envNumber('EVENT_LOG_RACE_REPRO_STEP_RACE_ROUNDS', 4), + stepRaceStepWinDelayMs: envNumber( + 'EVENT_LOG_RACE_REPRO_STEP_RACE_STEP_WIN_DELAY_MS', + 0 + ), + stepRaceStepWinSleepMs: envNumber( + 'EVENT_LOG_RACE_REPRO_STEP_RACE_STEP_WIN_SLEEP_MS', + 5000 + ), + stepRaceSleepWinDelayMs: envNumber( + 'EVENT_LOG_RACE_REPRO_STEP_RACE_SLEEP_WIN_DELAY_MS', + 5000 + ), + stepRaceSleepWinSleepMs: envNumber( + 'EVENT_LOG_RACE_REPRO_STEP_RACE_SLEEP_WIN_SLEEP_MS', + 500 + ), + stepRacePostSleepMs: envNumber( + 'EVENT_LOG_RACE_REPRO_STEP_RACE_POST_SLEEP_MS', + 1500 + ), }; -async function start( - ...args: Parameters> -): Promise> { - const run = await rawStart(...args); +async function start( + scenario: Scenario, + workflowFn: string, + workflow: { workflowId: string }, + args: unknown[] +): Promise> { + const run = await rawStart(workflow, args); trackRun(run, { - testName: 'event-log-race-repro', + testName: `event-log-race-repro:${scenario}`, workflowFile: WORKFLOW_FILE, - workflowFn: WORKFLOW_FN, + workflowFn, }); return run; } @@ -173,9 +242,74 @@ function hasWakeBranch(value: unknown) { ); } +function isRecord(value: unknown): value is Record { + return !!value && typeof value === 'object'; +} + +function validateFanoutReturn(value: unknown) { + if (!isRecord(value)) { + return 'Run returned a non-object value.'; + } + + const records = value.roundRecords; + if (!Array.isArray(records)) { + return 'Run did not return roundRecords.'; + } + + if (records.length !== config.stepFanoutRounds) { + return `Expected ${config.stepFanoutRounds} fanout rounds, got ${records.length}.`; + } + + for (const [index, record] of records.entries()) { + if (!isRecord(record)) { + return `Round ${index} record was not an object.`; + } + if (record.round !== index) { + return `Round ${index} returned unexpected round ${String(record.round)}.`; + } + if (record.count !== config.stepFanoutWidth) { + return `Round ${index} aggregated ${String(record.count)} steps instead of ${config.stepFanoutWidth}.`; + } + if (typeof record.checksum !== 'string') { + return `Round ${index} did not return a checksum.`; + } + } +} + +function validateStepRaceReturn( + value: unknown, + expectedBranch: 'sleep' | 'step' +) { + if (!isRecord(value)) { + return 'Run returned a non-object value.'; + } + + const branches = value.branches; + if (!Array.isArray(branches)) { + return 'Run did not return branches.'; + } + + if (branches.length !== config.stepRaceRounds) { + return `Expected ${config.stepRaceRounds} race rounds, got ${branches.length}.`; + } + + for (const [index, branch] of branches.entries()) { + if (!isRecord(branch)) { + return `Race round ${index} record was not an object.`; + } + if (branch.round !== index) { + return `Race round ${index} returned unexpected round ${String(branch.round)}.`; + } + if (branch.branch !== expectedBranch) { + return `Race round ${index} took ${String(branch.branch)} branch instead of ${expectedBranch}.`; + } + } +} + async function pollTerminalRun( run: Run, - startedAt: number + startedAt: number, + scenario: Scenario ): Promise { const world = await getWorld(); const deadline = startedAt + config.runTimeoutMs; @@ -188,6 +322,7 @@ async function pollTerminalRun( if (runData.status === 'completed') { return { attempt: -1, + scenario, token: '', runId: run.runId, outcome: 'completed', @@ -200,6 +335,7 @@ async function pollTerminalRun( if (runData.status === 'failed') { return { attempt: -1, + scenario, token: '', runId: run.runId, outcome: classifyFailure(runData.errorCode), @@ -213,6 +349,7 @@ async function pollTerminalRun( if (runData.status === 'cancelled') { return { attempt: -1, + scenario, token: '', runId: run.runId, outcome: 'other', @@ -228,6 +365,7 @@ async function pollTerminalRun( return { attempt: -1, + scenario, token: '', runId: run.runId, outcome: 'stuck', @@ -248,9 +386,10 @@ async function withTimeout( return await Promise.race([promise, timeout]); } -async function runAttempt(attempt: number): Promise { +async function runHookSleepAttempt(attempt: number): Promise { + const scenario: Scenario = 'hook-sleep'; const startedAt = Date.now(); - const token = `event-log-race-${Date.now()}-${attempt}-${Math.random() + const token = `event-log-race-${scenario}-${Date.now()}-${attempt}-${Math.random() .toString(36) .slice(2)}`; @@ -258,9 +397,9 @@ async function runAttempt(attempt: number): Promise { const workflow = await getWorkflowMetadata( deploymentUrl, WORKFLOW_FILE, - WORKFLOW_FN + 'hookSleepReproWorkflow' ); - const run = await start(workflow, [ + const run = await start(scenario, 'hookSleepReproWorkflow', workflow, [ { token, iterations: config.iterations, @@ -284,7 +423,7 @@ async function runAttempt(attempt: number): Promise { resumeHook(hook, { attempt, sentAt: Date.now() }) ); - const runResult = await pollTerminalRun(run, startedAt); + const runResult = await pollTerminalRun(run, startedAt, scenario); const resumeResult = await Promise.allSettled([ withTimeout( resumePromise, @@ -301,6 +440,7 @@ async function runAttempt(attempt: number): Promise { return { ...runResult, attempt, + scenario, token, outcome: 'other', errorCode: 'HOOK_RESUME_FAILED', @@ -317,6 +457,7 @@ async function runAttempt(attempt: number): Promise { return { ...runResult, attempt, + scenario, token, outcome: 'other', errorCode: 'NO_WAKE_BRANCH', @@ -328,6 +469,7 @@ async function runAttempt(attempt: number): Promise { return { ...runResult, attempt, + scenario, token, errorMessage: runResult.errorMessage ?? @@ -339,6 +481,7 @@ async function runAttempt(attempt: number): Promise { if (WorkflowRunFailedError.is(err)) { return { attempt, + scenario, token, runId: err.runId, outcome: classifyFailure(err.errorCode), @@ -352,6 +495,197 @@ async function runAttempt(attempt: number): Promise { return { attempt, + scenario, + token, + outcome: 'other', + errorMessage: err instanceof Error ? err.message : String(err), + durationMs: Date.now() - startedAt, + }; + } +} + +async function resumeGate( + scenario: Scenario, + attempt: number, + token: string, + runId: string +) { + const hook = await waitForHook(token, runId); + await resumeHook(hook, { attempt, scenario, sentAt: Date.now() }); +} + +async function runStepFanoutAttempt(attempt: number): Promise { + const scenario: Scenario = 'step-fanout'; + const startedAt = Date.now(); + const token = `event-log-race-${scenario}-${Date.now()}-${attempt}-${Math.random() + .toString(36) + .slice(2)}`; + + try { + const workflow = await getWorkflowMetadata( + deploymentUrl, + WORKFLOW_FILE, + 'stepFanoutReplayReproWorkflow' + ); + const run = await start( + scenario, + 'stepFanoutReplayReproWorkflow', + workflow, + [ + { + aggregateDelayMs: config.stepFanoutAggregateDelayMs, + betweenRoundSleepMs: config.stepFanoutBetweenRoundSleepMs, + rounds: config.stepFanoutRounds, + stepDelayJitterMs: config.stepFanoutDelayJitterMs, + stepDelayMs: config.stepFanoutDelayMs, + token, + width: config.stepFanoutWidth, + }, + ] + ); + + await resumeGate(scenario, attempt, token, run.runId); + + const runResult = await pollTerminalRun(run, startedAt, scenario); + if (runResult.outcome === 'completed') { + const returnValue = await withTimeout( + run.returnValue, + 30_000, + `Timed out reading return value for run ${run.runId}` + ); + const validationError = validateFanoutReturn(returnValue); + if (validationError) { + return { + ...runResult, + attempt, + scenario, + token, + outcome: 'other', + errorCode: 'BAD_FANOUT_RETURN', + errorMessage: validationError, + }; + } + } + + return { + ...runResult, + attempt, + scenario, + token, + }; + } catch (err) { + if (WorkflowRunFailedError.is(err)) { + return { + attempt, + scenario, + token, + runId: err.runId, + outcome: classifyFailure(err.errorCode), + status: 'failed', + errorCode: err.errorCode, + errorMessage: err.message, + durationMs: Date.now() - startedAt, + dashboardUrl: getDashboardUrl(err.runId), + }; + } + + return { + attempt, + scenario, + token, + outcome: 'other', + errorMessage: err instanceof Error ? err.message : String(err), + durationMs: Date.now() - startedAt, + }; + } +} + +async function runStepSleepRaceAttempt( + attempt: number, + expectedBranch: 'sleep' | 'step' +): Promise { + const scenario: Scenario = + expectedBranch === 'step' + ? 'step-sleep-race-step-wins' + : 'step-sleep-race-sleep-wins'; + const startedAt = Date.now(); + const token = `event-log-race-${scenario}-${Date.now()}-${attempt}-${Math.random() + .toString(36) + .slice(2)}`; + + try { + const workflow = await getWorkflowMetadata( + deploymentUrl, + WORKFLOW_FILE, + 'stepSleepRaceReproWorkflow' + ); + const run = await start(scenario, 'stepSleepRaceReproWorkflow', workflow, [ + { + postRaceSleepMs: config.stepRacePostSleepMs, + rounds: config.stepRaceRounds, + sleepMs: + expectedBranch === 'step' + ? config.stepRaceStepWinSleepMs + : config.stepRaceSleepWinSleepMs, + stepDelayMs: + expectedBranch === 'step' + ? config.stepRaceStepWinDelayMs + : config.stepRaceSleepWinDelayMs, + token, + }, + ]); + + await resumeGate(scenario, attempt, token, run.runId); + + const runResult = await pollTerminalRun(run, startedAt, scenario); + if (runResult.outcome === 'completed') { + const returnValue = await withTimeout( + run.returnValue, + 30_000, + `Timed out reading return value for run ${run.runId}` + ); + const validationError = validateStepRaceReturn( + returnValue, + expectedBranch + ); + if (validationError) { + return { + ...runResult, + attempt, + scenario, + token, + outcome: 'other', + errorCode: 'BAD_STEP_RACE_RETURN', + errorMessage: validationError, + }; + } + } + + return { + ...runResult, + attempt, + scenario, + token, + }; + } catch (err) { + if (WorkflowRunFailedError.is(err)) { + return { + attempt, + scenario, + token, + runId: err.runId, + outcome: classifyFailure(err.errorCode), + status: 'failed', + errorCode: err.errorCode, + errorMessage: err.message, + durationMs: Date.now() - startedAt, + dashboardUrl: getDashboardUrl(err.runId), + }; + } + + return { + attempt, + scenario, token, outcome: 'other', errorMessage: err instanceof Error ? err.message : String(err), @@ -403,6 +737,56 @@ function summarize(results: ReproRunResult[]) { ); } +function summarizeByScenario(results: ReproRunResult[]) { + return results.reduce>>( + (acc, result) => { + acc[result.scenario][result.outcome] += 1; + return acc; + }, + { + 'hook-sleep': { + completed: 0, + CORRUPTED_EVENT_LOG: 0, + USER_ERROR: 0, + RUNTIME_ERROR: 0, + stuck: 0, + other: 0, + }, + 'step-fanout': { + completed: 0, + CORRUPTED_EVENT_LOG: 0, + USER_ERROR: 0, + RUNTIME_ERROR: 0, + stuck: 0, + other: 0, + }, + 'step-sleep-race-step-wins': { + completed: 0, + CORRUPTED_EVENT_LOG: 0, + USER_ERROR: 0, + RUNTIME_ERROR: 0, + stuck: 0, + other: 0, + }, + 'step-sleep-race-sleep-wins': { + completed: 0, + CORRUPTED_EVENT_LOG: 0, + USER_ERROR: 0, + RUNTIME_ERROR: 0, + stuck: 0, + other: 0, + }, + } + ); +} + +function buildResultConfig(results: ReproRunResult[]) { + return { + ...config, + attempts: results.length, + }; +} + function writeResults(results: ReproRunResult[]) { fs.writeFileSync( RESULT_PATH, @@ -410,8 +794,9 @@ function writeResults(results: ReproRunResult[]) { { completedAt: new Date().toISOString(), deploymentUrl, - config, + config: buildResultConfig(results), distribution: summarize(results), + scenarioDistribution: summarizeByScenario(results), results, }, null, @@ -420,8 +805,34 @@ function writeResults(results: ReproRunResult[]) { ); } +async function runScenario( + attempts: number, + concurrency: number, + run: (attempt: number) => Promise +) { + if (attempts <= 0) { + return []; + } + const attemptNumbers = Array.from( + { length: attempts }, + (_, index) => index + 1 + ); + return await mapLimit(attemptNumbers, concurrency, run); +} + const testTimeoutMs = - config.runTimeoutMs * Math.ceil(config.attempts / config.concurrency) + + config.runTimeoutMs * + Math.ceil(config.hookSleepAttempts / config.concurrency) + + config.runTimeoutMs * + Math.ceil(config.stepFanoutAttempts / config.stepConcurrency) + + config.runTimeoutMs * + Math.ceil( + Math.ceil(config.stepSleepRaceAttempts / 2) / config.stepConcurrency + ) + + config.runTimeoutMs * + Math.ceil( + Math.floor(config.stepSleepRaceAttempts / 2) / config.stepConcurrency + ) + 60_000; describe('event log race repro', () => { @@ -430,14 +841,33 @@ describe('event log race repro', () => { }); test( - 'hook/sleep race does not corrupt or stall runs', + 'event log races do not corrupt, stall, or take stale branches', { timeout: testTimeoutMs }, async () => { - const attempts = Array.from( - { length: config.attempts }, - (_, index) => index + 1 - ); - const results = await mapLimit(attempts, config.concurrency, runAttempt); + const stepWinsAttempts = Math.ceil(config.stepSleepRaceAttempts / 2); + const sleepWinsAttempts = Math.floor(config.stepSleepRaceAttempts / 2); + const results = [ + ...(await runScenario( + config.hookSleepAttempts, + config.concurrency, + runHookSleepAttempt + )), + ...(await runScenario( + config.stepFanoutAttempts, + config.stepConcurrency, + runStepFanoutAttempt + )), + ...(await runScenario( + stepWinsAttempts, + config.stepConcurrency, + (attempt) => runStepSleepRaceAttempt(attempt, 'step') + )), + ...(await runScenario( + sleepWinsAttempts, + config.stepConcurrency, + (attempt) => runStepSleepRaceAttempt(attempt, 'sleep') + )), + ]; writeResults(results); const nonCompleted = results.filter( diff --git a/workbench/nextjs-turbopack/workflows/101_hook_sleep_repro.ts b/workbench/nextjs-turbopack/workflows/101_hook_sleep_repro.ts index c118b26049..0dd3ed6305 100644 --- a/workbench/nextjs-turbopack/workflows/101_hook_sleep_repro.ts +++ b/workbench/nextjs-turbopack/workflows/101_hook_sleep_repro.ts @@ -17,6 +17,12 @@ interface WakePayload { sentAt: number; } +interface GatePayload { + attempt: number; + scenario: string; + sentAt: number; +} + interface RaceBranchRecord { branch: 'sleep' | 'wake'; iteration: number; @@ -28,6 +34,49 @@ type RaceBranch = | { kind: 'sleep' } | { kind: 'hook'; event: IteratorResult }; +interface StepFanoutInput { + token: string; + rounds?: number; + width?: number; + stepDelayMs?: number; + stepDelayJitterMs?: number; + aggregateDelayMs?: number; + betweenRoundSleepMs?: number; +} + +interface FanoutStepResult { + runId: string; + round: number; + index: number; + completedAt: number; +} + +interface FanoutRoundRecord { + runId: string; + round: number; + count: number; + checksum: string; + completedAt: number; +} + +interface StepSleepRaceInput { + token: string; + rounds?: number; + stepDelayMs?: number; + sleepMs?: number; + postRaceSleepMs?: number; +} + +type StepSleepRaceBranch = + | { kind: 'sleep' } + | { kind: 'step'; value: StepRaceResult }; + +interface StepRaceResult { + runId: string; + round: number; + completedAt: number; +} + async function syncStep(input: { runId: string; iteration: number }) { 'use step'; return { ...input, syncedAt: Date.now() }; @@ -53,6 +102,70 @@ async function finalStep(input: { delayMs: number; runId: string }) { return { ...input, finishedAt: Date.now() }; } +async function fanoutStep(input: { + delayMs: number; + runId: string; + round: number; + index: number; +}): Promise { + 'use step'; + if (input.delayMs > 0) { + await new Promise((resolve) => setTimeout(resolve, input.delayMs)); + } + return { + runId: input.runId, + round: input.round, + index: input.index, + completedAt: Date.now(), + }; +} + +async function aggregateFanoutStep(input: { + delayMs: number; + runId: string; + round: number; + values: FanoutStepResult[]; +}): Promise { + 'use step'; + if (input.delayMs > 0) { + await new Promise((resolve) => setTimeout(resolve, input.delayMs)); + } + return { + runId: input.runId, + round: input.round, + count: input.values.length, + checksum: input.values + .map((value) => `${value.round}:${value.index}`) + .join(','), + completedAt: Date.now(), + }; +} + +async function racedStep(input: { + delayMs: number; + runId: string; + round: number; +}): Promise { + 'use step'; + if (input.delayMs > 0) { + await new Promise((resolve) => setTimeout(resolve, input.delayMs)); + } + return { + runId: input.runId, + round: input.round, + completedAt: Date.now(), + }; +} + +async function branchMarkerStep(input: { + branch: 'sleep' | 'step'; + runId: string; + round: number; +}) { + 'use step'; + return { ...input, markedAt: Date.now() }; +} + // biome-ignore lint/complexity/noExcessiveCognitiveComplexity: Keep this close to the shared repro shape. export async function hookSleepReproWorkflow(input: ReproInput) { 'use workflow'; @@ -135,3 +248,119 @@ export async function hookSleepReproWorkflow(input: ReproInput) { hook.dispose(); } } + +export async function stepFanoutReplayReproWorkflow(input: StepFanoutInput) { + 'use workflow'; + + const metadata = getWorkflowMetadata(); + const hook = createHook({ token: input.token }); + const iterator = hook[Symbol.asyncIterator](); + + const rounds = input.rounds ?? 4; + const width = input.width ?? 4; + const stepDelayMs = input.stepDelayMs ?? 0; + const stepDelayJitterMs = input.stepDelayJitterMs ?? 50; + const aggregateDelayMs = input.aggregateDelayMs ?? 0; + const betweenRoundSleepMs = input.betweenRoundSleepMs ?? 0; + const roundRecords: FanoutRoundRecord[] = []; + + try { + await iterator.next(); + + for (let round = 0; round < rounds; round += 1) { + const values = await Promise.all( + Array.from({ length: width }, (_, index) => + fanoutStep({ + delayMs: stepDelayMs + ((round + index) % 2) * stepDelayJitterMs, + index, + round, + runId: metadata.workflowRunId, + }) + ) + ); + + roundRecords.push( + await aggregateFanoutStep({ + delayMs: aggregateDelayMs, + round, + runId: metadata.workflowRunId, + values, + }) + ); + + if (betweenRoundSleepMs > 0) { + await sleep(betweenRoundSleepMs); + } + } + + return { + runId: metadata.workflowRunId, + roundRecords, + rounds, + width, + }; + } finally { + hook.dispose(); + } +} + +export async function stepSleepRaceReproWorkflow(input: StepSleepRaceInput) { + 'use workflow'; + + const metadata = getWorkflowMetadata(); + const hook = createHook({ token: input.token }); + const iterator = hook[Symbol.asyncIterator](); + + const rounds = input.rounds ?? 4; + const stepDelayMs = input.stepDelayMs ?? 0; + const sleepMs = input.sleepMs ?? 1000; + const postRaceSleepMs = input.postRaceSleepMs ?? 0; + const branches: Array<{ + branch: 'sleep' | 'step'; + marker: unknown; + round: number; + value?: StepRaceResult; + }> = []; + + try { + await iterator.next(); + + for (let round = 0; round < rounds; round += 1) { + const result = await Promise.race([ + racedStep({ + delayMs: stepDelayMs, + round, + runId: metadata.workflowRunId, + }).then((value) => ({ kind: 'step' as const, value })), + sleep(sleepMs).then(() => ({ kind: 'sleep' as const })), + ]); + + const marker = await branchMarkerStep({ + branch: result.kind, + round, + runId: metadata.workflowRunId, + }); + + branches.push({ + branch: result.kind, + marker, + round, + value: result.kind === 'step' ? result.value : undefined, + }); + + if (postRaceSleepMs > 0) { + await sleep(postRaceSleepMs); + } + } + + return { + branches, + runId: metadata.workflowRunId, + rounds, + sleepMs, + stepDelayMs, + }; + } finally { + hook.dispose(); + } +} From ebde477160309503233d2deb7beaafd4c5e9d873 Mon Sep 17 00:00:00 2001 From: Peter Wielander Date: Fri, 29 May 2026 18:22:17 +0200 Subject: [PATCH 10/10] test: avoid assuming step race wall-clock winner --- .../core/e2e/event-log-race-repro.test.ts | 50 ++++++++++--------- .../workflows/101_hook_sleep_repro.ts | 28 +++++++---- 2 files changed, 43 insertions(+), 35 deletions(-) diff --git a/packages/core/e2e/event-log-race-repro.test.ts b/packages/core/e2e/event-log-race-repro.test.ts index c1cae295ee..ce58ca7080 100644 --- a/packages/core/e2e/event-log-race-repro.test.ts +++ b/packages/core/e2e/event-log-race-repro.test.ts @@ -26,8 +26,8 @@ const WORKFLOW_FILE = 'workflows/101_hook_sleep_repro.ts'; type Scenario = | 'hook-sleep' | 'step-fanout' - | 'step-sleep-race-step-wins' - | 'step-sleep-race-sleep-wins'; + | 'step-sleep-race-step-biased' + | 'step-sleep-race-sleep-biased'; type Outcome = | 'completed' @@ -276,10 +276,7 @@ function validateFanoutReturn(value: unknown) { } } -function validateStepRaceReturn( - value: unknown, - expectedBranch: 'sleep' | 'step' -) { +function validateStepRaceReturn(value: unknown) { if (!isRecord(value)) { return 'Run returned a non-object value.'; } @@ -300,8 +297,16 @@ function validateStepRaceReturn( if (branch.round !== index) { return `Race round ${index} returned unexpected round ${String(branch.round)}.`; } - if (branch.branch !== expectedBranch) { - return `Race round ${index} took ${String(branch.branch)} branch instead of ${expectedBranch}.`; + if (branch.branch !== 'sleep' && branch.branch !== 'step') { + return `Race round ${index} took unexpected ${String(branch.branch)} branch.`; + } + + const marker = branch.marker; + if (!isRecord(marker)) { + return `Race round ${index} marker was not an object.`; + } + if (marker.branch !== branch.branch) { + return `Race round ${index} marker branch ${String(marker.branch)} did not match returned branch ${String(branch.branch)}.`; } } } @@ -602,12 +607,12 @@ async function runStepFanoutAttempt(attempt: number): Promise { async function runStepSleepRaceAttempt( attempt: number, - expectedBranch: 'sleep' | 'step' + bias: 'sleep' | 'step' ): Promise { const scenario: Scenario = - expectedBranch === 'step' - ? 'step-sleep-race-step-wins' - : 'step-sleep-race-sleep-wins'; + bias === 'step' + ? 'step-sleep-race-step-biased' + : 'step-sleep-race-sleep-biased'; const startedAt = Date.now(); const token = `event-log-race-${scenario}-${Date.now()}-${attempt}-${Math.random() .toString(36) @@ -624,11 +629,11 @@ async function runStepSleepRaceAttempt( postRaceSleepMs: config.stepRacePostSleepMs, rounds: config.stepRaceRounds, sleepMs: - expectedBranch === 'step' + bias === 'step' ? config.stepRaceStepWinSleepMs : config.stepRaceSleepWinSleepMs, stepDelayMs: - expectedBranch === 'step' + bias === 'step' ? config.stepRaceStepWinDelayMs : config.stepRaceSleepWinDelayMs, token, @@ -644,10 +649,7 @@ async function runStepSleepRaceAttempt( 30_000, `Timed out reading return value for run ${run.runId}` ); - const validationError = validateStepRaceReturn( - returnValue, - expectedBranch - ); + const validationError = validateStepRaceReturn(returnValue); if (validationError) { return { ...runResult, @@ -760,7 +762,7 @@ function summarizeByScenario(results: ReproRunResult[]) { stuck: 0, other: 0, }, - 'step-sleep-race-step-wins': { + 'step-sleep-race-step-biased': { completed: 0, CORRUPTED_EVENT_LOG: 0, USER_ERROR: 0, @@ -768,7 +770,7 @@ function summarizeByScenario(results: ReproRunResult[]) { stuck: 0, other: 0, }, - 'step-sleep-race-sleep-wins': { + 'step-sleep-race-sleep-biased': { completed: 0, CORRUPTED_EVENT_LOG: 0, USER_ERROR: 0, @@ -844,8 +846,8 @@ describe('event log race repro', () => { 'event log races do not corrupt, stall, or take stale branches', { timeout: testTimeoutMs }, async () => { - const stepWinsAttempts = Math.ceil(config.stepSleepRaceAttempts / 2); - const sleepWinsAttempts = Math.floor(config.stepSleepRaceAttempts / 2); + const stepBiasedAttempts = Math.ceil(config.stepSleepRaceAttempts / 2); + const sleepBiasedAttempts = Math.floor(config.stepSleepRaceAttempts / 2); const results = [ ...(await runScenario( config.hookSleepAttempts, @@ -858,12 +860,12 @@ describe('event log race repro', () => { runStepFanoutAttempt )), ...(await runScenario( - stepWinsAttempts, + stepBiasedAttempts, config.stepConcurrency, (attempt) => runStepSleepRaceAttempt(attempt, 'step') )), ...(await runScenario( - sleepWinsAttempts, + sleepBiasedAttempts, config.stepConcurrency, (attempt) => runStepSleepRaceAttempt(attempt, 'sleep') )), diff --git a/workbench/nextjs-turbopack/workflows/101_hook_sleep_repro.ts b/workbench/nextjs-turbopack/workflows/101_hook_sleep_repro.ts index 0dd3ed6305..13370a33d4 100644 --- a/workbench/nextjs-turbopack/workflows/101_hook_sleep_repro.ts +++ b/workbench/nextjs-turbopack/workflows/101_hook_sleep_repro.ts @@ -157,13 +157,14 @@ async function racedStep(input: { }; } -async function branchMarkerStep(input: { - branch: 'sleep' | 'step'; - runId: string; - round: number; -}) { +async function stepBranchMarkerStep(input: { runId: string; round: number }) { + 'use step'; + return { ...input, branch: 'step' as const, markedAt: Date.now() }; +} + +async function sleepBranchMarkerStep(input: { runId: string; round: number }) { 'use step'; - return { ...input, markedAt: Date.now() }; + return { ...input, branch: 'sleep' as const, markedAt: Date.now() }; } // biome-ignore lint/complexity/noExcessiveCognitiveComplexity: Keep this close to the shared repro shape. @@ -335,11 +336,16 @@ export async function stepSleepRaceReproWorkflow(input: StepSleepRaceInput) { sleep(sleepMs).then(() => ({ kind: 'sleep' as const })), ]); - const marker = await branchMarkerStep({ - branch: result.kind, - round, - runId: metadata.workflowRunId, - }); + const marker = + result.kind === 'step' + ? await stepBranchMarkerStep({ + round, + runId: metadata.workflowRunId, + }) + : await sleepBranchMarkerStep({ + round, + runId: metadata.workflowRunId, + }); branches.push({ branch: result.kind,