Skip to content

Commit 893d42b

Browse files
feat: Add FetchStrategy option to connect() for WebSockets (#494)
Co-authored-by: stevensJourney <steven@journeyapps.com>
1 parent c68675a commit 893d42b

File tree

4 files changed

+56
-18
lines changed

4 files changed

+56
-18
lines changed
Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
---
2+
'@powersync/react-native': minor
3+
'@powersync/common': minor
4+
---
5+
6+
Introduced `fetchStrategy` option to connect, allowing you to choose either `buffered` or `sequential` for the Websocket connect option. Internally the functionality of `buffered` was used by default, but now it can be switched to the sequential mode. This changes the WebSocket sync queue to only process one sync event at a time, improving known keep-alive issues for lower-end hardware with minimal impact on sync performance.

packages/common/src/client/sync/stream/AbstractRemote.ts

Lines changed: 27 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,6 @@ export type RemoteConnector = {
2121

2222
// Refresh at least 30 sec before it expires
2323
const REFRESH_CREDENTIALS_SAFETY_PERIOD_MS = 30_000;
24-
const SYNC_QUEUE_REQUEST_N = 10;
2524
const SYNC_QUEUE_REQUEST_LOW_WATER = 5;
2625

2726
// Keep alive message is sent every period
@@ -39,6 +38,24 @@ export type SyncStreamOptions = {
3938
fetchOptions?: Request;
4039
};
4140

41+
export enum FetchStrategy {
42+
/**
43+
* Queues multiple sync events before processing, reducing round-trips.
44+
* This comes at the cost of more processing overhead, which may cause ACK timeouts on older/weaker devices for big enough datasets.
45+
*/
46+
Buffered = 'buffered',
47+
48+
/**
49+
* Processes each sync event immediately before requesting the next.
50+
* This reduces processing overhead and improves real-time responsiveness.
51+
*/
52+
Sequential = 'sequential'
53+
}
54+
55+
export type SocketSyncStreamOptions = SyncStreamOptions & {
56+
fetchStrategy: FetchStrategy;
57+
};
58+
4259
export type FetchImplementation = typeof fetch;
4360

4461
/**
@@ -216,8 +233,10 @@ export abstract class AbstractRemote {
216233
/**
217234
* Connects to the sync/stream websocket endpoint
218235
*/
219-
async socketStream(options: SyncStreamOptions): Promise<DataStream<StreamingSyncLine>> {
220-
const { path } = options;
236+
async socketStream(options: SocketSyncStreamOptions): Promise<DataStream<StreamingSyncLine>> {
237+
const { path, fetchStrategy = FetchStrategy.Buffered } = options;
238+
239+
const syncQueueRequestSize = fetchStrategy == FetchStrategy.Buffered ? 10 : 1;
221240
const request = await this.buildRequest(path);
222241

223242
const bson = await this.getBSON();
@@ -277,7 +296,7 @@ export abstract class AbstractRemote {
277296
// Helps to prevent double close scenarios
278297
rsocket.onClose(() => (socketIsClosed = true));
279298
// We initially request this amount and expect these to arrive eventually
280-
let pendingEventsCount = SYNC_QUEUE_REQUEST_N;
299+
let pendingEventsCount = syncQueueRequestSize;
281300

282301
const disposeClosedListener = stream.registerListener({
283302
closed: () => {
@@ -298,7 +317,7 @@ export abstract class AbstractRemote {
298317
})
299318
)
300319
},
301-
SYNC_QUEUE_REQUEST_N, // The initial N amount
320+
syncQueueRequestSize, // The initial N amount
302321
{
303322
onError: (e) => {
304323
// Don't log closed as an error
@@ -340,10 +359,10 @@ export abstract class AbstractRemote {
340359
const l = stream.registerListener({
341360
lowWater: async () => {
342361
// Request to fill up the queue
343-
const required = SYNC_QUEUE_REQUEST_N - pendingEventsCount;
362+
const required = syncQueueRequestSize - pendingEventsCount;
344363
if (required > 0) {
345-
socket.request(SYNC_QUEUE_REQUEST_N - pendingEventsCount);
346-
pendingEventsCount = SYNC_QUEUE_REQUEST_N;
364+
socket.request(syncQueueRequestSize - pendingEventsCount);
365+
pendingEventsCount = syncQueueRequestSize;
347366
}
348367
},
349368
closed: () => {

packages/common/src/client/sync/stream/AbstractStreamingSyncImplementation.ts

Lines changed: 21 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -7,16 +7,18 @@ import { throttleLeadingTrailing } from '../../../utils/throttle.js';
77
import { BucketChecksum, BucketStorageAdapter, Checkpoint } from '../bucket/BucketStorageAdapter.js';
88
import { CrudEntry } from '../bucket/CrudEntry.js';
99
import { SyncDataBucket } from '../bucket/SyncDataBucket.js';
10-
import { AbstractRemote, SyncStreamOptions } from './AbstractRemote.js';
10+
import { AbstractRemote, SyncStreamOptions, FetchStrategy } from './AbstractRemote.js';
1111
import {
1212
BucketRequest,
13+
StreamingSyncLine,
1314
StreamingSyncRequestParameterType,
1415
isStreamingKeepalive,
1516
isStreamingSyncCheckpoint,
1617
isStreamingSyncCheckpointComplete,
1718
isStreamingSyncCheckpointDiff,
1819
isStreamingSyncData
1920
} from './streaming-sync-types.js';
21+
import { DataStream } from 'src/utils/DataStream.js';
2022

2123
export enum LockType {
2224
CRUD = 'crud',
@@ -67,7 +69,7 @@ export interface StreamingSyncImplementationListener extends BaseListener {
6769
*/
6870
export interface PowerSyncConnectionOptions extends BaseConnectionOptions, AdditionalConnectionOptions {}
6971

70-
/** @internal */
72+
/** @internal */
7173
export interface BaseConnectionOptions {
7274
/**
7375
* The connection method to use when streaming updates from
@@ -76,13 +78,18 @@ export interface BaseConnectionOptions {
7678
*/
7779
connectionMethod?: SyncStreamConnectionMethod;
7880

81+
/**
82+
* The fetch strategy to use when streaming updates from the PowerSync backend instance.
83+
*/
84+
fetchStrategy?: FetchStrategy;
85+
7986
/**
8087
* These parameters are passed to the sync rules, and will be available under the`user_parameters` object.
8188
*/
8289
params?: Record<string, StreamingSyncRequestParameterType>;
8390
}
8491

85-
/** @internal */
92+
/** @internal */
8693
export interface AdditionalConnectionOptions {
8794
/**
8895
* Delay for retrying sync streaming operations
@@ -97,9 +104,8 @@ export interface AdditionalConnectionOptions {
97104
crudUploadThrottleMs?: number;
98105
}
99106

100-
101107
/** @internal */
102-
export type RequiredAdditionalConnectionOptions = Required<AdditionalConnectionOptions>
108+
export type RequiredAdditionalConnectionOptions = Required<AdditionalConnectionOptions>;
103109

104110
export interface StreamingSyncImplementation extends BaseObserver<StreamingSyncImplementationListener>, Disposable {
105111
/**
@@ -134,6 +140,7 @@ export type RequiredPowerSyncConnectionOptions = Required<BaseConnectionOptions>
134140

135141
export const DEFAULT_STREAM_CONNECTION_OPTIONS: RequiredPowerSyncConnectionOptions = {
136142
connectionMethod: SyncStreamConnectionMethod.WEB_SOCKET,
143+
fetchStrategy: FetchStrategy.Buffered,
137144
params: {}
138145
};
139146

@@ -496,10 +503,15 @@ The next upload iteration will be delayed.`);
496503
}
497504
};
498505

499-
const stream =
500-
resolvedOptions?.connectionMethod == SyncStreamConnectionMethod.HTTP
501-
? await this.options.remote.postStream(syncOptions)
502-
: await this.options.remote.socketStream(syncOptions);
506+
let stream: DataStream<StreamingSyncLine>;
507+
if (resolvedOptions?.connectionMethod == SyncStreamConnectionMethod.HTTP) {
508+
stream = await this.options.remote.postStream(syncOptions);
509+
} else {
510+
stream = await this.options.remote.socketStream({
511+
...syncOptions,
512+
...{ fetchStrategy: resolvedOptions.fetchStrategy }
513+
});
514+
}
503515

504516
this.logger.debug('Stream established. Processing events');
505517

packages/react-native/src/sync/stream/ReactNativeRemote.ts

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ import {
99
FetchImplementation,
1010
FetchImplementationProvider,
1111
RemoteConnector,
12+
SocketSyncStreamOptions,
1213
StreamingSyncLine,
1314
SyncStreamOptions
1415
} from '@powersync/common';
@@ -56,7 +57,7 @@ export class ReactNativeRemote extends AbstractRemote {
5657
return BSON;
5758
}
5859

59-
async socketStream(options: SyncStreamOptions): Promise<DataStream<StreamingSyncLine>> {
60+
async socketStream(options: SocketSyncStreamOptions): Promise<DataStream<StreamingSyncLine>> {
6061
return super.socketStream(options);
6162
}
6263

0 commit comments

Comments
 (0)