diff --git a/doc/modules/ROOT/nav.adoc b/doc/modules/ROOT/nav.adoc index 66d2d570..2bf64043 100644 --- a/doc/modules/ROOT/nav.adoc +++ b/doc/modules/ROOT/nav.adoc @@ -4,6 +4,7 @@ ** xref:coroutines/tasks.adoc[Tasks] ** xref:coroutines/launching.adoc[Launching Tasks] ** xref:coroutines/when-all.adoc[Concurrent Composition] +** xref:coroutines/when-any.adoc[Racing Tasks] ** xref:coroutines/affinity.adoc[Executor Affinity] ** xref:coroutines/cancellation.adoc[Cancellation] * Execution diff --git a/doc/modules/ROOT/pages/coroutines/when-all.adoc b/doc/modules/ROOT/pages/coroutines/when-all.adoc index 0072b3c2..21a0b342 100644 --- a/doc/modules/ROOT/pages/coroutines/when-all.adoc +++ b/doc/modules/ROOT/pages/coroutines/when-all.adoc @@ -242,7 +242,7 @@ Use `when_all` when: Do NOT use `when_all` when: * Operations depend on each other — use sequential `co_await` -* You need results as they complete — consider `when_any` (not yet available) +* You need only the first result — use xref:when-any.adoc[when_any] * Memory is constrained — concurrent tasks consume more memory == Summary @@ -269,6 +269,7 @@ Do NOT use `when_all` when: == Next Steps +* xref:when-any.adoc[Racing Tasks] — Return first completion with `when_any` * xref:cancellation.adoc[Cancellation] — Stop token propagation * xref:../execution/thread-pool.adoc[Thread Pool] — Multi-threaded execution * xref:affinity.adoc[Executor Affinity] — Control where tasks run diff --git a/doc/modules/ROOT/pages/coroutines/when-any.adoc b/doc/modules/ROOT/pages/coroutines/when-any.adoc new file mode 100644 index 00000000..82622b5d --- /dev/null +++ b/doc/modules/ROOT/pages/coroutines/when-any.adoc @@ -0,0 +1,376 @@ +// +// Copyright (c) 2026 Michael Vandeberg +// +// Distributed under the Boost Software License, Version 1.0. (See accompanying +// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) +// +// Official repository: https://github.com/cppalliance/capy +// + += Racing Tasks + +This page explains how to race multiple tasks using `when_any`. + +NOTE: Code snippets assume `using namespace boost::capy;` is in effect. + +== The Problem + +Sometimes you need the result from whichever task finishes first, not all of +them. Common scenarios include: + +* Racing requests to multiple servers, using the first response +* Implementing timeouts by racing against a timer +* Speculative execution of multiple algorithms +* Waiting for first available resource from a pool + +== when_any + +The `when_any` function launches multiple tasks concurrently and returns when +the first one completes: + +[source,cpp] +---- +#include + +task race() +{ + auto [index, result] = co_await when_any( + fetch_from_primary(), + fetch_from_backup() + ); + // index is 0 or 1 (which task won) + // result contains the winner's value +} +---- + +The winning task's result is returned immediately. All sibling tasks receive +a stop request and are allowed to complete before `when_any` returns. + +== Return Value + +`when_any` returns a `std::pair` containing the winner's index and result. + +=== Heterogeneous Tasks (Variadic) + +When racing tasks with different return types, the result is a variant: + +[source,cpp] +---- +auto [index, result] = co_await when_any( + task_returning_int(), // task + task_returning_string() // task +); +// index is 0 or 1 +// result is std::variant + +if (index == 0) + std::cout << "Got int: " << std::get(result) << "\n"; +else + std::cout << "Got string: " << std::get(result) << "\n"; +---- + +=== Void Tasks + +Void tasks contribute `std::monostate` to the variant: + +[source,cpp] +---- +auto [index, result] = co_await when_any( + task_returning_int(), // task + task_void() // task +); +// result is std::variant + +if (index == 0) + std::cout << "Got int: " << std::get(result) << "\n"; +else + std::cout << "Void task completed\n"; +---- + +=== Duplicate Types + +The variant is deduplicated. When racing tasks with the same return type, +use the index to identify which task won: + +[source,cpp] +---- +auto [index, result] = co_await when_any( + fetch_from_server_a(), // task + fetch_from_server_b(), // task + fetch_from_server_c() // task +); +// result is std::variant (deduplicated) +// index tells you which server responded (0, 1, or 2) + +auto response = std::get(result); +std::cout << "Server " << index << " responded first\n"; +---- + +=== Homogeneous Tasks (Vector) + +For a dynamic number of tasks with the same type, use the vector overload: + +[source,cpp] +---- +std::vector> requests; +for (auto& server : servers) + requests.push_back(fetch_from(server)); + +auto [index, response] = co_await when_any(std::move(requests)); +// No variant needed - response is directly Response +std::cout << "Server " << index << " responded: " << response << "\n"; +---- + +The vector overload returns `std::pair` directly, without +a variant wrapper. + +For void tasks in a vector, only the index is returned: + +[source,cpp] +---- +std::vector> tasks; +// ... populate tasks + +std::size_t winner = co_await when_any(std::move(tasks)); +std::cout << "Task " << winner << " completed first\n"; +---- + +== Error Handling + +Exceptions are treated as valid completions. If the winning task throws, +that exception is rethrown from `when_any`: + +[source,cpp] +---- +task handle_errors() +{ + try { + auto [index, result] = co_await when_any( + might_fail(), + might_succeed() + ); + // If we get here, the winner succeeded + } catch (std::exception const& e) { + // The winning task threw this exception + std::cerr << "Winner failed: " << e.what() << "\n"; + } +} +---- + +=== First-Completion Semantics + +Unlike `when_all` (which captures the first _error_), `when_any` returns +whichever task completes first, whether it succeeds or fails. Exceptions +from non-winning tasks are discarded. + +=== Stop Propagation + +When a winner is determined, `when_any` requests stop for all sibling tasks. +Tasks that support cancellation can exit early: + +[source,cpp] +---- +task fetch_with_cancel_support() +{ + auto token = co_await get_stop_token(); + + for (auto& chunk : data_source) + { + if (token.stop_requested()) + co_return partial_response(); // Exit early + co_await send_chunk(chunk); + } + co_return complete_response(); +} + +task example() +{ + // When one fetch wins, the other sees stop_requested + auto [index, response] = co_await when_any( + fetch_with_cancel_support(), + fetch_with_cancel_support() + ); +} +---- + +Tasks that ignore the stop token will run to completion. `when_any` always +waits for all tasks to finish before returning, ensuring proper cleanup. + +== Parent Stop Token + +`when_any` forwards the parent's stop token to children. If the parent is +cancelled, all children see the request: + +[source,cpp] +---- +task parent() +{ + auto [index, result] = co_await when_any( + child_a(), // Sees parent's stop token + child_b() // Sees parent's stop token + ); +} + +std::stop_source source; +run_async(ex, source.get_token())(parent()); + +// Later: cancel everything +source.request_stop(); +---- + +== Execution Model + +All child tasks inherit the parent's executor affinity: + +[source,cpp] +---- +task parent() // Running on executor ex +{ + auto [index, result] = co_await when_any( + child_a(), // Runs on ex + child_b() // Runs on ex + ); +} +---- + +Children are launched via `dispatch()` on the executor, which may run them +inline or queue them depending on the executor implementation. + +=== True Concurrency + +With a multi-threaded executor, tasks race in parallel: + +[source,cpp] +---- +thread_pool pool(4); +run_async(pool.get_executor())(parent()); + +// Tasks may complete in any order based on actual execution time +---- + +With a single-threaded executor, tasks interleave at suspension points but +execute sequentially. + +== Example: Redundant Requests + +Race requests to multiple servers for reliability: + +[source,cpp] +---- +task fetch_with_redundancy(Request req) +{ + auto [index, response] = co_await when_any( + fetch_from(primary_server, req), + fetch_from(backup_server, req) + ); + + std::cout << (index == 0 ? "Primary" : "Backup") + << " server responded\n"; + co_return std::get(response); +} +---- + +== Example: Timeout Pattern + +Race an operation against a timer: + +[source,cpp] +---- +task fetch_with_timeout(Request req) +{ + auto [index, result] = co_await when_any( + fetch_data(req), + timeout_after(100ms) + ); + + if (index == 1) + throw timeout_error{"Request timed out"}; + + co_return std::get(result); +} + +// Helper that waits then throws +template +task timeout_after(std::chrono::milliseconds ms) +{ + co_await sleep(ms); + throw timeout_error{"Timeout"}; + co_return T{}; // Never reached +} +---- + +== Example: First Available Resource + +Wait for the first available connection from a pool: + +[source,cpp] +---- +task get_connection(std::vector& pools) +{ + std::vector> attempts; + for (auto& pool : pools) + attempts.push_back(pool.acquire()); + + auto [index, conn] = co_await when_any(std::move(attempts)); + + std::cout << "Got connection from pool " << index << "\n"; + co_return conn; +} +---- + +== Comparison with when_all + +[cols="1,2,2"] +|=== +| Aspect | `when_all` | `when_any` + +| Completion +| Waits for all tasks +| Returns on first completion + +| Return type +| Tuple of results +| Pair of (index, variant/value) + +| Error handling +| First exception wins, siblings get stop +| Exceptions are valid completions + +| Use case +| Need all results +| Need fastest result +|=== + +== Summary + +[cols="1,3"] +|=== +| Feature | Description + +| `when_any(tasks...)` +| Race tasks, return first completion + +| `when_any(vector>)` +| Race homogeneous tasks from a vector + +| Return type (variadic) +| `pair>` with deduplicated types + +| Return type (vector) +| `pair` or `size_t` for void + +| Error handling +| Winner's exception propagated, others discarded + +| Stop propagation +| Siblings receive stop request on winner + +| Cleanup +| All tasks complete before returning +|=== + +== Next Steps + +* xref:when-all.adoc[Concurrent Composition] — Wait for all tasks +* xref:cancellation.adoc[Cancellation] — Stop token propagation +* xref:../execution/thread-pool.adoc[Thread Pool] — Multi-threaded execution diff --git a/include/boost/capy.hpp b/include/boost/capy.hpp index 053adf57..3efc1124 100644 --- a/include/boost/capy.hpp +++ b/include/boost/capy.hpp @@ -46,5 +46,6 @@ #include #include #include +#include #endif diff --git a/include/boost/capy/when_all.hpp b/include/boost/capy/when_all.hpp index 0519838e..cd320208 100644 --- a/include/boost/capy/when_all.hpp +++ b/include/boost/capy/when_all.hpp @@ -322,7 +322,11 @@ class when_all_launcher state_->stop_source_.request_stop(); } - // Launch all tasks concurrently + // CRITICAL: If the last task finishes synchronously then the parent + // coroutine resumes, destroying its frame, and destroying this object + // prior to the completion of await_suspend. Therefore, await_suspend + // must ensure `this` cannot be referenced after calling `launch_one` + // for the last time. auto token = state_->stop_source_.get_token(); [&](std::index_sequence) { (..., launch_one(caller_ex, token)); diff --git a/include/boost/capy/when_any.hpp b/include/boost/capy/when_any.hpp new file mode 100644 index 00000000..a2527f81 --- /dev/null +++ b/include/boost/capy/when_any.hpp @@ -0,0 +1,1610 @@ +// +// Copyright (c) 2026 Michael Vandeberg +// +// Distributed under the Boost Software License, Version 1.0. (See accompanying +// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) +// +// Official repository: https://github.com/cppalliance/capy +// + +#ifndef BOOST_CAPY_WHEN_ANY_HPP +#define BOOST_CAPY_WHEN_ANY_HPP + +#include +#include +#include +#include +#include +#include +#include + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +/* + * when_any - Race multiple tasks, return first completion + * ======================================================== + * + * OVERVIEW: + * --------- + * when_any launches N tasks concurrently and completes when the FIRST task + * finishes (success or failure). It then requests stop for all siblings and + * waits for them to acknowledge before returning. + * + * ARCHITECTURE: + * ------------- + * The design mirrors when_all but with inverted completion semantics: + * + * when_all: complete when remaining_count reaches 0 (all done) + * when_any: complete when has_winner becomes true (first done) + * BUT still wait for remaining_count to reach 0 for cleanup + * + * Key components: + * - when_any_state: Shared state tracking winner and completion + * - when_any_runner: Wrapper coroutine for each child task + * - when_any_launcher: Awaitable that starts all runners concurrently + * + * CRITICAL INVARIANTS: + * -------------------- + * 1. Exactly one task becomes the winner (via atomic compare_exchange) + * 2. All tasks must complete before parent resumes (cleanup safety) + * 3. Stop is requested immediately when winner is determined + * 4. Only the winner's result/exception is stored + * + * TYPE DEDUPLICATION: + * ------------------- + * std::variant requires unique alternative types. Since when_any can race + * tasks with identical return types (e.g., three task), we must + * deduplicate types before constructing the variant. + * + * Example: when_any(task, task, task) + * - Raw types after void->monostate: int, string, int + * - Deduplicated variant: std::variant + * - Return: pair> + * + * The winner_index tells you which task won (0, 1, or 2), while the variant + * holds the result. Use the index to determine how to interpret the variant. + * + * VOID HANDLING: + * -------------- + * void tasks contribute std::monostate to the variant (then deduplicated). + * All-void tasks result in: pair> + * + * MEMORY MODEL: + * ------------- + * Synchronization chain from winner's write to parent's read: + * + * 1. Winner thread writes result_/winner_exception_ (non-atomic) + * 2. Winner thread calls signal_completion() → fetch_sub(acq_rel) on remaining_count_ + * 3. Last task thread (may be winner or non-winner) calls signal_completion() + * → fetch_sub(acq_rel) on remaining_count_, observing count becomes 0 + * 4. Last task returns caller_ex_.dispatch(continuation_) via symmetric transfer + * 5. Parent coroutine resumes and reads result_/winner_exception_ + * + * Synchronization analysis: + * - All fetch_sub operations on remaining_count_ form a release sequence + * - Winner's fetch_sub releases; subsequent fetch_sub operations participate + * in the modification order of remaining_count_ + * - Last task's fetch_sub(acq_rel) synchronizes-with prior releases in the + * modification order, establishing happens-before from winner's writes + * - Executor dispatch() is expected to provide queue-based synchronization + * (release-on-post, acquire-on-execute) completing the chain to parent + * - Even inline executors work (same thread = sequenced-before) + * + * Alternative considered: Adding winner_ready_ atomic (set with release after + * storing winner data, acquired before reading) would make synchronization + * self-contained and not rely on executor implementation details. Current + * approach is correct but requires careful reasoning about release sequences + * and executor behavior. + * + * EXCEPTION SEMANTICS: + * -------------------- + * Unlike when_all (which captures first exception, discards others), when_any + * treats exceptions as valid completions. If the winning task threw, that + * exception is rethrown. Exceptions from non-winners are silently discarded. + */ + +namespace boost { +namespace capy { + +namespace detail { + +/** Convert void to monostate for variant storage. + + std::variant is ill-formed, so void tasks contribute + std::monostate to the result variant instead. Non-void types + pass through unchanged. + + @tparam T The type to potentially convert (void becomes monostate). +*/ +template +using void_to_monostate_t = std::conditional_t, std::monostate, T>; + +/** Type deduplication for variant construction. + + std::variant requires unique alternative types. These metafunctions + deduplicate a type list while preserving order of first occurrence. + + @par Algorithm + Fold left over the type list, appending each type to the accumulator + only if not already present. O(N^2) in number of types but N is + typically small (number of when_any arguments). +*/ +/** Primary template for appending a type to a variant if not already present. + + @tparam Variant The accumulated variant type. + @tparam T The type to potentially append. +*/ +template +struct variant_append_if_unique; + +/** Specialization that checks for type uniqueness and appends if needed. + + @tparam Vs Types already in the variant. + @tparam T The type to potentially append. +*/ +template +struct variant_append_if_unique, T> +{ + /** Result type: original variant if T is duplicate, extended variant otherwise. */ + using type = std::conditional_t< + (std::is_same_v || ...), + std::variant, + std::variant>; +}; + +/** Primary template for type list deduplication. + + @tparam Accumulated The variant accumulating unique types. + @tparam Remaining Types still to be processed. +*/ +template +struct deduplicate_impl; + +/** Base case: no more types to process. + + @tparam Accumulated The final deduplicated variant type. +*/ +template +struct deduplicate_impl +{ + /** The final deduplicated variant type. */ + using type = Accumulated; +}; + +/** Recursive case: add T if unique, then process rest. + + @tparam Accumulated The variant accumulated so far. + @tparam T The current type to potentially add. + @tparam Rest Remaining types to process. +*/ +template +struct deduplicate_impl +{ + /** Intermediate type after potentially appending T. */ + using next = typename variant_append_if_unique::type; + + /** Final result after processing all remaining types. */ + using type = typename deduplicate_impl::type; +}; + +/** Deduplicated variant from a list of types. + + Constructs a std::variant containing unique types from the input list. + Void types are converted to std::monostate before deduplication. + The first type T0 seeds the accumulator, ensuring the variant is well-formed. + + @tparam T0 First result type (required, seeds the deduplication). + @tparam Ts Remaining result types (void is converted to monostate). +*/ +template +using unique_variant_t = typename deduplicate_impl< + std::variant>, + void_to_monostate_t...>::type; + +/** Result type for when_any: (winner_index, deduplicated_variant). + + The first element is the zero-based index of the winning task in the + original argument order. The second element is a variant holding the + winner's result by type. When multiple tasks share the same return type, + use the index to determine which task actually won. + + @tparam T0 First task's result type. + @tparam Ts Remaining tasks' result types. +*/ +template +using when_any_result_t = std::pair>; + +/** Shared state for when_any operation. + + Coordinates winner selection, result storage, and completion tracking + for all child tasks in a when_any operation. + + @par Lifetime + Allocated on the parent coroutine's frame, outlives all runners. + + @par Thread Safety + Atomic operations protect winner selection and completion count. + Result storage is written only by the winner before any concurrent access. + + @tparam T0 First task's result type. + @tparam Ts Remaining tasks' result types. +*/ +template +struct when_any_state +{ + /** Total number of tasks being raced. */ + static constexpr std::size_t task_count = 1 + sizeof...(Ts); + + /** Deduplicated variant type for storing the winner's result. */ + using variant_type = unique_variant_t; + + /** Counter for tasks still running. + + Must wait for ALL tasks to finish before parent resumes; this ensures + runner coroutine frames are valid until their final_suspend completes. + */ + std::atomic remaining_count_; + + /** Flag indicating whether a winner has been determined. + + Winner selection: exactly one task wins via atomic CAS on has_winner_. + winner_index_ is written only by the winner, read after all complete. + */ + std::atomic has_winner_{false}; + + /** Index of the winning task in the original argument list. */ + std::size_t winner_index_{0}; + + /** Storage for the winner's result value. + + Result storage: deduplicated variant. Stored by type, not task index, + because multiple tasks may share the same return type. + */ + std::optional< variant_type > result_; + + /** Exception thrown by the winner, if any. + + Non-null if winner threw (rethrown to caller after all tasks complete). + */ + std::exception_ptr winner_exception_; + + /** Handles to runner coroutines for cleanup. + + Runner coroutine handles; destroyed in destructor after all complete. + */ + std::array runner_handles_{}; + + /** Stop source for cancelling sibling tasks. + + Owned stop_source: request_stop() called when winner determined. + */ + std::stop_source stop_source_; + + /** Callback functor that forwards stop requests. + + Forwards parent's stop requests to our stop_source, enabling + cancellation to propagate from caller through when_any to children. + */ + struct stop_callback_fn + { + /** Pointer to the stop source to signal. */ + std::stop_source* source_; + + /** Invoke the stop request on the source. */ + void operator()() const noexcept { source_->request_stop(); } + }; + + /** Type alias for the stop callback registration. */ + using stop_callback_t = std::stop_callback; + + /** Optional callback linking parent's stop token to our stop source. */ + std::optional parent_stop_callback_; + + /** Parent coroutine handle to resume when all children complete. */ + coro continuation_; + + /** Executor reference for dispatching the parent resumption. */ + executor_ref caller_ex_; + + /** Construct state for racing task_count tasks. + + Initializes remaining_count_ to task_count so all tasks must complete + before the parent coroutine resumes. + */ + when_any_state() + : remaining_count_(task_count) + { + } + + /** Destroy state and clean up runner coroutine handles. + + All runners must have completed before destruction (guaranteed by + waiting for remaining_count_ to reach zero). + */ + ~when_any_state() + { + for(auto h : runner_handles_) + if(h) + h.destroy(); + } + + /** Attempt to become the winner. + + Atomically claims winner status. Exactly one task succeeds; all others + see false. The winner must store its result before returning. + + @param index The task's index in the original argument list. + @return true if this task is now the winner, false if another won first. + */ + bool try_win(std::size_t index) noexcept + { + bool expected = false; + if(has_winner_.compare_exchange_strong( + expected, true, std::memory_order_acq_rel)) + { + winner_index_ = index; + // Signal siblings to exit early if they support cancellation + stop_source_.request_stop(); + return true; + } + return false; + } + + /** Store the winner's result. + + @pre Only called by the winner (try_win returned true). + @note Uses type-based emplacement because the variant is deduplicated; + task index may differ from variant alternative index. + */ + template + void set_winner_result(T value) + noexcept(std::is_nothrow_move_constructible_v) + { + result_.emplace(std::in_place_type, std::move(value)); + } + + /** Store the winner's void completion as monostate. + + @pre Only called by the winner of a void-returning task. + */ + void set_winner_void() noexcept + { + result_.emplace(std::in_place_type, std::monostate{}); + } + + /** Store the winner's exception. + + @pre Only called by the winner (try_win returned true). + */ + void set_winner_exception(std::exception_ptr ep) noexcept + { + winner_exception_ = ep; + } + + /** Signal that a task has completed (success, failure, or cancelled). + + Called by every runner at final_suspend. The last one to complete + resumes the parent coroutine. This ensures all child coroutine + frames are destroyed before the parent continues. + + @return Coroutine to resume (parent if last, noop otherwise). + */ + coro signal_completion() noexcept + { + auto remaining = remaining_count_.fetch_sub(1, std::memory_order_acq_rel); + if(remaining == 1) + return caller_ex_.dispatch(continuation_); + return std::noop_coroutine(); + } +}; + +/** Wrapper coroutine that runs a single child task for when_any. + + Each child task is wrapped in a runner that: + 1. Propagates executor and stop_token to the child + 2. Attempts to claim winner status on completion + 3. Stores result only if this runner won + 4. Signals completion regardless of win/loss (for cleanup) + + @tparam T The result type of the wrapped task. + @tparam Ts All task result types (for when_any_state compatibility). +*/ +template +struct when_any_runner +{ + /** Promise type for the runner coroutine. + + Manages executor propagation, stop token forwarding, and completion + signaling for the wrapped child task. + */ + struct promise_type : frame_allocating_base + { + /** Pointer to shared state for winner coordination. */ + when_any_state* state_ = nullptr; + + /** Index of this task in the original argument list. */ + std::size_t index_ = 0; + + /** Executor reference inherited from the parent coroutine. */ + executor_ref ex_; + + /** Stop token for cooperative cancellation. */ + std::stop_token stop_token_; + + /** Create the runner coroutine object from this promise. + + @return Runner coroutine wrapping this promise's coroutine handle. + */ + when_any_runner get_return_object() + { + return when_any_runner(std::coroutine_handle::from_promise(*this)); + } + + /** Suspend immediately on creation. + + Runner coroutines start suspended; the launcher resumes them + after setting up state_, index_, ex_, and stop_token_. + + @return Always suspends. + */ + std::suspend_always initial_suspend() noexcept + { + return {}; + } + + /** Final suspend awaiter that signals completion to shared state. + + @return Custom awaiter that calls signal_completion(). + */ + auto final_suspend() noexcept + { + /** Awaiter that signals task completion and potentially resumes parent. */ + struct awaiter + { + /** Pointer to the promise for accessing shared state. */ + promise_type* p_; + + /** Never ready; always suspend to signal completion. + + @return Always false. + */ + bool await_ready() const noexcept + { + return false; + } + + /** Signal completion and return next coroutine to resume. + + @return Parent coroutine if this was the last task, noop otherwise. + */ + coro await_suspend(coro) noexcept + { + return p_->state_->signal_completion(); + } + + /** No-op resume; coroutine is destroyed after final suspend. */ + void await_resume() const noexcept + { + } + }; + return awaiter{this}; + } + + /** Called when runner coroutine body completes normally. + + The actual result handling is done in make_when_any_runner; + this just satisfies the coroutine return requirement. + */ + void return_void() + { + } + + /** Handle exceptions thrown by the child task. + + Exceptions are valid completions in when_any (unlike when_all). + If this exception wins, it will be rethrown to the caller. + If another task already won, this exception is discarded. + */ + void unhandled_exception() + { + if(state_->try_win(index_)) + state_->set_winner_exception(std::current_exception()); + } + + /** Awaiter wrapper that injects executor and stop token into child awaitables. + + @tparam Awaitable The underlying awaitable type being wrapped. + */ + template + struct transform_awaiter + { + /** The wrapped awaitable instance. */ + std::decay_t a_; + + /** Pointer to promise for accessing executor and stop token. */ + promise_type* p_; + + /** Check if the underlying awaitable is ready. + + @return True if awaitable can complete synchronously. + */ + bool await_ready() + { + return a_.await_ready(); + } + + /** Get the result from the underlying awaitable. + + @return The awaitable's result value. + */ + auto await_resume() + { + return a_.await_resume(); + } + + /** Suspend with executor and stop token injection. + + @tparam Promise The suspending coroutine's promise type. + @param h Handle to the suspending coroutine. + @return Coroutine to resume or void. + */ + template + auto await_suspend(std::coroutine_handle h) + { + return a_.await_suspend(h, p_->ex_, p_->stop_token_); + } + }; + + /** Transform awaitables to inject executor and stop token. + + @tparam Awaitable The awaitable type being co_awaited. + @param a The awaitable instance. + @return Transformed awaiter with executor/stop_token injection. + */ + template + auto await_transform(Awaitable&& a) + { + using A = std::decay_t; + if constexpr (IoAwaitable) + { + return transform_awaiter{ + std::forward(a), this}; + } + else + { + return make_affine(std::forward(a), ex_); + } + } + }; + + /** Handle to the underlying coroutine frame. */ + std::coroutine_handle h_; + + /** Construct runner from a coroutine handle. + + @param h Handle to the runner coroutine frame. + */ + explicit when_any_runner(std::coroutine_handle h) noexcept + : h_(h) + { + } + + /** Move constructor (Clang 14 workaround). + + Clang 14 (non-Apple) has a coroutine codegen bug requiring explicit + move constructor; other compilers work correctly with deleted move. + */ +#if defined(__clang__) && __clang_major__ == 14 && !defined(__apple_build_version__) + when_any_runner(when_any_runner&& other) noexcept : h_(std::exchange(other.h_, nullptr)) {} +#endif + + /** Copy construction is not allowed. */ + when_any_runner(when_any_runner const&) = delete; + + /** Copy assignment is not allowed. */ + when_any_runner& operator=(when_any_runner const&) = delete; + + /** Move construction is deleted (except on Clang 14). */ +#if !defined(__clang__) || __clang_major__ != 14 || defined(__apple_build_version__) + when_any_runner(when_any_runner&&) = delete; +#endif + + /** Move assignment is not allowed. */ + when_any_runner& operator=(when_any_runner&&) = delete; + + /** Release ownership of the coroutine handle. + + @return The coroutine handle; this object becomes empty. + */ + auto release() noexcept + { + return std::exchange(h_, nullptr); + } +}; + +/** Create a runner coroutine for a single task in when_any. + + Factory function that creates a wrapper coroutine for a child task. + The runner handles executor/stop_token propagation and winner selection. + + @tparam Index Compile-time index of this task in the argument list. + @tparam T The result type of the task being wrapped. + @tparam Ts All task result types (for when_any_state compatibility). + @param inner The task to run (will be moved from). + @param state Shared state for winner coordination. + @return Runner coroutine (must be started via resume()). +*/ +template +when_any_runner +make_when_any_runner(task inner, when_any_state* state) +{ + if constexpr (std::is_void_v) + { + co_await std::move(inner); + if(state->try_win(Index)) + state->set_winner_void(); // noexcept + } + else + { + auto result = co_await std::move(inner); + if(state->try_win(Index)) + { + try + { + state->set_winner_result(std::move(result)); + } + catch(...) + { + state->set_winner_exception(std::current_exception()); + } + } + } +} + +/** Awaitable that launches all runner coroutines concurrently. + + Handles the tricky lifetime issue where tasks may complete synchronously + during launch, potentially destroying this awaitable's frame before + all tasks are extracted from the tuple. See await_suspend for details. + + @tparam Ts The result types of the tasks being launched. +*/ +template +class when_any_launcher +{ + /** Pointer to tuple of tasks to launch. */ + std::tuple...>* tasks_; + + /** Pointer to shared state for coordination. */ + when_any_state* state_; + +public: + /** Construct launcher with task tuple and shared state. + + @param tasks Pointer to tuple of tasks (must outlive the await). + @param state Pointer to shared state for winner coordination. + */ + when_any_launcher( + std::tuple...>* tasks, + when_any_state* state) + : tasks_(tasks) + , state_(state) + { + } + + /** Check if the launcher can complete synchronously. + + @return True only if there are no tasks (degenerate case). + */ + bool await_ready() const noexcept + { + return sizeof...(Ts) == 0; + } + + /** Launch all runner coroutines and suspend the parent. + + Sets up stop propagation from parent to children, then launches + each task in a runner coroutine. Returns noop_coroutine because + runners resume the parent via signal_completion(). + + CRITICAL: If the last task finishes synchronously then the parent + coroutine resumes, destroying its frame, and destroying this object + prior to the completion of await_suspend. Therefore, await_suspend + must ensure `this` cannot be referenced after calling `launch_one` + for the last time. + + @tparam Ex The executor type. + @param continuation Handle to the parent coroutine to resume later. + @param caller_ex Executor for dispatching child coroutines. + @param parent_token Stop token from the parent for cancellation propagation. + @return noop_coroutine; parent is resumed by the last completing task. + */ + template + coro await_suspend(coro continuation, Ex const& caller_ex, std::stop_token parent_token = {}) + { + state_->continuation_ = continuation; + state_->caller_ex_ = caller_ex; + + // Forward parent's stop requests to children + if(parent_token.stop_possible()) + { + state_->parent_stop_callback_.emplace( + parent_token, + typename when_any_state::stop_callback_fn{&state_->stop_source_}); + + if(parent_token.stop_requested()) + state_->stop_source_.request_stop(); + } + + auto token = state_->stop_source_.get_token(); + [&](std::index_sequence) { + (..., launch_one(caller_ex, token)); + }(std::index_sequence_for{}); + + return std::noop_coroutine(); + } + + /** Resume after all tasks complete. + + No return value; results are accessed via the shared state. + */ + void await_resume() const noexcept + { + } + +private: + /** Launch a single runner coroutine for task at index I. + + Creates the runner, configures its promise with state and executor, + stores its handle for cleanup, and dispatches it for execution. + + @tparam I Compile-time index of the task in the tuple. + @tparam Ex The executor type. + @param caller_ex Executor for dispatching the runner. + @param token Stop token for cooperative cancellation. + + @pre Ex::dispatch() and coro::resume() must not throw. If they do, + the coroutine handle may leak. + */ + template + void launch_one(Ex const& caller_ex, std::stop_token token) + { + auto runner = make_when_any_runner( + std::move(std::get(*tasks_)), state_); + + auto h = runner.release(); + h.promise().state_ = state_; + h.promise().index_ = I; + h.promise().ex_ = caller_ex; + h.promise().stop_token_ = token; + + coro ch{h}; + state_->runner_handles_[I] = ch; + caller_ex.dispatch(ch).resume(); + } +}; + +} // namespace detail + +/** Wait for the first task to complete. + + Races multiple heterogeneous tasks concurrently and returns when the + first one completes. The result includes the winner's index and a + deduplicated variant containing the result value. + + @par Example + @code + task example() { + auto [index, result] = co_await when_any( + fetch_from_primary(), // task + fetch_from_backup() // task + ); + // index is 0 or 1, result holds the winner's Response + auto response = std::get(result); + } + @endcode + + @tparam T0 First task's result type. + @tparam Ts Remaining tasks' result types. + @param task0 The first task to race. + @param tasks Additional tasks to race concurrently. + @return A task yielding a pair of (winner_index, result_variant). + + @par Key Features + @li All tasks are launched concurrently + @li Returns when first task completes (success or failure) + @li Stop is requested for all siblings + @li Waits for all siblings to complete before returning + @li If winner threw, that exception is rethrown + @li Void tasks contribute std::monostate to the variant +*/ +template +[[nodiscard]] task> +when_any(task task0, task... tasks) +{ + using result_type = detail::when_any_result_t; + + detail::when_any_state state; + std::tuple, task...> task_tuple(std::move(task0), std::move(tasks)...); + + co_await detail::when_any_launcher(&task_tuple, &state); + + if(state.winner_exception_) + std::rethrow_exception(state.winner_exception_); + + co_return result_type{state.winner_index_, std::move(*state.result_)}; +} + +/** Alias for when_any result type, useful for declaring callback signatures. + + Provides a convenient public alias for the internal result type. + The result is a pair containing the winner's index and a deduplicated + variant holding the result value. + + @par Example + @code + void on_complete(when_any_result_type result); + @endcode + + @tparam T0 First task's result type. + @tparam Ts Remaining tasks' result types. +*/ +template +using when_any_result_type = detail::when_any_result_t; + +namespace detail { + +/** Shared state for homogeneous when_any (vector overload). + + Simpler than the heterogeneous version: uses std::optional instead + of variant, and std::vector instead of std::array for runner handles. + + @tparam T The common result type of all tasks. +*/ +template +struct when_any_homogeneous_state +{ + /** Counter for tasks still running. + + Completion tracking - must wait for ALL tasks for proper cleanup. + */ + std::atomic remaining_count_; + + /** Total number of tasks being raced. */ + std::size_t task_count_; + + /** Flag indicating whether a winner has been determined. + + Winner tracking - first task to complete claims this. + */ + std::atomic has_winner_{false}; + + /** Index of the winning task in the vector. */ + std::size_t winner_index_{0}; + + /** Storage for the winner's result value. + + Result storage - simple value, no variant needed. + */ + std::optional result_; + + /** Exception thrown by the winner, if any. */ + std::exception_ptr winner_exception_; + + /** Handles to runner coroutines for cleanup. + + Runner handles - destroyed in destructor. + */ + std::vector runner_handles_; + + /** Stop source for cancelling sibling tasks. + + Stop propagation - requested when winner is found. + */ + std::stop_source stop_source_; + + /** Callback functor that forwards stop requests. + + Connects parent's stop_token to our stop_source. + */ + struct stop_callback_fn + { + /** Pointer to the stop source to signal. */ + std::stop_source* source_; + + /** Invoke the stop request on the source. */ + void operator()() const noexcept { source_->request_stop(); } + }; + + /** Type alias for the stop callback registration. */ + using stop_callback_t = std::stop_callback; + + /** Optional callback linking parent's stop token to our stop source. */ + std::optional parent_stop_callback_; + + /** Parent coroutine handle to resume when all children complete. */ + coro continuation_; + + /** Executor reference for dispatching the parent resumption. */ + executor_ref caller_ex_; + + /** Construct state for racing the given number of tasks. + + @param count Number of tasks to race. + */ + explicit when_any_homogeneous_state(std::size_t count) + : remaining_count_(count) + , task_count_(count) + , runner_handles_(count) + { + } + + /** Destroy state and clean up runner coroutine handles. + + All runners must have completed before destruction. + */ + ~when_any_homogeneous_state() + { + for(auto h : runner_handles_) + if(h) + h.destroy(); + } + + /** Attempt to become the winner. + + @param index The task's index in the vector. + @return true if this task is now the winner, false if another won first. + */ + bool try_win(std::size_t index) noexcept + { + bool expected = false; + if(has_winner_.compare_exchange_strong( + expected, true, std::memory_order_acq_rel)) + { + winner_index_ = index; + stop_source_.request_stop(); + return true; + } + return false; + } + + /** Store the winner's result. + + @pre Only called by the winner (try_win returned true). + */ + void set_winner_result(T value) + noexcept(std::is_nothrow_move_constructible_v) + { + result_.emplace(std::move(value)); + } + + /** Store the winner's exception. + + @pre Only called by the winner (try_win returned true). + */ + void set_winner_exception(std::exception_ptr ep) noexcept + { + winner_exception_ = ep; + } + + /** Signal task completion; last one resumes the parent. + + @return Coroutine to resume (parent if last, noop otherwise). + */ + coro signal_completion() noexcept + { + auto remaining = remaining_count_.fetch_sub(1, std::memory_order_acq_rel); + if(remaining == 1) + return caller_ex_.dispatch(continuation_); + return std::noop_coroutine(); + } +}; + +/** Specialization for void tasks (no result storage needed). + + When racing void-returning tasks, there is no result value to store. + Only the winner's index and any exception are tracked. +*/ +template<> +struct when_any_homogeneous_state +{ + /** Counter for tasks still running. */ + std::atomic remaining_count_; + + /** Total number of tasks being raced. */ + std::size_t task_count_; + + /** Flag indicating whether a winner has been determined. */ + std::atomic has_winner_{false}; + + /** Index of the winning task in the vector. */ + std::size_t winner_index_{0}; + + /** Exception thrown by the winner, if any. */ + std::exception_ptr winner_exception_; + + /** Handles to runner coroutines for cleanup. */ + std::vector runner_handles_; + + /** Stop source for cancelling sibling tasks. */ + std::stop_source stop_source_; + + /** Callback functor that forwards stop requests. */ + struct stop_callback_fn + { + /** Pointer to the stop source to signal. */ + std::stop_source* source_; + + /** Invoke the stop request on the source. */ + void operator()() const noexcept { source_->request_stop(); } + }; + + /** Type alias for the stop callback registration. */ + using stop_callback_t = std::stop_callback; + + /** Optional callback linking parent's stop token to our stop source. */ + std::optional parent_stop_callback_; + + /** Parent coroutine handle to resume when all children complete. */ + coro continuation_; + + /** Executor reference for dispatching the parent resumption. */ + executor_ref caller_ex_; + + /** Construct state for racing the given number of void tasks. + + @param count Number of tasks to race. + */ + explicit when_any_homogeneous_state(std::size_t count) + : remaining_count_(count) + , task_count_(count) + , runner_handles_(count) + { + } + + /** Destroy state and clean up runner coroutine handles. */ + ~when_any_homogeneous_state() + { + for(auto h : runner_handles_) + if(h) + h.destroy(); + } + + /** Attempt to become the winner. + + @param index The task's index in the vector. + @return true if this task is now the winner, false if another won first. + */ + bool try_win(std::size_t index) noexcept + { + bool expected = false; + if(has_winner_.compare_exchange_strong( + expected, true, std::memory_order_acq_rel)) + { + winner_index_ = index; + stop_source_.request_stop(); + return true; + } + return false; + } + + /** Store the winner's exception. + + @pre Only called by the winner (try_win returned true). + */ + void set_winner_exception(std::exception_ptr ep) noexcept + { + winner_exception_ = ep; + } + + /** Signal task completion; last one resumes the parent. + + @return Coroutine to resume (parent if last, noop otherwise). + */ + coro signal_completion() noexcept + { + auto remaining = remaining_count_.fetch_sub(1, std::memory_order_acq_rel); + if(remaining == 1) + return caller_ex_.dispatch(continuation_); + return std::noop_coroutine(); + } +}; + +/** Wrapper coroutine for homogeneous when_any tasks (vector overload). + + Same role as when_any_runner but uses a runtime index instead of + a compile-time index, allowing it to work with vectors of tasks. + + @tparam T The common result type of all tasks. +*/ +template +struct when_any_homogeneous_runner +{ + /** Promise type for the homogeneous runner coroutine. + + Manages executor propagation, stop token forwarding, and completion + signaling for the wrapped child task. + */ + struct promise_type : frame_allocating_base + { + /** Pointer to shared state for winner coordination. */ + when_any_homogeneous_state* state_ = nullptr; + + /** Runtime index of this task in the vector. */ + std::size_t index_ = 0; + + /** Executor reference inherited from the parent coroutine. */ + executor_ref ex_; + + /** Stop token for cooperative cancellation. */ + std::stop_token stop_token_; + + /** Create the runner coroutine object from this promise. + + @return Runner coroutine wrapping this promise's coroutine handle. + */ + when_any_homogeneous_runner get_return_object() + { + return when_any_homogeneous_runner( + std::coroutine_handle::from_promise(*this)); + } + + /** Suspend immediately on creation. + + Runner coroutines start suspended; the launcher resumes them + after setting up state_, index_, ex_, and stop_token_. + + @return Always suspends. + */ + std::suspend_always initial_suspend() noexcept + { + return {}; + } + + /** Final suspend awaiter that signals completion to shared state. + + @return Custom awaiter that calls signal_completion(). + */ + auto final_suspend() noexcept + { + /** Awaiter that signals task completion and potentially resumes parent. */ + struct awaiter + { + /** Pointer to the promise for accessing shared state. */ + promise_type* p_; + + /** Never ready; always suspend to signal completion. + + @return Always false. + */ + bool await_ready() const noexcept + { + return false; + } + + /** Signal completion and return next coroutine to resume. + + @return Parent coroutine if this was the last task, noop otherwise. + */ + coro await_suspend(coro) noexcept + { + return p_->state_->signal_completion(); + } + + /** No-op resume; coroutine is destroyed after final suspend. */ + void await_resume() const noexcept + { + } + }; + return awaiter{this}; + } + + /** Called when runner coroutine body completes normally. + + The actual result handling is done in make_when_any_homogeneous_runner; + this just satisfies the coroutine return requirement. + */ + void return_void() + { + } + + /** Handle exceptions thrown by the child task. + + Exceptions are valid completions in when_any. If this exception wins, + it will be rethrown to the caller. If another task already won, + this exception is discarded. + */ + void unhandled_exception() + { + if(state_->try_win(index_)) + state_->set_winner_exception(std::current_exception()); + } + + /** Awaiter wrapper that injects executor and stop token into child awaitables. + + @tparam Awaitable The underlying awaitable type being wrapped. + */ + template + struct transform_awaiter + { + /** The wrapped awaitable instance. */ + std::decay_t a_; + + /** Pointer to promise for accessing executor and stop token. */ + promise_type* p_; + + /** Check if the underlying awaitable is ready. + + @return True if awaitable can complete synchronously. + */ + bool await_ready() + { + return a_.await_ready(); + } + + /** Get the result from the underlying awaitable. + + @return The awaitable's result value. + */ + auto await_resume() + { + return a_.await_resume(); + } + + /** Suspend with executor and stop token injection. + + @tparam Promise The suspending coroutine's promise type. + @param h Handle to the suspending coroutine. + @return Coroutine to resume or void. + */ + template + auto await_suspend(std::coroutine_handle h) + { + return a_.await_suspend(h, p_->ex_, p_->stop_token_); + } + }; + + /** Transform awaitables to inject executor and stop token. + + @tparam Awaitable The awaitable type being co_awaited. + @param a The awaitable instance. + @return Transformed awaiter with executor/stop_token injection. + */ + template + auto await_transform(Awaitable&& a) + { + using A = std::decay_t; + if constexpr (IoAwaitable) + { + return transform_awaiter{ + std::forward(a), this}; + } + else + { + return make_affine(std::forward(a), ex_); + } + } + }; + + /** Handle to the underlying coroutine frame. */ + std::coroutine_handle h_; + + /** Construct runner from a coroutine handle. + + @param h Handle to the runner coroutine frame. + */ + explicit when_any_homogeneous_runner(std::coroutine_handle h) noexcept + : h_(h) + { + } + + /** Move constructor (Clang 14 workaround). + + Clang 14 (non-Apple) has a coroutine codegen bug requiring explicit + move constructor; other compilers work correctly with deleted move. + */ +#if defined(__clang__) && __clang_major__ == 14 && !defined(__apple_build_version__) + when_any_homogeneous_runner(when_any_homogeneous_runner&& other) noexcept + : h_(std::exchange(other.h_, nullptr)) {} +#endif + + /** Copy construction is not allowed. */ + when_any_homogeneous_runner(when_any_homogeneous_runner const&) = delete; + + /** Copy assignment is not allowed. */ + when_any_homogeneous_runner& operator=(when_any_homogeneous_runner const&) = delete; + + /** Move construction is deleted (except on Clang 14). */ +#if !defined(__clang__) || __clang_major__ != 14 || defined(__apple_build_version__) + when_any_homogeneous_runner(when_any_homogeneous_runner&&) = delete; +#endif + + /** Move assignment is not allowed. */ + when_any_homogeneous_runner& operator=(when_any_homogeneous_runner&&) = delete; + + /** Release ownership of the coroutine handle. + + @return The coroutine handle; this object becomes empty. + */ + auto release() noexcept + { + return std::exchange(h_, nullptr); + } +}; + +/** Create a runner coroutine for a homogeneous when_any task. + + Factory function that creates a wrapper coroutine for a child task + in the vector overload. Uses a runtime index instead of compile-time. + + @tparam T The result type of the task being wrapped. + @param inner The task to run (will be moved from). + @param state Shared state for winner coordination. + @param index Runtime index of this task in the vector. + @return Runner coroutine (must be started via resume()). +*/ +template +when_any_homogeneous_runner +make_when_any_homogeneous_runner(task inner, when_any_homogeneous_state* state, std::size_t index) +{ + if constexpr (std::is_void_v) + { + co_await std::move(inner); + state->try_win(index); // void tasks have no result to store + } + else + { + auto result = co_await std::move(inner); + if(state->try_win(index)) + { + try + { + state->set_winner_result(std::move(result)); + } + catch(...) + { + state->set_winner_exception(std::current_exception()); + } + } + } +} + +/** Awaitable that launches all runners for homogeneous when_any. + + Same lifetime concerns as when_any_launcher; see its documentation. + Uses runtime iteration over the task vector instead of compile-time + expansion over a tuple. + + @tparam T The common result type of all tasks in the vector. +*/ +template +class when_any_homogeneous_launcher +{ + /** Pointer to vector of tasks to launch. */ + std::vector>* tasks_; + + /** Pointer to shared state for coordination. */ + when_any_homogeneous_state* state_; + +public: + /** Construct launcher with task vector and shared state. + + @param tasks Pointer to vector of tasks (must outlive the await). + @param state Pointer to shared state for winner coordination. + */ + when_any_homogeneous_launcher( + std::vector>* tasks, + when_any_homogeneous_state* state) + : tasks_(tasks) + , state_(state) + { + } + + /** Check if the launcher can complete synchronously. + + @return True only if there are no tasks (degenerate case). + */ + bool await_ready() const noexcept + { + return tasks_->empty(); + } + + /** Launch all runner coroutines and suspend the parent. + + Sets up stop propagation from parent to children, then launches + each task in a runner coroutine. Returns noop_coroutine because + runners resume the parent via signal_completion(). + + CRITICAL: If the last task finishes synchronously then the parent + coroutine resumes, destroying its frame, and destroying this object + prior to the completion of await_suspend. Therefore, await_suspend + must ensure `this` cannot be referenced after calling `launch_one` + for the last time. + + @tparam Ex The executor type. + @param continuation Handle to the parent coroutine to resume later. + @param caller_ex Executor for dispatching child coroutines. + @param parent_token Stop token from the parent for cancellation propagation. + @return noop_coroutine; parent is resumed by the last completing task. + */ + template + coro await_suspend(coro continuation, Ex const& caller_ex, std::stop_token parent_token = {}) + { + state_->continuation_ = continuation; + state_->caller_ex_ = caller_ex; + + // Forward parent's stop requests to children + if(parent_token.stop_possible()) + { + state_->parent_stop_callback_.emplace( + parent_token, + typename when_any_homogeneous_state::stop_callback_fn{&state_->stop_source_}); + + if(parent_token.stop_requested()) + state_->stop_source_.request_stop(); + } + + auto num_tasks = tasks_->size(); + auto token = state_->stop_source_.get_token(); + for(std::size_t i = 0; i < num_tasks; ++i) + launch_one( i, caller_ex, token); + + return std::noop_coroutine(); + } + + /** Resume after all tasks complete. + + No return value; results are accessed via the shared state. + */ + void await_resume() const noexcept + { + } + +private: + /** Launch a single runner coroutine for task at the given index. + + Creates the runner, configures its promise with state and executor, + stores its handle for cleanup, and dispatches it for execution. + + @tparam Ex The executor type. + @param index Runtime index of the task in the vector. + @param caller_ex Executor for dispatching the runner. + @param token Stop token for cooperative cancellation. + + @pre Ex::dispatch() and coro::resume() must not throw. If they do, + the coroutine handle may leak. + */ + template + void launch_one(std::size_t index, Ex const& caller_ex, std::stop_token token) + { + auto runner = make_when_any_homogeneous_runner( + std::move((*tasks_)[index]), state_, index); + + auto h = runner.release(); + h.promise().state_ = state_; + h.promise().index_ = index; + h.promise().ex_ = caller_ex; + h.promise().stop_token_ = token; + + coro ch{h}; + state_->runner_handles_[index] = ch; + caller_ex.dispatch(ch).resume(); + } +}; + +} // namespace detail + +/** Wait for the first task to complete (homogeneous overload). + + Races a vector of tasks with the same result type. Simpler than the + heterogeneous overload: returns a direct pair instead of a variant + since all tasks share the same type. + + @par Example + @code + task example() { + std::vector> requests; + requests.push_back(fetch_from_server(0)); + requests.push_back(fetch_from_server(1)); + requests.push_back(fetch_from_server(2)); + + auto [index, response] = co_await when_any(std::move(requests)); + // index is 0, 1, or 2; response is the winner's Response + } + @endcode + + @tparam T The common result type of all tasks (must not be void). + @param tasks Vector of tasks to race concurrently (must not be empty). + @return A task yielding a pair of (winner_index, result). + @throws std::invalid_argument if tasks is empty. + + @par Key Features + @li All tasks are launched concurrently + @li Returns when first task completes (success or failure) + @li Stop is requested for all siblings + @li Waits for all siblings to complete before returning + @li If winner threw, that exception is rethrown + @li Returns simple pair (no variant needed for homogeneous types) +*/ +template + requires (!std::is_void_v) +[[nodiscard]] task> +when_any(std::vector> tasks) +{ + if(tasks.empty()) + throw std::invalid_argument("when_any requires at least one task"); + + using result_type = std::pair; + + detail::when_any_homogeneous_state state(tasks.size()); + + co_await detail::when_any_homogeneous_launcher(&tasks, &state); + + if(state.winner_exception_) + std::rethrow_exception(state.winner_exception_); + + co_return result_type{state.winner_index_, std::move(*state.result_)}; +} + +/** Wait for the first task to complete (homogeneous void overload). + + Races a vector of void-returning tasks. Since void tasks have no + result value, only the winner's index is returned. + + @param tasks Vector of void tasks to race concurrently (must not be empty). + @return A task yielding the winner's index (zero-based). + @throws std::invalid_argument if tasks is empty. + + @par Key Features + @li All tasks are launched concurrently + @li Returns when first task completes (success or failure) + @li Stop is requested for all siblings + @li Waits for all siblings to complete before returning + @li If winner threw, that exception is rethrown +*/ +[[nodiscard]] inline task +when_any(std::vector> tasks) +{ + if(tasks.empty()) + throw std::invalid_argument("when_any requires at least one task"); + + detail::when_any_homogeneous_state state(tasks.size()); + + co_await detail::when_any_homogeneous_launcher(&tasks, &state); + + if(state.winner_exception_) + std::rethrow_exception(state.winner_exception_); + + co_return state.winner_index_; +} + +/** Alias for vector when_any result type. + + For homogeneous when_any (vector overload), the result is simpler: + a pair of the winner's index and the result value directly (no variant + needed since all tasks share the same type). + + @par Example + @code + void on_complete(when_any_vector_result_type result); + @endcode + + @tparam T The common result type of all tasks in the vector. +*/ +template +using when_any_vector_result_type = std::pair; + +} // namespace capy +} // namespace boost + +#endif diff --git a/test/unit/ex/async_event.cpp b/test/unit/ex/async_event.cpp index 7ca2c100..68454a6c 100644 --- a/test/unit/ex/async_event.cpp +++ b/test/unit/ex/async_event.cpp @@ -26,45 +26,6 @@ static_assert(IoAwaitable); namespace { -/** Queuing executor that queues coroutines for manual execution control. -*/ -struct queuing_executor -{ - std::queue* queue_; - test_context* ctx_ = nullptr; - - explicit queuing_executor(std::queue& q) - : queue_(&q) - { - } - - bool operator==(queuing_executor const& other) const noexcept - { - return queue_ == other.queue_; - } - - execution_context& context() const noexcept - { - return ctx_ ? *ctx_ : default_test_context(); - } - - void on_work_started() const noexcept {} - void on_work_finished() const noexcept {} - - coro dispatch(coro h) const - { - queue_->push(h); - return std::noop_coroutine(); - } - - void post(coro h) const - { - queue_->push(h); - } -}; - -static_assert(Executor); - /** Run a task to completion by manually stepping through it. */ template diff --git a/test/unit/task.cpp b/test/unit/task.cpp index 9c81953c..e8ec0c45 100644 --- a/test/unit/task.cpp +++ b/test/unit/task.cpp @@ -15,7 +15,6 @@ #include "test_helpers.hpp" #include -#include #include #include #include @@ -75,46 +74,6 @@ struct tracking_executor static_assert(Executor); -/** Queuing executor that queues coroutines for manual execution control. - Returns noop_coroutine so the caller doesn't resume immediately. -*/ -struct queuing_executor -{ - std::queue* queue_; - test_context* ctx_ = nullptr; - - explicit queuing_executor(std::queue& q) - : queue_(&q) - { - } - - bool operator==(queuing_executor const& other) const noexcept - { - return queue_ == other.queue_; - } - - execution_context& context() const noexcept - { - return ctx_ ? *ctx_ : default_test_context(); - } - - void on_work_started() const noexcept {} - void on_work_finished() const noexcept {} - - coro dispatch(coro h) const - { - queue_->push(h); - return std::noop_coroutine(); - } - - void post(coro h) const - { - queue_->push(h); - } -}; - -static_assert(Executor); - /** Run a task to completion by manually stepping through it. Takes ownership of the task via release() and runs until done. @@ -153,20 +112,6 @@ inline void run_void_task(task t) run_task(std::move(t)); } -struct test_exception : std::runtime_error -{ - explicit test_exception(const char* msg) - : std::runtime_error(msg) - { - } -}; - -[[noreturn]] inline void -throw_test_exception(char const* msg) -{ - throw test_exception(msg); -} - struct task_test { static task diff --git a/test/unit/test_helpers.hpp b/test/unit/test_helpers.hpp index d956c901..0eced14c 100644 --- a/test/unit/test_helpers.hpp +++ b/test/unit/test_helpers.hpp @@ -20,11 +20,15 @@ #include #include #include +#include #include "test_suite.hpp" #include #include +#include +#include +#include #include #if defined(__linux__) || defined(__FreeBSD__) || defined(__NetBSD__) || defined(__APPLE__) @@ -203,6 +207,144 @@ thread_name_starts_with(char const* prefix) #endif // BOOST_CAPY_TEST_CAN_GET_THREAD_NAME +//---------------------------------------------------------- +// Test Exception Types +//---------------------------------------------------------- + +/** Standard test exception type used across test files. */ +struct test_exception : std::runtime_error +{ + explicit test_exception(const char* msg) + : std::runtime_error(msg) + { + } +}; + +/** Helper to throw test_exception with a message. */ +[[noreturn]] inline void +throw_test_exception(char const* msg) +{ + throw test_exception(msg); +} + +//---------------------------------------------------------- +// Common Test Task Helpers +//---------------------------------------------------------- + +/** Returns a task that completes with an int value. */ +inline task +returns_int(int value) +{ + co_return value; +} + +/** Returns a task that completes with a string value. */ +inline task +returns_string(std::string value) +{ + co_return value; +} + +/** Returns a task that completes with void. */ +inline task +void_task() +{ + co_return; +} + +/** Returns a task that throws an exception. */ +inline task +throws_exception(char const* msg) +{ + throw_test_exception(msg); + co_return 0; +} + +/** Returns a void task that throws an exception. */ +inline task +void_throws_exception(char const* msg) +{ + throw_test_exception(msg); + co_return; +} + +//---------------------------------------------------------- +// Queuing Executor +//---------------------------------------------------------- + +/** Queuing executor that allows controlled interleaving of tasks. + + Unlike test_executor which runs tasks synchronously, this executor + queues work and runs it in FIFO order when manually resumed. + This allows tasks to observe stop requests between suspension points. +*/ +struct queuing_executor +{ + std::queue* queue_; + test_context* ctx_ = nullptr; + + explicit queuing_executor(std::queue& q) + : queue_(&q) + { + } + + bool operator==(queuing_executor const& other) const noexcept + { + return queue_ == other.queue_; + } + + execution_context& context() const noexcept + { + return ctx_ ? *ctx_ : default_test_context(); + } + + void on_work_started() const noexcept {} + void on_work_finished() const noexcept {} + + coro dispatch(coro h) const + { + queue_->push(h); + return std::noop_coroutine(); + } + + void post(coro h) const + { + queue_->push(h); + } +}; + +static_assert(Executor); + +//---------------------------------------------------------- +// Yield Awaitable +//---------------------------------------------------------- + +/** Awaitable that yields to the executor, allowing other tasks to run. + + When awaited, this suspends the current coroutine and posts it back + to the executor's queue. This creates a yield point where the task + can be interleaved with other tasks. +*/ +struct yield_awaitable +{ + bool await_ready() const noexcept + { + return false; + } + + template + coro await_suspend(coro h, Ex const& ex, std::stop_token) + { + // Post ourselves back to the queue + ex.post(h); + return std::noop_coroutine(); + } + + void await_resume() const noexcept + { + } +}; + } // capy } // boost diff --git a/test/unit/when_all.cpp b/test/unit/when_all.cpp index 2e8aecab..121a34fd 100644 --- a/test/unit/when_all.cpp +++ b/test/unit/when_all.cpp @@ -63,54 +63,8 @@ static_assert(std::is_void_v< // Verify when_all returns task which satisfies awaitable protocols static_assert(IoAwaitableTask>>); -struct test_exception : std::runtime_error -{ - explicit test_exception(const char* msg) - : std::runtime_error(msg) - { - } -}; - -[[noreturn]] inline void -throw_test_exception(char const* msg) -{ - throw test_exception(msg); -} - struct when_all_test { - // Helper tasks - static task - returns_int(int value) - { - co_return value; - } - - static task - returns_string(std::string value) - { - co_return value; - } - - static task - void_task() - { - co_return; - } - - static task - throws_exception(char const* msg) - { - throw_test_exception(msg); - co_return 0; - } - - static task - void_throws_exception(char const* msg) - { - throw_test_exception(msg); - co_return; - } // Test: Single task with when_all succeeds void diff --git a/test/unit/when_any.cpp b/test/unit/when_any.cpp new file mode 100644 index 00000000..480dbdd8 --- /dev/null +++ b/test/unit/when_any.cpp @@ -0,0 +1,1799 @@ +// +// Copyright (c) 2026 Michael Vandeberg +// +// Distributed under the Boost Software License, Version 1.0. (See accompanying +// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) +// +// Official repository: https://github.com/cppalliance/capy +// + +// Test that header file is self-contained. +#include + +#include +#include +#include +#include + +#include "test_helpers.hpp" +#include "test_suite.hpp" + +#include +#include +#include +#include +#include + +namespace boost { +namespace capy { + +// Static assertions for result type +static_assert(std::is_same_v< + when_any_result_type, + std::pair>>); + +static_assert(std::is_same_v< + when_any_result_type, + std::pair>>); + +// Void becomes monostate in the variant +static_assert(std::is_same_v< + when_any_result_type, + std::pair>>); + +static_assert(std::is_same_v< + when_any_result_type, + std::pair>>); + +// Duplicate types are deduplicated (variant requires unique types) +static_assert(std::is_same_v< + when_any_result_type, + std::pair>>); + +static_assert(std::is_same_v< + when_any_result_type, + std::pair>>); + +// All void tasks deduplicate to single monostate +static_assert(std::is_same_v< + when_any_result_type, + std::pair>>); + +// Verify when_any returns task which satisfies awaitable protocols +static_assert(IoAwaitable< + task>>); + +struct when_any_test +{ + //---------------------------------------------------------- + // Basic functionality tests + //---------------------------------------------------------- + + // Test: Single task returns immediately + void + testSingleTask() + { + int dispatch_count = 0; + test_executor ex(dispatch_count); + bool completed = false; + int result = 0; + std::size_t winner_index = 999; + + run_async(ex, + [&](when_any_result_type r) { + completed = true; + winner_index = r.first; + result = std::get<0>(r.second); + }, + [](std::exception_ptr) {})( + when_any(returns_int(42))); + + BOOST_TEST(completed); + BOOST_TEST_EQ(winner_index, 0u); + BOOST_TEST_EQ(result, 42); + } + + // Test: Two tasks - first completes wins + void + testTwoTasksFirstWins() + { + int dispatch_count = 0; + test_executor ex(dispatch_count); + bool completed = false; + std::size_t winner_index = 999; + int result_value = 0; + + // Note: when_any_result_type deduplicates to variant + run_async(ex, + [&](when_any_result_type r) { + completed = true; + winner_index = r.first; + // Variant is deduplicated to single int type + result_value = std::get(r.second); + }, + [](std::exception_ptr) {})( + when_any(returns_int(10), returns_int(20))); + + BOOST_TEST(completed); + // One of them should win, with correct index-to-value mapping + BOOST_TEST(winner_index == 0 || winner_index == 1); + if (winner_index == 0) + BOOST_TEST_EQ(result_value, 10); + else + BOOST_TEST_EQ(result_value, 20); + } + + // Test: Three tasks with different types + void + testMixedTypes() + { + int dispatch_count = 0; + test_executor ex(dispatch_count); + bool completed = false; + std::size_t winner_index = 999; + std::variant result_value; + + run_async(ex, + [&](when_any_result_type r) { + completed = true; + winner_index = r.first; + result_value = r.second; + }, + [](std::exception_ptr) {})( + when_any(returns_int(1), returns_string("hello"), returns_int(3))); + + BOOST_TEST(completed); + BOOST_TEST(winner_index == 0 || winner_index == 1 || winner_index == 2); + if (winner_index == 0) + BOOST_TEST_EQ(std::get(result_value), 1); + else if (winner_index == 1) + BOOST_TEST_EQ(std::get(result_value), "hello"); + else + BOOST_TEST_EQ(std::get(result_value), 3); + } + + // Test: Void task can win + void + testVoidTaskWins() + { + int dispatch_count = 0; + test_executor ex(dispatch_count); + bool completed = false; + std::size_t winner_index = 999; + std::variant result_value; + + run_async(ex, + [&](when_any_result_type r) { + completed = true; + winner_index = r.first; + result_value = r.second; + }, + [](std::exception_ptr) {})( + when_any(void_task(), returns_int(42))); + + BOOST_TEST(completed); + BOOST_TEST(winner_index == 0 || winner_index == 1); + if (winner_index == 0) + BOOST_TEST(std::holds_alternative(result_value)); + else + BOOST_TEST_EQ(std::get(result_value), 42); + } + + // Test: All void tasks + void + testAllVoidTasks() + { + int dispatch_count = 0; + test_executor ex(dispatch_count); + bool completed = false; + std::size_t winner_index = 999; + std::variant result_value; + + run_async(ex, + [&](when_any_result_type r) { + completed = true; + winner_index = r.first; + result_value = r.second; + }, + [](std::exception_ptr) {})( + when_any(void_task(), void_task(), void_task())); + + BOOST_TEST(completed); + BOOST_TEST(winner_index == 0 || winner_index == 1 || winner_index == 2); + // All void tasks produce monostate regardless of index + BOOST_TEST(std::holds_alternative(result_value)); + } + + //---------------------------------------------------------- + // Exception handling tests + //---------------------------------------------------------- + + // Test: Exception from single task propagates + void + testSingleTaskException() + { + int dispatch_count = 0; + test_executor ex(dispatch_count); + bool completed = false; + bool caught_exception = false; + std::string error_msg; + + run_async(ex, + [&](when_any_result_type) { completed = true; }, + [&](std::exception_ptr ep) { + try { + std::rethrow_exception(ep); + } catch (test_exception const& e) { + caught_exception = true; + error_msg = e.what(); + } + })(when_any(throws_exception("test error"))); + + BOOST_TEST(!completed); + BOOST_TEST(caught_exception); + BOOST_TEST_EQ(error_msg, "test error"); + } + + // Test: Exception wins the race (exception is a valid completion) + void + testExceptionWinsRace() + { + int dispatch_count = 0; + test_executor ex(dispatch_count); + bool caught_exception = false; + std::string error_msg; + + run_async(ex, + [](when_any_result_type) {}, + [&](std::exception_ptr ep) { + try { + std::rethrow_exception(ep); + } catch (test_exception const& e) { + caught_exception = true; + error_msg = e.what(); + } + })(when_any(throws_exception("winner error"), returns_int(42))); + + // With synchronous executor, first task (the thrower) wins + BOOST_TEST(caught_exception); + BOOST_TEST_EQ(error_msg, "winner error"); + } + + // Test: Void task exception + void + testVoidTaskException() + { + int dispatch_count = 0; + test_executor ex(dispatch_count); + bool caught_exception = false; + std::string error_msg; + + run_async(ex, + [](when_any_result_type) {}, + [&](std::exception_ptr ep) { + try { + std::rethrow_exception(ep); + } catch (test_exception const& e) { + caught_exception = true; + error_msg = e.what(); + } + })(when_any(void_throws_exception("void error"), returns_int(42))); + + BOOST_TEST(caught_exception); + BOOST_TEST_EQ(error_msg, "void error"); + } + + // Test: Multiple exceptions - first wins + void + testMultipleExceptionsFirstWins() + { + int dispatch_count = 0; + test_executor ex(dispatch_count); + bool caught_exception = false; + std::string error_msg; + + run_async(ex, + [](when_any_result_type) {}, + [&](std::exception_ptr ep) { + try { + std::rethrow_exception(ep); + } catch (test_exception const& e) { + caught_exception = true; + error_msg = e.what(); + } + })(when_any( + throws_exception("error_1"), + throws_exception("error_2"), + throws_exception("error_3"))); + + BOOST_TEST(caught_exception); + // One of them wins + BOOST_TEST( + error_msg == "error_1" || + error_msg == "error_2" || + error_msg == "error_3"); + } + + //---------------------------------------------------------- + // Stop token propagation tests + //---------------------------------------------------------- + + // Test: Stop is requested when winner completes + void + testStopRequestedOnCompletion() + { + int dispatch_count = 0; + test_executor ex(dispatch_count); + std::atomic completion_count{0}; + bool completed = false; + + auto counting_task = [&]() -> task { + ++completion_count; + co_return completion_count.load(); + }; + + run_async(ex, + [&](when_any_result_type) { + completed = true; + }, + [](std::exception_ptr) {})( + when_any(counting_task(), counting_task(), counting_task())); + + BOOST_TEST(completed); + // All three tasks should run to completion + // (stop is requested, but synchronous tasks complete anyway) + BOOST_TEST_EQ(completion_count.load(), 3); + } + + // Test: All tasks complete even after winner (cleanup) + void + testAllTasksCompleteForCleanup() + { + int dispatch_count = 0; + test_executor ex(dispatch_count); + std::atomic completion_count{0}; + bool completed = false; + + auto counting_task = [&](int value) -> task { + ++completion_count; + co_return value; + }; + + run_async(ex, + [&](when_any_result_type r) { + completed = true; + // Winner should be first task (synchronous executor) + BOOST_TEST_EQ(r.first, 0u); + }, + [](std::exception_ptr) {})( + when_any( + counting_task(1), + counting_task(2), + counting_task(3), + counting_task(4))); + + BOOST_TEST(completed); + // All four tasks must complete for proper cleanup + BOOST_TEST_EQ(completion_count.load(), 4); + } + + //---------------------------------------------------------- + // Long-lived task cancellation tests + //---------------------------------------------------------- + + // Test: Long-lived tasks exit early when stop is requested + void + testLongLivedTasksCancelledOnWinner() + { + std::queue work_queue; + queuing_executor ex(work_queue); + + std::atomic cancelled_count{0}; + std::atomic completed_normally_count{0}; + bool when_any_completed = false; + std::size_t winner_index = 999; + int winner_value = 0; + + // A task that completes immediately + auto fast_task = [&]() -> task { + ++completed_normally_count; + co_return 42; + }; + + // A task that does multiple steps, checking stop token between each + auto slow_task = [&](int id, int steps) -> task { + for (int i = 0; i < steps; ++i) { + auto token = co_await get_stop_token(); + if (token.stop_requested()) { + ++cancelled_count; + co_return -1; // Cancelled + } + co_await yield_awaitable{}; + } + ++completed_normally_count; + co_return id; + }; + + run_async(ex, + [&](when_any_result_type r) { + when_any_completed = true; + winner_index = r.first; + winner_value = std::get(r.second); + }, + [](std::exception_ptr) {})( + when_any(fast_task(), slow_task(100, 10), slow_task(200, 10))); + + // Process work queue until empty + while (!work_queue.empty()) { + auto h = work_queue.front(); + work_queue.pop(); + h.resume(); + } + + BOOST_TEST(when_any_completed); + BOOST_TEST_EQ(winner_index, 0u); // fast_task wins + BOOST_TEST_EQ(winner_value, 42); + + // The fast task completed normally + BOOST_TEST_EQ(completed_normally_count.load(), 1); + + // Both slow tasks should have been cancelled + BOOST_TEST_EQ(cancelled_count.load(), 2); + } + + // Test: Slow task can win if it finishes first + void + testSlowTaskCanWin() + { + std::queue work_queue; + queuing_executor ex(work_queue); + + std::atomic cancelled_count{0}; + std::atomic completed_normally_count{0}; + bool when_any_completed = false; + std::size_t winner_index = 999; + int winner_value = 0; + + // A task that does a few steps then completes + auto medium_task = [&](int id, int steps) -> task { + for (int i = 0; i < steps; ++i) { + auto token = co_await get_stop_token(); + if (token.stop_requested()) { + ++cancelled_count; + co_return -1; + } + co_await yield_awaitable{}; + } + ++completed_normally_count; + co_return id; + }; + + // Task 0: 3 steps, Task 1: 1 step (wins), Task 2: 4 steps + // With FIFO scheduling, task1 completes after 1 yield while others + // are still in progress and will observe the stop request. + run_async(ex, + [&](when_any_result_type r) { + when_any_completed = true; + winner_index = r.first; + winner_value = std::get(r.second); + }, + [](std::exception_ptr) {})( + when_any(medium_task(10, 3), medium_task(20, 1), medium_task(30, 4))); + + // Process work queue until empty + while (!work_queue.empty()) { + auto h = work_queue.front(); + work_queue.pop(); + h.resume(); + } + + BOOST_TEST(when_any_completed); + BOOST_TEST_EQ(winner_index, 1u); // Task with 1 step wins + BOOST_TEST_EQ(winner_value, 20); + + // Only the winner completed normally + BOOST_TEST_EQ(completed_normally_count.load(), 1); + + // Other two tasks were cancelled + BOOST_TEST_EQ(cancelled_count.load(), 2); + } + + // Test: Tasks that don't check stop token still complete (cleanup) + void + testNonCooperativeTasksStillComplete() + { + std::queue work_queue; + queuing_executor ex(work_queue); + + std::atomic completion_count{0}; + bool when_any_completed = false; + + // A task that completes immediately + auto fast_task = [&]() -> task { + ++completion_count; + co_return 42; + }; + + // A task that ignores stop token (non-cooperative) + auto non_cooperative_task = [&](int id, int steps) -> task { + for (int i = 0; i < steps; ++i) { + // Deliberately NOT checking stop token + co_await yield_awaitable{}; + } + ++completion_count; + co_return id; + }; + + run_async(ex, + [&](when_any_result_type r) { + when_any_completed = true; + BOOST_TEST_EQ(r.first, 0u); // fast_task wins + }, + [](std::exception_ptr) {})( + when_any(fast_task(), non_cooperative_task(100, 3), non_cooperative_task(200, 3))); + + // Process work queue until empty + while (!work_queue.empty()) { + auto h = work_queue.front(); + work_queue.pop(); + h.resume(); + } + + BOOST_TEST(when_any_completed); + + // All three tasks complete (non-cooperative tasks run to completion) + BOOST_TEST_EQ(completion_count.load(), 3); + } + + // Test: Mixed cooperative and non-cooperative tasks + void + testMixedCooperativeAndNonCooperativeTasks() + { + std::queue work_queue; + queuing_executor ex(work_queue); + + std::atomic cooperative_cancelled{0}; + std::atomic non_cooperative_finished{0}; + std::atomic winner_finished{0}; + bool when_any_completed = false; + + auto fast_task = [&]() -> task { + ++winner_finished; + co_return 1; + }; + + auto cooperative_slow = [&](int steps) -> task { + for (int i = 0; i < steps; ++i) { + auto token = co_await get_stop_token(); + if (token.stop_requested()) { + ++cooperative_cancelled; + co_return -1; + } + co_await yield_awaitable{}; + } + co_return 2; + }; + + auto non_cooperative_slow = [&](int steps) -> task { + for (int i = 0; i < steps; ++i) { + co_await yield_awaitable{}; + } + ++non_cooperative_finished; + co_return 3; + }; + + run_async(ex, + [&](when_any_result_type r) { + when_any_completed = true; + BOOST_TEST_EQ(r.first, 0u); + }, + [](std::exception_ptr) {})( + when_any(fast_task(), cooperative_slow(5), non_cooperative_slow(5))); + + while (!work_queue.empty()) { + auto h = work_queue.front(); + work_queue.pop(); + h.resume(); + } + + BOOST_TEST(when_any_completed); + BOOST_TEST_EQ(winner_finished.load(), 1); + BOOST_TEST_EQ(cooperative_cancelled.load(), 1); + BOOST_TEST_EQ(non_cooperative_finished.load(), 1); + } + + //---------------------------------------------------------- + // Nested when_any tests + //---------------------------------------------------------- + + // Test: Nested when_any + void + testNestedWhenAny() + { + int dispatch_count = 0; + test_executor ex(dispatch_count); + bool completed = false; + int result = 0; + + auto inner1 = []() -> task { + auto [idx, res] = co_await when_any(returns_int(10), returns_int(20)); + co_return std::get(res); + }; + + auto inner2 = []() -> task { + auto [idx, res] = co_await when_any(returns_int(30), returns_int(40)); + co_return std::get(res); + }; + + std::size_t winner_index = 999; + + run_async(ex, + [&](when_any_result_type r) { + completed = true; + winner_index = r.first; + result = std::get(r.second); + }, + [](std::exception_ptr) {})( + when_any(inner1(), inner2())); + + BOOST_TEST(completed); + BOOST_TEST(winner_index == 0 || winner_index == 1); + // inner1 returns 10 or 20, inner2 returns 30 or 40 + if (winner_index == 0) + BOOST_TEST(result == 10 || result == 20); + else + BOOST_TEST(result == 30 || result == 40); + } + + // Test: when_any inside when_all + void + testWhenAnyInsideWhenAll() + { + int dispatch_count = 0; + test_executor ex(dispatch_count); + bool completed = false; + + auto race1 = []() -> task { + auto [idx, res] = co_await when_any(returns_int(1), returns_int(2)); + co_return std::get(res); + }; + + auto race2 = []() -> task { + auto [idx, res] = co_await when_any(returns_int(3), returns_int(4)); + co_return std::get(res); + }; + + run_async(ex, + [&](std::tuple t) { + auto [a, b] = t; + completed = true; + BOOST_TEST((a == 1 || a == 2)); + BOOST_TEST((b == 3 || b == 4)); + }, + [](std::exception_ptr) {})( + when_all(race1(), race2())); + + BOOST_TEST(completed); + } + + // Test: when_all inside when_any + void + testWhenAllInsideWhenAny() + { + int dispatch_count = 0; + test_executor ex(dispatch_count); + bool completed = false; + std::size_t winner_index = 999; + int result_value = 0; + + auto concurrent1 = []() -> task { + auto [a, b] = co_await when_all(returns_int(1), returns_int(2)); + co_return a + b; + }; + + auto concurrent2 = []() -> task { + auto [a, b] = co_await when_all(returns_int(3), returns_int(4)); + co_return a + b; + }; + + run_async(ex, + [&](when_any_result_type r) { + completed = true; + winner_index = r.first; + result_value = std::get(r.second); + }, + [](std::exception_ptr) {})( + when_any(concurrent1(), concurrent2())); + + BOOST_TEST(completed); + BOOST_TEST(winner_index == 0 || winner_index == 1); + // concurrent1 returns 1+2=3, concurrent2 returns 3+4=7 + if (winner_index == 0) + BOOST_TEST_EQ(result_value, 3); + else + BOOST_TEST_EQ(result_value, 7); + } + + //---------------------------------------------------------- + // Edge case tests + //---------------------------------------------------------- + + // Test: Large number of tasks + void + testManyTasks() + { + int dispatch_count = 0; + test_executor ex(dispatch_count); + bool completed = false; + std::size_t winner_index = 999; + int result_value = 0; + + run_async(ex, + [&](auto r) { + completed = true; + winner_index = r.first; + result_value = std::get(r.second); + }, + [](std::exception_ptr) {})(when_any( + returns_int(1), returns_int(2), returns_int(3), returns_int(4), + returns_int(5), returns_int(6), returns_int(7), returns_int(8))); + + BOOST_TEST(completed); + BOOST_TEST(winner_index < 8); + // Verify correct index-to-value mapping (index 0 -> value 1, etc.) + BOOST_TEST_EQ(result_value, static_cast(winner_index + 1)); + } + + // Test: Task that does multiple internal operations + static task + multi_step_task(int start) + { + int value = start; + value += co_await returns_int(1); + value += co_await returns_int(2); + co_return value; + } + + void + testTasksWithMultipleSteps() + { + int dispatch_count = 0; + test_executor ex(dispatch_count); + bool completed = false; + std::size_t winner_index = 999; + int result_value = 0; + + run_async(ex, + [&](when_any_result_type r) { + completed = true; + winner_index = r.first; + result_value = std::get(r.second); + }, + [](std::exception_ptr) {})( + when_any(multi_step_task(10), multi_step_task(20))); + + BOOST_TEST(completed); + BOOST_TEST(winner_index == 0 || winner_index == 1); + // Index 0: 10+1+2=13, Index 1: 20+1+2=23 + if (winner_index == 0) + BOOST_TEST_EQ(result_value, 13); + else + BOOST_TEST_EQ(result_value, 23); + } + + //---------------------------------------------------------- + // Awaitable lifecycle tests + //---------------------------------------------------------- + + // Test: when_any result is move constructible + void + testAwaitableMoveConstruction() + { + int dispatch_count = 0; + test_executor ex(dispatch_count); + bool completed = false; + std::size_t winner_index = 999; + int result_value = 0; + + auto awaitable1 = when_any(returns_int(1), returns_int(2)); + auto awaitable2 = std::move(awaitable1); + + run_async(ex, + [&](when_any_result_type r) { + completed = true; + winner_index = r.first; + result_value = std::get(r.second); + }, + [](std::exception_ptr) {})(std::move(awaitable2)); + + BOOST_TEST(completed); + BOOST_TEST(winner_index == 0 || winner_index == 1); + if (winner_index == 0) + BOOST_TEST_EQ(result_value, 1); + else + BOOST_TEST_EQ(result_value, 2); + } + + // Test: when_any can be stored and awaited later + void + testDeferredAwait() + { + int dispatch_count = 0; + test_executor ex(dispatch_count); + bool completed = false; + std::size_t winner_index = 999; + int result_value = 0; + + auto deferred = when_any(returns_int(10), returns_int(20)); + + run_async(ex, + [&](when_any_result_type r) { + completed = true; + winner_index = r.first; + result_value = std::get(r.second); + }, + [](std::exception_ptr) {})(std::move(deferred)); + + BOOST_TEST(completed); + BOOST_TEST(winner_index == 0 || winner_index == 1); + if (winner_index == 0) + BOOST_TEST_EQ(result_value, 10); + else + BOOST_TEST_EQ(result_value, 20); + } + + //---------------------------------------------------------- + // Protocol compliance tests + //---------------------------------------------------------- + + // Test: when_any returns task which satisfies IoAwaitable concept + void + testIoAwaitableConcept() + { + static_assert(IoAwaitable< + task>>); + + static_assert(IoAwaitable< + task>>); + + static_assert(IoAwaitable< + task>>); + + static_assert(IoAwaitable< + task>>); + } + + //---------------------------------------------------------- + // Variant access tests + //---------------------------------------------------------- + + // Test: Correct variant alternative is populated + void + testVariantAlternativePopulated() + { + int dispatch_count = 0; + test_executor ex(dispatch_count); + bool completed = false; + + // Note: deduplicates to variant + run_async(ex, + [&](when_any_result_type r) { + completed = true; + // With synchronous executor, first task wins + BOOST_TEST_EQ(r.first, 0u); + BOOST_TEST(std::holds_alternative(r.second)); + BOOST_TEST_EQ(std::get(r.second), 42); + }, + [](std::exception_ptr) {})( + when_any(returns_int(42), returns_string("hello"), returns_int(99))); + + BOOST_TEST(completed); + } + + // Test: Can use std::visit on result variant + void + testVariantVisit() + { + int dispatch_count = 0; + test_executor ex(dispatch_count); + bool completed = false; + std::size_t winner_index = 999; + std::variant result_value; + + run_async(ex, + [&](when_any_result_type r) { + completed = true; + winner_index = r.first; + result_value = r.second; + }, + [](std::exception_ptr) {})( + when_any(returns_int(42), returns_string("hello"))); + + BOOST_TEST(completed); + BOOST_TEST(winner_index == 0 || winner_index == 1); + if (winner_index == 0) + BOOST_TEST_EQ(std::get(result_value), 42); + else + BOOST_TEST_EQ(std::get(result_value), "hello"); + } + + //---------------------------------------------------------- + // Parent stop token propagation tests + //---------------------------------------------------------- + + // Test: Parent stop token already requested before when_any starts + void + testParentStopAlreadyRequested() + { + std::queue work_queue; + queuing_executor ex(work_queue); + + std::atomic saw_stop_count{0}; + bool when_any_completed = false; + std::size_t winner_index = 999; + + // A task that checks stop token on first suspension + auto check_stop_task = [&](int id) -> task { + auto token = co_await get_stop_token(); + if (token.stop_requested()) { + ++saw_stop_count; + } + co_return id; + }; + + // Use a stop_source to simulate parent cancellation + std::stop_source parent_stop; + parent_stop.request_stop(); + + // Use run_async with stop_token parameter to test propagation + run_async(ex, parent_stop.get_token(), + [&](when_any_result_type r) { + when_any_completed = true; + winner_index = r.first; + }, + [](std::exception_ptr) {})( + when_any(check_stop_task(1), check_stop_task(2), check_stop_task(3))); + + while (!work_queue.empty()) { + auto h = work_queue.front(); + work_queue.pop(); + h.resume(); + } + + BOOST_TEST(when_any_completed); + // All tasks should have seen the stop token as requested + // (inherited from parent) + BOOST_TEST_EQ(saw_stop_count.load(), 3); + } + + // Test: Parent stop requested after tasks start but before winner + void + testParentStopDuringExecution() + { + std::queue work_queue; + queuing_executor ex(work_queue); + + std::atomic cancelled_count{0}; + bool when_any_completed = false; + + auto slow_task = [&](int id, int steps) -> task { + for (int i = 0; i < steps; ++i) { + auto token = co_await get_stop_token(); + if (token.stop_requested()) { + ++cancelled_count; + co_return -1; + } + co_await yield_awaitable{}; + } + co_return id; + }; + + std::stop_source parent_stop; + + // Use run_async with stop_token parameter + run_async(ex, parent_stop.get_token(), + [&](when_any_result_type) { + when_any_completed = true; + }, + [](std::exception_ptr) {})( + when_any(slow_task(1, 10), slow_task(2, 10))); + + // Run a few iterations, then request parent stop + for (int i = 0; i < 3 && !work_queue.empty(); ++i) { + auto h = work_queue.front(); + work_queue.pop(); + h.resume(); + } + + // Request stop from parent + parent_stop.request_stop(); + + // Finish processing + while (!work_queue.empty()) { + auto h = work_queue.front(); + work_queue.pop(); + h.resume(); + } + + BOOST_TEST(when_any_completed); + // Both tasks should have been cancelled by parent stop + BOOST_TEST_EQ(cancelled_count.load(), 2); + } + + //---------------------------------------------------------- + // Interleaved exception tests + //---------------------------------------------------------- + + // Test: Multiple exceptions thrown with interleaved execution + void + testInterleavedExceptions() + { + std::queue work_queue; + queuing_executor ex(work_queue); + + bool caught_exception = false; + std::string error_msg; + + // Tasks that yield before throwing + auto delayed_throw = [](int id, int yields) -> task { + for (int i = 0; i < yields; ++i) { + co_await yield_awaitable{}; + } + throw test_exception(("error_" + std::to_string(id)).c_str()); + co_return id; + }; + + run_async(ex, + [](when_any_result_type) {}, + [&](std::exception_ptr ep) { + try { + std::rethrow_exception(ep); + } catch (test_exception const& e) { + caught_exception = true; + error_msg = e.what(); + } + })(when_any(delayed_throw(1, 2), delayed_throw(2, 1), delayed_throw(3, 3))); + + while (!work_queue.empty()) { + auto h = work_queue.front(); + work_queue.pop(); + h.resume(); + } + + BOOST_TEST(caught_exception); + // Task 2 throws first (after 1 yield) + BOOST_TEST_EQ(error_msg, "error_2"); + } + + //---------------------------------------------------------- + // Nested stop propagation tests + //---------------------------------------------------------- + + // Test: Stop propagates through nested when_any - outer task cancelled before inner starts + void + testNestedStopPropagationOuterCancelled() + { + std::queue work_queue; + queuing_executor ex(work_queue); + + std::atomic outer_cancelled{0}; + bool when_any_completed = false; + std::size_t winner_index = 999; + + auto fast_task = [&]() -> task { + co_return 42; + }; + + // A task that checks stop before launching inner when_any + auto nested_when_any_task = [&]() -> task { + auto token = co_await get_stop_token(); + if (token.stop_requested()) { + ++outer_cancelled; + co_return -1; + } + // Won't reach here if stopped + co_return 100; + }; + + run_async(ex, + [&](when_any_result_type r) { + when_any_completed = true; + winner_index = r.first; + }, + [](std::exception_ptr) {})( + when_any(fast_task(), nested_when_any_task())); + + while (!work_queue.empty()) { + auto h = work_queue.front(); + work_queue.pop(); + h.resume(); + } + + BOOST_TEST(when_any_completed); + BOOST_TEST_EQ(winner_index, 0u); // fast_task wins + // The nested task should see stop and exit early + BOOST_TEST_EQ(outer_cancelled.load(), 1); + } + + // Test: Stop propagates to inner when_any's children + void + testNestedStopPropagationInnerCancelled() + { + std::queue work_queue; + queuing_executor ex(work_queue); + + std::atomic inner_cancelled{0}; + std::atomic inner_completed{0}; + bool when_any_completed = false; + std::size_t winner_index = 999; + + // Fast task that yields first to let nested when_any start + auto yielding_fast_task = [&]() -> task { + co_await yield_awaitable{}; + co_return 42; + }; + + auto slow_inner_task = [&](int steps) -> task { + for (int i = 0; i < steps; ++i) { + auto token = co_await get_stop_token(); + if (token.stop_requested()) { + ++inner_cancelled; + co_return -1; + } + co_await yield_awaitable{}; + } + ++inner_completed; + co_return 100; + }; + + // A task containing a nested when_any - doesn't check stop first + auto nested_when_any_task = [&]() -> task { + // Start inner when_any immediately (no stop check first) + auto [idx, res] = co_await when_any( + slow_inner_task(10), + slow_inner_task(10)); + co_return std::get(res); + }; + + run_async(ex, + [&](when_any_result_type r) { + when_any_completed = true; + winner_index = r.first; + }, + [](std::exception_ptr) {})( + when_any(yielding_fast_task(), nested_when_any_task())); + + while (!work_queue.empty()) { + auto h = work_queue.front(); + work_queue.pop(); + h.resume(); + } + + BOOST_TEST(when_any_completed); + // One of them should win + BOOST_TEST(winner_index == 0 || winner_index == 1); + + if (winner_index == 0) { + // If yielding_fast_task won, the inner tasks should be cancelled + BOOST_TEST_EQ(inner_cancelled.load(), 2); + BOOST_TEST_EQ(inner_completed.load(), 0); + } else { + // If nested_when_any_task won (one of its inner tasks completed) + // one inner task completes, other gets cancelled + BOOST_TEST_EQ(inner_completed.load(), 1); + BOOST_TEST_EQ(inner_cancelled.load(), 1); + } + } + + //---------------------------------------------------------- + // Variant usage pattern tests + //---------------------------------------------------------- + + // Test: Document correct pattern for variant access based on index + void + testVariantAccessByIndex() + { + int dispatch_count = 0; + test_executor ex(dispatch_count); + bool completed = false; + bool correct_access = false; + + run_async(ex, + [&](when_any_result_type r) { + completed = true; + // The correct pattern: use index to determine which type to access + switch (r.first) { + case 0: + correct_access = std::holds_alternative(r.second); + BOOST_TEST_EQ(std::get(r.second), 42); + break; + case 1: + correct_access = std::holds_alternative(r.second); + BOOST_TEST_EQ(std::get(r.second), "hello"); + break; + case 2: + correct_access = std::holds_alternative(r.second); + BOOST_TEST_EQ(std::get(r.second), 3.14); + break; + } + }, + [](std::exception_ptr) {})( + when_any(returns_int(42), returns_string("hello"), []() -> task { co_return 3.14; }())); + + BOOST_TEST(completed); + BOOST_TEST(correct_access); + } + + // Test: Variant with duplicate types - index disambiguation + void + testVariantDuplicateTypesIndexDisambiguation() + { + int dispatch_count = 0; + test_executor ex(dispatch_count); + bool completed = false; + std::size_t winner_index = 999; + int result_value = 0; + + // when_any(int, int, int) deduplicates to variant + // but winner_index tells us WHICH task won + run_async(ex, + [&](when_any_result_type r) { + completed = true; + winner_index = r.first; + result_value = std::get(r.second); + }, + [](std::exception_ptr) {})( + when_any(returns_int(100), returns_int(200), returns_int(300))); + + BOOST_TEST(completed); + // With synchronous executor, first task wins + BOOST_TEST_EQ(winner_index, 0u); + BOOST_TEST_EQ(result_value, 100); + } + + void + run() + { + // Basic functionality + testSingleTask(); + testTwoTasksFirstWins(); + testMixedTypes(); + testVoidTaskWins(); + testAllVoidTasks(); + + // Exception handling + testSingleTaskException(); + testExceptionWinsRace(); + testVoidTaskException(); + testMultipleExceptionsFirstWins(); + + // Stop token propagation + testStopRequestedOnCompletion(); + testAllTasksCompleteForCleanup(); + + // Parent stop token propagation + testParentStopAlreadyRequested(); + testParentStopDuringExecution(); + + // Long-lived task cancellation + testLongLivedTasksCancelledOnWinner(); + testSlowTaskCanWin(); + testNonCooperativeTasksStillComplete(); + testMixedCooperativeAndNonCooperativeTasks(); + + // Interleaved exceptions + testInterleavedExceptions(); + + // Nested combinators + testNestedWhenAny(); + testWhenAnyInsideWhenAll(); + testWhenAllInsideWhenAny(); + + // Nested stop propagation + testNestedStopPropagationOuterCancelled(); + testNestedStopPropagationInnerCancelled(); + + // Edge cases + testManyTasks(); + testTasksWithMultipleSteps(); + + // Awaitable lifecycle + testAwaitableMoveConstruction(); + testDeferredAwait(); + + // Protocol compliance + testIoAwaitableConcept(); + + // Variant access + testVariantAlternativePopulated(); + testVariantVisit(); + testVariantAccessByIndex(); + testVariantDuplicateTypesIndexDisambiguation(); + } +}; + +TEST_SUITE( + when_any_test, + "boost.capy.when_any"); + +//---------------------------------------------------------- +// Homogeneous when_any tests (vector overload) +//---------------------------------------------------------- + +struct when_any_vector_test +{ + //---------------------------------------------------------- + // Basic functionality tests + //---------------------------------------------------------- + + // Test: Single task in vector + void + testSingleTaskVector() + { + int dispatch_count = 0; + test_executor ex(dispatch_count); + bool completed = false; + int result = 0; + std::size_t winner_index = 999; + + std::vector> tasks; + tasks.push_back(returns_int(42)); + + run_async(ex, + [&](std::pair r) { + completed = true; + winner_index = r.first; + result = r.second; + }, + [](std::exception_ptr) {})( + when_any(std::move(tasks))); + + BOOST_TEST(completed); + BOOST_TEST_EQ(winner_index, 0u); + BOOST_TEST_EQ(result, 42); + } + + // Test: Multiple tasks in vector + void + testMultipleTasksVector() + { + int dispatch_count = 0; + test_executor ex(dispatch_count); + bool completed = false; + std::size_t winner_index = 999; + int result_value = 0; + + std::vector> tasks; + tasks.push_back(returns_int(10)); + tasks.push_back(returns_int(20)); + tasks.push_back(returns_int(30)); + + run_async(ex, + [&](std::pair r) { + completed = true; + winner_index = r.first; + result_value = r.second; + }, + [](std::exception_ptr) {})( + when_any(std::move(tasks))); + + BOOST_TEST(completed); + BOOST_TEST(winner_index < 3); + // Verify correct index-to-value mapping + BOOST_TEST_EQ(result_value, static_cast((winner_index + 1) * 10)); + } + + // Test: Empty vector throws + void + testEmptyVectorThrows() + { + int dispatch_count = 0; + test_executor ex(dispatch_count); + bool caught_exception = false; + + std::vector> tasks; + + run_async(ex, + [](std::pair) {}, + [&](std::exception_ptr ep) { + try { + std::rethrow_exception(ep); + } catch (std::invalid_argument const&) { + caught_exception = true; + } + })(when_any(std::move(tasks))); + + BOOST_TEST(caught_exception); + } + + // Test: Void tasks in vector + void + testVoidTasksVector() + { + int dispatch_count = 0; + test_executor ex(dispatch_count); + bool completed = false; + std::size_t winner_index = 999; + + std::vector> tasks; + tasks.push_back(void_task()); + tasks.push_back(void_task()); + tasks.push_back(void_task()); + + run_async(ex, + [&](std::size_t idx) { + completed = true; + winner_index = idx; + }, + [](std::exception_ptr) {})( + when_any(std::move(tasks))); + + BOOST_TEST(completed); + BOOST_TEST(winner_index < 3); + } + + //---------------------------------------------------------- + // Exception handling tests + //---------------------------------------------------------- + + // Test: Exception from task in vector + void + testExceptionInVector() + { + int dispatch_count = 0; + test_executor ex(dispatch_count); + bool caught_exception = false; + std::string error_msg; + + std::vector> tasks; + tasks.push_back(throws_exception("vector error")); + + run_async(ex, + [](std::pair) {}, + [&](std::exception_ptr ep) { + try { + std::rethrow_exception(ep); + } catch (test_exception const& e) { + caught_exception = true; + error_msg = e.what(); + } + })(when_any(std::move(tasks))); + + BOOST_TEST(caught_exception); + BOOST_TEST_EQ(error_msg, "vector error"); + } + + // Test: Exception wins race in vector + void + testExceptionWinsRaceVector() + { + int dispatch_count = 0; + test_executor ex(dispatch_count); + bool caught_exception = false; + std::string error_msg; + + std::vector> tasks; + tasks.push_back(throws_exception("winner")); + tasks.push_back(returns_int(42)); + tasks.push_back(returns_int(99)); + + run_async(ex, + [](std::pair) {}, + [&](std::exception_ptr ep) { + try { + std::rethrow_exception(ep); + } catch (test_exception const& e) { + caught_exception = true; + error_msg = e.what(); + } + })(when_any(std::move(tasks))); + + BOOST_TEST(caught_exception); + BOOST_TEST_EQ(error_msg, "winner"); + } + + // Test: Void task exception in vector + void + testVoidExceptionInVector() + { + int dispatch_count = 0; + test_executor ex(dispatch_count); + bool caught_exception = false; + std::string error_msg; + + std::vector> tasks; + tasks.push_back(void_throws_exception("void vector error")); + tasks.push_back(void_task()); + + run_async(ex, + [](std::size_t) {}, + [&](std::exception_ptr ep) { + try { + std::rethrow_exception(ep); + } catch (test_exception const& e) { + caught_exception = true; + error_msg = e.what(); + } + })(when_any(std::move(tasks))); + + BOOST_TEST(caught_exception); + BOOST_TEST_EQ(error_msg, "void vector error"); + } + + //---------------------------------------------------------- + // Stop token propagation tests + //---------------------------------------------------------- + + // Test: All tasks complete for cleanup (vector) + void + testAllTasksCompleteForCleanupVector() + { + int dispatch_count = 0; + test_executor ex(dispatch_count); + std::atomic completion_count{0}; + bool completed = false; + + auto counting_task = [&](int value) -> task { + ++completion_count; + co_return value; + }; + + std::vector> tasks; + tasks.push_back(counting_task(1)); + tasks.push_back(counting_task(2)); + tasks.push_back(counting_task(3)); + tasks.push_back(counting_task(4)); + + run_async(ex, + [&](std::pair) { + completed = true; + }, + [](std::exception_ptr) {})( + when_any(std::move(tasks))); + + BOOST_TEST(completed); + // All four tasks must complete for proper cleanup + BOOST_TEST_EQ(completion_count.load(), 4); + } + + //---------------------------------------------------------- + // Long-lived task cancellation tests (vector) + //---------------------------------------------------------- + + // Test: Long-lived tasks cancelled on winner (vector) + void + testLongLivedTasksCancelledVector() + { + std::queue work_queue; + queuing_executor ex(work_queue); + + std::atomic cancelled_count{0}; + std::atomic completed_normally_count{0}; + bool when_any_completed = false; + std::size_t winner_index = 999; + int winner_value = 0; + + auto fast_task = [&]() -> task { + ++completed_normally_count; + co_return 42; + }; + + auto slow_task = [&](int id, int steps) -> task { + for (int i = 0; i < steps; ++i) { + auto token = co_await get_stop_token(); + if (token.stop_requested()) { + ++cancelled_count; + co_return -1; + } + co_await yield_awaitable{}; + } + ++completed_normally_count; + co_return id; + }; + + std::vector> tasks; + tasks.push_back(fast_task()); + tasks.push_back(slow_task(100, 10)); + tasks.push_back(slow_task(200, 10)); + + run_async(ex, + [&](std::pair r) { + when_any_completed = true; + winner_index = r.first; + winner_value = r.second; + }, + [](std::exception_ptr) {})( + when_any(std::move(tasks))); + + while (!work_queue.empty()) { + auto h = work_queue.front(); + work_queue.pop(); + h.resume(); + } + + BOOST_TEST(when_any_completed); + BOOST_TEST_EQ(winner_index, 0u); + BOOST_TEST_EQ(winner_value, 42); + BOOST_TEST_EQ(completed_normally_count.load(), 1); + BOOST_TEST_EQ(cancelled_count.load(), 2); + } + + //---------------------------------------------------------- + // Large vector tests + //---------------------------------------------------------- + + // Test: Many tasks in vector + void + testManyTasksVector() + { + int dispatch_count = 0; + test_executor ex(dispatch_count); + bool completed = false; + std::size_t winner_index = 999; + int result_value = 0; + + std::vector> tasks; + for (int i = 1; i <= 20; ++i) + tasks.push_back(returns_int(i)); + + run_async(ex, + [&](std::pair r) { + completed = true; + winner_index = r.first; + result_value = r.second; + }, + [](std::exception_ptr) {})( + when_any(std::move(tasks))); + + BOOST_TEST(completed); + BOOST_TEST(winner_index < 20); + // Verify correct index-to-value mapping (index 0 -> value 1, etc.) + BOOST_TEST_EQ(result_value, static_cast(winner_index + 1)); + } + + //---------------------------------------------------------- + // Nested combinator tests + //---------------------------------------------------------- + + // Test: Nested when_any with vectors + void + testNestedWhenAnyVector() + { + int dispatch_count = 0; + test_executor ex(dispatch_count); + bool completed = false; + int result = 0; + + auto inner = []() -> task { + std::vector> tasks; + tasks.push_back(returns_int(10)); + tasks.push_back(returns_int(20)); + auto [idx, res] = co_await when_any(std::move(tasks)); + co_return res; + }; + + std::vector> outer_tasks; + outer_tasks.push_back(inner()); + outer_tasks.push_back(inner()); + + run_async(ex, + [&](std::pair r) { + completed = true; + result = r.second; + }, + [](std::exception_ptr) {})( + when_any(std::move(outer_tasks))); + + BOOST_TEST(completed); + BOOST_TEST(result == 10 || result == 20); + } + + // Test: when_any vector inside when_all + void + testWhenAnyVectorInsideWhenAll() + { + int dispatch_count = 0; + test_executor ex(dispatch_count); + bool completed = false; + + auto race = []() -> task { + std::vector> tasks; + tasks.push_back(returns_int(1)); + tasks.push_back(returns_int(2)); + auto [idx, res] = co_await when_any(std::move(tasks)); + co_return res; + }; + + run_async(ex, + [&](std::tuple t) { + auto [a, b] = t; + completed = true; + BOOST_TEST((a == 1 || a == 2)); + BOOST_TEST((b == 1 || b == 2)); + }, + [](std::exception_ptr) {})( + when_all(race(), race())); + + BOOST_TEST(completed); + } + + //---------------------------------------------------------- + // Mixed variadic and vector tests + //---------------------------------------------------------- + + // Test: Mix variadic and vector when_any + void + testMixedVariadicAndVector() + { + int dispatch_count = 0; + test_executor ex(dispatch_count); + bool completed = false; + std::size_t outer_winner = 999; + + auto variadic_race = []() -> task { + auto [idx, res] = co_await when_any(returns_int(1), returns_int(2)); + co_return std::get(res); + }; + + auto vector_race = []() -> task { + std::vector> tasks; + tasks.push_back(returns_int(3)); + tasks.push_back(returns_int(4)); + auto [idx, res] = co_await when_any(std::move(tasks)); + co_return res; + }; + + run_async(ex, + [&](when_any_result_type r) { + completed = true; + outer_winner = r.first; + auto result = std::get(r.second); + if (outer_winner == 0) + BOOST_TEST((result == 1 || result == 2)); + else + BOOST_TEST((result == 3 || result == 4)); + }, + [](std::exception_ptr) {})( + when_any(variadic_race(), vector_race())); + + BOOST_TEST(completed); + } + + void + run() + { + // Basic functionality + testSingleTaskVector(); + testMultipleTasksVector(); + testEmptyVectorThrows(); + testVoidTasksVector(); + + // Exception handling + testExceptionInVector(); + testExceptionWinsRaceVector(); + testVoidExceptionInVector(); + + // Stop token propagation + testAllTasksCompleteForCleanupVector(); + + // Long-lived task cancellation + testLongLivedTasksCancelledVector(); + + // Large vectors + testManyTasksVector(); + + // Nested combinators + testNestedWhenAnyVector(); + testWhenAnyVectorInsideWhenAll(); + + // Mixed variadic and vector + testMixedVariadicAndVector(); + } +}; + +TEST_SUITE( + when_any_vector_test, + "boost.capy.when_any_vector"); + +} // capy +} // boost