From 18a2a366a25d0a0f75e8b4baa96af03f274f756e Mon Sep 17 00:00:00 2001 From: ksalazar-91 Date: Mon, 22 Jun 2026 19:32:34 -0700 Subject: [PATCH 1/2] [Service Bus] Fix trace context not propagated on first sendMessage() (#44958) The first call to ServiceBusSenderClient.sendMessage() (and the async client) did not recognize the caller's current OpenTelemetry trace context: the ServiceBus.send span and the outgoing message's traceparent started a new, disconnected trace. This happened because the span was started lazily downstream of the first AMQP connection/link establishment, which runs on a background thread where the caller's thread-local context is not available. The single-message send path now starts the producer message span and the ServiceBus.send span on the subscribing (caller) thread, before the connection thread hop, mirroring the structure already used by the batch send path and Event Hubs. A non-instrumenting overload of sendBatchInternal avoids a duplicate span. Adds a live regression test (sendMessageHasParentSpanOnFirstCall) and a CHANGELOG entry. Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com> --- .../azure-messaging-servicebus/CHANGELOG.md | 1 + .../ServiceBusSenderAsyncClient.java | 40 +++++++++++++++---- .../servicebus/TracingIntegrationTests.java | 39 ++++++++++++++++++ 3 files changed, 72 insertions(+), 8 deletions(-) diff --git a/sdk/servicebus/azure-messaging-servicebus/CHANGELOG.md b/sdk/servicebus/azure-messaging-servicebus/CHANGELOG.md index 2a0fbd19f431..c1ad5ec7ff43 100644 --- a/sdk/servicebus/azure-messaging-servicebus/CHANGELOG.md +++ b/sdk/servicebus/azure-messaging-servicebus/CHANGELOG.md @@ -11,6 +11,7 @@ ### Bugs Fixed - Fixed `ServiceBusProcessorClient.close()` disposing the receiver before in-flight message handlers could complete settlement, causing `IllegalStateException`. The processor now drains active handlers before closing. ([#45716](https://github.com/Azure/azure-sdk-for-java/issues/45716)) +- Fixed the first call to `ServiceBusSenderClient.sendMessage()` (and `ServiceBusSenderAsyncClient.sendMessage()`) not recognizing the caller's current OpenTelemetry trace context, causing the `ServiceBus.send` span and the outgoing message's `traceparent` to start a new, disconnected trace. The send span is now started on the calling thread before the first send establishes the AMQP connection on a background thread. ([#44958](https://github.com/Azure/azure-sdk-for-java/issues/44958)) ### Other Changes diff --git a/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/ServiceBusSenderAsyncClient.java b/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/ServiceBusSenderAsyncClient.java index e8b488042f81..4b9ece605f07 100644 --- a/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/ServiceBusSenderAsyncClient.java +++ b/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/ServiceBusSenderAsyncClient.java @@ -321,7 +321,7 @@ public Mono sendMessage(ServiceBusMessage message) { if (Objects.isNull(message)) { return monoError(logger, new NullPointerException("'message' cannot be null.")); } - return sendFluxInternal(Flux.just(message), null); + return sendFluxInternal(message, null); } /** @@ -346,7 +346,7 @@ public Mono sendMessage(ServiceBusMessage message, ServiceBusTransactionCo return monoError(logger, new NullPointerException("'transactionContext.transactionId' cannot be null.")); } - return sendFluxInternal(Flux.just(message), transactionContext); + return sendFluxInternal(message, transactionContext); } /** @@ -843,6 +843,11 @@ private Mono scheduleMessageInternal(ServiceBusMessage message, OffsetDate */ private Mono sendBatchInternal(ServiceBusMessageBatch batch, ServiceBusTransactionContext transactionContext) { + return sendBatchInternal(batch, transactionContext, true); + } + + private Mono sendBatchInternal(ServiceBusMessageBatch batch, ServiceBusTransactionContext transactionContext, + boolean instrument) { if (isDisposed.get()) { return monoError(logger, new IllegalStateException(String.format(INVALID_OPERATION_DISPOSED_SENDER, "sendMessages"))); @@ -884,16 +889,22 @@ private Mono sendBatchInternal(ServiceBusMessageBatch batch, final String message = "Sending messages timed out. message-count:" + batch.getCount() + entityId(); final Mono withRetry = withRetry(sendMessage, retryOptions, message).onErrorMap(this::mapError); + // The single-message send path (sendFluxInternal) starts the "ServiceBus.send" span itself on the + // subscribing thread, so it sends this batch with instrument=false to avoid a duplicate span. + if (!instrument) { + return withRetry; + } return instrumentation.instrumentSendBatch("ServiceBus.send", withRetry, batch.getMessages()); } - private Mono sendFluxInternal(Flux messages, - ServiceBusTransactionContext transactionContext) { + private Mono sendFluxInternal(ServiceBusMessage message, ServiceBusTransactionContext transactionContext) { if (isDisposed.get()) { return monoError(logger, new IllegalStateException(String.format(INVALID_OPERATION_DISPOSED_SENDER, "sendMessage"))); } + final List messageList = Collections.singletonList(message); + // Uses getLinkSize() intentionally — this path is for single-message sends only (sendMessage()). // Single messages should use the full link capacity (up to 100 MB on Premium large-message), // not the batch-size cap. The batch path (sendMessages(Iterable), scheduleMessages(Iterable)) @@ -903,14 +914,27 @@ private Mono sendFluxInternal(Flux messages, final int batchSize = size > 0 ? size : MAX_MESSAGE_LENGTH_BYTES; final CreateMessageBatchOptions batchOptions = new CreateMessageBatchOptions().setMaximumSizeInBytes(batchSize); - return messages.collect( - new AmqpMessageCollector(isV2, batchOptions, 1, link::getErrorContext, tracer, messageSerializer)); + return Flux.fromIterable(messageList) + .collect(new AmqpMessageCollector(isV2, batchOptions, 1, link::getErrorContext, tracer, + messageSerializer)); })); - return batchList.flatMap(list -> Flux.fromIterable(list) - .flatMap(batch -> sendBatchInternal(batch, transactionContext)) + // The raw send pipeline, without span instrumentation. Instrumentation is applied as the + // outermost operator below so the span is created on the subscribing thread (instrument=false here). + final Mono send = batchList.flatMap(list -> Flux.fromIterable(list) + .flatMap(batch -> sendBatchInternal(batch, transactionContext, false)) .then() .doOnError(error -> logger.error("Error sending batch.", error))).onErrorMap(this::mapError); + + // Start the producer message span and the "ServiceBus.send" span on the subscribing (caller) thread, + // BEFORE the first send triggers AMQP connection/link establishment on a background thread. Starting + // the spans here ensures the caller's ambient (thread-local) trace context is used as the parent. + // Doing this lazily downstream (after the connection thread hop) is what caused the first send to get + // a new, disconnected trace id. See https://github.com/Azure/azure-sdk-for-java/issues/44958. + return Mono.defer(() -> { + tracer.reportMessageSpan(message); + return instrumentation.instrumentSendBatch("ServiceBus.send", send, messageList); + }); } private Mono getSendLink(String callSite) { diff --git a/sdk/servicebus/azure-messaging-servicebus/src/test/java/com/azure/messaging/servicebus/TracingIntegrationTests.java b/sdk/servicebus/azure-messaging-servicebus/src/test/java/com/azure/messaging/servicebus/TracingIntegrationTests.java index 408afbb6974d..a79dc70da272 100644 --- a/sdk/servicebus/azure-messaging-servicebus/src/test/java/com/azure/messaging/servicebus/TracingIntegrationTests.java +++ b/sdk/servicebus/azure-messaging-servicebus/src/test/java/com/azure/messaging/servicebus/TracingIntegrationTests.java @@ -16,7 +16,9 @@ import io.opentelemetry.api.trace.SpanContext; import io.opentelemetry.api.trace.SpanKind; import io.opentelemetry.api.trace.StatusCode; +import io.opentelemetry.api.trace.Tracer; import io.opentelemetry.context.Context; +import io.opentelemetry.context.Scope; import io.opentelemetry.sdk.OpenTelemetrySdk; import io.opentelemetry.sdk.trace.IdGenerator; import io.opentelemetry.sdk.trace.ReadWriteSpan; @@ -141,6 +143,43 @@ public void sendAndReceive() throws InterruptedException { assertSettledVsProcessed(completed, processed, messages.size()); } + @Test + @SuppressWarnings("try") + public void sendMessageHasParentSpanOnFirstCall() { + // Regression test for https://github.com/Azure/azure-sdk-for-java/issues/44958 + // The FIRST sendMessage() on a freshly built sender establishes the AMQP connection on a + // background thread. Ensure the "ServiceBus.send" and "ServiceBus.message" spans still inherit + // the caller's ambient (thread-local) trace context as the parent, rather than starting a new trace. + // Create the caller span from a SEPARATE OpenTelemetry instance so it is not validated by the + // Service-Bus-specific TestSpanProcessor (which requires az.namespace/messaging attributes on every + // span it sees). It is still made current on this thread, so the SDK picks it up as the parent via + // io.opentelemetry.context.Context.current(). + Tracer testTracer = OpenTelemetrySdk.builder().build().getTracer("test"); + Span callerSpan = testTracer.spanBuilder("caller").startSpan(); + String expectedTraceId = callerSpan.getSpanContext().getTraceId(); + + ServiceBusMessage message = new ServiceBusMessage(CONTENTS_BYTES); + try (Scope scope = callerSpan.makeCurrent()) { + StepVerifier.create(sender.sendMessage(message)).expectComplete().verify(TIMEOUT); + } finally { + callerSpan.end(); + } + + List spans = spanProcessor.getEndedSpans(); + + List send = findSpans(spans, "ServiceBus.send"); + assertEquals(expectedTraceId, send.get(0).getSpanContext().getTraceId()); + assertEquals(expectedTraceId, send.get(0).getParentSpanContext().getTraceId()); + + List messageSpans = findSpans(spans, "ServiceBus.message"); + assertMessageSpan(messageSpans.get(0), message); + assertEquals(expectedTraceId, messageSpans.get(0).getSpanContext().getTraceId()); + + // the traceparent injected into the outgoing message carries the caller's trace id + String traceparent = (String) message.getApplicationProperties().get("traceparent"); + assertTrue(traceparent.startsWith("00-" + expectedTraceId)); + } + @Test public void receiveAndRenewLockWithDuration() throws InterruptedException { ServiceBusMessage message = new ServiceBusMessage(CONTENTS_BYTES); From 3f5fdd778c3e67d157fe2a17aaec33aa1d9032c9 Mon Sep 17 00:00:00 2001 From: ksalazar-91 Date: Mon, 22 Jun 2026 20:12:36 -0700 Subject: [PATCH 2/2] [Service Bus] Strengthen first-send tracing regression test Add exact-count assertions (one ServiceBus.send and one ServiceBus.message span per single sendMessage()) so an accidental double-instrumentation regression is caught, per PR review feedback. Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com> --- .../azure/messaging/servicebus/TracingIntegrationTests.java | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/sdk/servicebus/azure-messaging-servicebus/src/test/java/com/azure/messaging/servicebus/TracingIntegrationTests.java b/sdk/servicebus/azure-messaging-servicebus/src/test/java/com/azure/messaging/servicebus/TracingIntegrationTests.java index a79dc70da272..5107ecdcfcdc 100644 --- a/sdk/servicebus/azure-messaging-servicebus/src/test/java/com/azure/messaging/servicebus/TracingIntegrationTests.java +++ b/sdk/servicebus/azure-messaging-servicebus/src/test/java/com/azure/messaging/servicebus/TracingIntegrationTests.java @@ -167,11 +167,15 @@ public void sendMessageHasParentSpanOnFirstCall() { List spans = spanProcessor.getEndedSpans(); + // Exactly one send span and one message span should be produced for a single sendMessage() call. + // An exact-count assertion guards against accidental double-instrumentation regressions. List send = findSpans(spans, "ServiceBus.send"); + assertEquals(1, send.size()); assertEquals(expectedTraceId, send.get(0).getSpanContext().getTraceId()); assertEquals(expectedTraceId, send.get(0).getParentSpanContext().getTraceId()); List messageSpans = findSpans(spans, "ServiceBus.message"); + assertEquals(1, messageSpans.size()); assertMessageSpan(messageSpans.get(0), message); assertEquals(expectedTraceId, messageSpans.get(0).getSpanContext().getTraceId());