diff --git a/libs/internal/include/launchdarkly/async/promise.hpp b/libs/internal/include/launchdarkly/async/promise.hpp index ffdc66b87..198e84fdb 100644 --- a/libs/internal/include/launchdarkly/async/promise.hpp +++ b/libs/internal/include/launchdarkly/async/promise.hpp @@ -437,6 +437,10 @@ class Future { std::shared_ptr> internal_; }; +// An executor that runs work immediately on the calling thread. Pass this +// to Then() when no specific thread is required for the continuation. +inline auto const kInlineExecutor = [](Continuation f) { f(); }; + // WhenAll takes a variadic list of Futures (each with potentially different // value types) and returns a Future that resolves once all // of the input futures have resolved. The result carries no value; callers @@ -528,4 +532,16 @@ Future WhenAny(Future... futures) { return result; } +// MakeFuture returns an already-resolved Future. Useful in flattening Then +// continuations where some branches produce a result immediately and others +// return a Future, requiring a uniform Future return type across all +// branches. +template +Future MakeFuture(T value) { + Promise p; + auto f = p.GetFuture(); + p.Resolve(std::move(value)); + return f; +} + } // namespace launchdarkly::async diff --git a/libs/internal/include/launchdarkly/async/timer.hpp b/libs/internal/include/launchdarkly/async/timer.hpp new file mode 100644 index 000000000..ce62889e0 --- /dev/null +++ b/libs/internal/include/launchdarkly/async/timer.hpp @@ -0,0 +1,32 @@ +#pragma once + +#include + +#include +#include +#include +#include + +#include +#include + +namespace launchdarkly::async { + +// Returns a Future that resolves once the given duration elapses. +// The future resolves with true if the timer fired normally, or false if +// the timer was cancelled before it expired. +template +Future Delay(boost::asio::any_io_executor executor, + std::chrono::duration duration) { + auto timer = std::make_shared(executor); + timer->expires_after(duration); + Promise promise; + auto future = promise.GetFuture(); + timer->async_wait([p = std::move(promise), + timer](boost::system::error_code code) mutable { + p.Resolve(code != boost::asio::error::operation_aborted); + }); + return future; +} + +} // namespace launchdarkly::async diff --git a/libs/internal/include/launchdarkly/fdv2_protocol_handler.hpp b/libs/internal/include/launchdarkly/fdv2_protocol_handler.hpp new file mode 100644 index 000000000..11cfa7e4e --- /dev/null +++ b/libs/internal/include/launchdarkly/fdv2_protocol_handler.hpp @@ -0,0 +1,114 @@ +#pragma once + +#include +#include + +#include + +#include +#include +#include + +namespace launchdarkly { + +/** + * Protocol state machine for the FDv2 wire format. + * + * Accumulates put-object and delete-object events between a server-intent + * and payload-transferred event, then emits a complete FDv2ChangeSet. + * + * Shared between the polling and streaming synchronizers. + */ +class FDv2ProtocolHandler { + public: + /** + * Typed error returned by HandleEvent. Carries the original underlying + * error context rather than converting to a plain string. + */ + struct Error { + enum class Kind { + kJsonError, // Failed to deserialise an event's data field. + kProtocolError, // Out-of-order or unexpected event. + kServerError, // Server sent a valid 'error' event. + }; + + Kind kind; + std::string message; + + /** + * Set for kJsonError when the tl::expected parse returned an error. + * Nullopt when parse succeeded but the data value was null. + */ + std::optional json_error; + + /** + * Set for kServerError: the full wire error including id and reason. + */ + std::optional server_error; + + /** JSON deserialisation failed — carries the original JsonError. */ + static Error JsonParseError(JsonError err, std::string msg) { + return {Kind::kJsonError, std::move(msg), err, std::nullopt}; + } + /** Parse succeeded but data was null — no underlying JsonError. */ + static Error JsonParseError(std::string msg) { + return {Kind::kJsonError, std::move(msg), std::nullopt, + std::nullopt}; + } + /** Out-of-order or unexpected protocol event. */ + static Error ProtocolError(std::string msg) { + return {Kind::kProtocolError, std::move(msg), std::nullopt, + std::nullopt}; + } + /** Server sent a well-formed 'error' event. */ + static Error ServerError(FDv2Error err) { + return {Kind::kServerError, err.reason, std::nullopt, + std::move(err)}; + } + }; + + /** + * Result of handling a single FDv2 event: + * - monostate: no output yet (accumulating, heartbeat, or unknown event) + * - FDv2ChangeSet: complete changeset ready to apply + * - Error: protocol error (JSON parse failure, protocol violation, or + * server-sent error event) + * - Goodbye: server is closing; caller should rotate sources + */ + using Result = + std::variant; + + /** + * Process one FDv2 event. + * + * @param event_type The event type string (e.g. "server-intent", + * "put-object", "payload-transferred"). + * @param data The parsed JSON value for the event's data field. + * @return A Result indicating what (if anything) the caller + * should act on. + */ + Result HandleEvent(std::string_view event_type, + boost::json::value const& data); + + /** + * Reset accumulated state. Call on reconnect before processing new events. + */ + void Reset(); + + FDv2ProtocolHandler() = default; + + private: + enum class State { kInactive, kFull, kPartial }; + + Result HandleServerIntent(boost::json::value const& data); + Result HandlePutObject(boost::json::value const& data); + Result HandleDeleteObject(boost::json::value const& data); + Result HandlePayloadTransferred(boost::json::value const& data); + Result HandleError(boost::json::value const& data); + Result HandleGoodbye(boost::json::value const& data); + + State state_ = State::kInactive; + std::vector changes_; +}; + +} // namespace launchdarkly diff --git a/libs/internal/include/launchdarkly/network/asio_requester.hpp b/libs/internal/include/launchdarkly/network/asio_requester.hpp index a4078953c..cf0927417 100644 --- a/libs/internal/include/launchdarkly/network/asio_requester.hpp +++ b/libs/internal/include/launchdarkly/network/asio_requester.hpp @@ -282,23 +282,17 @@ class AsioRequester { } template - auto Request(HttpRequest request, CompletionToken&& token) { - // TODO: Clang-tidy wants to pass the request by reference, but I am not - // confident that lifetime would make sense. - - namespace asio = boost::asio; - namespace system = boost::system; - - using Sig = void(HttpResult result); - using Result = asio::async_result, Sig>; - using Handler = typename Result::completion_handler_type; - - Handler handler(std::forward(token)); - Result result(handler); - - InnerRequest(net::make_strand(ctx_), request, std::move(handler), 0); - - return result.get(); + auto Request(HttpRequest request, CompletionToken&& token) const { + return boost::asio::async_initiate( + [this](auto handler, HttpRequest req) { + InnerRequest( + net::make_strand(ctx_), std::move(req), + [h = std::move(handler)](HttpResult result) mutable { + std::move(h)(std::move(result)); + }, + 0); + }, + token, std::move(request)); } private: @@ -313,7 +307,7 @@ class AsioRequester { void InnerRequest(boost::asio::any_io_executor exec, std::optional request, std::function callback, - unsigned char redirect_count) { + unsigned char redirect_count) const { if (redirect_count > kRedirectLimit) { boost::asio::post(exec, [callback, request]() mutable { callback( @@ -336,7 +330,7 @@ class AsioRequester { redirect_count]() mutable { auto beast_request = MakeBeastRequest(*request); - const auto& properties = request->Properties(); + auto const& properties = request->Properties(); std::string service = request->Port().value_or(request->Https() ? "https" : "http"); diff --git a/libs/internal/include/launchdarkly/network/requester.hpp b/libs/internal/include/launchdarkly/network/requester.hpp index 58579cc06..c2f23dd4b 100644 --- a/libs/internal/include/launchdarkly/network/requester.hpp +++ b/libs/internal/include/launchdarkly/network/requester.hpp @@ -1,10 +1,10 @@ #pragma once -#include "http_requester.hpp" -#include +#include #include +#include #include -#include +#include "http_requester.hpp" namespace launchdarkly::network { @@ -15,30 +15,32 @@ using TlsOptions = config::shared::built::TlsOptions; class IRequesterImpl; /** - * Requester provides HTTP request functionality using either CURL or Boost.Beast - * depending on the LD_CURL_NETWORKING compile-time flag. + * Requester provides HTTP request functionality using either CURL or + * Boost.Beast depending on the LD_CURL_NETWORKING compile-time flag. * * When LD_CURL_NETWORKING is ON: Uses CurlRequester (CURL-based implementation) - * When LD_CURL_NETWORKING is OFF: Uses AsioRequester (Boost.Beast-based implementation) + * When LD_CURL_NETWORKING is OFF: Uses AsioRequester (Boost.Beast-based + * implementation) * - * The implementation choice is made at library compile-time and hidden from users - * via the pimpl idiom to avoid ABI issues. + * The implementation choice is made at library compile-time and hidden from + * users via the pimpl idiom to avoid ABI issues. */ class Requester { -public: + public: Requester(net::any_io_executor ctx, TlsOptions const& tls_options); ~Requester(); // Move-only type Requester(Requester&&) noexcept; Requester& operator=(Requester&&) noexcept; - Requester(const Requester&) = delete; - Requester& operator=(const Requester&) = delete; + Requester(Requester const&) = delete; + Requester& operator=(Requester const&) = delete; - void Request(HttpRequest request, std::function cb); + void Request(HttpRequest request, + std::function cb) const; -private: + private: std::unique_ptr impl_; }; -} // namespace launchdarkly::network +} // namespace launchdarkly::network diff --git a/libs/internal/src/CMakeLists.txt b/libs/internal/src/CMakeLists.txt index 2fffa42b3..1b3d54e3c 100644 --- a/libs/internal/src/CMakeLists.txt +++ b/libs/internal/src/CMakeLists.txt @@ -36,6 +36,7 @@ set(INTERNAL_SOURCES serialization/value_mapping.cpp serialization/json_evaluation_result.cpp serialization/json_fdv2_events.cpp + fdv2_protocol_handler.cpp serialization/json_sdk_data_set.cpp serialization/json_segment.cpp serialization/json_primitives.cpp diff --git a/libs/internal/src/fdv2_protocol_handler.cpp b/libs/internal/src/fdv2_protocol_handler.cpp new file mode 100644 index 000000000..d0564e45f --- /dev/null +++ b/libs/internal/src/fdv2_protocol_handler.cpp @@ -0,0 +1,265 @@ +#include + +#include +#include +#include +#include +#include + +#include +#include + +namespace launchdarkly { + +static char const* const kServerIntent = "server-intent"; +static char const* const kPutObject = "put-object"; +static char const* const kDeleteObject = "delete-object"; +static char const* const kPayloadTransferred = "payload-transferred"; +static char const* const kError = "error"; +static char const* const kGoodbye = "goodbye"; + +using Error = FDv2ProtocolHandler::Error; + +// Returns the parsed FDv2Change on success, nullopt for unknown kinds (which +// should be silently skipped for forward-compatibility), or an Error if +// a known kind fails to deserialize. +static tl::expected, Error> ParsePut( + PutObject const& put) { + if (put.kind == "flag") { + auto result = boost::json::value_to< + tl::expected, JsonError>>( + put.object); + // One bad flag aborts the entire transfer so the store is never + // left in a partially-updated state. + if (!result) { + return tl::make_unexpected(Error::JsonParseError( + result.error(), + "could not deserialize flag '" + put.key + "'")); + } + if (!result->has_value()) { + return tl::make_unexpected(Error::JsonParseError( + "flag '" + put.key + "' object was null")); + } + return data_model::FDv2Change{ + put.key, + data_model::ItemDescriptor{std::move(**result)}}; + } + if (put.kind == "segment") { + auto result = boost::json::value_to< + tl::expected, JsonError>>( + put.object); + // One bad segment aborts the entire transfer so the store is never + // left in a partially-updated state. + if (!result) { + return tl::make_unexpected(Error::JsonParseError( + result.error(), + "could not deserialize segment '" + put.key + "'")); + } + if (!result->has_value()) { + return tl::make_unexpected(Error::JsonParseError( + "segment '" + put.key + "' object was null")); + } + return data_model::FDv2Change{ + put.key, data_model::ItemDescriptor{ + std::move(**result)}}; + } + // Silently skip unknown kinds for forward-compatibility. + return std::nullopt; +} + +static data_model::FDv2Change MakeDeleteChange(DeleteObject const& del) { + if (del.kind == "flag") { + return data_model::FDv2Change{ + del.key, + data_model::ItemDescriptor{ + data_model::Tombstone{static_cast(del.version)}}}; + } + return data_model::FDv2Change{ + del.key, + data_model::ItemDescriptor{ + data_model::Tombstone{static_cast(del.version)}}}; +} + +FDv2ProtocolHandler::Result FDv2ProtocolHandler::HandleEvent( + std::string_view event_type, + boost::json::value const& data) { + if (event_type == kServerIntent) { + return HandleServerIntent(data); + } + if (event_type == kPutObject) { + return HandlePutObject(data); + } + if (event_type == kDeleteObject) { + return HandleDeleteObject(data); + } + if (event_type == kPayloadTransferred) { + return HandlePayloadTransferred(data); + } + if (event_type == kError) { + return HandleError(data); + } + if (event_type == kGoodbye) { + return HandleGoodbye(data); + } + // heartbeat and unrecognized events: no-op. + return std::monostate{}; +} + +FDv2ProtocolHandler::Result FDv2ProtocolHandler::HandleServerIntent( + boost::json::value const& data) { + auto result = boost::json::value_to< + tl::expected, JsonError>>(data); + if (!result) { + Reset(); + return Error::JsonParseError(result.error(), + "could not deserialize server-intent"); + } + if (!result->has_value()) { + Reset(); + return Error::JsonParseError("server-intent data was null"); + } + auto const& intent = **result; + if (intent.payloads.empty()) { + // The protocol requires exactly one payload per server-intent, so + // an empty payloads array is a spec violation. Reset to avoid + // leaking accumulated state from a prior incomplete transfer. + Reset(); + return std::monostate{}; + } + // The protocol defines exactly one payload per intent. + auto const& code = intent.payloads[0].intent_code; + changes_.clear(); + if (code == IntentCode::kTransferFull) { + state_ = State::kFull; + } else if (code == IntentCode::kTransferChanges) { + state_ = State::kPartial; + } else { + // kNone or kUnknown: emit an empty changeset immediately. + state_ = State::kInactive; + return data_model::FDv2ChangeSet{ + data_model::FDv2ChangeSet::Type::kNone, {}, data_model::Selector{}}; + } + return std::monostate{}; +} + +FDv2ProtocolHandler::Result FDv2ProtocolHandler::HandlePutObject( + boost::json::value const& data) { + if (state_ == State::kInactive) { + return std::monostate{}; + } + auto result = boost::json::value_to< + tl::expected, JsonError>>(data); + if (!result) { + Reset(); + return Error::JsonParseError(result.error(), + "could not deserialize put-object"); + } + if (!result->has_value()) { + Reset(); + return Error::JsonParseError("put-object data was null"); + } + auto change = ParsePut(**result); + if (!change) { + Reset(); + return std::move(change.error()); + } + if (*change) { + changes_.push_back(std::move(**change)); + } + return std::monostate{}; +} + +FDv2ProtocolHandler::Result FDv2ProtocolHandler::HandleDeleteObject( + boost::json::value const& data) { + if (state_ == State::kInactive) { + return std::monostate{}; + } + auto result = boost::json::value_to< + tl::expected, JsonError>>(data); + if (!result) { + Reset(); + return Error::JsonParseError(result.error(), + "could not deserialize delete-object"); + } + if (!result->has_value()) { + Reset(); + return Error::JsonParseError("delete-object data was null"); + } + auto const& del = **result; + // Silently skip unknown kinds for forward-compatibility. + if (del.kind != "flag" && del.kind != "segment") { + return std::monostate{}; + } + changes_.push_back(MakeDeleteChange(del)); + return std::monostate{}; +} + +FDv2ProtocolHandler::Result FDv2ProtocolHandler::HandlePayloadTransferred( + boost::json::value const& data) { + if (state_ == State::kInactive) { + Reset(); + return Error::ProtocolError( + "payload-transferred received without an active " + "server-intent"); + } + auto result = boost::json::value_to< + tl::expected, JsonError>>(data); + if (!result) { + Reset(); + return Error::JsonParseError( + result.error(), "could not deserialize payload-transferred"); + } + if (!result->has_value()) { + Reset(); + return Error::JsonParseError("payload-transferred data was null"); + } + auto const& transferred = **result; + auto type = (state_ == State::kPartial) + ? data_model::FDv2ChangeSet::Type::kPartial + : data_model::FDv2ChangeSet::Type::kFull; + data_model::FDv2ChangeSet changeset{ + type, std::move(changes_), + data_model::Selector{data_model::Selector::State{transferred.version, + transferred.state}}}; + Reset(); + return changeset; +} + +FDv2ProtocolHandler::Result FDv2ProtocolHandler::HandleError( + boost::json::value const& data) { + auto result = boost::json::value_to< + tl::expected, JsonError>>(data); + Reset(); + if (!result) { + return Error::JsonParseError(result.error(), + "could not deserialize error event"); + } + if (!result->has_value()) { + return Error::JsonParseError("error event data was null"); + } + return Error::ServerError(std::move(**result)); +} + +FDv2ProtocolHandler::Result FDv2ProtocolHandler::HandleGoodbye( + boost::json::value const& data) { + Reset(); + auto result = + boost::json::value_to, JsonError>>( + data); + // Parse failures are intentionally ignored: the caller should rotate + // sources regardless of whether the reason field is readable. + if (!result) { + return Goodbye{std::nullopt}; + } + if (!result->has_value()) { + return Goodbye{std::nullopt}; + } + return **result; +} + +void FDv2ProtocolHandler::Reset() { + state_ = State::kInactive; + changes_.clear(); +} + +} // namespace launchdarkly diff --git a/libs/internal/src/network/requester.cpp b/libs/internal/src/network/requester.cpp index d9e7bdee8..3ddad5d59 100644 --- a/libs/internal/src/network/requester.cpp +++ b/libs/internal/src/network/requester.cpp @@ -10,37 +10,40 @@ namespace launchdarkly::network { // Abstract interface for the implementation class IRequesterImpl { -public: + public: virtual ~IRequesterImpl() = default; - virtual void Request(HttpRequest request, std::function cb) = 0; + virtual void Request(HttpRequest request, + std::function cb) const = 0; }; #ifdef LD_CURL_NETWORKING // CURL-based implementation class CurlRequesterImpl : public IRequesterImpl { -public: + public: CurlRequesterImpl(net::any_io_executor ctx, TlsOptions const& tls_options) : requester_(ctx, tls_options) {} - void Request(HttpRequest request, std::function cb) override { + void Request(HttpRequest request, + std::function cb) const override { requester_.Request(std::move(request), std::move(cb)); } -private: + private: CurlRequester requester_; }; #else // Boost.Beast-based implementation class AsioRequesterImpl : public IRequesterImpl { -public: + public: AsioRequesterImpl(net::any_io_executor ctx, TlsOptions const& tls_options) : requester_(ctx, tls_options) {} - void Request(HttpRequest request, std::function cb) override { + void Request(HttpRequest request, + std::function cb) const override { requester_.Request(std::move(request), std::move(cb)); } -private: + private: AsioRequester requester_; }; #endif @@ -59,8 +62,9 @@ Requester::~Requester() = default; Requester::Requester(Requester&&) noexcept = default; Requester& Requester::operator=(Requester&&) noexcept = default; -void Requester::Request(HttpRequest request, std::function cb) { +void Requester::Request(HttpRequest request, + std::function cb) const { impl_->Request(std::move(request), std::move(cb)); } -} // namespace launchdarkly::network +} // namespace launchdarkly::network diff --git a/libs/internal/tests/fdv2_protocol_handler_test.cpp b/libs/internal/tests/fdv2_protocol_handler_test.cpp new file mode 100644 index 000000000..4a7d9da0d --- /dev/null +++ b/libs/internal/tests/fdv2_protocol_handler_test.cpp @@ -0,0 +1,373 @@ +#include + +#include + +#include + +using namespace launchdarkly; + +// Minimal valid flag JSON accepted by the existing Flag deserializer. +static char const* const kFlagJson = + R"({"key":"my-flag","on":true,"fallthrough":{"variation":0},)" + R"("variations":[true,false],"version":1})"; + +// Minimal valid segment JSON accepted by the existing Segment deserializer. +static char const* const kSegmentJson = + R"({"key":"my-seg","version":2,"rules":[],"included":[],"excluded":[]})"; + +// Build a server-intent event data value. +static boost::json::value MakeServerIntent(std::string const& intent_code) { + return boost::json::parse( + R"({"payloads":[{"id":"p1","target":1,"intentCode":")" + intent_code + + R"("}]})"); +} + +static boost::json::value MakePutObject(std::string const& kind, + std::string const& key, + std::string const& object_json) { + return boost::json::parse(R"({"version":1,"kind":")" + kind + + R"(","key":")" + key + R"(","object":)" + + object_json + "}"); +} + +static boost::json::value MakeDeleteObject(std::string const& kind, + std::string const& key, + int version) { + return boost::json::parse(R"({"version":)" + std::to_string(version) + + R"(,"kind":")" + kind + R"(","key":")" + key + + R"("})"); +} + +static boost::json::value MakePayloadTransferred(std::string const& state, + int version) { + return boost::json::parse(R"({"state":")" + state + R"(","version":)" + + std::to_string(version) + "}"); +} + +// ============================================================================ +// kNone intent +// ============================================================================ + +TEST(FDv2ProtocolHandlerTest, NoneIntentEmitsEmptyChangeSetImmediately) { + FDv2ProtocolHandler handler; + + auto result = + handler.HandleEvent("server-intent", MakeServerIntent("none")); + + auto* cs = std::get_if(&result); + ASSERT_NE(cs, nullptr); + EXPECT_EQ(cs->type, data_model::FDv2ChangeSet::Type::kNone); + EXPECT_TRUE(cs->changes.empty()); + EXPECT_FALSE(cs->selector.value.has_value()); +} + +// ============================================================================ +// kTransferFull intent +// ============================================================================ + +TEST(FDv2ProtocolHandlerTest, FullIntentEmitsChangeSetOnPayloadTransferred) { + FDv2ProtocolHandler handler; + + auto r1 = + handler.HandleEvent("server-intent", MakeServerIntent("xfer-full")); + EXPECT_TRUE(std::holds_alternative(r1)); + + auto r2 = handler.HandleEvent("put-object", + MakePutObject("flag", "my-flag", kFlagJson)); + EXPECT_TRUE(std::holds_alternative(r2)); + + auto r3 = handler.HandleEvent("payload-transferred", + MakePayloadTransferred("state-abc", 7)); + + auto* cs = std::get_if(&r3); + ASSERT_NE(cs, nullptr); + EXPECT_EQ(cs->type, data_model::FDv2ChangeSet::Type::kFull); + EXPECT_EQ(cs->changes.size(), 1u); + EXPECT_EQ(cs->changes[0].key, "my-flag"); + ASSERT_TRUE(cs->selector.value.has_value()); + EXPECT_EQ(cs->selector.value->state, "state-abc"); + EXPECT_EQ(cs->selector.value->version, 7); +} + +TEST(FDv2ProtocolHandlerTest, FullIntentAccumulatesMultipleObjects) { + FDv2ProtocolHandler handler; + + handler.HandleEvent("server-intent", MakeServerIntent("xfer-full")); + handler.HandleEvent("put-object", + MakePutObject("flag", "flag-1", kFlagJson)); + handler.HandleEvent("put-object", + MakePutObject("flag", "flag-2", kFlagJson)); + handler.HandleEvent("delete-object", + MakeDeleteObject("segment", "seg-1", 5)); + + auto result = handler.HandleEvent("payload-transferred", + MakePayloadTransferred("s", 1)); + + auto* cs = std::get_if(&result); + ASSERT_NE(cs, nullptr); + EXPECT_EQ(cs->type, data_model::FDv2ChangeSet::Type::kFull); + EXPECT_EQ(cs->changes.size(), 3u); +} + +TEST(FDv2ProtocolHandlerTest, + SecondServerIntentMidTransferDiscardsAccumulatedChanges) { + FDv2ProtocolHandler handler; + + // Start a full transfer and accumulate a change. + handler.HandleEvent("server-intent", MakeServerIntent("xfer-full")); + handler.HandleEvent("put-object", + MakePutObject("flag", "flag-1", kFlagJson)); + + // A second server-intent arrives before payload-transferred. + // The first transfer's accumulated changes should be discarded. + auto r = + handler.HandleEvent("server-intent", MakeServerIntent("xfer-changes")); + EXPECT_TRUE(std::holds_alternative(r)); + + // Only the flag from the second transfer should appear in the changeset. + handler.HandleEvent("put-object", + MakePutObject("flag", "flag-2", kFlagJson)); + auto result = handler.HandleEvent("payload-transferred", + MakePayloadTransferred("state-new", 2)); + + auto* cs = std::get_if(&result); + ASSERT_NE(cs, nullptr); + EXPECT_EQ(cs->type, data_model::FDv2ChangeSet::Type::kPartial); + ASSERT_EQ(cs->changes.size(), 1u); + EXPECT_EQ(cs->changes[0].key, "flag-2"); +} + +// ============================================================================ +// kTransferChanges intent +// ============================================================================ + +TEST(FDv2ProtocolHandlerTest, PartialIntentEmitsPartialChangeSet) { + FDv2ProtocolHandler handler; + + handler.HandleEvent("server-intent", MakeServerIntent("xfer-changes")); + handler.HandleEvent("put-object", + MakePutObject("segment", "my-seg", kSegmentJson)); + + auto result = handler.HandleEvent("payload-transferred", + MakePayloadTransferred("state-xyz", 3)); + + auto* cs = std::get_if(&result); + ASSERT_NE(cs, nullptr); + EXPECT_EQ(cs->type, data_model::FDv2ChangeSet::Type::kPartial); + EXPECT_EQ(cs->changes.size(), 1u); + EXPECT_EQ(cs->changes[0].key, "my-seg"); + ASSERT_TRUE(cs->selector.value.has_value()); + EXPECT_EQ(cs->selector.value->state, "state-xyz"); +} + +// ============================================================================ +// Unknown kind in put-object → silently skipped +// ============================================================================ + +TEST(FDv2ProtocolHandlerTest, UnknownKindInPutObjectIsSilentlySkipped) { + FDv2ProtocolHandler handler; + + handler.HandleEvent("server-intent", MakeServerIntent("xfer-full")); + handler.HandleEvent( + "put-object", + MakePutObject("experiment", "exp-1", R"({"key":"exp-1","version":1})")); + handler.HandleEvent("put-object", + MakePutObject("flag", "my-flag", kFlagJson)); + + auto result = handler.HandleEvent("payload-transferred", + MakePayloadTransferred("s", 1)); + + auto* cs = std::get_if(&result); + ASSERT_NE(cs, nullptr); + // Only the known kind (flag) should appear. + EXPECT_EQ(cs->changes.size(), 1u); + EXPECT_EQ(cs->changes[0].key, "my-flag"); +} + +TEST(FDv2ProtocolHandlerTest, UnknownKindInDeleteObjectIsSilentlySkipped) { + FDv2ProtocolHandler handler; + + handler.HandleEvent("server-intent", MakeServerIntent("xfer-full")); + handler.HandleEvent("delete-object", + MakeDeleteObject("experiment", "exp-1", 1)); + handler.HandleEvent("delete-object", + MakeDeleteObject("flag", "my-flag", 2)); + + auto result = handler.HandleEvent("payload-transferred", + MakePayloadTransferred("s", 1)); + + auto* cs = std::get_if(&result); + ASSERT_NE(cs, nullptr); + // Only the known kind (flag) should appear. + EXPECT_EQ(cs->changes.size(), 1u); + EXPECT_EQ(cs->changes[0].key, "my-flag"); +} + +// ============================================================================ +// error event → discard accumulated data, return Error::kServerError +// ============================================================================ + +TEST(FDv2ProtocolHandlerTest, + ErrorEventDiscardsAccumulatedDataAndReturnsError) { + FDv2ProtocolHandler handler; + + handler.HandleEvent("server-intent", MakeServerIntent("xfer-full")); + handler.HandleEvent("put-object", + MakePutObject("flag", "my-flag", kFlagJson)); + + auto result = handler.HandleEvent( + "error", boost::json::parse(R"({"reason":"something went wrong"})")); + + auto* err = std::get_if(&result); + ASSERT_NE(err, nullptr); + EXPECT_EQ(err->kind, FDv2ProtocolHandler::Error::Kind::kServerError); + ASSERT_TRUE(err->server_error.has_value()); + EXPECT_EQ(err->server_error->reason, "something went wrong"); + EXPECT_FALSE(err->server_error->id.has_value()); + + // After the error the handler is reset. A subsequent full transfer should + // produce an empty changeset (no leftover data from before the error). + handler.HandleEvent("server-intent", MakeServerIntent("xfer-full")); + auto result2 = handler.HandleEvent("payload-transferred", + MakePayloadTransferred("s", 1)); + + auto* cs = std::get_if(&result2); + ASSERT_NE(cs, nullptr); + EXPECT_TRUE(cs->changes.empty()); +} + +TEST(FDv2ProtocolHandlerTest, ErrorEventWithIdSetsServerId) { + FDv2ProtocolHandler handler; + + auto result = handler.HandleEvent( + "error", + boost::json::parse(R"({"id":"payload-123","reason":"overloaded"})")); + + auto* err = std::get_if(&result); + ASSERT_NE(err, nullptr); + EXPECT_EQ(err->kind, FDv2ProtocolHandler::Error::Kind::kServerError); + ASSERT_TRUE(err->server_error.has_value()); + ASSERT_TRUE(err->server_error->id.has_value()); + EXPECT_EQ(*err->server_error->id, "payload-123"); + EXPECT_EQ(err->server_error->reason, "overloaded"); +} + +TEST(FDv2ProtocolHandlerTest, MalformedPutObjectReturnsJsonError) { + FDv2ProtocolHandler handler; + + handler.HandleEvent("server-intent", MakeServerIntent("xfer-full")); + + // 'object' field is missing required flag fields — deserialisation fails. + auto result = handler.HandleEvent( + "put-object", + boost::json::parse( + R"({"version":1,"kind":"flag","key":"f","object":{}})")); + + auto* err = std::get_if(&result); + ASSERT_NE(err, nullptr); + EXPECT_EQ(err->kind, FDv2ProtocolHandler::Error::Kind::kJsonError); + EXPECT_TRUE(err->json_error.has_value()); +} + +// ============================================================================ +// goodbye event → return Goodbye +// ============================================================================ + +TEST(FDv2ProtocolHandlerTest, GoodbyeEventReturnsGoodbye) { + FDv2ProtocolHandler handler; + + auto result = handler.HandleEvent( + "goodbye", boost::json::parse(R"({"reason":"shutting down"})")); + + auto* gb = std::get_if(&result); + ASSERT_NE(gb, nullptr); + ASSERT_TRUE(gb->reason.has_value()); + EXPECT_EQ(*gb->reason, "shutting down"); +} + +TEST(FDv2ProtocolHandlerTest, GoodbyeWithoutReasonReturnsGoodbye) { + FDv2ProtocolHandler handler; + + auto result = handler.HandleEvent("goodbye", boost::json::parse(R"({})")); + + auto* gb = std::get_if(&result); + ASSERT_NE(gb, nullptr); + EXPECT_FALSE(gb->reason.has_value()); +} + +// ============================================================================ +// heartbeat → no-op +// ============================================================================ + +TEST(FDv2ProtocolHandlerTest, HeartbeatReturnsMonostate) { + FDv2ProtocolHandler handler; + + auto result = handler.HandleEvent("heartbeat", boost::json::parse(R"({})")); + EXPECT_TRUE(std::holds_alternative(result)); +} + +// ============================================================================ +// Unrecognized event type → no-op +// ============================================================================ + +TEST(FDv2ProtocolHandlerTest, UnknownEventTypeReturnsMonostate) { + FDv2ProtocolHandler handler; + + auto result = + handler.HandleEvent("future-event-type", boost::json::parse(R"({})")); + EXPECT_TRUE(std::holds_alternative(result)); +} + +// ============================================================================ +// put-object and delete-object before server-intent are ignored +// ============================================================================ + +TEST(FDv2ProtocolHandlerTest, PutBeforeServerIntentIsIgnored) { + FDv2ProtocolHandler handler; + + auto r1 = handler.HandleEvent("put-object", + MakePutObject("flag", "my-flag", kFlagJson)); + EXPECT_TRUE(std::holds_alternative(r1)); + + handler.HandleEvent("server-intent", MakeServerIntent("xfer-full")); + auto result = handler.HandleEvent("payload-transferred", + MakePayloadTransferred("s", 1)); + + auto* cs = std::get_if(&result); + ASSERT_NE(cs, nullptr); + EXPECT_TRUE(cs->changes.empty()); +} + +// ============================================================================ +// Reset clears accumulated state +// ============================================================================ + +TEST(FDv2ProtocolHandlerTest, ResetClearsState) { + FDv2ProtocolHandler handler; + + handler.HandleEvent("server-intent", MakeServerIntent("xfer-full")); + handler.HandleEvent("put-object", + MakePutObject("flag", "my-flag", kFlagJson)); + handler.Reset(); + + // After reset, payload-transferred with no prior server-intent produces + // a full changeset with no changes. + handler.HandleEvent("server-intent", MakeServerIntent("xfer-full")); + auto result = handler.HandleEvent("payload-transferred", + MakePayloadTransferred("s", 1)); + + auto* cs = std::get_if(&result); + ASSERT_NE(cs, nullptr); + EXPECT_TRUE(cs->changes.empty()); +} + +TEST(FDv2ProtocolHandlerTest, PayloadTransferredWithoutServerIntentIsError) { + FDv2ProtocolHandler handler; + + auto result = handler.HandleEvent("payload-transferred", + MakePayloadTransferred("s", 1)); + + auto* err = std::get_if(&result); + ASSERT_NE(err, nullptr); + EXPECT_EQ(err->kind, FDv2ProtocolHandler::Error::Kind::kProtocolError); +} diff --git a/libs/internal/tests/promise_test.cpp b/libs/internal/tests/promise_test.cpp index d338bcabd..8d5ecf08c 100644 --- a/libs/internal/tests/promise_test.cpp +++ b/libs/internal/tests/promise_test.cpp @@ -15,7 +15,7 @@ TEST(Promise, SimplePromise) { Future future2 = future.Then( [](int const& inner) { return static_cast(inner * 2.0); }, - [](Continuation f) { f(); }); + kInlineExecutor); promise.Resolve(43); @@ -116,9 +116,8 @@ TEST(Promise, ContinueByReturningFuture) { Promise promise2; Future future2 = promise2.GetFuture(); - Future chained = - promise1.GetFuture().Then([future2](int const&) { return future2; }, - [](Continuation f) { f(); }); + Future chained = promise1.GetFuture().Then( + [future2](int const&) { return future2; }, kInlineExecutor); promise1.Resolve(0); promise2.Resolve(42); @@ -134,8 +133,8 @@ TEST(Promise, ResolvedBeforeContinuation) { promise.Resolve(21); - Future future2 = future.Then([](int const& val) { return val * 2; }, - [](Continuation f) { f(); }); + Future future2 = + future.Then([](int const& val) { return val * 2; }, kInlineExecutor); auto result = future2.WaitForResult(std::chrono::seconds(5)); ASSERT_TRUE(result.has_value()); @@ -146,8 +145,8 @@ TEST(Promise, ResolvedAfterContinuation) { Promise promise; Future future = promise.GetFuture(); - Future future2 = future.Then([](int const& val) { return val * 2; }, - [](Continuation f) { f(); }); + Future future2 = + future.Then([](int const& val) { return val * 2; }, kInlineExecutor); promise.Resolve(21); @@ -174,7 +173,7 @@ TEST(Promise, CopyOnlyCallback) { CopyOnlyInt multiplier{2}; Future future2 = future.Then( [multiplier](int const& val) { return val * multiplier.value; }, - [](Continuation f) { f(); }); + kInlineExecutor); promise.Resolve(21); @@ -193,7 +192,7 @@ TEST(Promise, MoveOnlyCallback) { Future future2 = future.Then([captured = std::move(captured)]( int const& val) { return val * *captured; }, - [](Continuation f) { f(); }); + kInlineExecutor); promise.Resolve(21); @@ -231,7 +230,7 @@ TEST(Promise, CallbackMovedWhenPossible) { future.Then([multiplier = std::move(multiplier)]( int const& val) mutable { return val * multiplier.value; }, - [](Continuation f) { f(); }); + kInlineExecutor); EXPECT_EQ(copies, 0); @@ -251,7 +250,7 @@ TEST(Promise, MonostateVoidLike) { ran = true; return std::monostate{}; }, - [](Continuation f) { f(); }); + kInlineExecutor); promise.Resolve(std::monostate{}); @@ -407,7 +406,7 @@ TEST(WhenAny, MixedTypesFirstWins) { return f1.GetResult().value(); } }, - [](Continuation f) { f(); }); + kInlineExecutor); p1.Resolve("hello"); p0.Resolve(99); diff --git a/libs/server-sdk/src/CMakeLists.txt b/libs/server-sdk/src/CMakeLists.txt index 62a017d41..09f98465b 100644 --- a/libs/server-sdk/src/CMakeLists.txt +++ b/libs/server-sdk/src/CMakeLists.txt @@ -49,6 +49,12 @@ target_sources(${LIBNAME} data_systems/background_sync/detail/payload_filter_validation/payload_filter_validation.cpp data_systems/background_sync/sources/polling/polling_data_source.hpp data_systems/background_sync/sources/polling/polling_data_source.cpp + data_systems/fdv2/fdv2_polling_impl.hpp + data_systems/fdv2/fdv2_polling_impl.cpp + data_systems/fdv2/polling_initializer.hpp + data_systems/fdv2/polling_initializer.cpp + data_systems/fdv2/polling_synchronizer.hpp + data_systems/fdv2/polling_synchronizer.cpp data_systems/background_sync/sources/streaming/streaming_data_source.hpp data_systems/background_sync/sources/streaming/streaming_data_source.cpp data_systems/background_sync/sources/streaming/event_handler.hpp diff --git a/libs/server-sdk/src/data_interfaces/source/ifdv2_initializer.hpp b/libs/server-sdk/src/data_interfaces/source/ifdv2_initializer.hpp index 5c2608cd1..125250b1d 100644 --- a/libs/server-sdk/src/data_interfaces/source/ifdv2_initializer.hpp +++ b/libs/server-sdk/src/data_interfaces/source/ifdv2_initializer.hpp @@ -2,6 +2,8 @@ #include "fdv2_source_result.hpp" +#include + #include namespace launchdarkly::server_side::data_interfaces { @@ -14,13 +16,13 @@ namespace launchdarkly::server_side::data_interfaces { class IFDv2Initializer { public: /** - * Run the initializer to completion. Blocks until a result is available. - * Called at most once per instance. + * Returns a Future that resolves with the result once the initializer + * completes. Called at most once per instance. * * Close() may be called from another thread to unblock Run(), in which - * case Run() returns FDv2SourceResult::Shutdown. + * case the future resolves with FDv2SourceResult::Shutdown. */ - virtual FDv2SourceResult Run() = 0; + virtual async::Future Run() = 0; /** * Unblocks any in-progress Run() call, causing it to return diff --git a/libs/server-sdk/src/data_interfaces/source/ifdv2_synchronizer.hpp b/libs/server-sdk/src/data_interfaces/source/ifdv2_synchronizer.hpp index a56bacee3..96bfc54c0 100644 --- a/libs/server-sdk/src/data_interfaces/source/ifdv2_synchronizer.hpp +++ b/libs/server-sdk/src/data_interfaces/source/ifdv2_synchronizer.hpp @@ -2,6 +2,7 @@ #include "fdv2_source_result.hpp" +#include #include #include @@ -19,24 +20,26 @@ namespace launchdarkly::server_side::data_interfaces { class IFDv2Synchronizer { public: /** - * Block until the next result is available or the timeout expires. + * Returns a Future that resolves with the next result once it is available + * or the timeout expires. * * On the first call, the synchronizer starts its underlying connection. * Subsequent calls continue reading from the same connection. * - * If the timeout expires before a result arrives, returns + * If the timeout expires before a result arrives, the future resolves with * FDv2SourceResult::Timeout. The orchestrator uses this to evaluate * fallback conditions. * * Close() may be called from another thread to unblock Next(), in which - * case Next() returns FDv2SourceResult::Shutdown. + * case the future resolves with FDv2SourceResult::Shutdown. * * @param timeout Maximum time to wait for the next result. * @param selector The selector to send with the request, reflecting any * changesets applied since the previous call. */ - virtual FDv2SourceResult Next(std::chrono::milliseconds timeout, - data_model::Selector selector) = 0; + virtual async::Future Next( + std::chrono::milliseconds timeout, + data_model::Selector selector) = 0; /** * Unblocks any in-progress Next() call, causing it to return diff --git a/libs/server-sdk/src/data_systems/fdv2/fdv2_polling_impl.cpp b/libs/server-sdk/src/data_systems/fdv2/fdv2_polling_impl.cpp new file mode 100644 index 000000000..df6fda9af --- /dev/null +++ b/libs/server-sdk/src/data_systems/fdv2/fdv2_polling_impl.cpp @@ -0,0 +1,203 @@ +#include "fdv2_polling_impl.hpp" + +#include +#include + +#include +#include +#include + +namespace launchdarkly::server_side::data_systems { + +static char const* const kFDv2PollPath = "/sdk/poll"; + +static char const* const kErrorParsingBody = + "Could not parse FDv2 polling response"; +static char const* const kErrorMissingEvents = + "FDv2 polling response missing 'events' array"; +static char const* const kErrorIncompletePayload = + "FDv2 polling response did not contain a complete payload"; + +using data_interfaces::FDv2SourceResult; +using ErrorInfo = FDv2SourceResult::ErrorInfo; +using ErrorKind = ErrorInfo::ErrorKind; + +static ErrorInfo MakeError(ErrorKind kind, + ErrorInfo::StatusCodeType status, + std::string message) { + return ErrorInfo{kind, status, std::move(message), + std::chrono::system_clock::now()}; +} + +network::HttpRequest MakeFDv2PollRequest( + config::built::ServiceEndpoints const& endpoints, + config::built::HttpProperties const& http_properties, + data_model::Selector const& selector, + std::optional const& filter_key) { + config::builders::HttpPropertiesBuilder const builder(http_properties); + + auto parsed = boost::urls::parse_uri(endpoints.PollingBaseUrl()); + if (!parsed) { + return {"", network::HttpMethod::kGet, builder.Build(), + network::HttpRequest::BodyType{}}; + } + + boost::urls::url u = parsed.value(); + u.set_path(u.path() + kFDv2PollPath); + if (selector.value) { + u.params().append({"basis", selector.value->state}); + } + if (filter_key) { + u.params().append({"filter", *filter_key}); + } + + return {std::string(u.buffer()), network::HttpMethod::kGet, builder.Build(), + network::HttpRequest::BodyType{}}; +} + +static FDv2SourceResult ParseFDv2PollEvents( + boost::json::array const& events, + FDv2ProtocolHandler* protocol_handler) { + for (auto const& event_val : events) { + auto const* event_obj = event_val.if_object(); + if (!event_obj) { + continue; + } + + auto const* event_type_val = event_obj->if_contains("event"); + auto const* event_data_val = event_obj->if_contains("data"); + if (!event_type_val || !event_data_val) { + continue; + } + + auto const* event_type_str = event_type_val->if_string(); + if (!event_type_str) { + continue; + } + + auto result = protocol_handler->HandleEvent( + std::string_view{event_type_str->data(), event_type_str->size()}, + *event_data_val); + + if (auto* changeset = std::get_if(&result)) { + return FDv2SourceResult{ + FDv2SourceResult::ChangeSet{std::move(*changeset), false}}; + } + if (auto* goodbye = std::get_if(&result)) { + return FDv2SourceResult{ + FDv2SourceResult::Goodbye{goodbye->reason, false}}; + } + if (auto* error = std::get_if(&result)) { + if (error->kind == FDv2ProtocolHandler::Error::Kind::kServerError) { + auto const& id = error->server_error.value().id; + std::string msg = + "An issue was encountered receiving updates for " + "payload '" + + id.value_or("") + "' with reason: '" + error->message + + "'. Automatic retry will occur."; + return FDv2SourceResult{FDv2SourceResult::Interrupted{ + MakeError(ErrorKind::kErrorResponse, 0, std::move(msg)), + false}}; + } + return FDv2SourceResult{FDv2SourceResult::Interrupted{ + MakeError(ErrorKind::kInvalidData, 0, error->message), false}}; + } + } + + return FDv2SourceResult{FDv2SourceResult::Interrupted{ + MakeError(ErrorKind::kInvalidData, 0, kErrorIncompletePayload), false}}; +} + +static FDv2SourceResult ParseFDv2PollResponse( + std::string const& body, + FDv2ProtocolHandler* protocol_handler) { + boost::system::error_code ec; + auto parsed = boost::json::parse(body, ec); + if (ec) { + return FDv2SourceResult{FDv2SourceResult::Interrupted{ + MakeError(ErrorKind::kInvalidData, 0, kErrorParsingBody), false}}; + } + + auto const* obj = parsed.if_object(); + if (!obj) { + return FDv2SourceResult{FDv2SourceResult::Interrupted{ + MakeError(ErrorKind::kInvalidData, 0, kErrorParsingBody), false}}; + } + + auto const* events_val = obj->if_contains("events"); + if (!events_val) { + return FDv2SourceResult{FDv2SourceResult::Interrupted{ + MakeError(ErrorKind::kInvalidData, 0, kErrorMissingEvents), false}}; + } + + auto const* events_arr = events_val->if_array(); + if (!events_arr) { + return FDv2SourceResult{FDv2SourceResult::Interrupted{ + MakeError(ErrorKind::kInvalidData, 0, kErrorMissingEvents), false}}; + } + + return ParseFDv2PollEvents(*events_arr, protocol_handler); +} + +data_interfaces::FDv2SourceResult HandleFDv2PollResponse( + network::HttpResult const& res, + FDv2ProtocolHandler* protocol_handler, + Logger const& logger, + std::string_view identity) { + if (res.IsError()) { + auto const& msg = res.ErrorMessage(); + std::string error_msg = msg.has_value() ? *msg : "unknown error"; + LD_LOG(logger, LogLevel::kWarn) << identity << ": " << error_msg; + return FDv2SourceResult{FDv2SourceResult::Interrupted{ + MakeError(ErrorKind::kNetworkError, 0, std::move(error_msg)), + false}}; + } + + if (res.Status() == 304) { + return FDv2SourceResult{FDv2SourceResult::ChangeSet{ + data_model::FDv2ChangeSet{ + data_model::FDv2ChangeSet::Type::kNone, {}, {}}, + false}}; + } + + if (res.Status() == 200) { + auto const& body = res.Body(); + if (!body) { + return FDv2SourceResult{FDv2SourceResult::Interrupted{ + MakeError(ErrorKind::kInvalidData, 0, + "polling response contained no body"), + false}}; + } + + auto result = ParseFDv2PollResponse(*body, protocol_handler); + if (auto* interrupted = + std::get_if(&result.value)) { + if (interrupted->error.Kind() == ErrorKind::kErrorResponse) { + LD_LOG(logger, LogLevel::kInfo) + << identity << ": " << interrupted->error.Message(); + } else { + LD_LOG(logger, LogLevel::kError) + << identity << ": " << interrupted->error.Message(); + } + } + return result; + } + + if (network::IsRecoverableStatus(res.Status())) { + std::string msg = network::ErrorForStatusCode( + res.Status(), "FDv2 polling request", "will retry"); + LD_LOG(logger, LogLevel::kWarn) << identity << ": " << msg; + return FDv2SourceResult{FDv2SourceResult::Interrupted{ + MakeError(ErrorKind::kErrorResponse, res.Status(), std::move(msg)), + false}}; + } + + std::string msg = network::ErrorForStatusCode( + res.Status(), "FDv2 polling request", std::nullopt); + LD_LOG(logger, LogLevel::kError) << identity << ": " << msg; + return FDv2SourceResult{FDv2SourceResult::TerminalError{ + MakeError(ErrorKind::kErrorResponse, res.Status(), std::move(msg)), + false}}; +} + +} // namespace launchdarkly::server_side::data_systems diff --git a/libs/server-sdk/src/data_systems/fdv2/fdv2_polling_impl.hpp b/libs/server-sdk/src/data_systems/fdv2/fdv2_polling_impl.hpp new file mode 100644 index 000000000..086d3c8db --- /dev/null +++ b/libs/server-sdk/src/data_systems/fdv2/fdv2_polling_impl.hpp @@ -0,0 +1,33 @@ +#pragma once + +#include "../../data_interfaces/source/fdv2_source_result.hpp" + +#include +#include +#include +#include +#include + +#include +#include +#include + +namespace launchdarkly::server_side::data_systems { + +// Build a polling HTTP GET request for the FDv2 endpoint. +network::HttpRequest MakeFDv2PollRequest( + config::built::ServiceEndpoints const& endpoints, + config::built::HttpProperties const& http_properties, + data_model::Selector const& selector, + std::optional const& filter_key); + +// Parse an HTTP response from the FDv2 polling endpoint through the protocol +// handler and return the appropriate result. identity is used in log messages +// to identify the caller (e.g. "FDv2 polling initializer"). +data_interfaces::FDv2SourceResult HandleFDv2PollResponse( + network::HttpResult const& res, + FDv2ProtocolHandler* protocol_handler, + Logger const& logger, + std::string_view identity); + +} // namespace launchdarkly::server_side::data_systems diff --git a/libs/server-sdk/src/data_systems/fdv2/polling_initializer.cpp b/libs/server-sdk/src/data_systems/fdv2/polling_initializer.cpp new file mode 100644 index 000000000..174edd040 --- /dev/null +++ b/libs/server-sdk/src/data_systems/fdv2/polling_initializer.cpp @@ -0,0 +1,82 @@ +#include "polling_initializer.hpp" +#include "fdv2_polling_impl.hpp" + +#include +#include + +namespace launchdarkly::server_side::data_systems { + +static char const* const kIdentity = "FDv2 polling initializer"; + +using data_interfaces::FDv2SourceResult; + +FDv2PollingInitializer::FDv2PollingInitializer( + boost::asio::any_io_executor const& executor, + Logger const& logger, + config::built::ServiceEndpoints const& endpoints, + config::built::HttpProperties const& http_properties, + data_model::Selector selector, + std::optional filter_key) + : request_(MakeFDv2PollRequest(endpoints, + http_properties, + std::move(selector), + std::move(filter_key))), + requester_(executor, http_properties.Tls()), + state_(std::make_shared(logger)) {} + +FDv2PollingInitializer::~FDv2PollingInitializer() { + close_promise_.Resolve(std::monostate{}); +} + +async::Future FDv2PollingInitializer::Run() { + if (!request_.Valid()) { + LD_LOG(state_->logger, LogLevel::kError) + << kIdentity << ": invalid polling endpoint URL"; + using ErrorInfo = FDv2SourceResult::ErrorInfo; + return async::MakeFuture( + FDv2SourceResult{FDv2SourceResult::TerminalError{ + ErrorInfo{ErrorInfo::ErrorKind::kUnknown, 0, + "invalid polling endpoint URL", + std::chrono::system_clock::now()}, + false}}); + } + + // Promisify the callback-based HTTP request. + auto http_promise = std::make_shared>(); + auto http_future = http_promise->GetFuture(); + requester_.Request(request_, [hp = std::move(http_promise)]( + network::HttpResult const& res) mutable { + hp->Resolve(res); + }); + + // Race: HTTP result (0) vs close (1). + return async::WhenAny(http_future, close_promise_.GetFuture()) + .Then( + [state = state_, http_future = std::move(http_future)]( + std::size_t const& idx) -> FDv2SourceResult { + if (idx == 1) { + return FDv2SourceResult{FDv2SourceResult::Shutdown{}}; + } + return HandlePollResult(state, *http_future.GetResult()); + }, + async::kInlineExecutor); +} + +void FDv2PollingInitializer::Close() { + close_promise_.Resolve(std::monostate{}); +} + +std::string const& FDv2PollingInitializer::Identity() const { + static std::string const identity = kIdentity; + return identity; +} + +FDv2SourceResult FDv2PollingInitializer::HandlePollResult( + std::shared_ptr state, + network::HttpResult const& res) { + FDv2ProtocolHandler protocol_handler; + return HandleFDv2PollResponse(res, &protocol_handler, state->logger, + kIdentity); +} + +} // namespace launchdarkly::server_side::data_systems diff --git a/libs/server-sdk/src/data_systems/fdv2/polling_initializer.hpp b/libs/server-sdk/src/data_systems/fdv2/polling_initializer.hpp new file mode 100644 index 000000000..0fb87c4d0 --- /dev/null +++ b/libs/server-sdk/src/data_systems/fdv2/polling_initializer.hpp @@ -0,0 +1,78 @@ +#pragma once + +#include "../../data_interfaces/source/ifdv2_initializer.hpp" + +#include +#include +#include +#include +#include + +#include + +#include +#include +#include + +namespace launchdarkly::server_side::data_systems { + +/** + * FDv2 polling initializer. Makes a single HTTP GET to the FDv2 polling + * endpoint, parses the response via the FDv2 protocol state machine, and + * returns the result. + * + * Threading model: + * Run() should only be called once at a time. + * Close() may be called concurrently with Run(). + * This object may be safely destroyed once no call to Run() or Close() + * is in progress. + */ +class FDv2PollingInitializer final : public data_interfaces::IFDv2Initializer { + public: + /** + * Constructs an initializer for a single poll request. + * If filter_key is present, only the specified payload filter is requested. + */ + FDv2PollingInitializer(boost::asio::any_io_executor const& executor, + Logger const& logger, + config::built::ServiceEndpoints const& endpoints, + config::built::HttpProperties const& http_properties, + data_model::Selector selector, + std::optional filter_key); + + ~FDv2PollingInitializer() override; + + async::Future Run() override; + + void Close() override; + + [[nodiscard]] std::string const& Identity() const override; + + private: + // State needed by async callbacks. Shared so callbacks can safely + // outlive 'this'. + struct State { + // Logger is itself thread-safe. + Logger logger; + + explicit State(Logger logger) : logger(std::move(logger)) {} + }; + + /** Interprets an HTTP response as a source result. */ + static data_interfaces::FDv2SourceResult HandlePollResult( + std::shared_ptr state, + network::HttpResult const& res); + + // Immutable state + network::HttpRequest const request_; + network::Requester const requester_; + + // Resolved when Close() is called (or this object is destroyed), + // cancelling any outstanding Run(). + async::Promise close_promise_; + + // Shared with async callbacks. + std::shared_ptr state_; +}; + +} // namespace launchdarkly::server_side::data_systems diff --git a/libs/server-sdk/src/data_systems/fdv2/polling_synchronizer.cpp b/libs/server-sdk/src/data_systems/fdv2/polling_synchronizer.cpp new file mode 100644 index 000000000..a8dabaa54 --- /dev/null +++ b/libs/server-sdk/src/data_systems/fdv2/polling_synchronizer.cpp @@ -0,0 +1,189 @@ +#include "polling_synchronizer.hpp" +#include "fdv2_polling_impl.hpp" + +#include +#include +#include + +#include + +namespace launchdarkly::server_side::data_systems { + +static char const* const kIdentity = "FDv2 polling synchronizer"; + +// Minimum polling interval to prevent accidentally hammering the service. +static constexpr std::chrono::seconds kMinPollInterval{30}; + +using data_interfaces::FDv2SourceResult; + +FDv2PollingSynchronizer::State::State( + Logger logger, + boost::asio::any_io_executor const& executor, + std::chrono::seconds poll_interval, + config::built::ServiceEndpoints const& endpoints, + config::built::HttpProperties const& http_properties, + std::optional filter_key) + : logger_(std::move(logger)), + poll_interval_(std::max(poll_interval, kMinPollInterval)), + endpoints_(endpoints), + http_properties_(http_properties), + filter_key_(std::move(filter_key)), + requester_(executor, http_properties.Tls()), + executor_(executor) {} + +async::Future FDv2PollingSynchronizer::State::Request( + data_model::Selector const& selector) const { + auto request = MakeFDv2PollRequest(endpoints_, http_properties_, selector, + filter_key_); + + // Promise must be in a shared_ptr because Requester requires callbacks + // to be copy-constructible (stored in std::function). + auto promise = std::make_shared>(); + auto future = promise->GetFuture(); + requester_.Request(request, [promise = std::move(promise)]( + network::HttpResult const& res) mutable { + promise->Resolve(res); + }); + return future; +} + +FDv2SourceResult FDv2PollingSynchronizer::State::HandlePollResult( + network::HttpResult const& res) { + FDv2ProtocolHandler protocol_handler; + return HandleFDv2PollResponse(res, &protocol_handler, logger_, kIdentity); +} + +async::Future FDv2PollingSynchronizer::State::Delay( + std::chrono::nanoseconds duration) { + return async::Delay(executor_, duration); +} + +async::Future FDv2PollingSynchronizer::State::CreatePollDelayFuture() { + std::lock_guard lock(mutex_); + if (!last_poll_start_) { + return async::MakeFuture(true); + } + auto now = std::chrono::steady_clock::now(); + auto elapsed = now - *last_poll_start_; + if (elapsed >= poll_interval_) { + return async::MakeFuture(true); + } + return Delay(poll_interval_ - elapsed); +} + +void FDv2PollingSynchronizer::State::RecordPollStarted() { + std::lock_guard lock(mutex_); + last_poll_start_ = std::chrono::steady_clock::now(); +} + +FDv2PollingSynchronizer::FDv2PollingSynchronizer( + boost::asio::any_io_executor const& executor, + Logger const& logger, + config::built::ServiceEndpoints const& endpoints, + config::built::HttpProperties const& http_properties, + std::optional filter_key, + std::chrono::seconds poll_interval) + : state_(std::make_shared(logger, + executor, + poll_interval, + endpoints, + http_properties, + std::move(filter_key))) { + if (poll_interval < kMinPollInterval) { + LD_LOG(logger, LogLevel::kWarn) + << kIdentity << ": polling interval too frequent, defaulting to " + << kMinPollInterval.count() << " seconds"; + } +} + +FDv2PollingSynchronizer::~FDv2PollingSynchronizer() { + Close(); +} + +async::Future FDv2PollingSynchronizer::Next( + std::chrono::milliseconds timeout, + data_model::Selector selector) { + return DoNext(state_, close_promise_.GetFuture(), timeout, + std::move(selector)); +} + +void FDv2PollingSynchronizer::Close() { + close_promise_.Resolve(std::monostate{}); +} + +std::string const& FDv2PollingSynchronizer::Identity() const { + static std::string const identity = kIdentity; + return identity; +} + +/* static */ async::Future FDv2PollingSynchronizer::DoNext( + std::shared_ptr state, + async::Future closed, + std::chrono::milliseconds timeout, + data_model::Selector selector) { + if (closed.IsFinished()) { + return async::MakeFuture( + FDv2SourceResult{FDv2SourceResult::Shutdown{}}); + } + + auto now = std::chrono::steady_clock::now(); + auto timeout_deadline = now + timeout; + auto timeout_future = state->Delay(timeout); + + // Figure out how much to delay before starting. + auto delay_future = state->CreatePollDelayFuture(); + + return async::WhenAny(closed, std::move(timeout_future), + std::move(delay_future)) + .Then( + [state = std::move(state), closed = std::move(closed), + timeout_deadline, + selector = std::move(selector)](std::size_t const& idx) mutable + -> async::Future { + if (idx == 0) { + return async::MakeFuture( + FDv2SourceResult{FDv2SourceResult::Shutdown{}}); + } + if (idx == 1) { + return async::MakeFuture( + FDv2SourceResult{FDv2SourceResult::Timeout{}}); + } + return DoPoll(std::move(state), std::move(closed), + timeout_deadline, selector); + }, + async::kInlineExecutor); +} + +/* static */ async::Future FDv2PollingSynchronizer::DoPoll( + std::shared_ptr state, + async::Future closed, + std::chrono::time_point timeout_deadline, + data_model::Selector const& selector) { + if (closed.IsFinished()) { + return async::MakeFuture( + FDv2SourceResult{FDv2SourceResult::Shutdown{}}); + } + + state->RecordPollStarted(); + + auto now = std::chrono::steady_clock::now(); + auto timeout_future = state->Delay(timeout_deadline - now); + auto http_future = state->Request(selector); + + return async::WhenAny(std::move(closed), std::move(timeout_future), + http_future) + .Then( + [state = std::move(state), http_future = std::move(http_future)]( + std::size_t const& idx) -> FDv2SourceResult { + if (idx == 0) { + return FDv2SourceResult{FDv2SourceResult::Shutdown{}}; + } + if (idx == 1) { + return FDv2SourceResult{FDv2SourceResult::Timeout{}}; + } + return state->HandlePollResult(*http_future.GetResult()); + }, + async::kInlineExecutor); +} + +} // namespace launchdarkly::server_side::data_systems diff --git a/libs/server-sdk/src/data_systems/fdv2/polling_synchronizer.hpp b/libs/server-sdk/src/data_systems/fdv2/polling_synchronizer.hpp new file mode 100644 index 000000000..42f20bfe1 --- /dev/null +++ b/libs/server-sdk/src/data_systems/fdv2/polling_synchronizer.hpp @@ -0,0 +1,136 @@ +#pragma once + +#include "../../data_interfaces/source/ifdv2_synchronizer.hpp" + +#include +#include +#include + +#include + +#include +#include +#include +#include + +namespace launchdarkly::server_side::data_systems { + +/** + * FDv2 polling synchronizer. Repeatedly polls the FDv2 polling endpoint at + * a configurable interval. + * + * The caller passes the current selector into each Next() call, allowing the + * orchestrator to reflect applied changesets without any shared state. + * + * Threading model: + * Next() should only be called once at a time. + * Close() may be called concurrently with Next(). + * This object may be safely destroyed once no call to Next() or Close() + * is in progress. + */ +class FDv2PollingSynchronizer final + : public data_interfaces::IFDv2Synchronizer { + public: + /** + * Constructs a synchronizer that polls at the given interval. + * If filter_key is present, only the specified payload filter is requested. + */ + FDv2PollingSynchronizer( + boost::asio::any_io_executor const& executor, + Logger const& logger, + config::built::ServiceEndpoints const& endpoints, + config::built::HttpProperties const& http_properties, + std::optional filter_key, + std::chrono::seconds poll_interval); + + ~FDv2PollingSynchronizer() override; + + async::Future Next( + std::chrono::milliseconds timeout, + data_model::Selector selector) override; + + void Close() override; + + [[nodiscard]] std::string const& Identity() const override; + + private: + // Any state that may be accessed by async callbacks needs to be inside this + // class, managed by a shared_ptr. All mutable members are guarded by the + // mutex. + class State { + public: + State(Logger logger, + boost::asio::any_io_executor const& executor, + std::chrono::seconds poll_interval, + config::built::ServiceEndpoints const& endpoints, + config::built::HttpProperties const& http_properties, + std::optional filter_key); + + /** Issues an async HTTP poll request and returns a Future resolving + * with the result. */ + async::Future Request( + data_model::Selector const& selector) const; + + /** Interprets an HTTP response as a source result. */ + data_interfaces::FDv2SourceResult HandlePollResult( + network::HttpResult const& res); + + /** Returns a Future that resolves when it is time to start the next + * poll. */ + async::Future CreatePollDelayFuture(); + + /** Records that a poll has started, for interval scheduling. */ + void RecordPollStarted(); + + /** Returns a Future that resolves after the given duration. */ + async::Future Delay(std::chrono::nanoseconds duration); + + private: + // Logger is itself thread-safe. + Logger logger_; + + // Immutable state + std::chrono::seconds const poll_interval_; + config::built::ServiceEndpoints const endpoints_; + config::built::HttpProperties const http_properties_; + std::optional const filter_key_; + network::Requester const requester_; + boost::asio::any_io_executor const executor_; + + // Mutable state, guarded by mutex_. + std::mutex mutex_; + std::optional> + last_poll_start_; + }; + + /** + * Waits for the poll interval, then delegates to DoPoll. + * Resolves with Shutdown if closed, or Timeout if the timeout expires + * first. + */ + static async::Future DoNext( + std::shared_ptr state, + async::Future closed, + std::chrono::milliseconds timeout, + data_model::Selector selector); + + /** + * Issues a single HTTP poll request and returns the result. + * Resolves with Shutdown if closed, or Timeout if timeout_deadline passes + * first. + */ + static async::Future DoPoll( + std::shared_ptr state, + async::Future closed, + std::chrono::time_point timeout_deadline, + data_model::Selector const& selector); + + // Resolved by Close() or on destruction, cancelling any outstanding Next() + // calls. + async::Promise close_promise_; + + // Shared with async callbacks. + std::shared_ptr state_; +}; + +} // namespace launchdarkly::server_side::data_systems