21 #include <glog/logging.h> 31 template <
class T,
size_t Amp>
32 class MPMCPipelineStage;
96 template <
class In,
class... Stages>
98 typedef std::tuple<detail::PipelineStageInfo<Stages>...>
StageInfos;
135 std::conditional_t<kIsDebug, TicketBaseDebug, TicketBaseNDebug>;
142 template <
size_t Stage>
146 CHECK_EQ(remainingUses_, 0) <<
"All tickets must be completely used!";
153 remainingUses_(
std::
exchange(other.remainingUses_, 0)) {}
156 if (
this != &other) {
169 remainingUses_(amplification) {}
172 CHECK_GT(remainingUses_--, 0);
173 TicketBase::check_owner(owner);
174 return TicketBase::value_++;
187 template <
class... Sizes>
193 template <
class...
Args>
202 template <
class...
Args>
204 return std::get<0>(
stages_).
write(std::forward<Args>(args)...);
210 template <
size_t Stage>
212 typename std::tuple_element<Stage, StageTuple>::type::value_type& elem) {
213 return Ticket<Stage>(
215 std::tuple_element<Stage, StageInfos>::type::kAmplification,
223 template <
size_t Stage>
226 typename std::tuple_element<Stage, StageTuple>::type::value_type& elem) {
228 if (!std::get<Stage>(
stages_).readAndGetTicket(tval, elem)) {
231 ticket = Ticket<Stage>(
233 std::tuple_element<Stage, StageInfos>::type::kAmplification,
242 template <
size_t Stage,
class...
Args>
244 std::get<Stage + 1>(
stages_).blockingWriteWithTicket(
245 ticket.use(
this), std::forward<Args>(args)...);
252 type::value_type& elem) {
274 std::get<0>(
stages_).writeCount() * kAmplification -
ssize_t sizeGuess() const noexcept
void blockingWrite(Args &&...args)
TicketBaseDebug(TicketBaseDebug &&other) noexcept
Ticket(Ticket &&other) noexcept
static constexpr size_t kAmplification
constexpr detail::Map< Move > move
Ticket & operator=(Ticket &&other) noexcept
Ticket(MPMCPipeline *owner, size_t amplification, uint64_t value) noexcept
TicketBaseNDebug(MPMCPipeline *, uint64_t value) noexcept
internal::ArgsMatcher< InnerMatcher > Args(const InnerMatcher &matcher)
—— Concurrent Priority Queue Implementation ——
requires E e noexcept(noexcept(s.error(std::move(e))))
std::tuple< detail::MPMCPipelineStageImpl< In >, detail::MPMCPipelineStageImpl< typename detail::PipelineStageInfo< Stages >::value_type >... > StageTuple
void blockingRead(typename std::tuple_element< sizeof...(Stages), StageTuple >::type::value_type &elem)
void check_owner(MPMCPipeline *) const
static constexpr StringPiece ticket
bool readStage(Ticket< Stage > &ticket, typename std::tuple_element< Stage, StageTuple >::type::value_type &elem)
std::conditional_t< kIsDebug, TicketBaseDebug, TicketBaseNDebug > TicketBase
bool write(Args &&...args)
MPMCPipeline(Sizes...sizes)
TicketBaseDebug() noexcept
T exchange(T &obj, U &&new_value)
bool read(typename std::tuple_element< sizeof...(Stages), StageTuple >::type::value_type &elem)
uint64_t value(const typename LockFreeRingBuffer< T, Atom >::Cursor &rbcursor)
Ticket< Stage > blockingReadStage(typename std::tuple_element< Stage, StageTuple >::type::value_type &elem)
TicketBaseDebug(MPMCPipeline *owner, uint64_t value) noexcept
std::tuple< detail::PipelineStageInfo< Stages >... > StageInfos
uint64_t use(MPMCPipeline *owner)
void blockingWriteStage(Ticket< Stage > &ticket, Args &&...args)
PUSHMI_INLINE_VAR constexpr detail::get_fn< T > get
void check_owner(MPMCPipeline *owner) const