diff --git a/hazelcast/internal/asyncio_cluster.py b/hazelcast/internal/asyncio_cluster.py index b56d136cda..a809a66d93 100644 --- a/hazelcast/internal/asyncio_cluster.py +++ b/hazelcast/internal/asyncio_cluster.py @@ -262,7 +262,7 @@ def _fire_membership_events(self, dead_members, new_members): if handler: try: handler(dead_member) - except: + except Exception: _logger.exception("Exception in membership listener") for new_member in new_members: @@ -270,7 +270,7 @@ def _fire_membership_events(self, dead_members, new_members): if handler: try: handler(new_member) - except: + except Exception: _logger.exception("Exception in membership listener") def _detect_membership_events(self, previous_members, current_members): diff --git a/hazelcast/internal/asyncio_connection.py b/hazelcast/internal/asyncio_connection.py index 5d20192086..a3912d8aad 100644 --- a/hazelcast/internal/asyncio_connection.py +++ b/hazelcast/internal/asyncio_connection.py @@ -5,7 +5,7 @@ import struct import time import uuid -from typing import Coroutine +from typing import Coroutine, Tuple from hazelcast import __version__ from hazelcast.config import ReconnectMode @@ -307,11 +307,8 @@ async def connect_to_all_cluster_members(self, sync_start): self._start_connect_all_members_timer() - async def on_connection_close(self, closed_connection): - remote_uuid = closed_connection.remote_uuid - remote_address = closed_connection.remote_address - - if not remote_address: + async def on_connection_close(self, closed_connection, unsafe=False): + if not closed_connection.remote_address: _logger.debug( "Destroying %s, but it has no remote address, hence nothing is " "removed from the connection dictionary", @@ -319,25 +316,9 @@ async def on_connection_close(self, closed_connection): ) return - disconnected = False - removed = False - trigger_reconnection = False - async with self._lock: - connection = self.active_connections.get(remote_uuid, None) - if connection == closed_connection: - self.active_connections.pop(remote_uuid, None) - removed = True - _logger.info( - "Removed connection to %s:%s, connection: %s", - remote_address, - remote_uuid, - connection, - ) - - if not self.active_connections: - trigger_reconnection = True - if self._client_state == ClientState.INITIALIZED_ON_CLUSTER: - disconnected = True + disconnected, removed, trigger_reconnection = await self._determine_connection_state( + closed_connection, unsafe=unsafe + ) if disconnected: self._lifecycle_service.fire_lifecycle_event(LifecycleState.DISCONNECTED) @@ -359,9 +340,40 @@ async def on_connection_close(self, closed_connection): _logger.debug( "Destroying %s, but there is no mapping for %s in the connection dictionary", closed_connection, + closed_connection.remote_uuid, + ) + + async def _determine_connection_state( + self, closed_connection, unsafe=False + ) -> Tuple[bool, bool, bool]: + if unsafe: + return self._determine_connection_state_unsafe(closed_connection) + async with self._lock: + return self._determine_connection_state_unsafe(closed_connection) + + def _determine_connection_state_unsafe(self, closed_connection) -> Tuple[bool, bool, bool]: + remote_uuid = closed_connection.remote_uuid + disconnected = False + removed = False + trigger_reconnection = False + connection = self.active_connections.get(remote_uuid, None) + if connection == closed_connection: + self.active_connections.pop(remote_uuid, None) + removed = True + _logger.info( + "Removed connection to %s:%s, connection: %s", + closed_connection.remote_address, remote_uuid, + connection, ) + if not self.active_connections: + trigger_reconnection = True + if self._client_state == ClientState.INITIALIZED_ON_CLUSTER: + disconnected = True + + return disconnected, removed, trigger_reconnection + def check_invocation_allowed(self): state = self._client_state if state == ClientState.INITIALIZED_ON_CLUSTER and self.active_connections: @@ -464,6 +476,12 @@ def _init_wait_strategy(self, config): def _start_connect_all_members_timer(self): connecting_uuids = set() + async def connect_to_member(member): + try: + await self._get_or_connect_to_member(member) + except Exception: + _logger.debug("Error connecting to %s in reconnect timer", member, exc_info=True) + async def run(): await asyncio.sleep(1) if not self._lifecycle_service.running: @@ -480,7 +498,7 @@ async def run(): connecting_uuids.add(member_uuid) if not self._lifecycle_service.running: break - tg.create_task(self._get_or_connect_to_member(member)) + tg.create_task(connect_to_member(member)) member_uuids.append(member_uuid) for item in member_uuids: @@ -658,49 +676,54 @@ async def _handle_successful_auth(self, response, connection): existing = self.active_connections.get(remote_uuid, None) - if existing: - await connection.close_connection( - "Duplicate connection to same member with UUID: %s" % remote_uuid, None - ) - return existing - - new_cluster_id = response["cluster_id"] - changed_cluster = self._cluster_id is not None and self._cluster_id != new_cluster_id - if changed_cluster: - await self._check_client_state_on_cluster_change(connection) - _logger.warning( - "Switching from current cluster: %s to new cluster: %s", - self._cluster_id, - new_cluster_id, - ) - self._on_cluster_restart() + if existing: + await connection.close_connection( + "Duplicate connection to same member with UUID: %s" % remote_uuid, + None, + unsafe=True, + ) + return existing + + new_cluster_id = response["cluster_id"] + changed_cluster = self._cluster_id is not None and self._cluster_id != new_cluster_id + if changed_cluster: + await self._check_client_state_on_cluster_change(connection) + _logger.warning( + "Switching from current cluster: %s to new cluster: %s", + self._cluster_id, + new_cluster_id, + ) + self._on_cluster_restart() - async with self._lock: is_initial_connection = not self.active_connections self.active_connections[remote_uuid] = connection fire_connected_lifecycle_event = False - if is_initial_connection: - self._cluster_id = new_cluster_id - # In split brain, the client might connect to the one half - # of the cluster, and then later might reconnect to the - # other half, after the half it was connected to is - # completely dead. Since the cluster id is preserved in - # split brain scenarios, it is impossible to distinguish - # reconnection to the same cluster vs reconnection to the - # other half of the split brain. However, in the latter, - # we might need to send some state to the other half of - # the split brain (like Compact schemas). That forces us - # to send the client state to the cluster after the first - # cluster connection, regardless the cluster id is - # changed or not. - if self._established_initial_cluster_connection: - self._client_state = ClientState.CONNECTED_TO_CLUSTER - await self._initialize_on_cluster(new_cluster_id) - else: - fire_connected_lifecycle_event = True - self._established_initial_cluster_connection = True - self._client_state = ClientState.INITIALIZED_ON_CLUSTER + init_on_cluster = False + if is_initial_connection: + self._cluster_id = new_cluster_id + # In split brain, the client might connect to the one half + # of the cluster, and then later might reconnect to the + # other half, after the half it was connected to is + # completely dead. Since the cluster id is preserved in + # split brain scenarios, it is impossible to distinguish + # reconnection to the same cluster vs reconnection to the + # other half of the split brain. However, in the latter, + # we might need to send some state to the other half of + # the split brain (like Compact schemas). That forces us + # to send the client state to the cluster after the first + # cluster connection, regardless the cluster id is + # changed or not. + if self._established_initial_cluster_connection: + self._client_state = ClientState.CONNECTED_TO_CLUSTER + init_on_cluster = True + else: + fire_connected_lifecycle_event = True + self._established_initial_cluster_connection = True + self._client_state = ClientState.INITIALIZED_ON_CLUSTER + + if init_on_cluster: + await self._initialize_on_cluster(new_cluster_id) if fire_connected_lifecycle_event: self._lifecycle_service.fire_lifecycle_event(LifecycleState.CONNECTED) @@ -777,7 +800,7 @@ async def _check_client_state_on_cluster_change(self, connection): # we can operate on. In those scenarios, we rely on the fact that we will # reopen the connections. reason = "Connection does not belong to the cluster %s" % self._cluster_id - await connection.close_connection(reason, None) + await connection.close_connection(reason, None, unsafe=True) raise ValueError(reason) def _on_cluster_restart(self): @@ -985,13 +1008,13 @@ def send_message(self, message): self._write(message.buf) return True - # Not named close to distinguish it from the asyncore.dispatcher.close. - async def close_connection(self, reason, cause): + async def close_connection(self, reason, cause, unsafe=False): """Closes the connection. Args: reason (str): The reason this connection is going to be closed. Is allowed to be None. cause (Exception): The exception responsible for closing this connection. Is allowed to be None. + unsafe (bool): Do not acquire a lock """ if not self.live: return @@ -1003,7 +1026,7 @@ async def close_connection(self, reason, cause): self._inner_close() except Exception: _logger.exception("Error while closing the the connection %s", self) - await self._connection_manager.on_connection_close(self) + await self._connection_manager.on_connection_close(self, unsafe=unsafe) def _log_close(self, reason, cause): msg = "%s closed. Reason: %s" diff --git a/hazelcast/internal/asyncio_proxy/manager.py b/hazelcast/internal/asyncio_proxy/manager.py index 9daeca0e1f..1c688c037b 100644 --- a/hazelcast/internal/asyncio_proxy/manager.py +++ b/hazelcast/internal/asyncio_proxy/manager.py @@ -1,3 +1,4 @@ +import asyncio import typing from hazelcast.internal.asyncio_proxy.vector_collection import ( @@ -29,11 +30,24 @@ def __init__(self, context): async def get_or_create(self, service_name, name, create_on_remote=True): ns = (service_name, name) - if ns in self._proxies: - return self._proxies[ns] + proxy = self._proxies.get(ns) + if proxy is not None: + if isinstance(proxy, asyncio.Future): + return await proxy + return proxy - proxy = await self._create_proxy(service_name, name, create_on_remote) + # allocate the proxy slot, so a task that tries to access the same proxy knows it's being created + fut = asyncio.get_running_loop().create_future() + self._proxies[ns] = fut + try: + proxy = await self._create_proxy(service_name, name, create_on_remote) + except BaseException as e: + self._proxies.pop(ns, None) + fut.set_exception(e) + raise + # replace the placeholder with the proxy self._proxies[ns] = proxy + fut.set_result(proxy) return proxy async def _create_proxy(self, service_name, name, create_on_remote) -> Proxy: @@ -59,4 +73,4 @@ async def destroy_proxy(self, service_name, name, destroy_on_remote=True): return False def get_distributed_objects(self): - return to_list(self._proxies.values()) + return to_list(v for v in self._proxies.values() if not isinstance(v, asyncio.Future)) diff --git a/hazelcast/internal/asyncio_proxy/map.py b/hazelcast/internal/asyncio_proxy/map.py index a911ef8a38..57d0a755c7 100644 --- a/hazelcast/internal/asyncio_proxy/map.py +++ b/hazelcast/internal/asyncio_proxy/map.py @@ -279,23 +279,32 @@ def handle_event_entry( number_of_affected_entries, ) if event.event_type == EntryEventType.ADDED: - added_func(event) + if added_func: + added_func(event) elif event.event_type == EntryEventType.REMOVED: - removed_func(event) + if removed_func: + removed_func(event) elif event.event_type == EntryEventType.UPDATED: - updated_func(event) + if updated_func: + updated_func(event) elif event.event_type == EntryEventType.EVICTED: - evicted_func(event) + if evicted_func: + evicted_func(event) elif event.event_type == EntryEventType.EVICT_ALL: - evict_all_func(event) + if evict_all_func: + evict_all_func(event) elif event.event_type == EntryEventType.CLEAR_ALL: - clear_all_func(event) + if clear_all_func: + clear_all_func(event) elif event.event_type == EntryEventType.MERGED: - merged_func(event) + if merged_func: + merged_func(event) elif event.event_type == EntryEventType.EXPIRED: - expired_func(event) + if expired_func: + expired_func(event) elif event.event_type == EntryEventType.LOADED: - loaded_func(event) + if loaded_func: + loaded_func(event) return await self._register_listener( request, diff --git a/hazelcast/internal/asyncio_proxy/vector_collection.py b/hazelcast/internal/asyncio_proxy/vector_collection.py index d9fd258915..3e81813f63 100644 --- a/hazelcast/internal/asyncio_proxy/vector_collection.py +++ b/hazelcast/internal/asyncio_proxy/vector_collection.py @@ -410,6 +410,7 @@ def handler(message): value_data = self._to_data(document.value) except SchemaNotReplicatedError as e: return await self._send_schema_and_retry(e, self.put_if_absent, key, document) + document = copy.copy(document) document.value = value_data request = vector_collection_put_if_absent_codec.encode_request( self.name, diff --git a/hazelcast/internal/asyncio_reactor.py b/hazelcast/internal/asyncio_reactor.py index f08e1bde55..38c2060cf2 100644 --- a/hazelcast/internal/asyncio_reactor.py +++ b/hazelcast/internal/asyncio_reactor.py @@ -69,6 +69,7 @@ def __init__( self._preconn_buffers: list = [] self._create_task: asyncio.Task | None = None self._close_task: asyncio.Task | None = None + self._connect_timer_task: asyncio.Task | None = None self._connected = False self._receive_buffer_size = _BUFFER_SIZE self._sock = None @@ -239,7 +240,7 @@ def _set_socket_options(self, sock, config): sock.setsockopt(level, option_name, value) def _create_ssl_context(self, config: Config): - ssl_context = ssl.SSLContext(ssl.PROTOCOL_SSLv23) + ssl_context = ssl.SSLContext(ssl.PROTOCOL_TLS) protocol = config.ssl_protocol # Use only the configured protocol try: @@ -285,10 +286,11 @@ def __init__(self, conn: AsyncioConnection): self.start_time: float | None = None self._write_buf = io.BytesIO() self._write_buf_size = 0 + self._flush_scheduled = False self._recv_buf = None # asyncio tasks are weakly referenced # storing tasks here in order not to lose them midway - # see: https: // docs.python.org / 3 / library / asyncio - task.html # creating-tasks + # see: https: //docs.python.org/3/library/asyncio-task.html # creating-tasks self._tasks: set = set() def connection_made(self, transport: transports.BaseTransport): @@ -296,7 +298,6 @@ def connection_made(self, transport: transports.BaseTransport): self.start_time = time.time() self.write(self.PROTOCOL_STARTER) _logger.debug("Connected to %s", self._conn._address) - self._conn._loop.call_soon(self._write_loop) def connection_lost(self, exc): _logger.warning("Connection closed by server") @@ -313,6 +314,9 @@ def close(self): def write(self, buf): self._write_buf.write(buf) self._write_buf_size += len(buf) + if not self._flush_scheduled: + self._flush_scheduled = True + self._conn._loop.call_soon(self._flush) def get_buffer(self, sizehint): if self._recv_buf is None: @@ -338,9 +342,9 @@ def _do_write(self): self._write_buf.seek(0) self._write_buf_size = 0 - def _write_loop(self): + def _flush(self): + self._flush_scheduled = False self._do_write() - return self._conn._loop.call_later(0.01, self._write_loop) def _strerror(err): diff --git a/hazelcast/proxy/multi_map.py b/hazelcast/proxy/multi_map.py index 703c8e196f..86d6dff2bf 100644 --- a/hazelcast/proxy/multi_map.py +++ b/hazelcast/proxy/multi_map.py @@ -117,12 +117,15 @@ def handle_event_entry( uuid, number_of_affected_entries, ) - if event.event_type == EntryEventType.ADDED and added_func: - added_func(event) - elif event.event_type == EntryEventType.REMOVED and removed_func: - removed_func(event) - elif event.event_type == EntryEventType.CLEAR_ALL and clear_all_func: - clear_all_func(event) + if event.event_type == EntryEventType.ADDED: + if added_func: + added_func(event) + elif event.event_type == EntryEventType.REMOVED: + if removed_func: + removed_func(event) + elif event.event_type == EntryEventType.CLEAR_ALL: + if clear_all_func: + clear_all_func(event) return self._register_listener( request, diff --git a/hazelcast/proxy/replicated_map.py b/hazelcast/proxy/replicated_map.py index 70fe58d771..78e527112a 100644 --- a/hazelcast/proxy/replicated_map.py +++ b/hazelcast/proxy/replicated_map.py @@ -171,16 +171,21 @@ def handle_event_entry( uuid, number_of_affected_entries, ) - if event.event_type == EntryEventType.ADDED and added_func: - added_func(event) - elif event.event_type == EntryEventType.REMOVED and removed_func: - removed_func(event) - elif event.event_type == EntryEventType.UPDATED and updated_func: - updated_func(event) - elif event.event_type == EntryEventType.EVICTED and evicted_func: - evicted_func(event) - elif event.event_type == EntryEventType.CLEAR_ALL and clear_all_func: - clear_all_func(event) + if event.event_type == EntryEventType.ADDED: + if added_func: + added_func(event) + elif event.event_type == EntryEventType.REMOVED: + if removed_func: + removed_func(event) + elif event.event_type == EntryEventType.UPDATED: + if updated_func: + updated_func(event) + elif event.event_type == EntryEventType.EVICTED: + if evicted_func: + evicted_func(event) + elif event.event_type == EntryEventType.CLEAR_ALL: + if clear_all_func: + clear_all_func(event) return self._register_listener( request, diff --git a/hazelcast/reactor.py b/hazelcast/reactor.py index 00455063f2..df591d75af 100644 --- a/hazelcast/reactor.py +++ b/hazelcast/reactor.py @@ -543,7 +543,7 @@ def _set_socket_options(self, config): self.socket.setsockopt(level, option_name, value) def _wrap_as_ssl_socket(self, config: Config, hostname: str): - ssl_context = ssl.SSLContext(ssl.PROTOCOL_SSLv23) + ssl_context = ssl.SSLContext(ssl.PROTOCOL_TLS) protocol = config.ssl_protocol diff --git a/hazelcast/serialization/input.py b/hazelcast/serialization/input.py index cd982cd10d..3d873659a0 100644 --- a/hazelcast/serialization/input.py +++ b/hazelcast/serialization/input.py @@ -147,10 +147,10 @@ def read_string(self): length = self.read_int() if length == NULL_ARRAY_LENGTH: return None - result = bytearray(length) - if length > 0: - self.read_into(result, 0, length) - return result.decode("utf-8") + self._check_available(self._pos, length) + result = self._buffer[self._pos : self._pos + length].decode("utf-8") + self._pos += length + return result def read_byte_array(self): length = self.read_int() @@ -163,28 +163,28 @@ def read_byte_array(self): return result def read_i8_array(self) -> typing.List[int]: - return self._read_array_fnc(self.read_byte) + return self._bulk_read(BYTE_SIZE_IN_BYTES, "b") def read_boolean_array(self): - return self._read_array_fnc(self.read_boolean) + return self._bulk_read(BOOLEAN_SIZE_IN_BYTES, "?") def read_char_array(self): return self._read_array_fnc(self.read_char) def read_int_array(self): - return self._read_array_fnc(self.read_int) + return self._bulk_read(INT_SIZE_IN_BYTES, "i") def read_long_array(self): - return self._read_array_fnc(self.read_long) + return self._bulk_read(LONG_SIZE_IN_BYTES, "q") def read_double_array(self): - return self._read_array_fnc(self.read_double) + return self._bulk_read(DOUBLE_SIZE_IN_BYTES, "d") def read_float_array(self): - return self._read_array_fnc(self.read_float) + return self._bulk_read(FLOAT_SIZE_IN_BYTES, "f") def read_short_array(self): - return self._read_array_fnc(self.read_short) + return self._bulk_read(SHORT_SIZE_IN_BYTES, "h") def read_string_array(self): return self._read_array_fnc(self.read_string) @@ -215,7 +215,18 @@ def read_utf(self): def read_utf_array(self): return self.read_string_array() - # HELPERS + def _bulk_read(self, item_size, fmt_char): + length = self.read_int() + if length == NULL_ARRAY_LENGTH: + return None + + nbytes = length * item_size + self._check_available(self._pos, nbytes) + endian = ">" if self._is_big_endian else "<" + result = list(struct.unpack_from(f"{endian}{length}{fmt_char}", self._buffer, self._pos)) + self._pos += nbytes + return result + def _check_available(self, position, size): if position < 0: raise ValueError diff --git a/hazelcast/serialization/output.py b/hazelcast/serialization/output.py index 445b755a6b..79ee30bb07 100644 --- a/hazelcast/serialization/output.py +++ b/hazelcast/serialization/output.py @@ -1,5 +1,3 @@ -import typing - from hazelcast.serialization.api import * from hazelcast.serialization.bits import * @@ -157,28 +155,28 @@ def write_byte_array(self, val): self._pos += _len def write_signed_byte_array(self, val: typing.List[int]) -> None: - self._write_array_fnc(val, self.write_signed_byte) + self._bulk_write(val, BYTE_SIZE_IN_BYTES, "b") def write_boolean_array(self, val): - self._write_array_fnc(val, self.write_boolean) + self._bulk_write(val, BOOLEAN_SIZE_IN_BYTES, "?") def write_char_array(self, val): self._write_array_fnc(val, self.write_char) def write_int_array(self, val): - self._write_array_fnc(val, self.write_int) + self._bulk_write(val, INT_SIZE_IN_BYTES, "i") def write_long_array(self, val): - self._write_array_fnc(val, self.write_long) + self._bulk_write(val, LONG_SIZE_IN_BYTES, "q") def write_double_array(self, val): - self._write_array_fnc(val, self.write_double) + self._bulk_write(val, DOUBLE_SIZE_IN_BYTES, "d") def write_float_array(self, val): - self._write_array_fnc(val, self.write_float) + self._bulk_write(val, FLOAT_SIZE_IN_BYTES, "f") def write_short_array(self, val): - self._write_array_fnc(val, self.write_short) + self._bulk_write(val, SHORT_SIZE_IN_BYTES, "h") def write_string_array(self, val): self._write_array_fnc(val, self.write_string) @@ -204,8 +202,9 @@ def set_position(self, position): self._pos = position def write_zero_bytes(self, count): - for _ in range(0, count): - self._write(0) + self._ensure_available(count) + self._buffer[self._pos : self._pos + count] = bytes(count) + self._pos += count def write_utf(self, val): self.write_string(val) @@ -213,7 +212,16 @@ def write_utf(self, val): def write_utf_array(self, val): self.write_string_array(val) - # HELPERS + def _bulk_write(self, val, item_size, fmt_char): + length = len(val) if val is not None else NULL_ARRAY_LENGTH + self.write_int(length) + if length > 0: + nbytes = length * item_size + self._ensure_available(nbytes) + endian = ">" if self._is_big_endian else "<" + struct.pack_into(f"{endian}{length}{fmt_char}", self._buffer, self._pos, *val) + self._pos += nbytes + def _write_array_fnc(self, val, item_write_fnc): _len = len(val) if val is not None else NULL_ARRAY_LENGTH self.write_int(_len) diff --git a/tests/integration/asyncio/statistics_test.py b/tests/integration/asyncio/statistics_test.py index 01b22abd0f..f9148d85af 100644 --- a/tests/integration/asyncio/statistics_test.py +++ b/tests/integration/asyncio/statistics_test.py @@ -262,5 +262,5 @@ def get_runtime_and_system_metrics(self, client): try: # Compatibility for <4.2.1 clients return s._get_os_and_runtime_stats() - except: + except Exception: return itertools.chain(s._registered_system_gauges, s._registered_process_gauges)