Skip to content

Commit ac577e9

Browse files
authored
Optional io.micrometer:context-propagation (#8556)
For better performance by default it is better to not pull a `io.micrometer:context-propagation` a hard dependency. * Remove `io.micrometer:context-propagation` dependency management * It is pulled transitively by the `io.micrometer:micrometer-tracing-integration-test` in test scope * Rework all the `ContextSnapshot` usage in the reactive code to respective recommended `handle()` API in `Flux` and `Mono`
1 parent b482b00 commit ac577e9

File tree

4 files changed

+34
-40
lines changed

4 files changed

+34
-40
lines changed

build.gradle

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -87,7 +87,6 @@ ext {
8787
lettuceVersion = '6.2.2.RELEASE'
8888
log4jVersion = '2.19.0'
8989
mailVersion = '1.0.0'
90-
micrometerPropagationVersion = '1.0.2'
9190
micrometerTracingVersion = '1.0.2'
9291
micrometerVersion = '1.10.4'
9392
mockitoVersion = '4.10.0'
@@ -533,7 +532,6 @@ project('spring-integration-core') {
533532
}
534533
api 'io.projectreactor:reactor-core'
535534
api 'io.micrometer:micrometer-observation'
536-
api "io.micrometer:context-propagation:$micrometerPropagationVersion"
537535

538536
optionalApi 'com.fasterxml.jackson.core:jackson-databind'
539537
optionalApi 'com.fasterxml.jackson.datatype:jackson-datatype-jdk8'

spring-integration-core/src/main/java/org/springframework/integration/channel/FluxMessageChannel.java

Lines changed: 12 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,6 @@
2020
import java.util.concurrent.TimeUnit;
2121
import java.util.concurrent.locks.LockSupport;
2222

23-
import io.micrometer.context.ContextSnapshot;
2423
import org.reactivestreams.Publisher;
2524
import org.reactivestreams.Subscriber;
2625
import reactor.core.Disposable;
@@ -112,19 +111,18 @@ public void subscribeTo(Publisher<? extends Message<?>> publisher) {
112111
Flux.from(publisher)
113112
.delaySubscription(this.subscribedSignal.asFlux().filter(Boolean::booleanValue).next())
114113
.publishOn(this.scheduler)
115-
.transformDeferredContextual((flux, contextView) ->
116-
flux.doOnNext((message) -> {
117-
var scope = ContextSnapshot.setAllThreadLocalsFrom(contextView);
118-
try (scope) {
119-
if (!send(message)) {
120-
throw new MessageDeliveryException(message,
121-
"Failed to send message to channel '" + this);
122-
}
123-
}
124-
catch (Exception ex) {
125-
logger.warn(ex, () -> "Error during processing event: " + message);
126-
}
127-
}))
114+
.handle((message, synchronousSink) -> {
115+
try {
116+
if (!send(message)) {
117+
logger.warn(new MessageDeliveryException(message,
118+
"Failed to send message to channel '" + this),
119+
"Message was not delivered");
120+
}
121+
}
122+
catch (Exception ex) {
123+
logger.warn(ex, () -> "Error during processing event: " + message);
124+
}
125+
})
128126
.contextCapture()
129127
.subscribe());
130128
}

spring-integration-core/src/main/java/org/springframework/integration/gateway/MessagingGatewaySupport.java

Lines changed: 8 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,6 @@
2020
import java.util.Set;
2121
import java.util.concurrent.ConcurrentHashMap;
2222

23-
import io.micrometer.context.ContextSnapshot;
2423
import io.micrometer.observation.ObservationRegistry;
2524
import org.reactivestreams.Publisher;
2625
import org.reactivestreams.Subscriber;
@@ -721,7 +720,7 @@ private Mono<Message<?>> doSendAndReceiveMessageReactive(MessageChannel requestC
721720
throw new MessageMappingException("Cannot map to message: " + object, e);
722721
}
723722

724-
return Mono.deferContextual(contextView -> {
723+
return Mono.defer(() -> {
725724
Object originalReplyChannelHeader = requestMessage.getHeaders().getReplyChannel();
726725
Object originalErrorChannelHeader = requestMessage.getHeaders().getErrorChannel();
727726

@@ -739,13 +738,13 @@ private Mono<Message<?>> doSendAndReceiveMessageReactive(MessageChannel requestC
739738
.setErrorChannel(replyChan)
740739
.build();
741740

742-
var scope = ContextSnapshot.setAllThreadLocalsFrom(contextView);
743-
try (scope) {
744-
sendMessageForReactiveFlow(requestChannel, messageToSend);
745-
}
746-
747-
return buildReplyMono(requestMessage, replyChan.replyMono.asMono(), error,
748-
originalReplyChannelHeader, originalErrorChannelHeader);
741+
return Mono.just(messageToSend)
742+
.handle((message, synchronousSink) -> {
743+
sendMessageForReactiveFlow(requestChannel, message);
744+
synchronousSink.complete();
745+
})
746+
.then(buildReplyMono(requestMessage, replyChan.replyMono.asMono(), error,
747+
originalReplyChannelHeader, originalErrorChannelHeader));
749748
})
750749
.onErrorResume(t -> error ? Mono.error(t) : handleSendError(requestMessage, t));
751750
}

spring-integration-webflux/src/main/java/org/springframework/integration/webflux/inbound/WebFluxInboundEndpoint.java

Lines changed: 14 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,6 @@
2727
import java.util.function.Supplier;
2828
import java.util.stream.Collectors;
2929

30-
import io.micrometer.context.ContextSnapshot;
3130
import org.reactivestreams.Publisher;
3231
import reactor.core.publisher.Flux;
3332
import reactor.core.publisher.Mono;
@@ -152,20 +151,20 @@ private Mono<Void> doHandle(ServerWebExchange exchange) {
152151
new RequestEntity<>(body, exchange.getRequest().getHeaders(),
153152
exchange.getRequest().getMethod(), exchange.getRequest().getURI()))
154153
.flatMap(entity -> buildMessage(entity, exchange))
155-
.flatMap(requestTuple ->
156-
Mono.deferContextual(contextView -> {
157-
if (isExpectReply()) {
158-
return sendAndReceiveMessageReactive(requestTuple.getT1())
159-
.flatMap(replyMessage -> populateResponse(exchange, replyMessage));
160-
}
161-
else {
162-
var scope = ContextSnapshot.setAllThreadLocalsFrom(contextView);
163-
try (scope) {
164-
send(requestTuple.getT1());
165-
}
166-
return setStatusCode(exchange, requestTuple.getT2());
167-
}
168-
}))
154+
.flatMap(requestTuple -> {
155+
if (isExpectReply()) {
156+
return sendAndReceiveMessageReactive(requestTuple.getT1())
157+
.flatMap(replyMessage -> populateResponse(exchange, replyMessage));
158+
}
159+
else {
160+
return Mono.just(requestTuple.getT1())
161+
.handle((objectMessage, synchronousSink) -> {
162+
send(objectMessage);
163+
synchronousSink.complete();
164+
})
165+
.then(setStatusCode(exchange, requestTuple.getT2()));
166+
}
167+
})
169168
.doOnTerminate(this.activeCount::decrementAndGet);
170169

171170
}

0 commit comments

Comments
 (0)