From 9031c025d34d6ad9c974e09f9b5599c027f4140b Mon Sep 17 00:00:00 2001 From: Peter Wielander Date: Wed, 5 Nov 2025 14:02:34 -0800 Subject: [PATCH 01/25] Core: require specifying runId when writing to stream --- packages/core/src/runtime.ts | 25 +++- packages/core/src/serialization.ts | 135 +++++++++++++----- packages/core/src/step/writable-stream.ts | 14 +- .../src/api/workflow-server-actions.ts | 4 +- packages/world-local/src/streamer.test.ts | 96 +++++++------ packages/world-local/src/streamer.ts | 10 +- packages/world-postgres/src/streamer.ts | 9 +- packages/world-vercel/src/streamer.ts | 10 +- packages/world/src/interfaces.ts | 9 +- 9 files changed, 211 insertions(+), 101 deletions(-) diff --git a/packages/core/src/runtime.ts b/packages/core/src/runtime.ts index 66a5b5616..1d344c6ef 100644 --- a/packages/core/src/runtime.ts +++ b/packages/core/src/runtime.ts @@ -177,7 +177,7 @@ export class Run { ): ReadableStream { const { ops = [], global = globalThis, startIndex, namespace } = options; const name = getWorkflowRunStreamId(this.runId, namespace); - return getExternalRevivers(global, ops).ReadableStream({ + return getExternalRevivers(global, ops, this.runId).ReadableStream({ name, startIndex, }) as ReadableStream; @@ -194,7 +194,13 @@ export class Run { const run = await this.world.runs.get(this.runId); if (run.status === 'completed') { - return hydrateWorkflowReturnValue(run.output, [], globalThis); + return hydrateWorkflowReturnValue( + run.output, + [], + globalThis, + {}, + this.runId + ); } if (run.status === 'cancelled') { @@ -652,7 +658,13 @@ export const stepEntrypoint = } // Hydrate the step input arguments const ops: Promise[] = []; - const args = hydrateStepArguments(step.input, ops); + const args = hydrateStepArguments( + step.input, + ops, + globalThis, + {}, + workflowRunId + ); span?.setAttributes({ ...Attribute.StepArgumentsCount(args.length), @@ -679,7 +691,12 @@ export const stepEntrypoint = () => stepFn(...args) ); - result = dehydrateStepReturnValue(result, ops); + result = dehydrateStepReturnValue( + result, + ops, + globalThis, + workflowRunId + ); waitUntil(Promise.all(ops)); diff --git a/packages/core/src/serialization.ts b/packages/core/src/serialization.ts index c67bcac7c..08fbfcea5 100644 --- a/packages/core/src/serialization.ts +++ b/packages/core/src/serialization.ts @@ -80,7 +80,10 @@ export function getDeserializeStream( export class WorkflowServerReadableStream extends ReadableStream { #reader?: ReadableStreamDefaultReader; - constructor(name: string, startIndex?: number) { + constructor(runId: string, name: string, startIndex?: number) { + if (typeof runId !== 'string' || runId.length === 0) { + throw new Error(`"runId" is required, got "${runId}"`); + } if (typeof name !== 'string' || name.length === 0) { throw new Error(`"name" is required, got "${name}"`); } @@ -92,7 +95,7 @@ export class WorkflowServerReadableStream extends ReadableStream { let reader = this.#reader; if (!reader) { const world = getWorld(); - const stream = await world.readFromStream(name, startIndex); + const stream = await world.readFromStream(runId, name, startIndex); reader = this.#reader = stream.getReader(); } if (!reader) { @@ -113,17 +116,20 @@ export class WorkflowServerReadableStream extends ReadableStream { } export class WorkflowServerWritableStream extends WritableStream { - constructor(name: string) { + constructor(runId: string, name: string) { + if (typeof runId !== 'string' || runId.length === 0) { + throw new Error(`"runId" is required, got "${runId}"`); + } if (typeof name !== 'string' || name.length === 0) { throw new Error(`"name" is required, got "${name}"`); } const world = getWorld(); super({ async write(chunk) { - await world.writeToStream(name, chunk); + await world.writeToStream(runId, name, chunk); }, async close() { - await world.closeStream(name); + await world.closeStream(runId, name); }, }); } @@ -301,11 +307,13 @@ function getCommonReducers(global: Record = globalThis) { * * @param global * @param ops + * @param runId * @returns */ export function getExternalReducers( global: Record = globalThis, - ops: Promise[] + ops: Promise[], + runId?: string ): Reducers { return { ...getCommonReducers(global), @@ -321,13 +329,17 @@ export function getExternalReducers( const name = global.crypto.randomUUID(); const type = getStreamType(value); - const writable = new WorkflowServerWritableStream(name); + // Use a placeholder runId if not available (e.g., when initially starting workflow) + const streamRunId = runId || 'pending'; + const writable = new WorkflowServerWritableStream(streamRunId, name); if (type === 'bytes') { ops.push(value.pipeTo(writable)); } else { ops.push( value - .pipeThrough(getSerializeStream(getExternalReducers(global, ops))) + .pipeThrough( + getSerializeStream(getExternalReducers(global, ops, runId)) + ) .pipeTo(writable) ); } @@ -340,9 +352,13 @@ export function getExternalReducers( WritableStream: (value) => { if (!(value instanceof global.WritableStream)) return false; const name = global.crypto.randomUUID(); + // Use a placeholder runId if not available + const streamRunId = runId || 'pending'; ops.push( - new WorkflowServerReadableStream(name) - .pipeThrough(getDeserializeStream(getExternalRevivers(global, ops))) + new WorkflowServerReadableStream(streamRunId, name) + .pipeThrough( + getDeserializeStream(getExternalRevivers(global, ops, runId)) + ) .pipeTo(value) ); return { name }; @@ -402,11 +418,13 @@ export function getWorkflowReducers( * * @param global * @param ops + * @param runId * @returns */ function getStepReducers( global: Record = globalThis, - ops: Promise[] + ops: Promise[], + runId?: string ): Reducers { return { ...getCommonReducers(global), @@ -429,13 +447,17 @@ function getStepReducers( name = global.crypto.randomUUID(); type = getStreamType(value); - const writable = new WorkflowServerWritableStream(name); + // Use a placeholder runId if not available + const streamRunId = runId || 'pending'; + const writable = new WorkflowServerWritableStream(streamRunId, name); if (type === 'bytes') { ops.push(value.pipeTo(writable)); } else { ops.push( value - .pipeThrough(getSerializeStream(getStepReducers(global, ops))) + .pipeThrough( + getSerializeStream(getStepReducers(global, ops, runId)) + ) .pipeTo(writable) ); } @@ -452,9 +474,13 @@ function getStepReducers( let name = value[STREAM_NAME_SYMBOL]; if (!name) { name = global.crypto.randomUUID(); + // Use a placeholder runId if not available + const streamRunId = runId || 'pending'; ops.push( - new WorkflowServerReadableStream(name) - .pipeThrough(getDeserializeStream(getStepRevivers(global, ops))) + new WorkflowServerReadableStream(streamRunId, name) + .pipeThrough( + getDeserializeStream(getStepRevivers(global, ops, runId)) + ) .pipeTo(value) ); } @@ -544,10 +570,12 @@ function getCommonRevivers(global: Record = globalThis) { * * @param global * @param ops + * @param runId */ export function getExternalRevivers( global: Record = globalThis, - ops: Promise[] + ops: Promise[], + runId?: string ): Revivers { return { ...getCommonRevivers(global), @@ -579,7 +607,10 @@ export function getExternalRevivers( return response.body; } + // Use a placeholder runId if not available + const streamRunId = runId || 'pending'; const readable = new WorkflowServerReadableStream( + streamRunId, value.name, value.startIndex ); @@ -587,16 +618,22 @@ export function getExternalRevivers( return readable; } else { const transform = getDeserializeStream( - getExternalRevivers(global, ops) + getExternalRevivers(global, ops, runId) ); ops.push(readable.pipeTo(transform.writable)); return transform.readable; } }, WritableStream: (value) => { - const serialize = getSerializeStream(getExternalReducers(global, ops)); + const serialize = getSerializeStream( + getExternalReducers(global, ops, runId) + ); + // Use a placeholder runId if not available + const streamRunId = runId || 'pending'; ops.push( - serialize.readable.pipeTo(new WorkflowServerWritableStream(value.name)) + serialize.readable.pipeTo( + new WorkflowServerWritableStream(streamRunId, value.name) + ) ); return serialize.writable; }, @@ -675,11 +712,13 @@ export function getWorkflowRevivers( * * @param global * @param ops + * @param runId * @returns */ function getStepRevivers( global: Record = globalThis, - ops: Promise[] + ops: Promise[], + runId?: string ): Revivers { return { ...getCommonRevivers(global), @@ -720,19 +759,30 @@ function getStepRevivers( return response.body; } - const readable = new WorkflowServerReadableStream(value.name); + // Use a placeholder runId if not available + const streamRunId = runId || 'pending'; + const readable = new WorkflowServerReadableStream( + streamRunId, + value.name + ); if (value.type === 'bytes') { return readable; } else { - const transform = getDeserializeStream(getStepRevivers(global, ops)); + const transform = getDeserializeStream( + getStepRevivers(global, ops, runId) + ); ops.push(readable.pipeTo(transform.writable)); return transform.readable; } }, WritableStream: (value) => { - const serialize = getSerializeStream(getStepReducers(global, ops)); + const serialize = getSerializeStream(getStepReducers(global, ops, runId)); + // Use a placeholder runId if not available + const streamRunId = runId || 'pending'; ops.push( - serialize.readable.pipeTo(new WorkflowServerWritableStream(value.name)) + serialize.readable.pipeTo( + new WorkflowServerWritableStream(streamRunId, value.name) + ) ); return serialize.writable; }, @@ -746,15 +796,20 @@ function getStepRevivers( * * @param value * @param global + * @param runId * @returns The dehydrated value, ready to be inserted into the database */ export function dehydrateWorkflowArguments( value: unknown, ops: Promise[], - global: Record = globalThis + global: Record = globalThis, + runId?: string ) { try { - const str = devalue.stringify(value, getExternalReducers(global, ops)); + const str = devalue.stringify( + value, + getExternalReducers(global, ops, runId) + ); return revive(str); } catch (error) { throw new WorkflowRuntimeError( @@ -814,17 +869,21 @@ export function dehydrateWorkflowReturnValue( * return value of a completed workflow run. * * @param value + * @param ops * @param global + * @param extraRevivers + * @param runId * @returns The hydrated return value, ready to be consumed by the client */ export function hydrateWorkflowReturnValue( value: Parameters[0], ops: Promise[], global: Record = globalThis, - extraRevivers: Record any> = {} + extraRevivers: Record any> = {}, + runId?: string ) { const obj = devalue.unflatten(value, { - ...getExternalRevivers(global, ops), + ...getExternalRevivers(global, ops, runId), ...extraRevivers, }); return obj; @@ -859,17 +918,21 @@ export function dehydrateStepArguments( * from the database at the start of the step execution. * * @param value + * @param ops * @param global + * @param extraRevivers + * @param runId * @returns The hydrated value, ready to be consumed by the step user-code function */ export function hydrateStepArguments( value: Parameters[0], ops: Promise[], global: Record = globalThis, - extraRevivers: Record any> = {} + extraRevivers: Record any> = {}, + runId?: string ) { const obj = devalue.unflatten(value, { - ...getStepRevivers(global, ops), + ...getStepRevivers(global, ops, runId), ...extraRevivers, }); return obj; @@ -881,16 +944,19 @@ export function hydrateStepArguments( * into a format that can be saved to the database. * * @param value + * @param ops * @param global + * @param runId * @returns The dehydrated value, ready to be inserted into the database */ export function dehydrateStepReturnValue( value: unknown, ops: Promise[], - global: Record = globalThis + global: Record = globalThis, + runId?: string ) { try { - const str = devalue.stringify(value, getStepReducers(global, ops)); + const str = devalue.stringify(value, getStepReducers(global, ops, runId)); return revive(str); } catch (error) { throw new WorkflowRuntimeError( @@ -906,12 +972,15 @@ export function dehydrateStepReturnValue( * * @param value * @param global + * @param extraRevivers + * @param runId * @returns The hydrated return value of a step, ready to be consumed by the workflow handler */ export function hydrateStepReturnValue( value: Parameters[0], global: Record = globalThis, - extraRevivers: Record any> = {} + extraRevivers: Record any> = {}, + _runId?: string ) { const obj = devalue.unflatten(value, { ...getWorkflowRevivers(global), diff --git a/packages/core/src/step/writable-stream.ts b/packages/core/src/step/writable-stream.ts index c51e0c0dc..2ff071a15 100644 --- a/packages/core/src/step/writable-stream.ts +++ b/packages/core/src/step/writable-stream.ts @@ -1,7 +1,7 @@ import { - WorkflowServerWritableStream, - getSerializeStream, getExternalReducers, + getSerializeStream, + WorkflowServerWritableStream, } from '../serialization.js'; import { getWorkflowRunStreamId } from '../util.js'; import type { WorkflowWritableStreamOptions } from '../writable-stream.js'; @@ -30,20 +30,18 @@ export function getWritable( } const { namespace } = options; - const name = getWorkflowRunStreamId( - ctx.workflowMetadata.workflowRunId, - namespace - ); + const runId = ctx.workflowMetadata.workflowRunId; + const name = getWorkflowRunStreamId(runId, namespace); // Create a transform stream that serializes chunks and pipes to the workflow server const serialize = getSerializeStream( - getExternalReducers(globalThis, ctx.ops) + getExternalReducers(globalThis, ctx.ops, runId) ); // Pipe the serialized data to the workflow server stream // Register this async operation with the runtime's ops array so it's awaited via waitUntil ctx.ops.push( - serialize.readable.pipeTo(new WorkflowServerWritableStream(name)) + serialize.readable.pipeTo(new WorkflowServerWritableStream(runId, name)) ); // Return the writable side of the transform stream diff --git a/packages/web-shared/src/api/workflow-server-actions.ts b/packages/web-shared/src/api/workflow-server-actions.ts index 4d22987f5..6604d03d7 100644 --- a/packages/web-shared/src/api/workflow-server-actions.ts +++ b/packages/web-shared/src/api/workflow-server-actions.ts @@ -444,7 +444,9 @@ export async function readStreamServerAction( ): Promise>> { try { const world = getWorldFromEnv(env); - const stream = await world.readFromStream(streamId, startIndex); + // Extract runId from streamId (strm_{ULID}_user... -> wrun_{ULID}) + const runId = streamId.replace(/^strm_([^_]+)_user.*$/, 'wrun_$1'); + const stream = await world.readFromStream(runId, streamId, startIndex); return createResponse(stream); } catch (error) { console.error('Failed to read stream:', error); diff --git a/packages/world-local/src/streamer.test.ts b/packages/world-local/src/streamer.test.ts index a5bbf82f4..4e53781de 100644 --- a/packages/world-local/src/streamer.test.ts +++ b/packages/world-local/src/streamer.test.ts @@ -129,10 +129,11 @@ describe('streamer', () => { describe('writeToStream', () => { it('should write string chunks to a stream', async () => { const { testDir, streamer } = await setupStreamer(); + const runId = 'wrun_test123'; const streamName = 'test-stream'; - await streamer.writeToStream(streamName, 'hello'); - await streamer.writeToStream(streamName, ' world'); + await streamer.writeToStream(runId, streamName, 'hello'); + await streamer.writeToStream(runId, streamName, ' world'); // Verify chunks directory was created const chunksDir = path.join(testDir, 'streams', 'chunks'); @@ -145,12 +146,13 @@ describe('streamer', () => { it('should write Buffer chunks to a stream', async () => { const { testDir, streamer } = await setupStreamer(); + const runId = 'wrun_test123'; const streamName = 'buffer-stream'; const buffer1 = Buffer.from('chunk1'); const buffer2 = Buffer.from('chunk2'); - await streamer.writeToStream(streamName, buffer1); - await streamer.writeToStream(streamName, buffer2); + await streamer.writeToStream(runId, streamName, buffer1); + await streamer.writeToStream(runId, streamName, buffer2); const chunksDir = path.join(testDir, 'streams', 'chunks'); const files = await fs.readdir(chunksDir); @@ -161,10 +163,11 @@ describe('streamer', () => { it('should write Uint8Array chunks to a stream', async () => { const { testDir, streamer } = await setupStreamer(); + const runId = 'wrun_test123'; const streamName = 'uint8-stream'; const uint8Array = new Uint8Array([1, 2, 3, 4]); - await streamer.writeToStream(streamName, uint8Array); + await streamer.writeToStream(runId, streamName, uint8Array); const chunksDir = path.join(testDir, 'streams', 'chunks'); const files = await fs.readdir(chunksDir); @@ -175,10 +178,11 @@ describe('streamer', () => { it('should handle multiple streams independently', async () => { const { testDir, streamer } = await setupStreamer(); + const runId = 'wrun_test123'; - await streamer.writeToStream('stream1', 'data1'); - await streamer.writeToStream('stream2', 'data2'); - await streamer.writeToStream('stream1', 'data3'); + await streamer.writeToStream(runId, 'stream1', 'data1'); + await streamer.writeToStream(runId, 'stream2', 'data2'); + await streamer.writeToStream(runId, 'stream1', 'data3'); const chunksDir = path.join(testDir, 'streams', 'chunks'); const files = await fs.readdir(chunksDir); @@ -194,10 +198,10 @@ describe('streamer', () => { describe('closeStream', () => { it('should close an empty stream', async () => { const { testDir, streamer } = await setupStreamer(); - + const runId = 'wrun_test123'; const streamName = 'empty-stream'; - await streamer.closeStream(streamName); + await streamer.closeStream(runId, streamName); const chunksDir = path.join(testDir, 'streams', 'chunks'); const files = await fs.readdir(chunksDir); @@ -208,12 +212,12 @@ describe('streamer', () => { it('should close a stream with existing chunks', async () => { const { testDir, streamer } = await setupStreamer(); - + const runId = 'wrun_test123'; const streamName = 'existing-stream'; - await streamer.writeToStream(streamName, 'chunk1'); - await streamer.writeToStream(streamName, 'chunk2'); - await streamer.closeStream(streamName); + await streamer.writeToStream(runId, streamName, 'chunk1'); + await streamer.writeToStream(runId, streamName, 'chunk2'); + await streamer.closeStream(runId, streamName); const chunksDir = path.join(testDir, 'streams', 'chunks'); const files = await fs.readdir(chunksDir); @@ -225,18 +229,18 @@ describe('streamer', () => { describe('readFromStream', () => { it('should read chunks from a completed stream', async () => { const { streamer } = await setupStreamer(); - + const runId = 'wrun_test123'; const streamName = 'read-stream'; const chunk1 = 'hello '; const chunk2 = 'world'; - await streamer.writeToStream(streamName, chunk1); + await streamer.writeToStream(runId, streamName, chunk1); // Add a small delay to ensure different ULID timestamps await new Promise((resolve) => setTimeout(resolve, 2)); - await streamer.writeToStream(streamName, chunk2); - await streamer.closeStream(streamName); + await streamer.writeToStream(runId, streamName, chunk2); + await streamer.closeStream(runId, streamName); - const stream = await streamer.readFromStream(streamName); + const stream = await streamer.readFromStream(runId, streamName); const reader = stream.getReader(); const chunks: Uint8Array[] = []; @@ -256,18 +260,18 @@ describe('streamer', () => { it('should read binary data correctly', async () => { const { streamer } = await setupStreamer(); - + const runId = 'wrun_test123'; const streamName = 'binary-stream'; const binaryData1 = new Uint8Array([1, 2, 3]); const binaryData2 = new Uint8Array([4, 5, 6]); - await streamer.writeToStream(streamName, binaryData1); + await streamer.writeToStream(runId, streamName, binaryData1); // Add delay to ensure different ULID timestamps await new Promise((resolve) => setTimeout(resolve, 2)); - await streamer.writeToStream(streamName, binaryData2); - await streamer.closeStream(streamName); + await streamer.writeToStream(runId, streamName, binaryData2); + await streamer.closeStream(runId, streamName); - const stream = await streamer.readFromStream(streamName); + const stream = await streamer.readFromStream(runId, streamName); const reader = stream.getReader(); const chunks: Uint8Array[] = []; @@ -295,18 +299,18 @@ describe('streamer', () => { it('should preserve chunk order based on ULID timestamps', async () => { const { streamer } = await setupStreamer(); - + const runId = 'wrun_test123'; const streamName = 'ordered-stream'; // Write chunks with small delays to ensure different ULID timestamps - await streamer.writeToStream(streamName, '1'); + await streamer.writeToStream(runId, streamName, '1'); await new Promise((resolve) => setTimeout(resolve, 2)); - await streamer.writeToStream(streamName, '2'); + await streamer.writeToStream(runId, streamName, '2'); await new Promise((resolve) => setTimeout(resolve, 2)); - await streamer.writeToStream(streamName, '3'); - await streamer.closeStream(streamName); + await streamer.writeToStream(runId, streamName, '3'); + await streamer.closeStream(runId, streamName); - const stream = await streamer.readFromStream(streamName); + const stream = await streamer.readFromStream(runId, streamName); const reader = stream.getReader(); const chunks: string[] = []; @@ -327,21 +331,21 @@ describe('streamer', () => { describe('integration scenarios', () => { it('should handle complete write-close-read cycle', async () => { const { streamer } = await setupStreamer(); - + const runId = 'wrun_test123'; const streamName = 'integration-stream'; // Write chunks with proper timing - await streamer.writeToStream(streamName, 'start '); + await streamer.writeToStream(runId, streamName, 'start '); await new Promise((resolve) => setTimeout(resolve, 2)); - await streamer.writeToStream(streamName, 'middle '); + await streamer.writeToStream(runId, streamName, 'middle '); await new Promise((resolve) => setTimeout(resolve, 2)); - await streamer.writeToStream(streamName, 'end'); + await streamer.writeToStream(runId, streamName, 'end'); // Close the stream - await streamer.closeStream(streamName); + await streamer.closeStream(runId, streamName); // Read complete stream - const completeStream = await streamer.readFromStream(streamName); + const completeStream = await streamer.readFromStream(runId, streamName); const completeReader = completeStream.getReader(); const completeChunks: Uint8Array[] = []; let completeDone = false; @@ -362,25 +366,26 @@ describe('streamer', () => { // Run multiple iterations to increase probability of catching race conditions for (let iteration = 0; iteration < 10; iteration++) { const { streamer } = await setupStreamer(); + const runId = 'wrun_test123'; const streamName = `race-${iteration}`; // Write a few chunks to disk first - await streamer.writeToStream(streamName, '0\n'); - await streamer.writeToStream(streamName, '1\n'); + await streamer.writeToStream(runId, streamName, '0\n'); + await streamer.writeToStream(runId, streamName, '1\n'); // Start writing chunks in background IMMEDIATELY before reading const writeTask = (async () => { for (let i = 2; i < 10; i++) { - await streamer.writeToStream(streamName, `${i}\n`); + await streamer.writeToStream(runId, streamName, `${i}\n`); // No delay - fire them off as fast as possible to hit the race window } - await streamer.closeStream(streamName); + await streamer.closeStream(runId, streamName); })(); // Start reading - this triggers start() which should set up listeners // BEFORE listing files to avoid missing chunks, and track delivered // chunk IDs to avoid duplicates - const stream = await streamer.readFromStream(streamName); + const stream = await streamer.readFromStream(runId, streamName); const reader = stream.getReader(); const chunks: string[] = []; @@ -426,16 +431,17 @@ describe('streamer', () => { it('should maintain chronological order when chunks arrive during disk reading', async () => { const { streamer } = await setupStreamer(); + const runId = 'wrun_test123'; const streamName = 'ordering-test'; // Write chunks 0-4 to disk for (let i = 0; i < 5; i++) { - await streamer.writeToStream(streamName, `${i}\n`); + await streamer.writeToStream(runId, streamName, `${i}\n`); await new Promise((resolve) => setTimeout(resolve, 2)); } // Start reading - const stream = await streamer.readFromStream(streamName); + const stream = await streamer.readFromStream(runId, streamName); const reader = stream.getReader(); const chunks: string[] = []; @@ -452,10 +458,10 @@ describe('streamer', () => { // Immediately write more chunks (5-9) while disk reading might be in progress for (let i = 5; i < 10; i++) { - await streamer.writeToStream(streamName, `${i}\n`); + await streamer.writeToStream(runId, streamName, `${i}\n`); } - await streamer.closeStream(streamName); + await streamer.closeStream(runId, streamName); await readPromise; // Verify chunks are in exact chronological order (not just all present) diff --git a/packages/world-local/src/streamer.ts b/packages/world-local/src/streamer.ts index df234d553..c69ca07b8 100644 --- a/packages/world-local/src/streamer.ts +++ b/packages/world-local/src/streamer.ts @@ -48,7 +48,11 @@ export function createStreamer(basedir: string): Streamer { }>(); return { - async writeToStream(name, chunk) { + async writeToStream( + _runId: string, + name: string, + chunk: string | Uint8Array + ) { const chunkId = `strm_${monotonicUlid()}`; if (typeof chunk === 'string') { @@ -83,7 +87,7 @@ export function createStreamer(basedir: string): Streamer { }); }, - async closeStream(name) { + async closeStream(_runId: string, name: string) { const chunkId = `strm_${monotonicUlid()}`; const chunkPath = path.join( basedir, @@ -100,7 +104,7 @@ export function createStreamer(basedir: string): Streamer { streamEmitter.emit(`close:${name}` as const, { streamName: name }); }, - async readFromStream(name, startIndex = 0) { + async readFromStream(_runId: string, name: string, startIndex = 0) { const chunksDir = path.join(basedir, 'streams', 'chunks'); let removeListeners = () => {}; diff --git a/packages/world-postgres/src/streamer.ts b/packages/world-postgres/src/streamer.ts index 36696f4f2..62bd89d72 100644 --- a/packages/world-postgres/src/streamer.ts +++ b/packages/world-postgres/src/streamer.ts @@ -88,7 +88,11 @@ export function createStreamer(postgres: Sql, drizzle: Drizzle): Streamer { }); return { - async writeToStream(name, chunk) { + async writeToStream( + _runId: string, + name: string, + chunk: string | Uint8Array + ) { const chunkId = genChunkId(); await drizzle.insert(streams).values({ chunkId, @@ -106,7 +110,7 @@ export function createStreamer(postgres: Sql, drizzle: Drizzle): Streamer { ) ); }, - async closeStream(name: string): Promise { + async closeStream(_runId: string, name: string): Promise { const chunkId = genChunkId(); await drizzle.insert(streams).values({ chunkId, @@ -125,6 +129,7 @@ export function createStreamer(postgres: Sql, drizzle: Drizzle): Streamer { ); }, async readFromStream( + _runId: string, name: string, startIndex?: number ): Promise> { diff --git a/packages/world-vercel/src/streamer.ts b/packages/world-vercel/src/streamer.ts index eae1fac89..9b73a146a 100644 --- a/packages/world-vercel/src/streamer.ts +++ b/packages/world-vercel/src/streamer.ts @@ -7,7 +7,11 @@ function getStreamUrl(name: string, httpConfig: HttpConfig) { export function createStreamer(config?: APIConfig): Streamer { return { - async writeToStream(name, chunk) { + async writeToStream( + _runId: string, + name: string, + chunk: string | Uint8Array + ) { const httpConfig = await getHttpConfig(config); await fetch(getStreamUrl(name, httpConfig), { method: 'PUT', @@ -17,7 +21,7 @@ export function createStreamer(config?: APIConfig): Streamer { }); }, - async closeStream(name) { + async closeStream(_runId: string, name: string) { const httpConfig = await getHttpConfig(config); httpConfig.headers.set('X-Stream-Done', 'true'); await fetch(getStreamUrl(name, httpConfig), { @@ -26,7 +30,7 @@ export function createStreamer(config?: APIConfig): Streamer { }); }, - async readFromStream(name, startIndex) { + async readFromStream(_runId: string, name: string, startIndex?: number) { const httpConfig = await getHttpConfig(config); const url = getStreamUrl(name, httpConfig); if (typeof startIndex === 'number') { diff --git a/packages/world/src/interfaces.ts b/packages/world/src/interfaces.ts index 29f1827c7..2d6ab8756 100644 --- a/packages/world/src/interfaces.ts +++ b/packages/world/src/interfaces.ts @@ -32,9 +32,14 @@ import type { } from './steps.js'; export interface Streamer { - writeToStream(name: string, chunk: string | Uint8Array): Promise; - closeStream(name: string): Promise; + writeToStream( + runId: string, + name: string, + chunk: string | Uint8Array + ): Promise; + closeStream(runId: string, name: string): Promise; readFromStream( + runId: string, name: string, startIndex?: number ): Promise>; From 69ae7176e9c6f1194ef134fe22e16feb043be76b Mon Sep 17 00:00:00 2001 From: Peter Wielander Date: Wed, 5 Nov 2025 14:03:10 -0800 Subject: [PATCH 02/25] Changeset --- .changeset/purple-pianos-stare.md | 10 ++++++++++ 1 file changed, 10 insertions(+) create mode 100644 .changeset/purple-pianos-stare.md diff --git a/.changeset/purple-pianos-stare.md b/.changeset/purple-pianos-stare.md new file mode 100644 index 000000000..6a6a899c9 --- /dev/null +++ b/.changeset/purple-pianos-stare.md @@ -0,0 +1,10 @@ +--- +"@workflow/world-postgres": patch +"@workflow/world-vercel": patch +"@workflow/world-local": patch +"@workflow/web-shared": patch +"@workflow/world": patch +"@workflow/core": patch +--- + +Require specifying runId when writing to stream From f9be1b614273df6b16ed6643c856d2300771a624 Mon Sep 17 00:00:00 2001 From: Peter Wielander Date: Wed, 5 Nov 2025 14:12:13 -0800 Subject: [PATCH 03/25] Fix streamer URL --- packages/world-vercel/src/streamer.ts | 18 ++++++++++-------- 1 file changed, 10 insertions(+), 8 deletions(-) diff --git a/packages/world-vercel/src/streamer.ts b/packages/world-vercel/src/streamer.ts index 9b73a146a..ab1d31159 100644 --- a/packages/world-vercel/src/streamer.ts +++ b/packages/world-vercel/src/streamer.ts @@ -1,19 +1,21 @@ import type { Streamer } from '@workflow/world'; import { type APIConfig, getHttpConfig, type HttpConfig } from './utils.js'; -function getStreamUrl(name: string, httpConfig: HttpConfig) { - return new URL(`${httpConfig.baseUrl}/v1/stream/${encodeURIComponent(name)}`); +function getStreamUrl(runId: string, name: string, httpConfig: HttpConfig) { + return new URL( + `${httpConfig.baseUrl}/v1/run/${runId}/stream/${encodeURIComponent(name)}` + ); } export function createStreamer(config?: APIConfig): Streamer { return { async writeToStream( - _runId: string, + runId: string, name: string, chunk: string | Uint8Array ) { const httpConfig = await getHttpConfig(config); - await fetch(getStreamUrl(name, httpConfig), { + await fetch(getStreamUrl(runId, name, httpConfig), { method: 'PUT', body: chunk, headers: httpConfig.headers, @@ -21,18 +23,18 @@ export function createStreamer(config?: APIConfig): Streamer { }); }, - async closeStream(_runId: string, name: string) { + async closeStream(runId: string, name: string) { const httpConfig = await getHttpConfig(config); httpConfig.headers.set('X-Stream-Done', 'true'); - await fetch(getStreamUrl(name, httpConfig), { + await fetch(getStreamUrl(runId, name, httpConfig), { method: 'PUT', headers: httpConfig.headers, }); }, - async readFromStream(_runId: string, name: string, startIndex?: number) { + async readFromStream(runId: string, name: string, startIndex?: number) { const httpConfig = await getHttpConfig(config); - const url = getStreamUrl(name, httpConfig); + const url = getStreamUrl(runId, name, httpConfig); if (typeof startIndex === 'number') { url.searchParams.set('startIndex', String(startIndex)); } From 69ba8e6de67166835cc7b0e9e41ae6a2b18e2b23 Mon Sep 17 00:00:00 2001 From: Peter Wielander Date: Wed, 5 Nov 2025 14:43:30 -0800 Subject: [PATCH 04/25] Add "use client" directive on top-level exports (node16 nextjs support) --- .changeset/some-papayas-give.md | 5 +++++ packages/web-shared/src/trace-viewer/context.tsx | 2 ++ packages/web-shared/src/workflow-trace-view.tsx | 2 ++ 3 files changed, 9 insertions(+) create mode 100644 .changeset/some-papayas-give.md diff --git a/.changeset/some-papayas-give.md b/.changeset/some-papayas-give.md new file mode 100644 index 000000000..7deadee72 --- /dev/null +++ b/.changeset/some-papayas-give.md @@ -0,0 +1,5 @@ +--- +"@workflow/web-shared": patch +--- + +Add "use client" directly on all top-level client component exports for node16-nextjs compatibility diff --git a/packages/web-shared/src/trace-viewer/context.tsx b/packages/web-shared/src/trace-viewer/context.tsx index 2fc1c09be..90eb96501 100644 --- a/packages/web-shared/src/trace-viewer/context.tsx +++ b/packages/web-shared/src/trace-viewer/context.tsx @@ -1,3 +1,5 @@ +'use client'; + import type { Dispatch, MutableRefObject, ReactNode, Reducer } from 'react'; import { createContext, diff --git a/packages/web-shared/src/workflow-trace-view.tsx b/packages/web-shared/src/workflow-trace-view.tsx index a3188448b..4a58487dd 100644 --- a/packages/web-shared/src/workflow-trace-view.tsx +++ b/packages/web-shared/src/workflow-trace-view.tsx @@ -1,3 +1,5 @@ +'use client'; + import type { Event, Hook, Step, WorkflowRun } from '@workflow/world'; import { Loader2 } from 'lucide-react'; import { useEffect, useMemo, useState } from 'react'; From 1399d4be4cc019cafd417da79cff7af685ac3d9d Mon Sep 17 00:00:00 2001 From: Peter Wielander Date: Wed, 5 Nov 2025 14:44:10 -0800 Subject: [PATCH 05/25] Revert "Add "use client" directive on top-level exports (node16 nextjs support)" This reverts commit 69ba8e6de67166835cc7b0e9e41ae6a2b18e2b23. --- .changeset/some-papayas-give.md | 5 ----- packages/web-shared/src/trace-viewer/context.tsx | 2 -- packages/web-shared/src/workflow-trace-view.tsx | 2 -- 3 files changed, 9 deletions(-) delete mode 100644 .changeset/some-papayas-give.md diff --git a/.changeset/some-papayas-give.md b/.changeset/some-papayas-give.md deleted file mode 100644 index 7deadee72..000000000 --- a/.changeset/some-papayas-give.md +++ /dev/null @@ -1,5 +0,0 @@ ---- -"@workflow/web-shared": patch ---- - -Add "use client" directly on all top-level client component exports for node16-nextjs compatibility diff --git a/packages/web-shared/src/trace-viewer/context.tsx b/packages/web-shared/src/trace-viewer/context.tsx index 90eb96501..2fc1c09be 100644 --- a/packages/web-shared/src/trace-viewer/context.tsx +++ b/packages/web-shared/src/trace-viewer/context.tsx @@ -1,5 +1,3 @@ -'use client'; - import type { Dispatch, MutableRefObject, ReactNode, Reducer } from 'react'; import { createContext, diff --git a/packages/web-shared/src/workflow-trace-view.tsx b/packages/web-shared/src/workflow-trace-view.tsx index 4a58487dd..a3188448b 100644 --- a/packages/web-shared/src/workflow-trace-view.tsx +++ b/packages/web-shared/src/workflow-trace-view.tsx @@ -1,5 +1,3 @@ -'use client'; - import type { Event, Hook, Step, WorkflowRun } from '@workflow/world'; import { Loader2 } from 'lucide-react'; import { useEffect, useMemo, useState } from 'react'; From e4b21ffdb78227ac80fef0c84324945fc11e8554 Mon Sep 17 00:00:00 2001 From: Peter Wielander Date: Wed, 5 Nov 2025 14:43:30 -0800 Subject: [PATCH 06/25] Add "use client" directive on top-level exports (node16 nextjs support) --- .changeset/some-papayas-give.md | 5 +++++ packages/web-shared/src/trace-viewer/context.tsx | 2 ++ packages/web-shared/src/workflow-trace-view.tsx | 2 ++ 3 files changed, 9 insertions(+) create mode 100644 .changeset/some-papayas-give.md diff --git a/.changeset/some-papayas-give.md b/.changeset/some-papayas-give.md new file mode 100644 index 000000000..7deadee72 --- /dev/null +++ b/.changeset/some-papayas-give.md @@ -0,0 +1,5 @@ +--- +"@workflow/web-shared": patch +--- + +Add "use client" directly on all top-level client component exports for node16-nextjs compatibility diff --git a/packages/web-shared/src/trace-viewer/context.tsx b/packages/web-shared/src/trace-viewer/context.tsx index 2fc1c09be..90eb96501 100644 --- a/packages/web-shared/src/trace-viewer/context.tsx +++ b/packages/web-shared/src/trace-viewer/context.tsx @@ -1,3 +1,5 @@ +'use client'; + import type { Dispatch, MutableRefObject, ReactNode, Reducer } from 'react'; import { createContext, diff --git a/packages/web-shared/src/workflow-trace-view.tsx b/packages/web-shared/src/workflow-trace-view.tsx index a3188448b..4a58487dd 100644 --- a/packages/web-shared/src/workflow-trace-view.tsx +++ b/packages/web-shared/src/workflow-trace-view.tsx @@ -1,3 +1,5 @@ +'use client'; + import type { Event, Hook, Step, WorkflowRun } from '@workflow/world'; import { Loader2 } from 'lucide-react'; import { useEffect, useMemo, useState } from 'react'; From 091a24b4cef733c5352234f598aabb4f01de8425 Mon Sep 17 00:00:00 2001 From: Vercel Date: Wed, 5 Nov 2025 23:12:27 +0000 Subject: [PATCH 07/25] Fix: Stream operations during workflow argument serialization use a placeholder `'pending'` runId that doesn\'t correspond to a valid workflow run\. This causes failures on Vercel deployments and violates the new requirement that `runId` be a valid workflow identifier\. --- packages/core/src/serialization.ts | 81 +++++++++++++++++++++--------- 1 file changed, 57 insertions(+), 24 deletions(-) diff --git a/packages/core/src/serialization.ts b/packages/core/src/serialization.ts index 08fbfcea5..24c0a12f5 100644 --- a/packages/core/src/serialization.ts +++ b/packages/core/src/serialization.ts @@ -326,12 +326,16 @@ export function getExternalReducers( throw new Error('ReadableStream is locked'); } + if (!runId) { + throw new Error( + 'ReadableStream cannot be passed as a workflow argument. Streams are only supported within workflow or step functions, not as initial arguments.' + ); + } + const name = global.crypto.randomUUID(); const type = getStreamType(value); - // Use a placeholder runId if not available (e.g., when initially starting workflow) - const streamRunId = runId || 'pending'; - const writable = new WorkflowServerWritableStream(streamRunId, name); + const writable = new WorkflowServerWritableStream(runId, name); if (type === 'bytes') { ops.push(value.pipeTo(writable)); } else { @@ -351,11 +355,16 @@ export function getExternalReducers( WritableStream: (value) => { if (!(value instanceof global.WritableStream)) return false; + + if (!runId) { + throw new Error( + 'WritableStream cannot be passed as a workflow argument. Streams are only supported within workflow or step functions, not as initial arguments.' + ); + } + const name = global.crypto.randomUUID(); - // Use a placeholder runId if not available - const streamRunId = runId || 'pending'; ops.push( - new WorkflowServerReadableStream(streamRunId, name) + new WorkflowServerReadableStream(runId, name) .pipeThrough( getDeserializeStream(getExternalRevivers(global, ops, runId)) ) @@ -444,12 +453,16 @@ function getStepReducers( let type = value[STREAM_TYPE_SYMBOL]; if (!name) { + if (!runId) { + throw new Error( + 'ReadableStream cannot be serialized without a valid runId' + ); + } + name = global.crypto.randomUUID(); type = getStreamType(value); - // Use a placeholder runId if not available - const streamRunId = runId || 'pending'; - const writable = new WorkflowServerWritableStream(streamRunId, name); + const writable = new WorkflowServerWritableStream(runId, name); if (type === 'bytes') { ops.push(value.pipeTo(writable)); } else { @@ -473,11 +486,15 @@ function getStepReducers( let name = value[STREAM_NAME_SYMBOL]; if (!name) { + if (!runId) { + throw new Error( + 'WritableStream cannot be serialized without a valid runId' + ); + } + name = global.crypto.randomUUID(); - // Use a placeholder runId if not available - const streamRunId = runId || 'pending'; ops.push( - new WorkflowServerReadableStream(streamRunId, name) + new WorkflowServerReadableStream(runId, name) .pipeThrough( getDeserializeStream(getStepRevivers(global, ops, runId)) ) @@ -607,10 +624,14 @@ export function getExternalRevivers( return response.body; } - // Use a placeholder runId if not available - const streamRunId = runId || 'pending'; + if (!runId) { + throw new Error( + 'ReadableStream cannot be revived without a valid runId' + ); + } + const readable = new WorkflowServerReadableStream( - streamRunId, + runId, value.name, value.startIndex ); @@ -625,14 +646,18 @@ export function getExternalRevivers( } }, WritableStream: (value) => { + if (!runId) { + throw new Error( + 'WritableStream cannot be revived without a valid runId' + ); + } + const serialize = getSerializeStream( getExternalReducers(global, ops, runId) ); - // Use a placeholder runId if not available - const streamRunId = runId || 'pending'; ops.push( serialize.readable.pipeTo( - new WorkflowServerWritableStream(streamRunId, value.name) + new WorkflowServerWritableStream(runId, value.name) ) ); return serialize.writable; @@ -759,10 +784,14 @@ function getStepRevivers( return response.body; } - // Use a placeholder runId if not available - const streamRunId = runId || 'pending'; + if (!runId) { + throw new Error( + 'ReadableStream cannot be revived without a valid runId' + ); + } + const readable = new WorkflowServerReadableStream( - streamRunId, + runId, value.name ); if (value.type === 'bytes') { @@ -776,12 +805,16 @@ function getStepRevivers( } }, WritableStream: (value) => { + if (!runId) { + throw new Error( + 'WritableStream cannot be revived without a valid runId' + ); + } + const serialize = getSerializeStream(getStepReducers(global, ops, runId)); - // Use a placeholder runId if not available - const streamRunId = runId || 'pending'; ops.push( serialize.readable.pipeTo( - new WorkflowServerWritableStream(streamRunId, value.name) + new WorkflowServerWritableStream(runId, value.name) ) ); return serialize.writable; From 684cb73d7cd6dd1d8ed080c972f66cd23d73a800 Mon Sep 17 00:00:00 2001 From: Peter Wielander Date: Thu, 6 Nov 2025 13:12:27 -0800 Subject: [PATCH 08/25] runId only required for writing to streams --- packages/core/src/serialization.ts | 11 +++----- .../src/api/workflow-server-actions.ts | 4 +-- packages/world-local/src/streamer.ts | 6 ++--- packages/world-postgres/src/streamer.ts | 5 ++-- packages/world-vercel/src/streamer.ts | 27 ++++++++++++------- packages/world/src/interfaces.ts | 5 ++-- 6 files changed, 29 insertions(+), 29 deletions(-) diff --git a/packages/core/src/serialization.ts b/packages/core/src/serialization.ts index 24c0a12f5..2c15738ca 100644 --- a/packages/core/src/serialization.ts +++ b/packages/core/src/serialization.ts @@ -95,7 +95,7 @@ export class WorkflowServerReadableStream extends ReadableStream { let reader = this.#reader; if (!reader) { const world = getWorld(); - const stream = await world.readFromStream(runId, name, startIndex); + const stream = await world.readFromStream(name, startIndex); reader = this.#reader = stream.getReader(); } if (!reader) { @@ -126,10 +126,10 @@ export class WorkflowServerWritableStream extends WritableStream { const world = getWorld(); super({ async write(chunk) { - await world.writeToStream(runId, name, chunk); + await world.writeToStream(name, runId, chunk); }, async close() { - await world.closeStream(runId, name); + await world.closeStream(name); }, }); } @@ -790,10 +790,7 @@ function getStepRevivers( ); } - const readable = new WorkflowServerReadableStream( - runId, - value.name - ); + const readable = new WorkflowServerReadableStream(runId, value.name); if (value.type === 'bytes') { return readable; } else { diff --git a/packages/web-shared/src/api/workflow-server-actions.ts b/packages/web-shared/src/api/workflow-server-actions.ts index 6604d03d7..4d22987f5 100644 --- a/packages/web-shared/src/api/workflow-server-actions.ts +++ b/packages/web-shared/src/api/workflow-server-actions.ts @@ -444,9 +444,7 @@ export async function readStreamServerAction( ): Promise>> { try { const world = getWorldFromEnv(env); - // Extract runId from streamId (strm_{ULID}_user... -> wrun_{ULID}) - const runId = streamId.replace(/^strm_([^_]+)_user.*$/, 'wrun_$1'); - const stream = await world.readFromStream(runId, streamId, startIndex); + const stream = await world.readFromStream(streamId, startIndex); return createResponse(stream); } catch (error) { console.error('Failed to read stream:', error); diff --git a/packages/world-local/src/streamer.ts b/packages/world-local/src/streamer.ts index c69ca07b8..4ce0b0d13 100644 --- a/packages/world-local/src/streamer.ts +++ b/packages/world-local/src/streamer.ts @@ -49,8 +49,8 @@ export function createStreamer(basedir: string): Streamer { return { async writeToStream( - _runId: string, name: string, + _runId: string, chunk: string | Uint8Array ) { const chunkId = `strm_${monotonicUlid()}`; @@ -87,7 +87,7 @@ export function createStreamer(basedir: string): Streamer { }); }, - async closeStream(_runId: string, name: string) { + async closeStream(name: string) { const chunkId = `strm_${monotonicUlid()}`; const chunkPath = path.join( basedir, @@ -104,7 +104,7 @@ export function createStreamer(basedir: string): Streamer { streamEmitter.emit(`close:${name}` as const, { streamName: name }); }, - async readFromStream(_runId: string, name: string, startIndex = 0) { + async readFromStream(name: string, startIndex = 0) { const chunksDir = path.join(basedir, 'streams', 'chunks'); let removeListeners = () => {}; diff --git a/packages/world-postgres/src/streamer.ts b/packages/world-postgres/src/streamer.ts index 62bd89d72..efb040fcb 100644 --- a/packages/world-postgres/src/streamer.ts +++ b/packages/world-postgres/src/streamer.ts @@ -89,8 +89,8 @@ export function createStreamer(postgres: Sql, drizzle: Drizzle): Streamer { return { async writeToStream( - _runId: string, name: string, + _runId: string, chunk: string | Uint8Array ) { const chunkId = genChunkId(); @@ -110,7 +110,7 @@ export function createStreamer(postgres: Sql, drizzle: Drizzle): Streamer { ) ); }, - async closeStream(_runId: string, name: string): Promise { + async closeStream(name: string): Promise { const chunkId = genChunkId(); await drizzle.insert(streams).values({ chunkId, @@ -129,7 +129,6 @@ export function createStreamer(postgres: Sql, drizzle: Drizzle): Streamer { ); }, async readFromStream( - _runId: string, name: string, startIndex?: number ): Promise> { diff --git a/packages/world-vercel/src/streamer.ts b/packages/world-vercel/src/streamer.ts index ab1d31159..5d94430a1 100644 --- a/packages/world-vercel/src/streamer.ts +++ b/packages/world-vercel/src/streamer.ts @@ -1,21 +1,28 @@ import type { Streamer } from '@workflow/world'; import { type APIConfig, getHttpConfig, type HttpConfig } from './utils.js'; -function getStreamUrl(runId: string, name: string, httpConfig: HttpConfig) { - return new URL( - `${httpConfig.baseUrl}/v1/run/${runId}/stream/${encodeURIComponent(name)}` - ); +function getStreamUrl( + name: string, + runId: string | undefined, + httpConfig: HttpConfig +) { + if (runId) { + return new URL( + `${httpConfig.baseUrl}/v1/run/${runId}/stream/${encodeURIComponent(name)}` + ); + } + return new URL(`${httpConfig.baseUrl}/v1/stream/${encodeURIComponent(name)}`); } export function createStreamer(config?: APIConfig): Streamer { return { async writeToStream( - runId: string, name: string, + runId: string, chunk: string | Uint8Array ) { const httpConfig = await getHttpConfig(config); - await fetch(getStreamUrl(runId, name, httpConfig), { + await fetch(getStreamUrl(name, runId, httpConfig), { method: 'PUT', body: chunk, headers: httpConfig.headers, @@ -23,18 +30,18 @@ export function createStreamer(config?: APIConfig): Streamer { }); }, - async closeStream(runId: string, name: string) { + async closeStream(name: string) { const httpConfig = await getHttpConfig(config); httpConfig.headers.set('X-Stream-Done', 'true'); - await fetch(getStreamUrl(runId, name, httpConfig), { + await fetch(getStreamUrl(name, undefined, httpConfig), { method: 'PUT', headers: httpConfig.headers, }); }, - async readFromStream(runId: string, name: string, startIndex?: number) { + async readFromStream(name: string, startIndex?: number) { const httpConfig = await getHttpConfig(config); - const url = getStreamUrl(runId, name, httpConfig); + const url = getStreamUrl(name, undefined, httpConfig); if (typeof startIndex === 'number') { url.searchParams.set('startIndex', String(startIndex)); } diff --git a/packages/world/src/interfaces.ts b/packages/world/src/interfaces.ts index 2d6ab8756..05dc5e863 100644 --- a/packages/world/src/interfaces.ts +++ b/packages/world/src/interfaces.ts @@ -33,13 +33,12 @@ import type { export interface Streamer { writeToStream( - runId: string, name: string, + runId: string, chunk: string | Uint8Array ): Promise; - closeStream(runId: string, name: string): Promise; + closeStream(name: string): Promise; readFromStream( - runId: string, name: string, startIndex?: number ): Promise>; From 7e6d081de25d0440e4cc6d15d4af980cb22b348c Mon Sep 17 00:00:00 2001 From: Peter Wielander Date: Thu, 6 Nov 2025 13:45:34 -0800 Subject: [PATCH 09/25] Fixes --- packages/core/src/serialization.ts | 74 +++++++++++------------------- 1 file changed, 27 insertions(+), 47 deletions(-) diff --git a/packages/core/src/serialization.ts b/packages/core/src/serialization.ts index 2c15738ca..875c4dc99 100644 --- a/packages/core/src/serialization.ts +++ b/packages/core/src/serialization.ts @@ -80,10 +80,7 @@ export function getDeserializeStream( export class WorkflowServerReadableStream extends ReadableStream { #reader?: ReadableStreamDefaultReader; - constructor(runId: string, name: string, startIndex?: number) { - if (typeof runId !== 'string' || runId.length === 0) { - throw new Error(`"runId" is required, got "${runId}"`); - } + constructor(name: string, startIndex?: number) { if (typeof name !== 'string' || name.length === 0) { throw new Error(`"name" is required, got "${name}"`); } @@ -326,26 +323,23 @@ export function getExternalReducers( throw new Error('ReadableStream is locked'); } - if (!runId) { - throw new Error( - 'ReadableStream cannot be passed as a workflow argument. Streams are only supported within workflow or step functions, not as initial arguments.' - ); - } - const name = global.crypto.randomUUID(); const type = getStreamType(value); - const writable = new WorkflowServerWritableStream(runId, name); - if (type === 'bytes') { - ops.push(value.pipeTo(writable)); - } else { - ops.push( - value - .pipeThrough( - getSerializeStream(getExternalReducers(global, ops, runId)) - ) - .pipeTo(writable) - ); + // Only pipe stream data if we have a runId + if (runId) { + const writable = new WorkflowServerWritableStream(runId, name); + if (type === 'bytes') { + ops.push(value.pipeTo(writable)); + } else { + ops.push( + value + .pipeThrough( + getSerializeStream(getExternalReducers(global, ops, runId)) + ) + .pipeTo(writable) + ); + } } const s: SerializableSpecial['ReadableStream'] = { name }; @@ -356,20 +350,19 @@ export function getExternalReducers( WritableStream: (value) => { if (!(value instanceof global.WritableStream)) return false; - if (!runId) { - throw new Error( - 'WritableStream cannot be passed as a workflow argument. Streams are only supported within workflow or step functions, not as initial arguments.' + const name = global.crypto.randomUUID(); + + // Only pipe stream data if we have a runId + if (runId) { + ops.push( + new WorkflowServerReadableStream(name) + .pipeThrough( + getDeserializeStream(getExternalRevivers(global, ops, runId)) + ) + .pipeTo(value) ); } - const name = global.crypto.randomUUID(); - ops.push( - new WorkflowServerReadableStream(runId, name) - .pipeThrough( - getDeserializeStream(getExternalRevivers(global, ops, runId)) - ) - .pipeTo(value) - ); return { name }; }, }; @@ -494,7 +487,7 @@ function getStepReducers( name = global.crypto.randomUUID(); ops.push( - new WorkflowServerReadableStream(runId, name) + new WorkflowServerReadableStream(name) .pipeThrough( getDeserializeStream(getStepRevivers(global, ops, runId)) ) @@ -624,14 +617,7 @@ export function getExternalRevivers( return response.body; } - if (!runId) { - throw new Error( - 'ReadableStream cannot be revived without a valid runId' - ); - } - const readable = new WorkflowServerReadableStream( - runId, value.name, value.startIndex ); @@ -784,13 +770,7 @@ function getStepRevivers( return response.body; } - if (!runId) { - throw new Error( - 'ReadableStream cannot be revived without a valid runId' - ); - } - - const readable = new WorkflowServerReadableStream(runId, value.name); + const readable = new WorkflowServerReadableStream(value.name); if (value.type === 'bytes') { return readable; } else { From a67edb5aa6aebee80047f9e4afb4224c080aece3 Mon Sep 17 00:00:00 2001 From: Peter Wielander Date: Thu, 6 Nov 2025 16:29:04 -0800 Subject: [PATCH 10/25] Fix unit tests --- packages/world-local/src/streamer.test.ts | 67 +++++++++++------------ 1 file changed, 33 insertions(+), 34 deletions(-) diff --git a/packages/world-local/src/streamer.test.ts b/packages/world-local/src/streamer.test.ts index 4e53781de..ff0c7e0de 100644 --- a/packages/world-local/src/streamer.test.ts +++ b/packages/world-local/src/streamer.test.ts @@ -132,8 +132,8 @@ describe('streamer', () => { const runId = 'wrun_test123'; const streamName = 'test-stream'; - await streamer.writeToStream(runId, streamName, 'hello'); - await streamer.writeToStream(runId, streamName, ' world'); + await streamer.writeToStream(streamName, runId, 'hello'); + await streamer.writeToStream(streamName, runId, ' world'); // Verify chunks directory was created const chunksDir = path.join(testDir, 'streams', 'chunks'); @@ -151,8 +151,8 @@ describe('streamer', () => { const buffer1 = Buffer.from('chunk1'); const buffer2 = Buffer.from('chunk2'); - await streamer.writeToStream(runId, streamName, buffer1); - await streamer.writeToStream(runId, streamName, buffer2); + await streamer.writeToStream(streamName, runId, buffer1); + await streamer.writeToStream(streamName, runId, buffer2); const chunksDir = path.join(testDir, 'streams', 'chunks'); const files = await fs.readdir(chunksDir); @@ -167,7 +167,7 @@ describe('streamer', () => { const streamName = 'uint8-stream'; const uint8Array = new Uint8Array([1, 2, 3, 4]); - await streamer.writeToStream(runId, streamName, uint8Array); + await streamer.writeToStream(streamName, runId, uint8Array); const chunksDir = path.join(testDir, 'streams', 'chunks'); const files = await fs.readdir(chunksDir); @@ -180,9 +180,9 @@ describe('streamer', () => { const { testDir, streamer } = await setupStreamer(); const runId = 'wrun_test123'; - await streamer.writeToStream(runId, 'stream1', 'data1'); - await streamer.writeToStream(runId, 'stream2', 'data2'); - await streamer.writeToStream(runId, 'stream1', 'data3'); + await streamer.writeToStream('stream1', runId, 'data1'); + await streamer.writeToStream('stream2', runId, 'data2'); + await streamer.writeToStream('stream1', runId, 'data3'); const chunksDir = path.join(testDir, 'streams', 'chunks'); const files = await fs.readdir(chunksDir); @@ -198,10 +198,9 @@ describe('streamer', () => { describe('closeStream', () => { it('should close an empty stream', async () => { const { testDir, streamer } = await setupStreamer(); - const runId = 'wrun_test123'; const streamName = 'empty-stream'; - await streamer.closeStream(runId, streamName); + await streamer.closeStream(streamName); const chunksDir = path.join(testDir, 'streams', 'chunks'); const files = await fs.readdir(chunksDir); @@ -215,9 +214,9 @@ describe('streamer', () => { const runId = 'wrun_test123'; const streamName = 'existing-stream'; - await streamer.writeToStream(runId, streamName, 'chunk1'); - await streamer.writeToStream(runId, streamName, 'chunk2'); - await streamer.closeStream(runId, streamName); + await streamer.writeToStream(streamName, runId, 'chunk1'); + await streamer.writeToStream(streamName, runId, 'chunk2'); + await streamer.closeStream(streamName); const chunksDir = path.join(testDir, 'streams', 'chunks'); const files = await fs.readdir(chunksDir); @@ -238,9 +237,9 @@ describe('streamer', () => { // Add a small delay to ensure different ULID timestamps await new Promise((resolve) => setTimeout(resolve, 2)); await streamer.writeToStream(runId, streamName, chunk2); - await streamer.closeStream(runId, streamName); + await streamer.closeStream(streamName); - const stream = await streamer.readFromStream(runId, streamName); + const stream = await streamer.readFromStream(streamName); const reader = stream.getReader(); const chunks: Uint8Array[] = []; @@ -265,13 +264,13 @@ describe('streamer', () => { const binaryData1 = new Uint8Array([1, 2, 3]); const binaryData2 = new Uint8Array([4, 5, 6]); - await streamer.writeToStream(runId, streamName, binaryData1); + await streamer.writeToStream(streamName, runId, binaryData1); // Add delay to ensure different ULID timestamps await new Promise((resolve) => setTimeout(resolve, 2)); - await streamer.writeToStream(runId, streamName, binaryData2); - await streamer.closeStream(runId, streamName); + await streamer.writeToStream(streamName, runId, binaryData2); + await streamer.closeStream(streamName); - const stream = await streamer.readFromStream(runId, streamName); + const stream = await streamer.readFromStream(streamName); const reader = stream.getReader(); const chunks: Uint8Array[] = []; @@ -303,14 +302,14 @@ describe('streamer', () => { const streamName = 'ordered-stream'; // Write chunks with small delays to ensure different ULID timestamps - await streamer.writeToStream(runId, streamName, '1'); + await streamer.writeToStream(streamName, runId, '1'); await new Promise((resolve) => setTimeout(resolve, 2)); - await streamer.writeToStream(runId, streamName, '2'); + await streamer.writeToStream(streamName, runId, '2'); await new Promise((resolve) => setTimeout(resolve, 2)); - await streamer.writeToStream(runId, streamName, '3'); - await streamer.closeStream(runId, streamName); + await streamer.writeToStream(streamName, runId, '3'); + await streamer.closeStream(streamName); - const stream = await streamer.readFromStream(runId, streamName); + const stream = await streamer.readFromStream(streamName); const reader = stream.getReader(); const chunks: string[] = []; @@ -342,10 +341,10 @@ describe('streamer', () => { await streamer.writeToStream(runId, streamName, 'end'); // Close the stream - await streamer.closeStream(runId, streamName); + await streamer.closeStream(streamName); // Read complete stream - const completeStream = await streamer.readFromStream(runId, streamName); + const completeStream = await streamer.readFromStream(streamName); const completeReader = completeStream.getReader(); const completeChunks: Uint8Array[] = []; let completeDone = false; @@ -370,22 +369,22 @@ describe('streamer', () => { const streamName = `race-${iteration}`; // Write a few chunks to disk first - await streamer.writeToStream(runId, streamName, '0\n'); - await streamer.writeToStream(runId, streamName, '1\n'); + await streamer.writeToStream(streamName, runId, '0\n'); + await streamer.writeToStream(streamName, runId, '1\n'); // Start writing chunks in background IMMEDIATELY before reading const writeTask = (async () => { for (let i = 2; i < 10; i++) { - await streamer.writeToStream(runId, streamName, `${i}\n`); + await streamer.writeToStream(streamName, runId, `${i}\n`); // No delay - fire them off as fast as possible to hit the race window } - await streamer.closeStream(runId, streamName); + await streamer.closeStream(streamName); })(); // Start reading - this triggers start() which should set up listeners // BEFORE listing files to avoid missing chunks, and track delivered // chunk IDs to avoid duplicates - const stream = await streamer.readFromStream(runId, streamName); + const stream = await streamer.readFromStream(streamName); const reader = stream.getReader(); const chunks: string[] = []; @@ -441,7 +440,7 @@ describe('streamer', () => { } // Start reading - const stream = await streamer.readFromStream(runId, streamName); + const stream = await streamer.readFromStream(streamName); const reader = stream.getReader(); const chunks: string[] = []; @@ -458,10 +457,10 @@ describe('streamer', () => { // Immediately write more chunks (5-9) while disk reading might be in progress for (let i = 5; i < 10; i++) { - await streamer.writeToStream(runId, streamName, `${i}\n`); + await streamer.writeToStream(streamName, runId, `${i}\n`); } - await streamer.closeStream(runId, streamName); + await streamer.closeStream(streamName); await readPromise; // Verify chunks are in exact chronological order (not just all present) From 7935f4e6396504316a062be51e148950777eff84 Mon Sep 17 00:00:00 2001 From: Peter Wielander Date: Thu, 6 Nov 2025 16:33:09 -0800 Subject: [PATCH 11/25] Fix unit tests --- packages/world-local/src/streamer.test.ts | 17 ++++++++--------- 1 file changed, 8 insertions(+), 9 deletions(-) diff --git a/packages/world-local/src/streamer.test.ts b/packages/world-local/src/streamer.test.ts index ff0c7e0de..4e8000761 100644 --- a/packages/world-local/src/streamer.test.ts +++ b/packages/world-local/src/streamer.test.ts @@ -100,9 +100,8 @@ describe('streamer', () => { const chunk = deserializeChunk( await fs.readFile(`${testDir}/streams/chunks/${file}`) ); - const time = decodeTime( - String(file.split('-').at(-1)).split('.')[0] - ); + const stream_id = String(file.split('-').at(-1)).split('.')[0]; + const time = decodeTime(stream_id); const timeDiff = time - lastTime; lastTime = time; @@ -233,10 +232,10 @@ describe('streamer', () => { const chunk1 = 'hello '; const chunk2 = 'world'; - await streamer.writeToStream(runId, streamName, chunk1); + await streamer.writeToStream(streamName, runId, chunk1); // Add a small delay to ensure different ULID timestamps await new Promise((resolve) => setTimeout(resolve, 2)); - await streamer.writeToStream(runId, streamName, chunk2); + await streamer.writeToStream(streamName, runId, chunk2); await streamer.closeStream(streamName); const stream = await streamer.readFromStream(streamName); @@ -334,11 +333,11 @@ describe('streamer', () => { const streamName = 'integration-stream'; // Write chunks with proper timing - await streamer.writeToStream(runId, streamName, 'start '); + await streamer.writeToStream(streamName, runId, 'start '); await new Promise((resolve) => setTimeout(resolve, 2)); - await streamer.writeToStream(runId, streamName, 'middle '); + await streamer.writeToStream(streamName, runId, 'middle '); await new Promise((resolve) => setTimeout(resolve, 2)); - await streamer.writeToStream(runId, streamName, 'end'); + await streamer.writeToStream(streamName, runId, 'end'); // Close the stream await streamer.closeStream(streamName); @@ -435,7 +434,7 @@ describe('streamer', () => { // Write chunks 0-4 to disk for (let i = 0; i < 5; i++) { - await streamer.writeToStream(runId, streamName, `${i}\n`); + await streamer.writeToStream(streamName, runId, `${i}\n`); await new Promise((resolve) => setTimeout(resolve, 2)); } From 205c22f746ca64f150564045667b62d49ac537d4 Mon Sep 17 00:00:00 2001 From: Peter Wielander Date: Fri, 7 Nov 2025 15:46:29 -0800 Subject: [PATCH 12/25] Might as well try --- packages/core/src/observability.ts | 4 ++-- packages/core/src/runtime/resume-hook.ts | 11 +++++++++-- 2 files changed, 11 insertions(+), 4 deletions(-) diff --git a/packages/core/src/observability.ts b/packages/core/src/observability.ts index feaed5f39..980202fdb 100644 --- a/packages/core/src/observability.ts +++ b/packages/core/src/observability.ts @@ -99,7 +99,7 @@ const hydrateEventData = ( eventData: Object.fromEntries( Object.entries(event.eventData).map(([key, value]) => [ key, - hydrateStepArguments(value as any, [], globalThis), + hydrateStepArguments(value as any, [], globalThis, streamPrintRevivers), ]) ), }; @@ -111,7 +111,7 @@ const hydrateHookMetadata = ( return { ...hook, metadata: hook.metadata - ? hydrateStepArguments(hook.metadata, [], globalThis) + ? hydrateStepArguments(hook.metadata, [], globalThis, streamPrintRevivers) : hook.metadata, }; }; diff --git a/packages/core/src/runtime/resume-hook.ts b/packages/core/src/runtime/resume-hook.ts index aa6401aa8..0b6253702 100644 --- a/packages/core/src/runtime/resume-hook.ts +++ b/packages/core/src/runtime/resume-hook.ts @@ -22,7 +22,13 @@ export async function getHookByToken(token: string): Promise { const world = getWorld(); const hook = await world.hooks.getByToken(token); if (typeof hook.metadata !== 'undefined') { - hook.metadata = hydrateStepArguments(hook.metadata as any, [], globalThis); + hook.metadata = hydrateStepArguments( + hook.metadata as any, + [], + globalThis, + {}, + hook.runId + ); } return hook; } @@ -76,7 +82,8 @@ export async function resumeHook( const dehydratedPayload = dehydrateStepReturnValue( payload, ops, - globalThis + globalThis, + hook.runId ); waitUntil(Promise.all(ops)); From 0fe0303d446872f6fc85a742f55c70526b6493c9 Mon Sep 17 00:00:00 2001 From: Peter Wielander Date: Mon, 10 Nov 2025 15:52:09 -0800 Subject: [PATCH 13/25] Forward compat --- packages/world-vercel/src/runs.ts | 2 ++ 1 file changed, 2 insertions(+) diff --git a/packages/world-vercel/src/runs.ts b/packages/world-vercel/src/runs.ts index a286ee0a1..ef3318239 100644 --- a/packages/world-vercel/src/runs.ts +++ b/packages/world-vercel/src/runs.ts @@ -30,6 +30,8 @@ const WorkflowRunWithRefsSchema = WorkflowRunSchema.omit({ outputRef: z.any().optional(), input: z.array(z.any()).optional(), output: z.any().optional(), + blobStorageBytes: z.number().optional(), + streamStorageBytes: z.number().optional(), }); // Helper to filter run data based on resolveData setting From fd4449cc417c7244ecfe5ce942f4f2edb7929024 Mon Sep 17 00:00:00 2001 From: Peter Wielander Date: Tue, 11 Nov 2025 11:55:03 -0800 Subject: [PATCH 14/25] test --- packages/core/src/serialization.ts | 48 ++++++++++++++++-------------- 1 file changed, 26 insertions(+), 22 deletions(-) diff --git a/packages/core/src/serialization.ts b/packages/core/src/serialization.ts index 875c4dc99..8e10a100e 100644 --- a/packages/core/src/serialization.ts +++ b/packages/core/src/serialization.ts @@ -326,20 +326,22 @@ export function getExternalReducers( const name = global.crypto.randomUUID(); const type = getStreamType(value); - // Only pipe stream data if we have a runId - if (runId) { - const writable = new WorkflowServerWritableStream(runId, name); - if (type === 'bytes') { - ops.push(value.pipeTo(writable)); - } else { - ops.push( - value - .pipeThrough( - getSerializeStream(getExternalReducers(global, ops, runId)) - ) - .pipeTo(writable) - ); - } + if (!runId) { + throw new Error( + 'ReadableStream cannot be serialized without a valid runId' + ); + } + const writable = new WorkflowServerWritableStream(runId, name); + if (type === 'bytes') { + ops.push(value.pipeTo(writable)); + } else { + ops.push( + value + .pipeThrough( + getSerializeStream(getExternalReducers(global, ops, runId)) + ) + .pipeTo(writable) + ); } const s: SerializableSpecial['ReadableStream'] = { name }; @@ -352,16 +354,18 @@ export function getExternalReducers( const name = global.crypto.randomUUID(); - // Only pipe stream data if we have a runId - if (runId) { - ops.push( - new WorkflowServerReadableStream(name) - .pipeThrough( - getDeserializeStream(getExternalRevivers(global, ops, runId)) - ) - .pipeTo(value) + if (!runId) { + throw new Error( + 'WritableStream cannot be serialized without a valid runId' ); } + ops.push( + new WorkflowServerReadableStream(name) + .pipeThrough( + getDeserializeStream(getExternalRevivers(global, ops, runId)) + ) + .pipeTo(value) + ); return { name }; }, From de320832218e1e5e530035118ebf92c1a8af655e Mon Sep 17 00:00:00 2001 From: Peter Wielander Date: Tue, 11 Nov 2025 12:00:32 -0800 Subject: [PATCH 15/25] Revert "test" This reverts commit fd4449cc417c7244ecfe5ce942f4f2edb7929024. --- packages/core/src/serialization.ts | 48 ++++++++++++++---------------- 1 file changed, 22 insertions(+), 26 deletions(-) diff --git a/packages/core/src/serialization.ts b/packages/core/src/serialization.ts index 8e10a100e..875c4dc99 100644 --- a/packages/core/src/serialization.ts +++ b/packages/core/src/serialization.ts @@ -326,22 +326,20 @@ export function getExternalReducers( const name = global.crypto.randomUUID(); const type = getStreamType(value); - if (!runId) { - throw new Error( - 'ReadableStream cannot be serialized without a valid runId' - ); - } - const writable = new WorkflowServerWritableStream(runId, name); - if (type === 'bytes') { - ops.push(value.pipeTo(writable)); - } else { - ops.push( - value - .pipeThrough( - getSerializeStream(getExternalReducers(global, ops, runId)) - ) - .pipeTo(writable) - ); + // Only pipe stream data if we have a runId + if (runId) { + const writable = new WorkflowServerWritableStream(runId, name); + if (type === 'bytes') { + ops.push(value.pipeTo(writable)); + } else { + ops.push( + value + .pipeThrough( + getSerializeStream(getExternalReducers(global, ops, runId)) + ) + .pipeTo(writable) + ); + } } const s: SerializableSpecial['ReadableStream'] = { name }; @@ -354,18 +352,16 @@ export function getExternalReducers( const name = global.crypto.randomUUID(); - if (!runId) { - throw new Error( - 'WritableStream cannot be serialized without a valid runId' + // Only pipe stream data if we have a runId + if (runId) { + ops.push( + new WorkflowServerReadableStream(name) + .pipeThrough( + getDeserializeStream(getExternalRevivers(global, ops, runId)) + ) + .pipeTo(value) ); } - ops.push( - new WorkflowServerReadableStream(name) - .pipeThrough( - getDeserializeStream(getExternalRevivers(global, ops, runId)) - ) - .pipeTo(value) - ); return { name }; }, From 58e4247ddd02c462e162aa7edb02133e1fcffca9 Mon Sep 17 00:00:00 2001 From: Peter Wielander Date: Tue, 11 Nov 2025 12:07:31 -0800 Subject: [PATCH 16/25] Fix --- packages/core/src/serialization.test.ts | 7 ++++++- packages/core/src/serialization.ts | 23 +++++++++++++---------- 2 files changed, 19 insertions(+), 11 deletions(-) diff --git a/packages/core/src/serialization.test.ts b/packages/core/src/serialization.test.ts index 746467aac..ba95945f8 100644 --- a/packages/core/src/serialization.test.ts +++ b/packages/core/src/serialization.test.ts @@ -177,7 +177,12 @@ describe('workflow arguments', () => { it('should work with WritableStream', () => { const stream = new WritableStream(); - const serialized = dehydrateWorkflowArguments(stream, []); + const serialized = dehydrateWorkflowArguments( + stream, + [], + globalThis, + 'wrun_test' + ); const uuid = serialized[2]; expect(serialized).toMatchInlineSnapshot(` [ diff --git a/packages/core/src/serialization.ts b/packages/core/src/serialization.ts index 875c4dc99..cf7751a91 100644 --- a/packages/core/src/serialization.ts +++ b/packages/core/src/serialization.ts @@ -327,6 +327,7 @@ export function getExternalReducers( const type = getStreamType(value); // Only pipe stream data if we have a runId + // TODO: Does this cause any issues? if (runId) { const writable = new WorkflowServerWritableStream(runId, name); if (type === 'bytes') { @@ -349,19 +350,21 @@ export function getExternalReducers( WritableStream: (value) => { if (!(value instanceof global.WritableStream)) return false; + if (!runId) { + throw new Error( + 'WritableStream cannot be serialized without a valid runId' + ); + } const name = global.crypto.randomUUID(); - // Only pipe stream data if we have a runId - if (runId) { - ops.push( - new WorkflowServerReadableStream(name) - .pipeThrough( - getDeserializeStream(getExternalRevivers(global, ops, runId)) - ) - .pipeTo(value) - ); - } + ops.push( + new WorkflowServerReadableStream(name) + .pipeThrough( + getDeserializeStream(getExternalRevivers(global, ops, runId)) + ) + .pipeTo(value) + ); return { name }; }, From 5b83accf6d5bcbdd8f12e2f807c3b9e4bb9d0b65 Mon Sep 17 00:00:00 2001 From: Peter Wielander Date: Tue, 11 Nov 2025 18:56:49 -0800 Subject: [PATCH 17/25] Fix test --- packages/world-vercel/src/streamer.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/packages/world-vercel/src/streamer.ts b/packages/world-vercel/src/streamer.ts index 5d94430a1..a9a44068a 100644 --- a/packages/world-vercel/src/streamer.ts +++ b/packages/world-vercel/src/streamer.ts @@ -8,7 +8,7 @@ function getStreamUrl( ) { if (runId) { return new URL( - `${httpConfig.baseUrl}/v1/run/${runId}/stream/${encodeURIComponent(name)}` + `${httpConfig.baseUrl}/v1/runs/${runId}/stream/${encodeURIComponent(name)}` ); } return new URL(`${httpConfig.baseUrl}/v1/stream/${encodeURIComponent(name)}`); From 598ee12b46abbad018fdb2a0836d0f1be946d818 Mon Sep 17 00:00:00 2001 From: Peter Wielander Date: Tue, 11 Nov 2025 19:03:56 -0800 Subject: [PATCH 18/25] Revert "Fix" This reverts commit 58e4247ddd02c462e162aa7edb02133e1fcffca9. --- packages/core/src/serialization.test.ts | 7 +------ packages/core/src/serialization.ts | 23 +++++++++++------------ 2 files changed, 12 insertions(+), 18 deletions(-) diff --git a/packages/core/src/serialization.test.ts b/packages/core/src/serialization.test.ts index ba95945f8..746467aac 100644 --- a/packages/core/src/serialization.test.ts +++ b/packages/core/src/serialization.test.ts @@ -177,12 +177,7 @@ describe('workflow arguments', () => { it('should work with WritableStream', () => { const stream = new WritableStream(); - const serialized = dehydrateWorkflowArguments( - stream, - [], - globalThis, - 'wrun_test' - ); + const serialized = dehydrateWorkflowArguments(stream, []); const uuid = serialized[2]; expect(serialized).toMatchInlineSnapshot(` [ diff --git a/packages/core/src/serialization.ts b/packages/core/src/serialization.ts index cf7751a91..9cd33a2da 100644 --- a/packages/core/src/serialization.ts +++ b/packages/core/src/serialization.ts @@ -350,21 +350,20 @@ export function getExternalReducers( WritableStream: (value) => { if (!(value instanceof global.WritableStream)) return false; - if (!runId) { - throw new Error( - 'WritableStream cannot be serialized without a valid runId' - ); - } const name = global.crypto.randomUUID(); - ops.push( - new WorkflowServerReadableStream(name) - .pipeThrough( - getDeserializeStream(getExternalRevivers(global, ops, runId)) - ) - .pipeTo(value) - ); + // Only pipe stream data if we have a runId + // TODO: Does this cause any issues? + if (runId) { + ops.push( + new WorkflowServerReadableStream(name) + .pipeThrough( + getDeserializeStream(getExternalRevivers(global, ops, runId)) + ) + .pipeTo(value) + ); + } return { name }; }, From 6f00b3decb820fd74b850e285bdde070eded423f Mon Sep 17 00:00:00 2001 From: Peter Wielander Date: Tue, 11 Nov 2025 19:08:13 -0800 Subject: [PATCH 19/25] Use test deployment --- packages/world-vercel/src/utils.ts | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/packages/world-vercel/src/utils.ts b/packages/world-vercel/src/utils.ts index 994b4256a..93b09c705 100644 --- a/packages/world-vercel/src/utils.ts +++ b/packages/world-vercel/src/utils.ts @@ -110,7 +110,8 @@ export const getHttpUrl = ( config?: APIConfig ): { baseUrl: string; usingProxy: boolean } => { const projectConfig = config?.projectConfig; - const defaultUrl = 'https://vercel-workflow.com/api'; + const defaultUrl = + 'https://workflow-server-git-peter-emit-storage-facts.vercel.sh/api'; const defaultProxyUrl = 'https://api.vercel.com/v1/workflow'; const usingProxy = Boolean( config?.baseUrl || (projectConfig?.projectId && projectConfig?.teamId) From 0d1eef2a070b965608aedfb51b288b3aa872c928 Mon Sep 17 00:00:00 2001 From: Peter Wielander Date: Wed, 12 Nov 2025 15:36:43 -0800 Subject: [PATCH 20/25] New test --- packages/world-vercel/src/utils.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/packages/world-vercel/src/utils.ts b/packages/world-vercel/src/utils.ts index 93b09c705..7cfb3be1a 100644 --- a/packages/world-vercel/src/utils.ts +++ b/packages/world-vercel/src/utils.ts @@ -111,7 +111,7 @@ export const getHttpUrl = ( ): { baseUrl: string; usingProxy: boolean } => { const projectConfig = config?.projectConfig; const defaultUrl = - 'https://workflow-server-git-peter-emit-storage-facts.vercel.sh/api'; + 'https://workflow-server-git-peter-gb-hour-counting.vercel.sh/api'; const defaultProxyUrl = 'https://api.vercel.com/v1/workflow'; const usingProxy = Boolean( config?.baseUrl || (projectConfig?.projectId && projectConfig?.teamId) From 00a455d65ac277e50748311b32c7f5e69c00247a Mon Sep 17 00:00:00 2001 From: Peter Wielander Date: Mon, 1 Dec 2025 11:45:32 -0800 Subject: [PATCH 21/25] Update url --- packages/world-vercel/src/utils.ts | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/packages/world-vercel/src/utils.ts b/packages/world-vercel/src/utils.ts index 7cfb3be1a..29a732916 100644 --- a/packages/world-vercel/src/utils.ts +++ b/packages/world-vercel/src/utils.ts @@ -110,8 +110,7 @@ export const getHttpUrl = ( config?: APIConfig ): { baseUrl: string; usingProxy: boolean } => { const projectConfig = config?.projectConfig; - const defaultUrl = - 'https://workflow-server-git-peter-gb-hour-counting.vercel.sh/api'; + const defaultUrl = 'https://workflow-server-git-schniz-hono.vercel.sh/api'; const defaultProxyUrl = 'https://api.vercel.com/v1/workflow'; const usingProxy = Boolean( config?.baseUrl || (projectConfig?.projectId && projectConfig?.teamId) From 08e3995a264077cb7ec354b35bdce95e64c45170 Mon Sep 17 00:00:00 2001 From: Peter Wielander Date: Mon, 1 Dec 2025 11:46:25 -0800 Subject: [PATCH 22/25] Reset --- packages/core/src/serialization.ts | 10 +++------- packages/web-shared/src/trace-viewer/context.tsx | 2 -- packages/web-shared/src/workflow-trace-view.tsx | 2 -- packages/world-local/src/streamer.test.ts | 6 ------ 4 files changed, 3 insertions(+), 17 deletions(-) diff --git a/packages/core/src/serialization.ts b/packages/core/src/serialization.ts index 1b8c25d25..251b3f177 100644 --- a/packages/core/src/serialization.ts +++ b/packages/core/src/serialization.ts @@ -337,7 +337,6 @@ function getCommonReducers(global: Record = globalThis) { * * @param global * @param ops - * @param runId * @returns */ export function getExternalReducers( @@ -959,8 +958,7 @@ export function hydrateWorkflowReturnValue( ops: Promise[], runId: string | Promise, global: Record = globalThis, - extraRevivers: Record any> = {}, - runId?: string + extraRevivers: Record any> = {} ) { const obj = devalue.unflatten(value, { ...getExternalRevivers(global, ops, runId), @@ -1009,8 +1007,7 @@ export function hydrateStepArguments( ops: Promise[], runId: string | Promise, global: Record = globalThis, - extraRevivers: Record any> = {}, - runId?: string + extraRevivers: Record any> = {} ) { const obj = devalue.unflatten(value, { ...getStepRevivers(global, ops, runId), @@ -1060,8 +1057,7 @@ export function dehydrateStepReturnValue( export function hydrateStepReturnValue( value: Parameters[0], global: Record = globalThis, - extraRevivers: Record any> = {}, - _runId?: string + extraRevivers: Record any> = {} ) { const obj = devalue.unflatten(value, { ...getWorkflowRevivers(global), diff --git a/packages/web-shared/src/trace-viewer/context.tsx b/packages/web-shared/src/trace-viewer/context.tsx index 90eb96501..2fc1c09be 100644 --- a/packages/web-shared/src/trace-viewer/context.tsx +++ b/packages/web-shared/src/trace-viewer/context.tsx @@ -1,5 +1,3 @@ -'use client'; - import type { Dispatch, MutableRefObject, ReactNode, Reducer } from 'react'; import { createContext, diff --git a/packages/web-shared/src/workflow-trace-view.tsx b/packages/web-shared/src/workflow-trace-view.tsx index b8b1faeb4..4cd42ea1c 100644 --- a/packages/web-shared/src/workflow-trace-view.tsx +++ b/packages/web-shared/src/workflow-trace-view.tsx @@ -1,5 +1,3 @@ -'use client'; - import type { Event, Hook, Step, WorkflowRun } from '@workflow/world'; import { useEffect, useMemo, useState } from 'react'; import { toast } from 'sonner'; diff --git a/packages/world-local/src/streamer.test.ts b/packages/world-local/src/streamer.test.ts index 5174c6cca..033ce36c7 100644 --- a/packages/world-local/src/streamer.test.ts +++ b/packages/world-local/src/streamer.test.ts @@ -130,7 +130,6 @@ describe('streamer', () => { describe('writeToStream', () => { it('should write string chunks to a stream', async () => { const { testDir, streamer } = await setupStreamer(); - const runId = 'wrun_test123'; const streamName = 'test-stream'; await streamer.writeToStream(streamName, TEST_RUN_ID, 'hello'); @@ -147,7 +146,6 @@ describe('streamer', () => { it('should write Buffer chunks to a stream', async () => { const { testDir, streamer } = await setupStreamer(); - const runId = 'wrun_test123'; const streamName = 'buffer-stream'; const buffer1 = Buffer.from('chunk1'); const buffer2 = Buffer.from('chunk2'); @@ -164,7 +162,6 @@ describe('streamer', () => { it('should write Uint8Array chunks to a stream', async () => { const { testDir, streamer } = await setupStreamer(); - const runId = 'wrun_test123'; const streamName = 'uint8-stream'; const uint8Array = new Uint8Array([1, 2, 3, 4]); @@ -179,7 +176,6 @@ describe('streamer', () => { it('should handle multiple streams independently', async () => { const { testDir, streamer } = await setupStreamer(); - const runId = 'wrun_test123'; await streamer.writeToStream('stream1', TEST_RUN_ID, 'data1'); await streamer.writeToStream('stream2', TEST_RUN_ID, 'data2'); @@ -412,7 +408,6 @@ describe('streamer', () => { // Run multiple iterations to increase probability of catching race conditions for (let iteration = 0; iteration < 10; iteration++) { const { streamer } = await setupStreamer(); - const runId = 'wrun_test123'; const streamName = `race-${iteration}`; // Write a few chunks to disk first @@ -477,7 +472,6 @@ describe('streamer', () => { it('should maintain chronological order when chunks arrive during disk reading', async () => { const { streamer } = await setupStreamer(); - const runId = 'wrun_test123'; const streamName = 'ordering-test'; // Write chunks 0-4 to disk From 137d1056faf7deafa887bf94e5563a68183d5f6e Mon Sep 17 00:00:00 2001 From: Peter Wielander Date: Mon, 1 Dec 2025 11:46:55 -0800 Subject: [PATCH 23/25] WIP --- .changeset/some-papayas-give.md | 5 -- docs/app/(home)/components/frameworks.tsx | 74 +++++++++++------------ 2 files changed, 37 insertions(+), 42 deletions(-) delete mode 100644 .changeset/some-papayas-give.md diff --git a/.changeset/some-papayas-give.md b/.changeset/some-papayas-give.md deleted file mode 100644 index 7deadee72..000000000 --- a/.changeset/some-papayas-give.md +++ /dev/null @@ -1,5 +0,0 @@ ---- -"@workflow/web-shared": patch ---- - -Add "use client" directly on all top-level client component exports for node16-nextjs compatibility diff --git a/docs/app/(home)/components/frameworks.tsx b/docs/app/(home)/components/frameworks.tsx index 041871417..1d21d9755 100644 --- a/docs/app/(home)/components/frameworks.tsx +++ b/docs/app/(home)/components/frameworks.tsx @@ -1,12 +1,12 @@ -'use client'; +"use client"; -import { track } from '@vercel/analytics'; -import Link from 'next/link'; -import type { ComponentProps } from 'react'; -import { toast } from 'sonner'; -import { Badge } from '@/components/ui/badge'; +import { track } from "@vercel/analytics"; +import Link from "next/link"; +import type { ComponentProps } from "react"; +import { toast } from "sonner"; +import { Badge } from "@/components/ui/badge"; -export const Express = (props: ComponentProps<'svg'>) => ( +export const Express = (props: ComponentProps<"svg">) => ( ) => ( ); -export const AstroDark = (props: ComponentProps<'svg'>) => ( +export const AstroDark = (props: ComponentProps<"svg">) => ( ) => ( ); -export const AstroLight = (props: ComponentProps<'svg'>) => ( +export const AstroLight = (props: ComponentProps<"svg">) => ( ) => ( ); -export const AstroGray = (props: ComponentProps<'svg'>) => ( +export const AstroGray = (props: ComponentProps<"svg">) => ( ) => ( ); -export const TanStack = (props: ComponentProps<'svg'>) => ( +export const TanStack = (props: ComponentProps<"svg">) => ( ) => ( ); -export const TanStackGray = (props: ComponentProps<'svg'>) => ( +export const TanStackGray = (props: ComponentProps<"svg">) => ( ) => ( ); -export const Vite = (props: ComponentProps<'svg'>) => ( +export const Vite = (props: ComponentProps<"svg">) => ( ) => ( ); -export const Nitro = (props: ComponentProps<'svg'>) => ( +export const Nitro = (props: ComponentProps<"svg">) => ( ) => ( /> ) => ( ); -export const SvelteKit = (props: ComponentProps<'svg'>) => ( +export const SvelteKit = (props: ComponentProps<"svg">) => ( ) => ( ); -export const SvelteKitGray = (props: ComponentProps<'svg'>) => ( +export const SvelteKitGray = (props: ComponentProps<"svg">) => ( ) => ( ); -export const Nuxt = (props: ComponentProps<'svg'>) => ( +export const Nuxt = (props: ComponentProps<"svg">) => ( ) => ( ); -export const NuxtGray = (props: ComponentProps<'svg'>) => ( +export const NuxtGray = (props: ComponentProps<"svg">) => ( ) => ( ); -export const Hono = (props: ComponentProps<'svg'>) => ( +export const Hono = (props: ComponentProps<"svg">) => ( Hono ) => ( ); -export const HonoGray = (props: ComponentProps<'svg'>) => ( +export const HonoGray = (props: ComponentProps<"svg">) => ( Hono ) => ( ); -export const Bun = (props: ComponentProps<'svg'>) => ( +export const Bun = (props: ComponentProps<"svg">) => ( ) => ( id="Top" d="M35.12,5.53A16.41,16.41,0,0,1,29.49,18c-.28.25-.06.73.3.59,3.37-1.31,7.92-5.23,6-13.14C35.71,5,35.12,5.12,35.12,5.53Zm2.27,0A16.24,16.24,0,0,1,39,19c-.12.35.31.65.55.36C41.74,16.56,43.65,11,37.93,5,37.64,4.74,37.19,5.14,37.39,5.49Zm2.76-.17A16.42,16.42,0,0,1,47,17.12a.33.33,0,0,0,.65.11c.92-3.49.4-9.44-7.17-12.53C40.08,4.54,39.82,5.08,40.15,5.32ZM21.69,15.76a16.94,16.94,0,0,0,10.47-9c.18-.36.75-.22.66.18-1.73,8-7.52,9.67-11.12,9.45C21.32,16.4,21.33,15.87,21.69,15.76Z" fill="#ccbea7" - style={{ fillRule: 'evenodd' }} + style={{ fillRule: "evenodd" }} /> ) => ( ); -export const BunGray = (props: ComponentProps<'svg'>) => ( +export const BunGray = (props: ComponentProps<"svg">) => ( ) => ( id="Top" d="M35.12,5.53A16.41,16.41,0,0,1,29.49,18c-.28.25-.06.73.3.59,3.37-1.31,7.92-5.23,6-13.14C35.71,5,35.12,5.12,35.12,5.53Zm2.27,0A16.24,16.24,0,0,1,39,19c-.12.35.31.65.55.36C41.74,16.56,43.65,11,37.93,5,37.64,4.74,37.19,5.14,37.39,5.49Zm2.76-.17A16.42,16.42,0,0,1,47,17.12a.33.33,0,0,0,.65.11c.92-3.49.4-9.44-7.17-12.53C40.08,4.54,39.82,5.08,40.15,5.32ZM21.69,15.76a16.94,16.94,0,0,0,10.47-9c.18-.36.75-.22.66.18-1.73,8-7.52,9.67-11.12,9.45C21.32,16.4,21.33,15.87,21.69,15.76Z" fill="var(--color-background)" - style={{ fillRule: 'evenodd' }} + style={{ fillRule: "evenodd" }} /> ) => ( ); -export const Nest = (props: ComponentProps<'svg'>) => ( +export const Nest = (props: ComponentProps<"svg">) => ( ) => ( ); -export const NestGray = (props: ComponentProps<'svg'>) => ( +export const NestGray = (props: ComponentProps<"svg">) => ( ) => ( ); -export const Next = (props: ComponentProps<'svg'>) => ( +export const Next = (props: ComponentProps<"svg">) => ( Next.js @@ -681,8 +681,8 @@ export const Next = (props: ComponentProps<'svg'>) => ( export const Frameworks = () => { const handleRequest = (framework: string) => { - track('Framework requested', { framework: framework.toLowerCase() }); - toast.success('Request received', { + track("Framework requested", { framework: framework.toLowerCase() }); + toast.success("Request received", { description: `Thanks for expressing interest in ${framework}. We will be adding support for it soon.`, }); }; @@ -742,21 +742,21 @@ export const Frameworks = () => {
handleRequest('NestJS')} + onClick={() => handleRequest("NestJS")} >
handleRequest('TanStack')} + onClick={() => handleRequest("TanStack")} >
handleRequest('Astro')} + onClick={() => handleRequest("Astro")} > From 2faeb94ac5085ed2b91b929aedc4a70db2b34122 Mon Sep 17 00:00:00 2001 From: Peter Wielander Date: Fri, 5 Dec 2025 10:33:34 -0800 Subject: [PATCH 24/25] New url --- packages/world-vercel/src/utils.ts | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/packages/world-vercel/src/utils.ts b/packages/world-vercel/src/utils.ts index 29a732916..798e47635 100644 --- a/packages/world-vercel/src/utils.ts +++ b/packages/world-vercel/src/utils.ts @@ -110,7 +110,8 @@ export const getHttpUrl = ( config?: APIConfig ): { baseUrl: string; usingProxy: boolean } => { const projectConfig = config?.projectConfig; - const defaultUrl = 'https://workflow-server-git-schniz-hono.vercel.sh/api'; + const defaultUrl = + 'https://workflow-server-git-peter-fix-hono-metrics.vercel.sh/api'; const defaultProxyUrl = 'https://api.vercel.com/v1/workflow'; const usingProxy = Boolean( config?.baseUrl || (projectConfig?.projectId && projectConfig?.teamId) From 6393bc83534b0d10943bb8fd54868eb575faadd3 Mon Sep 17 00:00:00 2001 From: Peter Wielander Date: Wed, 10 Dec 2025 15:59:54 -0800 Subject: [PATCH 25/25] Update --- packages/core/e2e/e2e.test.ts | 104 +---------------------------- packages/world-vercel/src/utils.ts | 2 +- 2 files changed, 2 insertions(+), 104 deletions(-) diff --git a/packages/core/e2e/e2e.test.ts b/packages/core/e2e/e2e.test.ts index 3af3487f2..a0f3271f0 100644 --- a/packages/core/e2e/e2e.test.ts +++ b/packages/core/e2e/e2e.test.ts @@ -1,11 +1,7 @@ import { withResolvers } from '@workflow/utils'; import { assert, afterAll, describe, expect, test } from 'vitest'; import { dehydrateWorkflowArguments } from '../src/serialization'; -import { - cliInspectJson, - getProtectionBypassHeaders, - isLocalDeployment, -} from './utils'; +import { getProtectionBypassHeaders, isLocalDeployment } from './utils'; import fs from 'fs'; import path from 'path'; @@ -145,22 +141,6 @@ describe('e2e', () => { const run = await triggerWorkflow(workflow, [123]); const returnValue = await getWorkflowReturnValue(run.runId); expect(returnValue).toBe(133); - - const { json } = await cliInspectJson(`runs ${run.runId} --withData`); - expect(json).toMatchObject({ - runId: run.runId, - workflowName: expect.any(String), - status: 'completed', - input: [123], - output: 133, - }); - // In local vs. vercel backends, the workflow name is different, so we check for either, - // since this test runs against both. Also different workbenches have different directory structures. - expect(json.workflowName).toBeOneOf([ - `workflow//example/${workflow.workflowFile}//${workflow.workflowFn}`, - `workflow//${workflow.workflowFile}//${workflow.workflowFn}`, - `workflow//src/${workflow.workflowFile}//${workflow.workflowFn}`, - ]); }); const isNext = process.env.APP_NAME?.includes('nextjs'); @@ -591,33 +571,6 @@ describe('e2e', () => { // The step should have succeeded on attempt 3 expect(returnValue).toEqual({ finalAttempt: 3 }); - - // Also verify the run data shows the correct output - const { json: runData } = await cliInspectJson( - `runs ${run.runId} --withData` - ); - expect(runData).toMatchObject({ - runId: run.runId, - status: 'completed', - output: { finalAttempt: 3 }, - }); - - // Query steps separately to verify the step data - const { json: stepsData } = await cliInspectJson( - `steps --runId ${run.runId} --withData` - ); - expect(stepsData).toBeDefined(); - expect(Array.isArray(stepsData)).toBe(true); - expect(stepsData.length).toBeGreaterThan(0); - - // Find the stepThatRetriesAndSucceeds step - const retryStep = stepsData.find((s: any) => - s.stepName.includes('stepThatRetriesAndSucceeds') - ); - expect(retryStep).toBeDefined(); - expect(retryStep.status).toBe('completed'); - expect(retryStep.attempt).toBe(3); - expect(retryStep.output).toEqual([3]); }); test('retryableAndFatalErrorWorkflow', { timeout: 60_000 }, async () => { @@ -709,14 +662,6 @@ describe('e2e', () => { // Stack trace should NOT contain 'evalmachine' anywhere expect(returnValue.cause.stack).not.toContain('evalmachine'); - - // Verify the run failed with structured error - const { json: runData } = await cliInspectJson(`runs ${run.runId}`); - expect(runData.status).toBe('failed'); - expect(runData.error).toBeTypeOf('object'); - expect(runData.error.message).toContain( - 'Error from imported helper module' - ); } ); @@ -789,13 +734,6 @@ describe('e2e', () => { customData, hookCleanupTestData: 'workflow_completed', }); - - // Verify both runs completed successfully - const { json: run1Data } = await cliInspectJson(`runs ${run1.runId}`); - expect(run1Data.status).toBe('completed'); - - const { json: run2Data } = await cliInspectJson(`runs ${run2.runId}`); - expect(run2Data.status).toBe('completed'); } ); @@ -810,24 +748,6 @@ describe('e2e', () => { // doubleNumber(10) = 20, then multiply by 2 = 40 expect(returnValue).toBe(40); - - // Verify the run completed successfully - const { json: runData } = await cliInspectJson( - `runs ${run.runId} --withData` - ); - expect(runData.status).toBe('completed'); - expect(runData.output).toBe(40); - - // Verify that exactly 2 steps were executed: - // 1. stepWithStepFunctionArg(doubleNumber) - // (doubleNumber(10) is run inside the stepWithStepFunctionArg step) - const { json: eventsData } = await cliInspectJson( - `events --run ${run.runId} --json` - ); - const stepCompletedEvents = eventsData.filter( - (event) => event.eventType === 'step_completed' - ); - expect(stepCompletedEvents).toHaveLength(1); } ); @@ -846,13 +766,6 @@ describe('e2e', () => { // - 7 * 3 = 21, prefixed with "Result: " = "Result: 21" // - stepThatCallsStepFn wraps it: "Wrapped: Result: 21" expect(returnValue).toBe('Wrapped: Result: 21'); - - // Verify the run completed successfully - const { json: runData } = await cliInspectJson( - `runs ${run.runId} --withData` - ); - expect(runData.status).toBe('completed'); - expect(runData.output).toBe('Wrapped: Result: 21'); } ); @@ -897,21 +810,6 @@ describe('e2e', () => { childResult: inputValue * 2, // doubleValue(42) = 84 originalValue: inputValue, }); - - // Verify both runs completed successfully via CLI - const { json: parentRunData } = await cliInspectJson( - `runs ${run.runId} --withData` - ); - expect(parentRunData.status).toBe('completed'); - - const { json: childRunData } = await cliInspectJson( - `runs ${returnValue.childRunId} --withData` - ); - expect(childRunData.status).toBe('completed'); - expect(childRunData.output).toEqual({ - childResult: inputValue * 2, - originalValue: inputValue, - }); } ); }); diff --git a/packages/world-vercel/src/utils.ts b/packages/world-vercel/src/utils.ts index 798e47635..0ffd559a6 100644 --- a/packages/world-vercel/src/utils.ts +++ b/packages/world-vercel/src/utils.ts @@ -111,7 +111,7 @@ export const getHttpUrl = ( ): { baseUrl: string; usingProxy: boolean } => { const projectConfig = config?.projectConfig; const defaultUrl = - 'https://workflow-server-git-peter-fix-hono-metrics.vercel.sh/api'; + 'https://workflow-server-git-peter-redis-switchover.vercel.sh/api'; const defaultProxyUrl = 'https://api.vercel.com/v1/workflow'; const usingProxy = Boolean( config?.baseUrl || (projectConfig?.projectId && projectConfig?.teamId)