proxygen
FlowManyTest.cpp
Go to the documentation of this file.
1 /*
2  * Copyright 2018-present Facebook, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 #include <array>
18 
19 #include <type_traits>
20 
21 #include <chrono>
22 using namespace std::literals;
23 
28 
33 
34 using namespace folly::pushmi::aliases;
35 
38 
39 using namespace testing;
40 
41 #if __cpp_deduction_guides >= 201703
42 #define MAKE(x) x MAKE_
43 #define MAKE_(...) \
44  { __VA_ARGS__ }
45 #else
46 #define MAKE(x) make_##x
47 #endif
48 
49 class ImmediateFlowManySender : public Test {
50  protected:
51  auto make_producer() {
52  return mi::MAKE(flow_many_sender)([&](auto out) {
53  using Out = decltype(out);
54  struct Data : mi::receiver<> {
55  explicit Data(Out out) : out(std::move(out)), stop(false) {}
56  Out out;
57  bool stop;
58  };
59 
60  auto up = mi::MAKE(receiver)(
61  Data{std::move(out)},
62  [&](auto& data, auto requested) {
63  signals_ += 1000000;
64  if (requested < 1) {
65  return;
66  }
67  // check boolean to select signal
68  if (!data.stop) {
69  ::mi::set_value(data.out, 42);
70  }
71  ::mi::set_done(data.out);
72  },
73  [&](auto& data, auto) noexcept {
74  signals_ += 100000;
75  data.stop = true;
76  ::mi::set_done(data.out);
77  },
78  [&](auto& data) {
79  signals_ += 10000;
80  data.stop = true;
81  ::mi::set_done(data.out);
82  });
83 
84  // pass reference for cancellation.
85  ::mi::set_starting(up.data().out, std::move(up));
86  });
87  }
88 
89  template <class F>
90  auto make_consumer(F f) {
91  return mi::MAKE(flow_receiver)(
92  mi::on_value([&](int) { signals_ += 100; }),
93  mi::on_error([&](auto) noexcept { signals_ += 1000; }),
94  mi::on_done([&]() { signals_ += 1; }),
95  mi::on_starting([&, f](auto up) {
96  signals_ += 10;
97  f(std::move(up));
98  }));
99  }
100 
101  int signals_{0};
102 };
103 
104 TEST_F(ImmediateFlowManySender, EarlyCancellation) {
105  make_producer() | op::submit(make_consumer([](auto up) {
106  // immediately stop producer
107  ::mi::set_done(up);
108  }));
109 
110  EXPECT_THAT(signals_, Eq(10011))
111  << "expected that the starting, up.done and out.done signals are each recorded once";
112 }
113 
114 TEST_F(ImmediateFlowManySender, LateCancellation) {
115  make_producer() | op::submit(make_consumer([](auto up) {
116  // do not stop producer before it is scheduled to run
117  ::mi::set_value(up, 1);
118  }));
119 
120  EXPECT_THAT(signals_, Eq(1000111))
121  << "expected that the starting, up.value, value and done signals are each recorded once";
122 }
123 
124 using NT = decltype(mi::new_thread());
125 
126 inline auto make_time(mi::time_source<>& t, NT& ex) {
127  return t.make(mi::systemNowF{}, [ex]() { return ex; })();
128 }
129 
131  protected:
133 
134  void reset() {
135  at_ = mi::now(tnt_) + 100ms;
136  signals_ = 0;
137  terminal_ = 0;
138  cancel_ = 0;
139  }
140 
141  void join() {
142  timeproduce_.join();
143  timecancel_.join();
144  }
145 
146  void cancellation_test(std::chrono::system_clock::time_point at) {
147  auto f = mi::MAKE(flow_many_sender)([&](auto out) {
148  using Out = decltype(out);
149 
150  // boolean cancellation
151  struct producer {
152  producer(Out out, TNT tnt, bool s)
153  : out(std::move(out)), tnt(std::move(tnt)), stop(s) {}
154  Out out;
155  TNT tnt;
156  std::atomic<bool> stop;
157  };
158  auto p = std::make_shared<producer>(std::move(out), tnt_, false);
159 
160  struct Data : mi::receiver<> {
161  explicit Data(std::shared_ptr<producer> p) : p(std::move(p)) {}
162  std::shared_ptr<producer> p;
163  };
164 
165  auto up = mi::MAKE(receiver)(
166  Data{p},
167  [&](auto& data, auto requested) {
168  signals_ += 1000000;
169  if (requested < 1) {
170  return;
171  }
172  // submit work to happen later
173  data.p->tnt | op::submit_at(at_, [p = data.p](auto) {
174  // check boolean to select signal
175  if (!p->stop) {
176  ::mi::set_value(p->out, 42);
177  ::mi::set_done(p->out);
178  }
179  });
180  },
181  [&](auto& data, auto) noexcept {
182  signals_ += 100000;
183  data.p->stop.store(true);
184  data.p->tnt |
185  op::submit([p = data.p](auto) { ::mi::set_done(p->out); });
186  ++cancel_;
187  },
188  [&](auto& data) {
189  signals_ += 10000;
190  data.p->stop.store(true);
191  data.p->tnt |
192  op::submit([p = data.p](auto) { ::mi::set_done(p->out); });
193  ++cancel_;
194  });
195 
196  tnt_ | op::submit([p, sup = std::move(up)](auto) mutable {
197  // pass reference for cancellation.
198  ::mi::set_starting(p->out, std::move(sup));
199  });
200  });
201  f |
202  op::submit(mi::MAKE(flow_receiver)(
203  mi::on_value([&](int) { signals_ += 100; }),
204  mi::on_error([&](auto) noexcept {
205  signals_ += 1000;
206  ++terminal_;
207  }),
208  mi::on_done([&]() {
209  signals_ += 1;
210  ++terminal_;
211  }),
212  // stop producer before it is scheduled to run
213  mi::on_starting([&, at](auto up) {
214  signals_ += 10;
215  mi::set_value(up, 1);
216  tcncl_ | op::submit_at(at, [up = std::move(up)](auto) mutable {
217  ::mi::set_done(up);
218  });
219  })));
220 
221  while (terminal_ == 0 || cancel_ == 0) {
223  }
224  }
225 
226  NT ntproduce_{mi::new_thread()};
227  mi::time_source<> timeproduce_{};
228  TNT tnt_{make_time(timeproduce_, ntproduce_)};
229 
230  NT ntcancel_{mi::new_thread()};
231  mi::time_source<> timecancel_{};
232  TNT tcncl_{make_time(timecancel_, ntcancel_)};
233 
234  std::atomic<int> signals_{0};
235  std::atomic<int> terminal_{0};
236  std::atomic<int> cancel_{0};
237  std::chrono::system_clock::time_point at_{mi::now(tnt_) + 100ms};
238 };
239 
240 TEST_F(ConcurrentFlowManySender, EarlyCancellation) {
241  // this nightmare brought to you by ASAN stack-use-after-return.
242  cancellation_test(at_ - 50ms);
243 
244  join();
245 
246  EXPECT_THAT(signals_, Eq(1010011))
247  << "expected that the starting, up.done and out.done signals are each recorded once";
248 }
249 
250 TEST_F(ConcurrentFlowManySender, LateCancellation) {
251  // this nightmare brought to you by ASAN stack-use-after-return.
252  cancellation_test(at_ + 50ms);
253 
254  join();
255 
256  EXPECT_THAT(signals_, Eq(1010111))
257  << "expected that the starting, up.done and out.value signals are each recorded once";
258 }
259 
260 TEST_F(ConcurrentFlowManySender, RacingCancellation) {
261  int total = 0;
262  int cancellostrace = 0; // 1010111
263  int cancelled = 0; // 1010011
264 
265  for (;;) {
266  reset();
267  cancellation_test(at_);
268 
269  // accumulate known signals
270  ++total;
271  cancellostrace += signals_ == 1010111;
272  cancelled += signals_ == 1010011;
273 
274  EXPECT_THAT(total, Eq(cancellostrace + cancelled))
275  << signals_ << " <- this set of signals is unrecognized";
276 
277  ASSERT_THAT(total, Lt(100))
278  // too long, abort and show the signals distribution
279  << "total " << total << ", cancel-lost-race " << cancellostrace
280  << ", cancelled " << cancelled;
281 
282  if (cancellostrace > 4 && cancelled > 4) {
283  // yay all known outcomes were observed!
284  break;
285  }
286  // try again
287  continue;
288  }
289 
290  join();
291 }
292 
293 TEST(FlowManySender, From) {
294  auto v = std::array<int, 5>{0, 1, 2, 3, 4};
295  auto f = op::flow_from(v);
296 
297  int actual = 0;
298  f | op::for_each(mi::MAKE(receiver)([&](int) { ++actual; }));
299 
300  EXPECT_THAT(actual, Eq(5)) << "expexcted that all the values are sent once";
301 }
TEST(FlowManySender, From)
auto f
auto on_value(Fns...fns) -> on_value_fn< Fns... >
Definition: boosters.h:262
internal::EqMatcher< T > Eq(T x)
decltype(nt) NT
std::chrono::steady_clock::time_point now()
constexpr detail::Map< Move > move
Definition: Base-inl.h:2567
auto on_error(Fns...fns) -> on_error_fn< Fns... >
Definition: boosters.h:273
auto on_starting(Fns...fns) -> on_starting_fn< Fns... >
Definition: boosters.h:296
requires E e noexcept(noexcept(s.error(std::move(e))))
auto make_time(mi::time_source<> &t, NT &ex)
internal::LtMatcher< Rhs > Lt(Rhs x)
TEST_F(ImmediateFlowManySender, EarlyCancellation)
static void stop()
new_thread_executor new_thread()
Definition: new_thread.h:50
PUSHMI_INLINE_VAR constexpr detail::submit_at_fn submit_at
Definition: submit.h:387
PUSHMI_INLINE_VAR constexpr struct folly::pushmi::operators::flow_from_fn flow_from
#define MAKE(x)
PUSHMI_INLINE_VAR constexpr __adl::set_starting_fn set_starting
#define EXPECT_THAT(value, matcher)
#define ASSERT_THAT(value, matcher)
auto on_done(Fn fn) -> on_done_fn< Fn >
Definition: boosters.h:285
#define join
PUSHMI_INLINE_VAR constexpr __adl::set_value_fn set_value
static set< string > s
mi::invoke_result_t< decltype(make_time), mi::time_source<> &, NT & > TNT
void cancellation_test(std::chrono::system_clock::time_point at)
decltype(folly::pushmi::invoke(std::declval< F >(), std::declval< As >()...)) invoke_result_t
Definition: functional.h:47
void for_each(T const &range, Function< void(typename T::value_type const &) const > const &func)
static constexpr uint64_t data[1]
Definition: Fingerprint.cpp:43
requires Invocable< ExecutorFactory & > &&Executor< invoke_result_t< ExecutorFactory & > > &&NeverBlocking< invoke_result_t< ExecutorFactory & > > auto make(NF nf, ExecutorFactory ef)
Definition: time_source.h:545
PUSHMI_INLINE_VAR constexpr __adl::set_done_fn set_done