Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
242 changes: 134 additions & 108 deletions include/stdexec/__detail/__let.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
#include "__variant.hpp"

#include <exception>
#include <type_traits>

namespace stdexec {
//////////////////////////////////////////////////////////////////////////////
Expand All @@ -41,6 +42,11 @@ namespace stdexec {
template <class _SetTag>
struct __let_t;

template <class _SetTag>
struct __let_tag {
using __t = _SetTag;
};

template <class _SetTag>
inline constexpr __mstring __in_which_let_msg{"In stdexec::let_value(Sender, Function)..."};

Expand Down Expand Up @@ -282,32 +288,84 @@ namespace stdexec {

//! The core of the operation state for `let_*`.
//! This gets bundled up into a larger operation state (`__detail::__op_state<...>`).
template <class _Receiver, class _Fun, class _SetTag, class _Env2, class... _Tuples>
template <class _SetTag, class _Sender, class _Fun, class _Receiver, class... _Tuples>
struct __let_state {
using __fun_t = _Fun;
using __env2_t = _Env2;
using __env_t = __join_env_t<_Env2, env_of_t<_Receiver>>;
using __rcvr_t = __receiver_with_env_t<_Receiver, _Env2>;
using __env2_t =
__let::__env2_t<_SetTag, env_of_t<const _Sender&>, env_of_t<const _Receiver&>>;
using __second_rcvr_t = __receiver_with_env_t<_Receiver, __env2_t>;
template <typename _Tag, typename... _Args>
constexpr void __impl(_Receiver& __rcvr, _Tag __tag, _Args&&... __args) noexcept {
if constexpr (std::is_same_v<_SetTag, _Tag>) {
using __sender_t = __call_result_t<_Fun, __decay_t<_Args>&...>;
using __submit_t = __submit_result<__sender_t, __env2_t, _Receiver>;
constexpr bool __nothrow_store = (__nothrow_decay_copyable<_Args> && ...);
constexpr bool __nothrow_invoke = __nothrow_callable<_Fun, __decay_t<_Args>&...>;
constexpr bool __nothrow_submit = noexcept(__storage_.template emplace<__submit_t>(
__declval<__sender_t>(), __declval<__second_rcvr_t>()));
STDEXEC_TRY {
auto& __tuple = __args_.emplace_from(__mktuple, static_cast<_Args&&>(__args)...);
auto&& __sender = ::stdexec::__apply(static_cast<_Fun&&>(__fun_), __tuple);
__storage_.template emplace<__monostate>();
__second_rcvr_t __r{__rcvr, static_cast<__env2_t&&>(__env2_)};
auto& __op = __storage_.template emplace<__submit_t>(
static_cast<__sender_t&&>(__sender), static_cast<__second_rcvr_t&&>(__r));
__op.submit(static_cast<__sender_t&&>(__sender), static_cast<__second_rcvr_t&&>(__r));
}
STDEXEC_CATCH_ALL {
if constexpr (!(__nothrow_store && __nothrow_invoke && __nothrow_submit)) {
::stdexec::set_error(static_cast<_Receiver&&>(__rcvr), std::current_exception());
}
}
} else {
__tag(static_cast<_Receiver&&>(__rcvr), static_cast<_Args&&>(__args)...);
}
}
struct __first_rcvr_t {
using receiver_concept = ::stdexec::receiver_t;
__let_state& __state;
_Receiver& __rcvr;
template <typename... _Args>
constexpr void set_value(_Args&&... __args) noexcept {
__state.__impl(__rcvr, ::stdexec::set_value, static_cast<_Args&&>(__args)...);
}
template <typename... _Args>
constexpr void set_error(_Args&&... __args) noexcept {
__state.__impl(__rcvr, ::stdexec::set_error, static_cast<_Args&&>(__args)...);
}
template <typename... _Args>
constexpr void set_stopped(_Args&&... __args) noexcept {
__state.__impl(__rcvr, ::stdexec::set_stopped, static_cast<_Args&&>(__args)...);
}
constexpr decltype(auto) get_env() const noexcept {
return ::stdexec::get_env(__rcvr);
}
};

using __result_variant = __variant_for<__monostate, _Tuples...>;
using __submit_variant = __variant_for<
using __op_state_variant = __variant_for<
__monostate,
__mapply<__submit_datum_for<_Receiver, _Fun, _SetTag, _Env2>, _Tuples>...
>;

template <class _ResultSender, class _OpState>
auto __get_result_receiver(const _ResultSender&, _OpState& __op_state) -> decltype(auto) {
return __rcvr_t{__op_state.__rcvr_, __env2_};
::stdexec::connect_result_t<_Sender, __first_rcvr_t>,
__mapply<__submit_datum_for<_Receiver, _Fun, _SetTag, __env2_t>, _Tuples>...>;

constexpr explicit __let_state(_Sender&& __sender, _Fun __fun, _Receiver& __r) noexcept(
__nothrow_connectable<_Sender, __first_rcvr_t>
&& std::is_nothrow_move_constructible_v<_Fun>)
: __fun_(static_cast<_Fun&&>(__fun))
, __env2_(
// TODO(ericniebler): this needs a fallback
__let::__mk_env2<_SetTag>(::stdexec::get_env(__sender), ::stdexec::get_env(__r))) {
__storage_.emplace_from(
::stdexec::connect, static_cast<_Sender&&>(__sender), __first_rcvr_t{*this, __r});
}

STDEXEC_IMMOVABLE_NO_UNIQUE_ADDRESS
_Fun __fun_;
STDEXEC_IMMOVABLE_NO_UNIQUE_ADDRESS
_Env2 __env2_;
__env2_t __env2_;
//! Variant to hold the results passed from upstream before passing them to the function:
__result_variant __args_{};
//! Variant type for holding the operation state from connecting
//! the function result to the downstream receiver:
__submit_variant __storage_{};
//! Variant type for holding the operation state of the currently in flight operation
__op_state_variant __storage_{};
};

// The set_value completions of:
Expand Down Expand Up @@ -504,11 +562,20 @@ namespace stdexec {
}
};

template <class _Sender, class _Fun>
struct __data_t {
_Sender __sndr;
_Fun __fun;
};

template <typename _Sender>
using __sender_of = decltype((__declval<__data_of<_Sender>>().__sndr));
template <typename _Sender>
using __fun_of = decltype((__declval<__data_of<_Sender>>().__fun));

//! Implementation of the `let_*_t` types, where `_SetTag` is, e.g., `set_value_t` for `let_value`.
template <class _SetTag>
struct __let_t {
using __t = _SetTag;

template <sender _Sender, __movable_value _Fun>
auto operator()(_Sender&& __sndr, _Fun __fun) const -> __well_formed_sender auto {
return __make_sexpr<__let_t<_SetTag>>(
Expand All @@ -520,117 +587,62 @@ namespace stdexec {
auto operator()(_Fun __fun) const {
return __closure(*this, static_cast<_Fun&&>(__fun));
}

template <class _Sender>
auto transform_sender(set_value_t, _Sender&& __sndr, __ignore) {
return __sexpr_apply(
static_cast<_Sender&&>(__sndr),
[]<class _Fun, class _Child>(__ignore, _Fun&& __fun, _Child&& __child) {
return __make_sexpr<__let_tag<_SetTag>>(
__data_t{static_cast<_Child&&>(__child), static_cast<_Fun&&>(__fun)});
});
}
};

template <class _SetTag>
struct __let_impl : __sexpr_defaults {
static constexpr auto get_attrs = []<class _Fun, class _Child>(
const _Fun&,
[[maybe_unused]]
const _Child& __child) noexcept {
// BUGBUG:
return stdexec::get_env(__child);
//return __attrs<__let_t<_SetTag>, _Child, _Fun>{};
};
static constexpr auto get_attrs =
[]<class _Child, class _Fun>(const __data_t<_Child, _Fun>& __data) noexcept {
// BUGBUG:
return stdexec::get_env(__data.__sndr);
};

static constexpr auto get_completion_signatures =
[]<class _Self, class _Env>(_Self&&, _Env&&...) noexcept {
static_assert(sender_expr_for<_Self, __let_t<_SetTag>>);
static_assert(sender_expr_for<_Self, __let_tag<_SetTag>>);
if constexpr (__decay_copyable<_Self>) {
using __fn_t = __decay_t<__data_of<_Self>>;
return __completions_t<__let_t<_SetTag>, __fn_t, __child_of<_Self>, _Env>{};
using __fn_t = __decay_t<__fun_of<_Self>>;
using __result_t =
__completions_t<__let_tag<_SetTag>, __fn_t, __sender_of<_Self>, _Env>;
return __result_t{};
} else {
return __mexception<_SENDER_TYPE_IS_NOT_COPYABLE_, _WITH_SENDER_<_Self>>{};
}
};

static constexpr auto get_state =
[]<class _Receiver, __decay_copyable _Sender>(_Sender&& __sndr, const _Receiver& __rcvr)
requires sender_in<__child_of<_Sender>, env_of_t<_Receiver>>
[]<class _Receiver, __decay_copyable _Sender>(_Sender&& __sndr, _Receiver& __rcvr)
requires sender_in<__sender_of<_Sender>, env_of_t<_Receiver>>
{
static_assert(sender_expr_for<_Sender, __let_t<_SetTag>>);
using _Fun = __decay_t<__data_of<_Sender>>;
using _Child = __child_of<_Sender>;
using _Env2 = __env2_t<_SetTag, env_of_t<_Child>, env_of_t<_Receiver>>;
using __mk_let_state = __mbind_front_q<__let_state, _Receiver, _Fun, _SetTag, _Env2>;

static_assert(sender_expr_for<_Sender, __let_tag<_SetTag>>);
using _Child = __sender_of<_Sender>;
using _Fun = __decay_t<__fun_of<_Sender>>;
using __mk_let_state = __mbind_front_q<__let_state, _SetTag, _Child, _Fun, _Receiver>;
using __let_state_t = __gather_completions_of<
_SetTag,
_Child,
env_of_t<_Receiver>,
__q<__decayed_tuple>,
__mk_let_state
>;

return __sndr.apply(
static_cast<_Sender&&>(__sndr),
[&]<class _Fn, class _Child>(__ignore, _Fn&& __fn, _Child&& __child) {
// TODO(ericniebler): this needs a fallback
_Env2 __env2 =
__let::__mk_env2<_SetTag>(stdexec::get_env(__child), stdexec::get_env(__rcvr));
return __let_state_t{static_cast<_Fn&&>(__fn), static_cast<_Env2&&>(__env2)};
});
__mk_let_state>;
auto&& [__tag, __data] = static_cast<_Sender&&>(__sndr);
return __let_state_t(
__forward_like<_Sender>(__data).__sndr, __forward_like<_Sender>(__data).__fun, __rcvr);
};

//! Helper function to actually invoke the function to produce `let_*`'s sender,
//! connect it to the downstream receiver, and start it. This is the heart of
//! `let_*`.
template <class _State, class _OpState, class... _As>
static void __bind_(_State& __state, _OpState& __op_state, _As&&... __as) {
// Store the passed-in (received) args:
auto& __args = __state.__args_.emplace_from(__mktuple, static_cast<_As&&>(__as)...);
// Apply the function to the args to get the sender:
auto __sndr2 = stdexec::__apply(std::move(__state.__fun_), __args);
// Create a receiver based on the state, the computed sender, and the operation state:
auto __rcvr2 = __state.__get_result_receiver(__sndr2, __op_state);
// Connect the sender to the receiver and start it:
using __result_t = decltype(submit_result{std::move(__sndr2), std::move(__rcvr2)});
auto& __op = __state.__storage_
.template emplace<__result_t>(std::move(__sndr2), std::move(__rcvr2));
__op.submit(std::move(__sndr2), std::move(__rcvr2));
}

template <class _OpState, class... _As>
static void __bind(_OpState& __op_state, _As&&... __as) noexcept {
using _State = decltype(__op_state.__state_);
using _Receiver = decltype(__op_state.__rcvr_);
using _Fun = _State::__fun_t;
using _Env2 = _State::__env2_t;
using _JoinEnv2 = __join_env_t<_Env2, env_of_t<_Receiver>>;
using _ResultSender = __mcall<__result_sender_fn<_SetTag, _Fun, _JoinEnv2>, _As...>;

_State& __state = __op_state.__state_;
_Receiver& __rcvr = __op_state.__rcvr_;

if constexpr (
(__nothrow_decay_copyable<_As> && ...) && __nothrow_callable<_Fun, __decay_t<_As>&...>
&& __nothrow_connectable<_ResultSender, __result_receiver_t<_Receiver, _Env2>>) {
__bind_(__state, __op_state, static_cast<_As&&>(__as)...);
} else {
STDEXEC_TRY {
__bind_(__state, __op_state, static_cast<_As&&>(__as)...);
}
STDEXEC_CATCH_ALL {
using _Receiver = decltype(__op_state.__rcvr_);
stdexec::set_error(static_cast<_Receiver&&>(__rcvr), std::current_exception());
}
}
}

static constexpr auto complete = []<class _OpState, class _Tag, class... _As>(
__ignore,
_OpState& __op_state,
_Tag,
_As&&... __as) noexcept -> void {
if constexpr (__same_as<_Tag, _SetTag>) {
// Intercept the channel of interest to compute the sender and connect it:
__bind(__op_state, static_cast<_As&&>(__as)...);
} else {
// Forward the other channels downstream:
using _Receiver = decltype(__op_state.__rcvr_);
_Tag()(static_cast<_Receiver&&>(__op_state.__rcvr_), static_cast<_As&&>(__as)...);
}
};
static constexpr auto start =
[]<typename _State, typename _Receiver>(_State& __state, _Receiver&) noexcept {
::stdexec::start(__state.__storage_.template get<1>());
};
};
} // namespace __let

Expand All @@ -639,5 +651,19 @@ namespace stdexec {
inline constexpr let_stopped_t let_stopped{};

template <class _SetTag>
struct __sexpr_impl<__let::__let_t<_SetTag>> : __let::__let_impl<_SetTag> { };
struct __sexpr_impl<__let::__let_tag<_SetTag>> : __let::__let_impl<_SetTag> { };

template <class _SetTag>
struct __sexpr_impl<__let::__let_t<_SetTag>> : __sexpr_defaults {
static constexpr auto get_attrs = []<class _Child>(__ignore, const _Child& __child) noexcept {
// BUGBUG:
return stdexec::get_env(__child);
//return __attrs<__let_t<_SetTag>, _Child, _Fun>{};
};

static constexpr auto get_completion_signatures =
[]<class _Sender, class... _Env>(_Sender&&, const _Env&...) noexcept
-> __completion_signatures_of_t<transform_sender_result_t<_Sender, _Env...>, _Env...> {
};
};
} // namespace stdexec
48 changes: 48 additions & 0 deletions test/stdexec/algos/adaptors/test_let_value.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -396,4 +396,52 @@ namespace {
ex::start(op);
CHECK(*ptr == 5);
}

TEST_CASE(
"let_value destroys the first operation state before invoking the sender factory",
"[adaptors][let_value]") {
const auto ptr = std::make_shared<int>(5);
CHECK(ptr.use_count() == 1);
auto first = ex::just() | ex::then([ptr = ptr]() { });
CHECK(ptr.use_count() == 2);
auto sender = ex::let_value(std::move(first), [&]() {
CHECK(ptr.use_count() == 2);
return ex::just();
});
CHECK(ptr.use_count() == 2);
auto op = ex::connect(std::move(sender), expect_void_receiver{});
CHECK(ptr.use_count() == 2);
ex::start(op);
CHECK(ptr.use_count() == 1);
}

struct immovable_sender {
using sender_concept = ::stdexec::sender_t;
template <typename... Args>
consteval auto get_completion_signatures(const Args&...) const & noexcept {
return ::stdexec::completion_signatures_of_t<decltype(::stdexec::just()), Args...>{};
}
template <typename Receiver>
auto connect(Receiver r) const & noexcept {
return ::stdexec::connect(::stdexec::just(), std::move(r));
}
immovable_sender() = default;
immovable_sender(const immovable_sender&) {
throw std::logic_error("Unexpected copy");
}
};
static_assert(::stdexec::sender<immovable_sender>);
static_assert(::stdexec::sender<const immovable_sender&>);
static_assert(::stdexec::sender_in<immovable_sender, ::stdexec::env<>>);
static_assert(::stdexec::sender_in<const immovable_sender&, ::stdexec::env<>>);

TEST_CASE(
"If the sender factory returns a reference to a sender that reference is passed to connect",
"[adaptors][let_value]") {
const immovable_sender s;
auto just = ex::just();
auto sender = ex::let_value(just, [&]() -> decltype(auto) { return (s); });
auto op = ex::connect(sender, expect_void_receiver{});
ex::start(op);
}
} // namespace
Loading