Skip to content

Commit e649480

Browse files
artembilangaryrussell
authored andcommitted
Fix IntegrationReactiveUtils
The `Mono.doOnSuccess()` is always called for completed `MonoSink` even if the value is `null` * Fix `IntegrationReactiveUtils.messageSourceToFlux()` to check a message for `null` before calling `AckUtils.autoAck()` * Add an `logger.error()` message when `doOnError()` is invoked for that `Mono` **Cherry-pick to 5.3.x**
1 parent d0cab67 commit e649480

File tree

1 file changed

+11
-4
lines changed

1 file changed

+11
-4
lines changed

spring-integration-core/src/main/java/org/springframework/integration/util/IntegrationReactiveUtils.java

Lines changed: 11 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,8 @@
1919
import java.time.Duration;
2020
import java.util.function.Function;
2121

22+
import org.apache.commons.logging.Log;
23+
import org.apache.commons.logging.LogFactory;
2224
import org.reactivestreams.Publisher;
2325

2426
import org.springframework.integration.StaticMessageHeaderAccessor;
@@ -45,6 +47,8 @@
4547
*/
4648
public final class IntegrationReactiveUtils {
4749

50+
private static final Log logger = LogFactory.getLog(IntegrationReactiveUtils.class);
51+
4852
/**
4953
* The subscriber context entry for {@link Flux#delayElements}
5054
* from the {@link Mono#repeatWhenEmpty(Function)}.
@@ -75,16 +79,19 @@ private IntegrationReactiveUtils() {
7579
public static <T> Flux<Message<T>> messageSourceToFlux(MessageSource<T> messageSource) {
7680
return Mono.
7781
<Message<T>>create(monoSink ->
78-
monoSink.onRequest(value ->
79-
monoSink.success(messageSource.receive())))
80-
.doOnSuccess((message) ->
81-
AckUtils.autoAck(StaticMessageHeaderAccessor.getAcknowledgmentCallback(message)))
82+
monoSink.onRequest(value -> monoSink.success(messageSource.receive())))
83+
.doOnSuccess((message) -> {
84+
if (message != null) {
85+
AckUtils.autoAck(StaticMessageHeaderAccessor.getAcknowledgmentCallback(message));
86+
}
87+
})
8288
.doOnError(MessagingException.class,
8389
(ex) -> {
8490
Message<?> failedMessage = ex.getFailedMessage();
8591
if (failedMessage != null) {
8692
AckUtils.autoNack(StaticMessageHeaderAccessor.getAcknowledgmentCallback(failedMessage));
8793
}
94+
logger.error("Error from Flux for : " + messageSource, ex);
8895
})
8996
.subscribeOn(Schedulers.boundedElastic())
9097
.repeatWhenEmpty((repeat) ->

0 commit comments

Comments
 (0)