Skip to content

Commit 668559e

Browse files
authored
fix(streams): buffer v1 streams on read to prevent split chunks (#2669)
1 parent d0ad38d commit 668559e

File tree

3 files changed

+149
-14
lines changed

3 files changed

+149
-14
lines changed

apps/webapp/app/services/realtime/redisRealtimeStreams.server.ts

Lines changed: 45 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -204,20 +204,52 @@ export class RedisRealtimeStreams implements StreamIngestor, StreamResponder {
204204
},
205205
})
206206
.pipeThrough(
207-
// Transform 1: Split data content by newlines, preserving metadata
208-
new TransformStream<StreamChunk, StreamChunk & { line?: string }>({
209-
transform(chunk, controller) {
210-
if (chunk.type === "ping") {
211-
controller.enqueue(chunk);
212-
} else if (chunk.type === "data" || chunk.type === "legacy-data") {
213-
// Split data by newlines, emit separate chunks with same metadata
214-
const lines = chunk.data.split("\n").filter((line) => line.trim().length > 0);
215-
for (const line of lines) {
216-
controller.enqueue({ ...chunk, line });
207+
// Transform 1: Buffer partial lines across Redis entries
208+
(() => {
209+
let buffer = "";
210+
let lastRedisId = "0";
211+
212+
return new TransformStream<StreamChunk, StreamChunk & { line: string }>({
213+
transform(chunk, controller) {
214+
if (chunk.type === "ping") {
215+
controller.enqueue(chunk as any);
216+
} else if (chunk.type === "data" || chunk.type === "legacy-data") {
217+
// Buffer partial lines: accumulate until we see newlines
218+
buffer += chunk.data;
219+
220+
// Split on newlines
221+
const lines = buffer.split("\n");
222+
223+
// The last element might be incomplete, hold it back in buffer
224+
buffer = lines.pop() || "";
225+
226+
// Emit complete lines with the Redis ID of the chunk that completed them
227+
for (const line of lines) {
228+
if (line.trim().length > 0) {
229+
controller.enqueue({
230+
...chunk,
231+
line,
232+
});
233+
}
234+
}
235+
236+
// Update last Redis ID for next iteration
237+
lastRedisId = chunk.redisId;
217238
}
218-
}
219-
},
220-
})
239+
},
240+
flush(controller) {
241+
// On stream end, emit any leftover buffered text
242+
if (buffer.trim().length > 0) {
243+
controller.enqueue({
244+
type: "data",
245+
redisId: lastRedisId,
246+
data: "",
247+
line: buffer.trim(),
248+
});
249+
}
250+
},
251+
});
252+
})()
221253
)
222254
.pipeThrough(
223255
// Transform 2: Format as SSE

apps/webapp/test/redisRealtimeStreams.test.ts

Lines changed: 104 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1417,4 +1417,108 @@ describe("RedisRealtimeStreams", () => {
14171417
await redis.quit();
14181418
}
14191419
);
1420+
1421+
redisTest(
1422+
"Should handle chunks split mid-line (regression test)",
1423+
{ timeout: 30_000 },
1424+
async ({ redisOptions }) => {
1425+
const redis = new Redis(redisOptions);
1426+
const redisRealtimeStreams = new RedisRealtimeStreams({
1427+
redis: redisOptions,
1428+
});
1429+
1430+
const runId = "run_split_test";
1431+
const streamId = "test-split-stream";
1432+
1433+
// Simulate what happens in production: a JSON line split across multiple network chunks
1434+
// This reproduces the issue where we see partial chunks like:
1435+
// - "{\"timestamp\":"
1436+
// - "1762880245493,\"chunkIndex\":780,\"data\":\"Chunk 781/1000\"}"
1437+
const fullLine = JSON.stringify({
1438+
timestamp: 1762880245493,
1439+
chunkIndex: 780,
1440+
data: "Chunk 781/1000",
1441+
});
1442+
1443+
// Split the line at an arbitrary position (in the middle of the JSON)
1444+
const splitPoint = 16; // Splits after '{"timestamp":'
1445+
const chunk1 = fullLine.substring(0, splitPoint);
1446+
const chunk2 = fullLine.substring(splitPoint);
1447+
1448+
// Create a ReadableStream that sends split chunks
1449+
const encoder = new TextEncoder();
1450+
const stream = new ReadableStream({
1451+
start(controller) {
1452+
controller.enqueue(encoder.encode(chunk1));
1453+
controller.enqueue(encoder.encode(chunk2 + "\n")); // Add newline at end
1454+
controller.close();
1455+
},
1456+
});
1457+
1458+
// Ingest the split data
1459+
await redisRealtimeStreams.ingestData(stream, runId, streamId, "client1");
1460+
1461+
// Now consume the stream and verify we get the complete line, not split chunks
1462+
const abortController = new AbortController();
1463+
const response = await redisRealtimeStreams.streamResponse(
1464+
new Request("http://localhost/test"),
1465+
runId,
1466+
streamId,
1467+
abortController.signal
1468+
);
1469+
1470+
const reader = response.body!.getReader();
1471+
const decoder = new TextDecoder();
1472+
let receivedData = "";
1473+
1474+
// Read all chunks from the response
1475+
const readTimeout = setTimeout(() => {
1476+
abortController.abort();
1477+
}, 5000);
1478+
1479+
try {
1480+
while (true) {
1481+
const { done, value } = await reader.read();
1482+
if (done) break;
1483+
1484+
receivedData += decoder.decode(value, { stream: true });
1485+
1486+
// Once we have data, we can stop
1487+
if (receivedData.includes("data: ")) {
1488+
break;
1489+
}
1490+
}
1491+
} finally {
1492+
clearTimeout(readTimeout);
1493+
abortController.abort();
1494+
reader.releaseLock();
1495+
}
1496+
1497+
// Parse the SSE data
1498+
const lines = receivedData.split("\n").filter((line) => line.startsWith("data: "));
1499+
1500+
// We should receive exactly ONE complete line, not two partial lines
1501+
expect(lines.length).toBe(1);
1502+
1503+
// Extract the data (remove "data: " prefix)
1504+
const dataLine = lines[0].substring(6);
1505+
1506+
// Verify it's the complete, valid JSON
1507+
expect(dataLine).toBe(fullLine);
1508+
1509+
// Verify it parses correctly as JSON
1510+
const parsed = JSON.parse(dataLine) as {
1511+
timestamp: number;
1512+
chunkIndex: number;
1513+
data: string;
1514+
};
1515+
expect(parsed.timestamp).toBe(1762880245493);
1516+
expect(parsed.chunkIndex).toBe(780);
1517+
expect(parsed.data).toBe("Chunk 781/1000");
1518+
1519+
// Cleanup
1520+
await redis.del(`stream:${runId}:${streamId}`);
1521+
await redis.quit();
1522+
}
1523+
);
14201524
});

references/realtime-streams/src/trigger/streams.ts

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -822,7 +822,6 @@ const streamsStressTesterTask = task({
822822

823823
switch (payload.streamsVersion) {
824824
case "v1": {
825-
assert.ok(chunks.length < 2000, "Expected less than 2000 chunks");
826825
break;
827826
}
828827
case "v2": {

0 commit comments

Comments (0)