Skip to content

Commit 4552053

Browse files
mostroverkhovrobertroeser
authored andcommitted
Fix wrong messages content/refCount exceptions on highly concurrent streams (#524)
* fix: assumption about all payloads being processed synchronously is wrong * convert to test
1 parent 3e89238 commit 4552053

File tree

3 files changed

+115
-4
lines changed

3 files changed

+115
-4
lines changed

rsocket-core/src/main/java/io/rsocket/util/ByteBufPayload.java

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -23,10 +23,11 @@
2323
import io.netty.util.AbstractReferenceCounted;
2424
import io.netty.util.Recycler;
2525
import io.rsocket.Payload;
26+
27+
import javax.annotation.Nullable;
2628
import java.nio.ByteBuffer;
2729
import java.nio.CharBuffer;
2830
import java.nio.charset.Charset;
29-
import javax.annotation.Nullable;
3031

3132
public final class ByteBufPayload extends AbstractReferenceCounted implements Payload {
3233
private static final Recycler<ByteBufPayload> RECYCLER =
@@ -168,8 +169,8 @@ public static Payload create(ByteBuf data) {
168169
public static Payload create(ByteBuf data, @Nullable ByteBuf metadata) {
169170
ByteBufPayload payload = RECYCLER.get();
170171
payload.setRefCnt(1);
171-
payload.data = data;
172-
payload.metadata = metadata;
172+
payload.data = data.retain();
173+
payload.metadata = metadata == null ? Unpooled.EMPTY_BUFFER : metadata.retain();
173174
return payload;
174175
}
175176

rsocket-core/src/main/java/io/rsocket/util/DefaultPayload.java

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -155,6 +155,12 @@ public static Payload create(ByteBuffer data, @Nullable ByteBuffer metadata) {
155155
}
156156

157157
public static Payload create(Payload payload) {
158-
return create(payload.getData(), payload.hasMetadata() ? payload.getMetadata() : null);
158+
return create(copy(payload.sliceData()), payload.hasMetadata() ? copy(payload.sliceMetadata()) : null);
159+
}
160+
161+
private static ByteBuffer copy(ByteBuf byteBuf) {
162+
byte[] contents = new byte[byteBuf.readableBytes()];
163+
byteBuf.readBytes(contents);
164+
return ByteBuffer.wrap(contents);
159165
}
160166
}
Lines changed: 104 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,104 @@
1+
package io.rsocket.integration;
2+
3+
import io.rsocket.AbstractRSocket;
4+
import io.rsocket.Payload;
5+
import io.rsocket.RSocket;
6+
import io.rsocket.RSocketFactory;
7+
import io.rsocket.test.SlowTest;
8+
import io.rsocket.transport.netty.client.TcpClientTransport;
9+
import io.rsocket.transport.netty.server.NettyContextCloseable;
10+
import io.rsocket.transport.netty.server.TcpServerTransport;
11+
import io.rsocket.util.DefaultPayload;
12+
import org.junit.jupiter.api.Test;
13+
import org.reactivestreams.Publisher;
14+
import reactor.core.publisher.Flux;
15+
import reactor.core.publisher.Mono;
16+
17+
import java.time.Duration;
18+
import java.util.function.Supplier;
19+
20+
public class InteractionsLoadTest {
21+
22+
@Test
23+
@SlowTest
24+
public void channel() {
25+
TcpServerTransport serverTransport = TcpServerTransport.create(0);
26+
27+
NettyContextCloseable server = RSocketFactory.receive()
28+
.acceptor((setup, rsocket) -> Mono.just(new EchoRSocket()))
29+
.transport(serverTransport)
30+
.start()
31+
.block(Duration.ofSeconds(10));
32+
33+
TcpClientTransport transport = TcpClientTransport.create(server.address());
34+
35+
RSocket client = RSocketFactory
36+
.connect()
37+
.transport(transport).start()
38+
.block(Duration.ofSeconds(10));
39+
40+
int concurrency = 16;
41+
Flux.range(1, concurrency)
42+
.flatMap(v ->
43+
client.requestChannel(
44+
input().onBackpressureDrop().map(iv ->
45+
DefaultPayload.create("foo")))
46+
.limitRate(10000), concurrency)
47+
.timeout(Duration.ofSeconds(5))
48+
.doOnNext(p -> {
49+
String data = p.getDataUtf8();
50+
if (!data.equals("bar")) {
51+
throw new IllegalStateException("Channel Client Bad message: " + data);
52+
}
53+
})
54+
.window(Duration.ofSeconds(1))
55+
.flatMap(Flux::count)
56+
.doOnNext(d -> System.out.println("Got: " + d))
57+
.take(Duration.ofMinutes(1))
58+
.doOnTerminate(server::dispose)
59+
.subscribe();
60+
61+
server.onClose().block();
62+
63+
}
64+
65+
private static Flux<Long> input() {
66+
Flux<Long> interval = Flux.interval(Duration.ofMillis(1))
67+
.onBackpressureDrop();
68+
for (int i = 0; i < 10; i++) {
69+
interval = interval.mergeWith(interval);
70+
}
71+
return interval;
72+
}
73+
74+
private static class EchoRSocket extends AbstractRSocket {
75+
@Override
76+
public Flux<Payload> requestChannel(Publisher<Payload> payloads) {
77+
return Flux.from(payloads).map(p -> {
78+
79+
String data = p.getDataUtf8();
80+
if (!data.equals("foo")) {
81+
throw new IllegalStateException("Channel Server Bad message: " + data);
82+
}
83+
return DefaultPayload.create(DefaultPayload.create("bar"));
84+
});
85+
}
86+
87+
@Override
88+
public Flux<Payload> requestStream(Payload payload) {
89+
return Flux.just(payload)
90+
.map(p -> {
91+
String data = p.getDataUtf8();
92+
return data;
93+
})
94+
.doOnNext((data) -> {
95+
if (!data.equals("foo")) {
96+
throw new IllegalStateException("Stream Server Bad message: " + data);
97+
}
98+
}).flatMap(data -> {
99+
Supplier<Payload> p = () -> DefaultPayload.create("bar");
100+
return Flux.range(1, 100).map(v -> p.get());
101+
});
102+
}
103+
}
104+
}

0 commit comments

Comments
 (0)