Skip to content

Commit 61a4c4d

Browse files
committed
Properly update subscriptions
1 parent 20d02f5 commit 61a4c4d

File tree

6 files changed

+62
-44
lines changed

6 files changed

+62
-44
lines changed

packages/powersync_core/lib/src/database/powersync_db_mixin.dart

Lines changed: 11 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
import 'dart:async';
2+
import 'dart:convert';
23

34
import 'package:logging/logging.dart';
45
import 'package:meta/meta.dart';
@@ -14,6 +15,8 @@ import 'package:powersync_core/src/schema.dart';
1415
import 'package:powersync_core/src/schema_logic.dart';
1516
import 'package:powersync_core/src/schema_logic.dart' as schema_logic;
1617
import 'package:powersync_core/src/sync/connection_manager.dart';
18+
import 'package:powersync_core/src/sync/instruction.dart';
19+
import 'package:powersync_core/src/sync/mutable_sync_status.dart';
1720
import 'package:powersync_core/src/sync/options.dart';
1821
import 'package:powersync_core/src/sync/sync_status.dart';
1922

@@ -137,53 +140,26 @@ mixin PowerSyncDatabaseMixin implements SqliteConnection {
137140

138141
Future<void> _updateHasSynced() async {
139142
// Query the database to see if any data has been synced.
140-
final result = await database.getAll(
141-
'SELECT priority, last_synced_at FROM ps_sync_state ORDER BY priority;',
143+
final row = await database.get(
144+
'SELECT powersync_offline_sync_status() AS r;',
142145
);
143-
const prioritySentinel = 2147483647;
144-
var hasSynced = false;
145-
DateTime? lastCompleteSync;
146-
final priorityStatusEntries = <SyncPriorityStatus>[];
147146

148-
DateTime parseDateTime(String sql) {
149-
return DateTime.parse('${sql}Z').toLocal();
150-
}
151-
152-
for (final row in result) {
153-
final priority = row.columnAt(0) as int;
154-
final lastSyncedAt = parseDateTime(row.columnAt(1) as String);
155-
156-
if (priority == prioritySentinel) {
157-
hasSynced = true;
158-
lastCompleteSync = lastSyncedAt;
159-
} else {
160-
priorityStatusEntries.add((
161-
hasSynced: true,
162-
lastSyncedAt: lastSyncedAt,
163-
priority: BucketPriority(priority)
164-
));
165-
}
166-
}
147+
final status = CoreSyncStatus.fromJson(
148+
json.decode(row['r'] as String) as Map<String, Object?>);
167149

168-
if (hasSynced != currentStatus.hasSynced) {
169-
final status = SyncStatus(
170-
hasSynced: hasSynced,
171-
lastSyncedAt: lastCompleteSync,
172-
priorityStatusEntries: priorityStatusEntries,
173-
);
174-
setStatus(status);
175-
}
150+
setStatus((MutableSyncStatus()..applyFromCore(status))
151+
.immutableSnapshot(setLastSynced: true));
176152
}
177153

178154
/// Returns a [Future] which will resolve once at least one full sync cycle
179155
/// has completed (meaninng that the first consistent checkpoint has been
180156
/// reached across all buckets).
181157
///
182158
/// When [priority] is null (the default), this method waits for the first
183-
/// full sync checkpoint to complete. When set to a [BucketPriority] however,
159+
/// full sync checkpoint to complete. When set to a [StreamPriority] however,
184160
/// it completes once all buckets within that priority (as well as those in
185161
/// higher priorities) have been synchronized at least once.
186-
Future<void> waitForFirstSync({BucketPriority? priority}) async {
162+
Future<void> waitForFirstSync({StreamPriority? priority}) async {
187163
bool matches(SyncStatus status) {
188164
if (priority == null) {
189165
return status.hasSynced == true;

packages/powersync_core/lib/src/sync/connection_manager.dart

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -197,7 +197,7 @@ final class ConnectionManager {
197197
// would be equal and don't require an event. So, check again.
198198
if (newStatus != currentStatus) {
199199
_currentStatus = newStatus;
200-
_statusController.add(currentStatus);
200+
_statusController.add(_currentStatus);
201201
}
202202
}
203203
}

packages/powersync_core/lib/src/sync/instruction.dart

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -85,8 +85,8 @@ final class CoreSyncStatus {
8585
null => null,
8686
final raw as Map<String, Object?> => DownloadProgress.fromJson(raw),
8787
},
88-
streams: (json['streams'] as List<Object?>?)
89-
?.map((e) =>
88+
streams: (json['streams'] as List<Object?>)
89+
.map((e) =>
9090
CoreActiveStreamSubscription.fromJson(e as Map<String, Object?>))
9191
.toList(),
9292
);

packages/powersync_core/lib/src/sync/mutable_sync_status.dart

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -97,7 +97,7 @@ final class MutableSyncStatus {
9797
streams = status.streams;
9898
}
9999

100-
SyncStatus immutableSnapshot() {
100+
SyncStatus immutableSnapshot({bool setLastSynced = false}) {
101101
return SyncStatus(
102102
connected: connected,
103103
connecting: connecting,
@@ -106,7 +106,7 @@ final class MutableSyncStatus {
106106
downloadProgress: downloadProgress?.asSyncDownloadProgress,
107107
priorityStatusEntries: UnmodifiableListView(priorityStatusEntries),
108108
lastSyncedAt: lastSyncedAt,
109-
hasSynced: null, // Stream client is not supposed to set this value.
109+
hasSynced: setLastSynced ? lastSyncedAt != null : null,
110110
uploadError: uploadError,
111111
downloadError: downloadError,
112112
streamSubscriptions: streams,

packages/powersync_core/lib/src/sync/streaming_sync.dart

Lines changed: 18 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -135,7 +135,7 @@ class StreamingSyncImplementation implements StreamingSync {
135135
void updateSubscriptions(List<SubscribedStream> streams) {
136136
_activeSubscriptions = streams;
137137
if (_nonLineSyncEvents.hasListener) {
138-
_nonLineSyncEvents.add(const AbortCurrentIteration());
138+
_nonLineSyncEvents.add(HandleChangedSubscriptions(streams));
139139
}
140140
}
141141

@@ -464,6 +464,7 @@ class StreamingSyncImplementation implements StreamingSync {
464464
_state.updateStatus((s) => s.setConnected());
465465
await handleLine(line as StreamingSyncLine);
466466
case UploadCompleted():
467+
case HandleChangedSubscriptions():
467468
// Only relevant for the Rust sync implementation.
468469
break;
469470
case AbortCurrentIteration():
@@ -612,6 +613,12 @@ final class _ActiveRustStreamingIteration {
612613

613614
_ActiveRustStreamingIteration(this.sync);
614615

616+
List<Object?> _encodeSubscriptions(List<SubscribedStream> subscriptions) {
617+
return sync._activeSubscriptions
618+
.map((s) => {'name': s.name, 'params': s.parameters})
619+
.toList();
620+
}
621+
615622
Future<void> syncIteration() async {
616623
try {
617624
await _control(
@@ -620,9 +627,7 @@ final class _ActiveRustStreamingIteration {
620627
'parameters': sync.options.params,
621628
'schema': convert.json.decode(sync.schemaJson),
622629
'include_defaults': sync.options.includeDefaultStreams,
623-
'active_streams': sync._activeSubscriptions
624-
.map((s) => {'name': s.name, 'params': s.parameters})
625-
.toList(),
630+
'active_streams': _encodeSubscriptions(sync._activeSubscriptions),
626631
}),
627632
);
628633
assert(_completedStream.isCompleted, 'Should have started streaming');
@@ -672,6 +677,9 @@ final class _ActiveRustStreamingIteration {
672677
break loop;
673678
case TokenRefreshComplete():
674679
await _control('refreshed_token');
680+
case HandleChangedSubscriptions(:final currentSubscriptions):
681+
await _control('update_subscriptions',
682+
convert.json.encode(_encodeSubscriptions(currentSubscriptions)));
675683
}
676684
}
677685
}
@@ -761,3 +769,9 @@ final class TokenRefreshComplete implements SyncEvent {
761769
final class AbortCurrentIteration implements SyncEvent {
762770
const AbortCurrentIteration();
763771
}
772+
773+
final class HandleChangedSubscriptions implements SyncEvent {
774+
final List<SubscribedStream> currentSubscriptions;
775+
776+
HandleChangedSubscriptions(this.currentSubscriptions);
777+
}

packages/powersync_core/test/sync/stream_test.dart

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -183,4 +183,32 @@ void main() {
183183
),
184184
);
185185
});
186+
187+
test('changes subscriptions dynamically', () async {
188+
await waitForConnection();
189+
syncService.addKeepAlive();
190+
191+
final subscription = await database.syncStream('a').subscribe();
192+
syncService.endCurrentListener();
193+
final request = await syncService.waitForListener;
194+
expect(
195+
json.decode(await request.readAsString()),
196+
containsPair(
197+
'streams',
198+
containsPair('subscriptions', [
199+
{
200+
'stream': 'a',
201+
'parameters': null,
202+
'override_priority': null,
203+
},
204+
]),
205+
),
206+
);
207+
208+
// Given that the subscription has a TTL, dropping the handle should not
209+
// re-subscribe.
210+
await subscription.unsubscribe();
211+
await pumpEventQueue();
212+
expect(syncService.controller.hasListener, isTrue);
213+
});
186214
}

0 commit comments

Comments
 (0)