proxygen
NotificationQueueTest.cpp
Go to the documentation of this file.
1 /*
2  * Copyright 2015-present Facebook, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
18 
19 #include <sys/types.h>
20 
21 #include <iostream>
22 #include <list>
23 #include <thread>
24 
28 
29 #ifndef _WIN32
30 #include <sys/wait.h>
31 #endif
32 
33 using namespace std;
34 using namespace folly;
35 
37 
39  public:
41 
42  void messageAvailable(int&& value) noexcept override {
43  messages.push_back(value);
44  if (fn) {
45  fn(value);
46  }
47  }
48 
49  std::function<void(int)> fn;
50  std::deque<int> messages;
51 };
52 
53 class QueueTest {
54  public:
56  : queue(maxSize, type), terminationQueue(maxSize, type) {}
57 
58  void sendOne();
59  void putMessages();
60  void multiConsumer();
61  void maxQueueSize();
62  void maxReadAtOnce();
63  void destroyCallback();
64  void useAfterFork();
65 
68 };
69 
71  // Create a notification queue and a callback in this thread
72  EventBase eventBase;
73 
74  QueueConsumer consumer;
75  consumer.fn = [&](int) {
76  // Stop consuming after we receive 1 message
77  consumer.stopConsuming();
78  };
79  consumer.startConsuming(&eventBase, &queue);
80 
81  // Start a new EventBase thread to put a message on our queue
83  t1.getEventBase()->runInEventBaseThread([&] { this->queue.putMessage(5); });
84 
85  // Loop until we receive the message
86  eventBase.loop();
87 
88  const auto& messages = consumer.messages;
89  EXPECT_EQ(1, messages.size());
90  EXPECT_EQ(5, messages.at(0));
91 }
92 
94  EventBase eventBase;
95 
96  QueueConsumer consumer;
97  QueueConsumer consumer2;
98  consumer.fn = [&](int msg) {
99  // Stop consuming after we receive a message with value 0, and start
100  // consumer2
101  if (msg == 0) {
102  consumer.stopConsuming();
103  consumer2.startConsuming(&eventBase, &queue);
104  }
105  };
106  consumer2.fn = [&](int msg) {
107  // Stop consuming after we receive a message with value 0
108  if (msg == 0) {
109  consumer2.stopConsuming();
110  }
111  };
112  consumer.startConsuming(&eventBase, &queue);
113 
114  list<int> msgList = {1, 2, 3, 4};
115  vector<int> msgVector = {5, 0, 9, 8, 7, 6, 7, 7, 8, 8, 2, 9, 6, 6, 10, 2, 0};
116  // Call putMessages() several times to add messages to the queue
117  queue.putMessages(msgList.begin(), msgList.end());
118  queue.putMessages(msgVector.begin() + 2, msgVector.begin() + 4);
119  // Test sending 17 messages, the pipe-based queue calls write in 16 byte
120  // chunks
121  queue.putMessages(msgVector.begin(), msgVector.end());
122 
123  // Loop until the consumer has stopped
124  eventBase.loop();
125 
126  vector<int> expectedMessages = {1, 2, 3, 4, 9, 8, 7, 5, 0};
127  vector<int> expectedMessages2 = {9, 8, 7, 6, 7, 7, 8, 8, 2, 9, 6, 10, 2, 0};
128  EXPECT_EQ(expectedMessages.size(), consumer.messages.size());
129  for (unsigned int idx = 0; idx < expectedMessages.size(); ++idx) {
130  EXPECT_EQ(expectedMessages[idx], consumer.messages.at(idx));
131  }
132  EXPECT_EQ(expectedMessages2.size(), consumer2.messages.size());
133  for (unsigned int idx = 0; idx < expectedMessages2.size(); ++idx) {
134  EXPECT_EQ(expectedMessages2[idx], consumer2.messages.at(idx));
135  }
136 }
137 
139  uint32_t numConsumers = 8;
140  uint32_t numMessages = 10000;
141 
142  // Create several consumers each running in their own EventBase thread
143  vector<QueueConsumer> consumers(numConsumers);
144  vector<ScopedEventBaseThread> threads(numConsumers);
145 
146  for (uint32_t consumerIdx = 0; consumerIdx < numConsumers; ++consumerIdx) {
147  QueueConsumer* consumer = &consumers[consumerIdx];
148 
149  consumer->fn = [consumer, consumerIdx, this](int value) {
150  // Treat 0 as a signal to stop.
151  if (value == 0) {
152  consumer->stopConsuming();
153  // Put a message on the terminationQueue to indicate we have stopped
154  terminationQueue.putMessage(consumerIdx);
155  }
156  };
157 
158  EventBase* eventBase = threads[consumerIdx].getEventBase();
159  eventBase->runInEventBaseThread([eventBase, consumer, this] {
160  consumer->startConsuming(eventBase, &queue);
161  });
162  }
163 
164  // Now add a number of messages from this thread
165  // Start at 1 rather than 0, since 0 is the signal to stop.
166  for (uint32_t n = 1; n < numMessages; ++n) {
167  queue.putMessage(n);
168  }
169  // Now add a 0 for each consumer, to signal them to stop
170  for (uint32_t n = 0; n < numConsumers; ++n) {
171  queue.putMessage(0);
172  }
173 
174  // Wait until we get notified that all of the consumers have stopped
175  // We use a separate notification queue for this.
176  QueueConsumer terminationConsumer;
177  vector<uint32_t> consumersStopped(numConsumers, 0);
178  uint32_t consumersRemaining = numConsumers;
179  terminationConsumer.fn = [&](int consumerIdx) {
180  --consumersRemaining;
181  if (consumersRemaining == 0) {
182  terminationConsumer.stopConsuming();
183  }
184 
185  EXPECT_GE(consumerIdx, 0);
186  EXPECT_LT(consumerIdx, numConsumers);
187  ++consumersStopped[consumerIdx];
188  };
189  EventBase eventBase;
190  terminationConsumer.startConsuming(&eventBase, &terminationQueue);
191  eventBase.loop();
192 
193  // Verify that we saw exactly 1 stop message for each consumer
194  for (uint32_t n = 0; n < numConsumers; ++n) {
195  EXPECT_EQ(1, consumersStopped[n]);
196  }
197 
198  // Validate that every message sent to the main queue was received exactly
199  // once.
200  vector<int> messageCount(numMessages, 0);
201  for (uint32_t n = 0; n < numConsumers; ++n) {
202  for (int msg : consumers[n].messages) {
203  EXPECT_GE(msg, 0);
204  EXPECT_LT(msg, numMessages);
205  ++messageCount[msg];
206  }
207  }
208 
209  // 0 is the signal to stop, and should have been received once by each
210  // consumer
211  EXPECT_EQ(numConsumers, messageCount[0]);
212  // All other messages should have been received exactly once
213  for (uint32_t n = 1; n < numMessages; ++n) {
214  EXPECT_EQ(1, messageCount[n]);
215  }
216 }
217 
219  // Create a queue with a maximum size of 5, and fill it up
220 
221  for (int n = 0; n < 5; ++n) {
222  queue.tryPutMessage(n);
223  }
224 
225  // Calling tryPutMessage() now should fail
226  EXPECT_THROW(queue.tryPutMessage(5), std::overflow_error);
227 
228  EXPECT_FALSE(queue.tryPutMessageNoThrow(5));
229  int val = 5;
230  EXPECT_FALSE(queue.tryPutMessageNoThrow(std::move(val)));
231 
232  // Pop a message from the queue
233  int result = -1;
234  EXPECT_TRUE(queue.tryConsume(result));
235  EXPECT_EQ(0, result);
236 
237  // We should be able to write another message now that we popped one off.
238  queue.tryPutMessage(5);
239  // But now we are full again.
240  EXPECT_THROW(queue.tryPutMessage(6), std::overflow_error);
241  // putMessage() should let us exceed the maximum
242  queue.putMessage(6);
243 
244  // Pull another message off
245  EXPECT_TRUE(queue.tryConsume(result));
246  EXPECT_EQ(1, result);
247 
248  // tryPutMessage() should still fail since putMessage() actually put us over
249  // the max.
250  EXPECT_THROW(queue.tryPutMessage(7), std::overflow_error);
251 
252  // Pull another message off and try again
253  EXPECT_TRUE(queue.tryConsume(result));
254  EXPECT_EQ(2, result);
255  queue.tryPutMessage(7);
256 
257  // Now pull all the remaining messages off
258  EXPECT_TRUE(queue.tryConsume(result));
259  EXPECT_EQ(3, result);
260  EXPECT_TRUE(queue.tryConsume(result));
261  EXPECT_EQ(4, result);
262  EXPECT_TRUE(queue.tryConsume(result));
263  EXPECT_EQ(5, result);
264  EXPECT_TRUE(queue.tryConsume(result));
265  EXPECT_EQ(6, result);
266  EXPECT_TRUE(queue.tryConsume(result));
267  EXPECT_EQ(7, result);
268 
269  // There should be no messages left
270  result = -1;
271  EXPECT_TRUE(!queue.tryConsume(result));
272  EXPECT_EQ(-1, result);
273 }
274 
276  // Add 100 messages to the queue
277  for (int n = 0; n < 100; ++n) {
278  queue.putMessage(n);
279  }
280 
281  EventBase eventBase;
282 
283  // Record how many messages were processed each loop iteration.
284  uint32_t messagesThisLoop = 0;
285  std::vector<uint32_t> messagesPerLoop;
286  std::function<void()> loopFinished = [&] {
287  // Record the current number of messages read this loop
288  messagesPerLoop.push_back(messagesThisLoop);
289  // Reset messagesThisLoop to 0 for the next loop
290  messagesThisLoop = 0;
291 
292  // To prevent use-after-free bugs when eventBase destructs,
293  // prevent calling runInLoop any more after the test is finished.
294  // 55 == number of times loop should run.
295  if (messagesPerLoop.size() != 55) {
296  // Reschedule ourself to run at the end of the next loop
297  eventBase.runInLoop(loopFinished);
298  }
299  };
300  // Schedule the first call to loopFinished
301  eventBase.runInLoop(loopFinished);
302 
303  QueueConsumer consumer;
304  // Read the first 50 messages 10 at a time.
305  consumer.setMaxReadAtOnce(10);
306  consumer.fn = [&](int value) {
307  ++messagesThisLoop;
308  // After 50 messages, drop to reading only 1 message at a time.
309  if (value == 50) {
310  consumer.setMaxReadAtOnce(1);
311  }
312  // Terminate the loop when we reach the end of the messages.
313  if (value == 99) {
314  eventBase.terminateLoopSoon();
315  }
316  };
317  consumer.startConsuming(&eventBase, &queue);
318 
319  // Run the event loop until the consumer terminates it
320  eventBase.loop();
321 
322  // The consumer should have read all 100 messages in order
323  EXPECT_EQ(100, consumer.messages.size());
324  for (int n = 0; n < 100; ++n) {
325  EXPECT_EQ(n, consumer.messages.at(n));
326  }
327 
328  // Currently EventBase happens to still run the loop callbacks even after
329  // terminateLoopSoon() is called. However, we don't really want to depend on
330  // this behavior. In case this ever changes in the future, add
331  // messagesThisLoop to messagesPerLoop if the loop callback isn't invoked for
332  // the last loop iteration.
333  if (messagesThisLoop > 0) {
334  messagesPerLoop.push_back(messagesThisLoop);
335  messagesThisLoop = 0;
336  }
337 
338  // For the first 5 loops it should have read 10 messages each time.
339  // After that it should have read 1 message per loop for the next 50 loops.
340  EXPECT_EQ(55, messagesPerLoop.size());
341  for (int n = 0; n < 5; ++n) {
342  EXPECT_EQ(10, messagesPerLoop.at(n));
343  }
344  for (int n = 5; n < 55; ++n) {
345  EXPECT_EQ(1, messagesPerLoop.at(n));
346  }
347 }
348 
350  // Rather than using QueueConsumer, define a separate class for the destroy
351  // test. The DestroyTestConsumer will delete itself inside the
352  // messageAvailable() callback. With a regular QueueConsumer this would
353  // destroy the std::function object while the function is running, which we
354  // should probably avoid doing. This uses a pointer to a std::function to
355  // avoid destroying the function object.
356  class DestroyTestConsumer : public IntQueue::Consumer {
357  public:
358  void messageAvailable(int&& value) noexcept override {
359  DestructorGuard g(this);
360  if (fn && *fn) {
361  (*fn)(value);
362  }
363  }
364 
365  std::function<void(int)>* fn;
366 
367  protected:
368  ~DestroyTestConsumer() override = default;
369  };
370 
371  EventBase eventBase;
372  // Create a queue and add 2 messages to it
373  queue.putMessage(1);
374  queue.putMessage(2);
375 
376  // Create two QueueConsumers allocated on the heap.
377  // Have whichever one gets called first destroy both of the QueueConsumers.
378  // This way one consumer will be destroyed from inside its messageAvailable()
379  // callback, and one consumer will be destroyed when it isn't inside
380  // messageAvailable().
381  std::unique_ptr<DestroyTestConsumer, DelayedDestruction::Destructor>
382  consumer1(new DestroyTestConsumer);
383  std::unique_ptr<DestroyTestConsumer, DelayedDestruction::Destructor>
384  consumer2(new DestroyTestConsumer);
385  std::function<void(int)> fn = [&](int) {
386  consumer1 = nullptr;
387  consumer2 = nullptr;
388  };
389  consumer1->fn = &fn;
390  consumer2->fn = &fn;
391 
392  consumer1->startConsuming(&eventBase, &queue);
393  consumer2->startConsuming(&eventBase, &queue);
394 
395  // Run the event loop.
396  eventBase.loop();
397 
398  // One of the consumers should have fired, received the message,
399  // then destroyed both consumers.
400  EXPECT_TRUE(!consumer1);
401  EXPECT_TRUE(!consumer2);
402  // One message should be left in the queue
403  int result = 1;
404  EXPECT_TRUE(queue.tryConsume(result));
405  EXPECT_EQ(2, result);
406 }
407 
408 TEST(NotificationQueueTest, ConsumeUntilDrained) {
409  // Basic tests: make sure we
410  // - drain all the messages
411  // - ignore any maxReadAtOnce
412  // - can't add messages during draining
413  EventBase eventBase;
414  IntQueue queue;
415  QueueConsumer consumer;
416  consumer.fn = [&](int i) {
417  EXPECT_THROW(queue.tryPutMessage(i), std::runtime_error);
419  EXPECT_THROW(queue.putMessage(i), std::runtime_error);
420  std::vector<int> ints{1, 2, 3};
421  EXPECT_THROW(
422  queue.putMessages(ints.begin(), ints.end()), std::runtime_error);
423  };
424  consumer.setMaxReadAtOnce(10); // We should ignore this
425  consumer.startConsuming(&eventBase, &queue);
426  for (int i = 0; i < 20; i++) {
427  queue.putMessage(i);
428  }
429  EXPECT_TRUE(consumer.consumeUntilDrained());
430  EXPECT_EQ(20, consumer.messages.size());
431 
432  // Make sure there can only be one drainer at once
433  folly::Baton<> callbackBaton, threadStartBaton;
434  consumer.fn = [&](int /* i */) { callbackBaton.wait(); };
435  QueueConsumer competingConsumer;
436  competingConsumer.startConsuming(&eventBase, &queue);
437  queue.putMessage(1);
438  atomic<bool> raceA{false};
439  atomic<bool> raceB{false};
440  size_t numConsA = 0;
441  size_t numConsB = 0;
442  auto thread = std::thread([&] {
443  threadStartBaton.post();
444  raceB = consumer.consumeUntilDrained(&numConsB) && numConsB;
445  });
446  threadStartBaton.wait();
447  raceA = competingConsumer.consumeUntilDrained(&numConsA) && numConsA;
448  callbackBaton.post();
449  thread.join();
450  EXPECT_FALSE(raceA && raceB);
451  EXPECT_TRUE(raceA || raceB);
452  EXPECT_TRUE(raceA ^ raceB);
453 }
454 
455 TEST(NotificationQueueTest, ConsumeUntilDrainedStress) {
456  for (size_t i = 0; i < 1 << 8; ++i) {
457  // Basic tests: make sure we
458  // - drain all the messages
459  // - ignore any maxReadAtOnce
460  // - can't add messages during draining
461  EventBase eventBase;
462  IntQueue queue;
463  QueueConsumer consumer;
464  consumer.fn = [&](int j) {
465  EXPECT_THROW(queue.tryPutMessage(j), std::runtime_error);
467  EXPECT_THROW(queue.putMessage(j), std::runtime_error);
468  std::vector<int> ints{1, 2, 3};
469  EXPECT_THROW(
470  queue.putMessages(ints.begin(), ints.end()), std::runtime_error);
471  };
472  consumer.setMaxReadAtOnce(10); // We should ignore this
473  consumer.startConsuming(&eventBase, &queue);
474  for (int j = 0; j < 20; j++) {
475  queue.putMessage(j);
476  }
477  EXPECT_TRUE(consumer.consumeUntilDrained());
478  EXPECT_EQ(20, consumer.messages.size());
479 
480  // Make sure there can only be one drainer at once
481  folly::Baton<> callbackBaton, threadStartBaton;
482  consumer.fn = [&](int /* i */) { callbackBaton.wait(); };
483  QueueConsumer competingConsumer;
484  competingConsumer.startConsuming(&eventBase, &queue);
485  queue.putMessage(1);
486  atomic<bool> raceA{false};
487  atomic<bool> raceB{false};
488  size_t numConsA = 0;
489  size_t numConsB = 0;
490  auto thread = std::thread([&] {
491  threadStartBaton.post();
492  raceB = consumer.consumeUntilDrained(&numConsB) && numConsB;
493  });
494  threadStartBaton.wait();
495  raceA = competingConsumer.consumeUntilDrained(&numConsA) && numConsA;
496  callbackBaton.post();
497  thread.join();
498  EXPECT_FALSE(raceA && raceB);
499  EXPECT_TRUE(raceA || raceB);
500  EXPECT_TRUE(raceA ^ raceB);
501  }
502 }
503 
504 #ifdef FOLLY_HAVE_EVENTFD
505 TEST(NotificationQueueTest, SendOneEventFD) {
506  QueueTest qt(0, IntQueue::FdType::EVENTFD);
507  qt.sendOne();
508 }
509 
510 TEST(NotificationQueueTest, PutMessagesEventFD) {
511  QueueTest qt(0, IntQueue::FdType::EVENTFD);
512  qt.sendOne();
513 }
514 
515 TEST(NotificationQueueTest, MultiConsumerEventFD) {
516  QueueTest qt(0, IntQueue::FdType::EVENTFD);
517  qt.multiConsumer();
518 }
519 
520 TEST(NotificationQueueTest, MaxQueueSizeEventFD) {
521  QueueTest qt(5, IntQueue::FdType::EVENTFD);
522  qt.maxQueueSize();
523 }
524 
525 TEST(NotificationQueueTest, MaxReadAtOnceEventFD) {
526  QueueTest qt(0, IntQueue::FdType::EVENTFD);
527  qt.maxReadAtOnce();
528 }
529 
530 TEST(NotificationQueueTest, DestroyCallbackEventFD) {
531  QueueTest qt(0, IntQueue::FdType::EVENTFD);
532  qt.destroyCallback();
533 }
534 #endif
535 
536 TEST(NotificationQueueTest, SendOnePipe) {
538  qt.sendOne();
539 }
540 
541 TEST(NotificationQueueTest, PutMessagesPipe) {
543  qt.sendOne();
544 }
545 
546 TEST(NotificationQueueTest, MultiConsumerPipe) {
548  qt.multiConsumer();
549 }
550 
551 TEST(NotificationQueueTest, MaxQueueSizePipe) {
553  qt.maxQueueSize();
554 }
555 
556 TEST(NotificationQueueTest, MaxReadAtOncePipe) {
558  qt.maxReadAtOnce();
559 }
560 
561 TEST(NotificationQueueTest, DestroyCallbackPipe) {
563  qt.destroyCallback();
564 }
565 
566 #ifndef _WIN32
567 /*
568  * Test code that creates a NotificationQueue, then forks, and incorrectly
569  * tries to send a message to the queue from the child process.
570  *
571  * The child process should crash in this scenario, since the child code has a
572  * bug. (Older versions of NotificationQueue didn't catch this in the child,
573  * resulting in a crash in the parent process.)
574  */
575 TEST(NotificationQueueTest, UseAfterFork) {
576  IntQueue queue;
577  int childStatus = 0;
578  QueueConsumer consumer;
579 
580  // Boost sets a custom SIGCHLD handler, which fails the test if a child
581  // process exits abnormally. We don't want this.
582  signal(SIGCHLD, SIG_DFL);
583 
584  // Log some info so users reading the test output aren't confused
585  // by the child process' crash log messages.
586  LOG(INFO) << "This test makes sure the child process crashes. "
587  << "Error log messagges and a backtrace are expected.";
588 
589  {
590  // Start a separate thread consuming from the queue
593  [&] { consumer.startConsuming(t1.getEventBase(), &queue); });
594 
595  // Send a message to it, just for sanity checking
596  queue.putMessage(1234);
597 
598  // Fork
599  pid_t pid = fork();
600  if (pid == 0) {
601  // The boost test framework installs signal handlers to catch errors.
602  // We only want to catch in the parent. In the child let SIGABRT crash
603  // us normally.
604  signal(SIGABRT, SIG_DFL);
605 
606  // Child.
607  // We're horrible people, so we try to send a message to the queue
608  // that is being consumed in the parent process.
609  //
610  // The putMessage() call should catch this error, and crash our process.
611  queue.putMessage(9876);
612  // We shouldn't reach here.
613  _exit(0);
614  }
615  PCHECK(pid > 0);
616 
617  // Parent. Wait for the child to exit.
618  auto waited = waitpid(pid, &childStatus, 0);
619  EXPECT_EQ(pid, waited);
620 
621  // Send another message to the queue before we terminate the thread.
622  queue.putMessage(5678);
623  }
624 
625  // The child process should have crashed when it tried to call putMessage()
626  // on our NotificationQueue.
627  EXPECT_TRUE(WIFSIGNALED(childStatus));
628  EXPECT_EQ(SIGABRT, WTERMSIG(childStatus));
629 
630  // Make sure the parent saw the expected messages.
631  // It should have gotten 1234 and 5678 from the parent process, but not
632  // 9876 from the child.
633  EXPECT_EQ(2, consumer.messages.size());
634  EXPECT_EQ(1234, consumer.messages.front());
635  consumer.messages.pop_front();
636  EXPECT_EQ(5678, consumer.messages.front());
637  consumer.messages.pop_front();
638 }
639 #endif
640 
641 TEST(NotificationQueueConsumer, make) {
642  int value = 0;
643  EventBase evb;
644  NotificationQueue<int> queue(32);
645 
646  auto consumer =
647  decltype(queue)::Consumer::make([&](int&& msg) noexcept { value = msg; });
648 
649  consumer->startConsuming(&evb, &queue);
650 
651  int const newValue = 10;
652  queue.tryPutMessage(newValue);
653 
654  evb.loopOnce();
655 
656  EXPECT_EQ(newValue, value);
657 }
static struct message messages[5]
Definition: test.c:75
void setMaxReadAtOnce(uint32_t maxAtOnce)
#define EXPECT_THROW(statement, expected_exception)
Definition: gtest.h:1843
PskType type
#define EXPECT_EQ(val1, val2)
Definition: gtest.h:1922
constexpr detail::Map< Move > move
Definition: Base-inl.h:2567
STL namespace.
double val
Definition: String.cpp:273
—— Concurrent Priority Queue Implementation ——
Definition: AtomicBitSet.h:29
requires E e noexcept(noexcept(s.error(std::move(e))))
#define EXPECT_GE(val1, val2)
Definition: gtest.h:1932
void putMessage(MessageTT &&message)
std::function< void(int)> fn
std::vector< std::thread::id > threads
void runInLoop(LoopCallback *callback, bool thisIteration=false)
Definition: EventBase.cpp:520
std::deque< int > messages
bool loopOnce(int flags=0)
Definition: EventBase.cpp:271
void terminateLoopSoon()
Definition: EventBase.cpp:493
NotificationQueue< int > IntQueue
bool runInEventBaseThread(void(*fn)(T *), T *arg)
Definition: EventBase.h:794
static const char *const value
Definition: Conv.cpp:50
bool tryPutMessageNoThrow(MessageTT &&message)
void putMessages(InputIteratorT first, InputIteratorT last)
IntQueue terminationQueue
#define EXPECT_TRUE(condition)
Definition: gtest.h:1859
QueueTest(uint32_t maxSize, IntQueue::FdType type)
void messageAvailable(int &&value) noexcept override
g_t g(f_t)
#define EXPECT_FALSE(condition)
Definition: gtest.h:1862
void startConsuming(EventBase *eventBase, NotificationQueue *queue)
#define EXPECT_LT(val1, val2)
Definition: gtest.h:1930
bool consumeUntilDrained(size_t *numConsumed=nullptr) noexcept
TEST(NotificationQueueTest, ConsumeUntilDrained)
void tryPutMessage(MessageTT &&message)