proxygen
FlowTest.cpp
Go to the documentation of this file.
1 /*
2  * Copyright 2018-present Facebook, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 #include <type_traits>
18 
19 #include <chrono>
20 using namespace std::literals;
21 
24 
29 
30 using namespace folly::pushmi::aliases;
31 
34 
35 using namespace testing;
36 
// MAKE(x)(args...) constructs the pushmi type `x` from `args...`.
//
// When the compiler advertises C++17 class template argument deduction
// guides, MAKE(x) expands to `x MAKE_`, and MAKE_(args...) then expands to
// `{ args... }`, giving `x { args... }`. Otherwise MAKE(x) expands to the
// factory function name `make_x`, giving `make_x(args...)`.
//
// The feature-test macro is checked with defined() first so that compilers
// which do not define it never evaluate an undefined identifier in #if
// (which would trigger -Wundef diagnostics).
#if defined(__cpp_deduction_guides) && __cpp_deduction_guides >= 201703
#define MAKE(x) x MAKE_
#define MAKE_(...) \
  { __VA_ARGS__ }
#else
#define MAKE(x) make_##x
#endif
44 
46  protected:
47  auto make_producer() {
48  return mi::MAKE(flow_single_sender)([&](auto out) {
49  // boolean cancellation
50  bool stop = false;
51  auto set_stop = [](auto& stop) {
52  if (!!stop) {
53  *stop = true;
54  }
55  };
56  auto tokens = mi::shared_entangle(stop, set_stop);
57 
58  using Stopper = decltype(tokens.second);
59  struct Data : mi::receiver<> {
60  explicit Data(Stopper stopper) : stopper(std::move(stopper)) {}
61  Stopper stopper;
62  };
63  auto up = mi::MAKE(receiver)(
64  Data{std::move(tokens.second)},
65  [&](auto& data, auto) noexcept {
66  signals_ += 100000;
67  auto both = lock_both(data.stopper);
68  (*(both.first))(both.second);
69  },
70  mi::on_done([&](auto& data) {
71  signals_ += 10000;
72  auto both = lock_both(data.stopper);
73  (*(both.first))(both.second);
74  }));
75 
76  // pass reference for cancellation.
77  ::mi::set_starting(out, std::move(up));
78 
79  // check boolean to select signal
80  auto both = lock_both(tokens.first);
81  if (!!both.first && !*(both.first)) {
82  ::mi::set_value(out, 42);
83  } else {
84  // cancellation is not an error
85  ::mi::set_done(out);
86  }
87  });
88  }
89 
90  template <class F>
91  auto make_consumer(F f) {
92  return mi::MAKE(flow_receiver)(
93  mi::on_value([&](int) { signals_ += 100; }),
94  mi::on_error([&](auto) noexcept { signals_ += 1000; }),
95  mi::on_done([&]() { signals_ += 1; }),
96  mi::on_starting([&, f](auto up) {
97  signals_ += 10;
98  f(std::move(up));
99  }));
100  }
101 
102  int signals_{0};
103 };
104 
105 TEST_F(ImmediateFlowSingleSender, EarlyCancellation) {
106  make_producer() | op::submit(make_consumer([](auto up) {
107  // immediately stop producer
108  ::mi::set_done(up);
109  }));
110 
111  EXPECT_THAT(signals_, Eq(10011))
112  << "expected that the starting, up.done and out.done signals are each recorded once";
113 }
114 
115 TEST_F(ImmediateFlowSingleSender, LateCancellation) {
116  make_producer() | op::submit(make_consumer([](auto) {
117  // do not stop producer before it is scheduled to run
118  }));
119 
120  EXPECT_THAT(signals_, Eq(110))
121  << "expected that the starting and out.value signals are each recorded once";
122 }
123 
124 using NT = decltype(mi::new_thread());
125 
126 inline auto make_time(mi::time_source<>& t, NT& ex) {
127  return t.make(mi::systemNowF{}, [ex]() { return ex; })();
128 }
129 
131  protected:
133 
134  void reset() {
135  at_ = mi::now(tnt_) + 100ms;
136  signals_ = 0;
137  terminal_ = 0;
138  cancel_ = 0;
139  }
140 
141  void join() {
142  timeproduce_.join();
143  timecancel_.join();
144  }
145 
146  template <class MakeTokens>
148  std::chrono::system_clock::time_point at,
149  MakeTokens make_tokens) {
150  auto f = mi::MAKE(flow_single_sender)([&, make_tokens](auto out) {
151  // boolean cancellation
152  bool stop = false;
153  auto set_stop = [](auto& stop) {
154  if (!!stop) {
155  *stop = true;
156  }
157  };
158  auto tokens = make_tokens(stop, set_stop);
159 
160  using Stopper = decltype(tokens.second);
161  struct Data : mi::receiver<> {
162  explicit Data(Stopper stopper) : stopper(std::move(stopper)) {}
163  Stopper stopper;
164  };
165  auto up = mi::MAKE(receiver)(
166  Data{std::move(tokens.second)},
167  [&](auto& data, auto) noexcept {
168  signals_ += 100000;
169  ++cancel_;
170  auto both = lock_both(data.stopper);
171  (*(both.first))(both.second);
172  },
173  mi::on_done([&](auto& data) {
174  signals_ += 10000;
175  ++cancel_;
176  auto both = lock_both(data.stopper);
177  (*(both.first))(both.second);
178  }));
179 
180  // make all the signals come from the same thread
181  tnt_ |
182  op::submit([&,
183  stoppee = std::move(tokens.first),
184  up_not_a_shadow_howtoeven = std::move(up),
185  out = std::move(out)](auto tnt) mutable {
186  // pass reference for cancellation.
187  ::mi::set_starting(out, std::move(up_not_a_shadow_howtoeven));
188 
189  // submit work to happen later
190  tnt |
191  op::submit_at(
192  at_,
193  [stoppee = std::move(stoppee),
194  out = std::move(out)](auto) mutable {
195  // check boolean to select signal
196  auto both = lock_both(stoppee);
197  if (!!both.first && !*(both.first)) {
198  ::mi::set_value(out, 42);
199  ::mi::set_done(out);
200  } else {
201  // cancellation is not an error
202  ::mi::set_done(out);
203  }
204  });
205  });
206  });
207 
208  f |
209  op::submit(
210  mi::on_value([&](int) { signals_ += 100; }),
211  mi::on_error([&](auto) noexcept {
212  signals_ += 1000;
213  ++terminal_;
214  }),
215  mi::on_done([&]() {
216  signals_ += 1;
217  ++terminal_;
218  }),
219  // stop producer before it is scheduled to run
220  mi::on_starting([&, at](auto up) {
221  signals_ += 10;
222  tcncl_ | op::submit_at(at, [up = std::move(up)](auto) mutable {
223  ::mi::set_done(up);
224  });
225  }));
226 
227  while (terminal_ == 0 || cancel_ == 0) {
229  }
230  }
231 
232  NT ntproduce_{mi::new_thread()};
233  mi::time_source<> timeproduce_{};
234  TNT tnt_{make_time(timeproduce_, ntproduce_)};
235 
236  NT ntcancel_{mi::new_thread()};
237  mi::time_source<> timecancel_{};
238  TNT tcncl_{make_time(timecancel_, ntcancel_)};
239 
240  std::atomic<int> signals_{0};
241  std::atomic<int> terminal_{0};
242  std::atomic<int> cancel_{0};
243  std::chrono::system_clock::time_point at_{mi::now(tnt_) + 100ms};
244 };
245 
246 TEST_F(ConcurrentFlowSingleSender, EarlySharedCancellation) {
247  cancellation_test(at_ - 50ms, [](auto& stop, auto& set_stop) {
248  return mi::shared_entangle(stop, set_stop);
249  });
250 
251  join();
252 
253  EXPECT_THAT(signals_, Eq(10011));
254 }
255 
256 TEST_F(ConcurrentFlowSingleSender, LateSharedCancellation) {
257  cancellation_test(at_ + 50ms, [](auto& stop, auto& set_stop) {
258  return mi::shared_entangle(stop, set_stop);
259  });
260 
261  join();
262 
263  EXPECT_THAT(signals_, Eq(10111))
264  << "expected that the starting, up.done, out.value and out.done signals are each recorded once";
265 }
266 
267 TEST_F(ConcurrentFlowSingleSender, RacingSharedCancellation) {
268  int total = 0;
269  int cancellostrace = 0; // 10111
270  int cancelled = 0; // 10011
271 
272  for (;;) {
273  reset();
274  cancellation_test(at_, [](auto& stop, auto& set_stop) {
275  return mi::shared_entangle(stop, set_stop);
276  });
277 
278  // accumulate known signals
279  ++total;
280  cancellostrace += signals_ == 10111;
281  cancelled += signals_ == 10011;
282 
283  EXPECT_THAT(total, Eq(cancellostrace + cancelled))
284  << signals_ << " <- this set of signals is unrecognized";
285 
286  ASSERT_THAT(total, Lt(100))
287  // too long, show the signals distribution
288  << "total " << total << ", cancel-lost-race " << cancellostrace
289  << ", cancelled " << cancelled;
290 
291  if (cancellostrace > 4 && cancelled > 4) {
292  // yay all known outcomes were observed!
293  break;
294  }
295  // try again
296  continue;
297  }
298 
299  join();
300 }
301 
302 TEST_F(ConcurrentFlowSingleSender, EarlyEntangledCancellation) {
303  cancellation_test(at_ - 50ms, [](auto& stop, auto& set_stop) {
304  return mi::entangle(stop, set_stop);
305  });
306 
307  join();
308 
309  EXPECT_THAT(signals_, Eq(10011));
310 }
311 
312 TEST_F(ConcurrentFlowSingleSender, LateEntangledCancellation) {
313  cancellation_test(at_ + 50ms, [](auto& stop, auto& set_stop) {
314  return mi::entangle(stop, set_stop);
315  });
316 
317  join();
318 
319  EXPECT_THAT(signals_, Eq(10111))
320  << "expected that the starting, up.done, out.value and out.done signals are each recorded once";
321 }
322 
323 TEST_F(ConcurrentFlowSingleSender, RacingEntangledCancellation) {
324  int total = 0;
325  int cancellostrace = 0; // 10111
326  int cancelled = 0; // 10011
327 
328  for (;;) {
329  reset();
330  cancellation_test(at_, [](auto& stop, auto& set_stop) {
331  return mi::entangle(stop, set_stop);
332  });
333 
334  // accumulate known signals
335  ++total;
336  cancellostrace += signals_ == 10111;
337  cancelled += signals_ == 10011;
338 
339  EXPECT_THAT(total, Eq(cancellostrace + cancelled))
340  << signals_ << " <- this set of signals is unrecognized";
341 
342  ASSERT_THAT(total, Lt(100))
343  // too long, show the signals distribution
344  << "total " << total << ", cancel-lost-race " << cancellostrace
345  << ", cancelled " << cancelled;
346 
347  if (cancellostrace > 4 && cancelled > 4) {
348  // yay all known outcomes were observed!
349  break;
350  }
351  // try again
352  continue;
353  }
354 
355  join();
356 }
locked_entangled_pair< T, Dual > lock_both(entangled< T, Dual > &e)
Definition: entangle.h:257
auto f
auto on_value(Fns...fns) -> on_value_fn< Fns... >
Definition: boosters.h:262
internal::EqMatcher< T > Eq(T x)
decltype(nt) NT
std::chrono::steady_clock::time_point now()
constexpr detail::Map< Move > move
Definition: Base-inl.h:2567
auto on_error(Fns...fns) -> on_error_fn< Fns... >
Definition: boosters.h:273
auto on_starting(Fns...fns) -> on_starting_fn< Fns... >
Definition: boosters.h:296
requires E e noexcept(noexcept(s.error(std::move(e))))
mi::invoke_result_t< decltype(make_time), mi::time_source<> &, NT & > TNT
Definition: FlowTest.cpp:132
#define MAKE(x)
Definition: FlowTest.cpp:42
void cancellation_test(std::chrono::system_clock::time_point at, MakeTokens make_tokens)
Definition: FlowTest.cpp:147
auto shared_entangle(First f, Second s) -> shared_entangled_pair< First, Second >
Definition: entangle.h:279
internal::LtMatcher< Rhs > Lt(Rhs x)
static void stop()
auto entangle(First f, Second s) -> entangled_pair< First, Second >
Definition: entangle.h:219
new_thread_executor new_thread()
Definition: new_thread.h:50
PUSHMI_INLINE_VAR constexpr detail::submit_at_fn submit_at
Definition: submit.h:387
PUSHMI_INLINE_VAR constexpr __adl::set_starting_fn set_starting
auto make_time(mi::time_source<> &t, NT &ex)
Definition: FlowTest.cpp:126
#define EXPECT_THAT(value, matcher)
#define ASSERT_THAT(value, matcher)
auto on_done(Fn fn) -> on_done_fn< Fn >
Definition: boosters.h:285
#define join
PUSHMI_INLINE_VAR constexpr __adl::set_value_fn set_value
decltype(folly::pushmi::invoke(std::declval< F >(), std::declval< As >()...)) invoke_result_t
Definition: functional.h:47
static constexpr uint64_t data[1]
Definition: Fingerprint.cpp:43
requires Invocable< ExecutorFactory & > &&Executor< invoke_result_t< ExecutorFactory & > > &&NeverBlocking< invoke_result_t< ExecutorFactory & > > auto make(NF nf, ExecutorFactory ef)
Definition: time_source.h:545
static const char tokens[256]
Definition: http_parser.c:184
TEST_F(ImmediateFlowSingleSender, EarlyCancellation)
Definition: FlowTest.cpp:105
PUSHMI_INLINE_VAR constexpr __adl::set_done_fn set_done