|
void | initQueue (const size_t cap, const size_t mult) |
|
bool | tryObtainReadyPushTicket (uint64_t &ticket, Slot *&slots, size_t &cap, int &stride) noexcept |
|
bool | tryObtainPromisedPushTicket (uint64_t &ticket, Slot *&slots, size_t &cap, int &stride) noexcept |
|
bool | tryObtainReadyPopTicket (uint64_t &ticket, Slot *&slots, size_t &cap, int &stride) noexcept |
|
bool | tryObtainPromisedPopTicket (uint64_t &ticket, Slot *&slots, size_t &cap, int &stride) noexcept |
|
template<typename... Args> |
void | enqueueWithTicket (const uint64_t ticket, Args &&...args) noexcept |
| Enqueues an element with a specific ticket number.
|
|
uint64_t | getOffset (const uint64_t state) const noexcept |
|
int | getNumClosed (const uint64_t state) const noexcept |
|
bool | tryExpand (const uint64_t state, const size_t cap) noexcept |
|
bool | trySeqlockReadSection (uint64_t &state, Slot *&slots, size_t &cap, int &stride) noexcept |
| Seqlock read-only section.
|
|
bool | maybeUpdateFromClosed (const uint64_t state, const uint64_t ticket, uint64_t &offset, Slot *&slots, size_t &cap, int &stride) noexcept |
|
template<typename T, template< typename > class Atom>
class folly::MPMCQueue< T, Atom, true >
The dynamic version of MPMCQueue allows dynamic expansion of queue capacity, such that a queue may start with a smaller capacity than specified and expand only if needed. Users may optionally specify the initial capacity and the expansion multiplier.
The design uses a seqlock to enforce mutual exclusion among expansion attempts. Regular operations read up-to-date queue information (slots array, capacity, stride) inside read-only seqlock sections, which are unimpeded when no expansion is in progress.
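The read-only sections and the mutual exclusion among expansion attempts can be illustrated with a minimal seqlock sketch. This is not the folly implementation: `SeqlockSketch`, `ArrayInfo`, `tryRead`, and `tryUpdate` are hypothetical names, and every atomic operation uses the default sequentially consistent ordering as a conservative choice.

```cpp
#include <atomic>
#include <cstddef>
#include <cstdint>

// Hypothetical stand-in for the information that readers need about the
// current slots array (the slots pointer itself is omitted for brevity).
struct ArrayInfo {
  std::atomic<std::size_t> capacity{0};
  std::atomic<int> stride{0};
};

struct SeqlockSketch {
  // Even value: unlocked. Odd value: an expansion is in progress.
  std::atomic<uint64_t> state{0};
  ArrayInfo info;

  // Read-only section: succeeds only if no writer held the lock at the
  // start and the state did not change while the fields were read.
  bool tryRead(std::size_t& cap, int& stride) const noexcept {
    const uint64_t before = state.load();
    if (before & 1) {
      return false; // an expander currently holds the lock
    }
    cap = info.capacity.load();
    stride = info.stride.load();
    return before == state.load();
  }

  // Write section: at most one caller wins the even-to-odd transition,
  // which is what gives mutual exclusion among expansion attempts.
  bool tryUpdate(std::size_t newCap, int newStride) noexcept {
    uint64_t s = state.load();
    if (s & 1) {
      return false; // someone else is already expanding
    }
    if (!state.compare_exchange_strong(s, s + 1)) {
      return false; // lost the race to another expander
    }
    info.capacity.store(newCap);
    info.stride.store(newStride);
    state.store(s + 2); // release the lock; the value is even again
    return true;
  }
};
```

A caller whose `tryRead` returns false would typically pause and retry, which matches the description of regular operations being unimpeded only when no expansion is in progress.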
An expansion computes a new capacity, allocates a new slots array, and updates stride. No information needs to be copied from the current slots array to the new one. As a result, the sequence numbers of the new slots do not match ticket numbers. The expansion therefore computes a ticket offset so that operations using the new array can adjust their calculations of slot indexes and sequence numbers to account for the new slots starting with sequence numbers of zero. The current ticket offset is packed with the seqlock in an atomic 64-bit integer. The initial offset is zero.
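As a rough illustration of the packing and the offset adjustment, the sketch below assumes that the seqlock occupies the low `kSeqlockBits` bits of the 64-bit state and the offset occupies the remaining high bits. The bit width of 6 and the helper names (`packState`, `offsetOf`, `seqlockOf`, `slotIndex`, `turnOf`) are assumptions made for this example, and the slot-index formula leaves out any padding the real queue may apply.

```cpp
#include <cstddef>
#include <cstdint>

// Assumed layout: low kSeqlockBits bits hold the seqlock, the rest the offset.
constexpr int kSeqlockBits = 6;

constexpr uint64_t packState(uint64_t offset, uint64_t seqlock) {
  return (offset << kSeqlockBits) | seqlock;
}

constexpr uint64_t offsetOf(uint64_t state) {
  return state >> kSeqlockBits;
}

constexpr uint64_t seqlockOf(uint64_t state) {
  return state & ((uint64_t{1} << kSeqlockBits) - 1);
}

// Slot index for a ticket, relative to an array whose first ticket is
// `offset`. Subtracting the offset makes the array's first ticket map to
// relative ticket 0.
constexpr std::size_t slotIndex(
    uint64_t ticket, uint64_t offset, std::size_t cap, int stride) {
  return static_cast<std::size_t>(
      ((ticket - offset) * static_cast<uint64_t>(stride)) % cap);
}

// Sequence number ("turn") for a ticket in that array. The first `cap`
// tickets after the offset get turn 0, matching freshly allocated slots.
constexpr uint64_t turnOf(uint64_t ticket, uint64_t offset, std::size_t cap) {
  return (ticket - offset) / cap;
}
```

With an offset of 100 and a capacity of 8, tickets 100 through 107 all get turn 0, so they line up with the zero sequence numbers of the new slots.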
Lagging write and read operations with tickets lower than the ticket offset of the current slots array (i.e., the minimum ticket number that can be served by the current array) must use earlier closed arrays instead of the current one. Information about closed slots arrays (array address, capacity, stride, and offset) is maintained in a logarithmic-sized structure. Each entry in that structure never needs to be changed once set. The number of closed arrays is half the value of the seqlock (when unlocked).
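The lookup that a lagging operation performs can be sketched as a scan over the closed-array records from newest to oldest. The `ClosedArray` struct, `numClosed`, and `findClosedFor` are hypothetical names for this illustration, and a `std::vector` stands in for whatever fixed-size structure the queue actually uses.

```cpp
#include <cstddef>
#include <cstdint>
#include <vector>

// Hypothetical record for one closed slots array (array address omitted).
struct ClosedArray {
  uint64_t offset;      // lowest ticket served by this array
  std::size_t capacity;
  int stride;
};

// With the seqlock unlocked (even), half its value is the number of
// closed arrays, as the description states.
inline int numClosed(uint64_t seqlock) {
  return static_cast<int>(seqlock >> 1);
}

// Returns the index of the closed array that must serve `ticket`, or -1
// when the current array (whose lowest ticket is `currentOffset`) serves it.
inline int findClosedFor(
    const std::vector<ClosedArray>& closed,
    int n,
    uint64_t currentOffset,
    uint64_t ticket) {
  if (ticket >= currentOffset) {
    return -1; // not lagging: use the current array
  }
  // Scan from the most recently closed array to the oldest and pick the
  // first one whose offset does not exceed the ticket.
  for (int i = n - 1; i >= 0; --i) {
    if (closed[i].offset <= ticket) {
      return i;
    }
  }
  // Not reached when the oldest array has offset zero (the initial offset).
  return -1;
}
```

Because entries are never modified once set, a scan like this needs no extra locking beyond the read-only section that supplied `n`.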
The acquisition of the seqlock to perform an expansion does not prevent the issuing of new push and pop tickets concurrently. The expansion must set the new ticket offset to a value that couldn't have been issued to an operation that has already gone through a seqlock read-only section (and hence obtained information for older closed arrays).
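One way to meet this requirement, shown here as a sketch rather than as the library's exact code, is to read both ticket counters after the seqlock is acquired and set the new offset to one more than the larger of the two. An operation that completed its read-only section before the lock was taken loaded its ticket earlier still, so that ticket cannot exceed the counter values read here. The helper name `stateAfterExpansion` and the 6-bit seqlock width are assumptions made for this example.

```cpp
#include <algorithm>
#include <cstdint>

// Assumed layout: low kSeqlockBits bits hold the seqlock, the rest the offset.
constexpr int kSeqlockBits = 6;

// Hypothetical helper: computes the state to publish when releasing the
// seqlock after a successful expansion. `unlockedState` is the even state
// observed before the lock was taken; the two tickets are the push and pop
// counters read while the lock is held.
inline uint64_t stateAfterExpansion(
    uint64_t unlockedState, uint64_t pushTicket, uint64_t popTicket) {
  const uint64_t mask = (uint64_t{1} << kSeqlockBits) - 1;
  const uint64_t closedBefore = (unlockedState & mask) >> 1;
  // Strictly greater than any ticket an earlier reader could be holding.
  const uint64_t newOffset = 1 + std::max(pushTicket, popTicket);
  // The seqlock advances by two per expansion, so half of it keeps
  // counting the closed arrays.
  return (newOffset << kSeqlockBits) + 2 * (closedBefore + 1);
}
```

The ticket equal to the larger counter value is left to the array being closed, which may no longer have room for it.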
Note that the total queue capacity can temporarily exceed the specified capacity when there are lagging consumers that haven't yet consumed all the elements in closed arrays. Users should not rely on the capacity of dynamic queues for synchronization, e.g., they should not expect that a thread will definitely block on a call to blockingWrite() when the queue size is known to be equal to its capacity.
Note that some writeIfNotFull() and tryWriteUntil() operations may fail even if the size of the queue is less than its maximum capacity and the expansion succeeded, if the operation happens to acquire a ticket that belongs to a closed array. This is a transient condition. Typically, one or two ticket values per expansion may be subject to this condition.
The dynamic version is a partial specialization of MPMCQueue with Dynamic == true.
Definition at line 176 of file MPMCQueue.h.