diff --git a/bench/bench.cpp b/bench/bench.cpp index 899a1f2..1215f1e 100644 --- a/bench/bench.cpp +++ b/bench/bench.cpp @@ -115,9 +115,9 @@ struct foreign_awaitable { } - // Affine awaitable protocol + // IoAwaitable protocol template - any_coro await_suspend(any_coro h, D const&) const + any_coro await_suspend(any_coro h, D const&, std::stop_token) const { return h; } diff --git a/include/boost/capy.hpp b/include/boost/capy.hpp index 49775de..33138c9 100644 --- a/include/boost/capy.hpp +++ b/include/boost/capy.hpp @@ -10,6 +10,7 @@ #ifndef BOOST_CAPY_HPP #define BOOST_CAPY_HPP +#include #include #include #include diff --git a/include/boost/capy/ex/any_executor.hpp b/include/boost/capy/ex/any_executor.hpp new file mode 100644 index 0000000..65c397f --- /dev/null +++ b/include/boost/capy/ex/any_executor.hpp @@ -0,0 +1,293 @@ +// +// Copyright (c) 2025 Vinnie Falco (vinnie dot falco at gmail dot com) +// +// Distributed under the Boost Software License, Version 1.0. (See accompanying +// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) +// +// Official repository: https://github.com/cppalliance/capy +// + +#ifndef BOOST_CAPY_ANY_EXECUTOR_HPP +#define BOOST_CAPY_ANY_EXECUTOR_HPP + +#include +#include + +#include +#include +#include +#include +#include + +namespace boost { +namespace capy { + +class execution_context; + +/** A type-erased wrapper for executor objects. + + This class provides type erasure for any executor type, enabling + runtime polymorphism with automatic memory management via shared + ownership. It stores a shared pointer to a polymorphic wrapper, + allowing executors of different types to be stored uniformly + while satisfying the full `Executor` concept. + + @par Value Semantics + + This class has value semantics with shared ownership. Copy and + move operations are cheap, simply copying the internal shared + pointer. Multiple `any_executor` instances may share the same + underlying executor. Move operations do not invalidate the + source; there is no moved-from state. + + @par Default State + + A default-constructed `any_executor` holds no executor. Calling + executor operations on a default-constructed instance results + in undefined behavior. Use `operator bool()` to check validity. + + @par Thread Safety + + The `any_executor` itself is thread-safe for concurrent reads. + Concurrent modification requires external synchronization. + Executor operations are safe to call concurrently if the + underlying executor supports it. + + @par Executor Concept + + This class satisfies the `Executor` concept, making it usable + anywhere a concrete executor is expected. + + @see any_executor_ref, Executor +*/ +class any_executor +{ + struct impl_base; + + std::shared_ptr p_; + + struct impl_base + { + virtual ~impl_base() = default; + virtual execution_context& context() const noexcept = 0; + virtual void on_work_started() const noexcept = 0; + virtual void on_work_finished() const noexcept = 0; + virtual std::coroutine_handle<> dispatch(std::coroutine_handle<>) const = 0; + virtual void post(std::coroutine_handle<>) const = 0; + virtual bool equals(impl_base const*) const noexcept = 0; + virtual std::type_info const& target_type() const noexcept = 0; + }; + + template + struct impl final : impl_base + { + Ex ex_; + + template + explicit impl(Ex1&& ex) + : ex_(std::forward(ex)) + { + } + + execution_context& context() const noexcept override + { + return const_cast(ex_).context(); + } + + void on_work_started() const noexcept override + { + ex_.on_work_started(); + } + + void on_work_finished() const noexcept override + { + ex_.on_work_finished(); + } + + std::coroutine_handle<> dispatch(std::coroutine_handle<> h) const override + { + return ex_.dispatch(h); + } + + void post(std::coroutine_handle<> h) const override + { + ex_.post(h); + } + + bool equals(impl_base const* other) const noexcept override + { + if(target_type() != other->target_type()) + return false; + return ex_ == static_cast(other)->ex_; + } + + std::type_info const& target_type() const noexcept override + { + return typeid(Ex); + } + }; + +public: + /** Default constructor. + + Constructs an empty `any_executor`. Calling any executor + operations on a default-constructed instance results in + undefined behavior. + + @par Postconditions + @li `!*this` + */ + any_executor() = default; + + /** Copy constructor. + + Creates a new `any_executor` sharing ownership of the + underlying executor with `other`. + + @par Postconditions + @li `*this == other` + */ + any_executor(any_executor const&) = default; + + /** Copy assignment operator. + + Shares ownership of the underlying executor with `other`. + + @par Postconditions + @li `*this == other` + */ + any_executor& operator=(any_executor const&) = default; + + /** Constructs from any executor type. + + Allocates storage for a copy of the given executor and + stores it internally. The executor must satisfy the + `Executor` concept. + + @param ex The executor to wrap. A copy is stored internally. + + @par Postconditions + @li `*this` is valid + */ + template + requires ( + !std::same_as, any_executor> && + std::copy_constructible>) + any_executor(Ex&& ex) + : p_(std::make_shared>>(std::forward(ex))) + { + } + + /** Returns true if this instance holds a valid executor. + + @return `true` if constructed with an executor, `false` if + default-constructed. + */ + explicit operator bool() const noexcept + { + return p_ != nullptr; + } + + /** Returns a reference to the associated execution context. + + @return A reference to the execution context. + + @pre This instance holds a valid executor. + */ + execution_context& context() const noexcept + { + return p_->context(); + } + + /** Informs the executor that work is beginning. + + Must be paired with a subsequent call to `on_work_finished()`. + + @pre This instance holds a valid executor. + */ + void on_work_started() const noexcept + { + p_->on_work_started(); + } + + /** Informs the executor that work has completed. + + @pre A preceding call to `on_work_started()` was made. + @pre This instance holds a valid executor. + */ + void on_work_finished() const noexcept + { + p_->on_work_finished(); + } + + /** Dispatches a coroutine handle through the wrapped executor. + + Invokes the executor's `dispatch()` operation with the given + coroutine handle, returning a handle suitable for symmetric + transfer. + + @param h The coroutine handle to dispatch for resumption. + + @return A coroutine handle that the caller may use for symmetric + transfer, or `std::noop_coroutine()` if the executor + posted the work for later execution. + + @pre This instance holds a valid executor. + */ + any_coro dispatch(any_coro h) const + { + return p_->dispatch(h); + } + + /** Posts a coroutine handle to the wrapped executor. + + Posts the coroutine handle to the executor for later execution + and returns. The caller should transfer to `std::noop_coroutine()` + after calling this. + + @param h The coroutine handle to post for resumption. + + @pre This instance holds a valid executor. + */ + void post(any_coro h) const + { + p_->post(h); + } + + /** Compares two executor wrappers for equality. + + Two `any_executor` instances are equal if they both hold + executors of the same type that compare equal, or if both + are empty. + + @param other The executor to compare against. + + @return `true` if both wrap equal executors of the same type, + or both are empty. + */ + bool operator==(any_executor const& other) const noexcept + { + if(!p_ && !other.p_) + return true; + if(!p_ || !other.p_) + return false; + return p_->equals(other.p_.get()); + } + + /** Returns the type_info of the wrapped executor. + + @return The `std::type_info` of the stored executor type, + or `typeid(void)` if empty. + */ + std::type_info const& target_type() const noexcept + { + if(!p_) + return typeid(void); + return p_->target_type(); + } +}; + +} // capy +} // boost + +#endif diff --git a/include/boost/capy/ex/run_async.hpp b/include/boost/capy/ex/run_async.hpp index b84fdc5..3481fb9 100644 --- a/include/boost/capy/ex/run_async.hpp +++ b/include/boost/capy/ex/run_async.hpp @@ -18,7 +18,6 @@ #include #include #include -#include #include #include #include @@ -184,20 +183,29 @@ struct get_promise_awaiter order) and serves as the task's continuation. When the task final_suspends, control returns to the trampoline which then invokes the appropriate handler. + @tparam Ex The executor type. @tparam Handlers The handler type (default_handler or handler_pair). */ -template +template struct trampoline { - using invoke_fn = void(*)(void*, std::optional&); + using invoke_fn = void(*)(void*, Handlers&); struct promise_type { + Ex ex_; + Handlers handlers_; invoke_fn invoke_ = nullptr; void* task_promise_ = nullptr; - std::optional handlers_; std::coroutine_handle<> task_h_; + // Constructor receives coroutine parameters by lvalue reference + promise_type(Ex ex, Handlers h) + : ex_(std::move(ex)) + , handlers_(std::move(h)) + { + } + trampoline get_return_object() noexcept { return trampoline{ @@ -229,24 +237,27 @@ struct trampoline /// Type-erased invoke function instantiated per task. template - static void invoke_impl(void* p, std::optional& h) + static void invoke_impl(void* p, Handlers& h) { auto& promise = *static_cast::promise_type*>(p); if(promise.ep_) - (*h)(promise.ep_); + h(promise.ep_); else if constexpr(std::is_void_v) - (*h)(); + h(); else - (*h)(std::move(*promise.result_)); + h(std::move(*promise.result_)); } }; /// Coroutine body for trampoline - invokes handlers then destroys task. -template -trampoline -make_trampoline() +template +trampoline +make_trampoline(Ex ex, Handlers h) { - auto& p = co_await get_promise_awaiter::promise_type>{}; + // Parameters are passed to promise_type constructor by coroutine machinery + (void)ex; + (void)h; + auto& p = co_await get_promise_awaiter::promise_type>{}; // Invoke the type-erased handler p.invoke_(p.task_promise_, p.handlers_); @@ -294,8 +305,7 @@ make_trampoline() template class [[nodiscard]] run_async_wrapper { - detail::trampoline tr_; - Ex ex_; + detail::trampoline tr_; std::stop_token st_; public: @@ -304,18 +314,16 @@ class [[nodiscard]] run_async_wrapper Ex ex, std::stop_token st, Handlers h) - : tr_(detail::make_trampoline()) - , ex_(std::move(ex)) + : tr_(detail::make_trampoline( + std::move(ex), std::move(h))) , st_(std::move(st)) { - // Store handlers in the trampoline's promise - tr_.h_.promise().handlers_.emplace(std::move(h)); } // Non-copyable, non-movable (must be used immediately) run_async_wrapper(run_async_wrapper const&) = delete; - run_async_wrapper& operator=(run_async_wrapper const&) = delete; run_async_wrapper(run_async_wrapper&&) = delete; + run_async_wrapper& operator=(run_async_wrapper const&) = delete; run_async_wrapper& operator=(run_async_wrapper&&) = delete; /** Launch the task for execution. @@ -336,20 +344,21 @@ class [[nodiscard]] run_async_wrapper auto& p = tr_.h_.promise(); // Inject T-specific invoke function - p.invoke_ = detail::trampoline::template invoke_impl; + p.invoke_ = detail::trampoline::template invoke_impl; p.task_promise_ = &task_h.promise(); p.task_h_ = task_h; // Setup task's continuation to return to trampoline + // Executor lives in trampoline's promise, so reference is valid for task's lifetime task_h.promise().continuation_ = tr_.h_; - task_h.promise().caller_ex_ = ex_; - task_h.promise().ex_ = ex_; + task_h.promise().caller_ex_ = p.ex_; + task_h.promise().ex_ = p.ex_; task_h.promise().set_stop_token(st_); // Resume task through executor // The executor returns a handle for symmetric transfer; // from non-coroutine code we must explicitly resume it - ex_.dispatch(task_h)(); + p.ex_.dispatch(task_h).resume(); } }; diff --git a/include/boost/capy/task.hpp b/include/boost/capy/task.hpp index ce54eff..9b49489 100644 --- a/include/boost/capy/task.hpp +++ b/include/boost/capy/task.hpp @@ -196,8 +196,11 @@ struct [[nodiscard]] BOOST_CAPY_CORO_AWAIT_ELIDABLE } else { + static_assert("legacy tasks not supported"); + #if 0 // Trampoline fallback for legacy awaitables return make_affine(std::forward(a), ex_); + #endif } } }; diff --git a/test/unit/ex/any_executor.cpp b/test/unit/ex/any_executor.cpp new file mode 100644 index 0000000..9efb3d5 --- /dev/null +++ b/test/unit/ex/any_executor.cpp @@ -0,0 +1,356 @@ +// +// Copyright (c) 2025 Vinnie Falco (vinnie dot falco at gmail dot com) +// +// Distributed under the Boost Software License, Version 1.0. (See accompanying +// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) +// +// Official repository: https://github.com/cppalliance/capy +// + +// Test that header file is self-contained. +#include + +#include +#include + +#include "test_suite.hpp" + +#include +#include +#include + +namespace boost { +namespace capy { + +namespace { + +// Verify Executor concept at compile time +static_assert(Executor, + "any_executor must satisfy Executor concept"); + +// Helper to wait for a condition with timeout +template +bool wait_for(Pred pred, std::chrono::milliseconds timeout = std::chrono::milliseconds(5000)) +{ + auto start = std::chrono::steady_clock::now(); + while(!pred()) + { + if(std::chrono::steady_clock::now() - start > timeout) + return false; + std::this_thread::sleep_for(std::chrono::milliseconds(1)); + } + return true; +} + +// Simple test coroutine that increments a counter +struct counter_coro +{ + struct promise_type + { + std::atomic* counter; + + counter_coro + get_return_object() noexcept + { + return counter_coro{std::coroutine_handle::from_promise(*this)}; + } + + std::suspend_always + initial_suspend() noexcept + { + return {}; + } + + std::suspend_never + final_suspend() noexcept + { + return {}; + } + + void + return_void() noexcept + { + } + + void + unhandled_exception() + { + std::terminate(); + } + }; + + std::coroutine_handle h_; + + ~counter_coro() + { + if(h_) + h_.destroy(); + } + + counter_coro(counter_coro&& other) noexcept + : h_(other.h_) + { + other.h_ = nullptr; + } + + counter_coro& operator=(counter_coro&& other) noexcept + { + if(h_) + h_.destroy(); + h_ = other.h_; + other.h_ = nullptr; + return *this; + } + + std::coroutine_handle + handle() const noexcept + { + return h_; + } + + void + release() noexcept + { + h_ = nullptr; + } + +private: + explicit counter_coro(std::coroutine_handle h) + : h_(h) + { + } +}; + +// Creates a coroutine that increments counter +inline counter_coro +make_counter_coro(std::atomic& counter) +{ + return [](std::atomic* counter) -> counter_coro { + ++(*counter); + co_return; + }(&counter); +} + +} // namespace + +struct any_executor_test +{ + void + testConstruct() + { + // Default construct + { + any_executor ex; + BOOST_TEST(!ex); + } + + // Construct from executor + { + thread_pool pool(1); + auto executor = pool.get_executor(); + any_executor ex(executor); + BOOST_TEST(static_cast(ex)); + } + } + + void + testCopy() + { + thread_pool pool(1); + auto executor = pool.get_executor(); + any_executor ex1(executor); + + // Copy construction + auto ex2 = ex1; + BOOST_TEST(ex1 == ex2); + + // Copy assignment + any_executor ex3; + ex3 = ex1; + BOOST_TEST(ex1 == ex3); + } + + void + testMove() + { + thread_pool pool(1); + auto executor = pool.get_executor(); + any_executor ex1(executor); + + // Move construction - source should remain valid + any_executor ex2(std::move(ex1)); + BOOST_TEST(static_cast(ex2)); + BOOST_TEST(static_cast(ex1)); // No moved-from state + BOOST_TEST(ex1 == ex2); + + // Move assignment - source should remain valid + any_executor ex3; + ex3 = std::move(ex2); + BOOST_TEST(static_cast(ex3)); + BOOST_TEST(static_cast(ex2)); // No moved-from state + BOOST_TEST(ex2 == ex3); + } + + void + testEquality() + { + thread_pool pool1(1); + thread_pool pool2(1); + auto executor1 = pool1.get_executor(); + auto executor2 = pool2.get_executor(); + + any_executor ex1(executor1); + any_executor ex2(executor1); // Same underlying executor + any_executor ex3(executor2); // Different underlying executor + any_executor ex4; // Empty + + BOOST_TEST(ex1 == ex2); + BOOST_TEST(!(ex1 == ex3)); + + // Empty comparisons + any_executor ex5; + BOOST_TEST(ex4 == ex5); // Both empty + BOOST_TEST(!(ex1 == ex4)); // Non-empty vs empty + } + + void + testTargetType() + { + // Empty executor + { + any_executor ex; + BOOST_TEST(ex.target_type() == typeid(void)); + } + + // With executor + { + thread_pool pool(1); + any_executor ex(pool.get_executor()); + BOOST_TEST(ex.target_type() == typeid(thread_pool::executor_type)); + } + } + + void + testContext() + { + thread_pool pool(1); + auto executor = pool.get_executor(); + any_executor ex(executor); + + // context() should return the same reference as the underlying executor + BOOST_TEST(&ex.context() == &executor.context()); + } + + void + testDispatch() + { + thread_pool pool(1); + auto executor = pool.get_executor(); + any_executor ex(executor); + + std::atomic counter{0}; + auto coro = make_counter_coro(counter); + ex.dispatch(coro.handle()); + coro.release(); + + BOOST_TEST(wait_for([&]{ return counter.load() >= 1; })); + BOOST_TEST_EQ(counter.load(), 1); + } + + void + testPost() + { + thread_pool pool(1); + auto executor = pool.get_executor(); + any_executor ex(executor); + + std::atomic counter{0}; + auto coro = make_counter_coro(counter); + ex.post(coro.handle()); + coro.release(); + + BOOST_TEST(wait_for([&]{ return counter.load() >= 1; })); + BOOST_TEST_EQ(counter.load(), 1); + } + + void + testMultiplePost() + { + thread_pool pool(2); + auto executor = pool.get_executor(); + any_executor ex(executor); + + std::atomic counter{0}; + constexpr int N = 10; + + for(int i = 0; i < N; ++i) + { + auto coro = make_counter_coro(counter); + ex.post(coro.handle()); + coro.release(); + } + + BOOST_TEST(wait_for([&]{ return counter.load() >= N; })); + BOOST_TEST_EQ(counter.load(), N); + } + + void + testSharedOwnership() + { + thread_pool pool(1); + auto executor = pool.get_executor(); + + std::atomic counter{0}; + + // Create any_executor and make copies + any_executor ex1(executor); + any_executor ex2 = ex1; + any_executor ex3 = ex1; + + // All should be equal + BOOST_TEST(ex1 == ex2); + BOOST_TEST(ex2 == ex3); + + // Post through different copies + { + auto coro = make_counter_coro(counter); + ex1.post(coro.handle()); + coro.release(); + } + { + auto coro = make_counter_coro(counter); + ex2.post(coro.handle()); + coro.release(); + } + { + auto coro = make_counter_coro(counter); + ex3.post(coro.handle()); + coro.release(); + } + + BOOST_TEST(wait_for([&]{ return counter.load() >= 3; })); + BOOST_TEST_EQ(counter.load(), 3); + } + + void + run() + { + testConstruct(); + testCopy(); + testMove(); + testEquality(); + testTargetType(); + testContext(); + testDispatch(); + testPost(); + testMultiplePost(); + testSharedOwnership(); + } +}; + +TEST_SUITE( + any_executor_test, + "boost.capy.any_executor"); + +} // capy +} // boost