| // Copyright 2019 The Chromium Authors |
| // Use of this source code is governed by a BSD-style license that can be |
| // found in the LICENSE file. |
| |
| #ifdef UNSAFE_BUFFERS_BUILD |
| // TODO(crbug.com/40284755): Remove this and spanify to fix the errors. |
| #pragma allow_unsafe_buffers |
| #endif |
| |
| #include "base/message_loop/message_pump_kqueue.h" |
| |
| #include <sys/errno.h> |
| |
| #include <atomic> |
| |
| #include "base/apple/mach_logging.h" |
| #include "base/apple/scoped_nsautorelease_pool.h" |
| #include "base/auto_reset.h" |
| #include "base/feature_list.h" |
| #include "base/logging.h" |
| #include "base/mac/mac_util.h" |
| #include "base/notreached.h" |
| #include "base/posix/eintr_wrapper.h" |
| #include "base/task/task_features.h" |
| #include "base/time/time_override.h" |
| |
| namespace base { |
| |
| namespace { |
| |
| // Under this feature native work is batched. Remove it once crbug.com/1200141 |
| // is resolved. |
| BASE_FEATURE(kBatchNativeEventsInMessagePumpKqueue, |
| "BatchNativeEventsInMessagePumpKqueue", |
| base::FEATURE_DISABLED_BY_DEFAULT); |
| |
| // Caches the state of the "BatchNativeEventsInMessagePumpKqueue". |
| std::atomic_bool g_use_batched_version = false; |
| |
| // Caches the state of the "TimerSlackMac" feature for efficiency. |
| std::atomic_bool g_timer_slack = false; |
| |
| #if DCHECK_IS_ON() |
| // Prior to macOS 10.14, kqueue timers may spuriously wake up, because earlier |
| // wake ups race with timer resets in the kernel. As of macOS 10.14, updating a |
| // timer from the thread that reads the kqueue does not cause spurious wakeups. |
| // Note that updating a kqueue timer from one thread while another thread is |
| // waiting in a kevent64 invocation is still (inherently) racy. |
| bool KqueueTimersSpuriouslyWakeUp() { |
| #if BUILDFLAG(IS_MAC) |
| return false; |
| #else |
| // This still happens on iOS15. |
| return true; |
| #endif |
| } |
| #endif |
| |
| int ChangeOneEvent(const ScopedFD& kqueue, kevent64_s* event) { |
| return HANDLE_EINTR(kevent64(kqueue.get(), event, 1, nullptr, 0, 0, nullptr)); |
| } |
| |
| } // namespace |
| |
| MessagePumpKqueue::FdWatchController::FdWatchController( |
| const Location& from_here) |
| : FdWatchControllerInterface(from_here) {} |
| |
| MessagePumpKqueue::FdWatchController::~FdWatchController() { |
| StopWatchingFileDescriptor(); |
| } |
| |
| bool MessagePumpKqueue::FdWatchController::StopWatchingFileDescriptor() { |
| if (!pump_) |
| return true; |
| return pump_->StopWatchingFileDescriptor(this); |
| } |
| |
| void MessagePumpKqueue::FdWatchController::Init(WeakPtr<MessagePumpKqueue> pump, |
| int fd, |
| int mode, |
| FdWatcher* watcher) { |
| DCHECK_NE(fd, -1); |
| DCHECK(!watcher_); |
| DCHECK(watcher); |
| DCHECK(pump); |
| fd_ = fd; |
| mode_ = mode; |
| watcher_ = watcher; |
| pump_ = pump; |
| } |
| |
| void MessagePumpKqueue::FdWatchController::Reset() { |
| fd_ = -1; |
| mode_ = 0; |
| watcher_ = nullptr; |
| pump_ = nullptr; |
| } |
| |
| MessagePumpKqueue::MachPortWatchController::MachPortWatchController( |
| const Location& from_here) |
| : from_here_(from_here) {} |
| |
| MessagePumpKqueue::MachPortWatchController::~MachPortWatchController() { |
| StopWatchingMachPort(); |
| } |
| |
| bool MessagePumpKqueue::MachPortWatchController::StopWatchingMachPort() { |
| if (!pump_) |
| return true; |
| return pump_->StopWatchingMachPort(this); |
| } |
| |
| void MessagePumpKqueue::MachPortWatchController::Init( |
| WeakPtr<MessagePumpKqueue> pump, |
| mach_port_t port, |
| MachPortWatcher* watcher) { |
| DCHECK(!watcher_); |
| DCHECK(watcher); |
| DCHECK(pump); |
| port_ = port; |
| watcher_ = watcher; |
| pump_ = pump; |
| } |
| |
| void MessagePumpKqueue::MachPortWatchController::Reset() { |
| port_ = MACH_PORT_NULL; |
| watcher_ = nullptr; |
| pump_ = nullptr; |
| } |
| |
| MessagePumpKqueue::MessagePumpKqueue() |
| : kqueue_(kqueue()), weak_factory_(this) { |
| PCHECK(kqueue_.is_valid()) << "kqueue"; |
| |
| // Create a Mach port that will be used to wake up the pump by sending |
| // a message in response to ScheduleWork(). This is significantly faster than |
| // using an EVFILT_USER event, especially when triggered across threads. |
| kern_return_t kr = mach_port_allocate( |
| mach_task_self(), MACH_PORT_RIGHT_RECEIVE, |
| base::apple::ScopedMachReceiveRight::Receiver(wakeup_).get()); |
| MACH_CHECK(kr == KERN_SUCCESS, kr) << "mach_port_allocate"; |
| |
| // Configure the event to directly receive the Mach message as part of the |
| // kevent64() call. |
| kevent64_s event{}; |
| event.ident = wakeup_.get(); |
| event.filter = EVFILT_MACHPORT; |
| event.flags = EV_ADD; |
| event.fflags = MACH_RCV_MSG; |
| event.ext[0] = reinterpret_cast<uint64_t>(&wakeup_buffer_); |
| event.ext[1] = sizeof(wakeup_buffer_); |
| |
| int rv = ChangeOneEvent(kqueue_, &event); |
| PCHECK(rv == 0) << "kevent64"; |
| } |
| |
| MessagePumpKqueue::~MessagePumpKqueue() {} |
| |
| void MessagePumpKqueue::InitializeFeatures() { |
| g_use_batched_version.store( |
| base::FeatureList::IsEnabled(kBatchNativeEventsInMessagePumpKqueue), |
| std::memory_order_relaxed); |
| g_timer_slack.store(FeatureList::IsEnabled(kTimerSlackMac), |
| std::memory_order_relaxed); |
| } |
| |
| void MessagePumpKqueue::Run(Delegate* delegate) { |
| AutoReset<bool> reset_keep_running(&keep_running_, true); |
| |
| if (g_use_batched_version.load(std::memory_order_relaxed)) { |
| RunBatched(delegate); |
| } else { |
| while (keep_running_) { |
| apple::ScopedNSAutoreleasePool pool; |
| |
| bool do_more_work = DoInternalWork(delegate, nullptr); |
| if (!keep_running_) |
| break; |
| |
| Delegate::NextWorkInfo next_work_info = delegate->DoWork(); |
| do_more_work |= next_work_info.is_immediate(); |
| if (!keep_running_) |
| break; |
| |
| if (do_more_work) |
| continue; |
| |
| delegate->DoIdleWork(); |
| if (!keep_running_) |
| break; |
| |
| DoInternalWork(delegate, &next_work_info); |
| } |
| } |
| } |
| |
| void MessagePumpKqueue::RunBatched(Delegate* delegate) { |
| // Look for native work once before the loop starts. Without this call the |
| // loop would break without checking native work even once in cases where |
| // QuitWhenIdle was used. This is sometimes the case in tests. |
| DoInternalWork(delegate, nullptr); |
| |
| while (keep_running_) { |
| apple::ScopedNSAutoreleasePool pool; |
| |
| Delegate::NextWorkInfo next_work_info = delegate->DoWork(); |
| if (!keep_running_) |
| break; |
| |
| if (!next_work_info.is_immediate()) { |
| delegate->DoIdleWork(); |
| } |
| if (!keep_running_) |
| break; |
| |
| int batch_size = 0; |
| if (DoInternalWork(delegate, &next_work_info)) { |
| // More than one call can be necessary to fully dispatch all available |
| // internal work. Making an effort to dispatch more than the minimum |
| // before moving on to application tasks reduces the overhead of going |
| // through the whole loop. It also more closely mirrors the behavior of |
| // application task execution where tasks are batched. A value of 16 was |
| // chosen via local experimentation showing that is was sufficient to |
| // dispatch all work in roughly 95% of cases. |
| constexpr int kMaxAttempts = 16; |
| while (DoInternalWork(delegate, nullptr) && batch_size < kMaxAttempts) { |
| ++batch_size; |
| } |
| } |
| } |
| } |
| |
| void MessagePumpKqueue::Quit() { |
| keep_running_ = false; |
| ScheduleWork(); |
| } |
| |
| void MessagePumpKqueue::ScheduleWork() { |
| mach_msg_empty_send_t message{}; |
| message.header.msgh_size = sizeof(message); |
| message.header.msgh_bits = |
| MACH_MSGH_BITS_REMOTE(MACH_MSG_TYPE_MAKE_SEND_ONCE); |
| message.header.msgh_remote_port = wakeup_.get(); |
| kern_return_t kr = mach_msg_send(&message.header); |
| if (kr != KERN_SUCCESS) { |
| // If ScheduleWork() is being called by other threads faster than the pump |
| // can dispatch work, the kernel message queue for the wakeup port can fill |
| // up (this happens under base_perftests, for example). The kernel does |
| // return a SEND_ONCE right in the case of failure, which must be destroyed |
| // to avoid leaking. |
| MACH_DLOG_IF(ERROR, (kr & ~MACH_MSG_IPC_SPACE) != MACH_SEND_NO_BUFFER, kr) |
| << "mach_msg_send"; |
| mach_msg_destroy(&message.header); |
| } |
| } |
| |
| void MessagePumpKqueue::ScheduleDelayedWork( |
| const Delegate::NextWorkInfo& next_work_info) { |
| // Nothing to do. This MessagePump uses DoWork(). |
| } |
| |
| bool MessagePumpKqueue::WatchMachReceivePort( |
| mach_port_t port, |
| MachPortWatchController* controller, |
| MachPortWatcher* delegate) { |
| DCHECK(port != MACH_PORT_NULL); |
| DCHECK(controller); |
| DCHECK(delegate); |
| |
| if (controller->port() != MACH_PORT_NULL) { |
| DLOG(ERROR) |
| << "Cannot use the same MachPortWatchController while it is active"; |
| return false; |
| } |
| |
| kevent64_s event{}; |
| event.ident = port; |
| event.filter = EVFILT_MACHPORT; |
| event.flags = EV_ADD; |
| int rv = ChangeOneEvent(kqueue_, &event); |
| if (rv < 0) { |
| DPLOG(ERROR) << "kevent64"; |
| return false; |
| } |
| ++event_count_; |
| |
| controller->Init(weak_factory_.GetWeakPtr(), port, delegate); |
| port_controllers_.AddWithID(controller, port); |
| |
| return true; |
| } |
| |
| TimeTicks MessagePumpKqueue::AdjustDelayedRunTime(TimeTicks earliest_time, |
| TimeTicks run_time, |
| TimeTicks latest_time) { |
| if (g_timer_slack.load(std::memory_order_relaxed)) { |
| return earliest_time; |
| } |
| return MessagePump::AdjustDelayedRunTime(earliest_time, run_time, |
| latest_time); |
| } |
| |
| bool MessagePumpKqueue::WatchFileDescriptor(int fd, |
| bool persistent, |
| int mode, |
| FdWatchController* controller, |
| FdWatcher* delegate) { |
| DCHECK_GE(fd, 0); |
| DCHECK(controller); |
| DCHECK(delegate); |
| DCHECK_NE(mode & Mode::WATCH_READ_WRITE, 0); |
| |
| if (controller->fd() != -1 && controller->fd() != fd) { |
| DLOG(ERROR) << "Cannot use the same FdWatchController on two different FDs"; |
| return false; |
| } |
| StopWatchingFileDescriptor(controller); |
| |
| std::vector<kevent64_s> events; |
| |
| kevent64_s base_event{}; |
| base_event.ident = static_cast<uint64_t>(fd); |
| base_event.flags = EV_ADD | (!persistent ? EV_ONESHOT : 0); |
| |
| if (mode & Mode::WATCH_READ) { |
| base_event.filter = EVFILT_READ; |
| base_event.udata = fd_controllers_.Add(controller); |
| events.push_back(base_event); |
| } |
| if (mode & Mode::WATCH_WRITE) { |
| base_event.filter = EVFILT_WRITE; |
| base_event.udata = fd_controllers_.Add(controller); |
| events.push_back(base_event); |
| } |
| |
| int rv = HANDLE_EINTR(kevent64(kqueue_.get(), events.data(), |
| checked_cast<int>(events.size()), nullptr, 0, |
| 0, nullptr)); |
| if (rv < 0) { |
| DPLOG(ERROR) << "WatchFileDescriptor kevent64"; |
| return false; |
| } |
| |
| event_count_ += events.size(); |
| controller->Init(weak_factory_.GetWeakPtr(), fd, mode, delegate); |
| |
| return true; |
| } |
| |
| void MessagePumpKqueue::SetWakeupTimerEvent(const base::TimeTicks& wakeup_time, |
| base::TimeDelta leeway, |
| kevent64_s* timer_event) { |
| // The ident of the wakeup timer. There's only the one timer as the pair |
| // (ident, filter) is the identity of the event. |
| constexpr uint64_t kWakeupTimerIdent = 0x0; |
| timer_event->ident = kWakeupTimerIdent; |
| timer_event->filter = EVFILT_TIMER; |
| if (wakeup_time == base::TimeTicks::Max()) { |
| timer_event->flags = EV_DELETE; |
| } else { |
| timer_event->filter = EVFILT_TIMER; |
| // This updates the timer if it already exists in |kqueue_|. |
| timer_event->flags = EV_ADD | EV_ONESHOT; |
| |
| // Specify the sleep in microseconds to avoid undersleeping due to |
| // numeric problems. The sleep is computed from TimeTicks::Now rather than |
| // NextWorkInfo::recent_now because recent_now is strictly earlier than |
| // current wall-clock. Using an earlier wall clock time to compute the |
| // delta to the next wakeup wall-clock time would guarantee oversleep. |
| // If wakeup_time is in the past, the delta below will be negative and the |
| // timer is set immediately. |
| timer_event->fflags = NOTE_USECONDS; |
| timer_event->data = (wakeup_time - base::TimeTicks::Now()).InMicroseconds(); |
| |
| if (!leeway.is_zero() && g_timer_slack.load(std::memory_order_relaxed)) { |
| // Specify slack based on |leeway|. |
| // See "man kqueue" in recent macOSen for documentation. |
| timer_event->fflags |= NOTE_LEEWAY; |
| timer_event->ext[1] = static_cast<uint64_t>(leeway.InMicroseconds()); |
| } |
| } |
| } |
| |
| bool MessagePumpKqueue::StopWatchingMachPort( |
| MachPortWatchController* controller) { |
| mach_port_t port = controller->port(); |
| controller->Reset(); |
| port_controllers_.Remove(port); |
| |
| kevent64_s event{}; |
| event.ident = port; |
| event.filter = EVFILT_MACHPORT; |
| event.flags = EV_DELETE; |
| --event_count_; |
| int rv = ChangeOneEvent(kqueue_, &event); |
| if (rv < 0) { |
| DPLOG(ERROR) << "kevent64"; |
| return false; |
| } |
| |
| return true; |
| } |
| |
| bool MessagePumpKqueue::StopWatchingFileDescriptor( |
| FdWatchController* controller) { |
| int fd = controller->fd(); |
| int mode = controller->mode(); |
| controller->Reset(); |
| |
| if (fd < 0) |
| return true; |
| |
| std::vector<kevent64_s> events; |
| |
| kevent64_s base_event{}; |
| base_event.ident = static_cast<uint64_t>(fd); |
| base_event.flags = EV_DELETE; |
| |
| if (mode & Mode::WATCH_READ) { |
| base_event.filter = EVFILT_READ; |
| events.push_back(base_event); |
| } |
| if (mode & Mode::WATCH_WRITE) { |
| base_event.filter = EVFILT_WRITE; |
| events.push_back(base_event); |
| } |
| |
| int rv = HANDLE_EINTR(kevent64(kqueue_.get(), events.data(), |
| checked_cast<int>(events.size()), nullptr, 0, |
| 0, nullptr)); |
| DPLOG_IF(ERROR, rv < 0) << "StopWatchingFileDescriptor kevent64"; |
| |
| // The keys for the IDMap aren't recorded anywhere (they're attached to the |
| // kevent object in the kernel), so locate the entries by controller pointer. |
| for (IDMap<FdWatchController*, uint64_t>::iterator it(&fd_controllers_); |
| !it.IsAtEnd(); it.Advance()) { |
| if (it.GetCurrentValue() == controller) { |
| fd_controllers_.Remove(it.GetCurrentKey()); |
| } |
| } |
| |
| event_count_ -= events.size(); |
| |
| return rv >= 0; |
| } |
| |
| bool MessagePumpKqueue::DoInternalWork(Delegate* delegate, |
| Delegate::NextWorkInfo* next_work_info) { |
| if (events_.size() < event_count_) { |
| events_.resize(event_count_); |
| } |
| |
| bool immediate = next_work_info == nullptr; |
| unsigned int flags = immediate ? KEVENT_FLAG_IMMEDIATE : 0; |
| |
| if (!immediate) { |
| MaybeUpdateWakeupTimer(next_work_info->delayed_run_time, |
| next_work_info->leeway); |
| DCHECK_EQ(scheduled_wakeup_time_, next_work_info->delayed_run_time); |
| delegate->BeforeWait(); |
| } |
| |
| int rv = |
| HANDLE_EINTR(kevent64(kqueue_.get(), nullptr, 0, events_.data(), |
| checked_cast<int>(events_.size()), flags, nullptr)); |
| if (rv == 0) { |
| // No events to dispatch so no need to call ProcessEvents(). |
| return false; |
| } |
| |
| PCHECK(rv > 0) << "kevent64"; |
| return ProcessEvents(delegate, static_cast<size_t>(rv)); |
| } |
| |
| bool MessagePumpKqueue::ProcessEvents(Delegate* delegate, size_t count) { |
| bool did_work = false; |
| |
| delegate->BeginNativeWorkBeforeDoWork(); |
| for (size_t i = 0; i < count; ++i) { |
| auto* event = &events_[i]; |
| if (event->filter == EVFILT_READ || event->filter == EVFILT_WRITE) { |
| did_work = true; |
| |
| FdWatchController* controller = fd_controllers_.Lookup(event->udata); |
| if (!controller) { |
| // The controller was removed by some other work callout before |
| // this event could be processed. |
| continue; |
| } |
| FdWatcher* fd_watcher = controller->watcher(); |
| |
| if (event->flags & EV_ONESHOT) { |
| // If this was a one-shot event, the Controller needs to stop tracking |
| // the descriptor, so it is not double-removed when it is told to stop |
| // watching. |
| controller->Reset(); |
| fd_controllers_.Remove(event->udata); |
| --event_count_; |
| } |
| |
| auto scoped_do_work_item = delegate->BeginWorkItem(); |
| // WatchFileDescriptor() originally upcasts event->ident from an int. |
| if (event->filter == EVFILT_READ) { |
| fd_watcher->OnFileCanReadWithoutBlocking( |
| static_cast<int>(event->ident)); |
| } else if (event->filter == EVFILT_WRITE) { |
| fd_watcher->OnFileCanWriteWithoutBlocking( |
| static_cast<int>(event->ident)); |
| } |
| } else if (event->filter == EVFILT_MACHPORT) { |
| // WatchMachReceivePort() originally sets event->ident from a mach_port_t. |
| mach_port_t port = static_cast<mach_port_t>(event->ident); |
| if (port == wakeup_.get()) { |
| // The wakeup event has been received, do not treat this as "doing |
| // work", this just wakes up the pump. |
| continue; |
| } |
| |
| did_work = true; |
| |
| MachPortWatchController* controller = port_controllers_.Lookup(port); |
| // The controller could have been removed by some other work callout |
| // before this event could be processed. |
| if (controller) { |
| auto scoped_do_work_item = delegate->BeginWorkItem(); |
| controller->watcher()->OnMachMessageReceived(port); |
| } |
| } else if (event->filter == EVFILT_TIMER) { |
| // The wakeup timer fired. |
| #if DCHECK_IS_ON() |
| // On macOS 10.13 and earlier, kqueue timers may spuriously wake up. |
| // When this happens, the timer will be re-scheduled the next time |
| // DoInternalWork is entered, which means this doesn't lead to a |
| // spinning wait. |
| // When clock overrides are active, TimeTicks::Now may be decoupled from |
| // wall-clock time, and can therefore not be used to validate whether the |
| // expected wall-clock time has passed. |
| if (!KqueueTimersSpuriouslyWakeUp() && |
| !subtle::ScopedTimeClockOverrides::overrides_active()) { |
| // Given the caveats above, assert that the timer didn't fire early. |
| DCHECK_LE(scheduled_wakeup_time_, base::TimeTicks::Now()); |
| } |
| #endif |
| DCHECK_NE(scheduled_wakeup_time_, base::TimeTicks::Max()); |
| scheduled_wakeup_time_ = base::TimeTicks::Max(); |
| --event_count_; |
| } else { |
| NOTREACHED_IN_MIGRATION() |
| << "Unexpected event for filter " << event->filter; |
| } |
| } |
| |
| return did_work; |
| } |
| |
| void MessagePumpKqueue::MaybeUpdateWakeupTimer( |
| const base::TimeTicks& wakeup_time, |
| base::TimeDelta leeway) { |
| if (wakeup_time == scheduled_wakeup_time_) { |
| // No change in the timer setting necessary. |
| return; |
| } |
| |
| if (wakeup_time == base::TimeTicks::Max()) { |
| // If the timer was already reset, don't re-reset it on a suspend toggle. |
| if (scheduled_wakeup_time_ != base::TimeTicks::Max()) { |
| // Clear the timer. |
| kevent64_s timer{}; |
| SetWakeupTimerEvent(wakeup_time, leeway, &timer); |
| int rv = ChangeOneEvent(kqueue_, &timer); |
| PCHECK(rv == 0) << "kevent64, delete timer"; |
| --event_count_; |
| } |
| } else { |
| // Set/reset the timer. |
| kevent64_s timer{}; |
| SetWakeupTimerEvent(wakeup_time, leeway, &timer); |
| int rv = ChangeOneEvent(kqueue_, &timer); |
| PCHECK(rv == 0) << "kevent64, set timer"; |
| |
| // Bump the event count if we just added the timer. |
| if (scheduled_wakeup_time_ == base::TimeTicks::Max()) |
| ++event_count_; |
| } |
| |
| scheduled_wakeup_time_ = wakeup_time; |
| } |
| |
| } // namespace base |