From 30e46ba4052ab341ff2714d6d0da9d7afd825f6a Mon Sep 17 00:00:00 2001 From: Tanner Linsley Date: Thu, 21 May 2026 20:15:52 -0600 Subject: [PATCH 1/2] =?UTF-8?q?docs(research):=20add=20SCHEDULING.md=20?= =?UTF-8?q?=E2=80=94=20cron=20landscape=20+=20future=20package=20shape?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Captures research on how major workflow engines (Inngest, Trigger.dev, Temporal, Cloudflare Workflows, Hatchet, DBOS, Restate, AWS Step Functions, Vercel WDK) model recurring execution. Five consistent patterns surface; informs the shape of a future @tanstack/workflow-cron package. Key finding: every mature engine separates the cron from the workflow body and fires fresh invocations per tick. Nobody runs production workflows as "loop forever with sleep." The closure engine already supports the fresh-per-tick model — a scheduler is purely additive. Also touches up research/README.md to call the directory a sketchpad for upcoming work, not strictly historical. --- research/README.md | 3 +- research/SCHEDULING.md | 93 ++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 95 insertions(+), 1 deletion(-) create mode 100644 research/SCHEDULING.md diff --git a/research/README.md b/research/README.md index 7bb76cb..618b0a7 100644 --- a/research/README.md +++ b/research/README.md @@ -1,6 +1,6 @@ # Research archive -Point-in-time design notes from the planning phase that led to `@tanstack/workflow-core`. **Not maintained.** Treat as historical context for why the engine is shaped the way it is. +Design notes captured at decision points. **Not maintained as living docs** — treat as historical context for why the engine is shaped the way it is, and a sketchpad for upcoming work. If you want current docs, see [/docs](../docs/) and [packages/workflow-core/README.md](../packages/workflow-core/README.md). The current API and engine may differ from what's described here in places — these were exploratory snapshots, not specs. @@ -13,6 +13,7 @@ If you want current docs, see [/docs](../docs/) and [packages/workflow-core/READ | [PRIOR_ART_AI_ORCHESTRATION.md](PRIOR_ART_AI_ORCHESTRATION.md) | Inventory of Alem Tuzlak + Tom Beckenham's existing generator-based engine in `@tanstack/ai-orchestration` ([TanStack/ai#542](https://github.com/TanStack/ai/pull/542)) — the parent we extracted from. | Engine extracted. AI surface (agents, orchestrators, AG-UI events) stays in `ai-orchestration`. | | [SRC_SKEW_AND_RESUMPTION.md](SRC_SKEW_AND_RESUMPTION.md) | Analysis of fingerprint-based source-skew handling and its gaps (Prettier reformat / minifier drift / silent corruption in patch mode). | Motivated the move to explicit versioning. | | [EXPLICIT_VERSIONING.md](EXPLICIT_VERSIONING.md) | Alternative design: explicit `version` + `previousVersions` registry + lint-time lock file, replacing runtime fingerprinting. | **Shipped.** `createWorkflow({ version }).previousVersions([...])` + version-routing engine. Lockfile + ESLint plugin still to come. | +| [SCHEDULING.md](SCHEDULING.md) | How major workflow engines model cron / recurring execution; what a future `@tanstack/workflow-cron` would look like. | Forward-looking. No package yet. Engine needs no changes; deferred until a durable storage adapter ships. | ## How these came to be diff --git a/research/SCHEDULING.md b/research/SCHEDULING.md new file mode 100644 index 0000000..ddeea70 --- /dev/null +++ b/research/SCHEDULING.md @@ -0,0 +1,93 @@ +# Scheduling and cron — landscape + +How major workflow engines model recurring execution, and what that implies for TanStack Workflow. + +## What we have today + +- `ctx.sleep(ms)` / `ctx.sleepUntil(timestamp)` durably pause a run by emitting `SIGNAL_AWAITED { name: '__timer', deadline }`. +- `RunState.waitingFor = { signalName: '__timer', deadline }` is persisted, so out-of-process workers can discover pending wakes by querying the store. + +No timer driver and no cron primitive ship in `@tanstack/workflow-core`. The engine emits deadlines; nothing currently consumes them. + +## How others handle it + +| Library | Where the cron is declared | Who fires the tick | Workflow body | Overlap handling | +| ------------------------- | ---------------------------------------------------------------------------------------------------------- | ------------------------- | ------------------------------------------------------------------------------------------ | ----------------------------------------------------------------------------------------------------- | +| **Inngest** | On the function: `inngest.createFunction({ id, cron: '0 9 * * MON' }, ...)` | Inngest control plane | Fresh invocation per tick | Per-function `concurrency` | +| **Trigger.dev** | On the task: `schedules.task({ cron: '0 * * * *' }, ...)` | Trigger scheduler service | Fresh run per tick | `queue` + `concurrencyKey` | +| **Temporal** | Separate `Schedule` resource (recommended over the legacy CronWorkflow option) | Temporal matching service | Fresh workflow execution per tick | Explicit `overlapPolicy`: skip / buffer-one / buffer-all / cancel-other / terminate-other / allow-all | +| **Cloudflare Workflows** | `wrangler.toml`'s `[[triggers.crons]]` on the Worker that _starts_ the workflow | Cloudflare edge cron | Fresh workflow instance per tick | Application-level | +| **Hatchet** | `@hatchet.workflow(schedule="0 9 * * MON")` decorator or fluent API | Hatchet engine | Fresh run per tick | `concurrency_limit` + group keys | +| **DBOS** | `@DBOS.scheduled('0 9 * * MON')` decorator | DBOS runtime | Fresh invocation per tick | Per-workflow `WorkflowQueue` | +| **Restate** | `ctx.workflowSendDelayed(...)` for one-shot delays; recurring is built by the user with self-delayed sends | Restate server | Fresh handler invocation; the workflow self-schedules the next tick at the end of each run | Single-writer per object/workflow id | +| **AWS Step Functions** | EventBridge Rule / EventBridge Scheduler outside the state machine | EventBridge | Fresh execution per tick | Application-level (Distributed Map has its own controls) | +| **Vercel WDK** | Not in the SDK. Vercel Cron Jobs hits an HTTP route that starts the workflow | Vercel Cron service | Fresh workflow per route invocation | Application-level | +| **Mastra / LangGraph.js** | Not built-in; users wire to external scheduler | External | Fresh run | Application-level | + +## Patterns that come up consistently + +**1. The cron metadata lives at the registration site, not in the workflow body.** Every mature engine separates _what the workflow does_ from _when it runs_. The workflow doesn't know it's being scheduled. + +**2. Each tick is a fresh execution.** No "loop with sleep" production pattern anywhere. Reasons: + +- Log doesn't grow unbounded. +- Replay cost stays constant per tick. +- "When's the next run?" is answerable from the schedule, not by inspecting a running workflow's pause state. +- Overlap policies are well-defined (skip if previous still running, buffer, etc.). +- Failed runs don't block the next tick. + +**3. The scheduler is its own service.** Inngest, Trigger, Temporal, Hatchet, DBOS all run a scheduling component separate from the engine that drives workflow execution. It polls a schedule table / cron expression and fires new invocations. + +**4. Overlap policies are explicit.** Temporal's six-option enum is the gold standard. Inngest / Trigger / Hatchet have variations. **Cloudflare and AWS push this to the user.** Restate's single-writer-per-object property gets it for free at the cost of forcing object-shaped modeling. + +**5. Cron expression vs. delay-based.** Most use cron strings (`0 9 * * MON`). Restate's self-delayed-send model is the outlier — workflows reschedule themselves by enqueueing a delayed invocation at the end of each run. Durable, no separate scheduler, but requires the workflow to be aware of its own recurrence. + +## Implications for TanStack Workflow + +The closure engine already supports the "fresh invocation per tick" model — that's literally just calling `runWorkflow(...)` repeatedly. So a future `@tanstack/workflow-cron` package would be small. + +### Sketch + +```ts +// Hypothetical +import { createSchedule, runSchedules } from '@tanstack/workflow-cron' + +createSchedule({ + id: 'daily-report', + workflow: dailyReport, // a normal workflow definition + cron: '0 9 * * MON', + input: () => ({ runId: crypto.randomUUID() }), + overlapPolicy: 'skip', // skip | buffer | cancel-previous | allow +}) + +// Run by a worker process (or DO alarm, or AWS scheduled task) +await runSchedules({ runStore, scheduleStore }) +``` + +Two pieces: + +- **Schedule definitions** — declarative, sit next to workflow definitions. +- **A driver** — polls a schedule store, computes "next fire time," fires `runWorkflow`. Can be deployed as a long-running worker, Durable Object alarm, AWS Lambda + EventBridge, Cloudflare Cron Trigger, etc. + +### Three deployment options without re-implementing the scheduler + +1. **Bring your own scheduler.** Most projects already have one (`node-cron`, EventBridge, Cloudflare Crons). Just call `runWorkflow` from it. Zero new package. +2. **Embedded driver.** `@tanstack/workflow-cron` ships a `runSchedules({ runStore, scheduleStore })` callable from a tiny always-on worker. +3. **Platform adapters.** `@tanstack/workflow-cron-do` (Durable Object alarms), `@tanstack/workflow-cron-eventbridge`, etc. Same schedule definitions, different driver. + +### Other notes + +- **The engine doesn't need any changes for cron.** The `sleep` / `sleepUntil` primitives + `runWorkflow` start path are already enough. A scheduler is purely additive. +- **Overlap policies are worth getting right from day one.** Temporal's six-mode design is well-trodden; copy it. Common defaults: `skip` (don't fire while one is running) and `allow` (fan out, no coordination). +- **The Restate self-rescheduling pattern is interesting but probably not the primary model.** It forces every recurring workflow to know about its own cadence, which is the coupling every other design explicitly avoids. It could be a secondary pattern for use cases that genuinely want self-pacing (backoff loops, retry-with-decay). + +## Open questions + +- **Schedule definition storage.** Inline in code (Inngest / DBOS / Hatchet) vs in a database (Temporal). The code-as-source-of-truth model has stronger ergonomics; the DB model lets non-developers add schedules. Probably code-first with an escape hatch. +- **Catch-up policy.** If the driver was down for an hour and missed three ticks, do we run the missed ticks (catch-up) or skip (last-run-wins)? Temporal supports both; most others assume skip. Default skip; opt-in catch-up. +- **Time zones.** Cron `0 3 * * *` is "3am in whose time zone?" Inngest takes a `tz` option. Default UTC; explicit override. +- **Cron parser dependency.** Real cron expressions need a library (`cron-parser`, `croner`, etc.). Adds a small dep. Worth scoping: do we ship full cron syntax or a narrower interval API (`every: '24h'`)? + +## Status + +Research only. No package, no recipe, no test. Recommendation: defer until at least one durable storage adapter ships, then design `@tanstack/workflow-cron` against a real Postgres / DO store rather than the in-memory one. From 7f45a5589e3633f038ca75bd37a68dd2fc1f26a4 Mon Sep 17 00:00:00 2001 From: Tanner Linsley Date: Thu, 21 May 2026 20:21:14 -0600 Subject: [PATCH 2/2] docs: add scheduling recipes + external-cron example test MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Concrete companion to research/SCHEDULING.md — shows the bring-your- own-scheduler pattern that's possible with what shipping today. docs/concepts/scheduling.md - "Mental model in one paragraph" — external scheduler + fresh workflow invocation per tick, never loop with sleep - Six recipes: node-cron, Cloudflare Worker cron, Vercel Cron Job, AWS EventBridge → Lambda, skip-overlap policy via deterministic runId, buffer-one policy via local queue - Sketch of a writing-your-own schedule store + worker packages/workflow-core/tests/examples.external-cron.test.ts - vitest-fake-timer-driven scheduler against the engine, three scenarios: - fresh runId per tick, multiple ticks finish independently - skip-overlap via deterministic runId + state check - buffer-one via local pending flag, third tick collapses into the buffer docs/config.json — adds Concepts → Scheduling nav entry. research/SCHEDULING.md — points at the new recipes + test under "Status" so the research and the user-facing material cross-link. Verification: tsc clean, eslint clean, 105 / 22 tests pass (+3 new), tsdown build clean. NOTE: PR CI is currently blocked by a GitHub Actions billing issue on the TanStack org — no jobs are starting. Push for visibility; will merge once billing resolves. --- docs/concepts/scheduling.md | 222 ++++++++++++++ docs/config.json | 4 + .../tests/examples.external-cron.test.ts | 270 ++++++++++++++++++ research/SCHEDULING.md | 4 +- 4 files changed, 499 insertions(+), 1 deletion(-) create mode 100644 docs/concepts/scheduling.md create mode 100644 packages/workflow-core/tests/examples.external-cron.test.ts diff --git a/docs/concepts/scheduling.md b/docs/concepts/scheduling.md new file mode 100644 index 0000000..e3ed888 --- /dev/null +++ b/docs/concepts/scheduling.md @@ -0,0 +1,222 @@ +# Scheduling and recurring runs + +`@tanstack/workflow-core` ships no cron primitive. The shape every mature workflow engine converges on — Inngest, Trigger.dev, Temporal, Hatchet, DBOS — is **external scheduler + fresh workflow invocation per tick**. Bring your own scheduler. The engine doesn't need to know. + +This page is recipes for that pattern. + +## The model in one paragraph + +You declare a normal workflow. Something outside the engine (cron daemon, EventBridge, Durable Object alarm, Vercel Cron Job, a `setInterval` in a worker, anything) fires on schedule. Each tick calls `runWorkflow({ workflow, input, runStore })` with fresh input. The workflow runs end-to-end and finishes; the next tick is a new `runId`. No "loop forever with sleep" — log doesn't grow, replay cost is constant, and "when's the next run?" is answerable from the scheduler, not from the engine. + +## Recipe: a node process with `node-cron` + +```ts +import cron from 'node-cron' +import { runWorkflow, inMemoryRunStore } from '@tanstack/workflow-core' +import { dailyReport } from './workflows/daily-report' + +const runStore = inMemoryRunStore() // swap for a durable adapter in prod + +// 09:00 every Monday in UTC +cron.schedule( + '0 9 * * MON', + async () => { + for await (const _ of runWorkflow({ + workflow: dailyReport, + input: { triggeredAt: Date.now() }, + runStore, + })) { + // events flow through here — forward to Redis, log, ignore + } + }, + { timezone: 'UTC' }, +) +``` + +The workflow body itself is a normal one-shot: + +```ts +export const dailyReport = createWorkflow({ + id: 'daily-report', + input: z.object({ triggeredAt: z.number() }), +}).handler(async (ctx) => { + const report = await ctx.step('gen', generateReport) + await ctx.step('email', () => emailReport(report)) + return { ranAt: ctx.input.triggeredAt, report: report.summary } +}) +``` + +## Recipe: Cloudflare Worker cron trigger + +```toml +# wrangler.toml +[[triggers.crons]] +cron = "0 9 * * MON" +``` + +```ts +// worker.ts +import { runWorkflow } from '@tanstack/workflow-core' +import { dailyReport } from './workflows/daily-report' +import { d1RunStore } from './storage' // hypothetical D1-backed store + +export default { + async scheduled(event: ScheduledEvent, env: Env) { + const runStore = d1RunStore(env.DB) + for await (const _ of runWorkflow({ + workflow: dailyReport, + input: { triggeredAt: event.scheduledTime }, + runStore, + })) { + /* forward / log */ + } + }, +} +``` + +## Recipe: Vercel Cron Job hitting a route + +```jsonc +// vercel.json +{ + "crons": [{ "path": "/api/cron/daily-report", "schedule": "0 9 * * MON" }] +} +``` + +```ts +// app/api/cron/daily-report/route.ts (App Router) or pages/api/cron/... +import { runWorkflow } from '@tanstack/workflow-core' +import { dailyReport } from '@/workflows/daily-report' +import { runStore } from '@/lib/run-store' + +export async function GET(req: Request) { + // Vercel signs the request; verify the secret before running + if (req.headers.get('authorization') !== `Bearer ${process.env.CRON_SECRET}`) { + return new Response('unauthorized', { status: 401 }) + } + for await (const _ of runWorkflow({ + workflow: dailyReport, + input: { triggeredAt: Date.now() }, + runStore, + })) { + /* … */ + } + return new Response('ok') +} +``` + +## Recipe: AWS EventBridge → Lambda + +```yaml +# serverless.yml (excerpt) +functions: + dailyReport: + handler: handlers/daily-report.handler + events: + - schedule: cron(0 9 ? * MON *) # EventBridge cron syntax — UTC +``` + +```ts +// handlers/daily-report.ts +import { runWorkflow } from '@tanstack/workflow-core' +import { dailyReport } from '../workflows/daily-report' +import { dynamoRunStore } from '../storage' + +export const handler = async () => { + for await (const _ of runWorkflow({ + workflow: dailyReport, + input: { triggeredAt: Date.now() }, + runStore: dynamoRunStore(), + })) { + /* … */ + } +} +``` + +## Recipe: skip-overlap policy + +Most schedulers (`cron`, EventBridge, Vercel) don't know whether the previous tick is still running. If you want "skip the new tick if the previous one is still in flight," gate on a marker in the run store before starting: + +```ts +async function tick() { + // Use a deterministic runId so concurrent ticks can't both create one. + const runId = `daily-report:${new Date().toISOString().slice(0, 10)}` // one per day + const existing = await runStore.getRunState(runId) + if (existing && existing.status !== 'finished' && existing.status !== 'errored') { + return // previous tick still running, skip + } + for await (const _ of runWorkflow({ + workflow: dailyReport, + runId, + input: { triggeredAt: Date.now() }, + runStore, + })) { + /* … */ + } +} +``` + +The engine's start-path idempotency check (same `runId` + same workflow fingerprint = attach-snapshot, not double-start) means a second-of-two concurrent calls degrades to read-only safely even if the gate above races. + +## Recipe: buffer-one policy + +If you want "if a tick fires while one is running, run it again as soon as the previous finishes," queue locally: + +```ts +let pending = false +let inFlight: Promise | null = null + +async function tick() { + if (inFlight) { + pending = true + return + } + inFlight = (async () => { + for await (const _ of runWorkflow({ workflow: dailyReport, input: { triggeredAt: Date.now() }, runStore })) { + /* … */ + } + })().finally(async () => { + inFlight = null + if (pending) { + pending = false + await tick() + } + }) + await inFlight +} +``` + +Holds at most one pending tick — extra ticks during a long run collapse into a single follow-up. + +## Recipe: writing your own schedule store + +For shops that don't have a host-managed cron, run a tiny worker that polls a schedule table: + +```ts +interface Schedule { + id: string + workflowId: string + cronExpr: string + nextFireAt: number + inputBuilder: () => unknown + overlapPolicy: 'skip' | 'buffer' | 'allow' +} + +async function tickAllSchedules(schedules: Array, runStore: RunStore) { + const now = Date.now() + for (const s of schedules) { + if (s.nextFireAt > now) continue + await fireOne(s, runStore) + s.nextFireAt = computeNext(s.cronExpr, now) // use a cron-parser lib + } +} + +// Long-running worker +setInterval(() => tickAllSchedules(schedules, runStore), 30_000) +``` + +A more durable version persists `nextFireAt` alongside each schedule definition; a deeper one elects a single leader to avoid duplicate ticks across instances. That whole layer is what a future `@tanstack/workflow-cron` package would provide — see [research/SCHEDULING.md](../../research/SCHEDULING.md) for the design sketch. + +## Test pattern + +The repo's `tests/examples.external-cron.test.ts` exercises this end-to-end with vitest fake timers — a deterministic scheduler driving multiple ticks against the engine, verifying each tick produces an independent run and that skip-overlap works. diff --git a/docs/config.json b/docs/config.json index 07231ac..8aaab42 100644 --- a/docs/config.json +++ b/docs/config.json @@ -37,6 +37,10 @@ { "label": "Replay and resume", "to": "concepts/replay-and-resume" + }, + { + "label": "Scheduling", + "to": "concepts/scheduling" } ] } diff --git a/packages/workflow-core/tests/examples.external-cron.test.ts b/packages/workflow-core/tests/examples.external-cron.test.ts new file mode 100644 index 0000000..db7b906 --- /dev/null +++ b/packages/workflow-core/tests/examples.external-cron.test.ts @@ -0,0 +1,270 @@ +/** + * External cron scheduling — what's possible with workflow-core today, + * with zero engine changes. The engine doesn't ship a cron primitive + * or a timer driver; the pattern every mature workflow library + * converges on is **external scheduler + fresh workflow invocation + * per tick**. This test exercises that pattern end-to-end. + * + * Three scenarios: + * 1. A bare scheduler fires the workflow once per "minute" (driven + * by vitest fake timers). Each tick produces an independent + * runId and finishes cleanly. + * 2. Skip-overlap policy via deterministic runId + state check. + * 3. Buffer-one policy — if a tick fires while one is running, + * queue one follow-up. + * + * See docs/concepts/scheduling.md for the user-facing recipes. + */ +import { afterEach, beforeEach, describe, expect, it, vi } from 'vitest' +import { z } from 'zod' +import { createWorkflow, inMemoryRunStore, runWorkflow } from '../src' +import type { WorkflowEvent } from '../src' +import { collect } from './test-utils' + +// ============================================================ +// Shared workflow — a one-shot run per tick. No internal sleep, +// no recurrence inside the body. The scheduler decides cadence. +// ============================================================ + +function makeDailyReport(workFn: () => Promise<{ summary: string }>) { + return createWorkflow({ + id: 'daily-report', + input: z.object({ triggeredAt: z.number() }), + }).handler(async (ctx) => { + const report = await ctx.step('gen', workFn) + await ctx.step('email', () => ({ sent: true, summary: report.summary })) + return { ranAt: ctx.input.triggeredAt, summary: report.summary } + }) +} + +// ============================================================ +// Minimal scheduler — fires every `intervalMs`. In production +// this is `node-cron`, EventBridge, Cloudflare Cron, Durable +// Object alarms, etc. Here it's a setInterval that vitest's +// fake timers drive deterministically. +// ============================================================ + +interface SchedulerOptions { + intervalMs: number + onTick: () => Promise | void +} + +function startScheduler(options: SchedulerOptions): () => void { + const handle = setInterval(() => { + void Promise.resolve(options.onTick()).catch(() => { + /* swallow — production code routes to error tracker */ + }) + }, options.intervalMs) + return () => clearInterval(handle) +} + +// ============================================================ +// Tests +// ============================================================ + +describe('external cron scheduling', () => { + beforeEach(() => { + vi.useFakeTimers() + }) + afterEach(() => { + vi.useRealTimers() + }) + + it('fires a fresh workflow run per tick, each with an independent runId', async () => { + const runStore = inMemoryRunStore() + const workflow = makeDailyReport(async () => ({ summary: 'all green' })) + const runIds: Array = [] + const finishedOutputs: Array = [] + + const stop = startScheduler({ + intervalMs: 60_000, // "every minute" + onTick: async () => { + const events: Array = [] + for await (const e of runWorkflow({ + workflow, + input: { triggeredAt: Date.now() }, + runStore, + })) { + events.push(e) + } + const started = events.find((e) => e.type === 'RUN_STARTED') + if (started) runIds.push((started as { runId: string }).runId) + const finished = events.find((e) => e.type === 'RUN_FINISHED') + if (finished) { + finishedOutputs.push((finished as { output: unknown }).output) + } + }, + }) + + // Advance fake time through three ticks. + await vi.advanceTimersByTimeAsync(60_000 * 3) + stop() + // Drain any microtasks the last tick scheduled. + await vi.runAllTimersAsync() + + expect(runIds).toHaveLength(3) + expect(new Set(runIds).size).toBe(3) // all distinct + expect(finishedOutputs).toHaveLength(3) + for (const out of finishedOutputs) { + expect(out).toMatchObject({ summary: 'all green' }) + } + }) + + it('skip-overlap policy: deterministic runId blocks a second tick while the first is running', async () => { + // The scheduler holds a deterministic runId per "day." If a tick + // fires while the previous run for the same id is still running + // or paused, we read the run state and bail. + const runStore = inMemoryRunStore() + let invocations = 0 + const workFn = vi.fn(async () => { + invocations++ + return { summary: `invocation-${invocations}` } + }) + const workflow = makeDailyReport(workFn) + + const runIdAttempts: Array = [] + const skippedAttempts: Array = [] + + async function tickWithSkipOverlap() { + // One run per "minute window" in this test — production code + // would use a day key, schedule id, etc. + const minuteKey = Math.floor(Date.now() / 60_000) + const runId = `daily-report:${minuteKey}` + runIdAttempts.push(runId) + + const existing = await runStore.getRunState(runId) + if ( + existing && + existing.status !== 'finished' && + existing.status !== 'errored' + ) { + skippedAttempts.push(runId) + return + } + await collect( + runWorkflow({ + workflow, + runId, + input: { triggeredAt: Date.now() }, + runStore, + }), + ) + } + + // Fire two ticks in quick succession (both within the same minute + // window → same runId). The second one should skip because the + // first is still resident. + // + // To simulate "first is still running" we fire them in parallel + // and let the runStore's idempotency-check on getRunState catch the + // second one — the in-memory store's deleteRun on finish runs + // synchronously enough that we need to interleave deliberately. + // + // Strategy: drive two ticks concurrently, then advance time so the + // first completes. Then a third tick (still in the same minute) + // sees a finished run and would NOT skip — but we want skip + // semantics for in-flight overlap. + // + // Easier: hold the first tick's `step` fn open by making it await + // a manually-resolved promise. + let releaseFirstStep: (() => void) | null = null + workFn.mockImplementationOnce(async () => { + invocations++ + await new Promise((r) => { + releaseFirstStep = r + }) + return { summary: 'first' } + }) + + const tick1 = tickWithSkipOverlap() + // Yield so tick1 progresses to inside the step fn and is "running". + await vi.advanceTimersByTimeAsync(0) + + // Fire tick2 — same minute window, first is still running. + const tick2 = tickWithSkipOverlap() + await vi.advanceTimersByTimeAsync(0) + + // Release the first tick. + releaseFirstStep!() + await tick1 + await tick2 + + expect(runIdAttempts).toHaveLength(2) + expect(runIdAttempts[0]).toBe(runIdAttempts[1]) // same minute key + expect(skippedAttempts).toHaveLength(1) + expect(invocations).toBe(1) // workFn ran exactly once + }) + + it('buffer-one policy: one extra tick during a long run becomes one follow-up', async () => { + const runStore = inMemoryRunStore() + const completedSummaries: Array = [] + let runOrder = 0 + + // Hold the first run open so subsequent ticks deterministically + // observe it as in-flight. + let releaseFirst: (() => void) | null = null + let firstStepStarted: (() => void) | null = null + const firstStepStartedPromise = new Promise((r) => { + firstStepStarted = r + }) + let firstCallHandled = false + + const heldWorkFn = vi.fn(async () => { + const tag = `run-${++runOrder}` + if (!firstCallHandled) { + firstCallHandled = true + firstStepStarted!() + await new Promise((r) => { + releaseFirst = r + }) + } + return { summary: tag } + }) + const workflow = makeDailyReport(heldWorkFn) + + let pending = false + let inFlight: Promise | null = null + + const tick = async () => { + if (inFlight) { + pending = true + return + } + const promise = (async () => { + const events = await collect( + runWorkflow({ + workflow, + input: { triggeredAt: Date.now() }, + runStore, + }), + ) + const out = events.find((e) => e.type === 'RUN_FINISHED') as + | { output: { summary: string } } + | undefined + if (out) completedSummaries.push(out.output.summary) + })() + inFlight = promise.finally(async () => { + inFlight = null + if (pending) { + pending = false + await tick() + } + }) + await inFlight + } + + // Fire three ticks while the first is held; only one should buffer. + const t1 = tick() + await firstStepStartedPromise + const t2 = tick() // buffered as pending=true + const t3 = tick() // collapses into existing pending (no extra) + + // Release the first; the buffered tick runs. + releaseFirst!() + await Promise.all([t1, t2, t3]) + + // Two runs total: the held one + one buffered follow-up. The third + // tick collapsed into the buffer. + expect(completedSummaries).toHaveLength(2) + }) +}) diff --git a/research/SCHEDULING.md b/research/SCHEDULING.md index ddeea70..bcfb7b7 100644 --- a/research/SCHEDULING.md +++ b/research/SCHEDULING.md @@ -90,4 +90,6 @@ Two pieces: ## Status -Research only. No package, no recipe, no test. Recommendation: defer until at least one durable storage adapter ships, then design `@tanstack/workflow-cron` against a real Postgres / DO store rather than the in-memory one. +Research and recipes. See [docs/concepts/scheduling.md](../docs/concepts/scheduling.md) for the user-facing recipes that exercise the "bring your own scheduler" pattern with what's shipping today, and [packages/workflow-core/tests/examples.external-cron.test.ts](../packages/workflow-core/tests/examples.external-cron.test.ts) for a vitest-driven test that proves the pattern works end-to-end (single-tick, skip-overlap, buffer-one). + +No `@tanstack/workflow-cron` package yet. Recommendation: defer until at least one durable storage adapter ships, then design against a real Postgres / DO store rather than the in-memory one.