From 36920b9d9c309678277d4b9ab61fa1501a4b0c1b Mon Sep 17 00:00:00 2001 From: Steve Gerbino Date: Mon, 19 Jan 2026 13:53:41 +0100 Subject: [PATCH] feat: Integrate timers with epoll loop, implement posix signals Windows build fixes: - Add BOOST_COROSIO_DECL to tcp_server nested classes (worker_base, workers, launcher) for proper DLL export - Delete copy operations on workers class to prevent MSVC from instantiating copy constructor for vector> - Suppress C4251 warnings for private members with #pragma - Convert push_aw/pop_aw::await_suspend to templates matching IoAwaitable concept POSIX signal_set implementation: - Add posix_signals service using C signal() handlers - Implement global signal state for cross-service coordination - Support signal registration, async wait, cancellation, and queuing Timer service fixes: - Integrate timer_service with posix_scheduler epoll loop - Add calculate_timeout() to wake epoll for timer deadlines - Fix work tracking: use post() instead of dispatch() to prevent premature io_context::run() exit Lambda coroutine lifetime fixes: - Fix stack-use-after-scope bugs in mocket source and tests - Fix 19 lambda coroutines in timer.cpp tests - Fix 15 lambda coroutines in signal_set.cpp tests - Change [&] captures to explicit parameter passing Build system: - Add mocket, signal_set, timer tests to Jamfile --- include/boost/corosio/tcp_server.hpp | 46 +- src/corosio/src/detail/posix_op.hpp | 10 +- src/corosio/src/detail/posix_scheduler.cpp | 224 ++++---- src/corosio/src/detail/posix_scheduler.hpp | 6 +- src/corosio/src/detail/posix_signals.cpp | 574 +++++++++++++++++++-- src/corosio/src/detail/posix_signals.hpp | 231 +++++++++ src/corosio/src/detail/posix_sockets.hpp | 35 +- src/corosio/src/detail/timer_service.cpp | 12 +- src/corosio/src/tcp_server.cpp | 19 - src/corosio/src/test/mocket.cpp | 26 +- test/unit/Jamfile | 3 + test/unit/mocket.cpp | 14 +- test/unit/signal_set.cpp | 171 +++--- test/unit/timer.cpp | 178 +++---- 14 files changed, 1149 insertions(+), 400 deletions(-) create mode 100644 src/corosio/src/detail/posix_signals.hpp diff --git a/include/boost/corosio/tcp_server.hpp b/include/boost/corosio/tcp_server.hpp index cd32276..c970f32 100644 --- a/include/boost/corosio/tcp_server.hpp +++ b/include/boost/corosio/tcp_server.hpp @@ -30,6 +30,11 @@ namespace boost { namespace corosio { +#ifdef _MSC_VER +#pragma warning(push) +#pragma warning(disable: 4251) // class needs to have dll-interface +#endif + class BOOST_COROSIO_DECL tcp_server { @@ -126,7 +131,15 @@ class BOOST_COROSIO_DECL public: push_aw(tcp_server& self, worker_base& w) noexcept; bool await_ready() const noexcept; - std::coroutine_handle<> await_suspend(std::coroutine_handle<> h) noexcept; + + template + std::coroutine_handle<> + await_suspend(std::coroutine_handle<> h, Ex const&, std::stop_token) noexcept + { + // Dispatch to server's executor before touching shared state + return self_.dispatch_.dispatch(h); + } + void await_resume() noexcept; }; @@ -142,7 +155,18 @@ class BOOST_COROSIO_DECL public: pop_aw(tcp_server& self) noexcept; bool await_ready() const noexcept; - bool await_suspend(std::coroutine_handle<> h) noexcept; + + template + bool + await_suspend(std::coroutine_handle<> h, Ex const&, std::stop_token) noexcept + { + wait_.h = h; + wait_.w = nullptr; + wait_.next = self_.waiters_; + self_.waiters_ = &wait_; + return true; + } + system::result await_resume() noexcept; }; @@ -151,7 +175,7 @@ class BOOST_COROSIO_DECL capy::task do_accept(acceptor& acc); protected: - class worker_base + class BOOST_COROSIO_DECL worker_base { worker_base* next = nullptr; @@ -171,13 +195,21 @@ class BOOST_COROSIO_DECL } }; - class workers + class BOOST_COROSIO_DECL workers { friend class tcp_server; std::vector> v_; worker_base* idle_ = nullptr; + public: + workers() = default; + workers(workers const&) = delete; + workers& operator=(workers const&) = delete; + workers(workers&&) = default; + workers& operator=(workers&&) = default; + + private: void push(worker_base& w) noexcept { w.next = idle_; @@ -208,7 +240,7 @@ class BOOST_COROSIO_DECL workers wv_; - class launcher + class BOOST_COROSIO_DECL launcher { tcp_server* srv_; worker_base* w_; @@ -283,6 +315,10 @@ class BOOST_COROSIO_DECL void start(); }; +#ifdef _MSC_VER +#pragma warning(pop) +#endif + } // corosio } // boost diff --git a/src/corosio/src/detail/posix_op.hpp b/src/corosio/src/detail/posix_op.hpp index 24add6f..81fdff1 100644 --- a/src/corosio/src/detail/posix_op.hpp +++ b/src/corosio/src/detail/posix_op.hpp @@ -56,9 +56,9 @@ struct posix_op : scheduler_op system::error_code* ec_out = nullptr; std::size_t* bytes_out = nullptr; - int fd = -1; // Socket file descriptor - std::uint32_t events = 0; // Requested epoll events (EPOLLIN/EPOLLOUT) - int error = 0; // errno on completion + int fd = -1; + std::uint32_t events = 0; + int error = 0; std::size_t bytes_transferred = 0; std::atomic cancelled{false}; @@ -101,7 +101,6 @@ struct posix_op : scheduler_op d.dispatch(h).resume(); } - // Returns true if this is a read operation (for EOF detection) virtual bool is_read_operation() const noexcept { return false; } void destroy() override @@ -219,7 +218,6 @@ struct posix_accept_op : posix_op io_object::io_object_impl* peer_impl = nullptr; io_object::io_object_impl** impl_out = nullptr; - // Function to create peer impl - set by posix_sockets using create_peer_fn = io_object::io_object_impl* (*)(void*, int); create_peer_fn create_peer = nullptr; void* service_ptr = nullptr; @@ -269,14 +267,12 @@ struct posix_accept_op : posix_op if (success && accepted_fd >= 0 && peer_impl) { - // Pass impl to awaitable for assignment to peer socket if (impl_out) *impl_out = peer_impl; peer_impl = nullptr; } else { - // Cleanup on failure if (accepted_fd >= 0) { ::close(accepted_fd); diff --git a/src/corosio/src/detail/posix_scheduler.cpp b/src/corosio/src/detail/posix_scheduler.cpp index fd10a8f..fbbfc22 100644 --- a/src/corosio/src/detail/posix_scheduler.cpp +++ b/src/corosio/src/detail/posix_scheduler.cpp @@ -15,8 +15,9 @@ #include #include +#include +#include #include -#include #include #include @@ -60,7 +61,7 @@ struct thread_context_guard posix_scheduler:: posix_scheduler( - capy::execution_context&, + capy::execution_context& ctx, int) : epoll_fd_(-1) , event_fd_(-1) @@ -68,14 +69,12 @@ posix_scheduler( , stopped_(false) , shutdown_(false) { - // Create epoll instance epoll_fd_ = ::epoll_create1(EPOLL_CLOEXEC); if (epoll_fd_ < 0) detail::throw_system_error( system::error_code(errno, system::system_category()), "epoll_create1"); - // Create eventfd for waking the scheduler event_fd_ = ::eventfd(0, EFD_NONBLOCK | EFD_CLOEXEC); if (event_fd_ < 0) { @@ -86,7 +85,7 @@ posix_scheduler( "eventfd"); } - // Register eventfd with epoll (data.ptr = nullptr signals wakeup event) + // data.ptr = nullptr distinguishes wakeup events from I/O completions epoll_event ev{}; ev.events = EPOLLIN; ev.data.ptr = nullptr; @@ -99,6 +98,12 @@ posix_scheduler( system::error_code(err, system::system_category()), "epoll_ctl"); } + + timer_svc_ = &get_timer_service(ctx, *this); + timer_svc_->set_on_earliest_changed( + timer_service::callback( + this, + [](void* p) { static_cast(p)->wakeup(); })); } posix_scheduler:: @@ -378,19 +383,14 @@ void posix_scheduler:: work_finished() const noexcept { - if (outstanding_work_.fetch_sub(1, std::memory_order_acq_rel) == 1) - { - const_cast(this)->stop(); - } + outstanding_work_.fetch_sub(1, std::memory_order_acq_rel); } void posix_scheduler:: wakeup() const { - // Write to eventfd to wake up epoll_wait - // Return value intentionally ignored - eventfd write cannot fail - // when buffer has space (counter won't overflow with uint64_t max) + // Write cannot fail: eventfd counter won't overflow with single increments std::uint64_t val = 1; [[maybe_unused]] auto r = ::write(event_fd_, &val, sizeof(val)); } @@ -402,126 +402,142 @@ struct work_guard ~work_guard() { self->work_finished(); } }; -std::size_t +long posix_scheduler:: -do_one(long timeout_us) +calculate_timeout(long requested_timeout_us) const { - // Check stopped first - if (stopped_.load(std::memory_order_acquire)) + if (requested_timeout_us == 0) return 0; - // First check if there are handlers in the queue - scheduler_op* h = nullptr; - { - std::lock_guard lock(mutex_); - h = completed_ops_.pop(); - } - - if (h) - { - // Execute handler outside the lock - work_guard g{this}; - (*h)(); - return 1; - } + auto nearest = timer_svc_->nearest_expiry(); + if (nearest == timer_service::time_point::max()) + return requested_timeout_us; - // Check if there's actually work to wait for - if (outstanding_work_.load(std::memory_order_acquire) == 0) + auto now = std::chrono::steady_clock::now(); + if (nearest <= now) return 0; - // Convert timeout from microseconds to milliseconds - int timeout_ms; - if (timeout_us < 0) - timeout_ms = -1; // Infinite wait - else if (timeout_us == 0) - timeout_ms = 0; // Non-blocking poll - else - timeout_ms = static_cast((timeout_us + 999) / 1000); + auto timer_timeout_us = std::chrono::duration_cast( + nearest - now).count(); + + if (requested_timeout_us < 0) + return static_cast(timer_timeout_us); - // Wait for events - epoll_event events[64]; - int nfds = ::epoll_wait(epoll_fd_, events, 64, timeout_ms); + return static_cast((std::min)( + static_cast(requested_timeout_us), + static_cast(timer_timeout_us))); +} - if (nfds < 0) +std::size_t +posix_scheduler:: +do_one(long timeout_us) +{ + for (;;) { - if (errno == EINTR) + if (stopped_.load(std::memory_order_acquire)) return 0; - detail::throw_system_error( - system::error_code(errno, system::system_category()), - "epoll_wait"); - } - // Process epoll events - for (int i = 0; i < nfds; ++i) - { - if (events[i].data.ptr == nullptr) + scheduler_op* h = nullptr; { - // eventfd wakeup - drain it - // Return value intentionally ignored - we just need to consume the event - std::uint64_t val; - [[maybe_unused]] auto r = ::read(event_fd_, &val, sizeof(val)); - continue; + std::lock_guard lock(mutex_); + h = completed_ops_.pop(); } - // I/O event - get the operation from data.ptr - auto* op = static_cast(events[i].data.ptr); - - // Unregister the fd from epoll (one-shot behavior) - unregister_fd(op->fd); - - // Check for errors - if (events[i].events & (EPOLLERR | EPOLLHUP)) + if (h) { - // Get socket error - int err = 0; - socklen_t len = sizeof(err); - if (::getsockopt(op->fd, SOL_SOCKET, SO_ERROR, &err, &len) < 0) - err = errno; - if (err == 0) - err = EIO; // Generic I/O error - - op->complete(err, 0); + work_guard g{this}; + (*h)(); + return 1; } + + if (outstanding_work_.load(std::memory_order_acquire) == 0) + return 0; + + long effective_timeout_us = calculate_timeout(timeout_us); + + int timeout_ms; + if (effective_timeout_us < 0) + timeout_ms = -1; + else if (effective_timeout_us == 0) + timeout_ms = 0; else + timeout_ms = static_cast((effective_timeout_us + 999) / 1000); + + epoll_event events[64]; + int nfds = ::epoll_wait(epoll_fd_, events, 64, timeout_ms); + + if (nfds < 0) { - // Operation is ready - perform the actual I/O - op->perform_io(); + if (errno == EINTR) + { + // EINTR: retry for infinite waits, return for timed waits + if (timeout_us < 0) + continue; + return 0; + } + detail::throw_system_error( + system::error_code(errno, system::system_category()), + "epoll_wait"); } - // Post the operation to the handler queue + // May dispatch timer handlers inline + timer_svc_->process_expired(); + + for (int i = 0; i < nfds; ++i) { - std::lock_guard lock(mutex_); - completed_ops_.push(op); + if (events[i].data.ptr == nullptr) + { + // Drain eventfd; read cannot fail since epoll signaled readiness + std::uint64_t val; + [[maybe_unused]] auto r = ::read(event_fd_, &val, sizeof(val)); + continue; + } + + auto* op = static_cast(events[i].data.ptr); + + // One-shot: unregister before I/O + unregister_fd(op->fd); + + if (events[i].events & (EPOLLERR | EPOLLHUP)) + { + int err = 0; + socklen_t len = sizeof(err); + if (::getsockopt(op->fd, SOL_SOCKET, SO_ERROR, &err, &len) < 0) + err = errno; + if (err == 0) + err = EIO; + op->complete(err, 0); + } + else + { + op->perform_io(); + } + + { + std::lock_guard lock(mutex_); + completed_ops_.push(op); + } } - } - // Check stopped again after epoll - if (stopped_.load(std::memory_order_acquire)) - return 0; + if (stopped_.load(std::memory_order_acquire)) + return 0; - // Check again for handlers after processing epoll events - { - std::lock_guard lock(mutex_); - h = completed_ops_.pop(); - } + { + std::lock_guard lock(mutex_); + h = completed_ops_.pop(); + } - if (h) - { - work_guard g{this}; - (*h)(); - return 1; - } + if (h) + { + work_guard g{this}; + (*h)(); + return 1; + } - // If we processed only wakeup events (no I/O completions) and - // there's still outstanding work, continue waiting - if (nfds > 0 && outstanding_work_.load(std::memory_order_acquire) > 0) - { - // Recurse to wait again - this handles the case where we - // only processed eventfd wakeups with no actual completions - return do_one(timeout_us); + // Finite timeout: return on timeout; infinite: keep looping + if (timeout_us >= 0) + return 0; } - - return 0; } } // namespace detail diff --git a/src/corosio/src/detail/posix_scheduler.hpp b/src/corosio/src/detail/posix_scheduler.hpp index 388f9f0..b33c3b5 100644 --- a/src/corosio/src/detail/posix_scheduler.hpp +++ b/src/corosio/src/detail/posix_scheduler.hpp @@ -15,6 +15,7 @@ #include #include "src/detail/scheduler_op.hpp" +#include "src/detail/timer_service.hpp" #include #include @@ -26,7 +27,6 @@ namespace boost { namespace corosio { namespace detail { -// Forward declaration struct posix_op; /** POSIX scheduler using epoll for I/O multiplexing. @@ -120,14 +120,16 @@ class posix_scheduler private: std::size_t do_one(long timeout_us); void wakeup() const; + long calculate_timeout(long requested_timeout_us) const; - int epoll_fd_; // epoll instance + int epoll_fd_; int event_fd_; // for waking epoll_wait mutable std::mutex mutex_; mutable op_queue completed_ops_; mutable std::atomic outstanding_work_; std::atomic stopped_; bool shutdown_; + timer_service* timer_svc_ = nullptr; }; } // namespace detail diff --git a/src/corosio/src/detail/posix_signals.cpp b/src/corosio/src/detail/posix_signals.cpp index 38e49a7..7e1fc30 100644 --- a/src/corosio/src/detail/posix_signals.cpp +++ b/src/corosio/src/detail/posix_signals.cpp @@ -9,11 +9,521 @@ #ifndef _WIN32 -#include +#include "src/detail/posix_signals.hpp" +#include "src/detail/posix_scheduler.hpp" + #include +#include +#include + +#include +#include + +#include namespace boost { namespace corosio { +namespace detail { + +//------------------------------------------------------------------------------ +// +// Global signal state +// +//------------------------------------------------------------------------------ + +namespace { + +struct signal_state +{ + std::mutex mutex; + posix_signals* service_list = nullptr; + std::size_t registration_count[max_signal_number] = {}; +}; + +signal_state* get_signal_state() +{ + static signal_state state; + return &state; +} + +// C signal handler - must be async-signal-safe +extern "C" void corosio_posix_signal_handler(int signal_number) +{ + posix_signals::deliver_signal(signal_number); + + // Re-register handler (some systems reset to SIG_DFL after each signal) + ::signal(signal_number, corosio_posix_signal_handler); +} + +} // namespace + +//------------------------------------------------------------------------------ +// +// signal_op +// +//------------------------------------------------------------------------------ + +void +signal_op:: +operator()() +{ + if (ec_out) + *ec_out = {}; + if (signal_out) + *signal_out = signal_number; + + // Capture svc before resuming (coro may destroy us) + auto* service = svc; + svc = nullptr; + + d.post(capy::any_coro{h}); + + // Balance the on_work_started() from start_wait + if (service) + service->work_finished(); +} + +void +signal_op:: +destroy() +{ + // No-op: signal_op is embedded in posix_signal_impl +} + +//------------------------------------------------------------------------------ +// +// posix_signal_impl +// +//------------------------------------------------------------------------------ + +posix_signal_impl:: +posix_signal_impl(posix_signals& svc) noexcept + : svc_(svc) +{ +} + +void +posix_signal_impl:: +release() +{ + clear(); + cancel(); + svc_.destroy_impl(*this); +} + +void +posix_signal_impl:: +wait( + std::coroutine_handle<> h, + capy::any_executor_ref d, + std::stop_token token, + system::error_code* ec, + int* signal_out) +{ + pending_op_.h = h; + pending_op_.d = d; + pending_op_.ec_out = ec; + pending_op_.signal_out = signal_out; + pending_op_.signal_number = 0; + + if (token.stop_requested()) + { + if (ec) + *ec = make_error_code(capy::error::canceled); + if (signal_out) + *signal_out = 0; + d.post(capy::any_coro{h}); + return; + } + + svc_.start_wait(*this, &pending_op_); +} + +system::error_code +posix_signal_impl:: +add(int signal_number) +{ + return svc_.add_signal(*this, signal_number); +} + +system::error_code +posix_signal_impl:: +remove(int signal_number) +{ + return svc_.remove_signal(*this, signal_number); +} + +system::error_code +posix_signal_impl:: +clear() +{ + return svc_.clear_signals(*this); +} + +void +posix_signal_impl:: +cancel() +{ + svc_.cancel_wait(*this); +} + +//------------------------------------------------------------------------------ +// +// posix_signals +// +//------------------------------------------------------------------------------ + +posix_signals:: +posix_signals(capy::execution_context& ctx) + : sched_(ctx.use_service()) +{ + for (int i = 0; i < max_signal_number; ++i) + { + registrations_[i] = nullptr; + registration_count_[i] = 0; + } + add_service(this); +} + +posix_signals:: +~posix_signals() +{ + remove_service(this); +} + +void +posix_signals:: +shutdown() +{ + std::lock_guard lock(mutex_); + + for (auto* impl = impl_list_.pop_front(); impl != nullptr; + impl = impl_list_.pop_front()) + { + while (auto* reg = impl->signals_) + { + impl->signals_ = reg->next_in_set; + delete reg; + } + delete impl; + } +} + +posix_signal_impl& +posix_signals:: +create_impl() +{ + auto* impl = new posix_signal_impl(*this); + + { + std::lock_guard lock(mutex_); + impl_list_.push_back(impl); + } + + return *impl; +} + +void +posix_signals:: +destroy_impl(posix_signal_impl& impl) +{ + { + std::lock_guard lock(mutex_); + impl_list_.remove(&impl); + } + + delete &impl; +} + +system::error_code +posix_signals:: +add_signal( + posix_signal_impl& impl, + int signal_number) +{ + if (signal_number < 1 || signal_number >= max_signal_number) + return make_error_code(system::errc::invalid_argument); + + signal_state* state = get_signal_state(); + std::lock_guard state_lock(state->mutex); + std::lock_guard lock(mutex_); + + // Find insertion point (list is sorted by signal number) + signal_registration** insertion_point = &impl.signals_; + signal_registration* reg = impl.signals_; + while (reg && reg->signal_number < signal_number) + { + insertion_point = ®->next_in_set; + reg = reg->next_in_set; + } + + if (reg && reg->signal_number == signal_number) + return {}; + + auto* new_reg = new signal_registration; + new_reg->signal_number = signal_number; + new_reg->owner = &impl; + new_reg->undelivered = 0; + + // Install signal handler on first global registration + if (state->registration_count[signal_number] == 0) + { + if (::signal(signal_number, corosio_posix_signal_handler) == SIG_ERR) + { + delete new_reg; + return make_error_code(system::errc::invalid_argument); + } + } + + new_reg->next_in_set = reg; + *insertion_point = new_reg; + + new_reg->next_in_table = registrations_[signal_number]; + new_reg->prev_in_table = nullptr; + if (registrations_[signal_number]) + registrations_[signal_number]->prev_in_table = new_reg; + registrations_[signal_number] = new_reg; + + ++state->registration_count[signal_number]; + ++registration_count_[signal_number]; + + return {}; +} + +system::error_code +posix_signals:: +remove_signal( + posix_signal_impl& impl, + int signal_number) +{ + if (signal_number < 1 || signal_number >= max_signal_number) + return make_error_code(system::errc::invalid_argument); + + signal_state* state = get_signal_state(); + std::lock_guard state_lock(state->mutex); + std::lock_guard lock(mutex_); + + signal_registration** deletion_point = &impl.signals_; + signal_registration* reg = impl.signals_; + while (reg && reg->signal_number < signal_number) + { + deletion_point = ®->next_in_set; + reg = reg->next_in_set; + } + + if (!reg || reg->signal_number != signal_number) + return {}; + + // Restore default handler on last global unregistration + if (state->registration_count[signal_number] == 1) + ::signal(signal_number, SIG_DFL); + + *deletion_point = reg->next_in_set; + + if (registrations_[signal_number] == reg) + registrations_[signal_number] = reg->next_in_table; + if (reg->prev_in_table) + reg->prev_in_table->next_in_table = reg->next_in_table; + if (reg->next_in_table) + reg->next_in_table->prev_in_table = reg->prev_in_table; + + --state->registration_count[signal_number]; + --registration_count_[signal_number]; + + delete reg; + return {}; +} + +system::error_code +posix_signals:: +clear_signals(posix_signal_impl& impl) +{ + signal_state* state = get_signal_state(); + std::lock_guard state_lock(state->mutex); + std::lock_guard lock(mutex_); + + while (signal_registration* reg = impl.signals_) + { + int signal_number = reg->signal_number; + + if (state->registration_count[signal_number] == 1) + ::signal(signal_number, SIG_DFL); + + impl.signals_ = reg->next_in_set; + + if (registrations_[signal_number] == reg) + registrations_[signal_number] = reg->next_in_table; + if (reg->prev_in_table) + reg->prev_in_table->next_in_table = reg->next_in_table; + if (reg->next_in_table) + reg->next_in_table->prev_in_table = reg->prev_in_table; + + --state->registration_count[signal_number]; + --registration_count_[signal_number]; + + delete reg; + } + + return {}; +} + +void +posix_signals:: +cancel_wait(posix_signal_impl& impl) +{ + bool was_waiting = false; + signal_op* op = nullptr; + + { + std::lock_guard lock(mutex_); + if (impl.waiting_) + { + was_waiting = true; + impl.waiting_ = false; + op = &impl.pending_op_; + } + } + + if (was_waiting) + { + if (op->ec_out) + *op->ec_out = make_error_code(capy::error::canceled); + if (op->signal_out) + *op->signal_out = 0; + op->d.post(capy::any_coro{op->h}); + sched_.on_work_finished(); + } +} + +void +posix_signals:: +start_wait(posix_signal_impl& impl, signal_op* op) +{ + { + std::lock_guard lock(mutex_); + + signal_registration* reg = impl.signals_; + while (reg) + { + if (reg->undelivered > 0) + { + --reg->undelivered; + op->signal_number = reg->signal_number; + op->svc = nullptr; + sched_.post(op); + return; + } + reg = reg->next_in_set; + } + + // No queued signals - wait for delivery. + // svc is set so signal_op::operator() calls work_finished(). + impl.waiting_ = true; + op->svc = this; + sched_.on_work_started(); + } +} + +void +posix_signals:: +deliver_signal(int signal_number) +{ + if (signal_number < 1 || signal_number >= max_signal_number) + return; + + signal_state* state = get_signal_state(); + std::lock_guard lock(state->mutex); + + posix_signals* service = state->service_list; + while (service) + { + std::lock_guard svc_lock(service->mutex_); + + signal_registration* reg = service->registrations_[signal_number]; + while (reg) + { + posix_signal_impl* impl = reg->owner; + + if (impl->waiting_) + { + impl->waiting_ = false; + impl->pending_op_.signal_number = signal_number; + service->post(&impl->pending_op_); + } + else + { + ++reg->undelivered; + } + + reg = reg->next_in_table; + } + + service = service->next_; + } +} + +void +posix_signals:: +work_started() noexcept +{ + sched_.work_started(); +} + +void +posix_signals:: +work_finished() noexcept +{ + sched_.work_finished(); +} + +void +posix_signals:: +post(signal_op* op) +{ + sched_.post(op); +} + +void +posix_signals:: +add_service(posix_signals* service) +{ + signal_state* state = get_signal_state(); + std::lock_guard lock(state->mutex); + + service->next_ = state->service_list; + service->prev_ = nullptr; + if (state->service_list) + state->service_list->prev_ = service; + state->service_list = service; +} + +void +posix_signals:: +remove_service(posix_signals* service) +{ + signal_state* state = get_signal_state(); + std::lock_guard lock(state->mutex); + + if (service->next_ || service->prev_ || state->service_list == service) + { + if (state->service_list == service) + state->service_list = service->next_; + if (service->prev_) + service->prev_->next_ = service->next_; + if (service->next_) + service->next_->prev_ = service->prev_; + service->next_ = nullptr; + service->prev_ = nullptr; + } +} + +} // namespace detail + +//------------------------------------------------------------------------------ +// +// signal_set implementation +// +//------------------------------------------------------------------------------ signal_set:: ~signal_set() @@ -26,50 +536,46 @@ signal_set:: signal_set(capy::execution_context& ctx) : io_object(ctx) { - // Stub: signal_set not supported on this platform - impl_ = nullptr; + impl_ = &ctx.use_service().create_impl(); } signal_set:: -signal_set(capy::execution_context& ctx, int) +signal_set(capy::execution_context& ctx, int signal_number_1) : io_object(ctx) { - impl_ = nullptr; - detail::throw_system_error( - make_error_code(system::errc::function_not_supported), - "signal_set: not supported on this platform"); + impl_ = &ctx.use_service().create_impl(); + add(signal_number_1); } signal_set:: signal_set( capy::execution_context& ctx, - int, - int) + int signal_number_1, + int signal_number_2) : io_object(ctx) { - impl_ = nullptr; - detail::throw_system_error( - make_error_code(system::errc::function_not_supported), - "signal_set: not supported on this platform"); + impl_ = &ctx.use_service().create_impl(); + add(signal_number_1); + add(signal_number_2); } signal_set:: signal_set( capy::execution_context& ctx, - int, - int, - int) + int signal_number_1, + int signal_number_2, + int signal_number_3) : io_object(ctx) { - impl_ = nullptr; - detail::throw_system_error( - make_error_code(system::errc::function_not_supported), - "signal_set: not supported on this platform"); + impl_ = &ctx.use_service().create_impl(); + add(signal_number_1); + add(signal_number_2); + add(signal_number_3); } signal_set:: signal_set(signal_set&& other) noexcept - : io_object(other.context()) + : io_object(std::move(other)) { impl_ = other.impl_; other.impl_ = nullptr; @@ -82,8 +588,7 @@ operator=(signal_set&& other) if (this != &other) { if (ctx_ != other.ctx_) - detail::throw_logic_error( - "signal_set::operator=: context mismatch"); + detail::throw_logic_error("signal_set::operator=: context mismatch"); if (impl_) impl_->release(); @@ -98,42 +603,39 @@ void signal_set:: add(int signal_number) { - system::error_code ec; - add(signal_number, ec); + system::error_code ec = get().add(signal_number); if (ec) detail::throw_system_error(ec, "signal_set::add"); } void signal_set:: -add(int, system::error_code& ec) +add(int signal_number, system::error_code& ec) { - ec = make_error_code(system::errc::function_not_supported); + ec = get().add(signal_number); } void signal_set:: remove(int signal_number) { - system::error_code ec; - remove(signal_number, ec); + system::error_code ec = get().remove(signal_number); if (ec) detail::throw_system_error(ec, "signal_set::remove"); } void signal_set:: -remove(int, system::error_code& ec) +remove(int signal_number, system::error_code& ec) { - ec = make_error_code(system::errc::function_not_supported); + ec = get().remove(signal_number); } void signal_set:: clear() { - system::error_code ec; - clear(ec); + system::error_code ec = get().clear(); if (ec) detail::throw_system_error(ec, "signal_set::clear"); } @@ -142,14 +644,14 @@ void signal_set:: clear(system::error_code& ec) { - ec = make_error_code(system::errc::function_not_supported); + ec = get().clear(); } void signal_set:: cancel() { - // No-op: nothing to cancel on stub implementation + get().cancel(); } } // namespace corosio diff --git a/src/corosio/src/detail/posix_signals.hpp b/src/corosio/src/detail/posix_signals.hpp new file mode 100644 index 0000000..944479a --- /dev/null +++ b/src/corosio/src/detail/posix_signals.hpp @@ -0,0 +1,231 @@ +// +// Copyright (c) 2025 Vinnie Falco (vinnie dot falco at gmail dot com) +// +// Distributed under the Boost Software License, Version 1.0. (See accompanying +// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) +// +// Official repository: https://github.com/cppalliance/corosio +// + +#ifndef BOOST_COROSIO_DETAIL_POSIX_SIGNALS_HPP +#define BOOST_COROSIO_DETAIL_POSIX_SIGNALS_HPP + +#include +#include +#include +#include +#include +#include + +#include "src/detail/posix_op.hpp" +#include "src/detail/scheduler_op.hpp" + +#include +#include +#include +#include + +#include + +namespace boost { +namespace corosio { +namespace detail { + +class posix_scheduler; +class posix_signals; +class posix_signal_impl; + +// Maximum signal number supported (NSIG is typically 64 on Linux) +enum { max_signal_number = 64 }; + +//------------------------------------------------------------------------------ + +/** Signal wait operation state. */ +struct signal_op : scheduler_op +{ + std::coroutine_handle<> h; + capy::any_executor_ref d; + system::error_code* ec_out = nullptr; + int* signal_out = nullptr; + int signal_number = 0; + posix_signals* svc = nullptr; // For work_finished callback + + void operator()() override; + void destroy() override; +}; + +//------------------------------------------------------------------------------ + +/** Per-signal registration tracking. */ +struct signal_registration +{ + int signal_number = 0; + posix_signal_impl* owner = nullptr; + std::size_t undelivered = 0; + signal_registration* next_in_table = nullptr; + signal_registration* prev_in_table = nullptr; + signal_registration* next_in_set = nullptr; +}; + +//------------------------------------------------------------------------------ + +/** Signal set implementation for POSIX using signalfd. + + This class contains the state for a single signal_set, including + registered signals and pending wait operation. + + @note Internal implementation detail. Users interact with signal_set class. +*/ +class posix_signal_impl + : public signal_set::signal_set_impl + , public capy::intrusive_list::node +{ + friend class posix_signals; + + posix_signals& svc_; + signal_registration* signals_ = nullptr; + signal_op pending_op_; + bool waiting_ = false; + +public: + explicit posix_signal_impl(posix_signals& svc) noexcept; + + void release() override; + + void wait( + std::coroutine_handle<>, + capy::any_executor_ref, + std::stop_token, + system::error_code*, + int*) override; + + system::error_code add(int signal_number) override; + system::error_code remove(int signal_number) override; + system::error_code clear() override; + void cancel() override; +}; + +//------------------------------------------------------------------------------ + +/** POSIX signal management service using signalfd. + + This service owns all signal set implementations and coordinates + their lifecycle. It provides: + + - Signal implementation allocation and deallocation + - Signal registration via signalfd + - Global signal state management + - Graceful shutdown - destroys all implementations when io_context stops + + @par Thread Safety + All public member functions are thread-safe. + + @note Only available on POSIX platforms with signalfd support. +*/ +class posix_signals : public capy::execution_context::service +{ +public: + using key_type = posix_signals; + + /** Construct the signal service. + + @param ctx Reference to the owning execution_context. + */ + explicit posix_signals(capy::execution_context& ctx); + + /** Destroy the signal service. */ + ~posix_signals(); + + posix_signals(posix_signals const&) = delete; + posix_signals& operator=(posix_signals const&) = delete; + + /** Shut down the service. */ + void shutdown() override; + + /** Create a new signal implementation. */ + posix_signal_impl& create_impl(); + + /** Destroy a signal implementation. */ + void destroy_impl(posix_signal_impl& impl); + + /** Add a signal to a signal set. + + @param impl The signal implementation to modify. + @param signal_number The signal to register. + @return Error code, or success. + */ + system::error_code add_signal( + posix_signal_impl& impl, + int signal_number); + + /** Remove a signal from a signal set. + + @param impl The signal implementation to modify. + @param signal_number The signal to unregister. + @return Error code, or success. + */ + system::error_code remove_signal( + posix_signal_impl& impl, + int signal_number); + + /** Remove all signals from a signal set. + + @param impl The signal implementation to clear. + @return Error code, or success. + */ + system::error_code clear_signals(posix_signal_impl& impl); + + /** Cancel pending wait operations. + + @param impl The signal implementation to cancel. + */ + void cancel_wait(posix_signal_impl& impl); + + /** Start a wait operation. + + @param impl The signal implementation. + @param op The operation to start. + */ + void start_wait(posix_signal_impl& impl, signal_op* op); + + /** Deliver a signal to all registered services. + + Called from the C signal handler. + + @param signal_number The signal that occurred. + */ + static void deliver_signal(int signal_number); + + /** Notify scheduler of pending work. */ + void work_started() noexcept; + + /** Notify scheduler that work completed. */ + void work_finished() noexcept; + + /** Post an operation for completion. */ + void post(signal_op* op); + +private: + static void add_service(posix_signals* service); + static void remove_service(posix_signals* service); + + posix_scheduler& sched_; + std::mutex mutex_; + capy::intrusive_list impl_list_; + + // Per-signal registration table + signal_registration* registrations_[max_signal_number]; + + // Registration counts for each signal + std::size_t registration_count_[max_signal_number]; + + // Linked list of all posix_signals services for signal delivery + posix_signals* next_ = nullptr; + posix_signals* prev_ = nullptr; +}; + +} // namespace detail +} // namespace corosio +} // namespace boost + +#endif diff --git a/src/corosio/src/detail/posix_sockets.hpp b/src/corosio/src/detail/posix_sockets.hpp index 21bcc28..b96e997 100644 --- a/src/corosio/src/detail/posix_sockets.hpp +++ b/src/corosio/src/detail/posix_sockets.hpp @@ -246,13 +246,11 @@ connect( op.fd = fd_; op.start(token); - // Initiate non-blocking connect sockaddr_in addr = detail::to_sockaddr_in(ep); int result = ::connect(fd_, reinterpret_cast(&addr), sizeof(addr)); if (result == 0) { - // Immediate success (rare for TCP) op.complete(0, 0); svc_.post(&op); return; @@ -260,13 +258,11 @@ connect( if (errno == EINPROGRESS) { - // Connection in progress - register for write-ready svc_.work_started(); svc_.scheduler().register_fd(fd_, &op, EPOLLOUT | EPOLLET); return; } - // Immediate error op.complete(errno, 0); svc_.post(&op); } @@ -290,7 +286,6 @@ read_some( op.fd = fd_; op.start(token); - // Fill iovecs from buffer sequence capy::mutable_buffer bufs[posix_read_op::max_buffers]; op.iovec_count = static_cast(param.copy_to(bufs, posix_read_op::max_buffers)); for (int i = 0; i < op.iovec_count; ++i) @@ -299,12 +294,10 @@ read_some( op.iovecs[i].iov_len = bufs[i].size(); } - // Try immediate read first ssize_t n = ::readv(fd_, op.iovecs, op.iovec_count); if (n > 0) { - // Got data immediately op.complete(0, static_cast(n)); svc_.post(&op); return; @@ -312,7 +305,6 @@ read_some( if (n == 0) { - // EOF op.complete(0, 0); svc_.post(&op); return; @@ -320,13 +312,11 @@ read_some( if (errno == EAGAIN || errno == EWOULDBLOCK) { - // Would block - register for read-ready svc_.work_started(); svc_.scheduler().register_fd(fd_, &op, EPOLLIN | EPOLLET); return; } - // Immediate error op.complete(errno, 0); svc_.post(&op); } @@ -350,7 +340,6 @@ write_some( op.fd = fd_; op.start(token); - // Fill iovecs from buffer sequence capy::mutable_buffer bufs[posix_write_op::max_buffers]; op.iovec_count = static_cast(param.copy_to(bufs, posix_write_op::max_buffers)); for (int i = 0; i < op.iovec_count; ++i) @@ -359,12 +348,10 @@ write_some( op.iovecs[i].iov_len = bufs[i].size(); } - // Try immediate write first ssize_t n = ::writev(fd_, op.iovecs, op.iovec_count); if (n > 0) { - // Wrote data immediately op.complete(0, static_cast(n)); svc_.post(&op); return; @@ -372,13 +359,12 @@ write_some( if (errno == EAGAIN || errno == EWOULDBLOCK) { - // Would block - register for write-ready svc_.work_started(); svc_.scheduler().register_fd(fd_, &op, EPOLLOUT | EPOLLET); return; } - // Immediate error (including n == 0 which shouldn't happen for TCP) + // n == 0 shouldn't happen for TCP stream sockets op.complete(errno ? errno : EIO, 0); svc_.post(&op); } @@ -398,7 +384,6 @@ close_socket() noexcept { if (fd_ >= 0) { - // Unregister from epoll before closing svc_.scheduler().unregister_fd(fd_); ::close(fd_); fd_ = -1; @@ -442,7 +427,7 @@ accept( op.fd = fd_; op.start(token); - // Set up callback for creating peer impl when accept completes via epoll + // Callback for creating peer socket when accept completes via epoll op.service_ptr = &svc_; op.create_peer = [](void* svc_ptr, int new_fd) -> io_object::io_object_impl* { auto& svc = *static_cast(svc_ptr); @@ -451,7 +436,6 @@ accept( return &peer_impl; }; - // Try immediate accept first sockaddr_in addr{}; socklen_t addrlen = sizeof(addr); int accepted = ::accept4(fd_, reinterpret_cast(&addr), @@ -459,7 +443,6 @@ accept( if (accepted >= 0) { - // Got a connection immediately auto& peer_impl = svc_.create_impl(); peer_impl.set_socket(accepted); op.accepted_fd = accepted; @@ -471,13 +454,11 @@ accept( if (errno == EAGAIN || errno == EWOULDBLOCK) { - // No pending connections - register for read-ready svc_.work_started(); svc_.scheduler().register_fd(fd_, &op, EPOLLIN | EPOLLET); return; } - // Immediate error op.complete(errno, 0); svc_.post(&op); } @@ -495,7 +476,6 @@ close_socket() noexcept { if (fd_ >= 0) { - // Unregister from epoll before closing svc_.scheduler().unregister_fd(fd_); ::close(fd_); fd_ = -1; @@ -525,14 +505,12 @@ shutdown() { std::lock_guard lock(mutex_); - // Close all sockets while (auto* impl = socket_list_.pop_front()) { impl->close_socket(); delete impl; } - // Close all acceptors while (auto* impl = acceptor_list_.pop_front()) { impl->close_socket(); @@ -572,12 +550,9 @@ open_socket(posix_socket_impl& impl) { impl.close_socket(); - // Create non-blocking TCP socket int fd = ::socket(AF_INET, SOCK_STREAM | SOCK_NONBLOCK | SOCK_CLOEXEC, 0); if (fd < 0) - { return system::error_code(errno, system::system_category()); - } impl.fd_ = fd; return {}; @@ -618,18 +593,13 @@ open_acceptor( { impl.close_socket(); - // Create non-blocking TCP socket int fd = ::socket(AF_INET, SOCK_STREAM | SOCK_NONBLOCK | SOCK_CLOEXEC, 0); if (fd < 0) - { return system::error_code(errno, system::system_category()); - } - // Allow address reuse int reuse = 1; ::setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &reuse, sizeof(reuse)); - // Bind to endpoint sockaddr_in addr = detail::to_sockaddr_in(ep); if (::bind(fd, reinterpret_cast(&addr), sizeof(addr)) < 0) { @@ -638,7 +608,6 @@ open_acceptor( return system::error_code(err, system::system_category()); } - // Start listening if (::listen(fd, backlog) < 0) { int err = errno; diff --git a/src/corosio/src/detail/timer_service.cpp b/src/corosio/src/detail/timer_service.cpp index 2623e0e..0a97976 100644 --- a/src/corosio/src/detail/timer_service.cpp +++ b/src/corosio/src/detail/timer_service.cpp @@ -12,12 +12,14 @@ #include #include #include +#include #include #include #include #include #include +#include #include #include @@ -415,7 +417,15 @@ wait( timer::timer_impl* timer_service_create(capy::execution_context& ctx) { - return ctx.find_service()->create_impl(); + auto* svc = ctx.find_service(); + if (!svc) + { + // Timer service not yet created - this happens if io_context + // hasn't been constructed yet, or if the scheduler didn't + // initialize the timer service + throw std::runtime_error("timer_service not found"); + } + return svc->create_impl(); } void diff --git a/src/corosio/src/tcp_server.cpp b/src/corosio/src/tcp_server.cpp index c88849f..9ef03f0 100644 --- a/src/corosio/src/tcp_server.cpp +++ b/src/corosio/src/tcp_server.cpp @@ -26,14 +26,6 @@ tcp_server::push_aw::await_ready() const noexcept return false; } -std::coroutine_handle<> -tcp_server::push_aw::await_suspend( - std::coroutine_handle<> h) noexcept -{ - // Dispatch to server's executor before touching shared state - return self_.dispatch_.dispatch(h); -} - void tcp_server::push_aw::await_resume() noexcept { @@ -62,17 +54,6 @@ tcp_server::pop_aw::await_ready() const noexcept return self_.wv_.idle_ != nullptr; } -bool -tcp_server::pop_aw::await_suspend( - std::coroutine_handle<> h) noexcept -{ - wait_.h = h; - wait_.w = nullptr; - wait_.next = self_.waiters_; - self_.waiters_ = &wait_; - return true; -} - system::result tcp_server::pop_aw::await_resume() noexcept { diff --git a/src/corosio/src/test/mocket.cpp b/src/corosio/src/test/mocket.cpp index 7ba2990..9e38358 100644 --- a/src/corosio/src/test/mocket.cpp +++ b/src/corosio/src/test/mocket.cpp @@ -493,23 +493,27 @@ make_mockets(capy::execution_context& ctx, capy::test::fuse& f) socket accepted_socket(ctx); // Launch accept operation + // Note: Pass captures as parameters to store them in the coroutine frame, + // avoiding use-after-scope when the lambda temporary is destroyed. capy::run_async(ex)( - [&]() -> capy::task<> + [](acceptor& a, socket& s, + system::error_code& ec_out, bool& done_out) -> capy::task<> { - auto [ec] = co_await acc.accept(accepted_socket); - accept_ec = ec; - accept_done = true; - }()); + auto [ec] = co_await a.accept(s); + ec_out = ec; + done_out = true; + }(acc, accepted_socket, accept_ec, accept_done)); // Launch connect operation capy::run_async(ex)( - [&]() -> capy::task<> + [](socket& s, endpoint ep, + system::error_code& ec_out, bool& done_out) -> capy::task<> { - auto [ec] = co_await impl2.get_socket().connect( - endpoint(urls::ipv4_address::loopback(), port)); - connect_ec = ec; - connect_done = true; - }()); + auto [ec] = co_await s.connect(ep); + ec_out = ec; + done_out = true; + }(impl2.get_socket(), endpoint(urls::ipv4_address::loopback(), port), + connect_ec, connect_done)); // Run until both complete ioc.run(); diff --git a/test/unit/Jamfile b/test/unit/Jamfile index 0c5fd69..be9d406 100644 --- a/test/unit/Jamfile +++ b/test/unit/Jamfile @@ -24,7 +24,10 @@ run acceptor.cpp ; run consuming_buffers.cpp ; run io_context.cpp ; run io_result.cpp ; +run mocket.cpp ; run read.cpp ; +run signal_set.cpp ; run socket.cpp ; run tcp_server.cpp ; +run timer.cpp ; run write.cpp ; diff --git a/test/unit/mocket.cpp b/test/unit/mocket.cpp index 8ae2d5b..5feca8a 100644 --- a/test/unit/mocket.cpp +++ b/test/unit/mocket.cpp @@ -45,35 +45,37 @@ struct mocket_test m1.expect("write_to_m1"); m2.expect("write_to_m2"); + // Note: Pass captures as parameters to store them in the coroutine frame, + // avoiding use-after-scope when the lambda temporary is destroyed. capy::run_async(ioc.get_executor())( - [&]() -> capy::task<> + [](mocket& m1_ref, mocket& m2_ref) -> capy::task<> { char buf[32] = {}; // m2 reads from m1's provide - auto [ec1, n1] = co_await m2.read_some( + auto [ec1, n1] = co_await m2_ref.read_some( capy::mutable_buffer(buf, sizeof(buf))); BOOST_TEST(!ec1); BOOST_TEST_EQ(std::string_view(buf, n1), "hello_from_m1"); // m1 reads from m2's provide - auto [ec2, n2] = co_await m1.read_some( + auto [ec2, n2] = co_await m1_ref.read_some( capy::mutable_buffer(buf, sizeof(buf))); BOOST_TEST(!ec2); BOOST_TEST_EQ(std::string_view(buf, n2), "hello_from_m2"); // Write to m1's expect - auto [ec3, n3] = co_await m1.write_some( + auto [ec3, n3] = co_await m1_ref.write_some( capy::const_buffer("write_to_m1", 11)); BOOST_TEST(!ec3); BOOST_TEST_EQ(n3, 11u); // Write to m2's expect - auto [ec4, n4] = co_await m2.write_some( + auto [ec4, n4] = co_await m2_ref.write_some( capy::const_buffer("write_to_m2", 11)); BOOST_TEST(!ec4); BOOST_TEST_EQ(n4, 11u); - }()); + }(m1, m2)); ioc.run(); ioc.restart(); diff --git a/test/unit/signal_set.cpp b/test/unit/signal_set.cpp index 42fc1fd..e4c50e1 100644 --- a/test/unit/signal_set.cpp +++ b/test/unit/signal_set.cpp @@ -10,8 +10,6 @@ // Test that header file is self-contained. #include -#ifdef _WIN32 - #include #include #include @@ -236,22 +234,22 @@ struct signal_set_test system::error_code result_ec; capy::run_async(ioc.get_executor())( - [&]() -> capy::task<> + [](signal_set& s_ref, system::error_code& ec_out, int& sig_out, bool& done_out) -> capy::task<> { - auto [ec, signum] = co_await s.async_wait(); - result_ec = ec; - received_signal = signum; - completed = true; - }()); + auto [ec, signum] = co_await s_ref.async_wait(); + ec_out = ec; + sig_out = signum; + done_out = true; + }(s, result_ec, received_signal, completed)); // Raise signal after a short delay t.expires_after(std::chrono::milliseconds(10)); capy::run_async(ioc.get_executor())( - [&]() -> capy::task<> + [](timer& t_ref) -> capy::task<> { - co_await t.wait(); + co_await t_ref.wait(); std::raise(SIGINT); - }()); + }(t)); ioc.run(); BOOST_TEST(completed); @@ -270,21 +268,21 @@ struct signal_set_test int received_signal = 0; capy::run_async(ioc.get_executor())( - [&]() -> capy::task<> + [](signal_set& s_ref, int& sig_out, bool& done_out) -> capy::task<> { - auto [ec, signum] = co_await s.async_wait(); - received_signal = signum; - completed = true; + auto [ec, signum] = co_await s_ref.async_wait(); + sig_out = signum; + done_out = true; (void)ec; - }()); + }(s, received_signal, completed)); t.expires_after(std::chrono::milliseconds(10)); capy::run_async(ioc.get_executor())( - [&]() -> capy::task<> + [](timer& t_ref) -> capy::task<> { - co_await t.wait(); + co_await t_ref.wait(); std::raise(SIGTERM); - }()); + }(t)); ioc.run(); BOOST_TEST(completed); @@ -306,21 +304,21 @@ struct signal_set_test system::error_code result_ec; capy::run_async(ioc.get_executor())( - [&]() -> capy::task<> + [](signal_set& s_ref, system::error_code& ec_out, bool& done_out) -> capy::task<> { - auto [ec, signum] = co_await s.async_wait(); - result_ec = ec; - completed = true; + auto [ec, signum] = co_await s_ref.async_wait(); + ec_out = ec; + done_out = true; (void)signum; - }()); + }(s, result_ec, completed)); cancel_timer.expires_after(std::chrono::milliseconds(10)); capy::run_async(ioc.get_executor())( - [&]() -> capy::task<> + [](timer& t_ref, signal_set& s_ref) -> capy::task<> { - co_await cancel_timer.wait(); - s.cancel(); - }()); + co_await t_ref.wait(); + s_ref.cancel(); + }(cancel_timer, s)); ioc.run(); BOOST_TEST(completed); @@ -367,30 +365,30 @@ struct signal_set_test int s2_signal = 0; capy::run_async(ioc.get_executor())( - [&]() -> capy::task<> + [](signal_set& s_ref, int& sig_out, bool& done_out) -> capy::task<> { - auto [ec, signum] = co_await s1.async_wait(); - s1_signal = signum; - s1_completed = true; + auto [ec, signum] = co_await s_ref.async_wait(); + sig_out = signum; + done_out = true; (void)ec; - }()); + }(s1, s1_signal, s1_completed)); capy::run_async(ioc.get_executor())( - [&]() -> capy::task<> + [](signal_set& s_ref, int& sig_out, bool& done_out) -> capy::task<> { - auto [ec, signum] = co_await s2.async_wait(); - s2_signal = signum; - s2_completed = true; + auto [ec, signum] = co_await s_ref.async_wait(); + sig_out = signum; + done_out = true; (void)ec; - }()); + }(s2, s2_signal, s2_completed)); t.expires_after(std::chrono::milliseconds(10)); capy::run_async(ioc.get_executor())( - [&]() -> capy::task<> + [](timer& t_ref) -> capy::task<> { - co_await t.wait(); + co_await t_ref.wait(); std::raise(SIGINT); - }()); + }(t)); ioc.run(); BOOST_TEST(s1_completed); @@ -410,22 +408,22 @@ struct signal_set_test int received_signal = 0; capy::run_async(ioc.get_executor())( - [&]() -> capy::task<> + [](signal_set& s_ref, int& sig_out, bool& done_out) -> capy::task<> { - auto [ec, signum] = co_await s.async_wait(); - received_signal = signum; - completed = true; + auto [ec, signum] = co_await s_ref.async_wait(); + sig_out = signum; + done_out = true; (void)ec; - }()); + }(s, received_signal, completed)); // Raise SIGTERM (not SIGINT) t.expires_after(std::chrono::milliseconds(10)); capy::run_async(ioc.get_executor())( - [&]() -> capy::task<> + [](timer& t_ref) -> capy::task<> { - co_await t.wait(); + co_await t_ref.wait(); std::raise(SIGTERM); - }()); + }(t)); ioc.run(); BOOST_TEST(completed); @@ -449,13 +447,13 @@ struct signal_set_test int received_signal = 0; capy::run_async(ioc.get_executor())( - [&]() -> capy::task<> + [](signal_set& s_ref, int& sig_out, bool& done_out) -> capy::task<> { - auto [ec, signum] = co_await s.async_wait(); - received_signal = signum; - completed = true; + auto [ec, signum] = co_await s_ref.async_wait(); + sig_out = signum; + done_out = true; (void)ec; - }()); + }(s, received_signal, completed)); ioc.run(); BOOST_TEST(completed); @@ -476,28 +474,28 @@ struct signal_set_test int wait_count = 0; capy::run_async(ioc.get_executor())( - [&]() -> capy::task<> + [](signal_set& s_ref, timer& t_ref, int& count_out) -> capy::task<> { // First wait - t.expires_after(std::chrono::milliseconds(5)); - co_await t.wait(); + t_ref.expires_after(std::chrono::milliseconds(5)); + co_await t_ref.wait(); std::raise(SIGINT); - auto [ec1, sig1] = co_await s.async_wait(); + auto [ec1, sig1] = co_await s_ref.async_wait(); BOOST_TEST(!ec1); BOOST_TEST_EQ(sig1, SIGINT); - ++wait_count; + ++count_out; // Second wait - t.expires_after(std::chrono::milliseconds(5)); - co_await t.wait(); + t_ref.expires_after(std::chrono::milliseconds(5)); + co_await t_ref.wait(); std::raise(SIGINT); - auto [ec2, sig2] = co_await s.async_wait(); + auto [ec2, sig2] = co_await s_ref.async_wait(); BOOST_TEST(!ec2); BOOST_TEST_EQ(sig2, SIGINT); - ++wait_count; - }()); + ++count_out; + }(s, t, wait_count)); ioc.run(); BOOST_TEST_EQ(wait_count, 2); @@ -517,15 +515,15 @@ struct signal_set_test bool result_ok = false; capy::run_async(ioc.get_executor())( - [&]() -> capy::task<> + [](signal_set& s_ref, timer& t_ref, bool& ok_out) -> capy::task<> { - t.expires_after(std::chrono::milliseconds(5)); - co_await t.wait(); + t_ref.expires_after(std::chrono::milliseconds(5)); + co_await t_ref.wait(); std::raise(SIGINT); - auto result = co_await s.async_wait(); - result_ok = static_cast(result); - }()); + auto result = co_await s_ref.async_wait(); + ok_out = static_cast(result); + }(s, t, result_ok)); ioc.run(); BOOST_TEST(result_ok); @@ -542,20 +540,20 @@ struct signal_set_test system::error_code result_ec; capy::run_async(ioc.get_executor())( - [&]() -> capy::task<> + [](signal_set& s_ref, bool& ok_out, system::error_code& ec_out) -> capy::task<> { - auto result = co_await s.async_wait(); - result_ok = static_cast(result); - result_ec = result.ec; - }()); + auto result = co_await s_ref.async_wait(); + ok_out = static_cast(result); + ec_out = result.ec; + }(s, result_ok, result_ec)); cancel_timer.expires_after(std::chrono::milliseconds(10)); capy::run_async(ioc.get_executor())( - [&]() -> capy::task<> + [](timer& t_ref, signal_set& s_ref) -> capy::task<> { - co_await cancel_timer.wait(); - s.cancel(); - }()); + co_await t_ref.wait(); + s_ref.cancel(); + }(cancel_timer, s)); ioc.run(); BOOST_TEST(!result_ok); @@ -573,16 +571,16 @@ struct signal_set_test int captured_signal = 0; capy::run_async(ioc.get_executor())( - [&]() -> capy::task<> + [](signal_set& s_ref, timer& t_ref, system::error_code& ec_out, int& sig_out) -> capy::task<> { - t.expires_after(std::chrono::milliseconds(5)); - co_await t.wait(); + t_ref.expires_after(std::chrono::milliseconds(5)); + co_await t_ref.wait(); std::raise(SIGINT); - auto [ec, signum] = co_await s.async_wait(); - captured_ec = ec; - captured_signal = signum; - }()); + auto [ec, signum] = co_await s_ref.async_wait(); + ec_out = ec; + sig_out = signum; + }(s, t, captured_ec, captured_signal)); ioc.run(); BOOST_TEST(!captured_ec); @@ -644,4 +642,3 @@ TEST_SUITE(signal_set_test, "boost.corosio.signal_set"); } // namespace corosio } // namespace boost -#endif // _WIN32 diff --git a/test/unit/timer.cpp b/test/unit/timer.cpp index 75b96b6..480a09c 100644 --- a/test/unit/timer.cpp +++ b/test/unit/timer.cpp @@ -185,12 +185,12 @@ struct timer_test t.expires_after(std::chrono::milliseconds(10)); capy::run_async(ioc.get_executor())( - [&]() -> capy::task<> + [](timer& t_ref, system::error_code& ec_out, bool& done_out) -> capy::task<> { - auto [ec] = co_await t.wait(); - result_ec = ec; - completed = true; - }()); + auto [ec] = co_await t_ref.wait(); + ec_out = ec; + done_out = true; + }(t, result_ec, completed)); ioc.run(); BOOST_TEST(completed); @@ -209,12 +209,12 @@ struct timer_test t.expires_after(std::chrono::milliseconds(50)); capy::run_async(ioc.get_executor())( - [&]() -> capy::task<> + [](timer& t_ref, timer::time_point start_val, timer::duration& elapsed_out) -> capy::task<> { - auto [ec] = co_await t.wait(); - elapsed = timer::clock_type::now() - start; + auto [ec] = co_await t_ref.wait(); + elapsed_out = timer::clock_type::now() - start_val; (void)ec; - }()); + }(t, start, elapsed)); ioc.run(); @@ -234,12 +234,12 @@ struct timer_test t.expires_at(timer::clock_type::now() - std::chrono::seconds(1)); capy::run_async(ioc.get_executor())( - [&]() -> capy::task<> + [](timer& t_ref, system::error_code& ec_out, bool& done_out) -> capy::task<> { - auto [ec] = co_await t.wait(); - result_ec = ec; - completed = true; - }()); + auto [ec] = co_await t_ref.wait(); + ec_out = ec; + done_out = true; + }(t, result_ec, completed)); ioc.run(); BOOST_TEST(completed); @@ -258,12 +258,12 @@ struct timer_test t.expires_after(std::chrono::milliseconds(0)); capy::run_async(ioc.get_executor())( - [&]() -> capy::task<> + [](timer& t_ref, system::error_code& ec_out, bool& done_out) -> capy::task<> { - auto [ec] = co_await t.wait(); - result_ec = ec; - completed = true; - }()); + auto [ec] = co_await t_ref.wait(); + ec_out = ec; + done_out = true; + }(t, result_ec, completed)); ioc.run(); BOOST_TEST(completed); @@ -288,19 +288,19 @@ struct timer_test cancel_timer.expires_after(std::chrono::milliseconds(10)); capy::run_async(ioc.get_executor())( - [&]() -> capy::task<> + [](timer& t_ref, system::error_code& ec_out, bool& done_out) -> capy::task<> { - auto [ec] = co_await t.wait(); - result_ec = ec; - completed = true; - }()); + auto [ec] = co_await t_ref.wait(); + ec_out = ec; + done_out = true; + }(t, result_ec, completed)); capy::run_async(ioc.get_executor())( - [&]() -> capy::task<> + [](timer& cancel_t_ref, timer& t_ref) -> capy::task<> { - co_await cancel_timer.wait(); - t.cancel(); - }()); + co_await cancel_t_ref.wait(); + t_ref.cancel(); + }(cancel_timer, t)); ioc.run(); BOOST_TEST(completed); @@ -347,19 +347,19 @@ struct timer_test delay_timer.expires_after(std::chrono::milliseconds(10)); capy::run_async(ioc.get_executor())( - [&]() -> capy::task<> + [](timer& t_ref, system::error_code& ec_out, bool& done_out) -> capy::task<> { - auto [ec] = co_await t.wait(); - result_ec = ec; - completed = true; - }()); + auto [ec] = co_await t_ref.wait(); + ec_out = ec; + done_out = true; + }(t, result_ec, completed)); capy::run_async(ioc.get_executor())( - [&]() -> capy::task<> + [](timer& delay_ref, timer& t_ref) -> capy::task<> { - co_await delay_timer.wait(); - t.expires_after(std::chrono::seconds(30)); - }()); + co_await delay_ref.wait(); + t_ref.expires_after(std::chrono::seconds(30)); + }(delay_timer, t)); ioc.run_for(std::chrono::milliseconds(100)); BOOST_TEST(completed); @@ -386,28 +386,28 @@ struct timer_test t3.expires_after(std::chrono::milliseconds(20)); capy::run_async(ioc.get_executor())( - [&]() -> capy::task<> + [](timer& t_ref, int& order_ref, int& t_order_out) -> capy::task<> { - auto [ec] = co_await t1.wait(); - t1_order = ++order; + auto [ec] = co_await t_ref.wait(); + t_order_out = ++order_ref; (void)ec; - }()); + }(t1, order, t1_order)); capy::run_async(ioc.get_executor())( - [&]() -> capy::task<> + [](timer& t_ref, int& order_ref, int& t_order_out) -> capy::task<> { - auto [ec] = co_await t2.wait(); - t2_order = ++order; + auto [ec] = co_await t_ref.wait(); + t_order_out = ++order_ref; (void)ec; - }()); + }(t2, order, t2_order)); capy::run_async(ioc.get_executor())( - [&]() -> capy::task<> + [](timer& t_ref, int& order_ref, int& t_order_out) -> capy::task<> { - auto [ec] = co_await t3.wait(); - t3_order = ++order; + auto [ec] = co_await t_ref.wait(); + t_order_out = ++order_ref; (void)ec; - }()); + }(t3, order, t3_order)); ioc.run(); @@ -430,20 +430,20 @@ struct timer_test t2.expires_at(expiry); capy::run_async(ioc.get_executor())( - [&]() -> capy::task<> + [](timer& t_ref, bool& done_out) -> capy::task<> { - auto [ec] = co_await t1.wait(); - t1_done = true; + auto [ec] = co_await t_ref.wait(); + done_out = true; (void)ec; - }()); + }(t1, t1_done)); capy::run_async(ioc.get_executor())( - [&]() -> capy::task<> + [](timer& t_ref, bool& done_out) -> capy::task<> { - auto [ec] = co_await t2.wait(); - t2_done = true; + auto [ec] = co_await t_ref.wait(); + done_out = true; (void)ec; - }()); + }(t2, t2_done)); ioc.run(); @@ -464,23 +464,23 @@ struct timer_test int wait_count = 0; capy::run_async(ioc.get_executor())( - [&]() -> capy::task<> + [](timer& t_ref, int& count_out) -> capy::task<> { - t.expires_after(std::chrono::milliseconds(5)); - auto [ec1] = co_await t.wait(); + t_ref.expires_after(std::chrono::milliseconds(5)); + auto [ec1] = co_await t_ref.wait(); BOOST_TEST(!ec1); - ++wait_count; + ++count_out; - t.expires_after(std::chrono::milliseconds(5)); - auto [ec2] = co_await t.wait(); + t_ref.expires_after(std::chrono::milliseconds(5)); + auto [ec2] = co_await t_ref.wait(); BOOST_TEST(!ec2); - ++wait_count; + ++count_out; - t.expires_after(std::chrono::milliseconds(5)); - auto [ec3] = co_await t.wait(); + t_ref.expires_after(std::chrono::milliseconds(5)); + auto [ec3] = co_await t_ref.wait(); BOOST_TEST(!ec3); - ++wait_count; - }()); + ++count_out; + }(t, wait_count)); ioc.run(); BOOST_TEST_EQ(wait_count, 3); @@ -501,11 +501,11 @@ struct timer_test t.expires_after(std::chrono::milliseconds(5)); capy::run_async(ioc.get_executor())( - [&]() -> capy::task<> + [](timer& t_ref, bool& ok_out) -> capy::task<> { - auto result = co_await t.wait(); - result_ok = static_cast(result); - }()); + auto result = co_await t_ref.wait(); + ok_out = static_cast(result); + }(t, result_ok)); ioc.run(); BOOST_TEST(result_ok); @@ -525,19 +525,19 @@ struct timer_test cancel_timer.expires_after(std::chrono::milliseconds(10)); capy::run_async(ioc.get_executor())( - [&]() -> capy::task<> + [](timer& t_ref, bool& ok_out, system::error_code& ec_out) -> capy::task<> { - auto result = co_await t.wait(); - result_ok = static_cast(result); - result_ec = result.ec; - }()); + auto result = co_await t_ref.wait(); + ok_out = static_cast(result); + ec_out = result.ec; + }(t, result_ok, result_ec)); capy::run_async(ioc.get_executor())( - [&]() -> capy::task<> + [](timer& cancel_t_ref, timer& t_ref) -> capy::task<> { - co_await cancel_timer.wait(); - t.cancel(); - }()); + co_await cancel_t_ref.wait(); + t_ref.cancel(); + }(cancel_timer, t)); ioc.run(); BOOST_TEST(!result_ok); @@ -555,11 +555,11 @@ struct timer_test t.expires_after(std::chrono::milliseconds(5)); capy::run_async(ioc.get_executor())( - [&]() -> capy::task<> + [](timer& t_ref, system::error_code& ec_out) -> capy::task<> { - auto [ec] = co_await t.wait(); - captured_ec = ec; - }()); + auto [ec] = co_await t_ref.wait(); + ec_out = ec; + }(t, captured_ec)); ioc.run(); BOOST_TEST(!captured_ec); @@ -595,12 +595,12 @@ struct timer_test t.expires_after(std::chrono::milliseconds(-100)); capy::run_async(ioc.get_executor())( - [&]() -> capy::task<> + [](timer& t_ref, bool& done_out) -> capy::task<> { - auto [ec] = co_await t.wait(); - completed = true; + auto [ec] = co_await t_ref.wait(); + done_out = true; (void)ec; - }()); + }(t, completed)); ioc.run(); BOOST_TEST(completed);