1515
1616import java .util .Queue ;
1717import java .util .concurrent .*;
18+ import java .util .concurrent .atomic .AtomicReference ;
1819
19- import io .reactivex .rxjava3 .annotations .NonNull ;
20+ import io .reactivex .rxjava3 .annotations .* ;
2021import io .reactivex .rxjava3 .core .Scheduler ;
21- import io .reactivex .rxjava3 .disposables .* ;
22+ import io .reactivex .rxjava3 .disposables .Disposable ;
2223import io .reactivex .rxjava3 .internal .disposables .EmptyDisposable ;
24+ import io .reactivex .rxjava3 .plugins .RxJavaPlugins ;
2325
2426/**
2527 * A special, non thread-safe scheduler for testing operators that require
2628 * a scheduler without introducing real concurrency and allows manually advancing
2729 * a virtual time.
30+ * <p>
31+ * By default, the tasks submitted via the various {@code schedule} methods are not
32+ * wrapped by the {@link RxJavaPlugins#onSchedule(Runnable)} hook. To enable this behavior,
33+ * create a {@code TestScheduler} via {@link #TestScheduler(boolean)} or {@link #TestScheduler(long, TimeUnit, boolean)}.
2834 */
2935public final class TestScheduler extends Scheduler {
3036 /** The ordered queue for the runnable tasks. */
3137 final Queue <TimedRunnable > queue = new PriorityBlockingQueue <>(11 );
38+ /** Use the {@link RxJavaPlugins#onSchedule(Runnable)} hook when scheduling tasks. */
39+ final boolean useOnScheduleHook ;
3240 /** The per-scheduler global order counter. */
3341 long counter ;
3442 // Storing time in nanoseconds internally.
@@ -38,7 +46,20 @@ public final class TestScheduler extends Scheduler {
3846 * Creates a new TestScheduler with initial virtual time of zero.
3947 */
4048 public TestScheduler () {
41- // No-op.
49+ this (false );
50+ }
51+
52+ /**
53+ * Creates a new TestScheduler with the option to use the
54+ * {@link RxJavaPlugins#onSchedule(Runnable)} hook when scheduling tasks.
55+ * @param useOnScheduleHook if {@code true}, the tasks submitted to this
56+ * TestScheduler is wrapped via the
57+ * {@link RxJavaPlugins#onSchedule(Runnable)} hook
58+ * @since 3.0.10 - experimental
59+ */
60+ @ Experimental
61+ public TestScheduler (boolean useOnScheduleHook ) {
62+ this .useOnScheduleHook = useOnScheduleHook ;
4263 }
4364
4465 /**
@@ -50,7 +71,27 @@ public TestScheduler() {
5071 * the units of time that {@code delayTime} is expressed in
5172 */
5273 public TestScheduler (long delayTime , TimeUnit unit ) {
74+ this (delayTime , unit , false );
75+ }
76+
77+ /**
78+ * Creates a new TestScheduler with the specified initial virtual time
79+ * and with the option to use the
80+ * {@link RxJavaPlugins#onSchedule(Runnable)} hook when scheduling tasks.
81+ *
82+ * @param delayTime
83+ * the point in time to move the Scheduler's clock to
84+ * @param unit
85+ * the units of time that {@code delayTime} is expressed in
86+ * @param useOnScheduleHook if {@code true}, the tasks submitted to this
87+ * TestScheduler is wrapped via the
88+ * {@link RxJavaPlugins#onSchedule(Runnable)} hook
89+ * @since 3.0.10 - experimental
90+ */
91+ @ Experimental
92+ public TestScheduler (long delayTime , TimeUnit unit , boolean useOnScheduleHook ) {
5393 time = unit .toNanos (delayTime );
94+ this .useOnScheduleHook = useOnScheduleHook ;
5495 }
5596
5697 static final class TimedRunnable implements Comparable <TimedRunnable > {
@@ -163,10 +204,13 @@ public Disposable schedule(@NonNull Runnable run, long delayTime, @NonNull TimeU
163204 if (disposed ) {
164205 return EmptyDisposable .INSTANCE ;
165206 }
207+ if (useOnScheduleHook ) {
208+ run = RxJavaPlugins .onSchedule (run );
209+ }
166210 final TimedRunnable timedAction = new TimedRunnable (this , time + unit .toNanos (delayTime ), run , counter ++);
167211 queue .add (timedAction );
168212
169- return Disposable . fromRunnable ( new QueueRemove (timedAction ) );
213+ return new QueueRemove (timedAction );
170214 }
171215
172216 @ NonNull
@@ -175,26 +219,38 @@ public Disposable schedule(@NonNull Runnable run) {
175219 if (disposed ) {
176220 return EmptyDisposable .INSTANCE ;
177221 }
222+ if (useOnScheduleHook ) {
223+ run = RxJavaPlugins .onSchedule (run );
224+ }
178225 final TimedRunnable timedAction = new TimedRunnable (this , 0 , run , counter ++);
179226 queue .add (timedAction );
180- return Disposable . fromRunnable ( new QueueRemove (timedAction ) );
227+ return new QueueRemove (timedAction );
181228 }
182229
183230 @ Override
184231 public long now (@ NonNull TimeUnit unit ) {
185232 return TestScheduler .this .now (unit );
186233 }
187234
188- final class QueueRemove implements Runnable {
189- final TimedRunnable timedAction ;
235+ final class QueueRemove extends AtomicReference <TimedRunnable > implements Disposable {
236+
237+ private static final long serialVersionUID = -7874968252110604360L ;
190238
191239 QueueRemove (TimedRunnable timedAction ) {
192- this .timedAction = timedAction ;
240+ this .lazySet (timedAction );
241+ }
242+
243+ @ Override
244+ public void dispose () {
245+ TimedRunnable tr = getAndSet (null );
246+ if (tr != null ) {
247+ queue .remove (tr );
248+ }
193249 }
194250
195251 @ Override
196- public void run () {
197- queue . remove ( timedAction ) ;
252+ public boolean isDisposed () {
253+ return get () == null ;
198254 }
199255 }
200256 }
0 commit comments