19 #include <sys/types.h> 34 using namespace folly;
49 std::function<void(int)>
fn;
56 : queue(maxSize, type), terminationQueue(maxSize, type) {}
63 void destroyCallback();
75 consumer.
fn = [&](int) {
98 consumer.
fn = [&](
int msg) {
106 consumer2.
fn = [&](
int msg) {
114 list<int> msgList = {1, 2, 3, 4};
115 vector<int> msgVector = {5, 0, 9, 8, 7, 6, 7, 7, 8, 8, 2, 9, 6, 6, 10, 2, 0};
117 queue.putMessages(msgList.begin(), msgList.end());
118 queue.putMessages(msgVector.begin() + 2, msgVector.begin() + 4);
121 queue.putMessages(msgVector.begin(), msgVector.end());
126 vector<int> expectedMessages = {1, 2, 3, 4, 9, 8, 7, 5, 0};
127 vector<int> expectedMessages2 = {9, 8, 7, 6, 7, 7, 8, 8, 2, 9, 6, 10, 2, 0};
129 for (
unsigned int idx = 0; idx < expectedMessages.size(); ++idx) {
133 for (
unsigned int idx = 0; idx < expectedMessages2.size(); ++idx) {
143 vector<QueueConsumer> consumers(numConsumers);
144 vector<ScopedEventBaseThread>
threads(numConsumers);
146 for (
uint32_t consumerIdx = 0; consumerIdx < numConsumers; ++consumerIdx) {
149 consumer->
fn = [consumer, consumerIdx,
this](
int value) {
154 terminationQueue.putMessage(consumerIdx);
158 EventBase* eventBase = threads[consumerIdx].getEventBase();
166 for (
uint32_t n = 1; n < numMessages; ++n) {
170 for (
uint32_t n = 0; n < numConsumers; ++n) {
177 vector<uint32_t> consumersStopped(numConsumers, 0);
178 uint32_t consumersRemaining = numConsumers;
179 terminationConsumer.
fn = [&](
int consumerIdx) {
180 --consumersRemaining;
181 if (consumersRemaining == 0) {
187 ++consumersStopped[consumerIdx];
190 terminationConsumer.
startConsuming(&eventBase, &terminationQueue);
194 for (
uint32_t n = 0; n < numConsumers; ++n) {
200 vector<int> messageCount(numMessages, 0);
201 for (
uint32_t n = 0; n < numConsumers; ++n) {
202 for (
int msg : consumers[n].
messages) {
211 EXPECT_EQ(numConsumers, messageCount[0]);
213 for (
uint32_t n = 1; n < numMessages; ++n) {
221 for (
int n = 0; n < 5; ++n) {
222 queue.tryPutMessage(n);
226 EXPECT_THROW(queue.tryPutMessage(5), std::overflow_error);
238 queue.tryPutMessage(5);
240 EXPECT_THROW(queue.tryPutMessage(6), std::overflow_error);
250 EXPECT_THROW(queue.tryPutMessage(7), std::overflow_error);
255 queue.tryPutMessage(7);
277 for (
int n = 0; n < 100; ++n) {
285 std::vector<uint32_t> messagesPerLoop;
286 std::function<void()> loopFinished = [&] {
288 messagesPerLoop.push_back(messagesThisLoop);
290 messagesThisLoop = 0;
295 if (messagesPerLoop.size() != 55) {
324 for (
int n = 0; n < 100; ++n) {
333 if (messagesThisLoop > 0) {
334 messagesPerLoop.push_back(messagesThisLoop);
335 messagesThisLoop = 0;
341 for (
int n = 0; n < 5; ++n) {
344 for (
int n = 5; n < 55; ++n) {
359 DestructorGuard
g(
this);
365 std::function<void(int)>* fn;
368 ~DestroyTestConsumer()
override =
default;
381 std::unique_ptr<DestroyTestConsumer, DelayedDestruction::Destructor>
382 consumer1(
new DestroyTestConsumer);
383 std::unique_ptr<DestroyTestConsumer, DelayedDestruction::Destructor>
384 consumer2(
new DestroyTestConsumer);
385 std::function<void(int)> fn = [&](int) {
392 consumer1->startConsuming(&eventBase, &queue);
393 consumer2->startConsuming(&eventBase, &queue);
408 TEST(NotificationQueueTest, ConsumeUntilDrained) {
416 consumer.
fn = [&](
int i) {
420 std::vector<int> ints{1, 2, 3};
422 queue.
putMessages(ints.begin(), ints.end()), std::runtime_error);
426 for (
int i = 0;
i < 20;
i++) {
434 consumer.
fn = [&](
int ) { callbackBaton.wait(); };
438 atomic<bool> raceA{
false};
439 atomic<bool> raceB{
false};
442 auto thread = std::thread([&] {
443 threadStartBaton.post();
446 threadStartBaton.wait();
448 callbackBaton.post();
455 TEST(NotificationQueueTest, ConsumeUntilDrainedStress) {
456 for (
size_t i = 0;
i < 1 << 8; ++
i) {
464 consumer.
fn = [&](
int j) {
468 std::vector<int> ints{1, 2, 3};
470 queue.
putMessages(ints.begin(), ints.end()), std::runtime_error);
474 for (
int j = 0; j < 20; j++) {
482 consumer.
fn = [&](
int ) { callbackBaton.wait(); };
486 atomic<bool> raceA{
false};
487 atomic<bool> raceB{
false};
490 auto thread = std::thread([&] {
491 threadStartBaton.post();
494 threadStartBaton.wait();
496 callbackBaton.post();
504 #ifdef FOLLY_HAVE_EVENTFD 505 TEST(NotificationQueueTest, SendOneEventFD) {
506 QueueTest qt(0, IntQueue::FdType::EVENTFD);
510 TEST(NotificationQueueTest, PutMessagesEventFD) {
511 QueueTest qt(0, IntQueue::FdType::EVENTFD);
515 TEST(NotificationQueueTest, MultiConsumerEventFD) {
516 QueueTest qt(0, IntQueue::FdType::EVENTFD);
520 TEST(NotificationQueueTest, MaxQueueSizeEventFD) {
521 QueueTest qt(5, IntQueue::FdType::EVENTFD);
525 TEST(NotificationQueueTest, MaxReadAtOnceEventFD) {
526 QueueTest qt(0, IntQueue::FdType::EVENTFD);
530 TEST(NotificationQueueTest, DestroyCallbackEventFD) {
531 QueueTest qt(0, IntQueue::FdType::EVENTFD);
536 TEST(NotificationQueueTest, SendOnePipe) {
541 TEST(NotificationQueueTest, PutMessagesPipe) {
546 TEST(NotificationQueueTest, MultiConsumerPipe) {
551 TEST(NotificationQueueTest, MaxQueueSizePipe) {
556 TEST(NotificationQueueTest, MaxReadAtOncePipe) {
561 TEST(NotificationQueueTest, DestroyCallbackPipe) {
575 TEST(NotificationQueueTest, UseAfterFork) {
582 signal(SIGCHLD, SIG_DFL);
586 LOG(
INFO) <<
"This test makes sure the child process crashes. " 587 <<
"Error log messagges and a backtrace are expected.";
604 signal(SIGABRT, SIG_DFL);
618 auto waited = waitpid(pid, &childStatus, 0);
628 EXPECT_EQ(SIGABRT, WTERMSIG(childStatus));
641 TEST(NotificationQueueConsumer, make) {
647 decltype(queue)::Consumer::make([&](
int&& msg)
noexcept { value = msg; });
649 consumer->startConsuming(&evb, &queue);
651 int const newValue = 10;
EventBase * getEventBase() const
static struct message messages[5]
void setMaxReadAtOnce(uint32_t maxAtOnce)
#define EXPECT_THROW(statement, expected_exception)
#define EXPECT_EQ(val1, val2)
constexpr detail::Map< Move > move
—— Concurrent Priority Queue Implementation ——
requires E e noexcept(noexcept(s.error(std::move(e))))
#define EXPECT_GE(val1, val2)
void putMessage(MessageTT &&message)
std::function< void(int)> fn
std::vector< std::thread::id > threads
void runInLoop(LoopCallback *callback, bool thisIteration=false)
std::deque< int > messages
bool loopOnce(int flags=0)
NotificationQueue< int > IntQueue
bool runInEventBaseThread(void(*fn)(T *), T *arg)
static const char *const value
bool tryPutMessageNoThrow(MessageTT &&message)
void putMessages(InputIteratorT first, InputIteratorT last)
IntQueue terminationQueue
#define EXPECT_TRUE(condition)
QueueTest(uint32_t maxSize, IntQueue::FdType type)
void messageAvailable(int &&value) noexceptoverride
#define EXPECT_FALSE(condition)
void startConsuming(EventBase *eventBase, NotificationQueue *queue)
#define EXPECT_LT(val1, val2)
bool consumeUntilDrained(size_t *numConsumed=nullptr) noexcept
TEST(NotificationQueueTest, ConsumeUntilDrained)
void tryPutMessage(MessageTT &&message)