From d0b32aeb6c26a6fa03cbba37d30ec135bcfec3e1 Mon Sep 17 00:00:00 2001 From: Yuce Tekol Date: Wed, 18 Feb 2026 13:13:53 +0300 Subject: [PATCH 1/4] Create tasks for conn.close_connection calls. --- hazelcast/internal/asyncio_cluster.py | 21 ++++++++++++++------- 1 file changed, 14 insertions(+), 7 deletions(-) diff --git a/hazelcast/internal/asyncio_cluster.py b/hazelcast/internal/asyncio_cluster.py index 4ce2491b65..6ba883177c 100644 --- a/hazelcast/internal/asyncio_cluster.py +++ b/hazelcast/internal/asyncio_cluster.py @@ -134,6 +134,9 @@ def __init__(self, client, config): self._listeners = {} self._member_list_snapshot = _EMPTY_SNAPSHOT self._initial_list_fetched = asyncio.Event() + # asyncio tasks are weakly referenced; keep strong refs until they finish. + # see: https://docs.python.org/3/library/asyncio-task.html#creating-tasks + self._close_tasks: typing.Set[asyncio.Task] = set() def start(self, connection_manager, membership_listeners): self._connection_manager = connection_manager @@ -282,14 +285,18 @@ def _detect_membership_events(self, previous_members, current_members): for dead_member in dead_members: connection = self._connection_manager.get_connection(dead_member.uuid) if connection: - connection.close_connection( - None, - TargetDisconnectedError( - "The client has closed the connection to this member, " - "after receiving a member left event from the cluster. " - "%s" % connection - ), + task = asyncio.create_task( + connection.close_connection( + None, + TargetDisconnectedError( + "The client has closed the connection to this member, " + "after receiving a member left event from the cluster. " + "%s" % connection + ), + ) ) + self._close_tasks.add(task) + task.add_done_callback(self._close_tasks.discard) if (len(new_members) + len(dead_members)) > 0: if len(current_members) > 0: From df228c2edde50bfdc5996e77fabe1b3ac8f4f731 Mon Sep 17 00:00:00 2001 From: Yuce Tekol Date: Wed, 18 Feb 2026 13:14:30 +0300 Subject: [PATCH 2/4] cluster get_member must use the correct variable --- hazelcast/cluster.py | 2 +- hazelcast/internal/asyncio_cluster.py | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/hazelcast/cluster.py b/hazelcast/cluster.py index ce5ec1d6ad..a1e3325893 100644 --- a/hazelcast/cluster.py +++ b/hazelcast/cluster.py @@ -140,7 +140,7 @@ def start(self, connection_manager, membership_listeners): self.add_listener(*listener) def get_member(self, member_uuid): - check_not_none(uuid, "UUID must not be null") + check_not_none(member_uuid, "UUID must not be null") snapshot = self._member_list_snapshot return snapshot.members.get(member_uuid, None) diff --git a/hazelcast/internal/asyncio_cluster.py b/hazelcast/internal/asyncio_cluster.py index 6ba883177c..b56d136cda 100644 --- a/hazelcast/internal/asyncio_cluster.py +++ b/hazelcast/internal/asyncio_cluster.py @@ -144,7 +144,7 @@ def start(self, connection_manager, membership_listeners): self.add_listener(*listener) def get_member(self, member_uuid): - check_not_none(uuid, "UUID must not be null") + check_not_none(member_uuid, "UUID must not be null") snapshot = self._member_list_snapshot return snapshot.members.get(member_uuid, None) From 02f6cdc2681552e2a513118912acebc12c006056 Mon Sep 17 00:00:00 2001 From: Yuce Tekol Date: Wed, 18 Feb 2026 14:34:10 +0300 Subject: [PATCH 3/4] Fix the deadlock in asyncio listener --- hazelcast/internal/asyncio_listener.py | 77 +++++++++++++------------- 1 file changed, 40 insertions(+), 37 deletions(-) diff --git a/hazelcast/internal/asyncio_listener.py b/hazelcast/internal/asyncio_listener.py index 43642761a6..f041df03b1 100644 --- a/hazelcast/internal/asyncio_listener.py +++ b/hazelcast/internal/asyncio_listener.py @@ -82,51 +82,54 @@ async def register_listener( tg.create_task(task) return registration_id except Exception: - await self.deregister_listener(registration_id) + await self._deregister_listener_unsafe(registration_id) raise HazelcastError("Listener cannot be added") async def deregister_listener(self, user_registration_id): check_not_none(user_registration_id, "None user_registration_id is not allowed!") async with self._registration_lock: - listener_registration = self._active_registrations.pop(user_registration_id, None) - if not listener_registration: - return False - - async def handle(inv: Invocation, conn: AsyncioConnection): - try: - await inv.future - except Exception as e: - if not isinstance( - e, (HazelcastClientNotActiveError, IOError, TargetDisconnectedError) - ): - _logger.warning( - "Deregistration of listener with ID %s has failed for address %s: %s", - user_registration_id, - conn.remote_address, - e, - ) + return await self._deregister_listener_unsafe(user_registration_id) - async with asyncio.TaskGroup() as tg: - items = listener_registration.connection_registrations.items() - for connection, event_registration in items: - # Remove local handler - self.remove_event_handler(event_registration.correlation_id) - # The rest is for deleting the remote registration - server_registration_id = event_registration.server_registration_id - deregister_request = listener_registration.encode_deregister_request( - server_registration_id - ) - if deregister_request is None: - # None means no remote registration (e.g. for backup acks) - continue - invocation = Invocation( - deregister_request, connection=connection, timeout=sys.maxsize, urgent=True + async def _deregister_listener_unsafe(self, user_registration_id): + listener_registration = self._active_registrations.pop(user_registration_id, None) + if not listener_registration: + return False + + async def handle(inv: Invocation, conn: AsyncioConnection): + try: + await inv.future + except Exception as e: + if not isinstance( + e, (HazelcastClientNotActiveError, IOError, TargetDisconnectedError) + ): + _logger.warning( + "Deregistration of listener with ID %s has failed for address %s: %s", + user_registration_id, + conn.remote_address, + e, ) - self._invocation_service.invoke(invocation) - tg.create_task(handle(invocation, connection)) - listener_registration.connection_registrations.clear() - return True + async with asyncio.TaskGroup() as tg: + items = listener_registration.connection_registrations.items() + for connection, event_registration in items: + # Remove local handler + self.remove_event_handler(event_registration.correlation_id) + # The rest is for deleting the remote registration + server_registration_id = event_registration.server_registration_id + deregister_request = listener_registration.encode_deregister_request( + server_registration_id + ) + if deregister_request is None: + # None means no remote registration (e.g. for backup acks) + continue + invocation = Invocation( + deregister_request, connection=connection, timeout=sys.maxsize, urgent=True + ) + self._invocation_service.invoke(invocation) + tg.create_task(handle(invocation, connection)) + + listener_registration.connection_registrations.clear() + return True def handle_client_message(self, message: InboundMessage, correlation_id: int): handler = self._event_handlers.get(correlation_id, None) From 16b4a09f939b7197d0f1a510ab314f65ab539cfb Mon Sep 17 00:00:00 2001 From: Yuce Tekol Date: Wed, 18 Feb 2026 16:10:40 +0300 Subject: [PATCH 4/4] Fixed wrong method retries in VC, compact schema send retry, blocking cloud discovery refresh --- hazelcast/internal/asyncio_compact.py | 1 + hazelcast/internal/asyncio_discovery.py | 2 +- hazelcast/internal/asyncio_proxy/vector_collection.py | 4 ++-- hazelcast/proxy/vector_collection.py | 4 ++-- 4 files changed, 6 insertions(+), 5 deletions(-) diff --git a/hazelcast/internal/asyncio_compact.py b/hazelcast/internal/asyncio_compact.py index 1533befdc3..9ba6746961 100644 --- a/hazelcast/internal/asyncio_compact.py +++ b/hazelcast/internal/asyncio_compact.py @@ -102,6 +102,7 @@ async def _replicate_schema( # is not known to be replicated yet. We should retry # sending it in a random member. await asyncio.sleep(self._invocation_retry_pause) + remaining_retries -= 1 # We tried to send it a couple of times, but the member list # in our local and the member list returned by the initiator diff --git a/hazelcast/internal/asyncio_discovery.py b/hazelcast/internal/asyncio_discovery.py index dcd890e8fb..9f755cc5cb 100644 --- a/hazelcast/internal/asyncio_discovery.py +++ b/hazelcast/internal/asyncio_discovery.py @@ -53,6 +53,6 @@ async def translate(self, address): async def refresh(self): """Refreshes the internal lookup table if necessary.""" try: - self._private_to_public = self.cloud_discovery.discover_nodes() + self._private_to_public = await asyncio.to_thread(self.cloud_discovery.discover_nodes) except Exception as e: _logger.warning("Failed to load addresses from Hazelcast Cloud: %s", e) diff --git a/hazelcast/internal/asyncio_proxy/vector_collection.py b/hazelcast/internal/asyncio_proxy/vector_collection.py index 2a2a1ce632..d9fd258915 100644 --- a/hazelcast/internal/asyncio_proxy/vector_collection.py +++ b/hazelcast/internal/asyncio_proxy/vector_collection.py @@ -390,7 +390,7 @@ def handler(message): key_data = self._to_data(key) value_data = self._to_data(document.value) except SchemaNotReplicatedError as e: - return await self._send_schema_and_retry(e, self.set, key, document) + return await self._send_schema_and_retry(e, self.put, key, document) document = copy.copy(document) document.value = value_data request = vector_collection_put_codec.encode_request( @@ -409,7 +409,7 @@ def handler(message): key_data = self._to_data(key) value_data = self._to_data(document.value) except SchemaNotReplicatedError as e: - return await self._send_schema_and_retry(e, self.set, key, document) + return await self._send_schema_and_retry(e, self.put_if_absent, key, document) document.value = value_data request = vector_collection_put_if_absent_codec.encode_request( self.name, diff --git a/hazelcast/proxy/vector_collection.py b/hazelcast/proxy/vector_collection.py index 5e45931326..b39eb1f4d8 100644 --- a/hazelcast/proxy/vector_collection.py +++ b/hazelcast/proxy/vector_collection.py @@ -391,7 +391,7 @@ def handler(message): key_data = self._to_data(key) value_data = self._to_data(document.value) except SchemaNotReplicatedError as e: - return self._send_schema_and_retry(e, self.set, key, document) + return self._send_schema_and_retry(e, self.put, key, document) document = copy.copy(document) document.value = value_data request = vector_collection_put_codec.encode_request( @@ -410,7 +410,7 @@ def handler(message): key_data = self._to_data(key) value_data = self._to_data(document.value) except SchemaNotReplicatedError as e: - return self._send_schema_and_retry(e, self.set, key, document) + return self._send_schema_and_retry(e, self.put_if_absent, key, document) document.value = value_data request = vector_collection_put_if_absent_codec.encode_request( self.name,