@@ -7,11 +7,7 @@ import PACKAGE from '../../../../package.json' with { type: 'json' };
77import { AbortOperation } from '../../../utils/AbortOperation.js' ;
88import { DataStream } from '../../../utils/DataStream.js' ;
99import { PowerSyncCredentials } from '../../connection/PowerSyncCredentials.js' ;
10- import {
11- StreamingSyncLine ,
12- StreamingSyncLineOrCrudUploadComplete ,
13- StreamingSyncRequest
14- } from './streaming-sync-types.js' ;
10+ import { StreamingSyncRequest } from './streaming-sync-types.js' ;
1511import { WebsocketClientTransport } from './WebsocketClientTransport.js' ;
1612
1713export type BSONImplementation = typeof BSON ;
@@ -305,6 +301,27 @@ export abstract class AbstractRemote {
305301 // automatically as a header.
306302 const userAgent = this . getUserAgent ( ) ;
307303
304+ const stream = new DataStream < T , Uint8Array > ( {
305+ logger : this . logger ,
306+ pressure : {
307+ lowWaterMark : SYNC_QUEUE_REQUEST_LOW_WATER
308+ } ,
309+ mapLine : map
310+ } ) ;
311+
312+ // Handle upstream abort
313+ if ( options . abortSignal ?. aborted ) {
314+ throw new AbortOperation ( 'Connection request aborted' ) ;
315+ } else {
316+ options . abortSignal ?. addEventListener (
317+ 'abort' ,
318+ ( ) => {
319+ stream . close ( ) ;
320+ } ,
321+ { once : true }
322+ ) ;
323+ }
324+
308325 let keepAliveTimeout : any ;
309326 const resetTimeout = ( ) => {
310327 clearTimeout ( keepAliveTimeout ) ;
@@ -315,15 +332,28 @@ export abstract class AbstractRemote {
315332 } ;
316333 resetTimeout ( ) ;
317334
335+ // Typescript complains about this being `never` if it's not assigned here.
336+ // This is assigned in `wsCreator`.
337+ let disposeSocketConnectionTimeout = ( ) => { } ;
338+
318339 const url = this . options . socketUrlTransformer ( request . url ) ;
319340 const connector = new RSocketConnector ( {
320341 transport : new WebsocketClientTransport ( {
321342 url,
322343 wsCreator : ( url ) => {
323344 const socket = this . createSocket ( url ) ;
345+ disposeSocketConnectionTimeout = stream . registerListener ( {
346+ closed : ( ) => {
347+ // Allow closing the underlying WebSocket if the stream was closed before the
348+ // RSocket connect completed. This should effectively abort the request.
349+ socket . close ( ) ;
350+ }
351+ } ) ;
352+
324353 socket . addEventListener ( 'message' , ( event ) => {
325354 resetTimeout ( ) ;
326355 } ) ;
356+
327357 return socket ;
328358 }
329359 } ) ,
@@ -345,22 +375,19 @@ export abstract class AbstractRemote {
345375 let rsocket : RSocket ;
346376 try {
347377 rsocket = await connector . connect ( ) ;
378+ // The connection is established, we no longer need to monitor the initial timeout
379+ disposeSocketConnectionTimeout ( ) ;
348380 } catch ( ex ) {
349381 this . logger . error ( `Failed to connect WebSocket` , ex ) ;
350382 clearTimeout ( keepAliveTimeout ) ;
383+ if ( ! stream . closed ) {
384+ await stream . close ( ) ;
385+ }
351386 throw ex ;
352387 }
353388
354389 resetTimeout ( ) ;
355390
356- const stream = new DataStream < T , Uint8Array > ( {
357- logger : this . logger ,
358- pressure : {
359- lowWaterMark : SYNC_QUEUE_REQUEST_LOW_WATER
360- } ,
361- mapLine : map
362- } ) ;
363-
364391 let socketIsClosed = false ;
365392 const closeSocket = ( ) => {
366393 clearTimeout ( keepAliveTimeout ) ;
@@ -455,18 +482,6 @@ export abstract class AbstractRemote {
455482 }
456483 } ) ;
457484
458- /**
459- * Handle abort operations here.
460- * Unfortunately cannot insert them into the connection.
461- */
462- if ( options . abortSignal ?. aborted ) {
463- stream . close ( ) ;
464- } else {
465- options . abortSignal ?. addEventListener ( 'abort' , ( ) => {
466- stream . close ( ) ;
467- } ) ;
468- }
469-
470485 return stream ;
471486 }
472487
0 commit comments