diff --git a/libs/internal/include/launchdarkly/async/cancellation.hpp b/libs/internal/include/launchdarkly/async/cancellation.hpp new file mode 100644 index 000000000..ca1e70be3 --- /dev/null +++ b/libs/internal/include/launchdarkly/async/cancellation.hpp @@ -0,0 +1,242 @@ +#pragma once + +#include + +#include +#include +#include +#include +#include + +// This file implements a cancellation primitive modelled after C++20's +// std::stop_source / std::stop_token / std::stop_callback design: a source +// owns the ability to trigger cancellation, lightweight tokens (derived from +// the source) can be freely passed around, and CancellationCallback provides +// RAII registration of a callback tied to a token. + +namespace launchdarkly::async { + +// CancellationState is the shared state between a CancellationSource and all +// CancellationTokens and CancellationCallbacks derived from it. This is an +// internal class; use CancellationSource, CancellationToken, and +// CancellationCallback instead. +class CancellationState { + public: + using CallbackId = std::size_t; + + // Sentinel returned by Register() when the state was already cancelled; + // the callback is invoked immediately in that case. Deregister() is a + // no-op for this value. + static constexpr CallbackId kAlreadyCancelled = 0; + + CancellationState() = default; + ~CancellationState() = default; + CancellationState(CancellationState const&) = delete; + CancellationState& operator=(CancellationState const&) = delete; + CancellationState(CancellationState&&) = delete; + CancellationState& operator=(CancellationState&&) = delete; + + // Registers a callback and returns its ID. If Cancel() has already been + // called, invokes cb immediately (outside the lock) and returns + // kAlreadyCancelled. + CallbackId Register(Continuation cb) { + std::unique_lock lock(mutex_); + if (cancelled_) { + lock.unlock(); + cb(); + return kAlreadyCancelled; + } + CallbackId id = next_id_++; + callbacks_.emplace(id, std::move(cb)); + return id; + } + + // Deregisters the callback with the given ID. If the callback is currently + // executing on another thread, blocks until execution completes. This + // mirrors the synchronization guarantee of C++20's stop_callback + // destructor: after Deregister returns, the callback is guaranteed to have + // either never run or fully completed. No-op if id is kAlreadyCancelled or + // the callback has already run. + void Deregister(CallbackId id) { + if (id == kAlreadyCancelled) { + return; + } + + std::unique_lock lock(mutex_); + + // The callback is still pending: remove it before it can run. + if (callbacks_.erase(id)) { + return; + } + + // The callback has already run, or was never registered. + if (executing_id_ != id) { + return; + } + + // The callback is executing on this thread (re-entrant call from + // within the callback itself): return without waiting to avoid + // deadlock. + if (executing_thread_ == std::this_thread::get_id()) { + return; + } + + // The callback is executing on another thread. Wait for it to + // finish. executing_id_ is set while the state lock is held — before + // unlocking for invocation — so there is no window where the callback + // is running but executing_id_ is not yet set. + executing_done_.wait(lock, [this, id] { return executing_id_ != id; }); + } + + // Invokes all registered callbacks in registration order, then clears the + // pending list. Callbacks are executed one at a time with the lock + // released during each invocation to prevent deadlocks. No-op if called + // more than once. + void Cancel() { + std::unique_lock lock(mutex_); + if (cancelled_) { + return; + } + cancelled_ = true; + + while (!callbacks_.empty()) { + // Extract the next entry while still holding the lock, then set + // executing_id_ before releasing. This ensures Deregister can + // never observe a window where the callback is running but + // executing_id_ is unset. + auto node = callbacks_.extract(callbacks_.begin()); + executing_id_ = node.key(); + executing_thread_ = std::this_thread::get_id(); + + lock.unlock(); + node.mapped()(); + lock.lock(); + + executing_id_ = kAlreadyCancelled; + lock.unlock(); + executing_done_.notify_all(); + lock.lock(); + } + } + + // Returns true if Cancel() has been called. + bool IsCancelled() const { + std::lock_guard lock(mutex_); + return cancelled_; + } + + private: + mutable std::mutex mutex_; + bool cancelled_ = false; + CallbackId next_id_ = 1; // Real IDs start at 1; 0 is kAlreadyCancelled. + std::map> callbacks_; + + // Tracks which callback (if any) is currently being invoked by Cancel(), + // and on which thread, to support the blocking destructor in Deregister. + CallbackId executing_id_ = kAlreadyCancelled; + std::thread::id executing_thread_; + std::condition_variable executing_done_; +}; + +class CancellationToken; + +// CancellationSource is the write end of a cancellation pair: call Cancel() +// to signal all operations holding tokens derived from this source. +// +// CancellationSource is copyable; copies share the same underlying +// CancellationState, matching the behaviour of C++20's stop_source. +class CancellationSource { + public: + CancellationSource() : state_(std::make_shared()) {} + + ~CancellationSource() = default; + CancellationSource(CancellationSource const&) = default; + CancellationSource& operator=(CancellationSource const&) = default; + CancellationSource(CancellationSource&&) = default; + CancellationSource& operator=(CancellationSource&&) = default; + + // Invokes all registered callbacks in registration order. No-op if called + // more than once. + void Cancel() { state_->Cancel(); } + + // Returns true if Cancel() has been called. + bool IsCancelled() const { return state_->IsCancelled(); } + + // Returns a token referring to this source's cancellation state. The + // token may be freely copied and passed to any number of + // CancellationCallbacks. + CancellationToken GetToken() const; + + private: + std::shared_ptr state_; +}; + +// CancellationToken is the read end of a cancellation pair. Tokens are +// obtained from CancellationSource::GetToken() and passed to +// CancellationCallback constructors to register callbacks. +// +// A default-constructed token has no associated state: any +// CancellationCallback constructed from it is never invoked. +// +// CancellationToken is cheap to copy; all copies share the same underlying +// CancellationState. +class CancellationToken { + public: + CancellationToken() = default; + + explicit CancellationToken(std::shared_ptr state) + : state_(std::move(state)) {} + + // Returns true if the associated source has been cancelled, or false if + // there is no associated source. + bool IsCancelled() const { return state_ && state_->IsCancelled(); } + + private: + std::shared_ptr state_; + + friend class CancellationCallback; +}; + +inline CancellationToken CancellationSource::GetToken() const { + return CancellationToken(state_); +} + +// CancellationCallback registers a callback to be invoked when the associated +// CancellationSource is cancelled. The callback is invoked on whichever thread +// calls CancellationSource::Cancel(). +// +// The design follows C++20's std::stop_callback: +// +// - Constructing a CancellationCallback registers the callback. If the +// source was already cancelled, the callback is invoked immediately in the +// constructor. +// - Destroying a CancellationCallback deregisters the callback. If the +// callback is currently executing on another thread, the destructor blocks +// until execution completes, preventing use-after-free of anything captured +// by the callback. +// - CancellationCallback is non-copyable and non-movable, matching C++20's +// stop_callback. +class CancellationCallback { + public: + CancellationCallback(CancellationToken token, Continuation cb) + : state_(token.state_), + id_(state_ ? state_->Register(std::move(cb)) + : CancellationState::kAlreadyCancelled) {} + + ~CancellationCallback() { + if (state_) { + state_->Deregister(id_); + } + } + + CancellationCallback(CancellationCallback const&) = delete; + CancellationCallback& operator=(CancellationCallback const&) = delete; + CancellationCallback(CancellationCallback&&) = delete; + CancellationCallback& operator=(CancellationCallback&&) = delete; + + private: + std::shared_ptr state_; + CancellationState::CallbackId id_; +}; + +} // namespace launchdarkly::async diff --git a/libs/internal/include/launchdarkly/async/promise.hpp b/libs/internal/include/launchdarkly/async/promise.hpp index 198e84fdb..030f94680 100644 --- a/libs/internal/include/launchdarkly/async/promise.hpp +++ b/libs/internal/include/launchdarkly/async/promise.hpp @@ -54,7 +54,9 @@ class Continuation { // function pointer, or other callable; it need not be copy-constructible. // F&& is a forwarding reference: accepts any callable by move or copy, // then moves it into Impl so Continuation itself owns the callable. - template + template , Continuation>>> Continuation(F&& f) : impl_(std::make_unique>>(std::forward(f))) {} Continuation(Continuation&&) = default; diff --git a/libs/internal/include/launchdarkly/async/timer.hpp b/libs/internal/include/launchdarkly/async/timer.hpp index ce62889e0..acc140663 100644 --- a/libs/internal/include/launchdarkly/async/timer.hpp +++ b/libs/internal/include/launchdarkly/async/timer.hpp @@ -1,9 +1,11 @@ #pragma once +#include #include #include #include +#include #include #include @@ -15,17 +17,55 @@ namespace launchdarkly::async { // Returns a Future that resolves once the given duration elapses. // The future resolves with true if the timer fired normally, or false if // the timer was cancelled before it expired. +// +// If a CancellationToken is provided, cancelling the associated +// CancellationSource cancels the timer, resolving the future with false. template Future Delay(boost::asio::any_io_executor executor, - std::chrono::duration duration) { + std::chrono::duration duration, + CancellationToken token = {}) { auto timer = std::make_shared(executor); timer->expires_after(duration); Promise promise; auto future = promise.GetFuture(); - timer->async_wait([p = std::move(promise), - timer](boost::system::error_code code) mutable { + + // This code is tricky because there are a few constraints that conflict. + // 1. We need to make sure timer->cancel isn't called _before_ + // timer->async_wait, or else it'll just be ignored. + // 2. The cancellation_callback has to be created _before_ + // timer->async_wait, because it has to be captured by async_wait's + // handler, because it is an RAII type, and once it is destroyed, it + // deregisters itself. It has to stay alive as long as the timer needs to + // be cancellable. + + Promise timer_started_promise; + Future timer_started_future = + timer_started_promise.GetFuture(); + + auto cancel_timer = [timer, executor, + timer_started_future = + std::move(timer_started_future)]() mutable { + timer_started_future.Then( + [timer](auto const&) -> std::monostate { + timer->cancel(); + return {}; + }, + [executor](Continuation f) { + boost::asio::post(executor, std::move(f)); + }); + }; + + auto cancellation_callback = + std::make_shared(std::move(token), cancel_timer); + + timer->async_wait([p = std::move(promise), timer, cancellation_callback]( + boost::system::error_code code) mutable { + cancellation_callback.reset(); p.Resolve(code != boost::asio::error::operation_aborted); }); + + timer_started_promise.Resolve({}); + return future; } diff --git a/libs/internal/tests/cancellation_test.cpp b/libs/internal/tests/cancellation_test.cpp new file mode 100644 index 000000000..b8c858600 --- /dev/null +++ b/libs/internal/tests/cancellation_test.cpp @@ -0,0 +1,241 @@ +#include + +#include + +#include +#include +#include +#include + +#include "launchdarkly/async/cancellation.hpp" +#include "launchdarkly/async/timer.hpp" + +using namespace launchdarkly::async; + +// A default-constructed token has no associated source, so a callback +// registered on it is never invoked. +TEST(Cancellation, DefaultToken_CallbackNeverInvoked) { + CancellationToken token; + + bool invoked = false; + CancellationCallback cb(token, [&invoked] { invoked = true; }); + + EXPECT_FALSE(invoked); + EXPECT_FALSE(token.IsCancelled()); +} + +// Cancelling before a callback is registered invokes the callback immediately +// inside the CancellationCallback constructor. +TEST(Cancellation, Cancel_BeforeRegistration_InvokesImmediately) { + CancellationSource source; + source.Cancel(); + + bool invoked = false; + CancellationCallback cb(source.GetToken(), [&invoked] { invoked = true; }); + + EXPECT_TRUE(invoked); +} + +// The normal case: cancel after registration invokes the callback. +TEST(Cancellation, Cancel_AfterRegistration_InvokesCallback) { + CancellationSource source; + + bool invoked = false; + CancellationCallback cb(source.GetToken(), [&invoked] { invoked = true; }); + + EXPECT_FALSE(invoked); + source.Cancel(); + EXPECT_TRUE(invoked); +} + +// Cancelling more than once is a no-op: callbacks are invoked exactly once. +TEST(Cancellation, Cancel_Idempotent_CallbackInvokedOnce) { + CancellationSource source; + + int count = 0; + CancellationCallback cb(source.GetToken(), [&count] { count++; }); + + source.Cancel(); + source.Cancel(); + source.Cancel(); + + EXPECT_EQ(count, 1); +} + +// Destroying a CancellationCallback before Cancel() deregisters it; the +// callback is not invoked when Cancel() is later called. +TEST(Cancellation, Deregister_BeforeCancel_CallbackNotInvoked) { + CancellationSource source; + + bool invoked = false; + { + CancellationCallback cb(source.GetToken(), + [&invoked] { invoked = true; }); + } + + source.Cancel(); + + EXPECT_FALSE(invoked); +} + +// Destroying a CancellationCallback after Cancel() has already run does not +// crash or block; it is a no-op. +TEST(Cancellation, Deregister_AfterCancel_NoOp) { + CancellationSource source; + CancellationToken token = source.GetToken(); + + auto cb = std::make_unique(token, [] {}); + source.Cancel(); + cb.reset(); // should not crash or block +} + +// All registered callbacks are invoked when Cancel() is called. +TEST(Cancellation, MultipleCallbacks_AllInvoked) { + CancellationSource source; + CancellationToken token = source.GetToken(); + + int a = 0, b = 0, c = 0; + CancellationCallback cb1(token, [&a] { a++; }); + CancellationCallback cb2(token, [&b] { b++; }); + CancellationCallback cb3(token, [&c] { c++; }); + + source.Cancel(); + + EXPECT_EQ(a, 1); + EXPECT_EQ(b, 1); + EXPECT_EQ(c, 1); +} + +// Only callbacks that have not been deregistered are invoked. +TEST(Cancellation, PartialDeregister_OnlyActiveCallbacksInvoked) { + CancellationSource source; + CancellationToken token = source.GetToken(); + + int a = 0, b = 0, c = 0; + CancellationCallback cb1(token, [&a] { a++; }); + auto cb2 = std::make_unique(token, [&b] { b++; }); + CancellationCallback cb3(token, [&c] { c++; }); + + cb2.reset(); + source.Cancel(); + + EXPECT_EQ(a, 1); + EXPECT_EQ(b, 0); + EXPECT_EQ(c, 1); +} + +// IsCancelled reflects the state of the source correctly. +TEST(Cancellation, IsCancelled_BeforeAndAfterCancel) { + CancellationSource source; + CancellationToken token = source.GetToken(); + + EXPECT_FALSE(source.IsCancelled()); + EXPECT_FALSE(token.IsCancelled()); + + source.Cancel(); + + EXPECT_TRUE(source.IsCancelled()); + EXPECT_TRUE(token.IsCancelled()); +} + +// Copies of a CancellationSource share the same underlying state. +TEST(Cancellation, SourceCopyable_CopiesShareState) { + CancellationSource source1; + CancellationSource source2 = source1; + + bool invoked = false; + CancellationCallback cb(source1.GetToken(), [&invoked] { invoked = true; }); + + source2.Cancel(); + + EXPECT_TRUE(invoked); + EXPECT_TRUE(source1.IsCancelled()); +} + +// Copies of a CancellationToken share the same underlying state. +TEST(Cancellation, TokenCopyable_CopiesShareState) { + CancellationSource source; + CancellationToken token1 = source.GetToken(); + CancellationToken token2 = token1; + + bool invoked = false; + CancellationCallback cb(token2, [&invoked] { invoked = true; }); + + source.Cancel(); + + EXPECT_TRUE(invoked); + EXPECT_TRUE(token1.IsCancelled()); +} + +// If a CancellationCallback is destroyed while its callback is executing on +// another thread, the destructor blocks until execution completes. This +// guarantees that anything captured by the callback remains valid for its +// entire execution. +TEST(Cancellation, DestructorBlocks_WhileCallbackExecuting) { + CancellationSource source; + + std::mutex mtx; + std::condition_variable cv; + bool callback_started = false; + bool callback_completed = false; + + auto cb = std::make_unique(source.GetToken(), [&] { + { + std::lock_guard lk(mtx); + callback_started = true; + } + cv.notify_one(); + std::this_thread::sleep_for(std::chrono::milliseconds(20)); + callback_completed = true; + }); + + std::thread t([&source] { source.Cancel(); }); + + { + std::unique_lock lk(mtx); + cv.wait(lk, [&] { return callback_started; }); + } + + // Destroy cb while the callback is sleeping on thread t. The destructor + // should block until the callback sets callback_completed. + cb.reset(); + + EXPECT_TRUE(callback_completed); + + t.join(); +} + +// Destroying a CancellationCallback from within another callback on the same +// thread does not deadlock. This covers the re-entrant path in Deregister. +TEST(Cancellation, SameThreadDeregister_NoDeadlock) { + CancellationSource source; + CancellationToken token = source.GetToken(); + + std::unique_ptr cb2; + CancellationCallback cb1(token, [&cb2] { cb2.reset(); }); + cb2 = std::make_unique(token, [] {}); + + source.Cancel(); // cb1 fires, destroys cb2 from the same thread +} + +TEST(Cancellation, TimerCancelDoesNotRecurseInContinuationConstructor) { + boost::asio::io_context ioc; + auto work = boost::asio::make_work_guard(ioc); + std::thread t([&] { ioc.run(); }); + + CancellationSource cancel; + + auto future = + Delay(ioc.get_executor(), std::chrono::seconds(60), cancel.GetToken()); + + std::this_thread::sleep_for(std::chrono::milliseconds(50)); + cancel.Cancel(); + + auto const result = future.WaitForResult(std::chrono::seconds(5)); + + ASSERT_TRUE(result.has_value()); + EXPECT_FALSE(*result); + + work.reset(); + t.join(); +} diff --git a/libs/server-sdk/src/data_systems/fdv2/polling_synchronizer.cpp b/libs/server-sdk/src/data_systems/fdv2/polling_synchronizer.cpp index a8dabaa54..c052d4827 100644 --- a/libs/server-sdk/src/data_systems/fdv2/polling_synchronizer.cpp +++ b/libs/server-sdk/src/data_systems/fdv2/polling_synchronizer.cpp @@ -54,11 +54,13 @@ FDv2SourceResult FDv2PollingSynchronizer::State::HandlePollResult( } async::Future FDv2PollingSynchronizer::State::Delay( - std::chrono::nanoseconds duration) { - return async::Delay(executor_, duration); + std::chrono::nanoseconds duration, + async::CancellationToken token) { + return async::Delay(executor_, duration, std::move(token)); } -async::Future FDv2PollingSynchronizer::State::CreatePollDelayFuture() { +async::Future FDv2PollingSynchronizer::State::CreatePollDelayFuture( + async::CancellationToken token) { std::lock_guard lock(mutex_); if (!last_poll_start_) { return async::MakeFuture(true); @@ -68,7 +70,7 @@ async::Future FDv2PollingSynchronizer::State::CreatePollDelayFuture() { if (elapsed >= poll_interval_) { return async::MakeFuture(true); } - return Delay(poll_interval_ - elapsed); + return Delay(poll_interval_ - elapsed, std::move(token)); } void FDv2PollingSynchronizer::State::RecordPollStarted() { @@ -126,20 +128,22 @@ std::string const& FDv2PollingSynchronizer::Identity() const { FDv2SourceResult{FDv2SourceResult::Shutdown{}}); } + async::CancellationSource cancel; auto now = std::chrono::steady_clock::now(); auto timeout_deadline = now + timeout; - auto timeout_future = state->Delay(timeout); + auto timeout_future = state->Delay(timeout, cancel.GetToken()); // Figure out how much to delay before starting. - auto delay_future = state->CreatePollDelayFuture(); + auto delay_future = state->CreatePollDelayFuture(cancel.GetToken()); return async::WhenAny(closed, std::move(timeout_future), std::move(delay_future)) .Then( [state = std::move(state), closed = std::move(closed), - timeout_deadline, - selector = std::move(selector)](std::size_t const& idx) mutable + timeout_deadline, selector = std::move(selector), + cancel = std::move(cancel)](std::size_t const& idx) mutable -> async::Future { + cancel.Cancel(); if (idx == 0) { return async::MakeFuture( FDv2SourceResult{FDv2SourceResult::Shutdown{}}); @@ -166,15 +170,21 @@ std::string const& FDv2PollingSynchronizer::Identity() const { state->RecordPollStarted(); + async::CancellationSource cancel; auto now = std::chrono::steady_clock::now(); - auto timeout_future = state->Delay(timeout_deadline - now); + auto timeout_future = + state->Delay(timeout_deadline - now, cancel.GetToken()); + + // TODO: pass cancel.GetToken() to Request() once HTTP requests support it. auto http_future = state->Request(selector); return async::WhenAny(std::move(closed), std::move(timeout_future), http_future) .Then( - [state = std::move(state), http_future = std::move(http_future)]( - std::size_t const& idx) -> FDv2SourceResult { + [state = std::move(state), http_future = std::move(http_future), + cancel = std::move(cancel)]( + std::size_t const& idx) mutable -> FDv2SourceResult { + cancel.Cancel(); if (idx == 0) { return FDv2SourceResult{FDv2SourceResult::Shutdown{}}; } diff --git a/libs/server-sdk/src/data_systems/fdv2/polling_synchronizer.hpp b/libs/server-sdk/src/data_systems/fdv2/polling_synchronizer.hpp index 42f20bfe1..2a8bd3231 100644 --- a/libs/server-sdk/src/data_systems/fdv2/polling_synchronizer.hpp +++ b/libs/server-sdk/src/data_systems/fdv2/polling_synchronizer.hpp @@ -2,6 +2,7 @@ #include "../../data_interfaces/source/ifdv2_synchronizer.hpp" +#include #include #include #include @@ -76,14 +77,19 @@ class FDv2PollingSynchronizer final network::HttpResult const& res); /** Returns a Future that resolves when it is time to start the next - * poll. */ - async::Future CreatePollDelayFuture(); + * poll. If a token is provided and cancelled before the delay elapses, + * the future resolves early with false. */ + async::Future CreatePollDelayFuture( + async::CancellationToken token = {}); /** Records that a poll has started, for interval scheduling. */ void RecordPollStarted(); - /** Returns a Future that resolves after the given duration. */ - async::Future Delay(std::chrono::nanoseconds duration); + /** Returns a Future that resolves after the given duration. If a token + * is provided and cancelled before the duration elapses, the future + * resolves early with false. */ + async::Future Delay(std::chrono::nanoseconds duration, + async::CancellationToken token = {}); private: // Logger is itself thread-safe.