From 5a0dac762d543440b553157e7636d2d5a9374c03 Mon Sep 17 00:00:00 2001 From: Yaniv Michael Kaul Date: Thu, 23 Apr 2026 09:08:13 +0300 Subject: [PATCH] perf: move request submission off event loop thread in execute_concurrent ConcurrentExecutorListResults now uses a dedicated submitter thread instead of calling _execute_next inline from the event loop callback. This decouples I/O completion processing from new request serialization and enqueuing, yielding ~6-9% higher write throughput. The callback signals a threading.Event; the submitter thread drains a deque and calls session.execute_async in batches. This avoids blocking the libev event loop thread with request preparation work (query plan, serialization, tablet lookup) that takes ~27us per request. The event-loop callback path is lock-free: it appends to a deque and sets an Event, with no Condition/Lock acquisition in the hot path. --- cassandra/concurrent.py | 192 ++++++++++++++++++++++++++++++++++++---- 1 file changed, 174 insertions(+), 18 deletions(-) diff --git a/cassandra/concurrent.py b/cassandra/concurrent.py index b96d0b12d4..cb6dbe05a6 100644 --- a/cassandra/concurrent.py +++ b/cassandra/concurrent.py @@ -13,10 +13,10 @@ # limitations under the License. -from collections import namedtuple +from collections import deque, namedtuple from heapq import heappush, heappop from itertools import cycle -from threading import Condition +from threading import Condition, Event, Thread import sys from cassandra.cluster import ResultSet, EXEC_PROFILE_DEFAULT @@ -193,28 +193,184 @@ class ConcurrentExecutorListResults(_ConcurrentExecutor): def execute(self, concurrency, fail_fast): self._exception = None - return super(ConcurrentExecutorListResults, self).execute(concurrency, fail_fast) + self._submit_ready = deque() + self._submit_event = Event() + self._stop_event = Event() + self._exhausted = False + # Submit the initial batch from the calling thread (no contention + # yet -- the submitter thread is not started until afterward). + # Track whether the initial batch consumed all statements. + self._fail_fast = fail_fast + self._results_queue = [] + self._current = 0 + self._exec_count = 0 + with self._condition: + for n in range(concurrency): + if not self._execute_next(): + self._exhausted = True + break + return self._results() + + def _results(self): + # Always start the submitter thread: it owns ``_current`` accounting + # (incrementing from drained completion signals) so the event-loop + # callback path can stay lock-free in the success case. Even when + # the iterator was fully consumed by the initial batch, the + # submitter still needs to run to record completions. + self._submitter = Thread(target=self._submitter_loop, + daemon=True, name="concurrent-submitter") + self._submitter.start() + + try: + with self._condition: + while not self._exhausted or self._current < self._exec_count: + self._condition.wait() + if self._exception and self._fail_fast: + break + finally: + self._stop_event.set() + self._submit_event.set() # wake submitter so it sees the stop + self._submitter.join() + if self._exception and self._fail_fast: + raise self._exception + return [r[1] for r in sorted(self._results_queue)] def _put_result(self, result, idx, success): + """Record a completion and signal the submitter thread. + + Called from the event-loop callback thread (or from the submitter + thread when execute_async raises synchronously). + + Hot path (success, not fail-fast): NO lock acquisition. 
We rely on
+        the submitter thread to bump ``_current`` from the drained signal
+        count under the same lock acquisition that bumps ``_exec_count``.
+        This removes ~0.5-1us of lock cost from every callback on the
+        event-loop thread.
+
+        Note: ``self._results_queue.append`` and ``self._submit_ready.append``
+        are safe under the GIL (CPython list/deque appends are atomic).
+        Under free-threaded builds (PEP 703) the GIL is removed; this
+        module assumes a GIL build, which is the default for the driver's
+        supported Python versions.
+        """
         self._results_queue.append((idx, ExecutionResult(success, result)))
-        with self._condition:
-            self._current += 1
-            if not success and self._fail_fast:
+        if not success and self._fail_fast:
+            # Cold path: take the lock to record the exception and wake
+            # the main thread immediately so it can stop waiting.
+            with self._condition:
                 if not self._exception:
                     self._exception = result
                 self._condition.notify()
-            elif not self._execute_next() and self._current == self._exec_count:
-                self._condition.notify()
-
-    def _results(self):
-        with self._condition:
-            while self._current < self._exec_count:
-                self._condition.wait()
-                if self._exception and self._fail_fast:
-                    raise self._exception
-            if self._exception and self._fail_fast:  # raise the exception even if there was no wait
-                raise self._exception
-        return [r[1] for r in sorted(self._results_queue)]
+        # Signal the submitter thread. It will:
+        # 1) bump _current under the lock from the drained signal count,
+        # 2) submit a replacement request,
+        # 3) notify _results() if all completions have arrived.
+        self._submit_ready.append(1)
+        self._submit_event.set()
+
+    def _submitter_loop(self):
+        """Drain completion signals and submit follow-up requests.
+
+        Runs on a dedicated thread so that the libev event-loop thread
+        only needs to do the lightweight ``deque.append`` + ``Event.set``
+        in ``_put_result`` rather than the full execute_async cycle
+        (compute the query plan, borrow a connection, serialize, enqueue).
+
+        Owns ``_current`` accounting: each drained completion signal
+        increments ``_current`` by one under the same lock acquisition
+        that bumps ``_exec_count`` for the new batch. This keeps the
+        event-loop callback path lock-free in the success case.
+        """
+        ready = self._submit_ready
+        ready_event = self._submit_event
+        stop_event = self._stop_event
+        enum_stmts = self._enum_statements
+        session = self.session
+        profile = self._execution_profile
+        on_success = self._on_success
+        on_error = self._on_error
+        condition = self._condition
+        while not stop_event.is_set():
+            ready_event.wait()
+            ready_event.clear()
+            # Drain all pending completion signals.
+            count = 0
+            while True:
+                try:
+                    ready.popleft()
+                    count += 1
+                except IndexError:
+                    break
+            if count == 0:
+                continue
+            if stop_event.is_set():
+                # Main thread is shutting down (e.g. fail-fast). Do the
+                # accounting for already-completed requests but skip
+                # dispatching new ones.
+                with condition:
+                    self._current += count
+                    if self._exhausted and self._current >= self._exec_count:
+                        condition.notify()
+                continue
+            if self._exhausted:
+                # No more statements to dispatch -- just account for the
+                # completions we just drained and notify the waiter if
+                # everything has caught up.
+                with condition:
+                    self._current += count
+                    if self._current >= self._exec_count:
+                        condition.notify()
+                continue
+            # Submit follow-up requests directly (fast path).
+            # The iterator is only consumed from this thread (the initial
+            # batch was fully dispatched before this thread started).
+            #
+            # Pull statements from the iterator first, then bump _current
+            # and _exec_count for the entire batch in one lock acquisition,
+            # then dispatch. This avoids per-request lock overhead while
+            # ensuring _results() never sees _current >= _exec_count
+            # prematurely.
+            batch = []
+            iterator_done = False
+            for _ in range(count):
+                try:
+                    batch.append(next(enum_stmts))
+                except StopIteration:
+                    iterator_done = True
+                    break
+            # Single lock acquisition: bump both _current (from the
+            # drained completion count) and _exec_count (from the new
+            # batch size) atomically. Setting _exhausted in the same
+            # critical section ensures the main thread never sees
+            # _exhausted=True with a stale _exec_count.
+            with condition:
+                self._current += count
+                self._exec_count += len(batch)
+                if iterator_done:
+                    self._exhausted = True
+                # Wake the waiter if all completions have caught up.
+                if self._exhausted and self._current >= self._exec_count:
+                    condition.notify()
+            # Re-check stop after the lock release: fail-fast may have
+            # arrived while we were holding the lock; avoid dispatching
+            # requests we know will be discarded.
+            if stop_event.is_set():
+                continue
+            for idx, (statement, params) in batch:
+                try:
+                    future = session.execute_async(statement, params,
+                                                   timeout=None,
+                                                   execution_profile=profile)
+                    args = (future, idx)
+                    future.add_callbacks(
+                        callback=on_success, callback_args=args,
+                        errback=on_error, errback_args=args)
+                except Exception as exc:
+                    # Record the failure directly. _put_result enqueues
+                    # another completion signal to _submit_ready; the
+                    # next drain bumps _current for it and attempts
+                    # another next(enum_stmts) to replace this request.
+                    self._put_result(exc, idx, False)
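
Note (reviewer sketch, not part of the patch): the deque + Event handoff
above, reduced to a self-contained producer/consumer pair. The names here
(producer, drain_loop, the signal count) are invented for illustration; in
the patch the consumer additionally bumps _current/_exec_count under the
Condition and dispatches session.execute_async calls instead of counting.

    from collections import deque
    from threading import Event, Thread

    signals = deque()   # deque.append()/.popleft() are atomic under the
                        # GIL, so the producer side takes no lock
    wake = Event()      # wakes the consumer when a signal is appended
    stop = Event()
    drained = 0

    def producer(n):
        # Stands in for _put_result on the event-loop thread: one append
        # plus one Event.set() per completion, no Condition acquired.
        for _ in range(n):
            signals.append(1)
            wake.set()

    def drain_loop():
        # Stands in for _submitter_loop: clear the event *before* draining
        # so an append that races with the drain re-sets it and is seen on
        # the next iteration instead of being lost.
        global drained
        while not stop.is_set():
            wake.wait()
            wake.clear()
            while True:
                try:
                    signals.popleft()
                    drained += 1
                except IndexError:
                    break
        # The patch needs no final sweep because stop is only set once the
        # Condition accounting shows every completion was drained; this
        # sketch sweeps once more so the assert below is exact.
        while signals:
            signals.popleft()
            drained += 1

    t = Thread(target=drain_loop, name="concurrent-submitter-demo")
    t.start()
    producer(100000)
    stop.set()
    wake.set()          # wake the consumer so it observes the stop flag
    t.join()
    assert drained == 100000, drained

The ordering in drain_loop (wait, clear, then drain until IndexError) is the
same one the patch relies on to avoid lost wakeups without ever taking a
lock on the producer side.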