Skip to content

Commit f8e5d10

Browse files
TooTallNateijjk
andauthored
Support passing step function references across serialization boundaries (#307)
--------- Co-authored-by: JJ Kasper <jj@jjsweb.site> Signed-off-by: Nathan Rajlich <n@n8.io>
1 parent d113251 commit f8e5d10

File tree

26 files changed

+535
-20
lines changed

26 files changed

+535
-20
lines changed

.changeset/tidy-states-see.md

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
---
2+
"@workflow/swc-plugin": patch
3+
"@workflow/core": patch
4+
---
5+
6+
Support serializing step function references

packages/core/e2e/e2e.test.ts

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -687,4 +687,36 @@ describe('e2e', () => {
687687
expect(run2Data.status).toBe('completed');
688688
}
689689
);
690+
691+
test(
692+
'stepFunctionPassingWorkflow - step function references can be passed as arguments',
693+
{ timeout: 60_000 },
694+
async () => {
695+
// This workflow passes a step function reference to another step
696+
// The receiving step calls the passed function and returns the result
697+
const run = await triggerWorkflow('stepFunctionPassingWorkflow', []);
698+
const returnValue = await getWorkflowReturnValue(run.runId);
699+
700+
// doubleNumber(10) = 20, then multiply by 2 = 40
701+
expect(returnValue).toBe(40);
702+
703+
// Verify the run completed successfully
704+
const { json: runData } = await cliInspectJson(
705+
`runs ${run.runId} --withData`
706+
);
707+
expect(runData.status).toBe('completed');
708+
expect(runData.output).toBe(40);
709+
710+
// Verify that exactly 2 steps were executed:
711+
// 1. stepWithStepFunctionArg(doubleNumber)
712+
// (doubleNumber(10) is run inside the stepWithStepFunctionArg step)
713+
const { json: eventsData } = await cliInspectJson(
714+
`events --run ${run.runId} --json`
715+
);
716+
const stepCompletedEvents = eventsData.filter(
717+
(event) => event.eventType === 'step_completed'
718+
);
719+
expect(stepCompletedEvents).toHaveLength(1);
720+
}
721+
);
690722
});

packages/core/package.json

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -57,7 +57,7 @@
5757
"@workflow/world-local": "workspace:*",
5858
"@workflow/world-vercel": "workspace:*",
5959
"debug": "4.4.3",
60-
"devalue": "5.4.1",
60+
"devalue": "5.5.0",
6161
"ms": "2.1.3",
6262
"nanoid": "5.1.6",
6363
"seedrandom": "3.0.5",

packages/core/src/runtime/world.ts

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,10 @@
11
import { createRequire } from 'node:module';
2-
import Path from 'node:path';
2+
import { join } from 'node:path';
33
import type { World } from '@workflow/world';
44
import { createEmbeddedWorld } from '@workflow/world-local';
55
import { createVercelWorld } from '@workflow/world-vercel';
66

7-
const require = createRequire(Path.join(process.cwd(), 'index.js'));
7+
const require = createRequire(join(process.cwd(), 'index.js'));
88

99
let worldCache: World | undefined;
1010
let stubbedWorldCache: World | undefined;

packages/core/src/schemas.ts

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -63,4 +63,5 @@ export type Serializable =
6363
| Uint8ClampedArray
6464
| Uint16Array
6565
| Uint32Array
66-
| WritableStream<Uint8Array>;
66+
| WritableStream<Uint8Array>
67+
| ((...args: Serializable[]) => Promise<Serializable>); // Step function

packages/core/src/serialization.test.ts

Lines changed: 131 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,15 +1,18 @@
11
import { runInContext } from 'node:vm';
22
import type { WorkflowRuntimeError } from '@workflow/errors';
33
import { describe, expect, it } from 'vitest';
4+
import { getStepFunction, registerStepFunction } from './private.js';
45
import {
56
dehydrateStepArguments,
67
dehydrateStepReturnValue,
78
dehydrateWorkflowArguments,
89
dehydrateWorkflowReturnValue,
10+
getCommonRevivers,
911
getStreamType,
12+
getWorkflowReducers,
1013
hydrateWorkflowArguments,
1114
} from './serialization.js';
12-
import { STREAM_NAME_SYMBOL } from './symbols.js';
15+
import { STEP_FUNCTION_NAME_SYMBOL, STREAM_NAME_SYMBOL } from './symbols.js';
1316
import { createContext } from './vm/index.js';
1417

1518
describe('getStreamType', () => {
@@ -783,3 +786,130 @@ describe('step return value', () => {
783786
);
784787
});
785788
});
789+
790+
describe('step function serialization', () => {
791+
const { globalThis: vmGlobalThis } = createContext({
792+
seed: 'test',
793+
fixedTimestamp: 1714857600000,
794+
});
795+
796+
it('should detect step function by checking for STEP_FUNCTION_NAME_SYMBOL', () => {
797+
const stepName = 'myStep';
798+
const stepFn = async (x: number) => x * 2;
799+
800+
// Attach the symbol like useStep() does
801+
Object.defineProperty(stepFn, STEP_FUNCTION_NAME_SYMBOL, {
802+
value: stepName,
803+
writable: false,
804+
enumerable: false,
805+
configurable: false,
806+
});
807+
808+
// Verify the symbol is attached correctly
809+
expect((stepFn as any)[STEP_FUNCTION_NAME_SYMBOL]).toBe(stepName);
810+
});
811+
812+
it('should not have STEP_FUNCTION_NAME_SYMBOL on regular functions', () => {
813+
const regularFn = async (x: number) => x * 2;
814+
815+
// Regular functions should not have the symbol
816+
expect((regularFn as any)[STEP_FUNCTION_NAME_SYMBOL]).toBeUndefined();
817+
});
818+
819+
it('should lookup registered step function by name', () => {
820+
const stepName = 'myRegisteredStep';
821+
const stepFn = async (x: number) => x * 2;
822+
823+
// Register the step function
824+
registerStepFunction(stepName, stepFn);
825+
826+
// Should be retrievable by name
827+
const retrieved = getStepFunction(stepName);
828+
expect(retrieved).toBe(stepFn);
829+
});
830+
831+
it('should return undefined for non-existent registered step function', () => {
832+
const retrieved = getStepFunction('nonExistentStep');
833+
expect(retrieved).toBeUndefined();
834+
});
835+
836+
it('should deserialize step function name through reviver', () => {
837+
const stepName = 'testStep';
838+
const stepFn = async () => 42;
839+
840+
// Register the step function
841+
registerStepFunction(stepName, stepFn);
842+
843+
// Get the reviver and test it directly
844+
const revivers = getCommonRevivers(vmGlobalThis);
845+
const result = revivers.StepFunction(stepName);
846+
847+
expect(result).toBe(stepFn);
848+
});
849+
850+
it('should throw error when reviver cannot find registered step function', () => {
851+
const revivers = getCommonRevivers(vmGlobalThis);
852+
853+
let err: Error | undefined;
854+
try {
855+
revivers.StepFunction('nonExistentStep');
856+
} catch (err_) {
857+
err = err_ as Error;
858+
}
859+
860+
expect(err).toBeDefined();
861+
expect(err?.message).toContain('Step function "nonExistentStep" not found');
862+
expect(err?.message).toContain('Make sure the step function is registered');
863+
});
864+
865+
it('should dehydrate step function passed as argument to a step', () => {
866+
const stepName = 'step//workflows/test.ts//myStep';
867+
const stepFn = async (x: number) => x * 2;
868+
869+
// Register the step function
870+
registerStepFunction(stepName, stepFn);
871+
872+
// Attach the symbol to the function (like the SWC compiler would)
873+
Object.defineProperty(stepFn, STEP_FUNCTION_NAME_SYMBOL, {
874+
value: stepName,
875+
writable: false,
876+
enumerable: false,
877+
configurable: false,
878+
});
879+
880+
// Simulate passing a step function as an argument within a workflow
881+
// When calling a step from within a workflow context
882+
const args = [stepFn, 42];
883+
884+
// This should serialize the step function by its name using the reducer
885+
const dehydrated = dehydrateStepArguments(args, globalThis);
886+
887+
// Verify it dehydrated successfully
888+
expect(dehydrated).toBeDefined();
889+
expect(Array.isArray(dehydrated)).toBe(true);
890+
// The dehydrated structure is the flattened format from devalue
891+
// It should contain the step function serialized as its name
892+
expect(dehydrated).toContain(stepName);
893+
expect(dehydrated).toContain(42);
894+
});
895+
896+
it('should serialize step function to name through reducer', () => {
897+
const stepName = 'step//workflows/test.ts//anotherStep';
898+
const stepFn = async () => 'result';
899+
900+
// Attach the symbol to the function (like the SWC compiler would)
901+
Object.defineProperty(stepFn, STEP_FUNCTION_NAME_SYMBOL, {
902+
value: stepName,
903+
writable: false,
904+
enumerable: false,
905+
configurable: false,
906+
});
907+
908+
// Get the reducer and verify it detects the step function
909+
const reducer = getWorkflowReducers(globalThis).StepFunction;
910+
const result = reducer(stepFn);
911+
912+
// Should return the step name
913+
expect(result).toBe(stepName);
914+
});
915+
});

packages/core/src/serialization.ts

Lines changed: 18 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,10 @@
11
import { WorkflowRuntimeError } from '@workflow/errors';
22
import * as devalue from 'devalue';
3+
import { getStepFunction } from './private.js';
34
import { getWorld } from './runtime/world.js';
45
import {
56
BODY_INIT_SYMBOL,
7+
STEP_FUNCTION_NAME_SYMBOL,
68
STREAM_NAME_SYMBOL,
79
STREAM_TYPE_SYMBOL,
810
WEBHOOK_RESPONSE_WRITABLE,
@@ -169,6 +171,7 @@ export interface SerializableSpecial {
169171
redirected: boolean;
170172
};
171173
Set: any[];
174+
StepFunction: string; // step function name/ID
172175
URL: string;
173176
URLSearchParams: string;
174177
Uint8Array: string; // base64 string
@@ -275,6 +278,11 @@ function getCommonReducers(global: Record<string, any> = globalThis) {
275278
};
276279
},
277280
Set: (value) => value instanceof global.Set && Array.from(value),
281+
StepFunction: (value) => {
282+
if (typeof value !== 'function') return false;
283+
const stepName = value[STEP_FUNCTION_NAME_SYMBOL];
284+
return typeof stepName === 'string' ? stepName : false;
285+
},
278286
URL: (value) => value instanceof global.URL && value.href,
279287
URLSearchParams: (value) => {
280288
if (!(value instanceof global.URLSearchParams)) return false;
@@ -464,7 +472,7 @@ function getStepReducers(
464472
};
465473
}
466474

467-
function getCommonRevivers(global: Record<string, any> = globalThis) {
475+
export function getCommonRevivers(global: Record<string, any> = globalThis) {
468476
function reviveArrayBuffer(value: string) {
469477
// Handle sentinel value for zero-length buffers
470478
const base64 = value === '.' ? '' : value;
@@ -516,6 +524,15 @@ function getCommonRevivers(global: Record<string, any> = globalThis) {
516524
Map: (value) => new global.Map(value),
517525
RegExp: (value) => new global.RegExp(value.source, value.flags),
518526
Set: (value) => new global.Set(value),
527+
StepFunction: (value) => {
528+
const stepFn = getStepFunction(value);
529+
if (!stepFn) {
530+
throw new Error(
531+
`Step function "${value}" not found. Make sure the step function is registered.`
532+
);
533+
}
534+
return stepFn;
535+
},
519536
URL: (value) => new global.URL(value),
520537
URLSearchParams: (value) =>
521538
new global.URLSearchParams(value === '.' ? '' : value),

packages/core/src/symbols.ts

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,3 +9,6 @@ export const BODY_INIT_SYMBOL = Symbol.for('BODY_INIT');
99
export const WEBHOOK_RESPONSE_WRITABLE = Symbol.for(
1010
'WEBHOOK_RESPONSE_WRITABLE'
1111
);
12+
export const STEP_FUNCTION_NAME_SYMBOL = Symbol.for(
13+
'WORKFLOW_STEP_FUNCTION_NAME'
14+
);

packages/nuxt/src/module.ts

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
import { defineNuxtModule } from '@nuxt/kit';
2+
import type { NuxtModule } from '@nuxt/schema';
23
import type { ModuleOptions as NitroModuleOptions } from '@workflow/nitro';
34

45
// Module options TypeScript interface definition
@@ -30,4 +31,4 @@ export default defineNuxtModule<ModuleOptions>({
3031
nuxt.options.nitro.modules.push('@workflow/nitro');
3132
}
3233
},
33-
});
34+
}) satisfies NuxtModule<ModuleOptions>;

packages/nuxt/tsconfig.json

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,8 @@
11
{
2-
"extends": "./.nuxt/tsconfig.json",
3-
"exclude": ["dist", "node_modules"]
2+
"extends": "@workflow/tsconfig/base.json",
3+
"exclude": ["dist", "node_modules"],
4+
"compilerOptions": {
5+
"declaration": false,
6+
"declarationMap": false
7+
}
48
}

0 commit comments

Comments
 (0)