From 7ffb2f06c4924f8f141f0ada224bfaad1e43184d Mon Sep 17 00:00:00 2001 From: isaacs Date: Fri, 8 May 2026 22:05:22 -0700 Subject: [PATCH 1/5] feat(core, node): portable Connect integration Platform-portable Connect tracing integration in `@sentry/core` (`patchConnectModule`, `setupConnectErrorHandler`), similar to portable Express integration, and rewire the Node SDK's Connect integration to call into it through the otel InstrumentationBase class. Remove OTel-specific span attribute fix-up. Spans created with correct origin (`auto.http.connect`) and op directly in the middleware. --- .../node-connect/tests/transactions.test.ts | 4 +- .../suites/tracing/connect/test.ts | 50 +- packages/core/src/index.ts | 2 + .../core/src/integrations/connect/index.ts | 309 ++++++++++++ .../lib/integrations/connect/index.test.ts | 451 ++++++++++++++++++ .../node/src/integrations/tracing/connect.ts | 109 ++--- 6 files changed, 837 insertions(+), 88 deletions(-) create mode 100644 packages/core/src/integrations/connect/index.ts create mode 100644 packages/core/test/lib/integrations/connect/index.test.ts diff --git a/dev-packages/e2e-tests/test-applications/node-connect/tests/transactions.test.ts b/dev-packages/e2e-tests/test-applications/node-connect/tests/transactions.test.ts index 9b06ad052f58..84c5a6019210 100644 --- a/dev-packages/e2e-tests/test-applications/node-connect/tests/transactions.test.ts +++ b/dev-packages/e2e-tests/test-applications/node-connect/tests/transactions.test.ts @@ -72,7 +72,7 @@ test('Sends an API route transaction', async ({ baseURL }) => { }, { data: { - 'sentry.origin': 'auto.http.otel.connect', + 'sentry.origin': 'auto.http.connect', 'sentry.op': 'request_handler.connect', 'http.route': '/test-transaction', 'connect.type': 'request_handler', @@ -86,7 +86,7 @@ test('Sends an API route transaction', async ({ baseURL }) => { status: 'ok', timestamp: expect.any(Number), trace_id: expect.stringMatching(/[a-f0-9]{32}/), - origin: 'auto.http.otel.connect', + origin: 'auto.http.connect', }, ], transaction: 'GET /test-transaction', diff --git a/dev-packages/node-integration-tests/suites/tracing/connect/test.ts b/dev-packages/node-integration-tests/suites/tracing/connect/test.ts index 0c37c58f8a4c..bcc8033710d1 100644 --- a/dev-packages/node-integration-tests/suites/tracing/connect/test.ts +++ b/dev-packages/node-integration-tests/suites/tracing/connect/test.ts @@ -14,11 +14,11 @@ describe('connect auto-instrumentation', () => { 'connect.name': '/', 'connect.type': 'request_handler', 'http.route': '/', - 'sentry.origin': 'auto.http.otel.connect', + 'sentry.origin': 'auto.http.connect', 'sentry.op': 'request_handler.connect', }), description: '/', - origin: 'auto.http.otel.connect', + origin: 'auto.http.connect', op: 'request_handler.connect', status: 'ok', }), @@ -36,32 +36,26 @@ describe('connect auto-instrumentation', () => { }, }; - createEsmAndCjsTests( - __dirname, - 'scenario.mjs', - 'instrument.mjs', - (createTestRunner, test) => { - test('should auto-instrument `connect` package.', async () => { - const runner = createTestRunner().expect({ transaction: EXPECTED_TRANSACTION }).start(); - runner.makeRequest('get', '/'); - await runner.completed(); - }); + createEsmAndCjsTests(__dirname, 'scenario.mjs', 'instrument.mjs', (createTestRunner, test) => { + test('should auto-instrument `connect` package.', async () => { + const runner = createTestRunner().expect({ transaction: EXPECTED_TRANSACTION }).start(); + runner.makeRequest('get', '/'); + await runner.completed(); + }); - test('should capture errors in `connect` middleware.', async () => { - const runner = createTestRunner().ignore('transaction').expect({ event: EXPECTED_EVENT }).start(); - runner.makeRequest('get', '/error'); - await runner.completed(); - }); + test('should capture errors in `connect` middleware.', async () => { + const runner = createTestRunner().ignore('transaction').expect({ event: EXPECTED_EVENT }).start(); + runner.makeRequest('get', '/error'); + await runner.completed(); + }); - test('should report errored transactions.', async () => { - const runner = createTestRunner() - .ignore('event') - .expect({ transaction: { transaction: 'GET /error' } }) - .start(); - runner.makeRequest('get', '/error'); - await runner.completed(); - }); - }, - { failsOnEsm: true }, - ); + test('should report errored transactions.', async () => { + const runner = createTestRunner() + .ignore('event') + .expect({ transaction: { transaction: 'GET /error' } }) + .start(); + runner.makeRequest('get', '/error'); + await runner.completed(); + }); + }); }); diff --git a/packages/core/src/index.ts b/packages/core/src/index.ts index 1751192d13dc..a093a70b6264 100644 --- a/packages/core/src/index.ts +++ b/packages/core/src/index.ts @@ -138,6 +138,8 @@ export type { ExpressMiddleware, ExpressErrorMiddleware, } from './integrations/express/types'; +export { patchConnectModule, setupConnectErrorHandler } from './integrations/connect/index'; +export type { ConnectIntegrationOptions, ConnectModule } from './integrations/connect/index'; export { dedupeIntegration } from './integrations/dedupe'; export { extraErrorDataIntegration } from './integrations/extraerrordata'; export { rewriteFramesIntegration } from './integrations/rewriteframes'; diff --git a/packages/core/src/integrations/connect/index.ts b/packages/core/src/integrations/connect/index.ts new file mode 100644 index 000000000000..b008e10e5ca0 --- /dev/null +++ b/packages/core/src/integrations/connect/index.ts @@ -0,0 +1,309 @@ +/** + * Platform-portable Connect tracing integration. + * + * @module + * + * This Sentry integration is a derivative work based on the OpenTelemetry + * Connect instrumentation. + * + * + * + * Extended under the terms of the Apache 2.0 license linked below: + * + * ---- + * + * Copyright The OpenTelemetry Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +import { captureException } from '../../exports'; +import { SEMANTIC_ATTRIBUTE_SENTRY_OP, SEMANTIC_ATTRIBUTE_SENTRY_ORIGIN } from '../../semanticAttributes'; +import { startInactiveSpan } from '../../tracing'; +import { getActiveSpan } from '../../utils/spanUtils'; +import { wrapMethod } from '../../utils/object'; +import { DEBUG_BUILD } from '../../debug-build'; +import { debug } from '../../utils/debug-logger'; +import { getDefaultExport } from '../../utils/get-default-export'; + +export const ANONYMOUS_NAME = 'anonymous'; + +// Symbol used to store the route stack on the request object (non-enumerable) +const _LAYERS_STORE_PROPERTY = Symbol('sentry.connect.request-route-stack'); + +// --- Type definitions --- + +export type ConnectRequest = { + method?: string; + url?: string; + [key: symbol]: unknown; +}; + +export type ConnectResponse = { + addListener(event: string, listener: () => void): void; + removeListener(event: string, listener: () => void): void; +}; + +export type ConnectMiddleware = ((...args: unknown[]) => unknown) & { + name?: string; + length: number; +}; + +export type ConnectApp = { + use: (...args: unknown[]) => ConnectApp; + handle: (...args: unknown[]) => unknown; +}; + +// The connect module export is a factory function: connect() returns a ConnectApp +export type ConnectModule = (...args: unknown[]) => ConnectApp; + +export type ConnectModuleExport = + | ConnectModule + | { + default: ConnectModule; + }; + +export interface ConnectIntegrationOptions { + /** + * Optional callback invoked each time a named route handler resolves the + * matched HTTP route. Platform-specific integrations (e.g. Node.js) can use + * this to propagate the resolved route to OTel RPCMetadata. + */ + onRouteResolved?: (route: string) => void; +} + +// --- Internal route stack management --- +// Tracks nested route paths on the request object, mirroring the OTel +// connect instrumentation's approach for building the full HTTP route. + +function addNewStackLayer(req: ConnectRequest): () => void { + let layers = req[_LAYERS_STORE_PROPERTY] as string[] | undefined; + if (!Array.isArray(layers)) { + layers = []; + Object.defineProperty(req, _LAYERS_STORE_PROPERTY, { + enumerable: false, + value: layers, + }); + } + layers.push('/'); + const stackLength = layers.length; + return () => { + if ( + Array.isArray(req[_LAYERS_STORE_PROPERTY]) && + (req[_LAYERS_STORE_PROPERTY] as string[]).length === stackLength + ) { + (req[_LAYERS_STORE_PROPERTY] as string[]).pop(); + } + }; +} + +function replaceCurrentStackRoute(req: ConnectRequest, newRoute: string): void { + if (!newRoute) return; + const layers = req[_LAYERS_STORE_PROPERTY] as string[] | undefined; + if (Array.isArray(layers) && layers.length > 0) { + layers.splice(-1, 1, newRoute); + } +} + +// Combines all stack layers into a single route path, deduplicating slashes: +// ['/api/', '/users', '/:id'] => '/api/users/:id' +function generateRoute(req: ConnectRequest): string { + const layers = req[_LAYERS_STORE_PROPERTY] as string[] | undefined; + /* v8 ignore start */ + if (!Array.isArray(layers) || layers.length === 0) return '/'; + return layers.reduce((acc: string, sub: string) => acc.replace(/\/+$/, '') + sub); +} + +// --- Middleware patching --- + +function patchMiddleware( + routeName: string, + middleware: ConnectMiddleware, + options?: ConnectIntegrationOptions, +): ConnectMiddleware { + // Error middlewares have 4 arguments: (err, req, res, next) + const isErrorMiddleware = middleware.length === 4; + + function patchedMiddleware(this: unknown, ...args: unknown[]): unknown { + const parentSpan = getActiveSpan(); + if (!parentSpan) { + return middleware.apply(this, args); + } + + const [reqArgIdx, resArgIdx, nextArgIdx] = isErrorMiddleware ? [1, 2, 3] : [0, 1, 2]; + const req = args[reqArgIdx] as ConnectRequest; + const res = args[resArgIdx] as ConnectResponse; + const next = args[nextArgIdx] as ((...a: unknown[]) => unknown) | undefined; + + replaceCurrentStackRoute(req, routeName); + + const isRequestHandler = !!routeName; + const connectType = isRequestHandler ? 'request_handler' : 'middleware'; + const connectName = isRequestHandler ? routeName : middleware.name || ANONYMOUS_NAME; + + if (isRequestHandler) { + options?.onRouteResolved?.(generateRoute(req)); + } + + // Create the span as inactive so the middleware (and anything it calls, + // including the connect error-handler middleware) runs with the original + // parent span — typically the http.server span — as the active span. + // This matches the behavior of OpenTelemetry's connect instrumentation + // and keeps captureException calls inside middleware attached to the + // root server span instead of the per-middleware span. + const span = startInactiveSpan({ + name: connectName, + attributes: { + [SEMANTIC_ATTRIBUTE_SENTRY_OP]: `${connectType}.connect`, + [SEMANTIC_ATTRIBUTE_SENTRY_ORIGIN]: 'auto.http.connect', + 'http.route': routeName.length > 0 ? routeName : '/', + 'connect.type': connectType, + 'connect.name': connectName, + }, + }); + + let spanFinished = false; + + function finishSpan(): void { + if (!spanFinished) { + spanFinished = true; + span.end(); + } + res.removeListener('close', finishSpan); + } + + // End the span when the response closes (handles async middlewares that + // do not call next()) + res.addListener('close', finishSpan); + + if (typeof next === 'function') { + args[nextArgIdx] = function patchedNext(this: unknown, ...nextArgs: unknown[]) { + finishSpan(); + return next.apply(this, nextArgs); + }; + } + + return middleware.apply(this, args); + } + + // Preserve the original function's arity so connect can detect error + // middlewares (length === 4) correctly. + Object.defineProperty(patchedMiddleware, 'length', { + value: middleware.length, + writable: false, + configurable: true, + }); + + return patchedMiddleware; +} + +// --- App patching --- + +/** + * Patch an already-created connect app instance. + */ +export function patchConnectApp(app: ConnectApp, options?: ConnectIntegrationOptions): void { + const originalUse = app.use; + try { + wrapMethod(app, 'use', function patchedUse(this: ConnectApp, ...args: unknown[]) { + // connect.use([route,] middleware) — the route is optional + const middleware = args[args.length - 1] as ConnectMiddleware; + /* v8 ignore start */ + const routeName = args.length > 1 ? String(args[args.length - 2] ?? '') : ''; + args[args.length - 1] = patchMiddleware(routeName, middleware, options); + return originalUse.apply(this, args); + }); + } catch (e) { + DEBUG_BUILD && debug.error('Failed to patch connect use method:', e); + } + + const originalHandle = app.handle; + try { + wrapMethod(app, 'handle', function patchedHandle(this: unknown, ...args: unknown[]) { + // handle(req, res[, out]) — 'out' is the fallback called when no + // middleware matches the request (used for nested apps). + const req = args[0] as ConnectRequest; + const out = args[2]; + const completeStack = addNewStackLayer(req); + if (typeof out === 'function') { + args[2] = function patchedOut(this: unknown, ...outArgs: unknown[]) { + completeStack(); + return (out as (...a: unknown[]) => unknown).apply(this, outArgs); + }; + } + return originalHandle.apply(this, args); + }); + } catch (e) { + DEBUG_BUILD && debug.error('Failed to patch connect handle method:', e); + } +} + +/** + * Wrap the connect factory function so that every app created with it is + * automatically patched. + * + * @example + * ```javascript + * import connect from 'connect'; + * import * as Sentry from '@sentry/node'; // or any SDK that extends core + * + * const patchedConnect = Sentry.patchConnectModule(connect); + * const app = patchedConnect(); + * ``` + */ +export function patchConnectModule( + connectModule: ConnectModuleExport, + options?: ConnectIntegrationOptions, +): ConnectModule { + const connect = getDefaultExport(connectModule); + return function patchedConnect(this: unknown, ...args: unknown[]) { + const app = connect.apply(this, args) as ConnectApp; + patchConnectApp(app, options); + return app; + } as ConnectModule; +} + +// --- Error handler --- + +function connectErrorMiddleware(err: unknown, _req: unknown, _res: unknown, next: (err: unknown) => void): void { + captureException(err, { + mechanism: { + handled: false, + type: 'auto.middleware.connect', + }, + }); + next(err); +} + +/** + * Add a Connect middleware to capture errors to Sentry. + * + * @param app The Connect app to attach the error handler to + * + * @example + * ```javascript + * const Sentry = require('@sentry/node'); + * const connect = require('connect'); + * + * const app = connect(); + * + * Sentry.setupConnectErrorHandler(app); + * + * // Add your connect routes here + * + * app.listen(3000); + * ``` + */ +export function setupConnectErrorHandler(app: { use: (middleware: unknown) => void }): void { + app.use(connectErrorMiddleware); +} diff --git a/packages/core/test/lib/integrations/connect/index.test.ts b/packages/core/test/lib/integrations/connect/index.test.ts new file mode 100644 index 000000000000..e5d79116b862 --- /dev/null +++ b/packages/core/test/lib/integrations/connect/index.test.ts @@ -0,0 +1,451 @@ +import { describe, it, expect, vi, beforeEach } from 'vitest'; +import { + ANONYMOUS_NAME, + patchConnectApp, + patchConnectModule, + setupConnectErrorHandler, + type ConnectApp, + type ConnectModule, + type ConnectRequest, + type ConnectResponse, +} from '../../../../src/integrations/connect/index'; + +// --- Mock Sentry core --- + +const activeSpans: ReturnType[] = []; +let mockParentSpan: ReturnType | null = null; + +function makeMockSpan(name = 'span') { + return { + name, + ended: false, + attributes: {} as Record, + setAttributes(attrs: Record) { + Object.assign(this.attributes, attrs); + }, + end() { + this.ended = true; + }, + }; +} + +vi.mock('../../../../src/utils/spanUtils', () => ({ + getActiveSpan: () => mockParentSpan, +})); + +const startInactiveSpanCalls: { options: unknown }[] = []; + +vi.mock('../../../../src/tracing', () => ({ + startInactiveSpan(options: unknown) { + const span = makeMockSpan((options as { name: string }).name); + activeSpans.push(span); + startInactiveSpanCalls.push({ options }); + return span; + }, +})); + +const capturedExceptions: [unknown, unknown][] = []; +vi.mock('../../../../src/exports', () => ({ + captureException(error: unknown, hint: unknown) { + capturedExceptions.push([error, hint]); + return 'eventId'; + }, +})); + +vi.mock('../../../../src/debug-build', () => ({ DEBUG_BUILD: true })); +const debugErrors: [string, unknown][] = []; +vi.mock('../../../../src/utils/debug-logger', () => ({ + debug: { error: (msg: string, e: unknown) => debugErrors.push([msg, e]) }, +})); + +vi.mock('../../../../src/semanticAttributes', () => ({ + SEMANTIC_ATTRIBUTE_SENTRY_OP: 'sentry.op', + SEMANTIC_ATTRIBUTE_SENTRY_ORIGIN: 'sentry.origin', +})); + +// --- Helpers --- + +function makeRequest(): ConnectRequest { + return { method: 'GET', url: '/test' }; +} + +function makeResponse(): ConnectResponse & { listeners: Record void)[]> } { + const listeners: Record void)[]> = {}; + return { + listeners, + addListener(event: string, fn: () => void) { + (listeners[event] ??= []).push(fn); + }, + removeListener(event: string, fn: () => void) { + listeners[event] = (listeners[event] ?? []).filter(l => l !== fn); + }, + }; +} + +function makeApp(): ConnectApp & { stack: Array<(...a: unknown[]) => unknown> } { + const stack: Array<(...a: unknown[]) => unknown> = []; + return { + stack, + use(...args: unknown[]) { + stack.push(args[args.length - 1] as (...a: unknown[]) => unknown); + return this; + }, + handle(req: unknown, res: unknown, out?: unknown) { + for (const fn of stack) { + fn(req, res, out ?? (() => undefined)); + } + return undefined; + }, + } as unknown as ConnectApp & { stack: Array<(...a: unknown[]) => unknown> }; +} + +function makeConnectModule(): ConnectModule { + return function connect() { + return makeApp(); + }; +} + +beforeEach(() => { + activeSpans.length = 0; + startInactiveSpanCalls.length = 0; + capturedExceptions.length = 0; + debugErrors.length = 0; + mockParentSpan = makeMockSpan('parent'); +}); + +// --- Tests --- + +describe('patchConnectModule', () => { + it('returns a factory that creates patched apps', () => { + const connect = makeConnectModule(); + const patched = patchConnectModule(connect); + const app = patched(); + expect(typeof app.use).toBe('function'); + expect(typeof app.handle).toBe('function'); + }); + + it('patched factory passes args to original factory', () => { + let receivedArgs: unknown[] = []; + const connect = function (...args: unknown[]) { + receivedArgs = args; + return makeApp(); + }; + const patched = patchConnectModule(connect); + patched('arg1', 'arg2'); + expect(receivedArgs).toStrictEqual(['arg1', 'arg2']); + }); + + it('wraps middleware added via use to create spans', () => { + const connect = makeConnectModule(); + const patched = patchConnectModule(connect); + const app = patched(); + + const req = makeRequest(); + const res = makeResponse(); + let middlewareCalled = false; + const middleware = function (_req: unknown, _res: unknown, n: () => void) { + middlewareCalled = true; + n(); + }; + // Clear inferred name so connect treats it as anonymous + Object.defineProperty(middleware, 'name', { value: '', configurable: true }); + + app.use(middleware as unknown as (...args: unknown[]) => ConnectApp); + app.handle(req, res, () => undefined); + + expect(middlewareCalled).toBe(true); + expect(startInactiveSpanCalls).toHaveLength(1); + expect((startInactiveSpanCalls[0]!.options as { name: string }).name).toBe(ANONYMOUS_NAME); + }); +}); + +describe('patchConnectApp', () => { + it('wraps anonymous middleware without a route', () => { + const app = makeApp(); + patchConnectApp(app); + + const req = makeRequest(); + const res = makeResponse(); + let middlewareCalled = false; + // Use a function expression so .name === '' (treated as anonymous) + const middleware = function (_req: unknown, _res: unknown, n: () => void) { + middlewareCalled = true; + n(); + }; + + // Clear inferred name so connect treats it as anonymous + Object.defineProperty(middleware, 'name', { value: '', configurable: true }); + + app.use(middleware as unknown as (...args: unknown[]) => ConnectApp); + app.handle(req, res, () => undefined); + expect(middlewareCalled).toBe(true); + + expect(startInactiveSpanCalls).toHaveLength(1); + const spanOptions = startInactiveSpanCalls[0]!.options as Record; + expect(spanOptions['name']).toBe(ANONYMOUS_NAME); + expect((spanOptions['attributes'] as Record)['connect.type']).toBe('middleware'); + expect((spanOptions['attributes'] as Record)['connect.name']).toBe(ANONYMOUS_NAME); + expect((spanOptions['attributes'] as Record)['sentry.op']).toBe('middleware.connect'); + expect((spanOptions['attributes'] as Record)['sentry.origin']).toBe('auto.http.connect'); + }); + + it('uses middleware.name when available for anonymous middleware', () => { + const app = makeApp(); + patchConnectApp(app); + + const req = makeRequest(); + const res = makeResponse(); + function myHandler(_req: unknown, _res: unknown, next: () => void) { + next(); + } + + app.use(myHandler as unknown as (...args: unknown[]) => ConnectApp); + app.handle(req, res, () => undefined); + + expect(startInactiveSpanCalls).toHaveLength(1); + expect((startInactiveSpanCalls[0]!.options as { name: string }).name).toBe('myHandler'); + }); + + it('wraps named route handler with routeName', () => { + const app = makeApp(); + patchConnectApp(app); + + const req = makeRequest(); + const res = makeResponse(); + const handler = vi.fn((_req: unknown, _res: unknown, n: () => void) => n()); + + app.use('/users', handler as unknown as (...args: unknown[]) => ConnectApp); + app.handle(req, res, () => undefined); + + expect(startInactiveSpanCalls).toHaveLength(1); + const spanOptions = startInactiveSpanCalls[0]!.options as Record; + expect(spanOptions['name']).toBe('/users'); + expect((spanOptions['attributes'] as Record)['connect.type']).toBe('request_handler'); + expect((spanOptions['attributes'] as Record)['connect.name']).toBe('/users'); + expect((spanOptions['attributes'] as Record)['sentry.op']).toBe('request_handler.connect'); + expect((spanOptions['attributes'] as Record)['http.route']).toBe('/users'); + }); + + it('calls onRouteResolved when a named route handler is matched', () => { + const app = makeApp(); + const routeResolved: string[] = []; + patchConnectApp(app, { onRouteResolved: r => routeResolved.push(r) }); + + const req = makeRequest(); + const res = makeResponse(); + const handler = vi.fn((_req: unknown, _res: unknown, next: () => void) => next()); + + app.use('/api', handler as unknown as (...args: unknown[]) => ConnectApp); + app.handle(req, res, () => undefined); + + expect(routeResolved).toStrictEqual(['/api']); + }); + + it('does not call onRouteResolved for anonymous middleware', () => { + const app = makeApp(); + const routeResolved: string[] = []; + patchConnectApp(app, { onRouteResolved: r => routeResolved.push(r) }); + + const req = makeRequest(); + const res = makeResponse(); + const middleware = vi.fn((_req: unknown, _res: unknown, next: () => void) => next()); + + app.use(middleware as unknown as (...args: unknown[]) => ConnectApp); + app.handle(req, res, () => undefined); + + expect(routeResolved).toStrictEqual([]); + }); + + it('ends span when next() is called', () => { + const app = makeApp(); + patchConnectApp(app); + + const req = makeRequest(); + const res = makeResponse(); + let capturedNext: (() => void) | undefined; + const middleware = vi.fn((_req: unknown, _res: unknown, next: () => void) => { + capturedNext = next; + }); + + app.use(middleware as unknown as (...args: unknown[]) => ConnectApp); + app.handle(req, res, () => undefined); + + expect(activeSpans[0]!.ended).toBe(false); + capturedNext!(); + expect(activeSpans[0]!.ended).toBe(true); + }); + + it('ends span when response close event fires', () => { + const app = makeApp(); + patchConnectApp(app); + + const req = makeRequest(); + const res = makeResponse(); + const middleware = vi.fn((_req: unknown, _res: unknown, _next: unknown) => { + // intentionally does not call next + }); + + app.use(middleware as unknown as (...args: unknown[]) => ConnectApp); + app.handle(req, res, () => undefined); + + expect(activeSpans[0]!.ended).toBe(false); + // Simulate response close + res.listeners['close']?.forEach(fn => fn()); + expect(activeSpans[0]!.ended).toBe(true); + }); + + it('does not end span twice if both next() and close fire', () => { + const app = makeApp(); + patchConnectApp(app); + + const req = makeRequest(); + const res = makeResponse(); + let capturedNext: (() => void) | undefined; + const middleware = vi.fn((_req: unknown, _res: unknown, next: () => void) => { + capturedNext = next; + }); + + app.use(middleware as unknown as (...args: unknown[]) => ConnectApp); + app.handle(req, res, () => undefined); + + capturedNext!(); + res.listeners['close']?.forEach(fn => fn()); + + // span.end is idempotent in our patchedMiddleware + expect(activeSpans[0]!.ended).toBe(true); + }); + + it('skips span creation when there is no active parent span', () => { + mockParentSpan = null; + const app = makeApp(); + patchConnectApp(app); + + const req = makeRequest(); + const res = makeResponse(); + const middleware = vi.fn((_req: unknown, _res: unknown, next: () => void) => next()); + + app.use(middleware as unknown as (...args: unknown[]) => ConnectApp); + app.handle(req, res, () => undefined); + + expect(startInactiveSpanCalls).toHaveLength(0); + expect(middleware).toHaveBeenCalledOnce(); + }); + + it('preserves middleware arity (length) so connect can detect error handlers', () => { + const app = makeApp(); + patchConnectApp(app); + + const errorMiddleware = vi.fn(); + Object.defineProperty(errorMiddleware, 'length', { value: 4 }); + + app.use(errorMiddleware as unknown as (...args: unknown[]) => ConnectApp); + + // Verify that the patched middleware in the stack still has length 4 + const { stack } = app as unknown as { stack: Function[] }; + expect(stack[0]!.length).toBe(4); + }); + + it('error middleware uses (err, req, res, next) argument positions', () => { + const app = makeApp(); + patchConnectApp(app); + + const req = makeRequest(); + const res = makeResponse(); + const next = vi.fn(); + const err = new Error('oops'); + let capturedArgs: unknown[] = []; + + // 4-arg error middleware + const errorMiddleware = function (_err: unknown, _req: unknown, _res: unknown, _next: () => void) { + capturedArgs = [...arguments]; + _next(); + }; + + app.use(errorMiddleware as unknown as (...args: unknown[]) => ConnectApp); + + // Simulate connect calling the error middleware + const { stack } = app as unknown as { stack: Function[] }; + (stack[0] as Function)(err, req, res, next); + + expect(capturedArgs[0]).toBe(err); + expect(capturedArgs[1]).toBe(req); + expect(capturedArgs[2]).toBe(res); + expect(typeof capturedArgs[3]).toBe('function'); // patched next + }); + + it('wraps handle to track route stack per request', () => { + const app = makeApp(); + const routeResolved: string[] = []; + patchConnectApp(app, { onRouteResolved: r => routeResolved.push(r) }); + + const req = makeRequest(); + const res = makeResponse(); + + // Simulate nested: handle adds a layer, route handler resolves the route + const handler = vi.fn((_req: unknown, _res: unknown, next: () => void) => next()); + app.use('/api/users', handler as unknown as (...args: unknown[]) => ConnectApp); + app.handle(req, res, () => undefined); + + expect(routeResolved).toStrictEqual(['/api/users']); + }); + + it('emits debug error when patching use fails (already wrapped)', () => { + const app = makeApp(); + patchConnectApp(app); // first patch + patchConnectApp(app); // second patch — should log debug error + expect(debugErrors.some(([msg]) => (msg as string).includes('use'))).toBe(true); + }); + + it('emits debug error when patching handle fails (already wrapped)', () => { + const app = makeApp(); + patchConnectApp(app); // first patch + patchConnectApp(app); // second patch — should log debug error + expect(debugErrors.some(([msg]) => (msg as string).includes('handle'))).toBe(true); + }); + + it('http.route falls back to "/" for middleware without a routeName', () => { + const app = makeApp(); + patchConnectApp(app); + + const req = makeRequest(); + const res = makeResponse(); + const middleware = vi.fn((_req: unknown, _res: unknown, next: () => void) => next()); + + app.use(middleware as unknown as (...args: unknown[]) => ConnectApp); + app.handle(req, res, () => undefined); + + const attrs = (startInactiveSpanCalls[0]!.options as { attributes: Record }).attributes; + expect(attrs['http.route']).toBe('/'); + }); +}); + +describe('setupConnectErrorHandler', () => { + it('adds a 4-argument error middleware to the app', () => { + const added: unknown[] = []; + const app = { use: (fn: unknown) => added.push(fn) }; + setupConnectErrorHandler(app); + expect(added).toHaveLength(1); + const fn = added[0] as Function; + expect(fn.length).toBe(4); + }); + + it('captures exceptions via captureException', () => { + const added: unknown[] = []; + const app = { use: (fn: unknown) => added.push(fn) }; + setupConnectErrorHandler(app); + + const next = vi.fn(); + const err = new Error('test'); + (added[0] as Function)(err, {}, {}, next); + + expect(capturedExceptions).toStrictEqual([ + [ + err, + { + mechanism: { handled: false, type: 'auto.middleware.connect' }, + }, + ], + ]); + expect(next).toHaveBeenCalledExactlyOnceWith(err); + }); +}); diff --git a/packages/node/src/integrations/tracing/connect.ts b/packages/node/src/integrations/tracing/connect.ts index 3fd7d10b28fb..ef23e0af9b61 100644 --- a/packages/node/src/integrations/tracing/connect.ts +++ b/packages/node/src/integrations/tracing/connect.ts @@ -1,29 +1,67 @@ -import { ConnectInstrumentation } from '@opentelemetry/instrumentation-connect'; -import type { IntegrationFn, Span } from '@sentry/core'; +// Automatic instrumentation for Connect using our portable core integration +import type { InstrumentationConfig } from '@opentelemetry/instrumentation'; +import { InstrumentationBase, InstrumentationNodeModuleDefinition } from '@opentelemetry/instrumentation'; +import { context } from '@opentelemetry/api'; +import { getRPCMetadata, RPCType } from '@opentelemetry/core'; + +import type { ConnectIntegrationOptions, ConnectModule, IntegrationFn } from '@sentry/core'; import { - captureException, + patchConnectModule, + setupConnectErrorHandler as coreSetupConnectErrorHandler, + SDK_VERSION, defineIntegration, - getClient, - SEMANTIC_ATTRIBUTE_SENTRY_OP, - SEMANTIC_ATTRIBUTE_SENTRY_ORIGIN, - spanToJSON, } from '@sentry/core'; import { ensureIsWrapped, generateInstrumentOnce } from '@sentry/node-core'; type ConnectApp = { - // eslint-disable-next-line @typescript-eslint/no-explicit-any + // oxlint-disable-next-line no-explicit-any use: (middleware: any) => void; }; const INTEGRATION_NAME = 'Connect'; +const SUPPORTED_VERSIONS = ['>=3.0.0 <4']; + +export type ConnectInstrumentationConfig = InstrumentationConfig & Omit; + +export const instrumentConnect = generateInstrumentOnce( + INTEGRATION_NAME, + (options?: ConnectInstrumentationConfig) => new ConnectInstrumentation(options), +); -export const instrumentConnect = generateInstrumentOnce(INTEGRATION_NAME, () => new ConnectInstrumentation()); +export class ConnectInstrumentation extends InstrumentationBase { + public constructor(config: ConnectInstrumentationConfig = {}) { + super('sentry-connect', SDK_VERSION, config); + } + + public init(): InstrumentationNodeModuleDefinition { + let originalConnect: ConnectModule | undefined; -const _connectIntegration = (() => { + return new InstrumentationNodeModuleDefinition( + 'connect', + SUPPORTED_VERSIONS, + connect => { + originalConnect = connect as ConnectModule; + return patchConnectModule(connect as ConnectModule, { + onRouteResolved(route) { + const rpcMetadata = getRPCMetadata(context.active()); + if (route && rpcMetadata?.type === RPCType.HTTP) { + rpcMetadata.route = route; + } + }, + }); + }, + () => { + return originalConnect; + }, + ); + } +} + +const _connectIntegration = ((options?: ConnectInstrumentationConfig) => { return { name: INTEGRATION_NAME, setupOnce() { - instrumentConnect(); + instrumentConnect(options); }, }; }) satisfies IntegrationFn; @@ -46,17 +84,6 @@ const _connectIntegration = (() => { */ export const connectIntegration = defineIntegration(_connectIntegration); -// eslint-disable-next-line @typescript-eslint/no-explicit-any -function connectErrorMiddleware(err: any, req: any, res: any, next: any): void { - captureException(err, { - mechanism: { - handled: false, - type: 'auto.middleware.connect', - }, - }); - next(err); -} - /** * Add a Connect middleware to capture errors to Sentry. * @@ -71,46 +98,12 @@ function connectErrorMiddleware(err: any, req: any, res: any, next: any): void { * * Sentry.setupConnectErrorHandler(app); * - * // Add you connect routes here + * // Add your connect routes here * * app.listen(3000); * ``` */ export const setupConnectErrorHandler = (app: ConnectApp): void => { - app.use(connectErrorMiddleware); - - // Sadly, ConnectInstrumentation has no requestHook, so we need to add the attributes here - // We register this hook in this method, because if we register it in the integration `setup`, - // it would always run even for users that are not even using connect - const client = getClient(); - if (client) { - client.on('spanStart', span => { - addConnectSpanAttributes(span); - }); - } - + coreSetupConnectErrorHandler(app); ensureIsWrapped(app.use, 'connect'); }; - -function addConnectSpanAttributes(span: Span): void { - const attributes = spanToJSON(span).data; - - // this is one of: middleware, request_handler - const type = attributes['connect.type']; - - // If this is already set, or we have no connect span, no need to process again... - if (attributes[SEMANTIC_ATTRIBUTE_SENTRY_OP] || !type) { - return; - } - - span.setAttributes({ - [SEMANTIC_ATTRIBUTE_SENTRY_ORIGIN]: 'auto.http.otel.connect', - [SEMANTIC_ATTRIBUTE_SENTRY_OP]: `${type}.connect`, - }); - - // Also update the name, we don't need the "middleware - " prefix - const name = attributes['connect.name']; - if (typeof name === 'string') { - span.updateName(name); - } -} From 8514248303714984bdd86eff9e01cff9fea1644c Mon Sep 17 00:00:00 2001 From: isaacs Date: Fri, 8 May 2026 23:28:04 -0700 Subject: [PATCH 2/5] feat(core): HTTP server-spans subscription utility Add platform-portable building blocks that server SDKs use to instrument incoming HTTP requests without depending on OTel HTTP instrumentation: - `getHttpServerSubscriptions`: diagnostics_channel listener for `http.server.request.start`, set up isolation scope, request data, trace continuation, optional body capture and request-session tracking - `getHttpServerSpanSubscriptions`: wrapper that creates the root server span around the request lifecycle, applying static-asset/status-code filtering, `ignoreIncomingRequests` and `onSpanCreated` hooks - `recordRequestSession`: release-health session aggregation per request - `patchRequestToCaptureBody`: opt-in incoming request body capture Add `kind` field on `StartSpanOptions` so OTel-based SDKs can set SpanKind on the underlying span, and update `headersToDict` to allow `number`-valued headers to support Node.js types. --- packages/core/src/index.ts | 3 + .../core/src/integrations/connect/index.ts | 24 +- .../http/patch-request-to-capture-body.ts | 125 ++++++++++ .../http/record-request-session.ts | 86 +++++++ .../http/server-span-subscription.ts | 226 ++++++++++++++++++ .../integrations/http/server-subscription.ts | 180 ++++++++++++++ packages/core/src/integrations/http/types.ts | 4 +- .../core/src/types-hoist/startSpanOptions.ts | 14 ++ packages/core/src/utils/request.ts | 4 +- 9 files changed, 654 insertions(+), 12 deletions(-) create mode 100644 packages/core/src/integrations/http/patch-request-to-capture-body.ts create mode 100644 packages/core/src/integrations/http/record-request-session.ts create mode 100644 packages/core/src/integrations/http/server-span-subscription.ts create mode 100644 packages/core/src/integrations/http/server-subscription.ts diff --git a/packages/core/src/index.ts b/packages/core/src/index.ts index a093a70b6264..8de69ba0b1b3 100644 --- a/packages/core/src/index.ts +++ b/packages/core/src/index.ts @@ -154,6 +154,9 @@ export { growthbookIntegration } from './integrations/featureFlags'; export { conversationIdIntegration } from './integrations/conversationId'; export { patchHttpModuleClient } from './integrations/http/client-patch'; export { getHttpClientSubscriptions } from './integrations/http/client-subscriptions'; +export { getHttpServerSubscriptions } from './integrations/http/server-subscription'; +export { recordRequestSession } from './integrations/http/record-request-session'; +export { getHttpServerSpanSubscriptions, isStaticAssetRequest } from './integrations/http/server-span-subscription'; export { addOutgoingRequestBreadcrumb } from './integrations/http/add-outgoing-request-breadcrumb'; export { getRequestUrl, diff --git a/packages/core/src/integrations/connect/index.ts b/packages/core/src/integrations/connect/index.ts index b008e10e5ca0..5a5168a7e7d3 100644 --- a/packages/core/src/integrations/connect/index.ts +++ b/packages/core/src/integrations/connect/index.ts @@ -46,7 +46,6 @@ const _LAYERS_STORE_PROPERTY = Symbol('sentry.connect.request-route-stack'); export type ConnectRequest = { method?: string; url?: string; - [key: symbol]: unknown; }; export type ConnectResponse = { @@ -86,8 +85,17 @@ export interface ConnectIntegrationOptions { // Tracks nested route paths on the request object, mirroring the OTel // connect instrumentation's approach for building the full HTTP route. +// `ConnectRequest` is intentionally minimal so it stays compatible with +// downstream consumers compiled under TS 3.8 (which doesn't allow `symbol` +// index signatures). The route stack is stored on the request via a Symbol +// key; we cast through a symbol-indexed type here so the access is typed +// without leaking the symbol-index into the public `ConnectRequest` type. +function getLayers(req: ConnectRequest): string[] | undefined { + return (req as { [k: symbol]: unknown })[_LAYERS_STORE_PROPERTY] as string[] | undefined; +} + function addNewStackLayer(req: ConnectRequest): () => void { - let layers = req[_LAYERS_STORE_PROPERTY] as string[] | undefined; + let layers = getLayers(req); if (!Array.isArray(layers)) { layers = []; Object.defineProperty(req, _LAYERS_STORE_PROPERTY, { @@ -98,18 +106,16 @@ function addNewStackLayer(req: ConnectRequest): () => void { layers.push('/'); const stackLength = layers.length; return () => { - if ( - Array.isArray(req[_LAYERS_STORE_PROPERTY]) && - (req[_LAYERS_STORE_PROPERTY] as string[]).length === stackLength - ) { - (req[_LAYERS_STORE_PROPERTY] as string[]).pop(); + const current = getLayers(req); + if (Array.isArray(current) && current.length === stackLength) { + current.pop(); } }; } function replaceCurrentStackRoute(req: ConnectRequest, newRoute: string): void { if (!newRoute) return; - const layers = req[_LAYERS_STORE_PROPERTY] as string[] | undefined; + const layers = getLayers(req); if (Array.isArray(layers) && layers.length > 0) { layers.splice(-1, 1, newRoute); } @@ -118,7 +124,7 @@ function replaceCurrentStackRoute(req: ConnectRequest, newRoute: string): void { // Combines all stack layers into a single route path, deduplicating slashes: // ['/api/', '/users', '/:id'] => '/api/users/:id' function generateRoute(req: ConnectRequest): string { - const layers = req[_LAYERS_STORE_PROPERTY] as string[] | undefined; + const layers = getLayers(req); /* v8 ignore start */ if (!Array.isArray(layers) || layers.length === 0) return '/'; return layers.reduce((acc: string, sub: string) => acc.replace(/\/+$/, '') + sub); diff --git a/packages/core/src/integrations/http/patch-request-to-capture-body.ts b/packages/core/src/integrations/http/patch-request-to-capture-body.ts new file mode 100644 index 000000000000..84e0469b33b0 --- /dev/null +++ b/packages/core/src/integrations/http/patch-request-to-capture-body.ts @@ -0,0 +1,125 @@ +import type { Scope } from '../../scope'; +import { debug } from '../../utils/debug-logger'; +import { DEBUG_BUILD } from '../../debug-build'; +import type { HttpIncomingMessage } from './types'; + +const MAX_BODY_BYTE_LENGTH = 1024 * 1024; + +/** + * This method patches the request object to capture the body. + * Instead of actually consuming the streamed body ourselves, which has + * potential side effects, we monkey patch `req.on('data')` to intercept + * the body chunks. This way, we only read the body if the user also consumes + * the body, ensuring we do not change any behavior in unexpected ways. + */ +export function patchRequestToCaptureBody( + req: HttpIncomingMessage, + isolationScope: Scope, + maxIncomingRequestBodySize: 'small' | 'medium' | 'always', + integrationName: string, +): void { + let bodyByteLength = 0; + const chunks: Buffer[] = []; + + DEBUG_BUILD && debug.log(integrationName, 'Patching request.on'); + + // keep track of the original callbacks to remove listeners later + // `off` depends on having the exact same function reference passed in, + // so we need to be able to map original listeners to our wrapped ones. + const callbackMap = new WeakMap(); + + const maxBodySize = + maxIncomingRequestBodySize === 'small' + ? 1_000 + : maxIncomingRequestBodySize === 'medium' + ? 10_000 + : MAX_BODY_BYTE_LENGTH; + + try { + // eslint-disable-next-line @typescript-eslint/unbound-method + req.on = req.addListener = new Proxy(req.on, { + apply: (target, thisArg, args: Parameters) => { + const [event, listener, ...restArgs] = args; + + if (event === 'data') { + DEBUG_BUILD && + debug.log(integrationName, `Handling request.on("data") with maximum body size of ${maxBodySize}b`); + + const callback = new Proxy(listener, { + apply: (target, thisArg, args: Parameters) => { + try { + const chunk = args[0] as Buffer | string; + const bufferifiedChunk = Buffer.from(chunk as string); + + if (bodyByteLength < maxBodySize) { + chunks.push(bufferifiedChunk); + bodyByteLength += bufferifiedChunk.byteLength; + } else if (DEBUG_BUILD) { + debug.log( + integrationName, + `Dropping request body chunk because maximum body length of ${maxBodySize}b is exceeded.`, + ); + } + } catch (_err) { + DEBUG_BUILD && debug.error(integrationName, 'Encountered error while storing body chunk.'); + } + + return Reflect.apply(target, thisArg, args); + }, + }); + + callbackMap.set(listener, callback); + + return Reflect.apply(target, thisArg, [event, callback, ...restArgs]); + } + + return Reflect.apply(target, thisArg, args); + }, + }); + + // Ensure we also remove callbacks correctly + // eslint-disable-next-line @typescript-eslint/unbound-method + req.off = req.removeListener = new Proxy(req.off, { + apply: (target, thisArg, args: Parameters) => { + const [, listener] = args; + + const callback = callbackMap.get(listener); + if (callback) { + callbackMap.delete(listener); + + const modifiedArgs = args.slice(); + modifiedArgs[1] = callback; + return Reflect.apply(target, thisArg, modifiedArgs); + } + + return Reflect.apply(target, thisArg, args); + }, + }); + + req.on('end', () => { + try { + const body = Buffer.concat(chunks).toString('utf-8'); + if (body) { + // Using Buffer.byteLength here, because the body may contain characters that are not 1 byte long + const bodyByteLength = Buffer.byteLength(body, 'utf-8'); + const truncatedBody = + bodyByteLength > maxBodySize + ? `${Buffer.from(body) + .subarray(0, maxBodySize - 3) + .toString('utf-8')}...` + : body; + + isolationScope.setSDKProcessingMetadata({ normalizedRequest: { data: truncatedBody } }); + } + } catch (error) { + if (DEBUG_BUILD) { + debug.error(integrationName, 'Error building captured request body', error); + } + } + }); + } catch (error) { + if (DEBUG_BUILD) { + debug.error(integrationName, 'Error patching request to capture body', error); + } + } +} diff --git a/packages/core/src/integrations/http/record-request-session.ts b/packages/core/src/integrations/http/record-request-session.ts new file mode 100644 index 000000000000..07549eeeb5c5 --- /dev/null +++ b/packages/core/src/integrations/http/record-request-session.ts @@ -0,0 +1,86 @@ +import type { Client } from '../../client'; +import { debug } from '../../utils/debug-logger'; +import { DEBUG_BUILD } from '../../debug-build'; +import type { Scope } from '../../scope'; +import type { HttpServerResponse } from './types'; +import type { AggregationCounts } from '../../types-hoist/session'; + +const clientToRequestSessionAggregatesMap = new WeakMap< + Client, + { [timestampRoundedToSeconds: string]: { exited: number; crashed: number; errored: number } } +>(); + +/** + * Starts a session and tracks it in the context of a given isolation scope. + * When the passed response is finished, the session is put into a task and + * is aggregated with other sessions that may happen in a certain time window + * (sessionFlushingDelayMs). + * + * The sessions are always aggregated by the client that is on the current + * scope at the time of ending the response (if there is one). + */ +export function recordRequestSession( + client: Client, + { + requestIsolationScope, + response, + sessionFlushingDelayMS, + }: { + requestIsolationScope: Scope; + response: HttpServerResponse; + sessionFlushingDelayMS?: number; + }, +): void { + requestIsolationScope.setSDKProcessingMetadata({ + requestSession: { status: 'ok' }, + }); + + response.once('close', () => { + const requestSession = requestIsolationScope.getScopeData().sdkProcessingMetadata.requestSession; + + if (client && requestSession) { + DEBUG_BUILD && debug.log(`Recorded request session with status: ${requestSession.status}`); + + const roundedDate = new Date(); + roundedDate.setSeconds(0, 0); + const dateBucketKey = roundedDate.toISOString(); + + const existingClientAggregate = clientToRequestSessionAggregatesMap.get(client); + const bucket = existingClientAggregate?.[dateBucketKey] || { exited: 0, crashed: 0, errored: 0 }; + bucket[({ ok: 'exited', crashed: 'crashed', errored: 'errored' } as const)[requestSession.status]]++; + + if (existingClientAggregate) { + existingClientAggregate[dateBucketKey] = bucket; + } else { + DEBUG_BUILD && debug.log('Opened new request session aggregate.'); + const newClientAggregate = { [dateBucketKey]: bucket }; + clientToRequestSessionAggregatesMap.set(client, newClientAggregate); + + const flushPendingClientAggregates = (): void => { + clearTimeout(timeout); + unregisterClientFlushHook(); + clientToRequestSessionAggregatesMap.delete(client); + + const aggregatePayload: AggregationCounts[] = Object.entries(newClientAggregate).map( + ([timestamp, value]) => ({ + started: timestamp, + exited: value.exited, + errored: value.errored, + crashed: value.crashed, + }), + ); + client.sendSession({ aggregates: aggregatePayload }); + }; + + const unregisterClientFlushHook = client.on('flush', () => { + DEBUG_BUILD && debug.log('Sending request session aggregate due to client flush'); + flushPendingClientAggregates(); + }); + const timeout = setTimeout(() => { + DEBUG_BUILD && debug.log('Sending request session aggregate due to flushing schedule'); + flushPendingClientAggregates(); + }, sessionFlushingDelayMS).unref?.(); + } + } + }); +} diff --git a/packages/core/src/integrations/http/server-span-subscription.ts b/packages/core/src/integrations/http/server-span-subscription.ts new file mode 100644 index 000000000000..609981526e1e --- /dev/null +++ b/packages/core/src/integrations/http/server-span-subscription.ts @@ -0,0 +1,226 @@ +import { getIsolationScope } from '../../currentScopes'; +import { DEBUG_BUILD } from '../../debug-build'; +import { parseStringToURLObject, stripUrlQueryAndFragment } from '../../utils/url'; +import { getHttpServerSubscriptions, type HttpServerSubscriptions } from './server-subscription'; +import type { HttpIncomingMessage, HttpInstrumentationOptions } from './types'; +import { debug } from '../../utils/debug-logger'; +import { getSpanStatusFromHttpCode, SPAN_STATUS_ERROR, startSpanManual } from '../../tracing'; +import { + SEMANTIC_ATTRIBUTE_SENTRY_OP, + SEMANTIC_ATTRIBUTE_SENTRY_ORIGIN, + SEMANTIC_ATTRIBUTE_SENTRY_SOURCE, +} from '../../semanticAttributes'; +import type { SpanAttributes } from '../../types-hoist/span'; +import { headersToDict, httpHeadersToSpanAttributes } from '../../utils/request'; +import type { SpanStatus } from '../../types-hoist/spanStatus'; + +// Tree-shakable guard to remove all code related to tracing +declare const __SENTRY_TRACING__: boolean; + +function shouldIgnoreSpansForIncomingRequest( + request: HttpIncomingMessage, + { + ignoreStaticAssets, + ignoreIncomingRequests, + }: { + ignoreStaticAssets?: boolean; + ignoreIncomingRequests?: (urlPath: string, request: HttpIncomingMessage) => boolean; + }, +): boolean { + // request.url is the only property that holds any information about the url + // it only consists of the URL path and query string (if any) + const urlPath = request.url; + + const method = request.method?.toUpperCase(); + // We do not capture OPTIONS/HEAD requests as spans + if (method === 'OPTIONS' || method === 'HEAD' || !urlPath) { + return true; + } + + // Default static asset filtering + if (ignoreStaticAssets && method === 'GET' && isStaticAssetRequest(urlPath)) { + return true; + } + + if (ignoreIncomingRequests?.(urlPath, request)) { + return true; + } + + return false; +} + +const INTEGRATION_NAME = 'Http.SentryServerSpans'; + +export function getHttpServerSpanSubscriptions(options: HttpInstrumentationOptions): HttpServerSubscriptions { + const { + wrapServerEmitRequest: wrap, + ignoreIncomingRequests, + ignoreStaticAssets, + onSpanCreated, + errorMonitor = 'error', + onSpanEnd, + } = options; + + return getHttpServerSubscriptions({ + ...options, + wrapServerEmitRequest(request, response, normalizedRequest, _next) { + if (typeof __SENTRY_TRACING__ !== 'undefined' && !__SENTRY_TRACING__) { + return _next(); + } + + // If the user provided a wrapServerEmitRequest, call it as the outer + // wrapper so it can set up context (e.g. OTel propagation) before the + // span is created. + return wrap ? wrap(request, response, normalizedRequest, createSpan) : createSpan(); + + function createSpan() { + const isolationScope = getIsolationScope(); + const client = isolationScope.getClient(); + if (!client) { + return _next(); + } + + if ( + shouldIgnoreSpansForIncomingRequest(request, { + ignoreStaticAssets, + ignoreIncomingRequests, + }) + ) { + DEBUG_BUILD && debug.log(INTEGRATION_NAME, 'Skipping span creation for incoming request', request.url); + return _next(); + } + + const fullUrl = normalizedRequest.url || request.url || '/'; + const urlObj = parseStringToURLObject(fullUrl); + const httpTargetWithoutQueryFragment = urlObj ? urlObj.pathname : stripUrlQueryAndFragment(fullUrl); + const method = (request.method || 'GET').toUpperCase(); + const name = `${method} ${httpTargetWithoutQueryFragment}`; + const headers = request.headers; + const userAgent = headers['user-agent']; + const ips = headers['x-forwarded-for']; + const httpVersion = request.httpVersion; + const host = headers.host as undefined | string; + const hostname = host?.replace(/^(.*)(:[0-9]{1,5})/, '$1') || 'localhost'; + const scheme = fullUrl.startsWith('https') ? 'https' : 'http'; + const { socket } = request; + const { localAddress, localPort, remoteAddress, remotePort } = socket ?? {}; + + return startSpanManual( + { + name, + // SpanKind.SERVER = 1; pass this so the OTel sampler infers + // op='http.server' rather than 'http', which it does for + // SpanKind.INTERNAL = 0, the default + kind: 1, + attributes: { + // Sentry-specific attributes + [SEMANTIC_ATTRIBUTE_SENTRY_OP]: 'http.server', + [SEMANTIC_ATTRIBUTE_SENTRY_ORIGIN]: 'auto.http.otel.http', + [SEMANTIC_ATTRIBUTE_SENTRY_SOURCE]: 'route', + // Set http.route to the URL path as a best-effort route name. + // Framework integrations (Express, etc.) update this via onSpanEnd. + 'http.route': httpTargetWithoutQueryFragment, + // OTel kind (explicit attribute so it appears in span data) + 'otel.kind': 'SERVER', + // Network attributes + 'net.host.ip': localAddress, + 'net.host.port': localPort, + 'net.peer.ip': remoteAddress, + 'net.peer.port': remotePort, + 'sentry.http.prefetch': isKnownPrefetchRequest(request) || undefined, + // Old Semantic Conventions attributes for compatibility + 'http.url': fullUrl, + 'http.method': method, + 'http.target': urlObj ? `${urlObj.pathname}${urlObj.search}` : httpTargetWithoutQueryFragment, + 'http.host': host, + 'net.host.name': hostname, + 'http.client_ip': typeof ips === 'string' ? ips.split(',')[0] : undefined, + 'http.user_agent': userAgent, + 'http.scheme': scheme, + 'http.flavor': httpVersion, + 'net.transport': httpVersion?.toUpperCase() === 'QUIC' ? 'ip_udp' : 'ip_tcp', + ...getRequestContentLengthAttribute(request), + ...httpHeadersToSpanAttributes( + normalizedRequest.headers || {}, + client.getOptions().sendDefaultPii ?? false, + ), + }, + }, + span => { + onSpanCreated?.(span, request, response); + // Ensure we only end the span once + // E.g. error can be emitted before close is emitted + let isEnded = false; + + function endSpan(status: SpanStatus): void { + if (isEnded) { + return; + } + + isEnded = true; + // set attributes that come from the response + span.setAttributes({ + 'http.status_text': response.statusMessage?.toUpperCase(), + 'http.response.status_code': response.statusCode, + 'http.status_code': response.statusCode, + ...httpHeadersToSpanAttributes( + headersToDict(response.headers), + client?.getOptions().sendDefaultPii ?? false, + 'response', + ), + }); + span.setStatus(status); + onSpanEnd?.(span, request, response); + span.end(); + } + + response.once('close', () => { + endSpan(getSpanStatusFromHttpCode(response.statusCode)); + }); + + response.once(errorMonitor, () => { + const httpStatus = getSpanStatusFromHttpCode(response.statusCode); + // Ensure we def. have an error status here + endSpan(httpStatus.code === SPAN_STATUS_ERROR ? httpStatus : { code: SPAN_STATUS_ERROR }); + }); + + // Continue handling the request inside the active span context + _next(); + }, + ); + } + }, + }); +} + +export function isStaticAssetRequest(urlPath: string): boolean { + const path = stripUrlQueryAndFragment(urlPath); + // Common static file extensions + if (path.match(/\.(ico|png|jpg|jpeg|gif|svg|css|js|woff|woff2|ttf|eot|webp|avif)$/)) { + return true; + } + + // Common metadata files + if (path.match(/^\/(robots\.txt|sitemap\.xml|manifest\.json|browserconfig\.xml)$/)) { + return true; + } + + return false; +} + +function isKnownPrefetchRequest(req: HttpIncomingMessage): boolean { + // Currently only handles Next.js prefetch requests but may check other frameworks in the future. + return req.headers['next-router-prefetch'] === '1'; +} + +function getRequestContentLengthAttribute(request: HttpIncomingMessage): SpanAttributes { + const { headers } = request; + const contentLengthHeader = headers['content-length']; + const length = contentLengthHeader ? parseInt(String(contentLengthHeader), 10) : -1; + const encoding = headers['content-encoding']; + return length >= 0 + ? encoding && encoding !== 'identity' + ? { 'http.request_content_length': length } + : { 'http.request_content_length_uncompressed': length } + : {}; +} diff --git a/packages/core/src/integrations/http/server-subscription.ts b/packages/core/src/integrations/http/server-subscription.ts new file mode 100644 index 000000000000..615f4c94c3b0 --- /dev/null +++ b/packages/core/src/integrations/http/server-subscription.ts @@ -0,0 +1,180 @@ +/** + * Provide the `http.server.request.start` subscription function that we use + * to instrument incoming HTTP requests that use the `node:http` module. + * + * On Node.js v18.7 and up, we can just assign the diagnostics channel + * listener, and that's enough. But for older node versions, or other SSJS + * platforms, we have to explicitly fire the provided method an a patched + * Server.emit method. + * + * This decision is made in the relevant Node/Bun/Deno SDKs; core just + * provides them with the methods to use. + */ + +import type { ServerSubscriptionName } from './constants'; +type ChannelListener = (message: unknown, name: string | symbol) => void; +import { HTTP_ON_SERVER_REQUEST } from './constants'; +import type { HttpIncomingMessage, HttpInstrumentationOptions, HttpServer, HttpServerResponse } from './types'; +import { DEBUG_BUILD } from '../../debug-build'; +import { debug } from '../../utils/debug-logger'; +import { getClient, getCurrentScope, getIsolationScope, withIsolationScope } from '../../currentScopes'; +import { httpRequestToRequestData } from '../../utils/request'; +import { patchRequestToCaptureBody } from './patch-request-to-capture-body'; +import { stripUrlQueryAndFragment } from '../../utils/url'; +import { recordRequestSession } from './record-request-session'; +import { generateSpanId, generateTraceId } from '../../utils/propagationContext'; +import { continueTrace } from '../../tracing/trace'; +import { safeMathRandom } from '../../utils/randomSafeContext'; + +const INTEGRATION_NAME = 'Http.Server'; + +export type HttpServerSubscriptions = Record; + +// Tracks the last Sentry-created emit wrapper for each server so we can detect +// when user code has replaced server.emit (e.g. with a proxy of the original) +// and re-wrap it to restore Sentry's instrumentation. +const lastSentryEmitMap = new WeakMap(); + +const kRequestMark = Symbol.for('sentry_http_server_instrumented'); +type MarkedRequest = HttpIncomingMessage & { + [kRequestMark]?: boolean; +}; + +/** return true if it is NOT already marked */ +function markRequest(request: MarkedRequest): boolean { + return !request[kRequestMark] && (request[kRequestMark] = true); +} + +export function instrumentServer(options: HttpInstrumentationOptions, server: HttpServer): void { + // Use a proxy and a WeakSet of server objects here, rather than a + // wrappedFunction, because NestJS has been observed to "fork" emit + // methods, including copying properties, leading to false positives. + // Furthermore, we mark the Request object so that if two copies of this + // instrumentation both are run on forked emit() methods for the same + // request, we still only ever create a single root span. Previously, + // this was done with a flag on the OTEL context, but in this non-OTEL + // version, we mark the Request itself with a non-enumerable prop instead. + + // oxlint-disable-next-line typescript/unbound-method -- `this` is forwarded via Proxy/target.apply below + const currentEmit = server.emit; + const instrumentedEmit = lastSentryEmitMap.get(server); + + // Skip re-wrapping only if already instrumented AND server.emit still points + // to our wrapper. If user code replaced server.emit (e.g. with a proxy of the + // original pre-Sentry emit), re-wrap so Sentry's instrumentation is restored. + if (currentEmit === instrumentedEmit) { + return; + } + + const newEmit = new Proxy(currentEmit, { + apply(target, thisArg, args: unknown[]) { + const [event, ...data] = args; + if (event !== 'request') { + return target.apply(thisArg, args); + } + + const client = getClient(); + const [request, response] = data as [HttpIncomingMessage, HttpServerResponse]; + + if (!client || !markRequest(request)) { + return target.apply(thisArg, args); + } + + DEBUG_BUILD && debug.log(INTEGRATION_NAME, 'Handling incoming request'); + const isolationScope = getIsolationScope().clone(); + isolationScope.setClient(client); + + const ipAddress = request.socket?.remoteAddress; + const url = request.url || '/'; + const normalizedRequest = httpRequestToRequestData(request); + const { + maxRequestBodySize = 'medium', + ignoreRequestBody, + sessions = true, + sessionFlushingDelayMS = 60_000, + } = options; + + if (maxRequestBodySize !== 'none' && !ignoreRequestBody?.(url, request)) { + patchRequestToCaptureBody(request, isolationScope, maxRequestBodySize, INTEGRATION_NAME); + } + + // Update the isolation scope, isolate this request + isolationScope.setSDKProcessingMetadata({ normalizedRequest, ipAddress }); + + // attempt to update the scope's `transactionName` based on the request URL + // Ideally, framework instrumentations coming after the HttpInstrumentation + // update the transactionName once we get a parameterized route. + const httpMethod = (request.method || 'GET').toUpperCase(); + const httpTargetWithoutQueryFragment = stripUrlQueryAndFragment(url); + + const bestEffortTransactionName = `${httpMethod} ${httpTargetWithoutQueryFragment}`; + + isolationScope.setTransactionName(bestEffortTransactionName); + + // light does not do this + if (sessions) { + recordRequestSession(client, { + requestIsolationScope: isolationScope, + response, + sessionFlushingDelayMS: sessionFlushingDelayMS ?? 60_000, + }); + } + + return withIsolationScope(isolationScope, () => { + const sentryTrace = normalizedRequest.headers?.['sentry-trace']; + const baggage = normalizedRequest.headers?.['baggage']; + const sentryTraceValue = Array.isArray(sentryTrace) ? sentryTrace[0] : sentryTrace; + return continueTrace( + { + sentryTrace: sentryTraceValue, + baggage: Array.isArray(baggage) ? baggage[0] : baggage, + }, + () => { + const propagationContext = getCurrentScope().getPropagationContext(); + // Set propagationSpanId after continueTrace because it calls + // withScope + setPropagationContext internally, which would + // overwrite any previously set value. + propagationContext.propagationSpanId = generateSpanId(); + // In OTel mode, continueTrace does not generate a new traceId + // when there is no incoming sentry-trace header. We generate one + // explicitly here so each request gets a unique trace ID even when + // tracing is disabled. + if (!sentryTraceValue) { + propagationContext.traceId = generateTraceId(); + propagationContext.sampleRand = safeMathRandom(); + } + + response.once('close', () => { + isolationScope.setContext('response', { + status_code: response.statusCode, + }); + }); + + const wrap = options.wrapServerEmitRequest; + let emitResult: boolean = false; + if (wrap) { + wrap(request, response, normalizedRequest, () => { + emitResult = target.apply(thisArg, args) as boolean; + }); + } else { + emitResult = target.apply(thisArg, args) as boolean; + } + return emitResult; + }, + ); + }); + }, + }); + + lastSentryEmitMap.set(server, newEmit); + server.emit = newEmit; +} + +export function getHttpServerSubscriptions(options: HttpInstrumentationOptions): HttpServerSubscriptions { + const onHttpServerRequest: ChannelListener = (data: unknown): void => { + const { server } = data as { server: HttpServer }; + instrumentServer(options, server); + }; + + return { [HTTP_ON_SERVER_REQUEST]: onHttpServerRequest }; +} diff --git a/packages/core/src/integrations/http/types.ts b/packages/core/src/integrations/http/types.ts index fb9a132a1e3b..faefdc910451 100644 --- a/packages/core/src/integrations/http/types.ts +++ b/packages/core/src/integrations/http/types.ts @@ -65,8 +65,8 @@ export interface HttpServerResponse { statusCode: number; statusMessage?: string; headers: Record; - once(ev: string, ...data: unknown[]): this; - once(ev: 'close'): this; + once(ev: string | symbol, handler: (...data: unknown[]) => void): this; + once(ev: 'close', handler: () => void): this; on(ev: string | symbol, handler: (...data: unknown[]) => void): this; } diff --git a/packages/core/src/types-hoist/startSpanOptions.ts b/packages/core/src/types-hoist/startSpanOptions.ts index eb3aa0b53299..ba184152e486 100644 --- a/packages/core/src/types-hoist/startSpanOptions.ts +++ b/packages/core/src/types-hoist/startSpanOptions.ts @@ -28,6 +28,20 @@ export interface StartSpanOptions { /** An op for the span. This is a categorization for spans. */ op?: string; + /** + * The kind of the span, following OpenTelemetry's SpanKind enum. + * - 0 = INTERNAL (default) + * - 1 = SERVER + * - 2 = CLIENT + * - 3 = PRODUCER + * - 4 = CONSUMER + * + * This is used by OpenTelemetry-based SDK implementations to set the correct + * span kind on the underlying OTel span, which affects how the span is + * displayed and sampled. + */ + kind?: 0 | 1 | 2 | 3 | 4; + /** * If provided, make the new span a child of this span. * If this is not provided, the new span will be a child of the currently active span. diff --git a/packages/core/src/utils/request.ts b/packages/core/src/utils/request.ts index 700f0272f121..81aad8dde29c 100644 --- a/packages/core/src/utils/request.ts +++ b/packages/core/src/utils/request.ts @@ -61,7 +61,9 @@ export function winterCGHeadersToDict(winterCGHeaders: WebFetchHeaders): Record< /** * Convert common request headers to a simple dictionary. */ -export function headersToDict(reqHeaders: Record): Record { +export function headersToDict( + reqHeaders: Record, +): Record { const headers: Record = Object.create(null); try { From 81346699b550e2d3a0042db912d681ec3532b864 Mon Sep 17 00:00:00 2001 From: isaacs Date: Sat, 9 May 2026 10:36:30 -0700 Subject: [PATCH 3/5] feat(node-core): use core HTTP server subscription utility in light SDK Replace inline `instrumentServer` Proxy/emit-wrapping implementation in node-light's HTTP integration with core's `getHttpServerSubscriptions`, which does the same work (isolation scope, request data, body capture, trace continuation, best-effort transaction name). Centralized implementation lets Bun, Deno, Workers, and the OTel-based Node SDK share server-side primitives. Also drop `wrappedEmitFns` and the import of several core utilities that are now consumed via the subscriptions. --- .../src/light/integrations/httpIntegration.ts | 115 ++---------------- 1 file changed, 7 insertions(+), 108 deletions(-) diff --git a/packages/node-core/src/light/integrations/httpIntegration.ts b/packages/node-core/src/light/integrations/httpIntegration.ts index e5a13fd6782d..5ee31c6f07bb 100644 --- a/packages/node-core/src/light/integrations/httpIntegration.ts +++ b/packages/node-core/src/light/integrations/httpIntegration.ts @@ -3,24 +3,16 @@ import type { RequestOptions } from 'node:http'; import type { HttpClientRequest, HttpIncomingMessage, Integration, IntegrationFn } from '@sentry/core'; import { addOutgoingRequestBreadcrumb, - continueTrace, - debug, - generateSpanId, getCurrentScope, getHttpClientSubscriptions, - getIsolationScope, - HTTP_ON_CLIENT_REQUEST, - httpRequestToRequestData, - stripUrlQueryAndFragment, - SUPPRESS_TRACING_KEY, - withIsolationScope, + getHttpServerSubscriptions, getRequestOptions, getRequestUrlFromClientRequest, + HTTP_ON_CLIENT_REQUEST, + HTTP_ON_SERVER_REQUEST, + SUPPRESS_TRACING_KEY, } from '@sentry/core'; -import type { ClientRequest, IncomingMessage, Server } from 'node:http'; -import { DEBUG_BUILD } from '../../debug-build'; -import { patchRequestToCaptureBody } from '../../utils/captureRequestBody'; -import type { LightNodeClient } from '../client'; +import type { ClientRequest } from 'node:http'; import { errorMonitor } from 'node:events'; import { NODE_VERSION } from '../../nodeVersion'; @@ -31,9 +23,6 @@ const FULLY_SUPPORTS_HTTP_DIAGNOSTICS_CHANNEL = (NODE_VERSION.major === 23 && NODE_VERSION.minor >= 2) || NODE_VERSION.major >= 24; -// We keep track of emit functions we wrapped, to avoid double wrapping -const wrappedEmitFns = new WeakSet(); - export interface HttpIntegrationOptions { /** * Do not capture the request body for incoming HTTP requests to URLs where the given callback returns `true`. @@ -101,10 +90,7 @@ const _httpIntegration = ((options: HttpIntegrationOptions = {}) => { return { name: INTEGRATION_NAME, setupOnce() { - const onHttpServerRequestStart = (_data: unknown) => { - const data = _data as { server: Server }; - instrumentServer(data.server, _options); - }; + const { [HTTP_ON_SERVER_REQUEST]: onHttpServerRequestStart } = getHttpServerSubscriptions(_options); const { ignoreOutgoingRequests } = _options; @@ -120,7 +106,7 @@ const _httpIntegration = ((options: HttpIntegrationOptions = {}) => { errorMonitor, }); - subscribe('http.server.request.start', onHttpServerRequestStart); + subscribe(HTTP_ON_SERVER_REQUEST, onHttpServerRequestStart); // Subscribe on the request creation in node versions that support it subscribe(HTTP_ON_CLIENT_REQUEST, onHttpClientRequestCreated); @@ -180,90 +166,3 @@ export const httpIntegration = _httpIntegration as (options?: HttpIntegrationOpt name: 'Http'; setupOnce: () => void; }; - -/** - * Instrument a server to capture incoming requests. - */ -function instrumentServer( - server: Server, - { - ignoreRequestBody, - maxRequestBodySize, - }: { - ignoreRequestBody?: (url: string, request: IncomingMessage) => boolean; - maxRequestBodySize: 'small' | 'medium' | 'always' | 'none'; - }, -): void { - // eslint-disable-next-line @typescript-eslint/unbound-method - const originalEmit: typeof Server.prototype.emit = server.emit; - - if (wrappedEmitFns.has(originalEmit)) { - return; - } - - const newEmit = new Proxy(originalEmit, { - apply(target, thisArg, args: [event: string, ...args: unknown[]]) { - // Only handle request events - if (args[0] !== 'request') { - return target.apply(thisArg, args); - } - - const client = getCurrentScope().getClient(); - - if (!client) { - return target.apply(thisArg, args); - } - - DEBUG_BUILD && debug.log(INTEGRATION_NAME, 'Handling incoming request'); - - const isolationScope = getIsolationScope().clone(); - const request = args[1] as IncomingMessage; - - const normalizedRequest = httpRequestToRequestData(request); - - // request.ip is non-standard but some frameworks set this - const ipAddress = (request as { ip?: string }).ip || request.socket?.remoteAddress; - - const url = request.url || '/'; - if (maxRequestBodySize !== 'none' && !ignoreRequestBody?.(url, request)) { - patchRequestToCaptureBody(request, isolationScope, maxRequestBodySize, INTEGRATION_NAME); - } - - // Update the isolation scope, isolate this request - isolationScope.setSDKProcessingMetadata({ normalizedRequest, ipAddress }); - - // attempt to update the scope's `transactionName` based on the request URL - // Ideally, framework instrumentations coming after the HttpInstrumentation - // update the transactionName once we get a parameterized route. - const httpMethod = (request.method || 'GET').toUpperCase(); - const httpTargetWithoutQueryFragment = stripUrlQueryAndFragment(url); - - const bestEffortTransactionName = `${httpMethod} ${httpTargetWithoutQueryFragment}`; - - isolationScope.setTransactionName(bestEffortTransactionName); - - return withIsolationScope(isolationScope, () => { - // Handle trace propagation using Sentry's continueTrace - // This replaces OpenTelemetry's propagation.extract() + context.with() - const sentryTrace = normalizedRequest.headers?.['sentry-trace']; - const baggage = normalizedRequest.headers?.['baggage']; - - return continueTrace( - { - sentryTrace: Array.isArray(sentryTrace) ? sentryTrace[0] : sentryTrace, - baggage: Array.isArray(baggage) ? baggage[0] : baggage, - }, - () => { - // Set propagationSpanId after continueTrace because it calls withScope + - // setPropagationContext internally, which would overwrite any previously set value. - getCurrentScope().getPropagationContext().propagationSpanId = generateSpanId(); - return target.apply(thisArg, args); - }, - ); - }); - }, - }); - - wrappedEmitFns.add(newEmit); - server.emit = newEmit; -} From c72f522aaf226d32885e76113d1fa1ca945c6f45 Mon Sep 17 00:00:00 2001 From: isaacs Date: Sat, 9 May 2026 10:45:44 -0700 Subject: [PATCH 4/5] fix(node-core): use core HTTP server subscription utility in httpServerIntegration Replace inline `instrumentServer` Proxy/emit-wrapping in `httpServerIntegration.ts` with core's `getHttpServerSubscriptions`. OTel-specific concerns (header propagation, double-wrap context guard, `_startSpanCallback` dispatch) move into a `wrapServerEmitRequest` callback that `instrumentServer` invokes inside the per-request lifecycle. Re-export `recordRequestSession` from core so existing test continues to pass. Duplicated request-isolation/session/body-capture plumbing removed, logic now lives in `@sentry/core`'s subscription factory. --- .../http/httpServerIntegration.ts | 286 ++++-------------- 1 file changed, 59 insertions(+), 227 deletions(-) diff --git a/packages/node-core/src/integrations/http/httpServerIntegration.ts b/packages/node-core/src/integrations/http/httpServerIntegration.ts index 1c08e9f4f16e..79bacf5c1342 100644 --- a/packages/node-core/src/integrations/http/httpServerIntegration.ts +++ b/packages/node-core/src/integrations/http/httpServerIntegration.ts @@ -1,28 +1,23 @@ -import type { ChannelListener } from 'node:diagnostics_channel'; import { subscribe } from 'node:diagnostics_channel'; -import type { EventEmitter } from 'node:events'; -import type { IncomingMessage, RequestOptions, Server, ServerResponse } from 'node:http'; -import type { Socket } from 'node:net'; +import type { RequestOptions } from 'node:http'; import { context, createContextKey, propagation } from '@opentelemetry/api'; -import type { AggregationCounts, Client, HttpIncomingMessage, Integration, IntegrationFn, Scope } from '@sentry/core'; +import type { HttpIncomingMessage, HttpServerResponse, Integration, IntegrationFn } from '@sentry/core'; import { - _INTERNAL_safeMathRandom, addNonEnumerableProperty, debug, - generateSpanId, - generateTraceId, getClient, - getCurrentScope, - getIsolationScope, - httpRequestToRequestData, - stripUrlQueryAndFragment, - withIsolationScope, + getHttpServerSubscriptions, + HTTP_ON_SERVER_REQUEST, + recordRequestSession, } from '@sentry/core'; +import type { RequestEventData } from '@sentry/core'; import { DEBUG_BUILD } from '../../debug-build'; -import type { NodeClient } from '../../sdk/client'; -import { patchRequestToCaptureBody } from '../../utils/captureRequestBody'; -type ServerEmit = typeof Server.prototype.emit; +// Re-export so existing test imports continue to work; the implementation now lives in core. +export { recordRequestSession }; + +const HTTP_SERVER_INSTRUMENTED_KEY = createContextKey('sentry_http_server_instrumented'); +const INTEGRATION_NAME = 'Http.Server'; // Inlining this type to not depend on newer TS types interface WeakRefImpl { @@ -34,20 +29,6 @@ type RequestWithOptionalStartSpanCallback = HttpIncomingMessage & { _startSpanCallback?: WeakRefImpl; }; -const HTTP_SERVER_INSTRUMENTED_KEY = createContextKey('sentry_http_server_instrumented'); -const INTEGRATION_NAME = 'Http.Server'; - -const clientToRequestSessionAggregatesMap = new Map< - Client, - { [timestampRoundedToSeconds: string]: { exited: number; crashed: number; errored: number } } ->(); - -// We keep track of emit functions we wrapped, to avoid double wrapping -// We do this instead of putting a non-enumerable property on the function, because -// sometimes the property seems to be migrated to forks of the emit function, which we do not want to happen -// This was the case in the nestjs-distributed-tracing E2E test -const wrappedEmitFns = new WeakSet(); - export interface HttpServerIntegrationOptions { /** * Whether the integration should create [Sessions](https://docs.sentry.io/product/releases/health/#sessions) for incoming requests to track the health and crash-free rate of your releases in Sentry. @@ -103,19 +84,59 @@ const _httpServerIntegration = ((options: HttpServerIntegrationOptions = {}) => sessions: options.sessions ?? true, sessionFlushingDelayMS: options.sessionFlushingDelayMS ?? 60_000, maxRequestBodySize: options.maxRequestBodySize ?? 'medium', - ignoreRequestBody: options.ignoreRequestBody, + // Cast: core uses HttpIncomingMessage; node consumers pass RequestOptions-typed callbacks. + // The two are structurally compatible for the fields the callback reads (url, method, headers). + ignoreRequestBody: options.ignoreRequestBody as + | ((url: string, request: HttpIncomingMessage) => boolean) + | undefined, + /** + * Hook called by core's `instrumentServer` to wrap the upstream `emit('request')` call. + * We use it to extract OTel context from request headers and re-enter the OTel context + * before the framework sees the request, so subsequent spans (e.g. from + * `httpServerSpansIntegration`) attach to the right trace. + */ + wrapServerEmitRequest( + request: HttpIncomingMessage, + response: HttpServerResponse, + normalizedRequest: RequestEventData, + next: () => void, + ): void { + const client = getClient(); + if (!client) return next(); + + // Guard against double-wrapping: if our wrapper somehow runs inside an already-instrumented + // context, just continue without re-extracting and re-emitting. + if (context.active().getValue(HTTP_SERVER_INSTRUMENTED_KEY)) { + return next(); + } + + const ctx = propagation + .extract(context.active(), normalizedRequest.headers) + .setValue(HTTP_SERVER_INSTRUMENTED_KEY, true); + + context.with(ctx, () => { + // httpServerSpansIntegration listens for this and may attach `_startSpanCallback` + // to the request to wrap span creation around the emit. + client.emit('httpServerRequest', request, response, normalizedRequest); + + const callback = (request as RequestWithOptionalStartSpanCallback)._startSpanCallback?.deref(); + if (callback) { + callback(() => { + next(); + return true; + }); + } else { + next(); + } + }); + }, }; return { name: INTEGRATION_NAME, setupOnce() { - const onHttpServerRequestStart = ((_data: unknown) => { - const data = _data as { server: Server }; - - instrumentServer(data.server, _options); - }) satisfies ChannelListener; - - subscribe('http.server.request.start', onHttpServerRequestStart); + const { [HTTP_ON_SERVER_REQUEST]: onHttpServerRequestStart } = getHttpServerSubscriptions(_options); + subscribe(HTTP_ON_SERVER_REQUEST, onHttpServerRequestStart); }, afterAllSetup(client) { if (DEBUG_BUILD && client.getIntegrationByName('Http')) { @@ -141,192 +162,3 @@ export const httpServerIntegration = _httpServerIntegration as ( name: 'HttpServer'; setupOnce: () => void; }; - -/** - * Instrument a server to capture incoming requests. - * - */ -function instrumentServer( - server: Server, - { - ignoreRequestBody, - maxRequestBodySize, - sessions, - sessionFlushingDelayMS, - }: { - ignoreRequestBody?: (url: string, request: IncomingMessage) => boolean; - maxRequestBodySize: 'small' | 'medium' | 'always' | 'none'; - sessions: boolean; - sessionFlushingDelayMS: number; - }, -): void { - // eslint-disable-next-line @typescript-eslint/unbound-method - const originalEmit: ServerEmit = server.emit; - - if (wrappedEmitFns.has(originalEmit)) { - return; - } - - const newEmit = new Proxy(originalEmit, { - apply(target, thisArg, args: [event: string, ...args: unknown[]]) { - // Only traces request events - if (args[0] !== 'request') { - return target.apply(thisArg, args); - } - - const client = getClient(); - - // Make sure we do not double execute our wrapper code, for edge cases... - // Without this check, if we double-wrap emit, for whatever reason, you'd get two http.server spans (one the children of the other) - if (context.active().getValue(HTTP_SERVER_INSTRUMENTED_KEY) || !client) { - return target.apply(thisArg, args); - } - - DEBUG_BUILD && debug.log(INTEGRATION_NAME, 'Handling incoming request'); - - const isolationScope = getIsolationScope().clone(); - const request = args[1] as IncomingMessage; - const response = args[2] as ServerResponse & { socket: Socket }; - - const normalizedRequest = httpRequestToRequestData(request); - - // request.ip is non-standard but some frameworks set this - const ipAddress = (request as { ip?: string }).ip || request.socket?.remoteAddress; - - const url = request.url || '/'; - if (maxRequestBodySize !== 'none' && !ignoreRequestBody?.(url, request)) { - patchRequestToCaptureBody(request, isolationScope, maxRequestBodySize, INTEGRATION_NAME); - } - - // Update the isolation scope, isolate this request - isolationScope.setSDKProcessingMetadata({ normalizedRequest, ipAddress }); - - // attempt to update the scope's `transactionName` based on the request URL - // Ideally, framework instrumentations coming after the HttpInstrumentation - // update the transactionName once we get a parameterized route. - const httpMethod = (request.method || 'GET').toUpperCase(); - const httpTargetWithoutQueryFragment = stripUrlQueryAndFragment(url); - - const bestEffortTransactionName = `${httpMethod} ${httpTargetWithoutQueryFragment}`; - - isolationScope.setTransactionName(bestEffortTransactionName); - - if (sessions && client) { - recordRequestSession(client, { - requestIsolationScope: isolationScope, - response, - sessionFlushingDelayMS: sessionFlushingDelayMS ?? 60_000, - }); - } - - return withIsolationScope(isolationScope, () => { - const newPropagationContext = { - traceId: generateTraceId(), - sampleRand: _INTERNAL_safeMathRandom(), - propagationSpanId: generateSpanId(), - }; - // - Set a fresh propagation context so each request gets a unique traceId. - // When there are incoming trace headers, propagation.extract() below sets a remote - // span on the OTel context which takes precedence in getTraceContextForScope(). - // - We can write directly to the current scope here because it is forked implicitly via - // `context.with` in `withIsolationScope` (See `SentryContextManager`). - // - explicitly making a deep copy to avoid mutation of original PC on the other scope - getCurrentScope().setPropagationContext({ ...newPropagationContext }); - isolationScope.setPropagationContext({ ...newPropagationContext }); - - const ctx = propagation - .extract(context.active(), normalizedRequest.headers) - .setValue(HTTP_SERVER_INSTRUMENTED_KEY, true); - - return context.with(ctx, () => { - // This is used (optionally) by the httpServerSpansIntegration to attach _startSpanCallback to the request object - client.emit('httpServerRequest', request, response, normalizedRequest); - - const callback = (request as RequestWithOptionalStartSpanCallback)._startSpanCallback?.deref(); - if (callback) { - return callback(() => target.apply(thisArg, args)); - } - return target.apply(thisArg, args); - }); - }); - }, - }); - - wrappedEmitFns.add(newEmit); - server.emit = newEmit; -} - -/** - * Starts a session and tracks it in the context of a given isolation scope. - * When the passed response is finished, the session is put into a task and is - * aggregated with other sessions that may happen in a certain time window - * (sessionFlushingDelayMs). - * - * The sessions are always aggregated by the client that is on the current scope - * at the time of ending the response (if there is one). - */ -// Exported for unit tests -export function recordRequestSession( - client: Client, - { - requestIsolationScope, - response, - sessionFlushingDelayMS, - }: { - requestIsolationScope: Scope; - response: EventEmitter; - sessionFlushingDelayMS?: number; - }, -): void { - requestIsolationScope.setSDKProcessingMetadata({ - requestSession: { status: 'ok' }, - }); - response.once('close', () => { - const requestSession = requestIsolationScope.getScopeData().sdkProcessingMetadata.requestSession; - - if (client && requestSession) { - DEBUG_BUILD && debug.log(`Recorded request session with status: ${requestSession.status}`); - - const roundedDate = new Date(); - roundedDate.setSeconds(0, 0); - const dateBucketKey = roundedDate.toISOString(); - - const existingClientAggregate = clientToRequestSessionAggregatesMap.get(client); - const bucket = existingClientAggregate?.[dateBucketKey] || { exited: 0, crashed: 0, errored: 0 }; - bucket[({ ok: 'exited', crashed: 'crashed', errored: 'errored' } as const)[requestSession.status]]++; - - if (existingClientAggregate) { - existingClientAggregate[dateBucketKey] = bucket; - } else { - DEBUG_BUILD && debug.log('Opened new request session aggregate.'); - const newClientAggregate = { [dateBucketKey]: bucket }; - clientToRequestSessionAggregatesMap.set(client, newClientAggregate); - - const flushPendingClientAggregates = (): void => { - clearTimeout(timeout); - unregisterClientFlushHook(); - clientToRequestSessionAggregatesMap.delete(client); - - const aggregatePayload: AggregationCounts[] = Object.entries(newClientAggregate).map( - ([timestamp, value]) => ({ - started: timestamp, - exited: value.exited, - errored: value.errored, - crashed: value.crashed, - }), - ); - client.sendSession({ aggregates: aggregatePayload }); - }; - - const unregisterClientFlushHook = client.on('flush', () => { - DEBUG_BUILD && debug.log('Sending request session aggregate due to client flush'); - flushPendingClientAggregates(); - }); - const timeout = setTimeout(() => { - DEBUG_BUILD && debug.log('Sending request session aggregate due to flushing schedule'); - flushPendingClientAggregates(); - }, sessionFlushingDelayMS).unref(); - } - } - }); -} From 2954cdcaf1a875f317d25293d54c7d909a9f9e01 Mon Sep 17 00:00:00 2001 From: isaacs Date: Sat, 9 May 2026 10:48:34 -0700 Subject: [PATCH 5/5] chore(node-core): drop unused captureRequestBody `patchRequestToCaptureBody` lives in `@sentry/core` now and both the light SDK and the OTel-based `httpServerIntegration` consume it from there. --- .../node-core/src/utils/captureRequestBody.ts | 119 ------------------ 1 file changed, 119 deletions(-) delete mode 100644 packages/node-core/src/utils/captureRequestBody.ts diff --git a/packages/node-core/src/utils/captureRequestBody.ts b/packages/node-core/src/utils/captureRequestBody.ts deleted file mode 100644 index 7afb1e40c530..000000000000 --- a/packages/node-core/src/utils/captureRequestBody.ts +++ /dev/null @@ -1,119 +0,0 @@ -import type { IncomingMessage } from 'node:http'; -import type { Scope } from '@sentry/core'; -import { debug, getMaxBodyByteLength, type MaxRequestBodySize } from '@sentry/core'; -import { DEBUG_BUILD } from '../debug-build'; - -/** - * This method patches the request object to capture the body. - * Instead of actually consuming the streamed body ourselves, which has potential side effects, - * we monkey patch `req.on('data')` to intercept the body chunks. - * This way, we only read the body if the user also consumes the body, ensuring we do not change any behavior in unexpected ways. - */ -export function patchRequestToCaptureBody( - req: IncomingMessage, - isolationScope: Scope, - maxIncomingRequestBodySize: Exclude, - integrationName: string, -): void { - let bodyByteLength = 0; - const chunks: Buffer[] = []; - - DEBUG_BUILD && debug.log(integrationName, 'Patching request.on'); - - /** - * We need to keep track of the original callbacks, in order to be able to remove listeners again. - * Since `off` depends on having the exact same function reference passed in, we need to be able to map - * original listeners to our wrapped ones. - */ - const callbackMap = new WeakMap(); - - const maxBodySize = getMaxBodyByteLength(maxIncomingRequestBodySize); - - try { - // eslint-disable-next-line @typescript-eslint/unbound-method - req.on = new Proxy(req.on, { - apply: (target, thisArg, args: Parameters) => { - const [event, listener, ...restArgs] = args; - - if (event === 'data') { - DEBUG_BUILD && - debug.log(integrationName, `Handling request.on("data") with maximum body size of ${maxBodySize}b`); - - const callback = new Proxy(listener, { - apply: (target, thisArg, args: Parameters) => { - try { - const chunk = args[0] as Buffer | string; - const bufferifiedChunk = Buffer.from(chunk); - - if (bodyByteLength < maxBodySize) { - chunks.push(bufferifiedChunk); - bodyByteLength += bufferifiedChunk.byteLength; - } else if (DEBUG_BUILD) { - debug.log( - integrationName, - `Dropping request body chunk because maximum body length of ${maxBodySize}b is exceeded.`, - ); - } - } catch (_err) { - DEBUG_BUILD && debug.error(integrationName, 'Encountered error while storing body chunk.'); - } - - return Reflect.apply(target, thisArg, args); - }, - }); - - callbackMap.set(listener, callback); - - return Reflect.apply(target, thisArg, [event, callback, ...restArgs]); - } - - return Reflect.apply(target, thisArg, args); - }, - }); - - // Ensure we also remove callbacks correctly - // eslint-disable-next-line @typescript-eslint/unbound-method - req.off = new Proxy(req.off, { - apply: (target, thisArg, args: Parameters) => { - const [, listener] = args; - - const callback = callbackMap.get(listener); - if (callback) { - callbackMap.delete(listener); - - const modifiedArgs = args.slice(); - modifiedArgs[1] = callback; - return Reflect.apply(target, thisArg, modifiedArgs); - } - - return Reflect.apply(target, thisArg, args); - }, - }); - - req.on('end', () => { - try { - const body = Buffer.concat(chunks).toString('utf-8'); - if (body) { - // Using Buffer.byteLength here, because the body may contain characters that are not 1 byte long - const bodyByteLength = Buffer.byteLength(body, 'utf-8'); - const truncatedBody = - bodyByteLength > maxBodySize - ? `${Buffer.from(body) - .subarray(0, maxBodySize - 3) - .toString('utf-8')}...` - : body; - - isolationScope.setSDKProcessingMetadata({ normalizedRequest: { data: truncatedBody } }); - } - } catch (error) { - if (DEBUG_BUILD) { - debug.error(integrationName, 'Error building captured request body', error); - } - } - }); - } catch (error) { - if (DEBUG_BUILD) { - debug.error(integrationName, 'Error patching request to capture body', error); - } - } -}