diff --git a/sentry-opentelemetry/sentry-opentelemetry-core/api/sentry-opentelemetry-core.api b/sentry-opentelemetry/sentry-opentelemetry-core/api/sentry-opentelemetry-core.api index b51c8cc39bc..847d69bca1b 100644 --- a/sentry-opentelemetry/sentry-opentelemetry-core/api/sentry-opentelemetry-core.api +++ b/sentry-opentelemetry/sentry-opentelemetry-core/api/sentry-opentelemetry-core.api @@ -149,7 +149,7 @@ public final class io/sentry/opentelemetry/SentrySpanProcessor : io/opentelemetr public final class io/sentry/opentelemetry/SpanDescriptionExtractor { public fun ()V - public fun extractSpanInfo (Lio/opentelemetry/sdk/trace/data/SpanData;Lio/sentry/opentelemetry/IOtelSpanWrapper;)Lio/sentry/opentelemetry/OtelSpanInfo; + public fun extractSpanInfo (Lio/opentelemetry/sdk/trace/data/SpanData;Lio/sentry/opentelemetry/IOtelSpanWrapper;Lio/sentry/SentryOptions;)Lio/sentry/opentelemetry/OtelSpanInfo; } public final class io/sentry/opentelemetry/SpanNode { diff --git a/sentry-opentelemetry/sentry-opentelemetry-core/src/main/java/io/sentry/opentelemetry/SentrySpanExporter.java b/sentry-opentelemetry/sentry-opentelemetry-core/src/main/java/io/sentry/opentelemetry/SentrySpanExporter.java index 680177f8451..e7fc873908a 100644 --- a/sentry-opentelemetry/sentry-opentelemetry-core/src/main/java/io/sentry/opentelemetry/SentrySpanExporter.java +++ b/sentry-opentelemetry/sentry-opentelemetry-core/src/main/java/io/sentry/opentelemetry/SentrySpanExporter.java @@ -12,6 +12,7 @@ import io.opentelemetry.sdk.trace.data.StatusData; import io.opentelemetry.sdk.trace.export.SpanExporter; import io.opentelemetry.semconv.HttpAttributes; +import io.opentelemetry.semconv.incubating.MessagingIncubatingAttributes; import io.opentelemetry.semconv.incubating.ProcessIncubatingAttributes; import io.opentelemetry.semconv.incubating.ThreadIncubatingAttributes; import io.sentry.Baggage; @@ -200,7 +201,7 @@ private void createAndFinishSpanForOtelSpan( final @Nullable IOtelSpanWrapper sentrySpanMaybe = spanStorage.getSentrySpan(spanData.getSpanContext()); final @NotNull OtelSpanInfo spanInfo = - spanDescriptionExtractor.extractSpanInfo(spanData, sentrySpanMaybe); + spanDescriptionExtractor.extractSpanInfo(spanData, sentrySpanMaybe, scopes.getOptions()); scopes .getOptions() @@ -294,7 +295,7 @@ private void transferSpanDetails( final @NotNull IScopes scopesToUse = scopesToUseBeforeForking.forkedCurrentScope("SentrySpanExporter.createTransaction"); final @NotNull OtelSpanInfo spanInfo = - spanDescriptionExtractor.extractSpanInfo(span, sentrySpanMaybe); + spanDescriptionExtractor.extractSpanInfo(span, sentrySpanMaybe, scopesToUse.getOptions()); scopesToUse .getOptions() @@ -361,6 +362,23 @@ private void transferSpanDetails( maybeTransferOtelAttribute(span, sentryTransaction, ThreadIncubatingAttributes.THREAD_ID); maybeTransferOtelAttribute(span, sentryTransaction, ThreadIncubatingAttributes.THREAD_NAME); + // Root transactions don't bulk-copy OTel attributes into span data (unlike child spans). + // The Sentry Queues product reads `trace.data.messaging.*`, so messaging attributes must + // be explicitly transferred for consumer root transactions to show up correctly. These are + // operational metadata (no payload contents) and are safe to transfer unconditionally. + maybeTransferOtelAttribute( + span, sentryTransaction, MessagingIncubatingAttributes.MESSAGING_SYSTEM); + maybeTransferOtelAttribute( + span, sentryTransaction, MessagingIncubatingAttributes.MESSAGING_DESTINATION_NAME); + maybeTransferOtelAttribute( + span, sentryTransaction, MessagingIncubatingAttributes.MESSAGING_OPERATION_TYPE); + maybeTransferOtelAttribute( + span, sentryTransaction, MessagingIncubatingAttributes.MESSAGING_MESSAGE_ID); + maybeTransferOtelAttribute( + span, sentryTransaction, MessagingIncubatingAttributes.MESSAGING_MESSAGE_BODY_SIZE); + maybeTransferOtelAttribute( + span, sentryTransaction, MessagingIncubatingAttributes.MESSAGING_MESSAGE_ENVELOPE_SIZE); + scopesToUse.configureScope( ScopeType.CURRENT, scope -> attributesExtractor.extract(span, scope, scopesToUse.getOptions())); diff --git a/sentry-opentelemetry/sentry-opentelemetry-core/src/main/java/io/sentry/opentelemetry/SentrySpanProcessor.java b/sentry-opentelemetry/sentry-opentelemetry-core/src/main/java/io/sentry/opentelemetry/SentrySpanProcessor.java index 9c6a51f17c3..31bd6368318 100644 --- a/sentry-opentelemetry/sentry-opentelemetry-core/src/main/java/io/sentry/opentelemetry/SentrySpanProcessor.java +++ b/sentry-opentelemetry/sentry-opentelemetry-core/src/main/java/io/sentry/opentelemetry/SentrySpanProcessor.java @@ -297,7 +297,7 @@ private boolean isSentryRequest(final @NotNull ReadableSpan otelSpan) { private void updateTransactionWithOtelData( final @NotNull ITransaction sentryTransaction, final @NotNull ReadableSpan otelSpan) { final @NotNull OtelSpanInfo otelSpanInfo = - spanDescriptionExtractor.extractSpanInfo(otelSpan.toSpanData(), null); + spanDescriptionExtractor.extractSpanInfo(otelSpan.toSpanData(), null, scopes.getOptions()); sentryTransaction.setOperation(otelSpanInfo.getOp()); String transactionName = otelSpanInfo.getDescription(); sentryTransaction.setName( @@ -334,7 +334,7 @@ private void updateSpanWithOtelData( }); final @NotNull OtelSpanInfo otelSpanInfo = - spanDescriptionExtractor.extractSpanInfo(otelSpan.toSpanData(), null); + spanDescriptionExtractor.extractSpanInfo(otelSpan.toSpanData(), null, scopes.getOptions()); sentrySpan.setOperation(otelSpanInfo.getOp()); sentrySpan.setDescription(otelSpanInfo.getDescription()); } diff --git a/sentry-opentelemetry/sentry-opentelemetry-core/src/main/java/io/sentry/opentelemetry/SpanDescriptionExtractor.java b/sentry-opentelemetry/sentry-opentelemetry-core/src/main/java/io/sentry/opentelemetry/SpanDescriptionExtractor.java index b66555d68c9..90db227505d 100644 --- a/sentry-opentelemetry/sentry-opentelemetry-core/src/main/java/io/sentry/opentelemetry/SpanDescriptionExtractor.java +++ b/sentry-opentelemetry/sentry-opentelemetry-core/src/main/java/io/sentry/opentelemetry/SpanDescriptionExtractor.java @@ -7,6 +7,8 @@ import io.opentelemetry.semconv.UrlAttributes; import io.opentelemetry.semconv.incubating.DbIncubatingAttributes; import io.opentelemetry.semconv.incubating.HttpIncubatingAttributes; +import io.opentelemetry.semconv.incubating.MessagingIncubatingAttributes; +import io.sentry.SentryOptions; import io.sentry.protocol.TransactionNameSource; import org.jetbrains.annotations.ApiStatus; import org.jetbrains.annotations.NotNull; @@ -17,9 +19,19 @@ public final class SpanDescriptionExtractor { @SuppressWarnings("deprecation") public @NotNull OtelSpanInfo extractSpanInfo( - final @NotNull SpanData otelSpan, final @Nullable IOtelSpanWrapper sentrySpan) { + final @NotNull SpanData otelSpan, + final @Nullable IOtelSpanWrapper sentrySpan, + final @NotNull SentryOptions options) { final @NotNull Attributes attributes = otelSpan.getAttributes(); + if (options.isEnableQueueTracing()) { + final @Nullable String messagingSystem = + attributes.get(MessagingIncubatingAttributes.MESSAGING_SYSTEM); + if (messagingSystem != null) { + return descriptionForMessagingSystem(otelSpan); + } + } + final @Nullable String httpMethod = attributes.get(HttpAttributes.HTTP_REQUEST_METHOD); if (httpMethod != null) { return descriptionForHttpMethod(otelSpan, httpMethod); @@ -91,6 +103,57 @@ private static boolean isRootSpan(SpanData otelSpan) { return !otelSpan.getParentSpanContext().isValid() || otelSpan.getParentSpanContext().isRemote(); } + @SuppressWarnings("deprecation") + private OtelSpanInfo descriptionForMessagingSystem(final @NotNull SpanData otelSpan) { + final @NotNull Attributes attributes = otelSpan.getAttributes(); + final @NotNull String op = opForMessaging(otelSpan); + final @Nullable String destination = + attributes.get(MessagingIncubatingAttributes.MESSAGING_DESTINATION_NAME); + final @NotNull String description = destination != null ? destination : otelSpan.getName(); + return new OtelSpanInfo(op, description, TransactionNameSource.TASK); + } + + @SuppressWarnings("deprecation") + private @NotNull String opForMessaging(final @NotNull SpanData otelSpan) { + final @NotNull Attributes attributes = otelSpan.getAttributes(); + // Prefer `messaging.operation.type` (current OTel semconv), fall back to legacy + // `messaging.operation`. OTel's SpanKind.CONSUMER is overloaded for both `receive` and + // `process`, so attribute-first mapping is required. SpanKind is used only as a last resort. + @Nullable + String operationType = attributes.get(MessagingIncubatingAttributes.MESSAGING_OPERATION_TYPE); + if (operationType == null) { + operationType = attributes.get(MessagingIncubatingAttributes.MESSAGING_OPERATION); + } + if (operationType != null) { + switch (operationType) { + case "publish": + case "send": + return "queue.publish"; + case "create": + return "queue.create"; + case "receive": + return "queue.receive"; + case "process": + case "deliver": + return "queue.process"; + case "settle": + return "queue.settle"; + default: + // fall through to SpanKind mapping + break; + } + } + + final @NotNull SpanKind kind = otelSpan.getKind(); + if (SpanKind.PRODUCER.equals(kind)) { + return "queue.publish"; + } + if (SpanKind.CONSUMER.equals(kind)) { + return "queue.process"; + } + return "queue"; + } + @SuppressWarnings("deprecation") private OtelSpanInfo descriptionForDbSystem(final @NotNull SpanData otelSpan) { final @NotNull Attributes attributes = otelSpan.getAttributes(); diff --git a/sentry-opentelemetry/sentry-opentelemetry-core/src/test/kotlin/SpanDescriptionExtractorTest.kt b/sentry-opentelemetry/sentry-opentelemetry-core/src/test/kotlin/SpanDescriptionExtractorTest.kt index 9c5a1a352df..26c4ea408cd 100644 --- a/sentry-opentelemetry/sentry-opentelemetry-core/src/test/kotlin/SpanDescriptionExtractorTest.kt +++ b/sentry-opentelemetry/sentry-opentelemetry-core/src/test/kotlin/SpanDescriptionExtractorTest.kt @@ -11,6 +11,8 @@ import io.opentelemetry.semconv.HttpAttributes import io.opentelemetry.semconv.UrlAttributes import io.opentelemetry.semconv.incubating.DbIncubatingAttributes import io.opentelemetry.semconv.incubating.HttpIncubatingAttributes +import io.opentelemetry.semconv.incubating.MessagingIncubatingAttributes +import io.sentry.SentryOptions import io.sentry.protocol.TransactionNameSource import kotlin.test.Test import kotlin.test.assertEquals @@ -228,6 +230,216 @@ class SpanDescriptionExtractorTest { assertEquals(TransactionNameSource.TASK, info.transactionNameSource) } + @Test + fun `ignores messaging system when queue tracing disabled`() { + givenSpanName("my-topic publish") + givenAttributes( + mapOf( + MessagingIncubatingAttributes.MESSAGING_SYSTEM to "kafka", + MessagingIncubatingAttributes.MESSAGING_DESTINATION_NAME to "my-topic", + MessagingIncubatingAttributes.MESSAGING_OPERATION_TYPE to "publish", + ) + ) + + val info = whenExtractingSpanInfo(queueTracingEnabled = false) + + assertEquals("my-topic publish", info.op) + assertEquals("my-topic publish", info.description) + assertEquals(TransactionNameSource.CUSTOM, info.transactionNameSource) + } + + @Test + fun `maps messaging publish operation type to queue publish op`() { + givenAttributes( + mapOf( + MessagingIncubatingAttributes.MESSAGING_SYSTEM to "kafka", + MessagingIncubatingAttributes.MESSAGING_DESTINATION_NAME to "my-topic", + MessagingIncubatingAttributes.MESSAGING_OPERATION_TYPE to "publish", + ) + ) + + val info = whenExtractingSpanInfo(queueTracingEnabled = true) + + assertEquals("queue.publish", info.op) + assertEquals("my-topic", info.description) + assertEquals(TransactionNameSource.TASK, info.transactionNameSource) + } + + @Test + fun `maps messaging process operation type to queue process op`() { + givenAttributes( + mapOf( + MessagingIncubatingAttributes.MESSAGING_SYSTEM to "kafka", + MessagingIncubatingAttributes.MESSAGING_DESTINATION_NAME to "my-topic", + MessagingIncubatingAttributes.MESSAGING_OPERATION_TYPE to "process", + ) + ) + + val info = whenExtractingSpanInfo(queueTracingEnabled = true) + + assertEquals("queue.process", info.op) + assertEquals("my-topic", info.description) + assertEquals(TransactionNameSource.TASK, info.transactionNameSource) + } + + @Test + fun `maps messaging create operation type to queue create op`() { + givenAttributes( + mapOf( + MessagingIncubatingAttributes.MESSAGING_SYSTEM to "kafka", + MessagingIncubatingAttributes.MESSAGING_DESTINATION_NAME to "my-topic", + MessagingIncubatingAttributes.MESSAGING_OPERATION_TYPE to "create", + ) + ) + + val info = whenExtractingSpanInfo(queueTracingEnabled = true) + + assertEquals("queue.create", info.op) + assertEquals("my-topic", info.description) + assertEquals(TransactionNameSource.TASK, info.transactionNameSource) + } + + @Test + fun `maps messaging receive operation type to queue receive op`() { + givenAttributes( + mapOf( + MessagingIncubatingAttributes.MESSAGING_SYSTEM to "kafka", + MessagingIncubatingAttributes.MESSAGING_DESTINATION_NAME to "my-topic", + MessagingIncubatingAttributes.MESSAGING_OPERATION_TYPE to "receive", + ) + ) + + val info = whenExtractingSpanInfo(queueTracingEnabled = true) + + assertEquals("queue.receive", info.op) + assertEquals("my-topic", info.description) + assertEquals(TransactionNameSource.TASK, info.transactionNameSource) + } + + @Test + fun `maps messaging settle operation type to queue settle op`() { + givenAttributes( + mapOf( + MessagingIncubatingAttributes.MESSAGING_SYSTEM to "rabbitmq", + MessagingIncubatingAttributes.MESSAGING_DESTINATION_NAME to "my-queue", + MessagingIncubatingAttributes.MESSAGING_OPERATION_TYPE to "settle", + ) + ) + + val info = whenExtractingSpanInfo(queueTracingEnabled = true) + + assertEquals("queue.settle", info.op) + assertEquals("my-queue", info.description) + assertEquals(TransactionNameSource.TASK, info.transactionNameSource) + } + + @Test + fun `falls back to legacy messaging operation attribute`() { + @Suppress("DEPRECATION") + givenAttributes( + mapOf( + MessagingIncubatingAttributes.MESSAGING_SYSTEM to "rabbitmq", + MessagingIncubatingAttributes.MESSAGING_DESTINATION_NAME to "queue-name", + MessagingIncubatingAttributes.MESSAGING_OPERATION to "publish", + ) + ) + + val info = whenExtractingSpanInfo(queueTracingEnabled = true) + + assertEquals("queue.publish", info.op) + assertEquals("queue-name", info.description) + } + + @Test + fun `falls back to PRODUCER span kind when no operation attribute`() { + givenSpanKind(SpanKind.PRODUCER) + givenAttributes( + mapOf( + MessagingIncubatingAttributes.MESSAGING_SYSTEM to "kafka", + MessagingIncubatingAttributes.MESSAGING_DESTINATION_NAME to "my-topic", + ) + ) + + val info = whenExtractingSpanInfo(queueTracingEnabled = true) + + assertEquals("queue.publish", info.op) + assertEquals("my-topic", info.description) + } + + @Test + fun `falls back to CONSUMER span kind when no operation attribute`() { + givenSpanKind(SpanKind.CONSUMER) + givenAttributes( + mapOf( + MessagingIncubatingAttributes.MESSAGING_SYSTEM to "kafka", + MessagingIncubatingAttributes.MESSAGING_DESTINATION_NAME to "my-topic", + ) + ) + + val info = whenExtractingSpanInfo(queueTracingEnabled = true) + + assertEquals("queue.process", info.op) + assertEquals("my-topic", info.description) + } + + @Test + fun `falls back to span name as description when destination missing`() { + givenSpanName("my-topic publish") + givenAttributes( + mapOf( + MessagingIncubatingAttributes.MESSAGING_SYSTEM to "kafka", + MessagingIncubatingAttributes.MESSAGING_OPERATION_TYPE to "publish", + ) + ) + + val info = whenExtractingSpanInfo(queueTracingEnabled = true) + + assertEquals("queue.publish", info.op) + assertEquals("my-topic publish", info.description) + } + + @Test + fun `messaging mapping wins over http when both attributes present and queue tracing enabled`() { + // Some OTel instrumentations (e.g. aws-sdk-2.2 SQS) attach both messaging and http + // attributes to the same span. Messaging is more specific and must win. + givenSpanKind(SpanKind.PRODUCER) + givenAttributes( + mapOf( + HttpAttributes.HTTP_REQUEST_METHOD to "POST", + UrlAttributes.URL_FULL to "https://sqs.us-east-1.amazonaws.com/", + MessagingIncubatingAttributes.MESSAGING_SYSTEM to "aws.sqs", + MessagingIncubatingAttributes.MESSAGING_DESTINATION_NAME to "my-queue", + MessagingIncubatingAttributes.MESSAGING_OPERATION_TYPE to "publish", + ) + ) + + val info = whenExtractingSpanInfo(queueTracingEnabled = true) + + assertEquals("queue.publish", info.op) + assertEquals("my-queue", info.description) + assertEquals(TransactionNameSource.TASK, info.transactionNameSource) + } + + @Test + fun `http mapping wins over messaging when queue tracing disabled`() { + givenSpanKind(SpanKind.CLIENT) + givenAttributes( + mapOf( + HttpAttributes.HTTP_REQUEST_METHOD to "POST", + UrlAttributes.URL_FULL to "https://sqs.us-east-1.amazonaws.com/", + MessagingIncubatingAttributes.MESSAGING_SYSTEM to "aws.sqs", + MessagingIncubatingAttributes.MESSAGING_DESTINATION_NAME to "my-queue", + MessagingIncubatingAttributes.MESSAGING_OPERATION_TYPE to "publish", + ) + ) + + val info = whenExtractingSpanInfo(queueTracingEnabled = false) + + assertEquals("http.client", info.op) + assertEquals("POST https://sqs.us-east-1.amazonaws.com/", info.description) + assertEquals(TransactionNameSource.URL, info.transactionNameSource) + } + @Test fun `uses span name as op and description if no relevant attributes`() { givenSpanName("span name") @@ -289,9 +501,10 @@ class SpanDescriptionExtractorTest { builder.put(key as AttributeKey, value) } - private fun whenExtractingSpanInfo(): OtelSpanInfo { + private fun whenExtractingSpanInfo(queueTracingEnabled: Boolean = false): OtelSpanInfo { fixture.setup() - return SpanDescriptionExtractor().extractSpanInfo(fixture.otelSpan, fixture.sentrySpan) + val options = SentryOptions().apply { isEnableQueueTracing = queueTracingEnabled } + return SpanDescriptionExtractor().extractSpanInfo(fixture.otelSpan, fixture.sentrySpan, options) } private fun givenParentContext(parentContext: SpanContext) { diff --git a/sentry/api/sentry.api b/sentry/api/sentry.api index 9e5f09320b5..e4611a46d44 100644 --- a/sentry/api/sentry.api +++ b/sentry/api/sentry.api @@ -4398,9 +4398,11 @@ public abstract interface class io/sentry/SpanDataConvention { public static final field HTTP_STATUS_CODE_KEY Ljava/lang/String; public static final field MESSAGING_DESTINATION_NAME Ljava/lang/String; public static final field MESSAGING_MESSAGE_BODY_SIZE Ljava/lang/String; + public static final field MESSAGING_MESSAGE_ENVELOPE_SIZE Ljava/lang/String; public static final field MESSAGING_MESSAGE_ID Ljava/lang/String; public static final field MESSAGING_MESSAGE_RECEIVE_LATENCY Ljava/lang/String; public static final field MESSAGING_MESSAGE_RETRY_COUNT Ljava/lang/String; + public static final field MESSAGING_OPERATION_TYPE Ljava/lang/String; public static final field MESSAGING_SYSTEM Ljava/lang/String; public static final field PROFILER_ID Ljava/lang/String; public static final field THREAD_ID Ljava/lang/String; diff --git a/sentry/src/main/java/io/sentry/SentryOptions.java b/sentry/src/main/java/io/sentry/SentryOptions.java index 819789678e5..7db109e9d2e 100644 --- a/sentry/src/main/java/io/sentry/SentryOptions.java +++ b/sentry/src/main/java/io/sentry/SentryOptions.java @@ -2708,18 +2708,20 @@ public void setEnableCacheTracing(boolean enableCacheTracing) { } /** - * Whether queue operations (publish, process) should be traced. + * Whether Sentry emits Queue spans and transforms OpenTelemetry messaging spans to match Sentry's + * queue conventions. * - * @return true if queue operations should be traced + * @return true if queue tracing is enabled */ public boolean isEnableQueueTracing() { return enableQueueTracing; } /** - * Whether queue operations (publish, process) should be traced. + * Whether Sentry emits Queue spans and transforms OpenTelemetry messaging spans to match Sentry's + * queue conventions. * - * @param enableQueueTracing true if queue operations should be traced + * @param enableQueueTracing true to enable queue tracing */ public void setEnableQueueTracing(boolean enableQueueTracing) { this.enableQueueTracing = enableQueueTracing; diff --git a/sentry/src/main/java/io/sentry/SpanDataConvention.java b/sentry/src/main/java/io/sentry/SpanDataConvention.java index 047a235422d..4ede74505cb 100644 --- a/sentry/src/main/java/io/sentry/SpanDataConvention.java +++ b/sentry/src/main/java/io/sentry/SpanDataConvention.java @@ -35,5 +35,7 @@ public interface SpanDataConvention { String MESSAGING_MESSAGE_ID = "messaging.message.id"; String MESSAGING_MESSAGE_RETRY_COUNT = "messaging.message.retry.count"; String MESSAGING_MESSAGE_BODY_SIZE = "messaging.message.body.size"; + String MESSAGING_MESSAGE_ENVELOPE_SIZE = "messaging.message.envelope.size"; String MESSAGING_MESSAGE_RECEIVE_LATENCY = "messaging.message.receive.latency"; + String MESSAGING_OPERATION_TYPE = "messaging.operation.type"; }