Skip to content

Commit b15a1fc

Browse files
authored
add fragmentation support using new frames (#585)
* add fragmentation support using new frames Signed-off-by: Robert Roeser <rroeserr@gmail.com>
1 parent 05a99a1 commit b15a1fc

File tree

49 files changed

+2067
-1111
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

49 files changed

+2067
-1111
lines changed

gradle.properties

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,4 +12,4 @@
1212
# limitations under the License.
1313
#
1414

15-
version=0.11.16-SNAPSHOT
15+
version=0.12.1-SNAPSHOT

rsocket-core/src/main/java/io/rsocket/ConnectionSetupPayload.java

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -127,5 +127,15 @@ public ByteBuf sliceMetadata() {
127127
public ByteBuf sliceData() {
128128
return SetupFrameFlyweight.data(setupFrame);
129129
}
130+
131+
@Override
132+
public ByteBuf data() {
133+
return sliceData();
134+
}
135+
136+
@Override
137+
public ByteBuf metadata() {
138+
return sliceMetadata();
139+
}
130140
}
131141
}

rsocket-core/src/main/java/io/rsocket/Payload.java

Lines changed: 18 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -32,8 +32,8 @@ public interface Payload extends ReferenceCounted {
3232
boolean hasMetadata();
3333

3434
/**
35-
* Returns the Payload metadata. Always non-null, check {@link #hasMetadata()} to differentiate
36-
* null from "".
35+
* Returns a slice Payload metadata. Always non-null, check {@link #hasMetadata()} to
36+
* differentiate null from "".
3737
*
3838
* @return payload metadata.
3939
*/
@@ -46,6 +46,22 @@ public interface Payload extends ReferenceCounted {
4646
*/
4747
ByteBuf sliceData();
4848

49+
/**
50+
* Returns the Payloads' data without slicing if possible. This is not safe and editing this could
51+
* effect the payload. It is recommended to call sliceData().
52+
*
53+
* @return data as a bytebuf or slice of the data
54+
*/
55+
ByteBuf data();
56+
57+
/**
58+
* Returns the Payloads' metadata without slicing if possible. This is not safe and editing this
59+
* could effect the payload. It is recommended to call sliceMetadata().
60+
*
61+
* @return metadata as a bytebuf or slice of the metadata
62+
*/
63+
ByteBuf metadata();
64+
4965
/** Increases the reference count by {@code 1}. */
5066
@Override
5167
Payload retain();

rsocket-core/src/main/java/io/rsocket/RSocketClient.java

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818

1919
import io.netty.buffer.ByteBuf;
2020
import io.netty.buffer.ByteBufAllocator;
21+
import io.netty.util.ReferenceCountUtil;
2122
import io.netty.util.collection.IntObjectHashMap;
2223
import io.rsocket.exceptions.ConnectionErrorException;
2324
import io.rsocket.exceptions.Exceptions;
@@ -220,7 +221,6 @@ private Mono<Void> handleFireAndForget(Payload payload) {
220221
false,
221222
payload.hasMetadata() ? payload.sliceMetadata().retain() : null,
222223
payload.sliceData().retain());
223-
224224
payload.release();
225225
sendProcessor.onNext(requestFrame);
226226
}));
@@ -292,12 +292,12 @@ private Mono<Payload> handleRequestResponse(final Payload payload) {
292292
false,
293293
payload.sliceMetadata().retain(),
294294
payload.sliceData().retain());
295+
payload.release();
295296

296297
UnicastMonoProcessor<Payload> receiver = UnicastMonoProcessor.create();
297298
receivers.put(streamId, receiver);
298299

299300
sendProcessor.onNext(requestFrame);
300-
301301
return receiver
302302
.doOnError(
303303
t ->
@@ -472,8 +472,10 @@ private void handleIncomingFrames(ByteBuf frame) {
472472
} else {
473473
handleFrame(streamId, type, frame);
474474
}
475-
} finally {
476475
frame.release();
476+
} catch (Throwable t) {
477+
ReferenceCountUtil.safeRelease(frame);
478+
throw reactor.core.Exceptions.propagate(t);
477479
}
478480
}
479481

rsocket-core/src/main/java/io/rsocket/RSocketFactory.java

Lines changed: 3 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,6 @@
2020
import io.netty.buffer.ByteBufAllocator;
2121
import io.rsocket.exceptions.InvalidSetupException;
2222
import io.rsocket.exceptions.RejectedSetupException;
23-
import io.rsocket.fragmentation.FragmentationDuplexConnection;
2423
import io.rsocket.frame.ErrorFrameFlyweight;
2524
import io.rsocket.frame.SetupFrameFlyweight;
2625
import io.rsocket.frame.VersionFlyweight;
@@ -216,7 +215,7 @@ private class StartClient implements Start<RSocket> {
216215
public Mono<RSocket> start() {
217216
return transportClient
218217
.get()
219-
.connect()
218+
.connect(mtu)
220219
.flatMap(
221220
connection -> {
222221
ByteBuf setupFrame =
@@ -231,10 +230,6 @@ public Mono<RSocket> start() {
231230
setupPayload.sliceMetadata(),
232231
setupPayload.sliceData());
233232

234-
if (mtu > 0) {
235-
connection = new FragmentationDuplexConnection(connection, mtu);
236-
}
237-
238233
ClientServerInputMultiplexer multiplexer =
239234
new ClientServerInputMultiplexer(connection, plugins);
240235

@@ -333,10 +328,6 @@ public Mono<T> start() {
333328
.get()
334329
.start(
335330
connection -> {
336-
if (mtu > 0) {
337-
connection = new FragmentationDuplexConnection(connection, mtu);
338-
}
339-
340331
ClientServerInputMultiplexer multiplexer =
341332
new ClientServerInputMultiplexer(connection, plugins);
342333

@@ -345,7 +336,8 @@ public Mono<T> start() {
345336
.receive()
346337
.next()
347338
.flatMap(setupFrame -> processSetupFrame(multiplexer, setupFrame));
348-
});
339+
},
340+
mtu);
349341
}
350342

351343
private Mono<Void> processSetupFrame(

rsocket-core/src/main/java/io/rsocket/RSocketServer.java

Lines changed: 37 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@
3434
import org.reactivestreams.Subscriber;
3535
import org.reactivestreams.Subscription;
3636
import reactor.core.Disposable;
37+
import reactor.core.Exceptions;
3738
import reactor.core.publisher.Flux;
3839
import reactor.core.publisher.Mono;
3940
import reactor.core.publisher.SignalType;
@@ -330,8 +331,10 @@ private void handleFrame(ByteBuf frame) {
330331
new IllegalStateException("ServerRSocket: Unexpected frame type: " + frameType));
331332
break;
332333
}
333-
} finally {
334334
ReferenceCountUtil.safeRelease(frame);
335+
} catch (Throwable t) {
336+
ReferenceCountUtil.safeRelease(frame);
337+
throw Exceptions.propagate(t);
335338
}
336339
}
337340

@@ -345,11 +348,28 @@ private void handleFireAndForget(int streamId, Mono<Void> result) {
345348
private void handleRequestResponse(int streamId, Mono<Payload> response) {
346349
response
347350
.doOnSubscribe(subscription -> sendingSubscriptions.put(streamId, subscription))
348-
.map(payload -> PayloadFrameFlyweight.encodeNextComplete(allocator, streamId, payload))
351+
.map(
352+
payload -> {
353+
ByteBuf byteBuf = null;
354+
try {
355+
byteBuf = PayloadFrameFlyweight.encodeNextComplete(allocator, streamId, payload);
356+
} catch (Throwable t) {
357+
if (byteBuf != null) {
358+
ReferenceCountUtil.safeRelease(byteBuf);
359+
ReferenceCountUtil.safeRelease(payload);
360+
}
361+
}
362+
payload.release();
363+
return byteBuf;
364+
})
349365
.switchIfEmpty(
350366
Mono.fromCallable(() -> PayloadFrameFlyweight.encodeComplete(allocator, streamId)))
351367
.doFinally(signalType -> sendingSubscriptions.remove(streamId))
352-
.subscribe(t1 -> sendProcessor.onNext(t1), t -> handleError(streamId, t));
368+
.subscribe(
369+
t1 -> {
370+
sendProcessor.onNext(t1);
371+
},
372+
t -> handleError(streamId, t));
353373
}
354374

355375
private void handleStream(int streamId, Flux<Payload> response, int initialRequestN) {
@@ -364,9 +384,20 @@ private void handleStream(int streamId, Flux<Payload> response, int initialReque
364384
})
365385
.doFinally(signalType -> sendingSubscriptions.remove(streamId))
366386
.subscribe(
367-
payload ->
368-
sendProcessor.onNext(
369-
PayloadFrameFlyweight.encodeNext(allocator, streamId, payload)),
387+
payload -> {
388+
ByteBuf byteBuf = null;
389+
try {
390+
byteBuf = PayloadFrameFlyweight.encodeNext(allocator, streamId, payload);
391+
} catch (Throwable t) {
392+
if (byteBuf != null) {
393+
ReferenceCountUtil.safeRelease(byteBuf);
394+
ReferenceCountUtil.safeRelease(payload);
395+
}
396+
throw Exceptions.propagate(t);
397+
}
398+
payload.release();
399+
sendProcessor.onNext(byteBuf);
400+
},
370401
t -> handleError(streamId, t),
371402
() -> sendProcessor.onNext(PayloadFrameFlyweight.encodeComplete(allocator, streamId)));
372403
}

0 commit comments

Comments
 (0)