Skip to content

Commit 0415ccb

Browse files
committed
feat(streams): add inputStream.waitWithWarmup(), warm timeout config in sidebar, preload payload option
1 parent 49d41c4 commit 0415ccb

File tree

6 files changed

+103
-51
lines changed

6 files changed

+103
-51
lines changed

packages/core/src/v3/realtimeStreams/types.ts

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -193,6 +193,14 @@ export type RealtimeDefinedInputStream<TData> = {
193193
* Uses a waitpoint token internally. Can only be called inside a task.run().
194194
*/
195195
wait: (options?: InputStreamWaitOptions) => ManualWaitpointPromise<TData>;
196+
/**
197+
* Wait for data with a warm phase before suspending.
198+
*
199+
* Keeps the task warm (active, using compute) for `warmTimeoutInSeconds`,
200+
* then suspends via `.wait()` if no data arrives. If data arrives during
201+
* the warm phase the task responds instantly without suspending.
202+
*/
203+
waitWithWarmup: (options: InputStreamWaitWithWarmupOptions) => Promise<{ ok: true; output: TData } | { ok: false; error?: any }>;
196204
/**
197205
* Send data to this input stream on a specific run.
198206
* This is used from outside the task (e.g., from your backend or another task).
@@ -249,6 +257,15 @@ export type InputStreamWaitOptions = {
249257
spanName?: string;
250258
};
251259

260+
export type InputStreamWaitWithWarmupOptions = {
261+
/** Seconds to keep the task warm before suspending. */
262+
warmTimeoutInSeconds: number;
263+
/** Maximum time to wait after suspending (duration string, e.g. "1h"). */
264+
timeout?: string;
265+
/** Override the default span name for the outer operation. */
266+
spanName?: string;
267+
};
268+
252269
export type InferInputStreamType<T> = T extends RealtimeDefinedInputStream<infer TData>
253270
? TData
254271
: unknown;

packages/trigger-sdk/src/v3/ai.ts

Lines changed: 19 additions & 47 deletions
Original file line numberDiff line numberDiff line change
@@ -320,6 +320,8 @@ export type ChatTaskWirePayload<TMessage extends UIMessage = UIMessage, TMetadat
320320
continuation?: boolean;
321321
/** The run ID of the previous run (only set when `continuation` is true). */
322322
previousRunId?: string;
323+
/** Override warm timeout for this run (seconds). Set by transport.preload(). */
324+
warmTimeoutInSeconds?: number;
323325
};
324326

325327
/**
@@ -941,40 +943,26 @@ function chatTask<
941943

942944
// Wait for the first real message — use preload-specific timeouts if configured
943945
const effectivePreloadWarmTimeout =
944-
(metadata.get(WARM_TIMEOUT_METADATA_KEY) as number | undefined)
946+
payload.warmTimeoutInSeconds
945947
?? preloadWarmTimeoutInSeconds
946948
?? warmTimeoutInSeconds;
947949

948-
let firstMessage: ChatTaskWirePayload | undefined;
950+
const effectivePreloadTimeout =
951+
(metadata.get(TURN_TIMEOUT_METADATA_KEY) as string | undefined)
952+
?? preloadTimeout
953+
?? turnTimeout;
949954

950-
if (effectivePreloadWarmTimeout > 0) {
951-
const warm = await messagesInput.once({
952-
timeoutMs: effectivePreloadWarmTimeout * 1000,
953-
spanName: "preload wait (warm)",
954-
});
955+
const preloadResult = await messagesInput.waitWithWarmup({
956+
warmTimeoutInSeconds: effectivePreloadWarmTimeout,
957+
timeout: effectivePreloadTimeout,
958+
spanName: "waiting for first message",
959+
});
955960

956-
if (warm.ok) {
957-
firstMessage = warm.output;
958-
}
961+
if (!preloadResult.ok) {
962+
return; // Timed out waiting for first message — end run
959963
}
960964

961-
if (!firstMessage) {
962-
const effectivePreloadTimeout =
963-
(metadata.get(TURN_TIMEOUT_METADATA_KEY) as string | undefined)
964-
?? preloadTimeout
965-
?? turnTimeout;
966-
967-
const suspended = await messagesInput.wait({
968-
timeout: effectivePreloadTimeout,
969-
spanName: "preload wait (suspended)",
970-
});
971-
972-
if (!suspended.ok) {
973-
return; // Timed out waiting for first message — end run
974-
}
975-
976-
firstMessage = suspended.output;
977-
}
965+
let firstMessage = preloadResult.output;
978966

979967
currentWirePayload = firstMessage;
980968
}
@@ -1335,35 +1323,19 @@ function chatTask<
13351323
return "continue";
13361324
}
13371325

1338-
// Phase 1: Keep the run warm for quick response to the next message.
1339-
// The run stays active (using compute) during this window.
1326+
// Wait for the next message — stay warm briefly, then suspend
13401327
const effectiveWarmTimeout =
13411328
(metadata.get(WARM_TIMEOUT_METADATA_KEY) as number | undefined) ?? warmTimeoutInSeconds;
1342-
1343-
if (effectiveWarmTimeout > 0) {
1344-
const warm = await messagesInput.once({
1345-
timeoutMs: effectiveWarmTimeout * 1000,
1346-
spanName: "waiting (warm)",
1347-
});
1348-
1349-
if (warm.ok) {
1350-
// Message arrived while warm — respond instantly
1351-
currentWirePayload = warm.output;
1352-
return "continue";
1353-
}
1354-
}
1355-
1356-
// Phase 2: Suspend the task (frees compute) until the next message arrives
13571329
const effectiveTurnTimeout =
13581330
(metadata.get(TURN_TIMEOUT_METADATA_KEY) as string | undefined) ?? turnTimeout;
13591331

1360-
const next = await messagesInput.wait({
1332+
const next = await messagesInput.waitWithWarmup({
1333+
warmTimeoutInSeconds: effectiveWarmTimeout,
13611334
timeout: effectiveTurnTimeout,
1362-
spanName: "waiting (suspended)",
1335+
spanName: "waiting for next message",
13631336
});
13641337

13651338
if (!next.ok) {
1366-
// Timed out waiting for the next message — end the conversation
13671339
return "exit";
13681340
}
13691341

packages/trigger-sdk/src/v3/chat.ts

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -464,7 +464,7 @@ export class TriggerChatTransport implements ChatTransport<UIMessage> {
464464
*
465465
* No-op if a session already exists for this chatId.
466466
*/
467-
async preload(chatId: string): Promise<void> {
467+
async preload(chatId: string, options?: { warmTimeoutInSeconds?: number }): Promise<void> {
468468
// Don't preload if session already exists
469469
if (this.sessions.get(chatId)?.runId) return;
470470

@@ -473,6 +473,9 @@ export class TriggerChatTransport implements ChatTransport<UIMessage> {
473473
chatId,
474474
trigger: "preload" as const,
475475
metadata: this.defaultMetadata,
476+
...(options?.warmTimeoutInSeconds !== undefined
477+
? { warmTimeoutInSeconds: options.warmTimeoutInSeconds }
478+
: {}),
476479
};
477480

478481
const currentToken = await this.resolveAccessToken();

packages/trigger-sdk/src/v3/streams.ts

Lines changed: 42 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@ import {
2525
InputStreamOncePromise,
2626
type InputStreamOnceResult,
2727
type InputStreamWaitOptions,
28+
type InputStreamWaitWithWarmupOptions,
2829
type SendInputStreamOptions,
2930
type InferInputStreamType,
3031
type StreamWriteResult,
@@ -767,6 +768,7 @@ function input<TData>(opts: { id: string }): RealtimeDefinedInputStream<TData> {
767768
const result = await tracer.startActiveSpan(
768769
options?.spanName ?? `inputStream.wait()`,
769770
async (span) => {
771+
770772
// 1. Block the run on the waitpoint
771773
const waitResponse = await apiClient.waitForWaitpointToken({
772774
runFriendlyId: ctx.run.id,
@@ -786,7 +788,7 @@ function input<TData>(opts: { id: string }): RealtimeDefinedInputStream<TData> {
786788
// 3. Suspend the task
787789
const waitResult = await runtime.waitUntil(response.waitpointId);
788790

789-
// 3. Parse the output
791+
// 4. Parse the output
790792
const data =
791793
waitResult.output !== undefined
792794
? await conditionallyImportAndParsePacket(
@@ -840,6 +842,45 @@ function input<TData>(opts: { id: string }): RealtimeDefinedInputStream<TData> {
840842
}
841843
});
842844
},
845+
async waitWithWarmup(options) {
846+
const self = this;
847+
const spanName = options.spanName ?? `inputStream.waitWithWarmup()`;
848+
849+
return tracer.startActiveSpan(
850+
spanName,
851+
async (span) => {
852+
// Warm phase: keep compute alive
853+
if (options.warmTimeoutInSeconds > 0) {
854+
const warm = await inputStreams.once(opts.id, {
855+
timeoutMs: options.warmTimeoutInSeconds * 1000,
856+
});
857+
if (warm.ok) {
858+
span.setAttribute("wait.resolved", "warm");
859+
return { ok: true as const, output: warm.output as TData };
860+
}
861+
}
862+
863+
// Cold phase: suspend via .wait() — creates a child span
864+
span.setAttribute("wait.resolved", "suspended");
865+
const waitResult = await self.wait({
866+
timeout: options.timeout,
867+
spanName: "suspended",
868+
});
869+
870+
return waitResult;
871+
},
872+
{
873+
attributes: {
874+
[SemanticInternalAttributes.STYLE_ICON]: "streams",
875+
streamId: opts.id,
876+
...accessoryAttributes({
877+
items: [{ text: opts.id, variant: "normal" }],
878+
style: "codepath",
879+
}),
880+
},
881+
}
882+
);
883+
},
843884
async send(runId, data, options) {
844885
return tracer.startActiveSpan(
845886
`inputStream.send()`,

references/ai-chat/src/components/chat-app.tsx

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,7 @@ export function ChatApp({
5252
// Model for new chats (before first message is sent)
5353
const [newChatModel, setNewChatModel] = useState(DEFAULT_MODEL);
5454
const [preloadEnabled, setPreloadEnabled] = useState(true);
55+
const [warmTimeoutInSeconds, setWarmTimeoutInSeconds] = useState(60);
5556

5657
const handleSessionChange = useCallback(
5758
(chatId: string, session: SessionInfo | null) => {
@@ -101,7 +102,7 @@ export function ChatApp({
101102
setNewChatModel(DEFAULT_MODEL);
102103
if (preloadEnabled) {
103104
// Eagerly start the run — onPreload fires immediately for initialization
104-
transport.preload(id);
105+
transport.preload(id, { warmTimeoutInSeconds });
105106
}
106107
}
107108

@@ -154,6 +155,8 @@ export function ChatApp({
154155
onDeleteChat={handleDeleteChat}
155156
preloadEnabled={preloadEnabled}
156157
onPreloadChange={setPreloadEnabled}
158+
warmTimeoutInSeconds={warmTimeoutInSeconds}
159+
onWarmTimeoutChange={setWarmTimeoutInSeconds}
157160
/>
158161
<div className="flex-1">
159162
{activeChatId ? (

references/ai-chat/src/components/chat-sidebar.tsx

Lines changed: 17 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,8 @@ type ChatSidebarProps = {
2626
onDeleteChat: (id: string) => void;
2727
preloadEnabled: boolean;
2828
onPreloadChange: (enabled: boolean) => void;
29+
warmTimeoutInSeconds: number;
30+
onWarmTimeoutChange: (seconds: number) => void;
2931
};
3032

3133
export function ChatSidebar({
@@ -36,6 +38,8 @@ export function ChatSidebar({
3638
onDeleteChat,
3739
preloadEnabled,
3840
onPreloadChange,
41+
warmTimeoutInSeconds,
42+
onWarmTimeoutChange,
3943
}: ChatSidebarProps) {
4044
const sorted = [...chats].sort((a, b) => b.updatedAt - a.updatedAt);
4145

@@ -82,7 +86,7 @@ export function ChatSidebar({
8286
))}
8387
</div>
8488

85-
<div className="shrink-0 border-t border-gray-200 px-3 py-2.5">
89+
<div className="shrink-0 border-t border-gray-200 px-3 py-2.5 space-y-2">
8690
<label className="flex items-center gap-2 text-xs text-gray-500 cursor-pointer select-none">
8791
<input
8892
type="checkbox"
@@ -92,6 +96,18 @@ export function ChatSidebar({
9296
/>
9397
Preload new chats
9498
</label>
99+
<div className="flex items-center gap-2 text-xs text-gray-500">
100+
<span className="shrink-0">Warm timeout</span>
101+
<input
102+
type="number"
103+
min={0}
104+
step={5}
105+
value={warmTimeoutInSeconds}
106+
onChange={(e) => onWarmTimeoutChange(Number(e.target.value))}
107+
className="w-16 rounded border border-gray-300 px-1.5 py-0.5 text-xs text-gray-600 outline-none focus:border-blue-500"
108+
/>
109+
<span>s</span>
110+
</div>
95111
</div>
96112
</div>
97113
);

0 commit comments

Comments
 (0)