/* -*- Mode: C++; tab-width: 8; indent-tabs-mode: nil; c-basic-offset: 2 -*- */ /* vim: set ts=8 sts=2 et sw=2 tw=80: */ /* This Source Code Form is subject to the terms of the Mozilla Public * License, v. 2.0. If a copy of the MPL was not distributed with this file, * You can obtain one at http://mozilla.org/MPL/2.0/. */ #include "AsyncPlatformPipes.h" #include "base/eintr_wrapper.h" #include "base/message_loop.h" #include "mozilla/EventTargetCapability.h" #include "mozilla/UniquePtr.h" #include "nsStreamUtils.h" #include "nsThreadUtils.h" #include "nsXULAppAPI.h" #ifndef XP_WIN # include # include # include # include #endif namespace mozilla { namespace platform_pipe_detail { class PlatformPipeLink #ifdef XP_WIN : public MessageLoopForIO::IOHandler #else : public MessageLoopForIO::Watcher #endif { NS_INLINE_DECL_THREADSAFE_REFCOUNTING(PlatformPipeLink) public: PlatformPipeLink(UniqueFileHandle aHandle, uint32_t aBufferSize); void CloseLocked(nsresult aStatus) MOZ_REQUIRES(mMutex); // Dispatch notification of mCallback to another thread. // // NOTE: We intentionally do not notify directly on the current thread, even // if no event target is provided, as we don't want user code running on the // IPC I/O thread as it is very latency sensitive. void DispatchNotify() MOZ_REQUIRES(mMutex); void AdvanceIO() MOZ_EXCLUDES(mMutex) MOZ_REQUIRES(mIOThread); #ifdef XP_WIN void OnIOCompleted(MessageLoopForIO::IOContext* aContext, DWORD aBytesTransferred, DWORD aError) override; #else void OnFileCanReadWithoutBlocking(int fd) override; void OnFileCanWriteWithoutBlocking(int fd) override; #endif const EventTargetCapability mIOThread; Mutex mMutex{"PlatformPipeReader"}; UniqueFileHandle mHandle MOZ_GUARDED_BY(mIOThread); const UniquePtr mBuffer; const uint32_t mBufferSize; bool mProcessingSegment MOZ_GUARDED_BY(mMutex) = false; nsresult mStatus MOZ_GUARDED_BY(mMutex) = NS_OK; // `mOffset` is the start of the readable region in the buffer, and // `mAvailable` is its size. uint32_t mOffset MOZ_GUARDED_BY(mMutex) = 0; uint32_t mAvailable MOZ_GUARDED_BY(mMutex) = 0; bool mCallbackClosureOnly MOZ_GUARDED_BY(mMutex) = false; nsCOMPtr mCallback MOZ_GUARDED_BY(mMutex); nsCOMPtr mCallbackTarget MOZ_GUARDED_BY(mMutex); // A reference keeping `this` alive while I/O is in-flight. // This is particularly important on Windows where mBuffer needs to be kept // alive while Overlapped IO is ongoing. RefPtr mPending MOZ_GUARDED_BY(mIOThread); #ifdef XP_WIN MessageLoopForIO::IOContext mIOContext MOZ_GUARDED_BY(mIOThread) = {}; #else MessageLoopForIO::FileDescriptorWatcher mWatcher MOZ_GUARDED_BY(mIOThread); #endif private: ~PlatformPipeLink() = default; }; PlatformPipeLink::PlatformPipeLink(UniqueFileHandle aHandle, uint32_t aBufferSize) : mIOThread(XRE_GetAsyncIOEventTarget()), mHandle(std::move(aHandle)), mBuffer(MakeUnique(aBufferSize)), mBufferSize(aBufferSize) { MOZ_ASSERT(aBufferSize > 1, "invalid buffer size"); MOZ_ASSERT(mHandle, "invalid handle"); #if defined(DEBUG) && !defined(XP_WIN) struct stat st{}; MOZ_ASSERT(fstat(mHandle.get(), &st) == 0 && !S_ISREG(st.st_mode), "PlatformPipeLink does not support regular files"); MOZ_ASSERT(fcntl(mHandle.get(), F_GETFL) & O_NONBLOCK, "PlatformPipeLink requires non-blocking file descriptors"); #endif } void PlatformPipeLink::CloseLocked(nsresult aStatus) { if (NS_FAILED(mStatus)) { return; } mStatus = NS_SUCCEEDED(aStatus) ? NS_BASE_STREAM_CLOSED : aStatus; DispatchNotify(); mIOThread.Dispatch(NewRunnableMethod("PlatformPipeLink::AdvanceIO", this, &PlatformPipeLink::AdvanceIO)); } void PlatformPipeLink::DispatchNotify() { nsCOMPtr callback = mCallback.forget(); nsCOMPtr target = mCallbackTarget.forget(); if (!callback) { return; } if (target) { target->Dispatch(callback.forget()); } else { NS_DispatchBackgroundTask(callback.forget()); } } void PlatformPipeLink::AdvanceIO() { if (!mHandle) { return; } MutexAutoLock lock(mMutex); if (NS_FAILED(mStatus)) { if (mPending) { #ifdef XP_WIN // If we still have pending I/O, cancel it. We don't clear mPending, // as we need to keep our buffers alive until the cancelled I/O // completes. CancelIo(mHandle.get()); #else // On posix, immediately cancel our pending I/O by clearing the // watcher. mWatcher.StopWatchingFileDescriptor(); mPending = nullptr; #endif } // The `close` operation can be slow and perform blocking I/O, so we want to // avoid performing it on the IPC I/O thread. Instead, we dispatch it to a // blocking background task. NS_DispatchBackgroundTask( NS_NewRunnableFunction( "PlatformPipeLink::CloseHandle", [handle = std::move(mHandle)]() mutable { handle = nullptr; }), NS_DISPATCH_EVENT_MAY_BLOCK); return; } // We still have outstanding I/O, or our buffer already has data waiting to // be consumed. Either way, don't start new I/O. if (mPending || mAvailable) { return; } #ifdef XP_WIN // On Windows, we need to register the IO handler the first time we're on the // I/O thread. if (!mIOContext.handler) { MessageLoopForIO::current()->RegisterIOHandler(mHandle.get(), this); mIOContext.handler = this; } BOOL ok = ReadFile(mHandle.get(), mBuffer.get(), mBufferSize, nullptr, &mIOContext.overlapped); if (!ok) { DWORD error = GetLastError(); if (error == ERROR_IO_PENDING) { mPending = this; return; } if (error == ERROR_BROKEN_PIPE || error == ERROR_HANDLE_EOF) { CloseLocked(NS_BASE_STREAM_CLOSED); } else { CloseLocked(NS_ERROR_FAILURE); } return; } mPending = this; #else ssize_t rv = HANDLE_EINTR(read(mHandle.get(), mBuffer.get(), mBufferSize)); if (rv > 0) { mOffset = 0; mAvailable = static_cast(rv); if (!mCallbackClosureOnly) { DispatchNotify(); } return; } if (rv == 0) { CloseLocked(NS_BASE_STREAM_CLOSED); return; } if (errno == EAGAIN # if EWOULDBLOCK != EAGAIN || errno == EWOULDBLOCK # endif ) { if (MessageLoopForIO::current()->WatchFileDescriptor( mHandle.get(), false, MessageLoopForIO::WATCH_READ, &mWatcher, this)) { mPending = this; return; } } CloseLocked(NS_ERROR_FAILURE); #endif } #ifdef XP_WIN void PlatformPipeLink::OnIOCompleted(MessageLoopForIO::IOContext* aContext, DWORD aBytesTransferred, DWORD aError) { mIOThread.AssertOnCurrentThread(); if (aContext != &mIOContext) { return; } RefPtr pending = mPending.forget(); if (!pending) { return; } MutexAutoLock lock(mMutex); if (NS_FAILED(mStatus)) { return; } if (aError != ERROR_SUCCESS) { if (aError == ERROR_BROKEN_PIPE || aError == ERROR_HANDLE_EOF || aError == ERROR_OPERATION_ABORTED) { CloseLocked(NS_BASE_STREAM_CLOSED); } else { CloseLocked(NS_ERROR_FAILURE); } return; } if (aBytesTransferred == 0) { CloseLocked(NS_BASE_STREAM_CLOSED); return; } mOffset = 0; mAvailable = aBytesTransferred; if (!mCallbackClosureOnly) { DispatchNotify(); } } #else void PlatformPipeLink::OnFileCanReadWithoutBlocking(int fd) { mIOThread.AssertOnCurrentThread(); RefPtr pending = mPending.forget(); AdvanceIO(); } void PlatformPipeLink::OnFileCanWriteWithoutBlocking(int fd) { MOZ_ASSERT_UNREACHABLE(); } #endif } // namespace platform_pipe_detail //----------------------------------------------------------------------------- // PlatformPipeReader //----------------------------------------------------------------------------- NS_IMPL_ISUPPORTS(PlatformPipeReader, nsIInputStream, nsIAsyncInputStream) PlatformPipeReader::PlatformPipeReader(UniqueFileHandle aHandle, uint32_t aBufferSize) : mLink(new platform_pipe_detail::PlatformPipeLink(std::move(aHandle), aBufferSize)) {} PlatformPipeReader::~PlatformPipeReader() { Close(); } NS_IMETHODIMP PlatformPipeReader::Close() { return CloseWithStatus(NS_BASE_STREAM_CLOSED); } NS_IMETHODIMP PlatformPipeReader::Available(uint64_t* aAvailable) { MutexAutoLock lock(mLink->mMutex); if (NS_FAILED(mLink->mStatus)) { return mLink->mStatus; } *aAvailable = mLink->mAvailable; return NS_OK; } NS_IMETHODIMP PlatformPipeReader::StreamStatus() { MutexAutoLock lock(mLink->mMutex); return mLink->mStatus; } NS_IMETHODIMP PlatformPipeReader::Read(char* aBuf, uint32_t aCount, uint32_t* aReadCount) { return ReadSegments(NS_CopySegmentToBuffer, aBuf, aCount, aReadCount); } NS_IMETHODIMP PlatformPipeReader::ReadSegments(nsWriteSegmentFun aWriter, void* aClosure, uint32_t aCount, uint32_t* aReadCount) { *aReadCount = 0; MutexAutoLock lock(mLink->mMutex); if (NS_FAILED(mLink->mStatus)) { return mLink->mStatus == NS_BASE_STREAM_CLOSED ? NS_OK : mLink->mStatus; } if (!mLink->mAvailable) { return NS_BASE_STREAM_WOULD_BLOCK; } MOZ_RELEASE_ASSERT(!mLink->mProcessingSegment, "Only one thread may be processing a segment at a time"); char* start = mLink->mBuffer.get() + mLink->mOffset; uint32_t length = std::min(aCount, mLink->mAvailable); mLink->mProcessingSegment = true; { MutexAutoUnlock unlock(mLink->mMutex); nsresult rv = aWriter(this, aClosure, start, 0, length, aReadCount); if (NS_FAILED(rv)) { *aReadCount = 0; } MOZ_RELEASE_ASSERT(*aReadCount <= length); } mLink->mProcessingSegment = false; mLink->mOffset += *aReadCount; mLink->mAvailable -= *aReadCount; // If a closure-only callback is armed, the caller isn't listening for new // data, so only the IO thread is able to notice the peer closing. Re-kick // AdvanceIO once the buffer has drained so a subsequent close is observed. if (!mLink->mAvailable && mLink->mCallback && mLink->mCallbackClosureOnly) { mLink->mIOThread.Dispatch( NewRunnableMethod("PlatformPipeLink::AdvanceIO", mLink, &platform_pipe_detail::PlatformPipeLink::AdvanceIO)); } return NS_OK; } NS_IMETHODIMP PlatformPipeReader::IsNonBlocking(bool* aNonBlocking) { *aNonBlocking = true; return NS_OK; } NS_IMETHODIMP PlatformPipeReader::CloseWithStatus(nsresult aStatus) { MutexAutoLock lock(mLink->mMutex); MOZ_RELEASE_ASSERT(!mLink->mProcessingSegment, "Cannot close pipe during ReadSegments callback"); mLink->CloseLocked(aStatus); return NS_OK; } NS_IMETHODIMP PlatformPipeReader::AsyncWait(nsIInputStreamCallback* aCallback, uint32_t aFlags, uint32_t aRequestedCount, nsIEventTarget* aTarget) { MutexAutoLock lock(mLink->mMutex); if (!aCallback) { mLink->mCallback = nullptr; mLink->mCallbackTarget = nullptr; return NS_OK; } mLink->mCallback = NS_NewRunnableFunction( "PlatformPipeReader::AsyncWait", [self = RefPtr{this}, callback = RefPtr{aCallback}] { callback->OnInputStreamReady(self); }); mLink->mCallbackTarget = aTarget; mLink->mCallbackClosureOnly = aFlags & WAIT_CLOSURE_ONLY; if (NS_FAILED(mLink->mStatus) || (!mLink->mCallbackClosureOnly && mLink->mAvailable)) { mLink->DispatchNotify(); } else { mLink->mIOThread.Dispatch( NewRunnableMethod("PlatformPipeLink::AdvanceIO", mLink, &platform_pipe_detail::PlatformPipeLink::AdvanceIO)); } return NS_OK; } } // namespace mozilla