Skip to content

Commit 92134e8

Browse files
authored
makes part of the API package-private (#922)
Signed-off-by: Oleh Dokuka <shadowgun@i.ua>
1 parent 1c8bdad commit 92134e8

File tree

14 files changed

+73
-70
lines changed

14 files changed

+73
-70
lines changed

rsocket-core/src/main/java/io/rsocket/loadbalance/stat/Ewma.java renamed to rsocket-core/src/main/java/io/rsocket/loadbalance/Ewma.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@
1414
* limitations under the License.
1515
*/
1616

17-
package io.rsocket.loadbalance.stat;
17+
package io.rsocket.loadbalance;
1818

1919
import io.rsocket.util.Clock;
2020
import java.util.concurrent.TimeUnit;
@@ -27,7 +27,7 @@
2727
* <p>e.g. with a half-life of 10 unit, if you insert 100 at t=0 and 200 at t=10 the ewma will be
2828
* equal to (200 - 100)/2 = 150 (half of the distance between the new and the old value)
2929
*/
30-
public class Ewma {
30+
class Ewma {
3131
private final long tau;
3232
private volatile long stamp;
3333
private volatile double ewma;

rsocket-core/src/main/java/io/rsocket/loadbalance/stat/FrugalQuantile.java renamed to rsocket-core/src/main/java/io/rsocket/loadbalance/FrugalQuantile.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@
1414
* limitations under the License.
1515
*/
1616

17-
package io.rsocket.loadbalance.stat;
17+
package io.rsocket.loadbalance;
1818

1919
import java.util.SplittableRandom;
2020

@@ -25,7 +25,7 @@
2525
*
2626
* <p>More info: http://blog.aggregateknowledge.com/2013/09/16/sketch-of-the-day-frugal-streaming/
2727
*/
28-
public class FrugalQuantile implements Quantile {
28+
class FrugalQuantile implements Quantile {
2929
private final double increment;
3030
volatile double estimate;
3131
int step;

rsocket-core/src/main/java/io/rsocket/loadbalance/LoadbalanceRSocketClient.java

Lines changed: 31 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -79,8 +79,38 @@ public static LoadbalanceRSocketClient create(
7979
new RSocketPool(rSocketSuppliersPublisher, loadbalanceStrategy));
8080
}
8181

82-
public static RSocketClient create(
82+
public static LoadbalanceRSocketClient create(
8383
Publisher<List<LoadbalanceRSocketSource>> rSocketSuppliersPublisher) {
8484
return create(new RoundRobinLoadbalanceStrategy(), rSocketSuppliersPublisher);
8585
}
86+
87+
public static Builder builder() {
88+
return new Builder();
89+
}
90+
91+
public static class Builder {
92+
93+
LoadbalanceStrategy loadbalanceStrategy;
94+
95+
Builder() {}
96+
97+
public Builder withWeightedLoadbalanceStrategy() {
98+
return withCustomLoadbalanceStrategy(new WeightedLoadbalanceStrategy());
99+
}
100+
101+
public Builder withRoundRobinLoadbalanceStrategy() {
102+
return withCustomLoadbalanceStrategy(new RoundRobinLoadbalanceStrategy());
103+
}
104+
105+
public Builder withCustomLoadbalanceStrategy(LoadbalanceStrategy strategy) {
106+
this.loadbalanceStrategy = strategy;
107+
return this;
108+
}
109+
110+
public LoadbalanceRSocketClient build(
111+
Publisher<List<LoadbalanceRSocketSource>> rSocketSuppliersPublisher) {
112+
return new LoadbalanceRSocketClient(
113+
new RSocketPool(rSocketSuppliersPublisher, this.loadbalanceStrategy));
114+
}
115+
}
86116
}

rsocket-core/src/main/java/io/rsocket/loadbalance/LoadbalanceStrategy.java

Lines changed: 2 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -15,15 +15,11 @@
1515
*/
1616
package io.rsocket.loadbalance;
1717

18+
import io.rsocket.RSocket;
1819
import java.util.List;
19-
import java.util.function.Supplier;
2020

2121
@FunctionalInterface
2222
public interface LoadbalanceStrategy {
2323

24-
WeightedRSocket select(List<WeightedRSocket> availableRSockets);
25-
26-
default Supplier<Stats> statsSupplier() {
27-
return Stats::noOps;
28-
}
24+
RSocket select(List<RSocket> availableRSockets);
2925
}

rsocket-core/src/main/java/io/rsocket/loadbalance/stat/Median.java renamed to rsocket-core/src/main/java/io/rsocket/loadbalance/Median.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -14,10 +14,10 @@
1414
* limitations under the License.
1515
*/
1616

17-
package io.rsocket.loadbalance.stat;
17+
package io.rsocket.loadbalance;
1818

1919
/** This implementation gives better results because it considers more data-point. */
20-
public class Median extends FrugalQuantile {
20+
class Median extends FrugalQuantile {
2121
public Median() {
2222
super(0.5, 1.0);
2323
}

rsocket-core/src/main/java/io/rsocket/loadbalance/stat/Quantile.java renamed to rsocket-core/src/main/java/io/rsocket/loadbalance/Quantile.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -13,9 +13,9 @@
1313
* See the License for the specific language governing permissions and
1414
* limitations under the License.
1515
*/
16-
package io.rsocket.loadbalance.stat;
16+
package io.rsocket.loadbalance;
1717

18-
public interface Quantile {
18+
interface Quantile {
1919
/** @return the estimation of the current value of the quantile */
2020
double estimation();
2121

rsocket-core/src/main/java/io/rsocket/loadbalance/RSocketPool.java

Lines changed: 15 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,7 @@
3737
import reactor.util.annotation.Nullable;
3838

3939
class RSocketPool extends ResolvingOperator<Void>
40-
implements CoreSubscriber<List<LoadbalanceRSocketSource>>, List<WeightedRSocket> {
40+
implements CoreSubscriber<List<LoadbalanceRSocketSource>>, List<RSocket> {
4141

4242
final DeferredResolutionRSocket deferredResolutionRSocket = new DeferredResolutionRSocket(this);
4343
final LoadbalanceStrategy loadbalanceStrategy;
@@ -59,7 +59,11 @@ class RSocketPool extends ResolvingOperator<Void>
5959
RSocketPool(
6060
Publisher<List<LoadbalanceRSocketSource>> source, LoadbalanceStrategy loadbalanceStrategy) {
6161
this.loadbalanceStrategy = loadbalanceStrategy;
62-
this.statsSupplier = loadbalanceStrategy.statsSupplier();
62+
if (loadbalanceStrategy instanceof WeightedLoadbalanceStrategy) {
63+
this.statsSupplier = Stats::create;
64+
} else {
65+
this.statsSupplier = Stats::noOps;
66+
}
6367

6468
ACTIVE_SOCKETS.lazySet(this, EMPTY);
6569

@@ -361,12 +365,12 @@ public boolean contains(Object o) {
361365
}
362366

363367
@Override
364-
public Iterator<WeightedRSocket> iterator() {
368+
public Iterator<RSocket> iterator() {
365369
throw new UnsupportedOperationException();
366370
}
367371

368372
@Override
369-
public boolean add(WeightedRSocket weightedRSocket) {
373+
public boolean add(RSocket weightedRSocket) {
370374
throw new UnsupportedOperationException();
371375
}
372376

@@ -381,12 +385,12 @@ public boolean containsAll(Collection<?> c) {
381385
}
382386

383387
@Override
384-
public boolean addAll(Collection<? extends WeightedRSocket> c) {
388+
public boolean addAll(Collection<? extends RSocket> c) {
385389
throw new UnsupportedOperationException();
386390
}
387391

388392
@Override
389-
public boolean addAll(int index, Collection<? extends WeightedRSocket> c) {
393+
public boolean addAll(int index, Collection<? extends RSocket> c) {
390394
throw new UnsupportedOperationException();
391395
}
392396

@@ -406,12 +410,12 @@ public void clear() {
406410
}
407411

408412
@Override
409-
public WeightedRSocket set(int index, WeightedRSocket element) {
413+
public WeightedRSocket set(int index, RSocket element) {
410414
throw new UnsupportedOperationException();
411415
}
412416

413417
@Override
414-
public void add(int index, WeightedRSocket element) {
418+
public void add(int index, RSocket element) {
415419
throw new UnsupportedOperationException();
416420
}
417421

@@ -431,17 +435,17 @@ public int lastIndexOf(Object o) {
431435
}
432436

433437
@Override
434-
public ListIterator<WeightedRSocket> listIterator() {
438+
public ListIterator<RSocket> listIterator() {
435439
throw new UnsupportedOperationException();
436440
}
437441

438442
@Override
439-
public ListIterator<WeightedRSocket> listIterator(int index) {
443+
public ListIterator<RSocket> listIterator(int index) {
440444
throw new UnsupportedOperationException();
441445
}
442446

443447
@Override
444-
public List<WeightedRSocket> subList(int fromIndex, int toIndex) {
448+
public List<RSocket> subList(int fromIndex, int toIndex) {
445449
throw new UnsupportedOperationException();
446450
}
447451
}

rsocket-core/src/main/java/io/rsocket/loadbalance/RoundRobinLoadbalanceStrategy.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -15,18 +15,19 @@
1515
*/
1616
package io.rsocket.loadbalance;
1717

18+
import io.rsocket.RSocket;
1819
import java.util.List;
1920
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
2021

21-
public class RoundRobinLoadbalanceStrategy implements LoadbalanceStrategy {
22+
class RoundRobinLoadbalanceStrategy implements LoadbalanceStrategy {
2223

2324
volatile int nextIndex;
2425

2526
static final AtomicIntegerFieldUpdater<RoundRobinLoadbalanceStrategy> NEXT_INDEX =
2627
AtomicIntegerFieldUpdater.newUpdater(RoundRobinLoadbalanceStrategy.class, "nextIndex");
2728

2829
@Override
29-
public WeightedRSocket select(List<WeightedRSocket> sockets) {
30+
public RSocket select(List<RSocket> sockets) {
3031
int length = sockets.size();
3132

3233
int indexToUse = Math.abs(NEXT_INDEX.getAndIncrement(this) % length);

rsocket-core/src/main/java/io/rsocket/loadbalance/Stats.java

Lines changed: 1 addition & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,15 +1,11 @@
11
package io.rsocket.loadbalance;
22

33
import io.rsocket.Availability;
4-
import io.rsocket.loadbalance.stat.Ewma;
5-
import io.rsocket.loadbalance.stat.FrugalQuantile;
6-
import io.rsocket.loadbalance.stat.Median;
7-
import io.rsocket.loadbalance.stat.Quantile;
84
import io.rsocket.util.Clock;
95
import java.util.concurrent.TimeUnit;
106
import java.util.concurrent.atomic.AtomicLongFieldUpdater;
117

12-
public class Stats implements Availability {
8+
class Stats implements Availability {
139

1410
private static final double DEFAULT_LOWER_QUANTILE = 0.5;
1511
private static final double DEFAULT_HIGHER_QUANTILE = 0.8;

rsocket-core/src/main/java/io/rsocket/loadbalance/WeightedLoadbalanceStrategy.java

Lines changed: 8 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -16,13 +16,14 @@
1616

1717
package io.rsocket.loadbalance;
1818

19+
import io.rsocket.RSocket;
1920
import java.util.List;
2021
import java.util.SplittableRandom;
2122
import java.util.concurrent.ThreadLocalRandom;
2223
import java.util.function.Supplier;
2324
import reactor.util.annotation.Nullable;
2425

25-
public class WeightedLoadbalanceStrategy implements LoadbalanceStrategy {
26+
class WeightedLoadbalanceStrategy implements LoadbalanceStrategy {
2627

2728
private static final double EXP_FACTOR = 4.0;
2829

@@ -52,24 +53,19 @@ public WeightedLoadbalanceStrategy(
5253
}
5354

5455
@Override
55-
public Supplier<Stats> statsSupplier() {
56-
return this.statsSupplier;
57-
}
58-
59-
@Override
60-
public WeightedRSocket select(List<WeightedRSocket> sockets) {
56+
public RSocket select(List<RSocket> sockets) {
6157
final int effort = this.effort;
6258
final int size = sockets.size();
6359

6460
WeightedRSocket weightedRSocket;
6561
switch (size) {
6662
case 1:
67-
weightedRSocket = sockets.get(0);
63+
weightedRSocket = (WeightedRSocket) sockets.get(0);
6864
break;
6965
case 2:
7066
{
71-
WeightedRSocket rsc1 = sockets.get(0);
72-
WeightedRSocket rsc2 = sockets.get(1);
67+
WeightedRSocket rsc1 = (WeightedRSocket) sockets.get(0);
68+
WeightedRSocket rsc2 = (WeightedRSocket) sockets.get(1);
7369

7470
double w1 = algorithmicWeight(rsc1);
7571
double w2 = algorithmicWeight(rsc2);
@@ -92,8 +88,8 @@ public WeightedRSocket select(List<WeightedRSocket> sockets) {
9288
if (i2 >= i1) {
9389
i2++;
9490
}
95-
rsc1 = sockets.get(i1);
96-
rsc2 = sockets.get(i2);
91+
rsc1 = (WeightedRSocket) sockets.get(i1);
92+
rsc2 = (WeightedRSocket) sockets.get(i2);
9793
if (rsc1.availability() > 0.0 && rsc2.availability() > 0.0) {
9894
break;
9995
}

0 commit comments

Comments
 (0)