Skip to content

Commit 058878b

Browse files
committed
added missing retain on metadata when reassembling frames
Signed-off-by: Robert Roeser <rroeserr@gmail.com>
1 parent 0088065 commit 058878b

File tree

3 files changed

+173
-2
lines changed

3 files changed

+173
-2
lines changed

gradle.properties

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,4 +12,4 @@
1212
# limitations under the License.
1313
#
1414

15-
version=0.12.1-RC1
15+
version=0.12.1-RC2

rsocket-core/src/main/java/io/rsocket/fragmentation/FrameReassembler.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -267,7 +267,7 @@ private ByteBuf assembleFrameWithMetadata(ByteBuf frame, int streamId, ByteBuf h
267267

268268
ByteBuf data = assembleData(frame, streamId);
269269

270-
return FragmentationFlyweight.encode(allocator, header, metadata, data);
270+
return FragmentationFlyweight.encode(allocator, header, metadata.retain(), data);
271271
}
272272

273273
private ByteBuf assembleData(ByteBuf frame, int streamId) {
Lines changed: 171 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,171 @@
1+
/*
2+
* Copyright 2015-2018 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package io.rsocket.integration;
18+
19+
import static org.assertj.core.api.Assertions.assertThat;
20+
21+
import io.rsocket.AbstractRSocket;
22+
import io.rsocket.Payload;
23+
import io.rsocket.RSocket;
24+
import io.rsocket.RSocketFactory;
25+
import io.rsocket.transport.netty.client.TcpClientTransport;
26+
import io.rsocket.transport.netty.server.CloseableChannel;
27+
import io.rsocket.transport.netty.server.TcpServerTransport;
28+
import io.rsocket.util.DefaultPayload;
29+
import io.rsocket.util.RSocketProxy;
30+
import java.util.concurrent.ThreadLocalRandom;
31+
import org.junit.jupiter.api.AfterEach;
32+
import org.junit.jupiter.api.BeforeEach;
33+
import org.junit.jupiter.api.Test;
34+
import reactor.core.publisher.Flux;
35+
import reactor.core.publisher.Mono;
36+
37+
public class FragmentTest {
38+
private static final int frameSize = 128;
39+
private AbstractRSocket handler;
40+
private CloseableChannel server;
41+
private String message = null;
42+
private String metaData = null;
43+
44+
@BeforeEach
45+
public void startup() {
46+
int randomPort = ThreadLocalRandom.current().nextInt(10_000, 20_000);
47+
StringBuilder message = new StringBuilder();
48+
StringBuilder metaData = new StringBuilder();
49+
for (int i = 0; i < 100; i++) {
50+
message.append("RESPONSE ");
51+
metaData.append("METADATA ");
52+
}
53+
this.message = message.toString();
54+
this.metaData = metaData.toString();
55+
TcpServerTransport serverTransport = TcpServerTransport.create(randomPort);
56+
server =
57+
RSocketFactory.receive()
58+
.fragment(frameSize)
59+
.acceptor((setup, sendingSocket) -> Mono.just(new RSocketProxy(handler)))
60+
.transport(serverTransport)
61+
.start()
62+
.block();
63+
}
64+
65+
private RSocket buildClient() {
66+
return RSocketFactory.connect()
67+
.fragment(frameSize)
68+
.transport(TcpClientTransport.create(server.address()))
69+
.start()
70+
.block();
71+
}
72+
73+
@AfterEach
74+
public void cleanup() {
75+
server.dispose();
76+
}
77+
78+
@Test
79+
void testFragmentNoMetaData() {
80+
System.out.println(
81+
"-------------------------------------------------testFragmentNoMetaData-------------------------------------------------");
82+
handler =
83+
new AbstractRSocket() {
84+
@Override
85+
public Flux<Payload> requestStream(Payload payload) {
86+
String request = payload.getDataUtf8();
87+
String metaData = payload.getMetadataUtf8();
88+
System.out.println("request message: " + request);
89+
System.out.println("request metadata: " + metaData);
90+
91+
return Flux.just(DefaultPayload.create(request));
92+
}
93+
};
94+
95+
RSocket client = buildClient();
96+
97+
System.out.println("original message: " + message);
98+
System.out.println("original metadata: " + metaData);
99+
Payload payload = client.requestStream(DefaultPayload.create(message)).blockLast();
100+
System.out.println("response message: " + payload.getDataUtf8());
101+
System.out.println("response metadata: " + payload.getMetadataUtf8());
102+
103+
assertThat(message).isEqualTo(payload.getDataUtf8());
104+
}
105+
106+
@Test
107+
void testFragmentRequestMetaDataOnly() {
108+
System.out.println(
109+
"-------------------------------------------------testFragmentRequestMetaDataOnly-------------------------------------------------");
110+
handler =
111+
new AbstractRSocket() {
112+
@Override
113+
public Flux<Payload> requestStream(Payload payload) {
114+
String request = payload.getDataUtf8();
115+
String metaData = payload.getMetadataUtf8();
116+
System.out.println("request message: " + request);
117+
System.out.println("request metadata: " + metaData);
118+
119+
return Flux.just(DefaultPayload.create(request));
120+
}
121+
};
122+
123+
RSocket client = buildClient();
124+
125+
System.out.println("original message: " + message);
126+
System.out.println("original metadata: " + metaData);
127+
Payload payload = client.requestStream(DefaultPayload.create(message, metaData)).blockLast();
128+
System.out.println("response message: " + payload.getDataUtf8());
129+
System.out.println("response metadata: " + payload.getMetadataUtf8());
130+
131+
assertThat(message).isEqualTo(payload.getDataUtf8());
132+
}
133+
134+
@Test
135+
void testFragmentBothMetaData() {
136+
System.out.println(
137+
"-------------------------------------------------testFragmentBothMetaData-------------------------------------------------");
138+
handler =
139+
new AbstractRSocket() {
140+
@Override
141+
public Flux<Payload> requestStream(Payload payload) {
142+
String request = payload.getDataUtf8();
143+
String metaData = payload.getMetadataUtf8();
144+
System.out.println("request message: " + request);
145+
System.out.println("request metadata: " + metaData);
146+
147+
return Flux.just(DefaultPayload.create(request, metaData));
148+
}
149+
150+
@Override
151+
public Mono<Payload> requestResponse(Payload payload) {
152+
String request = payload.getDataUtf8();
153+
String metaData = payload.getMetadataUtf8();
154+
System.out.println("request message: " + request);
155+
System.out.println("request metadata: " + metaData);
156+
157+
return Mono.just(DefaultPayload.create(request, metaData));
158+
}
159+
};
160+
161+
RSocket client = buildClient();
162+
163+
System.out.println("original message: " + message);
164+
System.out.println("original metadata: " + metaData);
165+
Payload payload = client.requestStream(DefaultPayload.create(message, metaData)).blockLast();
166+
System.out.println("response message: " + payload.getDataUtf8());
167+
System.out.println("response metadata: " + payload.getMetadataUtf8());
168+
169+
assertThat(message).isEqualTo(payload.getDataUtf8());
170+
}
171+
}

0 commit comments

Comments
 (0)