proxygen
folly::gen::detail::PMap< Predicate >::Generator< Value, Source, Input, Output >::ExecutionPipeline Class Reference

Public Member Functions

 ExecutionPipeline (const Predicate &pred, size_t nThreads)
 
 ~ExecutionPipeline ()
 
void stop ()
 
bool write (Value &&value)
 
void blockingWrite (Value &&value)
 
bool read (Output &out)
 
void blockingRead (Output &out)
 

Private Member Functions

void predApplier ()
 

Private Attributes

std::vector< std::thread > workers_
 
std::atomic< bool > done_ {false}
 
const Predicate & pred_
 
MPMCPipeline< Input, Output > pipeline_
 
EventCount wake_
 

Detailed Description

template<class Predicate>
template<class Value, class Source, class Input = typename std::decay<Value>::type, class Output = typename std::decay<invoke_result_t<Predicate, Value>>::type>
class folly::gen::detail::PMap< Predicate >::Generator< Value, Source, Input, Output >::ExecutionPipeline

Definition at line 69 of file ParallelMap-inl.h.

Constructor & Destructor Documentation

template<class Predicate >
template<class Value , class Source , class Input = typename std::decay<Value>::type, class Output = typename std::decay<invoke_result_t<Predicate, Value>>::type>
folly::gen::detail::PMap< Predicate >::Generator< Value, Source, Input, Output >::ExecutionPipeline::ExecutionPipeline ( const Predicate &  pred,
size_t  nThreads 
)
inline

Definition at line 77 of file ParallelMap-inl.h.

References i, and folly::gen::detail::PMap< Predicate >::Generator< Value, Source, Input, Output >::ExecutionPipeline::predApplier().

78  : pred_(pred), pipeline_(nThreads, nThreads) {
79  workers_.reserve(nThreads);
80  for (size_t i = 0; i < nThreads; i++) {
81  workers_.push_back(std::thread([this] { this->predApplier(); }));
82  }
83  }
template<class Predicate >
template<class Value , class Source , class Input = typename std::decay<Value>::type, class Output = typename std::decay<invoke_result_t<Predicate, Value>>::type>
folly::gen::detail::PMap< Predicate >::Generator< Value, Source, Input, Output >::ExecutionPipeline::~ExecutionPipeline ( )
inline

Definition at line 85 of file ParallelMap-inl.h.

References folly::gen::detail::PMap< Predicate >::Generator< Value, Source, Input, Output >::ExecutionPipeline::done_, and folly::MPMCPipeline< In, Stages >::sizeGuess().

85  {
86  assert(pipeline_.sizeGuess() == 0);
87  assert(done_.load());
88  for (auto& w : workers_) {
89  w.join();
90  }
91  }
ssize_t sizeGuess() const noexcept
Definition: MPMCPipeline.h:272

Member Function Documentation

template<class Predicate >
template<class Value , class Source , class Input = typename std::decay<Value>::type, class Output = typename std::decay<invoke_result_t<Predicate, Value>>::type>
void folly::gen::detail::PMap< Predicate >::Generator< Value, Source, Input, Output >::ExecutionPipeline::blockingRead ( Output &  out)
inline
template<class Predicate >
template<class Value , class Source , class Input = typename std::decay<Value>::type, class Output = typename std::decay<invoke_result_t<Predicate, Value>>::type>
void folly::gen::detail::PMap< Predicate >::Generator< Value, Source, Input, Output >::ExecutionPipeline::blockingWrite ( Value &&  value)
inline
template<class Predicate >
template<class Value , class Source , class Input = typename std::decay<Value>::type, class Output = typename std::decay<invoke_result_t<Predicate, Value>>::type>
void folly::gen::detail::PMap< Predicate >::Generator< Value, Source, Input, Output >::ExecutionPipeline::predApplier ( )
inlineprivate

Definition at line 121 of file ParallelMap-inl.h.

References folly::EventCount::cancelWait(), folly::gen::detail::PMap< Predicate >::Generator< Value, Source, Input, Output >::ExecutionPipeline::done_, folly::gen::move, folly::gen::detail::PMap< Predicate >::Generator< Value, Source, Input, Output >::ExecutionPipeline::pred_, folly::EventCount::prepareWait(), ticket, and folly::EventCount::wait().

Referenced by folly::gen::detail::PMap< Predicate >::Generator< Value, Source, Input, Output >::ExecutionPipeline::ExecutionPipeline().

121  {
122  // Each thread takes a value from the pipeline_, runs the
123  // predicate and enqueues the result. The pipeline preserves
124  // ordering. NOTE: don't use blockingReadStage<0> to read from
125  // the pipeline_ as there may not be any: end-of-data is signaled
126  // separately using done_/wake_.
127  Input in;
128  for (;;) {
129  auto key = wake_.prepareWait();
130 
131  typename MPMCPipeline<Input, Output>::template Ticket<0> ticket;
132  if (pipeline_.template readStage<0>(ticket, in)) {
133  wake_.cancelWait();
134  Output out = pred_(std::move(in));
135  pipeline_.template blockingWriteStage<0>(ticket, std::move(out));
136  continue;
137  }
138 
139  if (done_.load(std::memory_order_acquire)) {
140  wake_.cancelWait();
141  break;
142  }
143 
144  // Not done_, but no items in the queue.
145  wake_.wait(key);
146  }
147  }
constexpr detail::Map< Move > move
Definition: Base-inl.h:2567
static constexpr StringPiece ticket
void wait(Key key) noexcept
Definition: EventCount.h:163
void cancelWait() noexcept
Definition: EventCount.h:155
Key prepareWait() noexcept
Definition: EventCount.h:150
template<class Predicate >
template<class Value , class Source , class Input = typename std::decay<Value>::type, class Output = typename std::decay<invoke_result_t<Predicate, Value>>::type>
bool folly::gen::detail::PMap< Predicate >::Generator< Value, Source, Input, Output >::ExecutionPipeline::read ( Output &  out)
inline

Definition at line 112 of file ParallelMap-inl.h.

References folly::MPMCPipeline< In, Stages >::read().

Referenced by folly::gen::detail::PMap< Predicate >::Generator< Value, Source, Input, Output >::apply(), and folly::gen::detail::PMap< Predicate >::Generator< Value, Source, Input, Output >::foreach().

112  {
113  return pipeline_.read(out);
114  }
bool read(typename std::tuple_element< sizeof...(Stages), StageTuple >::type::value_type &elem)
Definition: MPMCPipeline.h:260
template<class Predicate >
template<class Value , class Source , class Input = typename std::decay<Value>::type, class Output = typename std::decay<invoke_result_t<Predicate, Value>>::type>
void folly::gen::detail::PMap< Predicate >::Generator< Value, Source, Input, Output >::ExecutionPipeline::stop ( )
inline
template<class Predicate >
template<class Value , class Source , class Input = typename std::decay<Value>::type, class Output = typename std::decay<invoke_result_t<Predicate, Value>>::type>
bool folly::gen::detail::PMap< Predicate >::Generator< Value, Source, Input, Output >::ExecutionPipeline::write ( Value &&  value)
inline

Definition at line 99 of file ParallelMap-inl.h.

References folly::EventCount::notify(), folly::value(), and folly::MPMCPipeline< In, Stages >::write().

Referenced by folly::gen::detail::PMap< Predicate >::Generator< Value, Source, Input, Output >::apply(), and folly::gen::detail::PMap< Predicate >::Generator< Value, Source, Input, Output >::foreach().

99  {
100  bool wrote = pipeline_.write(std::forward<Value>(value));
101  if (wrote) {
102  wake_.notify();
103  }
104  return wrote;
105  }
bool write(Args &&...args)
Definition: MPMCPipeline.h:203
uint64_t value(const typename LockFreeRingBuffer< T, Atom >::Cursor &rbcursor)
void notify() noexcept
Definition: EventCount.h:134

Member Data Documentation

template<class Predicate >
template<class Value , class Source , class Input = typename std::decay<Value>::type, class Output = typename std::decay<invoke_result_t<Predicate, Value>>::type>
std::atomic<bool> folly::gen::detail::PMap< Predicate >::Generator< Value, Source, Input, Output >::ExecutionPipeline::done_ {false}
private
template<class Predicate >
template<class Value , class Source , class Input = typename std::decay<Value>::type, class Output = typename std::decay<invoke_result_t<Predicate, Value>>::type>
MPMCPipeline<Input, Output> folly::gen::detail::PMap< Predicate >::Generator< Value, Source, Input, Output >::ExecutionPipeline::pipeline_
private

Definition at line 73 of file ParallelMap-inl.h.

template<class Predicate >
template<class Value , class Source , class Input = typename std::decay<Value>::type, class Output = typename std::decay<invoke_result_t<Predicate, Value>>::type>
const Predicate& folly::gen::detail::PMap< Predicate >::Generator< Value, Source, Input, Output >::ExecutionPipeline::pred_
private
template<class Predicate >
template<class Value , class Source , class Input = typename std::decay<Value>::type, class Output = typename std::decay<invoke_result_t<Predicate, Value>>::type>
EventCount folly::gen::detail::PMap< Predicate >::Generator< Value, Source, Input, Output >::ExecutionPipeline::wake_
private

Definition at line 74 of file ParallelMap-inl.h.

template<class Predicate >
template<class Value , class Source , class Input = typename std::decay<Value>::type, class Output = typename std::decay<invoke_result_t<Predicate, Value>>::type>
std::vector<std::thread> folly::gen::detail::PMap< Predicate >::Generator< Value, Source, Input, Output >::ExecutionPipeline::workers_
private

Definition at line 70 of file ParallelMap-inl.h.


The documentation for this class was generated from the following file: