diff --git a/dd-java-agent/instrumentation/kafka/kafka-clients-0.11/src/main/java/datadog/trace/instrumentation/kafka_clients/KafkaProducerInstrumentation.java b/dd-java-agent/instrumentation/kafka/kafka-clients-0.11/src/main/java/datadog/trace/instrumentation/kafka_clients/KafkaProducerInstrumentation.java index 1887e0a5205..40aecbdac09 100644 --- a/dd-java-agent/instrumentation/kafka/kafka-clients-0.11/src/main/java/datadog/trace/instrumentation/kafka_clients/KafkaProducerInstrumentation.java +++ b/dd-java-agent/instrumentation/kafka/kafka-clients-0.11/src/main/java/datadog/trace/instrumentation/kafka_clients/KafkaProducerInstrumentation.java @@ -46,6 +46,7 @@ import datadog.trace.instrumentation.kafka_common.ClusterIdHolder; import datadog.trace.instrumentation.kafka_common.KafkaConfigHelper; import datadog.trace.instrumentation.kafka_common.MetadataState; +import datadog.trace.instrumentation.kafka_common.Utils; import java.util.Map; import net.bytebuddy.asm.Advice; import net.bytebuddy.matcher.ElementMatcher; @@ -219,7 +220,7 @@ public static void onEnter( } DataStreamsTags tags = createWithClusterId("kafka", OUTBOUND, record.topic(), clusterId); try { - defaultPropagator().inject(span, record.headers(), setter); + defaultPropagator().inject(Utils.currentContext(), record.headers(), setter); if (STREAMING_CONTEXT.isDisabledForTopic(record.topic()) || STREAMING_CONTEXT.isSinkTopic(record.topic())) { // inject the context in the headers, but delay sending the stats until we know the @@ -241,7 +242,7 @@ record = record.value(), record.headers()); - defaultPropagator().inject(span, record.headers(), setter); + defaultPropagator().inject(Utils.currentContext(), record.headers(), setter); if (STREAMING_CONTEXT.isDisabledForTopic(record.topic()) || STREAMING_CONTEXT.isSinkTopic(record.topic())) { Propagator dsmPropagator = Propagators.forConcern(DSM_CONCERN); diff --git a/dd-java-agent/instrumentation/kafka/kafka-clients-3.8/src/main/java17/datadog/trace/instrumentation/kafka_clients38/ProducerContextPropagationAdvice.java b/dd-java-agent/instrumentation/kafka/kafka-clients-3.8/src/main/java17/datadog/trace/instrumentation/kafka_clients38/ProducerContextPropagationAdvice.java index 0ca9823d92e..3175dd7c397 100644 --- a/dd-java-agent/instrumentation/kafka/kafka-clients-3.8/src/main/java17/datadog/trace/instrumentation/kafka_clients38/ProducerContextPropagationAdvice.java +++ b/dd-java-agent/instrumentation/kafka/kafka-clients-3.8/src/main/java17/datadog/trace/instrumentation/kafka_clients38/ProducerContextPropagationAdvice.java @@ -19,6 +19,7 @@ import datadog.trace.bootstrap.InstrumentationContext; import datadog.trace.bootstrap.instrumentation.api.AgentSpan; import datadog.trace.instrumentation.kafka_common.MetadataState; +import datadog.trace.instrumentation.kafka_common.Utils; import net.bytebuddy.asm.Advice; import org.apache.kafka.clients.Metadata; import org.apache.kafka.clients.producer.ProducerRecord; @@ -43,7 +44,7 @@ public static void onEnter( } DataStreamsTags tags = create("kafka", OUTBOUND, record.topic(), null, clusterId); try { - defaultPropagator().inject(span, record.headers(), setter); + defaultPropagator().inject(Utils.currentContext(), record.headers(), setter); if (STREAMING_CONTEXT.isDisabledForTopic(record.topic()) || STREAMING_CONTEXT.isSinkTopic(record.topic())) { // inject the context in the headers, but delay sending the stats until we know the @@ -65,7 +66,7 @@ record = record.value(), record.headers()); - defaultPropagator().inject(span, record.headers(), setter); + defaultPropagator().inject(Utils.currentContext(), record.headers(), setter); if (STREAMING_CONTEXT.isDisabledForTopic(record.topic()) || STREAMING_CONTEXT.isSinkTopic(record.topic())) { Propagator dsmPropagator = Propagators.forConcern(DSM_CONCERN); diff --git a/dd-java-agent/instrumentation/kafka/kafka-common/src/main/java/datadog/trace/instrumentation/kafka_common/Utils.java b/dd-java-agent/instrumentation/kafka/kafka-common/src/main/java/datadog/trace/instrumentation/kafka_common/Utils.java index d0f7eb4fcea..58a8cefe82a 100644 --- a/dd-java-agent/instrumentation/kafka/kafka-common/src/main/java/datadog/trace/instrumentation/kafka_common/Utils.java +++ b/dd-java-agent/instrumentation/kafka/kafka-common/src/main/java/datadog/trace/instrumentation/kafka_common/Utils.java @@ -1,5 +1,6 @@ package datadog.trace.instrumentation.kafka_common; +import datadog.context.Context; import datadog.trace.api.datastreams.DataStreamsTransactionTracker; import java.nio.charset.StandardCharsets; import org.apache.kafka.clients.consumer.ConsumerRecord; @@ -9,6 +10,11 @@ public final class Utils { private Utils() {} // prevent instantiation + /** Returns the current context from the thread-local context store. */ + public static Context currentContext() { + return Context.current(); + } + public static DataStreamsTransactionTracker.TransactionSourceReader DSM_TRANSACTION_SOURCE_READER = (source, headerName) -> {