diff --git a/sdk/servicebus/azure-messaging-servicebus/CHANGELOG.md b/sdk/servicebus/azure-messaging-servicebus/CHANGELOG.md index 1ecae856a4cf..c7bd2aa15ce6 100644 --- a/sdk/servicebus/azure-messaging-servicebus/CHANGELOG.md +++ b/sdk/servicebus/azure-messaging-servicebus/CHANGELOG.md @@ -11,6 +11,7 @@ ### Bugs Fixed - Fixed `ServiceBusProcessorClient.close()` disposing the receiver before in-flight message handlers could complete settlement, causing `IllegalStateException`. The processor now drains active handlers before closing. ([#45716](https://github.com/Azure/azure-sdk-for-java/issues/45716)) +- Fixed the first call to `ServiceBusSenderClient.sendMessage()` (and `ServiceBusSenderAsyncClient.sendMessage()`) not recognizing the caller's current OpenTelemetry trace context, causing the `ServiceBus.send` span and the outgoing message's `traceparent` to start a new, disconnected trace. The send span is now started on the calling thread before the first send establishes the AMQP connection on a background thread. ([#44958](https://github.com/Azure/azure-sdk-for-java/issues/44958)) - Fixed `ServiceBusMessageBatch` accepting messages beyond the service-enforced batch size limit on Premium large-message entities by reading the `com.microsoft:max-message-batch-size` vendor property from the AMQP sender link instead of using `max-message-size`. 
([#48214](https://github.com/Azure/azure-sdk-for-java/pull/48214)) diff --git a/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/ServiceBusSenderAsyncClient.java b/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/ServiceBusSenderAsyncClient.java index e8b488042f81..4b9ece605f07 100644 --- a/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/ServiceBusSenderAsyncClient.java +++ b/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/ServiceBusSenderAsyncClient.java @@ -321,7 +321,7 @@ public Mono sendMessage(ServiceBusMessage message) { if (Objects.isNull(message)) { return monoError(logger, new NullPointerException("'message' cannot be null.")); } - return sendFluxInternal(Flux.just(message), null); + return sendFluxInternal(message, null); } /** @@ -346,7 +346,7 @@ public Mono sendMessage(ServiceBusMessage message, ServiceBusTransactionCo return monoError(logger, new NullPointerException("'transactionContext.transactionId' cannot be null.")); } - return sendFluxInternal(Flux.just(message), transactionContext); + return sendFluxInternal(message, transactionContext); } /** @@ -843,6 +843,11 @@ private Mono scheduleMessageInternal(ServiceBusMessage message, OffsetDate */ private Mono sendBatchInternal(ServiceBusMessageBatch batch, ServiceBusTransactionContext transactionContext) { + return sendBatchInternal(batch, transactionContext, true); + } + + private Mono sendBatchInternal(ServiceBusMessageBatch batch, ServiceBusTransactionContext transactionContext, + boolean instrument) { if (isDisposed.get()) { return monoError(logger, new IllegalStateException(String.format(INVALID_OPERATION_DISPOSED_SENDER, "sendMessages"))); @@ -884,16 +889,22 @@ private Mono sendBatchInternal(ServiceBusMessageBatch batch, final String message = "Sending messages timed out. 
message-count:" + batch.getCount() + entityId(); final Mono withRetry = withRetry(sendMessage, retryOptions, message).onErrorMap(this::mapError); + // The single-message send path (sendFluxInternal) starts the "ServiceBus.send" span itself on the + // subscribing thread, so it sends this batch with instrument=false to avoid a duplicate span. + if (!instrument) { + return withRetry; + } return instrumentation.instrumentSendBatch("ServiceBus.send", withRetry, batch.getMessages()); } - private Mono sendFluxInternal(Flux messages, - ServiceBusTransactionContext transactionContext) { + private Mono sendFluxInternal(ServiceBusMessage message, ServiceBusTransactionContext transactionContext) { if (isDisposed.get()) { return monoError(logger, new IllegalStateException(String.format(INVALID_OPERATION_DISPOSED_SENDER, "sendMessage"))); } + final List messageList = Collections.singletonList(message); + // Uses getLinkSize() intentionally — this path is for single-message sends only (sendMessage()). // Single messages should use the full link capacity (up to 100 MB on Premium large-message), // not the batch-size cap. The batch path (sendMessages(Iterable), scheduleMessages(Iterable)) @@ -903,14 +914,27 @@ private Mono sendFluxInternal(Flux messages, final int batchSize = size > 0 ? size : MAX_MESSAGE_LENGTH_BYTES; final CreateMessageBatchOptions batchOptions = new CreateMessageBatchOptions().setMaximumSizeInBytes(batchSize); - return messages.collect( - new AmqpMessageCollector(isV2, batchOptions, 1, link::getErrorContext, tracer, messageSerializer)); + return Flux.fromIterable(messageList) + .collect(new AmqpMessageCollector(isV2, batchOptions, 1, link::getErrorContext, tracer, + messageSerializer)); })); - return batchList.flatMap(list -> Flux.fromIterable(list) - .flatMap(batch -> sendBatchInternal(batch, transactionContext)) + // The raw send pipeline, without span instrumentation. 
Instrumentation is applied as the + // outermost operator below so the span is created on the subscribing thread (instrument=false here). + final Mono send = batchList.flatMap(list -> Flux.fromIterable(list) + .flatMap(batch -> sendBatchInternal(batch, transactionContext, false)) .then() .doOnError(error -> logger.error("Error sending batch.", error))).onErrorMap(this::mapError); + + // Start the producer message span and the "ServiceBus.send" span on the subscribing (caller) thread, + // BEFORE the first send triggers AMQP connection/link establishment on a background thread. Starting + // the spans here ensures the caller's ambient (thread-local) trace context is used as the parent. + // Doing this lazily downstream (after the connection thread hop) is what caused the first send to get + // a new, disconnected trace id. See https://github.com/Azure/azure-sdk-for-java/issues/44958. + return Mono.defer(() -> { + tracer.reportMessageSpan(message); + return instrumentation.instrumentSendBatch("ServiceBus.send", send, messageList); + }); } private Mono getSendLink(String callSite) { diff --git a/sdk/servicebus/azure-messaging-servicebus/src/test/java/com/azure/messaging/servicebus/TracingIntegrationTests.java b/sdk/servicebus/azure-messaging-servicebus/src/test/java/com/azure/messaging/servicebus/TracingIntegrationTests.java index 408afbb6974d..5107ecdcfcdc 100644 --- a/sdk/servicebus/azure-messaging-servicebus/src/test/java/com/azure/messaging/servicebus/TracingIntegrationTests.java +++ b/sdk/servicebus/azure-messaging-servicebus/src/test/java/com/azure/messaging/servicebus/TracingIntegrationTests.java @@ -16,7 +16,9 @@ import io.opentelemetry.api.trace.SpanContext; import io.opentelemetry.api.trace.SpanKind; import io.opentelemetry.api.trace.StatusCode; +import io.opentelemetry.api.trace.Tracer; import io.opentelemetry.context.Context; +import io.opentelemetry.context.Scope; import io.opentelemetry.sdk.OpenTelemetrySdk; import io.opentelemetry.sdk.trace.IdGenerator; 
import io.opentelemetry.sdk.trace.ReadWriteSpan; @@ -141,6 +143,47 @@ public void sendAndReceive() throws InterruptedException { assertSettledVsProcessed(completed, processed, messages.size()); } + @Test + @SuppressWarnings("try") + public void sendMessageHasParentSpanOnFirstCall() { + // Regression test for https://github.com/Azure/azure-sdk-for-java/issues/44958 + // The FIRST sendMessage() on a freshly built sender establishes the AMQP connection on a + // background thread. Ensure the "ServiceBus.send" and "ServiceBus.message" spans still inherit + // the caller's ambient (thread-local) trace context as the parent, rather than starting a new trace. + // Create the caller span from a SEPARATE OpenTelemetry instance so it is not validated by the + // Service-Bus-specific TestSpanProcessor (which requires az.namespace/messaging attributes on every + // span it sees). It is still made current on this thread, so the SDK picks it up as the parent via + // io.opentelemetry.context.Context.current(). + Tracer testTracer = OpenTelemetrySdk.builder().build().getTracer("test"); + Span callerSpan = testTracer.spanBuilder("caller").startSpan(); + String expectedTraceId = callerSpan.getSpanContext().getTraceId(); + + ServiceBusMessage message = new ServiceBusMessage(CONTENTS_BYTES); + try (Scope scope = callerSpan.makeCurrent()) { + StepVerifier.create(sender.sendMessage(message)).expectComplete().verify(TIMEOUT); + } finally { + callerSpan.end(); + } + + List spans = spanProcessor.getEndedSpans(); + + // Exactly one send span and one message span should be produced for a single sendMessage() call. + // An exact-count assertion guards against accidental double-instrumentation regressions. 
+ List send = findSpans(spans, "ServiceBus.send"); + assertEquals(1, send.size()); + assertEquals(expectedTraceId, send.get(0).getSpanContext().getTraceId()); + assertEquals(expectedTraceId, send.get(0).getParentSpanContext().getTraceId()); + + List messageSpans = findSpans(spans, "ServiceBus.message"); + assertEquals(1, messageSpans.size()); + assertMessageSpan(messageSpans.get(0), message); + assertEquals(expectedTraceId, messageSpans.get(0).getSpanContext().getTraceId()); + + // The traceparent must have been injected into the outgoing message (a missing header fails the assertion instead of throwing NPE) and must carry the caller's trace id. + String traceparent = (String) message.getApplicationProperties().get("traceparent"); + assertTrue(traceparent != null && traceparent.startsWith("00-" + expectedTraceId)); + } + @Test public void receiveAndRenewLockWithDuration() throws InterruptedException { ServiceBusMessage message = new ServiceBusMessage(CONTENTS_BYTES);