diff --git a/libs/server-sent-events/include/launchdarkly/sse/client.hpp b/libs/server-sent-events/include/launchdarkly/sse/client.hpp index 500467fd0..7e21596b6 100644 --- a/libs/server-sent-events/include/launchdarkly/sse/client.hpp +++ b/libs/server-sent-events/include/launchdarkly/sse/client.hpp @@ -34,6 +34,8 @@ class Builder { using EventReceiver = std::function; using LogCallback = std::function; using ErrorCallback = std::function; + using ConnectionHook = + std::function*)>; /** * Create a builder for the given URL. If the port is omitted, 443 is @@ -161,18 +163,39 @@ class Builder { * - HTTP proxies: "http://proxy:port" * - HTTPS proxies: "https://proxy:port" * - SOCKS4 proxies: "socks4://proxy:port" - * - SOCKS5 proxies: "socks5://proxy:port" or "socks5://user:pass@proxy:port" + * - SOCKS5 proxies: "socks5://proxy:port" or + * "socks5://user:pass@proxy:port" * - SOCKS5 with DNS through proxy: "socks5h://proxy:port" * - * Passing an empty string explicitly disables proxy (overrides environment variables). - * Passing std::nullopt (or not calling this method) uses environment variables. + * Passing an empty string explicitly disables proxy (overrides environment + * variables). Passing std::nullopt (or not calling this method) uses + * environment variables. * - * @param url Proxy URL, empty string to disable, or std::nullopt for environment variables + * @param url Proxy URL, empty string to disable, or std::nullopt for + * environment variables * @return Reference to this builder. - * @throws std::runtime_error if proxy is configured without CURL networking support + * @throws std::runtime_error if proxy is configured without CURL networking + * support */ Builder& proxy(std::optional url); + /** + * Register a hook invoked immediately before each connection attempt, + * including the initial connect and every reconnect. The hook receives + * a mutable request seeded with the Builder's defaults; it may modify + * the request's target, headers, method, and body. + * + * Host, port, and scheme are fixed at build time and cannot be changed + * by the hook. The Last-Event-ID header is managed by the client and + * any value the hook sets for it will be replaced before the request + * is sent. + * + * @param hook Callback invoked with a mutable request before each + * connection attempt. + * @return Reference to this builder. + */ + Builder& on_connect(ConnectionHook hook); + /** * Builds a Client. The shared pointer is necessary to extend the lifetime * of the Client to encompass each asynchronous operation that it performs. @@ -195,6 +218,7 @@ class Builder { bool skip_verify_peer_; std::optional custom_ca_file_; std::optional proxy_url_; + ConnectionHook connection_hook_; }; /** @@ -213,7 +237,8 @@ class Client { * when the SDK detects invalid data from the stream and needs to * reconnect. The backoff mechanism prevents rapid reconnection attempts * that could overload the service. - * @param reason A description of why the restart was triggered (for logging) + * @param reason A description of why the restart was triggered (for + * logging) */ virtual void async_restart(std::string const& reason) = 0; }; diff --git a/libs/server-sent-events/src/client.cpp b/libs/server-sent-events/src/client.cpp index 99e8fdb10..e5ae7a8b6 100644 --- a/libs/server-sent-events/src/client.cpp +++ b/libs/server-sent-events/src/client.cpp @@ -74,6 +74,7 @@ class FoxyClient : public Client, Builder::EventReceiver receiver, Builder::LogCallback logger, Builder::ErrorCallback errors, + Builder::ConnectionHook connection_hook, std::optional maybe_ssl) : ssl_context_(std::move(maybe_ssl)), host_(std::move(host)), @@ -85,6 +86,7 @@ class FoxyClient : public Client, event_receiver_(std::move(receiver)), logger_(std::move(logger)), errors_(std::move(errors)), + connection_hook_(std::move(connection_hook)), body_parser_(std::nullopt), session_(std::nullopt), last_event_id_(std::nullopt), @@ -185,6 +187,16 @@ class FoxyClient : public Client, } void do_run() { + if (shutting_down_) { + return; + } + + if (connection_hook_) { + connection_hook_(&req_); + } + + req_.prepare_payload(); + session_->async_connect( host_, port_, beast::bind_front_handler(&FoxyClient::on_connect, @@ -278,6 +290,9 @@ class FoxyClient : public Client, return; } + host_ = new_url->host(); + port_ = + new_url->has_port() ? new_url->port() : new_url->scheme(); req_.set(http::field::host, new_url->host()); req_.target(new_url->encoded_target()); } else { @@ -496,6 +511,11 @@ class FoxyClient : public Client, // which the client communicates error conditions to the user. Builder::ErrorCallback errors_; + // Optional hook invoked before each connection attempt with a mutable + // request, allowing the caller to vary the request per connect (e.g. + // updating query parameters from external state). + Builder::ConnectionHook connection_hook_; + // Customized parser (see parser.hpp) which repeatedly receives chunks of // data and parses them into SSE events. It cannot be reused across // connections, hence the optional so it can be destroyed easily. @@ -624,6 +644,11 @@ Builder& Builder::proxy(std::optional url) { return *this; } +Builder& Builder::on_connect(ConnectionHook hook) { + connection_hook_ = std::move(hook); + return *this; +} + std::shared_ptr Builder::build() { auto uri_components = boost::urls::parse_uri(url_); if (!uri_components) { @@ -645,12 +670,12 @@ std::shared_ptr Builder::build() { } } - request.prepare_payload(); - std::string host = uri_components->host(); request.set(http::field::host, host); - request.target(uri_components->encoded_target()); + // RFC 7230: an empty path in origin-form must be sent as "/". + auto target = uri_components->encoded_target(); + request.target(target.empty() ? "/" : target); if (uri_components->has_scheme()) { if (!(uri_components->scheme_id() == boost::urls::scheme::http || @@ -667,12 +692,12 @@ std::shared_ptr Builder::build() { : uri_components->scheme(); #ifdef LD_CURL_NETWORKING - bool use_https = uri_components->scheme_id() == boost::urls::scheme::https; - return std::make_shared( - net::make_strand(executor_), request, host, service, - connect_timeout_, read_timeout_, write_timeout_, - initial_reconnect_delay_, receiver_, logging_cb_, error_cb_, - skip_verify_peer_, custom_ca_file_, use_https, proxy_url_); + bool use_https = uri_components->scheme_id() == boost::urls::scheme::https; + return std::make_shared( + net::make_strand(executor_), request, host, service, connect_timeout_, + read_timeout_, write_timeout_, initial_reconnect_delay_, receiver_, + logging_cb_, error_cb_, connection_hook_, skip_verify_peer_, + custom_ca_file_, use_https, proxy_url_); #else std::optional ssl; if (uri_components->scheme_id() == boost::urls::scheme::https) { @@ -694,7 +719,7 @@ std::shared_ptr Builder::build() { return std::make_shared( net::make_strand(executor_), request, host, service, connect_timeout_, read_timeout_, write_timeout_, initial_reconnect_delay_, receiver_, - logging_cb_, error_cb_, std::move(ssl)); + logging_cb_, error_cb_, connection_hook_, std::move(ssl)); #endif } diff --git a/libs/server-sent-events/src/curl_client.cpp b/libs/server-sent-events/src/curl_client.cpp index c99b6830a..58f18cbbf 100644 --- a/libs/server-sent-events/src/curl_client.cpp +++ b/libs/server-sent-events/src/curl_client.cpp @@ -28,41 +28,37 @@ constexpr auto kCurlTransferAbort = 1; constexpr auto kHttpStatusCodeMovedPermanently = 301; constexpr auto kHttpStatusCodeTemporaryRedirect = 307; -CurlClient::CurlClient(boost::asio::any_io_executor executor, - http::request req, - std::string host, - std::string port, - std::optional connect_timeout, - std::optional read_timeout, - std::optional write_timeout, - std::optional - initial_reconnect_delay, - Builder::EventReceiver receiver, - Builder::LogCallback logger, - Builder::ErrorCallback errors, - bool skip_verify_peer, - std::optional custom_ca_file, - bool use_https, - std::optional proxy_url) - : - host_(std::move(host)), - port_(std::move(port)), - event_receiver_(std::move(receiver)), - logger_(std::move(logger)), - errors_(std::move(errors)), - use_https_(use_https), - backoff_timer_(executor), - multi_manager_(CurlMultiManager::create(executor)), - backoff_(initial_reconnect_delay.value_or(kDefaultInitialReconnectDelay), - kDefaultMaxBackoffDelay) { +CurlClient::CurlClient( + boost::asio::any_io_executor executor, + http::request req, + std::string host, + std::string port, + std::optional connect_timeout, + std::optional read_timeout, + std::optional write_timeout, + std::optional initial_reconnect_delay, + Builder::EventReceiver receiver, + Builder::LogCallback logger, + Builder::ErrorCallback errors, + Builder::ConnectionHook connection_hook, + bool skip_verify_peer, + std::optional custom_ca_file, + bool use_https, + std::optional proxy_url) + : host_(std::move(host)), + port_(std::move(port)), + event_receiver_(std::move(receiver)), + logger_(std::move(logger)), + errors_(std::move(errors)), + connection_hook_(std::move(connection_hook)), + use_https_(use_https), + backoff_timer_(executor), + multi_manager_(CurlMultiManager::create(executor)), + backoff_(initial_reconnect_delay.value_or(kDefaultInitialReconnectDelay), + kDefaultMaxBackoffDelay) { request_context_ = std::make_shared( - build_url(req), - std::move(req), - connect_timeout, - read_timeout, - write_timeout, - std::move(custom_ca_file), - std::move(proxy_url), + build_url(req), std::move(req), connect_timeout, read_timeout, + write_timeout, std::move(custom_ca_file), std::move(proxy_url), skip_verify_peer); } @@ -77,14 +73,14 @@ void CurlClient::async_connect() { } void CurlClient::async_restart(std::string const& reason) { - boost::asio::post(backoff_timer_.get_executor(), - [self = shared_from_this(), reason]() { - // Close the socket to abort the current transfer. - // CURL will detect the error and call the completion - // handler, which will trigger backoff and reconnection. - self->log_message("async_restart: aborting transfer due to " + reason); - self->request_context_->abort_transfer(); - }); + boost::asio::post(backoff_timer_.get_executor(), [self = shared_from_this(), + reason]() { + // Close the socket to abort the current transfer. + // CURL will detect the error and call the completion + // handler, which will trigger backoff and reconnection. + self->log_message("async_restart: aborting transfer due to " + reason); + self->request_context_->abort_transfer(); + }); } void CurlClient::do_run() { @@ -92,59 +88,68 @@ void CurlClient::do_run() { return; } + if (connection_hook_) { + connection_hook_(&request_context_->req); + request_context_->url = build_url(request_context_->req); + } + + // Set Last-Event-ID if we have one from a previous connection, otherwise + // erase it to override any value set by the hook. + if (request_context_->last_event_id && + !request_context_->last_event_id->empty()) { + request_context_->req.set("last-event-id", + *request_context_->last_event_id); + } else { + request_context_->req.erase("last-event-id"); + } + + request_context_->req.prepare_payload(); + auto ctx = request_context_; auto weak_self = weak_from_this(); std::weak_ptr weak_ctx = ctx; - ctx->set_callbacks(Callbacks([weak_self, weak_ctx](const std::string& message) { - if (auto ctx = weak_ctx.lock()) { - if (auto self = weak_self.lock()) { - boost::asio::post( - self->backoff_timer_. - get_executor(), - [self, message]() { - self->async_backoff(message); - }); - } - } - }, - [weak_self, weak_ctx](const Event& event) { - if (auto ctx = weak_ctx.lock()) { - if (auto self = weak_self.lock()) { - boost::asio::post( - self->backoff_timer_. - get_executor(), - [self, event]() { - self->event_receiver_(event); - }); - } - } - }, - [weak_self, weak_ctx](const Error& error) { - if (auto ctx = weak_ctx.lock()) { - if (const auto self = weak_self.lock()) { - // report_error does an asio post. - self->report_error(error); - } - } - }, - [weak_self, weak_ctx]() { - if (auto ctx = weak_ctx.lock()) { - if (const auto self = weak_self.lock()) { - boost::asio::post( - self->backoff_timer_. - get_executor(), - [self]() { - self->backoff_.succeed(); - }); - } - } - }, [weak_self, weak_ctx](const std::string& message) { - if (auto ctx = weak_ctx.lock()) { - if (const auto self = weak_self.lock()) { - self->log_message(message); - } - } - })); + ctx->set_callbacks(Callbacks( + [weak_self, weak_ctx](std::string const& message) { + if (auto ctx = weak_ctx.lock()) { + if (auto self = weak_self.lock()) { + boost::asio::post( + self->backoff_timer_.get_executor(), + [self, message]() { self->async_backoff(message); }); + } + } + }, + [weak_self, weak_ctx](Event const& event) { + if (auto ctx = weak_ctx.lock()) { + if (auto self = weak_self.lock()) { + boost::asio::post( + self->backoff_timer_.get_executor(), + [self, event]() { self->event_receiver_(event); }); + } + } + }, + [weak_self, weak_ctx](Error const& error) { + if (auto ctx = weak_ctx.lock()) { + if (auto const self = weak_self.lock()) { + // report_error does an asio post. + self->report_error(error); + } + } + }, + [weak_self, weak_ctx]() { + if (auto ctx = weak_ctx.lock()) { + if (auto const self = weak_self.lock()) { + boost::asio::post(self->backoff_timer_.get_executor(), + [self]() { self->backoff_.succeed(); }); + } + } + }, + [weak_self, weak_ctx](std::string const& message) { + if (auto ctx = weak_ctx.lock()) { + if (auto const self = weak_self.lock()) { + self->log_message(message); + } + } + })); // Start request using CURL multi (non-blocking) PerformRequestWithMulti(multi_manager_, ctx); } @@ -155,14 +160,14 @@ void CurlClient::async_backoff(std::string const& reason) { std::stringstream msg; msg << "backing off in (" << std::chrono::duration_cast(backoff_.delay()) - .count() + .count() << ") seconds due to " << reason; log_message(msg.str()); auto weak_self = weak_from_this(); backoff_timer_.expires_after(backoff_.delay()); - backoff_timer_.async_wait([weak_self](const boost::system::error_code& ec) { + backoff_timer_.async_wait([weak_self](boost::system::error_code const& ec) { if (auto self = weak_self.lock()) { self->on_backoff(ec); } @@ -177,13 +182,14 @@ void CurlClient::on_backoff(boost::system::error_code const& ec) { } std::string CurlClient::build_url( - const http::request& req) const { - const std::string scheme = use_https_ ? "https" : "http"; + http::request const& req) const { + std::string const scheme = use_https_ ? "https" : "http"; std::string url = scheme + "://" + host_; // Add port if it's not the default service name - // port_ can be either a port number (like "8123") or service name (like "https"/"http") + // port_ can be either a port number (like "8123") or service name (like + // "https"/"http") if (port_ != "https" && port_ != "http") { url += ":" + port_; } @@ -198,15 +204,15 @@ bool CurlClient::SetupCurlOptions(CURL* curl, RequestContext& context) { // Helper macro to check curl_easy_setopt return values // Returns false on error to signal setup failure -#define CURL_SETOPT_CHECK(handle, option, parameter) \ - do { \ - CURLcode code = curl_easy_setopt(handle, option, parameter); \ - if (code != CURLE_OK) { \ - context.log_message("curl_easy_setopt failed for " #option ": " + \ - std::string(curl_easy_strerror(code))); \ - return false; \ - } \ - } while(0) +#define CURL_SETOPT_CHECK(handle, option, parameter) \ + do { \ + CURLcode code = curl_easy_setopt(handle, option, parameter); \ + if (code != CURLE_OK) { \ + context.log_message("curl_easy_setopt failed for " #option ": " + \ + std::string(curl_easy_strerror(code))); \ + return false; \ + } \ + } while (0) // Set URL CURL_SETOPT_CHECK(curl, CURLOPT_URL, context.url.c_str()); @@ -229,8 +235,7 @@ bool CurlClient::SetupCurlOptions(CURL* curl, // Set request body if present if (!context.req.body().empty()) { - CURL_SETOPT_CHECK(curl, CURLOPT_POSTFIELDS, - context.req.body().c_str()); + CURL_SETOPT_CHECK(curl, CURLOPT_POSTFIELDS, context.req.body().c_str()); CURL_SETOPT_CHECK(curl, CURLOPT_POSTFIELDSIZE, context.req.body().size()); } @@ -243,12 +248,6 @@ bool CurlClient::SetupCurlOptions(CURL* curl, headers = curl_slist_append(headers, header.c_str()); } - // Add Last-Event-ID if we have one from previous connection - if (context.last_event_id && !context.last_event_id->empty()) { - std::string last_event_header = "Last-Event-ID: " + *context.last_event_id; - headers = curl_slist_append(headers, last_event_header.c_str()); - } - if (headers) { CURL_SETOPT_CHECK(curl, CURLOPT_HTTPHEADER, headers); } @@ -288,7 +287,8 @@ bool CurlClient::SetupCurlOptions(CURL* curl, if (context.proxy_url) { CURL_SETOPT_CHECK(curl, CURLOPT_PROXY, context.proxy_url->c_str()); } - // If proxy_url_ is std::nullopt, CURL will use environment variables (default behavior) + // If proxy_url_ is std::nullopt, CURL will use environment variables + // (default behavior) // Set callbacks CURL_SETOPT_CHECK(curl, CURLOPT_WRITEFUNCTION, WriteCallback); @@ -324,7 +324,7 @@ int CurlClient::ProgressCallback(void* clientp, // Check if we've exceeded the read timeout if (context->read_timeout) { - const auto now = std::chrono::steady_clock::now(); + auto const now = std::chrono::steady_clock::now(); // If download amount has changed, update the last progress time if (dlnow != context->last_download_amount) { @@ -332,9 +332,9 @@ int CurlClient::ProgressCallback(void* clientp, context->last_progress_time = now; } else { // No new data - check if we've exceeded the timeout - const auto elapsed = std::chrono::duration_cast< - std::chrono::milliseconds>( - now - context->last_progress_time); + auto const elapsed = + std::chrono::duration_cast( + now - context->last_progress_time); if (elapsed > *context->read_timeout) { return kCurlTransferAbort; @@ -350,12 +350,12 @@ int CurlClient::ProgressCallback(void* clientp, // https://curl.se/libcurl/c/CURLOPT_OPENSOCKETFUNCTION.html curl_socket_t CurlClient::OpenSocketCallback(void* clientp, curlsocktype purpose, - const curl_sockaddr* address) { + curl_sockaddr const* address) { auto* context = static_cast(clientp); // Create the socket - curl_socket_t sockfd = socket(address->family, address->socktype, - address->protocol); + curl_socket_t sockfd = + socket(address->family, address->socktype, address->protocol); // Store it so we can close it during shutdown if (sockfd != CURL_SOCKET_BAD) { @@ -368,7 +368,7 @@ curl_socket_t CurlClient::OpenSocketCallback(void* clientp, // Callback for writing response data // // https://curl.se/libcurl/c/CURLOPT_WRITEFUNCTION.html -size_t CurlClient::WriteCallback(const char* data, +size_t CurlClient::WriteCallback(char const* data, size_t size, size_t nmemb, void* userp) { @@ -376,7 +376,7 @@ size_t CurlClient::WriteCallback(const char* data, auto* context = static_cast(userp); if (context->is_shutting_down()) { - return 0; // Abort the transfer + return 0; // Abort the transfer } // Set up the event receiver callback for the parser @@ -388,7 +388,7 @@ size_t CurlClient::WriteCallback(const char* data, context->receive(std::move(event)); }); - const std::string_view data_view(data, total_size); + std::string_view const data_view(data, total_size); context->parser_reader->put(data_view); return total_size; @@ -397,15 +397,15 @@ size_t CurlClient::WriteCallback(const char* data, // Callback for reading request headers // // https://curl.se/libcurl/c/CURLOPT_HEADERFUNCTION.html -size_t CurlClient::HeaderCallback(const char* buffer, +size_t CurlClient::HeaderCallback(char const* buffer, size_t size, size_t nitems, void* userdata) { - const size_t total_size = size * nitems; + size_t const total_size = size * nitems; auto* context = static_cast(userdata); // Check for Content-Type header - if (const std::string header(buffer, total_size); + if (std::string const header(buffer, total_size); header.find("Content-Type:") == 0 || header.find("content-type:") == 0) { if (header.find("text/event-stream") == std::string::npos) { @@ -419,15 +419,15 @@ size_t CurlClient::HeaderCallback(const char* buffer, void CurlClient::PerformRequestWithMulti( std::shared_ptr multi_manager, std::shared_ptr context) { - if (context->is_shutting_down()) { return; } - // Initialize parser for new connection (last_event_id is tracked separately) + // Initialize parser for new connection (last_event_id is tracked + // separately) context->init_parser(); - std::shared_ptrcurl (curl_easy_init(), curl_easy_cleanup); + std::shared_ptr curl(curl_easy_init(), curl_easy_cleanup); if (!curl) { if (context->is_shutting_down()) { return; @@ -439,7 +439,8 @@ void CurlClient::PerformRequestWithMulti( curl_slist* headers = nullptr; if (!SetupCurlOptions(curl.get(), &headers, *context)) { - // setup_curl_options returned false, indicating an error (it already logged the error) + // setup_curl_options returned false, indicating an error (it already + // logged the error) if (context->is_shutting_down()) { return; @@ -452,112 +453,123 @@ void CurlClient::PerformRequestWithMulti( // Add handle to multi manager for async processing // Headers will be freed automatically by CurlMultiManager std::weak_ptr weak_context = context; - multi_manager->add_handle(curl, headers, [weak_context](std::shared_ptr easy, CurlMultiManager::Result result) { - auto context = weak_context.lock(); - if (!context) { - return; - } - - // Check if this was a read timeout from the multi manager - if (result.type == CurlMultiManager::Result::Type::ReadTimeout) { - if (!context->is_shutting_down()) { - context->error(errors::ReadTimeout{context->read_timeout}); - context->backoff("read timeout - no data received"); + multi_manager->add_handle( + curl, headers, + [weak_context](std::shared_ptr easy, + CurlMultiManager::Result result) { + auto context = weak_context.lock(); + if (!context) { + return; } - return; - } - // Handle CURLcode result - CURLcode res = result.curl_code; - - // Get response code - long response_code = 0; - curl_easy_getinfo(easy.get(), CURLINFO_RESPONSE_CODE, &response_code); - - // Handle HTTP status codes - auto status = static_cast(response_code); - auto status_class = http::to_status_class(status); + // Check if this was a read timeout from the multi manager + if (result.type == CurlMultiManager::Result::Type::ReadTimeout) { + if (!context->is_shutting_down()) { + context->error(errors::ReadTimeout{context->read_timeout}); + context->backoff("read timeout - no data received"); + } + return; + } + // Handle CURLcode result + CURLcode res = result.curl_code; - if (context->is_shutting_down()) { - return; - } + // Get response code + long response_code = 0; + curl_easy_getinfo(easy.get(), CURLINFO_RESPONSE_CODE, + &response_code); - if (status_class == http::status_class::redirection) { - // The internal CURL handling of redirects failed. - // This situation is likely the result of a missing redirect header - // or empty header. - context->error(errors::NotRedirectable{}); - return; - } + // Handle HTTP status codes + auto status = static_cast(response_code); + auto status_class = http::to_status_class(status); - // Handle result - if (res != CURLE_OK) { if (context->is_shutting_down()) { return; } - // Check if the error was due to progress callback aborting (read timeout) - if (res == CURLE_ABORTED_BY_CALLBACK && context->read_timeout) { - context->error(errors::ReadTimeout{context->read_timeout}); - context->backoff("aborting read of response body (timeout)"); - } else { - std::string error_msg = "CURL error: " + std::string(curl_easy_strerror(res)); - context->backoff(error_msg); + if (status_class == http::status_class::redirection) { + // The internal CURL handling of redirects failed. + // This situation is likely the result of a missing redirect + // header or empty header. + context->error(errors::NotRedirectable{}); + return; } - return; - } + // Handle result + if (res != CURLE_OK) { + if (context->is_shutting_down()) { + return; + } - if (status_class == http::status_class::successful) { - if (status == http::status::no_content) { - if (!context->is_shutting_down()) { - context->error(errors::UnrecoverableClientError{http::status::no_content}); + // Check if the error was due to progress callback aborting + // (read timeout) + if (res == CURLE_ABORTED_BY_CALLBACK && context->read_timeout) { + context->error(errors::ReadTimeout{context->read_timeout}); + context->backoff( + "aborting read of response body (timeout)"); + } else { + std::string error_msg = + "CURL error: " + std::string(curl_easy_strerror(res)); + context->backoff(error_msg); } + return; } - context->reset_backoff(); - // Connection ended normally, reconnect - if (!context->is_shutting_down()) { - context->backoff("connection closed normally"); + + if (status_class == http::status_class::successful) { + if (status == http::status::no_content) { + if (!context->is_shutting_down()) { + context->error(errors::UnrecoverableClientError{ + http::status::no_content}); + } + return; + } + context->reset_backoff(); + // Connection ended normally, reconnect + if (!context->is_shutting_down()) { + context->backoff("connection closed normally"); + } + return; } - return; - } - if (status_class == http::status_class::client_error) { - if (!context->is_shutting_down()) { - bool recoverable = (status == http::status::bad_request || - status == http::status::request_timeout || - status == http::status::too_many_requests); - - if (recoverable) { - std::stringstream ss; - ss << "HTTP status " << static_cast(status); - context->backoff(ss.str()); - } else { - context->error(errors::UnrecoverableClientError{status}); + if (status_class == http::status_class::client_error) { + if (!context->is_shutting_down()) { + bool recoverable = + (status == http::status::bad_request || + status == http::status::request_timeout || + status == http::status::too_many_requests); + + if (recoverable) { + std::stringstream ss; + ss << "HTTP status " << static_cast(status); + context->backoff(ss.str()); + } else { + context->error( + errors::UnrecoverableClientError{status}); + } } + return; } - return; - } - // Server error or other - backoff and retry - if (!context->is_shutting_down()) { - std::stringstream ss; - ss << "HTTP status " << static_cast(status); - context->backoff(ss.str()); - } - }, context->read_timeout); + // Server error or other - backoff and retry + if (!context->is_shutting_down()) { + std::stringstream ss; + ss << "HTTP status " << static_cast(status); + context->backoff(ss.str()); + } + }, + context->read_timeout); } void CurlClient::async_shutdown(std::function completion) { - boost::asio::post(backoff_timer_.get_executor(), [self = shared_from_this(), - completion = std::move(completion)]() { - self->do_shutdown(completion); - }); + boost::asio::post( + backoff_timer_.get_executor(), + [self = shared_from_this(), completion = std::move(completion)]() { + self->do_shutdown(completion); + }); } -void CurlClient::do_shutdown(const std::function& completion) { +void CurlClient::do_shutdown(std::function const& completion) { request_context_->shutdown(); backoff_timer_.cancel(); @@ -572,11 +584,10 @@ void CurlClient::log_message(std::string const& message) { } void CurlClient::report_error(Error error) { - boost::asio::post(backoff_timer_.get_executor(), - [errors = errors_, error = std::move(error)]() { - errors(error); - }); + boost::asio::post( + backoff_timer_.get_executor(), + [errors = errors_, error = std::move(error)]() { errors(error); }); } -} // namespace launchdarkly::sse +} // namespace launchdarkly::sse #endif // LD_CURL_NETWORKING \ No newline at end of file diff --git a/libs/server-sent-events/src/curl_client.hpp b/libs/server-sent-events/src/curl_client.hpp index e210791d1..f0df1bb02 100644 --- a/libs/server-sent-events/src/curl_client.hpp +++ b/libs/server-sent-events/src/curl_client.hpp @@ -2,8 +2,8 @@ #ifdef LD_CURL_NETWORKING -#include #include +#include #include "backoff.hpp" #include "parser.hpp" @@ -38,11 +38,11 @@ using launchdarkly::network::CurlMultiManager; class CurlClient final : public Client, public std::enable_shared_from_this { /** - * Structure containing callbacks between the CURL interactions and the - * IO executor. Callbacks are set while a connection is being established, - * instead of at construction time, to allow the use of weak_from_self. - * The weak_from_self method cannot be used during the constructor. - */ + * Structure containing callbacks between the CURL interactions and the + * IO executor. Callbacks are set while a connection is being established, + * instead of at construction time, to allow the use of weak_from_self. + * The weak_from_self method cannot be used during the constructor. + */ struct Callbacks { std::function do_backoff; std::function on_receive; @@ -50,31 +50,28 @@ class CurlClient final : public Client, std::function reset_backoff; std::function log_message; - Callbacks( - std::function do_backoff, - std::function on_receive, - std::function on_error, - std::function reset_backoff, - std::function log_message - ) : - do_backoff(std::move(do_backoff)), - on_receive(std::move(on_receive)), - on_error(std::move(on_error)), - reset_backoff(std::move(reset_backoff)), - log_message(std::move(log_message)) { - } + Callbacks(std::function do_backoff, + std::function on_receive, + std::function on_error, + std::function reset_backoff, + std::function log_message) + : do_backoff(std::move(do_backoff)), + on_receive(std::move(on_receive)), + on_error(std::move(on_error)), + reset_backoff(std::move(reset_backoff)), + log_message(std::move(log_message)) {} }; /** - * The request context represents the state required by the executing CURL - * request. Not directly including the shared data in the CurlClient allows - * for easy separation of its lifetime from that of the CURL client. This - * facilitates destruction of the CurlClient being used to stop in-progress - * requests. - * - * The CURL client can be destructed and pending tasks will still - * have a valid RequestContext and will detect the shutdown. - */ + * The request context represents the state required by the executing CURL + * request. Not directly including the shared data in the CurlClient allows + * for easy separation of its lifetime from that of the CURL client. This + * facilitates destruction of the CurlClient being used to stop in-progress + * requests. + * + * The CURL client can be destructed and pending tasks will still + * have a valid RequestContext and will detect the shutdown. + */ class RequestContext { // Only items used by both the curl thread and the executor/main // thread need to be mutex protected. @@ -84,7 +81,7 @@ class CurlClient final : public Client, // End mutex protected items. std::optional callbacks_; - public: + public: // SSE parser using common parser from parser.hpp using ParserBody = detail::EventBody>; std::unique_ptr parser_body; @@ -97,16 +94,21 @@ class CurlClient final : public Client, std::chrono::steady_clock::time_point last_progress_time; curl_off_t last_download_amount; - const http::request req; - const std::string url; - const std::optional connect_timeout; - const std::optional read_timeout; - const std::optional write_timeout; - const std::optional custom_ca_file; - const std::optional proxy_url; - const bool skip_verify_peer; - - void backoff(const std::string& message) { + // Mutated on the strand in do_run() before each transfer, and read by + // libcurl via raw pointers (CURLOPT_URL, CURLOPT_POSTFIELDS) for the + // duration of the transfer. Safe because the next do_run() only fires + // after the previous transfer's completion callback, so reads and + // writes never overlap. + http::request req; + std::string url; + std::optional const connect_timeout; + std::optional const read_timeout; + std::optional const write_timeout; + std::optional const custom_ca_file; + std::optional const proxy_url; + bool const skip_verify_peer; + + void backoff(std::string const& message) { std::lock_guard lock(mutex_); if (shutting_down_) { return; @@ -116,7 +118,7 @@ class CurlClient final : public Client, } } - void error(const Error& error) { + void error(Error const& error) { std::lock_guard lock(mutex_); if (shutting_down_) { return; @@ -126,7 +128,7 @@ class CurlClient final : public Client, } } - void receive(const Event& event) { + void receive(Event const& event) { std::lock_guard lock(mutex_); if (shutting_down_) { return; @@ -146,7 +148,7 @@ class CurlClient final : public Client, } } - void log_message(const std::string& message) { + void log_message(std::string const& message) { std::lock_guard lock(mutex_); if (shutting_down_) { return; @@ -161,9 +163,7 @@ class CurlClient final : public Client, callbacks_ = std::move(callbacks); } - bool is_shutting_down() { - return shutting_down_; - } + bool is_shutting_down() { return shutting_down_; } void set_curl_socket(curl_socket_t curl_socket) { std::lock_guard lock(mutex_); @@ -197,7 +197,6 @@ class CurlClient final : public Client, } } - RequestContext(std::string url, http::request req, std::optional connect_timeout, @@ -205,28 +204,28 @@ class CurlClient final : public Client, std::optional write_timeout, std::optional custom_ca_file, std::optional proxy_url, - bool skip_verify_peer - ) : shutting_down_(false), - curl_socket_(CURL_SOCKET_BAD), - last_download_amount(0), - req(std::move(req)), - url(std::move(url)), - connect_timeout(connect_timeout), - read_timeout(read_timeout), - write_timeout(write_timeout), - custom_ca_file(std::move(custom_ca_file)), - proxy_url(std::move(proxy_url)), - skip_verify_peer(skip_verify_peer) { - } + bool skip_verify_peer) + : shutting_down_(false), + curl_socket_(CURL_SOCKET_BAD), + last_download_amount(0), + req(std::move(req)), + url(std::move(url)), + connect_timeout(connect_timeout), + read_timeout(read_timeout), + write_timeout(write_timeout), + custom_ca_file(std::move(custom_ca_file)), + proxy_url(std::move(proxy_url)), + skip_verify_peer(skip_verify_peer) {} void init_parser() { parser_body = std::make_unique(); - parser_reader = std::make_unique(*parser_body); + parser_reader = + std::make_unique(*parser_body); parser_reader->init(); } }; -public: + public: CurlClient(boost::asio::any_io_executor executor, http::request req, std::string host, @@ -238,6 +237,7 @@ class CurlClient final : public Client, Builder::EventReceiver receiver, Builder::LogCallback logger, Builder::ErrorCallback errors, + Builder::ConnectionHook connection_hook, bool skip_verify_peer, std::optional custom_ca_file, bool use_https, @@ -249,32 +249,32 @@ class CurlClient final : public Client, void async_shutdown(std::function completion) override; void async_restart(std::string const& reason) override; -private: + private: void do_run(); - void do_shutdown(const std::function& completion); + void do_shutdown(std::function const& completion); void async_backoff(std::string const& reason); void on_backoff(boost::system::error_code const& ec); static void PerformRequestWithMulti( std::shared_ptr multi_manager, std::shared_ptr context); - static size_t WriteCallback(const char* data, + static size_t WriteCallback(char const* data, size_t size, size_t nmemb, void* userp); - static size_t HeaderCallback(const char* buffer, + static size_t HeaderCallback(char const* buffer, size_t size, size_t nitems, void* userdata); - static curl_socket_t OpenSocketCallback(void* clientp, - curlsocktype purpose, - const struct curl_sockaddr* - address); + static curl_socket_t OpenSocketCallback( + void* clientp, + curlsocktype purpose, + const struct curl_sockaddr* address); void log_message(std::string const& message); void report_error(Error error); - std::string build_url(const http::request& req) const; + std::string build_url(http::request const& req) const; static bool SetupCurlOptions(CURL* curl, curl_slist** headers, RequestContext& context); @@ -293,6 +293,7 @@ class CurlClient final : public Client, Builder::EventReceiver event_receiver_; Builder::LogCallback logger_; Builder::ErrorCallback errors_; + Builder::ConnectionHook connection_hook_; bool use_https_; boost::asio::steady_timer backoff_timer_; @@ -300,6 +301,6 @@ class CurlClient final : public Client, Backoff backoff_; }; -} // namespace launchdarkly::sse +} // namespace launchdarkly::sse #endif // LD_CURL_NETWORKING \ No newline at end of file diff --git a/libs/server-sent-events/tests/client_test.cpp b/libs/server-sent-events/tests/client_test.cpp new file mode 100644 index 000000000..9fa1c692e --- /dev/null +++ b/libs/server-sent-events/tests/client_test.cpp @@ -0,0 +1,1506 @@ + +#include +#include + +#include + +#include "mock_sse_server.hpp" + +#include + +#include +#include +#include +#include +#include + +using namespace launchdarkly::sse; +using namespace launchdarkly::sse::test; +using namespace std::chrono_literals; + +namespace { + +// C++17-compatible latch replacement +// https://en.cppreference.com/w/cpp/thread/latch.html +class SimpleLatch { + public: + explicit SimpleLatch(std::size_t const count) : count_(count) {} + + void count_down() { + std::lock_guard lock(mutex_); + if (count_ > 0) { + --count_; + } + cv_.notify_all(); + } + + template + bool wait_for(std::chrono::duration timeout) { + std::unique_lock lock(mutex_); + return cv_.wait_for(lock, timeout, [this] { return count_ == 0; }); + } + + private: + std::mutex mutex_; + std::condition_variable cv_; + std::size_t count_; +}; + +// Helper to synchronize event reception in tests +class EventCollector { + public: + void add_event(Event event) { + std::lock_guard lock(mutex_); + events_.push_back(std::move(event)); + cv_.notify_all(); + } + + void add_error(Error error) { + std::lock_guard lock(mutex_); + errors_.push_back(std::move(error)); + cv_.notify_all(); + } + + bool wait_for_events(size_t count, + std::chrono::milliseconds timeout = 5000ms) { + std::unique_lock lock(mutex_); + return cv_.wait_for(lock, timeout, + [&] { return events_.size() >= count; }); + } + + bool wait_for_errors(size_t count, + std::chrono::milliseconds timeout = 5000ms) { + std::unique_lock lock(mutex_); + return cv_.wait_for(lock, timeout, + [&] { return errors_.size() >= count; }); + } + + std::vector events() const { + std::lock_guard lock(mutex_); + return events_; + } + + std::vector errors() const { + std::lock_guard lock(mutex_); + return errors_; + } + + private: + mutable std::mutex mutex_; + std::condition_variable cv_; + std::vector events_; + std::vector errors_; +}; + +// Helper to run io_context in background thread +class IoContextRunner { + public: + IoContextRunner() : work_guard_(boost::asio::make_work_guard(ioc_)) { + thread_ = std::thread([this] { ioc_.run(); }); + } + + ~IoContextRunner() { + work_guard_.reset(); + ioc_.stop(); + if (thread_.joinable()) { + thread_.join(); + } + } + + boost::asio::io_context& context() { return ioc_; } + + private: + boost::asio::io_context ioc_; + boost::asio::executor_work_guard + work_guard_; + std::thread thread_; +}; + +} // namespace + +// Basic connectivity tests + +TEST(ClientTest, ConnectsToHttpServer) { + MockSSEServer server; + auto port = server.start(TestHandlers::simple_event("hello world")); + + // Give server a moment to start accepting connections + std::this_thread::sleep_for(100ms); + + IoContextRunner runner; + EventCollector collector; + + auto client = + Builder(runner.context().get_executor(), + "http://localhost:" + std::to_string(port)) + .receiver([&](Event e) { collector.add_event(std::move(e)); }) + .build(); + + client->async_connect(); + + ASSERT_TRUE(collector.wait_for_events(1)); + auto events = collector.events(); + ASSERT_EQ(1, events.size()); + EXPECT_EQ("hello world", events[0].data()); + + SimpleLatch shutdown_latch(1); + client->async_shutdown([&] { shutdown_latch.count_down(); }); + EXPECT_TRUE(shutdown_latch.wait_for(5000ms)); +} + +TEST(ClientTest, HandlesMultipleEvents) { + MockSSEServer server; + auto port = server.start( + TestHandlers::multiple_events({"event1", "event2", "event3"})); + + IoContextRunner runner; + EventCollector collector; + + auto client = + Builder(runner.context().get_executor(), + "http://localhost:" + std::to_string(port)) + .receiver([&](Event e) { collector.add_event(std::move(e)); }) + .build(); + + client->async_connect(); + + ASSERT_TRUE(collector.wait_for_events(3)); + auto events = collector.events(); + ASSERT_EQ(3, events.size()); + EXPECT_EQ("event1", events[0].data()); + EXPECT_EQ("event2", events[1].data()); + EXPECT_EQ("event3", events[2].data()); + + SimpleLatch shutdown_latch(1); + client->async_shutdown([&] { shutdown_latch.count_down(); }); + EXPECT_TRUE(shutdown_latch.wait_for(5000ms)); +} + +// SSE parsing tests + +TEST(ClientTest, ParsesEventWithType) { + MockSSEServer server; + auto port = server.start( + [](auto const&, auto send_response, auto send_sse_event, auto close) { + http::response res{http::status::ok, 11}; + res.set(http::field::content_type, "text/event-stream"); + res.chunked(true); + send_response(res); + + send_sse_event(SSEFormatter::event("test data", "custom-type")); + std::this_thread::sleep_for(10ms); + close(); + }); + + IoContextRunner runner; + EventCollector collector; + + auto client = + Builder(runner.context().get_executor(), + "http://localhost:" + std::to_string(port)) + .receiver([&](Event e) { collector.add_event(std::move(e)); }) + .build(); + + client->async_connect(); + + ASSERT_TRUE(collector.wait_for_events(1)); + auto events = collector.events(); + ASSERT_EQ(1, events.size()); + EXPECT_EQ("test data", events[0].data()); + EXPECT_EQ("custom-type", events[0].type()); + + SimpleLatch shutdown_latch(1); + client->async_shutdown([&] { shutdown_latch.count_down(); }); + EXPECT_TRUE(shutdown_latch.wait_for(5000ms)); +} + +TEST(ClientTest, ParsesEventWithId) { + MockSSEServer server; + auto port = server.start( + [](auto const&, auto send_response, auto send_sse_event, auto close) { + http::response res{http::status::ok, 11}; + res.set(http::field::content_type, "text/event-stream"); + res.chunked(true); + send_response(res); + + send_sse_event(SSEFormatter::event("test data", "", "event-123")); + std::this_thread::sleep_for(10ms); + close(); + }); + + IoContextRunner runner; + EventCollector collector; + + auto client = + Builder(runner.context().get_executor(), + "http://localhost:" + std::to_string(port)) + .receiver([&](Event e) { collector.add_event(std::move(e)); }) + .build(); + + client->async_connect(); + + ASSERT_TRUE(collector.wait_for_events(1)); + auto events = collector.events(); + ASSERT_EQ(1, events.size()); + EXPECT_EQ("test data", events[0].data()); + EXPECT_EQ("event-123", events[0].id()); + + SimpleLatch shutdown_latch(1); + client->async_shutdown([&] { shutdown_latch.count_down(); }); + EXPECT_TRUE(shutdown_latch.wait_for(5000ms)); +} + +TEST(ClientTest, ParsesMultiLineData) { + MockSSEServer server; + auto port = server.start( + [](auto const&, auto send_response, auto send_sse_event, auto close) { + http::response res{http::status::ok, 11}; + res.set(http::field::content_type, "text/event-stream"); + res.chunked(true); + send_response(res); + + send_sse_event(SSEFormatter::event("line1\nline2\nline3")); + std::this_thread::sleep_for(10ms); + close(); + }); + + IoContextRunner runner; + EventCollector collector; + + auto client = + Builder(runner.context().get_executor(), + "http://localhost:" + std::to_string(port)) + .receiver([&](Event e) { collector.add_event(std::move(e)); }) + .build(); + + client->async_connect(); + + ASSERT_TRUE(collector.wait_for_events(1)); + auto events = collector.events(); + ASSERT_EQ(1, events.size()); + EXPECT_EQ("line1\nline2\nline3", events[0].data()); + + SimpleLatch shutdown_latch(1); + client->async_shutdown([&] { shutdown_latch.count_down(); }); + EXPECT_TRUE(shutdown_latch.wait_for(5000ms)); +} + +TEST(ClientTest, HandlesComments) { + GTEST_SKIP() + << "Comment filtering is not yet implemented in the SSE parser"; + + MockSSEServer server; + auto port = server.start( + [](auto const&, auto send_response, auto send_sse_event, auto close) { + http::response res{http::status::ok, 11}; + res.set(http::field::content_type, "text/event-stream"); + res.chunked(true); + send_response(res); + + // Send a comment (should be ignored) + send_sse_event(SSEFormatter::comment("this is a comment")); + // Send an actual event + send_sse_event(SSEFormatter::event("real data")); + std::this_thread::sleep_for(10ms); + close(); + }); + + IoContextRunner runner; + EventCollector collector; + + auto client = + Builder(runner.context().get_executor(), + "http://localhost:" + std::to_string(port)) + .receiver([&](Event e) { collector.add_event(std::move(e)); }) + .build(); + + client->async_connect(); + + ASSERT_TRUE(collector.wait_for_events(1)); + auto events = collector.events(); + // Should only receive the real event, not the comment + ASSERT_EQ(1, events.size()); + EXPECT_EQ("real data", events[0].data()); + + SimpleLatch shutdown_latch(1); + client->async_shutdown([&] { shutdown_latch.count_down(); }); + EXPECT_TRUE(shutdown_latch.wait_for(5000ms)); +} + +// HTTP method tests + +TEST(ClientTest, SupportsPostMethod) { + MockSSEServer server; + std::string received_method; + + auto port = server.start([&](auto const& req, auto send_response, + auto send_sse_event, auto close) { + received_method = std::string(req.method_string()); + + http::response res{http::status::ok, 11}; + res.set(http::field::content_type, "text/event-stream"); + res.chunked(true); + send_response(res); + + send_sse_event(SSEFormatter::event("response")); + std::this_thread::sleep_for(10ms); + close(); + }); + + IoContextRunner runner; + EventCollector collector; + + auto client = + Builder(runner.context().get_executor(), + "http://localhost:" + std::to_string(port)) + .receiver([&](Event e) { collector.add_event(std::move(e)); }) + .method(http::verb::post) + .body("test body") + .build(); + + client->async_connect(); + + ASSERT_TRUE(collector.wait_for_events(1)); + EXPECT_EQ("POST", received_method); + + SimpleLatch shutdown_latch(1); + client->async_shutdown([&] { shutdown_latch.count_down(); }); + EXPECT_TRUE(shutdown_latch.wait_for(5000ms)); +} + +TEST(ClientTest, SupportsReportMethod) { + MockSSEServer server; + std::string received_method; + + auto port = server.start([&](auto const& req, auto send_response, + auto send_sse_event, auto close) { + received_method = std::string(req.method_string()); + + http::response res{http::status::ok, 11}; + res.set(http::field::content_type, "text/event-stream"); + res.chunked(true); + send_response(res); + + send_sse_event(SSEFormatter::event("response")); + std::this_thread::sleep_for(10ms); + close(); + }); + + IoContextRunner runner; + EventCollector collector; + + auto client = + Builder(runner.context().get_executor(), + "http://localhost:" + std::to_string(port)) + .receiver([&](Event e) { collector.add_event(std::move(e)); }) + .method(http::verb::report) + .body("test body") + .build(); + + client->async_connect(); + + ASSERT_TRUE(collector.wait_for_events(1)); + EXPECT_EQ("REPORT", received_method); + + SimpleLatch shutdown_latch(1); + client->async_shutdown([&] { shutdown_latch.count_down(); }); + EXPECT_TRUE(shutdown_latch.wait_for(5000ms)); +} + +// HTTP header tests + +TEST(ClientTest, SendsCustomHeaders) { + MockSSEServer server; + std::string custom_header_value; + + auto port = server.start([&](auto const& req, auto send_response, + auto send_sse_event, auto close) { + auto it = req.find("X-Custom-Header"); + if (it != req.end()) { + custom_header_value = std::string(it->value()); + } + + http::response res{http::status::ok, 11}; + res.set(http::field::content_type, "text/event-stream"); + res.chunked(true); + send_response(res); + + send_sse_event(SSEFormatter::event("response")); + std::this_thread::sleep_for(10ms); + close(); + }); + + IoContextRunner runner; + EventCollector collector; + + auto client = + Builder(runner.context().get_executor(), + "http://localhost:" + std::to_string(port)) + .receiver([&](Event e) { collector.add_event(std::move(e)); }) + .header("X-Custom-Header", "custom-value") + .build(); + + client->async_connect(); + + ASSERT_TRUE(collector.wait_for_events(1)); + EXPECT_EQ("custom-value", custom_header_value); + + SimpleLatch shutdown_latch(1); + client->async_shutdown([&] { shutdown_latch.count_down(); }); + EXPECT_TRUE(shutdown_latch.wait_for(5000ms)); +} + +// HTTP status code tests + +TEST(ClientTest, Handles404Error) { + MockSSEServer server; + auto port = server.start(TestHandlers::http_error(http::status::not_found)); + + IoContextRunner runner; + EventCollector collector; + + auto client = + Builder(runner.context().get_executor(), + "http://localhost:" + std::to_string(port)) + .receiver([&](Event e) { collector.add_event(std::move(e)); }) + .errors([&](Error e) { collector.add_error(std::move(e)); }) + .build(); + + client->async_connect(); + + ASSERT_TRUE(collector.wait_for_errors(1)); + auto errors = collector.errors(); + ASSERT_GE(errors.size(), 1); + + SimpleLatch shutdown_latch(1); + client->async_shutdown([&] { shutdown_latch.count_down(); }); + EXPECT_TRUE(shutdown_latch.wait_for(5000ms)); +} + +TEST(ClientTest, Handles500Error) { + // 500 errors are treated as transient server errors and should trigger + // backoff/retry behavior, not error callbacks. This is correct SSE client + // behavior. + std::atomic connection_attempts{0}; + + auto handler = [&](auto const&, auto send_response, auto, auto) { + ++connection_attempts; + http::response res{ + http::status::internal_server_error, 11}; + res.body() = "Error"; + res.prepare_payload(); + send_response(res); + }; + + MockSSEServer server; + auto port = server.start(handler); + + IoContextRunner runner; + EventCollector collector; + + auto client = + Builder(runner.context().get_executor(), + "http://localhost:" + std::to_string(port)) + .receiver([&](Event e) { collector.add_event(std::move(e)); }) + .errors([&](Error e) { collector.add_error(std::move(e)); }) + .initial_reconnect_delay(50ms) // Short delay for test + .build(); + + client->async_connect(); + + // Should NOT receive error callbacks - should retry instead + // Wait a bit to let multiple reconnection attempts happen + std::this_thread::sleep_for(300ms); + + // Verify that multiple reconnection attempts occurred (backoff/retry + // behavior) + EXPECT_GE(connection_attempts.load(), 2); + + // Verify no error callbacks were invoked (5xx are not reported as errors) + EXPECT_EQ(0, collector.errors().size()); + + SimpleLatch shutdown_latch(1); + client->async_shutdown([&] { shutdown_latch.count_down(); }); + EXPECT_TRUE(shutdown_latch.wait_for(5000ms)); +} + +// Redirect tests + +TEST(ClientTest, FollowsRedirects) { + MockSSEServer redirect_server; + MockSSEServer target_server; + + auto target_port = + target_server.start(TestHandlers::simple_event("redirected")); + auto redirect_port = redirect_server.start(TestHandlers::redirect( + "http://localhost:" + std::to_string(target_port) + "/")); + + IoContextRunner runner; + EventCollector collector; + + auto client = + Builder(runner.context().get_executor(), + "http://localhost:" + std::to_string(redirect_port)) + .receiver([&](Event e) { collector.add_event(std::move(e)); }) + .build(); + + client->async_connect(); + + ASSERT_TRUE(collector.wait_for_events(1)); + auto events = collector.events(); + ASSERT_EQ(1, events.size()); + EXPECT_EQ("redirected", events[0].data()); + + SimpleLatch shutdown_latch(1); + client->async_shutdown([&] { shutdown_latch.count_down(); }); + EXPECT_TRUE(shutdown_latch.wait_for(5000ms)); +} + +// Connection lifecycle tests + +TEST(ClientTest, ShutdownStopsClient) { + MockSSEServer server; + auto port = server.start([](auto const&, auto send_response, + auto send_sse_event, auto) { + http::response res{http::status::ok, 11}; + res.set(http::field::content_type, "text/event-stream"); + res.chunked(true); + send_response(res); + + // Keep sending events forever (until connection closes) + for (int i = 0; i < 1000; i++) { + send_sse_event(SSEFormatter::event("event " + std::to_string(i))); + std::this_thread::sleep_for(10ms); + } + }); + + IoContextRunner runner; + EventCollector collector; + + auto client = + Builder(runner.context().get_executor(), + "http://localhost:" + std::to_string(port)) + .receiver([&](Event e) { collector.add_event(std::move(e)); }) + .build(); + + client->async_connect(); + + // Wait for at least one event + ASSERT_TRUE(collector.wait_for_events(1)); + + // Shutdown should complete quickly + auto shutdown_start = std::chrono::steady_clock::now(); + SimpleLatch shutdown_latch(1); + client->async_shutdown([&] { shutdown_latch.count_down(); }); + EXPECT_TRUE(shutdown_latch.wait_for(5000ms)); + auto shutdown_duration = std::chrono::steady_clock::now() - shutdown_start; + + // Shutdown should complete in reasonable time (less than 2 seconds) + EXPECT_LT(shutdown_duration, 2000ms); +} + +TEST(ClientTest, CanShutdownBeforeConnection) { + MockSSEServer server; + auto port = server.start(TestHandlers::simple_event("test")); + + IoContextRunner runner; + EventCollector collector; + + auto client = + Builder(runner.context().get_executor(), + "http://localhost:" + std::to_string(port)) + .receiver([&](Event e) { collector.add_event(std::move(e)); }) + .build(); + + // Shutdown immediately without connecting + SimpleLatch shutdown_latch(1); + client->async_shutdown([&] { shutdown_latch.count_down(); }); + EXPECT_TRUE(shutdown_latch.wait_for(5000ms)); +} + +TEST(ClientTest, HandlesImmediateClose) { + // Immediate connection close is treated as a transient network error and + // should trigger backoff/retry behavior, not error callbacks. This is + // correct SSE client behavior. + std::atomic connection_attempts{0}; + + auto handler = [&](auto const&, auto, auto, auto close) { + ++connection_attempts; + close(); // Immediately close without sending headers + }; + + MockSSEServer server; + auto port = server.start(handler); + + IoContextRunner runner; + EventCollector collector; + + auto client = + Builder(runner.context().get_executor(), + "http://localhost:" + std::to_string(port)) + .receiver([&](Event e) { collector.add_event(std::move(e)); }) + .errors([&](Error e) { collector.add_error(std::move(e)); }) + .initial_reconnect_delay(50ms) // Short delay for test + .build(); + + client->async_connect(); + + // Should NOT receive error callbacks - should retry instead + // Wait a bit to let multiple reconnection attempts happen + std::this_thread::sleep_for(300ms); + + // Verify that multiple reconnection attempts occurred (backoff/retry + // behavior) + EXPECT_GE(connection_attempts.load(), 2); + + // Verify no error callbacks were invoked (connection errors trigger retry) + EXPECT_EQ(0, collector.errors().size()); + + SimpleLatch shutdown_latch(1); + client->async_shutdown([&] { shutdown_latch.count_down(); }); + EXPECT_TRUE(shutdown_latch.wait_for(5000ms)); +} + +// Timeout tests + +TEST(ClientTest, RespectsReadTimeout) { + MockSSEServer server; + auto port = server.start( + [](auto const&, auto send_response, auto send_sse_event, auto) { + http::response res{http::status::ok, 11}; + res.set(http::field::content_type, "text/event-stream"); + res.chunked(true); + send_response(res); + + // Send one event + send_sse_event(SSEFormatter::event("first")); + + // Then wait longer than read timeout without sending anything + std::this_thread::sleep_for(5000ms); + }); + + IoContextRunner runner; + EventCollector collector; + + auto client = + Builder(runner.context().get_executor(), + "http://localhost:" + std::to_string(port)) + .receiver([&](Event e) { collector.add_event(std::move(e)); }) + .errors([&](Error e) { + std::cerr << "Error" << e.index() << std::endl; + collector.add_error(std::move(e)); + }) + .logger([&](std::string const& message) { + std::cerr << "log_message" << message << std::endl; + }) + .read_timeout(500ms) // Short timeout for test + .initial_reconnect_delay(50ms) + .build(); + + client->async_connect(); + + // Should receive the first event + ASSERT_TRUE(collector.wait_for_events(1, 100ms)); + + // Then should get a timeout error + ASSERT_TRUE(collector.wait_for_errors(1, 1000ms)); + + SimpleLatch shutdown_latch(1); + client->async_shutdown([&] { shutdown_latch.count_down(); }); + EXPECT_TRUE(shutdown_latch.wait_for(5000ms)); +} + +TEST(ClientTest, DestructorCleansUpProperly) { + { + MockSSEServer server; + auto port = server.start( + [](auto const&, auto send_response, auto send_sse_event, auto) { + http::response res{http::status::ok, 11}; + res.set(http::field::content_type, "text/event-stream"); + res.chunked(true); + send_response(res); + + // Keep sending events + for (int i = 0; i < 100; i++) { + send_sse_event( + SSEFormatter::event("event " + std::to_string(i))); + std::this_thread::sleep_for(10ms); + } + }); + EventCollector collector; + IoContextRunner runner; + auto client = + Builder(runner.context().get_executor(), + "http://localhost:" + std::to_string(port)) + .receiver([&](Event e) { collector.add_event(std::move(e)); }) + .build(); + + client->async_connect(); + ASSERT_TRUE(collector.wait_for_events(1)); + + // Let destructor run without explicit shutdown + } + + // If destructor doesn't properly clean up, this could hang or crash + // Test passing indicates proper cleanup in destructor +} + +TEST(ClientTest, HandlesEmptyEventData) { + MockSSEServer server; + auto port = server.start( + [](auto const&, auto send_response, auto send_sse_event, auto close) { + http::response res{http::status::ok, 11}; + res.set(http::field::content_type, "text/event-stream"); + res.chunked(true); + send_response(res); + + send_sse_event(SSEFormatter::event("")); + std::this_thread::sleep_for(10ms); + close(); + }); + + IoContextRunner runner; + EventCollector collector; + + auto client = + Builder(runner.context().get_executor(), + "http://localhost:" + std::to_string(port)) + .receiver([&](Event e) { collector.add_event(std::move(e)); }) + .build(); + + client->async_connect(); + + ASSERT_TRUE(collector.wait_for_events(1)); + auto events = collector.events(); + ASSERT_EQ(1, events.size()); + EXPECT_EQ("", events[0].data()); + + SimpleLatch shutdown_latch(1); + client->async_shutdown([&] { shutdown_latch.count_down(); }); + EXPECT_TRUE(shutdown_latch.wait_for(5000ms)); +} + +TEST(ClientTest, HandlesEventWithOnlyType) { + MockSSEServer server; + auto port = server.start( + [](auto const&, auto send_response, auto send_sse_event, auto close) { + http::response res{http::status::ok, 11}; + res.set(http::field::content_type, "text/event-stream"); + res.chunked(true); + send_response(res); + + // Send event with type but empty data + send_sse_event("event: heartbeat\ndata: \n\n"); + std::this_thread::sleep_for(10ms); + close(); + }); + + IoContextRunner runner; + EventCollector collector; + + auto client = + Builder(runner.context().get_executor(), + "http://localhost:" + std::to_string(port)) + .receiver([&](Event e) { collector.add_event(std::move(e)); }) + .build(); + + client->async_connect(); + + ASSERT_TRUE(collector.wait_for_events(1)); + auto events = collector.events(); + ASSERT_EQ(1, events.size()); + EXPECT_EQ("heartbeat", events[0].type()); + EXPECT_EQ("", events[0].data()); + + SimpleLatch shutdown_latch(1); + client->async_shutdown([&] { shutdown_latch.count_down(); }); + EXPECT_TRUE(shutdown_latch.wait_for(5000ms)); +} + +TEST(ClientTest, HandlesRapidEvents) { + MockSSEServer server; + constexpr int num_events = 100; + + // num_events needs to be captured for MSVC. + auto port = server.start([num_events](auto const&, auto send_response, + auto send_sse_event, auto close) { + http::response res{http::status::ok, 11}; + res.set(http::field::content_type, "text/event-stream"); + res.chunked(true); + send_response(res); + + // Send many events rapidly + for (int i = 0; i < num_events; i++) { + send_sse_event(SSEFormatter::event("event" + std::to_string(i))); + } + std::this_thread::sleep_for(10ms); + close(); + }); + + IoContextRunner runner; + EventCollector collector; + + auto client = + Builder(runner.context().get_executor(), + "http://localhost:" + std::to_string(port)) + .receiver([&](Event e) { collector.add_event(std::move(e)); }) + .build(); + + client->async_connect(); + + ASSERT_TRUE(collector.wait_for_events(num_events, 10000ms)); + auto events = collector.events(); + EXPECT_EQ(num_events, events.size()); + + SimpleLatch shutdown_latch(1); + client->async_shutdown([&] { shutdown_latch.count_down(); }); + EXPECT_TRUE(shutdown_latch.wait_for(5000ms)); +} + +TEST(ClientTest, ShutdownDuringBackoffDelay) { + // This ensures clean shutdown during backoff/retry wait period + std::atomic connection_attempts{0}; + + auto handler = [&](auto const&, auto send_response, auto, auto) { + ++connection_attempts; + // Return 500 to trigger backoff + http::response res{ + http::status::internal_server_error, 11}; + res.body() = "Error"; + res.prepare_payload(); + send_response(res); + }; + + MockSSEServer server; + auto port = server.start(handler); + + IoContextRunner runner; + EventCollector collector; + + auto client = + Builder(runner.context().get_executor(), + "http://localhost:" + std::to_string(port)) + .receiver([&](Event e) { collector.add_event(std::move(e)); }) + .initial_reconnect_delay( + 2000ms) // Long delay to ensure we shutdown during wait + .build(); + + client->async_connect(); + + // Wait for first connection attempt to complete + std::this_thread::sleep_for(200ms); + EXPECT_GE(connection_attempts.load(), 1); + + // Now shutdown while it's waiting in backoff + auto shutdown_start = std::chrono::steady_clock::now(); + SimpleLatch shutdown_latch(1); + client->async_shutdown([&] { shutdown_latch.count_down(); }); + EXPECT_TRUE(shutdown_latch.wait_for(5000ms)); + auto shutdown_duration = std::chrono::steady_clock::now() - shutdown_start; + + // Shutdown should complete quickly despite long backoff delay + EXPECT_LT(shutdown_duration, 1000ms); + + // Should NOT have made another connection attempt during backoff + EXPECT_EQ(1, connection_attempts.load()); +} + +TEST(ClientTest, ShutdownDuringDataReception) { + // This covers the branch where we abort during SSE data parsing + SimpleLatch server_sending(1); + SimpleLatch client_received_some(1); + + auto handler = [&](auto const&, auto send_response, auto send_sse_event, + auto) { + http::response res{http::status::ok, 11}; + res.set(http::field::content_type, "text/event-stream"); + res.chunked(true); + send_response(res); + + // Send events continuously + for (int i = 0; i < 100; i++) { + if (!send_sse_event( + SSEFormatter::event("event " + std::to_string(i)))) { + return; // Connection closed or error - stop sending + } + if (i == 2) { + server_sending.count_down(); + } + std::this_thread::sleep_for( + 10ms); // Slow enough to allow shutdown mid-stream + } + }; + + MockSSEServer server; + auto port = server.start(handler); + + IoContextRunner runner; + // Shared ptr to prevent handling events during destruction. + auto collector = std::make_shared(); + + auto client = Builder(runner.context().get_executor(), + "http://localhost:" + std::to_string(port)) + .receiver([collector, &client_received_some](Event e) { + collector->add_event(std::move(e)); + if (collector->events().size() >= 2) { + client_received_some.count_down(); + } + }) + .build(); + + client->async_connect(); + + // Wait until server is sending and client has received some events + ASSERT_TRUE(server_sending.wait_for(5000ms)); + ASSERT_TRUE(client_received_some.wait_for(5000ms)); + + // Shutdown while WriteCallback is actively processing data + auto shutdown_start = std::chrono::steady_clock::now(); + SimpleLatch shutdown_latch(1); + client->async_shutdown([&] { shutdown_latch.count_down(); }); + EXPECT_TRUE(shutdown_latch.wait_for(5000ms)); + auto shutdown_duration = std::chrono::steady_clock::now() - shutdown_start; + + // Shutdown should complete quickly even during active data transfer + EXPECT_LT(shutdown_duration, 2000ms); +} + +TEST(ClientTest, ShutdownDuringProgressCallback) { + // This ensures we can abort during slow data transfer + SimpleLatch server_started(1); + + auto handler = [&](auto const&, auto send_response, auto send_sse_event, + auto) { + http::response res{http::status::ok, 11}; + res.set(http::field::content_type, "text/event-stream"); + res.chunked(true); + send_response(res); + + server_started.count_down(); + + // Send one event then pause (simulating slow connection) + send_sse_event(SSEFormatter::event("first")); + std::this_thread::sleep_for( + 5000ms); // Pause to simulate slow connection + }; + + MockSSEServer server; + auto port = server.start(handler); + + IoContextRunner runner; + EventCollector collector; + + auto client = + Builder(runner.context().get_executor(), + "http://localhost:" + std::to_string(port)) + .receiver([&](Event e) { collector.add_event(std::move(e)); }) + .read_timeout(10000ms) // Long timeout so ProgressCallback is + // called but doesn't abort + .build(); + + client->async_connect(); + + // Wait for first event and server pause + ASSERT_TRUE(server_started.wait_for(5000ms)); + ASSERT_TRUE(collector.wait_for_events(1, 5000ms)); + + // Shutdown while ProgressCallback is being invoked during the pause + auto shutdown_start = std::chrono::steady_clock::now(); + SimpleLatch shutdown_latch(1); + client->async_shutdown([&] { shutdown_latch.count_down(); }); + EXPECT_TRUE(shutdown_latch.wait_for(5000ms)); + auto shutdown_duration = std::chrono::steady_clock::now() - shutdown_start; + + // Shutdown should abort the transfer quickly + EXPECT_LT(shutdown_duration, 2000ms); +} + +TEST(ClientTest, MultipleShutdownCalls) { + // Ensures multiple shutdown calls don't cause issues (idempotency test) + MockSSEServer server; + auto port = server.start(TestHandlers::simple_event("test")); + + IoContextRunner runner; + EventCollector collector; + + auto client = + Builder(runner.context().get_executor(), + "http://localhost:" + std::to_string(port)) + .receiver([&](Event e) { collector.add_event(std::move(e)); }) + .build(); + + client->async_connect(); + ASSERT_TRUE(collector.wait_for_events(1)); + + // Call shutdown multiple times in rapid succession + SimpleLatch shutdown_latch1(1); + SimpleLatch shutdown_latch2(1); + SimpleLatch shutdown_latch3(1); + + client->async_shutdown([&] { shutdown_latch1.count_down(); }); + client->async_shutdown([&] { shutdown_latch2.count_down(); }); + client->async_shutdown([&] { shutdown_latch3.count_down(); }); + + // All shutdown completions should be called + EXPECT_TRUE(shutdown_latch1.wait_for(5000ms)); + EXPECT_TRUE(shutdown_latch2.wait_for(5000ms)); + EXPECT_TRUE(shutdown_latch3.wait_for(5000ms)); +} + +TEST(ClientTest, ShutdownAfterConnectionClosed) { + // Tests shutdown when connection has already ended naturally + MockSSEServer server; + auto port = server.start( + [](auto const&, auto send_response, auto send_sse_event, auto close) { + http::response res{http::status::ok, 11}; + res.set(http::field::content_type, "text/event-stream"); + res.chunked(true); + send_response(res); + + send_sse_event(SSEFormatter::event("only event")); + std::this_thread::sleep_for(10ms); + close(); // Server closes connection + }); + + IoContextRunner runner; + EventCollector collector; + + auto client = + Builder(runner.context().get_executor(), + "http://localhost:" + std::to_string(port)) + .receiver([&](Event e) { collector.add_event(std::move(e)); }) + .initial_reconnect_delay( + 500ms) // Will try to reconnect after close + .build(); + + client->async_connect(); + ASSERT_TRUE(collector.wait_for_events(1)); + + // Wait for connection to close and reconnect attempt to start + std::this_thread::sleep_for(200ms); + + // Shutdown after natural connection close + SimpleLatch shutdown_latch(1); + client->async_shutdown([&] { shutdown_latch.count_down(); }); + EXPECT_TRUE(shutdown_latch.wait_for(5000ms)); +} + +TEST(ClientTest, ShutdownDuringConnectionAttempt) { + // Server that delays before responding to test shutdown during connection + // phase + SimpleLatch connection_started(1); + + auto handler = [&](auto const&, auto send_response, auto send_sse_event, + auto close) { + connection_started.count_down(); + // Delay before responding + std::this_thread::sleep_for(500ms); + + http::response res{http::status::ok, 11}; + res.set(http::field::content_type, "text/event-stream"); + res.chunked(true); + send_response(res); + + send_sse_event(SSEFormatter::event("test")); + std::this_thread::sleep_for(10ms); + close(); + }; + + MockSSEServer server; + auto port = server.start(handler); + + IoContextRunner runner; + EventCollector collector; + + auto client = + Builder(runner.context().get_executor(), + "http://localhost:" + std::to_string(port)) + .receiver([&](Event e) { collector.add_event(std::move(e)); }) + .build(); + + client->async_connect(); + + // Wait for connection to start but shutdown before it completes + ASSERT_TRUE(connection_started.wait_for(5000ms)); + std::this_thread::sleep_for( + 50ms); // Give CURL time to start but not finish + + auto shutdown_start = std::chrono::steady_clock::now(); + SimpleLatch shutdown_latch(1); + client->async_shutdown([&] { shutdown_latch.count_down(); }); + EXPECT_TRUE(shutdown_latch.wait_for(5000ms)); + auto shutdown_duration = std::chrono::steady_clock::now() - shutdown_start; + + // Shutdown should abort the pending connection quickly + EXPECT_LT(shutdown_duration, 2000ms); + + // Should not have received any events since we shutdown during connection + EXPECT_EQ(0, collector.events().size()); +} + +// on_connect hook tests + +TEST(ClientTest, OnConnectHookInvokedBeforeRequest) { + MockSSEServer server; + auto port = server.start(TestHandlers::simple_event("hello")); + + IoContextRunner runner; + EventCollector collector; + + std::atomic hook_calls{0}; + std::string seen_target; + std::mutex target_mutex; + + auto client = + Builder(runner.context().get_executor(), + "http://localhost:" + std::to_string(port) + "/initial-path") + .receiver([&](Event e) { collector.add_event(std::move(e)); }) + .on_connect([&](http::request* req) { + ++hook_calls; + std::lock_guard lock(target_mutex); + seen_target = std::string(req->target()); + }) + .build(); + + // Connect and let the hook fire. + client->async_connect(); + + // Verify the hook ran and saw the construction-time target. + ASSERT_TRUE(collector.wait_for_events(1)); + EXPECT_GE(hook_calls.load(), 1); + { + std::lock_guard lock(target_mutex); + EXPECT_EQ("/initial-path", seen_target); + } + + SimpleLatch shutdown_latch(1); + client->async_shutdown([&] { shutdown_latch.count_down(); }); + EXPECT_TRUE(shutdown_latch.wait_for(5000ms)); +} + +TEST(ClientTest, OnConnectHookCanMutateTarget) { + MockSSEServer server; + std::string seen_target; + std::mutex target_mutex; + + auto port = server.start([&](auto const& req, auto send_response, + auto send_sse_event, auto close) { + { + std::lock_guard lock(target_mutex); + seen_target = std::string(req.target()); + } + http::response res{http::status::ok, 11}; + res.set(http::field::content_type, "text/event-stream"); + res.chunked(true); + send_response(res); + send_sse_event(SSEFormatter::event("ok")); + std::this_thread::sleep_for(10ms); + close(); + }); + + IoContextRunner runner; + EventCollector collector; + + auto client = + Builder(runner.context().get_executor(), + "http://localhost:" + std::to_string(port) + "/original") + .receiver([&](Event e) { collector.add_event(std::move(e)); }) + .on_connect([](http::request* req) { + req->target("/mutated?param=value"); + }) + .build(); + + // Connect; hook overrides the target before the request is sent. + client->async_connect(); + + // Verify the server received the mutated target, not the original URL's + // path. + ASSERT_TRUE(collector.wait_for_events(1)); + { + std::lock_guard lock(target_mutex); + EXPECT_EQ("/mutated?param=value", seen_target); + } + + SimpleLatch shutdown_latch(1); + client->async_shutdown([&] { shutdown_latch.count_down(); }); + EXPECT_TRUE(shutdown_latch.wait_for(5000ms)); +} + +TEST(ClientTest, OnConnectHookCanMutateHeaders) { + MockSSEServer server; + std::string seen_header; + std::mutex header_mutex; + + auto port = server.start([&](auto const& req, auto send_response, + auto send_sse_event, auto close) { + if (auto it = req.find("X-Hook-Header"); it != req.end()) { + std::lock_guard lock(header_mutex); + seen_header = std::string(it->value()); + } + http::response res{http::status::ok, 11}; + res.set(http::field::content_type, "text/event-stream"); + res.chunked(true); + send_response(res); + send_sse_event(SSEFormatter::event("ok")); + std::this_thread::sleep_for(10ms); + close(); + }); + + IoContextRunner runner; + EventCollector collector; + + auto client = + Builder(runner.context().get_executor(), + "http://localhost:" + std::to_string(port)) + .receiver([&](Event e) { collector.add_event(std::move(e)); }) + .on_connect([](http::request* req) { + req->set("X-Hook-Header", "from-hook"); + }) + .build(); + + // Connect; hook adds a custom header before the request is sent. + client->async_connect(); + + // Verify the server received the hook-injected header. + ASSERT_TRUE(collector.wait_for_events(1)); + { + std::lock_guard lock(header_mutex); + EXPECT_EQ("from-hook", seen_header); + } + + SimpleLatch shutdown_latch(1); + client->async_shutdown([&] { shutdown_latch.count_down(); }); + EXPECT_TRUE(shutdown_latch.wait_for(5000ms)); +} + +TEST(ClientTest, OnConnectHookInvokedOnEachReconnect) { + MockSSEServer server; + auto port = server.start( + [](auto const&, auto send_response, auto send_sse_event, auto close) { + http::response res{http::status::ok, 11}; + res.set(http::field::content_type, "text/event-stream"); + res.chunked(true); + send_response(res); + send_sse_event(SSEFormatter::event("event")); + std::this_thread::sleep_for(10ms); + close(); + }); + + IoContextRunner runner; + EventCollector collector; + + std::atomic hook_calls{0}; + + auto client = + Builder(runner.context().get_executor(), + "http://localhost:" + std::to_string(port)) + .receiver([&](Event e) { collector.add_event(std::move(e)); }) + .initial_reconnect_delay(50ms) + .on_connect( + [&](http::request*) { ++hook_calls; }) + .build(); + + // Connect; the server closes after each event so the client reconnects. + client->async_connect(); + + // Verify the hook fires on every connection attempt, not just the first. + ASSERT_TRUE(collector.wait_for_events(3)); + EXPECT_GE(hook_calls.load(), 3); + + SimpleLatch shutdown_latch(1); + client->async_shutdown([&] { shutdown_latch.count_down(); }); + EXPECT_TRUE(shutdown_latch.wait_for(5000ms)); +} + +TEST(ClientTest, OnConnectHookSeesPreviousMutations) { + MockSSEServer server; + std::vector seen_targets; + std::mutex targets_mutex; + std::condition_variable targets_cv; + + auto port = server.start([&](auto const& req, auto send_response, + auto send_sse_event, auto close) { + { + std::lock_guard lock(targets_mutex); + seen_targets.push_back(std::string(req.target())); + targets_cv.notify_all(); + } + http::response res{http::status::ok, 11}; + res.set(http::field::content_type, "text/event-stream"); + res.chunked(true); + send_response(res); + send_sse_event(SSEFormatter::event("event")); + std::this_thread::sleep_for(10ms); + close(); + }); + + IoContextRunner runner; + EventCollector collector; + + auto client = + Builder(runner.context().get_executor(), + "http://localhost:" + std::to_string(port) + "/start") + .receiver([&](Event e) { collector.add_event(std::move(e)); }) + .initial_reconnect_delay(50ms) + .on_connect([](http::request* req) { + req->target(std::string(req->target()) + "/x"); + }) + .build(); + + // Connect; on each reconnect the hook appends "/x" to whatever target it + // sees. + client->async_connect(); + + // Verify successive hook invocations see their own previous mutations + // rather than a reset to the construction-time target. + { + std::unique_lock lock(targets_mutex); + ASSERT_TRUE(targets_cv.wait_for( + lock, 5000ms, [&] { return seen_targets.size() >= 3; })); + } + + { + std::lock_guard lock(targets_mutex); + EXPECT_EQ("/start/x", seen_targets[0]); + EXPECT_EQ("/start/x/x", seen_targets[1]); + EXPECT_EQ("/start/x/x/x", seen_targets[2]); + } + + SimpleLatch shutdown_latch(1); + client->async_shutdown([&] { shutdown_latch.count_down(); }); + EXPECT_TRUE(shutdown_latch.wait_for(5000ms)); +} + +TEST(ClientTest, OnConnectHookCanMutateBody) { + MockSSEServer server; + std::string received_body; + std::string received_content_length; + std::mutex received_mutex; + + auto port = server.start([&](auto const& req, auto send_response, + auto send_sse_event, auto close) { + { + std::lock_guard lock(received_mutex); + received_body = req.body(); + if (auto it = req.find(http::field::content_length); + it != req.end()) { + received_content_length = std::string(it->value()); + } + } + http::response res{http::status::ok, 11}; + res.set(http::field::content_type, "text/event-stream"); + res.chunked(true); + send_response(res); + send_sse_event(SSEFormatter::event("ok")); + std::this_thread::sleep_for(10ms); + close(); + }); + + IoContextRunner runner; + EventCollector collector; + + std::string const build_time_body = "short"; + std::string const hook_body = + "this-body-is-much-longer-than-the-build-time-body"; + ASSERT_GT(hook_body.size(), build_time_body.size()); + + auto client = + Builder(runner.context().get_executor(), + "http://localhost:" + std::to_string(port)) + .receiver([&](Event e) { collector.add_event(std::move(e)); }) + .method(http::verb::post) + .body(build_time_body) + .on_connect([&](http::request* req) { + req->body() = hook_body; + }) + .build(); + + // Connect; hook replaces the body with one larger than the build-time body. + client->async_connect(); + + // Verify the server received the full hook body and a matching + // Content-Length, not the stale build-time value. + ASSERT_TRUE(collector.wait_for_events(1)); + { + std::lock_guard lock(received_mutex); + EXPECT_EQ(hook_body, received_body) + << "Server received a truncated body. Content-Length header was '" + << received_content_length << "' (hook set body of size " + << hook_body.size() << ")."; + EXPECT_EQ(std::to_string(hook_body.size()), received_content_length); + } + + SimpleLatch shutdown_latch(1); + client->async_shutdown([&] { shutdown_latch.count_down(); }); + EXPECT_TRUE(shutdown_latch.wait_for(5000ms)); +} + +TEST(ClientTest, OnConnectHookLastEventIdIsManagedByClient) { + MockSSEServer server; + std::vector> last_event_id_observations; + std::mutex received_mutex; + std::condition_variable received_cv; + + auto port = server.start([&](auto const& req, auto send_response, + auto send_sse_event, auto close) { + { + std::lock_guard lock(received_mutex); + std::size_t count = req.count("Last-Event-ID"); + auto it = req.find("Last-Event-ID"); + std::string value = + (it != req.end()) ? std::string(it->value()) : ""; + last_event_id_observations.emplace_back(count, value); + received_cv.notify_all(); + } + + http::response res{http::status::ok, 11}; + res.set(http::field::content_type, "text/event-stream"); + res.chunked(true); + send_response(res); + send_sse_event(SSEFormatter::event("data", "", "evt-42")); + std::this_thread::sleep_for(10ms); + close(); + }); + + IoContextRunner runner; + EventCollector collector; + + auto client = + Builder(runner.context().get_executor(), + "http://localhost:" + std::to_string(port)) + .receiver([&](Event e) { collector.add_event(std::move(e)); }) + .initial_reconnect_delay(50ms) + .on_connect([](http::request* req) { + req->set("last-event-id", "from-hook"); + }) + .build(); + + // Connect; the server closes after each event so the client reconnects. + client->async_connect(); + + // Verify the documented Last-Event-ID contract: the client manages this + // header and overrides any value set by the hook. On first connect no + // event ID has been seen, so the header should be absent. On reconnect + // the client should send exactly one Last-Event-ID with the most recent + // event's ID, not the hook's value. + { + std::unique_lock lock(received_mutex); + ASSERT_TRUE(received_cv.wait_for(lock, 5000ms, [&] { + return last_event_id_observations.size() >= 2; + })); + } + + { + std::lock_guard lock(received_mutex); + EXPECT_EQ(0u, last_event_id_observations[0].first) + << "first connect should send no Last-Event-ID; got '" + << last_event_id_observations[0].second << "'"; + EXPECT_EQ(1u, last_event_id_observations[1].first) + << "reconnect should send exactly one Last-Event-ID header"; + EXPECT_EQ("evt-42", last_event_id_observations[1].second); + } + + SimpleLatch shutdown_latch(1); + client->async_shutdown([&] { shutdown_latch.count_down(); }); + EXPECT_TRUE(shutdown_latch.wait_for(5000ms)); +} diff --git a/libs/server-sent-events/tests/curl_client_test.cpp b/libs/server-sent-events/tests/curl_client_test.cpp deleted file mode 100644 index 5ac9476c7..000000000 --- a/libs/server-sent-events/tests/curl_client_test.cpp +++ /dev/null @@ -1,1051 +0,0 @@ -#ifdef LD_CURL_NETWORKING - -#include -#include - -#include - -#include "mock_sse_server.hpp" - -#include - -#include -#include -#include -#include -#include - -using namespace launchdarkly::sse; -using namespace launchdarkly::sse::test; -using namespace std::chrono_literals; - -namespace { - -// C++17-compatible latch replacement -// https://en.cppreference.com/w/cpp/thread/latch.html -class SimpleLatch { -public: - explicit SimpleLatch(const std::size_t count) : count_(count) {} - - void count_down() { - std::lock_guard lock(mutex_); - if (count_ > 0) { - --count_; - } - cv_.notify_all(); - } - - template - bool wait_for(std::chrono::duration timeout) { - std::unique_lock lock(mutex_); - return cv_.wait_for(lock, timeout, [this] { return count_ == 0; }); - } - -private: - std::mutex mutex_; - std::condition_variable cv_; - std::size_t count_; -}; - -// Helper to synchronize event reception in tests -class EventCollector { -public: - void add_event(Event event) { - std::lock_guard lock(mutex_); - events_.push_back(std::move(event)); - cv_.notify_all(); - } - - void add_error(Error error) { - std::lock_guard lock(mutex_); - errors_.push_back(std::move(error)); - cv_.notify_all(); - } - - bool wait_for_events(size_t count, std::chrono::milliseconds timeout = 5000ms) { - std::unique_lock lock(mutex_); - return cv_.wait_for(lock, timeout, [&] { return events_.size() >= count; }); - } - - bool wait_for_errors(size_t count, std::chrono::milliseconds timeout = 5000ms) { - std::unique_lock lock(mutex_); - return cv_.wait_for(lock, timeout, [&] { return errors_.size() >= count; }); - } - - std::vector events() const { - std::lock_guard lock(mutex_); - return events_; - } - - std::vector errors() const { - std::lock_guard lock(mutex_); - return errors_; - } - -private: - mutable std::mutex mutex_; - std::condition_variable cv_; - std::vector events_; - std::vector errors_; -}; - -// Helper to run io_context in background thread -class IoContextRunner { -public: - IoContextRunner() : work_guard_(boost::asio::make_work_guard(ioc_)) { - thread_ = std::thread([this] { ioc_.run(); }); - } - - ~IoContextRunner() { - work_guard_.reset(); - ioc_.stop(); - if (thread_.joinable()) { - thread_.join(); - } - } - - boost::asio::io_context& context() { return ioc_; } - -private: - boost::asio::io_context ioc_; - boost::asio::executor_work_guard work_guard_; - std::thread thread_; -}; - -} // namespace - -// Basic connectivity tests - -TEST(CurlClientTest, ConnectsToHttpServer) { - MockSSEServer server; - auto port = server.start(TestHandlers::simple_event("hello world")); - - // Give server a moment to start accepting connections - std::this_thread::sleep_for(100ms); - - IoContextRunner runner; - EventCollector collector; - - auto client = Builder(runner.context().get_executor(), "http://localhost:" + std::to_string(port)) - .receiver([&](Event e) { collector.add_event(std::move(e)); }) - .build(); - - client->async_connect(); - - ASSERT_TRUE(collector.wait_for_events(1)); - auto events = collector.events(); - ASSERT_EQ(1, events.size()); - EXPECT_EQ("hello world", events[0].data()); - - SimpleLatch shutdown_latch(1); - client->async_shutdown([&] { shutdown_latch.count_down(); }); - EXPECT_TRUE(shutdown_latch.wait_for(5000ms)); -} - -TEST(CurlClientTest, HandlesMultipleEvents) { - MockSSEServer server; - auto port = server.start(TestHandlers::multiple_events({"event1", "event2", "event3"})); - - IoContextRunner runner; - EventCollector collector; - - auto client = Builder(runner.context().get_executor(), "http://localhost:" + std::to_string(port)) - .receiver([&](Event e) { collector.add_event(std::move(e)); }) - .build(); - - client->async_connect(); - - ASSERT_TRUE(collector.wait_for_events(3)); - auto events = collector.events(); - ASSERT_EQ(3, events.size()); - EXPECT_EQ("event1", events[0].data()); - EXPECT_EQ("event2", events[1].data()); - EXPECT_EQ("event3", events[2].data()); - - SimpleLatch shutdown_latch(1); - client->async_shutdown([&] { shutdown_latch.count_down(); }); - EXPECT_TRUE(shutdown_latch.wait_for(5000ms)); -} - -// SSE parsing tests - -TEST(CurlClientTest, ParsesEventWithType) { - MockSSEServer server; - auto port = server.start([](auto const&, auto send_response, auto send_sse_event, auto close) { - http::response res{http::status::ok, 11}; - res.set(http::field::content_type, "text/event-stream"); - res.chunked(true); - send_response(res); - - send_sse_event(SSEFormatter::event("test data", "custom-type")); - std::this_thread::sleep_for(10ms); - close(); - }); - - IoContextRunner runner; - EventCollector collector; - - auto client = Builder(runner.context().get_executor(), "http://localhost:" + std::to_string(port)) - .receiver([&](Event e) { collector.add_event(std::move(e)); }) - .build(); - - client->async_connect(); - - ASSERT_TRUE(collector.wait_for_events(1)); - auto events = collector.events(); - ASSERT_EQ(1, events.size()); - EXPECT_EQ("test data", events[0].data()); - EXPECT_EQ("custom-type", events[0].type()); - - SimpleLatch shutdown_latch(1); - client->async_shutdown([&] { shutdown_latch.count_down(); }); - EXPECT_TRUE(shutdown_latch.wait_for(5000ms)); -} - -TEST(CurlClientTest, ParsesEventWithId) { - MockSSEServer server; - auto port = server.start([](auto const&, auto send_response, auto send_sse_event, auto close) { - http::response res{http::status::ok, 11}; - res.set(http::field::content_type, "text/event-stream"); - res.chunked(true); - send_response(res); - - send_sse_event(SSEFormatter::event("test data", "", "event-123")); - std::this_thread::sleep_for(10ms); - close(); - }); - - IoContextRunner runner; - EventCollector collector; - - auto client = Builder(runner.context().get_executor(), "http://localhost:" + std::to_string(port)) - .receiver([&](Event e) { collector.add_event(std::move(e)); }) - .build(); - - client->async_connect(); - - ASSERT_TRUE(collector.wait_for_events(1)); - auto events = collector.events(); - ASSERT_EQ(1, events.size()); - EXPECT_EQ("test data", events[0].data()); - EXPECT_EQ("event-123", events[0].id()); - - SimpleLatch shutdown_latch(1); - client->async_shutdown([&] { shutdown_latch.count_down(); }); - EXPECT_TRUE(shutdown_latch.wait_for(5000ms)); -} - -TEST(CurlClientTest, ParsesMultiLineData) { - MockSSEServer server; - auto port = server.start([](auto const&, auto send_response, auto send_sse_event, auto close) { - http::response res{http::status::ok, 11}; - res.set(http::field::content_type, "text/event-stream"); - res.chunked(true); - send_response(res); - - send_sse_event(SSEFormatter::event("line1\nline2\nline3")); - std::this_thread::sleep_for(10ms); - close(); - }); - - IoContextRunner runner; - EventCollector collector; - - auto client = Builder(runner.context().get_executor(), "http://localhost:" + std::to_string(port)) - .receiver([&](Event e) { collector.add_event(std::move(e)); }) - .build(); - - client->async_connect(); - - ASSERT_TRUE(collector.wait_for_events(1)); - auto events = collector.events(); - ASSERT_EQ(1, events.size()); - EXPECT_EQ("line1\nline2\nline3", events[0].data()); - - SimpleLatch shutdown_latch(1); - client->async_shutdown([&] { shutdown_latch.count_down(); }); - EXPECT_TRUE(shutdown_latch.wait_for(5000ms)); -} - -TEST(CurlClientTest, HandlesComments) { - GTEST_SKIP() << "Comment filtering is not yet implemented in the SSE parser"; - - MockSSEServer server; - auto port = server.start([](auto const&, auto send_response, auto send_sse_event, auto close) { - http::response res{http::status::ok, 11}; - res.set(http::field::content_type, "text/event-stream"); - res.chunked(true); - send_response(res); - - // Send a comment (should be ignored) - send_sse_event(SSEFormatter::comment("this is a comment")); - // Send an actual event - send_sse_event(SSEFormatter::event("real data")); - std::this_thread::sleep_for(10ms); - close(); - }); - - IoContextRunner runner; - EventCollector collector; - - auto client = Builder(runner.context().get_executor(), "http://localhost:" + std::to_string(port)) - .receiver([&](Event e) { collector.add_event(std::move(e)); }) - .build(); - - client->async_connect(); - - ASSERT_TRUE(collector.wait_for_events(1)); - auto events = collector.events(); - // Should only receive the real event, not the comment - ASSERT_EQ(1, events.size()); - EXPECT_EQ("real data", events[0].data()); - - SimpleLatch shutdown_latch(1); - client->async_shutdown([&] { shutdown_latch.count_down(); }); - EXPECT_TRUE(shutdown_latch.wait_for(5000ms)); -} - -// HTTP method tests - -TEST(CurlClientTest, SupportsPostMethod) { - MockSSEServer server; - std::string received_method; - - auto port = server.start([&](auto const& req, auto send_response, auto send_sse_event, auto close) { - received_method = std::string(req.method_string()); - - http::response res{http::status::ok, 11}; - res.set(http::field::content_type, "text/event-stream"); - res.chunked(true); - send_response(res); - - send_sse_event(SSEFormatter::event("response")); - std::this_thread::sleep_for(10ms); - close(); - }); - - IoContextRunner runner; - EventCollector collector; - - auto client = Builder(runner.context().get_executor(), "http://localhost:" + std::to_string(port)) - .receiver([&](Event e) { collector.add_event(std::move(e)); }) - .method(http::verb::post) - .body("test body") - .build(); - - client->async_connect(); - - ASSERT_TRUE(collector.wait_for_events(1)); - EXPECT_EQ("POST", received_method); - - SimpleLatch shutdown_latch(1); - client->async_shutdown([&] { shutdown_latch.count_down(); }); - EXPECT_TRUE(shutdown_latch.wait_for(5000ms)); -} - -TEST(CurlClientTest, SupportsReportMethod) { - MockSSEServer server; - std::string received_method; - - auto port = server.start([&](auto const& req, auto send_response, auto send_sse_event, auto close) { - received_method = std::string(req.method_string()); - - http::response res{http::status::ok, 11}; - res.set(http::field::content_type, "text/event-stream"); - res.chunked(true); - send_response(res); - - send_sse_event(SSEFormatter::event("response")); - std::this_thread::sleep_for(10ms); - close(); - }); - - IoContextRunner runner; - EventCollector collector; - - auto client = Builder(runner.context().get_executor(), "http://localhost:" + std::to_string(port)) - .receiver([&](Event e) { collector.add_event(std::move(e)); }) - .method(http::verb::report) - .body("test body") - .build(); - - client->async_connect(); - - ASSERT_TRUE(collector.wait_for_events(1)); - EXPECT_EQ("REPORT", received_method); - - SimpleLatch shutdown_latch(1); - client->async_shutdown([&] { shutdown_latch.count_down(); }); - EXPECT_TRUE(shutdown_latch.wait_for(5000ms)); -} - -// HTTP header tests - -TEST(CurlClientTest, SendsCustomHeaders) { - MockSSEServer server; - std::string custom_header_value; - - auto port = server.start([&](auto const& req, auto send_response, auto send_sse_event, auto close) { - auto it = req.find("X-Custom-Header"); - if (it != req.end()) { - custom_header_value = std::string(it->value()); - } - - http::response res{http::status::ok, 11}; - res.set(http::field::content_type, "text/event-stream"); - res.chunked(true); - send_response(res); - - send_sse_event(SSEFormatter::event("response")); - std::this_thread::sleep_for(10ms); - close(); - }); - - IoContextRunner runner; - EventCollector collector; - - auto client = Builder(runner.context().get_executor(), "http://localhost:" + std::to_string(port)) - .receiver([&](Event e) { collector.add_event(std::move(e)); }) - .header("X-Custom-Header", "custom-value") - .build(); - - client->async_connect(); - - ASSERT_TRUE(collector.wait_for_events(1)); - EXPECT_EQ("custom-value", custom_header_value); - - SimpleLatch shutdown_latch(1); - client->async_shutdown([&] { shutdown_latch.count_down(); }); - EXPECT_TRUE(shutdown_latch.wait_for(5000ms)); -} - -// HTTP status code tests - -TEST(CurlClientTest, Handles404Error) { - MockSSEServer server; - auto port = server.start(TestHandlers::http_error(http::status::not_found)); - - IoContextRunner runner; - EventCollector collector; - - auto client = Builder(runner.context().get_executor(), "http://localhost:" + std::to_string(port)) - .receiver([&](Event e) { collector.add_event(std::move(e)); }) - .errors([&](Error e) { collector.add_error(std::move(e)); }) - .build(); - - client->async_connect(); - - ASSERT_TRUE(collector.wait_for_errors(1)); - auto errors = collector.errors(); - ASSERT_GE(errors.size(), 1); - - SimpleLatch shutdown_latch(1); - client->async_shutdown([&] { shutdown_latch.count_down(); }); - EXPECT_TRUE(shutdown_latch.wait_for(5000ms)); -} - -TEST(CurlClientTest, Handles500Error) { - // 500 errors are treated as transient server errors and should trigger - // backoff/retry behavior, not error callbacks. This is correct SSE client behavior. - std::atomic connection_attempts{0}; - - auto handler = [&](auto const&, auto send_response, auto, auto) { - ++connection_attempts; - http::response res{http::status::internal_server_error, 11}; - res.body() = "Error"; - res.prepare_payload(); - send_response(res); - }; - - MockSSEServer server; - auto port = server.start(handler); - - IoContextRunner runner; - EventCollector collector; - - auto client = Builder(runner.context().get_executor(), "http://localhost:" + std::to_string(port)) - .receiver([&](Event e) { collector.add_event(std::move(e)); }) - .errors([&](Error e) { collector.add_error(std::move(e)); }) - .initial_reconnect_delay(50ms) // Short delay for test - .build(); - - client->async_connect(); - - // Should NOT receive error callbacks - should retry instead - // Wait a bit to let multiple reconnection attempts happen - std::this_thread::sleep_for(300ms); - - // Verify that multiple reconnection attempts occurred (backoff/retry behavior) - EXPECT_GE(connection_attempts.load(), 2); - - // Verify no error callbacks were invoked (5xx are not reported as errors) - EXPECT_EQ(0, collector.errors().size()); - - SimpleLatch shutdown_latch(1); - client->async_shutdown([&] { shutdown_latch.count_down(); }); - EXPECT_TRUE(shutdown_latch.wait_for(5000ms)); -} - -// Redirect tests - -TEST(CurlClientTest, FollowsRedirects) { - MockSSEServer redirect_server; - MockSSEServer target_server; - - auto target_port = target_server.start(TestHandlers::simple_event("redirected")); - auto redirect_port = redirect_server.start( - TestHandlers::redirect("http://localhost:" + std::to_string(target_port) + "/") - ); - - IoContextRunner runner; - EventCollector collector; - - auto client = Builder(runner.context().get_executor(), "http://localhost:" + std::to_string(redirect_port)) - .receiver([&](Event e) { collector.add_event(std::move(e)); }) - .build(); - - client->async_connect(); - - ASSERT_TRUE(collector.wait_for_events(1)); - auto events = collector.events(); - ASSERT_EQ(1, events.size()); - EXPECT_EQ("redirected", events[0].data()); - - SimpleLatch shutdown_latch(1); - client->async_shutdown([&] { shutdown_latch.count_down(); }); - EXPECT_TRUE(shutdown_latch.wait_for(5000ms)); -} - -// Connection lifecycle tests - -TEST(CurlClientTest, ShutdownStopsClient) { - MockSSEServer server; - auto port = server.start([](auto const&, auto send_response, auto send_sse_event, auto) { - http::response res{http::status::ok, 11}; - res.set(http::field::content_type, "text/event-stream"); - res.chunked(true); - send_response(res); - - // Keep sending events forever (until connection closes) - for (int i = 0; i < 1000; i++) { - send_sse_event(SSEFormatter::event("event " + std::to_string(i))); - std::this_thread::sleep_for(10ms); - } - }); - - IoContextRunner runner; - EventCollector collector; - - auto client = Builder(runner.context().get_executor(), "http://localhost:" + std::to_string(port)) - .receiver([&](Event e) { collector.add_event(std::move(e)); }) - .build(); - - client->async_connect(); - - // Wait for at least one event - ASSERT_TRUE(collector.wait_for_events(1)); - - // Shutdown should complete quickly - auto shutdown_start = std::chrono::steady_clock::now(); - SimpleLatch shutdown_latch(1); - client->async_shutdown([&] { shutdown_latch.count_down(); }); - EXPECT_TRUE(shutdown_latch.wait_for(5000ms)); - auto shutdown_duration = std::chrono::steady_clock::now() - shutdown_start; - - // Shutdown should complete in reasonable time (less than 2 seconds) - EXPECT_LT(shutdown_duration, 2000ms); -} - -TEST(CurlClientTest, CanShutdownBeforeConnection) { - MockSSEServer server; - auto port = server.start(TestHandlers::simple_event("test")); - - IoContextRunner runner; - EventCollector collector; - - auto client = Builder(runner.context().get_executor(), "http://localhost:" + std::to_string(port)) - .receiver([&](Event e) { collector.add_event(std::move(e)); }) - .build(); - - // Shutdown immediately without connecting - SimpleLatch shutdown_latch(1); - client->async_shutdown([&] { shutdown_latch.count_down(); }); - EXPECT_TRUE(shutdown_latch.wait_for(5000ms)); -} - -TEST(CurlClientTest, HandlesImmediateClose) { - // Immediate connection close is treated as a transient network error and should trigger - // backoff/retry behavior, not error callbacks. This is correct SSE client behavior. - std::atomic connection_attempts{0}; - - auto handler = [&](auto const&, auto, auto, auto close) { - ++connection_attempts; - close(); // Immediately close without sending headers - }; - - MockSSEServer server; - auto port = server.start(handler); - - IoContextRunner runner; - EventCollector collector; - - auto client = Builder(runner.context().get_executor(), "http://localhost:" + std::to_string(port)) - .receiver([&](Event e) { collector.add_event(std::move(e)); }) - .errors([&](Error e) { collector.add_error(std::move(e)); }) - .initial_reconnect_delay(50ms) // Short delay for test - .build(); - - client->async_connect(); - - // Should NOT receive error callbacks - should retry instead - // Wait a bit to let multiple reconnection attempts happen - std::this_thread::sleep_for(300ms); - - // Verify that multiple reconnection attempts occurred (backoff/retry behavior) - EXPECT_GE(connection_attempts.load(), 2); - - // Verify no error callbacks were invoked (connection errors trigger retry) - EXPECT_EQ(0, collector.errors().size()); - - SimpleLatch shutdown_latch(1); - client->async_shutdown([&] { shutdown_latch.count_down(); }); - EXPECT_TRUE(shutdown_latch.wait_for(5000ms)); -} - -// Timeout tests - -TEST(CurlClientTest, RespectsReadTimeout) { - MockSSEServer server; - auto port = server.start([](auto const&, auto send_response, auto send_sse_event, auto) { - http::response res{http::status::ok, 11}; - res.set(http::field::content_type, "text/event-stream"); - res.chunked(true); - send_response(res); - - // Send one event - send_sse_event(SSEFormatter::event("first")); - - // Then wait longer than read timeout without sending anything - std::this_thread::sleep_for(5000ms); - }); - - IoContextRunner runner; - EventCollector collector; - - auto client = Builder(runner.context().get_executor(), "http://localhost:" + std::to_string(port)) - .receiver([&](Event e) { collector.add_event(std::move(e)); }) - .errors([&](Error e) { - std::cerr << "Error" << e.index() << std::endl; - collector.add_error(std::move(e)); - }) - .logger([&](const std::string& message) { - std::cerr << "log_message" << message << std::endl; - }) - .read_timeout(500ms) // Short timeout for test - .initial_reconnect_delay(50ms) - .build(); - - client->async_connect(); - - // Should receive the first event - ASSERT_TRUE(collector.wait_for_events(1, 100ms)); - - // Then should get a timeout error - ASSERT_TRUE(collector.wait_for_errors(1, 1000ms)); - - SimpleLatch shutdown_latch(1); - client->async_shutdown([&] { shutdown_latch.count_down(); }); - EXPECT_TRUE(shutdown_latch.wait_for(100ms)); -} - -TEST(CurlClientTest, DestructorCleansUpProperly) { - { - MockSSEServer server; - auto port = server.start([](auto const&, auto send_response, auto send_sse_event, auto) { - http::response res{http::status::ok, 11}; - res.set(http::field::content_type, "text/event-stream"); - res.chunked(true); - send_response(res); - - // Keep sending events - for (int i = 0; i < 100; i++) { - send_sse_event(SSEFormatter::event("event " + std::to_string(i))); - std::this_thread::sleep_for(10ms); - } - }); - EventCollector collector; - IoContextRunner runner; - auto client = Builder(runner.context().get_executor(), "http://localhost:" + std::to_string(port)) - .receiver([&](Event e) { collector.add_event(std::move(e)); }) - .build(); - - client->async_connect(); - ASSERT_TRUE(collector.wait_for_events(1)); - - // Let destructor run without explicit shutdown - } - - // If destructor doesn't properly clean up, this could hang or crash - // Test passing indicates proper cleanup in destructor -} - -TEST(CurlClientTest, HandlesEmptyEventData) { - MockSSEServer server; - auto port = server.start([](auto const&, auto send_response, auto send_sse_event, auto close) { - http::response res{http::status::ok, 11}; - res.set(http::field::content_type, "text/event-stream"); - res.chunked(true); - send_response(res); - - send_sse_event(SSEFormatter::event("")); - std::this_thread::sleep_for(10ms); - close(); - }); - - IoContextRunner runner; - EventCollector collector; - - auto client = Builder(runner.context().get_executor(), "http://localhost:" + std::to_string(port)) - .receiver([&](Event e) { collector.add_event(std::move(e)); }) - .build(); - - client->async_connect(); - - ASSERT_TRUE(collector.wait_for_events(1)); - auto events = collector.events(); - ASSERT_EQ(1, events.size()); - EXPECT_EQ("", events[0].data()); - - SimpleLatch shutdown_latch(1); - client->async_shutdown([&] { shutdown_latch.count_down(); }); - EXPECT_TRUE(shutdown_latch.wait_for(5000ms)); -} - -TEST(CurlClientTest, HandlesEventWithOnlyType) { - MockSSEServer server; - auto port = server.start([](auto const&, auto send_response, auto send_sse_event, auto close) { - http::response res{http::status::ok, 11}; - res.set(http::field::content_type, "text/event-stream"); - res.chunked(true); - send_response(res); - - // Send event with type but empty data - send_sse_event("event: heartbeat\ndata: \n\n"); - std::this_thread::sleep_for(10ms); - close(); - }); - - IoContextRunner runner; - EventCollector collector; - - auto client = Builder(runner.context().get_executor(), "http://localhost:" + std::to_string(port)) - .receiver([&](Event e) { collector.add_event(std::move(e)); }) - .build(); - - client->async_connect(); - - ASSERT_TRUE(collector.wait_for_events(1)); - auto events = collector.events(); - ASSERT_EQ(1, events.size()); - EXPECT_EQ("heartbeat", events[0].type()); - EXPECT_EQ("", events[0].data()); - - SimpleLatch shutdown_latch(1); - client->async_shutdown([&] { shutdown_latch.count_down(); }); - EXPECT_TRUE(shutdown_latch.wait_for(5000ms)); -} - -TEST(CurlClientTest, HandlesRapidEvents) { - MockSSEServer server; - constexpr int num_events = 100; - - // num_events needs to be captured for MSVC. - auto port = server.start([num_events](auto const&, auto send_response, auto send_sse_event, auto close) { - http::response res{http::status::ok, 11}; - res.set(http::field::content_type, "text/event-stream"); - res.chunked(true); - send_response(res); - - // Send many events rapidly - for (int i = 0; i < num_events; i++) { - send_sse_event(SSEFormatter::event("event" + std::to_string(i))); - } - std::this_thread::sleep_for(10ms); - close(); - }); - - IoContextRunner runner; - EventCollector collector; - - auto client = Builder(runner.context().get_executor(), "http://localhost:" + std::to_string(port)) - .receiver([&](Event e) { collector.add_event(std::move(e)); }) - .build(); - - client->async_connect(); - - ASSERT_TRUE(collector.wait_for_events(num_events, 10000ms)); - auto events = collector.events(); - EXPECT_EQ(num_events, events.size()); - - SimpleLatch shutdown_latch(1); - client->async_shutdown([&] { shutdown_latch.count_down(); }); - EXPECT_TRUE(shutdown_latch.wait_for(5000ms)); -} - -TEST(CurlClientTest, ShutdownDuringBackoffDelay) { - // This ensures clean shutdown during backoff/retry wait period - std::atomic connection_attempts{0}; - - auto handler = [&](auto const&, auto send_response, auto, auto) { - ++connection_attempts; - // Return 500 to trigger backoff - http::response res{http::status::internal_server_error, 11}; - res.body() = "Error"; - res.prepare_payload(); - send_response(res); - }; - - MockSSEServer server; - auto port = server.start(handler); - - IoContextRunner runner; - EventCollector collector; - - auto client = Builder(runner.context().get_executor(), "http://localhost:" + std::to_string(port)) - .receiver([&](Event e) { collector.add_event(std::move(e)); }) - .initial_reconnect_delay(2000ms) // Long delay to ensure we shutdown during wait - .build(); - - client->async_connect(); - - // Wait for first connection attempt to complete - std::this_thread::sleep_for(200ms); - EXPECT_GE(connection_attempts.load(), 1); - - // Now shutdown while it's waiting in backoff - auto shutdown_start = std::chrono::steady_clock::now(); - SimpleLatch shutdown_latch(1); - client->async_shutdown([&] { shutdown_latch.count_down(); }); - EXPECT_TRUE(shutdown_latch.wait_for(5000ms)); - auto shutdown_duration = std::chrono::steady_clock::now() - shutdown_start; - - // Shutdown should complete quickly despite long backoff delay - EXPECT_LT(shutdown_duration, 1000ms); - - // Should NOT have made another connection attempt during backoff - EXPECT_EQ(1, connection_attempts.load()); -} - -TEST(CurlClientTest, ShutdownDuringDataReception) { - // This covers the branch where we abort during SSE data parsing - SimpleLatch server_sending(1); - SimpleLatch client_received_some(1); - - auto handler = [&](auto const&, auto send_response, auto send_sse_event, auto) { - http::response res{http::status::ok, 11}; - res.set(http::field::content_type, "text/event-stream"); - res.chunked(true); - send_response(res); - - // Send events continuously - for (int i = 0; i < 100; i++) { - if (!send_sse_event(SSEFormatter::event("event " + std::to_string(i)))) { - return; // Connection closed or error - stop sending - } - if (i == 2) { - server_sending.count_down(); - } - std::this_thread::sleep_for(10ms); // Slow enough to allow shutdown mid-stream - } - }; - - MockSSEServer server; - auto port = server.start(handler); - - IoContextRunner runner; - // Shared ptr to prevent handling events during destruction. - auto collector = std::make_shared(); - - auto client = Builder(runner.context().get_executor(), "http://localhost:" + std::to_string(port)) - .receiver([collector, &client_received_some](Event e) { - collector->add_event(std::move(e)); - if (collector->events().size() >= 2) { - client_received_some.count_down(); - } - }) - .build(); - - client->async_connect(); - - // Wait until server is sending and client has received some events - ASSERT_TRUE(server_sending.wait_for(5000ms)); - ASSERT_TRUE(client_received_some.wait_for(5000ms)); - - // Shutdown while WriteCallback is actively processing data - auto shutdown_start = std::chrono::steady_clock::now(); - SimpleLatch shutdown_latch(1); - client->async_shutdown([&] { shutdown_latch.count_down(); }); - EXPECT_TRUE(shutdown_latch.wait_for(5000ms)); - auto shutdown_duration = std::chrono::steady_clock::now() - shutdown_start; - - // Shutdown should complete quickly even during active data transfer - EXPECT_LT(shutdown_duration, 2000ms); -} - -TEST(CurlClientTest, ShutdownDuringProgressCallback) { - // This ensures we can abort during slow data transfer - SimpleLatch server_started(1); - - auto handler = [&](auto const&, auto send_response, auto send_sse_event, auto) { - http::response res{http::status::ok, 11}; - res.set(http::field::content_type, "text/event-stream"); - res.chunked(true); - send_response(res); - - server_started.count_down(); - - // Send one event then pause (simulating slow connection) - send_sse_event(SSEFormatter::event("first")); - std::this_thread::sleep_for(5000ms); // Pause to simulate slow connection - }; - - MockSSEServer server; - auto port = server.start(handler); - - IoContextRunner runner; - EventCollector collector; - - auto client = Builder(runner.context().get_executor(), "http://localhost:" + std::to_string(port)) - .receiver([&](Event e) { collector.add_event(std::move(e)); }) - .read_timeout(10000ms) // Long timeout so ProgressCallback is called but doesn't abort - .build(); - - client->async_connect(); - - // Wait for first event and server pause - ASSERT_TRUE(server_started.wait_for(5000ms)); - ASSERT_TRUE(collector.wait_for_events(1, 5000ms)); - - // Shutdown while ProgressCallback is being invoked during the pause - auto shutdown_start = std::chrono::steady_clock::now(); - SimpleLatch shutdown_latch(1); - client->async_shutdown([&] { shutdown_latch.count_down(); }); - EXPECT_TRUE(shutdown_latch.wait_for(5000ms)); - auto shutdown_duration = std::chrono::steady_clock::now() - shutdown_start; - - // Shutdown should abort the transfer quickly - EXPECT_LT(shutdown_duration, 2000ms); -} - -TEST(CurlClientTest, MultipleShutdownCalls) { - // Ensures multiple shutdown calls don't cause issues (idempotency test) - MockSSEServer server; - auto port = server.start(TestHandlers::simple_event("test")); - - IoContextRunner runner; - EventCollector collector; - - auto client = Builder(runner.context().get_executor(), "http://localhost:" + std::to_string(port)) - .receiver([&](Event e) { collector.add_event(std::move(e)); }) - .build(); - - client->async_connect(); - ASSERT_TRUE(collector.wait_for_events(1)); - - // Call shutdown multiple times in rapid succession - SimpleLatch shutdown_latch1(1); - SimpleLatch shutdown_latch2(1); - SimpleLatch shutdown_latch3(1); - - client->async_shutdown([&] { shutdown_latch1.count_down(); }); - client->async_shutdown([&] { shutdown_latch2.count_down(); }); - client->async_shutdown([&] { shutdown_latch3.count_down(); }); - - // All shutdown completions should be called - EXPECT_TRUE(shutdown_latch1.wait_for(5000ms)); - EXPECT_TRUE(shutdown_latch2.wait_for(5000ms)); - EXPECT_TRUE(shutdown_latch3.wait_for(5000ms)); -} - -TEST(CurlClientTest, ShutdownAfterConnectionClosed) { - // Tests shutdown when connection has already ended naturally - MockSSEServer server; - auto port = server.start([](auto const&, auto send_response, auto send_sse_event, auto close) { - http::response res{http::status::ok, 11}; - res.set(http::field::content_type, "text/event-stream"); - res.chunked(true); - send_response(res); - - send_sse_event(SSEFormatter::event("only event")); - std::this_thread::sleep_for(10ms); - close(); // Server closes connection - }); - - IoContextRunner runner; - EventCollector collector; - - auto client = Builder(runner.context().get_executor(), "http://localhost:" + std::to_string(port)) - .receiver([&](Event e) { collector.add_event(std::move(e)); }) - .initial_reconnect_delay(500ms) // Will try to reconnect after close - .build(); - - client->async_connect(); - ASSERT_TRUE(collector.wait_for_events(1)); - - // Wait for connection to close and reconnect attempt to start - std::this_thread::sleep_for(200ms); - - // Shutdown after natural connection close - SimpleLatch shutdown_latch(1); - client->async_shutdown([&] { shutdown_latch.count_down(); }); - EXPECT_TRUE(shutdown_latch.wait_for(5000ms)); -} - -TEST(CurlClientTest, ShutdownDuringConnectionAttempt) { - // Server that delays before responding to test shutdown during connection phase - SimpleLatch connection_started(1); - - auto handler = [&](auto const&, auto send_response, auto send_sse_event, auto close) { - connection_started.count_down(); - // Delay before responding - std::this_thread::sleep_for(500ms); - - http::response res{http::status::ok, 11}; - res.set(http::field::content_type, "text/event-stream"); - res.chunked(true); - send_response(res); - - send_sse_event(SSEFormatter::event("test")); - std::this_thread::sleep_for(10ms); - close(); - }; - - MockSSEServer server; - auto port = server.start(handler); - - IoContextRunner runner; - EventCollector collector; - - auto client = Builder(runner.context().get_executor(), "http://localhost:" + std::to_string(port)) - .receiver([&](Event e) { collector.add_event(std::move(e)); }) - .build(); - - client->async_connect(); - - // Wait for connection to start but shutdown before it completes - ASSERT_TRUE(connection_started.wait_for(5000ms)); - std::this_thread::sleep_for(50ms); // Give CURL time to start but not finish - - auto shutdown_start = std::chrono::steady_clock::now(); - SimpleLatch shutdown_latch(1); - client->async_shutdown([&] { shutdown_latch.count_down(); }); - EXPECT_TRUE(shutdown_latch.wait_for(5000ms)); - auto shutdown_duration = std::chrono::steady_clock::now() - shutdown_start; - - // Shutdown should abort the pending connection quickly - EXPECT_LT(shutdown_duration, 2000ms); - - // Should not have received any events since we shutdown during connection - EXPECT_EQ(0, collector.events().size()); -} -#endif // LD_CURL_NETWORKING