Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
54 changes: 25 additions & 29 deletions packages/cloudflare/src/request.ts
Original file line number Diff line number Diff line change
Expand Up @@ -129,40 +129,36 @@ export function wrapRequestHandler(
const classification = classifyResponseStreaming(res);

if (classification.isStreaming && res.body) {
// Streaming response detected - monitor consumption to keep span alive
try {
const [clientStream, monitorStream] = res.body.tee();

// Monitor stream consumption and end span when complete
const streamMonitor = (async () => {
const reader = monitorStream.getReader();

try {
let done = false;
while (!done) {
const result = await reader.read();
done = result.done;
}
} catch {
// Stream error or cancellation - will end span in finally
} finally {
reader.releaseLock();
span.end();
waitUntil?.(flushAndDispose(client));
}
})();

// Keep worker alive until stream monitoring completes (otherwise span won't end)
waitUntil?.(streamMonitor);

// Return response with client stream
return new Response(clientStream, {
let ended = false;

const endSpanOnce = (): void => {
if (ended) return;

ended = true;
span.end();
waitUntil?.(flushAndDispose(client));
};

const transform = new TransformStream({
flush() {
// Source stream completed normally.
endSpanOnce();
},
cancel() {
// Client disconnected (or downstream cancelled). The `cancel`
// is being called while the response is still considered
// active, so this is a safe place to end the span.
endSpanOnce();
Comment thread
JPeer264 marked this conversation as resolved.
Comment thread
JPeer264 marked this conversation as resolved.
},
});
Comment thread
JPeer264 marked this conversation as resolved.

return new Response(res.body.pipeThrough(transform), {
status: res.status,
statusText: res.statusText,
headers: res.headers,
});
} catch (_e) {
// tee() failed (e.g stream already locked) - fall back to non-streaming handling
} catch {
span.end();
waitUntil?.(flushAndDispose(client));
return res;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -163,9 +163,11 @@ describe('instrumentFetch', () => {
const wrappedHandler = withSentry(vi.fn(), handler);
const waits: Promise<unknown>[] = [];
const waitUntil = vi.fn(promise => waits.push(promise));
await wrappedHandler.fetch?.(new Request('https://example.com'), MOCK_ENV_WITHOUT_DSN, {
waitUntil,
} as unknown as ExecutionContext);
await wrappedHandler
.fetch?.(new Request('https://example.com'), MOCK_ENV_WITHOUT_DSN, {
waitUntil,
} as unknown as ExecutionContext)
.then(response => response.text());
expect(flush).not.toBeCalled();
expect(waitUntil).toBeCalled();
vi.advanceTimersToNextTimer().runAllTimers();
Expand Down
184 changes: 173 additions & 11 deletions packages/cloudflare/test/request.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@ const MOCK_OPTIONS: CloudflareOptions = {
dsn: 'https://public@dsn.ingest.sentry.io/1337',
};

const NODE_MAJOR_VERSION = parseInt(process.versions.node.split('.')[0]!);

function addDelayedWaitUntil(context: ExecutionContext) {
context.waitUntil(new Promise<void>(resolve => setTimeout(() => resolve())));
}
Expand Down Expand Up @@ -44,7 +46,7 @@ describe('withSentry', () => {
await wrapRequestHandler(
{ options: MOCK_OPTIONS, request: new Request('https://example.com'), context },
() => new Response('test'),
);
).then(response => response.text());

expect(waitUntilSpy).toHaveBeenCalledTimes(1);
expect(waitUntilSpy).toHaveBeenLastCalledWith(expect.any(Promise));
Expand Down Expand Up @@ -111,11 +113,8 @@ describe('withSentry', () => {

await wrapRequestHandler({ options: MOCK_OPTIONS, request: new Request('https://example.com'), context }, () => {
addDelayedWaitUntil(context);
const response = new Response('test');
// Add Content-Length to skip probing
response.headers.set('content-length', '4');
return response;
});
return new Response('test');
}).then(response => response.text());
expect(waitUntil).toBeCalled();
vi.advanceTimersToNextTimer().runAllTimers();
await Promise.all(waits);
Expand Down Expand Up @@ -336,7 +335,7 @@ describe('withSentry', () => {
SentryCore.captureMessage('sentry-trace');
return new Response('test');
},
);
).then(response => response.text());

// Wait for async span end and transaction capture
await new Promise(resolve => setTimeout(resolve, 50));
Expand Down Expand Up @@ -389,10 +388,8 @@ describe('flushAndDispose', () => {
const flushSpy = vi.spyOn(SentryCore.Client.prototype, 'flush').mockResolvedValue(true);

await wrapRequestHandler({ options: MOCK_OPTIONS, request: new Request('https://example.com'), context }, () => {
const response = new Response('test');
response.headers.set('content-length', '4');
return response;
});
return new Response('test');
}).then(response => response.text());

// Wait for all waitUntil promises to resolve
await Promise.all(waits);
Expand Down Expand Up @@ -518,6 +515,171 @@ describe('flushAndDispose', () => {
disposeSpy.mockRestore();
});

// Regression tests for https://github.com/getsentry/sentry-javascript/issues/20409
//
// Pre-fix: streaming responses were observed via `body.tee()` + a long-running
// `waitUntil(streamMonitor)`. Cloudflare caps `waitUntil` at ~30s after the
// handler returns, so any stream taking longer than 30s to fully emit had the
// monitor cancelled before `span.end()` / `flushAndDispose()` ran — silently
// dropping the root `http.server` span.
//
// Post-fix: the body is piped through a passthrough `TransformStream`; the
// `flush` (normal completion) and `cancel` (client disconnect) callbacks fire
// while the response stream is still active (no waitUntil cap), so they can
// safely end the span and register `flushAndDispose` via a fresh `waitUntil`
// window. The contract guaranteed below: `waitUntil` is NOT called with any
// long-running stream-observation promise — only with `flushAndDispose`, and
// only after the response stream has finished (either by completion or cancel).
describe('regression #20409: streaming responses do not park stream observation in waitUntil', () => {
test('waitUntil is not called until streaming response is fully delivered', async () => {
const waits: Promise<unknown>[] = [];
const waitUntil = vi.fn((promise: Promise<unknown>) => waits.push(promise));
const context = { waitUntil } as unknown as ExecutionContext;

const flushSpy = vi.spyOn(SentryCore.Client.prototype, 'flush').mockResolvedValue(true);
const disposeSpy = vi.spyOn(CloudflareClient.prototype, 'dispose');

// Stream emits chunk1, then waits indefinitely until we open the gate
// before emitting chunk2 + closing. Models a long-running upstream
// (e.g. SSE / LLM streaming) whose body takes longer than the
// handler-return time to fully drain.
let releaseLastChunk!: () => void;
const lastChunkGate = new Promise<void>(resolve => {
releaseLastChunk = resolve;
});

const stream = new ReadableStream({
async start(controller) {
controller.enqueue(new TextEncoder().encode('chunk1'));
await lastChunkGate;
controller.enqueue(new TextEncoder().encode('chunk2'));
controller.close();
},
});

const result = await wrapRequestHandler(
{ options: MOCK_OPTIONS, request: new Request('https://example.com'), context },
() => new Response(stream, { headers: { 'content-type': 'text/event-stream' } }),
);

// Handler has returned, but the source stream has NOT closed yet.
// The pre-fix code would have already enqueued a long-running
// `waitUntil(streamMonitor)` task at this point. The post-fix code
// must not call waitUntil at all here.
expect(waitUntil).not.toHaveBeenCalled();

// Drain the response — Cloudflare would do this when forwarding to the client.
const reader = result.body!.getReader();
await reader.read(); // chunk1
// Source still hasn't closed — still no waitUntil.
expect(waitUntil).not.toHaveBeenCalled();

releaseLastChunk();
await reader.read(); // chunk2
await reader.read(); // done
reader.releaseLock();

// Stream completed → TransformStream `flush` fired → span ended →
// `flushAndDispose(client)` queued via waitUntil exactly once.
await Promise.all(waits);
expect(waitUntil).toHaveBeenCalledTimes(1);
expect(waitUntil).toHaveBeenLastCalledWith(expect.any(Promise));
expect(flushSpy).toHaveBeenCalled();
expect(disposeSpy).toHaveBeenCalled();

flushSpy.mockRestore();
disposeSpy.mockRestore();
});

// Node 18's TransformStream does not invoke the transformer's `cancel` hook
// when the downstream consumer cancels (WHATWG spec addition landed in Node 20).
// Cloudflare Workers run modern V8 where this works, so we only skip the
// test under Node 18.
test.skipIf(NODE_MAJOR_VERSION < 20)(
'waitUntil is called once and dispose runs when client cancels mid-stream',
async () => {
const waits: Promise<unknown>[] = [];
const waitUntil = vi.fn((promise: Promise<unknown>) => waits.push(promise));
const context = { waitUntil } as unknown as ExecutionContext;

const flushSpy = vi.spyOn(SentryCore.Client.prototype, 'flush').mockResolvedValue(true);
const disposeSpy = vi.spyOn(CloudflareClient.prototype, 'dispose');

// Stream emits one chunk and then never closes — models an upstream
// that keeps emitting indefinitely. We then cancel the response from
// the consumer side to model a client disconnect.
let sourceCancelled = false;
const stream = new ReadableStream({
start(controller) {
controller.enqueue(new TextEncoder().encode('chunk1'));
// intentionally don't close
},
cancel() {
sourceCancelled = true;
},
});

const result = await wrapRequestHandler(
{ options: MOCK_OPTIONS, request: new Request('https://example.com'), context },
() => new Response(stream, { headers: { 'content-type': 'text/event-stream' } }),
);

// Handler returned, source still open — no waitUntil yet.
expect(waitUntil).not.toHaveBeenCalled();

const reader = result.body!.getReader();
await reader.read(); // chunk1
await reader.cancel('client disconnected'); // simulates client disconnect
reader.releaseLock();

// TransformStream `cancel` fired → span ended → flushAndDispose queued.
await Promise.all(waits);
expect(waitUntil).toHaveBeenCalledTimes(1);
expect(waitUntil).toHaveBeenLastCalledWith(expect.any(Promise));
expect(flushSpy).toHaveBeenCalled();
expect(disposeSpy).toHaveBeenCalled();
// pipeThrough should also propagate the cancel upstream to the source.
expect(sourceCancelled).toBe(true);

flushSpy.mockRestore();
disposeSpy.mockRestore();
},
);

test('waitUntil is called exactly once even if the response is consumed multiple times', async () => {
// Sanity: no matter how the response is drained, the TransformStream's
// flush callback must only end the span (and queue flushAndDispose) once.
const waits: Promise<unknown>[] = [];
const waitUntil = vi.fn((promise: Promise<unknown>) => waits.push(promise));
const context = { waitUntil } as unknown as ExecutionContext;

const flushSpy = vi.spyOn(SentryCore.Client.prototype, 'flush').mockResolvedValue(true);
const disposeSpy = vi.spyOn(CloudflareClient.prototype, 'dispose');

const stream = new ReadableStream({
start(controller) {
controller.enqueue(new TextEncoder().encode('a'));
controller.enqueue(new TextEncoder().encode('b'));
controller.close();
},
});

const result = await wrapRequestHandler(
{ options: MOCK_OPTIONS, request: new Request('https://example.com'), context },
() => new Response(stream, { headers: { 'content-type': 'text/event-stream' } }),
);

const text = await result.text();
expect(text).toBe('ab');

await Promise.all(waits);
expect(waitUntil).toHaveBeenCalledTimes(1);

flushSpy.mockRestore();
disposeSpy.mockRestore();
});
});

test('dispose is NOT called for protocol upgrade responses (status 101)', async () => {
const context = createMockExecutionContext();
const waits: Promise<unknown>[] = [];
Expand Down
Loading