Parallel-inl.h
/*
 * Copyright 2014-present Facebook, Inc.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

#ifndef FOLLY_GEN_PARALLEL_H_
#error This file may only be included from folly/gen/Parallel.h
#endif

#include <folly/MPMCQueue.h>
#include <folly/ScopeGuard.h>
#include <folly/experimental/EventCount.h>
#include <atomic>
#include <thread>
#include <vector>

namespace folly {
namespace gen {
namespace detail {

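// ClosableMPMCQueue: a bounded multi-producer/multi-consumer queue that also
// tracks how many producers and consumers are currently registered.
// writeUnlessClosed() blocks while the queue is full until space frees up or
// the last consumer unregisters; readUnlessClosed() blocks while the queue is
// empty until a value arrives or the last producer unregisters. The EventCount
// members are used to park and wake the blocked threads.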
template <typename T>
class ClosableMPMCQueue {
  MPMCQueue<T> queue_;
  std::atomic<size_t> producers_{0};
  std::atomic<size_t> consumers_{0};
  EventCount wakeProducer_;
  EventCount wakeConsumer_;

 public:
  explicit ClosableMPMCQueue(size_t capacity) : queue_(capacity) {}

  ~ClosableMPMCQueue() {
    CHECK(!producers());
    CHECK(!consumers());
  }

  void openProducer() {
    ++producers_;
  }
  void openConsumer() {
    ++consumers_;
  }

  void closeInputProducer() {
    size_t producers = producers_--;
    CHECK(producers);
    if (producers == 1) { // last producer
      wakeConsumer_.notifyAll();
    }
  }

  void closeOutputConsumer() {
    size_t consumers = consumers_--;
    CHECK(consumers);
    if (consumers == 1) { // last consumer
      wakeProducer_.notifyAll();
    }
  }

  size_t producers() const {
    return producers_.load(std::memory_order_acquire);
  }

  size_t consumers() const {
    return consumers_.load(std::memory_order_acquire);
  }

  template <typename... Args>
  bool writeUnlessFull(Args&&... args) noexcept {
    if (queue_.write(std::forward<Args>(args)...)) {
      // wake consumers to pick up new value
      wakeConsumer_.notify();
      return true;
    }
    return false;
  }

  template <typename... Args>
  bool writeUnlessClosed(Args&&... args) {
    // write if there's room
    while (!queue_.writeIfNotFull(std::forward<Args>(args)...)) {
      // if write fails, check if there are still consumers listening
      auto key = wakeProducer_.prepareWait();
      if (!consumers()) {
        // no consumers left; bail out
        wakeProducer_.cancelWait();
        return false;
      }
      wakeProducer_.wait(key);
    }
    // wake consumers to pick up new value
    wakeConsumer_.notify();
    return true;
  }

  bool readUnlessEmpty(T& out) {
    if (queue_.read(out)) {
      // wake producers to fill empty space
      wakeProducer_.notify();
      return true;
    }
    return false;
  }

  bool readUnlessClosed(T& out) {
    while (!queue_.readIfNotEmpty(out)) {
      // if read fails, check if there are still producers writing
      auto key = wakeConsumer_.prepareWait();
      if (!producers()) {
        // no producers left; cancel the pending wait and bail out
        wakeConsumer_.cancelWait();
        return false;
      }
      wakeConsumer_.wait(key);
    }
    // wake writers blocked by full queue
    wakeProducer_.notify();
    return true;
  }
};

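// Sub: an operator that wraps a terminal sink. Composing it over a source
// runs `source | sink_` eagerly and re-wraps the single result in a
// generator, so the sink's result can continue through a pipeline.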
template <class Sink>
class Sub : public Operator<Sub<Sink>> {
  Sink sink_;

 public:
  explicit Sub(Sink sink) : sink_(sink) {}

  template <
      class Value,
      class Source,
      class Result =
          decltype(std::declval<Sink>().compose(std::declval<Source>())),
      class Just = SingleCopy<typename std::decay<Result>::type>>
  Just compose(const GenImpl<Value, Source>& source) const {
    return Just(source | sink_);
  }
};

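// Parallel: an operator that runs the sub-pipeline `ops_` on `threads_`
// worker threads. Inputs are fanned out to the workers through a shared
// ClosableMPMCQueue and their outputs are merged back into the calling
// thread's pipeline.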
template <class Ops>
class Parallel : public Operator<Parallel<Ops>> {
  Ops ops_;
  size_t threads_;

 public:
  Parallel(Ops ops, size_t threads) : ops_(std::move(ops)), threads_(threads) {}

  template <
      class Input,
      class Source,
      class InputDecayed = typename std::decay<Input>::type,
      class Composed =
          decltype(std::declval<Ops>().compose(Empty<InputDecayed&&>())),
      class Output = typename Composed::ValueType,
      class OutputDecayed = typename std::decay<Output>::type>
  class Generator : public GenImpl<
                        OutputDecayed&&,
                        Generator<
                            Input,
                            Source,
                            InputDecayed,
                            Composed,
                            Output,
                            OutputDecayed>> {
    Source source_;
    Ops ops_;
    size_t threads_;

    typedef ClosableMPMCQueue<InputDecayed> InQueue;
    typedef ClosableMPMCQueue<OutputDecayed> OutQueue;

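    // Puller: the source end of each worker's pipeline. It yields inputs
    // read from the shared input queue until that queue is closed.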
    class Puller : public GenImpl<InputDecayed&&, Puller> {
      InQueue* queue_;

     public:
      explicit Puller(InQueue* queue) : queue_(queue) {}

      template <class Handler>
      bool apply(Handler&& handler) const {
        InputDecayed input;
        while (queue_->readUnlessClosed(input)) {
          if (!handler(std::move(input))) {
            return false;
          }
        }
        return true;
      }

      template <class Body>
      void foreach(Body&& body) const {
        InputDecayed input;
        while (queue_->readUnlessClosed(input)) {
          body(std::move(input));
        }
      }
    };

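    // Pusher: the sink end of each worker's pipeline. It writes produced
    // values into the output queue; with all == true every value is written
    // (the foreach path), otherwise production stops as soon as the output
    // queue reports that it has no consumers left.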
    template <bool all = false>
    class Pusher : public Operator<Pusher<all>> {
      OutQueue* queue_;

     public:
      explicit Pusher(OutQueue* queue) : queue_(queue) {}

      template <class Value, class InnerSource>
      void compose(const GenImpl<Value, InnerSource>& source) const {
        if (all) {
          source.self().foreach([&](Value value) {
            queue_->writeUnlessClosed(std::forward<Value>(value));
          });
        } else {
          source.self().apply([&](Value value) {
            return queue_->writeUnlessClosed(std::forward<Value>(value));
          });
        }
      }
    };

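    // Executor: owns the bounded input/output queues and the pool of worker
    // threads. Each worker runs the sub-pipeline as puller_ | *ops_ | pusher_,
    // pulling inputs from inQueue_ and pushing results into outQueue_.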
    template <bool all = false>
    class Executor {
      InQueue inQueue_;
      OutQueue outQueue_;
      Puller puller_;
      Pusher<all> pusher_;
      std::vector<std::thread> workers_;
      const Ops* ops_;

      void work() {
        puller_ | *ops_ | pusher_;
      }

     public:
      Executor(size_t threads, const Ops* ops)
          : inQueue_(threads * 4),
            outQueue_(threads * 4),
            puller_(&inQueue_),
            pusher_(&outQueue_),
            ops_(ops) {
        inQueue_.openProducer();
        outQueue_.openConsumer();
        for (size_t t = 0; t < threads; ++t) {
          inQueue_.openConsumer();
          outQueue_.openProducer();
          workers_.emplace_back([this] {
            SCOPE_EXIT {
              inQueue_.closeOutputConsumer();
              outQueue_.closeInputProducer();
            };
            this->work();
          });
        }
      }

      ~Executor() {
        if (inQueue_.producers()) {
          inQueue_.closeInputProducer();
        }
        if (outQueue_.consumers()) {
          outQueue_.closeOutputConsumer();
        }
        while (!workers_.empty()) {
          workers_.back().join();
          workers_.pop_back();
        }
        CHECK(!inQueue_.consumers());
        CHECK(!outQueue_.producers());
      }

      void closeInputProducer() {
        inQueue_.closeInputProducer();
      }

      void closeOutputConsumer() {
        outQueue_.closeOutputConsumer();
      }

      bool writeUnlessClosed(Input&& input) {
        return inQueue_.writeUnlessClosed(std::forward<Input>(input));
      }

      bool writeUnlessFull(Input&& input) {
        return inQueue_.writeUnlessFull(std::forward<Input>(input));
      }

      bool readUnlessClosed(OutputDecayed& output) {
        return outQueue_.readUnlessClosed(output);
      }

      bool readUnlessEmpty(OutputDecayed& output) {
        return outQueue_.readUnlessEmpty(output);
      }
    };

   public:
    Generator(Source source, Ops ops, size_t threads)
        : source_(std::move(source)),
          ops_(std::move(ops)),
          threads_(
              threads
                  ? threads
                  : size_t(std::max<long>(1, sysconf(_SC_NPROCESSORS_CONF)))) {}

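    // apply: the calling thread feeds inputs to the workers. When the input
    // queue is full it first drains any ready outputs to the handler, then
    // falls back to a blocking write, so the source thread cannot deadlock
    // against a full output queue.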
    template <class Handler>
    bool apply(Handler&& handler) const {
      Executor<false> executor(threads_, &ops_);
      bool more = true;
      source_.apply([&](Input input) {
        if (executor.writeUnlessFull(std::forward<Input>(input))) {
          return true;
        }
        OutputDecayed output;
        while (executor.readUnlessEmpty(output)) {
          if (!handler(std::move(output))) {
            more = false;
            return false;
          }
        }
        if (!executor.writeUnlessClosed(std::forward<Input>(input))) {
          return false;
        }
        return true;
      });
      executor.closeInputProducer();

      if (more) {
        OutputDecayed output;
        while (executor.readUnlessClosed(output)) {
          if (!handler(std::move(output))) {
            more = false;
            break;
          }
        }
      }
      executor.closeOutputConsumer();

      return more;
    }

    template <class Body>
    void foreach(Body&& body) const {
      Executor<true> executor(threads_, &ops_);
      source_.foreach([&](Input input) {
        if (executor.writeUnlessFull(std::forward<Input>(input))) {
          return;
        }
        OutputDecayed output;
        while (executor.readUnlessEmpty(output)) {
          body(std::move(output));
        }
        CHECK(executor.writeUnlessClosed(std::forward<Input>(input)));
      });
      executor.closeInputProducer();

      OutputDecayed output;
      while (executor.readUnlessClosed(output)) {
        body(std::move(output));
      }
      executor.closeOutputConsumer();
    }
  };

  template <class Value, class Source>
  Generator<Value, Source> compose(const GenImpl<Value, Source>& source) const {
    return Generator<Value, Source>(source.self(), ops_, threads_);
  }

  template <class Value, class Source>
  Generator<Value, Source> compose(GenImpl<Value, Source>&& source) const {
    return Generator<Value, Source>(std::move(source.self()), ops_, threads_);
  }
};

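// ChunkedRangeSource: slices a range into successive chunks of at most
// chunkSize_ elements and yields each chunk as a RangeSource, so that
// parallel workers can process whole chunks rather than individual values.
// Typically reached through the chunked() helper declared in Parallel.h.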
template <class Iterator>
class ChunkedRangeSource
    : public GenImpl<RangeSource<Iterator>&&, ChunkedRangeSource<Iterator>> {
  int chunkSize_;
  Range<Iterator> range_;

 public:
  ChunkedRangeSource() = default;
  ChunkedRangeSource(int chunkSize, Range<Iterator> range)
      : chunkSize_(chunkSize), range_(std::move(range)) {}

  template <class Handler>
  bool apply(Handler&& handler) const {
    auto remaining = range_;
    while (!remaining.empty()) {
      auto chunk = remaining.subpiece(0, chunkSize_);
      remaining.advance(chunk.size());
      auto gen = RangeSource<Iterator>(chunk);
      if (!handler(std::move(gen))) {
        return false;
      }
    }
    return true;
  }
};

} // namespace detail

} // namespace gen
} // namespace folly
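
For orientation, here is a minimal usage sketch, not part of the file above. It assumes the parallel() helper declared in folly/gen/Parallel.h and the from/map/sum combinators from folly/gen/Base.h; the input vector and the squaring lambda are illustrative only.

#include <numeric>
#include <vector>

#include <folly/gen/Base.h>
#include <folly/gen/Parallel.h>

int main() {
  using namespace folly::gen;

  std::vector<int> nums(1000);
  std::iota(nums.begin(), nums.end(), 1); // 1..1000

  // Each worker thread pulls values from the shared input queue, runs the
  // map() sub-pipeline, and pushes results into the output queue, which the
  // calling thread drains into sum. Outputs may arrive out of order, which
  // is fine for an order-insensitive sink like sum.
  long total = from(nums)
      | parallel(map([](int x) { return long(x) * x; }))
      | sum;

  return total > 0 ? 0 : 1;
}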