// Copyright 2019 The Chromium Authors. All rights reserved. // Use of this source code is governed by a BSD-style license that can be // found in the LICENSE file. #include "base/message_pump_kqueue.h" #include #ifdef XP_IOS # include #endif #include "mozilla/AutoRestore.h" #include "base/logging.h" #include "base/scoped_nsautorelease_pool.h" #include "base/eintr_wrapper.h" namespace base { namespace { // On iOS, the normal `kevent64` method is blocked by the content process // sandbox, so instead we use `be_kevent64` from the BrowserEngineCore library. static int platform_kevent64(int fd, const kevent64_s* changelist, int nchanges, kevent64_s* eventlist, int nevents, int flags) { #ifdef XP_IOS return be_kevent64(fd, changelist, nchanges, eventlist, nevents, flags); #else return kevent64(fd, changelist, nchanges, eventlist, nevents, flags, nullptr); #endif } int ChangeOneEvent(const mozilla::UniqueFileHandle& kqueue, kevent64_s* event) { return HANDLE_EINTR(platform_kevent64(kqueue.get(), event, 1, nullptr, 0, 0)); } } // namespace MessagePumpKqueue::FileDescriptorWatcher::FileDescriptorWatcher() = default; MessagePumpKqueue::FileDescriptorWatcher::~FileDescriptorWatcher() { StopWatchingFileDescriptor(); } bool MessagePumpKqueue::FileDescriptorWatcher::StopWatchingFileDescriptor() { if (!pump_) return true; return pump_->StopWatchingFileDescriptor(this); } void MessagePumpKqueue::FileDescriptorWatcher::Init(MessagePumpKqueue* pump, int fd, int mode, Watcher* watcher) { DCHECK_NE(fd, -1); DCHECK(!watcher_); DCHECK(watcher); DCHECK(pump); fd_ = fd; mode_ = mode; watcher_ = watcher; pump_ = pump; } void MessagePumpKqueue::FileDescriptorWatcher::Reset() { fd_ = -1; mode_ = 0; watcher_ = nullptr; pump_ = nullptr; } MessagePumpKqueue::MachPortWatchController::MachPortWatchController() = default; MessagePumpKqueue::MachPortWatchController::~MachPortWatchController() { StopWatchingMachPort(); } bool MessagePumpKqueue::MachPortWatchController::StopWatchingMachPort() { if (!pump_) { return true; } return pump_->StopWatchingMachPort(this); } void MessagePumpKqueue::MachPortWatchController::Init( MessagePumpKqueue* pump, mach_port_t port, MachPortWatcher* watcher) { DCHECK(!watcher_); DCHECK(watcher); DCHECK(pump); port_ = port; watcher_ = watcher; pump_ = pump; } void MessagePumpKqueue::MachPortWatchController::Reset() { port_ = MACH_PORT_NULL; watcher_ = nullptr; pump_ = nullptr; } MessagePumpKqueue::MessagePumpKqueue() : kqueue_(kqueue()) { DCHECK(kqueue_) << "kqueue"; // Create a Mach port that will be used to wake up the pump by sending // a message in response to ScheduleWork(). This is significantly faster than // using an EVFILT_USER event, especially when triggered across threads. mach_port_t wakeup; kern_return_t kr = mach_port_allocate(mach_task_self(), MACH_PORT_RIGHT_RECEIVE, &wakeup); wakeup_.reset(wakeup); CHECK(kr == KERN_SUCCESS) << "mach_port_allocate: " << mach_error_string(kr); // Specify the wakeup port event to directly receive the Mach message as part // of the kevent64() syscall. kevent64_s event{}; event.ident = wakeup_.get(); event.filter = EVFILT_MACHPORT; event.flags = EV_ADD; event.fflags = MACH_RCV_MSG; event.ext[0] = reinterpret_cast(&wakeup_buffer_); event.ext[1] = sizeof(wakeup_buffer_); int rv = ChangeOneEvent(kqueue_, &event); DCHECK(rv == 0) << "kevent64"; } MessagePumpKqueue::~MessagePumpKqueue() = default; void MessagePumpKqueue::Run(Delegate* delegate) { mozilla::AutoRestore reset_keep_running(keep_running_); keep_running_ = true; while (keep_running_) { ScopedNSAutoreleasePool pool; bool do_more_work = DoInternalWork(delegate, nullptr); if (!keep_running_) break; do_more_work |= delegate->DoWork(); if (!keep_running_) break; TimeTicks delayed_work_time; do_more_work |= delegate->DoDelayedWork(&delayed_work_time); if (!keep_running_) break; if (do_more_work) continue; do_more_work |= delegate->DoIdleWork(); if (!keep_running_) break; if (do_more_work) continue; DoInternalWork(delegate, &delayed_work_time); } } void MessagePumpKqueue::Quit() { keep_running_ = false; ScheduleWork(); } void MessagePumpKqueue::ScheduleWork() { mach_msg_empty_send_t message{}; message.header.msgh_size = sizeof(message); message.header.msgh_bits = MACH_MSGH_BITS_REMOTE(MACH_MSG_TYPE_MAKE_SEND_ONCE); message.header.msgh_remote_port = wakeup_.get(); kern_return_t kr = mach_msg_send(&message.header); if (kr != KERN_SUCCESS) { // If ScheduleWork() is being called by other threads faster than the pump // can dispatch work, the kernel message queue for the wakeup port can fill // up (this happens under base_perftests, for example). The kernel does // return a SEND_ONCE right in the case of failure, which must be destroyed // to avoid leaking. if ((kr & ~MACH_MSG_IPC_SPACE) != MACH_SEND_NO_BUFFER) { DLOG(ERROR) << "mach_msg_send: " << mach_error_string(kr); } mach_msg_destroy(&message.header); } } void MessagePumpKqueue::ScheduleDelayedWork( const TimeTicks& delayed_work_time) { // Nothing to do. This MessagePump uses DoWork(). } bool MessagePumpKqueue::WatchMachReceivePort( mach_port_t port, MachPortWatchController* controller, MachPortWatcher* delegate) { DCHECK(port != MACH_PORT_NULL); DCHECK(controller); DCHECK(delegate); if (controller->port() != MACH_PORT_NULL) { DLOG(ERROR) << "Cannot use the same MachPortWatchController while it is active"; return false; } kevent64_s event{}; event.ident = port; event.filter = EVFILT_MACHPORT; event.flags = EV_ADD; int rv = ChangeOneEvent(kqueue_, &event); if (rv < 0) { DLOG(ERROR) << "kevent64"; return false; } ++event_count_; controller->Init(this, port, delegate); port_controllers_.InsertOrUpdate(port, controller); return true; } bool MessagePumpKqueue::WatchFileDescriptor(int fd, bool persistent, int mode, FileDescriptorWatcher* controller, Watcher* delegate) { DCHECK_GE(fd, 0); DCHECK(controller); DCHECK(delegate); DCHECK_NE(mode & Mode::WATCH_READ_WRITE, 0); if (controller->fd() != -1 && controller->fd() != fd) { DLOG(ERROR) << "Cannot use the same FileDescriptorWatcher on two different FDs"; return false; } StopWatchingFileDescriptor(controller); AutoTArray events; kevent64_s base_event{}; base_event.ident = static_cast(fd); base_event.flags = EV_ADD | (!persistent ? EV_ONESHOT : 0); if (mode & Mode::WATCH_READ) { base_event.filter = EVFILT_READ; CHECK(next_fd_controller_id_ < std::numeric_limits::max()); base_event.udata = next_fd_controller_id_++; fd_controllers_.InsertOrUpdate(base_event.udata, controller); events.AppendElement(base_event); } if (mode & Mode::WATCH_WRITE) { base_event.filter = EVFILT_WRITE; CHECK(next_fd_controller_id_ < std::numeric_limits::max()); base_event.udata = next_fd_controller_id_++; fd_controllers_.InsertOrUpdate(base_event.udata, controller); events.AppendElement(base_event); } int rv = HANDLE_EINTR(platform_kevent64(kqueue_.get(), events.Elements(), events.Length(), nullptr, 0, 0)); if (rv < 0) { DLOG(ERROR) << "WatchFileDescriptor kevent64"; return false; } event_count_ += events.Length(); controller->Init(this, fd, mode, delegate); return true; } bool MessagePumpKqueue::StopWatchingMachPort( MachPortWatchController* controller) { mach_port_t port = controller->port(); controller->Reset(); port_controllers_.Remove(port); kevent64_s event{}; event.ident = port; event.filter = EVFILT_MACHPORT; event.flags = EV_DELETE; --event_count_; int rv = ChangeOneEvent(kqueue_, &event); if (rv < 0) { DLOG(ERROR) << "kevent64"; return false; } return true; } bool MessagePumpKqueue::StopWatchingFileDescriptor( FileDescriptorWatcher* controller) { int fd = controller->fd(); int mode = controller->mode(); controller->Reset(); if (fd < 0) return true; AutoTArray events; kevent64_s base_event{}; base_event.ident = static_cast(fd); base_event.flags = EV_DELETE; if (mode & Mode::WATCH_READ) { base_event.filter = EVFILT_READ; events.AppendElement(base_event); } if (mode & Mode::WATCH_WRITE) { base_event.filter = EVFILT_WRITE; events.AppendElement(base_event); } int rv = HANDLE_EINTR(platform_kevent64(kqueue_.get(), events.Elements(), events.Length(), nullptr, 0, 0)); if (rv < 0) DLOG(ERROR) << "StopWatchingFileDescriptor kevent64"; // The keys for the IDMap aren't recorded anywhere (they're attached to the // kevent object in the kernel), so locate the entries by controller pointer. fd_controllers_.RemoveIf([&](auto& it) { return it.Data() == controller; }); event_count_ -= events.Length(); return rv >= 0; } bool MessagePumpKqueue::DoInternalWork(Delegate* delegate, TimeTicks* delayed_work_time) { if (events_.size() < event_count_) { events_.resize(event_count_); } bool poll = delayed_work_time == nullptr; int flags = poll ? KEVENT_FLAG_IMMEDIATE : 0; if (!poll && delayed_work_time_ != *delayed_work_time) { UpdateWakeupTimer(*delayed_work_time); DCHECK(delayed_work_time_ == *delayed_work_time); } int rv = HANDLE_EINTR(platform_kevent64( kqueue_.get(), nullptr, 0, events_.data(), events_.size(), flags)); CHECK(rv >= 0) << "kevent64"; return ProcessEvents(delegate, rv); } bool MessagePumpKqueue::ProcessEvents(Delegate* delegate, int count) { bool did_work = false; for (int i = 0; i < count; ++i) { auto* event = &events_[i]; if (event->filter == EVFILT_READ || event->filter == EVFILT_WRITE) { did_work = true; FileDescriptorWatcher* controller = fd_controllers_.Get(event->udata); if (!controller) { // The controller was removed by some other work callout before // this event could be processed. continue; } Watcher* fd_watcher = controller->watcher(); if (event->flags & EV_ONESHOT) { // If this was a one-shot event, the Controller needs to stop tracking // the descriptor, so it is not double-removed when it is told to stop // watching. controller->Reset(); fd_controllers_.Remove(event->udata); --event_count_; } if (fd_watcher) { if (event->filter == EVFILT_READ) { fd_watcher->OnFileCanReadWithoutBlocking( static_cast(event->ident)); } else if (event->filter == EVFILT_WRITE) { fd_watcher->OnFileCanWriteWithoutBlocking( static_cast(event->ident)); } } } else if (event->filter == EVFILT_MACHPORT) { mach_port_t port = event->ident; if (port == wakeup_.get()) { // The wakeup event has been received, do not treat this as "doing // work", this just wakes up the pump. continue; } did_work = true; MachPortWatchController* controller = port_controllers_.Get(port); // The controller could have been removed by some other work callout // before this event could be processed. if (controller) { controller->watcher()->OnMachMessageReceived(port); } } else if (event->filter == EVFILT_TIMER) { // The wakeup timer fired. DCHECK(!delayed_work_time_.is_null()); delayed_work_time_ = base::TimeTicks(); --event_count_; } else { NOTREACHED() << "Unexpected event for filter " << event->filter; } } return did_work; } void MessagePumpKqueue::UpdateWakeupTimer(const base::TimeTicks& wakeup_time) { DCHECK_NE(wakeup_time, delayed_work_time_); // The ident of the wakeup timer. There's only the one timer as the pair // (ident, filter) is the identity of the event. constexpr uint64_t kWakeupTimerIdent = 0x0; if (wakeup_time.is_null()) { // Clear the timer. kevent64_s timer{}; timer.ident = kWakeupTimerIdent; timer.filter = EVFILT_TIMER; timer.flags = EV_DELETE; int rv = ChangeOneEvent(kqueue_, &timer); CHECK(rv == 0) << "kevent64, delete timer"; --event_count_; } else { // Set/reset the timer. kevent64_s timer{}; timer.ident = kWakeupTimerIdent; timer.filter = EVFILT_TIMER; // This updates the timer if it already exists in |kqueue_|. timer.flags = EV_ADD | EV_ONESHOT; // Specify the sleep in microseconds to avoid undersleeping due to // numeric problems. // If wakeup_time is in the past, the delta below will be negative and the // timer is set immediately. timer.fflags = NOTE_USECONDS; timer.data = (wakeup_time - base::TimeTicks::Now()).InMicroseconds(); int rv = ChangeOneEvent(kqueue_, &timer); CHECK(rv == 0) << "kevent64, set timer"; // Bump the event count if we just added the timer. if (delayed_work_time_.is_null()) ++event_count_; } delayed_work_time_ = wakeup_time; } } // namespace base