Skip to content

Commit 48459fa

Browse files
committed
fix for switchTransform
Signed-off-by: Oleh Dokuka <oleh@netifi.com>
1 parent af6628c commit 48459fa

File tree

2 files changed

+256
-235
lines changed

2 files changed

+256
-235
lines changed

rsocket-core/src/main/java/io/rsocket/internal/SwitchTransformFlux.java

Lines changed: 60 additions & 62 deletions
Original file line numberDiff line numberDiff line change
@@ -16,12 +16,11 @@
1616

1717
package io.rsocket.internal;
1818

19+
import io.netty.util.ReferenceCountUtil;
1920
import java.util.Objects;
2021
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
2122
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
2223
import java.util.function.BiFunction;
23-
24-
import io.netty.util.ReferenceCountUtil;
2524
import org.reactivestreams.Publisher;
2625
import org.reactivestreams.Subscription;
2726
import reactor.core.CoreSubscriber;
@@ -38,7 +37,7 @@ public final class SwitchTransformFlux<T, R> extends Flux<R> {
3837
final BiFunction<T, Flux<T>, Publisher<? extends R>> transformer;
3938

4039
public SwitchTransformFlux(
41-
Publisher<? extends T> source, BiFunction<T, Flux<T>, Publisher<? extends R>> transformer) {
40+
Publisher<? extends T> source, BiFunction<T, Flux<T>, Publisher<? extends R>> transformer) {
4241
this.source = Objects.requireNonNull(source, "source");
4342
this.transformer = Objects.requireNonNull(transformer, "transformer");
4443
}
@@ -52,42 +51,48 @@ public int getPrefetch() {
5251
@SuppressWarnings("unchecked")
5352
public void subscribe(CoreSubscriber<? super R> actual) {
5453
if (actual instanceof Fuseable.ConditionalSubscriber) {
55-
source.subscribe(new SwitchTransformConditionalOperator<>((Fuseable.ConditionalSubscriber<? super R>) actual, transformer));
54+
source.subscribe(
55+
new SwitchTransformConditionalOperator<>(
56+
(Fuseable.ConditionalSubscriber<? super R>) actual, transformer));
5657
return;
5758
}
5859
source.subscribe(new SwitchTransformOperator<>(actual, transformer));
5960
}
6061

6162
static final class SwitchTransformOperator<T, R> extends Flux<T>
62-
implements CoreSubscriber<T>, Subscription, Scannable {
63+
implements CoreSubscriber<T>, Subscription, Scannable {
6364

6465
final CoreSubscriber<? super R> outer;
6566
final BiFunction<T, Flux<T>, Publisher<? extends R>> transformer;
6667

6768
Subscription s;
68-
Throwable throwable;
69+
Throwable throwable;
6970

7071
volatile boolean done;
71-
volatile T first;
72+
volatile T first;
7273

7374
volatile CoreSubscriber<? super T> inner;
75+
7476
@SuppressWarnings("rawtypes")
7577
static final AtomicReferenceFieldUpdater<SwitchTransformOperator, CoreSubscriber> INNER =
76-
AtomicReferenceFieldUpdater.newUpdater(SwitchTransformOperator.class, CoreSubscriber.class, "inner");
78+
AtomicReferenceFieldUpdater.newUpdater(
79+
SwitchTransformOperator.class, CoreSubscriber.class, "inner");
7780

7881
volatile int wip;
82+
7983
@SuppressWarnings("rawtypes")
8084
static final AtomicIntegerFieldUpdater<SwitchTransformOperator> WIP =
81-
AtomicIntegerFieldUpdater.newUpdater(SwitchTransformOperator.class, "wip");
85+
AtomicIntegerFieldUpdater.newUpdater(SwitchTransformOperator.class, "wip");
8286

8387
volatile int once;
88+
8489
@SuppressWarnings("rawtypes")
8590
static final AtomicIntegerFieldUpdater<SwitchTransformOperator> ONCE =
86-
AtomicIntegerFieldUpdater.newUpdater(SwitchTransformOperator.class, "once");
91+
AtomicIntegerFieldUpdater.newUpdater(SwitchTransformOperator.class, "once");
8792

8893
SwitchTransformOperator(
89-
CoreSubscriber<? super R> outer,
90-
BiFunction<T, Flux<T>, Publisher<? extends R>> transformer) {
94+
CoreSubscriber<? super R> outer,
95+
BiFunction<T, Flux<T>, Publisher<? extends R>> transformer) {
9196
this.outer = outer;
9297
this.transformer = transformer;
9398
}
@@ -133,9 +138,9 @@ public void subscribe(CoreSubscriber<? super T> actual) {
133138
if (once == 0 && ONCE.compareAndSet(this, 0, 1)) {
134139
INNER.lazySet(this, actual);
135140
actual.onSubscribe(this);
136-
}
137-
else {
138-
Operators.error(actual, new IllegalStateException("SwitchTransform allows only one Subscriber"));
141+
} else {
142+
Operators.error(
143+
actual, new IllegalStateException("SwitchTransform allows only one Subscriber"));
139144
}
140145
}
141146

@@ -160,12 +165,11 @@ public void onNext(T t) {
160165
try {
161166
first = t;
162167
Publisher<? extends R> result =
163-
Objects.requireNonNull(
164-
transformer.apply(t, this), "The transformer returned a null value");
168+
Objects.requireNonNull(
169+
transformer.apply(t, this), "The transformer returned a null value");
165170
result.subscribe(outer);
166171
return;
167-
}
168-
catch (Throwable e) {
172+
} catch (Throwable e) {
169173
onError(Operators.onOperatorError(s, e, t, currentContext()));
170174
ReferenceCountUtil.safeRelease(t);
171175
return;
@@ -190,8 +194,7 @@ public void onError(Throwable t) {
190194
if (first == null) {
191195
drainRegular();
192196
}
193-
}
194-
else {
197+
} else {
195198
Operators.error(outer, t);
196199
}
197200
}
@@ -209,8 +212,7 @@ public void onComplete() {
209212
if (first == null) {
210213
drainRegular();
211214
}
212-
}
213-
else {
215+
} else {
214216
Operators.complete(outer);
215217
}
216218
}
@@ -222,8 +224,7 @@ public void request(long n) {
222224
if (n > 0) {
223225
s.request(n);
224226
}
225-
}
226-
else {
227+
} else {
227228
s.request(n);
228229
}
229230
}
@@ -239,7 +240,7 @@ boolean drainRegular() {
239240
Subscription s = this.s;
240241
CoreSubscriber<? super T> a = inner;
241242

242-
for (;;) {
243+
for (; ; ) {
243244
if (f != null) {
244245
first = null;
245246
ReferenceCountUtil.safeRelease(f);
@@ -262,14 +263,12 @@ boolean drainRegular() {
262263
Throwable t = throwable;
263264
if (t != null) {
264265
a.onError(t);
265-
}
266-
else {
266+
} else {
267267
a.onComplete();
268268
}
269269
return sent;
270270
}
271271

272-
273272
m = WIP.addAndGet(this, -m);
274273

275274
if (m == 0) {
@@ -279,37 +278,44 @@ boolean drainRegular() {
279278
}
280279
}
281280

282-
283281
static final class SwitchTransformConditionalOperator<T, R> extends Flux<T>
284-
implements Fuseable.ConditionalSubscriber<T>, Subscription, Scannable {
282+
implements Fuseable.ConditionalSubscriber<T>, Subscription, Scannable {
285283

286284
final Fuseable.ConditionalSubscriber<? super R> outer;
287285
final BiFunction<T, Flux<T>, Publisher<? extends R>> transformer;
288286

289287
Subscription s;
290-
Throwable throwable;
288+
Throwable throwable;
291289

292290
volatile boolean done;
293-
volatile T first;
291+
volatile T first;
294292

295293
volatile Fuseable.ConditionalSubscriber<? super T> inner;
294+
296295
@SuppressWarnings("rawtypes")
297-
static final AtomicReferenceFieldUpdater<SwitchTransformConditionalOperator, Fuseable.ConditionalSubscriber> INNER =
298-
AtomicReferenceFieldUpdater.newUpdater(SwitchTransformConditionalOperator.class, Fuseable.ConditionalSubscriber.class, "inner");
296+
static final AtomicReferenceFieldUpdater<
297+
SwitchTransformConditionalOperator, Fuseable.ConditionalSubscriber>
298+
INNER =
299+
AtomicReferenceFieldUpdater.newUpdater(
300+
SwitchTransformConditionalOperator.class,
301+
Fuseable.ConditionalSubscriber.class,
302+
"inner");
299303

300304
volatile int wip;
305+
301306
@SuppressWarnings("rawtypes")
302307
static final AtomicIntegerFieldUpdater<SwitchTransformConditionalOperator> WIP =
303-
AtomicIntegerFieldUpdater.newUpdater(SwitchTransformConditionalOperator.class, "wip");
308+
AtomicIntegerFieldUpdater.newUpdater(SwitchTransformConditionalOperator.class, "wip");
304309

305310
volatile int once;
311+
306312
@SuppressWarnings("rawtypes")
307313
static final AtomicIntegerFieldUpdater<SwitchTransformConditionalOperator> ONCE =
308-
AtomicIntegerFieldUpdater.newUpdater(SwitchTransformConditionalOperator.class, "once");
314+
AtomicIntegerFieldUpdater.newUpdater(SwitchTransformConditionalOperator.class, "once");
309315

310316
SwitchTransformConditionalOperator(
311-
Fuseable.ConditionalSubscriber<? super R> outer,
312-
BiFunction<T, Flux<T>, Publisher<? extends R>> transformer) {
317+
Fuseable.ConditionalSubscriber<? super R> outer,
318+
BiFunction<T, Flux<T>, Publisher<? extends R>> transformer) {
313319
this.outer = outer;
314320
this.transformer = transformer;
315321
}
@@ -356,14 +362,13 @@ public void subscribe(CoreSubscriber<? super T> actual) {
356362
if (once == 0 && ONCE.compareAndSet(this, 0, 1)) {
357363
if (actual instanceof Fuseable.ConditionalSubscriber) {
358364
INNER.lazySet(this, (Fuseable.ConditionalSubscriber<? super T>) actual);
359-
}
360-
else {
365+
} else {
361366
INNER.lazySet(this, new ConditionalSubscriberAdapter<>(actual));
362367
}
363368
actual.onSubscribe(this);
364-
}
365-
else {
366-
Operators.error(actual, new IllegalStateException("SwitchTransform allows only one Subscriber"));
369+
} else {
370+
Operators.error(
371+
actual, new IllegalStateException("SwitchTransform allows only one Subscriber"));
367372
}
368373
}
369374

@@ -388,12 +393,11 @@ public void onNext(T t) {
388393
try {
389394
first = t;
390395
Publisher<? extends R> result =
391-
Objects.requireNonNull(
392-
transformer.apply(t, this), "The transformer returned a null value");
396+
Objects.requireNonNull(
397+
transformer.apply(t, this), "The transformer returned a null value");
393398
result.subscribe(outer);
394399
return;
395-
}
396-
catch (Throwable e) {
400+
} catch (Throwable e) {
397401
onError(Operators.onOperatorError(s, e, t, currentContext()));
398402
ReferenceCountUtil.safeRelease(t);
399403
return;
@@ -416,12 +420,11 @@ public boolean tryOnNext(T t) {
416420
try {
417421
first = t;
418422
Publisher<? extends R> result =
419-
Objects.requireNonNull(
420-
transformer.apply(t, this), "The transformer returned a null value");
423+
Objects.requireNonNull(
424+
transformer.apply(t, this), "The transformer returned a null value");
421425
result.subscribe(outer);
422426
return true;
423-
}
424-
catch (Throwable e) {
427+
} catch (Throwable e) {
425428
onError(Operators.onOperatorError(s, e, t, currentContext()));
426429
ReferenceCountUtil.safeRelease(t);
427430
return false;
@@ -446,8 +449,7 @@ public void onError(Throwable t) {
446449
if (first == null) {
447450
drainRegular();
448451
}
449-
}
450-
else {
452+
} else {
451453
Operators.error(outer, t);
452454
}
453455
}
@@ -465,8 +467,7 @@ public void onComplete() {
465467
if (first == null) {
466468
drainRegular();
467469
}
468-
}
469-
else {
470+
} else {
470471
Operators.complete(outer);
471472
}
472473
}
@@ -477,8 +478,7 @@ public void request(long n) {
477478
if (--n > 0) {
478479
s.request(n);
479480
}
480-
}
481-
else {
481+
} else {
482482
s.request(n);
483483
}
484484
}
@@ -494,7 +494,7 @@ boolean drainRegular() {
494494
Subscription s = this.s;
495495
CoreSubscriber<? super T> a = inner;
496496

497-
for (;;) {
497+
for (; ; ) {
498498
if (f != null) {
499499
first = null;
500500
ReferenceCountUtil.safeRelease(f);
@@ -517,14 +517,12 @@ boolean drainRegular() {
517517
Throwable t = throwable;
518518
if (t != null) {
519519
a.onError(t);
520-
}
521-
else {
520+
} else {
522521
a.onComplete();
523522
}
524523
return sent;
525524
}
526525

527-
528526
m = WIP.addAndGet(this, -m);
529527

530528
if (m == 0) {

0 commit comments

Comments
 (0)