@@ -528,7 +528,8 @@ class StreamingSyncImplementation implements StreamingSync {
528528 }
529529
530530 Future <http.StreamedResponse ?> _postStreamRequest (
531- Object ? data, bool acceptBson) async {
531+ Object ? data, bool acceptBson,
532+ {Future <void >? onAbort}) async {
532533 const ndJson = 'application/x-ndjson' ;
533534 const bson = 'application/vnd.powersync.bson-stream' ;
534535
@@ -538,8 +539,8 @@ class StreamingSyncImplementation implements StreamingSync {
538539 }
539540 final uri = credentials.endpointUri ('sync/stream' );
540541
541- final request =
542- http. AbortableRequest ( 'POST' , uri, abortTrigger: _abort! .onAbort);
542+ final request = http. AbortableRequest ( 'POST' , uri,
543+ abortTrigger: onAbort ?? _abort! .onAbort);
543544 request.headers['Content-Type' ] = 'application/json' ;
544545 request.headers['Authorization' ] = "Token ${credentials .token }" ;
545546 request.headers['Accept' ] =
@@ -645,9 +646,10 @@ final class _ActiveRustStreamingIteration {
645646 }
646647 }
647648
648- Stream <ReceivedLine > _receiveLines (Object ? data) {
649+ Stream <ReceivedLine > _receiveLines (Object ? data,
650+ {required Future <void > onAbort}) {
649651 return streamFromFutureAwaitInCancellation (
650- sync ._postStreamRequest (data, true ))
652+ sync ._postStreamRequest (data, true , onAbort : onAbort ))
651653 .asyncExpand< Object /* Uint8List | String */ > ((response) {
652654 if (response == null ) {
653655 return null ;
@@ -662,33 +664,66 @@ final class _ActiveRustStreamingIteration {
662664
663665 Future <RustSyncIterationResult > _handleLines (
664666 EstablishSyncStream request) async {
667+ // This is a workaround for https://github.com/dart-lang/http/issues/1820:
668+ // When cancelling the stream subscription of an HTTP response with the
669+ // fetch-based client implementation, cancelling the subscription is delayed
670+ // until the next chunk (typically a token_expires_in message in our case).
671+ // So, before cancelling, we complete an abort controller for the request to
672+ // speed things up. This is not an issue in most cases because the abort
673+ // controller on this stream would be completed when disconnecting. But
674+ // when switching sync streams, that's not the case and we need a second
675+ // abort controller for the inner iteration.
676+ final innerAbort = Completer <void >.sync ();
665677 final events = addBroadcast (
666- _receiveLines (request.request), sync ._nonLineSyncEvents.stream);
678+ _receiveLines (
679+ request.request,
680+ onAbort: Future .any ([
681+ sync ._abort! .onAbort,
682+ innerAbort.future,
683+ ]),
684+ ),
685+ sync ._nonLineSyncEvents.stream,
686+ );
667687
668688 var needsImmediateRestart = false ;
669689 loop:
670- await for (final event in events) {
671- if (! _isActive || sync .aborted) {
672- break ;
673- }
690+ try {
691+ await for (final event in events) {
692+ if (! _isActive || sync .aborted) {
693+ innerAbort.complete ();
694+ break ;
695+ }
674696
675- switch (event) {
676- case ReceivedLine (line: final Uint8List line):
677- _triggerCrudUploadOnFirstLine ();
678- await _control ('line_binary' , line);
679- case ReceivedLine (line: final line as String ):
680- _triggerCrudUploadOnFirstLine ();
681- await _control ('line_text' , line);
682- case UploadCompleted ():
683- await _control ('completed_upload' );
684- case AbortCurrentIteration (: final hideDisconnectState):
685- needsImmediateRestart = hideDisconnectState;
686- break loop;
687- case TokenRefreshComplete ():
688- await _control ('refreshed_token' );
689- case HandleChangedSubscriptions (: final currentSubscriptions):
690- await _control ('update_subscriptions' ,
691- convert.json.encode (_encodeSubscriptions (currentSubscriptions)));
697+ switch (event) {
698+ case ReceivedLine (line: final Uint8List line):
699+ _triggerCrudUploadOnFirstLine ();
700+ await _control ('line_binary' , line);
701+ case ReceivedLine (line: final line as String ):
702+ _triggerCrudUploadOnFirstLine ();
703+ await _control ('line_text' , line);
704+ case UploadCompleted ():
705+ await _control ('completed_upload' );
706+ case AbortCurrentIteration (: final hideDisconnectState):
707+ innerAbort.complete ();
708+ needsImmediateRestart = hideDisconnectState;
709+ break loop;
710+ case TokenRefreshComplete ():
711+ await _control ('refreshed_token' );
712+ case HandleChangedSubscriptions (: final currentSubscriptions):
713+ await _control (
714+ 'update_subscriptions' ,
715+ convert.json
716+ .encode (_encodeSubscriptions (currentSubscriptions)));
717+ }
718+ }
719+ } on http.RequestAbortedException {
720+ // Unlike a regular cancellation, cancelling via the abort controller
721+ // emits an error. We did mean to just cancel the stream, so we can
722+ // safely ignore that.
723+ if (innerAbort.isCompleted) {
724+ // ignore
725+ } else {
726+ rethrow ;
692727 }
693728 }
694729
0 commit comments