Skip to content

Commit 221b5bf

Browse files
authored
Fixed an issue where ByteBufferStoringSubscriber is requesting more than specified in minimumBytesBuffered (#5014)
1 parent 172a019 commit 221b5bf

File tree

3 files changed

+43
-26
lines changed

3 files changed

+43
-26
lines changed
Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
{
2+
"type": "bugfix",
3+
"category": "AWS SDK for Java v2",
4+
"contributor": "",
5+
"description": "Fixed an issue in `ByteBufferStoringSubscriber` where it could buffer more data than configured, resulting in out of memory error. See [#4999](https://github.com/aws/aws-sdk-java-v2/issues/4999)."
6+
}

utils/src/main/java/software/amazon/awssdk/utils/async/ByteBufferStoringSubscriber.java

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -99,7 +99,9 @@ public TransferResult transferTo(ByteBuffer out) {
9999
next = storingSubscriber.peek();
100100
}
101101

102-
addBufferedDataAmount(-transferred);
102+
if (transferred != 0) {
103+
addBufferedDataAmount(-transferred);
104+
}
103105

104106
if (!next.isPresent()) {
105107
return TransferResult.SUCCESS;
@@ -178,8 +180,9 @@ public void onSubscribe(Subscription s) {
178180

179181
@Override
180182
public void onNext(ByteBuffer byteBuffer) {
183+
int remaining = byteBuffer.remaining();
181184
storingSubscriber.onNext(byteBuffer.duplicate());
182-
addBufferedDataAmount(byteBuffer.remaining());
185+
addBufferedDataAmount(remaining);
183186
phaser.arrive();
184187
}
185188

utils/src/test/java/software/amazon/awssdk/utils/async/ByteBufferStoringSubscriberTest.java

Lines changed: 32 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -18,20 +18,13 @@
1818
import static org.assertj.core.api.Assertions.assertThat;
1919
import static org.assertj.core.api.Assertions.assertThatCode;
2020
import static org.assertj.core.api.Assertions.assertThatThrownBy;
21-
import static org.mockito.ArgumentMatchers.any;
22-
import static org.mockito.ArgumentMatchers.anyInt;
2321
import static org.mockito.ArgumentMatchers.anyLong;
2422
import static org.mockito.Mockito.doAnswer;
25-
import static org.mockito.Mockito.mock;
2623
import static org.mockito.Mockito.times;
2724
import static org.mockito.Mockito.verify;
2825
import static org.mockito.Mockito.verifyNoMoreInteractions;
29-
import static org.mockito.Mockito.when;
3026

3127
import java.nio.ByteBuffer;
32-
import java.util.ArrayList;
33-
import java.util.Collections;
34-
import java.util.List;
3528
import java.util.concurrent.CountDownLatch;
3629
import java.util.concurrent.ExecutionException;
3730
import java.util.concurrent.ExecutorService;
@@ -43,11 +36,19 @@
4336
import java.util.concurrent.atomic.AtomicReference;
4437
import org.junit.jupiter.api.Test;
4538
import org.junit.jupiter.api.Timeout;
39+
import org.junit.jupiter.api.extension.ExtendWith;
40+
import org.mockito.Mock;
41+
import org.mockito.junit.jupiter.MockitoExtension;
4642
import org.reactivestreams.Subscription;
4743
import software.amazon.awssdk.utils.ThreadFactoryBuilder;
4844
import software.amazon.awssdk.utils.async.ByteBufferStoringSubscriber.TransferResult;
4945

46+
@ExtendWith(MockitoExtension.class)
5047
public class ByteBufferStoringSubscriberTest {
48+
49+
@Mock
50+
private Subscription subscription;
51+
5152
@Test
5253
public void constructorCalled_withNonPositiveSize_throwsException() {
5354
assertThatCode(() -> new ByteBufferStoringSubscriber(1)).doesNotThrowAnyException();
@@ -61,7 +62,6 @@ public void constructorCalled_withNonPositiveSize_throwsException() {
6162
@Test
6263
public void doesNotRequestMoreThanMaxBytes() {
6364
ByteBufferStoringSubscriber subscriber = new ByteBufferStoringSubscriber(3);
64-
Subscription subscription = mock(Subscription.class);
6565

6666
subscriber.onSubscribe(subscription);
6767
verify(subscription).request(1);
@@ -79,7 +79,6 @@ public void doesNotRequestMoreThanMaxBytes() {
7979
@Test
8080
public void canStoreMoreThanMaxBytesButWontAskForMoreUntilBelowMax() {
8181
ByteBufferStoringSubscriber subscriber = new ByteBufferStoringSubscriber(3);
82-
Subscription subscription = mock(Subscription.class);
8382

8483
subscriber.onSubscribe(subscription);
8584
verify(subscription).request(1);
@@ -104,7 +103,6 @@ public void blockingTransfer_waitsForFullOutputBuffer() throws InterruptedExcept
104103

105104
try {
106105
ByteBufferStoringSubscriber subscriber = new ByteBufferStoringSubscriber(1);
107-
Subscription subscription = mock(Subscription.class);
108106

109107
AtomicInteger bufferNumber = new AtomicInteger(0);
110108
doAnswer(i -> {
@@ -146,17 +144,18 @@ public void blockingTransfer_stopsOnComplete() throws ExecutionException, Interr
146144
try {
147145
ByteBufferStoringSubscriber subscriber = new ByteBufferStoringSubscriber(Long.MAX_VALUE);
148146

149-
subscriber.onSubscribe(mock(Subscription.class));
147+
subscriber.onSubscribe(subscription); // request 1
150148
Future<ByteBuffer> blockingRead = executor.submit(() -> {
151149
ByteBuffer out = ByteBuffer.allocate(1024);
150+
// request 2
152151
TransferResult transferResult = subscriber.blockingTransferTo(out);
153152
assertThat(transferResult).isEqualTo(TransferResult.END_OF_STREAM);
154153
return out;
155154
});
156155

157156
ByteBuffer input = fullByteBufferOfSize(1);
158157

159-
subscriber.onNext(input);
158+
subscriber.onNext(input); // request 3
160159
subscriber.onComplete();
161160

162161
ByteBuffer output = blockingRead.get();
@@ -165,6 +164,8 @@ public void blockingTransfer_stopsOnComplete() throws ExecutionException, Interr
165164
} finally {
166165
executor.shutdownNow();
167166
}
167+
168+
verify(subscription, times(3)).request(1);
168169
}
169170

170171
@Test
@@ -175,7 +176,7 @@ public void blockingTransfer_stopsOnError() throws ExecutionException, Interrupt
175176
try {
176177
ByteBufferStoringSubscriber subscriber = new ByteBufferStoringSubscriber(Long.MAX_VALUE);
177178

178-
subscriber.onSubscribe(mock(Subscription.class));
179+
subscriber.onSubscribe(subscription);
179180
Future<?> blockingRead = executor.submit(() -> {
180181
subscriber.blockingTransferTo(ByteBuffer.allocate(1024)); // Expected to throw
181182
return null;
@@ -202,7 +203,7 @@ public void blockingTransfer_stopsOnInterrupt() throws InterruptedException {
202203
AtomicBoolean threadIsInterruptedInCatch = new AtomicBoolean(false);
203204
AtomicReference<Throwable> failureReason = new AtomicReference<>();
204205

205-
subscriber.onSubscribe(mock(Subscription.class));
206+
subscriber.onSubscribe(subscription);
206207

207208
CountDownLatch threadIsRunning = new CountDownLatch(1);
208209
Thread thread = new Thread(() -> {
@@ -230,98 +231,105 @@ public void blockingTransfer_stopsOnInterrupt() throws InterruptedException {
230231
public void blockingTransfer_returnsEndOfStreamWithRepeatedCalls() {
231232
ByteBufferStoringSubscriber subscriber = new ByteBufferStoringSubscriber(Long.MAX_VALUE);
232233

233-
subscriber.onSubscribe(mock(Subscription.class));
234+
subscriber.onSubscribe(subscription);
234235
subscriber.onComplete();
235236

236237
ByteBuffer buffer = ByteBuffer.allocate(0);
237238

238239
assertThat(subscriber.blockingTransferTo(buffer)).isEqualTo(TransferResult.END_OF_STREAM);
239240
assertThat(subscriber.blockingTransferTo(buffer)).isEqualTo(TransferResult.END_OF_STREAM);
240241
assertThat(subscriber.blockingTransferTo(buffer)).isEqualTo(TransferResult.END_OF_STREAM);
242+
verify(subscription).request(1);
241243
}
242244

243245

244246
@Test
245247
public void noDataTransferredIfNoDataBuffered() {
246248
ByteBufferStoringSubscriber subscriber = new ByteBufferStoringSubscriber(2);
247-
subscriber.onSubscribe(mock(Subscription.class));
249+
subscriber.onSubscribe(subscription);
248250

249251
ByteBuffer out = emptyByteBufferOfSize(1);
250252

251253
assertThat(subscriber.transferTo(out)).isEqualTo(TransferResult.SUCCESS);
252254
assertThat(out.remaining()).isEqualTo(1);
255+
verify(subscription).request(1);
253256
}
254257

255258
@Test
256259
public void noDataTransferredIfComplete() {
257260
ByteBufferStoringSubscriber subscriber = new ByteBufferStoringSubscriber(2);
258-
subscriber.onSubscribe(mock(Subscription.class));
261+
subscriber.onSubscribe(subscription);
259262
subscriber.onComplete();
260263

261264
ByteBuffer out = emptyByteBufferOfSize(1);
262265

263266
assertThat(subscriber.transferTo(out)).isEqualTo(TransferResult.END_OF_STREAM);
264267
assertThat(out.remaining()).isEqualTo(1);
268+
verify(subscription).request(1);
265269
}
266270

267271
@Test
268272
public void noDataTransferredIfError() {
269273
RuntimeException error = new RuntimeException();
270274

271275
ByteBufferStoringSubscriber subscriber = new ByteBufferStoringSubscriber(2);
272-
subscriber.onSubscribe(mock(Subscription.class));
276+
subscriber.onSubscribe(subscription);
273277
subscriber.onError(error);
274278

275279
ByteBuffer out = emptyByteBufferOfSize(1);
276280

277281
assertThatThrownBy(() -> subscriber.transferTo(out)).isEqualTo(error);
278282
assertThat(out.remaining()).isEqualTo(1);
283+
verify(subscription).request(1);
279284
}
280285

281286
@Test
282287
public void checkedExceptionsAreWrapped() {
283288
Exception error = new Exception();
284289

285290
ByteBufferStoringSubscriber subscriber = new ByteBufferStoringSubscriber(2);
286-
subscriber.onSubscribe(mock(Subscription.class));
291+
subscriber.onSubscribe(subscription);
287292
subscriber.onError(error);
288293

289294
ByteBuffer out = emptyByteBufferOfSize(1);
290295

291296
assertThatThrownBy(() -> subscriber.transferTo(out)).hasCause(error);
292297
assertThat(out.remaining()).isEqualTo(1);
298+
verify(subscription).request(1);
293299
}
294300

295301
@Test
296302
public void completeIsReportedEvenWithExactOutSize() {
297303
ByteBufferStoringSubscriber subscriber = new ByteBufferStoringSubscriber(2);
298-
subscriber.onSubscribe(mock(Subscription.class));
304+
subscriber.onSubscribe(subscription);
299305
subscriber.onNext(fullByteBufferOfSize(2));
300306
subscriber.onComplete();
301307

302308
ByteBuffer out = emptyByteBufferOfSize(2);
303309
assertThat(subscriber.transferTo(out)).isEqualTo(TransferResult.END_OF_STREAM);
304310
assertThat(out.remaining()).isEqualTo(0);
311+
verify(subscription, times(2)).request(1);
305312
}
306313

307314
@Test
308315
public void completeIsReportedEvenWithExtraOutSize() {
309316
ByteBufferStoringSubscriber subscriber = new ByteBufferStoringSubscriber(2);
310-
subscriber.onSubscribe(mock(Subscription.class));
317+
subscriber.onSubscribe(subscription);
311318
subscriber.onNext(fullByteBufferOfSize(2));
312319
subscriber.onComplete();
313320

314321
ByteBuffer out = emptyByteBufferOfSize(3);
315322
assertThat(subscriber.transferTo(out)).isEqualTo(TransferResult.END_OF_STREAM);
316323
assertThat(out.remaining()).isEqualTo(1);
324+
verify(subscription, times(2)).request(1);
317325
}
318326

319327
@Test
320328
public void errorIsReportedEvenWithExactOutSize() {
321329
RuntimeException error = new RuntimeException();
322330

323331
ByteBufferStoringSubscriber subscriber = new ByteBufferStoringSubscriber(2);
324-
subscriber.onSubscribe(mock(Subscription.class));
332+
subscriber.onSubscribe(subscription);
325333
subscriber.onNext(fullByteBufferOfSize(2));
326334
subscriber.onError(error);
327335

@@ -335,7 +343,7 @@ public void errorIsReportedEvenWithExtraOutSize() {
335343
RuntimeException error = new RuntimeException();
336344

337345
ByteBufferStoringSubscriber subscriber = new ByteBufferStoringSubscriber(2);
338-
subscriber.onSubscribe(mock(Subscription.class));
346+
subscriber.onSubscribe(subscription);
339347
subscriber.onNext(fullByteBufferOfSize(2));
340348
subscriber.onError(error);
341349

@@ -351,7 +359,7 @@ public void dataIsDeliveredInTheRightOrder() {
351359
ByteBuffer buffer3 = fullByteBufferOfSize(1);
352360

353361
ByteBufferStoringSubscriber subscriber = new ByteBufferStoringSubscriber(3);
354-
subscriber.onSubscribe(mock(Subscription.class));
362+
subscriber.onSubscribe(subscription);
355363
subscriber.onNext(buffer1);
356364
subscriber.onNext(buffer2);
357365
subscriber.onNext(buffer3);

0 commit comments

Comments
 (0)