Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion hazelcast/cluster.py
Original file line number Diff line number Diff line change
Expand Up @@ -140,7 +140,7 @@ def start(self, connection_manager, membership_listeners):
self.add_listener(*listener)

def get_member(self, member_uuid):
check_not_none(uuid, "UUID must not be null")
check_not_none(member_uuid, "UUID must not be null")
snapshot = self._member_list_snapshot
return snapshot.members.get(member_uuid, None)

Expand Down
23 changes: 15 additions & 8 deletions hazelcast/internal/asyncio_cluster.py
Original file line number Diff line number Diff line change
Expand Up @@ -134,14 +134,17 @@ def __init__(self, client, config):
self._listeners = {}
self._member_list_snapshot = _EMPTY_SNAPSHOT
self._initial_list_fetched = asyncio.Event()
# asyncio tasks are weakly referenced; keep strong refs until they finish.
# see: https://docs.python.org/3/library/asyncio-task.html#creating-tasks
self._close_tasks: typing.Set[asyncio.Task] = set()

def start(self, connection_manager, membership_listeners):
self._connection_manager = connection_manager
for listener in membership_listeners:
self.add_listener(*listener)

def get_member(self, member_uuid):
check_not_none(uuid, "UUID must not be null")
check_not_none(member_uuid, "UUID must not be null")
snapshot = self._member_list_snapshot
return snapshot.members.get(member_uuid, None)

Expand Down Expand Up @@ -282,14 +285,18 @@ def _detect_membership_events(self, previous_members, current_members):
for dead_member in dead_members:
connection = self._connection_manager.get_connection(dead_member.uuid)
if connection:
connection.close_connection(
None,
TargetDisconnectedError(
"The client has closed the connection to this member, "
"after receiving a member left event from the cluster. "
"%s" % connection
),
task = asyncio.create_task(
connection.close_connection(
None,
TargetDisconnectedError(
"The client has closed the connection to this member, "
"after receiving a member left event from the cluster. "
"%s" % connection
),
)
)
self._close_tasks.add(task)
task.add_done_callback(self._close_tasks.discard)

if (len(new_members) + len(dead_members)) > 0:
if len(current_members) > 0:
Expand Down
1 change: 1 addition & 0 deletions hazelcast/internal/asyncio_compact.py
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,7 @@ async def _replicate_schema(
# is not known to be replicated yet. We should retry
# sending it in a random member.
await asyncio.sleep(self._invocation_retry_pause)
remaining_retries -= 1

# We tried to send it a couple of times, but the member list
# in our local and the member list returned by the initiator
Expand Down
2 changes: 1 addition & 1 deletion hazelcast/internal/asyncio_discovery.py
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,6 @@ async def translate(self, address):
async def refresh(self):
"""Refreshes the internal lookup table if necessary."""
try:
self._private_to_public = self.cloud_discovery.discover_nodes()
self._private_to_public = await asyncio.to_thread(self.cloud_discovery.discover_nodes)
except Exception as e:
_logger.warning("Failed to load addresses from Hazelcast Cloud: %s", e)
77 changes: 40 additions & 37 deletions hazelcast/internal/asyncio_listener.py
Original file line number Diff line number Diff line change
Expand Up @@ -82,51 +82,54 @@ async def register_listener(
tg.create_task(task)
return registration_id
except Exception:
await self.deregister_listener(registration_id)
await self._deregister_listener_unsafe(registration_id)
raise HazelcastError("Listener cannot be added")

async def deregister_listener(self, user_registration_id):
check_not_none(user_registration_id, "None user_registration_id is not allowed!")
async with self._registration_lock:
listener_registration = self._active_registrations.pop(user_registration_id, None)
if not listener_registration:
return False

async def handle(inv: Invocation, conn: AsyncioConnection):
try:
await inv.future
except Exception as e:
if not isinstance(
e, (HazelcastClientNotActiveError, IOError, TargetDisconnectedError)
):
_logger.warning(
"Deregistration of listener with ID %s has failed for address %s: %s",
user_registration_id,
conn.remote_address,
e,
)
return await self._deregister_listener_unsafe(user_registration_id)

async with asyncio.TaskGroup() as tg:
items = listener_registration.connection_registrations.items()
for connection, event_registration in items:
# Remove local handler
self.remove_event_handler(event_registration.correlation_id)
# The rest is for deleting the remote registration
server_registration_id = event_registration.server_registration_id
deregister_request = listener_registration.encode_deregister_request(
server_registration_id
)
if deregister_request is None:
# None means no remote registration (e.g. for backup acks)
continue
invocation = Invocation(
deregister_request, connection=connection, timeout=sys.maxsize, urgent=True
async def _deregister_listener_unsafe(self, user_registration_id):
listener_registration = self._active_registrations.pop(user_registration_id, None)
if not listener_registration:
return False

async def handle(inv: Invocation, conn: AsyncioConnection):
try:
await inv.future
except Exception as e:
if not isinstance(
e, (HazelcastClientNotActiveError, IOError, TargetDisconnectedError)
):
_logger.warning(
"Deregistration of listener with ID %s has failed for address %s: %s",
user_registration_id,
conn.remote_address,
e,
)
self._invocation_service.invoke(invocation)
tg.create_task(handle(invocation, connection))

listener_registration.connection_registrations.clear()
return True
async with asyncio.TaskGroup() as tg:
items = listener_registration.connection_registrations.items()
for connection, event_registration in items:
# Remove local handler
self.remove_event_handler(event_registration.correlation_id)
# The rest is for deleting the remote registration
server_registration_id = event_registration.server_registration_id
deregister_request = listener_registration.encode_deregister_request(
server_registration_id
)
if deregister_request is None:
# None means no remote registration (e.g. for backup acks)
continue
invocation = Invocation(
deregister_request, connection=connection, timeout=sys.maxsize, urgent=True
)
self._invocation_service.invoke(invocation)
tg.create_task(handle(invocation, connection))

listener_registration.connection_registrations.clear()
return True

def handle_client_message(self, message: InboundMessage, correlation_id: int):
handler = self._event_handlers.get(correlation_id, None)
Expand Down
4 changes: 2 additions & 2 deletions hazelcast/internal/asyncio_proxy/vector_collection.py
Original file line number Diff line number Diff line change
Expand Up @@ -390,7 +390,7 @@ def handler(message):
key_data = self._to_data(key)
value_data = self._to_data(document.value)
except SchemaNotReplicatedError as e:
return await self._send_schema_and_retry(e, self.set, key, document)
return await self._send_schema_and_retry(e, self.put, key, document)
document = copy.copy(document)
document.value = value_data
request = vector_collection_put_codec.encode_request(
Expand All @@ -409,7 +409,7 @@ def handler(message):
key_data = self._to_data(key)
value_data = self._to_data(document.value)
except SchemaNotReplicatedError as e:
return await self._send_schema_and_retry(e, self.set, key, document)
return await self._send_schema_and_retry(e, self.put_if_absent, key, document)
document.value = value_data
request = vector_collection_put_if_absent_codec.encode_request(
self.name,
Expand Down
4 changes: 2 additions & 2 deletions hazelcast/proxy/vector_collection.py
Original file line number Diff line number Diff line change
Expand Up @@ -391,7 +391,7 @@ def handler(message):
key_data = self._to_data(key)
value_data = self._to_data(document.value)
except SchemaNotReplicatedError as e:
return self._send_schema_and_retry(e, self.set, key, document)
return self._send_schema_and_retry(e, self.put, key, document)
document = copy.copy(document)
document.value = value_data
request = vector_collection_put_codec.encode_request(
Expand All @@ -410,7 +410,7 @@ def handler(message):
key_data = self._to_data(key)
value_data = self._to_data(document.value)
except SchemaNotReplicatedError as e:
return self._send_schema_and_retry(e, self.set, key, document)
return self._send_schema_and_retry(e, self.put_if_absent, key, document)
document.value = value_data
request = vector_collection_put_if_absent_codec.encode_request(
self.name,
Expand Down