From e4b04a6a25ca14a322581b830b70442828c4fb5b Mon Sep 17 00:00:00 2001 From: Adrian Chong Date: Fri, 8 May 2026 14:40:44 -0400 Subject: [PATCH] Fix W3C baggage header not propagated into Kafka producer record headers Pass Context.current() instead of span to defaultPropagator().inject() in Kafka producer instrumentation so BaggagePropagator can access baggage stored on the thread-local context rather than falling back to Context.root() via AgentSpan.get(). Uses a helper method (Utils.currentContext()) to avoid VerifyError from static interface method calls in inlined ByteBuddy advice. Fixes #11286 tag: ai generated --- .../kafka_clients/KafkaProducerInstrumentation.java | 5 +++-- .../kafka_clients38/ProducerContextPropagationAdvice.java | 5 +++-- .../datadog/trace/instrumentation/kafka_common/Utils.java | 6 ++++++ 3 files changed, 12 insertions(+), 4 deletions(-) diff --git a/dd-java-agent/instrumentation/kafka/kafka-clients-0.11/src/main/java/datadog/trace/instrumentation/kafka_clients/KafkaProducerInstrumentation.java b/dd-java-agent/instrumentation/kafka/kafka-clients-0.11/src/main/java/datadog/trace/instrumentation/kafka_clients/KafkaProducerInstrumentation.java index 1887e0a5205..40aecbdac09 100644 --- a/dd-java-agent/instrumentation/kafka/kafka-clients-0.11/src/main/java/datadog/trace/instrumentation/kafka_clients/KafkaProducerInstrumentation.java +++ b/dd-java-agent/instrumentation/kafka/kafka-clients-0.11/src/main/java/datadog/trace/instrumentation/kafka_clients/KafkaProducerInstrumentation.java @@ -46,6 +46,7 @@ import datadog.trace.instrumentation.kafka_common.ClusterIdHolder; import datadog.trace.instrumentation.kafka_common.KafkaConfigHelper; import datadog.trace.instrumentation.kafka_common.MetadataState; +import datadog.trace.instrumentation.kafka_common.Utils; import java.util.Map; import net.bytebuddy.asm.Advice; import net.bytebuddy.matcher.ElementMatcher; @@ -219,7 +220,7 @@ public static void onEnter( } DataStreamsTags tags = createWithClusterId("kafka", OUTBOUND, record.topic(), clusterId); try { - defaultPropagator().inject(span, record.headers(), setter); + defaultPropagator().inject(Utils.currentContext(), record.headers(), setter); if (STREAMING_CONTEXT.isDisabledForTopic(record.topic()) || STREAMING_CONTEXT.isSinkTopic(record.topic())) { // inject the context in the headers, but delay sending the stats until we know the @@ -241,7 +242,7 @@ record = record.value(), record.headers()); - defaultPropagator().inject(span, record.headers(), setter); + defaultPropagator().inject(Utils.currentContext(), record.headers(), setter); if (STREAMING_CONTEXT.isDisabledForTopic(record.topic()) || STREAMING_CONTEXT.isSinkTopic(record.topic())) { Propagator dsmPropagator = Propagators.forConcern(DSM_CONCERN); diff --git a/dd-java-agent/instrumentation/kafka/kafka-clients-3.8/src/main/java17/datadog/trace/instrumentation/kafka_clients38/ProducerContextPropagationAdvice.java b/dd-java-agent/instrumentation/kafka/kafka-clients-3.8/src/main/java17/datadog/trace/instrumentation/kafka_clients38/ProducerContextPropagationAdvice.java index 0ca9823d92e..3175dd7c397 100644 --- a/dd-java-agent/instrumentation/kafka/kafka-clients-3.8/src/main/java17/datadog/trace/instrumentation/kafka_clients38/ProducerContextPropagationAdvice.java +++ b/dd-java-agent/instrumentation/kafka/kafka-clients-3.8/src/main/java17/datadog/trace/instrumentation/kafka_clients38/ProducerContextPropagationAdvice.java @@ -19,6 +19,7 @@ import datadog.trace.bootstrap.InstrumentationContext; import datadog.trace.bootstrap.instrumentation.api.AgentSpan; import datadog.trace.instrumentation.kafka_common.MetadataState; +import datadog.trace.instrumentation.kafka_common.Utils; import net.bytebuddy.asm.Advice; import org.apache.kafka.clients.Metadata; import org.apache.kafka.clients.producer.ProducerRecord; @@ -43,7 +44,7 @@ public static void onEnter( } DataStreamsTags tags = create("kafka", OUTBOUND, record.topic(), null, clusterId); try { - defaultPropagator().inject(span, record.headers(), setter); + defaultPropagator().inject(Utils.currentContext(), record.headers(), setter); if (STREAMING_CONTEXT.isDisabledForTopic(record.topic()) || STREAMING_CONTEXT.isSinkTopic(record.topic())) { // inject the context in the headers, but delay sending the stats until we know the @@ -65,7 +66,7 @@ record = record.value(), record.headers()); - defaultPropagator().inject(span, record.headers(), setter); + defaultPropagator().inject(Utils.currentContext(), record.headers(), setter); if (STREAMING_CONTEXT.isDisabledForTopic(record.topic()) || STREAMING_CONTEXT.isSinkTopic(record.topic())) { Propagator dsmPropagator = Propagators.forConcern(DSM_CONCERN); diff --git a/dd-java-agent/instrumentation/kafka/kafka-common/src/main/java/datadog/trace/instrumentation/kafka_common/Utils.java b/dd-java-agent/instrumentation/kafka/kafka-common/src/main/java/datadog/trace/instrumentation/kafka_common/Utils.java index d0f7eb4fcea..58a8cefe82a 100644 --- a/dd-java-agent/instrumentation/kafka/kafka-common/src/main/java/datadog/trace/instrumentation/kafka_common/Utils.java +++ b/dd-java-agent/instrumentation/kafka/kafka-common/src/main/java/datadog/trace/instrumentation/kafka_common/Utils.java @@ -1,5 +1,6 @@ package datadog.trace.instrumentation.kafka_common; +import datadog.context.Context; import datadog.trace.api.datastreams.DataStreamsTransactionTracker; import java.nio.charset.StandardCharsets; import org.apache.kafka.clients.consumer.ConsumerRecord; @@ -9,6 +10,11 @@ public final class Utils { private Utils() {} // prevent instantiation + /** Returns the current context from the thread-local context store. */ + public static Context currentContext() { + return Context.current(); + } + public static DataStreamsTransactionTracker.TransactionSourceReader DSM_TRANSACTION_SOURCE_READER = (source, headerName) -> {