/* This Source Code Form is subject to the terms of the Mozilla Public * License, v. 2.0. If a copy of the MPL was not distributed with this * file, You can obtain one at http://mozilla.org/MPL/2.0/. */ #include #include "gmock/gmock.h" #include "gtest/gtest.h" #include "gtest/gtest-spi.h" #include "nsThreadPool.h" #include "nsThread.h" #include "mozilla/SharedThreadPool.h" #include "mozilla/SyncRunnable.h" #include "mozilla/TaskQueue.h" #include "mozilla/ThrottledEventQueue.h" #include "nsITargetShutdownTask.h" #include "VideoUtils.h" namespace TestTaskQueue { using namespace mozilla; using testing::_; using testing::InSequence; using testing::MockFunction; using testing::StrEq; TEST(TaskQueue, EventOrder) { RefPtr tq1 = TaskQueue::Create(GetMediaThreadPool(MediaThreadType::SUPERVISOR), "TestTaskQueue tq1", true); RefPtr tq2 = TaskQueue::Create(GetMediaThreadPool(MediaThreadType::SUPERVISOR), "TestTaskQueue tq2", true); RefPtr tq3 = TaskQueue::Create(GetMediaThreadPool(MediaThreadType::SUPERVISOR), "TestTaskQueue tq3", true); bool errored = false; int counter = 0; int sync = 0; Monitor monitor MOZ_UNANNOTATED("TaskQueue::EventOrder::monitor"); // We expect task1 happens before task3. for (int i = 0; i < 10000; ++i) { (void)tq1->Dispatch( NS_NewRunnableFunction( "TestTaskQueue::TaskQueue_EventOrder_Test::TestBody", [&]() { (void)tq2->Dispatch(NS_NewRunnableFunction( "TestTaskQueue::TaskQueue_EventOrder_Test::TestBody", []() { // task0 })); (void)tq3->Dispatch(NS_NewRunnableFunction( "TestTaskQueue::TaskQueue_EventOrder_Test::TestBody", [&]() { // task1 EXPECT_EQ(1, ++counter); errored = counter != 1; MonitorAutoLock mon(monitor); ++sync; mon.Notify(); })); (void)tq2->Dispatch(NS_NewRunnableFunction( "TestTaskQueue::TaskQueue_EventOrder_Test::TestBody", [&]() { // task2 (void)tq3->Dispatch(NS_NewRunnableFunction( "TestTaskQueue::TaskQueue_EventOrder_Test::TestBody", [&]() { // task3 EXPECT_EQ(0, --counter); errored = counter != 0; MonitorAutoLock mon(monitor); ++sync; mon.Notify(); })); })); }), AbstractThread::TailDispatch); // Ensure task1 and task3 are done before next loop. MonitorAutoLock mon(monitor); while (sync != 2) { mon.Wait(); } sync = 0; if (errored) { break; } } tq1->BeginShutdown(); tq1->AwaitShutdownAndIdle(); tq2->BeginShutdown(); tq2->AwaitShutdownAndIdle(); tq3->BeginShutdown(); tq3->AwaitShutdownAndIdle(); } TEST(TaskQueue, GetCurrentSerialEventTarget) { RefPtr tq1 = TaskQueue::Create(GetMediaThreadPool(MediaThreadType::SUPERVISOR), "TestTaskQueue GetCurrentSerialEventTarget", false); (void)tq1->Dispatch(NS_NewRunnableFunction( "TestTaskQueue::TestCurrentSerialEventTarget::TestBody", [tq1]() { nsCOMPtr thread = GetCurrentSerialEventTarget(); EXPECT_EQ(thread, tq1); })); tq1->BeginShutdown(); tq1->AwaitShutdownAndIdle(); } TEST(TaskQueue, DirectTaskGetCurrentSerialEventTarget) { RefPtr tq1 = TaskQueue::Create( GetMediaThreadPool(MediaThreadType::SUPERVISOR), "TestTaskQueue DirectTaskGetCurrentSerialEventTarget", true); (void)tq1->Dispatch(NS_NewRunnableFunction( "TestTaskQueue::DirectTaskGetCurrentSerialEventTarget::TestBody", [&]() { AbstractThread::DispatchDirectTask(NS_NewRunnableFunction( "TestTaskQueue::DirectTaskGetCurrentSerialEventTarget::DirectTask", [&] { EXPECT_EQ(GetCurrentSerialEventTarget(), tq1); })); })); tq1->BeginShutdown(); tq1->AwaitShutdownAndIdle(); } namespace { class TestShutdownTask final : public nsITargetShutdownTask { public: NS_DECL_THREADSAFE_ISUPPORTS explicit TestShutdownTask(std::function aCallback) : mCallback(std::move(aCallback)) {} void TargetShutdown() override { if (mCallback) { mCallback(); } } private: ~TestShutdownTask() = default; std::function mCallback; }; NS_IMPL_ISUPPORTS(TestShutdownTask, nsITargetShutdownTask) } // namespace TEST(TaskQueue, ShutdownTask) { auto shutdownTaskRun = std::make_shared(); auto runnableFromShutdownRun = std::make_shared(); RefPtr tq = TaskQueue::Create( GetMediaThreadPool(MediaThreadType::SUPERVISOR), "Testing TaskQueue"); nsCOMPtr shutdownTask = new TestShutdownTask([=] { EXPECT_TRUE(tq->IsOnCurrentThread()); ASSERT_FALSE(*shutdownTaskRun); *shutdownTaskRun = true; nsCOMPtr dummyTask = new TestShutdownTask([] {}); nsresult rv = tq->RegisterShutdownTask(dummyTask); EXPECT_TRUE(rv == NS_ERROR_UNEXPECTED); MOZ_ALWAYS_SUCCEEDS( tq->Dispatch(NS_NewRunnableFunction("afterShutdownTask", [=] { EXPECT_TRUE(tq->IsOnCurrentThread()); nsCOMPtr dummyTask = new TestShutdownTask([] {}); nsresult rv = tq->RegisterShutdownTask(dummyTask); EXPECT_TRUE(rv == NS_ERROR_UNEXPECTED); ASSERT_FALSE(*runnableFromShutdownRun); *runnableFromShutdownRun = true; }))); }); MOZ_ALWAYS_SUCCEEDS(tq->RegisterShutdownTask(shutdownTask)); ASSERT_FALSE(*shutdownTaskRun); ASSERT_FALSE(*runnableFromShutdownRun); RefPtr syncWithThread = new mozilla::SyncRunnable(NS_NewRunnableFunction("dummy", [] {})); MOZ_ALWAYS_SUCCEEDS(syncWithThread->DispatchToThread(tq)); ASSERT_FALSE(*shutdownTaskRun); ASSERT_FALSE(*runnableFromShutdownRun); tq->BeginShutdown(); tq->AwaitShutdownAndIdle(); ASSERT_TRUE(*shutdownTaskRun); ASSERT_TRUE(*runnableFromShutdownRun); } TEST(TaskQueue, UnregisteredShutdownTask) { RefPtr tq = TaskQueue::Create( GetMediaThreadPool(MediaThreadType::SUPERVISOR), "Testing TaskQueue"); nsCOMPtr shutdownTask = new TestShutdownTask([=] { MOZ_CRASH("should not be run"); }); MOZ_ALWAYS_SUCCEEDS(tq->RegisterShutdownTask(shutdownTask)); RefPtr syncWithThread = new mozilla::SyncRunnable(NS_NewRunnableFunction("dummy", [] {})); MOZ_ALWAYS_SUCCEEDS(syncWithThread->DispatchToThread(tq)); MOZ_ALWAYS_SUCCEEDS(tq->UnregisterShutdownTask(shutdownTask)); tq->BeginShutdown(); tq->AwaitShutdownAndIdle(); } // Mock function to register code flow and current targets. Targets are 1) // TaskQueue::Observer's task queue, 2) AbstractThread::GetCurrent(), and 3) // GetCurrentSerialEventTarget(). using ObserverCheckpoint = MockFunction; // Helpers because the thread args are usually the same. // These are macros because EXPECT_CALL is a macro, and derives line numbers // this way. #define EXPECT_OBS_CALL(cp, str, tq) \ EXPECT_CALL(cp, Call(StrEq(str), tq, tq, tq)) #define EXPECT_RUNNABLE_CALL(cp, str, tq) \ EXPECT_CALL(cp, Call(StrEq(str), nullptr, tq, tq)) #define EXPECT_OBSDTOR_CALL(cp, str) \ EXPECT_CALL(cp, Call(StrEq(str), nullptr, AbstractThread::MainThread(), \ GetMainThreadSerialEventTarget())) class Observer final : public TaskQueue::Observer { public: NS_INLINE_DECL_THREADSAFE_REFCOUNTING(Observer, override); explicit Observer(ObserverCheckpoint& aFunc) : mFunc(aFunc) {} void WillProcessEvent(TaskQueue* aTaskQueue) override { mFunc.Call(__func__, aTaskQueue, AbstractThread::GetCurrent(), GetCurrentSerialEventTarget()); }; void DidProcessEvent(TaskQueue* aTaskQueue) override { EXPECT_EQ(aTaskQueue, AbstractThread::GetCurrent()); mFunc.Call(__func__, aTaskQueue, AbstractThread::GetCurrent(), GetCurrentSerialEventTarget()); } private: ~Observer() override { mFunc.Call(__func__, nullptr, AbstractThread::GetCurrent(), GetCurrentSerialEventTarget()); } ObserverCheckpoint& mFunc; }; TEST(TaskQueue, Observer) { RefPtr taskQueue = TaskQueue::Create( do_AddRef(AbstractThread::MainThread()), "Testing TaskQueue"); TaskQueue* tq = taskQueue; ObserverCheckpoint checkpoint; { InSequence seq; EXPECT_OBS_CALL(checkpoint, "WillProcessEvent", tq); EXPECT_RUNNABLE_CALL(checkpoint, "Runnable", tq); EXPECT_OBS_CALL(checkpoint, "DidProcessEvent", tq); EXPECT_OBSDTOR_CALL(checkpoint, "~Observer"); } { auto obs = MakeRefPtr(checkpoint); tq->SetObserver(obs); } MOZ_ALWAYS_SUCCEEDS(tq->Dispatch(NS_NewRunnableFunction(__func__, [&] { checkpoint.Call("Runnable", nullptr, AbstractThread::GetCurrent(), GetCurrentSerialEventTarget()); }))); NS_ProcessPendingEvents(nullptr); tq->BeginShutdown(); } TEST(TaskQueue, ObserverTransactional) { RefPtr taskQueue = TaskQueue::Create( do_AddRef(AbstractThread::MainThread()), "Testing TaskQueue"); TaskQueue* tq = taskQueue; ObserverCheckpoint checkpoint; { InSequence seq; EXPECT_RUNNABLE_CALL(checkpoint, "Runnable1", tq); EXPECT_OBS_CALL(checkpoint, "WillProcessEvent", tq); EXPECT_RUNNABLE_CALL(checkpoint, "Runnable2", tq); EXPECT_OBS_CALL(checkpoint, "DidProcessEvent", tq); EXPECT_RUNNABLE_CALL(checkpoint, "~Observer", _); } { auto obs = MakeRefPtr(checkpoint); MOZ_ALWAYS_SUCCEEDS(tq->Dispatch(NS_NewRunnableFunction(__func__, [&] { tq->SetObserver(obs); checkpoint.Call("Runnable1", nullptr, AbstractThread::GetCurrent(), GetCurrentSerialEventTarget()); }))); NS_ProcessPendingEvents(nullptr); } MOZ_ALWAYS_SUCCEEDS(tq->Dispatch(NS_NewRunnableFunction(__func__, [&] { // Note this technically destroys the observer on a different event target // than if it's destroyed through shutdown. tq->SetObserver(nullptr); checkpoint.Call("Runnable2", nullptr, AbstractThread::GetCurrent(), GetCurrentSerialEventTarget()); }))); NS_ProcessPendingEvents(nullptr); tq->BeginShutdown(); } TEST(TaskQueue, ObserverDirectTask) { RefPtr taskQueue = TaskQueue::Create(do_AddRef(AbstractThread::MainThread()), "Testing TaskQueue", /*aSupportsTailDispatch=*/true); TaskQueue* tq = taskQueue; ObserverCheckpoint checkpoint; { auto obs = MakeRefPtr(checkpoint); tq->SetObserver(obs); } { InSequence seq; EXPECT_OBS_CALL(checkpoint, "WillProcessEvent", tq); EXPECT_RUNNABLE_CALL(checkpoint, "Runnable1", tq); EXPECT_RUNNABLE_CALL(checkpoint, "Runnable1.Direct", tq); EXPECT_OBS_CALL(checkpoint, "DidProcessEvent", tq); EXPECT_OBS_CALL(checkpoint, "WillProcessEvent", tq); EXPECT_RUNNABLE_CALL(checkpoint, "Runnable2", tq); EXPECT_OBS_CALL(checkpoint, "DidProcessEvent", tq); EXPECT_OBSDTOR_CALL(checkpoint, "~Observer"); } MOZ_ALWAYS_SUCCEEDS(tq->Dispatch(NS_NewRunnableFunction(__func__, [&] { checkpoint.Call("Runnable1", nullptr, AbstractThread::GetCurrent(), GetCurrentSerialEventTarget()); tq->TailDispatcher().AddDirectTask( NS_NewRunnableFunction("TestDirectTask", [&] { checkpoint.Call("Runnable1.Direct", nullptr, AbstractThread::GetCurrent(), GetCurrentSerialEventTarget()); })); }))); MOZ_ALWAYS_SUCCEEDS(tq->Dispatch(NS_NewRunnableFunction(__func__, [&] { checkpoint.Call("Runnable2", nullptr, AbstractThread::GetCurrent(), GetCurrentSerialEventTarget()); }))); NS_ProcessPendingEvents(nullptr); tq->BeginShutdown(); } #undef EXPECT_OBS_CALL #undef EXPECT_RUNNABLE_CALL #undef EXPECT_CP_CALL TEST(AbstractThread, GetCurrentSerialEventTarget) { RefPtr mainThread = AbstractThread::GetCurrent(); EXPECT_EQ(mainThread, AbstractThread::MainThread()); (void)mainThread->Dispatch(NS_NewRunnableFunction( "TestAbstractThread::TestCurrentSerialEventTarget::TestBody", [mainThread]() { nsCOMPtr thread = GetCurrentSerialEventTarget(); EXPECT_EQ(thread, mainThread); })); // Spin the event loop. NS_ProcessPendingEvents(nullptr); } TEST(AbstractThread, DirectTaskGetCurrentSerialEventTarget) { RefPtr mainThread = AbstractThread::GetCurrent(); EXPECT_EQ(mainThread, AbstractThread::MainThread()); (void)mainThread->Dispatch(NS_NewRunnableFunction( "TestAbstractThread::DirectTaskGetCurrentSerialEventTarget::TestBody", [&]() { AbstractThread::DispatchDirectTask(NS_NewRunnableFunction( "TestAbstractThread::DirectTaskGetCurrentSerialEventTarget::" "DirectTask", [&] { // NOTE: Currently we don't set the SerialEventTarget guard when // running direct tasks on `AbstractThread::MainThread()`. // See bug 1971198 for context. EXPECT_NONFATAL_FAILURE( EXPECT_EQ(GetCurrentSerialEventTarget(), mainThread), ""); })); })); // Spin the event loop. NS_ProcessPendingEvents(nullptr); } namespace { template void TestShutdownOnEventTargetShutdown(StaticString aTestName, nsCOMPtr&& aEventTarget, ShutdownFn&& aShutdownFn) { ASSERT_TRUE(aEventTarget); nsIEventTarget::FeatureFlags features = aEventTarget->GetFeatures(); bool expectShutdownTaskToRun = features & nsIEventTarget::SUPPORTS_SHUTDOWN_TASKS && features & nsIEventTarget::SUPPORTS_SHUTDOWN_TASK_DISPATCH; RefPtr tq = TaskQueue::Create(aEventTarget.forget(), aTestName); Atomic shutdownTaskRun(false); nsCOMPtr shutdownTask = new TestShutdownTask([&] { shutdownTaskRun = true; }); MOZ_ALWAYS_SUCCEEDS(tq->RegisterShutdownTask(shutdownTask)); RefPtr syncWithThread = new mozilla::SyncRunnable(NS_NewRunnableFunction("dummy", [] {})); MOZ_ALWAYS_SUCCEEDS(syncWithThread->DispatchToThread(tq)); aShutdownFn(); if (expectShutdownTaskToRun) { ASSERT_TRUE(shutdownTaskRun); } else { ASSERT_FALSE(shutdownTaskRun); MOZ_ALWAYS_SUCCEEDS(tq->UnregisterShutdownTask(shutdownTask)); } } } // namespace TEST(TaskQueue, ShutdownOnThreadPoolShutdown) { RefPtr threadPool = new nsThreadPool(); ASSERT_TRUE(threadPool); threadPool->SetName("TaskQueue ThreadPool Shutdown Test"_ns); threadPool->SetThreadLimit(4); RefPtr eventTarget(threadPool); TestShutdownOnEventTargetShutdown("TaskQueue on ThreadPool", std::move(eventTarget), [threadPool] { threadPool->Shutdown(); }); } TEST(TaskQueue, ShutdownOnSharedThreadPoolShutdown) { RefPtr sharedThreadPool = SharedThreadPool::Get("TaskQueue SharedThreadPool Shutdown Test", 4); RefPtr eventTarget(sharedThreadPool); TestShutdownOnEventTargetShutdown( "TaskQueue on SharedThreadPool", std::move(eventTarget), [sharedThreadPool] { sharedThreadPool->Shutdown(); }); } TEST(TaskQueue, ShutdownOnNsThreadShutdown) { RefPtr thread; nsresult rv = NS_NewNamedThread("TQ nsThread", getter_AddRefs(thread)); ASSERT_TRUE(NS_SUCCEEDED(rv)); ASSERT_TRUE(thread); RefPtr eventTarget(thread); TestShutdownOnEventTargetShutdown("TaskQueue on nsThread", std::move(eventTarget), [thread] { thread->Shutdown(); }); } TEST(TaskQueue, ShutdownOnThrottledEventQueueShutdown) { RefPtr thread; nsresult rv = NS_NewNamedThread("TQ nsThread", getter_AddRefs(thread)); ASSERT_TRUE(NS_SUCCEEDED(rv)); ASSERT_TRUE(thread); nsISerialEventTarget* baseTarget(thread); RefPtr throttledQueue = ThrottledEventQueue::Create(baseTarget, "TestThrottledQueue"); ASSERT_TRUE(throttledQueue); RefPtr eventTarget(throttledQueue); TestShutdownOnEventTargetShutdown("TaskQueue on ThrottledEventQueue", std::move(eventTarget), [thread] { thread->Shutdown(); }); } TEST(TaskQueue, ShutdownOnNestedTaskQueuePoolShutdown) { RefPtr threadPool = new nsThreadPool(); ASSERT_TRUE(threadPool); threadPool->SetName("TaskQueue Nested Pool Test"_ns); threadPool->SetThreadLimit(1); RefPtr poolTarget(threadPool); RefPtr baseTaskQueue = TaskQueue::Create(poolTarget.forget(), "BaseTaskQueue"); ASSERT_TRUE(baseTaskQueue); RefPtr eventTarget(baseTaskQueue); TestShutdownOnEventTargetShutdown( "TaskQueue on TaskQueue (pool shutdown)", std::move(eventTarget), [threadPool, baseTaskQueue] { threadPool->Shutdown(); }); } TEST(TaskQueue, ShutdownOnNestedTaskQueueBaseShutdown) { RefPtr threadPool = new nsThreadPool(); ASSERT_TRUE(threadPool); threadPool->SetName("TaskQueue Nested Base Test"_ns); threadPool->SetThreadLimit(1); RefPtr poolTarget(threadPool); RefPtr baseTaskQueue = TaskQueue::Create(poolTarget.forget(), "BaseTaskQueue"); ASSERT_TRUE(baseTaskQueue); RefPtr eventTarget(baseTaskQueue); TestShutdownOnEventTargetShutdown("TaskQueue on TaskQueue (base shutdown)", std::move(eventTarget), [threadPool, baseTaskQueue] { baseTaskQueue->BeginShutdown(); baseTaskQueue->AwaitShutdownAndIdle(); }); // Cleanup. threadPool->Shutdown(); } } // namespace TestTaskQueue