@@ -849,6 +849,63 @@ function defineSyncTests(impl: SyncClientImplementation) {
849849 ) ;
850850 } ) ;
851851 }
852+
853+ mockSyncServiceTest ( 'can reconnect based on query changes' , async ( { syncService } ) => {
854+ // Test for https://discord.com/channels/1138230179878154300/1399340612435710034/1399340612435710034
855+ const logger = createLogger ( 'test' , { logLevel : Logger . TRACE } ) ;
856+ const logMessages : string [ ] = [ ] ;
857+ ( logger as any ) . invoke = ( level , args ) => {
858+ console . log ( ...args ) ;
859+ logMessages . push ( util . format ( ...args ) ) ;
860+ } ;
861+
862+ const powersync = await syncService . createDatabase ( { logger } ) ;
863+ powersync . watchWithCallback ( 'SELECT * FROM lists' , [ ] , {
864+ onResult ( results ) {
865+ const param = results . rows ?. length ?? 0 ;
866+
867+ powersync . connect ( new TestConnector ( ) , { ...options , params : { a : param } } ) ;
868+ }
869+ } ) ;
870+
871+ await vi . waitFor ( ( ) => expect ( syncService . connectedListeners ) . toHaveLength ( 1 ) ) ;
872+ expect ( syncService . connectedListeners [ 0 ] ) . toMatchObject ( {
873+ parameters : { a : 0 }
874+ } ) ;
875+
876+ syncService . pushLine ( {
877+ checkpoint : {
878+ last_op_id : '1' ,
879+ buckets : [ bucket ( 'a' , 1 ) ]
880+ }
881+ } ) ;
882+ syncService . pushLine ( {
883+ data : {
884+ bucket : 'a' ,
885+ data : [
886+ {
887+ checksum : 0 ,
888+ op_id : '1' ,
889+ op : 'PUT' ,
890+ object_id : 'my_list' ,
891+ object_type : 'lists' ,
892+ data : '{"name": "l"}'
893+ }
894+ ]
895+ }
896+ } ) ;
897+ syncService . pushLine ( { checkpoint_complete : { last_op_id : '1' } } ) ;
898+
899+ await vi . waitFor ( ( ) =>
900+ expect ( syncService . connectedListeners [ 0 ] ) . toMatchObject ( {
901+ parameters : { a : 1 }
902+ } )
903+ ) ;
904+
905+ expect ( logMessages ) . not . toEqual (
906+ expect . arrayContaining ( [ expect . stringContaining ( 'Cannot enqueue data into closed stream' ) ] )
907+ ) ;
908+ } ) ;
852909}
853910
854911function bucket ( name : string , count : number , options : { priority : number } = { priority : 3 } ) : BucketChecksum {
0 commit comments