Skip to content

Commit 7cea2d7

Browse files
committed
RDBC-730 Improve Changes API
1 parent 6b2239d commit 7cea2d7

File tree

5 files changed

+54
-49
lines changed

5 files changed

+54
-49
lines changed

README.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -894,7 +894,7 @@ all_documents_changes = []
894894
# Subscribe for all documents, or for specific collection (or other database items)
895895
all_observer = self.store.changes().for_all_documents()
896896
897-
close_method = all_observer.subscribe(ActionObserver(on_next=all_documents_changes.append))
897+
close_method = all_observer.subscribe_with_observer(ActionObserver(on_next=all_documents_changes.append))
898898
all_observer.ensure_subscribe_now()
899899
900900
session = store.open_session()

ravendb/changes/observers.py

Lines changed: 38 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,9 @@
1-
from abc import ABCMeta, abstractmethod
1+
from __future__ import annotations
2+
from concurrent.futures import Future
3+
from threading import Lock
24
from typing import Callable, Generic, TypeVar
35

46
from ravendb.tools.concurrentset import ConcurrentSet
5-
from concurrent.futures import Future
6-
from threading import Lock
77

88
_T_Change = TypeVar("_T_Change")
99

@@ -32,21 +32,36 @@ def on_document_change_notification(self, value: _T_Change):
3232
except Exception as e:
3333
self.error(e)
3434

35-
def subscribe(self, observer) -> Callable[[], None]:
35+
def subscribe(self, on_next_callback: Callable[[], None]) -> Callable[[], None]:
3636
"""
37-
@param Observer or func observer: The observer that will do action when changes happens
37+
@param func on_next_callback: The action that observer will do when changes happens
3838
:return: method that close the subscriber
3939
"""
4040
self.inc()
41-
if not isinstance(observer, Observer):
42-
observer = ActionObserver(on_next=observer)
41+
observer = ActionObserver(on_next=on_next_callback)
4342
self._subscribers.add(observer)
4443

45-
def close_action():
44+
def close_action() -> None:
4645
self.dec()
4746
self._subscribers.remove(observer)
48-
if "on_complete" in observer.__dict__ and "__call__" in observer.on_complete.__dict__:
49-
observer.on_complete()
47+
if observer.on_completed_callback is not None:
48+
observer.on_completed()
49+
50+
return close_action
51+
52+
def subscribe_with_observer(self, observer: ActionObserver) -> Callable[[], None]:
53+
"""
54+
@param Observer observer: The observer that will do action when changes happens
55+
:return: method that close the subscriber
56+
"""
57+
self.inc()
58+
self._subscribers.add(observer)
59+
60+
def close_action() -> None:
61+
self.dec()
62+
self._subscribers.remove(observer)
63+
if observer.on_completed_callback is not None:
64+
observer.on_completed()
5065

5166
return close_action
5267

@@ -106,38 +121,28 @@ def send(self, msg: _T_Change):
106121
return
107122

108123

109-
class Observer(metaclass=ABCMeta):
110-
@abstractmethod
111-
def on_completed(self) -> None:
112-
pass
113-
114-
@abstractmethod
115-
def on_error(self, exception: Exception) -> None:
116-
pass
117-
118-
@abstractmethod
119-
def on_next(self, value: _T_Change) -> None:
120-
pass
121-
122-
123-
class ActionObserver(Observer):
124+
class ActionObserver:
124125
def __init__(
125126
self,
126127
on_next: Callable[[_T_Change], None],
127128
on_error: Callable[[Exception], None] = None,
128129
on_completed: Callable[[], None] = None,
129130
):
130-
self._on_next = on_next
131-
self._on_error = on_error
132-
self._on_completed = on_completed
131+
self._on_next_callback = on_next
132+
self._on_error_callback = on_error
133+
self._on_completed_callback = on_completed
133134

134135
def on_next(self, value: _T_Change) -> None:
135-
self._on_next(value)
136+
self._on_next_callback(value)
136137

137138
def on_error(self, exception: Exception) -> None:
138-
if self._on_error:
139-
self._on_error(exception)
139+
if self._on_error_callback:
140+
self._on_error_callback(exception)
140141

141142
def on_completed(self) -> None:
142-
if self._on_completed:
143-
self._on_completed()
143+
if self._on_completed_callback:
144+
self._on_completed_callback()
145+
146+
@property
147+
def on_completed_callback(self):
148+
return self._on_completed_callback

ravendb/tests/database_changes/test_database_changes.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,7 @@ def on_next(value):
3535
event.set()
3636

3737
all_observer = self.store.changes().for_all_documents()
38-
close_method = all_observer.subscribe(ActionObserver(on_next=on_next))
38+
close_method = all_observer.subscribe_with_observer(ActionObserver(on_next=on_next))
3939
all_observer.ensure_subscribe_now()
4040
with self.store.open_session() as session:
4141
session.store(User("Idan"))

ravendb/tests/jvm_migrated_tests/issues_tests/test_ravenDB_11703.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -61,7 +61,7 @@ def test_can_get_notification_about_counter_delete(self):
6161
session.counters_for("users/1").increment("likes")
6262
session.save_changes()
6363

64-
counter_change: CounterChange = changes_queue.get(timeout=20)
64+
counter_change: CounterChange = changes_queue.get(timeout=10)
6565
self.assertIsNotNone(counter_change)
6666

6767
self.assertEqual("users/1", counter_change.document_id)
@@ -75,7 +75,7 @@ def test_can_get_notification_about_counter_delete(self):
7575
session.counters_for("users/1").delete("likes")
7676
session.save_changes()
7777

78-
counter_change = changes_queue.get(timeout=20)
78+
counter_change = changes_queue.get(timeout=10)
7979
self.assertIsNotNone(counter_change)
8080

8181
self.assertEqual("users/1", counter_change.document_id)

ravendb/tests/jvm_migrated_tests/server_tests/documents/notifications/test_changes.py

Lines changed: 12 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -42,8 +42,8 @@ def __ev(value: DocumentChange):
4242
document_change = value
4343
event.set()
4444

45-
subscription = ActionObserver(__ev)
46-
close_action = observable.subscribe(subscription)
45+
observer = ActionObserver(__ev)
46+
close_action = observable.subscribe_with_observer(observer)
4747
observable.ensure_subscribe_now()
4848

4949
with self.store.open_session() as session:
@@ -81,8 +81,8 @@ def __ev(value: DocumentChange):
8181
if len(changes_list) == 2:
8282
event.set()
8383

84-
subscription = ActionObserver(__ev)
85-
close_action = observable.subscribe(subscription)
84+
observer = ActionObserver(__ev)
85+
close_action = observable.subscribe_with_observer(observer)
8686
observable.ensure_subscribe_now()
8787

8888
with self.store.open_session() as session:
@@ -117,8 +117,8 @@ def __ev(value: DocumentChange):
117117
document_change = value
118118
event.set()
119119

120-
subscription = ActionObserver(__ev)
121-
close_action = observable.subscribe(subscription)
120+
observer = ActionObserver(__ev)
121+
close_action = observable.subscribe_with_observer(observer)
122122
observable.ensure_subscribe_now()
123123

124124
with self.store.open_session() as session:
@@ -160,8 +160,8 @@ def __ev(value: IndexChange):
160160
index_change = value
161161
event.set()
162162

163-
subscription = ActionObserver(__ev)
164-
close_action = observable.subscribe(subscription)
163+
observer = ActionObserver(__ev)
164+
close_action = observable.subscribe_with_observer(observer)
165165
observable.ensure_subscribe_now()
166166

167167
operation = SetIndexesPriorityOperation(IndexPriority.LOW, index.index_name)
@@ -184,8 +184,8 @@ def __ev(value: IndexChange):
184184
index_change = value
185185
event.set()
186186

187-
subscription = ActionObserver(__ev)
188-
close_action = observable.subscribe(subscription)
187+
observer = ActionObserver(__ev)
188+
close_action = observable.subscribe_with_observer(observer)
189189
observable.ensure_subscribe_now()
190190

191191
operation = SetIndexesPriorityOperation(IndexPriority.LOW, index.index_name)
@@ -208,8 +208,8 @@ def __ev(value: DocumentChange):
208208
if len(document_changes) == 2:
209209
event.set()
210210

211-
subscription = ActionObserver(__ev)
212-
close_action = observable.subscribe(subscription)
211+
observer = ActionObserver(__ev)
212+
close_action = observable.subscribe_with_observer(observer)
213213
observable.ensure_subscribe_now()
214214

215215
with self.store.open_session() as session:

0 commit comments

Comments
 (0)