proxygen
RelaxedConcurrentPriorityQueue.h
Go to the documentation of this file.
1 /*
2  * Copyright 2018-present Facebook, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 #pragma once
17 
18 #include <algorithm>
19 #include <atomic>
20 #include <climits>
21 #include <cmath>
22 #include <iomanip>
23 #include <iostream>
24 #include <mutex>
25 
26 #include <folly/Random.h>
27 #include <folly/SpinLock.h>
28 #include <folly/ThreadLocal.h>
29 #include <folly/detail/Futex.h>
30 #include <folly/lang/Align.h>
34 
36 // The concurrent priority queue implementation is based on the
37 // Mound data structure (Mounds: Array-Based Concurrent Priority Queues
38 // by Yujie Liu and Michael Spear, ICPP 2012)
39 //
41 // This relaxed implementation extends the Mound algorithm, and provides
42 // following features:
43 // - Arbitrary priorities.
44 // - Unbounded size.
45 // - Push, pop, empty, size functions. [TODO: Non-waiting and timed wait pop]
46 // - Supports blocking.
47 // - Fast and Scalable.
48 //
50 // A Mound is a heap where each element is a sorted linked list.
51 // First nodes in the lists maintain the heap property. Push randomly
52 // selects a leaf at the bottom level, then uses binary search to find
53 // a place to insert the new node to the head of the list. Pop gets
54 // the node from the head of the list at the root, then swap the
55 // list down until the heap feature holds. To use Mound in our
56 // implementation, we need to solve the following problems:
57 // - 1. Lack of general relaxed implementations. Mound is appealing
58 // for relaxed priority queue implementation because pop the whole
59 // list from the root is straightforward. One thread pops the list
60  and following threads can pop from the list until it is empty.
61 // Those pops only trigger one swap done operation. Thus reduce
62 // the latency for pop and reduce the contention for Mound.
63 // The difficulty is to provide a scalable and fast mechanism
64 // to let threads concurrently get elements from the list.
65 // - 2. Lack of control of list length. The length for every
66 // lists is critical for the performance. Mound suffers from not
67  only the extreme cases (push with increasing priorities, Mound
68  becomes a sorted linked list; push with decreasing priorities,
69  Mound becomes a regular heap), but also the common case (for
70  randomly generated priorities, Mound degrades to the regular heap
71 // after millions of push/pop operations). The difficulty is to
72 // stabilize the list length without losing the accuracy and performance.
73 // - 3. Does not support blocking. Blocking is an important feature.
74 // Mound paper does not mention it. Designing the new algorithm for
75 // efficient blocking is challenging.
76 // - 4. Memory management. Mound allows optimistic reads. We need to
77  protect the node from being reclaimed.
78 //
80 // Our implementation extends Mound algorithm to support
81 // efficient relaxed pop. We employ a shared buffer algorithm to
82 // share the popped list. Our algorithm makes popping from shared
83 // buffer as fast as fetch_and_add. We improve the performance
84 // and compact the heap structure by stabilizing the size of each list.
85 // The implementation exposes the template parameter to set the
86 // preferred list length. Under the hood, we provide algorithms for
87 // fast inserting, pruning, and merging. The blocking algorithm is
88 // tricky. It allows one producer only wakes one consumer at a time.
89 // It also does not block the producer. For optimistic read, we use
90  hazard pointer to protect the node from being reclaimed. We optimize the
91 // check-lock-check pattern by using test-test-and-set spin lock.
92 
94 // 1. PopBatch could be 0 or a positive integer.
95 // If it is 0, only pop one node at a time.
96 // This is the strict implementation. It guarantees the return
97  priority is always the highest. If it is > 0, we keep
98 // up to that number of nodes in a shared buffer to be consumed by
99 // subsequent pop operations.
100 //
101 // 2. ListTargetSize represents the minimal length for the list. It
102 // solves the problem when inserting to Mound with
103 // decreasing priority order (degrade to a heap). Moreover,
104 // it maintains the Mound structure stable after trillions of
105 // operations, which causes unbalanced problem in the original
106  Mound algorithm. We set the pruning length and merging length
107 // based on this parameter.
108 //
110 // void push(const T& val)
111 // void pop(T& val)
112 // size_t size()
113 // bool empty()
114 
115 namespace folly {
116 
117 template <
118  typename T,
119  bool MayBlock = false,
120  bool SupportsSize = false,
121  size_t PopBatch = 16,
122  size_t ListTargetSize = 25,
123  typename Mutex = folly::SpinLock,
124  template <typename> class Atom = std::atomic>
126  // Max height of the tree
127  static constexpr uint32_t MAX_LEVELS = 32;
128  // The default minimum value
130 
131  // Align size for the shared buffer node
132  static constexpr size_t Align = 1u << 7;
133  static constexpr int LevelForForceInsert = 3;
134  static constexpr int LevelForTraverseParent = 7;
135 
136  static_assert(PopBatch <= 256, "PopBatch must be <= 256");
137  static_assert(
138  ListTargetSize >= 1 && ListTargetSize <= 256,
139  "TargetSize must be in the range [1, 256]");
140 
141  // The maximal length for the list
142  static constexpr size_t PruningSize = ListTargetSize * 2;
143  // When pop from Mound, tree elements near the leaf
144  // level are likely be very small (the length of the list). When
145  // swapping down after pop a list, we check the size of the
146  // children to decide whether to merge them to their parent.
147  static constexpr size_t MergingSize = ListTargetSize;
148 
150  struct Node : public folly::hazptr_obj_base<Node, Atom> {
153  };
154 
// Tree element: the head of a descending-sorted linked list, the cached
// length of that list, and a lock guarding mutation of both.
156  struct MoundElement {
157  // Reading (head, size) without acquiring the lock
// head: first (highest-priority) node of the list; nullptr when empty.
158  Atom<Node*> head;
// size: cached node count; kept in sync with the list under `lock`.
159  Atom<size_t> size;
// alignas(Align) pads the lock away from the optimistically-read fields
// above — presumably to avoid false sharing (Align is 128); confirm.
160  alignas(Align) Mutex lock;
161  MoundElement() { // initializer
162  head.store(nullptr, std::memory_order_relaxed);
163  size.store(0, std::memory_order_relaxed);
164  }
165  };
166 
168  struct Position {
171  };
172 
// One slot of the shared pop buffer. Each slot is aligned to Align
// (128 bytes) so concurrent consumers operate on distinct cache lines.
174  struct BufferNode {
175  alignas(Align) Atom<Node*> pnode;
176  };
177 
179 
180  // Mound structure -> 2D array to represent a tree
182  // Record the current leaf level (root is 0)
183  Atom<uint32_t> bottom_;
184  // It is used when expanding the tree
185  Atom<uint32_t> guard_;
186 
187  // Mound with shared buffer
188  // Following two members are accessed by consumers
189  std::unique_ptr<BufferNode[]> shared_buffer_;
190  alignas(Align) Atom<int> top_loc_;
191 
193  // Numbers of futexs in the array
194  static constexpr size_t NumFutex = 128;
195  // The index gap for accessing futex in the array
196  static constexpr size_t Stride = 33;
197  std::unique_ptr<folly::detail::Futex<Atom>[]> futex_array_;
198  alignas(Align) Atom<uint32_t> cticket_;
199  alignas(Align) Atom<uint32_t> pticket_;
200 
201  // Two counters to calculate size of the queue
202  alignas(Align) Atom<size_t> counter_p_;
203  alignas(Align) Atom<size_t> counter_c_;
204 
205  public:
208  : cticket_(1), pticket_(1), counter_p_(0), counter_c_(0) {
209  if (MayBlock) {
210  futex_array_.reset(new folly::detail::Futex<Atom>[NumFutex]);
211  }
212 
213  if (PopBatch > 0) {
214  top_loc_ = -1;
215  shared_buffer_.reset(new BufferNode[PopBatch]);
216  for (size_t i = 0; i < PopBatch; i++) {
217  shared_buffer_[i].pnode = nullptr;
218  }
219  }
220  bottom_.store(0, std::memory_order_relaxed);
221  guard_.store(0, std::memory_order_relaxed);
222  // allocate the root MoundElement and initialize Mound
223  levels_[0] = new MoundElement[1]; // default MM for MoundElement
224  for (uint32_t i = 1; i < MAX_LEVELS; i++) {
225  levels_[i] = nullptr;
226  }
227  }
228 
230  if (PopBatch > 0) {
232  }
233  if (MayBlock) {
234  futex_array_.reset();
235  }
236  Position pos;
237  pos.level = pos.index = 0;
238  deleteAllNodes(pos);
239  // default MM for MoundElement
240  for (int i = getBottomLevel(); i >= 0; i--) {
241  delete[] levels_[i];
242  }
243  }
244 
// Insert val into the priority queue.
// The producer-side counter is only maintained when SupportsSize is
// requested; SupportsSize is a compile-time constant, so the branch
// folds away entirely otherwise.
245  void push(const T& val) {
246  moundPush(val);
247  if (SupportsSize) {
// Relaxed is sufficient: size() is only a best-effort estimate.
248  counter_p_.fetch_add(1, std::memory_order_relaxed);
249  }
250  }
251 
// Remove an element into val. With PopBatch > 0 the result is relaxed:
// it may come from the shared buffer rather than be the strict maximum.
// When MayBlock is set, moundPop coordinates blocking via the futex array.
252  void pop(T& val) {
253  moundPop(val);
254  if (SupportsSize) {
// Consumer-side counter for size(); relaxed, size() is a hint only.
255  counter_c_.fetch_add(1, std::memory_order_relaxed);
256  }
257  }
258 
262  size_t size() {
263  DCHECK(SupportsSize);
264  size_t p = counter_p_.load(std::memory_order_acquire);
265  size_t c = counter_c_.load(std::memory_order_acquire);
266  return (p > c) ? p - c : 0;
267  }
268 
// Best-effort emptiness check: a racy snapshot of the Mound root and,
// when PopBatch > 0, of the shared pop buffer as well (see isEmpty).
270  bool empty() {
271  return isEmpty();
272  }
273 
274  private:
276  return bottom_.load(std::memory_order_acquire);
277  }
278 
281  DCHECK(PopBatch > 0);
282  // delete nodes in the buffer
283  int loc = top_loc_.load(std::memory_order_relaxed);
284  while (loc >= 0) {
285  Node* n = shared_buffer_[loc--].pnode.load(std::memory_order_relaxed);
286  delete n;
287  }
288  // delete buffer
289  shared_buffer_.reset();
290  }
291 
293  void deleteAllNodes(const Position& pos) {
294  if (getElementSize(pos) == 0) {
295  // current list is empty, do not need to check
296  // its children again.
297  return;
298  }
299 
300  Node* curList = getList(pos);
301  setTreeNode(pos, nullptr);
302  while (curList != nullptr) { // reclaim nodes
303  Node* n = curList;
304  curList = curList->next;
305  delete n;
306  }
307 
308  if (!isLeaf(pos)) {
309  deleteAllNodes(leftOf(pos));
310  deleteAllNodes(rightOf(pos));
311  }
312  }
313 
315  bool isHeap(const Position& pos) {
316  if (isLeaf(pos)) {
317  return true;
318  }
319  Position lchild = leftOf(pos);
320  Position rchild = rightOf(pos);
321  return isHeap(lchild) && isHeap(rchild) &&
322  readValue(pos) >= readValue(lchild) &&
323  readValue(pos) >= readValue(rchild);
324  }
325 
328  return pos.level == getBottomLevel();
329  }
330 
333  return pos.level == 0;
334  }
335 
338  Position res;
339  res.level = pos.level - 1;
340  res.index = pos.index / 2;
341  return res;
342  }
343 
346  Position res;
347  res.level = pos.level + 1;
348  res.index = pos.index * 2;
349  return res;
350  }
351 
354  Position res;
355  res.level = pos.level + 1;
356  res.index = pos.index * 2 + 1;
357  return res;
358  }
359 
362  return levels_[pos.level][pos.index].size.load(std::memory_order_relaxed);
363  }
364 
367  const Position& pos,
368  const uint32_t& v) {
369  levels_[pos.level][pos.index].size.store(v, std::memory_order_relaxed);
370  }
371 
// Extend the tree by one leaf level. `btm` is the bottom level the
// caller observed; guard_ arbitrates so that exactly one thread
// allocates the new level while racers bail out once bottom_ moves.
373  void grow(uint32_t btm) {
// Compete for the guard: the winner is the thread whose fetch_add
// observed 0. Losers spin, but leave as soon as someone else grew.
374  while (true) {
375  if (guard_.fetch_add(1, std::memory_order_acq_rel) == 0) {
376  break;
377  }
378  // someone already expanded the tree
379  if (btm != getBottomLevel()) {
380  return;
381  }
383  }
384  // double check the bottom has not changed yet
385  if (btm != getBottomLevel()) {
// Won the guard after another thread already grew: release and bail.
386  guard_.store(0, std::memory_order_release);
387  return;
388  }
389  // create and initialize the new level
390  uint32_t tmp_btm = getBottomLevel();
391  uint32_t size = 1 << (tmp_btm + 1);
392  MoundElement* new_level = new MoundElement[size]; // MM
393  levels_[tmp_btm + 1] = new_level;
// Release order: the level array must be visible before bottom_ exposes
// it; only then is the guard released for future growth.
394  bottom_.store(tmp_btm + 1, std::memory_order_release);
395  guard_.store(0, std::memory_order_release);
396  }
397 
399  // This function is important, it selects a position to insert the
400  // node, there are two execution paths when this function returns.
401  // 1. It returns a position with head node has lower priority than the target.
402  // Thus it could be potentially used as the starting element to do the binary
403  // search to find the fit position. (slow path)
404  // 2. It returns a position, which is not the best fit.
405  // But it prevents aggressively grow the Mound. (fast path)
407  const T& val,
408  bool& path,
409  uint32_t& seed,
411  while (true) {
413  int bound = 1 << b; // number of elements in this level
414  int steps = 1 + b * b; // probe the length
415  ++seed;
416  uint32_t index = seed % bound;
417 
418  for (int i = 0; i < steps; i++) {
419  int loc = (index + i) % bound;
420  Position pos;
421  pos.level = b;
422  pos.index = loc;
423  // the first round, we do the quick check
424  if (optimisticReadValue(pos, hptr) <= val) {
425  path = false;
426  seed = ++loc;
427  return pos;
428  } else if (
429  b > LevelForForceInsert && getElementSize(pos) < ListTargetSize) {
430  // [fast path] conservative implementation
431  // it makes sure every tree element should
432  // have more than the given number of nodes.
433  seed = ++loc;
434  path = true;
435  return pos;
436  }
437  if (b != getBottomLevel()) {
438  break;
439  }
440  }
441  // failed too many times grow
442  if (b == getBottomLevel()) {
443  grow(b);
444  }
445  }
446  }
447 
449  void swapList(const Position& a, const Position& b) {
450  Node* tmp = getList(a);
451  setTreeNode(a, getList(b));
452  setTreeNode(b, tmp);
453 
454  // need to swap the tree node meta-data
455  uint32_t sa = getElementSize(a);
456  uint32_t sb = getElementSize(b);
457  setElementSize(a, sb);
458  setElementSize(b, sa);
459  }
460 
462  levels_[pos.level][pos.index].lock.lock();
463  }
464 
466  levels_[pos.level][pos.index].lock.unlock();
467  }
468 
470  return levels_[pos.level][pos.index].lock.try_lock();
471  }
472 
475  Node* tmp = hptr.get_protected(levels_[pos.level][pos.index].head);
476  return (tmp == nullptr) ? MIN_VALUE : tmp->val;
477  }
478 
479  // Get the value from the head of the list as the elementvalue
481  Node* tmp = getList(pos);
482  return (tmp == nullptr) ? MIN_VALUE : tmp->val;
483  }
484 
486  return levels_[pos.level][pos.index].head.load(std::memory_order_relaxed);
487  }
488 
490  levels_[pos.level][pos.index].head.store(t, std::memory_order_relaxed);
491  }
492 
493  // Merge two sorted lists
494  Node* mergeList(Node* base, Node* source) {
495  if (base == nullptr) {
496  return source;
497  } else if (source == nullptr) {
498  return base;
499  }
500 
501  Node *res, *p;
502  // choose the head node
503  if (base->val >= source->val) {
504  res = base;
505  base = base->next;
506  p = res;
507  } else {
508  res = source;
509  source = source->next;
510  p = res;
511  }
512 
513  while (base != nullptr && source != nullptr) {
514  if (base->val >= source->val) {
515  p->next = base;
516  base = base->next;
517  } else {
518  p->next = source;
519  source = source->next;
520  }
521  p = p->next;
522  }
523  if (base == nullptr) {
524  p->next = source;
525  } else {
526  p->next = base;
527  }
528  return res;
529  }
530 
532  void mergeListTo(const Position& pos, Node* t, const size_t& list_length) {
533  Node* head = getList(pos);
534  setTreeNode(pos, mergeList(head, t));
535  uint32_t ns = getElementSize(pos) + list_length;
536  setElementSize(pos, ns);
537  }
538 
// Decide whether pruning should stop at this leaf element (pos is
// locked by the caller). Returns true when pruning is unnecessary —
// the list is short enough, or the bottom level is still sparsely
// populated — and unlocks pos. Returns false with pos STILL LOCKED so
// the caller (startPruning) can grow the tree and push the tail down.
539  bool pruningLeaf(const Position& pos) {
540  if (getElementSize(pos) <= PruningSize) {
541  unlockNode(pos);
542  return true;
543  }
544 
// Count occupied leaves; stop early once more than 2/3 are in use.
545  int b = getBottomLevel();
546  int leaves = 1 << b;
547  int cnodes = 0;
548  for (int i = 0; i < leaves; i++) {
549  Position tmp;
550  tmp.level = b;
551  tmp.index = i;
552  if (getElementSize(tmp) != 0) {
553  cnodes++;
554  }
555  if (cnodes > leaves * 2 / 3) {
556  break;
557  }
558  }
559 
// Bottom level mostly empty: tolerate the long list rather than grow.
560  if (cnodes <= leaves * 2 / 3) {
561  unlockNode(pos);
562  return true;
563  }
564  return false;
565  }
566 
// Prune an over-long list: keep the first ListTargetSize nodes at pos
// (locked by the caller) and push the tail down to the children,
// recursing while a child in turn exceeds PruningSize. Every lock
// taken here — including the caller's lock on pos — is released by
// the time the recursion unwinds.
569  void startPruning(const Position& pos) {
// pruningLeaf returns true (and unlocks pos) when pruning should stop.
570  if (isLeaf(pos) && pruningLeaf(pos)) {
571  return;
572  }
573 
574  // split the list, record the tail
575  Node* pruning_head = getList(pos);
576  int steps = ListTargetSize; // keep in the original list
577  for (int i = 0; i < steps - 1; i++) {
578  pruning_head = pruning_head->next;
579  }
580  Node* t = pruning_head;
581  pruning_head = pruning_head->next;
582  t->next = nullptr;
583  int tail_length = getElementSize(pos) - steps;
584  setElementSize(pos, steps);
585 
586  // split the tail list into two lists
587  // evenly merge to two children
588  if (pos.level != getBottomLevel()) {
589  // split the rest into two lists
590  int left_length = (tail_length + 1) / 2;
591  int right_length = tail_length - left_length;
592  Node *to_right, *to_left = pruning_head;
593  for (int i = 0; i < left_length - 1; i++) {
594  pruning_head = pruning_head->next;
595  }
596  to_right = pruning_head->next;
597  pruning_head->next = nullptr;
598 
599  Position lchild = leftOf(pos);
600  Position rchild = rightOf(pos);
// Lock order: children are locked while pos is still held, then pos is
// released before any recursive pruning of the children.
601  if (left_length != 0) {
602  lockNode(lchild);
603  mergeListTo(lchild, to_left, left_length);
604  }
605  if (right_length != 0) {
606  lockNode(rchild);
607  mergeListTo(rchild, to_right, right_length);
608  }
609  unlockNode(pos);
// Recurse only into children that are now themselves over-long;
// startPruning takes ownership of (and releases) the child's lock.
610  if (left_length != 0 && getElementSize(lchild) > PruningSize) {
611  startPruning(lchild);
612  } else if (left_length != 0) {
613  unlockNode(lchild);
614  }
615  if (right_length != 0 && getElementSize(rchild) > PruningSize) {
616  startPruning(rchild);
617  } else if (right_length != 0) {
618  unlockNode(rchild);
619  }
620  } else { // time to grow the Mound
621  grow(pos.level);
// Choose a child by the parity of `steps` — a cheap quasi-random pick;
// the whole tail goes to that single child.
623  if (steps % 2 == 1) {
624  Position rchild = rightOf(pos);
625  lockNode(rchild);
626  mergeListTo(rchild, pruning_head, tail_length);
627  unlockNode(pos);
628  unlockNode(rchild);
629  } else {
630  Position lchild = leftOf(pos);
631  lockNode(lchild);
632  mergeListTo(lchild, pruning_head, tail_length);
633  unlockNode(pos);
634  unlockNode(lchild);
635  }
636  }
637  }
638 
639  // This function insert the new node (always) at the head of the
640  // current list. It needs to lock the parent & current
641  // This function may cause the list becoming tooooo long, so we
642  // provide pruning algorithm.
// Slow-path insert: place newNode at the head region of the list at
// pos, keeping the heap property (parent head > val >= this head).
// Returns false — caller retries — whenever locks cannot be acquired
// or the optimistically chosen position no longer fits val.
// On success every acquired lock is released (startPruning releases
// pos's lock itself when the list grew past PruningSize).
643  bool regularInsert(const Position& pos, const T& val, Node* newNode) {
644  // insert to the root node
645  if (isRoot(pos)) {
646  lockNode(pos);
647  T nv = readValue(pos);
// The root has no parent constraint; only require val >= current head.
648  if (LIKELY(nv <= val)) {
649  newNode->next = getList(pos);
650  setTreeNode(pos, newNode);
651  uint32_t sz = getElementSize(pos);
652  setElementSize(pos, sz + 1);
653  if (UNLIKELY(sz > PruningSize)) {
654  startPruning(pos);
655  } else {
656  unlockNode(pos);
657  }
658  return true;
659  }
660  unlockNode(pos);
661  return false;
662  }
663 
664  // insert to an inner node
// Try-lock parent then child; back off (return false) on contention so
// the caller can reselect a position instead of blocking.
665  Position parent = parentOf(pos);
666  if (!trylockNode(parent)) {
667  return false;
668  }
669  if (!trylockNode(pos)) {
670  unlockNode(parent);
671  return false;
672  }
673  T pv = readValue(parent);
674  T nv = readValue(pos);
// Heap invariant for this slot: parent head strictly above val, and
// val at or above this element's head.
675  if (LIKELY(pv > val && nv <= val)) {
676  // improve the accuracy by getting the node(R) with less priority than the
677  // new value from parent level, insert the new node to the parent list
678  // and insert R to the current list.
679  // It only happens at >= LevelForTraverseParent for reducing contention
680  uint32_t sz = getElementSize(pos);
681  if (pos.level >= LevelForTraverseParent) {
// Walk the parent list to where val belongs; if val fits inside the
// parent list, splice newNode in there and detach the parent's old
// tail node, which becomes the node to push down into pos instead.
682  Node* start = getList(parent);
683  while (start->next != nullptr && start->next->val >= val) {
684  start = start->next;
685  }
686  if (start->next != nullptr) {
687  newNode->next = start->next;
688  start->next = newNode;
689  while (start->next->next != nullptr) {
690  start = start->next;
691  }
// newNode now refers to the displaced parent tail node.
692  newNode = start->next;
693  start->next = nullptr;
694  }
695  unlockNode(parent);
696 
// Insert the (possibly swapped) node into pos's sorted list.
697  Node* curList = getList(pos);
698  if (curList == nullptr) {
699  newNode->next = nullptr;
700  setTreeNode(pos, newNode);
701  } else {
702  Node* p = curList;
703  if (p->val <= newNode->val) {
704  newNode->next = curList;
705  setTreeNode(pos, newNode);
706  } else {
707  while (p->next != nullptr && p->next->val >= newNode->val) {
708  p = p->next;
709  }
710  newNode->next = p->next;
711  p->next = newNode;
712  }
713  }
714  setElementSize(pos, sz + 1);
715  } else {
// Shallow levels: just push newNode onto the head of pos's list.
716  unlockNode(parent);
717  newNode->next = getList(pos);
718  setTreeNode(pos, newNode);
719  setElementSize(pos, sz + 1);
720  }
721  if (UNLIKELY(sz > PruningSize)) {
722  startPruning(pos);
723  } else {
724  unlockNode(pos);
725  }
726  return true;
727  }
728  unlockNode(parent);
729  unlockNode(pos);
730  return false;
731  }
732 
// Fast-path insert into the root list, bypassing the parent check
// (the root has none). Fails when the root lock is contended or the
// root list already holds ListTargetSize nodes. The std::unique_lock
// releases the root lock on every return path.
733  bool forceInsertToRoot(Node* newNode) {
734  Position pos;
735  pos.level = pos.index = 0;
736  std::unique_lock<Mutex> lck(
737  levels_[pos.level][pos.index].lock, std::try_to_lock);
738  if (!lck.owns_lock()) {
739  return false;
740  }
741  uint32_t sz = getElementSize(pos);
// Refuse to lengthen an already target-sized root list; the caller
// falls back to the regular insert path.
742  if (sz >= ListTargetSize) {
743  return false;
744  }
745 
// Insert in sorted (descending) order, possibly as the new head.
746  Node* curList = getList(pos);
747  if (curList == nullptr) {
748  newNode->next = nullptr;
749  setTreeNode(pos, newNode);
750  } else {
751  Node* p = curList;
752  if (p->val <= newNode->val) {
753  newNode->next = curList;
754  setTreeNode(pos, newNode);
755  } else {
756  while (p->next != nullptr && p->next->val >= newNode->val) {
757  p = p->next;
758  }
759  newNode->next = p->next;
760  p->next = newNode;
761  }
762  }
763  setElementSize(pos, sz + 1);
764  return true;
765  }
766 
767  // This function forces the new node inserting to the current position
768  // if the element does not hold the enough nodes. It is safe to
769  // lock just one position to insert, because it won't be the first
770  // node to sustain the heap structure.
// Fast-path insert at a specific position when its list is below the
// target size. Safe with only this one lock held because the new node
// is never allowed to become the list head (so the heap property on
// heads is untouched). Returns false when the position no longer
// qualifies, signalling the caller to pick a new position.
771  bool forceInsert(const Position& pos, const T& val, Node* newNode) {
772  if (isRoot(pos)) {
773  return forceInsertToRoot(newNode);
774  }
775 
776  while (true) {
777  std::unique_lock<Mutex> lck(
778  levels_[pos.level][pos.index].lock, std::try_to_lock);
779  if (!lck.owns_lock()) {
// Lock contended: spin only while the slot still looks usable.
780  if (getElementSize(pos) < ListTargetSize && readValue(pos) >= val) {
781  continue;
782  } else {
783  return false;
784  }
785  }
786  T nv = readValue(pos);
787  uint32_t sz = getElementSize(pos);
788  // do not allow the new node to be the first one
789  // do not allow the list size tooooo big
790  if (UNLIKELY(nv < val || sz >= ListTargetSize)) {
791  return false;
792  }
793 
// NOTE(review): if the list here is empty, readValue() yields
// MIN_VALUE and sz == 0, so the guard above only rejects when
// val > MIN_VALUE; pushing val == MIN_VALUE could reach the
// unchecked p->next dereference below with p == nullptr — confirm
// callers never push the sentinel minimum value.
794  Node* p = getList(pos);
795  // find a place to insert the node
796  while (p->next != nullptr && p->next->val > val) {
797  p = p->next;
798  }
799  newNode->next = p->next;
800  p->next = newNode;
801  // do not forget to change the metadata
802  setElementSize(pos, sz + 1);
803  return true;
804  }
805  }
806 
808  Position& cur,
809  const T& val,
811  Position parent, mid;
812  if (cur.level == 0) {
813  return;
814  }
815  // start from the root
816  parent.level = parent.index = 0;
817 
818  while (true) { // binary search
819  mid.level = (cur.level + parent.level) / 2;
820  mid.index = cur.index >> (cur.level - mid.level);
821 
822  T mv = optimisticReadValue(mid, hptr);
823  if (val < mv) {
824  parent = mid;
825  } else {
826  cur = mid;
827  }
828 
829  if (mid.level == 0 || // the root
830  ((parent.level + 1 == cur.level) && parent.level != 0)) {
831  return;
832  }
833  }
834  }
835 
836  // The push keeps the length of each element stable
837  void moundPush(const T& val) {
838  Position cur;
840  Node* newNode = new Node;
841  newNode->val = val;
842  uint32_t seed = folly::Random::rand32() % (1 << 21);
843 
844  while (true) {
845  // shell we go the fast path?
846  bool go_fast_path = false;
847  // chooice the right node to start
848  cur = selectPosition(val, go_fast_path, seed, hptr);
849  if (go_fast_path) {
850  if (LIKELY(forceInsert(cur, val, newNode))) {
851  if (MayBlock) {
853  }
854  return;
855  } else {
856  continue;
857  }
858  }
859 
860  binarySearchPosition(cur, val, hptr);
861  if (LIKELY(regularInsert(cur, val, newNode))) {
862  if (MayBlock) {
864  }
865  return;
866  }
867  }
868  }
869 
// Publish up to PopBatch nodes of the just-detached root list `head`
// (rsize nodes long) into the shared buffer. Slots are filled from
// index num-1 down to 0, so consumers decrementing top_loc_ receive
// nodes in priority order. Returns the number of nodes left on `head`,
// which is re-attached as the root list. Caller holds the root lock.
870  int popToSharedBuffer(const uint32_t rsize, Node* head) {
871  Position pos;
872  pos.level = pos.index = 0;
873 
874  int num = std::min(rsize, (uint32_t)PopBatch);
875  for (int i = num - 1; i >= 0; i--) {
876  // wait until this block is empty
// Spin: a slow consumer from the previous batch may not have cleared
// this slot yet (it stores nullptr after taking the node).
877  while (shared_buffer_[i].pnode.load(std::memory_order_relaxed) != nullptr)
878  ;
879  shared_buffer_[i].pnode.store(head, std::memory_order_relaxed);
880  head = head->next;
881  }
882  if (num > 0) {
// Release: all slot stores above must be visible before top_loc_
// announces the batch to consumers.
883  top_loc_.store(num - 1, std::memory_order_release);
884  }
885  setTreeNode(pos, head);
886  return rsize - num;
887  }
888 
// Restore the heap property after the list at pos (locked by the
// caller) shrank or changed: either merge two small children up into
// pos, or swap the larger child's list into pos and recurse into that
// child. Every lock acquired here — including the caller's lock on
// pos — is released along each path.
889  void mergeDown(const Position& pos) {
890  if (isLeaf(pos)) {
891  unlockNode(pos);
892  return;
893  }
894 
895  // acquire locks for L and R and compare
896  Position lchild = leftOf(pos);
897  Position rchild = rightOf(pos);
898  lockNode(lchild);
899  lockNode(rchild);
900  // read values
901  T nv = readValue(pos);
902  T lv = readValue(lchild);
903  T rv = readValue(rchild);
// Heap property already holds at this node: nothing to do.
904  if (nv >= lv && nv >= rv) {
905  unlockNode(pos);
906  unlockNode(lchild);
907  unlockNode(rchild);
908  return;
909  }
910 
911  // If two children contains nodes less than the
912  // threshold, we merge two children to the parent
913  // and do merge down on both of them.
914  size_t sum =
915  getElementSize(rchild) + getElementSize(lchild) + getElementSize(pos);
916  if (sum <= MergingSize) {
917  Node* l1 = mergeList(getList(rchild), getList(lchild));
918  setTreeNode(pos, mergeList(l1, getList(pos)));
919  setElementSize(pos, sum);
920  setTreeNode(lchild, nullptr);
921  setElementSize(lchild, 0);
922  setTreeNode(rchild, nullptr);
923  setElementSize(rchild, 0);
924  unlockNode(pos);
// The children are now empty but stay locked; the recursive calls
// release them (each call unlocks its pos on every path).
925  mergeDown(lchild);
926  mergeDown(rchild);
927  return;
928  }
// Exactly one of the two branches below must fire: the early-return
// above established nv < lv or nv < rv, so the larger child wins.
929  // pull from right
930  if (rv >= lv && rv > nv) {
931  swapList(rchild, pos);
932  unlockNode(pos);
933  unlockNode(lchild);
934  mergeDown(rchild);
935  } else if (lv >= rv && lv > nv) {
936  // pull from left
937  swapList(lchild, pos);
938  unlockNode(pos);
939  unlockNode(rchild);
940  mergeDown(lchild);
941  }
942  }
943 
945  if (isLeaf(pos)) {
946  setElementSize(pos, 0);
947  unlockNode(pos);
948  return true;
949  }
950 
951  // acquire locks for L and R and compare
952  Position lchild = leftOf(pos);
953  Position rchild = rightOf(pos);
954  lockNode(lchild);
955  lockNode(rchild);
956  if (getElementSize(lchild) == 0 && getElementSize(rchild) == 0) {
957  setElementSize(pos, 0);
958  unlockNode(pos);
959  unlockNode(lchild);
960  unlockNode(rchild);
961  return true;
962  } else {
963  // read values
964  T lv = readValue(lchild);
965  T rv = readValue(rchild);
966  if (lv >= rv) {
967  swapList(lchild, pos);
968  setElementSize(lchild, 0);
969  unlockNode(pos);
970  unlockNode(rchild);
971  pos = lchild;
972  } else {
973  swapList(rchild, pos);
974  setElementSize(rchild, 0);
975  unlockNode(pos);
976  unlockNode(lchild);
977  pos = rchild;
978  }
979  return false;
980  }
981  }
982 
// Pop with the root lock already held by the caller. Takes the head
// node of the root list into `val`; with PopBatch > 0 the rest of the
// root list is spilled to the shared buffer for subsequent pops. Then
// restores the heap property and retires the taken node via hazard
// pointers. Returns false (after unlocking) when there is nothing to
// pop from the Mound or the shared buffer is still being consumed.
983  bool moundPopMany(T& val) {
984  // pop from the root
985  Position pos;
986  pos.level = pos.index = 0;
987  // the root is nullptr, return false
988  Node* head = getList(pos);
989  if (head == nullptr) {
990  unlockNode(pos);
991  return false;
992  }
993 
994  // shared buffer already filled by other threads
995  if (PopBatch > 0 && top_loc_.load(std::memory_order_acquire) >= 0) {
996  unlockNode(pos);
997  return false;
998  }
999 
1000  uint32_t sz = getElementSize(pos);
1001  // get the one node first
1002  val = head->val;
1003  Node* p = head;
1004  head = head->next;
1005  sz--;
1006 
1007  if (PopBatch > 0) {
// popToSharedBuffer re-attaches any overflow back onto the root and
// returns the remaining root-list length.
1008  sz = popToSharedBuffer(sz, head);
1009  } else {
1010  setTreeNode(pos, head);
1011  }
1012 
1013  bool done = false;
1014  if (LIKELY(sz == 0)) {
// deferSettingRootSize may already fix up the tree and unlock; `done`
// tells us whether a mergeDown pass is still needed.
1015  done = deferSettingRootSize(pos);
1016  } else {
1017  setElementSize(pos, sz);
1018  }
1019 
1020  if (LIKELY(!done)) {
1021  mergeDown(pos);
1022  }
1023 
// Hazard-pointer retirement: optimistic readers may still hold p.
1024  p->retire();
1025  return true;
1026  }
1027 
1029  auto p = pticket_.fetch_add(1, std::memory_order_acq_rel);
1030  auto loc = getFutexArrayLoc(p);
1031  uint32_t curfutex = futex_array_[loc].load(std::memory_order_acquire);
1032 
1033  while (true) {
1034  uint32_t ready = p << 1; // get the lower 31 bits
1035  // avoid the situation that push has larger ticket already set the value
1036  if (UNLIKELY(
1037  ready + 1 < curfutex ||
1038  ((curfutex > ready) && (curfutex - ready > 0x40000000)))) {
1039  return;
1040  }
1041 
1042  if (futex_array_[loc].compare_exchange_strong(curfutex, ready)) {
1043  if (curfutex &
1044  1) { // One or more consumers may be blocked on this futex
1045  detail::futexWake(&futex_array_[loc]);
1046  }
1047  return;
1048  } else {
1049  curfutex = futex_array_[loc].load(std::memory_order_acquire);
1050  }
1051  }
1052  }
1053 
1054  // This could guarentee the Mound is empty
1056  Position pos;
1057  pos.level = pos.index = 0;
1058  return getElementSize(pos) == 0;
1059  }
1060 
1061  // Return true if the shared buffer is empty
1063  return top_loc_.load(std::memory_order_acquire) < 0;
1064  }
1065 
1067  if (PopBatch > 0) {
1068  return isMoundEmpty() && isSharedBufferEmpty();
1069  }
1070  return isMoundEmpty();
1071  }
1072 
// A futex word packs (ticket << 1) | blocked-bit. A consumer holding
// `curticket` may proceed once the published ticket in its futex slot
// has reached its own (31-bit) ticket value.
1073  FOLLY_ALWAYS_INLINE bool futexIsReady(const size_t& curticket) {
1074  auto loc = getFutexArrayLoc(curticket);
1075  auto curfutex = futex_array_[loc].load(std::memory_order_acquire);
// Strip the blocked-bit / truncate the ticket to the 31-bit space.
1076  uint32_t short_cticket = curticket & 0x7FFFFFFF;
1077  uint32_t futex_ready = curfutex >> 1;
1078  // handle unsigned 31 bits overflow
// The second clause accepts the case where the published ticket has
// wrapped around past the 31-bit boundary.
1079  return futex_ready >= short_cticket ||
1080  short_cticket - futex_ready > 0x40000000;
1081  }
1082 
1083  template <typename Clock, typename Duration>
1085  const size_t& curticket,
1086  const std::chrono::time_point<Clock, Duration>& deadline,
1087  const folly::WaitOptions& opt = wait_options()) {
1088  return folly::detail::spin_pause_until(deadline, opt, [=] {
1089  return futexIsReady(curticket);
1091  }
1092 
1093  void tryBlockingPop(const size_t& curticket) {
1094  auto loc = getFutexArrayLoc(curticket);
1095  auto curfutex = futex_array_[loc].load(std::memory_order_acquire);
1096  if (curfutex &
1097  1) {
1098  detail::futexWait(&futex_array_[loc], curfutex);
1099  }
1100  if (trySpinBeforeBlock(
1101  curticket,
1103  return;
1104  }
1105  while (true) {
1106  curfutex = futex_array_[loc].load(std::memory_order_acquire);
1107  if (curfutex &
1108  1) {
1109  detail::futexWait(&futex_array_[loc], curfutex);
1110  } else if (!futexIsReady(curticket)) { // current ticket < pop ticket
1111  uint32_t blocking_futex = curfutex + 1;
1112  if (futex_array_[loc].compare_exchange_strong(
1113  curfutex, blocking_futex)) {
1114  detail::futexWait(&futex_array_[loc], blocking_futex);
1115  }
1116  } else {
1117  return;
1118  }
1119  }
1120  }
1121 
1123  auto ct = cticket_.fetch_add(1, std::memory_order_acq_rel);
1124  // fast path check
1125  if (futexIsReady(ct)) {
1126  return;
1127  }
1128  // Blocking
1129  tryBlockingPop(ct);
1130  }
1131 
1133  if (isMoundEmpty()) {
1134  return false;
1135  }
1136  Position pos;
1137  pos.level = pos.index = 0;
1138 
1139  // lock the root
1140  if (trylockNode(pos)) {
1141  return moundPopMany(val);
1142  }
1143  return false;
1144  }
1145 
1147  return {};
1148  }
1149 
1150  template <typename Clock, typename Duration>
1152  const std::chrono::time_point<Clock, Duration>& deadline,
1153  const folly::WaitOptions& opt = wait_options()) {
1154  // Fast path, by quick check the status
1156  deadline, opt, [=] { return !isEmpty(); })) {
1158  return true;
1160  return false;
1162  break;
1163  }
1164 
1165  // Spinning strategy
1166  while (true) {
1167  auto res =
1168  folly::detail::spin_yield_until(deadline, [=] { return !isEmpty(); });
1170  return true;
1171  } else if (res == folly::detail::spin_result::timeout) {
1172  return false;
1173  }
1174  }
1175  return true;
1176  }
1177 
1179  int get_or = -1;
1180  if (!isSharedBufferEmpty()) {
1181  get_or = top_loc_.fetch_sub(1, std::memory_order_acq_rel);
1182  if (get_or >= 0) {
1183  Node* c = shared_buffer_[get_or].pnode.load(std::memory_order_relaxed);
1184  shared_buffer_[get_or].pnode.store(nullptr, std::memory_order_release);
1185  val = c->val;
1186  c->retire();
1187  return true;
1188  }
1189  }
1190  return false;
1191  }
1192 
1193  size_t getFutexArrayLoc(size_t s) {
1194  return ((s - 1) * Stride) & (NumFutex - 1);
1195  }
1196 
1197  void moundPop(T& val) {
1198  if (MayBlock) {
1199  blockingPopImpl();
1200  }
1201 
1202  if (PopBatch > 0) {
1203  if (tryPopFromSharedBuffer(val)) {
1204  return;
1205  }
1206  }
1207 
1208  while (true) {
1209  if (LIKELY(tryPopFromMound(val))) {
1210  return;
1211  }
1213  if (PopBatch > 0 && tryPopFromSharedBuffer(val)) {
1214  return;
1215  }
1216  }
1217  }
1218 };
1219 
1220 } // namespace folly
void swapList(const Position &a, const Position &b)
Swap two Tree Elements (head, size)
FOLLY_ALWAYS_INLINE void lockNode(const Position &pos)
spin_result spin_yield_until(std::chrono::time_point< Clock, Duration > const &deadline, F f)
Definition: Spin.h:70
bool isHeap(const Position &pos)
Check that the first node in the TreeElement keeps the heap structure.
FOLLY_ALWAYS_INLINE T optimisticReadValue(const Position &pos, folly::hazptr_holder< Atom > &hptr)
std::atomic< int64_t > sum(0)
static FOLLY_ALWAYS_INLINE folly::WaitOptions wait_options()
void grow(uint32_t btm)
Extend the tree level.
FOLLY_ALWAYS_INLINE Position parentOf(const Position &pos)
Locate the parent node.
FOLLY_ALWAYS_INLINE size_t getElementSize(const Position &pos)
get the list size in current MoundElement
Position selectPosition(const T &val, bool &path, uint32_t &seed, folly::hazptr_holder< Atom > &hptr)
TODO: optimization.
#define FOLLY_ALWAYS_INLINE
Definition: CPortability.h:151
FOLLY_ALWAYS_INLINE T * get_protected(const Atom< T * > &src) noexcept
Definition: HazptrHolder.h:138
char b
LogLevel max
Definition: LogLevel.cpp:31
void deleteSharedBuffer()
This function is only called by the destructor.
int popToSharedBuffer(const uint32_t rsize, Node *head)
void deleteAllNodes(const Position &pos)
This function is only called by the destructor.
static const int seed
void binarySearchPosition(Position &cur, const T &val, folly::hazptr_holder< Atom > &hptr)
Atom< std::uint32_t > Futex
Definition: Futex.h:51
#define LIKELY(x)
Definition: Likely.h:47
void retire(D deleter={}, hazptr_domain< Atom > &domain=default_hazptr_domain< Atom >())
Definition: HazptrObj.h:229
Mound Element (Tree node), head points to a linked list.
#define Mutex
FOLLY_ALWAYS_INLINE bool isRoot(const Position &pos)
Current element is the root?
FOLLY_ALWAYS_INLINE Node * getList(const Position &pos)
std::unique_ptr< folly::detail::Futex< Atom >[]> futex_array_
folly::std T
—— Concurrent Priority Queue Implementation ——
Definition: AtomicBitSet.h:29
MoundElement * levels_[MAX_LEVELS]
Data members.
FutexResult futexWait(const Futex *futex, uint32_t expected, uint32_t waitMask)
Definition: Futex-inl.h:100
FOLLY_ALWAYS_INLINE void setTreeNode(const Position &pos, Node *t)
FOLLY_ALWAYS_INLINE Position leftOf(const Position &pos)
Locate the left child.
static constexpr size_t NumFutex
Blocking algorithm.
#define FOLLY_NOINLINE
Definition: CPortability.h:142
bool regularInsert(const Position &pos, const T &val, Node *newNode)
bool empty()
Returns true only if the queue was empty during the call.
LogLevel min
Definition: LogLevel.cpp:30
FOLLY_ALWAYS_INLINE Position rightOf(const Position &pos)
Locate the right child.
FOLLY_ALWAYS_INLINE void setElementSize(const Position &pos, const uint32_t &v)
Set the size of current MoundElement.
FOLLY_NOINLINE bool trySpinBeforeBlock(const size_t &curticket, const std::chrono::time_point< Clock, Duration > &deadline, const folly::WaitOptions &opt=wait_options())
FOLLY_ALWAYS_INLINE bool isLeaf(const Position &pos)
Current position is leaf?
#define Atom
auto lock(Synchronized< D, M > &synchronized, Args &&...args)
FOLLY_ALWAYS_INLINE bool futexIsReady(const size_t &curticket)
char a
void mergeListTo(const Position &pos, Node *t, const size_t &list_length)
Merge list t to the Element Position.
auto start
bool forceInsert(const Position &pos, const T &val, Node *newNode)
FOLLY_ALWAYS_INLINE bool trylockNode(const Position &pos)
FOLLY_ALWAYS_INLINE T readValue(const Position &pos)
static set< string > s
The pos structure simplifies the implementation.
static uint32_t rand32()
Definition: Random.h:213
#define UNLIKELY(x)
Definition: Likely.h:48
FOLLY_ALWAYS_INLINE void unlockNode(const Position &pos)
char c
spin_result spin_pause_until(std::chrono::time_point< Clock, Duration > const &deadline, WaitOptions const &opt, F f)
Definition: Spin.h:36
folly::Function< void()> parent
Definition: AtFork.cpp:34
FOLLY_NOINLINE bool tryWait(const std::chrono::time_point< Clock, Duration > &deadline, const folly::WaitOptions &opt=wait_options())
int futexWake(const Futex *futex, int count, uint32_t wakeMask)
Definition: Futex-inl.h:107