Skip to content

Commit 6b005ca

Browse files
committed
Merge branch 'release/1.0.0-RC2'
2 parents 32fd4d3 + cce337a commit 6b005ca

File tree

23 files changed

+307
-76
lines changed

23 files changed

+307
-76
lines changed

.travis.yml

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,8 @@
1616
---
1717
language: java
1818

19+
dist: trusty
20+
1921
matrix:
2022
include:
2123
- jdk: oraclejdk8

build.gradle

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -29,10 +29,10 @@ subprojects {
2929
apply plugin: 'io.spring.dependency-management'
3030
apply plugin: 'com.github.sherter.google-java-format'
3131

32-
ext['reactor-bom.version'] = 'Californium-SR8'
32+
ext['reactor-bom.version'] = 'Dysprosium-M3'
3333
ext['logback.version'] = '1.2.3'
3434
ext['findbugs.version'] = '3.0.2'
35-
ext['netty.version'] = '4.1.36.Final'
35+
ext['netty.version'] = '4.1.37.Final'
3636
ext['netty-boringssl.version'] = '2.0.25.Final'
3737
ext['hdrhistogram.version'] = '2.1.10'
3838
ext['mockito.version'] = '2.25.1'
@@ -89,6 +89,7 @@ subprojects {
8989

9090
repositories {
9191
mavenCentral()
92+
maven { url 'http://repo.spring.io/milestone' } // temporary for Reactor Dysprosium
9293

9394
if (version.endsWith('BUILD-SNAPSHOT') || project.hasProperty('platformVersion')) {
9495
maven { url 'http://repo.spring.io/libs-snapshot' }

gradle.properties

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,4 +11,4 @@
1111
# See the License for the specific language governing permissions and
1212
# limitations under the License.
1313
#
14-
version=1.0.0-RC1
14+
version=1.0.0-RC2
Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,35 @@
1+
package io.rsocket;
2+
3+
import io.netty.util.collection.IntObjectMap;
4+
import io.rsocket.internal.SynchronizedIntObjectHashMap;
5+
import org.openjdk.jmh.annotations.*;
6+
import org.openjdk.jmh.infra.Blackhole;
7+
8+
@BenchmarkMode(Mode.Throughput)
9+
@Fork(
10+
value = 1 // , jvmArgsAppend = {"-Dio.netty.leakDetection.level=advanced"}
11+
)
12+
@Warmup(iterations = 10)
13+
@Measurement(iterations = 10)
14+
@State(Scope.Thread)
15+
public class StreamIdSupplierPerf {
16+
@Benchmark
17+
public void benchmarkStreamId(Input input) {
18+
int i = input.supplier.nextStreamId(input.map);
19+
input.bh.consume(i);
20+
}
21+
22+
@State(Scope.Benchmark)
23+
public static class Input {
24+
Blackhole bh;
25+
IntObjectMap map;
26+
StreamIdSupplier supplier;
27+
28+
@Setup
29+
public void setup(Blackhole bh) {
30+
this.supplier = StreamIdSupplier.clientSupplier();
31+
this.bh = bh;
32+
this.map = new SynchronizedIntObjectHashMap();
33+
}
34+
}
35+
}

rsocket-core/src/main/java/io/rsocket/ConnectionSetupPayload.java

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
import io.netty.util.AbstractReferenceCounted;
2121
import io.rsocket.frame.FrameHeaderFlyweight;
2222
import io.rsocket.frame.SetupFrameFlyweight;
23+
import javax.annotation.Nullable;
2324

2425
/**
2526
* Exposed to server for determination of ResponderRSocket based on mime types and SETUP
@@ -43,6 +44,11 @@ public static ConnectionSetupPayload create(final ByteBuf setupFrame) {
4344

4445
public abstract boolean willClientHonorLease();
4546

47+
public abstract boolean isResumeEnabled();
48+
49+
@Nullable
50+
public abstract ByteBuf resumeToken();
51+
4652
@Override
4753
public ConnectionSetupPayload retain() {
4854
super.retain();
@@ -101,6 +107,16 @@ public boolean willClientHonorLease() {
101107
return SetupFrameFlyweight.honorLease(setupFrame);
102108
}
103109

110+
@Override
111+
public boolean isResumeEnabled() {
112+
return SetupFrameFlyweight.resumeEnabled(setupFrame);
113+
}
114+
115+
@Override
116+
public ByteBuf resumeToken() {
117+
return SetupFrameFlyweight.resumeToken(setupFrame);
118+
}
119+
104120
@Override
105121
public ConnectionSetupPayload touch() {
106122
setupFrame.touch();

rsocket-core/src/main/java/io/rsocket/RSocketFactory.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -355,8 +355,7 @@ public Mono<RSocket> start() {
355355
resumeToken,
356356
metadataMimeType,
357357
dataMimeType,
358-
setupPayload.sliceMetadata(),
359-
setupPayload.sliceData());
358+
setupPayload);
360359

361360
RSocket wrappedRSocketRequester = plugins.applyRequester(rSocketRequester);
362361

rsocket-core/src/main/java/io/rsocket/RSocketRequester.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -205,7 +205,7 @@ private Mono<Void> handleFireAndForget(Payload payload) {
205205
return Mono.error(err);
206206
}
207207

208-
final int streamId = streamIdSupplier.nextStreamId();
208+
final int streamId = streamIdSupplier.nextStreamId(receivers);
209209

210210
return emptyUnicastMono()
211211
.doOnSubscribe(
@@ -233,7 +233,7 @@ private Mono<Payload> handleRequestResponse(final Payload payload) {
233233
return Mono.error(err);
234234
}
235235

236-
int streamId = streamIdSupplier.nextStreamId();
236+
int streamId = streamIdSupplier.nextStreamId(receivers);
237237
final UnboundedProcessor<ByteBuf> sendProcessor = this.sendProcessor;
238238

239239
UnicastMonoProcessor<Payload> receiver = UnicastMonoProcessor.create();
@@ -274,7 +274,7 @@ private Flux<Payload> handleRequestStream(final Payload payload) {
274274
return Flux.error(err);
275275
}
276276

277-
int streamId = streamIdSupplier.nextStreamId();
277+
int streamId = streamIdSupplier.nextStreamId(receivers);
278278

279279
final UnboundedProcessor<ByteBuf> sendProcessor = this.sendProcessor;
280280
final UnicastProcessor<Payload> receiver = UnicastProcessor.create();
@@ -328,7 +328,7 @@ private Flux<Payload> handleChannel(Flux<Payload> request) {
328328

329329
final UnboundedProcessor<ByteBuf> sendProcessor = this.sendProcessor;
330330
final UnicastProcessor<Payload> receiver = UnicastProcessor.create();
331-
final int streamId = streamIdSupplier.nextStreamId();
331+
final int streamId = streamIdSupplier.nextStreamId(receivers);
332332

333333
return receiver
334334
.doOnRequest(

rsocket-core/src/main/java/io/rsocket/StreamIdSupplier.java

Lines changed: 14 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -13,18 +13,20 @@
1313
* See the License for the specific language governing permissions and
1414
* limitations under the License.
1515
*/
16-
1716
package io.rsocket;
1817

19-
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
18+
import io.netty.util.collection.IntObjectMap;
19+
import java.util.concurrent.atomic.AtomicLongFieldUpdater;
2020

2121
final class StreamIdSupplier {
22+
private static final int MASK = 0x7FFFFFFF;
2223

23-
private static final AtomicIntegerFieldUpdater<StreamIdSupplier> STREAM_ID =
24-
AtomicIntegerFieldUpdater.newUpdater(StreamIdSupplier.class, "streamId");
25-
private volatile int streamId;
24+
private static final AtomicLongFieldUpdater<StreamIdSupplier> STREAM_ID =
25+
AtomicLongFieldUpdater.newUpdater(StreamIdSupplier.class, "streamId");
26+
private volatile long streamId;
2627

27-
private StreamIdSupplier(int streamId) {
28+
// Visible for testing
29+
StreamIdSupplier(int streamId) {
2830
this.streamId = streamId;
2931
}
3032

@@ -36,8 +38,12 @@ static StreamIdSupplier serverSupplier() {
3638
return new StreamIdSupplier(0);
3739
}
3840

39-
int nextStreamId() {
40-
return STREAM_ID.addAndGet(this, 2);
41+
int nextStreamId(IntObjectMap<?> streamIds) {
42+
int streamId;
43+
do {
44+
streamId = (int) STREAM_ID.addAndGet(this, 2) & MASK;
45+
} while (streamId == 0 || streamIds.containsKey(streamId));
46+
return streamId;
4147
}
4248

4349
boolean isBeforeOrCurrent(int streamId) {

rsocket-core/src/main/java/io/rsocket/buffer/Tuple3ByteBuf.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -144,7 +144,8 @@ public ByteBuffer[] _nioBuffers(int index, int length) {
144144
new ByteBuffer[oneBuffer.length + twoBuffer.length + threeBuffer.length];
145145
System.arraycopy(oneBuffer, 0, results, 0, oneBuffer.length);
146146
System.arraycopy(twoBuffer, 0, results, oneBuffer.length, twoBuffer.length);
147-
System.arraycopy(threeBuffer, 0, results, twoBuffer.length, threeBuffer.length);
147+
System.arraycopy(
148+
threeBuffer, 0, results, oneBuffer.length + twoBuffer.length, threeBuffer.length);
148149
return results;
149150
} else {
150151
ByteBuffer[] results = new ByteBuffer[oneBuffer.length + twoBuffer.length];
@@ -167,7 +168,7 @@ public ByteBuffer[] _nioBuffers(int index, int length) {
167168
threeBuffer = three.nioBuffers(threeReadIndex, length);
168169
ByteBuffer[] results = new ByteBuffer[twoBuffer.length + threeBuffer.length];
169170
System.arraycopy(twoBuffer, 0, results, 0, twoBuffer.length);
170-
System.arraycopy(threeBuffer, 0, results, threeBuffer.length, twoBuffer.length);
171+
System.arraycopy(threeBuffer, 0, results, twoBuffer.length, threeBuffer.length);
171172
return results;
172173
} else {
173174
return twoBuffer;

rsocket-core/src/main/java/io/rsocket/fragmentation/FrameReassembler.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,9 @@
3838
* and Reassembly</a>
3939
*/
4040
final class FrameReassembler extends AtomicBoolean implements Disposable {
41+
42+
private static final long serialVersionUID = -4394598098863449055L;
43+
4144
private static final Logger logger = LoggerFactory.getLogger(FrameReassembler.class);
4245

4346
final IntObjectMap<ByteBuf> headers;

0 commit comments

Comments
 (0)