1313package io .reactivex .internal .operators .flowable ;
1414
1515import java .util .concurrent .Callable ;
16+ import java .util .concurrent .atomic .*;
1617
1718import org .reactivestreams .*;
1819
1920import io .reactivex .exceptions .Exceptions ;
2021import io .reactivex .functions .BiFunction ;
2122import io .reactivex .internal .functions .ObjectHelper ;
22- import io .reactivex .internal .subscribers .SinglePostCompleteSubscriber ;
23- import io .reactivex .internal .subscriptions .EmptySubscription ;
23+ import io .reactivex .internal .fuseable .SimplePlainQueue ;
24+ import io .reactivex .internal .queue .SpscArrayQueue ;
25+ import io .reactivex .internal .subscriptions .*;
26+ import io .reactivex .internal .util .BackpressureHelper ;
2427import io .reactivex .plugins .RxJavaPlugins ;
2528
2629public final class FlowableScanSeed <T , R > extends AbstractFlowableWithUpstream <T , R > {
@@ -45,20 +48,57 @@ protected void subscribeActual(Subscriber<? super R> s) {
4548 return ;
4649 }
4750
48- source .subscribe (new ScanSeedSubscriber <T , R >(s , accumulator , r ));
51+ source .subscribe (new ScanSeedSubscriber <T , R >(s , accumulator , r , bufferSize () ));
4952 }
5053
51- static final class ScanSeedSubscriber <T , R > extends SinglePostCompleteSubscriber <T , R > {
54+ static final class ScanSeedSubscriber <T , R >
55+ extends AtomicInteger
56+ implements Subscriber <T >, Subscription {
5257 private static final long serialVersionUID = -1776795561228106469L ;
5358
59+ final Subscriber <? super R > actual ;
60+
5461 final BiFunction <R , ? super T , R > accumulator ;
5562
56- boolean done ;
63+ final SimplePlainQueue <R > queue ;
64+
65+ final AtomicLong requested ;
66+
67+ final int prefetch ;
68+
69+ final int limit ;
70+
71+ volatile boolean cancelled ;
72+
73+ volatile boolean done ;
74+ Throwable error ;
75+
76+ Subscription s ;
77+
78+ R value ;
5779
58- ScanSeedSubscriber (Subscriber <? super R > actual , BiFunction <R , ? super T , R > accumulator , R value ) {
59- super (actual );
80+ int consumed ;
81+
82+ ScanSeedSubscriber (Subscriber <? super R > actual , BiFunction <R , ? super T , R > accumulator , R value , int prefetch ) {
83+ this .actual = actual ;
6084 this .accumulator = accumulator ;
6185 this .value = value ;
86+ this .prefetch = prefetch ;
87+ this .limit = prefetch - (prefetch >> 2 );
88+ this .queue = new SpscArrayQueue <R >(prefetch );
89+ this .queue .offer (value );
90+ this .requested = new AtomicLong ();
91+ }
92+
93+ @ Override
94+ public void onSubscribe (Subscription s ) {
95+ if (SubscriptionHelper .validate (this .s , s )) {
96+ this .s = s ;
97+
98+ actual .onSubscribe (this );
99+
100+ s .request (prefetch - 1 );
101+ }
62102 }
63103
64104 @ Override
@@ -68,21 +108,18 @@ public void onNext(T t) {
68108 }
69109
70110 R v = value ;
71-
72- R u ;
73-
74111 try {
75- u = ObjectHelper .requireNonNull (accumulator .apply (v , t ), "The accumulator returned a null value" );
76- } catch (Throwable e ) {
77- Exceptions .throwIfFatal (e );
112+ v = ObjectHelper .requireNonNull (accumulator .apply (v , t ), "The accumulator returned a null value" );
113+ } catch (Throwable ex ) {
114+ Exceptions .throwIfFatal (ex );
78115 s .cancel ();
79- onError (e );
116+ onError (ex );
80117 return ;
81118 }
82119
83- value = u ;
84- produced ++ ;
85- actual . onNext ( v );
120+ value = v ;
121+ queue . offer ( v ) ;
122+ drain ( );
86123 }
87124
88125 @ Override
@@ -91,9 +128,9 @@ public void onError(Throwable t) {
91128 RxJavaPlugins .onError (t );
92129 return ;
93130 }
131+ error = t ;
94132 done = true ;
95- value = null ;
96- actual .onError (t );
133+ drain ();
97134 }
98135
99136 @ Override
@@ -102,7 +139,104 @@ public void onComplete() {
102139 return ;
103140 }
104141 done = true ;
105- complete (value );
142+ drain ();
143+ }
144+
145+ @ Override
146+ public void cancel () {
147+ cancelled = true ;
148+ s .cancel ();
149+ if (getAndIncrement () == 0 ) {
150+ queue .clear ();
151+ }
152+ }
153+
154+ @ Override
155+ public void request (long n ) {
156+ if (SubscriptionHelper .validate (n )) {
157+ BackpressureHelper .add (requested , n );
158+ drain ();
159+ }
160+ }
161+
162+ void drain () {
163+ if (getAndIncrement () != 0 ) {
164+ return ;
165+ }
166+
167+ int missed = 1 ;
168+ Subscriber <? super R > a = actual ;
169+ SimplePlainQueue <R > q = queue ;
170+ int lim = limit ;
171+ int c = consumed ;
172+
173+ for (;;) {
174+
175+ long r = requested .get ();
176+ long e = 0L ;
177+
178+ while (e != r ) {
179+ if (cancelled ) {
180+ q .clear ();
181+ return ;
182+ }
183+ boolean d = done ;
184+
185+ if (d ) {
186+ Throwable ex = error ;
187+ if (ex != null ) {
188+ q .clear ();
189+ a .onError (ex );
190+ return ;
191+ }
192+ }
193+
194+ R v = q .poll ();
195+ boolean empty = v == null ;
196+
197+ if (d && empty ) {
198+ a .onComplete ();
199+ return ;
200+ }
201+
202+ if (empty ) {
203+ break ;
204+ }
205+
206+ a .onNext (v );
207+
208+ e ++;
209+ if (++c == lim ) {
210+ c = 0 ;
211+ s .request (lim );
212+ }
213+ }
214+
215+ if (e == r ) {
216+ if (done ) {
217+ Throwable ex = error ;
218+ if (ex != null ) {
219+ q .clear ();
220+ a .onError (ex );
221+ return ;
222+ }
223+ if (q .isEmpty ()) {
224+ a .onComplete ();
225+ return ;
226+ }
227+ }
228+ }
229+
230+ if (e != 0L ) {
231+ BackpressureHelper .produced (requested , e );
232+ }
233+
234+ consumed = c ;
235+ missed = addAndGet (-missed );
236+ if (missed == 0 ) {
237+ break ;
238+ }
239+ }
106240 }
107241 }
108242}
0 commit comments