#include <type_traits>

#include <boost/noncopyable.hpp>

template <typename T, template <typename> class Atom>
class SingleElementQueue;
template <
    typename T,
    template <typename> class Atom = std::atomic,
    bool Dynamic = false>
class MPMCQueue : public detail::MPMCQueueBase<MPMCQueue<T, Atom, Dynamic>> {
 public:
  explicit MPMCQueue(size_t queueCapacity)
      : detail::MPMCQueueBase<MPMCQueue<T, Atom, Dynamic>>(queueCapacity) {
    this->stride_ = this->computeStride(queueCapacity);
    this->slots_ = new Slot[queueCapacity + 2 * this->kSlotPadding];
  }
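// --- Usage sketch (illustrative, not part of the header) --------------------
// Minimal example of the fixed-capacity queue declared above, assuming the
// header is included as <folly/MPMCQueue.h>. blockingWrite()/blockingRead()
// wait for space or data; write()/read() return false instead of blocking.
#include <folly/MPMCQueue.h>
#include <thread>

inline void mpmcQueueBasicExample() {
  folly::MPMCQueue<int> q(64); // capacity is fixed at construction
  std::thread producer([&q] {
    for (int i = 0; i < 100; ++i) {
      q.blockingWrite(i); // blocks while the queue is full
    }
  });
  std::thread consumer([&q] {
    for (int i = 0; i < 100; ++i) {
      int v;
      q.blockingRead(v); // blocks while the queue is empty
      (void)v;
    }
  });
  producer.join();
  consumer.join();
}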
template <typename T, template <typename> class Atom>
class MPMCQueue<T, Atom, true>
    : public detail::MPMCQueueBase<MPMCQueue<T, Atom, true>> {
 public:
  explicit MPMCQueue(size_t queueCapacity)
      : detail::MPMCQueueBase<MPMCQueue<T, Atom, true>>(queueCapacity) {
    size_t cap = std::min<size_t>(kDefaultMinDynamicCapacity, queueCapacity);
    initQueue(cap, kDefaultExpansionMultiplier);
  }

  MPMCQueue(
      size_t queueCapacity,
      size_t minCapacity,
      size_t expansionMultiplier)
      : detail::MPMCQueueBase<MPMCQueue<T, Atom, true>>(queueCapacity) {
    minCapacity = std::max<size_t>(1, minCapacity);
    size_t cap = std::min<size_t>(minCapacity, queueCapacity);
    expansionMultiplier = std::max<size_t>(2, expansionMultiplier);
    initQueue(cap, expansionMultiplier);
  }
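// --- Usage sketch (illustrative, not part of the header) --------------------
// The Dynamic = true specialization starts small and expands on demand up to
// queueCapacity. The three-argument constructor picks the initial capacity
// and the growth factor; the numbers below are arbitrary examples.
#include <folly/MPMCQueue.h>
#include <atomic>

inline void mpmcQueueDynamicExample() {
  // Hard cap of 4096 items; start with room for 16; grow 8x per expansion.
  folly::MPMCQueue<int, std::atomic, true> q(
      4096, /*minCapacity=*/16, /*expansionMultiplier=*/8);
  q.blockingWrite(1);
  int v;
  q.blockingRead(v);
}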
  MPMCQueue(MPMCQueue<T, Atom, true>&& rhs) noexcept {
    this->capacity_ = rhs.capacity_;
    new (&this->dslots_)
        Atom<Slot*>(rhs.dslots_.load(std::memory_order_relaxed));
    new (&this->dstride_)
        Atom<int>(rhs.dstride_.load(std::memory_order_relaxed));
    this->dstate_.store(
        rhs.dstate_.load(std::memory_order_relaxed), std::memory_order_relaxed);
    this->dcapacity_.store(
        rhs.dcapacity_.load(std::memory_order_relaxed),
        std::memory_order_relaxed);
    this->pushTicket_.store(
        rhs.pushTicket_.load(std::memory_order_relaxed),
        std::memory_order_relaxed);
    this->popTicket_.store(
        rhs.popTicket_.load(std::memory_order_relaxed),
        std::memory_order_relaxed);
    this->pushSpinCutoff_.store(
        rhs.pushSpinCutoff_.load(std::memory_order_relaxed),
        std::memory_order_relaxed);
    this->popSpinCutoff_.store(
        rhs.popSpinCutoff_.load(std::memory_order_relaxed),
        std::memory_order_relaxed);
    closed_ = rhs.closed_;

    rhs.dslots_.store(nullptr, std::memory_order_relaxed);
    rhs.dstride_.store(0, std::memory_order_relaxed);
    rhs.dstate_.store(0, std::memory_order_relaxed);
    rhs.dcapacity_.store(0, std::memory_order_relaxed);
    rhs.pushTicket_.store(0, std::memory_order_relaxed);
    rhs.popTicket_.store(0, std::memory_order_relaxed);
    rhs.pushSpinCutoff_.store(0, std::memory_order_relaxed);
    rhs.popSpinCutoff_.store(0, std::memory_order_relaxed);

    rhs.closed_ = nullptr;
  }
  ~MPMCQueue() {
    if (closed_ != nullptr) {
      for (int i = getNumClosed(this->dstate_.load()) - 1; i >= 0; --i) {
        delete[] closed_[i].slots_;
      }
      delete[] closed_;
    }
    using AtomInt = Atom<int>;
    this->dstride_.~AtomInt();
    using AtomSlot = Atom<Slot*>;
    // Hand the current dynamic slots array to the base destructor via slots_.
    auto slots = this->dslots_.load();
    this->dslots_.~AtomSlot();
    this->slots_ = slots;
  }

  size_t allocatedCapacity() const noexcept {
    return this->dcapacity_.load(std::memory_order_relaxed);
  }
  template <typename... Args>
  void blockingWrite(Args&&... args) noexcept {
    uint64_t ticket = this->pushTicket_++; // (local slots/cap/stride/state/offset decls elided)
    do {
      if (!trySeqlockReadSection(state, slots, cap, stride)) {
        continue;
      }
      if (maybeUpdateFromClosed(state, ticket, offset, slots, cap, stride)) {
        break; // the ticket was issued against a closed (pre-expansion) array
      }
      if (slots[this->idx((ticket - offset), cap, stride)].mayEnqueue(
              this->turn(ticket - offset, cap))) {
        break; // a slot is ready; no need to expand
      } else if (
          this->popTicket_.load(std::memory_order_relaxed) + cap > ticket) {
        continue; // may block, but a pop is in progress
      } else if (tryExpand(state, cap)) {
        continue; // expansion started; re-read the seqlock section
      } else {
        break; // cannot expand further; wait for a pop
      }
    } while (true);
    this->enqueueWithTicketBase(
        ticket - offset, slots, cap, stride, std::forward<Args>(args)...);
  }
  void blockingReadWithTicket(uint64_t& ticket, T& elem) noexcept {
    ticket = this->popTicket_++;
    // ... (local declarations elided)
    while (!trySeqlockReadSection(state, slots, cap, stride)) {
      asm_volatile_pause();
    }
    // Adjust for any expansion that happened after the matching push ticket.
    maybeUpdateFromClosed(state, ticket, offset, slots, cap, stride);
    this->dequeueWithTicketBase(ticket - offset, slots, cap, stride, elem);
  }
  enum {
    kDefaultMinDynamicCapacity = 10,
    kDefaultExpansionMultiplier = 10,
  };

  void initQueue(const size_t cap, const size_t mult) {
    new (&this->dstride_) Atom<int>(this->computeStride(cap));
    Slot* slots = new Slot[cap + 2 * this->kSlotPadding];
    new (&this->dslots_) Atom<Slot*>(slots);
    this->dstate_.store(0);
    this->dcapacity_.store(cap);
    dmult_ = mult;
    size_t maxClosed = 0;
    for (size_t expanded = cap; expanded < this->capacity_; expanded *= mult) {
      ++maxClosed;
    }
    closed_ = (maxClosed > 0) ? new ClosedArray[maxClosed] : nullptr;
  }
  bool tryObtainReadyPushTicket(
      uint64_t& ticket, Slot*& slots, size_t& cap, int& stride) noexcept {
    uint64_t state;
    do {
      ticket = this->pushTicket_.load(std::memory_order_acquire);
      if (!trySeqlockReadSection(state, slots, cap, stride)) {
        continue;
      }
      uint64_t offset;
      maybeUpdateFromClosed(state, ticket, offset, slots, cap, stride);
      if (slots[this->idx((ticket - offset), cap, stride)].mayEnqueue(
              this->turn(ticket - offset, cap))) {
        // A slot is ready; claim the ticket.
        if (this->pushTicket_.compare_exchange_strong(ticket, ticket + 1)) {
          ticket -= offset;
          return true;
        } else {
          continue;
        }
      } else {
        if (ticket != this->pushTicket_.load(std::memory_order_relaxed)) {
          continue; // the ticket changed; try again
        }
        // Likely to block; try to expand unless the ticket is for a closed
        // (pre-expansion) array.
        if (offset == getOffset(state)) {
          if (tryExpand(state, cap)) {
            continue;
          }
        }
        return false;
      }
    } while (true);
  }
  bool tryObtainPromisedPushTicket(
      uint64_t& ticket, Slot*& slots, size_t& cap, int& stride) noexcept {
    uint64_t state;
    do {
      ticket = this->pushTicket_.load(std::memory_order_acquire);
      auto numPops = this->popTicket_.load(std::memory_order_acquire);
      if (!trySeqlockReadSection(state, slots, cap, stride)) {
        continue;
      }
      const auto curCap = cap;
      uint64_t offset;
      maybeUpdateFromClosed(state, ticket, offset, slots, cap, stride);
      int64_t n = ticket - numPops;
      if (n >= static_cast<ssize_t>(cap)) {
        if ((cap == curCap) && tryExpand(state, cap)) {
          continue; // expansion started; start over
        }
        ticket -= offset;
        return false; // cannot expand; the queue is full
      }
      if (this->pushTicket_.compare_exchange_strong(ticket, ticket + 1)) {
        ticket -= offset;
        return true;
      }
    } while (true);
  }
  bool tryObtainReadyPopTicket(
      uint64_t& ticket, Slot*& slots, size_t& cap, int& stride) noexcept {
    uint64_t state;
    do {
      ticket = this->popTicket_.load(std::memory_order_relaxed);
      if (!trySeqlockReadSection(state, slots, cap, stride)) {
        continue;
      }
      // Adjust for any expansion after the matching push ticket was issued.
      uint64_t offset;
      maybeUpdateFromClosed(state, ticket, offset, slots, cap, stride);
      if (slots[this->idx((ticket - offset), cap, stride)].mayDequeue(
              this->turn(ticket - offset, cap))) {
        if (this->popTicket_.compare_exchange_strong(ticket, ticket + 1)) {
          ticket -= offset;
          return true;
        }
      } else {
        return false;
      }
    } while (true);
  }
  bool tryObtainPromisedPopTicket(
      uint64_t& ticket, Slot*& slots, size_t& cap, int& stride) noexcept {
    uint64_t state;
    do {
      ticket = this->popTicket_.load(std::memory_order_acquire);
      auto numPushes = this->pushTicket_.load(std::memory_order_acquire);
      if (!trySeqlockReadSection(state, slots, cap, stride)) {
        continue;
      }
      uint64_t offset;
      maybeUpdateFromClosed(state, ticket, offset, slots, cap, stride);
      if (ticket >= numPushes) {
        ticket -= offset;
        return false; // nothing has been pushed for this ticket yet
      }
      if (this->popTicket_.compare_exchange_strong(ticket, ticket + 1)) {
        ticket -= offset;
        return true;
      }
    } while (true);
  }
  /// Enqueues an element with a specific ticket number.
  template <typename... Args>
  void enqueueWithTicket(const uint64_t ticket, Args&&... args) noexcept {
    // ... (local declarations elided)
    while (!trySeqlockReadSection(state, slots, cap, stride)) {
    }
    maybeUpdateFromClosed(state, ticket, offset, slots, cap, stride);
    this->enqueueWithTicketBase(
        ticket - offset, slots, cap, stride, std::forward<Args>(args)...);
  }

  uint64_t getOffset(const uint64_t state) const noexcept {
    return state >> kSeqlockBits;
  }

  int getNumClosed(const uint64_t state) const noexcept {
    return (state & ((1 << kSeqlockBits) - 1)) >> 1;
  }
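// --- Reader's note (not from the header) -------------------------------------
// dstate_ packs three fields into one word: the current ticket offset in the
// bits above kSeqlockBits, the count of closed (retired) slot arrays in the
// remaining bits shifted left by one, and the seqlock write bit in bit 0 (an
// odd value means an expansion is in progress). With a hypothetical
// kSeqlockBits of 8, offset 1000 and two closed arrays would be stored as
// (1000 << 8) | (2 * 2) == 256004; getOffset() and getNumClosed() above then
// recover 1000 and 2 respectively.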
  /// Tries to expand the queue. Returns true if this expansion succeeded or a
  /// concurrent expansion is in progress; returns false if the queue has
  /// reached its maximum capacity or allocation failed.
  bool tryExpand(const uint64_t state, const size_t cap) noexcept {
    if (cap == this->capacity_) {
      return false;
    }
    // Acquire the seqlock by making the state odd.
    uint64_t oldval = state;
    assert((state & 1) == 0);
    if (this->dstate_.compare_exchange_strong(oldval, state + 1)) {
      assert(cap == this->dcapacity_.load());
      uint64_t ticket =
          1 + std::max(this->pushTicket_.load(), this->popTicket_.load());
      size_t newCapacity = std::min(dmult_ * cap, this->capacity_);
      Slot* newSlots =
          new (std::nothrow) Slot[newCapacity + 2 * this->kSlotPadding];
      if (newSlots == nullptr) {
        // Expansion failed; restore the seqlock.
        this->dstate_.store(state);
        return false;
      }
      // Record the closed (pre-expansion) slots array.
      uint64_t offset = getOffset(state);
      int index = getNumClosed(state);
      assert((index << 1) < (1 << kSeqlockBits));
      closed_[index].offset_ = offset;
      closed_[index].slots_ = this->dslots_.load();
      closed_[index].capacity_ = cap;
      closed_[index].stride_ = this->dstride_.load();
      // Publish the new slots array.
      this->dslots_.store(newSlots);
      this->dcapacity_.store(newCapacity);
      this->dstride_.store(this->computeStride(newCapacity));
      // Release the seqlock and record the new ticket offset.
      this->dstate_.store((ticket << kSeqlockBits) + (2 * (index + 1)));
      return true;
    } else {
      // Another thread acquired the seqlock; the caller should re-read state.
      return true;
    }
  }
  /// Seqlock read-only section.
  bool trySeqlockReadSection(
      uint64_t& state, Slot*& slots, size_t& cap, int& stride) noexcept {
    state = this->dstate_.load(std::memory_order_acquire);
    if (state & 1) {
      return false; // an expansion holds the seqlock
    }
    slots = this->dslots_.load(std::memory_order_relaxed);
    cap = this->dcapacity_.load(std::memory_order_relaxed);
    stride = this->dstride_.load(std::memory_order_relaxed);
    // Validate that no writer intervened during the read-only section.
    std::atomic_thread_fence(std::memory_order_acquire);
    return (state == this->dstate_.load(std::memory_order_relaxed));
  }
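// --- Reader's note (not from the header) -------------------------------------
// This is the usual seqlock read protocol: snapshot the version word
// (dstate_), bail out if it is odd (a writer holds the lock), read the
// protected fields with relaxed loads, then re-check behind an acquire fence
// that the version did not change. Callers simply retry in a loop, as in the
// surrounding code:
//
//   while (!trySeqlockReadSection(state, slots, cap, stride)) {
//   }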
  /// If there was an expansion after this ticket was issued, update the
  /// caller's view from the most recent closed array with offset <= ticket
  /// and return true; otherwise return false.
  bool maybeUpdateFromClosed(
      const uint64_t state,
      const uint64_t ticket,
      uint64_t& offset,
      Slot*& slots,
      size_t& cap,
      int& stride) noexcept {
    offset = getOffset(state);
    if (ticket >= offset) {
      return false;
    }
    for (int i = getNumClosed(state) - 1; i >= 0; --i) {
      offset = closed_[i].offset_;
      if (offset <= ticket) {
        slots = closed_[i].slots_;
        cap = closed_[i].capacity_;
        stride = closed_[i].stride_;
        return true;
      }
    }
    assert(false); // a closed array with offset <= ticket must exist
    return false;
  }
/// MPMCQueue base CRTP template.
template <
    template <typename T, template <typename> class Atom, bool Dynamic>
    class Derived,
    typename T,
    template <typename> class Atom,
    bool Dynamic>
class MPMCQueueBase<Derived<T, Atom, Dynamic>> : boost::noncopyable {
  static_assert(
      std::is_nothrow_constructible<T, T&&>::value ||
          folly::IsRelocatable<T>::value,
      "T must be relocatable or have a noexcept move constructor");
  explicit MPMCQueueBase(size_t queueCapacity)
      : capacity_(queueCapacity) {
    if (queueCapacity == 0) {
      throw std::invalid_argument(
          "MPMCQueue with explicit capacity 0 is impossible");
    }
    assert(
        static_cast<uint8_t*>(static_cast<void*>(&popTicket_)) -
            static_cast<uint8_t*>(static_cast<void*>(&pushTicket_)) >=
        static_cast<ptrdiff_t>(hardware_destructive_interference_size));
  }
  MPMCQueueBase(MPMCQueueBase<Derived<T, Atom, Dynamic>>&& rhs) noexcept
      : capacity_(rhs.capacity_),
        slots_(rhs.slots_),
        stride_(rhs.stride_),
        dstate_(rhs.dstate_.load(std::memory_order_relaxed)),
        dcapacity_(rhs.dcapacity_.load(std::memory_order_relaxed)),
        pushTicket_(rhs.pushTicket_.load(std::memory_order_relaxed)),
        popTicket_(rhs.popTicket_.load(std::memory_order_relaxed)),
        pushSpinCutoff_(rhs.pushSpinCutoff_.load(std::memory_order_relaxed)),
        popSpinCutoff_(rhs.popSpinCutoff_.load(std::memory_order_relaxed)) {
    // Relaxed operations are fine here: rhs is immediately destroyed or
    // assigned to after a move.
    rhs.capacity_ = 0;
    rhs.slots_ = nullptr;
    rhs.stride_ = 0;
    rhs.dstate_.store(0, std::memory_order_relaxed);
    rhs.dcapacity_.store(0, std::memory_order_relaxed);
    rhs.pushTicket_.store(0, std::memory_order_relaxed);
    rhs.popTicket_.store(0, std::memory_order_relaxed);
    rhs.pushSpinCutoff_.store(0, std::memory_order_relaxed);
    rhs.popSpinCutoff_.store(0, std::memory_order_relaxed);
  }
  ssize_t size() const noexcept {
    uint64_t pushes = pushTicket_.load(std::memory_order_acquire); // A
    uint64_t pops = popTicket_.load(std::memory_order_acquire); // B
    while (true) {
      uint64_t nextPushes = pushTicket_.load(std::memory_order_acquire); // C
      if (pushes == nextPushes) {
        return ssize_t(pushes - pops); // pushTicket_ stable from A to C
      }
      pushes = nextPushes;
      uint64_t nextPops = popTicket_.load(std::memory_order_acquire); // D
      if (pops == nextPops) {
        return ssize_t(pushes - pops); // popTicket_ stable from B to D
      }
      pops = nextPops;
    }
  }

  bool isFull() const noexcept {
    // Careful with signed -> unsigned promotion: size() can be negative.
    return size() >= static_cast<ssize_t>(capacity_);
  }

  ssize_t sizeGuess() const noexcept { return writeCount() - readCount(); }

  uint64_t writeCount() const noexcept {
    return pushTicket_.load(std::memory_order_acquire);
  }

  uint64_t readCount() const noexcept {
    return popTicket_.load(std::memory_order_acquire);
  }
  /// Enqueues a T constructed from args, blocking until space is available.
  template <typename... Args>
  void blockingWrite(Args&&... args) noexcept {
    enqueueWithTicketBase(
        pushTicket_++, slots_, capacity_, stride_, std::forward<Args>(args)...);
  }

  /// If an item can be enqueued with no blocking, does so and returns true,
  /// otherwise returns false.
  template <typename... Args>
  bool write(Args&&... args) noexcept {
    uint64_t ticket;
    Slot* slots;
    size_t cap;
    int stride;
    if (static_cast<Derived<T, Atom, Dynamic>*>(this)->tryObtainReadyPushTicket(
            ticket, slots, cap, stride)) {
      // The ticket is pre-validated not to block.
      enqueueWithTicketBase(
          ticket, slots, cap, stride, std::forward<Args>(args)...);
      return true;
    }
    return false;
  }
  /// Tries to enqueue, giving up once the deadline passes.
  template <class Clock, typename... Args>
  bool tryWriteUntil(
      const std::chrono::time_point<Clock>& when, Args&&... args) noexcept {
    uint64_t ticket;
    Slot* slots;
    size_t cap;
    int stride;
    if (tryObtainPromisedPushTicketUntil(ticket, slots, cap, stride, when)) {
      enqueueWithTicketBase(
          ticket, slots, cap, stride, std::forward<Args>(args)...);
      return true;
    }
    return false;
  }
  /// If the queue is not full, enqueues and returns true, otherwise returns
  /// false. Unlike write(), this can be blocked by a read that has been
  /// assigned a ticket but has not yet completed.
  template <typename... Args>
  bool writeIfNotFull(Args&&... args) noexcept {
    uint64_t ticket;
    Slot* slots;
    size_t cap;
    int stride;
    if (static_cast<Derived<T, Atom, Dynamic>*>(this)
            ->tryObtainPromisedPushTicket(ticket, slots, cap, stride)) {
      enqueueWithTicketBase(
          ticket, slots, cap, stride, std::forward<Args>(args)...);
      return true;
    }
    return false;
  }

  /// Moves a dequeued element onto elem, blocking until one is available.
  void blockingRead(T& elem) noexcept {
    uint64_t ticket;
    static_cast<Derived<T, Atom, Dynamic>*>(this)->blockingReadWithTicket(
        ticket, elem);
  }

  /// Same as blockingRead() but also records the ticket number.
  void blockingReadWithTicket(uint64_t& ticket, T& elem) noexcept {
    assert(capacity_ != 0);
    ticket = popTicket_++;
    dequeueWithTicketBase(ticket, slots_, capacity_, stride_, elem);
  }
  /// If an item can be dequeued with no blocking, does so and returns true.
  bool read(T& elem) noexcept {
    uint64_t ticket;
    return readAndGetTicket(ticket, elem);
  }

  /// Same as read() but also records the ticket number.
  bool readAndGetTicket(uint64_t& ticket, T& elem) noexcept {
    Slot* slots;
    size_t cap;
    int stride;
    if (static_cast<Derived<T, Atom, Dynamic>*>(this)->tryObtainReadyPopTicket(
            ticket, slots, cap, stride)) {
      dequeueWithTicketBase(ticket, slots, cap, stride, elem);
      return true;
    }
    return false;
  }
  /// Tries to dequeue, giving up once the deadline passes.
  template <class Clock, typename... Args>
  bool tryReadUntil(
      const std::chrono::time_point<Clock>& when, T& elem) noexcept {
    uint64_t ticket;
    Slot* slots;
    size_t cap;
    int stride;
    if (tryObtainPromisedPopTicketUntil(ticket, slots, cap, stride, when)) {
      // The matching push has been scheduled, but we may have to wait for it.
      dequeueWithTicketBase(ticket, slots, cap, stride, elem);
      return true;
    }
    return false;
  }
  /// If the queue is not empty, dequeues and returns true, else returns false.
  bool readIfNotEmpty(T& elem) noexcept {
    uint64_t ticket;
    Slot* slots;
    size_t cap;
    int stride;
    if (static_cast<Derived<T, Atom, Dynamic>*>(this)
            ->tryObtainPromisedPopTicket(ticket, slots, cap, stride)) {
      dequeueWithTicketBase(ticket, slots, cap, stride, elem);
      return true;
    }
    return false;
  }
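// --- Usage sketch (illustrative, not part of the header) --------------------
// Deadline-based variants: tryWriteUntil()/tryReadUntil() return false once
// the given time_point passes. The 10ms budgets below are arbitrary.
#include <folly/MPMCQueue.h>
#include <chrono>

inline bool mpmcQueueTimedExample(folly::MPMCQueue<int>& q) {
  using namespace std::chrono;
  auto deadline = steady_clock::now() + milliseconds(10);
  if (!q.tryWriteUntil(deadline, 42)) {
    return false; // still full after ~10ms
  }
  int v;
  return q.tryReadUntil(steady_clock::now() + milliseconds(10), v);
}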
    kAdaptationFreq = 128,
  /// Tickets are assigned in increasing order, but accessing neighboring
  /// slots would cause false sharing, so we advance by a stride that is
  /// coprime with the capacity and spreads consecutive tickets apart.
  static int computeStride(size_t capacity) noexcept {
    static const int smallPrimes[] = {2, 3, 5, 7, 11, 13, 17, 19, 23};
    int bestStride = 1;
    size_t bestSep = 1;
    for (int stride : smallPrimes) {
      if ((stride % capacity) == 0 || (capacity % stride) == 0) {
        continue;
      }
      size_t sep = stride % capacity;
      sep = std::min(sep, capacity - sep);
      if (sep > bestSep) {
        bestStride = stride;
        bestSep = sep;
      }
    }
    return bestStride;
  }
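// --- Worked example (reader's note, not from the header) ---------------------
// For capacity 10, strides 2 and 5 are rejected because they divide 10; the
// best achievable separation is 3, first reached by stride 3, so
// computeStride(10) == 3. With idx(ticket) == (ticket * stride) % capacity
// (ignoring slot padding), tickets 0, 1, 2, ... land in slots
// 0, 3, 6, 9, 2, 5, 8, 1, 4, 7: every slot is still used exactly once per
// lap, but consecutive tickets sit three slots apart, which reduces false
// sharing between neighboring slots.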
  bool tryObtainReadyPushTicket(
      uint64_t& ticket, Slot*& slots, size_t& cap, int& stride) noexcept {
    ticket = pushTicket_.load(std::memory_order_acquire);
    // ... (snapshot slots_/capacity_/stride_ and check mayEnqueue; if the
    // slot would block, re-read the ticket dispenser before giving up)
    ticket = pushTicket_.load(std::memory_order_acquire);
    // ...
    if (pushTicket_.compare_exchange_strong(ticket, ticket + 1)) {
      return true;
    }
  /// Tries until the deadline to obtain a push ticket for which
  /// SingleElementQueue::enqueue won't block. The ticket is filled on both
  /// success and failure.
  template <class Clock>
  bool tryObtainPromisedPushTicketUntil(
      uint64_t& ticket,
      Slot*& slots,
      size_t& cap,
      int& stride,
      const std::chrono::time_point<Clock>& when) noexcept {
    bool deadlineReached = false;
    while (!deadlineReached) {
      if (static_cast<Derived<T, Atom, Dynamic>*>(this)
              ->tryObtainPromisedPushTicket(ticket, slots, cap, stride)) {
        return true;
      }
      // The ticket is not reserved; wait for its turn and then retry, unless
      // the deadline passes first.
      deadlineReached =
          !slots[idx(ticket, cap, stride)].tryWaitForEnqueueTurnUntil(
              turn(ticket, cap),
              pushSpinCutoff_,
              (ticket % kAdaptationFreq) == 0,
              when);
    }
    return false;
  }
    auto numPushes = pushTicket_.load(std::memory_order_acquire);
    // ...
    const auto numPops = popTicket_.load(std::memory_order_acquire);
    const int64_t n = int64_t(numPushes - numPops);
    if (n >= static_cast<ssize_t>(capacity_)) {
      return false; // full; linearize at the popTicket_ read
    }
    if (pushTicket_.compare_exchange_strong(numPushes, numPushes + 1)) {
      return true;
    }
    ticket = popTicket_.load(std::memory_order_acquire);
    // ... (check mayDequeue; if the slot would block, re-read the ticket
    // dispenser before giving up)
    ticket = popTicket_.load(std::memory_order_acquire);
    // ...
    if (popTicket_.compare_exchange_strong(ticket, ticket + 1)) {
      return true;
    }
  /// Tries until the deadline to obtain a pop ticket for which
  /// SingleElementQueue::dequeue won't block. The ticket is filled on both
  /// success and failure.
  template <class Clock>
  bool tryObtainPromisedPopTicketUntil(
      uint64_t& ticket,
      Slot*& slots,
      size_t& cap,
      int& stride,
      const std::chrono::time_point<Clock>& when) noexcept {
    bool deadlineReached = false;
    while (!deadlineReached) {
      if (static_cast<Derived<T, Atom, Dynamic>*>(this)
              ->tryObtainPromisedPopTicket(ticket, slots, cap, stride)) {
        return true;
      }
      // The ticket is not reserved; wait for its turn and then retry, unless
      // the deadline passes first.
      deadlineReached =
          !slots[idx(ticket, cap, stride)].tryWaitForDequeueTurnUntil(
              turn(ticket, cap),
              popSpinCutoff_,
              (ticket % kAdaptationFreq) == 0,
              when);
    }
    return false;
  }
    auto numPops = popTicket_.load(std::memory_order_acquire);
    // ...
    const auto numPushes = pushTicket_.load(std::memory_order_acquire);
    if (numPops >= numPushes) {
      return false; // empty, or empty with pending pops
    }
    if (popTicket_.compare_exchange_strong(numPops, numPops + 1)) {
      return true;
    }
  /// Given a ticket, constructs an enqueued item using args.
  template <typename... Args>
  void enqueueWithTicketBase(
      uint64_t ticket, Slot* slots, size_t cap, int stride, Args&&... args) noexcept {
    slots[idx(ticket, cap, stride)].enqueue(
        turn(ticket, cap),
        pushSpinCutoff_,
        (ticket % kAdaptationFreq) == 0,
        std::forward<Args>(args)...);
  }

  /// Enqueues using an externally supplied ticket.
  template <typename... Args>
  void enqueueWithTicket(uint64_t ticket, Args&&... args) noexcept {
    enqueueWithTicketBase(
        ticket, slots_, capacity_, stride_, std::forward<Args>(args)...);
  }

  /// Given a ticket, dequeues the corresponding element.
  void dequeueWithTicketBase(
      uint64_t ticket, Slot* slots, size_t cap, int stride, T& elem) noexcept {
    slots[idx(ticket, cap, stride)].dequeue(
        turn(ticket, cap),
        popSpinCutoff_,
        (ticket % kAdaptationFreq) == 0,
        elem);
  }
/// SingleElementQueue implements a blocking queue that holds at most one item
/// and requires its users to assign incrementing turns to each enqueue and
/// dequeue.
template <typename T, template <typename> class Atom>
struct SingleElementQueue {
  ~SingleElementQueue() noexcept {
    if ((sequencer_.uncompletedTurnLSB() & 1) == 1) {
      // An enqueue completed without a matching dequeue, so a constructed
      // item is still present.
      destroyContents();
    }
  }
  /// enqueue using in-place noexcept construction
  template <
      typename... Args,
      typename = typename std::enable_if<
          std::is_nothrow_constructible<T, Args...>::value>::type>
  void enqueue(
      const uint32_t turn,
      Atom<uint32_t>& spinCutoff,
      const bool updateSpinCutoff,
      Args&&... args) noexcept {
    sequencer_.waitForTurn(turn * 2, spinCutoff, updateSpinCutoff);
    new (&contents_) T(std::forward<Args>(args)...);
    sequencer_.completeTurn(turn * 2);
  }
  /// enqueue using move construction, dispatched to ImplByMove or
  /// ImplByRelocation depending on T.
  template <
      typename = typename std::enable_if<
          (folly::IsRelocatable<T>::value &&
           std::is_nothrow_constructible<T>::value) ||
          std::is_nothrow_constructible<T, T&&>::value>::type>
  void enqueue(
      const uint32_t turn,
      Atom<uint32_t>& spinCutoff,
      const bool updateSpinCutoff,
      T&& goner) noexcept {
    enqueueImpl(
        turn, spinCutoff, updateSpinCutoff, std::move(goner),
        typename std::conditional<
            std::is_nothrow_constructible<T, T&&>::value,
            ImplByMove, ImplByRelocation>::type());
  }

  /// Waits until our enqueue turn arrives or the deadline passes.
  template <class Clock>
  bool tryWaitForEnqueueTurnUntil(
      const uint32_t turn,
      Atom<uint32_t>& spinCutoff,
      const bool updateSpinCutoff,
      const std::chrono::time_point<Clock>& when) noexcept {
    return sequencer_.tryWaitForTurn(
               turn * 2, spinCutoff, updateSpinCutoff, &when) !=
        TurnSequencer<Atom>::TryWaitResult::TIMEDOUT;
  }

  bool mayEnqueue(const uint32_t turn) const noexcept {
    return sequencer_.isTurn(turn * 2);
  }
  void dequeue(
      uint32_t turn,
      Atom<uint32_t>& spinCutoff,
      const bool updateSpinCutoff,
      T& elem) noexcept {
    dequeueImpl(
        turn, spinCutoff, updateSpinCutoff, elem,
        typename std::conditional<
            folly::IsRelocatable<T>::value,
            ImplByRelocation, ImplByMove>::type());
  }

  /// Waits until our dequeue turn arrives or the deadline passes.
  template <class Clock>
  bool tryWaitForDequeueTurnUntil(
      const uint32_t turn,
      Atom<uint32_t>& spinCutoff,
      const bool updateSpinCutoff,
      const std::chrono::time_point<Clock>& when) noexcept {
    return sequencer_.tryWaitForTurn(
               turn * 2 + 1, spinCutoff, updateSpinCutoff, &when) !=
        TurnSequencer<Atom>::TryWaitResult::TIMEDOUT;
  }

  bool mayDequeue(const uint32_t turn) const noexcept {
    return sequencer_.isTurn(turn * 2 + 1);
  }
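// --- Reader's note (not from the header) -------------------------------------
// Each slot owns a TurnSequencer in which even turns are pushes and odd turns
// are pops: the n-th enqueue into a slot waits for turn 2*n and the matching
// dequeue waits for turn 2*n + 1, so writers and readers of one slot strictly
// alternate. For a queue of capacity C, ticket t uses n = turn(t, C) = t / C,
// the number of complete laps already made around the slot array.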
  T* ptr() noexcept {
    return static_cast<T*>(static_cast<void*>(&contents_));
  }

  void destroyContents() noexcept {
    try {
      ptr()->~T();
    } catch (...) {
      // not much we can do about an exception from a destructor
    }
#ifndef NDEBUG
    // Scribble over the dead object to make use-after-free bugs visible.
    memset(&contents_, 'Q', sizeof(T));
#endif
  }
  /// enqueue using nothrow move construction.
  void enqueueImpl(
      const uint32_t turn,
      Atom<uint32_t>& spinCutoff,
      const bool updateSpinCutoff,
      T&& goner,
      ImplByMove) noexcept {
    sequencer_.waitForTurn(turn * 2, spinCutoff, updateSpinCutoff);
    new (&contents_) T(std::move(goner));
    sequencer_.completeTurn(turn * 2);
  }

  /// enqueue by simulating a nothrow move with relocation (memcpy), followed
  /// by default construction of the source.
  void enqueueImpl(
      const uint32_t turn,
      Atom<uint32_t>& spinCutoff,
      const bool updateSpinCutoff,
      T&& goner,
      ImplByRelocation) noexcept {
    sequencer_.waitForTurn(turn * 2, spinCutoff, updateSpinCutoff);
    memcpy(&contents_, &goner, sizeof(T));
    sequencer_.completeTurn(turn * 2);
    new (&goner) T();
  }
  /// dequeue by destruction followed by relocation; preferred because the
  /// destructor can run before waiting for the turn.
  void dequeueImpl(
      uint32_t turn,
      Atom<uint32_t>& spinCutoff,
      const bool updateSpinCutoff,
      T& elem,
      ImplByRelocation) noexcept {
    elem.~T();
    sequencer_.waitForTurn(turn * 2 + 1, spinCutoff, updateSpinCutoff);
    memcpy(&elem, &contents_, sizeof(T));
    sequencer_.completeTurn(turn * 2 + 1);
  }

  /// dequeue by nothrow move assignment.
  void dequeueImpl(
      uint32_t turn,
      Atom<uint32_t>& spinCutoff,
      const bool updateSpinCutoff,
      T& elem,
      ImplByMove) noexcept {
    sequencer_.waitForTurn(turn * 2 + 1, spinCutoff, updateSpinCutoff);
    elem = std::move(*ptr());
    destroyContents();
    sequencer_.completeTurn(turn * 2 + 1);
  }
};