diff --git a/hazelcast/asyncio/__init__.py b/hazelcast/asyncio/__init__.py index 89df6eea46..3328d9817b 100644 --- a/hazelcast/asyncio/__init__.py +++ b/hazelcast/asyncio/__init__.py @@ -5,15 +5,27 @@ __all__ = [ "EntryEventCallable", + "Executor", "HazelcastClient", "List", "Map", + "MultiMap", + "PNCounter", + "Queue", "ReplicatedMap", + "Ringbuffer", + "Set", "VectorCollection", ] from hazelcast.internal.asyncio_client import HazelcastClient +from hazelcast.internal.asyncio_proxy.executor import Executor from hazelcast.internal.asyncio_proxy.list import List from hazelcast.internal.asyncio_proxy.map import Map, EntryEventCallable +from hazelcast.internal.asyncio_proxy.multi_map import MultiMap +from hazelcast.internal.asyncio_proxy.pn_counter import PNCounter +from hazelcast.internal.asyncio_proxy.queue import Queue from hazelcast.internal.asyncio_proxy.replicated_map import ReplicatedMap +from hazelcast.internal.asyncio_proxy.ringbuffer import Ringbuffer +from hazelcast.internal.asyncio_proxy.set import Set from hazelcast.internal.asyncio_proxy.vector_collection import VectorCollection diff --git a/hazelcast/internal/asyncio_client.py b/hazelcast/internal/asyncio_client.py index 0cfbf6fe74..5ef4f4910c 100644 --- a/hazelcast/internal/asyncio_client.py +++ b/hazelcast/internal/asyncio_client.py @@ -11,6 +11,7 @@ from hazelcast.discovery import HazelcastCloudAddressProvider from hazelcast.errors import IllegalStateError, InvalidConfigurationError from hazelcast.internal.asyncio_invocation import InvocationService, Invocation +from hazelcast.internal.asyncio_proxy.pn_counter import PNCounter from hazelcast.internal.asyncio_proxy.vector_collection import VectorCollection from hazelcast.lifecycle import LifecycleService, LifecycleState, _InternalLifecycleService from hazelcast.internal.asyncio_listener import ClusterViewListenerService, ListenerService @@ -32,6 +33,7 @@ RINGBUFFER_SERVICE,
SET_SERVICE, VECTOR_SERVICE, + PN_COUNTER_SERVICE, ) from hazelcast.internal.asyncio_proxy.base import Proxy from hazelcast.internal.asyncio_proxy.executor import Executor @@ -350,6 +352,17 @@ async def get_ringbuffer(self, name: str) -> Ringbuffer[ItemType]: """ return await self._proxy_manager.get_or_create(RINGBUFFER_SERVICE, name) + async def get_pn_counter(self, name: str) -> PNCounter: + """Returns the PN Counter instance with the specified name. + + Args: + name: Name of the PN Counter. + + Returns: + Distributed PN Counter instance with the specified name. + """ + return await self._proxy_manager.get_or_create(PN_COUNTER_SERVICE, name) + async def create_vector_collection_config( self, name: str, diff --git a/hazelcast/internal/asyncio_proxy/manager.py b/hazelcast/internal/asyncio_proxy/manager.py index 565d24eb6d..7929ffe794 100644 --- a/hazelcast/internal/asyncio_proxy/manager.py +++ b/hazelcast/internal/asyncio_proxy/manager.py @@ -4,6 +4,7 @@ from hazelcast.internal.asyncio_proxy.executor import create_executor_proxy from hazelcast.internal.asyncio_proxy.list import create_list_proxy from hazelcast.internal.asyncio_proxy.multi_map import create_multi_map_proxy +from hazelcast.internal.asyncio_proxy.pn_counter import create_pn_counter_proxy from hazelcast.internal.asyncio_proxy.queue import create_queue_proxy from hazelcast.internal.asyncio_proxy.set import create_set_proxy from hazelcast.internal.asyncio_proxy.vector_collection import ( @@ -25,6 +26,7 @@ REPLICATED_MAP_SERVICE = "hz:impl:replicatedMapService" RINGBUFFER_SERVICE = "hz:impl:ringbufferService" SET_SERVICE = "hz:impl:setService" +PN_COUNTER_SERVICE = "hz:impl:PNCounterService" VECTOR_SERVICE = "hz:service:vector" _proxy_init: typing.Dict[ @@ -39,6 +41,7 @@ REPLICATED_MAP_SERVICE: create_replicated_map_proxy, RINGBUFFER_SERVICE: create_ringbuffer_proxy, SET_SERVICE: create_set_proxy, + PN_COUNTER_SERVICE: create_pn_counter_proxy, VECTOR_SERVICE: create_vector_collection_proxy, } diff --git 
a/hazelcast/internal/asyncio_proxy/pn_counter.py b/hazelcast/internal/asyncio_proxy/pn_counter.py new file mode 100644 index 0000000000..1979e82224 --- /dev/null +++ b/hazelcast/internal/asyncio_proxy/pn_counter.py @@ -0,0 +1,324 @@ +import asyncio +import logging +import random + +from hazelcast.errors import NoDataMemberInClusterError +from hazelcast.internal.asyncio_cluster import VectorClock +from hazelcast.internal.asyncio_proxy.base import Proxy +from hazelcast.protocol.codec import ( + pn_counter_get_codec, + pn_counter_add_codec, + pn_counter_get_configured_replica_count_codec, +) + +_logger = logging.getLogger(__name__) + + +class PNCounter(Proxy): + """PN (Positive-Negative) CRDT counter. + + The counter supports adding and subtracting values as well as + retrieving the current counter value. + Each replica of this counter can perform operations locally without + coordination with the other replicas, thus increasing availability. + The counter guarantees that whenever two nodes have received the + same set of updates, possibly in a different order, their state is + identical, and any conflicting updates are merged automatically. + If no new updates are made to the shared state, all nodes that can + communicate will eventually have the same data. + + When invoking updates from the client, the invocation is remote. + This may lead to indeterminate state - the update may be applied but the + response has not been received. In this case, the caller will be notified + with a TargetDisconnectedError. + + The read and write methods provide monotonic read and RYW (read-your-write) + guarantees. These guarantees are session guarantees which means that if + no replica with the previously observed state is reachable, the session + guarantees are lost and the method invocation will throw a + ConsistencyLostError. This does not mean + that an update is lost. 
All of the updates are part of some replica and + will be eventually reflected in the state of all other replicas. This + exception just means that you cannot observe your own writes because + all replicas that contain your updates are currently unreachable. + After you have received a ConsistencyLostError, you can either + wait for a sufficiently up-to-date replica to become reachable in which + case the session can be continued or you can reset the session by calling + the reset() method. If you have called the reset() method, + a new session is started with the next invocation to a CRDT replica. + + Notes: + The CRDT state is kept entirely on non-lite (data) members. If there + aren't any and the methods here are invoked on a lite member, they will + fail with an NoDataMemberInClusterError. + """ + + def __init__(self, service_name, name, context): + super(PNCounter, self).__init__(service_name, name, context) + self._observed_clock = VectorClock() + self._max_replica_count = 0 + self._current_target_replica_address = None + + async def get(self) -> int: + """Returns the current value of the counter. + + Returns: + The current value of the counter. + + Raises: + NoDataMemberInClusterError: if the cluster does not contain any + data members. + ConsistencyLostError: if the session guarantees have been lost. + """ + return await self._invoke_internal(pn_counter_get_codec) + + async def get_and_add(self, delta: int) -> int: + """Adds the given value to the current value and returns the previous + value. + + Args: + delta: The value to add. + + Returns: + The previous value. + + Raises: + NoDataMemberInClusterError: if the cluster does not contain any + data members. + ConsistencyLostError: if the session guarantees have been lost. 
+ """ + + return await self._invoke_internal( + pn_counter_add_codec, delta=delta, get_before_update=True + ) + + async def add_and_get(self, delta: int) -> int: + """Adds the given value to the current value and returns the updated + value. + + Args: + delta: The value to add. + + Returns: + The updated value. + + Raises: + NoDataMemberInClusterError: if the cluster does not contain any + data members. + ConsistencyLostError: if the session guarantees have been lost. + """ + + return await self._invoke_internal( + pn_counter_add_codec, delta=delta, get_before_update=False + ) + + async def get_and_subtract(self, delta: int) -> int: + """Subtracts the given value from the current value and returns the + previous value. + + Args: + delta: The value to subtract. + + Returns: + The previous value. + + Raises: + NoDataMemberInClusterError: if the cluster does not contain any + data members. + ConsistencyLostError: if the session guarantees have been lost. + """ + + return await self._invoke_internal( + pn_counter_add_codec, delta=-1 * delta, get_before_update=True + ) + + async def subtract_and_get(self, delta: int) -> int: + """Subtracts the given value from the current value and returns the + updated value. + + Args: + delta: The value to subtract. + + Returns: + The updated value. + + Raises: + NoDataMemberInClusterError: if the cluster does not contain any + data members. + ConsistencyLostError: if the session guarantees have been lost. + """ + + return await self._invoke_internal( + pn_counter_add_codec, delta=-1 * delta, get_before_update=False + ) + + async def get_and_decrement(self) -> int: + """Decrements the counter value by one and returns the previous value. + + Returns: + The previous value. + + Raises: + NoDataMemberInClusterError: if the cluster does not contain any + data members. + ConsistencyLostError: if the session guarantees have been lost. 
+ """ + + return await self._invoke_internal(pn_counter_add_codec, delta=-1, get_before_update=True) + + async def decrement_and_get(self) -> int: + """Decrements the counter value by one and returns the updated value. + + Returns: + The updated value. + + Raises: + NoDataMemberInClusterError: if the cluster does not contain any + data members. + ConsistencyLostError: if the session guarantees have been lost. + """ + + return await self._invoke_internal(pn_counter_add_codec, delta=-1, get_before_update=False) + + async def get_and_increment(self) -> int: + """Increments the counter value by one and returns the previous value. + + Returns: + The previous value. + + Raises: + NoDataMemberInClusterError: if the cluster does not contain any + data members. + ConsistencyLostError: if the session guarantees have been lost. + """ + + return await self._invoke_internal(pn_counter_add_codec, delta=1, get_before_update=True) + + async def increment_and_get(self) -> int: + """Increments the counter value by one and returns the updated value. + + Returns: + The updated value. + + Raises: + NoDataMemberInClusterError: if the cluster does not contain any + data members. + ConsistencyLostError: if the session guarantees have been lost. + """ + + return await self._invoke_internal(pn_counter_add_codec, delta=1, get_before_update=False) + + def reset(self) -> None: + """Resets the observed state by this PN counter. + + This method may be used after a method invocation has thrown a + ``ConsistencyLostError`` to reset the proxy and to be able to start a + new session. 
+ """ + + self._observed_clock = VectorClock() + + async def _invoke_internal(self, codec, **kwargs) -> int: + delegated_future = asyncio.get_running_loop().create_future() + await self._set_result_or_error(delegated_future, [], None, codec, **kwargs) + return await delegated_future + + async def _set_result_or_error( + self, delegated_future, excluded_addresses, last_error, codec, **kwargs + ): + target = await self._get_crdt_operation_target(excluded_addresses) + if not target: + if last_error: + delegated_future.set_exception(last_error) + return + delegated_future.set_exception( + NoDataMemberInClusterError( + "Cannot invoke operations on a CRDT because " + "the cluster does not contain any data members" + ) + ) + return + request = codec.encode_request( + name=self.name, + replica_timestamps=self._observed_clock.entry_set(), + target_replica_uuid=target.uuid, + **kwargs + ) + + try: + result = await self._ainvoke_on_target(request, target.uuid, codec.decode_response) + self._update_observed_replica_timestamp(result["replica_timestamps"]) + delegated_future.set_result(result["value"]) + except Exception as ex: + _logger.exception( + "Exception occurred while invoking operation on target %s, " + "choosing different target", + target, + ) + excluded_addresses.append(target) + await self._set_result_or_error( + delegated_future, excluded_addresses, ex, codec, **kwargs + ) + + async def _get_crdt_operation_target(self, excluded_addresses): + if ( + self._current_target_replica_address + and self._current_target_replica_address not in excluded_addresses + ): + return self._current_target_replica_address + + self._current_target_replica_address = await self._choose_target_replica(excluded_addresses) + return self._current_target_replica_address + + async def _choose_target_replica(self, excluded_addresses): + replica_addresses = await self._get_replica_addresses(excluded_addresses) + + if len(replica_addresses) == 0: + return None + + random_replica_index = 
random.randrange(0, len(replica_addresses)) + return replica_addresses[random_replica_index] + + async def _get_replica_addresses(self, excluded_addresses): + data_members = self._context.cluster_service.get_members( + lambda member: not member.lite_member + ) + replica_count = await self._get_max_configured_replica_count() + + current_count = min(replica_count, len(data_members)) + replica_addresses = [] + + for i in range(current_count): + member_address = data_members[i] + if member_address not in excluded_addresses: + replica_addresses.append(member_address) + + return replica_addresses + + async def _get_max_configured_replica_count(self): + if self._max_replica_count > 0: + return self._max_replica_count + + request = pn_counter_get_configured_replica_count_codec.encode_request(self.name) + count = await self._invoke( + request, pn_counter_get_configured_replica_count_codec.decode_response + ) + self._max_replica_count = count + return self._max_replica_count + + def _update_observed_replica_timestamp(self, observed_timestamps): + observed_clock = self._to_vector_clock(observed_timestamps) + if observed_clock.is_after(self._observed_clock): + self._observed_clock = observed_clock + + @classmethod + def _to_vector_clock(cls, timestamps): + vector_clock = VectorClock() + for replica_id, timestamp in timestamps: + vector_clock.set_replica_timestamp(replica_id, timestamp) + + return vector_clock + + +async def create_pn_counter_proxy(service_name, name, context): + return PNCounter(service_name, name, context) diff --git a/tests/integration/asyncio/proxy/pn_counter_test.py b/tests/integration/asyncio/proxy/pn_counter_test.py new file mode 100644 index 0000000000..168b515de5 --- /dev/null +++ b/tests/integration/asyncio/proxy/pn_counter_test.py @@ -0,0 +1,169 @@ +import os +import unittest + +from hazelcast.errors import ConsistencyLostError, NoDataMemberInClusterError +from hazelcast.internal.asyncio_client import HazelcastClient +from 
tests.integration.asyncio.base import SingleMemberTestCase, HazelcastTestCase + + +class PNCounterBasicTest(SingleMemberTestCase): + @classmethod + def configure_client(cls, config): + config["cluster_name"] = cls.cluster.id + return config + + async def asyncSetUp(self): + await super().asyncSetUp() + self.pn_counter = await self.client.get_pn_counter("pn-counter") + + async def asyncTearDown(self): + await self.pn_counter.destroy() + await super().asyncTearDown() + + async def test_get(self): + await self.pn_counter.add_and_get(4) + self.assertEqual(4, await self.pn_counter.get()) + + async def test_get_initial_value(self): + self.assertEqual(0, await self.pn_counter.get()) + + async def test_get_and_add(self): + await self.check_pn_counter_method(await self.pn_counter.get_and_add(3), 0, 3) + + async def test_add_and_get(self): + await self.check_pn_counter_method(await self.pn_counter.add_and_get(4), 4, 4) + + async def test_get_and_subtract(self): + await self.check_pn_counter_method(await self.pn_counter.get_and_subtract(2), 0, -2) + + async def test_subtract_and_get(self): + await self.check_pn_counter_method(await self.pn_counter.subtract_and_get(5), -5, -5) + + async def test_get_and_decrement(self): + await self.check_pn_counter_method(await self.pn_counter.get_and_decrement(), 0, -1) + + async def test_decrement_and_get(self): + await self.check_pn_counter_method(await self.pn_counter.decrement_and_get(), -1, -1) + + async def test_get_and_increment(self): + await self.check_pn_counter_method(await self.pn_counter.get_and_increment(), 0, 1) + + async def test_increment_and_get(self): + await self.check_pn_counter_method(await self.pn_counter.increment_and_get(), 1, 1) + + async def test_reset(self): + await self.pn_counter.get_and_add(1) + old_vector_clock = self.pn_counter._observed_clock + self.pn_counter.reset() + self.assertNotEqual(old_vector_clock, self.pn_counter._observed_clock) + + async def check_pn_counter_method( + self, return_value, 
expected_return_value, expected_get_value + ): + get_value = await self.pn_counter.get() + + self.assertEqual(expected_return_value, return_value) + self.assertEqual(expected_get_value, get_value) + + +class PNCounterConsistencyTest(unittest.IsolatedAsyncioTestCase, HazelcastTestCase): + async def asyncSetUp(self): + self.rc = self.create_rc() + self.cluster = self.create_cluster(self.rc, self.read_cluster_config()) + self.cluster.start_member() + self.cluster.start_member() + self.client = await HazelcastClient.create_and_start(cluster_name=self.cluster.id) + self.pn_counter = await self.client.get_pn_counter("pn-counter") + + async def asyncTearDown(self): + await self.client.shutdown() + self.rc.terminateCluster(self.cluster.id) + self.rc.exit() + + async def test_consistency_lost_error_raised_when_target_terminates(self): + await self.pn_counter.add_and_get(3) + replica_address = self.pn_counter._current_target_replica_address + self.rc.terminateMember(self.cluster.id, str(replica_address.uuid)) + with self.assertRaises(ConsistencyLostError): + await self.pn_counter.add_and_get(5) + + async def test_counter_can_continue_session_by_calling_reset(self): + await self.pn_counter.add_and_get(3) + replica_address = self.pn_counter._current_target_replica_address + self.rc.terminateMember(self.cluster.id, str(replica_address.uuid)) + self.pn_counter.reset() + await self.pn_counter.add_and_get(5) + + @staticmethod + def read_cluster_config(): + path = os.path.abspath(__file__) + dir_path = os.path.dirname(path) + with open(os.path.join(dir_path, "../../backward_compatible/proxy/hazelcast.xml")) as f: + return f.read() + + +class PNCounterLiteMemberTest(SingleMemberTestCase): + @classmethod + def configure_client(cls, config): + config["cluster_name"] = cls.cluster.id + return config + + @classmethod + def configure_cluster(cls): + path = os.path.abspath(__file__) + dir_path = os.path.dirname(path) + with open( + os.path.join(dir_path, 
"../../backward_compatible/proxy/hazelcast_litemember.xml") + ) as f: + return f.read() + + async def asyncSetUp(self): + await super().asyncSetUp() + self.pn_counter = await self.client.get_pn_counter("pn-counter") + + async def asyncTearDown(self): + await self.pn_counter.destroy() + await super().asyncTearDown() + + async def test_get_with_lite_member(self): + await self.verify_error_raised(NoDataMemberInClusterError, self.pn_counter.get) + + async def test_get_and_add_with_lite_member(self): + await self.verify_error_raised(NoDataMemberInClusterError, self.pn_counter.get_and_add, 1) + + async def test_add_and_get_with_lite_member(self): + await self.verify_error_raised(NoDataMemberInClusterError, self.pn_counter.add_and_get, 2) + + async def test_get_and_subtract_with_lite_member(self): + await self.verify_error_raised( + NoDataMemberInClusterError, self.pn_counter.get_and_subtract, 1 + ) + + async def test_subtract_and_get_with_lite_member(self): + await self.verify_error_raised( + NoDataMemberInClusterError, self.pn_counter.subtract_and_get, 5 + ) + + async def test_get_and_decrement_with_lite_member(self): + await self.verify_error_raised( + NoDataMemberInClusterError, self.pn_counter.get_and_decrement + ) + + async def test_decrement_and_get_with_lite_member(self): + await self.verify_error_raised( + NoDataMemberInClusterError, self.pn_counter.decrement_and_get + ) + + async def test_get_and_increment(self): + await self.verify_error_raised( + NoDataMemberInClusterError, self.pn_counter.get_and_increment + ) + + async def test_increment_and_get(self): + await self.verify_error_raised( + NoDataMemberInClusterError, self.pn_counter.increment_and_get + ) + + async def verify_error_raised(self, error, func, *args): + with self.assertRaises(error): + await func(*args)