Skip to content

Commit b75d68f

Browse files
mostroverkhovyschimke
authored andcommitted
Server setup rejection support (#530)
1 parent 08705ea commit b75d68f

File tree

7 files changed

+494
-158
lines changed

7 files changed

+494
-158
lines changed

rsocket-core/src/main/java/io/rsocket/RSocketClient.java

Lines changed: 209 additions & 152 deletions
Large diffs are not rendered by default.

rsocket-core/src/main/java/io/rsocket/RSocketFactory.java

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
package io.rsocket;
1818

1919
import io.rsocket.exceptions.InvalidSetupException;
20+
import io.rsocket.exceptions.RejectedSetupException;
2021
import io.rsocket.fragmentation.FragmentationDuplexConnection;
2122
import io.rsocket.frame.SetupFrameFlyweight;
2223
import io.rsocket.frame.VersionFlyweight;
@@ -353,6 +354,12 @@ private Mono<Void> processSetupFrame(
353354

354355
return acceptor
355356
.accept(setupPayload, wrappedRSocketClient)
357+
.onErrorResume(
358+
err ->
359+
multiplexer
360+
.asStreamZeroConnection()
361+
.sendOne(rejectedSetupErrorFrame(err))
362+
.then(Mono.error(err)))
356363
.doOnNext(
357364
unwrappedServerSocket -> {
358365
RSocket wrappedRSocketServer = plugins.applyServer(unwrappedServerSocket);
@@ -369,6 +376,12 @@ private Mono<Void> processSetupFrame(
369376
.doFinally(signalType -> setupPayload.release())
370377
.then();
371378
}
379+
380+
private Frame rejectedSetupErrorFrame(Throwable err) {
381+
String msg = err.getMessage();
382+
return Frame.Error.from(
383+
0, new RejectedSetupException(msg == null ? "rejected by server acceptor" : msg));
384+
}
372385
}
373386
}
374387
}
Lines changed: 148 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,148 @@
1+
package io.rsocket;
2+
3+
import static io.rsocket.transport.ServerTransport.*;
4+
import static org.assertj.core.api.Assertions.*;
5+
6+
import io.rsocket.exceptions.Exceptions;
7+
import io.rsocket.exceptions.RejectedSetupException;
8+
import io.rsocket.framing.FrameType;
9+
import io.rsocket.test.util.TestDuplexConnection;
10+
import io.rsocket.transport.ServerTransport;
11+
import io.rsocket.util.DefaultPayload;
12+
import java.time.Duration;
13+
import java.util.ArrayList;
14+
import java.util.List;
15+
import org.junit.jupiter.api.Test;
16+
import reactor.core.publisher.Mono;
17+
import reactor.core.publisher.UnicastProcessor;
18+
import reactor.test.StepVerifier;
19+
20+
public class SetupRejectionTest {
21+
22+
@Test
23+
void responderRejectSetup() {
24+
SingleConnectionTransport transport = new SingleConnectionTransport();
25+
26+
String errorMsg = "error";
27+
RejectingAcceptor acceptor = new RejectingAcceptor(errorMsg);
28+
RSocketFactory.receive().acceptor(acceptor).transport(transport).start().block();
29+
30+
transport.connect();
31+
32+
Frame sentFrame = transport.awaitSent();
33+
assertThat(sentFrame.getType()).isEqualTo(FrameType.ERROR);
34+
RuntimeException error = Exceptions.from(sentFrame);
35+
assertThat(errorMsg).isEqualTo(error.getMessage());
36+
assertThat(error).isInstanceOf(RejectedSetupException.class);
37+
RSocket acceptorSender = acceptor.senderRSocket().block();
38+
assertThat(acceptorSender.isDisposed()).isTrue();
39+
}
40+
41+
@Test
42+
void requesterStreamsTerminatedOnZeroErrorFrame() {
43+
TestDuplexConnection conn = new TestDuplexConnection();
44+
List<Throwable> errors = new ArrayList<>();
45+
RSocketClient rSocket =
46+
new RSocketClient(
47+
conn, DefaultPayload::create, errors::add, StreamIdSupplier.clientSupplier());
48+
49+
String errorMsg = "error";
50+
51+
Mono.delay(Duration.ofMillis(100))
52+
.doOnTerminate(
53+
() ->
54+
conn.addToReceivedBuffer(Frame.Error.from(0, new RejectedSetupException(errorMsg))))
55+
.subscribe();
56+
57+
StepVerifier.create(rSocket.requestResponse(DefaultPayload.create("test")))
58+
.expectErrorMatches(
59+
err -> err instanceof RejectedSetupException && errorMsg.equals(err.getMessage()))
60+
.verify(Duration.ofSeconds(5));
61+
62+
assertThat(errors).hasSize(1);
63+
assertThat(rSocket.isDisposed()).isTrue();
64+
}
65+
66+
@Test
67+
void requesterNewStreamsTerminatedAfterZeroErrorFrame() {
68+
TestDuplexConnection conn = new TestDuplexConnection();
69+
RSocketClient rSocket =
70+
new RSocketClient(
71+
conn, DefaultPayload::create, err -> {}, StreamIdSupplier.clientSupplier());
72+
73+
conn.addToReceivedBuffer(Frame.Error.from(0, new RejectedSetupException("error")));
74+
75+
StepVerifier.create(
76+
rSocket
77+
.requestResponse(DefaultPayload.create("test"))
78+
.delaySubscription(Duration.ofMillis(100)))
79+
.expectErrorMatches(
80+
err -> err instanceof RejectedSetupException && "error".equals(err.getMessage()))
81+
.verify(Duration.ofSeconds(5));
82+
}
83+
84+
private static class RejectingAcceptor implements SocketAcceptor {
85+
private final String errorMessage;
86+
87+
public RejectingAcceptor(String errorMessage) {
88+
this.errorMessage = errorMessage;
89+
}
90+
91+
private final UnicastProcessor<RSocket> senderRSockets = UnicastProcessor.create();
92+
93+
@Override
94+
public Mono<RSocket> accept(ConnectionSetupPayload setup, RSocket sendingSocket) {
95+
senderRSockets.onNext(sendingSocket);
96+
return Mono.error(new RuntimeException(errorMessage));
97+
}
98+
99+
public Mono<RSocket> senderRSocket() {
100+
return senderRSockets.next();
101+
}
102+
}
103+
104+
private static class SingleConnectionTransport implements ServerTransport<TestCloseable> {
105+
106+
private final TestDuplexConnection conn = new TestDuplexConnection();
107+
108+
@Override
109+
public Mono<TestCloseable> start(ConnectionAcceptor acceptor) {
110+
return Mono.just(new TestCloseable(acceptor, conn));
111+
}
112+
113+
public Frame awaitSent() {
114+
try {
115+
return conn.awaitSend();
116+
} catch (InterruptedException e) {
117+
throw new RuntimeException(e);
118+
}
119+
}
120+
121+
public void connect() {
122+
Frame setup =
123+
Frame.Setup.from(
124+
0, 42, 1, "mdMime", "dMime", DefaultPayload.create(DefaultPayload.EMPTY_BUFFER));
125+
conn.addToReceivedBuffer(setup);
126+
}
127+
}
128+
129+
private static class TestCloseable implements Closeable {
130+
131+
private final DuplexConnection conn;
132+
133+
TestCloseable(ConnectionAcceptor acceptor, DuplexConnection conn) {
134+
this.conn = conn;
135+
Mono.from(acceptor.apply(conn)).subscribe(notUsed -> {}, err -> conn.dispose());
136+
}
137+
138+
@Override
139+
public Mono<Void> onClose() {
140+
return conn.onClose();
141+
}
142+
143+
@Override
144+
public void dispose() {
145+
conn.dispose();
146+
}
147+
}
148+
}

rsocket-transport-netty/build.gradle

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@ dependencies {
3131
testImplementation 'io.projectreactor:reactor-test'
3232
testImplementation 'org.assertj:assertj-core'
3333
testImplementation 'org.junit.jupiter:junit-jupiter-api'
34+
testImplementation 'org.junit.jupiter:junit-jupiter-params'
3435

3536
testRuntimeOnly 'ch.qos.logback:logback-classic'
3637
testRuntimeOnly 'org.junit.jupiter:junit-jupiter-engine'

rsocket-transport-netty/src/main/java/io/rsocket/transport/netty/server/TcpServerTransport.java

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -98,9 +98,7 @@ public Mono<NettyContextCloseable> start(ConnectionAcceptor acceptor) {
9898
(in, out) -> {
9999
in.context().addHandler(new RSocketLengthCodec());
100100
TcpDuplexConnection connection = new TcpDuplexConnection(in, out, in.context());
101-
acceptor.apply(connection).subscribe();
102-
103-
return out.neverComplete();
101+
return acceptor.apply(connection).then(out.neverComplete());
104102
})
105103
.map(NettyContextCloseable::new);
106104
}

rsocket-transport-netty/src/main/java/io/rsocket/transport/netty/server/WebsocketRouteTransport.java

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -83,9 +83,7 @@ static BiFunction<WebsocketInbound, WebsocketOutbound, Publisher<Void>> newHandl
8383

8484
return (in, out) -> {
8585
WebsocketDuplexConnection connection = new WebsocketDuplexConnection(in, out, in.context());
86-
acceptor.apply(connection).subscribe();
87-
88-
return out.neverComplete();
86+
return acceptor.apply(connection).then(out.neverComplete());
8987
};
9088
}
9189
}
Lines changed: 121 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,121 @@
1+
package io.rsocket.transport.netty;
2+
3+
import io.rsocket.ConnectionSetupPayload;
4+
import io.rsocket.RSocket;
5+
import io.rsocket.RSocketFactory;
6+
import io.rsocket.SocketAcceptor;
7+
import io.rsocket.exceptions.RejectedSetupException;
8+
import io.rsocket.transport.ClientTransport;
9+
import io.rsocket.transport.ServerTransport;
10+
import io.rsocket.transport.netty.client.TcpClientTransport;
11+
import io.rsocket.transport.netty.client.WebsocketClientTransport;
12+
import io.rsocket.transport.netty.server.NettyContextCloseable;
13+
import io.rsocket.transport.netty.server.TcpServerTransport;
14+
import io.rsocket.transport.netty.server.WebsocketServerTransport;
15+
import io.rsocket.util.DefaultPayload;
16+
import java.net.InetSocketAddress;
17+
import java.time.Duration;
18+
import java.util.function.Consumer;
19+
import java.util.function.Function;
20+
import java.util.stream.Stream;
21+
import org.junit.jupiter.api.DisplayName;
22+
import org.junit.jupiter.params.ParameterizedTest;
23+
import org.junit.jupiter.params.provider.Arguments;
24+
import org.junit.jupiter.params.provider.MethodSource;
25+
import reactor.core.publisher.EmitterProcessor;
26+
import reactor.core.publisher.Flux;
27+
import reactor.core.publisher.Mono;
28+
import reactor.test.StepVerifier;
29+
30+
public class SetupRejectionTest {
31+
32+
@DisplayName(
33+
"Rejecting setup by server causes requester RSocket disposal and RejectedSetupException")
34+
@ParameterizedTest
35+
@MethodSource(value = "transports")
36+
void rejectSetupTcp(
37+
Function<InetSocketAddress, ServerTransport<NettyContextCloseable>> serverTransport,
38+
Function<InetSocketAddress, ClientTransport> clientTransport) {
39+
40+
String errorMessage = "error";
41+
RejectingAcceptor acceptor = new RejectingAcceptor(errorMessage);
42+
Mono<RSocket> serverRequester = acceptor.requesterRSocket();
43+
44+
NettyContextCloseable nettyCtx =
45+
RSocketFactory.receive()
46+
.acceptor(acceptor)
47+
.transport(serverTransport.apply(new InetSocketAddress(0)))
48+
.start()
49+
.block();
50+
51+
ErrorConsumer errorConsumer = new ErrorConsumer();
52+
53+
RSocket clientRequester =
54+
RSocketFactory.connect()
55+
.errorConsumer(errorConsumer)
56+
.transport(clientTransport.apply(nettyCtx.address()))
57+
.start()
58+
.block();
59+
60+
StepVerifier.create(errorConsumer.errors().next())
61+
.expectNextMatches(
62+
err -> err instanceof RejectedSetupException && errorMessage.equals(err.getMessage()))
63+
.expectComplete()
64+
.verify(Duration.ofSeconds(5));
65+
66+
StepVerifier.create(clientRequester.onClose()).expectComplete().verify(Duration.ofSeconds(5));
67+
StepVerifier.create(serverRequester.flatMap(RSocket::onClose))
68+
.expectComplete()
69+
.verify(Duration.ofSeconds(5));
70+
71+
StepVerifier.create(clientRequester.requestResponse(DefaultPayload.create("test")))
72+
.expectErrorMatches(
73+
err -> err instanceof RejectedSetupException && errorMessage.equals(err.getMessage()))
74+
.verify(Duration.ofSeconds(5));
75+
76+
nettyCtx.dispose();
77+
}
78+
79+
static Stream<Arguments> transports() {
80+
Function<InetSocketAddress, ServerTransport<NettyContextCloseable>> tcpServer =
81+
TcpServerTransport::create;
82+
Function<InetSocketAddress, ServerTransport<NettyContextCloseable>> wsServer =
83+
WebsocketServerTransport::create;
84+
Function<InetSocketAddress, ClientTransport> tcpClient = TcpClientTransport::create;
85+
Function<InetSocketAddress, ClientTransport> wsClient = WebsocketClientTransport::create;
86+
87+
return Stream.of(Arguments.of(tcpServer, tcpClient), Arguments.of(wsServer, wsClient));
88+
}
89+
90+
static class ErrorConsumer implements Consumer<Throwable> {
91+
private final EmitterProcessor<Throwable> errors = EmitterProcessor.create();
92+
93+
@Override
94+
public void accept(Throwable t) {
95+
errors.onNext(t);
96+
}
97+
98+
Flux<Throwable> errors() {
99+
return errors;
100+
}
101+
}
102+
103+
private static class RejectingAcceptor implements SocketAcceptor {
104+
private final String msg;
105+
private final EmitterProcessor<RSocket> requesters = EmitterProcessor.create();
106+
107+
public RejectingAcceptor(String msg) {
108+
this.msg = msg;
109+
}
110+
111+
@Override
112+
public Mono<RSocket> accept(ConnectionSetupPayload setup, RSocket sendingSocket) {
113+
requesters.onNext(sendingSocket);
114+
return Mono.error(new RuntimeException(msg));
115+
}
116+
117+
public Mono<RSocket> requesterRSocket() {
118+
return requesters.next();
119+
}
120+
}
121+
}

0 commit comments

Comments
 (0)