proxygen
DynamicBoundedQueue.h
Go to the documentation of this file.
1 /*
2  * Copyright 2017-present Facebook, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 #pragma once
18 
21 
22 #include <glog/logging.h>
23 
24 #include <atomic>
25 #include <chrono>
26 
27 namespace folly {
28 
269 
270 template <typename T>
272  template <typename Arg>
274  return 1;
275  }
276 };
277 
278 template <
279  typename T,
280  bool SingleProducer,
281  bool SingleConsumer,
282  bool MayBlock,
283  size_t LgSegmentSize = 8,
284  size_t LgAlign = 7,
285  typename WeightFn = DefaultWeightFn<T>,
286  template <typename> class Atom = std::atomic>
288  using Weight = uint64_t;
289 
291  NOTWAITING = 0,
292  WAITING = 1,
293  };
294 
295  static constexpr bool SPSC = SingleProducer && SingleConsumer;
296  static constexpr size_t Align = 1u << LgAlign;
297 
298  static_assert(LgAlign < 16, "LgAlign must be < 16");
299 
301 
302  // Read mostly by producers
303  alignas(Align) Atom<Weight> debit_; // written frequently only by producers
304  Atom<Weight> capacity_; // written rarely by capacity resets
305 
306  // Read mostly by consumers
307  alignas(Align) Atom<Weight> credit_; // written frequently only by consumers
308  Atom<Weight> threshold_; // written rarely only by capacity resets
309 
310  // Normally written and read rarely by producers and consumers
311  // May be read frequently by producers when capacity is full
312  alignas(Align) Atom<Weight> transfer_;
314 
315  // Underlying unbounded queue
317  T,
318  SingleProducer,
319  SingleConsumer,
320  MayBlock,
321  LgSegmentSize,
322  LgAlign,
323  Atom>
324  q_;
325 
326  public:
// Constructs a queue for the requested nominal capacity, expressed as a
// total Weight.
//
// The stored capacity_ is the requested capacity plus threshold(capacity) of
// slack, and threshold_ is set to that same threshold(capacity) value. The
// debit_, credit_ and transfer_ counters start at 0, and waiting_ starts at
// 0, which is the numeric value of NOTWAITING.
//
// NOTE(review): capacity + threshold(capacity) is unsigned 64-bit arithmetic
// and would wrap for capacities close to the maximum Weight -- confirm that
// callers never pass such values.
328  explicit DynamicBoundedQueue(Weight capacity)
329  : debit_(0),
330  capacity_(capacity + threshold(capacity)), // capacity slack
331  credit_(0),
332  threshold_(threshold(capacity)),
333  transfer_(0),
334  waiting_(0) {}
335 
338 
340 
// Enqueues v (taken by const reference) by passing it to enqueueImpl.
// Returns nothing: no success/failure status is reported to the caller.
// NOTE(review): presumably this waits until the queue has room, since the
// helper it calls is expected to use an unbounded deadline -- confirm
// against enqueueImpl.
342  FOLLY_ALWAYS_INLINE void enqueue(const T& v) {
343  enqueueImpl(v);
344  }
345 
347  enqueueImpl(std::move(v));
348  }
349 
352  return tryEnqueueImpl(v);
353  }
354 
356  return tryEnqueueImpl(std::move(v));
357  }
358 
360  template <typename Clock, typename Duration>
362  const T& v,
363  const std::chrono::time_point<Clock, Duration>& deadline) {
364  return tryEnqueueUntilImpl(v, deadline);
365  }
366 
367  template <typename Clock, typename Duration>
369  T&& v,
370  const std::chrono::time_point<Clock, Duration>& deadline) {
371  return tryEnqueueUntilImpl(std::move(v), deadline);
372  }
373 
375  template <typename Rep, typename Period>
377  const T& v,
378  const std::chrono::duration<Rep, Period>& duration) {
379  return tryEnqueueForImpl(v, duration);
380  }
381 
382  template <typename Rep, typename Period>
384  T&& v,
385  const std::chrono::duration<Rep, Period>& duration) {
386  return tryEnqueueForImpl(std::move(v), duration);
387  }
388 
390 
// Removes an element from the underlying queue q_ into elem, then hands the
// element's weight (as computed by WeightFn) to addCredit.
// NOTE(review): q_.dequeue presumably waits until an element is available --
// confirm against the underlying queue's API.
392  FOLLY_ALWAYS_INLINE void dequeue(T& elem) {
393  q_.dequeue(elem);
     // The weight is computed from the value now held in elem, so this must
     // run after the dequeue above.
394  addCredit(WeightFn()(elem));
395  }
396 
399  if (q_.try_dequeue(elem)) {
400  addCredit(WeightFn()(elem));
401  return true;
402  }
403  return false;
404  }
405 
407  template <typename Clock, typename Duration>
409  T& elem,
410  const std::chrono::time_point<Clock, Duration>& deadline) {
411  if (q_.try_dequeue_until(elem, deadline)) {
412  addCredit(WeightFn()(elem));
413  return true;
414  }
415  return false;
416  }
417 
419  template <typename Rep, typename Period>
421  T& elem,
422  const std::chrono::duration<Rep, Period>& duration) {
423  if (q_.try_dequeue_for(elem, duration)) {
424  addCredit(WeightFn()(elem));
425  return true;
426  }
427  return false;
428  }
429 
431 
// Replaces the queue's capacity with a new nominal value.
//
// As in the constructor, the value stored in capacity_ is the requested
// capacity plus threshold(capacity) of slack, and threshold_ is updated to
// the matching threshold.
//
// The two fields are written by two separate release stores, capacity_
// first. They are not updated as one atomic unit, so a concurrent reader may
// observe the new capacity_ together with the old threshold_.
433  void reset_capacity(Weight capacity) noexcept {
434  Weight thresh = threshold(capacity);
435  capacity_.store(capacity + thresh, std::memory_order_release);
436  threshold_.store(thresh, std::memory_order_release);
437  }
438 
441  auto d = getDebit();
442  auto c = getCredit();
443  auto t = getTransfer();
444  return d > (c + t) ? d - (c + t) : 0;
445  }
446 
// Returns whatever the underlying queue q_ reports as its size.
// This is a size_t taken directly from q_; it is not derived from the
// debit_/credit_/transfer_ weight counters.
// NOTE(review): presumably an element count that may be stale under
// concurrent use -- confirm against the underlying queue's documentation.
448  size_t size() const noexcept {
449  return q_.size();
450  }
451 
454  return q_.empty();
455  }
456 
457  private:
459 
460  // Calculation of threshold to move credits in bulk from consumers
461  // to producers
462  constexpr Weight threshold(Weight capacity) const noexcept {
     // One tenth of the given capacity, using unsigned integer division:
     // the result is rounded down, so any capacity below 10 yields 0.
463  return capacity / 10;
464  }
465 
466  // Functions called frequently by producers
467 
468  template <typename Arg>
470  tryEnqueueUntilImpl(
471  std::forward<Arg>(v), std::chrono::steady_clock::time_point::max());
472  }
473 
474  template <typename Arg>
476  return tryEnqueueUntilImpl(
477  std::forward<Arg>(v), std::chrono::steady_clock::time_point::min());
478  }
479 
480  template <typename Clock, typename Duration, typename Arg>
482  Arg&& v,
483  const std::chrono::time_point<Clock, Duration>& deadline) {
484  Weight weight = WeightFn()(std::forward<Arg>(v));
485  if (LIKELY(tryAddDebit(weight))) {
486  q_.enqueue(std::forward<Arg>(v));
487  return true;
488  }
489  return tryEnqueueUntilSlow(std::forward<Arg>(v), deadline);
490  }
491 
492  template <typename Rep, typename Period, typename Arg>
494  Arg&& v,
495  const std::chrono::duration<Rep, Period>& duration) {
496  if (LIKELY(tryEnqueueImpl(std::forward<Arg>(v)))) {
497  return true;
498  }
499  auto deadline = std::chrono::steady_clock::now() + duration;
500  return tryEnqueueUntilSlow(std::forward<Arg>(v), deadline);
501  }
502 
504  Weight capacity = getCapacity();
505  Weight before = fetchAddDebit(weight);
506  if (LIKELY(before + weight <= capacity)) {
507  return true;
508  } else {
509  subDebit(weight);
510  return false;
511  }
512  }
513 
515  return capacity_.load(std::memory_order_acquire);
516  }
517 
519  Weight before;
520  if (SingleProducer) {
521  before = getDebit();
522  debit_.store(before + weight, std::memory_order_relaxed);
523  } else {
524  before = debit_.fetch_add(weight, std::memory_order_acq_rel);
525  }
526  return before;
527  }
528 
530  return debit_.load(std::memory_order_acquire);
531  }
532 
533  // Functions called frequently by consumers
534 
536  Weight before = fetchAddCredit(weight);
537  Weight thresh = getThreshold();
538  if (before + weight >= thresh && before < thresh) {
539  transferCredit();
540  }
541  }
542 
544  Weight before;
545  if (SingleConsumer) {
546  before = getCredit();
547  credit_.store(before + weight, std::memory_order_relaxed);
548  } else {
549  before = credit_.fetch_add(weight, std::memory_order_acq_rel);
550  }
551  return before;
552  }
553 
555  return credit_.load(std::memory_order_acquire);
556  }
557 
559  return threshold_.load(std::memory_order_acquire);
560  }
561 
// Subtracts weight from debit_.
//
// When SingleProducer is true the update is a plain load (via getDebit)
// followed by a relaxed store, not an atomic read-modify-write; otherwise an
// atomic fetch_sub with acq_rel ordering is used.
// NOTE(review): the non-atomic path is only safe if no other thread writes
// debit_ concurrently -- confirm that every caller runs on the producer side.
564  void subDebit(Weight weight) noexcept {
565  Weight before;
566  if (SingleProducer) {
567  before = getDebit();
568  debit_.store(before - weight, std::memory_order_relaxed);
569  } else {
570  before = debit_.fetch_sub(weight, std::memory_order_acq_rel);
571  }
     // Weight is unsigned, so a value of 'before' smaller than 'weight' would
     // mean the subtraction above wrapped around; this check flags that case.
572  DCHECK_GE(before, weight);
573  }
574 
575  template <typename Clock, typename Duration, typename Arg>
577  Arg&& v,
578  const std::chrono::time_point<Clock, Duration>& deadline) {
579  Weight weight = WeightFn()(std::forward<Arg>(v));
580  if (canEnqueue(deadline, weight)) {
581  q_.enqueue(std::forward<Arg>(v));
582  return true;
583  } else {
584  return false;
585  }
586  }
587 
588  template <typename Clock, typename Duration>
590  const std::chrono::time_point<Clock, Duration>& deadline,
591  Weight weight) noexcept {
592  Weight capacity = getCapacity();
593  while (true) {
594  tryReduceDebit();
595  Weight debit = getDebit();
596  if ((debit + weight <= capacity) && tryAddDebit(weight)) {
597  return true;
598  }
599  if (deadline < Clock::time_point::max() && Clock::now() >= deadline) {
600  return false;
601  }
602  if (MayBlock) {
603  if (canBlock(weight, capacity)) {
604  detail::futexWaitUntil(&waiting_, WAITING, deadline);
605  }
606  } else {
608  }
609  }
610  }
611 
// Announces that the caller is about to wait, then re-checks for room.
//
// Returns true if, after setting waiting_ to WAITING and re-reading the
// debit, debit + weight still exceeds capacity (the caller should go ahead
// and wait). Returns false if room has appeared in the meantime.
//
// weight   - weight of the element the caller wants to enqueue.
// capacity - capacity value the caller previously read.
612  bool canBlock(Weight weight, Weight capacity) noexcept {
613  waiting_.store(WAITING, std::memory_order_relaxed);
     // Full fence between publishing WAITING and re-reading the counters.
     // NOTE(review): this appears to pair with the seq_cst fence on the path
     // that stores NOTWAITING and calls futexWake, so that a wake-up cannot
     // be missed -- confirm.
614  std::atomic_thread_fence(std::memory_order_seq_cst);
     // Pull in any pending transfer before the final check.
615  tryReduceDebit();
616  Weight debit = getDebit();
617  return debit + weight > capacity;
618  }
619 
621  Weight w = takeTransfer();
622  if (w > 0) {
623  subDebit(w);
624  }
625  return w > 0;
626  }
627 
629  Weight w = getTransfer();
630  if (w > 0) {
631  w = transfer_.exchange(0, std::memory_order_acq_rel);
632  }
633  return w;
634  }
635 
637  return transfer_.load(std::memory_order_acquire);
638  }
639 
643  Weight credit = takeCredit();
644  transfer_.fetch_add(credit, std::memory_order_acq_rel);
645  if (MayBlock) {
646  std::atomic_thread_fence(std::memory_order_seq_cst);
647  waiting_.store(NOTWAITING, std::memory_order_relaxed);
648  detail::futexWake(&waiting_);
649  }
650  }
651 
653  Weight credit;
654  if (SingleConsumer) {
655  credit = credit_.load(std::memory_order_relaxed);
656  credit_.store(0, std::memory_order_relaxed);
657  } else {
658  credit = credit_.exchange(0, std::memory_order_acq_rel);
659  }
660  return credit;
661  }
662 
663 }; // DynamicBoundedQueue
664 
666 
668 template <
669  typename T,
670  bool MayBlock,
671  size_t LgSegmentSize = 8,
672  size_t LgAlign = 7,
673  typename WeightFn = DefaultWeightFn<T>,
674  template <typename> class Atom = std::atomic>
676  T,
677  true,
678  true,
679  MayBlock,
680  LgSegmentSize,
681  LgAlign,
682  WeightFn,
683  Atom>;
684 
686 template <
687  typename T,
688  bool MayBlock,
689  size_t LgSegmentSize = 8,
690  size_t LgAlign = 7,
691  typename WeightFn = DefaultWeightFn<T>,
692  template <typename> class Atom = std::atomic>
694  T,
695  false,
696  true,
697  MayBlock,
698  LgSegmentSize,
699  LgAlign,
700  WeightFn,
701  Atom>;
702 
704 template <
705  typename T,
706  bool MayBlock,
707  size_t LgSegmentSize = 8,
708  size_t LgAlign = 7,
709  typename WeightFn = DefaultWeightFn<T>,
710  template <typename> class Atom = std::atomic>
712  T,
713  true,
714  false,
715  MayBlock,
716  LgSegmentSize,
717  LgAlign,
718  WeightFn,
719  Atom>;
720 
722 template <
723  typename T,
724  bool MayBlock,
725  size_t LgSegmentSize = 8,
726  size_t LgAlign = 7,
727  typename WeightFn = DefaultWeightFn<T>,
728  template <typename> class Atom = std::atomic>
730  T,
731  false,
732  false,
733  MayBlock,
734  LgSegmentSize,
735  LgAlign,
736  WeightFn,
737  Atom>;
738 
739 } // namespace folly
FOLLY_ALWAYS_INLINE Weight fetchAddDebit(Weight weight) noexcept
uint64_t operator()(Arg &&) const noexcept
auto v
FOLLY_ALWAYS_INLINE void enqueue(T &&v)
Weight weight() const noexcept
FOLLY_ALWAYS_INLINE void dequeue(T &elem)
Dequeue functions.
FOLLY_ALWAYS_INLINE Weight fetchAddCredit(Weight weight) noexcept
#define FOLLY_ALWAYS_INLINE
Definition: CPortability.h:151
LogLevel max
Definition: LogLevel.cpp:31
FOLLY_ALWAYS_INLINE void enqueue(const T &v)
Enqueue functions.
size_t size() const noexcept
FOLLY_ALWAYS_INLINE void enqueueImpl(Arg &&v)
std::chrono::steady_clock::time_point now()
constexpr detail::Map< Move > move
Definition: Base-inl.h:2567
Atom< std::uint32_t > Futex
Definition: Futex.h:51
#define LIKELY(x)
Definition: Likely.h:47
FOLLY_ALWAYS_INLINE bool try_dequeue(T &elem)
FOLLY_ALWAYS_INLINE bool tryEnqueueImpl(Arg &&v)
FOLLY_ALWAYS_INLINE bool try_enqueue_for(const T &v, const std::chrono::duration< Rep, Period > &duration)
DynamicBoundedQueue(Weight capacity)
folly::std T
—— Concurrent Priority Queue Implementation ——
Definition: AtomicBitSet.h:29
requires E e noexcept(noexcept(s.error(std::move(e))))
FOLLY_ALWAYS_INLINE bool try_dequeue_until(T &elem, const std::chrono::time_point< Clock, Duration > &deadline)
FOLLY_ALWAYS_INLINE bool tryEnqueueForImpl(Arg &&v, const std::chrono::duration< Rep, Period > &duration)
FOLLY_ALWAYS_INLINE Weight getCapacity() const noexcept
FOLLY_ALWAYS_INLINE void addCredit(Weight weight) noexcept
void subDebit(Weight weight) noexcept
bool tryEnqueueUntilSlow(Arg &&v, const std::chrono::time_point< Clock, Duration > &deadline)
LogLevel min
Definition: LogLevel.cpp:30
bool canBlock(Weight weight, Weight capacity) noexcept
FOLLY_ALWAYS_INLINE bool try_enqueue(T &&v)
FOLLY_ALWAYS_INLINE bool try_enqueue_until(T &&v, const std::chrono::time_point< Clock, Duration > &deadline)
void reset_capacity(Weight capacity) noexcept
Secondary functions.
#define Atom
FOLLY_ALWAYS_INLINE Weight getDebit() const noexcept
UnboundedQueue< T, SingleProducer, SingleConsumer, MayBlock, LgSegmentSize, LgAlign, Atom > q_
constexpr Weight threshold(Weight capacity) const noexcept
Private functions ///.
FOLLY_ALWAYS_INLINE bool try_enqueue_for(T &&v, const std::chrono::duration< Rep, Period > &duration)
FutexResult futexWaitUntil(const Futex *futex, uint32_t expected, std::chrono::time_point< Clock, Duration > const &deadline, uint32_t waitMask)
Definition: Futex-inl.h:112
FOLLY_ALWAYS_INLINE bool try_enqueue_until(const T &v, const std::chrono::time_point< Clock, Duration > &deadline)
Weight getTransfer() const noexcept
detail::Futex< Atom > waiting_
const
Definition: upload.py:398
bool empty() const noexcept
FOLLY_ALWAYS_INLINE Weight getCredit() const noexcept
FOLLY_ALWAYS_INLINE bool try_enqueue(const T &v)
char c
bool canEnqueue(const std::chrono::time_point< Clock, Duration > &deadline, Weight weight) noexcept
FOLLY_ALWAYS_INLINE bool try_dequeue_for(T &elem, const std::chrono::duration< Rep, Period > &duration)
FOLLY_ALWAYS_INLINE bool tryEnqueueUntilImpl(Arg &&v, const std::chrono::time_point< Clock, Duration > &deadline)
FOLLY_ALWAYS_INLINE bool tryAddDebit(Weight weight) noexcept
int futexWake(const Futex *futex, int count, uint32_t wakeMask)
Definition: Futex-inl.h:107
FOLLY_ALWAYS_INLINE Weight getThreshold() const noexcept
void asm_volatile_pause()
Definition: Asm.h:37