proxygen
|
#include <MPMCPipelineDetail.h>
Classes | |
class | Ticket |
class | TicketBaseDebug |
class | TicketBaseNDebug |
Public Member Functions | |
MPMCPipeline ()=default | |
template<class... Sizes> | |
MPMCPipeline (Sizes...sizes) | |
template<class... Args> | |
void | blockingWrite (Args &&...args) |
template<class... Args> | |
bool | write (Args &&...args) |
template<size_t Stage> | |
Ticket< Stage > | blockingReadStage (typename std::tuple_element< Stage, StageTuple >::type::value_type &elem) |
template<size_t Stage> | |
bool | readStage (Ticket< Stage > &ticket, typename std::tuple_element< Stage, StageTuple >::type::value_type &elem) |
template<size_t Stage, class... Args> | |
void | blockingWriteStage (Ticket< Stage > &ticket, Args &&...args) |
void | blockingRead (typename std::tuple_element< sizeof...(Stages), StageTuple >::type::value_type &elem) |
bool | read (typename std::tuple_element< sizeof...(Stages), StageTuple >::type::value_type &elem) |
ssize_t | sizeGuess () const noexcept |
Private Types | |
typedef std::tuple< detail::PipelineStageInfo< Stages >... > | StageInfos |
typedef std::tuple< detail::MPMCPipelineStageImpl< In >, detail::MPMCPipelineStageImpl< typename detail::PipelineStageInfo< Stages >::value_type >... > | StageTuple |
using | TicketBase = std::conditional_t< kIsDebug, TicketBaseDebug, TicketBaseNDebug > |
Private Attributes | |
StageTuple | stages_ |
Static Private Attributes | |
static constexpr size_t | kAmplification |
Multi-Producer, Multi-Consumer pipeline.
A N-stage pipeline is a combination of N+1 MPMC queues (see MPMCQueue.h).
At each stage, you may dequeue the results from the previous stage (possibly from multiple threads) and enqueue results to the next stage. Regardless of the order of completion, data is delivered to the next stage in the original order. Each input is matched with a "ticket" which must be produced when enqueueing to the next stage.
A given stage must produce exactly K ("amplification factor", default K=1) results for every input. This is enforced by requiring that each ticket is used exactly K times.
Usage:
// arguments are queue sizes MPMCPipeline<int, std::string, int> pipeline(10, 10, 10);
pipeline.blockingWrite(42);
{ int val; auto ticket = pipeline.blockingReadStage<0>(val); pipeline.blockingWriteStage<0>(ticket, folly::to<std::string>(val)); }
{ std::string val; auto ticket = pipeline.blockingReadStage<1>(val); int ival = 0; try { ival = folly::to<int>(val); } catch (...) { // We must produce exactly 1 output even on exception! } pipeline.blockingWriteStage<1>(ticket, ival); }
int result; pipeline.blockingRead(result); // result == 42
To specify amplification factors greater than 1, use MPMCPipelineStage<T, amplification> instead of T in the declaration:
MPMCPipeline<int, MPMCPipelineStage<std::string, 2>, MPMCPipelineStage<int, 4>>
declares a two-stage pipeline: the first stage produces 2 strings for each input int, the second stage produces 4 ints for each input string, so, overall, the pipeline produces 2*4 = 8 ints for each input int.
Implementation details: we use N+1 MPMCQueue objects; each intermediate queue connects two adjacent stages. The MPMCQueue implementation is abused; instead of using it as a queue, we insert in the output queue at the position determined by the input queue's popTicket_. We guarantee that all slots are filled (and therefore the queue doesn't freeze) because we require that each step produces exactly K outputs for every input.
Definition at line 24 of file MPMCPipelineDetail.h.
|
private |
Definition at line 98 of file MPMCPipeline.h.
|
private |
Definition at line 103 of file MPMCPipeline.h.
|
private |
Definition at line 135 of file MPMCPipeline.h.
|
default |
Default-construct pipeline. Useful to move-assign later, just like MPMCQueue, see MPMCQueue.h for more details.
Referenced by folly::MPMCPipeline< In, Stages >::Ticket< Stage >::use().
|
inlineexplicit |
|
inline |
Pop an element from (the final stage of) the pipeline. Blocking.
Definition at line 251 of file MPMCPipeline.h.
Referenced by folly::gen::detail::PMap< Predicate >::Generator< Value, Source, Input, Output >::ExecutionPipeline::blockingRead(), folly::MPMCPipeline< Input, Output >::blockingReadStage(), and folly::test::TEST().
|
inline |
Read an element for stage Stage and obtain a ticket. Blocking.
Definition at line 211 of file MPMCPipeline.h.
Referenced by folly::test::TEST().
|
inline |
Push an element into (the first stage of) the pipeline. Blocking.
Definition at line 194 of file MPMCPipeline.h.
Referenced by folly::gen::detail::PMap< Predicate >::Generator< Value, Source, Input, Output >::ExecutionPipeline::blockingWrite(), folly::MPMCPipeline< Input, Output >::blockingWrite(), and folly::test::TEST().
|
inline |
Complete an element in stage Stage (pushing it for stage Stage+1). Blocking.
Definition at line 243 of file MPMCPipeline.h.
Referenced by folly::test::TEST().
|
inline |
Try to pop an element from (the final stage of) the pipeline. Non-blocking.
Definition at line 260 of file MPMCPipeline.h.
Referenced by folly::gen::detail::PMap< Predicate >::Generator< Value, Source, Input, Output >::ExecutionPipeline::read(), and folly::test::TEST().
|
inline |
Try to read an element for stage Stage and obtain a ticket. Non-blocking.
Definition at line 224 of file MPMCPipeline.h.
|
inlinenoexcept |
Estimate queue size, measured as values from the last stage. (so if the pipeline has an amplification factor > 1, pushing an element into the first stage will cause sizeGuess() to be == amplification factor) Elements "in flight" (currently processed as part of a stage, so not in any queue) are also counted.
Definition at line 272 of file MPMCPipeline.h.
Referenced by folly::test::TEST(), and folly::gen::detail::PMap< Predicate >::Generator< Value, Source, Input, Output >::ExecutionPipeline::~ExecutionPipeline().
|
inline |
Try to push an element into (the first stage of) the pipeline. Non-blocking.
Definition at line 203 of file MPMCPipeline.h.
Referenced by folly::gen::detail::PMap< Predicate >::Generator< Value, Source, Input, Output >::ExecutionPipeline::write(), and folly::MPMCPipeline< Input, Output >::write().
|
staticprivate |
Definition at line 104 of file MPMCPipeline.h.
|
private |
Definition at line 279 of file MPMCPipeline.h.
Referenced by folly::MPMCPipeline< Input, Output >::blockingRead(), folly::MPMCPipeline< Input, Output >::blockingReadStage(), folly::MPMCPipeline< Input, Output >::blockingWrite(), folly::MPMCPipeline< Input, Output >::blockingWriteStage(), folly::MPMCPipeline< Input, Output >::read(), folly::MPMCPipeline< Input, Output >::readStage(), folly::MPMCPipeline< Input, Output >::sizeGuess(), and folly::MPMCPipeline< Input, Output >::write().