-
Notifications
You must be signed in to change notification settings - Fork 127
Add support for closure scope vars in step functions #358
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
🦋 Changeset detectedLatest commit: 90022d1 The changes in this PR will be included in the next version bump. This PR includes changesets to release 16 packages
Not sure what this means? Click here to learn what changesets are. Click here if you're a maintainer who wants to add another changeset to this PR |
|
The latest updates on your projects. Learn more about Vercel for GitHub.
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Additional Suggestion:
The observability code checks if step.input is an array before hydrating it, but the PR changes new step inputs to be objects with args and closureVars properties, so they won't be hydrated for display.
View Details
📝 Patch Details
diff --git a/packages/core/src/observability.ts b/packages/core/src/observability.ts
index 9422e6f..814d1ac 100644
--- a/packages/core/src/observability.ts
+++ b/packages/core/src/observability.ts
@@ -48,18 +48,42 @@ const hydrateStepIO = <
>(
step: T
): T => {
+ // Handle both old format (array) and new format (object with args and closureVars)
+ let hydratedInput = step.input;
+
+ if (step.input) {
+ if (Array.isArray(step.input) && step.input.length) {
+ // Old format: input is an array of arguments
+ hydratedInput = hydrateStepArguments(
+ step.input,
+ [],
+ step.runId as string,
+ globalThis,
+ streamPrintRevivers
+ );
+ } else if (
+ typeof step.input === 'object' &&
+ 'args' in step.input &&
+ Array.isArray(step.input.args) &&
+ step.input.args.length
+ ) {
+ // New format: input is { args: [...], closureVars: {...} }
+ hydratedInput = {
+ ...step.input,
+ args: hydrateStepArguments(
+ step.input.args,
+ [],
+ step.runId as string,
+ globalThis,
+ streamPrintRevivers
+ ),
+ };
+ }
+ }
+
return {
...step,
- input:
- step.input && Array.isArray(step.input) && step.input.length
- ? hydrateStepArguments(
- step.input,
- [],
- step.runId as string,
- globalThis,
- streamPrintRevivers
- )
- : step.input,
+ input: hydratedInput,
output: step.output
? hydrateStepReturnValue(step.output, globalThis, streamPrintRevivers)
: step.output,
Analysis
Step input not hydrated for display when using closure variables
What fails: hydrateStepIO() in packages/core/src/observability.ts (line 54) skips hydration for new step inputs, causing closure variable-based steps to display in serialized form instead of human-readable format in logging and observability features.
How to reproduce:
- Create a workflow with step functions that use closure variables
- Call
hydrateResourceIO()with a retrieved step object that has the new input format - Observe that
step.inputis returned unhydrated (still in serialized form)
Expected vs actual behavior:
- Old format (array):
step.input = [1, 2]→Array.isArray(step.input)istrue→ hydrated ✓ - New format (object):
step.input = { args: [1, 2], closureVars: {...} }→Array.isArray(step.input)isfalse→ NOT hydrated ✗
Root cause: The runtime (packages/core/src/runtime.ts line 392-398) changed how step inputs are persisted when the closure variables feature was added. Steps are now stored with structure { args: [...], closureVars: {...} } instead of just [...]. The observability code at line 54 only checked for array format using Array.isArray(step.input), missing the new object format.
Fix: Updated hydrateStepIO() to handle both formats:
- Check if input is an array (old format) and hydrate directly
- Check if input is an object with
argsproperty (new format) and hydrate theargsfield while preservingclosureVars
VaguelySerious
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM, Vade commented before I could on backwards compatibility, the suggested patch looks good to me. Aside from that, see my comment on o11y
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Additional Suggestion:
The StepSchema zod definition still specifies input: z.array(z.any()) but the CreateStepRequest interface was changed to accept input: SerializedData (any). This schema-interface mismatch could cause validation issues.
View Details
📝 Patch Details
diff --git a/packages/world-vercel/src/steps.ts b/packages/world-vercel/src/steps.ts
index 8323344..1d3a449 100644
--- a/packages/world-vercel/src/steps.ts
+++ b/packages/world-vercel/src/steps.ts
@@ -41,7 +41,7 @@ const StepWireWithRefsSchema = StepWireSchema.omit({
// We discard the results of the refs, so we don't care about the type here
inputRef: z.any().optional(),
outputRef: z.any().optional(),
- input: z.array(z.any()).optional(),
+ input: z.any().optional(),
output: z.any().optional(),
});
diff --git a/packages/world/src/steps.ts b/packages/world/src/steps.ts
index 8c973f6..e37a6f7 100644
--- a/packages/world/src/steps.ts
+++ b/packages/world/src/steps.ts
@@ -22,7 +22,7 @@ export const StepSchema = z.object({
stepId: z.string(),
stepName: z.string(),
status: StepStatusSchema,
- input: z.array(z.any()),
+ input: z.any(),
output: z.any().optional(),
error: StructuredErrorSchema.optional(),
attempt: z.number(),
Analysis
StepSchema zod definition mismatch - input field changed from array to object
What fails: StepSchema.parse() in packages/world/src/steps.ts:25 rejects step input data when steps are created with the new closure scope format. The schema validates input: z.array(z.any()) but runtime sends input: { args, closureVars } (an object), causing validation errors like "Invalid input: expected array, received object" when steps are read from storage.
How to reproduce:
-
In
packages/core/src/runtime.ts, step creation now sends:const step = await world.steps.create(runId, { stepId: queueItem.correlationId, stepName: queueItem.stepName, input: dehydrateStepArguments({ args: queueItem.args, closureVars: queueItem.closureVars, // <- New format }, err.globalThis), });
-
This object format is stored via
packages/world-local/src/storage.tswhich callsreadJSON(stepPath, StepSchema) -
When reading the stored step back,
StepSchema.parse()is called on the JSON and fails with:Invalid input: expected array, received object
Result: Schema validation fails because input is an object { args: [...], closureVars: {...} } but schema expects array
Expected: Schema should accept both array (backward compatibility) and object (new closure scope format) since SerializedData is unknown
Root cause: In commit 8cca401 ("WIP closure scope vars in step functions"), the CreateStepRequest.input type was changed from SerializedData[] to SerializedData, but the StepSchema validation schema was not updated to match.
Fix applied:
- Changed
packages/world/src/steps.tsline 25:input: z.array(z.any())→input: z.any() - Changed
packages/world-vercel/src/steps.tsline 44:input: z.array(z.any()).optional()→input: z.any().optional()(inStepWireWithRefsSchema)
Both changes align the schemas with the actual runtime behavior and the CreateStepRequest interface type.
Merge activity
|
### What changed?
- Enhanced the SWC plugin to detect and collect closure variables from nested step functions
- Modified the workflow runtime to pass closure variables to step functions during execution
- Updated the serialization/deserialization logic to handle the new closure variable format
- Added a mechanism to access closure variables within step functions
- Added tests to verify closure variable functionality in nested step functions
- Closure variables are stored on the step function execution's `AsyncLocalStorage` context
### Example
```typescript
export async function myWorkflow(baseValue: number) {
'use workflow';
const multiplier = 3;
const prefix = 'Result: ';
const calculate = async () => {
'use step';
const result = baseValue * multiplier;
return `${prefix}${result}`;
};
return await calculate();
}
```
### Why make this change?
Previously, nested step functions couldn't access variables from their parent workflow scope, limiting their usefulness and requiring workarounds like passing all needed values as parameters. This change enables a more natural programming model where step functions can access variables from their enclosing scope, making workflow code more intuitive and reducing the need for explicit parameter passing.
What changed?
AsyncLocalStorage contextExample
Why make this change?
Previously, nested step functions couldn't access variables from their parent workflow scope, limiting their usefulness and requiring workarounds like passing all needed values as parameters. This change enables a more natural programming model where step functions can access variables from their enclosing scope, making workflow code more intuitive and reducing the need for explicit parameter passing.