diff --git a/doc/agent-guide.md b/doc/agent-guide.md new file mode 100644 index 0000000..d3992dd --- /dev/null +++ b/doc/agent-guide.md @@ -0,0 +1,17 @@ +--- +Boost.Corosio specific instructions +--- + +* Research: + +* Introduction + - Requirements: Familiarity with Boost.Capy and coroutines + +* First section is an introduction to TCP/IP networking + - https://archive.org/stream/TCPIPIllustratedVol.1TheProtocols1stEdition/TCP-IP%20Illustrated_djvu.txt + - TCP/IP only (no UDP) + +* Second section is an introduction to concurrent programming + - https://start-concurrent.github.io/full/index.html + - C++20 Coroutines + - Strands: Synchronization without mutexes diff --git a/doc/modules/ROOT/nav.adoc b/doc/modules/ROOT/nav.adoc index 2aae5b8..ce88955 100644 --- a/doc/modules/ROOT/nav.adoc +++ b/doc/modules/ROOT/nav.adoc @@ -5,13 +5,17 @@ ** xref:tutorials/http-client.adoc[HTTP Client] ** xref:tutorials/dns-lookup.adoc[DNS Lookup] * Guide +** xref:guide/tcp-networking.adoc[TCP/IP Networking] +** xref:guide/concurrent-programming.adoc[Concurrent Programming] ** xref:guide/io-context.adoc[I/O Context] ** xref:guide/sockets.adoc[Sockets] +** xref:guide/acceptor.adoc[Acceptors] ** xref:guide/endpoints.adoc[Endpoints] ** xref:guide/composed-operations.adoc[Composed Operations] ** xref:guide/timers.adoc[Timers] ** xref:guide/signals.adoc[Signal Handling] ** xref:guide/resolver.adoc[Name Resolution] +** xref:guide/tcp-server.adoc[TCP Server] ** xref:guide/tls.adoc[TLS Encryption] ** xref:guide/error-handling.adoc[Error Handling] ** xref:guide/buffers.adoc[Buffer Sequences] diff --git a/doc/modules/ROOT/pages/guide/acceptor.adoc b/doc/modules/ROOT/pages/guide/acceptor.adoc new file mode 100644 index 0000000..e6d79c6 --- /dev/null +++ b/doc/modules/ROOT/pages/guide/acceptor.adoc @@ -0,0 +1,319 @@ +// +// Copyright (c) 2025 Vinnie Falco (vinnie.falco@gmail.com) +// +// Distributed under the Boost Software License, Version 1.0. (See accompanying +// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) +// +// Official repository: https://github.com/cppalliance/corosio +// + += Acceptors + +The `acceptor` class listens for incoming TCP connections and accepts them +into socket objects. It's the foundation for building TCP servers. + +NOTE: Code snippets assume: +[source,cpp] +---- +#include +#include +#include + +namespace corosio = boost::corosio; +---- + +== Overview + +An acceptor binds to a local endpoint and waits for clients to connect: + +[source,cpp] +---- +corosio::acceptor acc(ioc); +acc.listen(corosio::endpoint(8080)); // Listen on port 8080 + +corosio::socket peer(ioc); +auto [ec] = co_await acc.accept(peer); + +if (!ec) +{ + // peer is now a connected socket +} +---- + +== Construction + +Acceptors are constructed from an execution context or executor: + +[source,cpp] +---- +// From io_context +corosio::acceptor acc1(ioc); + +// From executor +auto ex = ioc.get_executor(); +corosio::acceptor acc2(ex); +---- + +The acceptor doesn't own system resources until `listen()` is called. + +== Listening + +=== listen() + +The `listen()` method creates a socket, binds to an endpoint, and begins +listening for connections: + +[source,cpp] +---- +acc.listen(corosio::endpoint(8080)); +---- + +This performs three operations: + +1. Creates an IPv4 TCP socket +2. Binds to the specified endpoint +3. Marks the socket as passive (listening) + +Throws `std::system_error` on failure. 
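+
+Since failure is reported by throwing, servers typically guard the call. A
+minimal sketch (the logging and recovery policy are up to the application):
+
+[source,cpp]
+----
+try
+{
+    acc.listen(corosio::endpoint(8080));
+}
+catch (std::system_error const& e)
+{
+    // Typical causes: port already in use, insufficient privileges
+    std::cerr << "listen failed: " << e.what() << "\n";
+}
+----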
+ +=== Parameters + +[source,cpp] +---- +void listen(endpoint ep, int backlog = 128); +---- + +The `backlog` parameter specifies the maximum queue length for pending +connections. When the queue is full, new connection attempts receive +`ECONNREFUSED`. The default of 128 works for most applications. + +=== Binding to All Interfaces + +To accept connections on any network interface: + +[source,cpp] +---- +// Port only - binds to 0.0.0.0 (all IPv4 interfaces) +acc.listen(corosio::endpoint(8080)); +---- + +=== Binding to a Specific Interface + +To accept connections only on a specific interface: + +[source,cpp] +---- +// Localhost only +acc.listen(corosio::endpoint( + boost::urls::ipv4_address::loopback(), 8080)); +---- + +== Accepting Connections + +=== accept() + +The `accept()` operation waits for and accepts an incoming connection: + +[source,cpp] +---- +corosio::socket peer(ioc); +auto [ec] = co_await acc.accept(peer); +---- + +On success, `peer` is initialized with the new connection. Any existing +connection on `peer` is closed first. + +The operation is asynchronous—your coroutine suspends until a connection +arrives or an error occurs. + +=== Errors + +Common accept errors: + +[cols="1,2"] +|=== +| Error | Meaning + +| `operation_canceled` +| Cancelled via `cancel()` or stop token + +| `bad_file_descriptor` +| Acceptor not listening + +| Resource errors +| System limit reached (file descriptors, memory) +|=== + +=== Preconditions + +* The acceptor must be listening (`is_open() == true`) +* The peer socket must be associated with the same execution context + +== Cancellation + +=== cancel() + +Cancel pending accept operations: + +[source,cpp] +---- +acc.cancel(); +---- + +All outstanding `accept()` operations complete with `operation_canceled`. + +=== Stop Token Cancellation + +Accept operations support `std::stop_token` through the affine awaitable +protocol: + +[source,cpp] +---- +// Inside a cancellable task: +auto [ec] = co_await acc.accept(peer); +if (ec == make_error_code(system::errc::operation_canceled)) + std::cout << "Accept cancelled\n"; +---- + +== Closing + +=== close() + +Release acceptor resources: + +[source,cpp] +---- +acc.close(); +---- + +Pending accept operations complete with `operation_canceled`. + +=== is_open() + +Check if the acceptor is listening: + +[source,cpp] +---- +if (acc.is_open()) + // Ready to accept +---- + +== Move Semantics + +Acceptors are move-only: + +[source,cpp] +---- +corosio::acceptor acc1(ioc); +corosio::acceptor acc2 = std::move(acc1); // OK + +corosio::acceptor acc3 = acc2; // Error: deleted copy constructor +---- + +Move assignment closes any existing acceptor: + +[source,cpp] +---- +acc1 = std::move(acc2); // Closes acc1's socket if open, then moves acc2 +---- + +IMPORTANT: Source and destination must share the same execution context. + +== Thread Safety + +[cols="1,2"] +|=== +| Operation | Thread Safety + +| Distinct acceptors +| Safe from different threads + +| Same acceptor +| NOT safe for concurrent operations +|=== + +Don't start multiple `accept()` operations concurrently on the same acceptor. 
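+
+For example, spawning two accept coroutines against the same acceptor is not
+supported; give each acceptor a single accept loop. A sketch (`accept_once`
+stands for any coroutine that calls `accept()` on `acc`):
+
+[source,cpp]
+----
+// WRONG: two coroutines accepting on the same acceptor at once
+capy::run_async(ex)(accept_once(acc));
+capy::run_async(ex)(accept_once(acc)); // concurrent accept - not safe
+
+// RIGHT: one coroutine owns the acceptor and accepts sequentially,
+// as shown in the accept loop example below
+----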
+ +== Example: Accept Loop + +A typical server accept loop: + +[source,cpp] +---- +capy::task accept_loop( + corosio::io_context& ioc, + corosio::acceptor& acc) +{ + for (;;) + { + corosio::socket peer(ioc); + auto [ec] = co_await acc.accept(peer); + + if (ec) + { + if (ec == make_error_code(system::errc::operation_canceled)) + break; // Shutdown requested + + std::cerr << "Accept error: " << ec.message() << "\n"; + continue; // Try again + } + + // Spawn a coroutine to handle this connection + capy::run_async(ioc.get_executor())( + handle_connection(std::move(peer))); + } +} +---- + +Key points: + +* Create a fresh socket for each accept +* Move the socket into the handler coroutine +* Continue accepting after non-fatal errors +* Check for cancellation to support graceful shutdown + +== Example: Graceful Shutdown + +Coordinate shutdown with signal handling: + +[source,cpp] +---- +capy::task run_server(corosio::io_context& ioc) +{ + corosio::acceptor acc(ioc); + acc.listen(corosio::endpoint(8080)); + + corosio::signal_set signals(ioc, SIGINT, SIGTERM); + + // Spawn accept loop + capy::run_async(ioc.get_executor())(accept_loop(ioc, acc)); + + // Wait for shutdown signal + auto [ec, signum] = co_await signals.async_wait(); + if (!ec) + { + std::cout << "Received signal " << signum << ", shutting down\n"; + acc.cancel(); // Stop accepting + // Existing connections continue until complete + } +} +---- + +== Relationship to tcp_server + +For production servers, consider using xref:tcp-server.adoc[tcp_server] which +provides: + +* Worker pool management +* Connection limiting +* Multi-port support +* Automatic coroutine lifecycle + +The `acceptor` class is the lower-level primitive that `tcp_server` builds +upon. + +== Next Steps + +* xref:sockets.adoc[Sockets] — Using accepted connections +* xref:tcp-server.adoc[TCP Server] — Higher-level server framework +* xref:../tutorials/echo-server.adoc[Echo Server Tutorial] — Complete example diff --git a/doc/modules/ROOT/pages/guide/concurrent-programming.adoc b/doc/modules/ROOT/pages/guide/concurrent-programming.adoc new file mode 100644 index 0000000..c5c7068 --- /dev/null +++ b/doc/modules/ROOT/pages/guide/concurrent-programming.adoc @@ -0,0 +1,434 @@ +// +// Copyright (c) 2025 Vinnie Falco (vinnie.falco@gmail.com) +// +// Distributed under the Boost Software License, Version 1.0. (See accompanying +// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) +// +// Official repository: https://github.com/cppalliance/corosio +// + += Concurrent Programming + +Network servers often handle many clients simultaneously. This chapter explains +how Corosio supports concurrency using C++20 coroutines and the strand pattern +for safe shared state access. + +== The Concurrency Challenge + +When multiple operations run concurrently, they may access shared data. Without +synchronization, this leads to data races: + +[source,cpp] +---- +int counter = 0; + +// Thread 1 // Thread 2 +++counter; ++counter; +// Both read 0, both write 1 +// Expected: 2, Actual: 1 (data race) +---- + +Traditional solutions use mutexes: + +[source,cpp] +---- +std::mutex m; +{ + std::lock_guard lock(m); + ++counter; +} +---- + +Mutexes work but have drawbacks: + +* **Deadlock risk** — Taking multiple locks in different orders +* **Blocking** — Threads wait even when work is available +* **Scattered locking** — Every access site needs correct locking + +Corosio offers a better approach for I/O-bound code: coroutines with strands. 
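+
+For reference, here is the mutex approach as one complete, standalone program
+(standard C++ only, no Corosio): two threads increment a shared counter under
+a lock, so the final value is deterministic.
+
+[source,cpp]
+----
+#include <iostream>
+#include <mutex>
+#include <thread>
+
+int main()
+{
+    int counter = 0;
+    std::mutex m;
+
+    auto work = [&]
+    {
+        for (int i = 0; i < 100000; ++i)
+        {
+            std::lock_guard lock(m); // serialize access to counter
+            ++counter;
+        }
+    };
+
+    std::thread t1(work), t2(work);
+    t1.join();
+    t2.join();
+
+    std::cout << counter << "\n"; // always 200000
+}
+----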
+ +== C++20 Coroutines + +A coroutine is a function that can suspend and resume execution. Unlike threads, +coroutines don't block the thread when waiting—they yield control back to a +scheduler. + +=== The Language Features + +C++20 adds three keywords: + +[cols="1,3"] +|=== +| Keyword | Purpose + +| `co_await` +| Suspend until an operation completes + +| `co_return` +| Complete the coroutine with a value + +| `co_yield` +| Produce a value and suspend (for generators) +|=== + +=== Coroutines vs Threads + +[cols="1,2,2"] +|=== +| Property | Threads | Coroutines + +| Scheduling +| Preemptive (OS) +| Cooperative (explicit yield) + +| Memory +| Fixed stack (often 1MB+) +| Minimal frame (as needed) + +| Creation cost +| Expensive (kernel call) +| Cheap (allocation) + +| Context switch +| Expensive (kernel) +| Cheap (save/restore frame) +|=== + +Coroutines excel for I/O-bound workloads where operations spend most time +waiting. A single thread can manage thousands of coroutines. + +=== Using Coroutines with Corosio + +Corosio operations return awaitables. You `co_await` them to get results: + +[source,cpp] +---- +capy::task handle_client(corosio::socket sock) +{ + char buf[1024]; + + auto [ec, n] = co_await sock.read_some( + capy::mutable_buffer(buf, sizeof(buf))); + + if (ec) + co_return; // Exit on error + + // Process data... +} +---- + +When `read_some` suspends, the thread can run other coroutines. When data +arrives, `handle_client` resumes—possibly on a different thread. + +== Executor Affinity + +A coroutine has _affinity_ to an executor—its resumptions go through that +executor. This matters for thread safety: + +[source,cpp] +---- +capy::run_async(ioc.get_executor())(my_coroutine()); +// my_coroutine resumes through ioc's executor +---- + +Corosio uses the _affine awaitable protocol_ to propagate this automatically. +When you `co_await` an I/O operation, it captures your executor and resumes +through it. + +See xref:../concepts/affine-awaitables.adoc[Affine Awaitables] for details. + +== Strands: Synchronization Without Mutexes + +A _strand_ guarantees that handlers posted to it don't run concurrently. Even +with multiple threads, strand operations execute one at a time: + +---- + ┌───────────────┐ +Thread A│ │ + │ ┌───┐ │ +Thread B│ │ S │───────│───────────→ Sequential execution + │ │ t │ │ +Thread C│ │ r │ │ + │ │ a │ │ +Thread D│ │ n │ │ + │ │ d │ │ + │ └───┘ │ + └───────────────┘ + Multiple No concurrent + threads handlers +---- + +=== Why Strands Are Better Than Mutexes + +With mutexes, you explicitly lock around shared data: + +[source,cpp] +---- +// Mutex approach +std::mutex m; + +void access_shared_data() +{ + std::lock_guard lock(m); + // Access data +} +---- + +Problems: + +* Every caller must remember to lock +* Calling another function while holding a lock risks deadlock +* Forgetting a lock causes subtle bugs + +With strands, you post all related work to the same strand: + +[source,cpp] +---- +// Strand approach +auto strand = asio::make_strand(ioc); + +void access_shared_data() +{ + asio::post(strand, [&] { + // Access data - no lock needed + }); +} +---- + +Benefits: + +* Serialization is structural, not per-access +* No deadlock risk +* Forgetting to use the strand causes immediate errors (wrong executor) + +=== Strands in Corosio + +While Corosio doesn't expose a standalone strand class, the pattern applies +through executor affinity. 
When a coroutine has affinity to an executor, +sequential `co_await`s naturally serialize: + +[source,cpp] +---- +capy::task session(corosio::socket sock) +{ + // All code in this coroutine runs sequentially + auto [ec, n] = co_await sock.read_some(buf); + // No other code in this coroutine runs until above completes + + co_await sock.write_some(response); + // Still sequential +} +---- + +For shared state across coroutines, ensure they share the same executor: + +[source,cpp] +---- +auto ex = ioc.get_executor(); + +// Both coroutines resume through the same executor +capy::run_async(ex)(coroutine_a(shared_state)); +capy::run_async(ex)(coroutine_b(shared_state)); +---- + +With a single-threaded `io_context` (concurrency hint = 1), these coroutines +can safely share state without locks. + +== The Event Loop Model + +Corosio uses an event loop that processes completions one at a time: + +[source,cpp] +---- +while (!stopped) +{ + wait_for_completion(); // OS notifies us + dispatch_handler(); // Resume coroutine +} +---- + +Each iteration either: + +* Waits for I/O completion +* Resumes a coroutine +* Processes a posted task + +This single-threaded processing means coroutines don't interleave within a +single `run()` call—only at `co_await` points. + +== Scaling with Multiple Threads + +For higher throughput, run multiple threads on the same `io_context`: + +[source,cpp] +---- +corosio::io_context ioc(4); // Hint: 4 threads + +std::vector threads; +for (int i = 0; i < 4; ++i) + threads.emplace_back([&ioc] { ioc.run(); }); + +for (auto& t : threads) + t.join(); +---- + +With multiple threads, coroutines may run on any thread. Two rules apply: + +1. **Same coroutine, sequential**: A coroutine's code between `co_await` + points never overlaps with itself + +2. **Different coroutines, concurrent**: Multiple coroutines can run + simultaneously on different threads + +For shared state across coroutines with multiple threads, use one of: + +* External synchronization (mutex, atomic) +* A dedicated single-thread executor for that state +* Message passing between coroutines + +== Design Patterns + +=== One Coroutine Per Connection + +The simplest pattern: each client gets a coroutine: + +[source,cpp] +---- +capy::task accept_loop( + corosio::io_context& ioc, + corosio::acceptor& acc) +{ + for (;;) + { + corosio::socket peer(ioc); + auto [ec] = co_await acc.accept(peer); + if (ec) break; + + // Spawn independent coroutine for this client + capy::run_async(ioc.get_executor())( + handle_client(std::move(peer))); + } +} +---- + +Each `handle_client` coroutine runs independently. The accept loop continues +immediately after spawning. + +=== Worker Pool + +For bounded resource usage, use a fixed pool of workers: + +[source,cpp] +---- +struct worker +{ + corosio::socket sock; + std::string buf; + bool in_use = false; + + explicit worker(corosio::io_context& ioc) : sock(ioc) {} +}; + +// Preallocate workers +std::vector workers; +workers.reserve(max_workers); +for (int i = 0; i < max_workers; ++i) + workers.emplace_back(ioc); + +// Assign connections to free workers +---- + +See xref:../tutorials/echo-server.adoc[Echo Server Tutorial] for a complete +example. + +=== Pipeline + +For multi-stage processing, chain coroutines: + +[source,cpp] +---- +capy::task pipeline(corosio::socket sock) +{ + auto message = co_await read_message(sock); + auto result = co_await process(message); + co_await write_response(sock, result); +} +---- + +Each stage suspends independently, allowing other coroutines to run. 
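+
+The helpers above (`read_message`, `process`, `write_response`) are
+placeholders, not library functions. As one possible shape, a
+delimiter-framed read stage might look like this sketch (single-byte reads
+for clarity only; real code would buffer):
+
+[source,cpp]
+----
+// Sketch: read bytes into msg until '\n' (newline-delimited framing)
+capy::task read_line(corosio::socket& sock, std::string& msg)
+{
+    msg.clear();
+    for (;;)
+    {
+        char ch;
+        auto [ec, n] = co_await sock.read_some(
+            capy::mutable_buffer(&ch, 1));
+        if (ec || n == 0 || ch == '\n')
+            break;
+        msg.push_back(ch);
+    }
+}
+----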
+ +== Avoiding Common Mistakes + +=== Blocking in Coroutines + +Never block inside a coroutine: + +[source,cpp] +---- +// WRONG: blocks the entire io_context +capy::task bad() +{ + std::this_thread::sleep_for(1s); // Don't do this! +} + +// RIGHT: use async timer +capy::task good(corosio::io_context& ioc) +{ + corosio::timer t(ioc); + t.expires_after(1s); + co_await t.wait(); +} +---- + +=== Detached Coroutines + +Spawned coroutines must complete before their resources are destroyed: + +[source,cpp] +---- +// WRONG: socket destroyed while coroutine runs +{ + corosio::socket sock(ioc); + capy::run_async(ex)(use_socket(sock)); // Takes reference! +} // sock destroyed here, coroutine still running + +// RIGHT: move socket into coroutine +{ + corosio::socket sock(ioc); + capy::run_async(ex)(use_socket(std::move(sock))); +} // OK, coroutine owns the socket +---- + +=== Cross-Executor Access + +Don't access an object from a coroutine with different executor affinity: + +[source,cpp] +---- +// Dangerous: timer created on ex1, used from ex2 +corosio::timer timer(ctx1); +capy::run_async(ex2)([&timer]() -> capy::task { + co_await timer.wait(); // Wrong executor! +}); +---- + +Keep I/O objects with the coroutines that use them. + +== Summary + +Corosio's concurrency model: + +* **Coroutines** replace threads for I/O-bound work +* **Executor affinity** ensures resumption through the right executor +* **Sequential at suspend points** within a coroutine +* **Strand pattern** serializes access to shared state +* **Multiple threads** scale throughput when needed + +For most applications, single-threaded operation with multiple coroutines +provides excellent performance with simple, race-free code. + +== Next Steps + +* xref:io-context.adoc[I/O Context] — The event loop in detail +* xref:../concepts/affine-awaitables.adoc[Affine Awaitables] — How affinity propagates +* xref:../tutorials/echo-server.adoc[Echo Server] — Practical concurrency example diff --git a/doc/modules/ROOT/pages/guide/io-context.adoc b/doc/modules/ROOT/pages/guide/io-context.adoc index cba70cc..4a6a178 100644 --- a/doc/modules/ROOT/pages/guide/io-context.adoc +++ b/doc/modules/ROOT/pages/guide/io-context.adoc @@ -300,5 +300,6 @@ Future macOS support will use kqueue for: == Next Steps * xref:sockets.adoc[Sockets] — I/O with TCP sockets +* xref:acceptor.adoc[Acceptors] — Accept incoming connections * xref:timers.adoc[Timers] — Async delays and timeouts * xref:../concepts/affine-awaitables.adoc[Affine Awaitables] — The dispatch protocol diff --git a/doc/modules/ROOT/pages/guide/sockets.adoc b/doc/modules/ROOT/pages/guide/sockets.adoc index 68dcaac..3a2dccd 100644 --- a/doc/modules/ROOT/pages/guide/sockets.adoc +++ b/doc/modules/ROOT/pages/guide/sockets.adoc @@ -342,6 +342,7 @@ capy::task echo_client(corosio::io_context& ioc) == Next Steps +* xref:acceptor.adoc[Acceptors] — Accept incoming connections * xref:endpoints.adoc[Endpoints] — IP addresses and ports * xref:composed-operations.adoc[Composed Operations] — read() and write() -* xref:../tutorials/echo-server.adoc[Echo Server Tutorial] — Accept connections +* xref:../tutorials/echo-server.adoc[Echo Server Tutorial] — Server example diff --git a/doc/modules/ROOT/pages/guide/tcp-networking.adoc b/doc/modules/ROOT/pages/guide/tcp-networking.adoc new file mode 100644 index 0000000..782fbec --- /dev/null +++ b/doc/modules/ROOT/pages/guide/tcp-networking.adoc @@ -0,0 +1,308 @@ +// +// Copyright (c) 2025 Vinnie Falco (vinnie.falco@gmail.com) +// +// Distributed under the Boost Software 
License, Version 1.0. (See accompanying +// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) +// +// Official repository: https://github.com/cppalliance/corosio +// + += TCP/IP Networking + +This chapter introduces the networking concepts you need to understand before +using Corosio. If you're already comfortable with TCP/IP, sockets, and the +client-server model, you can skip to xref:io-context.adoc[I/O Context]. + +== The Internet Protocol Suite + +The Internet uses a layered protocol architecture. Data passes through these +layers when sent across a network: + +[cols="1,2,2"] +|=== +| Layer | Purpose | Examples + +| Application +| Your program's logic +| HTTP, DNS, SMTP + +| Transport +| Reliable delivery between processes +| TCP, UDP + +| Internet +| Routing packets across networks +| IP (IPv4, IPv6) + +| Link +| Physical transmission +| Ethernet, Wi-Fi +|=== + +Corosio operates at the Transport layer, providing TCP sockets. Your +application logic sits above, sending and receiving data through Corosio's +abstractions. + +== TCP: The Transmission Control Protocol + +TCP provides _reliable, ordered, byte-stream_ delivery between two endpoints. + +=== Reliable + +TCP guarantees that data arrives. If a packet is lost, TCP automatically +retransmits it. Your application never sees missing data. + +=== Ordered + +Data arrives in the order it was sent. If packets arrive out of order, TCP +reassembles them before delivering to your application. + +=== Byte Stream + +TCP delivers a continuous stream of bytes with no inherent message boundaries. +If you send "Hello" followed by "World", the receiver might read "HelloWorld" +in one operation, or "Hel" then "loWorld", or any other split. + +This has important implications for protocol design. You must define your own +message framing—typically using: + +* **Length prefixes**: Send the message size before the message body +* **Delimiters**: End each message with a special character (like `\n`) +* **Fixed-size messages**: All messages have the same length + +== The Client-Server Model + +Most network applications follow the client-server pattern: + +---- +┌────────┐ connect ┌────────┐ +│ Client │ ───────────────→│ Server │ +└────────┘ └────────┘ + │ + listening + on a port +---- + +**Server**: Listens on a well-known port, waiting for clients to connect. When +a client connects, the server creates a new socket for that specific +connection. + +**Client**: Initiates a connection to a server's IP address and port. Once +connected, the client and server communicate as peers. + +=== Connection Lifecycle + +A TCP connection goes through these stages: + +1. **Server Setup** + - Create a socket + - Bind to a local address and port + - Begin listening for connections + +2. **Client Connects** + - Create a socket + - Connect to the server's address and port + - TCP performs a three-way handshake + +3. **Data Exchange** + - Both sides can send and receive data + - Data flows as a bidirectional byte stream + +4. **Connection Close** + - Either side initiates close + - TCP performs a four-way handshake + - Resources are released + +== Addresses and Ports + +Every TCP connection is identified by four values: + +* Source IP address +* Source port +* Destination IP address +* Destination port + +=== IP Addresses + +An IP address identifies a host on the network. + +**IPv4 addresses** are 32 bits, written as four decimal numbers separated by +dots: `192.168.1.100`. There are about 4 billion possible addresses. 
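+
+The dotted-decimal form is just a readable spelling of one 32-bit value. A
+quick illustration in plain C++ (independent of Corosio):
+
+[source,cpp]
+----
+#include <cstdint>
+#include <cstdio>
+
+int main()
+{
+    // 192.168.1.100 packed into a single 32-bit value, first octet highest
+    std::uint32_t addr = (192u << 24) | (168u << 16) | (1u << 8) | 100u;
+    std::printf("0x%08X\n", addr); // prints 0xC0A80164
+}
+----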
+ +**IPv6 addresses** are 128 bits, written as eight groups of hexadecimal digits +separated by colons: `2001:0db8:0000:0000:0000:0000:0000:0001` (often shortened +to `2001:db8::1`). + +=== Special Addresses + +[cols="1,2"] +|=== +| Address | Meaning + +| `127.0.0.1` (IPv4) +| Loopback—the local machine + +| `::1` (IPv6) +| Loopback—the local machine + +| `0.0.0.0` (IPv4) +| All interfaces (for binding) + +| `::` (IPv6) +| All interfaces (for binding) +|=== + +=== Ports + +A port is a 16-bit number (0–65535) that identifies an application on a host. +Well-known services use standard ports: + +[cols="1,2"] +|=== +| Port | Service + +| 80 +| HTTP + +| 443 +| HTTPS + +| 22 +| SSH + +| 25 +| SMTP (email) +|=== + +Ports below 1024 typically require administrator privileges to bind. + +== Endpoints in Corosio + +Corosio represents addresses and ports using the `endpoint` class: + +[source,cpp] +---- +// Port only (all interfaces) +corosio::endpoint server_ep(8080); + +// IPv4 address and port +corosio::endpoint client_ep( + boost::urls::ipv4_address::loopback(), 8080); + +// IPv6 address and port +corosio::endpoint ipv6_ep( + boost::urls::ipv6_address::loopback(), 8080); +---- + +== Name Resolution + +Humans use domain names like `www.example.com`, but TCP requires IP addresses. +The Domain Name System (DNS) translates names to addresses. + +Corosio provides the `resolver` class for asynchronous DNS lookups: + +[source,cpp] +---- +corosio::resolver r(ioc); +auto [ec, results] = co_await r.resolve("www.example.com", "https"); + +for (auto const& entry : results) +{ + auto ep = entry.get_endpoint(); + // Try connecting to ep... +} +---- + +A single hostname may resolve to multiple addresses (for load balancing or +redundancy). Your application should try each address until one succeeds. + +== TCP vs UDP + +Corosio currently supports only TCP. Here's why TCP is often the right choice: + +[cols="1,2,2"] +|=== +| Property | TCP | UDP + +| Reliability +| Guaranteed delivery +| Best effort + +| Ordering +| Preserved +| Not guaranteed + +| Connection +| Connection-oriented +| Connectionless + +| Use cases +| Web, email, file transfer +| Video streaming, gaming, DNS +|=== + +TCP's reliability and ordering come at the cost of latency (retransmissions +take time) and overhead (connection setup, acknowledgments). For most +applications—especially those involving requests and responses—TCP's +guarantees outweigh these costs. + +== What Corosio Provides + +Corosio wraps the complexity of TCP programming in a coroutine-friendly API: + +* **socket** — Connect to servers, send and receive data +* **acceptor** — Listen for and accept incoming connections +* **resolver** — Translate hostnames to IP addresses +* **endpoint** — Represent addresses and ports + +All operations are asynchronous and return awaitables. You don't manage raw +socket handles or deal with platform-specific APIs directly. + +== Common Pitfalls + +=== Partial Reads and Writes + +TCP's byte-stream nature means `read_some()` may return fewer bytes than you +requested, and `write_some()` may send fewer bytes than you provided. + +Always loop or use composed operations (`read()`, `write()`) when you need +exact amounts: + +[source,cpp] +---- +// Wrong: might read less than buffer size +auto [ec, n] = co_await sock.read_some(buf); + +// Right: reads until buffer is full or EOF +auto [ec, n] = co_await corosio::read(sock, buf); +---- + +=== Connection Refused + +If no server is listening on the target port, connect fails with +`connection_refused`. 
Always handle this error—it's common during development +and when servers restart. + +=== Address Already in Use + +A server that terminates and immediately restarts may fail to bind because the +OS keeps the old socket in `TIME_WAIT` state. Production servers typically set +socket options to allow address reuse. + +=== Blocking the Event Loop + +Long-running computations in a coroutine block other operations. For CPU-bound +work, dispatch to a separate thread pool. + +== Further Reading + +For a deeper understanding of TCP/IP: + +* _TCP/IP Illustrated, Volume 1_ by W. Richard Stevens—the classic reference +* _Unix Network Programming_ by W. Richard Stevens—practical socket programming + +== Next Steps + +* xref:concurrent-programming.adoc[Concurrent Programming] — Coroutines and strands +* xref:io-context.adoc[I/O Context] — The event loop +* xref:sockets.adoc[Sockets] — Socket operations in detail diff --git a/doc/modules/ROOT/pages/guide/tcp-server.adoc b/doc/modules/ROOT/pages/guide/tcp-server.adoc new file mode 100644 index 0000000..df3c070 --- /dev/null +++ b/doc/modules/ROOT/pages/guide/tcp-server.adoc @@ -0,0 +1,380 @@ +// +// Copyright (c) 2025 Vinnie Falco (vinnie.falco@gmail.com) +// +// Distributed under the Boost Software License, Version 1.0. (See accompanying +// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) +// +// Official repository: https://github.com/cppalliance/corosio +// + += TCP Server + +The `tcp_server` class provides a framework for building TCP servers with +connection pooling. It manages acceptors, worker pools, and connection +lifecycle automatically. + +NOTE: Code snippets assume: +[source,cpp] +---- +#include +#include +#include + +namespace corosio = boost::corosio; +namespace capy = boost::capy; +---- + +== Overview + +`tcp_server` is a base class designed for inheritance. You derive from it, +define your worker type, and implement the connection handling logic. The +framework handles: + +* Listening on multiple ports +* Accepting connections +* Worker pool management +* Coroutine lifecycle + +[source,cpp] +---- +class echo_server : public corosio::tcp_server +{ + struct worker : worker_base + { + std::string buf; + + explicit worker(corosio::io_context& ioc) + : worker_base(ioc) + { + buf.reserve(4096); + } + + void run(launcher launch) override + { + launch(sock.context().get_executor(), do_echo()); + } + + capy::task do_echo(); + }; + +public: + echo_server(corosio::io_context& ioc) + : tcp_server(ioc, ioc.get_executor()) + { + wv_.reserve(100); + for (int i = 0; i < 100; ++i) + wv_.emplace(ioc); + } +}; +---- + +== The Worker Pattern + +Workers are preallocated objects that handle connections. Each worker contains +a socket and any state needed for a session. 
+ +=== worker_base + +The `worker_base` class is the foundation: + +[source,cpp] +---- +class worker_base +{ +public: + corosio::socket sock; + + virtual ~worker_base() = default; + virtual void run(launcher launch) = 0; + +protected: + explicit worker_base(capy::execution_context& ctx); +}; +---- + +Your worker inherits from `worker_base` and implements `run()`: + +[source,cpp] +---- +struct my_worker : tcp_server::worker_base +{ + std::string request_buf; + std::string response_buf; + + explicit my_worker(corosio::io_context& ioc) + : worker_base(ioc) + {} + + void run(launcher launch) override + { + launch(sock.context().get_executor(), handle_connection()); + } + + capy::task handle_connection() + { + // Handle the connection using sock + // Worker is automatically returned to pool when coroutine ends + } +}; +---- + +=== The workers Container + +The `workers` class manages the worker pool: + +[source,cpp] +---- +class workers +{ +public: + template + T& emplace(Args&&... args); + + void reserve(std::size_t n); + std::size_t size() const noexcept; +}; +---- + +Use `emplace()` to add workers during construction: + +[source,cpp] +---- +my_server(corosio::io_context& ioc) + : tcp_server(ioc, ioc.get_executor()) +{ + wv_.reserve(max_workers); + for (int i = 0; i < max_workers; ++i) + wv_.emplace(ioc); +} +---- + +Workers are stored polymorphically, allowing different worker types if needed. + +== The Launcher + +When a connection is accepted, `tcp_server` calls your worker's `run()` +method with a `launcher` object. The launcher manages the coroutine lifecycle: + +[source,cpp] +---- +void run(launcher launch) override +{ + // Create and launch the session coroutine + launch(executor, my_coroutine()); +} +---- + +The launcher: + +1. Starts your coroutine on the specified executor +2. Tracks the worker as in-use +3. Returns the worker to the pool when the coroutine completes + +You must call the launcher exactly once. Failure to call it returns the +worker immediately. Calling it multiple times throws `std::logic_error`. + +=== Launcher Signature + +[source,cpp] +---- +template +void operator()(Executor const& ex, capy::task task); +---- + +The executor determines where the coroutine runs. Typically you use the +socket's executor: + +[source,cpp] +---- +launch(sock.context().get_executor(), handle_connection()); +---- + +== Binding and Starting + +=== bind() + +Bind to a local endpoint: + +[source,cpp] +---- +auto ec = server.bind(corosio::endpoint(8080)); +if (ec) + std::cerr << "Bind failed: " << ec.message() << "\n"; +---- + +You can bind to multiple ports: + +[source,cpp] +---- +server.bind(corosio::endpoint(80)); +server.bind(corosio::endpoint(443)); +---- + +=== start() + +Begin accepting connections: + +[source,cpp] +---- +server.start(); +---- + +After `start()`, the server: + +1. Listens on all bound ports +2. Accepts incoming connections +3. Assigns connections to available workers +4. Calls each worker's `run()` method + +The accept loop runs until the `io_context` stops. 
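+
+A common way to stop it is to wait on a `signal_set` and then stop the event
+loop. A sketch (assumes `io_context` exposes a `stop()` member, in the style
+of Asio; spawn it with `capy::run_async(ioc.get_executor())` before calling
+`ioc.run()`):
+
+[source,cpp]
+----
+capy::task wait_for_shutdown(corosio::io_context& ioc)
+{
+    corosio::signal_set signals(ioc, SIGINT, SIGTERM);
+
+    auto [ec, signum] = co_await signals.async_wait();
+    if (!ec)
+        std::cout << "Received signal " << signum << ", stopping\n";
+
+    ioc.stop(); // assumption: io_context::stop(), as in Asio
+}
+----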
+ +== Complete Example + +[source,cpp] +---- +#include +#include +#include +#include +#include +#include +#include + +namespace corosio = boost::corosio; +namespace capy = boost::capy; + +class echo_server : public corosio::tcp_server +{ + struct worker : worker_base + { + std::string buf; + + explicit worker(corosio::io_context& ioc) + : worker_base(ioc) + { + buf.reserve(4096); + } + + void run(launcher launch) override + { + launch(sock.context().get_executor(), do_session()); + } + + capy::task do_session() + { + for (;;) + { + buf.resize(4096); + auto [ec, n] = co_await sock.read_some( + capy::mutable_buffer(buf.data(), buf.size())); + + if (ec || n == 0) + break; + + buf.resize(n); + auto [wec, wn] = co_await corosio::write( + sock, capy::const_buffer(buf.data(), buf.size())); + + if (wec) + break; + } + + sock.close(); + } + }; + +public: + echo_server(corosio::io_context& ioc, int max_workers) + : tcp_server(ioc, ioc.get_executor()) + { + wv_.reserve(max_workers); + for (int i = 0; i < max_workers; ++i) + wv_.emplace(ioc); + } +}; + +int main() +{ + corosio::io_context ioc; + + echo_server server(ioc, 100); + + auto ec = server.bind(corosio::endpoint(8080)); + if (ec) + { + std::cerr << "Bind failed: " << ec.message() << "\n"; + return 1; + } + + std::cout << "Echo server listening on port 8080\n"; + + server.start(); + ioc.run(); +} +---- + +== Design Considerations + +=== Why a Worker Pool? + +A worker pool provides: + +* **Bounded resources**: Fixed maximum connections +* **No per-connection allocation**: Sockets and buffers preallocated +* **Simple lifecycle**: Workers cycle between idle and active states + +=== Worker Reuse + +When a session coroutine completes, its worker automatically returns to the +idle pool. The next accepted connection receives this worker. Ensure your +worker's state is properly reset between connections: + +[source,cpp] +---- +capy::task do_session() +{ + // Reset state at session start + request_.clear(); + response_.clear(); + + // ... handle connection ... + + // Socket closed, worker returns to pool +} +---- + +=== Multiple Ports + +`tcp_server` can listen on multiple ports simultaneously. All ports share +the same worker pool: + +[source,cpp] +---- +server.bind(corosio::endpoint(80)); // HTTP +server.bind(corosio::endpoint(443)); // HTTPS +server.start(); +---- + +=== Connection Rejection + +When all workers are busy, the server cannot accept new connections until +a worker becomes available. The TCP listen backlog holds pending connections +during this time. + +For high-traffic scenarios, size your worker pool appropriately or implement +connection limits at a higher layer. + +== Thread Safety + +The `tcp_server` class is not thread-safe. All operations on the server +must occur from coroutines running on its `io_context`. Workers may not be +accessed concurrently. + +For multi-threaded operation, create one server per thread, or use external +synchronization. + +== Next Steps + +* xref:sockets.adoc[Sockets] — Socket operations +* xref:concurrent-programming.adoc[Concurrent Programming] — Coroutine patterns +* xref:../tutorials/echo-server.adoc[Echo Server Tutorial] — Simpler approach diff --git a/doc/modules/ROOT/pages/index.adoc b/doc/modules/ROOT/pages/index.adoc index 7c760cc..777ca5a 100644 --- a/doc/modules/ROOT/pages/index.adoc +++ b/doc/modules/ROOT/pages/index.adoc @@ -22,6 +22,7 @@ executor without manual dispatch. 
* **io_context** — Event loop for processing asynchronous operations * **socket** — Asynchronous TCP socket with connect, read, and write * **acceptor** — TCP listener for accepting incoming connections +* **tcp_server** — Server framework with worker pools * **resolver** — Asynchronous DNS resolution * **timer** — Asynchronous timer for delays and timeouts * **signal_set** — Asynchronous signal handling @@ -55,6 +56,18 @@ on error instead. dispatchers internally. The indirection cost is negligible compared to I/O latency. +== Target Audience + +Corosio is designed for C++ developers who want to build network applications +using modern asynchronous programming patterns. This documentation assumes: + +* **Familiarity with Boost.Capy** — task types, executors, buffer sequences +* **Understanding of C++20 coroutines** — `co_await`, `co_return`, awaitables +* **Basic TCP/IP networking concepts** — clients, servers, ports, connections + +If you're new to these topics, see xref:guide/tcp-networking.adoc[TCP/IP Networking] +and xref:guide/concurrent-programming.adoc[Concurrent Programming] for background. + == Requirements * C++20 compiler with coroutine support @@ -72,6 +85,19 @@ latency. * Linux (planned: io_uring) * macOS (planned: kqueue) +== Code Convention + +NOTE: Code examples in this documentation assume these declarations are in effect: +[source,cpp] +---- +#include +#include +#include + +namespace corosio = boost::corosio; +namespace capy = boost::capy; +---- + == Quick Example [source,cpp] @@ -119,5 +145,7 @@ int main() == Next Steps * xref:quick-start.adoc[Quick Start] — Build a working echo server +* xref:guide/tcp-networking.adoc[TCP/IP Networking] — Networking fundamentals +* xref:guide/concurrent-programming.adoc[Concurrent Programming] — Coroutines and strands * xref:guide/io-context.adoc[I/O Context] — Understand the event loop * xref:guide/sockets.adoc[Sockets] — Learn socket operations in detail diff --git a/doc/modules/ROOT/pages/quick-start.adoc b/doc/modules/ROOT/pages/quick-start.adoc index 6568acc..e85a86f 100644 --- a/doc/modules/ROOT/pages/quick-start.adoc +++ b/doc/modules/ROOT/pages/quick-start.adoc @@ -188,6 +188,8 @@ failed. Now that you have a working echo server: +* xref:guide/tcp-networking.adoc[TCP/IP Networking] — Networking fundamentals +* xref:guide/concurrent-programming.adoc[Concurrent Programming] — Coroutines and strands * xref:tutorials/http-client.adoc[HTTP Client Tutorial] — Make HTTP requests * xref:guide/io-context.adoc[I/O Context Guide] — Understand the event loop * xref:guide/sockets.adoc[Sockets Guide] — Deep dive into socket operations diff --git a/doc/modules/ROOT/pages/reference/design-rationale.adoc b/doc/modules/ROOT/pages/reference/design-rationale.adoc index e23a8de..bc1885c 100644 --- a/doc/modules/ROOT/pages/reference/design-rationale.adoc +++ b/doc/modules/ROOT/pages/reference/design-rationale.adoc @@ -95,8 +95,12 @@ negligible compared to actual I/O latency (microseconds vs. nanoseconds). ---- io_object + ├── acceptor + ├── resolver + ├── timer + ├── signal_set └── io_stream - └── socket + ├── socket └── wolfssl_stream ---- diff --git a/doc/modules/ROOT/pages/reference/glossary.adoc b/doc/modules/ROOT/pages/reference/glossary.adoc index 0bac024..ad5b191 100644 --- a/doc/modules/ROOT/pages/reference/glossary.adoc +++ b/doc/modules/ROOT/pages/reference/glossary.adoc @@ -15,7 +15,7 @@ This glossary defines terms used throughout the Corosio documentation. Acceptor:: An I/O object that listens for and accepts incoming TCP connections. 
See -`corosio::acceptor`. +`corosio::acceptor` and xref:../guide/acceptor.adoc[Acceptors Guide]. Affine Awaitable:: An awaitable type that implements the affine protocol, receiving a dispatcher @@ -213,6 +213,11 @@ An I/O object for TCP network communication. See `corosio::socket`. Stop Token:: A mechanism for requesting cancellation. See `std::stop_token`. +Strand:: +A serialization mechanism that ensures handlers don't run concurrently. +Operations posted to a strand execute one at a time, eliminating data +races without mutexes. See xref:../guide/concurrent-programming.adoc[Concurrent Programming]. + Stream:: A sequence of bytes that can be read or written incrementally. @@ -231,6 +236,10 @@ stack growth. Task:: A lazy coroutine that produces a value. See `capy::task`. +TCP Server:: +A framework class for building TCP servers with worker pools. See +`corosio::tcp_server` and xref:../guide/tcp-server.adoc[TCP Server Guide]. + Thread Safety:: The ability to use an object safely from multiple threads. Individual I/O objects are generally not thread-safe. @@ -257,6 +266,11 @@ A compact TLS library used by Corosio for secure streams. Work:: Pending operations that keep an I/O context running. +Worker Pool:: +A design pattern where a fixed number of worker objects are preallocated +to handle connections. Provides bounded resource usage and avoids allocation +during operation. See xref:../guide/tcp-server.adoc[TCP Server]. + == See Also * xref:design-rationale.adoc[Design Rationale] — Why Corosio is designed this way diff --git a/include/boost/corosio/acceptor.hpp b/include/boost/corosio/acceptor.hpp index 01cd10b..db2b2df 100644 --- a/include/boost/corosio/acceptor.hpp +++ b/include/boost/corosio/acceptor.hpp @@ -16,8 +16,8 @@ #include #include #include -#include -#include +#include +#include #include #include @@ -97,23 +97,23 @@ class BOOST_COROSIO_DECL acceptor : public io_object return {ec_}; } - template + template auto await_suspend( std::coroutine_handle<> h, - Dispatcher const& d) -> std::coroutine_handle<> + Ex const& ex) -> std::coroutine_handle<> { - acc_.get().accept(h, d, token_, &ec_, &peer_impl_); + acc_.get().accept(h, ex, token_, &ec_, &peer_impl_); return std::noop_coroutine(); } - template + template auto await_suspend( std::coroutine_handle<> h, - Dispatcher const& d, + Ex const& ex, std::stop_token token) -> std::coroutine_handle<> { token_ = std::move(token); - acc_.get().accept(h, d, token_, &ec_, &peer_impl_); + acc_.get().accept(h, ex, token_, &ec_, &peer_impl_); return std::noop_coroutine(); } }; @@ -137,10 +137,10 @@ class BOOST_COROSIO_DECL acceptor : public io_object @param ex The executor whose context will own the acceptor. */ - template - requires (!std::same_as, acceptor>) && - capy::executor - explicit acceptor(Executor const& ex) + template + requires (!std::same_as, acceptor>) && + capy::Executor + explicit acceptor(Ex const& ex) : acceptor(ex.context()) { } @@ -268,7 +268,7 @@ class BOOST_COROSIO_DECL acceptor : public io_object { virtual void accept( std::coroutine_handle<>, - capy::any_dispatcher, + capy::any_executor_ref, std::stop_token, system::error_code*, io_object_impl**) = 0; diff --git a/include/boost/corosio/any_bufref.hpp b/include/boost/corosio/any_bufref.hpp index 3a57988..57d4c14 100644 --- a/include/boost/corosio/any_bufref.hpp +++ b/include/boost/corosio/any_bufref.hpp @@ -61,11 +61,11 @@ class any_bufref @param bs The buffer sequence to adapt. 
*/ - template + template explicit - any_bufref(BufferSequence const& bs) noexcept + any_bufref(BS const& bs) noexcept : bs_(&bs) - , fn_(©_impl) + , fn_(©_impl) { } @@ -85,19 +85,19 @@ class any_bufref } private: - template + template static std::size_t copy_impl( void const* p, capy::mutable_buffer* dest, std::size_t n) { - auto const& bs = *static_cast(p); + auto const& bs = *static_cast(p); auto it = capy::begin(bs); auto const end_it = capy::end(bs); std::size_t i = 0; - if constexpr (capy::mutable_buffer_sequence) + if constexpr (capy::MutableBufferSequence) { for (; it != end_it && i < n; ++it, ++i) dest[i] = *it; diff --git a/include/boost/corosio/consuming_buffers.hpp b/include/boost/corosio/consuming_buffers.hpp index 1232bcb..3d2c543 100644 --- a/include/boost/corosio/consuming_buffers.hpp +++ b/include/boost/corosio/consuming_buffers.hpp @@ -26,14 +26,14 @@ namespace detail { template struct buffer_type_for; -template +template struct buffer_type_for { using type = capy::mutable_buffer; }; -template - requires (!capy::mutable_buffer_sequence) +template + requires (!capy::MutableBufferSequence) struct buffer_type_for { using type = capy::const_buffer; @@ -52,8 +52,8 @@ struct buffer_type_for @tparam BufferSequence The buffer sequence type. */ template - requires capy::mutable_buffer_sequence || - capy::const_buffer_sequence + requires capy::MutableBufferSequence || + capy::ConstBufferSequence class consuming_buffers { using iterator_type = decltype(capy::begin(std::declval())); @@ -231,16 +231,16 @@ class consuming_buffers // ADL helpers for capy::begin and capy::end template - requires capy::mutable_buffer_sequence || - capy::const_buffer_sequence + requires capy::MutableBufferSequence || + capy::ConstBufferSequence auto begin(consuming_buffers const& cb) noexcept { return cb.begin(); } template - requires capy::mutable_buffer_sequence || - capy::const_buffer_sequence + requires capy::MutableBufferSequence || + capy::ConstBufferSequence auto end(consuming_buffers const& cb) noexcept { return cb.end(); diff --git a/include/boost/corosio/io_context.hpp b/include/boost/corosio/io_context.hpp index 0301ca1..39be23c 100644 --- a/include/boost/corosio/io_context.hpp +++ b/include/boost/corosio/io_context.hpp @@ -291,7 +291,7 @@ class BOOST_COROSIO_DECL io_context : public capy::execution_context The executor provides the interface for posting work items and dispatching coroutines to the associated io_context. It satisfies - the `capy::executor` concept. + the `capy::Executor` concept. Executors are lightweight handles that can be copied and compared for equality. Two executors compare equal if they refer to the @@ -365,7 +365,7 @@ class io_context::executor_type /** Dispatch a coroutine handle. - This is the dispatcher interface for capy coroutines. If called + This is the executor interface for capy coroutines. If called from within `run()`, returns the handle for symmetric transfer. Otherwise posts the handle and returns `noop_coroutine`. @@ -375,7 +375,7 @@ class io_context::executor_type if the handle was posted. */ capy::any_coro - operator()(capy::any_coro h) const + dispatch(capy::any_coro h) const { if (running_in_this_thread()) return h; @@ -396,18 +396,6 @@ class io_context::executor_type ctx_->sched_.post(h); } - /** Queue a coroutine for deferred execution. - - This is semantically identical to `post`, but conveys that - `h` is a continuation of the current call context. - - @param h The coroutine handle to defer. 
- */ - void - defer(capy::any_coro h) const - { - ctx_->sched_.post(h); - } /** Compare two executors for equality. diff --git a/include/boost/corosio/io_stream.hpp b/include/boost/corosio/io_stream.hpp index 45ee5ca..5f669c6 100644 --- a/include/boost/corosio/io_stream.hpp +++ b/include/boost/corosio/io_stream.hpp @@ -14,7 +14,7 @@ #include #include #include -#include +#include #include #include @@ -122,25 +122,25 @@ class BOOST_COROSIO_DECL io_stream : public io_object return {ec_, bytes_transferred_}; } - template + template auto await_suspend( std::coroutine_handle<> h, - Dispatcher const& d) -> std::coroutine_handle<> + Ex const& ex) -> std::coroutine_handle<> { any_bufref param(buffers_); - ios_.get().read_some(h, d, param, token_, &ec_, &bytes_transferred_); + ios_.get().read_some(h, ex, param, token_, &ec_, &bytes_transferred_); return std::noop_coroutine(); } - template + template auto await_suspend( std::coroutine_handle<> h, - Dispatcher const& d, + Ex const& ex, std::stop_token token) -> std::coroutine_handle<> { token_ = std::move(token); any_bufref param(buffers_); - ios_.get().read_some(h, d, param, token_, &ec_, &bytes_transferred_); + ios_.get().read_some(h, ex, param, token_, &ec_, &bytes_transferred_); return std::noop_coroutine(); } }; @@ -174,25 +174,25 @@ class BOOST_COROSIO_DECL io_stream : public io_object return {ec_, bytes_transferred_}; } - template + template auto await_suspend( std::coroutine_handle<> h, - Dispatcher const& d) -> std::coroutine_handle<> + Ex const& ex) -> std::coroutine_handle<> { any_bufref param(buffers_); - ios_.get().write_some(h, d, param, token_, &ec_, &bytes_transferred_); + ios_.get().write_some(h, ex, param, token_, &ec_, &bytes_transferred_); return std::noop_coroutine(); } - template + template auto await_suspend( std::coroutine_handle<> h, - Dispatcher const& d, + Ex const& ex, std::stop_token token) -> std::coroutine_handle<> { token_ = std::move(token); any_bufref param(buffers_); - ios_.get().write_some(h, d, param, token_, &ec_, &bytes_transferred_); + ios_.get().write_some(h, ex, param, token_, &ec_, &bytes_transferred_); return std::noop_coroutine(); } }; @@ -202,7 +202,7 @@ class BOOST_COROSIO_DECL io_stream : public io_object { virtual void read_some( std::coroutine_handle<>, - capy::any_dispatcher, + capy::any_executor_ref, any_bufref&, std::stop_token, system::error_code*, @@ -210,7 +210,7 @@ class BOOST_COROSIO_DECL io_stream : public io_object virtual void write_some( std::coroutine_handle<>, - capy::any_dispatcher, + capy::any_executor_ref, any_bufref&, std::stop_token, system::error_code*, diff --git a/include/boost/corosio/read.hpp b/include/boost/corosio/read.hpp index 55bf9ce..6c656d9 100644 --- a/include/boost/corosio/read.hpp +++ b/include/boost/corosio/read.hpp @@ -76,7 +76,7 @@ namespace corosio { error or EOF occurs), whereas `read_some()` may return after reading any amount of data. 
*/ -template +template capy::task> read(io_stream& ios, MutableBufferSequence const& buffers) { diff --git a/include/boost/corosio/resolver.hpp b/include/boost/corosio/resolver.hpp index 06166bd..50649d0 100644 --- a/include/boost/corosio/resolver.hpp +++ b/include/boost/corosio/resolver.hpp @@ -15,8 +15,8 @@ #include #include #include -#include -#include +#include +#include #include #include @@ -172,23 +172,23 @@ class BOOST_COROSIO_DECL resolver : public io_object return {ec_, std::move(results_)}; } - template + template auto await_suspend( std::coroutine_handle<> h, - Dispatcher const& d) -> std::coroutine_handle<> + Ex const& ex) -> std::coroutine_handle<> { - r_.get().resolve(h, d, host_, service_, flags_, token_, &ec_, &results_); + r_.get().resolve(h, ex, host_, service_, flags_, token_, &ec_, &results_); return std::noop_coroutine(); } - template + template auto await_suspend( std::coroutine_handle<> h, - Dispatcher const& d, + Ex const& ex, std::stop_token token) -> std::coroutine_handle<> { token_ = std::move(token); - r_.get().resolve(h, d, host_, service_, flags_, token_, &ec_, &results_); + r_.get().resolve(h, ex, host_, service_, flags_, token_, &ec_, &results_); return std::noop_coroutine(); } }; @@ -212,10 +212,10 @@ class BOOST_COROSIO_DECL resolver : public io_object @param ex The executor whose context will own the resolver. */ - template - requires (!std::same_as, resolver>) && - capy::executor - explicit resolver(Executor const& ex) + template + requires (!std::same_as, resolver>) && + capy::Executor + explicit resolver(Ex const& ex) : resolver(ex.context()) { } @@ -319,7 +319,7 @@ class BOOST_COROSIO_DECL resolver : public io_object { virtual void resolve( std::coroutine_handle<>, - capy::any_dispatcher, + capy::any_executor_ref, std::string_view host, std::string_view service, resolve_flags flags, diff --git a/include/boost/corosio/signal_set.hpp b/include/boost/corosio/signal_set.hpp index bfe8fe8..a4a3b9b 100644 --- a/include/boost/corosio/signal_set.hpp +++ b/include/boost/corosio/signal_set.hpp @@ -15,9 +15,9 @@ #include #include #include -#include +#include #include -#include +#include #include #include @@ -73,23 +73,23 @@ class BOOST_COROSIO_DECL signal_set : public io_object return {ec_, signal_number_}; } - template + template auto await_suspend( std::coroutine_handle<> h, - Dispatcher const& d) -> std::coroutine_handle<> + Ex const& ex) -> std::coroutine_handle<> { - s_.get().wait(h, d, token_, &ec_, &signal_number_); + s_.get().wait(h, ex, token_, &ec_, &signal_number_); return std::noop_coroutine(); } - template + template auto await_suspend( std::coroutine_handle<> h, - Dispatcher const& d, + Ex const& ex, std::stop_token token) -> std::coroutine_handle<> { token_ = std::move(token); - s_.get().wait(h, d, token_, &ec_, &signal_number_); + s_.get().wait(h, ex, token_, &ec_, &signal_number_); return std::noop_coroutine(); } }; @@ -99,7 +99,7 @@ class BOOST_COROSIO_DECL signal_set : public io_object { virtual void wait( std::coroutine_handle<>, - capy::any_dispatcher, + capy::any_executor_ref, std::stop_token, system::error_code*, int*) = 0; diff --git a/include/boost/corosio/socket.hpp b/include/boost/corosio/socket.hpp index 6a4df19..4c30479 100644 --- a/include/boost/corosio/socket.hpp +++ b/include/boost/corosio/socket.hpp @@ -16,8 +16,8 @@ #include #include #include -#include -#include +#include +#include #include #include @@ -77,7 +77,7 @@ class BOOST_COROSIO_DECL socket : public io_stream { virtual void connect( std::coroutine_handle<>, - 
capy::any_dispatcher, + capy::any_executor_ref, endpoint, std::stop_token, system::error_code*) = 0; @@ -108,23 +108,23 @@ class BOOST_COROSIO_DECL socket : public io_stream return {ec_}; } - template + template auto await_suspend( std::coroutine_handle<> h, - Dispatcher const& d) -> std::coroutine_handle<> + Ex const& ex) -> std::coroutine_handle<> { - s_.get().connect(h, d, endpoint_, token_, &ec_); + s_.get().connect(h, ex, endpoint_, token_, &ec_); return std::noop_coroutine(); } - template + template auto await_suspend( std::coroutine_handle<> h, - Dispatcher const& d, + Ex const& ex, std::stop_token token) -> std::coroutine_handle<> { token_ = std::move(token); - s_.get().connect(h, d, endpoint_, token_, &ec_); + s_.get().connect(h, ex, endpoint_, token_, &ec_); return std::noop_coroutine(); } }; @@ -148,10 +148,10 @@ class BOOST_COROSIO_DECL socket : public io_stream @param ex The executor whose context will own the socket. */ - template - requires (!std::same_as, socket>) && - capy::executor - explicit socket(Executor const& ex) + template + requires (!std::same_as, socket>) && + capy::Executor + explicit socket(Ex const& ex) : socket(ex.context()) { } diff --git a/include/boost/corosio/tcp_server.hpp b/include/boost/corosio/tcp_server.hpp new file mode 100644 index 0000000..cd32276 --- /dev/null +++ b/include/boost/corosio/tcp_server.hpp @@ -0,0 +1,289 @@ +// +// Copyright (c) 2026 Vinnie Falco (vinnie dot falco at gmail dot com) +// +// Distributed under the Boost Software License, Version 1.0. (See accompanying +// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) +// +// Official repository: https://github.com/cppalliance/corosio +// + +#ifndef BOOST_COROSIO_TCP_SERVER_HPP +#define BOOST_COROSIO_TCP_SERVER_HPP + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include +#include +#include +#include + +namespace boost { +namespace corosio { + +class BOOST_COROSIO_DECL + tcp_server +{ +protected: + class worker_base; + class launcher; + class workers; + +private: + struct waiter; + class push_aw; + class pop_aw; + + struct launch_wrapper + { + struct promise_type + { + capy::any_executor_ref d; + + launch_wrapper get_return_object() noexcept { + return {std::coroutine_handle::from_promise(*this)}; + } + std::suspend_always initial_suspend() noexcept { return {}; } + std::suspend_never final_suspend() noexcept { return {}; } + void return_void() noexcept {} + void unhandled_exception() { std::terminate(); } + + // Injects executor for affinity-aware awaitables + template + auto await_transform(Awaitable&& a) + { + struct adapter + { + std::decay_t aw; + capy::any_executor_ref d; + + bool await_ready() { return aw.await_ready(); } + auto await_resume() { return aw.await_resume(); } + + auto await_suspend(std::coroutine_handle h) + { + if constexpr (capy::IoAwaitable< + std::decay_t, capy::any_executor_ref>) + return aw.await_suspend(h, d, std::stop_token{}); + else + return aw.await_suspend(h); + } + }; + return adapter{std::forward(a), d}; + } + }; + + std::coroutine_handle h; + + launch_wrapper(std::coroutine_handle handle) noexcept + : h(handle) + { + } + + ~launch_wrapper() + { + if(h) + h.destroy(); + } + + launch_wrapper(launch_wrapper&& o) noexcept + : h(std::exchange(o.h, nullptr)) + { + } + + launch_wrapper(launch_wrapper const&) = delete; + launch_wrapper& operator=(launch_wrapper const&) = delete; + launch_wrapper& operator=(launch_wrapper&&) = delete; + }; + + io_context& ctx_; + 
capy::any_executor_ref dispatch_; + capy::any_executor_ref post_; + waiter* waiters_ = nullptr; + std::vector ports_; + + struct waiter + { + waiter* next; + std::coroutine_handle<> h; + worker_base* w; + }; + + class BOOST_COROSIO_DECL push_aw + { + tcp_server& self_; + worker_base& w_; + + public: + push_aw(tcp_server& self, worker_base& w) noexcept; + bool await_ready() const noexcept; + std::coroutine_handle<> await_suspend(std::coroutine_handle<> h) noexcept; + void await_resume() noexcept; + }; + + push_aw push(worker_base& w); + + void push_sync(worker_base& w) noexcept; + + class BOOST_COROSIO_DECL pop_aw + { + tcp_server& self_; + waiter wait_; + + public: + pop_aw(tcp_server& self) noexcept; + bool await_ready() const noexcept; + bool await_suspend(std::coroutine_handle<> h) noexcept; + system::result await_resume() noexcept; + }; + + pop_aw pop(); + + capy::task do_accept(acceptor& acc); + +protected: + class worker_base + { + worker_base* next = nullptr; + + friend class tcp_server; + friend class workers; + + public: + socket sock; + + virtual ~worker_base() = default; + virtual void run(launcher launch) = 0; + + protected: + worker_base(capy::execution_context& ctx) + : sock(ctx) + { + } + }; + + class workers + { + friend class tcp_server; + + std::vector> v_; + worker_base* idle_ = nullptr; + + void push(worker_base& w) noexcept + { + w.next = idle_; + idle_ = &w; + } + + worker_base* try_pop() noexcept + { + auto* w = idle_; + idle_ = w->next; + return w; + } + + public: + template + T& emplace(Args&&... args) + { + auto p = std::make_unique(std::forward(args)...); + auto* raw = p.get(); + v_.push_back(std::move(p)); + push(*raw); + return static_cast(*raw); + } + + void reserve(std::size_t n) { v_.reserve(n); } + std::size_t size() const noexcept { return v_.size(); } + }; + + workers wv_; + + class launcher + { + tcp_server* srv_; + worker_base* w_; + + friend class tcp_server; + + launcher(tcp_server& srv, worker_base& w) noexcept + : srv_(&srv) + , w_(&w) + { + } + + public: + ~launcher() + { + if(w_) + srv_->push_sync(*w_); + } + + launcher(launcher&& o) noexcept + : srv_(o.srv_) + , w_(std::exchange(o.w_, nullptr)) + { + } + launcher(launcher const&) = delete; + launcher& operator=(launcher const&) = delete; + launcher& operator=(launcher&&) = delete; + + template + void operator()(Executor const& ex, capy::task task) + { + if(! 
w_) + throw std::logic_error("launcher already invoked"); + + auto* w = std::exchange(w_, nullptr); + + // Return worker to pool if coroutine setup throws + struct guard_t { + tcp_server* srv; + worker_base* w; + ~guard_t() { if(w) srv->push_sync(*w); } + } guard{srv_, w}; + + auto wrapper = + [](Executor ex, tcp_server* self, capy::task t, worker_base* wp) + -> launch_wrapper + { + (void)ex; // Prevent executor destruction while coroutine runs + co_await std::move(t); + co_await self->push(*wp); + }(ex, srv_, std::move(task), w); + + ex.post(std::exchange(wrapper.h, nullptr)); // Release before post + guard.w = nullptr; // Success - dismiss guard + } + }; + +protected: + template + tcp_server( + io_context& ctx, + Ex const& ex) + : ctx_(ctx) + , dispatch_(ex) + , post_(ex) + { + } + +public: + system::error_code bind(endpoint ep); + + void start(); +}; + +} // corosio +} // boost + +#endif diff --git a/include/boost/corosio/timer.hpp b/include/boost/corosio/timer.hpp index dcc0688..8d11f13 100644 --- a/include/boost/corosio/timer.hpp +++ b/include/boost/corosio/timer.hpp @@ -15,9 +15,9 @@ #include #include #include -#include +#include #include -#include +#include #include #include @@ -65,23 +65,23 @@ class BOOST_COROSIO_DECL timer : public io_object return {ec_}; } - template + template auto await_suspend( std::coroutine_handle<> h, - Dispatcher const& d) -> std::coroutine_handle<> + Ex const& ex) -> std::coroutine_handle<> { - t_.get().wait(h, d, token_, &ec_); + t_.get().wait(h, ex, token_, &ec_); return std::noop_coroutine(); } - template + template auto await_suspend( std::coroutine_handle<> h, - Dispatcher const& d, + Ex const& ex, std::stop_token token) -> std::coroutine_handle<> { token_ = std::move(token); - t_.get().wait(h, d, token_, &ec_); + t_.get().wait(h, ex, token_, &ec_); return std::noop_coroutine(); } }; @@ -91,7 +91,7 @@ class BOOST_COROSIO_DECL timer : public io_object { virtual void wait( std::coroutine_handle<>, - capy::any_dispatcher, + capy::any_executor_ref, std::stop_token, system::error_code*) = 0; }; diff --git a/include/boost/corosio/wolfssl_stream.hpp b/include/boost/corosio/wolfssl_stream.hpp index 05db62d..140f5c9 100644 --- a/include/boost/corosio/wolfssl_stream.hpp +++ b/include/boost/corosio/wolfssl_stream.hpp @@ -13,7 +13,7 @@ #include #include #include -#include +#include #include #include @@ -70,23 +70,23 @@ class BOOST_COROSIO_DECL wolfssl_stream : public io_stream return {ec_}; } - template + template auto await_suspend( std::coroutine_handle<> h, - Dispatcher const& d) -> std::coroutine_handle<> + Ex const& ex) -> std::coroutine_handle<> { - stream_.get().handshake(h, d, type_, token_, &ec_); + stream_.get().handshake(h, ex, type_, token_, &ec_); return std::noop_coroutine(); } - template + template auto await_suspend( std::coroutine_handle<> h, - Dispatcher const& d, + Ex const& ex, std::stop_token token) -> std::coroutine_handle<> { token_ = std::move(token); - stream_.get().handshake(h, d, type_, token_, &ec_); + stream_.get().handshake(h, ex, type_, token_, &ec_); return std::noop_coroutine(); } }; @@ -153,7 +153,7 @@ class BOOST_COROSIO_DECL wolfssl_stream : public io_stream { virtual void handshake( std::coroutine_handle<>, - capy::any_dispatcher, + capy::any_executor_ref, int, std::stop_token, system::error_code*) = 0; diff --git a/include/boost/corosio/write.hpp b/include/boost/corosio/write.hpp index 7e9e36e..39b20ba 100644 --- a/include/boost/corosio/write.hpp +++ b/include/boost/corosio/write.hpp @@ -71,7 +71,7 @@ namespace corosio { 
error occurs), whereas `write_some()` may return after writing any amount of data. */ -template +template capy::task> write(io_stream& ios, ConstBufferSequence const& buffers) { diff --git a/src/corosio/src/detail/posix_op.hpp b/src/corosio/src/detail/posix_op.hpp index 32ff748..24add6f 100644 --- a/src/corosio/src/detail/posix_op.hpp +++ b/src/corosio/src/detail/posix_op.hpp @@ -12,8 +12,8 @@ #include #include -#include -#include +#include +#include #include #include #include @@ -40,7 +40,7 @@ namespace detail { /** Base class for POSIX async operations. This class is analogous to overlapped_op on Windows. - It stores the coroutine handle, dispatcher, and result + It stores the coroutine handle, executor, and result pointers needed to complete an async operation. */ struct posix_op : scheduler_op @@ -52,7 +52,7 @@ struct posix_op : scheduler_op }; capy::any_coro h; - capy::any_dispatcher d; + capy::any_executor_ref d; system::error_code* ec_out = nullptr; std::size_t* bytes_out = nullptr; @@ -98,7 +98,7 @@ struct posix_op : scheduler_op if (bytes_out) *bytes_out = bytes_transferred; - d(h).resume(); + d.dispatch(h).resume(); } // Returns true if this is a read operation (for EOF detection) @@ -293,7 +293,7 @@ struct posix_accept_op : posix_op *impl_out = nullptr; } - d(h).resume(); + d.dispatch(h).resume(); } }; diff --git a/src/corosio/src/detail/posix_resolver_service.hpp b/src/corosio/src/detail/posix_resolver_service.hpp index 0666f71..ac8f83b 100644 --- a/src/corosio/src/detail/posix_resolver_service.hpp +++ b/src/corosio/src/detail/posix_resolver_service.hpp @@ -13,8 +13,8 @@ #include #include #include -#include -#include +#include +#include #include #include @@ -54,7 +54,7 @@ class posix_resolver_impl void resolve( std::coroutine_handle<>, - capy::any_dispatcher, + capy::any_executor_ref, std::string_view /*host*/, std::string_view /*service*/, resolve_flags /*flags*/, diff --git a/src/corosio/src/detail/posix_signals.cpp b/src/corosio/src/detail/posix_signals.cpp new file mode 100644 index 0000000..38e49a7 --- /dev/null +++ b/src/corosio/src/detail/posix_signals.cpp @@ -0,0 +1,158 @@ +// +// Copyright (c) 2025 Vinnie Falco (vinnie dot falco at gmail dot com) +// +// Distributed under the Boost Software License, Version 1.0. 
(See accompanying +// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) +// +// Official repository: https://github.com/cppalliance/corosio +// + +#ifndef _WIN32 + +#include +#include + +namespace boost { +namespace corosio { + +signal_set:: +~signal_set() +{ + if (impl_) + impl_->release(); +} + +signal_set:: +signal_set(capy::execution_context& ctx) + : io_object(ctx) +{ + // Stub: signal_set not supported on this platform + impl_ = nullptr; +} + +signal_set:: +signal_set(capy::execution_context& ctx, int) + : io_object(ctx) +{ + impl_ = nullptr; + detail::throw_system_error( + make_error_code(system::errc::function_not_supported), + "signal_set: not supported on this platform"); +} + +signal_set:: +signal_set( + capy::execution_context& ctx, + int, + int) + : io_object(ctx) +{ + impl_ = nullptr; + detail::throw_system_error( + make_error_code(system::errc::function_not_supported), + "signal_set: not supported on this platform"); +} + +signal_set:: +signal_set( + capy::execution_context& ctx, + int, + int, + int) + : io_object(ctx) +{ + impl_ = nullptr; + detail::throw_system_error( + make_error_code(system::errc::function_not_supported), + "signal_set: not supported on this platform"); +} + +signal_set:: +signal_set(signal_set&& other) noexcept + : io_object(other.context()) +{ + impl_ = other.impl_; + other.impl_ = nullptr; +} + +signal_set& +signal_set:: +operator=(signal_set&& other) +{ + if (this != &other) + { + if (ctx_ != other.ctx_) + detail::throw_logic_error( + "signal_set::operator=: context mismatch"); + + if (impl_) + impl_->release(); + + impl_ = other.impl_; + other.impl_ = nullptr; + } + return *this; +} + +void +signal_set:: +add(int signal_number) +{ + system::error_code ec; + add(signal_number, ec); + if (ec) + detail::throw_system_error(ec, "signal_set::add"); +} + +void +signal_set:: +add(int, system::error_code& ec) +{ + ec = make_error_code(system::errc::function_not_supported); +} + +void +signal_set:: +remove(int signal_number) +{ + system::error_code ec; + remove(signal_number, ec); + if (ec) + detail::throw_system_error(ec, "signal_set::remove"); +} + +void +signal_set:: +remove(int, system::error_code& ec) +{ + ec = make_error_code(system::errc::function_not_supported); +} + +void +signal_set:: +clear() +{ + system::error_code ec; + clear(ec); + if (ec) + detail::throw_system_error(ec, "signal_set::clear"); +} + +void +signal_set:: +clear(system::error_code& ec) +{ + ec = make_error_code(system::errc::function_not_supported); +} + +void +signal_set:: +cancel() +{ + // No-op: nothing to cancel on stub implementation +} + +} // namespace corosio +} // namespace boost + +#endif // !_WIN32 diff --git a/src/corosio/src/detail/posix_sockets.hpp b/src/corosio/src/detail/posix_sockets.hpp index 6bc884f..21bcc28 100644 --- a/src/corosio/src/detail/posix_sockets.hpp +++ b/src/corosio/src/detail/posix_sockets.hpp @@ -13,8 +13,8 @@ #include #include #include -#include -#include +#include +#include #include #include @@ -60,14 +60,14 @@ class posix_socket_impl void connect( std::coroutine_handle<>, - capy::any_dispatcher, + capy::any_executor_ref, endpoint, std::stop_token, system::error_code*) override; void read_some( std::coroutine_handle<>, - capy::any_dispatcher, + capy::any_executor_ref, any_bufref&, std::stop_token, system::error_code*, @@ -75,7 +75,7 @@ class posix_socket_impl void write_some( std::coroutine_handle<>, - capy::any_dispatcher, + capy::any_executor_ref, any_bufref&, std::stop_token, system::error_code*, @@ -115,7 +115,7 @@ class 
posix_acceptor_impl void accept( std::coroutine_handle<>, - capy::any_dispatcher, + capy::any_executor_ref, std::stop_token, system::error_code*, io_object::io_object_impl**) override; @@ -233,7 +233,7 @@ inline void posix_socket_impl:: connect( std::coroutine_handle<> h, - capy::any_dispatcher d, + capy::any_executor_ref d, endpoint ep, std::stop_token token, system::error_code* ec) @@ -275,7 +275,7 @@ inline void posix_socket_impl:: read_some( std::coroutine_handle<> h, - capy::any_dispatcher d, + capy::any_executor_ref d, any_bufref& param, std::stop_token token, system::error_code* ec, @@ -335,7 +335,7 @@ inline void posix_socket_impl:: write_some( std::coroutine_handle<> h, - capy::any_dispatcher d, + capy::any_executor_ref d, any_bufref& param, std::stop_token token, system::error_code* ec, @@ -428,7 +428,7 @@ inline void posix_acceptor_impl:: accept( std::coroutine_handle<> h, - capy::any_dispatcher d, + capy::any_executor_ref d, std::stop_token token, system::error_code* ec, io_object::io_object_impl** impl_out) diff --git a/src/corosio/src/detail/timer_service.cpp b/src/corosio/src/detail/timer_service.cpp index 92bee4c..2623e0e 100644 --- a/src/corosio/src/detail/timer_service.cpp +++ b/src/corosio/src/detail/timer_service.cpp @@ -12,7 +12,7 @@ #include #include #include -#include +#include #include #include @@ -41,7 +41,7 @@ struct timer_impl // Wait operation state std::coroutine_handle<> h_; - capy::any_dispatcher d_; + capy::any_executor_ref d_; system::error_code* ec_out_ = nullptr; std::stop_token token_; bool waiting_ = false; @@ -55,7 +55,7 @@ struct timer_impl void wait( std::coroutine_handle<>, - capy::any_dispatcher, + capy::any_executor_ref, std::stop_token, system::error_code*) override; }; @@ -142,7 +142,7 @@ class timer_service_impl : public timer_service bool notify = false; bool was_waiting = false; std::coroutine_handle<> h; - capy::any_dispatcher d; + capy::any_executor_ref d; system::error_code* ec_out = nullptr; { @@ -186,8 +186,8 @@ class timer_service_impl : public timer_service { if (ec_out) *ec_out = make_error_code(capy::error::canceled); - auto resume_h = d(h); - // Resume the handle if dispatcher returned it for symmetric transfer + auto resume_h = d.dispatch(h); + // Resume the handle if executor returned it for symmetric transfer if (resume_h.address() == h.address()) resume_h.resume(); // Call on_work_finished AFTER the coroutine resumes @@ -207,7 +207,7 @@ class timer_service_impl : public timer_service void cancel_timer(timer_impl& impl) { std::coroutine_handle<> h; - capy::any_dispatcher d; + capy::any_executor_ref d; system::error_code* ec_out = nullptr; bool was_waiting = false; @@ -229,8 +229,8 @@ class timer_service_impl : public timer_service { if (ec_out) *ec_out = make_error_code(capy::error::canceled); - auto resume_h = d(h); - // Resume the handle if dispatcher returned it for symmetric transfer + auto resume_h = d.dispatch(h); + // Resume the handle if executor returned it for symmetric transfer if (resume_h.address() == h.address()) resume_h.resume(); // Call on_work_finished AFTER the coroutine resumes @@ -256,7 +256,7 @@ class timer_service_impl : public timer_service struct expired_entry { std::coroutine_handle<> h; - capy::any_dispatcher d; + capy::any_executor_ref d; system::error_code* ec_out; }; std::vector expired; @@ -285,9 +285,9 @@ class timer_service_impl : public timer_service { if (e.ec_out) *e.ec_out = {}; - auto resume_h = e.d(e.h); - // Resume the handle if dispatcher returned it for symmetric transfer - // (dispatcher 
returns our handle if we should resume, noop if it posted) + auto resume_h = e.d.dispatch(e.h); + // Resume the handle if executor returned it for symmetric transfer + // (executor returns our handle if we should resume, noop if it posted) if (resume_h.address() == e.h.address()) resume_h.resume(); // Call on_work_finished AFTER the coroutine resumes, so it has a @@ -378,7 +378,7 @@ void timer_impl:: wait( std::coroutine_handle<> h, - capy::any_dispatcher d, + capy::any_executor_ref d, std::stop_token token, system::error_code* ec) { @@ -391,8 +391,8 @@ wait( if (ec) *ec = {}; // Note: no work tracking needed - we dispatch synchronously - auto resume_h = d(h); - // Resume the handle if dispatcher returned it for symmetric transfer + auto resume_h = d.dispatch(h); + // Resume the handle if executor returned it for symmetric transfer if (resume_h.address() == h.address()) resume_h.resume(); return; diff --git a/src/corosio/src/detail/win/overlapped_op.hpp b/src/corosio/src/detail/win/overlapped_op.hpp index 9719bb6..6fac0d3 100644 --- a/src/corosio/src/detail/win/overlapped_op.hpp +++ b/src/corosio/src/detail/win/overlapped_op.hpp @@ -11,8 +11,8 @@ #define BOOST_COROSIO_DETAIL_WIN_OVERLAPPED_OP_HPP #include -#include -#include +#include +#include #include #include #include @@ -45,7 +45,7 @@ struct overlapped_op }; capy::any_coro h; - capy::any_dispatcher d; + capy::any_executor_ref d; system::error_code* ec_out = nullptr; std::size_t* bytes_out = nullptr; DWORD error = 0; @@ -97,7 +97,7 @@ struct overlapped_op if (bytes_out) *bytes_out = static_cast(bytes_transferred); - d(h).resume(); + d.dispatch(h).resume(); } // Returns true if this is a read operation (for EOF detection) diff --git a/src/corosio/src/detail/win/resolver_service.cpp b/src/corosio/src/detail/win/resolver_service.cpp index d28c76c..d7099b8 100644 --- a/src/corosio/src/detail/win/resolver_service.cpp +++ b/src/corosio/src/detail/win/resolver_service.cpp @@ -150,7 +150,7 @@ operator()() cancel_handle = nullptr; - d(h).resume(); + d.dispatch(h).resume(); } void @@ -190,7 +190,7 @@ void win_resolver_impl:: resolve( capy::any_coro h, - capy::any_dispatcher d, + capy::any_executor_ref d, std::string_view host, std::string_view service, resolve_flags flags, diff --git a/src/corosio/src/detail/win/resolver_service.hpp b/src/corosio/src/detail/win/resolver_service.hpp index 3b884d0..812c32f 100644 --- a/src/corosio/src/detail/win/resolver_service.hpp +++ b/src/corosio/src/detail/win/resolver_service.hpp @@ -21,8 +21,8 @@ #include #include -#include -#include +#include +#include #include #include @@ -92,7 +92,7 @@ class win_resolver_impl void resolve( std::coroutine_handle<>, - capy::any_dispatcher, + capy::any_executor_ref, std::string_view host, std::string_view service, resolve_flags flags, diff --git a/src/corosio/src/detail/win/signals.cpp b/src/corosio/src/detail/win/signals.cpp index 8dd4d6c..43f6a82 100644 --- a/src/corosio/src/detail/win/signals.cpp +++ b/src/corosio/src/detail/win/signals.cpp @@ -73,7 +73,7 @@ operator()() auto* service = svc; svc = nullptr; - d(h).resume(); + d.dispatch(h).resume(); // Balance the on_work_started() from start_wait if (service) @@ -113,7 +113,7 @@ void win_signal_impl:: wait( std::coroutine_handle<> h, - capy::any_dispatcher d, + capy::any_executor_ref d, std::stop_token token, system::error_code* ec, int* signal_out) @@ -131,7 +131,7 @@ wait( *ec = make_error_code(capy::error::canceled); if (signal_out) *signal_out = 0; - d(h).resume(); + d.dispatch(h).resume(); return; } @@ -397,7 +397,7 
@@ cancel_wait(win_signal_impl& impl) *op->ec_out = make_error_code(capy::error::canceled); if (op->signal_out) *op->signal_out = 0; - op->d(op->h).resume(); + op->d.dispatch(op->h).resume(); sched_.on_work_finished(); } } diff --git a/src/corosio/src/detail/win/signals.hpp b/src/corosio/src/detail/win/signals.hpp index e7a3fe7..6c4acde 100644 --- a/src/corosio/src/detail/win/signals.hpp +++ b/src/corosio/src/detail/win/signals.hpp @@ -12,8 +12,8 @@ #include #include -#include -#include +#include +#include #include #include #include @@ -45,7 +45,7 @@ enum { max_signal_number = 32 }; struct signal_op : scheduler_op { capy::any_coro h; - capy::any_dispatcher d; + capy::any_executor_ref d; system::error_code* ec_out = nullptr; int* signal_out = nullptr; int signal_number = 0; @@ -96,7 +96,7 @@ class win_signal_impl void wait( std::coroutine_handle<>, - capy::any_dispatcher, + capy::any_executor_ref, std::stop_token, system::error_code*, int*) override; diff --git a/src/corosio/src/detail/win/sockets.cpp b/src/corosio/src/detail/win/sockets.cpp index a21e503..91eb30f 100644 --- a/src/corosio/src/detail/win/sockets.cpp +++ b/src/corosio/src/detail/win/sockets.cpp @@ -104,7 +104,7 @@ operator()() *impl_out = nullptr; } - d(h).resume(); + d.dispatch(h).resume(); } void @@ -176,7 +176,7 @@ void win_socket_impl:: connect( capy::any_coro h, - capy::any_dispatcher d, + capy::any_executor_ref d, endpoint ep, std::stop_token token, system::error_code* ec) @@ -246,7 +246,7 @@ void win_socket_impl:: read_some( capy::any_coro h, - capy::any_dispatcher d, + capy::any_executor_ref d, any_bufref& param, std::stop_token token, system::error_code* ec, @@ -307,7 +307,7 @@ void win_socket_impl:: write_some( capy::any_coro h, - capy::any_dispatcher d, + capy::any_executor_ref d, any_bufref& param, std::stop_token token, system::error_code* ec, @@ -678,7 +678,7 @@ void win_acceptor_impl:: accept( capy::any_coro h, - capy::any_dispatcher d, + capy::any_executor_ref d, std::stop_token token, system::error_code* ec, io_object::io_object_impl** impl_out) diff --git a/src/corosio/src/detail/win/sockets.hpp b/src/corosio/src/detail/win/sockets.hpp index bb2f57a..7cbaa40 100644 --- a/src/corosio/src/detail/win/sockets.hpp +++ b/src/corosio/src/detail/win/sockets.hpp @@ -13,8 +13,8 @@ #include #include #include -#include -#include +#include +#include #include #include @@ -122,14 +122,14 @@ class win_socket_impl void connect( std::coroutine_handle<>, - capy::any_dispatcher, + capy::any_executor_ref, endpoint, std::stop_token, system::error_code*) override; void read_some( std::coroutine_handle<>, - capy::any_dispatcher, + capy::any_executor_ref, any_bufref&, std::stop_token, system::error_code*, @@ -137,7 +137,7 @@ class win_socket_impl void write_some( std::coroutine_handle<>, - capy::any_dispatcher, + capy::any_executor_ref, any_bufref&, std::stop_token, system::error_code*, @@ -172,7 +172,7 @@ class win_acceptor_impl void accept( std::coroutine_handle<>, - capy::any_dispatcher, + capy::any_executor_ref, std::stop_token, system::error_code*, io_object::io_object_impl**) override; diff --git a/src/corosio/src/tcp_server.cpp b/src/corosio/src/tcp_server.cpp new file mode 100644 index 0000000..c88849f --- /dev/null +++ b/src/corosio/src/tcp_server.cpp @@ -0,0 +1,149 @@ +// +// Copyright (c) 2026 Vinnie Falco (vinnie dot falco at gmail dot com) +// +// Distributed under the Boost Software License, Version 1.0. 
(See accompanying +// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) +// +// Official repository: https://github.com/cppalliance/corosio +// + +#include + +namespace boost { +namespace corosio { + +tcp_server::push_aw::push_aw( + tcp_server& self, + worker_base& w) noexcept + : self_(self) + , w_(w) +{ +} + +bool +tcp_server::push_aw::await_ready() const noexcept +{ + return false; +} + +std::coroutine_handle<> +tcp_server::push_aw::await_suspend( + std::coroutine_handle<> h) noexcept +{ + // Dispatch to server's executor before touching shared state + return self_.dispatch_.dispatch(h); +} + +void +tcp_server::push_aw::await_resume() noexcept +{ + if(self_.waiters_) + { + auto* wait = self_.waiters_; + self_.waiters_ = wait->next; + wait->w = &w_; + self_.post_.post(wait->h); + } + else + { + self_.wv_.push(w_); + } +} + +tcp_server::pop_aw::pop_aw(tcp_server& self) noexcept + : self_(self) + , wait_{} +{ +} + +bool +tcp_server::pop_aw::await_ready() const noexcept +{ + return self_.wv_.idle_ != nullptr; +} + +bool +tcp_server::pop_aw::await_suspend( + std::coroutine_handle<> h) noexcept +{ + wait_.h = h; + wait_.w = nullptr; + wait_.next = self_.waiters_; + self_.waiters_ = &wait_; + return true; +} + +system::result +tcp_server::pop_aw::await_resume() noexcept +{ + if(wait_.w) + return *wait_.w; + return *self_.wv_.try_pop(); +} + +tcp_server::push_aw +tcp_server::push(worker_base& w) +{ + return push_aw{*this, w}; +} + +void +tcp_server::push_sync(worker_base& w) noexcept +{ + if(waiters_) + { + auto* wait = waiters_; + waiters_ = wait->next; + wait->w = &w; + post_.post(wait->h); + } + else + { + wv_.push(w); + } +} + +tcp_server::pop_aw +tcp_server::pop() +{ + return pop_aw{*this}; +} + +capy::task +tcp_server::do_accept(acceptor& acc) +{ + auto st = co_await capy::get_stop_token(); + while(! 
st.stop_requested()) + { + auto rv = co_await pop(); + if(rv.has_error()) + continue; + auto& w = rv.value(); + auto ec = co_await acc.accept(w.sock); + if(ec) + { + co_await push(w); + continue; + } + w.run(launcher{*this, w}); + } +} + +system::error_code +tcp_server::bind(endpoint ep) +{ + ports_.emplace_back(ctx_); + // VFALCO this should return error_code + ports_.back().listen(ep); + return {}; +} + +void +tcp_server::start() +{ + for(auto& t : ports_) + capy::run_async(post_)(do_accept(t)); +} + +} // namespace corosio +} // namespace boost diff --git a/src/corosio/src/test/mocket.cpp b/src/corosio/src/test/mocket.cpp index 8cdf0c4..7ba2990 100644 --- a/src/corosio/src/test/mocket.cpp +++ b/src/corosio/src/test/mocket.cpp @@ -89,7 +89,7 @@ class mocket_impl void read_some( std::coroutine_handle<> h, - capy::any_dispatcher d, + capy::any_executor_ref d, any_bufref& buffers, std::stop_token token, system::error_code* ec, @@ -97,7 +97,7 @@ class mocket_impl void write_some( std::coroutine_handle<> h, - capy::any_dispatcher d, + capy::any_executor_ref d, any_bufref& buffers, std::stop_token token, system::error_code* ec, @@ -256,12 +256,13 @@ void mocket_impl:: read_some( std::coroutine_handle<> h, - capy::any_dispatcher d, + capy::any_executor_ref d, any_bufref& buffers, std::stop_token token, system::error_code* ec, std::size_t* bytes_transferred) { + (void)token; // Fuse check for m1 only if (check_fuse_) { @@ -270,7 +271,7 @@ read_some( { *ec = fail_ec; *bytes_transferred = 0; - d(capy::any_coro{h}).resume(); + d.dispatch(capy::any_coro{h}).resume(); return; } } @@ -285,7 +286,7 @@ read_some( { *ec = {}; *bytes_transferred = n; - d(capy::any_coro{h}).resume(); + d.dispatch(capy::any_coro{h}).resume(); return; } @@ -297,39 +298,24 @@ read_some( } // Pass through to the real socket - capy::run_async(d, token, - [h, d]() - { - d(capy::any_coro{h}).resume(); - }, - [this](std::exception_ptr ep) - { - fuse_.fail(ep); - })( - [this, bufs, count, ec, bytes_transferred]() -> capy::task<> - { - std::array mut_bufs; - for (std::size_t i = 0; i < count; ++i) - mut_bufs[i] = bufs[i]; - - auto [read_ec, read_n] = co_await sock_.read_some( - std::span(mut_bufs.data(), count)); - - *ec = read_ec; - *bytes_transferred = read_n; - }()); + // TODO: Temporarily disabled during API refactoring + // run_async requires Executor concept but any_executor_ref doesn't satisfy it + *ec = make_error_code(system::errc::not_supported); + *bytes_transferred = 0; + d.dispatch(capy::any_coro{h}).resume(); } void mocket_impl:: write_some( std::coroutine_handle<> h, - capy::any_dispatcher d, + capy::any_executor_ref d, any_bufref& buffers, std::stop_token token, system::error_code* ec, std::size_t* bytes_transferred) { + (void)token; // Fuse check for m1 only if (check_fuse_) { @@ -338,7 +324,7 @@ write_some( { *ec = fail_ec; *bytes_transferred = 0; - d(capy::any_coro{h}).resume(); + d.dispatch(capy::any_coro{h}).resume(); return; } } @@ -359,40 +345,23 @@ write_some( { *ec = capy::error::test_failure; *bytes_transferred = 0; - d(capy::any_coro{h}).resume(); + d.dispatch(capy::any_coro{h}).resume(); return; } // If all expected data was validated, report success *ec = {}; *bytes_transferred = total_size; - d(capy::any_coro{h}).resume(); + d.dispatch(capy::any_coro{h}).resume(); return; } // Pass through to the real socket - capy::run_async(d, token, - [h, d]() - { - d(capy::any_coro{h}).resume(); - }, - [this](std::exception_ptr ep) - { - fuse_.fail(ep); - })( - [this, bufs, count, ec, bytes_transferred]() -> 
capy::task<> - { - // Convert to const_buffer for write - std::array const_bufs; - for (std::size_t i = 0; i < count; ++i) - const_bufs[i] = capy::const_buffer(bufs[i].data(), bufs[i].size()); - - auto [write_ec, write_n] = co_await sock_.write_some( - std::span(const_bufs.data(), count)); - - *ec = write_ec; - *bytes_transferred = write_n; - }()); + // TODO: Temporarily disabled during API refactoring + // run_async requires Executor concept but any_executor_ref doesn't satisfy it + *ec = make_error_code(system::errc::not_supported); + *bytes_transferred = 0; + d.dispatch(capy::any_coro{h}).resume(); } //------------------------------------------------------------------------------ diff --git a/src/wolfssl/src/wolfssl_stream.cpp b/src/wolfssl/src/wolfssl_stream.cpp index 3b1a0bf..2aaae4d 100644 --- a/src/wolfssl/src/wolfssl_stream.cpp +++ b/src/wolfssl/src/wolfssl_stream.cpp @@ -238,7 +238,7 @@ struct wolfssl_stream_impl_ system::error_code* ec_out, std::size_t* bytes_out, std::coroutine_handle<> continuation, - capy::any_dispatcher d) + capy::any_executor_ref d) { system::error_code ec; std::size_t total_read = 0; @@ -325,8 +325,8 @@ struct wolfssl_stream_impl_ *ec_out = ec; *bytes_out = total_read; - // Resume the original caller via dispatcher - d(capy::any_coro{continuation}).resume(); + // Resume the original caller via executor + d.dispatch(capy::any_coro{continuation}).resume(); co_return; } @@ -343,7 +343,7 @@ struct wolfssl_stream_impl_ system::error_code* ec_out, std::size_t* bytes_out, std::coroutine_handle<> continuation, - capy::any_dispatcher d) + capy::any_executor_ref d) { system::error_code ec; std::size_t total_written = 0; @@ -436,8 +436,8 @@ struct wolfssl_stream_impl_ *ec_out = ec; *bytes_out = total_written; - // Resume the original caller via dispatcher - d(capy::any_coro{continuation}).resume(); + // Resume the original caller via executor + d.dispatch(capy::any_coro{continuation}).resume(); co_return; } @@ -452,7 +452,7 @@ struct wolfssl_stream_impl_ std::stop_token token, system::error_code* ec_out, std::coroutine_handle<> continuation, - capy::any_dispatcher d) + capy::any_executor_ref d) { system::error_code ec; @@ -565,8 +565,8 @@ struct wolfssl_stream_impl_ *ec_out = ec; - // Resume the original caller via dispatcher - d(capy::any_coro{continuation}).resume(); + // Resume the original caller via executor + d.dispatch(capy::any_coro{continuation}).resume(); co_return; } @@ -581,7 +581,7 @@ struct wolfssl_stream_impl_ void read_some( std::coroutine_handle<> h, - capy::any_dispatcher d, + capy::any_executor_ref d, any_bufref& param, std::stop_token token, system::error_code* ec, @@ -599,7 +599,7 @@ struct wolfssl_stream_impl_ void write_some( std::coroutine_handle<> h, - capy::any_dispatcher d, + capy::any_executor_ref d, any_bufref& param, std::stop_token token, system::error_code* ec, @@ -617,7 +617,7 @@ struct wolfssl_stream_impl_ void handshake( std::coroutine_handle<> h, - capy::any_dispatcher d, + capy::any_executor_ref d, int type, std::stop_token token, system::error_code* ec) override diff --git a/test/unit/Jamfile b/test/unit/Jamfile index 54b494c..0c5fd69 100644 --- a/test/unit/Jamfile +++ b/test/unit/Jamfile @@ -26,4 +26,5 @@ run io_context.cpp ; run io_result.cpp ; run read.cpp ; run socket.cpp ; +run tcp_server.cpp ; run write.cpp ; diff --git a/test/unit/consuming_buffers.cpp b/test/unit/consuming_buffers.cpp index 36dc1ae..6534f3e 100644 --- a/test/unit/consuming_buffers.cpp +++ b/test/unit/consuming_buffers.cpp @@ -43,7 +43,7 @@ struct 
consuming_buffers_test // Verify consuming_buffers models mutable_buffer_sequence static_assert( - capy::mutable_buffer_sequence>, + capy::MutableBufferSequence>, "consuming_buffers must model mutable_buffer_sequence"); // Verify it can be used with buffer_size @@ -61,7 +61,7 @@ struct consuming_buffers_test // Verify consuming_buffers models mutable_buffer_sequence for single buffer static_assert( - capy::mutable_buffer_sequence>, + capy::MutableBufferSequence>, "consuming_buffers must model mutable_buffer_sequence for single buffer"); std::size_t const size = capy::buffer_size(cb); @@ -117,7 +117,7 @@ struct consuming_buffers_test } // Final check - Buffer Sequence Concept - static_assert(capy::mutable_buffer_sequence, + static_assert(capy::MutableBufferSequence, "consuming_buffers must model mutable_buffer_sequence"); } diff --git a/test/unit/signal_set.cpp b/test/unit/signal_set.cpp index d037812..42fc1fd 100644 --- a/test/unit/signal_set.cpp +++ b/test/unit/signal_set.cpp @@ -10,6 +10,8 @@ // Test that header file is self-contained. #include +#ifdef _WIN32 + #include #include #include @@ -641,3 +643,5 @@ TEST_SUITE(signal_set_test, "boost.corosio.signal_set"); } // namespace corosio } // namespace boost + +#endif // _WIN32 diff --git a/test/unit/socket.cpp b/test/unit/socket.cpp index 1e56c16..a677b95 100644 --- a/test/unit/socket.cpp +++ b/test/unit/socket.cpp @@ -24,8 +24,8 @@ namespace corosio { // Verify socket satisfies stream concepts //------------------------------------------------ -static_assert(capy::read_stream); -static_assert(capy::write_stream); +static_assert(capy::ReadStream); +static_assert(capy::WriteStream); //------------------------------------------------ // Socket-specific tests diff --git a/test/unit/tcp_server.cpp b/test/unit/tcp_server.cpp new file mode 100644 index 0000000..d4e1492 --- /dev/null +++ b/test/unit/tcp_server.cpp @@ -0,0 +1,28 @@ +// +// Copyright (c) 2026 Vinnie Falco (vinnie dot falco at gmail dot com) +// +// Distributed under the Boost Software License, Version 1.0. (See accompanying +// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) +// +// Official repository: https://github.com/cppalliance/corosio +// + +// Test that header file is self-contained. +#include + +#include "test_suite.hpp" + +namespace boost { +namespace corosio { + +struct tcp_server_test +{ + void run() + { + } +}; + +TEST_SUITE(tcp_server_test, "boost.corosio.tcp_server"); + +} // namespace corosio +} // namespace boost
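Usage sketch, not part of this patch: the new `tcp_server` ships with an empty unit test and no example, so the following shows one way a concrete server might be assembled from the protected pieces in `tcp_server.hpp` (`worker_base`, `launcher`, `workers::emplace`, `bind()`, `start()`). The names `echo_server`, `echo_worker`, and `session` are invented for illustration, and the sketch assumes `io_context::get_executor()`/`run()`, a single-buffer form of `read_some()`/`write()`, and `capy::mutable_buffer`/`const_buffer` behaving in the usual Asio-like way.

// Hypothetical sketch built on tcp_server; names and session logic are illustrative only.
#include <boost/corosio/tcp_server.hpp>
#include <boost/corosio.hpp>            // assumed umbrella header

namespace corosio = boost::corosio;
namespace capy = boost::capy;           // assumed alias for Boost.Capy

class echo_server : public corosio::tcp_server
{
    struct echo_worker : worker_base
    {
        corosio::io_context& ioc_;

        explicit echo_worker(corosio::io_context& ioc)
            : worker_base(ioc)
            , ioc_(ioc)
        {
        }

        // Called by tcp_server::do_accept() once sock holds a connection.
        void run(launcher launch) override
        {
            // Hand the session coroutine to the launcher; tcp_server returns
            // the worker to its idle pool when the task completes.
            launch(ioc_.get_executor(), session());
        }

        capy::task<> session()
        {
            char buf[1024];
            for(;;)
            {
                // Assumes a single buffer models the buffer-sequence
                // parameter of read_some() and write().
                auto [ec, n] = co_await sock.read_some(
                    capy::mutable_buffer(buf, sizeof(buf)));
                if(ec || n == 0)
                    co_return;
                auto [wec, wn] = co_await corosio::write(
                    sock, capy::const_buffer(buf, n));
                if(wec)
                    co_return;
            }
        }
    };

public:
    template<class Ex>
    echo_server(corosio::io_context& ioc, Ex const& ex, std::size_t n)
        : tcp_server(ioc, ex)
    {
        wv_.reserve(n);
        for(std::size_t i = 0; i < n; ++i)
            wv_.emplace<echo_worker>(ioc);
    }
};

int main()
{
    corosio::io_context ioc;
    auto ex = ioc.get_executor();           // keep the executor alive;
                                            // tcp_server holds a reference to it
    echo_server srv(ioc, ex, 4);            // four pooled workers
    if(srv.bind(corosio::endpoint(8080)))   // bind() reports failure as error_code
        return 1;
    srv.start();
    ioc.run();                              // assumed io_context::run()
}

One design note the sketch highlights: because the protected constructor stores `capy::any_executor_ref` members, the executor passed to `tcp_server` must outlive the server, which is why `main()` keeps `ex` in a named variable rather than passing a temporary.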