proxygen
folly::MPMCPipeline< In, Stages > Class Template Reference

#include <MPMCPipelineDetail.h>

Classes

class  Ticket
 
class  TicketBaseDebug
 
class  TicketBaseNDebug
 

Public Member Functions

 MPMCPipeline ()=default
 
template<class... Sizes>
 MPMCPipeline (Sizes...sizes)
 
template<class... Args>
void blockingWrite (Args &&...args)
 
template<class... Args>
bool write (Args &&...args)
 
template<size_t Stage>
Ticket< Stage > blockingReadStage (typename std::tuple_element< Stage, StageTuple >::type::value_type &elem)
 
template<size_t Stage>
bool readStage (Ticket< Stage > &ticket, typename std::tuple_element< Stage, StageTuple >::type::value_type &elem)
 
template<size_t Stage, class... Args>
void blockingWriteStage (Ticket< Stage > &ticket, Args &&...args)
 
void blockingRead (typename std::tuple_element< sizeof...(Stages), StageTuple >::type::value_type &elem)
 
bool read (typename std::tuple_element< sizeof...(Stages), StageTuple >::type::value_type &elem)
 
ssize_t sizeGuess () const noexcept
 

Private Types

typedef std::tuple< detail::PipelineStageInfo< Stages >... > StageInfos
 
typedef std::tuple< detail::MPMCPipelineStageImpl< In >, detail::MPMCPipelineStageImpl< typename detail::PipelineStageInfo< Stages >::value_type >... > StageTuple
 
using TicketBase = std::conditional_t< kIsDebug, TicketBaseDebug, TicketBaseNDebug >
 

Private Attributes

StageTuple stages_
 

Static Private Attributes

static constexpr size_t kAmplification
 

Detailed Description

template<class In, class... Stages>
class folly::MPMCPipeline< In, Stages >

Multi-Producer, Multi-Consumer pipeline.

An N-stage pipeline is a combination of N+1 MPMC queues (see MPMCQueue.h).

At each stage, you may dequeue the results from the previous stage (possibly from multiple threads) and enqueue results to the next stage. Regardless of the order of completion, data is delivered to the next stage in the original order. Each input is matched with a "ticket" which must be produced when enqueueing to the next stage.

A given stage must produce exactly K ("amplification factor", default K=1) results for every input. This is enforced by requiring that each ticket is used exactly K times.
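The exactly-K rule can be pictured as a small counter attached to each ticket. The following is an illustrative model only, not folly's Ticket class: `TicketSketch`, `use()`, `fullyUsed()` and `ticketSketchExample()` are hypothetical names, and the mapping of input number i to output positions i*K through i*K + K-1 is an assumption made for this example.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <stdexcept>

// Hypothetical model of a ticket that must be used exactly K times.
class TicketSketch {
 public:
  // ASSUMPTION: input number `inputIndex` owns K consecutive output positions.
  TicketSketch(std::uint64_t inputIndex, std::size_t amplification)
      : next_(inputIndex * amplification), remaining_(amplification) {}

  // Consume one use and return the output position it maps to.
  std::uint64_t use() {
    if (remaining_ == 0) {
      throw std::logic_error("ticket used more than K times");
    }
    --remaining_;
    return next_++;
  }

  // True once all K uses have been consumed.
  bool fullyUsed() const { return remaining_ == 0; }

 private:
  std::uint64_t next_;
  std::size_t remaining_;
};

// With K = 2, input number 3 owns output positions 6 and 7.
inline void ticketSketchExample() {
  TicketSketch ticket(3, 2);
  const std::uint64_t first = ticket.use();
  const std::uint64_t second = ticket.use();
  assert(first == 6 && second == 7);
  assert(ticket.fullyUsed());
}
```

In this model a third call to `use()` throws, which loosely corresponds to the rule that a stage may not produce more than K results per input.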

Usage:

// arguments are queue sizes
MPMCPipeline<int, std::string, int> pipeline(10, 10, 10);

pipeline.blockingWrite(42);

{
  int val;
  auto ticket = pipeline.blockingReadStage<0>(val);
  pipeline.blockingWriteStage<0>(ticket, folly::to<std::string>(val));
}

{
  std::string val;
  auto ticket = pipeline.blockingReadStage<1>(val);
  int ival = 0;
  try {
    ival = folly::to<int>(val);
  } catch (...) {
    // We must produce exactly 1 output even on exception!
  }
  pipeline.blockingWriteStage<1>(ticket, ival);
}

int result;
pipeline.blockingRead(result);
// result == 42

To specify amplification factors greater than 1, use MPMCPipelineStage<T, amplification> instead of T in the declaration:

MPMCPipeline<int, MPMCPipelineStage<std::string, 2>, MPMCPipelineStage<int, 4>>

declares a two-stage pipeline: the first stage produces 2 strings for each input int, the second stage produces 4 ints for each input string, so, overall, the pipeline produces 2*4 = 8 ints for each input int.
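The 2*4 = 8 arithmetic can be checked at compile time. The following is a minimal sketch that does not use folly: `StageSketch` and `OverallAmplificationSketch` are hypothetical stand-ins invented for this example, and the only thing they model is that the overall factor is the product of the per-stage factors.

```cpp
#include <cstddef>
#include <string>

// Hypothetical stage descriptor: output type T, K outputs per input.
template <class T, std::size_t K = 1>
struct StageSketch {
  using value_type = T;
  static constexpr std::size_t kAmplification = K;
};

// Product of the per-stage amplification factors (1 for an empty list).
template <class... Stages>
struct OverallAmplificationSketch;

template <>
struct OverallAmplificationSketch<> {
  static constexpr std::size_t value = 1;
};

template <class First, class... Rest>
struct OverallAmplificationSketch<First, Rest...> {
  static constexpr std::size_t value =
      First::kAmplification * OverallAmplificationSketch<Rest...>::value;
};

// Mirrors the declaration above: 2 strings per int, then 4 ints per string.
using ExampleAmplification =
    OverallAmplificationSketch<StageSketch<std::string, 2>, StageSketch<int, 4>>;

static_assert(ExampleAmplification::value == 8, "2 * 4 outputs per input");
```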

Implementation details: we use N+1 MPMCQueue objects; each intermediate queue connects two adjacent stages. The MPMCQueue implementation is abused; instead of using it as a queue, we insert in the output queue at the position determined by the input queue's popTicket_. We guarantee that all slots are filled (and therefore the queue doesn't freeze) because we require that each step produces exactly K outputs for every input.
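The idea of writing at a ticket-determined position, rather than at the tail of a queue, can be sketched with a single-threaded ring of slots. This is a simplified model under stated assumptions, not folly's MPMCQueue: `TicketedRingSketch`, `writeWithTicket()`, `read()` and `ticketedRingExample()` are hypothetical names, there is no synchronization, and the element type is fixed to int.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <vector>

// Hypothetical single-threaded model: values are stored by ticket and
// read back strictly in ticket order.
class TicketedRingSketch {
 public:
  explicit TicketedRingSketch(std::size_t capacity) : slots_(capacity) {
    assert(capacity > 0);
  }

  // Store `value` in the slot named by `ticket`. Fails if the ticket is
  // outside the window of currently available slots or was already used.
  bool writeWithTicket(std::uint64_t ticket, int value) {
    if (ticket < nextToRead_ || ticket - nextToRead_ >= slots_.size()) {
      return false;
    }
    Slot& slot = slots_[ticket % slots_.size()];
    if (slot.full) {
      return false;
    }
    slot.value = value;
    slot.full = true;
    return true;
  }

  // Pop the value for the next ticket, if that slot has been filled.
  bool read(int& out) {
    Slot& slot = slots_[nextToRead_ % slots_.size()];
    if (!slot.full) {
      return false;  // an unfilled slot stalls every later value
    }
    out = slot.value;
    slot.full = false;
    ++nextToRead_;
    return true;
  }

 private:
  struct Slot {
    int value = 0;
    bool full = false;
  };
  std::vector<Slot> slots_;
  std::uint64_t nextToRead_ = 0;
};

// Tickets complete in the order 2, 0, 1; values still come out as 0, 1, 2.
inline std::vector<int> ticketedRingExample() {
  TicketedRingSketch ring(4);
  std::vector<int> delivered;
  int out = 0;
  const std::uint64_t completionOrder[] = {2, 0, 1};
  for (std::uint64_t ticket : completionOrder) {
    ring.writeWithTicket(ticket, 10 + static_cast<int>(ticket));
    while (ring.read(out)) {
      delivered.push_back(out);
    }
  }
  return delivered;
}
```

In this model, `read()` keeps returning false while the slot for the next ticket is empty, which is the "freeze" that the exactly-K requirement is meant to rule out.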

Definition at line 24 of file MPMCPipelineDetail.h.

Member Typedef Documentation

template<class In, class... Stages>
typedef std::tuple<detail::PipelineStageInfo<Stages>...> folly::MPMCPipeline< In, Stages >::StageInfos
private

Definition at line 98 of file MPMCPipeline.h.

template<class In, class... Stages>
typedef std::tuple< detail::MPMCPipelineStageImpl<In>, detail::MPMCPipelineStageImpl< typename detail::PipelineStageInfo<Stages>::value_type>...> folly::MPMCPipeline< In, Stages >::StageTuple
private

Definition at line 103 of file MPMCPipeline.h.
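The shape of StageTuple explains the element types in the member signatures: a stage index selects one queue, and `sizeof...(Stages)` selects the last one. The following sketch reproduces only that shape with standard-library types. `QueueSketch` and `StageTupleSketch` are hypothetical names, and plain output types are used in place of detail::PipelineStageInfo, so this is an approximation rather than folly's definition.

```cpp
#include <cstddef>
#include <string>
#include <tuple>
#include <type_traits>

// Hypothetical stand-in for one queue holding values of type T.
template <class T>
struct QueueSketch {
  using value_type = T;
};

// One queue for the input, plus one queue per stage output.
template <class In, class... Outs>
using StageTupleSketch = std::tuple<QueueSketch<In>, QueueSketch<Outs>...>;

// Models a pipeline declared with <int, std::string, int>: two stages.
using ExampleQueues = StageTupleSketch<int, std::string, int>;

static_assert(std::tuple_size<ExampleQueues>::value == 3,
              "N + 1 queues for N = 2 stages");
static_assert(
    std::is_same<std::tuple_element<0, ExampleQueues>::type::value_type,
                 int>::value,
    "stage 0 reads the pipeline input");
static_assert(
    std::is_same<std::tuple_element<1, ExampleQueues>::type::value_type,
                 std::string>::value,
    "stage 1 reads what stage 0 wrote");
static_assert(
    std::is_same<std::tuple_element<2, ExampleQueues>::type::value_type,
                 int>::value,
    "the final read sees what stage 1 wrote");
```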

template<class In, class... Stages>
using folly::MPMCPipeline< In, Stages >::TicketBase = std::conditional_t<kIsDebug, TicketBaseDebug, TicketBaseNDebug>
private

Definition at line 135 of file MPMCPipeline.h.

Constructor & Destructor Documentation

template<class In, class... Stages>
folly::MPMCPipeline< In, Stages >::MPMCPipeline ( )
default

Default-constructs a pipeline. This is useful as a target for later move-assignment, as with MPMCQueue; see MPMCQueue.h for more details.

Referenced by folly::MPMCPipeline< In, Stages >::Ticket< Stage >::use().

template<class In, class... Stages>
template<class... Sizes>
folly::MPMCPipeline< In, Stages >::MPMCPipeline ( Sizes...  sizes)
inline explicit

Construct a pipeline with N+1 queue sizes.

Definition at line 188 of file MPMCPipeline.h.

: stages_(sizes...) {}

Member Function Documentation

template<class In, class... Stages>
void folly::MPMCPipeline< In, Stages >::blockingRead ( typename std::tuple_element< sizeof...(Stages), StageTuple >::type::value_type &  elem)
inline

Pop an element from (the final stage of) the pipeline. Blocking.

Definition at line 251 of file MPMCPipeline.h.

Referenced by folly::gen::detail::PMap< Predicate >::Generator< Value, Source, Input, Output >::ExecutionPipeline::blockingRead(), folly::MPMCPipeline< Input, Output >::blockingReadStage(), and folly::test::TEST().

{
  std::get<sizeof...(Stages)>(stages_).blockingRead(elem);
}

template<class In, class... Stages>
template<size_t Stage>
Ticket<Stage> folly::MPMCPipeline< In, Stages >::blockingReadStage ( typename std::tuple_element< Stage, StageTuple >::type::value_type &  elem)
inline

Read an element for stage Stage and obtain a ticket. Blocking.

Definition at line 211 of file MPMCPipeline.h.

Referenced by folly::test::TEST().

{
  return Ticket<Stage>(
      this,
      std::tuple_element<Stage, StageInfos>::type::kAmplification,
      std::get<Stage>(stages_).blockingRead(elem));
}

template<class In, class... Stages>
template<class... Args>
void folly::MPMCPipeline< In, Stages >::blockingWrite ( Args &&...  args)
inline

Push an element into (the first stage of) the pipeline. Blocking.

Definition at line 194 of file MPMCPipeline.h.

Referenced by folly::gen::detail::PMap< Predicate >::Generator< Value, Source, Input, Output >::ExecutionPipeline::blockingWrite(), folly::MPMCPipeline< Input, Output >::blockingWrite(), and folly::test::TEST().

{
  std::get<0>(stages_).blockingWrite(std::forward<Args>(args)...);
}

template<class In, class... Stages>
template<size_t Stage, class... Args>
void folly::MPMCPipeline< In, Stages >::blockingWriteStage ( Ticket< Stage > &  ticket,
Args &&...  args 
)
inline

Complete an element in stage Stage (pushing it for stage Stage+1). Blocking.

Definition at line 243 of file MPMCPipeline.h.

Referenced by folly::test::TEST().

{
  std::get<Stage + 1>(stages_).blockingWriteWithTicket(
      ticket.use(this), std::forward<Args>(args)...);
}

template<class In, class... Stages>
bool folly::MPMCPipeline< In, Stages >::read ( typename std::tuple_element< sizeof...(Stages), StageTuple >::type::value_type &  elem)
inline

Try to pop an element from (the final stage of) the pipeline. Non-blocking.

Definition at line 260 of file MPMCPipeline.h.

Referenced by folly::gen::detail::PMap< Predicate >::Generator< Value, Source, Input, Output >::ExecutionPipeline::read(), and folly::test::TEST().

{
  return std::get<sizeof...(Stages)>(stages_).read(elem);
}

template<class In, class... Stages>
template<size_t Stage>
bool folly::MPMCPipeline< In, Stages >::readStage ( Ticket< Stage > &  ticket,
typename std::tuple_element< Stage, StageTuple >::type::value_type &  elem 
)
inline

Try to read an element for stage Stage and obtain a ticket. Non-blocking.

Definition at line 224 of file MPMCPipeline.h.

{
  uint64_t tval;
  if (!std::get<Stage>(stages_).readAndGetTicket(tval, elem)) {
    return false;
  }
  ticket = Ticket<Stage>(
      this,
      std::tuple_element<Stage, StageInfos>::type::kAmplification,
      tval);
  return true;
}

template<class In, class... Stages>
ssize_t folly::MPMCPipeline< In, Stages >::sizeGuess ( ) const
inline noexcept

Estimate queue size, measured as values from the last stage (so if the pipeline has an amplification factor > 1, pushing an element into the first stage will cause sizeGuess() to equal the amplification factor). Elements "in flight" (currently being processed as part of a stage, so not in any queue) are also counted.

Definition at line 272 of file MPMCPipeline.h.

Referenced by folly::test::TEST(), and folly::gen::detail::PMap< Predicate >::Generator< Value, Source, Input, Output >::ExecutionPipeline::~ExecutionPipeline().
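The estimate is plain counter arithmetic: writes into the first queue, scaled by the overall amplification factor, minus reads from the last queue. A small stand-alone sketch of that arithmetic follows. `sizeGuessSketch` and its parameter names are hypothetical, and std::int64_t is used here in place of ssize_t purely to keep the example portable.

```cpp
#include <cstddef>
#include <cstdint>

// Hypothetical helper: size estimate in units of final-stage values.
constexpr std::int64_t sizeGuessSketch(
    std::uint64_t firstStageWrites,
    std::size_t overallAmplification,
    std::uint64_t finalStageReads) {
  return static_cast<std::int64_t>(
      firstStageWrites * overallAmplification - finalStageReads);
}

// One input pushed into a pipeline with overall amplification 8, nothing
// read yet: the estimate equals the amplification factor.
static_assert(sizeGuessSketch(1, 8, 0) == 8, "one input counts as 8 outputs");

// After three of those eight outputs have been read, five remain.
static_assert(sizeGuessSketch(1, 8, 3) == 5, "reads reduce the estimate");
```

Because the count is taken from the first and last queues only, values still being processed inside a stage are included, as the description above notes.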

{
  return ssize_t(
      std::get<0>(stages_).writeCount() * kAmplification -
      std::get<sizeof...(Stages)>(stages_).readCount());
}

template<class In, class... Stages>
template<class... Args>
bool folly::MPMCPipeline< In, Stages >::write ( Args &&...  args)
inline

Try to push an element into (the first stage of) the pipeline. Non-blocking.

Definition at line 203 of file MPMCPipeline.h.

Referenced by folly::gen::detail::PMap< Predicate >::Generator< Value, Source, Input, Output >::ExecutionPipeline::write(), and folly::MPMCPipeline< Input, Output >::write().

{
  return std::get<0>(stages_).write(std::forward<Args>(args)...);
}

Member Data Documentation

template<class In, class... Stages>
constexpr size_t folly::MPMCPipeline< In, Stages >::kAmplification
static private
Initial value:

Definition at line 104 of file MPMCPipeline.h.


The documentation for this class was generated from the following files: