proxygen
folly::DefaultWeightFn< T > Struct Template Reference

#include <DynamicBoundedQueue.h>

Public Member Functions

template<typename Arg >
uint64_t operator() (Arg &&) const noexcept
 

Detailed Description

template<typename T>
struct folly::DefaultWeightFn< T >

DynamicBoundedQueue supports:

  • Dynamic memory usage that grows and shrinks in proportion to the number of elements in the queue.
  • Adjustable capacity that helps throttle pathological cases of producer-consumer imbalance that may lead to excessive memory usage.
  • The adjustable capacity can also help prevent deadlock by allowing users to temporarily increase capacity substantially to guarantee accommodating producer requests that cannot wait.
  • SPSC, SPMC, MPSC, MPMC variants.
  • Blocking and spinning-only variants.
  • Inter-operable non-waiting, timed until, timed for, and waiting variants of producer and consumer operations.
  • Optional variable element weights.

Element Weights

  • Queue elements may have variable weights, computed by a functor supplied as a template parameter. The default weight is 1.
  • Element weights count towards the queue's capacity.
  • Element weights are not priorities and do not affect element order. Queues with variable element weights follow FIFO order, the same as default queues.
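
The weight rules above can be illustrated with a single-threaded toy model. This is only a sketch and not folly's implementation: `ToyWeightedQueue` and `LengthWeight` are hypothetical names introduced for illustration. It shows weights counting toward capacity while dequeue order stays FIFO.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <deque>
#include <string>
#include <utility>

// Hypothetical weight functor: one unit per character, minimum 1.
struct LengthWeight {
  uint64_t operator()(const std::string& s) const noexcept {
    return s.empty() ? 1 : s.size();
  }
};

// Single-threaded toy model (NOT folly's implementation): weights count
// toward capacity, while ordering stays strictly FIFO.
template <typename T, typename WeightFn>
class ToyWeightedQueue {
 public:
  explicit ToyWeightedQueue(uint64_t capacity) : capacity_(capacity) {
    assert(capacity > 0);
  }

  // Fails if the element's weight does not fit in the remaining capacity.
  bool try_enqueue(T v) {
    const uint64_t w = WeightFn()(v);
    if (weight_ + w > capacity_) {
      return false;
    }
    weight_ += w;
    items_.push_back(std::move(v));
    return true;
  }

  // Removes the oldest element, regardless of its weight.
  bool try_dequeue(T& out) {
    if (items_.empty()) {
      return false;
    }
    out = std::move(items_.front());
    items_.pop_front();
    weight_ -= WeightFn()(out);
    return true;
  }

  uint64_t weight() const { return weight_; }
  std::size_t size() const { return items_.size(); }

 private:
  uint64_t capacity_;
  uint64_t weight_ = 0;
  std::deque<T> items_;
};
```

With a capacity of 10, an 8-character string and a 1-character string both fit, a further 2-character string is rejected, and the 8-character string is still the first one dequeued.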

When to use DynamicBoundedQueue:

  • If a small maximum capacity may lead to deadlock or performance degradation under bursty patterns and a larger capacity is sufficient.
  • If the typical queue size is expected to be much lower than the maximum capacity.
  • If an unbounded queue is susceptible to growing too much.
  • If support for variable element weights is needed.

When not to use DynamicBoundedQueue:

  • If dynamic memory allocation is unacceptable or if the maximum capacity needs to be small, then use fixed-size MPMCQueue or (if non-blocking SPSC) ProducerConsumerQueue.
  • If there is no risk of the queue growing too much, then use UnboundedQueue.

Setting capacity

  • The general rule is to set the capacity as high as acceptable. The queue performs best when it is not near full capacity.
  • The implementation may allow extra slack in capacity (~10%) for amortizing some costly steps. Therefore, precise capacity is not guaranteed and cannot be relied on for synchronization; i.e., this queue cannot be used as a semaphore.

Performance expectations:

  • As long as the queue size is below capacity in the common case, performance is comparable to MPMCQueue and better in cases of higher producer demand.
  • Performance degrades gracefully at full capacity.
  • It is recommended to measure performance with different variants when applicable, e.g., DMPMC vs DMPSC. Depending on the use case, sometimes the variant with the higher sequential overhead may yield better results due to, for example, more favorable producer-consumer balance or favorable timing for avoiding costly blocking.
  • See DynamicBoundedQueueTest.cpp for some benchmark results.

Template parameters:

  • T: element type
  • SingleProducer: true if there can be only one producer at a time.
  • SingleConsumer: true if there can be only one consumer at a time.
  • MayBlock: true if producers or consumers may block.
  • LgSegmentSize (default 8): Log base 2 of number of elements per UnboundedQueue segment.
  • LgAlign (default 7): Log base 2 of alignment directive; can be used to balance scalability (avoidance of false sharing) with memory efficiency.
  • WeightFn (default DefaultWeightFn<T>): A customizable functor type that computes the weight of each element. The default weight is 1.
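
Because LgSegmentSize and LgAlign are base-2 logarithms, the actual sizes are powers of two. The arithmetic can be checked with a short sketch; the helper name `pow2` is hypothetical and not part of the library.

```cpp
#include <cstddef>

// Hypothetical helper: converts a log-base-2 template argument to its value.
constexpr std::size_t pow2(std::size_t lg) {
  return std::size_t{1} << lg;
}

// Default LgSegmentSize of 8 means 256 elements per segment.
static_assert(pow2(8) == 256, "default segment size");
// LgSegmentSize of 10 means 1024 elements per segment.
static_assert(pow2(10) == 1024, "segment size for LgSegmentSize = 10");
// Default LgAlign of 7 means 128-byte alignment.
static_assert(pow2(7) == 128, "default alignment in bytes");
```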

Template Aliases:

    DSPSCQueue<T, MayBlock, LgSegmentSize, LgAlign>
    DMPSCQueue<T, MayBlock, LgSegmentSize, LgAlign>
    DSPMCQueue<T, MayBlock, LgSegmentSize, LgAlign>
    DMPMCQueue<T, MayBlock, LgSegmentSize, LgAlign>

Functions:

    Constructor
        Takes a capacity value as an argument.

Producer functions:

    void enqueue(const T&);
    void enqueue(T&&);
        Adds an element to the end of the queue. Waits until capacity is available if necessary.

    bool try_enqueue(const T&);
    bool try_enqueue(T&&);
        Tries to add an element to the end of the queue if capacity allows it. Returns true if successful, otherwise false.

    bool try_enqueue_until(const T&, time_point& deadline);
    bool try_enqueue_until(T&&, time_point& deadline);
        Tries to add an element to the end of the queue, waiting for capacity until the specified deadline. Returns true if successful, otherwise false.

    bool try_enqueue_for(const T&, duration&);
    bool try_enqueue_for(T&&, duration&);
        Tries to add an element to the end of the queue, waiting for capacity until the specified duration expires. Returns true if successful, otherwise false.

Consumer functions:

    void dequeue(T&);
        Extracts an element from the front of the queue. Waits until an element is available if necessary.

    bool try_dequeue(T&);
        Tries to extract an element from the front of the queue if available. Returns true if successful, otherwise false.

    bool try_dequeue_until(T&, time_point& deadline);
        Tries to extract an element from the front of the queue, waiting for one to become available until the specified deadline. Returns true if successful, otherwise false.

    bool try_dequeue_for(T&, duration&);
        Tries to extract an element from the front of the queue, waiting for one to become available until the specified duration expires. Returns true if successful, otherwise false.

Secondary functions:

    void reset_capacity(size_t capacity);
        Changes the capacity of the queue. Does not affect the current contents of the queue. Guaranteed only to affect subsequent enqueue operations. May or may not affect concurrent operations. Capacity must be at least 1000.

    Weight weight();
        Returns an estimate of the total weight of the elements in the queue.

    size_t size();
        Returns an estimate of the total number of elements.

    bool empty();
        Returns true only if the queue was empty during the call.

    Note: weight(), size(), and empty() are guaranteed to be accurate only if there are no concurrent changes to the queue.
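
The relationship between the non-waiting, timed, and waiting flavors can be sketched with a small spinning-only toy queue. This is an assumption-laden illustration, not folly's lock-free implementation: `ToySpinQueue` is a hypothetical name, it uses a plain mutex, and it counts elements rather than weights. The point it shows is that a relative timeout is a deadline of now plus the duration, and that the waiting form is a retry without a deadline.

```cpp
#include <cassert>
#include <chrono>
#include <cstddef>
#include <deque>
#include <mutex>
#include <thread>
#include <utility>

// Toy spinning-only queue (hypothetical; NOT folly's implementation).
template <typename T>
class ToySpinQueue {
 public:
  explicit ToySpinQueue(std::size_t capacity) : capacity_(capacity) {
    assert(capacity > 0);
  }

  // Non-waiting: fails immediately when the queue is full.
  bool try_enqueue(const T& v) {
    std::lock_guard<std::mutex> g(m_);
    if (items_.size() >= capacity_) {
      return false;
    }
    items_.push_back(v);
    return true;
  }

  // Timed until: retries until success or until the deadline has passed.
  template <typename Clock, typename Duration>
  bool try_enqueue_until(
      const T& v, const std::chrono::time_point<Clock, Duration>& deadline) {
    while (!try_enqueue(v)) {
      if (Clock::now() >= deadline) {
        return false;
      }
      std::this_thread::yield();
    }
    return true;
  }

  // Timed for: a relative timeout is a deadline of now + duration.
  template <typename Rep, typename Period>
  bool try_enqueue_for(
      const T& v, const std::chrono::duration<Rep, Period>& d) {
    return try_enqueue_until(v, std::chrono::steady_clock::now() + d);
  }

  // Waiting: retries until capacity becomes available.
  void enqueue(const T& v) {
    while (!try_enqueue(v)) {
      std::this_thread::yield();
    }
  }

  // Non-waiting consumer: fails immediately when the queue is empty.
  bool try_dequeue(T& out) {
    std::lock_guard<std::mutex> g(m_);
    if (items_.empty()) {
      return false;
    }
    out = std::move(items_.front());
    items_.pop_front();
    return true;
  }

 private:
  std::size_t capacity_;
  std::mutex m_;
  std::deque<T> items_;
};
```

The consumer side would follow the same pattern, with the timed and waiting dequeue forms layered over `try_dequeue`.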

Usage example with default weight:

/* DMPSC, doesn't block, 1024 int elements per segment */
DMPSCQueue<int, false, 10> q(100000);
ASSERT_TRUE(q.empty());
ASSERT_EQ(q.size(), 0);
q.enqueue(1);
ASSERT_TRUE(q.try_enqueue(2));
ASSERT_TRUE(q.try_enqueue_until(3, deadline));
ASSERT_TRUE(q.try_enqueue_for(4, duration));
// ... enqueue more elements until capacity is full
// See above comments about imprecise capacity guarantees
ASSERT_FALSE(q.try_enqueue(100001)); // queue is full and try_enqueue does not wait
size_t sz = q.size();
ASSERT_GE(sz, 100000);
q.reset_capacity(1000000000); // set huge capacity
ASSERT_TRUE(q.try_enqueue(100001)); // now enqueue succeeds
q.reset_capacity(100000); // set capacity back to 100,000
ASSERT_FALSE(q.try_enqueue(100002));
ASSERT_EQ(q.size(), sz + 1);
int v;
q.dequeue(v);
ASSERT_EQ(v, 1);
ASSERT_TRUE(q.try_dequeue(v));
ASSERT_EQ(v, 2);
ASSERT_TRUE(q.try_dequeue_until(v, deadline));
ASSERT_EQ(v, 3);
ASSERT_TRUE(q.try_dequeue_for(v, duration));
ASSERT_EQ(v, 4);
ASSERT_EQ(q.size(), sz - 3);

Usage example with custom weights:

struct CustomWeightFn {
  uint64_t operator()(int val) { return val / 100; }
};
// WeightFn follows LgAlign in the parameter order, so LgAlign (7) is given explicitly
DMPMCQueue<int, false, 10, 7, CustomWeightFn> q(20);
ASSERT_TRUE(q.empty());
q.enqueue(100);
ASSERT_TRUE(q.try_enqueue(200));
ASSERT_TRUE(q.try_enqueue_until(500, now() + seconds(1)));
ASSERT_EQ(q.size(), 3);
ASSERT_EQ(q.weight(), 8);
ASSERT_FALSE(q.try_enqueue_for(1700, microseconds(1)));
q.reset_capacity(1000000); // set capacity to 1000000 instead of 20
ASSERT_TRUE(q.try_enqueue_for(1700, microseconds(1)));
q.reset_capacity(20); // set capacity to 20 again
ASSERT_FALSE(q.try_enqueue(100));
ASSERT_EQ(q.size(), 4);
ASSERT_EQ(q.weight(), 25);
int v;
q.dequeue(v);
ASSERT_EQ(v, 100);
ASSERT_TRUE(q.try_dequeue(v));
ASSERT_EQ(v, 200);
ASSERT_TRUE(q.try_dequeue_until(v, now() + seconds(1)));
ASSERT_EQ(v, 500);
ASSERT_EQ(q.size(), 1);
ASSERT_EQ(q.weight(), 17);

Design:

  • The implementation is on top of UnboundedQueue.
  • The main FIFO functionality is in UnboundedQueue. DynamicBoundedQueue manages keeping the total queue weight within the specified capacity.
  • For the sake of scalability, the data structures are designed to minimize interference between producers on one side and consumers on the other.
  • Producers add to a debit variable the weight of the added element and check capacity.
  • Consumers add to a credit variable the weight of the removed element.
  • Producers, for the sake of scalability, use fetch_add to add to the debit variable and subtract the weight again if the result exceeds capacity, rather than using compare_exchange to avoid overshooting.
  • Consumers, infrequently, transfer credit to a transfer variable and unblock any blocked producers. The transfer variable can be used by producers to decrease their debit when needed.
  • Note that a low capacity will trigger frequent credit transfer by consumers that may degrade performance. Capacity should not be set too low.
  • Transfer of credit by consumers is triggered when the amount of credit reaches a threshold (1/10 of capacity).
  • The waiting of consumers is handled in UnboundedQueue. The waiting of producers is handled in this template.
  • For a producer operation, if the difference between debit and capacity (plus some slack to account for the transfer threshold) does not accommodate the weight of the new element, it first tries to transfer credit that may have already been made available by consumers. If this is insufficient and MayBlock is true, then the producer uses a futex to block until new credit is transferred by a consumer.
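
The debit, credit, and transfer accounting described above can be sketched as a simplified model. This is not folly's code: `ToyCapacityLedger` and its member names are hypothetical, blocking and the FIFO storage are omitted, and the slack is assumed here to equal the transfer threshold (1/10 of capacity).

```cpp
#include <atomic>
#include <cassert>
#include <cstdint>

// Simplified model of the capacity accounting (NOT folly's implementation).
class ToyCapacityLedger {
 public:
  explicit ToyCapacityLedger(uint64_t capacity)
      : capacity_(capacity), threshold_(capacity / 10) {
    assert(capacity >= 10); // keeps the threshold nonzero in this sketch
  }

  // Producer side: try to reserve weight w.
  bool try_add_debit(uint64_t w) {
    if (add_debit(w)) {
      return true;
    }
    // Take any credit that consumers have already transferred, then retry once.
    const uint64_t t = transfer_.exchange(0, std::memory_order_acq_rel);
    if (t == 0) {
      return false; // a blocking variant would wait for a transfer here
    }
    debit_.fetch_sub(t, std::memory_order_acq_rel);
    return add_debit(w);
  }

  // Consumer side: accumulate credit and publish it when the threshold
  // is crossed.
  void add_credit(uint64_t w) {
    const uint64_t before = credit_.fetch_add(w, std::memory_order_acq_rel);
    if (before < threshold_ && before + w >= threshold_) {
      const uint64_t c = credit_.exchange(0, std::memory_order_acq_rel);
      transfer_.fetch_add(c, std::memory_order_acq_rel);
      // A blocking variant would wake waiting producers here.
    }
  }

  uint64_t debit() const { return debit_.load(std::memory_order_acquire); }
  uint64_t credit() const { return credit_.load(std::memory_order_acquire); }
  uint64_t transfer() const { return transfer_.load(std::memory_order_acquire); }

 private:
  // Optimistic fetch_add, rolled back if it overshoots capacity plus slack.
  bool add_debit(uint64_t w) {
    const uint64_t before = debit_.fetch_add(w, std::memory_order_acq_rel);
    if (before + w > capacity_ + threshold_) {
      debit_.fetch_sub(w, std::memory_order_acq_rel);
      return false;
    }
    return true;
  }

  const uint64_t capacity_;
  const uint64_t threshold_;
  std::atomic<uint64_t> debit_{0};
  std::atomic<uint64_t> credit_{0};
  std::atomic<uint64_t> transfer_{0};
};
```

In this model, with a capacity of 100, producers can reserve up to 110 units before failing, which illustrates why the capacity is imprecise. Producers touch the consumer-side state only through the transfer variable, and only after a reservation fails.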

Memory Usage:

  • Aside from three cache lines for managing capacity, the memory for queue elements is managed using UnboundedQueue and grows and shrinks dynamically with the number of elements.
  • The template parameter LgAlign can be used to reduce memory usage at the cost of increased chance of false sharing.

Definition at line 271 of file DynamicBoundedQueue.h.

Member Function Documentation

template<typename T>
template<typename Arg >
uint64_t folly::DefaultWeightFn< T >::operator() ( Arg &&  ) const
inline noexcept

Definition at line 273 of file DynamicBoundedQueue.h.

273  {
274  return 1;
275  }
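
For context, the three lines above are the body of the call operator. A standalone reconstruction of the whole functor, based on the declaration shown on this page, might look like the following; the `sketch` namespace and the `default_weight_demo` function are hypothetical and used only to keep the example self-contained.

```cpp
#include <cassert>
#include <cstdint>
#include <string>

namespace sketch { // hypothetical namespace, mirrors the declaration above

template <typename T>
struct DefaultWeightFn {
  // Accepts any argument and ignores it: every element weighs 1.
  template <typename Arg>
  uint64_t operator()(Arg&&) const noexcept {
    return 1;
  }
};

} // namespace sketch

// Usage sketch: the weight is 1 regardless of the element's value or type.
inline void default_weight_demo() {
  sketch::DefaultWeightFn<std::string> w;
  assert(w(std::string("a long element")) == 1);
  assert(w("literal") == 1);
}
```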

The documentation for this struct was generated from the following file: