proxygen
folly::NotificationQueue< MessageT >::Consumer Class Reference (abstract)

#include <NotificationQueue.h>

Inheritance diagram for folly::NotificationQueue< MessageT >::Consumer:
folly::DelayedDestruction folly::EventHandler folly::DelayedDestructionBase folly::AsyncServerSocket::RemoteAcceptor folly::detail::notification_queue_consumer_wrapper< MessageT, TCallback > folly::EventBase::FunctionRunner QueueConsumer

Public Types

enum  : uint16_t { kDefaultMaxReadAtOnce = 10 }
 

Public Member Functions

 Consumer ()
 
virtual void messageAvailable (MessageT &&message) noexcept=0
 
void startConsuming (EventBase *eventBase, NotificationQueue *queue)
 
void startConsumingInternal (EventBase *eventBase, NotificationQueue *queue)
 
void stopConsuming ()
 
bool consumeUntilDrained (size_t *numConsumed=nullptr) noexcept
 
NotificationQueue * getCurrentQueue () const
 
void setMaxReadAtOnce (uint32_t maxAtOnce)
 
uint32_t getMaxReadAtOnce () const
 
EventBase * getEventBase ()
 
void handlerReady (uint16_t events) noexcept override
 
template<typename TCallback >
std::unique_ptr< typename NotificationQueue< MessageT >::Consumer, DelayedDestruction::Destructor > make (TCallback &&callback)
 
- Public Member Functions inherited from folly::DelayedDestruction
bool getDestroyPending () const
 
- Public Member Functions inherited from folly::DelayedDestructionBase
virtual ~DelayedDestructionBase ()=default
 

Static Public Member Functions

template<typename TCallback >
static std::unique_ptr< Consumer, DelayedDestruction::Destructor > make (TCallback &&callback)
 

Protected Member Functions

void destroy () override
 
 ~Consumer () override
 
- Protected Member Functions inherited from folly::DelayedDestruction
 ~DelayedDestruction () override=default
 
 DelayedDestruction ()
 
- Protected Member Functions inherited from folly::DelayedDestructionBase
 DelayedDestructionBase ()
 
uint32_t getDestructorGuardCount () const
 

Private Member Functions

void consumeMessages (bool isDrain, size_t *numConsumed=nullptr) noexcept
 
void setActive (bool active, bool shouldLock=false)
 
void init (EventBase *eventBase, NotificationQueue *queue)
 
- Private Member Functions inherited from folly::EventHandler
 EventHandler (EventBase *eventBase, int fd)
 
 EventHandler (EventBase *eventBase=nullptr, NetworkSocket fd=NetworkSocket())
 
virtual ~EventHandler ()
 
bool registerHandler (uint16_t events)
 
void unregisterHandler ()
 
bool isHandlerRegistered () const
 
void attachEventBase (EventBase *eventBase)
 
void detachEventBase ()
 
void changeHandlerFD (int fd)
 
void changeHandlerFD (NetworkSocket fd)
 
void initHandler (EventBase *eventBase, int fd)
 
void initHandler (EventBase *eventBase, NetworkSocket fd)
 
uint16_t getRegisteredEvents () const
 
bool registerInternalHandler (uint16_t events)
 
bool isPending () const
 

Private Attributes

NotificationQueue * queue_
 
bool * destroyedFlagPtr_
 
uint32_t maxReadAtOnce_
 
EventBase * base_
 
bool active_ {false}
 

Additional Inherited Members

- Private Types inherited from folly::EventHandler
enum  EventFlags {
  NONE = 0, READ = EV_READ, WRITE = EV_WRITE, READ_WRITE = (READ | WRITE),
  PERSIST = EV_PERSIST
}
 

Detailed Description

template<typename MessageT>
class folly::NotificationQueue< MessageT >::Consumer

A callback interface for consuming messages from the queue as they arrive.

Definition at line 83 of file NotificationQueue.h.

Member Enumeration Documentation

template<typename MessageT>
anonymous enum : uint16_t
Enumerator
kDefaultMaxReadAtOnce 

Definition at line 85 of file NotificationQueue.h.

Constructor & Destructor Documentation

template<typename MessageT>
folly::NotificationQueue< MessageT >::Consumer::~Consumer ( )
inline override protected

Definition at line 184 of file NotificationQueue.h.

References folly::pushmi::__adl::noexcept().

184 {}

Member Function Documentation

template<typename MessageT >
void folly::NotificationQueue< MessageT >::Consumer::consumeMessages ( bool  isDrain,
size_t *  numConsumed = nullptr 
)
private noexcept

Consume messages off the queue until

  • the queue is empty (1), or
  • until the consumer is destroyed, or
  • until the consumer is uninstalled, or
  • an exception is thrown in the course of dequeueing, or
  • unless isDrain is true, until the maxReadAtOnce_ limit is hit

(1) Well, maybe. See logic/comments around "wasEmpty" in implementation.

Definition at line 691 of file NotificationQueue.h.

References folly::data(), folly::gen::move, folly::NotificationQueue< MessageT >::queue_, SCOPE_EXIT, uint32_t, and UNLIKELY.

693  {
694  DestructorGuard dg(this);
695  uint32_t numProcessed = 0;
696  setActive(true);
697  SCOPE_EXIT {
698  if (queue_) {
700  }
701  };
702  SCOPE_EXIT {
703  setActive(false, /* shouldLock = */ true);
704  };
705  SCOPE_EXIT {
706  if (numConsumed != nullptr) {
707  *numConsumed = numProcessed;
708  }
709  };
710  while (true) {
711  // Now pop the message off of the queue.
712  //
713  // We have to manually acquire and release the spinlock here, rather than
714  // using SpinLockHolder since the MessageT has to be constructed while
715  // holding the spinlock and available after we release it. SpinLockHolder
716  // unfortunately doesn't provide a release() method. (We can't construct
717  // MessageT first since we have no guarantee that MessageT has a default
718  // constructor.
719  queue_->spinlock_.lock();
720  bool locked = true;
721 
722  try {
723  if (UNLIKELY(queue_->queue_.empty())) {
724  // If there is no message, we've reached the end of the queue, return.
725  setActive(false);
727  return;
728  }
729 
730  // Pull a message off the queue.
731  std::unique_ptr<Node> data;
732  data.reset(&queue_->queue_.front());
733  queue_->queue_.pop_front();
734 
735  // Check to see if the queue is empty now.
736  // We use this as an optimization to see if we should bother trying to
737  // loop again and read another message after invoking this callback.
738  bool wasEmpty = queue_->queue_.empty();
739  if (wasEmpty) {
740  setActive(false);
741  }
742 
743  // Now unlock the spinlock before we invoke the callback.
745  RequestContextScopeGuard rctx(std::move(data->ctx_));
746 
747  locked = false;
748 
749  // Call the callback
750  bool callbackDestroyed = false;
751  CHECK(destroyedFlagPtr_ == nullptr);
752  destroyedFlagPtr_ = &callbackDestroyed;
753  messageAvailable(std::move(data->msg_));
754  destroyedFlagPtr_ = nullptr;
755 
756  // If the callback was destroyed before it returned, we are done
757  if (callbackDestroyed) {
758  return;
759  }
760 
761  // If the callback is no longer installed, we are done.
762  if (queue_ == nullptr) {
763  return;
764  }
765 
766  // If we have hit maxReadAtOnce_, we are done.
767  ++numProcessed;
768  if (!isDrain && maxReadAtOnce_ > 0 && numProcessed >= maxReadAtOnce_) {
769  return;
770  }
771 
772  // If the queue was empty before we invoked the callback, it's probable
773  // that it is still empty now. Just go ahead and return, rather than
774  // looping again and trying to re-read from the eventfd. (If a new
775  // message had in fact arrived while we were invoking the callback, we
776  // will simply be woken up the next time around the event loop and will
777  // process the message then.)
778  if (wasEmpty) {
779  return;
780  }
781  } catch (const std::exception&) {
782  // This catch block is really just to handle the case where the MessageT
783  // constructor throws. The messageAvailable() callback itself is
784  // declared as noexcept and should never throw.
785  //
786  // If the MessageT constructor does throw we try to handle it as best as
787  // we can, but we can't work miracles. We will just ignore the error for
788  // now and return. The next time around the event loop we will end up
789  // trying to read the message again. If MessageT continues to throw we
790  // will never make forward progress and will keep trying each time around
791  // the event loop.
792  if (locked) {
793  // Unlock the spinlock.
795  }
796 
797  return;
798  }
799  }
800 }
constexpr detail::Map< Move > move
Definition: Base-inl.h:2567
#define SCOPE_EXIT
Definition: ScopeGuard.h:274
FOLLY_ALWAYS_INLINE void unlock() const noexcept
Definition: SpinLock.h:52
constexpr auto data(C &c) -> decltype(c.data())
Definition: Access.h:71
void setActive(bool active, bool shouldLock=false)
FOLLY_ALWAYS_INLINE void lock() const noexcept
Definition: SpinLock.h:49
virtual void messageAvailable(MessageT &&message) noexcept=0
boost::intrusive::slist< Node, boost::intrusive::cache_last< true > > queue_
#define UNLIKELY(x)
Definition: Likely.h:48
template<typename MessageT >
bool folly::NotificationQueue< MessageT >::Consumer::consumeUntilDrained ( size_t *  numConsumed = nullptr)
noexcept

Consume messages off the queue until it is empty. No messages may be added to the queue while it is draining, so that the process is bounded. To that end, putMessage/tryPutMessage will throw a std::runtime_error, and tryPutMessageNoThrow will return false.

Returns
true if the queue was drained, false otherwise. In practice, this will only fail if someone else is already draining the queue.

Definition at line 848 of file NotificationQueue.h.

References g(), and folly::NotificationQueue< MessageT >::queue_.

Referenced by TEST().

849  {
850  DestructorGuard dg(this);
851  {
853  if (queue_->draining_) {
854  return false;
855  }
856  queue_->draining_ = true;
857  }
858  consumeMessages(true, numConsumed);
859  {
861  queue_->draining_ = false;
862  }
863  return true;
864 }
void consumeMessages(bool isDrain, size_t *numConsumed=nullptr) noexcept
g_t g(f_t)
template<typename MessageT >
void folly::NotificationQueue< MessageT >::Consumer::destroy ( )
override protected virtual

destroy() requests destruction of the object.

This method will destroy the object after it has no more functions running higher up on the stack. (i.e., No more DestructorGuard objects exist for this object.) This method must be used instead of the destructor.

Reimplemented from folly::DelayedDestruction.

Definition at line 672 of file NotificationQueue.h.

References folly::DelayedDestruction::destroy().

672  {
673  // If we are in the middle of a call to handlerReady(), destroyedFlagPtr_
674  // will be non-nullptr. Mark the value that it points to, so that
675  // handlerReady() will know the callback is destroyed, and that it cannot
676  // access any member variables anymore.
677  if (destroyedFlagPtr_) {
678  *destroyedFlagPtr_ = true;
679  }
680  stopConsuming();
682 }
template<typename MessageT>
NotificationQueue* folly::NotificationQueue< MessageT >::Consumer::getCurrentQueue ( ) const
inline

Get the NotificationQueue that this consumer is currently consuming messages from. Returns nullptr if the consumer is not currently consuming events from any queue.

Definition at line 154 of file NotificationQueue.h.

References folly::NotificationQueue< MessageT >::queue_.

154  {
155  return queue_;
156  }
template<typename MessageT>
EventBase* folly::NotificationQueue< MessageT >::Consumer::getEventBase ( )
inline

Definition at line 175 of file NotificationQueue.h.

References base_, destroy(), and uint16_t.

Referenced by folly::EventBase::FunctionRunner::messageAvailable().

175  {
176  return base_;
177  }
template<typename MessageT>
uint32_t folly::NotificationQueue< MessageT >::Consumer::getMaxReadAtOnce ( ) const
inline

Definition at line 171 of file NotificationQueue.h.

171  {
172  return maxReadAtOnce_;
173  }
template<typename MessageT >
void folly::NotificationQueue< MessageT >::Consumer::handlerReady ( uint16_t  events)
override virtual noexcept

handlerReady() is invoked when the handler is ready.

Parameters
events: A bitset indicating the events that are ready.

Implements folly::EventHandler.

Definition at line 685 of file NotificationQueue.h.

686  {
687  consumeMessages(false);
688 }
void consumeMessages(bool isDrain, size_t *numConsumed=nullptr) noexcept
template<typename MessageT >
void folly::NotificationQueue< MessageT >::Consumer::init ( EventBase *  eventBase,
NotificationQueue *  queue 
)
private

Definition at line 803 of file NotificationQueue.h.

References base_, folly::EventBase::dcheckIsInEventBaseThread(), g(), and folly::NotificationQueue< MessageT >::queue_.

805  {
806  eventBase->dcheckIsInEventBaseThread();
807  assert(queue_ == nullptr);
808  assert(!isHandlerRegistered());
809  queue->checkPid();
810 
811  base_ = eventBase;
812 
813  queue_ = queue;
814 
815  {
818  }
819  queue_->ensureSignal();
820 
821  if (queue_->eventfd_ >= 0) {
822  initHandler(eventBase, queue_->eventfd_);
823  } else {
824  initHandler(eventBase, queue_->pipeFds_[0]);
825  }
826 }
void initHandler(EventBase *eventBase, int fd)
Definition: EventHandler.h:156
g_t g(f_t)
bool isHandlerRegistered() const
Definition: EventHandler.h:112
template<typename MessageT>
template<typename TCallback >
static std::unique_ptr<Consumer, DelayedDestruction::Destructor> folly::NotificationQueue< MessageT >::Consumer::make ( TCallback &&  callback)
static
template<typename MessageT>
template<typename TCallback >
std::unique_ptr< typename NotificationQueue<MessageT>::Consumer, DelayedDestruction::Destructor> folly::NotificationQueue< MessageT >::Consumer::make ( TCallback &&  callback)

Definition at line 931 of file NotificationQueue.h.

References type.

931  {
932  return std::unique_ptr<
933  NotificationQueue<MessageT>::Consumer,
934  DelayedDestruction::Destructor>(
935  new detail::notification_queue_consumer_wrapper<
936  MessageT,
937  typename std::decay<TCallback>::type>(
938  std::forward<TCallback>(callback)));
939 }
PskType type
template<typename MessageT>
virtual void folly::NotificationQueue< MessageT >::Consumer::messageAvailable ( MessageT &&  message)
pure virtual noexcept

messageAvailable() will be invoked whenever a new message is available from the pipe.

Implemented in folly::detail::notification_queue_consumer_wrapper< MessageT, TCallback >.

template<typename MessageT>
void folly::NotificationQueue< MessageT >::Consumer::setActive ( bool  active,
bool  shouldLock = false 
)
inline private

Definition at line 199 of file NotificationQueue.h.

References folly::init(), and folly::NotificationQueue< MessageT >::queue_.

199  {
200  if (!queue_) {
201  active_ = active;
202  return;
203  }
204  if (shouldLock) {
205  queue_->spinlock_.lock();
206  }
207  if (!active_ && active) {
209  } else if (active_ && !active) {
211  }
212  active_ = active;
213  if (shouldLock) {
215  }
216  }
FOLLY_ALWAYS_INLINE void unlock() const noexcept
Definition: SpinLock.h:52
std::atomic< int > numActiveConsumers_
FOLLY_ALWAYS_INLINE void lock() const noexcept
Definition: SpinLock.h:49
template<typename MessageT>
void folly::NotificationQueue< MessageT >::Consumer::setMaxReadAtOnce ( uint32_t  maxAtOnce)
inline

Set a limit on how many messages this consumer will read each iteration around the event loop.

This helps rate-limit how much work the Consumer will do each event loop iteration, to prevent it from starving other event handlers.

A limit of 0 means no limit will be enforced. If unset, the limit defaults to kDefaultMaxReadAtOnce (defined to 10 above).

Definition at line 168 of file NotificationQueue.h.

Referenced by QueueTest::maxReadAtOnce(), folly::AsyncServerSocket::RemoteAcceptor::start(), and TEST().

168  {
169  maxReadAtOnce_ = maxAtOnce;
170  }
template<typename MessageT>
void folly::NotificationQueue< MessageT >::Consumer::startConsuming ( EventBase *  eventBase,
NotificationQueue *  queue 
)
inline

Begin consuming messages from the specified queue.

messageAvailable() will be called whenever a message is available. This consumer will continue to consume messages until stopConsuming() is called.

A Consumer may only consume messages from a single NotificationQueue at a time. startConsuming() should not be called if this consumer is already consuming.

Definition at line 114 of file NotificationQueue.h.

References folly::init().

Referenced by QueueTest::maxReadAtOnce(), QueueTest::multiConsumer(), QueueTest::putMessages(), QueueTest::sendOne(), and TEST().

114  {
115  init(eventBase, queue);
117  }
void init(EventBase *eventBase, NotificationQueue *queue)
bool registerHandler(uint16_t events)
Definition: EventHandler.h:100
template<typename MessageT>
void folly::NotificationQueue< MessageT >::Consumer::startConsumingInternal ( EventBase *  eventBase,
NotificationQueue *  queue 
)
inline

Same as above but registers this event handler as internal so that it doesn't count towards the pending reader count for the IOLoop.

Definition at line 123 of file NotificationQueue.h.

References folly::init(), and folly::pushmi::__adl::noexcept().

125  {
126  init(eventBase, queue);
128  }
bool registerInternalHandler(uint16_t events)
Definition: EventHandler.h:183
void init(EventBase *eventBase, NotificationQueue *queue)
template<typename MessageT >
void folly::NotificationQueue< MessageT >::Consumer::stopConsuming ( )

Stop consuming messages.

startConsuming() may be called again to resume consumption of messages at a later point in time.

Definition at line 829 of file NotificationQueue.h.

References g(), and folly::NotificationQueue< MessageT >::queue_.

Referenced by QueueTest::multiConsumer(), QueueTest::putMessages(), and QueueTest::sendOne().

829  {
830  if (queue_ == nullptr) {
831  assert(!isHandlerRegistered());
832  return;
833  }
834 
835  {
838  setActive(false);
839  }
840 
841  assert(isHandlerRegistered());
843  detachEventBase();
844  queue_ = nullptr;
845 }
void setActive(bool active, bool shouldLock=false)
g_t g(f_t)
bool isHandlerRegistered() const
Definition: EventHandler.h:112

Member Data Documentation

template<typename MessageT>
bool folly::NotificationQueue< MessageT >::Consumer::active_ {false}
private

Definition at line 223 of file NotificationQueue.h.

template<typename MessageT>
EventBase* folly::NotificationQueue< MessageT >::Consumer::base_
private

Definition at line 222 of file NotificationQueue.h.

template<typename MessageT>
bool* folly::NotificationQueue< MessageT >::Consumer::destroyedFlagPtr_
private

Definition at line 220 of file NotificationQueue.h.

template<typename MessageT>
uint32_t folly::NotificationQueue< MessageT >::Consumer::maxReadAtOnce_
private

Definition at line 221 of file NotificationQueue.h.

template<typename MessageT>
NotificationQueue* folly::NotificationQueue< MessageT >::Consumer::queue_
private

Definition at line 219 of file NotificationQueue.h.


The documentation for this class was generated from the following file: