diff --git a/pymongo/asynchronous/pool.py b/pymongo/asynchronous/pool.py index a5d5b28990..7bc2a9f207 100644 --- a/pymongo/asynchronous/pool.py +++ b/pymongo/asynchronous/pool.py @@ -839,6 +839,32 @@ async def _reset( if close: self.state = PoolState.CLOSED + + # Publish PoolClearedEvent while holding the lock and before + # notify_all(). This guarantees it is recorded before any waiting + # thread wakes up and emits ConnectionCheckOutFailedEvent, which + # also requires size_cond. Without this ordering, a race on PyPy + # and free-threaded Python causes ConnectionCheckOutFailedEvent to + # arrive before PoolClearedEvent (PYTHON-3519). + if not close and old_state != PoolState.PAUSED: + _listeners = self.opts._event_listeners + if self.enabled_for_logging and _CONNECTION_LOGGER.isEnabledFor(logging.DEBUG): + _debug_log( + _CONNECTION_LOGGER, + message=_ConnectionStatusMessage.POOL_CLEARED, + clientId=self._client_id, + serverHost=self.address[0], + serverPort=self.address[1], + serviceId=service_id, + ) + if self.enabled_for_cmap: + assert _listeners is not None + _listeners.publish_pool_cleared( + self.address, + service_id=service_id, + interrupt_connections=interrupt_connections, + ) + # Clear the wait queue self._max_connecting_cond.notify_all() self.size_cond.notify_all() @@ -872,23 +898,6 @@ async def _reset( assert listeners is not None listeners.publish_pool_closed(self.address) else: - if old_state != PoolState.PAUSED: - if self.enabled_for_logging and _CONNECTION_LOGGER.isEnabledFor(logging.DEBUG): - _debug_log( - _CONNECTION_LOGGER, - message=_ConnectionStatusMessage.POOL_CLEARED, - clientId=self._client_id, - serverHost=self.address[0], - serverPort=self.address[1], - serviceId=service_id, - ) - if self.enabled_for_cmap: - assert listeners is not None - listeners.publish_pool_cleared( - self.address, - service_id=service_id, - interrupt_connections=interrupt_connections, - ) if not _IS_SYNC: await asyncio.gather( *[conn.close_conn(ConnectionClosedReason.STALE) for conn in sockets], # type: ignore[func-returns-value] diff --git a/pymongo/synchronous/pool.py b/pymongo/synchronous/pool.py index 25f2d08fe7..970989c594 100644 --- a/pymongo/synchronous/pool.py +++ b/pymongo/synchronous/pool.py @@ -837,6 +837,32 @@ def _reset( if close: self.state = PoolState.CLOSED + + # Publish PoolClearedEvent while holding the lock and before + # notify_all(). This guarantees it is recorded before any waiting + # thread wakes up and emits ConnectionCheckOutFailedEvent, which + # also requires size_cond. Without this ordering, a race on PyPy + # and free-threaded Python causes ConnectionCheckOutFailedEvent to + # arrive before PoolClearedEvent (PYTHON-3519). + if not close and old_state != PoolState.PAUSED: + _listeners = self.opts._event_listeners + if self.enabled_for_logging and _CONNECTION_LOGGER.isEnabledFor(logging.DEBUG): + _debug_log( + _CONNECTION_LOGGER, + message=_ConnectionStatusMessage.POOL_CLEARED, + clientId=self._client_id, + serverHost=self.address[0], + serverPort=self.address[1], + serviceId=service_id, + ) + if self.enabled_for_cmap: + assert _listeners is not None + _listeners.publish_pool_cleared( + self.address, + service_id=service_id, + interrupt_connections=interrupt_connections, + ) + # Clear the wait queue self._max_connecting_cond.notify_all() self.size_cond.notify_all() @@ -870,23 +896,6 @@ def _reset( assert listeners is not None listeners.publish_pool_closed(self.address) else: - if old_state != PoolState.PAUSED: - if self.enabled_for_logging and _CONNECTION_LOGGER.isEnabledFor(logging.DEBUG): - _debug_log( - _CONNECTION_LOGGER, - message=_ConnectionStatusMessage.POOL_CLEARED, - clientId=self._client_id, - serverHost=self.address[0], - serverPort=self.address[1], - serviceId=service_id, - ) - if self.enabled_for_cmap: - assert listeners is not None - listeners.publish_pool_cleared( - self.address, - service_id=service_id, - interrupt_connections=interrupt_connections, - ) if not _IS_SYNC: asyncio.gather( *[conn.close_conn(ConnectionClosedReason.STALE) for conn in sockets], # type: ignore[func-returns-value] diff --git a/test/asynchronous/test_retryable_reads.py b/test/asynchronous/test_retryable_reads.py index 361db4ca92..2f0bb1f7d5 100644 --- a/test/asynchronous/test_retryable_reads.py +++ b/test/asynchronous/test_retryable_reads.py @@ -89,9 +89,6 @@ class TestPoolPausedError(AsyncIntegrationTest): @async_client_context.require_failCommand_blockConnection @client_knobs(heartbeat_frequency=0.05, min_heartbeat_interval=0.05) async def test_pool_paused_error_is_retryable(self): - if "PyPy" in sys.version: - # Tracked in PYTHON-3519 - self.skipTest("Test is flaky on PyPy") cmap_listener = CMAPListener() cmd_listener = OvertCommandListener() client = await self.async_rs_or_single_client( diff --git a/test/test_retryable_reads.py b/test/test_retryable_reads.py index 2da87f8b26..4545d7ff7e 100644 --- a/test/test_retryable_reads.py +++ b/test/test_retryable_reads.py @@ -89,9 +89,6 @@ class TestPoolPausedError(IntegrationTest): @client_context.require_failCommand_blockConnection @client_knobs(heartbeat_frequency=0.05, min_heartbeat_interval=0.05) def test_pool_paused_error_is_retryable(self): - if "PyPy" in sys.version: - # Tracked in PYTHON-3519 - self.skipTest("Test is flaky on PyPy") cmap_listener = CMAPListener() cmd_listener = OvertCommandListener() client = self.rs_or_single_client(