/* -*- Mode: C++; tab-width: 8; indent-tabs-mode: nil; c-basic-offset: 2 -*- */ /* vim: set ts=8 sts=2 et sw=2 tw=80: */ /* This Source Code Form is subject to the terms of the Mozilla Public * License, v. 2.0. If a copy of the MPL was not distributed with this * file, You can obtain one at http://mozilla.org/MPL/2.0/. */ #include "StreamFilterParent.h" #include "HttpChannelChild.h" #include "mozilla/ExtensionPolicyService.h" #include "mozilla/dom/ContentParent.h" #include "mozilla/net/ChannelEventQueue.h" #include "mozilla/StaticPrefs_extensions.h" #include "mozilla/Try.h" #include "nsHttpChannel.h" #include "nsIChannel.h" #include "nsIInputStream.h" #include "nsITraceableChannel.h" #include "nsProxyRelease.h" #include "nsQueryObject.h" #include "nsSocketTransportService2.h" #include "nsStringStream.h" #include "mozilla/net/DocumentChannelChild.h" #include "nsIViewSourceChannel.h" namespace mozilla { namespace extensions { /***************************************************************************** * Event queueing helpers *****************************************************************************/ using net::ChannelEvent; using net::ChannelEventQueue; namespace { // Define some simple ChannelEvent sub-classes that store the appropriate // EventTarget and delegate their Run methods to a wrapped Runnable or lambda // function. class ChannelEventWrapper : public ChannelEvent { public: ChannelEventWrapper(nsIEventTarget* aTarget) : mTarget(aTarget) {} already_AddRefed GetEventTarget() override { return do_AddRef(mTarget); } protected: ~ChannelEventWrapper() override = default; private: nsCOMPtr mTarget; }; class ChannelEventFunction final : public ChannelEventWrapper { public: ChannelEventFunction(nsIEventTarget* aTarget, std::function&& aFunc) : ChannelEventWrapper(aTarget), mFunc(std::move(aFunc)) {} void Run() override { mFunc(); } protected: ~ChannelEventFunction() override = default; private: std::function mFunc; }; class ChannelEventRunnable final : public ChannelEventWrapper { public: ChannelEventRunnable(nsIEventTarget* aTarget, already_AddRefed aRunnable) : ChannelEventWrapper(aTarget), mRunnable(aRunnable) {} void Run() override { nsresult rv = mRunnable->Run(); (void)NS_WARN_IF(NS_FAILED(rv)); } protected: ~ChannelEventRunnable() override = default; private: RefPtr mRunnable; }; } // anonymous namespace /***************************************************************************** * Initialization *****************************************************************************/ StreamFilterParent::StreamFilterParent() : mMainThread(GetCurrentSerialEventTarget()), mIOThread(mMainThread), mQueue(new ChannelEventQueue(static_cast(this))), mBufferMutex("StreamFilter buffer mutex"), mReceivedStop(false), mSentStop(false), mContext(nullptr), mOffset(0), mState(State::Uninitialized) {} StreamFilterParent::~StreamFilterParent() { NS_ReleaseOnMainThread("StreamFilterParent::mChannel", mChannel.forget()); NS_ReleaseOnMainThread("StreamFilterParent::mLoadGroup", mLoadGroup.forget()); NS_ReleaseOnMainThread("StreamFilterParent::mOrigListener", mOrigListener.forget()); NS_ReleaseOnMainThread("StreamFilterParent::mContext", mContext.forget()); mQueue->NotifyReleasingOwner(); } auto StreamFilterParent::Create(dom::ContentParent* aContentParent, uint64_t aChannelId, const nsAString& aAddonId) -> RefPtr { AssertIsMainThread(); auto& webreq = WebRequestService::GetSingleton(); RefPtr addonPolicy = ExtensionPolicyService::GetSingleton().GetByID(aAddonId); if (!addonPolicy) { return ChildEndpointPromise::CreateAndReject(false, __func__); } nsCOMPtr channel = webreq.GetTraceableChannel(aChannelId, *addonPolicy, aContentParent); RefPtr chan = do_QueryObject(channel); if (!chan) { return ChildEndpointPromise::CreateAndReject(false, __func__); } nsCOMPtr genChan(do_QueryInterface(channel)); if (!StaticPrefs::extensions_filterResponseServiceWorkerScript_disabled() && ChannelWrapper::IsServiceWorkerScript(genChan) && !addonPolicy->HasPermission( nsGkAtoms::webRequestFilterResponse_serviceWorkerScript)) { return ChildEndpointPromise::CreateAndReject(false, __func__); } // Disable alt-data for extension stream listeners. nsCOMPtr internal(do_QueryObject(channel)); internal->DisableAltDataCache(); return chan->AttachStreamFilter(); } /* static */ void StreamFilterParent::Attach(nsIChannel* aChannel, ParentEndpoint&& aEndpoint) { auto self = MakeRefPtr(); self->ActorThread()->Dispatch( NewRunnableMethod("StreamFilterParent::Bind", self, &StreamFilterParent::Bind, std::move(aEndpoint)), NS_DISPATCH_NORMAL); // If aChannel is a HttpChannelChild, ask HttpChannelChild to hold a weak // reference on this StreamFilterParent. Such that HttpChannelChild has a // chance to disconnect this StreamFilterParent if internal redirection // happens, i.e. ServiceWorker fallback redirection. RefPtr channelChild = do_QueryObject(aChannel); if (channelChild) { channelChild->RegisterStreamFilter(self); } self->Init(aChannel); } void StreamFilterParent::Disconnect(const nsACString& aReason) { AssertIsMainThread(); MOZ_DIAGNOSTIC_ASSERT(mBeforeOnStartRequest); mDisconnected = true; nsAutoCString reason(aReason); RefPtr self(this); RunOnActorThread(FUNC, [self, reason] { if (self->IPCActive()) { self->mState = State::Disconnected; self->CheckResult(self->SendError(reason)); } }); } void StreamFilterParent::Bind(ParentEndpoint&& aEndpoint) { aEndpoint.Bind(this); } void StreamFilterParent::Init(nsIChannel* aChannel) { mChannel = aChannel; nsCOMPtr traceable = do_QueryInterface(aChannel); if (MOZ_UNLIKELY(!traceable)) { // nsViewSourceChannel is not nsITraceableChannel, but wraps one. Unwrap it. nsCOMPtr vsc = do_QueryInterface(aChannel); if (vsc) { traceable = do_QueryObject(vsc->GetInnerChannel()); // OnStartRequest etc. is passed the unwrapped channel, so update mChannel // to prevent OnStartRequest from mistaking it for a redirect, which would // close the filter. mChannel = do_QueryObject(traceable); } // TODO bug 1683403: Replace assertion; Close StreamFilter instead. MOZ_RELEASE_ASSERT(traceable); } nsresult rv = traceable->SetNewListener(this, /* aMustApplyContentConversion = */ true, getter_AddRefs(mOrigListener)); MOZ_RELEASE_ASSERT(NS_SUCCEEDED(rv)); } /***************************************************************************** * nsIThreadRetargetableStreamListener *****************************************************************************/ NS_IMETHODIMP StreamFilterParent::CheckListenerChain() { AssertIsMainThread(); nsCOMPtr trsl = do_QueryInterface(mOrigListener); if (trsl) { return trsl->CheckListenerChain(); } return NS_ERROR_NO_INTERFACE; } NS_IMETHODIMP StreamFilterParent::OnDataFinished(nsresult aStatus) { AssertIsIOThread(); // Forwarding onDataFinished to the mOriginListener when: // - the StreamFilter is already disconnected // - it does not have any buffered data which would still need // to be sent to the mOrigListener and we have // - we have not yet called mOrigListener OnStopRequest method. if (!mDisconnected || !mBufferedData.isEmpty() || mSentStop) { return NS_OK; } nsCOMPtr listener = do_QueryInterface(mOrigListener); if (listener) { return listener->OnDataFinished(aStatus); } return NS_OK; } /***************************************************************************** * Error handling *****************************************************************************/ void StreamFilterParent::Broken() { AssertIsActorThread(); switch (mState) { case State::Initialized: case State::TransferringData: case State::Suspended: { // TODO bug 1965710: Entering Disconnecting + Disconnected can result in // data corruption. mState = State::Disconnecting; RefPtr self(this); RunOnMainThread(FUNC, [=] { if (self->mChannel) { self->mChannel->Cancel(NS_ERROR_FAILURE); } }); // Immediately calling FinishDisconnect() may trigger a race condition // (like bug 1628642) where data is dropped by DoSendData(), when the // assumption documented in FinishDisconnect() is violated. // Since bug 1965710 already results in data corruption, there is not // really a point in fixing just this issue. // TODO bug 1965710: Fix this issue when the data corruption is fixed. FinishDisconnect(); } break; default: break; } } /***************************************************************************** * State change requests *****************************************************************************/ IPCResult StreamFilterParent::RecvClose() { AssertIsActorThread(); mState = State::Closed; if (!mSentStop) { RefPtr self(this); RunOnMainThread(FUNC, [=] { nsresult rv = self->EmitStopRequest(NS_OK); (void)NS_WARN_IF(NS_FAILED(rv)); }); } (void)SendClosed(); Destroy(); return IPC_OK(); } void StreamFilterParent::Destroy() { // Close the channel asynchronously so the actor is never destroyed before // this message is fully processed. ActorThread()->Dispatch(NewRunnableMethod("StreamFilterParent::Close", this, &StreamFilterParent::Close), NS_DISPATCH_NORMAL); } IPCResult StreamFilterParent::RecvDestroy() { AssertIsActorThread(); Destroy(); return IPC_OK(); } IPCResult StreamFilterParent::RecvSuspend() { AssertIsActorThread(); if (mState == State::TransferringData) { RefPtr self(this); RunOnMainThread(FUNC, [=] { self->mChannel->Suspend(); self->RunOnActorThread(FUNC, [=] { if (self->IPCActive()) { self->mState = State::Suspended; self->CheckResult(self->SendSuspended()); } }); }); } return IPC_OK(); } IPCResult StreamFilterParent::RecvResume() { AssertIsActorThread(); if (mState == State::Suspended) { // Change state before resuming so incoming data is handled correctly // immediately after resuming. mState = State::TransferringData; RefPtr self(this); RunOnMainThread(FUNC, [=] { self->mChannel->Resume(); self->RunOnActorThread(FUNC, [=] { if (self->IPCActive()) { self->CheckResult(self->SendResumed()); } }); }); } return IPC_OK(); } IPCResult StreamFilterParent::RecvDisconnect() { AssertIsActorThread(); if (mState == State::Suspended) { RefPtr self(this); RunOnMainThread(FUNC, [=] { self->mChannel->Resume(); }); } else if (mState != State::TransferringData) { return IPC_OK(); } // TODO bug 1965711: If SendFlushData() fails or if the child never sends // SendFlushData(), we stay stuck in Disconnecting forever! mState = State::Disconnecting; CheckResult(SendFlushData()); return IPC_OK(); } IPCResult StreamFilterParent::RecvFlushedData() { AssertIsActorThread(); MOZ_ASSERT(mState == State::Disconnecting); Destroy(); FinishDisconnect(); return IPC_OK(); } void StreamFilterParent::FinishDisconnect() { RefPtr self(this); // FinishDisconnect is called when we do not expect RecvWrite from the child. // // Immediately after entering mState Disconnecting, ODA might still be in the // process of dispatching DoSendData calls from the IO thread to the actor // thread. When DoSendData runs, it detects state Disconnecting and puts the // data back in mBufferedData (at index mPrependedBufferCount). // // When state Disconnecting is entered via RecvDisconnect, it performs an // IPC roundtrip to the child. Any queued DoSendData() queued to the actor // thread by ODA are expected to have executed before RecvFlushedData is // executed, and we don't expect additional DoSendData(). // // Set mPrependedBufferCount = -1 prevents DoSendData() from prepending // additional data to mBufferedData (if DoSendData() is somehow called // unexpectedly). mPrependedBufferCount = -1; RunOnIOThread(FUNC, [=] { // This is not always the last flush. See below for the final flush. self->FlushBufferedData(); self->RunOnActorThread(FUNC, [=] { if (self->mState != State::Closed) { self->mState = State::Disconnected; } // Despite having flushed buffers before, the buffer may be non-empty // if OnDataAvailable is called before entering state Disconnected. self->RunOnIOThread(FUNC, [=] { // If OnDataAvailable is called after entering state Disconnected, // it calls FlushBufferedData() if needed. But if that did not // happen, we need to flush the data here, now. if (!self->mBufferedData.isEmpty()) { self->FlushBufferedData(); } }); self->RunOnMainThread(FUNC, [=] { if (self->mReceivedStop && !self->mSentStop) { nsresult rv = self->EmitStopRequest(NS_OK); (void)NS_WARN_IF(NS_FAILED(rv)); } else if (self->mLoadGroup && !self->mDisconnected) { (void)self->mLoadGroup->RemoveRequest(self, nullptr, NS_OK); } self->mDisconnected = true; }); }); }); } /***************************************************************************** * Data output *****************************************************************************/ IPCResult StreamFilterParent::RecvWrite(Data&& aData) { AssertIsActorThread(); RunOnIOThread(NewRunnableMethod("StreamFilterParent::WriteMove", this, &StreamFilterParent::WriteMove, std::move(aData))); return IPC_OK(); } void StreamFilterParent::WriteMove(Data&& aData) { nsresult rv = Write(aData); (void)NS_WARN_IF(NS_FAILED(rv)); } nsresult StreamFilterParent::Write(Data& aData) { AssertIsIOThread(); nsCOMPtr stream; nsresult rv = NS_NewByteInputStream( getter_AddRefs(stream), Span(reinterpret_cast(aData.Elements()), aData.Length()), NS_ASSIGNMENT_DEPEND); NS_ENSURE_SUCCESS(rv, rv); rv = mOrigListener->OnDataAvailable(mChannel, stream, mOffset, aData.Length()); NS_ENSURE_SUCCESS(rv, rv); mOffset += aData.Length(); return NS_OK; } /***************************************************************************** * nsIRequest *****************************************************************************/ NS_IMETHODIMP StreamFilterParent::GetName(nsACString& aName) { AssertIsMainThread(); MOZ_ASSERT(mChannel); return mChannel->GetName(aName); } NS_IMETHODIMP StreamFilterParent::GetStatus(nsresult* aStatus) { AssertIsMainThread(); MOZ_ASSERT(mChannel); return mChannel->GetStatus(aStatus); } NS_IMETHODIMP StreamFilterParent::IsPending(bool* aIsPending) { switch (mState) { case State::Initialized: case State::TransferringData: case State::Suspended: *aIsPending = true; break; default: *aIsPending = false; } return NS_OK; } NS_IMETHODIMP StreamFilterParent::SetCanceledReason(const nsACString& aReason) { return SetCanceledReasonImpl(aReason); } NS_IMETHODIMP StreamFilterParent::GetCanceledReason(nsACString& aReason) { return GetCanceledReasonImpl(aReason); } NS_IMETHODIMP StreamFilterParent::CancelWithReason(nsresult aStatus, const nsACString& aReason) { return CancelWithReasonImpl(aStatus, aReason); } NS_IMETHODIMP StreamFilterParent::Cancel(nsresult aResult) { AssertIsMainThread(); MOZ_ASSERT(mChannel); return mChannel->Cancel(aResult); } NS_IMETHODIMP StreamFilterParent::Suspend() { AssertIsMainThread(); MOZ_ASSERT(mChannel); return mChannel->Suspend(); } NS_IMETHODIMP StreamFilterParent::Resume() { AssertIsMainThread(); MOZ_ASSERT(mChannel); return mChannel->Resume(); } NS_IMETHODIMP StreamFilterParent::GetLoadGroup(nsILoadGroup** aLoadGroup) { *aLoadGroup = mLoadGroup; return NS_OK; } NS_IMETHODIMP StreamFilterParent::SetLoadGroup(nsILoadGroup* aLoadGroup) { return NS_ERROR_NOT_IMPLEMENTED; } NS_IMETHODIMP StreamFilterParent::GetLoadFlags(nsLoadFlags* aLoadFlags) { AssertIsMainThread(); MOZ_ASSERT(mChannel); MOZ_TRY(mChannel->GetLoadFlags(aLoadFlags)); *aLoadFlags &= ~nsIChannel::LOAD_DOCUMENT_URI; return NS_OK; } NS_IMETHODIMP StreamFilterParent::SetLoadFlags(nsLoadFlags aLoadFlags) { AssertIsMainThread(); MOZ_ASSERT(mChannel); return mChannel->SetLoadFlags(aLoadFlags); } NS_IMETHODIMP StreamFilterParent::GetTRRMode(nsIRequest::TRRMode* aTRRMode) { return GetTRRModeImpl(aTRRMode); } NS_IMETHODIMP StreamFilterParent::SetTRRMode(nsIRequest::TRRMode aTRRMode) { return SetTRRModeImpl(aTRRMode); } /***************************************************************************** * nsIStreamListener *****************************************************************************/ NS_IMETHODIMP StreamFilterParent::OnStartRequest(nsIRequest* aRequest) { AssertIsMainThread(); // Always reset mChannel if aRequest is different. Various calls in // StreamFilterParent will use mChannel, but aRequest is *always* the // right channel to use at this point. // // For ALL redirections, we will disconnect this listener. Extensions // will create a new filter if they need it. mBeforeOnStartRequest = false; if (aRequest != mChannel) { nsCOMPtr channel = do_QueryInterface(aRequest); nsCOMPtr loadInfo = channel ? channel->LoadInfo() : nullptr; mChannel = std::move(channel); if (!(loadInfo && loadInfo->RedirectChainIncludingInternalRedirects().IsEmpty())) { mDisconnected = true; mDisconnectedByOnStartRequest = true; RefPtr self(this); RunOnActorThread(FUNC, [=] { if (self->IPCActive()) { self->mState = State::Disconnected; self->CheckResult(self->SendError("Channel redirected"_ns)); } }); } } // Check if alterate cached data is being sent, if so we receive un-decoded // data and we must disconnect the filter and send an error to the extension. if (!mDisconnected) { RefPtr chan = do_QueryObject(aRequest); if (chan && chan->IsDeliveringAltData()) { mDisconnected = true; mDisconnectedByOnStartRequest = true; RefPtr self(this); RunOnActorThread(FUNC, [=] { if (self->IPCActive()) { self->mState = State::Disconnected; self->CheckResult( self->SendError("Channel is delivering cached alt-data"_ns)); } }); } } if (!mDisconnected) { (void)mChannel->GetLoadGroup(getter_AddRefs(mLoadGroup)); if (mLoadGroup) { (void)mLoadGroup->AddRequest(this, nullptr); } } nsresult rv = mOrigListener->OnStartRequest(aRequest); // Important: Do this only *after* running the next listener in the chain, so // that we get the final delivery target after any retargeting that it may do. if (nsCOMPtr req = do_QueryInterface(aRequest)) { nsCOMPtr thread; (void)req->GetDeliveryTarget(getter_AddRefs(thread)); if (thread) { mIOThread = std::move(thread); } } // Important: Do this *after* we have set the thread delivery target, or it is // possible in rare circumstances for an extension to attempt to write data // before the thread has been set up, even though there are several layers of // asynchrony involved. if (!mDisconnected) { RefPtr self(this); RunOnActorThread(FUNC, [=] { if (self->IPCActive()) { self->mState = State::TransferringData; self->CheckResult(self->SendStartRequest()); } }); } return rv; } NS_IMETHODIMP StreamFilterParent::OnStopRequest(nsIRequest* aRequest, nsresult aStatusCode) { AssertIsMainThread(); MOZ_ASSERT(aRequest == mChannel); mReceivedStop = true; if (mDisconnected) { return EmitStopRequest(aStatusCode); } RefPtr self(this); RunOnActorThread(FUNC, [=] { if (self->IPCActive()) { self->CheckResult(self->SendStopRequest(aStatusCode)); } else if (self->mState == State::Disconnected) { // !IPCActive() implies mState == Closed, Disconnecting or Disconnected. // // When we enter Closed via RecvClose, we emit a stop request. // If we're currently disconnecting, then we'll emit a stop // request at the end of that process. Otherwise we need to // manually emit one here, since we won't be getting a response // from the child. self->RunOnMainThread(FUNC, [=] { if (!self->mSentStop) { self->EmitStopRequest(aStatusCode); } }); } }); return NS_OK; } nsresult StreamFilterParent::EmitStopRequest(nsresult aStatusCode) { AssertIsMainThread(); MOZ_ASSERT(!mSentStop); mSentStop = true; nsresult rv = mOrigListener->OnStopRequest(mChannel, aStatusCode); if (mLoadGroup && !mDisconnected) { (void)mLoadGroup->RemoveRequest(this, nullptr, aStatusCode); } return rv; } /***************************************************************************** * Incoming data handling *****************************************************************************/ void StreamFilterParent::DoSendData(Data&& aData) { AssertIsActorThread(); if (mState == State::TransferringData) { CheckResult(SendData(aData)); // TODO bug 1965710: If SendData(aData) fails, this chunk is dropped, but // any later data may be forwarded to the listener. This can result in // surprising data inconsistencies. } else if (mState == State::Disconnecting) { // OnDataAvailable buffers data when state is Disconnecting. But if this // state was entered between the thread hop from ODA to DoSendData, then we // have data that should be in the buffer but is not. // We should not be called after some point in FinishDisconnect(). MOZ_ASSERT(mPrependedBufferCount != -1); MutexAutoLock al(mBufferMutex); if (mPrependedBufferCount == 0) { mBufferedData.insertFront(new BufferedData(std::move(aData))); } else { MOZ_ASSERT(!mBufferedData.isEmpty()); int i = 0; for (BufferedData* item : mBufferedData) { if (++i == mPrependedBufferCount) { item->setNext(new BufferedData(std::move(aData))); ++mPrependedBufferCount; break; } } MOZ_ASSERT_UNREACHABLE("mPrependedBufferCount past end of mBufferedData"); } } // If mState is Closed or the child cannot be reached, the data is dropped. } NS_IMETHODIMP StreamFilterParent::OnDataAvailable(nsIRequest* aRequest, nsIInputStream* aInputStream, uint64_t aOffset, uint32_t aCount) { AssertIsIOThread(); // Data handling is as follows: // - If mDisconnectedByOnStartRequest == true: Forward data. // (^ child was never notified from OnStartRequest) // - If mState is TransferringData or Suspended: send to child (DoSendData). // - If mState is Closed: Drop data (ignore it). // - If mState is Disconnecting: Buffer data for later. // - If mState is Disconnected: Forward data. // // Desired order of forwarding data during Disconnecting: // - Any RecvWrite (from child before/during Disconnecting, not buffered). // - Any DoSendData that were queued before Disconnecting but were not sent // to the child because the state changed to Disconnecting. // - Any OnDataAvailable while state is set to Disconnecting. if (mDisconnectedByOnStartRequest || mState == State::Disconnected) { // If we're offloading data in a thread pool, it's possible that we'll // have buffered some additional data while waiting for the buffer to // flush. So, if there's any buffered data left, flush that before we // flush this incoming data. // // Note: When in the eDisconnected state, the buffer list is guaranteed // never to be accessed by another thread during an OnDataAvailable call. if (!mBufferedData.isEmpty()) { // The flag is set without notifying the child, so we cannot ever get // into the Disconnecting state that would fill the buffer. MOZ_ASSERT(!mDisconnectedByOnStartRequest); // OnDataAvailable called before FinishDisconnect has had a chance to // perform the final FlushBufferedData() call, so do it here now. FlushBufferedData(); } mOffset += aCount; return mOrigListener->OnDataAvailable(aRequest, aInputStream, mOffset - aCount, aCount); } Data data; data.SetLength(aCount); uint32_t count; nsresult rv = aInputStream->Read(reinterpret_cast(data.Elements()), aCount, &count); NS_ENSURE_SUCCESS(rv, rv); NS_ENSURE_TRUE(count == aCount, NS_ERROR_UNEXPECTED); if (mState == State::Disconnecting) { MutexAutoLock al(mBufferMutex); BufferData(std::move(data)); } else if (mState == State::Closed) { return NS_ERROR_FAILURE; } else { ActorThread()->Dispatch( NewRunnableMethod("StreamFilterParent::DoSendData", this, &StreamFilterParent::DoSendData, std::move(data)), NS_DISPATCH_NORMAL); } return NS_OK; } nsresult StreamFilterParent::FlushBufferedData() { AssertIsIOThread(); // When offloading data to a thread pool, OnDataAvailable isn't guaranteed // to always run in the same thread, so it's possible for this function to // run in parallel with OnDataAvailable. MutexAutoLock al(mBufferMutex); while (!mBufferedData.isEmpty()) { UniquePtr data(mBufferedData.popFirst()); if (NS_WARN_IF(mState == State::Closed)) { // We should only have data to flush while in state Disconnecting, set by // RecvDisconnect. But we can also enter the Disconnecting state via // Broken() via CheckResult(false), after which it is theoretically to // still receive RecvClose. When RecvClose is called, EmitStopRequest is // called, so we should not write additional data (OnDataAvailable). continue; } nsresult rv = Write(data->mData); NS_ENSURE_SUCCESS(rv, rv); } return NS_OK; } /***************************************************************************** * Thread helpers *****************************************************************************/ nsIEventTarget* StreamFilterParent::ActorThread() { return net::gSocketTransportService; } bool StreamFilterParent::IsActorThread() { return ActorThread()->IsOnCurrentThread(); } void StreamFilterParent::AssertIsActorThread() { MOZ_ASSERT(IsActorThread()); } nsISerialEventTarget* StreamFilterParent::IOThread() { return mIOThread; } bool StreamFilterParent::IsIOThread() { return mIOThread->IsOnCurrentThread(); } void StreamFilterParent::AssertIsIOThread() { MOZ_ASSERT(IsIOThread()); } template void StreamFilterParent::RunOnMainThread(const char* aName, Function&& aFunc) { mQueue->RunOrEnqueue(new ChannelEventFunction(mMainThread, std::move(aFunc))); } void StreamFilterParent::RunOnMainThread(already_AddRefed aRunnable) { mQueue->RunOrEnqueue( new ChannelEventRunnable(mMainThread, std::move(aRunnable))); } template void StreamFilterParent::RunOnIOThread(const char* aName, Function&& aFunc) { mQueue->RunOrEnqueue(new ChannelEventFunction(mIOThread, std::move(aFunc))); } void StreamFilterParent::RunOnIOThread(already_AddRefed aRunnable) { mQueue->RunOrEnqueue( new ChannelEventRunnable(mIOThread, std::move(aRunnable))); } template void StreamFilterParent::RunOnActorThread(const char* aName, Function&& aFunc) { // We don't use mQueue for dispatch to the actor thread. // // The main thread and IO thread are used for dispatching events to the // wrapped stream listener, and those events need to be processed // consistently, in the order they were dispatched. An event dispatched to the // main thread can't be run before events that were dispatched to the IO // thread before it. // // Additionally, the IO thread is likely to be a thread pool, which means that // without thread-safe queuing, it's possible for multiple events dispatched // to it to be processed in parallel, or out of order. // // The actor thread, however, is always a serial event target. Its events are // always processed in order, and events dispatched to the actor thread are // independent of the events in the output event queue. if (IsActorThread()) { aFunc(); } else { ActorThread()->Dispatch(std::move(NS_NewRunnableFunction(aName, aFunc)), NS_DISPATCH_NORMAL); } } /***************************************************************************** * Glue *****************************************************************************/ void StreamFilterParent::ActorDestroy(ActorDestroyReason aWhy) { AssertIsActorThread(); if (mState != State::Disconnected && mState != State::Closed) { Broken(); } } NS_INTERFACE_MAP_BEGIN(StreamFilterParent) NS_INTERFACE_MAP_ENTRY(nsIStreamListener) NS_INTERFACE_MAP_ENTRY(nsIRequestObserver) NS_INTERFACE_MAP_ENTRY(nsIRequest) NS_INTERFACE_MAP_ENTRY(nsIThreadRetargetableStreamListener) NS_INTERFACE_MAP_ENTRY_AMBIGUOUS(nsISupports, nsIStreamListener) NS_INTERFACE_MAP_END NS_IMPL_ADDREF(StreamFilterParent) NS_IMPL_RELEASE(StreamFilterParent) } // namespace extensions } // namespace mozilla