Skip to content

Commit 54e1318

Browse files
steveguryNiteshKant
authored andcommitted
Migrate reactivesocket-load-balancer into reactivesocket-java (#93)
**Problem** The multi-repo structure is difficult to use in a context of rapid short iterations. **Solution** Migrate the reactivesocket-load-balancer repo inside reactivesocket-java. The submodules have been renamed like this: - reactivesocket-load-balancer-builder -> reactivesocket-client - reactivesocket-load-balancer-core -> reactivesocket-client - reactivesocket-load-balancer-eureka -> reactivesocket-discovery-eureka - reactivesocket-load-balancer-servo -> reactivesocket-stats-servo I also added a simple module `reactivesocket-examples`, which only contains one file so far. The goal of this module is to demonstrate simple examples.
1 parent 973640c commit 54e1318

38 files changed

+4189
-0
lines changed

reactivesocket-client/build.gradle

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,4 @@
1+
dependencies {
2+
compile project(':reactivesocket-core')
3+
testCompile project(':reactivesocket-test')
4+
}
Lines changed: 263 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,263 @@
1+
/**
2+
* Copyright 2016 Netflix, Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package io.reactivesocket.client;
17+
18+
import io.reactivesocket.ReactiveSocket;
19+
import io.reactivesocket.ReactiveSocketConnector;
20+
import io.reactivesocket.ReactiveSocketFactory;
21+
import io.reactivesocket.client.filter.*;
22+
import org.reactivestreams.Publisher;
23+
import org.reactivestreams.Subscriber;
24+
import org.reactivestreams.Subscription;
25+
26+
import java.net.SocketAddress;
27+
import java.util.*;
28+
import java.util.concurrent.Executors;
29+
import java.util.concurrent.ScheduledExecutorService;
30+
import java.util.concurrent.ThreadFactory;
31+
import java.util.concurrent.TimeUnit;
32+
import java.util.concurrent.atomic.AtomicInteger;
33+
import java.util.function.Function;
34+
import java.util.stream.Collectors;
35+
36+
public class Builder {
37+
private static AtomicInteger counter = new AtomicInteger(0);
38+
private final String name;
39+
40+
private final ScheduledExecutorService executor;
41+
42+
private final long requestTimeout;
43+
private final TimeUnit requestTimeoutUnit;
44+
45+
private final long connectTimeout;
46+
private final TimeUnit connectTimeoutUnit;
47+
48+
private final double backupQuantile;
49+
50+
private final int retries;
51+
52+
private final ReactiveSocketConnector<SocketAddress> connector;
53+
private final Function<Throwable, Boolean> retryThisException;
54+
55+
private final Publisher<List<SocketAddress>> source;
56+
57+
private Builder(
58+
String name,
59+
ScheduledExecutorService executor,
60+
long requestTimeout, TimeUnit requestTimeoutUnit,
61+
long connectTimeout, TimeUnit connectTimeoutUnit,
62+
double backupQuantile,
63+
int retries, Function<Throwable, Boolean> retryThisException,
64+
ReactiveSocketConnector<SocketAddress> connector,
65+
Publisher<List<SocketAddress>> source
66+
) {
67+
this.name = name;
68+
this.executor = executor;
69+
this.requestTimeout = requestTimeout;
70+
this.requestTimeoutUnit = requestTimeoutUnit;
71+
this.connectTimeout = connectTimeout;
72+
this.connectTimeoutUnit = connectTimeoutUnit;
73+
this.backupQuantile = backupQuantile;
74+
this.retries = retries;
75+
this.connector = connector;
76+
this.retryThisException = retryThisException;
77+
this.source = source;
78+
}
79+
80+
public Builder withRequestTimeout(long timeout, TimeUnit unit) {
81+
return new Builder(
82+
name,
83+
executor,
84+
timeout, unit,
85+
connectTimeout, connectTimeoutUnit,
86+
backupQuantile,
87+
retries, retryThisException,
88+
connector,
89+
source
90+
);
91+
}
92+
93+
public Builder withConnectTimeout(long timeout, TimeUnit unit) {
94+
return new Builder(
95+
name,
96+
executor,
97+
requestTimeout, requestTimeoutUnit,
98+
timeout, unit,
99+
backupQuantile,
100+
retries, retryThisException,
101+
connector,
102+
source
103+
);
104+
}
105+
106+
public Builder withBackupRequest(double quantile) {
107+
return new Builder(
108+
name,
109+
executor,
110+
requestTimeout, requestTimeoutUnit,
111+
connectTimeout, connectTimeoutUnit,
112+
quantile,
113+
retries, retryThisException,
114+
connector,
115+
source
116+
);
117+
}
118+
119+
public Builder withExecutor(ScheduledExecutorService executor) {
120+
return new Builder(
121+
name,
122+
executor,
123+
requestTimeout, requestTimeoutUnit,
124+
connectTimeout, connectTimeoutUnit,
125+
backupQuantile,
126+
retries, retryThisException,
127+
connector,
128+
source
129+
);
130+
}
131+
132+
public Builder withConnector(ReactiveSocketConnector<SocketAddress> connector) {
133+
return new Builder(
134+
name,
135+
executor,
136+
requestTimeout, requestTimeoutUnit,
137+
connectTimeout, connectTimeoutUnit,
138+
backupQuantile,
139+
retries, retryThisException,
140+
connector,
141+
source
142+
);
143+
}
144+
145+
public Builder withSource(Publisher<List<SocketAddress>> source) {
146+
return new Builder(
147+
name,
148+
executor,
149+
requestTimeout, requestTimeoutUnit,
150+
connectTimeout, connectTimeoutUnit,
151+
backupQuantile,
152+
retries, retryThisException,
153+
connector,
154+
source
155+
);
156+
}
157+
158+
public Builder withRetries(int nbOfRetries, Function<Throwable, Boolean> retryThisException) {
159+
return new Builder(
160+
name,
161+
executor,
162+
requestTimeout, requestTimeoutUnit,
163+
connectTimeout, connectTimeoutUnit,
164+
backupQuantile,
165+
nbOfRetries, retryThisException,
166+
connector,
167+
source
168+
);
169+
}
170+
171+
public ReactiveSocket build() {
172+
if (source == null) {
173+
throw new IllegalStateException("Please configure the source!");
174+
}
175+
if (connector == null) {
176+
throw new IllegalStateException("Please configure the connector!");
177+
}
178+
179+
ReactiveSocketConnector<SocketAddress> filterConnector = connector
180+
.chain(socket -> new TimeoutSocket(socket, requestTimeout, requestTimeoutUnit, executor))
181+
.chain(DrainingSocket::new);
182+
183+
Publisher<List<ReactiveSocketFactory<SocketAddress>>> factories =
184+
sourceToFactory(source, filterConnector);
185+
186+
ReactiveSocket socket = new LoadBalancer(factories);
187+
if (0.0 < backupQuantile && backupQuantile < 1.0) {
188+
socket = new BackupRequestSocket(socket, backupQuantile, executor);
189+
}
190+
if (retries > 0) {
191+
socket = new RetrySocket(socket, retries, t -> true);
192+
}
193+
return socket;
194+
}
195+
196+
private Publisher<List<ReactiveSocketFactory<SocketAddress>>> sourceToFactory(
197+
Publisher<List<SocketAddress>> source,
198+
ReactiveSocketConnector<SocketAddress> connector
199+
) {
200+
return subscriber ->
201+
source.subscribe(new Subscriber<List<SocketAddress>>() {
202+
private Map<SocketAddress, ReactiveSocketFactory<SocketAddress>> current;
203+
204+
@Override
205+
public void onSubscribe(Subscription s) {
206+
subscriber.onSubscribe(s);
207+
current = new HashMap<>();
208+
}
209+
210+
@Override
211+
public void onNext(List<SocketAddress> socketAddresses) {
212+
socketAddresses.stream()
213+
.filter(sa -> !current.containsKey(sa))
214+
.map(connector::toFactory)
215+
.map(factory -> new TimeoutFactory<>(factory, connectTimeout, connectTimeoutUnit, executor))
216+
.map(FailureAwareFactory::new)
217+
.forEach(factory -> current.put(factory.remote(), factory));
218+
219+
Set<SocketAddress> addresses = new HashSet<>(socketAddresses);
220+
Iterator<Map.Entry<SocketAddress, ReactiveSocketFactory<SocketAddress>>> it =
221+
current.entrySet().iterator();
222+
while (it.hasNext()) {
223+
SocketAddress sa = it.next().getKey();
224+
if (! addresses.contains(sa)) {
225+
it.remove();
226+
}
227+
}
228+
229+
List<ReactiveSocketFactory<SocketAddress>> factories =
230+
current.values().stream().collect(Collectors.toList());
231+
subscriber.onNext(factories);
232+
}
233+
234+
@Override
235+
public void onError(Throwable t) { subscriber.onError(t); }
236+
237+
@Override
238+
public void onComplete() { subscriber.onComplete(); }
239+
});
240+
}
241+
242+
public static Builder instance() {
243+
return new Builder(
244+
"rs-loadbalancer-" + counter.incrementAndGet(),
245+
Executors.newScheduledThreadPool(4, new ThreadFactory() {
246+
@Override
247+
public Thread newThread(Runnable r) {
248+
Thread thread = new Thread(r);
249+
thread.setName("reactivesocket-scheduler-thread");
250+
thread.setDaemon(true);
251+
return thread;
252+
}
253+
}),
254+
1, TimeUnit.SECONDS,
255+
10, TimeUnit.SECONDS,
256+
0.99,
257+
3, t -> true,
258+
null,
259+
null
260+
);
261+
}
262+
}
263+

0 commit comments

Comments
 (0)