Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
37 changes: 31 additions & 6 deletions libs/server-sent-events/include/launchdarkly/sse/client.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,8 @@ class Builder {
using EventReceiver = std::function<void(Event)>;
using LogCallback = std::function<void(std::string)>;
using ErrorCallback = std::function<void(Error)>;
using ConnectionHook =
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Of course in newer C++ we could make this noexcept without a bunch of hoopla, but not in C++17.

So I am going to suggest we potentially have a try-catch around the connection_hook invocations?

Or we just use strong wording on the signature.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't follow. We don't have a try-catch around LogCallback or ErrorCallback. Are you worried about us accidentally throwing from inside the hook?

What would you wanna do if we did? Mark the connection as immediately failed and generate an error?

std::function<void(http::request<http::string_body>*)>;

/**
* Create a builder for the given URL. If the port is omitted, 443 is
Expand Down Expand Up @@ -161,18 +163,39 @@ class Builder {
* - HTTP proxies: "http://proxy:port"
* - HTTPS proxies: "https://proxy:port"
* - SOCKS4 proxies: "socks4://proxy:port"
* - SOCKS5 proxies: "socks5://proxy:port" or "socks5://user:pass@proxy:port"
* - SOCKS5 proxies: "socks5://proxy:port" or
* "socks5://user:pass@proxy:port"
* - SOCKS5 with DNS through proxy: "socks5h://proxy:port"
*
* Passing an empty string explicitly disables proxy (overrides environment variables).
* Passing std::nullopt (or not calling this method) uses environment variables.
* Passing an empty string explicitly disables proxy (overrides environment
* variables). Passing std::nullopt (or not calling this method) uses
* environment variables.
*
* @param url Proxy URL, empty string to disable, or std::nullopt for environment variables
* @param url Proxy URL, empty string to disable, or std::nullopt for
* environment variables
* @return Reference to this builder.
* @throws std::runtime_error if proxy is configured without CURL networking support
* @throws std::runtime_error if proxy is configured without CURL networking
* support
*/
Builder& proxy(std::optional<std::string> url);

/**
* Register a hook invoked immediately before each connection attempt,
* including the initial connect and every reconnect. The hook receives
* a mutable request seeded with the Builder's defaults; it may modify
* the request's target, headers, method, and body.
*
* Host, port, and scheme are fixed at build time and cannot be changed
* by the hook. The Last-Event-ID header is managed by the client and
* any value the hook sets for it will be replaced before the request
* is sent.
*
* @param hook Callback invoked with a mutable request before each
* connection attempt.
* @return Reference to this builder.
*/
Builder& on_connect(ConnectionHook hook);

/**
* Builds a Client. The shared pointer is necessary to extend the lifetime
* of the Client to encompass each asynchronous operation that it performs.
Expand All @@ -195,6 +218,7 @@ class Builder {
bool skip_verify_peer_;
std::optional<std::string> custom_ca_file_;
std::optional<std::string> proxy_url_;
ConnectionHook connection_hook_;
};

/**
Expand All @@ -213,7 +237,8 @@ class Client {
* when the SDK detects invalid data from the stream and needs to
* reconnect. The backoff mechanism prevents rapid reconnection attempts
* that could overload the service.
* @param reason A description of why the restart was triggered (for logging)
* @param reason A description of why the restart was triggered (for
* logging)
*/
virtual void async_restart(std::string const& reason) = 0;
};
Expand Down
45 changes: 35 additions & 10 deletions libs/server-sent-events/src/client.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,7 @@ class FoxyClient : public Client,
Builder::EventReceiver receiver,
Builder::LogCallback logger,
Builder::ErrorCallback errors,
Builder::ConnectionHook connection_hook,
std::optional<net::ssl::context> maybe_ssl)
: ssl_context_(std::move(maybe_ssl)),
host_(std::move(host)),
Expand All @@ -85,6 +86,7 @@ class FoxyClient : public Client,
event_receiver_(std::move(receiver)),
logger_(std::move(logger)),
errors_(std::move(errors)),
connection_hook_(std::move(connection_hook)),
body_parser_(std::nullopt),
session_(std::nullopt),
last_event_id_(std::nullopt),
Expand Down Expand Up @@ -185,6 +187,16 @@ class FoxyClient : public Client,
}

void do_run() {
if (shutting_down_) {
return;
}

if (connection_hook_) {
connection_hook_(&req_);
}
Comment thread
cursor[bot] marked this conversation as resolved.

req_.prepare_payload();

Comment thread
beekld marked this conversation as resolved.
session_->async_connect(
host_, port_,
beast::bind_front_handler(&FoxyClient::on_connect,
Expand Down Expand Up @@ -278,6 +290,9 @@ class FoxyClient : public Client,
return;
}

host_ = new_url->host();
port_ =
new_url->has_port() ? new_url->port() : new_url->scheme();
req_.set(http::field::host, new_url->host());
req_.target(new_url->encoded_target());
} else {
Expand Down Expand Up @@ -496,6 +511,11 @@ class FoxyClient : public Client,
// which the client communicates error conditions to the user.
Builder::ErrorCallback errors_;

// Optional hook invoked before each connection attempt with a mutable
// request, allowing the caller to vary the request per connect (e.g.
// updating query parameters from external state).
Builder::ConnectionHook connection_hook_;

// Customized parser (see parser.hpp) which repeatedly receives chunks of
// data and parses them into SSE events. It cannot be reused across
// connections, hence the optional so it can be destroyed easily.
Expand Down Expand Up @@ -624,6 +644,11 @@ Builder& Builder::proxy(std::optional<std::string> url) {
return *this;
}

Builder& Builder::on_connect(ConnectionHook hook) {
connection_hook_ = std::move(hook);
return *this;
}

std::shared_ptr<Client> Builder::build() {
auto uri_components = boost::urls::parse_uri(url_);
if (!uri_components) {
Expand All @@ -645,12 +670,12 @@ std::shared_ptr<Client> Builder::build() {
}
}

request.prepare_payload();

std::string host = uri_components->host();

request.set(http::field::host, host);
request.target(uri_components->encoded_target());
// RFC 7230: an empty path in origin-form must be sent as "/".
auto target = uri_components->encoded_target();
Comment thread
beekld marked this conversation as resolved.
request.target(target.empty() ? "/" : target);

if (uri_components->has_scheme()) {
if (!(uri_components->scheme_id() == boost::urls::scheme::http ||
Expand All @@ -667,12 +692,12 @@ std::shared_ptr<Client> Builder::build() {
: uri_components->scheme();

#ifdef LD_CURL_NETWORKING
bool use_https = uri_components->scheme_id() == boost::urls::scheme::https;
return std::make_shared<CurlClient>(
net::make_strand(executor_), request, host, service,
connect_timeout_, read_timeout_, write_timeout_,
initial_reconnect_delay_, receiver_, logging_cb_, error_cb_,
skip_verify_peer_, custom_ca_file_, use_https, proxy_url_);
bool use_https = uri_components->scheme_id() == boost::urls::scheme::https;
return std::make_shared<CurlClient>(
net::make_strand(executor_), request, host, service, connect_timeout_,
read_timeout_, write_timeout_, initial_reconnect_delay_, receiver_,
logging_cb_, error_cb_, connection_hook_, skip_verify_peer_,
custom_ca_file_, use_https, proxy_url_);
#else
std::optional<ssl::context> ssl;
if (uri_components->scheme_id() == boost::urls::scheme::https) {
Expand All @@ -694,7 +719,7 @@ std::shared_ptr<Client> Builder::build() {
return std::make_shared<FoxyClient>(
net::make_strand(executor_), request, host, service, connect_timeout_,
read_timeout_, write_timeout_, initial_reconnect_delay_, receiver_,
logging_cb_, error_cb_, std::move(ssl));
logging_cb_, error_cb_, connection_hook_, std::move(ssl));
#endif
}

Expand Down
Loading
Loading