Skip to content

Commit 4cadc19

Browse files
committed
feat(webapp,core): Session channel waitpoints — server side
Give Session channels run-engine waitpoint semantics so a task can suspend while idle on a session channel and resume when an external client sends a record — parallel to what streams.input offers run-scoped streams. Webapp - POST /api/v1/runs/:runFriendlyId/session-streams/wait — creates a manual waitpoint attached to {sessionId, io} and race-checks the S2 stream starting at lastSeqNum so pre-arrived data fires it immediately. Mirrors the existing input-stream waitpoint route. - sessionStreamWaitpointCache.server.ts — Redis set keyed on {sessionFriendlyId, io}, drained atomically on each append so concurrent multi-tab waiters all wake together. - realtime.v1.sessions.$session.$io.append now drains pending waitpoints after every record lands and completes each with the appended body. - S2RealtimeStreams.readSessionStreamRecords — session-channel parallel of readRecords, feeds the race-check path. Core - CreateSessionStreamWaitpoint request/response schemas alongside the existing Session CRUD schemas. Server API contract only — the client ApiClient + SDK wrapper ship on the AI-chat branch.
1 parent 95f3c00 commit 4cadc19

7 files changed

Lines changed: 373 additions & 2 deletions

File tree

.changeset/session-primitive.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,4 +2,4 @@
22
"@trigger.dev/core": patch
33
---
44

5-
Add `SessionId` friendly ID generator and schemas for the new durable Session primitive. Exported from `@trigger.dev/core/v3/isomorphic` alongside `RunId`, `BatchId`, etc.
5+
Add `SessionId` friendly ID generator and schemas for the new durable Session primitive. Exported from `@trigger.dev/core/v3/isomorphic` alongside `RunId`, `BatchId`, etc. Ships the `CreateSessionStreamWaitpoint` request/response schemas alongside the main Session CRUD.

.server-changes/session-primitive.md

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,3 +4,5 @@ type: feature
44
---
55

66
Add `Session` primitive — a durable, typed, bidirectional I/O primitive that outlives a single run, intended for agent/chat use cases. Ships the Postgres schema (`Session` table), control-plane CRUD routes (`POST/GET/PATCH /api/v1/sessions`, `POST /api/v1/sessions/:session/close` — polymorphic on friendlyId or externalId), `sessions` JWT scope, ClickHouse `sessions_v1` table, and `SessionsReplicationService` (logical replication from Postgres `Session` → ClickHouse `sessions_v1`). Run-scoped realtime streams (`streams.pipe`/`streams.input`) are unchanged and do **not** create Session rows.
7+
8+
Adds `POST /api/v1/runs/:runFriendlyId/session-streams/wait` (session-stream waitpoint creation) and wires `POST /realtime/v1/sessions/:session/:io/append` to fire any pending waitpoints on the channel. Gives `session.in` run-engine waitpoint semantics matching run-scoped input streams: a task can suspend while idle on a session channel and resume when an external client sends a record. Redis-backed pending-waitpoint set (`ssw:{sessionFriendlyId}:{io}`) is drained atomically on each append so multiple concurrent waiters (e.g. multi-tab chat) all resume together.
Lines changed: 165 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,165 @@
1+
import { json } from "@remix-run/server-runtime";
2+
import {
3+
CreateSessionStreamWaitpointRequestBody,
4+
type CreateSessionStreamWaitpointResponseBody,
5+
} from "@trigger.dev/core/v3";
6+
import { WaitpointId } from "@trigger.dev/core/v3/isomorphic";
7+
import { z } from "zod";
8+
import { $replica } from "~/db.server";
9+
import { createWaitpointTag, MAX_TAGS_PER_WAITPOINT } from "~/models/waitpointTag.server";
10+
import { resolveSessionByIdOrExternalId } from "~/services/realtime/sessions.server";
11+
import { S2RealtimeStreams } from "~/services/realtime/s2realtimeStreams.server";
12+
import { getRealtimeStreamInstance } from "~/services/realtime/v1StreamsGlobal.server";
13+
import {
14+
addSessionStreamWaitpoint,
15+
removeSessionStreamWaitpoint,
16+
} from "~/services/sessionStreamWaitpointCache.server";
17+
import { createActionApiRoute } from "~/services/routeBuilders/apiBuilder.server";
18+
import { parseDelay } from "~/utils/delays";
19+
import { resolveIdempotencyKeyTTL } from "~/utils/idempotencyKeys.server";
20+
import { engine } from "~/v3/runEngine.server";
21+
import { ServiceValidationError } from "~/v3/services/baseService.server";
22+
23+
const ParamsSchema = z.object({
24+
runFriendlyId: z.string(),
25+
});
26+
27+
const { action, loader } = createActionApiRoute(
28+
{
29+
params: ParamsSchema,
30+
body: CreateSessionStreamWaitpointRequestBody,
31+
maxContentLength: 1024 * 10, // 10KB
32+
method: "POST",
33+
},
34+
async ({ authentication, body, params }) => {
35+
try {
36+
const run = await $replica.taskRun.findFirst({
37+
where: {
38+
friendlyId: params.runFriendlyId,
39+
runtimeEnvironmentId: authentication.environment.id,
40+
},
41+
select: {
42+
id: true,
43+
friendlyId: true,
44+
realtimeStreamsVersion: true,
45+
},
46+
});
47+
48+
if (!run) {
49+
return json({ error: "Run not found" }, { status: 404 });
50+
}
51+
52+
const session = await resolveSessionByIdOrExternalId(
53+
$replica,
54+
authentication.environment.id,
55+
body.session
56+
);
57+
58+
if (!session) {
59+
return json({ error: "Session not found" }, { status: 404 });
60+
}
61+
62+
const idempotencyKeyExpiresAt = body.idempotencyKeyTTL
63+
? resolveIdempotencyKeyTTL(body.idempotencyKeyTTL)
64+
: undefined;
65+
66+
const timeout = await parseDelay(body.timeout);
67+
68+
const bodyTags = typeof body.tags === "string" ? [body.tags] : body.tags;
69+
70+
if (bodyTags && bodyTags.length > MAX_TAGS_PER_WAITPOINT) {
71+
throw new ServiceValidationError(
72+
`Waitpoints can only have ${MAX_TAGS_PER_WAITPOINT} tags, you're trying to set ${bodyTags.length}.`
73+
);
74+
}
75+
76+
if (bodyTags && bodyTags.length > 0) {
77+
for (const tag of bodyTags) {
78+
await createWaitpointTag({
79+
tag,
80+
environmentId: authentication.environment.id,
81+
projectId: authentication.environment.projectId,
82+
});
83+
}
84+
}
85+
86+
// Step 1: Create the waitpoint.
87+
const result = await engine.createManualWaitpoint({
88+
environmentId: authentication.environment.id,
89+
projectId: authentication.environment.projectId,
90+
idempotencyKey: body.idempotencyKey,
91+
idempotencyKeyExpiresAt,
92+
timeout,
93+
tags: bodyTags,
94+
});
95+
96+
// Step 2: Register the waitpoint on the session channel so the next
97+
// append fires it. Keyed by (sessionFriendlyId, io) — both runs on a
98+
// multi-tab session wake on the same record.
99+
const ttlMs = timeout ? timeout.getTime() - Date.now() : undefined;
100+
await addSessionStreamWaitpoint(
101+
session.friendlyId,
102+
body.io,
103+
result.waitpoint.id,
104+
ttlMs && ttlMs > 0 ? ttlMs : undefined
105+
);
106+
107+
// Step 3: Race-check. If a record landed on the channel before this
108+
// .wait() call, complete the waitpoint synchronously with that data
109+
// and remove the pending registration.
110+
if (!result.isCached) {
111+
try {
112+
const realtimeStream = getRealtimeStreamInstance(
113+
authentication.environment,
114+
run.realtimeStreamsVersion
115+
);
116+
117+
if (realtimeStream instanceof S2RealtimeStreams) {
118+
const records = await realtimeStream.readSessionStreamRecords(
119+
session.friendlyId,
120+
body.io,
121+
body.lastSeqNum
122+
);
123+
124+
if (records.length > 0) {
125+
const record = records[0]!;
126+
127+
await engine.completeWaitpoint({
128+
id: result.waitpoint.id,
129+
output: {
130+
value: record.data,
131+
type: "application/json",
132+
isError: false,
133+
},
134+
});
135+
136+
await removeSessionStreamWaitpoint(
137+
session.friendlyId,
138+
body.io,
139+
result.waitpoint.id
140+
);
141+
}
142+
}
143+
} catch {
144+
// Non-fatal: pending registration stays in Redis; the next append
145+
// will complete the waitpoint via the append handler path.
146+
}
147+
}
148+
149+
return json<CreateSessionStreamWaitpointResponseBody>({
150+
waitpointId: WaitpointId.toFriendlyId(result.waitpoint.id),
151+
isCached: result.isCached,
152+
});
153+
} catch (error) {
154+
if (error instanceof ServiceValidationError) {
155+
return json({ error: error.message }, { status: 422 });
156+
} else if (error instanceof Error) {
157+
return json({ error: error.message }, { status: 500 });
158+
}
159+
160+
return json({ error: "Something went wrong" }, { status: 500 });
161+
}
162+
}
163+
);
164+
165+
export { action, loader };

apps/webapp/app/routes/realtime.v1.sessions.$session.$io.append.ts

Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,10 +3,13 @@ import { tryCatch } from "@trigger.dev/core/utils";
33
import { nanoid } from "nanoid";
44
import { z } from "zod";
55
import { $replica } from "~/db.server";
6+
import { logger } from "~/services/logger.server";
67
import { S2RealtimeStreams } from "~/services/realtime/s2realtimeStreams.server";
78
import { resolveSessionByIdOrExternalId } from "~/services/realtime/sessions.server";
89
import { getRealtimeStreamInstance } from "~/services/realtime/v1StreamsGlobal.server";
10+
import { drainSessionStreamWaitpoints } from "~/services/sessionStreamWaitpointCache.server";
911
import { createActionApiRoute } from "~/services/routeBuilders/apiBuilder.server";
12+
import { engine } from "~/v3/runEngine.server";
1013
import { ServiceValidationError } from "~/v3/services/common.server";
1114

1215
const ParamsSchema = z.object({
@@ -81,6 +84,43 @@ const { action } = createActionApiRoute(
8184
return json({ ok: false, error: appendError.message }, { status: 500 });
8285
}
8386

87+
// Fire any run-scoped waitpoints registered against this channel. Best
88+
// effort — a failure here must not fail the append (the record is
89+
// durable in S2; the SSE tail will still deliver it).
90+
const [drainError, waitpointIds] = await tryCatch(
91+
drainSessionStreamWaitpoints(session.friendlyId, params.io)
92+
);
93+
if (drainError) {
94+
logger.error("Failed to drain session stream waitpoints", {
95+
sessionFriendlyId: session.friendlyId,
96+
io: params.io,
97+
error: drainError,
98+
});
99+
} else if (waitpointIds && waitpointIds.length > 0) {
100+
await Promise.all(
101+
waitpointIds.map(async (waitpointId) => {
102+
const [completeError] = await tryCatch(
103+
engine.completeWaitpoint({
104+
id: waitpointId,
105+
output: {
106+
value: part,
107+
type: "application/json",
108+
isError: false,
109+
},
110+
})
111+
);
112+
if (completeError) {
113+
logger.error("Failed to complete session stream waitpoint", {
114+
sessionFriendlyId: session.friendlyId,
115+
io: params.io,
116+
waitpointId,
117+
error: completeError,
118+
});
119+
}
120+
})
121+
);
122+
}
123+
84124
return json({ ok: true }, { status: 200 });
85125
}
86126
);

apps/webapp/app/services/realtime/s2realtimeStreams.server.ts

Lines changed: 16 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -186,7 +186,22 @@ export class S2RealtimeStreams implements StreamResponder, StreamIngestor {
186186
streamId: string,
187187
afterSeqNum?: number
188188
): Promise<StreamRecord[]> {
189-
const s2Stream = this.toStreamName(runId, streamId);
189+
return this.#readRecordsByName(this.toStreamName(runId, streamId), afterSeqNum);
190+
}
191+
192+
/**
193+
* Read records from a `Session`-primitive channel starting after the
194+
* given sequence number. Used by the `.wait()` race-check path.
195+
*/
196+
async readSessionStreamRecords(
197+
friendlyId: string,
198+
io: "out" | "in",
199+
afterSeqNum?: number
200+
): Promise<StreamRecord[]> {
201+
return this.#readRecordsByName(this.toSessionStreamName(friendlyId, io), afterSeqNum);
202+
}
203+
204+
async #readRecordsByName(s2Stream: string, afterSeqNum?: number): Promise<StreamRecord[]> {
190205
const startSeq = afterSeqNum != null ? afterSeqNum + 1 : 0;
191206

192207
const qs = new URLSearchParams();
Lines changed: 117 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,117 @@
1+
import { Redis } from "ioredis";
2+
import { env } from "~/env.server";
3+
import { singleton } from "~/utils/singleton";
4+
import { logger } from "./logger.server";
5+
6+
// "ssw" — session-stream-waitpoint. Parallel to the input-stream variant
7+
// (`isw:{runFriendlyId}:{streamId}`). Keyed purely on `{sessionId, io}` so
8+
// a send() lands on the channel regardless of which run is waiting, and
9+
// multiple concurrent waiters (e.g. two agents on one chat) all wake.
10+
const KEY_PREFIX = "ssw:";
11+
const DEFAULT_TTL_MS = 7 * 24 * 60 * 60 * 1000; // 7 days
12+
13+
function buildKey(sessionFriendlyId: string, io: "out" | "in"): string {
14+
return `${KEY_PREFIX}${sessionFriendlyId}:${io}`;
15+
}
16+
17+
function initializeRedis(): Redis | undefined {
18+
const host = env.CACHE_REDIS_HOST;
19+
if (!host) {
20+
return undefined;
21+
}
22+
23+
return new Redis({
24+
connectionName: "sessionStreamWaitpointCache",
25+
host,
26+
port: env.CACHE_REDIS_PORT,
27+
username: env.CACHE_REDIS_USERNAME,
28+
password: env.CACHE_REDIS_PASSWORD,
29+
keyPrefix: "tr:",
30+
enableAutoPipelining: true,
31+
...(env.CACHE_REDIS_TLS_DISABLED === "true" ? {} : { tls: {} }),
32+
});
33+
}
34+
35+
const redis = singleton("sessionStreamWaitpointCache", initializeRedis);
36+
37+
/**
38+
* Register a waitpoint as pending on the given session channel. Called
39+
* from the `.wait()` create-waitpoint route. Multiple waiters on the same
40+
* channel are allowed (stored as a Redis set).
41+
*/
42+
export async function addSessionStreamWaitpoint(
43+
sessionFriendlyId: string,
44+
io: "out" | "in",
45+
waitpointId: string,
46+
ttlMs?: number
47+
): Promise<void> {
48+
if (!redis) return;
49+
50+
try {
51+
const key = buildKey(sessionFriendlyId, io);
52+
await redis.sadd(key, waitpointId);
53+
await redis.pexpire(key, ttlMs ?? DEFAULT_TTL_MS);
54+
} catch (error) {
55+
logger.error("Failed to set session stream waitpoint cache", {
56+
sessionFriendlyId,
57+
io,
58+
error,
59+
});
60+
}
61+
}
62+
63+
/**
64+
* Atomically read + clear all waitpoints registered on the given session
65+
* channel. Called from the append handler so the next append sees an
66+
* empty set even if two appends race.
67+
*/
68+
export async function drainSessionStreamWaitpoints(
69+
sessionFriendlyId: string,
70+
io: "out" | "in"
71+
): Promise<string[]> {
72+
if (!redis) return [];
73+
74+
try {
75+
const key = buildKey(sessionFriendlyId, io);
76+
const pipeline = redis.multi();
77+
pipeline.smembers(key);
78+
pipeline.del(key);
79+
const results = await pipeline.exec();
80+
if (!results) return [];
81+
const [smembersResult] = results;
82+
if (!smembersResult) return [];
83+
const [err, members] = smembersResult;
84+
if (err) return [];
85+
return Array.isArray(members) ? (members as string[]) : [];
86+
} catch (error) {
87+
logger.error("Failed to drain session stream waitpoint cache", {
88+
sessionFriendlyId,
89+
io,
90+
error,
91+
});
92+
return [];
93+
}
94+
}
95+
96+
/**
97+
* Remove a single waitpoint from the pending set. Called after a race
98+
* where `.wait()` completed the waitpoint from pre-arrived data.
99+
*/
100+
export async function removeSessionStreamWaitpoint(
101+
sessionFriendlyId: string,
102+
io: "out" | "in",
103+
waitpointId: string
104+
): Promise<void> {
105+
if (!redis) return;
106+
107+
try {
108+
const key = buildKey(sessionFriendlyId, io);
109+
await redis.srem(key, waitpointId);
110+
} catch (error) {
111+
logger.error("Failed to remove session stream waitpoint cache entry", {
112+
sessionFriendlyId,
113+
io,
114+
error,
115+
});
116+
}
117+
}

0 commit comments

Comments
 (0)