From d2f5614e79708ab852e846c102d4244f0f6f68c2 Mon Sep 17 00:00:00 2001 From: Vinnie Falco Date: Sun, 18 Jan 2026 12:30:19 -0800 Subject: [PATCH 1/2] Update docs --- doc/modules/ROOT/nav.adoc | 4 + .../ROOT/pages/coroutines/affinity.adoc | 3 +- .../ROOT/pages/coroutines/cancellation.adoc | 2 +- .../ROOT/pages/coroutines/launching.adoc | 19 +- doc/modules/ROOT/pages/coroutines/tasks.adoc | 3 +- .../ROOT/pages/coroutines/when-all.adoc | 274 +++++++++++++++++ .../ROOT/pages/execution/contexts.adoc | 2 +- .../ROOT/pages/execution/executors.adoc | 2 +- .../pages/execution/frame-allocation.adoc | 2 +- doc/modules/ROOT/pages/execution/strand.adoc | 272 +++++++++++++++++ .../ROOT/pages/execution/thread-pool.adoc | 1 + doc/modules/ROOT/pages/index.adoc | 9 +- .../pages/synchronization/async-mutex.adoc | 284 ++++++++++++++++++ doc/modules/ROOT/pages/utilities/file-io.adoc | 268 ++++++++--------- 14 files changed, 991 insertions(+), 154 deletions(-) create mode 100644 doc/modules/ROOT/pages/coroutines/when-all.adoc create mode 100644 doc/modules/ROOT/pages/execution/strand.adoc create mode 100644 doc/modules/ROOT/pages/synchronization/async-mutex.adoc diff --git a/doc/modules/ROOT/nav.adoc b/doc/modules/ROOT/nav.adoc index 38e4a5e2..7ca61c63 100644 --- a/doc/modules/ROOT/nav.adoc +++ b/doc/modules/ROOT/nav.adoc @@ -3,13 +3,17 @@ * Coroutines ** xref:coroutines/tasks.adoc[Tasks] ** xref:coroutines/launching.adoc[Launching Tasks] +** xref:coroutines/when-all.adoc[Concurrent Composition] ** xref:coroutines/affinity.adoc[Executor Affinity] ** xref:coroutines/cancellation.adoc[Cancellation] * Execution ** xref:execution/thread-pool.adoc[Thread Pool] ** xref:execution/contexts.adoc[Execution Contexts] ** xref:execution/executors.adoc[Executors] +** xref:execution/strand.adoc[Strands] ** xref:execution/frame-allocation.adoc[Frame Allocation] +* Synchronization +** xref:synchronization/async-mutex.adoc[Async Mutex] * Buffers ** xref:buffers/index.adoc[Buffer Types] ** xref:buffers/sequences.adoc[Buffer Sequences] diff --git a/doc/modules/ROOT/pages/coroutines/affinity.adoc b/doc/modules/ROOT/pages/coroutines/affinity.adoc index f189a981..8a5d5950 100644 --- a/doc/modules/ROOT/pages/coroutines/affinity.adoc +++ b/doc/modules/ROOT/pages/coroutines/affinity.adoc @@ -230,5 +230,6 @@ Do NOT use `run_on` when: == Next Steps +* xref:when-all.adoc[Concurrent Composition] — Running multiple tasks in parallel * xref:cancellation.adoc[Cancellation] — Stop token propagation -* xref:../execution/executors.adoc[Executors] — The execution model in depth +* xref:../execution/strand.adoc[Strands] — Serializing coroutine execution diff --git a/doc/modules/ROOT/pages/coroutines/cancellation.adoc b/doc/modules/ROOT/pages/coroutines/cancellation.adoc index a76f6e6c..eb63afc7 100644 --- a/doc/modules/ROOT/pages/coroutines/cancellation.adoc +++ b/doc/modules/ROOT/pages/coroutines/cancellation.adoc @@ -264,5 +264,5 @@ Do NOT use cancellation when: == Next Steps +* xref:when-all.adoc[Concurrent Composition] — Cancellation with `when_all` * xref:../execution/executors.adoc[Executors] — Understand the execution model -* xref:reference:boost/capy.adoc[API Reference] — Full reference documentation diff --git a/doc/modules/ROOT/pages/coroutines/launching.adoc b/doc/modules/ROOT/pages/coroutines/launching.adoc index b6546a01..c6523e1a 100644 --- a/doc/modules/ROOT/pages/coroutines/launching.adoc +++ b/doc/modules/ROOT/pages/coroutines/launching.adoc @@ -124,18 +124,20 @@ runner(make_task()); // Won't compile (deleted move) This design ensures the frame allocator is active when your task is created, enabling frame recycling optimization. -== Custom Frame Allocators +== Stop Token Support -By default, `run_async` uses a recycling allocator that caches deallocated -frames. For custom allocation strategies: +Pass a stop token for cooperative cancellation: [source,cpp] ---- -my_pool_allocator alloc{pool}; -run_async(ex, alloc)(my_task()); +std::stop_source source; +run_async(ex, source.get_token())(cancellable_task()); + +// Later: request cancellation +source.request_stop(); ---- -The allocator is used for all coroutine frames in the launched call tree. +See xref:cancellation.adoc[Cancellation] for details on stop token propagation. == When NOT to Use run_async @@ -165,11 +167,12 @@ Do NOT use `run_async` when: | Success + error handlers | `run_async(ex)(task, on_success, on_error)` -| Custom allocator -| `run_async(ex, alloc)(task)` +| With stop token +| `run_async(ex, stop_token)(task)` |=== == Next Steps +* xref:when-all.adoc[Concurrent Composition] — Run multiple tasks in parallel * xref:affinity.adoc[Executor Affinity] — Control where coroutines execute * xref:../execution/frame-allocation.adoc[Frame Allocation] — Optimize memory usage diff --git a/doc/modules/ROOT/pages/coroutines/tasks.adoc b/doc/modules/ROOT/pages/coroutines/tasks.adoc index 3adbfd77..b36b68e0 100644 --- a/doc/modules/ROOT/pages/coroutines/tasks.adoc +++ b/doc/modules/ROOT/pages/coroutines/tasks.adoc @@ -170,7 +170,7 @@ Tasks are appropriate when: Tasks are NOT appropriate when: * The operation is purely synchronous — just use a regular function -* You need parallel execution — tasks are sequential; use parallel composition +* You need parallel execution — tasks are sequential; use `when_all` for concurrency * You need to detach and forget — tasks must be awaited or explicitly launched == Summary @@ -197,4 +197,5 @@ Tasks are NOT appropriate when: Now that you understand tasks, learn how to run them: * xref:launching.adoc[Launching Tasks] — Start tasks with `run_async` +* xref:when-all.adoc[Concurrent Composition] — Run tasks in parallel with `when_all` * xref:affinity.adoc[Executor Affinity] — Control where tasks execute diff --git a/doc/modules/ROOT/pages/coroutines/when-all.adoc b/doc/modules/ROOT/pages/coroutines/when-all.adoc new file mode 100644 index 00000000..0072b3c2 --- /dev/null +++ b/doc/modules/ROOT/pages/coroutines/when-all.adoc @@ -0,0 +1,274 @@ +// +// Copyright (c) 2025 Vinnie Falco (vinnie.falco@gmail.com) +// +// Distributed under the Boost Software License, Version 1.0. (See accompanying +// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) +// +// Official repository: https://github.com/cppalliance/capy +// + += Concurrent Composition + +This page explains how to run multiple tasks concurrently using `when_all`. + +NOTE: Code snippets assume `using namespace boost::capy;` is in effect. + +== The Problem + +Tasks are sequential by default. When you await multiple tasks: + +[source,cpp] +---- +task sequential() +{ + int a = co_await fetch_a(); // Wait for A + int b = co_await fetch_b(); // Then wait for B + int c = co_await fetch_c(); // Then wait for C + // Total time: A + B + C +} +---- + +Each task waits for the previous one to complete. For independent operations, +this wastes time. + +== when_all + +The `when_all` function launches multiple tasks concurrently and waits for +all of them to complete: + +[source,cpp] +---- +#include + +task concurrent() +{ + auto [a, b, c] = co_await when_all( + fetch_a(), + fetch_b(), + fetch_c() + ); + // Total time: max(A, B, C) +} +---- + +All three fetches run in parallel. The `co_await` completes when the slowest +one finishes. + +== Return Value + +`when_all` returns a tuple of results, with void types filtered out: + +[source,cpp] +---- +// All non-void: get a tuple of all results +auto [x, y] = co_await when_all( + task_returning_int(), // task + task_returning_string() // task +); +// x is int, y is std::string + +// Mixed with void: void tasks don't contribute +auto [value] = co_await when_all( + task_returning_int(), // task + task_void(), // task - no contribution + task_void() // task - no contribution +); +// value is int (only non-void result) + +// All void: returns void +co_await when_all( + task_void(), + task_void() +); +// No tuple, no return value +---- + +Results appear in the same order as the input tasks. + +== Error Handling + +Exceptions propagate from child tasks to the parent. When a task throws: + +1. The exception is captured +2. Stop is requested for sibling tasks +3. All tasks are allowed to complete (or respond to stop) +4. The first exception is rethrown + +[source,cpp] +---- +task handle_errors() +{ + try { + co_await when_all( + might_fail(), + another_task(), + third_task() + ); + } catch (std::exception const& e) { + // First exception from any child + std::cerr << "Error: " << e.what() << "\n"; + } +} +---- + +=== First-Error Semantics + +Only the first exception is captured; subsequent exceptions are discarded. +This matches the behavior of most concurrent frameworks. + +=== Stop Propagation + +When an error occurs, `when_all` requests stop for all sibling tasks. Tasks +that support cancellation can respond by exiting early: + +[source,cpp] +---- +task cancellable_work() +{ + auto token = co_await get_stop_token(); + for (int i = 0; i < 1000; ++i) + { + if (token.stop_requested()) + co_return; // Exit early + co_await do_chunk(i); + } +} + +task example() +{ + // If failing_task throws, cancellable_work sees stop_requested + co_await when_all( + failing_task(), + cancellable_work() + ); +} +---- + +== Parent Stop Token + +`when_all` forwards the parent's stop token to children. If the parent is +cancelled, all children see the request: + +[source,cpp] +---- +task parent() +{ + // Parent has a stop token from run_async + co_await when_all( + child_a(), // Sees parent's stop token + child_b() // Sees parent's stop token + ); +} + +std::stop_source source; +run_async(ex, source.get_token())(parent()); + +// Later: cancel everything +source.request_stop(); +---- + +== Execution Model + +All child tasks inherit the parent's executor affinity: + +[source,cpp] +---- +task parent() // Running on executor ex +{ + co_await when_all( + child_a(), // Runs on ex + child_b() // Runs on ex + ); +} +---- + +Children are launched via `dispatch()` on the executor, which may run them +inline or queue them depending on the executor implementation. + +=== No Parallelism by Default + +With a single-threaded executor, tasks interleave but don't run truly in +parallel: + +[source,cpp] +---- +thread_pool pool(1); // Single thread +run_async(pool.get_executor())(parent()); + +// Tasks interleave at suspension points, but only one runs at a time +---- + +For true parallelism, use a multi-threaded pool: + +[source,cpp] +---- +thread_pool pool(4); // Four threads +run_async(pool.get_executor())(parent()); + +// Tasks may run on different threads +---- + +== Example: Parallel HTTP Fetches + +[source,cpp] +---- +task fetch(http_client& client, std::string url) +{ + co_return co_await client.get(url); +} + +task fetch_all(http_client& client) +{ + auto [home, about, contact] = co_await when_all( + fetch(client, "https://example.com/"), + fetch(client, "https://example.com/about"), + fetch(client, "https://example.com/contact") + ); + + std::cout << "Home: " << home.size() << " bytes\n"; + std::cout << "About: " << about.size() << " bytes\n"; + std::cout << "Contact: " << contact.size() << " bytes\n"; +} +---- + +== When NOT to Use when_all + +Use `when_all` when: + +* Operations are independent +* You want to reduce total wait time +* You need all results before proceeding + +Do NOT use `when_all` when: + +* Operations depend on each other — use sequential `co_await` +* You need results as they complete — consider `when_any` (not yet available) +* Memory is constrained — concurrent tasks consume more memory + +== Summary + +[cols="1,3"] +|=== +| Feature | Description + +| `when_all(tasks...)` +| Launch tasks concurrently, wait for all + +| Return type +| Tuple of non-void results in input order + +| Error handling +| First exception propagated, siblings get stop + +| Affinity +| Children inherit parent's executor + +| Stop propagation +| Parent and sibling stop tokens forwarded +|=== + +== Next Steps + +* xref:cancellation.adoc[Cancellation] — Stop token propagation +* xref:../execution/thread-pool.adoc[Thread Pool] — Multi-threaded execution +* xref:affinity.adoc[Executor Affinity] — Control where tasks run diff --git a/doc/modules/ROOT/pages/execution/contexts.adoc b/doc/modules/ROOT/pages/execution/contexts.adoc index ac5684b8..37ae2685 100644 --- a/doc/modules/ROOT/pages/execution/contexts.adoc +++ b/doc/modules/ROOT/pages/execution/contexts.adoc @@ -315,5 +315,5 @@ Do NOT use `execution_context` directly when: == Next Steps * xref:thread-pool.adoc[Thread Pool] — Using the thread pool execution context +* xref:strand.adoc[Strands] — Serializing coroutine execution * xref:frame-allocation.adoc[Frame Allocation] — Optimize coroutine memory -* xref:reference:boost/capy.adoc[API Reference] — Full reference documentation diff --git a/doc/modules/ROOT/pages/execution/executors.adoc b/doc/modules/ROOT/pages/execution/executors.adoc index 22923552..0f5fc649 100644 --- a/doc/modules/ROOT/pages/execution/executors.adoc +++ b/doc/modules/ROOT/pages/execution/executors.adoc @@ -225,4 +225,4 @@ Do NOT use executors directly when: == Next Steps * xref:contexts.adoc[Execution Contexts] — Service management and thread pools -* xref:reference:boost/capy.adoc[API Reference] — Full reference documentation +* xref:strand.adoc[Strands] — Serializing coroutine execution diff --git a/doc/modules/ROOT/pages/execution/frame-allocation.adoc b/doc/modules/ROOT/pages/execution/frame-allocation.adoc index 4aad52c1..c480575a 100644 --- a/doc/modules/ROOT/pages/execution/frame-allocation.adoc +++ b/doc/modules/ROOT/pages/execution/frame-allocation.adoc @@ -208,4 +208,4 @@ Do NOT use custom allocators when: == Next Steps * xref:../utilities/containers.adoc[Containers] — Type-erased storage -* xref:reference:boost/capy.adoc[API Reference] — Full reference documentation +* xref:../performance-tuning/high-performance-allocators.adoc[High-Performance Allocators] — System-wide memory optimization diff --git a/doc/modules/ROOT/pages/execution/strand.adoc b/doc/modules/ROOT/pages/execution/strand.adoc new file mode 100644 index 00000000..79e0b712 --- /dev/null +++ b/doc/modules/ROOT/pages/execution/strand.adoc @@ -0,0 +1,272 @@ +// +// Copyright (c) 2025 Vinnie Falco (vinnie.falco@gmail.com) +// +// Distributed under the Boost Software License, Version 1.0. (See accompanying +// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) +// +// Official repository: https://github.com/cppalliance/capy +// + += Strands + +This page explains how to use strands to serialize coroutine execution. + +NOTE: Code snippets assume `using namespace boost::capy;` is in effect. + +== The Problem + +When multiple coroutines access shared state concurrently, you need +synchronization. Traditional mutexes block threads, wasting resources. +Strands provide an alternative: guarantee that only one coroutine runs +at a time without blocking. + +[source,cpp] +---- +// Without synchronization: data race +int counter = 0; + +task increment() +{ + ++counter; // UNSAFE: concurrent access + co_return; +} + +// Running on a thread pool with 4 threads +run_async(pool.get_executor())(when_all( + increment(), + increment(), + increment() +)); +---- + +== What is a Strand? + +A strand wraps an executor and ensures that coroutines dispatched through +it never run concurrently. At most one coroutine executes within the strand +at any given time: + +[source,cpp] +---- +#include + +thread_pool pool(4); +strand s(pool.get_executor()); + +// These coroutines will never run concurrently +s.post(coro1); +s.post(coro2); +s.post(coro3); +// They may run on different threads, but one at a time +---- + +== Creating a Strand + +Construct a strand by wrapping an existing executor: + +[source,cpp] +---- +thread_pool pool(4); + +// Create a strand from the pool's executor +strand s(pool.get_executor()); + +// Or use the deduction guide +strand s2{pool.get_executor()}; +---- + +The strand type is templated on the inner executor: + +[source,cpp] +---- +strand s(pool.get_executor()); +---- + +== Strand as Executor + +Strands satisfy the `Executor` concept and can be used anywhere an executor +is expected: + +[source,cpp] +---- +strand s(pool.get_executor()); + +// Use strand with run_async +run_async(s)(my_task()); + +// Use strand with run_on +co_await run_on(s, other_task()); +---- + +== Post vs Dispatch + +Strands provide two methods for submitting coroutines: + +[cols="1,3"] +|=== +| Method | Behavior + +| `post(h)` +| Always queues the coroutine, guaranteeing FIFO ordering + +| `dispatch(h)` +| If already in the strand, resumes immediately; otherwise queues +|=== + +=== FIFO Ordering with post + +Use `post` when ordering matters: + +[source,cpp] +---- +s.post(first); +s.post(second); +s.post(third); +// Execution order: first, second, third (guaranteed) +---- + +=== Inline Execution with dispatch + +Use `dispatch` for performance when ordering doesn't matter: + +[source,cpp] +---- +// If we're already in the strand, dispatch resumes inline +s.dispatch(continuation); // May run immediately +---- + +Dispatch provides symmetric transfer when the caller is already in the +strand's execution context, avoiding unnecessary queuing. + +== Protecting Shared State + +Use a strand to serialize access to shared data: + +[source,cpp] +---- +class counter +{ + strand strand_; + int value_ = 0; + +public: + explicit counter(thread_pool& pool) + : strand_(pool.get_executor()) + { + } + + // Increment must run on the strand + task increment() + { + co_await run_on(strand_, [this]() -> task { + ++value_; // Safe: only one coroutine at a time + co_return; + }()); + } + + // Read also runs on the strand + task get() + { + co_return co_await run_on(strand_, [this]() -> task { + co_return value_; + }()); + } +}; +---- + +== Strand Identity + +Strands are lightweight handles. Copies share the same serialization state: + +[source,cpp] +---- +strand s1(pool.get_executor()); +strand s2 = s1; // Same strand, same serialization + +s1.post(coro1); +s2.post(coro2); +// coro1 and coro2 are serialized with respect to each other +---- + +Compare strands to check if they serialize: + +[source,cpp] +---- +if (s1 == s2) +{ + // Same strand — coroutines will be serialized +} +---- + +== running_in_this_thread + +Check if the current thread is executing within a strand: + +[source,cpp] +---- +strand s(pool.get_executor()); + +void callback() +{ + if (s.running_in_this_thread()) + { + // We're in the strand — safe to access protected data + } + else + { + // Not in strand — need to post/dispatch + } +} +---- + +== Implementation Notes + +Capy's strand uses a fixed pool of 211 implementation objects. New strands +hash to select an impl from the pool. Strands that hash to the same index +share serialization: + +* This is harmless — just extra serialization +* Rare with 211 buckets +* No allocation for strand creation + +This design trades minimal extra serialization for zero per-strand allocation. + +== When NOT to Use Strands + +Use strands when: + +* Coroutines share mutable state +* You want to avoid blocking threads +* Operations are I/O-bound (not CPU-intensive) + +Do NOT use strands when: + +* You need fine-grained locking (use `async_mutex` instead) +* Operations are CPU-intensive — one long operation blocks others +* You need cross-context synchronization — strands are per-executor + +== Summary + +[cols="1,3"] +|=== +| Feature | Description + +| `strand` +| Wraps executor `Ex` with serialization + +| `post(h)` +| Always queue (strict FIFO) + +| `dispatch(h)` +| Inline if in strand, else queue + +| `running_in_this_thread()` +| Check if caller is in the strand + +| Copies +| Share serialization state +|=== + +== Next Steps + +* xref:../coroutines/affinity.adoc[Executor Affinity] — How `run_on` changes executors +* xref:thread-pool.adoc[Thread Pool] — The underlying executor diff --git a/doc/modules/ROOT/pages/execution/thread-pool.adoc b/doc/modules/ROOT/pages/execution/thread-pool.adoc index 6cba5ae5..0fb75944 100644 --- a/doc/modules/ROOT/pages/execution/thread-pool.adoc +++ b/doc/modules/ROOT/pages/execution/thread-pool.adoc @@ -295,3 +295,4 @@ void process_batch() * xref:contexts.adoc[Execution Contexts] — Service management details * xref:executors.adoc[Executors] — Executor concepts in depth +* xref:strand.adoc[Strands] — Serializing coroutine execution diff --git a/doc/modules/ROOT/pages/index.adoc b/doc/modules/ROOT/pages/index.adoc index 50e7d4b8..7fa7d2c8 100644 --- a/doc/modules/ROOT/pages/index.adoc +++ b/doc/modules/ROOT/pages/index.adoc @@ -10,8 +10,8 @@ = Boost.Capy Boost.Capy is a lightweight C++20 coroutine framework that provides lazy tasks -with automatic executor affinity propagation, along with buffer management, -compression, and cryptographic utilities. +with automatic executor affinity propagation, along with buffer management +and compression utilities. == What This Library Does @@ -23,12 +23,14 @@ Capy provides: * **Lazy tasks** that do not start until awaited or explicitly launched * **Automatic affinity propagation** through coroutine call chains +* **Concurrent composition** via `when_all` for parallel task execution * **Zero-overhead dispatcher protocol** for custom awaitables * **Frame allocation recycling** to minimize allocation overhead * **Thread pool** execution context with service management +* **Strand** serialization for concurrent executor access +* **Async mutex** for coroutine-friendly synchronization * **Buffer types** for efficient memory handling (`const_buffer`, `mutable_buffer`, sequences) * **Compression** support (Brotli and ZLib) -* **BCrypt password hashing** for secure credential storage == What This Library Does Not Do @@ -102,4 +104,5 @@ int main() * xref:quick-start.adoc[Quick Start] — Get a working program in 5 minutes * xref:coroutines/tasks.adoc[Tasks] — Understand lazy coroutines +* xref:coroutines/when-all.adoc[Concurrent Composition] — Run tasks in parallel with `when_all` * xref:execution/contexts.adoc[Execution Contexts] — Thread pools and services diff --git a/doc/modules/ROOT/pages/synchronization/async-mutex.adoc b/doc/modules/ROOT/pages/synchronization/async-mutex.adoc new file mode 100644 index 00000000..797afbd1 --- /dev/null +++ b/doc/modules/ROOT/pages/synchronization/async-mutex.adoc @@ -0,0 +1,284 @@ +// +// Copyright (c) 2025 Vinnie Falco (vinnie.falco@gmail.com) +// +// Distributed under the Boost Software License, Version 1.0. (See accompanying +// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) +// +// Official repository: https://github.com/cppalliance/capy +// + += Async Mutex + +This page explains how to use `async_mutex` for coroutine-friendly mutual exclusion. + +NOTE: Code snippets assume `using namespace boost::capy;` is in effect. + +== The Problem + +Standard mutexes block the calling thread. In a coroutine context, this wastes +resources—a blocked thread could be running other coroutines. What you need is +a mutex that _suspends_ the coroutine instead of blocking the thread. + +[source,cpp] +---- +// BAD: Blocks the thread +std::mutex mtx; + +task bad_example() +{ + std::lock_guard lock(mtx); // Thread blocked while waiting + // ... critical section ... + co_return; +} + +// GOOD: Suspends the coroutine +async_mutex mtx; + +task good_example() +{ + co_await mtx.lock(); // Coroutine suspends, thread is free + // ... critical section ... + mtx.unlock(); + co_return; +} +---- + +== What is async_mutex? + +An `async_mutex` provides mutual exclusion for coroutines. When a coroutine +attempts to acquire a locked mutex, it suspends and joins a wait queue. When +the holder unlocks, the next waiter is resumed. + +[source,cpp] +---- +#include + +async_mutex mtx; + +task protected_operation() +{ + co_await mtx.lock(); + // Only one coroutine executes this section at a time + do_work(); + mtx.unlock(); +} +---- + +== Zero Allocation + +The wait queue uses intrusive linking—no heap allocation occurs when waiting. +The queue node is stored in the awaiter, which lives on the coroutine frame: + +---- +Coroutine Frame: [...state...] [awaiter with queue node] +---- + +This makes `async_mutex` suitable for high-frequency locking scenarios. + +== Basic Usage + +=== Manual Lock/Unlock + +[source,cpp] +---- +async_mutex mtx; + +task example() +{ + co_await mtx.lock(); + // Critical section + mtx.unlock(); +} +---- + +WARNING: Forgetting to call `unlock()` deadlocks all waiters. + +=== RAII with scoped_lock + +Use `scoped_lock()` for automatic unlocking: + +[source,cpp] +---- +async_mutex mtx; + +task example() +{ + auto guard = co_await mtx.scoped_lock(); + // Critical section + // Guard unlocks automatically on scope exit +} +---- + +The `lock_guard` is move-only and releases the lock when destroyed. + +== lock_guard + +The `lock_guard` class provides RAII semantics: + +[source,cpp] +---- +async_mutex::lock_guard guard = co_await mtx.scoped_lock(); + +// Move to extend lifetime +async_mutex::lock_guard g2 = std::move(guard); + +// Guard unlocks in destructor +---- + +A moved-from guard is empty and does not unlock. + +== Query Lock State + +Check if the mutex is currently held: + +[source,cpp] +---- +if (mtx.is_locked()) +{ + // Someone holds the lock +} +---- + +This is informational only—the state may change before you act on it. + +== Thread Safety + +IMPORTANT: `async_mutex` is NOT thread-safe. It is designed for single-threaded +use where multiple coroutines may contend for a resource. + +For multi-threaded scenarios, combine with a strand: + +[source,cpp] +---- +// All access through the same strand +strand s(pool.get_executor()); +async_mutex mtx; + +task multi_threaded_safe() +{ + co_await run_on(s, [&]() -> task { + auto guard = co_await mtx.scoped_lock(); + // Now safe: strand serializes, mutex excludes + co_return; + }()); +} +---- + +== Example: Protecting Shared State + +[source,cpp] +---- +class shared_counter +{ + async_mutex mtx_; + int value_ = 0; + +public: + task increment() + { + auto guard = co_await mtx_.scoped_lock(); + ++value_; + } + + task get() + { + auto guard = co_await mtx_.scoped_lock(); + co_return value_; + } +}; +---- + +== Example: Serializing I/O + +[source,cpp] +---- +class serial_writer +{ + async_mutex mtx_; + file& file_; + +public: + explicit serial_writer(file& f) : file_(f) {} + + task write(std::string_view data) + { + auto guard = co_await mtx_.scoped_lock(); + // Only one write at a time + co_await file_.async_write(data); + } +}; +---- + +== async_mutex vs Strand + +Both provide serialization, but differ in scope: + +[cols="1,2,2"] +|=== +| Feature | async_mutex | Strand + +| Scope +| Single resource +| All operations through the strand + +| Overhead +| Per-lock wait +| Per-operation dispatch + +| Use case +| Fine-grained locking +| Coarse-grained serialization + +| Thread safety +| Single-threaded only +| Multi-threaded safe +|=== + +Use `async_mutex` for protecting specific resources. Use strands for broader +serialization of all operations. + +== When NOT to Use async_mutex + +Use `async_mutex` when: + +* You need fine-grained mutual exclusion +* Lock contention is expected +* Critical sections are short + +Do NOT use `async_mutex` when: + +* Operations are multi-threaded — combine with a strand +* Critical sections are long — consider restructuring +* You need condition variable semantics — not yet available +* A strand provides sufficient serialization — simpler is better + +== Summary + +[cols="1,3"] +|=== +| Feature | Description + +| `async_mutex` +| Non-blocking mutex for coroutines + +| `lock()` +| Awaitable that acquires the mutex + +| `unlock()` +| Releases the mutex + +| `scoped_lock()` +| Returns `lock_guard` for RAII + +| `is_locked()` +| Query current state + +| `lock_guard` +| RAII wrapper for automatic unlock +|=== + +== Next Steps + +* xref:../execution/strand.adoc[Strands] — Coarse-grained serialization +* xref:../coroutines/when-all.adoc[Concurrent Composition] — Running tasks in parallel +* xref:../execution/thread-pool.adoc[Thread Pool] — Multi-threaded execution diff --git a/doc/modules/ROOT/pages/utilities/file-io.adoc b/doc/modules/ROOT/pages/utilities/file-io.adoc index d84d85d5..5e916c60 100644 --- a/doc/modules/ROOT/pages/utilities/file-io.adoc +++ b/doc/modules/ROOT/pages/utilities/file-io.adoc @@ -9,220 +9,216 @@ = File I/O -This page documents the file and path utilities in Capy. +This page documents the portable file operations in Capy. NOTE: Code snippets assume `using namespace boost::capy;` is in effect. == file -A platform-independent file handle: +A platform-independent file handle for reading and writing: [source,cpp] ---- +#include + file f("data.txt", file_mode::read); // Read data std::vector buf(1024); std::size_t n = f.read(buf.data(), buf.size()); -// Write data -f.write(data.data(), data.size()); - -// Query and seek +// Query file info std::uint64_t sz = f.size(); std::uint64_t pos = f.pos(); + +// Seek to position f.seek(100); ---- +=== Construction + +[source,cpp] +---- +// Open on construction +file f("data.txt", file_mode::read); + +// Default construct, then open +file f2; +f2.open("data.txt", file_mode::write); +---- + +Files automatically close when destroyed. + === File Modes -[cols="1,3"] +[cols="1,2,2"] |=== -| Mode | Description +| Mode | Access | Behavior | `file_mode::read` -| Open for reading (must exist) +| Read-only +| Must exist, random access + +| `file_mode::scan` +| Read-only +| Must exist, sequential access | `file_mode::write` -| Create or truncate for writing +| Read/Write +| Create or truncate, random access + +| `file_mode::write_new` +| Read/Write +| Must not exist, random access + +| `file_mode::write_existing` +| Read/Write +| Must exist, random access | `file_mode::append` -| Open for appending (create if needed) +| Write-only +| Create or truncate, sequential -| `file_mode::read_write` -| Open for both reading and writing +| `file_mode::append_existing` +| Write-only +| Must exist, sequential |=== -=== Error Handling - -Two error handling styles are available: +=== Reading [source,cpp] ---- -// Exception style -try { - file f("data.txt", file_mode::read); - f.read(buf, n); -} catch (system_error const& e) { - // Handle error -} - -// Error code style -system::error_code ec; -f.open("data.txt", file_mode::read, ec); -if (ec) { - // Handle error -} +std::size_t n = f.read(buffer, size); +// Returns bytes read (0 on EOF) ---- -=== Platform Notes +Read operations advance the file position. -The file class uses the native API on each platform: - -* **Windows**: Win32 API (`CreateFile`, `ReadFile`, etc.) -* **POSIX**: POSIX API (`open`, `read`, etc.) -* **Fallback**: Standard C library (`fopen`, `fread`, etc.) - -== path - -An owning, mutable path string with UTF-8 encoding: +=== Writing [source,cpp] ---- -path p("C:/Users/data.txt"); - -// Decomposition -path_view dir = p.parent_path(); // "C:/Users" -path_view name = p.filename(); // "data.txt" -path_view stem = p.stem(); // "data" -path_view ext = p.extension(); // ".txt" - -// Modification -p /= "subdir"; // Append with separator -p.replace_extension(".json"); - -// Native conversion (Windows) -std::wstring native = p.native_wstring(); +std::size_t n = f.write(data, size); +// Returns bytes written ---- -=== Internal Format +Write operations advance the file position. -Paths use forward slashes internally, regardless of platform: +=== Seeking [source,cpp] ---- -path p("C:\\Users\\data.txt"); // Input with backslashes -std::cout << p.string(); // "C:/Users/data.txt" +f.seek(offset); // Seek to absolute position +std::uint64_t p = f.pos(); // Get current position ---- -This enables cross-platform serialization. Native format conversion happens -at API boundaries. +Seeking is available in random-access modes (`read`, `write`, `write_new`, +`write_existing`). -=== Validation - -Paths are validated at construction time: +=== File Information [source,cpp] ---- -// Throws system_error on invalid path -path p("\0invalid"); // Embedded null - -// Non-throwing alternative -auto result = try_parse_path(input); -if (result) - use(*result); -else - handle_error(result.error()); +std::uint64_t sz = f.size(); // Total file size +bool open = f.is_open(); // Check if open ---- -=== Decomposition Reference +=== Native Handle -[cols="1,2,2"] -|=== -| Method | Example Input | Result +Access the platform-specific handle for advanced operations: -| `root_name()` -| `"C:/foo/bar"` -| `"C:"` +[source,cpp] +---- +auto handle = f.native_handle(); -| `root_directory()` -| `"C:/foo/bar"` -| `"/"` +// Set a handle (closes current file if open) +f.native_handle(some_handle); +---- -| `root_path()` -| `"C:/foo/bar"` -| `"C:/"` +The handle type varies by platform: -| `relative_path()` -| `"C:/foo/bar"` -| `"foo/bar"` +* **Windows:** `HANDLE` +* **POSIX:** `int` (file descriptor) +* **Fallback:** `FILE*` -| `parent_path()` -| `"C:/foo/bar"` -| `"C:/foo"` +== Error Handling -| `filename()` -| `"C:/foo/bar.txt"` -| `"bar.txt"` +Two error handling styles are available: -| `stem()` -| `"C:/foo/bar.txt"` -| `"bar"` +=== Exception Style -| `extension()` -| `"C:/foo/bar.txt"` -| `".txt"` -|=== +[source,cpp] +---- +try { + file f("data.txt", file_mode::read); + f.read(buf, n); +} catch (system::system_error const& e) { + std::cerr << "Error: " << e.what() << "\n"; +} +---- -== path_view +Methods throw `system::system_error` on failure. -A non-owning reference to a valid path string: +=== Error Code Style [source,cpp] ---- -void process(path_view p) +system::error_code ec; +f.open("data.txt", file_mode::read, ec); +if (ec) { - // Decomposition works the same - path_view name = p.filename(); + std::cerr << "Error: " << ec.message() << "\n"; + return; +} - // Convert to owning path if needed - path owned(p); +std::size_t n = f.read(buf, size, ec); +if (ec) +{ + // Handle error } ---- -Path views are validated at construction, just like paths. All decomposition -methods return path_view pointing into the original storage. - -== Path Generation +Error code overloads never throw. -[source,cpp] ----- -// Normalize: remove "." and "..", collapse separators -path normal = p.lexically_normal(); +== Platform Implementation -// Relative path from base to target -path rel = target.lexically_relative(base); +The `file` class automatically selects the best implementation: -// Same as relative, but returns target if not possible -path prox = target.lexically_proximate(base); ----- +* **Windows:** Uses Win32 API (`CreateFile`, `ReadFile`, `WriteFile`) +* **POSIX:** Uses POSIX API (`open`, `read`, `write`) +* **Fallback:** Uses standard C library (`fopen`, `fread`, `fwrite`) -== Iteration +All implementations provide the same interface. -Iterate over path components: +== Example: Copy File [source,cpp] ---- -path p("C:/foo/bar"); -for (path_view component : p) +void copy_file(char const* src, char const* dst) { - // "C:", "/", "foo", "bar" + file in(src, file_mode::read); + file out(dst, file_mode::write); + + char buf[4096]; + while (true) + { + std::size_t n = in.read(buf, sizeof(buf)); + if (n == 0) + break; + out.write(buf, n); + } } +---- -// Or iterate as string_view -for (std::string_view segment : p.segments()) +== Example: Append to Log + +[source,cpp] +---- +void log_message(char const* path, std::string_view msg) { - // Same components as string_view + file f(path, file_mode::append); + f.write(msg.data(), msg.size()); + f.write("\n", 1); } ---- @@ -236,19 +232,17 @@ for (std::string_view segment : p.segments()) | Platform-independent file handle | `file_mode` -| File open mode enumeration - -| `path` -| Owning UTF-8 path string +| Open mode enumeration (read, write, append, etc.) -| `path_view` -| Non-owning path reference +| Exception methods +| Throw on error -| `try_parse_path` -| Non-throwing path parsing +| Error code methods +| Return error via out parameter |=== == Next Steps +* xref:containers.adoc[Containers] — Type-erased containers * xref:../compression/brotli.adoc[Brotli] — High-ratio compression * xref:../compression/zlib.adoc[ZLib] — DEFLATE/gzip compression From 798ce5ffa13a6bfd7b66dc51d7d742fc67b7298d Mon Sep 17 00:00:00 2001 From: Vinnie Falco Date: Sun, 18 Jan 2026 12:38:47 -0800 Subject: [PATCH 2/2] Remove async_op --- include/boost/capy.hpp | 1 - include/boost/capy/buffers/any_stream.hpp | 90 --- include/boost/capy/ex/async_op.hpp | 404 ------------- include/boost/capy/ex/run_async.hpp | 2 +- test/unit/ex/async_op.cpp | 11 - test/unit/task.cpp | 679 +--------------------- 6 files changed, 2 insertions(+), 1185 deletions(-) delete mode 100644 include/boost/capy/ex/async_op.hpp delete mode 100644 test/unit/ex/async_op.cpp diff --git a/include/boost/capy.hpp b/include/boost/capy.hpp index c351e82d..49775deb 100644 --- a/include/boost/capy.hpp +++ b/include/boost/capy.hpp @@ -12,7 +12,6 @@ #include #include -#include #include #include #include diff --git a/include/boost/capy/buffers/any_stream.hpp b/include/boost/capy/buffers/any_stream.hpp index 3e3b2afa..a71c543c 100644 --- a/include/boost/capy/buffers/any_stream.hpp +++ b/include/boost/capy/buffers/any_stream.hpp @@ -12,94 +12,4 @@ #include -#if 0 - -#include -#include -#include -#include -#include - -namespace boost { -namespace capy { - -/** Result of an asynchronous I/O operation. -*/ -struct io_result -{ - /** The error code, if any occurred. - */ - system::error_code ec; - - /** Number of bytes transferred. - */ - std::size_t bytes_transferred; -}; - -/** Asynchronous result type for I/O operations. -*/ -using async_io_result = capy::async_op; - -class any_stream -{ -public: - struct impl - { - virtual ~impl() = default; - - virtual auto read_some( - mutable_buffer) -> - async_io_result = 0; - - virtual auto write_some( - const_buffer) -> - async_io_result = 0; - }; - - /** Constructor - @param p A pointer to the stream implementation. - */ - any_stream( - std::shared_ptr p) noexcept - : impl_(std::move(p)) - { - } - - /** Read some data into the buffer. - - This coroutine reads some data into the buffer - and returns the number of bytes read. - - @param b The buffer to read into. - @return The asynchronous result. - */ - auto read_some( - mutable_buffer b) -> - async_io_result - { - return impl_->read_some(b); - } - - /** Write some data from the buffer. - This coroutine writes some data from the buffer - and returns the number of bytes written. - @param b The buffer to write from. - @return The asynchronous result. - */ - auto write_some( - const_buffer b) -> - async_io_result - { - return impl_->write_some(b); - } - -private: - std::shared_ptr impl_; -}; - -} // capy -} // boost - -#endif - #endif diff --git a/include/boost/capy/ex/async_op.hpp b/include/boost/capy/ex/async_op.hpp deleted file mode 100644 index a1667503..00000000 --- a/include/boost/capy/ex/async_op.hpp +++ /dev/null @@ -1,404 +0,0 @@ -// -// Copyright (c) 2025 Vinnie Falco (vinnie dot falco at gmail dot com) -// -// Distributed under the Boost Software License, Version 1.0. (See accompanying -// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) -// -// Official repository: https://github.com/cppalliance/capy -// - -#ifndef BOOST_CAPY_ASYNC_OP_HPP -#define BOOST_CAPY_ASYNC_OP_HPP - -#include - -#include -#include -#include -#include -#include -#include -#include - -namespace boost { -namespace capy { -namespace detail { - -template -struct async_op_impl_base -{ - virtual ~async_op_impl_base() = default; - virtual void start(std::function on_done) = 0; - virtual T get_result() = 0; -}; - -struct async_op_void_impl_base -{ - virtual ~async_op_void_impl_base() = default; - virtual void start(std::function on_done) = 0; - virtual void get_result() = 0; -}; - -template -struct async_op_impl : async_op_impl_base -{ - DeferredOp op_; - std::variant result_{}; - - explicit - async_op_impl(DeferredOp&& op) - : op_(std::forward(op)) - { - } - - void - start(std::function on_done) override - { - std::move(op_)( - [this, on_done = std::move(on_done)](auto&&... args) mutable - { - result_.template emplace<1>(T{std::forward(args)...}); - on_done(); - }); - } - - T - get_result() override - { - if (result_.index() == 0 && std::get<0>(result_)) - std::rethrow_exception(std::get<0>(result_)); - return std::move(std::get<1>(result_)); - } -}; - -template -struct async_op_void_impl : async_op_void_impl_base -{ - DeferredOp op_; - std::exception_ptr exception_{}; - - explicit - async_op_void_impl(DeferredOp&& op) - : op_(std::forward(op)) - { - } - - void - start(std::function on_done) override - { - std::move(op_)(std::move(on_done)); - } - - void - get_result() override - { - if (exception_) - std::rethrow_exception(exception_); - } -}; - -} // detail - -//----------------------------------------------------------------------------- - -/** An awaitable wrapper for callback-based asynchronous operations. - - This class template provides a bridge between traditional - callback-based asynchronous APIs and C++20 coroutines. It - wraps a deferred operation and makes it awaitable, allowing - seamless integration with coroutine-based code. - - @par Thread Safety - Distinct objects may be accessed concurrently. Shared objects - require external synchronization. - - @par Example - @code - // Wrap a callback-based timer - async_op async_sleep(std::chrono::milliseconds ms) - { - return make_async_op( - [ms](auto&& handler) { - // Start timer, call handler when done - start_timer(ms, std::move(handler)); - }); - } - - task example() - { - co_await async_sleep(std::chrono::milliseconds(100)); - } - @endcode - - @tparam T The type of value produced by the asynchronous operation. - - @see make_async_op, task -*/ -template -class async_op -{ - std::unique_ptr> impl_; - -// Workaround: clang fails to match friend function template declarations -#if defined(__clang__) && (__clang_major__ == 16 || \ - (defined(__apple_build_version__) && __apple_build_version__ >= 15000000)) -public: -#endif - explicit - async_op(std::unique_ptr> p) - : impl_(std::move(p)) - { - } -#if defined(__clang__) && (__clang_major__ == 16 || \ - (defined(__apple_build_version__) && __apple_build_version__ >= 15000000)) -private: -#endif - - template - requires (!std::is_void_v) - friend async_op - make_async_op(DeferredOp&& op); - -public: - /** Return whether the result is ready. - - @return Always returns false; the operation must be started. - */ - bool - await_ready() const noexcept - { - return false; - } - - /** Suspend the caller and start the operation. - - Initiates the asynchronous operation and arranges for - the caller to be resumed when it completes. - - @param h The coroutine handle of the awaiting coroutine. - */ - void - await_suspend(std::coroutine_handle<> h) - { - impl_->start([h]{ h.resume(); }); - } - - /** Suspend the caller with scheduler affinity (IoAwaitable protocol). - - Initiates the asynchronous operation and arranges for - the caller to be resumed through the executor when - it completes, maintaining scheduler affinity. - - @param h The coroutine handle of the awaiting coroutine. - @param ex The executor to resume through. - @param token The stop token for cancellation (currently unused). - */ - template - void - await_suspend(std::coroutine_handle<> h, Ex const& ex, std::stop_token = {}) - { - impl_->start([h, &ex]{ ex.dispatch(h).resume(); }); - } - - /** Return the result after completion. - - @return The value produced by the asynchronous operation. - - @throws Any exception that occurred during the operation. - */ - [[nodiscard]] - T - await_resume() - { - return impl_->get_result(); - } -}; - -//----------------------------------------------------------------------------- - -/** An awaitable wrapper for callback-based operations with no result. - - This specialization of async_op is used for asynchronous - operations that signal completion but do not produce a value, - such as timers, write operations, or connection establishment. - - @par Thread Safety - Distinct objects may be accessed concurrently. Shared objects - require external synchronization. - - @par Example - @code - // Wrap a callback-based timer - async_op async_sleep(std::chrono::milliseconds ms) - { - return make_async_op( - [ms](auto handler) { - start_timer(ms, [h = std::move(handler)]{ h(); }); - }); - } - - task example() - { - co_await async_sleep(std::chrono::milliseconds(100)); - } - @endcode - - @see async_op, make_async_op -*/ -template<> -class async_op -{ - std::unique_ptr impl_; - -// Workaround: clang fails to match friend function template declarations -#if defined(__clang__) && (__clang_major__ == 16 || \ - (defined(__apple_build_version__) && __apple_build_version__ >= 15000000)) -public: -#endif - explicit - async_op(std::unique_ptr p) - : impl_(std::move(p)) - { - } -#if defined(__clang__) && (__clang_major__ == 16 || \ - (defined(__apple_build_version__) && __apple_build_version__ >= 15000000)) -private: -#endif - - template - requires std::is_void_v - friend async_op - make_async_op(DeferredOp&& op); - -public: - /** Return whether the result is ready. - - @return Always returns false; the operation must be started. - */ - bool - await_ready() const noexcept - { - return false; - } - - /** Suspend the caller and start the operation. - - Initiates the asynchronous operation and arranges for - the caller to be resumed when it completes. - - @param h The coroutine handle of the awaiting coroutine. - */ - void - await_suspend(std::coroutine_handle<> h) - { - impl_->start([h]{ h.resume(); }); - } - - /** Suspend the caller with scheduler affinity (IoAwaitable protocol). - - Initiates the asynchronous operation and arranges for - the caller to be resumed through the executor when - it completes, maintaining scheduler affinity. - - @param h The coroutine handle of the awaiting coroutine. - @param ex The executor to resume through. - @param token The stop token for cancellation (currently unused). - */ - template - void - await_suspend(std::coroutine_handle<> h, Ex const& ex, std::stop_token = {}) - { - impl_->start([h, &ex]{ ex.dispatch(h).resume(); }); - } - - /** Complete the await and check for exceptions. - - @throws Any exception that occurred during the operation. - */ - void - await_resume() - { - impl_->get_result(); - } -}; - -//----------------------------------------------------------------------------- - -/** Return an async_op from a deferred operation. - - This factory function creates an awaitable async_op that - wraps a callback-based asynchronous operation. - - @par Example - @code - async_op async_read() - { - return make_async_op( - [](auto handler) { - // Simulate async read - handler("Hello, World!"); - }); - } - @endcode - - @tparam T The result type of the asynchronous operation. - - @param op A callable that accepts a completion handler. When invoked, - it should initiate the asynchronous operation and call the - handler with the result when complete. - - @return An async_op that can be awaited in a coroutine. - - @see async_op -*/ -template - requires (!std::is_void_v) -[[nodiscard]] -async_op -make_async_op(DeferredOp&& op) -{ - using impl_type = detail::async_op_impl>; - return async_op( - std::make_unique(std::forward(op))); -} - -/** Return an async_op from a deferred operation. - - This overload is used for operations that signal completion - without producing a value. - - @par Example - @code - async_op async_wait(int milliseconds) - { - return make_async_op( - [milliseconds](auto on_done) { - // Start timer, call on_done() when elapsed - start_timer(milliseconds, std::move(on_done)); - }); - } - @endcode - - @param op A callable that accepts a completion handler taking no - arguments. When invoked, it should initiate the operation - and call the handler when complete. - - @return An async_op that can be awaited in a coroutine. - - @see async_op -*/ -template - requires std::is_void_v -[[nodiscard]] -async_op -make_async_op(DeferredOp&& op) -{ - using impl_type = detail::async_op_void_impl>; - return async_op( - std::make_unique(std::forward(op))); -} - -} // capy -} // boost - -#endif diff --git a/include/boost/capy/ex/run_async.hpp b/include/boost/capy/ex/run_async.hpp index fe0ddb99..b84fdc54 100644 --- a/include/boost/capy/ex/run_async.hpp +++ b/include/boost/capy/ex/run_async.hpp @@ -343,7 +343,7 @@ class [[nodiscard]] run_async_wrapper // Setup task's continuation to return to trampoline task_h.promise().continuation_ = tr_.h_; task_h.promise().caller_ex_ = ex_; - task_h.promise().ex_ = ex_; // Used by awaited async_ops + task_h.promise().ex_ = ex_; task_h.promise().set_stop_token(st_); // Resume task through executor diff --git a/test/unit/ex/async_op.cpp b/test/unit/ex/async_op.cpp deleted file mode 100644 index e466bd9e..00000000 --- a/test/unit/ex/async_op.cpp +++ /dev/null @@ -1,11 +0,0 @@ -// -// Copyright (c) 2025 Vinnie Falco (vinnie dot falco at gmail dot com) -// -// Distributed under the Boost Software License, Version 1.0. (See accompanying -// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) -// -// Official repository: https://github.com/cppalliance/capy -// - -// Test that header file is self-contained. -#include diff --git a/test/unit/task.cpp b/test/unit/task.cpp index c9c78e35..9791c08e 100644 --- a/test/unit/task.cpp +++ b/test/unit/task.cpp @@ -10,7 +10,6 @@ // Test that header file is self-contained. #include -#include #include #include @@ -392,79 +391,6 @@ struct task_test } } - static async_op - async_returns_value() - { - return make_async_op( - [](auto cb) { - cb(123); - }); - } - - static async_op - async_with_delayed_completion() - { - return make_async_op( - [](auto cb) { - cb(456); - }); - } - - static task - task_awaits_async_op() - { - int v = co_await async_returns_value(); - co_return v + 1; - } - - static task - task_awaits_multiple_async_ops() - { - int v1 = co_await async_returns_value(); - int v2 = co_await async_with_delayed_completion(); - co_return v1 + v2; - } - - void - testTaskAwaitsAsyncResult() - { - // task awaits single async_op - needs run_async for executor - { - int dispatch_count = 0; - test_executor ex(dispatch_count); - int result = 0; - bool completed = false; - - run_async(ex, - [&](int v) { - result = v; - completed = true; - }, - [](std::exception_ptr) {})(task_awaits_async_op()); - - BOOST_TEST(completed); - BOOST_TEST_EQ(result, 124); - } - - // task awaits multiple async_ops - if (false) { - int dispatch_count = 0; - test_executor ex(dispatch_count); - int result = 0; - bool completed = false; - - run_async(ex, - [&](int v) { - result = v; - completed = true; - }, - [](std::exception_ptr) {})(task_awaits_multiple_async_ops()); - - BOOST_TEST(completed); - BOOST_TEST_EQ(result, 579); - } - } - void testAwaitReady() { @@ -566,176 +492,6 @@ struct task_test h.destroy(); } - static task - void_task_awaits_async_op() - { - int v = co_await async_returns_value(); - (void)v; - co_return; - } - - void - testVoidTaskAwaitsAsyncResult() - { - // Needs run_async since void_task_awaits_async_op awaits an async_op - int dispatch_count = 0; - test_executor ex(dispatch_count); - bool completed = false; - - run_async(ex, - [&]() { completed = true; }, - [](std::exception_ptr) {})(void_task_awaits_async_op()); - - BOOST_TEST(completed); - } - - // Dispatcher tests using run_async - - static async_op - async_op_immediate(int value) - { - return make_async_op( - [value](auto cb) { - cb(value); - }); - } - - static task - task_with_async_for_affinity_test() - { - int v = co_await async_returns_value(); - co_return v + 1; - } - - void - testDispatcherUsedByAwait() - { - // Verify that executor is used when awaiting via run_async - int dispatch_count = 0; - test_executor ex(dispatch_count); - bool completed = false; - int result = 0; - - run_async(ex, - [&](int v) { - result = v; - completed = true; - }, - [](std::exception_ptr) {})(task_with_async_for_affinity_test()); - - BOOST_TEST(completed); - BOOST_TEST_EQ(result, 124); - // Work should have been dispatched - BOOST_TEST_GE(dispatch_count, 1); - } - - static task - void_task_with_async_for_affinity_test() - { - auto v = co_await async_returns_value(); - (void)v; - co_return; - } - - void - testVoidTaskDispatcherUsedByAwait() - { - // Verify that executor is used for void tasks - int dispatch_count = 0; - test_executor ex(dispatch_count); - bool completed = false; - - run_async(ex, - [&]() { completed = true; }, - [](std::exception_ptr) {})(void_task_with_async_for_affinity_test()); - - BOOST_TEST(completed); - // Work should have been dispatched - BOOST_TEST_GE(dispatch_count, 1); - } - - // Affinity propagation tests - - static task - inner_task_c() - { - co_return co_await async_returns_value(); - } - - static task - middle_task_b() - { - int v = co_await inner_task_c(); - co_return v + 1; - } - - static task - outer_task_a() - { - int v = co_await middle_task_b(); - co_return v + 1; - } - - void - testAffinityPropagation() - { - // Verify affinity propagates through task chain (ABC problem) - // The executor from run_async should be inherited by nested tasks - int dispatch_count = 0; - test_executor ex(dispatch_count); - bool completed = false; - int result = 0; - - run_async(ex, - [&](int v) { - result = v; - completed = true; - }, - [](std::exception_ptr) {})(outer_task_a()); - - BOOST_TEST(completed); - BOOST_TEST_EQ(result, 125); // 123 + 1 + 1 - // All async completions should dispatch through the executor - BOOST_TEST_GE(dispatch_count, 1); - } - - static task - inner_void_task_c() - { - co_await async_returns_value(); - co_return; - } - - static task - middle_void_task_b() - { - co_await inner_void_task_c(); - co_return; - } - - static task - outer_void_task_a() - { - co_await middle_void_task_b(); - co_return; - } - - void - testAffinityPropagationVoid() - { - // Verify affinity propagates through void task chain - int dispatch_count = 0; - test_executor ex(dispatch_count); - bool completed = false; - - run_async(ex, - [&]() { completed = true; }, - [](std::exception_ptr) {})(outer_void_task_a()); - - BOOST_TEST(completed); - BOOST_TEST_GE(dispatch_count, 1); - } - void testNoDispatcherRunsInline() { @@ -744,130 +500,6 @@ struct task_test BOOST_TEST_EQ(run_task(chained_tasks()), 25); } - // Affinity preservation tests with tracking executor - - void - testInheritedAffinityVerification() - { - // Test that child tasks actually use inherited affinity - // by checking that all resumptions go through the parent's executor - std::vector log; - int dispatch_count = 0; - tracking_executor ex(1, dispatch_count, &log); - - bool completed = false; - int result = 0; - - // Chain: outer -> middle -> inner - auto inner = []() -> task { - co_return co_await async_op_immediate(100); - }; - - auto middle = [inner]() -> task { - int v = co_await inner(); - co_return v + co_await async_op_immediate(10); - }; - - auto outer = [middle]() -> task { - int v = co_await middle(); - co_return v + co_await async_op_immediate(1); - }; - - run_async(ex, - [&](int v) { - result = v; - completed = true; - }, - [](std::exception_ptr) {})(outer()); - - BOOST_TEST(completed); - BOOST_TEST_EQ(result, 111); - // All three async_ops should have resumed through executor 1 - BOOST_TEST_GE(dispatch_count, 3); - for (int id : log) - BOOST_TEST_EQ(id, 1); - } - - void - testAffinityPreservedAcrossMultipleAwaits() - { - // Test that affinity is preserved across multiple co_await expressions - std::vector log; - int dispatch_count = 0; - tracking_executor ex(1, dispatch_count, &log); - - bool completed = false; - int result = 0; - - auto multi_await = []() -> task { - int sum = 0; - sum += co_await async_op_immediate(1); - sum += co_await async_op_immediate(2); - sum += co_await async_op_immediate(3); - sum += co_await async_op_immediate(4); - sum += co_await async_op_immediate(5); - co_return sum; - }; - - run_async(ex, - [&](int v) { - result = v; - completed = true; - }, - [](std::exception_ptr) {})(multi_await()); - - BOOST_TEST(completed); - BOOST_TEST_EQ(result, 15); - // 6 dispatches: 1 from run_async start + 5 from async_ops completing - BOOST_TEST_EQ(dispatch_count, 6); - BOOST_TEST_EQ(log.size(), 6u); - for (int id : log) - BOOST_TEST_EQ(id, 1); - } - - void - testAffinityWithNestedVoidTasks() - { - // Test affinity propagation through void task nesting - std::vector log; - int dispatch_count = 0; - tracking_executor ex(1, dispatch_count, &log); - - std::atomic counter{0}; - bool completed = false; - - auto leaf = [&counter]() -> task { - co_await async_op_immediate(0); - ++counter; - co_return; - }; - - auto branch = [leaf, &counter]() -> task { - co_await leaf(); - co_await async_op_immediate(0); - ++counter; - co_return; - }; - - auto root = [branch, &counter]() -> task { - co_await branch(); - co_await async_op_immediate(0); - ++counter; - co_return; - }; - - run_async(ex, - [&]() { completed = true; }, - [](std::exception_ptr) {})(root()); - - BOOST_TEST(completed); - BOOST_TEST_EQ(counter.load(), 3); - // All async_ops should dispatch through the executor - BOOST_TEST_GE(dispatch_count, 3); - for (int id : log) - BOOST_TEST_EQ(id, 1); - } - void testFinalSuspendUsesDispatcher() { @@ -1034,64 +666,6 @@ struct task_test BOOST_TEST_EQ(result, 20); } - void - testAsyncRunWithAsyncOp() - { - int dispatch_count = 0; - test_executor ex(dispatch_count); - bool completed = false; - int result = 0; - - auto task_with_async = []() -> task { - int v = co_await async_op_immediate(100); - co_return v + 1; - }; - - run_async(ex, - [&](int v) { - result = v; - completed = true; - }, - [](std::exception_ptr) {})(task_with_async()); - - BOOST_TEST(completed); - BOOST_TEST_EQ(result, 101); - BOOST_TEST_GE(dispatch_count, 1); - } - - void - testAsyncRunAffinityPropagation() - { - std::vector log; - int dispatch_count = 0; - tracking_executor ex(1, dispatch_count, &log); - bool completed = false; - int result = 0; - - auto inner = []() -> task { - co_return co_await async_op_immediate(50); - }; - - auto outer = [inner]() -> task { - int v = co_await inner(); - v += co_await async_op_immediate(5); - co_return v; - }; - - run_async(ex, - [&](int v) { - result = v; - completed = true; - }, - [](std::exception_ptr) {})(outer()); - - BOOST_TEST(completed); - BOOST_TEST_EQ(result, 55); - BOOST_TEST_GE(dispatch_count, 2); - for (int id : log) - BOOST_TEST_EQ(id, 1); - } - void testAsyncRunChained() { @@ -1138,40 +712,6 @@ struct task_test BOOST_TEST_EQ(error_msg, "specific error"); } - void - testAsyncRunDeeplyNested() - { - int dispatch_count = 0; - test_executor ex(dispatch_count); - bool completed = false; - int result = 0; - - auto level3 = []() -> task { - co_return co_await async_op_immediate(1); - }; - - auto level2 = [level3]() -> task { - int v = co_await level3(); - co_return v + co_await async_op_immediate(10); - }; - - auto level1 = [level2]() -> task { - int v = co_await level2(); - co_return v + co_await async_op_immediate(100); - }; - - run_async(ex, - [&](int v) { - result = v; - completed = true; - }, - [](std::exception_ptr) {})(level1()); - - BOOST_TEST(completed); - BOOST_TEST_EQ(result, 111); - BOOST_TEST_GE(dispatch_count, 3); - } - void testAsyncRunFireAndForget() { @@ -1326,97 +866,6 @@ struct task_test BOOST_TEST_EQ(id, 1); } - void - testAllocatorRestoredAfterAwait() - { - // Verify that TLS is restored after co_await, - // allowing child tasks created after await to use the correct allocator - int dispatch_count = 0; - test_executor ex(dispatch_count); - bool completed = false; - - int alloc_count = 0; - int dealloc_count = 0; - std::vector alloc_log; - - tracking_frame_allocator alloc{1, &alloc_count, &dealloc_count, &alloc_log}; - - // Create a task that awaits an async_op, then creates a child task - auto child_after_await = []() -> task { - co_return 10; - }; - - auto parent = [child_after_await]() -> task { - // First await an async_op (simulates I/O) - int v1 = co_await async_op_immediate(5); - // After resume, TLS should be restored, so this child - // should use the same allocator - int v2 = co_await child_after_await(); - co_return v1 + v2; - }; - - int result = 0; - run_async(ex, std::stop_token{}, alloc, - [&](int v) { - result = v; - completed = true; - }, - [](std::exception_ptr) {})(parent()); - - BOOST_TEST(completed); - BOOST_TEST_EQ(result, 15); - // At least one allocation should occur - BOOST_TEST_GE(alloc_count, 1); - // All allocations must use our allocator - for(int id : alloc_log) - BOOST_TEST_EQ(id, 1); - } - - void - testAllocatorRestoredAcrossMultipleAwaits() - { - // Verify TLS restoration across multiple sequential awaits - int dispatch_count = 0; - test_executor ex(dispatch_count); - bool completed = false; - - int alloc_count = 0; - int dealloc_count = 0; - std::vector alloc_log; - - tracking_frame_allocator alloc{1, &alloc_count, &dealloc_count, &alloc_log}; - - auto make_child = [](int v) -> task { - co_return v; - }; - - auto parent = [make_child]() -> task { - int sum = 0; - // Each await should restore TLS before the next child creation - sum += co_await async_op_immediate(1); - sum += co_await make_child(10); - sum += co_await async_op_immediate(2); - sum += co_await make_child(20); - sum += co_await async_op_immediate(3); - sum += co_await make_child(30); - co_return sum; - }; - - int result = 0; - run_async(ex, std::stop_token{}, alloc, - [&](int v) { - result = v; - completed = true; - }, - [](std::exception_ptr) {})(parent()); - - BOOST_TEST(completed); - BOOST_TEST_EQ(result, 66); // 1+10+2+20+3+30 - // All child tasks should use the same allocator - for(int id : alloc_log) - BOOST_TEST_EQ(id, 1); - } - void testDeeplyNestedAllocatorPropagation() { @@ -1466,50 +915,6 @@ struct task_test BOOST_TEST_EQ(id, 1); } - void - testAllocatorWithMixedTasksAndAsyncOps() - { - // Verify allocator works correctly with interleaved tasks and async_ops - int dispatch_count = 0; - test_executor ex(dispatch_count); - bool completed = false; - - int alloc_count = 0; - int dealloc_count = 0; - std::vector alloc_log; - - tracking_frame_allocator alloc{1, &alloc_count, &dealloc_count, &alloc_log}; - - auto compute = [](int x) -> task { - co_return x * 2; - }; - - auto complex_task = [compute]() -> task { - int v = 0; - // async_op -> task -> async_op -> task pattern - v += co_await async_op_immediate(1); - v += co_await compute(v); // Creates child task after I/O - v += co_await async_op_immediate(10); - v += co_await compute(v); // Creates another child after I/O - co_return v; - }; - - int result = 0; - run_async(ex, std::stop_token{}, alloc, - [&](int v) { - result = v; - completed = true; - }, - [](std::exception_ptr) {})(complex_task()); - - BOOST_TEST(completed); - // v = 0 + 1 = 1, then v = 1 + 2 = 3, then v = 3 + 10 = 13, then v = 13 + 26 = 39 - BOOST_TEST_EQ(result, 39); - // All allocations should use our allocator - for(int id : alloc_log) - BOOST_TEST_EQ(id, 1); - } - void testDeallocationCount() { @@ -1714,61 +1119,6 @@ struct task_test BOOST_TEST(all_same); } - void - testStopTokenReceivesStopSignal() - { - // This test manually sets up a task to demonstrate stop token propagation. - // We use a queuing executor for precise control over execution ordering. - std::queue pending; - queuing_executor ex(pending); - std::stop_source source; - - bool was_stoppable = false; - std::vector checkpoints; - - auto checkpoint_task = [&]() -> task { - auto token = co_await get_stop_token(); - was_stoppable = token.stop_possible(); - checkpoints.push_back(token.stop_requested()); // Checkpoint 0: before stop - - co_await async_op_immediate(0); // Yields control - - checkpoints.push_back(token.stop_requested()); // Checkpoint 1: after stop - }; - - // Create task and manually configure its promise - auto t = checkpoint_task(); - auto h = t.release(); - h.promise().set_stop_token(source.get_token()); - h.promise().ex_ = ex; - h.promise().caller_ex_ = ex; - h.promise().needs_dispatch_ = false; - - // Start task - runs until async_op suspends, then queues continuation - h.resume(); - - // Verify checkpoint 0 was captured - BOOST_TEST_EQ(checkpoints.size(), 1u); - BOOST_TEST(!checkpoints[0]); // Not stopped yet - - // Signal stop while task is suspended - source.request_stop(); - - // Resume task via queued continuation - BOOST_TEST(!pending.empty()); - pending.front().resume(); - pending.pop(); - - // Verify task saw the stop signal - BOOST_TEST_EQ(checkpoints.size(), 2u); - BOOST_TEST(checkpoints[1]); // Now stopped - - BOOST_TEST(was_stoppable); - - // Clean up - task completed, destroy the handle - h.destroy(); - } - void run() { @@ -1776,7 +1126,6 @@ struct task_test testException(); testTaskAwaitsTask(); testMoveOperations(); - testTaskAwaitsAsyncResult(); testAwaitReady(); // task tests @@ -1785,21 +1134,9 @@ struct task_test testVoidTaskAwaits(); testVoidTaskChain(); testVoidTaskMove(); - testVoidTaskAwaitsAsyncResult(); - - // executor tests (via run_async) - testDispatcherUsedByAwait(); - testVoidTaskDispatcherUsedByAwait(); - - // affinity propagation tests (ABC problem) - testAffinityPropagation(); - testAffinityPropagationVoid(); - testNoDispatcherRunsInline(); // affinity preservation tests - testInheritedAffinityVerification(); - testAffinityPreservedAcrossMultipleAwaits(); - testAffinityWithNestedVoidTasks(); + testNoDispatcherRunsInline(); testFinalSuspendUsesDispatcher(); // run_async() function tests @@ -1808,31 +1145,17 @@ struct task_test testAsyncRunTaskWithException(); testAsyncRunVoidTaskWithException(); testAsyncRunWithNestedAwaits(); - testAsyncRunWithAsyncOp(); - testAsyncRunAffinityPropagation(); testAsyncRunChained(); testAsyncRunErrorHandler(); - testAsyncRunDeeplyNested(); testAsyncRunFireAndForget(); testAsyncRunSingleHandler(); - // Memory allocation tests - skipped: allocator is currently ignored per design - // testAllocatorCapturedOnCreation(); - // testAllocatorUsedByChildTasks(); - // testAllocatorRestoredAfterAwait(); - // testAllocatorRestoredAcrossMultipleAwaits(); - // testDeeplyNestedAllocatorPropagation(); - // testAllocatorWithMixedTasksAndAsyncOps(); - // testDeallocationCount(); - // testFrameAllocationOrder(); - // get_stop_token() tests testGetStopTokenBasic(); testGetStopTokenWithSource(); testGetStopTokenPropagation(); testGetStopTokenInLoop(); testGetStopTokenMultipleCalls(); - testStopTokenReceivesStopSignal(); } };