From fc234387601c17c5b242389721964c665541cc09 Mon Sep 17 00:00:00 2001 From: Alexander Dinauer Date: Mon, 27 Apr 2026 12:28:00 +0200 Subject: [PATCH 1/3] feat(kafka): [Queue Instrumentation 34] Wrap Producer for send spans MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Replace SentryKafkaProducerInterceptor with SentryKafkaProducer, a Producer wrapper that records a queue.publish span around each send and finishes it when the broker ack callback fires. The span now reflects the full async send lifecycle, not just the synchronous onSend window. For Spring Boot, the SentryKafkaProducerBeanPostProcessor switches from patching KafkaTemplate.setProducerInterceptor(...) to installing a ProducerPostProcessor on every ProducerFactory bean via ProducerFactory.addPostProcessor(...). KafkaTemplate beans are no longer touched, so all customer-configured listeners, interceptors and observation settings are preserved. The console sample now wraps the raw KafkaProducer instead of setting INTERCEPTOR_CLASSES_CONFIG. Spring Boot samples need no change — the auto-configured ProducerPostProcessor is transparent. Co-Authored-By: Claude --- sentry-kafka/api/sentry-kafka.api | 26 +- .../kafka/SentryKafkaConsumerTracing.java | 2 +- .../io/sentry/kafka/SentryKafkaProducer.java | 299 ++++++++++++++++ .../kafka/SentryKafkaProducerInterceptor.java | 150 -------- .../kafka/SentryKafkaConsumerTracingTest.kt | 2 +- .../SentryKafkaProducerInterceptorTest.kt | 225 ------------ .../sentry/kafka/SentryKafkaProducerTest.kt | 338 ++++++++++++++++++ .../samples/console/kafka/KafkaShowcase.java | 21 +- .../KafkaOtelCoexistenceSystemTest.kt | 2 +- .../KafkaOtelCoexistenceSystemTest.kt | 2 +- .../boot/jakarta/SentryAutoConfiguration.java | 2 +- .../SentryKafkaAutoConfigurationTest.kt | 4 +- .../SentryKafkaProducerBeanPostProcessor.java | 91 ++--- .../kafka/SentryKafkaRecordInterceptor.java | 4 +- ...entryKafkaProducerBeanPostProcessorTest.kt | 96 +++-- .../kafka/SentryKafkaRecordInterceptorTest.kt | 4 +- 16 files changed, 781 insertions(+), 487 deletions(-) create mode 100644 sentry-kafka/src/main/java/io/sentry/kafka/SentryKafkaProducer.java delete mode 100644 sentry-kafka/src/main/java/io/sentry/kafka/SentryKafkaProducerInterceptor.java delete mode 100644 sentry-kafka/src/test/kotlin/io/sentry/kafka/SentryKafkaProducerInterceptorTest.kt create mode 100644 sentry-kafka/src/test/kotlin/io/sentry/kafka/SentryKafkaProducerTest.kt diff --git a/sentry-kafka/api/sentry-kafka.api b/sentry-kafka/api/sentry-kafka.api index ce5b0efb66..64bb34a229 100644 --- a/sentry-kafka/api/sentry-kafka.api +++ b/sentry-kafka/api/sentry-kafka.api @@ -9,15 +9,27 @@ public final class io/sentry/kafka/SentryKafkaConsumerTracing { public static fun withTracing (Lorg/apache/kafka/clients/consumer/ConsumerRecord;Ljava/util/concurrent/Callable;)Ljava/lang/Object; } -public final class io/sentry/kafka/SentryKafkaProducerInterceptor : org/apache/kafka/clients/producer/ProducerInterceptor { +public final class io/sentry/kafka/SentryKafkaProducer : org/apache/kafka/clients/producer/Producer { public static final field SENTRY_ENQUEUED_TIME_HEADER Ljava/lang/String; public static final field TRACE_ORIGIN Ljava/lang/String; - public fun ()V - public fun (Lio/sentry/IScopes;)V - public fun (Lio/sentry/IScopes;Ljava/lang/String;)V + public fun (Lorg/apache/kafka/clients/producer/Producer;)V + public fun (Lorg/apache/kafka/clients/producer/Producer;Lio/sentry/IScopes;)V + public fun (Lorg/apache/kafka/clients/producer/Producer;Lio/sentry/IScopes;Ljava/lang/String;)V + public fun abortTransaction ()V + public fun beginTransaction ()V + public fun clientInstanceId (Ljava/time/Duration;)Lorg/apache/kafka/common/Uuid; public fun close ()V - public fun configure (Ljava/util/Map;)V - public fun onAcknowledgement (Lorg/apache/kafka/clients/producer/RecordMetadata;Ljava/lang/Exception;)V - public fun onSend (Lorg/apache/kafka/clients/producer/ProducerRecord;)Lorg/apache/kafka/clients/producer/ProducerRecord; + public fun close (Ljava/time/Duration;)V + public fun commitTransaction ()V + public fun flush ()V + public fun getDelegate ()Lorg/apache/kafka/clients/producer/Producer; + public fun initTransactions ()V + public fun metrics ()Ljava/util/Map; + public fun partitionsFor (Ljava/lang/String;)Ljava/util/List; + public fun send (Lorg/apache/kafka/clients/producer/ProducerRecord;)Ljava/util/concurrent/Future; + public fun send (Lorg/apache/kafka/clients/producer/ProducerRecord;Lorg/apache/kafka/clients/producer/Callback;)Ljava/util/concurrent/Future; + public fun sendOffsetsToTransaction (Ljava/util/Map;Ljava/lang/String;)V + public fun sendOffsetsToTransaction (Ljava/util/Map;Lorg/apache/kafka/clients/consumer/ConsumerGroupMetadata;)V + public fun toString ()Ljava/lang/String; } diff --git a/sentry-kafka/src/main/java/io/sentry/kafka/SentryKafkaConsumerTracing.java b/sentry-kafka/src/main/java/io/sentry/kafka/SentryKafkaConsumerTracing.java index 37c7073038..1231cae15e 100644 --- a/sentry-kafka/src/main/java/io/sentry/kafka/SentryKafkaConsumerTracing.java +++ b/sentry-kafka/src/main/java/io/sentry/kafka/SentryKafkaConsumerTracing.java @@ -241,7 +241,7 @@ private void finishTransaction( private @Nullable Long receiveLatency(final @NotNull ConsumerRecord record) { final @Nullable String enqueuedTimeStr = - headerValue(record, SentryKafkaProducerInterceptor.SENTRY_ENQUEUED_TIME_HEADER); + headerValue(record, SentryKafkaProducer.SENTRY_ENQUEUED_TIME_HEADER); if (enqueuedTimeStr == null) { return null; } diff --git a/sentry-kafka/src/main/java/io/sentry/kafka/SentryKafkaProducer.java b/sentry-kafka/src/main/java/io/sentry/kafka/SentryKafkaProducer.java new file mode 100644 index 0000000000..6b1278692e --- /dev/null +++ b/sentry-kafka/src/main/java/io/sentry/kafka/SentryKafkaProducer.java @@ -0,0 +1,299 @@ +package io.sentry.kafka; + +import io.sentry.BaggageHeader; +import io.sentry.DateUtils; +import io.sentry.IScopes; +import io.sentry.ISpan; +import io.sentry.ScopesAdapter; +import io.sentry.SentryLevel; +import io.sentry.SentryTraceHeader; +import io.sentry.SpanDataConvention; +import io.sentry.SpanOptions; +import io.sentry.SpanStatus; +import io.sentry.util.SpanUtils; +import io.sentry.util.TracingUtils; +import java.nio.charset.StandardCharsets; +import java.time.Duration; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.concurrent.Future; +import org.apache.kafka.clients.consumer.ConsumerGroupMetadata; +import org.apache.kafka.clients.consumer.OffsetAndMetadata; +import org.apache.kafka.clients.producer.Callback; +import org.apache.kafka.clients.producer.Producer; +import org.apache.kafka.clients.producer.ProducerRecord; +import org.apache.kafka.clients.producer.RecordMetadata; +import org.apache.kafka.common.Metric; +import org.apache.kafka.common.MetricName; +import org.apache.kafka.common.PartitionInfo; +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.Uuid; +import org.apache.kafka.common.errors.ProducerFencedException; +import org.apache.kafka.common.header.Header; +import org.apache.kafka.common.header.Headers; +import org.jetbrains.annotations.ApiStatus; +import org.jetbrains.annotations.NotNull; +import org.jetbrains.annotations.Nullable; + +/** + * Wraps a Kafka {@link Producer} to record a {@code queue.publish} span around each {@code send} + * and to inject Sentry trace propagation headers into the produced record. + * + *

Unlike a {@link org.apache.kafka.clients.producer.ProducerInterceptor}, the wrapper keeps the + * span open until the send callback fires, so the span reflects the actual broker-ack lifecycle. + * + *

For raw Kafka usage: + * + *

{@code
+ * Producer producer =
+ *     new SentryKafkaProducer<>(new KafkaProducer<>(props));
+ * }
+ * + *

For Spring Kafka, the {@code SentryKafkaProducerBeanPostProcessor} in {@code + * sentry-spring-jakarta} installs this wrapper automatically via {@code + * ProducerFactory.addPostProcessor(...)}. + */ +@ApiStatus.Experimental +public final class SentryKafkaProducer implements Producer { + + public static final @NotNull String TRACE_ORIGIN = "auto.queue.kafka.producer"; + public static final @NotNull String SENTRY_ENQUEUED_TIME_HEADER = "sentry-task-enqueued-time"; + + private final @NotNull Producer delegate; + private final @NotNull IScopes scopes; + private final @NotNull String traceOrigin; + + public SentryKafkaProducer(final @NotNull Producer delegate) { + this(delegate, ScopesAdapter.getInstance(), TRACE_ORIGIN); + } + + public SentryKafkaProducer( + final @NotNull Producer delegate, final @NotNull IScopes scopes) { + this(delegate, scopes, TRACE_ORIGIN); + } + + public SentryKafkaProducer( + final @NotNull Producer delegate, + final @NotNull IScopes scopes, + final @NotNull String traceOrigin) { + this.delegate = delegate; + this.scopes = scopes; + this.traceOrigin = traceOrigin; + } + + /** Returns the wrapped producer. */ + public @NotNull Producer getDelegate() { + return delegate; + } + + @Override + public @NotNull Future send(final @NotNull ProducerRecord record) { + return send(record, null); + } + + @Override + public @NotNull Future send( + final @NotNull ProducerRecord record, final @Nullable Callback callback) { + if (!scopes.getOptions().isEnableQueueTracing() || isIgnored()) { + return delegate.send(record, callback); + } + + final @Nullable ISpan activeSpan = scopes.getSpan(); + if (activeSpan == null || activeSpan.isNoOp()) { + return delegate.send(record, callback); + } + + @Nullable ISpan span = null; + try { + final @NotNull SpanOptions spanOptions = new SpanOptions(); + spanOptions.setOrigin(traceOrigin); + span = activeSpan.startChild("queue.publish", record.topic(), spanOptions); + if (span.isNoOp()) { + return delegate.send(record, callback); + } + + span.setData(SpanDataConvention.MESSAGING_SYSTEM, "kafka"); + span.setData(SpanDataConvention.MESSAGING_DESTINATION_NAME, record.topic()); + injectHeaders(record.headers(), span); + } catch (Throwable t) { + if (span != null) { + span.setThrowable(t); + span.setStatus(SpanStatus.INTERNAL_ERROR); + if (!span.isFinished()) { + span.finish(); + } + } + scopes + .getOptions() + .getLogger() + .log(SentryLevel.ERROR, "Failed to instrument Kafka producer record.", t); + return delegate.send(record, callback); + } + + final @NotNull ISpan finalSpan = span; + final @NotNull Callback wrappedCallback = wrapCallback(callback, finalSpan); + + try { + return delegate.send(record, wrappedCallback); + } catch (Throwable t) { + finishWithError(finalSpan, t); + throw t; + } + } + + private @NotNull Callback wrapCallback( + final @Nullable Callback userCallback, final @NotNull ISpan span) { + return (metadata, exception) -> { + try { + if (exception != null) { + span.setThrowable(exception); + span.setStatus(SpanStatus.INTERNAL_ERROR); + } else { + span.setStatus(SpanStatus.OK); + } + } catch (Throwable t) { + scopes + .getOptions() + .getLogger() + .log(SentryLevel.ERROR, "Failed to set status on Kafka producer span.", t); + } finally { + if (!span.isFinished()) { + span.finish(); + } + if (userCallback != null) { + userCallback.onCompletion(metadata, exception); + } + } + }; + } + + private void finishWithError(final @NotNull ISpan span, final @NotNull Throwable t) { + span.setThrowable(t); + span.setStatus(SpanStatus.INTERNAL_ERROR); + if (!span.isFinished()) { + span.finish(); + } + } + + private boolean isIgnored() { + return SpanUtils.isIgnored(scopes.getOptions().getIgnoredSpanOrigins(), traceOrigin); + } + + private void injectHeaders(final @NotNull Headers headers, final @NotNull ISpan span) { + final @Nullable List existingBaggageHeaders = + readHeaderValues(headers, BaggageHeader.BAGGAGE_HEADER); + final @Nullable TracingUtils.TracingHeaders tracingHeaders = + TracingUtils.trace(scopes, existingBaggageHeaders, span); + if (tracingHeaders != null) { + final @NotNull SentryTraceHeader sentryTraceHeader = tracingHeaders.getSentryTraceHeader(); + headers.remove(sentryTraceHeader.getName()); + headers.add( + sentryTraceHeader.getName(), + sentryTraceHeader.getValue().getBytes(StandardCharsets.UTF_8)); + + final @Nullable BaggageHeader baggageHeader = tracingHeaders.getBaggageHeader(); + if (baggageHeader != null) { + headers.remove(baggageHeader.getName()); + headers.add( + baggageHeader.getName(), baggageHeader.getValue().getBytes(StandardCharsets.UTF_8)); + } + } + + headers.remove(SENTRY_ENQUEUED_TIME_HEADER); + headers.add( + SENTRY_ENQUEUED_TIME_HEADER, + DateUtils.doubleToBigDecimal(DateUtils.millisToSeconds(System.currentTimeMillis())) + .toString() + .getBytes(StandardCharsets.UTF_8)); + } + + private static @Nullable List readHeaderValues( + final @NotNull Headers headers, final @NotNull String name) { + @Nullable List values = null; + for (final @NotNull Header header : headers.headers(name)) { + final byte @Nullable [] value = header.value(); + if (value != null) { + if (values == null) { + values = new ArrayList<>(); + } + values.add(new String(value, StandardCharsets.UTF_8)); + } + } + return values; + } + + // --- Pure delegation for everything else --- + + @Override + public void initTransactions() { + delegate.initTransactions(); + } + + @Override + public void beginTransaction() throws ProducerFencedException { + delegate.beginTransaction(); + } + + @Override + @SuppressWarnings("deprecation") + public void sendOffsetsToTransaction( + final @NotNull Map offsets, + final @NotNull String consumerGroupId) + throws ProducerFencedException { + delegate.sendOffsetsToTransaction(offsets, consumerGroupId); + } + + @Override + public void sendOffsetsToTransaction( + final @NotNull Map offsets, + final @NotNull ConsumerGroupMetadata groupMetadata) + throws ProducerFencedException { + delegate.sendOffsetsToTransaction(offsets, groupMetadata); + } + + @Override + public void commitTransaction() throws ProducerFencedException { + delegate.commitTransaction(); + } + + @Override + public void abortTransaction() throws ProducerFencedException { + delegate.abortTransaction(); + } + + @Override + public void flush() { + delegate.flush(); + } + + @Override + public @NotNull List partitionsFor(final @NotNull String topic) { + return delegate.partitionsFor(topic); + } + + @Override + public @NotNull Map metrics() { + return delegate.metrics(); + } + + @Override + public @NotNull Uuid clientInstanceId(final @NotNull Duration timeout) { + return delegate.clientInstanceId(timeout); + } + + @Override + public void close() { + delegate.close(); + } + + @Override + public void close(final @NotNull Duration timeout) { + delegate.close(timeout); + } + + @Override + public @NotNull String toString() { + return "SentryKafkaProducer[delegate=" + delegate + "]"; + } +} diff --git a/sentry-kafka/src/main/java/io/sentry/kafka/SentryKafkaProducerInterceptor.java b/sentry-kafka/src/main/java/io/sentry/kafka/SentryKafkaProducerInterceptor.java deleted file mode 100644 index 6bcb424397..0000000000 --- a/sentry-kafka/src/main/java/io/sentry/kafka/SentryKafkaProducerInterceptor.java +++ /dev/null @@ -1,150 +0,0 @@ -package io.sentry.kafka; - -import io.sentry.BaggageHeader; -import io.sentry.DateUtils; -import io.sentry.IScopes; -import io.sentry.ISpan; -import io.sentry.ScopesAdapter; -import io.sentry.SentryLevel; -import io.sentry.SentryTraceHeader; -import io.sentry.SpanDataConvention; -import io.sentry.SpanOptions; -import io.sentry.SpanStatus; -import io.sentry.util.SpanUtils; -import io.sentry.util.TracingUtils; -import java.nio.charset.StandardCharsets; -import java.util.ArrayList; -import java.util.List; -import java.util.Map; -import org.apache.kafka.clients.producer.ProducerInterceptor; -import org.apache.kafka.clients.producer.ProducerRecord; -import org.apache.kafka.clients.producer.RecordMetadata; -import org.apache.kafka.common.header.Header; -import org.apache.kafka.common.header.Headers; -import org.jetbrains.annotations.ApiStatus; -import org.jetbrains.annotations.NotNull; -import org.jetbrains.annotations.Nullable; - -@ApiStatus.Experimental -public final class SentryKafkaProducerInterceptor implements ProducerInterceptor { - - public static final @NotNull String TRACE_ORIGIN = "auto.queue.kafka.producer"; - public static final @NotNull String SENTRY_ENQUEUED_TIME_HEADER = "sentry-task-enqueued-time"; - - private final @NotNull IScopes scopes; - private final @NotNull String traceOrigin; - - public SentryKafkaProducerInterceptor() { - this(ScopesAdapter.getInstance(), TRACE_ORIGIN); - } - - public SentryKafkaProducerInterceptor(final @NotNull IScopes scopes) { - this(scopes, TRACE_ORIGIN); - } - - public SentryKafkaProducerInterceptor( - final @NotNull IScopes scopes, final @NotNull String traceOrigin) { - this.scopes = scopes; - this.traceOrigin = traceOrigin; - } - - @Override - public @NotNull ProducerRecord onSend(final @NotNull ProducerRecord record) { - if (!scopes.getOptions().isEnableQueueTracing() || isIgnored()) { - return record; - } - - final @Nullable ISpan activeSpan = scopes.getSpan(); - if (activeSpan == null || activeSpan.isNoOp()) { - return record; - } - - @Nullable ISpan span = null; - try { - final @NotNull SpanOptions spanOptions = new SpanOptions(); - spanOptions.setOrigin(traceOrigin); - span = activeSpan.startChild("queue.publish", record.topic(), spanOptions); - if (span.isNoOp()) { - return record; - } - - span.setData(SpanDataConvention.MESSAGING_SYSTEM, "kafka"); - span.setData(SpanDataConvention.MESSAGING_DESTINATION_NAME, record.topic()); - - injectHeaders(record.headers(), span); - span.setStatus(SpanStatus.OK); - } catch (Throwable t) { - if (span != null) { - span.setThrowable(t); - span.setStatus(SpanStatus.INTERNAL_ERROR); - } - scopes - .getOptions() - .getLogger() - .log(SentryLevel.ERROR, "Failed to instrument Kafka producer record.", t); - } finally { - if (span != null && !span.isFinished()) { - span.finish(); - } - } - - return record; - } - - @Override - public void onAcknowledgement( - final @Nullable RecordMetadata metadata, final @Nullable Exception exception) {} - - private boolean isIgnored() { - return SpanUtils.isIgnored(scopes.getOptions().getIgnoredSpanOrigins(), traceOrigin); - } - - @Override - public void close() {} - - @Override - public void configure(final @Nullable Map configs) {} - - private void injectHeaders(final @NotNull Headers headers, final @NotNull ISpan span) { - final @Nullable List existingBaggageHeaders = - readHeaderValues(headers, BaggageHeader.BAGGAGE_HEADER); - final @Nullable TracingUtils.TracingHeaders tracingHeaders = - TracingUtils.trace(scopes, existingBaggageHeaders, span); - if (tracingHeaders != null) { - final @NotNull SentryTraceHeader sentryTraceHeader = tracingHeaders.getSentryTraceHeader(); - headers.remove(sentryTraceHeader.getName()); - headers.add( - sentryTraceHeader.getName(), - sentryTraceHeader.getValue().getBytes(StandardCharsets.UTF_8)); - - final @Nullable BaggageHeader baggageHeader = tracingHeaders.getBaggageHeader(); - if (baggageHeader != null) { - headers.remove(baggageHeader.getName()); - headers.add( - baggageHeader.getName(), baggageHeader.getValue().getBytes(StandardCharsets.UTF_8)); - } - } - - headers.remove(SENTRY_ENQUEUED_TIME_HEADER); - headers.add( - SENTRY_ENQUEUED_TIME_HEADER, - DateUtils.doubleToBigDecimal(DateUtils.millisToSeconds(System.currentTimeMillis())) - .toString() - .getBytes(StandardCharsets.UTF_8)); - } - - private static @Nullable List readHeaderValues( - final @NotNull Headers headers, final @NotNull String name) { - @Nullable List values = null; - for (final @NotNull Header header : headers.headers(name)) { - final byte @Nullable [] value = header.value(); - if (value != null) { - if (values == null) { - values = new ArrayList<>(); - } - values.add(new String(value, StandardCharsets.UTF_8)); - } - } - return values; - } -} diff --git a/sentry-kafka/src/test/kotlin/io/sentry/kafka/SentryKafkaConsumerTracingTest.kt b/sentry-kafka/src/test/kotlin/io/sentry/kafka/SentryKafkaConsumerTracingTest.kt index 38c0bf3198..3bd992e8c8 100644 --- a/sentry-kafka/src/test/kotlin/io/sentry/kafka/SentryKafkaConsumerTracingTest.kt +++ b/sentry-kafka/src/test/kotlin/io/sentry/kafka/SentryKafkaConsumerTracingTest.kt @@ -232,7 +232,7 @@ class SentryKafkaConsumerTracingTest { } enqueuedTime?.let { headers.add( - SentryKafkaProducerInterceptor.SENTRY_ENQUEUED_TIME_HEADER, + SentryKafkaProducer.SENTRY_ENQUEUED_TIME_HEADER, it.toByteArray(StandardCharsets.UTF_8), ) } diff --git a/sentry-kafka/src/test/kotlin/io/sentry/kafka/SentryKafkaProducerInterceptorTest.kt b/sentry-kafka/src/test/kotlin/io/sentry/kafka/SentryKafkaProducerInterceptorTest.kt deleted file mode 100644 index 758deed094..0000000000 --- a/sentry-kafka/src/test/kotlin/io/sentry/kafka/SentryKafkaProducerInterceptorTest.kt +++ /dev/null @@ -1,225 +0,0 @@ -package io.sentry.kafka - -import io.sentry.BaggageHeader -import io.sentry.IScopes -import io.sentry.ISentryLifecycleToken -import io.sentry.ISpan -import io.sentry.Sentry -import io.sentry.SentryOptions -import io.sentry.SentryTraceHeader -import io.sentry.SentryTracer -import io.sentry.SpanOptions -import io.sentry.SpanStatus -import io.sentry.TransactionContext -import io.sentry.test.initForTest -import java.nio.charset.StandardCharsets -import kotlin.test.AfterTest -import kotlin.test.BeforeTest -import kotlin.test.Test -import kotlin.test.assertEquals -import kotlin.test.assertFalse -import kotlin.test.assertNotNull -import kotlin.test.assertSame -import kotlin.test.assertTrue -import org.apache.kafka.clients.producer.ProducerRecord -import org.apache.kafka.common.header.Header -import org.apache.kafka.common.header.Headers -import org.mockito.kotlin.any -import org.mockito.kotlin.eq -import org.mockito.kotlin.mock -import org.mockito.kotlin.verify -import org.mockito.kotlin.whenever - -class SentryKafkaProducerInterceptorTest { - - private lateinit var scopes: IScopes - private lateinit var options: SentryOptions - - @BeforeTest - fun setup() { - initForTest { - it.dsn = "https://key@sentry.io/proj" - it.isEnableQueueTracing = true - it.tracesSampleRate = 1.0 - } - scopes = mock() - options = - SentryOptions().apply { - dsn = "https://key@sentry.io/proj" - isEnableQueueTracing = true - } - whenever(scopes.options).thenReturn(options) - } - - @AfterTest - fun teardown() { - Sentry.close() - } - - private fun createTransaction(): SentryTracer { - val tx = SentryTracer(TransactionContext("tx", "op"), scopes) - whenever(scopes.span).thenReturn(tx) - return tx - } - - @Test - fun `creates queue publish span and injects headers`() { - val tx = createTransaction() - val interceptor = SentryKafkaProducerInterceptor(scopes) - val record = ProducerRecord("my-topic", "key", "value") - - interceptor.onSend(record) - - assertEquals(1, tx.spans.size) - val span = tx.spans.first() - assertEquals("queue.publish", span.operation) - assertEquals("my-topic", span.description) - assertEquals("kafka", span.data["messaging.system"]) - assertEquals("my-topic", span.data["messaging.destination.name"]) - assertEquals(SentryKafkaProducerInterceptor.TRACE_ORIGIN, span.spanContext.origin) - assertTrue(span.isFinished) - - val sentryTraceHeader = record.headers().lastHeader(SentryTraceHeader.SENTRY_TRACE_HEADER) - assertNotNull(sentryTraceHeader) - - val enqueuedTimeHeader = - record.headers().lastHeader(SentryKafkaProducerInterceptor.SENTRY_ENQUEUED_TIME_HEADER) - assertNotNull(enqueuedTimeHeader) - val enqueuedTimeRaw = String(enqueuedTimeHeader.value(), StandardCharsets.UTF_8) - // Must be written as a plain decimal so cross-SDK consumers (e.g. sentry-python) can - // parse it. String.valueOf(double) would emit scientific notation (e.g. 1.77E9) for - // epoch seconds. - assertFalse( - enqueuedTimeRaw.contains('E') || enqueuedTimeRaw.contains('e'), - "enqueued-time header must not use scientific notation, got: $enqueuedTimeRaw", - ) - assertTrue( - enqueuedTimeRaw.matches(Regex("""^\d+\.\d{6}$""")), - "enqueued-time header must be plain epoch seconds with 6 decimals, got: $enqueuedTimeRaw", - ) - val enqueuedTime = enqueuedTimeRaw.toDouble() - assertTrue(enqueuedTime > 0) - } - - @Test - fun `preserves pre-existing third-party baggage header entries`() { - val tx = createTransaction() - val interceptor = SentryKafkaProducerInterceptor(scopes) - val record = ProducerRecord("my-topic", "key", "value") - record - .headers() - .add( - BaggageHeader.BAGGAGE_HEADER, - "othervendor=someValue,another=thing".toByteArray(StandardCharsets.UTF_8), - ) - - interceptor.onSend(record) - - val baggageHeaders = record.headers().headers(BaggageHeader.BAGGAGE_HEADER).toList() - assertEquals(1, baggageHeaders.size) - val baggageValue = String(baggageHeaders.first().value(), StandardCharsets.UTF_8) - assertTrue( - baggageValue.contains("othervendor=someValue"), - "expected third-party baggage entry preserved, got: $baggageValue", - ) - assertTrue( - baggageValue.contains("another=thing"), - "expected third-party baggage entry preserved, got: $baggageValue", - ) - assertTrue( - baggageValue.contains("sentry-"), - "expected Sentry baggage entries appended, got: $baggageValue", - ) - } - - @Test - fun `finishes span with error when header injection fails`() { - val activeSpan = mock() - val span = mock() - val headers = mock() - val record = mock>() - val exception = RuntimeException("boom") - whenever(scopes.span).thenReturn(activeSpan) - whenever(activeSpan.startChild(eq("queue.publish"), eq("my-topic"), any())) - .thenReturn(span) - whenever(span.isNoOp).thenReturn(false) - whenever(span.isFinished).thenReturn(false) - whenever(span.toSentryTrace()) - .thenReturn(SentryTraceHeader("2722d9f6ec019ade60c776169d9a8904-cedf5b7571cb4972-1")) - whenever(span.toBaggageHeader(null)).thenReturn(null) - whenever(record.topic()).thenReturn("my-topic") - whenever(record.headers()).thenReturn(headers) - whenever(headers.headers(BaggageHeader.BAGGAGE_HEADER)).thenReturn(emptyList

()) - whenever(headers.remove(SentryTraceHeader.SENTRY_TRACE_HEADER)).thenThrow(exception) - - val interceptor = SentryKafkaProducerInterceptor(scopes) - - interceptor.onSend(record) - - verify(span).setStatus(SpanStatus.INTERNAL_ERROR) - verify(span).setThrowable(exception) - verify(span).finish() - } - - @Test - fun `does not create span when queue tracing is disabled`() { - val tx = createTransaction() - options.isEnableQueueTracing = false - val interceptor = SentryKafkaProducerInterceptor(scopes) - - interceptor.onSend(ProducerRecord("my-topic", "key", "value")) - - assertEquals(0, tx.spans.size) - } - - @Test - fun `does not create span when trace origin is ignored`() { - val tx = createTransaction() - options.setIgnoredSpanOrigins(listOf(SentryKafkaProducerInterceptor.TRACE_ORIGIN)) - val interceptor = SentryKafkaProducerInterceptor(scopes) - val record = ProducerRecord("my-topic", "key", "value") - - interceptor.onSend(record) - - assertEquals(0, tx.spans.size) - assertEquals(null, record.headers().lastHeader(SentryTraceHeader.SENTRY_TRACE_HEADER)) - assertEquals( - null, - record.headers().lastHeader(SentryKafkaProducerInterceptor.SENTRY_ENQUEUED_TIME_HEADER), - ) - } - - @Test - fun `returns original record when no active span`() { - whenever(scopes.span).thenReturn(null) - val interceptor = SentryKafkaProducerInterceptor(scopes) - val record = ProducerRecord("my-topic", "key", "value") - - val result = interceptor.onSend(record) - - assertSame(record, result) - } - - @Test - fun `no-arg constructor uses current scopes`() { - val transaction = Sentry.startTransaction("tx", "op") - val record = ProducerRecord("my-topic", "key", "value") - - try { - val token: ISentryLifecycleToken = transaction.makeCurrent() - try { - val interceptor = SentryKafkaProducerInterceptor() - interceptor.onSend(record) - } finally { - token.close() - } - } finally { - transaction.finish() - } - - assertNotNull(record.headers().lastHeader(SentryTraceHeader.SENTRY_TRACE_HEADER)) - assertNotNull( - record.headers().lastHeader(SentryKafkaProducerInterceptor.SENTRY_ENQUEUED_TIME_HEADER) - ) - } -} diff --git a/sentry-kafka/src/test/kotlin/io/sentry/kafka/SentryKafkaProducerTest.kt b/sentry-kafka/src/test/kotlin/io/sentry/kafka/SentryKafkaProducerTest.kt new file mode 100644 index 0000000000..aa3135ca12 --- /dev/null +++ b/sentry-kafka/src/test/kotlin/io/sentry/kafka/SentryKafkaProducerTest.kt @@ -0,0 +1,338 @@ +package io.sentry.kafka + +import io.sentry.BaggageHeader +import io.sentry.IScopes +import io.sentry.ISentryLifecycleToken +import io.sentry.ISpan +import io.sentry.Sentry +import io.sentry.SentryOptions +import io.sentry.SentryTraceHeader +import io.sentry.SentryTracer +import io.sentry.SpanOptions +import io.sentry.SpanStatus +import io.sentry.TransactionContext +import io.sentry.test.initForTest +import java.nio.charset.StandardCharsets +import java.util.concurrent.CompletableFuture +import kotlin.test.AfterTest +import kotlin.test.BeforeTest +import kotlin.test.Test +import kotlin.test.assertEquals +import kotlin.test.assertFalse +import kotlin.test.assertNotNull +import kotlin.test.assertSame +import kotlin.test.assertTrue +import org.apache.kafka.clients.producer.Callback +import org.apache.kafka.clients.producer.Producer +import org.apache.kafka.clients.producer.ProducerRecord +import org.apache.kafka.clients.producer.RecordMetadata +import org.apache.kafka.common.TopicPartition +import org.apache.kafka.common.header.Header +import org.apache.kafka.common.header.Headers +import org.mockito.kotlin.any +import org.mockito.kotlin.argumentCaptor +import org.mockito.kotlin.eq +import org.mockito.kotlin.isNull +import org.mockito.kotlin.mock +import org.mockito.kotlin.never +import org.mockito.kotlin.verify +import org.mockito.kotlin.whenever + +class SentryKafkaProducerTest { + + private lateinit var scopes: IScopes + private lateinit var options: SentryOptions + private lateinit var delegate: Producer + + @BeforeTest + fun setup() { + initForTest { + it.dsn = "https://key@sentry.io/proj" + it.isEnableQueueTracing = true + it.tracesSampleRate = 1.0 + } + scopes = mock() + options = + SentryOptions().apply { + dsn = "https://key@sentry.io/proj" + isEnableQueueTracing = true + } + whenever(scopes.options).thenReturn(options) + delegate = mock() + whenever(delegate.send(any(), any())).thenReturn(CompletableFuture.completedFuture(null)) + } + + @AfterTest + fun teardown() { + Sentry.close() + } + + private fun createTransaction(): SentryTracer { + val tx = SentryTracer(TransactionContext("tx", "op"), scopes) + whenever(scopes.span).thenReturn(tx) + return tx + } + + @Test + fun `creates queue publish span and injects headers`() { + val tx = createTransaction() + val producer = SentryKafkaProducer(delegate, scopes) + val record = ProducerRecord("my-topic", "key", "value") + + producer.send(record) + + assertEquals(1, tx.spans.size) + val span = tx.spans.first() + assertEquals("queue.publish", span.operation) + assertEquals("my-topic", span.description) + assertEquals("kafka", span.data["messaging.system"]) + assertEquals("my-topic", span.data["messaging.destination.name"]) + assertEquals(SentryKafkaProducer.TRACE_ORIGIN, span.spanContext.origin) + + val sentryTraceHeader = record.headers().lastHeader(SentryTraceHeader.SENTRY_TRACE_HEADER) + assertNotNull(sentryTraceHeader) + + val enqueuedTimeHeader = + record.headers().lastHeader(SentryKafkaProducer.SENTRY_ENQUEUED_TIME_HEADER) + assertNotNull(enqueuedTimeHeader) + val enqueuedTimeRaw = String(enqueuedTimeHeader.value(), StandardCharsets.UTF_8) + // Cross-SDK consumers (e.g. sentry-python) parse this as a plain decimal — must not use + // scientific notation. + assertFalse(enqueuedTimeRaw.contains('E') || enqueuedTimeRaw.contains('e')) + assertTrue(enqueuedTimeRaw.matches(Regex("""^\d+\.\d{6}$"""))) + } + + @Test + fun `delegates send and does not finish span synchronously`() { + val tx = createTransaction() + val producer = SentryKafkaProducer(delegate, scopes) + val record = ProducerRecord("my-topic", "key", "value") + + producer.send(record) + + verify(delegate).send(eq(record), any()) + val span = tx.spans.first() + assertFalse(span.isFinished, "span should be open until callback fires") + } + + @Test + fun `finishes span as OK when broker ack callback succeeds`() { + val tx = createTransaction() + val producer = SentryKafkaProducer(delegate, scopes) + val record = ProducerRecord("my-topic", "key", "value") + + producer.send(record) + + val captor = argumentCaptor() + verify(delegate).send(eq(record), captor.capture()) + val metadata = RecordMetadata(TopicPartition("my-topic", 0), 0L, 0, 0L, 0, 0) + captor.firstValue.onCompletion(metadata, null) + + val span = tx.spans.first() + assertTrue(span.isFinished) + assertEquals(SpanStatus.OK, span.status) + } + + @Test + fun `finishes span as INTERNAL_ERROR when broker ack callback fails`() { + val tx = createTransaction() + val producer = SentryKafkaProducer(delegate, scopes) + val record = ProducerRecord("my-topic", "key", "value") + val exception = RuntimeException("boom") + + producer.send(record) + + val captor = argumentCaptor() + verify(delegate).send(eq(record), captor.capture()) + captor.firstValue.onCompletion(null, exception) + + val span = tx.spans.first() + assertTrue(span.isFinished) + assertEquals(SpanStatus.INTERNAL_ERROR, span.status) + assertSame(exception, span.throwable) + } + + @Test + fun `forwards user callback after finishing span`() { + createTransaction() + val producer = SentryKafkaProducer(delegate, scopes) + val record = ProducerRecord("my-topic", "key", "value") + val userCallback = mock() + + producer.send(record, userCallback) + + val captor = argumentCaptor() + verify(delegate).send(eq(record), captor.capture()) + val metadata = RecordMetadata(TopicPartition("my-topic", 0), 0L, 0, 0L, 0, 0) + captor.firstValue.onCompletion(metadata, null) + + verify(userCallback).onCompletion(metadata, null) + } + + @Test + fun `finishes span with error when delegate send throws synchronously`() { + val tx = createTransaction() + val exception = RuntimeException("kaboom") + whenever(delegate.send(any(), any())).thenThrow(exception) + val producer = SentryKafkaProducer(delegate, scopes) + val record = ProducerRecord("my-topic", "key", "value") + + val thrown = runCatching { producer.send(record) }.exceptionOrNull() + + assertSame(exception, thrown) + val span = tx.spans.first() + assertTrue(span.isFinished) + assertEquals(SpanStatus.INTERNAL_ERROR, span.status) + assertSame(exception, span.throwable) + } + + @Test + fun `delegates send without span when queue tracing is disabled`() { + createTransaction() + options.isEnableQueueTracing = false + val producer = SentryKafkaProducer(delegate, scopes) + val record = ProducerRecord("my-topic", "key", "value") + + producer.send(record) + + verify(delegate).send(eq(record), isNull()) + } + + @Test + fun `delegates send without span when trace origin is ignored`() { + val tx = createTransaction() + options.setIgnoredSpanOrigins(listOf(SentryKafkaProducer.TRACE_ORIGIN)) + val producer = SentryKafkaProducer(delegate, scopes) + val record = ProducerRecord("my-topic", "key", "value") + + producer.send(record) + + assertEquals(0, tx.spans.size) + verify(delegate).send(eq(record), isNull()) + assertEquals(null, record.headers().lastHeader(SentryTraceHeader.SENTRY_TRACE_HEADER)) + } + + @Test + fun `delegates send without span when no active span`() { + whenever(scopes.span).thenReturn(null) + val producer = SentryKafkaProducer(delegate, scopes) + val record = ProducerRecord("my-topic", "key", "value") + + producer.send(record) + + verify(delegate).send(eq(record), isNull()) + } + + @Test + fun `preserves pre-existing third-party baggage header entries`() { + createTransaction() + val producer = SentryKafkaProducer(delegate, scopes) + val record = ProducerRecord("my-topic", "key", "value") + record + .headers() + .add( + BaggageHeader.BAGGAGE_HEADER, + "othervendor=someValue,another=thing".toByteArray(StandardCharsets.UTF_8), + ) + + producer.send(record) + + val baggageHeaders = record.headers().headers(BaggageHeader.BAGGAGE_HEADER).toList() + assertEquals(1, baggageHeaders.size) + val baggageValue = String(baggageHeaders.first().value(), StandardCharsets.UTF_8) + assertTrue(baggageValue.contains("othervendor=someValue")) + assertTrue(baggageValue.contains("another=thing")) + assertTrue(baggageValue.contains("sentry-")) + } + + @Test + fun `finishes span with error when header injection fails`() { + val activeSpan = mock() + val span = mock() + val headers = mock() + val record = mock>() + val exception = RuntimeException("boom") + whenever(scopes.span).thenReturn(activeSpan) + whenever(activeSpan.startChild(eq("queue.publish"), eq("my-topic"), any())) + .thenReturn(span) + whenever(span.isNoOp).thenReturn(false) + whenever(span.isFinished).thenReturn(false) + whenever(span.toSentryTrace()) + .thenReturn(SentryTraceHeader("2722d9f6ec019ade60c776169d9a8904-cedf5b7571cb4972-1")) + whenever(span.toBaggageHeader(null)).thenReturn(null) + whenever(record.topic()).thenReturn("my-topic") + whenever(record.headers()).thenReturn(headers) + whenever(headers.headers(BaggageHeader.BAGGAGE_HEADER)).thenReturn(emptyList
()) + whenever(headers.remove(SentryTraceHeader.SENTRY_TRACE_HEADER)).thenThrow(exception) + + val producer = SentryKafkaProducer(delegate, scopes) + producer.send(record) + + verify(span).setStatus(SpanStatus.INTERNAL_ERROR) + verify(span).setThrowable(exception) + verify(span).finish() + // After header-injection failure, falls back to a plain delegate send (no Sentry callback). + verify(delegate).send(eq(record), isNull()) + } + + @Test + fun `delegates non-send methods to underlying producer`() { + val producer = SentryKafkaProducer(delegate, scopes) + + producer.flush() + producer.partitionsFor("my-topic") + producer.metrics() + producer.close() + + verify(delegate).flush() + verify(delegate).partitionsFor("my-topic") + verify(delegate).metrics() + verify(delegate).close() + } + + @Test + fun `no-arg constructor uses current scopes`() { + val transaction = Sentry.startTransaction("tx", "op") + val record = ProducerRecord("my-topic", "key", "value") + + try { + val token: ISentryLifecycleToken = transaction.makeCurrent() + try { + val producer = SentryKafkaProducer(delegate) + producer.send(record) + } finally { + token.close() + } + } finally { + transaction.finish() + } + + assertNotNull(record.headers().lastHeader(SentryTraceHeader.SENTRY_TRACE_HEADER)) + assertNotNull(record.headers().lastHeader(SentryKafkaProducer.SENTRY_ENQUEUED_TIME_HEADER)) + verify(delegate).send(eq(record), any()) + } + + @Test + fun `getDelegate exposes wrapped producer`() { + val producer = SentryKafkaProducer(delegate, scopes) + assertSame(delegate, producer.delegate) + } + + @Test + fun `does not invoke sentry callback wrap when no-op span returned`() { + val activeSpan = mock() + val span = mock() + val record = ProducerRecord("my-topic", "key", "value") + whenever(scopes.span).thenReturn(activeSpan) + whenever(activeSpan.isNoOp).thenReturn(false) + whenever(activeSpan.startChild(eq("queue.publish"), eq("my-topic"), any())) + .thenReturn(span) + whenever(span.isNoOp).thenReturn(true) + + val producer = SentryKafkaProducer(delegate, scopes) + producer.send(record) + + verify(delegate).send(eq(record), isNull()) + verify(span, never()).finish() + } +} diff --git a/sentry-samples/sentry-samples-console/src/main/java/io/sentry/samples/console/kafka/KafkaShowcase.java b/sentry-samples/sentry-samples-console/src/main/java/io/sentry/samples/console/kafka/KafkaShowcase.java index da89145cfe..cc819ac0db 100644 --- a/sentry-samples/sentry-samples-console/src/main/java/io/sentry/samples/console/kafka/KafkaShowcase.java +++ b/sentry-samples/sentry-samples-console/src/main/java/io/sentry/samples/console/kafka/KafkaShowcase.java @@ -4,7 +4,7 @@ import io.sentry.ITransaction; import io.sentry.Sentry; import io.sentry.kafka.SentryKafkaConsumerTracing; -import io.sentry.kafka.SentryKafkaProducerInterceptor; +import io.sentry.kafka.SentryKafkaProducer; import java.time.Duration; import java.util.Collections; import java.util.Properties; @@ -16,6 +16,7 @@ import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.consumer.KafkaConsumer; import org.apache.kafka.clients.producer.KafkaProducer; +import org.apache.kafka.clients.producer.Producer; import org.apache.kafka.clients.producer.ProducerConfig; import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.common.serialization.StringDeserializer; @@ -30,11 +31,19 @@ private KafkaShowcase() {} public static void runKafkaWithSentryTracing(final String bootstrapServers) { final CountDownLatch consumedLatch = new CountDownLatch(1); final Thread consumerThread = startConsumerWithSentryTracing(bootstrapServers, consumedLatch); - final Properties producerProperties = createProducerPropertiesWithSentry(bootstrapServers); + final Properties producerProperties = createProducerProperties(bootstrapServers); final ITransaction transaction = Sentry.startTransaction("kafka-demo", "demo"); try (ISentryLifecycleToken ignored = transaction.makeCurrent()) { - try (KafkaProducer producer = new KafkaProducer<>(producerProperties)) { + // 1. Create the raw Kafka producer as you normally would. + final KafkaProducer rawProducer = new KafkaProducer<>(producerProperties); + + // 2. >>> Sentry instrumentation <<< + // Wrap it in SentryKafkaProducer so every send is captured as a + // `queue.publish` span that closes when the broker ack callback fires. + final Producer producer = new SentryKafkaProducer<>(rawProducer); + + try (producer) { Thread.sleep(500); producer.send(new ProducerRecord<>(TOPIC, "sentry-kafka sample message")).get(); } catch (InterruptedException e) { @@ -59,7 +68,7 @@ public static void runKafkaWithSentryTracing(final String bootstrapServers) { } } - public static Properties createProducerPropertiesWithSentry(final String bootstrapServers) { + public static Properties createProducerProperties(final String bootstrapServers) { final Properties producerProperties = new Properties(); producerProperties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers); producerProperties.put( @@ -67,10 +76,6 @@ public static Properties createProducerPropertiesWithSentry(final String bootstr producerProperties.put( ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); - // Required for Sentry queue tracing in kafka-clients producer setup. - producerProperties.put( - ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, SentryKafkaProducerInterceptor.class.getName()); - // Optional tuning for sample stability in CI/local runs. producerProperties.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, 2000); producerProperties.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, 2000); diff --git a/sentry-samples/sentry-samples-spring-boot-jakarta-opentelemetry-noagent/src/test/kotlin/io/sentry/systemtest/KafkaOtelCoexistenceSystemTest.kt b/sentry-samples/sentry-samples-spring-boot-jakarta-opentelemetry-noagent/src/test/kotlin/io/sentry/systemtest/KafkaOtelCoexistenceSystemTest.kt index 61c298f86c..6ede83510e 100644 --- a/sentry-samples/sentry-samples-spring-boot-jakarta-opentelemetry-noagent/src/test/kotlin/io/sentry/systemtest/KafkaOtelCoexistenceSystemTest.kt +++ b/sentry-samples/sentry-samples-spring-boot-jakarta-opentelemetry-noagent/src/test/kotlin/io/sentry/systemtest/KafkaOtelCoexistenceSystemTest.kt @@ -10,7 +10,7 @@ import org.junit.Before * * The Sentry Kafka auto-configuration (`SentryKafkaQueueConfiguration`) is intentionally suppressed * when `io.sentry.opentelemetry.SentryAutoConfigurationCustomizerProvider` is on the classpath, so - * the Sentry `SentryKafkaProducerInterceptor` and `SentryKafkaRecordInterceptor` must not be wired. + * the Sentry `SentryKafkaProducer` and `SentryKafkaRecordInterceptor` must not be wired. * * These tests produce a Kafka message end-to-end and assert that Sentry-style `queue.publish` / * `queue.process` spans/transactions are *not* emitted. Any Kafka telemetry in OTel mode must come diff --git a/sentry-samples/sentry-samples-spring-boot-jakarta-opentelemetry/src/test/kotlin/io/sentry/systemtest/KafkaOtelCoexistenceSystemTest.kt b/sentry-samples/sentry-samples-spring-boot-jakarta-opentelemetry/src/test/kotlin/io/sentry/systemtest/KafkaOtelCoexistenceSystemTest.kt index f55303541b..d150fe70cd 100644 --- a/sentry-samples/sentry-samples-spring-boot-jakarta-opentelemetry/src/test/kotlin/io/sentry/systemtest/KafkaOtelCoexistenceSystemTest.kt +++ b/sentry-samples/sentry-samples-spring-boot-jakarta-opentelemetry/src/test/kotlin/io/sentry/systemtest/KafkaOtelCoexistenceSystemTest.kt @@ -10,7 +10,7 @@ import org.junit.Before * * The Sentry Kafka auto-configuration (`SentryKafkaQueueConfiguration`) is intentionally suppressed * when `io.sentry.opentelemetry.SentryAutoConfigurationCustomizerProvider` is on the classpath, so - * the Sentry `SentryKafkaProducerInterceptor` and `SentryKafkaRecordInterceptor` must not be wired. + * the Sentry `SentryKafkaProducer` and `SentryKafkaRecordInterceptor` must not be wired. * * These tests produce a Kafka message end-to-end and assert that Sentry-style `queue.publish` / * `queue.process` spans/transactions are *not* emitted. Any Kafka telemetry in OTel mode must come diff --git a/sentry-spring-boot-jakarta/src/main/java/io/sentry/spring/boot/jakarta/SentryAutoConfiguration.java b/sentry-spring-boot-jakarta/src/main/java/io/sentry/spring/boot/jakarta/SentryAutoConfiguration.java index 688153046f..b678abc716 100644 --- a/sentry-spring-boot-jakarta/src/main/java/io/sentry/spring/boot/jakarta/SentryAutoConfiguration.java +++ b/sentry-spring-boot-jakarta/src/main/java/io/sentry/spring/boot/jakarta/SentryAutoConfiguration.java @@ -252,7 +252,7 @@ static class SentryCacheConfiguration { @ConditionalOnClass( name = { "org.springframework.kafka.core.KafkaTemplate", - "io.sentry.kafka.SentryKafkaProducerInterceptor" + "io.sentry.kafka.SentryKafkaProducer" }) @ConditionalOnProperty(name = "sentry.enable-queue-tracing", havingValue = "true") @ConditionalOnMissingClass("io.sentry.opentelemetry.SentryAutoConfigurationCustomizerProvider") diff --git a/sentry-spring-boot-jakarta/src/test/kotlin/io/sentry/spring/boot/jakarta/SentryKafkaAutoConfigurationTest.kt b/sentry-spring-boot-jakarta/src/test/kotlin/io/sentry/spring/boot/jakarta/SentryKafkaAutoConfigurationTest.kt index c3a4c12e35..5b010891a1 100644 --- a/sentry-spring-boot-jakarta/src/test/kotlin/io/sentry/spring/boot/jakarta/SentryKafkaAutoConfigurationTest.kt +++ b/sentry-spring-boot-jakarta/src/test/kotlin/io/sentry/spring/boot/jakarta/SentryKafkaAutoConfigurationTest.kt @@ -1,6 +1,6 @@ package io.sentry.spring.boot.jakarta -import io.sentry.kafka.SentryKafkaProducerInterceptor +import io.sentry.kafka.SentryKafkaProducer import io.sentry.opentelemetry.SentryAutoConfigurationCustomizerProvider import io.sentry.spring.jakarta.kafka.SentryKafkaConsumerBeanPostProcessor import io.sentry.spring.jakarta.kafka.SentryKafkaProducerBeanPostProcessor @@ -34,7 +34,7 @@ class SentryKafkaAutoConfigurationTest { private val noSentryKafkaClassLoader = FilteredClassLoader( - SentryKafkaProducerInterceptor::class.java, + SentryKafkaProducer::class.java, SentryAutoConfigurationCustomizerProvider::class.java, ) diff --git a/sentry-spring-jakarta/src/main/java/io/sentry/spring/jakarta/kafka/SentryKafkaProducerBeanPostProcessor.java b/sentry-spring-jakarta/src/main/java/io/sentry/spring/jakarta/kafka/SentryKafkaProducerBeanPostProcessor.java index 4ce6a7c5ed..ed3faba853 100644 --- a/sentry-spring-jakarta/src/main/java/io/sentry/spring/jakarta/kafka/SentryKafkaProducerBeanPostProcessor.java +++ b/sentry-spring-jakarta/src/main/java/io/sentry/spring/jakarta/kafka/SentryKafkaProducerBeanPostProcessor.java @@ -1,84 +1,71 @@ package io.sentry.spring.jakarta.kafka; import io.sentry.ScopesAdapter; -import io.sentry.SentryLevel; -import io.sentry.kafka.SentryKafkaProducerInterceptor; -import java.lang.reflect.Field; -import org.apache.kafka.clients.producer.ProducerInterceptor; +import io.sentry.kafka.SentryKafkaProducer; +import org.apache.kafka.clients.producer.Producer; import org.jetbrains.annotations.ApiStatus; import org.jetbrains.annotations.NotNull; -import org.jetbrains.annotations.Nullable; import org.springframework.beans.BeansException; import org.springframework.beans.factory.config.BeanPostProcessor; import org.springframework.core.Ordered; import org.springframework.core.PriorityOrdered; -import org.springframework.kafka.core.KafkaTemplate; -import org.springframework.kafka.support.CompositeProducerInterceptor; +import org.springframework.kafka.core.ProducerFactory; +import org.springframework.kafka.core.ProducerPostProcessor; /** - * Sets a {@link SentryKafkaProducerInterceptor} on {@link KafkaTemplate} beans via {@link - * KafkaTemplate#setProducerInterceptor(ProducerInterceptor)}. The original bean is not replaced. + * Installs a {@link ProducerPostProcessor} on every {@link ProducerFactory} bean so that each + * {@link Producer} created by Spring Kafka is wrapped in a {@link SentryKafkaProducer}. * - *

If the template already has a {@link ProducerInterceptor}, both are composed using {@link - * CompositeProducerInterceptor}. Reading the existing interceptor requires reflection (no public - * getter in Spring Kafka 3.x); if reflection fails, a warning is logged and only the Sentry - * interceptor is set. + *

The wrapper records a {@code queue.publish} span around each {@code send(...)} that finishes + * when the broker ack callback fires, giving a real producer-send lifecycle span. {@code + * KafkaTemplate} beans are left untouched, so all customer-configured listeners, interceptors and + * observation settings are preserved. + * + *

Idempotent: re-running on the same factory does not register the post-processor twice. + * + *

Note: {@link ProducerFactory#addPostProcessor(ProducerPostProcessor)} is a default method on + * the interface. Custom factories that do not extend {@code DefaultKafkaProducerFactory} and do not + * implement {@code addPostProcessor} will silently no-op. */ @ApiStatus.Internal public final class SentryKafkaProducerBeanPostProcessor implements BeanPostProcessor, PriorityOrdered { @Override - @SuppressWarnings("unchecked") + @SuppressWarnings({"unchecked", "rawtypes"}) public @NotNull Object postProcessAfterInitialization( final @NotNull Object bean, final @NotNull String beanName) throws BeansException { - if (bean instanceof KafkaTemplate) { - final @NotNull KafkaTemplate template = (KafkaTemplate) bean; - final @Nullable ProducerInterceptor existing = getExistingInterceptor(template); + if (bean instanceof ProducerFactory) { + final @NotNull ProducerFactory factory = (ProducerFactory) bean; - if (existing instanceof SentryKafkaProducerInterceptor) { - return bean; + for (final Object existing : factory.getPostProcessors()) { + if (existing instanceof SentryProducerPostProcessor) { + return bean; + } } - @SuppressWarnings("rawtypes") - final SentryKafkaProducerInterceptor sentryInterceptor = - new SentryKafkaProducerInterceptor<>( - ScopesAdapter.getInstance(), "auto.queue.spring_jakarta.kafka.producer"); - - if (existing != null) { - @SuppressWarnings("rawtypes") - final CompositeProducerInterceptor composite = - new CompositeProducerInterceptor(sentryInterceptor, existing); - template.setProducerInterceptor(composite); - } else { - template.setProducerInterceptor(sentryInterceptor); - } + factory.addPostProcessor(new SentryProducerPostProcessor<>()); } return bean; } - @SuppressWarnings("unchecked") - private @Nullable ProducerInterceptor getExistingInterceptor( - final @NotNull KafkaTemplate template) { - try { - final @NotNull Field field = KafkaTemplate.class.getDeclaredField("producerInterceptor"); - field.setAccessible(true); - return (ProducerInterceptor) field.get(template); - } catch (NoSuchFieldException | IllegalAccessException e) { - ScopesAdapter.getInstance() - .getOptions() - .getLogger() - .log( - SentryLevel.WARNING, - "Unable to read existing producerInterceptor from KafkaTemplate via reflection. " - + "If you had a custom ProducerInterceptor, it may be overwritten by Sentry's interceptor.", - e); - return null; - } - } - @Override public int getOrder() { return Ordered.LOWEST_PRECEDENCE; } + + /** + * Marker {@link ProducerPostProcessor} that wraps the freshly created Kafka {@link Producer} in a + * {@link SentryKafkaProducer}, unless it is already wrapped. + */ + static final class SentryProducerPostProcessor implements ProducerPostProcessor { + @Override + public @NotNull Producer apply(final @NotNull Producer producer) { + if (producer instanceof SentryKafkaProducer) { + return producer; + } + return new SentryKafkaProducer<>( + producer, ScopesAdapter.getInstance(), "auto.queue.spring_jakarta.kafka.producer"); + } + } } diff --git a/sentry-spring-jakarta/src/main/java/io/sentry/spring/jakarta/kafka/SentryKafkaRecordInterceptor.java b/sentry-spring-jakarta/src/main/java/io/sentry/spring/jakarta/kafka/SentryKafkaRecordInterceptor.java index d2302dca57..a6b5247fe7 100644 --- a/sentry-spring-jakarta/src/main/java/io/sentry/spring/jakarta/kafka/SentryKafkaRecordInterceptor.java +++ b/sentry-spring-jakarta/src/main/java/io/sentry/spring/jakarta/kafka/SentryKafkaRecordInterceptor.java @@ -10,7 +10,7 @@ import io.sentry.SpanStatus; import io.sentry.TransactionContext; import io.sentry.TransactionOptions; -import io.sentry.kafka.SentryKafkaProducerInterceptor; +import io.sentry.kafka.SentryKafkaProducer; import io.sentry.util.SpanUtils; import java.nio.ByteBuffer; import java.nio.charset.StandardCharsets; @@ -188,7 +188,7 @@ private boolean isIgnored() { } final @Nullable String enqueuedTimeStr = - headerValue(record, SentryKafkaProducerInterceptor.SENTRY_ENQUEUED_TIME_HEADER); + headerValue(record, SentryKafkaProducer.SENTRY_ENQUEUED_TIME_HEADER); if (enqueuedTimeStr != null) { try { final double enqueuedTimeSeconds = Double.parseDouble(enqueuedTimeStr); diff --git a/sentry-spring-jakarta/src/test/kotlin/io/sentry/spring/jakarta/kafka/SentryKafkaProducerBeanPostProcessorTest.kt b/sentry-spring-jakarta/src/test/kotlin/io/sentry/spring/jakarta/kafka/SentryKafkaProducerBeanPostProcessorTest.kt index f0247178f2..9d36e9274c 100644 --- a/sentry-spring-jakarta/src/test/kotlin/io/sentry/spring/jakarta/kafka/SentryKafkaProducerBeanPostProcessorTest.kt +++ b/sentry-spring-jakarta/src/test/kotlin/io/sentry/spring/jakarta/kafka/SentryKafkaProducerBeanPostProcessorTest.kt @@ -1,50 +1,54 @@ package io.sentry.spring.jakarta.kafka -import io.sentry.kafka.SentryKafkaProducerInterceptor +import io.sentry.kafka.SentryKafkaProducer import kotlin.test.Test +import kotlin.test.assertEquals import kotlin.test.assertSame import kotlin.test.assertTrue -import org.apache.kafka.clients.producer.ProducerInterceptor +import org.apache.kafka.clients.producer.Producer +import org.mockito.kotlin.any +import org.mockito.kotlin.argumentCaptor import org.mockito.kotlin.mock -import org.springframework.kafka.core.KafkaTemplate +import org.mockito.kotlin.never +import org.mockito.kotlin.verify +import org.mockito.kotlin.whenever +import org.springframework.kafka.core.DefaultKafkaProducerFactory import org.springframework.kafka.core.ProducerFactory -import org.springframework.kafka.support.CompositeProducerInterceptor +import org.springframework.kafka.core.ProducerPostProcessor class SentryKafkaProducerBeanPostProcessorTest { - private fun readInterceptor(template: KafkaTemplate<*, *>): Any? { - val field = KafkaTemplate::class.java.getDeclaredField("producerInterceptor") - field.isAccessible = true - return field.get(template) - } - @Test - fun `sets SentryKafkaProducerInterceptor on KafkaTemplate`() { - val template = KafkaTemplate(mock>()) + fun `registers Sentry post-processor on ProducerFactory`() { + val factory = mock>() + whenever(factory.postProcessors).thenReturn(emptyList()) val processor = SentryKafkaProducerBeanPostProcessor() - processor.postProcessAfterInitialization(template, "kafkaTemplate") + processor.postProcessAfterInitialization(factory, "kafkaProducerFactory") - assertTrue(readInterceptor(template) is SentryKafkaProducerInterceptor<*, *>) + val captor = argumentCaptor>() + verify(factory).addPostProcessor(captor.capture()) + assertTrue( + captor.firstValue is SentryKafkaProducerBeanPostProcessor.SentryProducerPostProcessor<*, *> + ) } @Test - fun `does not double-wrap when SentryKafkaProducerInterceptor already set`() { - val template = KafkaTemplate(mock>()) + fun `is idempotent when Sentry post-processor is already registered`() { + val factory = mock>() + val existing = + SentryKafkaProducerBeanPostProcessor.SentryProducerPostProcessor() + whenever(factory.postProcessors).thenReturn(listOf(existing)) val processor = SentryKafkaProducerBeanPostProcessor() - processor.postProcessAfterInitialization(template, "kafkaTemplate") - val firstInterceptor = readInterceptor(template) + processor.postProcessAfterInitialization(factory, "kafkaProducerFactory") - processor.postProcessAfterInitialization(template, "kafkaTemplate") - val secondInterceptor = readInterceptor(template) - - assertSame(firstInterceptor, secondInterceptor) + verify(factory, never()).addPostProcessor(any()) } @Test - fun `does not modify non-KafkaTemplate beans`() { - val someBean = "not a kafka template" + fun `does not modify non-ProducerFactory beans`() { + val someBean = "not a producer factory" val processor = SentryKafkaProducerBeanPostProcessor() val result = processor.postProcessAfterInitialization(someBean, "someBean") @@ -54,26 +58,50 @@ class SentryKafkaProducerBeanPostProcessorTest { @Test fun `returns the same bean instance`() { - val template = KafkaTemplate(mock>()) + val factory = mock>() + whenever(factory.postProcessors).thenReturn(emptyList()) val processor = SentryKafkaProducerBeanPostProcessor() - val result = processor.postProcessAfterInitialization(template, "kafkaTemplate") + val result = processor.postProcessAfterInitialization(factory, "kafkaProducerFactory") - assertSame(template, result, "BPP should return the same bean, not a replacement") + assertSame(factory, result, "BPP must return the same bean, not a replacement") } @Test - fun `composes with existing customer interceptor using CompositeProducerInterceptor`() { - val template = KafkaTemplate(mock>()) - val customerInterceptor = mock>() - template.setProducerInterceptor(customerInterceptor) + fun `registered post-processor wraps producers in SentryKafkaProducer`() { + val pp = SentryKafkaProducerBeanPostProcessor.SentryProducerPostProcessor() + val raw = mock>() + + val wrapped = pp.apply(raw) + + assertTrue(wrapped is SentryKafkaProducer<*, *>) + assertSame(raw, (wrapped as SentryKafkaProducer).delegate) + } + @Test + fun `registered post-processor does not double-wrap`() { + val pp = SentryKafkaProducerBeanPostProcessor.SentryProducerPostProcessor() + val raw = mock>() + val alreadyWrapped = SentryKafkaProducer(raw) + + val result = pp.apply(alreadyWrapped) + + assertSame(alreadyWrapped, result) + } + + @Test + fun `integrates with DefaultKafkaProducerFactory addPostProcessor contract`() { + // Sanity check against the real Spring Kafka API surface — DefaultKafkaProducerFactory + // honors addPostProcessor and exposes it via getPostProcessors(). + val factory = DefaultKafkaProducerFactory(emptyMap()) val processor = SentryKafkaProducerBeanPostProcessor() - processor.postProcessAfterInitialization(template, "kafkaTemplate") + processor.postProcessAfterInitialization(factory, "kafkaProducerFactory") + + assertEquals(1, factory.postProcessors.size) assertTrue( - readInterceptor(template) is CompositeProducerInterceptor<*, *>, - "Should use CompositeProducerInterceptor when existing interceptor is present", + factory.postProcessors.first() + is SentryKafkaProducerBeanPostProcessor.SentryProducerPostProcessor<*, *> ) } } diff --git a/sentry-spring-jakarta/src/test/kotlin/io/sentry/spring/jakarta/kafka/SentryKafkaRecordInterceptorTest.kt b/sentry-spring-jakarta/src/test/kotlin/io/sentry/spring/jakarta/kafka/SentryKafkaRecordInterceptorTest.kt index c17025285c..c08756da69 100644 --- a/sentry-spring-jakarta/src/test/kotlin/io/sentry/spring/jakarta/kafka/SentryKafkaRecordInterceptorTest.kt +++ b/sentry-spring-jakarta/src/test/kotlin/io/sentry/spring/jakarta/kafka/SentryKafkaRecordInterceptorTest.kt @@ -9,7 +9,7 @@ import io.sentry.SentryTraceHeader import io.sentry.SentryTracer import io.sentry.SpanDataConvention import io.sentry.TransactionContext -import io.sentry.kafka.SentryKafkaProducerInterceptor +import io.sentry.kafka.SentryKafkaProducer import io.sentry.test.initForTest import java.nio.ByteBuffer import java.nio.charset.StandardCharsets @@ -112,7 +112,7 @@ class SentryKafkaRecordInterceptorTest { } enqueuedTime?.let { headers.add( - SentryKafkaProducerInterceptor.SENTRY_ENQUEUED_TIME_HEADER, + SentryKafkaProducer.SENTRY_ENQUEUED_TIME_HEADER, it.toByteArray(StandardCharsets.UTF_8), ) } From 925ab2b91d1df2fb68a50d25cba5b0382778b214 Mon Sep 17 00:00:00 2001 From: Alexander Dinauer Date: Mon, 27 Apr 2026 13:52:50 +0200 Subject: [PATCH 2/3] fix(kafka): Inject trace headers even without active span Decouple header injection from span creation in SentryKafkaProducer so that distributed tracing works for background workers, @Scheduled jobs, and startup publishers that have no active span. Restructure send() to match the SentryFeignClient/OkHttp pattern: - isIgnored: pure delegate, no headers, no span - No active span: inject headers from PropagationContext, no span - Active span: start child span, inject headers, wrap callback Also simplify the implementation: - Rename injectHeaders to maybeInjectHeaders with encapsulated try/catch (matches Feign's maybeAddTracingHeaders pattern) - Remove outer try/catch around span setup - Remove redundant span.isNoOp() early-return branch - Remove redundant isFinished() guards before finish() calls Co-Authored-By: Claude --- .../io/sentry/kafka/SentryKafkaProducer.java | 100 +++++++----------- .../sentry/kafka/SentryKafkaProducerTest.kt | 49 +++++---- 2 files changed, 69 insertions(+), 80 deletions(-) diff --git a/sentry-kafka/src/main/java/io/sentry/kafka/SentryKafkaProducer.java b/sentry-kafka/src/main/java/io/sentry/kafka/SentryKafkaProducer.java index 6b1278692e..500e2bc90e 100644 --- a/sentry-kafka/src/main/java/io/sentry/kafka/SentryKafkaProducer.java +++ b/sentry-kafka/src/main/java/io/sentry/kafka/SentryKafkaProducer.java @@ -101,43 +101,22 @@ public SentryKafkaProducer( final @Nullable ISpan activeSpan = scopes.getSpan(); if (activeSpan == null || activeSpan.isNoOp()) { + maybeInjectHeaders(record.headers(), null); return delegate.send(record, callback); } - @Nullable ISpan span = null; - try { - final @NotNull SpanOptions spanOptions = new SpanOptions(); - spanOptions.setOrigin(traceOrigin); - span = activeSpan.startChild("queue.publish", record.topic(), spanOptions); - if (span.isNoOp()) { - return delegate.send(record, callback); - } - - span.setData(SpanDataConvention.MESSAGING_SYSTEM, "kafka"); - span.setData(SpanDataConvention.MESSAGING_DESTINATION_NAME, record.topic()); - injectHeaders(record.headers(), span); - } catch (Throwable t) { - if (span != null) { - span.setThrowable(t); - span.setStatus(SpanStatus.INTERNAL_ERROR); - if (!span.isFinished()) { - span.finish(); - } - } - scopes - .getOptions() - .getLogger() - .log(SentryLevel.ERROR, "Failed to instrument Kafka producer record.", t); - return delegate.send(record, callback); - } + final @NotNull SpanOptions spanOptions = new SpanOptions(); + spanOptions.setOrigin(traceOrigin); + final @NotNull ISpan span = activeSpan.startChild("queue.publish", record.topic(), spanOptions); - final @NotNull ISpan finalSpan = span; - final @NotNull Callback wrappedCallback = wrapCallback(callback, finalSpan); + span.setData(SpanDataConvention.MESSAGING_SYSTEM, "kafka"); + span.setData(SpanDataConvention.MESSAGING_DESTINATION_NAME, record.topic()); + maybeInjectHeaders(record.headers(), span); try { - return delegate.send(record, wrappedCallback); + return delegate.send(record, wrapCallback(callback, span)); } catch (Throwable t) { - finishWithError(finalSpan, t); + finishWithError(span, t); throw t; } } @@ -158,9 +137,7 @@ public SentryKafkaProducer( .getLogger() .log(SentryLevel.ERROR, "Failed to set status on Kafka producer span.", t); } finally { - if (!span.isFinished()) { - span.finish(); - } + span.finish(); if (userCallback != null) { userCallback.onCompletion(metadata, exception); } @@ -171,41 +148,46 @@ public SentryKafkaProducer( private void finishWithError(final @NotNull ISpan span, final @NotNull Throwable t) { span.setThrowable(t); span.setStatus(SpanStatus.INTERNAL_ERROR); - if (!span.isFinished()) { - span.finish(); - } + span.finish(); } private boolean isIgnored() { return SpanUtils.isIgnored(scopes.getOptions().getIgnoredSpanOrigins(), traceOrigin); } - private void injectHeaders(final @NotNull Headers headers, final @NotNull ISpan span) { - final @Nullable List existingBaggageHeaders = - readHeaderValues(headers, BaggageHeader.BAGGAGE_HEADER); - final @Nullable TracingUtils.TracingHeaders tracingHeaders = - TracingUtils.trace(scopes, existingBaggageHeaders, span); - if (tracingHeaders != null) { - final @NotNull SentryTraceHeader sentryTraceHeader = tracingHeaders.getSentryTraceHeader(); - headers.remove(sentryTraceHeader.getName()); - headers.add( - sentryTraceHeader.getName(), - sentryTraceHeader.getValue().getBytes(StandardCharsets.UTF_8)); - - final @Nullable BaggageHeader baggageHeader = tracingHeaders.getBaggageHeader(); - if (baggageHeader != null) { - headers.remove(baggageHeader.getName()); + private void maybeInjectHeaders(final @NotNull Headers headers, final @Nullable ISpan span) { + try { + final @Nullable List existingBaggageHeaders = + readHeaderValues(headers, BaggageHeader.BAGGAGE_HEADER); + final @Nullable TracingUtils.TracingHeaders tracingHeaders = + TracingUtils.trace(scopes, existingBaggageHeaders, span); + if (tracingHeaders != null) { + final @NotNull SentryTraceHeader sentryTraceHeader = tracingHeaders.getSentryTraceHeader(); + headers.remove(sentryTraceHeader.getName()); headers.add( - baggageHeader.getName(), baggageHeader.getValue().getBytes(StandardCharsets.UTF_8)); + sentryTraceHeader.getName(), + sentryTraceHeader.getValue().getBytes(StandardCharsets.UTF_8)); + + final @Nullable BaggageHeader baggageHeader = tracingHeaders.getBaggageHeader(); + if (baggageHeader != null) { + headers.remove(baggageHeader.getName()); + headers.add( + baggageHeader.getName(), baggageHeader.getValue().getBytes(StandardCharsets.UTF_8)); + } } - } - headers.remove(SENTRY_ENQUEUED_TIME_HEADER); - headers.add( - SENTRY_ENQUEUED_TIME_HEADER, - DateUtils.doubleToBigDecimal(DateUtils.millisToSeconds(System.currentTimeMillis())) - .toString() - .getBytes(StandardCharsets.UTF_8)); + headers.remove(SENTRY_ENQUEUED_TIME_HEADER); + headers.add( + SENTRY_ENQUEUED_TIME_HEADER, + DateUtils.doubleToBigDecimal(DateUtils.millisToSeconds(System.currentTimeMillis())) + .toString() + .getBytes(StandardCharsets.UTF_8)); + } catch (Throwable t) { + scopes + .getOptions() + .getLogger() + .log(SentryLevel.ERROR, "Failed to inject Sentry headers into Kafka record.", t); + } } private static @Nullable List readHeaderValues( diff --git a/sentry-kafka/src/test/kotlin/io/sentry/kafka/SentryKafkaProducerTest.kt b/sentry-kafka/src/test/kotlin/io/sentry/kafka/SentryKafkaProducerTest.kt index aa3135ca12..90a6bb259b 100644 --- a/sentry-kafka/src/test/kotlin/io/sentry/kafka/SentryKafkaProducerTest.kt +++ b/sentry-kafka/src/test/kotlin/io/sentry/kafka/SentryKafkaProducerTest.kt @@ -4,6 +4,8 @@ import io.sentry.BaggageHeader import io.sentry.IScopes import io.sentry.ISentryLifecycleToken import io.sentry.ISpan +import io.sentry.Scope +import io.sentry.ScopeCallback import io.sentry.Sentry import io.sentry.SentryOptions import io.sentry.SentryTraceHeader @@ -31,10 +33,10 @@ import org.apache.kafka.common.header.Header import org.apache.kafka.common.header.Headers import org.mockito.kotlin.any import org.mockito.kotlin.argumentCaptor +import org.mockito.kotlin.doAnswer import org.mockito.kotlin.eq import org.mockito.kotlin.isNull import org.mockito.kotlin.mock -import org.mockito.kotlin.never import org.mockito.kotlin.verify import org.mockito.kotlin.whenever @@ -58,6 +60,9 @@ class SentryKafkaProducerTest { isEnableQueueTracing = true } whenever(scopes.options).thenReturn(options) + doAnswer { (it.arguments[0] as ScopeCallback).run(Scope(options)) } + .whenever(scopes) + .configureScope(any()) delegate = mock() whenever(delegate.send(any(), any())).thenReturn(CompletableFuture.completedFuture(null)) } @@ -213,7 +218,7 @@ class SentryKafkaProducerTest { } @Test - fun `delegates send without span when no active span`() { + fun `injects headers but creates no span when no active span`() { whenever(scopes.span).thenReturn(null) val producer = SentryKafkaProducer(delegate, scopes) val record = ProducerRecord("my-topic", "key", "value") @@ -221,6 +226,10 @@ class SentryKafkaProducerTest { producer.send(record) verify(delegate).send(eq(record), isNull()) + // Headers should still be injected from PropagationContext + assertNotNull(record.headers().lastHeader(SentryTraceHeader.SENTRY_TRACE_HEADER)) + assertNotNull(record.headers().lastHeader(BaggageHeader.BAGGAGE_HEADER)) + assertNotNull(record.headers().lastHeader(SentryKafkaProducer.SENTRY_ENQUEUED_TIME_HEADER)) } @Test @@ -246,12 +255,11 @@ class SentryKafkaProducerTest { } @Test - fun `finishes span with error when header injection fails`() { + fun `header injection failure does not prevent send`() { val activeSpan = mock() val span = mock() val headers = mock() val record = mock>() - val exception = RuntimeException("boom") whenever(scopes.span).thenReturn(activeSpan) whenever(activeSpan.startChild(eq("queue.publish"), eq("my-topic"), any())) .thenReturn(span) @@ -263,16 +271,15 @@ class SentryKafkaProducerTest { whenever(record.topic()).thenReturn("my-topic") whenever(record.headers()).thenReturn(headers) whenever(headers.headers(BaggageHeader.BAGGAGE_HEADER)).thenReturn(emptyList

()) - whenever(headers.remove(SentryTraceHeader.SENTRY_TRACE_HEADER)).thenThrow(exception) + whenever(headers.remove(SentryTraceHeader.SENTRY_TRACE_HEADER)) + .thenThrow(RuntimeException("boom")) val producer = SentryKafkaProducer(delegate, scopes) producer.send(record) - verify(span).setStatus(SpanStatus.INTERNAL_ERROR) - verify(span).setThrowable(exception) - verify(span).finish() - // After header-injection failure, falls back to a plain delegate send (no Sentry callback). - verify(delegate).send(eq(record), isNull()) + // Header injection failed silently; send still proceeds with wrapped callback for span + // lifecycle. + verify(delegate).send(eq(record), any()) } @Test @@ -319,20 +326,20 @@ class SentryKafkaProducerTest { } @Test - fun `does not invoke sentry callback wrap when no-op span returned`() { - val activeSpan = mock() - val span = mock() + fun `wraps callback even when child span is no-op`() { + val tx = createTransaction() + // Set max spans to 1 so the child span is no-op (over limit) + options.maxSpans = 0 + val producer = SentryKafkaProducer(delegate, scopes) val record = ProducerRecord("my-topic", "key", "value") - whenever(scopes.span).thenReturn(activeSpan) - whenever(activeSpan.isNoOp).thenReturn(false) - whenever(activeSpan.startChild(eq("queue.publish"), eq("my-topic"), any())) - .thenReturn(span) - whenever(span.isNoOp).thenReturn(true) - val producer = SentryKafkaProducer(delegate, scopes) producer.send(record) - verify(delegate).send(eq(record), isNull()) - verify(span, never()).finish() + // Callback is still wrapped (no-op span finish is harmless) + verify(delegate).send(eq(record), any()) + // Headers should still be injected from PropagationContext + assertNotNull(record.headers().lastHeader(SentryTraceHeader.SENTRY_TRACE_HEADER)) + assertNotNull(record.headers().lastHeader(BaggageHeader.BAGGAGE_HEADER)) + assertNotNull(record.headers().lastHeader(SentryKafkaProducer.SENTRY_ENQUEUED_TIME_HEADER)) } } From 1e293c66582915ea3b34fce75c02bd2949c38485 Mon Sep 17 00:00:00 2001 From: Alexander Dinauer Date: Mon, 27 Apr 2026 14:08:04 +0200 Subject: [PATCH 3/3] changelog --- CHANGELOG.md | 1 + 1 file changed, 1 insertion(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index 68dd4433f7..be35e99df6 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -16,6 +16,7 @@ ### Fixes +- Inject Kafka trace headers even without an active span so distributed tracing works for background workers and `@Scheduled` jobs ([#5338](https://github.com/getsentry/sentry-java/pull/5338)) - Write the `sentry-task-enqueued-time` Kafka header as a plain decimal so cross-SDK consumers (e.g. sentry-python) can parse it ([#5328](https://github.com/getsentry/sentry-java/pull/5328)) ## 8.37.1