/* -*- Mode: C++; tab-width: 8; indent-tabs-mode: nil; c-basic-offset: 2 -*- */ /* vim: set ts=8 sts=2 et sw=2 tw=80: */ /* This Source Code Form is subject to the terms of the Mozilla Public * License, v. 2.0. If a copy of the MPL was not distributed with this * file, You can obtain one at http://mozilla.org/MPL/2.0/. */ #include #include #include #include #include #include "UnboundedMPSCQueue.h" #include "mozilla/Atomics.h" #include "mozilla/TaskQueue.h" #include "mozilla/UniquePtr.h" #include "nsThreadManager.h" #include "nsThreadUtils.h" namespace mozilla::test_mpsc_queue { struct MoveOnlyValue { explicit MoveOnlyValue(int32_t aValue = 0) : mValue(aValue) {} MoveOnlyValue(const MoveOnlyValue&) = delete; MoveOnlyValue& operator=(const MoveOnlyValue&) = delete; MoveOnlyValue(MoveOnlyValue&& aOther) : mValue(aOther.mValue) { aOther.mValue = -1; } MoveOnlyValue& operator=(MoveOnlyValue&& aOther) { mValue = aOther.mValue; aOther.mValue = -1; return *this; } int32_t mValue; }; struct CountedType { explicit CountedType(int32_t aValue = 0) : mValue(aValue) { ++sLiveCount; } CountedType(const CountedType& aOther) : mValue(aOther.mValue) { ++sLiveCount; } CountedType& operator=(const CountedType& aOther) = default; CountedType(CountedType&& aOther) : mValue(aOther.mValue) { aOther.mValue = -1; ++sLiveCount; } CountedType& operator=(CountedType&& aOther) { mValue = aOther.mValue; aOther.mValue = -1; return *this; } ~CountedType() { --sLiveCount; } static Atomic sLiveCount; int32_t mValue; }; Atomic CountedType::sLiveCount(0); template void PushValue(UnboundedMPSCQueue& aQueue, T aValue) { auto* msg = new typename UnboundedMPSCQueue::Message(); msg->data = std::move(aValue); aQueue.Push(msg); } // --- Single-threaded basics --- TEST(UnboundedMPSCQueue, PopEmptyReturnsFalse) { UnboundedMPSCQueue queue; int32_t output = 42; EXPECT_FALSE(queue.Pop(&output)); EXPECT_EQ(output, 42); } TEST(UnboundedMPSCQueue, SinglePushPop) { UnboundedMPSCQueue queue; PushValue(queue, 42); int32_t output = 0; EXPECT_TRUE(queue.Pop(&output)); EXPECT_EQ(output, 42); EXPECT_FALSE(queue.Pop(&output)); } TEST(UnboundedMPSCQueue, FIFOOrdering) { UnboundedMPSCQueue queue; for (int32_t i = 0; i < 100; i++) { PushValue(queue, i); } for (int32_t i = 0; i < 100; i++) { int32_t output = -1; EXPECT_TRUE(queue.Pop(&output)); EXPECT_EQ(output, i); } int32_t output = -1; EXPECT_FALSE(queue.Pop(&output)); } TEST(UnboundedMPSCQueue, ManyElements) { UnboundedMPSCQueue queue; const int32_t count = 10000; for (int32_t i = 0; i < count; i++) { PushValue(queue, i); } for (int32_t i = 0; i < count; i++) { int32_t output = -1; EXPECT_TRUE(queue.Pop(&output)); EXPECT_EQ(output, i); } int32_t output = -1; EXPECT_FALSE(queue.Pop(&output)); } // Push/pop alternate one-at-a-time (not batched), catching bugs in node // recycling or state transitions between empty and non-empty. TEST(UnboundedMPSCQueue, InterleavedPushPop) { UnboundedMPSCQueue queue; // Push one, pop one, 50 times. for (int32_t i = 0; i < 50; i++) { PushValue(queue, i); int32_t output = -1; EXPECT_TRUE(queue.Pop(&output)); EXPECT_EQ(output, i); } // Queue must be empty. int32_t output = -1; EXPECT_FALSE(queue.Pop(&output)); } // New pushes after a partial drain are appended correctly and don't corrupt // remaining unconsumed items -- catches stale-tail or linkage bugs. TEST(UnboundedMPSCQueue, PopAfterPartialDrain) { UnboundedMPSCQueue queue; // Enqueue 5 items. for (int32_t i = 0; i < 5; i++) { PushValue(queue, i); } // Drain only the first 3. for (int32_t i = 0; i < 3; i++) { int32_t output = -1; EXPECT_TRUE(queue.Pop(&output)); EXPECT_EQ(output, i); } // Enqueue 3 more while 2 remain. for (int32_t i = 5; i < 8; i++) { PushValue(queue, i); } // Remaining + new items must appear in FIFO order. for (const auto& expected : {3, 4, 5, 6, 7}) { int32_t output = -1; EXPECT_TRUE(queue.Pop(&output)); EXPECT_EQ(output, expected); } int32_t output = -1; EXPECT_FALSE(queue.Pop(&output)); } // --- Move-only types --- TEST(UnboundedMPSCQueue, MoveOnlyUniquePtr) { UnboundedMPSCQueue> queue; auto* msg = new UnboundedMPSCQueue>::Message(); msg->data = MakeUnique(99); queue.Push(msg); UniquePtr output; EXPECT_TRUE(queue.Pop(&output)); ASSERT_NE(output, nullptr); EXPECT_EQ(*output, 99); EXPECT_FALSE(queue.Pop(&output)); } TEST(UnboundedMPSCQueue, MoveOnlyStruct) { UnboundedMPSCQueue queue; for (int32_t i = 0; i < 10; i++) { PushValue(queue, MoveOnlyValue(i)); } for (int32_t i = 0; i < 10; i++) { MoveOnlyValue output; EXPECT_TRUE(queue.Pop(&output)); EXPECT_EQ(output.mValue, i); } MoveOnlyValue output; EXPECT_FALSE(queue.Pop(&output)); } TEST(UnboundedMPSCQueue, MoveOnlyFIFO) { UnboundedMPSCQueue> queue; for (int32_t i = 0; i < 100; i++) { auto* msg = new UnboundedMPSCQueue>::Message(); msg->data = MakeUnique(i); queue.Push(msg); } for (int32_t i = 0; i < 100; i++) { UniquePtr output; EXPECT_TRUE(queue.Pop(&output)); ASSERT_NE(output, nullptr); EXPECT_EQ(*output, i); } } // --- Complex types --- TEST(UnboundedMPSCQueue, StdString) { UnboundedMPSCQueue queue; PushValue(queue, "hello"); PushValue(queue, "world"); PushValue(queue, "foo"); std::string output; EXPECT_TRUE(queue.Pop(&output)); EXPECT_EQ(output, "hello"); EXPECT_TRUE(queue.Pop(&output)); EXPECT_EQ(output, "world"); EXPECT_TRUE(queue.Pop(&output)); EXPECT_EQ(output, "foo"); EXPECT_FALSE(queue.Pop(&output)); } // --- Destructor / edge cases --- TEST(UnboundedMPSCQueue, DestructorDrainsElements) { int32_t before = CountedType::sLiveCount; { UnboundedMPSCQueue queue; for (int32_t i = 0; i < 50; i++) { PushValue(queue, CountedType(i)); } } EXPECT_EQ(static_cast(CountedType::sLiveCount), before); } TEST(UnboundedMPSCQueue, DestructorDrainsMoveOnlyElements) { int32_t before = CountedType::sLiveCount; { UnboundedMPSCQueue> queue; for (int32_t i = 0; i < 50; i++) { auto* msg = new UnboundedMPSCQueue>::Message(); msg->data = MakeUnique(i); queue.Push(msg); } } EXPECT_EQ(static_cast(CountedType::sLiveCount), before); } TEST(UnboundedMPSCQueue, DestroyEmptyQueue) { { UnboundedMPSCQueue queue; } SUCCEED(); } // --- Multi-producer tests --- class UnboundedMPSCQueueMTTest : public ::testing::TestWithParam {}; INSTANTIATE_TEST_SUITE_P(ThreadCounts, UnboundedMPSCQueueMTTest, ::testing::Values(1, 2, 4, 8, 16)); // No items lost under concurrent multi-producer pushes (lost-update / ABA on // the head pointer). TEST_P(UnboundedMPSCQueueMTTest, AllItemsReceived) { const int32_t numThreads = GetParam(); UnboundedMPSCQueue queue; // Spawn N producer task queues. nsTArray> threads(numThreads); for (int32_t t = 0; t < numThreads; t++) { RefPtr tq = nsThreadManager::get().CreateBackgroundTaskQueue("MPSCTest"); ASSERT_NE(tq, nullptr); threads.AppendElement(std::move(tq)); } // Each producer pushes 10k items. for (int32_t t = 0; t < numThreads; t++) { MOZ_ALWAYS_SUCCEEDS( threads[t]->Dispatch(NS_NewRunnableFunction("MPSCPush", [&queue]() { for (int32_t i = 0; i < 10000; i++) { PushValue(queue, i); } }))); } // Wait for all producers to finish. for (int32_t t = numThreads - 1; t >= 0; t--) { threads[t]->BeginShutdown(); threads[t]->AwaitShutdownAndIdle(); } // Drain and count -- must equal N * 10k. int32_t total = 0; int32_t value; while (queue.Pop(&value)) { total++; } EXPECT_EQ(total, numThreads * 10000); } // Checks for data tearing -- each producer writes values in a unique range // (threadIdx * 1M + i), then we verify every expected value is present. TEST_P(UnboundedMPSCQueueMTTest, NoDataCorruption) { const int32_t numThreads = GetParam(); const int32_t itemsPerThread = 10000; UnboundedMPSCQueue queue; // Spawn N producer task queues. nsTArray> threads(numThreads); for (int32_t t = 0; t < numThreads; t++) { RefPtr tq = nsThreadManager::get().CreateBackgroundTaskQueue("MPSCTest"); ASSERT_NE(tq, nullptr); threads.AppendElement(std::move(tq)); } // Each producer pushes 10k items tagged with its thread index. for (int32_t t = 0; t < numThreads; t++) { int32_t threadIdx = t; MOZ_ALWAYS_SUCCEEDS(threads[t]->Dispatch( NS_NewRunnableFunction("MPSCPush", [&queue, threadIdx]() { for (int32_t i = 0; i < 10000; i++) { PushValue(queue, threadIdx * 1000000 + i); } }))); } // Wait for all producers. for (int32_t t = numThreads - 1; t >= 0; t--) { threads[t]->BeginShutdown(); threads[t]->AwaitShutdownAndIdle(); } // Collect all values. std::vector values; values.reserve(numThreads * itemsPerThread); int32_t value; while (queue.Pop(&value)) { values.push_back(value); } EXPECT_EQ(static_cast(values.size()), numThreads * itemsPerThread); // Sort and verify every expected value is present exactly once. std::sort(values.begin(), values.end()); for (int32_t t = 0; t < numThreads; t++) { for (int32_t i = 0; i < itemsPerThread; i++) { int32_t expected = t * 1000000 + i; auto it = std::lower_bound(values.begin(), values.end(), expected); ASSERT_NE(it, values.end()) << "Missing value " << expected; EXPECT_EQ(*it, expected); } } } // Same as AllItemsReceived but with UniquePtr to verify move semantics under // contention -- catches double-free or use-after-move. TEST_P(UnboundedMPSCQueueMTTest, MoveOnlyMultiProducer) { const int32_t numThreads = GetParam(); UnboundedMPSCQueue> queue; // Spawn N producer task queues. nsTArray> threads(numThreads); for (int32_t t = 0; t < numThreads; t++) { RefPtr tq = nsThreadManager::get().CreateBackgroundTaskQueue("MPSCTest"); ASSERT_NE(tq, nullptr); threads.AppendElement(std::move(tq)); } // Each producer pushes 10k heap-allocated items. for (int32_t t = 0; t < numThreads; t++) { MOZ_ALWAYS_SUCCEEDS( threads[t]->Dispatch(NS_NewRunnableFunction("MPSCPush", [&queue]() { for (int32_t i = 0; i < 10000; i++) { auto* msg = new UnboundedMPSCQueue>::Message(); msg->data = MakeUnique(i); queue.Push(msg); } }))); } // Wait for all producers. for (int32_t t = numThreads - 1; t >= 0; t--) { threads[t]->BeginShutdown(); threads[t]->AwaitShutdownAndIdle(); } // Drain, verify non-null, count. int32_t total = 0; UniquePtr value; while (queue.Pop(&value)) { ASSERT_NE(value, nullptr); total++; } EXPECT_EQ(total, numThreads * 10000); } // --- Stress tests --- // Brute-force stress test: 8 producers x 100k items to surface races that only // manifest under heavy contention. TEST(UnboundedMPSCQueue, StressHighVolume) { const int32_t numThreads = 8; UnboundedMPSCQueue queue; // Spawn 8 producer task queues. nsTArray> threads(numThreads); for (int32_t t = 0; t < numThreads; t++) { RefPtr tq = nsThreadManager::get().CreateBackgroundTaskQueue("MPSCStress"); ASSERT_NE(tq, nullptr); threads.AppendElement(std::move(tq)); } // Each producer pushes 100k items. for (int32_t t = 0; t < numThreads; t++) { MOZ_ALWAYS_SUCCEEDS( threads[t]->Dispatch(NS_NewRunnableFunction("MPSCPush", [&queue]() { for (int32_t i = 0; i < 100000; i++) { PushValue(queue, i); } }))); } // Wait, drain, verify total count. for (int32_t t = numThreads - 1; t >= 0; t--) { threads[t]->BeginShutdown(); threads[t]->AwaitShutdownAndIdle(); } int32_t total = 0; int32_t value; while (queue.Pop(&value)) { total++; } EXPECT_EQ(total, numThreads * 100000); } // True MPSC pattern -- multiple producers push while a single consumer pops // concurrently, catching races between Push and Pop on the shared head/tail // pointers. TEST(UnboundedMPSCQueue, StressConcurrentPushPop) { const int32_t numProducers = 4; UnboundedMPSCQueue queue; Atomic producersDone(false); Atomic totalPopped(0); // Spawn 4 producer task queues. nsTArray> producers(numProducers); for (int32_t t = 0; t < numProducers; t++) { RefPtr tq = nsThreadManager::get().CreateBackgroundTaskQueue("MPSCProd"); ASSERT_NE(tq, nullptr); producers.AppendElement(std::move(tq)); } // Start a consumer that spins draining the queue. RefPtr consumer = nsThreadManager::get().CreateBackgroundTaskQueue("MPSCCons"); MOZ_ALWAYS_SUCCEEDS(consumer->Dispatch(NS_NewRunnableFunction( "MPSCConsumer", [&queue, &producersDone, &totalPopped]() { int32_t value; for (;;) { while (queue.Pop(&value)) { totalPopped++; } if (producersDone) { // Final drain after all producers are done. while (queue.Pop(&value)) { totalPopped++; } break; } } }))); // Start all producers, each pushing 100k items. for (int32_t t = 0; t < numProducers; t++) { MOZ_ALWAYS_SUCCEEDS( producers[t]->Dispatch(NS_NewRunnableFunction("MPSCPush", [&queue]() { for (int32_t i = 0; i < 100000; i++) { PushValue(queue, i); } }))); } // Wait for producers, signal consumer, wait for consumer. for (int32_t t = numProducers - 1; t >= 0; t--) { producers[t]->BeginShutdown(); producers[t]->AwaitShutdownAndIdle(); } producersDone = true; consumer->BeginShutdown(); consumer->AwaitShutdownAndIdle(); // Consumer must have popped all items. EXPECT_EQ(static_cast(totalPopped), numProducers * 100000); } } // namespace mozilla::test_mpsc_queue