Skip to content

Commit a218427

Browse files
qweekrobertroeser
authored andcommitted
fix compiler flow issues (#414)
replace lambdas with method references replace statement lambda with expression lambda redundant local variable Unchecked assignment: 'org.reactivestreams.Subscriber' to 'org.reactivestreams.Subscriber<? super io.rsocket.Payload>' Unchecked assignment: 'org.reactivestreams.Subscriber' to 'org.reactivestreams.Subscriber<? super java.lang.Void>' The declared exception 'Exception' is never thrown
1 parent b9cc917 commit a218427

File tree

2 files changed

+41
-45
lines changed

2 files changed

+41
-45
lines changed

rsocket-core/src/jmh/java/io/rsocket/RSocketPerf.java

Lines changed: 27 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -69,7 +69,7 @@ public void requestStreamHello1000(Input input) {
6969
@Benchmark
7070
public void fireAndForgetHello(Input input) {
7171
// this is synchronous so we don't need to use a CountdownLatch to wait
72-
input.client.fireAndForget(Input.HELLO_PAYLOAD).subscribe(input.blackHoleSubscriber);
72+
input.client.fireAndForget(Input.HELLO_PAYLOAD).subscribe(input.voidSubscriber);
7373
}
7474

7575
@State(Scope.Benchmark)
@@ -155,37 +155,42 @@ public Mono<Void> onClose() {
155155
return Mono.just(closeable);
156156
});
157157

158-
Subscriber blackHoleSubscriber;
158+
Subscriber<Payload> blackHoleSubscriber;
159+
Subscriber<Void> voidSubscriber;
159160

160161
RSocket client;
161162

162163
@Setup
163164
public void setup(Blackhole bh) {
164-
blackHoleSubscriber =
165-
new Subscriber() {
166-
@Override
167-
public void onSubscribe(Subscription s) {
168-
s.request(Long.MAX_VALUE);
169-
}
170-
171-
@Override
172-
public void onNext(Object o) {
173-
bh.consume(o);
174-
}
175-
176-
@Override
177-
public void onError(Throwable t) {
178-
t.printStackTrace();
179-
}
180-
181-
@Override
182-
public void onComplete() {}
183-
};
165+
blackHoleSubscriber = subscriber(bh);
166+
voidSubscriber = subscriber(bh);
184167

185168
client =
186169
RSocketFactory.connect().transport(() -> Mono.just(clientConnection)).start().block();
187170

188171
this.bh = bh;
189172
}
173+
174+
private <T> Subscriber<T> subscriber(Blackhole bh) {
175+
return new Subscriber<T>() {
176+
@Override
177+
public void onSubscribe(Subscription s) {
178+
s.request(Long.MAX_VALUE);
179+
}
180+
181+
@Override
182+
public void onNext(T o) {
183+
bh.consume(o);
184+
}
185+
186+
@Override
187+
public void onError(Throwable t) {
188+
t.printStackTrace();
189+
}
190+
191+
@Override
192+
public void onComplete() {}
193+
};
194+
}
190195
}
191196
}

rsocket-examples/src/test/java/io/rsocket/integration/TestingStreaming.java

Lines changed: 14 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -15,18 +15,18 @@
1515
import reactor.core.publisher.Mono;
1616

1717
public class TestingStreaming {
18-
Supplier<ServerTransport<? extends Closeable>> serverSupplier =
18+
private Supplier<ServerTransport<? extends Closeable>> serverSupplier =
1919
() -> LocalServerTransport.create("test");
2020

21-
Supplier<ClientTransport> clientSupplier = () -> LocalClientTransport.create("test");
21+
private Supplier<ClientTransport> clientSupplier = () -> LocalClientTransport.create("test");
2222

2323
@Test(expected = ApplicationException.class)
24-
public void testRangeButThrowException() throws Exception {
24+
public void testRangeButThrowException() {
2525
Closeable server = null;
2626
try {
2727
server =
2828
RSocketFactory.receive()
29-
.errorConsumer(t -> t.printStackTrace())
29+
.errorConsumer(Throwable::printStackTrace)
3030
.acceptor(
3131
(connectionSetupPayload, rSocket) -> {
3232
AbstractRSocket abstractRSocket =
@@ -65,12 +65,12 @@ public Flux<Payload> requestStream(Payload payload) {
6565
}
6666

6767
@Test
68-
public void testRangeOfConsumers() throws Exception {
68+
public void testRangeOfConsumers() {
6969
Closeable server = null;
7070
try {
7171
server =
7272
RSocketFactory.receive()
73-
.errorConsumer(t -> t.printStackTrace())
73+
.errorConsumer(Throwable::printStackTrace)
7474
.acceptor(
7575
(connectionSetupPayload, rSocket) -> {
7676
AbstractRSocket abstractRSocket =
@@ -102,30 +102,21 @@ public Flux<Payload> requestStream(Payload payload) {
102102
}
103103
}
104104

105-
public Flux<Payload> consumer(String s) {
106-
Mono<RSocket> test =
107-
RSocketFactory.connect()
108-
.errorConsumer(t -> t.printStackTrace())
109-
.transport(clientSupplier)
110-
.start();
111-
112-
Flux<Payload> payloadFlux =
113-
test.flatMapMany(
105+
private Flux<Payload> consumer(String s) {
106+
return RSocketFactory.connect()
107+
.errorConsumer(Throwable::printStackTrace)
108+
.transport(clientSupplier)
109+
.start()
110+
.flatMapMany(
114111
rSocket -> {
115112
AtomicInteger count = new AtomicInteger();
116113
return Flux.range(1, 100)
117-
.flatMap(
118-
i -> {
119-
return rSocket.requestStream(new PayloadImpl("i -> " + i)).take(100);
120-
},
121-
1);
114+
.flatMap(i -> rSocket.requestStream(new PayloadImpl("i -> " + i)).take(100), 1);
122115
});
123-
124-
return payloadFlux;
125116
}
126117

127118
@Test
128-
public void testSingleConsumer() throws Exception {
119+
public void testSingleConsumer() {
129120
Closeable server = null;
130121

131122
try {

0 commit comments

Comments
 (0)