proxygen
reduce_2.cpp
Go to the documentation of this file.
1 /*
2  * Copyright 2018-present Facebook, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 #include <algorithm>
17 #include <cassert>
18 #include <exception>
19 #include <iostream>
20 #include <numeric>
21 #include <vector>
22 
25 
26 using namespace folly::pushmi::aliases;
27 
28 template <class Executor, class Allocator = std::allocator<char>>
29 auto naive_executor_bulk_target(Executor e, Allocator a = Allocator{}) {
30  return [e, a](
31  auto init,
32  auto selector,
33  auto input,
34  auto&& func,
35  auto sb,
36  auto se,
37  auto out) {
38  using RS = decltype(selector);
39  using F = std::conditional_t<
41  decltype(func),
43  using Out = decltype(out);
44  try {
45  typename std::allocator_traits<Allocator>::template rebind_alloc<char>
46  allocState(a);
47  auto shared_state = std::allocate_shared<std::tuple<
48  std::exception_ptr, // first exception
49  Out, // destination
50  RS, // selector
51  F, // func
52  std::atomic<decltype(init(input))>, // accumulation
53  std::atomic<std::size_t>, // pending
54  std::atomic<std::size_t> // exception count (protects assignment to
55  // first exception)
56  >>(
57  allocState,
58  std::exception_ptr{},
59  std::move(out),
60  std::move(selector),
61  (decltype(func)&&)func,
62  init(std::move(input)),
63  1,
64  0);
65  e | op::submit([e, sb, se, shared_state](auto) {
66  auto stepDone = [](auto shared_state) {
67  // pending
68  if (--std::get<5>(*shared_state) == 0) {
69  // first exception
70  if (std::get<0>(*shared_state)) {
72  std::get<1>(*shared_state), std::get<0>(*shared_state));
73  return;
74  }
75  try {
76  // selector(accumulation)
77  auto result = std::get<2>(*shared_state)(
78  std::move(std::get<4>(*shared_state).load()));
79  mi::set_value(std::get<1>(*shared_state), std::move(result));
80  } catch (...) {
82  std::get<1>(*shared_state), std::current_exception());
83  }
84  }
85  };
86  for (decltype(sb) idx{sb}; idx != se;
87  ++idx, ++std::get<5>(*shared_state)) {
88  e | op::submit([shared_state, idx, stepDone](auto ex) {
89  try {
90  // this indicates to me that bulk is not the right abstraction
91  auto old = std::get<4>(*shared_state).load();
92  auto step = old;
93  do {
94  step = old;
95  // func(accumulation, idx)
96  std::get<3> (*shared_state)(step, idx);
97  } while (!std::get<4>(*shared_state)
98  .compare_exchange_strong(old, step));
99  } catch (...) {
100  // exception count
101  if (std::get<6>(*shared_state)++ == 0) {
102  // store first exception
103  std::get<0>(*shared_state) = std::current_exception();
104  } // else eat the exception
105  }
106  stepDone(shared_state);
107  });
108  }
109  stepDone(shared_state);
110  });
111  } catch (...) {
112  e |
113  op::submit([out = std::move(out), ep = std::current_exception()](
114  auto) mutable { mi::set_error(out, ep); });
115  }
116  };
117 }
118 
119 int main() {
120  mi::pool p{std::max(1u, std::thread::hardware_concurrency())};
121 
122  std::vector<int> vec(10);
123  std::fill(vec.begin(), vec.end(), 4);
124 
125  auto fortyTwo = mi::reduce(
126  naive_executor_bulk_target(p.executor()),
127  vec.begin(),
128  vec.end(),
129  2,
130  std::plus<>{});
131 
132  assert(std::accumulate(vec.begin(), vec.end(), 2) == fortyTwo);
133 
134  std::cout << "OK" << std::endl;
135 
136  p.wait();
137 }
int main()
Definition: reduce_2.cpp:119
void accumulate(std::vector< std::size_t > &a, std::vector< std::size_t > const &d)
Definition: F14TestUtil.h:58
LogLevel max
Definition: LogLevel.cpp:31
PskType type
constexpr detail::Map< Move > move
Definition: Base-inl.h:2567
auto naive_executor_bulk_target(Executor e, Allocator a=Allocator{})
Definition: reduce_2.cpp:29
PUSHMI_INLINE_VAR constexpr __adl::set_error_fn set_error
void init(int *argc, char ***argv, bool removeFlags)
Definition: Init.cpp:34
char a
static const char *const value
Definition: Conv.cpp:50
PUSHMI_INLINE_VAR constexpr __adl::set_value_fn set_value
vector< string > vec
Definition: StringTest.cpp:35
PUSHMI_INLINE_VAR constexpr struct folly::pushmi::reduce_fn reduce