Skip to content

Commit 110de9a

Browse files
committed
RDBC-435 Changes Observable uneven connection state count prematurely closes changes connection
1 parent e425468 commit 110de9a

File tree

2 files changed

+82
-3
lines changed

2 files changed

+82
-3
lines changed

src/Documents/Changes/ChangesObservable.ts

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -59,11 +59,12 @@ export class ChangesObservable<T, TConnectionState extends IChangesConnectionSta
5959
public off(event: "error", handler: (error: Error) => void);
6060
public off(event: "data" | "error", handler: ((value: T) => void) | ((error: Error) => void)) {
6161

62-
this._connectionState.dec();
63-
6462
switch (event) {
6563
case "data":
66-
this._subscribers.delete(handler as (value: T) => void);
64+
if (this._subscribers.delete(handler as (value: T) => void)) {
65+
this._connectionState.dec();
66+
}
67+
6768
if (!this._subscribers.size) {
6869
// no more subscribers left - remove from parent
6970
this._connectionState.removeOnChangeNotification(this._type, this._sendHandler);

test/Ported/Issues/RDBC_435.ts

Lines changed: 78 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,78 @@
1+
import { testContext, disposeTestDocumentStore } from "../../Utils/TestUtil";
2+
3+
import {
4+
IDocumentStore, DocumentChange,
5+
} from "../../../src";
6+
import { User } from "../../Assets/Entities";
7+
import { AsyncQueue } from "../../Utils/AsyncQueue";
8+
import { throwError } from "../../../src/Exceptions";
9+
import { assertThat, assertThrows } from "../../Utils/AssertExtensions";
10+
11+
describe("RDBC_435", function () {
12+
13+
let store: IDocumentStore;
14+
15+
beforeEach(async function () {
16+
store = await testContext.getDocumentStore();
17+
});
18+
19+
afterEach(async () =>
20+
await disposeTestDocumentStore(store));
21+
22+
it("can handle data/errors on/off", async () => {
23+
24+
const createUser = async () => {
25+
const session = store.openSession();
26+
const user1 = new User();
27+
user1.age = 5;
28+
await session.store(user1);
29+
await session.saveChanges();
30+
31+
return user1.id;
32+
}
33+
34+
const changesList = new AsyncQueue<DocumentChange>();
35+
36+
const changes = store.changes();
37+
await changes.ensureConnectedNow();
38+
39+
const observable = changes.forDocumentsInCollection("users");
40+
await observable.ensureSubscribedNow();
41+
42+
const handler = (change: DocumentChange) => changesList.push(change);
43+
44+
const errorHandler = e => throwError("InvalidOperationException", e.message);
45+
46+
observable.on("data", handler);
47+
observable.on("error", errorHandler);
48+
49+
const user1Id = await createUser();
50+
51+
const change1 = await changesList.poll(15_000);
52+
assertThat(change1)
53+
.isNotNull();
54+
assertThat(change1.id)
55+
.isEqualTo(user1Id);
56+
57+
// now disable error handler, create next user and wait for change
58+
observable.off("error", errorHandler);
59+
60+
const user2Id = await createUser();
61+
62+
const change2 = await changesList.poll(15_000);
63+
assertThat(change2)
64+
.isNotNull();
65+
assertThat(change2.id)
66+
.isEqualTo(user2Id);
67+
68+
// disable data handle - not we shouldn't get any data
69+
70+
observable.off("data", handler);
71+
await createUser();
72+
73+
await assertThrows(async () => changesList.poll(3_000), err => {
74+
assertThat(err.name)
75+
.isEqualTo("TimeoutException");
76+
});
77+
});
78+
});

0 commit comments

Comments
 (0)