proxygen
folly::MPMCQueue< T, Atom, true > Class Template Reference

#include <MPMCQueue.h>

Inheritance diagram for folly::MPMCQueue< T, Atom, true >:
folly::detail::MPMCQueueBase< MPMCQueue< T, Atom, true > >

Classes

struct  ClosedArray
 

Public Member Functions

 MPMCQueue (size_t queueCapacity)
 
 MPMCQueue (size_t queueCapacity, size_t minCapacity, size_t expansionMultiplier)
 
 MPMCQueue () noexcept
 
 MPMCQueue (MPMCQueue< T, Atom, true > &&rhs) noexcept
 
MPMCQueue< T, Atom, true > const & operator= (MPMCQueue< T, Atom, true > &&rhs)
 
 ~MPMCQueue ()
 
size_t allocatedCapacity () const noexcept
 
template<typename... Args>
void blockingWrite (Args &&...args) noexcept
 
void blockingReadWithTicket (uint64_t &ticket, T &elem) noexcept
 

Private Types

enum  { kSeqlockBits = 6, kDefaultMinDynamicCapacity = 10, kDefaultExpansionMultiplier = 10 }
 
using Slot = detail::SingleElementQueue< T, Atom >
 

Private Member Functions

void initQueue (const size_t cap, const size_t mult)
 
bool tryObtainReadyPushTicket (uint64_t &ticket, Slot *&slots, size_t &cap, int &stride) noexcept
 
bool tryObtainPromisedPushTicket (uint64_t &ticket, Slot *&slots, size_t &cap, int &stride) noexcept
 
bool tryObtainReadyPopTicket (uint64_t &ticket, Slot *&slots, size_t &cap, int &stride) noexcept
 
bool tryObtainPromisedPopTicket (uint64_t &ticket, Slot *&slots, size_t &cap, int &stride) noexcept
 
template<typename... Args>
void enqueueWithTicket (const uint64_t ticket, Args &&...args) noexcept
 Enqueues an element with a specific ticket number.
 
uint64_t getOffset (const uint64_t state) const noexcept
 
int getNumClosed (const uint64_t state) const noexcept
 
bool tryExpand (const uint64_t state, const size_t cap) noexcept
 
bool trySeqlockReadSection (uint64_t &state, Slot *&slots, size_t &cap, int &stride) noexcept
 Seqlock read-only section.
 
bool maybeUpdateFromClosed (const uint64_t state, const uint64_t ticket, uint64_t &offset, Slot *&slots, size_t &cap, int &stride) noexcept
 

Private Attributes

size_t dmult_
 
ClosedArray * closed_
 

Friends

class detail::MPMCQueueBase< MPMCQueue< T, Atom, true > >
 

Detailed Description

template<typename T, template< typename > class Atom>
class folly::MPMCQueue< T, Atom, true >

The dynamic version of MPMCQueue allows dynamic expansion of queue capacity, such that a queue may start with a smaller capacity than specified and expand only if needed. Users may optionally specify the initial capacity and the expansion multiplier.
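As a rough illustration of the growth behaviour described above, the following sketch lists the capacities a dynamic queue would step through. The helper name capacitySchedule is hypothetical and not part of folly; the clamping and the growth rule are modelled on the three-argument constructor and on tryExpand() as listed further down this page.

```cpp
#include <algorithm>
#include <cassert>
#include <cstddef>
#include <vector>

// Hypothetical helper (not a folly API): returns the sequence of allocated
// capacities, starting at min(minCapacity, queueCapacity) and multiplying by
// expansionMultiplier until queueCapacity is reached.
std::vector<std::size_t> capacitySchedule(
    std::size_t queueCapacity,
    std::size_t minCapacity,
    std::size_t expansionMultiplier) {
  assert(queueCapacity > 0);  // this sketch assumes a positive capacity
  // Same clamping as the three-argument constructor.
  minCapacity = std::max<std::size_t>(1, minCapacity);
  expansionMultiplier = std::max<std::size_t>(2, expansionMultiplier);
  std::size_t cap = std::min(minCapacity, queueCapacity);
  std::vector<std::size_t> schedule{cap};
  while (cap < queueCapacity) {
    // Same growth rule as tryExpand(): never exceed the specified capacity.
    cap = std::min(expansionMultiplier * cap, queueCapacity);
    schedule.push_back(cap);
  }
  return schedule;
}
```

With the defaults shown in the private enum (minimum capacity 10, multiplier 10), capacitySchedule(1000, 10, 10) yields 10, 100, 1000. The number of expansions is the schedule length minus one, which is the same count initQueue() uses to size the array of closed-array records.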

The design uses a seqlock to enforce mutual exclusion among expansion attempts. Regular operations read up-to-date queue information (slots array, capacity, stride) inside read-only seqlock sections, which are unimpeded when no expansion is in progress.
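The seqlock pattern described above can be sketched with standard atomics only. The struct QueueInfo and the functions tryReadSection and tryLock are illustrative names chosen for this sketch; the real class keeps the corresponding fields in its base class and implements the logic in trySeqlockReadSection() and tryExpand().

```cpp
#include <atomic>
#include <cassert>
#include <cstddef>
#include <cstdint>

// Simplified stand-in for the metadata guarded by the seqlock
// (the slots pointer is omitted to keep the sketch short).
struct QueueInfo {
  std::atomic<std::uint64_t> state{0};  // odd value => expansion in progress
  std::atomic<std::size_t> capacity{0};
  std::atomic<int> stride{0};
};

// Reader: succeeds only if the lock was free before the reads and the
// state word is unchanged after them.
bool tryReadSection(const QueueInfo& q, std::uint64_t& state,
                    std::size_t& cap, int& stride) {
  state = q.state.load(std::memory_order_acquire);
  if (state & 1) {
    return false;  // locked by an expansion
  }
  cap = q.capacity.load(std::memory_order_relaxed);
  stride = q.stride.load(std::memory_order_relaxed);
  // Order the reads above before the validating load below.
  std::atomic_thread_fence(std::memory_order_acquire);
  return state == q.state.load(std::memory_order_relaxed);
}

// Writer: takes the lock by moving the observed even value to the next odd
// value; fails if any other thread changed the state in the meantime.
bool tryLock(QueueInfo& q, std::uint64_t observed) {
  assert((observed & 1) == 0);
  return q.state.compare_exchange_strong(observed, observed + 1);
}
```

Readers never write to shared state, so when no expansion is in progress they only pay for a few loads and one fence. A reader that gets false simply retries.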

An expansion computes a new capacity, allocates a new slots array, and updates stride. No information needs to be copied from the current slots array to the new one. When this happens, new slots will not have sequence numbers that match ticket numbers. The expansion needs to compute a ticket offset such that operations that use new arrays can adjust the calculations of slot indexes and sequence numbers that take into account that the new slots start with sequence numbers of zero. The current ticket offset is packed with the seqlock in an atomic 64-bit integer. The initial offset is zero.
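The packing of the ticket offset and the seqlock into one 64-bit word can be sketched as follows. The constant kSeqlockBits = 6 comes from the private enum of this class, and getNumClosed mirrors the listing further down. The body of getOffset is not shown on this page, so the shift used here is an inference from how tryExpand() builds the new state; packState and isLocked are hypothetical helper names.

```cpp
#include <cassert>
#include <cstdint>

const int kSeqlockBits = 6;  // value taken from the class's private enum

// Upper bits: ticket offset of the current slots array (inferred layout).
inline std::uint64_t getOffset(std::uint64_t state) {
  return state >> kSeqlockBits;
}

// Lower kSeqlockBits bits hold the seqlock; half its value (when unlocked)
// is the number of closed arrays.
inline int getNumClosed(std::uint64_t state) {
  return static_cast<int>((state & ((1u << kSeqlockBits) - 1)) >> 1);
}

// The lowest bit is set while an expansion holds the seqlock.
inline bool isLocked(std::uint64_t state) {
  return (state & 1) != 0;
}

// Hypothetical helper: the unlocked state published after an expansion that
// leaves numClosed closed arrays and starts the new array at offset.
inline std::uint64_t packState(std::uint64_t offset, int numClosed) {
  assert(numClosed >= 0 && 2 * numClosed < (1 << kSeqlockBits));
  return (offset << kSeqlockBits) + 2 * static_cast<std::uint64_t>(numClosed);
}
```

For example, an offset of 42 with 3 closed arrays packs to 42 * 64 + 6 = 2694. Because the initial state is zero, the initial offset and the initial number of closed arrays are both zero.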

Lagging write and read operations with tickets lower than the ticket offset of the current slots array (i.e., the minimum ticket number that can be served by the current array) must use earlier closed arrays instead of the current one. Information about closed slots arrays (array address, capacity, stride, and offset) is maintained in a logarithmic-sized structure. Each entry in that structure never needs to be changed once set. The number of closed arrays is half the value of the seqlock (when unlocked).
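The lookup performed by a lagging operation can be sketched as a backwards scan over the closed-array records. ClosedInfo and findClosedArray are illustrative names for this sketch (the slots pointer is left out); the scan order follows maybeUpdateFromClosed() as listed further down.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <vector>

// Illustrative record for one closed slots array.
struct ClosedInfo {
  std::uint64_t offset;  // first ticket served by this array
  std::size_t capacity;
  int stride;
};

// Returns the index of the closed array that serves `ticket`, or -1 if the
// current array (whose first ticket is currentOffset) serves it. `closed`
// is ordered oldest first, so offsets grow with the index.
int findClosedArray(const std::vector<ClosedInfo>& closed,
                    std::uint64_t currentOffset, std::uint64_t ticket) {
  if (ticket >= currentOffset) {
    return -1;  // not lagging: use the current array
  }
  // Scan from the most recently closed array towards the oldest and take
  // the first one whose offset does not exceed the ticket.
  for (int i = static_cast<int>(closed.size()) - 1; i >= 0; --i) {
    if (closed[i].offset <= ticket) {
      return i;
    }
  }
  assert(false && "a closed array with offset <= ticket should exist");
  return -1;
}
```

Once the right array is found, the operation works with ticket minus that array's offset, because every new slots array starts with sequence numbers of zero.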

The acquisition of the seqlock to perform an expansion does not prevent the issuing of new push and pop tickets concurrently. The expansion must set the new ticket offset to a value that couldn't have been issued to an operation that has already gone through a seqlock read-only section (and hence obtained information for older closed arrays).
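A minimal sketch of the offset choice, assuming the rule used in the tryExpand() listing on this page (one past the larger of the two ticket counters). The function names chooseNewOffset and adjustedTicket are hypothetical and exist only for this illustration.

```cpp
#include <algorithm>
#include <atomic>
#include <cassert>
#include <cstdint>

// New ticket offset for an expansion: strictly greater than both counters,
// so tickets that in-flight operations may already hold stay below it.
std::uint64_t chooseNewOffset(const std::atomic<std::uint64_t>& pushTicket,
                              const std::atomic<std::uint64_t>& popTicket) {
  return 1 + std::max(pushTicket.load(), popTicket.load());
}

// A ticket at or past the offset is served by the new array; its slot index
// and sequence number are derived from the adjusted value.
std::uint64_t adjustedTicket(std::uint64_t ticket, std::uint64_t offset) {
  assert(ticket >= offset);
  return ticket - offset;
}
```

Tickets below the chosen offset keep using the arrays that were current when they were issued, which is why operations that already passed a read-only section are unaffected by the expansion.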

Note that the total queue capacity can temporarily exceed the specified capacity when there are lagging consumers that haven't yet consumed all the elements in closed arrays. Users should not rely on the capacity of dynamic queues for synchronization, e.g., they should not expect that a thread will definitely block on a call to blockingWrite() when the queue size is known to be equal to its capacity.

Note that some writeIfNotFull() and tryWriteUntil() operations may fail even if the size of the queue is less than its maximum capacity and despite the success of expansion, if the operation happens to acquire a ticket that belongs to a closed array. This is a transient condition. Typically, one or two ticket values may be subject to such a condition per expansion.

The dynamic version is a partial specialization of MPMCQueue with Dynamic == true.

Definition at line 176 of file MPMCQueue.h.

Member Typedef Documentation

template<typename T , template< typename > class Atom>
using folly::MPMCQueue< T, Atom, true >::Slot = detail::SingleElementQueue<T, Atom>
private

Definition at line 179 of file MPMCQueue.h.

Member Enumeration Documentation

template<typename T , template< typename > class Atom>
anonymous enum
private
Enumerator
kSeqlockBits 
kDefaultMinDynamicCapacity 
kDefaultExpansionMultiplier 

Definition at line 337 of file MPMCQueue.h.

Constructor & Destructor Documentation

template<typename T , template< typename > class Atom>
folly::MPMCQueue< T, Atom, true >::MPMCQueue ( size_t  queueCapacity)
inline explicit

Definition at line 189 of file MPMCQueue.h.

190  : detail::MPMCQueueBase<MPMCQueue<T, Atom, true>>(queueCapacity) {
191  size_t cap = std::min<size_t>(kDefaultMinDynamicCapacity, queueCapacity);
192  initQueue(cap, kDefaultExpansionMultiplier);
193  }
template<typename T , template< typename > class Atom>
folly::MPMCQueue< T, Atom, true >::MPMCQueue ( size_t  queueCapacity,
size_t  minCapacity,
size_t  expansionMultiplier 
)
inline explicit

Definition at line 195 of file MPMCQueue.h.

199  : detail::MPMCQueueBase<MPMCQueue<T, Atom, true>>(queueCapacity) {
200  minCapacity = std::max<size_t>(1, minCapacity);
201  size_t cap = std::min<size_t>(minCapacity, queueCapacity);
202  expansionMultiplier = std::max<size_t>(2, expansionMultiplier);
203  initQueue(cap, expansionMultiplier);
204  }
template<typename T , template< typename > class Atom>
folly::MPMCQueue< T, Atom, true >::MPMCQueue ( )
inline noexcept

Definition at line 206 of file MPMCQueue.h.

206  {
207  dmult_ = 0;
208  closed_ = nullptr;
209  }
template<typename T , template< typename > class Atom>
folly::MPMCQueue< T, Atom, true >::MPMCQueue ( MPMCQueue< T, Atom, true > &&  rhs)
inline noexcept

Definition at line 211 of file MPMCQueue.h.

References folly::detail::rhs.

211  {
212  this->capacity_ = rhs.capacity_;
213  new (&this->dslots_)
214  Atom<Slot*>(rhs.dslots_.load(std::memory_order_relaxed));
215  new (&this->dstride_)
216  Atom<int>(rhs.dstride_.load(std::memory_order_relaxed));
217  this->dstate_.store(
218  rhs.dstate_.load(std::memory_order_relaxed), std::memory_order_relaxed);
219  this->dcapacity_.store(
220  rhs.dcapacity_.load(std::memory_order_relaxed),
221  std::memory_order_relaxed);
222  this->pushTicket_.store(
223  rhs.pushTicket_.load(std::memory_order_relaxed),
224  std::memory_order_relaxed);
225  this->popTicket_.store(
226  rhs.popTicket_.load(std::memory_order_relaxed),
227  std::memory_order_relaxed);
228  this->pushSpinCutoff_.store(
229  rhs.pushSpinCutoff_.load(std::memory_order_relaxed),
230  std::memory_order_relaxed);
231  this->popSpinCutoff_.store(
232  rhs.popSpinCutoff_.load(std::memory_order_relaxed),
233  std::memory_order_relaxed);
234  dmult_ = rhs.dmult_;
235  closed_ = rhs.closed_;
236 
237  rhs.capacity_ = 0;
238  rhs.dslots_.store(nullptr, std::memory_order_relaxed);
239  rhs.dstride_.store(0, std::memory_order_relaxed);
240  rhs.dstate_.store(0, std::memory_order_relaxed);
241  rhs.dcapacity_.store(0, std::memory_order_relaxed);
242  rhs.pushTicket_.store(0, std::memory_order_relaxed);
243  rhs.popTicket_.store(0, std::memory_order_relaxed);
244  rhs.pushSpinCutoff_.store(0, std::memory_order_relaxed);
245  rhs.popSpinCutoff_.store(0, std::memory_order_relaxed);
246  rhs.dmult_ = 0;
247  rhs.closed_ = nullptr;
248  }
template<typename T , template< typename > class Atom>
folly::MPMCQueue< T, Atom, true >::~MPMCQueue ( )
inline

Definition at line 258 of file MPMCQueue.h.

References i.

258  {
259  if (closed_ != nullptr) {
260  for (int i = getNumClosed(this->dstate_.load()) - 1; i >= 0; --i) {
261  delete[] closed_[i].slots_;
262  }
263  delete[] closed_;
264  }
265  using AtomInt = Atom<int>;
266  this->dstride_.~AtomInt();
267  using AtomSlot = Atom<Slot*>;
268  // Sort of a hack to get ~MPMCQueueBase to free dslots_
269  auto slots = this->dslots_.load();
270  this->dslots_.~AtomSlot();
271  this->slots_ = slots;
272  }

Member Function Documentation

template<typename T , template< typename > class Atom>
size_t folly::MPMCQueue< T, Atom, true >::allocatedCapacity ( ) const
inline noexcept

Definition at line 274 of file MPMCQueue.h.

References testing::Args().

274  {
275  return this->dcapacity_.load(std::memory_order_relaxed);
276  }
template<typename T , template< typename > class Atom>
void folly::MPMCQueue< T, Atom, true >::blockingReadWithTicket ( uint64_t &  ticket,
T &  elem 
)
inline noexcept

Definition at line 320 of file MPMCQueue.h.

References folly::asm_volatile_pause(), folly::gen::stride(), ticket, and uint64_t.

320  {
321  ticket = this->popTicket_++;
322  Slot* slots;
323  size_t cap;
324  int stride;
325  uint64_t state;
326  uint64_t offset;
327  while (!trySeqlockReadSection(state, slots, cap, stride)) {
328  asm_volatile_pause();
329  }
330  // If there was an expansion after the corresponding push ticket
331  // was issued, adjust accordingly
332  maybeUpdateFromClosed(state, ticket, offset, slots, cap, stride);
333  this->dequeueWithTicketBase(ticket - offset, slots, cap, stride, elem);
334  }
template<typename T , template< typename > class Atom>
template<typename... Args>
void folly::MPMCQueue< T, Atom, true >::blockingWrite ( Args &&...  args)
inline noexcept

Definition at line 279 of file MPMCQueue.h.

References folly::asm_volatile_pause(), folly::gen::stride(), ticket, and uint64_t.

279  {
280  uint64_t ticket = this->pushTicket_++;
281  Slot* slots;
282  size_t cap;
283  int stride;
284  uint64_t state;
285  uint64_t offset;
286  do {
287  if (!trySeqlockReadSection(state, slots, cap, stride)) {
288  asm_volatile_pause();
289  continue;
290  }
291  if (maybeUpdateFromClosed(state, ticket, offset, slots, cap, stride)) {
292  // There was an expansion after this ticket was issued.
293  break;
294  }
295  if (slots[this->idx((ticket - offset), cap, stride)].mayEnqueue(
296  this->turn(ticket - offset, cap))) {
297  // A slot is ready. No need to expand.
298  break;
299  } else if (
300  this->popTicket_.load(std::memory_order_relaxed) + cap > ticket) {
301  // May block, but a pop is in progress. No need to expand.
302  // Get seqlock read section info again in case an expansion
303  // occurred with an equal or higher ticket.
304  continue;
305  } else {
306  // May block. See if we can expand.
307  if (tryExpand(state, cap)) {
308  // This or another thread started an expansion. Get updated info.
309  continue;
310  } else {
311  // Can't expand.
312  break;
313  }
314  }
315  } while (true);
316  this->enqueueWithTicketBase(
317  ticket - offset, slots, cap, stride, std::forward<Args>(args)...);
318  }
template<typename T , template< typename > class Atom>
template<typename... Args>
void folly::MPMCQueue< T, Atom, true >::enqueueWithTicket ( const uint64_t  ticket,
Args &&...  args 
)
inline private noexcept

Enqueues an element with a specific ticket number.

Definition at line 511 of file MPMCQueue.h.

References folly::gen::stride(), and uint64_t.

511  {
512  Slot* slots;
513  size_t cap;
514  int stride;
515  uint64_t state;
516  uint64_t offset;
517 
518  while (!trySeqlockReadSection(state, slots, cap, stride)) {
519  }
520 
521  // If there was an expansion after this ticket was issued, adjust
522  // accordingly
523  maybeUpdateFromClosed(state, ticket, offset, slots, cap, stride);
524 
525  this->enqueueWithTicketBase(
526  ticket - offset, slots, cap, stride, std::forward<Args>(args)...);
527  }
template<typename T , template< typename > class Atom>
int folly::MPMCQueue< T, Atom, true >::getNumClosed ( const uint64_t  state) const
inline private noexcept

Definition at line 533 of file MPMCQueue.h.

533  {
534  return (state & ((1 << kSeqlockBits) - 1)) >> 1;
535  }
template<typename T , template< typename > class Atom>
uint64_t folly::MPMCQueue< T, Atom, true >::getOffset ( const uint64_t  state) const
inline private noexcept

Definition at line 529 of file MPMCQueue.h.

template<typename T , template< typename > class Atom>
void folly::MPMCQueue< T, Atom, true >::initQueue ( const size_t  cap,
const size_t  mult 
)
inline private

Definition at line 348 of file MPMCQueue.h.

348  {
349  new (&this->dstride_) Atom<int>(this->computeStride(cap));
350  Slot* slots = new Slot[cap + 2 * this->kSlotPadding];
351  new (&this->dslots_) Atom<Slot*>(slots);
352  this->dstate_.store(0);
353  this->dcapacity_.store(cap);
354  dmult_ = mult;
355  size_t maxClosed = 0;
356  for (size_t expanded = cap; expanded < this->capacity_; expanded *= mult) {
357  ++maxClosed;
358  }
359  closed_ = (maxClosed > 0) ? new ClosedArray[maxClosed] : nullptr;
360  }
template<typename T , template< typename > class Atom>
bool folly::MPMCQueue< T, Atom, true >::maybeUpdateFromClosed ( const uint64_t  state,
const uint64_t  ticket,
uint64_t &  offset,
Slot *&  slots,
size_t &  cap,
int &  stride 
)
inline private noexcept

If there was an expansion after ticket was issued, update local variables of the lagging operation using the most recent closed array with offset <= ticket and return true. Otherwise, return false.

Definition at line 608 of file MPMCQueue.h.

References Atom, i, folly::gen::stride(), and ticket.

614  {
615  offset = getOffset(state);
616  if (ticket >= offset) {
617  return false;
618  }
619  for (int i = getNumClosed(state) - 1; i >= 0; --i) {
620  offset = closed_[i].offset_;
621  if (offset <= ticket) {
622  slots = closed_[i].slots_;
623  cap = closed_[i].capacity_;
624  stride = closed_[i].stride_;
625  return true;
626  }
627  }
628  // A closed array with offset <= ticket should have been found
629  assert(false);
630  return false;
631  }
template<typename T , template< typename > class Atom>
MPMCQueue<T, Atom, true> const& folly::MPMCQueue< T, Atom, true >::operator= ( MPMCQueue< T, Atom, true > &&  rhs)
inline

Definition at line 250 of file MPMCQueue.h.

References folly::gen::move, and folly::detail::rhs.

250  {
251  if (this != &rhs) {
252  this->~MPMCQueue();
253  new (this) MPMCQueue(std::move(rhs));
254  }
255  return *this;
256  }
template<typename T , template< typename > class Atom>
bool folly::MPMCQueue< T, Atom, true >::tryExpand ( const uint64_t  state,
const size_t  cap 
)
inline private noexcept

Try to expand the queue. Returns true if this expansion was successful or a concurrent expansion is in progress. Returns false if the queue has reached its maximum capacity or allocation has failed.

Definition at line 541 of file MPMCQueue.h.

References max, min, ticket, and uint64_t.

541  {
542  if (cap == this->capacity_) {
543  return false;
544  }
545  // Acquire seqlock
546  uint64_t oldval = state;
547  assert((state & 1) == 0);
548  if (this->dstate_.compare_exchange_strong(oldval, state + 1)) {
549  assert(cap == this->dcapacity_.load());
550  uint64_t ticket =
551  1 + std::max(this->pushTicket_.load(), this->popTicket_.load());
552  size_t newCapacity = std::min(dmult_ * cap, this->capacity_);
553  Slot* newSlots =
554  new (std::nothrow) Slot[newCapacity + 2 * this->kSlotPadding];
555  if (newSlots == nullptr) {
556  // Expansion failed. Restore the seqlock
557  this->dstate_.store(state);
558  return false;
559  }
560  // Successful expansion
561  // calculate the current ticket offset
562  uint64_t offset = getOffset(state);
563  // calculate index in closed array
564  int index = getNumClosed(state);
565  assert((index << 1) < (1 << kSeqlockBits));
566  // fill the info for the closed slots array
567  closed_[index].offset_ = offset;
568  closed_[index].slots_ = this->dslots_.load();
569  closed_[index].capacity_ = cap;
570  closed_[index].stride_ = this->dstride_.load();
571  // update the new slots array info
572  this->dslots_.store(newSlots);
573  this->dcapacity_.store(newCapacity);
574  this->dstride_.store(this->computeStride(newCapacity));
575  // Release the seqlock and record the new ticket offset
576  this->dstate_.store((ticket << kSeqlockBits) + (2 * (index + 1)));
577  return true;
578  } else { // failed to acquire seqlock
579  // Someone acquired the seqlock. Go back to the caller and get
580  // up-to-date info.
581  return true;
582  }
583  }
template<typename T , template< typename > class Atom>
bool folly::MPMCQueue< T, Atom, true >::tryObtainPromisedPopTicket ( uint64_t &  ticket,
Slot *&  slots,
size_t &  cap,
int &  stride 
)
inline private noexcept

Definition at line 479 of file MPMCQueue.h.

References testing::Args(), folly::asm_volatile_pause(), folly::gen::stride(), ticket, and uint64_t.

483  {
484  uint64_t state;
485  do {
486  ticket = this->popTicket_.load(std::memory_order_acquire);
487  auto numPushes = this->pushTicket_.load(std::memory_order_acquire);
488  if (!trySeqlockReadSection(state, slots, cap, stride)) {
489  asm_volatile_pause();
490  continue;
491  }
492 
493  uint64_t offset;
494  // If there was an expansion after the corresponding push
495  // ticket was issued, adjust accordingly
496  maybeUpdateFromClosed(state, ticket, offset, slots, cap, stride);
497 
498  if (ticket >= numPushes) {
499  ticket -= offset;
500  return false;
501  }
502  if (this->popTicket_.compare_exchange_strong(ticket, ticket + 1)) {
503  ticket -= offset;
504  return true;
505  }
506  } while (true);
507  }
template<typename T , template< typename > class Atom>
bool folly::MPMCQueue< T, Atom, true >::tryObtainPromisedPushTicket ( uint64_t &  ticket,
Slot *&  slots,
size_t &  cap,
int &  stride 
)
inline private noexcept

Definition at line 408 of file MPMCQueue.h.

References folly::asm_volatile_pause(), int64_t, folly::gen::stride(), ticket, and uint64_t.

412  {
413  uint64_t state;
414  do {
415  ticket = this->pushTicket_.load(std::memory_order_acquire);
416  auto numPops = this->popTicket_.load(std::memory_order_acquire);
417  if (!trySeqlockReadSection(state, slots, cap, stride)) {
418  asm_volatile_pause();
419  continue;
420  }
421 
422  const auto curCap = cap;
423  // If there was an expansion with offset greater than this ticket,
424  // adjust accordingly
425  uint64_t offset;
426  maybeUpdateFromClosed(state, ticket, offset, slots, cap, stride);
427 
428  int64_t n = ticket - numPops;
429 
430  if (n >= static_cast<ssize_t>(cap)) {
431  if ((cap == curCap) && tryExpand(state, cap)) {
432  // This or another thread started an expansion. Start over.
433  continue;
434  }
435  // Can't expand.
436  ticket -= offset;
437  return false;
438  }
439 
440  if (this->pushTicket_.compare_exchange_strong(ticket, ticket + 1)) {
441  // Adjust ticket
442  ticket -= offset;
443  return true;
444  }
445  } while (true);
446  }
template<typename T , template< typename > class Atom>
bool folly::MPMCQueue< T, Atom, true >::tryObtainReadyPopTicket ( uint64_t &  ticket,
Slot *&  slots,
size_t &  cap,
int &  stride 
)
inline private noexcept

Definition at line 448 of file MPMCQueue.h.

References folly::asm_volatile_pause(), folly::gen::stride(), ticket, and uint64_t.

452  {
453  uint64_t state;
454  do {
455  ticket = this->popTicket_.load(std::memory_order_relaxed);
456  if (!trySeqlockReadSection(state, slots, cap, stride)) {
457  asm_volatile_pause();
458  continue;
459  }
460 
461  // If there was an expansion after the corresponding push ticket
462  // was issued, adjust accordingly
463  uint64_t offset;
464  maybeUpdateFromClosed(state, ticket, offset, slots, cap, stride);
465 
466  if (slots[this->idx((ticket - offset), cap, stride)].mayDequeue(
467  this->turn(ticket - offset, cap))) {
468  if (this->popTicket_.compare_exchange_strong(ticket, ticket + 1)) {
469  // Adjust ticket
470  ticket -= offset;
471  return true;
472  }
473  } else {
474  return false;
475  }
476  } while (true);
477  }
template<typename T , template< typename > class Atom>
bool folly::MPMCQueue< T, Atom, true >::tryObtainReadyPushTicket ( uint64_t &  ticket,
Slot *&  slots,
size_t &  cap,
int &  stride 
)
inline private noexcept

Definition at line 362 of file MPMCQueue.h.

References folly::asm_volatile_pause(), folly::gen::stride(), ticket, and uint64_t.

366  {
367  uint64_t state;
368  do {
369  ticket = this->pushTicket_.load(std::memory_order_acquire); // A
370  if (!trySeqlockReadSection(state, slots, cap, stride)) {
371  asm_volatile_pause();
372  continue;
373  }
374 
375  // If there was an expansion with offset greater than this ticket,
376  // adjust accordingly
377  uint64_t offset;
378  maybeUpdateFromClosed(state, ticket, offset, slots, cap, stride);
379 
380  if (slots[this->idx((ticket - offset), cap, stride)].mayEnqueue(
381  this->turn(ticket - offset, cap))) {
382  // A slot is ready.
383  if (this->pushTicket_.compare_exchange_strong(ticket, ticket + 1)) {
384  // Adjust ticket
385  ticket -= offset;
386  return true;
387  } else {
388  continue;
389  }
390  } else {
391  if (ticket != this->pushTicket_.load(std::memory_order_relaxed)) { // B
392  // Try again. Ticket changed.
393  continue;
394  }
395  // Likely to block.
396  // Try to expand unless the ticket is for a closed array
397  if (offset == getOffset(state)) {
398  if (tryExpand(state, cap)) {
399  // This or another thread started an expansion. Get up-to-date info.
400  continue;
401  }
402  }
403  return false;
404  }
405  } while (true);
406  }
template<typename T , template< typename > class Atom>
bool folly::MPMCQueue< T, Atom, true >::trySeqlockReadSection ( uint64_t &  state,
Slot *&  slots,
size_t &  cap,
int &  stride 
)
inline private noexcept

Seqlock read-only section.

Definition at line 586 of file MPMCQueue.h.

References folly::gen::stride().

590  {
591  state = this->dstate_.load(std::memory_order_acquire);
592  if (state & 1) {
593  // Locked.
594  return false;
595  }
596  // Start read-only section.
597  slots = this->dslots_.load(std::memory_order_relaxed);
598  cap = this->dcapacity_.load(std::memory_order_relaxed);
599  stride = this->dstride_.load(std::memory_order_relaxed);
600  // End of read-only section. Validate seqlock.
601  std::atomic_thread_fence(std::memory_order_acquire);
602  return (state == this->dstate_.load(std::memory_order_relaxed));
603  }

Friends And Related Function Documentation

template<typename T , template< typename > class Atom>
friend class detail::MPMCQueueBase< MPMCQueue< T, Atom, true > >
friend

Definition at line 178 of file MPMCQueue.h.

Member Data Documentation

template<typename T , template< typename > class Atom>
ClosedArray* folly::MPMCQueue< T, Atom, true >::closed_
private

Definition at line 346 of file MPMCQueue.h.

template<typename T , template< typename > class Atom>
size_t folly::MPMCQueue< T, Atom, true >::dmult_
private

Definition at line 343 of file MPMCQueue.h.


The documentation for this class was generated from the following file: