Skip to content

Commit 6200b1a

Browse files
authored
Merge pull request #925 from rstoyanchev/rsocketclient
RSocketClient refactoring
2 parents 39be15d + 13bb818 commit 6200b1a

File tree

10 files changed

+178
-162
lines changed

10 files changed

+178
-162
lines changed

rsocket-core/src/main/java/io/rsocket/RSocketClient.java

Lines changed: 0 additions & 97 deletions
This file was deleted.

rsocket-core/src/main/java/io/rsocket/core/DefaultRSocketClient.java

Lines changed: 20 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,24 @@
1+
/*
2+
* Copyright 2015-2020 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
116
package io.rsocket.core;
217

318
import io.netty.util.IllegalReferenceCountException;
419
import io.netty.util.ReferenceCounted;
520
import io.rsocket.Payload;
621
import io.rsocket.RSocket;
7-
import io.rsocket.RSocketClient;
822
import io.rsocket.frame.FrameType;
923
import java.util.AbstractMap;
1024
import java.util.Map;
@@ -57,7 +71,11 @@ class DefaultRSocketClient extends ResolvingOperator<RSocket>
5771
AtomicReferenceFieldUpdater.newUpdater(DefaultRSocketClient.class, Subscription.class, "s");
5872

5973
DefaultRSocketClient(Mono<RSocket> source) {
60-
this.source = source;
74+
this.source = unwrapReconnectMono(source);
75+
}
76+
77+
private Mono<RSocket> unwrapReconnectMono(Mono<RSocket> source) {
78+
return source instanceof ReconnectMono ? ((ReconnectMono<RSocket>) source).getSource() : source;
6179
}
6280

6381
@Override
Lines changed: 137 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,137 @@
1+
/*
2+
* Copyright 2015-2020 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package io.rsocket.core;
17+
18+
import io.rsocket.Payload;
19+
import io.rsocket.RSocket;
20+
import org.reactivestreams.Publisher;
21+
import reactor.core.Disposable;
22+
import reactor.core.publisher.Flux;
23+
import reactor.core.publisher.Mono;
24+
25+
/**
26+
* Contract for performing RSocket requests.
27+
*
28+
* <p>{@link RSocketClient} differs from {@link RSocket} in a number of ways:
29+
*
30+
* <ul>
31+
* <li>{@code RSocket} represents a "live" connection that is transient and needs to be obtained
32+
* typically from a {@code Mono<RSocket>} source via {@code flatMap} or block. By contrast,
33+
* {@code RSocketClient} is a higher level layer that contains such a {@link #source() source}
34+
* of connections and transparently obtains and re-obtains a shared connection as needed when
35+
* requests are made concurrently. That means an {@code RSocketClient} can simply be created
36+
* once, even before a connection is established, and shared as a singleton across multiple
37+
* places as you would with any other client.
38+
* <li>For request input {@code RSocket} accepts an instance of {@code Payload} and does not allow
39+
* more than one subscription per request because there is no way to safely re-use that input.
40+
* By contrast {@code RSocketClient} accepts {@code Publisher<Payload>} and allow
41+
* re-subscribing which repeats the request.
42+
* <li>{@code RSocket} can be used for sending and it can also be implemented for receiving. By
43+
* contrast {@code RSocketClient} is used only for sending, typically from the client side
44+
* which allows obtaining and re-obtaining connections from a source as needed. However it can
45+
* also be used from the server side by {@link #from(RSocket) wrapping} the "live" {@code
46+
* RSocket} for a given connection.
47+
* </ul>
48+
*
49+
* <p>The example below shows how to create an {@code RSocketClient}:
50+
*
51+
* <pre class="code">{@code
52+
* Mono<RSocket> source =
53+
* RSocketConnector.create()
54+
* .metadataMimeType("message/x.rsocket.composite-metadata.v0")
55+
* .dataMimeType("application/cbor")
56+
* .connect(TcpClientTransport.create("localhost", 7000));
57+
*
58+
* RSocketClient client = RSocketClient.from(source);
59+
* }</pre>
60+
*
61+
* <p>The below configures retry logic to use when a shared {@code RSocket} connection is obtained:
62+
*
63+
* <pre class="code">{@code
64+
* Mono<RSocket> source =
65+
* RSocketConnector.create()
66+
* .metadataMimeType("message/x.rsocket.composite-metadata.v0")
67+
* .dataMimeType("application/cbor")
68+
* .reconnect(Retry.fixedDelay(3, Duration.ofSeconds(1)))
69+
* .connect(TcpClientTransport.create("localhost", 7000));
70+
*
71+
* RSocketClient client = RSocketClient.from(source);
72+
* }</pre>
73+
*
74+
* @since 1.1
75+
* @see io.rsocket.loadbalance.LoadbalanceRSocketClient
76+
*/
77+
public interface RSocketClient extends Disposable {
78+
79+
/** Return the underlying source used to obtain a shared {@link RSocket} connection. */
80+
Mono<RSocket> source();
81+
82+
/**
83+
* Perform a Fire-and-Forget interaction via {@link RSocket#fireAndForget(Payload)}. Allows
84+
* multiple subscriptions and performs a request per subscriber.
85+
*/
86+
Mono<Void> fireAndForget(Mono<Payload> payloadMono);
87+
88+
/**
89+
* Perform a Request-Response interaction via {@link RSocket#requestResponse(Payload)}. Allows
90+
* multiple subscriptions and performs a request per subscriber.
91+
*/
92+
Mono<Payload> requestResponse(Mono<Payload> payloadMono);
93+
94+
/**
95+
* Perform a Request-Stream interaction via {@link RSocket#requestStream(Payload)}. Allows
96+
* multiple subscriptions and performs a request per subscriber.
97+
*/
98+
Flux<Payload> requestStream(Mono<Payload> payloadMono);
99+
100+
/**
101+
* Perform a Request-Channel interaction via {@link RSocket#requestChannel(Publisher)}. Allows
102+
* multiple subscriptions and performs a request per subscriber.
103+
*/
104+
Flux<Payload> requestChannel(Publisher<Payload> payloads);
105+
106+
/**
107+
* Perform a Metadata Push via {@link RSocket#metadataPush(Payload)}. Allows multiple
108+
* subscriptions and performs a request per subscriber.
109+
*/
110+
Mono<Void> metadataPush(Mono<Payload> payloadMono);
111+
112+
/**
113+
* Create an {@link RSocketClient} that obtains shared connections as needed, when requests are
114+
* made, from the given {@code Mono<RSocket>} source.
115+
*
116+
* @param source the source for connections, typically prepared via {@link RSocketConnector}.
117+
* @return the created client instance
118+
*/
119+
static RSocketClient from(Mono<RSocket> source) {
120+
return new DefaultRSocketClient(source);
121+
}
122+
123+
/**
124+
* Adapt the given {@link RSocket} to use as {@link RSocketClient}. This is useful to wrap the
125+
* sending {@code RSocket} in a server.
126+
*
127+
* <p><strong>Note:</strong> unlike an {@code RSocketClient} created via {@link
128+
* RSocketClient#from(Mono)}, the instance returned from this factory method can only perform
129+
* requests for as long as the given {@code RSocket} remains "live".
130+
*
131+
* @param rsocket the {@code RSocket} to perform requests with
132+
* @return the created client instance
133+
*/
134+
static RSocketClient from(RSocket rsocket) {
135+
return new RSocketClientAdapter(rsocket);
136+
}
137+
}

rsocket-core/src/main/java/io/rsocket/core/RSocketClientAdapter.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,6 @@
1717

1818
import io.rsocket.Payload;
1919
import io.rsocket.RSocket;
20-
import io.rsocket.RSocketClient;
2120
import org.reactivestreams.Publisher;
2221
import reactor.core.publisher.Flux;
2322
import reactor.core.publisher.Mono;
@@ -30,7 +29,7 @@
3029
*
3130
* @since 1.1
3231
*/
33-
public class RSocketClientAdapter implements RSocketClient {
32+
class RSocketClientAdapter implements RSocketClient {
3433

3534
private final RSocket rsocket;
3635

rsocket-core/src/main/java/io/rsocket/core/RSocketConnector.java

Lines changed: 9 additions & 54 deletions
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,6 @@
2525
import io.rsocket.DuplexConnection;
2626
import io.rsocket.Payload;
2727
import io.rsocket.RSocket;
28-
import io.rsocket.RSocketClient;
2928
import io.rsocket.SocketAcceptor;
3029
import io.rsocket.frame.SetupFrameCodec;
3130
import io.rsocket.frame.decoder.PayloadDecoder;
@@ -120,15 +119,6 @@ public static Mono<RSocket> connectWith(ClientTransport transport) {
120119
return RSocketConnector.create().connect(() -> transport);
121120
}
122121

123-
/**
124-
* @param transport
125-
* @return
126-
* @since 1.0.1
127-
*/
128-
public static RSocketClient createRSocketClient(ClientTransport transport) {
129-
return RSocketConnector.create().toRSocketClient(transport);
130-
}
131-
132122
/**
133123
* Provide a {@code Mono} from which to obtain the {@code Payload} for the initial SETUP frame.
134124
* Data and metadata should be formatted according to the MIME types specified via {@link
@@ -485,37 +475,6 @@ public RSocketConnector payloadDecoder(PayloadDecoder decoder) {
485475
return this;
486476
}
487477

488-
/**
489-
* Create {@link RSocketClient} that will use {@link #connect(ClientTransport)} as its source to
490-
* obtain a live, shared {@code RSocket} when the first request is made, and also on subsequent
491-
* requests after the connection is lost.
492-
*
493-
* <p>The following transports are available through additional RSocket Java modules:
494-
*
495-
* <ul>
496-
* <li>{@link io.rsocket.transport.netty.client.TcpClientTransport TcpClientTransport} via
497-
* {@code rsocket-transport-netty}.
498-
* <li>{@link io.rsocket.transport.netty.client.WebsocketClientTransport
499-
* WebsocketClientTransport} via {@code rsocket-transport-netty}.
500-
* <li>{@link io.rsocket.transport.local.LocalClientTransport LocalClientTransport} via {@code
501-
* rsocket-transport-local}
502-
* </ul>
503-
*
504-
* @param transport the transport of choice to connect with
505-
* @return a {@code RSocketClient} with not established connection. Note, connection will be
506-
* established on the first request
507-
* @since 1.0.1
508-
*/
509-
public RSocketClient toRSocketClient(ClientTransport transport) {
510-
Mono<RSocket> source = connect0(() -> transport);
511-
512-
if (retrySpec != null) {
513-
source = source.retryWhen(retrySpec);
514-
}
515-
516-
return new DefaultRSocketClient(source);
517-
}
518-
519478
/**
520479
* Connect with the given transport and obtain a live {@link RSocket} to use for making requests.
521480
* Each subscriber to the returned {@code Mono} receives a new connection, if neither {@link
@@ -549,19 +508,6 @@ public Mono<RSocket> connect(ClientTransport transport) {
549508
* @return a {@code Mono} with the connected RSocket
550509
*/
551510
public Mono<RSocket> connect(Supplier<ClientTransport> transportSupplier) {
552-
return this.connect0(transportSupplier)
553-
.as(
554-
source -> {
555-
if (retrySpec != null) {
556-
return new ReconnectMono<>(
557-
source.retryWhen(retrySpec), Disposable::dispose, INVALIDATE_FUNCTION);
558-
} else {
559-
return source;
560-
}
561-
});
562-
}
563-
564-
private Mono<RSocket> connect0(Supplier<ClientTransport> transportSupplier) {
565511
return Mono.fromSupplier(transportSupplier)
566512
.flatMap(
567513
ct -> {
@@ -692,6 +638,15 @@ private Mono<RSocket> connect0(Supplier<ClientTransport> transportSupplier) {
692638
})
693639
.doFinally(signalType -> setup.release());
694640
});
641+
})
642+
.as(
643+
source -> {
644+
if (retrySpec != null) {
645+
return new ReconnectMono<>(
646+
source.retryWhen(retrySpec), Disposable::dispose, INVALIDATE_FUNCTION);
647+
} else {
648+
return source;
649+
}
695650
});
696651
}
697652
}

0 commit comments

Comments
 (0)