proxygen
NotificationQueue.h
Go to the documentation of this file.
1 /*
2  * Copyright 2014-present Facebook, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 #pragma once
18 
19 #include <sys/types.h>
20 
21 #include <algorithm>
22 #include <iterator>
23 #include <memory>
24 #include <stdexcept>
25 #include <utility>
26 
27 #include <boost/intrusive/slist.hpp>
28 #include <folly/Exception.h>
29 #include <folly/FileUtil.h>
30 #include <folly/Likely.h>
31 #include <folly/ScopeGuard.h>
32 #include <folly/SpinLock.h>
36 #include <folly/io/async/Request.h>
40 
41 #include <glog/logging.h>
42 
43 #if __linux__ && !__ANDROID__
44 #define FOLLY_HAVE_EVENTFD
46 #endif
47 
48 namespace folly {
49 
68 template <typename MessageT>
69 class NotificationQueue {
// Intrusive singly-linked list element holding one queued message together
// with the RequestContext that was handed in alongside it.
70  struct Node : public boost::intrusive::slist_base_hook<
71  boost::intrusive::cache_last<true>> {
    // Perfect-forwards `msg` into msg_ and takes ownership of `ctx`.
72  template <typename MessageTT>
73  Node(MessageTT&& msg, std::shared_ptr<RequestContext> ctx)
74  : msg_(std::forward<MessageTT>(msg)), ctx_(std::move(ctx)) {}
    // The payload delivered to the consumer.
75  MessageT msg_;
    // Context installed around the consumer callback for this message.
76  std::shared_ptr<RequestContext> ctx_;
77  };
78 
79  public:
83  class Consumer : public DelayedDestruction, private EventHandler {
84  public:
85  enum : uint16_t { kDefaultMaxReadAtOnce = 10 };
86 
88  : queue_(nullptr),
89  destroyedFlagPtr_(nullptr),
90  maxReadAtOnce_(kDefaultMaxReadAtOnce) {}
91 
92  // Create a consumer in place from a callback, without having to define a new subclass.
93  template <typename TCallback>
94  static std::unique_ptr<Consumer, DelayedDestruction::Destructor> make(
95  TCallback&& callback);
96 
101  virtual void messageAvailable(MessageT&& message) noexcept = 0;
102 
// Attaches this consumer to `queue` on `eventBase` (see init()) and then
// registers it as a persistent read handler.
114  void startConsuming(EventBase* eventBase, NotificationQueue* queue) {
115  init(eventBase, queue);
116  registerHandler(READ | PERSIST);
117  }
118 
124  EventBase* eventBase,
125  NotificationQueue* queue) {
126  init(eventBase, queue);
127  registerInternalHandler(READ | PERSIST);
128  }
129 
136  void stopConsuming();
137 
147  bool consumeUntilDrained(size_t* numConsumed = nullptr) noexcept;
148 
155  return queue_;
156  }
157 
// Sets the per-pass message cap stored in maxReadAtOnce_.
// consumeMessages() only enforces the cap when it is greater than zero and
// the pass is not a drain, so 0 means "no cap".
168  void setMaxReadAtOnce(uint32_t maxAtOnce) {
169  maxReadAtOnce_ = maxAtOnce;
170  }
172  return maxReadAtOnce_;
173  }
174 
176  return base_;
177  }
178 
179  void handlerReady(uint16_t events) noexcept override;
180 
181  protected:
182  void destroy() override;
183 
184  ~Consumer() override {}
185 
186  private:
197  void consumeMessages(bool isDrain, size_t* numConsumed = nullptr) noexcept;
198 
// Records whether this consumer is currently processing messages and keeps
// queue_->numActiveConsumers_ in step with transitions of active_.
//
// active:     the new state.
// shouldLock: when true, queue_->spinlock_ is taken around the update; the
//             default (false) is used by callers that already hold it.
199  void setActive(bool active, bool shouldLock = false) {
     // Not attached to a queue: there is no counter to maintain.
200  if (!queue_) {
201  active_ = active;
202  return;
203  }
204  if (shouldLock) {
205  queue_->spinlock_.lock();
206  }
     // Only an actual state change touches the counter.
207  if (!active_ && active) {
208  ++queue_->numActiveConsumers_;
209  } else if (active_ && !active) {
210  --queue_->numActiveConsumers_;
211  }
212  active_ = active;
213  if (shouldLock) {
214  queue_->spinlock_.unlock();
215  }
216  }
217  void init(EventBase* eventBase, NotificationQueue* queue);
218 
223  bool active_{false};
224  };
225 
227  public:
// Binds this consumer to `queue` (held by reference) and counts it in
// queue_.numConsumers_.
// NOTE(review): the increment is done without taking queue_.spinlock_,
// whereas Consumer::init() bumps the same counter under a SpinLockGuard.
// The declaration of numConsumers_ is not visible in this listing — confirm
// whether the unlocked access is safe.
228  explicit SimpleConsumer(NotificationQueue& queue) : queue_(queue) {
229  ++queue_.numConsumers_;
230  }
231 
233  --queue_.numConsumers_;
234  }
235 
// Returns the descriptor that signals pending messages: the eventfd when one
// is in use (eventfd_ >= 0), otherwise the read end of the fallback pipe.
236  int getFd() const {
237  return queue_.eventfd_ >= 0 ? queue_.eventfd_ : queue_.pipeFds_[0];
238  }
239 
240  template <typename F>
241  void consumeUntilDrained(F&& foreach);
242 
243  private:
245  };
246 
// Selects the kernel object used to wake consumers.
247  enum class FdType {
     // A pipe pair; always available.
248  PIPE,
249 #ifdef FOLLY_HAVE_EVENTFD
     // An eventfd; only offered where FOLLY_HAVE_EVENTFD is defined.
250  EVENTFD,
251 #endif
252  };
253 
270  uint32_t maxSize = 0,
271 #ifdef FOLLY_HAVE_EVENTFD
272  FdType fdType = FdType::EVENTFD)
273 #else
274  FdType fdType = FdType::PIPE)
275 #endif
276  : eventfd_(-1),
277  pipeFds_{-1, -1},
278  advisoryMaxQueueSize_(maxSize),
279  pid_(pid_t(getpid())) {
280 
281 #ifdef FOLLY_HAVE_EVENTFD
282  if (fdType == FdType::EVENTFD) {
284  if (eventfd_ == -1) {
285  if (errno == ENOSYS || errno == EINVAL) {
286  // eventfd not available
287  LOG(ERROR) << "failed to create eventfd for NotificationQueue: "
288  << errno << ", falling back to pipe mode (is your kernel "
289  << "> 2.6.30?)";
290  fdType = FdType::PIPE;
291  } else {
292  // some other error
294  "Failed to create eventfd for "
295  "NotificationQueue",
296  errno);
297  }
298  }
299  }
300 #endif
301  if (fdType == FdType::PIPE) {
302  if (pipe(pipeFds_)) {
304  "Failed to create pipe for NotificationQueue", errno);
305  }
306  try {
307  // put both ends of the pipe into non-blocking mode
308  if (fcntl(pipeFds_[0], F_SETFL, O_RDONLY | O_NONBLOCK) != 0) {
310  "failed to put NotificationQueue pipe read "
311  "endpoint into non-blocking mode",
312  errno);
313  }
314  if (fcntl(pipeFds_[1], F_SETFL, O_WRONLY | O_NONBLOCK) != 0) {
316  "failed to put NotificationQueue pipe write "
317  "endpoint into non-blocking mode",
318  errno);
319  }
320  } catch (...) {
321  ::close(pipeFds_[0]);
322  ::close(pipeFds_[1]);
323  throw;
324  }
325  }
326  }
327 
// NOTE(review): the signature line (listing line 328) is missing from this
// rendering; this body looks like the NotificationQueue destructor — confirm.
// Frees every node still linked in queue_, then closes whichever descriptors
// are open and resets them to -1.
329  std::unique_ptr<Node> data;
330  while (!queue_.empty()) {
     // Take ownership of the front node; the previous one is deleted by reset().
331  data.reset(&queue_.front());
332  queue_.pop_front();
333  }
334  if (eventfd_ >= 0) {
335  ::close(eventfd_);
336  eventfd_ = -1;
337  }
338  if (pipeFds_[0] >= 0) {
339  ::close(pipeFds_[0]);
340  pipeFds_[0] = -1;
341  }
342  if (pipeFds_[1] >= 0) {
343  ::close(pipeFds_[1]);
344  pipeFds_[1] = -1;
345  }
346  }
347 
358  }
359 
// Enqueues `message`, honouring advisoryMaxQueueSize_.
// Uses putMessageImpl() with its default throws=true, so a full queue raises
// std::overflow_error (checkQueueSize) and a draining queue raises
// std::runtime_error (checkDraining).
376  template <typename MessageTT>
377  void tryPutMessage(MessageTT&& message) {
378  putMessageImpl(std::forward<MessageTT>(message), advisoryMaxQueueSize_);
379  }
380 
// Like tryPutMessage(), but passes throws=false so that a full or draining
// queue is reported by returning false instead of by an exception.
// Returns true when the message was enqueued.
389  template <typename MessageTT>
390  bool tryPutMessageNoThrow(MessageTT&& message) {
391  return putMessageImpl(
392  std::forward<MessageTT>(message), advisoryMaxQueueSize_, false);
393  }
394 
// Enqueues `message` with a size limit of 0, which checkQueueSize() treats as
// "no limit". The draining check still applies and throws std::runtime_error
// while the queue is being drained.
407  template <typename MessageTT>
408  void putMessage(MessageTT&& message) {
409  putMessageImpl(std::forward<MessageTT>(message), 0);
410  }
411 
// Enqueues every element of [first, last). The iterator's category tag is
// used to pick the putMessagesImpl() overload.
415  template <typename InputIteratorT>
416  void putMessages(InputIteratorT first, InputIteratorT last) {
417  typedef typename std::iterator_traits<InputIteratorT>::iterator_category
418  IterCategory;
419  putMessagesImpl(first, last, IterCategory());
420  }
421 
// Pops the front message, if any, and moves it into `result`.
// Returns false when the queue is empty, true otherwise.
// NOTE(review): listing lines 433, 440 and 451 are missing from this
// rendering (the SCOPE_EXIT body, the first statement of the inner scope and
// the statement after the move) — presumably a signal/queue resync, a
// spinlock guard and a RequestContext restore; confirm against the real file.
431  bool tryConsume(MessageT& result) {
432  SCOPE_EXIT {
434  };
435 
436  checkPid();
437  std::unique_ptr<Node> data;
438 
439  {
441 
442  if (UNLIKELY(queue_.empty())) {
443  return false;
444  }
445 
     // Take ownership of the node so it is freed when `data` goes out of scope.
446  data.reset(&queue_.front());
447  queue_.pop_front();
448  }
449 
450  result = std::move(data->msg_);
452 
453  return true;
454  }
455 
// Returns the number of messages currently linked in queue_.
// NOTE(review): listing line 457 is missing from this rendering — presumably
// a spinlock guard; confirm.
456  size_t size() const {
458  return queue_.size();
459  }
460 
// Fails a glog CHECK if the calling process is not the one recorded in pid_
// when the queue was constructed.
474  void checkPid() const {
475  CHECK_EQ(pid_, pid_t(getpid()));
476  }
477 
478  private:
479  // Forbidden copy constructor and assignment operator
480  NotificationQueue(NotificationQueue const&) = delete;
482 
// Checks whether another message fits under `maxSize`.
// maxSize: limit on queue_.size(); 0 disables the check.
// throws:  when true a full queue raises std::overflow_error, otherwise the
//          function returns false.
// Returns true when there is room.
483  inline bool checkQueueSize(size_t maxSize, bool throws = true) const {
     // Debug-only check that the spinlock is already held (try_lock must fail).
484  DCHECK(0 == spinlock_.try_lock());
485  if (maxSize > 0 && queue_.size() >= maxSize) {
486  if (throws) {
487  throw std::overflow_error(
488  "unable to add message to NotificationQueue: "
489  "queue is full");
490  }
491  return false;
492  }
493  return true;
494  }
495 
// Reports whether the queue is currently draining.
// throws: when true and the queue is draining, raises std::runtime_error
//         instead of returning.
// Returns draining_, i.e. true means the caller must NOT enqueue.
496  inline bool checkDraining(bool throws = true) {
497  if (UNLIKELY(draining_ && throws)) {
498  throw std::runtime_error("queue is draining, cannot add message");
499  }
500  return draining_;
501  }
502 
503 #ifdef __ANDROID__
504  // TODO 10860938 Remove after figuring out crash
505  mutable std::atomic<int> eventBytes_{0};
506  mutable std::atomic<int> maxEventBytes_{0};
507 #endif
508 
// Makes the notification descriptor readable if it is not already marked as
// signalled (signal_). Writes one 64-bit value to the eventfd, or a single
// byte to the pipe's write end, retrying on EINTR. On a short or failed write
// the error path at the end of the function is taken.
509  void ensureSignalLocked() const {
510  // semantics: empty fd == empty queue <=> !signal_
511  if (signal_) {
512  return;
513  }
514 
515  ssize_t bytes_written = 0;
516  size_t bytes_expected = 0;
517 
518  do {
519  if (eventfd_ >= 0) {
520  // eventfd(2) dictates that we must write a 64-bit integer
521  uint64_t signal = 1;
522  bytes_expected = sizeof(signal);
523  bytes_written = ::write(eventfd_, &signal, bytes_expected);
524  } else {
525  uint8_t signal = 1;
526  bytes_expected = sizeof(signal);
527  bytes_written = ::write(pipeFds_[1], &signal, bytes_expected);
528  }
529  } while (bytes_written == -1 && errno == EINTR);
530 
531 #ifdef __ANDROID__
     // Android-only bookkeeping of bytes outstanding in the descriptor.
532  if (bytes_written > 0) {
533  eventBytes_ += bytes_written;
534  maxEventBytes_ = std::max((int)maxEventBytes_, (int)eventBytes_);
535  }
536 #endif
537 
538  if (bytes_written == ssize_t(bytes_expected)) {
539  signal_ = true;
540  } else {
541 #ifdef __ANDROID__
542  LOG(ERROR) << "NotificationQueue Write Error=" << errno
543  << " bytesInPipe=" << eventBytes_
544  << " maxInPipe=" << maxEventBytes_ << " queue=" << size();
545 #endif
     // NOTE(review): the call that takes the arguments below (listing line
     // 546) is missing from this rendering; presumably folly::throwSystemError
     // — confirm.
547  "failed to signal NotificationQueue after "
548  "write",
549  errno);
550  }
551  }
552 
// NOTE(review): the signature line (listing line 553) and the declaration of
// `message` used by the eventfd branch (listing line 556) are missing from
// this rendering; this body is presumably drainSignalsLocked() — confirm.
//
// Empties the notification descriptor and clears signal_. Logs an error when
// the number of bytes read disagrees with the signal_ flag.
//
// Fix: the eventfd branch is now selected with `eventfd_ >= 0`, matching every
// other use of eventfd_ in this class. With the previous `> 0` test a valid
// eventfd with descriptor number 0 fell through to the pipe branch, which
// reads pipeFds_[0] (-1 in eventfd mode) and fails the EAGAIN CHECK below.
554  ssize_t bytes_read = 0;
555  if (eventfd_ >= 0) {
557  bytes_read = readNoInt(eventfd_, &message, sizeof(message));
     // A failed read is only acceptable when there was nothing to read.
558  CHECK(bytes_read != -1 || errno == EAGAIN);
559  } else {
560  // There should only be one byte in the pipe. To avoid potential leaks we
561  // still drain.
562  uint8_t message[32];
563  ssize_t result;
564  while ((result = readNoInt(pipeFds_[0], &message, sizeof(message))) !=
565  -1) {
566  bytes_read += result;
567  }
568  CHECK(result == -1 && errno == EAGAIN);
569  LOG_IF(ERROR, bytes_read > 1)
570  << "[NotificationQueue] Unexpected state while draining pipe: bytes_read="
571  << bytes_read << " bytes, expected <= 1";
572  }
573  LOG_IF(ERROR, (signal_ && bytes_read == 0) || (!signal_ && bytes_read > 0))
574  << "[NotificationQueue] Unexpected state while draining signals: signal_="
575  << signal_ << " bytes_read=" << bytes_read;
576 
577  signal_ = false;
578 
579 #ifdef __ANDROID__
     // Android-only bookkeeping of bytes outstanding in the descriptor.
580  if (bytes_read > 0) {
581  eventBytes_ -= bytes_read;
582  }
583 #endif
584  }
585 
586  void ensureSignal() const {
589  }
590 
593 
594  if (queue_.empty()) {
596  } else {
598  }
599  }
600 
// Shared implementation behind putMessage()/tryPutMessage()/
// tryPutMessageNoThrow().
// message: forwarded into a freshly allocated Node together with the current
//          RequestContext (RequestContext::saveContext()).
// maxSize: passed to checkQueueSize(); 0 means no limit.
// throws:  forwarded to checkDraining()/checkQueueSize(); when false those
//          conditions make this function return false instead of throwing.
// Returns true when the message was linked into queue_.
// NOTE(review): listing lines 608, 614 and 619 are missing from this
// rendering — presumably the spinlock guard, the condition that sets
// `signal`, and the call made when `signal` is true; confirm.
601  template <typename MessageTT>
602  bool putMessageImpl(MessageTT&& message, size_t maxSize, bool throws = true) {
603  checkPid();
604  bool signal = false;
605  {
606  auto data = std::make_unique<Node>(
607  std::forward<MessageTT>(message), RequestContext::saveContext());
609  if (checkDraining(throws) || !checkQueueSize(maxSize, throws)) {
610  return false;
611  }
612  // We only need to signal an event if not all consumers are
613  // awake.
615  signal = true;
616  }
     // Ownership of the node passes to the intrusive list.
617  queue_.push_back(*data.release());
618  if (signal) {
620  }
621  }
622  return true;
623  }
624 
// Input-iterator overload that enqueues a whole range. Nodes are first built
// into a local list `q`, which is then spliced onto the end of queue_.
// If anything throws, the nodes still held in `q` are freed and the exception
// is rethrown.
// NOTE(review): listing lines 626, 636, 640, 643 and 647 are missing from
// this rendering (the function-name line, the Node constructor arguments,
// presumably the spinlock guard, the condition that sets `signal`, and the
// call made when `signal` is true) — confirm against the real file.
625  template <typename InputIteratorT>
627  InputIteratorT first,
628  InputIteratorT last,
629  std::input_iterator_tag) {
630  checkPid();
631  bool signal = false;
632  boost::intrusive::slist<Node, boost::intrusive::cache_last<true>> q;
633  try {
634  while (first != last) {
635  auto data = std::make_unique<Node>(
637  q.push_back(*data.release());
638  ++first;
639  }
     // Default throws=true: raises std::runtime_error while draining.
641  checkDraining();
642  queue_.splice(queue_.end(), q);
644  signal = true;
645  }
646  if (signal) {
648  }
649  } catch (...) {
     // Reclaim every node that was released into `q` before rethrowing.
650  std::unique_ptr<Node> data;
651  while (!q.empty()) {
652  data.reset(&q.front());
653  q.pop_front();
654  }
655  throw;
656  }
657  }
658 
660  mutable bool signal_{false};
661  int eventfd_;
662  int pipeFds_[2]; // to fallback to on older/non-linux systems
664  pid_t pid_;
665  boost::intrusive::slist<Node, boost::intrusive::cache_last<true>> queue_;
667  std::atomic<int> numActiveConsumers_{0};
668  bool draining_{false};
669 };
670 
// NOTE(review): the signature line (listing line 672) and the final statement
// (listing line 681) are missing from this rendering; this is presumably the
// out-of-class definition of Consumer::destroy() — confirm.
// Flags an in-flight callback as destroyed (if any) and detaches the consumer
// from its queue via stopConsuming().
671 template <typename MessageT>
673  // If we are in the middle of a call to handlerReady(), destroyedFlagPtr_
674  // will be non-nullptr. Mark the value that it points to, so that
675  // handlerReady() will know the callback is destroyed, and that it cannot
676  // access any member variables anymore.
677  if (destroyedFlagPtr_) {
678  *destroyedFlagPtr_ = true;
679  }
680  stopConsuming();
682 }
683 
// Out-of-class definition of the read-ready callback declared in Consumer as
// handlerReady(uint16_t) (the function-name line, listing line 685, is
// missing from this rendering). The event mask is ignored; a regular
// (non-drain) consumeMessages() pass is run.
684 template <typename MessageT>
686  uint16_t /*events*/) noexcept {
687  consumeMessages(false);
688 }
689 
// Out-of-class definition of Consumer::consumeMessages() (the function-name
// line, listing line 691, is missing from this rendering).
//
// Pops messages one at a time and hands each to messageAvailable() with the
// spinlock released and the message's RequestContext installed.
//
// isDrain:     when true the maxReadAtOnce_ cap is not applied.
// numConsumed: if non-null, receives the number of callbacks that completed
//              and were counted, written when the function exits.
//
// The loop stops when the queue is empty, the callback destroyed or detached
// the consumer, the cap was reached, or the queue was observed empty right
// after the pop.
690 template <typename MessageT>
692  bool isDrain,
693  size_t* numConsumed) noexcept {
694  DestructorGuard dg(this);
695  uint32_t numProcessed = 0;
696  setActive(true);
    // Scope guards run in reverse order of declaration: numConsumed is
    // written first, then the consumer is marked inactive under the lock,
    // and finally the signal state is re-synchronised with the queue.
697  SCOPE_EXIT {
698  if (queue_) {
699  queue_->syncSignalAndQueue();
700  }
701  };
702  SCOPE_EXIT {
703  setActive(false, /* shouldLock = */ true);
704  };
705  SCOPE_EXIT {
706  if (numConsumed != nullptr) {
707  *numConsumed = numProcessed;
708  }
709  };
710  while (true) {
711  // Now pop the message off of the queue.
712  //
713  // We have to manually acquire and release the spinlock here, rather than
714  // using SpinLockHolder since the MessageT has to be constructed while
715  // holding the spinlock and available after we release it. SpinLockHolder
716  // unfortunately doesn't provide a release() method. (We can't construct
717  // MessageT first since we have no guarantee that MessageT has a default
718  // constructor.)
719  queue_->spinlock_.lock();
720  bool locked = true;
721 
722  try {
723  if (UNLIKELY(queue_->queue_.empty())) {
724  // If there is no message, we've reached the end of the queue, return.
725  setActive(false);
726  queue_->spinlock_.unlock();
727  return;
728  }
729 
730  // Pull a message off the queue.
731  std::unique_ptr<Node> data;
732  data.reset(&queue_->queue_.front());
733  queue_->queue_.pop_front();
734 
735  // Check to see if the queue is empty now.
736  // We use this as an optimization to see if we should bother trying to
737  // loop again and read another message after invoking this callback.
738  bool wasEmpty = queue_->queue_.empty();
739  if (wasEmpty) {
740  setActive(false);
741  }
742 
743  // Now unlock the spinlock before we invoke the callback.
744  queue_->spinlock_.unlock();
    // NOTE(review): `locked` is still true while the guard below is being
    // constructed, although the spinlock was released on the previous line.
    // If that constructor could throw a std::exception, the catch block
    // would unlock a second time — confirm whether it can throw.
745  RequestContextScopeGuard rctx(std::move(data->ctx_));
746 
747  locked = false;
748 
749  // Call the callback
750  bool callbackDestroyed = false;
751  CHECK(destroyedFlagPtr_ == nullptr);
752  destroyedFlagPtr_ = &callbackDestroyed;
753  messageAvailable(std::move(data->msg_));
754  destroyedFlagPtr_ = nullptr;
755 
756  // If the callback was destroyed before it returned, we are done
757  if (callbackDestroyed) {
758  return;
759  }
760 
761  // If the callback is no longer installed, we are done.
762  if (queue_ == nullptr) {
763  return;
764  }
765 
766  // If we have hit maxReadAtOnce_, we are done.
767  ++numProcessed;
768  if (!isDrain && maxReadAtOnce_ > 0 && numProcessed >= maxReadAtOnce_) {
769  return;
770  }
771 
772  // If the queue was empty before we invoked the callback, it's probable
773  // that it is still empty now. Just go ahead and return, rather than
774  // looping again and trying to re-read from the eventfd. (If a new
775  // message had in fact arrived while we were invoking the callback, we
776  // will simply be woken up the next time around the event loop and will
777  // process the message then.)
778  if (wasEmpty) {
779  return;
780  }
781  } catch (const std::exception&) {
782  // This catch block is really just to handle the case where the MessageT
783  // constructor throws. The messageAvailable() callback itself is
784  // declared as noexcept and should never throw.
785  //
786  // If the MessageT constructor does throw we try to handle it as best as
787  // we can, but we can't work miracles. We will just ignore the error for
788  // now and return. The next time around the event loop we will end up
789  // trying to read the message again. If MessageT continues to throw we
790  // will never make forward progress and will keep trying each time around
791  // the event loop.
792  if (locked) {
793  // Unlock the spinlock.
794  queue_->spinlock_.unlock();
795  }
796 
797  return;
798  }
799  }
800 }
801 
// Out-of-class definition of Consumer::init(EventBase*, NotificationQueue*)
// (the function-name line, listing line 803, is missing from this rendering).
//
// Binds this consumer to `queue` on `eventBase`: records both pointers,
// increments the queue's consumer count under its spinlock, calls
// ensureSignal() on the queue, and initialises the event handler on the
// queue's eventfd or, when none is in use, on the pipe's read end.
// The consumer must not already be attached or registered (asserted).
802 template <typename MessageT>
804  EventBase* eventBase,
805  NotificationQueue* queue) {
806  eventBase->dcheckIsInEventBaseThread();
807  assert(queue_ == nullptr);
808  assert(!isHandlerRegistered());
809  queue->checkPid();
810 
811  base_ = eventBase;
812 
813  queue_ = queue;
814 
815  {
816  folly::SpinLockGuard g(queue_->spinlock_);
817  queue_->numConsumers_++;
818  }
819  queue_->ensureSignal();
820 
821  if (queue_->eventfd_ >= 0) {
822  initHandler(eventBase, queue_->eventfd_);
823  } else {
824  initHandler(eventBase, queue_->pipeFds_[0]);
825  }
826 }
827 
// NOTE(review): the signature line (listing line 829) is missing from this
// rendering; this is presumably the out-of-class definition of
// Consumer::stopConsuming() — confirm.
//
// Detaches the consumer from its queue. Does nothing when no queue is
// attached. Otherwise decrements the queue's consumer count and marks this
// consumer inactive under the spinlock, unregisters the handler, detaches
// from the event base and clears queue_.
828 template <typename MessageT>
830  if (queue_ == nullptr) {
831  assert(!isHandlerRegistered());
832  return;
833  }
834 
835  {
836  folly::SpinLockGuard g(queue_->spinlock_);
837  queue_->numConsumers_--;
    // The lock is already held here, so setActive() must not take it again.
838  setActive(false);
839  }
840 
841  assert(isHandlerRegistered());
842  unregisterHandler();
843  detachEventBase();
844  queue_ = nullptr;
845 }
846 
// Out-of-class definition of Consumer::consumeUntilDrained(size_t*) (the
// function-name line, listing line 848, is missing from this rendering).
//
// Marks the queue as draining, runs a drain pass of consumeMessages() (which
// ignores the maxReadAtOnce_ cap), then clears the draining flag.
// numConsumed: forwarded to consumeMessages().
// Returns false without consuming anything if a drain is already in progress,
// true otherwise.
// NOTE(review): queue_ is dereferenced without a null check, both before and
// after consumeMessages(). consumeMessages() itself handles queue_ becoming
// null during a callback, so the second dereference looks reachable with a
// null queue_ — confirm.
847 template <typename MessageT>
849  size_t* numConsumed) noexcept {
850  DestructorGuard dg(this);
851  {
852  folly::SpinLockGuard g(queue_->spinlock_);
853  if (queue_->draining_) {
854  return false;
855  }
856  queue_->draining_ = true;
857  }
858  consumeMessages(true, numConsumed);
859  {
860  folly::SpinLockGuard g(queue_->spinlock_);
861  queue_->draining_ = false;
862  }
863  return true;
864 }
865 
// Out-of-class definition of SimpleConsumer::consumeUntilDrained(F&&) (the
// function-name line, listing line 868, is missing from this rendering).
//
// Pops messages until the queue is empty, invoking `foreach` on each one with
// the spinlock released and the message's RequestContext installed. On exit
// the queue's signal state is re-synchronised via syncSignalAndQueue().
866 template <typename MessageT>
867 template <typename F>
869  F&& foreach) {
870  SCOPE_EXIT {
871  queue_.syncSignalAndQueue();
872  };
873 
874  queue_.checkPid();
875 
876  while (true) {
877  std::unique_ptr<Node> data;
878  {
    // The lock only covers the pop; the callback runs after it is released.
879  folly::SpinLockGuard g(queue_.spinlock_);
880 
881  if (UNLIKELY(queue_.queue_.empty())) {
882  return;
883  }
884 
885  data.reset(&queue_.queue_.front());
886  queue_.queue_.pop_front();
887  }
888 
889  RequestContextScopeGuard rctx(std::move(data->ctx_));
890  foreach(std::move(data->msg_));
891  // Make sure message destructor is called with the correct RequestContext.
892  data.reset();
893  }
894 }
895 
902 namespace detail {
903 
904 template <typename MessageT, typename TCallback>
906  : public NotificationQueue<MessageT>::Consumer {
// Stores the callable by perfect-forwarding `callback` into callback_.
907  template <typename UCallback>
908  explicit notification_queue_consumer_wrapper(UCallback&& callback)
909  : callback_(std::forward<UCallback>(callback)) {}
910 
911  // we are being stricter here and requiring noexcept for callback
// Forwards each message to the stored callable.
// The static_assert rejects callables whose invocation is not noexcept. It
// now checks an lvalue invocation (TCallback&), which is how callback_ is
// actually called below; the previous rvalue form could disagree with the
// real call for callables with ref-qualified call operators.
912  void messageAvailable(MessageT&& message) noexcept override {
913  static_assert(
914  noexcept(std::declval<TCallback&>()(std::forward<MessageT>(message))),
915  "callback must be declared noexcept, e.g.: `[]() noexcept {}`");
916 
917  callback_(std::forward<MessageT>(message));
918  }
919 
920  private:
921  TCallback callback_;
922 };
923 
924 } // namespace detail
925 
// Out-of-class definition of the Consumer::make() factory declared in
// Consumer. Returns a unique_ptr to a Consumer built from `callback`, with
// the callable's type decayed before being used as a template argument.
// NOTE(review): listing lines 929-931 and 934-935 are missing from this
// rendering (the rest of the return type, the function-name line and part of
// the returned expression); presumably the object created is a
// detail::notification_queue_consumer_wrapper — confirm.
926 template <typename MessageT>
927 template <typename TCallback>
928 std::unique_ptr<
932  return std::unique_ptr<
933  NotificationQueue<MessageT>::Consumer,
936  MessageT,
937  typename std::decay<TCallback>::type>(
938  std::forward<TCallback>(callback)));
939 }
940 
941 } // namespace folly
std::shared_ptr< RequestContext > ctx_
void setMaxReadAtOnce(uint32_t maxAtOnce)
Definition: test.c:42
static std::shared_ptr< RequestContext > setContext(std::shared_ptr< RequestContext > ctx)
Definition: Request.cpp:227
void write(const T &in, folly::io::Appender &appender)
Definition: Types-inl.h:112
#define eventfd(initval, flags)
void consumeMessages(bool isDrain, size_t *numConsumed=nullptr) noexcept
LogLevel max
Definition: LogLevel.cpp:31
void startConsumingInternal(EventBase *eventBase, NotificationQueue *queue)
PskType type
constexpr detail::Map< Move > move
Definition: Base-inl.h:2567
ssize_t readNoInt(int fd, void *buf, size_t count)
Definition: FileUtil.cpp:102
STL namespace.
NotificationQueue * getCurrentQueue() const
#define SCOPE_EXIT
Definition: ScopeGuard.h:274
NotificationQueue & operator=(NotificationQueue const &)=delete
—— Concurrent Priority Queue Implementation ——
Definition: AtomicBitSet.h:29
FOLLY_ALWAYS_INLINE bool try_lock() const noexcept
Definition: SpinLock.h:55
requires E e noexcept(noexcept(s.error(std::move(e))))
#define nullptr
Definition: http_parser.c:41
void putMessage(MessageTT &&message)
static void destroy()
std::atomic< int > numActiveConsumers_
void putMessagesImpl(InputIteratorT first, InputIteratorT last, std::input_iterator_tag)
void init(int *argc, char ***argv, bool removeFlags)
Definition: Init.cpp:34
static std::shared_ptr< RequestContext > saveContext()
Definition: Request.h:196
folly::Synchronized< EventBase * > base_
constexpr auto data(C &c) -> decltype(c.data())
Definition: Access.h:71
void setActive(bool active, bool shouldLock=false)
bool tryConsume(MessageT &result)
std::string message
Definition: SPDYCodec.cpp:133
void dcheckIsInEventBaseThread() const
Definition: EventBase.h:520
bool tryPutMessageNoThrow(MessageTT &&message)
void handlerReady(uint16_t events) noexceptoverride
void putMessages(InputIteratorT first, InputIteratorT last)
bool checkQueueSize(size_t maxSize, bool throws=true) const
NotificationQueue(uint32_t maxSize=0, FdType fdType=FdType::PIPE)
Node(MessageTT &&msg, std::shared_ptr< RequestContext > ctx)
g_t g(f_t)
folly::Function< void()> callback_
bool putMessageImpl(MessageTT &&message, size_t maxSize, bool throws=true)
void init(EventBase *eventBase, NotificationQueue *queue)
boost::intrusive::slist< Node, boost::intrusive::cache_last< true > > queue_
void throwSystemError(Args &&...args)
Definition: Exception.h:76
void startConsuming(EventBase *eventBase, NotificationQueue *queue)
#define UNLIKELY(x)
Definition: Likely.h:48
int close(NetworkSocket s)
Definition: NetOps.cpp:90
bool checkDraining(bool throws=true)
void messageAvailable(MessageT &&message) noexceptoverride
bool consumeUntilDrained(size_t *numConsumed=nullptr) noexcept
void pipe(CPUExecutor cpu, IOExecutor io)
void setMaxQueueSize(uint32_t max)
static std::unique_ptr< Consumer, DelayedDestruction::Destructor > make(TCallback &&callback)
constexpr detail::First first
Definition: Base-inl.h:2553
void tryPutMessage(MessageTT &&message)