17 #ifndef FOLLY_GEN_PARALLEL_H_ 18 #error This file may only be included from folly/gen/Parallel.h 72 return producers_.load(std::memory_order_acquire);
76 return consumers_.load(std::memory_order_acquire);
79 template <
typename...
Args>
81 if (queue_.write(std::forward<Args>(args)...)) {
89 template <
typename...
Args>
92 while (!queue_.writeIfNotFull(std::forward<Args>(args)...)) {
100 wakeProducer_.
wait(key);
108 if (queue_.read(out)) {
117 while (!queue_.readIfNotEmpty(out)) {
124 wakeConsumer_.
wait(key);
132 template <
class Sink>
137 explicit Sub(Sink sink) : sink_(sink) {}
143 decltype(std::declval<Sink>().compose(std::declval<Source>())),
146 return Just(source | sink_);
164 class Output =
typename Composed::ValueType,
188 template <
class Handler>
199 template <
class Body>
200 void foreach(Body&& body)
const {
208 template <
bool all = false>
215 template <
class Value,
class InnerSource>
229 template <
bool all = false>
239 puller_ | *ops_ | pusher_;
244 : inQueue_(threads * 4),
245 outQueue_(threads * 4),
254 workers_.emplace_back([
this] {
271 while (!workers_.empty()) {
272 workers_.back().join();
311 : size_t(
std::
max<long>(1, sysconf(_SC_NPROCESSORS_CONF)))) {}
313 template <
class Handler>
317 source_.apply([&](Input input) {
349 template <
class Body>
350 void foreach(Body&& body)
const {
352 source_.foreach([&](Input input) {
372 template <
class Value,
class Source>
377 template <
class Value,
class Source>
396 template <
class Iterator>
398 :
public GenImpl<RangeSource<Iterator>&&, ChunkedRangeSource<Iterator>> {
405 : chunkSize_(chunkSize), range_(
std::
move(range)) {}
407 template <
class Handler>
410 while (!remaining.empty()) {
411 auto chunk = remaining.subpiece(0, chunkSize_);
412 remaining.advance(chunk.size());
void notifyAll() noexcept
folly::EventCount wakeConsumer_
bool readUnlessClosed(OutputDecayed &output)
constexpr detail::Map< Move > move
bool apply(Handler &&handler) const
internal::ArgsMatcher< InnerMatcher > Args(const InnerMatcher &matcher)
ChunkedRangeSource(int chunkSize, Range< Iterator > range)
—— Concurrent Priority Queue Implementation ——
Generator< Value, Source > compose(const GenImpl< Value, Source > &source) const
bool apply(Handler &&handler) const
requires E e noexcept(noexcept(s.error(std::move(e))))
PUSHMI_INLINE_VAR constexpr __adl::get_executor_fn executor
void closeInputProducer()
std::vector< std::thread::id > threads
void handler(int, siginfo_t *, void *)
Gen range(Value begin, Value end)
Generator< Value, Source > compose(GenImpl< Value, Source > &&source) const
bool readUnlessEmpty(OutputDecayed &output)
void closeOutputConsumer()
bool apply(Handler &&handler) const
void closeOutputConsumer()
void wait(Key key) noexcept
bool Value(const T &value, M matcher)
void compose(const GenImpl< Value, InnerSource > &source) const
Parallel(Ops ops, size_t threads)
std::atomic< size_t > producers_
bool writeUnlessClosed(Input &&input)
std::atomic< size_t > consumers_
std::vector< std::thread > workers_
void cancelWait() noexcept
folly::EventCount wakeProducer_
Executor(size_t threads, const Ops *ops)
constexpr detail::Min< Identity, Greater > max
Generator(Source source, Ops ops, size_t threads)
const Self & self() const
uint64_t value(const typename LockFreeRingBuffer< T, Atom >::Cursor &rbcursor)
ClosableMPMCQueue(size_t capacity)
Key prepareWait() noexcept
void closeInputProducer()
bool readUnlessEmpty(T &out)
Just compose(const GenImpl< Value, Source > &source) const
bool writeUnlessClosed(Args &&...args)
bool writeUnlessFull(Input &&input)
bool readUnlessClosed(T &out)
bool writeUnlessFull(Args &&...args) noexcept
Composed all(Predicate pred=Predicate())