Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions sdk/servicebus/azure-messaging-servicebus/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
### Bugs Fixed

- Fixed `ServiceBusProcessorClient.close()` disposing the receiver before in-flight message handlers could complete settlement, causing `IllegalStateException`. The processor now drains active handlers before closing. ([#45716](https://github.com/Azure/azure-sdk-for-java/issues/45716))
- Fixed the first call to `ServiceBusSenderClient.sendMessage()` (and `ServiceBusSenderAsyncClient.sendMessage()`) not recognizing the caller's current OpenTelemetry trace context, causing the `ServiceBus.send` span and the outgoing message's `traceparent` to start a new, disconnected trace. The send span is now started on the calling thread before the first send establishes the AMQP connection on a background thread. ([#44958](https://github.com/Azure/azure-sdk-for-java/issues/44958))
- Fixed `ServiceBusMessageBatch` accepting messages beyond the service-enforced batch size limit on
Premium large-message entities by reading the `com.microsoft:max-message-batch-size` vendor property
from the AMQP sender link instead of using `max-message-size`. ([#48214](https://github.com/Azure/azure-sdk-for-java/pull/48214))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -321,7 +321,7 @@ public Mono<Void> sendMessage(ServiceBusMessage message) {
if (Objects.isNull(message)) {
return monoError(logger, new NullPointerException("'message' cannot be null."));
}
return sendFluxInternal(Flux.just(message), null);
return sendFluxInternal(message, null);
}

/**
Expand All @@ -346,7 +346,7 @@ public Mono<Void> sendMessage(ServiceBusMessage message, ServiceBusTransactionCo
return monoError(logger, new NullPointerException("'transactionContext.transactionId' cannot be null."));
}

return sendFluxInternal(Flux.just(message), transactionContext);
return sendFluxInternal(message, transactionContext);
}

/**
Expand Down Expand Up @@ -843,6 +843,11 @@ private Mono<Long> scheduleMessageInternal(ServiceBusMessage message, OffsetDate
*/
private Mono<Void> sendBatchInternal(ServiceBusMessageBatch batch,
ServiceBusTransactionContext transactionContext) {
return sendBatchInternal(batch, transactionContext, true);
}

private Mono<Void> sendBatchInternal(ServiceBusMessageBatch batch, ServiceBusTransactionContext transactionContext,
boolean instrument) {
if (isDisposed.get()) {
return monoError(logger,
new IllegalStateException(String.format(INVALID_OPERATION_DISPOSED_SENDER, "sendMessages")));
Expand Down Expand Up @@ -884,16 +889,22 @@ private Mono<Void> sendBatchInternal(ServiceBusMessageBatch batch,

final String message = "Sending messages timed out. message-count:" + batch.getCount() + entityId();
final Mono<Void> withRetry = withRetry(sendMessage, retryOptions, message).onErrorMap(this::mapError);
// The single-message send path (sendFluxInternal) starts the "ServiceBus.send" span itself on the
// subscribing thread, so it sends this batch with instrument=false to avoid a duplicate span.
if (!instrument) {
return withRetry;
}
return instrumentation.instrumentSendBatch("ServiceBus.send", withRetry, batch.getMessages());
}

private Mono<Void> sendFluxInternal(Flux<ServiceBusMessage> messages,
ServiceBusTransactionContext transactionContext) {
private Mono<Void> sendFluxInternal(ServiceBusMessage message, ServiceBusTransactionContext transactionContext) {
if (isDisposed.get()) {
return monoError(logger,
new IllegalStateException(String.format(INVALID_OPERATION_DISPOSED_SENDER, "sendMessage")));
}

final List<ServiceBusMessage> messageList = Collections.singletonList(message);

// Uses getLinkSize() intentionally — this path is for single-message sends only (sendMessage()).
// Single messages should use the full link capacity (up to 100 MB on Premium large-message),
// not the batch-size cap. The batch path (sendMessages(Iterable), scheduleMessages(Iterable))
Expand All @@ -903,14 +914,27 @@ private Mono<Void> sendFluxInternal(Flux<ServiceBusMessage> messages,
final int batchSize = size > 0 ? size : MAX_MESSAGE_LENGTH_BYTES;
final CreateMessageBatchOptions batchOptions
= new CreateMessageBatchOptions().setMaximumSizeInBytes(batchSize);
return messages.collect(
new AmqpMessageCollector(isV2, batchOptions, 1, link::getErrorContext, tracer, messageSerializer));
return Flux.fromIterable(messageList)
.collect(new AmqpMessageCollector(isV2, batchOptions, 1, link::getErrorContext, tracer,
messageSerializer));
}));

return batchList.flatMap(list -> Flux.fromIterable(list)
.flatMap(batch -> sendBatchInternal(batch, transactionContext))
// The raw send pipeline, without span instrumentation. Instrumentation is applied as the
// outermost operator below so the span is created on the subscribing thread (instrument=false here).
final Mono<Void> send = batchList.flatMap(list -> Flux.fromIterable(list)
.flatMap(batch -> sendBatchInternal(batch, transactionContext, false))
.then()
.doOnError(error -> logger.error("Error sending batch.", error))).onErrorMap(this::mapError);

// Start the producer message span and the "ServiceBus.send" span on the subscribing (caller) thread,
// BEFORE the first send triggers AMQP connection/link establishment on a background thread. Starting
// the spans here ensures the caller's ambient (thread-local) trace context is used as the parent.
// Doing this lazily downstream (after the connection thread hop) is what caused the first send to get
// a new, disconnected trace id. See https://github.com/Azure/azure-sdk-for-java/issues/44958.
return Mono.defer(() -> {
tracer.reportMessageSpan(message);
return instrumentation.instrumentSendBatch("ServiceBus.send", send, messageList);
});
}

private Mono<AmqpSendLink> getSendLink(String callSite) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,9 @@
import io.opentelemetry.api.trace.SpanContext;
import io.opentelemetry.api.trace.SpanKind;
import io.opentelemetry.api.trace.StatusCode;
import io.opentelemetry.api.trace.Tracer;
import io.opentelemetry.context.Context;
import io.opentelemetry.context.Scope;
import io.opentelemetry.sdk.OpenTelemetrySdk;
import io.opentelemetry.sdk.trace.IdGenerator;
import io.opentelemetry.sdk.trace.ReadWriteSpan;
Expand Down Expand Up @@ -141,6 +143,47 @@ public void sendAndReceive() throws InterruptedException {
assertSettledVsProcessed(completed, processed, messages.size());
}

@Test
@SuppressWarnings("try")
public void sendMessageHasParentSpanOnFirstCall() {
// Regression test for https://github.com/Azure/azure-sdk-for-java/issues/44958
// The FIRST sendMessage() on a freshly built sender establishes the AMQP connection on a
// background thread. Ensure the "ServiceBus.send" and "ServiceBus.message" spans still inherit
// the caller's ambient (thread-local) trace context as the parent, rather than starting a new trace.
// Create the caller span from a SEPARATE OpenTelemetry instance so it is not validated by the
// Service-Bus-specific TestSpanProcessor (which requires az.namespace/messaging attributes on every
// span it sees). It is still made current on this thread, so the SDK picks it up as the parent via
// io.opentelemetry.context.Context.current().
Tracer testTracer = OpenTelemetrySdk.builder().build().getTracer("test");
Span callerSpan = testTracer.spanBuilder("caller").startSpan();
String expectedTraceId = callerSpan.getSpanContext().getTraceId();

ServiceBusMessage message = new ServiceBusMessage(CONTENTS_BYTES);
try (Scope scope = callerSpan.makeCurrent()) {
StepVerifier.create(sender.sendMessage(message)).expectComplete().verify(TIMEOUT);
} finally {
callerSpan.end();
}

List<ReadableSpan> spans = spanProcessor.getEndedSpans();

// Exactly one send span and one message span should be produced for a single sendMessage() call.
// An exact-count assertion guards against accidental double-instrumentation regressions.
List<ReadableSpan> send = findSpans(spans, "ServiceBus.send");
assertEquals(1, send.size());
assertEquals(expectedTraceId, send.get(0).getSpanContext().getTraceId());
assertEquals(expectedTraceId, send.get(0).getParentSpanContext().getTraceId());

List<ReadableSpan> messageSpans = findSpans(spans, "ServiceBus.message");
assertEquals(1, messageSpans.size());
assertMessageSpan(messageSpans.get(0), message);
assertEquals(expectedTraceId, messageSpans.get(0).getSpanContext().getTraceId());

// the traceparent injected into the outgoing message carries the caller's trace id
String traceparent = (String) message.getApplicationProperties().get("traceparent");
assertTrue(traceparent.startsWith("00-" + expectedTraceId));
}

@Test
public void receiveAndRenewLockWithDuration() throws InterruptedException {
ServiceBusMessage message = new ServiceBusMessage(CONTENTS_BYTES);
Expand Down