Skip to content

Commit 536d9fa

Browse files
authored
feat(realtime): Realtime streams v2 (#2632)
1 parent d75c3ae commit 536d9fa

File tree

112 files changed

+11502
-1375
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

112 files changed

+11502
-1375
lines changed
Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
---
2+
"@trigger.dev/sdk": minor
3+
"@trigger.dev/react-hooks": minor
4+
---
5+
6+
Realtime streams v2
Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,30 @@
1+
export function ListBulletIcon({ className }: { className?: string }) {
2+
return (
3+
<svg className={className} viewBox="0 0 24 24" fill="none" xmlns="http://www.w3.org/2000/svg">
4+
<path
5+
d="M9 5H20"
6+
stroke="currentColor"
7+
strokeWidth="2"
8+
strokeLinecap="round"
9+
strokeLinejoin="round"
10+
/>
11+
<path
12+
d="M9 12H20"
13+
stroke="currentColor"
14+
strokeWidth="2"
15+
strokeLinecap="round"
16+
strokeLinejoin="round"
17+
/>
18+
<path
19+
d="M9 19H20"
20+
stroke="currentColor"
21+
strokeWidth="2"
22+
strokeLinecap="round"
23+
strokeLinejoin="round"
24+
/>
25+
<circle cx="4" cy="5" r="1" fill="currentColor" />
26+
<circle cx="4" cy="12" r="1" fill="currentColor" />
27+
<circle cx="4" cy="19" r="1" fill="currentColor" />
28+
</svg>
29+
);
30+
}
Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,27 @@
1+
export function MoveToBottomIcon({ className }: { className?: string }) {
2+
return (
3+
<svg className={className} viewBox="0 0 24 24" fill="none" xmlns="http://www.w3.org/2000/svg">
4+
<path
5+
d="M12 15L12 3"
6+
stroke="currentColor"
7+
strokeWidth="2"
8+
strokeLinecap="round"
9+
strokeLinejoin="round"
10+
/>
11+
<path
12+
d="M3 21L21 21"
13+
stroke="currentColor"
14+
strokeWidth="2"
15+
strokeLinecap="round"
16+
strokeLinejoin="round"
17+
/>
18+
<path
19+
d="M7.5 12.5L12 17L16.5 12.5"
20+
stroke="currentColor"
21+
strokeWidth="2"
22+
strokeLinecap="round"
23+
strokeLinejoin="round"
24+
/>
25+
</svg>
26+
);
27+
}
Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,20 @@
1+
export function SnakedArrowIcon({ className }: { className?: string }) {
2+
return (
3+
<svg className={className} viewBox="0 0 24 24" fill="none" xmlns="http://www.w3.org/2000/svg">
4+
<path
5+
d="M5 5H16C17.6569 5 19 6.34315 19 8L19 8.5C19 10.1569 17.6569 11.5 16 11.5H8C6.34314 11.5 5 12.8431 5 14.5L5 15C4.99999 16.6569 6.34314 18 8 18H18.634"
6+
stroke="currentColor"
7+
strokeWidth="2"
8+
strokeLinecap="round"
9+
strokeLinejoin="round"
10+
/>
11+
<path
12+
d="M16 21L19 18L16 15"
13+
stroke="currentColor"
14+
strokeWidth="2"
15+
strokeLinecap="round"
16+
strokeLinejoin="round"
17+
/>
18+
</svg>
19+
);
20+
}
Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,10 @@
1+
export function StreamsIcon({ className }: { className?: string }) {
2+
return (
3+
<svg className={className} viewBox="0 0 24 24" fill="none" xmlns="http://www.w3.org/2000/svg">
4+
<path d="M3 19C3 19 5.01155 17 8 17C10.9885 17 13 18.9973 16 18.9973C19 18.9973 21 17 21 17" stroke="currentColor" strokeWidth="2" strokeLinecap="round"/>
5+
<path d="M3 13.0001C3 13.0001 5.01155 11 8 11C10.9885 11 13 13 16 13C19 13 21 11.0001 21 11.0001" stroke="currentColor" strokeWidth="2" strokeLinecap="round"/>
6+
<path d="M3 7C3 7 5.01155 5 8 5C10.9885 5 13 6.9973 16 6.9973C19 6.9973 21 5 21 5" stroke="currentColor" strokeWidth="2" strokeLinecap="round"/>
7+
</svg>
8+
);
9+
}
10+

apps/webapp/app/components/runs/v3/RunIcon.tsx

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ import { TriggerIcon } from "~/assets/icons/TriggerIcon";
2020
import { PythonLogoIcon } from "~/assets/icons/PythonLogoIcon";
2121
import { TraceIcon } from "~/assets/icons/TraceIcon";
2222
import { WaitpointTokenIcon } from "~/assets/icons/WaitpointTokenIcon";
23+
import { StreamsIcon } from "~/assets/icons/StreamsIcon";
2324

2425
type TaskIconProps = {
2526
name: string | undefined;
@@ -107,6 +108,8 @@ export function RunIcon({ name, className, spanName }: TaskIconProps) {
107108
case "task-hook-onFailure":
108109
case "task-hook-catchError":
109110
return <FunctionIcon className={cn(className, "text-error")} />;
111+
case "streams":
112+
return <StreamsIcon className={cn(className, "text-text-dimmed")} />;
110113
}
111114

112115
return <InformationCircleIcon className={cn(className, "text-text-dimmed")} />;

apps/webapp/app/env.server.ts

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -219,6 +219,7 @@ const EnvironmentSchema = z
219219
.string()
220220
.default(process.env.REDIS_TLS_DISABLED ?? "false"),
221221
REALTIME_STREAMS_REDIS_CLUSTER_MODE_ENABLED: z.string().default("0"),
222+
REALTIME_STREAMS_INACTIVITY_TIMEOUT_MS: z.coerce.number().int().default(60000), // 1 minute
222223

223224
REALTIME_MAXIMUM_CREATED_AT_FILTER_AGE_IN_MS: z.coerce
224225
.number()
@@ -1222,6 +1223,16 @@ const EnvironmentSchema = z
12221223
EVENT_LOOP_MONITOR_UTILIZATION_SAMPLE_RATE: z.coerce.number().default(0.05),
12231224

12241225
VERY_SLOW_QUERY_THRESHOLD_MS: z.coerce.number().int().optional(),
1226+
1227+
REALTIME_STREAMS_S2_BASIN: z.string().optional(),
1228+
REALTIME_STREAMS_S2_ACCESS_TOKEN: z.string().optional(),
1229+
REALTIME_STREAMS_S2_LOG_LEVEL: z
1230+
.enum(["log", "error", "warn", "info", "debug"])
1231+
.default("info"),
1232+
REALTIME_STREAMS_S2_FLUSH_INTERVAL_MS: z.coerce.number().int().default(100),
1233+
REALTIME_STREAMS_S2_MAX_RETRIES: z.coerce.number().int().default(10),
1234+
REALTIME_STREAMS_S2_WAIT_SECONDS: z.coerce.number().int().default(60),
1235+
WAIT_UNTIL_TIMEOUT_MS: z.coerce.number().int().default(600_000),
12251236
})
12261237
.and(GithubAppEnvSchema)
12271238
.and(S2EnvSchema);

apps/webapp/app/models/organization.server.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -66,7 +66,7 @@ export async function createOrganization(
6666
role: "ADMIN",
6767
},
6868
},
69-
v3Enabled: !features.isManagedCloud,
69+
v3Enabled: true,
7070
},
7171
include: {
7272
members: true,

apps/webapp/app/presenters/v3/SpanPresenter.server.ts

Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ import { WaitpointPresenter } from "./WaitpointPresenter.server";
1919
import { engine } from "~/v3/runEngine.server";
2020
import { resolveEventRepositoryForStore } from "~/v3/eventRepository/index.server";
2121
import { IEventRepository, SpanDetail } from "~/v3/eventRepository/eventRepository.types";
22+
import { safeJsonParse } from "~/utils/json";
2223

2324
type Result = Awaited<ReturnType<SpanPresenter["call"]>>;
2425
export type Span = NonNullable<NonNullable<Result>["span"]>;
@@ -551,6 +552,41 @@ export class SpanPresenter extends BasePresenter {
551552
},
552553
};
553554
}
555+
case "realtime-stream": {
556+
if (!span.entity.id) {
557+
logger.error(`SpanPresenter: No realtime stream id`, {
558+
spanId,
559+
realtimeStreamId: span.entity.id,
560+
});
561+
return { ...data, entity: null };
562+
}
563+
564+
const [runId, streamKey] = span.entity.id.split(":");
565+
566+
if (!runId || !streamKey) {
567+
logger.error(`SpanPresenter: Invalid realtime stream id`, {
568+
spanId,
569+
realtimeStreamId: span.entity.id,
570+
});
571+
return { ...data, entity: null };
572+
}
573+
574+
const metadata = span.entity.metadata
575+
? (safeJsonParse(span.entity.metadata) as Record<string, unknown> | undefined)
576+
: undefined;
577+
578+
return {
579+
...data,
580+
entity: {
581+
type: "realtime-stream" as const,
582+
object: {
583+
runId,
584+
streamKey,
585+
metadata,
586+
},
587+
},
588+
};
589+
}
554590
default:
555591
return { ...data, entity: null };
556592
}

apps/webapp/app/routes/api.v1.tasks.$taskId.trigger.ts

Lines changed: 4 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@ export const HeadersSchema = z.object({
3333
"x-trigger-client": z.string().nullish(),
3434
"x-trigger-engine-version": RunEngineVersionSchema.nullish(),
3535
"x-trigger-request-idempotency-key": z.string().nullish(),
36+
"x-trigger-realtime-streams-version": z.string().nullish(),
3637
traceparent: z.string().optional(),
3738
tracestate: z.string().optional(),
3839
});
@@ -63,6 +64,7 @@ const { action, loader } = createActionApiRoute(
6364
"x-trigger-client": triggerClient,
6465
"x-trigger-engine-version": engineVersion,
6566
"x-trigger-request-idempotency-key": requestIdempotencyKey,
67+
"x-trigger-realtime-streams-version": realtimeStreamsVersion,
6668
} = headers;
6769

6870
const cachedResponse = await handleRequestIdempotency(requestIdempotencyKey, {
@@ -108,14 +110,7 @@ const { action, loader } = createActionApiRoute(
108110
options: body.options,
109111
isFromWorker,
110112
traceContext,
111-
});
112-
113-
logger.debug("[otelContext]", {
114-
taskId: params.taskId,
115-
headers,
116-
options: body.options,
117-
isFromWorker,
118-
traceContext,
113+
realtimeStreamsVersion,
119114
});
120115

121116
const idempotencyKeyExpiresAt = resolveIdempotencyKeyTTL(idempotencyKeyTTL);
@@ -131,6 +126,7 @@ const { action, loader } = createActionApiRoute(
131126
traceContext,
132127
spanParentAsLink: spanParentAsLink === 1,
133128
oneTimeUseToken,
129+
realtimeStreamsVersion: realtimeStreamsVersion ?? undefined,
134130
},
135131
engineVersion ?? undefined
136132
);

0 commit comments

Comments
 (0)