119 bool MayBlock =
false,
120 bool SupportsSize =
false,
121 size_t PopBatch = 16,
122 size_t ListTargetSize = 25,
124 template <
typename>
class Atom = std::atomic>
132 static constexpr
size_t Align = 1u << 7;
136 static_assert(PopBatch <= 256,
"PopBatch must be <= 256");
138 ListTargetSize >= 1 && ListTargetSize <= 256,
139 "TargetSize must be in the range [1, 256]");
162 head.store(
nullptr, std::memory_order_relaxed);
163 size.store(0, std::memory_order_relaxed);
215 shared_buffer_.reset(
new BufferNode[PopBatch]);
216 for (
size_t i = 0;
i < PopBatch;
i++) {
217 shared_buffer_[
i].pnode =
nullptr;
220 bottom_.store(0, std::memory_order_relaxed);
221 guard_.store(0, std::memory_order_relaxed);
225 levels_[
i] =
nullptr;
234 futex_array_.reset();
248 counter_p_.fetch_add(1, std::memory_order_relaxed);
255 counter_c_.fetch_add(1, std::memory_order_relaxed);
263 DCHECK(SupportsSize);
264 size_t p =
counter_p_.load(std::memory_order_acquire);
265 size_t c =
counter_c_.load(std::memory_order_acquire);
266 return (p > c) ? p - c : 0;
276 return bottom_.load(std::memory_order_acquire);
281 DCHECK(PopBatch > 0);
283 int loc =
top_loc_.load(std::memory_order_relaxed);
285 Node* n = shared_buffer_[loc--].pnode.load(std::memory_order_relaxed);
289 shared_buffer_.reset();
302 while (curList !=
nullptr) {
304 curList = curList->
next;
333 return pos.
level == 0;
362 return levels_[pos.
level][pos.
index].
size.load(std::memory_order_relaxed);
369 levels_[pos.
level][pos.
index].
size.store(v, std::memory_order_relaxed);
375 if (guard_.fetch_add(1, std::memory_order_acq_rel) == 0) {
386 guard_.store(0, std::memory_order_release);
393 levels_[tmp_btm + 1] = new_level;
394 bottom_.store(tmp_btm + 1, std::memory_order_release);
395 guard_.store(0, std::memory_order_release);
419 int loc = (index +
i) % bound;
429 b > LevelForForceInsert &&
getElementSize(pos) < ListTargetSize) {
476 return (tmp ==
nullptr) ? MIN_VALUE : tmp->
val;
482 return (tmp ==
nullptr) ? MIN_VALUE : tmp->
val;
486 return levels_[pos.
level][pos.
index].
head.load(std::memory_order_relaxed);
490 levels_[pos.
level][pos.
index].
head.store(t, std::memory_order_relaxed);
495 if (base ==
nullptr) {
497 }
else if (source ==
nullptr) {
503 if (base->
val >= source->
val) {
509 source = source->
next;
513 while (base !=
nullptr && source !=
nullptr) {
514 if (base->
val >= source->
val) {
519 source = source->
next;
523 if (base ==
nullptr) {
548 for (
int i = 0;
i < leaves;
i++) {
555 if (cnodes > leaves * 2 / 3) {
560 if (cnodes <= leaves * 2 / 3) {
576 int steps = ListTargetSize;
577 for (
int i = 0;
i < steps - 1;
i++) {
578 pruning_head = pruning_head->
next;
580 Node*
t = pruning_head;
581 pruning_head = pruning_head->
next;
590 int left_length = (tail_length + 1) / 2;
591 int right_length = tail_length - left_length;
592 Node *to_right, *to_left = pruning_head;
593 for (
int i = 0;
i < left_length - 1;
i++) {
594 pruning_head = pruning_head->
next;
596 to_right = pruning_head->
next;
597 pruning_head->
next =
nullptr;
601 if (left_length != 0) {
605 if (right_length != 0) {
612 }
else if (left_length != 0) {
617 }
else if (right_length != 0) {
623 if (steps % 2 == 1) {
675 if (
LIKELY(pv > val && nv <= val)) {
681 if (pos.
level >= LevelForTraverseParent) {
683 while (start->
next !=
nullptr && start->
next->
val >= val) {
686 if (start->
next !=
nullptr) {
688 start->
next = newNode;
689 while (start->
next->
next !=
nullptr) {
692 newNode = start->
next;
693 start->
next =
nullptr;
698 if (curList ==
nullptr) {
699 newNode->
next =
nullptr;
703 if (p->
val <= newNode->
val) {
704 newNode->
next = curList;
736 std::unique_lock<Mutex> lck(
738 if (!lck.owns_lock()) {
742 if (sz >= ListTargetSize) {
747 if (curList ==
nullptr) {
748 newNode->
next =
nullptr;
752 if (p->
val <= newNode->
val) {
753 newNode->
next = curList;
777 std::unique_lock<Mutex> lck(
779 if (!lck.owns_lock()) {
790 if (
UNLIKELY(nv < val || sz >= ListTargetSize)) {
812 if (cur.
level == 0) {
829 if (mid.
level == 0 ||
846 bool go_fast_path =
false;
875 for (
int i = num - 1;
i >= 0;
i--) {
877 while (shared_buffer_[
i].pnode.load(std::memory_order_relaxed) !=
nullptr)
879 shared_buffer_[
i].pnode.store(head, std::memory_order_relaxed);
883 top_loc_.store(num - 1, std::memory_order_release);
904 if (nv >= lv && nv >= rv) {
916 if (sum <= MergingSize) {
930 if (rv >= lv && rv > nv) {
935 }
else if (lv >= rv && lv > nv) {
989 if (head ==
nullptr) {
995 if (PopBatch > 0 &&
top_loc_.load(std::memory_order_acquire) >= 0) {
1029 auto p =
pticket_.fetch_add(1, std::memory_order_acq_rel);
1031 uint32_t curfutex = futex_array_[loc].load(std::memory_order_acquire);
1037 ready + 1 < curfutex ||
1038 ((curfutex > ready) && (curfutex - ready > 0x40000000)))) {
1042 if (futex_array_[loc].compare_exchange_strong(curfutex, ready)) {
1049 curfutex = futex_array_[loc].load(std::memory_order_acquire);
1063 return top_loc_.load(std::memory_order_acquire) < 0;
1075 auto curfutex = futex_array_[loc].load(std::memory_order_acquire);
1076 uint32_t short_cticket = curticket & 0x7FFFFFFF;
1077 uint32_t futex_ready = curfutex >> 1;
1079 return futex_ready >= short_cticket ||
1080 short_cticket - futex_ready > 0x40000000;
1083 template <
typename Clock,
typename Duration>
1085 const size_t& curticket,
1086 const std::chrono::time_point<Clock, Duration>& deadline,
1095 auto curfutex = futex_array_[loc].load(std::memory_order_acquire);
1106 curfutex = futex_array_[loc].load(std::memory_order_acquire);
1111 uint32_t blocking_futex = curfutex + 1;
1112 if (futex_array_[loc].compare_exchange_strong(
1113 curfutex, blocking_futex)) {
1123 auto ct =
cticket_.fetch_add(1, std::memory_order_acq_rel);
1150 template <
typename Clock,
typename Duration>
1152 const std::chrono::time_point<Clock, Duration>& deadline,
1156 deadline, opt, [=] {
return !
isEmpty(); })) {
1181 get_or =
top_loc_.fetch_sub(1, std::memory_order_acq_rel);
1183 Node*
c = shared_buffer_[get_or].pnode.load(std::memory_order_relaxed);
1184 shared_buffer_[get_or].pnode.store(
nullptr, std::memory_order_release);
1194 return ((s - 1) * Stride) & (NumFutex - 1);
static constexpr size_t MergingSize
void swapList(const Position &a, const Position &b)
Swap two Tree Elements (head, size)
bool deferSettingRootSize(Position &pos)
FOLLY_ALWAYS_INLINE void lockNode(const Position &pos)
spin_result spin_yield_until(std::chrono::time_point< Clock, Duration > const &deadline, F f)
bool isHeap(const Position &pos)
Check that the first node in the TreeElement preserves the heap structure.
FOLLY_ALWAYS_INLINE bool isMoundEmpty()
FOLLY_ALWAYS_INLINE T optimisticReadValue(const Position &pos, folly::hazptr_holder< Atom > &hptr)
~RelaxedConcurrentPriorityQueue()
std::atomic< int64_t > sum(0)
static FOLLY_ALWAYS_INLINE folly::WaitOptions wait_options()
void grow(uint32_t btm)
Extend the tree level.
FOLLY_ALWAYS_INLINE Position parentOf(const Position &pos)
Locate the parent node.
FOLLY_ALWAYS_INLINE size_t getElementSize(const Position &pos)
get the list size in current MoundElement
Position selectPosition(const T &val, bool &path, uint32_t &seed, folly::hazptr_holder< Atom > &hptr)
TODO: optimization.
#define FOLLY_ALWAYS_INLINE
FOLLY_ALWAYS_INLINE T * get_protected(const Atom< T * > &src) noexcept
void deleteSharedBuffer()
This function is only called by the destructor.
int popToSharedBuffer(const uint32_t rsize, Node *head)
void deleteAllNodes(const Position &pos)
This function is only called by the destructor.
Atom< size_t > counter_c_
RelaxedConcurrentPriorityQueue()
Constructor.
void binarySearchPosition(Position &cur, const T &val, folly::hazptr_holder< Atom > &hptr)
Atom< std::uint32_t > Futex
void retire(D deleter={}, hazptr_domain< Atom > &domain=default_hazptr_domain< Atom >())
Mound Element (Tree node), head points to a linked list.
FOLLY_ALWAYS_INLINE bool isRoot(const Position &pos)
Current element is the root?
FOLLY_ALWAYS_INLINE Node * getList(const Position &pos)
std::unique_ptr< folly::detail::Futex< Atom >[]> futex_array_
—— Concurrent Priority Queue Implementation ——
static constexpr size_t Stride
MoundElement * levels_[MAX_LEVELS]
Data members.
FutexResult futexWait(const Futex *futex, uint32_t expected, uint32_t waitMask)
FOLLY_ALWAYS_INLINE void setTreeNode(const Position &pos, Node *t)
FOLLY_ALWAYS_INLINE Position leftOf(const Position &pos)
Locate the left child.
static constexpr size_t NumFutex
Blocking algorithm.
static constexpr size_t PruningSize
static constexpr uint32_t MAX_LEVELS
Atom< uint32_t > cticket_
void tryBlockingPop(const size_t &curticket)
void moundPush(const T &val)
bool moundPopMany(T &val)
bool regularInsert(const Position &pos, const T &val, Node *newNode)
bool empty()
Returns true only if the queue was empty during the call.
FOLLY_ALWAYS_INLINE Position rightOf(const Position &pos)
Locate the right child.
FOLLY_ALWAYS_INLINE void setElementSize(const Position &pos, const uint32_t &v)
Set the size of current MoundElement.
FOLLY_NOINLINE bool trySpinBeforeBlock(const size_t &curticket, const std::chrono::time_point< Clock, Duration > &deadline, const folly::WaitOptions &opt=wait_options())
FOLLY_ALWAYS_INLINE bool isLeaf(const Position &pos)
Current position is leaf?
static constexpr size_t Align
uint32_t getBottomLevel()
FOLLY_ALWAYS_INLINE bool isSharedBufferEmpty()
auto lock(Synchronized< D, M > &synchronized, Args &&...args)
FOLLY_ALWAYS_INLINE bool futexIsReady(const size_t &curticket)
Atom< uint32_t > pticket_
void mergeListTo(const Position &pos, Node *t, const size_t &list_length)
Merge list t to the Element Position.
FOLLY_ALWAYS_INLINE bool isEmpty()
void startPruning(const Position &pos)
bool forceInsert(const Position &pos, const T &val, Node *newNode)
FOLLY_ALWAYS_INLINE bool trylockNode(const Position &pos)
size_t getFutexArrayLoc(size_t s)
FOLLY_ALWAYS_INLINE T readValue(const Position &pos)
bool tryPopFromSharedBuffer(T &val)
std::unique_ptr< BufferNode[]> shared_buffer_
Atom< size_t > counter_p_
bool forceInsertToRoot(Node *newNode)
Node for shared buffer should be aligned.
bool pruningLeaf(const Position &pos)
The pos structure simplifies the implementation.
static constexpr T MIN_VALUE
static constexpr int LevelForTraverseParent
FOLLY_ALWAYS_INLINE void unlockNode(const Position &pos)
static constexpr int LevelForForceInsert
spin_result spin_pause_until(std::chrono::time_point< Clock, Duration > const &deadline, WaitOptions const &opt, F f)
Node * mergeList(Node *base, Node *source)
void mergeDown(const Position &pos)
folly::Function< void()> parent
FOLLY_NOINLINE bool tryWait(const std::chrono::time_point< Clock, Duration > &deadline, const folly::WaitOptions &opt=wait_options())
bool tryPopFromMound(T &val)
int futexWake(const Futex *futex, int count, uint32_t wakeMask)