22 #include <glog/logging.h> 270 template <
typename T>
272 template <
typename Arg>
283 size_t LgSegmentSize = 8,
286 template <
typename>
class Atom = std::atomic>
295 static constexpr
bool SPSC = SingleProducer && SingleConsumer;
296 static constexpr
size_t Align = 1u << LgAlign;
298 static_assert(LgAlign < 16,
"LgAlign must be < 16");
303 alignas(Align) Atom<Weight> debit_;
307 alignas(Align) Atom<Weight> credit_;
312 alignas(Align) Atom<Weight> transfer_;
330 capacity_(capacity + threshold(capacity)),
332 threshold_(threshold(capacity)),
352 return tryEnqueueImpl(v);
360 template <
typename Clock,
typename Duration>
363 const std::chrono::time_point<Clock, Duration>& deadline) {
364 return tryEnqueueUntilImpl(v, deadline);
367 template <
typename Clock,
typename Duration>
370 const std::chrono::time_point<Clock, Duration>& deadline) {
371 return tryEnqueueUntilImpl(
std::move(
v), deadline);
375 template <
typename Rep,
typename Period>
378 const std::chrono::duration<Rep, Period>& duration) {
379 return tryEnqueueForImpl(v, duration);
382 template <
typename Rep,
typename Period>
385 const std::chrono::duration<Rep, Period>& duration) {
386 return tryEnqueueForImpl(
std::move(
v), duration);
394 addCredit(WeightFn()(elem));
399 if (q_.try_dequeue(elem)) {
400 addCredit(WeightFn()(elem));
407 template <
typename Clock,
typename Duration>
410 const std::chrono::time_point<Clock, Duration>& deadline) {
411 if (q_.try_dequeue_until(elem, deadline)) {
412 addCredit(WeightFn()(elem));
419 template <
typename Rep,
typename Period>
422 const std::chrono::duration<Rep, Period>& duration) {
423 if (q_.try_dequeue_for(elem, duration)) {
424 addCredit(WeightFn()(elem));
434 Weight thresh = threshold(capacity);
435 capacity_.store(capacity + thresh, std::memory_order_release);
436 threshold_.store(thresh, std::memory_order_release);
442 auto c = getCredit();
443 auto t = getTransfer();
444 return d > (
c +
t) ? d - (
c +
t) : 0;
463 return capacity / 10;
468 template <
typename Arg>
474 template <
typename Arg>
476 return tryEnqueueUntilImpl(
480 template <
typename Clock,
typename Duration,
typename Arg>
483 const std::chrono::time_point<Clock, Duration>& deadline) {
484 Weight weight = WeightFn()(std::forward<Arg>(
v));
485 if (
LIKELY(tryAddDebit(weight))) {
486 q_.enqueue(std::forward<Arg>(
v));
489 return tryEnqueueUntilSlow(std::forward<Arg>(
v), deadline);
492 template <
typename Rep,
typename Period,
typename Arg>
495 const std::chrono::duration<Rep, Period>& duration) {
496 if (
LIKELY(tryEnqueueImpl(std::forward<Arg>(
v)))) {
500 return tryEnqueueUntilSlow(std::forward<Arg>(
v), deadline);
504 Weight capacity = getCapacity();
505 Weight before = fetchAddDebit(weight);
506 if (
LIKELY(before + weight <= capacity)) {
515 return capacity_.load(std::memory_order_acquire);
520 if (SingleProducer) {
522 debit_.store(before + weight, std::memory_order_relaxed);
524 before = debit_.fetch_add(weight, std::memory_order_acq_rel);
530 return debit_.load(std::memory_order_acquire);
536 Weight before = fetchAddCredit(weight);
537 Weight thresh = getThreshold();
538 if (before + weight >= thresh && before < thresh) {
545 if (SingleConsumer) {
546 before = getCredit();
547 credit_.store(before + weight, std::memory_order_relaxed);
549 before = credit_.fetch_add(weight, std::memory_order_acq_rel);
555 return credit_.load(std::memory_order_acquire);
559 return threshold_.load(std::memory_order_acquire);
566 if (SingleProducer) {
568 debit_.store(before - weight, std::memory_order_relaxed);
570 before = debit_.fetch_sub(weight, std::memory_order_acq_rel);
572 DCHECK_GE(before, weight);
575 template <
typename Clock,
typename Duration,
typename Arg>
578 const std::chrono::time_point<Clock, Duration>& deadline) {
579 Weight weight = WeightFn()(std::forward<Arg>(
v));
580 if (canEnqueue(deadline, weight)) {
581 q_.enqueue(std::forward<Arg>(
v));
588 template <
typename Clock,
typename Duration>
590 const std::chrono::time_point<Clock, Duration>& deadline,
592 Weight capacity = getCapacity();
595 Weight debit = getDebit();
596 if ((debit + weight <= capacity) && tryAddDebit(weight)) {
603 if (canBlock(weight, capacity)) {
613 waiting_.store(WAITING, std::memory_order_relaxed);
614 std::atomic_thread_fence(std::memory_order_seq_cst);
616 Weight debit = getDebit();
617 return debit + weight > capacity;
621 Weight w = takeTransfer();
631 w = transfer_.exchange(0, std::memory_order_acq_rel);
637 return transfer_.load(std::memory_order_acquire);
643 Weight credit = takeCredit();
644 transfer_.fetch_add(credit, std::memory_order_acq_rel);
646 std::atomic_thread_fence(std::memory_order_seq_cst);
647 waiting_.store(NOTWAITING, std::memory_order_relaxed);
654 if (SingleConsumer) {
655 credit = credit_.load(std::memory_order_relaxed);
656 credit_.store(0, std::memory_order_relaxed);
658 credit = credit_.exchange(0, std::memory_order_acq_rel);
671 size_t LgSegmentSize = 8,
674 template <
typename>
class Atom = std::atomic>
689 size_t LgSegmentSize = 8,
692 template <
typename>
class Atom = std::atomic>
707 size_t LgSegmentSize = 8,
710 template <
typename>
class Atom = std::atomic>
725 size_t LgSegmentSize = 8,
728 template <
typename>
class Atom = std::atomic>
FOLLY_ALWAYS_INLINE Weight fetchAddDebit(Weight weight) noexcept
uint64_t operator()(Arg &&) const noexcept
FOLLY_ALWAYS_INLINE void enqueue(T &&v)
Weight weight() const noexcept
FOLLY_ALWAYS_INLINE void dequeue(T &elem)
Dequeue functions.
FOLLY_ALWAYS_INLINE Weight fetchAddCredit(Weight weight) noexcept
#define FOLLY_ALWAYS_INLINE
FOLLY_ALWAYS_INLINE void enqueue(const T &v)
Enqueue functions.
size_t size() const noexcept
FOLLY_ALWAYS_INLINE void enqueueImpl(Arg &&v)
std::chrono::steady_clock::time_point now()
constexpr detail::Map< Move > move
Atom< std::uint32_t > Futex
FOLLY_ALWAYS_INLINE bool try_dequeue(T &elem)
FOLLY_ALWAYS_INLINE bool tryEnqueueImpl(Arg &&v)
Weight takeTransfer() noexcept
FOLLY_ALWAYS_INLINE bool try_enqueue_for(const T &v, const std::chrono::duration< Rep, Period > &duration)
DynamicBoundedQueue(Weight capacity)
—— Concurrent Priority Queue Implementation ——
bool tryReduceDebit() noexcept
requires E e noexcept(noexcept(s.error(std::move(e))))
FOLLY_ALWAYS_INLINE bool try_dequeue_until(T &elem, const std::chrono::time_point< Clock, Duration > &deadline)
FOLLY_ALWAYS_INLINE bool tryEnqueueForImpl(Arg &&v, const std::chrono::duration< Rep, Period > &duration)
FOLLY_ALWAYS_INLINE Weight getCapacity() const noexcept
FOLLY_ALWAYS_INLINE void addCredit(Weight weight) noexcept
void subDebit(Weight weight) noexcept
bool tryEnqueueUntilSlow(Arg &&v, const std::chrono::time_point< Clock, Duration > &deadline)
bool canBlock(Weight weight, Weight capacity) noexcept
FOLLY_ALWAYS_INLINE bool try_enqueue(T &&v)
FOLLY_ALWAYS_INLINE bool try_enqueue_until(T &&v, const std::chrono::time_point< Clock, Duration > &deadline)
void reset_capacity(Weight capacity) noexcept
Secondary functions.
FOLLY_ALWAYS_INLINE Weight getDebit() const noexcept
UnboundedQueue< T, SingleProducer, SingleConsumer, MayBlock, LgSegmentSize, LgAlign, Atom > q_
constexpr Weight threshold(Weight capacity) const noexcept
Private functions ///.
FOLLY_ALWAYS_INLINE bool try_enqueue_for(T &&v, const std::chrono::duration< Rep, Period > &duration)
FutexResult futexWaitUntil(const Futex *futex, uint32_t expected, std::chrono::time_point< Clock, Duration > const &deadline, uint32_t waitMask)
Weight takeCredit() noexcept
void transferCredit() noexcept
FOLLY_ALWAYS_INLINE bool try_enqueue_until(const T &v, const std::chrono::time_point< Clock, Duration > &deadline)
Atom< Weight > threshold_
Weight getTransfer() const noexcept
detail::Futex< Atom > waiting_
bool empty() const noexcept
FOLLY_ALWAYS_INLINE Weight getCredit() const noexcept
FOLLY_ALWAYS_INLINE bool try_enqueue(const T &v)
bool canEnqueue(const std::chrono::time_point< Clock, Duration > &deadline, Weight weight) noexcept
FOLLY_ALWAYS_INLINE bool try_dequeue_for(T &elem, const std::chrono::duration< Rep, Period > &duration)
FOLLY_ALWAYS_INLINE bool tryEnqueueUntilImpl(Arg &&v, const std::chrono::time_point< Clock, Duration > &deadline)
FOLLY_ALWAYS_INLINE bool tryAddDebit(Weight weight) noexcept
int futexWake(const Futex *futex, int count, uint32_t wakeMask)
FOLLY_ALWAYS_INLINE Weight getThreshold() const noexcept
void asm_volatile_pause()