Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions .changeset/cruel-corners-feel.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
---
"@workflow/web-shared": patch
"@workflow/swc-plugin": patch
"@workflow/world": patch
"@workflow/core": patch
---

Add support for closure scope vars in step functions
14 changes: 14 additions & 0 deletions packages/core/e2e/e2e.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -737,4 +737,18 @@ describe('e2e', () => {
expect(stepCompletedEvents).toHaveLength(1);
}
);

test(
'closureVariableWorkflow - nested step functions with closure variables',
{ timeout: 60_000 },
async () => {
// This workflow uses a nested step function that references closure variables
// from the parent workflow scope (multiplier, prefix, baseValue)
const run = await triggerWorkflow('closureVariableWorkflow', [7]);
const returnValue = await getWorkflowReturnValue(run.runId);

// Expected: baseValue (7) * multiplier (3) = 21, prefixed with "Result: "
expect(returnValue).toBe('Result: 21');
}
);
});
1 change: 1 addition & 0 deletions packages/core/src/global.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ export interface StepInvocationQueueItem {
correlationId: string;
stepName: string;
args: Serializable[];
closureVars?: Record<string, Serializable>;
}

export interface HookInvocationQueueItem {
Expand Down
6 changes: 6 additions & 0 deletions packages/core/src/private.ts
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,12 @@ export function getStepFunction(stepId: string): StepFunction | undefined {
return registeredSteps.get(stepId);
}

/**
* Get closure variables for the current step function
* @internal
*/
export { __private_getClosureVars } from './step/get-closure-vars.js';

export interface WorkflowOrchestratorContext {
globalThis: typeof globalThis;
eventsConsumer: EventsConsumer;
Expand Down
22 changes: 16 additions & 6 deletions packages/core/src/runtime.ts
Original file line number Diff line number Diff line change
Expand Up @@ -389,16 +389,19 @@ export function workflowEntrypoint(workflowCode: string) {
if (queueItem.type === 'step') {
// Handle step operations
const ops: Promise<void>[] = [];
const dehydratedArgs = dehydrateStepArguments(
queueItem.args,
const dehydratedInput = dehydrateStepArguments(
{
args: queueItem.args,
closureVars: queueItem.closureVars,
},
err.globalThis
);

try {
const step = await world.steps.create(runId, {
stepId: queueItem.correlationId,
stepName: queueItem.stepName,
input: dehydratedArgs as Serializable[],
input: dehydratedInput as Serializable,
});

waitUntil(
Expand Down Expand Up @@ -678,9 +681,15 @@ export const stepEntrypoint =
`Step "${stepId}" has no "startedAt" timestamp`
);
}
// Hydrate the step input arguments
// Hydrate the step input arguments and closure variables
const ops: Promise<void>[] = [];
const args = hydrateStepArguments(step.input, ops, workflowRunId);
const hydratedInput = hydrateStepArguments(
step.input,
ops,
workflowRunId
);

const args = hydratedInput.args;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Missing backward compatibility handling for step input format. The code assumes hydratedInput has an args property, but old steps created before this change will have step.input as a serialized array, causing a runtime error when trying to access .args.

View Details
📝 Patch Details
diff --git a/packages/core/src/runtime.ts b/packages/core/src/runtime.ts
index a788e48..e57e3ba 100644
--- a/packages/core/src/runtime.ts
+++ b/packages/core/src/runtime.ts
@@ -681,7 +681,9 @@ export const stepEntrypoint =
               workflowRunId
             );
 
-            const args = hydratedInput.args;
+            // Handle both new format { args, closureVars } and legacy format (just args array)
+            const args = 'args' in hydratedInput ? hydratedInput.args : hydratedInput;
+            const closureVars = 'closureVars' in hydratedInput ? hydratedInput.closureVars : undefined;
 
             span?.setAttributes({
               ...Attribute.StepArgumentsCount(args.length),
@@ -704,7 +706,7 @@ export const stepEntrypoint =
                     : `http://localhost:${port ?? 3000}`,
                 },
                 ops,
-                closureVars: hydratedInput.closureVars,
+                closureVars,
               },
               () => stepFn.apply(null, args)
             );

Analysis

Missing backward compatibility handling for step input format in stepEntrypoint

What fails: stepEntrypoint() in runtime.ts lines 684 and 707 assume hydratedInput has args and closureVars properties. When old steps created before the closureVars feature was added are replayed, they have step.input as a serialized array (not wrapped in an object). Upon hydration, this returns the array directly, and accessing .args on an array returns undefined. This causes line 686 args.length to throw: TypeError: Cannot read properties of undefined (reading 'length')

How to reproduce:

  1. A step was created and stored with the old format: step.input = dehydrateStepArguments([arg1, arg2], globalThis) (just an array)
  2. Replay/resume the workflow run after the code was updated to wrap inputs in { args, closureVars }
  3. stepEntrypoint calls hydrateStepArguments(step.input, ...)
  4. Since the stored input is a plain array, hydration returns the array directly
  5. Code attempts const args = hydratedInput.args which is undefined on an array
  6. Code tries args.length on undefined, throwing TypeError

Expected behavior: Code should handle both:

  • New format: { args: [...], closureVars: {...} }
  • Legacy format: plain array [...]

This pattern is already implemented in observability.ts line 63: 'args' in hydratedInput ? hydratedInput.args : hydratedInput

Fix applied: Added defensive checks that mirror the observability.ts pattern:

const args = 'args' in hydratedInput ? hydratedInput.args : hydratedInput;
const closureVars = 'closureVars' in hydratedInput ? hydratedInput.closureVars : undefined;

This ensures backward compatibility with steps stored in the old format while supporting the new closureVars feature.


span?.setAttributes({
...Attribute.StepArgumentsCount(args.length),
Expand All @@ -703,8 +712,9 @@ export const stepEntrypoint =
: `http://localhost:${port ?? 3000}`,
},
ops,
closureVars: hydratedInput.closureVars,
},
() => stepFn(...args)
() => stepFn.apply(null, args)
);

// NOTE: None of the code from this point is guaranteed to run
Expand Down
71 changes: 71 additions & 0 deletions packages/core/src/step.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -199,4 +199,75 @@ describe('createUseStep', () => {
await myStepFunction();
expect(ctx.onWorkflowError).not.toHaveBeenCalled();
});

it('should capture closure variables when provided', async () => {
const ctx = setupWorkflowContext([
{
eventId: 'evnt_0',
runId: 'wrun_123',
eventType: 'step_completed',
correlationId: 'step_01K11TFZ62YS0YYFDQ3E8B9YCV',
eventData: {
result: ['Result: 42'],
},
createdAt: new Date(),
},
]);

const useStep = createUseStep(ctx);
const count = 42;
const prefix = 'Result: ';

// Create step with closure variables function
const calculate = useStep('calculate', () => ({ count, prefix }));

// Call the step
const result = await calculate();

// Verify result
expect(result).toBe('Result: 42');

// Verify closure variables were added to invocation queue
expect(ctx.invocationsQueue).toHaveLength(1);
expect(ctx.invocationsQueue[0]).toMatchObject({
type: 'step',
stepName: 'calculate',
args: [],
closureVars: { count: 42, prefix: 'Result: ' },
});
});

it('should handle empty closure variables', async () => {
const ctx = setupWorkflowContext([
{
eventId: 'evnt_0',
runId: 'wrun_123',
eventType: 'step_completed',
correlationId: 'step_01K11TFZ62YS0YYFDQ3E8B9YCV',
eventData: {
result: [5],
},
createdAt: new Date(),
},
]);

const useStep = createUseStep(ctx);

// Create step without closure variables
const add = useStep('add');

// Call the step
const result = await add(2, 3);

// Verify result
expect(result).toBe(5);

// Verify empty closure variables were added to invocation queue
expect(ctx.invocationsQueue).toHaveLength(1);
expect(ctx.invocationsQueue[0]).toMatchObject({
type: 'step',
stepName: 'add',
args: [2, 3],
});
});
});
18 changes: 14 additions & 4 deletions packages/core/src/step.ts
Original file line number Diff line number Diff line change
@@ -1,26 +1,36 @@
import { FatalError, WorkflowRuntimeError } from '@workflow/errors';
import { withResolvers } from '@workflow/utils';
import { EventConsumerResult } from './events-consumer.js';
import { WorkflowSuspension } from './global.js';
import { type StepInvocationQueueItem, WorkflowSuspension } from './global.js';
import { stepLogger } from './logger.js';
import type { WorkflowOrchestratorContext } from './private.js';
import type { Serializable } from './schemas.js';
import { hydrateStepReturnValue } from './serialization.js';

export function createUseStep(ctx: WorkflowOrchestratorContext) {
return function useStep<Args extends Serializable[], Result>(
stepName: string
stepName: string,
closureVarsFn?: () => Record<string, Serializable>
) {
const stepFunction = (...args: Args): Promise<Result> => {
const { promise, resolve, reject } = withResolvers<Result>();

const correlationId = `step_${ctx.generateUlid()}`;
ctx.invocationsQueue.push({

const queueItem: StepInvocationQueueItem = {
type: 'step',
correlationId,
stepName,
args,
});
};

// Invoke the closure variables function to get the closure scope
const closureVars = closureVarsFn?.();
if (closureVars) {
queueItem.closureVars = closureVars;
}

ctx.invocationsQueue.push(queueItem);

// Track whether we've already seen a "step_started" event for this step.
// This is important because after a retryable failure, the step moves back to
Expand Down
1 change: 1 addition & 0 deletions packages/core/src/step/context-storage.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,4 +6,5 @@ export const contextStorage = /* @__PURE__ */ new AsyncLocalStorage<{
stepMetadata: StepMetadata;
workflowMetadata: WorkflowMetadata;
ops: Promise<void>[];
closureVars?: Record<string, any>;
}>();
18 changes: 18 additions & 0 deletions packages/core/src/step/get-closure-vars.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
import { contextStorage } from './context-storage.js';

/**
* Returns the closure variables for the current step function.
* This is an internal function used by the SWC transform to access
* variables from the parent workflow scope.
*
* @internal
*/
export function __private_getClosureVars(): Record<string, any> {
const ctx = contextStorage.getStore();
if (!ctx) {
throw new Error(
'Closure variables can only be accessed inside a step function'
);
}
return ctx.closureVars || {};
}
92 changes: 92 additions & 0 deletions packages/core/src/workflow.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2304,4 +2304,96 @@ describe('runWorkflow', () => {
);
});
});

describe('closure variables', () => {
it('should serialize and deserialize closure variables for nested step functions', async () => {
let error: Error | undefined;
try {
const ops: Promise<any>[] = [];
const workflowRun: WorkflowRun = {
runId: 'test-run-123',
workflowName: 'workflow',
status: 'running',
input: dehydrateWorkflowArguments([], ops),
createdAt: new Date('2024-01-01T00:00:00.000Z'),
updatedAt: new Date('2024-01-01T00:00:00.000Z'),
startedAt: new Date('2024-01-01T00:00:00.000Z'),
deploymentId: 'test-deployment',
};

const events: Event[] = [];

await runWorkflow(
`const useStep = globalThis[Symbol.for("WORKFLOW_USE_STEP")];
async function workflow() {
const multiplier = 3;
const prefix = 'Result: ';
const calculate = useStep('step//input.js//_anonymousStep0', () => ({ multiplier, prefix }));
const result = await calculate(7);
return result;
}${getWorkflowTransformCode('workflow')}`,
workflowRun,
events
);
} catch (err) {
error = err as Error;
}

// Should suspend to create the step
assert(error);
expect(error.name).toEqual('WorkflowSuspension');
expect((error as WorkflowSuspension).steps).toHaveLength(1);

const step = (error as WorkflowSuspension).steps[0];
expect(step).toMatchObject({
type: 'step',
stepName: 'step//input.js//_anonymousStep0',
args: [7],
closureVars: { multiplier: 3, prefix: 'Result: ' },
});
});

it('should handle step functions without closure variables', async () => {
let error: Error | undefined;
try {
const ops: Promise<any>[] = [];
const workflowRun: WorkflowRun = {
runId: 'test-run-123',
workflowName: 'workflow',
status: 'running',
input: dehydrateWorkflowArguments([], ops),
createdAt: new Date('2024-01-01T00:00:00.000Z'),
updatedAt: new Date('2024-01-01T00:00:00.000Z'),
startedAt: new Date('2024-01-01T00:00:00.000Z'),
deploymentId: 'test-deployment',
};

const events: Event[] = [];

await runWorkflow(
`const add = globalThis[Symbol.for("WORKFLOW_USE_STEP")]("add");
async function workflow() {
const result = await add(5, 10);
return result;
}${getWorkflowTransformCode('workflow')}`,
workflowRun,
events
);
} catch (err) {
error = err as Error;
}

// Should suspend to create the step
assert(error);
expect(error.name).toEqual('WorkflowSuspension');
expect((error as WorkflowSuspension).steps).toHaveLength(1);

const step = (error as WorkflowSuspension).steps[0];
expect(step).toMatchObject({
type: 'step',
stepName: 'add',
args: [5, 10],
});
});
});
});
Loading
Loading