Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions .changeset/chat-agent-hardening.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
---
"@trigger.dev/sdk": patch
"@trigger.dev/core": patch
---

Reliability fixes for `chat.agent`. A user message sent while the agent is streaming is no longer delivered twice (which could run a duplicate turn), input appends now carry an idempotency key so a retried send can't duplicate a message, stopping a generation clears the streaming state so a page reload doesn't replay the stopped turn, and runs can now carry the full set of dashboard tags instead of being silently truncated. `onTurnComplete` now fires on errored turns (with the thrown error attached) and the failed turn's user message is persisted so it isn't lost on the next run. Custom agents and manual `chat.writeTurnComplete` callers now trim the output stream, sending a custom action no longer leaves a second stream reader running, and a long-lived `watch` subscription no longer grows its dedupe set without bound.
8 changes: 7 additions & 1 deletion packages/core/src/v3/apiClient/index.ts
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import { nanoid } from "nanoid";
import { z } from "zod";
import { VERSION } from "../../version.js";
import { generateJWT } from "../jwt.js";
Expand Down Expand Up @@ -1276,12 +1277,17 @@ export class ApiClient {
part: TBody,
requestOptions?: ZodFetchOptions
) {
// Generated once per logical append, outside zodfetch, so its internal
// retries reuse the same part id and the server-side dedupe collapses a
// retried POST whose first attempt actually committed. Full-length nanoid
// (~126 bits) to match the browser transport's randomUUID entropy.
const partId = nanoid();
return zodfetch(
AppendToStreamResponseBody,
`${this.baseUrl}/realtime/v1/sessions/${encodeURIComponent(sessionIdOrExternalId)}/${io}/append`,
{
method: "POST",
headers: this.#getHeaders(false),
headers: { ...this.#getHeaders(false), "X-Part-Id": partId },
body: part,
},
mergeRequestOptions(this.defaultRequestOptions, requestOptions)
Expand Down
15 changes: 14 additions & 1 deletion packages/core/src/v3/apiClient/runStream.ts
Original file line number Diff line number Diff line change
Expand Up @@ -371,7 +371,20 @@ export class SSEStreamSubscription implements StreamSubscription {
this.retryCount = 0; // reset on success
armStall();

// Dedup window for record ids. Bounded with FIFO eviction so a
// long-lived `watch: true` subscription (one connection across many
// turns) doesn't grow this set without bound. The window only needs
// to cover the overlap a reconnect/replay can re-deliver, so a few
// thousand ids is ample.
const SEEN_IDS_CAP = 5000;
const seenIds = new Set<string>();
const rememberSeen = (id: string) => {
seenIds.add(id);
if (seenIds.size > SEEN_IDS_CAP) {
const oldest = seenIds.values().next().value;
if (oldest !== undefined) seenIds.delete(oldest);
}
};

const stream = response.body
.pipeThrough(new TextDecoderStream())
Expand Down Expand Up @@ -426,7 +439,7 @@ export class SSEStreamSubscription implements StreamSubscription {
| undefined;
if (parsedBody?.id) {
if (seenIds.has(parsedBody.id)) continue;
seenIds.add(parsedBody.id);
rememberSeen(parsedBody.id);
}
chunkController.enqueue({
id: record.seq_num.toString(),
Expand Down
2 changes: 1 addition & 1 deletion packages/core/src/v3/sessionStreams/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ export class SessionStreamsAPI implements SessionStreamManager {
public on(
sessionId: string,
io: SessionChannelIO,
handler: (data: unknown) => void | Promise<void>
handler: (data: unknown) => void | boolean | Promise<void>
): { off: () => void } {
return this.#getManager().on(sessionId, io, handler);
}
Expand Down
93 changes: 62 additions & 31 deletions packages/core/src/v3/sessionStreams/manager.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,11 @@ import { computeReconnectDelayMs } from "../utils/reconnectBackoff.js";
import { SessionChannelIO, SessionStreamManager } from "./types.js";
import { controlSubtype } from "./wireProtocol.js";

type SessionStreamHandler = (data: unknown) => void | Promise<void>;
// A handler that synchronously returns `true` CONSUMES the record: it is
// not buffered for a later `once()` and the committed-consume cursor
// advances past it. Anything else (void, a Promise) leaves the record
// available to other consumers. See `SessionStreamManager.on` in types.ts.
type SessionStreamHandler = (data: unknown) => void | boolean | Promise<void>;

type OnceWaiter = {
resolve: (result: InputStreamOnceResult<unknown>) => void;
Expand Down Expand Up @@ -113,30 +117,41 @@ export class StandardSessionStreamManager implements SessionStreamManager {
this.explicitlyDisconnected.delete(key);
this.#ensureTailConnected(sessionId, io);

// Selective drain: offer each buffered record to the new handler and
// remove ONLY the ones it consumed (returned `true` — e.g. the
// messages facade for message-kind records). Consumed records advance
// the committed-consume cursor, so a worker using `messagesInput.on()`
// for user-message delivery persists a `.in` cursor that matches what
// the handler processed. Records the handler did not consume (other
// kinds) STAY buffered for a future `once()` or a different handler —
// a blind drain here either swallowed them (delivered to a handler
// that filtered them out, then deleted) or re-delivered already-
// processed messages into every newly attached per-turn handler,
// duplicating turns.
const buffered = this.buffer.get(key);
if (buffered && buffered.length > 0) {
for (const data of buffered) {
this.#invokeHandler(handler, data);
}
// Advance the committed-consume cursor to the highest seq drained
// into the new handler. `on()`-drain removes the records from the
// buffer, so they're no longer available to a future `once()` —
// from the manager's perspective they've been consumed. Without
// this, a worker that uses `messagesInput.on()` for user-message
// delivery (pendingMessages mode) would persist a `.in` cursor
// that lags behind the records the handler already processed, and
// the next boot would re-deliver them.
const seqList = this.bufferSeqNums.get(key);
if (seqList) {
for (const s of seqList) {
const seqList = this.bufferSeqNums.get(key) ?? [];
const keptRecords: unknown[] = [];
// Kept in lock-step with `keptRecords` — drifting lengths would map
// seq_nums to the wrong records on subsequent shifts.
const keptSeqNums: Array<number | undefined> = [];
for (let i = 0; i < buffered.length; i++) {
const consumed = this.#invokeHandler(handler, buffered[i]);
if (consumed) {
const s = seqList[i];
if (s !== undefined) this.#advanceLastDispatched(key, s);
} else {
keptRecords.push(buffered[i]);
keptSeqNums.push(seqList[i]);
}
}
this.buffer.delete(key);
// Keep `bufferSeqNums` in lock-step with `buffer` — without this,
// the parallel array desyncs and the next `#dispatch` that buffers
// a record would shift a stale seqNum into `lastDispatchedSeqNum`.
this.bufferSeqNums.delete(key);
if (keptRecords.length > 0) {
this.buffer.set(key, keptRecords);
this.bufferSeqNums.set(key, keptSeqNums);
} else {
this.buffer.delete(key);
this.bufferSeqNums.delete(key);
}
}

return {
Expand Down Expand Up @@ -509,13 +524,21 @@ export class StandardSessionStreamManager implements SessionStreamManager {
return;
}

// Persistent handlers (e.g. `stopInput.on(...)`) get a copy of the chunk,
// but they don't "consume" it — handlers usually filter by `kind` and
// ignore chunks they don't care about. Buffer the chunk regardless so a
// subsequent `once()` (e.g. `messagesInput.waitWithIdleTimeout` in
// chat.agent's preload) can still pick up the same chunk that arrived
// before its waiter was registered.
this.#invokeHandlers(key, data);
// Persistent handlers get a copy of the chunk. A handler that
// synchronously returns `true` CONSUMES it (e.g. the messages facade
// for message-kind records): the record must not also be buffered, or
// the next `on()` attach / `once()` would deliver it a second time —
// in chat.agent's turn loop that duplicated user messages into a
// second turn. Records no handler consumed (e.g. a message arriving
// while only the stop facade is attached during preload) are buffered
// so a subsequent `once()` can still pick them up.
const consumed = this.#invokeHandlers(key, data);
if (consumed) {
if (seqNum !== undefined) {
this.#advanceLastDispatched(key, seqNum);
}
return;
}

let buffered = this.buffer.get(key);
if (!buffered) {
Expand All @@ -535,17 +558,24 @@ export class StandardSessionStreamManager implements SessionStreamManager {
bufferedSeqs.push(seqNum);
}

#invokeHandlers(key: string, data: unknown): void {
/** Returns true when any handler consumed the record. All handlers are invoked regardless. */
#invokeHandlers(key: string, data: unknown): boolean {
const handlers = this.handlers.get(key);
if (!handlers) return;
if (!handlers) return false;
let consumed = false;
for (const handler of handlers) {
this.#invokeHandler(handler, data);
if (this.#invokeHandler(handler, data)) {
consumed = true;
}
}
return consumed;
}

#invokeHandler(handler: SessionStreamHandler, data: unknown): void {
/** Returns true when the handler synchronously consumed the record (returned `true`). */
#invokeHandler(handler: SessionStreamHandler, data: unknown): boolean {
try {
const result = handler(data);
if (result === true) return true;
if (result && typeof result === "object" && "catch" in result) {
(result as Promise<void>).catch((error) => {
if (this.debug) {
Expand All @@ -558,6 +588,7 @@ export class StandardSessionStreamManager implements SessionStreamManager {
console.error("[SessionStreamManager] Handler error:", error);
}
}
return false;
}

#removeOnceWaiter(key: string, waiter: OnceWaiter): void {
Expand Down
2 changes: 1 addition & 1 deletion packages/core/src/v3/sessionStreams/noopManager.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ export class NoopSessionStreamManager implements SessionStreamManager {
on(
_sessionId: string,
_io: SessionChannelIO,
_handler: (data: unknown) => void | Promise<void>
_handler: (data: unknown) => void | boolean | Promise<void>
): { off: () => void } {
return { off: () => {} };
}
Expand Down
13 changes: 11 additions & 2 deletions packages/core/src/v3/sessionStreams/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -22,11 +22,20 @@ export type SessionChannelIO = "out" | "in";
* `.on` / `.once` / `.peek` / `.wait` / `.waitWithIdleTimeout`.
*/
export interface SessionStreamManager {
/** Register a handler that fires every time data arrives on the given channel. */
/**
* Register a handler that fires every time data arrives on the given channel.
*
* A handler that synchronously returns `true` CONSUMES the record: it is
* not buffered for a later `once()` and the committed-consume cursor
* advances past it. Any other return value (including a Promise) leaves
* the record available to other consumers. Kind-filtering facades return
* `true` for the kinds they own so the same record is never delivered
* twice — once to the handler and again via a buffer drain.
*/
on(
sessionId: string,
io: SessionChannelIO,
handler: (data: unknown) => void | Promise<void>
handler: (data: unknown) => void | boolean | Promise<void>
): { off: () => void };

/** Wait for the next record on the given channel (buffered or live). */
Expand Down
Loading
Loading