Skip to content

Commit 40d6543

Browse files
mostroverkhovrobertroeser
authored andcommitted
Requester Channel: Cancel request on response termination (#517)
* Requester Channel: cancel request on response termination * fix some tests which assume port 8080 is available
1 parent 60b8201 commit 40d6543

File tree

5 files changed

+22
-6
lines changed

5 files changed

+22
-6
lines changed

rsocket-core/src/main/java/io/rsocket/RSocketClient.java

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -367,7 +367,10 @@ private Flux<Payload> handleChannel(Flux<Payload> request) {
367367
.doFinally(
368368
s -> {
369369
receivers.remove(streamId);
370-
senders.remove(streamId);
370+
LimitableRequestPublisher sender = senders.remove(streamId);
371+
if (sender != null) {
372+
sender.cancel();
373+
}
371374
});
372375
}));
373376
}

rsocket-core/src/main/java/io/rsocket/fragmentation/FragmentationDuplexConnection.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -68,7 +68,8 @@ public FragmentationDuplexConnection(DuplexConnection delegate, int maxFragmentS
6868
*
6969
* @param byteBufAllocator the {@link ByteBufAllocator} to use
7070
* @param delegate the {@link DuplexConnection} to decorate
71-
* @param maxFragmentSize the maximum fragment size. A value of 0 indicates that frames should not be fragmented.
71+
* @param maxFragmentSize the maximum fragment size. A value of 0 indicates that frames should not
72+
* be fragmented.
7273
* @throws NullPointerException if {@code byteBufAllocator} or {@code delegate} are {@code null}
7374
* @throws IllegalArgumentException if {@code maxFragmentSize} is not {@code positive}
7475
*/

rsocket-core/src/main/java/io/rsocket/internal/SwitchTransform.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -43,8 +43,7 @@ public void subscribe(CoreSubscriber<? super R> actual) {
4343
source.subscribe(new SwitchTransformSubscriber<>(actual, transformer));
4444
}
4545

46-
static final class SwitchTransformSubscriber<T, R>
47-
implements CoreSubscriber<T> {
46+
static final class SwitchTransformSubscriber<T, R> implements CoreSubscriber<T> {
4847
@SuppressWarnings("rawtypes")
4948
static final AtomicIntegerFieldUpdater<SwitchTransformSubscriber> ONCE =
5049
AtomicIntegerFieldUpdater.newUpdater(SwitchTransformSubscriber.class, "once");

rsocket-core/src/test/java/io/rsocket/RSocketClientTest.java

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,7 @@
4848
import reactor.core.publisher.BaseSubscriber;
4949
import reactor.core.publisher.Flux;
5050
import reactor.core.publisher.Mono;
51+
import reactor.core.publisher.MonoProcessor;
5152

5253
public class RSocketClientTest {
5354

@@ -182,6 +183,18 @@ public void testLazyRequestResponse() {
182183
assertThat("Stream ID reused.", streamId2, not(equalTo(streamId)));
183184
}
184185

186+
@Test
187+
public void testChannelRequestCancellation() {
188+
MonoProcessor<Void> cancelled = MonoProcessor.create();
189+
Flux<Payload> request = Flux.<Payload>never().doOnCancel(cancelled::onComplete);
190+
rule.socket.requestChannel(request).subscribe().dispose();
191+
Flux.first(
192+
cancelled,
193+
Flux.error(new IllegalStateException("Channel request not cancelled"))
194+
.delaySubscription(Duration.ofSeconds(1)))
195+
.blockFirst();
196+
}
197+
185198
public int sendRequestResponse(Publisher<Payload> response) {
186199
Subscriber<Payload> sub = TestSubscriber.create();
187200
response.subscribe(sub);

rsocket-transport-netty/src/test/java/io/rsocket/transport/netty/server/WebsocketRouteTransportTest.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -75,7 +75,7 @@ void newHandlerNullAcceptor() {
7575
@Test
7676
void start() {
7777
WebsocketRouteTransport serverTransport =
78-
new WebsocketRouteTransport(HttpServer.create(), routes -> {}, "/test-path");
78+
new WebsocketRouteTransport(HttpServer.create(0), routes -> {}, "/test-path");
7979

8080
serverTransport
8181
.start(duplexConnection -> Mono.empty())
@@ -90,7 +90,7 @@ void startNullAcceptor() {
9090
assertThatNullPointerException()
9191
.isThrownBy(
9292
() ->
93-
new WebsocketRouteTransport(HttpServer.create(), routes -> {}, "/test-path")
93+
new WebsocketRouteTransport(HttpServer.create(0), routes -> {}, "/test-path")
9494
.start(null))
9595
.withMessage("acceptor must not be null");
9696
}

0 commit comments

Comments
 (0)