192 changes: 174 additions & 18 deletions cassandra/concurrent.py
@@ -13,10 +13,10 @@
 # limitations under the License.
 
 
-from collections import namedtuple
+from collections import deque, namedtuple
 from heapq import heappush, heappop
 from itertools import cycle
-from threading import Condition
+from threading import Condition, Event, Thread
 import sys
 
 from cassandra.cluster import ResultSet, EXEC_PROFILE_DEFAULT
@@ -193,28 +193,184 @@ class ConcurrentExecutorListResults(_ConcurrentExecutor):

     def execute(self, concurrency, fail_fast):
         self._exception = None
-        return super(ConcurrentExecutorListResults, self).execute(concurrency, fail_fast)
+        self._submit_ready = deque()
+        self._submit_event = Event()
+        self._stop_event = Event()
+        self._exhausted = False
+        self._fail_fast = fail_fast
+        self._results_queue = []
+        self._current = 0
+        self._exec_count = 0
+        # Submit the initial batch from the calling thread (no contention
+        # yet -- the submitter thread is not started until afterward).
+        # _exhausted tracks whether the initial batch consumed all
+        # statements.
+        with self._condition:
+            for n in range(concurrency):
+                if not self._execute_next():
+                    self._exhausted = True
+                    break
+        return self._results()
+
+    def _results(self):
+        # Always start the submitter thread: it owns the ``_current``
+        # accounting (incrementing it from drained completion signals), so
+        # the event-loop callback path can stay lock-free in the success
+        # case. Even when the iterator was fully consumed by the initial
+        # batch, the submitter still needs to run to record completions.
+        self._submitter = Thread(target=self._submitter_loop,
+                                 daemon=True, name="concurrent-submitter")
+        self._submitter.start()
+
+        try:
+            with self._condition:
+                while not self._exhausted or self._current < self._exec_count:
+                    self._condition.wait()
+                    if self._exception and self._fail_fast:
+                        break
+        finally:
+            self._stop_event.set()
+            self._submit_event.set()  # wake the submitter so it sees the stop
+            self._submitter.join()
+        if self._exception and self._fail_fast:
+            raise self._exception
+        return [r[1] for r in sorted(self._results_queue)]

     def _put_result(self, result, idx, success):
+        """Record a completion and signal the submitter thread.
+
+        Called from the event-loop callback thread (or from the submitter
+        thread when execute_async raises synchronously).
+
+        Hot path (success, not fail-fast): NO lock acquisition. We rely on
+        the submitter thread to bump ``_current`` from the drained signal
+        count under the same lock acquisition that bumps ``_exec_count``.
+        This removes ~0.5-1us of lock cost from every callback on the
+        event-loop thread.
+
+        Note: ``self._results_queue.append`` and ``self._submit_ready.append``
+        are safe under the GIL (CPython list/deque appends are atomic).
+        Under free-threaded builds (PEP 703) the GIL is removed; this
+        module assumes a GIL build, which is the default for the driver's
+        supported Python versions.
+        """
         self._results_queue.append((idx, ExecutionResult(success, result)))
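+        # The result is appended before the completion signal below, and
+        # _current is bumped only after the submitter drains that signal,
+        # so once _current catches up with _exec_count in _results() every
+        # result is already in _results_queue.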
-        with self._condition:
-            self._current += 1
-            if not success and self._fail_fast:
+        if not success and self._fail_fast:
+            # Cold path: take the lock to record the exception and wake
+            # the main thread immediately so it can stop waiting.
+            with self._condition:
                 if not self._exception:
                     self._exception = result
                 self._condition.notify()
-            elif not self._execute_next() and self._current == self._exec_count:
-                self._condition.notify()
-
-    def _results(self):
-        with self._condition:
-            while self._current < self._exec_count:
-                self._condition.wait()
-                if self._exception and self._fail_fast:
-                    raise self._exception
-        if self._exception and self._fail_fast:  # raise the exception even if there was no wait
-            raise self._exception
-        return [r[1] for r in sorted(self._results_queue)]
+        # Signal the submitter thread. It will:
+        # 1) bump _current under the lock from the drained signal count,
+        # 2) submit a replacement request,
+        # 3) notify _results() if all completions have arrived.
+        self._submit_ready.append(1)
+        self._submit_event.set()

+    def _submitter_loop(self):
+        """Drain completion signals and submit follow-up requests.
+
+        Runs on a dedicated thread so that the libev event-loop thread
+        only needs to do the lightweight ``deque.append`` + ``Event.set``
+        in ``_put_result`` rather than the full execute_async cycle
+        (build the query plan, borrow a connection, serialise, enqueue).
+
+        Owns the ``_current`` accounting: each drained completion signal
+        increments ``_current`` by one under the same lock acquisition
+        that bumps ``_exec_count`` for the new batch. This keeps the
+        event-loop callback path lock-free in the success case.
+        """
+        ready = self._submit_ready
+        ready_event = self._submit_event
+        stop_event = self._stop_event
+        enum_stmts = self._enum_statements
+        session = self.session
+        profile = self._execution_profile
+        on_success = self._on_success
+        on_error = self._on_error
+        condition = self._condition
+        while not stop_event.is_set():
+            ready_event.wait()
+            ready_event.clear()
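+            # Clearing before draining cannot lose a wakeup: _put_result
+            # appends its signal before calling set(), so any signal that
+            # lands after the drain below re-sets the event and is picked
+            # up on the next pass through the loop.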
+            # Drain all pending completion signals.
+            count = 0
+            while True:
+                try:
+                    ready.popleft()
+                    count += 1
+                except IndexError:
+                    break
+            if count == 0:
+                continue
+            if stop_event.is_set():
+                # Main thread is shutting down (e.g. fail-fast). Do the
+                # accounting for already-completed requests but skip
+                # dispatching new ones.
+                with condition:
+                    self._current += count
+                    if self._exhausted and self._current >= self._exec_count:
+                        condition.notify()
+                continue
+            if self._exhausted:
+                # No more statements to dispatch -- just account for the
+                # completions we just drained and notify the waiter if
+                # everything has caught up.
+                with condition:
+                    self._current += count
+                    if self._current >= self._exec_count:
+                        condition.notify()
+                continue
+            # Submit follow-up requests directly (fast path). The iterator
+            # is only consumed from this thread (the initial batch was
+            # fully dispatched before this thread started).
+            #
+            # Pull statements from the iterator first, then bump _current
+            # and _exec_count for the entire batch in one lock acquisition,
+            # then dispatch. This avoids per-request lock overhead while
+            # ensuring _results() never sees _current >= _exec_count
+            # prematurely.
+            batch = []
+            iterator_done = False
+            for _ in range(count):
+                try:
+                    batch.append(next(enum_stmts))
+                except StopIteration:
+                    iterator_done = True
+                    break
+            # Single lock acquisition: bump both _current (from the
+            # drained completion count) and _exec_count (from the new
+            # batch size) atomically. Setting _exhausted in the same
+            # critical section ensures the main thread never sees
+            # _exhausted=True with a stale _exec_count.
+            with condition:
+                self._current += count
+                self._exec_count += len(batch)
+                if iterator_done:
+                    self._exhausted = True
+                # Wake the waiter if all completions have caught up.
+                if self._exhausted and self._current >= self._exec_count:
+                    condition.notify()
+            # Re-check stop after the lock release: fail-fast may have
+            # arrived while we were holding the lock; avoid dispatching
+            # requests we know will be discarded.
+            if stop_event.is_set():
+                continue
+            for idx, (statement, params) in batch:
+                try:
+                    future = session.execute_async(statement, params,
+                                                   timeout=None,
+                                                   execution_profile=profile)
+                    args = (future, idx)
+                    future.add_callbacks(
+                        callback=on_success, callback_args=args,
+                        errback=on_error, errback_args=args)
+                except Exception as exc:
+                    # Record the failure directly: _put_result appends the
+                    # result and enqueues another completion signal, so the
+                    # next drain does the _current accounting and attempts
+                    # a replacement next(enum_stmts).
+                    self._put_result(exc, idx, False)
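
The core of the change is the hand-off between the event-loop callback (which does only deque.append + Event.set) and a submitter thread that drains the signals, does all counter accounting in one lock acquisition per batch, and dispatches replacement requests outside the lock. Below is a minimal, self-contained sketch of that pattern under stated assumptions: it is not driver code, the names (SubmitterPattern, fake_execute_async) are invented for illustration, Timer threads stand in for the event-loop callback thread, and the fail-fast/stop handling of the real patch is omitted. Running it prints 1000.

from collections import deque
from threading import Condition, Event, Thread, Timer


def fake_execute_async(task, callback):
    # Stand-in for session.execute_async: completes on another thread.
    Timer(0.001, callback, args=(task,)).start()


class SubmitterPattern:
    def __init__(self, tasks, concurrency):
        self._tasks = iter(tasks)
        self._concurrency = concurrency
        self._signals = deque()     # completion signals from callbacks
        self._wakeup = Event()
        self._condition = Condition()
        self._dispatched = 0        # plays the role of _exec_count
        self._completed = 0         # plays the role of _current
        self._exhausted = False

    def _on_done(self, task):
        # Hot path (the event-loop callback in the driver): no lock taken.
        self._signals.append(1)
        self._wakeup.set()

    def _submitter(self):
        while True:
            self._wakeup.wait()
            self._wakeup.clear()
            count = 0
            while self._signals:    # drain all pending signals
                self._signals.popleft()
                count += 1
            if count == 0:
                continue
            # Pull replacement tasks first, then account for the whole
            # batch under one lock acquisition, then dispatch unlocked.
            batch, done = [], self._exhausted
            while not done and len(batch) < count:
                try:
                    batch.append(next(self._tasks))
                except StopIteration:
                    done = True
            with self._condition:
                self._completed += count
                self._dispatched += len(batch)
                self._exhausted = done
                finished = done and self._completed >= self._dispatched
                if finished:
                    self._condition.notify()
            if finished:
                return
            for task in batch:
                fake_execute_async(task, self._on_done)

    def run(self):
        # Initial window is submitted from the calling thread, before the
        # submitter thread exists, as the patched execute() does.
        for _ in range(self._concurrency):
            try:
                task = next(self._tasks)
            except StopIteration:
                self._exhausted = True
                break
            self._dispatched += 1
            fake_execute_async(task, self._on_done)
        Thread(target=self._submitter, daemon=True).start()
        with self._condition:
            while not self._exhausted or self._completed < self._dispatched:
                self._condition.wait()
        return self._completed


if __name__ == "__main__":
    print(SubmitterPattern(range(1000), concurrency=32).run())  # 1000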


