17 #ifndef FOLLY_GEN_PARALLELMAP_H_ 18 #error This file may only be included from folly/gen/ParallelMap.h 24 #include <type_traits> 46 template <
class Predicate>
54 PMap(Predicate pred,
size_t nThreads)
55 : pred_(
std::
move(pred)), nThreads_(nThreads) {}
62 typename std::decay<invoke_result_t<Predicate, Value>>
::type>
64 :
public GenImpl<Output, Generator<Value, Source, Input, Output>> {
78 : pred_(pred), pipeline_(nThreads, nThreads) {
79 workers_.reserve(nThreads);
80 for (
size_t i = 0;
i < nThreads;
i++) {
81 workers_.push_back(std::thread([
this] { this->
predApplier(); }));
88 for (
auto& w : workers_) {
95 done_.store(
true, std::memory_order_release);
100 bool wrote = pipeline_.
write(std::forward<Value>(
value));
113 return pipeline_.
read(out);
132 if (pipeline_.template readStage<0>(ticket, in)) {
139 if (
done_.load(std::memory_order_acquire)) {
151 Generator(Source source,
const Predicate& pred,
size_t nThreads)
154 nThreads_(nThreads ? nThreads : sysconf(_SC_NPROCESSORS_ONLN)) {}
156 template <
class Body>
157 void foreach(Body&& body)
const {
163 if (pipeline.
write(std::forward<Value>(value))) {
172 while (pipeline.
read(out)) {
185 while (read < wrote) {
193 template <
class Handler>
201 if (pipeline.
write(std::forward<Value>(value))) {
210 while (pipeline.
read(out)) {
227 while (read < wrote) {
241 template <
class Source,
class Value,
class Gen = Generator<Value, Source>>
243 return Gen(
std::move(source.self()), pred_, nThreads_);
246 template <
class Source,
class Value,
class Gen = Generator<Value, Source>>
ssize_t sizeGuess() const noexcept
void blockingWrite(Args &&...args)
void notifyAll() noexcept
void blockingRead(Output &out)
constexpr detail::Map< Move > move
—— Concurrent Priority Queue Implementation ——
static constexpr bool infinite
void blockingRead(typename std::tuple_element< sizeof...(Stages), StageTuple >::type::value_type &elem)
bool apply(Handler &&handler) const
static constexpr StringPiece ticket
void blockingWrite(Value &&value)
void handler(int, siginfo_t *, void *)
std::vector< std::thread > workers_
void wait(Key key) noexcept
Gen compose(const GenImpl< Value, Source > &source) const
bool Value(const T &value, M matcher)
Gen compose(GenImpl< Value, Source > &&source) const
MPMCPipeline< Input, Output > pipeline_
ExecutionPipeline(const Predicate &pred, size_t nThreads)
bool write(Value &&value)
bool write(Args &&...args)
bool read(typename std::tuple_element< sizeof...(Stages), StageTuple >::type::value_type &elem)
void cancelWait() noexcept
std::atomic< bool > done_
PMap(Predicate pred, size_t nThreads)
Generator(Source source, const Predicate &pred, size_t nThreads)
const Self & self() const
uint64_t value(const typename LockFreeRingBuffer< T, Atom >::Cursor &rbcursor)
Key prepareWait() noexcept