proxygen
for_each_2.cpp
Go to the documentation of this file.
1 /*
2  * Copyright 2018-present Facebook, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 #include <algorithm>
17 #include <cassert>
18 #include <iostream>
19 #include <vector>
20 
23 
24 using namespace folly::pushmi::aliases;
25 
26 template <class Executor, class Allocator = std::allocator<char>>
27 auto naive_executor_bulk_target(Executor e, Allocator a = Allocator{}) {
28  return [e, a](
29  auto init,
30  auto selector,
31  auto input,
32  auto&& func,
33  auto sb,
34  auto se,
35  auto out) {
36  using RS = decltype(selector);
37  using F = std::conditional_t<
39  decltype(func),
41  using Out = decltype(out);
42  try {
43  typename std::allocator_traits<Allocator>::template rebind_alloc<char>
44  allocState(a);
45  auto shared_state = std::allocate_shared<std::tuple<
46  std::exception_ptr, // first exception
47  Out, // destination
48  RS, // selector
49  F, // func
50  std::atomic<decltype(init(input))>, // accumulation
51  std::atomic<std::size_t>, // pending
52  std::atomic<std::size_t> // exception count (protects assignment to
53  // first exception)
54  >>(
55  allocState,
56  std::exception_ptr{},
57  std::move(out),
58  std::move(selector),
59  (decltype(func)&&)func,
60  init(std::move(input)),
61  1,
62  0);
63  e | op::submit([e, sb, se, shared_state](auto) {
64  auto stepDone = [](auto shared_state) {
65  // pending
66  if (--std::get<5>(*shared_state) == 0) {
67  // first exception
68  if (std::get<0>(*shared_state)) {
70  std::get<1>(*shared_state), std::get<0>(*shared_state));
71  return;
72  }
73  try {
74  // selector(accumulation)
75  auto result = std::get<2>(*shared_state)(
76  std::move(std::get<4>(*shared_state).load()));
77  mi::set_value(std::get<1>(*shared_state), std::move(result));
78  } catch (...) {
80  std::get<1>(*shared_state), std::current_exception());
81  }
82  }
83  };
84  for (decltype(sb) idx{sb}; idx != se;
85  ++idx, ++std::get<5>(*shared_state)) {
86  e | op::submit([shared_state, idx, stepDone](auto ex) {
87  try {
88  // this indicates to me that bulk is not the right abstraction
89  auto old = std::get<4>(*shared_state).load();
90  auto step = old;
91  do {
92  step = old;
93  // func(accumulation, idx)
94  std::get<3> (*shared_state)(step, idx);
95  } while (!std::get<4>(*shared_state)
96  .compare_exchange_strong(old, step));
97  } catch (...) {
98  // exception count
99  if (std::get<6>(*shared_state)++ == 0) {
100  // store first exception
101  std::get<0>(*shared_state) = std::current_exception();
102  } // else eat the exception
103  }
104  stepDone(shared_state);
105  });
106  }
107  stepDone(shared_state);
108  });
109  } catch (...) {
110  e |
111  op::submit([out = std::move(out), ep = std::current_exception()](
112  auto) mutable { mi::set_error(out, ep); });
113  }
114  };
115 }
116 
117 int main() {
118  mi::pool p{std::max(1u, std::thread::hardware_concurrency())};
119 
120  std::vector<int> vec(10);
121 
122  mi::for_each(
123  naive_executor_bulk_target(p.executor()),
124  vec.begin(),
125  vec.end(),
126  [](int& x) { x = 42; });
127 
128  assert(
129  std::count(vec.begin(), vec.end(), 42) == static_cast<int>(vec.size()));
130 
131  std::cout << "OK" << std::endl;
132 
133  p.stop();
134  p.wait();
135 }
Definition: InvokeTest.cpp:58
LogLevel max
Definition: LogLevel.cpp:31
PskType type
constexpr detail::Map< Move > move
Definition: Base-inl.h:2567
const int x
PUSHMI_INLINE_VAR constexpr __adl::set_error_fn set_error
auto naive_executor_bulk_target(Executor e, Allocator a=Allocator{})
Definition: for_each_2.cpp:27
void init(int *argc, char ***argv, bool removeFlags)
Definition: Init.cpp:34
char a
static const char *const value
Definition: Conv.cpp:50
int * count
int main()
Definition: for_each_2.cpp:117
PUSHMI_INLINE_VAR constexpr __adl::set_value_fn set_value
vector< string > vec
Definition: StringTest.cpp:35
void for_each(T const &range, Function< void(typename T::value_type const &) const > const &func)