Skip to content

Commit 58f412b

Browse files
Merge branch 'dead-tab-detection' into fix/opfs-multitab-issue
2 parents e61059a + 9730337 commit 58f412b

File tree

7 files changed

+808
-35
lines changed

7 files changed

+808
-35
lines changed

packages/web/src/db/sync/SharedWebStreamingSyncImplementation.ts

Lines changed: 18 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -6,16 +6,16 @@ import {
66
SyncStatusOptions
77
} from '@powersync/common';
88
import * as Comlink from 'comlink';
9+
import { getNavigatorLocks } from '../../shared/navigator';
910
import { AbstractSharedSyncClientProvider } from '../../worker/sync/AbstractSharedSyncClientProvider';
1011
import { ManualSharedSyncPayload, SharedSyncClientEvent } from '../../worker/sync/SharedSyncImplementation';
11-
import { DEFAULT_CACHE_SIZE_KB, resolveWebSQLFlags, TemporaryStorageOption } from '../adapters/web-sql-flags';
12+
import { WorkerClient } from '../../worker/sync/WorkerClient';
1213
import { WebDBAdapter } from '../adapters/WebDBAdapter';
14+
import { DEFAULT_CACHE_SIZE_KB, TemporaryStorageOption, resolveWebSQLFlags } from '../adapters/web-sql-flags';
1315
import {
1416
WebStreamingSyncImplementation,
1517
WebStreamingSyncImplementationOptions
1618
} from './WebStreamingSyncImplementation';
17-
import { WorkerClient } from '../../worker/sync/WorkerClient';
18-
import { getNavigatorLocks } from '../../shared/navigator';
1919

2020
/**
2121
* The shared worker will trigger methods on this side of the message port
@@ -160,6 +160,21 @@ export class SharedWebStreamingSyncImplementation extends WebStreamingSyncImplem
160160
const { crudUploadThrottleMs, identifier, retryDelayMs } = this.options;
161161
const flags = { ...this.webOptions.flags, workers: undefined };
162162

163+
// Request a random lock until this client is disposed. The name of the lock is sent to the shared worker, which
164+
// will also attempt to acquire it. Since the lock is returned when the tab is closed, this allows the share worker
165+
// to free resources associated with this tab.
166+
// We take hold of this lock as soon-as-possible in order to cater for potentially closed tabs.
167+
getNavigatorLocks().request(`tab-close-signal-${crypto.randomUUID()}`, async (lock) => {
168+
if (!this.abortOnClose.signal.aborted) {
169+
// Awaiting here ensures the worker is waiting for the lock
170+
await this.syncManager.addLockBasedCloseSignal(lock!.name);
171+
172+
await new Promise<void>((r) => {
173+
this.abortOnClose.signal.onabort = () => r();
174+
});
175+
}
176+
});
177+
163178
this.isInitialized = this.syncManager.setParams(
164179
{
165180
dbParams: this.dbAdapter.getConfiguration(),
@@ -190,19 +205,6 @@ export class SharedWebStreamingSyncImplementation extends WebStreamingSyncImplem
190205
* This performs bi-directional method calling.
191206
*/
192207
Comlink.expose(this.clientProvider, this.messagePort);
193-
194-
// Request a random lock until this client is disposed. The name of the lock is sent to the shared worker, which
195-
// will also attempt to acquire it. Since the lock is returned when the tab is closed, this allows the share worker
196-
// to free resources associated with this tab.
197-
getNavigatorLocks().request(`tab-close-signal-${crypto.randomUUID()}`, async (lock) => {
198-
if (!this.abortOnClose.signal.aborted) {
199-
this.syncManager.addLockBasedCloseSignal(lock!.name);
200-
201-
await new Promise<void>((r) => {
202-
this.abortOnClose.signal.onabort = () => r();
203-
});
204-
}
205-
});
206208
}
207209

208210
/**

packages/web/src/worker/sync/SharedSyncImplementation.ts

Lines changed: 23 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -322,6 +322,22 @@ export class SharedSyncImplementation extends BaseObserver<SharedSyncImplementat
322322
});
323323

324324
const shouldReconnect = !!this.connectionManager.syncStreamImplementation && this.ports.length > 0;
325+
326+
/**
327+
* If the current database adapter is the one that is being closed, we need to disconnect from the backend.
328+
* We can disconnect in the portMutex lock. This ensures the disconnect is not affected by potential other
329+
* connect operations coming from other tabs.
330+
*/
331+
if (this.dbAdapter && this.dbAdapter == trackedPort.db) {
332+
this.logger.debug(`Disconnecting due to closed database: should reconnect: ${shouldReconnect}`);
333+
this.dbAdapter = null;
334+
// Unconditionally close the connection because the database it's writing to has just been closed.
335+
// The connection has been closed previously, this might throw. We should be able to ignore it.
336+
await this.connectionManager
337+
.disconnect()
338+
.catch((ex) => this.logger.warn('Error while disconnecting. Will attempt to reconnect.', ex));
339+
}
340+
325341
return {
326342
shouldReconnect,
327343
trackedPort
@@ -337,19 +353,8 @@ export class SharedSyncImplementation extends BaseObserver<SharedSyncImplementat
337353
await closeListener();
338354
}
339355

340-
if (this.dbAdapter && this.dbAdapter == trackedPort.db) {
341-
// Unconditionally close the connection because the database it's writing to has just been closed.
342-
// The connection has been closed previously, this might throw. We should be able to ignore it.
343-
await this.connectionManager
344-
.disconnect()
345-
.catch((ex) => this.logger.warn('Error while disconnecting. Will attempt to reconnect.', ex));
346-
347-
// Clearing the adapter will result in a new one being opened in connect
348-
this.dbAdapter = null;
349-
350-
if (shouldReconnect) {
351-
await this.connectionManager.connect(CONNECTOR_PLACEHOLDER, this.lastConnectOptions ?? {});
352-
}
356+
if (shouldReconnect) {
357+
await this.connectionManager.connect(CONNECTOR_PLACEHOLDER, this.lastConnectOptions ?? {});
353358
}
354359

355360
// Re-index subscriptions, the subscriptions of the removed port would no longer be considered.
@@ -487,6 +492,11 @@ export class SharedSyncImplementation extends BaseObserver<SharedSyncImplementat
487492
});
488493
lastClient.closeListeners.push(async () => {
489494
this.logger.info('Aborting open connection because associated tab closed.');
495+
/**
496+
* Don't await this close operation. It might never resolve if the tab is closed.
497+
* We run the close operation first, before marking the remote as closed. This gives the database some chance
498+
* to close the connection.
499+
*/
490500
wrapped.close().catch((ex) => this.logger.warn('error closing database connection', ex));
491501
wrapped.markRemoteClosed();
492502
});

packages/web/src/worker/sync/WorkerClient.ts

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,13 @@
1+
import { ILogLevel, PowerSyncConnectionOptions, SubscribedStream, SyncStatusOptions } from '@powersync/common';
12
import * as Comlink from 'comlink';
3+
import { getNavigatorLocks } from '../../shared/navigator';
24
import {
35
ManualSharedSyncPayload,
46
SharedSyncClientEvent,
57
SharedSyncImplementation,
68
SharedSyncInitOptions,
79
WrappedSyncPort
810
} from './SharedSyncImplementation';
9-
import { ILogLevel, PowerSyncConnectionOptions, SubscribedStream, SyncStatusOptions } from '@powersync/common';
10-
import { getNavigatorLocks } from '../../shared/navigator';
1111

1212
/**
1313
* A client to the shared sync worker.
@@ -21,7 +21,9 @@ export class WorkerClient {
2121
constructor(
2222
private readonly sync: SharedSyncImplementation,
2323
private readonly port: MessagePort
24-
) {}
24+
) {
25+
Comlink.expose(this, this.port);
26+
}
2527

2628
async initialize() {
2729
/**
@@ -36,7 +38,6 @@ export class WorkerClient {
3638
});
3739

3840
this.resolvedPort = await this.sync.addPort(this.port);
39-
Comlink.expose(this, this.port);
4041
}
4142

4243
private async removePort() {
Lines changed: 56 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,56 @@
1+
import {
2+
AbstractRemote,
3+
AbstractRemoteOptions,
4+
BSONImplementation,
5+
DEFAULT_REMOTE_LOGGER,
6+
FetchImplementation,
7+
FetchImplementationProvider,
8+
ILogger,
9+
RemoteConnector
10+
} from '@powersync/common';
11+
12+
/**
13+
* Mock WebRemote that throws 401 Unauthorized errors for all HTTP requests.
14+
* Used for testing error handling in the shared sync worker.
15+
* Other tests may override this for managed streams.
16+
*/
17+
class MockFetchProvider extends FetchImplementationProvider {
18+
getFetch(): FetchImplementation {
19+
// Return a mock fetch that always returns 401
20+
return async (input: RequestInfo | URL, init?: RequestInit): Promise<Response> => {
21+
const response = new Response(null, {
22+
status: 401,
23+
statusText: 'Unauthorized'
24+
});
25+
return response;
26+
};
27+
}
28+
}
29+
30+
export class WebRemote extends AbstractRemote {
31+
private _bson: BSONImplementation | undefined;
32+
33+
constructor(
34+
protected connector: RemoteConnector,
35+
protected logger: ILogger = DEFAULT_REMOTE_LOGGER,
36+
options?: Partial<AbstractRemoteOptions>
37+
) {
38+
super(connector, logger, {
39+
...(options ?? {}),
40+
fetchImplementation: options?.fetchImplementation ?? new MockFetchProvider()
41+
});
42+
}
43+
44+
getUserAgent(): string {
45+
return 'powersync-web-mock';
46+
}
47+
48+
async getBSON(): Promise<BSONImplementation> {
49+
if (this._bson) {
50+
return this._bson;
51+
}
52+
const { BSON } = await import('bson');
53+
this._bson = BSON;
54+
return this._bson;
55+
}
56+
}

0 commit comments

Comments
 (0)