proxygen
folly::RelaxedConcurrentPriorityQueue< T, MayBlock, SupportsSize, PopBatch, ListTargetSize, Mutex, Atom > Class Template Reference

#include <RelaxedConcurrentPriorityQueue.h>

Classes

struct  BufferNode
 Node for shared buffer should be aligned. More...
 
struct  MoundElement
 Mound Element (Tree node), head points to a linked list. More...
 
struct  Node
 List Node structure. More...
 
struct  Position
 The pos strcture simplify the implementation. More...
 

Public Member Functions

 RelaxedConcurrentPriorityQueue ()
 Constructor. More...
 
 ~RelaxedConcurrentPriorityQueue ()
 
void push (const T &val)
 
void pop (T &val)
 
size_t size ()
 
bool empty ()
 Returns true only if the queue was empty during the call. More...
 

Private Member Functions

uint32_t getBottomLevel ()
 
void deleteSharedBuffer ()
 This function is only called by the destructor. More...
 
void deleteAllNodes (const Position &pos)
 This function is only called by the destructor. More...
 
bool isHeap (const Position &pos)
 Check the first node in TreeElement keeps the heap structure. More...
 
FOLLY_ALWAYS_INLINE bool isLeaf (const Position &pos)
 Current position is leaf? More...
 
FOLLY_ALWAYS_INLINE bool isRoot (const Position &pos)
 Current element is the root? More...
 
FOLLY_ALWAYS_INLINE Position parentOf (const Position &pos)
 Locate the parent node. More...
 
FOLLY_ALWAYS_INLINE Position leftOf (const Position &pos)
 Locate the left child. More...
 
FOLLY_ALWAYS_INLINE Position rightOf (const Position &pos)
 Locate the right child. More...
 
FOLLY_ALWAYS_INLINE size_t getElementSize (const Position &pos)
 get the list size in current MoundElement More...
 
FOLLY_ALWAYS_INLINE void setElementSize (const Position &pos, const uint32_t &v)
 Set the size of current MoundElement. More...
 
void grow (uint32_t btm)
 Extend the tree level. More...
 
Position selectPosition (const T &val, bool &path, uint32_t &seed, folly::hazptr_holder< Atom > &hptr)
 TODO: optimization. More...
 
void swapList (const Position &a, const Position &b)
 Swap two Tree Elements (head, size) More...
 
FOLLY_ALWAYS_INLINE void lockNode (const Position &pos)
 
FOLLY_ALWAYS_INLINE void unlockNode (const Position &pos)
 
FOLLY_ALWAYS_INLINE bool trylockNode (const Position &pos)
 
FOLLY_ALWAYS_INLINE T optimisticReadValue (const Position &pos, folly::hazptr_holder< Atom > &hptr)
 
FOLLY_ALWAYS_INLINE T readValue (const Position &pos)
 
FOLLY_ALWAYS_INLINE NodegetList (const Position &pos)
 
FOLLY_ALWAYS_INLINE void setTreeNode (const Position &pos, Node *t)
 
NodemergeList (Node *base, Node *source)
 
void mergeListTo (const Position &pos, Node *t, const size_t &list_length)
 Merge list t to the Element Position. More...
 
bool pruningLeaf (const Position &pos)
 
void startPruning (const Position &pos)
 
bool regularInsert (const Position &pos, const T &val, Node *newNode)
 
bool forceInsertToRoot (Node *newNode)
 
bool forceInsert (const Position &pos, const T &val, Node *newNode)
 
void binarySearchPosition (Position &cur, const T &val, folly::hazptr_holder< Atom > &hptr)
 
void moundPush (const T &val)
 
int popToSharedBuffer (const uint32_t rsize, Node *head)
 
void mergeDown (const Position &pos)
 
bool deferSettingRootSize (Position &pos)
 
bool moundPopMany (T &val)
 
void blockingPushImpl ()
 
FOLLY_ALWAYS_INLINE bool isMoundEmpty ()
 
FOLLY_ALWAYS_INLINE bool isSharedBufferEmpty ()
 
FOLLY_ALWAYS_INLINE bool isEmpty ()
 
FOLLY_ALWAYS_INLINE bool futexIsReady (const size_t &curticket)
 
template<typename Clock , typename Duration >
FOLLY_NOINLINE bool trySpinBeforeBlock (const size_t &curticket, const std::chrono::time_point< Clock, Duration > &deadline, const folly::WaitOptions &opt=wait_options())
 
void tryBlockingPop (const size_t &curticket)
 
void blockingPopImpl ()
 
bool tryPopFromMound (T &val)
 
template<typename Clock , typename Duration >
FOLLY_NOINLINE bool tryWait (const std::chrono::time_point< Clock, Duration > &deadline, const folly::WaitOptions &opt=wait_options())
 
bool tryPopFromSharedBuffer (T &val)
 
size_t getFutexArrayLoc (size_t s)
 
void moundPop (T &val)
 

Static Private Member Functions

static FOLLY_ALWAYS_INLINE folly::WaitOptions wait_options ()
 

Private Attributes

MoundElementlevels_ [MAX_LEVELS]
 Data members. More...
 
Atom< uint32_tbottom_
 
Atom< uint32_tguard_
 
std::unique_ptr< BufferNode[]> shared_buffer_
 
Atom< int > top_loc_
 
std::unique_ptr< folly::detail::Futex< Atom >[]> futex_array_
 
Atom< uint32_tcticket_
 
Atom< uint32_tpticket_
 
Atom< size_t > counter_p_
 
Atom< size_t > counter_c_
 

Static Private Attributes

static constexpr uint32_t MAX_LEVELS = 32
 
static constexpr T MIN_VALUE = std::numeric_limits<T>::min()
 
static constexpr size_t Align = 1u << 7
 
static constexpr int LevelForForceInsert = 3
 
static constexpr int LevelForTraverseParent = 7
 
static constexpr size_t PruningSize = ListTargetSize * 2
 
static constexpr size_t MergingSize = ListTargetSize
 
static constexpr size_t NumFutex = 128
 Blocking algorithm. More...
 
static constexpr size_t Stride = 33
 

Detailed Description

template<typename T, bool MayBlock = false, bool SupportsSize = false, size_t PopBatch = 16, size_t ListTargetSize = 25, typename Mutex = folly::SpinLock, template< typename > class Atom = std::atomic>
class folly::RelaxedConcurrentPriorityQueue< T, MayBlock, SupportsSize, PopBatch, ListTargetSize, Mutex, Atom >

Definition at line 125 of file RelaxedConcurrentPriorityQueue.h.

Constructor & Destructor Documentation

template<typename T , bool MayBlock = false, bool SupportsSize = false, size_t PopBatch = 16, size_t ListTargetSize = 25, typename Mutex = folly::SpinLock, template< typename > class Atom = std::atomic>
folly::RelaxedConcurrentPriorityQueue< T, MayBlock, SupportsSize, PopBatch, ListTargetSize, Mutex, Atom >::RelaxedConcurrentPriorityQueue ( )
inline

Constructor.

Definition at line 207 of file RelaxedConcurrentPriorityQueue.h.

References i, folly::RelaxedConcurrentPriorityQueue< T, MayBlock, SupportsSize, PopBatch, ListTargetSize, Mutex, Atom >::MAX_LEVELS, folly::RelaxedConcurrentPriorityQueue< T, MayBlock, SupportsSize, PopBatch, ListTargetSize, Mutex, Atom >::top_loc_, and uint32_t.

208  : cticket_(1), pticket_(1), counter_p_(0), counter_c_(0) {
209  if (MayBlock) {
211  }
212 
213  if (PopBatch > 0) {
214  top_loc_ = -1;
215  shared_buffer_.reset(new BufferNode[PopBatch]);
216  for (size_t i = 0; i < PopBatch; i++) {
217  shared_buffer_[i].pnode = nullptr;
218  }
219  }
220  bottom_.store(0, std::memory_order_relaxed);
221  guard_.store(0, std::memory_order_relaxed);
222  // allocate the root MoundElement and initialize Mound
223  levels_[0] = new MoundElement[1]; // default MM for MoundElement
224  for (uint32_t i = 1; i < MAX_LEVELS; i++) {
225  levels_[i] = nullptr;
226  }
227  }
Atom< std::uint32_t > Futex
Definition: Futex.h:51
std::unique_ptr< folly::detail::Futex< Atom >[]> futex_array_
MoundElement * levels_[MAX_LEVELS]
Data members.
static constexpr size_t NumFutex
Blocking algorithm.
template<typename T , bool MayBlock = false, bool SupportsSize = false, size_t PopBatch = 16, size_t ListTargetSize = 25, typename Mutex = folly::SpinLock, template< typename > class Atom = std::atomic>
folly::RelaxedConcurrentPriorityQueue< T, MayBlock, SupportsSize, PopBatch, ListTargetSize, Mutex, Atom >::~RelaxedConcurrentPriorityQueue ( )
inline

Definition at line 229 of file RelaxedConcurrentPriorityQueue.h.

References folly::RelaxedConcurrentPriorityQueue< T, MayBlock, SupportsSize, PopBatch, ListTargetSize, Mutex, Atom >::deleteAllNodes(), folly::RelaxedConcurrentPriorityQueue< T, MayBlock, SupportsSize, PopBatch, ListTargetSize, Mutex, Atom >::deleteSharedBuffer(), folly::RelaxedConcurrentPriorityQueue< T, MayBlock, SupportsSize, PopBatch, ListTargetSize, Mutex, Atom >::getBottomLevel(), i, folly::RelaxedConcurrentPriorityQueue< T, MayBlock, SupportsSize, PopBatch, ListTargetSize, Mutex, Atom >::Position::index, and folly::RelaxedConcurrentPriorityQueue< T, MayBlock, SupportsSize, PopBatch, ListTargetSize, Mutex, Atom >::Position::level.

229  {
230  if (PopBatch > 0) {
232  }
233  if (MayBlock) {
234  futex_array_.reset();
235  }
236  Position pos;
237  pos.level = pos.index = 0;
238  deleteAllNodes(pos);
239  // default MM for MoundElement
240  for (int i = getBottomLevel(); i >= 0; i--) {
241  delete[] levels_[i];
242  }
243  }
void deleteSharedBuffer()
This function is only called by the destructor.
void deleteAllNodes(const Position &pos)
This function is only called by the destructor.
std::unique_ptr< folly::detail::Futex< Atom >[]> futex_array_
MoundElement * levels_[MAX_LEVELS]
Data members.

Member Function Documentation

template<typename T , bool MayBlock = false, bool SupportsSize = false, size_t PopBatch = 16, size_t ListTargetSize = 25, typename Mutex = folly::SpinLock, template< typename > class Atom = std::atomic>
void folly::RelaxedConcurrentPriorityQueue< T, MayBlock, SupportsSize, PopBatch, ListTargetSize, Mutex, Atom >::binarySearchPosition ( Position cur,
const T val,
folly::hazptr_holder< Atom > &  hptr 
)
inlineprivate

Definition at line 807 of file RelaxedConcurrentPriorityQueue.h.

References folly::RelaxedConcurrentPriorityQueue< T, MayBlock, SupportsSize, PopBatch, ListTargetSize, Mutex, Atom >::Position::index, folly::RelaxedConcurrentPriorityQueue< T, MayBlock, SupportsSize, PopBatch, ListTargetSize, Mutex, Atom >::Position::level, folly::RelaxedConcurrentPriorityQueue< T, MayBlock, SupportsSize, PopBatch, ListTargetSize, Mutex, Atom >::optimisticReadValue(), parent, and folly::T.

Referenced by folly::RelaxedConcurrentPriorityQueue< T, MayBlock, SupportsSize, PopBatch, ListTargetSize, Mutex, Atom >::moundPush().

810  {
811  Position parent, mid;
812  if (cur.level == 0) {
813  return;
814  }
815  // start from the root
816  parent.level = parent.index = 0;
817 
818  while (true) { // binary search
819  mid.level = (cur.level + parent.level) / 2;
820  mid.index = cur.index >> (cur.level - mid.level);
821 
822  T mv = optimisticReadValue(mid, hptr);
823  if (val < mv) {
824  parent = mid;
825  } else {
826  cur = mid;
827  }
828 
829  if (mid.level == 0 || // the root
830  ((parent.level + 1 == cur.level) && parent.level != 0)) {
831  return;
832  }
833  }
834  }
FOLLY_ALWAYS_INLINE T optimisticReadValue(const Position &pos, folly::hazptr_holder< Atom > &hptr)
double val
Definition: String.cpp:273
folly::std T
folly::Function< void()> parent
Definition: AtFork.cpp:34
template<typename T , bool MayBlock = false, bool SupportsSize = false, size_t PopBatch = 16, size_t ListTargetSize = 25, typename Mutex = folly::SpinLock, template< typename > class Atom = std::atomic>
void folly::RelaxedConcurrentPriorityQueue< T, MayBlock, SupportsSize, PopBatch, ListTargetSize, Mutex, Atom >::blockingPopImpl ( )
inlineprivate
template<typename T , bool MayBlock = false, bool SupportsSize = false, size_t PopBatch = 16, size_t ListTargetSize = 25, typename Mutex = folly::SpinLock, template< typename > class Atom = std::atomic>
void folly::RelaxedConcurrentPriorityQueue< T, MayBlock, SupportsSize, PopBatch, ListTargetSize, Mutex, Atom >::blockingPushImpl ( )
inlineprivate

Definition at line 1028 of file RelaxedConcurrentPriorityQueue.h.

References folly::detail::futexWake(), folly::RelaxedConcurrentPriorityQueue< T, MayBlock, SupportsSize, PopBatch, ListTargetSize, Mutex, Atom >::getFutexArrayLoc(), folly::RelaxedConcurrentPriorityQueue< T, MayBlock, SupportsSize, PopBatch, ListTargetSize, Mutex, Atom >::pticket_, uint32_t, and UNLIKELY.

Referenced by folly::RelaxedConcurrentPriorityQueue< T, MayBlock, SupportsSize, PopBatch, ListTargetSize, Mutex, Atom >::moundPush().

1028  {
1029  auto p = pticket_.fetch_add(1, std::memory_order_acq_rel);
1030  auto loc = getFutexArrayLoc(p);
1031  uint32_t curfutex = futex_array_[loc].load(std::memory_order_acquire);
1032 
1033  while (true) {
1034  uint32_t ready = p << 1; // get the lower 31 bits
1035  // avoid the situation that push has larger ticket already set the value
1036  if (UNLIKELY(
1037  ready + 1 < curfutex ||
1038  ((curfutex > ready) && (curfutex - ready > 0x40000000)))) {
1039  return;
1040  }
1041 
1042  if (futex_array_[loc].compare_exchange_strong(curfutex, ready)) {
1043  if (curfutex &
1044  1) { // One or more consumers may be blocked on this futex
1046  }
1047  return;
1048  } else {
1049  curfutex = futex_array_[loc].load(std::memory_order_acquire);
1050  }
1051  }
1052  }
std::unique_ptr< folly::detail::Futex< Atom >[]> futex_array_
#define UNLIKELY(x)
Definition: Likely.h:48
int futexWake(const Futex *futex, int count, uint32_t wakeMask)
Definition: Futex-inl.h:107
template<typename T , bool MayBlock = false, bool SupportsSize = false, size_t PopBatch = 16, size_t ListTargetSize = 25, typename Mutex = folly::SpinLock, template< typename > class Atom = std::atomic>
bool folly::RelaxedConcurrentPriorityQueue< T, MayBlock, SupportsSize, PopBatch, ListTargetSize, Mutex, Atom >::deferSettingRootSize ( Position pos)
inlineprivate

Definition at line 944 of file RelaxedConcurrentPriorityQueue.h.

References folly::RelaxedConcurrentPriorityQueue< T, MayBlock, SupportsSize, PopBatch, ListTargetSize, Mutex, Atom >::getElementSize(), folly::RelaxedConcurrentPriorityQueue< T, MayBlock, SupportsSize, PopBatch, ListTargetSize, Mutex, Atom >::isLeaf(), folly::RelaxedConcurrentPriorityQueue< T, MayBlock, SupportsSize, PopBatch, ListTargetSize, Mutex, Atom >::leftOf(), folly::RelaxedConcurrentPriorityQueue< T, MayBlock, SupportsSize, PopBatch, ListTargetSize, Mutex, Atom >::lockNode(), folly::RelaxedConcurrentPriorityQueue< T, MayBlock, SupportsSize, PopBatch, ListTargetSize, Mutex, Atom >::readValue(), folly::RelaxedConcurrentPriorityQueue< T, MayBlock, SupportsSize, PopBatch, ListTargetSize, Mutex, Atom >::rightOf(), folly::RelaxedConcurrentPriorityQueue< T, MayBlock, SupportsSize, PopBatch, ListTargetSize, Mutex, Atom >::setElementSize(), folly::RelaxedConcurrentPriorityQueue< T, MayBlock, SupportsSize, PopBatch, ListTargetSize, Mutex, Atom >::swapList(), folly::T, and folly::RelaxedConcurrentPriorityQueue< T, MayBlock, SupportsSize, PopBatch, ListTargetSize, Mutex, Atom >::unlockNode().

Referenced by folly::RelaxedConcurrentPriorityQueue< T, MayBlock, SupportsSize, PopBatch, ListTargetSize, Mutex, Atom >::moundPopMany().

944  {
945  if (isLeaf(pos)) {
946  setElementSize(pos, 0);
947  unlockNode(pos);
948  return true;
949  }
950 
951  // acquire locks for L and R and compare
952  Position lchild = leftOf(pos);
953  Position rchild = rightOf(pos);
954  lockNode(lchild);
955  lockNode(rchild);
956  if (getElementSize(lchild) == 0 && getElementSize(rchild) == 0) {
957  setElementSize(pos, 0);
958  unlockNode(pos);
959  unlockNode(lchild);
960  unlockNode(rchild);
961  return true;
962  } else {
963  // read values
964  T lv = readValue(lchild);
965  T rv = readValue(rchild);
966  if (lv >= rv) {
967  swapList(lchild, pos);
968  setElementSize(lchild, 0);
969  unlockNode(pos);
970  unlockNode(rchild);
971  pos = lchild;
972  } else {
973  swapList(rchild, pos);
974  setElementSize(rchild, 0);
975  unlockNode(pos);
976  unlockNode(lchild);
977  pos = rchild;
978  }
979  return false;
980  }
981  }
void swapList(const Position &a, const Position &b)
Swap two Tree Elements (head, size)
FOLLY_ALWAYS_INLINE void lockNode(const Position &pos)
FOLLY_ALWAYS_INLINE size_t getElementSize(const Position &pos)
get the list size in current MoundElement
folly::std T
FOLLY_ALWAYS_INLINE Position leftOf(const Position &pos)
Locate the left child.
FOLLY_ALWAYS_INLINE Position rightOf(const Position &pos)
Locate the right child.
FOLLY_ALWAYS_INLINE void setElementSize(const Position &pos, const uint32_t &v)
Set the size of current MoundElement.
FOLLY_ALWAYS_INLINE bool isLeaf(const Position &pos)
Current position is leaf?
FOLLY_ALWAYS_INLINE T readValue(const Position &pos)
FOLLY_ALWAYS_INLINE void unlockNode(const Position &pos)
template<typename T , bool MayBlock = false, bool SupportsSize = false, size_t PopBatch = 16, size_t ListTargetSize = 25, typename Mutex = folly::SpinLock, template< typename > class Atom = std::atomic>
void folly::RelaxedConcurrentPriorityQueue< T, MayBlock, SupportsSize, PopBatch, ListTargetSize, Mutex, Atom >::deleteAllNodes ( const Position pos)
inlineprivate

This function is only called by the destructor.

Definition at line 293 of file RelaxedConcurrentPriorityQueue.h.

References folly::RelaxedConcurrentPriorityQueue< T, MayBlock, SupportsSize, PopBatch, ListTargetSize, Mutex, Atom >::getElementSize(), folly::RelaxedConcurrentPriorityQueue< T, MayBlock, SupportsSize, PopBatch, ListTargetSize, Mutex, Atom >::getList(), folly::RelaxedConcurrentPriorityQueue< T, MayBlock, SupportsSize, PopBatch, ListTargetSize, Mutex, Atom >::isLeaf(), folly::RelaxedConcurrentPriorityQueue< T, MayBlock, SupportsSize, PopBatch, ListTargetSize, Mutex, Atom >::leftOf(), folly::RelaxedConcurrentPriorityQueue< T, MayBlock, SupportsSize, PopBatch, ListTargetSize, Mutex, Atom >::Node::next, folly::RelaxedConcurrentPriorityQueue< T, MayBlock, SupportsSize, PopBatch, ListTargetSize, Mutex, Atom >::rightOf(), and folly::RelaxedConcurrentPriorityQueue< T, MayBlock, SupportsSize, PopBatch, ListTargetSize, Mutex, Atom >::setTreeNode().

Referenced by folly::RelaxedConcurrentPriorityQueue< T, MayBlock, SupportsSize, PopBatch, ListTargetSize, Mutex, Atom >::~RelaxedConcurrentPriorityQueue().

293  {
294  if (getElementSize(pos) == 0) {
295  // current list is empty, do not need to check
296  // its children again.
297  return;
298  }
299 
300  Node* curList = getList(pos);
301  setTreeNode(pos, nullptr);
302  while (curList != nullptr) { // reclaim nodes
303  Node* n = curList;
304  curList = curList->next;
305  delete n;
306  }
307 
308  if (!isLeaf(pos)) {
309  deleteAllNodes(leftOf(pos));
310  deleteAllNodes(rightOf(pos));
311  }
312  }
FOLLY_ALWAYS_INLINE size_t getElementSize(const Position &pos)
get the list size in current MoundElement
void deleteAllNodes(const Position &pos)
This function is only called by the destructor.
FOLLY_ALWAYS_INLINE Node * getList(const Position &pos)
FOLLY_ALWAYS_INLINE void setTreeNode(const Position &pos, Node *t)
FOLLY_ALWAYS_INLINE Position leftOf(const Position &pos)
Locate the left child.
Node< Atom > * next() const noexcept
Definition: HazptrTest.cpp:116
FOLLY_ALWAYS_INLINE Position rightOf(const Position &pos)
Locate the right child.
FOLLY_ALWAYS_INLINE bool isLeaf(const Position &pos)
Current position is leaf?
template<typename T , bool MayBlock = false, bool SupportsSize = false, size_t PopBatch = 16, size_t ListTargetSize = 25, typename Mutex = folly::SpinLock, template< typename > class Atom = std::atomic>
void folly::RelaxedConcurrentPriorityQueue< T, MayBlock, SupportsSize, PopBatch, ListTargetSize, Mutex, Atom >::deleteSharedBuffer ( )
inlineprivate

This function is only called by the destructor.

Definition at line 280 of file RelaxedConcurrentPriorityQueue.h.

References folly::RelaxedConcurrentPriorityQueue< T, MayBlock, SupportsSize, PopBatch, ListTargetSize, Mutex, Atom >::top_loc_.

Referenced by folly::RelaxedConcurrentPriorityQueue< T, MayBlock, SupportsSize, PopBatch, ListTargetSize, Mutex, Atom >::~RelaxedConcurrentPriorityQueue().

280  {
281  DCHECK(PopBatch > 0);
282  // delete nodes in the buffer
283  int loc = top_loc_.load(std::memory_order_relaxed);
284  while (loc >= 0) {
285  Node* n = shared_buffer_[loc--].pnode.load(std::memory_order_relaxed);
286  delete n;
287  }
288  // delete buffer
289  shared_buffer_.reset();
290  }
template<typename T , bool MayBlock = false, bool SupportsSize = false, size_t PopBatch = 16, size_t ListTargetSize = 25, typename Mutex = folly::SpinLock, template< typename > class Atom = std::atomic>
bool folly::RelaxedConcurrentPriorityQueue< T, MayBlock, SupportsSize, PopBatch, ListTargetSize, Mutex, Atom >::empty ( )
inline

Returns true only if the queue was empty during the call.

Definition at line 270 of file RelaxedConcurrentPriorityQueue.h.

References folly::RelaxedConcurrentPriorityQueue< T, MayBlock, SupportsSize, PopBatch, ListTargetSize, Mutex, Atom >::isEmpty().

270  {
271  return isEmpty();
272  }
template<typename T , bool MayBlock = false, bool SupportsSize = false, size_t PopBatch = 16, size_t ListTargetSize = 25, typename Mutex = folly::SpinLock, template< typename > class Atom = std::atomic>
bool folly::RelaxedConcurrentPriorityQueue< T, MayBlock, SupportsSize, PopBatch, ListTargetSize, Mutex, Atom >::forceInsert ( const Position pos,
const T val,
Node newNode 
)
inlineprivate

Definition at line 771 of file RelaxedConcurrentPriorityQueue.h.

References folly::RelaxedConcurrentPriorityQueue< T, MayBlock, SupportsSize, PopBatch, ListTargetSize, Mutex, Atom >::forceInsertToRoot(), folly::RelaxedConcurrentPriorityQueue< T, MayBlock, SupportsSize, PopBatch, ListTargetSize, Mutex, Atom >::getElementSize(), folly::RelaxedConcurrentPriorityQueue< T, MayBlock, SupportsSize, PopBatch, ListTargetSize, Mutex, Atom >::getList(), folly::RelaxedConcurrentPriorityQueue< T, MayBlock, SupportsSize, PopBatch, ListTargetSize, Mutex, Atom >::Position::index, folly::RelaxedConcurrentPriorityQueue< T, MayBlock, SupportsSize, PopBatch, ListTargetSize, Mutex, Atom >::isRoot(), folly::RelaxedConcurrentPriorityQueue< T, MayBlock, SupportsSize, PopBatch, ListTargetSize, Mutex, Atom >::Position::level, folly::RelaxedConcurrentPriorityQueue< T, MayBlock, SupportsSize, PopBatch, ListTargetSize, Mutex, Atom >::MoundElement::lock, folly::RelaxedConcurrentPriorityQueue< T, MayBlock, SupportsSize, PopBatch, ListTargetSize, Mutex, Atom >::Node::next, folly::RelaxedConcurrentPriorityQueue< T, MayBlock, SupportsSize, PopBatch, ListTargetSize, Mutex, Atom >::readValue(), folly::RelaxedConcurrentPriorityQueue< T, MayBlock, SupportsSize, PopBatch, ListTargetSize, Mutex, Atom >::setElementSize(), folly::T, uint32_t, UNLIKELY, and folly::RelaxedConcurrentPriorityQueue< T, MayBlock, SupportsSize, PopBatch, ListTargetSize, Mutex, Atom >::Node::val.

Referenced by folly::RelaxedConcurrentPriorityQueue< T, MayBlock, SupportsSize, PopBatch, ListTargetSize, Mutex, Atom >::moundPush().

771  {
772  if (isRoot(pos)) {
773  return forceInsertToRoot(newNode);
774  }
775 
776  while (true) {
777  std::unique_lock<Mutex> lck(
778  levels_[pos.level][pos.index].lock, std::try_to_lock);
779  if (!lck.owns_lock()) {
780  if (getElementSize(pos) < ListTargetSize && readValue(pos) >= val) {
781  continue;
782  } else {
783  return false;
784  }
785  }
786  T nv = readValue(pos);
787  uint32_t sz = getElementSize(pos);
788  // do not allow the new node to be the first one
789  // do not allow the list size tooooo big
790  if (UNLIKELY(nv < val || sz >= ListTargetSize)) {
791  return false;
792  }
793 
794  Node* p = getList(pos);
795  // find a place to insert the node
796  while (p->next != nullptr && p->next->val > val) {
797  p = p->next;
798  }
799  newNode->next = p->next;
800  p->next = newNode;
801  // do not forget to change the metadata
802  setElementSize(pos, sz + 1);
803  return true;
804  }
805  }
FOLLY_ALWAYS_INLINE size_t getElementSize(const Position &pos)
get the list size in current MoundElement
double val
Definition: String.cpp:273
FOLLY_ALWAYS_INLINE bool isRoot(const Position &pos)
Current element is the root?
FOLLY_ALWAYS_INLINE Node * getList(const Position &pos)
folly::std T
MoundElement * levels_[MAX_LEVELS]
Data members.
Node< Atom > * next() const noexcept
Definition: HazptrTest.cpp:116
FOLLY_ALWAYS_INLINE void setElementSize(const Position &pos, const uint32_t &v)
Set the size of current MoundElement.
FOLLY_ALWAYS_INLINE T readValue(const Position &pos)
#define UNLIKELY(x)
Definition: Likely.h:48
template<typename T , bool MayBlock = false, bool SupportsSize = false, size_t PopBatch = 16, size_t ListTargetSize = 25, typename Mutex = folly::SpinLock, template< typename > class Atom = std::atomic>
bool folly::RelaxedConcurrentPriorityQueue< T, MayBlock, SupportsSize, PopBatch, ListTargetSize, Mutex, Atom >::forceInsertToRoot ( Node newNode)
inlineprivate

Definition at line 733 of file RelaxedConcurrentPriorityQueue.h.

References folly::RelaxedConcurrentPriorityQueue< T, MayBlock, SupportsSize, PopBatch, ListTargetSize, Mutex, Atom >::getElementSize(), folly::RelaxedConcurrentPriorityQueue< T, MayBlock, SupportsSize, PopBatch, ListTargetSize, Mutex, Atom >::getList(), folly::RelaxedConcurrentPriorityQueue< T, MayBlock, SupportsSize, PopBatch, ListTargetSize, Mutex, Atom >::Position::index, folly::RelaxedConcurrentPriorityQueue< T, MayBlock, SupportsSize, PopBatch, ListTargetSize, Mutex, Atom >::Position::level, folly::RelaxedConcurrentPriorityQueue< T, MayBlock, SupportsSize, PopBatch, ListTargetSize, Mutex, Atom >::MoundElement::lock, folly::RelaxedConcurrentPriorityQueue< T, MayBlock, SupportsSize, PopBatch, ListTargetSize, Mutex, Atom >::Node::next, folly::RelaxedConcurrentPriorityQueue< T, MayBlock, SupportsSize, PopBatch, ListTargetSize, Mutex, Atom >::setElementSize(), folly::RelaxedConcurrentPriorityQueue< T, MayBlock, SupportsSize, PopBatch, ListTargetSize, Mutex, Atom >::setTreeNode(), uint32_t, and folly::RelaxedConcurrentPriorityQueue< T, MayBlock, SupportsSize, PopBatch, ListTargetSize, Mutex, Atom >::Node::val.

Referenced by folly::RelaxedConcurrentPriorityQueue< T, MayBlock, SupportsSize, PopBatch, ListTargetSize, Mutex, Atom >::forceInsert().

733  {
734  Position pos;
735  pos.level = pos.index = 0;
736  std::unique_lock<Mutex> lck(
737  levels_[pos.level][pos.index].lock, std::try_to_lock);
738  if (!lck.owns_lock()) {
739  return false;
740  }
741  uint32_t sz = getElementSize(pos);
742  if (sz >= ListTargetSize) {
743  return false;
744  }
745 
746  Node* curList = getList(pos);
747  if (curList == nullptr) {
748  newNode->next = nullptr;
749  setTreeNode(pos, newNode);
750  } else {
751  Node* p = curList;
752  if (p->val <= newNode->val) {
753  newNode->next = curList;
754  setTreeNode(pos, newNode);
755  } else {
756  while (p->next != nullptr && p->next->val >= newNode->val) {
757  p = p->next;
758  }
759  newNode->next = p->next;
760  p->next = newNode;
761  }
762  }
763  setElementSize(pos, sz + 1);
764  return true;
765  }
FOLLY_ALWAYS_INLINE size_t getElementSize(const Position &pos)
get the list size in current MoundElement
FOLLY_ALWAYS_INLINE Node * getList(const Position &pos)
MoundElement * levels_[MAX_LEVELS]
Data members.
FOLLY_ALWAYS_INLINE void setTreeNode(const Position &pos, Node *t)
Node< Atom > * next() const noexcept
Definition: HazptrTest.cpp:116
FOLLY_ALWAYS_INLINE void setElementSize(const Position &pos, const uint32_t &v)
Set the size of current MoundElement.
template<typename T , bool MayBlock = false, bool SupportsSize = false, size_t PopBatch = 16, size_t ListTargetSize = 25, typename Mutex = folly::SpinLock, template< typename > class Atom = std::atomic>
FOLLY_ALWAYS_INLINE bool folly::RelaxedConcurrentPriorityQueue< T, MayBlock, SupportsSize, PopBatch, ListTargetSize, Mutex, Atom >::futexIsReady ( const size_t &  curticket)
inlineprivate

Definition at line 1073 of file RelaxedConcurrentPriorityQueue.h.

References folly::RelaxedConcurrentPriorityQueue< T, MayBlock, SupportsSize, PopBatch, ListTargetSize, Mutex, Atom >::getFutexArrayLoc(), and uint32_t.

Referenced by folly::RelaxedConcurrentPriorityQueue< T, MayBlock, SupportsSize, PopBatch, ListTargetSize, Mutex, Atom >::blockingPopImpl(), folly::RelaxedConcurrentPriorityQueue< T, MayBlock, SupportsSize, PopBatch, ListTargetSize, Mutex, Atom >::tryBlockingPop(), and folly::RelaxedConcurrentPriorityQueue< T, MayBlock, SupportsSize, PopBatch, ListTargetSize, Mutex, Atom >::trySpinBeforeBlock().

1073  {
1074  auto loc = getFutexArrayLoc(curticket);
1075  auto curfutex = futex_array_[loc].load(std::memory_order_acquire);
1076  uint32_t short_cticket = curticket & 0x7FFFFFFF;
1077  uint32_t futex_ready = curfutex >> 1;
1078  // handle unsigned 31 bits overflow
1079  return futex_ready >= short_cticket ||
1080  short_cticket - futex_ready > 0x40000000;
1081  }
std::unique_ptr< folly::detail::Futex< Atom >[]> futex_array_
template<typename T , bool MayBlock = false, bool SupportsSize = false, size_t PopBatch = 16, size_t ListTargetSize = 25, typename Mutex = folly::SpinLock, template< typename > class Atom = std::atomic>
FOLLY_ALWAYS_INLINE size_t folly::RelaxedConcurrentPriorityQueue< T, MayBlock, SupportsSize, PopBatch, ListTargetSize, Mutex, Atom >::getElementSize ( const Position pos)
inlineprivate

get the list size in current MoundElement

Definition at line 361 of file RelaxedConcurrentPriorityQueue.h.

References folly::RelaxedConcurrentPriorityQueue< T, MayBlock, SupportsSize, PopBatch, ListTargetSize, Mutex, Atom >::Position::index, folly::RelaxedConcurrentPriorityQueue< T, MayBlock, SupportsSize, PopBatch, ListTargetSize, Mutex, Atom >::Position::level, and folly::RelaxedConcurrentPriorityQueue< T, MayBlock, SupportsSize, PopBatch, ListTargetSize, Mutex, Atom >::MoundElement::size.

Referenced by folly::RelaxedConcurrentPriorityQueue< T, MayBlock, SupportsSize, PopBatch, ListTargetSize, Mutex, Atom >::deferSettingRootSize(), folly::RelaxedConcurrentPriorityQueue< T, MayBlock, SupportsSize, PopBatch, ListTargetSize, Mutex, Atom >::deleteAllNodes(), folly::RelaxedConcurrentPriorityQueue< T, MayBlock, SupportsSize, PopBatch, ListTargetSize, Mutex, Atom >::forceInsert(), folly::RelaxedConcurrentPriorityQueue< T, MayBlock, SupportsSize, PopBatch, ListTargetSize, Mutex, Atom >::forceInsertToRoot(), folly::RelaxedConcurrentPriorityQueue< T, MayBlock, SupportsSize, PopBatch, ListTargetSize, Mutex, Atom >::isMoundEmpty(), folly::RelaxedConcurrentPriorityQueue< T, MayBlock, SupportsSize, PopBatch, ListTargetSize, Mutex, Atom >::mergeDown(), folly::RelaxedConcurrentPriorityQueue< T, MayBlock, SupportsSize, PopBatch, ListTargetSize, Mutex, Atom >::mergeListTo(), folly::RelaxedConcurrentPriorityQueue< T, MayBlock, SupportsSize, PopBatch, ListTargetSize, Mutex, Atom >::moundPopMany(), folly::RelaxedConcurrentPriorityQueue< T, MayBlock, SupportsSize, PopBatch, ListTargetSize, Mutex, Atom >::pruningLeaf(), folly::RelaxedConcurrentPriorityQueue< T, MayBlock, SupportsSize, PopBatch, ListTargetSize, Mutex, Atom >::regularInsert(), folly::RelaxedConcurrentPriorityQueue< T, MayBlock, SupportsSize, PopBatch, ListTargetSize, Mutex, Atom >::selectPosition(), folly::RelaxedConcurrentPriorityQueue< T, MayBlock, SupportsSize, PopBatch, ListTargetSize, Mutex, Atom >::startPruning(), and folly::RelaxedConcurrentPriorityQueue< T, MayBlock, SupportsSize, PopBatch, ListTargetSize, Mutex, Atom >::swapList().

361  {
362  return levels_[pos.level][pos.index].size.load(std::memory_order_relaxed);
363  }
MoundElement * levels_[MAX_LEVELS]
Data members.
template<typename T , bool MayBlock = false, bool SupportsSize = false, size_t PopBatch = 16, size_t ListTargetSize = 25, typename Mutex = folly::SpinLock, template< typename > class Atom = std::atomic>
size_t folly::RelaxedConcurrentPriorityQueue< T, MayBlock, SupportsSize, PopBatch, ListTargetSize, Mutex, Atom >::getFutexArrayLoc ( size_t  s)
inlineprivate
template<typename T , bool MayBlock = false, bool SupportsSize = false, size_t PopBatch = 16, size_t ListTargetSize = 25, typename Mutex = folly::SpinLock, template< typename > class Atom = std::atomic>
FOLLY_ALWAYS_INLINE Node* folly::RelaxedConcurrentPriorityQueue< T, MayBlock, SupportsSize, PopBatch, ListTargetSize, Mutex, Atom >::getList ( const Position pos)
inlineprivate

Definition at line 485 of file RelaxedConcurrentPriorityQueue.h.

References folly::RelaxedConcurrentPriorityQueue< T, MayBlock, SupportsSize, PopBatch, ListTargetSize, Mutex, Atom >::MoundElement::head, folly::RelaxedConcurrentPriorityQueue< T, MayBlock, SupportsSize, PopBatch, ListTargetSize, Mutex, Atom >::Position::index, and folly::RelaxedConcurrentPriorityQueue< T, MayBlock, SupportsSize, PopBatch, ListTargetSize, Mutex, Atom >::Position::level.

Referenced by folly::RelaxedConcurrentPriorityQueue< T, MayBlock, SupportsSize, PopBatch, ListTargetSize, Mutex, Atom >::deleteAllNodes(), folly::RelaxedConcurrentPriorityQueue< T, MayBlock, SupportsSize, PopBatch, ListTargetSize, Mutex, Atom >::forceInsert(), folly::RelaxedConcurrentPriorityQueue< T, MayBlock, SupportsSize, PopBatch, ListTargetSize, Mutex, Atom >::forceInsertToRoot(), folly::RelaxedConcurrentPriorityQueue< T, MayBlock, SupportsSize, PopBatch, ListTargetSize, Mutex, Atom >::mergeDown(), folly::RelaxedConcurrentPriorityQueue< T, MayBlock, SupportsSize, PopBatch, ListTargetSize, Mutex, Atom >::mergeListTo(), folly::RelaxedConcurrentPriorityQueue< T, MayBlock, SupportsSize, PopBatch, ListTargetSize, Mutex, Atom >::moundPopMany(), folly::RelaxedConcurrentPriorityQueue< T, MayBlock, SupportsSize, PopBatch, ListTargetSize, Mutex, Atom >::readValue(), folly::RelaxedConcurrentPriorityQueue< T, MayBlock, SupportsSize, PopBatch, ListTargetSize, Mutex, Atom >::regularInsert(), folly::RelaxedConcurrentPriorityQueue< T, MayBlock, SupportsSize, PopBatch, ListTargetSize, Mutex, Atom >::startPruning(), and folly::RelaxedConcurrentPriorityQueue< T, MayBlock, SupportsSize, PopBatch, ListTargetSize, Mutex, Atom >::swapList().

485  {
486  return levels_[pos.level][pos.index].head.load(std::memory_order_relaxed);
487  }
MoundElement * levels_[MAX_LEVELS]
Data members.
template<typename T , bool MayBlock = false, bool SupportsSize = false, size_t PopBatch = 16, size_t ListTargetSize = 25, typename Mutex = folly::SpinLock, template< typename > class Atom = std::atomic>
void folly::RelaxedConcurrentPriorityQueue< T, MayBlock, SupportsSize, PopBatch, ListTargetSize, Mutex, Atom >::grow ( uint32_t  btm)
inlineprivate

Extend the tree level.

Definition at line 373 of file RelaxedConcurrentPriorityQueue.h.

References folly::RelaxedConcurrentPriorityQueue< T, MayBlock, SupportsSize, PopBatch, ListTargetSize, Mutex, Atom >::getBottomLevel(), folly::RelaxedConcurrentPriorityQueue< T, MayBlock, SupportsSize, PopBatch, ListTargetSize, Mutex, Atom >::size(), uint32_t, and folly::fibers::yield().

Referenced by folly::RelaxedConcurrentPriorityQueue< T, MayBlock, SupportsSize, PopBatch, ListTargetSize, Mutex, Atom >::selectPosition(), and folly::RelaxedConcurrentPriorityQueue< T, MayBlock, SupportsSize, PopBatch, ListTargetSize, Mutex, Atom >::startPruning().

373  {
374  while (true) {
375  if (guard_.fetch_add(1, std::memory_order_acq_rel) == 0) {
376  break;
377  }
378  // someone already expanded the tree
379  if (btm != getBottomLevel()) {
380  return;
381  }
383  }
384  // double check the bottom has not changed yet
385  if (btm != getBottomLevel()) {
386  guard_.store(0, std::memory_order_release);
387  return;
388  }
389  // create and initialize the new level
390  uint32_t tmp_btm = getBottomLevel();
391  uint32_t size = 1 << (tmp_btm + 1);
392  MoundElement* new_level = new MoundElement[size]; // MM
393  levels_[tmp_btm + 1] = new_level;
394  bottom_.store(tmp_btm + 1, std::memory_order_release);
395  guard_.store(0, std::memory_order_release);
396  }
MoundElement * levels_[MAX_LEVELS]
Data members.
template<typename T , bool MayBlock = false, bool SupportsSize = false, size_t PopBatch = 16, size_t ListTargetSize = 25, typename Mutex = folly::SpinLock, template< typename > class Atom = std::atomic>
FOLLY_ALWAYS_INLINE bool folly::RelaxedConcurrentPriorityQueue< T, MayBlock, SupportsSize, PopBatch, ListTargetSize, Mutex, Atom >::isEmpty ( )
inlineprivate
template<typename T , bool MayBlock = false, bool SupportsSize = false, size_t PopBatch = 16, size_t ListTargetSize = 25, typename Mutex = folly::SpinLock, template< typename > class Atom = std::atomic>
bool folly::RelaxedConcurrentPriorityQueue< T, MayBlock, SupportsSize, PopBatch, ListTargetSize, Mutex, Atom >::isHeap ( const Position pos)
inlineprivate

Check the first node in TreeElement keeps the heap structure.

Definition at line 315 of file RelaxedConcurrentPriorityQueue.h.

References folly::RelaxedConcurrentPriorityQueue< T, MayBlock, SupportsSize, PopBatch, ListTargetSize, Mutex, Atom >::isLeaf(), folly::RelaxedConcurrentPriorityQueue< T, MayBlock, SupportsSize, PopBatch, ListTargetSize, Mutex, Atom >::leftOf(), folly::RelaxedConcurrentPriorityQueue< T, MayBlock, SupportsSize, PopBatch, ListTargetSize, Mutex, Atom >::readValue(), and folly::RelaxedConcurrentPriorityQueue< T, MayBlock, SupportsSize, PopBatch, ListTargetSize, Mutex, Atom >::rightOf().

315  {
316  if (isLeaf(pos)) {
317  return true;
318  }
319  Position lchild = leftOf(pos);
320  Position rchild = rightOf(pos);
321  return isHeap(lchild) && isHeap(rchild) &&
322  readValue(pos) >= readValue(lchild) &&
323  readValue(pos) >= readValue(rchild);
324  }
bool isHeap(const Position &pos)
Check the first node in TreeElement keeps the heap structure.
FOLLY_ALWAYS_INLINE Position leftOf(const Position &pos)
Locate the left child.
FOLLY_ALWAYS_INLINE Position rightOf(const Position &pos)
Locate the right child.
FOLLY_ALWAYS_INLINE bool isLeaf(const Position &pos)
Current position is leaf?
FOLLY_ALWAYS_INLINE T readValue(const Position &pos)
template<typename T , bool MayBlock = false, bool SupportsSize = false, size_t PopBatch = 16, size_t ListTargetSize = 25, typename Mutex = folly::SpinLock, template< typename > class Atom = std::atomic>
FOLLY_ALWAYS_INLINE bool folly::RelaxedConcurrentPriorityQueue< T, MayBlock, SupportsSize, PopBatch, ListTargetSize, Mutex, Atom >::isLeaf ( const Position pos)
inlineprivate
template<typename T , bool MayBlock = false, bool SupportsSize = false, size_t PopBatch = 16, size_t ListTargetSize = 25, typename Mutex = folly::SpinLock, template< typename > class Atom = std::atomic>
FOLLY_ALWAYS_INLINE bool folly::RelaxedConcurrentPriorityQueue< T, MayBlock, SupportsSize, PopBatch, ListTargetSize, Mutex, Atom >::isMoundEmpty ( )
inlineprivate
template<typename T , bool MayBlock = false, bool SupportsSize = false, size_t PopBatch = 16, size_t ListTargetSize = 25, typename Mutex = folly::SpinLock, template< typename > class Atom = std::atomic>
FOLLY_ALWAYS_INLINE bool folly::RelaxedConcurrentPriorityQueue< T, MayBlock, SupportsSize, PopBatch, ListTargetSize, Mutex, Atom >::isRoot ( const Position pos)
inlineprivate
template<typename T , bool MayBlock = false, bool SupportsSize = false, size_t PopBatch = 16, size_t ListTargetSize = 25, typename Mutex = folly::SpinLock, template< typename > class Atom = std::atomic>
FOLLY_ALWAYS_INLINE bool folly::RelaxedConcurrentPriorityQueue< T, MayBlock, SupportsSize, PopBatch, ListTargetSize, Mutex, Atom >::isSharedBufferEmpty ( )
inlineprivate
template<typename T , bool MayBlock = false, bool SupportsSize = false, size_t PopBatch = 16, size_t ListTargetSize = 25, typename Mutex = folly::SpinLock, template< typename > class Atom = std::atomic>
FOLLY_ALWAYS_INLINE void folly::RelaxedConcurrentPriorityQueue< T, MayBlock, SupportsSize, PopBatch, ListTargetSize, Mutex, Atom >::lockNode ( const Position pos)
inlineprivate
template<typename T , bool MayBlock = false, bool SupportsSize = false, size_t PopBatch = 16, size_t ListTargetSize = 25, typename Mutex = folly::SpinLock, template< typename > class Atom = std::atomic>
void folly::RelaxedConcurrentPriorityQueue< T, MayBlock, SupportsSize, PopBatch, ListTargetSize, Mutex, Atom >::mergeDown ( const Position pos)
inlineprivate

Definition at line 889 of file RelaxedConcurrentPriorityQueue.h.

References folly::RelaxedConcurrentPriorityQueue< T, MayBlock, SupportsSize, PopBatch, ListTargetSize, Mutex, Atom >::getElementSize(), folly::RelaxedConcurrentPriorityQueue< T, MayBlock, SupportsSize, PopBatch, ListTargetSize, Mutex, Atom >::getList(), folly::RelaxedConcurrentPriorityQueue< T, MayBlock, SupportsSize, PopBatch, ListTargetSize, Mutex, Atom >::isLeaf(), folly::RelaxedConcurrentPriorityQueue< T, MayBlock, SupportsSize, PopBatch, ListTargetSize, Mutex, Atom >::leftOf(), folly::RelaxedConcurrentPriorityQueue< T, MayBlock, SupportsSize, PopBatch, ListTargetSize, Mutex, Atom >::lockNode(), folly::RelaxedConcurrentPriorityQueue< T, MayBlock, SupportsSize, PopBatch, ListTargetSize, Mutex, Atom >::mergeList(), folly::RelaxedConcurrentPriorityQueue< T, MayBlock, SupportsSize, PopBatch, ListTargetSize, Mutex, Atom >::readValue(), folly::RelaxedConcurrentPriorityQueue< T, MayBlock, SupportsSize, PopBatch, ListTargetSize, Mutex, Atom >::rightOf(), folly::RelaxedConcurrentPriorityQueue< T, MayBlock, SupportsSize, PopBatch, ListTargetSize, Mutex, Atom >::setElementSize(), folly::RelaxedConcurrentPriorityQueue< T, MayBlock, SupportsSize, PopBatch, ListTargetSize, Mutex, Atom >::setTreeNode(), sum(), folly::RelaxedConcurrentPriorityQueue< T, MayBlock, SupportsSize, PopBatch, ListTargetSize, Mutex, Atom >::swapList(), folly::T, and folly::RelaxedConcurrentPriorityQueue< T, MayBlock, SupportsSize, PopBatch, ListTargetSize, Mutex, Atom >::unlockNode().

Referenced by folly::RelaxedConcurrentPriorityQueue< T, MayBlock, SupportsSize, PopBatch, ListTargetSize, Mutex, Atom >::moundPopMany().

889  {
890  if (isLeaf(pos)) {
891  unlockNode(pos);
892  return;
893  }
894 
895  // acquire locks for L and R and compare
896  Position lchild = leftOf(pos);
897  Position rchild = rightOf(pos);
898  lockNode(lchild);
899  lockNode(rchild);
900  // read values
901  T nv = readValue(pos);
902  T lv = readValue(lchild);
903  T rv = readValue(rchild);
904  if (nv >= lv && nv >= rv) {
905  unlockNode(pos);
906  unlockNode(lchild);
907  unlockNode(rchild);
908  return;
909  }
910 
911  // If two children contains nodes less than the
912  // threshold, we merge two children to the parent
913  // and do merge down on both of them.
914  size_t sum =
915  getElementSize(rchild) + getElementSize(lchild) + getElementSize(pos);
916  if (sum <= MergingSize) {
917  Node* l1 = mergeList(getList(rchild), getList(lchild));
918  setTreeNode(pos, mergeList(l1, getList(pos)));
919  setElementSize(pos, sum);
920  setTreeNode(lchild, nullptr);
921  setElementSize(lchild, 0);
922  setTreeNode(rchild, nullptr);
923  setElementSize(rchild, 0);
924  unlockNode(pos);
925  mergeDown(lchild);
926  mergeDown(rchild);
927  return;
928  }
929  // pull from right
930  if (rv >= lv && rv > nv) {
931  swapList(rchild, pos);
932  unlockNode(pos);
933  unlockNode(lchild);
934  mergeDown(rchild);
935  } else if (lv >= rv && lv > nv) {
936  // pull from left
937  swapList(lchild, pos);
938  unlockNode(pos);
939  unlockNode(rchild);
940  mergeDown(lchild);
941  }
942  }
void swapList(const Position &a, const Position &b)
Swap two Tree Elements (head, size)
FOLLY_ALWAYS_INLINE void lockNode(const Position &pos)
std::atomic< int64_t > sum(0)
FOLLY_ALWAYS_INLINE size_t getElementSize(const Position &pos)
get the list size in current MoundElement
FOLLY_ALWAYS_INLINE Node * getList(const Position &pos)
folly::std T
FOLLY_ALWAYS_INLINE void setTreeNode(const Position &pos, Node *t)
FOLLY_ALWAYS_INLINE Position leftOf(const Position &pos)
Locate the left child.
FOLLY_ALWAYS_INLINE Position rightOf(const Position &pos)
Locate the right child.
FOLLY_ALWAYS_INLINE void setElementSize(const Position &pos, const uint32_t &v)
Set the size of current MoundElement.
FOLLY_ALWAYS_INLINE bool isLeaf(const Position &pos)
Current position is leaf?
FOLLY_ALWAYS_INLINE T readValue(const Position &pos)
FOLLY_ALWAYS_INLINE void unlockNode(const Position &pos)
template<typename T , bool MayBlock = false, bool SupportsSize = false, size_t PopBatch = 16, size_t ListTargetSize = 25, typename Mutex = folly::SpinLock, template< typename > class Atom = std::atomic>
Node* folly::RelaxedConcurrentPriorityQueue< T, MayBlock, SupportsSize, PopBatch, ListTargetSize, Mutex, Atom >::mergeList ( Node base,
Node source 
)
inlineprivate

Definition at line 494 of file RelaxedConcurrentPriorityQueue.h.

References folly::RelaxedConcurrentPriorityQueue< T, MayBlock, SupportsSize, PopBatch, ListTargetSize, Mutex, Atom >::Node::next, and folly::RelaxedConcurrentPriorityQueue< T, MayBlock, SupportsSize, PopBatch, ListTargetSize, Mutex, Atom >::Node::val.

Referenced by folly::RelaxedConcurrentPriorityQueue< T, MayBlock, SupportsSize, PopBatch, ListTargetSize, Mutex, Atom >::mergeDown(), and folly::RelaxedConcurrentPriorityQueue< T, MayBlock, SupportsSize, PopBatch, ListTargetSize, Mutex, Atom >::mergeListTo().

494  {
495  if (base == nullptr) {
496  return source;
497  } else if (source == nullptr) {
498  return base;
499  }
500 
501  Node *res, *p;
502  // choose the head node
503  if (base->val >= source->val) {
504  res = base;
505  base = base->next;
506  p = res;
507  } else {
508  res = source;
509  source = source->next;
510  p = res;
511  }
512 
513  while (base != nullptr && source != nullptr) {
514  if (base->val >= source->val) {
515  p->next = base;
516  base = base->next;
517  } else {
518  p->next = source;
519  source = source->next;
520  }
521  p = p->next;
522  }
523  if (base == nullptr) {
524  p->next = source;
525  } else {
526  p->next = base;
527  }
528  return res;
529  }
Node< Atom > * next() const noexcept
Definition: HazptrTest.cpp:116
template<typename T , bool MayBlock = false, bool SupportsSize = false, size_t PopBatch = 16, size_t ListTargetSize = 25, typename Mutex = folly::SpinLock, template< typename > class Atom = std::atomic>
void folly::RelaxedConcurrentPriorityQueue< T, MayBlock, SupportsSize, PopBatch, ListTargetSize, Mutex, Atom >::mergeListTo ( const Position pos,
Node t,
const size_t &  list_length 
)
inlineprivate

Merge list t to the Element Position.

Definition at line 532 of file RelaxedConcurrentPriorityQueue.h.

References folly::RelaxedConcurrentPriorityQueue< T, MayBlock, SupportsSize, PopBatch, ListTargetSize, Mutex, Atom >::getElementSize(), folly::RelaxedConcurrentPriorityQueue< T, MayBlock, SupportsSize, PopBatch, ListTargetSize, Mutex, Atom >::getList(), folly::RelaxedConcurrentPriorityQueue< T, MayBlock, SupportsSize, PopBatch, ListTargetSize, Mutex, Atom >::mergeList(), folly::RelaxedConcurrentPriorityQueue< T, MayBlock, SupportsSize, PopBatch, ListTargetSize, Mutex, Atom >::setElementSize(), folly::RelaxedConcurrentPriorityQueue< T, MayBlock, SupportsSize, PopBatch, ListTargetSize, Mutex, Atom >::setTreeNode(), and uint32_t.

Referenced by folly::RelaxedConcurrentPriorityQueue< T, MayBlock, SupportsSize, PopBatch, ListTargetSize, Mutex, Atom >::startPruning().

532  {
533  Node* head = getList(pos);
534  setTreeNode(pos, mergeList(head, t));
535  uint32_t ns = getElementSize(pos) + list_length;
536  setElementSize(pos, ns);
537  }
FOLLY_ALWAYS_INLINE size_t getElementSize(const Position &pos)
get the list size in current MoundElement
FOLLY_ALWAYS_INLINE Node * getList(const Position &pos)
FOLLY_ALWAYS_INLINE void setTreeNode(const Position &pos, Node *t)
FOLLY_ALWAYS_INLINE void setElementSize(const Position &pos, const uint32_t &v)
Set the size of current MoundElement.
template<typename T , bool MayBlock = false, bool SupportsSize = false, size_t PopBatch = 16, size_t ListTargetSize = 25, typename Mutex = folly::SpinLock, template< typename > class Atom = std::atomic>
void folly::RelaxedConcurrentPriorityQueue< T, MayBlock, SupportsSize, PopBatch, ListTargetSize, Mutex, Atom >::moundPop ( T val)
inlineprivate

Definition at line 1197 of file RelaxedConcurrentPriorityQueue.h.

References folly::RelaxedConcurrentPriorityQueue< T, MayBlock, SupportsSize, PopBatch, ListTargetSize, Mutex, Atom >::blockingPopImpl(), LIKELY, max, folly::RelaxedConcurrentPriorityQueue< T, MayBlock, SupportsSize, PopBatch, ListTargetSize, Mutex, Atom >::tryPopFromMound(), folly::RelaxedConcurrentPriorityQueue< T, MayBlock, SupportsSize, PopBatch, ListTargetSize, Mutex, Atom >::tryPopFromSharedBuffer(), and folly::RelaxedConcurrentPriorityQueue< T, MayBlock, SupportsSize, PopBatch, ListTargetSize, Mutex, Atom >::tryWait().

Referenced by folly::RelaxedConcurrentPriorityQueue< T, MayBlock, SupportsSize, PopBatch, ListTargetSize, Mutex, Atom >::pop().

1197  {
1198  if (MayBlock) {
1199  blockingPopImpl();
1200  }
1201 
1202  if (PopBatch > 0) {
1203  if (tryPopFromSharedBuffer(val)) {
1204  return;
1205  }
1206  }
1207 
1208  while (true) {
1209  if (LIKELY(tryPopFromMound(val))) {
1210  return;
1211  }
1213  if (PopBatch > 0 && tryPopFromSharedBuffer(val)) {
1214  return;
1215  }
1216  }
1217  }
LogLevel max
Definition: LogLevel.cpp:31
#define LIKELY(x)
Definition: Likely.h:47
double val
Definition: String.cpp:273
FOLLY_NOINLINE bool tryWait(const std::chrono::time_point< Clock, Duration > &deadline, const folly::WaitOptions &opt=wait_options())
template<typename T , bool MayBlock = false, bool SupportsSize = false, size_t PopBatch = 16, size_t ListTargetSize = 25, typename Mutex = folly::SpinLock, template< typename > class Atom = std::atomic>
bool folly::RelaxedConcurrentPriorityQueue< T, MayBlock, SupportsSize, PopBatch, ListTargetSize, Mutex, Atom >::moundPopMany ( T val)
inlineprivate

Definition at line 983 of file RelaxedConcurrentPriorityQueue.h.

References folly::RelaxedConcurrentPriorityQueue< T, MayBlock, SupportsSize, PopBatch, ListTargetSize, Mutex, Atom >::deferSettingRootSize(), folly::RelaxedConcurrentPriorityQueue< T, MayBlock, SupportsSize, PopBatch, ListTargetSize, Mutex, Atom >::getElementSize(), folly::RelaxedConcurrentPriorityQueue< T, MayBlock, SupportsSize, PopBatch, ListTargetSize, Mutex, Atom >::getList(), folly::RelaxedConcurrentPriorityQueue< T, MayBlock, SupportsSize, PopBatch, ListTargetSize, Mutex, Atom >::Position::index, folly::RelaxedConcurrentPriorityQueue< T, MayBlock, SupportsSize, PopBatch, ListTargetSize, Mutex, Atom >::Position::level, LIKELY, folly::RelaxedConcurrentPriorityQueue< T, MayBlock, SupportsSize, PopBatch, ListTargetSize, Mutex, Atom >::mergeDown(), folly::RelaxedConcurrentPriorityQueue< T, MayBlock, SupportsSize, PopBatch, ListTargetSize, Mutex, Atom >::Node::next, folly::RelaxedConcurrentPriorityQueue< T, MayBlock, SupportsSize, PopBatch, ListTargetSize, Mutex, Atom >::popToSharedBuffer(), folly::hazptr_obj_base< T, Atom, D >::retire(), folly::RelaxedConcurrentPriorityQueue< T, MayBlock, SupportsSize, PopBatch, ListTargetSize, Mutex, Atom >::setElementSize(), folly::RelaxedConcurrentPriorityQueue< T, MayBlock, SupportsSize, PopBatch, ListTargetSize, Mutex, Atom >::setTreeNode(), folly::RelaxedConcurrentPriorityQueue< T, MayBlock, SupportsSize, PopBatch, ListTargetSize, Mutex, Atom >::top_loc_, uint32_t, folly::RelaxedConcurrentPriorityQueue< T, MayBlock, SupportsSize, PopBatch, ListTargetSize, Mutex, Atom >::unlockNode(), and folly::RelaxedConcurrentPriorityQueue< T, MayBlock, SupportsSize, PopBatch, ListTargetSize, Mutex, Atom >::Node::val.

Referenced by folly::RelaxedConcurrentPriorityQueue< T, MayBlock, SupportsSize, PopBatch, ListTargetSize, Mutex, Atom >::tryPopFromMound().

983  {
984  // pop from the root
985  Position pos;
986  pos.level = pos.index = 0;
987  // the root is nullptr, return false
988  Node* head = getList(pos);
989  if (head == nullptr) {
990  unlockNode(pos);
991  return false;
992  }
993 
994  // shared buffer already filled by other threads
995  if (PopBatch > 0 && top_loc_.load(std::memory_order_acquire) >= 0) {
996  unlockNode(pos);
997  return false;
998  }
999 
1000  uint32_t sz = getElementSize(pos);
1001  // get the one node first
1002  val = head->val;
1003  Node* p = head;
1004  head = head->next;
1005  sz--;
1006 
1007  if (PopBatch > 0) {
1008  sz = popToSharedBuffer(sz, head);
1009  } else {
1010  setTreeNode(pos, head);
1011  }
1012 
1013  bool done = false;
1014  if (LIKELY(sz == 0)) {
1015  done = deferSettingRootSize(pos);
1016  } else {
1017  setElementSize(pos, sz);
1018  }
1019 
1020  if (LIKELY(!done)) {
1021  mergeDown(pos);
1022  }
1023 
1024  p->retire();
1025  return true;
1026  }
FOLLY_ALWAYS_INLINE size_t getElementSize(const Position &pos)
get the list size in current MoundElement
int popToSharedBuffer(const uint32_t rsize, Node *head)
#define LIKELY(x)
Definition: Likely.h:47
void retire(D deleter={}, hazptr_domain< Atom > &domain=default_hazptr_domain< Atom >())
Definition: HazptrObj.h:229
double val
Definition: String.cpp:273
FOLLY_ALWAYS_INLINE Node * getList(const Position &pos)
FOLLY_ALWAYS_INLINE void setTreeNode(const Position &pos, Node *t)
Node< Atom > * next() const noexcept
Definition: HazptrTest.cpp:116
FOLLY_ALWAYS_INLINE void setElementSize(const Position &pos, const uint32_t &v)
Set the size of current MoundElement.
FOLLY_ALWAYS_INLINE void unlockNode(const Position &pos)
template<typename T , bool MayBlock = false, bool SupportsSize = false, size_t PopBatch = 16, size_t ListTargetSize = 25, typename Mutex = folly::SpinLock, template< typename > class Atom = std::atomic>
void folly::RelaxedConcurrentPriorityQueue< T, MayBlock, SupportsSize, PopBatch, ListTargetSize, Mutex, Atom >::moundPush ( const T val)
inlineprivate

Definition at line 837 of file RelaxedConcurrentPriorityQueue.h.

References folly::RelaxedConcurrentPriorityQueue< T, MayBlock, SupportsSize, PopBatch, ListTargetSize, Mutex, Atom >::binarySearchPosition(), folly::RelaxedConcurrentPriorityQueue< T, MayBlock, SupportsSize, PopBatch, ListTargetSize, Mutex, Atom >::blockingPushImpl(), folly::RelaxedConcurrentPriorityQueue< T, MayBlock, SupportsSize, PopBatch, ListTargetSize, Mutex, Atom >::forceInsert(), LIKELY, folly::Random::rand32(), folly::RelaxedConcurrentPriorityQueue< T, MayBlock, SupportsSize, PopBatch, ListTargetSize, Mutex, Atom >::regularInsert(), seed, folly::RelaxedConcurrentPriorityQueue< T, MayBlock, SupportsSize, PopBatch, ListTargetSize, Mutex, Atom >::selectPosition(), uint32_t, and folly::RelaxedConcurrentPriorityQueue< T, MayBlock, SupportsSize, PopBatch, ListTargetSize, Mutex, Atom >::Node::val.

Referenced by folly::RelaxedConcurrentPriorityQueue< T, MayBlock, SupportsSize, PopBatch, ListTargetSize, Mutex, Atom >::push().

837  {
838  Position cur;
840  Node* newNode = new Node;
841  newNode->val = val;
842  uint32_t seed = folly::Random::rand32() % (1 << 21);
843 
844  while (true) {
845  // shell we go the fast path?
846  bool go_fast_path = false;
847  // chooice the right node to start
848  cur = selectPosition(val, go_fast_path, seed, hptr);
849  if (go_fast_path) {
850  if (LIKELY(forceInsert(cur, val, newNode))) {
851  if (MayBlock) {
853  }
854  return;
855  } else {
856  continue;
857  }
858  }
859 
860  binarySearchPosition(cur, val, hptr);
861  if (LIKELY(regularInsert(cur, val, newNode))) {
862  if (MayBlock) {
864  }
865  return;
866  }
867  }
868  }
Position selectPosition(const T &val, bool &path, uint32_t &seed, folly::hazptr_holder< Atom > &hptr)
TODO: optimization.
static const int seed
void binarySearchPosition(Position &cur, const T &val, folly::hazptr_holder< Atom > &hptr)
#define LIKELY(x)
Definition: Likely.h:47
double val
Definition: String.cpp:273
bool regularInsert(const Position &pos, const T &val, Node *newNode)
bool forceInsert(const Position &pos, const T &val, Node *newNode)
static uint32_t rand32()
Definition: Random.h:213
template<typename T , bool MayBlock = false, bool SupportsSize = false, size_t PopBatch = 16, size_t ListTargetSize = 25, typename Mutex = folly::SpinLock, template< typename > class Atom = std::atomic>
FOLLY_ALWAYS_INLINE T folly::RelaxedConcurrentPriorityQueue< T, MayBlock, SupportsSize, PopBatch, ListTargetSize, Mutex, Atom >::optimisticReadValue ( const Position pos,
folly::hazptr_holder< Atom > &  hptr 
)
inlineprivate

Definition at line 474 of file RelaxedConcurrentPriorityQueue.h.

References folly::hazptr_holder< Atom >::get_protected(), folly::RelaxedConcurrentPriorityQueue< T, MayBlock, SupportsSize, PopBatch, ListTargetSize, Mutex, Atom >::MoundElement::head, folly::RelaxedConcurrentPriorityQueue< T, MayBlock, SupportsSize, PopBatch, ListTargetSize, Mutex, Atom >::Position::index, folly::RelaxedConcurrentPriorityQueue< T, MayBlock, SupportsSize, PopBatch, ListTargetSize, Mutex, Atom >::Position::level, and folly::RelaxedConcurrentPriorityQueue< T, MayBlock, SupportsSize, PopBatch, ListTargetSize, Mutex, Atom >::Node::val.

Referenced by folly::RelaxedConcurrentPriorityQueue< T, MayBlock, SupportsSize, PopBatch, ListTargetSize, Mutex, Atom >::binarySearchPosition(), and folly::RelaxedConcurrentPriorityQueue< T, MayBlock, SupportsSize, PopBatch, ListTargetSize, Mutex, Atom >::selectPosition().

474  {
475  Node* tmp = hptr.get_protected(levels_[pos.level][pos.index].head);
476  return (tmp == nullptr) ? MIN_VALUE : tmp->val;
477  }
FOLLY_ALWAYS_INLINE T * get_protected(const Atom< T * > &src) noexcept
Definition: HazptrHolder.h:138
MoundElement * levels_[MAX_LEVELS]
Data members.
template<typename T , bool MayBlock = false, bool SupportsSize = false, size_t PopBatch = 16, size_t ListTargetSize = 25, typename Mutex = folly::SpinLock, template< typename > class Atom = std::atomic>
FOLLY_ALWAYS_INLINE Position folly::RelaxedConcurrentPriorityQueue< T, MayBlock, SupportsSize, PopBatch, ListTargetSize, Mutex, Atom >::parentOf ( const Position pos)
inlineprivate
template<typename T , bool MayBlock = false, bool SupportsSize = false, size_t PopBatch = 16, size_t ListTargetSize = 25, typename Mutex = folly::SpinLock, template< typename > class Atom = std::atomic>
void folly::RelaxedConcurrentPriorityQueue< T, MayBlock, SupportsSize, PopBatch, ListTargetSize, Mutex, Atom >::pop ( T val)
inline
template<typename T , bool MayBlock = false, bool SupportsSize = false, size_t PopBatch = 16, size_t ListTargetSize = 25, typename Mutex = folly::SpinLock, template< typename > class Atom = std::atomic>
int folly::RelaxedConcurrentPriorityQueue< T, MayBlock, SupportsSize, PopBatch, ListTargetSize, Mutex, Atom >::popToSharedBuffer ( const uint32_t  rsize,
Node head 
)
inlineprivate

Definition at line 870 of file RelaxedConcurrentPriorityQueue.h.

References i, folly::RelaxedConcurrentPriorityQueue< T, MayBlock, SupportsSize, PopBatch, ListTargetSize, Mutex, Atom >::Position::index, folly::RelaxedConcurrentPriorityQueue< T, MayBlock, SupportsSize, PopBatch, ListTargetSize, Mutex, Atom >::Position::level, min, folly::RelaxedConcurrentPriorityQueue< T, MayBlock, SupportsSize, PopBatch, ListTargetSize, Mutex, Atom >::Node::next, folly::RelaxedConcurrentPriorityQueue< T, MayBlock, SupportsSize, PopBatch, ListTargetSize, Mutex, Atom >::setTreeNode(), folly::RelaxedConcurrentPriorityQueue< T, MayBlock, SupportsSize, PopBatch, ListTargetSize, Mutex, Atom >::top_loc_, and uint32_t.

Referenced by folly::RelaxedConcurrentPriorityQueue< T, MayBlock, SupportsSize, PopBatch, ListTargetSize, Mutex, Atom >::moundPopMany().

870  {
871  Position pos;
872  pos.level = pos.index = 0;
873 
874  int num = std::min(rsize, (uint32_t)PopBatch);
875  for (int i = num - 1; i >= 0; i--) {
876  // wait until this block is empty
877  while (shared_buffer_[i].pnode.load(std::memory_order_relaxed) != nullptr)
878  ;
879  shared_buffer_[i].pnode.store(head, std::memory_order_relaxed);
880  head = head->next;
881  }
882  if (num > 0) {
883  top_loc_.store(num - 1, std::memory_order_release);
884  }
885  setTreeNode(pos, head);
886  return rsize - num;
887  }
FOLLY_ALWAYS_INLINE void setTreeNode(const Position &pos, Node *t)
Node< Atom > * next() const noexcept
Definition: HazptrTest.cpp:116
LogLevel min
Definition: LogLevel.cpp:30
template<typename T , bool MayBlock = false, bool SupportsSize = false, size_t PopBatch = 16, size_t ListTargetSize = 25, typename Mutex = folly::SpinLock, template< typename > class Atom = std::atomic>
bool folly::RelaxedConcurrentPriorityQueue< T, MayBlock, SupportsSize, PopBatch, ListTargetSize, Mutex, Atom >::pruningLeaf ( const Position pos)
inlineprivate

Definition at line 539 of file RelaxedConcurrentPriorityQueue.h.

References b, folly::RelaxedConcurrentPriorityQueue< T, MayBlock, SupportsSize, PopBatch, ListTargetSize, Mutex, Atom >::getBottomLevel(), folly::RelaxedConcurrentPriorityQueue< T, MayBlock, SupportsSize, PopBatch, ListTargetSize, Mutex, Atom >::getElementSize(), i, folly::RelaxedConcurrentPriorityQueue< T, MayBlock, SupportsSize, PopBatch, ListTargetSize, Mutex, Atom >::Position::index, folly::RelaxedConcurrentPriorityQueue< T, MayBlock, SupportsSize, PopBatch, ListTargetSize, Mutex, Atom >::Position::level, and folly::RelaxedConcurrentPriorityQueue< T, MayBlock, SupportsSize, PopBatch, ListTargetSize, Mutex, Atom >::unlockNode().

Referenced by folly::RelaxedConcurrentPriorityQueue< T, MayBlock, SupportsSize, PopBatch, ListTargetSize, Mutex, Atom >::startPruning().

539  {
540  if (getElementSize(pos) <= PruningSize) {
541  unlockNode(pos);
542  return true;
543  }
544 
545  int b = getBottomLevel();
546  int leaves = 1 << b;
547  int cnodes = 0;
548  for (int i = 0; i < leaves; i++) {
549  Position tmp;
550  tmp.level = b;
551  tmp.index = i;
552  if (getElementSize(tmp) != 0) {
553  cnodes++;
554  }
555  if (cnodes > leaves * 2 / 3) {
556  break;
557  }
558  }
559 
560  if (cnodes <= leaves * 2 / 3) {
561  unlockNode(pos);
562  return true;
563  }
564  return false;
565  }
FOLLY_ALWAYS_INLINE size_t getElementSize(const Position &pos)
get the list size in current MoundElement
char b
FOLLY_ALWAYS_INLINE void unlockNode(const Position &pos)
template<typename T , bool MayBlock = false, bool SupportsSize = false, size_t PopBatch = 16, size_t ListTargetSize = 25, typename Mutex = folly::SpinLock, template< typename > class Atom = std::atomic>
void folly::RelaxedConcurrentPriorityQueue< T, MayBlock, SupportsSize, PopBatch, ListTargetSize, Mutex, Atom >::push ( const T val)
inline
template<typename T , bool MayBlock = false, bool SupportsSize = false, size_t PopBatch = 16, size_t ListTargetSize = 25, typename Mutex = folly::SpinLock, template< typename > class Atom = std::atomic>
FOLLY_ALWAYS_INLINE T folly::RelaxedConcurrentPriorityQueue< T, MayBlock, SupportsSize, PopBatch, ListTargetSize, Mutex, Atom >::readValue ( const Position pos)
inlineprivate
template<typename T , bool MayBlock = false, bool SupportsSize = false, size_t PopBatch = 16, size_t ListTargetSize = 25, typename Mutex = folly::SpinLock, template< typename > class Atom = std::atomic>
bool folly::RelaxedConcurrentPriorityQueue< T, MayBlock, SupportsSize, PopBatch, ListTargetSize, Mutex, Atom >::regularInsert ( const Position pos,
const T val,
Node newNode 
)
inlineprivate

Definition at line 643 of file RelaxedConcurrentPriorityQueue.h.

References folly::RelaxedConcurrentPriorityQueue< T, MayBlock, SupportsSize, PopBatch, ListTargetSize, Mutex, Atom >::getElementSize(), folly::RelaxedConcurrentPriorityQueue< T, MayBlock, SupportsSize, PopBatch, ListTargetSize, Mutex, Atom >::getList(), folly::RelaxedConcurrentPriorityQueue< T, MayBlock, SupportsSize, PopBatch, ListTargetSize, Mutex, Atom >::isRoot(), folly::RelaxedConcurrentPriorityQueue< T, MayBlock, SupportsSize, PopBatch, ListTargetSize, Mutex, Atom >::Position::level, LIKELY, folly::RelaxedConcurrentPriorityQueue< T, MayBlock, SupportsSize, PopBatch, ListTargetSize, Mutex, Atom >::lockNode(), folly::RelaxedConcurrentPriorityQueue< T, MayBlock, SupportsSize, PopBatch, ListTargetSize, Mutex, Atom >::Node::next, parent, folly::RelaxedConcurrentPriorityQueue< T, MayBlock, SupportsSize, PopBatch, ListTargetSize, Mutex, Atom >::parentOf(), folly::RelaxedConcurrentPriorityQueue< T, MayBlock, SupportsSize, PopBatch, ListTargetSize, Mutex, Atom >::readValue(), folly::RelaxedConcurrentPriorityQueue< T, MayBlock, SupportsSize, PopBatch, ListTargetSize, Mutex, Atom >::setElementSize(), folly::RelaxedConcurrentPriorityQueue< T, MayBlock, SupportsSize, PopBatch, ListTargetSize, Mutex, Atom >::setTreeNode(), start, folly::RelaxedConcurrentPriorityQueue< T, MayBlock, SupportsSize, PopBatch, ListTargetSize, Mutex, Atom >::startPruning(), folly::T, folly::RelaxedConcurrentPriorityQueue< T, MayBlock, SupportsSize, PopBatch, ListTargetSize, Mutex, Atom >::trylockNode(), uint32_t, UNLIKELY, folly::RelaxedConcurrentPriorityQueue< T, MayBlock, SupportsSize, PopBatch, ListTargetSize, Mutex, Atom >::unlockNode(), and folly::RelaxedConcurrentPriorityQueue< T, MayBlock, SupportsSize, PopBatch, ListTargetSize, Mutex, Atom >::Node::val.

Referenced by folly::RelaxedConcurrentPriorityQueue< T, MayBlock, SupportsSize, PopBatch, ListTargetSize, Mutex, Atom >::moundPush().

643  {
644  // insert to the root node
645  if (isRoot(pos)) {
646  lockNode(pos);
647  T nv = readValue(pos);
648  if (LIKELY(nv <= val)) {
649  newNode->next = getList(pos);
650  setTreeNode(pos, newNode);
651  uint32_t sz = getElementSize(pos);
652  setElementSize(pos, sz + 1);
653  if (UNLIKELY(sz > PruningSize)) {
654  startPruning(pos);
655  } else {
656  unlockNode(pos);
657  }
658  return true;
659  }
660  unlockNode(pos);
661  return false;
662  }
663 
664  // insert to an inner node
665  Position parent = parentOf(pos);
666  if (!trylockNode(parent)) {
667  return false;
668  }
669  if (!trylockNode(pos)) {
670  unlockNode(parent);
671  return false;
672  }
673  T pv = readValue(parent);
674  T nv = readValue(pos);
675  if (LIKELY(pv > val && nv <= val)) {
676  // improve the accuracy by getting the node(R) with less priority than the
677  // new value from parent level, insert the new node to the parent list
678  // and insert R to the current list.
679  // It only happens at >= LevelForTraverseParent for reducing contention
680  uint32_t sz = getElementSize(pos);
681  if (pos.level >= LevelForTraverseParent) {
682  Node* start = getList(parent);
683  while (start->next != nullptr && start->next->val >= val) {
684  start = start->next;
685  }
686  if (start->next != nullptr) {
687  newNode->next = start->next;
688  start->next = newNode;
689  while (start->next->next != nullptr) {
690  start = start->next;
691  }
692  newNode = start->next;
693  start->next = nullptr;
694  }
695  unlockNode(parent);
696 
697  Node* curList = getList(pos);
698  if (curList == nullptr) {
699  newNode->next = nullptr;
700  setTreeNode(pos, newNode);
701  } else {
702  Node* p = curList;
703  if (p->val <= newNode->val) {
704  newNode->next = curList;
705  setTreeNode(pos, newNode);
706  } else {
707  while (p->next != nullptr && p->next->val >= newNode->val) {
708  p = p->next;
709  }
710  newNode->next = p->next;
711  p->next = newNode;
712  }
713  }
714  setElementSize(pos, sz + 1);
715  } else {
716  unlockNode(parent);
717  newNode->next = getList(pos);
718  setTreeNode(pos, newNode);
719  setElementSize(pos, sz + 1);
720  }
721  if (UNLIKELY(sz > PruningSize)) {
722  startPruning(pos);
723  } else {
724  unlockNode(pos);
725  }
726  return true;
727  }
728  unlockNode(parent);
729  unlockNode(pos);
730  return false;
731  }
FOLLY_ALWAYS_INLINE void lockNode(const Position &pos)
FOLLY_ALWAYS_INLINE Position parentOf(const Position &pos)
Locate the parent node.
FOLLY_ALWAYS_INLINE size_t getElementSize(const Position &pos)
get the list size in current MoundElement
#define LIKELY(x)
Definition: Likely.h:47
double val
Definition: String.cpp:273
FOLLY_ALWAYS_INLINE bool isRoot(const Position &pos)
Current element is the root?
FOLLY_ALWAYS_INLINE Node * getList(const Position &pos)
folly::std T
FOLLY_ALWAYS_INLINE void setTreeNode(const Position &pos, Node *t)
Node< Atom > * next() const noexcept
Definition: HazptrTest.cpp:116
FOLLY_ALWAYS_INLINE void setElementSize(const Position &pos, const uint32_t &v)
Set the size of current MoundElement.
auto start
FOLLY_ALWAYS_INLINE bool trylockNode(const Position &pos)
FOLLY_ALWAYS_INLINE T readValue(const Position &pos)
#define UNLIKELY(x)
Definition: Likely.h:48
FOLLY_ALWAYS_INLINE void unlockNode(const Position &pos)
folly::Function< void()> parent
Definition: AtFork.cpp:34
template<typename T , bool MayBlock = false, bool SupportsSize = false, size_t PopBatch = 16, size_t ListTargetSize = 25, typename Mutex = folly::SpinLock, template< typename > class Atom = std::atomic>
Position folly::RelaxedConcurrentPriorityQueue< T, MayBlock, SupportsSize, PopBatch, ListTargetSize, Mutex, Atom >::selectPosition ( const T val,
bool &  path,
uint32_t seed,
folly::hazptr_holder< Atom > &  hptr 
)
inlineprivate

TODO: optimization.

Definition at line 406 of file RelaxedConcurrentPriorityQueue.h.

References b, folly::RelaxedConcurrentPriorityQueue< T, MayBlock, SupportsSize, PopBatch, ListTargetSize, Mutex, Atom >::getBottomLevel(), folly::RelaxedConcurrentPriorityQueue< T, MayBlock, SupportsSize, PopBatch, ListTargetSize, Mutex, Atom >::getElementSize(), folly::RelaxedConcurrentPriorityQueue< T, MayBlock, SupportsSize, PopBatch, ListTargetSize, Mutex, Atom >::grow(), i, folly::RelaxedConcurrentPriorityQueue< T, MayBlock, SupportsSize, PopBatch, ListTargetSize, Mutex, Atom >::Position::index, folly::RelaxedConcurrentPriorityQueue< T, MayBlock, SupportsSize, PopBatch, ListTargetSize, Mutex, Atom >::Position::level, folly::RelaxedConcurrentPriorityQueue< T, MayBlock, SupportsSize, PopBatch, ListTargetSize, Mutex, Atom >::optimisticReadValue(), seed, shell_builder::steps, uint32_t, and folly::RelaxedConcurrentPriorityQueue< T, MayBlock, SupportsSize, PopBatch, ListTargetSize, Mutex, Atom >::Node::val.

Referenced by folly::RelaxedConcurrentPriorityQueue< T, MayBlock, SupportsSize, PopBatch, ListTargetSize, Mutex, Atom >::moundPush().

410  {
411  while (true) {
413  int bound = 1 << b; // number of elements in this level
414  int steps = 1 + b * b; // probe the length
415  ++seed;
416  uint32_t index = seed % bound;
417 
418  for (int i = 0; i < steps; i++) {
419  int loc = (index + i) % bound;
420  Position pos;
421  pos.level = b;
422  pos.index = loc;
423  // the first round, we do the quick check
424  if (optimisticReadValue(pos, hptr) <= val) {
425  path = false;
426  seed = ++loc;
427  return pos;
428  } else if (
429  b > LevelForForceInsert && getElementSize(pos) < ListTargetSize) {
430  // [fast path] conservative implementation
431  // it makes sure every tree element should
432  // have more than the given number of nodes.
433  seed = ++loc;
434  path = true;
435  return pos;
436  }
437  if (b != getBottomLevel()) {
438  break;
439  }
440  }
441  // failed too many times grow
442  if (b == getBottomLevel()) {
443  grow(b);
444  }
445  }
446  }
FOLLY_ALWAYS_INLINE T optimisticReadValue(const Position &pos, folly::hazptr_holder< Atom > &hptr)
void grow(uint32_t btm)
Extend the tree level.
FOLLY_ALWAYS_INLINE size_t getElementSize(const Position &pos)
get the list size in current MoundElement
char b
static const int seed
double val
Definition: String.cpp:273
template<typename T , bool MayBlock = false, bool SupportsSize = false, size_t PopBatch = 16, size_t ListTargetSize = 25, typename Mutex = folly::SpinLock, template< typename > class Atom = std::atomic>
FOLLY_ALWAYS_INLINE void folly::RelaxedConcurrentPriorityQueue< T, MayBlock, SupportsSize, PopBatch, ListTargetSize, Mutex, Atom >::setElementSize ( const Position pos,
const uint32_t v 
)
inlineprivate

Set the size of current MoundElement.

Definition at line 366 of file RelaxedConcurrentPriorityQueue.h.

References folly::RelaxedConcurrentPriorityQueue< T, MayBlock, SupportsSize, PopBatch, ListTargetSize, Mutex, Atom >::Position::index, folly::RelaxedConcurrentPriorityQueue< T, MayBlock, SupportsSize, PopBatch, ListTargetSize, Mutex, Atom >::Position::level, and folly::RelaxedConcurrentPriorityQueue< T, MayBlock, SupportsSize, PopBatch, ListTargetSize, Mutex, Atom >::MoundElement::size.

Referenced by folly::RelaxedConcurrentPriorityQueue< T, MayBlock, SupportsSize, PopBatch, ListTargetSize, Mutex, Atom >::deferSettingRootSize(), folly::RelaxedConcurrentPriorityQueue< T, MayBlock, SupportsSize, PopBatch, ListTargetSize, Mutex, Atom >::forceInsert(), folly::RelaxedConcurrentPriorityQueue< T, MayBlock, SupportsSize, PopBatch, ListTargetSize, Mutex, Atom >::forceInsertToRoot(), folly::RelaxedConcurrentPriorityQueue< T, MayBlock, SupportsSize, PopBatch, ListTargetSize, Mutex, Atom >::mergeDown(), folly::RelaxedConcurrentPriorityQueue< T, MayBlock, SupportsSize, PopBatch, ListTargetSize, Mutex, Atom >::mergeListTo(), folly::RelaxedConcurrentPriorityQueue< T, MayBlock, SupportsSize, PopBatch, ListTargetSize, Mutex, Atom >::moundPopMany(), folly::RelaxedConcurrentPriorityQueue< T, MayBlock, SupportsSize, PopBatch, ListTargetSize, Mutex, Atom >::regularInsert(), folly::RelaxedConcurrentPriorityQueue< T, MayBlock, SupportsSize, PopBatch, ListTargetSize, Mutex, Atom >::startPruning(), and folly::RelaxedConcurrentPriorityQueue< T, MayBlock, SupportsSize, PopBatch, ListTargetSize, Mutex, Atom >::swapList().

368  {
369  levels_[pos.level][pos.index].size.store(v, std::memory_order_relaxed);
370  }
MoundElement * levels_[MAX_LEVELS]
Data members.
template<typename T , bool MayBlock = false, bool SupportsSize = false, size_t PopBatch = 16, size_t ListTargetSize = 25, typename Mutex = folly::SpinLock, template< typename > class Atom = std::atomic>
FOLLY_ALWAYS_INLINE void folly::RelaxedConcurrentPriorityQueue< T, MayBlock, SupportsSize, PopBatch, ListTargetSize, Mutex, Atom >::setTreeNode ( const Position pos,
Node t 
)
inlineprivate

Definition at line 489 of file RelaxedConcurrentPriorityQueue.h.

References folly::RelaxedConcurrentPriorityQueue< T, MayBlock, SupportsSize, PopBatch, ListTargetSize, Mutex, Atom >::MoundElement::head, folly::RelaxedConcurrentPriorityQueue< T, MayBlock, SupportsSize, PopBatch, ListTargetSize, Mutex, Atom >::Position::index, and folly::RelaxedConcurrentPriorityQueue< T, MayBlock, SupportsSize, PopBatch, ListTargetSize, Mutex, Atom >::Position::level.

Referenced by folly::RelaxedConcurrentPriorityQueue< T, MayBlock, SupportsSize, PopBatch, ListTargetSize, Mutex, Atom >::deleteAllNodes(), folly::RelaxedConcurrentPriorityQueue< T, MayBlock, SupportsSize, PopBatch, ListTargetSize, Mutex, Atom >::forceInsertToRoot(), folly::RelaxedConcurrentPriorityQueue< T, MayBlock, SupportsSize, PopBatch, ListTargetSize, Mutex, Atom >::mergeDown(), folly::RelaxedConcurrentPriorityQueue< T, MayBlock, SupportsSize, PopBatch, ListTargetSize, Mutex, Atom >::mergeListTo(), folly::RelaxedConcurrentPriorityQueue< T, MayBlock, SupportsSize, PopBatch, ListTargetSize, Mutex, Atom >::moundPopMany(), folly::RelaxedConcurrentPriorityQueue< T, MayBlock, SupportsSize, PopBatch, ListTargetSize, Mutex, Atom >::popToSharedBuffer(), folly::RelaxedConcurrentPriorityQueue< T, MayBlock, SupportsSize, PopBatch, ListTargetSize, Mutex, Atom >::regularInsert(), and folly::RelaxedConcurrentPriorityQueue< T, MayBlock, SupportsSize, PopBatch, ListTargetSize, Mutex, Atom >::swapList().

489  {
490  levels_[pos.level][pos.index].head.store(t, std::memory_order_relaxed);
491  }
MoundElement * levels_[MAX_LEVELS]
Data members.
template<typename T , bool MayBlock = false, bool SupportsSize = false, size_t PopBatch = 16, size_t ListTargetSize = 25, typename Mutex = folly::SpinLock, template< typename > class Atom = std::atomic>
size_t folly::RelaxedConcurrentPriorityQueue< T, MayBlock, SupportsSize, PopBatch, ListTargetSize, Mutex, Atom >::size ( )
inline

Note: size() and empty() are guaranteed to be accurate only if the queue is not changed concurrently. Returns an estimate of the size of the queue

Definition at line 262 of file RelaxedConcurrentPriorityQueue.h.

References c, folly::RelaxedConcurrentPriorityQueue< T, MayBlock, SupportsSize, PopBatch, ListTargetSize, Mutex, Atom >::counter_c_, and folly::RelaxedConcurrentPriorityQueue< T, MayBlock, SupportsSize, PopBatch, ListTargetSize, Mutex, Atom >::counter_p_.

Referenced by folly::RelaxedConcurrentPriorityQueue< T, MayBlock, SupportsSize, PopBatch, ListTargetSize, Mutex, Atom >::grow().

262  {
263  DCHECK(SupportsSize);
264  size_t p = counter_p_.load(std::memory_order_acquire);
265  size_t c = counter_c_.load(std::memory_order_acquire);
266  return (p > c) ? p - c : 0;
267  }
char c
template<typename T , bool MayBlock = false, bool SupportsSize = false, size_t PopBatch = 16, size_t ListTargetSize = 25, typename Mutex = folly::SpinLock, template< typename > class Atom = std::atomic>
void folly::RelaxedConcurrentPriorityQueue< T, MayBlock, SupportsSize, PopBatch, ListTargetSize, Mutex, Atom >::startPruning ( const Position pos)
inlineprivate

Split the current list into two lists, then split the tail list and merge to two children.

Definition at line 569 of file RelaxedConcurrentPriorityQueue.h.

References folly::RelaxedConcurrentPriorityQueue< T, MayBlock, SupportsSize, PopBatch, ListTargetSize, Mutex, Atom >::getBottomLevel(), folly::RelaxedConcurrentPriorityQueue< T, MayBlock, SupportsSize, PopBatch, ListTargetSize, Mutex, Atom >::getElementSize(), folly::RelaxedConcurrentPriorityQueue< T, MayBlock, SupportsSize, PopBatch, ListTargetSize, Mutex, Atom >::getList(), folly::RelaxedConcurrentPriorityQueue< T, MayBlock, SupportsSize, PopBatch, ListTargetSize, Mutex, Atom >::grow(), i, folly::RelaxedConcurrentPriorityQueue< T, MayBlock, SupportsSize, PopBatch, ListTargetSize, Mutex, Atom >::isLeaf(), folly::RelaxedConcurrentPriorityQueue< T, MayBlock, SupportsSize, PopBatch, ListTargetSize, Mutex, Atom >::leftOf(), folly::RelaxedConcurrentPriorityQueue< T, MayBlock, SupportsSize, PopBatch, ListTargetSize, Mutex, Atom >::Position::level, folly::RelaxedConcurrentPriorityQueue< T, MayBlock, SupportsSize, PopBatch, ListTargetSize, Mutex, Atom >::lockNode(), folly::RelaxedConcurrentPriorityQueue< T, MayBlock, SupportsSize, PopBatch, ListTargetSize, Mutex, Atom >::mergeListTo(), folly::RelaxedConcurrentPriorityQueue< T, MayBlock, SupportsSize, PopBatch, ListTargetSize, Mutex, Atom >::Node::next, folly::RelaxedConcurrentPriorityQueue< T, MayBlock, SupportsSize, PopBatch, ListTargetSize, Mutex, Atom >::pruningLeaf(), folly::RelaxedConcurrentPriorityQueue< T, MayBlock, SupportsSize, PopBatch, ListTargetSize, Mutex, Atom >::rightOf(), folly::RelaxedConcurrentPriorityQueue< T, MayBlock, SupportsSize, PopBatch, ListTargetSize, Mutex, Atom >::setElementSize(), shell_builder::steps, folly::pushmi::detail::t, and folly::RelaxedConcurrentPriorityQueue< T, MayBlock, SupportsSize, PopBatch, ListTargetSize, Mutex, Atom >::unlockNode().

Referenced by folly::RelaxedConcurrentPriorityQueue< T, MayBlock, SupportsSize, PopBatch, ListTargetSize, Mutex, Atom >::regularInsert().

569  {
570  if (isLeaf(pos) && pruningLeaf(pos)) {
571  return;
572  }
573 
574  // split the list, record the tail
575  Node* pruning_head = getList(pos);
576  int steps = ListTargetSize; // keep in the original list
577  for (int i = 0; i < steps - 1; i++) {
578  pruning_head = pruning_head->next;
579  }
580  Node* t = pruning_head;
581  pruning_head = pruning_head->next;
582  t->next = nullptr;
583  int tail_length = getElementSize(pos) - steps;
584  setElementSize(pos, steps);
585 
586  // split the tail list into two lists
587  // evenly merge to two children
588  if (pos.level != getBottomLevel()) {
589  // split the rest into two lists
590  int left_length = (tail_length + 1) / 2;
591  int right_length = tail_length - left_length;
592  Node *to_right, *to_left = pruning_head;
593  for (int i = 0; i < left_length - 1; i++) {
594  pruning_head = pruning_head->next;
595  }
596  to_right = pruning_head->next;
597  pruning_head->next = nullptr;
598 
599  Position lchild = leftOf(pos);
600  Position rchild = rightOf(pos);
601  if (left_length != 0) {
602  lockNode(lchild);
603  mergeListTo(lchild, to_left, left_length);
604  }
605  if (right_length != 0) {
606  lockNode(rchild);
607  mergeListTo(rchild, to_right, right_length);
608  }
609  unlockNode(pos);
610  if (left_length != 0 && getElementSize(lchild) > PruningSize) {
611  startPruning(lchild);
612  } else if (left_length != 0) {
613  unlockNode(lchild);
614  }
615  if (right_length != 0 && getElementSize(rchild) > PruningSize) {
616  startPruning(rchild);
617  } else if (right_length != 0) {
618  unlockNode(rchild);
619  }
620  } else { // time to grow the Mound
621  grow(pos.level);
622  // randomly choose a child to insert
623  if (steps % 2 == 1) {
624  Position rchild = rightOf(pos);
625  lockNode(rchild);
626  mergeListTo(rchild, pruning_head, tail_length);
627  unlockNode(pos);
628  unlockNode(rchild);
629  } else {
630  Position lchild = leftOf(pos);
631  lockNode(lchild);
632  mergeListTo(lchild, pruning_head, tail_length);
633  unlockNode(pos);
634  unlockNode(lchild);
635  }
636  }
637  }
FOLLY_ALWAYS_INLINE void lockNode(const Position &pos)
void grow(uint32_t btm)
Extend the tree level.
FOLLY_ALWAYS_INLINE size_t getElementSize(const Position &pos)
get the list size in current MoundElement
FOLLY_ALWAYS_INLINE Node * getList(const Position &pos)
FOLLY_ALWAYS_INLINE Position leftOf(const Position &pos)
Locate the left child.
Node< Atom > * next() const noexcept
Definition: HazptrTest.cpp:116
FOLLY_ALWAYS_INLINE Position rightOf(const Position &pos)
Locate the right child.
FOLLY_ALWAYS_INLINE void setElementSize(const Position &pos, const uint32_t &v)
Set the size of current MoundElement.
FOLLY_ALWAYS_INLINE bool isLeaf(const Position &pos)
Current position is leaf?
void mergeListTo(const Position &pos, Node *t, const size_t &list_length)
Merge list t to the Element Position.
FOLLY_ALWAYS_INLINE void unlockNode(const Position &pos)
template<typename T , bool MayBlock = false, bool SupportsSize = false, size_t PopBatch = 16, size_t ListTargetSize = 25, typename Mutex = folly::SpinLock, template< typename > class Atom = std::atomic>
void folly::RelaxedConcurrentPriorityQueue< T, MayBlock, SupportsSize, PopBatch, ListTargetSize, Mutex, Atom >::swapList ( const Position a,
const Position b 
)
inlineprivate

Swap two Tree Elements (head, size)

Definition at line 449 of file RelaxedConcurrentPriorityQueue.h.

References folly::RelaxedConcurrentPriorityQueue< T, MayBlock, SupportsSize, PopBatch, ListTargetSize, Mutex, Atom >::getElementSize(), folly::RelaxedConcurrentPriorityQueue< T, MayBlock, SupportsSize, PopBatch, ListTargetSize, Mutex, Atom >::getList(), folly::RelaxedConcurrentPriorityQueue< T, MayBlock, SupportsSize, PopBatch, ListTargetSize, Mutex, Atom >::setElementSize(), folly::RelaxedConcurrentPriorityQueue< T, MayBlock, SupportsSize, PopBatch, ListTargetSize, Mutex, Atom >::setTreeNode(), and uint32_t.

Referenced by folly::RelaxedConcurrentPriorityQueue< T, MayBlock, SupportsSize, PopBatch, ListTargetSize, Mutex, Atom >::deferSettingRootSize(), and folly::RelaxedConcurrentPriorityQueue< T, MayBlock, SupportsSize, PopBatch, ListTargetSize, Mutex, Atom >::mergeDown().

449  {
450  Node* tmp = getList(a);
451  setTreeNode(a, getList(b));
452  setTreeNode(b, tmp);
453 
454  // need to swap the tree node meta-data
455  uint32_t sa = getElementSize(a);
456  uint32_t sb = getElementSize(b);
457  setElementSize(a, sb);
458  setElementSize(b, sa);
459  }
FOLLY_ALWAYS_INLINE size_t getElementSize(const Position &pos)
get the list size in current MoundElement
char b
FOLLY_ALWAYS_INLINE Node * getList(const Position &pos)
FOLLY_ALWAYS_INLINE void setTreeNode(const Position &pos, Node *t)
FOLLY_ALWAYS_INLINE void setElementSize(const Position &pos, const uint32_t &v)
Set the size of current MoundElement.
char a
template<typename T , bool MayBlock = false, bool SupportsSize = false, size_t PopBatch = 16, size_t ListTargetSize = 25, typename Mutex = folly::SpinLock, template< typename > class Atom = std::atomic>
void folly::RelaxedConcurrentPriorityQueue< T, MayBlock, SupportsSize, PopBatch, ListTargetSize, Mutex, Atom >::tryBlockingPop ( const size_t &  curticket)
inlineprivate

The last round consumers are still waiting, go to sleep

Spin until the push ticket is ready

The last round consumers are still waiting, go to sleep

Definition at line 1093 of file RelaxedConcurrentPriorityQueue.h.

References folly::RelaxedConcurrentPriorityQueue< T, MayBlock, SupportsSize, PopBatch, ListTargetSize, Mutex, Atom >::futexIsReady(), folly::detail::futexWait(), folly::RelaxedConcurrentPriorityQueue< T, MayBlock, SupportsSize, PopBatch, ListTargetSize, Mutex, Atom >::getFutexArrayLoc(), max, folly::RelaxedConcurrentPriorityQueue< T, MayBlock, SupportsSize, PopBatch, ListTargetSize, Mutex, Atom >::trySpinBeforeBlock(), and uint32_t.

Referenced by folly::RelaxedConcurrentPriorityQueue< T, MayBlock, SupportsSize, PopBatch, ListTargetSize, Mutex, Atom >::blockingPopImpl().

1093  {
1094  auto loc = getFutexArrayLoc(curticket);
1095  auto curfutex = futex_array_[loc].load(std::memory_order_acquire);
1096  if (curfutex &
1097  1) {
1098  detail::futexWait(&futex_array_[loc], curfutex);
1099  }
1100  if (trySpinBeforeBlock(
1101  curticket,
1103  return;
1104  }
1105  while (true) {
1106  curfutex = futex_array_[loc].load(std::memory_order_acquire);
1107  if (curfutex &
1108  1) {
1109  detail::futexWait(&futex_array_[loc], curfutex);
1110  } else if (!futexIsReady(curticket)) { // current ticket < pop ticket
1111  uint32_t blocking_futex = curfutex + 1;
1112  if (futex_array_[loc].compare_exchange_strong(
1113  curfutex, blocking_futex)) {
1114  detail::futexWait(&futex_array_[loc], blocking_futex);
1115  }
1116  } else {
1117  return;
1118  }
1119  }
1120  }
LogLevel max
Definition: LogLevel.cpp:31
std::unique_ptr< folly::detail::Futex< Atom >[]> futex_array_
FutexResult futexWait(const Futex *futex, uint32_t expected, uint32_t waitMask)
Definition: Futex-inl.h:100
FOLLY_NOINLINE bool trySpinBeforeBlock(const size_t &curticket, const std::chrono::time_point< Clock, Duration > &deadline, const folly::WaitOptions &opt=wait_options())
FOLLY_ALWAYS_INLINE bool futexIsReady(const size_t &curticket)
template<typename T , bool MayBlock = false, bool SupportsSize = false, size_t PopBatch = 16, size_t ListTargetSize = 25, typename Mutex = folly::SpinLock, template< typename > class Atom = std::atomic>
bool folly::RelaxedConcurrentPriorityQueue< T, MayBlock, SupportsSize, PopBatch, ListTargetSize, Mutex, Atom >::tryPopFromMound ( T val)
inlineprivate

Definition at line 1132 of file RelaxedConcurrentPriorityQueue.h.

References folly::RelaxedConcurrentPriorityQueue< T, MayBlock, SupportsSize, PopBatch, ListTargetSize, Mutex, Atom >::Position::index, folly::RelaxedConcurrentPriorityQueue< T, MayBlock, SupportsSize, PopBatch, ListTargetSize, Mutex, Atom >::isMoundEmpty(), folly::RelaxedConcurrentPriorityQueue< T, MayBlock, SupportsSize, PopBatch, ListTargetSize, Mutex, Atom >::Position::level, folly::RelaxedConcurrentPriorityQueue< T, MayBlock, SupportsSize, PopBatch, ListTargetSize, Mutex, Atom >::moundPopMany(), and folly::RelaxedConcurrentPriorityQueue< T, MayBlock, SupportsSize, PopBatch, ListTargetSize, Mutex, Atom >::trylockNode().

Referenced by folly::RelaxedConcurrentPriorityQueue< T, MayBlock, SupportsSize, PopBatch, ListTargetSize, Mutex, Atom >::moundPop().

1132  {
1133  if (isMoundEmpty()) {
1134  return false;
1135  }
1136  Position pos;
1137  pos.level = pos.index = 0;
1138 
1139  // lock the root
1140  if (trylockNode(pos)) {
1141  return moundPopMany(val);
1142  }
1143  return false;
1144  }
double val
Definition: String.cpp:273
FOLLY_ALWAYS_INLINE bool trylockNode(const Position &pos)
template<typename T , bool MayBlock = false, bool SupportsSize = false, size_t PopBatch = 16, size_t ListTargetSize = 25, typename Mutex = folly::SpinLock, template< typename > class Atom = std::atomic>
bool folly::RelaxedConcurrentPriorityQueue< T, MayBlock, SupportsSize, PopBatch, ListTargetSize, Mutex, Atom >::tryPopFromSharedBuffer ( T val)
inlineprivate

Definition at line 1178 of file RelaxedConcurrentPriorityQueue.h.

References c, folly::RelaxedConcurrentPriorityQueue< T, MayBlock, SupportsSize, PopBatch, ListTargetSize, Mutex, Atom >::isSharedBufferEmpty(), folly::hazptr_obj_base< T, Atom, D >::retire(), folly::RelaxedConcurrentPriorityQueue< T, MayBlock, SupportsSize, PopBatch, ListTargetSize, Mutex, Atom >::top_loc_, and folly::RelaxedConcurrentPriorityQueue< T, MayBlock, SupportsSize, PopBatch, ListTargetSize, Mutex, Atom >::Node::val.

Referenced by folly::RelaxedConcurrentPriorityQueue< T, MayBlock, SupportsSize, PopBatch, ListTargetSize, Mutex, Atom >::moundPop().

1178  {
1179  int get_or = -1;
1180  if (!isSharedBufferEmpty()) {
1181  get_or = top_loc_.fetch_sub(1, std::memory_order_acq_rel);
1182  if (get_or >= 0) {
1183  Node* c = shared_buffer_[get_or].pnode.load(std::memory_order_relaxed);
1184  shared_buffer_[get_or].pnode.store(nullptr, std::memory_order_release);
1185  val = c->val;
1186  c->retire();
1187  return true;
1188  }
1189  }
1190  return false;
1191  }
void retire(D deleter={}, hazptr_domain< Atom > &domain=default_hazptr_domain< Atom >())
Definition: HazptrObj.h:229
double val
Definition: String.cpp:273
char c
template<typename T , bool MayBlock = false, bool SupportsSize = false, size_t PopBatch = 16, size_t ListTargetSize = 25, typename Mutex = folly::SpinLock, template< typename > class Atom = std::atomic>
template<typename Clock , typename Duration >
FOLLY_NOINLINE bool folly::RelaxedConcurrentPriorityQueue< T, MayBlock, SupportsSize, PopBatch, ListTargetSize, Mutex, Atom >::trySpinBeforeBlock ( const size_t &  curticket,
const std::chrono::time_point< Clock, Duration > &  deadline,
const folly::WaitOptions opt = wait_options() 
)
inlineprivate
template<typename T , bool MayBlock = false, bool SupportsSize = false, size_t PopBatch = 16, size_t ListTargetSize = 25, typename Mutex = folly::SpinLock, template< typename > class Atom = std::atomic>
template<typename Clock , typename Duration >
FOLLY_NOINLINE bool folly::RelaxedConcurrentPriorityQueue< T, MayBlock, SupportsSize, PopBatch, ListTargetSize, Mutex, Atom >::tryWait ( const std::chrono::time_point< Clock, Duration > &  deadline,
const folly::WaitOptions opt = wait_options() 
)
inlineprivate

Definition at line 1151 of file RelaxedConcurrentPriorityQueue.h.

References folly::detail::advance, folly::RelaxedConcurrentPriorityQueue< T, MayBlock, SupportsSize, PopBatch, ListTargetSize, Mutex, Atom >::isEmpty(), folly::detail::spin_pause_until(), folly::detail::spin_yield_until(), folly::detail::success, and folly::detail::timeout.

Referenced by folly::RelaxedConcurrentPriorityQueue< T, MayBlock, SupportsSize, PopBatch, ListTargetSize, Mutex, Atom >::moundPop().

1153  {
1154  // Fast path, by quick check the status
1156  deadline, opt, [=] { return !isEmpty(); })) {
1158  return true;
1160  return false;
1162  break;
1163  }
1164 
1165  // Spinning strategy
1166  while (true) {
1167  auto res =
1168  folly::detail::spin_yield_until(deadline, [=] { return !isEmpty(); });
1170  return true;
1171  } else if (res == folly::detail::spin_result::timeout) {
1172  return false;
1173  }
1174  }
1175  return true;
1176  }
spin_result spin_yield_until(std::chrono::time_point< Clock, Duration > const &deadline, F f)
Definition: Spin.h:70
spin_result spin_pause_until(std::chrono::time_point< Clock, Duration > const &deadline, WaitOptions const &opt, F f)
Definition: Spin.h:36
template<typename T , bool MayBlock = false, bool SupportsSize = false, size_t PopBatch = 16, size_t ListTargetSize = 25, typename Mutex = folly::SpinLock, template< typename > class Atom = std::atomic>
FOLLY_ALWAYS_INLINE void folly::RelaxedConcurrentPriorityQueue< T, MayBlock, SupportsSize, PopBatch, ListTargetSize, Mutex, Atom >::unlockNode ( const Position pos)
inlineprivate

Definition at line 465 of file RelaxedConcurrentPriorityQueue.h.

References folly::RelaxedConcurrentPriorityQueue< T, MayBlock, SupportsSize, PopBatch, ListTargetSize, Mutex, Atom >::Position::index, folly::RelaxedConcurrentPriorityQueue< T, MayBlock, SupportsSize, PopBatch, ListTargetSize, Mutex, Atom >::Position::level, and folly::RelaxedConcurrentPriorityQueue< T, MayBlock, SupportsSize, PopBatch, ListTargetSize, Mutex, Atom >::MoundElement::lock.

Referenced by folly::RelaxedConcurrentPriorityQueue< T, MayBlock, SupportsSize, PopBatch, ListTargetSize, Mutex, Atom >::deferSettingRootSize(), folly::RelaxedConcurrentPriorityQueue< T, MayBlock, SupportsSize, PopBatch, ListTargetSize, Mutex, Atom >::mergeDown(), folly::RelaxedConcurrentPriorityQueue< T, MayBlock, SupportsSize, PopBatch, ListTargetSize, Mutex, Atom >::moundPopMany(), folly::RelaxedConcurrentPriorityQueue< T, MayBlock, SupportsSize, PopBatch, ListTargetSize, Mutex, Atom >::pruningLeaf(), folly::RelaxedConcurrentPriorityQueue< T, MayBlock, SupportsSize, PopBatch, ListTargetSize, Mutex, Atom >::regularInsert(), and folly::RelaxedConcurrentPriorityQueue< T, MayBlock, SupportsSize, PopBatch, ListTargetSize, Mutex, Atom >::startPruning().

465  {
466  levels_[pos.level][pos.index].lock.unlock();
467  }
MoundElement * levels_[MAX_LEVELS]
Data members.
template<typename T , bool MayBlock = false, bool SupportsSize = false, size_t PopBatch = 16, size_t ListTargetSize = 25, typename Mutex = folly::SpinLock, template< typename > class Atom = std::atomic>
static FOLLY_ALWAYS_INLINE folly::WaitOptions folly::RelaxedConcurrentPriorityQueue< T, MayBlock, SupportsSize, PopBatch, ListTargetSize, Mutex, Atom >::wait_options ( )
inlinestaticprivate

Definition at line 1146 of file RelaxedConcurrentPriorityQueue.h.

1146  {
1147  return {};
1148  }

Member Data Documentation

template<typename T , bool MayBlock = false, bool SupportsSize = false, size_t PopBatch = 16, size_t ListTargetSize = 25, typename Mutex = folly::SpinLock, template< typename > class Atom = std::atomic>
constexpr size_t folly::RelaxedConcurrentPriorityQueue< T, MayBlock, SupportsSize, PopBatch, ListTargetSize, Mutex, Atom >::Align = 1u << 7
staticprivate

Definition at line 132 of file RelaxedConcurrentPriorityQueue.h.

template<typename T , bool MayBlock = false, bool SupportsSize = false, size_t PopBatch = 16, size_t ListTargetSize = 25, typename Mutex = folly::SpinLock, template< typename > class Atom = std::atomic>
Atom<uint32_t> folly::RelaxedConcurrentPriorityQueue< T, MayBlock, SupportsSize, PopBatch, ListTargetSize, Mutex, Atom >::bottom_
private

Definition at line 183 of file RelaxedConcurrentPriorityQueue.h.

template<typename T , bool MayBlock = false, bool SupportsSize = false, size_t PopBatch = 16, size_t ListTargetSize = 25, typename Mutex = folly::SpinLock, template< typename > class Atom = std::atomic>
Atom<size_t> folly::RelaxedConcurrentPriorityQueue< T, MayBlock, SupportsSize, PopBatch, ListTargetSize, Mutex, Atom >::counter_c_
private
template<typename T , bool MayBlock = false, bool SupportsSize = false, size_t PopBatch = 16, size_t ListTargetSize = 25, typename Mutex = folly::SpinLock, template< typename > class Atom = std::atomic>
Atom<size_t> folly::RelaxedConcurrentPriorityQueue< T, MayBlock, SupportsSize, PopBatch, ListTargetSize, Mutex, Atom >::counter_p_
private
template<typename T , bool MayBlock = false, bool SupportsSize = false, size_t PopBatch = 16, size_t ListTargetSize = 25, typename Mutex = folly::SpinLock, template< typename > class Atom = std::atomic>
Atom<uint32_t> folly::RelaxedConcurrentPriorityQueue< T, MayBlock, SupportsSize, PopBatch, ListTargetSize, Mutex, Atom >::cticket_
private
template<typename T , bool MayBlock = false, bool SupportsSize = false, size_t PopBatch = 16, size_t ListTargetSize = 25, typename Mutex = folly::SpinLock, template< typename > class Atom = std::atomic>
std::unique_ptr<folly::detail::Futex<Atom>[]> folly::RelaxedConcurrentPriorityQueue< T, MayBlock, SupportsSize, PopBatch, ListTargetSize, Mutex, Atom >::futex_array_
private

Definition at line 197 of file RelaxedConcurrentPriorityQueue.h.

template<typename T , bool MayBlock = false, bool SupportsSize = false, size_t PopBatch = 16, size_t ListTargetSize = 25, typename Mutex = folly::SpinLock, template< typename > class Atom = std::atomic>
Atom<uint32_t> folly::RelaxedConcurrentPriorityQueue< T, MayBlock, SupportsSize, PopBatch, ListTargetSize, Mutex, Atom >::guard_
private

Definition at line 185 of file RelaxedConcurrentPriorityQueue.h.

template<typename T , bool MayBlock = false, bool SupportsSize = false, size_t PopBatch = 16, size_t ListTargetSize = 25, typename Mutex = folly::SpinLock, template< typename > class Atom = std::atomic>
constexpr int folly::RelaxedConcurrentPriorityQueue< T, MayBlock, SupportsSize, PopBatch, ListTargetSize, Mutex, Atom >::LevelForForceInsert = 3
staticprivate

Definition at line 133 of file RelaxedConcurrentPriorityQueue.h.

template<typename T , bool MayBlock = false, bool SupportsSize = false, size_t PopBatch = 16, size_t ListTargetSize = 25, typename Mutex = folly::SpinLock, template< typename > class Atom = std::atomic>
constexpr int folly::RelaxedConcurrentPriorityQueue< T, MayBlock, SupportsSize, PopBatch, ListTargetSize, Mutex, Atom >::LevelForTraverseParent = 7
staticprivate

Definition at line 134 of file RelaxedConcurrentPriorityQueue.h.

template<typename T , bool MayBlock = false, bool SupportsSize = false, size_t PopBatch = 16, size_t ListTargetSize = 25, typename Mutex = folly::SpinLock, template< typename > class Atom = std::atomic>
MoundElement* folly::RelaxedConcurrentPriorityQueue< T, MayBlock, SupportsSize, PopBatch, ListTargetSize, Mutex, Atom >::levels_[MAX_LEVELS]
private

Data members.

Definition at line 181 of file RelaxedConcurrentPriorityQueue.h.

template<typename T , bool MayBlock = false, bool SupportsSize = false, size_t PopBatch = 16, size_t ListTargetSize = 25, typename Mutex = folly::SpinLock, template< typename > class Atom = std::atomic>
constexpr uint32_t folly::RelaxedConcurrentPriorityQueue< T, MayBlock, SupportsSize, PopBatch, ListTargetSize, Mutex, Atom >::MAX_LEVELS = 32
staticprivate
template<typename T , bool MayBlock = false, bool SupportsSize = false, size_t PopBatch = 16, size_t ListTargetSize = 25, typename Mutex = folly::SpinLock, template< typename > class Atom = std::atomic>
constexpr size_t folly::RelaxedConcurrentPriorityQueue< T, MayBlock, SupportsSize, PopBatch, ListTargetSize, Mutex, Atom >::MergingSize = ListTargetSize
staticprivate

Definition at line 147 of file RelaxedConcurrentPriorityQueue.h.

template<typename T , bool MayBlock = false, bool SupportsSize = false, size_t PopBatch = 16, size_t ListTargetSize = 25, typename Mutex = folly::SpinLock, template< typename > class Atom = std::atomic>
constexpr T folly::RelaxedConcurrentPriorityQueue< T, MayBlock, SupportsSize, PopBatch, ListTargetSize, Mutex, Atom >::MIN_VALUE = std::numeric_limits<T>::min()
staticprivate

Definition at line 129 of file RelaxedConcurrentPriorityQueue.h.

template<typename T , bool MayBlock = false, bool SupportsSize = false, size_t PopBatch = 16, size_t ListTargetSize = 25, typename Mutex = folly::SpinLock, template< typename > class Atom = std::atomic>
constexpr size_t folly::RelaxedConcurrentPriorityQueue< T, MayBlock, SupportsSize, PopBatch, ListTargetSize, Mutex, Atom >::NumFutex = 128
staticprivate

Blocking algorithm.

Definition at line 194 of file RelaxedConcurrentPriorityQueue.h.

template<typename T , bool MayBlock = false, bool SupportsSize = false, size_t PopBatch = 16, size_t ListTargetSize = 25, typename Mutex = folly::SpinLock, template< typename > class Atom = std::atomic>
constexpr size_t folly::RelaxedConcurrentPriorityQueue< T, MayBlock, SupportsSize, PopBatch, ListTargetSize, Mutex, Atom >::PruningSize = ListTargetSize * 2
staticprivate

Definition at line 142 of file RelaxedConcurrentPriorityQueue.h.

template<typename T , bool MayBlock = false, bool SupportsSize = false, size_t PopBatch = 16, size_t ListTargetSize = 25, typename Mutex = folly::SpinLock, template< typename > class Atom = std::atomic>
Atom<uint32_t> folly::RelaxedConcurrentPriorityQueue< T, MayBlock, SupportsSize, PopBatch, ListTargetSize, Mutex, Atom >::pticket_
private
template<typename T , bool MayBlock = false, bool SupportsSize = false, size_t PopBatch = 16, size_t ListTargetSize = 25, typename Mutex = folly::SpinLock, template< typename > class Atom = std::atomic>
std::unique_ptr<BufferNode[]> folly::RelaxedConcurrentPriorityQueue< T, MayBlock, SupportsSize, PopBatch, ListTargetSize, Mutex, Atom >::shared_buffer_
private

Definition at line 189 of file RelaxedConcurrentPriorityQueue.h.

template<typename T , bool MayBlock = false, bool SupportsSize = false, size_t PopBatch = 16, size_t ListTargetSize = 25, typename Mutex = folly::SpinLock, template< typename > class Atom = std::atomic>
constexpr size_t folly::RelaxedConcurrentPriorityQueue< T, MayBlock, SupportsSize, PopBatch, ListTargetSize, Mutex, Atom >::Stride = 33
staticprivate

Definition at line 196 of file RelaxedConcurrentPriorityQueue.h.


The documentation for this class was generated from the following file: