Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
43 changes: 26 additions & 17 deletions pymongo/asynchronous/pool.py
Original file line number Diff line number Diff line change
Expand Up @@ -839,6 +839,32 @@ async def _reset(

if close:
self.state = PoolState.CLOSED

# Publish PoolClearedEvent while holding the lock and before
# notify_all(). This guarantees it is recorded before any waiting
# thread wakes up and emits ConnectionCheckOutFailedEvent, which
# also requires size_cond. Without this ordering, a race on PyPy
# and free-threaded Python causes ConnectionCheckOutFailedEvent to
# arrive before PoolClearedEvent (PYTHON-3519).
if not close and old_state != PoolState.PAUSED:
_listeners = self.opts._event_listeners
if self.enabled_for_logging and _CONNECTION_LOGGER.isEnabledFor(logging.DEBUG):
_debug_log(
_CONNECTION_LOGGER,
message=_ConnectionStatusMessage.POOL_CLEARED,
clientId=self._client_id,
serverHost=self.address[0],
serverPort=self.address[1],
serviceId=service_id,
)
if self.enabled_for_cmap:
assert _listeners is not None
_listeners.publish_pool_cleared(
self.address,
service_id=service_id,
interrupt_connections=interrupt_connections,
)

# Clear the wait queue
self._max_connecting_cond.notify_all()
self.size_cond.notify_all()
Expand Down Expand Up @@ -872,23 +898,6 @@ async def _reset(
assert listeners is not None
listeners.publish_pool_closed(self.address)
else:
if old_state != PoolState.PAUSED:
if self.enabled_for_logging and _CONNECTION_LOGGER.isEnabledFor(logging.DEBUG):
_debug_log(
_CONNECTION_LOGGER,
message=_ConnectionStatusMessage.POOL_CLEARED,
clientId=self._client_id,
serverHost=self.address[0],
serverPort=self.address[1],
serviceId=service_id,
)
if self.enabled_for_cmap:
assert listeners is not None
listeners.publish_pool_cleared(
self.address,
service_id=service_id,
interrupt_connections=interrupt_connections,
)
if not _IS_SYNC:
await asyncio.gather(
*[conn.close_conn(ConnectionClosedReason.STALE) for conn in sockets], # type: ignore[func-returns-value]
Expand Down
43 changes: 26 additions & 17 deletions pymongo/synchronous/pool.py
Original file line number Diff line number Diff line change
Expand Up @@ -837,6 +837,32 @@ def _reset(

if close:
self.state = PoolState.CLOSED

# Publish PoolClearedEvent while holding the lock and before
# notify_all(). This guarantees it is recorded before any waiting
# thread wakes up and emits ConnectionCheckOutFailedEvent, which
# also requires size_cond. Without this ordering, a race on PyPy
# and free-threaded Python causes ConnectionCheckOutFailedEvent to
# arrive before PoolClearedEvent (PYTHON-3519).
if not close and old_state != PoolState.PAUSED:
_listeners = self.opts._event_listeners
if self.enabled_for_logging and _CONNECTION_LOGGER.isEnabledFor(logging.DEBUG):
_debug_log(
_CONNECTION_LOGGER,
message=_ConnectionStatusMessage.POOL_CLEARED,
clientId=self._client_id,
serverHost=self.address[0],
serverPort=self.address[1],
serviceId=service_id,
)
if self.enabled_for_cmap:
assert _listeners is not None
_listeners.publish_pool_cleared(
self.address,
service_id=service_id,
interrupt_connections=interrupt_connections,
)

# Clear the wait queue
self._max_connecting_cond.notify_all()
self.size_cond.notify_all()
Expand Down Expand Up @@ -870,23 +896,6 @@ def _reset(
assert listeners is not None
listeners.publish_pool_closed(self.address)
else:
if old_state != PoolState.PAUSED:
if self.enabled_for_logging and _CONNECTION_LOGGER.isEnabledFor(logging.DEBUG):
_debug_log(
_CONNECTION_LOGGER,
message=_ConnectionStatusMessage.POOL_CLEARED,
clientId=self._client_id,
serverHost=self.address[0],
serverPort=self.address[1],
serviceId=service_id,
)
if self.enabled_for_cmap:
assert listeners is not None
listeners.publish_pool_cleared(
self.address,
service_id=service_id,
interrupt_connections=interrupt_connections,
)
if not _IS_SYNC:
asyncio.gather(
*[conn.close_conn(ConnectionClosedReason.STALE) for conn in sockets], # type: ignore[func-returns-value]
Expand Down
3 changes: 0 additions & 3 deletions test/asynchronous/test_retryable_reads.py
Original file line number Diff line number Diff line change
Expand Up @@ -89,9 +89,6 @@ class TestPoolPausedError(AsyncIntegrationTest):
@async_client_context.require_failCommand_blockConnection
@client_knobs(heartbeat_frequency=0.05, min_heartbeat_interval=0.05)
async def test_pool_paused_error_is_retryable(self):
if "PyPy" in sys.version:
# Tracked in PYTHON-3519
self.skipTest("Test is flaky on PyPy")
cmap_listener = CMAPListener()
cmd_listener = OvertCommandListener()
client = await self.async_rs_or_single_client(
Expand Down
3 changes: 0 additions & 3 deletions test/test_retryable_reads.py
Original file line number Diff line number Diff line change
Expand Up @@ -89,9 +89,6 @@ class TestPoolPausedError(IntegrationTest):
@client_context.require_failCommand_blockConnection
@client_knobs(heartbeat_frequency=0.05, min_heartbeat_interval=0.05)
def test_pool_paused_error_is_retryable(self):
if "PyPy" in sys.version:
# Tracked in PYTHON-3519
self.skipTest("Test is flaky on PyPy")
cmap_listener = CMAPListener()
cmd_listener = OvertCommandListener()
client = self.rs_or_single_client(
Expand Down
Loading