Skip to content

Conversation

@vinniefalco
Copy link
Member

@vinniefalco vinniefalco commented Jan 20, 2026

Summary by CodeRabbit

  • New Features

    • Added a non-blocking stop() to signal worker threads to stop; threads finish their current task before exiting.
  • Improvements

    • Lazy-start of worker threads on first use and sensible default sizing when none specified.
    • More cooperative, reliable shutdown and resource cleanup ensuring queued work is drained and startup/shutdown ordering is safer.

✏️ Tip: You can customize this high-level summary in your review settings.

@coderabbitai
Copy link

coderabbitai bot commented Jan 20, 2026

📝 Walkthrough

Walkthrough

Added a non-blocking stop() to thread_pool and refactored thread management to use std::jthread/stop tokens, std::condition_variable_any, lazy startup, and unified shutdown sequencing; workers observe stop requests and drain or exit cooperatively.

Changes

Cohort / File(s) Summary
Header Declaration
include/boost/capy/ex/thread_pool.hpp
Added public stop() declaration: BOOST_CAPY_DECL void stop() noexcept; to signal worker threads to stop via stop tokens (non-blocking).
Implementation & Threading
src/ex/thread_pool.cpp
Replaced std::threadstd::jthread, std::condition_variablestd::condition_variable_any; added stop_token-aware worker loop; implemented stop() to signal stop_source and wake condition variable; introduced ensure_started() lazy startup, num_threads_/start_flag_, and revised destructor to call stop() and clear threads before draining queue.

Sequence Diagram(s)

sequenceDiagram
    participant Client
    participant ThreadPool
    participant TaskQueue
    participant Worker

    Client->>ThreadPool: post(task)
    ThreadPool->>ThreadPool: ensure_started()
    ThreadPool->>Worker: start jthread (with stop_token)
    Client->>TaskQueue: enqueue(task)
    Worker->>TaskQueue: wait for task or stop (condvar + stop_token)
    TaskQueue-->>Worker: deliver task
    Worker->>Worker: execute task
    Client->>ThreadPool: stop()
    ThreadPool->>Worker: request_stop (via stop_source -> stop_token)
    ThreadPool->>TaskQueue: notify_all (wake condition variable)
    Worker->>Worker: finish current task (or exit if none), then return/exit
Loading

Estimated code review effort

🎯 4 (Complex) | ⏱️ ~45 minutes

Poem

🐰 I nudged the pool to say “be still,”

jthreads wake, then end at will,
A gentle stop, no blocking fuss,
Tasks drained softly, hop — no muss,
🎉 small change, big calm — from me to us.

🚥 Pre-merge checks | ✅ 2 | ❌ 1
❌ Failed checks (1 warning)
Check name Status Explanation Resolution
Docstring Coverage ⚠️ Warning Docstring coverage is 30.77% which is insufficient. The required threshold is 80.00%. Write docstrings for the functions missing them to satisfy the coverage threshold.
✅ Passed checks (2 passed)
Check name Status Explanation
Description Check ✅ Passed Check skipped - CodeRabbit’s high-level summary is enabled.
Title check ✅ Passed The title clearly and specifically summarizes the main changes: jthread adoption, new stop() method, and lazy-start semantics in the thread_pool.

✏️ Tip: You can configure your own custom pre-merge checks in the settings.

✨ Finishing touches
  • 📝 Generate docstrings

Comment @coderabbitai help to get the list of available commands and usage tips.

Copy link

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 2

🤖 Fix all issues with AI agents
In `@src/ex/thread_pool.cpp`:
- Around line 87-92: The stop() currently calls stop_source_.request_stop()
which does not affect the std::jthread instances because their functions receive
the jthread-owned stop_token; change stop() to iterate the thread container
(e.g., the vector of std::jthread instances used by this pool) and call
request_stop() on each jthread (or call .request_stop() on each thread's
associated stop_source if you've stored one), then notify_all() on cv_; after
this, remove stop_source_ or alternatively make worker thread functions accept
and use the shared stop_source_.get_token() so the original stop_source_ becomes
effective (update worker thread launch code and stop() accordingly).
- Around line 124-130: The destructor currently deletes impl_ before running
shutdown() and destroy(), creating a use-after-free if those methods or
callbacks (e.g., post(), impl_->stop()) access impl_; fix it by reordering
thread_pool::~thread_pool() so it first calls shutdown() and destroy() and only
after those complete deletes impl_, ensuring impl_ remains valid during
base-class cleanup and any callbacks.

@cppalliance-bot
Copy link

cppalliance-bot commented Jan 20, 2026

An automated preview of the documentation is available at https://86.capy.prtest3.cppalliance.org/index.html

If more commits are pushed to the pull request, the docs will rebuild at the same URL.

2026-01-20 03:37:12 UTC

@cppalliance-bot
Copy link

cppalliance-bot commented Jan 20, 2026

GCOVR code coverage report https://86.capy.prtest3.cppalliance.org/gcovr/index.html
LCOV code coverage report https://86.capy.prtest3.cppalliance.org/genhtml/index.html
Coverage Diff https://86.capy.prtest3.cppalliance.org/gcovr/coverage_diff.txt

Build time: 2026-01-20 03:47:18 UTC

Copy link

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 0

Caution

Some comments are outside the diff and can’t be posted inline due to platform limitations.

⚠️ Outside diff range comments (1)
src/ex/thread_pool.cpp (1)

47-103: Synchronize stop() with lazy-start and make stop sticky to prevent races and orphaned work.

The implementation has two issues:

  1. Data race on threads_ vector: stop() iterates threads_ while ensure_started() modifies it under call_once. If stop() is called concurrently with post() (a reasonable scenario where the main thread stops the pool while executor operations trigger post internally), this is an unsynchronized read/write on a vector.

  2. Non-sticky stop semantic: Calling stop() before the first post() has no persistent effect. A subsequent post() will trigger ensure_started(), creating fresh threads that bypass the earlier stop() signal. This is semantically wrong even in single-threaded usage—once stopped, the pool should not create new work threads.

The proposed fix correctly addresses both by introducing:

  • An atomic stopped_ flag to make stop sticky
  • A start_mutex_ to synchronize thread creation/iteration
  • Early returns in post() and ensure_started() if already stopped
Suggested fix
+#include <atomic>
+
 class thread_pool::impl
 {
     std::mutex mutex_;
     std::condition_variable_any cv_;
     intrusive_queue<work> q_;
     std::vector<std::jthread> threads_;
     std::size_t num_threads_;
     std::once_flag start_flag_;
+    std::mutex start_mutex_;
+    std::atomic<bool> stopped_{false};
 ...
     void post(any_coro h)
     {
+        if(stopped_.load(std::memory_order_acquire))
+            return;
         ensure_started();
         auto* w = new work(h);
         ...
     }
 ...
     void stop() noexcept
     {
+        stopped_.store(true, std::memory_order_release);
+        std::scoped_lock lk(start_mutex_);
         for (auto& t : threads_)
             t.request_stop();
         cv_.notify_all();
     }
 ...
     void ensure_started()
     {
+        if(stopped_.load(std::memory_order_acquire))
+            return;
         std::call_once(start_flag_, [this]{
+            std::scoped_lock lk(start_mutex_);
+            if(stopped_.load(std::memory_order_acquire))
+                return;
             threads_.reserve(num_threads_);
             for(std::size_t i = 0; i < num_threads_; ++i)
                 threads_.emplace_back([this](std::stop_token st){ run(st); });
         });
     }

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants