| // Copyright 2017 The Chromium Authors. All rights reserved. |
| // Use of this source code is governed by a BSD-style license that can be |
| // found in the LICENSE file. |
| |
| #include "mojo/core/channel.h" |
| |
| #include <lib/fdio/limits.h> |
| #include <lib/fdio/util.h> |
| #include <lib/zx/channel.h> |
| #include <lib/zx/handle.h> |
| #include <zircon/processargs.h> |
| #include <zircon/status.h> |
| #include <zircon/syscalls.h> |
| #include <algorithm> |
| |
| #include "base/bind.h" |
| #include "base/containers/circular_deque.h" |
| #include "base/files/scoped_file.h" |
| #include "base/fuchsia/fuchsia_logging.h" |
| #include "base/location.h" |
| #include "base/macros.h" |
| #include "base/memory/ref_counted.h" |
| #include "base/message_loop/message_loop_current.h" |
| #include "base/message_loop/message_pump_for_io.h" |
| #include "base/stl_util.h" |
| #include "base/synchronization/lock.h" |
| #include "base/task_runner.h" |
| #include "mojo/core/platform_handle_in_transit.h" |
| |
| namespace mojo { |
| namespace core { |
| |
| namespace { |
| |
| const size_t kMaxBatchReadCapacity = 256 * 1024; |
| |
| bool UnwrapPlatformHandle(PlatformHandleInTransit handle, |
| Channel::Message::HandleInfoEntry* info_out, |
| std::vector<PlatformHandleInTransit>* handles_out) { |
| DCHECK(handle.handle().is_valid()); |
| |
| if (!handle.handle().is_valid_fd()) { |
| *info_out = {0u, 0u}; |
| handles_out->emplace_back(std::move(handle)); |
| return true; |
| } |
| |
| // Each FDIO file descriptor is implemented using one or more native resources |
| // and can be un-wrapped into a set of |handle| and |info| pairs, with |info| |
| // consisting of an FDIO-defined type & arguments (see zircon/processargs.h). |
| // |
| // We try to transfer the FD, but if that fails (for example if the file has |
| // already been dup()d into another FD) we may need to clone. |
| zx_handle_t handles[FDIO_MAX_HANDLES] = {}; |
| uint32_t info[FDIO_MAX_HANDLES] = {}; |
| zx_status_t result = |
| fdio_transfer_fd(handle.handle().GetFD().get(), 0, handles, info); |
| if (result > 0) { |
| // On success, the fd in |handle| has been transferred and is no longer |
| // valid. Release from the PlatformHandle to avoid close()ing an invalid |
| // an invalid handle. |
| handle.CompleteTransit(); |
| } else if (result == ZX_ERR_UNAVAILABLE) { |
| // No luck, try cloning instead. |
| result = fdio_clone_fd(handle.handle().GetFD().get(), 0, handles, info); |
| } |
| |
| if (result <= 0) { |
| ZX_DLOG(ERROR, result) << "fdio_transfer_fd(" |
| << handle.handle().GetFD().get() << ")"; |
| return false; |
| } |
| DCHECK_LE(result, FDIO_MAX_HANDLES); |
| |
| // We assume here that only the |PA_HND_TYPE| of the |info| really matters, |
| // and that that is the same for all the underlying handles. |
| *info_out = {PA_HND_TYPE(info[0]), result}; |
| for (int i = 0; i < result; ++i) { |
| DCHECK_EQ(PA_HND_TYPE(info[0]), PA_HND_TYPE(info[i])); |
| DCHECK_EQ(0u, PA_HND_SUBTYPE(info[i])); |
| handles_out->emplace_back( |
| PlatformHandleInTransit(PlatformHandle(zx::handle(handles[i])))); |
| } |
| |
| return true; |
| } |
| |
| PlatformHandle WrapPlatformHandles(Channel::Message::HandleInfoEntry info, |
| base::circular_deque<zx::handle>* handles) { |
| PlatformHandle out_handle; |
| if (!info.type) { |
| out_handle = PlatformHandle(std::move(handles->front())); |
| handles->pop_front(); |
| } else { |
| if (info.count > FDIO_MAX_HANDLES) |
| return PlatformHandle(); |
| |
| // Fetch the required number of handles from |handles| and set up type info. |
| zx_handle_t fd_handles[FDIO_MAX_HANDLES] = {}; |
| uint32_t fd_infos[FDIO_MAX_HANDLES] = {}; |
| for (int i = 0; i < info.count; ++i) { |
| fd_handles[i] = (*handles)[i].get(); |
| fd_infos[i] = PA_HND(info.type, 0); |
| } |
| |
| // Try to wrap the handles into an FDIO file descriptor. |
| base::ScopedFD out_fd; |
| zx_status_t result = |
| fdio_create_fd(fd_handles, fd_infos, info.count, out_fd.receive()); |
| if (result != ZX_OK) { |
| ZX_DLOG(ERROR, result) << "fdio_create_fd"; |
| return PlatformHandle(); |
| } |
| |
| // The handles are owned by FDIO now, so |release()| them before removing |
| // the entries from |handles|. |
| for (int i = 0; i < info.count; ++i) { |
| ignore_result(handles->front().release()); |
| handles->pop_front(); |
| } |
| |
| out_handle = PlatformHandle(std::move(out_fd)); |
| } |
| return out_handle; |
| } |
| |
| // A view over a Channel::Message object. The write queue uses these since |
| // large messages may need to be sent in chunks. |
| class MessageView { |
| public: |
| // Owns |message|. |offset| indexes the first unsent byte in the message. |
| MessageView(Channel::MessagePtr message, size_t offset) |
| : message_(std::move(message)), |
| offset_(offset), |
| handles_(message_->TakeHandlesForTransport()) { |
| DCHECK_GT(message_->data_num_bytes(), offset_); |
| } |
| |
| MessageView(MessageView&& other) { *this = std::move(other); } |
| |
| MessageView& operator=(MessageView&& other) { |
| message_ = std::move(other.message_); |
| offset_ = other.offset_; |
| handles_ = std::move(other.handles_); |
| return *this; |
| } |
| |
| ~MessageView() {} |
| |
| const void* data() const { |
| return static_cast<const char*>(message_->data()) + offset_; |
| } |
| |
| size_t data_num_bytes() const { return message_->data_num_bytes() - offset_; } |
| |
| size_t data_offset() const { return offset_; } |
| void advance_data_offset(size_t num_bytes) { |
| DCHECK_GT(message_->data_num_bytes(), offset_ + num_bytes); |
| offset_ += num_bytes; |
| } |
| |
| std::vector<PlatformHandleInTransit> TakeHandles() { |
| if (handles_.empty()) |
| return std::vector<PlatformHandleInTransit>(); |
| |
| // We can only pass Fuchsia handles via IPC, so unwrap any FDIO file- |
| // descriptors in |handles_| into the underlying handles, and serialize the |
| // metadata, if any, into the extra header. |
| auto* handles_info = reinterpret_cast<Channel::Message::HandleInfoEntry*>( |
| message_->mutable_extra_header()); |
| memset(handles_info, 0, message_->extra_header_size()); |
| |
| std::vector<PlatformHandleInTransit> in_handles = std::move(handles_); |
| handles_.reserve(in_handles.size()); |
| for (size_t i = 0; i < in_handles.size(); i++) { |
| if (!UnwrapPlatformHandle(std::move(in_handles[i]), &handles_info[i], |
| &handles_)) |
| return std::vector<PlatformHandleInTransit>(); |
| } |
| return std::move(handles_); |
| } |
| |
| private: |
| Channel::MessagePtr message_; |
| size_t offset_; |
| std::vector<PlatformHandleInTransit> handles_; |
| |
| DISALLOW_COPY_AND_ASSIGN(MessageView); |
| }; |
| |
| class ChannelFuchsia : public Channel, |
| public base::MessageLoopCurrent::DestructionObserver, |
| public base::MessagePumpForIO::ZxHandleWatcher { |
| public: |
| ChannelFuchsia(Delegate* delegate, |
| ConnectionParams connection_params, |
| scoped_refptr<base::TaskRunner> io_task_runner) |
| : Channel(delegate), |
| self_(this), |
| handle_( |
| connection_params.TakeEndpoint().TakePlatformHandle().TakeHandle()), |
| io_task_runner_(io_task_runner) { |
| CHECK(handle_.is_valid()); |
| } |
| |
| void Start() override { |
| if (io_task_runner_->RunsTasksInCurrentSequence()) { |
| StartOnIOThread(); |
| } else { |
| io_task_runner_->PostTask( |
| FROM_HERE, base::BindOnce(&ChannelFuchsia::StartOnIOThread, this)); |
| } |
| } |
| |
| void ShutDownImpl() override { |
| // Always shut down asynchronously when called through the public interface. |
| io_task_runner_->PostTask( |
| FROM_HERE, base::BindOnce(&ChannelFuchsia::ShutDownOnIOThread, this)); |
| } |
| |
| void Write(MessagePtr message) override { |
| bool write_error = false; |
| { |
| base::AutoLock lock(write_lock_); |
| if (reject_writes_) |
| return; |
| if (!WriteNoLock(MessageView(std::move(message), 0))) |
| reject_writes_ = write_error = true; |
| } |
| if (write_error) { |
| // Do not synchronously invoke OnWriteError(). Write() may have been |
| // called by the delegate and we don't want to re-enter it. |
| io_task_runner_->PostTask( |
| FROM_HERE, base::BindOnce(&ChannelFuchsia::OnWriteError, this, |
| Error::kDisconnected)); |
| } |
| } |
| |
| void LeakHandle() override { |
| DCHECK(io_task_runner_->RunsTasksInCurrentSequence()); |
| leak_handle_ = true; |
| } |
| |
| bool GetReadPlatformHandles(const void* payload, |
| size_t payload_size, |
| size_t num_handles, |
| const void* extra_header, |
| size_t extra_header_size, |
| std::vector<PlatformHandle>* handles, |
| bool* deferred) override { |
| DCHECK(io_task_runner_->RunsTasksInCurrentSequence()); |
| if (num_handles > std::numeric_limits<uint16_t>::max()) |
| return false; |
| |
| // Locate the handle info and verify there is enough of it. |
| if (!extra_header) |
| return false; |
| const auto* handles_info = |
| reinterpret_cast<const Channel::Message::HandleInfoEntry*>( |
| extra_header); |
| size_t handles_info_size = sizeof(handles_info[0]) * num_handles; |
| if (handles_info_size > extra_header_size) |
| return false; |
| |
| // Some caller-supplied handles may be FDIO file-descriptors, which were |
| // un-wrapped to more than one native platform resource handle for transfer. |
| // We may therefore need to expect more than |num_handles| handles to have |
| // been accumulated in |incoming_handles_|, based on the handle info. |
| size_t num_raw_handles = 0u; |
| for (size_t i = 0; i < num_handles; ++i) |
| num_raw_handles += handles_info[i].type ? handles_info[i].count : 1; |
| |
| // If there are too few handles then we're not ready yet, so return true |
| // indicating things are OK, but leave |handles| empty. |
| if (incoming_handles_.size() < num_raw_handles) |
| return true; |
| |
| handles->reserve(num_handles); |
| for (size_t i = 0; i < num_handles; ++i) { |
| handles->emplace_back( |
| WrapPlatformHandles(handles_info[i], &incoming_handles_)); |
| } |
| return true; |
| } |
| |
| private: |
| ~ChannelFuchsia() override { DCHECK(!read_watch_); } |
| |
| void StartOnIOThread() { |
| DCHECK(!read_watch_); |
| |
| base::MessageLoopCurrent::Get()->AddDestructionObserver(this); |
| |
| read_watch_.reset( |
| new base::MessagePumpForIO::ZxHandleWatchController(FROM_HERE)); |
| base::MessageLoopCurrentForIO::Get()->WatchZxHandle( |
| handle_.get(), true /* persistent */, |
| ZX_CHANNEL_READABLE | ZX_CHANNEL_PEER_CLOSED, read_watch_.get(), this); |
| } |
| |
| void ShutDownOnIOThread() { |
| base::MessageLoopCurrent::Get()->RemoveDestructionObserver(this); |
| |
| read_watch_.reset(); |
| if (leak_handle_) |
| ignore_result(handle_.release()); |
| handle_.reset(); |
| |
| // May destroy the |this| if it was the last reference. |
| self_ = nullptr; |
| } |
| |
| // base::MessageLoopCurrent::DestructionObserver: |
| void WillDestroyCurrentMessageLoop() override { |
| DCHECK(io_task_runner_->RunsTasksInCurrentSequence()); |
| if (self_) |
| ShutDownOnIOThread(); |
| } |
| |
| // base::MessagePumpForIO::ZxHandleWatcher: |
| void OnZxHandleSignalled(zx_handle_t handle, zx_signals_t signals) override { |
| DCHECK(io_task_runner_->RunsTasksInCurrentSequence()); |
| CHECK_EQ(handle, handle_.get()); |
| DCHECK((ZX_CHANNEL_READABLE | ZX_CHANNEL_PEER_CLOSED) & signals); |
| |
| // We always try to read message(s), even if ZX_CHANNEL_PEER_CLOSED, since |
| // the peer may have closed while messages were still unread, in the pipe. |
| |
| bool validation_error = false; |
| bool read_error = false; |
| size_t next_read_size = 0; |
| size_t buffer_capacity = 0; |
| size_t total_bytes_read = 0; |
| do { |
| buffer_capacity = next_read_size; |
| char* buffer = GetReadBuffer(&buffer_capacity); |
| DCHECK_GT(buffer_capacity, 0u); |
| |
| uint32_t bytes_read = 0; |
| uint32_t handles_read = 0; |
| zx_handle_t handles[ZX_CHANNEL_MAX_MSG_HANDLES] = {}; |
| |
| zx_status_t read_result = |
| handle_.read(0, buffer, buffer_capacity, &bytes_read, handles, |
| base::size(handles), &handles_read); |
| if (read_result == ZX_OK) { |
| for (size_t i = 0; i < handles_read; ++i) { |
| incoming_handles_.emplace_back(handles[i]); |
| } |
| total_bytes_read += bytes_read; |
| if (!OnReadComplete(bytes_read, &next_read_size)) { |
| read_error = true; |
| validation_error = true; |
| break; |
| } |
| } else if (read_result == ZX_ERR_BUFFER_TOO_SMALL) { |
| DCHECK_LE(handles_read, base::size(handles)); |
| next_read_size = bytes_read; |
| } else if (read_result == ZX_ERR_SHOULD_WAIT) { |
| break; |
| } else { |
| ZX_DLOG_IF(ERROR, read_result != ZX_ERR_PEER_CLOSED, read_result) |
| << "zx_channel_read"; |
| read_error = true; |
| break; |
| } |
| } while (total_bytes_read < kMaxBatchReadCapacity && next_read_size > 0); |
| if (read_error) { |
| // Stop receiving read notifications. |
| read_watch_.reset(); |
| if (validation_error) |
| OnError(Error::kReceivedMalformedData); |
| else |
| OnError(Error::kDisconnected); |
| } |
| } |
| |
| // Attempts to write a message directly to the channel. If the full message |
| // cannot be written, it's queued and a wait is initiated to write the message |
| // ASAP on the I/O thread. |
| bool WriteNoLock(MessageView message_view) { |
| uint32_t write_bytes = 0; |
| do { |
| message_view.advance_data_offset(write_bytes); |
| |
| std::vector<PlatformHandleInTransit> outgoing_handles = |
| message_view.TakeHandles(); |
| zx_handle_t handles[ZX_CHANNEL_MAX_MSG_HANDLES] = {}; |
| size_t handles_count = outgoing_handles.size(); |
| |
| DCHECK_LE(handles_count, base::size(handles)); |
| for (size_t i = 0; i < handles_count; ++i) { |
| DCHECK(outgoing_handles[i].handle().is_valid()); |
| handles[i] = outgoing_handles[i].handle().GetHandle().get(); |
| } |
| |
| write_bytes = std::min(message_view.data_num_bytes(), |
| static_cast<size_t>(ZX_CHANNEL_MAX_MSG_BYTES)); |
| zx_status_t result = handle_.write(0, message_view.data(), write_bytes, |
| handles, handles_count); |
| // zx_channel_write() consumes |handles| whether or not it succeeds, so |
| // release() our copies now, to avoid them being double-closed. |
| for (auto& outgoing_handle : outgoing_handles) |
| outgoing_handle.CompleteTransit(); |
| |
| if (result != ZX_OK) { |
| // TODO(fuchsia): Handle ZX_ERR_SHOULD_WAIT flow-control errors, once |
| // the platform starts generating them. See https://crbug.com/754084. |
| ZX_DLOG_IF(ERROR, result != ZX_ERR_PEER_CLOSED, result) |
| << "WriteNoLock(zx_channel_write)"; |
| return false; |
| } |
| |
| } while (write_bytes < message_view.data_num_bytes()); |
| |
| return true; |
| } |
| |
| void OnWriteError(Error error) { |
| DCHECK(io_task_runner_->RunsTasksInCurrentSequence()); |
| DCHECK(reject_writes_); |
| |
| if (error == Error::kDisconnected) { |
| // If we can't write because the pipe is disconnected then continue |
| // reading to fetch any in-flight messages, relying on end-of-stream to |
| // signal the actual disconnection. |
| if (read_watch_) { |
| // TODO: When we add flow-control for writes, we also need to reset the |
| // write-watcher here. |
| return; |
| } |
| } |
| |
| OnError(error); |
| } |
| |
| // Keeps the Channel alive at least until explicit shutdown on the IO thread. |
| scoped_refptr<Channel> self_; |
| |
| zx::channel handle_; |
| scoped_refptr<base::TaskRunner> io_task_runner_; |
| |
| // These members are only used on the IO thread. |
| std::unique_ptr<base::MessagePumpForIO::ZxHandleWatchController> read_watch_; |
| base::circular_deque<zx::handle> incoming_handles_; |
| bool leak_handle_ = false; |
| |
| base::Lock write_lock_; |
| bool reject_writes_ = false; |
| |
| DISALLOW_COPY_AND_ASSIGN(ChannelFuchsia); |
| }; |
| |
| } // namespace |
| |
| // static |
| scoped_refptr<Channel> Channel::Create( |
| Delegate* delegate, |
| ConnectionParams connection_params, |
| scoped_refptr<base::TaskRunner> io_task_runner) { |
| return new ChannelFuchsia(delegate, std::move(connection_params), |
| std::move(io_task_runner)); |
| } |
| |
| } // namespace core |
| } // namespace mojo |