diff --git a/pymongo/asynchronous/pool.py b/pymongo/asynchronous/pool.py index 065686f43a..5936b5f726 100644 --- a/pymongo/asynchronous/pool.py +++ b/pymongo/asynchronous/pool.py @@ -37,7 +37,7 @@ from bson import DEFAULT_CODEC_OPTIONS from pymongo import _csot, helpers_shared from pymongo.asynchronous.client_session import _validate_session_write_concern -from pymongo.asynchronous.helpers import _backoff, _handle_reauth +from pymongo.asynchronous.helpers import _handle_reauth from pymongo.asynchronous.network import command from pymongo.common import ( MAX_BSON_SIZE, @@ -52,6 +52,7 @@ DocumentTooLarge, ExecutionTimeout, InvalidOperation, + NetworkTimeout, NotPrimaryError, OperationFailure, PyMongoError, @@ -789,8 +790,8 @@ def __init__( # Also used for: clearing the wait queue self._max_connecting_cond = _async_create_condition(self.lock) self._pending = 0 + self._max_connecting = self.opts.max_connecting self._client_id = client_id - self._backoff = 0 if self.enabled_for_cmap: assert self.opts._event_listeners is not None self.opts._event_listeners.publish_pool_created( @@ -846,8 +847,6 @@ async def _reset( async with self.size_cond: if self.closed: return - # Clear the backoff state. - self._backoff = 0 if self.opts.pause_enabled and pause and not self.opts.load_balanced: old_state, self.state = self.state, PoolState.PAUSED self.gen.inc(service_id) @@ -930,11 +929,6 @@ async def _reset( for conn in sockets: await conn.close_conn(ConnectionClosedReason.STALE) - @property - def max_connecting(self) -> int: - """The current max connecting limit for the pool.""" - return 1 if self._backoff else self.opts.max_connecting - async def update_is_writable(self, is_writable: Optional[bool]) -> None: """Updates the is_writable attribute on all sockets currently in the Pool. @@ -1001,7 +995,7 @@ async def remove_stale_sockets(self, reference_generation: int) -> None: async with self._max_connecting_cond: # If maxConnecting connections are already being created # by this pool then try again later instead of waiting. - if self._pending >= self.max_connecting: + if self._pending >= self._max_connecting: return self._pending += 1 incremented = True @@ -1029,29 +1023,14 @@ async def remove_stale_sockets(self, reference_generation: int) -> None: self.requests -= 1 self.size_cond.notify() - def _handle_connection_error(self, error: BaseException, phase: str, conn_id: int) -> None: + def _handle_connection_error(self, error: BaseException) -> None: # Handle system overload condition for non-sdam pools. - # Look for an AutoReconnect error raised from a ConnectionResetError with - # errno == errno.ECONNRESET or raised from an OSError that we've created due to - # a closed connection. - # If found, set backoff and add error labels. - if self.is_sdam or type(error) != AutoReconnect: + # Look for errors of type AutoReconnect and add error labels if appropriate. + if self.is_sdam or type(error) not in (AutoReconnect, NetworkTimeout): return - self._backoff += 1 + assert isinstance(error, AutoReconnect) # Appease type checker. error._add_error_label("SystemOverloadedError") error._add_error_label("RetryableError") - # Log the pool backoff message. 
- if self.enabled_for_logging and _CONNECTION_LOGGER.isEnabledFor(logging.DEBUG): - _debug_log( - _CONNECTION_LOGGER, - message=_ConnectionStatusMessage.POOL_BACKOFF, - clientId=self._client_id, - serverHost=self.address[0], - serverPort=self.address[1], - driverConnectionId=conn_id, - reason=_verbose_connection_error_reason(ConnectionClosedReason.POOL_BACKOFF), - error=ConnectionClosedReason.POOL_BACKOFF, - ) async def connect(self, handler: Optional[_MongoClientErrorHandler] = None) -> AsyncConnection: """Connect to Mongo and return a new AsyncConnection. @@ -1082,17 +1061,8 @@ async def connect(self, handler: Optional[_MongoClientErrorHandler] = None) -> A driverConnectionId=conn_id, ) - # Apply backoff if applicable. - if self._backoff: - await asyncio.sleep(_backoff(self._backoff)) - - # Pass a context to determine if we successfully create a configured socket. - context = dict(has_created_socket=False) - try: - networking_interface = await _configured_protocol_interface( - self.address, self.opts, context=context - ) + networking_interface = await _configured_protocol_interface(self.address, self.opts) # Catch KeyboardInterrupt, CancelledError, etc. and cleanup. except BaseException as error: async with self.lock: @@ -1113,8 +1083,7 @@ async def connect(self, handler: Optional[_MongoClientErrorHandler] = None) -> A reason=_verbose_connection_error_reason(ConnectionClosedReason.ERROR), error=ConnectionClosedReason.ERROR, ) - if context["has_created_socket"]: - self._handle_connection_error(error, "handshake", conn_id) + self._handle_connection_error(error) if isinstance(error, (IOError, OSError, *SSLErrors)): details = _get_timeout_details(self.opts) _raise_connection_failure(self.address, error, timeout_details=details) @@ -1126,9 +1095,11 @@ async def connect(self, handler: Optional[_MongoClientErrorHandler] = None) -> A self.active_contexts.discard(tmp_context) if tmp_context.cancelled: conn.cancel_context.cancel() + completed_hello = False try: if not self.is_sdam: await conn.hello() + completed_hello = True self.is_writable = conn.is_writable if handler: handler.contribute_socket(conn, completed_handshake=False) @@ -1138,15 +1109,14 @@ async def connect(self, handler: Optional[_MongoClientErrorHandler] = None) -> A except BaseException as e: async with self.lock: self.active_contexts.discard(conn.cancel_context) - self._handle_connection_error(e, "hello", conn_id) + if not completed_hello: + self._handle_connection_error(e) await conn.close_conn(ConnectionClosedReason.ERROR) raise if handler: await handler.client._topology.receive_cluster_time(conn._cluster_time) - # Clear the backoff state. - self._backoff = 0 return conn @contextlib.asynccontextmanager @@ -1323,12 +1293,12 @@ async def _get_conn( # to be checked back into the pool. async with self._max_connecting_cond: self._raise_if_not_ready(checkout_started_time, emit_event=False) - while not (self.conns or self._pending < self.max_connecting): + while not (self.conns or self._pending < self._max_connecting): timeout = deadline - time.monotonic() if deadline else None if not await _async_cond_wait(self._max_connecting_cond, timeout): # Timed out, notify the next thread to ensure a # timeout doesn't consume the condition. 
- if self.conns or self._pending < self.max_connecting: + if self.conns or self._pending < self._max_connecting: self._max_connecting_cond.notify() emitted_event = True self._raise_wait_queue_timeout(checkout_started_time) @@ -1469,7 +1439,7 @@ async def _perished(self, conn: AsyncConnection) -> bool: :class:`~pymongo.errors.AutoReconnect` exceptions on server hiccups, etc. We only check if the socket was closed by an external error if it has been > 1 second since the socket was checked into the - pool, or we are in backoff mode, to keep performance reasonable - + pool to keep performance reasonable - we can't avoid AutoReconnects completely anyway. """ idle_time_seconds = conn.idle_time_seconds() @@ -1482,8 +1452,6 @@ async def _perished(self, conn: AsyncConnection) -> bool: return True check_interval_seconds = self._check_interval_seconds - if self._backoff: - check_interval_seconds = 0 if check_interval_seconds is not None and ( check_interval_seconds == 0 or idle_time_seconds > check_interval_seconds ): diff --git a/pymongo/asynchronous/topology.py b/pymongo/asynchronous/topology.py index 1e91bbe79b..a2b354f7cc 100644 --- a/pymongo/asynchronous/topology.py +++ b/pymongo/asynchronous/topology.py @@ -890,8 +890,8 @@ async def _handle_error(self, address: _Address, err_ctx: _ErrorContext) -> None # Clear the pool. await server.reset(service_id) elif isinstance(error, ConnectionFailure): - if isinstance(error, WaitQueueTimeoutError) or error.has_error_label( - "SystemOverloadedError" + if isinstance(error, WaitQueueTimeoutError) or ( + error.has_error_label("SystemOverloadedError") ): return # "Client MUST replace the server's description with type Unknown diff --git a/pymongo/network_layer.py b/pymongo/network_layer.py index 9bf46cbc3d..605b8dde9b 100644 --- a/pymongo/network_layer.py +++ b/pymongo/network_layer.py @@ -256,7 +256,6 @@ def __init__(self, timeout: Optional[float] = None): self._timeout = timeout self._closed = asyncio.get_running_loop().create_future() self._connection_lost = False - self._closing_exception = None def settimeout(self, timeout: float | None) -> None: self._timeout = timeout @@ -270,11 +269,9 @@ def close(self, exc: Optional[Exception] = None) -> None: self.transport.abort() self._resolve_pending(exc) self._connection_lost = True - self._closing_exception = exc # type:ignore[assignment] def connection_lost(self, exc: Optional[Exception] = None) -> None: self._resolve_pending(exc) - self._closing_exception = exc # type:ignore[assignment] if not self._closed.done(): self._closed.set_result(None) @@ -338,11 +335,8 @@ async def read(self, request_id: Optional[int], max_message_size: int) -> tuple[ if self._done_messages: message = await self._done_messages.popleft() else: - if self._closed.done(): - if self._closing_exception: - raise self._closing_exception - else: - raise OSError("connection closed") + if self.transport and self.transport.is_closing(): + raise OSError("connection is already closed") read_waiter = asyncio.get_running_loop().create_future() self._pending_messages.append(read_waiter) try: @@ -480,7 +474,6 @@ def _resolve_pending(self, exc: Optional[Exception] = None) -> None: else: msg.set_exception(exc) self._done_messages.append(msg) - self._pending_messages.clear() class PyMongoKMSProtocol(PyMongoBaseProtocol): diff --git a/pymongo/pool_shared.py b/pymongo/pool_shared.py index c555b125df..0536dc3835 100644 --- a/pymongo/pool_shared.py +++ b/pymongo/pool_shared.py @@ -250,7 +250,6 @@ async def _configured_protocol_interface( address: _Address, 
options: PoolOptions, protocol_kls: type[PyMongoBaseProtocol] = PyMongoProtocol, - context: dict[str, bool] | None = None, ) -> AsyncNetworkingInterface: """Given (host, port) and PoolOptions, return a configured AsyncNetworkingInterface. @@ -262,10 +261,6 @@ async def _configured_protocol_interface( ssl_context = options._ssl_context timeout = options.socket_timeout - # Signal that we have created the socket successfully. - if context: - context["has_created_socket"] = True - if ssl_context is None: return AsyncNetworkingInterface( await asyncio.get_running_loop().create_connection( @@ -379,7 +374,7 @@ def _create_connection(address: _Address, options: PoolOptions) -> socket.socket def _configured_socket_interface( - address: _Address, options: PoolOptions, *args: Any, context: dict[str, bool] | None = None + address: _Address, options: PoolOptions, *args: Any ) -> NetworkingInterface: """Given (host, port) and PoolOptions, return a NetworkingInterface wrapping a configured socket. @@ -390,10 +385,6 @@ def _configured_socket_interface( sock = _create_connection(address, options) ssl_context = options._ssl_context - # Signal that we have created the socket successfully. - if context: - context["has_created_socket"] = True - if ssl_context is None: sock.settimeout(options.socket_timeout) return NetworkingInterface(sock) diff --git a/pymongo/synchronous/pool.py b/pymongo/synchronous/pool.py index d0c517f186..88b09c9b46 100644 --- a/pymongo/synchronous/pool.py +++ b/pymongo/synchronous/pool.py @@ -49,6 +49,7 @@ DocumentTooLarge, ExecutionTimeout, InvalidOperation, + NetworkTimeout, NotPrimaryError, OperationFailure, PyMongoError, @@ -84,7 +85,7 @@ from pymongo.server_type import SERVER_TYPE from pymongo.socket_checker import SocketChecker from pymongo.synchronous.client_session import _validate_session_write_concern -from pymongo.synchronous.helpers import _backoff, _handle_reauth +from pymongo.synchronous.helpers import _handle_reauth from pymongo.synchronous.network import command if TYPE_CHECKING: @@ -787,8 +788,8 @@ def __init__( # Also used for: clearing the wait queue self._max_connecting_cond = _create_condition(self.lock) self._pending = 0 + self._max_connecting = self.opts.max_connecting self._client_id = client_id - self._backoff = 0 if self.enabled_for_cmap: assert self.opts._event_listeners is not None self.opts._event_listeners.publish_pool_created( @@ -844,8 +845,6 @@ def _reset( with self.size_cond: if self.closed: return - # Clear the backoff state. - self._backoff = 0 if self.opts.pause_enabled and pause and not self.opts.load_balanced: old_state, self.state = self.state, PoolState.PAUSED self.gen.inc(service_id) @@ -928,11 +927,6 @@ def _reset( for conn in sockets: conn.close_conn(ConnectionClosedReason.STALE) - @property - def max_connecting(self) -> int: - """The current max connecting limit for the pool.""" - return 1 if self._backoff else self.opts.max_connecting - def update_is_writable(self, is_writable: Optional[bool]) -> None: """Updates the is_writable attribute on all sockets currently in the Pool. @@ -997,7 +991,7 @@ def remove_stale_sockets(self, reference_generation: int) -> None: with self._max_connecting_cond: # If maxConnecting connections are already being created # by this pool then try again later instead of waiting. 
- if self._pending >= self.max_connecting: + if self._pending >= self._max_connecting: return self._pending += 1 incremented = True @@ -1025,29 +1019,14 @@ def remove_stale_sockets(self, reference_generation: int) -> None: self.requests -= 1 self.size_cond.notify() - def _handle_connection_error(self, error: BaseException, phase: str, conn_id: int) -> None: + def _handle_connection_error(self, error: BaseException) -> None: # Handle system overload condition for non-sdam pools. - # Look for an AutoReconnect error raised from a ConnectionResetError with - # errno == errno.ECONNRESET or raised from an OSError that we've created due to - # a closed connection. - # If found, set backoff and add error labels. - if self.is_sdam or type(error) != AutoReconnect: + # Look for errors of type AutoReconnect and add error labels if appropriate. + if self.is_sdam or type(error) not in (AutoReconnect, NetworkTimeout): return - self._backoff += 1 + assert isinstance(error, AutoReconnect) # Appease type checker. error._add_error_label("SystemOverloadedError") error._add_error_label("RetryableError") - # Log the pool backoff message. - if self.enabled_for_logging and _CONNECTION_LOGGER.isEnabledFor(logging.DEBUG): - _debug_log( - _CONNECTION_LOGGER, - message=_ConnectionStatusMessage.POOL_BACKOFF, - clientId=self._client_id, - serverHost=self.address[0], - serverPort=self.address[1], - driverConnectionId=conn_id, - reason=_verbose_connection_error_reason(ConnectionClosedReason.POOL_BACKOFF), - error=ConnectionClosedReason.POOL_BACKOFF, - ) def connect(self, handler: Optional[_MongoClientErrorHandler] = None) -> Connection: """Connect to Mongo and return a new Connection. @@ -1078,17 +1057,8 @@ def connect(self, handler: Optional[_MongoClientErrorHandler] = None) -> Connect driverConnectionId=conn_id, ) - # Apply backoff if applicable. - if self._backoff: - time.sleep(_backoff(self._backoff)) - - # Pass a context to determine if we successfully create a configured socket. - context = dict(has_created_socket=False) - try: - networking_interface = _configured_socket_interface( - self.address, self.opts, context=context - ) + networking_interface = _configured_socket_interface(self.address, self.opts) # Catch KeyboardInterrupt, CancelledError, etc. and cleanup. 
except BaseException as error: with self.lock: @@ -1109,8 +1079,7 @@ def connect(self, handler: Optional[_MongoClientErrorHandler] = None) -> Connect reason=_verbose_connection_error_reason(ConnectionClosedReason.ERROR), error=ConnectionClosedReason.ERROR, ) - if context["has_created_socket"]: - self._handle_connection_error(error, "handshake", conn_id) + self._handle_connection_error(error) if isinstance(error, (IOError, OSError, *SSLErrors)): details = _get_timeout_details(self.opts) _raise_connection_failure(self.address, error, timeout_details=details) @@ -1122,9 +1091,11 @@ def connect(self, handler: Optional[_MongoClientErrorHandler] = None) -> Connect self.active_contexts.discard(tmp_context) if tmp_context.cancelled: conn.cancel_context.cancel() + completed_hello = False try: if not self.is_sdam: conn.hello() + completed_hello = True self.is_writable = conn.is_writable if handler: handler.contribute_socket(conn, completed_handshake=False) @@ -1134,15 +1105,14 @@ def connect(self, handler: Optional[_MongoClientErrorHandler] = None) -> Connect except BaseException as e: with self.lock: self.active_contexts.discard(conn.cancel_context) - self._handle_connection_error(e, "hello", conn_id) + if not completed_hello: + self._handle_connection_error(e) conn.close_conn(ConnectionClosedReason.ERROR) raise if handler: handler.client._topology.receive_cluster_time(conn._cluster_time) - # Clear the backoff state. - self._backoff = 0 return conn @contextlib.contextmanager @@ -1319,12 +1289,12 @@ def _get_conn( # to be checked back into the pool. with self._max_connecting_cond: self._raise_if_not_ready(checkout_started_time, emit_event=False) - while not (self.conns or self._pending < self.max_connecting): + while not (self.conns or self._pending < self._max_connecting): timeout = deadline - time.monotonic() if deadline else None if not _cond_wait(self._max_connecting_cond, timeout): # Timed out, notify the next thread to ensure a # timeout doesn't consume the condition. - if self.conns or self._pending < self.max_connecting: + if self.conns or self._pending < self._max_connecting: self._max_connecting_cond.notify() emitted_event = True self._raise_wait_queue_timeout(checkout_started_time) @@ -1465,7 +1435,7 @@ def _perished(self, conn: Connection) -> bool: :class:`~pymongo.errors.AutoReconnect` exceptions on server hiccups, etc. We only check if the socket was closed by an external error if it has been > 1 second since the socket was checked into the - pool, or we are in backoff mode, to keep performance reasonable - + pool to keep performance reasonable - we can't avoid AutoReconnects completely anyway. """ idle_time_seconds = conn.idle_time_seconds() @@ -1478,8 +1448,6 @@ def _perished(self, conn: Connection) -> bool: return True check_interval_seconds = self._check_interval_seconds - if self._backoff: - check_interval_seconds = 0 if check_interval_seconds is not None and ( check_interval_seconds == 0 or idle_time_seconds > check_interval_seconds ): diff --git a/pymongo/synchronous/topology.py b/pymongo/synchronous/topology.py index 0f6592dfc0..e967c2089f 100644 --- a/pymongo/synchronous/topology.py +++ b/pymongo/synchronous/topology.py @@ -888,8 +888,8 @@ def _handle_error(self, address: _Address, err_ctx: _ErrorContext) -> None: # Clear the pool. 
server.reset(service_id) elif isinstance(error, ConnectionFailure): - if isinstance(error, WaitQueueTimeoutError) or error.has_error_label( - "SystemOverloadedError" + if isinstance(error, WaitQueueTimeoutError) or ( + error.has_error_label("SystemOverloadedError") ): return # "Client MUST replace the server's description with type Unknown diff --git a/test/asynchronous/test_discovery_and_monitoring.py b/test/asynchronous/test_discovery_and_monitoring.py index 2798afe7df..53c63ee570 100644 --- a/test/asynchronous/test_discovery_and_monitoring.py +++ b/test/asynchronous/test_discovery_and_monitoring.py @@ -25,8 +25,10 @@ from pathlib import Path from test.asynchronous.helpers import ConcurrentRunner from test.asynchronous.utils import flaky +from test.utils_shared import delay from pymongo.asynchronous.pool import AsyncConnection +from pymongo.errors import ConnectionFailure from pymongo.operations import _Op from pymongo.server_selectors import writable_server_selector @@ -70,7 +72,12 @@ ) from pymongo.hello import Hello, HelloCompat from pymongo.helpers_shared import _check_command_response, _check_write_command_response -from pymongo.monitoring import ServerHeartbeatFailedEvent, ServerHeartbeatStartedEvent +from pymongo.monitoring import ( + ConnectionCheckOutFailedEvent, + PoolClearedEvent, + ServerHeartbeatFailedEvent, + ServerHeartbeatStartedEvent, +) from pymongo.server_description import SERVER_TYPE, ServerDescription from pymongo.topology_description import TOPOLOGY_TYPE @@ -446,6 +453,71 @@ async def mock_close(self, reason): AsyncConnection.close_conn = original_close +class TestPoolBackpressure(AsyncIntegrationTest): + @async_client_context.require_version_min(7, 0, 0) + async def test_connection_pool_is_not_cleared(self): + listener = CMAPListener() + + # Create a client that listens to CMAP events, with maxConnecting=100. + client = await self.async_rs_or_single_client(maxConnecting=100, event_listeners=[listener]) + + # Use an admin client for test setup and teardown to enable and disable the ingress rate limiter. + admin_client = self.client + await admin_client.admin.command( + "setParameter", 1, ingressConnectionEstablishmentRateLimiterEnabled=True + ) + await admin_client.admin.command( + "setParameter", 1, ingressConnectionEstablishmentRatePerSec=20 + ) + await admin_client.admin.command( + "setParameter", 1, ingressConnectionEstablishmentBurstCapacitySecs=1 + ) + await admin_client.admin.command( + "setParameter", 1, ingressConnectionEstablishmentMaxQueueDepth=1 + ) + + # Disable the ingress rate limiter on teardown. + async def teardown(): + await admin_client.admin.command( + "setParameter", 1, ingressConnectionEstablishmentRateLimiterEnabled=False + ) + + self.addAsyncCleanup(teardown) + + # Make sure the collection has at least one document. + await client.test.test.delete_many({}) + await client.test.test.insert_one({}) + + # Run a slow operation to tie up the connection. + async def target(): + try: + await client.test.test.find_one({"$where": delay(0.1)}) + except ConnectionFailure: + pass + + # Warm the pool with 10 tasks so there are existing connections. + tasks = [] + for _ in range(10): + tasks.append(ConcurrentRunner(target=target)) + for t in tasks: + await t.start() + for t in tasks: + await t.join() + + # Run 100 parallel operations that contend for connections. 
+ tasks = [] + for _ in range(100): + tasks.append(ConcurrentRunner(target=target)) + for t in tasks: + await t.start() + for t in tasks: + await t.join() + + # Verify there were at least 10 connection checkout failed event but no pool cleared events. + self.assertGreater(len(listener.events_by_type(ConnectionCheckOutFailedEvent)), 10) + self.assertEqual(len(listener.events_by_type(PoolClearedEvent)), 0) + + class TestServerMonitoringMode(AsyncIntegrationTest): @async_client_context.require_no_load_balancer async def asyncSetUp(self): diff --git a/test/asynchronous/test_pooling.py b/test/asynchronous/test_pooling.py index 6cbdf7a65c..2f0d5fc962 100644 --- a/test/asynchronous/test_pooling.py +++ b/test/asynchronous/test_pooling.py @@ -514,35 +514,10 @@ async def test_connection_timeout_message(self): str(error.exception), ) - async def test_pool_check_backoff(self): - # Test that Pool recovers from two connection failures in a row. - # This exercises code at the end of Pool._check(). - cx_pool = await self.create_pool(max_pool_size=1, connect_timeout=1, wait_queue_timeout=1) - self.addAsyncCleanup(cx_pool.close) - - async with cx_pool.checkout() as conn: - # Simulate a closed socket without telling the Connection it's - # closed. - await conn.conn.close() - - # Enable backoff. - cx_pool._backoff = 1 - - # Swap pool's address with a bad one. - address, cx_pool.address = cx_pool.address, ("foo.com", 1234) - with self.assertRaises(AutoReconnect): - async with cx_pool.checkout(): - pass - - # Back to normal, semaphore was correctly released. - cx_pool.address = address - async with cx_pool.checkout(): - pass - @async_client_context.require_failCommand_appName - async def test_pool_backoff_preserves_existing_connections(self): + async def test_pool_backpressure_preserves_existing_connections(self): client = await self.async_rs_or_single_client() - coll = self.db.t + coll = client.pymongo_test.t pool = await async_get_pool(client) await coll.insert_many([{"x": 1} for _ in range(10)]) t = SocketGetter(self.c, pool) @@ -564,9 +539,6 @@ async def test_pool_backoff_preserves_existing_connections(self): async with self.fail_point(mock_connection_fail): await coll.find_one({}) - # Make sure the pool is out of backoff state. - assert pool._backoff == 0 - # Make sure the existing socket was not affected. 
assert not t.sock.conn_closed() @@ -575,16 +547,6 @@ async def test_pool_backoff_preserves_existing_connections(self): await t.join() await pool.close() - async def test_pool_backoff_limits_maxConnecting(self): - client = await self.async_rs_or_single_client(maxConnecting=10) - pool = await async_get_pool(client) - assert pool.max_connecting == 10 - pool._backoff = 1 - assert pool.max_connecting == 1 - pool._backoff = 0 - assert pool.max_connecting == 10 - await client.close() - class TestPoolMaxSize(_TestPoolingBase): async def test_max_pool_size(self): diff --git a/test/connection_logging/connection-logging.json b/test/connection_logging/connection-logging.json index 60190c7dc0..5799e834d7 100644 --- a/test/connection_logging/connection-logging.json +++ b/test/connection_logging/connection-logging.json @@ -331,9 +331,7 @@ "uriOptions": { "retryReads": false, "appname": "clientAppName", - "heartbeatFrequencyMS": 10000, - "socketTimeoutMS": 500, - "connectTimeoutMS": 500 + "heartbeatFrequencyMS": 10000 }, "observeLogMessages": { "connection": "debug" @@ -357,9 +355,7 @@ "failCommands": [ "saslContinue" ], - "closeConnection": false, - "blockConnection": true, - "blockTimeMS": 1000, + "closeConnection": true, "appName": "clientAppName" } } diff --git a/test/connection_monitoring/pool-create-min-size-error.json b/test/connection_monitoring/pool-create-min-size-error.json index 8ec958780d..4334ce2571 100644 --- a/test/connection_monitoring/pool-create-min-size-error.json +++ b/test/connection_monitoring/pool-create-min-size-error.json @@ -15,17 +15,13 @@ "isMaster", "hello" ], - "closeConnection": false, - "blockConnection": true, - "blockTimeMS": 1000, + "errorCode": 91, "appName": "poolCreateMinSizeErrorTest" } }, "poolOptions": { "minPoolSize": 1, "backgroundThreadIntervalMS": 50, - "socketTimeoutMS": 500, - "connectTimeoutMS": 500, "appName": "poolCreateMinSizeErrorTest" }, "operations": [ diff --git a/test/discovery_and_monitoring/unified/auth-network-error.json b/test/discovery_and_monitoring/unified/auth-network-error.json index 656b291366..84763af32e 100644 --- a/test/discovery_and_monitoring/unified/auth-network-error.json +++ b/test/discovery_and_monitoring/unified/auth-network-error.json @@ -53,9 +53,7 @@ "failCommands": [ "saslContinue" ], - "closeConnection": false, - "blockConnection": true, - "blockTimeMS": 1000, + "closeConnection": true, "appName": "authNetworkErrorTest" } } @@ -77,8 +75,6 @@ ], "uriOptions": { "retryWrites": false, - "socketTimeoutMS": 500, - "connectTimeoutMS": 500, "appname": "authNetworkErrorTest" } } diff --git a/test/discovery_and_monitoring/unified/backpressure-network-error-fail.json b/test/discovery_and_monitoring/unified/backpressure-network-error-fail.json new file mode 100644 index 0000000000..a806df571a --- /dev/null +++ b/test/discovery_and_monitoring/unified/backpressure-network-error-fail.json @@ -0,0 +1,171 @@ +{ + "description": "backpressure-network-error-fail", + "schemaVersion": "1.17", + "runOnRequirements": [ + { + "minServerVersion": "4.4", + "serverless": "forbid", + "topologies": [ + "single", + "replicaset", + "sharded" + ] + } + ], + "createEntities": [ + { + "client": { + "id": "setupClient", + "useMultipleMongoses": false + } + } + ], + "initialData": [ + { + "collectionName": "backpressure-network-error-fail", + "databaseName": "sdam-tests", + "documents": [ + { + "_id": 1 + }, + { + "_id": 2 + } + ] + } + ], + "tests": [ + { + "description": "Apply backpressure on network connection errors during connection establishment", + 
"operations": [ + { + "name": "createEntities", + "object": "testRunner", + "arguments": { + "entities": [ + { + "client": { + "id": "client", + "useMultipleMongoses": false, + "observeEvents": [ + "serverHeartbeatSucceededEvent", + "serverDescriptionChangedEvent", + "poolReadyEvent", + "poolClearedEvent" + ], + "uriOptions": { + "retryWrites": false, + "heartbeatFrequencyMS": 10000, + "serverMonitoringMode": "poll", + "directConnection": true, + "appname": "backpressureNetworkErrorFailTest" + } + } + }, + { + "database": { + "id": "database", + "client": "client", + "databaseName": "sdam-tests" + } + }, + { + "collection": { + "id": "collection", + "database": "database", + "collectionName": "backpressure-network-error-fail" + } + } + ] + } + }, + { + "name": "waitForEvent", + "object": "testRunner", + "arguments": { + "client": "client", + "event": { + "serverHeartbeatSucceededEvent": {} + }, + "count": 1 + } + }, + { + "name": "waitForEvent", + "object": "testRunner", + "arguments": { + "client": "client", + "event": { + "serverDescriptionChangedEvent": {} + }, + "count": 1 + } + }, + { + "name": "failPoint", + "object": "testRunner", + "arguments": { + "client": "setupClient", + "failPoint": { + "configureFailPoint": "failCommand", + "mode": "alwaysOn", + "data": { + "failCommands": [ + "isMaster", + "hello" + ], + "appName": "backpressureNetworkErrorFailTest", + "closeConnection": true + } + } + } + }, + { + "name": "insertMany", + "object": "collection", + "arguments": { + "documents": [ + { + "_id": 3 + }, + { + "_id": 4 + } + ] + }, + "expectError": { + "isError": true, + "errorLabelsContain": [ + "SystemOverloadedError", + "RetryableError" + ] + } + } + ], + "expectEvents": [ + { + "client": "client", + "eventType": "cmap", + "events": [ + { + "poolReadyEvent": {} + } + ] + }, + { + "client": "client", + "eventType": "sdam", + "ignoreExtraEvents": false, + "events": [ + { + "serverHeartbeatSucceededEvent": {} + }, + { + "serverDescriptionChangedEvent": {} + } + ] + } + ] + } + ] +} diff --git a/test/discovery_and_monitoring/unified/backpressure-network-timeout-fail.json b/test/discovery_and_monitoring/unified/backpressure-network-timeout-fail.json new file mode 100644 index 0000000000..0222493f7f --- /dev/null +++ b/test/discovery_and_monitoring/unified/backpressure-network-timeout-fail.json @@ -0,0 +1,174 @@ +{ + "description": "backpressure-network-timeout-error", + "schemaVersion": "1.17", + "runOnRequirements": [ + { + "minServerVersion": "4.4", + "serverless": "forbid", + "topologies": [ + "single", + "replicaset", + "sharded" + ] + } + ], + "createEntities": [ + { + "client": { + "id": "setupClient", + "useMultipleMongoses": false + } + } + ], + "initialData": [ + { + "collectionName": "backpressure-network-timeout-error", + "databaseName": "sdam-tests", + "documents": [ + { + "_id": 1 + }, + { + "_id": 2 + } + ] + } + ], + "tests": [ + { + "description": "Apply backpressure on network timeout error during connection establishment", + "operations": [ + { + "name": "createEntities", + "object": "testRunner", + "arguments": { + "entities": [ + { + "client": { + "id": "client", + "useMultipleMongoses": false, + "observeEvents": [ + "serverHeartbeatSucceededEvent", + "serverDescriptionChangedEvent", + "poolReadyEvent", + "poolClearedEvent" + ], + "uriOptions": { + "retryWrites": false, + "heartbeatFrequencyMS": 10000, + "appname": "backpressureNetworkTimeoutErrorTest", + "serverMonitoringMode": "poll", + "directConnection": true, + "connectTimeoutMS": 250, + "socketTimeoutMS": 
250 + } + } + }, + { + "database": { + "id": "database", + "client": "client", + "databaseName": "sdam-tests" + } + }, + { + "collection": { + "id": "collection", + "database": "database", + "collectionName": "backpressure-network-timeout-error" + } + } + ] + } + }, + { + "name": "waitForEvent", + "object": "testRunner", + "arguments": { + "client": "client", + "event": { + "serverHeartbeatSucceededEvent": {} + }, + "count": 1 + } + }, + { + "name": "waitForEvent", + "object": "testRunner", + "arguments": { + "client": "client", + "event": { + "serverDescriptionChangedEvent": {} + }, + "count": 1 + } + }, + { + "name": "failPoint", + "object": "testRunner", + "arguments": { + "client": "setupClient", + "failPoint": { + "configureFailPoint": "failCommand", + "mode": "alwaysOn", + "data": { + "failCommands": [ + "isMaster", + "hello" + ], + "blockConnection": true, + "blockTimeMS": 500, + "appName": "backpressureNetworkTimeoutErrorTest" + } + } + } + }, + { + "name": "insertMany", + "object": "collection", + "arguments": { + "documents": [ + { + "_id": 3 + }, + { + "_id": 4 + } + ] + }, + "expectError": { + "isError": true, + "errorLabelsContain": [ + "SystemOverloadedError", + "RetryableError" + ] + } + } + ], + "expectEvents": [ + { + "client": "client", + "eventType": "cmap", + "events": [ + { + "poolReadyEvent": {} + } + ] + }, + { + "client": "client", + "eventType": "sdam", + "ignoreExtraEvents": false, + "events": [ + { + "serverHeartbeatSucceededEvent": {} + }, + { + "serverDescriptionChangedEvent": {} + } + ] + } + ] + } + ] +} diff --git a/test/load_balancer/sdam-error-handling.json b/test/load_balancer/sdam-error-handling.json index b9842b8017..8974c4779c 100644 --- a/test/load_balancer/sdam-error-handling.json +++ b/test/load_balancer/sdam-error-handling.json @@ -32,9 +32,9 @@ "useMultipleMongoses": false, "uriOptions": { "appname": "lbSDAMErrorTestClient", - "socketTimeoutMS": 500, - "connectTimeoutMS": 500, - "retryWrites": false + "retryWrites": false, + "socketTimeoutMS": 500, + "connectTimeoutMS": 500 }, "observeEvents": [ "connectionCreatedEvent", @@ -66,9 +66,7 @@ "id": "multiClient", "useMultipleMongoses": true, "uriOptions": { - "retryWrites": false, - "socketTimeoutMS": 500, - "connectTimeoutMS": 500 + "retryWrites": false }, "observeEvents": [ "connectionCreatedEvent", @@ -350,8 +348,7 @@ "failCommands": [ "saslContinue" ], - "blockConnection": true, - "blockTimeMS": 1000, + "closeConnection": true, "appName": "lbSDAMErrorTestClient" } } @@ -412,8 +409,7 @@ "failCommands": [ "getMore" ], - "closeConnection": true, - "appName": "lbSDAMErrorTestClient" + "closeConnection": true } } } diff --git a/test/test_discovery_and_monitoring.py b/test/test_discovery_and_monitoring.py index 4f8ee30d16..d5db199e9a 100644 --- a/test/test_discovery_and_monitoring.py +++ b/test/test_discovery_and_monitoring.py @@ -25,7 +25,9 @@ from pathlib import Path from test.helpers import ConcurrentRunner from test.utils import flaky +from test.utils_shared import delay +from pymongo.errors import ConnectionFailure from pymongo.operations import _Op from pymongo.server_selectors import writable_server_selector from pymongo.synchronous.pool import Connection @@ -67,7 +69,12 @@ ) from pymongo.hello import Hello, HelloCompat from pymongo.helpers_shared import _check_command_response, _check_write_command_response -from pymongo.monitoring import ServerHeartbeatFailedEvent, ServerHeartbeatStartedEvent +from pymongo.monitoring import ( + ConnectionCheckOutFailedEvent, + PoolClearedEvent, + 
ServerHeartbeatFailedEvent, + ServerHeartbeatStartedEvent, +) from pymongo.server_description import SERVER_TYPE, ServerDescription from pymongo.synchronous.settings import TopologySettings from pymongo.synchronous.topology import Topology, _ErrorContext @@ -444,6 +451,67 @@ def mock_close(self, reason): Connection.close_conn = original_close +class TestPoolBackpressure(IntegrationTest): + @client_context.require_version_min(7, 0, 0) + def test_connection_pool_is_not_cleared(self): + listener = CMAPListener() + + # Create a client that listens to CMAP events, with maxConnecting=100. + client = self.rs_or_single_client(maxConnecting=100, event_listeners=[listener]) + + # Use an admin client for test setup and teardown to enable and disable the ingress rate limiter. + admin_client = self.client + admin_client.admin.command( + "setParameter", 1, ingressConnectionEstablishmentRateLimiterEnabled=True + ) + admin_client.admin.command("setParameter", 1, ingressConnectionEstablishmentRatePerSec=20) + admin_client.admin.command( + "setParameter", 1, ingressConnectionEstablishmentBurstCapacitySecs=1 + ) + admin_client.admin.command("setParameter", 1, ingressConnectionEstablishmentMaxQueueDepth=1) + + # Disable the ingress rate limiter on teardown. + def teardown(): + admin_client.admin.command( + "setParameter", 1, ingressConnectionEstablishmentRateLimiterEnabled=False + ) + + self.addCleanup(teardown) + + # Make sure the collection has at least one document. + client.test.test.delete_many({}) + client.test.test.insert_one({}) + + # Run a slow operation to tie up the connection. + def target(): + try: + client.test.test.find_one({"$where": delay(0.1)}) + except ConnectionFailure: + pass + + # Warm the pool with 10 tasks so there are existing connections. + tasks = [] + for _ in range(10): + tasks.append(ConcurrentRunner(target=target)) + for t in tasks: + t.start() + for t in tasks: + t.join() + + # Run 100 parallel operations that contend for connections. + tasks = [] + for _ in range(100): + tasks.append(ConcurrentRunner(target=target)) + for t in tasks: + t.start() + for t in tasks: + t.join() + + # Verify there were at least 10 connection checkout failed event but no pool cleared events. + self.assertGreater(len(listener.events_by_type(ConnectionCheckOutFailedEvent)), 10) + self.assertEqual(len(listener.events_by_type(PoolClearedEvent)), 0) + + class TestServerMonitoringMode(IntegrationTest): @client_context.require_no_load_balancer def setUp(self): diff --git a/test/test_pooling.py b/test/test_pooling.py index f3bfcf4ba2..0f7ef144f6 100644 --- a/test/test_pooling.py +++ b/test/test_pooling.py @@ -512,35 +512,10 @@ def test_connection_timeout_message(self): str(error.exception), ) - def test_pool_check_backoff(self): - # Test that Pool recovers from two connection failures in a row. - # This exercises code at the end of Pool._check(). - cx_pool = self.create_pool(max_pool_size=1, connect_timeout=1, wait_queue_timeout=1) - self.addCleanup(cx_pool.close) - - with cx_pool.checkout() as conn: - # Simulate a closed socket without telling the Connection it's - # closed. - conn.conn.close() - - # Enable backoff. - cx_pool._backoff = 1 - - # Swap pool's address with a bad one. - address, cx_pool.address = cx_pool.address, ("foo.com", 1234) - with self.assertRaises(AutoReconnect): - with cx_pool.checkout(): - pass - - # Back to normal, semaphore was correctly released. 
- cx_pool.address = address - with cx_pool.checkout(): - pass - @client_context.require_failCommand_appName - def test_pool_backoff_preserves_existing_connections(self): + def test_pool_backpressure_preserves_existing_connections(self): client = self.rs_or_single_client() - coll = self.db.t + coll = client.pymongo_test.t pool = get_pool(client) coll.insert_many([{"x": 1} for _ in range(10)]) t = SocketGetter(self.c, pool) @@ -562,9 +537,6 @@ def test_pool_backoff_preserves_existing_connections(self): with self.fail_point(mock_connection_fail): coll.find_one({}) - # Make sure the pool is out of backoff state. - assert pool._backoff == 0 - # Make sure the existing socket was not affected. assert not t.sock.conn_closed() @@ -573,16 +545,6 @@ def test_pool_backoff_preserves_existing_connections(self): t.join() pool.close() - def test_pool_backoff_limits_maxConnecting(self): - client = self.rs_or_single_client(maxConnecting=10) - pool = get_pool(client) - assert pool.max_connecting == 10 - pool._backoff = 1 - assert pool.max_connecting == 1 - pool._backoff = 0 - assert pool.max_connecting == 10 - client.close() - class TestPoolMaxSize(_TestPoolingBase): def test_max_pool_size(self):
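
A minimal sketch of how the error labels introduced in this patch are intended to flow end to end, assuming only what the diff itself shows (AutoReconnect, NetworkTimeout, the private _add_error_label hook, the public has_error_label check, and the two label strings); the helper function names below are illustrative and not part of the driver:

# Illustrative sketch, not part of the patch. Only the error classes, the
# label strings, and the _add_error_label / has_error_label calls are taken
# from the diff above; the two helper functions are hypothetical.
from pymongo.errors import AutoReconnect, NetworkTimeout


def label_overload_error(error: Exception, is_sdam: bool) -> None:
    # Mirrors the new Pool._handle_connection_error: non-SDAM pools label
    # connection-establishment failures of exactly these two types as
    # overload-related and retryable.
    if is_sdam or type(error) not in (AutoReconnect, NetworkTimeout):
        return
    error._add_error_label("SystemOverloadedError")
    error._add_error_label("RetryableError")


def should_clear_pool(error: AutoReconnect) -> bool:
    # Mirrors the Topology._handle_error change: errors carrying the
    # SystemOverloadedError label no longer mark the server Unknown or
    # clear its connection pool.
    return not error.has_error_label("SystemOverloadedError")


err = AutoReconnect("connection closed during handshake")
label_overload_error(err, is_sdam=False)
assert err.has_error_label("RetryableError")
assert not should_clear_pool(err)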