From b780ce8ad0168b5c5aef0ea3e8a77652b00f97f7 Mon Sep 17 00:00:00 2001 From: bm1549 Date: Tue, 10 Mar 2026 15:53:12 -0400 Subject: [PATCH 1/4] Fix flaky KafkaClientDataStreamsDisabledForkedTest batch consume test The test mapped consumer traces to producer spans by positional index after SORT_TRACES_BY_ID sorting. Since trace IDs are random, the consumer-to-producer mapping was non-deterministic, causing intermittent `span.parentId == parent.spanId` assertion failures. Fix by dynamically finding each consumer span's actual parent producer span via parentId matching instead of relying on sort order. Co-Authored-By: Claude Opus 4.6 --- .../test/groovy/KafkaClientTestBase.groovy | 19 +++++++++++-------- 1 file changed, 11 insertions(+), 8 deletions(-) diff --git a/dd-java-agent/instrumentation/kafka/kafka-clients-0.11/src/test/groovy/KafkaClientTestBase.groovy b/dd-java-agent/instrumentation/kafka/kafka-clients-0.11/src/test/groovy/KafkaClientTestBase.groovy index cf0ff225f8c..87eb04ed885 100644 --- a/dd-java-agent/instrumentation/kafka/kafka-clients-0.11/src/test/groovy/KafkaClientTestBase.groovy +++ b/dd-java-agent/instrumentation/kafka/kafka-clients-0.11/src/test/groovy/KafkaClientTestBase.groovy @@ -923,14 +923,17 @@ abstract class KafkaClientTestBase extends VersionedNamingTestBase { queueSpan(it, trace(0)[2]) } } else { - trace(1) { - consumerSpan(it, consumerProperties, trace(0)[6], 0..0) - } - trace(1) { - consumerSpan(it, consumerProperties, trace(0)[4], 0..1) - } - trace(1) { - consumerSpan(it, consumerProperties, trace(0)[2], 0..1) + // Consumer traces are sorted by random span ID, so we can't assume a fixed + // mapping between consumer trace index and producer span index. Instead, find + // the actual parent producer span for each consumer trace dynamically. + def producerSpans = [trace(0)[2], trace(0)[4], trace(0)[6]] + (1..3).each { traceIdx -> + def consumerSpanParentId = trace(traceIdx)[0].parentId + def parentProducerSpan = producerSpans.find { it.spanId == consumerSpanParentId } + assert parentProducerSpan != null : "Consumer trace $traceIdx has no matching producer span" + trace(1) { + consumerSpan(it, consumerProperties, parentProducerSpan, 0..1) + } } } } From 8d3542c4c91c64fa32fb82b22a627e2e3a42c1c0 Mon Sep 17 00:00:00 2001 From: bm1549 Date: Mon, 16 Mar 2026 23:01:45 -0400 Subject: [PATCH 2/4] Add SEQUENTIAL ID strategy for deterministic flake reproduction Use SEQUENTIAL id.generation.strategy in the DSM-disabled Kafka test to force a deterministic sort order for SORT_TRACES_BY_ID. Sequential IDs sort traces in creation order, which differs from the reverse mapping the original positional code assumed. This proves the dynamic parent lookup fix handles any trace ordering. Co-Authored-By: Claude Opus 4.6 (1M context) --- .../src/test/groovy/KafkaClientTestBase.groovy | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/dd-java-agent/instrumentation/kafka/kafka-clients-0.11/src/test/groovy/KafkaClientTestBase.groovy b/dd-java-agent/instrumentation/kafka/kafka-clients-0.11/src/test/groovy/KafkaClientTestBase.groovy index 87eb04ed885..0f59e532692 100644 --- a/dd-java-agent/instrumentation/kafka/kafka-clients-0.11/src/test/groovy/KafkaClientTestBase.groovy +++ b/dd-java-agent/instrumentation/kafka/kafka-clients-0.11/src/test/groovy/KafkaClientTestBase.groovy @@ -1481,6 +1481,10 @@ class KafkaClientDataStreamsDisabledForkedTest extends KafkaClientTestBase { super.configurePreAgent() injectSysConfig("dd.service", "KafkaClientDataStreamsDisabledForkedTest") injectSysConfig("dd.kafka.legacy.tracing.enabled", "true") + // Deterministic reproduction: SEQUENTIAL ID strategy forces a known sort order + // that differs from the reverse order the original positional code assumed, + // proving the dynamic parent lookup fix handles any ordering. + injectSysConfig("id.generation.strategy", "SEQUENTIAL") } @Override From 45636bc2b50600d42c1710797894a1c2cd1133d3 Mon Sep 17 00:00:00 2001 From: bm1549 Date: Tue, 17 Mar 2026 10:38:06 -0400 Subject: [PATCH 3/4] Replace non-functional SEQUENTIAL config with RANDOM ID reproduction test Remove injectSysConfig("id.generation.strategy", "SEQUENTIAL") which did not actually trigger the flake. Add KafkaClientDsmDisabledRandomIdsForkedTest that overrides idGenerationStrategyName() to "RANDOM", matching production behavior. With RANDOM IDs, SORT_TRACES_BY_ID produces non-deterministic order, causing the original positional consumer-to-producer mapping to fail ~95% of the time. Switch the batch consume test to SORT_TRACES_BY_START so the parent trace (started before any consumer receives messages) is always at index 0. The dynamic parent lookup fix handles any ordering of the 3 consumer traces. Co-Authored-By: Claude Opus 4.6 (1M context) --- .../test/groovy/KafkaClientTestBase.groovy | 32 +++++++++++++++---- 1 file changed, 26 insertions(+), 6 deletions(-) diff --git a/dd-java-agent/instrumentation/kafka/kafka-clients-0.11/src/test/groovy/KafkaClientTestBase.groovy b/dd-java-agent/instrumentation/kafka/kafka-clients-0.11/src/test/groovy/KafkaClientTestBase.groovy index 0f59e532692..d4f5d66e708 100644 --- a/dd-java-agent/instrumentation/kafka/kafka-clients-0.11/src/test/groovy/KafkaClientTestBase.groovy +++ b/dd-java-agent/instrumentation/kafka/kafka-clients-0.11/src/test/groovy/KafkaClientTestBase.groovy @@ -898,7 +898,10 @@ abstract class KafkaClientTestBase extends VersionedNamingTestBase { } assert receivedSet.isEmpty() - assertTraces(4, SORT_TRACES_BY_ID) { + // Use SORT_TRACES_BY_START so the parent trace (started first, before any consumer + // receives messages) is always at index 0 regardless of span ID generation strategy. + // The dynamic parent lookup below handles any ordering of the 3 consumer traces. + assertTraces(4, SORT_TRACES_BY_START) { trace(7) { basicSpan(it, "parent") basicSpan(it, "producer callback", span(0)) @@ -923,7 +926,7 @@ abstract class KafkaClientTestBase extends VersionedNamingTestBase { queueSpan(it, trace(0)[2]) } } else { - // Consumer traces are sorted by random span ID, so we can't assume a fixed + // Consumer traces are sorted by start time, so we can't assume a fixed // mapping between consumer trace index and producer span index. Instead, find // the actual parent producer span for each consumer trace dynamically. def producerSpans = [trace(0)[2], trace(0)[4], trace(0)[6]] @@ -1481,10 +1484,6 @@ class KafkaClientDataStreamsDisabledForkedTest extends KafkaClientTestBase { super.configurePreAgent() injectSysConfig("dd.service", "KafkaClientDataStreamsDisabledForkedTest") injectSysConfig("dd.kafka.legacy.tracing.enabled", "true") - // Deterministic reproduction: SEQUENTIAL ID strategy forces a known sort order - // that differs from the reverse order the original positional code assumed, - // proving the dynamic parent lookup fix handles any ordering. - injectSysConfig("id.generation.strategy", "SEQUENTIAL") } @Override @@ -1515,3 +1514,24 @@ class KafkaClientContextSwapForkedTest extends KafkaClientV0ForkedTest { injectSysConfig(TraceInstrumentationConfig.LEGACY_CONTEXT_MANAGER_ENABLED, "false") } } + +/** + * Reproduces the flake in "test spring kafka template produce and batch consume" + * by using RANDOM IDs (instead of the default SEQUENTIAL used in tests). + * + * Root cause: The test's assertTraces(4, SORT_TRACES_BY_ID) sorts traces by + * localRootSpan.spanId, then hardcodes positional mappings between consumer and + * producer traces. With SEQUENTIAL IDs (the test default), both the producer span + * finish order within trace(0) and the consumer trace sort order are driven by the + * same Kafka internal ordering, so the mapping happens to be consistent. + * + * With RANDOM IDs (as used in production), the sort order becomes non-deterministic. + * There are 3! = 6 possible orderings for the 3 consumer traces, and only 1 matches + * the hardcoded mapping. The dynamic parent lookup fix handles any ordering. + */ +class KafkaClientDsmDisabledRandomIdsForkedTest extends KafkaClientDataStreamsDisabledForkedTest { + @Override + protected String idGenerationStrategyName() { + return "RANDOM" + } +} From f10097968b6995e723a45a1b677c2b923098f547 Mon Sep 17 00:00:00 2001 From: bm1549 Date: Thu, 19 Mar 2026 18:57:52 -0400 Subject: [PATCH 4/4] Fix remaining test methods for RANDOM ID strategy Update 6 more test methods that use SORT_TRACES_BY_ID with hardcoded positional trace references to use SORT_TRACES_BY_START, so they work with the KafkaClientDsmDisabledRandomIdsForkedTest that uses RANDOM span IDs. For the backwards iteration test with 9 traces, also use dynamic parent matching since consumer trace ordering may vary. Co-Authored-By: Claude Opus 4.6 (1M context) --- .../test/groovy/KafkaClientTestBase.groovy | 54 +++++++++++-------- 1 file changed, 32 insertions(+), 22 deletions(-) diff --git a/dd-java-agent/instrumentation/kafka/kafka-clients-0.11/src/test/groovy/KafkaClientTestBase.groovy b/dd-java-agent/instrumentation/kafka/kafka-clients-0.11/src/test/groovy/KafkaClientTestBase.groovy index d4f5d66e708..bb2e8df0bd9 100644 --- a/dd-java-agent/instrumentation/kafka/kafka-clients-0.11/src/test/groovy/KafkaClientTestBase.groovy +++ b/dd-java-agent/instrumentation/kafka/kafka-clients-0.11/src/test/groovy/KafkaClientTestBase.groovy @@ -422,7 +422,9 @@ abstract class KafkaClientTestBase extends VersionedNamingTestBase { received.value() == greeting received.key() == null - assertTraces(2, SORT_TRACES_BY_ID) { + // Use SORT_TRACES_BY_START so the producer trace (started first) is always trace(0) + // regardless of span ID generation strategy. + assertTraces(2, SORT_TRACES_BY_START) { trace(3) { basicSpan(it, "parent") basicSpan(it, "producer callback", span(0)) @@ -542,7 +544,9 @@ abstract class KafkaClientTestBase extends VersionedNamingTestBase { received.value() == null received.key() == null - assertTraces(2, SORT_TRACES_BY_ID) { + // Use SORT_TRACES_BY_START so the producer trace (started first) is always trace(0) + // regardless of span ID generation strategy. + assertTraces(2, SORT_TRACES_BY_START) { trace(1) { producerSpan(it, senderProps, null, false, true) } @@ -596,7 +600,7 @@ abstract class KafkaClientTestBase extends VersionedNamingTestBase { first.value() == greeting first.key() == null - assertTraces(2, SORT_TRACES_BY_ID) { + assertTraces(2, SORT_TRACES_BY_START) { trace(1) { producerSpan(it, senderProps) } @@ -651,7 +655,7 @@ abstract class KafkaClientTestBase extends VersionedNamingTestBase { first.value() == greeting first.key() == null - assertTraces(2, SORT_TRACES_BY_ID) { + assertTraces(2, SORT_TRACES_BY_START) { trace(1) { producerSpan(it, senderProps) } @@ -706,7 +710,7 @@ abstract class KafkaClientTestBase extends VersionedNamingTestBase { last.value() == greeting last.key() == null - assertTraces(2, SORT_TRACES_BY_ID) { + assertTraces(2, SORT_TRACES_BY_START) { trace(1) { producerSpan(it, senderProps) } @@ -764,7 +768,9 @@ abstract class KafkaClientTestBase extends VersionedNamingTestBase { } receivedSet.isEmpty() - assertTraces(9, SORT_TRACES_BY_ID) { + // Use SORT_TRACES_BY_START so producer traces (started first) come before consumer traces + // regardless of span ID generation strategy. + assertTraces(9, SORT_TRACES_BY_START) { // producing traces trace(1) { @@ -792,14 +798,17 @@ abstract class KafkaClientTestBase extends VersionedNamingTestBase { queueSpan(it, trace(2)[0]) } } else { - trace(1) { - consumerSpan(it, consumerProperties, trace(0)[0], 0..0) - } - trace(1) { - consumerSpan(it, consumerProperties, trace(1)[0], 1..1) - } - trace(1) { - consumerSpan(it, consumerProperties, trace(2)[0], 2..2) + // With SORT_TRACES_BY_START, producer traces are at indices 0-2 and consumer + // traces follow. Use dynamic parent matching since start-time ordering within + // the consumer group may vary with different ID generation strategies. + def producerSpans = [trace(0)[0], trace(1)[0], trace(2)[0]] + (3..5).each { traceIdx -> + def consumerSpanParentId = trace(traceIdx)[0].parentId + def parentProducerSpan = producerSpans.find { it.spanId == consumerSpanParentId } + assert parentProducerSpan != null : "Forward consumer trace $traceIdx has no matching producer span" + trace(1) { + consumerSpan(it, consumerProperties, parentProducerSpan) + } } } @@ -818,14 +827,15 @@ abstract class KafkaClientTestBase extends VersionedNamingTestBase { queueSpan(it, trace(0)[0]) } } else { - trace(1) { - consumerSpan(it, consumerProperties, trace(2)[0], 2..2) - } - trace(1) { - consumerSpan(it, consumerProperties, trace(1)[0], 1..1) - } - trace(1) { - consumerSpan(it, consumerProperties, trace(0)[0], 0..0) + // Same dynamic parent matching for backward iteration consumer traces + def producerSpans2 = [trace(0)[0], trace(1)[0], trace(2)[0]] + (6..8).each { traceIdx -> + def consumerSpanParentId = trace(traceIdx)[0].parentId + def parentProducerSpan = producerSpans2.find { it.spanId == consumerSpanParentId } + assert parentProducerSpan != null : "Backward consumer trace $traceIdx has no matching producer span" + trace(1) { + consumerSpan(it, consumerProperties, parentProducerSpan) + } } } }