5454import io .rsocket .util .MonoLifecycleHandler ;
5555import java .nio .channels .ClosedChannelException ;
5656import java .util .concurrent .CancellationException ;
57- import java .util .concurrent .atomic .AtomicBoolean ;
57+ import java .util .concurrent .atomic .AtomicInteger ;
5858import java .util .concurrent .atomic .AtomicReferenceFieldUpdater ;
5959import java .util .function .Consumer ;
6060import java .util .function .LongConsumer ;
@@ -260,6 +260,7 @@ public void doOnTerminal(
260260 removeStreamReceiver (streamId );
261261 }
262262 });
263+
263264 receivers .put (streamId , receiver );
264265
265266 return receiver .doOnDiscard (ReferenceCounted .class , DROPPED_ELEMENTS_CONSUMER );
@@ -281,7 +282,7 @@ private Flux<Payload> handleRequestStream(final Payload payload) {
281282
282283 final UnboundedProcessor <ByteBuf > sendProcessor = this .sendProcessor ;
283284 final UnicastProcessor <Payload > receiver = UnicastProcessor .create ();
284- final AtomicBoolean payloadReleasedFlag = new AtomicBoolean ( false );
285+ final AtomicInteger wip = new AtomicInteger ( 0 );
285286
286287 receivers .put (streamId , receiver );
287288
@@ -293,30 +294,49 @@ private Flux<Payload> handleRequestStream(final Payload payload) {
293294
294295 @ Override
295296 public void accept (long n ) {
296- if (firstRequest && ! receiver . isDisposed () ) {
297+ if (firstRequest ) {
297298 firstRequest = false ;
298- if (! payloadReleasedFlag . getAndSet ( true ) ) {
299- sendProcessor . onNext (
300- RequestStreamFrameFlyweight . encodeReleasingPayload (
301- allocator , streamId , n , payload )) ;
299+ if (wip . getAndIncrement () != 0 ) {
300+ // no need to do anything.
301+ // stream was canceled and fist payload has already been discarded
302+ return ;
302303 }
303- } else if (contains (streamId ) && !receiver .isDisposed ()) {
304+ int missed = 1 ;
305+ boolean firstHasBeenSent = false ;
306+ for (; ; ) {
307+ if (!firstHasBeenSent ) {
308+ sendProcessor .onNext (
309+ RequestStreamFrameFlyweight .encodeReleasingPayload (
310+ allocator , streamId , n , payload ));
311+ firstHasBeenSent = true ;
312+ } else {
313+ // if first frame was sent but we cycling again, it means that wip was
314+ // incremented at doOnCancel
315+ sendProcessor .onNext (CancelFrameFlyweight .encode (allocator , streamId ));
316+ return ;
317+ }
318+
319+ missed = wip .addAndGet (-missed );
320+ if (missed == 0 ) {
321+ return ;
322+ }
323+ }
324+ } else if (!receiver .isDisposed ()) {
304325 sendProcessor .onNext (RequestNFrameFlyweight .encode (allocator , streamId , n ));
305326 }
306327 }
307328 })
308- .doOnError (
309- t -> {
310- if (contains (streamId ) && !receiver .isDisposed ()) {
311- sendProcessor .onNext (ErrorFrameFlyweight .encode (allocator , streamId , t ));
312- }
313- })
314329 .doOnCancel (
315330 () -> {
316- if (! payloadReleasedFlag . getAndSet ( true ) ) {
317- payload . release () ;
331+ if (wip . getAndIncrement () != 0 ) {
332+ return ;
318333 }
319- if (contains (streamId ) && !receiver .isDisposed ()) {
334+
335+ // check if we need to release payload
336+ // only applicable if the cancel appears earlier than actual request
337+ if (payload .refCnt () > 0 ) {
338+ payload .release ();
339+ } else {
320340 sendProcessor .onNext (CancelFrameFlyweight .encode (allocator , streamId ));
321341 }
322342 })
@@ -330,30 +350,32 @@ private Flux<Payload> handleChannel(Flux<Payload> request) {
330350 return Flux .error (err );
331351 }
332352
333- return request .switchOnFirst (
334- (s , flux ) -> {
335- Payload payload = s .get ();
336- if (payload != null ) {
337- if (!PayloadValidationUtils .isValid (mtu , payload )) {
338- payload .release ();
339- final IllegalArgumentException t =
340- new IllegalArgumentException (INVALID_PAYLOAD_ERROR_MESSAGE );
341- errorConsumer .accept (t );
342- return Mono .error (t );
343- }
344- return handleChannel (payload , flux );
345- } else {
346- return flux ;
347- }
348- },
349- false );
353+ return request
354+ .switchOnFirst (
355+ (s , flux ) -> {
356+ Payload payload = s .get ();
357+ if (payload != null ) {
358+ if (!PayloadValidationUtils .isValid (mtu , payload )) {
359+ payload .release ();
360+ final IllegalArgumentException t =
361+ new IllegalArgumentException (INVALID_PAYLOAD_ERROR_MESSAGE );
362+ errorConsumer .accept (t );
363+ return Mono .error (t );
364+ }
365+ return handleChannel (payload , flux );
366+ } else {
367+ return flux ;
368+ }
369+ },
370+ false )
371+ .doOnDiscard (ReferenceCounted .class , DROPPED_ELEMENTS_CONSUMER );
350372 }
351373
352374 private Flux <? extends Payload > handleChannel (Payload initialPayload , Flux <Payload > inboundFlux ) {
353375 final UnboundedProcessor <ByteBuf > sendProcessor = this .sendProcessor ;
354- final AtomicBoolean payloadReleasedFlag = new AtomicBoolean (false );
355376 final int streamId = streamIdSupplier .nextStreamId (receivers );
356377
378+ final AtomicInteger wip = new AtomicInteger (0 );
357379 final UnicastProcessor <Payload > receiver = UnicastProcessor .create ();
358380 final BaseSubscriber <Payload > upstreamSubscriber =
359381 new BaseSubscriber <Payload >() {
@@ -421,19 +443,47 @@ protected void hookFinally(SignalType type) {
421443 public void accept (long n ) {
422444 if (firstRequest ) {
423445 firstRequest = false ;
424- senders .put (streamId , upstreamSubscriber );
425- receivers .put (streamId , receiver );
426-
427- inboundFlux
428- .limitRate (Queues .SMALL_BUFFER_SIZE )
429- .doOnDiscard (ReferenceCounted .class , DROPPED_ELEMENTS_CONSUMER )
430- .subscribe (upstreamSubscriber );
431- if (!payloadReleasedFlag .getAndSet (true )) {
432- ByteBuf frame =
433- RequestChannelFrameFlyweight .encodeReleasingPayload (
434- allocator , streamId , false , n , initialPayload );
435-
436- sendProcessor .onNext (frame );
446+ if (wip .getAndIncrement () != 0 ) {
447+ // no need to do anything.
448+ // stream was canceled and fist payload has already been discarded
449+ return ;
450+ }
451+ int missed = 1 ;
452+ boolean firstHasBeenSent = false ;
453+ for (; ; ) {
454+ if (!firstHasBeenSent ) {
455+ ByteBuf frame ;
456+ try {
457+ frame =
458+ RequestChannelFrameFlyweight .encodeReleasingPayload (
459+ allocator , streamId , false , n , initialPayload );
460+ } catch (IllegalReferenceCountException e ) {
461+ return ;
462+ }
463+
464+ senders .put (streamId , upstreamSubscriber );
465+ receivers .put (streamId , receiver );
466+
467+ inboundFlux
468+ .limitRate (Queues .SMALL_BUFFER_SIZE )
469+ .doOnDiscard (ReferenceCounted .class , DROPPED_ELEMENTS_CONSUMER )
470+ .subscribe (upstreamSubscriber );
471+
472+ sendProcessor .onNext (frame );
473+ firstHasBeenSent = true ;
474+ } else {
475+ // if first frame was sent but we cycling again, it means that wip was
476+ // incremented at doOnCancel
477+ senders .remove (streamId , upstreamSubscriber );
478+ receivers .remove (streamId , receiver );
479+ sendProcessor .onNext (CancelFrameFlyweight .encode (allocator , streamId ));
480+ return ;
481+ }
482+
483+ missed = wip .addAndGet (-missed );
484+ if (missed == 0 ) {
485+ return ;
486+ }
437487 }
438488 } else {
439489 sendProcessor .onNext (RequestNFrameFlyweight .encode (allocator , streamId , n ));
@@ -442,22 +492,22 @@ public void accept(long n) {
442492 })
443493 .doOnError (
444494 t -> {
445- if (receivers .remove (streamId , receiver )) {
446- upstreamSubscriber .cancel ();
447- }
495+ upstreamSubscriber .cancel ();
496+ receivers .remove (streamId , receiver );
448497 })
449498 .doOnComplete (() -> receivers .remove (streamId , receiver ))
450499 .doOnCancel (
451500 () -> {
452- if (!payloadReleasedFlag .getAndSet (true )) {
453- initialPayload .release ();
501+ upstreamSubscriber .cancel ();
502+ if (wip .getAndIncrement () != 0 ) {
503+ return ;
454504 }
505+
506+ // need to send frame only if RequestChannelFrame was sent
455507 if (receivers .remove (streamId , receiver )) {
456508 sendProcessor .onNext (CancelFrameFlyweight .encode (allocator , streamId ));
457- upstreamSubscriber .cancel ();
458509 }
459- })
460- .doOnDiscard (ReferenceCounted .class , DROPPED_ELEMENTS_CONSUMER );
510+ });
461511 }
462512
463513 private Mono <Void > handleMetadataPush (Payload payload ) {
0 commit comments