proxygen
FlatCombiningPriorityQueue.h
Go to the documentation of this file.
1 /*
2  * Copyright 2017-present Facebook, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 #pragma once
18 
19 #include <atomic>
20 #include <chrono>
21 #include <memory>
22 #include <mutex>
23 #include <queue>
24 
25 #include <folly/Optional.h>
26 #include <folly/detail/Futex.h>
28 #include <glog/logging.h>
29 
30 namespace folly {
31 
73 
74 template <
75  typename T,
76  typename PriorityQueue = std::priority_queue<T>,
77  typename Mutex = std::mutex,
78  template <typename> class Atom = std::atomic>
79 class FlatCombiningPriorityQueue
80  : public folly::FlatCombining<
81  FlatCombiningPriorityQueue<T, PriorityQueue, Mutex, Atom>,
82  Mutex,
83  Atom> {
84  using FCPQ = FlatCombiningPriorityQueue<T, PriorityQueue, Mutex, Atom>;
86 
87  public:
88  template <
89  typename... PQArgs,
90  typename = decltype(PriorityQueue(std::declval<PQArgs>()...))>
91  explicit FlatCombiningPriorityQueue(
92  // Concurrent priority queue parameter
93  const size_t maxSize = 0,
94  // Flat combining parameters
95  const bool dedicated = true,
96  const uint32_t numRecs = 0,
97  const uint32_t maxOps = 0,
98  // (Sequential) PriorityQueue Parameters
99  PQArgs... args)
100  : FC(dedicated, numRecs, maxOps),
101  maxSize_(maxSize),
102  pq_(std::forward<PQArgs>(args)...) {}
103 
105  bool empty() const {
106  bool res;
107  auto fn = [&] { res = pq_.empty(); };
108  const_cast<FCPQ*>(this)->requestFC(fn);
109  return res;
110  }
111 
113  size_t size() const {
114  size_t res;
115  auto fn = [&] { res = pq_.size(); };
116  const_cast<FCPQ*>(this)->requestFC(fn);
117  return res;
118  }
119 
125  bool try_push(const T& val) {
126  return try_push_impl(
128  }
129 
134  bool try_pop(T& val) {
135  return try_pop_impl(
137  }
138 
143  bool try_peek(T& val) {
144  return try_peek_impl(
146  }
147 
151  void push(const T& val) {
152  try_push_impl(
154  }
155 
159  void pop(T& val) {
160  try_pop_impl(
162  }
163 
167  void peek(T& val) {
168  try_peek_impl(
170  }
171 
172  folly::Optional<T> try_pop() {
173  T val;
174  if (try_pop(val)) {
175  return std::move(val);
176  }
177  return folly::none;
178  }
179 
180  folly::Optional<T> try_peek() {
181  T val;
182  if (try_peek(val)) {
183  return std::move(val);
184  }
185  return folly::none;
186  }
187 
188  template <typename Rep, typename Period>
189  folly::Optional<T> try_pop_for(
190  const std::chrono::duration<Rep, Period>& timeout) {
191  T val;
192  if (try_pop(val) ||
193  try_pop_impl(val, std::chrono::steady_clock::now() + timeout)) {
194  return std::move(val);
195  }
196  return folly::none;
197  }
198 
199  template <typename Rep, typename Period>
200  bool try_push_for(
201  const T& val,
202  const std::chrono::duration<Rep, Period>& timeout) {
203  return (
204  try_push(val) ||
205  try_push_impl(val, std::chrono::steady_clock::now() + timeout));
206  }
207 
208  template <typename Rep, typename Period>
209  folly::Optional<T> try_peek_for(
210  const std::chrono::duration<Rep, Period>& timeout) {
211  T val;
212  if (try_peek(val) ||
213  try_peek_impl(val, std::chrono::steady_clock::now() + timeout)) {
214  return std::move(val);
215  }
216  return folly::none;
217  }
218 
219  template <typename Clock, typename Duration>
220  folly::Optional<T> try_pop_until(
221  const std::chrono::time_point<Clock, Duration>& deadline) {
222  T val;
223  if (try_pop_impl(val, deadline)) {
224  return std::move(val);
225  }
226  return folly::none;
227  }
228 
229  template <typename Clock, typename Duration>
230  bool try_push_until(
231  const T& val,
232  const std::chrono::time_point<Clock, Duration>& deadline) {
233  return try_push_impl(val, deadline);
234  }
235 
236  template <typename Clock, typename Duration>
237  folly::Optional<T> try_peek_until(
238  const std::chrono::time_point<Clock, Duration>& deadline) {
239  T val;
240  if (try_peek_impl(val, deadline)) {
241  return std::move(val);
242  }
243  return folly::none;
244  }
245 
246  private:
247  size_t maxSize_;
248  PriorityQueue pq_;
249  detail::Futex<Atom> empty_{};
250  detail::Futex<Atom> full_{};
251 
252  bool isTrue(detail::Futex<Atom>& futex) {
253  return futex.load(std::memory_order_relaxed) != 0;
254  }
255 
256  void setFutex(detail::Futex<Atom>& futex, uint32_t val) {
257  futex.store(val, std::memory_order_relaxed);
258  }
259 
260  bool futexSignal(detail::Futex<Atom>& futex) {
261  if (isTrue(futex)) {
262  setFutex(futex, 0);
263  return true;
264  } else {
265  return false;
266  }
267  }
268 
269  template <typename Clock, typename Duration>
270  bool try_push_impl(
271  const T& val,
272  const std::chrono::time_point<Clock, Duration>& when);
273 
274  template <typename Clock, typename Duration>
275  bool try_pop_impl(
276  T& val,
277  const std::chrono::time_point<Clock, Duration>& when);
278 
279  template <typename Clock, typename Duration>
280  bool try_peek_impl(
281  T& val,
282  const std::chrono::time_point<Clock, Duration>& when);
283 };
284 
286 
287 template <
288  typename T,
289  typename PriorityQueue,
290  typename Mutex,
291  template <typename> class Atom>
292 template <typename Clock, typename Duration>
293 inline bool
294 FlatCombiningPriorityQueue<T, PriorityQueue, Mutex, Atom>::try_push_impl(
295  const T& val,
296  const std::chrono::time_point<Clock, Duration>& when) {
297  while (true) {
298  bool res;
299  bool wake;
300 
301  auto fn = [&] {
302  if (maxSize_ > 0 && pq_.size() == maxSize_) {
303  setFutex(full_, 1);
304  res = false;
305  return;
306  }
307  DCHECK(maxSize_ == 0 || pq_.size() < maxSize_);
308  try {
309  pq_.push(val);
310  wake = futexSignal(empty_);
311  res = true;
312  return;
313  } catch (const std::bad_alloc&) {
314  setFutex(full_, 1);
315  res = false;
316  return;
317  }
318  };
319  this->requestFC(fn);
320 
321  if (res) {
322  if (wake) {
323  detail::futexWake(&empty_);
324  }
325  return true;
326  }
327  if (when == std::chrono::time_point<Clock>::min()) {
328  return false;
329  }
330  while (isTrue(full_)) {
331  if (when == std::chrono::time_point<Clock>::max()) {
332  detail::futexWait(&full_, 1);
333  } else {
334  if (Clock::now() > when) {
335  return false;
336  } else {
337  detail::futexWaitUntil(&full_, 1, when);
338  }
339  }
340  } // inner while loop
341  } // outer while loop
342 }
343 
344 template <
345  typename T,
346  typename PriorityQueue,
347  typename Mutex,
348  template <typename> class Atom>
349 template <typename Clock, typename Duration>
350 inline bool
351 FlatCombiningPriorityQueue<T, PriorityQueue, Mutex, Atom>::try_pop_impl(
352  T& val,
353  const std::chrono::time_point<Clock, Duration>& when) {
354  while (true) {
355  bool res;
356  bool wake;
357 
358  auto fn = [&] {
359  res = !pq_.empty();
360  if (res) {
361  val = pq_.top();
362  pq_.pop();
363  wake = futexSignal(full_);
364  } else {
365  setFutex(empty_, 1);
366  }
367  };
368  this->requestFC(fn);
369 
370  if (res) {
371  if (wake) {
372  detail::futexWake(&full_);
373  }
374  return true;
375  }
376  while (isTrue(empty_)) {
377  if (when == std::chrono::time_point<Clock>::max()) {
378  detail::futexWait(&empty_, 1);
379  } else {
380  if (Clock::now() > when) {
381  return false;
382  } else {
383  detail::futexWaitUntil(&empty_, 1, when);
384  }
385  }
386  } // inner while loop
387  } // outer while loop
388 }
389 
390 template <
391  typename T,
392  typename PriorityQueue,
393  typename Mutex,
394  template <typename> class Atom>
395 template <typename Clock, typename Duration>
396 inline bool
397 FlatCombiningPriorityQueue<T, PriorityQueue, Mutex, Atom>::try_peek_impl(
398  T& val,
399  const std::chrono::time_point<Clock, Duration>& when) {
400  while (true) {
401  bool res;
402 
403  auto fn = [&] {
404  res = !pq_.empty();
405  if (res) {
406  val = pq_.top();
407  } else {
408  setFutex(empty_, 1);
409  }
410  };
411  this->requestFC(fn);
412 
413  if (res) {
414  return true;
415  }
416  while (isTrue(empty_)) {
417  if (when == std::chrono::time_point<Clock>::max()) {
418  detail::futexWait(&empty_, 1);
419  } else {
420  if (Clock::now() > when) {
421  return false;
422  } else {
423  detail::futexWaitUntil(&empty_, 1, when);
424  }
425  }
426  } // inner while loop
427  } // outer while loop
428 }
429 
430 } // namespace folly
bool wake(bool publishing, Waiter &waiter, WakerMetadata metadata, Waiter *&sleepers)
LogLevel max
Definition: LogLevel.cpp:31
std::chrono::steady_clock::time_point now()
constexpr detail::Map< Move > move
Definition: Base-inl.h:2567
#define Mutex
STL namespace.
double val
Definition: String.cpp:273
folly::std T
—— Concurrent Priority Queue Implementation ——
Definition: AtomicBitSet.h:29
FutexResult futexWait(const Futex *futex, uint32_t expected, uint32_t waitMask)
Definition: Futex-inl.h:100
static bool dedicated
constexpr auto size(C const &c) -> decltype(c.size())
Definition: Access.h:45
constexpr auto empty(C const &c) -> decltype(c.empty())
Definition: Access.h:55
LogLevel min
Definition: LogLevel.cpp:30
folly::FlatCombiningPriorityQueue< int > FCPQ
#define Atom
FutexResult futexWaitUntil(const Futex *futex, uint32_t expected, std::chrono::time_point< Clock, Duration > const &deadline, uint32_t waitMask)
Definition: Futex-inl.h:112
Future< Unit > when(bool p, F &&thunk)
Definition: Future-inl.h:2330
std::mutex mutex
constexpr None none
Definition: Optional.h:87
int futexWake(const Futex *futex, int count, uint32_t wakeMask)
Definition: Futex-inl.h:107