From 5054aa5fc40ba0620699e320b6de5080bb343b0d Mon Sep 17 00:00:00 2001 From: "coderabbitai[bot]" <136622811+coderabbitai[bot]@users.noreply.github.com> Date: Thu, 22 Jan 2026 16:26:00 +0000 Subject: [PATCH] =?UTF-8?q?=F0=9F=93=9D=20Add=20docstrings=20to=20`feature?= =?UTF-8?q?/when-any`?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Docstrings generation was requested by @mvandeberg. * https://github.com/cppalliance/capy/pull/101#issuecomment-3785323460 The following files were modified: * `include/boost/capy/when_all.hpp` * `include/boost/capy/when_any.hpp` * `test/unit/when_any.cpp` --- include/boost/capy/when_all.hpp | 24 +- include/boost/capy/when_any.hpp | 1841 +++++++++++++++++++++++ test/unit/when_any.cpp | 2422 +++++++++++++++++++++++++++++++ 3 files changed, 4284 insertions(+), 3 deletions(-) create mode 100644 include/boost/capy/when_any.hpp create mode 100644 test/unit/when_any.cpp diff --git a/include/boost/capy/when_all.hpp b/include/boost/capy/when_all.hpp index 46dccfeb..cbb374a3 100644 --- a/include/boost/capy/when_all.hpp +++ b/include/boost/capy/when_all.hpp @@ -230,7 +230,7 @@ struct when_all_runner auto await_transform(Awaitable&& a) { using A = std::decay_t; - if constexpr (IoAwaitable) + if constexpr (IoAwaitable) { return transform_awaiter{ std::forward(a), this}; @@ -312,6 +312,20 @@ class when_all_launcher } template + /** + * @brief Launches all child runners, wires parent stop propagation, and defers parent resumption to the shared state. + * + * Stores the parent continuation and executor in the shared state, connects the optional parent stop_token + * to the state's stop_source (requesting stop immediately if the parent already requested stop), then + * launches a runner for each task with the state's stop token. Parent resumption is not performed here; + * the shared state will resume the parent when all children have completed. + * + * @tparam Ex Type of the caller executor. + * @param continuation The parent coroutine handle to be resumed once all tasks complete. + * @param caller_ex Executor on which child runners should be scheduled. + * @param parent_token Optional parent stop_token whose stop requests are forwarded to child runners. + * @return coro A no-op coroutine; actual resumption of the parent continuation is performed by the shared state. + */ coro await_suspend(coro continuation, Ex const& caller_ex, std::stop_token parent_token = {}) { state_->continuation_ = continuation; @@ -328,7 +342,11 @@ class when_all_launcher state_->stop_source_.request_stop(); } - // Launch all tasks concurrently + // CRITICAL: If the last task finishes synchronously then the parent + // coroutine resumes, destroying its frame, and destroying this object + // prior to the completion of await_suspend. Therefore, await_suspend + // must ensure `this` cannot be referenced after calling `launch_one` + // for the last time. auto token = state_->stop_source_.get_token(); [&](std::index_sequence) { (..., launch_one(caller_ex, token)); @@ -458,4 +476,4 @@ using when_all_result_type = detail::when_all_result_t; } // namespace capy } // namespace boost -#endif +#endif \ No newline at end of file diff --git a/include/boost/capy/when_any.hpp b/include/boost/capy/when_any.hpp new file mode 100644 index 00000000..c20238a4 --- /dev/null +++ b/include/boost/capy/when_any.hpp @@ -0,0 +1,1841 @@ +// +// Copyright (c) 2026 Michael Vandeberg +// +// Distributed under the Boost Software License, Version 1.0. (See accompanying +// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) +// +// Official repository: https://github.com/cppalliance/capy +// + +#ifndef BOOST_CAPY_WHEN_ANY_HPP +#define BOOST_CAPY_WHEN_ANY_HPP + +#include +#include +#include +#include +#include +#include +#include + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +/* + * when_any - Race multiple tasks, return first completion + * ======================================================== + * + * OVERVIEW: + * --------- + * when_any launches N tasks concurrently and completes when the FIRST task + * finishes (success or failure). It then requests stop for all siblings and + * waits for them to acknowledge before returning. + * + * ARCHITECTURE: + * ------------- + * The design mirrors when_all but with inverted completion semantics: + * + * when_all: complete when remaining_count reaches 0 (all done) + * when_any: complete when has_winner becomes true (first done) + * BUT still wait for remaining_count to reach 0 for cleanup + * + * Key components: + * - when_any_state: Shared state tracking winner and completion + * - when_any_runner: Wrapper coroutine for each child task + * - when_any_launcher: Awaitable that starts all runners concurrently + * + * CRITICAL INVARIANTS: + * -------------------- + * 1. Exactly one task becomes the winner (via atomic compare_exchange) + * 2. All tasks must complete before parent resumes (cleanup safety) + * 3. Stop is requested immediately when winner is determined + * 4. Only the winner's result/exception is stored + * + * TYPE DEDUPLICATION: + * ------------------- + * std::variant requires unique alternative types. Since when_any can race + * tasks with identical return types (e.g., three task), we must + * deduplicate types before constructing the variant. + * + * Example: when_any(task, task, task) + * - Raw types after void->monostate: int, string, int + * - Deduplicated variant: std::variant + * - Return: pair> + * + * The winner_index tells you which task won (0, 1, or 2), while the variant + * holds the result. Use the index to determine how to interpret the variant. + * + * VOID HANDLING: + * -------------- + * void tasks contribute std::monostate to the variant (then deduplicated). + * All-void tasks result in: pair> + * + * MEMORY MODEL: + * ------------- + * - try_win() uses acq_rel to synchronize winner selection + * - signal_completion() uses acq_rel for remaining_count + * - Winner data (result/exception) is written before try_win() returns true, + * and read after all tasks complete, so no additional synchronization needed + * + * EXCEPTION SEMANTICS: + * -------------------- + * Unlike when_all (which captures first exception, discards others), when_any + * treats exceptions as valid completions. If the winning task threw, that + * exception is rethrown. Exceptions from non-winners are silently discarded. + */ + +namespace boost { +namespace capy { + +namespace detail { + +/** Convert void to monostate for variant storage. + + std::variant is ill-formed, so void tasks contribute + std::monostate to the result variant instead. Non-void types + pass through unchanged. + + @tparam T The type to potentially convert (void becomes monostate). +*/ +template +using void_to_monostate_t = std::conditional_t, std::monostate, T>; + +/** Type deduplication for variant construction. + + std::variant requires unique alternative types. These metafunctions + deduplicate a type list while preserving order of first occurrence. + + @par Algorithm + Fold left over the type list, appending each type to the accumulator + only if not already present. O(N^2) in number of types but N is + typically small (number of when_any arguments). +*/ +/** Primary template for appending a type to a variant if not already present. + + @tparam Variant The accumulated variant type. + @tparam T The type to potentially append. +*/ +template +struct variant_append_if_unique; + +/** Specialization that checks for type uniqueness and appends if needed. + + @tparam Vs Types already in the variant. + @tparam T The type to potentially append. +*/ +template +struct variant_append_if_unique, T> +{ + /** Result type: original variant if T is duplicate, extended variant otherwise. */ + using type = std::conditional_t< + (std::is_same_v || ...), + std::variant, + std::variant>; +}; + +/** Primary template for type list deduplication. + + @tparam Accumulated The variant accumulating unique types. + @tparam Remaining Types still to be processed. +*/ +template +struct deduplicate_impl; + +/** Base case: no more types to process. + + @tparam Accumulated The final deduplicated variant type. +*/ +template +struct deduplicate_impl +{ + /** The final deduplicated variant type. */ + using type = Accumulated; +}; + +/** Recursive case: add T if unique, then process rest. + + @tparam Accumulated The variant accumulated so far. + @tparam T The current type to potentially add. + @tparam Rest Remaining types to process. +*/ +template +struct deduplicate_impl +{ + /** Intermediate type after potentially appending T. */ + using next = typename variant_append_if_unique::type; + + /** Final result after processing all remaining types. */ + using type = typename deduplicate_impl::type; +}; + +/** Deduplicated variant from a list of types. + + Constructs a std::variant containing unique types from the input list. + Void types are converted to std::monostate before deduplication. + The first type T0 seeds the accumulator, ensuring the variant is well-formed. + + @tparam T0 First result type (required, seeds the deduplication). + @tparam Ts Remaining result types (void is converted to monostate). +*/ +template +using unique_variant_t = typename deduplicate_impl< + std::variant>, + void_to_monostate_t...>::type; + +/** Result type for when_any: (winner_index, deduplicated_variant). + + The first element is the zero-based index of the winning task in the + original argument order. The second element is a variant holding the + winner's result by type. When multiple tasks share the same return type, + use the index to determine which task actually won. + + @tparam T0 First task's result type. + @tparam Ts Remaining tasks' result types. +*/ +template +using when_any_result_t = std::pair>; + +/** Shared state for when_any operation. + + Coordinates winner selection, result storage, and completion tracking + for all child tasks in a when_any operation. + + @par Lifetime + Allocated on the parent coroutine's frame, outlives all runners. + + @par Thread Safety + Atomic operations protect winner selection and completion count. + Result storage is written only by the winner before any concurrent access. + + @tparam T0 First task's result type. + @tparam Ts Remaining tasks' result types. +*/ +template +struct when_any_state +{ + /** Total number of tasks being raced. */ + static constexpr std::size_t task_count = 1 + sizeof...(Ts); + + /** Deduplicated variant type for storing the winner's result. */ + using variant_type = unique_variant_t; + + /** Counter for tasks still running. + + Must wait for ALL tasks to finish before parent resumes; this ensures + runner coroutine frames are valid until their final_suspend completes. + */ + std::atomic remaining_count_; + + /** Flag indicating whether a winner has been determined. + + Winner selection: exactly one task wins via atomic CAS on has_winner_. + winner_index_ is written only by the winner, read after all complete. + */ + std::atomic has_winner_{false}; + + /** Index of the winning task in the original argument list. */ + std::size_t winner_index_{0}; + + /** Storage for the winner's result value. + + Result storage: deduplicated variant. Stored by type, not task index, + because multiple tasks may share the same return type. + */ + variant_type result_; + + /** Exception thrown by the winner, if any. + + Non-null if winner threw (rethrown to caller after all tasks complete). + */ + std::exception_ptr winner_exception_; + + /** Handles to runner coroutines for cleanup. + + Runner coroutine handles; destroyed in destructor after all complete. + */ + std::array runner_handles_{}; + + /** Stop source for cancelling sibling tasks. + + Owned stop_source: request_stop() called when winner determined. + */ + std::stop_source stop_source_; + + /** Callback functor that forwards stop requests. + + Forwards parent's stop requests to our stop_source, enabling + cancellation to propagate from caller through when_any to children. + */ + struct stop_callback_fn + { + /** Pointer to the stop source to signal. */ + std::stop_source* source_; + + /** + * @brief Request cancellation on the associated stop source. + * + * Invokes `request_stop()` on the underlying `std::stop_source`. + */ + void operator()() const noexcept { source_->request_stop(); } + }; + + /** Type alias for the stop callback registration. */ + using stop_callback_t = std::stop_callback; + + /** Optional callback linking parent's stop token to our stop source. */ + std::optional parent_stop_callback_; + + /** Parent coroutine handle to resume when all children complete. */ + coro continuation_; + + /** Executor reference for dispatching the parent resumption. */ + executor_ref caller_ex_; + + /** Construct state for racing task_count tasks. + + Initializes remaining_count_ to task_count so all tasks must complete + before the parent coroutine resumes. + */ + when_any_state() + : remaining_count_(task_count) + { + } + + /** Destroy state and clean up runner coroutine handles. + + All runners must have completed before destruction (guaranteed by + waiting for remaining_count_ to reach zero). + */ + ~when_any_state() + { + for(auto h : runner_handles_) + if(h) + h.destroy(); + } + + /** Attempt to become the winner. + + Atomically claims winner status. Exactly one task succeeds; all others + see false. The winner must store its result before returning. + + @param index The task's index in the original argument list. + @return true if this task is now the winner, false if another won first. + */ + bool try_win(std::size_t index) noexcept + { + bool expected = false; + if(has_winner_.compare_exchange_strong( + expected, true, std::memory_order_acq_rel)) + { + winner_index_ = index; + // Signal siblings to exit early if they support cancellation + stop_source_.request_stop(); + return true; + } + return false; + } + + /** Store the winner's result. + + @pre Only called by the winner (try_win returned true). + @note Uses type-based emplacement because the variant is deduplicated; + task index may differ from variant alternative index. + */ + template + /** + * @brief Store the winner's result into the shared deduplicated variant. + * + * Moves the provided value into the state's result variant under the alternative + * corresponding to its type. + * + * @param value The winner's result to store; will be moved into the variant. + */ + void set_winner_result(T value) + noexcept(std::is_nothrow_move_constructible_v) + { + result_.template emplace(std::move(value)); + } + + /** + * @brief Store the winner's completion value for a void-returning task as std::monostate. + * + * @pre Called only by the runner that won the race. + */ + void set_winner_void() noexcept + { + result_.template emplace(std::monostate{}); + } + + /** + * Stores the winning task's exception for later inspection or rethrow. + * + * @param ep Exception pointer captured from the winning task. + * @pre This is called only by the winner (i.e., after try_win returned true). + */ + void set_winner_exception(std::exception_ptr ep) noexcept + { + winner_exception_ = ep; + } + + /** + * @brief Signals that a runner has finished and, if it was the last, resumes the parent coroutine. + * + * Called by each runner's final_suspend. Decrements the remaining count and, when the + * last runner completes, dispatches the stored continuation via the caller executor so + * the parent resumes after all child frames are destroyed. + * + * @return Coroutine to resume: the parent's continuation if this call observed the final completion, otherwise a noop coroutine. + */ + coro signal_completion() noexcept + { + auto remaining = remaining_count_.fetch_sub(1, std::memory_order_acq_rel); + if(remaining == 1) + return caller_ex_.dispatch(continuation_); + return std::noop_coroutine(); + } +}; + +/** Wrapper coroutine that runs a single child task for when_any. + + Each child task is wrapped in a runner that: + 1. Propagates executor and stop_token to the child + 2. Attempts to claim winner status on completion + 3. Stores result only if this runner won + 4. Signals completion regardless of win/loss (for cleanup) + + @tparam T The result type of the wrapped task. + @tparam Ts All task result types (for when_any_state compatibility). +*/ +template +struct when_any_runner +{ + /** Promise type for the runner coroutine. + + Manages executor propagation, stop token forwarding, and completion + signaling for the wrapped child task. + */ + struct promise_type : frame_allocating_base + { + /** Pointer to shared state for winner coordination. */ + when_any_state* state_ = nullptr; + + /** Index of this task in the original argument list. */ + std::size_t index_ = 0; + + /** Executor reference inherited from the parent coroutine. */ + executor_ref ex_; + + /** Stop token for cooperative cancellation. */ + std::stop_token stop_token_; + + /** + * @brief Construct a runner object that owns the coroutine associated with this promise. + * + * @return when_any_runner A runner wrapping the coroutine handle obtained from this promise. + */ + when_any_runner get_return_object() + { + return when_any_runner(std::coroutine_handle::from_promise(*this)); + } + + /** + * @brief Suspend the coroutine immediately upon creation. + * + * @return An awaiter that always suspends the coroutine. + */ + std::suspend_always initial_suspend() noexcept + { + return {}; + } + + /** + * @brief Final-suspend awaiter that notifies shared state this runner has finished. + * + * The returned awaiter always suspends; on suspend it calls the shared state's + * signal_completion() to decrement the remaining task count and obtain the + * coroutine to resume (the parent continuation) if this was the last runner. + * + * @return The parent coroutine to resume if this was the last task, `std::noop_coroutine()` otherwise. + */ + auto final_suspend() noexcept + { + /** Awaiter that signals task completion and potentially resumes parent. */ + struct awaiter + { + /** Pointer to the promise for accessing shared state. */ + promise_type* p_; + + /** + * @brief Indicates the awaiter is never ready and always suspends. + * + * @return `false` — always suspend the awaiting coroutine. + */ + bool await_ready() const noexcept + { + return false; + } + + /** + * Notify the shared when_any state that this runner has completed and obtain the coroutine to resume. + * + * @return The parent coroutine to resume if this was the last outstanding task; otherwise `std::noop_coroutine()`. + */ + coro await_suspend(coro) noexcept + { + return p_->state_->signal_completion(); + } + + /** + * @brief No-op await_resume for the final_suspend awaiter. + * + * Performs no action when resumed; control simply returns to the awaiting context. + */ + void await_resume() const noexcept + { + } + }; + return awaiter{this}; + } + + /** + * @brief Satisfies the coroutine promise's return requirement when a runner completes successfully. + * + * @details No-op used to indicate normal completion of the runner coroutine. Result capture and + * propagation are performed by the runner/launcher infrastructure; this function itself performs no work. + */ + void return_void() + { + } + + /** + * @brief Handle an exception thrown by the child runner. + * + * Exceptions are treated as valid completions. If this runner's exception + * becomes the winning result it is stored for rethrowing to the caller once + * all runners complete; otherwise the exception is discarded. + */ + void unhandled_exception() + { + if(state_->try_win(index_)) + state_->set_winner_exception(std::current_exception()); + } + + /** Awaiter wrapper that injects executor and stop token into child awaitables. + + @tparam Awaitable The underlying awaitable type being wrapped. + */ + template + struct transform_awaiter + { + /** The wrapped awaitable instance. */ + std::decay_t a_; + + /** Pointer to promise for accessing executor and stop token. */ + promise_type* p_; + + /** + * Indicates whether the underlying awaitable can complete synchronously. + * + * @return `true` if the awaitable can complete synchronously, `false` otherwise. + */ + bool await_ready() + { + return a_.await_ready(); + } + + /** + * @brief Resume the underlying awaitable and obtain its result. + * + * @return The value returned by the wrapped awaitable's `await_resume`. + */ + auto await_resume() + { + return a_.await_resume(); + } + + /** Suspend with executor and stop token injection. + + @tparam Promise The suspending coroutine's promise type. + @param h Handle to the suspending coroutine. + @return Coroutine to resume or void. + */ + template + /** + * @brief Forwards suspension to the wrapped awaitable while injecting the promise's executor and stop token. + * + * @param h Coroutine handle of the awaiting coroutine. + * @return The value returned by the underlying awaitable's `await_suspend` call. + */ + auto await_suspend(std::coroutine_handle h) + { + return a_.await_suspend(h, p_->ex_, p_->stop_token_); + } + }; + + /** Transform awaitables to inject executor and stop token. + + @tparam Awaitable The awaitable type being co_awaited. + @param a The awaitable instance. + @return Transformed awaiter with executor/stop_token injection. + */ + template + /** + * @brief Adapts an awaitable to the runner promise's execution context. + * + * Produces an awaitable that will run with this promise's executor and stop token: + * if the awaited type accepts an executor and stop token, the returned awaitable + * forwards this promise's executor and stop token to the inner operation; + * otherwise the returned awaitable is bound to this promise's executor. + * + * @tparam Awaitable Type of the awaitable being adapted. + * @param a The awaitable to adapt. + * @return An awaitable adapted to execute with the promise's executor and stop token. + */ + auto await_transform(Awaitable&& a) + { + using A = std::decay_t; + if constexpr (IoAwaitable) + { + return transform_awaiter{ + std::forward(a), this}; + } + else + { + return make_affine(std::forward(a), ex_); + } + } + }; + + /** Handle to the underlying coroutine frame. */ + std::coroutine_handle h_; + + /** + * @brief Construct a runner wrapper from an existing coroutine handle. + * + * @param h Coroutine handle for the runner promise; ownership of the handle is stored by the returned wrapper. + */ + explicit when_any_runner(std::coroutine_handle h) + : h_(h) + { + } + + /** Move constructor (Clang 14 workaround). + + Clang 14 (non-Apple) has a coroutine codegen bug requiring explicit + move constructor; other compilers work correctly with deleted move. + */ +#if defined(__clang__) && __clang_major__ == 14 && !defined(__apple_build_version__) + /** + * @brief Move-constructs a runner by transferring ownership of the underlying coroutine handle. + * + * The source runner's handle is set to `nullptr` after the transfer. + * + * @param other Source runner whose coroutine handle will be moved-from and nulled. + */ +when_any_runner(when_any_runner&& other) noexcept : h_(std::exchange(other.h_, nullptr)) {} +#endif + + /** Copy construction is not allowed. */ + when_any_runner(when_any_runner const&) = delete; + + /** Copy assignment is not allowed. */ + when_any_runner& operator=(when_any_runner const&) = delete; + + /** Move construction is deleted (except on Clang 14). */ +#if !defined(__clang__) || __clang_major__ != 14 || defined(__apple_build_version__) + /** + * @brief Deleted move constructor; instances of this type cannot be moved. + * + * Attempts to move a `when_any_runner` are prohibited to preserve unique coroutine handle ownership and invariants. + */ +when_any_runner(when_any_runner&&) = delete; +#endif + + /** Move assignment is not allowed. */ + when_any_runner& operator=(when_any_runner&&) = delete; + + /** Release ownership of the coroutine handle. + + @return The coroutine handle; this object becomes empty. + */ + auto release() noexcept + { + return std::exchange(h_, nullptr); + } +}; + +/** Create a runner coroutine for a single task in when_any. + + Factory function that creates a wrapper coroutine for a child task. + The runner handles executor/stop_token propagation and winner selection. + + @tparam Index Compile-time index of this task in the argument list. + @tparam T The result type of the task being wrapped. + @tparam Ts All task result types (for when_any_state compatibility). + @param inner The task to run (will be moved from). + @param state Shared state for winner coordination. + @return Runner coroutine (must be started via resume()). +*/ +template +/** + * @brief Creates a runner coroutine that awaits a child task and, if it completes first, records its outcome into shared when_any state. + * + * The returned runner will await the provided task `inner`. If the awaited task becomes the winner, the runner stores the winner's value (or a monostate for `void`) or captures and stores the winner's exception in `state`. + * + * @tparam Index Index of this task within the when_any launch set. + * @tparam T Type returned by the `inner` task. + * @tparam Ts Remaining types in the when_any state. + * @param inner The child task to run. + * @param state Pointer to the shared when_any_state coordinating winner selection and result storage. + * @return when_any_runner A coroutine wrapper that runs `inner` and updates `state` on completion. + */ +when_any_runner +make_when_any_runner(task inner, when_any_state* state) +{ + if constexpr (std::is_void_v) + { + co_await std::move(inner); + if(state->try_win(Index)) + state->set_winner_void(); // noexcept + } + else + { + auto result = co_await std::move(inner); + if(state->try_win(Index)) + { + try + { + state->set_winner_result(std::move(result)); + } + catch(...) + { + state->set_winner_exception(std::current_exception()); + } + } + } +} + +/** Awaitable that launches all runner coroutines concurrently. + + Handles the tricky lifetime issue where tasks may complete synchronously + during launch, potentially destroying this awaitable's frame before + all tasks are extracted from the tuple. See await_suspend for details. + + @tparam Ts The result types of the tasks being launched. +*/ +template +class when_any_launcher +{ + /** Pointer to tuple of tasks to launch. */ + std::tuple...>* tasks_; + + /** Pointer to shared state for coordination. */ + when_any_state* state_; + +public: + /** Construct launcher with task tuple and shared state. + + @param tasks Pointer to tuple of tasks (must outlive the await). + @param state Pointer to shared state for winner coordination. + */ + when_any_launcher( + std::tuple...>* tasks, + when_any_state* state) + : tasks_(tasks) + , state_(state) + { + } + + /** + * @brief Determines whether the launcher completes immediately. + * + * The launcher is ready if there are no tasks to launch. + * + * @return `true` if there are zero tasks, `false` otherwise. + */ + bool await_ready() const noexcept + { + return sizeof...(Ts) == 0; + } + + /** Launch all runner coroutines and suspend the parent. + + Sets up stop propagation from parent to children, then launches + each task in a runner coroutine. Returns noop_coroutine because + runners resume the parent via signal_completion(). + + CRITICAL: If the last task finishes synchronously then the parent + coroutine resumes, destroying its frame, and destroying this object + prior to the completion of await_suspend. Therefore, await_suspend + must ensure `this` cannot be referenced after calling `launch_one` + for the last time. + + @tparam Ex The executor type. + @param continuation Handle to the parent coroutine to resume later. + @param caller_ex Executor for dispatching child coroutines. + @param parent_token Stop token from the parent for cancellation propagation. + @return noop_coroutine; parent is resumed by the last completing task. + */ + template + /** + * @brief Prepare and start all child runners, forwarding parent cancellation and saving the parent continuation and executor. + * + * Stores the awaiting coroutine's continuation and caller executor into shared state, links the parent's stop token to the state's stop source (propagating an immediate stop if already requested), launches each child runner with the derived stop token and executor, and returns a noop coroutine handle so the awaiting coroutine remains suspended while children run. + * + * @param continuation The continuation coroutine to resume once all children complete. + * @param caller_ex The executor used to dispatch the parent continuation and to resume child runners. + * @param parent_token Optional stop token from the parent; if provided, its stop requests are forwarded to all children. + * @return coro A coroutine handle equal to `std::noop_coroutine()`. + */ + coro await_suspend(coro continuation, Ex const& caller_ex, std::stop_token parent_token = {}) + { + state_->continuation_ = continuation; + state_->caller_ex_ = caller_ex; + + // Forward parent's stop requests to children + if(parent_token.stop_possible()) + { + state_->parent_stop_callback_.emplace( + parent_token, + typename when_any_state::stop_callback_fn{&state_->stop_source_}); + + if(parent_token.stop_requested()) + state_->stop_source_.request_stop(); + } + + auto token = state_->stop_source_.get_token(); + [&](std::index_sequence) { + (..., launch_one(caller_ex, token)); + }(std::index_sequence_for{}); + + return std::noop_coroutine(); + } + + /** + * @brief Resume the awaiting coroutine once all launched runner coroutines have completed. + * + * This function performs no action and returns no value; results and any winner state are available + * from the shared when_any state object maintained by the launcher. + */ + void await_resume() const noexcept + { + } + +private: + /** Launch a single runner coroutine for task at index I. + + Creates the runner, configures its promise with state and executor, + stores its handle for cleanup, and dispatches it for execution. + + @tparam I Compile-time index of the task in the tuple. + @tparam Ex The executor type. + @param caller_ex Executor for dispatching the runner. + @param token Stop token for cooperative cancellation. + + @pre Ex::dispatch() and coro::resume() must not throw. If they do, + the coroutine handle may leak. + */ + template + /** + * @brief Create, initialize, and start the runner coroutine for task `I`. + * + * Initializes the runner's promise (shared state pointer, runtime index, executor, and stop token), + * stores the coroutine handle into the shared state's runner_handles_ at position `I`, and + * dispatches and resumes the coroutine via the provided executor. + * + * @tparam I Compile-time index of the task to launch. + * @param caller_ex Executor used to dispatch and resume the runner coroutine. + * @param token Stop token that will be forwarded to the runner's promise for cancellation. + */ + void launch_one(Ex const& caller_ex, std::stop_token token) + { + auto runner = make_when_any_runner( + std::move(std::get(*tasks_)), state_); + + auto h = runner.release(); + h.promise().state_ = state_; + h.promise().index_ = I; + h.promise().ex_ = caller_ex; + h.promise().stop_token_ = token; + + coro ch{h}; + state_->runner_handles_[I] = ch; + caller_ex.dispatch(ch).resume(); + } +}; + +} // namespace detail + +/** Wait for the first task to complete. + + Races multiple heterogeneous tasks concurrently and returns when the + first one completes. The result includes the winner's index and a + deduplicated variant containing the result value. + + @par Example + @code + task example() { + auto [index, result] = co_await when_any( + fetch_from_primary(), // task + fetch_from_backup() // task + ); + // index is 0 or 1, result holds the winner's Response + auto response = std::get(result); + } + @endcode + + @tparam T0 First task's result type. + @tparam Ts Remaining tasks' result types. + @param task0 The first task to race. + @param tasks Additional tasks to race concurrently. + @return A task yielding a pair of (winner_index, result_variant). + + @par Key Features + @li All tasks are launched concurrently + @li Returns when first task completes (success or failure) + @li Stop is requested for all siblings + @li Waits for all siblings to complete before returning + @li If winner threw, that exception is rethrown + @li Void tasks contribute std::monostate to the variant +*/ +template +/** + * @brief Races multiple tasks and completes with the first task that finishes. + * + * Launches all provided tasks concurrently and returns the index of the winning task + * together with the winner's result stored in a deduplicated variant type. + * + * @param task0 First task; ownership is moved into the when_any operation. + * @param tasks Additional tasks; each is moved into the when_any operation. + * @returns result_type A pair where the first element is the zero-based index of the winning task and + * the second element is a deduplicated `std::variant` holding the winner's result (void results are represented by `std::monostate`). + * @throws Any exception thrown by the winning task, rethrown after all tasks have completed. + */ +[[nodiscard]] task> +when_any(task task0, task... tasks) +{ + using result_type = detail::when_any_result_t; + + detail::when_any_state state; + std::tuple, task...> task_tuple(std::move(task0), std::move(tasks)...); + + co_await detail::when_any_launcher(&task_tuple, &state); + + if(state.winner_exception_) + std::rethrow_exception(state.winner_exception_); + + co_return result_type{state.winner_index_, std::move(state.result_)}; +} + +/** Alias for when_any result type, useful for declaring callback signatures. + + Provides a convenient public alias for the internal result type. + The result is a pair containing the winner's index and a deduplicated + variant holding the result value. + + @par Example + @code + void on_complete(when_any_result_type result); + @endcode + + @tparam T0 First task's result type. + @tparam Ts Remaining tasks' result types. +*/ +template +using when_any_result_type = detail::when_any_result_t; + +namespace detail { + +/** Shared state for homogeneous when_any (vector overload). + + Simpler than the heterogeneous version: uses std::optional instead + of variant, and std::vector instead of std::array for runner handles. + + @tparam T The common result type of all tasks. +*/ +template +struct when_any_homogeneous_state +{ + /** Counter for tasks still running. + + Completion tracking - must wait for ALL tasks for proper cleanup. + */ + std::atomic remaining_count_; + + /** Total number of tasks being raced. */ + std::size_t task_count_; + + /** Flag indicating whether a winner has been determined. + + Winner tracking - first task to complete claims this. + */ + std::atomic has_winner_{false}; + + /** Index of the winning task in the vector. */ + std::size_t winner_index_{0}; + + /** Storage for the winner's result value. + + Result storage - simple value, no variant needed. + */ + std::optional result_; + + /** Exception thrown by the winner, if any. */ + std::exception_ptr winner_exception_; + + /** Handles to runner coroutines for cleanup. + + Runner handles - destroyed in destructor. + */ + std::vector runner_handles_; + + /** Stop source for cancelling sibling tasks. + + Stop propagation - requested when winner is found. + */ + std::stop_source stop_source_; + + /** Callback functor that forwards stop requests. + + Connects parent's stop_token to our stop_source. + */ + struct stop_callback_fn + { + /** Pointer to the stop source to signal. */ + std::stop_source* source_; + + /** + * @brief Request cancellation on the associated stop source. + * + * Invokes `request_stop()` on the underlying `std::stop_source`. + */ + void operator()() const noexcept { source_->request_stop(); } + }; + + /** Type alias for the stop callback registration. */ + using stop_callback_t = std::stop_callback; + + /** Optional callback linking parent's stop token to our stop source. */ + std::optional parent_stop_callback_; + + /** Parent coroutine handle to resume when all children complete. */ + coro continuation_; + + /** Executor reference for dispatching the parent resumption. */ + executor_ref caller_ex_; + + /** + * @brief Create a shared state for racing a given number of homogeneous tasks. + * + * Initializes the completion counter, records the total task count, + * and allocates storage for per-task runner coroutine handles. + * + * @param count Number of tasks participating in the race. + */ + explicit when_any_homogeneous_state(std::size_t count) + : remaining_count_(count) + , task_count_(count) + , runner_handles_(count) + { + } + + /** + * @brief Destroy the state and release any stored runner coroutine handles. + * + * Iterates over stored coroutine handles in `runner_handles_` and calls `destroy()` on each non-null handle + * to release coroutine resources and avoid leaks. + */ + ~when_any_homogeneous_state() + { + for(auto h : runner_handles_) + if(h) + h.destroy(); + } + + /** + * Attempt to claim winner status for the calling task. + * + * If successful, records the winner index and requests cancellation of sibling tasks. + * + * @param index The task's runtime index to record as the winner. + * @return `true` if this task became the winner, `false` otherwise. + */ + bool try_win(std::size_t index) noexcept + { + bool expected = false; + if(has_winner_.compare_exchange_strong( + expected, true, std::memory_order_acq_rel)) + { + winner_index_ = index; + stop_source_.request_stop(); + return true; + } + return false; + } + + /** + * @brief Store the winning task's result into the state's optional storage. + * + * @pre Called only by the winner (i.e., after `try_win` returned `true`). + * @param value The winning task's result; moved into the state's `std::optional`. + * + * This operation is noexcept when moving into `std::optional` is nothrow. + */ + void set_winner_result(T value) + noexcept(std::is_nothrow_move_assignable_v>) + { + result_ = std::move(value); + } + + /** + * Stores the winning task's exception for later inspection or rethrow. + * + * @param ep Exception pointer captured from the winning task. + * @pre This is called only by the winner (i.e., after try_win returned true). + */ + void set_winner_exception(std::exception_ptr ep) noexcept + { + winner_exception_ = ep; + } + + /** + * @brief Signals that a runner has finished and, if it was the last, resumes the parent coroutine. + * + * Called by each runner's final_suspend. Decrements the remaining count and, when the + * last runner completes, dispatches the stored continuation via the caller executor so + * the parent resumes after all child frames are destroyed. + * + * @return Coroutine to resume: the parent's continuation if this call observed the final completion, otherwise a noop coroutine. + */ + coro signal_completion() noexcept + { + auto remaining = remaining_count_.fetch_sub(1, std::memory_order_acq_rel); + if(remaining == 1) + return caller_ex_.dispatch(continuation_); + return std::noop_coroutine(); + } +}; + +/** Specialization for void tasks (no result storage needed). + + When racing void-returning tasks, there is no result value to store. + Only the winner's index and any exception are tracked. +*/ +template<> +struct when_any_homogeneous_state +{ + /** Counter for tasks still running. */ + std::atomic remaining_count_; + + /** Total number of tasks being raced. */ + std::size_t task_count_; + + /** Flag indicating whether a winner has been determined. */ + std::atomic has_winner_{false}; + + /** Index of the winning task in the vector. */ + std::size_t winner_index_{0}; + + /** Exception thrown by the winner, if any. */ + std::exception_ptr winner_exception_; + + /** Handles to runner coroutines for cleanup. */ + std::vector runner_handles_; + + /** Stop source for cancelling sibling tasks. */ + std::stop_source stop_source_; + + /** Callback functor that forwards stop requests. */ + struct stop_callback_fn + { + /** Pointer to the stop source to signal. */ + std::stop_source* source_; + + /** + * @brief Request cancellation on the associated stop source. + * + * Invokes `request_stop()` on the underlying `std::stop_source`. + */ + void operator()() const noexcept { source_->request_stop(); } + }; + + /** Type alias for the stop callback registration. */ + using stop_callback_t = std::stop_callback; + + /** Optional callback linking parent's stop token to our stop source. */ + std::optional parent_stop_callback_; + + /** Parent coroutine handle to resume when all children complete. */ + coro continuation_; + + /** Executor reference for dispatching the parent resumption. */ + executor_ref caller_ex_; + + /** + * @brief Create a shared state for racing a given number of homogeneous tasks. + * + * Initializes the completion counter, records the total task count, + * and allocates storage for per-task runner coroutine handles. + * + * @param count Number of tasks participating in the race. + */ + explicit when_any_homogeneous_state(std::size_t count) + : remaining_count_(count) + , task_count_(count) + , runner_handles_(count) + { + } + + /** + * @brief Destroy the state and release any stored runner coroutine handles. + * + * Iterates over stored coroutine handles in `runner_handles_` and calls `destroy()` on each non-null handle + * to release coroutine resources and avoid leaks. + */ + ~when_any_homogeneous_state() + { + for(auto h : runner_handles_) + if(h) + h.destroy(); + } + + /** + * Attempt to claim winner status for the calling task. + * + * If successful, records the winner index and requests cancellation of sibling tasks. + * + * @param index The task's runtime index to record as the winner. + * @return `true` if this task became the winner, `false` otherwise. + */ + bool try_win(std::size_t index) noexcept + { + bool expected = false; + if(has_winner_.compare_exchange_strong( + expected, true, std::memory_order_acq_rel)) + { + winner_index_ = index; + stop_source_.request_stop(); + return true; + } + return false; + } + + /** + * Stores the winning task's exception for later inspection or rethrow. + * + * @param ep Exception pointer captured from the winning task. + * @pre This is called only by the winner (i.e., after try_win returned true). + */ + void set_winner_exception(std::exception_ptr ep) noexcept + { + winner_exception_ = ep; + } + + /** + * @brief Signals that a runner has finished and, if it was the last, resumes the parent coroutine. + * + * Called by each runner's final_suspend. Decrements the remaining count and, when the + * last runner completes, dispatches the stored continuation via the caller executor so + * the parent resumes after all child frames are destroyed. + * + * @return Coroutine to resume: the parent's continuation if this call observed the final completion, otherwise a noop coroutine. + */ + coro signal_completion() noexcept + { + auto remaining = remaining_count_.fetch_sub(1, std::memory_order_acq_rel); + if(remaining == 1) + return caller_ex_.dispatch(continuation_); + return std::noop_coroutine(); + } +}; + +/** Wrapper coroutine for homogeneous when_any tasks (vector overload). + + Same role as when_any_runner but uses a runtime index instead of + a compile-time index, allowing it to work with vectors of tasks. + + @tparam T The common result type of all tasks. +*/ +template +struct when_any_homogeneous_runner +{ + /** Promise type for the homogeneous runner coroutine. + + Manages executor propagation, stop token forwarding, and completion + signaling for the wrapped child task. + */ + struct promise_type : frame_allocating_base + { + /** Pointer to shared state for winner coordination. */ + when_any_homogeneous_state* state_ = nullptr; + + /** Runtime index of this task in the vector. */ + std::size_t index_ = 0; + + /** Executor reference inherited from the parent coroutine. */ + executor_ref ex_; + + /** Stop token for cooperative cancellation. */ + std::stop_token stop_token_; + + /** + * @brief Construct a runner coroutine object bound to this promise. + * + * @return when_any_homogeneous_runner A coroutine wrapper owning the coroutine handle created from this promise. + */ + when_any_homogeneous_runner get_return_object() + { + return when_any_homogeneous_runner( + std::coroutine_handle::from_promise(*this)); + } + + /** + * @brief Suspend the coroutine immediately upon creation. + * + * @return An awaiter that always suspends the coroutine. + */ + std::suspend_always initial_suspend() noexcept + { + return {}; + } + + /** + * @brief Final-suspend awaiter that notifies shared state this runner has finished. + * + * The returned awaiter always suspends; on suspend it calls the shared state's + * signal_completion() to decrement the remaining task count and obtain the + * coroutine to resume (the parent continuation) if this was the last runner. + * + * @return The parent coroutine to resume if this was the last task, `std::noop_coroutine()` otherwise. + */ + auto final_suspend() noexcept + { + /** Awaiter that signals task completion and potentially resumes parent. */ + struct awaiter + { + /** Pointer to the promise for accessing shared state. */ + promise_type* p_; + + /** + * @brief Indicates the awaiter is never ready and always suspends. + * + * @return `false` — always suspend the awaiting coroutine. + */ + bool await_ready() const noexcept + { + return false; + } + + /** + * Notify the shared when_any state that this runner has completed and obtain the coroutine to resume. + * + * @return The parent coroutine to resume if this was the last outstanding task; otherwise `std::noop_coroutine()`. + */ + coro await_suspend(coro) noexcept + { + return p_->state_->signal_completion(); + } + + /** + * @brief No-op await_resume for the final_suspend awaiter. + * + * Performs no action when resumed; control simply returns to the awaiting context. + */ + void await_resume() const noexcept + { + } + }; + return awaiter{this}; + } + + /** + * @brief Satisfies the coroutine promise's return requirement when a runner completes successfully. + * + * @details No-op used to indicate normal completion of the runner coroutine. Result capture and + * propagation are performed by the runner/launcher infrastructure; this function itself performs no work. + */ + void return_void() + { + } + + /** + * @brief Handle an exception thrown by the child runner. + * + * Exceptions are treated as valid completions. If this runner's exception + * becomes the winning result it is stored for rethrowing to the caller once + * all runners complete; otherwise the exception is discarded. + */ + void unhandled_exception() + { + if(state_->try_win(index_)) + state_->set_winner_exception(std::current_exception()); + } + + /** Awaiter wrapper that injects executor and stop token into child awaitables. + + @tparam Awaitable The underlying awaitable type being wrapped. + */ + template + struct transform_awaiter + { + /** The wrapped awaitable instance. */ + std::decay_t a_; + + /** Pointer to promise for accessing executor and stop token. */ + promise_type* p_; + + /** + * Indicates whether the underlying awaitable can complete synchronously. + * + * @return `true` if the awaitable can complete synchronously, `false` otherwise. + */ + bool await_ready() + { + return a_.await_ready(); + } + + /** + * @brief Resume the underlying awaitable and obtain its result. + * + * @return The value returned by the wrapped awaitable's `await_resume`. + */ + auto await_resume() + { + return a_.await_resume(); + } + + /** Suspend with executor and stop token injection. + + @tparam Promise The suspending coroutine's promise type. + @param h Handle to the suspending coroutine. + @return Coroutine to resume or void. + */ + template + /** + * @brief Forwards suspension to the wrapped awaitable while injecting the promise's executor and stop token. + * + * @param h Coroutine handle of the awaiting coroutine. + * @return The value returned by the underlying awaitable's `await_suspend` call. + */ + auto await_suspend(std::coroutine_handle h) + { + return a_.await_suspend(h, p_->ex_, p_->stop_token_); + } + }; + + /** Transform awaitables to inject executor and stop token. + + @tparam Awaitable The awaitable type being co_awaited. + @param a The awaitable instance. + @return Transformed awaiter with executor/stop_token injection. + */ + template + /** + * @brief Adapts an awaitable to the runner promise's execution context. + * + * Produces an awaitable that will run with this promise's executor and stop token: + * if the awaited type accepts an executor and stop token, the returned awaitable + * forwards this promise's executor and stop token to the inner operation; + * otherwise the returned awaitable is bound to this promise's executor. + * + * @tparam Awaitable Type of the awaitable being adapted. + * @param a The awaitable to adapt. + * @return An awaitable adapted to execute with the promise's executor and stop token. + */ + auto await_transform(Awaitable&& a) + { + using A = std::decay_t; + if constexpr (IoAwaitable) + { + return transform_awaiter{ + std::forward(a), this}; + } + else + { + return make_affine(std::forward(a), ex_); + } + } + }; + + /** Handle to the underlying coroutine frame. */ + std::coroutine_handle h_; + + /** + * @brief Create a runner wrapper that holds the given coroutine handle. + * + * @param h Coroutine handle for the runner's promise; stored by the wrapper. + */ + explicit when_any_homogeneous_runner(std::coroutine_handle h) + : h_(h) + { + } + + /** Move constructor (Clang 14 workaround). + + Clang 14 (non-Apple) has a coroutine codegen bug requiring explicit + move constructor; other compilers work correctly with deleted move. + */ +#if defined(__clang__) && __clang_major__ == 14 && !defined(__apple_build_version__) + /** + * @brief Move-constructs a runner by transferring ownership of the coroutine handle. + * + * Transfers the internal coroutine handle from `other` into this object and leaves + * `other` in a null (released) state. + * + * @param other Runner to move from; its handle is set to null after construction. + */ + when_any_homogeneous_runner(when_any_homogeneous_runner&& other) noexcept + : h_(std::exchange(other.h_, nullptr)) {} +#endif + + /** + * @brief Deleted copy constructor; instances of this type cannot be copied and are move-only. + */ + when_any_homogeneous_runner(when_any_homogeneous_runner const&) = delete; + + /** Copy assignment is not allowed. */ + when_any_homogeneous_runner& operator=(when_any_homogeneous_runner const&) = delete; + + /** Move construction is deleted (except on Clang 14). */ +#if !defined(__clang__) || __clang_major__ != 14 || defined(__apple_build_version__) + /** + * @brief Deleted move constructor to disable moving of runner objects. + * + * Runner instances are not movable; ownership of the underlying coroutine handle cannot be transferred. + */ +when_any_homogeneous_runner(when_any_homogeneous_runner&&) = delete; +#endif + + /** + * @brief Move-assignment is disabled to prevent assigning runner instances. + * + * The runner type is non-assignable; move assignment is explicitly deleted. + */ + when_any_homogeneous_runner& operator=(when_any_homogeneous_runner&&) = delete; + + /** Release ownership of the coroutine handle. + + @return The coroutine handle; this object becomes empty. + */ + auto release() noexcept + { + return std::exchange(h_, nullptr); + } +}; + +/** Create a runner coroutine for a homogeneous when_any task. + + Factory function that creates a wrapper coroutine for a child task + in the vector overload. Uses a runtime index instead of compile-time. + + @tparam T The result type of the task being wrapped. + @param inner The task to run (will be moved from). + @param state Shared state for winner coordination. + @param index Runtime index of this task in the vector. + @return Runner coroutine (must be started via resume()). +*/ +template +/** + * @brief Creates a runner that awaits a single task and attempts to claim it as the winner for a homogeneous when_any. + * + * The returned runner awaits the provided `inner` task; upon its completion it calls `state->try_win(index)` to + * attempt to become the race winner. If `T` is not `void` and the runner wins, the runner stores the task's result + * into `state` via `set_winner_result`, or stores an exception via `set_winner_exception` if storing the result throws. + * + * @tparam T The task result type. + * @param inner The task to be awaited by the runner. + * @param state Pointer to the shared homogeneous when_any state used to coordinate winner selection and result storage. + * @param index The runtime index of this task within the originating task vector; becomes the winner index if this runner wins. + * @return when_any_homogeneous_runner A coroutine runner that will execute `inner` and update `state` on completion. + */ +when_any_homogeneous_runner +make_when_any_homogeneous_runner(task inner, when_any_homogeneous_state* state, std::size_t index) +{ + if constexpr (std::is_void_v) + { + co_await std::move(inner); + state->try_win(index); // void tasks have no result to store + } + else + { + auto result = co_await std::move(inner); + if(state->try_win(index)) + { + try + { + state->set_winner_result(std::move(result)); + } + catch(...) + { + state->set_winner_exception(std::current_exception()); + } + } + } +} + +/** Awaitable that launches all runners for homogeneous when_any. + + Same lifetime concerns as when_any_launcher; see its documentation. + Uses runtime iteration over the task vector instead of compile-time + expansion over a tuple. + + @tparam T The common result type of all tasks in the vector. +*/ +template +class when_any_homogeneous_launcher +{ + /** Pointer to vector of tasks to launch. */ + std::vector>* tasks_; + + /** Pointer to shared state for coordination. */ + when_any_homogeneous_state* state_; + +public: + /** + * @brief Constructs a launcher that will start runners for a vector of tasks using shared state. + * + * @param tasks Pointer to the vector of tasks; must be non-null and remain valid for the lifetime of the awaitable. + * @param state Pointer to the shared when_any_homogeneous_state coordinating winner selection; must remain valid for the lifetime of the awaitable. + */ + when_any_homogeneous_launcher( + std::vector>* tasks, + when_any_homogeneous_state* state) + : tasks_(tasks) + , state_(state) + { + } + + /** Check if the launcher can complete synchronously. + + @return True only if there are no tasks (degenerate case). + */ + bool await_ready() const noexcept + { + return tasks_->empty(); + } + + /** Launch all runner coroutines and suspend the parent. + + Sets up stop propagation from parent to children, then launches + each task in a runner coroutine. Returns noop_coroutine because + runners resume the parent via signal_completion(). + + CRITICAL: If the last task finishes synchronously then the parent + coroutine resumes, destroying its frame, and destroying this object + prior to the completion of await_suspend. Therefore, await_suspend + must ensure `this` cannot be referenced after calling `launch_one` + for the last time. + + @tparam Ex The executor type. + @param continuation Handle to the parent coroutine to resume later. + @param caller_ex Executor for dispatching child coroutines. + @param parent_token Stop token from the parent for cancellation propagation. + @return noop_coroutine; parent is resumed by the last completing task. + */ + template + /** + * @brief Stores parent continuation and executor, propagates the parent's stop requests, and launches all runner coroutines for the task vector. + * + * The method records the parent's continuation and executor into shared state, registers a callback to forward the provided parent stop token to the state's stop_source_ (and requests stop immediately if the parent already requested it), then creates and dispatches a runner for each task using the state's stop token. + * + * @param continuation Coroutine handle of the awaiting parent; saved to be resumed when all runners complete. + * @param caller_ex Executor reference used to dispatch runners. + * @param parent_token Parent-level stop token; if stop is possible it is linked to the state's stop_source_ and, if already requested, causes an immediate request_stop on the state's stop_source_. + * @returns coro A noop coroutine handle (std::noop_coroutine()) to indicate the caller remains suspended while runners execute. + */ + coro await_suspend(coro continuation, Ex const& caller_ex, std::stop_token parent_token = {}) + { + state_->continuation_ = continuation; + state_->caller_ex_ = caller_ex; + + // Forward parent's stop requests to children + if(parent_token.stop_possible()) + { + state_->parent_stop_callback_.emplace( + parent_token, + typename when_any_homogeneous_state::stop_callback_fn{&state_->stop_source_}); + + if(parent_token.stop_requested()) + state_->stop_source_.request_stop(); + } + + auto num_tasks = tasks_->size(); + auto token = state_->stop_source_.get_token(); + for(std::size_t i = 0; i < num_tasks; ++i) + launch_one( i, caller_ex, token); + + return std::noop_coroutine(); + } + + /** + * @brief Resume the awaiting coroutine once all launched runner coroutines have completed. + * + * This function performs no action and returns no value; results and any winner state are available + * from the shared when_any state object maintained by the launcher. + */ + void await_resume() const noexcept + { + } + +private: + /** Launch a single runner coroutine for task at the given index. + + Creates the runner, configures its promise with state and executor, + stores its handle for cleanup, and dispatches it for execution. + + @tparam Ex The executor type. + @param index Runtime index of the task in the vector. + @param caller_ex Executor for dispatching the runner. + @param token Stop token for cooperative cancellation. + + @pre Ex::dispatch() and coro::resume() must not throw. If they do, + the coroutine handle may leak. + */ + template + /** + * @brief Starts a runner coroutine for the task at the given index and schedules it for execution. + * + * Initializes the runner's promise with the shared state, the runtime index, the caller executor, + * and the provided stop token; stores the runner handle into the shared state's runner handle + * collection and dispatches it on the caller executor. + * + * @param index Runtime index of the task to launch within the tasks vector. + * @param caller_ex Executor on which the runner coroutine will be dispatched and resumed. + * @param token Stop token used to propagate cancellation to the runner. + */ + void launch_one(std::size_t index, Ex const& caller_ex, std::stop_token token) + { + auto runner = make_when_any_homogeneous_runner( + std::move((*tasks_)[index]), state_, index); + + auto h = runner.release(); + h.promise().state_ = state_; + h.promise().index_ = index; + h.promise().ex_ = caller_ex; + h.promise().stop_token_ = token; + + coro ch{h}; + state_->runner_handles_[index] = ch; + caller_ex.dispatch(ch).resume(); + } +}; + +} // namespace detail + +/** Wait for the first task to complete (homogeneous overload). + + Races a vector of tasks with the same result type. Simpler than the + heterogeneous overload: returns a direct pair instead of a variant + since all tasks share the same type. + + @par Example + @code + task example() { + std::vector> requests; + requests.push_back(fetch_from_server(0)); + requests.push_back(fetch_from_server(1)); + requests.push_back(fetch_from_server(2)); + + auto [index, response] = co_await when_any(std::move(requests)); + // index is 0, 1, or 2; response is the winner's Response + } + @endcode + + @tparam T The common result type of all tasks (must not be void). + @param tasks Vector of tasks to race concurrently (must not be empty). + @return A task yielding a pair of (winner_index, result). + @throws std::invalid_argument if tasks is empty. + + @par Key Features + @li All tasks are launched concurrently + @li Returns when first task completes (success or failure) + @li Stop is requested for all siblings + @li Waits for all siblings to complete before returning + @li If winner threw, that exception is rethrown + @li Returns simple pair (no variant needed for homogeneous types) +*/ +template + requires (!std::is_void_v) +/** + * @brief Waits for the first-completing task in a homogeneous vector and returns its index and value. + * + * Awaits all provided tasks concurrently, cancels remaining tasks when a winner is selected, and returns + * a pair containing the winner's zero-based index and the winner's result value. + * + * @param tasks Vector of tasks to race; each task must yield a value of type `T`. + * @return std::pair Pair where the first element is the winner's index and the second is the winner's result (moved). + * @throws std::invalid_argument if `tasks` is empty. + * @throws Any exception propagated from the winning task (rethrown after all tasks complete). + */ +[[nodiscard]] task> +when_any(std::vector> tasks) +{ + if(tasks.empty()) + throw std::invalid_argument("when_any requires at least one task"); + + using result_type = std::pair; + + detail::when_any_homogeneous_state state(tasks.size()); + + co_await detail::when_any_homogeneous_launcher(&tasks, &state); + + if(state.winner_exception_) + std::rethrow_exception(state.winner_exception_); + + co_return result_type{state.winner_index_, std::move(*state.result_)}; +} + +/** + * @brief Race a set of void-returning tasks and produce the zero-based index of the first task to complete. + * + * @param tasks Vector of void tasks to race; must contain at least one task. + * @return Zero-based index of the task that completed first. + * @throws std::invalid_argument if `tasks` is empty. + * + * If the winning task throws an exception, that exception is rethrown after all tasks have finished. + */ +[[nodiscard]] inline task +when_any(std::vector> tasks) +{ + if(tasks.empty()) + throw std::invalid_argument("when_any requires at least one task"); + + detail::when_any_homogeneous_state state(tasks.size()); + + co_await detail::when_any_homogeneous_launcher(&tasks, &state); + + if(state.winner_exception_) + std::rethrow_exception(state.winner_exception_); + + co_return state.winner_index_; +} + +/** Alias for vector when_any result type. + + For homogeneous when_any (vector overload), the result is simpler: + a pair of the winner's index and the result value directly (no variant + needed since all tasks share the same type). + + @par Example + @code + void on_complete(when_any_vector_result_type result); + @endcode + + @tparam T The common result type of all tasks in the vector. +*/ +template +using when_any_vector_result_type = std::pair; + +} // namespace capy +} // namespace boost + +#endif \ No newline at end of file diff --git a/test/unit/when_any.cpp b/test/unit/when_any.cpp new file mode 100644 index 00000000..0e40fecf --- /dev/null +++ b/test/unit/when_any.cpp @@ -0,0 +1,2422 @@ +// +// Copyright (c) 2026 Michael Vandeberg +// +// Distributed under the Boost Software License, Version 1.0. (See accompanying +// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) +// +// Official repository: https://github.com/cppalliance/capy +// + +// Test that header file is self-contained. +#include + +#include +#include +#include +#include + +#include "test_suite.hpp" + +#include +#include +#include +#include +#include + +namespace boost { +namespace capy { + +// Static assertions for result type +static_assert(std::is_same_v< + when_any_result_type, + std::pair>>); + +static_assert(std::is_same_v< + when_any_result_type, + std::pair>>); + +// Void becomes monostate in the variant +static_assert(std::is_same_v< + when_any_result_type, + std::pair>>); + +static_assert(std::is_same_v< + when_any_result_type, + std::pair>>); + +// Duplicate types are deduplicated (variant requires unique types) +static_assert(std::is_same_v< + when_any_result_type, + std::pair>>); + +static_assert(std::is_same_v< + when_any_result_type, + std::pair>>); + +// All void tasks deduplicate to single monostate +static_assert(std::is_same_v< + when_any_result_type, + std::pair>>); + +// Verify when_any returns task which satisfies awaitable protocols +static_assert(IoAwaitable< + task>, + executor_ref>); + +// Minimal test context +class test_context : public execution_context +{ +}; + +static test_context default_test_ctx_; + +/** Simple synchronous executor for testing. +*/ +struct test_executor +{ + int* dispatch_count_; + test_context* ctx_ = nullptr; + + /** + * @brief Constructs a test executor that records dispatch calls. + * + * The executor will increment the provided counter each time its `dispatch` + * method is invoked. + * + * @param count Reference to an integer used to track the number of dispatches. + */ + explicit test_executor(int& count) + : dispatch_count_(&count) + { + } + + /** + * @brief Checks whether two test_executor instances refer to the same dispatch counter. + * + * @param other The other executor to compare against. + * @return true if both executors share the same `dispatch_count_` pointer, false otherwise. + */ + bool operator==(test_executor const& other) const noexcept + { + return dispatch_count_ == other.dispatch_count_; + } + + /** + * @brief Obtain the execution context used by this executor. + * + * Returns the executor's stored execution_context if one was provided; otherwise returns the global default_test_ctx_. + * + * @return execution_context& Reference to the active execution context for this executor. + */ + execution_context& context() const noexcept + { + return ctx_ ? *ctx_ : default_test_ctx_; + } + + /** + * @brief Notifies the executor that a unit of work has started. + * + * This implementation is a no-op; it exists to satisfy the executor interface. + */ +void on_work_started() const noexcept {} + /** + * @brief Notifies that a unit of work has finished on the executor. + * + * This executor implementation does not track active work, so the hook is a no-op. + */ +void on_work_finished() const noexcept {} + + /** + * @brief Marks a coroutine as dispatched by incrementing the dispatch counter and returning the handle. + * + * Increments the executor's internal dispatch counter to record a dispatch operation. + * + * @param h Coroutine handle to be dispatched. + * @return coro The same coroutine handle `h`. + */ + coro dispatch(coro h) const + { + ++(*dispatch_count_); + return h; + } + + /** + * @brief Immediately resumes the provided coroutine handle. + * + * @param h Coroutine handle to resume; the function will call `resume()` on it. + */ + void post(coro h) const + { + h.resume(); + } +}; + +static_assert(Executor); + +struct test_exception : std::runtime_error +{ + /** + * @brief Constructs a test_exception with the specified message. + * + * @param msg Null-terminated error message stored in the exception. + */ + explicit test_exception(const char* msg) + : std::runtime_error(msg) + { + } +}; + +/** + * @brief Throws a test_exception constructed with the provided message. + * + * @param msg Message forwarded to the thrown test_exception. + * @throws test_exception Always throws a test_exception initialized with `msg`. + */ +[[noreturn]] inline void +throw_test_exception(char const* msg) +{ + throw test_exception(msg); +} + +//---------------------------------------------------------- +// Shared helper tasks for all when_any tests +/** + * @brief Creates a task that completes with the specified integer. + * + * @param value Integer to be returned by the task. + * @return task A task whose result is the provided `value`. + */ + +inline task +returns_int(int value) +{ + co_return value; +} + +/** + * @brief Creates a task that completes with the provided string. + * + * @param value The string value that the task will produce. + * @return std::string The provided string value. + */ +inline task +returns_string(std::string value) +{ + co_return value; +} + +/** + * @brief Creates a task that completes immediately without producing a value. + * + * @return task A task that is already completed and yields no value. + */ +inline task +void_task() +{ + co_return; +} + +/** + * @brief Coroutine that immediately throws a test_exception with the provided message. + * + * This task does not produce a normal integer result; it signals failure by throwing + * a test_exception constructed from `msg`. + * + * @param msg Human-readable message used to construct the thrown test_exception. + * @return task An awaitable task which, when resumed, will propagate the thrown test_exception. + * @throws test_exception Always thrown with the given `msg`. + */ +inline task +throws_exception(char const* msg) +{ + throw_test_exception(msg); + co_return 0; +} + +/** + * @brief Returns a task that immediately throws a test_exception with the given message. + * + * The returned coroutine does not produce a value; awaiting it will propagate a + * test_exception constructed from `msg`. + * + * @param msg Null-terminated message used for the thrown test_exception. + */ +inline task +void_throws_exception(char const* msg) +{ + throw_test_exception(msg); + co_return; +} + +//---------------------------------------------------------- +// Shared executors and awaitables for all when_any tests +//---------------------------------------------------------- + +/** Queuing executor that allows controlled interleaving of tasks. + + Unlike test_executor which runs tasks synchronously, this executor + queues work and runs it in FIFO order when run_one() is called. + This allows tasks to observe stop requests between suspension points. +*/ +struct queuing_executor +{ + std::vector* queue_; + test_context* ctx_ = nullptr; + + /** + * @brief Constructs a queuing_executor that uses the provided coroutine queue. + * + * The executor enqueues dispatched or posted coroutine handles into the supplied vector. + * + * @param q Vector used as the FIFO queue for coroutine handles; must outlive the executor. + */ + explicit queuing_executor(std::vector& q) + : queue_(&q) + { + } + + /** + * @brief Compares two queuing_executor instances for equality. + * + * @returns `true` if both executors refer to the same underlying queue, `false` otherwise. + */ + bool operator==(queuing_executor const& other) const noexcept + { + return queue_ == other.queue_; + } + + /** + * @brief Obtain the execution context used by this executor. + * + * Returns the executor's stored execution_context if one was provided; otherwise returns the global default_test_ctx_. + * + * @return execution_context& Reference to the active execution context for this executor. + */ + execution_context& context() const noexcept + { + return ctx_ ? *ctx_ : default_test_ctx_; + } + + /** + * @brief Notifies the executor that a unit of work has started. + * + * This implementation is a no-op; it exists to satisfy the executor interface. + */ +void on_work_started() const noexcept {} + /** + * @brief Notifies that a unit of work has finished on the executor. + * + * This executor implementation does not track active work, so the hook is a no-op. + */ +void on_work_finished() const noexcept {} + + /** + * @brief Enqueues a coroutine handle for later execution by this executor. + * + * @param h Coroutine handle to append to the executor's internal queue. + * @return coro A no-op coroutine handle (`std::noop_coroutine`) offered back to the caller. + */ + coro dispatch(coro h) const + { + queue_->push_back(h); + return std::noop_coroutine(); + } + + /** + * @brief Enqueues a coroutine handle onto the executor's queue for later execution. + * + * @param h Coroutine handle to append to the queue. + */ + void post(coro h) const + { + queue_->push_back(h); + } +}; + +static_assert(Executor); + +/** Awaitable that yields to the executor, allowing other tasks to run. + + When awaited, this suspends the current coroutine and posts it back + to the executor's queue. This creates a yield point where the task + can be interleaved with other tasks. +*/ +struct yield_awaitable +{ + /** + * @brief Indicates that the awaitable will always suspend. + * + * @return `false` indicating the awaitable is not ready and the coroutine should suspend. + */ + bool await_ready() const noexcept + { + return false; + } + + template + /** + * @brief Suspends the awaiting coroutine and re-posts it to the given executor's queue. + * + * Posts the coroutine handle back to the executor via ex.post(h) and yields by returning + * a no-op coroutine handle. + * + * @tparam Ex Executor type providing a `post(coro)` overload. + * @param h The awaiting coroutine handle to be re-scheduled. + * @param ex The executor used to re-post the coroutine. + * @param st Stop token passed by the caller (unused). + * @return coro A no-op coroutine handle used to complete suspension. + */ + coro await_suspend(coro h, Ex const& ex, std::stop_token) + { + // Post ourselves back to the queue + ex.post(h); + return std::noop_coroutine(); + } + + /** + * @brief No-op resume step invoked when the awaitable resumes. + * + * This function intentionally performs no action and exists only to satisfy the awaitable resume hook. + */ + void await_resume() const noexcept + { + } +}; + +struct when_any_test +{ + //---------------------------------------------------------- + // Basic functionality tests + //---------------------------------------------------------- + + /** + * @brief Verifies that a single-task when_any completes immediately with the expected index and value. + * + * Runs a when_any over a single coroutine that returns 42 and asserts that the callback is invoked, + * the reported winner index is 0, and the extracted result is 42. + */ + void + testSingleTask() + { + int dispatch_count = 0; + test_executor ex(dispatch_count); + bool completed = false; + int result = 0; + std::size_t winner_index = 999; + + run_async(ex, + [&](when_any_result_type r) { + completed = true; + winner_index = r.first; + result = std::get<0>(r.second); + }, + [](std::exception_ptr) {})( + when_any(returns_int(42))); + + BOOST_TEST(completed); + BOOST_TEST_EQ(winner_index, 0u); + BOOST_TEST_EQ(result, 42); + } + + // Test: Two tasks - first completes wins + void + testTwoTasksFirstWins() + { + int dispatch_count = 0; + test_executor ex(dispatch_count); + bool completed = false; + std::size_t winner_index = 999; + int result_value = 0; + + // Note: when_any_result_type deduplicates to variant + run_async(ex, + [&](when_any_result_type r) { + completed = true; + winner_index = r.first; + // Variant is deduplicated to single int type + result_value = std::get(r.second); + }, + [](std::exception_ptr) {})( + when_any(returns_int(10), returns_int(20))); + + BOOST_TEST(completed); + // One of them should win, with correct index-to-value mapping + BOOST_TEST(winner_index == 0 || winner_index == 1); + if (winner_index == 0) + BOOST_TEST_EQ(result_value, 10); + else + BOOST_TEST_EQ(result_value, 20); + } + + /** + * @brief Verifies when_any reports the correct winner and value for mixed result types. + * + * Runs when_any on three tasks producing `int`, `std::string`, and `int`, captures the winner + * index and the variant-held result, and asserts that a completion occurred, the winner index + * is 0, 1, or 2, and the variant contains the corresponding value for the reported winner. + */ + void + testMixedTypes() + { + int dispatch_count = 0; + test_executor ex(dispatch_count); + bool completed = false; + std::size_t winner_index = 999; + std::variant result_value; + + run_async(ex, + [&](when_any_result_type r) { + completed = true; + winner_index = r.first; + result_value = r.second; + }, + [](std::exception_ptr) {})( + when_any(returns_int(1), returns_string("hello"), returns_int(3))); + + BOOST_TEST(completed); + BOOST_TEST(winner_index == 0 || winner_index == 1 || winner_index == 2); + if (winner_index == 0) + BOOST_TEST_EQ(std::get(result_value), 1); + else if (winner_index == 1) + BOOST_TEST_EQ(std::get(result_value), "hello"); + else + BOOST_TEST_EQ(std::get(result_value), 3); + } + + // Test: Void task can win + void + testVoidTaskWins() + { + int dispatch_count = 0; + test_executor ex(dispatch_count); + bool completed = false; + std::size_t winner_index = 999; + std::variant result_value; + + run_async(ex, + [&](when_any_result_type r) { + completed = true; + winner_index = r.first; + result_value = r.second; + }, + [](std::exception_ptr) {})( + when_any(void_task(), returns_int(42))); + + BOOST_TEST(completed); + BOOST_TEST(winner_index == 0 || winner_index == 1); + if (winner_index == 0) + BOOST_TEST(std::holds_alternative(result_value)); + else + BOOST_TEST_EQ(std::get(result_value), 42); + } + + /** + * @brief Verifies that when_any with only void tasks reports a valid winner index and yields a monostate result. + * + * Runs three void-returning tasks in a when_any and asserts the operation completes, the reported winner index is 0, 1, or 2, and the returned variant holds a `std::monostate`. + */ + void + testAllVoidTasks() + { + int dispatch_count = 0; + test_executor ex(dispatch_count); + bool completed = false; + std::size_t winner_index = 999; + std::variant result_value; + + run_async(ex, + [&](when_any_result_type r) { + completed = true; + winner_index = r.first; + result_value = r.second; + }, + [](std::exception_ptr) {})( + when_any(void_task(), void_task(), void_task())); + + BOOST_TEST(completed); + BOOST_TEST(winner_index == 0 || winner_index == 1 || winner_index == 2); + // All void tasks produce monostate regardless of index + BOOST_TEST(std::holds_alternative(result_value)); + } + + //---------------------------------------------------------- + // Exception handling tests + //---------------------------------------------------------- + + /** + * @brief Verifies that an exception thrown by the single task passed to when_any is propagated to the exception handler. + * + * @details Invokes when_any with a single coroutine that throws a test_exception and installs handlers that set flags + * indicating normal completion or exception capture. Asserts that the normal completion handler is not called, + * the exception handler is invoked, and the captured exception message equals "test error". + */ + void + testSingleTaskException() + { + int dispatch_count = 0; + test_executor ex(dispatch_count); + bool completed = false; + bool caught_exception = false; + std::string error_msg; + + run_async(ex, + [&](when_any_result_type) { completed = true; }, + [&](std::exception_ptr ep) { + try { + std::rethrow_exception(ep); + } catch (test_exception const& e) { + caught_exception = true; + error_msg = e.what(); + } + })(when_any(throws_exception("test error"))); + + BOOST_TEST(!completed); + BOOST_TEST(caught_exception); + BOOST_TEST_EQ(error_msg, "test error"); + } + + /** + * @brief Verifies that a thrown exception from one participant of when_any is reported as the winning completion. + * + * Sets up a synchronous test executor and runs when_any with one coroutine that throws and one that returns an int. + * Confirms the exception path is invoked and the propagated exception carries the expected message. + */ + void + testExceptionWinsRace() + { + int dispatch_count = 0; + test_executor ex(dispatch_count); + bool caught_exception = false; + std::string error_msg; + + run_async(ex, + [](when_any_result_type) {}, + [&](std::exception_ptr ep) { + try { + std::rethrow_exception(ep); + } catch (test_exception const& e) { + caught_exception = true; + error_msg = e.what(); + } + })(when_any(throws_exception("winner error"), returns_int(42))); + + // With synchronous executor, first task (the thrower) wins + BOOST_TEST(caught_exception); + BOOST_TEST_EQ(error_msg, "winner error"); + } + + /** + * @brief Verifies that an exception thrown by a void task in when_any is propagated to the awaiter. + * + * Runs a void task that throws and a concurrent int-returning task under a test executor, + * awaits their when_any composition, and asserts the thrown test_exception is caught with the expected message. + */ + void + testVoidTaskException() + { + int dispatch_count = 0; + test_executor ex(dispatch_count); + bool caught_exception = false; + std::string error_msg; + + run_async(ex, + [](when_any_result_type) {}, + [&](std::exception_ptr ep) { + try { + std::rethrow_exception(ep); + } catch (test_exception const& e) { + caught_exception = true; + error_msg = e.what(); + } + })(when_any(void_throws_exception("void error"), returns_int(42))); + + BOOST_TEST(caught_exception); + BOOST_TEST_EQ(error_msg, "void error"); + } + + /** + * @brief Verifies that when_any delivers the exception from the task that completes first when multiple tasks throw. + * + * Runs three tasks that each throw a test_exception and asserts that an exception is observed + * and that the delivered exception message matches one of the thrown messages. + */ + void + testMultipleExceptionsFirstWins() + { + int dispatch_count = 0; + test_executor ex(dispatch_count); + bool caught_exception = false; + std::string error_msg; + + run_async(ex, + [](when_any_result_type) {}, + [&](std::exception_ptr ep) { + try { + std::rethrow_exception(ep); + } catch (test_exception const& e) { + caught_exception = true; + error_msg = e.what(); + } + })(when_any( + throws_exception("error_1"), + throws_exception("error_2"), + throws_exception("error_3"))); + + BOOST_TEST(caught_exception); + // One of them wins + BOOST_TEST( + error_msg == "error_1" || + error_msg == "error_2" || + error_msg == "error_3"); + } + + //---------------------------------------------------------- + // Stop token propagation tests + //---------------------------------------------------------- + + /** + * @brief Verifies that completing the winning awaitable triggers stop requests for the others + * + * Runs three synchronous tasks with when_any and checks that the when_any completion + * causes stop to be requested on the remaining awaitables while all tasks still run + * to completion in the synchronous executor scenario. + */ + void + testStopRequestedOnCompletion() + { + int dispatch_count = 0; + test_executor ex(dispatch_count); + std::atomic completion_count{0}; + bool completed = false; + + auto counting_task = [&]() -> task { + ++completion_count; + co_return completion_count.load(); + }; + + run_async(ex, + [&](when_any_result_type) { + completed = true; + }, + [](std::exception_ptr) {})( + when_any(counting_task(), counting_task(), counting_task())); + + BOOST_TEST(completed); + // All three tasks should run to completion + // (stop is requested, but synchronous tasks complete anyway) + BOOST_TEST_EQ(completion_count.load(), 3); + } + + /** + * @brief Verifies that all participating tasks complete for cleanup even after a winner is determined. + * + * Runs a four-task `when_any` on a synchronous test executor, asserts the reported winner index is 0, + * and verifies that all four tasks have completed (cleanup semantics). + */ + void + testAllTasksCompleteForCleanup() + { + int dispatch_count = 0; + test_executor ex(dispatch_count); + std::atomic completion_count{0}; + bool completed = false; + + auto counting_task = [&](int value) -> task { + ++completion_count; + co_return value; + }; + + run_async(ex, + [&](when_any_result_type r) { + completed = true; + // Winner should be first task (synchronous executor) + BOOST_TEST_EQ(r.first, 0u); + }, + [](std::exception_ptr) {})( + when_any( + counting_task(1), + counting_task(2), + counting_task(3), + counting_task(4))); + + BOOST_TEST(completed); + // All four tasks must complete for proper cleanup + BOOST_TEST_EQ(completion_count.load(), 4); + } + + //---------------------------------------------------------- + // Long-lived task cancellation tests + //---------------------------------------------------------- + + /** + * @brief Verifies that long-lived tasks are cancelled when a faster task wins a when_any race. + * + * Constructs a when_any composed of one immediately completing task and two multi-step tasks + * that cooperatively observe the stop token. Asserts that the fast task is reported as the + * winner (index 0) with the expected value, that only the fast task completed normally, + * and that the slow tasks saw stop requests and were cancelled. + * + * Expected assertions: + * - winner index is 0 and winner value is 42. + * - completed_normally_count == 1. + * - cancelled_count == 2. + */ + void + testLongLivedTasksCancelledOnWinner() + { + std::vector work_queue; + queuing_executor ex(work_queue); + + std::atomic cancelled_count{0}; + std::atomic completed_normally_count{0}; + bool when_any_completed = false; + std::size_t winner_index = 999; + int winner_value = 0; + + // A task that completes immediately + auto fast_task = [&]() -> task { + ++completed_normally_count; + co_return 42; + }; + + // A task that does multiple steps, checking stop token between each + auto slow_task = [&](int id, int steps) -> task { + for (int i = 0; i < steps; ++i) { + auto token = co_await get_stop_token(); + if (token.stop_requested()) { + ++cancelled_count; + co_return -1; // Cancelled + } + co_await yield_awaitable{}; + } + ++completed_normally_count; + co_return id; + }; + + run_async(ex, + [&](when_any_result_type r) { + when_any_completed = true; + winner_index = r.first; + winner_value = std::get(r.second); + }, + [](std::exception_ptr) {})( + when_any(fast_task(), slow_task(100, 10), slow_task(200, 10))); + + // Process work queue until empty + while (!work_queue.empty()) { + auto h = work_queue.front(); + work_queue.erase(work_queue.begin()); + h.resume(); + } + + BOOST_TEST(when_any_completed); + BOOST_TEST_EQ(winner_index, 0u); // fast_task wins + BOOST_TEST_EQ(winner_value, 42); + + // The fast task completed normally + BOOST_TEST_EQ(completed_normally_count.load(), 1); + + // Both slow tasks should have been cancelled + BOOST_TEST_EQ(cancelled_count.load(), 2); + } + + /** + * @brief Verifies that a slower task can still win a when_any race if it completes first. + * + * Sets up three cooperative tasks with differing step counts, runs them on a queuing executor + * with FIFO scheduling, and asserts that the task which finishes first is reported as the winner, + * that its value is propagated, and that the remaining tasks observe a stop request and are cancelled. + * + * The test specifically checks: + * - the reported winner index and value match the task that completed first, + * - exactly one task completed normally, + * - the other tasks were cancelled. + */ + void + testSlowTaskCanWin() + { + std::vector work_queue; + queuing_executor ex(work_queue); + + std::atomic cancelled_count{0}; + std::atomic completed_normally_count{0}; + bool when_any_completed = false; + std::size_t winner_index = 999; + int winner_value = 0; + + // A task that does a few steps then completes + auto medium_task = [&](int id, int steps) -> task { + for (int i = 0; i < steps; ++i) { + auto token = co_await get_stop_token(); + if (token.stop_requested()) { + ++cancelled_count; + co_return -1; + } + co_await yield_awaitable{}; + } + ++completed_normally_count; + co_return id; + }; + + // Task 0: 3 steps, Task 1: 1 step (wins), Task 2: 4 steps + // With FIFO scheduling, task1 completes after 1 yield while others + // are still in progress and will observe the stop request. + run_async(ex, + [&](when_any_result_type r) { + when_any_completed = true; + winner_index = r.first; + winner_value = std::get(r.second); + }, + [](std::exception_ptr) {})( + when_any(medium_task(10, 3), medium_task(20, 1), medium_task(30, 4))); + + // Process work queue until empty + while (!work_queue.empty()) { + auto h = work_queue.front(); + work_queue.erase(work_queue.begin()); + h.resume(); + } + + BOOST_TEST(when_any_completed); + BOOST_TEST_EQ(winner_index, 1u); // Task with 1 step wins + BOOST_TEST_EQ(winner_value, 20); + + // Only the winner completed normally + BOOST_TEST_EQ(completed_normally_count.load(), 1); + + // Other two tasks were cancelled + BOOST_TEST_EQ(cancelled_count.load(), 2); + } + + /** + * @brief Ensures non-cooperative tasks complete even if another task wins a when_any race. + * + * Sets up a queuing executor with one immediately-completing task and two tasks that + * ignore stop tokens and yield repeatedly. Verifies the fast task wins the when_any + * race and that all tasks (including the non-cooperative ones) run to completion. + */ + void + testNonCooperativeTasksStillComplete() + { + std::vector work_queue; + queuing_executor ex(work_queue); + + std::atomic completion_count{0}; + bool when_any_completed = false; + + // A task that completes immediately + auto fast_task = [&]() -> task { + ++completion_count; + co_return 42; + }; + + // A task that ignores stop token (non-cooperative) + auto non_cooperative_task = [&](int id, int steps) -> task { + for (int i = 0; i < steps; ++i) { + // Deliberately NOT checking stop token + co_await yield_awaitable{}; + } + ++completion_count; + co_return id; + }; + + run_async(ex, + [&](when_any_result_type r) { + when_any_completed = true; + BOOST_TEST_EQ(r.first, 0u); // fast_task wins + }, + [](std::exception_ptr) {})( + when_any(fast_task(), non_cooperative_task(100, 3), non_cooperative_task(200, 3))); + + // Process work queue until empty + while (!work_queue.empty()) { + auto h = work_queue.front(); + work_queue.erase(work_queue.begin()); + h.resume(); + } + + BOOST_TEST(when_any_completed); + + // All three tasks complete (non-cooperative tasks run to completion) + BOOST_TEST_EQ(completion_count.load(), 3); + } + + /** + * @brief Verifies when_any behavior with a mix of cooperative and non-cooperative tasks. + * + * Runs three tasks on a queuing executor: a fast winning task, a cooperative slow task that checks the stop token, + * and a non-cooperative slow task that does not observe cancellation. Asserts that the fast task wins (reported index 0), + * the cooperative task observes cancellation and increments its cancelled counter, and the non-cooperative task still + * runs to completion. Also asserts the winner ran once and the when_any completion callback executed. + */ + void + testMixedCooperativeAndNonCooperativeTasks() + { + std::vector work_queue; + queuing_executor ex(work_queue); + + std::atomic cooperative_cancelled{0}; + std::atomic non_cooperative_finished{0}; + std::atomic winner_finished{0}; + bool when_any_completed = false; + + auto fast_task = [&]() -> task { + ++winner_finished; + co_return 1; + }; + + auto cooperative_slow = [&](int steps) -> task { + for (int i = 0; i < steps; ++i) { + auto token = co_await get_stop_token(); + if (token.stop_requested()) { + ++cooperative_cancelled; + co_return -1; + } + co_await yield_awaitable{}; + } + co_return 2; + }; + + auto non_cooperative_slow = [&](int steps) -> task { + for (int i = 0; i < steps; ++i) { + co_await yield_awaitable{}; + } + ++non_cooperative_finished; + co_return 3; + }; + + run_async(ex, + [&](when_any_result_type r) { + when_any_completed = true; + BOOST_TEST_EQ(r.first, 0u); + }, + [](std::exception_ptr) {})( + when_any(fast_task(), cooperative_slow(5), non_cooperative_slow(5))); + + while (!work_queue.empty()) { + auto h = work_queue.front(); + work_queue.erase(work_queue.begin()); + h.resume(); + } + + BOOST_TEST(when_any_completed); + BOOST_TEST_EQ(winner_finished.load(), 1); + BOOST_TEST_EQ(cooperative_cancelled.load(), 1); + BOOST_TEST_EQ(non_cooperative_finished.load(), 1); + } + + //---------------------------------------------------------- + // Nested when_any tests + //---------------------------------------------------------- + + /** + * @brief Verifies that a nested when_any correctly reports the winning task and its value. + * + * Creates two inner when_any operations that each yield an `int`, runs an outer when_any + * over those inner tasks, and asserts the reported winner index and the integer value + * returned by the winning inner task. + */ + void + testNestedWhenAny() + { + int dispatch_count = 0; + test_executor ex(dispatch_count); + bool completed = false; + int result = 0; + + auto inner1 = []() -> task { + auto [idx, res] = co_await when_any(returns_int(10), returns_int(20)); + co_return std::get(res); + }; + + auto inner2 = []() -> task { + auto [idx, res] = co_await when_any(returns_int(30), returns_int(40)); + co_return std::get(res); + }; + + std::size_t winner_index = 999; + + run_async(ex, + [&](when_any_result_type r) { + completed = true; + winner_index = r.first; + result = std::get(r.second); + }, + [](std::exception_ptr) {})( + when_any(inner1(), inner2())); + + BOOST_TEST(completed); + BOOST_TEST(winner_index == 0 || winner_index == 1); + // inner1 returns 10 or 20, inner2 returns 30 or 40 + if (winner_index == 0) + BOOST_TEST(result == 10 || result == 20); + else + BOOST_TEST(result == 30 || result == 40); + } + + /** + * @brief Verifies that a when_any combinator nested inside a when_all correctly reports winners. + * + * Executes two independent when_any races inside a when_all and asserts that the combined + * awaiter receives the winner values from each race and that the overall operation completes. + * + * The test checks that the first result is either 1 or 2 and the second result is either 3 or 4, + * and that the completion callback is invoked. + */ + void + testWhenAnyInsideWhenAll() + { + int dispatch_count = 0; + test_executor ex(dispatch_count); + bool completed = false; + + auto race1 = []() -> task { + auto [idx, res] = co_await when_any(returns_int(1), returns_int(2)); + co_return std::get(res); + }; + + auto race2 = []() -> task { + auto [idx, res] = co_await when_any(returns_int(3), returns_int(4)); + co_return std::get(res); + }; + + run_async(ex, + [&](std::tuple t) { + auto [a, b] = t; + completed = true; + BOOST_TEST((a == 1 || a == 2)); + BOOST_TEST((b == 3 || b == 4)); + }, + [](std::exception_ptr) {})( + when_all(race1(), race2())); + + BOOST_TEST(completed); + } + + /** + * @brief Verifies that a when_any composed of two tasks that each await when_all + * correctly reports the winning task and its combined integer result. + * + * Spawns two tasks where each task awaits when_all on two int-returning tasks and returns their sum. + * Executes when_any over these tasks and asserts that the completion callback observes a valid + * winner index (0 or 1) and the corresponding summed value (3 for the first task, 7 for the second). + */ + void + testWhenAllInsideWhenAny() + { + int dispatch_count = 0; + test_executor ex(dispatch_count); + bool completed = false; + std::size_t winner_index = 999; + int result_value = 0; + + auto concurrent1 = []() -> task { + auto [a, b] = co_await when_all(returns_int(1), returns_int(2)); + co_return a + b; + }; + + auto concurrent2 = []() -> task { + auto [a, b] = co_await when_all(returns_int(3), returns_int(4)); + co_return a + b; + }; + + run_async(ex, + [&](when_any_result_type r) { + completed = true; + winner_index = r.first; + result_value = std::get(r.second); + }, + [](std::exception_ptr) {})( + when_any(concurrent1(), concurrent2())); + + BOOST_TEST(completed); + BOOST_TEST(winner_index == 0 || winner_index == 1); + // concurrent1 returns 1+2=3, concurrent2 returns 3+4=7 + if (winner_index == 0) + BOOST_TEST_EQ(result_value, 3); + else + BOOST_TEST_EQ(result_value, 7); + } + + //---------------------------------------------------------- + // Edge case tests + //---------------------------------------------------------- + + /** + * @brief Verifies when_any with many integer-returning tasks reports the correct winner. + * + * Runs when_any over eight tasks that return integers 1..8 and asserts that a winner + * is reported (completed == true), the winner index is within range, and the returned + * integer matches the mapping index -> value (index 0 -> 1, index 1 -> 2, etc.). + */ + void + testManyTasks() + { + int dispatch_count = 0; + test_executor ex(dispatch_count); + bool completed = false; + std::size_t winner_index = 999; + int result_value = 0; + + run_async(ex, + [&](auto r) { + completed = true; + winner_index = r.first; + result_value = std::get(r.second); + }, + [](std::exception_ptr) {})(when_any( + returns_int(1), returns_int(2), returns_int(3), returns_int(4), + returns_int(5), returns_int(6), returns_int(7), returns_int(8))); + + BOOST_TEST(completed); + BOOST_TEST(winner_index < 8); + // Verify correct index-to-value mapping (index 0 -> value 1, etc.) + BOOST_TEST_EQ(result_value, static_cast(winner_index + 1)); + } + + /** + * @brief Creates a task that accumulates two awaited integer results onto an initial value. + * + * @param start Initial integer value to accumulate onto. + * @return int Final accumulated value: start + 1 + 2. + */ + static task + multi_step_task(int start) + { + int value = start; + value += co_await returns_int(1); + value += co_await returns_int(2); + co_return value; + } + + /** + * @brief Tests that when_any correctly selects the first-completing task across multi-step tasks. + * + * Runs two multi-step tasks (producing ints) via when_any and verifies that a winner index of 0 or 1 + * is reported and that the associated value matches the expected sum for the winning task. + * + * Expected outcomes: + * - winner index is either 0 or 1. + * - if winner index == 0, reported value is 13 (10 + 1 + 2). + * - if winner index == 1, reported value is 23 (20 + 1 + 2). + */ + void + testTasksWithMultipleSteps() + { + int dispatch_count = 0; + test_executor ex(dispatch_count); + bool completed = false; + std::size_t winner_index = 999; + int result_value = 0; + + run_async(ex, + [&](when_any_result_type r) { + completed = true; + winner_index = r.first; + result_value = std::get(r.second); + }, + [](std::exception_ptr) {})( + when_any(multi_step_task(10), multi_step_task(20))); + + BOOST_TEST(completed); + BOOST_TEST(winner_index == 0 || winner_index == 1); + // Index 0: 10+1+2=13, Index 1: 20+1+2=23 + if (winner_index == 0) + BOOST_TEST_EQ(result_value, 13); + else + BOOST_TEST_EQ(result_value, 23); + } + + //---------------------------------------------------------- + // Awaitable lifecycle tests + //---------------------------------------------------------- + + /** + * @brief Verifies that a `when_any` awaitable can be move-constructed and still be awaited. + * + * Creates a `when_any` awaitable for two int-returning tasks, move-constructs it, awaits + * the moved awaitable via the test executor, and asserts that a valid winner index and + * corresponding int value are produced. + */ + void + testAwaitableMoveConstruction() + { + int dispatch_count = 0; + test_executor ex(dispatch_count); + bool completed = false; + std::size_t winner_index = 999; + int result_value = 0; + + auto awaitable1 = when_any(returns_int(1), returns_int(2)); + auto awaitable2 = std::move(awaitable1); + + run_async(ex, + [&](when_any_result_type r) { + completed = true; + winner_index = r.first; + result_value = std::get(r.second); + }, + [](std::exception_ptr) {})(std::move(awaitable2)); + + BOOST_TEST(completed); + BOOST_TEST(winner_index == 0 || winner_index == 1); + if (winner_index == 0) + BOOST_TEST_EQ(result_value, 1); + else + BOOST_TEST_EQ(result_value, 2); + } + + /** + * @brief Verifies that a when_any awaitable can be stored and awaited at a later time. + * + * Creates two integer-returning tasks, stores their combined when_any awaitable, then + * passes it to a delayed awaiter. Asserts that the awaiter observes a valid winner + * index (0 or 1) and that the contained integer matches the corresponding task value. + */ + void + testDeferredAwait() + { + int dispatch_count = 0; + test_executor ex(dispatch_count); + bool completed = false; + std::size_t winner_index = 999; + int result_value = 0; + + auto deferred = when_any(returns_int(10), returns_int(20)); + + run_async(ex, + [&](when_any_result_type r) { + completed = true; + winner_index = r.first; + result_value = std::get(r.second); + }, + [](std::exception_ptr) {})(std::move(deferred)); + + BOOST_TEST(completed); + BOOST_TEST(winner_index == 0 || winner_index == 1); + if (winner_index == 0) + BOOST_TEST_EQ(result_value, 10); + else + BOOST_TEST_EQ(result_value, 20); + } + + //---------------------------------------------------------- + // Protocol compliance tests + //---------------------------------------------------------- + + /** + * @brief Ensures compile-time conformance of when_any results to the IoAwaitable concept. + * + * Performs static assertions that `task>` satisfies + * `IoAwaitable<..., executor_ref>` for representative type combinations + * (homogeneous, heterogeneous, and `void`-containing results). + */ + void + testIoAwaitableConcept() + { + static_assert(IoAwaitable< + task>, + executor_ref>); + + static_assert(IoAwaitable< + task>, + executor_ref>); + + static_assert(IoAwaitable< + task>, + executor_ref>); + + static_assert(IoAwaitable< + task>, + executor_ref>); + } + + //---------------------------------------------------------- + // Variant access tests + //---------------------------------------------------------- + + /** + * @brief Verifies that when_any populates the correct variant alternative for deduplicated result types. + * + * This test invokes when_any with tasks producing `int`, `std::string`, and `int` (duplicate `int`). + * It asserts that duplicate types are deduplicated in the resulting variant, the winner index is 0 + * for a synchronous executor, and the variant holds the expected `int` value (42). + */ + void + testVariantAlternativePopulated() + { + int dispatch_count = 0; + test_executor ex(dispatch_count); + bool completed = false; + + // Note: deduplicates to variant + run_async(ex, + [&](when_any_result_type r) { + completed = true; + // With synchronous executor, first task wins + BOOST_TEST_EQ(r.first, 0u); + BOOST_TEST(std::holds_alternative(r.second)); + BOOST_TEST_EQ(std::get(r.second), 42); + }, + [](std::exception_ptr) {})( + when_any(returns_int(42), returns_string("hello"), returns_int(99))); + + BOOST_TEST(completed); + } + + /** + * @brief Verifies that when_any's result variant holds the correct alternative for int and string tasks. + * + * Runs two tasks (one returning `int`, one returning `std::string`) with `when_any` and asserts that: + * - the combinator completes, + * - the reported winner index is either 0 or 1, + * - the variant contains the corresponding value (`int` when index 0, `std::string` when index 1). + */ + void + testVariantVisit() + { + int dispatch_count = 0; + test_executor ex(dispatch_count); + bool completed = false; + std::size_t winner_index = 999; + std::variant result_value; + + run_async(ex, + [&](when_any_result_type r) { + completed = true; + winner_index = r.first; + result_value = r.second; + }, + [](std::exception_ptr) {})( + when_any(returns_int(42), returns_string("hello"))); + + BOOST_TEST(completed); + BOOST_TEST(winner_index == 0 || winner_index == 1); + if (winner_index == 0) + BOOST_TEST_EQ(std::get(result_value), 42); + else + BOOST_TEST_EQ(std::get(result_value), "hello"); + } + + //---------------------------------------------------------- + // Parent stop token propagation tests + //---------------------------------------------------------- + + /** + * @brief Verifies that a parent stop token already requested before starting when_any + * + * Tests that when a parent stop token is requested prior to invoking when_any, each + * child task observes the stop request (via get_stop_token) on its first suspension + * and the when_any operation completes with a reported winner index. + * + * The test asserts that when_any completes and that all launched tasks saw the + * stop token as requested. + */ + void + testParentStopAlreadyRequested() + { + std::vector work_queue; + queuing_executor ex(work_queue); + + std::atomic saw_stop_count{0}; + bool when_any_completed = false; + std::size_t winner_index = 999; + + // A task that checks stop token on first suspension + auto check_stop_task = [&](int id) -> task { + auto token = co_await get_stop_token(); + if (token.stop_requested()) { + ++saw_stop_count; + } + co_return id; + }; + + // Use a stop_source to simulate parent cancellation + std::stop_source parent_stop; + parent_stop.request_stop(); + + // Use run_async with stop_token parameter to test propagation + run_async(ex, parent_stop.get_token(), + [&](when_any_result_type r) { + when_any_completed = true; + winner_index = r.first; + }, + [](std::exception_ptr) {})( + when_any(check_stop_task(1), check_stop_task(2), check_stop_task(3))); + + while (!work_queue.empty()) { + auto h = work_queue.front(); + work_queue.erase(work_queue.begin()); + h.resume(); + } + + BOOST_TEST(when_any_completed); + // All tasks should have seen the stop token as requested + // (inherited from parent) + BOOST_TEST_EQ(saw_stop_count.load(), 3); + } + + /** + * @brief Verifies that a parent stop request during task execution cancels all child tasks and completes when_any. + * + * Spawns two cooperative long-running tasks via when_any on a queuing executor, starts a few scheduling iterations, + * then requests stop from the parent stop_source. Observes that the when_any completion callback runs and that both + * tasks observe the stop request and increment the cancellation counter. + * + * Observable effects: + * - Sets the external flag indicating when_any completed. + * - Increments the provided atomic cancellation counter for each task that observes the stop. + */ + void + testParentStopDuringExecution() + { + std::vector work_queue; + queuing_executor ex(work_queue); + + std::atomic cancelled_count{0}; + bool when_any_completed = false; + + auto slow_task = [&](int id, int steps) -> task { + for (int i = 0; i < steps; ++i) { + auto token = co_await get_stop_token(); + if (token.stop_requested()) { + ++cancelled_count; + co_return -1; + } + co_await yield_awaitable{}; + } + co_return id; + }; + + std::stop_source parent_stop; + + // Use run_async with stop_token parameter + run_async(ex, parent_stop.get_token(), + [&](when_any_result_type) { + when_any_completed = true; + }, + [](std::exception_ptr) {})( + when_any(slow_task(1, 10), slow_task(2, 10))); + + // Run a few iterations, then request parent stop + for (int i = 0; i < 3 && !work_queue.empty(); ++i) { + auto h = work_queue.front(); + work_queue.erase(work_queue.begin()); + h.resume(); + } + + // Request stop from parent + parent_stop.request_stop(); + + // Finish processing + while (!work_queue.empty()) { + auto h = work_queue.front(); + work_queue.erase(work_queue.begin()); + h.resume(); + } + + BOOST_TEST(when_any_completed); + // Both tasks should have been cancelled by parent stop + BOOST_TEST_EQ(cancelled_count.load(), 2); + } + + //---------------------------------------------------------- + // Interleaved exception tests + //---------------------------------------------------------- + + /** + * @brief Verifies that when_any reports the first exception produced when multiple tasks throw. + * + * Schedules three tasks that each throw a test_exception after a configurable number of yields, + * runs them on a queuing executor until completion, and asserts that the first-thrown exception + * is observed and carries the expected message ("error_2"). + */ + void + testInterleavedExceptions() + { + std::vector work_queue; + queuing_executor ex(work_queue); + + bool caught_exception = false; + std::string error_msg; + + // Tasks that yield before throwing + auto delayed_throw = [](int id, int yields) -> task { + for (int i = 0; i < yields; ++i) { + co_await yield_awaitable{}; + } + throw test_exception(("error_" + std::to_string(id)).c_str()); + co_return id; + }; + + run_async(ex, + [](when_any_result_type) {}, + [&](std::exception_ptr ep) { + try { + std::rethrow_exception(ep); + } catch (test_exception const& e) { + caught_exception = true; + error_msg = e.what(); + } + })(when_any(delayed_throw(1, 2), delayed_throw(2, 1), delayed_throw(3, 3))); + + while (!work_queue.empty()) { + auto h = work_queue.front(); + work_queue.erase(work_queue.begin()); + h.resume(); + } + + BOOST_TEST(caught_exception); + // Task 2 throws first (after 1 yield) + BOOST_TEST_EQ(error_msg, "error_2"); + } + + //---------------------------------------------------------- + // Nested stop propagation tests + //---------------------------------------------------------- + + /** + * @brief Tests that stop requests propagate to a nested when_any so the outer task is cancelled before launching the inner work. + * + * Verifies that when one branch of a when_any completes quickly, other branches observe the stop request: + * the fast task should win the race, the when_any completion callback should record the winner index 0, + * and the nested branch should observe the stop token and increment the cancellation counter instead of running inner logic. + */ + void + testNestedStopPropagationOuterCancelled() + { + std::vector work_queue; + queuing_executor ex(work_queue); + + std::atomic outer_cancelled{0}; + bool when_any_completed = false; + std::size_t winner_index = 999; + + auto fast_task = [&]() -> task { + co_return 42; + }; + + // A task that checks stop before launching inner when_any + auto nested_when_any_task = [&]() -> task { + auto token = co_await get_stop_token(); + if (token.stop_requested()) { + ++outer_cancelled; + co_return -1; + } + // Won't reach here if stopped + co_return 100; + }; + + run_async(ex, + [&](when_any_result_type r) { + when_any_completed = true; + winner_index = r.first; + }, + [](std::exception_ptr) {})( + when_any(fast_task(), nested_when_any_task())); + + while (!work_queue.empty()) { + auto h = work_queue.front(); + work_queue.erase(work_queue.begin()); + h.resume(); + } + + BOOST_TEST(when_any_completed); + BOOST_TEST_EQ(winner_index, 0u); // fast_task wins + // The nested task should see stop and exit early + BOOST_TEST_EQ(outer_cancelled.load(), 1); + } + + /** + * @brief Tests that stop requests propagate into a nested when_any so its children observe cancellation. + * + * Sets up a nested when_any inside a task and races it against a fast yielding task using a queuing_executor. + * Observes the resulting winner and verifies cancellation/completion counts on the nested children: + * - If the outer fast task wins, both inner tasks must observe a stop request (inner_cancelled == 2, inner_completed == 0). + * - If the nested when_any wins, one inner task completes and the other is cancelled (inner_completed == 1, inner_cancelled == 1). + * + * The test also asserts that the outer when_any completes and reports a valid winner index (0 or 1). + */ + void + testNestedStopPropagationInnerCancelled() + { + std::vector work_queue; + queuing_executor ex(work_queue); + + std::atomic inner_cancelled{0}; + std::atomic inner_completed{0}; + bool when_any_completed = false; + std::size_t winner_index = 999; + + // Fast task that yields first to let nested when_any start + auto yielding_fast_task = [&]() -> task { + co_await yield_awaitable{}; + co_return 42; + }; + + auto slow_inner_task = [&](int steps) -> task { + for (int i = 0; i < steps; ++i) { + auto token = co_await get_stop_token(); + if (token.stop_requested()) { + ++inner_cancelled; + co_return -1; + } + co_await yield_awaitable{}; + } + ++inner_completed; + co_return 100; + }; + + // A task containing a nested when_any - doesn't check stop first + auto nested_when_any_task = [&]() -> task { + // Start inner when_any immediately (no stop check first) + auto [idx, res] = co_await when_any( + slow_inner_task(10), + slow_inner_task(10)); + co_return std::get(res); + }; + + run_async(ex, + [&](when_any_result_type r) { + when_any_completed = true; + winner_index = r.first; + }, + [](std::exception_ptr) {})( + when_any(yielding_fast_task(), nested_when_any_task())); + + while (!work_queue.empty()) { + auto h = work_queue.front(); + work_queue.erase(work_queue.begin()); + h.resume(); + } + + BOOST_TEST(when_any_completed); + // One of them should win + BOOST_TEST(winner_index == 0 || winner_index == 1); + + if (winner_index == 0) { + // If yielding_fast_task won, the inner tasks should be cancelled + BOOST_TEST_EQ(inner_cancelled.load(), 2); + BOOST_TEST_EQ(inner_completed.load(), 0); + } else { + // If nested_when_any_task won (one of its inner tasks completed) + // one inner task completes, other gets cancelled + BOOST_TEST_EQ(inner_completed.load(), 1); + BOOST_TEST_EQ(inner_cancelled.load(), 1); + } + } + + //---------------------------------------------------------- + // Variant usage pattern tests + //---------------------------------------------------------- + + /** + * @brief Verifies accessing a when_any result variant by the reported index. + * + * Confirms that the index in a when_any_result_type + * correctly identifies which variant alternative is active and that the stored + * value matches the expected value for each alternative. + */ + void + testVariantAccessByIndex() + { + int dispatch_count = 0; + test_executor ex(dispatch_count); + bool completed = false; + bool correct_access = false; + + run_async(ex, + [&](when_any_result_type r) { + completed = true; + // The correct pattern: use index to determine which type to access + switch (r.first) { + case 0: + correct_access = std::holds_alternative(r.second); + BOOST_TEST_EQ(std::get(r.second), 42); + break; + case 1: + correct_access = std::holds_alternative(r.second); + BOOST_TEST_EQ(std::get(r.second), "hello"); + break; + case 2: + correct_access = std::holds_alternative(r.second); + BOOST_TEST_EQ(std::get(r.second), 3.14); + break; + } + }, + [](std::exception_ptr) {})( + when_any(returns_int(42), returns_string("hello"), []() -> task { co_return 3.14; }())); + + BOOST_TEST(completed); + BOOST_TEST(correct_access); + } + + /** + * @brief Tests that when_any deduplicates identical result types in the variant + * while preserving the task index to disambiguate which task completed. + * + * Sets up three int-returning tasks passed to when_any and awaits the result. + * Verifies the returned variant holds an `int` (deduplicated alternative) and + * the accompanying index identifies which of the original tasks completed. + * Also verifies completion and the expected value for a synchronous executor + * (the first task should win). + */ + void + testVariantDuplicateTypesIndexDisambiguation() + { + int dispatch_count = 0; + test_executor ex(dispatch_count); + bool completed = false; + std::size_t winner_index = 999; + int result_value = 0; + + // when_any(int, int, int) deduplicates to variant + // but winner_index tells us WHICH task won + run_async(ex, + [&](when_any_result_type r) { + completed = true; + winner_index = r.first; + result_value = std::get(r.second); + }, + [](std::exception_ptr) {})( + when_any(returns_int(100), returns_int(200), returns_int(300))); + + BOOST_TEST(completed); + // With synchronous executor, first task wins + BOOST_TEST_EQ(winner_index, 0u); + BOOST_TEST_EQ(result_value, 100); + } + + /** + * @brief Executes the complete when_any test suite. + * + * Invokes every individual test case in a fixed sequence to validate + * functionality, exception handling, stop-token propagation, cancellation, + * nested combinators, edge cases, awaitable lifecycle, protocol compliance, + * and variant access behaviors. + */ + void + run() + { + // Basic functionality + testSingleTask(); + testTwoTasksFirstWins(); + testMixedTypes(); + testVoidTaskWins(); + testAllVoidTasks(); + + // Exception handling + testSingleTaskException(); + testExceptionWinsRace(); + testVoidTaskException(); + testMultipleExceptionsFirstWins(); + + // Stop token propagation + testStopRequestedOnCompletion(); + testAllTasksCompleteForCleanup(); + + // Parent stop token propagation + testParentStopAlreadyRequested(); + testParentStopDuringExecution(); + + // Long-lived task cancellation + testLongLivedTasksCancelledOnWinner(); + testSlowTaskCanWin(); + testNonCooperativeTasksStillComplete(); + testMixedCooperativeAndNonCooperativeTasks(); + + // Interleaved exceptions + testInterleavedExceptions(); + + // Nested combinators + testNestedWhenAny(); + testWhenAnyInsideWhenAll(); + testWhenAllInsideWhenAny(); + + // Nested stop propagation + testNestedStopPropagationOuterCancelled(); + testNestedStopPropagationInnerCancelled(); + + // Edge cases + testManyTasks(); + testTasksWithMultipleSteps(); + + // Awaitable lifecycle + testAwaitableMoveConstruction(); + testDeferredAwait(); + + // Protocol compliance + testIoAwaitableConcept(); + + // Variant access + testVariantAlternativePopulated(); + testVariantVisit(); + testVariantAccessByIndex(); + testVariantDuplicateTypesIndexDisambiguation(); + } +}; + +TEST_SUITE( + when_any_test, + "boost.capy.when_any"); + +//---------------------------------------------------------- +// Homogeneous when_any tests (vector overload) +//---------------------------------------------------------- + +struct when_any_vector_test +{ + //---------------------------------------------------------- + // Basic functionality tests + //---------------------------------------------------------- + + // Test: Single task in vector + void + testSingleTaskVector() + { + int dispatch_count = 0; + test_executor ex(dispatch_count); + bool completed = false; + int result = 0; + std::size_t winner_index = 999; + + std::vector> tasks; + tasks.push_back(returns_int(42)); + + run_async(ex, + [&](std::pair r) { + completed = true; + winner_index = r.first; + result = r.second; + }, + [](std::exception_ptr) {})( + when_any(std::move(tasks))); + + BOOST_TEST(completed); + BOOST_TEST_EQ(winner_index, 0u); + BOOST_TEST_EQ(result, 42); + } + + /** + * @brief Tests when_any with a vector of int-returning tasks and verifies the reported winner. + * + * @details Constructs three tasks that return 10, 20, and 30 respectively, runs them with a + * test executor via when_any(std::vector>), and asserts that: + * - the combinator completes, + * - the reported winner index is within the vector range, + * - the returned value matches the expected mapping (index 0 -> 10, index 1 -> 20, index 2 -> 30). + */ + void + testMultipleTasksVector() + { + int dispatch_count = 0; + test_executor ex(dispatch_count); + bool completed = false; + std::size_t winner_index = 999; + int result_value = 0; + + std::vector> tasks; + tasks.push_back(returns_int(10)); + tasks.push_back(returns_int(20)); + tasks.push_back(returns_int(30)); + + run_async(ex, + [&](std::pair r) { + completed = true; + winner_index = r.first; + result_value = r.second; + }, + [](std::exception_ptr) {})( + when_any(std::move(tasks))); + + BOOST_TEST(completed); + BOOST_TEST(winner_index < 3); + // Verify correct index-to-value mapping + BOOST_TEST_EQ(result_value, static_cast((winner_index + 1) * 10)); + } + + /** + * @brief Verifies that calling when_any with an empty vector of tasks results in an `std::invalid_argument` exception. + * + * Constructs an empty `std::vector>`, invokes `when_any` with it via `run_async`, and asserts that an + * `std::invalid_argument` is propagated to the error handler. + */ + void + testEmptyVectorThrows() + { + int dispatch_count = 0; + test_executor ex(dispatch_count); + bool caught_exception = false; + + std::vector> tasks; + + run_async(ex, + [](std::pair) {}, + [&](std::exception_ptr ep) { + try { + std::rethrow_exception(ep); + } catch (std::invalid_argument const&) { + caught_exception = true; + } + })(when_any(std::move(tasks))); + + BOOST_TEST(caught_exception); + } + + /** + * @brief Tests that when_any on a vector of void tasks completes and reports a valid winner. + * + * Runs when_any over three void-returning tasks and verifies the continuation is invoked + * and the reported winner index is within the bounds of the task vector (0..2). + */ + void + testVoidTasksVector() + { + int dispatch_count = 0; + test_executor ex(dispatch_count); + bool completed = false; + std::size_t winner_index = 999; + + std::vector> tasks; + tasks.push_back(void_task()); + tasks.push_back(void_task()); + tasks.push_back(void_task()); + + run_async(ex, + [&](std::size_t idx) { + completed = true; + winner_index = idx; + }, + [](std::exception_ptr) {})( + when_any(std::move(tasks))); + + BOOST_TEST(completed); + BOOST_TEST(winner_index < 3); + } + + //---------------------------------------------------------- + // Exception handling tests + //---------------------------------------------------------- + + /** + * @brief Verifies that an exception thrown by a task in a vector passed to `when_any` is propagated to the awaiter. + * + * Schedules a single `task` that throws `test_exception`, awaits `when_any` on a vector containing that task, + * and asserts the awaiter receives the exception with the expected message. + */ + void + testExceptionInVector() + { + int dispatch_count = 0; + test_executor ex(dispatch_count); + bool caught_exception = false; + std::string error_msg; + + std::vector> tasks; + tasks.push_back(throws_exception("vector error")); + + run_async(ex, + [](std::pair) {}, + [&](std::exception_ptr ep) { + try { + std::rethrow_exception(ep); + } catch (test_exception const& e) { + caught_exception = true; + error_msg = e.what(); + } + })(when_any(std::move(tasks))); + + BOOST_TEST(caught_exception); + BOOST_TEST_EQ(error_msg, "vector error"); + } + + /** + * @brief Verifies that an exception from a vector task wins the when_any race and is propagated. + * + * Constructs a vector of three tasks (one that throws a test_exception and two that return ints), + * runs a vector-based when_any, and asserts that the thrown test_exception is observed and its + * message equals "winner". + */ + void + testExceptionWinsRaceVector() + { + int dispatch_count = 0; + test_executor ex(dispatch_count); + bool caught_exception = false; + std::string error_msg; + + std::vector> tasks; + tasks.push_back(throws_exception("winner")); + tasks.push_back(returns_int(42)); + tasks.push_back(returns_int(99)); + + run_async(ex, + [](std::pair) {}, + [&](std::exception_ptr ep) { + try { + std::rethrow_exception(ep); + } catch (test_exception const& e) { + caught_exception = true; + error_msg = e.what(); + } + })(when_any(std::move(tasks))); + + BOOST_TEST(caught_exception); + BOOST_TEST_EQ(error_msg, "winner"); + } + + /** + * @brief Verifies exception propagation for a void-task in a vector-based when_any. + * + * Confirms that when_any executed on a vector of void tasks forwards an exception + * thrown by one of the tasks to the provided exception handler and that the + * original exception message is preserved. + * + * @details The test asserts that the exception handler is invoked and that the + * caught test_exception's what() equals "void vector error". + */ + void + testVoidExceptionInVector() + { + int dispatch_count = 0; + test_executor ex(dispatch_count); + bool caught_exception = false; + std::string error_msg; + + std::vector> tasks; + tasks.push_back(void_throws_exception("void vector error")); + tasks.push_back(void_task()); + + run_async(ex, + [](std::size_t) {}, + [&](std::exception_ptr ep) { + try { + std::rethrow_exception(ep); + } catch (test_exception const& e) { + caught_exception = true; + error_msg = e.what(); + } + })(when_any(std::move(tasks))); + + BOOST_TEST(caught_exception); + BOOST_TEST_EQ(error_msg, "void vector error"); + } + + //---------------------------------------------------------- + // Stop token propagation tests + //---------------------------------------------------------- + + /** + * @brief Verifies that all tasks in a vector are allowed to complete for cleanup after a winning task is selected. + * + * Runs a vector-based when_any where each task increments a shared counter on start and then returns. + * Asserts that the when_any completion handler runs and that every task in the vector has executed + * (completion counter equals the number of tasks). + */ + void + testAllTasksCompleteForCleanupVector() + { + int dispatch_count = 0; + test_executor ex(dispatch_count); + std::atomic completion_count{0}; + bool completed = false; + + auto counting_task = [&](int value) -> task { + ++completion_count; + co_return value; + }; + + std::vector> tasks; + tasks.push_back(counting_task(1)); + tasks.push_back(counting_task(2)); + tasks.push_back(counting_task(3)); + tasks.push_back(counting_task(4)); + + run_async(ex, + [&](std::pair) { + completed = true; + }, + [](std::exception_ptr) {})( + when_any(std::move(tasks))); + + BOOST_TEST(completed); + // All four tasks must complete for proper cleanup + BOOST_TEST_EQ(completion_count.load(), 4); + } + + //---------------------------------------------------------- + // Long-lived task cancellation tests (vector) + //---------------------------------------------------------- + + // Test: Long-lived tasks cancelled on winner (vector) + void + testLongLivedTasksCancelledVector() + { + std::vector work_queue; + queuing_executor ex(work_queue); + + std::atomic cancelled_count{0}; + std::atomic completed_normally_count{0}; + bool when_any_completed = false; + std::size_t winner_index = 999; + int winner_value = 0; + + auto fast_task = [&]() -> task { + ++completed_normally_count; + co_return 42; + }; + + auto slow_task = [&](int id, int steps) -> task { + for (int i = 0; i < steps; ++i) { + auto token = co_await get_stop_token(); + if (token.stop_requested()) { + ++cancelled_count; + co_return -1; + } + co_await yield_awaitable{}; + } + ++completed_normally_count; + co_return id; + }; + + std::vector> tasks; + tasks.push_back(fast_task()); + tasks.push_back(slow_task(100, 10)); + tasks.push_back(slow_task(200, 10)); + + run_async(ex, + [&](std::pair r) { + when_any_completed = true; + winner_index = r.first; + winner_value = r.second; + }, + [](std::exception_ptr) {})( + when_any(std::move(tasks))); + + while (!work_queue.empty()) { + auto h = work_queue.front(); + work_queue.erase(work_queue.begin()); + h.resume(); + } + + BOOST_TEST(when_any_completed); + BOOST_TEST_EQ(winner_index, 0u); + BOOST_TEST_EQ(winner_value, 42); + BOOST_TEST_EQ(completed_normally_count.load(), 1); + BOOST_TEST_EQ(cancelled_count.load(), 2); + } + + //---------------------------------------------------------- + // Large vector tests + //---------------------------------------------------------- + + /** + * @brief Verifies when_any over a vector of int tasks reports a valid winner and correct value mapping. + * + * Runs when_any on 20 tasks that return the integers 1 through 20 using a test_executor, + * and asserts that the operation completes, the reported winner index is within range, + * and the returned value equals `winner_index + 1`. + */ + void + testManyTasksVector() + { + int dispatch_count = 0; + test_executor ex(dispatch_count); + bool completed = false; + std::size_t winner_index = 999; + int result_value = 0; + + std::vector> tasks; + for (int i = 1; i <= 20; ++i) + tasks.push_back(returns_int(i)); + + run_async(ex, + [&](std::pair r) { + completed = true; + winner_index = r.first; + result_value = r.second; + }, + [](std::exception_ptr) {})( + when_any(std::move(tasks))); + + BOOST_TEST(completed); + BOOST_TEST(winner_index < 20); + // Verify correct index-to-value mapping (index 0 -> value 1, etc.) + BOOST_TEST_EQ(result_value, static_cast(winner_index + 1)); + } + + //---------------------------------------------------------- + // Nested combinator tests + //---------------------------------------------------------- + + /** + * @brief Verifies nested vector-based when_any correctly reports the winning task's result. + * + * Creates two inner tasks that use when_any on a vector of int-producing tasks, then runs + * an outer when_any over those inner tasks and asserts the outer result completes and + * yields either 10 or 20. + */ + void + testNestedWhenAnyVector() + { + int dispatch_count = 0; + test_executor ex(dispatch_count); + bool completed = false; + int result = 0; + + auto inner = []() -> task { + std::vector> tasks; + tasks.push_back(returns_int(10)); + tasks.push_back(returns_int(20)); + auto [idx, res] = co_await when_any(std::move(tasks)); + co_return res; + }; + + std::vector> outer_tasks; + outer_tasks.push_back(inner()); + outer_tasks.push_back(inner()); + + run_async(ex, + [&](std::pair r) { + completed = true; + result = r.second; + }, + [](std::exception_ptr) {})( + when_any(std::move(outer_tasks))); + + BOOST_TEST(completed); + BOOST_TEST(result == 10 || result == 20); + } + + /** + * @brief Verifies that a vector-based when_any used inside when_all produces valid winners and values. + * + * Launches two identical asynchronous "race" tasks (each performing a when_any on a vector of int tasks) + * inside a when_all and asserts that each race completes with a winner whose value is either 1 or 2, + * and that the outer when_all completion handler is invoked. + */ + void + testWhenAnyVectorInsideWhenAll() + { + int dispatch_count = 0; + test_executor ex(dispatch_count); + bool completed = false; + + auto race = []() -> task { + std::vector> tasks; + tasks.push_back(returns_int(1)); + tasks.push_back(returns_int(2)); + auto [idx, res] = co_await when_any(std::move(tasks)); + co_return res; + }; + + run_async(ex, + [&](std::tuple t) { + auto [a, b] = t; + completed = true; + BOOST_TEST((a == 1 || a == 2)); + BOOST_TEST((b == 1 || b == 2)); + }, + [](std::exception_ptr) {})( + when_all(race(), race())); + + BOOST_TEST(completed); + } + + //---------------------------------------------------------- + // Mixed variadic and vector tests + //---------------------------------------------------------- + + /** + * @brief Tests mixing variadic and vector overloads of `when_any` to verify winner selection and result mapping. + * + * This test launches two coroutines: one that races two `task` instances using the variadic + * `when_any` (producing 1 or 2) and one that races a `std::vector>` using the vector + * `when_any` (producing 3 or 4). It then races those two coroutines with an outer `when_any` + * and asserts that the reported winner index identifies which inner race completed and that the + * returned integer matches the expected value set for that inner race. The test also verifies + * the outer completion callback is invoked. + */ + void + testMixedVariadicAndVector() + { + int dispatch_count = 0; + test_executor ex(dispatch_count); + bool completed = false; + std::size_t outer_winner = 999; + + auto variadic_race = []() -> task { + auto [idx, res] = co_await when_any(returns_int(1), returns_int(2)); + co_return std::get(res); + }; + + auto vector_race = []() -> task { + std::vector> tasks; + tasks.push_back(returns_int(3)); + tasks.push_back(returns_int(4)); + auto [idx, res] = co_await when_any(std::move(tasks)); + co_return res; + }; + + run_async(ex, + [&](when_any_result_type r) { + completed = true; + outer_winner = r.first; + auto result = std::get(r.second); + if (outer_winner == 0) + BOOST_TEST((result == 1 || result == 2)); + else + BOOST_TEST((result == 3 || result == 4)); + }, + [](std::exception_ptr) {})( + when_any(variadic_race(), vector_race())); + + BOOST_TEST(completed); + } + + /** + * @brief Executes the complete suite of vector-based when_any tests. + * + * Runs every test in when_any_vector_test in a fixed sequence, covering basic + * functionality, exception handling, stop-token propagation, long-lived task + * cancellation, large-vector behavior, nested combinators, and mixed variadic + * + vector scenarios. + */ + void + run() + { + // Basic functionality + testSingleTaskVector(); + testMultipleTasksVector(); + testEmptyVectorThrows(); + testVoidTasksVector(); + + // Exception handling + testExceptionInVector(); + testExceptionWinsRaceVector(); + testVoidExceptionInVector(); + + // Stop token propagation + testAllTasksCompleteForCleanupVector(); + + // Long-lived task cancellation + testLongLivedTasksCancelledVector(); + + // Large vectors + testManyTasksVector(); + + // Nested combinators + testNestedWhenAnyVector(); + testWhenAnyVectorInsideWhenAll(); + + // Mixed variadic and vector + testMixedVariadicAndVector(); + } +}; + +TEST_SUITE( + when_any_vector_test, + "boost.capy.when_any_vector"); + +} // capy +} // boost \ No newline at end of file