19 #include <sys/types.h> 27 #include <boost/intrusive/slist.hpp> 41 #include <glog/logging.h> 43 #if __linux__ && !__ANDROID__ 44 #define FOLLY_HAVE_EVENTFD 68 template <
typename MessageT>
69 class NotificationQueue {
70 struct Node :
public boost::intrusive::slist_base_hook<
71 boost::intrusive::cache_last<true>> {
72 template <
typename MessageTT>
73 Node(MessageTT&& msg, std::shared_ptr<RequestContext> ctx)
76 std::shared_ptr<RequestContext>
ctx_;
85 enum :
uint16_t { kDefaultMaxReadAtOnce = 10 };
90 maxReadAtOnce_(kDefaultMaxReadAtOnce) {}
93 template <
typename TCallback>
94 static std::unique_ptr<Consumer, DelayedDestruction::Destructor> make(
95 TCallback&& callback);
115 init(eventBase, queue);
116 registerHandler(READ | PERSIST);
126 init(eventBase, queue);
127 registerInternalHandler(READ | PERSIST);
136 void stopConsuming();
147 bool consumeUntilDrained(
size_t* numConsumed =
nullptr)
noexcept;
169 maxReadAtOnce_ = maxAtOnce;
172 return maxReadAtOnce_;
179 void handlerReady(
uint16_t events) noexcept
override;
197 void consumeMessages(
bool isDrain,
size_t* numConsumed =
nullptr)
noexcept;
207 if (!active_ && active) {
208 ++
queue_->numActiveConsumers_;
209 }
else if (active_ && !active) {
210 --
queue_->numActiveConsumers_;
214 queue_->spinlock_.unlock();
240 template <
typename F>
241 void consumeUntilDrained(F&&
foreach);
249 #ifdef FOLLY_HAVE_EVENTFD 271 #ifdef FOLLY_HAVE_EVENTFD
272 FdType fdType = FdType::EVENTFD)
279 pid_(pid_t(getpid())) {
281 #ifdef FOLLY_HAVE_EVENTFD 282 if (fdType == FdType::EVENTFD) {
285 if (errno == ENOSYS || errno == EINVAL) {
287 LOG(ERROR) <<
"failed to create eventfd for NotificationQueue: " 288 << errno <<
", falling back to pipe mode (is your kernel " 294 "Failed to create eventfd for " 304 "Failed to create pipe for NotificationQueue", errno);
308 if (fcntl(
pipeFds_[0], F_SETFL, O_RDONLY | O_NONBLOCK) != 0) {
310 "failed to put NotificationQueue pipe read " 311 "endpoint into non-blocking mode",
314 if (fcntl(
pipeFds_[1], F_SETFL, O_WRONLY | O_NONBLOCK) != 0) {
316 "failed to put NotificationQueue pipe write " 317 "endpoint into non-blocking mode",
329 std::unique_ptr<Node>
data;
331 data.reset(&
queue_.front());
376 template <
typename MessageTT>
389 template <
typename MessageTT>
407 template <
typename MessageTT>
415 template <
typename InputIteratorT>
417 typedef typename std::iterator_traits<InputIteratorT>::iterator_category
437 std::unique_ptr<Node>
data;
446 data.reset(&
queue_.front());
475 CHECK_EQ(
pid_, pid_t(getpid()));
485 if (maxSize > 0 &&
queue_.size() >= maxSize) {
487 throw std::overflow_error(
488 "unable to add message to NotificationQueue: " 498 throw std::runtime_error(
"queue is draining, cannot add message");
505 mutable std::atomic<int> eventBytes_{0};
506 mutable std::atomic<int> maxEventBytes_{0};
515 ssize_t bytes_written = 0;
516 size_t bytes_expected = 0;
522 bytes_expected =
sizeof(signal);
526 bytes_expected =
sizeof(signal);
529 }
while (bytes_written == -1 && errno == EINTR);
532 if (bytes_written > 0) {
533 eventBytes_ += bytes_written;
534 maxEventBytes_ =
std::max((
int)maxEventBytes_, (
int)eventBytes_);
538 if (bytes_written == ssize_t(bytes_expected)) {
542 LOG(ERROR) <<
"NotificationQueue Write Error=" << errno
543 <<
" bytesInPipe=" << eventBytes_
544 <<
" maxInPipe=" << maxEventBytes_ <<
" queue=" <<
size();
547 "failed to signal NotificationQueue after " 554 ssize_t bytes_read = 0;
558 CHECK(bytes_read != -1 || errno == EAGAIN);
566 bytes_read += result;
568 CHECK(result == -1 && errno == EAGAIN);
569 LOG_IF(ERROR, bytes_read > 1)
570 <<
"[NotificationQueue] Unexpected state while draining pipe: bytes_read=" 571 << bytes_read <<
" bytes, expected <= 1";
573 LOG_IF(ERROR, (
signal_ && bytes_read == 0) || (!
signal_ && bytes_read > 0))
574 <<
"[NotificationQueue] Unexpected state while draining signals: signal_=" 575 <<
signal_ <<
" bytes_read=" << bytes_read;
580 if (bytes_read > 0) {
581 eventBytes_ -= bytes_read;
601 template <
typename MessageTT>
606 auto data = std::make_unique<Node>(
625 template <
typename InputIteratorT>
627 InputIteratorT
first,
629 std::input_iterator_tag) {
632 boost::intrusive::slist<Node, boost::intrusive::cache_last<true>> q;
634 while (first != last) {
635 auto data = std::make_unique<Node>(
637 q.push_back(*
data.release());
650 std::unique_ptr<Node>
data;
652 data.reset(&q.front());
665 boost::intrusive::slist<Node, boost::intrusive::cache_last<true>>
queue_;
671 template <
typename MessageT>
677 if (destroyedFlagPtr_) {
678 *destroyedFlagPtr_ =
true;
684 template <
typename MessageT>
687 consumeMessages(
false);
690 template <
typename MessageT>
699 queue_->syncSignalAndQueue();
703 setActive(
false,
true);
706 if (numConsumed !=
nullptr) {
707 *numConsumed = numProcessed;
726 queue_->spinlock_.unlock();
731 std::unique_ptr<Node>
data;
732 data.reset(&
queue_->queue_.front());
733 queue_->queue_.pop_front();
738 bool wasEmpty =
queue_->queue_.empty();
744 queue_->spinlock_.unlock();
750 bool callbackDestroyed =
false;
751 CHECK(destroyedFlagPtr_ ==
nullptr);
752 destroyedFlagPtr_ = &callbackDestroyed;
754 destroyedFlagPtr_ =
nullptr;
757 if (callbackDestroyed) {
768 if (!isDrain && maxReadAtOnce_ > 0 && numProcessed >= maxReadAtOnce_) {
781 }
catch (
const std::exception&) {
794 queue_->spinlock_.unlock();
802 template <
typename MessageT>
807 assert(
queue_ ==
nullptr);
808 assert(!isHandlerRegistered());
821 if (
queue_->eventfd_ >= 0) {
822 initHandler(eventBase,
queue_->eventfd_);
824 initHandler(eventBase,
queue_->pipeFds_[0]);
828 template <
typename MessageT>
831 assert(!isHandlerRegistered());
841 assert(isHandlerRegistered());
847 template <
typename MessageT>
858 consumeMessages(
true, numConsumed);
861 queue_->draining_ =
false;
866 template <
typename MessageT>
867 template <
typename F>
871 queue_.syncSignalAndQueue();
877 std::unique_ptr<Node>
data;
885 data.reset(&
queue_.queue_.front());
886 queue_.queue_.pop_front();
904 template <
typename MessageT,
typename TCallback>
907 template <
typename UCallback>
915 "callback must be declared noexcept, e.g.: `[]() noexcept {}`");
926 template <
typename MessageT>
927 template <
typename TCallback>
932 return std::unique_ptr<
933 NotificationQueue<MessageT>::Consumer,
938 std::forward<TCallback>(callback)));
std::shared_ptr< RequestContext > ctx_
void ensureSignal() const
void setMaxReadAtOnce(uint32_t maxAtOnce)
void ensureSignalLocked() const
void consumeUntilDrained(F &&foreach)
static std::shared_ptr< RequestContext > setContext(std::shared_ptr< RequestContext > ctx)
void write(const T &in, folly::io::Appender &appender)
#define eventfd(initval, flags)
void consumeMessages(bool isDrain, size_t *numConsumed=nullptr) noexcept
void startConsumingInternal(EventBase *eventBase, NotificationQueue *queue)
folly::SpinLock spinlock_
constexpr detail::Map< Move > move
ssize_t readNoInt(int fd, void *buf, size_t count)
NotificationQueue * getCurrentQueue() const
EventBase * getEventBase()
NotificationQueue & operator=(NotificationQueue const &)=delete
—— Concurrent Priority Queue Implementation ——
FOLLY_ALWAYS_INLINE bool try_lock() const noexcept
requires E e noexcept(noexcept(s.error(std::move(e))))
uint32_t advisoryMaxQueueSize_
void putMessage(MessageTT &&message)
uint32_t getMaxReadAtOnce() const
std::atomic< int > numActiveConsumers_
void putMessagesImpl(InputIteratorT first, InputIteratorT last, std::input_iterator_tag)
void init(int *argc, char ***argv, bool removeFlags)
static std::shared_ptr< RequestContext > saveContext()
SimpleConsumer(NotificationQueue &queue)
folly::Synchronized< EventBase * > base_
constexpr auto data(C &c) -> decltype(c.data())
void setActive(bool active, bool shouldLock=false)
bool tryConsume(MessageT &result)
void dcheckIsInEventBaseThread() const
bool tryPutMessageNoThrow(MessageTT &&message)
void handlerReady(uint16_t events) noexceptoverride
void putMessages(InputIteratorT first, InputIteratorT last)
void drainSignalsLocked()
bool checkQueueSize(size_t maxSize, bool throws=true) const
NotificationQueue & queue_
NotificationQueue(uint32_t maxSize=0, FdType fdType=FdType::PIPE)
Node(MessageTT &&msg, std::shared_ptr< RequestContext > ctx)
notification_queue_consumer_wrapper(UCallback &&callback)
folly::Function< void()> callback_
NotificationQueue * queue_
bool putMessageImpl(MessageTT &&message, size_t maxSize, bool throws=true)
void init(EventBase *eventBase, NotificationQueue *queue)
boost::intrusive::slist< Node, boost::intrusive::cache_last< true > > queue_
void throwSystemError(Args &&...args)
void startConsuming(EventBase *eventBase, NotificationQueue *queue)
int close(NetworkSocket s)
bool checkDraining(bool throws=true)
void messageAvailable(MessageT &&message) noexceptoverride
bool consumeUntilDrained(size_t *numConsumed=nullptr) noexcept
void syncSignalAndQueue()
void pipe(CPUExecutor cpu, IOExecutor io)
void setMaxQueueSize(uint32_t max)
static std::unique_ptr< Consumer, DelayedDestruction::Destructor > make(TCallback &&callback)
constexpr detail::First first
void tryPutMessage(MessageTT &&message)