blob: bc3214e41d9c8b1f6fcf5271263124b2377fe064 [file] [log] [blame] [edit]
// Copyright 2024 The Chromium Authors
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
#include <fcntl.h>
#include <sys/socket.h>
#include <unistd.h>
#include <array>
#include <memory>
#include <optional>
#include <queue>
#include <string>
#include <string_view>
#include <utility>
#include "base/containers/span.h"
#include "base/files/scoped_file.h"
#include "base/message_loop/io_watcher.h"
#include "base/posix/eintr_wrapper.h"
#include "base/run_loop.h"
#include "base/synchronization/condition_variable.h"
#include "base/synchronization/lock.h"
#include "base/synchronization/waitable_event.h"
#include "base/test/bind.h"
#include "base/test/task_environment.h"
#include "base/threading/thread.h"
#include "testing/gtest/include/gtest/gtest.h"
#if BUILDFLAG(IS_ANDROID)
#include "base/android/java_handler_thread.h"
#endif
namespace base {
namespace {
// TODO(crbug.com/379190028): Introduce new types here as file descriptor
// support is added.
enum class FdIOCapableMessagePumpType {
kDefaultIO,
#if BUILDFLAG(IS_ANDROID)
kAndroid,
#endif
};
std::pair<ScopedFD, ScopedFD> CreateSocketPair() {
int fds[2];
CHECK(socketpair(AF_UNIX, SOCK_STREAM, 0, fds) == 0);
PCHECK(fcntl(fds[0], F_SETFL, O_NONBLOCK) == 0);
PCHECK(fcntl(fds[1], F_SETFL, O_NONBLOCK) == 0);
return {ScopedFD(fds[0]), ScopedFD(fds[1])};
}
void WriteToSocket(int fd, std::string_view msg) {
const ssize_t result = HANDLE_EINTR(write(fd, msg.data(), msg.size()));
CHECK_EQ(result, static_cast<ssize_t>(msg.size()));
}
void FillSocket(int fd) {
const std::array<char, 1024> kJunk = {};
ssize_t result;
do {
result = HANDLE_EINTR(write(fd, kJunk.data(), kJunk.size()));
} while (result > 0);
}
std::string ReadFromSocket(int fd) {
char buffer[256];
const ssize_t result = HANDLE_EINTR(read(fd, buffer, std::size(buffer)));
if (result <= 0) {
return {};
}
const auto contents = span(buffer).first(static_cast<size_t>(result));
return std::string(contents.begin(), contents.end());
}
template <typename Fn>
void RunOnTaskRunner(scoped_refptr<SequencedTaskRunner> task_runner, Fn fn) {
RunLoop loop;
task_runner->PostTask(FROM_HERE,
BindLambdaForTesting([&fn, quit = loop.QuitClosure()] {
fn();
quit.Run();
}));
loop.Run();
}
class TestFdWatcher;
class IOWatcherFdTest
: public testing::Test,
public testing::WithParamInterface<FdIOCapableMessagePumpType> {
public:
void SetUp() override {
switch (GetParam()) {
case FdIOCapableMessagePumpType::kDefaultIO:
thread_.emplace("IO thread");
thread_->StartWithOptions(Thread::Options(MessagePumpType::IO, 0));
io_task_runner_ = thread_->task_runner();
break;
#if BUILDFLAG(IS_ANDROID)
case FdIOCapableMessagePumpType::kAndroid:
java_thread_.emplace("Java thread");
java_thread_->Start();
io_task_runner_ = java_thread_->task_runner();
break;
#endif
}
}
void TearDown() override {
thread_.reset();
#if BUILDFLAG(IS_ANDROID)
if (java_thread_) {
java_thread_->Stop();
java_thread_.reset();
}
#endif
}
std::unique_ptr<TestFdWatcher> CreateWatcher();
// This is useful for ensuring that read and write can be observed at the
// same time on a socket's peer, since the operations which signal both read
// and write availability will happen on the same thread that dispatches
// signals.
void MakePeerReadableAndWritableFromIOThread(int fd) {
RunOnTaskRunner(io_task_runner_, [fd] {
WriteToSocket(fd, "x");
while (!ReadFromSocket(fd).empty()) {
}
});
}
private:
test::TaskEnvironment task_environment_;
std::optional<Thread> thread_;
#if BUILDFLAG(IS_ANDROID)
std::optional<android::JavaHandlerThread> java_thread_;
#endif
scoped_refptr<SequencedTaskRunner> io_task_runner_;
};
class TestFdWatcher : public IOWatcher::FdWatcher {
public:
explicit TestFdWatcher(scoped_refptr<SequencedTaskRunner> io_task_runner)
: io_task_runner_(std::move(io_task_runner)) {}
~TestFdWatcher() override { Stop(); }
int num_events() {
AutoLock lock(lock_);
return num_events_;
}
void reset_num_events() {
AutoLock lock(lock_);
num_events_ = 0;
}
void set_cancel_on_read() { cancel_on_read_ = true; }
void set_cancel_on_write() { cancel_on_write_ = true; }
void Watch(const ScopedFD& fd,
IOWatcher::FdWatchDuration duration,
IOWatcher::FdWatchMode mode) {
RunOnTaskRunner(io_task_runner_, [this, fd = fd.get(), duration, mode] {
watch_ = IOWatcher::Get()->WatchFileDescriptor(fd, duration, mode, *this);
});
}
void Stop() {
RunOnTaskRunner(io_task_runner_, [this] { watch_.reset(); });
}
std::string WaitForNextMessage() {
AutoLock lock(lock_);
while (messages_.empty()) {
messages_available_.Wait();
}
std::string next_message = messages_.front();
messages_.pop();
return next_message;
}
void WaitForDisconnect() { disconnect_event_.Wait(); }
void WaitForWritable() { writable_event_.Wait(); }
void WaitForReadableOrWritable() { readable_or_writable_event_.Wait(); }
// IOWatcher::FdWatcher:
void OnFdReadable(int fd) override {
bool did_read_something = false;
{
AutoLock lock(lock_);
++num_events_;
readable_or_writable_event_.Signal();
for (;;) {
std::string message = ReadFromSocket(fd);
if (message.empty()) {
break;
}
did_read_something = true;
messages_.push(std::move(message));
messages_available_.Signal();
}
}
if (!did_read_something) {
disconnect_event_.Signal();
}
if (cancel_on_read_) {
watch_.reset();
}
}
void OnFdWritable(int fd) override {
{
AutoLock lock(lock_);
++num_events_;
writable_event_.Signal();
readable_or_writable_event_.Signal();
}
if (cancel_on_write_) {
watch_.reset();
}
}
private:
const scoped_refptr<SequencedTaskRunner> io_task_runner_;
// The active watch, started by Watch(). Only one at a time and must be
// created and destroyed on `io_task_runner_`.
std::unique_ptr<IOWatcher::FdWatch> watch_;
// Signaled when `watch_` observes writability.
WaitableEvent writable_event_{WaitableEvent::ResetPolicy::AUTOMATIC};
// Signaled when `watch_` observes either readability or writability.
WaitableEvent readable_or_writable_event_{
WaitableEvent::ResetPolicy::AUTOMATIC};
// Signaled when `watch_` observes disconnection - i.e., readability when
// nothing is available to read.
WaitableEvent disconnect_event_;
// If set by a test, observing readability will immediately destroy `watch_`.
bool cancel_on_read_ = false;
// If set by a test, observing writability will immediately destroy `watch_`.
bool cancel_on_write_ = false;
Lock lock_;
// Message queue accumulated as readability is signaled.
ConditionVariable messages_available_{&lock_};
std::queue<std::string> messages_ GUARDED_BY(lock_);
// Counts the number of observed events of any kind.
int num_events_ GUARDED_BY(lock_) = 0;
};
std::unique_ptr<TestFdWatcher> IOWatcherFdTest::CreateWatcher() {
return std::make_unique<TestFdWatcher>(io_task_runner_);
}
TEST_P(IOWatcherFdTest, ReadOnce) {
// Test that a one-shot read watch sees a single readable event and no more.
auto [a, b] = CreateSocketPair();
auto watcher1 = CreateWatcher();
watcher1->Watch(b, IOWatcher::FdWatchDuration::kOneShot,
IOWatcher::FdWatchMode::kRead);
WriteToSocket(a.get(), "ping");
EXPECT_EQ("ping", watcher1->WaitForNextMessage());
auto watcher2 = CreateWatcher();
watcher2->Watch(b, IOWatcher::FdWatchDuration::kOneShot,
IOWatcher::FdWatchMode::kRead);
WriteToSocket(a.get(), "pong");
EXPECT_EQ("pong", watcher2->WaitForNextMessage());
EXPECT_EQ(1, watcher1->num_events());
}
TEST_P(IOWatcherFdTest, ReadPersistent) {
// Tests that a persistent read watch can see multiple events.
auto [a, b] = CreateSocketPair();
auto watcher = CreateWatcher();
watcher->Watch(b, IOWatcher::FdWatchDuration::kPersistent,
IOWatcher::FdWatchMode::kRead);
WriteToSocket(a.get(), "ping");
EXPECT_EQ("ping", watcher->WaitForNextMessage());
WriteToSocket(a.get(), "pong");
EXPECT_EQ("pong", watcher->WaitForNextMessage());
EXPECT_EQ(2, watcher->num_events());
a.reset();
watcher->WaitForDisconnect();
}
TEST_P(IOWatcherFdTest, StopWatch) {
// Tests that a stopped watch doesn't continue dispatching events.
auto [a, b] = CreateSocketPair();
auto watcher = CreateWatcher();
watcher->Watch(b, IOWatcher::FdWatchDuration::kPersistent,
IOWatcher::FdWatchMode::kRead);
WriteToSocket(a.get(), "ping");
EXPECT_EQ("ping", watcher->WaitForNextMessage());
WriteToSocket(a.get(), "pong");
EXPECT_EQ("pong", watcher->WaitForNextMessage());
watcher->Stop();
watcher->reset_num_events();
WriteToSocket(a.get(), "abc");
WriteToSocket(a.get(), "123");
EXPECT_EQ(0, watcher->num_events());
}
TEST_P(IOWatcherFdTest, Write) {
// Tests basic one-shot write watching.
auto [a, b] = CreateSocketPair();
FillSocket(b.get());
auto watcher = CreateWatcher();
watcher->Watch(b, IOWatcher::FdWatchDuration::kOneShot,
IOWatcher::FdWatchMode::kWrite);
MakePeerReadableAndWritableFromIOThread(a.get());
watcher->WaitForWritable();
WriteToSocket(b.get(), "x");
}
TEST_P(IOWatcherFdTest, ReadWriteUnifiedOneShot) {
// Tests that a one-shot read-write watch will observe at most one event
// even if the watched object becomes both readable and writable.
auto [a, b] = CreateSocketPair();
FillSocket(b.get());
auto watcher = CreateWatcher();
watcher->Watch(b, IOWatcher::FdWatchDuration::kOneShot,
IOWatcher::FdWatchMode::kReadWrite);
MakePeerReadableAndWritableFromIOThread(a.get());
watcher->WaitForReadableOrWritable();
EXPECT_EQ(1, watcher->num_events());
}
TEST_P(IOWatcherFdTest, ReadWriteSeparateOneShot) {
// Tests that separate one-shot read and write watches can observe the same
// descriptor concurrently.
auto [a, b] = CreateSocketPair();
FillSocket(b.get());
auto read_watcher = CreateWatcher();
auto write_watcher = CreateWatcher();
read_watcher->Watch(b, IOWatcher::FdWatchDuration::kOneShot,
IOWatcher::FdWatchMode::kRead);
write_watcher->Watch(b, IOWatcher::FdWatchDuration::kOneShot,
IOWatcher::FdWatchMode::kWrite);
MakePeerReadableAndWritableFromIOThread(a.get());
EXPECT_EQ("x", read_watcher->WaitForNextMessage());
write_watcher->WaitForWritable();
}
TEST_P(IOWatcherFdTest, CancelDuringRead) {
// Tests that the watcher behaves safely when watching both read and write
// with a persistent watch which is cancelled while handling a read.
auto [a, b] = CreateSocketPair();
FillSocket(b.get());
auto watcher = CreateWatcher();
watcher->set_cancel_on_read();
watcher->Watch(b, IOWatcher::FdWatchDuration::kPersistent,
IOWatcher::FdWatchMode::kReadWrite);
MakePeerReadableAndWritableFromIOThread(a.get());
EXPECT_EQ("x", watcher->WaitForNextMessage());
EXPECT_LE(watcher->num_events(), 2);
}
TEST_P(IOWatcherFdTest, CancelDuringWrite) {
// Tests that the watcher behaves safely when watching both read and write
// with a persistent watch which is cancelled while handling a write.
auto [a, b] = CreateSocketPair();
FillSocket(b.get());
auto watcher = CreateWatcher();
watcher->set_cancel_on_write();
watcher->Watch(b, IOWatcher::FdWatchDuration::kPersistent,
IOWatcher::FdWatchMode::kReadWrite);
MakePeerReadableAndWritableFromIOThread(a.get());
EXPECT_LE(watcher->num_events(), 2);
}
INSTANTIATE_TEST_SUITE_P(,
IOWatcherFdTest,
testing::Values(
#if BUILDFLAG(IS_ANDROID)
FdIOCapableMessagePumpType::kAndroid,
#endif
FdIOCapableMessagePumpType::kDefaultIO));
} // namespace
} // namespace base