35 #include <folly/experimental/pushmi/pool.h> 47 template <
class ExecutorRef>
48 void value(ExecutorRef exec);
63 Invocable<decltype(mi::set_value), Up>)
void starting(Up up)
volatile {
69 template <
class ExecutorRef>
71 if (--*counter >= 0) {
89 std::chrono::system_clock::time_point
top() {
96 void submit(std::chrono::system_clock::time_point at, Out out) {
97 std::this_thread::sleep_until(at);
117 template <
class CancellationFactory>
119 CancellationFactory
cf;
134 using Stopper = decltype(
tokens.second);
136 explicit Data(Stopper stopper) : stopper(
std::move(stopper)) {}
144 (*(both.first))(both.second);
148 (*(both.first))(both.second);
155 if (!!both.first && !*(both.first)) {
168 auto set_stop = [](
auto&
stop) {
183 auto set_stop = [](
auto&
stop) {
235 std::atomic<bool>
stop;
237 auto p = std::make_shared<producer>(
std::move(out),
false);
240 explicit Data(std::shared_ptr<producer> p) : p(
std::move(p)) {}
241 std::shared_ptr<producer> p;
246 [counter = this->
counter](
auto&
data,
auto requested) {
251 while (!data.p->stop && --requested >= 0 &&
252 (!counter || --*counter >= 0)) {
258 if (!counter || *counter == 0) {
263 data.p->stop.store(
true);
267 data.p->stop.store(
true);
313 #define concept Concept 314 #include <nonius/nonius.h++> 320 while (--counter >=0) { 321 auto fortyTwo = op::just(42) | op::get<int>; 331 while (--counter >=0) { 332 auto fortyTwo = mi::make_single_sender([](auto out){ mi::set_value(out, 42); mi::set_done(out);}) | op::get<int>; 338 NONIUS_BENCHMARK("inline 1
'000 single", [](nonius::chronometer meter){ 339 std::atomic<int> counter{0}; 340 auto ie = inline_executor{}; 341 using IE = decltype(ie); 342 countdownsingle single{counter}; 344 counter.store(1'000);
346 while(counter.load() > 0);
347 return counter.load();
352 std::atomic<int> counter{0};
357 counter.store(1
'000); 358 ie | op::submit(mi::make_receiver(single)); 359 while(counter.load() > 0); 360 return counter.load(); 364 NONIUS_BENCHMARK("inline 1'000 many
", [](nonius::chronometer meter){ 365 std::atomic<int> counter{0}; 366 auto ie = inline_executor_many{}; 367 using IE = decltype(ie); 368 countdownmany many{counter}; 370 counter.store(1'000); 371 ie | op::submit(mi::make_receiver(many)); 372 while(counter.load() > 0); 373 return counter.load(); 377 NONIUS_BENCHMARK("inline 1
'000 flow_single shared", [](nonius::chronometer meter){ 378 std::atomic<int> counter{0}; 379 auto ie = inline_executor_flow_single_shared{}; 380 using IE = decltype(ie); 381 countdownflowsingle flowsingle{counter}; 383 counter.store(1'000);
385 while(counter.load() > 0);
386 return counter.load();
390 NONIUS_BENCHMARK(
"inline 1'000 flow_single entangle", [](nonius::chronometer meter){
391 std::atomic<int> counter{0};
393 using IE = decltype(
ie);
396 counter.store(1
'000); 397 ie | op::submit(mi::make_flow_receiver(flowsingle)); 398 while(counter.load() > 0); 399 return counter.load(); 403 NONIUS_BENCHMARK("inline 1'000 flow_single ignore cancellation
", [](nonius::chronometer meter){ 404 std::atomic<int> counter{0}; 405 auto ie = inline_executor_flow_single_ignore{}; 406 using IE = decltype(ie); 407 countdownflowsingle flowsingle{counter}; 409 counter.store(1'000); 410 ie | op::submit(mi::make_flow_receiver(flowsingle)); 411 while(counter.load() > 0); 412 return counter.load(); 416 NONIUS_BENCHMARK("inline 1
'000 flow_many", [](nonius::chronometer meter){ 417 std::atomic<int> counter{0}; 418 auto ie = inline_executor_flow_many{}; 419 using IE = decltype(ie); 420 countdownflowmany flowmany{counter}; 422 counter.store(1'000);
424 while(counter.load() > 0);
425 return counter.load();
429 NONIUS_BENCHMARK(
"inline 1 flow_many with 1'000 values pull 1", [](nonius::chronometer meter){
430 std::atomic<int> counter{0};
432 using IE = decltype(
ie);
434 counter.store(1
'000); 435 ie | op::for_each(mi::make_receiver()); 436 while(counter.load() > 0); 437 return counter.load(); 441 NONIUS_BENCHMARK("inline 1 flow_many with 1'000
values pull 1
'000", [](nonius::chronometer meter){ 442 std::atomic<int> counter{0}; 443 auto ie = inline_executor_flow_many{counter}; 444 using IE = decltype(ie); 446 counter.store(1'000);
450 while(counter.load() > 0); 451 return counter.load(); 455 NONIUS_BENCHMARK("inline 1'000 flow_many ignore cancellation
", [](nonius::chronometer meter){ 456 std::atomic<int> counter{0}; 457 auto ie = inline_executor_flow_many_ignore{}; 458 using IE = decltype(ie); 459 countdownflowmany flowmany{counter}; 461 counter.store(1'000); 462 ie | op::submit(mi::make_flow_receiver(flowmany)); 463 while(counter.load() > 0); 464 return counter.load(); 468 NONIUS_BENCHMARK("trampoline 1
'000 single get (blocking_submit)", [](nonius::chronometer meter){ 470 auto tr = mi::trampoline(); 471 using TR = decltype(tr); 474 while (--counter >=0) {
475 auto fortyTwo =
tr |
op::transform([](
auto){
return 42;}) | op::get<int>;
481 NONIUS_BENCHMARK(
"trampoline static derecursion 1'000", [](nonius::chronometer meter){
482 std::atomic<int> counter{0};
487 counter.store(1
'000); 488 tr | op::submit(single); 489 while(counter.load() > 0); 490 return counter.load(); 494 NONIUS_BENCHMARK("trampoline virtual derecursion 1'000
", [](nonius::chronometer meter){ 495 std::atomic<int> counter{0}; 496 auto tr = mi::trampoline(); 497 using TR = decltype(tr); 498 auto single = countdownsingle{counter}; 499 std::function<void(mi::any_executor_ref<>)> recurse{ 500 [&](auto exec) { ::folly::pushmi::set_value(single, exec); }}; 502 counter.store(1'000); 503 tr | op::submit([&](auto exec) { recurse(exec); }); 504 while(counter.load() > 0); 505 return counter.load(); 509 NONIUS_BENCHMARK("trampoline flow_many_sender 1
'000", [](nonius::chronometer meter){ 510 std::atomic<int> counter{0}; 511 auto tr = mi::trampoline(); 512 using TR = decltype(tr); 513 std::vector<int> values(1'000);
519 counter.store(1
'000); 520 f | op::for_each(mi::make_receiver()); 521 while(counter.load() > 0); 522 return counter.load(); 526 NONIUS_BENCHMARK("pool{1} submit 1'000
", [](nonius::chronometer meter){ 527 mi::pool pl{std::max(1u,std::thread::hardware_concurrency())}; 528 auto pe = pl.executor(); 529 using PE = decltype(pe); 530 std::atomic<int> counter{0}; 531 countdownsingle single{counter}; 533 counter.store(1'000); 534 pe | op::submit(single); 535 while(counter.load() > 0); 536 return counter.load(); 540 NONIUS_BENCHMARK("pool{hardware_concurrency}
submit 1
'000", [](nonius::chronometer meter){ 541 mi::pool pl{std::min(1u,std::thread::hardware_concurrency())}; 542 auto pe = pl.executor(); 543 using PE = decltype(pe); 544 std::atomic<int> counter{0}; 545 countdownsingle single{counter}; 547 counter.store(1'000);
549 while(counter.load() > 0);
550 return counter.load();
556 using NT = decltype(nt);
557 std::atomic<int> counter{0};
560 counter.store(1
'000); 561 nt | op::submit(single); 562 while(counter.load() > 0); 563 return counter.load(); 567 NONIUS_BENCHMARK("new thread blocking_submit 1'000
", [](nonius::chronometer meter){ 568 auto nt = mi::new_thread(); 569 using NT = decltype(nt); 570 std::atomic<int> counter{0}; 571 countdownsingle single{counter}; 573 counter.store(1'000); 574 nt | op::blocking_submit(single); 575 return counter.load(); 579 NONIUS_BENCHMARK("new thread +
time submit 1
'000", [](nonius::chronometer meter){ 580 auto nt = mi::new_thread(); 581 using NT = decltype(nt); 582 auto time = mi::time_source<>{}; 583 auto tnt = time.make(mi::systemNowF{}, [nt](){ return nt; })(); 584 using TNT = decltype(tnt); 585 std::atomic<int> counter{0}; 586 countdownsingle single{counter}; 588 counter.store(1'000);
590 while(counter.load() > 0);
591 return counter.load();
locked_entangled_pair< T, Dual > lock_both(entangled< T, Dual > &e)
std::atomic< int > * counter
detail::delegator< E > trampoline()
PUSHMI_INLINE_VAR constexpr detail::tap_fn tap
std::chrono::system_clock::time_point top()
std::enable_if_t< PropertySet< __properties_t< property_set_traits< T >>>, __properties_t< property_set_traits< T >>> properties_t
constexpr detail::Map< Move > move
std::chrono::steady_clock::time_point now()
inline_executor_flow_many()
mi::properties_t< decltype(R{}())> properties
requires E e noexcept(noexcept(s.error(std::move(e))))
PUSHMI_INLINE_VAR constexpr detail::transform_fn transform
countdown(std::atomic< int > &c)
auto shared_entangle(First f, Second s) -> shared_entangled_pair< First, Second >
nonius::chronometer meter
countdownflowsingle flowsingle
PUSHMI_INLINE_VAR constexpr detail::blocking_submit_fn blocking_submit
auto entangle(First f, Second s) -> entangled_pair< First, Second >
new_thread_executor new_thread()
constexpr auto data(C &c) -> decltype(c.data())
countdownflowmany flowmany
PUSHMI_INLINE_VAR constexpr struct folly::pushmi::operators::flow_from_fn flow_from
PUSHMI_INLINE_VAR constexpr struct folly::pushmi::make_flow_receiver_fn make_flow_receiver
PUSHMI_INLINE_VAR constexpr __adl::set_starting_fn set_starting
requires mi::True &&mi::Invocable< decltype(mi::set_value), Up > void starting(Up up) volatile
requires requires(detail::apply_impl(std::declval< F >(), std::declval< Tuple >(), detail::tupidxs< Tuple >{}))) const expr decltype(auto) apply(F &&f
PUSHMI_TEMPLATE(class E=std::exception_ptr, class Wrapped)(requires Sender< detail
std::atomic< int > counter
PUSHMI_INLINE_VAR constexpr __adl::set_value_fn set_value
void submit(std::chrono::system_clock::time_point at, Out out)
uint64_t value(const typename LockFreeRingBuffer< T, Atom >::Cursor &rbcursor)
PUSHMI_INLINE_VAR constexpr __adl::do_submit_fn submit
requires mi::Invocable< decltype(mi::set_value), Up, std::ptrdiff_t > void starting(Up up)
void value(ExecutorRef exec)
PUSHMI_INLINE_VAR constexpr struct folly::pushmi::make_receiver_fn make_receiver
std::atomic< int > * counter
std::chrono::nanoseconds time()
static const char tokens[256]
std::vector< int > values(1'000)
NONIUS_BENCHMARK("ready 1'000 single get (submit)", [](nonius::chronometer meter){int counter{0};meter.measure([&]{counter=1'000;while(--counter >=0){auto fortyTwo=op::just(42)|op::get< int >;}return counter;});}) NONIUS_BENCHMARK("ready 1'000 single get (blocking_submit)"
inline_executor_flow_many(std::atomic< int > &c)
PUSHMI_INLINE_VAR constexpr __adl::set_done_fn set_done