|  | // Copyright 2024 The Chromium Authors | 
|  | // Use of this source code is governed by a BSD-style license that can be | 
|  | // found in the LICENSE file. | 
|  |  | 
|  | #include <fcntl.h> | 
|  | #include <sys/socket.h> | 
|  | #include <unistd.h> | 
|  |  | 
|  | #include <array> | 
|  | #include <memory> | 
|  | #include <optional> | 
|  | #include <queue> | 
|  | #include <string> | 
|  | #include <string_view> | 
|  | #include <utility> | 
|  |  | 
|  | #include "base/containers/span.h" | 
|  | #include "base/files/scoped_file.h" | 
|  | #include "base/message_loop/io_watcher.h" | 
|  | #include "base/posix/eintr_wrapper.h" | 
|  | #include "base/run_loop.h" | 
|  | #include "base/synchronization/condition_variable.h" | 
|  | #include "base/synchronization/lock.h" | 
|  | #include "base/synchronization/waitable_event.h" | 
|  | #include "base/test/bind.h" | 
|  | #include "base/test/task_environment.h" | 
|  | #include "base/threading/thread.h" | 
|  | #include "testing/gtest/include/gtest/gtest.h" | 
|  |  | 
|  | #if BUILDFLAG(IS_ANDROID) | 
|  | #include "base/android/java_handler_thread.h" | 
|  | #endif | 
|  |  | 
|  | namespace base { | 
|  | namespace { | 
|  |  | 
|  | // TODO(crbug.com/379190028): Introduce new types here as file descriptor | 
|  | // support is added. | 
|  | enum class FdIOCapableMessagePumpType { | 
|  | kDefaultIO, | 
|  | #if BUILDFLAG(IS_ANDROID) | 
|  | kAndroid, | 
|  | #endif | 
|  | #if defined(USE_GLIB) | 
|  | kDefaultUI | 
|  | #endif | 
|  | }; | 
|  |  | 
|  | std::pair<ScopedFD, ScopedFD> CreateSocketPair() { | 
|  | int fds[2]; | 
|  | CHECK(socketpair(AF_UNIX, SOCK_STREAM, 0, fds) == 0); | 
|  | PCHECK(fcntl(fds[0], F_SETFL, O_NONBLOCK) == 0); | 
|  | PCHECK(fcntl(fds[1], F_SETFL, O_NONBLOCK) == 0); | 
|  | return {ScopedFD(fds[0]), ScopedFD(fds[1])}; | 
|  | } | 
|  |  | 
|  | void WriteToSocket(int fd, std::string_view msg) { | 
|  | const ssize_t result = HANDLE_EINTR(write(fd, msg.data(), msg.size())); | 
|  | CHECK_EQ(result, static_cast<ssize_t>(msg.size())); | 
|  | } | 
|  |  | 
|  | void FillSocket(int fd) { | 
|  | const std::array<char, 1024> kJunk = {}; | 
|  | ssize_t result; | 
|  | do { | 
|  | result = HANDLE_EINTR(write(fd, kJunk.data(), kJunk.size())); | 
|  | } while (result > 0); | 
|  | } | 
|  |  | 
|  | std::string ReadFromSocket(int fd) { | 
|  | char buffer[256]; | 
|  | const ssize_t result = HANDLE_EINTR(read(fd, buffer, std::size(buffer))); | 
|  | if (result <= 0) { | 
|  | return {}; | 
|  | } | 
|  |  | 
|  | const auto contents = span(buffer).first(static_cast<size_t>(result)); | 
|  | return std::string(contents.begin(), contents.end()); | 
|  | } | 
|  |  | 
|  | template <typename Fn> | 
|  | void RunOnTaskRunner(scoped_refptr<SequencedTaskRunner> task_runner, Fn fn) { | 
|  | RunLoop loop; | 
|  | task_runner->PostTask(FROM_HERE, | 
|  | BindLambdaForTesting([&fn, quit = loop.QuitClosure()] { | 
|  | fn(); | 
|  | quit.Run(); | 
|  | })); | 
|  | loop.Run(); | 
|  | } | 
|  |  | 
|  | class TestFdWatcher; | 
|  |  | 
|  | class IOWatcherFdTest | 
|  | : public testing::Test, | 
|  | public testing::WithParamInterface<FdIOCapableMessagePumpType> { | 
|  | public: | 
|  | void SetUp() override { | 
|  | switch (GetParam()) { | 
|  | case FdIOCapableMessagePumpType::kDefaultIO: | 
|  | thread_.emplace("IO thread"); | 
|  | thread_->StartWithOptions(Thread::Options(MessagePumpType::IO, 0)); | 
|  | io_task_runner_ = thread_->task_runner(); | 
|  | break; | 
|  |  | 
|  | #if BUILDFLAG(IS_ANDROID) | 
|  | case FdIOCapableMessagePumpType::kAndroid: | 
|  | java_thread_.emplace("Java thread"); | 
|  | java_thread_->Start(); | 
|  | io_task_runner_ = java_thread_->task_runner(); | 
|  | break; | 
|  | #endif | 
|  |  | 
|  | #if defined(USE_GLIB) | 
|  | case FdIOCapableMessagePumpType::kDefaultUI: | 
|  | thread_.emplace("UI thread"); | 
|  | thread_->StartWithOptions(Thread::Options(MessagePumpType::UI, 0)); | 
|  | io_task_runner_ = thread_->task_runner(); | 
|  | break; | 
|  | #endif | 
|  | } | 
|  | } | 
|  |  | 
|  | void TearDown() override { | 
|  | thread_.reset(); | 
|  | #if BUILDFLAG(IS_ANDROID) | 
|  | if (java_thread_) { | 
|  | java_thread_->Stop(); | 
|  | java_thread_.reset(); | 
|  | } | 
|  | #endif | 
|  | } | 
|  |  | 
|  | std::unique_ptr<TestFdWatcher> CreateWatcher(); | 
|  |  | 
|  | // This is useful for ensuring that read and write can be observed at the | 
|  | // same time on a socket's peer, since the operations which signal both read | 
|  | // and write availability will happen on the same thread that dispatches | 
|  | // signals. | 
|  | void MakePeerReadableAndWritableFromIOThread(int fd) { | 
|  | RunOnTaskRunner(io_task_runner_, [fd] { | 
|  | WriteToSocket(fd, "x"); | 
|  | while (!ReadFromSocket(fd).empty()) { | 
|  | } | 
|  | }); | 
|  | } | 
|  |  | 
|  | private: | 
|  | test::TaskEnvironment task_environment_; | 
|  | std::optional<Thread> thread_; | 
|  | #if BUILDFLAG(IS_ANDROID) | 
|  | std::optional<android::JavaHandlerThread> java_thread_; | 
|  | #endif | 
|  | scoped_refptr<SequencedTaskRunner> io_task_runner_; | 
|  | }; | 
|  |  | 
|  | class TestFdWatcher : public IOWatcher::FdWatcher { | 
|  | public: | 
|  | explicit TestFdWatcher(scoped_refptr<SequencedTaskRunner> io_task_runner) | 
|  | : io_task_runner_(std::move(io_task_runner)) {} | 
|  |  | 
|  | ~TestFdWatcher() override { Stop(); } | 
|  |  | 
|  | int num_events() { | 
|  | AutoLock lock(lock_); | 
|  | return num_events_; | 
|  | } | 
|  |  | 
|  | void reset_num_events() { | 
|  | AutoLock lock(lock_); | 
|  | num_events_ = 0; | 
|  | } | 
|  |  | 
|  | void set_cancel_on_read() { cancel_on_read_ = true; } | 
|  | void set_cancel_on_write() { cancel_on_write_ = true; } | 
|  |  | 
|  | void Watch(const ScopedFD& fd, | 
|  | IOWatcher::FdWatchDuration duration, | 
|  | IOWatcher::FdWatchMode mode) { | 
|  | RunOnTaskRunner(io_task_runner_, [this, fd = fd.get(), duration, mode] { | 
|  | watch_ = IOWatcher::Get()->WatchFileDescriptor(fd, duration, mode, *this); | 
|  | }); | 
|  | } | 
|  |  | 
|  | void Stop() { | 
|  | RunOnTaskRunner(io_task_runner_, [this] { watch_.reset(); }); | 
|  | } | 
|  |  | 
|  | std::string WaitForNextMessage() { | 
|  | AutoLock lock(lock_); | 
|  | while (messages_.empty()) { | 
|  | messages_available_.Wait(); | 
|  | } | 
|  | std::string next_message = messages_.front(); | 
|  | messages_.pop(); | 
|  | return next_message; | 
|  | } | 
|  |  | 
|  | void WaitForDisconnect() { disconnect_event_.Wait(); } | 
|  |  | 
|  | void WaitForWritable() { writable_event_.Wait(); } | 
|  |  | 
|  | void WaitForReadableOrWritable() { readable_or_writable_event_.Wait(); } | 
|  |  | 
|  | // IOWatcher::FdWatcher: | 
|  | void OnFdReadable(int fd) override { | 
|  | bool did_read_something = false; | 
|  | { | 
|  | AutoLock lock(lock_); | 
|  | ++num_events_; | 
|  | readable_or_writable_event_.Signal(); | 
|  |  | 
|  | for (;;) { | 
|  | std::string message = ReadFromSocket(fd); | 
|  | if (message.empty()) { | 
|  | break; | 
|  | } | 
|  |  | 
|  | did_read_something = true; | 
|  | messages_.push(std::move(message)); | 
|  | messages_available_.Signal(); | 
|  | } | 
|  | } | 
|  |  | 
|  | if (!did_read_something) { | 
|  | disconnect_event_.Signal(); | 
|  | } | 
|  |  | 
|  | if (cancel_on_read_) { | 
|  | watch_.reset(); | 
|  | } | 
|  | } | 
|  |  | 
|  | void OnFdWritable(int fd) override { | 
|  | { | 
|  | AutoLock lock(lock_); | 
|  | ++num_events_; | 
|  | writable_event_.Signal(); | 
|  | readable_or_writable_event_.Signal(); | 
|  | } | 
|  |  | 
|  | if (cancel_on_write_) { | 
|  | watch_.reset(); | 
|  | } | 
|  | } | 
|  |  | 
|  | private: | 
|  | const scoped_refptr<SequencedTaskRunner> io_task_runner_; | 
|  |  | 
|  | // The active watch, started by Watch(). Only one at a time and must be | 
|  | // created and destroyed on `io_task_runner_`. | 
|  | std::unique_ptr<IOWatcher::FdWatch> watch_; | 
|  |  | 
|  | // Signaled when `watch_` observes writability. | 
|  | WaitableEvent writable_event_{WaitableEvent::ResetPolicy::AUTOMATIC}; | 
|  |  | 
|  | // Signaled when `watch_` observes either readability or writability. | 
|  | WaitableEvent readable_or_writable_event_{ | 
|  | WaitableEvent::ResetPolicy::AUTOMATIC}; | 
|  |  | 
|  | // Signaled when `watch_` observes disconnection - i.e., readability when | 
|  | // nothing is available to read. | 
|  | WaitableEvent disconnect_event_; | 
|  |  | 
|  | // If set by a test, observing readability will immediately destroy `watch_`. | 
|  | bool cancel_on_read_ = false; | 
|  |  | 
|  | // If set by a test, observing writability will immediately destroy `watch_`. | 
|  | bool cancel_on_write_ = false; | 
|  |  | 
|  | Lock lock_; | 
|  |  | 
|  | // Message queue accumulated as readability is signaled. | 
|  | ConditionVariable messages_available_{&lock_}; | 
|  | std::queue<std::string> messages_ GUARDED_BY(lock_); | 
|  |  | 
|  | // Counts the number of observed events of any kind. | 
|  | int num_events_ GUARDED_BY(lock_) = 0; | 
|  | }; | 
|  |  | 
|  | std::unique_ptr<TestFdWatcher> IOWatcherFdTest::CreateWatcher() { | 
|  | return std::make_unique<TestFdWatcher>(io_task_runner_); | 
|  | } | 
|  |  | 
|  | TEST_P(IOWatcherFdTest, ReadOnce) { | 
|  | // Test that a one-shot read watch sees a single readable event and no more. | 
|  | auto [a, b] = CreateSocketPair(); | 
|  | auto watcher1 = CreateWatcher(); | 
|  | watcher1->Watch(b, IOWatcher::FdWatchDuration::kOneShot, | 
|  | IOWatcher::FdWatchMode::kRead); | 
|  | WriteToSocket(a.get(), "ping"); | 
|  | EXPECT_EQ("ping", watcher1->WaitForNextMessage()); | 
|  |  | 
|  | auto watcher2 = CreateWatcher(); | 
|  | watcher2->Watch(b, IOWatcher::FdWatchDuration::kOneShot, | 
|  | IOWatcher::FdWatchMode::kRead); | 
|  | WriteToSocket(a.get(), "pong"); | 
|  | EXPECT_EQ("pong", watcher2->WaitForNextMessage()); | 
|  | EXPECT_EQ(1, watcher1->num_events()); | 
|  | } | 
|  |  | 
|  | TEST_P(IOWatcherFdTest, ReadPersistent) { | 
|  | // Tests that a persistent read watch can see multiple events. | 
|  | auto [a, b] = CreateSocketPair(); | 
|  | auto watcher = CreateWatcher(); | 
|  | watcher->Watch(b, IOWatcher::FdWatchDuration::kPersistent, | 
|  | IOWatcher::FdWatchMode::kRead); | 
|  | WriteToSocket(a.get(), "ping"); | 
|  | EXPECT_EQ("ping", watcher->WaitForNextMessage()); | 
|  | WriteToSocket(a.get(), "pong"); | 
|  | EXPECT_EQ("pong", watcher->WaitForNextMessage()); | 
|  | EXPECT_EQ(2, watcher->num_events()); | 
|  | a.reset(); | 
|  | watcher->WaitForDisconnect(); | 
|  | } | 
|  |  | 
|  | TEST_P(IOWatcherFdTest, StopWatch) { | 
|  | // Tests that a stopped watch doesn't continue dispatching events. | 
|  | auto [a, b] = CreateSocketPair(); | 
|  | auto watcher = CreateWatcher(); | 
|  | watcher->Watch(b, IOWatcher::FdWatchDuration::kPersistent, | 
|  | IOWatcher::FdWatchMode::kRead); | 
|  | WriteToSocket(a.get(), "ping"); | 
|  | EXPECT_EQ("ping", watcher->WaitForNextMessage()); | 
|  | WriteToSocket(a.get(), "pong"); | 
|  | EXPECT_EQ("pong", watcher->WaitForNextMessage()); | 
|  | watcher->Stop(); | 
|  | watcher->reset_num_events(); | 
|  |  | 
|  | WriteToSocket(a.get(), "abc"); | 
|  | WriteToSocket(a.get(), "123"); | 
|  | EXPECT_EQ(0, watcher->num_events()); | 
|  | } | 
|  |  | 
|  | TEST_P(IOWatcherFdTest, Write) { | 
|  | // Tests basic one-shot write watching. | 
|  | auto [a, b] = CreateSocketPair(); | 
|  | FillSocket(b.get()); | 
|  | auto watcher = CreateWatcher(); | 
|  | watcher->Watch(b, IOWatcher::FdWatchDuration::kOneShot, | 
|  | IOWatcher::FdWatchMode::kWrite); | 
|  | MakePeerReadableAndWritableFromIOThread(a.get()); | 
|  | watcher->WaitForWritable(); | 
|  | WriteToSocket(b.get(), "x"); | 
|  | } | 
|  |  | 
|  | TEST_P(IOWatcherFdTest, ReadWriteUnifiedOneShot) { | 
|  | // Tests that a one-shot read-write watch will observe at most one event | 
|  | // even if the watched object becomes both readable and writable. | 
|  | auto [a, b] = CreateSocketPair(); | 
|  | FillSocket(b.get()); | 
|  | auto watcher = CreateWatcher(); | 
|  | watcher->Watch(b, IOWatcher::FdWatchDuration::kOneShot, | 
|  | IOWatcher::FdWatchMode::kReadWrite); | 
|  | MakePeerReadableAndWritableFromIOThread(a.get()); | 
|  | watcher->WaitForReadableOrWritable(); | 
|  | EXPECT_EQ(1, watcher->num_events()); | 
|  | } | 
|  |  | 
|  | TEST_P(IOWatcherFdTest, ReadWriteSeparateOneShot) { | 
|  | // Tests that separate one-shot read and write watches can observe the same | 
|  | // descriptor concurrently. | 
|  | auto [a, b] = CreateSocketPair(); | 
|  | FillSocket(b.get()); | 
|  | auto read_watcher = CreateWatcher(); | 
|  | auto write_watcher = CreateWatcher(); | 
|  | read_watcher->Watch(b, IOWatcher::FdWatchDuration::kOneShot, | 
|  | IOWatcher::FdWatchMode::kRead); | 
|  | write_watcher->Watch(b, IOWatcher::FdWatchDuration::kOneShot, | 
|  | IOWatcher::FdWatchMode::kWrite); | 
|  | MakePeerReadableAndWritableFromIOThread(a.get()); | 
|  | EXPECT_EQ("x", read_watcher->WaitForNextMessage()); | 
|  | write_watcher->WaitForWritable(); | 
|  | } | 
|  |  | 
|  | TEST_P(IOWatcherFdTest, CancelDuringRead) { | 
|  | // Tests that the watcher behaves safely when watching both read and write | 
|  | // with a persistent watch which is cancelled while handling a read. | 
|  | auto [a, b] = CreateSocketPair(); | 
|  | FillSocket(b.get()); | 
|  | auto watcher = CreateWatcher(); | 
|  | watcher->set_cancel_on_read(); | 
|  | watcher->Watch(b, IOWatcher::FdWatchDuration::kPersistent, | 
|  | IOWatcher::FdWatchMode::kReadWrite); | 
|  | MakePeerReadableAndWritableFromIOThread(a.get()); | 
|  | EXPECT_EQ("x", watcher->WaitForNextMessage()); | 
|  | EXPECT_LE(watcher->num_events(), 2); | 
|  | } | 
|  |  | 
|  | TEST_P(IOWatcherFdTest, CancelDuringWrite) { | 
|  | // Tests that the watcher behaves safely when watching both read and write | 
|  | // with a persistent watch which is cancelled while handling a write. | 
|  | auto [a, b] = CreateSocketPair(); | 
|  | FillSocket(b.get()); | 
|  | auto watcher = CreateWatcher(); | 
|  | watcher->set_cancel_on_write(); | 
|  | watcher->Watch(b, IOWatcher::FdWatchDuration::kPersistent, | 
|  | IOWatcher::FdWatchMode::kReadWrite); | 
|  | MakePeerReadableAndWritableFromIOThread(a.get()); | 
|  | EXPECT_LE(watcher->num_events(), 2); | 
|  | } | 
|  |  | 
|  | INSTANTIATE_TEST_SUITE_P(, | 
|  | IOWatcherFdTest, | 
|  | testing::Values( | 
|  | #if BUILDFLAG(IS_ANDROID) | 
|  | FdIOCapableMessagePumpType::kAndroid, | 
|  | #endif | 
|  | #if defined(USE_GLIB) | 
|  | FdIOCapableMessagePumpType::kDefaultUI, | 
|  | #endif | 
|  | FdIOCapableMessagePumpType::kDefaultIO)); | 
|  |  | 
|  | }  // namespace | 
|  | }  // namespace base |