diff --git a/libs/internal/include/launchdarkly/async/promise.hpp b/libs/internal/include/launchdarkly/async/promise.hpp new file mode 100644 index 000000000..ffdc66b87 --- /dev/null +++ b/libs/internal/include/launchdarkly/async/promise.hpp @@ -0,0 +1,531 @@ +#pragma once + +#include +#include +#include +#include +#include +#include +#include +#include + +namespace launchdarkly::async { + +// Continuation is a move-only type-erased callable, effectively a polyfill +// for C++23's std::move_only_function. It exists because C++17's std::function +// requires all captured variables to be copy-constructible, which prevents +// storing lambdas that capture move-only types. +// +// Continuation is used to represent units of work passed to executors. An +// executor is a callable with signature void(Continuation) that +// schedules the work to run somewhere — for example, on an ASIO io_context: +// +// auto executor = [&ioc](Continuation work) { +// boost::asio::post(ioc, std::move(work)); +// }; +// +// The primary template is declared but not defined; only the partial +// specialization below (which splits Sig into R and Args...) is usable. +// This lets callers write Continuation instead of Continuation. +template +class Continuation; + +template +class Continuation { + // Base and Impl form a classic type-erasure pair. Base is a non-template + // abstract interface stored via unique_ptr, giving a stable type regardless + // of F. Impl is the concrete template subclass that holds and calls F. + struct Base { + virtual R call(Args...) = 0; + virtual ~Base() = default; + }; + + template + struct Impl : Base { + F f; + Impl(F f) : f(std::move(f)) {} + R call(Args... args) override { return f(std::forward(args)...); } + }; + + std::unique_ptr impl_; + + public: + // Constructs a Continuation from any callable F. F may be a lambda, + // function pointer, or other callable; it need not be copy-constructible. + // F&& is a forwarding reference: accepts any callable by move or copy, + // then moves it into Impl so Continuation itself owns the callable. + template + Continuation(F&& f) + : impl_(std::make_unique>>(std::forward(f))) {} + Continuation(Continuation&&) = default; + Continuation& operator=(Continuation&&) = default; + + // Invokes the stored callable with the given arguments. + // Returns whatever the callable returns. + R operator()(Args... args) const { + return impl_->call(std::forward(args)...); + } +}; + +template +class Promise; + +template +class Future; + +// Type trait to detect whether a type is a Future. The primary template +// defaults to false; the partial specialization matches Future specifically. +template +struct is_future : std::false_type {}; + +template +struct is_future> : std::true_type {}; + +// Type trait to extract T from Future. Only the partial specialization is +// defined, so using future_value on a non-Future type is a compile error. +template +struct future_value; + +template +struct future_value> { + using type = T; +}; + +// PromiseInternal holds the shared state between a Promise and its associated +// Futures: the result value, the mutex protecting it, and the list of +// continuations waiting to run when the result is set. +// +// This is an internal class and not intended to be used directly. Promise and +// Future each hold a shared_ptr, which lets multiple Future +// copies all refer to the same underlying state, and lets Promise and Future +// have independent lifetimes while the shared state remains alive as long as +// either end holds it. +template +class PromiseInternal { + public: + PromiseInternal() = default; + ~PromiseInternal() = default; + PromiseInternal(PromiseInternal const&) = delete; + PromiseInternal& operator=(PromiseInternal const&) = delete; + PromiseInternal(PromiseInternal&&) = delete; + PromiseInternal& operator=(PromiseInternal&&) = delete; + + // Sets the result and schedules all registered continuations via their + // executors. Returns true if the result was set, or false if it was already + // set by a previous call to Resolve. + bool Resolve(T result) { + std::vector> to_call; + { + std::lock_guard lock(mutex_); + if (result_.has_value()) { + return false; + } + + // Move result into storage if possible; otherwise copy. + if constexpr (std::is_move_constructible_v) { + result_ = std::move(result); + } else { + result_ = result; + } + to_call = std::move(continuations_); + } + + // Call continuations outside the lock so that continuations which + // re-enter this future (e.g. via GetResult or Then) don't deadlock. + for (auto& continuation : to_call) { + // It's safe to access result_ outside the lock here, because it + // can't be changed again. + continuation(*result_); + } + + return true; + } + + // Returns true if Resolve has been called. + bool IsFinished() const { + std::lock_guard lock(mutex_); + return result_.has_value(); + } + + // Returns a copy of the result, if resolved. + std::optional GetResult() const { + std::lock_guard lock(mutex_); + return result_; + } + + // Then where the continuation returns R directly, yielding Future. + template , + // Disable when R is a Future so the flattening overload wins + // instead. + typename = std::enable_if_t::value>> + Future Then(F&& continuation, + std::function)> executor) { + Promise newPromise; + Future newFuture = newPromise.GetFuture(); + + std::optional already_resolved; + { + std::lock_guard lock(mutex_); + + if (result_.has_value()) { + already_resolved = result_; + } else { + continuations_.push_back( + [newPromise = std::move(newPromise), + continuation = std::move(continuation), + executor](T const& result) mutable { + executor(Continuation( + [newPromise = std::move(newPromise), + continuation = std::move(continuation), + result]() mutable { + newPromise.Resolve(continuation(result)); + })); + }); + return newFuture; + } + } + + // Already resolved: call executor outside the lock so that + // continuations which re-enter this future don't deadlock. + executor(Continuation( + [newPromise = std::move(newPromise), + continuation = std::move(continuation), + result = std::move(already_resolved)]() mutable { + newPromise.Resolve(continuation(*result)); + })); + return newFuture; + } + + // Then where the continuation returns Future (i.e. R = Future), + // yielding a flattened Future that resolves when the inner future does. + template , + // Unwrap Future -> T2 so the return type is Future, not + // Future>. + typename T2 = typename future_value::type, + // Only enabled when R is a Future; otherwise the direct overload + // wins. + typename = std::enable_if_t::value>> + Future Then(F&& continuation, + std::function)> executor) { + Promise outerPromise; + Future outerFuture = outerPromise.GetFuture(); + + auto do_work = [outerPromise = std::move(outerPromise), + continuation = std::move(continuation), + executor](T const& val) mutable { + Future innerFuture = continuation(val); + innerFuture.Then( + [outerPromise = std::move(outerPromise)]( + T2 const& inner_val) mutable -> std::monostate { + outerPromise.Resolve(inner_val); + return {}; + }, + executor); + }; + + std::optional already_resolved; + { + std::lock_guard lock(mutex_); + if (result_.has_value()) { + already_resolved = result_; + } else { + continuations_.push_back([do_work = std::move(do_work), + executor](T const& result) mutable { + executor(Continuation( + [do_work = std::move(do_work), result]() mutable { + do_work(result); + })); + }); + return outerFuture; + } + } + + // Already resolved: call executor outside the lock so that + // continuations which re-enter this future don't deadlock. + executor(Continuation( + [do_work = std::move(do_work), + result = std::move(already_resolved)]() mutable { + do_work(*result); + })); + return outerFuture; + } + + private: + mutable std::mutex mutex_; + std::optional result_{}; + std::vector> continuations_; +}; + +// Promise is the write end of a one-shot async value, similar to std::promise. +// Create a Promise, hand its Future to a consumer via GetFuture(), then +// call Resolve() exactly once to deliver the value. +// +// Promise is move-only: it cannot be copied, but it can be moved. This +// prevents accidentally resolving the same promise from two places. +// +// Using tl::expected as T is the recommended way to represent +// operations that may fail: +// +// Promise> promise; +// Future> future = promise.GetFuture(); +// // ... hand future to a consumer, then later: +// promise.Resolve(42); // success +// promise.Resolve(tl::unexpected("timed out")); // failure +template +class Promise { + public: + Promise() : internal_(std::make_shared>()) {} + ~Promise() = default; + Promise(Promise const&) = delete; + Promise& operator=(Promise const&) = delete; + Promise(Promise&&) = default; + Promise& operator=(Promise&&) = default; + + // Sets the result to the given value and schedules any continuations that + // were registered via Future::Then. Returns true if the result was set, or + // false if Resolve was already called. + bool Resolve(T result) { + if constexpr (std::is_move_constructible_v) { + return internal_->Resolve(std::move(result)); + } else { + return internal_->Resolve(result); + } + } + + // Returns a Future that will resolve when this Promise is resolved. + // May be called multiple times; each call returns a Future referring to + // the same underlying state. + Future GetFuture() { return Future(internal_); } + + private: + std::shared_ptr> internal_; +}; + +// Future is the read end of a one-shot async value, similar to std::future, +// but with support for chaining via Then. +// +// A Future is obtained from Promise::GetFuture(). Multiple copies of a Future +// may exist and all refer to the same underlying result. When the associated +// Promise is resolved, all continuations registered via Then are scheduled. +// +// Unlike std::future, Future does not support blocking on the result directly. +// Instead, use Then to attach work that runs once the value is available. +// +// Example using tl::expected to represent a fallible async operation: +// +// boost::asio::io_context ioc; +// auto executor = [&ioc](Continuation work) { +// boost::asio::post(ioc, std::move(work)); +// }; +// +// Future> result = future.Then( +// [](tl::expected const& val) +// -> tl::expected { +// if (!val) return tl::unexpected(val.error()); +// return *val * 1.5f; +// }, +// executor); +template +class Future { + public: + Future(std::shared_ptr> internal) + : internal_(std::move(internal)) {} + ~Future() = default; + Future(Future const&) = default; + Future& operator=(Future const&) = default; + Future(Future&&) = default; + Future& operator=(Future&&) = default; + + // Returns true if the associated Promise has been resolved. + bool IsFinished() const { return internal_->IsFinished(); } + + // Returns a copy of the result, if resolved. + std::optional GetResult() const { return internal_->GetResult(); } + + // Blocks the calling thread until the future resolves or the timeout + // expires. Returns a copy of the result, if resolved within the timeout. + template + std::optional WaitForResult(std::chrono::duration timeout) { + struct State { + std::mutex mutex; + std::condition_variable cv; + bool ready = false; + }; + auto state = std::make_shared(); + + // The continuation ignores T entirely. The executor signals the cv + // when called, since being called means the original future has + // resolved. + Then([](T const&) { return std::monostate{}; }, + [state](Continuation work) { + { + std::lock_guard lock(state->mutex); + state->ready = true; + } + state->cv.notify_one(); + work(); + }); + + std::unique_lock lock(state->mutex); + state->cv.wait_for(lock, timeout, [&state] { return state->ready; }); + + return GetResult(); + } + + // Registers a continuation to run when this Future resolves, returning a + // new Future that resolves to the continuation's return value. + // + // Parameters: + // continuation - Called with the resolved T const& when this Future + // resolves. Must return a value of type R (not a Future). + // executor - Called with the work to schedule when this Future + // resolves. Controls where and when the continuation runs. + // + // Returns a Future that resolves to whatever the continuation returns. + template , + // Disable when R is a Future so the flattening overload wins + // instead. + typename = std::enable_if_t::value>> + Future Then(F&& continuation, + std::function)> executor) { + return internal_->Then(std::forward(continuation), + std::move(executor)); + } + + // Registers a continuation to run when this Future resolves, where the + // continuation itself returns a Future. Returns a flattened Future + // that resolves when the inner future does, avoiding Future>. + // Use this overload to chain async operations that themselves return a + // Future, avoiding a nested Future>: + // + // Future> result = future.Then( + // [](tl::expected const& key) { + // return fetch(key); // fetch returns Future> + // }, + // executor); + // + // Parameters: + // continuation - Called with the resolved T const& when this Future + // resolves. Must return a Future. + // executor - Called with the work to schedule when this Future + // resolves, and again when the inner Future resolves. + // + // Returns a Future that resolves when the inner Future resolves. + template , + // Unwrap Future -> T2 so the return type is Future, not + // Future>. + typename T2 = typename future_value::type, + // Only enabled when R is a Future; otherwise the direct overload + // wins. + typename = std::enable_if_t::value>> + Future Then(F&& continuation, + std::function)> executor) { + return internal_->Then(std::forward(continuation), + std::move(executor)); + } + + private: + std::shared_ptr> internal_; +}; + +// WhenAll takes a variadic list of Futures (each with potentially different +// value types) and returns a Future that resolves once all +// of the input futures have resolved. The result carries no value; callers +// who need the individual results can read them from their original futures +// after WhenAll resolves. +// +// If called with no arguments, the returned future is already resolved. +// +// Example: +// +// Future f1 = ...; +// Future f2 = ...; +// WhenAll(f1, f2).Then( +// [&](std::monostate const&) { +// // f1 and f2 are both finished here. +// use(f1.GetResult().value(), f2.GetResult().value()); +// return std::monostate{}; +// }, +// executor); +template +Future WhenAll(Future... futures) { + Promise promise; + Future result = promise.GetFuture(); + + if constexpr (sizeof...(Ts) == 0) { + promise.Resolve(std::monostate{}); + return result; + } + + auto shared_promise = + std::make_shared>(std::move(promise)); + auto count = std::make_shared>(sizeof...(Ts)); + + auto attach = [&](auto future) { + future.Then( + [shared_promise, count](auto const&) -> std::monostate { + if (count->fetch_sub(1) == 1) { + shared_promise->Resolve(std::monostate{}); + } + return std::monostate{}; + }, + [](Continuation f) { f(); }); + }; + + (attach(futures), ...); + + return result; +} + +// WhenAny takes a variadic list of Futures (each with potentially different +// value types) and returns a Future that resolves with the +// 0-based index of whichever input future resolves first. The caller can use +// the index to identify the winning future and read its result directly. +// +// If called with no arguments, the returned future never resolves. +// +// Example: +// +// Future f0 = ...; +// Future f1 = ...; +// WhenAny(f0, f1).Then( +// [&](std::size_t const& index) { +// if (index == 0) use(f0.GetResult().value()); +// else use(f1.GetResult().value()); +// return std::monostate{}; +// }, +// executor); +template +Future WhenAny(Future... futures) { + Promise promise; + Future result = promise.GetFuture(); + + auto shared_promise = + std::make_shared>(std::move(promise)); + + std::size_t index = 0; + auto attach = [&](auto future) { + std::size_t i = index++; + future.Then( + [shared_promise, i](auto const&) -> std::monostate { + shared_promise->Resolve(i); + return std::monostate{}; + }, + [](Continuation f) { f(); }); + }; + + (attach(futures), ...); + + return result; +} + +} // namespace launchdarkly::async diff --git a/libs/internal/src/CMakeLists.txt b/libs/internal/src/CMakeLists.txt index 44600638b..2fffa42b3 100644 --- a/libs/internal/src/CMakeLists.txt +++ b/libs/internal/src/CMakeLists.txt @@ -1,6 +1,7 @@ file(GLOB HEADER_LIST CONFIGURE_DEPENDS "${LaunchDarklyInternalSdk_SOURCE_DIR}/include/launchdarkly/*.hpp" + "${LaunchDarklyInternalSdk_SOURCE_DIR}/include/launchdarkly/async/*.hpp" "${LaunchDarklyInternalSdk_SOURCE_DIR}/include/launchdarkly/events/*.hpp" "${LaunchDarklyInternalSdk_SOURCE_DIR}/include/launchdarkly/network/*.hpp" "${LaunchDarklyInternalSdk_SOURCE_DIR}/include/launchdarkly/serialization/*.hpp" diff --git a/libs/internal/tests/promise_test.cpp b/libs/internal/tests/promise_test.cpp new file mode 100644 index 000000000..d338bcabd --- /dev/null +++ b/libs/internal/tests/promise_test.cpp @@ -0,0 +1,534 @@ +#include +#include +#include +#include + +#include + +#include "launchdarkly/async/promise.hpp" + +using namespace launchdarkly::async; + +TEST(Promise, SimplePromise) { + Promise promise; + Future future = promise.GetFuture(); + + Future future2 = future.Then( + [](int const& inner) { return static_cast(inner * 2.0); }, + [](Continuation f) { f(); }); + + promise.Resolve(43); + + auto result = future2.WaitForResult(std::chrono::seconds(5)); + ASSERT_TRUE(result.has_value()); + EXPECT_FLOAT_EQ(*result, 86.0f); +} + +TEST(Promise, ASIOTest) { + boost::asio::io_context ioc; + auto work = boost::asio::make_work_guard(ioc); + std::thread ioc_thread([&]() { ioc.run(); }); + + Promise promise; + Future future = promise.GetFuture(); + + Future future2 = future.Then( + [](int const& inner) { return static_cast(inner * 2.0); }, + [&ioc](Continuation f) { + boost::asio::post(ioc, [f = std::move(f)]() mutable { f(); }); + }); + + promise.Resolve(42); + + auto result = future2.WaitForResult(std::chrono::seconds(5)); + ASSERT_TRUE(result.has_value()); + EXPECT_FLOAT_EQ(*result, 84.0f); + + work.reset(); + ioc_thread.join(); +} + +TEST(Promise, GetResultNotFinished) { + Promise promise; + Future future = promise.GetFuture(); + + auto result = future.GetResult(); + EXPECT_FALSE(result.has_value()); +} + +TEST(Promise, GetResultFinished) { + Promise promise; + Future future = promise.GetFuture(); + + promise.Resolve(42); + + auto result = future.GetResult(); + ASSERT_TRUE(result.has_value()); + EXPECT_EQ(*result, 42); +} + +// Verifies that a type which is move-assignable but not move-constructible +// can still be used as T. The optional assignment for an empty optional needs +// move-constructibility, not move-assignability, so the if constexpr branch +// must check the correct trait. +TEST(Promise, MoveAssignableNotMoveConstructible) { + struct MoveAssignableOnly { + int value; + explicit MoveAssignableOnly(int v) : value(v) {} + MoveAssignableOnly(MoveAssignableOnly const&) = default; + MoveAssignableOnly& operator=(MoveAssignableOnly const&) = default; + MoveAssignableOnly(MoveAssignableOnly&&) = delete; + MoveAssignableOnly& operator=(MoveAssignableOnly&&) = default; + }; + + Promise promise; + Future future = promise.GetFuture(); + + promise.Resolve(MoveAssignableOnly{42}); + + auto result = future.GetResult(); + ASSERT_TRUE(result.has_value()); + EXPECT_EQ(result->value, 42); +} + +TEST(Promise, CopyOnlyResult) { + struct CopyOnly { + int value; + explicit CopyOnly(int v) : value(v) {} + CopyOnly(CopyOnly const&) = default; + CopyOnly& operator=(CopyOnly const&) = default; + CopyOnly(CopyOnly&&) = delete; + CopyOnly& operator=(CopyOnly&&) = delete; + }; + + Promise promise; + Future future = promise.GetFuture(); + + promise.Resolve(CopyOnly{42}); + + auto result = future.GetResult(); + ASSERT_TRUE(result.has_value()); + EXPECT_EQ(result->value, 42); +} + +TEST(Promise, ContinueByReturningFuture) { + Promise promise1; + Promise promise2; + Future future2 = promise2.GetFuture(); + + Future chained = + promise1.GetFuture().Then([future2](int const&) { return future2; }, + [](Continuation f) { f(); }); + + promise1.Resolve(0); + promise2.Resolve(42); + + auto result = chained.WaitForResult(std::chrono::seconds(5)); + ASSERT_TRUE(result.has_value()); + EXPECT_EQ(*result, 42); +} + +TEST(Promise, ResolvedBeforeContinuation) { + Promise promise; + Future future = promise.GetFuture(); + + promise.Resolve(21); + + Future future2 = future.Then([](int const& val) { return val * 2; }, + [](Continuation f) { f(); }); + + auto result = future2.WaitForResult(std::chrono::seconds(5)); + ASSERT_TRUE(result.has_value()); + EXPECT_EQ(*result, 42); +} + +TEST(Promise, ResolvedAfterContinuation) { + Promise promise; + Future future = promise.GetFuture(); + + Future future2 = future.Then([](int const& val) { return val * 2; }, + [](Continuation f) { f(); }); + + promise.Resolve(21); + + auto result = future2.WaitForResult(std::chrono::seconds(5)); + ASSERT_TRUE(result.has_value()); + EXPECT_EQ(*result, 42); +} + +// Verifies that a continuation which captures a copy-only type (deleted move +// constructor) can still be registered and invoked correctly. +TEST(Promise, CopyOnlyCallback) { + struct CopyOnlyInt { + int value; + explicit CopyOnlyInt(int v) : value(v) {} + CopyOnlyInt(CopyOnlyInt const&) = default; + CopyOnlyInt& operator=(CopyOnlyInt const&) = default; + CopyOnlyInt(CopyOnlyInt&&) = delete; + CopyOnlyInt& operator=(CopyOnlyInt&&) = delete; + }; + + Promise promise; + Future future = promise.GetFuture(); + + CopyOnlyInt multiplier{2}; + Future future2 = future.Then( + [multiplier](int const& val) { return val * multiplier.value; }, + [](Continuation f) { f(); }); + + promise.Resolve(21); + + auto result = future2.WaitForResult(std::chrono::seconds(5)); + ASSERT_TRUE(result.has_value()); + EXPECT_EQ(*result, 42); +} + +// Verifies that a continuation which captures a move-only type (deleted copy +// constructor) can still be registered and invoked correctly. +TEST(Promise, MoveOnlyCallback) { + Promise promise; + Future future = promise.GetFuture(); + + auto captured = std::make_unique(2); + Future future2 = + future.Then([captured = std::move(captured)]( + int const& val) { return val * *captured; }, + [](Continuation f) { f(); }); + + promise.Resolve(21); + + auto result = future2.WaitForResult(std::chrono::seconds(5)); + ASSERT_TRUE(result.has_value()); + EXPECT_EQ(*result, 42); +} + +// Verifies that when a continuation is both moveable and copyable, Then stores +// it by moving rather than copying. +TEST(Promise, CallbackMovedWhenPossible) { + int copies = 0; + struct Counted { + int value; + int* copy_count; + explicit Counted(int v, int* c) : value(v), copy_count(c) {} + Counted(Counted const& o) : value(o.value), copy_count(o.copy_count) { + ++(*copy_count); + } + Counted& operator=(Counted const& o) { + value = o.value; + copy_count = o.copy_count; + ++(*copy_count); + return *this; + } + Counted(Counted&&) noexcept = default; + Counted& operator=(Counted&&) noexcept = default; + }; + + Promise promise; + Future future = promise.GetFuture(); + + Counted multiplier{2, &copies}; + copies = 0; // reset after construction + + future.Then([multiplier = std::move(multiplier)]( + int const& val) mutable { return val * multiplier.value; }, + [](Continuation f) { f(); }); + + EXPECT_EQ(copies, 0); + + promise.Resolve(21); +} + +// Demonstrates using std::monostate as a void-like result type for +// fire-and-forget async operations where the completion matters but no value is +// produced. +TEST(Promise, MonostateVoidLike) { + Promise promise; + Future future = promise.GetFuture(); + + bool ran = false; + future.Then( + [&ran](std::monostate const&) { + ran = true; + return std::monostate{}; + }, + [](Continuation f) { f(); }); + + promise.Resolve(std::monostate{}); + + auto result = future.WaitForResult(std::chrono::seconds(5)); + ASSERT_TRUE(result.has_value()); + EXPECT_TRUE(ran); +} + +// Demonstrates using tl::expected as T to represent a fallible async operation +// that resolves successfully. +TEST(Promise, ExpectedSuccess) { + Promise> promise; + Future> future = promise.GetFuture(); + + promise.Resolve(42); + + auto result = future.GetResult(); + ASSERT_TRUE(result.has_value()); + ASSERT_TRUE(result->has_value()); + EXPECT_EQ(**result, 42); +} + +// Demonstrates using tl::expected as T to represent a fallible async operation +// that resolves with an error. +TEST(Promise, ExpectedFailure) { + Promise> promise; + Future> future = promise.GetFuture(); + + promise.Resolve(tl::unexpected(std::string("timed out"))); + + auto result = future.GetResult(); + ASSERT_TRUE(result.has_value()); + ASSERT_FALSE(result->has_value()); + EXPECT_EQ(result->error(), "timed out"); +} + +// Verifies that Promise supports move assignment, consistent with it being +// move-only. +TEST(Promise, MoveAssignment) { + Promise p1; + Future future = p1.GetFuture(); + + Promise p2; + p2 = std::move(p1); + p2.Resolve(42); + + EXPECT_EQ(*future.GetResult(), 42); +} + +// Verifies that Future supports move assignment. +TEST(Promise, FutureMoveAssignment) { + Promise promise; + Future f1 = promise.GetFuture(); + Future f2 = promise.GetFuture(); + + f2 = std::move(f1); + promise.Resolve(42); + + EXPECT_EQ(*f2.GetResult(), 42); +} + +// Verifies that Future supports copy assignment. +TEST(Promise, FutureCopyAssignment) { + Promise promise; + Future f1 = promise.GetFuture(); + Future f2 = promise.GetFuture(); + + f2 = f1; + promise.Resolve(42); + + EXPECT_EQ(*f2.GetResult(), 42); +} + +// Verifies that a Continuation can be constructed from an lvalue callable +// (named lambda), not just from a temporary. +TEST(Promise, LvalueLambdaContinuation) { + auto fn = [](int x) { return x * 2; }; + Continuation c(fn); + EXPECT_EQ(c(21), 42); +} + +TEST(WhenAll, NoFutures) { + Future result = WhenAll(); + EXPECT_TRUE(result.IsFinished()); +} + +// Verifies WhenAll resolves when all futures are already resolved. +TEST(WhenAll, AllAlreadyResolved) { + Promise p1; + Promise p2; + + p1.Resolve(1); + p2.Resolve("hello"); + + Future result = WhenAll(p1.GetFuture(), p2.GetFuture()); + auto r = result.WaitForResult(std::chrono::seconds(5)); + ASSERT_TRUE(r.has_value()); +} + +// Verifies WhenAll resolves only after all futures resolve, using futures of +// mixed value types. The original futures still hold their results afterward. +TEST(WhenAll, ResolvesAfterAll) { + Promise p1; + Promise p2; + + Future f1 = p1.GetFuture(); + Future f2 = p2.GetFuture(); + + Future result = WhenAll(f1, f2); + + EXPECT_FALSE(result.IsFinished()); + p1.Resolve(42); + EXPECT_FALSE(result.IsFinished()); + p2.Resolve("done"); + + auto r = result.WaitForResult(std::chrono::seconds(5)); + ASSERT_TRUE(r.has_value()); + EXPECT_EQ(*f1.GetResult(), 42); + EXPECT_EQ(*f2.GetResult(), "done"); +} + +// Verifies that WhenAny resolves with the index of the first future to resolve. +TEST(WhenAny, FirstResolved) { + Promise p0; + Promise p1; + Promise p2; + + Future result = + WhenAny(p0.GetFuture(), p1.GetFuture(), p2.GetFuture()); + + EXPECT_FALSE(result.IsFinished()); + p1.Resolve(42); + + auto r = result.WaitForResult(std::chrono::seconds(5)); + ASSERT_TRUE(r.has_value()); + EXPECT_EQ(*r, 1u); +} + +// Verifies that WhenAny works with futures of mixed value types, and that +// resolving a later future after the winner does not change the result. +TEST(WhenAny, MixedTypesFirstWins) { + Promise p0; + Promise p1; + + Future f0 = p0.GetFuture(); + Future f1 = p1.GetFuture(); + + Future> result = WhenAny(f0, f1).Then( + [f0, f1](size_t const& index) -> std::variant { + if (index == 0) { + return f0.GetResult().value(); + } else { + return f1.GetResult().value(); + } + }, + [](Continuation f) { f(); }); + + p1.Resolve("hello"); + p0.Resolve(99); + + auto r = result.WaitForResult(std::chrono::seconds(5)); + ASSERT_TRUE(r.has_value()); + EXPECT_EQ(std::get(*result.GetResult()), "hello"); +} + +// Verifies that WhenAny resolves immediately if a future is already resolved. +TEST(WhenAny, AlreadyResolved) { + Promise p0; + Promise p1; + + p0.Resolve(42); + + Future result = WhenAny(p0.GetFuture(), p1.GetFuture()); + + auto r = result.WaitForResult(std::chrono::seconds(5)); + ASSERT_TRUE(r.has_value()); + EXPECT_EQ(*r, 0u); +} + +TEST(WhenAll, ConcurrentResolution) { + auto spawn = [](int val) { + Promise p; + Future f = p.GetFuture(); + return std::make_pair( + f, std::thread([p = std::move(p), val]() mutable { + std::this_thread::sleep_for(std::chrono::milliseconds(10)); + p.Resolve(val); + })); + }; + + auto [f1, t1] = spawn(1); + auto [f2, t2] = spawn(2); + auto [f3, t3] = spawn(3); + auto [f4, t4] = spawn(4); + auto [f5, t5] = spawn(5); + + auto result = WhenAll(f1, f2, f3, f4, f5); + + auto r = result.WaitForResult(std::chrono::seconds(5)); + ASSERT_TRUE(r.has_value()); + + EXPECT_EQ(f1.GetResult().value(), 1); + EXPECT_EQ(f2.GetResult().value(), 2); + EXPECT_EQ(f3.GetResult().value(), 3); + EXPECT_EQ(f4.GetResult().value(), 4); + EXPECT_EQ(f5.GetResult().value(), 5); + + t1.join(); + t2.join(); + t3.join(); + t4.join(); + t5.join(); +} + +// Verifies that WaitForResult returns nullopt when the promise is never +// resolved within the timeout. +TEST(Promise, WaitForResultTimeout) { + Promise promise; + Future future = promise.GetFuture(); + + auto result = future.WaitForResult(std::chrono::milliseconds(50)); + EXPECT_FALSE(result.has_value()); +} + +// Verifies that when multiple threads race to resolve the same promise, +// exactly one succeeds and the rest return false. +TEST(Promise, ConcurrentResolveSingleWinner) { + Promise promise; + Future future = promise.GetFuture(); + + std::atomic winners{0}; + std::vector threads; + for (int i = 0; i < 10; i++) { + threads.emplace_back([&promise, &winners, i] { + if (promise.Resolve(i)) { + winners++; + } + }); + } + + for (auto& t : threads) { + t.join(); + } + + EXPECT_EQ(winners, 1); + EXPECT_TRUE(future.GetResult().has_value()); +} + +// Verifies that a chain of Then calls executes correctly when continuations +// are dispatched via a multi-threaded ASIO executor. +TEST(Promise, MultiThreadedASIOExecutor) { + boost::asio::io_context ioc; + auto work = boost::asio::make_work_guard(ioc); + + std::vector ioc_threads; + for (int i = 0; i < 4; i++) { + ioc_threads.emplace_back([&ioc] { ioc.run(); }); + } + + auto executor = [&ioc](Continuation f) { + boost::asio::post(ioc, std::move(f)); + }; + + Promise promise; + Future result = + promise.GetFuture() + .Then([](int const& v) { return v * 2; }, executor) + .Then([](int const& v) { return v + 1; }, executor); + + promise.Resolve(10); + + auto r = result.WaitForResult(std::chrono::seconds(5)); + ASSERT_TRUE(r.has_value()); + EXPECT_EQ(*r, 21); + + work.reset(); + for (auto& t : ioc_threads) { + t.join(); + } +}