Skip to content

Commit 01ea736

Browse files
committed
let_value, _error, & _stopped: Destroy Child Operation State After Completion
Previously the operation states for let_value, let_error, and let_stopped directly contained the operation states of one or two child operations. When first connected this number was one, and after the completion of that first operation and the connection of the sender returned by the wrapped invocable the number was two. Notably when the second operation state is connected the first operation has completed. P3373 proposes changing the above behavior such that after the tuple of values sent by the first operation is populated the operation state for the first operation is destroyed. Thereafter that storage can be reused for the operation state of the second operation thereby: - Release resources held by the first operation state sooner, and - Reducing the size of the operation states of let_value, let_error, and let_stopped This commit implements the above-described change.
1 parent 9082d76 commit 01ea736

File tree

2 files changed

+182
-108
lines changed

2 files changed

+182
-108
lines changed

include/stdexec/__detail/__let.hpp

Lines changed: 134 additions & 108 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@
3333
#include "__variant.hpp"
3434

3535
#include <exception>
36+
#include <type_traits>
3637

3738
namespace stdexec {
3839
//////////////////////////////////////////////////////////////////////////////
@@ -41,6 +42,11 @@ namespace stdexec {
4142
template <class _SetTag>
4243
struct __let_t;
4344

45+
template <class _SetTag>
46+
struct __let_tag {
47+
using __t = _SetTag;
48+
};
49+
4450
template <class _SetTag>
4551
inline constexpr __mstring __in_which_let_msg{"In stdexec::let_value(Sender, Function)..."};
4652

@@ -282,32 +288,84 @@ namespace stdexec {
282288

283289
//! The core of the operation state for `let_*`.
284290
//! This gets bundled up into a larger operation state (`__detail::__op_state<...>`).
285-
template <class _Receiver, class _Fun, class _SetTag, class _Env2, class... _Tuples>
291+
template <class _SetTag, class _Sender, class _Fun, class _Receiver, class... _Tuples>
286292
struct __let_state {
287-
using __fun_t = _Fun;
288-
using __env2_t = _Env2;
289-
using __env_t = __join_env_t<_Env2, env_of_t<_Receiver>>;
290-
using __rcvr_t = __receiver_with_env_t<_Receiver, _Env2>;
293+
using __env2_t =
294+
__let::__env2_t<_SetTag, env_of_t<const _Sender&>, env_of_t<const _Receiver&>>;
295+
using __second_rcvr_t = __receiver_with_env_t<_Receiver, __env2_t>;
296+
template <typename _Tag, typename... _Args>
297+
constexpr void __impl(_Receiver& __rcvr, _Tag __tag, _Args&&... __args) noexcept {
298+
if constexpr (std::is_same_v<_SetTag, _Tag>) {
299+
using __sender_t = __call_result_t<_Fun, __decay_t<_Args>&...>;
300+
using __submit_t = __submit_result<__sender_t, __env2_t, _Receiver>;
301+
constexpr bool __nothrow_store = (__nothrow_decay_copyable<_Args> && ...);
302+
constexpr bool __nothrow_invoke = __nothrow_callable<_Fun, __decay_t<_Args>&...>;
303+
constexpr bool __nothrow_submit = noexcept(__storage_.template emplace<__submit_t>(
304+
__declval<__sender_t>(), __declval<__second_rcvr_t>()));
305+
STDEXEC_TRY {
306+
auto& __tuple = __args_.emplace_from(__mktuple, static_cast<_Args&&>(__args)...);
307+
auto&& __sender = ::stdexec::__apply(static_cast<_Fun&&>(__fun_), __tuple);
308+
__storage_.template emplace<__monostate>();
309+
__second_rcvr_t __r{__rcvr, static_cast<__env2_t&&>(__env2_)};
310+
auto& __op = __storage_.template emplace<__submit_t>(
311+
static_cast<__sender_t&&>(__sender), static_cast<__second_rcvr_t&&>(__r));
312+
__op.submit(static_cast<__sender_t&&>(__sender), static_cast<__second_rcvr_t&&>(__r));
313+
}
314+
STDEXEC_CATCH_ALL {
315+
if constexpr (!(__nothrow_store && __nothrow_invoke && __nothrow_submit)) {
316+
::stdexec::set_error(static_cast<_Receiver&&>(__rcvr), std::current_exception());
317+
}
318+
}
319+
} else {
320+
__tag(static_cast<_Receiver&&>(__rcvr), static_cast<_Args&&>(__args)...);
321+
}
322+
}
323+
struct __first_rcvr_t {
324+
using receiver_concept = ::stdexec::receiver_t;
325+
__let_state& __state;
326+
_Receiver& __rcvr;
327+
template <typename... _Args>
328+
constexpr void set_value(_Args&&... __args) noexcept {
329+
__state.__impl(__rcvr, ::stdexec::set_value, static_cast<_Args&&>(__args)...);
330+
}
331+
template <typename... _Args>
332+
constexpr void set_error(_Args&&... __args) noexcept {
333+
__state.__impl(__rcvr, ::stdexec::set_error, static_cast<_Args&&>(__args)...);
334+
}
335+
template <typename... _Args>
336+
constexpr void set_stopped(_Args&&... __args) noexcept {
337+
__state.__impl(__rcvr, ::stdexec::set_stopped, static_cast<_Args&&>(__args)...);
338+
}
339+
constexpr decltype(auto) get_env() const noexcept {
340+
return ::stdexec::get_env(__rcvr);
341+
}
342+
};
343+
291344
using __result_variant = __variant_for<__monostate, _Tuples...>;
292-
using __submit_variant = __variant_for<
345+
using __op_state_variant = __variant_for<
293346
__monostate,
294-
__mapply<__submit_datum_for<_Receiver, _Fun, _SetTag, _Env2>, _Tuples>...
295-
>;
296-
297-
template <class _ResultSender, class _OpState>
298-
auto __get_result_receiver(const _ResultSender&, _OpState& __op_state) -> decltype(auto) {
299-
return __rcvr_t{__op_state.__rcvr_, __env2_};
347+
::stdexec::connect_result_t<_Sender, __first_rcvr_t>,
348+
__mapply<__submit_datum_for<_Receiver, _Fun, _SetTag, __env2_t>, _Tuples>...>;
349+
350+
constexpr explicit __let_state(_Sender&& __sender, _Fun __fun, _Receiver& __r) noexcept(
351+
__nothrow_connectable<_Sender, __first_rcvr_t>
352+
&& std::is_nothrow_move_constructible_v<_Fun>)
353+
: __fun_(static_cast<_Fun&&>(__fun))
354+
, __env2_(
355+
// TODO(ericniebler): this needs a fallback
356+
__let::__mk_env2<_SetTag>(::stdexec::get_env(__sender), ::stdexec::get_env(__r))) {
357+
__storage_.emplace_from(
358+
::stdexec::connect, static_cast<_Sender&&>(__sender), __first_rcvr_t{*this, __r});
300359
}
301360

302361
STDEXEC_IMMOVABLE_NO_UNIQUE_ADDRESS
303362
_Fun __fun_;
304363
STDEXEC_IMMOVABLE_NO_UNIQUE_ADDRESS
305-
_Env2 __env2_;
364+
__env2_t __env2_;
306365
//! Variant to hold the results passed from upstream before passing them to the function:
307366
__result_variant __args_{};
308-
//! Variant type for holding the operation state from connecting
309-
//! the function result to the downstream receiver:
310-
__submit_variant __storage_{};
367+
//! Variant type for holding the operation state of the currently in flight operation
368+
__op_state_variant __storage_{};
311369
};
312370

313371
// The set_value completions of:
@@ -504,11 +562,20 @@ namespace stdexec {
504562
}
505563
};
506564

565+
template <class _Sender, class _Fun>
566+
struct __data_t {
567+
_Sender __sndr;
568+
_Fun __fun;
569+
};
570+
571+
template <typename _Sender>
572+
using __sender_of = decltype((__declval<__data_of<_Sender>>().__sndr));
573+
template <typename _Sender>
574+
using __fun_of = decltype((__declval<__data_of<_Sender>>().__fun));
575+
507576
//! Implementation of the `let_*_t` types, where `_SetTag` is, e.g., `set_value_t` for `let_value`.
508577
template <class _SetTag>
509578
struct __let_t {
510-
using __t = _SetTag;
511-
512579
template <sender _Sender, __movable_value _Fun>
513580
auto operator()(_Sender&& __sndr, _Fun __fun) const -> __well_formed_sender auto {
514581
return __make_sexpr<__let_t<_SetTag>>(
@@ -520,117 +587,62 @@ namespace stdexec {
520587
auto operator()(_Fun __fun) const {
521588
return __closure(*this, static_cast<_Fun&&>(__fun));
522589
}
590+
591+
template <class _Sender>
592+
auto transform_sender(set_value_t, _Sender&& __sndr, __ignore) {
593+
return __sexpr_apply(
594+
static_cast<_Sender&&>(__sndr),
595+
[]<class _Fun, class _Child>(__ignore, _Fun&& __fun, _Child&& __child) {
596+
return __make_sexpr<__let_tag<_SetTag>>(
597+
__data_t{static_cast<_Child&&>(__child), static_cast<_Fun&&>(__fun)});
598+
});
599+
}
523600
};
524601

525602
template <class _SetTag>
526603
struct __let_impl : __sexpr_defaults {
527-
static constexpr auto get_attrs = []<class _Fun, class _Child>(
528-
const _Fun&,
529-
[[maybe_unused]]
530-
const _Child& __child) noexcept {
531-
// BUGBUG:
532-
return stdexec::get_env(__child);
533-
//return __attrs<__let_t<_SetTag>, _Child, _Fun>{};
534-
};
604+
static constexpr auto get_attrs =
605+
[]<class _Child, class _Fun>(const __data_t<_Child, _Fun>& __data) noexcept {
606+
// BUGBUG:
607+
return stdexec::get_env(__data.__sndr);
608+
};
535609

536610
static constexpr auto get_completion_signatures =
537611
[]<class _Self, class _Env>(_Self&&, _Env&&...) noexcept {
538-
static_assert(sender_expr_for<_Self, __let_t<_SetTag>>);
612+
static_assert(sender_expr_for<_Self, __let_tag<_SetTag>>);
539613
if constexpr (__decay_copyable<_Self>) {
540-
using __fn_t = __decay_t<__data_of<_Self>>;
541-
return __completions_t<__let_t<_SetTag>, __fn_t, __child_of<_Self>, _Env>{};
614+
using __fn_t = __decay_t<__fun_of<_Self>>;
615+
using __result_t =
616+
__completions_t<__let_tag<_SetTag>, __fn_t, __sender_of<_Self>, _Env>;
617+
return __result_t{};
542618
} else {
543619
return __mexception<_SENDER_TYPE_IS_NOT_COPYABLE_, _WITH_SENDER_<_Self>>{};
544620
}
545621
};
546622

547623
static constexpr auto get_state =
548-
[]<class _Receiver, __decay_copyable _Sender>(_Sender&& __sndr, const _Receiver& __rcvr)
549-
requires sender_in<__child_of<_Sender>, env_of_t<_Receiver>>
624+
[]<class _Receiver, __decay_copyable _Sender>(_Sender&& __sndr, _Receiver& __rcvr)
625+
requires sender_in<__sender_of<_Sender>, env_of_t<_Receiver>>
550626
{
551-
static_assert(sender_expr_for<_Sender, __let_t<_SetTag>>);
552-
using _Fun = __decay_t<__data_of<_Sender>>;
553-
using _Child = __child_of<_Sender>;
554-
using _Env2 = __env2_t<_SetTag, env_of_t<_Child>, env_of_t<_Receiver>>;
555-
using __mk_let_state = __mbind_front_q<__let_state, _Receiver, _Fun, _SetTag, _Env2>;
556-
627+
static_assert(sender_expr_for<_Sender, __let_tag<_SetTag>>);
628+
using _Child = __sender_of<_Sender>;
629+
using _Fun = __decay_t<__fun_of<_Sender>>;
630+
using __mk_let_state = __mbind_front_q<__let_state, _SetTag, _Child, _Fun, _Receiver>;
557631
using __let_state_t = __gather_completions_of<
558632
_SetTag,
559633
_Child,
560634
env_of_t<_Receiver>,
561635
__q<__decayed_tuple>,
562-
__mk_let_state
563-
>;
564-
565-
return __sndr.apply(
566-
static_cast<_Sender&&>(__sndr),
567-
[&]<class _Fn, class _Child>(__ignore, _Fn&& __fn, _Child&& __child) {
568-
// TODO(ericniebler): this needs a fallback
569-
_Env2 __env2 =
570-
__let::__mk_env2<_SetTag>(stdexec::get_env(__child), stdexec::get_env(__rcvr));
571-
return __let_state_t{static_cast<_Fn&&>(__fn), static_cast<_Env2&&>(__env2)};
572-
});
636+
__mk_let_state>;
637+
auto&& [__tag, __data] = static_cast<_Sender&&>(__sndr);
638+
return __let_state_t(
639+
__forward_like<_Sender>(__data).__sndr, __forward_like<_Sender>(__data).__fun, __rcvr);
573640
};
574641

575-
//! Helper function to actually invoke the function to produce `let_*`'s sender,
576-
//! connect it to the downstream receiver, and start it. This is the heart of
577-
//! `let_*`.
578-
template <class _State, class _OpState, class... _As>
579-
static void __bind_(_State& __state, _OpState& __op_state, _As&&... __as) {
580-
// Store the passed-in (received) args:
581-
auto& __args = __state.__args_.emplace_from(__mktuple, static_cast<_As&&>(__as)...);
582-
// Apply the function to the args to get the sender:
583-
auto __sndr2 = stdexec::__apply(std::move(__state.__fun_), __args);
584-
// Create a receiver based on the state, the computed sender, and the operation state:
585-
auto __rcvr2 = __state.__get_result_receiver(__sndr2, __op_state);
586-
// Connect the sender to the receiver and start it:
587-
using __result_t = decltype(submit_result{std::move(__sndr2), std::move(__rcvr2)});
588-
auto& __op = __state.__storage_
589-
.template emplace<__result_t>(std::move(__sndr2), std::move(__rcvr2));
590-
__op.submit(std::move(__sndr2), std::move(__rcvr2));
591-
}
592-
593-
template <class _OpState, class... _As>
594-
static void __bind(_OpState& __op_state, _As&&... __as) noexcept {
595-
using _State = decltype(__op_state.__state_);
596-
using _Receiver = decltype(__op_state.__rcvr_);
597-
using _Fun = _State::__fun_t;
598-
using _Env2 = _State::__env2_t;
599-
using _JoinEnv2 = __join_env_t<_Env2, env_of_t<_Receiver>>;
600-
using _ResultSender = __mcall<__result_sender_fn<_SetTag, _Fun, _JoinEnv2>, _As...>;
601-
602-
_State& __state = __op_state.__state_;
603-
_Receiver& __rcvr = __op_state.__rcvr_;
604-
605-
if constexpr (
606-
(__nothrow_decay_copyable<_As> && ...) && __nothrow_callable<_Fun, __decay_t<_As>&...>
607-
&& __nothrow_connectable<_ResultSender, __result_receiver_t<_Receiver, _Env2>>) {
608-
__bind_(__state, __op_state, static_cast<_As&&>(__as)...);
609-
} else {
610-
STDEXEC_TRY {
611-
__bind_(__state, __op_state, static_cast<_As&&>(__as)...);
612-
}
613-
STDEXEC_CATCH_ALL {
614-
using _Receiver = decltype(__op_state.__rcvr_);
615-
stdexec::set_error(static_cast<_Receiver&&>(__rcvr), std::current_exception());
616-
}
617-
}
618-
}
619-
620-
static constexpr auto complete = []<class _OpState, class _Tag, class... _As>(
621-
__ignore,
622-
_OpState& __op_state,
623-
_Tag,
624-
_As&&... __as) noexcept -> void {
625-
if constexpr (__same_as<_Tag, _SetTag>) {
626-
// Intercept the channel of interest to compute the sender and connect it:
627-
__bind(__op_state, static_cast<_As&&>(__as)...);
628-
} else {
629-
// Forward the other channels downstream:
630-
using _Receiver = decltype(__op_state.__rcvr_);
631-
_Tag()(static_cast<_Receiver&&>(__op_state.__rcvr_), static_cast<_As&&>(__as)...);
632-
}
633-
};
642+
static constexpr auto start =
643+
[]<typename _State, typename _Receiver>(_State& __state, _Receiver&) noexcept {
644+
::stdexec::start(__state.__storage_.template get<1>());
645+
};
634646
};
635647
} // namespace __let
636648

@@ -639,5 +651,19 @@ namespace stdexec {
639651
inline constexpr let_stopped_t let_stopped{};
640652

641653
template <class _SetTag>
642-
struct __sexpr_impl<__let::__let_t<_SetTag>> : __let::__let_impl<_SetTag> { };
654+
struct __sexpr_impl<__let::__let_tag<_SetTag>> : __let::__let_impl<_SetTag> { };
655+
656+
template <class _SetTag>
657+
struct __sexpr_impl<__let::__let_t<_SetTag>> : __sexpr_defaults {
658+
static constexpr auto get_attrs = []<class _Child>(__ignore, const _Child& __child) noexcept {
659+
// BUGBUG:
660+
return stdexec::get_env(__child);
661+
//return __attrs<__let_t<_SetTag>, _Child, _Fun>{};
662+
};
663+
664+
static constexpr auto get_completion_signatures =
665+
[]<class _Sender, class... _Env>(_Sender&&, const _Env&...) noexcept
666+
-> __completion_signatures_of_t<transform_sender_result_t<_Sender, _Env...>, _Env...> {
667+
};
668+
};
643669
} // namespace stdexec

test/stdexec/algos/adaptors/test_let_value.cpp

Lines changed: 48 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -396,4 +396,52 @@ namespace {
396396
ex::start(op);
397397
CHECK(*ptr == 5);
398398
}
399+
400+
TEST_CASE(
401+
"let_value destroys the first operation state before invoking the sender factory",
402+
"[adaptors][let_value]") {
403+
const auto ptr = std::make_shared<int>(5);
404+
CHECK(ptr.use_count() == 1);
405+
auto first = ex::just() | ex::then([ptr = ptr]() { });
406+
CHECK(ptr.use_count() == 2);
407+
auto sender = ex::let_value(std::move(first), [&]() {
408+
CHECK(ptr.use_count() == 2);
409+
return ex::just();
410+
});
411+
CHECK(ptr.use_count() == 2);
412+
auto op = ex::connect(std::move(sender), expect_void_receiver{});
413+
CHECK(ptr.use_count() == 2);
414+
ex::start(op);
415+
CHECK(ptr.use_count() == 1);
416+
}
417+
418+
struct immovable_sender {
419+
using sender_concept = ::stdexec::sender_t;
420+
template <typename... Args>
421+
consteval auto get_completion_signatures(const Args&...) const & noexcept {
422+
return ::stdexec::completion_signatures_of_t<decltype(::stdexec::just()), Args...>{};
423+
}
424+
template <typename Receiver>
425+
auto connect(Receiver r) const & noexcept {
426+
return ::stdexec::connect(::stdexec::just(), std::move(r));
427+
}
428+
immovable_sender() = default;
429+
immovable_sender(const immovable_sender&) {
430+
throw std::logic_error("Unexpected copy");
431+
}
432+
};
433+
static_assert(::stdexec::sender<immovable_sender>);
434+
static_assert(::stdexec::sender<const immovable_sender&>);
435+
static_assert(::stdexec::sender_in<immovable_sender, ::stdexec::env<>>);
436+
static_assert(::stdexec::sender_in<const immovable_sender&, ::stdexec::env<>>);
437+
438+
TEST_CASE(
439+
"If the sender factory returns a reference to a sender that reference is passed to connect",
440+
"[adaptors][let_value]") {
441+
const immovable_sender s;
442+
auto just = ex::just();
443+
auto sender = ex::let_value(just, [&]() -> decltype(auto) { return (s); });
444+
auto op = ex::connect(sender, expect_void_receiver{});
445+
ex::start(op);
446+
}
399447
} // namespace

0 commit comments

Comments
 (0)