proxygen
PriorityLifoSemMPMCQueue.h
Go to the documentation of this file.
1 /*
2  * Copyright 2017-present Facebook, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 #pragma once
18 
19 #include <folly/Executor.h>
20 #include <folly/MPMCQueue.h>
21 #include <folly/Range.h>
24 #include <glog/logging.h>
25 
26 namespace folly {
27 
28 template <class T, QueueBehaviorIfFull kBehavior = QueueBehaviorIfFull::THROW>
30  public:
31  // Note A: The queue pre-allocates all memory for max_capacity.
32  // Note B: When used with folly::Executor::*_PRI and numPriorities == 2,
33  // MID_PRI and HI_PRI are treated as the same priority level.
34  PriorityLifoSemMPMCQueue(uint8_t numPriorities, size_t max_capacity) {
35  queues_.reserve(numPriorities);
36  for (int8_t i = 0; i < numPriorities; i++) {
37  queues_.emplace_back(max_capacity);
38  }
39  }
40 
42  CHECK_LT(capacities.size(), 256) << "At most 255 priorities supported";
43 
44  queues_.reserve(capacities.size());
45  for (auto capacity : capacities) {
46  queues_.emplace_back(capacity);
47  }
48  }
49 
51  return queues_.size();
52  }
53 
54  // Add at medium priority by default
55  BlockingQueueAddResult add(T item) override {
57  }
58 
// Enqueues `item` on the sub-queue selected by `priority`, then posts the
// semaphore and returns the result of that post.
//
// `priority` is treated as a signed offset from the middle sub-queue
// (index getNumPriorities() / 2); the resulting index is clamped to
// [0, getNumPriorities() - 1].
//
// Throws QueueFullException when the non-blocking write() into the selected
// sub-queue fails.
//
// NOTE(review): the `case` labels of the switch on kBehavior are not visible
// in this listing; presumably the write()/throw branch is the THROW behavior
// and the blockingWrite() branch is the blocking one -- confirm against the
// original header.
59  BlockingQueueAddResult addWithPriority(T item, int8_t priority) override {
// Index of the middle sub-queue; priority 0 maps here.
60  int mid = getNumPriorities() / 2;
// Negative priorities move toward index 0 (clamped at 0); non-negative
// priorities move toward the last index (clamped at getNumPriorities() - 1).
61  size_t queue = priority < 0
62  ? std::max(0, mid + priority)
63  : std::min(getNumPriorities() - 1, mid + priority);
// Sanity check that the clamped index addresses an existing sub-queue.
64  CHECK_LT(queue, queues_.size());
65  switch (kBehavior) { // static
// Non-blocking write: a false return from write() is reported as an exception.
67  if (!queues_[queue].write(std::move(item))) {
68  throw QueueFullException("LifoSemMPMCQueue full, can't add item");
69  }
70  break;
// Blocking write into the selected sub-queue.
72  queues_[queue].blockingWrite(std::move(item));
73  break;
74  }
// Signal the semaphore once per enqueued item and forward post()'s result.
75  return sem_.post();
76  }
77 
78  T take() override {
79  T item;
80  while (true) {
81  if (nonBlockingTake(item)) {
82  return item;
83  }
84  sem_.wait();
85  }
86  }
87 
88  folly::Optional<T> try_take_for(std::chrono::milliseconds time) override {
89  T item;
90  while (true) {
91  if (nonBlockingTake(item)) {
92  return std::move(item);
93  }
94  if (!sem_.try_wait_for(time)) {
95  return folly::none;
96  }
97  }
98  }
99 
100  bool nonBlockingTake(T& item) {
101  for (auto it = queues_.rbegin(); it != queues_.rend(); it++) {
102  if (it->readIfNotEmpty(item)) {
103  return true;
104  }
105  }
106  return false;
107  }
108 
109  size_t size() override {
110  size_t size = 0;
111  for (auto& q : queues_) {
112  size += q.size();
113  }
114  return size;
115  }
116 
117  size_t sizeGuess() const {
118  size_t size = 0;
119  for (auto& q : queues_) {
120  size += q.sizeGuess();
121  }
122  return size;
123  }
124 
125  private:
127  std::vector<folly::MPMCQueue<T>> queues_;
128 };
129 
130 } // namespace folly
void write(const T &in, folly::io::Appender &appender)
Definition: Types-inl.h:112
BlockingQueueAddResult add(T item) override
static const int8_t MID_PRI
Definition: Executor.h:49
LogLevel max
Definition: LogLevel.cpp:31
constexpr detail::Map< Move > move
Definition: Base-inl.h:2567
bool post()
Silently saturates if value is already 2^32-1.
Definition: LifoSem.h:361
constexpr size_type size() const
Definition: Range.h:431
std::vector< folly::MPMCQueue< T > > queues_
folly::std T
—— Concurrent Priority Queue Implementation ——
Definition: AtomicBitSet.h:29
LogLevel min
Definition: LogLevel.cpp:30
PriorityLifoSemMPMCQueue(uint8_t numPriorities, size_t max_capacity)
PriorityLifoSemMPMCQueue(folly::Range< const size_t * > capacities)
folly::Optional< T > try_take_for(std::chrono::milliseconds time) override
bool try_wait_for(const std::chrono::duration< Rep, Period > &timeout)
Definition: LifoSem.h:472
BlockingQueueAddResult addWithPriority(T item, int8_t priority) override
std::chrono::nanoseconds time()
constexpr None none
Definition: Optional.h:87