Skip to content

Commit 8721538

Browse files
Add default lock timeout for shared sync workers. Add flagging for unprotected clients. Perform reconnect operations inside shared port mutex.
1 parent 58f412b commit 8721538

File tree

5 files changed

+146
-64
lines changed

5 files changed

+146
-64
lines changed

demos/react-supabase-todolist/src/components/providers/SystemProvider.tsx

Lines changed: 17 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,14 @@ import { AppSchema, ListRecord, LISTS_TABLE, TODOS_TABLE } from '@/library/power
33
import { SupabaseConnector } from '@/library/powersync/SupabaseConnector';
44
import { CircularProgress } from '@mui/material';
55
import { PowerSyncContext } from '@powersync/react';
6-
import { createBaseLogger, DifferentialWatchedQuery, LogLevel, PowerSyncDatabase } from '@powersync/web';
6+
import {
7+
createBaseLogger,
8+
DifferentialWatchedQuery,
9+
LogLevel,
10+
PowerSyncDatabase,
11+
WASQLiteOpenFactory,
12+
WASQLiteVFS
13+
} from '@powersync/web';
714
import React, { Suspense } from 'react';
815
import { NavigationPanelContextProvider } from '../navigation/NavigationPanelContext';
916

@@ -12,8 +19,15 @@ export const useSupabase = () => React.useContext(SupabaseContext);
1219

1320
export const db = new PowerSyncDatabase({
1421
schema: AppSchema,
15-
database: {
16-
dbFilename: 'example.db'
22+
database: new WASQLiteOpenFactory({
23+
dbFilename: 'example.db',
24+
vfs: WASQLiteVFS.OPFSCoopSyncVFS,
25+
flags: {
26+
enableMultiTabs: typeof SharedWorker !== 'undefined'
27+
}
28+
}),
29+
flags: {
30+
enableMultiTabs: typeof SharedWorker !== 'undefined'
1731
}
1832
});
1933

packages/web/src/db/adapters/LockedAsyncDatabaseAdapter.ts

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@ export interface LockedAsyncDatabaseAdapterOptions {
2626
openConnection: () => Promise<AsyncDatabaseConnection>;
2727
debugMode?: boolean;
2828
logger?: ILogger;
29+
defaultLockTimeoutMs?: number;
2930
}
3031

3132
export type LockedAsyncDatabaseAdapterListener = DBAdapterListener & {
@@ -196,7 +197,7 @@ export class LockedAsyncDatabaseAdapter
196197
return this.acquireLock(
197198
async () => fn(this.generateDBHelpers({ execute: this._execute, executeRaw: this._executeRaw })),
198199
{
199-
timeoutMs: options?.timeoutMs
200+
timeoutMs: options?.timeoutMs ?? this.options.defaultLockTimeoutMs
200201
}
201202
);
202203
}
@@ -206,7 +207,7 @@ export class LockedAsyncDatabaseAdapter
206207
return this.acquireLock(
207208
async () => fn(this.generateDBHelpers({ execute: this._execute, executeRaw: this._executeRaw })),
208209
{
209-
timeoutMs: options?.timeoutMs
210+
timeoutMs: options?.timeoutMs ?? this.options.defaultLockTimeoutMs
210211
}
211212
);
212213
}

packages/web/src/db/sync/SharedWebStreamingSyncImplementation.ts

Lines changed: 50 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -157,36 +157,8 @@ export class SharedWebStreamingSyncImplementation extends WebStreamingSyncImplem
157157
* DB worker, but a port to the DB worker can be transferred to the
158158
* sync worker.
159159
*/
160-
const { crudUploadThrottleMs, identifier, retryDelayMs } = this.options;
161-
const flags = { ...this.webOptions.flags, workers: undefined };
162-
163-
// Request a random lock until this client is disposed. The name of the lock is sent to the shared worker, which
164-
// will also attempt to acquire it. Since the lock is returned when the tab is closed, this allows the share worker
165-
// to free resources associated with this tab.
166-
// We take hold of this lock as soon-as-possible in order to cater for potentially closed tabs.
167-
getNavigatorLocks().request(`tab-close-signal-${crypto.randomUUID()}`, async (lock) => {
168-
if (!this.abortOnClose.signal.aborted) {
169-
// Awaiting here ensures the worker is waiting for the lock
170-
await this.syncManager.addLockBasedCloseSignal(lock!.name);
171-
172-
await new Promise<void>((r) => {
173-
this.abortOnClose.signal.onabort = () => r();
174-
});
175-
}
176-
});
177160

178-
this.isInitialized = this.syncManager.setParams(
179-
{
180-
dbParams: this.dbAdapter.getConfiguration(),
181-
streamOptions: {
182-
crudUploadThrottleMs,
183-
identifier,
184-
retryDelayMs,
185-
flags: flags
186-
}
187-
},
188-
options.subscriptions
189-
);
161+
this.isInitialized = this._init();
190162

191163
/**
192164
* Pass along any sync status updates to this listener
@@ -198,13 +170,62 @@ export class SharedWebStreamingSyncImplementation extends WebStreamingSyncImplem
198170
},
199171
options.db
200172
);
173+
}
174+
175+
protected async _init() {
176+
/**
177+
* The general flow of initialization is:
178+
* - The client requests a unique navigator lock.
179+
* - Once the lock is acquired, we register the lock with the shared worker.
180+
* - The shared worker can then request the same lock. The client has been closed if the shared worker can acquire the lock.
181+
* - Once the shared worker knows the client's lock, we can guarentee that the shared worker will detect if the client has been closed.
182+
* - This makes the client safe for the shared worker to use.
183+
* - The client side lock is held until the client is disposed.
184+
* - We resolve the top-level promise after the lock has been registered with the shared worker.
185+
* - The client sends the params to the shared worker after locks have been registered.
186+
*/
187+
await new Promise<void>((resolve) => {
188+
// Request a random lock until this client is disposed. The name of the lock is sent to the shared worker, which
189+
// will also attempt to acquire it. Since the lock is returned when the tab is closed, this allows the share worker
190+
// to free resources associated with this tab.
191+
// We take hold of this lock as soon-as-possible in order to cater for potentially closed tabs.
192+
getNavigatorLocks().request(`tab-close-signal-${crypto.randomUUID()}`, async (lock) => {
193+
if (!this.abortOnClose.signal.aborted) {
194+
// Awaiting here ensures the worker is waiting for the lock
195+
await this.syncManager.addLockBasedCloseSignal(lock!.name);
196+
197+
// The lock has been registered, we can continue with the initialization
198+
resolve();
199+
200+
await new Promise<void>((r) => {
201+
this.abortOnClose.signal.onabort = () => r();
202+
});
203+
}
204+
});
205+
});
206+
207+
const { crudUploadThrottleMs, identifier, retryDelayMs } = this.options;
208+
const flags = { ...this.webOptions.flags, workers: undefined };
201209

202210
/**
203211
* The sync worker will call this client provider when it needs
204212
* to fetch credentials or upload data.
205213
* This performs bi-directional method calling.
206214
*/
207215
Comlink.expose(this.clientProvider, this.messagePort);
216+
217+
await this.syncManager.setParams(
218+
{
219+
dbParams: this.dbAdapter.getConfiguration(),
220+
streamOptions: {
221+
crudUploadThrottleMs,
222+
identifier,
223+
retryDelayMs,
224+
flags: flags
225+
}
226+
},
227+
this.options.subscriptions
228+
);
208229
}
209230

210231
/**

packages/web/src/worker/sync/SharedSyncImplementation.ts

Lines changed: 53 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -76,6 +76,10 @@ export type WrappedSyncPort = {
7676
db?: DBAdapter;
7777
currentSubscriptions: SubscribedStream[];
7878
closeListeners: (() => void | Promise<void>)[];
79+
/**
80+
* If we can use Navigator locks to detect if the client has closed.
81+
*/
82+
isProtectedFromClose: boolean;
7983
};
8084

8185
/**
@@ -142,7 +146,9 @@ export class SharedSyncImplementation extends BaseObserver<SharedSyncImplementat
142146
createSyncImplementation: async () => {
143147
return this.portMutex.runExclusive(async () => {
144148
await this.waitForReady();
149+
this.logger.debug('Creating sync implementation');
145150
if (!this.dbAdapter) {
151+
this.logger.debug('Opening internal DB');
146152
await this.openInternalDB();
147153
}
148154

@@ -171,6 +177,19 @@ export class SharedSyncImplementation extends BaseObserver<SharedSyncImplementat
171177
return this.connectionManager.syncStreamImplementation?.isConnected ?? false;
172178
}
173179

180+
/**
181+
* Gets the last client port which we know is safe from unexpected closes.
182+
*/
183+
protected get lastWrappedPort(): WrappedSyncPort | undefined {
184+
// Find the last port which is protected from close
185+
for (let i = this.ports.length - 1; i >= 0; i--) {
186+
if (this.ports[i].isProtectedFromClose) {
187+
return this.ports[i];
188+
}
189+
}
190+
return;
191+
}
192+
174193
async waitForStatus(status: SyncStatusOptions): Promise<void> {
175194
return this.withSyncImplementation(async (sync) => {
176195
return sync.waitForStatus(status);
@@ -276,7 +295,8 @@ export class SharedSyncImplementation extends BaseObserver<SharedSyncImplementat
276295
port,
277296
clientProvider: Comlink.wrap<AbstractSharedSyncClientProvider>(port),
278297
currentSubscriptions: [],
279-
closeListeners: []
298+
closeListeners: [],
299+
isProtectedFromClose: false
280300
} satisfies WrappedSyncPort;
281301
this.ports.push(portProvider);
282302

@@ -298,11 +318,11 @@ export class SharedSyncImplementation extends BaseObserver<SharedSyncImplementat
298318
// Remove the port within a mutex context.
299319
// Warns if the port is not found. This should not happen in practice.
300320
// We return early if the port is not found.
301-
const { trackedPort, shouldReconnect } = await this.portMutex.runExclusive(async () => {
321+
return await this.portMutex.runExclusive(async () => {
302322
const index = this.ports.findIndex((p) => p == port);
303323
if (index < 0) {
304324
this.logger.warn(`Could not remove port ${port} since it is not present in active ports.`);
305-
return {};
325+
return () => {};
306326
}
307327

308328
const trackedPort = this.ports[index];
@@ -322,7 +342,6 @@ export class SharedSyncImplementation extends BaseObserver<SharedSyncImplementat
322342
});
323343

324344
const shouldReconnect = !!this.connectionManager.syncStreamImplementation && this.ports.length > 0;
325-
326345
/**
327346
* If the current database adapter is the one that is being closed, we need to disconnect from the backend.
328347
* We can disconnect in the portMutex lock. This ensures the disconnect is not affected by potential other
@@ -338,30 +357,25 @@ export class SharedSyncImplementation extends BaseObserver<SharedSyncImplementat
338357
.catch((ex) => this.logger.warn('Error while disconnecting. Will attempt to reconnect.', ex));
339358
}
340359

341-
return {
342-
shouldReconnect,
343-
trackedPort
344-
};
345-
});
346-
347-
if (!trackedPort) {
348-
// We could not find the port to remove
349-
return () => {};
350-
}
351-
352-
for (const closeListener of trackedPort.closeListeners) {
353-
await closeListener();
354-
}
360+
for (const closeListener of trackedPort.closeListeners) {
361+
await closeListener();
362+
}
355363

356-
if (shouldReconnect) {
357-
await this.connectionManager.connect(CONNECTOR_PLACEHOLDER, this.lastConnectOptions ?? {});
358-
}
364+
try {
365+
await trackedPort.db?.close();
366+
} catch (ex) {
367+
this.logger.warn('error closing database', ex);
368+
}
359369

360-
// Re-index subscriptions, the subscriptions of the removed port would no longer be considered.
361-
this.collectActiveSubscriptions();
370+
// Re-index subscriptions, the subscriptions of the removed port would no longer be considered.
371+
this.collectActiveSubscriptions();
372+
if (shouldReconnect) {
373+
// The internals of this needs a port mutex lock. It should be safe to start this operation here, but we cannot and don't need to await it.
374+
this.connectionManager.connect(CONNECTOR_PLACEHOLDER, this.lastConnectOptions ?? {});
375+
}
362376

363-
// Release proxy
364-
return () => trackedPort.clientProvider[Comlink.releaseProxy]();
377+
return () => trackedPort.clientProvider[Comlink.releaseProxy]();
378+
});
365379
}
366380

367381
triggerCrudUpload() {
@@ -410,7 +424,10 @@ export class SharedSyncImplementation extends BaseObserver<SharedSyncImplementat
410424
remote: new WebRemote(
411425
{
412426
invalidateCredentials: async () => {
413-
const lastPort = this.ports[this.ports.length - 1];
427+
const lastPort = this.lastWrappedPort;
428+
if (!lastPort) {
429+
throw new Error('No client port found to invalidate credentials');
430+
}
414431
try {
415432
this.logger.log('calling the last port client provider to invalidate credentials');
416433
lastPort.clientProvider.invalidateCredentials();
@@ -419,7 +436,10 @@ export class SharedSyncImplementation extends BaseObserver<SharedSyncImplementat
419436
}
420437
},
421438
fetchCredentials: async () => {
422-
const lastPort = this.ports[this.ports.length - 1];
439+
const lastPort = this.lastWrappedPort;
440+
if (!lastPort) {
441+
throw new Error('No client port found to fetch credentials');
442+
}
423443
return new Promise(async (resolve, reject) => {
424444
const abortController = new AbortController();
425445
this.fetchCredentialsController = {
@@ -442,7 +462,10 @@ export class SharedSyncImplementation extends BaseObserver<SharedSyncImplementat
442462
this.logger
443463
),
444464
uploadCrud: async () => {
445-
const lastPort = this.ports[this.ports.length - 1];
465+
const lastPort = this.lastWrappedPort;
466+
if (!lastPort) {
467+
throw new Error('No client port found to upload crud');
468+
}
446469

447470
return new Promise(async (resolve, reject) => {
448471
const abortController = new AbortController();
@@ -470,7 +493,7 @@ export class SharedSyncImplementation extends BaseObserver<SharedSyncImplementat
470493
}
471494

472495
protected async openInternalDB() {
473-
const lastClient = this.ports[this.ports.length - 1];
496+
const lastClient = this.lastWrappedPort;
474497
if (!lastClient) {
475498
// Should not really happen in practice
476499
throw new Error(`Could not open DB connection since no client is connected.`);
@@ -481,6 +504,7 @@ export class SharedSyncImplementation extends BaseObserver<SharedSyncImplementat
481504
const db = await remote(this.syncParams!.dbParams);
482505
const locked = new LockedAsyncDatabaseAdapter({
483506
name: identifier,
507+
defaultLockTimeoutMs: 20_000, // Max wait time for a lock request (we will retry failed attempts)
484508
openConnection: async () => {
485509
const wrapped = new WorkerWrappedAsyncDatabaseConnection({
486510
remote,

packages/web/src/worker/sync/WorkerClient.ts

Lines changed: 23 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ import {
1717
*/
1818
export class WorkerClient {
1919
private resolvedPort: WrappedSyncPort | null = null;
20+
protected resolvedPortPromise: Promise<WrappedSyncPort> | null = null;
2021

2122
constructor(
2223
private readonly sync: SharedSyncImplementation,
@@ -37,7 +38,15 @@ export class WorkerClient {
3738
}
3839
});
3940

40-
this.resolvedPort = await this.sync.addPort(this.port);
41+
/**
42+
* Keep a reference to the resolved port promise.
43+
* The init timing is difficult to predict due to the async message passing.
44+
* We only want to use a port if we are know it's been protected from being closed.
45+
* The lock based close signal will be added asynchronously. We need to use the
46+
* added port once the lock is configured.
47+
*/
48+
this.resolvedPortPromise = this.sync.addPort(this.port);
49+
this.resolvedPort = await this.resolvedPortPromise;
4150
}
4251

4352
private async removePort() {
@@ -61,6 +70,19 @@ export class WorkerClient {
6170
* it can consider the connection to be closed.
6271
*/
6372
addLockBasedCloseSignal(name: string) {
73+
if (!this.resolvedPortPromise) {
74+
// The init logic above is actually synchronous, so this should not happen.
75+
this.sync.broadCastLogger.warn('addLockBasedCloseSignal called before port promise registered');
76+
} else {
77+
this.resolvedPortPromise.then((wrappedPort) => {
78+
/**
79+
* The client registered a navigator lock. We now can guarantee detecting if the client has closed.
80+
* E.g. before this point: It's possible some ports might have been created and closed before the
81+
* lock based close signal is added. We should not trust those ports.
82+
*/
83+
wrappedPort.isProtectedFromClose = true;
84+
});
85+
}
6486
getNavigatorLocks().request(name, async () => {
6587
await this.removePort();
6688
});

0 commit comments

Comments
 (0)