17 #include <type_traits> 37 #if __cpp_deduction_guides >= 201703 38 #define MAKE(x) x MAKE_ 42 #define MAKE(x) make_##x 48 return mi::MAKE(flow_single_sender)([&](
auto out) {
51 auto set_stop = [](
auto&
stop) {
58 using Stopper = decltype(
tokens.second);
60 explicit Data(Stopper stopper) : stopper(
std::move(stopper)) {}
68 (*(both.first))(both.second);
73 (*(both.first))(both.second);
81 if (!!both.first && !*(both.first)) {
106 make_producer() |
op::submit(make_consumer([](
auto up) {
112 <<
"expected that the starting, up.done and out.done signals are each recorded once";
116 make_producer() |
op::submit(make_consumer([](
auto) {
121 <<
"expected that the starting and out.value signals are each recorded once";
146 template <
class MakeTokens>
148 std::chrono::system_clock::time_point at,
149 MakeTokens make_tokens) {
150 auto f =
mi::MAKE(flow_single_sender)([&, make_tokens](
auto out) {
153 auto set_stop = [](
auto&
stop) {
158 auto tokens = make_tokens(stop, set_stop);
160 using Stopper = decltype(
tokens.second);
162 explicit Data(Stopper stopper) : stopper(
std::move(stopper)) {}
171 (*(both.first))(both.second);
177 (*(both.first))(both.second);
184 up_not_a_shadow_howtoeven =
std::move(up),
185 out =
std::move(out)](
auto tnt)
mutable {
187 ::mi::set_starting(out, std::move(up_not_a_shadow_howtoeven));
193 [stoppee = std::move(stoppee),
194 out = std::move(out)](auto) mutable {
196 auto both = lock_both(stoppee);
197 if (!!both.first && !*(both.first)) {
198 ::mi::set_value(out, 42);
227 while (terminal_ == 0 || cancel_ == 0) {
240 std::atomic<int> signals_{0};
241 std::atomic<int> terminal_{0};
242 std::atomic<int> cancel_{0};
243 std::chrono::system_clock::time_point at_{
mi::now(tnt_) + 100ms};
247 cancellation_test(at_ - 50ms, [](
auto&
stop,
auto& set_stop) {
257 cancellation_test(at_ + 50ms, [](
auto&
stop,
auto& set_stop) {
264 <<
"expected that the starting, up.done, out.value and out.done signals are each recorded once";
269 int cancellostrace = 0;
274 cancellation_test(at_, [](
auto&
stop,
auto& set_stop) {
280 cancellostrace += signals_ == 10111;
281 cancelled += signals_ == 10011;
284 << signals_ <<
" <- this set of signals is unrecognized";
288 <<
"total " << total <<
", cancel-lost-race " << cancellostrace
289 <<
", cancelled " << cancelled;
291 if (cancellostrace > 4 && cancelled > 4) {
303 cancellation_test(at_ - 50ms, [](
auto&
stop,
auto& set_stop) {
313 cancellation_test(at_ + 50ms, [](
auto&
stop,
auto& set_stop) {
320 <<
"expected that the starting, up.done, out.value and out.done signals are each recorded once";
325 int cancellostrace = 0;
330 cancellation_test(at_, [](
auto&
stop,
auto& set_stop) {
336 cancellostrace += signals_ == 10111;
337 cancelled += signals_ == 10011;
340 << signals_ <<
" <- this set of signals is unrecognized";
344 <<
"total " << total <<
", cancel-lost-race " << cancellostrace
345 <<
", cancelled " << cancelled;
347 if (cancellostrace > 4 && cancelled > 4) {
locked_entangled_pair< T, Dual > lock_both(entangled< T, Dual > &e)
auto on_value(Fns...fns) -> on_value_fn< Fns... >
internal::EqMatcher< T > Eq(T x)
std::chrono::steady_clock::time_point now()
constexpr detail::Map< Move > move
auto on_error(Fns...fns) -> on_error_fn< Fns... >
auto on_starting(Fns...fns) -> on_starting_fn< Fns... >
requires E e noexcept(noexcept(s.error(std::move(e))))
mi::invoke_result_t< decltype(make_time), mi::time_source<> &, NT & > TNT
void cancellation_test(std::chrono::system_clock::time_point at, MakeTokens make_tokens)
auto shared_entangle(First f, Second s) -> shared_entangled_pair< First, Second >
internal::LtMatcher< Rhs > Lt(Rhs x)
auto entangle(First f, Second s) -> entangled_pair< First, Second >
new_thread_executor new_thread()
PUSHMI_INLINE_VAR constexpr detail::submit_at_fn submit_at
PUSHMI_INLINE_VAR constexpr __adl::set_starting_fn set_starting
auto make_time(mi::time_source<> &t, NT &ex)
#define EXPECT_THAT(value, matcher)
#define ASSERT_THAT(value, matcher)
auto on_done(Fn fn) -> on_done_fn< Fn >
PUSHMI_INLINE_VAR constexpr __adl::set_value_fn set_value
decltype(folly::pushmi::invoke(std::declval< F >(), std::declval< As >()...)) invoke_result_t
static constexpr uint64_t data[1]
requires Invocable< ExecutorFactory & > &&Executor< invoke_result_t< ExecutorFactory & > > &&NeverBlocking< invoke_result_t< ExecutorFactory & > > auto make(NF nf, ExecutorFactory ef)
static const char tokens[256]
TEST_F(ImmediateFlowSingleSender, EarlyCancellation)
PUSHMI_INLINE_VAR constexpr __adl::set_done_fn set_done