Skip to content

Commit 4d0fe53

Browse files
committed
Fix pushing into closed stream
1 parent 133353f commit 4d0fe53

File tree

2 files changed

+22
-3
lines changed

2 files changed

+22
-3
lines changed

packages/common/src/client/sync/stream/AbstractStreamingSyncImplementation.ts

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1071,7 +1071,9 @@ The next upload iteration will be delayed.`);
10711071
await control(PowerSyncControlCommand.START, JSON.stringify(options));
10721072

10731073
this.notifyCompletedUploads = () => {
1074-
controlInvocations?.enqueueData({ command: PowerSyncControlCommand.NOTIFY_CRUD_UPLOAD_COMPLETED });
1074+
if (controlInvocations && !controlInvocations?.closed) {
1075+
controlInvocations.enqueueData({ command: PowerSyncControlCommand.NOTIFY_CRUD_UPLOAD_COMPLETED });
1076+
}
10751077
};
10761078
await receivingLines;
10771079
} finally {

packages/node/tests/sync.test.ts

Lines changed: 19 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -873,8 +873,17 @@ function defineSyncTests(impl: SyncClientImplementation) {
873873
parameters: { a: 0 }
874874
});
875875

876+
await powersync.execute('insert into lists (id, name) values (?, ?);', ['local_list', 'local']);
877+
878+
await vi.waitFor(() =>
879+
expect(syncService.connectedListeners[0]).toMatchObject({
880+
parameters: { a: 1 }
881+
})
882+
);
883+
876884
syncService.pushLine({
877885
checkpoint: {
886+
write_checkpoint: '1',
878887
last_op_id: '1',
879888
buckets: [bucket('a', 1)]
880889
}
@@ -887,9 +896,17 @@ function defineSyncTests(impl: SyncClientImplementation) {
887896
checksum: 0,
888897
op_id: '1',
889898
op: 'PUT',
899+
object_id: 'local_list',
900+
object_type: 'lists',
901+
data: '{"name": "local"}'
902+
},
903+
{
904+
checksum: 0,
905+
op_id: '2',
906+
op: 'PUT',
890907
object_id: 'my_list',
891908
object_type: 'lists',
892-
data: '{"name": "l"}'
909+
data: '{"name": "r"}'
893910
}
894911
]
895912
}
@@ -898,7 +915,7 @@ function defineSyncTests(impl: SyncClientImplementation) {
898915

899916
await vi.waitFor(() =>
900917
expect(syncService.connectedListeners[0]).toMatchObject({
901-
parameters: { a: 1 }
918+
parameters: { a: 2 }
902919
})
903920
);
904921

0 commit comments

Comments
 (0)