|
| 1 | +from ravendb.documents.subscriptions.revision import Revision |
| 2 | + |
| 3 | +from ravendb.documents.subscriptions.worker import SubscriptionBatch |
| 4 | + |
| 5 | +from ravendb.documents.subscriptions.options import SubscriptionWorkerOptions |
| 6 | + |
| 7 | +from ravendb.documents.operations.revisions import RevisionsCollectionConfiguration, RevisionsConfiguration, \ |
| 8 | + ConfigureRevisionsOperation |
| 9 | + |
| 10 | +from ravendb.tests.test_base import TestBase, User, Company |
| 11 | + |
| 12 | +import threading |
| 13 | + |
| 14 | + |
| 15 | +class TestRevisionsSubscriptions(TestBase): |
| 16 | + def setUp(self): |
| 17 | + super(TestRevisionsSubscriptions, self).setUp() |
| 18 | + |
| 19 | + def test_plain_revisions_subscriptions_compare_docs(self): |
| 20 | + subscription_id = self.store.subscriptions.create_for_revisions(object_type=User) |
| 21 | + |
| 22 | + default_collection = RevisionsCollectionConfiguration(minimum_revisions_to_keep=5) |
| 23 | + users_config = RevisionsCollectionConfiguration() |
| 24 | + dons_config = RevisionsCollectionConfiguration() |
| 25 | + |
| 26 | + per_collection_config = { "Users": users_config, "Dons": dons_config } |
| 27 | + configuration = RevisionsConfiguration(default_collection, per_collection_config) |
| 28 | + |
| 29 | + operation = ConfigureRevisionsOperation(configuration) |
| 30 | + self.store.maintenance.send(operation) |
| 31 | + |
| 32 | + for i in range(10): |
| 33 | + with self.store.open_session() as session: |
| 34 | + user = User(f"users1 ver {i}", i) |
| 35 | + session.store(user) |
| 36 | + |
| 37 | + company = Company(name=f"dons1 ver {i}") |
| 38 | + session.store(company) |
| 39 | + |
| 40 | + session.save_changes() |
| 41 | + |
| 42 | + sub = self.store.subscriptions.get_subscription_worker_for_revisions(SubscriptionWorkerOptions(subscription_id), object_type=User) |
| 43 | + |
| 44 | + mre = threading.Semaphore(0) |
| 45 | + names = [] |
| 46 | + max_age = -1 |
| 47 | + |
| 48 | + def process_batch(batch: SubscriptionBatch): |
| 49 | + nonlocal mre, names, max_age |
| 50 | + |
| 51 | + for item in batch.items: |
| 52 | + item: SubscriptionBatch.Item[Revision[User]] |
| 53 | + x = item.result |
| 54 | + |
| 55 | + if x.current.age > max_age and x.current.age > (x.previous.age if x.previous is not None else -1): |
| 56 | + current_name = x.current.name if x.current is not None else None |
| 57 | + previous_name = x.previous.name if x.previous is not None else None |
| 58 | + names.append(f"{current_name} + {previous_name}") |
| 59 | + max_age = x.current.age |
| 60 | + |
| 61 | + if len(names) == 10: |
| 62 | + mre.release() |
| 63 | + |
| 64 | + # todo kuba: RuntimeError: dictionary changed size during iteration |
| 65 | + # todo kuba: TypeError: 'NoneType' object does not support the context manager protocol |
| 66 | + sub.run(process_batch) |
| 67 | + |
| 68 | + self.assertTrue(mre.acquire(timeout=15)) |
0 commit comments