Skip to content

Commit b1bd5f3

Browse files
steveguryrobertroeser
authored andcommitted
Refactor Factory to Connector + Implement availability (#15)
* Refactor Factory to Connector + Implement availability Problem We need to comply with the recent refactoring "Factory to Connector" introduced in reactivesocket-java. Also, implementation of DuplexConnection have to return an availability. Solution The Factory to Connector refactoring is pretty straight forward. All implementations of DuplexConnection return an availability which directly map the state of the underlying resource (0.0 if the resource is closed, 1.0 otherwise). This will greatly help the load-balancer to select a valid connection. I removed the blocking method from reactivesocket-java, so I moved some utility blocking code inside the TestUtil class. Clean-up of the Netty implementation, remove unused args, explicitey specify tcp socket configuration. Shortened the toString name to ease log-reading. Bug All DuplexConnection implementations now cancel the subscription when an exception occurs. This fix the problem, where the Keep-Alive Observable kept trying to send a keep-alive on a closed connection. * Refactor the TestUtil helper to return CompletableFuture * rs version * Restore rs dependency to latest.release * Remove mavenLocal in gradle build file * Propagate cancellation in all cases * Use helper method from ReactiveSocket Unsafe
1 parent a8baf0c commit b1bd5f3

File tree

18 files changed

+192
-180
lines changed

18 files changed

+192
-180
lines changed

reactivesocket-aeron/src/main/java/io/reactivesocket/aeron/client/AeronClientDuplexConnection.java

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -118,6 +118,11 @@ public void onComplete() {
118118
});
119119
}
120120

121+
@Override
122+
public double availability() {
123+
return publication.isClosed() ? 0.0 : 1.0;
124+
}
125+
121126
@Override
122127
public void close() throws IOException {
123128
onClose.accept(publication);
@@ -136,6 +141,5 @@ public String toString() {
136141
"channel=" + publication.channel() + "," +
137142
"streamId=" + publication.streamId() + "," +
138143
"sessionId=" + publication.sessionId() + "]";
139-
140144
}
141145
}
Lines changed: 6 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -15,10 +15,7 @@
1515
*/
1616
package io.reactivesocket.aeron.client;
1717

18-
import io.reactivesocket.ConnectionSetupPayload;
19-
import io.reactivesocket.DefaultReactiveSocket;
20-
import io.reactivesocket.ReactiveSocket;
21-
import io.reactivesocket.ReactiveSocketFactory;
18+
import io.reactivesocket.*;
2219
import io.reactivesocket.rx.Completable;
2320
import org.agrona.LangUtil;
2421
import org.reactivestreams.Publisher;
@@ -40,17 +37,17 @@
4037
/**
4138
* An implementation of {@link ReactiveSocketFactory} that creates Aeron ReactiveSockets.
4239
*/
43-
public class AeronReactiveSocketFactory implements ReactiveSocketFactory<SocketAddress, ReactiveSocket> {
44-
private static final Logger logger = LoggerFactory.getLogger(AeronReactiveSocketFactory.class);
40+
public class AeronReactiveSocketConnector implements ReactiveSocketConnector<SocketAddress> {
41+
private static final Logger logger = LoggerFactory.getLogger(AeronReactiveSocketConnector.class);
4542

4643
private final ConnectionSetupPayload connectionSetupPayload;
4744
private final Consumer<Throwable> errorStream;
4845

49-
public AeronReactiveSocketFactory(ConnectionSetupPayload connectionSetupPayload, Consumer<Throwable> errorStream) {
46+
public AeronReactiveSocketConnector(ConnectionSetupPayload connectionSetupPayload, Consumer<Throwable> errorStream) {
5047
this(getIPv4InetAddress().getHostAddress(), 39790, connectionSetupPayload, errorStream);
5148
}
5249

53-
public AeronReactiveSocketFactory(String host, int port, ConnectionSetupPayload connectionSetupPayload, Consumer<Throwable> errorStream) {
50+
public AeronReactiveSocketConnector(String host, int port, ConnectionSetupPayload connectionSetupPayload, Consumer<Throwable> errorStream) {
5451
this.connectionSetupPayload = connectionSetupPayload;
5552
this.errorStream = errorStream;
5653

@@ -65,7 +62,7 @@ public AeronReactiveSocketFactory(String host, int port, ConnectionSetupPayload
6562
}
6663

6764
@Override
68-
public Publisher<ReactiveSocket> call(SocketAddress address) {
65+
public Publisher<ReactiveSocket> connect(SocketAddress address) {
6966
Publisher<AeronClientDuplexConnection> connection
7067
= AeronClientDuplexConnectionFactory.getInstance().createAeronClientDuplexConnection(address);
7168

reactivesocket-aeron/src/main/java/io/reactivesocket/aeron/server/AeronServerDuplexConnection.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -78,6 +78,11 @@ public void addOutput(Publisher<Frame> o, Completable callback) {
7878
o.subscribe(new ServerSubscription(publication, callback));
7979
}
8080

81+
@Override
82+
public double availability() {
83+
return isClosed ? 0.0 : 1.0;
84+
}
85+
8186
// TODO - this is bad - I need to queue this up somewhere and process this on the polling thread so it doesn't just block everything
8287
void ackEstablishConnection(int ackSessionId) {
8388
debug("Acking establish connection for session id => {}", ackSessionId);

reactivesocket-jsr-356/src/main/java/io/reactivesocket/javax/websocket/WebSocketDuplexConnection.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -79,6 +79,11 @@ public void onSubscribe(Subscription s) {
7979
});
8080
}
8181

82+
@Override
83+
public double availability() {
84+
return session.isOpen() ? 1.0 : 0.0;
85+
}
86+
8287
@Override
8388
public void close() throws IOException {
8489
session.close();
Lines changed: 5 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -15,10 +15,7 @@
1515
*/
1616
package io.reactivesocket.javax.websocket.client;
1717

18-
import io.reactivesocket.ConnectionSetupPayload;
19-
import io.reactivesocket.DefaultReactiveSocket;
20-
import io.reactivesocket.ReactiveSocket;
21-
import io.reactivesocket.ReactiveSocketFactory;
18+
import io.reactivesocket.*;
2219
import io.reactivesocket.javax.websocket.WebSocketDuplexConnection;
2320
import io.reactivesocket.rx.Completable;
2421
import org.glassfish.tyrus.client.ClientManager;
@@ -36,23 +33,23 @@
3633
/**
3734
* An implementation of {@link ReactiveSocketFactory} that creates JSR-356 WebSocket ReactiveSockets.
3835
*/
39-
public class WebSocketReactiveSocketFactory implements ReactiveSocketFactory<SocketAddress, ReactiveSocket> {
40-
private static final Logger logger = LoggerFactory.getLogger(WebSocketReactiveSocketFactory.class);
36+
public class WebSocketReactiveSocketConnector implements ReactiveSocketConnector<SocketAddress> {
37+
private static final Logger logger = LoggerFactory.getLogger(WebSocketReactiveSocketConnector.class);
4138

4239
private final ConnectionSetupPayload connectionSetupPayload;
4340
private final Consumer<Throwable> errorStream;
4441
private final String path;
4542
private final ClientManager clientManager;
4643

47-
public WebSocketReactiveSocketFactory(String path, ClientManager clientManager, ConnectionSetupPayload connectionSetupPayload, Consumer<Throwable> errorStream) {
44+
public WebSocketReactiveSocketConnector(String path, ClientManager clientManager, ConnectionSetupPayload connectionSetupPayload, Consumer<Throwable> errorStream) {
4845
this.connectionSetupPayload = connectionSetupPayload;
4946
this.errorStream = errorStream;
5047
this.path = path;
5148
this.clientManager = clientManager;
5249
}
5350

5451
@Override
55-
public Publisher<ReactiveSocket> call(SocketAddress address) {
52+
public Publisher<ReactiveSocket> connect(SocketAddress address) {
5653
Publisher<WebSocketDuplexConnection> connection
5754
= ReactiveSocketWebSocketClient.create(address, path, clientManager);
5855

reactivesocket-local/src/main/java/io/reactivesocket/local/LocalClientDuplexConnection.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -80,6 +80,11 @@ public void onComplete() {
8080
});
8181
}
8282

83+
@Override
84+
public double availability() {
85+
return 1.0;
86+
}
87+
8388
void write(Frame frame) {
8489
subjects
8590
.forEach(o -> o.onNext(frame));
Lines changed: 5 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -15,20 +15,17 @@
1515
*/
1616
package io.reactivesocket.local;
1717

18-
import io.reactivesocket.ConnectionSetupPayload;
19-
import io.reactivesocket.DefaultReactiveSocket;
20-
import io.reactivesocket.ReactiveSocket;
21-
import io.reactivesocket.ReactiveSocketFactory;
18+
import io.reactivesocket.*;
2219
import io.reactivesocket.internal.rx.EmptySubscription;
2320
import org.reactivestreams.Publisher;
2421

25-
public class LocalClientReactiveSocketFactory implements ReactiveSocketFactory<LocalClientReactiveSocketFactory.Config, ReactiveSocket> {
26-
public static final LocalClientReactiveSocketFactory INSTANCE = new LocalClientReactiveSocketFactory();
22+
public class LocalClientReactiveSocketConnector implements ReactiveSocketConnector<LocalClientReactiveSocketConnector.Config> {
23+
public static final LocalClientReactiveSocketConnector INSTANCE = new LocalClientReactiveSocketConnector();
2724

28-
private LocalClientReactiveSocketFactory() {}
25+
private LocalClientReactiveSocketConnector() {}
2926

3027
@Override
31-
public Publisher<ReactiveSocket> call(Config config) {
28+
public Publisher<ReactiveSocket> connect(Config config) {
3229
return s -> {
3330
try {
3431
s.onSubscribe(EmptySubscription.INSTANCE);

reactivesocket-local/src/main/java/io/reactivesocket/local/LocalServerDuplexConection.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -79,6 +79,11 @@ public void onComplete() {
7979
});
8080
}
8181

82+
@Override
83+
public double availability() {
84+
return 1.0;
85+
}
86+
8287
void write(Frame frame) {
8388
subjects
8489
.forEach(o -> o.onNext(frame));
Lines changed: 5 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -15,20 +15,17 @@
1515
*/
1616
package io.reactivesocket.local;
1717

18-
import io.reactivesocket.ConnectionSetupHandler;
19-
import io.reactivesocket.DefaultReactiveSocket;
20-
import io.reactivesocket.ReactiveSocket;
21-
import io.reactivesocket.ReactiveSocketFactory;
18+
import io.reactivesocket.*;
2219
import io.reactivesocket.internal.rx.EmptySubscription;
2320
import org.reactivestreams.Publisher;
2421

25-
public class LocalServerReactiveSocketFactory implements ReactiveSocketFactory<LocalServerReactiveSocketFactory.Config, ReactiveSocket> {
26-
public static final LocalServerReactiveSocketFactory INSTANCE = new LocalServerReactiveSocketFactory();
22+
public class LocalServerReactiveSocketConnector implements ReactiveSocketConnector<LocalServerReactiveSocketConnector.Config> {
23+
public static final LocalServerReactiveSocketConnector INSTANCE = new LocalServerReactiveSocketConnector();
2724

28-
private LocalServerReactiveSocketFactory() {}
25+
private LocalServerReactiveSocketConnector() {}
2926

3027
@Override
31-
public Publisher<ReactiveSocket> call(Config config) {
28+
public Publisher<ReactiveSocket> connect(Config config) {
3229
return s -> {
3330
try {
3431
s.onSubscribe(EmptySubscription.INSTANCE);

reactivesocket-local/src/test/java/io/reactivesocket/local/ClientServerTest.java

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,8 @@
3232

3333
import java.util.concurrent.TimeUnit;
3434

35+
import static io.reactivesocket.util.Unsafe.toSingleFuture;
36+
3537
public class ClientServerTest {
3638

3739
static ReactiveSocket client;
@@ -40,14 +42,13 @@ public class ClientServerTest {
4042

4143
@BeforeClass
4244
public static void setup() throws Exception {
43-
server = LocalServerReactiveSocketFactory.INSTANCE.callAndWait(new LocalServerReactiveSocketFactory.Config("test", new ConnectionSetupHandler() {
45+
LocalServerReactiveSocketConnector.Config serverConfig = new LocalServerReactiveSocketConnector.Config("test", new ConnectionSetupHandler() {
4446
@Override
4547
public RequestHandler apply(ConnectionSetupPayload setupPayload, ReactiveSocket rs) throws SetupException {
4648
return new RequestHandler() {
4749
@Override
4850
public Publisher<Payload> handleRequestResponse(Payload payload) {
4951
return s -> {
50-
//System.out.println("Handling request/response payload => " + s.toString());
5152
Payload response = TestUtil.utf8EncodedPayload("hello world", "metadata");
5253
s.onNext(response);
5354
s.onComplete();
@@ -91,10 +92,12 @@ public Publisher<Void> handleMetadataPush(Payload payload) {
9192
}
9293
};
9394
}
94-
}));
95+
});
9596

96-
client = LocalClientReactiveSocketFactory.INSTANCE.callAndWait(new LocalClientReactiveSocketFactory.Config("test", "text", "text"));
97+
server = toSingleFuture(LocalServerReactiveSocketConnector.INSTANCE.connect(serverConfig)).get(5, TimeUnit.SECONDS);
9798

99+
LocalClientReactiveSocketConnector.Config clientConfig = new LocalClientReactiveSocketConnector.Config("test", "text", "text");
100+
client = toSingleFuture(LocalClientReactiveSocketConnector.INSTANCE.connect(clientConfig)).get(5, TimeUnit.SECONDS);;
98101
}
99102

100103
@Test

0 commit comments

Comments
 (0)