Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions include/boost/capy/ex/thread_pool.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,15 @@ class BOOST_CAPY_DECL
thread_pool(thread_pool const&) = delete;
thread_pool& operator=(thread_pool const&) = delete;

/** Request all worker threads to stop.

Signals all threads to exit via stop token. Threads will
finish their current work item before exiting. Does not
wait for threads to exit.
*/
void
stop() noexcept;

/** Return an executor for this thread pool.

@return An executor associated with this thread pool.
Expand Down
77 changes: 41 additions & 36 deletions src/ex/thread_pool.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
#include <boost/capy/core/intrusive_queue.hpp>
#include <condition_variable>
#include <mutex>
#include <stop_token>
#include <thread>
#include <vector>

Expand All @@ -19,10 +20,8 @@ namespace capy {

//------------------------------------------------------------------------------

// Pimpl implementation hides threading details from the header
class thread_pool::impl
{
// Wraps a coroutine handle for queue storage
struct work : intrusive_queue<work>::node
{
any_coro h_;
Expand All @@ -34,7 +33,6 @@ class thread_pool::impl

void run()
{
// delete before dispatch
auto h = h_;
delete this;
h.resume();
Expand All @@ -47,46 +45,36 @@ class thread_pool::impl
};

std::mutex mutex_;
std::condition_variable cv_;
std::condition_variable_any cv_;
intrusive_queue<work> q_;
std::vector<std::thread> threads_;
bool stop_;
std::vector<std::jthread> threads_;
std::size_t num_threads_;
std::once_flag start_flag_;

public:
~impl()
{
{
std::lock_guard<std::mutex> lock(mutex_);
stop_ = true;
}
cv_.notify_all();

for(auto& t : threads_)
t.join();
stop();
threads_.clear();

// Destroy any work items that were never executed
while(auto* w = q_.pop())
w->destroy();
}

explicit
impl(std::size_t num_threads)
: stop_(false)
: num_threads_(num_threads)
{
if( num_threads == 0)
num_threads = std::thread::hardware_concurrency();
// Fallback
if( num_threads == 0)
num_threads = 1;

threads_.reserve(num_threads);
for(std::size_t i = 0; i < num_threads; ++i)
threads_.emplace_back([this]{ run(); });
if(num_threads_ == 0)
num_threads_ = std::thread::hardware_concurrency();
if(num_threads_ == 0)
num_threads_ = 1;
}

void
post(any_coro h)
{
ensure_started();
auto* w = new work(h);
{
std::lock_guard<std::mutex> lock(mutex_);
Expand All @@ -95,26 +83,37 @@ class thread_pool::impl
cv_.notify_one();
}

void
stop() noexcept
{
for (auto& t : threads_)
t.request_stop();
cv_.notify_all();
}

private:
void
run()
ensure_started()
{
std::call_once(start_flag_, [this]{
threads_.reserve(num_threads_);
for(std::size_t i = 0; i < num_threads_; ++i)
threads_.emplace_back([this](std::stop_token st){ run(st); });
});
}

void
run(std::stop_token st)
{
for(;;)
{
work* w = nullptr;
{
std::unique_lock<std::mutex> lock(mutex_);
cv_.wait(lock, [this]{
return stop_ || !q_.empty();
});

// Only exit when stopped AND queue is drained
if(stop_ && q_.empty())
if(!cv_.wait(lock, st, [this]{ return !q_.empty(); }))
return;

w = q_.pop();
}

w->run();
}
}
Expand All @@ -125,10 +124,9 @@ class thread_pool::impl
thread_pool::
~thread_pool()
{
// Order matters: shutdown services, then impl, then base
shutdown();
delete impl_;
destroy();
delete impl_;
}

thread_pool::
Expand All @@ -137,6 +135,13 @@ thread_pool(std::size_t num_threads)
{
}

void
thread_pool::
stop() noexcept
{
impl_->stop();
}

//------------------------------------------------------------------------------

void
Expand Down
5 changes: 1 addition & 4 deletions test/unit/Jamfile
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,7 @@ project
<target-os>darwin,<undefined-sanitizer>norecover:<link>static
;

# Exclude buffers/ which has its own Jamfile
for local f in [ glob-tree-ex . : *.cpp : file*.cpp buffers ]
for local f in [ glob-tree-ex . : *.cpp : file*.cpp ]
{
run $(f) ;
}
Comment on lines +29 to 32
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🔴 Critical

🧩 Analysis chain

🏁 Script executed:

#!/bin/bash
# Search for any remaining buffers-specific configuration in Jamfiles
rg -n --glob 'Jamfile*' -C2 'buffers'

Repository: cppalliance/capy

Length of output: 42


🏁 Script executed:

#!/bin/bash
# Check the current structure of test/unit directory
fd --type f 'Jamfile*' test/unit | head -20

Repository: cppalliance/capy

Length of output: 77


🏁 Script executed:

#!/bin/bash
# Get the content of test/unit/Jamfile to see the full context
cat test/unit/Jamfile

Repository: cppalliance/capy

Length of output: 1274


🏁 Script executed:

#!/bin/bash
# Check for any buffers subdirectory and its contents
fd --type f '\.(cpp|h)$' test/unit/buffers 2>/dev/null | head -20

Repository: cppalliance/capy

Length of output: 694


🏁 Script executed:

#!/bin/bash
# List all subdirectories in test/unit to see the current structure
fd --type d . test/unit | head -20

Repository: cppalliance/capy

Length of output: 195


🏁 Script executed:

#!/bin/bash
# Check git history for buffers/Jamfile
git log --oneline --all -- 'test/unit/buffers/Jamfile' 2>/dev/null | head -10

Repository: cppalliance/capy

Length of output: 188


🏁 Script executed:

#!/bin/bash
# Search for any comments or notes about buffers consolidation
rg -n 'buffers' test/unit/Jamfile

Repository: cppalliance/capy

Length of output: 42


🏁 Script executed:

#!/bin/bash
# Check if there are any special dependencies or requirements mentioned in buffers test files
rg -l 'boost/filesystem|requirements' test/unit/buffers/*.cpp | head -5

Repository: cppalliance/capy

Length of output: 42


🏁 Script executed:

#!/bin/bash
# Check the commits that touched buffers/Jamfile to see what was there
git show a915f4d:test/unit/buffers/Jamfile 2>/dev/null || echo "File not in that commit"
git show 3e0ebab:test/unit/buffers/Jamfile 2>/dev/null || echo "File not in that commit"

Repository: cppalliance/capy

Length of output: 1994


🏁 Script executed:

#!/bin/bash
# Check if buffers/Jamfile still exists
ls -la test/unit/buffers/Jamfile 2>/dev/null || echo "Jamfile does not exist in buffers"

Repository: cppalliance/capy

Length of output: 93


🏁 Script executed:

#!/bin/bash
# Check the diff of the commit that may have removed it
git show --stat a915f4d | grep -i jamfile

Repository: cppalliance/capy

Length of output: 242


🏁 Script executed:

#!/bin/bash
# Check if buffers tests include ASIO headers
rg 'boost/asio|boost.asio' test/unit/buffers/ 2>/dev/null | head -10

Repository: cppalliance/capy

Length of output: 119


🏁 Script executed:

#!/bin/bash
# Check the asio.cpp test specifically
cat test/unit/buffers/asio.cpp

Repository: cppalliance/capy

Length of output: 5304


Add ASIO library dependency for buffers tests.

The consolidation removed the <library>/boost/asio//boost_asio/<warnings-as-errors>off and Windows define (<define>_WIN32_WINNT=0x0601) requirements that were in test/unit/buffers/Jamfile. The asio.cpp test depends on ASIO headers and won't build without this library dependency. Consider adding a separate glob-tree-ex loop for buffers tests (similar to the file*.cpp loop on lines 34-39) with the required ASIO library, or add ASIO to the top-level project requirements.

🤖 Prompt for AI Agents
In `@test/unit/Jamfile` around lines 29 - 32, The buffers tests lost the ASIO
dependency and required Windows define when the Jamfiles were consolidated;
update the Jamfile by adding a dedicated glob-tree-ex loop for buffers sources
(e.g., match asio.cpp) that invokes run $(f) with the ASIO library stanza
(<library>/boost/asio//boost_asio/<warnings-as-errors>off) and the define
_WIN32_WINNT=0x0601, or alternatively add that ASIO library and define to the
top-level project requirements so the asio.cpp test will find ASIO headers and
build successfully.

Expand All @@ -40,5 +39,3 @@ for local f in [ glob-tree-ex . : file*.cpp ]
<warnings>off <undefined-sanitizer>norecover:<link>static
;
}

build-project buffers ;
34 changes: 0 additions & 34 deletions test/unit/buffers/Jamfile

This file was deleted.

File renamed without changes.
Loading