Skip to content

Commit 55ed015

Browse files
committed
Improve stream dashboard perf with virtualization
1 parent 0ba82d2 commit 55ed015

File tree

2 files changed

+136
-14
lines changed
  • apps/webapp/app/routes/resources.orgs.$organizationSlug.projects.$projectParam.env.$envParam.runs.$runParam.streams.$streamKey/route.tsx
  • references/realtime-streams/src/trigger/streams.ts

2 files changed

+136
-14
lines changed

apps/webapp/app/routes/resources.orgs.$organizationSlug.projects.$projectParam.env.$envParam.runs.$runParam.streams.$streamKey/route.tsx

Lines changed: 52 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
import { BoltIcon, BoltSlashIcon } from "@heroicons/react/20/solid";
22
import { type LoaderFunctionArgs } from "@remix-run/server-runtime";
33
import { type SSEStreamPart, SSEStreamSubscription } from "@trigger.dev/core/v3";
4+
import { useVirtualizer } from "@tanstack/react-virtual";
45
import { Clipboard, ClipboardCheck } from "lucide-react";
56
import { useCallback, useEffect, useRef, useState } from "react";
67
import simplur from "simplur";
@@ -117,6 +118,8 @@ export function RealtimeStreamViewer({
117118
const [mouseOver, setMouseOver] = useState(false);
118119
const [copied, setCopied] = useState(false);
119120

121+
console.log("chunks.length", chunks.length);
122+
120123
const getCompactText = useCallback(() => {
121124
return chunks
122125
.map((chunk) => {
@@ -208,6 +211,14 @@ export function RealtimeStreamViewer({
208211
const maxLineNumberWidth = (chunks.length > 0 ? lastLineNumber : firstLineNumber).toString()
209212
.length;
210213

214+
// Virtual rendering for list view
215+
const rowVirtualizer = useVirtualizer({
216+
count: chunks.length,
217+
getScrollElement: () => scrollRef.current,
218+
estimateSize: () => 28,
219+
overscan: 5,
220+
});
221+
211222
return (
212223
<div className="flex h-full flex-col overflow-hidden">
213224
{/* Header */}
@@ -360,16 +371,33 @@ export function RealtimeStreamViewer({
360371

361372
{chunks.length > 0 && viewMode === "list" && (
362373
<div className="font-mono text-xs leading-tight">
363-
{chunks.map((chunk, index) => (
364-
<StreamChunkLine
365-
key={index}
366-
chunk={chunk}
367-
lineNumber={firstLineNumber + index}
368-
maxLineNumberWidth={maxLineNumberWidth}
374+
<div
375+
style={{
376+
height: `${rowVirtualizer.getTotalSize()}px`,
377+
width: "100%",
378+
position: "relative",
379+
}}
380+
>
381+
{rowVirtualizer.getVirtualItems().map((virtualItem) => (
382+
<StreamChunkLine
383+
key={virtualItem.key}
384+
chunk={chunks[virtualItem.index]}
385+
lineNumber={firstLineNumber + virtualItem.index}
386+
maxLineNumberWidth={maxLineNumberWidth}
387+
size={virtualItem.size}
388+
start={virtualItem.start}
389+
/>
390+
))}
391+
{/* Sentinel element for IntersectionObserver */}
392+
<div
393+
ref={bottomRef}
394+
className="h-px"
395+
style={{
396+
position: "absolute",
397+
top: `${rowVirtualizer.getTotalSize()}px`,
398+
}}
369399
/>
370-
))}
371-
{/* Sentinel element for IntersectionObserver */}
372-
<div ref={bottomRef} className="h-px" />
400+
</div>
373401
</div>
374402
)}
375403

@@ -402,10 +430,14 @@ function StreamChunkLine({
402430
chunk,
403431
lineNumber,
404432
maxLineNumberWidth,
433+
size,
434+
start,
405435
}: {
406436
chunk: StreamChunk;
407437
lineNumber: number;
408438
maxLineNumberWidth: number;
439+
size: number;
440+
start: number;
409441
}) {
410442
const formattedData =
411443
typeof chunk.data === "string" ? chunk.data : JSON.stringify(chunk.data, null, 2);
@@ -421,7 +453,17 @@ function StreamChunkLine({
421453
const timestamp = `${timeString}.${milliseconds}`;
422454

423455
return (
424-
<div className="group flex w-full gap-3 py-1 hover:bg-charcoal-800">
456+
<div
457+
className="group flex w-full gap-3 py-1 hover:bg-charcoal-800"
458+
style={{
459+
position: "absolute",
460+
top: 0,
461+
left: 0,
462+
width: "100%",
463+
height: `${size}px`,
464+
transform: `translateY(${start}px)`,
465+
}}
466+
>
425467
{/* Line number */}
426468
<div
427469
className="flex-none select-none pl-2 text-right text-charcoal-500"

references/realtime-streams/src/trigger/streams.ts

Lines changed: 84 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
import { demoStream } from "@/app/streams";
2-
import { logger, task } from "@trigger.dev/sdk";
2+
import { logger, metadata, task } from "@trigger.dev/sdk";
33
import { setTimeout } from "timers/promises";
44

55
export type STREAMS = {
@@ -46,8 +46,6 @@ export type StreamPayload = {
4646
export const streamsTask = task({
4747
id: "streams",
4848
run: async (payload: StreamPayload = {}, { ctx }) => {
49-
await setTimeout(1000);
50-
5149
const scenario = payload.scenario ?? "continuous";
5250
logger.info("Starting stream scenario", { scenario });
5351

@@ -118,7 +116,89 @@ export const streamsTask = task({
118116

119117
await waitUntilComplete();
120118

121-
await streamsChildTask.triggerAndWait({});
119+
logger.info("Stream completed", { scenario });
120+
121+
return {
122+
scenario,
123+
scenarioDescription,
124+
};
125+
},
126+
});
127+
128+
export const metadataStreamsTask = task({
129+
id: "metadata-streams",
130+
run: async (payload: StreamPayload = {}, { ctx }) => {
131+
const scenario = payload.scenario ?? "continuous";
132+
logger.info("Starting stream scenario", { scenario });
133+
134+
let generator: AsyncGenerator<string>;
135+
let scenarioDescription: string;
136+
137+
switch (scenario) {
138+
case "stall": {
139+
const stallDurationMs = payload.stallDurationMs ?? 3 * 60 * 1000; // Default 3 minutes
140+
const includePing = payload.includePing ?? false;
141+
generator = generateLLMTokenStream(includePing, stallDurationMs);
142+
scenarioDescription = `Stall scenario: ${stallDurationMs / 1000}s with ${
143+
includePing ? "ping tokens" : "no pings"
144+
}`;
145+
break;
146+
}
147+
case "continuous": {
148+
const durationSec = payload.durationSec ?? 45;
149+
const intervalMs = payload.intervalMs ?? 10;
150+
generator = generateContinuousTokenStream(durationSec, intervalMs);
151+
scenarioDescription = `Continuous scenario: ${durationSec}s with ${intervalMs}ms intervals`;
152+
break;
153+
}
154+
case "burst": {
155+
const burstCount = payload.burstCount ?? 10;
156+
const tokensPerBurst = payload.tokensPerBurst ?? 20;
157+
const burstIntervalMs = payload.burstIntervalMs ?? 5;
158+
const pauseBetweenBurstsMs = payload.pauseBetweenBurstsMs ?? 2000;
159+
generator = generateBurstTokenStream(
160+
burstCount,
161+
tokensPerBurst,
162+
burstIntervalMs,
163+
pauseBetweenBurstsMs
164+
);
165+
scenarioDescription = `Burst scenario: ${burstCount} bursts of ${tokensPerBurst} tokens`;
166+
break;
167+
}
168+
case "slow-steady": {
169+
const durationMin = payload.durationMin ?? 5;
170+
const tokenIntervalSec = payload.tokenIntervalSec ?? 5;
171+
generator = generateSlowSteadyTokenStream(durationMin, tokenIntervalSec);
172+
scenarioDescription = `Slow steady scenario: ${durationMin}min with ${tokenIntervalSec}s intervals`;
173+
break;
174+
}
175+
case "markdown": {
176+
const tokenDelayMs = payload.tokenDelayMs ?? 15;
177+
generator = generateMarkdownTokenStream(tokenDelayMs);
178+
scenarioDescription = `Markdown scenario: generating formatted content with ${tokenDelayMs}ms delays`;
179+
break;
180+
}
181+
case "performance": {
182+
const chunkCount = payload.chunkCount ?? 500;
183+
const chunkIntervalMs = payload.chunkIntervalMs ?? 10;
184+
generator = generatePerformanceStream(chunkCount, chunkIntervalMs);
185+
scenarioDescription = `Performance scenario: ${chunkCount} chunks with ${chunkIntervalMs}ms intervals`;
186+
break;
187+
}
188+
default: {
189+
throw new Error(`Unknown scenario: ${scenario}`);
190+
}
191+
}
192+
193+
logger.info("Starting stream", { scenarioDescription });
194+
195+
const mockStream = createStreamFromGenerator(generator);
196+
197+
const stream = await metadata.stream("demo", mockStream);
198+
199+
for await (const chunk of stream) {
200+
logger.info("Received chunk", { chunk });
201+
}
122202

123203
logger.info("Stream completed", { scenario });
124204

0 commit comments

Comments (0)