diff --git a/doc/modules/ROOT/pages/guide/buffers.adoc b/doc/modules/ROOT/pages/guide/buffers.adoc index 046abc7..a62bbe1 100644 --- a/doc/modules/ROOT/pages/guide/buffers.adoc +++ b/doc/modules/ROOT/pages/guide/buffers.adoc @@ -1,298 +1,298 @@ -// -// Copyright (c) 2025 Vinnie Falco (vinnie.falco@gmail.com) -// -// Distributed under the Boost Software License, Version 1.0. (See accompanying -// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) -// -// Official repository: https://github.com/cppalliance/corosio -// - -= Buffer Sequences - -Corosio I/O operations work with buffer sequences from Boost.Capy. This page -explains how to use buffers effectively. - -NOTE: Code snippets assume: -[source,cpp] ----- -#include -namespace capy = boost::capy; ----- - -== Buffer Types - -=== mutable_buffer - -A writable region of memory: - -[source,cpp] ----- -char data[1024]; -capy::mutable_buffer buf(data, sizeof(data)); ----- - -=== const_buffer - -A read-only region of memory: - -[source,cpp] ----- -std::string msg = "Hello"; -capy::const_buffer buf(msg.data(), msg.size()); ----- - -== Creating Buffers - -=== From Raw Arrays - -[source,cpp] ----- -char data[1024]; -capy::mutable_buffer mbuf(data, sizeof(data)); - -const char* str = "Hello"; -capy::const_buffer cbuf(str, 5); ----- - -=== From std::string - -[source,cpp] ----- -std::string s = "Hello, World!"; - -// Writable (be careful with string invalidation) -capy::mutable_buffer mbuf(s.data(), s.size()); - -// Read-only -capy::const_buffer cbuf(s.data(), s.size()); ----- - -=== From std::vector - -[source,cpp] ----- -std::vector vec(1024); -capy::mutable_buffer buf(vec.data(), vec.size()); ----- - -== Buffer Sequences - -I/O operations accept sequences of buffers for scatter/gather I/O: - -=== Single Buffer - -A single buffer is a valid buffer sequence: - -[source,cpp] ----- -capy::mutable_buffer buf(data, size); -co_await sock.read_some(buf); // Works directly ----- - -=== Multiple Buffers - -Use arrays or vectors of buffers: - -[source,cpp] ----- -// Array of buffers -std::array bufs = { - capy::mutable_buffer(header, header_size), - capy::mutable_buffer(body, body_size) -}; -co_await sock.read_some(bufs); - -// Vector of buffers -std::vector send_bufs; -send_bufs.push_back(capy::const_buffer(header.data(), header.size())); -send_bufs.push_back(capy::const_buffer(body.data(), body.size())); -co_await sock.write_some(send_bufs); ----- - -== Buffer Sequence Concepts - -Corosio uses concepts from Capy: - -[source,cpp] ----- -// Readable buffers (for writing to sockets) -template -auto write_some(ConstBufferSequence const& buffers); - -// Writable buffers (for reading from sockets) -template -auto read_some(MutableBufferSequence const& buffers); ----- - -A type satisfies these concepts if it's iterable and yields buffer types. - -== Buffer Size - -Get the total size of a buffer sequence: - -[source,cpp] ----- -std::array bufs = {...}; -std::size_t total = capy::buffer_size(bufs); ----- - -== consuming_buffers - -The `consuming_buffers` wrapper tracks progress through a buffer sequence: - -[source,cpp] ----- -#include - -std::array bufs = { - capy::mutable_buffer(header, 16), - capy::mutable_buffer(body, 1024) -}; - -corosio::consuming_buffers consuming(bufs); - -// After reading 20 bytes: -auto [ec, n] = co_await sock.read_some(consuming); -consuming.consume(n); // Advance by bytes read - -// Now consuming represents the remaining unread portion ----- - -This is used internally by `read()` and `write()` but can be used directly. - -== any_bufref - -The `any_bufref` class type-erases buffer sequences: - -[source,cpp] ----- -#include - -void accept_any_buffer(corosio::any_bufref bufref) -{ - capy::mutable_buffer temp[8]; - std::size_t n = bufref.copy_to(temp, 8); - // Use temp[0..n-1] -} - -// Works with any buffer sequence -std::array bufs = {...}; -accept_any_buffer(corosio::any_bufref(bufs)); ----- - -This enables non-templated code to work with any buffer type. - -== Memory Safety - -=== Lifetime - -Buffers don't own memory. The underlying storage must outlive the I/O -operation: - -[source,cpp] ----- -// WRONG: buffer outlives string -capy::task bad_example(corosio::socket& sock) -{ - capy::const_buffer buf; - { - std::string temp = "Hello"; - buf = capy::const_buffer(temp.data(), temp.size()); - } // temp destroyed here! - - co_await sock.write_some(buf); // Undefined behavior -} - -// CORRECT: keep storage alive -capy::task good_example(corosio::socket& sock) -{ - std::string msg = "Hello"; - co_await sock.write_some( - capy::const_buffer(msg.data(), msg.size())); -} ----- - -=== String Invalidation - -Be careful when using `std::string` as buffer storage: - -[source,cpp] ----- -std::string s = "Hello"; -capy::mutable_buffer buf(s.data(), s.size()); - -s += " World"; // May reallocate, invalidating buf! - -// Use buf here: UNDEFINED BEHAVIOR ----- - -Either: - -* Reserve sufficient capacity upfront -* Don't modify the string while the buffer is in use -* Create a new buffer after modification - -== Scatter/Gather I/O - -Multiple buffers can be used for efficient scatter/gather operations: - -=== Reading into Multiple Buffers (Scatter) - -[source,cpp] ----- -struct message_header { ... }; -char body[1024]; - -std::array read_bufs = { - capy::mutable_buffer(&header, sizeof(header)), - capy::mutable_buffer(body, sizeof(body)) -}; - -auto [ec, n] = co_await sock.read_some(read_bufs); -// Data fills header first, then body ----- - -=== Writing from Multiple Buffers (Gather) - -[source,cpp] ----- -std::string header = "HTTP/1.1 200 OK\r\n\r\n"; -std::string body = "Hello, World!"; - -std::array write_bufs = { - capy::const_buffer(header.data(), header.size()), - capy::const_buffer(body.data(), body.size()) -}; - -auto [ec, n] = co_await sock.write_some(write_bufs); -// Sends header followed by body in a single operation ----- - -== Example: Reading a Fixed-Size Header - -[source,cpp] ----- -struct packet_header -{ - std::uint32_t magic; - std::uint32_t length; -}; - -capy::task read_header(corosio::io_stream& stream) -{ - packet_header header; - auto [ec, n] = co_await corosio::read( - stream, capy::mutable_buffer(&header, sizeof(header))); - - if (ec) - throw boost::system::system_error(ec); - - return header; -} ----- - -== Next Steps - -* xref:composed-operations.adoc[Composed Operations] — Using buffers with read/write -* xref:sockets.adoc[Sockets] — Socket I/O operations -* xref:../tutorials/echo-server.adoc[Echo Server Tutorial] — Practical usage +// +// Copyright (c) 2025 Vinnie Falco (vinnie.falco@gmail.com) +// +// Distributed under the Boost Software License, Version 1.0. (See accompanying +// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) +// +// Official repository: https://github.com/cppalliance/corosio +// + += Buffer Sequences + +Corosio I/O operations work with buffer sequences from Boost.Capy. This page +explains how to use buffers effectively. + +NOTE: Code snippets assume: +[source,cpp] +---- +#include +namespace capy = boost::capy; +---- + +== Buffer Types + +=== mutable_buffer + +A writable region of memory: + +[source,cpp] +---- +char data[1024]; +capy::mutable_buffer buf(data, sizeof(data)); +---- + +=== const_buffer + +A read-only region of memory: + +[source,cpp] +---- +std::string msg = "Hello"; +capy::const_buffer buf(msg.data(), msg.size()); +---- + +== Creating Buffers + +=== From Raw Arrays + +[source,cpp] +---- +char data[1024]; +capy::mutable_buffer mbuf(data, sizeof(data)); + +const char* str = "Hello"; +capy::const_buffer cbuf(str, 5); +---- + +=== From std::string + +[source,cpp] +---- +std::string s = "Hello, World!"; + +// Writable (be careful with string invalidation) +capy::mutable_buffer mbuf(s.data(), s.size()); + +// Read-only +capy::const_buffer cbuf(s.data(), s.size()); +---- + +=== From std::vector + +[source,cpp] +---- +std::vector vec(1024); +capy::mutable_buffer buf(vec.data(), vec.size()); +---- + +== Buffer Sequences + +I/O operations accept sequences of buffers for scatter/gather I/O: + +=== Single Buffer + +A single buffer is a valid buffer sequence: + +[source,cpp] +---- +capy::mutable_buffer buf(data, size); +co_await sock.read_some(buf); // Works directly +---- + +=== Multiple Buffers + +Use arrays or vectors of buffers: + +[source,cpp] +---- +// Array of buffers +std::array bufs = { + capy::mutable_buffer(header, header_size), + capy::mutable_buffer(body, body_size) +}; +co_await sock.read_some(bufs); + +// Vector of buffers +std::vector send_bufs; +send_bufs.push_back(capy::const_buffer(header.data(), header.size())); +send_bufs.push_back(capy::const_buffer(body.data(), body.size())); +co_await sock.write_some(send_bufs); +---- + +== Buffer Sequence Concepts + +Corosio uses concepts from Capy: + +[source,cpp] +---- +// Readable buffers (for writing to sockets) +template +auto write_some(ConstBufferSequence const& buffers); + +// Writable buffers (for reading from sockets) +template +auto read_some(MutableBufferSequence const& buffers); +---- + +A type satisfies these concepts if it's iterable and yields buffer types. + +== Buffer Size + +Get the total size of a buffer sequence: + +[source,cpp] +---- +std::array bufs = {...}; +std::size_t total = capy::buffer_size(bufs); +---- + +== consuming_buffers + +The `consuming_buffers` wrapper tracks progress through a buffer sequence: + +[source,cpp] +---- +#include + +std::array bufs = { + capy::mutable_buffer(header, 16), + capy::mutable_buffer(body, 1024) +}; + +corosio::consuming_buffers consuming(bufs); + +// After reading 20 bytes: +auto [ec, n] = co_await sock.read_some(consuming); +consuming.consume(n); // Advance by bytes read + +// Now consuming represents the remaining unread portion +---- + +This is used internally by `read()` and `write()` but can be used directly. + +== buffer_param + +The `buffer_param` class type-erases buffer sequences: + +[source,cpp] +---- +#include + +void accept_any_buffer(capy::buffer_param buffers) +{ + capy::mutable_buffer temp[8]; + std::size_t n = buffers.copy_to(temp, 8); + // Use temp[0..n-1] +} + +// Works with any buffer sequence (implicit conversion) +std::array bufs = {...}; +accept_any_buffer(bufs); +---- + +This enables non-templated code to work with any buffer type. + +== Memory Safety + +=== Lifetime + +Buffers don't own memory. The underlying storage must outlive the I/O +operation: + +[source,cpp] +---- +// WRONG: buffer outlives string +capy::task bad_example(corosio::socket& sock) +{ + capy::const_buffer buf; + { + std::string temp = "Hello"; + buf = capy::const_buffer(temp.data(), temp.size()); + } // temp destroyed here! + + co_await sock.write_some(buf); // Undefined behavior +} + +// CORRECT: keep storage alive +capy::task good_example(corosio::socket& sock) +{ + std::string msg = "Hello"; + co_await sock.write_some( + capy::const_buffer(msg.data(), msg.size())); +} +---- + +=== String Invalidation + +Be careful when using `std::string` as buffer storage: + +[source,cpp] +---- +std::string s = "Hello"; +capy::mutable_buffer buf(s.data(), s.size()); + +s += " World"; // May reallocate, invalidating buf! + +// Use buf here: UNDEFINED BEHAVIOR +---- + +Either: + +* Reserve sufficient capacity upfront +* Don't modify the string while the buffer is in use +* Create a new buffer after modification + +== Scatter/Gather I/O + +Multiple buffers can be used for efficient scatter/gather operations: + +=== Reading into Multiple Buffers (Scatter) + +[source,cpp] +---- +struct message_header { ... }; +char body[1024]; + +std::array read_bufs = { + capy::mutable_buffer(&header, sizeof(header)), + capy::mutable_buffer(body, sizeof(body)) +}; + +auto [ec, n] = co_await sock.read_some(read_bufs); +// Data fills header first, then body +---- + +=== Writing from Multiple Buffers (Gather) + +[source,cpp] +---- +std::string header = "HTTP/1.1 200 OK\r\n\r\n"; +std::string body = "Hello, World!"; + +std::array write_bufs = { + capy::const_buffer(header.data(), header.size()), + capy::const_buffer(body.data(), body.size()) +}; + +auto [ec, n] = co_await sock.write_some(write_bufs); +// Sends header followed by body in a single operation +---- + +== Example: Reading a Fixed-Size Header + +[source,cpp] +---- +struct packet_header +{ + std::uint32_t magic; + std::uint32_t length; +}; + +capy::task read_header(corosio::io_stream& stream) +{ + packet_header header; + auto [ec, n] = co_await corosio::read( + stream, capy::mutable_buffer(&header, sizeof(header))); + + if (ec) + throw boost::system::system_error(ec); + + return header; +} +---- + +== Next Steps + +* xref:composed-operations.adoc[Composed Operations] — Using buffers with read/write +* xref:sockets.adoc[Sockets] — Socket I/O operations +* xref:../tutorials/echo-server.adoc[Echo Server Tutorial] — Practical usage diff --git a/doc/modules/ROOT/pages/reference/design-rationale.adoc b/doc/modules/ROOT/pages/reference/design-rationale.adoc index bc1885c..dc8c389 100644 --- a/doc/modules/ROOT/pages/reference/design-rationale.adoc +++ b/doc/modules/ROOT/pages/reference/design-rationale.adoc @@ -1,290 +1,290 @@ -// -// Copyright (c) 2025 Vinnie Falco (vinnie.falco@gmail.com) -// -// Distributed under the Boost Software License, Version 1.0. (See accompanying -// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) -// -// Official repository: https://github.com/cppalliance/corosio -// - -= Design Rationale - -This page explains the key design decisions in Corosio and the trade-offs -considered. - -== Coroutine-First Design - -=== Decision - -Every I/O operation returns an awaitable. There is no callback-based API. - -=== Rationale - -* **Simplicity**: One interface, not two parallel APIs -* **Optimal codegen**: No compatibility layer between callbacks and coroutines -* **Natural error handling**: Structured bindings and exceptions work directly -* **Composability**: Awaitables compose with standard coroutine patterns - -=== Trade-off - -Users without C++20 coroutine support cannot use the library. This is -intentional—Corosio targets modern C++ exclusively. - -== Affine Awaitable Protocol - -=== Decision - -Executor affinity propagates through `await_suspend` parameters rather than -thread-local storage or coroutine promise members. - -=== Rationale - -* **Explicit data flow**: The dispatcher visibly flows through the code -* **No hidden state**: No surprises from thread-local or global state -* **Compatibility**: Works with any coroutine framework that calls `await_suspend` -* **Efficient**: Symmetric transfer works automatically when appropriate - -=== Trade-off - -Implementing affine awaitables requires additional `await_suspend` overloads. -The complexity is contained in the library; users just `co_await`. - -== io_result Type - -=== Decision - -Operations return `io_result` which combines an error code with optional -values and supports both structured bindings and exceptions. - -=== Rationale - -* **Flexibility**: Users choose error handling style per-callsite -* **Zero overhead**: No exception overhead when using structured bindings -* **No information loss**: Byte count available even on error -* **Clean syntax**: `auto [ec, n] = co_await ...` is concise - -=== Trade-off - -The `.value()` method name might conflict with users' expectations from -`std::optional` (which throws on empty). Here it throws on error, which is -semantically similar but contextually different. - -== Type-Erased Dispatchers - -=== Decision - -Socket implementations use `capy::any_dispatcher` internally rather than -templating on the executor type. - -=== Rationale - -* **Binary size**: Only one implementation per I/O object -* **Compile time**: No template instantiation explosion -* **Virtual interface**: Enables platform-specific implementations - -=== Trade-off - -Small runtime overhead from type erasure. For I/O-bound code, this is -negligible compared to actual I/O latency (microseconds vs. nanoseconds). - -== Inheritance Hierarchy - -=== Decision - -`socket` inherits from `io_stream` which inherits from `io_object`. - ----- -io_object - ├── acceptor - ├── resolver - ├── timer - ├── signal_set - └── io_stream - ├── socket - └── wolfssl_stream ----- - -=== Rationale - -* **Polymorphism**: Code accepting `io_stream&` works with any stream type -* **Code reuse**: `read()` and `write()` free functions work with all streams -* **Future extensibility**: New stream types fit naturally - -=== Trade-off - -Virtual function overhead for `read_some()`/`write_some()`. Acceptable -because I/O operations are inherently expensive. - -== Buffer Type Erasure (any_bufref) - -=== Decision - -Buffer sequences are type-erased at the I/O boundary using `any_bufref`. - -=== Rationale - -* **Non-template implementations**: Scheduler and I/O objects aren't templates -* **ABI stability**: Buffer types can change without recompilation -* **Reduced binary size**: Single implementation handles all buffer types - -=== Trade-off - -One level of indirection when copying buffer descriptors. The copy is into -a small fixed-size array, so overhead is minimal. - -== consuming_buffers for Composed Operations - -=== Decision - -The `read()` and `write()` composed operations use `consuming_buffers` to -track progress through buffer sequences. - -=== Rationale - -* **Efficiency**: Avoids copying buffer sequences -* **Correctness**: Handles partial reads/writes across multiple buffers -* **Reusability**: Can be used directly by advanced users - -=== Trade-off - -More complex than repeatedly constructing sub-buffers, but more efficient -for multi-buffer sequences. - -== Separate open() and connect() - -=== Decision - -Sockets require explicit `open()` before `connect()`. - -=== Rationale - -* **Explicit resource management**: Clear when system resources are allocated -* **Error handling**: Open errors distinct from connect errors -* **Consistency**: Matches acceptor pattern (explicit `listen()`) - -=== Trade-off - -Two calls instead of one. A `connect(endpoint)` overload that opens -automatically could be added if users prefer. - -== Move-Only I/O Objects - -=== Decision - -Sockets, timers, and other I/O objects are move-only. - -=== Rationale - -* **Ownership semantics**: I/O objects own system resources -* **No accidental copies**: Prevents resource leaks -* **Efficient transfer**: Moving is cheap (pointer swap) - -=== Trade-off - -Cannot store in containers that require copyability. Use `std::unique_ptr` -or move-aware containers. - -== Context-Locked Move Assignment - -=== Decision - -Moving an I/O object to another with a different execution context throws. - -=== Rationale - -* **Safety**: Prevents dangling references to old context's services -* **Simplicity**: No need for detach/reattach mechanism - -=== Trade-off - -Cannot move objects between contexts. Create new objects instead. - -== Platform-Specific Backends - -=== Decision - -Windows uses IOCP directly. Linux will use io_uring. macOS will use kqueue. - -=== Rationale - -* **Performance**: Native backends are fastest -* **Scalability**: Platform-optimized for thousands of connections -* **Features**: Full access to platform capabilities - -=== Trade-off - -More implementation work per platform. Epoll fallback could be added for -broader Linux compatibility. - -== WolfSSL for TLS - -=== Decision - -TLS is provided through WolfSSL rather than OpenSSL. - -=== Rationale - -* **Small footprint**: WolfSSL is more compact -* **Clean API**: Modern C++ friendly -* **Licensing**: Flexible licensing options - -=== Trade-off - -OpenSSL is more widely deployed. Users who need OpenSSL can create their -own stream wrapper following the `io_stream` interface. - -== No UDP (Yet) - -=== Decision - -Only TCP is currently supported. - -=== Rationale - -* **Focus**: TCP covers most use cases -* **Complexity**: UDP requires different abstractions (datagrams vs. streams) -* **Priority**: Get TCP right first - -=== Trade-off - -Users needing UDP must use other libraries. UDP support is planned. - -== Single-Header Include - -=== Decision - -`` includes core functionality but not everything. - -=== Rationale - -* **Convenience**: Easy to get started -* **Control**: Advanced headers included explicitly -* **Compile time**: Full include not excessive - -The main header includes: - -* io_context -* socket -* endpoint -* resolver -* read/write - -Not included (explicit include required): - -* acceptor -* timer -* signal_set -* wolfssl_stream -* test/mocket - -== Summary - -Corosio's design prioritizes: - -1. **Simplicity**: One way to do things, not two -2. **Performance**: Zero-overhead abstractions where possible -3. **Safety**: Ownership semantics prevent resource leaks -4. **Composability**: Works with standard C++ patterns -5. **Extensibility**: Clean hierarchy for new types - -Trade-offs generally favor correctness and clarity over maximum flexibility. +// +// Copyright (c) 2025 Vinnie Falco (vinnie.falco@gmail.com) +// +// Distributed under the Boost Software License, Version 1.0. (See accompanying +// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) +// +// Official repository: https://github.com/cppalliance/corosio +// + += Design Rationale + +This page explains the key design decisions in Corosio and the trade-offs +considered. + +== Coroutine-First Design + +=== Decision + +Every I/O operation returns an awaitable. There is no callback-based API. + +=== Rationale + +* **Simplicity**: One interface, not two parallel APIs +* **Optimal codegen**: No compatibility layer between callbacks and coroutines +* **Natural error handling**: Structured bindings and exceptions work directly +* **Composability**: Awaitables compose with standard coroutine patterns + +=== Trade-off + +Users without C++20 coroutine support cannot use the library. This is +intentional—Corosio targets modern C++ exclusively. + +== Affine Awaitable Protocol + +=== Decision + +Executor affinity propagates through `await_suspend` parameters rather than +thread-local storage or coroutine promise members. + +=== Rationale + +* **Explicit data flow**: The dispatcher visibly flows through the code +* **No hidden state**: No surprises from thread-local or global state +* **Compatibility**: Works with any coroutine framework that calls `await_suspend` +* **Efficient**: Symmetric transfer works automatically when appropriate + +=== Trade-off + +Implementing affine awaitables requires additional `await_suspend` overloads. +The complexity is contained in the library; users just `co_await`. + +== io_result Type + +=== Decision + +Operations return `io_result` which combines an error code with optional +values and supports both structured bindings and exceptions. + +=== Rationale + +* **Flexibility**: Users choose error handling style per-callsite +* **Zero overhead**: No exception overhead when using structured bindings +* **No information loss**: Byte count available even on error +* **Clean syntax**: `auto [ec, n] = co_await ...` is concise + +=== Trade-off + +The `.value()` method name might conflict with users' expectations from +`std::optional` (which throws on empty). Here it throws on error, which is +semantically similar but contextually different. + +== Type-Erased Dispatchers + +=== Decision + +Socket implementations use `capy::any_dispatcher` internally rather than +templating on the executor type. + +=== Rationale + +* **Binary size**: Only one implementation per I/O object +* **Compile time**: No template instantiation explosion +* **Virtual interface**: Enables platform-specific implementations + +=== Trade-off + +Small runtime overhead from type erasure. For I/O-bound code, this is +negligible compared to actual I/O latency (microseconds vs. nanoseconds). + +== Inheritance Hierarchy + +=== Decision + +`socket` inherits from `io_stream` which inherits from `io_object`. + +---- +io_object + ├── acceptor + ├── resolver + ├── timer + ├── signal_set + └── io_stream + ├── socket + └── wolfssl_stream +---- + +=== Rationale + +* **Polymorphism**: Code accepting `io_stream&` works with any stream type +* **Code reuse**: `read()` and `write()` free functions work with all streams +* **Future extensibility**: New stream types fit naturally + +=== Trade-off + +Virtual function overhead for `read_some()`/`write_some()`. Acceptable +because I/O operations are inherently expensive. + +== Buffer Type Erasure (buffer_param) + +=== Decision + +Buffer sequences are type-erased at the I/O boundary using `buffer_param`. + +=== Rationale + +* **Non-template implementations**: Scheduler and I/O objects aren't templates +* **ABI stability**: Buffer types can change without recompilation +* **Reduced binary size**: Single implementation handles all buffer types + +=== Trade-off + +One level of indirection when copying buffer descriptors. The copy is into +a small fixed-size array, so overhead is minimal. + +== consuming_buffers for Composed Operations + +=== Decision + +The `read()` and `write()` composed operations use `consuming_buffers` to +track progress through buffer sequences. + +=== Rationale + +* **Efficiency**: Avoids copying buffer sequences +* **Correctness**: Handles partial reads/writes across multiple buffers +* **Reusability**: Can be used directly by advanced users + +=== Trade-off + +More complex than repeatedly constructing sub-buffers, but more efficient +for multi-buffer sequences. + +== Separate open() and connect() + +=== Decision + +Sockets require explicit `open()` before `connect()`. + +=== Rationale + +* **Explicit resource management**: Clear when system resources are allocated +* **Error handling**: Open errors distinct from connect errors +* **Consistency**: Matches acceptor pattern (explicit `listen()`) + +=== Trade-off + +Two calls instead of one. A `connect(endpoint)` overload that opens +automatically could be added if users prefer. + +== Move-Only I/O Objects + +=== Decision + +Sockets, timers, and other I/O objects are move-only. + +=== Rationale + +* **Ownership semantics**: I/O objects own system resources +* **No accidental copies**: Prevents resource leaks +* **Efficient transfer**: Moving is cheap (pointer swap) + +=== Trade-off + +Cannot store in containers that require copyability. Use `std::unique_ptr` +or move-aware containers. + +== Context-Locked Move Assignment + +=== Decision + +Moving an I/O object to another with a different execution context throws. + +=== Rationale + +* **Safety**: Prevents dangling references to old context's services +* **Simplicity**: No need for detach/reattach mechanism + +=== Trade-off + +Cannot move objects between contexts. Create new objects instead. + +== Platform-Specific Backends + +=== Decision + +Windows uses IOCP directly. Linux will use io_uring. macOS will use kqueue. + +=== Rationale + +* **Performance**: Native backends are fastest +* **Scalability**: Platform-optimized for thousands of connections +* **Features**: Full access to platform capabilities + +=== Trade-off + +More implementation work per platform. Epoll fallback could be added for +broader Linux compatibility. + +== WolfSSL for TLS + +=== Decision + +TLS is provided through WolfSSL rather than OpenSSL. + +=== Rationale + +* **Small footprint**: WolfSSL is more compact +* **Clean API**: Modern C++ friendly +* **Licensing**: Flexible licensing options + +=== Trade-off + +OpenSSL is more widely deployed. Users who need OpenSSL can create their +own stream wrapper following the `io_stream` interface. + +== No UDP (Yet) + +=== Decision + +Only TCP is currently supported. + +=== Rationale + +* **Focus**: TCP covers most use cases +* **Complexity**: UDP requires different abstractions (datagrams vs. streams) +* **Priority**: Get TCP right first + +=== Trade-off + +Users needing UDP must use other libraries. UDP support is planned. + +== Single-Header Include + +=== Decision + +`` includes core functionality but not everything. + +=== Rationale + +* **Convenience**: Easy to get started +* **Control**: Advanced headers included explicitly +* **Compile time**: Full include not excessive + +The main header includes: + +* io_context +* socket +* endpoint +* resolver +* read/write + +Not included (explicit include required): + +* acceptor +* timer +* signal_set +* wolfssl_stream +* test/mocket + +== Summary + +Corosio's design prioritizes: + +1. **Simplicity**: One way to do things, not two +2. **Performance**: Zero-overhead abstractions where possible +3. **Safety**: Ownership semantics prevent resource leaks +4. **Composability**: Works with standard C++ patterns +5. **Extensibility**: Clean hierarchy for new types + +Trade-offs generally favor correctness and clarity over maximum flexibility. diff --git a/doc/modules/ROOT/pages/reference/glossary.adoc b/doc/modules/ROOT/pages/reference/glossary.adoc index ad5b191..b136272 100644 --- a/doc/modules/ROOT/pages/reference/glossary.adoc +++ b/doc/modules/ROOT/pages/reference/glossary.adoc @@ -1,277 +1,277 @@ -// -// Copyright (c) 2025 Vinnie Falco (vinnie.falco@gmail.com) -// -// Distributed under the Boost Software License, Version 1.0. (See accompanying -// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) -// -// Official repository: https://github.com/cppalliance/corosio -// - -= Glossary - -This glossary defines terms used throughout the Corosio documentation. - -== A - -Acceptor:: -An I/O object that listens for and accepts incoming TCP connections. See -`corosio::acceptor` and xref:../guide/acceptor.adoc[Acceptors Guide]. - -Affine Awaitable:: -An awaitable type that implements the affine protocol, receiving a dispatcher -parameter in `await_suspend` to ensure correct executor affinity. - -Affinity:: -The binding of a coroutine to a specific executor. A coroutine with affinity -to executor `ex` will have all its resumptions dispatched through `ex`. - -any_bufref:: -A type-erased reference to a buffer sequence, allowing non-template code to -work with any buffer type. - -any_dispatcher:: -A type-erased wrapper for dispatchers, enabling runtime polymorphism for -executor types. - -Awaitable:: -A type that can be used with `co_await`. Must provide `await_ready()`, -`await_suspend()`, and `await_resume()` methods. - -== B - -Buffer:: -A contiguous region of memory. See `capy::mutable_buffer` (writable) and -`capy::const_buffer` (read-only). - -Buffer Sequence:: -A collection of buffers that can be iterated. Enables scatter/gather I/O -with a single operation. - -== C - -Cancellation:: -The ability to abort an in-progress asynchronous operation. Operations -complete with an error code indicating cancellation. - -Completion:: -The event when an asynchronous operation finishes, either successfully or -with an error. - -Composed Operation:: -An operation built from multiple primitive operations. For example, -`corosio::read()` repeatedly calls `read_some()` until the buffer is full. - -Concurrency Hint:: -A value passed to `io_context` indicating how many threads may call `run()`. -Affects internal synchronization strategy. - -Continuation:: -The code that runs after an asynchronous operation completes. In coroutines, -this is the code following `co_await`. - -Coroutine:: -A function that can suspend and resume execution. Uses `co_await`, `co_yield`, -or `co_return` keywords. - -Coroutine Handle:: -A low-level handle to a suspended coroutine, represented by -`std::coroutine_handle<>`. - -== D - -Dispatcher:: -An object that can dispatch coroutine handles for execution. Satisfies the -`capy::dispatcher` concept. - -== E - -Endpoint:: -A combination of IP address and port number identifying a network destination. -See `corosio::endpoint`. - -EOF (End of File/Stream):: -A condition indicating no more data is available. Signaled by -`capy::error::eof`. - -Error Category:: -A grouping of related error codes. Boost.System uses categories to -distinguish different error sources. - -Error Code:: -A lightweight error indicator. See `boost::system::error_code`. - -Error Condition:: -A portable error classification. Enables comparing errors across categories. - -Execution Context:: -An environment where work runs. Provides service management and an executor. -See `capy::execution_context`. - -Executor:: -An object that can dispatch work for execution. Satisfies the `capy::executor` -concept. - -== H - -Handle:: -A reference to a system resource (socket, file, etc.) managed by the -operating system. - -Handler:: -A callback function or coroutine handle that runs when an operation completes. - -== I - -I/O Context:: -The main event loop in Corosio. Processes asynchronous operations and -dispatches completions. See `corosio::io_context`. - -I/O Object:: -A class representing an I/O resource (socket, timer, etc.). Base class is -`corosio::io_object`. - -I/O Stream:: -An I/O object that supports reading and writing data. Base class is -`corosio::io_stream`. - -io_result:: -The result type for Corosio operations, containing an error code and optional -values. Supports structured bindings. - -IOCP (I/O Completion Ports):: -Windows kernel mechanism for scalable asynchronous I/O. Used by Corosio on -Windows. - -== L - -Lazy:: -A coroutine or operation that doesn't start until explicitly triggered. -`capy::task` is lazy—it starts when awaited. - -== M - -Mocket:: -A mock socket for testing. See `corosio::test::mocket`. - -Move-Only:: -A type that can be moved but not copied. Sockets and other I/O objects are -move-only. - -== O - -Operation:: -An asynchronous action that completes in the future. Returns an awaitable -that can be `co_await`ed. - -Outstanding Work:: -Work tracked by an I/O context. The context's `run()` continues while -outstanding work exists. - -== P - -Platform Backend:: -The operating system-specific implementation (IOCP, io_uring, kqueue). - -Poll:: -Processing ready work without blocking. See `io_context::poll()`. - -Post:: -Queuing work for later execution. See `executor_type::post()`. - -Primitive Operation:: -A basic I/O operation like `read_some()` or `write_some()` that may transfer -any amount of data. - -== R - -Resolver:: -An I/O object that performs DNS lookups. See `corosio::resolver`. - -Resume:: -Continuing execution of a suspended coroutine. - -Run:: -Processing the event loop. See `io_context::run()`. - -== S - -Scatter/Gather I/O:: -Reading into or writing from multiple non-contiguous buffers in a single -operation. - -Service:: -A polymorphic component owned by an execution context. Provides shared -functionality. - -Signal Set:: -An I/O object that waits for operating system signals. See -`corosio::signal_set`. - -Socket:: -An I/O object for TCP network communication. See `corosio::socket`. - -Stop Token:: -A mechanism for requesting cancellation. See `std::stop_token`. - -Strand:: -A serialization mechanism that ensures handlers don't run concurrently. -Operations posted to a strand execute one at a time, eliminating data -races without mutexes. See xref:../guide/concurrent-programming.adoc[Concurrent Programming]. - -Stream:: -A sequence of bytes that can be read or written incrementally. - -Structured Bindings:: -C++17 feature for unpacking tuple-like types: `auto [a, b] = expr;` - -Suspend:: -Pausing execution of a coroutine, yielding control to the caller. - -Symmetric Transfer:: -A tail-call optimization where one coroutine directly resumes another without -stack growth. - -== T - -Task:: -A lazy coroutine that produces a value. See `capy::task`. - -TCP Server:: -A framework class for building TCP servers with worker pools. See -`corosio::tcp_server` and xref:../guide/tcp-server.adoc[TCP Server Guide]. - -Thread Safety:: -The ability to use an object safely from multiple threads. Individual I/O -objects are generally not thread-safe. - -Timer:: -An I/O object for scheduling delays. See `corosio::timer`. - -TLS (Transport Layer Security):: -Cryptographic protocol for secure communication. See `corosio::wolfssl_stream`. - -Type Erasure:: -Hiding concrete types behind an abstract interface. Enables runtime -polymorphism without templates. - -== W - -Wait:: -An operation that suspends until a condition is met (timer expires, signal -received, etc.). - -WolfSSL:: -A compact TLS library used by Corosio for secure streams. - -Work:: -Pending operations that keep an I/O context running. - -Worker Pool:: -A design pattern where a fixed number of worker objects are preallocated -to handle connections. Provides bounded resource usage and avoids allocation -during operation. See xref:../guide/tcp-server.adoc[TCP Server]. - -== See Also - -* xref:design-rationale.adoc[Design Rationale] — Why Corosio is designed this way -* xref:../concepts/affine-awaitables.adoc[Affine Awaitables] — The dispatch protocol +// +// Copyright (c) 2025 Vinnie Falco (vinnie.falco@gmail.com) +// +// Distributed under the Boost Software License, Version 1.0. (See accompanying +// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) +// +// Official repository: https://github.com/cppalliance/corosio +// + += Glossary + +This glossary defines terms used throughout the Corosio documentation. + +== A + +Acceptor:: +An I/O object that listens for and accepts incoming TCP connections. See +`corosio::acceptor` and xref:../guide/acceptor.adoc[Acceptors Guide]. + +Affine Awaitable:: +An awaitable type that implements the affine protocol, receiving a dispatcher +parameter in `await_suspend` to ensure correct executor affinity. + +Affinity:: +The binding of a coroutine to a specific executor. A coroutine with affinity +to executor `ex` will have all its resumptions dispatched through `ex`. + +buffer_param:: +A type-erased buffer sequence parameter, allowing non-template code to +work with any buffer type. + +any_dispatcher:: +A type-erased wrapper for dispatchers, enabling runtime polymorphism for +executor types. + +Awaitable:: +A type that can be used with `co_await`. Must provide `await_ready()`, +`await_suspend()`, and `await_resume()` methods. + +== B + +Buffer:: +A contiguous region of memory. See `capy::mutable_buffer` (writable) and +`capy::const_buffer` (read-only). + +Buffer Sequence:: +A collection of buffers that can be iterated. Enables scatter/gather I/O +with a single operation. + +== C + +Cancellation:: +The ability to abort an in-progress asynchronous operation. Operations +complete with an error code indicating cancellation. + +Completion:: +The event when an asynchronous operation finishes, either successfully or +with an error. + +Composed Operation:: +An operation built from multiple primitive operations. For example, +`corosio::read()` repeatedly calls `read_some()` until the buffer is full. + +Concurrency Hint:: +A value passed to `io_context` indicating how many threads may call `run()`. +Affects internal synchronization strategy. + +Continuation:: +The code that runs after an asynchronous operation completes. In coroutines, +this is the code following `co_await`. + +Coroutine:: +A function that can suspend and resume execution. Uses `co_await`, `co_yield`, +or `co_return` keywords. + +Coroutine Handle:: +A low-level handle to a suspended coroutine, represented by +`std::coroutine_handle<>`. + +== D + +Dispatcher:: +An object that can dispatch coroutine handles for execution. Satisfies the +`capy::dispatcher` concept. + +== E + +Endpoint:: +A combination of IP address and port number identifying a network destination. +See `corosio::endpoint`. + +EOF (End of File/Stream):: +A condition indicating no more data is available. Signaled by +`capy::error::eof`. + +Error Category:: +A grouping of related error codes. Boost.System uses categories to +distinguish different error sources. + +Error Code:: +A lightweight error indicator. See `boost::system::error_code`. + +Error Condition:: +A portable error classification. Enables comparing errors across categories. + +Execution Context:: +An environment where work runs. Provides service management and an executor. +See `capy::execution_context`. + +Executor:: +An object that can dispatch work for execution. Satisfies the `capy::executor` +concept. + +== H + +Handle:: +A reference to a system resource (socket, file, etc.) managed by the +operating system. + +Handler:: +A callback function or coroutine handle that runs when an operation completes. + +== I + +I/O Context:: +The main event loop in Corosio. Processes asynchronous operations and +dispatches completions. See `corosio::io_context`. + +I/O Object:: +A class representing an I/O resource (socket, timer, etc.). Base class is +`corosio::io_object`. + +I/O Stream:: +An I/O object that supports reading and writing data. Base class is +`corosio::io_stream`. + +io_result:: +The result type for Corosio operations, containing an error code and optional +values. Supports structured bindings. + +IOCP (I/O Completion Ports):: +Windows kernel mechanism for scalable asynchronous I/O. Used by Corosio on +Windows. + +== L + +Lazy:: +A coroutine or operation that doesn't start until explicitly triggered. +`capy::task` is lazy—it starts when awaited. + +== M + +Mocket:: +A mock socket for testing. See `corosio::test::mocket`. + +Move-Only:: +A type that can be moved but not copied. Sockets and other I/O objects are +move-only. + +== O + +Operation:: +An asynchronous action that completes in the future. Returns an awaitable +that can be `co_await`ed. + +Outstanding Work:: +Work tracked by an I/O context. The context's `run()` continues while +outstanding work exists. + +== P + +Platform Backend:: +The operating system-specific implementation (IOCP, io_uring, kqueue). + +Poll:: +Processing ready work without blocking. See `io_context::poll()`. + +Post:: +Queuing work for later execution. See `executor_type::post()`. + +Primitive Operation:: +A basic I/O operation like `read_some()` or `write_some()` that may transfer +any amount of data. + +== R + +Resolver:: +An I/O object that performs DNS lookups. See `corosio::resolver`. + +Resume:: +Continuing execution of a suspended coroutine. + +Run:: +Processing the event loop. See `io_context::run()`. + +== S + +Scatter/Gather I/O:: +Reading into or writing from multiple non-contiguous buffers in a single +operation. + +Service:: +A polymorphic component owned by an execution context. Provides shared +functionality. + +Signal Set:: +An I/O object that waits for operating system signals. See +`corosio::signal_set`. + +Socket:: +An I/O object for TCP network communication. See `corosio::socket`. + +Stop Token:: +A mechanism for requesting cancellation. See `std::stop_token`. + +Strand:: +A serialization mechanism that ensures handlers don't run concurrently. +Operations posted to a strand execute one at a time, eliminating data +races without mutexes. See xref:../guide/concurrent-programming.adoc[Concurrent Programming]. + +Stream:: +A sequence of bytes that can be read or written incrementally. + +Structured Bindings:: +C++17 feature for unpacking tuple-like types: `auto [a, b] = expr;` + +Suspend:: +Pausing execution of a coroutine, yielding control to the caller. + +Symmetric Transfer:: +A tail-call optimization where one coroutine directly resumes another without +stack growth. + +== T + +Task:: +A lazy coroutine that produces a value. See `capy::task`. + +TCP Server:: +A framework class for building TCP servers with worker pools. See +`corosio::tcp_server` and xref:../guide/tcp-server.adoc[TCP Server Guide]. + +Thread Safety:: +The ability to use an object safely from multiple threads. Individual I/O +objects are generally not thread-safe. + +Timer:: +An I/O object for scheduling delays. See `corosio::timer`. + +TLS (Transport Layer Security):: +Cryptographic protocol for secure communication. See `corosio::wolfssl_stream`. + +Type Erasure:: +Hiding concrete types behind an abstract interface. Enables runtime +polymorphism without templates. + +== W + +Wait:: +An operation that suspends until a condition is met (timer expires, signal +received, etc.). + +WolfSSL:: +A compact TLS library used by Corosio for secure streams. + +Work:: +Pending operations that keep an I/O context running. + +Worker Pool:: +A design pattern where a fixed number of worker objects are preallocated +to handle connections. Provides bounded resource usage and avoids allocation +during operation. See xref:../guide/tcp-server.adoc[TCP Server]. + +== See Also + +* xref:design-rationale.adoc[Design Rationale] — Why Corosio is designed this way +* xref:../concepts/affine-awaitables.adoc[Affine Awaitables] — The dispatch protocol diff --git a/include/boost/corosio/io_stream.hpp b/include/boost/corosio/io_stream.hpp index 1ac3372..d5807ab 100644 --- a/include/boost/corosio/io_stream.hpp +++ b/include/boost/corosio/io_stream.hpp @@ -13,7 +13,7 @@ #include #include #include -#include +#include #include #include @@ -129,8 +129,7 @@ class BOOST_COROSIO_DECL io_stream : public io_object std::coroutine_handle<> h, Ex const& ex) -> std::coroutine_handle<> { - capy::any_bufref param(buffers_); - ios_.get().read_some(h, ex, param, token_, &ec_, &bytes_transferred_); + ios_.get().read_some(h, ex, buffers_, token_, &ec_, &bytes_transferred_); return std::noop_coroutine(); } @@ -141,8 +140,7 @@ class BOOST_COROSIO_DECL io_stream : public io_object std::stop_token token) -> std::coroutine_handle<> { token_ = std::move(token); - capy::any_bufref param(buffers_); - ios_.get().read_some(h, ex, param, token_, &ec_, &bytes_transferred_); + ios_.get().read_some(h, ex, buffers_, token_, &ec_, &bytes_transferred_); return std::noop_coroutine(); } }; @@ -181,8 +179,7 @@ class BOOST_COROSIO_DECL io_stream : public io_object std::coroutine_handle<> h, Ex const& ex) -> std::coroutine_handle<> { - capy::any_bufref param(buffers_); - ios_.get().write_some(h, ex, param, token_, &ec_, &bytes_transferred_); + ios_.get().write_some(h, ex, buffers_, token_, &ec_, &bytes_transferred_); return std::noop_coroutine(); } @@ -193,8 +190,7 @@ class BOOST_COROSIO_DECL io_stream : public io_object std::stop_token token) -> std::coroutine_handle<> { token_ = std::move(token); - capy::any_bufref param(buffers_); - ios_.get().write_some(h, ex, param, token_, &ec_, &bytes_transferred_); + ios_.get().write_some(h, ex, buffers_, token_, &ec_, &bytes_transferred_); return std::noop_coroutine(); } }; @@ -205,7 +201,7 @@ class BOOST_COROSIO_DECL io_stream : public io_object virtual void read_some( std::coroutine_handle<>, capy::any_executor_ref, - capy::any_bufref&, + capy::buffer_param, std::stop_token, system::error_code*, std::size_t*) = 0; @@ -213,7 +209,7 @@ class BOOST_COROSIO_DECL io_stream : public io_object virtual void write_some( std::coroutine_handle<>, capy::any_executor_ref, - capy::any_bufref&, + capy::buffer_param, std::stop_token, system::error_code*, std::size_t*) = 0; diff --git a/include/boost/corosio/read.hpp b/include/boost/corosio/read.hpp index 2a3a0fd..5e9759d 100644 --- a/include/boost/corosio/read.hpp +++ b/include/boost/corosio/read.hpp @@ -13,7 +13,7 @@ #include #include #include -#include +#include #include #include #include diff --git a/include/boost/corosio/socket.hpp b/include/boost/corosio/socket.hpp index 4b86ef4..fbf95e8 100644 --- a/include/boost/corosio/socket.hpp +++ b/include/boost/corosio/socket.hpp @@ -14,7 +14,7 @@ #include #include #include -#include +#include #include #include #include diff --git a/include/boost/corosio/write.hpp b/include/boost/corosio/write.hpp index f6e4370..1c4dcbf 100644 --- a/include/boost/corosio/write.hpp +++ b/include/boost/corosio/write.hpp @@ -13,7 +13,7 @@ #include #include #include -#include +#include #include #include #include diff --git a/src/corosio/src/detail/epoll/sockets.hpp b/src/corosio/src/detail/epoll/sockets.hpp index cce9839..4d6c7ad 100644 --- a/src/corosio/src/detail/epoll/sockets.hpp +++ b/src/corosio/src/detail/epoll/sockets.hpp @@ -124,7 +124,7 @@ class epoll_socket_impl void read_some( std::coroutine_handle<>, capy::any_executor_ref, - capy::any_bufref&, + capy::buffer_param, std::stop_token, system::error_code*, std::size_t*) override; @@ -132,7 +132,7 @@ class epoll_socket_impl void write_some( std::coroutine_handle<>, capy::any_executor_ref, - capy::any_bufref&, + capy::buffer_param, std::stop_token, system::error_code*, std::size_t*) override; @@ -308,7 +308,7 @@ epoll_socket_impl:: read_some( std::coroutine_handle<> h, capy::any_executor_ref d, - capy::any_bufref& param, + capy::buffer_param param, std::stop_token token, system::error_code* ec, std::size_t* bytes_out) @@ -372,7 +372,7 @@ epoll_socket_impl:: write_some( std::coroutine_handle<> h, capy::any_executor_ref d, - capy::any_bufref& param, + capy::buffer_param param, std::stop_token token, system::error_code* ec, std::size_t* bytes_out) diff --git a/src/corosio/src/detail/iocp/sockets.cpp b/src/corosio/src/detail/iocp/sockets.cpp index 6eb2a58..9614292 100644 --- a/src/corosio/src/detail/iocp/sockets.cpp +++ b/src/corosio/src/detail/iocp/sockets.cpp @@ -418,7 +418,7 @@ win_socket_impl_internal:: read_some( capy::any_coro h, capy::any_executor_ref d, - capy::any_bufref& param, + capy::buffer_param param, std::stop_token token, system::error_code* ec, std::size_t* bytes_out) @@ -492,7 +492,7 @@ win_socket_impl_internal:: write_some( capy::any_coro h, capy::any_executor_ref d, - capy::any_bufref& param, + capy::buffer_param param, std::stop_token token, system::error_code* ec, std::size_t* bytes_out) diff --git a/src/corosio/src/detail/iocp/sockets.hpp b/src/corosio/src/detail/iocp/sockets.hpp index 70648b1..336f1c9 100644 --- a/src/corosio/src/detail/iocp/sockets.hpp +++ b/src/corosio/src/detail/iocp/sockets.hpp @@ -1,468 +1,468 @@ -// -// Copyright (c) 2025 Vinnie Falco (vinnie dot falco at gmail dot com) -// -// Distributed under the Boost Software License, Version 1.0. (See accompanying -// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) -// -// Official repository: https://github.com/cppalliance/corosio -// - -#ifndef BOOST_COROSIO_DETAIL_IOCP_SOCKETS_HPP -#define BOOST_COROSIO_DETAIL_IOCP_SOCKETS_HPP - -#include "src/detail/config_backend.hpp" - -#if defined(BOOST_COROSIO_BACKEND_IOCP) - -#include -#include -#include -#include -#include -#include -#include - -#include "src/detail/iocp/windows.hpp" -#include "src/detail/iocp/completion_key.hpp" -#include "src/detail/iocp/overlapped_op.hpp" -#include "src/detail/iocp/mutex.hpp" -#include "src/detail/iocp/wsa_init.hpp" - -#include - -#include -#include - -namespace boost { -namespace corosio { -namespace detail { - -class win_scheduler; -class win_sockets; -class win_socket_impl; -class win_socket_impl_internal; -class win_acceptor_impl; -class win_acceptor_impl_internal; - -//------------------------------------------------------------------------------ - -/** Connect operation state. */ -struct connect_op : overlapped_op -{ - win_socket_impl_internal& internal; - std::shared_ptr internal_ptr; // Keeps internal alive during I/O - - explicit connect_op(win_socket_impl_internal& internal_) noexcept : internal(internal_) {} - - void operator()() override; - void do_cancel() noexcept override; -}; - -/** Read operation state with buffer descriptors. */ -struct read_op : overlapped_op -{ - static constexpr std::size_t max_buffers = 16; - WSABUF wsabufs[max_buffers]; - DWORD wsabuf_count = 0; - DWORD flags = 0; - win_socket_impl_internal& internal; - std::shared_ptr internal_ptr; // Keeps internal alive during I/O - - explicit read_op(win_socket_impl_internal& internal_) noexcept : internal(internal_) {} - - void operator()() override; - bool is_read_operation() const noexcept override { return true; } - void do_cancel() noexcept override; -}; - -/** Write operation state with buffer descriptors. */ -struct write_op : overlapped_op -{ - static constexpr std::size_t max_buffers = 16; - WSABUF wsabufs[max_buffers]; - DWORD wsabuf_count = 0; - win_socket_impl_internal& internal; - std::shared_ptr internal_ptr; // Keeps internal alive during I/O - - explicit write_op(win_socket_impl_internal& internal_) noexcept : internal(internal_) {} - - void operator()() override; - void do_cancel() noexcept override; -}; - -/** Accept operation state. */ -struct accept_op : overlapped_op -{ - SOCKET accepted_socket = INVALID_SOCKET; - win_socket_impl* peer_wrapper = nullptr; // Wrapper for accepted socket - std::shared_ptr acceptor_ptr; // Keeps acceptor alive during I/O - SOCKET listen_socket = INVALID_SOCKET; // For SO_UPDATE_ACCEPT_CONTEXT - io_object::io_object_impl** impl_out = nullptr; // Output: wrapper for awaitable - // Buffer for AcceptEx: local + remote addresses - char addr_buf[2 * (sizeof(sockaddr_in6) + 16)]; - - /** Resume the coroutine after accept completes. */ - void operator()() override; - - /** Cancel the pending accept via CancelIoEx. */ - void do_cancel() noexcept override; -}; - -//------------------------------------------------------------------------------ - -/** Internal socket state for IOCP-based I/O. - - This class contains the actual state for a single socket, including - the native socket handle and pending operations. It derives from - enable_shared_from_this so operations can extend its lifetime. - - @note Internal implementation detail. Users interact with socket class. -*/ -class win_socket_impl_internal - : public capy::intrusive_list::node - , public std::enable_shared_from_this -{ - friend class win_sockets; - friend class win_socket_impl; - friend struct read_op; - friend struct write_op; - friend struct connect_op; - - win_sockets& svc_; - connect_op conn_; - read_op rd_; - write_op wr_; - SOCKET socket_ = INVALID_SOCKET; - -public: - explicit win_socket_impl_internal(win_sockets& svc) noexcept; - ~win_socket_impl_internal(); - - void release_internal(); - - void connect( - capy::any_coro, - capy::any_executor_ref, - endpoint, - std::stop_token, - system::error_code*); - - void read_some( - capy::any_coro, - capy::any_executor_ref, - capy::any_bufref&, - std::stop_token, - system::error_code*, - std::size_t*); - - void write_some( - capy::any_coro, - capy::any_executor_ref, - capy::any_bufref&, - std::stop_token, - system::error_code*, - std::size_t*); - - SOCKET native_handle() const noexcept { return socket_; } - bool is_open() const noexcept { return socket_ != INVALID_SOCKET; } - void cancel() noexcept; - void close_socket() noexcept; - void set_socket(SOCKET s) noexcept { socket_ = s; } -}; - -//------------------------------------------------------------------------------ - -/** Socket implementation wrapper for IOCP-based I/O. - - This class is the public-facing socket_impl that holds a shared_ptr - to the internal state. The shared_ptr is hidden from the public interface. - - @note Internal implementation detail. Users interact with socket class. -*/ -class win_socket_impl - : public socket::socket_impl - , public capy::intrusive_list::node -{ - std::shared_ptr internal_; - -public: - explicit win_socket_impl(std::shared_ptr internal) noexcept - : internal_(std::move(internal)) - { - } - - void release() override; - - void connect( - std::coroutine_handle<> h, - capy::any_executor_ref d, - endpoint ep, - std::stop_token token, - system::error_code* ec) override - { - internal_->connect(h, d, ep, token, ec); - } - - void read_some( - std::coroutine_handle<> h, - capy::any_executor_ref d, - capy::any_bufref& buf, - std::stop_token token, - system::error_code* ec, - std::size_t* bytes) override - { - internal_->read_some(h, d, buf, token, ec, bytes); - } - - void write_some( - std::coroutine_handle<> h, - capy::any_executor_ref d, - capy::any_bufref& buf, - std::stop_token token, - system::error_code* ec, - std::size_t* bytes) override - { - internal_->write_some(h, d, buf, token, ec, bytes); - } - - system::error_code shutdown(socket::shutdown_type what) noexcept override - { - int how; - switch (what) - { - case socket::shutdown_receive: how = SD_RECEIVE; break; - case socket::shutdown_send: how = SD_SEND; break; - case socket::shutdown_both: how = SD_BOTH; break; - default: - return make_err(WSAEINVAL); - } - if (::shutdown(internal_->native_handle(), how) != 0) - return make_err(WSAGetLastError()); - return {}; - } - - win_socket_impl_internal* get_internal() const noexcept { return internal_.get(); } -}; - -//------------------------------------------------------------------------------ - -/** Internal acceptor state for IOCP-based I/O. - - This class contains the actual state for a listening socket, including - the native socket handle and pending accept operation. - - @note Internal implementation detail. Users interact with acceptor class. -*/ -class win_acceptor_impl_internal - : public capy::intrusive_list::node - , public std::enable_shared_from_this -{ - friend class win_sockets; - friend class win_acceptor_impl; - -public: - explicit win_acceptor_impl_internal(win_sockets& svc) noexcept; - ~win_acceptor_impl_internal(); - - void release_internal(); - - void accept( - capy::any_coro, - capy::any_executor_ref, - std::stop_token, - system::error_code*, - io_object::io_object_impl**); - - SOCKET native_handle() const noexcept { return socket_; } - bool is_open() const noexcept { return socket_ != INVALID_SOCKET; } - void cancel() noexcept; - void close_socket() noexcept; - - accept_op acc_; - -private: - win_sockets& svc_; - SOCKET socket_ = INVALID_SOCKET; -}; - -//------------------------------------------------------------------------------ - -/** Acceptor implementation wrapper for IOCP-based I/O. - - This class is the public-facing acceptor_impl that holds a shared_ptr - to the internal state. The shared_ptr is hidden from the public interface. - - @note Internal implementation detail. Users interact with acceptor class. -*/ -class win_acceptor_impl - : public acceptor::acceptor_impl - , public capy::intrusive_list::node -{ - std::shared_ptr internal_; - -public: - explicit win_acceptor_impl(std::shared_ptr internal) noexcept - : internal_(std::move(internal)) - { - } - - void release() override; - - void accept( - std::coroutine_handle<> h, - capy::any_executor_ref d, - std::stop_token token, - system::error_code* ec, - io_object::io_object_impl** impl_out) override - { - internal_->accept(h, d, token, ec, impl_out); - } - - win_acceptor_impl_internal* get_internal() const noexcept { return internal_.get(); } -}; - -//------------------------------------------------------------------------------ - -/** Windows IOCP socket management service. - - This service owns all socket implementations and coordinates their - lifecycle with the IOCP. It provides: - - - Socket implementation allocation and deallocation - - IOCP handle association for sockets - - Function pointer loading for ConnectEx/AcceptEx - - Graceful shutdown - destroys all implementations when io_context stops - - @par Thread Safety - All public member functions are thread-safe. - - @note Only available on Windows platforms. -*/ -class win_sockets - : private win_wsa_init - , public capy::execution_context::service -{ -public: - using key_type = win_sockets; - - /** Construct the socket service. - - Obtains the IOCP handle from the scheduler service and - loads extension function pointers. - - @param ctx Reference to the owning execution_context. - */ - explicit win_sockets(capy::execution_context& ctx); - - /** Destroy the socket service. */ - ~win_sockets(); - - win_sockets(win_sockets const&) = delete; - win_sockets& operator=(win_sockets const&) = delete; - - /** Shut down the service. */ - void shutdown() override; - - /** Create a new socket implementation wrapper. - The service owns the returned object. - */ - win_socket_impl& create_impl(); - - /** Destroy a socket implementation wrapper. - Removes from tracking list and deletes. - */ - void destroy_impl(win_socket_impl& impl); - - /** Unregister a socket implementation from the service list. - Called by the internal impl destructor. - */ - void unregister_impl(win_socket_impl_internal& impl); - - /** Create and register a socket with the IOCP. - - @param impl The socket implementation internal to initialize. - @return Error code, or success. - */ - system::error_code open_socket(win_socket_impl_internal& impl); - - /** Create a new acceptor implementation wrapper. - The service owns the returned object. - */ - win_acceptor_impl& create_acceptor_impl(); - - /** Destroy an acceptor implementation wrapper. - Removes from tracking list and deletes. - */ - void destroy_acceptor_impl(win_acceptor_impl& impl); - - /** Unregister an acceptor implementation from the service list. - Called by the internal impl destructor. - */ - void unregister_acceptor_impl(win_acceptor_impl_internal& impl); - - /** Create, bind, and listen on an acceptor socket. - - @param impl The acceptor implementation internal to initialize. - @param ep The local endpoint to bind to. - @param backlog The listen backlog. - @return Error code, or success. - */ - system::error_code open_acceptor( - win_acceptor_impl_internal& impl, - endpoint ep, - int backlog); - - /** Return the IOCP handle. */ - void* native_handle() const noexcept { return iocp_; } - - /** Return the completion key for associating sockets with IOCP. */ - completion_key* io_key() noexcept { return &overlapped_key_; } - - /** Return the ConnectEx function pointer. */ - LPFN_CONNECTEX connect_ex() const noexcept { return connect_ex_; } - - /** Return the AcceptEx function pointer. */ - LPFN_ACCEPTEX accept_ex() const noexcept { return accept_ex_; } - - /** Post an overlapped operation for completion. */ - void post(overlapped_op* op); - - /** Notify scheduler of pending I/O work. */ - void work_started() noexcept; - - /** Notify scheduler that I/O work completed. */ - void work_finished() noexcept; - -private: - struct overlapped_key final : completion_key - { - result on_completion( - win_scheduler& sched, - DWORD bytes, - DWORD dwError, - LPOVERLAPPED overlapped) override; - - void destroy(LPOVERLAPPED overlapped) override; - }; - - void load_extension_functions(); - - win_scheduler& sched_; - overlapped_key overlapped_key_; - win_mutex mutex_; - capy::intrusive_list socket_list_; - capy::intrusive_list acceptor_list_; - capy::intrusive_list socket_wrapper_list_; - capy::intrusive_list acceptor_wrapper_list_; - void* iocp_; - LPFN_CONNECTEX connect_ex_ = nullptr; - LPFN_ACCEPTEX accept_ex_ = nullptr; -}; - -} // namespace detail -} // namespace corosio -} // namespace boost - -#endif // BOOST_COROSIO_BACKEND_IOCP - -#endif // BOOST_COROSIO_DETAIL_IOCP_SOCKETS_HPP +// +// Copyright (c) 2025 Vinnie Falco (vinnie dot falco at gmail dot com) +// +// Distributed under the Boost Software License, Version 1.0. (See accompanying +// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) +// +// Official repository: https://github.com/cppalliance/corosio +// + +#ifndef BOOST_COROSIO_DETAIL_IOCP_SOCKETS_HPP +#define BOOST_COROSIO_DETAIL_IOCP_SOCKETS_HPP + +#include "src/detail/config_backend.hpp" + +#if defined(BOOST_COROSIO_BACKEND_IOCP) + +#include +#include +#include +#include +#include +#include +#include + +#include "src/detail/iocp/windows.hpp" +#include "src/detail/iocp/completion_key.hpp" +#include "src/detail/iocp/overlapped_op.hpp" +#include "src/detail/iocp/mutex.hpp" +#include "src/detail/iocp/wsa_init.hpp" + +#include + +#include +#include + +namespace boost { +namespace corosio { +namespace detail { + +class win_scheduler; +class win_sockets; +class win_socket_impl; +class win_socket_impl_internal; +class win_acceptor_impl; +class win_acceptor_impl_internal; + +//------------------------------------------------------------------------------ + +/** Connect operation state. */ +struct connect_op : overlapped_op +{ + win_socket_impl_internal& internal; + std::shared_ptr internal_ptr; // Keeps internal alive during I/O + + explicit connect_op(win_socket_impl_internal& internal_) noexcept : internal(internal_) {} + + void operator()() override; + void do_cancel() noexcept override; +}; + +/** Read operation state with buffer descriptors. */ +struct read_op : overlapped_op +{ + static constexpr std::size_t max_buffers = 16; + WSABUF wsabufs[max_buffers]; + DWORD wsabuf_count = 0; + DWORD flags = 0; + win_socket_impl_internal& internal; + std::shared_ptr internal_ptr; // Keeps internal alive during I/O + + explicit read_op(win_socket_impl_internal& internal_) noexcept : internal(internal_) {} + + void operator()() override; + bool is_read_operation() const noexcept override { return true; } + void do_cancel() noexcept override; +}; + +/** Write operation state with buffer descriptors. */ +struct write_op : overlapped_op +{ + static constexpr std::size_t max_buffers = 16; + WSABUF wsabufs[max_buffers]; + DWORD wsabuf_count = 0; + win_socket_impl_internal& internal; + std::shared_ptr internal_ptr; // Keeps internal alive during I/O + + explicit write_op(win_socket_impl_internal& internal_) noexcept : internal(internal_) {} + + void operator()() override; + void do_cancel() noexcept override; +}; + +/** Accept operation state. */ +struct accept_op : overlapped_op +{ + SOCKET accepted_socket = INVALID_SOCKET; + win_socket_impl* peer_wrapper = nullptr; // Wrapper for accepted socket + std::shared_ptr acceptor_ptr; // Keeps acceptor alive during I/O + SOCKET listen_socket = INVALID_SOCKET; // For SO_UPDATE_ACCEPT_CONTEXT + io_object::io_object_impl** impl_out = nullptr; // Output: wrapper for awaitable + // Buffer for AcceptEx: local + remote addresses + char addr_buf[2 * (sizeof(sockaddr_in6) + 16)]; + + /** Resume the coroutine after accept completes. */ + void operator()() override; + + /** Cancel the pending accept via CancelIoEx. */ + void do_cancel() noexcept override; +}; + +//------------------------------------------------------------------------------ + +/** Internal socket state for IOCP-based I/O. + + This class contains the actual state for a single socket, including + the native socket handle and pending operations. It derives from + enable_shared_from_this so operations can extend its lifetime. + + @note Internal implementation detail. Users interact with socket class. +*/ +class win_socket_impl_internal + : public capy::intrusive_list::node + , public std::enable_shared_from_this +{ + friend class win_sockets; + friend class win_socket_impl; + friend struct read_op; + friend struct write_op; + friend struct connect_op; + + win_sockets& svc_; + connect_op conn_; + read_op rd_; + write_op wr_; + SOCKET socket_ = INVALID_SOCKET; + +public: + explicit win_socket_impl_internal(win_sockets& svc) noexcept; + ~win_socket_impl_internal(); + + void release_internal(); + + void connect( + capy::any_coro, + capy::any_executor_ref, + endpoint, + std::stop_token, + system::error_code*); + + void read_some( + capy::any_coro, + capy::any_executor_ref, + capy::buffer_param, + std::stop_token, + system::error_code*, + std::size_t*); + + void write_some( + capy::any_coro, + capy::any_executor_ref, + capy::buffer_param, + std::stop_token, + system::error_code*, + std::size_t*); + + SOCKET native_handle() const noexcept { return socket_; } + bool is_open() const noexcept { return socket_ != INVALID_SOCKET; } + void cancel() noexcept; + void close_socket() noexcept; + void set_socket(SOCKET s) noexcept { socket_ = s; } +}; + +//------------------------------------------------------------------------------ + +/** Socket implementation wrapper for IOCP-based I/O. + + This class is the public-facing socket_impl that holds a shared_ptr + to the internal state. The shared_ptr is hidden from the public interface. + + @note Internal implementation detail. Users interact with socket class. +*/ +class win_socket_impl + : public socket::socket_impl + , public capy::intrusive_list::node +{ + std::shared_ptr internal_; + +public: + explicit win_socket_impl(std::shared_ptr internal) noexcept + : internal_(std::move(internal)) + { + } + + void release() override; + + void connect( + std::coroutine_handle<> h, + capy::any_executor_ref d, + endpoint ep, + std::stop_token token, + system::error_code* ec) override + { + internal_->connect(h, d, ep, token, ec); + } + + void read_some( + std::coroutine_handle<> h, + capy::any_executor_ref d, + capy::buffer_param buf, + std::stop_token token, + system::error_code* ec, + std::size_t* bytes) override + { + internal_->read_some(h, d, buf, token, ec, bytes); + } + + void write_some( + std::coroutine_handle<> h, + capy::any_executor_ref d, + capy::buffer_param buf, + std::stop_token token, + system::error_code* ec, + std::size_t* bytes) override + { + internal_->write_some(h, d, buf, token, ec, bytes); + } + + system::error_code shutdown(socket::shutdown_type what) noexcept override + { + int how; + switch (what) + { + case socket::shutdown_receive: how = SD_RECEIVE; break; + case socket::shutdown_send: how = SD_SEND; break; + case socket::shutdown_both: how = SD_BOTH; break; + default: + return make_err(WSAEINVAL); + } + if (::shutdown(internal_->native_handle(), how) != 0) + return make_err(WSAGetLastError()); + return {}; + } + + win_socket_impl_internal* get_internal() const noexcept { return internal_.get(); } +}; + +//------------------------------------------------------------------------------ + +/** Internal acceptor state for IOCP-based I/O. + + This class contains the actual state for a listening socket, including + the native socket handle and pending accept operation. + + @note Internal implementation detail. Users interact with acceptor class. +*/ +class win_acceptor_impl_internal + : public capy::intrusive_list::node + , public std::enable_shared_from_this +{ + friend class win_sockets; + friend class win_acceptor_impl; + +public: + explicit win_acceptor_impl_internal(win_sockets& svc) noexcept; + ~win_acceptor_impl_internal(); + + void release_internal(); + + void accept( + capy::any_coro, + capy::any_executor_ref, + std::stop_token, + system::error_code*, + io_object::io_object_impl**); + + SOCKET native_handle() const noexcept { return socket_; } + bool is_open() const noexcept { return socket_ != INVALID_SOCKET; } + void cancel() noexcept; + void close_socket() noexcept; + + accept_op acc_; + +private: + win_sockets& svc_; + SOCKET socket_ = INVALID_SOCKET; +}; + +//------------------------------------------------------------------------------ + +/** Acceptor implementation wrapper for IOCP-based I/O. + + This class is the public-facing acceptor_impl that holds a shared_ptr + to the internal state. The shared_ptr is hidden from the public interface. + + @note Internal implementation detail. Users interact with acceptor class. +*/ +class win_acceptor_impl + : public acceptor::acceptor_impl + , public capy::intrusive_list::node +{ + std::shared_ptr internal_; + +public: + explicit win_acceptor_impl(std::shared_ptr internal) noexcept + : internal_(std::move(internal)) + { + } + + void release() override; + + void accept( + std::coroutine_handle<> h, + capy::any_executor_ref d, + std::stop_token token, + system::error_code* ec, + io_object::io_object_impl** impl_out) override + { + internal_->accept(h, d, token, ec, impl_out); + } + + win_acceptor_impl_internal* get_internal() const noexcept { return internal_.get(); } +}; + +//------------------------------------------------------------------------------ + +/** Windows IOCP socket management service. + + This service owns all socket implementations and coordinates their + lifecycle with the IOCP. It provides: + + - Socket implementation allocation and deallocation + - IOCP handle association for sockets + - Function pointer loading for ConnectEx/AcceptEx + - Graceful shutdown - destroys all implementations when io_context stops + + @par Thread Safety + All public member functions are thread-safe. + + @note Only available on Windows platforms. +*/ +class win_sockets + : private win_wsa_init + , public capy::execution_context::service +{ +public: + using key_type = win_sockets; + + /** Construct the socket service. + + Obtains the IOCP handle from the scheduler service and + loads extension function pointers. + + @param ctx Reference to the owning execution_context. + */ + explicit win_sockets(capy::execution_context& ctx); + + /** Destroy the socket service. */ + ~win_sockets(); + + win_sockets(win_sockets const&) = delete; + win_sockets& operator=(win_sockets const&) = delete; + + /** Shut down the service. */ + void shutdown() override; + + /** Create a new socket implementation wrapper. + The service owns the returned object. + */ + win_socket_impl& create_impl(); + + /** Destroy a socket implementation wrapper. + Removes from tracking list and deletes. + */ + void destroy_impl(win_socket_impl& impl); + + /** Unregister a socket implementation from the service list. + Called by the internal impl destructor. + */ + void unregister_impl(win_socket_impl_internal& impl); + + /** Create and register a socket with the IOCP. + + @param impl The socket implementation internal to initialize. + @return Error code, or success. + */ + system::error_code open_socket(win_socket_impl_internal& impl); + + /** Create a new acceptor implementation wrapper. + The service owns the returned object. + */ + win_acceptor_impl& create_acceptor_impl(); + + /** Destroy an acceptor implementation wrapper. + Removes from tracking list and deletes. + */ + void destroy_acceptor_impl(win_acceptor_impl& impl); + + /** Unregister an acceptor implementation from the service list. + Called by the internal impl destructor. + */ + void unregister_acceptor_impl(win_acceptor_impl_internal& impl); + + /** Create, bind, and listen on an acceptor socket. + + @param impl The acceptor implementation internal to initialize. + @param ep The local endpoint to bind to. + @param backlog The listen backlog. + @return Error code, or success. + */ + system::error_code open_acceptor( + win_acceptor_impl_internal& impl, + endpoint ep, + int backlog); + + /** Return the IOCP handle. */ + void* native_handle() const noexcept { return iocp_; } + + /** Return the completion key for associating sockets with IOCP. */ + completion_key* io_key() noexcept { return &overlapped_key_; } + + /** Return the ConnectEx function pointer. */ + LPFN_CONNECTEX connect_ex() const noexcept { return connect_ex_; } + + /** Return the AcceptEx function pointer. */ + LPFN_ACCEPTEX accept_ex() const noexcept { return accept_ex_; } + + /** Post an overlapped operation for completion. */ + void post(overlapped_op* op); + + /** Notify scheduler of pending I/O work. */ + void work_started() noexcept; + + /** Notify scheduler that I/O work completed. */ + void work_finished() noexcept; + +private: + struct overlapped_key final : completion_key + { + result on_completion( + win_scheduler& sched, + DWORD bytes, + DWORD dwError, + LPOVERLAPPED overlapped) override; + + void destroy(LPOVERLAPPED overlapped) override; + }; + + void load_extension_functions(); + + win_scheduler& sched_; + overlapped_key overlapped_key_; + win_mutex mutex_; + capy::intrusive_list socket_list_; + capy::intrusive_list acceptor_list_; + capy::intrusive_list socket_wrapper_list_; + capy::intrusive_list acceptor_wrapper_list_; + void* iocp_; + LPFN_CONNECTEX connect_ex_ = nullptr; + LPFN_ACCEPTEX accept_ex_ = nullptr; +}; + +} // namespace detail +} // namespace corosio +} // namespace boost + +#endif // BOOST_COROSIO_BACKEND_IOCP + +#endif // BOOST_COROSIO_DETAIL_IOCP_SOCKETS_HPP diff --git a/src/corosio/src/test/mocket.cpp b/src/corosio/src/test/mocket.cpp index ad8b57f..2e3e4ef 100644 --- a/src/corosio/src/test/mocket.cpp +++ b/src/corosio/src/test/mocket.cpp @@ -1,535 +1,535 @@ -// -// Copyright (c) 2025 Vinnie Falco (vinnie dot falco at gmail dot com) -// -// Distributed under the Boost Software License, Version 1.0. (See accompanying -// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) -// -// Official repository: https://github.com/cppalliance/corosio -// - -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include - -#include -#include -#include -#include - -namespace boost { -namespace corosio { -namespace test { - -namespace { - -constexpr std::size_t max_buffers = 8; -using buffer_array = std::array; - -} // namespace - -//------------------------------------------------------------------------------ - -class mocket_service; - -class mocket_impl - : public io_stream::io_stream_impl - , public capy::intrusive_list::node -{ - mocket_service& svc_; - capy::test::fuse& fuse_; - socket sock_; - std::string provide_; - std::string expect_; - mocket_impl* peer_ = nullptr; - bool check_fuse_; - -public: - mocket_impl( - mocket_service& svc, - capy::execution_context& ctx, - capy::test::fuse& f, - bool check_fuse); - - void set_peer(mocket_impl* peer) noexcept - { - peer_ = peer; - } - - socket& get_socket() noexcept - { - return sock_; - } - - void provide(std::string s) - { - provide_.append(std::move(s)); - } - - void expect(std::string s) - { - expect_.append(std::move(s)); - } - - system::error_code close(); - - bool is_open() const noexcept - { - return sock_.is_open(); - } - - void release() override; - - void read_some( - std::coroutine_handle<> h, - capy::any_executor_ref d, - capy::any_bufref& buffers, - std::stop_token token, - system::error_code* ec, - std::size_t* bytes_transferred) override; - - void write_some( - std::coroutine_handle<> h, - capy::any_executor_ref d, - capy::any_bufref& buffers, - std::stop_token token, - system::error_code* ec, - std::size_t* bytes_transferred) override; - -private: - std::size_t - fill_from_provide( - buffer_array const& bufs, - std::size_t count); - - bool - validate_expect( - buffer_array const& bufs, - std::size_t count, - std::size_t total_size); -}; - -//------------------------------------------------------------------------------ - -class mocket_service - : public capy::execution_context::service -{ - capy::execution_context& ctx_; - capy::intrusive_list impls_; - -public: - explicit mocket_service(capy::execution_context& ctx) - : ctx_(ctx) - { - } - - mocket_impl& - create_impl(capy::test::fuse& f, bool check_fuse) - { - auto* impl = new mocket_impl(*this, ctx_, f, check_fuse); - impls_.push_back(impl); - return *impl; - } - - void - destroy_impl(mocket_impl& impl) - { - impls_.remove(&impl); - delete &impl; - } - -protected: - void shutdown() override - { - while (auto* impl = impls_.pop_front()) - delete impl; - } -}; - -//------------------------------------------------------------------------------ - -mocket_impl:: -mocket_impl( - mocket_service& svc, - capy::execution_context& ctx, - capy::test::fuse& f, - bool check_fuse) - : svc_(svc) - , fuse_(f) - , sock_(ctx) - , check_fuse_(check_fuse) -{ -} - -system::error_code -mocket_impl:: -close() -{ - // Verify test expectations - if (!expect_.empty()) - { - fuse_.fail(); - sock_.close(); - return capy::error::test_failure; - } - if (!provide_.empty()) - { - fuse_.fail(); - sock_.close(); - return capy::error::test_failure; - } - - sock_.close(); - return {}; -} - -void -mocket_impl:: -release() -{ - svc_.destroy_impl(*this); -} - -std::size_t -mocket_impl:: -fill_from_provide( - buffer_array const& bufs, - std::size_t count) -{ - if (!peer_ || peer_->provide_.empty()) - return 0; - - std::size_t total = 0; - auto& src = peer_->provide_; - - for (std::size_t i = 0; i < count && !src.empty(); ++i) - { - auto const n = std::min(bufs[i].size(), src.size()); - std::memcpy(bufs[i].data(), src.data(), n); - src.erase(0, n); - total += n; - } - return total; -} - -bool -mocket_impl:: -validate_expect( - buffer_array const& bufs, - std::size_t count, - std::size_t total_size) -{ - if (expect_.empty()) - return true; - - // Build the write data - std::string written; - written.reserve(total_size); - for (std::size_t i = 0; i < count; ++i) - { - written.append( - static_cast(bufs[i].data()), - bufs[i].size()); - } - - // Check if written data matches expect prefix - auto const n = std::min(written.size(), expect_.size()); - if (std::memcmp(written.data(), expect_.data(), n) != 0) - { - fuse_.fail(); - return false; - } - - // Consume matched portion - expect_.erase(0, n); - return true; -} - -void -mocket_impl:: -read_some( - std::coroutine_handle<> h, - capy::any_executor_ref d, - capy::any_bufref& buffers, - std::stop_token token, - system::error_code* ec, - std::size_t* bytes_transferred) -{ - (void)token; - // Fuse check for m1 only - if (check_fuse_) - { - auto fail_ec = fuse_.maybe_fail(); - if (fail_ec) - { - *ec = fail_ec; - *bytes_transferred = 0; - d.dispatch(capy::any_coro{h}).resume(); - return; - } - } - - // Check if peer has staged data - if so, serve from provide buffer - if (peer_ && !peer_->provide_.empty()) - { - // Extract buffers only when we need them for staged data - buffer_array bufs{}; - std::size_t count = buffers.copy_to(bufs.data(), max_buffers); - - std::size_t n = fill_from_provide(bufs, count); - *ec = {}; - *bytes_transferred = n; - d.dispatch(capy::any_coro{h}).resume(); - return; - } - - // Pass through to the real socket (don't extract buffers - forward as-is) - sock_.get_impl()->read_some(h, d, buffers, token, ec, bytes_transferred); -} - -void -mocket_impl:: -write_some( - std::coroutine_handle<> h, - capy::any_executor_ref d, - capy::any_bufref& buffers, - std::stop_token token, - system::error_code* ec, - std::size_t* bytes_transferred) -{ - (void)token; - // Fuse check for m1 only - if (check_fuse_) - { - auto fail_ec = fuse_.maybe_fail(); - if (fail_ec) - { - *ec = fail_ec; - *bytes_transferred = 0; - d.dispatch(capy::any_coro{h}).resume(); - return; - } - } - - // Check if we have staged expectations to validate - if (!expect_.empty()) - { - // Extract buffers only when we need them for validation - buffer_array bufs{}; - std::size_t count = buffers.copy_to(bufs.data(), max_buffers); - - // Calculate total size - std::size_t total_size = 0; - for (std::size_t i = 0; i < count; ++i) - total_size += bufs[i].size(); - - if (!validate_expect(bufs, count, total_size)) - { - *ec = capy::error::test_failure; - *bytes_transferred = 0; - d.dispatch(capy::any_coro{h}).resume(); - return; - } - - // If all expected data was validated, report success - *ec = {}; - *bytes_transferred = total_size; - d.dispatch(capy::any_coro{h}).resume(); - return; - } - - // Pass through to the real socket (don't extract buffers - forward as-is) - sock_.get_impl()->write_some(h, d, buffers, token, ec, bytes_transferred); -} - -//------------------------------------------------------------------------------ - -mocket_impl* -mocket:: -get_impl() const noexcept -{ - return static_cast(impl_); -} - -mocket:: -~mocket() -{ - if (impl_) - impl_->release(); - impl_ = nullptr; -} - -mocket:: -mocket(mocket_impl* impl) noexcept - : io_stream(impl->get_socket().context()) -{ - impl_ = impl; -} - -mocket:: -mocket(mocket&& other) noexcept - : io_stream(other.context()) -{ - impl_ = other.impl_; - other.impl_ = nullptr; -} - -mocket& -mocket:: -operator=(mocket&& other) noexcept -{ - if (this != &other) - { - if (impl_) - impl_->release(); - impl_ = other.impl_; - other.impl_ = nullptr; - } - return *this; -} - -void -mocket:: -provide(std::string s) -{ - get_impl()->provide(std::move(s)); -} - -void -mocket:: -expect(std::string s) -{ - get_impl()->expect(std::move(s)); -} - -system::error_code -mocket:: -close() -{ - if (!impl_) - return {}; - return get_impl()->close(); -} - -bool -mocket:: -is_open() const noexcept -{ - return impl_ && get_impl()->is_open(); -} - -//------------------------------------------------------------------------------ - -namespace { - -// Test port range for mocket connections -constexpr std::uint16_t test_port_base = 49200; -constexpr std::uint16_t test_port_range = 100; -std::uint16_t next_test_port = 0; - -std::uint16_t -get_test_port() noexcept -{ - auto port = test_port_base + (next_test_port % test_port_range); - ++next_test_port; - return static_cast(port); -} - -} // namespace - -std::pair -make_mockets(capy::execution_context& ctx, capy::test::fuse& f) -{ - auto& svc = ctx.use_service(); - - // Create the two implementations - auto& impl1 = svc.create_impl(f, true); // m1 checks fuse - auto& impl2 = svc.create_impl(f, false); // m2 does not - - // Link them as peers - impl1.set_peer(&impl2); - impl2.set_peer(&impl1); - - auto& ioc = static_cast(ctx); - auto ex = ioc.get_executor(); - - // Get a test port - std::uint16_t port = get_test_port(); - system::error_code accept_ec; - system::error_code connect_ec; - bool accept_done = false; - bool connect_done = false; - - // Set up loopback connection using acceptor - acceptor acc(ctx); - acc.listen(endpoint(urls::ipv4_address::loopback(), port)); - - // Open impl2's socket for connect - impl2.get_socket().open(); - - // Create a socket to receive the accepted connection - socket accepted_socket(ctx); - - // Launch accept operation - // Note: Pass captures as parameters to store them in the coroutine frame, - // avoiding use-after-scope when the lambda temporary is destroyed. - capy::run_async(ex)( - [](acceptor& a, socket& s, - system::error_code& ec_out, bool& done_out) -> capy::task<> - { - auto [ec] = co_await a.accept(s); - ec_out = ec; - done_out = true; - }(acc, accepted_socket, accept_ec, accept_done)); - - // Launch connect operation - capy::run_async(ex)( - [](socket& s, endpoint ep, - system::error_code& ec_out, bool& done_out) -> capy::task<> - { - auto [ec] = co_await s.connect(ep); - ec_out = ec; - done_out = true; - }(impl2.get_socket(), endpoint(urls::ipv4_address::loopback(), port), - connect_ec, connect_done)); - - // Run until both complete - ioc.run(); - ioc.restart(); - - // Check for errors - if (!accept_done || accept_ec) - { - acc.close(); - throw std::runtime_error("mocket accept failed"); - } - - if (!connect_done || connect_ec) - { - acc.close(); - accepted_socket.close(); - throw std::runtime_error("mocket connect failed"); - } - - // Transfer the accepted socket to impl1 - impl1.get_socket() = std::move(accepted_socket); - - acc.close(); - - // Create the mocket wrappers - mocket m1(&impl1); - mocket m2(&impl2); - - return {std::move(m1), std::move(m2)}; -} - -} // namespace test -} // namespace corosio -} // namespace boost +// +// Copyright (c) 2025 Vinnie Falco (vinnie dot falco at gmail dot com) +// +// Distributed under the Boost Software License, Version 1.0. (See accompanying +// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) +// +// Official repository: https://github.com/cppalliance/corosio +// + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include +#include +#include +#include + +namespace boost { +namespace corosio { +namespace test { + +namespace { + +constexpr std::size_t max_buffers = 8; +using buffer_array = std::array; + +} // namespace + +//------------------------------------------------------------------------------ + +class mocket_service; + +class mocket_impl + : public io_stream::io_stream_impl + , public capy::intrusive_list::node +{ + mocket_service& svc_; + capy::test::fuse& fuse_; + socket sock_; + std::string provide_; + std::string expect_; + mocket_impl* peer_ = nullptr; + bool check_fuse_; + +public: + mocket_impl( + mocket_service& svc, + capy::execution_context& ctx, + capy::test::fuse& f, + bool check_fuse); + + void set_peer(mocket_impl* peer) noexcept + { + peer_ = peer; + } + + socket& get_socket() noexcept + { + return sock_; + } + + void provide(std::string s) + { + provide_.append(std::move(s)); + } + + void expect(std::string s) + { + expect_.append(std::move(s)); + } + + system::error_code close(); + + bool is_open() const noexcept + { + return sock_.is_open(); + } + + void release() override; + + void read_some( + std::coroutine_handle<> h, + capy::any_executor_ref d, + capy::buffer_param buffers, + std::stop_token token, + system::error_code* ec, + std::size_t* bytes_transferred) override; + + void write_some( + std::coroutine_handle<> h, + capy::any_executor_ref d, + capy::buffer_param buffers, + std::stop_token token, + system::error_code* ec, + std::size_t* bytes_transferred) override; + +private: + std::size_t + fill_from_provide( + buffer_array const& bufs, + std::size_t count); + + bool + validate_expect( + buffer_array const& bufs, + std::size_t count, + std::size_t total_size); +}; + +//------------------------------------------------------------------------------ + +class mocket_service + : public capy::execution_context::service +{ + capy::execution_context& ctx_; + capy::intrusive_list impls_; + +public: + explicit mocket_service(capy::execution_context& ctx) + : ctx_(ctx) + { + } + + mocket_impl& + create_impl(capy::test::fuse& f, bool check_fuse) + { + auto* impl = new mocket_impl(*this, ctx_, f, check_fuse); + impls_.push_back(impl); + return *impl; + } + + void + destroy_impl(mocket_impl& impl) + { + impls_.remove(&impl); + delete &impl; + } + +protected: + void shutdown() override + { + while (auto* impl = impls_.pop_front()) + delete impl; + } +}; + +//------------------------------------------------------------------------------ + +mocket_impl:: +mocket_impl( + mocket_service& svc, + capy::execution_context& ctx, + capy::test::fuse& f, + bool check_fuse) + : svc_(svc) + , fuse_(f) + , sock_(ctx) + , check_fuse_(check_fuse) +{ +} + +system::error_code +mocket_impl:: +close() +{ + // Verify test expectations + if (!expect_.empty()) + { + fuse_.fail(); + sock_.close(); + return capy::error::test_failure; + } + if (!provide_.empty()) + { + fuse_.fail(); + sock_.close(); + return capy::error::test_failure; + } + + sock_.close(); + return {}; +} + +void +mocket_impl:: +release() +{ + svc_.destroy_impl(*this); +} + +std::size_t +mocket_impl:: +fill_from_provide( + buffer_array const& bufs, + std::size_t count) +{ + if (!peer_ || peer_->provide_.empty()) + return 0; + + std::size_t total = 0; + auto& src = peer_->provide_; + + for (std::size_t i = 0; i < count && !src.empty(); ++i) + { + auto const n = std::min(bufs[i].size(), src.size()); + std::memcpy(bufs[i].data(), src.data(), n); + src.erase(0, n); + total += n; + } + return total; +} + +bool +mocket_impl:: +validate_expect( + buffer_array const& bufs, + std::size_t count, + std::size_t total_size) +{ + if (expect_.empty()) + return true; + + // Build the write data + std::string written; + written.reserve(total_size); + for (std::size_t i = 0; i < count; ++i) + { + written.append( + static_cast(bufs[i].data()), + bufs[i].size()); + } + + // Check if written data matches expect prefix + auto const n = std::min(written.size(), expect_.size()); + if (std::memcmp(written.data(), expect_.data(), n) != 0) + { + fuse_.fail(); + return false; + } + + // Consume matched portion + expect_.erase(0, n); + return true; +} + +void +mocket_impl:: +read_some( + std::coroutine_handle<> h, + capy::any_executor_ref d, + capy::buffer_param buffers, + std::stop_token token, + system::error_code* ec, + std::size_t* bytes_transferred) +{ + (void)token; + // Fuse check for m1 only + if (check_fuse_) + { + auto fail_ec = fuse_.maybe_fail(); + if (fail_ec) + { + *ec = fail_ec; + *bytes_transferred = 0; + d.dispatch(capy::any_coro{h}).resume(); + return; + } + } + + // Check if peer has staged data - if so, serve from provide buffer + if (peer_ && !peer_->provide_.empty()) + { + // Extract buffers only when we need them for staged data + buffer_array bufs{}; + std::size_t count = buffers.copy_to(bufs.data(), max_buffers); + + std::size_t n = fill_from_provide(bufs, count); + *ec = {}; + *bytes_transferred = n; + d.dispatch(capy::any_coro{h}).resume(); + return; + } + + // Pass through to the real socket (don't extract buffers - forward as-is) + sock_.get_impl()->read_some(h, d, buffers, token, ec, bytes_transferred); +} + +void +mocket_impl:: +write_some( + std::coroutine_handle<> h, + capy::any_executor_ref d, + capy::buffer_param buffers, + std::stop_token token, + system::error_code* ec, + std::size_t* bytes_transferred) +{ + (void)token; + // Fuse check for m1 only + if (check_fuse_) + { + auto fail_ec = fuse_.maybe_fail(); + if (fail_ec) + { + *ec = fail_ec; + *bytes_transferred = 0; + d.dispatch(capy::any_coro{h}).resume(); + return; + } + } + + // Check if we have staged expectations to validate + if (!expect_.empty()) + { + // Extract buffers only when we need them for validation + buffer_array bufs{}; + std::size_t count = buffers.copy_to(bufs.data(), max_buffers); + + // Calculate total size + std::size_t total_size = 0; + for (std::size_t i = 0; i < count; ++i) + total_size += bufs[i].size(); + + if (!validate_expect(bufs, count, total_size)) + { + *ec = capy::error::test_failure; + *bytes_transferred = 0; + d.dispatch(capy::any_coro{h}).resume(); + return; + } + + // If all expected data was validated, report success + *ec = {}; + *bytes_transferred = total_size; + d.dispatch(capy::any_coro{h}).resume(); + return; + } + + // Pass through to the real socket (don't extract buffers - forward as-is) + sock_.get_impl()->write_some(h, d, buffers, token, ec, bytes_transferred); +} + +//------------------------------------------------------------------------------ + +mocket_impl* +mocket:: +get_impl() const noexcept +{ + return static_cast(impl_); +} + +mocket:: +~mocket() +{ + if (impl_) + impl_->release(); + impl_ = nullptr; +} + +mocket:: +mocket(mocket_impl* impl) noexcept + : io_stream(impl->get_socket().context()) +{ + impl_ = impl; +} + +mocket:: +mocket(mocket&& other) noexcept + : io_stream(other.context()) +{ + impl_ = other.impl_; + other.impl_ = nullptr; +} + +mocket& +mocket:: +operator=(mocket&& other) noexcept +{ + if (this != &other) + { + if (impl_) + impl_->release(); + impl_ = other.impl_; + other.impl_ = nullptr; + } + return *this; +} + +void +mocket:: +provide(std::string s) +{ + get_impl()->provide(std::move(s)); +} + +void +mocket:: +expect(std::string s) +{ + get_impl()->expect(std::move(s)); +} + +system::error_code +mocket:: +close() +{ + if (!impl_) + return {}; + return get_impl()->close(); +} + +bool +mocket:: +is_open() const noexcept +{ + return impl_ && get_impl()->is_open(); +} + +//------------------------------------------------------------------------------ + +namespace { + +// Test port range for mocket connections +constexpr std::uint16_t test_port_base = 49200; +constexpr std::uint16_t test_port_range = 100; +std::uint16_t next_test_port = 0; + +std::uint16_t +get_test_port() noexcept +{ + auto port = test_port_base + (next_test_port % test_port_range); + ++next_test_port; + return static_cast(port); +} + +} // namespace + +std::pair +make_mockets(capy::execution_context& ctx, capy::test::fuse& f) +{ + auto& svc = ctx.use_service(); + + // Create the two implementations + auto& impl1 = svc.create_impl(f, true); // m1 checks fuse + auto& impl2 = svc.create_impl(f, false); // m2 does not + + // Link them as peers + impl1.set_peer(&impl2); + impl2.set_peer(&impl1); + + auto& ioc = static_cast(ctx); + auto ex = ioc.get_executor(); + + // Get a test port + std::uint16_t port = get_test_port(); + system::error_code accept_ec; + system::error_code connect_ec; + bool accept_done = false; + bool connect_done = false; + + // Set up loopback connection using acceptor + acceptor acc(ctx); + acc.listen(endpoint(urls::ipv4_address::loopback(), port)); + + // Open impl2's socket for connect + impl2.get_socket().open(); + + // Create a socket to receive the accepted connection + socket accepted_socket(ctx); + + // Launch accept operation + // Note: Pass captures as parameters to store them in the coroutine frame, + // avoiding use-after-scope when the lambda temporary is destroyed. + capy::run_async(ex)( + [](acceptor& a, socket& s, + system::error_code& ec_out, bool& done_out) -> capy::task<> + { + auto [ec] = co_await a.accept(s); + ec_out = ec; + done_out = true; + }(acc, accepted_socket, accept_ec, accept_done)); + + // Launch connect operation + capy::run_async(ex)( + [](socket& s, endpoint ep, + system::error_code& ec_out, bool& done_out) -> capy::task<> + { + auto [ec] = co_await s.connect(ep); + ec_out = ec; + done_out = true; + }(impl2.get_socket(), endpoint(urls::ipv4_address::loopback(), port), + connect_ec, connect_done)); + + // Run until both complete + ioc.run(); + ioc.restart(); + + // Check for errors + if (!accept_done || accept_ec) + { + acc.close(); + throw std::runtime_error("mocket accept failed"); + } + + if (!connect_done || connect_ec) + { + acc.close(); + accepted_socket.close(); + throw std::runtime_error("mocket connect failed"); + } + + // Transfer the accepted socket to impl1 + impl1.get_socket() = std::move(accepted_socket); + + acc.close(); + + // Create the mocket wrappers + mocket m1(&impl1); + mocket m2(&impl2); + + return {std::move(m1), std::move(m2)}; +} + +} // namespace test +} // namespace corosio +} // namespace boost diff --git a/src/openssl/src/openssl_stream.cpp b/src/openssl/src/openssl_stream.cpp index b7ae121..167ffe3 100644 --- a/src/openssl/src/openssl_stream.cpp +++ b/src/openssl/src/openssl_stream.cpp @@ -1,774 +1,774 @@ -// -// Copyright (c) 2025 Vinnie Falco (vinnie dot falco at gmail dot com) -// -// Distributed under the Boost Software License, Version 1.0. (See accompanying -// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) -// -// Official repository: https://github.com/cppalliance/corosio -// - -#include -#include -#include -#include -#include - -// Internal context implementation -#include "src/tls/detail/context_impl.hpp" - -#include -#include -#include -#include - -#include -#include -#include -#include - -/* - openssl_stream Architecture - =========================== - - TLS layer wrapping an underlying io_stream. Supports one concurrent - read_some and one concurrent write_some (like Asio's ssl::stream). - - Data Flow (using BIO pairs) - --------------------------- - App -> SSL_write -> int_bio_ -> BIO_read(ext_bio_) -> out_buf_ -> s_.write_some -> Network - App <- SSL_read <- int_bio_ <- BIO_write(ext_bio_) <- in_buf_ <- s_.read_some <- Network - - WANT_READ / WANT_WRITE Pattern - ------------------------------ - OpenSSL's SSL_read/SSL_write return SSL_ERROR_WANT_READ or SSL_ERROR_WANT_WRITE - when they need I/O. Our coroutine handles this by: - - 1. Call SSL_read or SSL_write - 2. Check for pending output in ext_bio_ via BIO_ctrl_pending - 3. If output pending: write to network via s_.write_some - 4. If SSL_ERROR_WANT_READ: read from network into ext_bio_ via s_.read_some + BIO_write - 5. Loop back to step 1 - - Renegotiation causes cross-direction I/O: SSL_read may need to write - handshake data, SSL_write may need to read. Each operation handles - whatever I/O direction OpenSSL requests. - - Key Types - --------- - - openssl_stream_impl_ : tls_stream_impl -- the impl stored in io_object::impl_ - - do_read_some, do_write_some -- inner coroutines with WANT_* loops -*/ - -namespace boost { -namespace corosio { - -namespace { - -// Default buffer size for TLS I/O -constexpr std::size_t default_buffer_size = 16384; - -// Maximum number of buffers to handle in a single operation -constexpr std::size_t max_buffers = 8; - -// Buffer array type for coroutine parameters (copied into frame) -using buffer_array = std::array; - -} // namespace - -//------------------------------------------------------------------------------ -// -// Native context caching -// -//------------------------------------------------------------------------------ - -namespace tls { -namespace detail { - -/** Cached OpenSSL context owning SSL_CTX. - - Created on first stream construction for a given tls::context, - then reused for subsequent streams sharing that context. -*/ -class openssl_native_context - : public native_context_base -{ -public: - SSL_CTX* ctx_; - - explicit - openssl_native_context( context_data const& cd ) - : ctx_( nullptr ) - { - // Create SSL_CTX supporting both client and server - ctx_ = SSL_CTX_new( TLS_method() ); - if( !ctx_ ) - return; - - // Set modes for partial writes and moving buffers - SSL_CTX_set_mode( ctx_, SSL_MODE_ENABLE_PARTIAL_WRITE ); - SSL_CTX_set_mode( ctx_, SSL_MODE_ACCEPT_MOVING_WRITE_BUFFER ); -#if defined( SSL_MODE_RELEASE_BUFFERS ) - SSL_CTX_set_mode( ctx_, SSL_MODE_RELEASE_BUFFERS ); -#endif - - // Apply verify mode from config - int verify_mode_flag = SSL_VERIFY_NONE; - if( cd.verification_mode == verify_mode::peer ) - verify_mode_flag = SSL_VERIFY_PEER; - else if( cd.verification_mode == verify_mode::require_peer ) - verify_mode_flag = SSL_VERIFY_PEER | SSL_VERIFY_FAIL_IF_NO_PEER_CERT; - SSL_CTX_set_verify( ctx_, verify_mode_flag, nullptr ); - - // Apply certificates if provided - if( !cd.entity_certificate.empty() ) - { - BIO* bio = BIO_new_mem_buf( - cd.entity_certificate.data(), - static_cast( cd.entity_certificate.size() ) ); - if( bio ) - { - X509* cert = nullptr; - if( cd.entity_cert_format == file_format::pem ) - cert = PEM_read_bio_X509( bio, nullptr, nullptr, nullptr ); - else - cert = d2i_X509_bio( bio, nullptr ); - if( cert ) - { - SSL_CTX_use_certificate( ctx_, cert ); - X509_free( cert ); - } - BIO_free( bio ); - } - } - - // Apply private key if provided - if( !cd.private_key.empty() ) - { - BIO* bio = BIO_new_mem_buf( - cd.private_key.data(), - static_cast( cd.private_key.size() ) ); - if( bio ) - { - EVP_PKEY* pkey = nullptr; - if( cd.private_key_format == file_format::pem ) - pkey = PEM_read_bio_PrivateKey( bio, nullptr, nullptr, nullptr ); - else - pkey = d2i_PrivateKey_bio( bio, nullptr ); - if( pkey ) - { - SSL_CTX_use_PrivateKey( ctx_, pkey ); - EVP_PKEY_free( pkey ); - } - BIO_free( bio ); - } - } - - // Apply CA certificates for verification - X509_STORE* store = SSL_CTX_get_cert_store( ctx_ ); - for( auto const& ca : cd.ca_certificates ) - { - BIO* bio = BIO_new_mem_buf( ca.data(), static_cast( ca.size() ) ); - if( bio ) - { - X509* cert = PEM_read_bio_X509( bio, nullptr, nullptr, nullptr ); - if( cert ) - { - X509_STORE_add_cert( store, cert ); - X509_free( cert ); - } - BIO_free( bio ); - } - } - - // Apply verify depth - SSL_CTX_set_verify_depth( ctx_, cd.verify_depth ); - - // Apply cipher suites if provided - if( !cd.ciphersuites.empty() ) - { - SSL_CTX_set_security_level( ctx_, 0 ); - SSL_CTX_set_cipher_list( ctx_, cd.ciphersuites.c_str() ); - } - } - - ~openssl_native_context() override - { - if( ctx_ ) - SSL_CTX_free( ctx_ ); - } -}; - -/** Get or create cached SSL_CTX for this context. - - @param cd The context implementation. - - @return Pointer to the cached SSL_CTX. -*/ -inline SSL_CTX* -get_openssl_context( context_data const& cd ) -{ - static char key; - auto* p = cd.find( &key, [&] - { - return new openssl_native_context( cd ); - }); - return static_cast( p )->ctx_; -} - -} // namespace detail -} // namespace tls - -//------------------------------------------------------------------------------ - -struct openssl_stream_impl_ - : tls_stream::tls_stream_impl -{ - io_stream& s_; - tls::context ctx_; // holds ref to cached native context - SSL* ssl_ = nullptr; - BIO* ext_bio_ = nullptr; - - // Buffers for network I/O - std::vector in_buf_; - std::vector out_buf_; - - // Renegotiation can cause both TLS read/write to access the socket - capy::async_mutex io_mutex_; - - //-------------------------------------------------------------------------- - - openssl_stream_impl_( io_stream& s, tls::context ctx ) - : s_( s ) - , ctx_( std::move( ctx ) ) - { - in_buf_.resize( default_buffer_size ); - out_buf_.resize( default_buffer_size ); - } - - ~openssl_stream_impl_() - { - if( ext_bio_ ) - BIO_free( ext_bio_ ); - if( ssl_ ) - SSL_free( ssl_ ); - // SSL_CTX* is owned by cached native context, not freed here - } - - //-------------------------------------------------------------------------- - // Helper to flush pending output from BIO to network - //-------------------------------------------------------------------------- - - capy::task - flush_output() - { - while(BIO_ctrl_pending(ext_bio_) > 0) - { - int pending = static_cast(BIO_ctrl_pending(ext_bio_)); - int to_read = (std::min)(pending, static_cast(out_buf_.size())); - int n = BIO_read(ext_bio_, out_buf_.data(), to_read); - if(n <= 0) - break; - - // Write to underlying stream - auto guard = co_await io_mutex_.scoped_lock(); - auto [ec, written] = co_await s_.write_some( - capy::mutable_buffer(out_buf_.data(), static_cast(n))); - if(ec) - co_return ec; - } - co_return system::error_code{}; - } - - capy::task - read_input() - { - auto guard = co_await io_mutex_.scoped_lock(); - auto [ec, n] = co_await s_.read_some( - capy::mutable_buffer(in_buf_.data(), in_buf_.size())); - if(ec) - co_return ec; - - // Feed data into OpenSSL - int written = BIO_write(ext_bio_, in_buf_.data(), static_cast(n)); - (void)written; - - co_return system::error_code{}; - } - - //-------------------------------------------------------------------------- - // Inner coroutines for TLS read/write operations - //-------------------------------------------------------------------------- - - capy::task<> - do_read_some( - buffer_array dest_bufs, - std::size_t buf_count, - std::stop_token token, - system::error_code* ec_out, - std::size_t* bytes_out, - std::coroutine_handle<> continuation, - capy::any_executor_ref d) - { - system::error_code ec; - std::size_t total_read = 0; - - // Process each destination buffer - for(std::size_t i = 0; i < buf_count && !token.stop_requested(); ++i) - { - char* dest = static_cast(dest_bufs[i].data()); - int remaining = static_cast(dest_bufs[i].size()); - - while(remaining > 0 && !token.stop_requested()) - { - ERR_clear_error(); - int ret = SSL_read(ssl_, dest, remaining); - - if(ret > 0) - { - dest += ret; - remaining -= ret; - total_read += static_cast(ret); - - // For read_some semantics, return after first successful read - if(total_read > 0) - goto done; - } - else - { - int err = SSL_get_error(ssl_, ret); - - if(err == SSL_ERROR_WANT_WRITE) - { - // Flush pending output (renegotiation) - ec = co_await flush_output(); - if(ec) - goto done; - } - else if(err == SSL_ERROR_WANT_READ) - { - // First flush any pending output - ec = co_await flush_output(); - if(ec) - goto done; - - // Then read from network - ec = co_await read_input(); - if(ec) - { - if(ec == make_error_code(capy::error::eof)) - { - // Check if we got a proper shutdown - if(SSL_get_shutdown(ssl_) & SSL_RECEIVED_SHUTDOWN) - ec = make_error_code(capy::error::eof); - } - goto done; - } - } - else if(err == SSL_ERROR_ZERO_RETURN) - { - ec = make_error_code(capy::error::eof); - goto done; - } - else if(err == SSL_ERROR_SYSCALL) - { - unsigned long ssl_err = ERR_get_error(); - if(ssl_err == 0) - ec = make_error_code(capy::error::eof); - else - ec = system::error_code( - static_cast(ssl_err), system::system_category()); - goto done; - } - else - { - unsigned long ssl_err = ERR_get_error(); - ec = system::error_code( - static_cast(ssl_err), system::system_category()); - goto done; - } - } - } - } - - done: - if(token.stop_requested()) - ec = make_error_code(system::errc::operation_canceled); - - *ec_out = ec; - *bytes_out = total_read; - - d.dispatch(capy::any_coro{continuation}).resume(); - co_return; - } - - capy::task<> - do_write_some( - buffer_array src_bufs, - std::size_t buf_count, - std::stop_token token, - system::error_code* ec_out, - std::size_t* bytes_out, - std::coroutine_handle<> continuation, - capy::any_executor_ref d) - { - system::error_code ec; - std::size_t total_written = 0; - - // Process each source buffer - for(std::size_t i = 0; i < buf_count && !token.stop_requested(); ++i) - { - char const* src = static_cast(src_bufs[i].data()); - int remaining = static_cast(src_bufs[i].size()); - - while(remaining > 0 && !token.stop_requested()) - { - ERR_clear_error(); - int ret = SSL_write(ssl_, src, remaining); - - if(ret > 0) - { - src += ret; - remaining -= ret; - total_written += static_cast(ret); - - // For write_some semantics, flush and return after first successful write - if(total_written > 0) - { - ec = co_await flush_output(); - goto done; - } - } - else - { - int err = SSL_get_error(ssl_, ret); - - if(err == SSL_ERROR_WANT_WRITE) - { - ec = co_await flush_output(); - if(ec) - goto done; - } - else if(err == SSL_ERROR_WANT_READ) - { - // Renegotiation - flush then read - ec = co_await flush_output(); - if(ec) - goto done; - - ec = co_await read_input(); - if(ec) - goto done; - } - else - { - unsigned long ssl_err = ERR_get_error(); - ec = system::error_code( - static_cast(ssl_err), system::system_category()); - goto done; - } - } - } - } - - done: - if(token.stop_requested()) - ec = make_error_code(system::errc::operation_canceled); - - *ec_out = ec; - *bytes_out = total_written; - - d.dispatch(capy::any_coro{continuation}).resume(); - co_return; - } - - capy::task<> - do_handshake( - int type, - std::stop_token token, - system::error_code* ec_out, - std::coroutine_handle<> continuation, - capy::any_executor_ref d) - { - system::error_code ec; - - while(!token.stop_requested()) - { - ERR_clear_error(); - int ret; - if(type == openssl_stream::client) - ret = SSL_connect(ssl_); - else - ret = SSL_accept(ssl_); - - if(ret == 1) - { - // Handshake completed - flush any remaining output - ec = co_await flush_output(); - break; - } - else - { - int err = SSL_get_error(ssl_, ret); - - if(err == SSL_ERROR_WANT_WRITE) - { - ec = co_await flush_output(); - if(ec) - break; - } - else if(err == SSL_ERROR_WANT_READ) - { - // Flush output first (e.g., ClientHello) - ec = co_await flush_output(); - if(ec) - break; - - // Then read response - ec = co_await read_input(); - if(ec) - break; - } - else - { - unsigned long ssl_err = ERR_get_error(); - ec = system::error_code( - static_cast(ssl_err), system::system_category()); - break; - } - } - } - - if(token.stop_requested()) - ec = make_error_code(system::errc::operation_canceled); - - *ec_out = ec; - - d.dispatch(capy::any_coro{continuation}).resume(); - co_return; - } - - capy::task<> - do_shutdown( - std::stop_token token, - system::error_code* ec_out, - std::coroutine_handle<> continuation, - capy::any_executor_ref d) - { - system::error_code ec; - - while(!token.stop_requested()) - { - ERR_clear_error(); - int ret = SSL_shutdown(ssl_); - - if(ret == 1) - { - // Bidirectional shutdown complete - ec = co_await flush_output(); - break; - } - else if(ret == 0) - { - // Sent close_notify, need to receive peer's - ec = co_await flush_output(); - if(ec) - break; - - // Continue to receive peer's close_notify - ec = co_await read_input(); - if(ec) - { - // EOF is expected during shutdown - if(ec == make_error_code(capy::error::eof)) - ec = {}; - break; - } - } - else - { - int err = SSL_get_error(ssl_, ret); - - if(err == SSL_ERROR_WANT_WRITE) - { - ec = co_await flush_output(); - if(ec) - break; - } - else if(err == SSL_ERROR_WANT_READ) - { - ec = co_await flush_output(); - if(ec) - break; - - ec = co_await read_input(); - if(ec) - { - if(ec == make_error_code(capy::error::eof)) - ec = {}; - break; - } - } - else - { - unsigned long ssl_err = ERR_get_error(); - if(ssl_err == 0 && err == SSL_ERROR_SYSCALL) - { - // Connection closed without close_notify - acceptable - ec = {}; - } - else - { - ec = system::error_code( - static_cast(ssl_err), system::system_category()); - } - break; - } - } - } - - if(token.stop_requested()) - ec = make_error_code(system::errc::operation_canceled); - - *ec_out = ec; - - d.dispatch(capy::any_coro{continuation}).resume(); - co_return; - } - - //-------------------------------------------------------------------------- - // io_stream_impl interface - //-------------------------------------------------------------------------- - - void release() override - { - delete this; - } - - void read_some( - std::coroutine_handle<> h, - capy::any_executor_ref d, - capy::any_bufref& param, - std::stop_token token, - system::error_code* ec, - std::size_t* bytes) override - { - buffer_array bufs{}; - std::size_t count = param.copy_to(bufs.data(), max_buffers); - - capy::run_async(d)( - do_read_some(bufs, count, token, ec, bytes, h, d)); - } - - void write_some( - std::coroutine_handle<> h, - capy::any_executor_ref d, - capy::any_bufref& param, - std::stop_token token, - system::error_code* ec, - std::size_t* bytes) override - { - buffer_array bufs{}; - std::size_t count = param.copy_to(bufs.data(), max_buffers); - - capy::run_async(d)( - do_write_some(bufs, count, token, ec, bytes, h, d)); - } - - void handshake( - std::coroutine_handle<> h, - capy::any_executor_ref d, - int type, - std::stop_token token, - system::error_code* ec) override - { - capy::run_async(d)( - do_handshake(type, token, ec, h, d)); - } - - void shutdown( - std::coroutine_handle<> h, - capy::any_executor_ref d, - std::stop_token token, - system::error_code* ec) override - { - capy::run_async(d)( - do_shutdown(token, ec, h, d)); - } - - //-------------------------------------------------------------------------- - // Initialization - //-------------------------------------------------------------------------- - - system::error_code - init_ssl() - { - // Get cached SSL_CTX from tls::context - auto& impl = tls::detail::get_context_data( ctx_ ); - SSL_CTX* native_ctx = tls::detail::get_openssl_context( impl ); - if( !native_ctx ) - { - unsigned long err = ERR_get_error(); - return system::error_code( - static_cast( err ), system::system_category() ); - } - - // Create SSL session from cached context - ssl_ = SSL_new( native_ctx ); - if( !ssl_ ) - { - unsigned long err = ERR_get_error(); - return system::error_code( - static_cast( err ), system::system_category() ); - } - - // Create BIO pair for I/O - BIO* int_bio = nullptr; - if( !BIO_new_bio_pair( &int_bio, 0, &ext_bio_, 0 ) ) - { - unsigned long err = ERR_get_error(); - SSL_free( ssl_ ); - ssl_ = nullptr; - return system::error_code( - static_cast( err ), system::system_category() ); - } - - // Attach internal BIO to SSL (SSL takes ownership) - SSL_set_bio( ssl_, int_bio, int_bio ); - - // Apply per-session config (SNI) from context - if( !impl.hostname.empty() ) - { - SSL_set_tlsext_host_name( ssl_, impl.hostname.c_str() ); - } - - return {}; - } -}; - -//------------------------------------------------------------------------------ - -openssl_stream:: -openssl_stream( io_stream& stream, tls::context ctx ) - : tls_stream( stream ) -{ - auto* impl = new openssl_stream_impl_( s_, std::move( ctx ) ); - - auto ec = impl->init_ssl(); - if( ec ) - { - delete impl; - return; - } - - impl_ = impl; -} - -openssl_stream:: -~openssl_stream() -{ - if( impl_ ) - impl_->release(); -} - -} // namespace corosio -} // namespace boost +// +// Copyright (c) 2025 Vinnie Falco (vinnie dot falco at gmail dot com) +// +// Distributed under the Boost Software License, Version 1.0. (See accompanying +// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) +// +// Official repository: https://github.com/cppalliance/corosio +// + +#include +#include +#include +#include +#include + +// Internal context implementation +#include "src/tls/detail/context_impl.hpp" + +#include +#include +#include +#include + +#include +#include +#include +#include + +/* + openssl_stream Architecture + =========================== + + TLS layer wrapping an underlying io_stream. Supports one concurrent + read_some and one concurrent write_some (like Asio's ssl::stream). + + Data Flow (using BIO pairs) + --------------------------- + App -> SSL_write -> int_bio_ -> BIO_read(ext_bio_) -> out_buf_ -> s_.write_some -> Network + App <- SSL_read <- int_bio_ <- BIO_write(ext_bio_) <- in_buf_ <- s_.read_some <- Network + + WANT_READ / WANT_WRITE Pattern + ------------------------------ + OpenSSL's SSL_read/SSL_write return SSL_ERROR_WANT_READ or SSL_ERROR_WANT_WRITE + when they need I/O. Our coroutine handles this by: + + 1. Call SSL_read or SSL_write + 2. Check for pending output in ext_bio_ via BIO_ctrl_pending + 3. If output pending: write to network via s_.write_some + 4. If SSL_ERROR_WANT_READ: read from network into ext_bio_ via s_.read_some + BIO_write + 5. Loop back to step 1 + + Renegotiation causes cross-direction I/O: SSL_read may need to write + handshake data, SSL_write may need to read. Each operation handles + whatever I/O direction OpenSSL requests. + + Key Types + --------- + - openssl_stream_impl_ : tls_stream_impl -- the impl stored in io_object::impl_ + - do_read_some, do_write_some -- inner coroutines with WANT_* loops +*/ + +namespace boost { +namespace corosio { + +namespace { + +// Default buffer size for TLS I/O +constexpr std::size_t default_buffer_size = 16384; + +// Maximum number of buffers to handle in a single operation +constexpr std::size_t max_buffers = 8; + +// Buffer array type for coroutine parameters (copied into frame) +using buffer_array = std::array; + +} // namespace + +//------------------------------------------------------------------------------ +// +// Native context caching +// +//------------------------------------------------------------------------------ + +namespace tls { +namespace detail { + +/** Cached OpenSSL context owning SSL_CTX. + + Created on first stream construction for a given tls::context, + then reused for subsequent streams sharing that context. +*/ +class openssl_native_context + : public native_context_base +{ +public: + SSL_CTX* ctx_; + + explicit + openssl_native_context( context_data const& cd ) + : ctx_( nullptr ) + { + // Create SSL_CTX supporting both client and server + ctx_ = SSL_CTX_new( TLS_method() ); + if( !ctx_ ) + return; + + // Set modes for partial writes and moving buffers + SSL_CTX_set_mode( ctx_, SSL_MODE_ENABLE_PARTIAL_WRITE ); + SSL_CTX_set_mode( ctx_, SSL_MODE_ACCEPT_MOVING_WRITE_BUFFER ); +#if defined( SSL_MODE_RELEASE_BUFFERS ) + SSL_CTX_set_mode( ctx_, SSL_MODE_RELEASE_BUFFERS ); +#endif + + // Apply verify mode from config + int verify_mode_flag = SSL_VERIFY_NONE; + if( cd.verification_mode == verify_mode::peer ) + verify_mode_flag = SSL_VERIFY_PEER; + else if( cd.verification_mode == verify_mode::require_peer ) + verify_mode_flag = SSL_VERIFY_PEER | SSL_VERIFY_FAIL_IF_NO_PEER_CERT; + SSL_CTX_set_verify( ctx_, verify_mode_flag, nullptr ); + + // Apply certificates if provided + if( !cd.entity_certificate.empty() ) + { + BIO* bio = BIO_new_mem_buf( + cd.entity_certificate.data(), + static_cast( cd.entity_certificate.size() ) ); + if( bio ) + { + X509* cert = nullptr; + if( cd.entity_cert_format == file_format::pem ) + cert = PEM_read_bio_X509( bio, nullptr, nullptr, nullptr ); + else + cert = d2i_X509_bio( bio, nullptr ); + if( cert ) + { + SSL_CTX_use_certificate( ctx_, cert ); + X509_free( cert ); + } + BIO_free( bio ); + } + } + + // Apply private key if provided + if( !cd.private_key.empty() ) + { + BIO* bio = BIO_new_mem_buf( + cd.private_key.data(), + static_cast( cd.private_key.size() ) ); + if( bio ) + { + EVP_PKEY* pkey = nullptr; + if( cd.private_key_format == file_format::pem ) + pkey = PEM_read_bio_PrivateKey( bio, nullptr, nullptr, nullptr ); + else + pkey = d2i_PrivateKey_bio( bio, nullptr ); + if( pkey ) + { + SSL_CTX_use_PrivateKey( ctx_, pkey ); + EVP_PKEY_free( pkey ); + } + BIO_free( bio ); + } + } + + // Apply CA certificates for verification + X509_STORE* store = SSL_CTX_get_cert_store( ctx_ ); + for( auto const& ca : cd.ca_certificates ) + { + BIO* bio = BIO_new_mem_buf( ca.data(), static_cast( ca.size() ) ); + if( bio ) + { + X509* cert = PEM_read_bio_X509( bio, nullptr, nullptr, nullptr ); + if( cert ) + { + X509_STORE_add_cert( store, cert ); + X509_free( cert ); + } + BIO_free( bio ); + } + } + + // Apply verify depth + SSL_CTX_set_verify_depth( ctx_, cd.verify_depth ); + + // Apply cipher suites if provided + if( !cd.ciphersuites.empty() ) + { + SSL_CTX_set_security_level( ctx_, 0 ); + SSL_CTX_set_cipher_list( ctx_, cd.ciphersuites.c_str() ); + } + } + + ~openssl_native_context() override + { + if( ctx_ ) + SSL_CTX_free( ctx_ ); + } +}; + +/** Get or create cached SSL_CTX for this context. + + @param cd The context implementation. + + @return Pointer to the cached SSL_CTX. +*/ +inline SSL_CTX* +get_openssl_context( context_data const& cd ) +{ + static char key; + auto* p = cd.find( &key, [&] + { + return new openssl_native_context( cd ); + }); + return static_cast( p )->ctx_; +} + +} // namespace detail +} // namespace tls + +//------------------------------------------------------------------------------ + +struct openssl_stream_impl_ + : tls_stream::tls_stream_impl +{ + io_stream& s_; + tls::context ctx_; // holds ref to cached native context + SSL* ssl_ = nullptr; + BIO* ext_bio_ = nullptr; + + // Buffers for network I/O + std::vector in_buf_; + std::vector out_buf_; + + // Renegotiation can cause both TLS read/write to access the socket + capy::async_mutex io_mutex_; + + //-------------------------------------------------------------------------- + + openssl_stream_impl_( io_stream& s, tls::context ctx ) + : s_( s ) + , ctx_( std::move( ctx ) ) + { + in_buf_.resize( default_buffer_size ); + out_buf_.resize( default_buffer_size ); + } + + ~openssl_stream_impl_() + { + if( ext_bio_ ) + BIO_free( ext_bio_ ); + if( ssl_ ) + SSL_free( ssl_ ); + // SSL_CTX* is owned by cached native context, not freed here + } + + //-------------------------------------------------------------------------- + // Helper to flush pending output from BIO to network + //-------------------------------------------------------------------------- + + capy::task + flush_output() + { + while(BIO_ctrl_pending(ext_bio_) > 0) + { + int pending = static_cast(BIO_ctrl_pending(ext_bio_)); + int to_read = (std::min)(pending, static_cast(out_buf_.size())); + int n = BIO_read(ext_bio_, out_buf_.data(), to_read); + if(n <= 0) + break; + + // Write to underlying stream + auto guard = co_await io_mutex_.scoped_lock(); + auto [ec, written] = co_await s_.write_some( + capy::mutable_buffer(out_buf_.data(), static_cast(n))); + if(ec) + co_return ec; + } + co_return system::error_code{}; + } + + capy::task + read_input() + { + auto guard = co_await io_mutex_.scoped_lock(); + auto [ec, n] = co_await s_.read_some( + capy::mutable_buffer(in_buf_.data(), in_buf_.size())); + if(ec) + co_return ec; + + // Feed data into OpenSSL + int written = BIO_write(ext_bio_, in_buf_.data(), static_cast(n)); + (void)written; + + co_return system::error_code{}; + } + + //-------------------------------------------------------------------------- + // Inner coroutines for TLS read/write operations + //-------------------------------------------------------------------------- + + capy::task<> + do_read_some( + buffer_array dest_bufs, + std::size_t buf_count, + std::stop_token token, + system::error_code* ec_out, + std::size_t* bytes_out, + std::coroutine_handle<> continuation, + capy::any_executor_ref d) + { + system::error_code ec; + std::size_t total_read = 0; + + // Process each destination buffer + for(std::size_t i = 0; i < buf_count && !token.stop_requested(); ++i) + { + char* dest = static_cast(dest_bufs[i].data()); + int remaining = static_cast(dest_bufs[i].size()); + + while(remaining > 0 && !token.stop_requested()) + { + ERR_clear_error(); + int ret = SSL_read(ssl_, dest, remaining); + + if(ret > 0) + { + dest += ret; + remaining -= ret; + total_read += static_cast(ret); + + // For read_some semantics, return after first successful read + if(total_read > 0) + goto done; + } + else + { + int err = SSL_get_error(ssl_, ret); + + if(err == SSL_ERROR_WANT_WRITE) + { + // Flush pending output (renegotiation) + ec = co_await flush_output(); + if(ec) + goto done; + } + else if(err == SSL_ERROR_WANT_READ) + { + // First flush any pending output + ec = co_await flush_output(); + if(ec) + goto done; + + // Then read from network + ec = co_await read_input(); + if(ec) + { + if(ec == make_error_code(capy::error::eof)) + { + // Check if we got a proper shutdown + if(SSL_get_shutdown(ssl_) & SSL_RECEIVED_SHUTDOWN) + ec = make_error_code(capy::error::eof); + } + goto done; + } + } + else if(err == SSL_ERROR_ZERO_RETURN) + { + ec = make_error_code(capy::error::eof); + goto done; + } + else if(err == SSL_ERROR_SYSCALL) + { + unsigned long ssl_err = ERR_get_error(); + if(ssl_err == 0) + ec = make_error_code(capy::error::eof); + else + ec = system::error_code( + static_cast(ssl_err), system::system_category()); + goto done; + } + else + { + unsigned long ssl_err = ERR_get_error(); + ec = system::error_code( + static_cast(ssl_err), system::system_category()); + goto done; + } + } + } + } + + done: + if(token.stop_requested()) + ec = make_error_code(system::errc::operation_canceled); + + *ec_out = ec; + *bytes_out = total_read; + + d.dispatch(capy::any_coro{continuation}).resume(); + co_return; + } + + capy::task<> + do_write_some( + buffer_array src_bufs, + std::size_t buf_count, + std::stop_token token, + system::error_code* ec_out, + std::size_t* bytes_out, + std::coroutine_handle<> continuation, + capy::any_executor_ref d) + { + system::error_code ec; + std::size_t total_written = 0; + + // Process each source buffer + for(std::size_t i = 0; i < buf_count && !token.stop_requested(); ++i) + { + char const* src = static_cast(src_bufs[i].data()); + int remaining = static_cast(src_bufs[i].size()); + + while(remaining > 0 && !token.stop_requested()) + { + ERR_clear_error(); + int ret = SSL_write(ssl_, src, remaining); + + if(ret > 0) + { + src += ret; + remaining -= ret; + total_written += static_cast(ret); + + // For write_some semantics, flush and return after first successful write + if(total_written > 0) + { + ec = co_await flush_output(); + goto done; + } + } + else + { + int err = SSL_get_error(ssl_, ret); + + if(err == SSL_ERROR_WANT_WRITE) + { + ec = co_await flush_output(); + if(ec) + goto done; + } + else if(err == SSL_ERROR_WANT_READ) + { + // Renegotiation - flush then read + ec = co_await flush_output(); + if(ec) + goto done; + + ec = co_await read_input(); + if(ec) + goto done; + } + else + { + unsigned long ssl_err = ERR_get_error(); + ec = system::error_code( + static_cast(ssl_err), system::system_category()); + goto done; + } + } + } + } + + done: + if(token.stop_requested()) + ec = make_error_code(system::errc::operation_canceled); + + *ec_out = ec; + *bytes_out = total_written; + + d.dispatch(capy::any_coro{continuation}).resume(); + co_return; + } + + capy::task<> + do_handshake( + int type, + std::stop_token token, + system::error_code* ec_out, + std::coroutine_handle<> continuation, + capy::any_executor_ref d) + { + system::error_code ec; + + while(!token.stop_requested()) + { + ERR_clear_error(); + int ret; + if(type == openssl_stream::client) + ret = SSL_connect(ssl_); + else + ret = SSL_accept(ssl_); + + if(ret == 1) + { + // Handshake completed - flush any remaining output + ec = co_await flush_output(); + break; + } + else + { + int err = SSL_get_error(ssl_, ret); + + if(err == SSL_ERROR_WANT_WRITE) + { + ec = co_await flush_output(); + if(ec) + break; + } + else if(err == SSL_ERROR_WANT_READ) + { + // Flush output first (e.g., ClientHello) + ec = co_await flush_output(); + if(ec) + break; + + // Then read response + ec = co_await read_input(); + if(ec) + break; + } + else + { + unsigned long ssl_err = ERR_get_error(); + ec = system::error_code( + static_cast(ssl_err), system::system_category()); + break; + } + } + } + + if(token.stop_requested()) + ec = make_error_code(system::errc::operation_canceled); + + *ec_out = ec; + + d.dispatch(capy::any_coro{continuation}).resume(); + co_return; + } + + capy::task<> + do_shutdown( + std::stop_token token, + system::error_code* ec_out, + std::coroutine_handle<> continuation, + capy::any_executor_ref d) + { + system::error_code ec; + + while(!token.stop_requested()) + { + ERR_clear_error(); + int ret = SSL_shutdown(ssl_); + + if(ret == 1) + { + // Bidirectional shutdown complete + ec = co_await flush_output(); + break; + } + else if(ret == 0) + { + // Sent close_notify, need to receive peer's + ec = co_await flush_output(); + if(ec) + break; + + // Continue to receive peer's close_notify + ec = co_await read_input(); + if(ec) + { + // EOF is expected during shutdown + if(ec == make_error_code(capy::error::eof)) + ec = {}; + break; + } + } + else + { + int err = SSL_get_error(ssl_, ret); + + if(err == SSL_ERROR_WANT_WRITE) + { + ec = co_await flush_output(); + if(ec) + break; + } + else if(err == SSL_ERROR_WANT_READ) + { + ec = co_await flush_output(); + if(ec) + break; + + ec = co_await read_input(); + if(ec) + { + if(ec == make_error_code(capy::error::eof)) + ec = {}; + break; + } + } + else + { + unsigned long ssl_err = ERR_get_error(); + if(ssl_err == 0 && err == SSL_ERROR_SYSCALL) + { + // Connection closed without close_notify - acceptable + ec = {}; + } + else + { + ec = system::error_code( + static_cast(ssl_err), system::system_category()); + } + break; + } + } + } + + if(token.stop_requested()) + ec = make_error_code(system::errc::operation_canceled); + + *ec_out = ec; + + d.dispatch(capy::any_coro{continuation}).resume(); + co_return; + } + + //-------------------------------------------------------------------------- + // io_stream_impl interface + //-------------------------------------------------------------------------- + + void release() override + { + delete this; + } + + void read_some( + std::coroutine_handle<> h, + capy::any_executor_ref d, + capy::buffer_param param, + std::stop_token token, + system::error_code* ec, + std::size_t* bytes) override + { + buffer_array bufs{}; + std::size_t count = param.copy_to(bufs.data(), max_buffers); + + capy::run_async(d)( + do_read_some(bufs, count, token, ec, bytes, h, d)); + } + + void write_some( + std::coroutine_handle<> h, + capy::any_executor_ref d, + capy::buffer_param param, + std::stop_token token, + system::error_code* ec, + std::size_t* bytes) override + { + buffer_array bufs{}; + std::size_t count = param.copy_to(bufs.data(), max_buffers); + + capy::run_async(d)( + do_write_some(bufs, count, token, ec, bytes, h, d)); + } + + void handshake( + std::coroutine_handle<> h, + capy::any_executor_ref d, + int type, + std::stop_token token, + system::error_code* ec) override + { + capy::run_async(d)( + do_handshake(type, token, ec, h, d)); + } + + void shutdown( + std::coroutine_handle<> h, + capy::any_executor_ref d, + std::stop_token token, + system::error_code* ec) override + { + capy::run_async(d)( + do_shutdown(token, ec, h, d)); + } + + //-------------------------------------------------------------------------- + // Initialization + //-------------------------------------------------------------------------- + + system::error_code + init_ssl() + { + // Get cached SSL_CTX from tls::context + auto& impl = tls::detail::get_context_data( ctx_ ); + SSL_CTX* native_ctx = tls::detail::get_openssl_context( impl ); + if( !native_ctx ) + { + unsigned long err = ERR_get_error(); + return system::error_code( + static_cast( err ), system::system_category() ); + } + + // Create SSL session from cached context + ssl_ = SSL_new( native_ctx ); + if( !ssl_ ) + { + unsigned long err = ERR_get_error(); + return system::error_code( + static_cast( err ), system::system_category() ); + } + + // Create BIO pair for I/O + BIO* int_bio = nullptr; + if( !BIO_new_bio_pair( &int_bio, 0, &ext_bio_, 0 ) ) + { + unsigned long err = ERR_get_error(); + SSL_free( ssl_ ); + ssl_ = nullptr; + return system::error_code( + static_cast( err ), system::system_category() ); + } + + // Attach internal BIO to SSL (SSL takes ownership) + SSL_set_bio( ssl_, int_bio, int_bio ); + + // Apply per-session config (SNI) from context + if( !impl.hostname.empty() ) + { + SSL_set_tlsext_host_name( ssl_, impl.hostname.c_str() ); + } + + return {}; + } +}; + +//------------------------------------------------------------------------------ + +openssl_stream:: +openssl_stream( io_stream& stream, tls::context ctx ) + : tls_stream( stream ) +{ + auto* impl = new openssl_stream_impl_( s_, std::move( ctx ) ); + + auto ec = impl->init_ssl(); + if( ec ) + { + delete impl; + return; + } + + impl_ = impl; +} + +openssl_stream:: +~openssl_stream() +{ + if( impl_ ) + impl_->release(); +} + +} // namespace corosio +} // namespace boost diff --git a/src/wolfssl/src/wolfssl_stream.cpp b/src/wolfssl/src/wolfssl_stream.cpp index 262412c..3e51ae6 100644 --- a/src/wolfssl/src/wolfssl_stream.cpp +++ b/src/wolfssl/src/wolfssl_stream.cpp @@ -862,7 +862,7 @@ struct wolfssl_stream_impl_ void read_some( std::coroutine_handle<> h, capy::any_executor_ref d, - capy::any_bufref& param, + capy::buffer_param param, std::stop_token token, system::error_code* ec, std::size_t* bytes) override @@ -880,7 +880,7 @@ struct wolfssl_stream_impl_ void write_some( std::coroutine_handle<> h, capy::any_executor_ref d, - capy::any_bufref& param, + capy::buffer_param param, std::stop_token token, system::error_code* ec, std::size_t* bytes) override