Skip to content

Commit 39be15d

Browse files
authored
Refactors the input for LoadbalanceRSocketClient (#924)
Signed-off-by: Rossen Stoyanchev <rstoyanchev@vmware.com>
1 parent 92134e8 commit 39be15d

File tree

7 files changed

+1144
-185
lines changed

7 files changed

+1144
-185
lines changed

rsocket-core/src/main/java/io/rsocket/loadbalance/LoadbalanceRSocketClient.java

Lines changed: 89 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -18,14 +18,16 @@
1818
import io.rsocket.Payload;
1919
import io.rsocket.RSocket;
2020
import io.rsocket.RSocketClient;
21+
import io.rsocket.core.RSocketConnector;
2122
import java.util.List;
2223
import org.reactivestreams.Publisher;
2324
import reactor.core.publisher.Flux;
2425
import reactor.core.publisher.Mono;
26+
import reactor.util.annotation.Nullable;
2527

2628
/**
27-
* {@link RSocketClient} implementation that uses a pool and a {@link LoadbalanceStrategy} to select
28-
* the {@code RSocket} to use for each request.
29+
* {@link RSocketClient} implementation that uses a {@link LoadbalanceStrategy} to select the {@code
30+
* RSocket} to use for a given request from a pool of possible targets.
2931
*
3032
* @since 1.1
3133
*/
@@ -72,45 +74,109 @@ public void dispose() {
7274
rSocketPool.dispose();
7375
}
7476

77+
/**
78+
* Shortcut to create an {@link LoadbalanceRSocketClient} with round robin loadalancing.
79+
* Effectively a shortcut for:
80+
*
81+
* <pre class="cdoe">
82+
* LoadbalanceRSocketClient.builder(targetPublisher)
83+
* .connector(RSocketConnector.create())
84+
* .build();
85+
* </pre>
86+
*
87+
* @param connector the {@link Builder#connector(RSocketConnector) to use
88+
* @param targetPublisher publisher that periodically refreshes the list of targets to loadbalance across.
89+
* @return the created client instance
90+
*/
7591
public static LoadbalanceRSocketClient create(
76-
LoadbalanceStrategy loadbalanceStrategy,
77-
Publisher<List<LoadbalanceRSocketSource>> rSocketSuppliersPublisher) {
78-
return new LoadbalanceRSocketClient(
79-
new RSocketPool(rSocketSuppliersPublisher, loadbalanceStrategy));
92+
RSocketConnector connector, Publisher<List<LoadbalanceTarget>> targetPublisher) {
93+
return builder(targetPublisher).connector(connector).build();
8094
}
8195

82-
public static LoadbalanceRSocketClient create(
83-
Publisher<List<LoadbalanceRSocketSource>> rSocketSuppliersPublisher) {
84-
return create(new RoundRobinLoadbalanceStrategy(), rSocketSuppliersPublisher);
85-
}
86-
87-
public static Builder builder() {
88-
return new Builder();
96+
/**
97+
* Return a builder to create an {@link LoadbalanceRSocketClient} with.
98+
*
99+
* @param targetPublisher publisher that periodically refreshes the list of targets to loadbalance
100+
* across.
101+
* @return the builder instance
102+
*/
103+
public static Builder builder(Publisher<List<LoadbalanceTarget>> targetPublisher) {
104+
return new Builder(targetPublisher);
89105
}
90106

107+
/** Builder for creating an {@link LoadbalanceRSocketClient}. */
91108
public static class Builder {
92109

93-
LoadbalanceStrategy loadbalanceStrategy;
110+
private final Publisher<List<LoadbalanceTarget>> targetPublisher;
111+
112+
@Nullable private RSocketConnector connector;
94113

95-
Builder() {}
114+
@Nullable LoadbalanceStrategy loadbalanceStrategy;
96115

97-
public Builder withWeightedLoadbalanceStrategy() {
98-
return withCustomLoadbalanceStrategy(new WeightedLoadbalanceStrategy());
116+
Builder(Publisher<List<LoadbalanceTarget>> targetPublisher) {
117+
this.targetPublisher = targetPublisher;
99118
}
100119

101-
public Builder withRoundRobinLoadbalanceStrategy() {
102-
return withCustomLoadbalanceStrategy(new RoundRobinLoadbalanceStrategy());
120+
/**
121+
* The given {@link RSocketConnector} is used as a template to produce the {@code Mono<RSocket>}
122+
* source for each {@link LoadbalanceTarget}. This is done by passing the {@code
123+
* ClientTransport} contained in every target to the {@code connect} method of the given
124+
* connector instance.
125+
*
126+
* <p>By default this is initialized with {@link RSocketConnector#create()}.
127+
*
128+
* @param connector the connector to use as a template
129+
*/
130+
public Builder connector(RSocketConnector connector) {
131+
this.connector = connector;
132+
return this;
133+
}
134+
135+
/**
136+
* Switch to using a round-robin strategy for selecting a target.
137+
*
138+
* <p>This is the strategy used by default.
139+
*/
140+
public Builder roundRobinLoadbalanceStrategy() {
141+
this.loadbalanceStrategy = new RoundRobinLoadbalanceStrategy();
142+
return this;
103143
}
104144

105-
public Builder withCustomLoadbalanceStrategy(LoadbalanceStrategy strategy) {
145+
/**
146+
* Switch to using a strategy that assigns a weight to each pooled {@code RSocket} based on
147+
* actual usage stats, and uses that to make a choice.
148+
*
149+
* <p>By default this strategy is not used.
150+
*/
151+
public Builder weightedLoadbalanceStrategy() {
152+
this.loadbalanceStrategy = new WeightedLoadbalanceStrategy();
153+
return this;
154+
}
155+
156+
/**
157+
* Switch to using a custom strategy for loadbalancing.
158+
*
159+
* @see #roundRobinLoadbalanceStrategy()
160+
*/
161+
public Builder customLoadbalanceStrategy(LoadbalanceStrategy strategy) {
106162
this.loadbalanceStrategy = strategy;
107163
return this;
108164
}
109165

110-
public LoadbalanceRSocketClient build(
111-
Publisher<List<LoadbalanceRSocketSource>> rSocketSuppliersPublisher) {
166+
/** Build the {@link LoadbalanceRSocketClient} instance. */
167+
public LoadbalanceRSocketClient build() {
112168
return new LoadbalanceRSocketClient(
113-
new RSocketPool(rSocketSuppliersPublisher, this.loadbalanceStrategy));
169+
new RSocketPool(initConnector(), this.targetPublisher, initLoadbalanceStrategy()));
170+
}
171+
172+
private RSocketConnector initConnector() {
173+
return (this.connector != null ? this.connector : RSocketConnector.create());
174+
}
175+
176+
private LoadbalanceStrategy initLoadbalanceStrategy() {
177+
return (this.loadbalanceStrategy != null
178+
? this.loadbalanceStrategy
179+
: new RoundRobinLoadbalanceStrategy());
114180
}
115181
}
116182
}

rsocket-core/src/main/java/io/rsocket/loadbalance/LoadbalanceRSocketSource.java

Lines changed: 0 additions & 54 deletions
This file was deleted.
Lines changed: 76 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,76 @@
1+
/*
2+
* Copyright 2002-2020 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package io.rsocket.loadbalance;
17+
18+
import io.rsocket.transport.ClientTransport;
19+
20+
/**
21+
* Simple container for a key and a {@link ClientTransport}, representing a specific target for
22+
* loadbalancing purposes. The key is used to compare previous and new targets when refreshing the
23+
* list of target to use. The transport is used to connect to the target.
24+
*
25+
* @since 1.1
26+
*/
27+
public class LoadbalanceTarget {
28+
29+
final String key;
30+
final ClientTransport transport;
31+
32+
private LoadbalanceTarget(String key, ClientTransport transport) {
33+
this.key = key;
34+
this.transport = transport;
35+
}
36+
37+
/** Return the key for this target. */
38+
public String getKey() {
39+
return key;
40+
}
41+
42+
/** Return the transport to use to connect to the target. */
43+
public ClientTransport getTransport() {
44+
return transport;
45+
}
46+
47+
/**
48+
* Create a an instance of {@link LoadbalanceTarget} with the given key and {@link
49+
* ClientTransport}. The key can be anything that can be used to identify identical targets, e.g.
50+
* a SocketAddress, URL, etc.
51+
*
52+
* @param key the key to use to identify identical targets
53+
* @param transport the transport to use for connecting to the target
54+
* @return the created instance
55+
*/
56+
public static LoadbalanceTarget from(String key, ClientTransport transport) {
57+
return new LoadbalanceTarget(key, transport);
58+
}
59+
60+
@Override
61+
public boolean equals(Object other) {
62+
if (this == other) {
63+
return true;
64+
}
65+
if (other == null || getClass() != other.getClass()) {
66+
return false;
67+
}
68+
LoadbalanceTarget that = (LoadbalanceTarget) other;
69+
return key.equals(that.key);
70+
}
71+
72+
@Override
73+
public int hashCode() {
74+
return key.hashCode();
75+
}
76+
}

rsocket-core/src/main/java/io/rsocket/loadbalance/PooledWeightedRSocket.java

Lines changed: 11 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,8 @@ final class PooledWeightedRSocket extends ResolvingOperator<RSocket>
3333
implements CoreSubscriber<RSocket>, WeightedRSocket {
3434

3535
final RSocketPool parent;
36-
final LoadbalanceRSocketSource loadbalanceRSocketSource;
36+
final Mono<RSocket> rSocketSource;
37+
final LoadbalanceTarget loadbalanceTarget;
3738
final Stats stats;
3839

3940
volatile Subscription s;
@@ -42,10 +43,14 @@ final class PooledWeightedRSocket extends ResolvingOperator<RSocket>
4243
AtomicReferenceFieldUpdater.newUpdater(PooledWeightedRSocket.class, Subscription.class, "s");
4344

4445
PooledWeightedRSocket(
45-
RSocketPool parent, LoadbalanceRSocketSource loadbalanceRSocketSource, Stats stats) {
46+
RSocketPool parent,
47+
Mono<RSocket> rSocketSource,
48+
LoadbalanceTarget loadbalanceTarget,
49+
Stats stats) {
4650
this.parent = parent;
51+
this.rSocketSource = rSocketSource;
52+
this.loadbalanceTarget = loadbalanceTarget;
4753
this.stats = stats;
48-
this.loadbalanceRSocketSource = loadbalanceRSocketSource;
4954
}
5055

5156
@Override
@@ -103,7 +108,7 @@ public void onNext(RSocket value) {
103108

104109
@Override
105110
protected void doSubscribe() {
106-
this.loadbalanceRSocketSource.source().subscribe(this);
111+
this.rSocketSource.subscribe(this);
107112
}
108113

109114
@Override
@@ -196,8 +201,8 @@ public Stats stats() {
196201
return stats;
197202
}
198203

199-
LoadbalanceRSocketSource source() {
200-
return loadbalanceRSocketSource;
204+
LoadbalanceTarget target() {
205+
return loadbalanceTarget;
201206
}
202207

203208
@Override

0 commit comments

Comments
 (0)