
Commit 2f8a039
fix(cloudflare): Keep http root span alive until streaming responses are consumed (#18087)
Fixes: https://linear.app/getsentry/issue/JS-1103/spans-are-not-flushed-to-dashboard-when-using-streamtext-with-vercel

The Cloudflare request wrapper was ending the root HTTP span immediately when the handler returned a streaming Response (e.g. `result.toTextStreamResponse()`). Since Vercel AI child spans only finish after the stream is consumed by the client, they were filtered out by Sentry's `isFullFinishedSpan` check, resulting in transactions with 0 spans.

This PR implements streaming response detection and handles it from within the http handler:

1. Added a `classifyResponseStreaming()` helper that detects streaming vs. non-streaming responses via Content-Type (e.g. SSE) and Content-Length.
2. Updated the request wrapper: it now uses `startSpanManual()` instead of `startSpan()` for manual span control, monitors consumption of streaming responses in the background, and ends the root span only after the stream has been fully consumed by the client.

A minimal sketch of the kind of handler this affects is shown below.
1 parent 5c51fdb commit 2f8a039
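For context, here is a minimal sketch (not part of this commit) of the kind of Worker handler affected, assuming the Vercel AI SDK (`ai`, `@ai-sdk/openai`) and the `withSentry` wrapper from `@sentry/cloudflare`; the model name, env bindings, and DSN are placeholders, and exact AI SDK call signatures may differ between versions.

```ts
// Hypothetical example - not part of this commit. Binding names, model, and DSN are placeholders.
import * as Sentry from '@sentry/cloudflare';
import { createOpenAI } from '@ai-sdk/openai';
import { streamText } from 'ai';

interface Env {
  OPENAI_API_KEY: string;
  SENTRY_DSN: string;
}

export default Sentry.withSentry(
  (env: Env) => ({ dsn: env.SENTRY_DSN, tracesSampleRate: 1.0 }),
  {
    async fetch(_request, env): Promise<Response> {
      const openai = createOpenAI({ apiKey: env.OPENAI_API_KEY });
      const result = await streamText({
        model: openai('gpt-4o-mini'),
        prompt: 'Say hello',
      });

      // This Response streams its body; the Vercel AI child spans only finish once the
      // client has consumed the stream. Before this fix, the root http span ended as soon
      // as the handler returned, so those child spans were dropped from the transaction.
      return result.toTextStreamResponse();
    },
  } satisfies ExportedHandler<Env>,
);
```

With this commit, the wrapper tees the streamed body and keeps the root span open until the monitoring side of the tee has been fully read, so the child spans land inside a finished transaction.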

6 files changed: +189 −33 lines

packages/cloudflare/src/request.ts

67 additions & 20 deletions

@@ -8,13 +8,14 @@ import {
   parseStringToURLObject,
   SEMANTIC_ATTRIBUTE_SENTRY_OP,
   setHttpStatus,
-  startSpan,
+  startSpanManual,
   winterCGHeadersToDict,
   withIsolationScope,
 } from '@sentry/core';
 import type { CloudflareOptions } from './client';
 import { addCloudResourceContext, addCultureContext, addRequest } from './scope-utils';
 import { init } from './sdk';
+import { classifyResponseStreaming } from './utils/streaming';

 interface RequestHandlerWrapperOptions {
   options: CloudflareOptions;
@@ -97,30 +98,76 @@ export function wrapRequestHandler(
       // Note: This span will not have a duration unless I/O happens in the handler. This is
       // because of how the cloudflare workers runtime works.
       // See: https://developers.cloudflare.com/workers/runtime-apis/performance/
-      return startSpan(
-        {
-          name,
-          attributes,
-        },
-        async span => {
+
+      // Use startSpanManual to control when span ends (needed for streaming responses)
+      return startSpanManual({ name, attributes }, async span => {
+        let res: Response;
+
+        try {
+          res = await handler();
+          setHttpStatus(span, res.status);
+
+          // After the handler runs, the span name might have been updated by nested instrumentation
+          // (e.g., Remix parameterizing routes). The span should already have the correct name
+          // from that instrumentation, so we don't need to do anything here.
+        } catch (e) {
+          span.end();
+          if (captureErrors) {
+            captureException(e, { mechanism: { handled: false, type: 'auto.http.cloudflare' } });
+          }
+          waitUntil?.(flush(2000));
+          throw e;
+        }
+
+        // Classify response to detect actual streaming
+        const classification = classifyResponseStreaming(res);
+
+        if (classification.isStreaming && res.body) {
+          // Streaming response detected - monitor consumption to keep span alive
           try {
-            const res = await handler();
-            setHttpStatus(span, res.status);
+            const [clientStream, monitorStream] = res.body.tee();

-            // After the handler runs, the span name might have been updated by nested instrumentation
-            // (e.g., Remix parameterizing routes). The span should already have the correct name
-            // from that instrumentation, so we don't need to do anything here.
-            return res;
+            // Monitor stream consumption and end span when complete
+            const streamMonitor = (async () => {
+              const reader = monitorStream.getReader();
+
+              try {
+                let done = false;
+                while (!done) {
+                  const result = await reader.read();
+                  done = result.done;
+                }
+              } catch {
+                // Stream error or cancellation - will end span in finally
+              } finally {
+                reader.releaseLock();
+                span.end();
+                waitUntil?.(flush(2000));
+              }
+            })();
+
+            // Keep worker alive until stream monitoring completes (otherwise span won't end)
+            waitUntil?.(streamMonitor);
+
+            // Return response with client stream
+            return new Response(clientStream, {
+              status: res.status,
+              statusText: res.statusText,
+              headers: res.headers,
+            });
           } catch (e) {
-            if (captureErrors) {
-              captureException(e, { mechanism: { handled: false, type: 'auto.http.cloudflare' } });
-            }
-            throw e;
-          } finally {
+            // tee() failed (e.g. stream already locked) - fall back to non-streaming handling
+            span.end();
             waitUntil?.(flush(2000));
+            return res;
           }
-        },
-      );
+        }
+
+        // Non-streaming response - end span immediately and return original
+        span.end();
+        waitUntil?.(flush(2000));
+        return res;
+      });
     },
   );
 });
packages/cloudflare/src/utils/streaming.ts (new file, path per the `./utils/streaming` import above)

41 additions & 0 deletions

@@ -0,0 +1,41 @@
+export type StreamingGuess = {
+  isStreaming: boolean;
+};
+
+/**
+ * Classifies a Response as streaming or non-streaming.
+ *
+ * Heuristics:
+ * - No body → not streaming
+ * - Known streaming Content-Types → streaming (SSE, NDJSON, JSON streaming)
+ * - text/plain without Content-Length → streaming (some AI APIs)
+ * - Otherwise → not streaming (conservative default, including HTML/SSR)
+ *
+ * We avoid probing the stream to prevent blocking on transform streams (like injectTraceMetaTags)
+ * or SSR streams that may not have data ready immediately.
+ */
+export function classifyResponseStreaming(res: Response): StreamingGuess {
+  if (!res.body) {
+    return { isStreaming: false };
+  }
+
+  const contentType = res.headers.get('content-type') ?? '';
+  const contentLength = res.headers.get('content-length');
+
+  // Streaming: Known streaming content types
+  // - text/event-stream: Server-Sent Events (Vercel AI SDK, real-time APIs)
+  // - application/x-ndjson, application/ndjson: Newline-delimited JSON
+  // - application/stream+json: JSON streaming
+  // - text/plain (without Content-Length): Some AI APIs use this for streaming text
+  if (
+    /^text\/event-stream\b/i.test(contentType) ||
+    /^application\/(x-)?ndjson\b/i.test(contentType) ||
+    /^application\/stream\+json\b/i.test(contentType) ||
+    (/^text\/plain\b/i.test(contentType) && !contentLength)
+  ) {
+    return { isStreaming: true };
+  }
+
+  // Default: treat as non-streaming
+  return { isStreaming: false };
+}
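For illustration, a short usage sketch (not part of the commit) showing how the heuristic above classifies a few typical responses; the relative import path is hypothetical.

```ts
// Illustrative only - mirrors the heuristics in classifyResponseStreaming above.
import { classifyResponseStreaming } from './utils/streaming';

// Server-Sent Events (e.g. an AI text stream) -> treated as streaming
const sse = new Response(new ReadableStream(), {
  headers: { 'content-type': 'text/event-stream' },
});
console.log(classifyResponseStreaming(sse)); // { isStreaming: true }

// Plain text with a known Content-Length -> buffered, not streaming
const buffered = new Response('hello', {
  headers: { 'content-type': 'text/plain', 'content-length': '5' },
});
console.log(classifyResponseStreaming(buffered)); // { isStreaming: false }

// HTML (e.g. SSR output) -> conservative default, not streaming
const html = new Response('<html></html>', {
  headers: { 'content-type': 'text/html' },
});
console.log(classifyResponseStreaming(html)); // { isStreaming: false }
```

The conservative default means buffered responses such as HTML/SSR output keep the previous behavior: the root span ends as soon as the handler returns.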

packages/cloudflare/test/durableobject.test.ts

29 additions & 5 deletions

@@ -116,11 +116,18 @@ describe('instrumentDurableObjectWithSentry', () => {
   });

   it('flush performs after all waitUntil promises are finished', async () => {
+    // Spy on Client.prototype.flush and mock it to resolve immediately to avoid timeout issues with fake timers
+    const flush = vi.spyOn(SentryCore.Client.prototype, 'flush').mockResolvedValue(true);
     vi.useFakeTimers();
     onTestFinished(() => {
       vi.useRealTimers();
     });
-    const flush = vi.spyOn(SentryCore.Client.prototype, 'flush');
+
+    // Measure delta instead of absolute call count to avoid interference from parallel tests.
+    // Since we spy on the prototype, other tests running in parallel may also call flush.
+    // By measuring before/after, we only verify that THIS test triggered exactly one flush call.
+    const before = flush.mock.calls.length;
+
     const waitUntil = vi.fn();
     const testClass = vi.fn(context => ({
       fetch: () => {
@@ -133,12 +140,29 @@ describe('instrumentDurableObjectWithSentry', () => {
       waitUntil,
     } as unknown as ExecutionContext;
     const dObject: any = Reflect.construct(instrumented, [context, {} as any]);
-    expect(() => dObject.fetch(new Request('https://example.com'))).not.toThrow();
-    expect(flush).not.toBeCalled();
-    expect(waitUntil).toHaveBeenCalledOnce();
+
+    // Call fetch (don't await yet)
+    const responsePromise = dObject.fetch(new Request('https://example.com'));
+
+    // Advance past classification timeout and get response
+    vi.advanceTimersByTime(30);
+    const response = await responsePromise;
+
+    // Consume response (triggers span end for buffered responses)
+    await response.text();
+
+    // The flush should now be queued in waitUntil
+    expect(waitUntil).toHaveBeenCalled();
+
+    // Advance to trigger the setTimeout in the handler's waitUntil
     vi.advanceTimersToNextTimer();
     await Promise.all(waitUntil.mock.calls.map(([p]) => p));
-    expect(flush).toBeCalled();
+
+    const after = flush.mock.calls.length;
+    const delta = after - before;
+
+    // Verify that exactly one flush call was made during this test
+    expect(delta).toBe(1);
   });

   describe('instrumentPrototypeMethods option', () => {

packages/cloudflare/test/handler.test.ts

5 additions & 1 deletion

@@ -72,7 +72,11 @@ describe('withSentry', () => {
       createMockExecutionContext(),
     );

-    expect(result).toBe(response);
+    // Response may be wrapped for streaming detection, verify content
+    expect(result?.status).toBe(response.status);
+    if (result) {
+      expect(await result.text()).toBe('test');
+    }
   });

   test('merges options from env and callback', async () => {

packages/cloudflare/test/pages-plugin.test.ts

3 additions & 1 deletion

@@ -52,6 +52,8 @@ describe('sentryPagesPlugin', () => {
       pluginArgs: MOCK_OPTIONS,
     });

-    expect(result).toBe(response);
+    // Response may be wrapped for streaming detection, verify content
+    expect(result.status).toBe(response.status);
+    expect(await result.text()).toBe('test');
   });
 });

packages/cloudflare/test/request.test.ts

44 additions & 6 deletions

@@ -33,7 +33,9 @@ describe('withSentry', () => {
       { options: MOCK_OPTIONS, request: new Request('https://example.com'), context: createMockExecutionContext() },
       () => response,
     );
-    expect(result).toBe(response);
+    // Response may be wrapped for streaming detection, verify content matches
+    expect(result.status).toBe(response.status);
+    expect(await result.text()).toBe('test');
   });

   test('flushes the event after the handler is done using the cloudflare context.waitUntil', async () => {
@@ -48,6 +50,25 @@ describe('withSentry', () => {
     expect(waitUntilSpy).toHaveBeenLastCalledWith(expect.any(Promise));
   });

+  test('handles streaming responses correctly', async () => {
+    const stream = new ReadableStream({
+      start(controller) {
+        controller.enqueue(new TextEncoder().encode('chunk1'));
+        controller.enqueue(new TextEncoder().encode('chunk2'));
+        controller.close();
+      },
+    });
+    const streamingResponse = new Response(stream);
+
+    const result = await wrapRequestHandler(
+      { options: MOCK_OPTIONS, request: new Request('https://example.com'), context: createMockExecutionContext() },
+      () => streamingResponse,
+    );
+
+    const text = await result.text();
+    expect(text).toBe('chunk1chunk2');
+  });
+
   test("doesn't error if context is undefined", () => {
     expect(() =>
       wrapRequestHandler(
@@ -69,11 +90,18 @@ describe('withSentry', () => {
   });

   test('flush must be called when all waitUntil are done', async () => {
-    const flush = vi.spyOn(SentryCore.Client.prototype, 'flush');
+    // Spy on Client.prototype.flush and mock it to resolve immediately to avoid timeout issues with fake timers
+    const flushSpy = vi.spyOn(SentryCore.Client.prototype, 'flush').mockResolvedValue(true);
     vi.useFakeTimers();
     onTestFinished(() => {
       vi.useRealTimers();
     });
+
+    // Measure delta instead of absolute call count to avoid interference from parallel tests.
+    // Since we spy on the prototype, other tests running in parallel may also call flush.
+    // By measuring before/after, we only verify that THIS test triggered exactly one flush call.
+    const before = flushSpy.mock.calls.length;
+
     const waits: Promise<unknown>[] = [];
     const waitUntil = vi.fn(promise => waits.push(promise));

@@ -83,13 +111,20 @@ describe('withSentry', () => {

     await wrapRequestHandler({ options: MOCK_OPTIONS, request: new Request('https://example.com'), context }, () => {
       addDelayedWaitUntil(context);
-      return new Response('test');
+      const response = new Response('test');
+      // Add Content-Length to skip probing
+      response.headers.set('content-length', '4');
+      return response;
     });
-    expect(flush).not.toBeCalled();
     expect(waitUntil).toBeCalled();
-    vi.advanceTimersToNextTimerAsync().then(() => vi.runAllTimers());
+    vi.advanceTimersToNextTimer().runAllTimers();
     await Promise.all(waits);
-    expect(flush).toHaveBeenCalledOnce();
+
+    const after = flushSpy.mock.calls.length;
+    const delta = after - before;
+
+    // Verify that exactly one flush call was made during this test
+    expect(delta).toBe(1);
   });

   describe('scope instrumentation', () => {
@@ -303,6 +338,9 @@ describe('withSentry', () => {
       },
     );

+    // Wait for async span end and transaction capture
+    await new Promise(resolve => setTimeout(resolve, 50));
+
     expect(sentryEvent.transaction).toEqual('GET /');
     expect(sentryEvent.spans).toHaveLength(0);
     expect(sentryEvent.contexts?.trace).toEqual({
