Skip to content

Commit a1aa18c

Browse files
authored
Dont block on crud uploads (#661)
1 parent a9f6eba commit a1aa18c

File tree

9 files changed

+129
-135
lines changed

9 files changed

+129
-135
lines changed

.changeset/orange-baboons-work.md

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,8 @@
1+
---
2+
'@powersync/react-native': patch
3+
'@powersync/common': patch
4+
'@powersync/web': patch
5+
'@powersync/node': patch
6+
---
7+
8+
Fix sync stream delays during CRUD upload.

packages/common/src/client/sync/stream/AbstractRemote.ts

Lines changed: 5 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,11 @@ import PACKAGE from '../../../../package.json' with { type: 'json' };
77
import { AbortOperation } from '../../../utils/AbortOperation.js';
88
import { DataStream } from '../../../utils/DataStream.js';
99
import { PowerSyncCredentials } from '../../connection/PowerSyncCredentials.js';
10-
import { StreamingSyncLine, StreamingSyncRequest } from './streaming-sync-types.js';
10+
import {
11+
StreamingSyncLine,
12+
StreamingSyncLineOrCrudUploadComplete,
13+
StreamingSyncRequest
14+
} from './streaming-sync-types.js';
1115
import { WebsocketClientTransport } from './WebsocketClientTransport.js';
1216

1317
export type BSONImplementation = typeof BSON;
@@ -267,15 +271,6 @@ export abstract class AbstractRemote {
267271
return new WebSocket(url);
268272
}
269273

270-
/**
271-
* Connects to the sync/stream websocket endpoint and delivers sync lines by decoding the BSON events
272-
* sent by the server.
273-
*/
274-
async socketStream(options: SocketSyncStreamOptions): Promise<DataStream<StreamingSyncLine>> {
275-
const bson = await this.getBSON();
276-
return await this.socketStreamRaw(options, (data) => bson.deserialize(data) as StreamingSyncLine, bson);
277-
}
278-
279274
/**
280275
* Returns a data stream of sync line data.
281276
*
@@ -475,15 +470,6 @@ export abstract class AbstractRemote {
475470
return stream;
476471
}
477472

478-
/**
479-
* Connects to the sync/stream http endpoint, parsing lines as JSON.
480-
*/
481-
async postStream(options: SyncStreamOptions): Promise<DataStream<StreamingSyncLine>> {
482-
return await this.postStreamRaw(options, (line) => {
483-
return JSON.parse(line) as StreamingSyncLine;
484-
});
485-
}
486-
487473
/**
488474
* Connects to the sync/stream http endpoint, mapping and emitting each received string line.
489475
*/

packages/common/src/client/sync/stream/AbstractStreamingSyncImplementation.ts

Lines changed: 76 additions & 71 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@ import { FULL_SYNC_PRIORITY, InternalProgressInformation } from '../../../db/cru
66
import * as sync_status from '../../../db/crud/SyncStatus.js';
77
import { AbortOperation } from '../../../utils/AbortOperation.js';
88
import { BaseListener, BaseObserver, Disposable } from '../../../utils/BaseObserver.js';
9-
import { onAbortPromise, throttleLeadingTrailing } from '../../../utils/async.js';
9+
import { throttleLeadingTrailing } from '../../../utils/async.js';
1010
import {
1111
BucketChecksum,
1212
BucketDescription,
@@ -19,7 +19,9 @@ import { SyncDataBucket } from '../bucket/SyncDataBucket.js';
1919
import { AbstractRemote, FetchStrategy, SyncStreamOptions } from './AbstractRemote.js';
2020
import {
2121
BucketRequest,
22+
CrudUploadNotification,
2223
StreamingSyncLine,
24+
StreamingSyncLineOrCrudUploadComplete,
2325
StreamingSyncRequestParameterType,
2426
isStreamingKeepalive,
2527
isStreamingSyncCheckpoint,
@@ -225,7 +227,7 @@ export abstract class AbstractStreamingSyncImplementation
225227
protected crudUpdateListener?: () => void;
226228
protected streamingSyncPromise?: Promise<void>;
227229

228-
private pendingCrudUpload?: Promise<void>;
230+
private isUploadingCrud: boolean = false;
229231
private notifyCompletedUploads?: () => void;
230232

231233
syncStatus: SyncStatus;
@@ -247,16 +249,14 @@ export abstract class AbstractStreamingSyncImplementation
247249
this.abortController = null;
248250

249251
this.triggerCrudUpload = throttleLeadingTrailing(() => {
250-
if (!this.syncStatus.connected || this.pendingCrudUpload != null) {
252+
if (!this.syncStatus.connected || this.isUploadingCrud) {
251253
return;
252254
}
253255

254-
this.pendingCrudUpload = new Promise((resolve) => {
255-
this._uploadAllCrud().finally(() => {
256-
this.notifyCompletedUploads?.();
257-
this.pendingCrudUpload = undefined;
258-
resolve();
259-
});
256+
this.isUploadingCrud = true;
257+
this._uploadAllCrud().finally(() => {
258+
this.notifyCompletedUploads?.();
259+
this.isUploadingCrud = false;
260260
});
261261
}, this.options.crudUploadThrottleMs!);
262262
}
@@ -532,6 +532,8 @@ The next upload iteration will be delayed.`);
532532
}
533533
});
534534
} finally {
535+
this.notifyCompletedUploads = undefined;
536+
535537
if (!signal.aborted) {
536538
nestedAbortController.abort(new AbortOperation('Closing sync stream network requests before retry.'));
537539
nestedAbortController = new AbortController();
@@ -617,10 +619,9 @@ The next upload iteration will be delayed.`);
617619
this.options.adapter.startSession();
618620
let [req, bucketMap] = await this.collectLocalBucketState();
619621

620-
// These are compared by reference
621622
let targetCheckpoint: Checkpoint | null = null;
622-
let validatedCheckpoint: Checkpoint | null = null;
623-
let appliedCheckpoint: Checkpoint | null = null;
623+
// A checkpoint that has been validated but not applied (e.g. due to pending local writes)
624+
let pendingValidatedCheckpoint: Checkpoint | null = null;
624625

625626
const clientId = await this.options.adapter.getClientId();
626627
const usingFixedKeyFormat = await this.requireKeyFormat(false);
@@ -639,25 +640,63 @@ The next upload iteration will be delayed.`);
639640
}
640641
};
641642

642-
let stream: DataStream<StreamingSyncLine>;
643+
let stream: DataStream<StreamingSyncLineOrCrudUploadComplete>;
643644
if (resolvedOptions?.connectionMethod == SyncStreamConnectionMethod.HTTP) {
644-
stream = await this.options.remote.postStream(syncOptions);
645-
} else {
646-
stream = await this.options.remote.socketStream({
647-
...syncOptions,
648-
...{ fetchStrategy: resolvedOptions.fetchStrategy }
645+
stream = await this.options.remote.postStreamRaw(syncOptions, (line: string | CrudUploadNotification) => {
646+
if (typeof line == 'string') {
647+
return JSON.parse(line) as StreamingSyncLine;
648+
} else {
649+
// Directly enqueued by us
650+
return line;
651+
}
649652
});
653+
} else {
654+
const bson = await this.options.remote.getBSON();
655+
stream = await this.options.remote.socketStreamRaw(
656+
{
657+
...syncOptions,
658+
...{ fetchStrategy: resolvedOptions.fetchStrategy }
659+
},
660+
(payload: Uint8Array | CrudUploadNotification) => {
661+
if (payload instanceof Uint8Array) {
662+
return bson.deserialize(payload) as StreamingSyncLine;
663+
} else {
664+
// Directly enqueued by us
665+
return payload;
666+
}
667+
},
668+
bson
669+
);
650670
}
651671

652672
this.logger.debug('Stream established. Processing events');
653673

674+
this.notifyCompletedUploads = () => {
675+
if (!stream.closed) {
676+
stream.enqueueData({ crud_upload_completed: null });
677+
}
678+
};
679+
654680
while (!stream.closed) {
655681
const line = await stream.read();
656682
if (!line) {
657683
// The stream has closed while waiting
658684
return;
659685
}
660686

687+
if ('crud_upload_completed' in line) {
688+
if (pendingValidatedCheckpoint != null) {
689+
const { applied, endIteration } = await this.applyCheckpoint(pendingValidatedCheckpoint);
690+
if (applied) {
691+
pendingValidatedCheckpoint = null;
692+
} else if (endIteration) {
693+
break;
694+
}
695+
}
696+
697+
continue;
698+
}
699+
661700
// A connection is active and messages are being received
662701
if (!this.syncStatus.connected) {
663702
// There is a connection now
@@ -686,13 +725,12 @@ The next upload iteration will be delayed.`);
686725
await this.options.adapter.setTargetCheckpoint(targetCheckpoint);
687726
await this.updateSyncStatusForStartingCheckpoint(targetCheckpoint);
688727
} else if (isStreamingSyncCheckpointComplete(line)) {
689-
const result = await this.applyCheckpoint(targetCheckpoint!, signal);
728+
const result = await this.applyCheckpoint(targetCheckpoint!);
690729
if (result.endIteration) {
691730
return;
692-
} else if (result.applied) {
693-
appliedCheckpoint = targetCheckpoint;
731+
} else if (!result.applied) {
732+
pendingValidatedCheckpoint = targetCheckpoint;
694733
}
695-
validatedCheckpoint = targetCheckpoint;
696734
} else if (isStreamingSyncCheckpointPartiallyComplete(line)) {
697735
const priority = line.partial_checkpoint_complete.priority;
698736
this.logger.debug('Partial checkpoint complete', priority);
@@ -802,25 +840,7 @@ The next upload iteration will be delayed.`);
802840
}
803841
this.triggerCrudUpload();
804842
} else {
805-
this.logger.debug('Sync complete');
806-
807-
if (targetCheckpoint === appliedCheckpoint) {
808-
this.updateSyncStatus({
809-
connected: true,
810-
lastSyncedAt: new Date(),
811-
priorityStatusEntries: [],
812-
dataFlow: {
813-
downloadError: undefined
814-
}
815-
});
816-
} else if (validatedCheckpoint === targetCheckpoint) {
817-
const result = await this.applyCheckpoint(targetCheckpoint!, signal);
818-
if (result.endIteration) {
819-
return;
820-
} else if (result.applied) {
821-
appliedCheckpoint = targetCheckpoint;
822-
}
823-
}
843+
this.logger.debug('Received unknown sync line', line);
824844
}
825845
}
826846
this.logger.debug('Stream input empty');
@@ -1059,50 +1079,35 @@ The next upload iteration will be delayed.`);
10591079
});
10601080
}
10611081

1062-
private async applyCheckpoint(checkpoint: Checkpoint, abort: AbortSignal) {
1082+
private async applyCheckpoint(checkpoint: Checkpoint) {
10631083
let result = await this.options.adapter.syncLocalDatabase(checkpoint);
1064-
const pending = this.pendingCrudUpload;
10651084

10661085
if (!result.checkpointValid) {
10671086
this.logger.debug('Checksum mismatch in checkpoint, will reconnect');
10681087
// This means checksums failed. Start again with a new checkpoint.
10691088
// TODO: better back-off
10701089
await new Promise((resolve) => setTimeout(resolve, 50));
10711090
return { applied: false, endIteration: true };
1072-
} else if (!result.ready && pending != null) {
1073-
// We have pending entries in the local upload queue or are waiting to confirm a write
1074-
// checkpoint, which prevented this checkpoint from applying. Wait for that to complete and
1075-
// try again.
1091+
} else if (!result.ready) {
10761092
this.logger.debug(
1077-
'Could not apply checkpoint due to local data. Waiting for in-progress upload before retrying.'
1093+
'Could not apply checkpoint due to local data. We will retry applying the checkpoint after that upload is completed.'
10781094
);
1079-
await Promise.race([pending, onAbortPromise(abort)]);
1080-
1081-
if (abort.aborted) {
1082-
return { applied: false, endIteration: true };
1083-
}
10841095

1085-
// Try again now that uploads have completed.
1086-
result = await this.options.adapter.syncLocalDatabase(checkpoint);
1096+
return { applied: false, endIteration: false };
10871097
}
10881098

1089-
if (result.checkpointValid && result.ready) {
1090-
this.logger.debug('validated checkpoint', checkpoint);
1091-
this.updateSyncStatus({
1092-
connected: true,
1093-
lastSyncedAt: new Date(),
1094-
dataFlow: {
1095-
downloading: false,
1096-
downloadProgress: null,
1097-
downloadError: undefined
1098-
}
1099-
});
1099+
this.logger.debug('validated checkpoint', checkpoint);
1100+
this.updateSyncStatus({
1101+
connected: true,
1102+
lastSyncedAt: new Date(),
1103+
dataFlow: {
1104+
downloading: false,
1105+
downloadProgress: null,
1106+
downloadError: undefined
1107+
}
1108+
});
11001109

1101-
return { applied: true, endIteration: false };
1102-
} else {
1103-
this.logger.debug('Could not apply checkpoint. Waiting for next sync complete line.');
1104-
return { applied: false, endIteration: false };
1105-
}
1110+
return { applied: true, endIteration: false };
11061111
}
11071112

11081113
protected updateSyncStatus(options: SyncStatusOptions) {

packages/common/src/client/sync/stream/streaming-sync-types.ts

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -136,6 +136,10 @@ export type StreamingSyncLine =
136136
| StreamingSyncCheckpointPartiallyComplete
137137
| StreamingSyncKeepalive;
138138

139+
export type CrudUploadNotification = { crud_upload_completed: null };
140+
141+
export type StreamingSyncLineOrCrudUploadComplete = StreamingSyncLine | CrudUploadNotification;
142+
139143
export interface BucketRequest {
140144
name: string;
141145

packages/common/src/utils/async.ts

Lines changed: 0 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -48,13 +48,3 @@ export function throttleLeadingTrailing(func: () => void, wait: number) {
4848
}
4949
};
5050
}
51-
52-
export function onAbortPromise(signal: AbortSignal): Promise<void> {
53-
return new Promise<void>((resolve) => {
54-
if (signal.aborted) {
55-
resolve();
56-
} else {
57-
signal.onabort = () => resolve();
58-
}
59-
});
60-
}

packages/react-native/src/sync/stream/ReactNativeRemote.ts

Lines changed: 17 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -9,8 +9,6 @@ import {
99
FetchImplementation,
1010
FetchImplementationProvider,
1111
RemoteConnector,
12-
SocketSyncStreamOptions,
13-
StreamingSyncLine,
1412
SyncStreamOptions
1513
} from '@powersync/common';
1614
import { Platform } from 'react-native';
@@ -57,11 +55,7 @@ export class ReactNativeRemote extends AbstractRemote {
5755
return BSON;
5856
}
5957

60-
async socketStream(options: SocketSyncStreamOptions): Promise<DataStream<StreamingSyncLine>> {
61-
return super.socketStream(options);
62-
}
63-
64-
async postStream(options: SyncStreamOptions): Promise<DataStream<StreamingSyncLine>> {
58+
async postStreamRaw<T>(options: SyncStreamOptions, mapLine: (line: string) => T): Promise<DataStream<T>> {
6559
const timeout =
6660
Platform.OS == 'android'
6761
? setTimeout(() => {
@@ -74,19 +68,22 @@ export class ReactNativeRemote extends AbstractRemote {
7468
: null;
7569

7670
try {
77-
return await super.postStream({
78-
...options,
79-
fetchOptions: {
80-
...options.fetchOptions,
81-
/**
82-
* The `react-native-fetch-api` polyfill provides streaming support via
83-
* this non-standard flag
84-
* https://github.com/react-native-community/fetch#enable-text-streaming
85-
*/
86-
// @ts-expect-error https://github.com/react-native-community/fetch#enable-text-streaming
87-
reactNative: { textStreaming: true }
88-
}
89-
});
71+
return await super.postStreamRaw(
72+
{
73+
...options,
74+
fetchOptions: {
75+
...options.fetchOptions,
76+
/**
77+
* The `react-native-fetch-api` polyfill provides streaming support via
78+
* this non-standard flag
79+
* https://github.com/react-native-community/fetch#enable-text-streaming
80+
*/
81+
// @ts-expect-error https://github.com/react-native-community/fetch#enable-text-streaming
82+
reactNative: { textStreaming: true }
83+
}
84+
},
85+
mapLine
86+
);
9087
} finally {
9188
if (timeout) {
9289
clearTimeout(timeout);

0 commit comments

Comments
 (0)