#include <EventBase.h>
Classes | |
class | Consumer |
struct | Node |
class | SimpleConsumer |
Public Types | |
enum | FdType { FdType::PIPE } |
Public Member Functions | |
NotificationQueue (uint32_t maxSize=0, FdType fdType=FdType::PIPE) | |
~NotificationQueue () | |
void | setMaxQueueSize (uint32_t max) |
template<typename MessageTT > | |
void | tryPutMessage (MessageTT &&message) |
template<typename MessageTT > | |
bool | tryPutMessageNoThrow (MessageTT &&message) |
template<typename MessageTT > | |
void | putMessage (MessageTT &&message) |
template<typename InputIteratorT > | |
void | putMessages (InputIteratorT first, InputIteratorT last) |
bool | tryConsume (MessageT &result) |
size_t | size () const |
void | checkPid () const |
Private Member Functions | |
NotificationQueue (NotificationQueue const &)=delete | |
NotificationQueue & | operator= (NotificationQueue const &)=delete |
bool | checkQueueSize (size_t maxSize, bool throws=true) const |
bool | checkDraining (bool throws=true) |
void | ensureSignalLocked () const |
void | drainSignalsLocked () |
void | ensureSignal () const |
void | syncSignalAndQueue () |
template<typename MessageTT > | |
bool | putMessageImpl (MessageTT &&message, size_t maxSize, bool throws=true) |
template<typename InputIteratorT > | |
void | putMessagesImpl (InputIteratorT first, InputIteratorT last, std::input_iterator_tag) |
Private Attributes | |
folly::SpinLock | spinlock_ |
bool | signal_ {false} |
int | eventfd_ |
int | pipeFds_ [2] |
uint32_t | advisoryMaxQueueSize_ |
pid_t | pid_ |
boost::intrusive::slist< Node, boost::intrusive::cache_last< true > > | queue_ |
int | numConsumers_ {0} |
std::atomic< int > | numActiveConsumers_ {0} |
bool | draining_ {false} |
A producer-consumer queue for passing messages between EventBase threads.
Messages can be added to the queue from any thread. Multiple consumers may listen to the queue from multiple EventBase threads.
A NotificationQueue may not be destroyed while there are still consumers registered to receive events from the queue. It is the user's responsibility to ensure that all consumers are unregistered before the queue is destroyed.
MessageT should be MoveConstructible (i.e., it must support a move constructor, a copy constructor, or both). Ideally its move constructor (or its copy constructor, if no move constructor is provided) should never throw exceptions. If the constructor may throw, consumers could end up spinning: trying to move a message off the queue, failing, and then retrying.
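The no-throw recommendation above can be checked at compile time. The following is a minimal sketch using only the standard library; `WorkItem` and `isSafeQueueMessage` are hypothetical names introduced for illustration and are not part of folly.

```cpp
#include <string>
#include <type_traits>
#include <utility>

// Hypothetical message type used only for this illustration.
struct WorkItem {
  int id{0};
  std::string payload;

  WorkItem() = default;
  WorkItem(int i, std::string p) : id(i), payload(std::move(p)) {}

  // Marking the move operations noexcept states that moving a WorkItem
  // off a queue cannot fail part-way through.
  WorkItem(WorkItem&&) noexcept = default;
  WorkItem& operator=(WorkItem&&) noexcept = default;
};

// Hypothetical helper: true when MessageT can be moved (or copied) into
// place without throwing.
template <typename MessageT>
constexpr bool isSafeQueueMessage() {
  return std::is_move_constructible<MessageT>::value &&
      std::is_nothrow_move_constructible<MessageT>::value;
}

// A project could place a guard like this next to each message type.
static_assert(
    isSafeQueueMessage<WorkItem>(),
    "WorkItem must be movable without throwing");
```

A type whose move constructor is not `noexcept` would fail the `static_assert`, flagging the retry hazard described above before the code ever runs.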
Definition at line 56 of file EventBase.h.
|
strong |
Enumerator | |
---|---|
PIPE |
Definition at line 247 of file NotificationQueue.h.
|
inline explicit |
Create a new NotificationQueue.
If the maxSize parameter is specified, this sets the maximum queue size that will be enforced by tryPutMessage(). (This size is advisory, and may be exceeded if producers explicitly use putMessage() instead of tryPutMessage().)
The fdType parameter determines the type of file descriptor used internally to signal message availability. The default (eventfd) is preferable for performance and because it won't fail when the queue gets too long. However, it is not available on older and non-Linux kernels; in that case the code falls back to using a pipe. The parameter is mostly for testing purposes.
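To illustrate what signalling over a pipe looks like, and why it can fail when too many signals are pending, here is a minimal POSIX sketch. `PipeSignal` is a hypothetical class written for this page; it is not folly's implementation and omits the eventfd path entirely.

```cpp
#include <fcntl.h>
#include <poll.h>
#include <unistd.h>

#include <stdexcept>

// Hypothetical wrapper around a non-blocking pipe used as a wakeup signal.
class PipeSignal {
 public:
  PipeSignal() {
    if (pipe(fds_) != 0) {
      throw std::runtime_error("pipe() failed");
    }
    // Make both ends non-blocking so neither side can stall.
    for (int fd : fds_) {
      int flags = fcntl(fd, F_GETFL, 0);
      if (flags == -1 || fcntl(fd, F_SETFL, flags | O_NONBLOCK) == -1) {
        close(fds_[0]);
        close(fds_[1]);
        throw std::runtime_error("fcntl() failed");
      }
    }
  }

  ~PipeSignal() {
    close(fds_[0]);
    close(fds_[1]);
  }

  PipeSignal(const PipeSignal&) = delete;
  PipeSignal& operator=(const PipeSignal&) = delete;

  // Producer side: write one byte. Returns false if the write fails,
  // for example because the pipe buffer is already full.
  bool signal() {
    char byte = 1;
    return write(fds_[1], &byte, 1) == 1;
  }

  // Consumer side: true if at least one signal is pending. Never blocks.
  bool readable() const {
    pollfd p{fds_[0], POLLIN, 0};
    return poll(&p, 1, 0) == 1 && (p.revents & POLLIN) != 0;
  }

  // Consumer side: consume one pending signal byte, if any.
  bool drainOne() {
    char byte;
    return read(fds_[0], &byte, 1) == 1;
  }

 private:
  int fds_[2];
};
```

Because each signal occupies a byte in the pipe buffer, `signal()` starts failing once the buffer fills, which is the limitation the paragraph above attributes to the pipe fallback.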
Definition at line 269 of file NotificationQueue.h.
Referenced by folly::NotificationQueue< folly::AsyncServerSocket::QueueMessage >::checkPid().
|
inline |
Definition at line 328 of file NotificationQueue.h.
|
private delete |
|
inline private |
Definition at line 496 of file NotificationQueue.h.
Referenced by folly::NotificationQueue< folly::AsyncServerSocket::QueueMessage >::putMessageImpl(), and folly::NotificationQueue< folly::AsyncServerSocket::QueueMessage >::putMessagesImpl().
|
inline |
Check that the NotificationQueue is being used from the correct process.
If you create a NotificationQueue in one process, then fork, and try to send messages to the queue from the child process, you're going to have a bad time. Unfortunately users have (accidentally) run into this.
Because we use an eventfd/pipe, the child process can actually signal the parent process that an event is ready. However, it can't put anything on the parent's queue, so the parent wakes up and finds an empty queue. This check ensures that we catch the problem in the misbehaving child process code, and crash before signalling the parent process.
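The idea behind the check can be sketched as remembering the creating process's pid and comparing it on each use. `PidGuard` is a hypothetical stand-in written for this page, not folly's code; the text above says the real check crashes the child, whereas this sketch only reports the mismatch.

```cpp
#include <sys/types.h>
#include <unistd.h>

// Hypothetical guard that records which process created it.
class PidGuard {
 public:
  PidGuard() : pid_(getpid()) {}

  // True only in the process that constructed the guard. After fork(),
  // the child sees a different getpid() and this returns false.
  bool inOwningProcess() const {
    return pid_ == getpid();
  }

 private:
  pid_t pid_;
};
```

A producer would consult `inOwningProcess()` before signalling, so that a forked child fails locally instead of waking the parent for a message that never reaches the parent's queue.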
Definition at line 474 of file NotificationQueue.h.
Referenced by folly::NotificationQueue< folly::AsyncServerSocket::QueueMessage >::putMessageImpl(), folly::NotificationQueue< folly::AsyncServerSocket::QueueMessage >::putMessagesImpl(), and folly::NotificationQueue< folly::AsyncServerSocket::QueueMessage >::tryConsume().
|
inline private |
Definition at line 483 of file NotificationQueue.h.
Referenced by folly::NotificationQueue< folly::AsyncServerSocket::QueueMessage >::putMessageImpl().
|
inline private |
Definition at line 553 of file NotificationQueue.h.
Referenced by folly::NotificationQueue< folly::AsyncServerSocket::QueueMessage >::syncSignalAndQueue().
|
inline private |
Definition at line 586 of file NotificationQueue.h.
|
inline private |
Definition at line 509 of file NotificationQueue.h.
Referenced by folly::NotificationQueue< folly::AsyncServerSocket::QueueMessage >::ensureSignal(), folly::NotificationQueue< folly::AsyncServerSocket::QueueMessage >::putMessageImpl(), folly::NotificationQueue< folly::AsyncServerSocket::QueueMessage >::putMessagesImpl(), and folly::NotificationQueue< folly::AsyncServerSocket::QueueMessage >::syncSignalAndQueue().
|
private delete |
|
inline |
Unconditionally put a message on the queue.
This method is like tryPutMessage(), but ignores the maximum queue size and always puts the message on the queue, even if the maximum queue size would be exceeded.
putMessage() may throw
Definition at line 408 of file NotificationQueue.h.
Referenced by TEST().
|
inline private |
Definition at line 602 of file NotificationQueue.h.
Referenced by folly::NotificationQueue< folly::AsyncServerSocket::QueueMessage >::putMessage(), folly::NotificationQueue< folly::AsyncServerSocket::QueueMessage >::tryPutMessage(), and folly::NotificationQueue< folly::AsyncServerSocket::QueueMessage >::tryPutMessageNoThrow().
|
inline |
Put several messages on the queue.
Definition at line 416 of file NotificationQueue.h.
Referenced by TEST().
|
inline private |
Definition at line 626 of file NotificationQueue.h.
Referenced by folly::NotificationQueue< folly::AsyncServerSocket::QueueMessage >::putMessages().
|
inline |
Set the advisory maximum queue size.
This maximum queue size affects calls to tryPutMessage(). Message producers can still use the putMessage() call to unconditionally put a message on the queue, ignoring the configured maximum queue size. This can cause the queue size to exceed the configured maximum.
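The difference between the enforced and the unconditional put can be modelled with a small standard-library queue. `AdvisoryQueue` is a hypothetical simplification, not folly's implementation: it uses a `std::mutex` rather than a spinlock, has no consumer signalling, and assumes that a maximum of 0 means "no limit".

```cpp
#include <cstddef>
#include <cstdint>
#include <deque>
#include <mutex>
#include <stdexcept>
#include <utility>

// Hypothetical model of an advisory size limit.
template <typename MessageT>
class AdvisoryQueue {
 public:
  explicit AdvisoryQueue(uint32_t maxSize = 0) : maxSize_(maxSize) {}

  void setMaxQueueSize(uint32_t max) {
    std::lock_guard<std::mutex> g(mutex_);
    maxSize_ = max;
  }

  // Enforces the limit. Assumption: a limit of 0 means unlimited.
  void tryPutMessage(MessageT message) {
    std::lock_guard<std::mutex> g(mutex_);
    if (maxSize_ > 0 && queue_.size() >= maxSize_) {
      throw std::overflow_error("queue is full");
    }
    queue_.push_back(std::move(message));
  }

  // Ignores the limit, so the queue may grow past maxSize_.
  void putMessage(MessageT message) {
    std::lock_guard<std::mutex> g(mutex_);
    queue_.push_back(std::move(message));
  }

  size_t size() const {
    std::lock_guard<std::mutex> g(mutex_);
    return queue_.size();
  }

 private:
  mutable std::mutex mutex_;
  uint32_t maxSize_;
  std::deque<MessageT> queue_;
};
```

With a limit of 2, a third `tryPutMessage()` throws `std::overflow_error`, while a third `putMessage()` succeeds and leaves the queue at size 3.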
Definition at line 356 of file NotificationQueue.h.
|
inline |
Definition at line 456 of file NotificationQueue.h.
Referenced by folly::NotificationQueue< folly::AsyncServerSocket::QueueMessage >::ensureSignalLocked().
|
inline private |
Definition at line 591 of file NotificationQueue.h.
Referenced by folly::NotificationQueue< folly::AsyncServerSocket::QueueMessage >::tryConsume().
|
inline |
Try to immediately pull a message off of the queue, without blocking.
If a message is immediately available, the result parameter will be updated to contain the message contents and true will be returned.
If no message is available, false will be returned and result will be left unmodified.
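The contract described here (return `true` and fill `result`, or return `false` and leave `result` untouched) can be sketched with a standard-library queue. `PollableQueue` is a hypothetical model for illustration only and does not reproduce folly's locking or signalling.

```cpp
#include <deque>
#include <mutex>
#include <string>
#include <utility>

// Hypothetical queue with a non-blocking consume operation.
template <typename MessageT>
class PollableQueue {
 public:
  void putMessage(MessageT message) {
    std::lock_guard<std::mutex> g(mutex_);
    queue_.push_back(std::move(message));
  }

  // Never waits for a message to arrive. On success, moves the oldest
  // message into result and returns true. On an empty queue, returns
  // false without touching result.
  bool tryConsume(MessageT& result) {
    std::lock_guard<std::mutex> g(mutex_);
    if (queue_.empty()) {
      return false;
    }
    result = std::move(queue_.front());
    queue_.pop_front();
    return true;
  }

 private:
  std::mutex mutex_;
  std::deque<MessageT> queue_;
};
```

A caller can therefore pre-fill `result` with a sentinel and rely on it surviving a failed call, although checking the return value is the clearer approach.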
Definition at line 431 of file NotificationQueue.h.
|
inline |
Attempt to put a message on the queue if the queue is not already full.
If the queue is full, a std::overflow_error will be thrown. The setMaxQueueSize() function controls the maximum queue size.
If the queue is currently draining, a std::runtime_error will be thrown.
This method may contend briefly on a spinlock if many threads are concurrently accessing the queue, but for all intents and purposes it will immediately place the message on the queue and return.
tryPutMessage() may throw std::bad_alloc if memory allocation fails, and may throw any other exception thrown by the MessageT move/copy constructor.
Definition at line 377 of file NotificationQueue.h.
Referenced by TEST().
|
inline |
No-throw version of tryPutMessage(). Returns true on success and false on failure instead of throwing.
Only std::overflow_error (the common exception case) and std::runtime_error (which indicates that the queue is being drained) are prevented from being thrown. User code must still catch std::bad_alloc errors.
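One way to share logic between the throwing and non-throwing variants is a single helper that takes a `throws` flag, similar in shape to the `putMessageImpl(..., bool throws=true)` signature listed above. The sketch below is a hypothetical model, not folly's code; `BoundedQueue`, `putImpl`, and `setDraining` are names invented for illustration.

```cpp
#include <cstddef>
#include <cstdint>
#include <deque>
#include <mutex>
#include <stdexcept>
#include <utility>

// Hypothetical queue showing throwing and non-throwing puts side by side.
template <typename MessageT>
class BoundedQueue {
 public:
  explicit BoundedQueue(uint32_t maxSize) : maxSize_(maxSize) {}

  void setDraining(bool draining) {
    std::lock_guard<std::mutex> g(mutex_);
    draining_ = draining;
  }

  void tryPutMessage(MessageT message) {
    putImpl(std::move(message), /* throws */ true);
  }

  // Reports "full" and "draining" through the return value. Other
  // exceptions, such as std::bad_alloc, still propagate.
  bool tryPutMessageNoThrow(MessageT message) {
    return putImpl(std::move(message), /* throws */ false);
  }

  size_t size() const {
    std::lock_guard<std::mutex> g(mutex_);
    return queue_.size();
  }

 private:
  bool putImpl(MessageT message, bool throws) {
    std::lock_guard<std::mutex> g(mutex_);
    if (draining_) {
      if (throws) {
        throw std::runtime_error("queue is draining");
      }
      return false;
    }
    // Assumption: a limit of 0 means unlimited.
    if (maxSize_ > 0 && queue_.size() >= maxSize_) {
      if (throws) {
        throw std::overflow_error("queue is full");
      }
      return false;
    }
    queue_.push_back(std::move(message));
    return true;
  }

  mutable std::mutex mutex_;
  uint32_t maxSize_;
  bool draining_{false};
  std::deque<MessageT> queue_;
};
```

Callers on a hot path can branch on the boolean result rather than paying for exception handling in the common "queue full" case.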
Definition at line 390 of file NotificationQueue.h.
Referenced by TEST().
|
private |
Definition at line 663 of file NotificationQueue.h.
Referenced by folly::NotificationQueue< folly::AsyncServerSocket::QueueMessage >::NotificationQueue(), folly::NotificationQueue< folly::AsyncServerSocket::QueueMessage >::setMaxQueueSize(), folly::NotificationQueue< folly::AsyncServerSocket::QueueMessage >::tryPutMessage(), and folly::NotificationQueue< folly::AsyncServerSocket::QueueMessage >::tryPutMessageNoThrow().
|
private |
Definition at line 668 of file NotificationQueue.h.
Referenced by folly::NotificationQueue< folly::AsyncServerSocket::QueueMessage >::checkDraining().
|
private |
Definition at line 661 of file NotificationQueue.h.
Referenced by folly::NotificationQueue< folly::AsyncServerSocket::QueueMessage >::drainSignalsLocked(), folly::NotificationQueue< folly::AsyncServerSocket::QueueMessage >::ensureSignalLocked(), folly::NotificationQueue< folly::AsyncServerSocket::QueueMessage >::NotificationQueue(), and folly::NotificationQueue< folly::AsyncServerSocket::QueueMessage >::~NotificationQueue().
|
private |
Definition at line 667 of file NotificationQueue.h.
Referenced by folly::NotificationQueue< folly::AsyncServerSocket::QueueMessage >::putMessageImpl(), and folly::NotificationQueue< folly::AsyncServerSocket::QueueMessage >::putMessagesImpl().
|
private |
Definition at line 666 of file NotificationQueue.h.
Referenced by folly::NotificationQueue< folly::AsyncServerSocket::QueueMessage >::putMessageImpl(), and folly::NotificationQueue< folly::AsyncServerSocket::QueueMessage >::putMessagesImpl().
|
private |
Definition at line 664 of file NotificationQueue.h.
Referenced by folly::NotificationQueue< folly::AsyncServerSocket::QueueMessage >::checkPid(), and folly::NotificationQueue< folly::AsyncServerSocket::QueueMessage >::NotificationQueue().
|
private |
Definition at line 662 of file NotificationQueue.h.
Referenced by folly::NotificationQueue< folly::AsyncServerSocket::QueueMessage >::drainSignalsLocked(), folly::NotificationQueue< folly::AsyncServerSocket::QueueMessage >::ensureSignalLocked(), folly::NotificationQueue< folly::AsyncServerSocket::QueueMessage >::NotificationQueue(), and folly::NotificationQueue< folly::AsyncServerSocket::QueueMessage >::~NotificationQueue().
|
private |
Definition at line 665 of file NotificationQueue.h.
Referenced by folly::NotificationQueue< folly::AsyncServerSocket::QueueMessage >::checkQueueSize(), folly::NotificationQueue< MessageT >::Consumer::consumeMessages(), folly::NotificationQueue< MessageT >::Consumer::consumeUntilDrained(), folly::NotificationQueue< MessageT >::SimpleConsumer::consumeUntilDrained(), folly::NotificationQueue< MessageT >::Consumer::getCurrentQueue(), folly::NotificationQueue< MessageT >::SimpleConsumer::getFd(), folly::NotificationQueue< MessageT >::Consumer::init(), folly::NotificationQueue< folly::AsyncServerSocket::QueueMessage >::putMessageImpl(), folly::NotificationQueue< folly::AsyncServerSocket::QueueMessage >::putMessagesImpl(), folly::NotificationQueue< MessageT >::Consumer::setActive(), folly::NotificationQueue< MessageT >::SimpleConsumer::SimpleConsumer(), folly::NotificationQueue< folly::AsyncServerSocket::QueueMessage >::size(), folly::NotificationQueue< MessageT >::Consumer::stopConsuming(), folly::NotificationQueue< folly::AsyncServerSocket::QueueMessage >::syncSignalAndQueue(), folly::NotificationQueue< folly::AsyncServerSocket::QueueMessage >::tryConsume(), folly::NotificationQueue< folly::AsyncServerSocket::QueueMessage >::~NotificationQueue(), and folly::NotificationQueue< MessageT >::SimpleConsumer::~SimpleConsumer().
|
mutable private |
Definition at line 660 of file NotificationQueue.h.
Referenced by folly::NotificationQueue< folly::AsyncServerSocket::QueueMessage >::drainSignalsLocked(), and folly::NotificationQueue< folly::AsyncServerSocket::QueueMessage >::ensureSignalLocked().
|
mutable private |
Definition at line 659 of file NotificationQueue.h.
Referenced by folly::NotificationQueue< folly::AsyncServerSocket::QueueMessage >::checkQueueSize(), folly::NotificationQueue< folly::AsyncServerSocket::QueueMessage >::ensureSignal(), folly::NotificationQueue< folly::AsyncServerSocket::QueueMessage >::putMessageImpl(), folly::NotificationQueue< folly::AsyncServerSocket::QueueMessage >::putMessagesImpl(), folly::NotificationQueue< folly::AsyncServerSocket::QueueMessage >::size(), folly::NotificationQueue< folly::AsyncServerSocket::QueueMessage >::syncSignalAndQueue(), and folly::NotificationQueue< folly::AsyncServerSocket::QueueMessage >::tryConsume().