Skip to content

Commit 23c89bb

Browse files
committed
Improve data integrity and distributed tracing
1 parent 09ef20a commit 23c89bb

File tree

2 files changed

+891
-75
lines changed

2 files changed

+891
-75
lines changed

packages/core/src/integrations/supabase.ts

Lines changed: 177 additions & 75 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@
33

44
/* eslint-disable max-lines */
55
import { addBreadcrumb } from '../breadcrumbs';
6-
import { getClient, getCurrentScope } from '../currentScopes';
6+
import { getClient, getCurrentScope, withIsolationScope } from '../currentScopes';
77
import { DEBUG_BUILD } from '../debug-build';
88
import { captureException } from '../exports';
99
import { defineIntegration } from '../integration';
@@ -15,7 +15,7 @@ import {
1515
} from '../tracing/dynamicSamplingContext';
1616
import type { IntegrationFn } from '../types-hoist/integration';
1717
import type { Span, SpanAttributes } from '../types-hoist/span';
18-
import { dynamicSamplingContextToSentryBaggageHeader } from '../utils/baggage';
18+
import { dynamicSamplingContextToSentryBaggageHeader, parseBaggageHeader } from '../utils/baggage';
1919
import { debug } from '../utils/debug-logger';
2020
import { isPlainObject } from '../utils/is';
2121
import { addExceptionMechanism } from '../utils/misc';
@@ -245,6 +245,27 @@ export function translateFiltersIntoMethods(key: string, query: string): string
245245
return `${method}(${key}, ${value.join('.')})`;
246246
}
247247

248+
/**
249+
* Normalizes RPC function names by stripping schema prefixes.
250+
* Handles schema-qualified names like 'pgmq.send' → 'send'
251+
*
252+
* @param name - The RPC function name, potentially schema-qualified
253+
* @returns The normalized function name without schema prefix
254+
*/
255+
function _normalizeRpcFunctionName(name: unknown): string {
256+
if (!name || typeof name !== 'string') {
257+
return '';
258+
}
259+
260+
// Strip schema prefix: 'pgmq.send' → 'send', 'my_schema.pop' → 'pop'
261+
if (name.includes('.')) {
262+
const parts = name.split('.');
263+
return parts[parts.length - 1] || '';
264+
}
265+
266+
return name;
267+
}
268+
248269
/**
249270
* Creates a proxy handler for RPC methods to instrument queue operations.
250271
* This handler is shared between direct RPC calls and RPC calls via schema.
@@ -260,9 +281,10 @@ function _createRpcProxyHandler(): ProxyHandler<(...args: unknown[]) => Promise<
260281
): Promise<unknown> {
261282
// Add try-catch for safety
262283
try {
263-
const isProducerSpan = argumentsList[0] === 'send' || argumentsList[0] === 'send_batch';
264-
const isConsumerSpan =
265-
argumentsList[0] === 'pop' || argumentsList[0] === 'receive' || argumentsList[0] === 'read';
284+
// Normalize RPC function name to handle schema-qualified names (e.g., 'pgmq.send' → 'send')
285+
const normalizedName = _normalizeRpcFunctionName(argumentsList[0]);
286+
const isProducerSpan = normalizedName === 'send' || normalizedName === 'send_batch';
287+
const isConsumerSpan = normalizedName === 'pop' || normalizedName === 'receive' || normalizedName === 'read';
266288

267289
if (!isProducerSpan && !isConsumerSpan) {
268290
const result = Reflect.apply(target, thisArg, argumentsList);
@@ -491,15 +513,9 @@ function _calculateBatchLatency(messages: Array<{ enqueued_at?: string }>): numb
491513
* @param span - The span to process
492514
* @param res - The Supabase response
493515
* @param queueName - The name of the queue
494-
* @param operationName - The queue operation name (e.g., 'pop', 'read', 'receive')
495516
* @returns The original response
496517
*/
497-
function _processConsumerSpan(
498-
span: Span,
499-
res: SupabaseResponse,
500-
queueName: string | undefined,
501-
operationName: string,
502-
): SupabaseResponse {
518+
function _processConsumerSpan(span: Span, res: SupabaseResponse, queueName: string | undefined): SupabaseResponse {
503519
// Calculate latency for single message or batch average
504520
let latency: number | undefined;
505521
const isBatch = Array.isArray(res.data) && res.data.length > 1;
@@ -526,11 +542,8 @@ function _processConsumerSpan(
526542
span.setAttribute('messaging.message.id', messageId);
527543
}
528544

529-
// Note: messaging.destination.name is already set in initial span attributes
530-
531-
// Set OTEL messaging semantic attributes
532-
span.setAttribute('messaging.operation.type', 'process');
533-
span.setAttribute('messaging.operation.name', operationName);
545+
// Note: messaging.destination.name, messaging.operation.name, and messaging.operation.type
546+
// are already set in initial span attributes
534547

535548
if (latency !== undefined) {
536549
span.setAttribute('messaging.message.receive.latency', latency);
@@ -593,7 +606,8 @@ const _instrumentRpcConsumer = (target: unknown, thisArg: unknown, argumentsList
593606
return Reflect.apply(target as (...args: unknown[]) => Promise<unknown>, thisArg, argumentsList);
594607
}
595608

596-
const [operationName, queueParams] = argumentsList as [string, unknown];
609+
const operationName = _normalizeRpcFunctionName(argumentsList[0]);
610+
const queueParams = argumentsList[1];
597611

598612
if (!isPlainObject(queueParams)) {
599613
return Reflect.apply(target as (...args: unknown[]) => Promise<unknown>, thisArg, argumentsList);
@@ -618,6 +632,8 @@ const _instrumentRpcConsumer = (target: unknown, thisArg: unknown, argumentsList
618632
[SEMANTIC_ATTRIBUTE_SENTRY_OP]: 'queue.process',
619633
'messaging.system': 'supabase',
620634
'messaging.destination.name': queueName,
635+
'messaging.operation.name': operationName,
636+
'messaging.operation.type': 'process',
621637
} as const;
622638
const spanStartTime = timestampInSeconds();
623639

@@ -631,7 +647,8 @@ const _instrumentRpcConsumer = (target: unknown, thisArg: unknown, argumentsList
631647
.then(res => {
632648
DEBUG_BUILD && debug.log('Consumer RPC call completed', { queueName, hasData: !!res.data });
633649

634-
const { sentryTrace } = _extractTraceAndBaggageFromMessage(res.data?.[0]?.message || {});
650+
// Extract trace context from message for distributed tracing
651+
const { sentryTrace, baggage } = _extractTraceAndBaggageFromMessage(res.data?.[0]?.message || {});
635652

636653
if (Array.isArray(res.data)) {
637654
res.data.forEach(item => {
@@ -641,8 +658,12 @@ const _instrumentRpcConsumer = (target: unknown, thisArg: unknown, argumentsList
641658
});
642659
}
643660

644-
// Extract producer span context for span link (before creating consumer span)
661+
// Extract producer trace context for span link and propagation
645662
let producerSpanContext: { traceId: string; spanId: string; traceFlags: number } | undefined;
663+
let producerPropagationContext:
664+
| { traceId: string; parentSpanId: string; sampled: boolean; dsc?: Record<string, string> }
665+
| undefined;
666+
646667
if (sentryTrace) {
647668
const traceparentData = extractTraceparentData(sentryTrace);
648669
if (traceparentData?.traceId && traceparentData?.parentSpanId) {
@@ -655,55 +676,117 @@ const _instrumentRpcConsumer = (target: unknown, thisArg: unknown, argumentsList
655676
spanId: traceparentData.parentSpanId,
656677
traceFlags,
657678
};
679+
680+
// Prepare propagation context for isolated scope
681+
producerPropagationContext = {
682+
traceId: traceparentData.traceId,
683+
parentSpanId: traceparentData.parentSpanId,
684+
sampled: traceparentData.parentSampled ?? false,
685+
dsc: baggage ? parseBaggageHeader(baggage) : undefined,
686+
};
658687
}
659688
}
660689

661-
const runWithSpan = (): SupabaseResponse =>
662-
startSpan(
663-
{
664-
name: spanName,
665-
op: 'queue.process',
666-
startTime: spanStartTime,
667-
attributes: spanAttributes,
668-
// Add span link to producer span for distributed tracing across async queue boundary
669-
links: producerSpanContext
670-
? [
671-
{
672-
context: producerSpanContext,
673-
attributes: { 'sentry.link.type': 'queue.producer' },
674-
},
675-
]
676-
: undefined,
677-
},
678-
span => {
679-
try {
680-
const processedResponse = _processConsumerSpan(span, res, queueName, operationName);
690+
const runWithSpan = (): SupabaseResponse => {
691+
// If we have producer trace context, use isolated scope to prevent pollution
692+
if (producerPropagationContext) {
693+
return withIsolationScope(isolatedScope => {
694+
// Set producer's propagation context in isolated scope
695+
// This ensures the consumer span continues the producer's trace
696+
isolatedScope.setPropagationContext({
697+
...producerPropagationContext,
698+
sampleRand: Math.random(), // Generate new sample rand for current execution context
699+
});
681700

682-
DEBUG_BUILD && debug.log('Consumer span processed successfully', { queueName });
701+
// Force transaction to make it a root span (not child of current span)
702+
// This is critical to prevent the consumer span from becoming a child of
703+
// an active HTTP request or other unrelated transaction
704+
return startSpan(
705+
{
706+
name: spanName,
707+
op: 'queue.process',
708+
startTime: spanStartTime,
709+
attributes: spanAttributes,
710+
forceTransaction: true, // Makes this a root span, not a child
711+
// Add span link to producer span for distributed tracing across async queue boundary
712+
links: producerSpanContext
713+
? [
714+
{
715+
context: producerSpanContext,
716+
attributes: { 'sentry.link.type': 'queue.producer' },
717+
},
718+
]
719+
: undefined,
720+
},
721+
span => {
722+
try {
723+
const processedResponse = _processConsumerSpan(span, res, queueName);
683724

684-
return processedResponse;
685-
} catch (err: unknown) {
686-
DEBUG_BUILD && debug.log('Consumer span processing failed', { queueName, error: err });
725+
DEBUG_BUILD && debug.log('Consumer span processed successfully', { queueName });
687726

688-
captureException(err, scope => {
689-
scope.addEventProcessor(e => {
690-
addExceptionMechanism(e, {
691-
handled: false,
692-
type: 'auto.db.supabase.queue',
727+
return processedResponse;
728+
} catch (err: unknown) {
729+
DEBUG_BUILD && debug.log('Consumer span processing failed', { queueName, error: err });
730+
731+
captureException(err, scope => {
732+
scope.addEventProcessor(e => {
733+
addExceptionMechanism(e, {
734+
handled: false,
735+
type: 'auto.db.supabase.queue',
736+
});
737+
return e;
738+
});
739+
scope.setContext('supabase', { queueName });
740+
return scope;
693741
});
694-
return e;
742+
743+
span.setStatus({ code: SPAN_STATUS_ERROR });
744+
throw err;
745+
}
746+
},
747+
);
748+
});
749+
// Isolated scope automatically discarded here, original scope restored
750+
} else {
751+
// No producer context, create regular span without isolation
752+
return startSpan(
753+
{
754+
name: spanName,
755+
op: 'queue.process',
756+
startTime: spanStartTime,
757+
attributes: spanAttributes,
758+
},
759+
span => {
760+
try {
761+
const processedResponse = _processConsumerSpan(span, res, queueName);
762+
763+
DEBUG_BUILD && debug.log('Consumer span processed successfully', { queueName });
764+
765+
return processedResponse;
766+
} catch (err: unknown) {
767+
DEBUG_BUILD && debug.log('Consumer span processing failed', { queueName, error: err });
768+
769+
captureException(err, scope => {
770+
scope.addEventProcessor(e => {
771+
addExceptionMechanism(e, {
772+
handled: false,
773+
type: 'auto.db.supabase.queue',
774+
});
775+
return e;
776+
});
777+
scope.setContext('supabase', { queueName });
778+
return scope;
695779
});
696-
scope.setContext('supabase', { queueName });
697-
return scope;
698-
});
699780

700-
span.setStatus({ code: SPAN_STATUS_ERROR });
701-
throw err;
702-
}
703-
},
704-
);
781+
span.setStatus({ code: SPAN_STATUS_ERROR });
782+
throw err;
783+
}
784+
},
785+
);
786+
}
787+
};
705788

706-
// Create consumer span with link to producer (not as a child)
789+
// Create consumer span with isolated scope and forced transaction
707790
return runWithSpan();
708791
})
709792
.catch((err: unknown) => {
@@ -777,7 +860,7 @@ function _instrumentRpcProducer(target: unknown, thisArg: unknown, argumentsList
777860
return Reflect.apply(target as (...args: unknown[]) => Promise<unknown>, thisArg, argumentsList);
778861
}
779862

780-
const operationName = argumentsList[0] as 'send' | 'send_batch';
863+
const operationName = _normalizeRpcFunctionName(argumentsList[0]) as 'send' | 'send_batch';
781864
const isBatch = operationName === 'send_batch';
782865

783866
DEBUG_BUILD &&
@@ -826,22 +909,40 @@ function _instrumentRpcProducer(target: unknown, thisArg: unknown, argumentsList
826909
];
827910

828911
// Inject trace context into messages (avoid mutation)
912+
// Only inject into plain objects to prevent payload corruption (primitives, arrays)
829913
if (sentryArgumentsQueueParams?.message) {
830-
sentryArgumentsQueueParams.message = {
831-
...sentryArgumentsQueueParams.message,
832-
_sentry: {
833-
sentry_trace: sentryTrace,
834-
baggage: sentryBaggage,
835-
},
836-
};
914+
if (isPlainObject(sentryArgumentsQueueParams.message)) {
915+
sentryArgumentsQueueParams.message = {
916+
...sentryArgumentsQueueParams.message,
917+
_sentry: {
918+
sentry_trace: sentryTrace,
919+
baggage: sentryBaggage,
920+
},
921+
};
922+
} else {
923+
DEBUG_BUILD &&
924+
debug.warn(
925+
'Skipping trace propagation for non-object message payload. PGMQ supports primitives and arrays, but trace context can only be injected into plain objects.',
926+
);
927+
}
837928
} else if (sentryArgumentsQueueParams?.messages) {
838-
sentryArgumentsQueueParams.messages = sentryArgumentsQueueParams.messages.map(message => ({
839-
...message,
840-
_sentry: {
841-
sentry_trace: sentryTrace,
842-
baggage: sentryBaggage,
843-
},
844-
}));
929+
sentryArgumentsQueueParams.messages = sentryArgumentsQueueParams.messages.map(message => {
930+
if (isPlainObject(message)) {
931+
return {
932+
...message,
933+
_sentry: {
934+
sentry_trace: sentryTrace,
935+
baggage: sentryBaggage,
936+
},
937+
};
938+
} else {
939+
DEBUG_BUILD &&
940+
debug.warn(
941+
'Skipping trace propagation for non-object message in batch. PGMQ supports primitives and arrays, but trace context can only be injected into plain objects.',
942+
);
943+
return message;
944+
}
945+
});
845946
}
846947

847948
argumentsList[1] = sentryArgumentsQueueParams;
@@ -1101,7 +1202,8 @@ function _createInstrumentedPostgRESTThen(
11011202
const rpcIndex = pathParts.indexOf('rpc');
11021203
const rpcFunctionName = rpcIndex !== -1 && pathParts.length > rpcIndex + 1 ? pathParts[rpcIndex + 1] : undefined;
11031204

1104-
if (rpcFunctionName && QUEUE_RPC_OPERATIONS.has(rpcFunctionName)) {
1205+
// Normalize RPC function name to handle schema-qualified names (e.g., 'pgmq.send' → 'send')
1206+
if (rpcFunctionName && QUEUE_RPC_OPERATIONS.has(_normalizeRpcFunctionName(rpcFunctionName))) {
11051207
// Queue RPC calls are instrumented in the dedicated queue instrumentation.
11061208
return Reflect.apply(target, thisArg, argumentsList);
11071209
}

0 commit comments

Comments
 (0)