proxygen
ProducerConsumerQueue.h
Go to the documentation of this file.
1 /*
2  * Copyright 2012-present Facebook, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 // @author Bo Hu (bhu@fb.com)
18 // @author Jordan DeLong (delong.j@fb.com)
19 
20 #pragma once
21 
22 #include <atomic>
23 #include <cassert>
24 #include <cstdlib>
25 #include <memory>
26 #include <stdexcept>
27 #include <type_traits>
28 #include <utility>
29 
31 
32 namespace folly {
33 
34 /*
35  * ProducerConsumerQueue is a one producer and one consumer queue
36  * without locks.
37  */
38 template <class T>
40  typedef T value_type;
41 
44 
45  // size must be >= 2.
46  //
47  // Also, note that the number of usable slots in the queue at any
48  // given time is actually (size-1), so if you start with an empty queue,
49  // isFull() will return true after size-1 insertions.
51  : size_(size),
52  records_(static_cast<T*>(std::malloc(sizeof(T) * size))),
53  readIndex_(0),
54  writeIndex_(0) {
55  assert(size >= 2);
56  if (!records_) {
57  throw std::bad_alloc();
58  }
59  }
60 
62  // We need to destruct anything that may still exist in our queue.
63  // (No real synchronization needed at destructor time: only one
64  // thread can be doing this.)
66  size_t readIndex = readIndex_;
67  size_t endIndex = writeIndex_;
68  while (readIndex != endIndex) {
69  records_[readIndex].~T();
70  if (++readIndex == size_) {
71  readIndex = 0;
72  }
73  }
74  }
75 
77  }
78 
79  template <class... Args>
80  bool write(Args&&... recordArgs) {
81  auto const currentWrite = writeIndex_.load(std::memory_order_relaxed);
82  auto nextRecord = currentWrite + 1;
83  if (nextRecord == size_) {
84  nextRecord = 0;
85  }
86  if (nextRecord != readIndex_.load(std::memory_order_acquire)) {
87  new (&records_[currentWrite]) T(std::forward<Args>(recordArgs)...);
88  writeIndex_.store(nextRecord, std::memory_order_release);
89  return true;
90  }
91 
92  // queue is full
93  return false;
94  }
95 
96  // move (or copy) the value at the front of the queue to given variable
97  bool read(T& record) {
98  auto const currentRead = readIndex_.load(std::memory_order_relaxed);
99  if (currentRead == writeIndex_.load(std::memory_order_acquire)) {
100  // queue is empty
101  return false;
102  }
103 
104  auto nextRecord = currentRead + 1;
105  if (nextRecord == size_) {
106  nextRecord = 0;
107  }
108  record = std::move(records_[currentRead]);
109  records_[currentRead].~T();
110  readIndex_.store(nextRecord, std::memory_order_release);
111  return true;
112  }
113 
114  // pointer to the value at the front of the queue (for use in-place) or
115  // nullptr if empty.
116  T* frontPtr() {
117  auto const currentRead = readIndex_.load(std::memory_order_relaxed);
118  if (currentRead == writeIndex_.load(std::memory_order_acquire)) {
119  // queue is empty
120  return nullptr;
121  }
122  return &records_[currentRead];
123  }
124 
125  // queue must not be empty
126  void popFront() {
127  auto const currentRead = readIndex_.load(std::memory_order_relaxed);
128  assert(currentRead != writeIndex_.load(std::memory_order_acquire));
129 
130  auto nextRecord = currentRead + 1;
131  if (nextRecord == size_) {
132  nextRecord = 0;
133  }
134  records_[currentRead].~T();
135  readIndex_.store(nextRecord, std::memory_order_release);
136  }
137 
138  bool isEmpty() const {
139  return readIndex_.load(std::memory_order_acquire) ==
140  writeIndex_.load(std::memory_order_acquire);
141  }
142 
143  bool isFull() const {
144  auto nextRecord = writeIndex_.load(std::memory_order_acquire) + 1;
145  if (nextRecord == size_) {
146  nextRecord = 0;
147  }
148  if (nextRecord != readIndex_.load(std::memory_order_acquire)) {
149  return false;
150  }
151  // queue is full
152  return true;
153  }
154 
155  // * If called by consumer, then true size may be more (because producer may
156  // be adding items concurrently).
157  // * If called by producer, then true size may be less (because consumer may
158  // be removing items concurrently).
159  // * It is undefined to call this from any other thread.
160  size_t sizeGuess() const {
161  int ret = writeIndex_.load(std::memory_order_acquire) -
162  readIndex_.load(std::memory_order_acquire);
163  if (ret < 0) {
164  ret += size_;
165  }
166  return ret;
167  }
168 
169  // maximum number of items in the queue.
170  size_t capacity() const {
171  return size_ - 1;
172  }
173 
174  private:
175  using AtomicIndex = std::atomic<unsigned int>;
176 
179  T* const records_;
180 
183 
185 };
186 
187 } // namespace folly
char pad1_[hardware_destructive_interference_size-sizeof(AtomicIndex)]
constexpr detail::Map< Move > move
Definition: Base-inl.h:2567
STL namespace.
folly::std T
internal::ArgsMatcher< InnerMatcher > Args(const InnerMatcher &matcher)
—— Concurrent Priority Queue Implementation ——
Definition: AtomicBitSet.h:29
constexpr std::size_t hardware_destructive_interference_size
Definition: Align.h:107
constexpr auto size(C const &c) -> decltype(c.size())
Definition: Access.h:45
ProducerConsumerQueue(const ProducerConsumerQueue &)=delete
std::atomic< unsigned int > AtomicIndex
static const char *const value
Definition: Conv.cpp:50
ProducerConsumerQueue & operator=(const ProducerConsumerQueue &)=delete
void free()
folly::std enable_if::typetoAppendDelimStrImpl const Delimiter, const Tv, Tgtresult sizeof(Ts) >
bool write(Args &&...recordArgs)
char pad0_[hardware_destructive_interference_size]