Skip to content

Commit 307f97b

Browse files
authored
2.x: option to fail for using blockingX on the computation scheduler (#5020)
* 2.x: option to fail for using blockingX on the computation scheduler * Increase sleep time in XFlatMapTest * Add a custom RxJavaPlugins callback onBeforeBlocking
1 parent f53e029 commit 307f97b

23 files changed

+913
-43
lines changed

src/main/java/io/reactivex/internal/observers/BlockingBaseObserver.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@
1616

1717
import io.reactivex.Observer;
1818
import io.reactivex.disposables.Disposable;
19-
import io.reactivex.internal.util.ExceptionHelper;
19+
import io.reactivex.internal.util.*;
2020

2121
public abstract class BlockingBaseObserver<T> extends CountDownLatch
2222
implements Observer<T>, Disposable {
@@ -67,6 +67,7 @@ public final boolean isDisposed() {
6767
public final T blockingGet() {
6868
if (getCount() != 0) {
6969
try {
70+
BlockingHelper.verifyNonBlocking();
7071
await();
7172
} catch (InterruptedException ex) {
7273
dispose();

src/main/java/io/reactivex/internal/observers/BlockingMultiObserver.java

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@
1717

1818
import io.reactivex.*;
1919
import io.reactivex.disposables.Disposable;
20-
import io.reactivex.internal.util.ExceptionHelper;
20+
import io.reactivex.internal.util.*;
2121

2222
/**
2323
* A combined Observer that awaits the success or error signal via a CountDownLatch.
@@ -79,6 +79,7 @@ public void onComplete() {
7979
public T blockingGet() {
8080
if (getCount() != 0) {
8181
try {
82+
BlockingHelper.verifyNonBlocking();
8283
await();
8384
} catch (InterruptedException ex) {
8485
dispose();
@@ -101,6 +102,7 @@ public T blockingGet() {
101102
public T blockingGet(T defaultValue) {
102103
if (getCount() != 0) {
103104
try {
105+
BlockingHelper.verifyNonBlocking();
104106
await();
105107
} catch (InterruptedException ex) {
106108
dispose();
@@ -123,6 +125,7 @@ public T blockingGet(T defaultValue) {
123125
public Throwable blockingGetError() {
124126
if (getCount() != 0) {
125127
try {
128+
BlockingHelper.verifyNonBlocking();
126129
await();
127130
} catch (InterruptedException ex) {
128131
dispose();
@@ -142,6 +145,7 @@ public Throwable blockingGetError() {
142145
public Throwable blockingGetError(long timeout, TimeUnit unit) {
143146
if (getCount() != 0) {
144147
try {
148+
BlockingHelper.verifyNonBlocking();
145149
if (!await(timeout, unit)) {
146150
dispose();
147151
throw ExceptionHelper.wrapOrThrow(new TimeoutException());
@@ -164,6 +168,7 @@ public Throwable blockingGetError(long timeout, TimeUnit unit) {
164168
public boolean blockingAwait(long timeout, TimeUnit unit) {
165169
if (getCount() != 0) {
166170
try {
171+
BlockingHelper.verifyNonBlocking();
167172
if (!await(timeout, unit)) {
168173
dispose();
169174
return false;

src/main/java/io/reactivex/internal/observers/FutureObserver.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
import io.reactivex.Observer;
2121
import io.reactivex.disposables.Disposable;
2222
import io.reactivex.internal.disposables.DisposableHelper;
23+
import io.reactivex.internal.util.BlockingHelper;
2324
import io.reactivex.plugins.RxJavaPlugins;
2425

2526
/**
@@ -72,6 +73,7 @@ public boolean isDone() {
7273
@Override
7374
public T get() throws InterruptedException, ExecutionException {
7475
if (getCount() != 0) {
76+
BlockingHelper.verifyNonBlocking();
7577
await();
7678
}
7779

@@ -88,6 +90,7 @@ public T get() throws InterruptedException, ExecutionException {
8890
@Override
8991
public T get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
9092
if (getCount() != 0) {
93+
BlockingHelper.verifyNonBlocking();
9194
if (!await(timeout, unit)) {
9295
throw new TimeoutException();
9396
}

src/main/java/io/reactivex/internal/observers/FutureSingleObserver.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
import io.reactivex.SingleObserver;
2020
import io.reactivex.disposables.Disposable;
2121
import io.reactivex.internal.disposables.DisposableHelper;
22+
import io.reactivex.internal.util.BlockingHelper;
2223
import io.reactivex.plugins.RxJavaPlugins;
2324

2425
/**
@@ -71,6 +72,7 @@ public boolean isDone() {
7172
@Override
7273
public T get() throws InterruptedException, ExecutionException {
7374
if (getCount() != 0) {
75+
BlockingHelper.verifyNonBlocking();
7476
await();
7577
}
7678

@@ -87,6 +89,7 @@ public T get() throws InterruptedException, ExecutionException {
8789
@Override
8890
public T get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
8991
if (getCount() != 0) {
92+
BlockingHelper.verifyNonBlocking();
9093
if (!await(timeout, unit)) {
9194
throw new TimeoutException();
9295
}

src/main/java/io/reactivex/internal/operators/flowable/BlockingFlowableIterable.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@
2323
import io.reactivex.exceptions.MissingBackpressureException;
2424
import io.reactivex.internal.queue.SpscArrayQueue;
2525
import io.reactivex.internal.subscriptions.SubscriptionHelper;
26-
import io.reactivex.internal.util.ExceptionHelper;
26+
import io.reactivex.internal.util.*;
2727

2828
public final class BlockingFlowableIterable<T> implements Iterable<T> {
2929
final Publisher<? extends T> source;
@@ -86,6 +86,7 @@ public boolean hasNext() {
8686
}
8787
}
8888
if (empty) {
89+
BlockingHelper.verifyNonBlocking();
8990
lock.lock();
9091
try {
9192
while (!done && queue.isEmpty()) {

src/main/java/io/reactivex/internal/operators/flowable/BlockingFlowableLatest.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@
2020
import org.reactivestreams.Publisher;
2121

2222
import io.reactivex.*;
23-
import io.reactivex.internal.util.ExceptionHelper;
23+
import io.reactivex.internal.util.*;
2424
import io.reactivex.plugins.RxJavaPlugins;
2525
import io.reactivex.subscribers.DisposableSubscriber;
2626

@@ -79,6 +79,7 @@ public boolean hasNext() {
7979
if (iteratorNotification == null || iteratorNotification.isOnNext()) {
8080
if (iteratorNotification == null) {
8181
try {
82+
BlockingHelper.verifyNonBlocking();
8283
notify.acquire();
8384
} catch (InterruptedException ex) {
8485
dispose();

src/main/java/io/reactivex/internal/operators/flowable/BlockingFlowableNext.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@
2020
import org.reactivestreams.Publisher;
2121

2222
import io.reactivex.*;
23-
import io.reactivex.internal.util.ExceptionHelper;
23+
import io.reactivex.internal.util.*;
2424
import io.reactivex.plugins.RxJavaPlugins;
2525
import io.reactivex.subscribers.DisposableSubscriber;
2626

@@ -165,6 +165,7 @@ public void onNext(Notification<T> args) {
165165

166166
public Notification<T> takeNext() throws InterruptedException {
167167
setWaiting();
168+
BlockingHelper.verifyNonBlocking();
168169
return buf.take();
169170
}
170171
void setWaiting() {

src/main/java/io/reactivex/internal/operators/flowable/FlowableBlockingSubscribe.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -57,6 +57,7 @@ public static <T> void subscribe(Publisher<? extends T> o, Subscriber<? super T>
5757
if (bs.isCancelled()) {
5858
break;
5959
}
60+
BlockingHelper.verifyNonBlocking();
6061
v = queue.take();
6162
}
6263
if (bs.isCancelled()) {

src/main/java/io/reactivex/internal/operators/observable/BlockingObservableIterable.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@
2121
import io.reactivex.disposables.Disposable;
2222
import io.reactivex.internal.disposables.DisposableHelper;
2323
import io.reactivex.internal.queue.SpscLinkedArrayQueue;
24-
import io.reactivex.internal.util.ExceptionHelper;
24+
import io.reactivex.internal.util.*;
2525

2626
public final class BlockingObservableIterable<T> implements Iterable<T> {
2727
final ObservableSource<? extends T> source;
@@ -78,6 +78,7 @@ public boolean hasNext() {
7878
}
7979
if (empty) {
8080
try {
81+
BlockingHelper.verifyNonBlocking();
8182
lock.lock();
8283
try {
8384
while (!done && queue.isEmpty()) {

src/main/java/io/reactivex/internal/operators/observable/BlockingObservableLatest.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@
1919

2020
import io.reactivex.*;
2121
import io.reactivex.Observable;
22-
import io.reactivex.internal.util.ExceptionHelper;
22+
import io.reactivex.internal.util.*;
2323
import io.reactivex.observers.DisposableObserver;
2424
import io.reactivex.plugins.RxJavaPlugins;
2525

@@ -79,6 +79,7 @@ public boolean hasNext() {
7979
}
8080
if (iteratorNotification == null) {
8181
try {
82+
BlockingHelper.verifyNonBlocking();
8283
notify.acquire();
8384
} catch (InterruptedException ex) {
8485
dispose();

0 commit comments

Comments
 (0)