19 #include <type_traits> 41 #if __cpp_deduction_guides >= 201703 42 #define MAKE(x) x MAKE_ 46 #define MAKE(x) make_##x 52 return mi::MAKE(flow_many_sender)([&](
auto out) {
53 using Out = decltype(out);
62 [&](
auto&
data,
auto requested) {
105 make_producer() |
op::submit(make_consumer([](
auto up) {
111 <<
"expected that the starting, up.done and out.done signals are each recorded once";
115 make_producer() |
op::submit(make_consumer([](
auto up) {
121 <<
"expected that the starting, up.value, value and done signals are each recorded once";
147 auto f =
mi::MAKE(flow_many_sender)([&](
auto out) {
148 using Out = decltype(out);
152 producer(Out out,
TNT tnt,
bool s)
156 std::atomic<bool>
stop;
158 auto p = std::make_shared<producer>(
std::move(out), tnt_,
false);
161 explicit Data(std::shared_ptr<producer> p) : p(
std::move(p)) {}
162 std::shared_ptr<producer> p;
167 [&](
auto&
data,
auto requested) {
176 ::mi::set_value(p->out, 42);
177 ::mi::set_done(p->out);
183 data.p->stop.store(
true);
185 op::submit([p = data.p](
auto) { ::mi::set_done(p->out); });
190 data.p->stop.store(
true);
192 op::submit([p = data.p](
auto) { ::mi::set_done(p->out); });
221 while (terminal_ == 0 || cancel_ == 0) {
234 std::atomic<int> signals_{0};
235 std::atomic<int> terminal_{0};
236 std::atomic<int> cancel_{0};
237 std::chrono::system_clock::time_point at_{
mi::now(tnt_) + 100ms};
242 cancellation_test(at_ - 50ms);
247 <<
"expected that the starting, up.done and out.done signals are each recorded once";
252 cancellation_test(at_ + 50ms);
257 <<
"expected that the starting, up.done and out.value signals are each recorded once";
262 int cancellostrace = 0;
267 cancellation_test(at_);
271 cancellostrace += signals_ == 1010111;
272 cancelled += signals_ == 1010011;
275 << signals_ <<
" <- this set of signals is unrecognized";
279 <<
"total " << total <<
", cancel-lost-race " << cancellostrace
280 <<
", cancelled " << cancelled;
282 if (cancellostrace > 4 && cancelled > 4) {
294 auto v = std::array<int, 5>{0, 1, 2, 3, 4};
300 EXPECT_THAT(actual,
Eq(5)) <<
"expexcted that all the values are sent once";
TEST(FlowManySender, From)
auto on_value(Fns...fns) -> on_value_fn< Fns... >
internal::EqMatcher< T > Eq(T x)
std::chrono::steady_clock::time_point now()
constexpr detail::Map< Move > move
auto on_error(Fns...fns) -> on_error_fn< Fns... >
auto on_starting(Fns...fns) -> on_starting_fn< Fns... >
requires E e noexcept(noexcept(s.error(std::move(e))))
auto make_time(mi::time_source<> &t, NT &ex)
internal::LtMatcher< Rhs > Lt(Rhs x)
TEST_F(ImmediateFlowManySender, EarlyCancellation)
new_thread_executor new_thread()
PUSHMI_INLINE_VAR constexpr detail::submit_at_fn submit_at
PUSHMI_INLINE_VAR constexpr struct folly::pushmi::operators::flow_from_fn flow_from
PUSHMI_INLINE_VAR constexpr __adl::set_starting_fn set_starting
#define EXPECT_THAT(value, matcher)
#define ASSERT_THAT(value, matcher)
auto on_done(Fn fn) -> on_done_fn< Fn >
PUSHMI_INLINE_VAR constexpr __adl::set_value_fn set_value
mi::invoke_result_t< decltype(make_time), mi::time_source<> &, NT & > TNT
void cancellation_test(std::chrono::system_clock::time_point at)
decltype(folly::pushmi::invoke(std::declval< F >(), std::declval< As >()...)) invoke_result_t
void for_each(T const &range, Function< void(typename T::value_type const &) const > const &func)
static constexpr uint64_t data[1]
requires Invocable< ExecutorFactory & > &&Executor< invoke_result_t< ExecutorFactory & > > &&NeverBlocking< invoke_result_t< ExecutorFactory & > > auto make(NF nf, ExecutorFactory ef)
PUSHMI_INLINE_VAR constexpr __adl::set_done_fn set_done