Skip to content

Commit 94d2e48

Browse files
authored
Merge pull request #545 from rsocket/saferelease-fix
removed safeRelease
2 parents aa3e8ed + 1b32df0 commit 94d2e48

File tree

2 files changed

+73
-21
lines changed

2 files changed

+73
-21
lines changed

rsocket-core/src/main/java/io/rsocket/internal/SwitchTransformFlux.java

Lines changed: 26 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,6 @@
1616

1717
package io.rsocket.internal;
1818

19-
import io.netty.util.ReferenceCountUtil;
2019
import java.util.Objects;
2120
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
2221
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
@@ -122,11 +121,15 @@ public void cancel() {
122121
if (s != Operators.cancelledSubscription()) {
123122
Subscription s = this.s;
124123
this.s = Operators.cancelledSubscription();
125-
ReferenceCountUtil.safeRelease(first);
126124

127125
if (WIP.getAndIncrement(this) == 0) {
128126
INNER.lazySet(this, null);
129-
first = null;
127+
128+
T f = first;
129+
if (f != null) {
130+
first = null;
131+
Operators.onDiscard(f, currentContext());
132+
}
130133
}
131134

132135
s.cancel();
@@ -171,7 +174,6 @@ public void onNext(T t) {
171174
return;
172175
} catch (Throwable e) {
173176
onError(Operators.onOperatorError(s, e, t, currentContext()));
174-
ReferenceCountUtil.safeRelease(t);
175177
return;
176178
}
177179
}
@@ -219,13 +221,14 @@ public void onComplete() {
219221

220222
@Override
221223
public void request(long n) {
222-
if (first != null && drainRegular() && n != Long.MAX_VALUE) {
223-
n = Operators.addCap(n, -1);
224-
if (n > 0) {
224+
if (Operators.validate(n)) {
225+
if (first != null && drainRegular() && n != Long.MAX_VALUE) {
226+
if (--n > 0) {
227+
s.request(n);
228+
}
229+
} else {
225230
s.request(n);
226231
}
227-
} else {
228-
s.request(n);
229232
}
230233
}
231234

@@ -245,12 +248,11 @@ boolean drainRegular() {
245248
first = null;
246249

247250
if (s == Operators.cancelledSubscription()) {
248-
Operators.onNextDropped(f, a.currentContext());
251+
Operators.onDiscard(f, a.currentContext());
249252
return true;
250253
}
251254

252255
a.onNext(f);
253-
ReferenceCountUtil.safeRelease(f);
254256
f = null;
255257
sent = true;
256258
}
@@ -345,11 +347,15 @@ public void cancel() {
345347
if (s != Operators.cancelledSubscription()) {
346348
Subscription s = this.s;
347349
this.s = Operators.cancelledSubscription();
348-
ReferenceCountUtil.safeRelease(first);
349350

350351
if (WIP.getAndIncrement(this) == 0) {
351352
INNER.lazySet(this, null);
352-
first = null;
353+
354+
T f = first;
355+
if (f != null) {
356+
first = null;
357+
Operators.onDiscard(f, currentContext());
358+
}
353359
}
354360

355361
s.cancel();
@@ -399,7 +405,6 @@ public void onNext(T t) {
399405
return;
400406
} catch (Throwable e) {
401407
onError(Operators.onOperatorError(s, e, t, currentContext()));
402-
ReferenceCountUtil.safeRelease(t);
403408
return;
404409
}
405410
}
@@ -426,7 +431,6 @@ public boolean tryOnNext(T t) {
426431
return true;
427432
} catch (Throwable e) {
428433
onError(Operators.onOperatorError(s, e, t, currentContext()));
429-
ReferenceCountUtil.safeRelease(t);
430434
return false;
431435
}
432436
}
@@ -474,12 +478,14 @@ public void onComplete() {
474478

475479
@Override
476480
public void request(long n) {
477-
if (first != null && drainRegular() && n != Long.MAX_VALUE) {
478-
if (--n > 0) {
481+
if (Operators.validate(n)) {
482+
if (first != null && drainRegular() && n != Long.MAX_VALUE) {
483+
if (--n > 0) {
484+
s.request(n);
485+
}
486+
} else {
479487
s.request(n);
480488
}
481-
} else {
482-
s.request(n);
483489
}
484490
}
485491

@@ -499,12 +505,11 @@ boolean drainRegular() {
499505
first = null;
500506

501507
if (s == Operators.cancelledSubscription()) {
502-
Operators.onNextDropped(f, a.currentContext());
508+
Operators.onDiscard(f, a.currentContext());
503509
return true;
504510
}
505511

506512
a.onNext(f);
507-
ReferenceCountUtil.safeRelease(f);
508513
f = null;
509514
sent = true;
510515
}

rsocket-core/src/test/java/io/rsocket/internal/SwitchTransformFluxTest.java

Lines changed: 47 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -394,4 +394,51 @@ public void shouldBeAbleToBeCancelledProperly() {
394394
publisher.assertCancelled();
395395
publisher.assertWasRequested();
396396
}
397+
398+
@Test
399+
public void shouldBeAbleToCatchDiscardedElement() {
400+
TestPublisher<Integer> publisher = TestPublisher.createCold();
401+
Integer[] discarded = new Integer[1];
402+
Flux<String> switchTransformed =
403+
publisher
404+
.flux()
405+
.transform(
406+
flux ->
407+
new SwitchTransformFlux<>(
408+
flux, (first, innerFlux) -> innerFlux.map(String::valueOf)))
409+
.doOnDiscard(Integer.class, e -> discarded[0] = e);
410+
411+
publisher.next(1);
412+
413+
StepVerifier.create(switchTransformed, 0).thenCancel().verify(Duration.ofSeconds(10));
414+
415+
publisher.assertCancelled();
416+
publisher.assertWasRequested();
417+
418+
Assert.assertArrayEquals(new Integer[] {1}, discarded);
419+
}
420+
421+
@Test
422+
public void shouldBeAbleToCatchDiscardedElementInCaseOfConditional() {
423+
TestPublisher<Integer> publisher = TestPublisher.createCold();
424+
Integer[] discarded = new Integer[1];
425+
Flux<String> switchTransformed =
426+
publisher
427+
.flux()
428+
.transform(
429+
flux ->
430+
new SwitchTransformFlux<>(
431+
flux, (first, innerFlux) -> innerFlux.map(String::valueOf)))
432+
.filter(t -> true)
433+
.doOnDiscard(Integer.class, e -> discarded[0] = e);
434+
435+
publisher.next(1);
436+
437+
StepVerifier.create(switchTransformed, 0).thenCancel().verify(Duration.ofSeconds(10));
438+
439+
publisher.assertCancelled();
440+
publisher.assertWasRequested();
441+
442+
Assert.assertArrayEquals(new Integer[] {1}, discarded);
443+
}
397444
}

0 commit comments

Comments
 (0)