proxygen
PushmiBenchmarks.cpp
Go to the documentation of this file.
1 /*
2  * Copyright 2018-present Facebook, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 #include <vector>
17 
27 
31 
34 
35 #include <folly/experimental/pushmi/pool.h>
36 
37 using namespace folly::pushmi::aliases;
38 
39 template <class R>
40 struct countdown {
41  explicit countdown(std::atomic<int>& c) : counter(&c) {}
42 
43  using properties = mi::properties_t<decltype(R{}())>;
44 
45  std::atomic<int>* counter;
46 
47  template <class ExecutorRef>
48  void value(ExecutorRef exec);
49  template <class E>
50  void error(E e) {
51  std::abort();
52  }
53  void done() {}
54  PUSHMI_TEMPLATE(class Up)
55  (requires mi::Invocable<
56  decltype(mi::set_value),
57  Up,
58  std::ptrdiff_t>)void starting(Up up) {
59  mi::set_value(up, 1);
60  }
61  PUSHMI_TEMPLATE(class Up)
62  (requires mi::True<>&& mi::
63  Invocable<decltype(mi::set_value), Up>)void starting(Up up) volatile {
64  mi::set_value(up);
65  }
66 };
67 
68 template <class R>
69 template <class ExecutorRef>
70 void countdown<R>::value(ExecutorRef exec) {
71  if (--*counter >= 0) {
72  exec | op::submit(R{}(*this));
73  }
74 }
75 
80 
88 
89  std::chrono::system_clock::time_point top() {
91  }
92  auto executor() {
93  return *this;
94  }
95  template <class Out>
96  void submit(std::chrono::system_clock::time_point at, Out out) {
97  std::this_thread::sleep_until(at);
98  ::mi::set_value(out, *this);
99  }
100 };
101 
108  auto executor() {
109  return inline_time_executor{};
110  }
111  template <class Out>
112  void submit(Out out) {
113  ::mi::set_value(out, *this);
114  }
115 };
116 
117 template <class CancellationFactory>
119  CancellationFactory cf;
120 
127  auto executor() {
128  return inline_time_executor{};
129  }
130  template <class Out>
131  void submit(Out out) {
132  auto tokens = cf();
133 
134  using Stopper = decltype(tokens.second);
135  struct Data : mi::receiver<> {
136  explicit Data(Stopper stopper) : stopper(std::move(stopper)) {}
137  Stopper stopper;
138  };
139  auto up = mi::MAKE(receiver)(
140  Data{std::move(tokens.second)},
141  [](auto& data) {},
142  [](auto& data, auto e) noexcept {
143  auto both = lock_both(data.stopper);
144  (*(both.first))(both.second);
145  },
146  [](auto& data) {
147  auto both = lock_both(data.stopper);
148  (*(both.first))(both.second);
149  });
150 
151  // pass reference for cancellation.
152  ::mi::set_starting(out, std::move(up));
153 
154  auto both = lock_both(tokens.first);
155  if (!!both.first && !*(both.first)) {
156  ::mi::set_value(out, *this);
157  } else {
158  // cancellation is not an error
159  ::mi::set_done(out);
160  }
161  }
162 };
163 
165  auto operator()() {
166  // boolean cancellation
167  bool stop = false;
168  auto set_stop = [](auto& stop) {
169  if (!!stop) {
170  *stop = true;
171  }
172  };
173  return mi::shared_entangle(stop, set_stop);
174  }
175 };
178 
180  auto operator()() {
181  // boolean cancellation
182  bool stop = false;
183  auto set_stop = [](auto& stop) {
184  if (!!stop) {
185  *stop = true;
186  }
187  };
188  return mi::entangle(stop, set_stop);
189  }
190 };
193 
201  auto executor() {
202  return inline_time_executor{};
203  }
204  template <class Out>
205  void submit(Out out) {
206  // pass reference for cancellation.
208 
209  ::mi::set_value(out, *this);
210  }
211 };
212 
215  inline_executor_flow_many(std::atomic<int>& c) : counter(&c) {}
216 
217  std::atomic<int>* counter;
218 
225 
226  auto executor() {
227  return inline_time_executor{};
228  }
229  template <class Out>
230  void submit(Out out) {
231  // boolean cancellation
232  struct producer {
233  producer(Out out, bool s) : out(std::move(out)), stop(s) {}
234  Out out;
235  std::atomic<bool> stop;
236  };
237  auto p = std::make_shared<producer>(std::move(out), false);
238 
239  struct Data : mi::receiver<> {
240  explicit Data(std::shared_ptr<producer> p) : p(std::move(p)) {}
241  std::shared_ptr<producer> p;
242  };
243 
244  auto up = mi::MAKE(receiver)(
245  Data{p},
246  [counter = this->counter](auto& data, auto requested) {
247  if (requested < 1) {
248  return;
249  }
250  // this is re-entrant
251  while (!data.p->stop && --requested >= 0 &&
252  (!counter || --*counter >= 0)) {
254  data.p->out,
255  !!counter ? inline_executor_flow_many{*counter}
257  }
258  if (!counter || *counter == 0) {
259  ::mi::set_done(data.p->out);
260  }
261  },
262  [](auto& data, auto e) noexcept {
263  data.p->stop.store(true);
264  ::mi::set_done(data.p->out);
265  },
266  [](auto& data) {
267  data.p->stop.store(true);
268  ::mi::set_done(data.p->out);
269  });
270 
271  // pass reference for cancellation.
272  ::mi::set_starting(p->out, std::move(up));
273  }
274 };
275 
283  auto executor() {
284  return inline_time_executor{};
285  }
286  template <class Out>
287  void submit(Out out) {
288  // pass reference for cancellation.
290 
291  ::mi::set_value(out, *this);
292 
293  ::mi::set_done(out);
294  }
295 };
296 
303  auto executor() {
304  return inline_time_executor{};
305  }
306  template <class Out>
307  void submit(Out out) {
308  ::mi::set_value(out, *this);
309  ::mi::set_done(out);
310  }
311 };
312 
313 #define concept Concept
314 #include <nonius/nonius.h++>
315 
316 NONIUS_BENCHMARK("ready 1'000 single get (submit)", [](nonius::chronometer meter){
317  int counter{0};
318  meter.measure([&]{
319  counter = 1'000;
320  while (--counter >=0) {
321  auto fortyTwo = op::just(42) | op::get<int>;
322  }
323  return counter;
324  });
325 })
326 
327 NONIUS_BENCHMARK("ready 1'000 single get (blocking_submit)", [](nonius::chronometer meter){
328  int counter{0};
329  meter.measure([&]{
330  counter = 1'000;
331  while (--counter >=0) {
332  auto fortyTwo = mi::make_single_sender([](auto out){ mi::set_value(out, 42); mi::set_done(out);}) | op::get<int>;
333  }
334  return counter;
335  });
336 })
337 
338 NONIUS_BENCHMARK("inline 1'000 single", [](nonius::chronometer meter){
339  std::atomic<int> counter{0};
340  auto ie = inline_executor{};
341  using IE = decltype(ie);
342  countdownsingle single{counter};
343  meter.measure([&]{
344  counter.store(1'000);
346  while(counter.load() > 0);
347  return counter.load();
348  });
349 })
350 
351 NONIUS_BENCHMARK("inline 1'000 time single", [](nonius::chronometer meter){
352  std::atomic<int> counter{0};
354  using IE = decltype(ie);
356  meter.measure([&]{
357  counter.store(1'000);
358  ie | op::submit(mi::make_receiver(single));
359  while(counter.load() > 0);
360  return counter.load();
361  });
362 })
363 
364 NONIUS_BENCHMARK("inline 1'000 many", [](nonius::chronometer meter){
365  std::atomic<int> counter{0};
366  auto ie = inline_executor_many{};
367  using IE = decltype(ie);
368  countdownmany many{counter};
369  meter.measure([&]{
370  counter.store(1'000);
371  ie | op::submit(mi::make_receiver(many));
372  while(counter.load() > 0);
373  return counter.load();
374  });
375 })
376 
377 NONIUS_BENCHMARK("inline 1'000 flow_single shared", [](nonius::chronometer meter){
378  std::atomic<int> counter{0};
379  auto ie = inline_executor_flow_single_shared{};
380  using IE = decltype(ie);
381  countdownflowsingle flowsingle{counter};
382  meter.measure([&]{
383  counter.store(1'000);
385  while(counter.load() > 0);
386  return counter.load();
387  });
388 })
389 
390 NONIUS_BENCHMARK("inline 1'000 flow_single entangle", [](nonius::chronometer meter){
391  std::atomic<int> counter{0};
393  using IE = decltype(ie);
395  meter.measure([&]{
396  counter.store(1'000);
397  ie | op::submit(mi::make_flow_receiver(flowsingle));
398  while(counter.load() > 0);
399  return counter.load();
400  });
401 })
402 
403 NONIUS_BENCHMARK("inline 1'000 flow_single ignore cancellation", [](nonius::chronometer meter){
404  std::atomic<int> counter{0};
405  auto ie = inline_executor_flow_single_ignore{};
406  using IE = decltype(ie);
407  countdownflowsingle flowsingle{counter};
408  meter.measure([&]{
409  counter.store(1'000);
410  ie | op::submit(mi::make_flow_receiver(flowsingle));
411  while(counter.load() > 0);
412  return counter.load();
413  });
414 })
415 
416 NONIUS_BENCHMARK("inline 1'000 flow_many", [](nonius::chronometer meter){
417  std::atomic<int> counter{0};
418  auto ie = inline_executor_flow_many{};
419  using IE = decltype(ie);
420  countdownflowmany flowmany{counter};
421  meter.measure([&]{
422  counter.store(1'000);
424  while(counter.load() > 0);
425  return counter.load();
426  });
427 })
428 
429 NONIUS_BENCHMARK("inline 1 flow_many with 1'000 values pull 1", [](nonius::chronometer meter){
430  std::atomic<int> counter{0};
431  auto ie = inline_executor_flow_many{counter};
432  using IE = decltype(ie);
433  meter.measure([&]{
434  counter.store(1'000);
435  ie | op::for_each(mi::make_receiver());
436  while(counter.load() > 0);
437  return counter.load();
438  });
439 })
440 
441 NONIUS_BENCHMARK("inline 1 flow_many with 1'000 values pull 1'000", [](nonius::chronometer meter){
442  std::atomic<int> counter{0};
443  auto ie = inline_executor_flow_many{counter};
444  using IE = decltype(ie);
445  meter.measure([&]{
446  counter.store(1'000);
448  mi::set_value(up, 1'000);
449  }));
450  while(counter.load() > 0);
451  return counter.load();
452  });
453 })
454 
455 NONIUS_BENCHMARK("inline 1'000 flow_many ignore cancellation", [](nonius::chronometer meter){
456  std::atomic<int> counter{0};
457  auto ie = inline_executor_flow_many_ignore{};
458  using IE = decltype(ie);
459  countdownflowmany flowmany{counter};
460  meter.measure([&]{
461  counter.store(1'000);
462  ie | op::submit(mi::make_flow_receiver(flowmany));
463  while(counter.load() > 0);
464  return counter.load();
465  });
466 })
467 
468 NONIUS_BENCHMARK("trampoline 1'000 single get (blocking_submit)", [](nonius::chronometer meter){
469  int counter{0};
470  auto tr = mi::trampoline();
471  using TR = decltype(tr);
472  meter.measure([&]{
473  counter = 1'000;
474  while (--counter >=0) {
475  auto fortyTwo = tr | op::transform([](auto){return 42;}) | op::get<int>;
476  }
477  return counter;
478  });
479 })
480 
481 NONIUS_BENCHMARK("trampoline static derecursion 1'000", [](nonius::chronometer meter){
482  std::atomic<int> counter{0};
483  auto tr = mi::trampoline();
484  using TR = decltype(tr);
485  countdownsingle single{counter};
486  meter.measure([&]{
487  counter.store(1'000);
488  tr | op::submit(single);
489  while(counter.load() > 0);
490  return counter.load();
491  });
492 })
493 
494 NONIUS_BENCHMARK("trampoline virtual derecursion 1'000", [](nonius::chronometer meter){
495  std::atomic<int> counter{0};
496  auto tr = mi::trampoline();
497  using TR = decltype(tr);
498  auto single = countdownsingle{counter};
499  std::function<void(mi::any_executor_ref<>)> recurse{
500  [&](auto exec) { ::folly::pushmi::set_value(single, exec); }};
501  meter.measure([&]{
502  counter.store(1'000);
503  tr | op::submit([&](auto exec) { recurse(exec); });
504  while(counter.load() > 0);
505  return counter.load();
506  });
507 })
508 
509 NONIUS_BENCHMARK("trampoline flow_many_sender 1'000", [](nonius::chronometer meter){
510  std::atomic<int> counter{0};
511  auto tr = mi::trampoline();
512  using TR = decltype(tr);
513  std::vector<int> values(1'000);
514  std::iota(values.begin(), values.end(), 1);
515  auto f = op::flow_from(values, tr) | op::tap([&](int){
516  --counter;
517  });
518  meter.measure([&]{
519  counter.store(1'000);
520  f | op::for_each(mi::make_receiver());
521  while(counter.load() > 0);
522  return counter.load();
523  });
524 })
525 
526 NONIUS_BENCHMARK("pool{1} submit 1'000", [](nonius::chronometer meter){
527  mi::pool pl{std::max(1u,std::thread::hardware_concurrency())};
528  auto pe = pl.executor();
529  using PE = decltype(pe);
530  std::atomic<int> counter{0};
531  countdownsingle single{counter};
532  meter.measure([&]{
533  counter.store(1'000);
534  pe | op::submit(single);
535  while(counter.load() > 0);
536  return counter.load();
537  });
538 })
539 
540 NONIUS_BENCHMARK("pool{hardware_concurrency} submit 1'000", [](nonius::chronometer meter){
541  mi::pool pl{std::min(1u,std::thread::hardware_concurrency())};
542  auto pe = pl.executor();
543  using PE = decltype(pe);
544  std::atomic<int> counter{0};
545  countdownsingle single{counter};
546  meter.measure([&]{
547  counter.store(1'000);
548  pe | op::submit(single);
549  while(counter.load() > 0);
550  return counter.load();
551  });
552 })
553 
554 NONIUS_BENCHMARK("new thread submit 1'000", [](nonius::chronometer meter){
555  auto nt = mi::new_thread();
556  using NT = decltype(nt);
557  std::atomic<int> counter{0};
558  countdownsingle single{counter};
559  meter.measure([&]{
560  counter.store(1'000);
561  nt | op::submit(single);
562  while(counter.load() > 0);
563  return counter.load();
564  });
565 })
566 
567 NONIUS_BENCHMARK("new thread blocking_submit 1'000", [](nonius::chronometer meter){
568  auto nt = mi::new_thread();
569  using NT = decltype(nt);
570  std::atomic<int> counter{0};
571  countdownsingle single{counter};
572  meter.measure([&]{
573  counter.store(1'000);
574  nt | op::blocking_submit(single);
575  return counter.load();
576  });
577 })
578 
579 NONIUS_BENCHMARK("new thread + time submit 1'000", [](nonius::chronometer meter){
580  auto nt = mi::new_thread();
581  using NT = decltype(nt);
582  auto time = mi::time_source<>{};
583  auto tnt = time.make(mi::systemNowF{}, [nt](){ return nt; })();
584  using TNT = decltype(tnt);
585  std::atomic<int> counter{0};
586  countdownsingle single{counter};
587  meter.measure([&]{
588  counter.store(1'000);
589  tnt | op::submit(single);
590  while(counter.load() > 0);
591  return counter.load();
592  });
593  time.join();
594 })
locked_entangled_pair< T, Dual > lock_both(entangled< T, Dual > &e)
Definition: entangle.h:257
auto f
decltype(ie) IE
std::atomic< int > * counter
detail::delegator< E > trampoline()
Definition: trampoline.h:261
PUSHMI_INLINE_VAR constexpr detail::tap_fn tap
Definition: tap.h:126
std::chrono::system_clock::time_point top()
std::enable_if_t< PropertySet< __properties_t< property_set_traits< T >>>, __properties_t< property_set_traits< T >>> properties_t
Definition: properties.h:105
decltype(nt) NT
constexpr detail::Map< Move > move
Definition: Base-inl.h:2567
std::chrono::steady_clock::time_point now()
mi::properties_t< decltype(R{}())> properties
requires E e noexcept(noexcept(s.error(std::move(e))))
decltype(tr) TR
PUSHMI_INLINE_VAR constexpr detail::transform_fn transform
Definition: transform.h:158
#define nullptr
Definition: http_parser.c:41
countdown(std::atomic< int > &c)
#define MAKE(x)
Definition: pool.h:29
auto shared_entangle(First f, Second s) -> shared_entangled_pair< First, Second >
Definition: entangle.h:279
nonius::chronometer meter
countdownflowsingle flowsingle
PUSHMI_INLINE_VAR constexpr detail::blocking_submit_fn blocking_submit
Definition: submit.h:389
static void stop()
auto entangle(First f, Second s) -> entangled_pair< First, Second >
Definition: entangle.h:219
new_thread_executor new_thread()
Definition: new_thread.h:50
auto tr
auto ie
constexpr auto data(C &c) -> decltype(c.data())
Definition: Access.h:71
countdownflowmany flowmany
void submit(Out out)
PUSHMI_INLINE_VAR constexpr struct folly::pushmi::operators::flow_from_fn flow_from
PUSHMI_INLINE_VAR constexpr struct folly::pushmi::make_flow_receiver_fn make_flow_receiver
PUSHMI_INLINE_VAR constexpr __adl::set_starting_fn set_starting
requires mi::True &&mi::Invocable< decltype(mi::set_value), Up > void starting(Up up) volatile
countdownsingle single
requires requires(detail::apply_impl(std::declval< F >(), std::declval< Tuple >(), detail::tupidxs< Tuple >{}))) const expr decltype(auto) apply(F &&f
PUSHMI_TEMPLATE(class E=std::exception_ptr, class Wrapped)(requires Sender< detail
Definition: executor.h:102
auto pe
std::atomic< int > counter
PUSHMI_INLINE_VAR constexpr __adl::set_value_fn set_value
static set< string > s
void submit(std::chrono::system_clock::time_point at, Out out)
uint64_t value(const typename LockFreeRingBuffer< T, Atom >::Cursor &rbcursor)
void error(E e)
PUSHMI_INLINE_VAR constexpr __adl::do_submit_fn submit
requires mi::Invocable< decltype(mi::set_value), Up, std::ptrdiff_t > void starting(Up up)
void value(ExecutorRef exec)
PUSHMI_INLINE_VAR constexpr struct folly::pushmi::make_receiver_fn make_receiver
char c
std::atomic< int > * counter
std::chrono::nanoseconds time()
static const char tokens[256]
Definition: http_parser.c:184
std::vector< int > values(1'000)
NONIUS_BENCHMARK("ready 1'000 single get (submit)", [](nonius::chronometer meter){int counter{0};meter.measure([&]{counter=1'000;while(--counter >=0){auto fortyTwo=op::just(42)|op::get< int >;}return counter;});}) NONIUS_BENCHMARK("ready 1'000 single get (blocking_submit)"
inline_executor_flow_many(std::atomic< int > &c)
PUSHMI_INLINE_VAR constexpr __adl::set_done_fn set_done