proxygen
NotificationQueueTest.cpp File Reference
#include <folly/io/async/NotificationQueue.h>
#include <sys/types.h>
#include <iostream>
#include <list>
#include <thread>
#include <folly/io/async/ScopedEventBaseThread.h>
#include <folly/portability/GTest.h>
#include <folly/synchronization/Baton.h>
#include <sys/wait.h>

Go to the source code of this file.

Classes

class  QueueConsumer
 
class  QueueTest
 

Typedefs

typedef NotificationQueue< int > IntQueue
 

Functions

 TEST (NotificationQueueTest, ConsumeUntilDrained)
 
 TEST (NotificationQueueTest, ConsumeUntilDrainedStress)
 
 TEST (NotificationQueueTest, SendOnePipe)
 
 TEST (NotificationQueueTest, PutMessagesPipe)
 
 TEST (NotificationQueueTest, MultiConsumerPipe)
 
 TEST (NotificationQueueTest, MaxQueueSizePipe)
 
 TEST (NotificationQueueTest, MaxReadAtOncePipe)
 
 TEST (NotificationQueueTest, DestroyCallbackPipe)
 
 TEST (NotificationQueueTest, UseAfterFork)
 
 TEST (NotificationQueueConsumer, make)
 

Typedef Documentation

Definition at line 36 of file NotificationQueueTest.cpp.

Function Documentation

TEST ( NotificationQueueTest  ,
ConsumeUntilDrained   
)

Definition at line 408 of file NotificationQueueTest.cpp.

References folly::NotificationQueue< MessageT >::Consumer::consumeUntilDrained(), EXPECT_EQ, EXPECT_FALSE, EXPECT_THROW, EXPECT_TRUE, QueueConsumer::fn, i, QueueConsumer::messages, folly::NotificationQueue< MessageT >::putMessage(), folly::NotificationQueue< MessageT >::putMessages(), folly::NotificationQueue< MessageT >::Consumer::setMaxReadAtOnce(), folly::NotificationQueue< MessageT >::Consumer::startConsuming(), folly::NotificationQueue< MessageT >::tryPutMessage(), and folly::NotificationQueue< MessageT >::tryPutMessageNoThrow().

Referenced by TEST().

408  {
409  // Basic tests: make sure we
410  // - drain all the messages
411  // - ignore any maxReadAtOnce
412  // - can't add messages during draining
413  EventBase eventBase;
414  IntQueue queue;
415  QueueConsumer consumer;
416  consumer.fn = [&](int i) {
417  EXPECT_THROW(queue.tryPutMessage(i), std::runtime_error);
419  EXPECT_THROW(queue.putMessage(i), std::runtime_error);
420  std::vector<int> ints{1, 2, 3};
421  EXPECT_THROW(
422  queue.putMessages(ints.begin(), ints.end()), std::runtime_error);
423  };
424  consumer.setMaxReadAtOnce(10); // We should ignore this
425  consumer.startConsuming(&eventBase, &queue);
426  for (int i = 0; i < 20; i++) {
427  queue.putMessage(i);
428  }
429  EXPECT_TRUE(consumer.consumeUntilDrained());
430  EXPECT_EQ(20, consumer.messages.size());
431 
432  // Make sure there can only be one drainer at once
433  folly::Baton<> callbackBaton, threadStartBaton;
434  consumer.fn = [&](int /* i */) { callbackBaton.wait(); };
435  QueueConsumer competingConsumer;
436  competingConsumer.startConsuming(&eventBase, &queue);
437  queue.putMessage(1);
438  atomic<bool> raceA{false};
439  atomic<bool> raceB{false};
440  size_t numConsA = 0;
441  size_t numConsB = 0;
442  auto thread = std::thread([&] {
443  threadStartBaton.post();
444  raceB = consumer.consumeUntilDrained(&numConsB) && numConsB;
445  });
446  threadStartBaton.wait();
447  raceA = competingConsumer.consumeUntilDrained(&numConsA) && numConsA;
448  callbackBaton.post();
449  thread.join();
450  EXPECT_FALSE(raceA && raceB);
451  EXPECT_TRUE(raceA || raceB);
452  EXPECT_TRUE(raceA ^ raceB);
453 }
void setMaxReadAtOnce(uint32_t maxAtOnce)
#define EXPECT_THROW(statement, expected_exception)
Definition: gtest.h:1843
#define EXPECT_EQ(val1, val2)
Definition: gtest.h:1922
void putMessage(MessageTT &&message)
std::function< void(int)> fn
std::deque< int > messages
bool tryPutMessageNoThrow(MessageTT &&message)
void putMessages(InputIteratorT first, InputIteratorT last)
#define EXPECT_TRUE(condition)
Definition: gtest.h:1859
#define EXPECT_FALSE(condition)
Definition: gtest.h:1862
void startConsuming(EventBase *eventBase, NotificationQueue *queue)
bool consumeUntilDrained(size_t *numConsumed=nullptr) noexcept
void tryPutMessage(MessageTT &&message)
TEST ( NotificationQueueTest  ,
ConsumeUntilDrainedStress   
)

Definition at line 455 of file NotificationQueueTest.cpp.

References folly::NotificationQueue< MessageT >::Consumer::consumeUntilDrained(), QueueTest::destroyCallback(), EXPECT_EQ, EXPECT_FALSE, EXPECT_THROW, EXPECT_TRUE, QueueConsumer::fn, i, QueueTest::maxQueueSize(), QueueTest::maxReadAtOnce(), QueueConsumer::messages, QueueTest::multiConsumer(), folly::NotificationQueue< MessageT >::putMessage(), folly::NotificationQueue< MessageT >::putMessages(), QueueTest::sendOne(), folly::NotificationQueue< MessageT >::Consumer::setMaxReadAtOnce(), folly::NotificationQueue< MessageT >::Consumer::startConsuming(), TEST(), folly::NotificationQueue< MessageT >::tryPutMessage(), and folly::NotificationQueue< MessageT >::tryPutMessageNoThrow().

455  {
456  for (size_t i = 0; i < 1 << 8; ++i) {
457  // Basic tests: make sure we
458  // - drain all the messages
459  // - ignore any maxReadAtOnce
460  // - can't add messages during draining
461  EventBase eventBase;
462  IntQueue queue;
463  QueueConsumer consumer;
464  consumer.fn = [&](int j) {
465  EXPECT_THROW(queue.tryPutMessage(j), std::runtime_error);
467  EXPECT_THROW(queue.putMessage(j), std::runtime_error);
468  std::vector<int> ints{1, 2, 3};
469  EXPECT_THROW(
470  queue.putMessages(ints.begin(), ints.end()), std::runtime_error);
471  };
472  consumer.setMaxReadAtOnce(10); // We should ignore this
473  consumer.startConsuming(&eventBase, &queue);
474  for (int j = 0; j < 20; j++) {
475  queue.putMessage(j);
476  }
477  EXPECT_TRUE(consumer.consumeUntilDrained());
478  EXPECT_EQ(20, consumer.messages.size());
479 
480  // Make sure there can only be one drainer at once
481  folly::Baton<> callbackBaton, threadStartBaton;
482  consumer.fn = [&](int /* i */) { callbackBaton.wait(); };
483  QueueConsumer competingConsumer;
484  competingConsumer.startConsuming(&eventBase, &queue);
485  queue.putMessage(1);
486  atomic<bool> raceA{false};
487  atomic<bool> raceB{false};
488  size_t numConsA = 0;
489  size_t numConsB = 0;
490  auto thread = std::thread([&] {
491  threadStartBaton.post();
492  raceB = consumer.consumeUntilDrained(&numConsB) && numConsB;
493  });
494  threadStartBaton.wait();
495  raceA = competingConsumer.consumeUntilDrained(&numConsA) && numConsA;
496  callbackBaton.post();
497  thread.join();
498  EXPECT_FALSE(raceA && raceB);
499  EXPECT_TRUE(raceA || raceB);
500  EXPECT_TRUE(raceA ^ raceB);
501  }
502 }
void setMaxReadAtOnce(uint32_t maxAtOnce)
#define EXPECT_THROW(statement, expected_exception)
Definition: gtest.h:1843
#define EXPECT_EQ(val1, val2)
Definition: gtest.h:1922
void putMessage(MessageTT &&message)
std::function< void(int)> fn
std::deque< int > messages
bool tryPutMessageNoThrow(MessageTT &&message)
void putMessages(InputIteratorT first, InputIteratorT last)
#define EXPECT_TRUE(condition)
Definition: gtest.h:1859
#define EXPECT_FALSE(condition)
Definition: gtest.h:1862
void startConsuming(EventBase *eventBase, NotificationQueue *queue)
bool consumeUntilDrained(size_t *numConsumed=nullptr) noexcept
void tryPutMessage(MessageTT &&message)
TEST ( NotificationQueueTest  ,
SendOnePipe   
)
TEST ( NotificationQueueTest  ,
PutMessagesPipe   
)
TEST ( NotificationQueueTest  ,
MultiConsumerPipe   
)
TEST ( NotificationQueueTest  ,
MaxQueueSizePipe   
)
TEST ( NotificationQueueTest  ,
MaxReadAtOncePipe   
)
TEST ( NotificationQueueTest  ,
DestroyCallbackPipe   
)
TEST ( NotificationQueueTest  ,
UseAfterFork   
)

Definition at line 575 of file NotificationQueueTest.cpp.

References EXPECT_EQ, EXPECT_TRUE, folly::ScopedEventBaseThread::getEventBase(), folly::INFO, QueueConsumer::messages, folly::NotificationQueue< MessageT >::putMessage(), folly::EventBase::runInEventBaseThread(), and folly::NotificationQueue< MessageT >::Consumer::startConsuming().

575  {
576  IntQueue queue;
577  int childStatus = 0;
578  QueueConsumer consumer;
579 
580  // Boost sets a custom SIGCHLD handler, which fails the test if a child
581  // process exits abnormally. We don't want this.
582  signal(SIGCHLD, SIG_DFL);
583 
584  // Log some info so users reading the test output aren't confused
585  // by the child process' crash log messages.
586  LOG(INFO) << "This test makes sure the child process crashes. "
587  << "Error log messagges and a backtrace are expected.";
588 
589  {
590  // Start a separate thread consuming from the queue
593  [&] { consumer.startConsuming(t1.getEventBase(), &queue); });
594 
595  // Send a message to it, just for sanity checking
596  queue.putMessage(1234);
597 
598  // Fork
599  pid_t pid = fork();
600  if (pid == 0) {
601  // The boost test framework installs signal handlers to catch errors.
602  // We only want to catch in the parent. In the child let SIGABRT crash
603  // us normally.
604  signal(SIGABRT, SIG_DFL);
605 
606  // Child.
607  // We're horrible people, so we try to send a message to the queue
608  // that is being consumed in the parent process.
609  //
610  // The putMessage() call should catch this error, and crash our process.
611  queue.putMessage(9876);
612  // We shouldn't reach here.
613  _exit(0);
614  }
615  PCHECK(pid > 0);
616 
617  // Parent. Wait for the child to exit.
618  auto waited = waitpid(pid, &childStatus, 0);
619  EXPECT_EQ(pid, waited);
620 
621  // Send another message to the queue before we terminate the thread.
622  queue.putMessage(5678);
623  }
624 
625  // The child process should have crashed when it tried to call putMessage()
626  // on our NotificationQueue.
627  EXPECT_TRUE(WIFSIGNALED(childStatus));
628  EXPECT_EQ(SIGABRT, WTERMSIG(childStatus));
629 
630  // Make sure the parent saw the expected messages.
631  // It should have gotten 1234 and 5678 from the parent process, but not
632  // 9876 from the child.
633  EXPECT_EQ(2, consumer.messages.size());
634  EXPECT_EQ(1234, consumer.messages.front());
635  consumer.messages.pop_front();
636  EXPECT_EQ(5678, consumer.messages.front());
637  consumer.messages.pop_front();
638 }
#define EXPECT_EQ(val1, val2)
Definition: gtest.h:1922
void putMessage(MessageTT &&message)
std::deque< int > messages
bool runInEventBaseThread(void(*fn)(T *), T *arg)
Definition: EventBase.h:794
#define EXPECT_TRUE(condition)
Definition: gtest.h:1859
void startConsuming(EventBase *eventBase, NotificationQueue *queue)
TEST ( NotificationQueueConsumer  ,
make   
)

Definition at line 641 of file NotificationQueueTest.cpp.

References EXPECT_EQ, folly::EventBase::loopOnce(), folly::pushmi::__adl::noexcept(), folly::NotificationQueue< MessageT >::tryPutMessage(), and value.

641  {
642  int value = 0;
643  EventBase evb;
644  NotificationQueue<int> queue(32);
645 
646  auto consumer =
647  decltype(queue)::Consumer::make([&](int&& msg) noexcept { value = msg; });
648 
649  consumer->startConsuming(&evb, &queue);
650 
651  int const newValue = 10;
652  queue.tryPutMessage(newValue);
653 
654  evb.loopOnce();
655 
656  EXPECT_EQ(newValue, value);
657 }
#define EXPECT_EQ(val1, val2)
Definition: gtest.h:1922
requires E e noexcept(noexcept(s.error(std::move(e))))
bool loopOnce(int flags=0)
Definition: EventBase.cpp:271
static const char *const value
Definition: Conv.cpp:50