diff --git a/include/boost/capy/ex/thread_pool.hpp b/include/boost/capy/ex/thread_pool.hpp index 5ee1a52..7e90ab1 100644 --- a/include/boost/capy/ex/thread_pool.hpp +++ b/include/boost/capy/ex/thread_pool.hpp @@ -69,6 +69,15 @@ class BOOST_CAPY_DECL thread_pool(thread_pool const&) = delete; thread_pool& operator=(thread_pool const&) = delete; + /** Request all worker threads to stop. + + Signals all threads to exit via stop token. Threads will + finish their current work item before exiting. Does not + wait for threads to exit. + */ + void + stop() noexcept; + /** Return an executor for this thread pool. @return An executor associated with this thread pool. diff --git a/src/ex/thread_pool.cpp b/src/ex/thread_pool.cpp index a9957f4..c35c48d 100644 --- a/src/ex/thread_pool.cpp +++ b/src/ex/thread_pool.cpp @@ -11,6 +11,7 @@ #include #include #include +#include #include #include @@ -19,10 +20,8 @@ namespace capy { //------------------------------------------------------------------------------ -// Pimpl implementation hides threading details from the header class thread_pool::impl { - // Wraps a coroutine handle for queue storage struct work : intrusive_queue::node { any_coro h_; @@ -34,7 +33,6 @@ class thread_pool::impl void run() { - // delete before dispatch auto h = h_; delete this; h.resume(); @@ -47,46 +45,36 @@ class thread_pool::impl }; std::mutex mutex_; - std::condition_variable cv_; + std::condition_variable_any cv_; intrusive_queue q_; - std::vector threads_; - bool stop_; + std::vector threads_; + std::size_t num_threads_; + std::once_flag start_flag_; public: ~impl() { - { - std::lock_guard lock(mutex_); - stop_ = true; - } - cv_.notify_all(); - - for(auto& t : threads_) - t.join(); + stop(); + threads_.clear(); - // Destroy any work items that were never executed while(auto* w = q_.pop()) w->destroy(); } explicit impl(std::size_t num_threads) - : stop_(false) + : num_threads_(num_threads) { - if( num_threads == 0) - num_threads = std::thread::hardware_concurrency(); - // Fallback - if( num_threads == 0) - num_threads = 1; - - threads_.reserve(num_threads); - for(std::size_t i = 0; i < num_threads; ++i) - threads_.emplace_back([this]{ run(); }); + if(num_threads_ == 0) + num_threads_ = std::thread::hardware_concurrency(); + if(num_threads_ == 0) + num_threads_ = 1; } void post(any_coro h) { + ensure_started(); auto* w = new work(h); { std::lock_guard lock(mutex_); @@ -95,26 +83,37 @@ class thread_pool::impl cv_.notify_one(); } + void + stop() noexcept + { + for (auto& t : threads_) + t.request_stop(); + cv_.notify_all(); + } + private: void - run() + ensure_started() + { + std::call_once(start_flag_, [this]{ + threads_.reserve(num_threads_); + for(std::size_t i = 0; i < num_threads_; ++i) + threads_.emplace_back([this](std::stop_token st){ run(st); }); + }); + } + + void + run(std::stop_token st) { for(;;) { work* w = nullptr; { std::unique_lock lock(mutex_); - cv_.wait(lock, [this]{ - return stop_ || !q_.empty(); - }); - - // Only exit when stopped AND queue is drained - if(stop_ && q_.empty()) + if(!cv_.wait(lock, st, [this]{ return !q_.empty(); })) return; - w = q_.pop(); } - w->run(); } } @@ -125,10 +124,9 @@ class thread_pool::impl thread_pool:: ~thread_pool() { - // Order matters: shutdown services, then impl, then base shutdown(); - delete impl_; destroy(); + delete impl_; } thread_pool:: @@ -137,6 +135,13 @@ thread_pool(std::size_t num_threads) { } +void +thread_pool:: +stop() noexcept +{ + impl_->stop(); +} + //------------------------------------------------------------------------------ void diff --git a/test/unit/Jamfile b/test/unit/Jamfile index d8f173f..e3c1a8b 100644 --- a/test/unit/Jamfile +++ b/test/unit/Jamfile @@ -26,8 +26,7 @@ project darwin,norecover:static ; -# Exclude buffers/ which has its own Jamfile -for local f in [ glob-tree-ex . : *.cpp : file*.cpp buffers ] +for local f in [ glob-tree-ex . : *.cpp : file*.cpp ] { run $(f) ; } @@ -40,5 +39,3 @@ for local f in [ glob-tree-ex . : file*.cpp ] off norecover:static ; } - -build-project buffers ; diff --git a/test/unit/buffers/Jamfile b/test/unit/buffers/Jamfile deleted file mode 100644 index 39fbf12..0000000 --- a/test/unit/buffers/Jamfile +++ /dev/null @@ -1,34 +0,0 @@ -# -# Copyright (c) 2023 Vinnie Falco (vinnie.falco@gmail.com) -# -# Distributed under the Boost Software License, Version 1.0. (See accompanying -# file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) -# -# Official repository: https://github.com/cppalliance/capy -# - -import testing ; - -project - : requirements - 20 - /boost/capy//boost_capy - /boost/asio//boost_asio/off - ../../../../url/extra/test_suite/test_main.cpp - ../../../../url/extra/test_suite/test_suite.cpp - . - ../../../../url/extra/test_suite - extra - on - darwin,norecover:static - windows:_WIN32_WINNT=0x0601 - ; - -# Exclude buffers.cpp to avoid output name conflict with buffers/ directory -for local f in [ glob-tree-ex . : *.cpp : buffers.cpp ] -{ - run $(f) ; -} - -# Use explicit target name to avoid conflict with buffers/ output directory -run buffers.cpp : : : : buffers_ ; diff --git a/test/unit/buffers/buffers.cpp b/test/unit/buffers/buffers_.cpp similarity index 100% rename from test/unit/buffers/buffers.cpp rename to test/unit/buffers/buffers_.cpp