@@ -18,6 +18,11 @@ import { CrudEntry } from './sync/bucket/CrudEntry';
1818import { mutexRunExclusive } from '../utils/mutex' ;
1919import { BaseObserver } from '../utils/BaseObserver' ;
2020import { EventIterator } from 'event-iterator' ;
21+ import { quoteIdentifier } from '../utils/strings' ;
22+
23+ export interface DisconnectAndClearOptions {
24+ clearLocal ?: boolean ;
25+ }
2126
2227export interface PowerSyncDatabaseOptions {
2328 schema : Schema ;
@@ -57,6 +62,10 @@ export interface PowerSyncDBListener extends StreamingSyncImplementationListener
5762
5863const POWERSYNC_TABLE_MATCH = / ( ^ p s _ d a t a _ _ | ^ p s _ d a t a _ l o c a l _ _ ) / ;
5964
65+ const DEFAULT_DISCONNECT_CLEAR_OPTIONS : DisconnectAndClearOptions = {
66+ clearLocal : true
67+ } ;
68+
6069export const DEFAULT_WATCH_THROTTLE_MS = 30 ;
6170
6271export const DEFAULT_POWERSYNC_DB_OPTIONS = {
@@ -90,21 +99,23 @@ export abstract class AbstractPowerSyncDatabase extends BaseObserver<PowerSyncDB
9099 protected bucketStorageAdapter : BucketStorageAdapter ;
91100 private syncStatusListenerDisposer ?: ( ) => void ;
92101 protected _isReadyPromise : Promise < void > ;
102+ protected _schema : Schema ;
93103
94104 constructor ( protected options : PowerSyncDatabaseOptions ) {
95105 super ( ) ;
96106 this . bucketStorageAdapter = this . generateBucketStorageAdapter ( ) ;
97107 this . closed = true ;
98108 this . currentStatus = null ;
99109 this . options = { ...DEFAULT_POWERSYNC_DB_OPTIONS , ...options } ;
110+ this . _schema = options . schema ;
100111 this . ready = false ;
101112 this . sdkVersion = '' ;
102113 // Start async init
103114 this . _isReadyPromise = this . initialize ( ) ;
104115 }
105116
106117 get schema ( ) {
107- return this . options . schema ;
118+ return this . _schema ;
108119 }
109120
110121 protected get database ( ) {
@@ -145,13 +156,32 @@ export abstract class AbstractPowerSyncDatabase extends BaseObserver<PowerSyncDB
145156 protected async initialize ( ) {
146157 await this . _initialize ( ) ;
147158 await this . bucketStorageAdapter . init ( ) ;
148- await this . database . execute ( 'SELECT powersync_replace_schema(?)' , [ JSON . stringify ( this . schema . toJSON ( ) ) ] ) ;
149159 const version = await this . options . database . execute ( 'SELECT powersync_rs_version()' ) ;
150160 this . sdkVersion = version . rows ?. item ( 0 ) [ 'powersync_rs_version()' ] ?? '' ;
161+ await this . updateSchema ( this . options . schema ) ;
151162 this . ready = true ;
152163 this . iterateListeners ( ( cb ) => cb . initialized ?.( ) ) ;
153164 }
154165
166+ async updateSchema ( schema : Schema ) {
167+ if ( this . abortController ) {
168+ throw new Error ( 'Cannot update schema while connected' ) ;
169+ }
170+
171+ /**
172+ * TODO
173+ * Validations only show a warning for now.
174+ * The next major release should throw an exception.
175+ */
176+ try {
177+ schema . validate ( ) ;
178+ } catch ( ex ) {
179+ this . options . logger . warn ( 'Schema validation failed. Unexpected behaviour could occur' , ex ) ;
180+ }
181+ this . _schema = schema ;
182+ await this . database . execute ( 'SELECT powersync_replace_schema(?)' , [ JSON . stringify ( this . schema . toJSON ( ) ) ] ) ;
183+ }
184+
155185 /**
156186 * Queues a CRUD upload when internal CRUD tables have been updated
157187 */
@@ -208,24 +238,31 @@ export abstract class AbstractPowerSyncDatabase extends BaseObserver<PowerSyncDB
208238 * The database can still be queried after this is called, but the tables
209239 * would be empty.
210240 */
211- async disconnectAndClear ( ) {
241+ async disconnectAndClear ( options = DEFAULT_DISCONNECT_CLEAR_OPTIONS ) {
212242 await this . disconnect ( ) ;
213243
244+ const { clearLocal } = options ;
245+
214246 // TODO DB name, verify this is necessary with extension
215247 await this . database . writeTransaction ( async ( tx ) => {
216- await tx . execute ( `DELETE FROM ${ PSInternalTable . OPLOG } WHERE 1` ) ;
217- await tx . execute ( `DELETE FROM ${ PSInternalTable . CRUD } WHERE 1` ) ;
218- await tx . execute ( `DELETE FROM ${ PSInternalTable . BUCKETS } WHERE 1` ) ;
248+ await tx . execute ( `DELETE FROM ${ PSInternalTable . OPLOG } ` ) ;
249+ await tx . execute ( `DELETE FROM ${ PSInternalTable . CRUD } ` ) ;
250+ await tx . execute ( `DELETE FROM ${ PSInternalTable . BUCKETS } ` ) ;
251+
252+ const tableGlob = clearLocal ? 'ps_data_*' : 'ps_data__*' ;
219253
220254 const existingTableRows = await tx . execute (
221- "SELECT name FROM sqlite_master WHERE type='table' AND name GLOB 'ps_data_*'"
255+ `
256+ SELECT name FROM sqlite_master WHERE type='table' AND name GLOB ?
257+ ` ,
258+ [ tableGlob ]
222259 ) ;
223260
224261 if ( ! existingTableRows . rows . length ) {
225262 return ;
226263 }
227264 for ( const row of existingTableRows . rows . _array ) {
228- await tx . execute ( `DELETE FROM ${ row . name } WHERE 1` ) ;
265+ await tx . execute ( `DELETE FROM ${ quoteIdentifier ( row . name ) } WHERE 1` ) ;
229266 }
230267 } ) ;
231268 }
@@ -499,15 +536,19 @@ export abstract class AbstractPowerSyncDatabase extends BaseObserver<PowerSyncDB
499536 const throttleMs = options . throttleMs ?? DEFAULT_WATCH_THROTTLE_MS ;
500537
501538 return new EventIterator < WatchOnChangeEvent > ( ( eventOptions ) => {
502- const flushTableUpdates = _ . throttle ( async ( ) => {
503- const intersection = _ . intersection ( watchedTables , throttledTableUpdates ) ;
504- if ( intersection . length ) {
505- eventOptions . push ( {
506- changedTables : intersection
507- } ) ;
508- }
509- throttledTableUpdates = [ ] ;
510- } , throttleMs ) ;
539+ const flushTableUpdates = _ . throttle (
540+ async ( ) => {
541+ const intersection = _ . intersection ( watchedTables , throttledTableUpdates ) ;
542+ if ( intersection . length ) {
543+ eventOptions . push ( {
544+ changedTables : intersection
545+ } ) ;
546+ }
547+ throttledTableUpdates = [ ] ;
548+ } ,
549+ throttleMs ,
550+ { leading : false , trailing : true }
551+ ) ;
511552
512553 const dispose = this . database . registerListener ( {
513554 tablesUpdated : async ( update ) => {
0 commit comments