Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -422,7 +422,9 @@ abstract class KafkaClientTestBase extends VersionedNamingTestBase {
received.value() == greeting
received.key() == null

assertTraces(2, SORT_TRACES_BY_ID) {
// Use SORT_TRACES_BY_START so the producer trace (started first) is always trace(0)
// regardless of span ID generation strategy.
assertTraces(2, SORT_TRACES_BY_START) {
trace(3) {
basicSpan(it, "parent")
basicSpan(it, "producer callback", span(0))
Expand Down Expand Up @@ -542,7 +544,9 @@ abstract class KafkaClientTestBase extends VersionedNamingTestBase {
received.value() == null
received.key() == null

assertTraces(2, SORT_TRACES_BY_ID) {
// Use SORT_TRACES_BY_START so the producer trace (started first) is always trace(0)
// regardless of span ID generation strategy.
assertTraces(2, SORT_TRACES_BY_START) {
trace(1) {
producerSpan(it, senderProps, null, false, true)
}
Expand Down Expand Up @@ -596,7 +600,7 @@ abstract class KafkaClientTestBase extends VersionedNamingTestBase {
first.value() == greeting
first.key() == null

assertTraces(2, SORT_TRACES_BY_ID) {
assertTraces(2, SORT_TRACES_BY_START) {
trace(1) {
producerSpan(it, senderProps)
}
Expand Down Expand Up @@ -651,7 +655,7 @@ abstract class KafkaClientTestBase extends VersionedNamingTestBase {
first.value() == greeting
first.key() == null

assertTraces(2, SORT_TRACES_BY_ID) {
assertTraces(2, SORT_TRACES_BY_START) {
trace(1) {
producerSpan(it, senderProps)
}
Expand Down Expand Up @@ -706,7 +710,7 @@ abstract class KafkaClientTestBase extends VersionedNamingTestBase {
last.value() == greeting
last.key() == null

assertTraces(2, SORT_TRACES_BY_ID) {
assertTraces(2, SORT_TRACES_BY_START) {
trace(1) {
producerSpan(it, senderProps)
}
Expand Down Expand Up @@ -764,7 +768,9 @@ abstract class KafkaClientTestBase extends VersionedNamingTestBase {
}
receivedSet.isEmpty()

assertTraces(9, SORT_TRACES_BY_ID) {
// Use SORT_TRACES_BY_START so producer traces (started first) come before consumer traces
// regardless of span ID generation strategy.
assertTraces(9, SORT_TRACES_BY_START) {

// producing traces
trace(1) {
Expand Down Expand Up @@ -792,14 +798,17 @@ abstract class KafkaClientTestBase extends VersionedNamingTestBase {
queueSpan(it, trace(2)[0])
}
} else {
trace(1) {
consumerSpan(it, consumerProperties, trace(0)[0], 0..0)
}
trace(1) {
consumerSpan(it, consumerProperties, trace(1)[0], 1..1)
}
trace(1) {
consumerSpan(it, consumerProperties, trace(2)[0], 2..2)
// With SORT_TRACES_BY_START, producer traces are at indices 0-2 and consumer
// traces follow. Use dynamic parent matching since start-time ordering within
// the consumer group may vary with different ID generation strategies.
def producerSpans = [trace(0)[0], trace(1)[0], trace(2)[0]]
(3..5).each { traceIdx ->
def consumerSpanParentId = trace(traceIdx)[0].parentId
def parentProducerSpan = producerSpans.find { it.spanId == consumerSpanParentId }
assert parentProducerSpan != null : "Forward consumer trace $traceIdx has no matching producer span"
trace(1) {
consumerSpan(it, consumerProperties, parentProducerSpan)
}
}
}

Expand All @@ -818,14 +827,15 @@ abstract class KafkaClientTestBase extends VersionedNamingTestBase {
queueSpan(it, trace(0)[0])
}
} else {
trace(1) {
consumerSpan(it, consumerProperties, trace(2)[0], 2..2)
}
trace(1) {
consumerSpan(it, consumerProperties, trace(1)[0], 1..1)
}
trace(1) {
consumerSpan(it, consumerProperties, trace(0)[0], 0..0)
// Same dynamic parent matching for backward iteration consumer traces
def producerSpans2 = [trace(0)[0], trace(1)[0], trace(2)[0]]
(6..8).each { traceIdx ->
def consumerSpanParentId = trace(traceIdx)[0].parentId
def parentProducerSpan = producerSpans2.find { it.spanId == consumerSpanParentId }
assert parentProducerSpan != null : "Backward consumer trace $traceIdx has no matching producer span"
trace(1) {
consumerSpan(it, consumerProperties, parentProducerSpan)
}
}
}
}
Expand Down Expand Up @@ -898,7 +908,10 @@ abstract class KafkaClientTestBase extends VersionedNamingTestBase {
}
assert receivedSet.isEmpty()

assertTraces(4, SORT_TRACES_BY_ID) {
// Use SORT_TRACES_BY_START so the parent trace (started first, before any consumer
// receives messages) is always at index 0 regardless of span ID generation strategy.
// The dynamic parent lookup below handles any ordering of the 3 consumer traces.
assertTraces(4, SORT_TRACES_BY_START) {
trace(7) {
basicSpan(it, "parent")
basicSpan(it, "producer callback", span(0))
Expand All @@ -923,14 +936,17 @@ abstract class KafkaClientTestBase extends VersionedNamingTestBase {
queueSpan(it, trace(0)[2])
}
} else {
trace(1) {
consumerSpan(it, consumerProperties, trace(0)[6], 0..0)
}
trace(1) {
consumerSpan(it, consumerProperties, trace(0)[4], 0..1)
}
trace(1) {
consumerSpan(it, consumerProperties, trace(0)[2], 0..1)
// Consumer traces are sorted by start time, so we can't assume a fixed
// mapping between consumer trace index and producer span index. Instead, find
// the actual parent producer span for each consumer trace dynamically.
def producerSpans = [trace(0)[2], trace(0)[4], trace(0)[6]]
(1..3).each { traceIdx ->
def consumerSpanParentId = trace(traceIdx)[0].parentId
def parentProducerSpan = producerSpans.find { it.spanId == consumerSpanParentId }
assert parentProducerSpan != null : "Consumer trace $traceIdx has no matching producer span"
trace(1) {
consumerSpan(it, consumerProperties, parentProducerSpan, 0..1)
}
}
}
}
Expand Down Expand Up @@ -1508,3 +1524,24 @@ class KafkaClientContextSwapForkedTest extends KafkaClientV0ForkedTest {
injectSysConfig(TraceInstrumentationConfig.LEGACY_CONTEXT_MANAGER_ENABLED, "false")
}
}

/**
* Reproduces the flake in "test spring kafka template produce and batch consume"
* by using RANDOM IDs (instead of the default SEQUENTIAL used in tests).
*
* Root cause: The test's assertTraces(4, SORT_TRACES_BY_ID) sorts traces by
* localRootSpan.spanId, then hardcodes positional mappings between consumer and
* producer traces. With SEQUENTIAL IDs (the test default), both the producer span
* finish order within trace(0) and the consumer trace sort order are driven by the
* same Kafka internal ordering, so the mapping happens to be consistent.
*
* With RANDOM IDs (as used in production), the sort order becomes non-deterministic.
* There are 3! = 6 possible orderings for the 3 consumer traces, and only 1 matches
* the hardcoded mapping. The dynamic parent lookup fix handles any ordering.
*/
class KafkaClientDsmDisabledRandomIdsForkedTest extends KafkaClientDataStreamsDisabledForkedTest {
@Override
protected String idGenerationStrategyName() {
return "RANDOM"
}
}
Comment on lines +1527 to +1547
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
/**
* Reproduces the flake in "test spring kafka template produce and batch consume"
* by using RANDOM IDs (instead of the default SEQUENTIAL used in tests).
*
* Root cause: The test's assertTraces(4, SORT_TRACES_BY_ID) sorts traces by
* localRootSpan.spanId, then hardcodes positional mappings between consumer and
* producer traces. With SEQUENTIAL IDs (the test default), both the producer span
* finish order within trace(0) and the consumer trace sort order are driven by the
* same Kafka internal ordering, so the mapping happens to be consistent.
*
* With RANDOM IDs (as used in production), the sort order becomes non-deterministic.
* There are 3! = 6 possible orderings for the 3 consumer traces, and only 1 matches
* the hardcoded mapping. The dynamic parent lookup fix handles any ordering.
*/
class KafkaClientDsmDisabledRandomIdsForkedTest extends KafkaClientDataStreamsDisabledForkedTest {
@Override
protected String idGenerationStrategyName() {
return "RANDOM"
}
}

I'd suggest leaving it in since it provides a more representative test case, but I'm happy to remove it if folks think otherwise

Loading