Skip to content

Commit 076c32b

Browse files
committed
Use a single run with iterative waitpoint token completions
1 parent 01dc75a commit 076c32b

File tree

8 files changed

+652
-44
lines changed

8 files changed

+652
-44
lines changed

packages/trigger-sdk/package.json

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -83,7 +83,7 @@
8383
},
8484
"peerDependencies": {
8585
"zod": "^3.0.0 || ^4.0.0",
86-
"ai": "^4.2.0 || ^5.0.0 || ^6.0.0"
86+
"ai": "^5.0.0 || ^6.0.0"
8787
},
8888
"peerDependenciesMeta": {
8989
"ai": {

packages/trigger-sdk/src/v3/ai.ts

Lines changed: 94 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -4,15 +4,18 @@ import {
44
Task,
55
type inferSchemaIn,
66
type PipeStreamOptions,
7+
type TaskIdentifier,
78
type TaskOptions,
89
type TaskSchema,
910
type TaskWithSchema,
1011
} from "@trigger.dev/core/v3";
1112
import type { UIMessage } from "ai";
1213
import { dynamicTool, jsonSchema, JSONSchema7, Schema, Tool, ToolCallOptions, zodSchema } from "ai";
14+
import { auth } from "./auth.js";
1315
import { metadata } from "./metadata.js";
1416
import { streams } from "./streams.js";
1517
import { createTask } from "./shared.js";
18+
import { wait } from "./wait.js";
1619

1720
const METADATA_KEY = "tool.execute.options";
1821

@@ -122,6 +125,29 @@ export const ai = {
122125
currentToolOptions: getToolOptionsFromMetadata,
123126
};
124127

128+
/**
129+
* Creates a public access token for a chat task.
130+
*
131+
* This is a convenience helper that creates a multi-use trigger public token
132+
* scoped to the given task. Use it in a server action to provide the frontend
133+
* `TriggerChatTransport` with an `accessToken`.
134+
*
135+
* @example
136+
* ```ts
137+
* // actions.ts
138+
* "use server";
139+
* import { createChatAccessToken } from "@trigger.dev/sdk/ai";
140+
* import type { chat } from "@/trigger/chat";
141+
*
142+
* export const getChatToken = () => createChatAccessToken<typeof chat>("ai-chat");
143+
* ```
144+
*/
145+
export async function createChatAccessToken<TTask extends AnyTask>(
146+
taskId: TaskIdentifier<TTask>
147+
): Promise<string> {
148+
return auth.createTriggerPublicToken(taskId as string, { multipleUse: true });
149+
}
150+
125151
// ---------------------------------------------------------------------------
126152
// Chat transport helpers — backend side
127153
// ---------------------------------------------------------------------------
@@ -161,6 +187,14 @@ export type ChatTaskPayload<TMessage extends UIMessage = UIMessage> = {
161187
metadata?: unknown;
162188
};
163189

190+
/**
191+
* Tracks how many times `pipeChat` has been called in the current `chatTask` run.
192+
* Used to prevent double-piping when a user both calls `pipeChat()` manually
193+
* and returns a streamable from their `run` function.
194+
* @internal
195+
*/
196+
let _chatPipeCount = 0;
197+
164198
/**
165199
* Options for `pipeChat`.
166200
*/
@@ -248,6 +282,7 @@ export async function pipeChat(
248282
source: UIMessageStreamable | AsyncIterable<unknown> | ReadableStream<unknown>,
249283
options?: PipeChatOptions
250284
): Promise<void> {
285+
_chatPipeCount++;
251286
const streamKey = options?.streamKey ?? CHAT_STREAM_KEY;
252287

253288
let stream: AsyncIterable<unknown> | ReadableStream<unknown>;
@@ -284,6 +319,11 @@ export async function pipeChat(
284319
* **Auto-piping:** If the `run` function returns a value with `.toUIMessageStream()`
285320
* (like a `StreamTextResult`), the stream is automatically piped to the frontend.
286321
* For complex flows, use `pipeChat()` manually from anywhere in your code.
322+
*
323+
* **Single-run mode:** By default, the task runs a waitpoint loop so that the
324+
* entire conversation lives inside one run. After each AI response, the task
325+
* emits a control chunk and pauses via `wait.forToken`. The frontend transport
326+
* resumes the same run by completing the token with the next set of messages.
287327
*/
288328
export type ChatTaskOptions<TIdentifier extends string> = Omit<
289329
TaskOptions<TIdentifier, ChatTaskPayload, unknown>,
@@ -299,6 +339,23 @@ export type ChatTaskOptions<TIdentifier extends string> = Omit<
299339
* the stream is automatically piped to the frontend.
300340
*/
301341
run: (payload: ChatTaskPayload) => Promise<unknown>;
342+
343+
/**
344+
* Maximum number of conversational turns (message round-trips) a single run
345+
* will handle before ending. After this many turns the run completes
346+
* normally and the next message will start a fresh run.
347+
*
348+
* @default 100
349+
*/
350+
maxTurns?: number;
351+
352+
/**
353+
* How long to wait for the next message before timing out and ending the run.
354+
* Accepts any duration string recognised by `wait.createToken` (e.g. `"1h"`, `"30m"`).
355+
*
356+
* @default "1h"
357+
*/
358+
turnTimeout?: string;
302359
};
303360

304361
/**
@@ -342,19 +399,49 @@ export type ChatTaskOptions<TIdentifier extends string> = Omit<
342399
export function chatTask<TIdentifier extends string>(
343400
options: ChatTaskOptions<TIdentifier>
344401
): Task<TIdentifier, ChatTaskPayload, unknown> {
345-
const { run: userRun, ...restOptions } = options;
402+
const { run: userRun, maxTurns = 100, turnTimeout = "1h", ...restOptions } = options;
346403

347404
return createTask<TIdentifier, ChatTaskPayload, unknown>({
348405
...restOptions,
349406
run: async (payload: ChatTaskPayload) => {
350-
const result = await userRun(payload);
407+
let currentPayload = payload;
408+
409+
for (let turn = 0; turn < maxTurns; turn++) {
410+
_chatPipeCount = 0;
411+
412+
const result = await userRun(currentPayload);
413+
414+
// Auto-pipe if the run function returned a StreamTextResult or similar,
415+
// but only if pipeChat() wasn't already called manually during this turn
416+
if (_chatPipeCount === 0 && isUIMessageStreamable(result)) {
417+
await pipeChat(result);
418+
}
419+
420+
// Create a waitpoint token and emit a control chunk so the frontend
421+
// knows to resume this run instead of triggering a new one.
422+
const token = await wait.createToken({ timeout: turnTimeout });
423+
424+
const { waitUntilComplete } = streams.writer(CHAT_STREAM_KEY, {
425+
execute: ({ write }) => {
426+
write({
427+
type: "__trigger_waitpoint_ready",
428+
tokenId: token.id,
429+
publicAccessToken: token.publicAccessToken,
430+
});
431+
},
432+
});
433+
await waitUntilComplete();
351434

352-
// Auto-pipe if the run function returned a StreamTextResult or similar
353-
if (isUIMessageStreamable(result)) {
354-
await pipeChat(result);
355-
}
435+
// Pause until the frontend completes the token with the next message
436+
const next = await wait.forToken<ChatTaskPayload>(token);
437+
438+
if (!next.ok) {
439+
// Timed out waiting for the next message — end the conversation
440+
return;
441+
}
356442

357-
return result;
443+
currentPayload = next.output;
444+
}
358445
},
359446
});
360447
}

0 commit comments

Comments
 (0)