Skip to content

Commit ce40042

Browse files
committed
Diagnostics app: Support Rust client
1 parent b1aca34 commit ce40042

File tree

9 files changed

+218
-9
lines changed

9 files changed

+218
-9
lines changed

.changeset/fuzzy-buses-own.md

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
---
2+
'@powersync/common': minor
3+
---
4+
5+
Add `clientImplementation` field to `SyncStatus`.

.changeset/large-toes-drive.md

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
---
2+
'@powersync/diagnostics-app': patch
3+
---
4+
5+
Support Rust client.

packages/common/src/client/sync/stream/AbstractStreamingSyncImplementation.ts

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -632,8 +632,10 @@ The next upload iteration will be delayed.`);
632632
...DEFAULT_STREAM_CONNECTION_OPTIONS,
633633
...(options ?? {})
634634
};
635+
const clientImplementation = resolvedOptions.clientImplementation;
636+
this.updateSyncStatus({ clientImplementation });
635637

636-
if (resolvedOptions.clientImplementation == SyncClientImplementation.JAVASCRIPT) {
638+
if (clientImplementation == SyncClientImplementation.JAVASCRIPT) {
637639
await this.legacyStreamingSyncIteration(signal, resolvedOptions);
638640
} else {
639641
await this.requireKeyFormat(true);
@@ -1168,7 +1170,8 @@ The next upload iteration will be delayed.`);
11681170
...this.syncStatus.dataFlowStatus,
11691171
...options.dataFlow
11701172
},
1171-
priorityStatusEntries: options.priorityStatusEntries ?? this.syncStatus.priorityStatusEntries
1173+
priorityStatusEntries: options.priorityStatusEntries ?? this.syncStatus.priorityStatusEntries,
1174+
clientImplementation: options.clientImplementation ?? this.syncStatus.clientImplementation
11721175
});
11731176

11741177
if (!this.syncStatus.isEqual(updatedStatus)) {

packages/common/src/db/crud/SyncStatus.ts

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
import { SyncClientImplementation } from 'src/client/sync/stream/AbstractStreamingSyncImplementation.js';
12
import { InternalProgressInformation, SyncProgress } from './SyncProgress.js';
23

34
export type SyncDataFlowStatus = Partial<{
@@ -35,11 +36,22 @@ export type SyncStatusOptions = {
3536
lastSyncedAt?: Date;
3637
hasSynced?: boolean;
3738
priorityStatusEntries?: SyncPriorityStatus[];
39+
clientImplementation?: SyncClientImplementation;
3840
};
3941

4042
export class SyncStatus {
4143
constructor(protected options: SyncStatusOptions) {}
4244

45+
/**
46+
* Returns the used sync client implementation (either the one implemented in JavaScript or the newer Rust-based
47+
* implementation).
48+
*
49+
* This information is only available after a connection has been requested.
50+
*/
51+
get clientImplementation() {
52+
return this.options.clientImplementation;
53+
}
54+
4355
/**
4456
* Indicates if the client is currently connected to the PowerSync service.
4557
*

tools/diagnostics-app/src/app/views/layout.tsx

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -155,6 +155,7 @@ export default function ViewsLayout({ children }: { children: React.ReactNode })
155155
<Box sx={{ flexGrow: 1 }}>
156156
<Typography>{title}</Typography>
157157
</Box>
158+
{syncStatus?.clientImplementation && <Typography>Client: {syncStatus?.clientImplementation}</Typography>}
158159
<NorthIcon
159160
sx={{ marginRight: '-10px' }}
160161
color={syncStatus?.dataFlowStatus.uploading ? 'primary' : 'inherit'}

tools/diagnostics-app/src/components/widgets/LoginDetailsWidget.tsx

Lines changed: 47 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,23 @@
11
import React from 'react';
2-
import { Box, Button, ButtonGroup, FormGroup, Paper, TextField, Typography, styled } from '@mui/material';
2+
import {
3+
Box,
4+
Button,
5+
ButtonGroup,
6+
FormControlLabel,
7+
FormGroup,
8+
Paper,
9+
Switch,
10+
TextField,
11+
Typography,
12+
styled
13+
} from '@mui/material';
314
import { Formik, FormikErrors } from 'formik';
15+
import { SyncClientImplementation } from '@powersync/web';
416

517
export type LoginDetailsFormValues = {
618
token: string;
719
endpoint: string;
20+
clientImplementation: SyncClientImplementation;
821
};
922

1023
export type LoginAction = {
@@ -25,7 +38,7 @@ export const LoginDetailsWidget: React.FC<LoginDetailsWidgetProps> = (props) =>
2538
<S.Logo alt="PowerSync Logo" width={400} height={100} src="/powersync-logo.svg" />
2639
</S.LogoBox>
2740
<Formik<LoginDetailsFormValues>
28-
initialValues={{ token: '', endpoint: '' }}
41+
initialValues={{ token: '', endpoint: '', clientImplementation: SyncClientImplementation.RUST }}
2942
validateOnChange={false}
3043
validateOnBlur={false}
3144
validate={(values) => {
@@ -44,15 +57,16 @@ export const LoginDetailsWidget: React.FC<LoginDetailsWidgetProps> = (props) =>
4457
}
4558
await props.onSubmit({
4659
token: values.token,
47-
endpoint
60+
endpoint,
61+
clientImplementation: values.clientImplementation
4862
});
4963
} catch (ex: any) {
5064
console.error(ex);
5165
setSubmitting(false);
5266
setFieldError('endpoint', ex.message);
5367
}
5468
}}>
55-
{({ values, errors, handleChange, handleBlur, isSubmitting, handleSubmit }) => (
69+
{({ values, errors, handleChange, handleBlur, isSubmitting, handleSubmit, setFieldValue }) => (
5670
<form onSubmit={handleSubmit}>
5771
<FormGroup>
5872
<S.TextInput
@@ -84,6 +98,34 @@ export const LoginDetailsWidget: React.FC<LoginDetailsWidgetProps> = (props) =>
8498
/>
8599
</FormGroup>
86100
<S.ActionButtonGroup>
101+
<FormControlLabel
102+
control={
103+
<Switch
104+
checked={values.clientImplementation == SyncClientImplementation.RUST}
105+
onChange={() =>
106+
setFieldValue(
107+
'clientImplementation',
108+
values.clientImplementation == SyncClientImplementation.RUST
109+
? SyncClientImplementation.JAVASCRIPT
110+
: SyncClientImplementation.RUST
111+
)
112+
}
113+
/>
114+
}
115+
label={
116+
<span>
117+
Rust sync client (
118+
<a
119+
style={{ color: 'lightblue' }}
120+
target="_blank"
121+
href="https://releases.powersync.com/announcements/improved-sync-performance-in-our-client-sdks">
122+
what's that?
123+
</a>
124+
)
125+
</span>
126+
}
127+
/>
128+
87129
<Button variant="outlined" type="submit" disabled={isSubmitting}>
88130
Proceed
89131
</Button>
@@ -143,7 +185,7 @@ namespace S {
143185
margin-top: 20px;
144186
width: 100%;
145187
display: flex;
146-
justify-content: end;
188+
justify-content: space-between;
147189
`;
148190

149191
export const TextInput = styled(TextField)`

tools/diagnostics-app/src/library/powersync/ConnectionManager.ts

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ import {
33
createBaseLogger,
44
LogLevel,
55
PowerSyncDatabase,
6+
SyncClientImplementation,
67
TemporaryStorageOption,
78
WASQLiteOpenFactory,
89
WASQLiteVFS,
@@ -14,6 +15,7 @@ import { safeParse } from '../safeParse/safeParse';
1415
import { DynamicSchemaManager } from './DynamicSchemaManager';
1516
import { RecordingStorageAdapter } from './RecordingStorageAdapter';
1617
import { TokenConnector } from './TokenConnector';
18+
import { RustClientInterceptor } from './RustClientInterceptor';
1719

1820
const baseLogger = createBaseLogger();
1921
baseLogger.useDefaults();
@@ -57,9 +59,19 @@ if (connector.hasCredentials()) {
5759
}
5860

5961
export async function connect() {
62+
const client =
63+
localStorage.getItem('preferred_client_implementation') == SyncClientImplementation.RUST
64+
? SyncClientImplementation.RUST
65+
: SyncClientImplementation.JAVASCRIPT;
66+
6067
const params = getParams();
6168
await sync?.disconnect();
6269
const remote = new WebRemote(connector);
70+
const adapter =
71+
client == SyncClientImplementation.JAVASCRIPT
72+
? new RecordingStorageAdapter(db.database, schemaManager)
73+
: new RustClientInterceptor(db.database, remote, schemaManager);
74+
6375
const syncOptions: WebStreamingSyncImplementationOptions = {
6476
adapter,
6577
remote,
@@ -69,7 +81,7 @@ export async function connect() {
6981
identifier: 'diagnostics'
7082
};
7183
sync = new WebStreamingSyncImplementation(syncOptions);
72-
await sync.connect({ params });
84+
await sync.connect({ params, clientImplementation: client });
7385
if (!sync.syncStatus.connected) {
7486
const error = sync.syncStatus.dataFlowStatus.downloadError ?? new Error('Failed to connect');
7587
// Disconnect but don't wait for it
Lines changed: 126 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,126 @@
1+
import {
2+
AbstractPowerSyncDatabase,
3+
AbstractRemote,
4+
BucketChecksum,
5+
Checkpoint,
6+
ColumnType,
7+
DBAdapter,
8+
isStreamingSyncCheckpoint,
9+
isStreamingSyncCheckpointDiff,
10+
isStreamingSyncData,
11+
PowerSyncControlCommand,
12+
SqliteBucketStorage,
13+
StreamingSyncLine,
14+
SyncDataBucket
15+
} from '@powersync/web';
16+
import { DynamicSchemaManager } from './DynamicSchemaManager';
17+
18+
/**
19+
* Tracks per-byte and per-operation progress for the Rust client.
20+
*
21+
* While per-operation progress is reported by the SDK as well, we need those counters for each bucket. Since that
22+
* information is internal to the Rust client and inaccessible to JavaScript, this intercepts the raw
23+
* `powersync_control` calls to decode sync lines and derive progress information.
24+
*/
25+
export class RustClientInterceptor extends SqliteBucketStorage {
26+
private rdb: DBAdapter;
27+
private lastStartedCheckpoint: Checkpoint | null = null;
28+
29+
public tables: Record<string, Record<string, ColumnType>> = {};
30+
31+
constructor(
32+
db: DBAdapter,
33+
private remote: AbstractRemote,
34+
private schemaManager: DynamicSchemaManager
35+
) {
36+
super(db, (AbstractPowerSyncDatabase as any).transactionMutex);
37+
this.rdb = db;
38+
}
39+
40+
async control(op: PowerSyncControlCommand, payload: string | Uint8Array | ArrayBuffer | null): Promise<string> {
41+
const response = await super.control(op, payload);
42+
43+
if (op == PowerSyncControlCommand.PROCESS_TEXT_LINE) {
44+
await this.processTextLine(payload as string);
45+
} else if (op == PowerSyncControlCommand.PROCESS_BSON_LINE) {
46+
await this.processBinaryLine(payload as Uint8Array);
47+
}
48+
49+
return response;
50+
}
51+
52+
private processTextLine(line: string) {
53+
return this.processParsedLine(JSON.parse(line));
54+
}
55+
56+
private async processBinaryLine(line: Uint8Array) {
57+
const bson = await this.remote.getBSON();
58+
await this.processParsedLine(bson.deserialize(line) as StreamingSyncLine);
59+
}
60+
61+
private async processParsedLine(line: StreamingSyncLine) {
62+
if (isStreamingSyncCheckpoint(line)) {
63+
this.lastStartedCheckpoint = line.checkpoint;
64+
await this.trackCheckpoint(line.checkpoint);
65+
} else if (isStreamingSyncCheckpointDiff(line) && this.lastStartedCheckpoint) {
66+
const diff = line.checkpoint_diff;
67+
const newBuckets = new Map<string, BucketChecksum>();
68+
for (const checksum of this.lastStartedCheckpoint.buckets) {
69+
newBuckets.set(checksum.bucket, checksum);
70+
}
71+
for (const checksum of diff.updated_buckets) {
72+
newBuckets.set(checksum.bucket, checksum);
73+
}
74+
for (const bucket of diff.removed_buckets) {
75+
newBuckets.delete(bucket);
76+
}
77+
78+
const newCheckpoint: Checkpoint = {
79+
last_op_id: diff.last_op_id,
80+
buckets: [...newBuckets.values()],
81+
write_checkpoint: diff.write_checkpoint
82+
};
83+
this.lastStartedCheckpoint = newCheckpoint;
84+
await this.trackCheckpoint(newCheckpoint);
85+
} else if (isStreamingSyncData(line)) {
86+
const batch = { buckets: [SyncDataBucket.fromRow(line.data)] };
87+
88+
await this.rdb.writeTransaction(async (tx) => {
89+
for (const bucket of batch.buckets) {
90+
// Record metrics
91+
const size = JSON.stringify(bucket.data).length;
92+
await tx.execute(
93+
`UPDATE local_bucket_data SET
94+
download_size = IFNULL(download_size, 0) + ?,
95+
last_op = ?,
96+
downloading = ?,
97+
downloaded_operations = IFNULL(downloaded_operations, 0) + ?
98+
WHERE id = ?`,
99+
[size, bucket.next_after, bucket.has_more, bucket.data.length, bucket.bucket]
100+
);
101+
}
102+
});
103+
104+
await this.schemaManager.updateFromOperations(batch);
105+
}
106+
}
107+
108+
private async trackCheckpoint(checkpoint: Checkpoint) {
109+
await this.rdb.writeTransaction(async (tx) => {
110+
for (const bucket of checkpoint.buckets) {
111+
await tx.execute(
112+
`INSERT OR REPLACE INTO local_bucket_data(id, total_operations, last_op, download_size, downloading, downloaded_operations)
113+
VALUES (
114+
?,
115+
?,
116+
IFNULL((SELECT last_op FROM local_bucket_data WHERE id = ?), '0'),
117+
IFNULL((SELECT download_size FROM local_bucket_data WHERE id = ?), 0),
118+
IFNULL((SELECT downloading FROM local_bucket_data WHERE id = ?), TRUE),
119+
IFNULL((SELECT downloaded_operations FROM local_bucket_data WHERE id = ?), TRUE)
120+
)`,
121+
[bucket.bucket, bucket.count, bucket.bucket, bucket.bucket, bucket.bucket, bucket.bucket]
122+
);
123+
}
124+
});
125+
}
126+
}

tools/diagnostics-app/src/library/powersync/TokenConnector.ts

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
import { AbstractPowerSyncDatabase, PowerSyncBackendConnector } from '@powersync/web';
22
import { connect } from './ConnectionManager';
3+
import { LoginDetailsFormValues } from '@/components/widgets/LoginDetailsWidget';
34

45
export interface Credentials {
56
token: string;
@@ -21,11 +22,12 @@ export class TokenConnector implements PowerSyncBackendConnector {
2122
await tx?.complete();
2223
}
2324

24-
async signIn(credentials: Credentials) {
25+
async signIn(credentials: LoginDetailsFormValues) {
2526
validateSecureContext(credentials.endpoint);
2627
checkJWT(credentials.token);
2728
try {
2829
localStorage.setItem('powersync_credentials', JSON.stringify(credentials));
30+
localStorage.setItem('preferred_client_implementation', credentials.clientImplementation);
2931
await connect();
3032
} catch (e) {
3133
this.clearCredentials();
@@ -39,6 +41,7 @@ export class TokenConnector implements PowerSyncBackendConnector {
3941

4042
clearCredentials() {
4143
localStorage.removeItem('powersync_credentials');
44+
localStorage.removeItem('preferred_client_implementation');
4245
}
4346
}
4447

0 commit comments

Comments
 (0)