18 #include <glog/logging.h> 33 template <
typename>
class Atom = std::atomic,
38 CHECK_GT(numPriorities, 0);
40 for (
size_t i = 0;
i < numPriorities;
i++) {
56 CHECK_LT(queue,
queues_.size());
63 std::chrono::milliseconds timeout) {
65 CHECK_LT(queue,
queues_.size());
66 return queues_.at(queue).tryWriteUntil(
72 if (q.readIfNotEmpty(item)) {
80 return queues_[priority].readIfNotEmpty(item);
84 size_t total_size = 0;
89 total_size += std::max<ssize_t>(0, q.size());
95 size_t total_size = 0;
100 total_size += std::max<ssize_t>(0, q.sizeGuess());
111 std::vector<folly::MPMCQueue<T, Atom, Dynamic>>
queues_;
std::chrono::steady_clock::time_point now()
constexpr detail::Map< Move > move
—— Concurrent Priority Queue Implementation ——
bool writeWithPriority(T &&item, size_t priority)
size_t getNumPriorities()
bool readWithPriority(T &item, size_t priority)
std::vector< folly::MPMCQueue< T, Atom, Dynamic > > queues_
PriorityMPMCQueue(size_t numPriorities, size_t capacity)
bool writeWithPriority(T &&item, size_t priority, std::chrono::milliseconds timeout)
bool isEmpty() const
Returns true if there are no items available for dequeue.