diff --git a/libs/server-sdk/src/CMakeLists.txt b/libs/server-sdk/src/CMakeLists.txt index f8fac7fb3..7f27984a4 100644 --- a/libs/server-sdk/src/CMakeLists.txt +++ b/libs/server-sdk/src/CMakeLists.txt @@ -57,6 +57,8 @@ target_sources(${LIBNAME} data_systems/fdv2/polling_initializer.cpp data_systems/fdv2/polling_synchronizer.hpp data_systems/fdv2/polling_synchronizer.cpp + data_systems/fdv2/streaming_synchronizer.hpp + data_systems/fdv2/streaming_synchronizer.cpp data_systems/background_sync/sources/streaming/streaming_data_source.hpp data_systems/background_sync/sources/streaming/streaming_data_source.cpp data_systems/background_sync/sources/streaming/event_handler.hpp diff --git a/libs/server-sdk/src/data_systems/fdv2/streaming_synchronizer.cpp b/libs/server-sdk/src/data_systems/fdv2/streaming_synchronizer.cpp new file mode 100644 index 000000000..894d35846 --- /dev/null +++ b/libs/server-sdk/src/data_systems/fdv2/streaming_synchronizer.cpp @@ -0,0 +1,391 @@ +#include "streaming_synchronizer.hpp" +#include "fdv2_changeset_translation.hpp" + +#include + +#include +#include +#include + +namespace launchdarkly::server_side::data_systems { + +static char const* const kIdentity = "FDv2 streaming synchronizer"; + +// Maximum time between bytes read from the stream before the SSE client +// declares the connection dead and reconnects. Must be greater than the +// streaming service's heartbeat interval. Hardcoded rather than read from +// HttpProperties, whose default ReadTimeout is sized for one-shot HTTP +// requests and would cause spurious disconnects on a long-lived stream. +static constexpr std::chrono::minutes kDeadConnectionInterval{5}; + +using data_interfaces::FDv2SourceResult; +using ErrorInfo = FDv2SourceResult::ErrorInfo; +using ErrorKind = ErrorInfo::ErrorKind; + +static ErrorInfo MakeError(ErrorKind kind, + ErrorInfo::StatusCodeType status, + std::string message) { + return ErrorInfo{kind, status, std::move(message), + std::chrono::system_clock::now()}; +} + +template +inline constexpr bool always_false_v = false; + +FDv2StreamingSynchronizer::State::State( + Logger logger, + boost::asio::any_io_executor const& executor, + config::built::ServiceEndpoints const& endpoints, + config::built::HttpProperties const& http_properties, + std::optional filter_key, + std::chrono::milliseconds initial_reconnect_delay) + : logger_(std::move(logger)), + endpoints_(endpoints), + http_properties_(http_properties), + filter_key_(std::move(filter_key)), + initial_reconnect_delay_(initial_reconnect_delay), + executor_(executor) {} + +void FDv2StreamingSynchronizer::State::EnsureStarted( + data_model::Selector const& selector, + std::shared_ptr self) { + { + std::lock_guard lock(mutex_); + latest_selector_ = selector; + if (closed_ || started_) { + return; + } + started_ = true; + } + + auto parsed = boost::urls::parse_uri(endpoints_.StreamingBaseUrl()); + if (!parsed) { + // started_ intentionally left true: a bad endpoint URL is a + // configuration error that won't fix itself. The TerminalError + // result tells the orchestrator to stop retrying this synchronizer. + LD_LOG(logger_, LogLevel::kError) + << kIdentity << ": could not parse streaming endpoint URL"; + Notify(FDv2SourceResult{FDv2SourceResult::TerminalError{ + MakeError(ErrorKind::kNetworkError, 0, + "could not parse streaming endpoint URL"), + false}}); + return; + } + + boost::urls::url u = parsed.value(); + + // Safer way of appending /sdk/stream than string concatenation: avoids + // double slashes if the base URL has a trailing slash. + u.segments().push_back("sdk"); + u.segments().push_back("stream"); + + // basis and filter are not added here — they are appended per-connect by + // the on_connect hook (OnConnect), so that each (re)connection uses the + // freshest selector. + { + std::lock_guard lock(mutex_); + base_url_ = u; + } + + auto client_builder = sse::Builder(executor_, std::string(u.buffer())); + + client_builder.method(boost::beast::http::verb::get); + client_builder.read_timeout(kDeadConnectionInterval); + client_builder.write_timeout(http_properties_.WriteTimeout()); + client_builder.connect_timeout(http_properties_.ConnectTimeout()); + client_builder.initial_reconnect_delay(initial_reconnect_delay_); + + for (auto const& [key, value] : http_properties_.BaseHeaders()) { + client_builder.header(key, value); + } + if (http_properties_.Tls().PeerVerifyMode() == + launchdarkly::config::shared::built::TlsOptions::VerifyMode:: + kVerifyNone) { + client_builder.skip_verify_peer(true); + } + if (auto ca_file = http_properties_.Tls().CustomCAFile()) { + client_builder.custom_ca_file(*ca_file); + } + if (auto proxy_url = http_properties_.Proxy().Url()) { + client_builder.proxy(*proxy_url); + } + + std::weak_ptr weak = self; + client_builder.on_connect([weak](HttpRequest* req) { + if (auto s = weak.lock()) { + s->OnConnect(req); + } + }); + client_builder.receiver([weak](sse::Event const& event) { + if (auto s = weak.lock()) { + s->OnEvent(event); + } + }); + client_builder.logger([weak](std::string msg) { + if (auto s = weak.lock()) { + LD_LOG(s->logger_, LogLevel::kDebug) << msg; + } + }); + client_builder.errors([weak](sse::Error error) { + if (auto s = weak.lock()) { + s->OnError(error); + } + }); + + auto client = client_builder.build(); + if (!client) { + // started_ intentionally left true: same reasoning as above. + LD_LOG(logger_, LogLevel::kError) + << kIdentity << ": could not build SSE client"; + Notify(FDv2SourceResult{FDv2SourceResult::TerminalError{ + MakeError(ErrorKind::kNetworkError, 0, + "could not build SSE client"), + false}}); + return; + } + + // Publish-and-connect atomically with respect to Shutdown(). If Shutdown + // ran while we were building, drop the client on the floor — async_connect + // was never called, so there is nothing to clean up. + std::lock_guard lock(mutex_); + if (closed_) { + return; + } + sse_client_ = client; + client->async_connect(); +} + +void FDv2StreamingSynchronizer::State::OnConnect(HttpRequest* req) { + std::lock_guard lock(mutex_); + // base_url_ is guaranteed populated: EnsureStarted publishes it before + // calling async_connect, which is what eventually triggers this hook. + boost::urls::url u = *base_url_; + if (latest_selector_.value) { + u.params().set("basis", latest_selector_.value->state); + } + if (filter_key_) { + u.params().set("filter", *filter_key_); + } + req->target(u.encoded_target()); +} + +void FDv2StreamingSynchronizer::State::OnEvent(sse::Event const& event) { + boost::system::error_code ec; + auto data = boost::json::parse(event.data(), ec); + if (ec) { + protocol_handler_.Reset(); + std::string msg = "could not parse FDv2 streaming event payload"; + LD_LOG(logger_, LogLevel::kError) << kIdentity << ": " << msg; + Notify(FDv2SourceResult{FDv2SourceResult::Interrupted{ + MakeError(ErrorKind::kInvalidData, 0, std::move(msg)), false}}); + return; + } + + auto result = protocol_handler_.HandleEvent(event.type(), data); + + std::visit( + [this](auto const& r) { + using T = std::decay_t; + if constexpr (std::is_same_v) { + // Accumulating, heartbeat, or unknown event — nothing to do. + } else if constexpr (std::is_same_v) { + auto typed = TranslateChangeSet(r, logger_); + if (!typed) { + std::string msg = + "FDv2 streaming changeset could not be translated"; + LD_LOG(logger_, LogLevel::kError) + << kIdentity << ": " << msg; + Notify(FDv2SourceResult{FDv2SourceResult::Interrupted{ + MakeError(ErrorKind::kInvalidData, 0, std::move(msg)), + false}}); + return; + } + Notify(FDv2SourceResult{ + FDv2SourceResult::ChangeSet{std::move(*typed), false}}); + } else if constexpr (std::is_same_v) { + LD_LOG(logger_, LogLevel::kInfo) + << kIdentity + << ": Goodbye was received from the LaunchDarkly " + "connection with reason: '" + << r.reason.value_or("") << "'."; + Notify(FDv2SourceResult{ + FDv2SourceResult::Goodbye{r.reason, false}}); + } else if constexpr (std::is_same_v) { + if (r.kind == FDv2ProtocolHandler::Error::Kind::kServerError) { + auto const& id = r.server_error.value().id; + std::string msg = + "An issue was encountered receiving updates for " + "payload '" + + id.value_or("") + "' with reason: '" + r.message + + "'. Automatic retry will occur."; + LD_LOG(logger_, LogLevel::kInfo) + << kIdentity << ": " << msg; + Notify(FDv2SourceResult{FDv2SourceResult::Interrupted{ + MakeError(ErrorKind::kErrorResponse, 0, std::move(msg)), + false}}); + return; + } + LD_LOG(logger_, LogLevel::kError) + << kIdentity << ": " << r.message; + Notify(FDv2SourceResult{FDv2SourceResult::Interrupted{ + MakeError(ErrorKind::kInvalidData, 0, r.message), false}}); + } else { + static_assert(always_false_v, "non-exhaustive visitor"); + } + }, + result); +} + +void FDv2StreamingSynchronizer::State::OnError(sse::Error const& error) { + protocol_handler_.Reset(); + + std::string msg = sse::ErrorToString(error); + + if (sse::IsRecoverable(error)) { + LD_LOG(logger_, LogLevel::kWarn) << kIdentity << ": " << msg; + Notify(FDv2SourceResult{FDv2SourceResult::Interrupted{ + MakeError(ErrorKind::kNetworkError, 0, std::move(msg)), false}}); + return; + } + + LD_LOG(logger_, LogLevel::kError) << kIdentity << ": " << msg; + + if (auto const* client_error = + std::get_if(&error)) { + Notify(FDv2SourceResult{FDv2SourceResult::TerminalError{ + MakeError( + ErrorKind::kErrorResponse, + static_cast(client_error->status), + std::move(msg)), + false}}); + return; + } + + Notify(FDv2SourceResult{FDv2SourceResult::TerminalError{ + MakeError(ErrorKind::kNetworkError, 0, std::move(msg)), false}}); +} + +void FDv2StreamingSynchronizer::State::Notify(FDv2SourceResult result) { + std::optional> promise; + { + std::lock_guard lock(mutex_); + if (pending_promise_) { + promise = std::move(pending_promise_); + pending_promise_.reset(); + } else { + result_queue_.push_back(std::move(result)); + return; + } + } + // Resolve outside the lock — Promise::Resolve may invoke inline + // continuations that could call back into Notify or Next. + promise->Resolve(std::move(result)); +} + +async::Future FDv2StreamingSynchronizer::State::Next( + data_model::Selector const& selector, + std::shared_ptr self) { + EnsureStarted(selector, std::move(self)); + + std::lock_guard lock(mutex_); + if (!result_queue_.empty()) { + auto result = std::move(result_queue_.front()); + result_queue_.pop_front(); + return async::MakeFuture(std::move(result)); + } + return pending_promise_.emplace().GetFuture(); +} + +void FDv2StreamingSynchronizer::State::ClearPendingPromise() { + std::lock_guard lock(mutex_); + pending_promise_.reset(); +} + +void FDv2StreamingSynchronizer::State::Shutdown() { + std::shared_ptr client; + { + std::lock_guard lock(mutex_); + closed_ = true; + client = std::exchange(sse_client_, nullptr); + } + if (client) { + client->async_shutdown([] {}); + } +} + +async::Future FDv2StreamingSynchronizer::State::Delay( + std::chrono::milliseconds duration, + async::CancellationToken token) { + return async::Delay(executor_, duration, std::move(token)); +} + +FDv2StreamingSynchronizer::FDv2StreamingSynchronizer( + boost::asio::any_io_executor const& executor, + Logger const& logger, + config::built::ServiceEndpoints const& endpoints, + config::built::HttpProperties const& http_properties, + std::optional filter_key, + std::chrono::milliseconds initial_reconnect_delay) + : state_(std::make_shared(logger, + executor, + endpoints, + http_properties, + std::move(filter_key), + initial_reconnect_delay)) {} + +FDv2StreamingSynchronizer::~FDv2StreamingSynchronizer() { + Close(); +} + +async::Future FDv2StreamingSynchronizer::Next( + std::chrono::milliseconds timeout, + data_model::Selector selector) { + auto closed = close_promise_.GetFuture(); + if (closed.IsFinished()) { + return async::MakeFuture( + FDv2SourceResult{FDv2SourceResult::Shutdown{}}); + } + + auto result_future = state_->Next(selector, state_); + if (result_future.IsFinished()) { + return result_future; + } + + async::CancellationSource cancel; + auto timeout_future = state_->Delay(timeout, cancel.GetToken()); + + return async::WhenAny(closed, timeout_future, result_future) + .Then( + [state = state_, cancel = std::move(cancel), result_future]( + std::size_t const& idx) mutable -> FDv2SourceResult { + cancel.Cancel(); + if (idx == 0) { + state->ClearPendingPromise(); + return FDv2SourceResult{FDv2SourceResult::Shutdown{}}; + } + if (idx == 1) { + state->ClearPendingPromise(); + if (result_future.IsFinished()) { + return *result_future.GetResult(); + } + return FDv2SourceResult{FDv2SourceResult::Timeout{}}; + } + return *result_future.GetResult(); + }, + async::kInlineExecutor); +} + +void FDv2StreamingSynchronizer::Close() { + if (!close_promise_.Resolve(std::monostate{})) { + return; + } + state_->Shutdown(); +} + +std::string const& FDv2StreamingSynchronizer::Identity() const { + static std::string const identity = kIdentity; + return identity; +} + +} // namespace launchdarkly::server_side::data_systems diff --git a/libs/server-sdk/src/data_systems/fdv2/streaming_synchronizer.hpp b/libs/server-sdk/src/data_systems/fdv2/streaming_synchronizer.hpp new file mode 100644 index 000000000..e5686f9bb --- /dev/null +++ b/libs/server-sdk/src/data_systems/fdv2/streaming_synchronizer.hpp @@ -0,0 +1,176 @@ +#pragma once + +#include "../../data_interfaces/source/ifdv2_synchronizer.hpp" + +#include +#include +#include +#include +#include +#include + +#include +#include + +#include +#include +#include +#include +#include +#include + +namespace launchdarkly::server_side::data_systems { + +class FDv2StreamingSynchronizerTestPeer; + +/** + * FDv2 streaming synchronizer. Opens an SSE connection to the FDv2 streaming + * endpoint and translates the push-based event stream into the pull-based + * IFDv2Synchronizer::Next() interface. + * + * Threading model: + * Next() should only be called once at a time. + * Close() may be called concurrently with Next(). + * This object may be safely destroyed once no call to Next() or Close() + * is in progress. + */ +class FDv2StreamingSynchronizer final + : public data_interfaces::IFDv2Synchronizer { + friend class FDv2StreamingSynchronizerTestPeer; + + public: + /** + * Constructs a synchronizer that streams from the FDv2 streaming endpoint. + * If filter_key is present, only the specified payload filter is requested. + */ + FDv2StreamingSynchronizer( + boost::asio::any_io_executor const& executor, + Logger const& logger, + config::built::ServiceEndpoints const& endpoints, + config::built::HttpProperties const& http_properties, + std::optional filter_key, + std::chrono::milliseconds initial_reconnect_delay); + + ~FDv2StreamingSynchronizer() override; + + async::Future Next( + std::chrono::milliseconds timeout, + data_model::Selector selector) override; + + void Close() override; + + [[nodiscard]] std::string const& Identity() const override; + + private: + // Any state that may be accessed by async SSE callbacks needs to be inside + // this class, managed by a shared_ptr. All mutable members are guarded by + // the mutex. + class State { + friend class FDv2StreamingSynchronizerTestPeer; + + public: + State(Logger logger, + boost::asio::any_io_executor const& executor, + config::built::ServiceEndpoints const& endpoints, + config::built::HttpProperties const& http_properties, + std::optional filter_key, + std::chrono::milliseconds initial_reconnect_delay); + + /** + * Updates the stored selector, starts the SSE client if not already + * running, and returns a Future resolving with the next result. + * + * If a buffered result is already available, the Future is resolved + * immediately. Otherwise it resolves when the next event arrives. + * + * The self parameter must be the shared_ptr that owns this State, + * used to form weak_ptr captures in SSE callbacks. + */ + async::Future Next( + data_model::Selector const& selector, + std::shared_ptr self); + + /** + * Cancels an outstanding Next() call without delivering a result. + * Any result that subsequently arrives will be buffered for the next + * Next() call. + */ + void ClearPendingPromise(); + + /** + * Marks the State as closed and shuts down the SSE client if one has + * been built. After Shutdown returns, EnsureStarted is guaranteed not + * to start a new client. Idempotent. + */ + void Shutdown(); + + /** + * Returns a Future that resolves after the given duration. Resolves + * early with false if the token is cancelled before the duration + * elapses. + */ + async::Future Delay(std::chrono::milliseconds duration, + async::CancellationToken token = {}); + + private: + using HttpRequest = + boost::beast::http::request; + + /** + * Starts the SSE client if not already running, and updates the stored + * selector so that the on-connect hook will use it for the next + * connection attempt. + */ + void EnsureStarted(data_model::Selector const& selector, + std::shared_ptr self); + + /** + * Delivers a result to the caller of Next(), or buffers it if no + * caller is waiting. + */ + void Notify(data_interfaces::FDv2SourceResult result); + + /** + * Per-connect hook for the SSE client. Overwrites the request target + * with the streaming path plus query parameters built from the latest + * stored selector and the (immutable) filter key. + */ + void OnConnect(HttpRequest* req); + + void OnEvent(sse::Event const& event); + void OnError(sse::Error const& error); + + // Logger is itself thread-safe. + Logger logger_; + + // Immutable state + config::built::ServiceEndpoints const endpoints_; + config::built::HttpProperties const http_properties_; + std::optional const filter_key_; + std::chrono::milliseconds const initial_reconnect_delay_; + boost::asio::any_io_executor const executor_; + + // Touched only from SSE callbacks, which all run on the same strand. + // No lock required. + FDv2ProtocolHandler protocol_handler_; + + // Mutable state, all guarded by mutex_. + std::mutex mutex_; + bool started_ = false; + bool closed_ = false; + data_model::Selector latest_selector_; + std::optional base_url_; + std::shared_ptr sse_client_; + std::optional> + pending_promise_; + std::deque result_queue_; + }; + + // Resolved by Close() or on destruction, cancelling any outstanding Next(). + async::Promise close_promise_; + + // Shared with async SSE callbacks. + std::shared_ptr state_; +}; + +} // namespace launchdarkly::server_side::data_systems diff --git a/libs/server-sdk/tests/fdv2_streaming_synchronizer_test.cpp b/libs/server-sdk/tests/fdv2_streaming_synchronizer_test.cpp new file mode 100644 index 000000000..cb9bdd8f6 --- /dev/null +++ b/libs/server-sdk/tests/fdv2_streaming_synchronizer_test.cpp @@ -0,0 +1,602 @@ +#include + +#include + +#include +#include +#include +#include +#include + +#include +#include +#include +#include +#include + +#include +#include +#include +#include + +namespace launchdarkly::server_side::data_systems { + +// Test peer providing access to private State internals via the +// friend declaration in streaming_synchronizer.hpp. Tests drive the State's +// per-event / per-error / per-connect entry points directly so that no real +// SSE connection is required. +class FDv2StreamingSynchronizerTestPeer { + public: + static void OnEvent(FDv2StreamingSynchronizer& sync, + sse::Event const& event) { + sync.state_->OnEvent(event); + } + static void OnError(FDv2StreamingSynchronizer& sync, + sse::Error const& error) { + sync.state_->OnError(error); + } + static void OnConnect( + FDv2StreamingSynchronizer& sync, + boost::beast::http::request* req) { + sync.state_->OnConnect(req); + } + static void MarkStarted(FDv2StreamingSynchronizer& sync) { + std::lock_guard lock(sync.state_->mutex_); + sync.state_->started_ = true; + } + static void SetBaseUrl(FDv2StreamingSynchronizer& sync, + boost::urls::url url) { + std::lock_guard lock(sync.state_->mutex_); + sync.state_->base_url_ = std::move(url); + } + static void SetLatestSelector(FDv2StreamingSynchronizer& sync, + data_model::Selector selector) { + std::lock_guard lock(sync.state_->mutex_); + sync.state_->latest_selector_ = std::move(selector); + } +}; + +} // namespace launchdarkly::server_side::data_systems + +using namespace launchdarkly; +using namespace launchdarkly::server_side::data_interfaces; +using namespace launchdarkly::server_side::data_systems; +using namespace std::chrono_literals; + +namespace { + +Logger MakeNullLogger() { + struct NullBackend : ILogBackend { + bool Enabled(LogLevel) noexcept override { return false; } + void Write(LogLevel, std::string) noexcept override {} + }; + return Logger{std::make_shared()}; +} + +class IoContextRunner { + public: + IoContextRunner() : work_guard_(boost::asio::make_work_guard(ioc_)) { + thread_ = std::thread([this] { ioc_.run(); }); + } + ~IoContextRunner() { + work_guard_.reset(); + ioc_.stop(); + if (thread_.joinable()) { + thread_.join(); + } + } + boost::asio::io_context& context() { return ioc_; } + + private: + boost::asio::io_context ioc_; + boost::asio::executor_work_guard + work_guard_; + std::thread thread_; +}; + +config::shared::built::ServiceEndpoints MakeEndpoints(std::string streaming) { + return config::shared::built::ServiceEndpoints( + "polling", std::move(streaming), "events"); +} + +config::shared::built::HttpProperties MakeHttpProperties() { + return launchdarkly::server_side::config::builders::HttpPropertiesBuilder() + .Build(); +} + +} // namespace + +// ============================================================================ +// Lifecycle +// ============================================================================ + +TEST(FDv2StreamingSynchronizerTest, NextBadEndpointUrlReturnsTerminalError) { + auto logger = MakeNullLogger(); + IoContextRunner runner; + + FDv2StreamingSynchronizer synchronizer( + runner.context().get_executor(), logger, MakeEndpoints("not a url"), + MakeHttpProperties(), std::nullopt, 1s); + + // Act: trigger setup with a malformed streaming URL. URL parsing happens + // inside EnsureStarted on the first Next call. + auto future = synchronizer.Next(2s, data_model::Selector{}); + auto result = future.WaitForResult(2s); + + // Assert: TerminalError tells the orchestrator not to retry, which is the + // correct response to a misconfigured endpoint URL. + ASSERT_TRUE(result.has_value()); + auto* terminal = + std::get_if(&result->value); + ASSERT_NE(terminal, nullptr); + EXPECT_EQ(terminal->error.Kind(), + FDv2SourceResult::ErrorInfo::ErrorKind::kNetworkError); +} + +TEST(FDv2StreamingSynchronizerTest, CloseBeforeNextReturnsShutdown) { + auto logger = MakeNullLogger(); + IoContextRunner runner; + + FDv2StreamingSynchronizer synchronizer( + runner.context().get_executor(), logger, + MakeEndpoints("http://localhost"), MakeHttpProperties(), std::nullopt, + 1s); + synchronizer.Close(); + + // Act: call Next on an already-closed synchronizer. + auto future = synchronizer.Next(5s, data_model::Selector{}); + auto result = future.WaitForResult(2s); + + // Assert: Shutdown is delivered immediately (the outer Next short-circuits + // when close_promise_ is already resolved). + ASSERT_TRUE(result.has_value()); + EXPECT_TRUE( + std::holds_alternative(result->value)); +} + +TEST(FDv2StreamingSynchronizerTest, CloseDuringPendingNextResolvesShutdown) { + auto logger = MakeNullLogger(); + IoContextRunner runner; + + FDv2StreamingSynchronizer synchronizer( + runner.context().get_executor(), logger, + MakeEndpoints("http://localhost"), MakeHttpProperties(), std::nullopt, + 1s); + + // Skip the SSE setup; we want Next to be pending purely on the + // close/timeout race, not on real network activity. + FDv2StreamingSynchronizerTestPeer::MarkStarted(synchronizer); + + // Act: start a pending Next, then Close while it is still pending. + auto future = synchronizer.Next(5s, data_model::Selector{}); + synchronizer.Close(); + auto result = future.WaitForResult(2s); + + // Assert: Close resolves the pending Next with Shutdown by winning the + // close-vs-timeout-vs-result race in the outer Next. + ASSERT_TRUE(result.has_value()); + EXPECT_TRUE( + std::holds_alternative(result->value)); +} + +TEST(FDv2StreamingSynchronizerTest, NextTimeoutReturnsTimeout) { + auto logger = MakeNullLogger(); + IoContextRunner runner; + + FDv2StreamingSynchronizer synchronizer( + runner.context().get_executor(), logger, + MakeEndpoints("http://localhost"), MakeHttpProperties(), std::nullopt, + 1s); + + FDv2StreamingSynchronizerTestPeer::MarkStarted(synchronizer); + + // Act: call Next with a short timeout and never deliver any event. + auto future = synchronizer.Next(100ms, data_model::Selector{}); + auto result = future.WaitForResult(2s); + + // Assert: the timeout future wins the race and Next resolves with Timeout. + ASSERT_TRUE(result.has_value()); + EXPECT_TRUE( + std::holds_alternative(result->value)); +} + +// ============================================================================ +// on_connect hook — URL/target construction +// ============================================================================ + +TEST(FDv2StreamingSynchronizerTest, OnConnectEmptySelectorNoBasisParam) { + auto logger = MakeNullLogger(); + IoContextRunner runner; + + FDv2StreamingSynchronizer synchronizer( + runner.context().get_executor(), logger, + MakeEndpoints("https://stream.example.com"), MakeHttpProperties(), + std::nullopt, 1s); + + boost::urls::url base = + boost::urls::parse_uri("https://stream.example.com").value(); + base.segments().push_back("sdk"); + base.segments().push_back("stream"); + FDv2StreamingSynchronizerTestPeer::SetBaseUrl(synchronizer, base); + + boost::beast::http::request req; + + // Act: invoke the on_connect hook with no selector configured. + FDv2StreamingSynchronizerTestPeer::OnConnect(synchronizer, &req); + + // Assert: no basis query param appears in the target. The path itself is + // /sdk/stream as built from the streaming base URL. + EXPECT_EQ(req.target(), "/sdk/stream"); +} + +TEST(FDv2StreamingSynchronizerTest, OnConnectWithSelectorAppendsBasis) { + auto logger = MakeNullLogger(); + IoContextRunner runner; + + FDv2StreamingSynchronizer synchronizer( + runner.context().get_executor(), logger, + MakeEndpoints("https://stream.example.com"), MakeHttpProperties(), + std::nullopt, 1s); + + boost::urls::url base = + boost::urls::parse_uri("https://stream.example.com").value(); + base.segments().push_back("sdk"); + base.segments().push_back("stream"); + FDv2StreamingSynchronizerTestPeer::SetBaseUrl(synchronizer, base); + FDv2StreamingSynchronizerTestPeer::SetLatestSelector( + synchronizer, + data_model::Selector{data_model::Selector::State{1, "abc"}}); + + boost::beast::http::request req; + + // Act: invoke the on_connect hook with a non-empty selector. + FDv2StreamingSynchronizerTestPeer::OnConnect(synchronizer, &req); + + // Assert: the selector's state string is encoded as the basis query param. + EXPECT_EQ(req.target(), "/sdk/stream?basis=abc"); +} + +TEST(FDv2StreamingSynchronizerTest, OnConnectWithFilterKeyAppendsFilter) { + auto logger = MakeNullLogger(); + IoContextRunner runner; + + FDv2StreamingSynchronizer synchronizer( + runner.context().get_executor(), logger, + MakeEndpoints("https://stream.example.com"), MakeHttpProperties(), + std::string("my-filter"), 1s); + + boost::urls::url base = + boost::urls::parse_uri("https://stream.example.com").value(); + base.segments().push_back("sdk"); + base.segments().push_back("stream"); + FDv2StreamingSynchronizerTestPeer::SetBaseUrl(synchronizer, base); + + boost::beast::http::request req; + + // Act: invoke the on_connect hook with a configured filter key but no + // selector. + FDv2StreamingSynchronizerTestPeer::OnConnect(synchronizer, &req); + + // Assert: the filter key is encoded as the filter query param. + EXPECT_EQ(req.target(), "/sdk/stream?filter=my-filter"); +} + +TEST(FDv2StreamingSynchronizerTest, OnConnectReconnectUsesLatestSelector) { + auto logger = MakeNullLogger(); + IoContextRunner runner; + + FDv2StreamingSynchronizer synchronizer( + runner.context().get_executor(), logger, + MakeEndpoints("https://stream.example.com"), MakeHttpProperties(), + std::nullopt, 1s); + + boost::urls::url base = + boost::urls::parse_uri("https://stream.example.com").value(); + base.segments().push_back("sdk"); + base.segments().push_back("stream"); + FDv2StreamingSynchronizerTestPeer::SetBaseUrl(synchronizer, base); + + FDv2StreamingSynchronizerTestPeer::SetLatestSelector( + synchronizer, + data_model::Selector{data_model::Selector::State{1, "s0"}}); + boost::beast::http::request req1; + FDv2StreamingSynchronizerTestPeer::OnConnect(synchronizer, &req1); + + FDv2StreamingSynchronizerTestPeer::SetLatestSelector( + synchronizer, + data_model::Selector{data_model::Selector::State{2, "s1"}}); + + boost::beast::http::request req2; + + // Act: invoke the on_connect hook a second time after the stored selector + // has been updated (modeling a reconnect after a payload-transferred event + // has arrived). + FDv2StreamingSynchronizerTestPeer::OnConnect(synchronizer, &req2); + + // Assert: the second connection's basis reflects the updated selector, + // not the value captured at construction or first connect. + EXPECT_EQ(req1.target(), "/sdk/stream?basis=s0"); + EXPECT_EQ(req2.target(), "/sdk/stream?basis=s1"); +} + +TEST(FDv2StreamingSynchronizerTest, OnConnectSelectorStateIsPercentEncoded) { + auto logger = MakeNullLogger(); + IoContextRunner runner; + + FDv2StreamingSynchronizer synchronizer( + runner.context().get_executor(), logger, + MakeEndpoints("https://stream.example.com"), MakeHttpProperties(), + std::nullopt, 1s); + + boost::urls::url base = + boost::urls::parse_uri("https://stream.example.com").value(); + base.segments().push_back("sdk"); + base.segments().push_back("stream"); + FDv2StreamingSynchronizerTestPeer::SetBaseUrl(synchronizer, base); + FDv2StreamingSynchronizerTestPeer::SetLatestSelector( + synchronizer, + data_model::Selector{data_model::Selector::State{1, "a&b"}}); + + boost::beast::http::request req; + + // Act: invoke the on_connect hook with a selector state containing '&', + // which would terminate the basis value if left raw. + FDv2StreamingSynchronizerTestPeer::OnConnect(synchronizer, &req); + + // Assert: '&' is percent-encoded into the basis query param. + EXPECT_EQ(req.target(), "/sdk/stream?basis=a%26b"); +} + +// ============================================================================ +// SSE event translation +// ============================================================================ + +TEST(FDv2StreamingSynchronizerTest, FullChangesetEventsReturnsChangeSet) { + auto logger = MakeNullLogger(); + IoContextRunner runner; + + FDv2StreamingSynchronizer synchronizer( + runner.context().get_executor(), logger, + MakeEndpoints("http://localhost"), MakeHttpProperties(), std::nullopt, + 1s); + FDv2StreamingSynchronizerTestPeer::MarkStarted(synchronizer); + + sse::Event server_intent("server-intent", + R"({"payloads":[{"id":"p1","target":1,)" + R"("intentCode":"xfer-full"}]})"); + sse::Event put_object( + "put-object", + R"({"version":1,"kind":"flag","key":"my-flag","object":)" + R"({"key":"my-flag","on":true,"fallthrough":{"variation":0},)" + R"("variations":[true,false],"version":1}})"); + sse::Event payload_transferred("payload-transferred", + R"({"state":"abc","version":1})"); + + // Act: drive a full changeset (intent + put + transferred) through OnEvent + // and then read the queued result via the public Next API. + FDv2StreamingSynchronizerTestPeer::OnEvent(synchronizer, server_intent); + FDv2StreamingSynchronizerTestPeer::OnEvent(synchronizer, put_object); + FDv2StreamingSynchronizerTestPeer::OnEvent(synchronizer, + payload_transferred); + auto future = synchronizer.Next(2s, data_model::Selector{}); + auto result = future.WaitForResult(2s); + + // Assert: a ChangeSet result is delivered, and the translated payload + // contains the put item from the put-object event. + ASSERT_TRUE(result.has_value()); + auto* change_set = std::get_if(&result->value); + ASSERT_NE(change_set, nullptr); + EXPECT_FALSE(change_set->change_set.data.empty()); +} + +TEST(FDv2StreamingSynchronizerTest, GoodbyeEventReturnsGoodbye) { + auto logger = MakeNullLogger(); + IoContextRunner runner; + + FDv2StreamingSynchronizer synchronizer( + runner.context().get_executor(), logger, + MakeEndpoints("http://localhost"), MakeHttpProperties(), std::nullopt, + 1s); + FDv2StreamingSynchronizerTestPeer::MarkStarted(synchronizer); + + sse::Event goodbye("goodbye", R"({"reason":"bye"})"); + + // Act: deliver a goodbye event through OnEvent and read the queued result. + FDv2StreamingSynchronizerTestPeer::OnEvent(synchronizer, goodbye); + auto future = synchronizer.Next(2s, data_model::Selector{}); + auto result = future.WaitForResult(2s); + + // Assert: a Goodbye result with the wire reason is delivered, signaling + // the orchestrator to rotate sources. + ASSERT_TRUE(result.has_value()); + auto* g = std::get_if(&result->value); + ASSERT_NE(g, nullptr); + ASSERT_TRUE(g->reason.has_value()); + EXPECT_EQ(*g->reason, "bye"); +} + +TEST(FDv2StreamingSynchronizerTest, ServerErrorEventReturnsInterrupted) { + auto logger = MakeNullLogger(); + IoContextRunner runner; + + FDv2StreamingSynchronizer synchronizer( + runner.context().get_executor(), logger, + MakeEndpoints("http://localhost"), MakeHttpProperties(), std::nullopt, + 1s); + FDv2StreamingSynchronizerTestPeer::MarkStarted(synchronizer); + + sse::Event server_error("error", R"({"id":"abc","reason":"oops"})"); + + // Act: deliver an FDv2 server-side error event. + FDv2StreamingSynchronizerTestPeer::OnEvent(synchronizer, server_error); + auto future = synchronizer.Next(2s, data_model::Selector{}); + auto result = future.WaitForResult(2s); + + // Assert: the error is reported as Interrupted{kErrorResponse}, with the + // formatted retry-will-occur message containing both the payload id and + // the server reason. + ASSERT_TRUE(result.has_value()); + auto* interrupted = + std::get_if(&result->value); + ASSERT_NE(interrupted, nullptr); + EXPECT_EQ(interrupted->error.Kind(), + FDv2SourceResult::ErrorInfo::ErrorKind::kErrorResponse); + EXPECT_NE(interrupted->error.Message().find("abc"), std::string::npos); + EXPECT_NE(interrupted->error.Message().find("oops"), std::string::npos); + EXPECT_NE(interrupted->error.Message().find("Automatic retry"), + std::string::npos); +} + +TEST(FDv2StreamingSynchronizerTest, MalformedJsonEventReturnsInterrupted) { + auto logger = MakeNullLogger(); + IoContextRunner runner; + + FDv2StreamingSynchronizer synchronizer( + runner.context().get_executor(), logger, + MakeEndpoints("http://localhost"), MakeHttpProperties(), std::nullopt, + 1s); + FDv2StreamingSynchronizerTestPeer::MarkStarted(synchronizer); + + sse::Event bad_event("server-intent", "this is not json"); + + // Act: deliver an event whose data field cannot be parsed as JSON. + FDv2StreamingSynchronizerTestPeer::OnEvent(synchronizer, bad_event); + auto future = synchronizer.Next(2s, data_model::Selector{}); + auto result = future.WaitForResult(2s); + + // Assert: the synchronizer reports Interrupted{kInvalidData} so the + // orchestrator knows the stream produced unparseable bytes. + ASSERT_TRUE(result.has_value()); + auto* interrupted = + std::get_if(&result->value); + ASSERT_NE(interrupted, nullptr); + EXPECT_EQ(interrupted->error.Kind(), + FDv2SourceResult::ErrorInfo::ErrorKind::kInvalidData); +} + +TEST(FDv2StreamingSynchronizerTest, HeartbeatEventNoResultDelivered) { + auto logger = MakeNullLogger(); + IoContextRunner runner; + + FDv2StreamingSynchronizer synchronizer( + runner.context().get_executor(), logger, + MakeEndpoints("http://localhost"), MakeHttpProperties(), std::nullopt, + 1s); + FDv2StreamingSynchronizerTestPeer::MarkStarted(synchronizer); + + sse::Event heartbeat("heartbeat", R"({})"); + + // Act: deliver a heartbeat event, then call Next with a short timeout. + FDv2StreamingSynchronizerTestPeer::OnEvent(synchronizer, heartbeat); + auto future = synchronizer.Next(100ms, data_model::Selector{}); + auto result = future.WaitForResult(2s); + + // Assert: the heartbeat does not produce any FDv2SourceResult, so Next + // resolves with Timeout instead. + ASSERT_TRUE(result.has_value()); + EXPECT_TRUE( + std::holds_alternative(result->value)); +} + +TEST(FDv2StreamingSynchronizerTest, TranslationFailureReturnsInterrupted) { + auto logger = MakeNullLogger(); + IoContextRunner runner; + + FDv2StreamingSynchronizer synchronizer( + runner.context().get_executor(), logger, + MakeEndpoints("http://localhost"), MakeHttpProperties(), std::nullopt, + 1s); + FDv2StreamingSynchronizerTestPeer::MarkStarted(synchronizer); + + // A non-empty segment object missing required fields triggers a schema + // failure in the translator; matches the segment shape used by + // fdv2_changeset_translation_test's MalformedSegmentAbortsTranslation. + sse::Event server_intent("server-intent", + R"({"payloads":[{"id":"p1","target":1,)" + R"("intentCode":"xfer-full"}]})"); + sse::Event put_object( + "put-object", + R"({"version":1,"kind":"segment","key":"bad-seg","object":)" + R"({"key":"bad-seg"}})"); + sse::Event payload_transferred("payload-transferred", + R"({"state":"abc","version":1})"); + + // Act: drive a full changeset whose put-object payload won't deserialize + // into a Segment. + FDv2StreamingSynchronizerTestPeer::OnEvent(synchronizer, server_intent); + FDv2StreamingSynchronizerTestPeer::OnEvent(synchronizer, put_object); + FDv2StreamingSynchronizerTestPeer::OnEvent(synchronizer, + payload_transferred); + auto future = synchronizer.Next(2s, data_model::Selector{}); + auto result = future.WaitForResult(2s); + + // Assert: the changeset is rejected with Interrupted{kInvalidData}. + ASSERT_TRUE(result.has_value()); + auto* interrupted = + std::get_if(&result->value); + ASSERT_NE(interrupted, nullptr); + EXPECT_EQ(interrupted->error.Kind(), + FDv2SourceResult::ErrorInfo::ErrorKind::kInvalidData); +} + +// ============================================================================ +// SSE error handling +// ============================================================================ + +TEST(FDv2StreamingSynchronizerTest, + UnrecoverableStatus500ReturnsTerminalError) { + auto logger = MakeNullLogger(); + IoContextRunner runner; + + FDv2StreamingSynchronizer synchronizer( + runner.context().get_executor(), logger, + MakeEndpoints("http://localhost"), MakeHttpProperties(), std::nullopt, + 1s); + FDv2StreamingSynchronizerTestPeer::MarkStarted(synchronizer); + + sse::Error error{sse::errors::UnrecoverableClientError{ + boost::beast::http::status::internal_server_error}}; + + // Act: deliver an unrecoverable HTTP 500 error from the SSE client. + FDv2StreamingSynchronizerTestPeer::OnError(synchronizer, error); + auto future = synchronizer.Next(2s, data_model::Selector{}); + auto result = future.WaitForResult(2s); + + // Assert: the synchronizer reports TerminalError{kErrorResponse} carrying + // the HTTP status, telling the orchestrator to stop retrying this source. + ASSERT_TRUE(result.has_value()); + auto* terminal = + std::get_if(&result->value); + ASSERT_NE(terminal, nullptr); + EXPECT_EQ(terminal->error.Kind(), + FDv2SourceResult::ErrorInfo::ErrorKind::kErrorResponse); + EXPECT_EQ(terminal->error.StatusCode(), 500u); +} + +TEST(FDv2StreamingSynchronizerTest, RecoverableReadTimeoutReturnsInterrupted) { + auto logger = MakeNullLogger(); + IoContextRunner runner; + + FDv2StreamingSynchronizer synchronizer( + runner.context().get_executor(), logger, + MakeEndpoints("http://localhost"), MakeHttpProperties(), std::nullopt, + 1s); + FDv2StreamingSynchronizerTestPeer::MarkStarted(synchronizer); + + sse::Error error{sse::errors::ReadTimeout{std::chrono::milliseconds(100)}}; + + // Act: deliver a recoverable read-timeout error from the SSE client. + FDv2StreamingSynchronizerTestPeer::OnError(synchronizer, error); + auto future = synchronizer.Next(2s, data_model::Selector{}); + auto result = future.WaitForResult(2s); + + // Assert: the synchronizer reports Interrupted{kNetworkError} so the + // orchestrator treats this as a recoverable blip rather than a terminal + // failure. + ASSERT_TRUE(result.has_value()); + auto* interrupted = + std::get_if(&result->value); + ASSERT_NE(interrupted, nullptr); + EXPECT_EQ(interrupted->error.Kind(), + FDv2SourceResult::ErrorInfo::ErrorKind::kNetworkError); +} diff --git a/libs/server-sent-events/src/client.cpp b/libs/server-sent-events/src/client.cpp index e5ae7a8b6..81958f849 100644 --- a/libs/server-sent-events/src/client.cpp +++ b/libs/server-sent-events/src/client.cpp @@ -159,13 +159,13 @@ class FoxyClient : public Client, void async_connect() override { boost::asio::post( - session_->get_executor(), + backoff_timer_.get_executor(), beast::bind_front_handler(&FoxyClient::do_run, shared_from_this())); } void async_restart(std::string const& reason) override { boost::asio::post( - session_->get_executor(), + backoff_timer_.get_executor(), beast::bind_front_handler(&FoxyClient::do_restart, shared_from_this(), reason)); } @@ -368,9 +368,10 @@ class FoxyClient : public Client, } void async_shutdown(std::function completion) override { - // Get on the session's executor, otherwise the code in the completion - // handler could race. - boost::asio::post(session_->get_executor(), + // Post onto the strand to avoid races in the completion handler. + // session_ can be replaced by create_session(), so use + // backoff_timer_'s executor (same strand, never replaced) instead. + boost::asio::post(backoff_timer_.get_executor(), beast::bind_front_handler(&FoxyClient::do_shutdown, shared_from_this(), std::move(completion)));