|  | // Copyright 2013 The Chromium Authors | 
|  | // Use of this source code is governed by a BSD-style license that can be | 
|  | // found in the LICENSE file. | 
|  |  | 
|  | #include "base/sync_socket.h" | 
|  |  | 
|  | #include <stddef.h> | 
|  | #include <stdint.h> | 
|  |  | 
|  | #include <array> | 
|  |  | 
|  |  | 
|  | #include "base/compiler_specific.h" | 
|  | #include "base/containers/span.h" | 
|  | #include "base/functional/bind.h" | 
|  | #include "base/location.h" | 
|  | #include "base/memory/raw_ptr.h" | 
|  | #include "base/synchronization/waitable_event.h" | 
|  | #include "base/threading/platform_thread.h" | 
|  | #include "base/threading/simple_thread.h" | 
|  | #include "base/threading/thread.h" | 
|  | #include "base/time/time.h" | 
|  | #include "build/build_config.h" | 
|  | #include "testing/gtest/include/gtest/gtest.h" | 
|  |  | 
|  | #if BUILDFLAG(IS_POSIX) || BUILDFLAG(IS_FUCHSIA) | 
|  | #include "base/file_descriptor_posix.h" | 
|  | #endif | 
|  |  | 
|  | namespace base { | 
|  |  | 
|  | namespace { | 
|  |  | 
|  | constexpr TimeDelta kReceiveTimeout = base::Milliseconds(750); | 
|  |  | 
|  | class HangingReceiveThread : public DelegateSimpleThread::Delegate { | 
|  | public: | 
|  | explicit HangingReceiveThread(SyncSocket* socket, bool with_timeout) | 
|  | : socket_(socket), | 
|  | thread_(this, "HangingReceiveThread"), | 
|  | with_timeout_(with_timeout), | 
|  | started_event_(WaitableEvent::ResetPolicy::MANUAL, | 
|  | WaitableEvent::InitialState::NOT_SIGNALED), | 
|  | done_event_(WaitableEvent::ResetPolicy::MANUAL, | 
|  | WaitableEvent::InitialState::NOT_SIGNALED) { | 
|  | thread_.Start(); | 
|  | } | 
|  |  | 
|  | HangingReceiveThread(const HangingReceiveThread&) = delete; | 
|  | HangingReceiveThread& operator=(const HangingReceiveThread&) = delete; | 
|  | ~HangingReceiveThread() override = default; | 
|  |  | 
|  | void Run() override { | 
|  | int data = 0; | 
|  | ASSERT_EQ(socket_->Peek(), 0u); | 
|  |  | 
|  | started_event_.Signal(); | 
|  |  | 
|  | if (with_timeout_) { | 
|  | ASSERT_EQ(0u, socket_->ReceiveWithTimeout(byte_span_from_ref(data), | 
|  | kReceiveTimeout)); | 
|  | } else { | 
|  | ASSERT_EQ(0u, socket_->Receive(byte_span_from_ref(data))); | 
|  | } | 
|  |  | 
|  | done_event_.Signal(); | 
|  | } | 
|  |  | 
|  | void Stop() { thread_.Join(); } | 
|  |  | 
|  | WaitableEvent* started_event() { return &started_event_; } | 
|  | WaitableEvent* done_event() { return &done_event_; } | 
|  |  | 
|  | private: | 
|  | raw_ptr<SyncSocket> socket_; | 
|  | DelegateSimpleThread thread_; | 
|  | bool with_timeout_; | 
|  | WaitableEvent started_event_; | 
|  | WaitableEvent done_event_; | 
|  | }; | 
|  |  | 
|  | // Tests sending data between two SyncSockets. Uses ASSERT() and thus will exit | 
|  | // early upon failure.  Callers should use ASSERT_NO_FATAL_FAILURE() if testing | 
|  | // continues after return. | 
|  | void SendReceivePeek(SyncSocket* socket_a, SyncSocket* socket_b) { | 
|  | int received = 0; | 
|  | const int kSending = 123; | 
|  | static_assert(sizeof(kSending) == sizeof(received), "invalid data size"); | 
|  |  | 
|  | ASSERT_EQ(0u, socket_a->Peek()); | 
|  | ASSERT_EQ(0u, socket_b->Peek()); | 
|  |  | 
|  | // Verify |socket_a| can send to |socket_a| and |socket_a| can Receive from | 
|  | // |socket_a|. | 
|  | ASSERT_EQ(sizeof(kSending), socket_a->Send(byte_span_from_ref(kSending))); | 
|  | ASSERT_EQ(sizeof(kSending), socket_b->Peek()); | 
|  | ASSERT_EQ(sizeof(kSending), socket_b->Receive(byte_span_from_ref(received))); | 
|  | ASSERT_EQ(kSending, received); | 
|  |  | 
|  | ASSERT_EQ(0u, socket_a->Peek()); | 
|  | ASSERT_EQ(0u, socket_b->Peek()); | 
|  |  | 
|  | // Now verify the reverse. | 
|  | received = 0; | 
|  | ASSERT_EQ(sizeof(kSending), socket_b->Send(byte_span_from_ref(kSending))); | 
|  | ASSERT_EQ(sizeof(kSending), socket_a->Peek()); | 
|  | ASSERT_EQ(sizeof(kSending), socket_a->Receive(byte_span_from_ref(received))); | 
|  | ASSERT_EQ(kSending, received); | 
|  |  | 
|  | ASSERT_EQ(0u, socket_a->Peek()); | 
|  | ASSERT_EQ(0u, socket_b->Peek()); | 
|  |  | 
|  | socket_a->Close(); | 
|  | socket_b->Close(); | 
|  | } | 
|  |  | 
|  | const char kHelloString[] = "Hello, SyncSocket Client"; | 
|  | const size_t kHelloStringLength = std::size(kHelloString); | 
|  |  | 
|  | // A blocking read operation that will block the thread until it receives | 
|  | // |buffer|'s length bytes of packets or Shutdown() is called on another thread. | 
|  | static void BlockingRead(base::SyncSocket* socket, | 
|  | base::span<uint8_t> buffer, | 
|  | size_t* received) { | 
|  | // Notify the parent thread that we're up and running. | 
|  | socket->Send(base::as_byte_span(kHelloString)); | 
|  | *received = socket->Receive(buffer); | 
|  | } | 
|  |  | 
|  |  | 
|  | }  // namespace | 
|  |  | 
|  | class SyncSocketTest : public testing::Test { | 
|  | public: | 
|  | void SetUp() override { | 
|  | ASSERT_TRUE(SyncSocket::CreatePair(&socket_a_, &socket_b_)); | 
|  | } | 
|  |  | 
|  | protected: | 
|  | SyncSocket socket_a_; | 
|  | SyncSocket socket_b_; | 
|  | }; | 
|  |  | 
|  | TEST_F(SyncSocketTest, NormalSendReceivePeek) { | 
|  | SendReceivePeek(&socket_a_, &socket_b_); | 
|  | } | 
|  |  | 
|  | TEST_F(SyncSocketTest, ClonedSendReceivePeek) { | 
|  | SyncSocket socket_c(socket_a_.Release()); | 
|  | SyncSocket socket_d(socket_b_.Release()); | 
|  | SendReceivePeek(&socket_c, &socket_d); | 
|  | } | 
|  |  | 
|  | class CancelableSyncSocketTest : public testing::Test { | 
|  | public: | 
|  | void SetUp() override { | 
|  | ASSERT_TRUE(CancelableSyncSocket::CreatePair(&socket_a_, &socket_b_)); | 
|  | } | 
|  |  | 
|  | protected: | 
|  | CancelableSyncSocket socket_a_; | 
|  | CancelableSyncSocket socket_b_; | 
|  | }; | 
|  |  | 
|  | TEST_F(CancelableSyncSocketTest, NormalSendReceivePeek) { | 
|  | SendReceivePeek(&socket_a_, &socket_b_); | 
|  | } | 
|  |  | 
|  | TEST_F(CancelableSyncSocketTest, ClonedSendReceivePeek) { | 
|  | CancelableSyncSocket socket_c(socket_a_.Release()); | 
|  | CancelableSyncSocket socket_d(socket_b_.Release()); | 
|  | SendReceivePeek(&socket_c, &socket_d); | 
|  | } | 
|  |  | 
|  | // TODO(https://crbug.com/361250560): Flaky on mac. | 
|  | #if BUILDFLAG(IS_MAC) | 
|  | #define MAYBE_ShutdownCancelsReceive DISABLED_ShutdownCancelsReceive | 
|  | #else | 
|  | #define MAYBE_ShutdownCancelsReceive ShutdownCancelsReceive | 
|  | #endif | 
|  | TEST_F(CancelableSyncSocketTest, MAYBE_ShutdownCancelsReceive) { | 
|  | HangingReceiveThread thread(&socket_b_, /* with_timeout = */ false); | 
|  |  | 
|  | // Wait for the thread to be started. Note that this doesn't guarantee that | 
|  | // Receive() is called before Shutdown(). | 
|  | thread.started_event()->Wait(); | 
|  |  | 
|  | EXPECT_TRUE(socket_b_.Shutdown()); | 
|  | EXPECT_TRUE(thread.done_event()->TimedWait(kReceiveTimeout)); | 
|  |  | 
|  | thread.Stop(); | 
|  | } | 
|  |  | 
|  | TEST_F(CancelableSyncSocketTest, ShutdownCancelsReceiveWithTimeout) { | 
|  | HangingReceiveThread thread(&socket_b_, /* with_timeout = */ true); | 
|  |  | 
|  | // Wait for the thread to be started. Note that this doesn't guarantee that | 
|  | // Receive() is called before Shutdown(). | 
|  | thread.started_event()->Wait(); | 
|  |  | 
|  | EXPECT_TRUE(socket_b_.Shutdown()); | 
|  | EXPECT_TRUE(thread.done_event()->TimedWait(kReceiveTimeout)); | 
|  |  | 
|  | thread.Stop(); | 
|  | } | 
|  |  | 
|  | TEST_F(CancelableSyncSocketTest, ReceiveAfterShutdown) { | 
|  | socket_a_.Shutdown(); | 
|  | int data = 0; | 
|  | EXPECT_EQ(0u, socket_a_.Receive(byte_span_from_ref(data))); | 
|  | } | 
|  |  | 
|  | TEST_F(CancelableSyncSocketTest, ReceiveWithTimeoutAfterShutdown) { | 
|  | socket_a_.Shutdown(); | 
|  | TimeTicks start = TimeTicks::Now(); | 
|  | int data = 0; | 
|  | EXPECT_EQ(0u, socket_a_.ReceiveWithTimeout(byte_span_from_ref(data), | 
|  | kReceiveTimeout)); | 
|  |  | 
|  | // Ensure the receive didn't just timeout. | 
|  | EXPECT_LT(TimeTicks::Now() - start, kReceiveTimeout); | 
|  | } | 
|  |  | 
|  | // Tests that we can safely end a blocking Receive operation on one thread | 
|  | // from another thread by disconnecting (but not closing) the socket. | 
|  | TEST_F(SyncSocketTest, DisconnectTest) { | 
|  | std::array<base::CancelableSyncSocket, 2> pair; | 
|  | ASSERT_TRUE(base::CancelableSyncSocket::CreatePair(&pair[0], &pair[1])); | 
|  |  | 
|  | base::Thread worker("BlockingThread"); | 
|  | worker.Start(); | 
|  |  | 
|  | // Try to do a blocking read from one of the sockets on the worker thread. | 
|  | char buf[0xff]; | 
|  | size_t received = 1U;  // Initialize to an unexpected value. | 
|  | worker.task_runner()->PostTask( | 
|  | FROM_HERE, base::BindOnce(&BlockingRead, &pair[0], | 
|  | base::as_writable_byte_span(buf), &received)); | 
|  |  | 
|  | // Wait for the worker thread to say hello. | 
|  | char hello[kHelloStringLength] = {}; | 
|  | pair[1].Receive(base::as_writable_byte_span(hello)); | 
|  | EXPECT_EQ(UNSAFE_TODO(strcmp(hello, kHelloString)), 0); | 
|  | // Give the worker a chance to start Receive(). | 
|  | base::PlatformThread::YieldCurrentThread(); | 
|  |  | 
|  | // Now shut down the socket that the thread is issuing a blocking read on | 
|  | // which should cause Receive to return with an error. | 
|  | pair[0].Shutdown(); | 
|  |  | 
|  | worker.Stop(); | 
|  |  | 
|  | EXPECT_EQ(0U, received); | 
|  | } | 
|  |  | 
|  | // Tests that read is a blocking operation. | 
|  | TEST_F(SyncSocketTest, BlockingReceiveTest) { | 
|  | std::array<base::CancelableSyncSocket, 2> pair; | 
|  | ASSERT_TRUE(base::CancelableSyncSocket::CreatePair(&pair[0], &pair[1])); | 
|  |  | 
|  | base::Thread worker("BlockingThread"); | 
|  | worker.Start(); | 
|  |  | 
|  | // Try to do a blocking read from one of the sockets on the worker thread. | 
|  | char buf[kHelloStringLength] = {}; | 
|  | size_t received = 1U;  // Initialize to an unexpected value. | 
|  | worker.task_runner()->PostTask( | 
|  | FROM_HERE, base::BindOnce(&BlockingRead, &pair[0], | 
|  | base::as_writable_byte_span(buf), &received)); | 
|  |  | 
|  | // Wait for the worker thread to say hello. | 
|  | char hello[kHelloStringLength] = {}; | 
|  | pair[1].Receive(base::as_writable_byte_span(hello)); | 
|  | EXPECT_EQ(0, UNSAFE_TODO(strcmp(hello, kHelloString))); | 
|  | // Give the worker a chance to start Receive(). | 
|  | base::PlatformThread::YieldCurrentThread(); | 
|  |  | 
|  | // Send a message to the socket on the blocking thead, it should free the | 
|  | // socket from Receive(). | 
|  | auto bytes_to_send = base::as_byte_span(kHelloString); | 
|  | pair[1].Send(bytes_to_send); | 
|  | worker.Stop(); | 
|  |  | 
|  | // Verify the socket has received the message. | 
|  | EXPECT_TRUE(UNSAFE_TODO(strcmp(buf, kHelloString)) == 0); | 
|  | EXPECT_EQ(received, bytes_to_send.size()); | 
|  | } | 
|  |  | 
|  | // Tests that the write operation is non-blocking and returns immediately | 
|  | // when there is insufficient space in the socket's buffer. | 
|  | TEST_F(SyncSocketTest, NonBlockingWriteTest) { | 
|  | std::array<base::CancelableSyncSocket, 2> pair; | 
|  | ASSERT_TRUE(base::CancelableSyncSocket::CreatePair(&pair[0], &pair[1])); | 
|  |  | 
|  | // Fill up the buffer for one of the socket, Send() should not block the | 
|  | // thread even when the buffer is full. | 
|  | auto bytes_to_send = base::as_byte_span(kHelloString); | 
|  | while (pair[0].Send(bytes_to_send) != 0) { | 
|  | } | 
|  |  | 
|  | // Data should be avialble on another socket. | 
|  | size_t bytes_in_buffer = pair[1].Peek(); | 
|  | EXPECT_NE(bytes_in_buffer, 0U); | 
|  |  | 
|  | // No more data can be written to the buffer since socket has been full, | 
|  | // verify that the amount of avialble data on another socket is unchanged. | 
|  | EXPECT_EQ(pair[0].Send(bytes_to_send), 0U); | 
|  | EXPECT_EQ(bytes_in_buffer, pair[1].Peek()); | 
|  |  | 
|  | // Read from another socket to free some space for a new write. | 
|  | char hello[kHelloStringLength] = {}; | 
|  | pair[1].Receive(base::as_writable_byte_span(hello)); | 
|  |  | 
|  | // Should be able to write more data to the buffer now. | 
|  | EXPECT_EQ(pair[0].Send(bytes_to_send), bytes_to_send.size()); | 
|  | } | 
|  |  | 
|  | }  // namespace base |