From 088418f29c9da6044d556b48d13ae3e3a89a27d2 Mon Sep 17 00:00:00 2001 From: Bee Klimt Date: Tue, 21 Apr 2026 23:15:10 -0700 Subject: [PATCH 1/9] refactor: split json parsing/translation for future-proofing client/server distinction --- .../launchdarkly/data_model/change_set.hpp | 20 ++ .../launchdarkly/data_model/fdv2_change.hpp | 23 +- libs/internal/src/fdv2_protocol_handler.cpp | 95 +------ .../tests/fdv2_protocol_handler_test.cpp | 42 +--- libs/server-sdk/src/CMakeLists.txt | 2 + .../memory_store/memory_store.cpp | 11 +- .../memory_store/memory_store.hpp | 5 +- .../src/data_interfaces/item_change.hpp | 22 ++ .../source/fdv2_source_result.hpp | 16 +- .../fdv2/fdv2_changeset_translator.cpp | 97 +++++++ .../fdv2/fdv2_changeset_translator.hpp | 26 ++ .../data_systems/fdv2/fdv2_polling_impl.cpp | 33 ++- .../tests/fdv2_changeset_translator_test.cpp | 238 ++++++++++++++++++ .../tests/memory_store_apply_test.cpp | 42 ++-- 14 files changed, 508 insertions(+), 164 deletions(-) create mode 100644 libs/internal/include/launchdarkly/data_model/change_set.hpp create mode 100644 libs/server-sdk/src/data_interfaces/item_change.hpp create mode 100644 libs/server-sdk/src/data_systems/fdv2/fdv2_changeset_translator.cpp create mode 100644 libs/server-sdk/src/data_systems/fdv2/fdv2_changeset_translator.hpp create mode 100644 libs/server-sdk/tests/fdv2_changeset_translator_test.cpp diff --git a/libs/internal/include/launchdarkly/data_model/change_set.hpp b/libs/internal/include/launchdarkly/data_model/change_set.hpp new file mode 100644 index 000000000..d720e39bc --- /dev/null +++ b/libs/internal/include/launchdarkly/data_model/change_set.hpp @@ -0,0 +1,20 @@ +#pragma once + +#include + +namespace launchdarkly::data_model { + +enum class ChangeSetType { + kFull = 0, + kPartial = 1, + kNone = 2, +}; + +template +struct ChangeSet { + ChangeSetType type; + Selector selector; + T data; +}; + +} // namespace launchdarkly::data_model diff --git a/libs/internal/include/launchdarkly/data_model/fdv2_change.hpp b/libs/internal/include/launchdarkly/data_model/fdv2_change.hpp index 0d45470aa..4600eb3a1 100644 --- a/libs/internal/include/launchdarkly/data_model/fdv2_change.hpp +++ b/libs/internal/include/launchdarkly/data_model/fdv2_change.hpp @@ -1,29 +1,28 @@ #pragma once -#include -#include -#include +#include #include +#include + +#include #include -#include #include namespace launchdarkly::data_model { struct FDv2Change { + enum class ChangeType { kPut, kDelete }; + + ChangeType change_type; + std::string kind; std::string key; - std::variant, ItemDescriptor> object; + uint64_t version; + boost::json::value object; // set for kPut; unused for kDelete }; struct FDv2ChangeSet { - enum class Type { - kFull = 0, - kPartial = 1, - kNone = 2, - }; - - Type type; + ChangeSetType type; std::vector changes; Selector selector; }; diff --git a/libs/internal/src/fdv2_protocol_handler.cpp b/libs/internal/src/fdv2_protocol_handler.cpp index d0564e45f..13b8e6823 100644 --- a/libs/internal/src/fdv2_protocol_handler.cpp +++ b/libs/internal/src/fdv2_protocol_handler.cpp @@ -1,11 +1,5 @@ #include -#include -#include -#include -#include -#include - #include #include @@ -20,66 +14,6 @@ static char const* const kGoodbye = "goodbye"; using Error = FDv2ProtocolHandler::Error; -// Returns the parsed FDv2Change on success, nullopt for unknown kinds (which -// should be silently skipped for forward-compatibility), or an Error if -// a known kind fails to deserialize. -static tl::expected, Error> ParsePut( - PutObject const& put) { - if (put.kind == "flag") { - auto result = boost::json::value_to< - tl::expected, JsonError>>( - put.object); - // One bad flag aborts the entire transfer so the store is never - // left in a partially-updated state. - if (!result) { - return tl::make_unexpected(Error::JsonParseError( - result.error(), - "could not deserialize flag '" + put.key + "'")); - } - if (!result->has_value()) { - return tl::make_unexpected(Error::JsonParseError( - "flag '" + put.key + "' object was null")); - } - return data_model::FDv2Change{ - put.key, - data_model::ItemDescriptor{std::move(**result)}}; - } - if (put.kind == "segment") { - auto result = boost::json::value_to< - tl::expected, JsonError>>( - put.object); - // One bad segment aborts the entire transfer so the store is never - // left in a partially-updated state. - if (!result) { - return tl::make_unexpected(Error::JsonParseError( - result.error(), - "could not deserialize segment '" + put.key + "'")); - } - if (!result->has_value()) { - return tl::make_unexpected(Error::JsonParseError( - "segment '" + put.key + "' object was null")); - } - return data_model::FDv2Change{ - put.key, data_model::ItemDescriptor{ - std::move(**result)}}; - } - // Silently skip unknown kinds for forward-compatibility. - return std::nullopt; -} - -static data_model::FDv2Change MakeDeleteChange(DeleteObject const& del) { - if (del.kind == "flag") { - return data_model::FDv2Change{ - del.key, - data_model::ItemDescriptor{ - data_model::Tombstone{static_cast(del.version)}}}; - } - return data_model::FDv2Change{ - del.key, - data_model::ItemDescriptor{ - data_model::Tombstone{static_cast(del.version)}}}; -} - FDv2ProtocolHandler::Result FDv2ProtocolHandler::HandleEvent( std::string_view event_type, boost::json::value const& data) { @@ -137,7 +71,7 @@ FDv2ProtocolHandler::Result FDv2ProtocolHandler::HandleServerIntent( // kNone or kUnknown: emit an empty changeset immediately. state_ = State::kInactive; return data_model::FDv2ChangeSet{ - data_model::FDv2ChangeSet::Type::kNone, {}, data_model::Selector{}}; + data_model::ChangeSetType::kNone, {}, data_model::Selector{}}; } return std::monostate{}; } @@ -158,14 +92,10 @@ FDv2ProtocolHandler::Result FDv2ProtocolHandler::HandlePutObject( Reset(); return Error::JsonParseError("put-object data was null"); } - auto change = ParsePut(**result); - if (!change) { - Reset(); - return std::move(change.error()); - } - if (*change) { - changes_.push_back(std::move(**change)); - } + auto const& put = **result; + changes_.push_back(data_model::FDv2Change{ + data_model::FDv2Change::ChangeType::kPut, put.kind, put.key, + static_cast(put.version), put.object}); return std::monostate{}; } @@ -186,11 +116,12 @@ FDv2ProtocolHandler::Result FDv2ProtocolHandler::HandleDeleteObject( return Error::JsonParseError("delete-object data was null"); } auto const& del = **result; - // Silently skip unknown kinds for forward-compatibility. - if (del.kind != "flag" && del.kind != "segment") { - return std::monostate{}; - } - changes_.push_back(MakeDeleteChange(del)); + changes_.push_back( + data_model::FDv2Change{data_model::FDv2Change::ChangeType::kDelete, + del.kind, + del.key, + static_cast(del.version), + {}}); return std::monostate{}; } @@ -215,8 +146,8 @@ FDv2ProtocolHandler::Result FDv2ProtocolHandler::HandlePayloadTransferred( } auto const& transferred = **result; auto type = (state_ == State::kPartial) - ? data_model::FDv2ChangeSet::Type::kPartial - : data_model::FDv2ChangeSet::Type::kFull; + ? data_model::ChangeSetType::kPartial + : data_model::ChangeSetType::kFull; data_model::FDv2ChangeSet changeset{ type, std::move(changes_), data_model::Selector{data_model::Selector::State{transferred.version, diff --git a/libs/internal/tests/fdv2_protocol_handler_test.cpp b/libs/internal/tests/fdv2_protocol_handler_test.cpp index 4a7d9da0d..fb428c243 100644 --- a/libs/internal/tests/fdv2_protocol_handler_test.cpp +++ b/libs/internal/tests/fdv2_protocol_handler_test.cpp @@ -1,5 +1,6 @@ #include +#include #include #include @@ -56,7 +57,7 @@ TEST(FDv2ProtocolHandlerTest, NoneIntentEmitsEmptyChangeSetImmediately) { auto* cs = std::get_if(&result); ASSERT_NE(cs, nullptr); - EXPECT_EQ(cs->type, data_model::FDv2ChangeSet::Type::kNone); + EXPECT_EQ(cs->type, data_model::ChangeSetType::kNone); EXPECT_TRUE(cs->changes.empty()); EXPECT_FALSE(cs->selector.value.has_value()); } @@ -81,7 +82,7 @@ TEST(FDv2ProtocolHandlerTest, FullIntentEmitsChangeSetOnPayloadTransferred) { auto* cs = std::get_if(&r3); ASSERT_NE(cs, nullptr); - EXPECT_EQ(cs->type, data_model::FDv2ChangeSet::Type::kFull); + EXPECT_EQ(cs->type, data_model::ChangeSetType::kFull); EXPECT_EQ(cs->changes.size(), 1u); EXPECT_EQ(cs->changes[0].key, "my-flag"); ASSERT_TRUE(cs->selector.value.has_value()); @@ -105,7 +106,7 @@ TEST(FDv2ProtocolHandlerTest, FullIntentAccumulatesMultipleObjects) { auto* cs = std::get_if(&result); ASSERT_NE(cs, nullptr); - EXPECT_EQ(cs->type, data_model::FDv2ChangeSet::Type::kFull); + EXPECT_EQ(cs->type, data_model::ChangeSetType::kFull); EXPECT_EQ(cs->changes.size(), 3u); } @@ -132,7 +133,7 @@ TEST(FDv2ProtocolHandlerTest, auto* cs = std::get_if(&result); ASSERT_NE(cs, nullptr); - EXPECT_EQ(cs->type, data_model::FDv2ChangeSet::Type::kPartial); + EXPECT_EQ(cs->type, data_model::ChangeSetType::kPartial); ASSERT_EQ(cs->changes.size(), 1u); EXPECT_EQ(cs->changes[0].key, "flag-2"); } @@ -153,7 +154,7 @@ TEST(FDv2ProtocolHandlerTest, PartialIntentEmitsPartialChangeSet) { auto* cs = std::get_if(&result); ASSERT_NE(cs, nullptr); - EXPECT_EQ(cs->type, data_model::FDv2ChangeSet::Type::kPartial); + EXPECT_EQ(cs->type, data_model::ChangeSetType::kPartial); EXPECT_EQ(cs->changes.size(), 1u); EXPECT_EQ(cs->changes[0].key, "my-seg"); ASSERT_TRUE(cs->selector.value.has_value()); @@ -164,7 +165,7 @@ TEST(FDv2ProtocolHandlerTest, PartialIntentEmitsPartialChangeSet) { // Unknown kind in put-object → silently skipped // ============================================================================ -TEST(FDv2ProtocolHandlerTest, UnknownKindInPutObjectIsSilentlySkipped) { +TEST(FDv2ProtocolHandlerTest, UnknownKindInPutObjectIsPassedThrough) { FDv2ProtocolHandler handler; handler.HandleEvent("server-intent", MakeServerIntent("xfer-full")); @@ -179,12 +180,11 @@ TEST(FDv2ProtocolHandlerTest, UnknownKindInPutObjectIsSilentlySkipped) { auto* cs = std::get_if(&result); ASSERT_NE(cs, nullptr); - // Only the known kind (flag) should appear. - EXPECT_EQ(cs->changes.size(), 1u); - EXPECT_EQ(cs->changes[0].key, "my-flag"); + // All kinds pass through; filtering happens in the translator. + EXPECT_EQ(cs->changes.size(), 2u); } -TEST(FDv2ProtocolHandlerTest, UnknownKindInDeleteObjectIsSilentlySkipped) { +TEST(FDv2ProtocolHandlerTest, UnknownKindInDeleteObjectIsPassedThrough) { FDv2ProtocolHandler handler; handler.HandleEvent("server-intent", MakeServerIntent("xfer-full")); @@ -198,9 +198,8 @@ TEST(FDv2ProtocolHandlerTest, UnknownKindInDeleteObjectIsSilentlySkipped) { auto* cs = std::get_if(&result); ASSERT_NE(cs, nullptr); - // Only the known kind (flag) should appear. - EXPECT_EQ(cs->changes.size(), 1u); - EXPECT_EQ(cs->changes[0].key, "my-flag"); + // All kinds pass through; filtering happens in the translator. + EXPECT_EQ(cs->changes.size(), 2u); } // ============================================================================ @@ -252,23 +251,6 @@ TEST(FDv2ProtocolHandlerTest, ErrorEventWithIdSetsServerId) { EXPECT_EQ(err->server_error->reason, "overloaded"); } -TEST(FDv2ProtocolHandlerTest, MalformedPutObjectReturnsJsonError) { - FDv2ProtocolHandler handler; - - handler.HandleEvent("server-intent", MakeServerIntent("xfer-full")); - - // 'object' field is missing required flag fields — deserialisation fails. - auto result = handler.HandleEvent( - "put-object", - boost::json::parse( - R"({"version":1,"kind":"flag","key":"f","object":{}})")); - - auto* err = std::get_if(&result); - ASSERT_NE(err, nullptr); - EXPECT_EQ(err->kind, FDv2ProtocolHandler::Error::Kind::kJsonError); - EXPECT_TRUE(err->json_error.has_value()); -} - // ============================================================================ // goodbye event → return Goodbye // ============================================================================ diff --git a/libs/server-sdk/src/CMakeLists.txt b/libs/server-sdk/src/CMakeLists.txt index 09f98465b..eb3841179 100644 --- a/libs/server-sdk/src/CMakeLists.txt +++ b/libs/server-sdk/src/CMakeLists.txt @@ -49,6 +49,8 @@ target_sources(${LIBNAME} data_systems/background_sync/detail/payload_filter_validation/payload_filter_validation.cpp data_systems/background_sync/sources/polling/polling_data_source.hpp data_systems/background_sync/sources/polling/polling_data_source.cpp + data_systems/fdv2/fdv2_changeset_translator.hpp + data_systems/fdv2/fdv2_changeset_translator.cpp data_systems/fdv2/fdv2_polling_impl.hpp data_systems/fdv2/fdv2_polling_impl.cpp data_systems/fdv2/polling_initializer.hpp diff --git a/libs/server-sdk/src/data_components/memory_store/memory_store.cpp b/libs/server-sdk/src/data_components/memory_store/memory_store.cpp index c71acf08a..2eee8ff03 100644 --- a/libs/server-sdk/src/data_components/memory_store/memory_store.cpp +++ b/libs/server-sdk/src/data_components/memory_store/memory_store.cpp @@ -84,15 +84,16 @@ bool MemoryStore::RemoveSegment(std::string const& key) { return segments_.erase(key) == 1; } -void MemoryStore::Apply(data_model::FDv2ChangeSet changeSet) { +void MemoryStore::Apply( + data_model::ChangeSet changeSet) { std::lock_guard lock{data_mutex_}; switch (changeSet.type) { - case data_model::FDv2ChangeSet::Type::kNone: + case data_model::ChangeSetType::kNone: return; - case data_model::FDv2ChangeSet::Type::kPartial: + case data_model::ChangeSetType::kPartial: break; - case data_model::FDv2ChangeSet::Type::kFull: + case data_model::ChangeSetType::kFull: initialized_ = true; flags_.clear(); segments_.clear(); @@ -101,7 +102,7 @@ void MemoryStore::Apply(data_model::FDv2ChangeSet changeSet) { detail::unreachable(); } - for (auto& change : changeSet.changes) { + for (auto& change : changeSet.data) { if (std::holds_alternative(change.object)) { flags_[change.key] = std::make_shared( std::move(std::get(change.object))); diff --git a/libs/server-sdk/src/data_components/memory_store/memory_store.hpp b/libs/server-sdk/src/data_components/memory_store/memory_store.hpp index e9a067881..7bda17d3f 100644 --- a/libs/server-sdk/src/data_components/memory_store/memory_store.hpp +++ b/libs/server-sdk/src/data_components/memory_store/memory_store.hpp @@ -1,9 +1,10 @@ #pragma once #include "../../data_interfaces/destination/idestination.hpp" +#include "../../data_interfaces/item_change.hpp" #include "../../data_interfaces/store/istore.hpp" -#include +#include #include #include @@ -46,7 +47,7 @@ class MemoryStore final : public data_interfaces::IStore, bool RemoveSegment(std::string const& key); - void Apply(data_model::FDv2ChangeSet changeSet); + void Apply(data_model::ChangeSet changeSet); MemoryStore() = default; ~MemoryStore() override = default; diff --git a/libs/server-sdk/src/data_interfaces/item_change.hpp b/libs/server-sdk/src/data_interfaces/item_change.hpp new file mode 100644 index 000000000..1c2cc8292 --- /dev/null +++ b/libs/server-sdk/src/data_interfaces/item_change.hpp @@ -0,0 +1,22 @@ +#pragma once + +#include +#include +#include + +#include +#include +#include + +namespace launchdarkly::server_side::data_interfaces { + +struct ItemChange { + std::string key; + std::variant, + data_model::ItemDescriptor> + object; +}; + +using ChangeSetData = std::vector; + +} // namespace launchdarkly::server_side::data_interfaces diff --git a/libs/server-sdk/src/data_interfaces/source/fdv2_source_result.hpp b/libs/server-sdk/src/data_interfaces/source/fdv2_source_result.hpp index 069fe5a38..a2a7589e3 100644 --- a/libs/server-sdk/src/data_interfaces/source/fdv2_source_result.hpp +++ b/libs/server-sdk/src/data_interfaces/source/fdv2_source_result.hpp @@ -1,6 +1,8 @@ #pragma once -#include +#include "../item_change.hpp" + +#include #include #include @@ -11,8 +13,6 @@ namespace launchdarkly::server_side::data_interfaces { /** * Result returned by IFDv2Initializer::Run and IFDv2Synchronizer::Next. - * - * Mirrors Java's FDv2SourceResult. */ struct FDv2SourceResult { using ErrorInfo = common::data_sources::DataSourceStatusErrorInfo; @@ -21,7 +21,7 @@ struct FDv2SourceResult { * A changeset was successfully received and is ready to apply. */ struct ChangeSet { - data_model::FDv2ChangeSet change_set; + data_model::ChangeSet change_set; /** If true, the server signaled that the client should fall back to * FDv1. */ bool fdv1_fallback; @@ -61,8 +61,12 @@ struct FDv2SourceResult { */ struct Timeout {}; - using Value = std::variant; + using Value = std::variant; Value value; }; diff --git a/libs/server-sdk/src/data_systems/fdv2/fdv2_changeset_translator.cpp b/libs/server-sdk/src/data_systems/fdv2/fdv2_changeset_translator.cpp new file mode 100644 index 000000000..70a2bd2be --- /dev/null +++ b/libs/server-sdk/src/data_systems/fdv2/fdv2_changeset_translator.cpp @@ -0,0 +1,97 @@ +#include "fdv2_changeset_translator.hpp" + +#include +#include +#include +#include +#include + +#include +#include + +namespace launchdarkly::server_side::data_systems { + +using data_interfaces::ChangeSetData; +using data_interfaces::ItemChange; +using data_model::ChangeSet; +using data_model::ChangeSetType; +using data_model::FDv2ChangeSet; + +std::optional> FDv2ChangeSetTranslator::Translate( + FDv2ChangeSet const& change_set, + Logger const& logger) const { + if (change_set.type == ChangeSetType::kNone) { + return ChangeSet{ + change_set.type, change_set.selector, {}}; + } + + ChangeSetData changes; + changes.reserve(change_set.changes.size()); + + for (auto const& change : change_set.changes) { + if (change.change_type == data_model::FDv2Change::ChangeType::kDelete) { + if (change.kind == "flag") { + changes.push_back(ItemChange{ + change.key, data_model::ItemDescriptor{ + data_model::Tombstone{change.version}}}); + } else if (change.kind == "segment") { + changes.push_back(ItemChange{ + change.key, data_model::ItemDescriptor{ + data_model::Tombstone{change.version}}}); + } else { + LD_LOG(logger, LogLevel::kWarn) + << "FDv2: unknown kind '" << change.kind + << "' in delete-object, skipping"; + } + } else { + if (change.kind == "flag") { + auto result = boost::json::value_to< + tl::expected, JsonError>>( + change.object); + if (!result) { + LD_LOG(logger, LogLevel::kError) + << "FDv2: could not deserialize flag '" << change.key + << "'"; + return std::nullopt; + } + if (!result->has_value()) { + LD_LOG(logger, LogLevel::kWarn) + << "FDv2: flag '" << change.key + << "' object was null, skipping"; + continue; + } + changes.push_back(ItemChange{ + change.key, data_model::ItemDescriptor{ + std::move(**result)}}); + } else if (change.kind == "segment") { + auto result = boost::json::value_to, JsonError>>( + change.object); + if (!result) { + LD_LOG(logger, LogLevel::kError) + << "FDv2: could not deserialize segment '" << change.key + << "'"; + return std::nullopt; + } + if (!result->has_value()) { + LD_LOG(logger, LogLevel::kWarn) + << "FDv2: segment '" << change.key + << "' object was null, skipping"; + continue; + } + changes.push_back(ItemChange{ + change.key, data_model::ItemDescriptor{ + std::move(**result)}}); + } else { + LD_LOG(logger, LogLevel::kWarn) + << "FDv2: unknown kind '" << change.kind + << "' in put-object, skipping"; + } + } + } + + return ChangeSet{change_set.type, change_set.selector, + std::move(changes)}; +} + +} // namespace launchdarkly::server_side::data_systems diff --git a/libs/server-sdk/src/data_systems/fdv2/fdv2_changeset_translator.hpp b/libs/server-sdk/src/data_systems/fdv2/fdv2_changeset_translator.hpp new file mode 100644 index 000000000..5763a31eb --- /dev/null +++ b/libs/server-sdk/src/data_systems/fdv2/fdv2_changeset_translator.hpp @@ -0,0 +1,26 @@ +#pragma once + +#include "../../data_interfaces/item_change.hpp" + +#include +#include +#include + +#include + +namespace launchdarkly::server_side::data_systems { + +class FDv2ChangeSetTranslator { + public: + /** + * Translates a changeset into typed changes ready to apply to the store. + * + * Unknown kinds are warned and skipped. If any known kind fails to + * deserialize, the entire changeset is aborted and nullopt is returned. + */ + std::optional> + Translate(data_model::FDv2ChangeSet const& change_set, + Logger const& logger) const; +}; + +} // namespace launchdarkly::server_side::data_systems diff --git a/libs/server-sdk/src/data_systems/fdv2/fdv2_polling_impl.cpp b/libs/server-sdk/src/data_systems/fdv2/fdv2_polling_impl.cpp index df6fda9af..b09a85162 100644 --- a/libs/server-sdk/src/data_systems/fdv2/fdv2_polling_impl.cpp +++ b/libs/server-sdk/src/data_systems/fdv2/fdv2_polling_impl.cpp @@ -1,4 +1,5 @@ #include "fdv2_polling_impl.hpp" +#include "fdv2_changeset_translator.hpp" #include #include @@ -17,6 +18,8 @@ static char const* const kErrorMissingEvents = "FDv2 polling response missing 'events' array"; static char const* const kErrorIncompletePayload = "FDv2 polling response did not contain a complete payload"; +static char const* const kErrorTranslation = + "FDv2 polling response could not be translated"; using data_interfaces::FDv2SourceResult; using ErrorInfo = FDv2SourceResult::ErrorInfo; @@ -57,7 +60,9 @@ network::HttpRequest MakeFDv2PollRequest( static FDv2SourceResult ParseFDv2PollEvents( boost::json::array const& events, - FDv2ProtocolHandler* protocol_handler) { + FDv2ProtocolHandler* protocol_handler, + FDv2ChangeSetTranslator const& translator, + Logger const& logger) { for (auto const& event_val : events) { auto const* event_obj = event_val.if_object(); if (!event_obj) { @@ -79,9 +84,16 @@ static FDv2SourceResult ParseFDv2PollEvents( std::string_view{event_type_str->data(), event_type_str->size()}, *event_data_val); - if (auto* changeset = std::get_if(&result)) { + if (auto* change_set = + std::get_if(&result)) { + auto typed = translator.Translate(*change_set, logger); + if (!typed) { + return FDv2SourceResult{FDv2SourceResult::Interrupted{ + MakeError(ErrorKind::kInvalidData, 0, kErrorTranslation), + false}}; + } return FDv2SourceResult{ - FDv2SourceResult::ChangeSet{std::move(*changeset), false}}; + FDv2SourceResult::ChangeSet{std::move(*typed), false}}; } if (auto* goodbye = std::get_if(&result)) { return FDv2SourceResult{ @@ -110,7 +122,9 @@ static FDv2SourceResult ParseFDv2PollEvents( static FDv2SourceResult ParseFDv2PollResponse( std::string const& body, - FDv2ProtocolHandler* protocol_handler) { + FDv2ProtocolHandler* protocol_handler, + FDv2ChangeSetTranslator const& translator, + Logger const& logger) { boost::system::error_code ec; auto parsed = boost::json::parse(body, ec); if (ec) { @@ -136,7 +150,8 @@ static FDv2SourceResult ParseFDv2PollResponse( MakeError(ErrorKind::kInvalidData, 0, kErrorMissingEvents), false}}; } - return ParseFDv2PollEvents(*events_arr, protocol_handler); + return ParseFDv2PollEvents(*events_arr, protocol_handler, translator, + logger); } data_interfaces::FDv2SourceResult HandleFDv2PollResponse( @@ -155,8 +170,8 @@ data_interfaces::FDv2SourceResult HandleFDv2PollResponse( if (res.Status() == 304) { return FDv2SourceResult{FDv2SourceResult::ChangeSet{ - data_model::FDv2ChangeSet{ - data_model::FDv2ChangeSet::Type::kNone, {}, {}}, + data_model::ChangeSet{ + data_model::ChangeSetType::kNone, data_model::Selector{}, {}}, false}}; } @@ -169,7 +184,9 @@ data_interfaces::FDv2SourceResult HandleFDv2PollResponse( false}}; } - auto result = ParseFDv2PollResponse(*body, protocol_handler); + FDv2ChangeSetTranslator translator; + auto result = + ParseFDv2PollResponse(*body, protocol_handler, translator, logger); if (auto* interrupted = std::get_if(&result.value)) { if (interrupted->error.Kind() == ErrorKind::kErrorResponse) { diff --git a/libs/server-sdk/tests/fdv2_changeset_translator_test.cpp b/libs/server-sdk/tests/fdv2_changeset_translator_test.cpp new file mode 100644 index 000000000..a5111ce39 --- /dev/null +++ b/libs/server-sdk/tests/fdv2_changeset_translator_test.cpp @@ -0,0 +1,238 @@ +#include + +#include + +#include +#include +#include + +#include + +using namespace launchdarkly; +using namespace launchdarkly::data_model; +using namespace launchdarkly::server_side::data_interfaces; +using namespace launchdarkly::server_side::data_systems; + +// Minimal valid flag JSON accepted by the Flag deserializer. +static char const* const kFlagJson = + R"({"key":"my-flag","on":true,"fallthrough":{"variation":0},)" + R"("variations":[true,false],"version":1})"; + +// Minimal valid segment JSON accepted by the Segment deserializer. +static char const* const kSegmentJson = + R"({"key":"my-seg","version":2,"rules":[],"included":[],"excluded":[]})"; + +static Logger MakeNullLogger() { + struct NullBackend : ILogBackend { + bool Enabled(LogLevel) noexcept override { return false; } + void Write(LogLevel, std::string) noexcept override {} + }; + return Logger{std::make_shared()}; +} + +// ============================================================================ +// kNone changeset +// ============================================================================ + +TEST(FDv2ChangeSetTranslatorTest, NoneChangeSetProducesEmptyTypedChangeSet) { + FDv2ChangeSetTranslator translator; + auto logger = MakeNullLogger(); + + FDv2ChangeSet raw{ChangeSetType::kNone, {}, Selector{}}; + auto result = translator.Translate(raw, logger); + + ASSERT_TRUE(result.has_value()); + EXPECT_EQ(result->type, ChangeSetType::kNone); + EXPECT_TRUE(result->data.empty()); +} + +// ============================================================================ +// Known kinds — put +// ============================================================================ + +TEST(FDv2ChangeSetTranslatorTest, PutFlagProducesTypedFlag) { + FDv2ChangeSetTranslator translator; + auto logger = MakeNullLogger(); + + FDv2ChangeSet raw{ChangeSetType::kFull, + {FDv2Change{FDv2Change::ChangeType::kPut, "flag", + "my-flag", 1, boost::json::parse(kFlagJson)}}, + Selector{}}; + auto result = translator.Translate(raw, logger); + + ASSERT_TRUE(result.has_value()); + ASSERT_EQ(result->data.size(), 1u); + EXPECT_EQ(result->data[0].key, "my-flag"); + EXPECT_TRUE( + std::holds_alternative>(result->data[0].object)); +} + +TEST(FDv2ChangeSetTranslatorTest, PutSegmentProducesTypedSegment) { + FDv2ChangeSetTranslator translator; + auto logger = MakeNullLogger(); + + FDv2ChangeSet raw{ + ChangeSetType::kFull, + {FDv2Change{FDv2Change::ChangeType::kPut, "segment", "my-seg", 2, + boost::json::parse(kSegmentJson)}}, + Selector{}}; + auto result = translator.Translate(raw, logger); + + ASSERT_TRUE(result.has_value()); + ASSERT_EQ(result->data.size(), 1u); + EXPECT_EQ(result->data[0].key, "my-seg"); + EXPECT_TRUE(std::holds_alternative>( + result->data[0].object)); +} + +// ============================================================================ +// Known kinds — delete +// ============================================================================ + +TEST(FDv2ChangeSetTranslatorTest, DeleteFlagProducesFlagTombstone) { + FDv2ChangeSetTranslator translator; + auto logger = MakeNullLogger(); + + FDv2ChangeSet raw{ + ChangeSetType::kPartial, + {FDv2Change{FDv2Change::ChangeType::kDelete, "flag", "my-flag", 5, {}}}, + Selector{}}; + auto result = translator.Translate(raw, logger); + + ASSERT_TRUE(result.has_value()); + ASSERT_EQ(result->data.size(), 1u); + EXPECT_EQ(result->data[0].key, "my-flag"); + auto const* desc = + std::get_if>(&result->data[0].object); + ASSERT_NE(desc, nullptr); + EXPECT_EQ(desc->version, 5u); + EXPECT_FALSE(desc->item.has_value()); +} + +TEST(FDv2ChangeSetTranslatorTest, DeleteSegmentProducesSegmentTombstone) { + FDv2ChangeSetTranslator translator; + auto logger = MakeNullLogger(); + + FDv2ChangeSet raw{ + ChangeSetType::kPartial, + {FDv2Change{ + FDv2Change::ChangeType::kDelete, "segment", "my-seg", 3, {}}}, + Selector{}}; + auto result = translator.Translate(raw, logger); + + ASSERT_TRUE(result.has_value()); + ASSERT_EQ(result->data.size(), 1u); + auto const* desc = + std::get_if>(&result->data[0].object); + ASSERT_NE(desc, nullptr); + EXPECT_EQ(desc->version, 3u); + EXPECT_FALSE(desc->item.has_value()); +} + +// ============================================================================ +// Unknown kind — skipped +// ============================================================================ + +TEST(FDv2ChangeSetTranslatorTest, UnknownKindInPutIsSkipped) { + FDv2ChangeSetTranslator translator; + auto logger = MakeNullLogger(); + + FDv2ChangeSet raw{ + ChangeSetType::kFull, + {FDv2Change{FDv2Change::ChangeType::kPut, "experiment", "exp-1", 1, + boost::json::parse(R"({"key":"exp-1","version":1})")}, + FDv2Change{FDv2Change::ChangeType::kPut, "flag", "my-flag", 1, + boost::json::parse(kFlagJson)}}, + Selector{}}; + auto result = translator.Translate(raw, logger); + + ASSERT_TRUE(result.has_value()); + ASSERT_EQ(result->data.size(), 1u); + EXPECT_EQ(result->data[0].key, "my-flag"); +} + +TEST(FDv2ChangeSetTranslatorTest, UnknownKindInDeleteIsSkipped) { + FDv2ChangeSetTranslator translator; + auto logger = MakeNullLogger(); + + FDv2ChangeSet raw{ + ChangeSetType::kFull, + {FDv2Change{ + FDv2Change::ChangeType::kDelete, "experiment", "exp-1", 1, {}}, + FDv2Change{FDv2Change::ChangeType::kDelete, "flag", "my-flag", 2, {}}}, + Selector{}}; + auto result = translator.Translate(raw, logger); + + ASSERT_TRUE(result.has_value()); + ASSERT_EQ(result->data.size(), 1u); + EXPECT_EQ(result->data[0].key, "my-flag"); +} + +// ============================================================================ +// Null object on put — skipped +// ============================================================================ + +TEST(FDv2ChangeSetTranslatorTest, NullObjectInPutFlagIsSkipped) { + FDv2ChangeSetTranslator translator; + auto logger = MakeNullLogger(); + + FDv2ChangeSet raw{ChangeSetType::kFull, + {FDv2Change{FDv2Change::ChangeType::kPut, "flag", + "my-flag", 1, boost::json::value{nullptr}}}, + Selector{}}; + auto result = translator.Translate(raw, logger); + + ASSERT_TRUE(result.has_value()); + EXPECT_TRUE(result->data.empty()); +} + +// ============================================================================ +// Deserialization failure — abort +// ============================================================================ + +TEST(FDv2ChangeSetTranslatorTest, MalformedFlagAbortsTranslation) { + FDv2ChangeSetTranslator translator; + auto logger = MakeNullLogger(); + + FDv2ChangeSet raw{ChangeSetType::kFull, + {FDv2Change{FDv2Change::ChangeType::kPut, "flag", + "bad-flag", 1, boost::json::parse(R"({})")}}, + Selector{}}; + auto result = translator.Translate(raw, logger); + + EXPECT_FALSE(result.has_value()); +} + +TEST(FDv2ChangeSetTranslatorTest, MalformedSegmentAbortsTranslation) { + FDv2ChangeSetTranslator translator; + auto logger = MakeNullLogger(); + + // A non-empty object missing required fields triggers a schema failure + // (the deserializer treats an empty object as null/skip, not an error). + FDv2ChangeSet raw{ + ChangeSetType::kFull, + {FDv2Change{FDv2Change::ChangeType::kPut, "segment", "bad-seg", 1, + boost::json::parse(R"({"key":"bad-seg"})")}}, + Selector{}}; + auto result = translator.Translate(raw, logger); + + EXPECT_FALSE(result.has_value()); +} + +// ============================================================================ +// Selector is preserved +// ============================================================================ + +TEST(FDv2ChangeSetTranslatorTest, SelectorIsPreserved) { + FDv2ChangeSetTranslator translator; + auto logger = MakeNullLogger(); + + FDv2ChangeSet raw{ + ChangeSetType::kFull, {}, Selector{Selector::State{7, "state-abc"}}}; + auto result = translator.Translate(raw, logger); + + ASSERT_TRUE(result.has_value()); + ASSERT_TRUE(result->selector.value.has_value()); + EXPECT_EQ(result->selector.value->state, "state-abc"); + EXPECT_EQ(result->selector.value->version, 7); +} diff --git a/libs/server-sdk/tests/memory_store_apply_test.cpp b/libs/server-sdk/tests/memory_store_apply_test.cpp index 003285c53..00ecf62b4 100644 --- a/libs/server-sdk/tests/memory_store_apply_test.cpp +++ b/libs/server-sdk/tests/memory_store_apply_test.cpp @@ -1,10 +1,14 @@ #include #include +#include + +#include #include using namespace launchdarkly::data_model; using namespace launchdarkly::server_side::data_components; +using namespace launchdarkly::server_side::data_interfaces; // --------------------------------------------------------------------------- // kNone tests @@ -27,7 +31,7 @@ TEST(MemoryStoreApplyTest, ApplyNone_IsNoOp) { {"segA", SegmentDescriptor(seg_a)}}, }); - store.Apply(FDv2ChangeSet{FDv2ChangeSet::Type::kNone, {}, Selector{}}); + store.Apply(ChangeSet{ChangeSetType::kNone, Selector{}, {}}); auto fetched_flag = store.GetFlag("flagA"); ASSERT_TRUE(fetched_flag); @@ -39,7 +43,7 @@ TEST(MemoryStoreApplyTest, ApplyNone_IsNoOp) { TEST(MemoryStoreApplyTest, ApplyNone_DoesNotInitialize) { MemoryStore store; - store.Apply(FDv2ChangeSet{FDv2ChangeSet::Type::kNone, {}, Selector{}}); + store.Apply(ChangeSet{ChangeSetType::kNone, Selector{}, {}}); EXPECT_FALSE(store.Initialized()); } @@ -50,7 +54,7 @@ TEST(MemoryStoreApplyTest, ApplyNone_DoesNotInitialize) { TEST(MemoryStoreApplyTest, ApplyFull_SetsInitialized) { MemoryStore store; ASSERT_FALSE(store.Initialized()); - store.Apply(FDv2ChangeSet{FDv2ChangeSet::Type::kFull, {}, Selector{}}); + store.Apply(ChangeSet{ChangeSetType::kFull, Selector{}, {}}); EXPECT_TRUE(store.Initialized()); } @@ -64,11 +68,11 @@ TEST(MemoryStoreApplyTest, ApplyFull_StoresItems) { seg_a.version = 1; seg_a.key = "segA"; - store.Apply(FDv2ChangeSet{ - FDv2ChangeSet::Type::kFull, - std::vector{{"flagA", FlagDescriptor(flag_a)}, - {"segA", SegmentDescriptor(seg_a)}}, + store.Apply(ChangeSet{ + ChangeSetType::kFull, Selector{}, + ChangeSetData{ItemChange{"flagA", FlagDescriptor(flag_a)}, + ItemChange{"segA", SegmentDescriptor(seg_a)}}, }); auto fetched_flag = store.GetFlag("flagA"); @@ -114,11 +118,11 @@ TEST(MemoryStoreApplyTest, ApplyFull_ClearsExistingItems) { seg_b.version = 1; seg_b.key = "segB"; - store.Apply(FDv2ChangeSet{ - FDv2ChangeSet::Type::kFull, - std::vector{{"flagC", FlagDescriptor(flag_c)}, - {"segB", SegmentDescriptor(seg_b)}}, + store.Apply(ChangeSet{ + ChangeSetType::kFull, Selector{}, + ChangeSetData{ItemChange{"flagC", FlagDescriptor(flag_c)}, + ItemChange{"segB", SegmentDescriptor(seg_b)}}, }); EXPECT_FALSE(store.GetFlag("flagA")); @@ -157,11 +161,11 @@ TEST(MemoryStoreApplyTest, ApplyPartial_AppliesItems) { seg_a_new.version = 6; seg_a_new.key = "segA"; - store.Apply(FDv2ChangeSet{ - FDv2ChangeSet::Type::kPartial, - std::vector{{"flagA", FlagDescriptor(flag_a_new)}, - {"segA", SegmentDescriptor(seg_a_new)}}, + store.Apply(ChangeSet{ + ChangeSetType::kPartial, Selector{}, + ChangeSetData{ItemChange{"flagA", FlagDescriptor(flag_a_new)}, + ItemChange{"segA", SegmentDescriptor(seg_a_new)}}, }); ASSERT_TRUE(store.GetFlag("flagA")); @@ -205,11 +209,11 @@ TEST(MemoryStoreApplyTest, ApplyPartial_PreservesUnchangedItems) { seg_b_new.version = 2; seg_b_new.key = "segB"; - store.Apply(FDv2ChangeSet{ - FDv2ChangeSet::Type::kPartial, - std::vector{{"flagB", FlagDescriptor(flag_b_new)}, - {"segB", SegmentDescriptor(seg_b_new)}}, + store.Apply(ChangeSet{ + ChangeSetType::kPartial, Selector{}, + ChangeSetData{ItemChange{"flagB", FlagDescriptor(flag_b_new)}, + ItemChange{"segB", SegmentDescriptor(seg_b_new)}}, }); ASSERT_TRUE(store.GetFlag("flagA")); From c990e300709392a851c90ce4d3bf06f119ce0353 Mon Sep 17 00:00:00 2001 From: Bee Klimt Date: Tue, 21 Apr 2026 23:34:15 -0700 Subject: [PATCH 2/9] refactor: make the changeset translator a function, not a class --- libs/server-sdk/src/CMakeLists.txt | 4 +- ...tor.cpp => fdv2_changeset_translation.cpp} | 6 +- .../fdv2/fdv2_changeset_translation.hpp | 23 ++++++++ .../fdv2/fdv2_changeset_translator.hpp | 26 --------- .../data_systems/fdv2/fdv2_polling_impl.cpp | 13 ++--- ...pp => fdv2_changeset_translation_test.cpp} | 57 ++++++++----------- 6 files changed, 55 insertions(+), 74 deletions(-) rename libs/server-sdk/src/data_systems/fdv2/{fdv2_changeset_translator.cpp => fdv2_changeset_translation.cpp} (96%) create mode 100644 libs/server-sdk/src/data_systems/fdv2/fdv2_changeset_translation.hpp delete mode 100644 libs/server-sdk/src/data_systems/fdv2/fdv2_changeset_translator.hpp rename libs/server-sdk/tests/{fdv2_changeset_translator_test.cpp => fdv2_changeset_translation_test.cpp} (79%) diff --git a/libs/server-sdk/src/CMakeLists.txt b/libs/server-sdk/src/CMakeLists.txt index eb3841179..f8fac7fb3 100644 --- a/libs/server-sdk/src/CMakeLists.txt +++ b/libs/server-sdk/src/CMakeLists.txt @@ -49,8 +49,8 @@ target_sources(${LIBNAME} data_systems/background_sync/detail/payload_filter_validation/payload_filter_validation.cpp data_systems/background_sync/sources/polling/polling_data_source.hpp data_systems/background_sync/sources/polling/polling_data_source.cpp - data_systems/fdv2/fdv2_changeset_translator.hpp - data_systems/fdv2/fdv2_changeset_translator.cpp + data_systems/fdv2/fdv2_changeset_translation.hpp + data_systems/fdv2/fdv2_changeset_translation.cpp data_systems/fdv2/fdv2_polling_impl.hpp data_systems/fdv2/fdv2_polling_impl.cpp data_systems/fdv2/polling_initializer.hpp diff --git a/libs/server-sdk/src/data_systems/fdv2/fdv2_changeset_translator.cpp b/libs/server-sdk/src/data_systems/fdv2/fdv2_changeset_translation.cpp similarity index 96% rename from libs/server-sdk/src/data_systems/fdv2/fdv2_changeset_translator.cpp rename to libs/server-sdk/src/data_systems/fdv2/fdv2_changeset_translation.cpp index 70a2bd2be..8e138ced4 100644 --- a/libs/server-sdk/src/data_systems/fdv2/fdv2_changeset_translator.cpp +++ b/libs/server-sdk/src/data_systems/fdv2/fdv2_changeset_translation.cpp @@ -1,4 +1,4 @@ -#include "fdv2_changeset_translator.hpp" +#include "fdv2_changeset_translation.hpp" #include #include @@ -17,9 +17,9 @@ using data_model::ChangeSet; using data_model::ChangeSetType; using data_model::FDv2ChangeSet; -std::optional> FDv2ChangeSetTranslator::Translate( +std::optional> TranslateChangeSet( FDv2ChangeSet const& change_set, - Logger const& logger) const { + Logger const& logger) { if (change_set.type == ChangeSetType::kNone) { return ChangeSet{ change_set.type, change_set.selector, {}}; diff --git a/libs/server-sdk/src/data_systems/fdv2/fdv2_changeset_translation.hpp b/libs/server-sdk/src/data_systems/fdv2/fdv2_changeset_translation.hpp new file mode 100644 index 000000000..8e3aea9f1 --- /dev/null +++ b/libs/server-sdk/src/data_systems/fdv2/fdv2_changeset_translation.hpp @@ -0,0 +1,23 @@ +#pragma once + +#include "../../data_interfaces/item_change.hpp" + +#include +#include +#include + +#include + +namespace launchdarkly::server_side::data_systems { + +/** + * Translates an FDv2ChangeSet into typed changes ready to apply to the store. + * + * Unknown kinds are warned and skipped. If any known kind fails to + * deserialize, the entire changeset is aborted and nullopt is returned. + */ +std::optional> +TranslateChangeSet(data_model::FDv2ChangeSet const& change_set, + Logger const& logger); + +} // namespace launchdarkly::server_side::data_systems diff --git a/libs/server-sdk/src/data_systems/fdv2/fdv2_changeset_translator.hpp b/libs/server-sdk/src/data_systems/fdv2/fdv2_changeset_translator.hpp deleted file mode 100644 index 5763a31eb..000000000 --- a/libs/server-sdk/src/data_systems/fdv2/fdv2_changeset_translator.hpp +++ /dev/null @@ -1,26 +0,0 @@ -#pragma once - -#include "../../data_interfaces/item_change.hpp" - -#include -#include -#include - -#include - -namespace launchdarkly::server_side::data_systems { - -class FDv2ChangeSetTranslator { - public: - /** - * Translates a changeset into typed changes ready to apply to the store. - * - * Unknown kinds are warned and skipped. If any known kind fails to - * deserialize, the entire changeset is aborted and nullopt is returned. - */ - std::optional> - Translate(data_model::FDv2ChangeSet const& change_set, - Logger const& logger) const; -}; - -} // namespace launchdarkly::server_side::data_systems diff --git a/libs/server-sdk/src/data_systems/fdv2/fdv2_polling_impl.cpp b/libs/server-sdk/src/data_systems/fdv2/fdv2_polling_impl.cpp index b09a85162..4deb66d70 100644 --- a/libs/server-sdk/src/data_systems/fdv2/fdv2_polling_impl.cpp +++ b/libs/server-sdk/src/data_systems/fdv2/fdv2_polling_impl.cpp @@ -1,5 +1,5 @@ #include "fdv2_polling_impl.hpp" -#include "fdv2_changeset_translator.hpp" +#include "fdv2_changeset_translation.hpp" #include #include @@ -61,7 +61,6 @@ network::HttpRequest MakeFDv2PollRequest( static FDv2SourceResult ParseFDv2PollEvents( boost::json::array const& events, FDv2ProtocolHandler* protocol_handler, - FDv2ChangeSetTranslator const& translator, Logger const& logger) { for (auto const& event_val : events) { auto const* event_obj = event_val.if_object(); @@ -86,7 +85,7 @@ static FDv2SourceResult ParseFDv2PollEvents( if (auto* change_set = std::get_if(&result)) { - auto typed = translator.Translate(*change_set, logger); + auto typed = TranslateChangeSet(*change_set, logger); if (!typed) { return FDv2SourceResult{FDv2SourceResult::Interrupted{ MakeError(ErrorKind::kInvalidData, 0, kErrorTranslation), @@ -123,7 +122,6 @@ static FDv2SourceResult ParseFDv2PollEvents( static FDv2SourceResult ParseFDv2PollResponse( std::string const& body, FDv2ProtocolHandler* protocol_handler, - FDv2ChangeSetTranslator const& translator, Logger const& logger) { boost::system::error_code ec; auto parsed = boost::json::parse(body, ec); @@ -150,8 +148,7 @@ static FDv2SourceResult ParseFDv2PollResponse( MakeError(ErrorKind::kInvalidData, 0, kErrorMissingEvents), false}}; } - return ParseFDv2PollEvents(*events_arr, protocol_handler, translator, - logger); + return ParseFDv2PollEvents(*events_arr, protocol_handler, logger); } data_interfaces::FDv2SourceResult HandleFDv2PollResponse( @@ -184,9 +181,7 @@ data_interfaces::FDv2SourceResult HandleFDv2PollResponse( false}}; } - FDv2ChangeSetTranslator translator; - auto result = - ParseFDv2PollResponse(*body, protocol_handler, translator, logger); + auto result = ParseFDv2PollResponse(*body, protocol_handler, logger); if (auto* interrupted = std::get_if(&result.value)) { if (interrupted->error.Kind() == ErrorKind::kErrorResponse) { diff --git a/libs/server-sdk/tests/fdv2_changeset_translator_test.cpp b/libs/server-sdk/tests/fdv2_changeset_translation_test.cpp similarity index 79% rename from libs/server-sdk/tests/fdv2_changeset_translator_test.cpp rename to libs/server-sdk/tests/fdv2_changeset_translation_test.cpp index a5111ce39..f64bc359a 100644 --- a/libs/server-sdk/tests/fdv2_changeset_translator_test.cpp +++ b/libs/server-sdk/tests/fdv2_changeset_translation_test.cpp @@ -1,6 +1,6 @@ #include -#include +#include #include #include @@ -34,12 +34,11 @@ static Logger MakeNullLogger() { // kNone changeset // ============================================================================ -TEST(FDv2ChangeSetTranslatorTest, NoneChangeSetProducesEmptyTypedChangeSet) { - FDv2ChangeSetTranslator translator; +TEST(FDv2ChangeSetTranslationTest, NoneChangeSetProducesEmptyTypedChangeSet) { auto logger = MakeNullLogger(); FDv2ChangeSet raw{ChangeSetType::kNone, {}, Selector{}}; - auto result = translator.Translate(raw, logger); + auto result = TranslateChangeSet(raw, logger); ASSERT_TRUE(result.has_value()); EXPECT_EQ(result->type, ChangeSetType::kNone); @@ -50,15 +49,14 @@ TEST(FDv2ChangeSetTranslatorTest, NoneChangeSetProducesEmptyTypedChangeSet) { // Known kinds — put // ============================================================================ -TEST(FDv2ChangeSetTranslatorTest, PutFlagProducesTypedFlag) { - FDv2ChangeSetTranslator translator; +TEST(FDv2ChangeSetTranslationTest, PutFlagProducesTypedFlag) { auto logger = MakeNullLogger(); FDv2ChangeSet raw{ChangeSetType::kFull, {FDv2Change{FDv2Change::ChangeType::kPut, "flag", "my-flag", 1, boost::json::parse(kFlagJson)}}, Selector{}}; - auto result = translator.Translate(raw, logger); + auto result = TranslateChangeSet(raw, logger); ASSERT_TRUE(result.has_value()); ASSERT_EQ(result->data.size(), 1u); @@ -67,8 +65,7 @@ TEST(FDv2ChangeSetTranslatorTest, PutFlagProducesTypedFlag) { std::holds_alternative>(result->data[0].object)); } -TEST(FDv2ChangeSetTranslatorTest, PutSegmentProducesTypedSegment) { - FDv2ChangeSetTranslator translator; +TEST(FDv2ChangeSetTranslationTest, PutSegmentProducesTypedSegment) { auto logger = MakeNullLogger(); FDv2ChangeSet raw{ @@ -76,7 +73,7 @@ TEST(FDv2ChangeSetTranslatorTest, PutSegmentProducesTypedSegment) { {FDv2Change{FDv2Change::ChangeType::kPut, "segment", "my-seg", 2, boost::json::parse(kSegmentJson)}}, Selector{}}; - auto result = translator.Translate(raw, logger); + auto result = TranslateChangeSet(raw, logger); ASSERT_TRUE(result.has_value()); ASSERT_EQ(result->data.size(), 1u); @@ -89,15 +86,14 @@ TEST(FDv2ChangeSetTranslatorTest, PutSegmentProducesTypedSegment) { // Known kinds — delete // ============================================================================ -TEST(FDv2ChangeSetTranslatorTest, DeleteFlagProducesFlagTombstone) { - FDv2ChangeSetTranslator translator; +TEST(FDv2ChangeSetTranslationTest, DeleteFlagProducesFlagTombstone) { auto logger = MakeNullLogger(); FDv2ChangeSet raw{ ChangeSetType::kPartial, {FDv2Change{FDv2Change::ChangeType::kDelete, "flag", "my-flag", 5, {}}}, Selector{}}; - auto result = translator.Translate(raw, logger); + auto result = TranslateChangeSet(raw, logger); ASSERT_TRUE(result.has_value()); ASSERT_EQ(result->data.size(), 1u); @@ -109,8 +105,7 @@ TEST(FDv2ChangeSetTranslatorTest, DeleteFlagProducesFlagTombstone) { EXPECT_FALSE(desc->item.has_value()); } -TEST(FDv2ChangeSetTranslatorTest, DeleteSegmentProducesSegmentTombstone) { - FDv2ChangeSetTranslator translator; +TEST(FDv2ChangeSetTranslationTest, DeleteSegmentProducesSegmentTombstone) { auto logger = MakeNullLogger(); FDv2ChangeSet raw{ @@ -118,7 +113,7 @@ TEST(FDv2ChangeSetTranslatorTest, DeleteSegmentProducesSegmentTombstone) { {FDv2Change{ FDv2Change::ChangeType::kDelete, "segment", "my-seg", 3, {}}}, Selector{}}; - auto result = translator.Translate(raw, logger); + auto result = TranslateChangeSet(raw, logger); ASSERT_TRUE(result.has_value()); ASSERT_EQ(result->data.size(), 1u); @@ -133,8 +128,7 @@ TEST(FDv2ChangeSetTranslatorTest, DeleteSegmentProducesSegmentTombstone) { // Unknown kind — skipped // ============================================================================ -TEST(FDv2ChangeSetTranslatorTest, UnknownKindInPutIsSkipped) { - FDv2ChangeSetTranslator translator; +TEST(FDv2ChangeSetTranslationTest, UnknownKindInPutIsSkipped) { auto logger = MakeNullLogger(); FDv2ChangeSet raw{ @@ -144,15 +138,14 @@ TEST(FDv2ChangeSetTranslatorTest, UnknownKindInPutIsSkipped) { FDv2Change{FDv2Change::ChangeType::kPut, "flag", "my-flag", 1, boost::json::parse(kFlagJson)}}, Selector{}}; - auto result = translator.Translate(raw, logger); + auto result = TranslateChangeSet(raw, logger); ASSERT_TRUE(result.has_value()); ASSERT_EQ(result->data.size(), 1u); EXPECT_EQ(result->data[0].key, "my-flag"); } -TEST(FDv2ChangeSetTranslatorTest, UnknownKindInDeleteIsSkipped) { - FDv2ChangeSetTranslator translator; +TEST(FDv2ChangeSetTranslationTest, UnknownKindInDeleteIsSkipped) { auto logger = MakeNullLogger(); FDv2ChangeSet raw{ @@ -161,7 +154,7 @@ TEST(FDv2ChangeSetTranslatorTest, UnknownKindInDeleteIsSkipped) { FDv2Change::ChangeType::kDelete, "experiment", "exp-1", 1, {}}, FDv2Change{FDv2Change::ChangeType::kDelete, "flag", "my-flag", 2, {}}}, Selector{}}; - auto result = translator.Translate(raw, logger); + auto result = TranslateChangeSet(raw, logger); ASSERT_TRUE(result.has_value()); ASSERT_EQ(result->data.size(), 1u); @@ -172,15 +165,14 @@ TEST(FDv2ChangeSetTranslatorTest, UnknownKindInDeleteIsSkipped) { // Null object on put — skipped // ============================================================================ -TEST(FDv2ChangeSetTranslatorTest, NullObjectInPutFlagIsSkipped) { - FDv2ChangeSetTranslator translator; +TEST(FDv2ChangeSetTranslationTest, NullObjectInPutFlagIsSkipped) { auto logger = MakeNullLogger(); FDv2ChangeSet raw{ChangeSetType::kFull, {FDv2Change{FDv2Change::ChangeType::kPut, "flag", "my-flag", 1, boost::json::value{nullptr}}}, Selector{}}; - auto result = translator.Translate(raw, logger); + auto result = TranslateChangeSet(raw, logger); ASSERT_TRUE(result.has_value()); EXPECT_TRUE(result->data.empty()); @@ -190,21 +182,19 @@ TEST(FDv2ChangeSetTranslatorTest, NullObjectInPutFlagIsSkipped) { // Deserialization failure — abort // ============================================================================ -TEST(FDv2ChangeSetTranslatorTest, MalformedFlagAbortsTranslation) { - FDv2ChangeSetTranslator translator; +TEST(FDv2ChangeSetTranslationTest, MalformedFlagAbortsTranslation) { auto logger = MakeNullLogger(); FDv2ChangeSet raw{ChangeSetType::kFull, {FDv2Change{FDv2Change::ChangeType::kPut, "flag", "bad-flag", 1, boost::json::parse(R"({})")}}, Selector{}}; - auto result = translator.Translate(raw, logger); + auto result = TranslateChangeSet(raw, logger); EXPECT_FALSE(result.has_value()); } -TEST(FDv2ChangeSetTranslatorTest, MalformedSegmentAbortsTranslation) { - FDv2ChangeSetTranslator translator; +TEST(FDv2ChangeSetTranslationTest, MalformedSegmentAbortsTranslation) { auto logger = MakeNullLogger(); // A non-empty object missing required fields triggers a schema failure @@ -214,7 +204,7 @@ TEST(FDv2ChangeSetTranslatorTest, MalformedSegmentAbortsTranslation) { {FDv2Change{FDv2Change::ChangeType::kPut, "segment", "bad-seg", 1, boost::json::parse(R"({"key":"bad-seg"})")}}, Selector{}}; - auto result = translator.Translate(raw, logger); + auto result = TranslateChangeSet(raw, logger); EXPECT_FALSE(result.has_value()); } @@ -223,13 +213,12 @@ TEST(FDv2ChangeSetTranslatorTest, MalformedSegmentAbortsTranslation) { // Selector is preserved // ============================================================================ -TEST(FDv2ChangeSetTranslatorTest, SelectorIsPreserved) { - FDv2ChangeSetTranslator translator; +TEST(FDv2ChangeSetTranslationTest, SelectorIsPreserved) { auto logger = MakeNullLogger(); FDv2ChangeSet raw{ ChangeSetType::kFull, {}, Selector{Selector::State{7, "state-abc"}}}; - auto result = translator.Translate(raw, logger); + auto result = TranslateChangeSet(raw, logger); ASSERT_TRUE(result.has_value()); ASSERT_TRUE(result->selector.value.has_value()); From 09b2ef44982f9c6cef7e555bd61a28407bee7476 Mon Sep 17 00:00:00 2001 From: Bee Klimt Date: Wed, 22 Apr 2026 10:24:08 -0700 Subject: [PATCH 3/9] refactor: split up a long function into a few smaller ones --- .../fdv2/fdv2_changeset_translation.cpp | 110 +++++++++++------- 1 file changed, 66 insertions(+), 44 deletions(-) diff --git a/libs/server-sdk/src/data_systems/fdv2/fdv2_changeset_translation.cpp b/libs/server-sdk/src/data_systems/fdv2/fdv2_changeset_translation.cpp index 8e138ced4..639215a1b 100644 --- a/libs/server-sdk/src/data_systems/fdv2/fdv2_changeset_translation.cpp +++ b/libs/server-sdk/src/data_systems/fdv2/fdv2_changeset_translation.cpp @@ -17,6 +17,68 @@ using data_model::ChangeSet; using data_model::ChangeSetType; using data_model::FDv2ChangeSet; +static std::optional TranslateDelete( + data_model::FDv2Change const& change, + Logger const& logger) { + if (change.kind == "flag") { + return ItemChange{change.key, + data_model::ItemDescriptor{ + data_model::Tombstone{change.version}}}; + } + if (change.kind == "segment") { + return ItemChange{change.key, + data_model::ItemDescriptor{ + data_model::Tombstone{change.version}}}; + } + LD_LOG(logger, LogLevel::kWarn) << "FDv2: unknown kind '" << change.kind + << "' in delete-object, skipping"; + return std::nullopt; +} + +static bool TranslatePutFlag(data_model::FDv2Change const& change, + ChangeSetData* changes, + Logger const& logger) { + auto result = boost::json::value_to< + tl::expected, JsonError>>( + change.object); + if (!result) { + LD_LOG(logger, LogLevel::kError) + << "FDv2: could not deserialize flag '" << change.key << "'"; + return false; + } + if (!result->has_value()) { + LD_LOG(logger, LogLevel::kWarn) + << "FDv2: flag '" << change.key << "' object was null, skipping"; + return true; + } + changes->push_back(ItemChange{ + change.key, + data_model::ItemDescriptor{std::move(**result)}}); + return true; +} + +static bool TranslatePutSegment(data_model::FDv2Change const& change, + ChangeSetData* changes, + Logger const& logger) { + auto result = boost::json::value_to< + tl::expected, JsonError>>( + change.object); + if (!result) { + LD_LOG(logger, LogLevel::kError) + << "FDv2: could not deserialize segment '" << change.key << "'"; + return false; + } + if (!result->has_value()) { + LD_LOG(logger, LogLevel::kWarn) + << "FDv2: segment '" << change.key << "' object was null, skipping"; + return true; + } + changes->push_back(ItemChange{ + change.key, + data_model::ItemDescriptor{std::move(**result)}}); + return true; +} + std::optional> TranslateChangeSet( FDv2ChangeSet const& change_set, Logger const& logger) { @@ -30,58 +92,18 @@ std::optional> TranslateChangeSet( for (auto const& change : change_set.changes) { if (change.change_type == data_model::FDv2Change::ChangeType::kDelete) { - if (change.kind == "flag") { - changes.push_back(ItemChange{ - change.key, data_model::ItemDescriptor{ - data_model::Tombstone{change.version}}}); - } else if (change.kind == "segment") { - changes.push_back(ItemChange{ - change.key, data_model::ItemDescriptor{ - data_model::Tombstone{change.version}}}); - } else { - LD_LOG(logger, LogLevel::kWarn) - << "FDv2: unknown kind '" << change.kind - << "' in delete-object, skipping"; + if (auto item = TranslateDelete(change, logger)) { + changes.push_back(std::move(*item)); } } else { if (change.kind == "flag") { - auto result = boost::json::value_to< - tl::expected, JsonError>>( - change.object); - if (!result) { - LD_LOG(logger, LogLevel::kError) - << "FDv2: could not deserialize flag '" << change.key - << "'"; + if (!TranslatePutFlag(change, &changes, logger)) { return std::nullopt; } - if (!result->has_value()) { - LD_LOG(logger, LogLevel::kWarn) - << "FDv2: flag '" << change.key - << "' object was null, skipping"; - continue; - } - changes.push_back(ItemChange{ - change.key, data_model::ItemDescriptor{ - std::move(**result)}}); } else if (change.kind == "segment") { - auto result = boost::json::value_to, JsonError>>( - change.object); - if (!result) { - LD_LOG(logger, LogLevel::kError) - << "FDv2: could not deserialize segment '" << change.key - << "'"; + if (!TranslatePutSegment(change, &changes, logger)) { return std::nullopt; } - if (!result->has_value()) { - LD_LOG(logger, LogLevel::kWarn) - << "FDv2: segment '" << change.key - << "' object was null, skipping"; - continue; - } - changes.push_back(ItemChange{ - change.key, data_model::ItemDescriptor{ - std::move(**result)}}); } else { LD_LOG(logger, LogLevel::kWarn) << "FDv2: unknown kind '" << change.kind From 309eb037ead8c5ec523a533afd9f3d50ee1cc241 Mon Sep 17 00:00:00 2001 From: Bee Klimt Date: Wed, 22 Apr 2026 14:18:12 -0700 Subject: [PATCH 4/9] refactor: restore original parameter order --- .../include/launchdarkly/data_model/change_set.hpp | 2 +- .../fdv2/fdv2_changeset_translation.cpp | 6 +++--- .../src/data_systems/fdv2/fdv2_polling_impl.cpp | 2 +- libs/server-sdk/tests/memory_store_apply_test.cpp | 14 +++++++------- 4 files changed, 12 insertions(+), 12 deletions(-) diff --git a/libs/internal/include/launchdarkly/data_model/change_set.hpp b/libs/internal/include/launchdarkly/data_model/change_set.hpp index d720e39bc..972c24f4e 100644 --- a/libs/internal/include/launchdarkly/data_model/change_set.hpp +++ b/libs/internal/include/launchdarkly/data_model/change_set.hpp @@ -13,8 +13,8 @@ enum class ChangeSetType { template struct ChangeSet { ChangeSetType type; - Selector selector; T data; + Selector selector; }; } // namespace launchdarkly::data_model diff --git a/libs/server-sdk/src/data_systems/fdv2/fdv2_changeset_translation.cpp b/libs/server-sdk/src/data_systems/fdv2/fdv2_changeset_translation.cpp index 639215a1b..6e7b700b2 100644 --- a/libs/server-sdk/src/data_systems/fdv2/fdv2_changeset_translation.cpp +++ b/libs/server-sdk/src/data_systems/fdv2/fdv2_changeset_translation.cpp @@ -84,7 +84,7 @@ std::optional> TranslateChangeSet( Logger const& logger) { if (change_set.type == ChangeSetType::kNone) { return ChangeSet{ - change_set.type, change_set.selector, {}}; + change_set.type, {}, change_set.selector}; } ChangeSetData changes; @@ -112,8 +112,8 @@ std::optional> TranslateChangeSet( } } - return ChangeSet{change_set.type, change_set.selector, - std::move(changes)}; + return ChangeSet{change_set.type, std::move(changes), + change_set.selector}; } } // namespace launchdarkly::server_side::data_systems diff --git a/libs/server-sdk/src/data_systems/fdv2/fdv2_polling_impl.cpp b/libs/server-sdk/src/data_systems/fdv2/fdv2_polling_impl.cpp index 4deb66d70..54b9dc867 100644 --- a/libs/server-sdk/src/data_systems/fdv2/fdv2_polling_impl.cpp +++ b/libs/server-sdk/src/data_systems/fdv2/fdv2_polling_impl.cpp @@ -168,7 +168,7 @@ data_interfaces::FDv2SourceResult HandleFDv2PollResponse( if (res.Status() == 304) { return FDv2SourceResult{FDv2SourceResult::ChangeSet{ data_model::ChangeSet{ - data_model::ChangeSetType::kNone, data_model::Selector{}, {}}, + data_model::ChangeSetType::kNone, {}, data_model::Selector{}}, false}}; } diff --git a/libs/server-sdk/tests/memory_store_apply_test.cpp b/libs/server-sdk/tests/memory_store_apply_test.cpp index 00ecf62b4..e853f9808 100644 --- a/libs/server-sdk/tests/memory_store_apply_test.cpp +++ b/libs/server-sdk/tests/memory_store_apply_test.cpp @@ -31,7 +31,7 @@ TEST(MemoryStoreApplyTest, ApplyNone_IsNoOp) { {"segA", SegmentDescriptor(seg_a)}}, }); - store.Apply(ChangeSet{ChangeSetType::kNone, Selector{}, {}}); + store.Apply(ChangeSet{ChangeSetType::kNone, {}, Selector{}}); auto fetched_flag = store.GetFlag("flagA"); ASSERT_TRUE(fetched_flag); @@ -43,7 +43,7 @@ TEST(MemoryStoreApplyTest, ApplyNone_IsNoOp) { TEST(MemoryStoreApplyTest, ApplyNone_DoesNotInitialize) { MemoryStore store; - store.Apply(ChangeSet{ChangeSetType::kNone, Selector{}, {}}); + store.Apply(ChangeSet{ChangeSetType::kNone, {}, Selector{}}); EXPECT_FALSE(store.Initialized()); } @@ -54,7 +54,7 @@ TEST(MemoryStoreApplyTest, ApplyNone_DoesNotInitialize) { TEST(MemoryStoreApplyTest, ApplyFull_SetsInitialized) { MemoryStore store; ASSERT_FALSE(store.Initialized()); - store.Apply(ChangeSet{ChangeSetType::kFull, Selector{}, {}}); + store.Apply(ChangeSet{ChangeSetType::kFull, {}, Selector{}}); EXPECT_TRUE(store.Initialized()); } @@ -70,9 +70,9 @@ TEST(MemoryStoreApplyTest, ApplyFull_StoresItems) { store.Apply(ChangeSet{ ChangeSetType::kFull, - Selector{}, ChangeSetData{ItemChange{"flagA", FlagDescriptor(flag_a)}, ItemChange{"segA", SegmentDescriptor(seg_a)}}, + Selector{}, }); auto fetched_flag = store.GetFlag("flagA"); @@ -120,9 +120,9 @@ TEST(MemoryStoreApplyTest, ApplyFull_ClearsExistingItems) { store.Apply(ChangeSet{ ChangeSetType::kFull, - Selector{}, ChangeSetData{ItemChange{"flagC", FlagDescriptor(flag_c)}, ItemChange{"segB", SegmentDescriptor(seg_b)}}, + Selector{}, }); EXPECT_FALSE(store.GetFlag("flagA")); @@ -163,9 +163,9 @@ TEST(MemoryStoreApplyTest, ApplyPartial_AppliesItems) { store.Apply(ChangeSet{ ChangeSetType::kPartial, - Selector{}, ChangeSetData{ItemChange{"flagA", FlagDescriptor(flag_a_new)}, ItemChange{"segA", SegmentDescriptor(seg_a_new)}}, + Selector{}, }); ASSERT_TRUE(store.GetFlag("flagA")); @@ -211,9 +211,9 @@ TEST(MemoryStoreApplyTest, ApplyPartial_PreservesUnchangedItems) { store.Apply(ChangeSet{ ChangeSetType::kPartial, - Selector{}, ChangeSetData{ItemChange{"flagB", FlagDescriptor(flag_b_new)}, ItemChange{"segB", SegmentDescriptor(seg_b_new)}}, + Selector{}, }); ASSERT_TRUE(store.GetFlag("flagA")); From 5b93349c52c1546d6599cc06c6435209c8bbd176 Mon Sep 17 00:00:00 2001 From: Bee Klimt Date: Wed, 22 Apr 2026 16:47:56 -0700 Subject: [PATCH 5/9] fix: fix the build on linux --- libs/server-sdk/tests/fdv2_changeset_translation_test.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/libs/server-sdk/tests/fdv2_changeset_translation_test.cpp b/libs/server-sdk/tests/fdv2_changeset_translation_test.cpp index f64bc359a..bc8a086f2 100644 --- a/libs/server-sdk/tests/fdv2_changeset_translation_test.cpp +++ b/libs/server-sdk/tests/fdv2_changeset_translation_test.cpp @@ -170,7 +170,7 @@ TEST(FDv2ChangeSetTranslationTest, NullObjectInPutFlagIsSkipped) { FDv2ChangeSet raw{ChangeSetType::kFull, {FDv2Change{FDv2Change::ChangeType::kPut, "flag", - "my-flag", 1, boost::json::value{nullptr}}}, + "my-flag", 1, boost::json::value(nullptr)}}, Selector{}}; auto result = TranslateChangeSet(raw, logger); From 79fc3c8fcbe6cd304b6275a9ac5c5fe4c376da63 Mon Sep 17 00:00:00 2001 From: Bee Klimt Date: Tue, 28 Apr 2026 13:46:32 -0700 Subject: [PATCH 6/9] refactor: implement server FDv2 streaming synchronizer --- libs/server-sdk/src/CMakeLists.txt | 2 + .../fdv2/streaming_synchronizer.cpp | 399 ++++++++++++ .../fdv2/streaming_synchronizer.hpp | 176 ++++++ .../fdv2_streaming_synchronizer_test.cpp | 574 ++++++++++++++++++ 4 files changed, 1151 insertions(+) create mode 100644 libs/server-sdk/src/data_systems/fdv2/streaming_synchronizer.cpp create mode 100644 libs/server-sdk/src/data_systems/fdv2/streaming_synchronizer.hpp create mode 100644 libs/server-sdk/tests/fdv2_streaming_synchronizer_test.cpp diff --git a/libs/server-sdk/src/CMakeLists.txt b/libs/server-sdk/src/CMakeLists.txt index f8fac7fb3..7f27984a4 100644 --- a/libs/server-sdk/src/CMakeLists.txt +++ b/libs/server-sdk/src/CMakeLists.txt @@ -57,6 +57,8 @@ target_sources(${LIBNAME} data_systems/fdv2/polling_initializer.cpp data_systems/fdv2/polling_synchronizer.hpp data_systems/fdv2/polling_synchronizer.cpp + data_systems/fdv2/streaming_synchronizer.hpp + data_systems/fdv2/streaming_synchronizer.cpp data_systems/background_sync/sources/streaming/streaming_data_source.hpp data_systems/background_sync/sources/streaming/streaming_data_source.cpp data_systems/background_sync/sources/streaming/event_handler.hpp diff --git a/libs/server-sdk/src/data_systems/fdv2/streaming_synchronizer.cpp b/libs/server-sdk/src/data_systems/fdv2/streaming_synchronizer.cpp new file mode 100644 index 000000000..0e7004aab --- /dev/null +++ b/libs/server-sdk/src/data_systems/fdv2/streaming_synchronizer.cpp @@ -0,0 +1,399 @@ +#include "streaming_synchronizer.hpp" +#include "fdv2_changeset_translation.hpp" + +#include + +#include +#include +#include + +namespace launchdarkly::server_side::data_systems { + +static char const* const kIdentity = "FDv2 streaming synchronizer"; + +// Maximum time between bytes read from the stream before the SSE client +// declares the connection dead and reconnects. Must be greater than the +// streaming service's heartbeat interval. Hardcoded rather than read from +// HttpProperties, whose default ReadTimeout is sized for one-shot HTTP +// requests and would cause spurious disconnects on a long-lived stream. +static constexpr std::chrono::minutes kDeadConnectionInterval{5}; + +using data_interfaces::FDv2SourceResult; +using ErrorInfo = FDv2SourceResult::ErrorInfo; +using ErrorKind = ErrorInfo::ErrorKind; + +static ErrorInfo MakeError(ErrorKind kind, + ErrorInfo::StatusCodeType status, + std::string message) { + return ErrorInfo{kind, status, std::move(message), + std::chrono::system_clock::now()}; +} + +template +inline constexpr bool always_false_v = false; + +FDv2StreamingSynchronizer::State::State( + Logger logger, + boost::asio::any_io_executor const& executor, + config::built::ServiceEndpoints const& endpoints, + config::built::HttpProperties const& http_properties, + std::optional filter_key, + std::chrono::milliseconds initial_reconnect_delay) + : logger_(std::move(logger)), + endpoints_(endpoints), + http_properties_(http_properties), + filter_key_(std::move(filter_key)), + initial_reconnect_delay_(initial_reconnect_delay), + executor_(executor) {} + +void FDv2StreamingSynchronizer::State::EnsureStarted( + data_model::Selector const& selector, + std::shared_ptr self) { + { + std::lock_guard lock(mutex_); + latest_selector_ = selector; + if (closed_ || started_) { + return; + } + started_ = true; + } + + auto parsed = boost::urls::parse_uri(endpoints_.StreamingBaseUrl()); + if (!parsed) { + // started_ intentionally left true: a bad endpoint URL is a + // configuration error that won't fix itself. The TerminalError + // result tells the orchestrator to stop retrying this synchronizer. + LD_LOG(logger_, LogLevel::kError) + << kIdentity << ": could not parse streaming endpoint URL"; + Notify(FDv2SourceResult{FDv2SourceResult::TerminalError{ + MakeError(ErrorKind::kNetworkError, 0, + "could not parse streaming endpoint URL"), + false}}); + return; + } + + boost::urls::url u = parsed.value(); + + // Safer way of appending /sdk/stream than string concatenation: avoids + // double slashes if the base URL has a trailing slash. TODO: add a test. + u.segments().push_back("sdk"); + u.segments().push_back("stream"); + + // basis and filter are not added here — they are appended per-connect by + // the on_connect hook (OnConnect), so that each (re)connection uses the + // freshest selector. + { + std::lock_guard lock(mutex_); + base_url_ = u; + } + + auto client_builder = sse::Builder(executor_, std::string(u.buffer())); + + client_builder.method(boost::beast::http::verb::get); + client_builder.read_timeout(kDeadConnectionInterval); + client_builder.write_timeout(http_properties_.WriteTimeout()); + client_builder.connect_timeout(http_properties_.ConnectTimeout()); + client_builder.initial_reconnect_delay(initial_reconnect_delay_); + + for (auto const& [key, value] : http_properties_.BaseHeaders()) { + client_builder.header(key, value); + } + if (http_properties_.Tls().PeerVerifyMode() == + launchdarkly::config::shared::built::TlsOptions::VerifyMode:: + kVerifyNone) { + client_builder.skip_verify_peer(true); + } + if (auto ca_file = http_properties_.Tls().CustomCAFile()) { + client_builder.custom_ca_file(*ca_file); + } + if (auto proxy_url = http_properties_.Proxy().Url()) { + client_builder.proxy(*proxy_url); + } + + std::weak_ptr weak = self; + client_builder.on_connect([weak](HttpRequest* req) { + if (auto s = weak.lock()) { + s->OnConnect(req); + } + }); + client_builder.receiver([weak](sse::Event const& event) { + if (auto s = weak.lock()) { + s->OnEvent(event); + } + }); + client_builder.logger([weak](std::string msg) { + if (auto s = weak.lock()) { + LD_LOG(s->logger_, LogLevel::kDebug) << msg; + } + }); + client_builder.errors([weak](sse::Error error) { + if (auto s = weak.lock()) { + s->OnError(error); + } + }); + + auto client = client_builder.build(); + if (!client) { + // started_ intentionally left true: same reasoning as above. + LD_LOG(logger_, LogLevel::kError) + << kIdentity << ": could not build SSE client"; + Notify(FDv2SourceResult{FDv2SourceResult::TerminalError{ + MakeError(ErrorKind::kNetworkError, 0, + "could not build SSE client"), + false}}); + return; + } + + // Publish-and-connect atomically with respect to Shutdown(). If Shutdown + // ran while we were building, drop the client on the floor — async_connect + // was never called, so there is nothing to clean up. + std::lock_guard lock(mutex_); + if (closed_) { + return; + } + sse_client_ = client; + client->async_connect(); +} + +void FDv2StreamingSynchronizer::State::OnConnect(HttpRequest* req) { + std::lock_guard lock(mutex_); + if (!base_url_) { + return; + } + boost::urls::url u = *base_url_; + if (latest_selector_.value) { + u.params().set("basis", latest_selector_.value->state); + } + if (filter_key_) { + u.params().set("filter", *filter_key_); + } + req->target(u.encoded_target()); +} + +void FDv2StreamingSynchronizer::State::OnEvent(sse::Event const& event) { + boost::system::error_code ec; + auto data = boost::json::parse(event.data(), ec); + if (ec) { + protocol_handler_.Reset(); + std::string msg = "could not parse FDv2 streaming event payload"; + LD_LOG(logger_, LogLevel::kError) << kIdentity << ": " << msg; + Notify(FDv2SourceResult{FDv2SourceResult::Interrupted{ + MakeError(ErrorKind::kInvalidData, 0, std::move(msg)), false}}); + return; + } + + auto result = protocol_handler_.HandleEvent(event.type(), data); + + std::visit( + [this](auto const& r) { + using T = std::decay_t; + if constexpr (std::is_same_v) { + // Accumulating, heartbeat, or unknown event — nothing to do. + } else if constexpr (std::is_same_v) { + auto typed = TranslateChangeSet(r, logger_); + if (!typed) { + std::string msg = + "FDv2 streaming changeset could not be translated"; + LD_LOG(logger_, LogLevel::kError) + << kIdentity << ": " << msg; + Notify(FDv2SourceResult{FDv2SourceResult::Interrupted{ + MakeError(ErrorKind::kInvalidData, 0, std::move(msg)), + false}}); + return; + } + Notify(FDv2SourceResult{ + FDv2SourceResult::ChangeSet{std::move(*typed), false}}); + } else if constexpr (std::is_same_v) { + LD_LOG(logger_, LogLevel::kInfo) + << kIdentity + << ": Goodbye was received from the LaunchDarkly " + "connection with reason: '" + << r.reason.value_or("") << "'."; + Notify(FDv2SourceResult{ + FDv2SourceResult::Goodbye{r.reason, false}}); + } else if constexpr (std::is_same_v) { + if (r.kind == FDv2ProtocolHandler::Error::Kind::kServerError) { + auto const& id = r.server_error.value().id; + std::string msg = + "An issue was encountered receiving updates for " + "payload '" + + id.value_or("") + "' with reason: '" + r.message + + "'. Automatic retry will occur."; + LD_LOG(logger_, LogLevel::kInfo) + << kIdentity << ": " << msg; + Notify(FDv2SourceResult{FDv2SourceResult::Interrupted{ + MakeError(ErrorKind::kErrorResponse, 0, std::move(msg)), + false}}); + return; + } + LD_LOG(logger_, LogLevel::kError) + << kIdentity << ": " << r.message; + Notify(FDv2SourceResult{FDv2SourceResult::Interrupted{ + MakeError(ErrorKind::kInvalidData, 0, r.message), false}}); + } else { + static_assert(always_false_v, "non-exhaustive visitor"); + } + }, + result); +} + +void FDv2StreamingSynchronizer::State::OnError(sse::Error const& error) { + protocol_handler_.Reset(); + + std::string msg = sse::ErrorToString(error); + + if (sse::IsRecoverable(error)) { + LD_LOG(logger_, LogLevel::kWarn) << kIdentity << ": " << msg; + Notify(FDv2SourceResult{FDv2SourceResult::Interrupted{ + MakeError(ErrorKind::kNetworkError, 0, std::move(msg)), false}}); + return; + } + + LD_LOG(logger_, LogLevel::kError) << kIdentity << ": " << msg; + + std::visit( + [this, &msg](auto const& e) { + using T = std::decay_t; + if constexpr (std::is_same_v< + T, sse::errors::UnrecoverableClientError>) { + Notify(FDv2SourceResult{FDv2SourceResult::TerminalError{ + MakeError(ErrorKind::kErrorResponse, + static_cast(e.status), + std::move(msg)), + false}}); + } else if constexpr (std::is_same_v< + T, sse::errors::InvalidRedirectLocation> || + std::is_same_v || + std::is_same_v) { + Notify(FDv2SourceResult{FDv2SourceResult::TerminalError{ + MakeError(ErrorKind::kNetworkError, 0, std::move(msg)), + false}}); + } else { + static_assert(always_false_v, "non-exhaustive visitor"); + } + }, + error); +} + +void FDv2StreamingSynchronizer::State::Notify(FDv2SourceResult result) { + std::optional> promise; + { + std::lock_guard lock(mutex_); + if (pending_promise_) { + promise = std::move(pending_promise_); + pending_promise_.reset(); + } else { + result_queue_.push_back(std::move(result)); + return; + } + } + // Resolve outside the lock — Promise::Resolve may invoke inline + // continuations that could call back into Notify or Next. + promise->Resolve(std::move(result)); +} + +async::Future FDv2StreamingSynchronizer::State::Next( + data_model::Selector const& selector, + std::shared_ptr self) { + EnsureStarted(selector, std::move(self)); + + std::lock_guard lock(mutex_); + if (!result_queue_.empty()) { + auto result = std::move(result_queue_.front()); + result_queue_.pop_front(); + return async::MakeFuture(std::move(result)); + } + return pending_promise_.emplace().GetFuture(); +} + +void FDv2StreamingSynchronizer::State::ClearPendingPromise() { + std::lock_guard lock(mutex_); + pending_promise_.reset(); +} + +void FDv2StreamingSynchronizer::State::Shutdown() { + std::shared_ptr client; + { + std::lock_guard lock(mutex_); + closed_ = true; + client = std::exchange(sse_client_, nullptr); + } + if (client) { + client->async_shutdown([] {}); + } +} + +async::Future FDv2StreamingSynchronizer::State::Delay( + std::chrono::milliseconds duration, + async::CancellationToken token) { + return async::Delay(executor_, duration, std::move(token)); +} + +FDv2StreamingSynchronizer::FDv2StreamingSynchronizer( + boost::asio::any_io_executor const& executor, + Logger const& logger, + config::built::ServiceEndpoints const& endpoints, + config::built::HttpProperties const& http_properties, + std::optional filter_key, + std::chrono::milliseconds initial_reconnect_delay) + : state_(std::make_shared(logger, + executor, + endpoints, + http_properties, + std::move(filter_key), + initial_reconnect_delay)) {} + +FDv2StreamingSynchronizer::~FDv2StreamingSynchronizer() { + Close(); +} + +async::Future FDv2StreamingSynchronizer::Next( + std::chrono::milliseconds timeout, + data_model::Selector selector) { + auto closed = close_promise_.GetFuture(); + if (closed.IsFinished()) { + return async::MakeFuture( + FDv2SourceResult{FDv2SourceResult::Shutdown{}}); + } + + auto result_future = state_->Next(selector, state_); + if (result_future.IsFinished()) { + return result_future; + } + + async::CancellationSource cancel; + auto timeout_future = state_->Delay(timeout, cancel.GetToken()); + + return async::WhenAny(closed, timeout_future, result_future) + .Then( + [state = state_, cancel = std::move(cancel), result_future]( + std::size_t const& idx) mutable -> FDv2SourceResult { + cancel.Cancel(); + if (idx == 0) { + state->ClearPendingPromise(); + return FDv2SourceResult{FDv2SourceResult::Shutdown{}}; + } + if (idx == 1) { + state->ClearPendingPromise(); + return FDv2SourceResult{FDv2SourceResult::Timeout{}}; + } + return *result_future.GetResult(); + }, + async::kInlineExecutor); +} + +void FDv2StreamingSynchronizer::Close() { + if (!close_promise_.Resolve(std::monostate{})) { + return; + } + state_->Shutdown(); +} + +std::string const& FDv2StreamingSynchronizer::Identity() const { + static std::string const identity = kIdentity; + return identity; +} + +} // namespace launchdarkly::server_side::data_systems diff --git a/libs/server-sdk/src/data_systems/fdv2/streaming_synchronizer.hpp b/libs/server-sdk/src/data_systems/fdv2/streaming_synchronizer.hpp new file mode 100644 index 000000000..e5686f9bb --- /dev/null +++ b/libs/server-sdk/src/data_systems/fdv2/streaming_synchronizer.hpp @@ -0,0 +1,176 @@ +#pragma once + +#include "../../data_interfaces/source/ifdv2_synchronizer.hpp" + +#include +#include +#include +#include +#include +#include + +#include +#include + +#include +#include +#include +#include +#include +#include + +namespace launchdarkly::server_side::data_systems { + +class FDv2StreamingSynchronizerTestPeer; + +/** + * FDv2 streaming synchronizer. Opens an SSE connection to the FDv2 streaming + * endpoint and translates the push-based event stream into the pull-based + * IFDv2Synchronizer::Next() interface. + * + * Threading model: + * Next() should only be called once at a time. + * Close() may be called concurrently with Next(). + * This object may be safely destroyed once no call to Next() or Close() + * is in progress. + */ +class FDv2StreamingSynchronizer final + : public data_interfaces::IFDv2Synchronizer { + friend class FDv2StreamingSynchronizerTestPeer; + + public: + /** + * Constructs a synchronizer that streams from the FDv2 streaming endpoint. + * If filter_key is present, only the specified payload filter is requested. + */ + FDv2StreamingSynchronizer( + boost::asio::any_io_executor const& executor, + Logger const& logger, + config::built::ServiceEndpoints const& endpoints, + config::built::HttpProperties const& http_properties, + std::optional filter_key, + std::chrono::milliseconds initial_reconnect_delay); + + ~FDv2StreamingSynchronizer() override; + + async::Future Next( + std::chrono::milliseconds timeout, + data_model::Selector selector) override; + + void Close() override; + + [[nodiscard]] std::string const& Identity() const override; + + private: + // Any state that may be accessed by async SSE callbacks needs to be inside + // this class, managed by a shared_ptr. All mutable members are guarded by + // the mutex. + class State { + friend class FDv2StreamingSynchronizerTestPeer; + + public: + State(Logger logger, + boost::asio::any_io_executor const& executor, + config::built::ServiceEndpoints const& endpoints, + config::built::HttpProperties const& http_properties, + std::optional filter_key, + std::chrono::milliseconds initial_reconnect_delay); + + /** + * Updates the stored selector, starts the SSE client if not already + * running, and returns a Future resolving with the next result. + * + * If a buffered result is already available, the Future is resolved + * immediately. Otherwise it resolves when the next event arrives. + * + * The self parameter must be the shared_ptr that owns this State, + * used to form weak_ptr captures in SSE callbacks. + */ + async::Future Next( + data_model::Selector const& selector, + std::shared_ptr self); + + /** + * Cancels an outstanding Next() call without delivering a result. + * Any result that subsequently arrives will be buffered for the next + * Next() call. + */ + void ClearPendingPromise(); + + /** + * Marks the State as closed and shuts down the SSE client if one has + * been built. After Shutdown returns, EnsureStarted is guaranteed not + * to start a new client. Idempotent. + */ + void Shutdown(); + + /** + * Returns a Future that resolves after the given duration. Resolves + * early with false if the token is cancelled before the duration + * elapses. + */ + async::Future Delay(std::chrono::milliseconds duration, + async::CancellationToken token = {}); + + private: + using HttpRequest = + boost::beast::http::request; + + /** + * Starts the SSE client if not already running, and updates the stored + * selector so that the on-connect hook will use it for the next + * connection attempt. + */ + void EnsureStarted(data_model::Selector const& selector, + std::shared_ptr self); + + /** + * Delivers a result to the caller of Next(), or buffers it if no + * caller is waiting. + */ + void Notify(data_interfaces::FDv2SourceResult result); + + /** + * Per-connect hook for the SSE client. Overwrites the request target + * with the streaming path plus query parameters built from the latest + * stored selector and the (immutable) filter key. + */ + void OnConnect(HttpRequest* req); + + void OnEvent(sse::Event const& event); + void OnError(sse::Error const& error); + + // Logger is itself thread-safe. + Logger logger_; + + // Immutable state + config::built::ServiceEndpoints const endpoints_; + config::built::HttpProperties const http_properties_; + std::optional const filter_key_; + std::chrono::milliseconds const initial_reconnect_delay_; + boost::asio::any_io_executor const executor_; + + // Touched only from SSE callbacks, which all run on the same strand. + // No lock required. + FDv2ProtocolHandler protocol_handler_; + + // Mutable state, all guarded by mutex_. + std::mutex mutex_; + bool started_ = false; + bool closed_ = false; + data_model::Selector latest_selector_; + std::optional base_url_; + std::shared_ptr sse_client_; + std::optional> + pending_promise_; + std::deque result_queue_; + }; + + // Resolved by Close() or on destruction, cancelling any outstanding Next(). + async::Promise close_promise_; + + // Shared with async SSE callbacks. + std::shared_ptr state_; +}; + +} // namespace launchdarkly::server_side::data_systems diff --git a/libs/server-sdk/tests/fdv2_streaming_synchronizer_test.cpp b/libs/server-sdk/tests/fdv2_streaming_synchronizer_test.cpp new file mode 100644 index 000000000..896fcf221 --- /dev/null +++ b/libs/server-sdk/tests/fdv2_streaming_synchronizer_test.cpp @@ -0,0 +1,574 @@ +#include + +#include + +#include +#include +#include +#include +#include + +#include +#include +#include +#include +#include + +#include +#include +#include +#include + +namespace launchdarkly::server_side::data_systems { + +// Test peer providing access to private State internals via the +// friend declaration in streaming_synchronizer.hpp. Tests drive the State's +// per-event / per-error / per-connect entry points directly so that no real +// SSE connection is required. +class FDv2StreamingSynchronizerTestPeer { + public: + static void OnEvent(FDv2StreamingSynchronizer& sync, + sse::Event const& event) { + sync.state_->OnEvent(event); + } + static void OnError(FDv2StreamingSynchronizer& sync, + sse::Error const& error) { + sync.state_->OnError(error); + } + static void OnConnect( + FDv2StreamingSynchronizer& sync, + boost::beast::http::request* req) { + sync.state_->OnConnect(req); + } + static void MarkStarted(FDv2StreamingSynchronizer& sync) { + std::lock_guard lock(sync.state_->mutex_); + sync.state_->started_ = true; + } + static void SetBaseUrl(FDv2StreamingSynchronizer& sync, + boost::urls::url url) { + std::lock_guard lock(sync.state_->mutex_); + sync.state_->base_url_ = std::move(url); + } + static void SetLatestSelector(FDv2StreamingSynchronizer& sync, + data_model::Selector selector) { + std::lock_guard lock(sync.state_->mutex_); + sync.state_->latest_selector_ = std::move(selector); + } +}; + +} // namespace launchdarkly::server_side::data_systems + +using namespace launchdarkly; +using namespace launchdarkly::server_side::data_interfaces; +using namespace launchdarkly::server_side::data_systems; +using namespace std::chrono_literals; + +namespace { + +Logger MakeNullLogger() { + struct NullBackend : ILogBackend { + bool Enabled(LogLevel) noexcept override { return false; } + void Write(LogLevel, std::string) noexcept override {} + }; + return Logger{std::make_shared()}; +} + +class IoContextRunner { + public: + IoContextRunner() : work_guard_(boost::asio::make_work_guard(ioc_)) { + thread_ = std::thread([this] { ioc_.run(); }); + } + ~IoContextRunner() { + work_guard_.reset(); + ioc_.stop(); + if (thread_.joinable()) { + thread_.join(); + } + } + boost::asio::io_context& context() { return ioc_; } + + private: + boost::asio::io_context ioc_; + boost::asio::executor_work_guard + work_guard_; + std::thread thread_; +}; + +config::shared::built::ServiceEndpoints MakeEndpoints(std::string streaming) { + return config::shared::built::ServiceEndpoints( + "polling", std::move(streaming), "events"); +} + +config::shared::built::HttpProperties MakeHttpProperties() { + return launchdarkly::server_side::config::builders::HttpPropertiesBuilder() + .Build(); +} + +} // namespace + +// ============================================================================ +// Lifecycle +// ============================================================================ + +TEST(FDv2StreamingSynchronizerTest, NextBadEndpointUrlReturnsTerminalError) { + auto logger = MakeNullLogger(); + IoContextRunner runner; + + FDv2StreamingSynchronizer synchronizer( + runner.context().get_executor(), logger, MakeEndpoints("not a url"), + MakeHttpProperties(), std::nullopt, 1s); + + // Act: trigger setup with a malformed streaming URL. URL parsing happens + // inside EnsureStarted on the first Next call. + auto future = synchronizer.Next(2s, data_model::Selector{}); + auto result = future.WaitForResult(2s); + + // Assert: TerminalError tells the orchestrator not to retry, which is the + // correct response to a misconfigured endpoint URL. + ASSERT_TRUE(result.has_value()); + auto* terminal = + std::get_if(&result->value); + ASSERT_NE(terminal, nullptr); + EXPECT_EQ(terminal->error.Kind(), + FDv2SourceResult::ErrorInfo::ErrorKind::kNetworkError); +} + +TEST(FDv2StreamingSynchronizerTest, CloseBeforeNextReturnsShutdown) { + auto logger = MakeNullLogger(); + IoContextRunner runner; + + FDv2StreamingSynchronizer synchronizer( + runner.context().get_executor(), logger, + MakeEndpoints("http://localhost"), MakeHttpProperties(), std::nullopt, + 1s); + synchronizer.Close(); + + // Act: call Next on an already-closed synchronizer. + auto future = synchronizer.Next(5s, data_model::Selector{}); + auto result = future.WaitForResult(2s); + + // Assert: Shutdown is delivered immediately (the outer Next short-circuits + // when close_promise_ is already resolved). + ASSERT_TRUE(result.has_value()); + EXPECT_TRUE( + std::holds_alternative(result->value)); +} + +TEST(FDv2StreamingSynchronizerTest, CloseDuringPendingNextResolvesShutdown) { + auto logger = MakeNullLogger(); + IoContextRunner runner; + + FDv2StreamingSynchronizer synchronizer( + runner.context().get_executor(), logger, + MakeEndpoints("http://localhost"), MakeHttpProperties(), std::nullopt, + 1s); + + // Skip the SSE setup; we want Next to be pending purely on the + // close/timeout race, not on real network activity. + FDv2StreamingSynchronizerTestPeer::MarkStarted(synchronizer); + + // Act: start a pending Next, then Close while it is still pending. + auto future = synchronizer.Next(5s, data_model::Selector{}); + synchronizer.Close(); + auto result = future.WaitForResult(2s); + + // Assert: Close resolves the pending Next with Shutdown by winning the + // close-vs-timeout-vs-result race in the outer Next. + ASSERT_TRUE(result.has_value()); + EXPECT_TRUE( + std::holds_alternative(result->value)); +} + +TEST(FDv2StreamingSynchronizerTest, NextTimeoutReturnsTimeout) { + auto logger = MakeNullLogger(); + IoContextRunner runner; + + FDv2StreamingSynchronizer synchronizer( + runner.context().get_executor(), logger, + MakeEndpoints("http://localhost"), MakeHttpProperties(), std::nullopt, + 1s); + + FDv2StreamingSynchronizerTestPeer::MarkStarted(synchronizer); + + // Act: call Next with a short timeout and never deliver any event. + auto future = synchronizer.Next(100ms, data_model::Selector{}); + auto result = future.WaitForResult(2s); + + // Assert: the timeout future wins the race and Next resolves with Timeout. + ASSERT_TRUE(result.has_value()); + EXPECT_TRUE( + std::holds_alternative(result->value)); +} + +// ============================================================================ +// on_connect hook — URL/target construction +// ============================================================================ + +TEST(FDv2StreamingSynchronizerTest, OnConnectEmptySelectorNoBasisParam) { + auto logger = MakeNullLogger(); + IoContextRunner runner; + + FDv2StreamingSynchronizer synchronizer( + runner.context().get_executor(), logger, + MakeEndpoints("https://stream.example.com"), MakeHttpProperties(), + std::nullopt, 1s); + + boost::urls::url base = + boost::urls::parse_uri("https://stream.example.com").value(); + base.segments().push_back("sdk"); + base.segments().push_back("stream"); + FDv2StreamingSynchronizerTestPeer::SetBaseUrl(synchronizer, base); + + boost::beast::http::request req; + + // Act: invoke the on_connect hook with no selector configured. + FDv2StreamingSynchronizerTestPeer::OnConnect(synchronizer, &req); + + // Assert: no basis query param appears in the target. The path itself is + // /sdk/stream as built from the streaming base URL. + EXPECT_EQ(req.target(), "/sdk/stream"); +} + +TEST(FDv2StreamingSynchronizerTest, OnConnectWithSelectorAppendsBasis) { + auto logger = MakeNullLogger(); + IoContextRunner runner; + + FDv2StreamingSynchronizer synchronizer( + runner.context().get_executor(), logger, + MakeEndpoints("https://stream.example.com"), MakeHttpProperties(), + std::nullopt, 1s); + + boost::urls::url base = + boost::urls::parse_uri("https://stream.example.com").value(); + base.segments().push_back("sdk"); + base.segments().push_back("stream"); + FDv2StreamingSynchronizerTestPeer::SetBaseUrl(synchronizer, base); + FDv2StreamingSynchronizerTestPeer::SetLatestSelector( + synchronizer, + data_model::Selector{data_model::Selector::State{1, "abc"}}); + + boost::beast::http::request req; + + // Act: invoke the on_connect hook with a non-empty selector. + FDv2StreamingSynchronizerTestPeer::OnConnect(synchronizer, &req); + + // Assert: the selector's state string is encoded as the basis query param. + EXPECT_EQ(req.target(), "/sdk/stream?basis=abc"); +} + +TEST(FDv2StreamingSynchronizerTest, OnConnectWithFilterKeyAppendsFilter) { + auto logger = MakeNullLogger(); + IoContextRunner runner; + + FDv2StreamingSynchronizer synchronizer( + runner.context().get_executor(), logger, + MakeEndpoints("https://stream.example.com"), MakeHttpProperties(), + std::string("my-filter"), 1s); + + boost::urls::url base = + boost::urls::parse_uri("https://stream.example.com").value(); + base.segments().push_back("sdk"); + base.segments().push_back("stream"); + FDv2StreamingSynchronizerTestPeer::SetBaseUrl(synchronizer, base); + + boost::beast::http::request req; + + // Act: invoke the on_connect hook with a configured filter key but no + // selector. + FDv2StreamingSynchronizerTestPeer::OnConnect(synchronizer, &req); + + // Assert: the filter key is encoded as the filter query param. + EXPECT_EQ(req.target(), "/sdk/stream?filter=my-filter"); +} + +TEST(FDv2StreamingSynchronizerTest, OnConnectReconnectUsesLatestSelector) { + auto logger = MakeNullLogger(); + IoContextRunner runner; + + FDv2StreamingSynchronizer synchronizer( + runner.context().get_executor(), logger, + MakeEndpoints("https://stream.example.com"), MakeHttpProperties(), + std::nullopt, 1s); + + boost::urls::url base = + boost::urls::parse_uri("https://stream.example.com").value(); + base.segments().push_back("sdk"); + base.segments().push_back("stream"); + FDv2StreamingSynchronizerTestPeer::SetBaseUrl(synchronizer, base); + + FDv2StreamingSynchronizerTestPeer::SetLatestSelector( + synchronizer, + data_model::Selector{data_model::Selector::State{1, "s0"}}); + boost::beast::http::request req1; + FDv2StreamingSynchronizerTestPeer::OnConnect(synchronizer, &req1); + + FDv2StreamingSynchronizerTestPeer::SetLatestSelector( + synchronizer, + data_model::Selector{data_model::Selector::State{2, "s1"}}); + + boost::beast::http::request req2; + + // Act: invoke the on_connect hook a second time after the stored selector + // has been updated (modeling a reconnect after a payload-transferred event + // has arrived). + FDv2StreamingSynchronizerTestPeer::OnConnect(synchronizer, &req2); + + // Assert: the second connection's basis reflects the updated selector, + // not the value captured at construction or first connect. + EXPECT_EQ(req1.target(), "/sdk/stream?basis=s0"); + EXPECT_EQ(req2.target(), "/sdk/stream?basis=s1"); +} + +// ============================================================================ +// SSE event translation +// ============================================================================ + +TEST(FDv2StreamingSynchronizerTest, FullChangesetEventsReturnsChangeSet) { + auto logger = MakeNullLogger(); + IoContextRunner runner; + + FDv2StreamingSynchronizer synchronizer( + runner.context().get_executor(), logger, + MakeEndpoints("http://localhost"), MakeHttpProperties(), std::nullopt, + 1s); + FDv2StreamingSynchronizerTestPeer::MarkStarted(synchronizer); + + sse::Event server_intent("server-intent", + R"({"payloads":[{"id":"p1","target":1,)" + R"("intentCode":"xfer-full"}]})"); + sse::Event put_object( + "put-object", + R"({"version":1,"kind":"flag","key":"my-flag","object":)" + R"({"key":"my-flag","on":true,"fallthrough":{"variation":0},)" + R"("variations":[true,false],"version":1}})"); + sse::Event payload_transferred("payload-transferred", + R"({"state":"abc","version":1})"); + + // Act: drive a full changeset (intent + put + transferred) through OnEvent + // and then read the queued result via the public Next API. + FDv2StreamingSynchronizerTestPeer::OnEvent(synchronizer, server_intent); + FDv2StreamingSynchronizerTestPeer::OnEvent(synchronizer, put_object); + FDv2StreamingSynchronizerTestPeer::OnEvent(synchronizer, + payload_transferred); + auto future = synchronizer.Next(2s, data_model::Selector{}); + auto result = future.WaitForResult(2s); + + // Assert: a ChangeSet result is delivered, and the translated payload + // contains the put item from the put-object event. + ASSERT_TRUE(result.has_value()); + auto* change_set = std::get_if(&result->value); + ASSERT_NE(change_set, nullptr); + EXPECT_FALSE(change_set->change_set.data.empty()); +} + +TEST(FDv2StreamingSynchronizerTest, GoodbyeEventReturnsGoodbye) { + auto logger = MakeNullLogger(); + IoContextRunner runner; + + FDv2StreamingSynchronizer synchronizer( + runner.context().get_executor(), logger, + MakeEndpoints("http://localhost"), MakeHttpProperties(), std::nullopt, + 1s); + FDv2StreamingSynchronizerTestPeer::MarkStarted(synchronizer); + + sse::Event goodbye("goodbye", R"({"reason":"bye"})"); + + // Act: deliver a goodbye event through OnEvent and read the queued result. + FDv2StreamingSynchronizerTestPeer::OnEvent(synchronizer, goodbye); + auto future = synchronizer.Next(2s, data_model::Selector{}); + auto result = future.WaitForResult(2s); + + // Assert: a Goodbye result with the wire reason is delivered, signaling + // the orchestrator to rotate sources. + ASSERT_TRUE(result.has_value()); + auto* g = std::get_if(&result->value); + ASSERT_NE(g, nullptr); + ASSERT_TRUE(g->reason.has_value()); + EXPECT_EQ(*g->reason, "bye"); +} + +TEST(FDv2StreamingSynchronizerTest, ServerErrorEventReturnsInterrupted) { + auto logger = MakeNullLogger(); + IoContextRunner runner; + + FDv2StreamingSynchronizer synchronizer( + runner.context().get_executor(), logger, + MakeEndpoints("http://localhost"), MakeHttpProperties(), std::nullopt, + 1s); + FDv2StreamingSynchronizerTestPeer::MarkStarted(synchronizer); + + sse::Event server_error("error", R"({"id":"abc","reason":"oops"})"); + + // Act: deliver an FDv2 server-side error event. + FDv2StreamingSynchronizerTestPeer::OnEvent(synchronizer, server_error); + auto future = synchronizer.Next(2s, data_model::Selector{}); + auto result = future.WaitForResult(2s); + + // Assert: the error is reported as Interrupted{kErrorResponse}, with the + // formatted retry-will-occur message containing both the payload id and + // the server reason. + ASSERT_TRUE(result.has_value()); + auto* interrupted = + std::get_if(&result->value); + ASSERT_NE(interrupted, nullptr); + EXPECT_EQ(interrupted->error.Kind(), + FDv2SourceResult::ErrorInfo::ErrorKind::kErrorResponse); + EXPECT_NE(interrupted->error.Message().find("abc"), std::string::npos); + EXPECT_NE(interrupted->error.Message().find("oops"), std::string::npos); + EXPECT_NE(interrupted->error.Message().find("Automatic retry"), + std::string::npos); +} + +TEST(FDv2StreamingSynchronizerTest, MalformedJsonEventReturnsInterrupted) { + auto logger = MakeNullLogger(); + IoContextRunner runner; + + FDv2StreamingSynchronizer synchronizer( + runner.context().get_executor(), logger, + MakeEndpoints("http://localhost"), MakeHttpProperties(), std::nullopt, + 1s); + FDv2StreamingSynchronizerTestPeer::MarkStarted(synchronizer); + + sse::Event bad_event("server-intent", "this is not json"); + + // Act: deliver an event whose data field cannot be parsed as JSON. + FDv2StreamingSynchronizerTestPeer::OnEvent(synchronizer, bad_event); + auto future = synchronizer.Next(2s, data_model::Selector{}); + auto result = future.WaitForResult(2s); + + // Assert: the synchronizer reports Interrupted{kInvalidData} so the + // orchestrator knows the stream produced unparseable bytes. + ASSERT_TRUE(result.has_value()); + auto* interrupted = + std::get_if(&result->value); + ASSERT_NE(interrupted, nullptr); + EXPECT_EQ(interrupted->error.Kind(), + FDv2SourceResult::ErrorInfo::ErrorKind::kInvalidData); +} + +TEST(FDv2StreamingSynchronizerTest, HeartbeatEventNoResultDelivered) { + auto logger = MakeNullLogger(); + IoContextRunner runner; + + FDv2StreamingSynchronizer synchronizer( + runner.context().get_executor(), logger, + MakeEndpoints("http://localhost"), MakeHttpProperties(), std::nullopt, + 1s); + FDv2StreamingSynchronizerTestPeer::MarkStarted(synchronizer); + + sse::Event heartbeat("heartbeat", R"({})"); + + // Act: deliver a heartbeat event, then call Next with a short timeout. + FDv2StreamingSynchronizerTestPeer::OnEvent(synchronizer, heartbeat); + auto future = synchronizer.Next(100ms, data_model::Selector{}); + auto result = future.WaitForResult(2s); + + // Assert: the heartbeat does not produce any FDv2SourceResult, so Next + // resolves with Timeout instead. + ASSERT_TRUE(result.has_value()); + EXPECT_TRUE( + std::holds_alternative(result->value)); +} + +TEST(FDv2StreamingSynchronizerTest, TranslationFailureReturnsInterrupted) { + auto logger = MakeNullLogger(); + IoContextRunner runner; + + FDv2StreamingSynchronizer synchronizer( + runner.context().get_executor(), logger, + MakeEndpoints("http://localhost"), MakeHttpProperties(), std::nullopt, + 1s); + FDv2StreamingSynchronizerTestPeer::MarkStarted(synchronizer); + + // A non-empty segment object missing required fields triggers a schema + // failure in the translator; matches the segment shape used by + // fdv2_changeset_translation_test's MalformedSegmentAbortsTranslation. + sse::Event server_intent("server-intent", + R"({"payloads":[{"id":"p1","target":1,)" + R"("intentCode":"xfer-full"}]})"); + sse::Event put_object( + "put-object", + R"({"version":1,"kind":"segment","key":"bad-seg","object":)" + R"({"key":"bad-seg"}})"); + sse::Event payload_transferred("payload-transferred", + R"({"state":"abc","version":1})"); + + // Act: drive a full changeset whose put-object payload won't deserialize + // into a Segment. + FDv2StreamingSynchronizerTestPeer::OnEvent(synchronizer, server_intent); + FDv2StreamingSynchronizerTestPeer::OnEvent(synchronizer, put_object); + FDv2StreamingSynchronizerTestPeer::OnEvent(synchronizer, + payload_transferred); + auto future = synchronizer.Next(2s, data_model::Selector{}); + auto result = future.WaitForResult(2s); + + // Assert: the changeset is rejected with Interrupted{kInvalidData}. + ASSERT_TRUE(result.has_value()); + auto* interrupted = + std::get_if(&result->value); + ASSERT_NE(interrupted, nullptr); + EXPECT_EQ(interrupted->error.Kind(), + FDv2SourceResult::ErrorInfo::ErrorKind::kInvalidData); +} + +// ============================================================================ +// SSE error handling +// ============================================================================ + +TEST(FDv2StreamingSynchronizerTest, + UnrecoverableStatus500ReturnsTerminalError) { + auto logger = MakeNullLogger(); + IoContextRunner runner; + + FDv2StreamingSynchronizer synchronizer( + runner.context().get_executor(), logger, + MakeEndpoints("http://localhost"), MakeHttpProperties(), std::nullopt, + 1s); + FDv2StreamingSynchronizerTestPeer::MarkStarted(synchronizer); + + sse::Error error{sse::errors::UnrecoverableClientError{ + boost::beast::http::status::internal_server_error}}; + + // Act: deliver an unrecoverable HTTP 500 error from the SSE client. + FDv2StreamingSynchronizerTestPeer::OnError(synchronizer, error); + auto future = synchronizer.Next(2s, data_model::Selector{}); + auto result = future.WaitForResult(2s); + + // Assert: the synchronizer reports TerminalError{kErrorResponse} carrying + // the HTTP status, telling the orchestrator to stop retrying this source. + ASSERT_TRUE(result.has_value()); + auto* terminal = + std::get_if(&result->value); + ASSERT_NE(terminal, nullptr); + EXPECT_EQ(terminal->error.Kind(), + FDv2SourceResult::ErrorInfo::ErrorKind::kErrorResponse); + EXPECT_EQ(terminal->error.StatusCode(), 500u); +} + +TEST(FDv2StreamingSynchronizerTest, RecoverableReadTimeoutReturnsInterrupted) { + auto logger = MakeNullLogger(); + IoContextRunner runner; + + FDv2StreamingSynchronizer synchronizer( + runner.context().get_executor(), logger, + MakeEndpoints("http://localhost"), MakeHttpProperties(), std::nullopt, + 1s); + FDv2StreamingSynchronizerTestPeer::MarkStarted(synchronizer); + + sse::Error error{sse::errors::ReadTimeout{std::chrono::milliseconds(100)}}; + + // Act: deliver a recoverable read-timeout error from the SSE client. + FDv2StreamingSynchronizerTestPeer::OnError(synchronizer, error); + auto future = synchronizer.Next(2s, data_model::Selector{}); + auto result = future.WaitForResult(2s); + + // Assert: the synchronizer reports Interrupted{kNetworkError} so the + // orchestrator treats this as a recoverable blip rather than a terminal + // failure. + ASSERT_TRUE(result.has_value()); + auto* interrupted = + std::get_if(&result->value); + ASSERT_NE(interrupted, nullptr); + EXPECT_EQ(interrupted->error.Kind(), + FDv2SourceResult::ErrorInfo::ErrorKind::kNetworkError); +} From 732c162c89205bfe8e38b0e467ac138a5195ebda Mon Sep 17 00:00:00 2001 From: Bee Klimt Date: Tue, 28 Apr 2026 13:52:05 -0700 Subject: [PATCH 7/9] docs: update a comment --- .../server-sdk/src/data_systems/fdv2/streaming_synchronizer.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/libs/server-sdk/src/data_systems/fdv2/streaming_synchronizer.cpp b/libs/server-sdk/src/data_systems/fdv2/streaming_synchronizer.cpp index 0e7004aab..070a71186 100644 --- a/libs/server-sdk/src/data_systems/fdv2/streaming_synchronizer.cpp +++ b/libs/server-sdk/src/data_systems/fdv2/streaming_synchronizer.cpp @@ -75,7 +75,7 @@ void FDv2StreamingSynchronizer::State::EnsureStarted( boost::urls::url u = parsed.value(); // Safer way of appending /sdk/stream than string concatenation: avoids - // double slashes if the base URL has a trailing slash. TODO: add a test. + // double slashes if the base URL has a trailing slash. u.segments().push_back("sdk"); u.segments().push_back("stream"); From b5fc6db922b2b06b4541b58d2b718b3f5f21d873 Mon Sep 17 00:00:00 2001 From: Bee Klimt Date: Tue, 28 Apr 2026 17:05:50 -0700 Subject: [PATCH 8/9] fix: fix a crash in flaky tests --- libs/server-sent-events/src/client.cpp | 11 ++++++----- 1 file changed, 6 insertions(+), 5 deletions(-) diff --git a/libs/server-sent-events/src/client.cpp b/libs/server-sent-events/src/client.cpp index e5ae7a8b6..81958f849 100644 --- a/libs/server-sent-events/src/client.cpp +++ b/libs/server-sent-events/src/client.cpp @@ -159,13 +159,13 @@ class FoxyClient : public Client, void async_connect() override { boost::asio::post( - session_->get_executor(), + backoff_timer_.get_executor(), beast::bind_front_handler(&FoxyClient::do_run, shared_from_this())); } void async_restart(std::string const& reason) override { boost::asio::post( - session_->get_executor(), + backoff_timer_.get_executor(), beast::bind_front_handler(&FoxyClient::do_restart, shared_from_this(), reason)); } @@ -368,9 +368,10 @@ class FoxyClient : public Client, } void async_shutdown(std::function completion) override { - // Get on the session's executor, otherwise the code in the completion - // handler could race. - boost::asio::post(session_->get_executor(), + // Post onto the strand to avoid races in the completion handler. + // session_ can be replaced by create_session(), so use + // backoff_timer_'s executor (same strand, never replaced) instead. + boost::asio::post(backoff_timer_.get_executor(), beast::bind_front_handler(&FoxyClient::do_shutdown, shared_from_this(), std::move(completion))); From 7de31e8228f5afd092953052356d4f04b8f4926f Mon Sep 17 00:00:00 2001 From: Bee Klimt Date: Tue, 28 Apr 2026 22:03:21 -0700 Subject: [PATCH 9/9] refactor: minor cleanup --- .../fdv2/streaming_synchronizer.cpp | 44 ++++++++----------- .../fdv2_streaming_synchronizer_test.cpp | 28 ++++++++++++ 2 files changed, 46 insertions(+), 26 deletions(-) diff --git a/libs/server-sdk/src/data_systems/fdv2/streaming_synchronizer.cpp b/libs/server-sdk/src/data_systems/fdv2/streaming_synchronizer.cpp index 070a71186..894d35846 100644 --- a/libs/server-sdk/src/data_systems/fdv2/streaming_synchronizer.cpp +++ b/libs/server-sdk/src/data_systems/fdv2/streaming_synchronizer.cpp @@ -157,9 +157,8 @@ void FDv2StreamingSynchronizer::State::EnsureStarted( void FDv2StreamingSynchronizer::State::OnConnect(HttpRequest* req) { std::lock_guard lock(mutex_); - if (!base_url_) { - return; - } + // base_url_ is guaranteed populated: EnsureStarted publishes it before + // calling async_connect, which is what eventually triggers this hook. boost::urls::url u = *base_url_; if (latest_selector_.value) { u.params().set("basis", latest_selector_.value->state); @@ -252,29 +251,19 @@ void FDv2StreamingSynchronizer::State::OnError(sse::Error const& error) { LD_LOG(logger_, LogLevel::kError) << kIdentity << ": " << msg; - std::visit( - [this, &msg](auto const& e) { - using T = std::decay_t; - if constexpr (std::is_same_v< - T, sse::errors::UnrecoverableClientError>) { - Notify(FDv2SourceResult{FDv2SourceResult::TerminalError{ - MakeError(ErrorKind::kErrorResponse, - static_cast(e.status), - std::move(msg)), - false}}); - } else if constexpr (std::is_same_v< - T, sse::errors::InvalidRedirectLocation> || - std::is_same_v || - std::is_same_v) { - Notify(FDv2SourceResult{FDv2SourceResult::TerminalError{ - MakeError(ErrorKind::kNetworkError, 0, std::move(msg)), - false}}); - } else { - static_assert(always_false_v, "non-exhaustive visitor"); - } - }, - error); + if (auto const* client_error = + std::get_if(&error)) { + Notify(FDv2SourceResult{FDv2SourceResult::TerminalError{ + MakeError( + ErrorKind::kErrorResponse, + static_cast(client_error->status), + std::move(msg)), + false}}); + return; + } + + Notify(FDv2SourceResult{FDv2SourceResult::TerminalError{ + MakeError(ErrorKind::kNetworkError, 0, std::move(msg)), false}}); } void FDv2StreamingSynchronizer::State::Notify(FDv2SourceResult result) { @@ -377,6 +366,9 @@ async::Future FDv2StreamingSynchronizer::Next( } if (idx == 1) { state->ClearPendingPromise(); + if (result_future.IsFinished()) { + return *result_future.GetResult(); + } return FDv2SourceResult{FDv2SourceResult::Timeout{}}; } return *result_future.GetResult(); diff --git a/libs/server-sdk/tests/fdv2_streaming_synchronizer_test.cpp b/libs/server-sdk/tests/fdv2_streaming_synchronizer_test.cpp index 896fcf221..cb9bdd8f6 100644 --- a/libs/server-sdk/tests/fdv2_streaming_synchronizer_test.cpp +++ b/libs/server-sdk/tests/fdv2_streaming_synchronizer_test.cpp @@ -319,6 +319,34 @@ TEST(FDv2StreamingSynchronizerTest, OnConnectReconnectUsesLatestSelector) { EXPECT_EQ(req2.target(), "/sdk/stream?basis=s1"); } +TEST(FDv2StreamingSynchronizerTest, OnConnectSelectorStateIsPercentEncoded) { + auto logger = MakeNullLogger(); + IoContextRunner runner; + + FDv2StreamingSynchronizer synchronizer( + runner.context().get_executor(), logger, + MakeEndpoints("https://stream.example.com"), MakeHttpProperties(), + std::nullopt, 1s); + + boost::urls::url base = + boost::urls::parse_uri("https://stream.example.com").value(); + base.segments().push_back("sdk"); + base.segments().push_back("stream"); + FDv2StreamingSynchronizerTestPeer::SetBaseUrl(synchronizer, base); + FDv2StreamingSynchronizerTestPeer::SetLatestSelector( + synchronizer, + data_model::Selector{data_model::Selector::State{1, "a&b"}}); + + boost::beast::http::request req; + + // Act: invoke the on_connect hook with a selector state containing '&', + // which would terminate the basis value if left raw. + FDv2StreamingSynchronizerTestPeer::OnConnect(synchronizer, &req); + + // Assert: '&' is percent-encoded into the basis query param. + EXPECT_EQ(req.target(), "/sdk/stream?basis=a%26b"); +} + // ============================================================================ // SSE event translation // ============================================================================