Skip to content

Commit ffe3095

Browse files
authored
WebSocket keepalive changes (#648)
1 parent 31e942f commit ffe3095

File tree

7 files changed

+126
-73
lines changed

7 files changed

+126
-73
lines changed

.changeset/dull-mugs-carry.md

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,10 @@
1+
---
2+
'@powersync/react-native': patch
3+
'@powersync/diagnostics-app': patch
4+
'@powersync/common': patch
5+
'@powersync/node': patch
6+
'@powersync/op-sqlite': patch
7+
'@powersync/web': patch
8+
---
9+
10+
Improve websocket keepalive logic to reduce keepalive errors.

packages/common/src/client/sync/bucket/BucketStorageAdapter.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -106,5 +106,5 @@ export interface BucketStorageAdapter extends BaseObserver<BucketStorageListener
106106
/**
107107
* Invokes the `powersync_control` function for the sync client.
108108
*/
109-
control(op: PowerSyncControlCommand, payload: string | ArrayBuffer | null): Promise<string>;
109+
control(op: PowerSyncControlCommand, payload: string | Uint8Array | null): Promise<string>;
110110
}

packages/common/src/client/sync/bucket/SqliteBucketStorage.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -364,7 +364,7 @@ export class SqliteBucketStorage extends BaseObserver<BucketStorageListener> imp
364364
// No-op for now
365365
}
366366

367-
async control(op: PowerSyncControlCommand, payload: string | ArrayBuffer | null): Promise<string> {
367+
async control(op: PowerSyncControlCommand, payload: string | Uint8Array | ArrayBuffer | null): Promise<string> {
368368
return await this.writeTransaction(async (tx) => {
369369
const [[raw]] = await tx.executeRaw('SELECT powersync_control(?, ?)', [op, payload]);
370370
return raw;

packages/common/src/client/sync/stream/AbstractRemote.ts

Lines changed: 39 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -24,8 +24,14 @@ const SYNC_QUEUE_REQUEST_LOW_WATER = 5;
2424

2525
// Keep alive message is sent every period
2626
const KEEP_ALIVE_MS = 20_000;
27-
// The ACK must be received in this period
28-
const KEEP_ALIVE_LIFETIME_MS = 30_000;
27+
28+
// One message of any type must be received in this period.
29+
const SOCKET_TIMEOUT_MS = 30_000;
30+
31+
// One keepalive message must be received in this period.
32+
// If there is a backlog of messages (for example on slow connections), keepalive messages could be delayed
33+
// significantly. Therefore this is longer than the socket timeout.
34+
const KEEP_ALIVE_LIFETIME_MS = 90_000;
2935

3036
export const DEFAULT_REMOTE_LOGGER = Logger.get('PowerSyncRemote');
3137

@@ -267,7 +273,7 @@ export abstract class AbstractRemote {
267273
*/
268274
async socketStream(options: SocketSyncStreamOptions): Promise<DataStream<StreamingSyncLine>> {
269275
const bson = await this.getBSON();
270-
return await this.socketStreamRaw(options, (data) => bson.deserialize(data), bson);
276+
return await this.socketStreamRaw(options, (data) => bson.deserialize(data) as StreamingSyncLine, bson);
271277
}
272278

273279
/**
@@ -279,9 +285,9 @@ export abstract class AbstractRemote {
279285
*/
280286
async socketStreamRaw<T>(
281287
options: SocketSyncStreamOptions,
282-
map: (buffer: Buffer) => T,
288+
map: (buffer: Uint8Array) => T,
283289
bson?: typeof BSON
284-
): Promise<DataStream> {
290+
): Promise<DataStream<T>> {
285291
const { path, fetchStrategy = FetchStrategy.Buffered } = options;
286292
const mimeType = bson == null ? 'application/json' : 'application/bson';
287293

@@ -304,12 +310,26 @@ export abstract class AbstractRemote {
304310
// automatically as a header.
305311
const userAgent = this.getUserAgent();
306312

313+
let keepAliveTimeout: any;
314+
const resetTimeout = () => {
315+
clearTimeout(keepAliveTimeout);
316+
keepAliveTimeout = setTimeout(() => {
317+
this.logger.error(`No data received on WebSocket in ${SOCKET_TIMEOUT_MS}ms, closing connection.`);
318+
stream.close();
319+
}, SOCKET_TIMEOUT_MS);
320+
};
321+
resetTimeout();
322+
307323
const url = this.options.socketUrlTransformer(request.url);
308324
const connector = new RSocketConnector({
309325
transport: new WebsocketClientTransport({
310326
url,
311327
wsCreator: (url) => {
312-
return this.createSocket(url);
328+
const socket = this.createSocket(url);
329+
socket.addEventListener('message', (event) => {
330+
resetTimeout();
331+
});
332+
return socket;
313333
}
314334
}),
315335
setup: {
@@ -332,18 +352,23 @@ export abstract class AbstractRemote {
332352
rsocket = await connector.connect();
333353
} catch (ex) {
334354
this.logger.error(`Failed to connect WebSocket`, ex);
355+
clearTimeout(keepAliveTimeout);
335356
throw ex;
336357
}
337358

338-
const stream = new DataStream({
359+
resetTimeout();
360+
361+
const stream = new DataStream<T, Uint8Array>({
339362
logger: this.logger,
340363
pressure: {
341364
lowWaterMark: SYNC_QUEUE_REQUEST_LOW_WATER
342-
}
365+
},
366+
mapLine: map
343367
});
344368

345369
let socketIsClosed = false;
346370
const closeSocket = () => {
371+
clearTimeout(keepAliveTimeout);
347372
if (socketIsClosed) {
348373
return;
349374
}
@@ -411,7 +436,7 @@ export abstract class AbstractRemote {
411436
return;
412437
}
413438

414-
stream.enqueueData(map(data));
439+
stream.enqueueData(data);
415440
},
416441
onComplete: () => {
417442
stream.close();
@@ -537,8 +562,9 @@ export abstract class AbstractRemote {
537562
const decoder = new TextDecoder();
538563
let buffer = '';
539564

540-
const stream = new DataStream<T>({
541-
logger: this.logger
565+
const stream = new DataStream<T, string>({
566+
logger: this.logger,
567+
mapLine: mapLine
542568
});
543569

544570
const l = stream.registerListener({
@@ -550,7 +576,7 @@ export abstract class AbstractRemote {
550576
if (done) {
551577
const remaining = buffer.trim();
552578
if (remaining.length != 0) {
553-
stream.enqueueData(mapLine(remaining));
579+
stream.enqueueData(remaining);
554580
}
555581

556582
stream.close();
@@ -565,7 +591,7 @@ export abstract class AbstractRemote {
565591
for (var i = 0; i < lines.length - 1; i++) {
566592
var l = lines[i].trim();
567593
if (l.length > 0) {
568-
stream.enqueueData(mapLine(l));
594+
stream.enqueueData(l);
569595
didCompleteLine = true;
570596
}
571597
}

packages/common/src/client/sync/stream/AbstractStreamingSyncImplementation.ts

Lines changed: 29 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -848,10 +848,7 @@ The next upload iteration will be delayed.`);
848848
// Pending sync lines received from the service, as well as local events that trigger a powersync_control
849849
// invocation (local events include refreshed tokens and completed uploads).
850850
// This is a single data stream so that we can handle all control calls from a single place.
851-
let controlInvocations: DataStream<{
852-
command: PowerSyncControlCommand;
853-
payload?: ArrayBuffer | string;
854-
}> | null = null;
851+
let controlInvocations: DataStream<EnqueuedCommand, Uint8Array | EnqueuedCommand> | null = null;
855852

856853
async function connect(instr: EstablishSyncStream) {
857854
const syncOptions: SyncStreamOptions = {
@@ -861,20 +858,34 @@ The next upload iteration will be delayed.`);
861858
};
862859

863860
if (resolvedOptions.connectionMethod == SyncStreamConnectionMethod.HTTP) {
864-
controlInvocations = await remote.postStreamRaw(syncOptions, (line) => ({
865-
command: PowerSyncControlCommand.PROCESS_TEXT_LINE,
866-
payload: line
867-
}));
861+
controlInvocations = await remote.postStreamRaw(syncOptions, (line: string | EnqueuedCommand) => {
862+
if (typeof line == 'string') {
863+
return {
864+
command: PowerSyncControlCommand.PROCESS_TEXT_LINE,
865+
payload: line
866+
};
867+
} else {
868+
// Directly enqueued by us
869+
return line;
870+
}
871+
});
868872
} else {
869873
controlInvocations = await remote.socketStreamRaw(
870874
{
871875
...syncOptions,
872876
fetchStrategy: resolvedOptions.fetchStrategy
873877
},
874-
(buffer) => ({
875-
command: PowerSyncControlCommand.PROCESS_BSON_LINE,
876-
payload: buffer
877-
})
878+
(payload: Uint8Array | EnqueuedCommand) => {
879+
if (payload instanceof Uint8Array) {
880+
return {
881+
command: PowerSyncControlCommand.PROCESS_BSON_LINE,
882+
payload: payload
883+
};
884+
} else {
885+
// Directly enqueued by us
886+
return payload;
887+
}
888+
}
878889
);
879890
}
880891

@@ -906,7 +917,7 @@ The next upload iteration will be delayed.`);
906917
await control(PowerSyncControlCommand.STOP);
907918
}
908919

909-
async function control(op: PowerSyncControlCommand, payload?: ArrayBuffer | string) {
920+
async function control(op: PowerSyncControlCommand, payload?: Uint8Array | string) {
910921
const rawResponse = await adapter.control(op, payload ?? null);
911922
await handleInstructions(JSON.parse(rawResponse));
912923
}
@@ -1145,3 +1156,8 @@ The next upload iteration will be delayed.`);
11451156
});
11461157
}
11471158
}
1159+
1160+
interface EnqueuedCommand {
1161+
command: PowerSyncControlCommand;
1162+
payload?: Uint8Array | string;
1163+
}

packages/common/src/utils/DataStream.ts

Lines changed: 30 additions & 40 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,9 @@
11
import Logger, { ILogger } from 'js-logger';
22
import { BaseListener, BaseObserver } from './BaseObserver.js';
33

4-
export type DataStreamOptions = {
4+
export type DataStreamOptions<ParsedData, SourceData> = {
5+
mapLine?: (line: SourceData) => ParsedData;
6+
57
/**
68
* Close the stream if any consumer throws an error
79
*/
@@ -33,8 +35,8 @@ export const DEFAULT_PRESSURE_LIMITS = {
3335
* native JS streams or async iterators.
3436
* This is handy for environments such as React Native which need polyfills for the above.
3537
*/
36-
export class DataStream<Data extends any = any> extends BaseObserver<DataStreamListener<Data>> {
37-
dataQueue: Data[];
38+
export class DataStream<ParsedData, SourceData = any> extends BaseObserver<DataStreamListener<ParsedData>> {
39+
dataQueue: SourceData[];
3840

3941
protected isClosed: boolean;
4042

@@ -43,11 +45,14 @@ export class DataStream<Data extends any = any> extends BaseObserver<DataStreamL
4345

4446
protected logger: ILogger;
4547

46-
constructor(protected options?: DataStreamOptions) {
48+
protected mapLine: (line: SourceData) => ParsedData;
49+
50+
constructor(protected options?: DataStreamOptions<ParsedData, SourceData>) {
4751
super();
4852
this.processingPromise = null;
4953
this.isClosed = false;
5054
this.dataQueue = [];
55+
this.mapLine = options?.mapLine ?? ((line) => line as any);
5156

5257
this.logger = options?.logger ?? Logger.get('DataStream');
5358

@@ -85,7 +90,7 @@ export class DataStream<Data extends any = any> extends BaseObserver<DataStreamL
8590
/**
8691
* Enqueues data for the consumers to read
8792
*/
88-
enqueueData(data: Data) {
93+
enqueueData(data: SourceData) {
8994
if (this.isClosed) {
9095
throw new Error('Cannot enqueue data into closed stream.');
9196
}
@@ -100,7 +105,7 @@ export class DataStream<Data extends any = any> extends BaseObserver<DataStreamL
100105
* Reads data once from the data stream
101106
* @returns a Data payload or Null if the stream closed.
102107
*/
103-
async read(): Promise<Data | null> {
108+
async read(): Promise<ParsedData | null> {
104109
if (this.closed) {
105110
return null;
106111
}
@@ -129,7 +134,7 @@ export class DataStream<Data extends any = any> extends BaseObserver<DataStreamL
129134
/**
130135
* Executes a callback for each data item in the stream
131136
*/
132-
forEach(callback: DataStreamCallback<Data>) {
137+
forEach(callback: DataStreamCallback<ParsedData>) {
133138
if (this.dataQueue.length <= this.lowWatermark) {
134139
this.iterateAsyncErrored(async (l) => l.lowWater?.());
135140
}
@@ -139,58 +144,40 @@ export class DataStream<Data extends any = any> extends BaseObserver<DataStreamL
139144
});
140145
}
141146

142-
protected async processQueue() {
147+
protected processQueue() {
143148
if (this.processingPromise) {
144149
return;
145150
}
146151

147-
/**
148-
* Allow listeners to mutate the queue before processing.
149-
* This allows for operations such as dropping or compressing data
150-
* on high water or requesting more data on low water.
151-
*/
152-
if (this.dataQueue.length >= this.highWatermark) {
153-
await this.iterateAsyncErrored(async (l) => l.highWater?.());
154-
}
155-
156152
const promise = (this.processingPromise = this._processQueue());
157153
promise.finally(() => {
158154
return (this.processingPromise = null);
159155
});
160156
return promise;
161157
}
162158

163-
/**
164-
* Creates a new data stream which is a map of the original
165-
*/
166-
map<ReturnData>(callback: (data: Data) => ReturnData): DataStream<ReturnData> {
167-
const stream = new DataStream(this.options);
168-
const l = this.registerListener({
169-
data: async (data) => {
170-
stream.enqueueData(callback(data));
171-
},
172-
closed: () => {
173-
stream.close();
174-
l?.();
175-
}
176-
});
177-
178-
return stream;
179-
}
180-
181159
protected hasDataReader() {
182160
return Array.from(this.listeners.values()).some((l) => !!l.data);
183161
}
184162

185163
protected async _processQueue() {
164+
/**
165+
* Allow listeners to mutate the queue before processing.
166+
* This allows for operations such as dropping or compressing data
167+
* on high water or requesting more data on low water.
168+
*/
169+
if (this.dataQueue.length >= this.highWatermark) {
170+
await this.iterateAsyncErrored(async (l) => l.highWater?.());
171+
}
172+
186173
if (this.isClosed || !this.hasDataReader()) {
187-
await Promise.resolve();
188174
return;
189175
}
190176

191177
if (this.dataQueue.length) {
192178
const data = this.dataQueue.shift()!;
193-
await this.iterateAsyncErrored(async (l) => l.data?.(data));
179+
const mapped = this.mapLine(data);
180+
await this.iterateAsyncErrored(async (l) => l.data?.(mapped));
194181
}
195182

196183
if (this.dataQueue.length <= this.lowWatermark) {
@@ -202,14 +189,17 @@ export class DataStream<Data extends any = any> extends BaseObserver<DataStreamL
202189
this.notifyDataAdded = null;
203190
}
204191

205-
if (this.dataQueue.length) {
192+
if (this.dataQueue.length > 0) {
206193
// Next tick
207194
setTimeout(() => this.processQueue());
208195
}
209196
}
210197

211-
protected async iterateAsyncErrored(cb: (l: BaseListener) => Promise<void>) {
212-
for (let i of Array.from(this.listeners.values())) {
198+
protected async iterateAsyncErrored(cb: (l: Partial<DataStreamListener<ParsedData>>) => Promise<void>) {
199+
// Important: We need to copy the listeners, as calling a listener could result in adding another
200+
// listener, resulting in infinite loops.
201+
const listeners = Array.from(this.listeners.values());
202+
for (let i of listeners) {
213203
try {
214204
await cb(i);
215205
} catch (ex) {

0 commit comments

Comments
 (0)