From 3bde0ede82f59cabefdc60621d185de2fa4c13a5 Mon Sep 17 00:00:00 2001 From: Yuce Tekol Date: Wed, 1 Apr 2026 14:51:03 +0300 Subject: [PATCH 01/10] Added asyncio ringbuffer proxy --- hazelcast/internal/asyncio_client.py | 17 +- hazelcast/internal/asyncio_proxy/manager.py | 3 + .../internal/asyncio_proxy/ringbuffer.py | 299 ++++++++++++++++++ hazelcast/proxy/ringbuffer.py | 2 +- 4 files changed, 317 insertions(+), 4 deletions(-) create mode 100644 hazelcast/internal/asyncio_proxy/ringbuffer.py diff --git a/hazelcast/internal/asyncio_client.py b/hazelcast/internal/asyncio_client.py index f6e2d62954..b02d2a7222 100644 --- a/hazelcast/internal/asyncio_client.py +++ b/hazelcast/internal/asyncio_client.py @@ -7,7 +7,7 @@ from hazelcast.internal.asyncio_compact import CompactSchemaService from hazelcast.config import Config, IndexConfig from hazelcast.internal.asyncio_connection import ConnectionManager, DefaultAsyncioAddressProvider -from hazelcast.core import DistributedObjectEvent, DistributedObjectInfo +from hazelcast.core import DistributedObjectEvent from hazelcast.discovery import HazelcastCloudAddressProvider from hazelcast.errors import IllegalStateError, InvalidConfigurationError from hazelcast.internal.asyncio_invocation import InvocationService, Invocation @@ -18,7 +18,6 @@ from hazelcast.internal.asyncio_partition import PartitionService, InternalPartitionService from hazelcast.protocol.codec import ( client_add_distributed_object_listener_codec, - client_get_distributed_objects_codec, client_remove_distributed_object_listener_codec, dynamic_config_add_vector_collection_config_codec, ) @@ -27,12 +26,13 @@ MAP_SERVICE, ProxyManager, REPLICATED_MAP_SERVICE, + RINGBUFFER_SERVICE, VECTOR_SERVICE, ) -from hazelcast.internal.asyncio_proxy.base import Proxy from hazelcast.internal.asyncio_proxy.list import List from hazelcast.internal.asyncio_proxy.map import Map from hazelcast.internal.asyncio_proxy.replicated_map import ReplicatedMap +from 
hazelcast.internal.asyncio_proxy.ringbuffer import Ringbuffer from hazelcast.internal.asyncio_reactor import AsyncioReactor from hazelcast.serialization import SerializationServiceV1 from hazelcast.internal.asyncio_statistics import Statistics @@ -286,6 +286,17 @@ async def get_replicated_map(self, name: str) -> ReplicatedMap[KeyType, ValueTyp """ return await self._proxy_manager.get_or_create(REPLICATED_MAP_SERVICE, name) + async def get_ringbuffer(self, name: str) -> Ringbuffer: + """Returns the distributed Ringbuffer instance with the specified name. + + Args: + name: Name of the distributed ringbuffer. + + Returns: + Distributed Ringbuffer instance with the specified name. + """ + return await self._proxy_manager.get_or_create(RINGBUFFER_SERVICE, name) + async def create_vector_collection_config( self, name: str, diff --git a/hazelcast/internal/asyncio_proxy/manager.py b/hazelcast/internal/asyncio_proxy/manager.py index 812d8eeb72..c34a1659b4 100644 --- a/hazelcast/internal/asyncio_proxy/manager.py +++ b/hazelcast/internal/asyncio_proxy/manager.py @@ -11,11 +11,13 @@ from hazelcast.internal.asyncio_proxy.base import Proxy from hazelcast.internal.asyncio_proxy.map import create_map_proxy from hazelcast.internal.asyncio_proxy.replicated_map import create_replicated_map_proxy +from hazelcast.internal.asyncio_proxy.ringbuffer import create_ringbuffer_proxy from hazelcast.util import to_list LIST_SERVICE = "hz:impl:listService" MAP_SERVICE = "hz:impl:mapService" REPLICATED_MAP_SERVICE = "hz:impl:replicatedMapService" +RINGBUFFER_SERVICE = "hz:impl:ringbufferService" VECTOR_SERVICE = "hz:service:vector" _proxy_init: typing.Dict[ @@ -25,6 +27,7 @@ LIST_SERVICE: create_list_proxy, MAP_SERVICE: create_map_proxy, REPLICATED_MAP_SERVICE: create_replicated_map_proxy, + RINGBUFFER_SERVICE: create_ringbuffer_proxy, VECTOR_SERVICE: create_vector_collection_proxy, } diff --git a/hazelcast/internal/asyncio_proxy/ringbuffer.py b/hazelcast/internal/asyncio_proxy/ringbuffer.py new 
file mode 100644 index 0000000000..77021e6162 --- /dev/null +++ b/hazelcast/internal/asyncio_proxy/ringbuffer.py @@ -0,0 +1,299 @@ +import typing + +from hazelcast.protocol.codec import ( + ringbuffer_add_all_codec, + ringbuffer_add_codec, + ringbuffer_capacity_codec, + ringbuffer_head_sequence_codec, + ringbuffer_read_many_codec, + ringbuffer_read_one_codec, + ringbuffer_remaining_capacity_codec, + ringbuffer_size_codec, + ringbuffer_tail_sequence_codec, +) +from hazelcast.internal.asyncio_proxy.base import PartitionSpecificProxy +from hazelcast.proxy.ringbuffer import ReadResult, OVERFLOW_POLICY_OVERWRITE, MAX_BATCH_SIZE +from hazelcast.types import ItemType +from hazelcast.serialization.compact import SchemaNotReplicatedError +from hazelcast.util import ( + check_not_negative, + check_not_none, + check_not_empty, + check_true, + deserialize_list_in_place, +) + + +class Ringbuffer(PartitionSpecificProxy, typing.Generic[ItemType]): + """A Ringbuffer is an append-only data-structure where the content is + stored in a ring like structure. + + A ringbuffer has a capacity so it won't grow beyond that capacity and + endanger the stability of the system. If that capacity is exceeded, then + the oldest item in the ringbuffer is overwritten. The ringbuffer has two + always incrementing sequences: + + - :func:`tail_sequence`: This is the side where the youngest item is found. + So the tail is the side of the ringbuffer where items are added to. + - :func:`head_sequence`: This is the side where the oldest items are found. + So the head is the side where items gets discarded. + + The items in the ringbuffer can be found by a sequence that is in between + (inclusive) the head and tail sequence. + + If data is read from a ringbuffer with a sequence that is smaller than the + head sequence, it means that the data is not available anymore and a + :class:`hazelcast.errors.StaleSequenceError` is thrown. 
+ + A Ringbuffer currently is a replicated, but not partitioned data structure. + So all data is stored in a single partition, similarly to the + :class:`hazelcast.internal.asyncio_proxy.queue.Queue` implementation. + + A Ringbuffer can be used in a way similar to the Queue, but one of the key + differences is that a :func:`hazelcast.internal.asyncio_proxy.queue.Queue.take` + is destructive, meaning that only 1 consumer is able to take an item. + A :func:`read_one` is not destructive, so you can have multiple consumers reading the + same item multiple times. + + Example: + >>> rb = await client.get_ringbuffer("my_ringbuffer") + >>> await rb.add("item") + >>> print("read_one", await rb.read_one(0)) + """ + + def __init__(self, service_name, name, context): + super(Ringbuffer, self).__init__(service_name, name, context) + self._capacity = None + + async def capacity(self) -> int: + """Returns the capacity of this Ringbuffer. + + Returns: + The capacity of Ringbuffer. + """ + if not self._capacity: + + def handler(message): + self._capacity = ringbuffer_capacity_codec.decode_response(message) + return self._capacity + + request = ringbuffer_capacity_codec.encode_request(self.name) + return await self._invoke(request, handler) + + return self._capacity + + async def size(self) -> int: + """Returns number of items in the Ringbuffer. + + Returns: + The size of Ringbuffer. + """ + request = ringbuffer_size_codec.encode_request(self.name) + return await self._invoke(request, ringbuffer_size_codec.decode_response) + + async def tail_sequence(self) -> int: + """Returns the sequence of the tail. + + The tail is the side of the Ringbuffer where the items are added to. + The initial value of the tail is ``-1``. + + Returns: + The sequence of the tail. 
+ """ + request = ringbuffer_tail_sequence_codec.encode_request(self.name) + return await self._invoke(request, ringbuffer_tail_sequence_codec.decode_response) + + async def head_sequence(self) -> int: + """Returns the sequence of the head. + + The head is the side of the Ringbuffer where the oldest items in the + Ringbuffer are found. If the Ringbuffer is empty, the head will be one + more than the tail. The initial value of the head is ``0`` (``1`` more + than tail). + + Returns: + The sequence of the head. + """ + request = ringbuffer_head_sequence_codec.encode_request(self.name) + return await self._invoke(request, ringbuffer_head_sequence_codec.decode_response) + + async def remaining_capacity(self) -> int: + """Returns the remaining capacity of the Ringbuffer. + + Returns: + The remaining capacity of Ringbuffer. + """ + request = ringbuffer_remaining_capacity_codec.encode_request(self.name) + return await self._invoke(request, ringbuffer_remaining_capacity_codec.decode_response) + + async def add(self, item, overflow_policy: int = OVERFLOW_POLICY_OVERWRITE) -> int: + """Adds the specified item to the tail of the Ringbuffer. + + If there is no space in the Ringbuffer, the action is determined by + ``overflow_policy``. + + Args: + item: The specified item to be added. + overflow_policy: the OverflowPolicy to be used when there is no + space. + + Returns: + The sequenceId of the added item, or ``-1`` if the add failed. + """ + try: + item_data = self._to_data(item) + except SchemaNotReplicatedError as e: + return await self._send_schema_and_retry(e, self.add, item, overflow_policy) + + request = ringbuffer_add_codec.encode_request(self.name, overflow_policy, item_data) + return await self._invoke(request, ringbuffer_add_codec.decode_response) + + async def add_all( + self, + items: typing.Sequence[ItemType], + overflow_policy: int = OVERFLOW_POLICY_OVERWRITE, + ) -> int: + """Adds all of the item in the specified collection to the tail of the + Ringbuffer. 
+ + This is likely to outperform multiple calls to :func:`add` due + to better io utilization and a reduced number of executed operations. + The items are added in the order of the Iterator of the collection. + + If there is no space in the Ringbuffer, the action is determined by + ``overflow_policy``. + + Args: + items: The specified collection which contains the items to be + added. + overflow_policy: The OverflowPolicy to be used when there is no + space. + + Returns: + The sequenceId of the last written item, or ``-1`` of the last + write is failed. + """ + check_not_empty(items, "items can't be empty") + if len(items) > MAX_BATCH_SIZE: + raise AssertionError("Batch size can't be greater than %d" % MAX_BATCH_SIZE) + + try: + item_data_list = [] + for item in items: + check_not_none(item, "item can't be None") + item_data_list.append(self._to_data(item)) + except SchemaNotReplicatedError as e: + return await self._send_schema_and_retry(e, self.add_all, items, overflow_policy) + + request = ringbuffer_add_all_codec.encode_request( + self.name, item_data_list, overflow_policy + ) + return await self._invoke(request, ringbuffer_add_all_codec.decode_response) + + async def read_one(self, sequence: int) -> ItemType: + """Reads one item from the Ringbuffer. + + If the sequence is one beyond the current tail, this call blocks until + an item is added. Currently it isn't possible to control how long + this call is going to block. + + Args: + sequence: The sequence of the item to read. + + Returns: + The read item. 
+ """ + check_not_negative(sequence, "sequence can't be smaller than 0") + + def handler(message): + return self._to_object(ringbuffer_read_one_codec.decode_response(message)) + + request = ringbuffer_read_one_codec.encode_request(self.name, sequence) + return await self._invoke(request, handler) + + async def read_many( + self, start_sequence: int, min_count: int, max_count: int, filter: typing.Any = None + ) -> ReadResult: + """Reads a batch of items from the Ringbuffer. + + If the number of available items after the first read item is smaller + than the ``max_count``, these items are returned. So it could be the + number of items read is smaller than the ``max_count``. If there are + less items available than ``min_count``, then this call blocks. + + Warnings: + These blocking calls consume server memory and if there are many + calls, it can be possible to see leaking memory or + ``OutOfMemoryError`` s on the server. + + Reading a batch of items is likely to perform better because less + overhead is involved. + + A filter can be provided to only select items that need to be read. If + the filter is ``None``, all items are read. If the filter is not + ``None``, only items where the filter function returns true are + returned. Using filters is a good way to prevent getting items that + are of no value to the receiver. This reduces the amount of IO and the + number of operations being executed, and can result in a significant + performance improvement. Note that, filtering logic must be defined + on the server-side. + + If the ``start_sequence`` is smaller than the smallest sequence still + available in the Ringbuffer (:func:`head_sequence`), then the smallest + available sequence will be used as the start sequence and the + minimum/maximum number of items will be attempted to be read from there + on. 
+ + If the ``start_sequence`` is bigger than the last available sequence + in the Ringbuffer (:func:`tail_sequence`), then the last available + sequence plus one will be used as the start sequence and the call will + block until further items become available and it can read at least the + minimum number of items. + + Args: + start_sequence: The start sequence of the first item to read. + min_count: The minimum number of items to read. + max_count: The maximum number of items to read. + filter: Filter to select returned elements. + + Returns: + The list of read items. + """ + check_not_negative(start_sequence, "sequence can't be smaller than 0") + check_not_negative(min_count, "min count can't be smaller than 0") + check_true(max_count >= min_count, "max count should be greater or equal to min count") + check_true( + max_count < MAX_BATCH_SIZE, "max count can't be greater than %d" % MAX_BATCH_SIZE + ) + try: + filter_data = self._to_data(filter) + except SchemaNotReplicatedError as e: + return await self._send_schema_and_retry( + e, self.read_many, start_sequence, min_count, max_count, filter + ) + + # Since the first call to capacity is cached on the client-side, + # doing a capacity check each time should not be a problem. 
+ capacity = await self.capacity() + check_true( + max_count <= capacity, + "max count: %d should be smaller or equal to capacity: %d" % (max_count, capacity), + ) + + request = ringbuffer_read_many_codec.encode_request( + self.name, start_sequence, min_count, max_count, filter_data + ) + + def handler(message): + response = ringbuffer_read_many_codec.decode_response(message) + items = deserialize_list_in_place(response["items"], self._to_object) + read_count = response["read_count"] + next_seq = response["next_seq"] + item_seqs = response["item_seqs"] + return ReadResult(read_count, next_seq, item_seqs, items) + + return await self._invoke(request, handler) + + +async def create_ringbuffer_proxy(service_name, name, context): + return Ringbuffer(service_name, name, context) diff --git a/hazelcast/proxy/ringbuffer.py b/hazelcast/proxy/ringbuffer.py index 73669b04e6..710281c68d 100644 --- a/hazelcast/proxy/ringbuffer.py +++ b/hazelcast/proxy/ringbuffer.py @@ -152,7 +152,7 @@ class Ringbuffer(PartitionSpecificProxy["BlockingRingbuffer"], typing.Generic[It stored in a ring like structure. A ringbuffer has a capacity so it won't grow beyond that capacity and - endanger the stability of the system. If that capacity is exceeded, than + endanger the stability of the system. If that capacity is exceeded, then the oldest item in the ringbuffer is overwritten. 
The ringbuffer has two always incrementing sequences: From 4cfaaaa208af1db71323d850d2fdcbd98f91fdc8 Mon Sep 17 00:00:00 2001 From: Yuce Tekol Date: Wed, 1 Apr 2026 16:15:06 +0300 Subject: [PATCH 02/10] Some doc improvements --- .../internal/asyncio_proxy/ringbuffer.py | 17 +++++++------- hazelcast/proxy/ringbuffer.py | 23 ++++++++----------- 2 files changed, 18 insertions(+), 22 deletions(-) diff --git a/hazelcast/internal/asyncio_proxy/ringbuffer.py b/hazelcast/internal/asyncio_proxy/ringbuffer.py index 77021e6162..da1dcf5471 100644 --- a/hazelcast/internal/asyncio_proxy/ringbuffer.py +++ b/hazelcast/internal/asyncio_proxy/ringbuffer.py @@ -153,7 +153,7 @@ async def add_all( items: typing.Sequence[ItemType], overflow_policy: int = OVERFLOW_POLICY_OVERWRITE, ) -> int: - """Adds all of the item in the specified collection to the tail of the + """Adds all items in the specified collection to the tail of the Ringbuffer. This is likely to outperform multiple calls to :func:`add` due @@ -194,7 +194,7 @@ async def read_one(self, sequence: int) -> ItemType: """Reads one item from the Ringbuffer. If the sequence is one beyond the current tail, this call blocks until - an item is added. Currently it isn't possible to control how long + an item is added. Currently, it isn't possible to control how long this call is going to block. Args: @@ -217,21 +217,20 @@ async def read_many( """Reads a batch of items from the Ringbuffer. If the number of available items after the first read item is smaller - than the ``max_count``, these items are returned. So it could be the - number of items read is smaller than the ``max_count``. If there are - less items available than ``min_count``, then this call blocks. + than ``max_count``, these items are returned. So, number of items + read may be smaller than ``max_count``. If there are + fewer items available than ``min_count``, then this call blocks. 
Warnings: These blocking calls consume server memory and if there are many - calls, it can be possible to see leaking memory or - ``OutOfMemoryError`` s on the server. + calls, an ``OutOfMemoryError`` may be thrown on server-side. Reading a batch of items is likely to perform better because less overhead is involved. - A filter can be provided to only select items that need to be read. If + A filter can be provided to select items that need to be read. If the filter is ``None``, all items are read. If the filter is not - ``None``, only items where the filter function returns true are + ``None``, items where the filter function returns true are returned. Using filters is a good way to prevent getting items that are of no value to the receiver. This reduces the amount of IO and the number of operations being executed, and can result in a significant diff --git a/hazelcast/proxy/ringbuffer.py b/hazelcast/proxy/ringbuffer.py index 710281c68d..94bcec17bd 100644 --- a/hazelcast/proxy/ringbuffer.py +++ b/hazelcast/proxy/ringbuffer.py @@ -271,7 +271,7 @@ def add_all( items: typing.Sequence[ItemType], overflow_policy: int = OVERFLOW_POLICY_OVERWRITE, ) -> Future[int]: - """Adds all of the item in the specified collection to the tail of the + """Adds all items in the specified collection to the tail of the Ringbuffer. This is likely to outperform multiple calls to :func:`add` due @@ -312,7 +312,7 @@ def read_one(self, sequence: int) -> Future[ItemType]: """Reads one item from the Ringbuffer. If the sequence is one beyond the current tail, this call blocks until - an item is added. Currently it isn't possible to control how long + an item is added. Currently, it isn't possible to control how long this call is going to block. Args: @@ -335,21 +335,20 @@ def read_many( """Reads a batch of items from the Ringbuffer. If the number of available items after the first read item is smaller - than the ``max_count``, these items are returned. 
So it could be the - number of items read is smaller than the ``max_count``. If there are - less items available than ``min_count``, then this call blocks. + than ``max_count``, these items are returned. So, number of items + read may be smaller than ``max_count``. If there are + fewer items available than ``min_count``, then this call blocks. Warnings: These blocking calls consume server memory and if there are many - calls, it can be possible to see leaking memory or - ``OutOfMemoryError`` s on the server. + calls, an ``OutOfMemoryError`` may be thrown on server-side. Reading a batch of items is likely to perform better because less overhead is involved. - A filter can be provided to only select items that need to be read. If + A filter can be provided to select items that need to be read. If the filter is ``None``, all items are read. If the filter is not - ``None``, only items where the filter function returns true are + ``None``, items where the filter function returns true are returned. Using filters is a good way to prevent getting items that are of no value to the receiver. This reduces the amount of IO and the number of operations being executed, and can result in a significant @@ -404,10 +403,8 @@ def handler(message): return ReadResult(read_count, next_seq, item_seqs, items) def continuation(future): - # Since the first call to capacity - # is cached on the client-side, doing - # a capacity check each time should not - # be a problem + # Since the first call to capacity is cached on the client-side, + # doing a capacity check each time should not be a problem. 
capacity = future.result() check_true( From 14e751cac91a6a58ee03bc7825ce1e53b08cacca Mon Sep 17 00:00:00 2001 From: Yuce Tekol Date: Thu, 2 Apr 2026 08:50:49 +0300 Subject: [PATCH 03/10] Ported ringbuffer test --- .../asyncio/proxy/ringbuffer_test.py | 307 ++++++++++++++++++ 1 file changed, 307 insertions(+) create mode 100644 tests/integration/asyncio/proxy/ringbuffer_test.py diff --git a/tests/integration/asyncio/proxy/ringbuffer_test.py b/tests/integration/asyncio/proxy/ringbuffer_test.py new file mode 100644 index 0000000000..8c5ea7ae3b --- /dev/null +++ b/tests/integration/asyncio/proxy/ringbuffer_test.py @@ -0,0 +1,307 @@ +import asyncio +import os +import unittest + +from hazelcast.proxy.ringbuffer import OVERFLOW_POLICY_FAIL, MAX_BATCH_SIZE +from hazelcast.serialization.api import IdentifiedDataSerializable +from tests.integration.asyncio.base import SingleMemberTestCase +from tests.util import random_string, compare_client_version + +CAPACITY = 10 + + +class RingBufferTest(SingleMemberTestCase): + @classmethod + def configure_client(cls, config): + config["cluster_name"] = cls.cluster.id + return config + + @classmethod + def configure_cluster(cls): + path = os.path.abspath(__file__) + dir_path = os.path.dirname(path) + xml_path = os.path.join( + dir_path, "../../backward_compatible/proxy/hazelcast.xml" + ) + with open(xml_path) as f: + return f.read() + + async def asyncSetUp(self): + await super().asyncSetUp() + self.ringbuffer = await self.client.get_ringbuffer( + "ClientRingbufferTestWithTTL-" + random_string() + ) + + async def asyncTearDown(self): + await self.ringbuffer.destroy() + await super().asyncTearDown() + + async def test_capacity(self): + self.assertEqual(await self.ringbuffer.capacity(), CAPACITY) + + async def test_add_size(self): + self.assertEqual(0, await self.ringbuffer.add("value")) + self.assertEqual(1, await self.ringbuffer.add("value")) + self.assertEqual(2, await self.ringbuffer.add("value")) + + self.assertEqual(3, await 
self.ringbuffer.size()) + + async def test_add_when_full(self): + await self.fill_ringbuffer() + + self.assertEqual(-1, await self.ringbuffer.add(CAPACITY + 1, OVERFLOW_POLICY_FAIL)) + + async def test_add_all(self): + self.assertEqual(CAPACITY - 1, await self.ringbuffer.add_all(list(range(0, CAPACITY)))) + + async def test_add_all_when_full(self): + self.assertEqual( + -1, await self.ringbuffer.add_all(list(range(0, CAPACITY * 2)), OVERFLOW_POLICY_FAIL) + ) + + async def test_add_all_when_empty_list(self): + with self.assertRaises(AssertionError): + await self.ringbuffer.add_all([]) + + async def test_add_all_when_too_large_batch(self): + with self.assertRaises(AssertionError): + await self.ringbuffer.add_all(list(range(0, MAX_BATCH_SIZE + 1))) + + async def test_head_sequence(self): + await self.fill_ringbuffer(CAPACITY * 2) + + self.assertEqual(CAPACITY, await self.ringbuffer.head_sequence()) + + async def test_tail_sequence(self): + await self.fill_ringbuffer(CAPACITY * 2) + + self.assertEqual(CAPACITY * 2 - 1, await self.ringbuffer.tail_sequence()) + + async def test_remaining_capacity(self): + await self.fill_ringbuffer(CAPACITY // 2) + + self.assertEqual(CAPACITY // 2, await self.ringbuffer.remaining_capacity()) + + async def test_read_one(self): + await self.ringbuffer.add("item") + await self.ringbuffer.add("item-2") + await self.ringbuffer.add("item-3") + self.assertEqual("item", await self.ringbuffer.read_one(0)) + self.assertEqual("item-2", await self.ringbuffer.read_one(1)) + self.assertEqual("item-3", await self.ringbuffer.read_one(2)) + + async def test_read_one_negative_sequence(self): + with self.assertRaises(AssertionError): + await self.ringbuffer.read_one(-1) + + async def test_read_many(self): + await self.fill_ringbuffer(CAPACITY) + items = await self.ringbuffer.read_many(0, 0, CAPACITY) + self.assertEqual(items, list(range(0, CAPACITY))) + + async def test_read_many_when_negative_start_seq(self): + with self.assertRaises(AssertionError): + 
await self.ringbuffer.read_many(-1, 0, CAPACITY) + + async def test_read_many_when_min_count_greater_than_max_count(self): + with self.assertRaises(AssertionError): + await self.ringbuffer.read_many(0, CAPACITY, 0) + + async def test_read_many_when_min_count_greater_than_capacity(self): + with self.assertRaises(AssertionError): + await self.ringbuffer.read_many(0, CAPACITY + 1, CAPACITY + 1) + + async def test_read_many_when_max_count_greater_than_batch_size(self): + with self.assertRaises(AssertionError): + await self.ringbuffer.read_many(0, 0, MAX_BATCH_SIZE + 1) + + async def fill_ringbuffer(self, n=CAPACITY): + for x in range(0, n): + await self.ringbuffer.add(x) + + async def test_str(self): + self.assertTrue(str(self.ringbuffer).startswith("Ringbuffer")) + + +@unittest.skipIf( + compare_client_version("4.1") < 0, "Tests the features added in 4.1 version of the client" +) +class RingbufferReadManyTest(SingleMemberTestCase): + @classmethod + def configure_client(cls, config): + config["cluster_name"] = cls.cluster.id + return config + + @classmethod + def configure_cluster(cls): + path = os.path.abspath(__file__) + dir_path = os.path.dirname(path) + xml_path = os.path.join( + dir_path, "../../backward_compatible/proxy/hazelcast.xml" + ) + with open(xml_path) as f: + return f.read() + + async def asyncSetUp(self): + await super().asyncSetUp() + self.ringbuffer = await self.client.get_ringbuffer( + "ClientRingbufferTestWithTTL-" + random_string() + ) + + async def asyncTearDown(self): + await self.ringbuffer.destroy() + await super().asyncTearDown() + + async def test_when_start_sequence_is_no_longer_available_gets_clamped(self): + await self.fill_ringbuffer(item_count=CAPACITY + 1) + + result_set = await self.ringbuffer.read_many(0, 1, CAPACITY) + self.assertEqual(CAPACITY, result_set.read_count) + self.assertEqual(CAPACITY, result_set.size) + self.assertEqual(CAPACITY + 1, result_set.next_sequence_to_read_from) + + for i in range(1, CAPACITY + 1): + 
self.assertEqual(i, result_set[i - 1]) + self.assertEqual(i, result_set.get_sequence(i - 1)) + + async def test_when_start_sequence_is_equal_to_tail_sequence(self): + await self.fill_ringbuffer() + + result_set = await self.ringbuffer.read_many(CAPACITY - 1, 1, CAPACITY) + self.assertEqual(1, result_set.read_count) + self.assertEqual(1, result_set.size) + self.assertEqual(CAPACITY, result_set.next_sequence_to_read_from) + self.assertEqual(CAPACITY - 1, result_set[0]) + self.assertEqual(CAPACITY - 1, result_set.get_sequence(0)) + + async def test_when_start_sequence_is_beyond_tail_sequence_then_blocks(self): + await self.fill_ringbuffer() + + task = asyncio.create_task(self.ringbuffer.read_many(CAPACITY + 1, 1, CAPACITY)) + await asyncio.sleep(0.5) + self.assertFalse(task.done()) + task.cancel() + try: + await task + except asyncio.CancelledError: + pass + + async def test_when_min_count_items_are_not_available_then_blocks(self): + await self.fill_ringbuffer() + + task = asyncio.create_task(self.ringbuffer.read_many(CAPACITY - 1, 2, 3)) + await asyncio.sleep(0.5) + self.assertFalse(task.done()) + task.cancel() + try: + await task + except asyncio.CancelledError: + pass + + async def test_when_some_waiting_needed(self): + await self.fill_ringbuffer() + + task = asyncio.create_task(self.ringbuffer.read_many(CAPACITY - 1, 2, 3)) + await asyncio.sleep(0.5) + self.assertFalse(task.done()) + + await self.ringbuffer.add(CAPACITY) + + await self.assertTrueEventually(lambda: self.assertTrue(task.done())) + + result_set = task.result() + self.assertEqual(2, result_set.read_count) + self.assertEqual(2, result_set.size) + self.assertEqual(CAPACITY + 1, result_set.next_sequence_to_read_from) + self.assertEqual(CAPACITY - 1, result_set[0]) + self.assertEqual(CAPACITY - 1, result_set.get_sequence(0)) + self.assertEqual(CAPACITY, result_set[1]) + self.assertEqual(CAPACITY, result_set.get_sequence(1)) + + async def test_min_zero_when_item_available(self): + await 
self.fill_ringbuffer() + + result_set = await self.ringbuffer.read_many(0, 0, 1) + + self.assertEqual(1, result_set.read_count) + self.assertEqual(1, result_set.size) + + async def test_min_zero_when_no_item_available(self): + result_set = await self.ringbuffer.read_many(0, 0, 1) + + self.assertEqual(0, result_set.read_count) + self.assertEqual(0, result_set.size) + + async def test_max_count(self): + # If more results are available than needed, the surplus results + # should not be read. + await self.fill_ringbuffer() + + max_count = CAPACITY // 2 + result_set = await self.ringbuffer.read_many(0, 0, max_count) + self.assertEqual(max_count, result_set.read_count) + self.assertEqual(max_count, result_set.size) + self.assertEqual(max_count, result_set.next_sequence_to_read_from) + + for i in range(max_count): + self.assertEqual(i, result_set[i]) + self.assertEqual(i, result_set.get_sequence(i)) + + async def test_filter(self): + def item_factory(i): + if i % 2 == 0: + return "good%s" % i + return "bad%s" % i + + await self.fill_ringbuffer(item_factory) + + expected_size = CAPACITY // 2 + + result_set = await self.ringbuffer.read_many(0, 0, CAPACITY, PrefixFilter("good")) + self.assertEqual(CAPACITY, result_set.read_count) + self.assertEqual(expected_size, result_set.size) + self.assertEqual(CAPACITY, result_set.next_sequence_to_read_from) + + for i in range(expected_size): + self.assertEqual(item_factory(i * 2), result_set[i]) + self.assertEqual(i * 2, result_set.get_sequence(i)) + + async def test_filter_with_max_count(self): + def item_factory(i): + if i % 2 == 0: + return "good%s" % i + return "bad%s" % i + + await self.fill_ringbuffer(item_factory) + + expected_size = 3 + + result_set = await self.ringbuffer.read_many(0, 0, expected_size, PrefixFilter("good")) + self.assertEqual(expected_size * 2 - 1, result_set.read_count) + self.assertEqual(expected_size, result_set.size) + self.assertEqual(expected_size * 2 - 1, result_set.next_sequence_to_read_from) + + for i 
in range(expected_size): + self.assertEqual(item_factory(i * 2), result_set[i]) + self.assertEqual(i * 2, result_set.get_sequence(i)) + + async def fill_ringbuffer(self, item_factory=lambda i: i, item_count=CAPACITY): + for i in range(0, item_count): + await self.ringbuffer.add(item_factory(i)) + + +class PrefixFilter(IdentifiedDataSerializable): + def __init__(self, prefix): + self.prefix = prefix + + def write_data(self, object_data_output): + object_data_output.write_string(self.prefix) + + def read_data(self, object_data_input): + self.prefix = object_data_input.read_string() + + def get_factory_id(self): + return 666 + + def get_class_id(self): + return 14 From bc258ca1667c0999f5d8fc4d54c90cdb8bdd589e Mon Sep 17 00:00:00 2001 From: Yuce Tekol Date: Thu, 2 Apr 2026 08:56:01 +0300 Subject: [PATCH 04/10] black --- tests/integration/asyncio/proxy/ringbuffer_test.py | 8 ++------ 1 file changed, 2 insertions(+), 6 deletions(-) diff --git a/tests/integration/asyncio/proxy/ringbuffer_test.py b/tests/integration/asyncio/proxy/ringbuffer_test.py index 8c5ea7ae3b..76cc72e31e 100644 --- a/tests/integration/asyncio/proxy/ringbuffer_test.py +++ b/tests/integration/asyncio/proxy/ringbuffer_test.py @@ -20,9 +20,7 @@ def configure_client(cls, config): def configure_cluster(cls): path = os.path.abspath(__file__) dir_path = os.path.dirname(path) - xml_path = os.path.join( - dir_path, "../../backward_compatible/proxy/hazelcast.xml" - ) + xml_path = os.path.join(dir_path, "../../backward_compatible/proxy/hazelcast.xml") with open(xml_path) as f: return f.read() @@ -136,9 +134,7 @@ def configure_client(cls, config): def configure_cluster(cls): path = os.path.abspath(__file__) dir_path = os.path.dirname(path) - xml_path = os.path.join( - dir_path, "../../backward_compatible/proxy/hazelcast.xml" - ) + xml_path = os.path.join(dir_path, "../../backward_compatible/proxy/hazelcast.xml") with open(xml_path) as f: return f.read() From 26973e6d0f223b1125d5e4555bf2c2f1063edeee Mon Sep 17 
00:00:00 2001 From: Yuce Tekol Date: Thu, 2 Apr 2026 09:21:33 +0300 Subject: [PATCH 05/10] deal with task cancelation in another PR --- tests/integration/asyncio/proxy/ringbuffer_test.py | 10 ---------- 1 file changed, 10 deletions(-) diff --git a/tests/integration/asyncio/proxy/ringbuffer_test.py b/tests/integration/asyncio/proxy/ringbuffer_test.py index 76cc72e31e..5b113b5a8b 100644 --- a/tests/integration/asyncio/proxy/ringbuffer_test.py +++ b/tests/integration/asyncio/proxy/ringbuffer_test.py @@ -176,11 +176,6 @@ async def test_when_start_sequence_is_beyond_tail_sequence_then_blocks(self): task = asyncio.create_task(self.ringbuffer.read_many(CAPACITY + 1, 1, CAPACITY)) await asyncio.sleep(0.5) self.assertFalse(task.done()) - task.cancel() - try: - await task - except asyncio.CancelledError: - pass async def test_when_min_count_items_are_not_available_then_blocks(self): await self.fill_ringbuffer() @@ -188,11 +183,6 @@ async def test_when_min_count_items_are_not_available_then_blocks(self): task = asyncio.create_task(self.ringbuffer.read_many(CAPACITY - 1, 2, 3)) await asyncio.sleep(0.5) self.assertFalse(task.done()) - task.cancel() - try: - await task - except asyncio.CancelledError: - pass async def test_when_some_waiting_needed(self): await self.fill_ringbuffer() From 9247fa345c8a6de6ac18325670b8ca66600b11f9 Mon Sep 17 00:00:00 2001 From: Yuce Tekol Date: Wed, 8 Apr 2026 14:53:09 +0300 Subject: [PATCH 06/10] Update --- hazelcast/asyncio/__init__.py | 1 + hazelcast/core.py | 4 +-- hazelcast/internal/asyncio_client.py | 13 +++++++ hazelcast/internal/asyncio_proxy/manager.py | 38 ++++++++++++++------- hazelcast/proxy/base.py | 2 +- 5 files changed, 42 insertions(+), 16 deletions(-) diff --git a/hazelcast/asyncio/__init__.py b/hazelcast/asyncio/__init__.py index 89df6eea46..2781b856bc 100644 --- a/hazelcast/asyncio/__init__.py +++ b/hazelcast/asyncio/__init__.py @@ -17,3 +17,4 @@ from hazelcast.internal.asyncio_proxy.map import Map, EntryEventCallable from 
hazelcast.internal.asyncio_proxy.replicated_map import ReplicatedMap from hazelcast.internal.asyncio_proxy.vector_collection import VectorCollection +from hazelcast.internal.asyncio_proxy.reliable_topic import ReliableTopic, ReliableMessageListener diff --git a/hazelcast/core.py b/hazelcast/core.py index 3443759e44..370322c9ab 100644 --- a/hazelcast/core.py +++ b/hazelcast/core.py @@ -20,8 +20,8 @@ class MemberInfo: def __init__( self, address: "Address", - member_uuid: uuid.UUID, - attributes: typing.Dict[str, str], + member_uuid: uuid.UUID|None, + attributes: typing.Dict[str, str]|None, lite_member: bool, version: "MemberVersion", _, diff --git a/hazelcast/internal/asyncio_client.py b/hazelcast/internal/asyncio_client.py index acc3fcfe79..548ee6433c 100644 --- a/hazelcast/internal/asyncio_client.py +++ b/hazelcast/internal/asyncio_client.py @@ -26,6 +26,7 @@ MAP_SERVICE, MULTI_MAP_SERVICE, ProxyManager, + RELIABLE_TOPIC_SERVICE, REPLICATED_MAP_SERVICE, RINGBUFFER_SERVICE, VECTOR_SERVICE, @@ -33,6 +34,7 @@ from hazelcast.internal.asyncio_proxy.list import List from hazelcast.internal.asyncio_proxy.map import Map from hazelcast.internal.asyncio_proxy.multi_map import MultiMap +from hazelcast.internal.asyncio_proxy.reliable_topic import ReliableTopic from hazelcast.internal.asyncio_proxy.replicated_map import ReplicatedMap from hazelcast.internal.asyncio_proxy.ringbuffer import Ringbuffer from hazelcast.internal.asyncio_reactor import AsyncioReactor @@ -299,6 +301,17 @@ async def get_replicated_map(self, name: str) -> ReplicatedMap[KeyType, ValueTyp """ return await self._proxy_manager.get_or_create(REPLICATED_MAP_SERVICE, name) + async def get_reliable_topic(self, name: str) -> ReliableTopic: + """Returns the ReliableTopic instance with the specified name. + + Args: + name: Name of the ReliableTopic. + + Returns: + Distributed ReliableTopic instance with the specified name. 
+ """ + return await self._proxy_manager.get_or_create(RELIABLE_TOPIC_SERVICE, name) + async def get_ringbuffer(self, name: str) -> Ringbuffer: """Returns the distributed Ringbuffer instance with the specified name. diff --git a/hazelcast/internal/asyncio_proxy/manager.py b/hazelcast/internal/asyncio_proxy/manager.py index db36b38290..2439d58c42 100644 --- a/hazelcast/internal/asyncio_proxy/manager.py +++ b/hazelcast/internal/asyncio_proxy/manager.py @@ -4,36 +4,26 @@ from hazelcast.internal.asyncio_proxy.list import create_list_proxy from hazelcast.internal.asyncio_proxy.multi_map import create_multi_map_proxy from hazelcast.internal.asyncio_proxy.vector_collection import ( - VectorCollection, create_vector_collection_proxy, ) from hazelcast.protocol.codec import client_create_proxy_codec, client_destroy_proxy_codec from hazelcast.internal.asyncio_invocation import Invocation from hazelcast.internal.asyncio_proxy.base import Proxy from hazelcast.internal.asyncio_proxy.map import create_map_proxy +from hazelcast.internal.asyncio_proxy.reliable_topic import ReliableTopic from hazelcast.internal.asyncio_proxy.replicated_map import create_replicated_map_proxy from hazelcast.internal.asyncio_proxy.ringbuffer import create_ringbuffer_proxy +from hazelcast.proxy.reliable_topic import _RINGBUFFER_PREFIX from hazelcast.util import to_list LIST_SERVICE = "hz:impl:listService" MAP_SERVICE = "hz:impl:mapService" MULTI_MAP_SERVICE = "hz:impl:multiMapService" +RELIABLE_TOPIC_SERVICE = "hz:impl:reliableTopicService" REPLICATED_MAP_SERVICE = "hz:impl:replicatedMapService" RINGBUFFER_SERVICE = "hz:impl:ringbufferService" VECTOR_SERVICE = "hz:service:vector" -_proxy_init: typing.Dict[ - str, - typing.Callable[[str, str, typing.Any], typing.Coroutine[typing.Any, typing.Any, typing.Any]], -] = { - LIST_SERVICE: create_list_proxy, - MAP_SERVICE: create_map_proxy, - MULTI_MAP_SERVICE: create_multi_map_proxy, - REPLICATED_MAP_SERVICE: create_replicated_map_proxy, - RINGBUFFER_SERVICE: 
create_ringbuffer_proxy, - VECTOR_SERVICE: create_vector_collection_proxy, -} - class ProxyManager: def __init__(self, context): @@ -86,3 +76,25 @@ async def destroy_proxy(self, service_name, name, destroy_on_remote=True): def get_distributed_objects(self): return to_list(v for v in self._proxies.values() if not isinstance(v, asyncio.Future)) + + +async def create_reliable_topic_proxy(service_name, name, context): + ringbuffer = await context.proxy_manager.get_or_create( + RINGBUFFER_SERVICE, _RINGBUFFER_PREFIX + name, create_on_remote=False + ) + return ReliableTopic(service_name, name, context, ringbuffer) + + + +_proxy_init: typing.Dict[ + str, + typing.Callable[[str, str, typing.Any], typing.Coroutine[typing.Any, typing.Any, typing.Any]], +] = { + LIST_SERVICE: create_list_proxy, + MAP_SERVICE: create_map_proxy, + MULTI_MAP_SERVICE: create_multi_map_proxy, + RELIABLE_TOPIC_SERVICE: create_reliable_topic_proxy, + REPLICATED_MAP_SERVICE: create_replicated_map_proxy, + RINGBUFFER_SERVICE: create_ringbuffer_proxy, + VECTOR_SERVICE: create_vector_collection_proxy, +} diff --git a/hazelcast/proxy/base.py b/hazelcast/proxy/base.py index 5c961ae9a9..71e984ff74 100644 --- a/hazelcast/proxy/base.py +++ b/hazelcast/proxy/base.py @@ -266,7 +266,7 @@ class TopicMessage(typing.Generic[MessageType]): __slots__ = ("name", "message", "publish_time", "member") - def __init__(self, name: str, message: MessageType, publish_time: int, member: MemberInfo): + def __init__(self, name: str, message: MessageType, publish_time: int, member: MemberInfo|None): self.name = name self.message = message self.publish_time = publish_time From 495ddc199ec207754ffd57c4cad4483f23bb7069 Mon Sep 17 00:00:00 2001 From: Yuce Tekol Date: Wed, 8 Apr 2026 14:54:54 +0300 Subject: [PATCH 07/10] Added the asyncio ReliableTopic --- hazelcast/core.py | 4 +- hazelcast/internal/asyncio_proxy/manager.py | 1 - .../internal/asyncio_proxy/reliable_topic.py | 567 ++++++++++++++++ hazelcast/proxy/base.py | 4 +- 
.../asyncio/proxy/reliable_topic_test.py | 605 ++++++++++++++++++ 5 files changed, 1177 insertions(+), 4 deletions(-) create mode 100644 hazelcast/internal/asyncio_proxy/reliable_topic.py create mode 100644 tests/integration/asyncio/proxy/reliable_topic_test.py diff --git a/hazelcast/core.py b/hazelcast/core.py index 370322c9ab..7063c66db0 100644 --- a/hazelcast/core.py +++ b/hazelcast/core.py @@ -20,8 +20,8 @@ class MemberInfo: def __init__( self, address: "Address", - member_uuid: uuid.UUID|None, - attributes: typing.Dict[str, str]|None, + member_uuid: uuid.UUID | None, + attributes: typing.Dict[str, str] | None, lite_member: bool, version: "MemberVersion", _, diff --git a/hazelcast/internal/asyncio_proxy/manager.py b/hazelcast/internal/asyncio_proxy/manager.py index 2439d58c42..6a7ff21932 100644 --- a/hazelcast/internal/asyncio_proxy/manager.py +++ b/hazelcast/internal/asyncio_proxy/manager.py @@ -85,7 +85,6 @@ async def create_reliable_topic_proxy(service_name, name, context): return ReliableTopic(service_name, name, context, ringbuffer) - _proxy_init: typing.Dict[ str, typing.Callable[[str, str, typing.Any], typing.Coroutine[typing.Any, typing.Any, typing.Any]], diff --git a/hazelcast/internal/asyncio_proxy/reliable_topic.py b/hazelcast/internal/asyncio_proxy/reliable_topic.py new file mode 100644 index 0000000000..4bf6ab6e17 --- /dev/null +++ b/hazelcast/internal/asyncio_proxy/reliable_topic.py @@ -0,0 +1,567 @@ +import asyncio +import logging +import time +import typing +from uuid import uuid4 + +from hazelcast.config import ReliableTopicConfig, TopicOverloadPolicy +from hazelcast.core import MemberInfo, MemberVersion, EndpointQualifier, ProtocolType +from hazelcast.errors import ( + OperationTimeoutError, + IllegalArgumentError, + HazelcastClientNotActiveError, + ClientOfflineError, + HazelcastInstanceNotActiveError, + DistributedObjectDestroyedError, + TopicOverloadError, +) +from hazelcast.internal.asyncio_proxy.base import Proxy +from 
hazelcast.proxy.base import TopicMessage +from hazelcast.proxy.reliable_topic import ReliableMessageListener, _ReliableMessageListenerAdapter +from hazelcast.proxy.ringbuffer import OVERFLOW_POLICY_FAIL, OVERFLOW_POLICY_OVERWRITE +from hazelcast.serialization.compact import SchemaNotReplicatedError +from hazelcast.serialization.objects import ReliableTopicMessage +from hazelcast.types import MessageType +from hazelcast.util import check_not_none + +_INITIAL_BACKOFF = 0.1 +_MAX_BACKOFF = 2.0 + +_UNKNOWN_MEMBER_VERSION = MemberVersion(0, 0, 0) +_MEMBER_ENDPOINT_QUALIFIER = EndpointQualifier(ProtocolType.MEMBER, None) + +_logger = logging.getLogger(__name__) + + +class _MessageRunner: + def __init__( + self, + registration_id, + listener, + ringbuffer, + topic_name, + read_batch_size, + to_object, + runners, + ): + self._registration_id = registration_id + self._listener = listener + self._ringbuffer = ringbuffer + self._topic_name = topic_name + self._read_batch_size = read_batch_size + self._to_object = to_object + self._runners = runners + self._sequence = listener.retrieve_initial_sequence() + self._cancelled = False + self._task: asyncio.Task | None = None + + async def start(self): + """Starts the message runner by checking the given sequence. + + If the user provided an initial sequence via listener, we will + use it as it is. If not, we will ask server to get the tail + sequence and use it. + """ + if self._sequence != -1: + # User provided a sequence to start from + return + + # We are going to listen to next publication. + # We don't care about what already has been published. + sequence = await self._ringbuffer.tail_sequence() + self._sequence = sequence + 1 + + def next_batch(self): + """Schedules an asyncio task to read the next batch from the + ringbuffer and call the listener on items when it is done. 
+ """ + if self._cancelled: + return + self._task = asyncio.create_task(self._run_next_batch()) + + async def _run_next_batch(self): + """Reads the next batch from the ringbuffer and processes the items.""" + if self._cancelled: + return + + try: + result = await self._ringbuffer.read_many(self._sequence, 1, self._read_batch_size) + + # Check if there are any messages lost since the last read + # and whether the listener can tolerate that. + lost_count = (result.next_sequence_to_read_from - result.read_count) - self._sequence + if lost_count != 0 and not self._is_loss_tolerable(lost_count): + self.cancel() + return + + # Call the listener for each item read. + for i in range(result.size): + try: + message = result[i] + self._listener.store_sequence(result.get_sequence(i)) + + member = None + if message.publisher_address: + member = MemberInfo( + message.publisher_address, + None, + None, + False, + _UNKNOWN_MEMBER_VERSION, + None, + { + _MEMBER_ENDPOINT_QUALIFIER: message.publisher_address, + }, + ) + + topic_message = TopicMessage( + self._topic_name, + message.payload, + message.publish_time, + member, + ) + self._listener.on_message(topic_message) + except Exception as e: + if self._terminate(e): + self.cancel() + return + + self._sequence = result.next_sequence_to_read_from + self.next_batch() + except asyncio.CancelledError: + pass + except Exception as e: + # read_many request failed. + if not await self._handle_internal_error(e): + self.cancel() + + def cancel(self): + """Sets the cancelled flag, cancels the running task, and removes + the runner registration. + """ + self._cancelled = True + self._runners.pop(self._registration_id, None) + # if self._task is not None and not self._task.done(): + # self._task.cancel() + self._listener.on_cancel() + + def _is_loss_tolerable(self, loss_count: int) -> bool: + """Called when message loss is detected. + + Checks if the listener is able to tolerate the loss. + + Args: + loss_count: Number of lost messages. 
+ + Returns: + ``True`` if the listener may continue reading. + """ + if self._listener.is_loss_tolerant(): + _logger.debug( + "MessageListener %s on topic %s lost %s messages.", + self._listener, + self._topic_name, + loss_count, + ) + return True + + _logger.warning( + "Terminating MessageListener %s on topic %s. " + "Reason: The listener was too slow or the retention period of the message has been violated. " + "%s messages lost.", + self._listener, + self._topic_name, + loss_count, + ) + return False + + def _terminate(self, error: Exception) -> bool: + """Checks if we should terminate the listener based on the error + received while calling on_message. + + Args: + error: Error received while calling the listener. + + Returns: + Should terminate the listener or not. + """ + if self._cancelled: + return True + + try: + terminate = self._listener.is_terminal(error) + if terminate: + _logger.warning( + "Terminating MessageListener %s on topic %s. Reason: Unhandled exception.", + self._listener, + self._topic_name, + exc_info=error, + ) + else: + _logger.debug( + "MessageListener %s on topic %s ran into an error.", + self._listener, + self._topic_name, + exc_info=error, + ) + return terminate + except Exception as e: + _logger.warning( + "Terminating MessageListener %s on topic %s. " + "Reason: Unhandled exception while calling is_terminal method", + self._listener, + self._topic_name, + exc_info=e, + ) + return True + + async def _handle_internal_error(self, error: Exception) -> bool: + """Called when the read_many request fails. + + Based on the error we receive, we will act differently. + + If we can tolerate the error, we will call next_batch here. + The reasoning behind is that, on some cases, we do not immediately + call next_batch, but make a request to the server, and based on + that, call next_batch. + + Args: + error: The error we received. + + Returns: + ``True`` if the error is handled internally. ``False`` otherwise. 
+ When ``False`` is returned, listener should be cancelled. + """ + if isinstance(error, HazelcastClientNotActiveError): + return self._handle_client_not_active_error() + elif isinstance(error, ClientOfflineError): + return self._handle_client_offline_error() + elif isinstance(error, OperationTimeoutError): + return self._handle_timeout_error() + elif isinstance(error, IllegalArgumentError): + return await self._handle_illegal_argument_error(error) + elif isinstance(error, HazelcastInstanceNotActiveError): + return self._handle_instance_not_active_error() + elif isinstance(error, DistributedObjectDestroyedError): + return self._handle_distributed_object_destroyed_error() + else: + return self._handle_generic_error(error) + + def _handle_generic_error(self, error): + # Received an error we do not expect. + _logger.warning( + "Terminating MessageListener %s on topic %s. Reason: Unhandled exception.", + self._listener, + self._topic_name, + exc_info=error, + ) + return False + + def _handle_distributed_object_destroyed_error(self): + # Underlying ringbuffer is destroyed. It should only + # happen when the user destroys the reliable topic + # associated with it. + _logger.debug( + "Terminating MessageListener %s on topic %s. Reason: Topic is destroyed.", + self._listener, + self._topic_name, + ) + return False + + def _handle_instance_not_active_error(self): + # This error should be received from the server. + # We do not throw it anywhere on the client. + _logger.debug( + "Terminating MessageListener %s on topic %s. Reason: Server is shutting down.", + self._listener, + self._topic_name, + ) + return False + + def _handle_client_offline_error(self): + # Client is reconnecting to cluster. + _logger.debug( + "MessageListener %s on topic %s got error. " + "Continuing from the last known sequence %s.", + self._listener, + self._topic_name, + self._sequence, + ) + self.next_batch() + return True + + def _handle_client_not_active_error(self): + # Client#shutdown is called. 
+ _logger.debug( + "Terminating MessageListener %s on topic %s. Reason: Client is shutting down.", + self._listener, + self._topic_name, + ) + return False + + def _handle_timeout_error(self): + # read_many invocation to the server timed out. + _logger.debug( + "MessageListener %s on topic %s timed out. " + "Continuing from the last known sequence %s.", + self._listener, + self._topic_name, + self._sequence, + ) + self.next_batch() + return True + + async def _handle_illegal_argument_error(self, error): + # Server sends this when it detects data loss + # on the underlying ringbuffer. + if self._listener.is_loss_tolerant(): + # Listener can tolerate message loss. Try to continue reading + # after getting head sequence, and try to read from there. + try: + head_sequence = await self._ringbuffer.head_sequence() + _logger.debug( + "MessageListener %s on topic %s requested a too large sequence. " + "Jumping from old sequence %s to sequence %s.", + self._listener, + self._topic_name, + self._sequence, + head_sequence, + exc_info=error, + ) + self._sequence = head_sequence + # We call next_batch only after getting the new head + # sequence and updating our state with it. + self.next_batch() + except Exception as e: + _logger.warning( + "Terminating MessageListener %s on topic %s. " + "Reason: After the ring buffer data related " + "to reliable topic is lost, client tried to get the " + "current head sequence to continue since the listener " + "is loss tolerant, but that request failed.", + self._listener, + self._topic_name, + exc_info=e, + ) + # We said that we can handle that error so the listener + # is not cancelled. But, we could not continue since + # our request to the server failed. We should cancel + # the listener. + self.cancel() + return True + + _logger.warning( + "Terminating MessageListener %s on topic %s. 
" + "Reason: Underlying ring buffer data related to reliable topic is lost.", + self._listener, + self._topic_name, + ) + return False + + +class ReliableTopic(Proxy, typing.Generic[MessageType]): + """Hazelcast provides distribution mechanism for publishing messages that + are delivered to multiple subscribers, which is also known as a + publish/subscribe (pub/sub) messaging model. Publish and subscriptions are + cluster-wide. When a member subscribes for a topic, it is actually + registering for messages published by any member in the cluster, including + the new members joined after you added the listener. + + Messages are ordered, meaning that listeners(subscribers) will process the + messages in the order they are actually published. + + Hazelcast's Reliable Topic uses the same Topic interface as a regular topic. + The main difference is that Reliable Topic is backed up by the Ringbuffer + data structure, a replicated but not partitioned data structure that stores + its data in a ring-like structure. + """ + + def __init__(self, service_name, name, context, ringbuffer): + super(ReliableTopic, self).__init__(service_name, name, context) + + config = context.config.reliable_topics.get(name, None) + if config is None: + config = ReliableTopicConfig() + + self._config = config + self._ringbuffer = ringbuffer + self._runners: typing.Dict[str, _MessageRunner] = {} + + async def publish(self, message: MessageType) -> None: + """Publishes the message to all subscribers of this topic. + + Args: + message: The message. 
+ """ + check_not_none(message, "Message cannot be None") + try: + payload = self._to_data(message) + except SchemaNotReplicatedError as e: + return await self._send_schema_and_retry(e, self.publish, message) + + topic_message = ReliableTopicMessage(time.time(), None, payload) + + overload_policy = self._config.overload_policy + if overload_policy == TopicOverloadPolicy.BLOCK: + return await self._add_with_backoff(topic_message) + elif overload_policy == TopicOverloadPolicy.ERROR: + return await self._add_or_fail(topic_message) + elif overload_policy == TopicOverloadPolicy.DISCARD_OLDEST: + return await self._add_or_overwrite(topic_message) + elif overload_policy == TopicOverloadPolicy.DISCARD_NEWEST: + return await self._add_or_discard(topic_message) + else: + raise ValueError(f"Unexpected overload policy is passed {overload_policy}") + + async def publish_all(self, messages: typing.Sequence[MessageType]) -> None: + """Publishes all messages to all subscribers of this topic. + + Args: + messages: Messages to publish. 
+ """ + check_not_none(messages, "Messages cannot be None") + try: + topic_messages = [] + for message in messages: + check_not_none(message, "Message cannot be None") + payload = self._to_data(message) + topic_messages.append(ReliableTopicMessage(time.time(), None, payload)) + except SchemaNotReplicatedError as e: + return await self._send_schema_and_retry(e, self.publish_all, messages) + + overload_policy = self._config.overload_policy + if overload_policy == TopicOverloadPolicy.BLOCK: + return await self._add_messages_with_backoff(topic_messages) + elif overload_policy == TopicOverloadPolicy.ERROR: + return await self._add_messages_or_fail(topic_messages) + elif overload_policy == TopicOverloadPolicy.DISCARD_OLDEST: + return await self._add_messages_or_overwrite(topic_messages) + elif overload_policy == TopicOverloadPolicy.DISCARD_NEWEST: + return await self._add_messages_or_discard(topic_messages) + else: + raise ValueError(f"Unexpected overload policy is passed {overload_policy}") + + async def add_listener( + self, + listener: typing.Union[ + ReliableMessageListener, typing.Callable[[TopicMessage[MessageType]], None] + ], + ) -> str: + """Subscribes to this reliable topic. + + It can be either a simple function or an instance of an + :class:`ReliableMessageListener`. When a function is passed, a + :class:`ReliableMessageListener` is created out of that with + sensible default values. + + When a message is published, the + :func:`ReliableMessageListener.on_message` method of the given + listener (or the function passed) is called. + + More than one message listener can be added on one instance. + + Args: + listener: Listener to add. + + Returns: + The registration id. 
+ """ + check_not_none(listener, "None listener is not allowed") + registration_id = str(uuid4()) + reliable_message_listener = self._to_reliable_message_listener(listener) + runner = _MessageRunner( + registration_id, + reliable_message_listener, + self._ringbuffer, + self.name, + self._config.read_batch_size, + self._to_object, + self._runners, + ) + await runner.start() + # If the runner started successfully, register it. + self._runners[registration_id] = runner + runner.next_batch() + # ensure the runner is scheduled + await asyncio.sleep(0) + return registration_id + + async def remove_listener(self, registration_id: str) -> bool: + """Stops receiving messages for the given message listener. + + If the given listener already removed, this method does nothing. + + Args: + registration_id: ID of listener registration. + + Returns: + ``True`` if registration is removed, ``False`` otherwise. + """ + check_not_none(registration_id, "Registration id cannot be None") + runner = self._runners.get(registration_id, None) + if not runner: + return False + + runner.cancel() + return True + + async def destroy(self) -> bool: + """Destroys underlying Proxy and RingBuffer instances.""" + for runner in list(self._runners.values()): + runner.cancel() + + self._runners.clear() + await super(ReliableTopic, self).destroy() + return await self._ringbuffer.destroy() + + async def _add_or_fail(self, message): + sequence_id = await self._ringbuffer.add(message, OVERFLOW_POLICY_FAIL) + if sequence_id == -1: + raise TopicOverloadError( + "Failed to publish message %s on topic %s." % (message, self.name) + ) + + async def _add_messages_or_fail(self, messages): + sequence_id = await self._ringbuffer.add_all(messages, OVERFLOW_POLICY_FAIL) + if sequence_id == -1: + raise TopicOverloadError("Failed to publish messages on topic %s." 
% self.name) + + async def _add_or_overwrite(self, message): + await self._ringbuffer.add(message, OVERFLOW_POLICY_OVERWRITE) + + async def _add_messages_or_overwrite(self, messages): + await self._ringbuffer.add_all(messages, OVERFLOW_POLICY_OVERWRITE) + + async def _add_or_discard(self, message): + await self._ringbuffer.add(message, OVERFLOW_POLICY_FAIL) + + async def _add_messages_or_discard(self, messages): + await self._ringbuffer.add_all(messages, OVERFLOW_POLICY_FAIL) + + async def _add_with_backoff(self, message): + backoff = _INITIAL_BACKOFF + while True: + sequence_id = await self._ringbuffer.add(message, OVERFLOW_POLICY_FAIL) + if sequence_id != -1: + return + await asyncio.sleep(backoff) + backoff = min(_MAX_BACKOFF, 2 * backoff) + + async def _add_messages_with_backoff(self, messages): + backoff = _INITIAL_BACKOFF + while True: + sequence_id = await self._ringbuffer.add_all(messages, OVERFLOW_POLICY_FAIL) + if sequence_id != -1: + return + await asyncio.sleep(backoff) + backoff = min(_MAX_BACKOFF, 2 * backoff) + + @staticmethod + def _to_reliable_message_listener(listener): + if isinstance(listener, ReliableMessageListener): + return listener + + if not callable(listener): + raise TypeError("Listener must be a callable") + + return _ReliableMessageListenerAdapter(listener) diff --git a/hazelcast/proxy/base.py b/hazelcast/proxy/base.py index 71e984ff74..c2a9e6a04c 100644 --- a/hazelcast/proxy/base.py +++ b/hazelcast/proxy/base.py @@ -266,7 +266,9 @@ class TopicMessage(typing.Generic[MessageType]): __slots__ = ("name", "message", "publish_time", "member") - def __init__(self, name: str, message: MessageType, publish_time: int, member: MemberInfo|None): + def __init__( + self, name: str, message: MessageType, publish_time: int, member: MemberInfo | None + ): self.name = name self.message = message self.publish_time = publish_time diff --git a/tests/integration/asyncio/proxy/reliable_topic_test.py b/tests/integration/asyncio/proxy/reliable_topic_test.py 
new file mode 100644 index 0000000000..e3dd050c10 --- /dev/null +++ b/tests/integration/asyncio/proxy/reliable_topic_test.py @@ -0,0 +1,605 @@ +import asyncio +import os +import unittest +from asyncio import InvalidStateError + +from hazelcast.util import AtomicInteger + +from tests.hzrc.ttypes import Lang + +try: + from hazelcast.config import TopicOverloadPolicy + from hazelcast.errors import ( + TopicOverloadError, + HazelcastClientNotActiveError, + TargetDisconnectedError, + ) + from hazelcast.proxy.reliable_topic import ReliableMessageListener +except ImportError: + # For backward compatibility. If we cannot import those, we won't + # be even referencing them in tests. + pass + +from tests.integration.asyncio.base import SingleMemberTestCase +from tests.util import ( + compare_client_version, + random_string, + event_collector, + get_current_timestamp, + skip_if_client_version_older_than, +) + +CAPACITY = 10 + + +@unittest.skipIf( + compare_client_version("4.1") < 0, "Tests the features added in 4.1 version of the client" +) +class ReliableTopicTest(SingleMemberTestCase): + @classmethod + def configure_cluster(cls): + path = os.path.abspath(__file__) + dir_path = os.path.dirname(path) + with open( + os.path.join(dir_path, "../../backward_compatible/proxy/hazelcast_topic.xml") + ) as f: + return f.read() + + @classmethod + def configure_client(cls, config): + config["cluster_name"] = cls.cluster.id + if not compare_client_version("4.1") < 0: + # Add these config elements only to the 4.1+ clients + # since the older versions do not know anything + # about them. 
+ config["reliable_topics"] = { + "discard": { + "overload_policy": TopicOverloadPolicy.DISCARD_NEWEST, + }, + "overwrite": { + "overload_policy": TopicOverloadPolicy.DISCARD_OLDEST, + }, + "block": { + "overload_policy": TopicOverloadPolicy.BLOCK, + }, + "error": { + "overload_policy": TopicOverloadPolicy.ERROR, + }, + } + return config + + async def asyncSetUp(self): + await super().asyncSetUp() + self.topics = [] + self.topic = await self.get_topic(random_string()) + + async def asyncTearDown(self): + for topic in self.topics: + await topic.destroy() + await super().asyncTearDown() + + async def test_add_listener_with_function(self): + topic = await self.get_topic(random_string()) + + collector = event_collector() + registration_id = await topic.add_listener(collector) + self.assertIsNotNone(registration_id) + + await topic.publish("a") + await topic.publish("b") + + await self.assertTrueEventually( + lambda: self.assertEqual(["a", "b"], list(map(lambda m: m.message, collector.events))) + ) + pass + + async def test_add_listener(self): + topic = await self.get_topic(random_string()) + + messages = [] + + on_cancel_call_count = AtomicInteger() + + class Listener(ReliableMessageListener): + def on_message(self, message): + messages.append(message.message) + + def retrieve_initial_sequence(self): + return -1 + + def store_sequence(self, sequence): + pass + + def is_loss_tolerant(self): + return False + + def is_terminal(self, error): + return False + + def on_cancel(self): + on_cancel_call_count.add(1) + + registration_id = await topic.add_listener(Listener()) + self.assertIsNotNone(registration_id) + + await topic.publish("a") + await topic.publish("b") + + await self.assertTrueEventually(lambda: self.assertEqual(["a", "b"], messages)) + + self.assertEqual(0, on_cancel_call_count.get()) + + async def test_add_listener_with_retrieve_initial_sequence(self): + topic = await self.get_topic(random_string()) + + messages = [] + + class Listener(ReliableMessageListener): 
+ def on_message(self, message): + messages.append(message.message) + + def retrieve_initial_sequence(self): + return 5 + + def store_sequence(self, sequence): + pass + + def is_loss_tolerant(self): + return False + + def is_terminal(self, error): + return False + + await topic.publish_all(range(10)) + + registration_id = await topic.add_listener(Listener()) + self.assertIsNotNone(registration_id) + + await self.assertTrueEventually(lambda: self.assertEqual(list(range(5, 10)), messages)) + + async def test_add_listener_with_store_sequence(self): + topic = await self.get_topic(random_string()) + + sequences = [] + + class Listener(ReliableMessageListener): + def on_message(self, message): + pass + + def retrieve_initial_sequence(self): + return -1 + + def store_sequence(self, sequence): + sequences.append(sequence) + + def is_loss_tolerant(self): + return False + + def is_terminal(self, error): + return False + + registration_id = await topic.add_listener(Listener()) + self.assertIsNotNone(registration_id) + + await topic.publish_all(["item-%s" % i for i in range(20)]) + + await self.assertTrueEventually(lambda: self.assertEqual(list(range(20)), sequences)) + + async def test_add_listener_with_loss_tolerant_listener_on_message_loss(self): + topic = await self.get_topic("overwrite") # has capacity of 10 + + messages = [] + + class Listener(ReliableMessageListener): + def on_message(self, message): + messages.append(message.message) + + def retrieve_initial_sequence(self): + return -1 + + def store_sequence(self, sequence): + pass + + def is_loss_tolerant(self): + return True + + def is_terminal(self, error): + return False + + registration_id = await topic.add_listener(Listener()) + self.assertIsNotNone(registration_id) + + # will overwrite first 10 messages, hence they will be lost + await topic.publish_all(range(2 * CAPACITY)) + + await self.assertTrueEventually( + lambda: self.assertEqual(list(range(CAPACITY, 2 * CAPACITY)), messages) + ) + + async def 
test_add_listener_with_non_loss_tolerant_listener_on_message_loss(self): + topic = await self.get_topic("overwrite") # has capacity of 10 + + messages = [] + + class Listener(ReliableMessageListener): + def on_message(self, message): + messages.append(message.message) + + def retrieve_initial_sequence(self): + return -1 + + def store_sequence(self, sequence): + pass + + def is_loss_tolerant(self): + return False + + def is_terminal(self, error): + return False + + registration_id = await topic.add_listener(Listener()) + self.assertIsNotNone(registration_id) + + # will overwrite first 10 messages, hence they will be lost + await topic.publish_all(range(2 * CAPACITY)) + + self.assertEqual(0, len(messages)) + + # Should be cancelled on message loss + await self.assertTrueEventually(lambda: self.assertEqual(0, len(topic._runners))) + + async def test_add_listener_when_on_message_raises_error(self): + topic = await self.get_topic(random_string()) + + messages = [] + + on_cancel_call_count = AtomicInteger() + + class Listener(ReliableMessageListener): + def on_message(self, message): + message = message.message + if message < 5: + messages.append(message) + else: + raise ValueError("expected") + + def retrieve_initial_sequence(self): + return -1 + + def store_sequence(self, sequence): + pass + + def is_loss_tolerant(self): + return False + + def is_terminal(self, error): + return isinstance(error, ValueError) + + def on_cancel(self) -> None: + on_cancel_call_count.add(1) + + registration_id = await topic.add_listener(Listener()) + self.assertIsNotNone(registration_id) + + await topic.publish_all(range(10)) + + await self.assertTrueEventually(lambda: self.assertEqual(list(range(5)), messages)) + + # Should be cancelled since on_message raised error + await self.assertTrueEventually(lambda: self.assertEqual(0, len(topic._runners))) + + if compare_client_version("5.4") >= 0: + self.assertEqual(1, on_cancel_call_count.get()) + + async def 
test_add_listener_when_on_message_and_is_terminal_raises_error(self): + topic = await self.get_topic(random_string()) + + messages = [] + + on_cancel_call_count = AtomicInteger() + + class Listener(ReliableMessageListener): + def on_message(self, message): + message = message.message + if message < 5: + messages.append(message) + else: + raise ValueError("expected") + + def retrieve_initial_sequence(self): + return -1 + + def store_sequence(self, sequence): + pass + + def is_loss_tolerant(self): + return False + + def is_terminal(self, error): + raise error + + def on_cancel(self) -> None: + on_cancel_call_count.add(1) + + registration_id = await topic.add_listener(Listener()) + self.assertIsNotNone(registration_id) + + await topic.publish_all(range(10)) + + await self.assertTrueEventually(lambda: self.assertEqual(list(range(5)), messages)) + + # Should be cancelled since on_message raised error + await self.assertTrueEventually(lambda: self.assertEqual(0, len(topic._runners))) + + if compare_client_version("5.4") >= 0: + self.assertEqual(1, on_cancel_call_count.get()) + + async def test_add_listener_with_non_callable(self): + topic = await self.get_topic(random_string()) + with self.assertRaises(TypeError): + await topic.add_listener(3) + + async def test_remove_listener(self): + topic = await self.get_topic(random_string()) + + on_cancel_call_count = AtomicInteger() + + class Listener(ReliableMessageListener): + def on_message(self, message) -> None: + pass + + def retrieve_initial_sequence(self) -> int: + return -1 + + def store_sequence(self, sequence: int) -> None: + pass + + def is_loss_tolerant(self) -> bool: + pass + + def is_terminal(self, error: Exception) -> bool: + pass + + def on_cancel(self) -> None: + on_cancel_call_count.add(1) + + registration_id = await topic.add_listener(Listener()) + self.assertTrue(await topic.remove_listener(registration_id)) + if compare_client_version("5.4") >= 0: + self.assertEqual(1, on_cancel_call_count.get()) + + async 
def test_remove_listener_does_not_receive_messages_after_removal(self): + topic = await self.get_topic(random_string()) + + collector = event_collector() + registration_id = await topic.add_listener(collector) + self.assertTrue(await topic.remove_listener(registration_id)) + + await topic.publish_all(range(10)) + + self.assertEqual(0, len(collector.events)) + + async def test_remove_listener_twice(self): + topic = await self.get_topic(random_string()) + registration_id = await topic.add_listener(lambda m: m) + self.assertTrue(await topic.remove_listener(registration_id)) + self.assertFalse(await topic.remove_listener(registration_id)) + + async def test_publish_with_discard_newest_policy(self): + topic = await self.get_topic("discard") + + collector = event_collector() + await topic.add_listener(collector) + + for i in range(2 * CAPACITY): + await topic.publish(i) + + await self.assertTrueEventually(lambda: self.assertEqual(CAPACITY, len(collector.events))) + self.assertEqual(list(range(CAPACITY)), await self.get_ringbuffer_data(topic)) + + async def test_publish_with_discard_oldest_policy(self): + topic = await self.get_topic("overwrite") + + collector = event_collector() + await topic.add_listener(collector) + + for i in range(2 * CAPACITY): + await topic.publish(i) + + await self.assertTrueEventually( + lambda: self.assertEqual(2 * CAPACITY, len(collector.events)) + ) + self.assertEqual(list(range(CAPACITY, 2 * CAPACITY)), await self.get_ringbuffer_data(topic)) + + async def test_publish_with_block_policy(self): + topic = await self.get_topic("block") + + collector = event_collector() + await topic.add_listener(collector) + + for i in range(CAPACITY): + await topic.publish(i) + + begin_time = get_current_timestamp() + + for i in range(CAPACITY, 2 * CAPACITY): + await topic.publish(i) + + time_passed = get_current_timestamp() - begin_time + + # TTL is set in the XML config + self.assertTrue(time_passed >= 2.0) + + await self.assertTrueEventually( + lambda: 
self.assertEqual(2 * CAPACITY, len(collector.events)) + ) + self.assertEqual(list(range(CAPACITY, CAPACITY * 2)), await self.get_ringbuffer_data(topic)) + + async def test_publish_with_error_policy(self): + topic = await self.get_topic("error") + + collector = event_collector() + await topic.add_listener(collector) + + for i in range(CAPACITY): + await topic.publish(i) + + for i in range(CAPACITY, 2 * CAPACITY): + with self.assertRaises(TopicOverloadError): + await topic.publish(i) + + await self.assertTrueEventually(lambda: self.assertEqual(CAPACITY, len(collector.events))) + self.assertEqual(list(range(CAPACITY)), await self.get_ringbuffer_data(topic)) + + async def test_publish_all_with_discard_newest_policy(self): + topic = await self.get_topic("discard") + + collector = event_collector() + await topic.add_listener(collector) + + await topic.publish_all(range(CAPACITY)) + await topic.publish_all(range(CAPACITY, 2 * CAPACITY)) + + await self.assertTrueEventually(lambda: self.assertEqual(CAPACITY, len(collector.events))) + self.assertEqual(list(range(CAPACITY)), await self.get_ringbuffer_data(topic)) + + async def test_publish_all_with_discard_oldest_policy(self): + topic = await self.get_topic("overwrite") + collector = event_collector() + await topic.add_listener(collector) + await topic.publish_all(range(CAPACITY)) + await topic.publish_all(range(CAPACITY, 2 * CAPACITY)) + await self.assertTrueEventually( + lambda: self.assertEqual(2 * CAPACITY, len(collector.events)) + ) + self.assertEqual(list(range(CAPACITY, 2 * CAPACITY)), await self.get_ringbuffer_data(topic)) + + async def test_publish_all_with_block_policy(self): + topic = await self.get_topic("block") + + collector = event_collector() + await topic.add_listener(collector) + + await topic.publish_all(range(CAPACITY)) + + begin_time = get_current_timestamp() + await topic.publish_all(range(CAPACITY, 2 * CAPACITY)) + time_passed = get_current_timestamp() - begin_time + + # TTL is set in the XML config + 
self.assertTrue(time_passed >= 2.0) + + await self.assertTrueEventually( + lambda: self.assertEqual(2 * CAPACITY, len(collector.events)) + ) + self.assertEqual(list(range(CAPACITY, CAPACITY * 2)), await self.get_ringbuffer_data(topic)) + + async def test_publish_all_with_error_policy(self): + topic = await self.get_topic("error") + + collector = event_collector() + await topic.add_listener(collector) + + await topic.publish_all(range(CAPACITY)) + + with self.assertRaises(TopicOverloadError): + await topic.publish_all(range(CAPACITY, 2 * CAPACITY)) + + await self.assertTrueEventually(lambda: self.assertEqual(CAPACITY, len(collector.events))) + self.assertEqual(list(range(CAPACITY)), await self.get_ringbuffer_data(topic)) + + async def test_durable_subscription(self): + topic = await self.get_topic(random_string()) + + class DurableListener(ReliableMessageListener): + def __init__(self): + self.objects = [] + self.sequences = [] + self.sequence = -1 + + def on_message(self, message): + self.objects.append(message.message) + + def retrieve_initial_sequence(self): + if self.sequence == -1: + return self.sequence + + # +1 to read the next item + return self.sequence + 1 + + def store_sequence(self, sequence): + self.sequences.append(sequence) + self.sequence = sequence + + def is_loss_tolerant(self): + return False + + def is_terminal(self, error): + return True + + listener = DurableListener() + + registration_id = await topic.add_listener(listener) + await topic.publish("item1") + + await self.assertTrueEventually(lambda: self.assertEqual(["item1"], listener.objects)) + + self.assertTrue(await topic.remove_listener(registration_id)) + + await topic.publish("item2") + await topic.publish("item3") + + await topic.add_listener(listener) + + def assertion(): + self.assertEqual(["item1", "item2", "item3"], listener.objects) + self.assertEqual([0, 1, 2], listener.sequences) + + await self.assertTrueEventually(assertion) + + async def 
test_client_receives_when_server_publish_messages(self): + skip_if_client_version_older_than(self, "4.2.1") + + topic_name = random_string() + topic = await self.get_topic(topic_name) + + received_message_count = [0] + + def listener(message): + self.assertIsNotNone(message.member) + received_message_count[0] += 1 + + await topic.add_listener(listener) + + message_count = 10 + + script = """ + var topic = instance_0.getReliableTopic("%s"); + for (var i = 0; i < %d; i++) { + topic.publish(i); + } + """ % ( + topic_name, + message_count, + ) + + self.rc.executeOnController(self.cluster.id, script, Lang.JAVASCRIPT) + await self.assertTrueEventually( + lambda: self.assertEqual(message_count, received_message_count[0]) + ) + + async def get_ringbuffer_data(self, topic): + ringbuffer = topic._ringbuffer + head_sequence = await ringbuffer.head_sequence() + items = await ringbuffer.read_many(head_sequence, CAPACITY, CAPACITY) + return list( + map( + lambda m: topic._to_object(m.payload), + items, + ) + ) + + async def get_topic(self, name): + topic = await self.client.get_reliable_topic(name) + self.topics.append(topic) + return topic From 3de059d1ebdd25c7c079b9c6992be8c4dc2bdef8 Mon Sep 17 00:00:00 2001 From: Yuce Tekol Date: Mon, 20 Apr 2026 06:57:25 +0300 Subject: [PATCH 08/10] Black --- hazelcast/internal/asyncio_proxy/manager.py | 1 - 1 file changed, 1 deletion(-) diff --git a/hazelcast/internal/asyncio_proxy/manager.py b/hazelcast/internal/asyncio_proxy/manager.py index 2995af845b..08bfab6fc3 100644 --- a/hazelcast/internal/asyncio_proxy/manager.py +++ b/hazelcast/internal/asyncio_proxy/manager.py @@ -103,4 +103,3 @@ async def create_reliable_topic_proxy(service_name, name, context): SET_SERVICE: create_set_proxy, VECTOR_SERVICE: create_vector_collection_proxy, } - From fdf1c35a84b77cf152f36343f7da7c534e1189b7 Mon Sep 17 00:00:00 2001 From: Yuce Tekol Date: Mon, 20 Apr 2026 07:16:15 +0300 Subject: [PATCH 09/10] Clarify --- 
hazelcast/internal/asyncio_proxy/reliable_topic.py | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/hazelcast/internal/asyncio_proxy/reliable_topic.py b/hazelcast/internal/asyncio_proxy/reliable_topic.py index 4bf6ab6e17..0a8a886890 100644 --- a/hazelcast/internal/asyncio_proxy/reliable_topic.py +++ b/hazelcast/internal/asyncio_proxy/reliable_topic.py @@ -77,9 +77,12 @@ def next_batch(self): """ if self._cancelled: return - self._task = asyncio.create_task(self._run_next_batch()) + # The task is assigned to an instance variable to keep a reference. + # This ensures the task is not garbage collected before it is done. + # See: https://docs.python.org/3/library/asyncio-task.html#asyncio.create_task + self._task = asyncio.create_task(self._handle_next_batch()) - async def _run_next_batch(self): + async def _handle_next_batch(self): """Reads the next batch from the ringbuffer and processes the items.""" if self._cancelled: return From 5ef864a6d68819ac84e8ab9c876d5b3b87f7b998 Mon Sep 17 00:00:00 2001 From: Yuce Tekol Date: Mon, 20 Apr 2026 07:25:23 +0300 Subject: [PATCH 10/10] Style --- .../internal/asyncio_proxy/reliable_topic.py | 19 ++++++++----------- 1 file changed, 8 insertions(+), 11 deletions(-) diff --git a/hazelcast/internal/asyncio_proxy/reliable_topic.py b/hazelcast/internal/asyncio_proxy/reliable_topic.py index 0a8a886890..7a5b42cf92 100644 --- a/hazelcast/internal/asyncio_proxy/reliable_topic.py +++ b/hazelcast/internal/asyncio_proxy/reliable_topic.py @@ -82,6 +82,14 @@ def next_batch(self): # See: https://docs.python.org/3/library/asyncio-task.html#asyncio.create_task self._task = asyncio.create_task(self._handle_next_batch()) + def cancel(self): + """Sets the cancelled flag, removes the runner registration, and + notifies the listener via on_cancel().
+ """ + self._cancelled = True + self._runners.pop(self._registration_id, None) + self._listener.on_cancel() + async def _handle_next_batch(self): """Reads the next batch from the ringbuffer and processes the items.""" if self._cancelled: @@ -102,7 +110,6 @@ async def _handle_next_batch(self): try: message = result[i] self._listener.store_sequence(result.get_sequence(i)) - member = None if message.publisher_address: member = MemberInfo( @@ -138,16 +145,6 @@ async def _handle_next_batch(self): if not await self._handle_internal_error(e): self.cancel() - def cancel(self): - """Sets the cancelled flag, cancels the running task, and removes - the runner registration. - """ - self._cancelled = True - self._runners.pop(self._registration_id, None) - # if self._task is not None and not self._task.done(): - # self._task.cancel() - self._listener.on_cancel() - def _is_loss_tolerable(self, loss_count: int) -> bool: """Called when message loss is detected.