Skip to content

Commit 5a4deac

Browse files
committed
Use a readable stream directly
1 parent 3cc9ba6 commit 5a4deac

File tree

8 files changed

+78
-62
lines changed

8 files changed

+78
-62
lines changed

packages/core/src/v3/realtimeStreams/manager.ts

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
import { ApiClient } from "../apiClient/index.js";
2-
import { ensureAsyncIterable } from "../streams/asyncIterableStream.js";
2+
import { ensureAsyncIterable, ensureReadableStream } from "../streams/asyncIterableStream.js";
33
import { taskContext } from "../task-context-api.js";
44
import { StreamInstance } from "./streamInstance.js";
55
import {
@@ -30,7 +30,7 @@ export class StandardRealtimeStreamsManager implements RealtimeStreamsManager {
3030
options?: RealtimeStreamOperationOptions
3131
): RealtimeStreamInstance<T> {
3232
// Normalize ReadableStream to AsyncIterable
33-
const asyncIterableSource = ensureAsyncIterable(source);
33+
const readableStreamSource = ensureReadableStream(source);
3434

3535
const runId = getRunIdForOptions(options);
3636

@@ -52,7 +52,7 @@ export class StandardRealtimeStreamsManager implements RealtimeStreamsManager {
5252
baseUrl: this.baseUrl,
5353
runId,
5454
key,
55-
source: asyncIterableSource,
55+
source: readableStreamSource,
5656
signal: combinedSignal,
5757
requestOptions: options?.requestOptions,
5858
target: options?.target,

packages/core/src/v3/realtimeStreams/streamInstance.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@ export type StreamInstanceOptions<T> = {
1010
baseUrl: string;
1111
runId: string;
1212
key: string;
13-
source: AsyncIterable<T>;
13+
source: ReadableStream<T>;
1414
signal?: AbortSignal;
1515
requestOptions?: AnyZodFetchOptions;
1616
target?: "self" | "parent" | "root" | string;

packages/core/src/v3/realtimeStreams/streamsWriterV1.ts

Lines changed: 10 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@ export type StreamsWriterV1Options<T> = {
88
baseUrl: string;
99
runId: string;
1010
key: string;
11-
source: AsyncIterable<T>;
11+
source: ReadableStream<T>;
1212
headers?: Record<string, string>;
1313
signal?: AbortSignal;
1414
version?: string;
@@ -43,7 +43,7 @@ export class StreamsWriterV1<T> implements StreamsWriter {
4343
private streamComplete = false;
4444

4545
constructor(private options: StreamsWriterV1Options<T>) {
46-
const [serverStream, consumerStream] = this.createTeeStreams();
46+
const [serverStream, consumerStream] = this.options.source.tee();
4747
this.serverStream = serverStream;
4848
this.consumerStream = consumerStream;
4949
this.maxRetries = options.maxRetries ?? 10;
@@ -60,23 +60,6 @@ export class StreamsWriterV1<T> implements StreamsWriter {
6060
return randomBytes(4).toString("hex");
6161
}
6262

63-
private createTeeStreams() {
64-
const readableSource = new ReadableStream<T>({
65-
start: async (controller) => {
66-
try {
67-
for await (const value of this.options.source) {
68-
controller.enqueue(value);
69-
}
70-
controller.close();
71-
} catch (error) {
72-
controller.error(error);
73-
}
74-
},
75-
});
76-
77-
return readableSource.tee();
78-
}
79-
8063
private startBuffering(): void {
8164
this.streamReader = this.serverStream.getReader();
8265

@@ -131,10 +114,10 @@ export class StreamsWriterV1<T> implements StreamsWriter {
131114
if (this.isRetryableError(error)) {
132115
if (this.retryCount < this.maxRetries) {
133116
this.retryCount++;
134-
117+
135118
// Clean up the current request to avoid socket leaks
136119
req.destroy();
137-
120+
138121
const delayMs = this.calculateBackoffDelay();
139122

140123
await this.delay(delayMs);
@@ -157,10 +140,10 @@ export class StreamsWriterV1<T> implements StreamsWriter {
157140
// Timeout is retryable
158141
if (this.retryCount < this.maxRetries) {
159142
this.retryCount++;
160-
143+
161144
// Clean up the current request to avoid socket leaks
162145
req.destroy();
163-
146+
164147
const delayMs = this.calculateBackoffDelay();
165148

166149
await this.delay(delayMs);
@@ -182,13 +165,13 @@ export class StreamsWriterV1<T> implements StreamsWriter {
182165
if (res.statusCode && this.isRetryableStatusCode(res.statusCode)) {
183166
if (this.retryCount < this.maxRetries) {
184167
this.retryCount++;
185-
168+
186169
// Drain and destroy the response and request to avoid socket leaks
187170
// We need to consume the response before destroying it
188171
res.resume(); // Start draining the response
189172
res.destroy(); // Destroy the response to free the socket
190173
req.destroy(); // Destroy the request as well
191-
174+
192175
const delayMs = this.calculateBackoffDelay();
193176

194177
await this.delay(delayMs);
@@ -391,7 +374,7 @@ export class StreamsWriterV1<T> implements StreamsWriter {
391374
if (this.isRetryableError(error) && attempt < maxHeadRetries) {
392375
// Clean up the current request to avoid socket leaks
393376
req.destroy();
394-
377+
395378
await this.delay(1000 * (attempt + 1)); // Simple linear backoff
396379
const result = await this.queryServerLastChunkIndex(attempt + 1);
397380
resolve(result);
@@ -424,7 +407,7 @@ export class StreamsWriterV1<T> implements StreamsWriter {
424407
res.resume();
425408
res.destroy();
426409
req.destroy();
427-
410+
428411
await this.delay(1000 * (attempt + 1));
429412
const result = await this.queryServerLastChunkIndex(attempt + 1);
430413
resolve(result);

packages/core/src/v3/realtimeStreams/streamsWriterV2.ts

Lines changed: 3 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@ export type StreamsWriterV2Options<T = any> = {
55
basin: string;
66
stream: string;
77
accessToken: string;
8-
source: AsyncIterable<T>;
8+
source: ReadableStream<T>;
99
signal?: AbortSignal;
1010
flushIntervalMs?: number; // Used as lingerDuration for BatchTransform (default 200ms)
1111
maxRetries?: number; // Not used with appendSession, kept for compatibility
@@ -81,7 +81,7 @@ export class StreamsWriterV2<T = any> implements StreamsWriter {
8181
});
8282
}
8383

84-
const [serverStream, consumerStream] = this.createTeeStreams();
84+
const [serverStream, consumerStream] = this.options.source.tee();
8585
this.serverStream = serverStream;
8686
this.consumerStream = consumerStream;
8787

@@ -111,31 +111,6 @@ export class StreamsWriterV2<T = any> implements StreamsWriter {
111111
this.log("[S2MetadataStream] Abort cleanup complete");
112112
}
113113

114-
private createTeeStreams() {
115-
const readableSource = new ReadableStream<T>({
116-
start: async (controller) => {
117-
try {
118-
let count = 0;
119-
120-
for await (const value of this.options.source) {
121-
if (this.aborted) {
122-
controller.error(new Error("Stream aborted"));
123-
return;
124-
}
125-
controller.enqueue(value);
126-
count++;
127-
}
128-
129-
controller.close();
130-
} catch (error) {
131-
controller.error(error);
132-
}
133-
},
134-
});
135-
136-
return readableSource.tee();
137-
}
138-
139114
private async initializeServerStream(): Promise<void> {
140115
try {
141116
if (this.aborted) {
@@ -171,7 +146,7 @@ export class StreamsWriterV2<T = any> implements StreamsWriter {
171146
)
172147
.pipeThrough(
173148
new BatchTransform({
174-
lingerDuration: this.flushIntervalMs,
149+
lingerDurationMillis: this.flushIntervalMs,
175150
})
176151
)
177152
.pipeTo(session.writable);

packages/core/src/v3/streams/asyncIterableStream.ts

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -133,3 +133,26 @@ export function ensureAsyncIterable<T>(
133133
},
134134
};
135135
}
136+
137+
export function ensureReadableStream<T>(
138+
input: AsyncIterable<T> | ReadableStream<T>
139+
): ReadableStream<T> {
140+
if ("getReader" in input) {
141+
return input as ReadableStream<T>;
142+
}
143+
144+
return new ReadableStream<T>({
145+
async start(controller) {
146+
const iterator = input[Symbol.asyncIterator]();
147+
148+
while (true) {
149+
const { done, value } = await iterator.next();
150+
if (done) {
151+
break;
152+
}
153+
controller.enqueue(value);
154+
}
155+
controller.close();
156+
},
157+
});
158+
}

pnpm-lock.yaml

Lines changed: 3 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

references/realtime-streams/package.json

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,8 @@
1818
"react": "19.1.0",
1919
"react-dom": "19.1.0",
2020
"shiki": "^3.13.0",
21-
"streamdown": "^1.4.0"
21+
"streamdown": "^1.4.0",
22+
"zod": "3.25.76"
2223
},
2324
"devDependencies": {
2425
"@tailwindcss/postcss": "^4",

references/realtime-streams/src/trigger/ai-chat.ts

Lines changed: 33 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,8 @@
11
import { aiStream } from "@/app/streams";
22
import { openai } from "@ai-sdk/openai";
33
import { logger, streams, task } from "@trigger.dev/sdk";
4-
import { convertToModelMessages, streamText, UIMessage } from "ai";
4+
import { convertToModelMessages, readUIMessageStream, streamText, tool, UIMessage } from "ai";
5+
import { z } from "zod/v4";
56

67
export type AIChatPayload = {
78
messages: UIMessage[];
@@ -19,13 +20,43 @@ export const aiChatTask = task({
1920
model: openai("gpt-4o"),
2021
system: "You are a helpful assistant.",
2122
messages: convertToModelMessages(payload.messages),
23+
tools: {
24+
getCommonUseCases: tool({
25+
description: "Get common use cases",
26+
inputSchema: z.object({
27+
useCase: z.string().describe("The use case to get common use cases for"),
28+
}),
29+
execute: async ({ useCase }) => {
30+
return {
31+
useCase,
32+
commonUseCases: [
33+
"Streaming data to a client",
34+
"Streaming data to a server",
35+
"Streaming data to a database",
36+
"Streaming data to a file",
37+
"Streaming data to a socket",
38+
"Streaming data to a queue",
39+
"Streaming data to a message broker",
40+
"Streaming data to a message queue",
41+
"Streaming data to a message broker",
42+
],
43+
};
44+
},
45+
}),
46+
},
2247
});
2348

2449
// Get the UI message stream
2550
const uiMessageStream = result.toUIMessageStream();
2651

2752
// Append the stream to metadata
28-
const { waitUntilComplete } = aiStream.pipe(uiMessageStream);
53+
const { waitUntilComplete, stream } = aiStream.pipe(uiMessageStream);
54+
55+
for await (const uiMessage of readUIMessageStream({
56+
stream: stream,
57+
})) {
58+
logger.log("Current message state", { uiMessage });
59+
}
2960

3061
// Wait for the stream to complete
3162
await waitUntilComplete();

0 commit comments

Comments
 (0)