Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
46 changes: 41 additions & 5 deletions include/boost/corosio/tcp_server.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,11 @@
namespace boost {
namespace corosio {

#ifdef _MSC_VER
#pragma warning(push)
#pragma warning(disable: 4251) // class needs to have dll-interface
#endif

class BOOST_COROSIO_DECL
tcp_server
{
Expand Down Expand Up @@ -126,7 +131,15 @@ class BOOST_COROSIO_DECL
public:
push_aw(tcp_server& self, worker_base& w) noexcept;
bool await_ready() const noexcept;
std::coroutine_handle<> await_suspend(std::coroutine_handle<> h) noexcept;

template<typename Ex>
std::coroutine_handle<>
await_suspend(std::coroutine_handle<> h, Ex const&, std::stop_token) noexcept
{
// Dispatch to server's executor before touching shared state
return self_.dispatch_.dispatch(h);
}

void await_resume() noexcept;
};

Expand All @@ -142,7 +155,18 @@ class BOOST_COROSIO_DECL
public:
pop_aw(tcp_server& self) noexcept;
bool await_ready() const noexcept;
bool await_suspend(std::coroutine_handle<> h) noexcept;

template<typename Ex>
bool
await_suspend(std::coroutine_handle<> h, Ex const&, std::stop_token) noexcept
{
wait_.h = h;
wait_.w = nullptr;
wait_.next = self_.waiters_;
self_.waiters_ = &wait_;
return true;
}

system::result<worker_base&> await_resume() noexcept;
};

Expand All @@ -151,7 +175,7 @@ class BOOST_COROSIO_DECL
capy::task<void> do_accept(acceptor& acc);

protected:
class worker_base
class BOOST_COROSIO_DECL worker_base
{
worker_base* next = nullptr;

Expand All @@ -171,13 +195,21 @@ class BOOST_COROSIO_DECL
}
};

class workers
class BOOST_COROSIO_DECL workers
{
friend class tcp_server;

std::vector<std::unique_ptr<worker_base>> v_;
worker_base* idle_ = nullptr;

public:
workers() = default;
workers(workers const&) = delete;
workers& operator=(workers const&) = delete;
workers(workers&&) = default;
workers& operator=(workers&&) = default;

private:
void push(worker_base& w) noexcept
{
w.next = idle_;
Expand Down Expand Up @@ -208,7 +240,7 @@ class BOOST_COROSIO_DECL

workers wv_;

class launcher
class BOOST_COROSIO_DECL launcher
{
tcp_server* srv_;
worker_base* w_;
Expand Down Expand Up @@ -283,6 +315,10 @@ class BOOST_COROSIO_DECL
void start();
};

#ifdef _MSC_VER
#pragma warning(pop)
#endif

} // corosio
} // boost

Expand Down
10 changes: 3 additions & 7 deletions src/corosio/src/detail/posix_op.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -56,9 +56,9 @@ struct posix_op : scheduler_op
system::error_code* ec_out = nullptr;
std::size_t* bytes_out = nullptr;

int fd = -1; // Socket file descriptor
std::uint32_t events = 0; // Requested epoll events (EPOLLIN/EPOLLOUT)
int error = 0; // errno on completion
int fd = -1;
std::uint32_t events = 0;
int error = 0;
std::size_t bytes_transferred = 0;

std::atomic<bool> cancelled{false};
Expand Down Expand Up @@ -101,7 +101,6 @@ struct posix_op : scheduler_op
d.dispatch(h).resume();
}

// Returns true if this is a read operation (for EOF detection)
virtual bool is_read_operation() const noexcept { return false; }

void destroy() override
Expand Down Expand Up @@ -219,7 +218,6 @@ struct posix_accept_op : posix_op
io_object::io_object_impl* peer_impl = nullptr;
io_object::io_object_impl** impl_out = nullptr;

// Function to create peer impl - set by posix_sockets
using create_peer_fn = io_object::io_object_impl* (*)(void*, int);
create_peer_fn create_peer = nullptr;
void* service_ptr = nullptr;
Expand Down Expand Up @@ -269,14 +267,12 @@ struct posix_accept_op : posix_op

if (success && accepted_fd >= 0 && peer_impl)
{
// Pass impl to awaitable for assignment to peer socket
if (impl_out)
*impl_out = peer_impl;
peer_impl = nullptr;
}
else
{
// Cleanup on failure
if (accepted_fd >= 0)
{
::close(accepted_fd);
Expand Down
Loading
Loading