@@ -14,7 +14,7 @@ import {
1414import { AbstractRemote } from './AbstractRemote' ;
1515import ndjsonStream from 'can-ndjson-stream' ;
1616import { BucketChecksum , BucketStorageAdapter , Checkpoint } from '../bucket/BucketStorageAdapter' ;
17- import { SyncStatus } from '../../../db/crud/SyncStatus' ;
17+ import { SyncStatus , SyncStatusOptions } from '../../../db/crud/SyncStatus' ;
1818import { SyncDataBucket } from '../bucket/SyncDataBucket' ;
1919import { BaseObserver , BaseListener } from '../../../utils/BaseObserver' ;
2020
@@ -48,31 +48,38 @@ export const DEFAULT_STREAMING_SYNC_OPTIONS = {
4848 logger : Logger . get ( 'PowerSyncStream' )
4949} ;
5050
51+ const CRUD_UPLOAD_DEBOUNCE_MS = 1000 ;
52+
5153export abstract class AbstractStreamingSyncImplementation extends BaseObserver < StreamingSyncImplementationListener > {
52- protected _lastSyncedAt : Date ;
54+ protected _lastSyncedAt : Date | null ;
5355 protected options : AbstractStreamingSyncImplementationOptions ;
5456
55- private isUploadingCrud : boolean ;
56-
57- protected _isConnected : boolean ;
57+ syncStatus : SyncStatus ;
5858
5959 constructor ( options : AbstractStreamingSyncImplementationOptions ) {
6060 super ( ) ;
6161 this . options = { ...DEFAULT_STREAMING_SYNC_OPTIONS , ...options } ;
62- this . isUploadingCrud = false ;
63- this . _isConnected = false ;
62+ this . syncStatus = new SyncStatus ( {
63+ connected : false ,
64+ lastSyncedAt : null ,
65+ dataFlow : {
66+ uploading : false ,
67+ downloading : false
68+ }
69+ } ) ;
6470 }
6571
6672 get lastSyncedAt ( ) {
67- return new Date ( this . _lastSyncedAt ) ;
73+ const lastSynced = this . syncStatus . lastSyncedAt ;
74+ return lastSynced && new Date ( lastSynced ) ;
6875 }
6976
7077 protected get logger ( ) {
7178 return this . options . logger ! ;
7279 }
7380
7481 get isConnected ( ) {
75- return this . _isConnected ;
82+ return this . syncStatus . connected ;
7683 }
7784
7885 abstract obtainLock < T > ( lockOptions : LockOptions < T > ) : Promise < T > ;
@@ -81,29 +88,51 @@ export abstract class AbstractStreamingSyncImplementation extends BaseObserver<S
8188 return this . options . adapter . hasCompletedSync ( ) ;
8289 }
8390
84- triggerCrudUpload ( ) {
85- if ( this . isUploadingCrud ) {
86- return ;
87- }
88- this . _uploadAllCrud ( ) ;
89- }
91+ triggerCrudUpload = _ . debounce (
92+ ( ) => {
93+ if ( ! this . syncStatus . connected || this . syncStatus . dataFlowStatus . uploading ) {
94+ return ;
95+ }
96+ this . _uploadAllCrud ( ) ;
97+ } ,
98+ CRUD_UPLOAD_DEBOUNCE_MS ,
99+ { trailing : true }
100+ ) ;
90101
91102 protected async _uploadAllCrud ( ) : Promise < void > {
92- this . isUploadingCrud = true ;
93- while ( true ) {
94- try {
95- const done = await this . uploadCrudBatch ( ) ;
96- if ( done ) {
97- this . isUploadingCrud = false ;
98- break ;
103+ return this . obtainLock ( {
104+ type : LockType . CRUD ,
105+ callback : async ( ) => {
106+ this . updateSyncStatus ( {
107+ dataFlow : {
108+ uploading : true
109+ }
110+ } ) ;
111+ while ( true ) {
112+ try {
113+ const done = await this . uploadCrudBatch ( ) ;
114+ if ( done ) {
115+ break ;
116+ }
117+ } catch ( ex ) {
118+ this . updateSyncStatus ( {
119+ connected : false ,
120+ dataFlow : {
121+ uploading : false
122+ }
123+ } ) ;
124+ await this . delayRetry ( ) ;
125+ break ;
126+ } finally {
127+ this . updateSyncStatus ( {
128+ dataFlow : {
129+ uploading : false
130+ }
131+ } ) ;
132+ }
99133 }
100- } catch ( ex ) {
101- this . updateSyncStatus ( false ) ;
102- await this . delayRetry ( ) ;
103- this . isUploadingCrud = false ;
104- break ;
105134 }
106- }
135+ } ) ;
107136 }
108137
109138 protected async uploadCrudBatch ( ) : Promise < boolean > {
@@ -123,6 +152,15 @@ export abstract class AbstractStreamingSyncImplementation extends BaseObserver<S
123152 }
124153
125154 async streamingSync ( signal ?: AbortSignal ) : Promise < void > {
155+ signal ?. addEventListener ( 'abort' , ( ) => {
156+ this . updateSyncStatus ( {
157+ connected : false ,
158+ dataFlow : {
159+ downloading : false
160+ }
161+ } ) ;
162+ } ) ;
163+
126164 while ( true ) {
127165 try {
128166 if ( signal ?. aborted ) {
@@ -132,7 +170,9 @@ export abstract class AbstractStreamingSyncImplementation extends BaseObserver<S
132170 // Continue immediately
133171 } catch ( ex ) {
134172 this . logger . error ( ex ) ;
135- this . updateSyncStatus ( false ) ;
173+ this . updateSyncStatus ( {
174+ connected : false
175+ } ) ;
136176 // On error, wait a little before retrying
137177 await this . delayRetry ( ) ;
138178 }
@@ -173,7 +213,13 @@ export abstract class AbstractStreamingSyncImplementation extends BaseObserver<S
173213 signal
174214 ) ) {
175215 // A connection is active and messages are being received
176- this . updateSyncStatus ( true ) ;
216+ if ( ! this . syncStatus . connected ) {
217+ // There is a connection now
218+ _ . defer ( ( ) => this . triggerCrudUpload ( ) ) ;
219+ this . updateSyncStatus ( {
220+ connected : true
221+ } ) ;
222+ }
177223
178224 if ( isStreamingSyncCheckpoint ( line ) ) {
179225 targetCheckpoint = line . checkpoint ;
@@ -204,7 +250,13 @@ export abstract class AbstractStreamingSyncImplementation extends BaseObserver<S
204250 } else {
205251 appliedCheckpoint = _ . clone ( targetCheckpoint ) ;
206252 this . logger . debug ( 'validated checkpoint' , appliedCheckpoint ) ;
207- this . updateSyncStatus ( true , new Date ( ) ) ;
253+ this . updateSyncStatus ( {
254+ connected : true ,
255+ lastSyncedAt : new Date ( ) ,
256+ dataFlow : {
257+ downloading : false
258+ }
259+ } ) ;
208260 }
209261
210262 validatedCheckpoint = _ . clone ( targetCheckpoint ) ;
@@ -242,6 +294,11 @@ export abstract class AbstractStreamingSyncImplementation extends BaseObserver<S
242294 await this . options . adapter . setTargetCheckpoint ( targetCheckpoint ) ;
243295 } else if ( isStreamingSyncData ( line ) ) {
244296 const { data } = line ;
297+ this . updateSyncStatus ( {
298+ dataFlow : {
299+ downloading : true
300+ }
301+ } ) ;
245302 await this . options . adapter . saveSyncData ( { buckets : [ SyncDataBucket . fromRow ( data ) ] } ) ;
246303 } else if ( isStreamingKeepalive ( line ) ) {
247304 const remaining_seconds = line . token_expires_in ;
@@ -255,7 +312,10 @@ export abstract class AbstractStreamingSyncImplementation extends BaseObserver<S
255312 this . logger . debug ( 'Sync complete' ) ;
256313
257314 if ( _ . isEqual ( targetCheckpoint , appliedCheckpoint ) ) {
258- this . updateSyncStatus ( true , new Date ( ) ) ;
315+ this . updateSyncStatus ( {
316+ connected : true ,
317+ lastSyncedAt : new Date ( )
318+ } ) ;
259319 } else if ( _ . isEqual ( validatedCheckpoint , targetCheckpoint ) ) {
260320 const result = await this . options . adapter . syncLocalDatabase ( targetCheckpoint ) ;
261321 if ( ! result . checkpointValid ) {
@@ -268,7 +328,13 @@ export abstract class AbstractStreamingSyncImplementation extends BaseObserver<S
268328 // Continue waiting.
269329 } else {
270330 appliedCheckpoint = _ . clone ( targetCheckpoint ) ;
271- this . updateSyncStatus ( true , new Date ( ) ) ;
331+ this . updateSyncStatus ( {
332+ connected : true ,
333+ lastSyncedAt : new Date ( ) ,
334+ dataFlow : {
335+ downloading : false
336+ }
337+ } ) ;
272338 }
273339 }
274340 }
@@ -300,14 +366,16 @@ export abstract class AbstractStreamingSyncImplementation extends BaseObserver<S
300366 }
301367 }
302368
303- private updateSyncStatus ( connected : boolean , lastSyncedAt ?: Date ) {
304- const takeSnapShot = ( ) => [ this . _isConnected , this . _lastSyncedAt ?. valueOf ( ) ] ;
369+ protected updateSyncStatus ( options : SyncStatusOptions ) {
370+ const updatedStatus = new SyncStatus ( {
371+ connected : options . connected ?? this . syncStatus . connected ,
372+ lastSyncedAt : options . lastSyncedAt ?? this . syncStatus . lastSyncedAt ,
373+ dataFlow : _ . merge ( _ . clone ( this . syncStatus . dataFlowStatus ) , options . dataFlow ?? { } )
374+ } ) ;
305375
306- const previousValues = takeSnapShot ( ) ;
307- this . _lastSyncedAt = lastSyncedAt ?? this . lastSyncedAt ;
308- this . _isConnected = connected ;
309- if ( ! _ . isEqual ( previousValues , takeSnapShot ( ) ) ) {
310- this . iterateListeners ( ( cb ) => cb . statusChanged ?.( new SyncStatus ( this . isConnected , this . lastSyncedAt ) ) ) ;
376+ if ( ! this . syncStatus . isEqual ( updatedStatus ) ) {
377+ this . syncStatus = updatedStatus ;
378+ this . iterateListeners ( ( cb ) => cb . statusChanged ?.( updatedStatus ) ) ;
311379 }
312380 }
313381
0 commit comments