Skip to content

Commit 83e40e3

Browse files
authored
Merge pull request #189 from poissoncorp/RDBC-730
RDBC-730 Failing flaky tests
2 parents b25a774 + 0f7c567 commit 83e40e3

File tree

8 files changed

+84
-64
lines changed

8 files changed

+84
-64
lines changed

README.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -894,7 +894,7 @@ all_documents_changes = []
894894
# Subscribe for all documents, or for specific collection (or other database items)
895895
all_observer = self.store.changes().for_all_documents()
896896
897-
close_method = all_observer.subscribe(ActionObserver(on_next=all_documents_changes.append))
897+
close_method = all_observer.subscribe_with_observer(ActionObserver(on_next=all_documents_changes.append))
898898
all_observer.ensure_subscribe_now()
899899
900900
session = store.open_session()

ravendb/changes/observers.py

Lines changed: 38 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,9 @@
1-
from abc import ABCMeta, abstractmethod
1+
from __future__ import annotations
2+
from concurrent.futures import Future
3+
from threading import Lock
24
from typing import Callable, Generic, TypeVar
35

46
from ravendb.tools.concurrentset import ConcurrentSet
5-
from concurrent.futures import Future
6-
from threading import Lock
77

88
_T_Change = TypeVar("_T_Change")
99

@@ -32,21 +32,36 @@ def on_document_change_notification(self, value: _T_Change):
3232
except Exception as e:
3333
self.error(e)
3434

35-
def subscribe(self, observer) -> Callable[[], None]:
35+
def subscribe(self, on_next_callback: Callable[[], None]) -> Callable[[], None]:
3636
"""
37-
@param Observer or func observer: The observer that will do action when changes happens
37+
@param func on_next_callback: The action that observer will do when changes happens
3838
:return: method that close the subscriber
3939
"""
4040
self.inc()
41-
if not isinstance(observer, Observer):
42-
observer = ActionObserver(on_next=observer)
41+
observer = ActionObserver(on_next=on_next_callback)
4342
self._subscribers.add(observer)
4443

45-
def close_action():
44+
def close_action() -> None:
4645
self.dec()
4746
self._subscribers.remove(observer)
48-
if "on_complete" in observer.__dict__ and "__call__" in observer.on_complete.__dict__:
49-
observer.on_complete()
47+
if observer.on_completed_callback is not None:
48+
observer.on_completed()
49+
50+
return close_action
51+
52+
def subscribe_with_observer(self, observer: ActionObserver) -> Callable[[], None]:
53+
"""
54+
@param Observer observer: The observer that will do action when changes happens
55+
:return: method that close the subscriber
56+
"""
57+
self.inc()
58+
self._subscribers.add(observer)
59+
60+
def close_action() -> None:
61+
self.dec()
62+
self._subscribers.remove(observer)
63+
if observer.on_completed_callback is not None:
64+
observer.on_completed()
5065

5166
return close_action
5267

@@ -106,38 +121,28 @@ def send(self, msg: _T_Change):
106121
return
107122

108123

109-
class Observer(metaclass=ABCMeta):
110-
@abstractmethod
111-
def on_completed(self) -> None:
112-
pass
113-
114-
@abstractmethod
115-
def on_error(self, exception: Exception) -> None:
116-
pass
117-
118-
@abstractmethod
119-
def on_next(self, value: _T_Change) -> None:
120-
pass
121-
122-
123-
class ActionObserver(Observer):
124+
class ActionObserver:
124125
def __init__(
125126
self,
126127
on_next: Callable[[_T_Change], None],
127128
on_error: Callable[[Exception], None] = None,
128129
on_completed: Callable[[], None] = None,
129130
):
130-
self._on_next = on_next
131-
self._on_error = on_error
132-
self._on_completed = on_completed
131+
self._on_next_callback = on_next
132+
self._on_error_callback = on_error
133+
self._on_completed_callback = on_completed
133134

134135
def on_next(self, value: _T_Change) -> None:
135-
self._on_next(value)
136+
self._on_next_callback(value)
136137

137138
def on_error(self, exception: Exception) -> None:
138-
if self._on_error:
139-
self._on_error(exception)
139+
if self._on_error_callback:
140+
self._on_error_callback(exception)
140141

141142
def on_completed(self) -> None:
142-
if self._on_completed:
143-
self._on_completed()
143+
if self._on_completed_callback:
144+
self._on_completed_callback()
145+
146+
@property
147+
def on_completed_callback(self):
148+
return self._on_completed_callback

ravendb/tests/database_changes/test_database_changes.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,7 @@ def on_next(value):
3535
event.set()
3636

3737
all_observer = self.store.changes().for_all_documents()
38-
close_method = all_observer.subscribe(ActionObserver(on_next=on_next))
38+
close_method = all_observer.subscribe_with_observer(ActionObserver(on_next=on_next))
3939
all_observer.ensure_subscribe_now()
4040
with self.store.open_session() as session:
4141
session.store(User("Idan"))

ravendb/tests/jvm_migrated_tests/issues_tests/test_ravenDB_11703.py

Lines changed: 15 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,5 @@
1+
import threading
2+
import time
13
from queue import Queue
24

35
from ravendb.changes.types import CounterChange, CounterChangeTypes
@@ -49,9 +51,9 @@ def test_can_get_notification_about_counter_increment(self):
4951
close_method()
5052

5153
def test_can_get_notification_about_counter_delete(self):
52-
changes_queue = Queue()
54+
changes = []
5355
observer = self.store.changes().for_counters_of_document("users/1")
54-
close_method = observer.subscribe(changes_queue.put)
56+
close_method = observer.subscribe(changes.append)
5557

5658
with self.store.open_session() as session:
5759
session.store(User(), "users/1")
@@ -60,8 +62,13 @@ def test_can_get_notification_about_counter_delete(self):
6062
with self.store.open_session() as session:
6163
session.counters_for("users/1").increment("likes")
6264
session.save_changes()
65+
i = 0
66+
while i < 100 and not changes:
67+
i += 1
68+
time.sleep(0.1)
69+
70+
counter_change = changes.pop()
6371

64-
counter_change: CounterChange = changes_queue.get(timeout=2)
6572
self.assertIsNotNone(counter_change)
6673

6774
self.assertEqual("users/1", counter_change.document_id)
@@ -75,7 +82,11 @@ def test_can_get_notification_about_counter_delete(self):
7582
session.counters_for("users/1").delete("likes")
7683
session.save_changes()
7784

78-
counter_change = changes_queue.get(timeout=2)
85+
while i < 100 and not changes:
86+
i += 1
87+
time.sleep(0.1)
88+
89+
counter_change = changes.pop()
7990
self.assertIsNotNone(counter_change)
8091

8192
self.assertEqual("users/1", counter_change.document_id)

ravendb/tests/jvm_migrated_tests/issues_tests/test_ravenDB_13456.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,5 @@
1+
import unittest
2+
13
from ravendb.documents.operations.configuration import ClientConfiguration, PutClientConfigurationOperation
24
from ravendb.documents.operations.statistics import GetStatisticsOperation
35
from ravendb.documents.session.misc import SessionOptions, TransactionMode
@@ -10,6 +12,7 @@ class TestRavenDB13456(TestBase):
1012
def setUp(self):
1113
super().setUp()
1214

15+
@unittest.skip("Fails on cicd due to free license - adding the client configuration is disallowed")
1316
def test_can_change_identity_parts_separator(self):
1417
with self.store.open_session() as session:
1518
company1 = Company()

ravendb/tests/jvm_migrated_tests/issues_tests/test_ravenDB_13735.py

Lines changed: 3 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
import datetime
22
import time
3+
import unittest
34

45
from ravendb import DocumentStore, RefreshConfiguration
56
from ravendb.documents.operations.refresh.configuration import ConfigureRefreshOperation
@@ -19,7 +20,7 @@ def _setup_refresh(self, store: DocumentStore) -> None:
1920

2021
store.maintenance.send(ConfigureRefreshOperation(config))
2122

22-
# todo: flaky test that fails around 12AM - known issue
23+
@unittest.skip("Fails on cicd due to free license - refresh frequency is below allowed 36 hours")
2324
def test_refresh_will_update_document_change_vector(self):
2425
self._setup_refresh(self.store)
2526

@@ -29,16 +30,7 @@ def test_refresh_will_update_document_change_vector(self):
2930
session.store(user, "users/1-A")
3031

3132
datetime_now = datetime.datetime.now()
32-
hour_ago = datetime.datetime(
33-
year=datetime_now.year,
34-
month=datetime_now.month,
35-
day=datetime_now.day,
36-
hour=datetime_now.hour - 1,
37-
minute=datetime_now.minute,
38-
second=datetime_now.second,
39-
microsecond=datetime_now.microsecond,
40-
tzinfo=datetime_now.tzinfo,
41-
)
33+
hour_ago = datetime_now - datetime.timedelta(hours=1)
4234

4335
session.advanced.get_metadata_for(user)["@refresh"] = hour_ago.isoformat()
4436
session.save_changes()

ravendb/tests/jvm_migrated_tests/issues_tests/test_ravenDB_14006.py

Lines changed: 11 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
import time
12
import unittest
23
from typing import Optional
34

@@ -279,7 +280,7 @@ def __statistics_callback(stats: QueryStatistics):
279280
).statistics(__statistics_callback)
280281
)
281282
self.assertEqual(1, len(companies))
282-
self.assertGreater(statistics.duration_in_ms, 0)
283+
self.assertGreaterEqual(statistics.duration_in_ms, 0)
283284
number_of_requests = session.number_of_requests
284285
result_etag = statistics.result_etag
285286

@@ -309,6 +310,8 @@ def __statistics_callback(stats: QueryStatistics):
309310
)
310311
value.value.city = "Bydgoszcz"
311312
inner_session.save_changes()
313+
self.wait_for_indexing(self.store)
314+
time.sleep(1)
312315

313316
companies = list(
314317
session.advanced.raw_query(
@@ -319,7 +322,9 @@ def __statistics_callback(stats: QueryStatistics):
319322
+ "from Companies as c\n"
320323
+ "select incl(c)",
321324
Company,
322-
).statistics(__statistics_callback)
325+
)
326+
.statistics(__statistics_callback)
327+
.wait_for_non_stale_results()
323328
)
324329

325330
self.assertEqual(1, len(companies))
@@ -460,6 +465,8 @@ def __include_cmpxch(builder: QueryIncludeBuilder):
460465
value.value.city = "Bydgoszcz"
461466

462467
inner_session.save_changes()
468+
self.wait_for_indexing(self.store)
469+
time.sleep(0.3)
463470

464471
companies = list(
465472
session.query(object_type=Company).statistics(__statistics_callback).include(__include_cmpxch)
@@ -539,10 +546,12 @@ def __include_cmpxch(builder: QueryIncludeBuilder):
539546
inner_session.save_changes()
540547

541548
self.wait_for_indexing(self.store)
549+
time.sleep(0.3)
542550

543551
companies = list(
544552
session.query_index_type(Companies_ByName, Company)
545553
.statistics(__statistics_callback)
554+
.wait_for_non_stale_results()
546555
.include(__include_cmpxch)
547556
)
548557

ravendb/tests/jvm_migrated_tests/server_tests/documents/notifications/test_changes.py

Lines changed: 12 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -42,8 +42,8 @@ def __ev(value: DocumentChange):
4242
document_change = value
4343
event.set()
4444

45-
subscription = ActionObserver(__ev)
46-
close_action = observable.subscribe(subscription)
45+
observer = ActionObserver(__ev)
46+
close_action = observable.subscribe_with_observer(observer)
4747
observable.ensure_subscribe_now()
4848

4949
with self.store.open_session() as session:
@@ -81,8 +81,8 @@ def __ev(value: DocumentChange):
8181
if len(changes_list) == 2:
8282
event.set()
8383

84-
subscription = ActionObserver(__ev)
85-
close_action = observable.subscribe(subscription)
84+
observer = ActionObserver(__ev)
85+
close_action = observable.subscribe_with_observer(observer)
8686
observable.ensure_subscribe_now()
8787

8888
with self.store.open_session() as session:
@@ -117,8 +117,8 @@ def __ev(value: DocumentChange):
117117
document_change = value
118118
event.set()
119119

120-
subscription = ActionObserver(__ev)
121-
close_action = observable.subscribe(subscription)
120+
observer = ActionObserver(__ev)
121+
close_action = observable.subscribe_with_observer(observer)
122122
observable.ensure_subscribe_now()
123123

124124
with self.store.open_session() as session:
@@ -160,8 +160,8 @@ def __ev(value: IndexChange):
160160
index_change = value
161161
event.set()
162162

163-
subscription = ActionObserver(__ev)
164-
close_action = observable.subscribe(subscription)
163+
observer = ActionObserver(__ev)
164+
close_action = observable.subscribe_with_observer(observer)
165165
observable.ensure_subscribe_now()
166166

167167
operation = SetIndexesPriorityOperation(IndexPriority.LOW, index.index_name)
@@ -184,8 +184,8 @@ def __ev(value: IndexChange):
184184
index_change = value
185185
event.set()
186186

187-
subscription = ActionObserver(__ev)
188-
close_action = observable.subscribe(subscription)
187+
observer = ActionObserver(__ev)
188+
close_action = observable.subscribe_with_observer(observer)
189189
observable.ensure_subscribe_now()
190190

191191
operation = SetIndexesPriorityOperation(IndexPriority.LOW, index.index_name)
@@ -208,8 +208,8 @@ def __ev(value: DocumentChange):
208208
if len(document_changes) == 2:
209209
event.set()
210210

211-
subscription = ActionObserver(__ev)
212-
close_action = observable.subscribe(subscription)
211+
observer = ActionObserver(__ev)
212+
close_action = observable.subscribe_with_observer(observer)
213213
observable.ensure_subscribe_now()
214214

215215
with self.store.open_session() as session:

0 commit comments

Comments
 (0)