proxygen
PriorityMPMCQueue.h
Go to the documentation of this file.
1 /*
2  * Copyright 2017-present Facebook, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 #pragma once
17 
18 #include <glog/logging.h>
19 #include <algorithm>
20 #include <vector>
21 
22 #include <folly/MPMCQueue.h>
23 
24 namespace folly {
25 
30 
31 template <
32  typename T,
33  template <typename> class Atom = std::atomic,
34  bool Dynamic = false>
35 class PriorityMPMCQueue {
36  public:
/// Constructs the queue with one underlying MPMCQueue per priority level.
/// @param numPriorities number of priority levels; must be > 0 (CHECK-enforced).
/// @param capacity passed unchanged to the constructor of every underlying queue.
37  PriorityMPMCQueue(size_t numPriorities, size_t capacity) {
38  CHECK_GT(numPriorities, 0);
// Reserve up front so the emplace_back loop below never reallocates.
39  queues_.reserve(numPriorities);
40  for (size_t i = 0; i < numPriorities; i++) {
41  queues_.emplace_back(capacity);
42  }
43  }
44 
/// Returns the number of priority levels, i.e. the number of underlying queues.
45  size_t getNumPriorities() {
46  return queues_.size();
47  }
48 
49  // Add at medium priority by default
// "Medium" is getNumPriorities() / 2; the call is forwarded to
// writeWithPriority() and its result is returned unchanged.
50  bool write(T&& item) {
51  return writeWithPriority(std::move(item), getNumPriorities() / 2);
52  }
53 
/// Writes item to the underlying queue selected by priority.
/// A priority beyond the last level is clamped to getNumPriorities() - 1.
/// @return the result of the underlying MPMCQueue::write() call
///         (presumably false when that queue is full -- confirm against
///         folly::MPMCQueue).
54  bool writeWithPriority(T&& item, size_t priority) {
55  size_t queue = std::min(getNumPriorities() - 1, priority);
// Sanity check on the clamped index before the bounds-checked at() below.
56  CHECK_LT(queue, queues_.size());
57  return queues_.at(queue).write(std::move(item));
58  }
59 
60  bool writeWithPriority(
61  T&& item,
62  size_t priority,
63  std::chrono::milliseconds timeout) {
64  size_t queue = std::min(getNumPriorities() - 1, priority);
65  CHECK_LT(queue, queues_.size());
66  return queues_.at(queue).tryWriteUntil(
67  std::chrono::steady_clock::now() + timeout, std::move(item));
68  }
69 
/// Dequeues one element into item, scanning the underlying queues in index
/// order and stopping at the first one that yields an element; queues with a
/// lower index are therefore served before queues with a higher index.
/// @return true if an element was read, false if no queue yielded one.
70  bool read(T& item) {
71  for (auto& q : queues_) {
72  if (q.readIfNotEmpty(item)) {
73  return true;
74  }
75  }
76  return false;
77  }
78 
/// Attempts to dequeue one element from the queue at index priority only.
/// @return the result of readIfNotEmpty() on that queue.
/// NOTE(review): unlike writeWithPriority(), priority is neither clamped nor
/// range-checked here; operator[] is used, so the caller must pass
/// priority < getNumPriorities().
79  bool readWithPriority(T& item, size_t priority) {
80  return queues_[priority].readIfNotEmpty(item);
81  }
82 
/// Returns the sum of size() over all underlying queues, with any negative
/// per-queue value counted as 0.
83  size_t size() const {
84  size_t total_size = 0;
85  for (auto& q : queues_) {
86  // MPMCQueue can have a negative size if there are pending readers.
87  // Since we don't expose a blocking interface this shouldn't happen,
88  // But just in case we put a floor at 0
89  total_size += std::max<ssize_t>(0, q.size());
90  }
91  return total_size;
92  }
93 
/// Same aggregation as size(), but built from each underlying queue's
/// sizeGuess() instead of size(); negative per-queue values count as 0.
94  size_t sizeGuess() const {
95  size_t total_size = 0;
96  for (auto& q : queues_) {
97  // MPMCQueue can have a negative size if there are pending readers.
98  // Since we don't expose a blocking interface this shouldn't happen,
99  // But just in case we put a floor at 0
100  total_size += std::max<ssize_t>(0, q.sizeGuess());
101  }
102  return total_size;
103  }
104 
/// Returns true if there are no items available for dequeue.
/// Implemented as size() == 0, so it inspects every underlying queue.
106  bool isEmpty() const {
107  return size() == 0;
108  }
109 
110  private:
111  std::vector<folly::MPMCQueue<T, Atom, Dynamic>> queues_;
112 };
113 
114 } // namespace folly
std::chrono::steady_clock::time_point now()
constexpr detail::Map< Move > move
Definition: Base-inl.h:2567
folly::std T
—— Concurrent Priority Queue Implementation ——
Definition: AtomicBitSet.h:29
bool writeWithPriority(T &&item, size_t priority)
LogLevel min
Definition: LogLevel.cpp:30
bool readWithPriority(T &item, size_t priority)
#define Atom
std::vector< folly::MPMCQueue< T, Atom, Dynamic > > queues_
PriorityMPMCQueue(size_t numPriorities, size_t capacity)
bool writeWithPriority(T &&item, size_t priority, std::chrono::milliseconds timeout)
bool isEmpty() const
Returns true if there are no items available for dequeue.