23 #include <glog/logging.h> 213 size_t LgSegmentSize = 8,
215 template <
typename>
class Atom = std::atomic>
221 static constexpr
bool SPSC = SingleProducer && SingleConsumer;
222 static constexpr
size_t Stride = SPSC || (LgSegmentSize <= 1) ? 1 : 27;
224 static constexpr
size_t Align = 1u << LgAlign;
228 "T must be nothrow_destructible");
229 static_assert((Stride & 1) == 1,
"Stride must be odd");
230 static_assert(LgSegmentSize < 32,
"LgSegmentSize must be < 32");
231 static_assert(LgAlign < 16,
"LgAlign must be < 16");
275 if (
LIKELY(o.has_value())) {
287 template <
typename Clock,
typename Duration>
290 const std::chrono::time_point<Clock, Duration>& deadline)
noexcept {
301 template <
typename Clock,
typename Duration>
303 const std::chrono::time_point<Clock, Duration>& deadline)
noexcept {
308 template <
typename Rep,
typename Period>
311 const std::chrono::duration<Rep, Period>& duration)
noexcept {
322 template <
typename Rep,
typename Period>
324 const std::chrono::duration<Rep, Period>& duration)
noexcept {
335 DCHECK(SingleConsumer);
343 return p >
c ? p -
c : 0;
355 template <
typename Arg>
370 template <
typename Arg>
373 if (!SingleProducer) {
376 DCHECK_GE(t, s->minTicket());
378 size_t idx =
index(t);
379 Entry& e = s->entry(idx);
380 e.putItem(std::forward<Arg>(arg));
407 if (!SingleConsumer) {
410 size_t idx =
index(t);
411 Entry& e =
s->entry(idx);
419 template <
typename Clock,
typename Duration>
421 const std::chrono::time_point<Clock, Duration>& deadline)
noexcept {
422 if (SingleConsumer) {
435 template <
typename Clock,
typename Duration>
438 const std::chrono::time_point<Clock, Duration>& deadline)
noexcept {
440 DCHECK_GE(t,
s->minTicket());
442 size_t idx =
index(t);
443 Entry& e =
s->entry(idx);
448 auto ret = e.takeItem();
456 template <
typename Clock,
typename Duration>
459 const std::chrono::time_point<Clock, Duration>& deadline)
noexcept {
467 size_t idx =
index(t);
468 Entry& e =
s->entry(idx);
472 if (!
c_.
ticket.compare_exchange_weak(
473 t, t + 1, std::memory_order_acq_rel, std::memory_order_acquire)) {
476 auto ret = e.takeItem();
485 template <
typename Clock,
typename Duration>
489 const std::chrono::time_point<Clock, Duration>& deadline)
noexcept {
490 if (
LIKELY(e.tryWaitUntil(deadline))) {
497 template <
typename Clock,
typename Duration>
499 const std::chrono::time_point<Clock, Duration>& deadline)
noexcept {
502 DCHECK_GE(t, s->minTicket());
504 size_t idx =
index(t);
505 Entry& e = s->entry(idx);
524 Segment*
next =
s->nextSegment();
529 auto dur = std::chrono::microseconds(diff);
534 deadline, opt, [
s] {
return s->nextSegment(); });
535 next =
s->nextSegment();
549 Segment*
next =
new Segment(
t);
550 next->acquire_ref_safe();
551 if (!s->casNextSegment(next)) {
553 next = s->nextSegment();
562 Segment*
next =
s->nextSegment();
574 while (s->minTicket() <
t) {
575 Segment*
next = s->nextSegment();
588 while (
tail() ==
s) {
592 Segment*
next =
s->nextSegment();
608 if (SingleConsumer) {
610 Segment*
next = s->nextSegment();
615 while (s->minTicket() <
t) {
616 Segment*
next = s->nextSegment();
641 s =
s->nextSegment();
645 auto& e =
s->entry(idx);
653 auto s =
h->nextSegment();
654 h->setNextSegment(
nullptr);
657 auto next =
s->nextSegment();
664 return (
t * Stride) & (SegmentSize - 1);
668 return (
t & (SegmentSize - 1)) == 0;
672 return (
t & (SegmentSize - 1)) == (SegmentSize - 1);
676 return c_.
head.load(std::memory_order_acquire);
680 return p_.
tail.load(std::memory_order_acquire);
684 return p_.
ticket.load(std::memory_order_acquire);
688 return c_.
ticket.load(std::memory_order_acquire);
692 DCHECK(SingleConsumer);
693 c_.
head.store(
s, std::memory_order_relaxed);
698 p_.
tail.store(
s, std::memory_order_release);
702 DCHECK(!SingleConsumer);
703 return c_.
head.compare_exchange_strong(
704 s,
next, std::memory_order_release, std::memory_order_acquire);
709 p_.
tail.compare_exchange_strong(
710 s,
next, std::memory_order_release, std::memory_order_relaxed);
714 p_.
ticket.store(
t, std::memory_order_release);
718 c_.
ticket.store(
t, std::memory_order_release);
722 if (SingleConsumer) {
727 return c_.
ticket.fetch_add(1, std::memory_order_acq_rel);
732 if (SingleProducer) {
737 return p_.
ticket.fetch_add(1, std::memory_order_acq_rel);
749 template <
typename Arg>
751 new (&item_)
T(std::forward<Arg>(arg));
770 template <
typename Clock,
typename Duration>
772 const std::chrono::time_point<Clock, Duration>& deadline)
noexcept {
775 flag_.
wait_options().spin_max(std::chrono::microseconds(10));
796 return static_cast<T*
>(
static_cast<void*
>(&item_));
804 Atom<Segment*> next_{
nullptr};
812 return next_.load(std::memory_order_acquire);
816 next_.store(next, std::memory_order_relaxed);
821 return next_.compare_exchange_strong(
822 expected,
next, std::memory_order_release, std::memory_order_relaxed);
826 DCHECK_EQ((min_ & (SegmentSize - 1)),
Ticket(0));
834 template <
typename S>
837 auto p = nextSegment();
852 size_t LgSegmentSize = 8,
854 template <
typename>
class Atom = std::atomic>
861 size_t LgSegmentSize = 8,
863 template <
typename>
class Atom = std::atomic>
870 size_t LgSegmentSize = 8,
872 template <
typename>
class Atom = std::atomic>
879 size_t LgSegmentSize = 8,
881 template <
typename>
class Atom = std::atomic>
FOLLY_ALWAYS_INLINE Segment * findSegment(Segment *s, const Ticket t) noexcept
FOLLY_ALWAYS_INLINE void wait(const WaitOptions &opt=wait_options()) noexcept
FOLLY_ALWAYS_INLINE const T * try_peek() noexcept
FOLLY_ALWAYS_INLINE bool responsibleForAdvance(Ticket t) const noexcept
Segment * getAllocNextSegment(Segment *s, Ticket t) noexcept
void push_links(bool m, S &s)
std::chrono::nanoseconds spin_max() const
FOLLY_ALWAYS_INLINE folly::Optional< T > takeItem() noexcept
FOLLY_ALWAYS_INLINE void destroyItem() noexcept
Segment * allocNextSegment(Segment *s)
FOLLY_ALWAYS_INLINE void setProducerTicket(Ticket t) noexcept
#define FOLLY_ALWAYS_INLINE
FOLLY_ALWAYS_INLINE T * get_protected(const Atom< T * > &src) noexcept
static constexpr size_t Align
FOLLY_ALWAYS_INLINE folly::Optional< T > tryDequeueUntil(const std::chrono::time_point< Clock, Duration > &deadline) noexcept
bool casNextSegment(Segment *next) noexcept
std::chrono::steady_clock::time_point now()
constexpr detail::Map< Move > move
FOLLY_CPP14_CONSTEXPR bool has_value() const noexcept
void advanceTail(Segment *s) noexcept
FOLLY_ALWAYS_INLINE void putItem(Arg &&arg)
size_t size() const noexcept
constexpr T constexpr_log2(T t)
—— Concurrent Priority Queue Implementation ——
folly::SaturatingSemaphore< MayBlock, Atom > flag_
FOLLY_ALWAYS_INLINE void getItem(T &item) noexcept
requires E e noexcept(noexcept(s.error(std::move(e))))
FOLLY_ALWAYS_INLINE Ticket fetchIncrementConsumerTicket() noexcept
FOLLY_ALWAYS_INLINE folly::Optional< T > getItem() noexcept
void setTail(Segment *s) noexcept
FOLLY_ALWAYS_INLINE folly::Optional< T > tryDequeueUntilMC(Segment *s, const std::chrono::time_point< Clock, Duration > &deadline) noexcept
static FOLLY_ALWAYS_INLINE WaitOptions wait_options()
FOLLY_ALWAYS_INLINE bool try_dequeue_for(T &item, const std::chrono::duration< Rep, Period > &duration) noexcept
FOLLY_ALWAYS_INLINE void enqueue(T &&arg)
FOLLY_ALWAYS_INLINE void post() noexcept
constexpr std::size_t hardware_destructive_interference_size
void cleanUpRemainingItems()
void setHead(Segment *s) noexcept
FOLLY_ALWAYS_INLINE bool responsibleForAlloc(Ticket t) const noexcept
FOLLY_ALWAYS_INLINE folly::Optional< T > try_dequeue_until(const std::chrono::time_point< Clock, Duration > &deadline) noexcept
FOLLY_ALWAYS_INLINE bool try_dequeue_until(T &item, const std::chrono::time_point< Clock, Duration > &deadline) noexcept
bool casHead(Segment *&s, Segment *next) noexcept
FOLLY_ALWAYS_INLINE bool tryDequeueWaitElem(Entry &e, Ticket t, const std::chrono::time_point< Clock, Duration > &deadline) noexcept
std::aligned_storage< sizeof(T), alignof(T)>::type item_
FOLLY_ALWAYS_INLINE const T * peekItem() noexcept
FOLLY_ALWAYS_INLINE Ticket consumerTicket() const noexcept
FOLLY_ALWAYS_INLINE bool try_wait_until(const std::chrono::time_point< Clock, Duration > &deadline, const WaitOptions &opt=wait_options()) noexcept
auto end(TestAdlIterable &instance)
void advanceHeadToTicket(Ticket t) noexcept
static map< string, int > m
static constexpr size_t SegmentSize
FOLLY_ALWAYS_INLINE void dequeueCommon(Segment *s, T &item) noexcept
FOLLY_ALWAYS_INLINE void setConsumerTicket(Ticket t) noexcept
static const char *const value
void reclaimSegment(Segment *s) noexcept
FOLLY_ALWAYS_INLINE size_t index(Ticket t) const noexcept
void advanceTailToTicket(Ticket t) noexcept
Segment * nextSegment() const noexcept
uint64_t diff(uint64_t a, uint64_t b)
FOLLY_ALWAYS_INLINE T * itemPtr() noexcept
FOLLY_ALWAYS_INLINE Entry & entry(size_t index) noexcept
FOLLY_ALWAYS_INLINE Segment * tail() const noexcept
FOLLY_ALWAYS_INLINE Segment * head() const noexcept
static constexpr bool SPSC
FOLLY_ALWAYS_INLINE Ticket producerTicket() const noexcept
void advanceHead(Segment *s) noexcept
FOLLY_ALWAYS_INLINE const T * tryPeekUntil(const std::chrono::time_point< Clock, Duration > &deadline) noexcept
FOLLY_ALWAYS_INLINE void dequeueImpl(T &item) noexcept
Segment(const Ticket t) noexcept
FOLLY_ALWAYS_INLINE Ticket minTicket() const noexcept
FOLLY_ALWAYS_INLINE void enqueueCommon(Segment *s, Arg &&arg)
FOLLY_ALWAYS_INLINE folly::Optional< T > tryDequeueUntilSC(Segment *s, const std::chrono::time_point< Clock, Duration > &deadline) noexcept
FOLLY_ALWAYS_INLINE void takeItem(T &item) noexcept
FOLLY_ALWAYS_INLINE void dequeue(T &item) noexcept
static constexpr size_t Stride
FOLLY_ALWAYS_INLINE void enqueueImpl(Arg &&arg)
void reclaimRemainingSegments()
void setNextSegment(Segment *next)
FOLLY_ALWAYS_INLINE void enqueue(const T &arg)
FOLLY_ALWAYS_INLINE Ticket fetchIncrementProducerTicket() noexcept
bool empty() const noexcept
FOLLY_ALWAYS_INLINE bool tryWaitUntil(const std::chrono::time_point< Clock, Duration > &deadline) noexcept
void casTail(Segment *&s, Segment *next) noexcept
spin_result spin_pause_until(std::chrono::time_point< Clock, Duration > const &deadline, WaitOptions const &opt, F f)
FOLLY_ALWAYS_INLINE folly::Optional< T > try_dequeue_for(const std::chrono::duration< Rep, Period > &duration) noexcept
FOLLY_ALWAYS_INLINE folly::Optional< T > try_dequeue() noexcept
void asm_volatile_pause()
FOLLY_ALWAYS_INLINE bool try_dequeue(T &item) noexcept