diff --git a/dd-java-agent/instrumentation/kafka/kafka-clients-0.11/src/test/groovy/KafkaClientTestBase.groovy b/dd-java-agent/instrumentation/kafka/kafka-clients-0.11/src/test/groovy/KafkaClientTestBase.groovy index cf0ff225f8c..bb2e8df0bd9 100644 --- a/dd-java-agent/instrumentation/kafka/kafka-clients-0.11/src/test/groovy/KafkaClientTestBase.groovy +++ b/dd-java-agent/instrumentation/kafka/kafka-clients-0.11/src/test/groovy/KafkaClientTestBase.groovy @@ -422,7 +422,9 @@ abstract class KafkaClientTestBase extends VersionedNamingTestBase { received.value() == greeting received.key() == null - assertTraces(2, SORT_TRACES_BY_ID) { + // Use SORT_TRACES_BY_START so the producer trace (started first) is always trace(0) + // regardless of span ID generation strategy. + assertTraces(2, SORT_TRACES_BY_START) { trace(3) { basicSpan(it, "parent") basicSpan(it, "producer callback", span(0)) @@ -542,7 +544,9 @@ abstract class KafkaClientTestBase extends VersionedNamingTestBase { received.value() == null received.key() == null - assertTraces(2, SORT_TRACES_BY_ID) { + // Use SORT_TRACES_BY_START so the producer trace (started first) is always trace(0) + // regardless of span ID generation strategy. 
+ assertTraces(2, SORT_TRACES_BY_START) { trace(1) { producerSpan(it, senderProps, null, false, true) } @@ -596,7 +600,7 @@ abstract class KafkaClientTestBase extends VersionedNamingTestBase { first.value() == greeting first.key() == null - assertTraces(2, SORT_TRACES_BY_ID) { + assertTraces(2, SORT_TRACES_BY_START) { trace(1) { producerSpan(it, senderProps) } @@ -651,7 +655,7 @@ abstract class KafkaClientTestBase extends VersionedNamingTestBase { first.value() == greeting first.key() == null - assertTraces(2, SORT_TRACES_BY_ID) { + assertTraces(2, SORT_TRACES_BY_START) { trace(1) { producerSpan(it, senderProps) } @@ -706,7 +710,7 @@ abstract class KafkaClientTestBase extends VersionedNamingTestBase { last.value() == greeting last.key() == null - assertTraces(2, SORT_TRACES_BY_ID) { + assertTraces(2, SORT_TRACES_BY_START) { trace(1) { producerSpan(it, senderProps) } @@ -764,7 +768,9 @@ abstract class KafkaClientTestBase extends VersionedNamingTestBase { } receivedSet.isEmpty() - assertTraces(9, SORT_TRACES_BY_ID) { + // Use SORT_TRACES_BY_START so producer traces (started first) come before consumer traces + // regardless of span ID generation strategy. + assertTraces(9, SORT_TRACES_BY_START) { // producing traces trace(1) { @@ -792,14 +798,17 @@ abstract class KafkaClientTestBase extends VersionedNamingTestBase { queueSpan(it, trace(2)[0]) } } else { - trace(1) { - consumerSpan(it, consumerProperties, trace(0)[0], 0..0) - } - trace(1) { - consumerSpan(it, consumerProperties, trace(1)[0], 1..1) - } - trace(1) { - consumerSpan(it, consumerProperties, trace(2)[0], 2..2) + // With SORT_TRACES_BY_START, producer traces are at indices 0-2 and consumer + // traces follow. Use dynamic parent matching since start-time ordering within + // the consumer group may vary with different ID generation strategies. 
+ def producerSpans = [trace(0)[0], trace(1)[0], trace(2)[0]] + (3..5).each { traceIdx -> + def consumerSpanParentId = trace(traceIdx)[0].parentId + def producerIdx = producerSpans.findIndexOf { it.spanId == consumerSpanParentId } + assert producerIdx >= 0 : "Forward consumer trace $traceIdx has no matching producer span" + trace(1) { + consumerSpan(it, consumerProperties, producerSpans[producerIdx], producerIdx..producerIdx) // keep the offset check: producer trace i <-> offset i + } } } @@ -818,14 +827,15 @@ abstract class KafkaClientTestBase extends VersionedNamingTestBase { queueSpan(it, trace(0)[0]) } } else { - trace(1) { - consumerSpan(it, consumerProperties, trace(2)[0], 2..2) - } - trace(1) { - consumerSpan(it, consumerProperties, trace(1)[0], 1..1) - } - trace(1) { - consumerSpan(it, consumerProperties, trace(0)[0], 0..0) + // Same dynamic parent matching for backward iteration; the expected offset follows the matched producer index + def producerSpans2 = [trace(0)[0], trace(1)[0], trace(2)[0]] + (6..8).each { traceIdx -> + def consumerSpanParentId = trace(traceIdx)[0].parentId + def producerIdx = producerSpans2.findIndexOf { it.spanId == consumerSpanParentId } + assert producerIdx >= 0 : "Backward consumer trace $traceIdx has no matching producer span" + trace(1) { + consumerSpan(it, consumerProperties, producerSpans2[producerIdx], producerIdx..producerIdx) // keep the offset check: producer trace i <-> offset i + } } } } @@ -898,7 +908,10 @@ abstract class KafkaClientTestBase extends VersionedNamingTestBase { } assert receivedSet.isEmpty() - assertTraces(4, SORT_TRACES_BY_ID) { + // Use SORT_TRACES_BY_START so the parent trace (started first, before any consumer + // receives messages) is always at index 0 regardless of span ID generation strategy. + // The dynamic parent lookup below handles any ordering of the 3 consumer traces. 
+ assertTraces(4, SORT_TRACES_BY_START) { trace(7) { basicSpan(it, "parent") basicSpan(it, "producer callback", span(0)) @@ -923,14 +936,17 @@ abstract class KafkaClientTestBase extends VersionedNamingTestBase { queueSpan(it, trace(0)[2]) } } else { - trace(1) { - consumerSpan(it, consumerProperties, trace(0)[6], 0..0) - } - trace(1) { - consumerSpan(it, consumerProperties, trace(0)[4], 0..1) - } - trace(1) { - consumerSpan(it, consumerProperties, trace(0)[2], 0..1) + // Consumer traces are sorted by start time, so we can't assume a fixed + // mapping between consumer trace index and producer span index. Instead, find + // the actual parent producer span for each consumer trace dynamically. + def producerSpans = [trace(0)[2], trace(0)[4], trace(0)[6]] + (1..3).each { traceIdx -> + def consumerSpanParentId = trace(traceIdx)[0].parentId + def parentProducerSpan = producerSpans.find { it.spanId == consumerSpanParentId } + assert parentProducerSpan != null : "Consumer trace $traceIdx has no matching producer span" + trace(1) { + consumerSpan(it, consumerProperties, parentProducerSpan, 0..1) + } } } } @@ -1508,3 +1524,24 @@ class KafkaClientContextSwapForkedTest extends KafkaClientV0ForkedTest { injectSysConfig(TraceInstrumentationConfig.LEGACY_CONTEXT_MANAGER_ENABLED, "false") } } + +/** + * Regression guard for the flake in "test spring kafka template produce and batch consume": + * runs the inherited tests with RANDOM IDs (instead of the default SEQUENTIAL used in tests). + * + * Root cause: the test used to call assertTraces(4, SORT_TRACES_BY_ID), which sorts traces by + * localRootSpan.spanId, and then hardcoded positional mappings between consumer and + * producer traces. With SEQUENTIAL IDs (the test default), both the producer span + * finish order within trace(0) and the consumer trace sort order are driven by the + * same Kafka internal ordering, so the mapping happened to be consistent. + * + * With RANDOM IDs (as used in production), that sort order becomes non-deterministic. + * There are 3! 
= 6 possible orderings for the 3 consumer traces, and only 1 matched + * the old hardcoded mapping. Sorting by start time plus the dynamic parent lookup handles any ordering. + */ +class KafkaClientDsmDisabledRandomIdsForkedTest extends KafkaClientDataStreamsDisabledForkedTest { + @Override + protected String idGenerationStrategyName() { + return "RANDOM" + } +}