Skip to content

Commit 4fd9b56

Browse files
TooTallNateadriandlam
authored andcommitted
Add support for closure scope vars in step functions (#358)
### What changed? - Enhanced the SWC plugin to detect and collect closure variables from nested step functions - Modified the workflow runtime to pass closure variables to step functions during execution - Updated the serialization/deserialization logic to handle the new closure variable format - Added a mechanism to access closure variables within step functions - Added tests to verify closure variable functionality in nested step functions - Closure variables are stored on the step function execution's `AsyncLocalStorage`​ context ### Example ```typescript export async function myWorkflow(baseValue: number) { 'use workflow'; const multiplier = 3; const prefix = 'Result: '; const calculate = async () => { 'use step'; const result = baseValue * multiplier; return `${prefix}${result}`; }; return await calculate(); } ``` ### Why make this change? Previously, nested step functions couldn't access variables from their parent workflow scope, limiting their usefulness and requiring workarounds like passing all needed values as parameters. This change enables a more natural programming model where step functions can access variables from their enclosing scope, making workflow code more intuitive and reducing the need for explicit parameter passing.
1 parent d06b485 commit 4fd9b56

File tree

20 files changed

+1237
-167
lines changed

20 files changed

+1237
-167
lines changed

.changeset/cruel-corners-feel.md

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,8 @@
1+
---
2+
"@workflow/web-shared": patch
3+
"@workflow/swc-plugin": patch
4+
"@workflow/world": patch
5+
"@workflow/core": patch
6+
---
7+
8+
Add support for closure scope vars in step functions

packages/core/e2e/e2e.test.ts

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -737,4 +737,18 @@ describe('e2e', () => {
737737
expect(stepCompletedEvents).toHaveLength(1);
738738
}
739739
);
740+
741+
test(
742+
'closureVariableWorkflow - nested step functions with closure variables',
743+
{ timeout: 60_000 },
744+
async () => {
745+
// This workflow uses a nested step function that references closure variables
746+
// from the parent workflow scope (multiplier, prefix, baseValue)
747+
const run = await triggerWorkflow('closureVariableWorkflow', [7]);
748+
const returnValue = await getWorkflowReturnValue(run.runId);
749+
750+
// Expected: baseValue (7) * multiplier (3) = 21, prefixed with "Result: "
751+
expect(returnValue).toBe('Result: 21');
752+
}
753+
);
740754
});

packages/core/src/global.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ export interface StepInvocationQueueItem {
55
correlationId: string;
66
stepName: string;
77
args: Serializable[];
8+
closureVars?: Record<string, Serializable>;
89
}
910

1011
export interface HookInvocationQueueItem {

packages/core/src/private.ts

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,12 @@ export function getStepFunction(stepId: string): StepFunction | undefined {
2929
return registeredSteps.get(stepId);
3030
}
3131

32+
/**
33+
* Get closure variables for the current step function
34+
* @internal
35+
*/
36+
export { __private_getClosureVars } from './step/get-closure-vars.js';
37+
3238
export interface WorkflowOrchestratorContext {
3339
globalThis: typeof globalThis;
3440
eventsConsumer: EventsConsumer;

packages/core/src/runtime.ts

Lines changed: 16 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -389,16 +389,19 @@ export function workflowEntrypoint(workflowCode: string) {
389389
if (queueItem.type === 'step') {
390390
// Handle step operations
391391
const ops: Promise<void>[] = [];
392-
const dehydratedArgs = dehydrateStepArguments(
393-
queueItem.args,
392+
const dehydratedInput = dehydrateStepArguments(
393+
{
394+
args: queueItem.args,
395+
closureVars: queueItem.closureVars,
396+
},
394397
err.globalThis
395398
);
396399

397400
try {
398401
const step = await world.steps.create(runId, {
399402
stepId: queueItem.correlationId,
400403
stepName: queueItem.stepName,
401-
input: dehydratedArgs as Serializable[],
404+
input: dehydratedInput as Serializable,
402405
});
403406

404407
waitUntil(
@@ -678,9 +681,15 @@ export const stepEntrypoint =
678681
`Step "${stepId}" has no "startedAt" timestamp`
679682
);
680683
}
681-
// Hydrate the step input arguments
684+
// Hydrate the step input arguments and closure variables
682685
const ops: Promise<void>[] = [];
683-
const args = hydrateStepArguments(step.input, ops, workflowRunId);
686+
const hydratedInput = hydrateStepArguments(
687+
step.input,
688+
ops,
689+
workflowRunId
690+
);
691+
692+
const args = hydratedInput.args;
684693

685694
span?.setAttributes({
686695
...Attribute.StepArgumentsCount(args.length),
@@ -703,8 +712,9 @@ export const stepEntrypoint =
703712
: `http://localhost:${port ?? 3000}`,
704713
},
705714
ops,
715+
closureVars: hydratedInput.closureVars,
706716
},
707-
() => stepFn(...args)
717+
() => stepFn.apply(null, args)
708718
);
709719

710720
// NOTE: None of the code from this point is guaranteed to run

packages/core/src/step.test.ts

Lines changed: 71 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -199,4 +199,75 @@ describe('createUseStep', () => {
199199
await myStepFunction();
200200
expect(ctx.onWorkflowError).not.toHaveBeenCalled();
201201
});
202+
203+
it('should capture closure variables when provided', async () => {
204+
const ctx = setupWorkflowContext([
205+
{
206+
eventId: 'evnt_0',
207+
runId: 'wrun_123',
208+
eventType: 'step_completed',
209+
correlationId: 'step_01K11TFZ62YS0YYFDQ3E8B9YCV',
210+
eventData: {
211+
result: ['Result: 42'],
212+
},
213+
createdAt: new Date(),
214+
},
215+
]);
216+
217+
const useStep = createUseStep(ctx);
218+
const count = 42;
219+
const prefix = 'Result: ';
220+
221+
// Create step with closure variables function
222+
const calculate = useStep('calculate', () => ({ count, prefix }));
223+
224+
// Call the step
225+
const result = await calculate();
226+
227+
// Verify result
228+
expect(result).toBe('Result: 42');
229+
230+
// Verify closure variables were added to invocation queue
231+
expect(ctx.invocationsQueue).toHaveLength(1);
232+
expect(ctx.invocationsQueue[0]).toMatchObject({
233+
type: 'step',
234+
stepName: 'calculate',
235+
args: [],
236+
closureVars: { count: 42, prefix: 'Result: ' },
237+
});
238+
});
239+
240+
it('should handle empty closure variables', async () => {
241+
const ctx = setupWorkflowContext([
242+
{
243+
eventId: 'evnt_0',
244+
runId: 'wrun_123',
245+
eventType: 'step_completed',
246+
correlationId: 'step_01K11TFZ62YS0YYFDQ3E8B9YCV',
247+
eventData: {
248+
result: [5],
249+
},
250+
createdAt: new Date(),
251+
},
252+
]);
253+
254+
const useStep = createUseStep(ctx);
255+
256+
// Create step without closure variables
257+
const add = useStep('add');
258+
259+
// Call the step
260+
const result = await add(2, 3);
261+
262+
// Verify result
263+
expect(result).toBe(5);
264+
265+
// Verify empty closure variables were added to invocation queue
266+
expect(ctx.invocationsQueue).toHaveLength(1);
267+
expect(ctx.invocationsQueue[0]).toMatchObject({
268+
type: 'step',
269+
stepName: 'add',
270+
args: [2, 3],
271+
});
272+
});
202273
});

packages/core/src/step.ts

Lines changed: 14 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,26 +1,36 @@
11
import { FatalError, WorkflowRuntimeError } from '@workflow/errors';
22
import { withResolvers } from '@workflow/utils';
33
import { EventConsumerResult } from './events-consumer.js';
4-
import { WorkflowSuspension } from './global.js';
4+
import { type StepInvocationQueueItem, WorkflowSuspension } from './global.js';
55
import { stepLogger } from './logger.js';
66
import type { WorkflowOrchestratorContext } from './private.js';
77
import type { Serializable } from './schemas.js';
88
import { hydrateStepReturnValue } from './serialization.js';
99

1010
export function createUseStep(ctx: WorkflowOrchestratorContext) {
1111
return function useStep<Args extends Serializable[], Result>(
12-
stepName: string
12+
stepName: string,
13+
closureVarsFn?: () => Record<string, Serializable>
1314
) {
1415
const stepFunction = (...args: Args): Promise<Result> => {
1516
const { promise, resolve, reject } = withResolvers<Result>();
1617

1718
const correlationId = `step_${ctx.generateUlid()}`;
18-
ctx.invocationsQueue.push({
19+
20+
const queueItem: StepInvocationQueueItem = {
1921
type: 'step',
2022
correlationId,
2123
stepName,
2224
args,
23-
});
25+
};
26+
27+
// Invoke the closure variables function to get the closure scope
28+
const closureVars = closureVarsFn?.();
29+
if (closureVars) {
30+
queueItem.closureVars = closureVars;
31+
}
32+
33+
ctx.invocationsQueue.push(queueItem);
2434

2535
// Track whether we've already seen a "step_started" event for this step.
2636
// This is important because after a retryable failure, the step moves back to

packages/core/src/step/context-storage.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,4 +6,5 @@ export const contextStorage = /* @__PURE__ */ new AsyncLocalStorage<{
66
stepMetadata: StepMetadata;
77
workflowMetadata: WorkflowMetadata;
88
ops: Promise<void>[];
9+
closureVars?: Record<string, any>;
910
}>();
Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,18 @@
1+
import { contextStorage } from './context-storage.js';
2+
3+
/**
4+
* Returns the closure variables for the current step function.
5+
* This is an internal function used by the SWC transform to access
6+
* variables from the parent workflow scope.
7+
*
8+
* @internal
9+
*/
10+
export function __private_getClosureVars(): Record<string, any> {
11+
const ctx = contextStorage.getStore();
12+
if (!ctx) {
13+
throw new Error(
14+
'Closure variables can only be accessed inside a step function'
15+
);
16+
}
17+
return ctx.closureVars || {};
18+
}

packages/core/src/workflow.test.ts

Lines changed: 92 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2304,4 +2304,96 @@ describe('runWorkflow', () => {
23042304
);
23052305
});
23062306
});
2307+
2308+
describe('closure variables', () => {
2309+
it('should serialize and deserialize closure variables for nested step functions', async () => {
2310+
let error: Error | undefined;
2311+
try {
2312+
const ops: Promise<any>[] = [];
2313+
const workflowRun: WorkflowRun = {
2314+
runId: 'test-run-123',
2315+
workflowName: 'workflow',
2316+
status: 'running',
2317+
input: dehydrateWorkflowArguments([], ops),
2318+
createdAt: new Date('2024-01-01T00:00:00.000Z'),
2319+
updatedAt: new Date('2024-01-01T00:00:00.000Z'),
2320+
startedAt: new Date('2024-01-01T00:00:00.000Z'),
2321+
deploymentId: 'test-deployment',
2322+
};
2323+
2324+
const events: Event[] = [];
2325+
2326+
await runWorkflow(
2327+
`const useStep = globalThis[Symbol.for("WORKFLOW_USE_STEP")];
2328+
async function workflow() {
2329+
const multiplier = 3;
2330+
const prefix = 'Result: ';
2331+
const calculate = useStep('step//input.js//_anonymousStep0', () => ({ multiplier, prefix }));
2332+
const result = await calculate(7);
2333+
return result;
2334+
}${getWorkflowTransformCode('workflow')}`,
2335+
workflowRun,
2336+
events
2337+
);
2338+
} catch (err) {
2339+
error = err as Error;
2340+
}
2341+
2342+
// Should suspend to create the step
2343+
assert(error);
2344+
expect(error.name).toEqual('WorkflowSuspension');
2345+
expect((error as WorkflowSuspension).steps).toHaveLength(1);
2346+
2347+
const step = (error as WorkflowSuspension).steps[0];
2348+
expect(step).toMatchObject({
2349+
type: 'step',
2350+
stepName: 'step//input.js//_anonymousStep0',
2351+
args: [7],
2352+
closureVars: { multiplier: 3, prefix: 'Result: ' },
2353+
});
2354+
});
2355+
2356+
it('should handle step functions without closure variables', async () => {
2357+
let error: Error | undefined;
2358+
try {
2359+
const ops: Promise<any>[] = [];
2360+
const workflowRun: WorkflowRun = {
2361+
runId: 'test-run-123',
2362+
workflowName: 'workflow',
2363+
status: 'running',
2364+
input: dehydrateWorkflowArguments([], ops),
2365+
createdAt: new Date('2024-01-01T00:00:00.000Z'),
2366+
updatedAt: new Date('2024-01-01T00:00:00.000Z'),
2367+
startedAt: new Date('2024-01-01T00:00:00.000Z'),
2368+
deploymentId: 'test-deployment',
2369+
};
2370+
2371+
const events: Event[] = [];
2372+
2373+
await runWorkflow(
2374+
`const add = globalThis[Symbol.for("WORKFLOW_USE_STEP")]("add");
2375+
async function workflow() {
2376+
const result = await add(5, 10);
2377+
return result;
2378+
}${getWorkflowTransformCode('workflow')}`,
2379+
workflowRun,
2380+
events
2381+
);
2382+
} catch (err) {
2383+
error = err as Error;
2384+
}
2385+
2386+
// Should suspend to create the step
2387+
assert(error);
2388+
expect(error.name).toEqual('WorkflowSuspension');
2389+
expect((error as WorkflowSuspension).steps).toHaveLength(1);
2390+
2391+
const step = (error as WorkflowSuspension).steps[0];
2392+
expect(step).toMatchObject({
2393+
type: 'step',
2394+
stepName: 'add',
2395+
args: [5, 10],
2396+
});
2397+
});
2398+
});
23072399
});

0 commit comments

Comments
 (0)