Skip to content

Commit 19b59bb

Browse files
committed
Upgrade to Reactor 2020.0.0-RC1
* Handle `Emission.FAIL_ZERO_SUBSCRIBER` in the `FluxMessageChannel` and `IntegrationReactiveUtils`
1 parent 5bac9bf commit 19b59bb

File tree

3 files changed

+7
-1
lines changed

3 files changed

+7
-1
lines changed

build.gradle

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -90,7 +90,7 @@ ext {
9090
pahoMqttClientVersion = '1.2.4'
9191
postgresVersion = '42.2.14'
9292
r2dbch2Version='0.8.4.RELEASE'
93-
reactorVersion = '2020.0.0-SNAPSHOT'
93+
reactorVersion = '2020.0.0-RC1'
9494
resilience4jVersion = '1.5.0'
9595
romeToolsVersion = '1.12.2'
9696
rsocketVersion = '1.1.0-SNAPSHOT'

spring-integration-core/src/main/java/org/springframework/integration/channel/FluxMessageChannel.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -88,6 +88,8 @@ private boolean tryEmitMessage(Message<?> message) {
8888
case FAIL_NON_SERIALIZED:
8989
case FAIL_OVERFLOW:
9090
return false;
91+
case FAIL_ZERO_SUBSCRIBER:
92+
throw new IllegalStateException("The [" + this + "] doesn't have subscribers to accept messages");
9193
case FAIL_TERMINATED:
9294
case FAIL_CANCELLED:
9395
throw new IllegalStateException("Cannot emit messages into the cancelled or terminated sink: "
@@ -101,6 +103,7 @@ private boolean tryEmitMessage(Message<?> message) {
101103
public void subscribe(Subscriber<? super Message<?>> subscriber) {
102104
this.processor
103105
.doFinally((s) -> this.subscribedSignal.tryEmitNext(this.processor.hasDownstreams()))
106+
.share()
104107
.subscribe(subscriber);
105108
this.subscribedSignal.tryEmitNext(this.processor.hasDownstreams());
106109
}

spring-integration-core/src/main/java/org/springframework/integration/util/IntegrationReactiveUtils.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -135,6 +135,9 @@ private static <T> Flux<Message<T>> adaptSubscribableChannelToPublisher(Subscrib
135135
case FAIL_OVERFLOW:
136136
LockSupport.parkNanos(1000); // NOSONAR
137137
break;
138+
case FAIL_ZERO_SUBSCRIBER:
139+
throw new IllegalStateException("The [" + sink +
140+
"] doesn't have subscribers to accept messages");
138141
case FAIL_TERMINATED:
139142
case FAIL_CANCELLED:
140143
throw new IllegalStateException("Cannot emit messages into the cancelled " +

0 commit comments

Comments
 (0)