blob: c98ddf033544dd77d1ce1c31dc50421743d727fc [file] [log] [blame]
// Copyright 2022 The Chromium Authors
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
#include <cstdint>
#include <functional>
#include <memory>
#include <thread>
#include <utility>
#include <vector>
#include "reference_drivers/file_descriptor.h"
#include "third_party/abseil-cpp/absl/synchronization/mutex.h"
#include "third_party/abseil-cpp/absl/types/optional.h"
#include "third_party/abseil-cpp/absl/types/span.h"
#include "util/ref_counted.h"
namespace ipcz::reference_drivers {
// A driver transport implementation backed by a Unix domain socket, suitable
// for use in a multiprocess POSIX testing environment.
class SocketTransport : public RefCounted<SocketTransport> {
using Pair = std::pair<Ref<SocketTransport>, Ref<SocketTransport>>;
struct Message {
absl::Span<const uint8_t> data;
absl::Span<FileDescriptor> descriptors;
// A header injected to prefix every message sent through this
// SocketTransport, used to frame each message.
struct Header {
// The number of bytes in the message, including this Header.
uint32_t num_bytes;
// The number of file descriptors in the message.
uint32_t num_descriptors;
explicit SocketTransport(FileDescriptor fd);
SocketTransport(const SocketTransport&) = delete;
SocketTransport& operator=(const SocketTransport&) = delete;
// Creates a new pair of entangled SocketTransport objects. For two transports
// X and Y, a Send(foo) on X will result in an equivalent message arriving on
// Y once Y is activated. The reverse is also true, since transports are
// bidirectional.
static Pair CreatePair();
// Indicates whether this SocketTransport has been activated yet.
bool has_been_activated() const { return has_been_activated_; }
// Spawns an internal I/O thread for this SocketTransport and uses it to
// monitor the underlying socket for incoming messages, errors, and other
// relevant events.
// The transport may invoke `message_handler` or `error_handler` at any time
// from the I/O thread to notify the client about messages or errors. This
// continutes until either an error is encountered or Deactivate() is called.
using MessageHandler = std::function<bool(Message)>;
using ErrorHandler = std::function<void()>;
void Activate(
MessageHandler message_handler = [](Message) { return true; },
ErrorHandler error_handler = [] {});
// Stops monitoring the underlying socket. Deactivation may complete
// asynchronously, and `shutdown_callback` is invoked when complete.
// Invocation may happen before this call returns if the I/O thread has
// already been terminated; otherwise the callback is invoked from the I/O
// thread just before terminating.
// NOTE: If Activate() has been called, this MUST be called before destroying
// the SocketTransport.
void Deactivate(std::function<void()> shutdown_callback);
// Sends the contents of `message` to the SocketTransport's peer,
// asynchronously. May be called from any thread.
// Returns true on success (including cases where the message is queued but
// not yet transmitted), or false on unrecoverable error.
bool Send(Message message);
// Takes ownership of the underlying socket descriptor. This is invalid to
// call on a SocketTransport which has already been activated, and doing so
// results in undefined behavior.
FileDescriptor TakeDescriptor();
friend class RefCounted<SocketTransport>;
// Attempts to send `message` without queueing.
// If `header` is non-empty, its contents are sent just before the contents of
// `message`.
// Returns the total number of bytes successfully sent, including any header
// bytes. If the sum of the size of `header` and `` is returned,
// then the full message was sent. If any smaller value is returned, including
// zero, then the message transmission was partially or fully blocked and the
// remainder will be queued internally by SocketTransport for later
// transmission. If null is returned, an unrecoverable error was encountered.
// This method is invoked by only one thread at a time.
absl::optional<size_t> TrySend(absl::Span<uint8_t> header, Message message);
// Static entry point for the I/O thread.
static void RunIOThreadForTransport(Ref<SocketTransport> transport);
// Runs the I/O loop for this SocketTransport. Called from a dedicated,
// internally managed thread. This method does not return until the underlying
// socket becomes unusable, some other unrecoverable error is encountered, or
// BeginShutdown() is invoked from any other thread.
void RunIOThread();
// Indicates whether there are any outgoing messages queued.
bool IsOutgoingQueueEmpty();
// Ensures that at least `num_bytes` bytes of storage capacity are available
// at the tail end of `data_buffer_`, and returns the span of all available
// storage there. If any data is written into this span by the caller, it must
// be committed with CommitRead() in order to persist it for eventual
// dispatch.
// NOTE: The returned value may be invalidated by any subsequent calls to
// EnsureReadCapacity() or TryDispatchMessages().
absl::Span<uint8_t> EnsureReadCapacity(size_t num_bytes);
// Commits data and file descriptors for subsequent dispatch. `num_bytes` is
// the number of bytes of data to commit starting from the front of the span
// most recently returned by EnsureReadCapacity().
void CommitRead(size_t num_bytes, std::vector<FileDescriptor> descriptors);
// Notifies the transport's client of an unrecoverable error condition. Must
// be called on the I/O thread.
void NotifyError();
// Must be called on the I/O thread any time the socket has
// received new data or file descriptors and committed them via CommitRead().
// This gives the SocketTransport an opportunity to parse any complete
// messages received and dispatch them to its client.
// Returns true if there were no complete messages to dispatch, or if all
// complete messages were dispatched successfully. Returns false if a
// malformed message was encountered or if any message dispatch was rejected
// by the client.
// NOTE: This call invalidates any value previously returned by
// EnsureReadCapacity().
bool TryDispatchMessages();
// Called when the underlying socket may be able to send queued outgoing
// messages again. This may call back into TrySend() to transmit any such
// queued mesages.
void TryFlushingOutgoingQueue();
// Ensures that the I/O loop wakes up for processing.
void WakeIOThread();
// Clears any signal from `signal_receiver_` so future polling on that FD will
// wait for a new signal.
void ClearIOThreadSignal();
// Indicates whether Activate() has been called on this transport yet.
bool has_been_activated_ = false;
// Background I/O thread used to monitor the underlying socket and dispatch
// incoming messages or errors.
absl::Mutex io_thread_mutex_;
std::unique_ptr<std::thread> io_thread_ ABSL_GUARDED_BY(io_thread_mutex_);
// Buffer to accumulate incoming data from the underlying socket. Note that a
// value of 64 kB for this constant was chosen arbitrarily.
static constexpr size_t kDefaultDataBufferSize = 64 * 1024;
std::vector<uint8_t> data_buffer_ =
// A subspan of `data_buffer_` covering all bytes occupied by received data
// which has not yet been dispatched to the client.
absl::Span<uint8_t> occupied_data_;
// Buffer to accumulate incoming file descriptors from the underlying socket.
static constexpr size_t kDefaultDescriptorBufferSize = 4;
std::vector<FileDescriptor> descriptor_buffer_ =
// A subspan of `descriptor_buffer_` covering all elements occupied by
// received file descriptors which have not yet been dispatched to the client.
absl::Span<FileDescriptor> occupied_descriptors_;
// Client handlers for incoming messages or errors, as provided to Activate().
MessageHandler message_handler_;
ErrorHandler error_handler_;
// If a Send() ever fails or only partially completes, SocketTransport copies
// and queues any unsent contents into a DeferredMessage to be transmitted
// ASAP once the underlying socket might no longer reject it.
struct DeferredMessage {
// Constructs a new DeferredMessage from optional header bytes and message
// contents. Data is copied into `data`, where `header` and `message` data
// are concatenated. Descriptors carried by `message` are moved into
// `descriptors`.
DeferredMessage(absl::Span<uint8_t> header, Message message);
DeferredMessage& operator=(DeferredMessage&&);
Message AsMessage();
bool sent_header = false;
std::vector<uint8_t> data;
std::vector<FileDescriptor> descriptors;
// The queue of outgoing messages; used only if a Send() is rejected by the
// underlying socket due to e.g. a full buffer.
absl::Mutex queue_mutex_;
std::vector<DeferredMessage> outgoing_queue_ ABSL_GUARDED_BY(queue_mutex_);
// The underlying socket this object uses for I/O.
FileDescriptor socket_;
// State used to wake the I/O thread for various reasons other than incoming
// messages.
absl::Mutex notify_mutex_;
bool is_io_thread_done_ ABSL_GUARDED_BY(notify_mutex_) = false;
std::function<void()> shutdown_callback_ ABSL_GUARDED_BY(notify_mutex_);
FileDescriptor signal_sender_;
FileDescriptor signal_receiver_;
} // namespace ipcz::reference_drivers