Skip to content

Commit 9f6fbcc

Browse files
nebhalerobertroeser
authored andcommitted
Netty Transport Improvements (#507)
This change makes general improvements to the Netty Transport. It updates the code to make it better documented and tested. In addition it introduces the new TransportTest, a JUnit 5 way of testing uniformly across all of the transport implementations.
1 parent d01121f commit 9f6fbcc

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

44 files changed

+1514
-334
lines changed

build.gradle

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,6 @@
1717
plugins {
1818
id 'com.gradle.build-scan' version '1.12.1'
1919

20-
id 'com.github.johnrengelman.shadow' version '2.0.2' apply false
2120
id 'com.github.sherter.google-java-format' version '0.6'
2221
id 'com.jfrog.artifactory' version '4.7.0'
2322
id 'com.jfrog.bintray' version '1.8.0'

rsocket-core/build.gradle

Lines changed: 1 addition & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,6 @@
1717
plugins {
1818
id 'java-library'
1919
id 'maven-publish'
20-
id 'com.github.johnrengelman.shadow'
2120
id 'com.jfrog.artifactory'
2221
id 'com.jfrog.bintray'
2322
id 'io.morethan.jmhreport'
@@ -29,10 +28,10 @@ dependencies {
2928
api 'io.projectreactor:reactor-core'
3029

3130
implementation 'io.projectreactor.addons:reactor-extra'
31+
implementation 'org.jctools:jctools-core'
3232
implementation 'org.slf4j:slf4j-api'
3333

3434
compileOnly 'com.google.code.findbugs:jsr305'
35-
compileOnly 'org.jctools:jctools-core'
3635

3736
testImplementation 'io.projectreactor:reactor-test'
3837
testImplementation 'org.assertj:assertj-core'
@@ -49,28 +48,6 @@ dependencies {
4948
testRuntimeOnly 'org.junit.vintage:junit-vintage-engine'
5049
}
5150

52-
jar {
53-
enabled = false
54-
dependsOn(shadowJar { classifier = null })
55-
}
56-
57-
shadowJar {
58-
configurations = [project.configurations.compileOnly]
59-
60-
dependencies {
61-
include(dependency('org.jctools:jctools-core'))
62-
}
63-
64-
include '*.jar'
65-
include 'io/rsocket/**'
66-
include 'org/jctools/maps/ConcurrentAutoTable*.class'
67-
include 'org/jctools/maps/NonBlockingHashMapLong*.class'
68-
include 'org/jctools/util/RangeUtil*.class'
69-
include 'org/jctools/util/UnsafeAccess*.class'
70-
71-
relocate 'org.jctools', 'io.rsocket.shadowed.org.jctools'
72-
}
73-
7451
description = "Core functionality for the RSocket library"
7552

7653
apply from: 'jmh.gradle'

rsocket-core/src/jmh/java/io/rsocket/fragmentation/FragmentationPerformanceTest.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -57,7 +57,7 @@ public void largeFragmentation(Input input) {
5757

5858
@Benchmark
5959
public void largeReassembly(Input input) {
60-
input.largeFrames.forEach(frame -> input.reassembler.reassemble(frame, input.sink));
60+
input.largeFrames.forEach(frame -> input.reassembler.reassemble(frame));
6161

6262
input.bh.consume(input.sink.next);
6363
}
@@ -72,7 +72,7 @@ public void smallFragmentation(Input input) {
7272

7373
@Benchmark
7474
public void smallReassembly(Input input) {
75-
input.smallFrames.forEach(frame -> input.reassembler.reassemble(frame, input.sink));
75+
input.smallFrames.forEach(frame -> input.reassembler.reassemble(frame));
7676

7777
input.bh.consume(input.sink.next);
7878
}

rsocket-core/src/main/java/io/rsocket/transport/ServerTransport.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@ public interface ServerTransport<T extends Closeable> extends Transport {
3030
*
3131
* @param acceptor An acceptor to process a newly accepted {@code DuplexConnection}
3232
* @return A handle to retrieve information about a started server.
33+
* @throws NullPointerException if {@code acceptor} is {@code null}
3334
*/
3435
Mono<T> start(ConnectionAcceptor acceptor);
3536

rsocket-core/src/main/java/io/rsocket/transport/TransportHeaderAware.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,5 +24,12 @@
2424
* Http2.
2525
*/
2626
public interface TransportHeaderAware {
27+
28+
/**
29+
* Sets the transport headers
30+
*
31+
* @param transportHeaders the transport headers
32+
* @throws NullPointerException if {@code transportHeaders} is {@code null}
33+
*/
2734
void setTransportHeaders(Supplier<Map<String, String>> transportHeaders);
2835
}

rsocket-test/build.gradle

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -16,16 +16,20 @@
1616

1717
plugins {
1818
id 'java-library'
19+
id 'maven-publish'
20+
id 'com.jfrog.artifactory'
21+
id 'com.jfrog.bintray'
1922
}
2023

2124
dependencies {
25+
api project(':rsocket-core')
2226
api 'org.hdrhistogram:HdrHistogram'
27+
api 'org.junit.jupiter:junit-jupiter-api'
28+
29+
compileOnly 'com.google.code.findbugs:jsr305'
2330

24-
implementation project(':rsocket-core')
25-
implementation 'com.google.code.findbugs:jsr305'
2631
implementation 'io.projectreactor:reactor-test'
2732
implementation 'org.assertj:assertj-core'
28-
implementation 'org.junit.jupiter:junit-jupiter-api'
2933
implementation 'org.mockito:mockito-core'
3034

3135
// TODO: Remove after JUnit5 migration

rsocket-test/src/main/java/io/rsocket/test/TransportTest.java

Lines changed: 20 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,9 @@
2424
import io.rsocket.transport.ServerTransport;
2525
import io.rsocket.util.DefaultPayload;
2626
import java.time.Duration;
27+
import java.util.function.BiFunction;
28+
import java.util.function.Function;
29+
import java.util.function.Supplier;
2730
import org.junit.jupiter.api.AfterEach;
2831
import org.junit.jupiter.api.DisplayName;
2932
import org.junit.jupiter.api.Test;
@@ -258,21 +261,32 @@ default void requestStreamDelayedRequestN() {
258261
.verify(getTimeout());
259262
}
260263

261-
final class TransportPair implements Disposable {
264+
final class TransportPair<T, S extends Closeable> implements Disposable {
262265

263266
private final RSocket client;
264267

265-
private final Closeable server;
268+
private final S server;
266269

267-
public TransportPair(ServerTransport<?> serverTransport, ClientTransport clientTransport) {
268-
this.server =
270+
public TransportPair(
271+
Supplier<T> addressSupplier,
272+
BiFunction<T, S, ClientTransport> clientTransportSupplier,
273+
Function<T, ServerTransport<S>> serverTransportSupplier) {
274+
275+
T address = addressSupplier.get();
276+
277+
server =
269278
RSocketFactory.receive()
270279
.acceptor((setup, sendingSocket) -> Mono.just(new TestRSocket()))
271-
.transport(serverTransport)
280+
.transport(serverTransportSupplier.apply(address))
272281
.start()
273282
.block();
274283

275-
this.client = RSocketFactory.connect().transport(clientTransport).start().block();
284+
client =
285+
RSocketFactory.connect()
286+
.transport(clientTransportSupplier.apply(address, server))
287+
.start()
288+
.doOnError(Throwable::printStackTrace)
289+
.block();
276290
}
277291

278292
@Override

rsocket-transport-local/src/test/java/io/rsocket/transport/local/LocalClientTransportTest.java

Lines changed: 1 addition & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -33,12 +33,7 @@ void connect() {
3333

3434
serverTransport
3535
.start(duplexConnection -> Mono.empty())
36-
.as(StepVerifier::create)
37-
.expectNextCount(1)
38-
.verifyComplete();
39-
40-
LocalClientTransport.create(serverTransport.getName())
41-
.connect()
36+
.flatMap(closeable -> LocalClientTransport.create(serverTransport.getName()).connect())
4237
.as(StepVerifier::create)
4338
.expectNextCount(1)
4439
.verifyComplete();

rsocket-transport-local/src/test/java/io/rsocket/transport/local/LocalTransportTest.java

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -18,14 +18,17 @@
1818

1919
import io.rsocket.test.TransportTest;
2020
import java.time.Duration;
21+
import java.util.concurrent.atomic.AtomicInteger;
2122

2223
final class LocalTransportTest implements TransportTest {
2324

24-
private final LocalServerTransport serverTransport = LocalServerTransport.createEphemeral();
25+
private static final AtomicInteger UNIQUE_NAME_GENERATOR = new AtomicInteger();
2526

26-
private final LocalClientTransport clientTransport = serverTransport.clientTransport();
27-
28-
private final TransportPair transportPair = new TransportPair(serverTransport, clientTransport);
27+
private final TransportPair transportPair =
28+
new TransportPair<>(
29+
() -> "test" + UNIQUE_NAME_GENERATOR.incrementAndGet(),
30+
(address, server) -> LocalClientTransport.create(address),
31+
LocalServerTransport::create);
2932

3033
@Override
3134
public Duration getTimeout() {

rsocket-transport-netty/build.gradle

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -25,12 +25,15 @@ dependencies {
2525
api project(':rsocket-core')
2626
api 'io.projectreactor.ipc:reactor-netty'
2727

28+
compileOnly 'com.google.code.findbugs:jsr305'
29+
2830
testImplementation project(':rsocket-test')
31+
testImplementation 'io.projectreactor:reactor-test'
32+
testImplementation 'org.assertj:assertj-core'
2933
testImplementation 'org.junit.jupiter:junit-jupiter-api'
3034

31-
// TODO: Remove after JUnit5 migration
32-
testCompileOnly 'junit:junit'
33-
testRuntimeOnly 'org.junit.vintage:junit-vintage-engine'
35+
testRuntimeOnly 'ch.qos.logback:logback-classic'
36+
testRuntimeOnly 'org.junit.jupiter:junit-jupiter-engine'
3437
}
3538

3639
description = 'Reactor Netty RSocket transport implementations (TCP, Websocket)'

0 commit comments

Comments
 (0)