proxygen
folly::NotificationQueue< MessageT > Class Template Reference

#include <EventBase.h>

Classes

class  Consumer
 
struct  Node
 
class  SimpleConsumer
 

Public Types

enum  FdType { FdType::PIPE }
 

Public Member Functions

 NotificationQueue (uint32_t maxSize=0, FdType fdType=FdType::PIPE)
 
 ~NotificationQueue ()
 
void setMaxQueueSize (uint32_t max)
 
template<typename MessageTT >
void tryPutMessage (MessageTT &&message)
 
template<typename MessageTT >
bool tryPutMessageNoThrow (MessageTT &&message)
 
template<typename MessageTT >
void putMessage (MessageTT &&message)
 
template<typename InputIteratorT >
void putMessages (InputIteratorT first, InputIteratorT last)
 
bool tryConsume (MessageT &result)
 
size_t size () const
 
void checkPid () const
 

Private Member Functions

 NotificationQueue (NotificationQueue const &)=delete
 
NotificationQueueoperator= (NotificationQueue const &)=delete
 
bool checkQueueSize (size_t maxSize, bool throws=true) const
 
bool checkDraining (bool throws=true)
 
void ensureSignalLocked () const
 
void drainSignalsLocked ()
 
void ensureSignal () const
 
void syncSignalAndQueue ()
 
template<typename MessageTT >
bool putMessageImpl (MessageTT &&message, size_t maxSize, bool throws=true)
 
template<typename InputIteratorT >
void putMessagesImpl (InputIteratorT first, InputIteratorT last, std::input_iterator_tag)
 

Private Attributes

folly::SpinLock spinlock_
 
bool signal_ {false}
 
int eventfd_
 
int pipeFds_ [2]
 
uint32_t advisoryMaxQueueSize_
 
pid_t pid_
 
boost::intrusive::slist< Node, boost::intrusive::cache_last< true > > queue_
 
int numConsumers_ {0}
 
std::atomic< int > numActiveConsumers_ {0}
 
bool draining_ {false}
 

Detailed Description

template<typename MessageT>
class folly::NotificationQueue< MessageT >

A producer-consumer queue for passing messages between EventBase threads.

Messages can be added to the queue from any thread. Multiple consumers may listen to the queue from multiple EventBase threads.

A NotificationQueue may not be destroyed while there are still consumers registered to receive events from the queue. It is the user's responsibility to ensure that all consumers are unregistered before the queue is destroyed.

MessageT should be MoveConstructible (i.e., must support either a move constructor or a copy constructor, or both). Ideally it's move constructor (or copy constructor if no move constructor is provided) should never throw exceptions. If the constructor may throw, the consumers could end up spinning trying to move a message off the queue and failing, and then retrying.

Definition at line 56 of file EventBase.h.

Member Enumeration Documentation

template<typename MessageT>
enum folly::NotificationQueue::FdType
strong
Enumerator
PIPE 

Definition at line 247 of file NotificationQueue.h.

247  {
248  PIPE,
249 #ifdef FOLLY_HAVE_EVENTFD
250  EVENTFD,
251 #endif
252  };

Constructor & Destructor Documentation

template<typename MessageT>
folly::NotificationQueue< MessageT >::NotificationQueue ( uint32_t  maxSize = 0,
FdType  fdType = FdType::PIPE 
)
inlineexplicit

Create a new NotificationQueue.

If the maxSize parameter is specified, this sets the maximum queue size that will be enforced by tryPutMessage(). (This size is advisory, and may be exceeded if producers explicitly use putMessage() instead of tryPutMessage().)

The fdType parameter determines the type of file descriptor used internally to signal message availability. The default (eventfd) is preferable for performance and because it won't fail when the queue gets too long. It is not available on on older and non-linux kernels, however. In this case the code will fall back to using a pipe, the parameter is mostly for testing purposes.

Definition at line 269 of file NotificationQueue.h.

Referenced by folly::NotificationQueue< folly::AsyncServerSocket::QueueMessage >::checkPid().

276  : eventfd_(-1),
277  pipeFds_{-1, -1},
278  advisoryMaxQueueSize_(maxSize),
279  pid_(pid_t(getpid())) {
280 
281 #ifdef FOLLY_HAVE_EVENTFD
282  if (fdType == FdType::EVENTFD) {
284  if (eventfd_ == -1) {
285  if (errno == ENOSYS || errno == EINVAL) {
286  // eventfd not availalble
287  LOG(ERROR) << "failed to create eventfd for NotificationQueue: "
288  << errno << ", falling back to pipe mode (is your kernel "
289  << "> 2.6.30?)";
290  fdType = FdType::PIPE;
291  } else {
292  // some other error
294  "Failed to create eventfd for "
295  "NotificationQueue",
296  errno);
297  }
298  }
299  }
300 #endif
301  if (fdType == FdType::PIPE) {
302  if (pipe(pipeFds_)) {
304  "Failed to create pipe for NotificationQueue", errno);
305  }
306  try {
307  // put both ends of the pipe into non-blocking mode
308  if (fcntl(pipeFds_[0], F_SETFL, O_RDONLY | O_NONBLOCK) != 0) {
310  "failed to put NotificationQueue pipe read "
311  "endpoint into non-blocking mode",
312  errno);
313  }
314  if (fcntl(pipeFds_[1], F_SETFL, O_WRONLY | O_NONBLOCK) != 0) {
316  "failed to put NotificationQueue pipe write "
317  "endpoint into non-blocking mode",
318  errno);
319  }
320  } catch (...) {
321  ::close(pipeFds_[0]);
322  ::close(pipeFds_[1]);
323  throw;
324  }
325  }
326  }
#define eventfd(initval, flags)
void throwSystemError(Args &&...args)
Definition: Exception.h:76
int close(NetworkSocket s)
Definition: NetOps.cpp:90
void pipe(CPUExecutor cpu, IOExecutor io)
template<typename MessageT>
folly::NotificationQueue< MessageT >::~NotificationQueue ( )
inline

Definition at line 328 of file NotificationQueue.h.

328  {
329  std::unique_ptr<Node> data;
330  while (!queue_.empty()) {
331  data.reset(&queue_.front());
332  queue_.pop_front();
333  }
334  if (eventfd_ >= 0) {
335  ::close(eventfd_);
336  eventfd_ = -1;
337  }
338  if (pipeFds_[0] >= 0) {
339  ::close(pipeFds_[0]);
340  pipeFds_[0] = -1;
341  }
342  if (pipeFds_[1] >= 0) {
343  ::close(pipeFds_[1]);
344  pipeFds_[1] = -1;
345  }
346  }
constexpr auto data(C &c) -> decltype(c.data())
Definition: Access.h:71
boost::intrusive::slist< Node, boost::intrusive::cache_last< true > > queue_
int close(NetworkSocket s)
Definition: NetOps.cpp:90
template<typename MessageT>
folly::NotificationQueue< MessageT >::NotificationQueue ( NotificationQueue< MessageT > const &  )
privatedelete

Member Function Documentation

template<typename MessageT>
bool folly::NotificationQueue< MessageT >::checkDraining ( bool  throws = true)
inlineprivate

Definition at line 496 of file NotificationQueue.h.

Referenced by folly::NotificationQueue< folly::AsyncServerSocket::QueueMessage >::putMessageImpl(), and folly::NotificationQueue< folly::AsyncServerSocket::QueueMessage >::putMessagesImpl().

496  {
497  if (UNLIKELY(draining_ && throws)) {
498  throw std::runtime_error("queue is draining, cannot add message");
499  }
500  return draining_;
501  }
#define UNLIKELY(x)
Definition: Likely.h:48
template<typename MessageT>
void folly::NotificationQueue< MessageT >::checkPid ( ) const
inline

Check that the NotificationQueue is being used from the correct process.

If you create a NotificationQueue in one process, then fork, and try to send messages to the queue from the child process, you're going to have a bad time. Unfortunately users have (accidentally) run into this.

Because we use an eventfd/pipe, the child process can actually signal the parent process that an event is ready. However, it can't put anything on the parent's queue, so the parent wakes up and finds an empty queue. This check ensures that we catch the problem in the misbehaving child process code, and crash before signalling the parent process.

Definition at line 474 of file NotificationQueue.h.

Referenced by folly::NotificationQueue< folly::AsyncServerSocket::QueueMessage >::putMessageImpl(), folly::NotificationQueue< folly::AsyncServerSocket::QueueMessage >::putMessagesImpl(), and folly::NotificationQueue< folly::AsyncServerSocket::QueueMessage >::tryConsume().

474  {
475  CHECK_EQ(pid_, pid_t(getpid()));
476  }
template<typename MessageT>
bool folly::NotificationQueue< MessageT >::checkQueueSize ( size_t  maxSize,
bool  throws = true 
) const
inlineprivate

Definition at line 483 of file NotificationQueue.h.

Referenced by folly::NotificationQueue< folly::AsyncServerSocket::QueueMessage >::putMessageImpl().

483  {
484  DCHECK(0 == spinlock_.try_lock());
485  if (maxSize > 0 && queue_.size() >= maxSize) {
486  if (throws) {
487  throw std::overflow_error(
488  "unable to add message to NotificationQueue: "
489  "queue is full");
490  }
491  return false;
492  }
493  return true;
494  }
FOLLY_ALWAYS_INLINE bool try_lock() const noexcept
Definition: SpinLock.h:55
boost::intrusive::slist< Node, boost::intrusive::cache_last< true > > queue_
template<typename MessageT>
void folly::NotificationQueue< MessageT >::drainSignalsLocked ( )
inlineprivate

Definition at line 553 of file NotificationQueue.h.

Referenced by folly::NotificationQueue< folly::AsyncServerSocket::QueueMessage >::syncSignalAndQueue().

553  {
554  ssize_t bytes_read = 0;
555  if (eventfd_ > 0) {
557  bytes_read = readNoInt(eventfd_, &message, sizeof(message));
558  CHECK(bytes_read != -1 || errno == EAGAIN);
559  } else {
560  // There should only be one byte in the pipe. To avoid potential leaks we
561  // still drain.
562  uint8_t message[32];
563  ssize_t result;
564  while ((result = readNoInt(pipeFds_[0], &message, sizeof(message))) !=
565  -1) {
566  bytes_read += result;
567  }
568  CHECK(result == -1 && errno == EAGAIN);
569  LOG_IF(ERROR, bytes_read > 1)
570  << "[NotificationQueue] Unexpected state while draining pipe: bytes_read="
571  << bytes_read << " bytes, expected <= 1";
572  }
573  LOG_IF(ERROR, (signal_ && bytes_read == 0) || (!signal_ && bytes_read > 0))
574  << "[NotificationQueue] Unexpected state while draining signals: signal_="
575  << signal_ << " bytes_read=" << bytes_read;
576 
577  signal_ = false;
578 
579 #ifdef __ANDROID__
580  if (bytes_read > 0) {
581  eventBytes_ -= bytes_read;
582  }
583 #endif
584  }
ssize_t readNoInt(int fd, void *buf, size_t count)
Definition: FileUtil.cpp:102
std::string message
Definition: SPDYCodec.cpp:133
template<typename MessageT>
void folly::NotificationQueue< MessageT >::ensureSignal ( ) const
inlineprivate

Definition at line 586 of file NotificationQueue.h.

586  {
589  }
g_t g(f_t)
template<typename MessageT>
void folly::NotificationQueue< MessageT >::ensureSignalLocked ( ) const
inlineprivate

Definition at line 509 of file NotificationQueue.h.

Referenced by folly::NotificationQueue< folly::AsyncServerSocket::QueueMessage >::ensureSignal(), folly::NotificationQueue< folly::AsyncServerSocket::QueueMessage >::putMessageImpl(), folly::NotificationQueue< folly::AsyncServerSocket::QueueMessage >::putMessagesImpl(), and folly::NotificationQueue< folly::AsyncServerSocket::QueueMessage >::syncSignalAndQueue().

509  {
510  // semantics: empty fd == empty queue <=> !signal_
511  if (signal_) {
512  return;
513  }
514 
515  ssize_t bytes_written = 0;
516  size_t bytes_expected = 0;
517 
518  do {
519  if (eventfd_ >= 0) {
520  // eventfd(2) dictates that we must write a 64-bit integer
521  uint64_t signal = 1;
522  bytes_expected = sizeof(signal);
523  bytes_written = ::write(eventfd_, &signal, bytes_expected);
524  } else {
525  uint8_t signal = 1;
526  bytes_expected = sizeof(signal);
527  bytes_written = ::write(pipeFds_[1], &signal, bytes_expected);
528  }
529  } while (bytes_written == -1 && errno == EINTR);
530 
531 #ifdef __ANDROID__
532  if (bytes_written > 0) {
533  eventBytes_ += bytes_written;
534  maxEventBytes_ = std::max((int)maxEventBytes_, (int)eventBytes_);
535  }
536 #endif
537 
538  if (bytes_written == ssize_t(bytes_expected)) {
539  signal_ = true;
540  } else {
541 #ifdef __ANDROID__
542  LOG(ERROR) << "NotificationQueue Write Error=" << errno
543  << " bytesInPipe=" << eventBytes_
544  << " maxInPipe=" << maxEventBytes_ << " queue=" << size();
545 #endif
547  "failed to signal NotificationQueue after "
548  "write",
549  errno);
550  }
551  }
void write(const T &in, folly::io::Appender &appender)
Definition: Types-inl.h:112
LogLevel max
Definition: LogLevel.cpp:31
void throwSystemError(Args &&...args)
Definition: Exception.h:76
template<typename MessageT>
NotificationQueue& folly::NotificationQueue< MessageT >::operator= ( NotificationQueue< MessageT > const &  )
privatedelete
template<typename MessageT>
template<typename MessageTT >
void folly::NotificationQueue< MessageT >::putMessage ( MessageTT &&  message)
inline

Unconditionally put a message on the queue.

This method is like tryPutMessage(), but ignores the maximum queue size and always puts the message on the queue, even if the maximum queue size would be exceeded.

putMessage() may throw

  • std::bad_alloc if memory allocation fails, and may
  • std::runtime_error if the queue is currently draining
  • any other exception thrown by the MessageT move/copy constructor.

Definition at line 408 of file NotificationQueue.h.

Referenced by TEST().

408  {
409  putMessageImpl(std::forward<MessageTT>(message), 0);
410  }
Definition: test.c:42
bool putMessageImpl(MessageTT &&message, size_t maxSize, bool throws=true)
template<typename MessageT>
template<typename MessageTT >
bool folly::NotificationQueue< MessageT >::putMessageImpl ( MessageTT &&  message,
size_t  maxSize,
bool  throws = true 
)
inlineprivate

Definition at line 602 of file NotificationQueue.h.

Referenced by folly::NotificationQueue< folly::AsyncServerSocket::QueueMessage >::putMessage(), folly::NotificationQueue< folly::AsyncServerSocket::QueueMessage >::tryPutMessage(), and folly::NotificationQueue< folly::AsyncServerSocket::QueueMessage >::tryPutMessageNoThrow().

602  {
603  checkPid();
604  bool signal = false;
605  {
606  auto data = std::make_unique<Node>(
607  std::forward<MessageTT>(message), RequestContext::saveContext());
609  if (checkDraining(throws) || !checkQueueSize(maxSize, throws)) {
610  return false;
611  }
612  // We only need to signal an event if not all consumers are
613  // awake.
615  signal = true;
616  }
617  queue_.push_back(*data.release());
618  if (signal) {
620  }
621  }
622  return true;
623  }
std::atomic< int > numActiveConsumers_
static std::shared_ptr< RequestContext > saveContext()
Definition: Request.h:196
constexpr auto data(C &c) -> decltype(c.data())
Definition: Access.h:71
std::string message
Definition: SPDYCodec.cpp:133
bool checkQueueSize(size_t maxSize, bool throws=true) const
g_t g(f_t)
boost::intrusive::slist< Node, boost::intrusive::cache_last< true > > queue_
bool checkDraining(bool throws=true)
template<typename MessageT>
template<typename InputIteratorT >
void folly::NotificationQueue< MessageT >::putMessages ( InputIteratorT  first,
InputIteratorT  last 
)
inline

Put several messages on the queue.

Definition at line 416 of file NotificationQueue.h.

Referenced by TEST().

416  {
417  typedef typename std::iterator_traits<InputIteratorT>::iterator_category
418  IterCategory;
419  putMessagesImpl(first, last, IterCategory());
420  }
void putMessagesImpl(InputIteratorT first, InputIteratorT last, std::input_iterator_tag)
constexpr detail::First first
Definition: Base-inl.h:2553
template<typename MessageT>
template<typename InputIteratorT >
void folly::NotificationQueue< MessageT >::putMessagesImpl ( InputIteratorT  first,
InputIteratorT  last,
std::input_iterator_tag   
)
inlineprivate

Definition at line 626 of file NotificationQueue.h.

Referenced by folly::NotificationQueue< folly::AsyncServerSocket::QueueMessage >::putMessages().

629  {
630  checkPid();
631  bool signal = false;
632  boost::intrusive::slist<Node, boost::intrusive::cache_last<true>> q;
633  try {
634  while (first != last) {
635  auto data = std::make_unique<Node>(
637  q.push_back(*data.release());
638  ++first;
639  }
641  checkDraining();
642  queue_.splice(queue_.end(), q);
644  signal = true;
645  }
646  if (signal) {
648  }
649  } catch (...) {
650  std::unique_ptr<Node> data;
651  while (!q.empty()) {
652  data.reset(&q.front());
653  q.pop_front();
654  }
655  throw;
656  }
657  }
constexpr detail::Map< Move > move
Definition: Base-inl.h:2567
std::atomic< int > numActiveConsumers_
static std::shared_ptr< RequestContext > saveContext()
Definition: Request.h:196
constexpr auto data(C &c) -> decltype(c.data())
Definition: Access.h:71
g_t g(f_t)
boost::intrusive::slist< Node, boost::intrusive::cache_last< true > > queue_
bool checkDraining(bool throws=true)
constexpr detail::First first
Definition: Base-inl.h:2553
template<typename MessageT>
void folly::NotificationQueue< MessageT >::setMaxQueueSize ( uint32_t  max)
inline

Set the advisory maximum queue size.

This maximum queue size affects calls to tryPutMessage(). Message producers can still use the putMessage() call to unconditionally put a message on the queue, ignoring the configured maximum queue size. This can cause the queue size to exceed the configured maximum.

Definition at line 356 of file NotificationQueue.h.

356  {
358  }
LogLevel max
Definition: LogLevel.cpp:31
template<typename MessageT>
size_t folly::NotificationQueue< MessageT >::size ( ) const
inline

Definition at line 456 of file NotificationQueue.h.

Referenced by folly::NotificationQueue< folly::AsyncServerSocket::QueueMessage >::ensureSignalLocked().

456  {
458  return queue_.size();
459  }
g_t g(f_t)
boost::intrusive::slist< Node, boost::intrusive::cache_last< true > > queue_
template<typename MessageT>
void folly::NotificationQueue< MessageT >::syncSignalAndQueue ( )
inlineprivate

Definition at line 591 of file NotificationQueue.h.

Referenced by folly::NotificationQueue< folly::AsyncServerSocket::QueueMessage >::tryConsume().

591  {
593 
594  if (queue_.empty()) {
596  } else {
598  }
599  }
g_t g(f_t)
boost::intrusive::slist< Node, boost::intrusive::cache_last< true > > queue_
template<typename MessageT>
bool folly::NotificationQueue< MessageT >::tryConsume ( MessageT &  result)
inline

Try to immediately pull a message off of the queue, without blocking.

If a message is immediately available, the result parameter will be updated to contain the message contents and true will be returned.

If no message is available, false will be returned and result will be left unmodified.

Definition at line 431 of file NotificationQueue.h.

431  {
432  SCOPE_EXIT {
434  };
435 
436  checkPid();
437  std::unique_ptr<Node> data;
438 
439  {
441 
442  if (UNLIKELY(queue_.empty())) {
443  return false;
444  }
445 
446  data.reset(&queue_.front());
447  queue_.pop_front();
448  }
449 
450  result = std::move(data->msg_);
452 
453  return true;
454  }
std::shared_ptr< RequestContext > ctx_
static std::shared_ptr< RequestContext > setContext(std::shared_ptr< RequestContext > ctx)
Definition: Request.cpp:227
constexpr detail::Map< Move > move
Definition: Base-inl.h:2567
#define SCOPE_EXIT
Definition: ScopeGuard.h:274
constexpr auto data(C &c) -> decltype(c.data())
Definition: Access.h:71
g_t g(f_t)
boost::intrusive::slist< Node, boost::intrusive::cache_last< true > > queue_
#define UNLIKELY(x)
Definition: Likely.h:48
template<typename MessageT>
template<typename MessageTT >
void folly::NotificationQueue< MessageT >::tryPutMessage ( MessageTT &&  message)
inline

Attempt to put a message on the queue if the queue is not already full.

If the queue is full, a std::overflow_error will be thrown. The setMaxQueueSize() function controls the maximum queue size.

If the queue is currently draining, an std::runtime_error will be thrown.

This method may contend briefly on a spinlock if many threads are concurrently accessing the queue, but for all intents and purposes it will immediately place the message on the queue and return.

tryPutMessage() may throw std::bad_alloc if memory allocation fails, and may throw any other exception thrown by the MessageT move/copy constructor.

Definition at line 377 of file NotificationQueue.h.

Referenced by TEST().

377  {
378  putMessageImpl(std::forward<MessageTT>(message), advisoryMaxQueueSize_);
379  }
Definition: test.c:42
bool putMessageImpl(MessageTT &&message, size_t maxSize, bool throws=true)
template<typename MessageT>
template<typename MessageTT >
bool folly::NotificationQueue< MessageT >::tryPutMessageNoThrow ( MessageTT &&  message)
inline

No-throw versions of the above. Instead returns true on success, false on failure.

Only std::overflow_error (the common exception case) and std::runtime_error (which indicates that the queue is being drained) are prevented from being thrown. User code must still catch std::bad_alloc errors.

Definition at line 390 of file NotificationQueue.h.

Referenced by TEST().

390  {
391  return putMessageImpl(
392  std::forward<MessageTT>(message), advisoryMaxQueueSize_, false);
393  }
Definition: test.c:42
bool putMessageImpl(MessageTT &&message, size_t maxSize, bool throws=true)

Member Data Documentation

template<typename MessageT>
bool folly::NotificationQueue< MessageT >::draining_ {false}
private
template<typename MessageT>
std::atomic<int> folly::NotificationQueue< MessageT >::numActiveConsumers_ {0}
private

The documentation for this class was generated from the following files: