proxygen
folly::detail::MPMCQueueBase< Derived< T, Atom, Dynamic > > Class Template Reference

CRTP specialization of MPMCQueueBase. More...

#include <MPMCQueue.h>

Inheritance diagram for folly::detail::MPMCQueueBase< Derived< T, Atom, Dynamic > >:

Public Types

typedef T value_type
 
using Slot = detail::SingleElementQueue< T, Atom >
 

Public Member Functions

 MPMCQueueBase (size_t queueCapacity)
 
 MPMCQueueBase () noexcept
 
 MPMCQueueBase (MPMCQueueBase< Derived< T, Atom, Dynamic >> &&rhs) noexcept
 
MPMCQueueBase< Derived< T, Atom, Dynamic > > const & operator= (MPMCQueueBase< Derived< T, Atom, Dynamic >> &&rhs)
 
 ~MPMCQueueBase ()
 
ssize_t size () const noexcept
 
bool isEmpty () const noexcept
 Returns true if there are no items available for dequeue. More...
 
bool isFull () const noexcept
 Returns true if there is currently no empty space to enqueue. More...
 
ssize_t sizeGuess () const noexcept
 
size_t capacity () const noexcept
 Doesn't change. More...
 
size_t allocatedCapacity () const noexcept
 Doesn't change for non-dynamic. More...
 
uint64_t writeCount () const noexcept
 
uint64_t readCount () const noexcept
 
template<typename... Args>
void blockingWrite (Args &&...args) noexcept
 
template<typename... Args>
bool write (Args &&...args) noexcept
 
template<class Clock , typename... Args>
bool tryWriteUntil (const std::chrono::time_point< Clock > &when, Args &&...args) noexcept
 
template<typename... Args>
bool writeIfNotFull (Args &&...args) noexcept
 
void blockingRead (T &elem) noexcept
 
void blockingReadWithTicket (uint64_t &ticket, T &elem) noexcept
 Same as blockingRead() but also records the ticket nunmer. More...
 
bool read (T &elem) noexcept
 
bool readAndGetTicket (uint64_t &ticket, T &elem) noexcept
 Same as read() but also records the ticket nunmer. More...
 
template<class Clock , typename... Args>
bool tryReadUntil (const std::chrono::time_point< Clock > &when, T &elem) noexcept
 
bool readIfNotEmpty (T &elem) noexcept
 

Protected Types

enum  { kAdaptationFreq = 128, kSlotPadding }
 

Protected Member Functions

size_t idx (uint64_t ticket, size_t cap, int stride) noexcept
 
uint32_t turn (uint64_t ticket, size_t cap) noexcept
 
bool tryObtainReadyPushTicket (uint64_t &ticket, Slot *&slots, size_t &cap, int &stride) noexcept
 
template<class Clock >
bool tryObtainPromisedPushTicketUntil (uint64_t &ticket, Slot *&slots, size_t &cap, int &stride, const std::chrono::time_point< Clock > &when) noexcept
 
bool tryObtainPromisedPushTicket (uint64_t &ticket, Slot *&slots, size_t &cap, int &stride) noexcept
 
bool tryObtainReadyPopTicket (uint64_t &ticket, Slot *&slots, size_t &cap, int &stride) noexcept
 
template<class Clock >
bool tryObtainPromisedPopTicketUntil (uint64_t &ticket, Slot *&slots, size_t &cap, int &stride, const std::chrono::time_point< Clock > &when) noexcept
 
bool tryObtainPromisedPopTicket (uint64_t &ticket, Slot *&slots, size_t &cap, int &stride) noexcept
 
template<typename... Args>
void enqueueWithTicketBase (uint64_t ticket, Slot *slots, size_t cap, int stride, Args &&...args) noexcept
 
template<typename... Args>
void enqueueWithTicket (uint64_t ticket, Args &&...args) noexcept
 
void dequeueWithTicketBase (uint64_t ticket, Slot *slots, size_t cap, int stride, T &elem) noexcept
 

Static Protected Member Functions

static int computeStride (size_t capacity) noexcept
 

Protected Attributes

size_t capacity_
 The maximum number of items in the queue at once. More...
 
union folly::detail::MPMCQueueBase< Derived< T, Atom, Dynamic > >:: { ... }  
 Anonymous union for use when Dynamic = false and true, respectively. More...
 
union folly::detail::MPMCQueueBase< Derived< T, Atom, Dynamic > >:: { ... }  
 Anonymous union for use when Dynamic = false and true, respectively. More...
 
Atom< uint64_tdstate_
 
Atom< size_t > dcapacity_
 Dynamic capacity. More...
 
Atom< uint64_tpushTicket_
 Enqueuers get tickets from here. More...
 
Atom< uint64_tpopTicket_
 Dequeuers get tickets from here. More...
 
Atom< uint32_tpushSpinCutoff_
 
Atom< uint32_tpopSpinCutoff_
 The adaptive spin cutoff when the queue is empty on dequeue. More...
 
char pad_ [hardware_destructive_interference_size-sizeof(Atom< uint32_t >)]
 

Detailed Description

template<template< typename T, template< typename > class Atom, bool Dynamic > class Derived, typename T, template< typename > class Atom, bool Dynamic>
class folly::detail::MPMCQueueBase< Derived< T, Atom, Dynamic > >

CRTP specialization of MPMCQueueBase.

Definition at line 643 of file MPMCQueue.h.

Member Typedef Documentation

template<template< typename T, template< typename > class Atom, bool Dynamic > class Derived, typename T , template< typename > class Atom, bool Dynamic>
using folly::detail::MPMCQueueBase< Derived< T, Atom, Dynamic > >::Slot = detail::SingleElementQueue<T, Atom>

Definition at line 657 of file MPMCQueue.h.

template<template< typename T, template< typename > class Atom, bool Dynamic > class Derived, typename T , template< typename > class Atom, bool Dynamic>
typedef T folly::detail::MPMCQueueBase< Derived< T, Atom, Dynamic > >::value_type

Definition at line 652 of file MPMCQueue.h.

Member Enumeration Documentation

template<template< typename T, template< typename > class Atom, bool Dynamic > class Derived, typename T , template< typename > class Atom, bool Dynamic>
anonymous enum
protected
Enumerator
kAdaptationFreq 

Once every kAdaptationFreq we will spin longer, to try to estimate the proper spin backoff

kSlotPadding 

To avoid false sharing in slots_ with neighboring memory allocations, we pad it with this many SingleElementQueue-s at each end

Definition at line 991 of file MPMCQueue.h.

Constructor & Destructor Documentation

template<template< typename T, template< typename > class Atom, bool Dynamic > class Derived, typename T , template< typename > class Atom, bool Dynamic>
folly::detail::MPMCQueueBase< Derived< T, Atom, Dynamic > >::MPMCQueueBase ( size_t  queueCapacity)
inlineexplicit

Definition at line 659 of file MPMCQueue.h.

References folly::hardware_destructive_interference_size.

660  : capacity_(queueCapacity),
661  dstate_(0),
662  dcapacity_(0),
663  pushTicket_(0),
664  popTicket_(0),
665  pushSpinCutoff_(0),
666  popSpinCutoff_(0) {
667  if (queueCapacity == 0) {
668  throw std::invalid_argument(
669  "MPMCQueue with explicit capacity 0 is impossible"
670  // Stride computation in derived classes would sigfpe if capacity is 0
671  );
672  }
673 
674  // ideally this would be a static assert, but g++ doesn't allow it
675  assert(
676  alignof(MPMCQueue<T, Atom>) >= hardware_destructive_interference_size);
677  assert(
678  static_cast<uint8_t*>(static_cast<void*>(&popTicket_)) -
679  static_cast<uint8_t*>(static_cast<void*>(&pushTicket_)) >=
680  static_cast<ptrdiff_t>(hardware_destructive_interference_size));
681  }
size_t capacity_
The maximum number of items in the queue at once.
Definition: MPMCQueue.h:1004
Atom< uint64_t > popTicket_
Dequeuers get tickets from here.
Definition: MPMCQueue.h:1040
constexpr std::size_t hardware_destructive_interference_size
Definition: Align.h:107
Atom< uint64_t > pushTicket_
Enqueuers get tickets from here.
Definition: MPMCQueue.h:1037
Atom< uint32_t > popSpinCutoff_
The adaptive spin cutoff when the queue is empty on dequeue.
Definition: MPMCQueue.h:1049
template<template< typename T, template< typename > class Atom, bool Dynamic > class Derived, typename T , template< typename > class Atom, bool Dynamic>
folly::detail::MPMCQueueBase< Derived< T, Atom, Dynamic > >::MPMCQueueBase ( )
inlinenoexcept

A default-constructed queue is useful because a usable (non-zero capacity) queue can be moved onto it or swapped with it

Definition at line 685 of file MPMCQueue.h.

686  : capacity_(0),
687  slots_(nullptr),
688  stride_(0),
689  dstate_(0),
690  dcapacity_(0),
691  pushTicket_(0),
692  popTicket_(0),
693  pushSpinCutoff_(0),
694  popSpinCutoff_(0) {}
size_t capacity_
The maximum number of items in the queue at once.
Definition: MPMCQueue.h:1004
Atom< uint64_t > popTicket_
Dequeuers get tickets from here.
Definition: MPMCQueue.h:1040
Atom< uint64_t > pushTicket_
Enqueuers get tickets from here.
Definition: MPMCQueue.h:1037
Atom< uint32_t > popSpinCutoff_
The adaptive spin cutoff when the queue is empty on dequeue.
Definition: MPMCQueue.h:1049
template<template< typename T, template< typename > class Atom, bool Dynamic > class Derived, typename T , template< typename > class Atom, bool Dynamic>
folly::detail::MPMCQueueBase< Derived< T, Atom, Dynamic > >::MPMCQueueBase ( MPMCQueueBase< Derived< T, Atom, Dynamic >> &&  rhs)
inlinenoexcept

IMPORTANT: The move constructor is here to make it easier to perform the initialization phase, it is not safe to use when there are any concurrent accesses (this is not checked).

Definition at line 699 of file MPMCQueue.h.

References folly::detail::rhs.

700  : capacity_(rhs.capacity_),
701  slots_(rhs.slots_),
702  stride_(rhs.stride_),
703  dstate_(rhs.dstate_.load(std::memory_order_relaxed)),
704  dcapacity_(rhs.dcapacity_.load(std::memory_order_relaxed)),
705  pushTicket_(rhs.pushTicket_.load(std::memory_order_relaxed)),
706  popTicket_(rhs.popTicket_.load(std::memory_order_relaxed)),
707  pushSpinCutoff_(rhs.pushSpinCutoff_.load(std::memory_order_relaxed)),
708  popSpinCutoff_(rhs.popSpinCutoff_.load(std::memory_order_relaxed)) {
709  // relaxed ops are okay for the previous reads, since rhs queue can't
710  // be in concurrent use
711 
712  // zero out rhs
713  rhs.capacity_ = 0;
714  rhs.slots_ = nullptr;
715  rhs.stride_ = 0;
716  rhs.dstate_.store(0, std::memory_order_relaxed);
717  rhs.dcapacity_.store(0, std::memory_order_relaxed);
718  rhs.pushTicket_.store(0, std::memory_order_relaxed);
719  rhs.popTicket_.store(0, std::memory_order_relaxed);
720  rhs.pushSpinCutoff_.store(0, std::memory_order_relaxed);
721  rhs.popSpinCutoff_.store(0, std::memory_order_relaxed);
722  }
size_t capacity_
The maximum number of items in the queue at once.
Definition: MPMCQueue.h:1004
Atom< uint64_t > popTicket_
Dequeuers get tickets from here.
Definition: MPMCQueue.h:1040
FOLLY_PUSH_WARNING RHS rhs
Definition: Traits.h:649
Atom< uint64_t > pushTicket_
Enqueuers get tickets from here.
Definition: MPMCQueue.h:1037
Atom< uint32_t > popSpinCutoff_
The adaptive spin cutoff when the queue is empty on dequeue.
Definition: MPMCQueue.h:1049
template<template< typename T, template< typename > class Atom, bool Dynamic > class Derived, typename T , template< typename > class Atom, bool Dynamic>
folly::detail::MPMCQueueBase< Derived< T, Atom, Dynamic > >::~MPMCQueueBase ( )
inline

MPMCQueue can only be safely destroyed when there are no pending enqueuers or dequeuers (this is not checked).

Definition at line 738 of file MPMCQueue.h.

Member Function Documentation

template<template< typename T, template< typename > class Atom, bool Dynamic > class Derived, typename T , template< typename > class Atom, bool Dynamic>
size_t folly::detail::MPMCQueueBase< Derived< T, Atom, Dynamic > >::allocatedCapacity ( ) const
inlinenoexcept

Doesn't change for non-dynamic.

Definition at line 804 of file MPMCQueue.h.

804  {
805  return capacity_;
806  }
size_t capacity_
The maximum number of items in the queue at once.
Definition: MPMCQueue.h:1004
template<template< typename T, template< typename > class Atom, bool Dynamic > class Derived, typename T , template< typename > class Atom, bool Dynamic>
void folly::detail::MPMCQueueBase< Derived< T, Atom, Dynamic > >::blockingRead ( T elem)
inlinenoexcept

Moves a dequeued element onto elem, blocking until an element is available

Definition at line 916 of file MPMCQueue.h.

References ticket, and uint64_t.

916  {
918  static_cast<Derived<T, Atom, Dynamic>*>(this)->blockingReadWithTicket(
919  ticket, elem);
920  }
void blockingReadWithTicket(uint64_t &ticket, T &elem) noexcept
Same as blockingRead() but also records the ticket nunmer.
Definition: MPMCQueue.h:923
static constexpr StringPiece ticket
template<template< typename T, template< typename > class Atom, bool Dynamic > class Derived, typename T , template< typename > class Atom, bool Dynamic>
void folly::detail::MPMCQueueBase< Derived< T, Atom, Dynamic > >::blockingReadWithTicket ( uint64_t ticket,
T elem 
)
inlinenoexcept

Same as blockingRead() but also records the ticket nunmer.

Definition at line 923 of file MPMCQueue.h.

References ticket.

923  {
924  assert(capacity_ != 0);
925  ticket = popTicket_++;
927  }
size_t capacity_
The maximum number of items in the queue at once.
Definition: MPMCQueue.h:1004
Atom< uint64_t > popTicket_
Dequeuers get tickets from here.
Definition: MPMCQueue.h:1040
void dequeueWithTicketBase(uint64_t ticket, Slot *slots, size_t cap, int stride, T &elem) noexcept
Definition: MPMCQueue.h:1319
static constexpr StringPiece ticket
template<template< typename T, template< typename > class Atom, bool Dynamic > class Derived, typename T , template< typename > class Atom, bool Dynamic>
template<typename... Args>
void folly::detail::MPMCQueueBase< Derived< T, Atom, Dynamic > >::blockingWrite ( Args &&...  args)
inlinenoexcept

Enqueues a T constructed from args, blocking until space is available. Note that this method signature allows enqueue via move, if args is a T rvalue, via copy, if args is a T lvalue, or via emplacement if args is an initializer list that can be passed to a T constructor.

Definition at line 828 of file MPMCQueue.h.

References testing::Args().

828  {
830  pushTicket_++, slots_, capacity_, stride_, std::forward<Args>(args)...);
831  }
void enqueueWithTicketBase(uint64_t ticket, Slot *slots, size_t cap, int stride, Args &&...args) noexcept
Definition: MPMCQueue.h:1298
size_t capacity_
The maximum number of items in the queue at once.
Definition: MPMCQueue.h:1004
Atom< uint64_t > pushTicket_
Enqueuers get tickets from here.
Definition: MPMCQueue.h:1037
template<template< typename T, template< typename > class Atom, bool Dynamic > class Derived, typename T , template< typename > class Atom, bool Dynamic>
size_t folly::detail::MPMCQueueBase< Derived< T, Atom, Dynamic > >::capacity ( ) const
inlinenoexcept

Doesn't change.

Definition at line 799 of file MPMCQueue.h.

799  {
800  return capacity_;
801  }
size_t capacity_
The maximum number of items in the queue at once.
Definition: MPMCQueue.h:1004
template<template< typename T, template< typename > class Atom, bool Dynamic > class Derived, typename T , template< typename > class Atom, bool Dynamic>
static int folly::detail::MPMCQueueBase< Derived< T, Atom, Dynamic > >::computeStride ( size_t  capacity)
inlinestaticprotectednoexcept

We assign tickets in increasing order, but we don't want to access neighboring elements of slots_ because that will lead to false sharing (multiple cores accessing the same cache line even though they aren't accessing the same bytes in that cache line). To avoid this we advance by stride slots per ticket.

We need gcd(capacity, stride) to be 1 so that we will use all of the slots. We ensure this by only considering prime strides, which either have no common divisors with capacity or else have a zero remainder after dividing by capacity. That is sufficient to guarantee correctness, but we also want to actually spread the accesses away from each other to avoid false sharing (consider a stride of 7 with a capacity of 8). To that end we try a few taking care to observe that advancing by -1 is as bad as advancing by 1 when in comes to false sharing.

The simple way to avoid false sharing would be to pad each SingleElementQueue, but since we have capacity_ of them that could waste a lot of space.

Definition at line 1074 of file MPMCQueue.h.

References min, and folly::gen::stride().

1074  {
1075  static const int smallPrimes[] = {2, 3, 5, 7, 11, 13, 17, 19, 23};
1076 
1077  int bestStride = 1;
1078  size_t bestSep = 1;
1079  for (int stride : smallPrimes) {
1080  if ((stride % capacity) == 0 || (capacity % stride) == 0) {
1081  continue;
1082  }
1083  size_t sep = stride % capacity;
1084  sep = std::min(sep, capacity - sep);
1085  if (sep > bestSep) {
1086  bestStride = stride;
1087  bestSep = sep;
1088  }
1089  }
1090  return bestStride;
1091  }
LogLevel min
Definition: LogLevel.cpp:30
size_t capacity() const noexcept
Doesn&#39;t change.
Definition: MPMCQueue.h:799
detail::Stride stride(size_t s)
Definition: Base-inl.h:2589
template<template< typename T, template< typename > class Atom, bool Dynamic > class Derived, typename T , template< typename > class Atom, bool Dynamic>
void folly::detail::MPMCQueueBase< Derived< T, Atom, Dynamic > >::dequeueWithTicketBase ( uint64_t  ticket,
Slot slots,
size_t  cap,
int  stride,
T elem 
)
inlineprotectednoexcept

Definition at line 1319 of file MPMCQueue.h.

References folly::gen::stride(), and ticket.

1324  {
1325  assert(cap != 0);
1326  slots[idx(ticket, cap, stride)].dequeue(
1327  turn(ticket, cap),
1329  (ticket % kAdaptationFreq) == 0,
1330  elem);
1331  }
static constexpr StringPiece ticket
uint32_t turn(uint64_t ticket, size_t cap) noexcept
Definition: MPMCQueue.h:1101
size_t idx(uint64_t ticket, size_t cap, int stride) noexcept
Definition: MPMCQueue.h:1095
Atom< uint32_t > popSpinCutoff_
The adaptive spin cutoff when the queue is empty on dequeue.
Definition: MPMCQueue.h:1049
detail::Stride stride(size_t s)
Definition: Base-inl.h:2589
template<template< typename T, template< typename > class Atom, bool Dynamic > class Derived, typename T , template< typename > class Atom, bool Dynamic>
template<typename... Args>
void folly::detail::MPMCQueueBase< Derived< T, Atom, Dynamic > >::enqueueWithTicket ( uint64_t  ticket,
Args &&...  args 
)
inlineprotectednoexcept

Definition at line 1313 of file MPMCQueue.h.

1313  {
1315  ticket, slots_, capacity_, stride_, std::forward<Args>(args)...);
1316  }
void enqueueWithTicketBase(uint64_t ticket, Slot *slots, size_t cap, int stride, Args &&...args) noexcept
Definition: MPMCQueue.h:1298
size_t capacity_
The maximum number of items in the queue at once.
Definition: MPMCQueue.h:1004
static constexpr StringPiece ticket
template<template< typename T, template< typename > class Atom, bool Dynamic > class Derived, typename T , template< typename > class Atom, bool Dynamic>
template<typename... Args>
void folly::detail::MPMCQueueBase< Derived< T, Atom, Dynamic > >::enqueueWithTicketBase ( uint64_t  ticket,
Slot slots,
size_t  cap,
int  stride,
Args &&...  args 
)
inlineprotectednoexcept

Definition at line 1298 of file MPMCQueue.h.

References testing::Args(), and folly::detail::SingleElementQueue< T, Atom >::enqueue().

1303  {
1304  slots[idx(ticket, cap, stride)].enqueue(
1305  turn(ticket, cap),
1307  (ticket % kAdaptationFreq) == 0,
1308  std::forward<Args>(args)...);
1309  }
static constexpr StringPiece ticket
uint32_t turn(uint64_t ticket, size_t cap) noexcept
Definition: MPMCQueue.h:1101
size_t idx(uint64_t ticket, size_t cap, int stride) noexcept
Definition: MPMCQueue.h:1095
detail::Stride stride(size_t s)
Definition: Base-inl.h:2589
template<template< typename T, template< typename > class Atom, bool Dynamic > class Derived, typename T , template< typename > class Atom, bool Dynamic>
size_t folly::detail::MPMCQueueBase< Derived< T, Atom, Dynamic > >::idx ( uint64_t  ticket,
size_t  cap,
int  stride 
)
inlineprotectednoexcept

Returns the index into slots_ that should be used when enqueuing or dequeuing with the specified ticket

Definition at line 1095 of file MPMCQueue.h.

References folly::gen::stride(), and ticket.

1095  {
1096  return ((ticket * stride) % cap) + kSlotPadding;
1097  }
static constexpr StringPiece ticket
detail::Stride stride(size_t s)
Definition: Base-inl.h:2589
template<template< typename T, template< typename > class Atom, bool Dynamic > class Derived, typename T , template< typename > class Atom, bool Dynamic>
bool folly::detail::MPMCQueueBase< Derived< T, Atom, Dynamic > >::isEmpty ( ) const
inlinenoexcept

Returns true if there are no items available for dequeue.

Definition at line 778 of file MPMCQueue.h.

References folly::size().

778  {
779  return size() <= 0;
780  }
template<template< typename T, template< typename > class Atom, bool Dynamic > class Derived, typename T , template< typename > class Atom, bool Dynamic>
bool folly::detail::MPMCQueueBase< Derived< T, Atom, Dynamic > >::isFull ( ) const
inlinenoexcept

Returns true if there is currently no empty space to enqueue.

Definition at line 783 of file MPMCQueue.h.

References folly::size().

783  {
784  // careful with signed -> unsigned promotion, since size can be negative
785  return size() >= static_cast<ssize_t>(capacity_);
786  }
size_t capacity_
The maximum number of items in the queue at once.
Definition: MPMCQueue.h:1004
template<template< typename T, template< typename > class Atom, bool Dynamic > class Derived, typename T , template< typename > class Atom, bool Dynamic>
MPMCQueueBase<Derived<T, Atom, Dynamic> > const& folly::detail::MPMCQueueBase< Derived< T, Atom, Dynamic > >::operator= ( MPMCQueueBase< Derived< T, Atom, Dynamic >> &&  rhs)
inline

IMPORTANT: The move operator is here to make it easier to perform the initialization phase, it is not safe to use when there are any concurrent accesses (this is not checked).

Definition at line 727 of file MPMCQueue.h.

References folly::gen::move, and folly::detail::rhs.

728  {
729  if (this != &rhs) {
730  this->~MPMCQueueBase();
731  new (this) MPMCQueueBase(std::move(rhs));
732  }
733  return *this;
734  }
constexpr detail::Map< Move > move
Definition: Base-inl.h:2567
FOLLY_PUSH_WARNING RHS rhs
Definition: Traits.h:649
template<template< typename T, template< typename > class Atom, bool Dynamic > class Derived, typename T , template< typename > class Atom, bool Dynamic>
bool folly::detail::MPMCQueueBase< Derived< T, Atom, Dynamic > >::read ( T elem)
inlinenoexcept

If an item can be dequeued with no blocking, does so and returns true, otherwise returns false.

Definition at line 931 of file MPMCQueue.h.

References ticket, and uint64_t.

931  {
933  return readAndGetTicket(ticket, elem);
934  }
static constexpr StringPiece ticket
bool readAndGetTicket(uint64_t &ticket, T &elem) noexcept
Same as read() but also records the ticket nunmer.
Definition: MPMCQueue.h:937
template<template< typename T, template< typename > class Atom, bool Dynamic > class Derived, typename T , template< typename > class Atom, bool Dynamic>
bool folly::detail::MPMCQueueBase< Derived< T, Atom, Dynamic > >::readAndGetTicket ( uint64_t ticket,
T elem 
)
inlinenoexcept

Same as read() but also records the ticket nunmer.

Definition at line 937 of file MPMCQueue.h.

References testing::Args(), folly::gen::stride(), and ticket.

937  {
938  Slot* slots;
939  size_t cap;
940  int stride;
941  if (static_cast<Derived<T, Atom, Dynamic>*>(this)->tryObtainReadyPopTicket(
942  ticket, slots, cap, stride)) {
943  // the ticket has been pre-validated to not block
944  dequeueWithTicketBase(ticket, slots, cap, stride, elem);
945  return true;
946  } else {
947  return false;
948  }
949  }
void dequeueWithTicketBase(uint64_t ticket, Slot *slots, size_t cap, int stride, T &elem) noexcept
Definition: MPMCQueue.h:1319
static constexpr StringPiece ticket
bool tryObtainReadyPopTicket(uint64_t &ticket, Slot *&slots, size_t &cap, int &stride) noexcept
Definition: MPMCQueue.h:1207
detail::Stride stride(size_t s)
Definition: Base-inl.h:2589
detail::SingleElementQueue< T, Atom > Slot
Definition: MPMCQueue.h:657
template<template< typename T, template< typename > class Atom, bool Dynamic > class Derived, typename T , template< typename > class Atom, bool Dynamic>
uint64_t folly::detail::MPMCQueueBase< Derived< T, Atom, Dynamic > >::readCount ( ) const
inlinenoexcept

Returns the total number of calls to blockingRead or successful calls to read, including those blockingRead calls that are currently blocking

Definition at line 818 of file MPMCQueue.h.

References testing::Args().

818  {
819  return popTicket_.load(std::memory_order_acquire);
820  }
Atom< uint64_t > popTicket_
Dequeuers get tickets from here.
Definition: MPMCQueue.h:1040
template<template< typename T, template< typename > class Atom, bool Dynamic > class Derived, typename T , template< typename > class Atom, bool Dynamic>
bool folly::detail::MPMCQueueBase< Derived< T, Atom, Dynamic > >::readIfNotEmpty ( T elem)
inlinenoexcept

If the queue is not empty, dequeues and returns true, otherwise returns false. If the matching write is still in progress then this method may block waiting for it. If you don't rely on being able to dequeue (such as by counting completed write) then you should prefer read.

Definition at line 975 of file MPMCQueue.h.

References folly::gen::stride(), ticket, and uint64_t.

975  {
977  Slot* slots;
978  size_t cap;
979  int stride;
980  if (static_cast<Derived<T, Atom, Dynamic>*>(this)
981  ->tryObtainPromisedPopTicket(ticket, slots, cap, stride)) {
982  // the matching enqueue already has a ticket, but might not be done
983  dequeueWithTicketBase(ticket, slots, cap, stride, elem);
984  return true;
985  } else {
986  return false;
987  }
988  }
void dequeueWithTicketBase(uint64_t ticket, Slot *slots, size_t cap, int stride, T &elem) noexcept
Definition: MPMCQueue.h:1319
static constexpr StringPiece ticket
bool tryObtainPromisedPopTicket(uint64_t &ticket, Slot *&slots, size_t &cap, int &stride) noexcept
Definition: MPMCQueue.h:1272
detail::Stride stride(size_t s)
Definition: Base-inl.h:2589
detail::SingleElementQueue< T, Atom > Slot
Definition: MPMCQueue.h:657
template<template< typename T, template< typename > class Atom, bool Dynamic > class Derived, typename T , template< typename > class Atom, bool Dynamic>
ssize_t folly::detail::MPMCQueueBase< Derived< T, Atom, Dynamic > >::size ( ) const
inlinenoexcept

Returns the number of writes (including threads that are blocked waiting to write) minus the number of reads (including threads that are blocked waiting to read). So effectively, it becomes: elements in queue + pending(calls to write) - pending(calls to read). If nothing is pending, then the method returns the actual number of elements in the queue. The returned value can be negative if there are no writers and the queue is empty, but there is one reader that is blocked waiting to read (in which case, the returned size will be -1).

Definition at line 751 of file MPMCQueue.h.

References uint64_t.

751  {
752  // since both pushes and pops increase monotonically, we can get a
753  // consistent snapshot either by bracketing a read of popTicket_ with
754  // two reads of pushTicket_ that return the same value, or the other
755  // way around. We maximize our chances by alternately attempting
756  // both bracketings.
757  uint64_t pushes = pushTicket_.load(std::memory_order_acquire); // A
758  uint64_t pops = popTicket_.load(std::memory_order_acquire); // B
759  while (true) {
760  uint64_t nextPushes = pushTicket_.load(std::memory_order_acquire); // C
761  if (pushes == nextPushes) {
762  // pushTicket_ didn't change from A (or the previous C) to C,
763  // so we can linearize at B (or D)
764  return ssize_t(pushes - pops);
765  }
766  pushes = nextPushes;
767  uint64_t nextPops = popTicket_.load(std::memory_order_acquire); // D
768  if (pops == nextPops) {
769  // popTicket_ didn't chance from B (or the previous D), so we
770  // can linearize at C
771  return ssize_t(pushes - pops);
772  }
773  pops = nextPops;
774  }
775  }
Atom< uint64_t > popTicket_
Dequeuers get tickets from here.
Definition: MPMCQueue.h:1040
Atom< uint64_t > pushTicket_
Enqueuers get tickets from here.
Definition: MPMCQueue.h:1037
template<template< typename T, template< typename > class Atom, bool Dynamic > class Derived, typename T , template< typename > class Atom, bool Dynamic>
ssize_t folly::detail::MPMCQueueBase< Derived< T, Atom, Dynamic > >::sizeGuess ( ) const
inlinenoexcept

Returns is a guess at size() for contexts that don't need a precise value, such as stats. More specifically, it returns the number of writes minus the number of reads, but after reading the number of writes, more writers could have came before the number of reads was sampled, and this method doesn't protect against such case. The returned value can be negative.

Definition at line 794 of file MPMCQueue.h.

794  {
795  return writeCount() - readCount();
796  }
template<template< typename T, template< typename > class Atom, bool Dynamic > class Derived, typename T , template< typename > class Atom, bool Dynamic>
bool folly::detail::MPMCQueueBase< Derived< T, Atom, Dynamic > >::tryObtainPromisedPopTicket ( uint64_t ticket,
Slot *&  slots,
size_t &  cap,
int &  stride 
)
inlineprotectednoexcept

Similar to tryObtainReadyPopTicket, but returns a pop ticket whose corresponding push ticket has already been handed out, rather than returning one whose corresponding push ticket has already been completed. This means that there is a possibility that the caller will block when using the ticket, but it allows the user to rely on the fact that if enqueue has succeeded, tryObtainPromisedPopTicket will return true. The "try" part of this is that we won't have to block waiting for someone to call enqueue, although we might have to block waiting for them to finish executing code inside the MPMCQueue itself.

Definition at line 1272 of file MPMCQueue.h.

References testing::Args(), folly::gen::stride(), and ticket.

1276  {
1277  auto numPops = popTicket_.load(std::memory_order_acquire); // A
1278  slots = slots_;
1279  cap = capacity_;
1280  stride = stride_;
1281  while (true) {
1282  ticket = numPops;
1283  const auto numPushes = pushTicket_.load(std::memory_order_acquire); // B
1284  if (numPops >= numPushes) {
1285  // Empty, or empty with pending pops. Linearize at B. We don't
1286  // need to recheck the read we performed at A, because if numPops
1287  // is stale then the fresh value is larger and the >= is still true
1288  return false;
1289  }
1290  if (popTicket_.compare_exchange_strong(numPops, numPops + 1)) {
1291  return true;
1292  }
1293  }
1294  }
size_t capacity_
The maximum number of items in the queue at once.
Definition: MPMCQueue.h:1004
Atom< uint64_t > popTicket_
Dequeuers get tickets from here.
Definition: MPMCQueue.h:1040
static constexpr StringPiece ticket
Atom< uint64_t > pushTicket_
Enqueuers get tickets from here.
Definition: MPMCQueue.h:1037
detail::Stride stride(size_t s)
Definition: Base-inl.h:2589
template<template< typename T, template< typename > class Atom, bool Dynamic > class Derived, typename T , template< typename > class Atom, bool Dynamic>
template<class Clock >
bool folly::detail::MPMCQueueBase< Derived< T, Atom, Dynamic > >::tryObtainPromisedPopTicketUntil ( uint64_t ticket,
Slot *&  slots,
size_t &  cap,
int &  stride,
const std::chrono::time_point< Clock > &  when 
)
inlineprotectednoexcept

Tries until when to obtain a pop ticket for which SingleElementQueue::dequeue won't block. Returns true on success, false on failure. ticket is filled on success AND failure.

Definition at line 1236 of file MPMCQueue.h.

References folly::gen::stride(), ticket, and folly::when().

1241  {
1242  bool deadlineReached = false;
1243  while (!deadlineReached) {
1244  if (static_cast<Derived<T, Atom, Dynamic>*>(this)
1245  ->tryObtainPromisedPopTicket(ticket, slots, cap, stride)) {
1246  return true;
1247  }
1248  // ticket is a blocking ticket until the preceding ticket has been
1249  // processed: wait until this ticket's turn arrives. We have not reserved
1250  // this ticket so we will have to re-attempt to get a non-blocking ticket
1251  // if we wake up before we time-out.
1252  deadlineReached =
1253  !slots[idx(ticket, cap, stride)].tryWaitForDequeueTurnUntil(
1254  turn(ticket, cap),
1256  (ticket % kAdaptationFreq) == 0,
1257  when);
1258  }
1259  return false;
1260  }
static constexpr StringPiece ticket
bool tryObtainPromisedPopTicket(uint64_t &ticket, Slot *&slots, size_t &cap, int &stride) noexcept
Definition: MPMCQueue.h:1272
Future< Unit > when(bool p, F &&thunk)
Definition: Future-inl.h:2330
uint32_t turn(uint64_t ticket, size_t cap) noexcept
Definition: MPMCQueue.h:1101
size_t idx(uint64_t ticket, size_t cap, int stride) noexcept
Definition: MPMCQueue.h:1095
detail::Stride stride(size_t s)
Definition: Base-inl.h:2589
template<template< typename T, template< typename > class Atom, bool Dynamic > class Derived, typename T , template< typename > class Atom, bool Dynamic>
bool folly::detail::MPMCQueueBase< Derived< T, Atom, Dynamic > >::tryObtainPromisedPushTicket ( uint64_t ticket,
Slot *&  slots,
size_t &  cap,
int &  stride 
)
inlineprotectednoexcept

Tries to obtain a push ticket which can be satisfied if all in-progress pops complete. This function does not block, but blocking may be required when using the returned ticket if some other thread's pop is still in progress (ticket has been granted but pop has not yet completed).

Definition at line 1178 of file MPMCQueue.h.

References int64_t, folly::gen::stride(), and ticket.

1182  {
1183  auto numPushes = pushTicket_.load(std::memory_order_acquire); // A
1184  slots = slots_;
1185  cap = capacity_;
1186  stride = stride_;
1187  while (true) {
1188  ticket = numPushes;
1189  const auto numPops = popTicket_.load(std::memory_order_acquire); // B
1190  // n will be negative if pops are pending
1191  const int64_t n = int64_t(numPushes - numPops);
1192  if (n >= static_cast<ssize_t>(capacity_)) {
1193  // Full, linearize at B. We don't need to recheck the read we
1194  // performed at A, because if numPushes was stale at B then the
1195  // real numPushes value is even worse
1196  return false;
1197  }
1198  if (pushTicket_.compare_exchange_strong(numPushes, numPushes + 1)) {
1199  return true;
1200  }
1201  }
1202  }
size_t capacity_
The maximum number of items in the queue at once.
Definition: MPMCQueue.h:1004
Atom< uint64_t > popTicket_
Dequeuers get tickets from here.
Definition: MPMCQueue.h:1040
static constexpr StringPiece ticket
Atom< uint64_t > pushTicket_
Enqueuers get tickets from here.
Definition: MPMCQueue.h:1037
detail::Stride stride(size_t s)
Definition: Base-inl.h:2589
template<template< typename T, template< typename > class Atom, bool Dynamic > class Derived, typename T , template< typename > class Atom, bool Dynamic>
template<class Clock >
bool folly::detail::MPMCQueueBase< Derived< T, Atom, Dynamic > >::tryObtainPromisedPushTicketUntil ( uint64_t ticket,
Slot *&  slots,
size_t &  cap,
int &  stride,
const std::chrono::time_point< Clock > &  when 
)
inlineprotectednoexcept

Tries until when to obtain a push ticket for which SingleElementQueue::enqueue won't block. Returns true on success, false on failure. ticket is filled on success AND failure.

Definition at line 1147 of file MPMCQueue.h.

References folly::gen::stride(), ticket, and folly::when().

1152  {
1153  bool deadlineReached = false;
1154  while (!deadlineReached) {
1155  if (static_cast<Derived<T, Atom, Dynamic>*>(this)
1156  ->tryObtainPromisedPushTicket(ticket, slots, cap, stride)) {
1157  return true;
1158  }
1159  // ticket is a blocking ticket until the preceding ticket has been
1160  // processed: wait until this ticket's turn arrives. We have not reserved
1161  // this ticket so we will have to re-attempt to get a non-blocking ticket
1162  // if we wake up before we time-out.
1163  deadlineReached =
1164  !slots[idx(ticket, cap, stride)].tryWaitForEnqueueTurnUntil(
1165  turn(ticket, cap),
1167  (ticket % kAdaptationFreq) == 0,
1168  when);
1169  }
1170  return false;
1171  }
bool tryObtainPromisedPushTicket(uint64_t &ticket, Slot *&slots, size_t &cap, int &stride) noexcept
Definition: MPMCQueue.h:1178
static constexpr StringPiece ticket
Future< Unit > when(bool p, F &&thunk)
Definition: Future-inl.h:2330
uint32_t turn(uint64_t ticket, size_t cap) noexcept
Definition: MPMCQueue.h:1101
size_t idx(uint64_t ticket, size_t cap, int stride) noexcept
Definition: MPMCQueue.h:1095
detail::Stride stride(size_t s)
Definition: Base-inl.h:2589
template<template< typename T, template< typename > class Atom, bool Dynamic > class Derived, typename T , template< typename > class Atom, bool Dynamic>
bool folly::detail::MPMCQueueBase< Derived< T, Atom, Dynamic > >::tryObtainReadyPopTicket ( uint64_t ticket,
Slot *&  slots,
size_t &  cap,
int &  stride 
)
inlineprotectednoexcept

Tries to obtain a pop ticket for which SingleElementQueue::dequeue won't block. Returns true on immediate success, false on immediate failure.

Definition at line 1207 of file MPMCQueue.h.

References folly::gen::stride(), and ticket.

1211  {
1212  ticket = popTicket_.load(std::memory_order_acquire);
1213  slots = slots_;
1214  cap = capacity_;
1215  stride = stride_;
1216  while (true) {
1217  if (!slots[idx(ticket, cap, stride)].mayDequeue(turn(ticket, cap))) {
1218  auto prev = ticket;
1219  ticket = popTicket_.load(std::memory_order_acquire);
1220  if (prev == ticket) {
1221  return false;
1222  }
1223  } else {
1224  if (popTicket_.compare_exchange_strong(ticket, ticket + 1)) {
1225  return true;
1226  }
1227  }
1228  }
1229  }
size_t capacity_
The maximum number of items in the queue at once.
Definition: MPMCQueue.h:1004
Atom< uint64_t > popTicket_
Dequeuers get tickets from here.
Definition: MPMCQueue.h:1040
static constexpr StringPiece ticket
uint32_t turn(uint64_t ticket, size_t cap) noexcept
Definition: MPMCQueue.h:1101
size_t idx(uint64_t ticket, size_t cap, int stride) noexcept
Definition: MPMCQueue.h:1095
detail::Stride stride(size_t s)
Definition: Base-inl.h:2589
template<template< typename T, template< typename > class Atom, bool Dynamic > class Derived, typename T , template< typename > class Atom, bool Dynamic>
bool folly::detail::MPMCQueueBase< Derived< T, Atom, Dynamic > >::tryObtainReadyPushTicket ( uint64_t ticket,
Slot *&  slots,
size_t &  cap,
int &  stride 
)
inlineprotectednoexcept

Tries to obtain a push ticket for which SingleElementQueue::enqueue won't block. Returns true on immediate success, false on immediate failure.

Definition at line 1109 of file MPMCQueue.h.

References folly::gen::stride(), and ticket.

1113  {
1114  ticket = pushTicket_.load(std::memory_order_acquire); // A
1115  slots = slots_;
1116  cap = capacity_;
1117  stride = stride_;
1118  while (true) {
1119  if (!slots[idx(ticket, cap, stride)].mayEnqueue(turn(ticket, cap))) {
1120  // if we call enqueue(ticket, ...) on the SingleElementQueue
1121  // right now it would block, but this might no longer be the next
1122  // ticket. We can increase the chance of tryEnqueue success under
1123  // contention (without blocking) by rechecking the ticket dispenser
1124  auto prev = ticket;
1125  ticket = pushTicket_.load(std::memory_order_acquire); // B
1126  if (prev == ticket) {
1127  // mayEnqueue was bracketed by two reads (A or prev B or prev
1128  // failing CAS to B), so we are definitely unable to enqueue
1129  return false;
1130  }
1131  } else {
1132  // we will bracket the mayEnqueue check with a read (A or prev B
1133  // or prev failing CAS) and the following CAS. If the CAS fails
1134  // it will effect a load of pushTicket_
1135  if (pushTicket_.compare_exchange_strong(ticket, ticket + 1)) {
1136  return true;
1137  }
1138  }
1139  }
1140  }
size_t capacity_
The maximum number of items in the queue at once.
Definition: MPMCQueue.h:1004
static constexpr StringPiece ticket
uint32_t turn(uint64_t ticket, size_t cap) noexcept
Definition: MPMCQueue.h:1101
size_t idx(uint64_t ticket, size_t cap, int stride) noexcept
Definition: MPMCQueue.h:1095
Atom< uint64_t > pushTicket_
Enqueuers get tickets from here.
Definition: MPMCQueue.h:1037
detail::Stride stride(size_t s)
Definition: Base-inl.h:2589
template<template< typename T, template< typename > class Atom, bool Dynamic > class Derived, typename T , template< typename > class Atom, bool Dynamic>
template<class Clock , typename... Args>
bool folly::detail::MPMCQueueBase< Derived< T, Atom, Dynamic > >::tryReadUntil ( const std::chrono::time_point< Clock > &  when,
T elem 
)
inlinenoexcept

Definition at line 952 of file MPMCQueue.h.

References folly::gen::stride(), ticket, uint64_t, and folly::when().

954  {
956  Slot* slots;
957  size_t cap;
958  int stride;
959  if (tryObtainPromisedPopTicketUntil(ticket, slots, cap, stride, when)) {
960  // we have pre-validated that the ticket won't block, or rather that
961  // it won't block longer than it takes another thread to enqueue an
962  // element on the slot it identifies.
963  dequeueWithTicketBase(ticket, slots, cap, stride, elem);
964  return true;
965  } else {
966  return false;
967  }
968  }
void dequeueWithTicketBase(uint64_t ticket, Slot *slots, size_t cap, int stride, T &elem) noexcept
Definition: MPMCQueue.h:1319
static constexpr StringPiece ticket
Future< Unit > when(bool p, F &&thunk)
Definition: Future-inl.h:2330
bool tryObtainPromisedPopTicketUntil(uint64_t &ticket, Slot *&slots, size_t &cap, int &stride, const std::chrono::time_point< Clock > &when) noexcept
Definition: MPMCQueue.h:1236
detail::Stride stride(size_t s)
Definition: Base-inl.h:2589
detail::SingleElementQueue< T, Atom > Slot
Definition: MPMCQueue.h:657
template<template< typename T, template< typename > class Atom, bool Dynamic > class Derived, typename T , template< typename > class Atom, bool Dynamic>
template<class Clock , typename... Args>
bool folly::detail::MPMCQueueBase< Derived< T, Atom, Dynamic > >::tryWriteUntil ( const std::chrono::time_point< Clock > &  when,
Args &&...  args 
)
inlinenoexcept

Definition at line 864 of file MPMCQueue.h.

References testing::Args(), folly::gen::stride(), ticket, and uint64_t.

866  {
868  Slot* slots;
869  size_t cap;
870  int stride;
871  if (tryObtainPromisedPushTicketUntil(ticket, slots, cap, stride, when)) {
872  // we have pre-validated that the ticket won't block, or rather that
873  // it won't block longer than it takes another thread to dequeue an
874  // element from the slot it identifies.
876  ticket, slots, cap, stride, std::forward<Args>(args)...);
877  return true;
878  } else {
879  return false;
880  }
881  }
void enqueueWithTicketBase(uint64_t ticket, Slot *slots, size_t cap, int stride, Args &&...args) noexcept
Definition: MPMCQueue.h:1298
static constexpr StringPiece ticket
Future< Unit > when(bool p, F &&thunk)
Definition: Future-inl.h:2330
bool tryObtainPromisedPushTicketUntil(uint64_t &ticket, Slot *&slots, size_t &cap, int &stride, const std::chrono::time_point< Clock > &when) noexcept
Definition: MPMCQueue.h:1147
detail::Stride stride(size_t s)
Definition: Base-inl.h:2589
detail::SingleElementQueue< T, Atom > Slot
Definition: MPMCQueue.h:657
template<template< typename T, template< typename > class Atom, bool Dynamic > class Derived, typename T , template< typename > class Atom, bool Dynamic>
uint32_t folly::detail::MPMCQueueBase< Derived< T, Atom, Dynamic > >::turn ( uint64_t  ticket,
size_t  cap 
)
inlineprotectednoexcept

Maps an enqueue or dequeue ticket to the turn should be used at the corresponding SingleElementQueue

Definition at line 1101 of file MPMCQueue.h.

References ticket, and uint32_t.

1101  {
1102  assert(cap != 0);
1103  return uint32_t(ticket / cap);
1104  }
static constexpr StringPiece ticket
template<template< typename T, template< typename > class Atom, bool Dynamic > class Derived, typename T , template< typename > class Atom, bool Dynamic>
template<typename... Args>
bool folly::detail::MPMCQueueBase< Derived< T, Atom, Dynamic > >::write ( Args &&...  args)
inlinenoexcept

If an item can be enqueued with no blocking, does so and returns true, otherwise returns false. This method is similar to writeIfNotFull, but if you don't have a specific need for that method you should use this one.

One of the common usages of this method is to enqueue via the move constructor, something like q.write(std::move(x)). If write returns false because the queue is full then x has not actually been consumed, which looks strange. To understand why it is actually okay to use x afterward, remember that std::move is just a typecast that provides an rvalue reference that enables use of a move constructor or operator. std::move doesn't actually move anything. It could more accurately be called std::rvalue_cast or std::move_permission.

Definition at line 847 of file MPMCQueue.h.

References testing::Args(), folly::gen::stride(), ticket, and uint64_t.

847  {
849  Slot* slots;
850  size_t cap;
851  int stride;
852  if (static_cast<Derived<T, Atom, Dynamic>*>(this)->tryObtainReadyPushTicket(
853  ticket, slots, cap, stride)) {
854  // we have pre-validated that the ticket won't block
856  ticket, slots, cap, stride, std::forward<Args>(args)...);
857  return true;
858  } else {
859  return false;
860  }
861  }
bool tryObtainReadyPushTicket(uint64_t &ticket, Slot *&slots, size_t &cap, int &stride) noexcept
Definition: MPMCQueue.h:1109
void enqueueWithTicketBase(uint64_t ticket, Slot *slots, size_t cap, int stride, Args &&...args) noexcept
Definition: MPMCQueue.h:1298
static constexpr StringPiece ticket
detail::Stride stride(size_t s)
Definition: Base-inl.h:2589
detail::SingleElementQueue< T, Atom > Slot
Definition: MPMCQueue.h:657
template<template< typename T, template< typename > class Atom, bool Dynamic > class Derived, typename T , template< typename > class Atom, bool Dynamic>
uint64_t folly::detail::MPMCQueueBase< Derived< T, Atom, Dynamic > >::writeCount ( ) const
inlinenoexcept

Returns the total number of calls to blockingWrite or successful calls to write, including those blockingWrite calls that are currently blocking

Definition at line 811 of file MPMCQueue.h.

811  {
812  return pushTicket_.load(std::memory_order_acquire);
813  }
Atom< uint64_t > pushTicket_
Enqueuers get tickets from here.
Definition: MPMCQueue.h:1037
template<template< typename T, template< typename > class Atom, bool Dynamic > class Derived, typename T , template< typename > class Atom, bool Dynamic>
template<typename... Args>
bool folly::detail::MPMCQueueBase< Derived< T, Atom, Dynamic > >::writeIfNotFull ( Args &&...  args)
inlinenoexcept

If the queue is not full, enqueues and returns true, otherwise returns false. Unlike write this method can be blocked by another thread, specifically a read that has linearized (been assigned a ticket) but not yet completed. If you don't really need this function you should probably use write.

MPMCQueue isn't lock-free, so just because a read operation has linearized (and isFull is false) doesn't mean that space has been made available for another write. In this situation write will return false, but writeIfNotFull will wait for the dequeue to finish. This method is required if you are composing queues and managing your own wakeup, because it guarantees that after every successful write a readIfNotEmpty will succeed.

Definition at line 897 of file MPMCQueue.h.

References folly::gen::stride(), ticket, and uint64_t.

897  {
899  Slot* slots;
900  size_t cap;
901  int stride;
902  if (static_cast<Derived<T, Atom, Dynamic>*>(this)
903  ->tryObtainPromisedPushTicket(ticket, slots, cap, stride)) {
904  // some other thread is already dequeuing the slot into which we
905  // are going to enqueue, but we might have to wait for them to finish
907  ticket, slots, cap, stride, std::forward<Args>(args)...);
908  return true;
909  } else {
910  return false;
911  }
912  }
bool tryObtainPromisedPushTicket(uint64_t &ticket, Slot *&slots, size_t &cap, int &stride) noexcept
Definition: MPMCQueue.h:1178
void enqueueWithTicketBase(uint64_t ticket, Slot *slots, size_t cap, int stride, Args &&...args) noexcept
Definition: MPMCQueue.h:1298
static constexpr StringPiece ticket
detail::Stride stride(size_t s)
Definition: Base-inl.h:2589
detail::SingleElementQueue< T, Atom > Slot
Definition: MPMCQueue.h:657

Member Data Documentation

union { ... }

Anonymous union for use when Dynamic = false and true, respectively.

union { ... }

Anonymous union for use when Dynamic = false and true, respectively.

template<template< typename T, template< typename > class Atom, bool Dynamic > class Derived, typename T , template< typename > class Atom, bool Dynamic>
size_t folly::detail::MPMCQueueBase< Derived< T, Atom, Dynamic > >::capacity_
protected

The maximum number of items in the queue at once.

Definition at line 1004 of file MPMCQueue.h.

template<template< typename T, template< typename > class Atom, bool Dynamic > class Derived, typename T , template< typename > class Atom, bool Dynamic>
Atom<size_t> folly::detail::MPMCQueueBase< Derived< T, Atom, Dynamic > >::dcapacity_
protected

Dynamic capacity.

Definition at line 1034 of file MPMCQueue.h.

template<template< typename T, template< typename > class Atom, bool Dynamic > class Derived, typename T , template< typename > class Atom, bool Dynamic>
Atom<Slot*> folly::detail::MPMCQueueBase< Derived< T, Atom, Dynamic > >::dslots_

Current dynamic slots array of dcapacity_ SingleElementQueue-s.

Definition at line 1013 of file MPMCQueue.h.

template<template< typename T, template< typename > class Atom, bool Dynamic > class Derived, typename T , template< typename > class Atom, bool Dynamic>
Atom<uint64_t> folly::detail::MPMCQueueBase< Derived< T, Atom, Dynamic > >::dstate_
protected

The following two memebers are used by dynamic MPMCQueue. Ideally they should be in MPMCQueue<T,Atom,true>, but we get better cache locality if they are in the same cache line as dslots_ and dstride_.

Dynamic state. A packed seqlock and ticket offset

Definition at line 1032 of file MPMCQueue.h.

template<template< typename T, template< typename > class Atom, bool Dynamic > class Derived, typename T , template< typename > class Atom, bool Dynamic>
Atom<int> folly::detail::MPMCQueueBase< Derived< T, Atom, Dynamic > >::dstride_

Current stride.

Definition at line 1023 of file MPMCQueue.h.

template<template< typename T, template< typename > class Atom, bool Dynamic > class Derived, typename T , template< typename > class Atom, bool Dynamic>
char folly::detail::MPMCQueueBase< Derived< T, Atom, Dynamic > >::pad_[hardware_destructive_interference_size-sizeof(Atom< uint32_t >)]
protected

Alignment doesn't prevent false sharing at the end of the struct, so fill out the last cache line

Definition at line 1053 of file MPMCQueue.h.

template<template< typename T, template< typename > class Atom, bool Dynamic > class Derived, typename T , template< typename > class Atom, bool Dynamic>
Atom<uint32_t> folly::detail::MPMCQueueBase< Derived< T, Atom, Dynamic > >::popSpinCutoff_
protected

The adaptive spin cutoff when the queue is empty on dequeue.

Definition at line 1049 of file MPMCQueue.h.

template<template< typename T, template< typename > class Atom, bool Dynamic > class Derived, typename T , template< typename > class Atom, bool Dynamic>
Atom<uint64_t> folly::detail::MPMCQueueBase< Derived< T, Atom, Dynamic > >::popTicket_
protected

Dequeuers get tickets from here.

Definition at line 1040 of file MPMCQueue.h.

template<template< typename T, template< typename > class Atom, bool Dynamic > class Derived, typename T , template< typename > class Atom, bool Dynamic>
Atom<uint32_t> folly::detail::MPMCQueueBase< Derived< T, Atom, Dynamic > >::pushSpinCutoff_
protected

This is how many times we will spin before using FUTEX_WAIT when the queue is full on enqueue, adaptively computed by occasionally spinning for longer and smoothing with an exponential moving average

Definition at line 1046 of file MPMCQueue.h.

template<template< typename T, template< typename > class Atom, bool Dynamic > class Derived, typename T , template< typename > class Atom, bool Dynamic>
Atom<uint64_t> folly::detail::MPMCQueueBase< Derived< T, Atom, Dynamic > >::pushTicket_
protected

Enqueuers get tickets from here.

Definition at line 1037 of file MPMCQueue.h.

template<template< typename T, template< typename > class Atom, bool Dynamic > class Derived, typename T , template< typename > class Atom, bool Dynamic>
Slot* folly::detail::MPMCQueueBase< Derived< T, Atom, Dynamic > >::slots_

An array of capacity_ SingleElementQueue-s, each of which holds either 0 or 1 item. We over-allocate by 2 * kSlotPadding and don't touch the slots at either end, to avoid false sharing

Definition at line 1011 of file MPMCQueue.h.

template<template< typename T, template< typename > class Atom, bool Dynamic > class Derived, typename T , template< typename > class Atom, bool Dynamic>
int folly::detail::MPMCQueueBase< Derived< T, Atom, Dynamic > >::stride_

The number of slots_ indices that we advance for each ticket, to avoid false sharing. Ideally slots_[i] and slots_[i + stride_] aren't on the same cache line

Definition at line 1021 of file MPMCQueue.h.


The documentation for this class was generated from the following file: