proxygen
MPMCQueue.h
Go to the documentation of this file.
1 /*
2  * Copyright 2013-present Facebook, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 #pragma once
18 
19 #include <algorithm>
20 #include <atomic>
21 #include <cassert>
22 #include <cstring>
23 #include <limits>
24 #include <type_traits>
25 
26 #include <boost/noncopyable.hpp>
27 
28 #include <folly/Traits.h>
32 
33 namespace folly {
34 
35 namespace detail {
36 
37 template <typename T, template <typename> class Atom>
39 
40 template <typename T>
42 
44 template <typename>
46 
47 } // namespace detail
48 
// MPMCQueue<T> (primary, non-dynamic template): a bounded multi-producer,
// multi-consumer FIFO built on the ticket/turn machinery in
// detail::MPMCQueueBase.  This specialization only allocates a fixed-size
// slots array; all enqueue/dequeue logic lives in the CRTP base.
// NOTE(review): this extract is missing some interior lines of the class
// (orig. lines 107-108 and 117, presumably a friend declaration and a
// `Slot` alias); code below is kept verbatim.
102 template <
 103  typename T,
 104  template <typename> class Atom = std::atomic,
 105  bool Dynamic = false>
 106 class MPMCQueue : public detail::MPMCQueueBase<MPMCQueue<T, Atom, Dynamic>> {
 109 
 110  public:
// Allocates the fixed slots array once, sized queueCapacity plus
// 2 * kSlotPadding pad slots (one pad region on each end) so hot slots
// don't share cache lines with neighboring heap allocations.
 111  explicit MPMCQueue(size_t queueCapacity)
 112  : detail::MPMCQueueBase<MPMCQueue<T, Atom, Dynamic>>(queueCapacity) {
 113  this->stride_ = this->computeStride(queueCapacity);
 114  this->slots_ = new Slot[queueCapacity + 2 * this->kSlotPadding];
 115  }
 116 
 118 };
119 
// Dynamic MPMCQueue specialization: starts with a small slots array and
// grows it geometrically (by dmult_) up to capacity_ when producers would
// otherwise block.  The current array (dslots_/dcapacity_/dstride_) is
// protected by a seqlock packed into dstate_; superseded arrays are parked
// in closed_ so that lagging tickets issued against an old array can still
// complete against it.
// NOTE(review): several member-signature lines are missing from this
// extract (orig. lines 206, 211, 250, 258, 274, 320, 362, 408, 448, 479,
// 529, 586, 608); the orphaned bodies below are kept verbatim and flagged
// where they occur.
175 template <typename T, template <typename> class Atom>
 176 class MPMCQueue<T, Atom, true>
 177  : public detail::MPMCQueueBase<MPMCQueue<T, Atom, true>> {
 178  friend class detail::MPMCQueueBase<MPMCQueue<T, Atom, true>>;
 180 
// Snapshot of a retired slots array, retained so tickets issued before an
// expansion can still find the array they were issued against.
 181  struct ClosedArray {
 183  Slot* slots_{nullptr};
 184  size_t capacity_{0};
 185  int stride_{0};
 186  };
 187 
 188  public:
// Starts at min(kDefaultMinDynamicCapacity, queueCapacity) and expands by
// kDefaultExpansionMultiplier as needed.
 189  explicit MPMCQueue(size_t queueCapacity)
 190  : detail::MPMCQueueBase<MPMCQueue<T, Atom, true>>(queueCapacity) {
 191  size_t cap = std::min<size_t>(kDefaultMinDynamicCapacity, queueCapacity);
 192  initQueue(cap, kDefaultExpansionMultiplier);
 193  }
 194 
// Caller-tunable initial capacity and growth factor; both are clamped
// (minCapacity >= 1, expansionMultiplier >= 2) to keep expansion sane.
 195  explicit MPMCQueue(
 196  size_t queueCapacity,
 197  size_t minCapacity,
 198  size_t expansionMultiplier)
 199  : detail::MPMCQueueBase<MPMCQueue<T, Atom, true>>(queueCapacity) {
 200  minCapacity = std::max<size_t>(1, minCapacity);
 201  size_t cap = std::min<size_t>(minCapacity, queueCapacity);
 202  expansionMultiplier = std::max<size_t>(2, expansionMultiplier);
 203  initQueue(cap, expansionMultiplier);
 204  }
 205 
// NOTE(review): body of the default constructor; its declaration line
// (orig. line 206, presumably `MPMCQueue() noexcept {`) is missing from
// this extract.
 207  dmult_ = 0;
 208  closed_ = nullptr;
 209  }
 210 
// NOTE(review): body of the move constructor; its declaration line (orig.
// line 211) is missing.  Relaxed loads/stores are sufficient here because
// a queue being moved from cannot be in concurrent use.  The placement
// news re-activate the dslots_/dstride_ union members for this (dynamic)
// specialization.
 212  this->capacity_ = rhs.capacity_;
 213  new (&this->dslots_)
 214  Atom<Slot*>(rhs.dslots_.load(std::memory_order_relaxed));
 215  new (&this->dstride_)
 216  Atom<int>(rhs.dstride_.load(std::memory_order_relaxed));
 217  this->dstate_.store(
 218  rhs.dstate_.load(std::memory_order_relaxed), std::memory_order_relaxed);
 219  this->dcapacity_.store(
 220  rhs.dcapacity_.load(std::memory_order_relaxed),
 221  std::memory_order_relaxed);
 222  this->pushTicket_.store(
 223  rhs.pushTicket_.load(std::memory_order_relaxed),
 224  std::memory_order_relaxed);
 225  this->popTicket_.store(
 226  rhs.popTicket_.load(std::memory_order_relaxed),
 227  std::memory_order_relaxed);
 228  this->pushSpinCutoff_.store(
 229  rhs.pushSpinCutoff_.load(std::memory_order_relaxed),
 230  std::memory_order_relaxed);
 231  this->popSpinCutoff_.store(
 232  rhs.popSpinCutoff_.load(std::memory_order_relaxed),
 233  std::memory_order_relaxed);
 234  dmult_ = rhs.dmult_;
 235  closed_ = rhs.closed_;
 236 
// Leave rhs in a valid empty state so its destructor is harmless.
 237  rhs.capacity_ = 0;
 238  rhs.dslots_.store(nullptr, std::memory_order_relaxed);
 239  rhs.dstride_.store(0, std::memory_order_relaxed);
 240  rhs.dstate_.store(0, std::memory_order_relaxed);
 241  rhs.dcapacity_.store(0, std::memory_order_relaxed);
 242  rhs.pushTicket_.store(0, std::memory_order_relaxed);
 243  rhs.popTicket_.store(0, std::memory_order_relaxed);
 244  rhs.pushSpinCutoff_.store(0, std::memory_order_relaxed);
 245  rhs.popSpinCutoff_.store(0, std::memory_order_relaxed);
 246  rhs.dmult_ = 0;
 247  rhs.closed_ = nullptr;
 248  }
 249 
// NOTE(review): body of the move-assignment operator; its declaration line
// (orig. line 250) is missing.  Implemented as destroy + placement-new
// move-construct, so it must not be used concurrently.
 251  if (this != &rhs) {
 252  this->~MPMCQueue();
 253  new (this) MPMCQueue(std::move(rhs));
 254  }
 255  return *this;
 256  }
 257 
// NOTE(review): body of the destructor; its declaration line (orig. line
// 258) is missing.  Frees every retired slots array, then hands the live
// array to ~MPMCQueueBase via slots_.
 259  if (closed_ != nullptr) {
 260  for (int i = getNumClosed(this->dstate_.load()) - 1; i >= 0; --i) {
 261  delete[] closed_[i].slots_;
 262  }
 263  delete[] closed_;
 264  }
 265  using AtomInt = Atom<int>;
 266  this->dstride_.~AtomInt();
 267  using AtomSlot = Atom<Slot*>;
 268  // Sort of a hack to get ~MPMCQueueBase to free dslots_
 269  auto slots = this->dslots_.load();
 270  this->dslots_.~AtomSlot();
 271  this->slots_ = slots;
 272  }
 273 
// NOTE(review): body of an accessor (orig. line 274 declaration missing;
// presumably `size_t allocatedCapacity() const noexcept {`) returning the
// capacity of the currently-active slots array.
 275  return this->dcapacity_.load(std::memory_order_relaxed);
 276  }
 277 
// Blocking enqueue.  Takes a push ticket eagerly, then decides whether the
// ticket can proceed on the current array, belongs to a closed array, or
// warrants triggering an expansion before committing to (possibly) block
// in enqueueWithTicketBase.
// NOTE(review): one line is missing inside the retry loop (orig. line 288,
// between the failed seqlock read and `continue`).
 278  template <typename... Args>
 279  void blockingWrite(Args&&... args) noexcept {
 280  uint64_t ticket = this->pushTicket_++;
 281  Slot* slots;
 282  size_t cap;
 283  int stride;
 284  uint64_t state;
 285  uint64_t offset;
 286  do {
 287  if (!trySeqlockReadSection(state, slots, cap, stride)) {
 289  continue;
 290  }
 291  if (maybeUpdateFromClosed(state, ticket, offset, slots, cap, stride)) {
 292  // There was an expansion after this ticket was issued.
 293  break;
 294  }
 295  if (slots[this->idx((ticket - offset), cap, stride)].mayEnqueue(
 296  this->turn(ticket - offset, cap))) {
 297  // A slot is ready. No need to expand.
 298  break;
 299  } else if (
 300  this->popTicket_.load(std::memory_order_relaxed) + cap > ticket) {
 301  // May block, but a pop is in progress. No need to expand.
 302  // Get seqlock read section info again in case an expansion
 303  // occurred with an equal or higher ticket.
 304  continue;
 305  } else {
 306  // May block. See if we can expand.
 307  if (tryExpand(state, cap)) {
 308  // This or another thread started an expansion. Get updated info.
 309  continue;
 310  } else {
 311  // Can't expand.
 312  break;
 313  }
 314  }
 315  } while (true);
 316  this->enqueueWithTicketBase(
 317  ticket - offset, slots, cap, stride, std::forward<Args>(args)...);
 318  }
 319 
// NOTE(review): body of the blocking-read-with-ticket member; its
// declaration line (orig. line 320) is missing.  Takes a pop ticket, reads
// the current array under the seqlock, and redirects to a closed array if
// the ticket predates an expansion.
 321  ticket = this->popTicket_++;
 322  Slot* slots;
 323  size_t cap;
 324  int stride;
 325  uint64_t state;
 326  uint64_t offset;
 327  while (!trySeqlockReadSection(state, slots, cap, stride)) {
 329  }
 330  // If there was an expansion after the corresponding push ticket
 331  // was issued, adjust accordingly
 332  maybeUpdateFromClosed(state, ticket, offset, slots, cap, stride);
 333  this->dequeueWithTicketBase(ticket - offset, slots, cap, stride, elem);
 334  }
 335 
 336  private:
 337  enum {
 338  kSeqlockBits = 6,
 339  kDefaultMinDynamicCapacity = 10,
 340  kDefaultExpansionMultiplier = 10,
 341  };
 342 
// Growth multiplier applied on each expansion (0 once moved-from).
 343  size_t dmult_;
 344 
 345  // Info about closed slots arrays for use by lagging operations
 346  ClosedArray* closed_;
 347 
// Shared constructor tail: activates the dynamic union members, allocates
// the initial slots array, and pre-sizes closed_ for the maximum number of
// expansions possible from cap up to capacity_ with multiplier mult.
 348  void initQueue(const size_t cap, const size_t mult) {
 349  new (&this->dstride_) Atom<int>(this->computeStride(cap));
 350  Slot* slots = new Slot[cap + 2 * this->kSlotPadding];
 351  new (&this->dslots_) Atom<Slot*>(slots);
 352  this->dstate_.store(0);
 353  this->dcapacity_.store(cap);
 354  dmult_ = mult;
 355  size_t maxClosed = 0;
 356  for (size_t expanded = cap; expanded < this->capacity_; expanded *= mult) {
 357  ++maxClosed;
 358  }
 359  closed_ = (maxClosed > 0) ? new ClosedArray[maxClosed] : nullptr;
 360  }
 361 
// NOTE(review): parameters/body of tryObtainReadyPushTicket (declaration
// line, orig. 362, is missing; the trailing cross-reference index confirms
// the name).  Non-blocking push-ticket acquisition: succeeds only when the
// target slot can be enqueued immediately; may trigger an expansion when
// it would otherwise block on the live (non-closed) array.  One line is
// missing after the failed seqlock read (orig. line 371).
 363  uint64_t& ticket,
 364  Slot*& slots,
 365  size_t& cap,
 366  int& stride) noexcept {
 367  uint64_t state;
 368  do {
 369  ticket = this->pushTicket_.load(std::memory_order_acquire); // A
 370  if (!trySeqlockReadSection(state, slots, cap, stride)) {
 372  continue;
 373  }
 374 
 375  // If there was an expansion with offset greater than this ticket,
 376  // adjust accordingly
 377  uint64_t offset;
 378  maybeUpdateFromClosed(state, ticket, offset, slots, cap, stride);
 379 
 380  if (slots[this->idx((ticket - offset), cap, stride)].mayEnqueue(
 381  this->turn(ticket - offset, cap))) {
 382  // A slot is ready.
 383  if (this->pushTicket_.compare_exchange_strong(ticket, ticket + 1)) {
 384  // Adjust ticket
 385  ticket -= offset;
 386  return true;
 387  } else {
 388  continue;
 389  }
 390  } else {
 391  if (ticket != this->pushTicket_.load(std::memory_order_relaxed)) { // B
 392  // Try again. Ticket changed.
 393  continue;
 394  }
 395  // Likely to block.
 396  // Try to expand unless the ticket is for a closed array
 397  if (offset == getOffset(state)) {
 398  if (tryExpand(state, cap)) {
 399  // This or another thread started an expansion. Get up-to-date info.
 400  continue;
 401  }
 402  }
 403  return false;
 404  }
 405  } while (true);
 406  }
 407 
// NOTE(review): parameters/body of tryObtainPromisedPushTicket
// (declaration line, orig. 408, missing; confirmed by the trailing index).
// Succeeds when the queue is not full (a matching pop is promised even if
// not yet complete); expands instead of failing when possible.  One line
// is missing after the failed seqlock read (orig. line 418).
 409  uint64_t& ticket,
 410  Slot*& slots,
 411  size_t& cap,
 412  int& stride) noexcept {
 413  uint64_t state;
 414  do {
 415  ticket = this->pushTicket_.load(std::memory_order_acquire);
 416  auto numPops = this->popTicket_.load(std::memory_order_acquire);
 417  if (!trySeqlockReadSection(state, slots, cap, stride)) {
 419  continue;
 420  }
 421 
 422  const auto curCap = cap;
 423  // If there was an expansion with offset greater than this ticket,
 424  // adjust accordingly
 425  uint64_t offset;
 426  maybeUpdateFromClosed(state, ticket, offset, slots, cap, stride);
 427 
 428  int64_t n = ticket - numPops;
 429 
 430  if (n >= static_cast<ssize_t>(cap)) {
 431  if ((cap == curCap) && tryExpand(state, cap)) {
 432  // This or another thread started an expansion. Start over.
 433  continue;
 434  }
 435  // Can't expand.
 436  ticket -= offset;
 437  return false;
 438  }
 439 
 440  if (this->pushTicket_.compare_exchange_strong(ticket, ticket + 1)) {
 441  // Adjust ticket
 442  ticket -= offset;
 443  return true;
 444  }
 445  } while (true);
 446  }
 447 
// NOTE(review): parameters/body of tryObtainReadyPopTicket (declaration
// line, orig. 448, missing).  Non-blocking pop-ticket acquisition; one
// line missing after the failed seqlock read (orig. line 457).
 449  uint64_t& ticket,
 450  Slot*& slots,
 451  size_t& cap,
 452  int& stride) noexcept {
 453  uint64_t state;
 454  do {
 455  ticket = this->popTicket_.load(std::memory_order_relaxed);
 456  if (!trySeqlockReadSection(state, slots, cap, stride)) {
 458  continue;
 459  }
 460 
 461  // If there was an expansion after the corresponding push ticket
 462  // was issued, adjust accordingly
 463  uint64_t offset;
 464  maybeUpdateFromClosed(state, ticket, offset, slots, cap, stride);
 465 
 466  if (slots[this->idx((ticket - offset), cap, stride)].mayDequeue(
 467  this->turn(ticket - offset, cap))) {
 468  if (this->popTicket_.compare_exchange_strong(ticket, ticket + 1)) {
 469  // Adjust ticket
 470  ticket -= offset;
 471  return true;
 472  }
 473  } else {
 474  return false;
 475  }
 476  } while (true);
 477  }
 478 
// NOTE(review): parameters/body of tryObtainPromisedPopTicket (declaration
// line, orig. 479, missing; confirmed by the trailing index).  Succeeds
// when the queue is non-empty (a matching push is promised); one line
// missing after the failed seqlock read (orig. line 489).
 480  uint64_t& ticket,
 481  Slot*& slots,
 482  size_t& cap,
 483  int& stride) noexcept {
 484  uint64_t state;
 485  do {
 486  ticket = this->popTicket_.load(std::memory_order_acquire);
 487  auto numPushes = this->pushTicket_.load(std::memory_order_acquire);
 488  if (!trySeqlockReadSection(state, slots, cap, stride)) {
 490  continue;
 491  }
 492 
 493  uint64_t offset;
 494  // If there was an expansion after the corresponding push
 495  // ticket was issued, adjust accordingly
 496  maybeUpdateFromClosed(state, ticket, offset, slots, cap, stride);
 497 
 498  if (ticket >= numPushes) {
 499  ticket -= offset;
 500  return false;
 501  }
 502  if (this->popTicket_.compare_exchange_strong(ticket, ticket + 1)) {
 503  ticket -= offset;
 504  return true;
 505  }
 506  } while (true);
 507  }
 508 
// Enqueue with an externally-obtained ticket: resolves which array the
// ticket belongs to (live or closed) before delegating to the base.
 510  template <typename... Args>
 511  void enqueueWithTicket(const uint64_t ticket, Args&&... args) noexcept {
 512  Slot* slots;
 513  size_t cap;
 514  int stride;
 515  uint64_t state;
 516  uint64_t offset;
 517 
 518  while (!trySeqlockReadSection(state, slots, cap, stride)) {
 519  }
 520 
 521  // If there was an expansion after this ticket was issued, adjust
 522  // accordingly
 523  maybeUpdateFromClosed(state, ticket, offset, slots, cap, stride);
 524 
 525  this->enqueueWithTicketBase(
 526  ticket - offset, slots, cap, stride, std::forward<Args>(args)...);
 527  }
 528 
// NOTE(review): body of getOffset (declaration line, orig. 529, missing;
// confirmed by the trailing index).  The upper bits of dstate_ hold the
// ticket offset of the live array; the low kSeqlockBits hold the seqlock
// bit and the closed-array count.
 530  return state >> kSeqlockBits;
 531  }
 532 
 533  int getNumClosed(const uint64_t state) const noexcept {
 534  return (state & ((1 << kSeqlockBits) - 1)) >> 1;
 535  }
 536 
// Attempts to expand the slots array.  Exactly one thread wins the
// seqlock CAS and performs the expansion; losers return true so the
// caller re-reads the (now changing) state.  Returns false only when
// already at full capacity or the allocation fails.
 541  bool tryExpand(const uint64_t state, const size_t cap) noexcept {
 542  if (cap == this->capacity_) {
 543  return false;
 544  }
 545  // Acquire seqlock
 546  uint64_t oldval = state;
 547  assert((state & 1) == 0);
 548  if (this->dstate_.compare_exchange_strong(oldval, state + 1)) {
 549  assert(cap == this->dcapacity_.load());
 550  uint64_t ticket =
 551  1 + std::max(this->pushTicket_.load(), this->popTicket_.load());
 552  size_t newCapacity = std::min(dmult_ * cap, this->capacity_);
 553  Slot* newSlots =
 554  new (std::nothrow) Slot[newCapacity + 2 * this->kSlotPadding];
 555  if (newSlots == nullptr) {
 556  // Expansion failed. Restore the seqlock
 557  this->dstate_.store(state);
 558  return false;
 559  }
 560  // Successful expansion
 561  // calculate the current ticket offset
 562  uint64_t offset = getOffset(state);
 563  // calculate index in closed array
 564  int index = getNumClosed(state);
 565  assert((index << 1) < (1 << kSeqlockBits));
 566  // fill the info for the closed slots array
 567  closed_[index].offset_ = offset;
 568  closed_[index].slots_ = this->dslots_.load();
 569  closed_[index].capacity_ = cap;
 570  closed_[index].stride_ = this->dstride_.load();
 571  // update the new slots array info
 572  this->dslots_.store(newSlots);
 573  this->dcapacity_.store(newCapacity);
 574  this->dstride_.store(this->computeStride(newCapacity));
 575  // Release the seqlock and record the new ticket offset
 576  this->dstate_.store((ticket << kSeqlockBits) + (2 * (index + 1)));
 577  return true;
 578  } else { // failed to acquire seqlock
 579  // Someone acquired the seqlock. Go back to the caller and get
 580  // up-to-date info.
 581  return true;
 582  }
 583  }
 584 
// NOTE(review): parameters/body of trySeqlockReadSection (declaration
// line, orig. 586, missing).  Standard seqlock read: fail fast if the
// writer bit is set, read the array info, then re-check that dstate_ is
// unchanged (acquire fence orders the re-check after the reads).
 587  uint64_t& state,
 588  Slot*& slots,
 589  size_t& cap,
 590  int& stride) noexcept {
 591  state = this->dstate_.load(std::memory_order_acquire);
 592  if (state & 1) {
 593  // Locked.
 594  return false;
 595  }
 596  // Start read-only section.
 597  slots = this->dslots_.load(std::memory_order_relaxed);
 598  cap = this->dcapacity_.load(std::memory_order_relaxed);
 599  stride = this->dstride_.load(std::memory_order_relaxed);
 600  // End of read-only section. Validate seqlock.
 601  std::atomic_thread_fence(std::memory_order_acquire);
 602  return (state == this->dstate_.load(std::memory_order_relaxed));
 603  }
 604 
// NOTE(review): parameters/body of maybeUpdateFromClosed (declaration
// line, orig. 608, missing).  If the ticket predates the live array's
// offset, scan closed arrays newest-to-oldest for the one the ticket was
// issued against and return its info; returns true iff redirected.
 609  const uint64_t state,
 610  const uint64_t ticket,
 611  uint64_t& offset,
 612  Slot*& slots,
 613  size_t& cap,
 614  int& stride) noexcept {
 615  offset = getOffset(state);
 616  if (ticket >= offset) {
 617  return false;
 618  }
 619  for (int i = getNumClosed(state) - 1; i >= 0; --i) {
 620  offset = closed_[i].offset_;
 621  if (offset <= ticket) {
 622  slots = closed_[i].slots_;
 623  cap = closed_[i].capacity_;
 624  stride = closed_[i].stride_;
 625  return true;
 626  }
 627  }
 628  // A closed array with offset <= ticket should have been found
 629  assert(false);
 630  return false;
 631  }
 632 };
633 
634 namespace detail {
635 
// CRTP base holding all shared MPMCQueue state and algorithms: the ticket
// dispensers (pushTicket_/popTicket_), the slots array, the stride used to
// scatter adjacent tickets across cache lines, and the four ticket-
// acquisition strategies (ready vs. promised, push vs. pop).  Derived
// classes override the tryObtain* hooks via static_cast dispatch.
// NOTE(review): numerous declaration lines are missing from this extract
// (flagged inline); code is kept verbatim.
637 template <
 638  template <typename T, template <typename> class Atom, bool Dynamic>
 639  class Derived,
 640  typename T,
 641  template <typename> class Atom,
 642  bool Dynamic>
643 class MPMCQueueBase<Derived<T, Atom, Dynamic>> : boost::noncopyable {
 644  // Note: Using CRTP static casts in several functions of this base
 645  // template instead of making called functions virtual or duplicating
 646  // the code of calling functions in the derived partially specialized
 647  // template
 648 
// NOTE(review): the condition of this static_assert (orig. lines 650-651)
// is missing; only its message survives.
 649  static_assert(
 652  "T must be relocatable or have a noexcept move constructor");
 653 
 654  public:
 655  typedef T value_type;
 656 
 658 
// Rejects capacity 0 (stride computation would divide by zero) and
// sanity-checks that the alignas padding actually separated the push and
// pop tickets onto different cache lines.
// NOTE(review): the first assert's condition (orig. line 676) is missing.
 659  explicit MPMCQueueBase(size_t queueCapacity)
 660  : capacity_(queueCapacity),
 661  dstate_(0),
 662  dcapacity_(0),
 663  pushTicket_(0),
 664  popTicket_(0),
 665  pushSpinCutoff_(0),
 666  popSpinCutoff_(0) {
 667  if (queueCapacity == 0) {
 668  throw std::invalid_argument(
 669  "MPMCQueue with explicit capacity 0 is impossible"
 670  // Stride computation in derived classes would sigfpe if capacity is 0
 671  );
 672  }
 673 
 674  // ideally this would be a static assert, but g++ doesn't allow it
 675  assert(
 677  assert(
 678  static_cast<uint8_t*>(static_cast<void*>(&popTicket_)) -
 679  static_cast<uint8_t*>(static_cast<void*>(&pushTicket_)) >=
 680  static_cast<ptrdiff_t>(hardware_destructive_interference_size));
 681  }
 682 
// NOTE(review): initializer list of the default constructor; its
// declaration line (orig. line 685) is missing.
 686  : capacity_(0),
 687  slots_(nullptr),
 688  stride_(0),
 689  dstate_(0),
 690  dcapacity_(0),
 691  pushTicket_(0),
 692  popTicket_(0),
 693  pushSpinCutoff_(0),
 694  popSpinCutoff_(0) {}
 695 
 699  MPMCQueueBase(MPMCQueueBase<Derived<T, Atom, Dynamic>>&& rhs) noexcept
 700  : capacity_(rhs.capacity_),
 701  slots_(rhs.slots_),
 702  stride_(rhs.stride_),
 703  dstate_(rhs.dstate_.load(std::memory_order_relaxed)),
 704  dcapacity_(rhs.dcapacity_.load(std::memory_order_relaxed)),
 705  pushTicket_(rhs.pushTicket_.load(std::memory_order_relaxed)),
 706  popTicket_(rhs.popTicket_.load(std::memory_order_relaxed)),
 707  pushSpinCutoff_(rhs.pushSpinCutoff_.load(std::memory_order_relaxed)),
 708  popSpinCutoff_(rhs.popSpinCutoff_.load(std::memory_order_relaxed)) {
 709  // relaxed ops are okay for the previous reads, since rhs queue can't
 710  // be in concurrent use
 711 
 712  // zero out rhs
 713  rhs.capacity_ = 0;
 714  rhs.slots_ = nullptr;
 715  rhs.stride_ = 0;
 716  rhs.dstate_.store(0, std::memory_order_relaxed);
 717  rhs.dcapacity_.store(0, std::memory_order_relaxed);
 718  rhs.pushTicket_.store(0, std::memory_order_relaxed);
 719  rhs.popTicket_.store(0, std::memory_order_relaxed);
 720  rhs.pushSpinCutoff_.store(0, std::memory_order_relaxed);
 721  rhs.popSpinCutoff_.store(0, std::memory_order_relaxed);
 722  }
 723 
// NOTE(review): move-assignment operator; its declaration line (orig.
// lines 727) is missing.  Destroy + placement-new, not concurrency-safe.
 728  MPMCQueueBase<Derived<T, Atom, Dynamic>>&& rhs) {
 729  if (this != &rhs) {
 730  this->~MPMCQueueBase();
 731  new (this) MPMCQueueBase(std::move(rhs));
 732  }
 733  return *this;
 734  }
 735 
// NOTE(review): destructor body; its declaration line (orig. line 738) is
// missing.  slots_ owns the array (the dynamic derived destructor arranges
// for this to also free dslots_).
 739  delete[] slots_;
 740  }
 741 
 751  ssize_t size() const noexcept {
 752  // since both pushes and pops increase monotonically, we can get a
 753  // consistent snapshot either by bracketing a read of popTicket_ with
 754  // two reads of pushTicket_ that return the same value, or the other
 755  // way around. We maximize our chances by alternately attempting
 756  // both bracketings.
 757  uint64_t pushes = pushTicket_.load(std::memory_order_acquire); // A
 758  uint64_t pops = popTicket_.load(std::memory_order_acquire); // B
 759  while (true) {
 760  uint64_t nextPushes = pushTicket_.load(std::memory_order_acquire); // C
 761  if (pushes == nextPushes) {
 762  // pushTicket_ didn't change from A (or the previous C) to C,
 763  // so we can linearize at B (or D)
 764  return ssize_t(pushes - pops);
 765  }
 766  pushes = nextPushes;
 767  uint64_t nextPops = popTicket_.load(std::memory_order_acquire); // D
 768  if (pops == nextPops) {
 769  // popTicket_ didn't change from B (or the previous D), so we
 770  // can linearize at C
 771  return ssize_t(pushes - pops);
 772  }
 773  pops = nextPops;
 774  }
 775  }
 776 
// NOTE(review): the following accessor bodies are missing their
// declaration lines (orig. lines 778, 783, 790-794, 798-799, 803-804,
// 808-811, 815-818); by their bodies they are, in order: an emptiness
// check, a fullness check, a pending-reader count, capacity accessors,
// and the raw push/pop ticket counters.
 779  return size() <= 0;
 780  }
 781 
 784  // careful with signed -> unsigned promotion, since size can be negative
 785  return size() >= static_cast<ssize_t>(capacity_);
 786  }
 787 
 795  return writeCount() - readCount();
 796  }
 797 
 800  return capacity_;
 801  }
 802 
 805  return capacity_;
 806  }
 807 
 812  return pushTicket_.load(std::memory_order_acquire);
 813  }
 814 
 819  return popTicket_.load(std::memory_order_acquire);
 820  }
 821 
// Blocking enqueue against the fixed array: take a ticket unconditionally
// and wait for the slot's turn.
 827  template <typename... Args>
 828  void blockingWrite(Args&&... args) noexcept {
 829  enqueueWithTicketBase(
 830  pushTicket_++, slots_, capacity_, stride_, std::forward<Args>(args)...);
 831  }
 832 
// Non-blocking enqueue: only proceeds with a pre-validated "ready" ticket.
// NOTE(review): the ticket declaration line (orig. line 848) is missing.
 846  template <typename... Args>
 847  bool write(Args&&... args) noexcept {
 849  Slot* slots;
 850  size_t cap;
 851  int stride;
 852  if (static_cast<Derived<T, Atom, Dynamic>*>(this)->tryObtainReadyPushTicket(
 853  ticket, slots, cap, stride)) {
 854  // we have pre-validated that the ticket won't block
 855  enqueueWithTicketBase(
 856  ticket, slots, cap, stride, std::forward<Args>(args)...);
 857  return true;
 858  } else {
 859  return false;
 860  }
 861  }
 862 
// Deadline-bounded enqueue.  NOTE(review): the function name line (orig.
// line 864, presumably `bool tryWriteUntil(`) and the ticket declaration
// (orig. line 867) are missing.
 863  template <class Clock, typename... Args>
 865  const std::chrono::time_point<Clock>& when,
 866  Args&&... args) noexcept {
 868  Slot* slots;
 869  size_t cap;
 870  int stride;
 871  if (tryObtainPromisedPushTicketUntil(ticket, slots, cap, stride, when)) {
 872  // we have pre-validated that the ticket won't block, or rather that
 873  // it won't block longer than it takes another thread to dequeue an
 874  // element from the slot it identifies.
 875  enqueueWithTicketBase(
 876  ticket, slots, cap, stride, std::forward<Args>(args)...);
 877  return true;
 878  } else {
 879  return false;
 880  }
 881  }
 882 
// Enqueue that fails only when the queue is full (may briefly wait for an
// in-progress pop).  NOTE(review): the ticket declaration (orig. line 898)
// is missing.
 896  template <typename... Args>
 897  bool writeIfNotFull(Args&&... args) noexcept {
 899  Slot* slots;
 900  size_t cap;
 901  int stride;
 902  if (static_cast<Derived<T, Atom, Dynamic>*>(this)
 903  ->tryObtainPromisedPushTicket(ticket, slots, cap, stride)) {
 904  // some other thread is already dequeuing the slot into which we
 905  // are going to enqueue, but we might have to wait for them to finish
 906  enqueueWithTicketBase(
 907  ticket, slots, cap, stride, std::forward<Args>(args)...);
 908  return true;
 909  } else {
 910  return false;
 911  }
 912  }
 913 
// Blocking dequeue, dispatched through the derived class so the dynamic
// queue can redirect lagging tickets.  NOTE(review): the ticket
// declaration (orig. line 917) is missing.
 916  void blockingRead(T& elem) noexcept {
 918  static_cast<Derived<T, Atom, Dynamic>*>(this)->blockingReadWithTicket(
 919  ticket, elem);
 920  }
 921 
// NOTE(review): body of blockingReadWithTicket (declaration line, orig.
// 922-923, missing; confirmed by the trailing index).
 924  assert(capacity_ != 0);
 925  ticket = popTicket_++;
 926  dequeueWithTicketBase(ticket, slots_, capacity_, stride_, elem);
 927  }
 928 
// Non-blocking dequeue.  NOTE(review): the ticket declaration (orig. line
// 932) is missing.
 931  bool read(T& elem) noexcept {
 933  return readAndGetTicket(ticket, elem);
 934  }
 935 
// NOTE(review): body of readAndGetTicket (declaration lines, orig.
// 936-937, missing).
 938  Slot* slots;
 939  size_t cap;
 940  int stride;
 941  if (static_cast<Derived<T, Atom, Dynamic>*>(this)->tryObtainReadyPopTicket(
 942  ticket, slots, cap, stride)) {
 943  // the ticket has been pre-validated to not block
 944  dequeueWithTicketBase(ticket, slots, cap, stride, elem);
 945  return true;
 946  } else {
 947  return false;
 948  }
 949  }
 950 
// Deadline-bounded dequeue.  NOTE(review): the function name line (orig.
// 952, `bool tryReadUntil(` per the trailing index) and ticket declaration
// (orig. 955) are missing.
 951  template <class Clock, typename... Args>
 953  const std::chrono::time_point<Clock>& when,
 954  T& elem) noexcept {
 956  Slot* slots;
 957  size_t cap;
 958  int stride;
 959  if (tryObtainPromisedPopTicketUntil(ticket, slots, cap, stride, when)) {
 960  // we have pre-validated that the ticket won't block, or rather that
 961  // it won't block longer than it takes another thread to enqueue an
 962  // element on the slot it identifies.
 963  dequeueWithTicketBase(ticket, slots, cap, stride, elem);
 964  return true;
 965  } else {
 966  return false;
 967  }
 968  }
 969 
// Dequeue that fails only when the queue is empty (may briefly wait for an
// in-progress push).  NOTE(review): the ticket declaration (orig. line
// 976) is missing.
 975  bool readIfNotEmpty(T& elem) noexcept {
 977  Slot* slots;
 978  size_t cap;
 979  int stride;
 980  if (static_cast<Derived<T, Atom, Dynamic>*>(this)
 981  ->tryObtainPromisedPopTicket(ticket, slots, cap, stride)) {
 982  // the matching enqueue already has a ticket, but might not be done
 983  dequeueWithTicketBase(ticket, slots, cap, stride, elem);
 984  return true;
 985  } else {
 986  return false;
 987  }
 988  }
 989 
 990  protected:
 991  enum {
 994  kAdaptationFreq = 128,
 995 
// kSlotPadding: number of whole Slots needed to cover one cache line, so
// the real slots are insulated from adjacent allocations.
 999  kSlotPadding =
 1000  (hardware_destructive_interference_size - 1) / sizeof(Slot) + 1
 1001  };
 1002 
 1004  alignas(hardware_destructive_interference_size) size_t capacity_;
 1005 
// The unions let the non-dynamic queue use plain slots_/stride_ while the
// dynamic specialization activates the atomic dslots_/dstride_ members.
// NOTE(review): the slots_ member line (orig. ~1011) is missing from the
// first union in this extract.
 1007  union {
 1013  Atom<Slot*> dslots_;
 1014  };
 1015 
 1017  union {
 1021  int stride_;
 1023  Atom<int> dstride_;
 1024  };
 1025 
// Seqlock state and current capacity for the dynamic specialization.
 1032  Atom<uint64_t> dstate_;
 1034  Atom<size_t> dcapacity_;
 1035 
 1037  alignas(hardware_destructive_interference_size) Atom<uint64_t> pushTicket_;
 1038 
 1040  alignas(hardware_destructive_interference_size) Atom<uint64_t> popTicket_;
 1041 
 1045  alignas(
 1046  hardware_destructive_interference_size) Atom<uint32_t> pushSpinCutoff_;
 1047 
 1049  alignas(hardware_destructive_interference_size) Atom<uint32_t> popSpinCutoff_;
 1050 
// Trailing pad so popSpinCutoff_ doesn't share a cache line with whatever
// is allocated after the queue.
 1053  char pad_[hardware_destructive_interference_size - sizeof(Atom<uint32_t>)];
 1054 
// Picks a small prime stride, coprime with capacity, that maximally
// separates consecutive tickets in the slots array (reduces false
// sharing between adjacent tickets).  Falls back to 1 when no prime
// is coprime with capacity.
 1074  static int computeStride(size_t capacity) noexcept {
 1075  static const int smallPrimes[] = {2, 3, 5, 7, 11, 13, 17, 19, 23};
 1076 
 1077  int bestStride = 1;
 1078  size_t bestSep = 1;
 1079  for (int stride : smallPrimes) {
 1080  if ((stride % capacity) == 0 || (capacity % stride) == 0) {
 1081  continue;
 1082  }
 1083  size_t sep = stride % capacity;
 1084  sep = std::min(sep, capacity - sep);
 1085  if (sep > bestSep) {
 1086  bestStride = stride;
 1087  bestSep = sep;
 1088  }
 1089  }
 1090  return bestStride;
 1091  }
 1092 
// Maps a ticket to its slot index (stride-scattered, shifted past the
// leading pad slots).
 1095  size_t idx(uint64_t ticket, size_t cap, int stride) noexcept {
 1096  return ((ticket * stride) % cap) + kSlotPadding;
 1097  }
 1098 
// NOTE(review): body of turn() (declaration lines, orig. 1099-1101,
// missing): the number of times the ticket's slot has been reused.
 1102  assert(cap != 0);
 1103  return uint32_t(ticket / cap);
 1104  }
 1105 
// NOTE(review): parameters/body of tryObtainReadyPushTicket (declaration
// line, orig. 1109, missing; confirmed by the trailing index).  CAS-based
// non-blocking push-ticket acquisition with the mayEnqueue check bracketed
// by two ticket reads, as the inline comments explain.
 1110  uint64_t& ticket,
 1111  Slot*& slots,
 1112  size_t& cap,
 1113  int& stride) noexcept {
 1114  ticket = pushTicket_.load(std::memory_order_acquire); // A
 1115  slots = slots_;
 1116  cap = capacity_;
 1117  stride = stride_;
 1118  while (true) {
 1119  if (!slots[idx(ticket, cap, stride)].mayEnqueue(turn(ticket, cap))) {
 1120  // if we call enqueue(ticket, ...) on the SingleElementQueue
 1121  // right now it would block, but this might no longer be the next
 1122  // ticket. We can increase the chance of tryEnqueue success under
 1123  // contention (without blocking) by rechecking the ticket dispenser
 1124  auto prev = ticket;
 1125  ticket = pushTicket_.load(std::memory_order_acquire); // B
 1126  if (prev == ticket) {
 1127  // mayEnqueue was bracketed by two reads (A or prev B or prev
 1128  // failing CAS to B), so we are definitely unable to enqueue
 1129  return false;
 1130  }
 1131  } else {
 1132  // we will bracket the mayEnqueue check with a read (A or prev B
 1133  // or prev failing CAS) and the following CAS. If the CAS fails
 1134  // it will effect a load of pushTicket_
 1135  if (pushTicket_.compare_exchange_strong(ticket, ticket + 1)) {
 1136  return true;
 1137  }
 1138  }
 1139  }
 1140  }
 1141 
// Deadline-bounded promised-push-ticket acquisition.  NOTE(review): the
// function name line (orig. 1147, presumably
// `bool tryObtainPromisedPushTicketUntil(`) is missing.
 1146  template <class Clock>
 1148  uint64_t& ticket,
 1149  Slot*& slots,
 1150  size_t& cap,
 1151  int& stride,
 1152  const std::chrono::time_point<Clock>& when) noexcept {
 1153  bool deadlineReached = false;
 1154  while (!deadlineReached) {
 1155  if (static_cast<Derived<T, Atom, Dynamic>*>(this)
 1156  ->tryObtainPromisedPushTicket(ticket, slots, cap, stride)) {
 1157  return true;
 1158  }
 1159  // ticket is a blocking ticket until the preceding ticket has been
 1160  // processed: wait until this ticket's turn arrives. We have not reserved
 1161  // this ticket so we will have to re-attempt to get a non-blocking ticket
 1162  // if we wake up before we time-out.
 1163  deadlineReached =
 1164  !slots[idx(ticket, cap, stride)].tryWaitForEnqueueTurnUntil(
 1165  turn(ticket, cap),
 1166  pushSpinCutoff_,
 1167  (ticket % kAdaptationFreq) == 0,
 1168  when);
 1169  }
 1170  return false;
 1171  }
 1172 
// NOTE(review): parameters/body of tryObtainPromisedPushTicket
// (declaration line, orig. 1178, missing; confirmed by the trailing
// index).  Succeeds iff the queue is not full, linearizing at B.
 1179  uint64_t& ticket,
 1180  Slot*& slots,
 1181  size_t& cap,
 1182  int& stride) noexcept {
 1183  auto numPushes = pushTicket_.load(std::memory_order_acquire); // A
 1184  slots = slots_;
 1185  cap = capacity_;
 1186  stride = stride_;
 1187  while (true) {
 1188  ticket = numPushes;
 1189  const auto numPops = popTicket_.load(std::memory_order_acquire); // B
 1190  // n will be negative if pops are pending
 1191  const int64_t n = int64_t(numPushes - numPops);
 1192  if (n >= static_cast<ssize_t>(capacity_)) {
 1193  // Full, linearize at B. We don't need to recheck the read we
 1194  // performed at A, because if numPushes was stale at B then the
 1195  // real numPushes value is even worse
 1196  return false;
 1197  }
 1198  if (pushTicket_.compare_exchange_strong(numPushes, numPushes + 1)) {
 1199  return true;
 1200  }
 1201  }
 1202  }
 1203 
// NOTE(review): parameters/body of tryObtainReadyPopTicket (declaration
// line, orig. 1207, missing).  Mirror of the push version above.
 1208  uint64_t& ticket,
 1209  Slot*& slots,
 1210  size_t& cap,
 1211  int& stride) noexcept {
 1212  ticket = popTicket_.load(std::memory_order_acquire);
 1213  slots = slots_;
 1214  cap = capacity_;
 1215  stride = stride_;
 1216  while (true) {
 1217  if (!slots[idx(ticket, cap, stride)].mayDequeue(turn(ticket, cap))) {
 1218  auto prev = ticket;
 1219  ticket = popTicket_.load(std::memory_order_acquire);
 1220  if (prev == ticket) {
 1221  return false;
 1222  }
 1223  } else {
 1224  if (popTicket_.compare_exchange_strong(ticket, ticket + 1)) {
 1225  return true;
 1226  }
 1227  }
 1228  }
 1229  }
 1230 
// Deadline-bounded promised-pop-ticket acquisition.  NOTE(review): the
// function name line (orig. 1236) is missing.  Note this waits with
// pushSpinCutoff_; whether that is intentional or a copy-paste of the
// enqueue version cannot be determined from this extract.
 1235  template <class Clock>
 1237  uint64_t& ticket,
 1238  Slot*& slots,
 1239  size_t& cap,
 1240  int& stride,
 1241  const std::chrono::time_point<Clock>& when) noexcept {
 1242  bool deadlineReached = false;
 1243  while (!deadlineReached) {
 1244  if (static_cast<Derived<T, Atom, Dynamic>*>(this)
 1245  ->tryObtainPromisedPopTicket(ticket, slots, cap, stride)) {
 1246  return true;
 1247  }
 1248  // ticket is a blocking ticket until the preceding ticket has been
 1249  // processed: wait until this ticket's turn arrives. We have not reserved
 1250  // this ticket so we will have to re-attempt to get a non-blocking ticket
 1251  // if we wake up before we time-out.
 1252  deadlineReached =
 1253  !slots[idx(ticket, cap, stride)].tryWaitForDequeueTurnUntil(
 1254  turn(ticket, cap),
 1255  pushSpinCutoff_,
 1256  (ticket % kAdaptationFreq) == 0,
 1257  when);
 1258  }
 1259  return false;
 1260  }
 1261 
// NOTE(review): parameters/body of tryObtainPromisedPopTicket (declaration
// line, orig. 1272, missing).  Succeeds iff the queue is non-empty,
// linearizing at B.
 1273  uint64_t& ticket,
 1274  Slot*& slots,
 1275  size_t& cap,
 1276  int& stride) noexcept {
 1277  auto numPops = popTicket_.load(std::memory_order_acquire); // A
 1278  slots = slots_;
 1279  cap = capacity_;
 1280  stride = stride_;
 1281  while (true) {
 1282  ticket = numPops;
 1283  const auto numPushes = pushTicket_.load(std::memory_order_acquire); // B
 1284  if (numPops >= numPushes) {
 1285  // Empty, or empty with pending pops. Linearize at B. We don't
 1286  // need to recheck the read we performed at A, because if numPops
 1287  // is stale then the fresh value is larger and the >= is still true
 1288  return false;
 1289  }
 1290  if (popTicket_.compare_exchange_strong(numPops, numPops + 1)) {
 1291  return true;
 1292  }
 1293  }
 1294  }
 1295 
 1296  // Given a ticket, constructs an enqueued item using args
// NOTE(review): the function name line for enqueueWithTicketBase (orig.
// 1298) is missing; confirmed by the trailing index.
 1297  template <typename... Args>
 1299  uint64_t ticket,
 1300  Slot* slots,
 1301  size_t cap,
 1302  int stride,
 1303  Args&&... args) noexcept {
 1304  slots[idx(ticket, cap, stride)].enqueue(
 1305  turn(ticket, cap),
 1306  pushSpinCutoff_,
 1307  (ticket % kAdaptationFreq) == 0,
 1308  std::forward<Args>(args)...);
 1309  }
 1310 
 1311  // To support tracking ticket numbers in MPMCPipelineStageImpl
// NOTE(review): the function name line (orig. 1313) is missing.
 1312  template <typename... Args>
 1314  enqueueWithTicketBase(
 1315  ticket, slots_, capacity_, stride_, std::forward<Args>(args)...);
 1316  }
 1317 
 1318  // Given a ticket, dequeues the corresponding element
// NOTE(review): the function name line for dequeueWithTicketBase (orig.
// 1319) is missing; confirmed by the trailing index.
 1320  uint64_t ticket,
 1321  Slot* slots,
 1322  size_t cap,
 1323  int stride,
 1324  T& elem) noexcept {
 1325  assert(cap != 0);
 1326  slots[idx(ticket, cap, stride)].dequeue(
 1327  turn(ticket, cap),
 1328  popSpinCutoff_,
 1329  (ticket % kAdaptationFreq) == 0,
 1330  elem);
 1331  }
 1332 };
1333 
// One slot of the MPMCQueue: raw storage for a single T plus a
// TurnSequencer.  Even turns (turn * 2) are enqueues, odd turns
// (turn * 2 + 1) are dequeues, so the sequencer alternates producers and
// consumers on this slot.  Enqueue/dequeue pick a strategy at compile
// time: ImplByRelocation (memcpy the bytes) vs. ImplByMove (move ctor).
// NOTE(review): several declaration lines are missing from this extract
// (flagged inline); code is kept verbatim.
1338 template <typename T, template <typename> class Atom>
 1339 struct SingleElementQueue {
// NOTE(review): destructor body; its declaration line (orig. 1340,
// presumably `~SingleElementQueue() noexcept {`) is missing.  An odd
// uncompleted-turn LSB means an enqueue completed without a matching
// dequeue, i.e. contents_ holds a live T that must be destroyed.
 1341  if ((sequencer_.uncompletedTurnLSB() & 1) == 1) {
 1342  // we are pending a dequeue, so we have a constructed item
 1343  destroyContents();
 1344  }
 1345  }
 1346 
// In-place construction enqueue; only enabled when T's construction from
// Args cannot throw (a throw mid-turn would wedge the sequencer).
 1348  template <
 1349  typename... Args,
 1350  typename = typename std::enable_if<
 1351  std::is_nothrow_constructible<T, Args...>::value>::type>
 1352  void enqueue(
 1353  const uint32_t turn,
 1354  Atom<uint32_t>& spinCutoff,
 1355  const bool updateSpinCutoff,
 1356  Args&&... args) noexcept {
 1357  sequencer_.waitForTurn(turn * 2, spinCutoff, updateSpinCutoff);
 1358  new (&contents_) T(std::forward<Args>(args)...);
 1359  sequencer_.completeTurn(turn * 2);
 1360  }
 1361 
// Rvalue enqueue: dispatches to enqueueImpl with ImplByMove or
// ImplByRelocation chosen by std::conditional.
// NOTE(review): the enable_if condition and conditional's first argument
// (orig. lines 1367-1369, 1381) are missing from this extract.
 1365  template <
 1366  typename = typename std::enable_if<
 1370  void enqueue(
 1371  const uint32_t turn,
 1372  Atom<uint32_t>& spinCutoff,
 1373  const bool updateSpinCutoff,
 1374  T&& goner) noexcept {
 1375  enqueueImpl(
 1376  turn,
 1377  spinCutoff,
 1378  updateSpinCutoff,
 1379  std::move(goner),
 1380  typename std::conditional<
 1382  ImplByMove,
 1383  ImplByRelocation>::type());
 1384  }
 1385 
// Deadline-bounded wait for this slot's enqueue turn.
// NOTE(review): the function name line (orig. 1391, presumably
// `bool tryWaitForEnqueueTurnUntil(`) and the comparison's right-hand side
// (orig. 1398, a TurnSequencer TryWaitResult enumerator) are missing.
 1390  template <class Clock>
 1392  const uint32_t turn,
 1393  Atom<uint32_t>& spinCutoff,
 1394  const bool updateSpinCutoff,
 1395  const std::chrono::time_point<Clock>& when) noexcept {
 1396  return sequencer_.tryWaitForTurn(
 1397  turn * 2, spinCutoff, updateSpinCutoff, &when) !=
 1399  }
 1400 
 1401  bool mayEnqueue(const uint32_t turn) const noexcept {
 1402  return sequencer_.isTurn(turn * 2);
 1403  }
 1404 
// Dequeue: dispatches to dequeueImpl with ImplByRelocation or ImplByMove.
// NOTE(review): the conditional's first two lines (orig. 1416-1417) are
// missing from this extract.
 1405  void dequeue(
 1406  uint32_t turn,
 1407  Atom<uint32_t>& spinCutoff,
 1408  const bool updateSpinCutoff,
 1409  T& elem) noexcept {
 1410  dequeueImpl(
 1411  turn,
 1412  spinCutoff,
 1413  updateSpinCutoff,
 1414  elem,
 1415  typename std::conditional<
 1418  ImplByMove>::type());
 1419  }
 1420 
// Deadline-bounded wait for this slot's dequeue turn.
// NOTE(review): the function name line (orig. 1426) and comparison
// right-hand side (orig. 1433) are missing, as above.
 1425  template <class Clock>
 1427  const uint32_t turn,
 1428  Atom<uint32_t>& spinCutoff,
 1429  const bool updateSpinCutoff,
 1430  const std::chrono::time_point<Clock>& when) noexcept {
 1431  return sequencer_.tryWaitForTurn(
 1432  turn * 2 + 1, spinCutoff, updateSpinCutoff, &when) !=
 1434  }
 1435 
 1436  bool mayDequeue(const uint32_t turn) const noexcept {
 1437  return sequencer_.isTurn(turn * 2 + 1);
 1438  }
 1439 
 1440  private:
// NOTE(review): the data members (raw storage for T and the
// TurnSequencer, orig. lines 1441-1445) are missing from this extract.
 1443 
 1446 
// Reinterprets the raw storage as a T*.
 1447  T* ptr() noexcept {
 1448  return static_cast<T*>(static_cast<void*>(&contents_));
 1449  }
 1450 
// NOTE(review): body of destroyContents (declaration line, orig. 1451,
// missing).  Destroys the stored T, then poisons the storage with 'Q'
// bytes in debug builds to surface use-after-destroy.
 1452  try {
 1453  ptr()->~T();
 1454  } catch (...) {
 1455  // g++ doesn't seem to have std::is_nothrow_destructible yet
 1456  }
 1457 #ifndef NDEBUG
 1458  memset(&contents_, 'Q', sizeof(T));
 1459 #endif
 1460  }
 1461 
// Tag classes for dispatching to enqueue/dequeue implementation.
 1463  struct ImplByRelocation {};
 1464  struct ImplByMove {};
 1465 
// NOTE(review): enqueueImpl, ImplByMove overload (its name line, orig.
// 1466-1467, is missing): move-construct into the slot under the turn.
 1468  const uint32_t turn,
 1469  Atom<uint32_t>& spinCutoff,
 1470  const bool updateSpinCutoff,
 1471  T&& goner,
 1472  ImplByMove) noexcept {
 1473  sequencer_.waitForTurn(turn * 2, spinCutoff, updateSpinCutoff);
 1474  new (&contents_) T(std::move(goner));
 1475  sequencer_.completeTurn(turn * 2);
 1476  }
 1477 
// NOTE(review): enqueueImpl, ImplByRelocation overload (its name line and
// tag parameter line, orig. 1478-1480/1485, are missing): memcpy the
// bytes in, then default-construct over the source so its destructor is
// a no-op — valid only for trivially-relocatable T.
 1481  const uint32_t turn,
 1482  Atom<uint32_t>& spinCutoff,
 1483  const bool updateSpinCutoff,
 1484  T&& goner,
 1486  sequencer_.waitForTurn(turn * 2, spinCutoff, updateSpinCutoff);
 1487  memcpy(&contents_, &goner, sizeof(T));
 1488  sequencer_.completeTurn(turn * 2);
 1489  new (&goner) T();
 1490  }
 1491 
// NOTE(review): dequeueImpl, ImplByRelocation overload (its name line and
// tag parameter line, orig. 1492-1494/1499, are missing): destroy the
// destination first (outside the turn), then memcpy the bytes out.
 1495  uint32_t turn,
 1496  Atom<uint32_t>& spinCutoff,
 1497  const bool updateSpinCutoff,
 1498  T& elem,
 1500  try {
 1501  elem.~T();
 1502  } catch (...) {
 1503  // unlikely, but if we don't complete our turn the queue will die
 1504  }
 1505  sequencer_.waitForTurn(turn * 2 + 1, spinCutoff, updateSpinCutoff);
 1506  memcpy(&elem, &contents_, sizeof(T));
 1507  sequencer_.completeTurn(turn * 2 + 1);
 1508  }
 1509 
// NOTE(review): dequeueImpl, ImplByMove overload (its name line, orig.
// 1510-1511, is missing): move-assign out, destroy the slot's copy.
 1512  uint32_t turn,
 1513  Atom<uint32_t>& spinCutoff,
 1514  const bool updateSpinCutoff,
 1515  T& elem,
 1516  ImplByMove) noexcept {
 1517  sequencer_.waitForTurn(turn * 2 + 1, spinCutoff, updateSpinCutoff);
 1518  elem = std::move(*ptr());
 1519  destroyContents();
 1520  sequencer_.completeTurn(turn * 2 + 1);
 1521  }
 1522 };
1523 
1524 } // namespace detail
1525 
1526 } // namespace folly
void * ptr
bool tryObtainPromisedPushTicket(uint64_t &ticket, Slot *&slots, size_t &cap, int &stride) noexcept
Definition: MPMCQueue.h:1178
uint64_t getOffset(const uint64_t state) const noexcept
Definition: MPMCQueue.h:529
void completeTurn(const uint32_t turn) noexcept
Unblocks a thread running waitForTurn(turn + 1)
void enqueue(const uint32_t turn, Atom< uint32_t > &spinCutoff, const bool updateSpinCutoff, T &&goner) noexcept
Definition: MPMCQueue.h:1370
bool tryObtainReadyPushTicket(uint64_t &ticket, Slot *&slots, size_t &cap, int &stride) noexcept
Definition: MPMCQueue.h:1109
LogLevel max
Definition: LogLevel.cpp:31
Tag classes for dispatching to enqueue/dequeue implementation.
Definition: MPMCQueue.h:1463
void blockingReadWithTicket(uint64_t &ticket, T &elem) noexcept
Same as blockingRead() but also records the ticket nunmer.
Definition: MPMCQueue.h:923
MPMCQueue base CRTP template.
Definition: MPMCQueue.h:45
bool tryObtainPromisedPopTicket(uint64_t &ticket, Slot *&slots, size_t &cap, int &stride) noexcept
Definition: MPMCQueue.h:479
PskType type
void enqueueWithTicketBase(uint64_t ticket, Slot *slots, size_t cap, int stride, Args &&...args) noexcept
Definition: MPMCQueue.h:1298
constexpr detail::Map< Move > move
Definition: Base-inl.h:2567
bool tryReadUntil(const std::chrono::time_point< Clock > &when, T &elem) noexcept
Definition: MPMCQueue.h:952
STL namespace.
void waitForTurn(const uint32_t turn, Atom< uint32_t > &spinCutoff, const bool updateSpinCutoff) noexcept
Definition: TurnSequencer.h:86
folly::std T
internal::ArgsMatcher< InnerMatcher > Args(const InnerMatcher &matcher)
—— Concurrent Priority Queue Implementation ——
Definition: AtomicBitSet.h:29
static int computeStride(size_t capacity) noexcept
Definition: MPMCQueue.h:1074
void dequeueWithTicketBase(uint64_t ticket, Slot *slots, size_t cap, int stride, T &elem) noexcept
Definition: MPMCQueue.h:1319
bool tryObtainPromisedPushTicket(uint64_t &ticket, Slot *&slots, size_t &cap, int &stride) noexcept
Definition: MPMCQueue.h:408
Atom< Slot * > dslots_
Current dynamic slots array of dcapacity_ SingleElementQueue-s.
Definition: MPMCQueue.h:1013
requires E e noexcept(noexcept(s.error(std::move(e))))
MPMCQueue() noexcept
Definition: MPMCQueue.h:117
def load()
Definition: deadlock.py:441
void blockingWrite(Args &&...args) noexcept
Definition: MPMCQueue.h:279
void blockingReadWithTicket(uint64_t &ticket, T &elem) noexcept
Definition: MPMCQueue.h:320
FOLLY_PUSH_WARNING RHS rhs
Definition: Traits.h:649
static constexpr StringPiece ticket
TurnSequencer< Atom > sequencer_
Even turns are pushes, odd turns are pops.
Definition: MPMCQueue.h:1445
void enqueueImpl(const uint32_t turn, Atom< uint32_t > &spinCutoff, const bool updateSpinCutoff, T &&goner, ImplByMove) noexcept
enqueue using nothrow move construction.
Definition: MPMCQueue.h:1467
constexpr std::size_t hardware_destructive_interference_size
Definition: Align.h:107
constexpr auto size(C const &c) -> decltype(c.size())
Definition: Access.h:45
bool trySeqlockReadSection(uint64_t &state, Slot *&slots, size_t &cap, int &stride) noexcept
Seqlock read-only section.
Definition: MPMCQueue.h:586
void enqueueWithTicket(const uint64_t ticket, Args &&...args) noexcept
Enqueues an element with a specific ticket number.
Definition: MPMCQueue.h:511
void enqueueWithTicket(uint64_t ticket, Args &&...args) noexcept
Definition: MPMCQueue.h:1313
MPMCQueue(MPMCQueue< T, Atom, true > &&rhs) noexcept
Definition: MPMCQueue.h:211
LogLevel min
Definition: LogLevel.cpp:30
bool isEmpty() const noexcept
Returns true if there are no items available for dequeue.
Definition: MPMCQueue.h:778
std::aligned_storage< sizeof(T), alignof(T)>::type contents_
Storage for a T constructed with placement new.
Definition: MPMCQueue.h:1442
void dequeueImpl(uint32_t turn, Atom< uint32_t > &spinCutoff, const bool updateSpinCutoff, T &elem, ImplByMove) noexcept
dequeue by nothrow move assignment.
Definition: MPMCQueue.h:1511
#define Atom
bool mayDequeue(const uint32_t turn) const noexcept
Definition: MPMCQueue.h:1436
size_t capacity() const noexcept
Doesn&#39;t change.
Definition: MPMCQueue.h:799
MPMCQueue(size_t queueCapacity)
Definition: MPMCQueue.h:111
bool tryWriteUntil(const std::chrono::time_point< Clock > &when, Args &&...args) noexcept
Definition: MPMCQueue.h:864
void enqueue(const uint32_t turn, Atom< uint32_t > &spinCutoff, const bool updateSpinCutoff, Args &&...args) noexcept
enqueue using in-place noexcept construction
Definition: MPMCQueue.h:1352
MPMCQueueBase< Derived< T, Atom, Dynamic > > const & operator=(MPMCQueueBase< Derived< T, Atom, Dynamic >> &&rhs)
Definition: MPMCQueue.h:727
bool readAndGetTicket(uint64_t &ticket, T &elem) noexcept
Same as read() but also records the ticket nunmer.
Definition: MPMCQueue.h:937
MPMCQueue< T, Atom, true > const & operator=(MPMCQueue< T, Atom, true > &&rhs)
Definition: MPMCQueue.h:250
void enqueueImpl(const uint32_t turn, Atom< uint32_t > &spinCutoff, const bool updateSpinCutoff, T &&goner, ImplByRelocation) noexcept
Definition: MPMCQueue.h:1480
static const char *const value
Definition: Conv.cpp:50
bool tryWaitForDequeueTurnUntil(const uint32_t turn, Atom< uint32_t > &spinCutoff, const bool updateSpinCutoff, const std::chrono::time_point< Clock > &when) noexcept
Definition: MPMCQueue.h:1426
bool tryObtainReadyPopTicket(uint64_t &ticket, Slot *&slots, size_t &cap, int &stride) noexcept
Definition: MPMCQueue.h:1207
bool tryObtainPromisedPopTicket(uint64_t &ticket, Slot *&slots, size_t &cap, int &stride) noexcept
Definition: MPMCQueue.h:1272
Future< Unit > when(bool p, F &&thunk)
Definition: Future-inl.h:2330
bool tryExpand(const uint64_t state, const size_t cap) noexcept
Definition: MPMCQueue.h:541
bool tryObtainPromisedPopTicketUntil(uint64_t &ticket, Slot *&slots, size_t &cap, int &stride, const std::chrono::time_point< Clock > &when) noexcept
Definition: MPMCQueue.h:1236
off_t offset_
uint32_t turn(uint64_t ticket, size_t cap) noexcept
Definition: MPMCQueue.h:1101
size_t idx(uint64_t ticket, size_t cap, int stride) noexcept
Definition: MPMCQueue.h:1095
bool tryObtainReadyPushTicket(uint64_t &ticket, Slot *&slots, size_t &cap, int &stride) noexcept
Definition: MPMCQueue.h:362
bool tryObtainPromisedPushTicketUntil(uint64_t &ticket, Slot *&slots, size_t &cap, int &stride, const std::chrono::time_point< Clock > &when) noexcept
Definition: MPMCQueue.h:1147
bool maybeUpdateFromClosed(const uint64_t state, const uint64_t ticket, uint64_t &offset, Slot *&slots, size_t &cap, int &stride) noexcept
Definition: MPMCQueue.h:608
MPMCQueueBase(MPMCQueueBase< Derived< T, Atom, Dynamic >> &&rhs) noexcept
Definition: MPMCQueue.h:699
const
Definition: upload.py:398
uint64_t value(const typename LockFreeRingBuffer< T, Atom >::Cursor &rbcursor)
bool tryObtainReadyPopTicket(uint64_t &ticket, Slot *&slots, size_t &cap, int &stride) noexcept
Definition: MPMCQueue.h:448
void initQueue(const size_t cap, const size_t mult)
Definition: MPMCQueue.h:348
size_t allocatedCapacity() const noexcept
Doesn&#39;t change for non-dynamic.
Definition: MPMCQueue.h:804
void dequeue(uint32_t turn, Atom< uint32_t > &spinCutoff, const bool updateSpinCutoff, T &elem) noexcept
Definition: MPMCQueue.h:1405
void dequeueImpl(uint32_t turn, Atom< uint32_t > &spinCutoff, const bool updateSpinCutoff, T &elem, ImplByRelocation) noexcept
Definition: MPMCQueue.h:1494
int getNumClosed(const uint64_t state) const noexcept
Definition: MPMCQueue.h:533
MPMCQueue(size_t queueCapacity, size_t minCapacity, size_t expansionMultiplier)
Definition: MPMCQueue.h:195
size_t allocatedCapacity() const noexcept
Definition: MPMCQueue.h:274
MPMCQueue(size_t queueCapacity)
Definition: MPMCQueue.h:189
detail::Stride stride(size_t s)
Definition: Base-inl.h:2589
bool tryWaitForEnqueueTurnUntil(const uint32_t turn, Atom< uint32_t > &spinCutoff, const bool updateSpinCutoff, const std::chrono::time_point< Clock > &when) noexcept
Definition: MPMCQueue.h:1391
bool isFull() const noexcept
Returns true if there is currently no empty space to enqueue.
Definition: MPMCQueue.h:783
state
Definition: http_parser.c:272
bool mayEnqueue(const uint32_t turn) const noexcept
Definition: MPMCQueue.h:1401
void asm_volatile_pause()
Definition: Asm.h:37