Skip to content

Commit cadd4e6

Browse files
committed
Upgrade to the latest Reactor & SF
Related to spring-projects/spring-framework#25884 * Don't use `MonoProcessor` & `FluxProcessor` since they are deprecated now * Fix RSocket components to use an `AtomicReference` header instead of deprecated `MonoProcessor` * Introduce an internal `IntegrationRSocketPayloadReturnValueHandler` to handle properly a Spring Integration case for inbound requests with its `IntegrationRSocketEndpoint` implementation * Don't do response handling in the `RSocketInboundGateway` any more since it is deferred now to the `IntegrationRSocketPayloadReturnValueHandler` * Remove internal `ChannelSendOperator` since it is out of use from now on
1 parent 20447df commit cadd4e6

File tree

6 files changed

+83
-503
lines changed

6 files changed

+83
-503
lines changed

spring-integration-core/src/main/java/org/springframework/integration/channel/FluxMessageChannel.java

Lines changed: 8 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,6 @@
2929
import reactor.core.Disposable;
3030
import reactor.core.Disposables;
3131
import reactor.core.publisher.Flux;
32-
import reactor.core.publisher.FluxProcessor;
3332
import reactor.core.publisher.Sinks;
3433
import reactor.core.scheduler.Schedulers;
3534

@@ -46,24 +45,17 @@
4645
public class FluxMessageChannel extends AbstractMessageChannel
4746
implements Publisher<Message<?>>, ReactiveStreamsSubscribableChannel {
4847

49-
private final Sinks.Many<Message<?>> sink;
50-
51-
private final FluxProcessor<Message<?>, Message<?>> processor;
48+
private final Sinks.Many<Message<?>> sink = Sinks.many().multicast().onBackpressureBuffer(1, false);
5249

5350
private final Sinks.Many<Boolean> subscribedSignal = Sinks.many().replay().limit(1);
5451

5552
private final Disposable.Composite upstreamSubscriptions = Disposables.composite();
5653

5754
private volatile boolean active = true;
5855

59-
public FluxMessageChannel() {
60-
this.sink = Sinks.many().multicast().onBackpressureBuffer(1, false);
61-
this.processor = FluxProcessor.fromSink(this.sink);
62-
}
63-
6456
@Override
6557
protected boolean doSend(Message<?> message, long timeout) {
66-
Assert.state(this.active && this.processor.hasDownstreams(),
58+
Assert.state(this.active && this.sink.currentSubscriberCount() > 0,
6759
() -> "The [" + this + "] doesn't have subscribers to accept messages");
6860
long remainingTime = 0;
6961
if (timeout > 0) {
@@ -101,11 +93,12 @@ private boolean tryEmitMessage(Message<?> message) {
10193

10294
@Override
10395
public void subscribe(Subscriber<? super Message<?>> subscriber) {
104-
this.processor
105-
.doFinally((s) -> this.subscribedSignal.tryEmitNext(this.processor.hasDownstreams()))
96+
this.sink.asFlux()
97+
.doFinally((s) -> this.subscribedSignal.tryEmitNext(this.sink.currentSubscriberCount() > 0))
10698
.share()
10799
.subscribe(subscriber);
108-
this.subscribedSignal.tryEmitNext(this.processor.hasDownstreams());
100+
101+
this.subscribedSignal.tryEmitNext(this.sink.currentSubscriberCount() > 0);
109102
}
110103

111104
@Override
@@ -131,9 +124,9 @@ public void subscribeTo(Publisher<? extends Message<?>> publisher) {
131124
@Override
132125
public void destroy() {
133126
this.active = false;
134-
this.subscribedSignal.tryEmitNext(false);
135127
this.upstreamSubscriptions.dispose();
136-
this.processor.onComplete();
128+
this.subscribedSignal.emitComplete(Sinks.EmitFailureHandler.FAIL_FAST);
129+
this.sink.emitComplete(Sinks.EmitFailureHandler.FAIL_FAST);
137130
super.destroy();
138131
}
139132

spring-integration-core/src/main/java/org/springframework/integration/gateway/MessagingGatewaySupport.java

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -67,7 +67,6 @@
6767
import org.springframework.util.Assert;
6868

6969
import reactor.core.publisher.Mono;
70-
import reactor.core.publisher.MonoProcessor;
7170
import reactor.core.publisher.Sinks;
7271

7372
/**
@@ -899,7 +898,10 @@ public boolean send(Message<?> message, long timeout) {
899898

900899
@Override
901900
public void subscribeTo(Publisher<? extends Message<?>> publisher) {
902-
publisher.subscribe(MonoProcessor.fromSink(this.replyMono));
901+
Mono.from(publisher)
902+
.subscribe(
903+
(value) -> this.replyMono.emitValue(value, Sinks.EmitFailureHandler.FAIL_FAST),
904+
this.replyMono::tryEmitError, this.replyMono::tryEmitEmpty);
903905
}
904906

905907
}

spring-integration-core/src/test/java/org/springframework/integration/channel/reactive/FluxMessageChannelTests.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -51,7 +51,6 @@
5151

5252
import reactor.core.Disposable;
5353
import reactor.core.publisher.Flux;
54-
import reactor.core.publisher.FluxProcessor;
5554

5655
/**
5756
* @author Artem Bilan
@@ -141,7 +140,7 @@ void testFluxMessageChannelCleanUp() throws InterruptedException {
141140

142141
flowRegistration.destroy();
143142

144-
assertThat(TestUtils.getPropertyValue(flux, "processor", FluxProcessor.class).isTerminated()).isTrue();
143+
assertThat(TestUtils.getPropertyValue(flux, "sink.sink.done", Boolean.class)).isTrue();
145144
}
146145

147146
@Configuration

spring-integration-rsocket/src/main/java/org/springframework/integration/rsocket/IntegrationRSocketMessageHandler.java

Lines changed: 59 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,24 +17,35 @@
1717
package org.springframework.integration.rsocket;
1818

1919
import java.lang.reflect.Method;
20+
import java.util.ArrayList;
2021
import java.util.Arrays;
2122
import java.util.Collection;
2223
import java.util.Collections;
2324
import java.util.List;
25+
import java.util.concurrent.atomic.AtomicReference;
2426

2527
import org.springframework.context.ApplicationContext;
2628
import org.springframework.core.MethodParameter;
29+
import org.springframework.core.ReactiveAdapterRegistry;
30+
import org.springframework.core.codec.Encoder;
31+
import org.springframework.lang.Nullable;
2732
import org.springframework.messaging.Message;
2833
import org.springframework.messaging.ReactiveMessageHandler;
2934
import org.springframework.messaging.handler.CompositeMessageCondition;
3035
import org.springframework.messaging.handler.DestinationPatternsMessageCondition;
3136
import org.springframework.messaging.handler.invocation.reactive.HandlerMethodArgumentResolver;
37+
import org.springframework.messaging.handler.invocation.reactive.HandlerMethodReturnValueHandler;
3238
import org.springframework.messaging.handler.invocation.reactive.SyncHandlerMethodArgumentResolver;
3339
import org.springframework.messaging.rsocket.annotation.support.RSocketFrameTypeMessageCondition;
3440
import org.springframework.messaging.rsocket.annotation.support.RSocketMessageHandler;
41+
import org.springframework.messaging.rsocket.annotation.support.RSocketPayloadReturnValueHandler;
42+
import org.springframework.util.Assert;
3543
import org.springframework.util.ReflectionUtils;
3644

45+
import io.rsocket.Payload;
3746
import io.rsocket.frame.FrameType;
47+
import reactor.core.publisher.Flux;
48+
import reactor.core.publisher.Mono;
3849

3950
/**
4051
* The {@link RSocketMessageHandler} extension for Spring Integration needs.
@@ -109,6 +120,23 @@ protected List<? extends HandlerMethodArgumentResolver> initArgumentResolvers()
109120
}
110121
}
111122

123+
@Override
124+
@SuppressWarnings("unchecked")
125+
protected List<? extends HandlerMethodReturnValueHandler> initReturnValueHandlers() {
126+
HandlerMethodReturnValueHandler integrationRSocketPayloadReturnValueHandler =
127+
new IntegrationRSocketPayloadReturnValueHandler((List<Encoder<?>>) getEncoders(),
128+
getReactiveAdapterRegistry());
129+
if (this.messageMappingCompatible) {
130+
List<HandlerMethodReturnValueHandler> handlers = new ArrayList<>();
131+
handlers.add(integrationRSocketPayloadReturnValueHandler);
132+
handlers.addAll(getReturnValueHandlerConfigurer().getCustomHandlers());
133+
return handlers;
134+
}
135+
else {
136+
return Collections.singletonList(integrationRSocketPayloadReturnValueHandler);
137+
}
138+
}
139+
112140
protected static final class MessageHandlerMethodArgumentResolver implements SyncHandlerMethodArgumentResolver {
113141

114142
@Override
@@ -123,4 +151,35 @@ public Object resolveArgumentValue(MethodParameter parameter, Message<?> message
123151

124152
}
125153

154+
protected static final class IntegrationRSocketPayloadReturnValueHandler extends RSocketPayloadReturnValueHandler {
155+
156+
protected IntegrationRSocketPayloadReturnValueHandler(List<Encoder<?>> encoders,
157+
ReactiveAdapterRegistry registry) {
158+
159+
super(encoders, registry);
160+
}
161+
162+
@Override public Mono<Void> handleReturnValue(@Nullable Object returnValue, MethodParameter returnType,
163+
Message<?> message) {
164+
165+
AtomicReference<Flux<Payload>> responseReference = getResponseReference(message);
166+
167+
if (returnValue == null && responseReference != null) {
168+
return super.handleReturnValue(responseReference.get(), returnType, message);
169+
}
170+
else {
171+
return super.handleReturnValue(returnValue, returnType, message);
172+
}
173+
}
174+
175+
@Nullable
176+
@SuppressWarnings("unchecked")
177+
private static AtomicReference<Flux<Payload>> getResponseReference(Message<?> message) {
178+
Object headerValue = message.getHeaders().get(RESPONSE_HEADER);
179+
Assert.state(headerValue == null || headerValue instanceof AtomicReference, "Expected AtomicReference");
180+
return (AtomicReference<Flux<Payload>>) headerValue;
181+
}
182+
183+
}
184+
126185
}

0 commit comments

Comments
 (0)