Skip to content

Commit 8cca401

Browse files
committed
WIP closure scope vars in step functions
1 parent aa5ff3c commit 8cca401

File tree

20 files changed

+1218
-177
lines changed

20 files changed

+1218
-177
lines changed

.changeset/cruel-corners-feel.md

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,7 @@
1+
---
2+
"@workflow/swc-plugin": patch
3+
"@workflow/world": patch
4+
"@workflow/core": patch
5+
---
6+
7+
Add support for closure scope vars in step functions

packages/core/e2e/e2e.test.ts

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -737,4 +737,18 @@ describe('e2e', () => {
737737
expect(stepCompletedEvents).toHaveLength(1);
738738
}
739739
);
740+
741+
test(
742+
'closureVariableWorkflow - nested step functions with closure variables',
743+
{ timeout: 60_000 },
744+
async () => {
745+
// This workflow uses a nested step function that references closure variables
746+
// from the parent workflow scope (multiplier, prefix, baseValue)
747+
const run = await triggerWorkflow('closureVariableWorkflow', [7]);
748+
const returnValue = await getWorkflowReturnValue(run.runId);
749+
750+
// Expected: baseValue (7) * multiplier (3) = 21, prefixed with "Result: "
751+
expect(returnValue).toBe('Result: 21');
752+
}
753+
);
740754
});

packages/core/src/global.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ export interface StepInvocationQueueItem {
55
correlationId: string;
66
stepName: string;
77
args: Serializable[];
8+
closureVars?: Record<string, Serializable>;
89
}
910

1011
export interface HookInvocationQueueItem {

packages/core/src/observability.ts

Lines changed: 12 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -49,18 +49,20 @@ const hydrateStepIO = <
4949
>(
5050
step: T
5151
): T => {
52+
const hydratedInput = step.input
53+
? hydrateStepArguments(
54+
step.input,
55+
[],
56+
step.runId as string,
57+
globalThis,
58+
streamPrintRevivers
59+
)
60+
: step.input;
61+
5262
return {
5363
...step,
54-
input:
55-
step.input && Array.isArray(step.input) && step.input.length
56-
? hydrateStepArguments(
57-
step.input,
58-
[],
59-
step.runId as string,
60-
globalThis,
61-
streamPrintRevivers
62-
)
63-
: step.input,
64+
input: 'args' in hydratedInput ? hydratedInput.args : hydratedInput,
65+
// TODO: should we also render the closure vars for o11y?
6466
output: step.output
6567
? hydrateStepReturnValue(step.output, globalThis, streamPrintRevivers)
6668
: step.output,

packages/core/src/private.ts

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,12 @@ export function getStepFunction(stepId: string): StepFunction | undefined {
2929
return registeredSteps.get(stepId);
3030
}
3131

32+
/**
33+
* Get closure variables for the current step function
34+
* @internal
35+
*/
36+
export { __private_getClosureVars } from './step/get-closure-vars.js';
37+
3238
export interface WorkflowOrchestratorContext {
3339
globalThis: typeof globalThis;
3440
eventsConsumer: EventsConsumer;

packages/core/src/runtime.ts

Lines changed: 16 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -389,16 +389,19 @@ export function workflowEntrypoint(workflowCode: string) {
389389
if (queueItem.type === 'step') {
390390
// Handle step operations
391391
const ops: Promise<void>[] = [];
392-
const dehydratedArgs = dehydrateStepArguments(
393-
queueItem.args,
392+
const dehydratedInput = dehydrateStepArguments(
393+
{
394+
args: queueItem.args,
395+
closureVars: queueItem.closureVars,
396+
},
394397
err.globalThis
395398
);
396399

397400
try {
398401
const step = await world.steps.create(runId, {
399402
stepId: queueItem.correlationId,
400403
stepName: queueItem.stepName,
401-
input: dehydratedArgs as Serializable[],
404+
input: dehydratedInput as Serializable,
402405
});
403406

404407
waitUntil(
@@ -678,9 +681,15 @@ export const stepEntrypoint =
678681
`Step "${stepId}" has no "startedAt" timestamp`
679682
);
680683
}
681-
// Hydrate the step input arguments
684+
// Hydrate the step input arguments and closure variables
682685
const ops: Promise<void>[] = [];
683-
const args = hydrateStepArguments(step.input, ops, workflowRunId);
686+
const hydratedInput = hydrateStepArguments(
687+
step.input,
688+
ops,
689+
workflowRunId
690+
);
691+
692+
const args = hydratedInput.args;
684693

685694
span?.setAttributes({
686695
...Attribute.StepArgumentsCount(args.length),
@@ -703,8 +712,9 @@ export const stepEntrypoint =
703712
: `http://localhost:${port ?? 3000}`,
704713
},
705714
ops,
715+
closureVars: hydratedInput.closureVars,
706716
},
707-
() => stepFn(...args)
717+
() => stepFn.apply(null, args)
708718
);
709719

710720
// NOTE: None of the code from this point is guaranteed to run

packages/core/src/step.test.ts

Lines changed: 71 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -199,4 +199,75 @@ describe('createUseStep', () => {
199199
await myStepFunction();
200200
expect(ctx.onWorkflowError).not.toHaveBeenCalled();
201201
});
202+
203+
it('should capture closure variables when provided', async () => {
204+
const ctx = setupWorkflowContext([
205+
{
206+
eventId: 'evnt_0',
207+
runId: 'wrun_123',
208+
eventType: 'step_completed',
209+
correlationId: 'step_01K11TFZ62YS0YYFDQ3E8B9YCV',
210+
eventData: {
211+
result: ['Result: 42'],
212+
},
213+
createdAt: new Date(),
214+
},
215+
]);
216+
217+
const useStep = createUseStep(ctx);
218+
const count = 42;
219+
const prefix = 'Result: ';
220+
221+
// Create step with closure variables function
222+
const calculate = useStep('calculate', () => ({ count, prefix }));
223+
224+
// Call the step
225+
const result = await calculate();
226+
227+
// Verify result
228+
expect(result).toBe('Result: 42');
229+
230+
// Verify closure variables were added to invocation queue
231+
expect(ctx.invocationsQueue).toHaveLength(1);
232+
expect(ctx.invocationsQueue[0]).toMatchObject({
233+
type: 'step',
234+
stepName: 'calculate',
235+
args: [],
236+
closureVars: { count: 42, prefix: 'Result: ' },
237+
});
238+
});
239+
240+
it('should handle empty closure variables', async () => {
241+
const ctx = setupWorkflowContext([
242+
{
243+
eventId: 'evnt_0',
244+
runId: 'wrun_123',
245+
eventType: 'step_completed',
246+
correlationId: 'step_01K11TFZ62YS0YYFDQ3E8B9YCV',
247+
eventData: {
248+
result: [5],
249+
},
250+
createdAt: new Date(),
251+
},
252+
]);
253+
254+
const useStep = createUseStep(ctx);
255+
256+
// Create step without closure variables
257+
const add = useStep('add');
258+
259+
// Call the step
260+
const result = await add(2, 3);
261+
262+
// Verify result
263+
expect(result).toBe(5);
264+
265+
// Verify empty closure variables were added to invocation queue
266+
expect(ctx.invocationsQueue).toHaveLength(1);
267+
expect(ctx.invocationsQueue[0]).toMatchObject({
268+
type: 'step',
269+
stepName: 'add',
270+
args: [2, 3],
271+
});
272+
});
202273
});

packages/core/src/step.ts

Lines changed: 14 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,26 +1,36 @@
11
import { FatalError, WorkflowRuntimeError } from '@workflow/errors';
22
import { withResolvers } from '@workflow/utils';
33
import { EventConsumerResult } from './events-consumer.js';
4-
import { WorkflowSuspension } from './global.js';
4+
import { type StepInvocationQueueItem, WorkflowSuspension } from './global.js';
55
import { stepLogger } from './logger.js';
66
import type { WorkflowOrchestratorContext } from './private.js';
77
import type { Serializable } from './schemas.js';
88
import { hydrateStepReturnValue } from './serialization.js';
99

1010
export function createUseStep(ctx: WorkflowOrchestratorContext) {
1111
return function useStep<Args extends Serializable[], Result>(
12-
stepName: string
12+
stepName: string,
13+
closureVarsFn?: () => Record<string, Serializable>
1314
) {
1415
const stepFunction = (...args: Args): Promise<Result> => {
1516
const { promise, resolve, reject } = withResolvers<Result>();
1617

1718
const correlationId = `step_${ctx.generateUlid()}`;
18-
ctx.invocationsQueue.push({
19+
20+
const queueItem: StepInvocationQueueItem = {
1921
type: 'step',
2022
correlationId,
2123
stepName,
2224
args,
23-
});
25+
};
26+
27+
// Invoke the closure variables function to get the closure scope
28+
const closureVars = closureVarsFn?.();
29+
if (closureVars) {
30+
queueItem.closureVars = closureVars;
31+
}
32+
33+
ctx.invocationsQueue.push(queueItem);
2434

2535
// Track whether we've already seen a "step_started" event for this step.
2636
// This is important because after a retryable failure, the step moves back to

packages/core/src/step/context-storage.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,4 +6,5 @@ export const contextStorage = /* @__PURE__ */ new AsyncLocalStorage<{
66
stepMetadata: StepMetadata;
77
workflowMetadata: WorkflowMetadata;
88
ops: Promise<void>[];
9+
closureVars?: Record<string, any>;
910
}>();
Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,18 @@
1+
import { contextStorage } from './context-storage.js';
2+
3+
/**
4+
* Returns the closure variables for the current step function.
5+
* This is an internal function used by the SWC transform to access
6+
* variables from the parent workflow scope.
7+
*
8+
* @internal
9+
*/
10+
export function __private_getClosureVars(): Record<string, any> {
11+
const ctx = contextStorage.getStore();
12+
if (!ctx) {
13+
throw new Error(
14+
'Closure variables can only be accessed inside a step function'
15+
);
16+
}
17+
return ctx.closureVars || {};
18+
}

0 commit comments

Comments
 (0)