Skip to content

Commit e68a576

Browse files
committed
Add OTEL operation attributes to queue spans
1 parent 9accd4b commit e68a576

File tree

3 files changed

+62
-4
lines changed

3 files changed

+62
-4
lines changed

dev-packages/e2e-tests/test-applications/supabase-nextjs/tests/performance.test.ts

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -306,6 +306,7 @@ test('Sends queue publish spans with `schema(...).rpc(...)`', async ({ page, bas
306306
data: {
307307
'messaging.destination.name': 'todos',
308308
'messaging.message.id': '1',
309+
'messaging.message.body.size': expect.any(Number),
309310
},
310311
});
311312
});
@@ -355,6 +356,7 @@ test('Sends queue publish spans with `rpc(...)`', async ({ page, baseURL }) => {
355356
data: {
356357
'messaging.destination.name': 'todos',
357358
'messaging.message.id': '2',
359+
'messaging.message.body.size': expect.any(Number),
358360
},
359361
});
360362
});
@@ -459,6 +461,7 @@ test('Sends queue process spans with `schema(...).rpc(...)`', async ({ page, bas
459461
data: {
460462
'messaging.destination.name': 'todos',
461463
'messaging.message.id': '1',
464+
'messaging.message.body.size': expect.any(Number),
462465
},
463466
});
464467
});
@@ -563,6 +566,7 @@ test('Sends queue process spans with `rpc(...)`', async ({ page, baseURL }) => {
563566
data: {
564567
'messaging.destination.name': 'todos',
565568
'messaging.message.id': '2',
569+
'messaging.message.body.size': expect.any(Number),
566570
},
567571
});
568572
});
@@ -683,6 +687,7 @@ test('Sends queue batch publish spans with `rpc(...)`', async ({ page, baseURL }
683687
'messaging.destination.name': 'todos',
684688
'messaging.message.id': expect.stringMatching(/^\d+,\d+$/),
685689
'messaging.batch.message_count': 2,
690+
'messaging.message.body.size': expect.any(Number),
686691
},
687692
});
688693
});

packages/core/src/integrations/supabase.ts

Lines changed: 15 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -491,9 +491,15 @@ function _calculateBatchLatency(messages: Array<{ enqueued_at?: string }>): numb
491491
* @param span - The span to process
492492
* @param res - The Supabase response
493493
* @param queueName - The name of the queue
494+
* @param operationName - The queue operation name (e.g., 'pop', 'read', 'receive')
494495
* @returns The original response
495496
*/
496-
function _processConsumerSpan(span: Span, res: SupabaseResponse, queueName: string | undefined): SupabaseResponse {
497+
function _processConsumerSpan(
498+
span: Span,
499+
res: SupabaseResponse,
500+
queueName: string | undefined,
501+
operationName: string,
502+
): SupabaseResponse {
497503
// Calculate latency for single message or batch average
498504
let latency: number | undefined;
499505
const isBatch = Array.isArray(res.data) && res.data.length > 1;
@@ -522,6 +528,10 @@ function _processConsumerSpan(span: Span, res: SupabaseResponse, queueName: stri
522528

523529
// Note: messaging.destination.name is already set in initial span attributes
524530

531+
// Set OTEL messaging semantic attributes
532+
span.setAttribute('messaging.operation.type', 'process');
533+
span.setAttribute('messaging.operation.name', operationName);
534+
525535
if (latency !== undefined) {
526536
span.setAttribute('messaging.message.receive.latency', latency);
527537
}
@@ -540,6 +550,7 @@ function _processConsumerSpan(span: Span, res: SupabaseResponse, queueName: stri
540550
const breadcrumbData: Record<string, unknown> = {};
541551
if (messageId) breadcrumbData['messaging.message.id'] = messageId;
542552
if (queueName) breadcrumbData['messaging.destination.name'] = queueName;
553+
if (messageBodySize !== undefined) breadcrumbData['messaging.message.body.size'] = messageBodySize;
543554
_createQueueBreadcrumb('queue.process', queueName, breadcrumbData);
544555

545556
// Handle errors in the response
@@ -666,7 +677,7 @@ const _instrumentRpcConsumer = (target: unknown, thisArg: unknown, argumentsList
666677
},
667678
span => {
668679
try {
669-
const processedResponse = _processConsumerSpan(span, res, queueName);
680+
const processedResponse = _processConsumerSpan(span, res, queueName, operationName);
670681

671682
DEBUG_BUILD && debug.log('Consumer span processed successfully', { queueName });
672683

@@ -788,6 +799,8 @@ function _instrumentRpcProducer(target: unknown, thisArg: unknown, argumentsList
788799
[SEMANTIC_ATTRIBUTE_SENTRY_OP]: 'queue.publish',
789800
'messaging.system': 'supabase',
790801
'messaging.destination.name': queueName,
802+
'messaging.operation.name': operationName,
803+
'messaging.operation.type': 'publish',
791804
...(messageBodySize !== undefined && { 'messaging.message.body.size': messageBodySize }),
792805
},
793806
},

packages/core/test/lib/integrations/supabase-queues.test.ts

Lines changed: 42 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -890,9 +890,49 @@ describe('Supabase Queue Instrumentation', () => {
890890
name: 'publish attr-test-queue',
891891
op: 'queue.publish',
892892
attributes: expect.objectContaining({
893-
[SEMANTIC_ATTRIBUTE_SENTRY_ORIGIN]: 'auto.queue.supabase.producer',
893+
[SEMANTIC_ATTRIBUTE_SENTRY_ORIGIN]: 'auto.db.supabase.queue.producer',
894894
'messaging.system': 'supabase',
895895
'messaging.destination.name': 'attr-test-queue',
896+
'messaging.operation.name': 'send',
897+
'messaging.operation.type': 'publish',
898+
'messaging.message.body.size': expect.any(Number),
899+
}),
900+
}),
901+
);
902+
});
903+
904+
it('should set correct attributes on producer span for batch send', async () => {
905+
const mockResponse: SupabaseResponse = {
906+
data: [{ msg_id: 790 }, { msg_id: 791 }],
907+
status: 200,
908+
};
909+
910+
mockRpcFunction.mockResolvedValue(mockResponse);
911+
instrumentSupabaseClient(mockSupabaseClient);
912+
913+
const startSpanSpy = vi.spyOn(Tracing, 'startSpan');
914+
915+
await startSpan({ name: 'test-transaction' }, async () => {
916+
await mockSupabaseClient.rpc('send_batch', {
917+
queue_name: 'attr-test-queue-batch',
918+
messages: [{ test: 'data1' }, { test: 'data2' }],
919+
});
920+
});
921+
922+
// Find the queue.publish span call
923+
const publishSpanCall = startSpanSpy.mock.calls.find(call => call[0]?.name === 'publish attr-test-queue-batch');
924+
925+
expect(publishSpanCall).toBeDefined();
926+
expect(publishSpanCall?.[0]).toEqual(
927+
expect.objectContaining({
928+
name: 'publish attr-test-queue-batch',
929+
op: 'queue.publish',
930+
attributes: expect.objectContaining({
931+
[SEMANTIC_ATTRIBUTE_SENTRY_ORIGIN]: 'auto.db.supabase.queue.producer',
932+
'messaging.system': 'supabase',
933+
'messaging.destination.name': 'attr-test-queue-batch',
934+
'messaging.operation.name': 'send_batch',
935+
'messaging.operation.type': 'publish',
896936
'messaging.message.body.size': expect.any(Number),
897937
}),
898938
}),
@@ -929,7 +969,7 @@ describe('Supabase Queue Instrumentation', () => {
929969
name: 'process consumer-attr-queue',
930970
op: 'queue.process',
931971
attributes: expect.objectContaining({
932-
[SEMANTIC_ATTRIBUTE_SENTRY_ORIGIN]: 'auto.queue.supabase.consumer',
972+
[SEMANTIC_ATTRIBUTE_SENTRY_ORIGIN]: 'auto.db.supabase.queue.consumer',
933973
'messaging.system': 'supabase',
934974
'messaging.destination.name': 'consumer-attr-queue',
935975
}),

0 commit comments

Comments
 (0)