/* This Source Code Form is subject to the terms of the Mozilla Public * License, v. 2.0. If a copy of the MPL was not distributed with this file, * You can obtain one at http://mozilla.org/MPL/2.0/. */ #include "WebrtcTaskQueueWrapper.h" #include "VideoUtils.h" #include "api/task_queue/task_queue_factory.h" #include "mozilla/TaskQueue.h" #include "nsThreadUtils.h" #ifdef MOZ_COLLECTING_RUNNABLE_TELEMETRY # include "fmt/format.h" #endif namespace mozilla { #ifdef MOZ_COLLECTING_RUNNABLE_TELEMETRY class InvocableRunnable final : public nsIRunnable, public nsINamed { // Storage for caching the name returned from GetName(). Maybe mName; // These are used to construct mName. const char* const mTaskQueueName; const WebrtcLocation mLocation; absl::AnyInvocable mTask; ~InvocableRunnable() = default; public: NS_DECL_THREADSAFE_ISUPPORTS InvocableRunnable(const char* aTaskQueueName, absl::AnyInvocable&& aTask, const WebrtcLocation& aLocation) : mTaskQueueName(aTaskQueueName), mLocation(aLocation), mTask(std::move(aTask)) {} NS_IMETHOD Run() override { std::move(mTask)(); return NS_OK; } NS_IMETHOD GetName(nsACString& aName) override { if (mName) { aName = *mName; return NS_OK; } Maybe fileName; if (mLocation.mFile) { nsDependentCString f(mLocation.mFile); # ifdef XP_WIN // On Windows, path separators are inconsistent per // https://github.com/llvm/llvm-project/issues/45076. int32_t i = f.RFindCharInSet("/\\"); # else int32_t i = f.RFindChar('/'); # endif if (i == kNotFound) { fileName.emplace(f); } else { fileName.emplace(f, i + 1); } } mName.emplace(); if (mLocation.mFunction && fileName && mLocation.mLine) { mName->AppendFmt("{} - {} ({}:{})", mTaskQueueName, mLocation.mFunction, *fileName, mLocation.mLine); } else if (fileName && mLocation.mLine) { mName->AppendFmt("{} - InvocableRunnable ({}:{})", mTaskQueueName, *fileName, mLocation.mLine); } else { mName->AppendFmt("{} - InvocableRunnable", mTaskQueueName); } aName = *mName; return NS_OK; } }; NS_IMPL_ISUPPORTS(InvocableRunnable, nsIRunnable, nsINamed) #endif enum class DeletionPolicy : uint8_t { Blocking, NonBlocking }; /** * A wrapper around Mozilla TaskQueues in the shape of a libwebrtc TaskQueue. * * Allows libwebrtc to use Mozilla threads where tooling, e.g. profiling, is set * up and just works. * * Mozilla APIs like Runnables, MozPromise, etc. can also be used with the * wrapped TaskQueue to run things on the right thread when interacting with * libwebrtc. */ template class WebrtcTaskQueueWrapper : public webrtc::TaskQueueBase { public: class TaskQueueObserver final : public TaskQueue::Observer { public: NS_INLINE_DECL_THREADSAFE_REFCOUNTING(TaskQueueObserver, override); template explicit TaskQueueObserver(Wrapper aOwner) : mOwner(std::forward(aOwner)) {} void WillProcessEvent(TaskQueue* aQueue) override { if constexpr (Deletion == DeletionPolicy::Blocking) { mCurrent.emplace(mOwner); } else { static_assert(Deletion == DeletionPolicy::NonBlocking); mCurrent.emplace(mOwner.get()); } } void DidProcessEvent(TaskQueue* aQueue) override { mCurrent = Nothing(); } private: ~TaskQueueObserver() override = default; // If NonBlocking, a TaskQueue owns this observer, which owns mOwner, which // must be kept alive. There are no cycles. // // If Blocking, mOwner owns the TaskQueue, which owns us. mOwner is owned // externally. It must be a weak reference here, or we'd have a cycle. // // mOwner is safe because the underlying TaskQueue first finishes shutdown, // then the observer is destroyed, then the WebrtcTaskQueueWrapper is // destroyed. See the WebrtcTaskQueueWrawpper::Delete for more details. std::conditional_t, WebrtcTaskQueueWrapper*> const mOwner; Maybe mCurrent; }; public: template WebrtcTaskQueueWrapper(Target aTaskQueue, const nsACString& aName) : mTaskQueue(std::forward(aTaskQueue)), mName(aName) {} ~WebrtcTaskQueueWrapper() = default; void Delete() override { if constexpr (Deletion == DeletionPolicy::Blocking) { MOZ_RELEASE_ASSERT(!mTaskQueue->IsOnCurrentThread()); // Don't call into the task queue if non-blocking as it is in the middle // of its dtor. There'd be nothing to wait for anyway. mTaskQueue->BeginShutdown(); mTaskQueue->AwaitShutdownAndIdle(); mTaskQueue->SetObserver(nullptr); } delete this; } already_AddRefed WrapInvocable( absl::AnyInvocable&& aTask, const WebrtcLocation& aLocation) { #ifdef MOZ_COLLECTING_RUNNABLE_TELEMETRY return MakeAndAddRef(mName.get(), std::move(aTask), aLocation); #else return NS_NewRunnableFunction( "InvocableRunnable", [task = std::move(aTask)]() mutable { std::move(task)(); }); #endif } void PostTaskImpl(absl::AnyInvocable aTask, const PostTaskTraits& aTraits, const webrtc::Location& aLocation) override { if (NS_FAILED(mTaskQueue->Dispatch( WrapInvocable(std::move(aTask), aLocation), /* Pass NS_DISPATCH_FALLIBLE as documentation, but note that TaskQueue::Dispatch does not leak the task even with NS_DISPATCH_NORMAL, which is what DelayedDispatch below results in. */ NS_DISPATCH_FALLIBLE))) { NS_WARNING(nsFmtCString( "TaskQueue '{}' failed to dispatch a task from {} ({}:{})", mName, aLocation.mFunction, aLocation.mFile, aLocation.mLine) .get()); }; } void PostDelayedTaskImpl(absl::AnyInvocable aTask, webrtc::TimeDelta aDelay, const PostDelayedTaskTraits& aTraits, const webrtc::Location& aLocation) override { if (aDelay.ms() == 0) { // AbstractThread::DelayedDispatch doesn't support delay 0 PostTaskImpl(std::move(aTask), PostTaskTraits{}, aLocation); return; } if (NS_FAILED(mTaskQueue->DelayedDispatch( WrapInvocable(std::move(aTask), aLocation), aDelay.ms()))) { NS_WARNING(nsFmtCString("TaskQueue '{}' failed to dispatch a " "delayed task from {} ({}:{})", mName, aLocation.mFunction, aLocation.mFile, aLocation.mLine) .get()); } } // If Blocking, access is through WebrtcTaskQueueWrapper, which has to keep // mTaskQueue alive. If NonBlocking, mTaskQueue keeps WebrtcTaskQueueWrapper // alive through the observer. We must not create a cycle. const std::conditional_t, TaskQueue*> mTaskQueue; // Storage for mTaskQueue's null-terminated const char* name. const nsCString mName; }; } // namespace mozilla namespace std { template struct default_delete> : webrtc::TaskQueueDeleter { void operator()(mozilla::WebrtcTaskQueueWrapper* aPtr) const { webrtc::TaskQueueDeleter::operator()(aPtr); } }; } // namespace std namespace mozilla { std::unique_ptr CreateWebrtcTaskQueue(already_AddRefed aTarget, const nsACString& aName, bool aSupportsTailDispatch) { using Wrapper = WebrtcTaskQueueWrapper; const auto& flat = PromiseFlatCString(aName); auto tq = TaskQueue::Create(std::move(aTarget), "WebrtcTaskQueue", aSupportsTailDispatch); auto wrapper = MakeUnique(std::move(tq), flat); auto observer = MakeRefPtr(wrapper.get()); wrapper->mTaskQueue->SetObserver(observer); return std::unique_ptr( wrapper.release()); } RefPtr CreateWebrtcTaskQueueWrapper( already_AddRefed aTarget, const nsLiteralCString& aName, bool aSupportsTailDispatch) { using Wrapper = WebrtcTaskQueueWrapper; auto tq = TaskQueue::Create(std::move(aTarget), StaticString(aName), aSupportsTailDispatch); auto wrapper = MakeUnique(tq.get(), aName); auto observer = MakeRefPtr(std::move(wrapper)); tq->SetObserver(observer); return tq; } UniquePtr CreateWebrtcTaskQueueFactory() { class SharedThreadPoolWebRtcTaskQueueFactory : public webrtc::TaskQueueFactory { public: std::unique_ptr CreateTaskQueue(absl::string_view aName, Priority aPriority) const override { // libwebrtc will dispatch some tasks sync, i.e., block the origin thread // until they've run, and that doesn't play nice with tail dispatching // since there will never be a tail. DeletionPolicy::Blocking because this // is for libwebrtc use and that's what they expect. constexpr bool supportTailDispatch = false; // XXX Do something with aPriority return CreateWebrtcTaskQueue( GetMediaThreadPool(MediaThreadType::WEBRTC_WORKER), nsDependentCSubstring(aName.data(), aName.size()), supportTailDispatch); } }; return WrapUnique(new SharedThreadPoolWebRtcTaskQueueFactory); } } // namespace mozilla