proxygen
reduce_2.cpp File Reference
#include <algorithm>
#include <cassert>
#include <exception>
#include <iostream>
#include <numeric>
#include <vector>
#include <folly/experimental/pushmi/examples/pool.h>
#include <folly/experimental/pushmi/examples/reduce.h>

Go to the source code of this file.

Functions

template<class Executor , class Allocator = std::allocator<char>>
auto naive_executor_bulk_target (Executor e, Allocator a=Allocator{})
 
int main ()
 

Function Documentation

int main ( void  )

Definition at line 119 of file reduce_2.cpp.

References folly::f14::accumulate(), max, naive_executor_bulk_target(), and folly::pushmi::reduce.

119  {
120  mi::pool p{std::max(1u, std::thread::hardware_concurrency())};
121 
122  std::vector<int> vec(10);
123  std::fill(vec.begin(), vec.end(), 4);
124 
125  auto fortyTwo = mi::reduce(
126  naive_executor_bulk_target(p.executor()),
127  vec.begin(),
128  vec.end(),
129  2,
130  std::plus<>{});
131 
132  assert(std::accumulate(vec.begin(), vec.end(), 2) == fortyTwo);
133 
134  std::cout << "OK" << std::endl;
135 
136  p.wait();
137 }
void accumulate(std::vector< std::size_t > &a, std::vector< std::size_t > const &d)
Definition: F14TestUtil.h:58
LogLevel max
Definition: LogLevel.cpp:31
auto naive_executor_bulk_target(Executor e, Allocator a=Allocator{})
Definition: reduce_2.cpp:29
Definition: Traits.h:588
vector< string > vec
Definition: StringTest.cpp:35
PUSHMI_INLINE_VAR constexpr struct folly::pushmi::reduce_fn reduce
template<class Executor , class Allocator = std::allocator<char>>
auto naive_executor_bulk_target ( Executor  e,
Allocator  a = Allocator{} 
)

Definition at line 29 of file reduce_2.cpp.

References a, folly::init(), folly::gen::move, folly::pushmi::set_error, folly::pushmi::set_value, submit, type, and value.

Referenced by main().

29  auto naive_executor_bulk_target(Executor e, Allocator a = Allocator{}) {
30  return [e, a](
31  auto init,
32  auto selector,
33  auto input,
34  auto&& func,
35  auto sb,
36  auto se,
37  auto out) {
38  using RS = decltype(selector);
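39  using F = std::conditional_t<
40  std::is_lvalue_reference<decltype(func)>::value,
41  decltype(func),
42  typename std::remove_reference<decltype(func)>::type>;
43  using Out = decltype(out);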
44  try {
45  typename std::allocator_traits<Allocator>::template rebind_alloc<char>
46  allocState(a);
47  auto shared_state = std::allocate_shared<std::tuple<
48  std::exception_ptr, // first exception
49  Out, // destination
50  RS, // selector
51  F, // func
52  std::atomic<decltype(init(input))>, // accumulation
53  std::atomic<std::size_t>, // pending
54  std::atomic<std::size_t> // exception count (protects assignment to
55  // first exception)
56  >>(
57  allocState,
58  std::exception_ptr{},
59  std::move(out),
60  std::move(selector),
61  (decltype(func)&&)func,
62  init(std::move(input)),
63  1,
64  0);
65  e | op::submit([e, sb, se, shared_state](auto) {
66  auto stepDone = [](auto shared_state) {
67  // pending
68  if (--std::get<5>(*shared_state) == 0) {
69  // first exception
70  if (std::get<0>(*shared_state)) {
71  mi::set_error(
72  std::get<1>(*shared_state), std::get<0>(*shared_state));
73  return;
74  }
75  try {
76  // selector(accumulation)
77  auto result = std::get<2>(*shared_state)(
78  std::move(std::get<4>(*shared_state).load()));
79  mi::set_value(std::get<1>(*shared_state), std::move(result));
80  } catch (...) {
81  mi::set_error(
82  std::get<1>(*shared_state), std::current_exception());
83  }
84  }
85  };
86  for (decltype(sb) idx{sb}; idx != se;
87  ++idx, ++std::get<5>(*shared_state)) {
88  e | op::submit([shared_state, idx, stepDone](auto ex) {
89  try {
90  // this indicates to me that bulk is not the right abstraction
91  auto old = std::get<4>(*shared_state).load();
92  auto step = old;
93  do {
94  step = old;
95  // func(accumulation, idx)
96  std::get<3> (*shared_state)(step, idx);
97  } while (!std::get<4>(*shared_state)
98  .compare_exchange_strong(old, step));
99  } catch (...) {
100  // exception count
101  if (std::get<6>(*shared_state)++ == 0) {
102  // store first exception
103  std::get<0>(*shared_state) = std::current_exception();
104  } // else eat the exception
105  }
106  stepDone(shared_state);
107  });
108  }
109  stepDone(shared_state);
110  });
111  } catch (...) {
112  e |
113  op::submit([out = std::move(out), ep = std::current_exception()](
114  auto) mutable { mi::set_error(out, ep); });
115  }
116  };
117 }
PskType type
constexpr detail::Map< Move > move
Definition: Base-inl.h:2567
PUSHMI_INLINE_VAR constexpr __adl::set_error_fn set_error
void init()
char a
static const char *const value
Definition: Conv.cpp:50
PUSHMI_INLINE_VAR constexpr __adl::set_value_fn set_value