blob: 6031083fcf64e9a47860ac99155fcdd84cd7183c [file] [log] [blame]
// Copyright 2013 The Chromium Authors
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
#include "base/sync_socket.h"
#include <stddef.h>
#include <stdint.h>
#include <array>
#include "base/compiler_specific.h"
#include "base/containers/span.h"
#include "base/functional/bind.h"
#include "base/location.h"
#include "base/memory/raw_ptr.h"
#include "base/synchronization/waitable_event.h"
#include "base/threading/platform_thread.h"
#include "base/threading/simple_thread.h"
#include "base/threading/thread.h"
#include "base/time/time.h"
#include "build/build_config.h"
#include "testing/gtest/include/gtest/gtest.h"
#if BUILDFLAG(IS_POSIX) || BUILDFLAG(IS_FUCHSIA)
#include "base/file_descriptor_posix.h"
#endif
namespace base {
namespace {
constexpr TimeDelta kReceiveTimeout = base::Milliseconds(750);
class HangingReceiveThread : public DelegateSimpleThread::Delegate {
public:
explicit HangingReceiveThread(SyncSocket* socket, bool with_timeout)
: socket_(socket),
thread_(this, "HangingReceiveThread"),
with_timeout_(with_timeout),
started_event_(WaitableEvent::ResetPolicy::MANUAL,
WaitableEvent::InitialState::NOT_SIGNALED),
done_event_(WaitableEvent::ResetPolicy::MANUAL,
WaitableEvent::InitialState::NOT_SIGNALED) {
thread_.Start();
}
HangingReceiveThread(const HangingReceiveThread&) = delete;
HangingReceiveThread& operator=(const HangingReceiveThread&) = delete;
~HangingReceiveThread() override = default;
void Run() override {
int data = 0;
ASSERT_EQ(socket_->Peek(), 0u);
started_event_.Signal();
if (with_timeout_) {
ASSERT_EQ(0u, socket_->ReceiveWithTimeout(byte_span_from_ref(data),
kReceiveTimeout));
} else {
ASSERT_EQ(0u, socket_->Receive(byte_span_from_ref(data)));
}
done_event_.Signal();
}
void Stop() { thread_.Join(); }
WaitableEvent* started_event() { return &started_event_; }
WaitableEvent* done_event() { return &done_event_; }
private:
raw_ptr<SyncSocket> socket_;
DelegateSimpleThread thread_;
bool with_timeout_;
WaitableEvent started_event_;
WaitableEvent done_event_;
};
// Tests sending data between two SyncSockets. Uses ASSERT() and thus will exit
// early upon failure. Callers should use ASSERT_NO_FATAL_FAILURE() if testing
// continues after return.
void SendReceivePeek(SyncSocket* socket_a, SyncSocket* socket_b) {
int received = 0;
const int kSending = 123;
static_assert(sizeof(kSending) == sizeof(received), "invalid data size");
ASSERT_EQ(0u, socket_a->Peek());
ASSERT_EQ(0u, socket_b->Peek());
// Verify |socket_a| can send to |socket_a| and |socket_a| can Receive from
// |socket_a|.
ASSERT_EQ(sizeof(kSending), socket_a->Send(byte_span_from_ref(kSending)));
ASSERT_EQ(sizeof(kSending), socket_b->Peek());
ASSERT_EQ(sizeof(kSending), socket_b->Receive(byte_span_from_ref(received)));
ASSERT_EQ(kSending, received);
ASSERT_EQ(0u, socket_a->Peek());
ASSERT_EQ(0u, socket_b->Peek());
// Now verify the reverse.
received = 0;
ASSERT_EQ(sizeof(kSending), socket_b->Send(byte_span_from_ref(kSending)));
ASSERT_EQ(sizeof(kSending), socket_a->Peek());
ASSERT_EQ(sizeof(kSending), socket_a->Receive(byte_span_from_ref(received)));
ASSERT_EQ(kSending, received);
ASSERT_EQ(0u, socket_a->Peek());
ASSERT_EQ(0u, socket_b->Peek());
socket_a->Close();
socket_b->Close();
}
const char kHelloString[] = "Hello, SyncSocket Client";
const size_t kHelloStringLength = std::size(kHelloString);
// A blocking read operation that will block the thread until it receives
// |buffer|'s length bytes of packets or Shutdown() is called on another thread.
static void BlockingRead(base::SyncSocket* socket,
base::span<uint8_t> buffer,
size_t* received) {
// Notify the parent thread that we're up and running.
socket->Send(base::as_byte_span(kHelloString));
*received = socket->Receive(buffer);
}
} // namespace
class SyncSocketTest : public testing::Test {
public:
void SetUp() override {
ASSERT_TRUE(SyncSocket::CreatePair(&socket_a_, &socket_b_));
}
protected:
SyncSocket socket_a_;
SyncSocket socket_b_;
};
TEST_F(SyncSocketTest, NormalSendReceivePeek) {
SendReceivePeek(&socket_a_, &socket_b_);
}
TEST_F(SyncSocketTest, ClonedSendReceivePeek) {
SyncSocket socket_c(socket_a_.Release());
SyncSocket socket_d(socket_b_.Release());
SendReceivePeek(&socket_c, &socket_d);
}
class CancelableSyncSocketTest : public testing::Test {
public:
void SetUp() override {
ASSERT_TRUE(CancelableSyncSocket::CreatePair(&socket_a_, &socket_b_));
}
protected:
CancelableSyncSocket socket_a_;
CancelableSyncSocket socket_b_;
};
TEST_F(CancelableSyncSocketTest, NormalSendReceivePeek) {
SendReceivePeek(&socket_a_, &socket_b_);
}
TEST_F(CancelableSyncSocketTest, ClonedSendReceivePeek) {
CancelableSyncSocket socket_c(socket_a_.Release());
CancelableSyncSocket socket_d(socket_b_.Release());
SendReceivePeek(&socket_c, &socket_d);
}
// TODO(https://crbug.com/361250560): Flaky on mac.
#if BUILDFLAG(IS_MAC)
#define MAYBE_ShutdownCancelsReceive DISABLED_ShutdownCancelsReceive
#else
#define MAYBE_ShutdownCancelsReceive ShutdownCancelsReceive
#endif
TEST_F(CancelableSyncSocketTest, MAYBE_ShutdownCancelsReceive) {
HangingReceiveThread thread(&socket_b_, /* with_timeout = */ false);
// Wait for the thread to be started. Note that this doesn't guarantee that
// Receive() is called before Shutdown().
thread.started_event()->Wait();
EXPECT_TRUE(socket_b_.Shutdown());
EXPECT_TRUE(thread.done_event()->TimedWait(kReceiveTimeout));
thread.Stop();
}
TEST_F(CancelableSyncSocketTest, ShutdownCancelsReceiveWithTimeout) {
HangingReceiveThread thread(&socket_b_, /* with_timeout = */ true);
// Wait for the thread to be started. Note that this doesn't guarantee that
// Receive() is called before Shutdown().
thread.started_event()->Wait();
EXPECT_TRUE(socket_b_.Shutdown());
EXPECT_TRUE(thread.done_event()->TimedWait(kReceiveTimeout));
thread.Stop();
}
TEST_F(CancelableSyncSocketTest, ReceiveAfterShutdown) {
socket_a_.Shutdown();
int data = 0;
EXPECT_EQ(0u, socket_a_.Receive(byte_span_from_ref(data)));
}
TEST_F(CancelableSyncSocketTest, ReceiveWithTimeoutAfterShutdown) {
socket_a_.Shutdown();
TimeTicks start = TimeTicks::Now();
int data = 0;
EXPECT_EQ(0u, socket_a_.ReceiveWithTimeout(byte_span_from_ref(data),
kReceiveTimeout));
// Ensure the receive didn't just timeout.
EXPECT_LT(TimeTicks::Now() - start, kReceiveTimeout);
}
// Tests that we can safely end a blocking Receive operation on one thread
// from another thread by disconnecting (but not closing) the socket.
TEST_F(SyncSocketTest, DisconnectTest) {
std::array<base::CancelableSyncSocket, 2> pair;
ASSERT_TRUE(base::CancelableSyncSocket::CreatePair(&pair[0], &pair[1]));
base::Thread worker("BlockingThread");
worker.Start();
// Try to do a blocking read from one of the sockets on the worker thread.
char buf[0xff];
size_t received = 1U; // Initialize to an unexpected value.
worker.task_runner()->PostTask(
FROM_HERE, base::BindOnce(&BlockingRead, &pair[0],
base::as_writable_byte_span(buf), &received));
// Wait for the worker thread to say hello.
char hello[kHelloStringLength] = {};
pair[1].Receive(base::as_writable_byte_span(hello));
EXPECT_EQ(UNSAFE_TODO(strcmp(hello, kHelloString)), 0);
// Give the worker a chance to start Receive().
base::PlatformThread::YieldCurrentThread();
// Now shut down the socket that the thread is issuing a blocking read on
// which should cause Receive to return with an error.
pair[0].Shutdown();
worker.Stop();
EXPECT_EQ(0U, received);
}
// Tests that read is a blocking operation.
TEST_F(SyncSocketTest, BlockingReceiveTest) {
std::array<base::CancelableSyncSocket, 2> pair;
ASSERT_TRUE(base::CancelableSyncSocket::CreatePair(&pair[0], &pair[1]));
base::Thread worker("BlockingThread");
worker.Start();
// Try to do a blocking read from one of the sockets on the worker thread.
char buf[kHelloStringLength] = {};
size_t received = 1U; // Initialize to an unexpected value.
worker.task_runner()->PostTask(
FROM_HERE, base::BindOnce(&BlockingRead, &pair[0],
base::as_writable_byte_span(buf), &received));
// Wait for the worker thread to say hello.
char hello[kHelloStringLength] = {};
pair[1].Receive(base::as_writable_byte_span(hello));
EXPECT_EQ(0, UNSAFE_TODO(strcmp(hello, kHelloString)));
// Give the worker a chance to start Receive().
base::PlatformThread::YieldCurrentThread();
// Send a message to the socket on the blocking thead, it should free the
// socket from Receive().
auto bytes_to_send = base::as_byte_span(kHelloString);
pair[1].Send(bytes_to_send);
worker.Stop();
// Verify the socket has received the message.
EXPECT_TRUE(UNSAFE_TODO(strcmp(buf, kHelloString)) == 0);
EXPECT_EQ(received, bytes_to_send.size());
}
// Tests that the write operation is non-blocking and returns immediately
// when there is insufficient space in the socket's buffer.
TEST_F(SyncSocketTest, NonBlockingWriteTest) {
std::array<base::CancelableSyncSocket, 2> pair;
ASSERT_TRUE(base::CancelableSyncSocket::CreatePair(&pair[0], &pair[1]));
// Fill up the buffer for one of the socket, Send() should not block the
// thread even when the buffer is full.
auto bytes_to_send = base::as_byte_span(kHelloString);
while (pair[0].Send(bytes_to_send) != 0) {
}
// Data should be avialble on another socket.
size_t bytes_in_buffer = pair[1].Peek();
EXPECT_NE(bytes_in_buffer, 0U);
// No more data can be written to the buffer since socket has been full,
// verify that the amount of avialble data on another socket is unchanged.
EXPECT_EQ(pair[0].Send(bytes_to_send), 0U);
EXPECT_EQ(bytes_in_buffer, pair[1].Peek());
// Read from another socket to free some space for a new write.
char hello[kHelloStringLength] = {};
pair[1].Receive(base::as_writable_byte_span(hello));
// Should be able to write more data to the buffer now.
EXPECT_EQ(pair[0].Send(bytes_to_send), bytes_to_send.size());
}
} // namespace base