proxygen
MPMCPipeline.h
Go to the documentation of this file.
1 /*
2  * Copyright 2013-present Facebook, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 #pragma once
18 
19 #include <utility>
20 
21 #include <glog/logging.h>
22 
23 #include <folly/Portability.h>
25 
26 namespace folly {
27 
31 template <class T, size_t Amp>
32 class MPMCPipelineStage;
33 
96 template <class In, class... Stages>
97 class MPMCPipeline {
98  typedef std::tuple<detail::PipelineStageInfo<Stages>...> StageInfos;
99  typedef std::tuple<
104  static constexpr size_t kAmplification =
106 
108  public:
109  TicketBaseDebug() noexcept : owner_(nullptr), value_(0xdeadbeeffaceb00c) {}
111  : owner_(std::exchange(other.owner_, nullptr)),
112  value_(std::exchange(other.value_, 0xdeadbeeffaceb00c)) {}
114  : owner_(owner), value_(value) {}
// Ownership check used when TicketBase resolves to the debug variant:
// CHECK-fails (glog) unless `owner` is the same pipeline pointer that was
// stored in owner_. Kept as CHECK(a == b) because the expression text is part
// of the failure message.
115  void check_owner(MPMCPipeline* owner) const {
116  CHECK(owner == owner_);
117  }
118 
121  };
122 
124  public:
125  TicketBaseNDebug() = default;
126  TicketBaseNDebug(TicketBaseNDebug&&) = default;
128  : value_(value) {}
// Non-debug counterpart of the owner check: the argument is ignored and the
// body is empty, so no owner pointer is consulted here.
129  void check_owner(MPMCPipeline*) const {}
130 
132  };
133 
134  using TicketBase =
135  std::conditional_t<kIsDebug, TicketBaseDebug, TicketBaseNDebug>;
136 
137  public:
142  template <size_t Stage>
143  class Ticket : TicketBase {
144  public:
146  CHECK_EQ(remainingUses_, 0) << "All tickets must be completely used!";
147  }
148 
149  Ticket() noexcept : remainingUses_(0) {}
150 
152  : TicketBase(static_cast<TicketBase&&>(other)),
153  remainingUses_(std::exchange(other.remainingUses_, 0)) {}
154 
156  if (this != &other) {
157  this->~Ticket();
158  new (this) Ticket(std::move(other));
159  }
160  return *this;
161  }
162 
163  private:
164  friend class MPMCPipeline;
166 
// Private constructor (MPMCPipeline is a friend): builds a ticket for `owner`
// whose starting value is `value * amplification` and whose remainingUses_ is
// `amplification`.
// NOTE(review): presumably `value` is the ticket number handed back by the
// stage queue on read, scaled so each input ticket maps to `amplification`
// consecutive output tickets - confirm against the stage implementation.
167  Ticket(MPMCPipeline* owner, size_t amplification, uint64_t value) noexcept
168  : TicketBase(owner, value * amplification),
169  remainingUses_(amplification) {}
170 
172  CHECK_GT(remainingUses_--, 0);
173  TicketBase::check_owner(owner);
174  return TicketBase::value_++;
175  }
176  };
177 
182  MPMCPipeline() = default;
183 
// Constructs the pipeline by passing `sizes...` straight to the constructor of
// the stages_ tuple. Marked explicit so a single size argument cannot
// implicitly convert into a pipeline.
// NOTE(review): presumably one size per queue in stages_ (input queue plus one
// per stage) - confirm against the StageTuple element constructors.
187  template <class... Sizes>
188  explicit MPMCPipeline(Sizes... sizes) : stages_(sizes...) {}
189 
193  template <class... Args>
194  void blockingWrite(Args&&... args) {
195  std::get<0>(stages_).blockingWrite(std::forward<Args>(args)...);
196  }
197 
202  template <class... Args>
203  bool write(Args&&... args) {
204  return std::get<0>(stages_).write(std::forward<Args>(args)...);
205  }
206 
210  template <size_t Stage>
211  Ticket<Stage> blockingReadStage(
212  typename std::tuple_element<Stage, StageTuple>::type::value_type& elem) {
213  return Ticket<Stage>(
214  this,
215  std::tuple_element<Stage, StageInfos>::type::kAmplification,
216  std::get<Stage>(stages_).blockingRead(elem));
217  }
218 
223  template <size_t Stage>
224  bool readStage(
225  Ticket<Stage>& ticket,
226  typename std::tuple_element<Stage, StageTuple>::type::value_type& elem) {
227  uint64_t tval;
228  if (!std::get<Stage>(stages_).readAndGetTicket(tval, elem)) {
229  return false;
230  }
231  ticket = Ticket<Stage>(
232  this,
233  std::tuple_element<Stage, StageInfos>::type::kAmplification,
234  tval);
235  return true;
236  }
237 
242  template <size_t Stage, class... Args>
243  void blockingWriteStage(Ticket<Stage>& ticket, Args&&... args) {
244  std::get<Stage + 1>(stages_).blockingWriteWithTicket(
245  ticket.use(this), std::forward<Args>(args)...);
246  }
247 
251  void blockingRead(typename std::tuple_element<sizeof...(Stages), StageTuple>::
252  type::value_type& elem) {
253  std::get<sizeof...(Stages)>(stages_).blockingRead(elem);
254  }
255 
260  bool read(typename std::tuple_element<sizeof...(Stages), StageTuple>::type::
261  value_type& elem) {
262  return std::get<sizeof...(Stages)>(stages_).read(elem);
263  }
264 
273  return ssize_t(
274  std::get<0>(stages_).writeCount() * kAmplification -
275  std::get<sizeof...(Stages)>(stages_).readCount());
276  }
277 
278  private:
280 };
281 
282 } // namespace folly
ssize_t sizeGuess() const noexcept
Definition: MPMCPipeline.h:272
void blockingWrite(Args &&...args)
Definition: MPMCPipeline.h:194
TicketBaseDebug(TicketBaseDebug &&other) noexcept
Definition: MPMCPipeline.h:110
MPMCPipeline()=default
Ticket(Ticket &&other) noexcept
Definition: MPMCPipeline.h:151
PskType type
static constexpr size_t kAmplification
Definition: MPMCPipeline.h:104
constexpr detail::Map< Move > move
Definition: Base-inl.h:2567
Ticket & operator=(Ticket &&other) noexcept
Definition: MPMCPipeline.h:155
STL namespace.
Ticket(MPMCPipeline *owner, size_t amplification, uint64_t value) noexcept
Definition: MPMCPipeline.h:167
TicketBaseNDebug(MPMCPipeline *, uint64_t value) noexcept
Definition: MPMCPipeline.h:127
internal::ArgsMatcher< InnerMatcher > Args(const InnerMatcher &matcher)
—— Concurrent Priority Queue Implementation ——
Definition: AtomicBitSet.h:29
requires E e noexcept(noexcept(s.error(std::move(e))))
std::tuple< detail::MPMCPipelineStageImpl< In >, detail::MPMCPipelineStageImpl< typename detail::PipelineStageInfo< Stages >::value_type >... > StageTuple
Definition: MPMCPipeline.h:103
void blockingRead(typename std::tuple_element< sizeof...(Stages), StageTuple >::type::value_type &elem)
Definition: MPMCPipeline.h:251
void check_owner(MPMCPipeline *) const
Definition: MPMCPipeline.h:129
static constexpr StringPiece ticket
const int sizes[]
bool readStage(Ticket< Stage > &ticket, typename std::tuple_element< Stage, StageTuple >::type::value_type &elem)
Definition: MPMCPipeline.h:224
std::conditional_t< kIsDebug, TicketBaseDebug, TicketBaseNDebug > TicketBase
Definition: MPMCPipeline.h:135
bool write(Args &&...args)
Definition: MPMCPipeline.h:203
MPMCPipeline(Sizes...sizes)
Definition: MPMCPipeline.h:188
T exchange(T &obj, U &&new_value)
Definition: Utility.h:120
bool read(typename std::tuple_element< sizeof...(Stages), StageTuple >::type::value_type &elem)
Definition: MPMCPipeline.h:260
const
Definition: upload.py:398
uint64_t value(const typename LockFreeRingBuffer< T, Atom >::Cursor &rbcursor)
Ticket< Stage > blockingReadStage(typename std::tuple_element< Stage, StageTuple >::type::value_type &elem)
Definition: MPMCPipeline.h:211
TicketBaseDebug(MPMCPipeline *owner, uint64_t value) noexcept
Definition: MPMCPipeline.h:113
std::tuple< detail::PipelineStageInfo< Stages >... > StageInfos
Definition: MPMCPipeline.h:98
uint64_t use(MPMCPipeline *owner)
Definition: MPMCPipeline.h:171
void blockingWriteStage(Ticket< Stage > &ticket, Args &&...args)
Definition: MPMCPipeline.h:243
PUSHMI_INLINE_VAR constexpr detail::get_fn< T > get
Definition: submit.h:391
void check_owner(MPMCPipeline *owner) const
Definition: MPMCPipeline.h:115