Skip to content

Commit f3c4df7

Browse files
close http streams better
1 parent bed68be commit f3c4df7

File tree

3 files changed

+17
-18
lines changed

3 files changed

+17
-18
lines changed

packages/common/src/client/sync/stream/AbstractRemote.ts

Lines changed: 9 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@ import { DataStream } from '../../../utils/DataStream.js';
88
import { PowerSyncCredentials } from '../../connection/PowerSyncCredentials.js';
99
import { WebsocketClientTransport } from './WebsocketClientTransport.js';
1010
import { StreamingSyncRequest } from './streaming-sync-types.js';
11-
;
11+
1212

1313
export type BSONImplementation = typeof BSON;
1414

@@ -570,17 +570,21 @@ export abstract class AbstractRemote {
570570
reader.releaseLock();
571571
};
572572

573+
574+
const stream = new DataStream<T, string>({
575+
logger: this.logger,
576+
mapLine: mapLine
577+
});
578+
573579
abortSignal?.addEventListener('abort', () => {
574580
closeReader();
581+
stream.close();
575582
});
576583

577584
const decoder = this.createTextDecoder();
578585
let buffer = '';
579586

580-
const stream = new DataStream<T, string>({
581-
logger: this.logger,
582-
mapLine: mapLine
583-
});
587+
584588

585589
const l = stream.registerListener({
586590
lowWater: async () => {

packages/node/tests/sync.test.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -484,6 +484,7 @@ function defineSyncTests(impl: SyncClientImplementation) {
484484

485485
// Re-open database
486486
await database.close();
487+
487488
await vi.waitFor(() => expect(syncService.connectedListeners).toHaveLength(0));
488489
database = await syncService.createDatabase();
489490
database.connect(new TestConnector(), options);

packages/web/tests/watch.test.ts

Lines changed: 7 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -669,32 +669,26 @@ describe('Watch Tests', { sequential: true }, () => {
669669

670670
let notificationCount = 0;
671671
const dispose = watch.registerListener({
672-
onStateChange: (state) => {
672+
onStateChange: () => {
673673
notificationCount++;
674674
}
675675
});
676676
onTestFinished(dispose);
677677

678678
// Wait for the initial load to complete
679-
await vi.waitFor(
680-
() => {
681-
expect(notificationCount).equals(1);
682-
},
683-
{ timeout: 1000 }
684-
);
679+
await vi.waitFor(() => {
680+
expect(notificationCount).equals(1);
681+
});
685682

686683
notificationCount = 0; // We want to count the number of state changes after the initial load
687684

688685
// Should only a state change trigger for this operation
689686
await powersync.execute('INSERT INTO assets(id, make, customer_id) VALUES (uuid(), ?, ?)', ['test', uuid()]);
690687

691688
// We should get an update for the change above
692-
await vi.waitFor(
693-
() => {
694-
expect(notificationCount).equals(1);
695-
},
696-
{ timeout: 1000 }
697-
);
689+
await vi.waitFor(() => {
690+
expect(notificationCount).equals(1);
691+
});
698692

699693
// Should not trigger any state change for these operations
700694
await powersync.execute('INSERT INTO assets(id, make, customer_id) VALUES (uuid(), ?, ?)', ['make1', uuid()]);

0 commit comments

Comments
 (0)