| // Copyright 2015 The Chromium Authors. All rights reserved. |
| // Use of this source code is governed by a BSD-style license that can be |
| // found in the LICENSE file. |
| |
| #include "mojo/edk/system/message_pipe_dispatcher.h" |
| |
| #include <stddef.h> |
| #include <stdint.h> |
| |
| #include <utility> |
| |
| #include "base/bind.h" |
| #include "base/debug/stack_trace.h" |
| #include "base/logging.h" |
| #include "base/message_loop/message_loop.h" |
| #include "mojo/edk/embedder/embedder_internal.h" |
| #include "mojo/edk/embedder/platform_handle_utils.h" |
| #include "mojo/edk/embedder/platform_shared_buffer.h" |
| #include "mojo/edk/embedder/platform_support.h" |
| #include "mojo/edk/system/broker.h" |
| #include "mojo/edk/system/configuration.h" |
| #include "mojo/edk/system/message_in_transit.h" |
| #include "mojo/edk/system/options_validation.h" |
| #include "mojo/edk/system/transport_data.h" |
| |
| namespace mojo { |
| namespace edk { |
| |
| namespace { |
| |
| const size_t kInvalidMessagePipeHandleIndex = static_cast<size_t>(-1); |
| |
| struct MOJO_ALIGNAS(8) SerializedMessagePipeHandleDispatcher { |
| bool transferable; |
| bool write_error; |
| uint64_t pipe_id; // If transferable is false. |
| // The following members are only set if transferable is true. |
| // Could be |kInvalidMessagePipeHandleIndex| if the other endpoint of the MP |
| // was closed. |
| size_t platform_handle_index; |
| |
| size_t shared_memory_handle_index; // (Or |kInvalidMessagePipeHandleIndex|.) |
| uint32_t shared_memory_size; |
| |
| size_t serialized_read_buffer_size; |
| size_t serialized_write_buffer_size; |
| size_t serialized_message_queue_size; |
| |
| // These are the FDs required as part of serializing channel_ and |
| // message_queue_. This is only used on POSIX. |
| size_t serialized_fds_index; // (Or |kInvalidMessagePipeHandleIndex|.) |
| size_t serialized_read_fds_length; |
| size_t serialized_write_fds_length; |
| size_t serialized_message_fds_length; |
| }; |
| |
| char* SerializeBuffer(char* start, std::vector<char>* buffer) { |
| if (buffer->size()) |
| memcpy(start, &(*buffer)[0], buffer->size()); |
| return start + buffer->size(); |
| } |
| |
| bool GetHandle(size_t index, |
| PlatformHandleVector* platform_handles, |
| ScopedPlatformHandle* handle) { |
| if (index == kInvalidMessagePipeHandleIndex) |
| return true; |
| |
| if (!platform_handles || index >= platform_handles->size()) { |
| LOG(ERROR) |
| << "Invalid serialized message pipe dispatcher (missing handles)"; |
| return false; |
| } |
| |
| // We take ownership of the handle, so we have to invalidate the one in |
| // |platform_handles|. |
| handle->reset((*platform_handles)[index]); |
| (*platform_handles)[index] = PlatformHandle(); |
| return true; |
| } |
| |
| #if defined(OS_POSIX) |
| void ClosePlatformHandles(std::vector<int>* fds) { |
| for (size_t i = 0; i < fds->size(); ++i) |
| PlatformHandle((*fds)[i]).CloseIfNecessary(); |
| } |
| #endif |
| |
| } // namespace |
| |
| // MessagePipeDispatcher ------------------------------------------------------- |
| |
| const MojoCreateMessagePipeOptions |
| MessagePipeDispatcher::kDefaultCreateOptions = { |
| static_cast<uint32_t>(sizeof(MojoCreateMessagePipeOptions)), |
| MOJO_CREATE_MESSAGE_PIPE_OPTIONS_FLAG_NONE}; |
| |
| MojoResult MessagePipeDispatcher::ValidateCreateOptions( |
| const MojoCreateMessagePipeOptions* in_options, |
| MojoCreateMessagePipeOptions* out_options) { |
| const MojoCreateMessagePipeOptionsFlags kKnownFlags = |
| MOJO_CREATE_MESSAGE_PIPE_OPTIONS_FLAG_NONE | |
| MOJO_CREATE_MESSAGE_PIPE_OPTIONS_FLAG_TRANSFERABLE; |
| |
| *out_options = kDefaultCreateOptions; |
| if (!in_options) |
| return MOJO_RESULT_OK; |
| |
| UserOptionsReader<MojoCreateMessagePipeOptions> reader(in_options); |
| if (!reader.is_valid()) |
| return MOJO_RESULT_INVALID_ARGUMENT; |
| |
| if (!OPTIONS_STRUCT_HAS_MEMBER(MojoCreateMessagePipeOptions, flags, reader)) |
| return MOJO_RESULT_OK; |
| if ((reader.options().flags & ~kKnownFlags)) |
| return MOJO_RESULT_UNIMPLEMENTED; |
| out_options->flags = reader.options().flags; |
| |
| // Checks for fields beyond |flags|: |
| |
| // (Nothing here yet.) |
| |
| return MOJO_RESULT_OK; |
| } |
| |
| void MessagePipeDispatcher::Init( |
| ScopedPlatformHandle message_pipe, |
| char* serialized_read_buffer, size_t serialized_read_buffer_size, |
| char* serialized_write_buffer, size_t serialized_write_buffer_size, |
| std::vector<int>* serialized_read_fds, |
| std::vector<int>* serialized_write_fds) { |
| CHECK(transferable_); |
| if (message_pipe.get().is_valid()) { |
| channel_ = RawChannel::Create(std::move(message_pipe)); |
| |
| // TODO(jam): It's probably cleaner to pass this in Init call. |
| channel_->SetSerializedData( |
| serialized_read_buffer, serialized_read_buffer_size, |
| serialized_write_buffer, serialized_write_buffer_size, |
| serialized_read_fds, serialized_write_fds); |
| internal::g_io_thread_task_runner->PostTask( |
| FROM_HERE, base::Bind(&MessagePipeDispatcher::InitOnIO, this)); |
| } |
| } |
| |
| void MessagePipeDispatcher::InitNonTransferable(uint64_t pipe_id) { |
| CHECK(!transferable_); |
| pipe_id_ = pipe_id; |
| } |
| |
| void MessagePipeDispatcher::InitOnIO() { |
| base::AutoLock locker(lock()); |
| CHECK(transferable_); |
| calling_init_ = true; |
| if (channel_) |
| channel_->Init(this); |
| calling_init_ = false; |
| } |
| |
| void MessagePipeDispatcher::CloseOnIO() { |
| base::AutoLock locker(lock()); |
| Release(); // To match CloseImplNoLock. |
| if (transferable_) { |
| if (channel_) { |
| channel_->Shutdown(); |
| channel_ = nullptr; |
| } |
| } else { |
| if (non_transferable_state_ == CONNECT_CALLED || |
| non_transferable_state_ == WAITING_FOR_READ_OR_WRITE) { |
| if (non_transferable_state_ == WAITING_FOR_READ_OR_WRITE) |
| RequestNontransferableChannel(); |
| |
| // We can't cancel the pending request yet, since the other side of the |
| // message pipe would want to get pending outgoing messages (if any) or |
| // at least know that this end was closed. So keep this object alive until |
| // then. |
| non_transferable_state_ = WAITING_FOR_CONNECT_TO_CLOSE; |
| AddRef(); |
| } else if (non_transferable_state_ == CONNECTED) { |
| internal::g_broker->CloseMessagePipe(pipe_id_, this); |
| non_transferable_state_ = CLOSED; |
| channel_ = nullptr; |
| } |
| } |
| } |
| |
| Dispatcher::Type MessagePipeDispatcher::GetType() const { |
| return Type::MESSAGE_PIPE; |
| } |
| |
| void MessagePipeDispatcher::GotNonTransferableChannel(RawChannel* channel) { |
| base::AutoLock locker(lock()); |
| channel_ = channel; |
| while (!non_transferable_outgoing_message_queue_.IsEmpty()) { |
| channel_->WriteMessage( |
| non_transferable_outgoing_message_queue_.GetMessage()); |
| } |
| |
| if (non_transferable_state_ == WAITING_FOR_CONNECT_TO_CLOSE) { |
| // We kept this object alive until it's connected, we can release it now. |
| internal::g_broker->CloseMessagePipe(pipe_id_, this); |
| non_transferable_state_ = CLOSED; |
| channel_ = nullptr; |
| base::MessageLoop::current()->ReleaseSoon(FROM_HERE, this); |
| } else { |
| non_transferable_state_ = CONNECTED; |
| } |
| } |
| |
| #if defined(OS_WIN) |
| // TODO(jam): this is copied from RawChannelWin till I figure out what's the |
| // best way we want to share this. |
| // Since this is used for serialization of messages read/written to a MP that |
| // aren't consumed by Mojo primitives yet, there could be an unbounded number of |
| // them when a MP is being sent. As a result, even for POSIX we will probably |
| // want to send the handles to the shell process and exchange them for tokens |
| // (since we can be sure that the shell will respond to our IPCs, compared to |
| // the other end where we're sending the MP to, which may not be reading...). |
| ScopedPlatformHandleVectorPtr GetReadPlatformHandles( |
| size_t num_platform_handles, |
| const void* platform_handle_table) { |
| ScopedPlatformHandleVectorPtr rv(new PlatformHandleVector()); |
| rv->resize(num_platform_handles); |
| |
| const uint64_t* tokens = |
| static_cast<const uint64_t*>(platform_handle_table); |
| internal::g_broker->TokenToHandle(tokens, num_platform_handles, &rv->at(0)); |
| return rv.Pass(); |
| } |
| #endif |
| |
| scoped_refptr<MessagePipeDispatcher> MessagePipeDispatcher::Deserialize( |
| const void* source, |
| size_t size, |
| PlatformHandleVector* platform_handles) { |
| if (size != sizeof(SerializedMessagePipeHandleDispatcher)) { |
| LOG(ERROR) << "Invalid serialized message pipe dispatcher (bad size)"; |
| return nullptr; |
| } |
| |
| const SerializedMessagePipeHandleDispatcher* serialization = |
| static_cast<const SerializedMessagePipeHandleDispatcher*>(source); |
| |
| scoped_refptr<MessagePipeDispatcher> rv( |
| new MessagePipeDispatcher(serialization->transferable)); |
| if (!rv->transferable_) { |
| rv->InitNonTransferable(serialization->pipe_id); |
| return rv; |
| } |
| |
| if (serialization->shared_memory_size != |
| (serialization->serialized_read_buffer_size + |
| serialization->serialized_write_buffer_size + |
| serialization->serialized_message_queue_size)) { |
| LOG(ERROR) << "Invalid serialized message pipe dispatcher (bad struct)"; |
| return nullptr; |
| } |
| |
| ScopedPlatformHandle platform_handle, shared_memory_handle; |
| if (!GetHandle(serialization->platform_handle_index, |
| platform_handles, &platform_handle) || |
| !GetHandle(serialization->shared_memory_handle_index, |
| platform_handles, &shared_memory_handle)) { |
| return nullptr; |
| } |
| |
| char* serialized_read_buffer = nullptr; |
| size_t serialized_read_buffer_size = 0; |
| char* serialized_write_buffer = nullptr; |
| size_t serialized_write_buffer_size = 0; |
| char* message_queue_data = nullptr; |
| size_t message_queue_size = 0; |
| scoped_refptr<PlatformSharedBuffer> shared_buffer; |
| scoped_ptr<PlatformSharedBufferMapping> mapping; |
| if (shared_memory_handle.is_valid()) { |
| shared_buffer = internal::g_platform_support->CreateSharedBufferFromHandle( |
| serialization->shared_memory_size, std::move(shared_memory_handle)); |
| mapping = shared_buffer->Map(0, serialization->shared_memory_size); |
| char* buffer = static_cast<char*>(mapping->GetBase()); |
| if (serialization->serialized_read_buffer_size) { |
| serialized_read_buffer = buffer; |
| serialized_read_buffer_size = serialization->serialized_read_buffer_size; |
| buffer += serialized_read_buffer_size; |
| } |
| if (serialization->serialized_write_buffer_size) { |
| serialized_write_buffer = buffer; |
| serialized_write_buffer_size = |
| serialization->serialized_write_buffer_size; |
| buffer += serialized_write_buffer_size; |
| } |
| if (serialization->serialized_message_queue_size) { |
| message_queue_data = buffer; |
| message_queue_size = serialization->serialized_message_queue_size; |
| buffer += message_queue_size; |
| } |
| } |
| |
| rv->write_error_ = serialization->write_error; |
| |
| std::vector<int> serialized_read_fds; |
| std::vector<int> serialized_write_fds; |
| #if defined(OS_POSIX) |
| std::vector<int> serialized_fds; |
| size_t serialized_fds_index = 0; |
| |
| size_t total_fd_count = serialization->serialized_read_fds_length + |
| serialization->serialized_write_fds_length + |
| serialization->serialized_message_fds_length; |
| for (size_t i = 0; i < total_fd_count; ++i) { |
| ScopedPlatformHandle handle; |
| if (!GetHandle(serialization->serialized_fds_index + i, platform_handles, |
| &handle)) { |
| ClosePlatformHandles(&serialized_fds); |
| return nullptr; |
| } |
| serialized_fds.push_back(handle.release().handle); |
| } |
| |
| serialized_read_fds.assign( |
| serialized_fds.begin(), |
| serialized_fds.begin() + serialization->serialized_read_fds_length); |
| serialized_fds_index += serialization->serialized_read_fds_length; |
| serialized_write_fds.assign( |
| serialized_fds.begin() + serialized_fds_index, |
| serialized_fds.begin() + serialized_fds_index + |
| serialization->serialized_write_fds_length); |
| serialized_fds_index += serialization->serialized_write_fds_length; |
| #endif |
| |
| while (message_queue_size) { |
| size_t message_size; |
| if (!MessageInTransit::GetNextMessageSize( |
| message_queue_data, message_queue_size, &message_size)) { |
| NOTREACHED() << "Couldn't read message size from serialized data."; |
| return nullptr; |
| } |
| if (message_size > message_queue_size) { |
| NOTREACHED() << "Invalid serialized message size."; |
| return nullptr; |
| } |
| MessageInTransit::View message_view(message_size, message_queue_data); |
| message_queue_size -= message_size; |
| message_queue_data += message_size; |
| |
| // TODO(jam): Copied below from RawChannelWin. See commment above |
| // GetReadPlatformHandles. |
| ScopedPlatformHandleVectorPtr temp_platform_handles; |
| if (message_view.transport_data_buffer()) { |
| size_t num_platform_handles; |
| const void* platform_handle_table; |
| TransportData::GetPlatformHandleTable( |
| message_view.transport_data_buffer(), &num_platform_handles, |
| &platform_handle_table); |
| |
| if (num_platform_handles > 0) { |
| #if defined(OS_WIN) |
| temp_platform_handles = |
| GetReadPlatformHandles(num_platform_handles, |
| platform_handle_table).Pass(); |
| if (!temp_platform_handles) { |
| LOG(ERROR) << "Invalid number of platform handles received"; |
| return nullptr; |
| } |
| #else |
| temp_platform_handles.reset(new PlatformHandleVector()); |
| for (size_t i = 0; i < num_platform_handles; ++i) |
| temp_platform_handles->push_back( |
| PlatformHandle(serialized_fds[serialized_fds_index++])); |
| #endif |
| } |
| } |
| |
| // TODO(jam): Copied below from RawChannelWin. See commment above |
| // GetReadPlatformHandles. |
| scoped_ptr<MessageInTransit> message(new MessageInTransit(message_view)); |
| if (message_view.transport_data_buffer_size() > 0) { |
| DCHECK(message_view.transport_data_buffer()); |
| message->SetDispatchers(TransportData::DeserializeDispatchers( |
| message_view.transport_data_buffer(), |
| message_view.transport_data_buffer_size(), |
| std::move(temp_platform_handles))); |
| } |
| |
| rv->message_queue_.AddMessage(std::move(message)); |
| } |
| |
| rv->Init(std::move(platform_handle), serialized_read_buffer, |
| serialized_read_buffer_size, serialized_write_buffer, |
| serialized_write_buffer_size, &serialized_read_fds, |
| &serialized_write_fds); |
| |
| if (message_queue_size) { // Should be empty by now. |
| LOG(ERROR) << "Invalid queued messages"; |
| return nullptr; |
| } |
| |
| return rv; |
| } |
| |
| MessagePipeDispatcher::MessagePipeDispatcher(bool transferable) |
| : channel_(nullptr), |
| serialized_read_fds_length_(0u), |
| serialized_write_fds_length_(0u), |
| serialized_message_fds_length_(0u), |
| pipe_id_(0), |
| non_transferable_state_(WAITING_FOR_READ_OR_WRITE), |
| serialized_(false), |
| calling_init_(false), |
| write_error_(false), |
| transferable_(transferable) { |
| } |
| |
| MessagePipeDispatcher::~MessagePipeDispatcher() { |
| // |Close()|/|CloseImplNoLock()| should have taken care of the channel. The |
| // exception is if they posted a task to run CloseOnIO but the IO thread shut |
| // down and so when it was deleting pending tasks it caused the last reference |
| // to destruct this object. In that case, safe to destroy the channel. |
| if (channel_ && |
| internal::g_io_thread_task_runner->RunsTasksOnCurrentThread()) { |
| if (transferable_) { |
| channel_->Shutdown(); |
| } else { |
| internal::g_broker->CloseMessagePipe(pipe_id_, this); |
| } |
| } else { |
| DCHECK(!channel_); |
| } |
| #if defined(OS_POSIX) |
| ClosePlatformHandles(&serialized_fds_); |
| #endif |
| } |
| |
| void MessagePipeDispatcher::CancelAllAwakablesNoLock() { |
| lock().AssertAcquired(); |
| awakable_list_.CancelAll(); |
| } |
| |
| void MessagePipeDispatcher::CloseImplNoLock() { |
| lock().AssertAcquired(); |
| // This early circuit fixes leak in unit tests. There's nothing to do in the |
| // posted task. |
| if (!transferable_ && non_transferable_state_ == CLOSED) |
| return; |
| |
| // We take a manual refcount because at shutdown, the task below might not get |
| // a chance to execute. If that happens, the RawChannel will still call our |
| // OnError method because it always runs (since it watches thread |
| // destruction). So to avoid UAF, manually add a reference and only release it |
| // if the task runs. |
| AddRef(); |
| internal::g_io_thread_task_runner->PostTask( |
| FROM_HERE, base::Bind(&MessagePipeDispatcher::CloseOnIO, this)); |
| } |
| |
| void MessagePipeDispatcher::SerializeInternal() { |
| serialized_ = true; |
| if (!transferable_) { |
| CHECK(non_transferable_state_ == WAITING_FOR_READ_OR_WRITE) |
| << "Non transferable message pipe being sent after read/write/waited. " |
| << "MOJO_CREATE_MESSAGE_PIPE_OPTIONS_FLAG_TRANSFERABLE must be used if " |
| << "the pipe can be sent after it's read or written. This message pipe " |
| << "was previously bound at:\n" |
| << non_transferable_bound_stack_->ToString(); |
| |
| non_transferable_state_ = SERIALISED; |
| return; |
| } |
| |
| // We need to stop watching handle immediately, even though not on IO thread, |
| // so that other messages aren't read after this. |
| std::vector<int> serialized_read_fds, serialized_write_fds; |
| if (channel_) { |
| bool write_error = false; |
| |
| serialized_platform_handle_ = channel_->ReleaseHandle( |
| &serialized_read_buffer_, &serialized_write_buffer_, |
| &serialized_read_fds, &serialized_write_fds, &write_error); |
| serialized_fds_.insert(serialized_fds_.end(), serialized_read_fds.begin(), |
| serialized_read_fds.end()); |
| serialized_read_fds_length_ = serialized_read_fds.size(); |
| serialized_fds_.insert(serialized_fds_.end(), serialized_write_fds.begin(), |
| serialized_write_fds.end()); |
| serialized_write_fds_length_ = serialized_write_fds.size(); |
| channel_ = nullptr; |
| } else { |
| // It's valid that the other side wrote some data and closed its end. |
| } |
| |
| DCHECK(serialized_message_queue_.empty()); |
| while (!message_queue_.IsEmpty()) { |
| scoped_ptr<MessageInTransit> message = message_queue_.GetMessage(); |
| |
| // When MojoWriteMessage is called, the MessageInTransit doesn't have |
| // dispatchers set and CreateEquivaent... is called since the dispatchers |
| // can be referenced by others. here dispatchers aren't referenced by |
| // others, but rawchannel can still call to them. so since we dont call |
| // createequiv, manually call TransportStarted and TransportEnd. |
| DispatcherVector dispatchers; |
| if (message->has_dispatchers()) |
| dispatchers = *message->dispatchers(); |
| for (size_t i = 0; i < dispatchers.size(); ++i) |
| dispatchers[i]->TransportStarted(); |
| |
| // TODO(jam): this handling for dispatchers only works on windows where we |
| // send transportdata as bytes instead of as parameters to sendmsg. |
| message->SerializeAndCloseDispatchers(); |
| // cont'd below |
| |
| size_t main_buffer_size = message->main_buffer_size(); |
| size_t transport_data_buffer_size = message->transport_data() ? |
| message->transport_data()->buffer_size() : 0; |
| |
| serialized_message_queue_.insert( |
| serialized_message_queue_.end(), |
| static_cast<const char*>(message->main_buffer()), |
| static_cast<const char*>(message->main_buffer()) + main_buffer_size); |
| |
| // cont'd |
| if (transport_data_buffer_size != 0) { |
| // TODO(jam): copied from RawChannelWin::WriteNoLock( |
| PlatformHandleVector* all_platform_handles = |
| message->transport_data()->platform_handles(); |
| if (all_platform_handles) { |
| #if defined(OS_WIN) |
| uint64_t* tokens = reinterpret_cast<uint64_t*>( |
| static_cast<char*>(message->transport_data()->buffer()) + |
| message->transport_data()->platform_handle_table_offset()); |
| internal::g_broker->HandleToToken( |
| &all_platform_handles->at(0), all_platform_handles->size(), tokens); |
| for (size_t i = 0; i < all_platform_handles->size(); i++) |
| all_platform_handles->at(i) = PlatformHandle(); |
| #else |
| for (size_t i = 0; i < all_platform_handles->size(); i++) { |
| serialized_fds_.push_back(all_platform_handles->at(i).handle); |
| serialized_message_fds_length_++; |
| all_platform_handles->at(i) = PlatformHandle(); |
| } |
| #endif |
| } |
| |
| serialized_message_queue_.insert( |
| serialized_message_queue_.end(), |
| static_cast<const char*>(message->transport_data()->buffer()), |
| static_cast<const char*>(message->transport_data()->buffer()) + |
| transport_data_buffer_size); |
| } |
| |
| for (size_t i = 0; i < dispatchers.size(); ++i) |
| dispatchers[i]->TransportEnded(); |
| } |
| } |
| |
| scoped_refptr<Dispatcher> |
| MessagePipeDispatcher::CreateEquivalentDispatcherAndCloseImplNoLock() { |
| lock().AssertAcquired(); |
| |
| SerializeInternal(); |
| |
| scoped_refptr<MessagePipeDispatcher> rv( |
| new MessagePipeDispatcher(transferable_)); |
| rv->serialized_ = true; |
| if (transferable_) { |
| rv->serialized_platform_handle_ = std::move(serialized_platform_handle_); |
| serialized_message_queue_.swap(rv->serialized_message_queue_); |
| serialized_read_buffer_.swap(rv->serialized_read_buffer_); |
| serialized_write_buffer_.swap(rv->serialized_write_buffer_); |
| serialized_fds_.swap(rv->serialized_fds_); |
| rv->serialized_read_fds_length_ = serialized_read_fds_length_; |
| rv->serialized_write_fds_length_ = serialized_write_fds_length_; |
| rv->serialized_message_fds_length_ = serialized_message_fds_length_; |
| rv->write_error_ = write_error_; |
| } else { |
| rv->pipe_id_ = pipe_id_; |
| rv->non_transferable_state_ = non_transferable_state_; |
| } |
| return rv; |
| } |
| |
| MojoResult MessagePipeDispatcher::WriteMessageImplNoLock( |
| const void* bytes, |
| uint32_t num_bytes, |
| std::vector<DispatcherTransport>* transports, |
| MojoWriteMessageFlags flags) { |
| lock().AssertAcquired(); |
| |
| DCHECK(!transports || |
| (transports->size() > 0 && |
| transports->size() <= GetConfiguration().max_message_num_handles)); |
| |
| if (write_error_ || |
| (transferable_ && !channel_) || |
| (!transferable_ && non_transferable_state_ == CLOSED)) { |
| return MOJO_RESULT_FAILED_PRECONDITION; |
| } |
| |
| if (num_bytes > GetConfiguration().max_message_num_bytes) |
| return MOJO_RESULT_RESOURCE_EXHAUSTED; |
| scoped_ptr<MessageInTransit> message(new MessageInTransit( |
| MessageInTransit::Type::MESSAGE, num_bytes, bytes)); |
| if (transports) { |
| MojoResult result = AttachTransportsNoLock(message.get(), transports); |
| if (result != MOJO_RESULT_OK) |
| return result; |
| } |
| |
| message->SerializeAndCloseDispatchers(); |
| if (!transferable_) |
| message->set_route_id(pipe_id_); |
| if (!transferable_ && |
| (non_transferable_state_ == WAITING_FOR_READ_OR_WRITE || |
| non_transferable_state_ == CONNECT_CALLED)) { |
| if (non_transferable_state_ == WAITING_FOR_READ_OR_WRITE) |
| RequestNontransferableChannel(); |
| non_transferable_outgoing_message_queue_.AddMessage(std::move(message)); |
| } else { |
| channel_->WriteMessage(std::move(message)); |
| } |
| |
| return MOJO_RESULT_OK; |
| } |
| |
| MojoResult MessagePipeDispatcher::ReadMessageImplNoLock( |
| void* bytes, |
| uint32_t* num_bytes, |
| DispatcherVector* dispatchers, |
| uint32_t* num_dispatchers, |
| MojoReadMessageFlags flags) { |
| lock().AssertAcquired(); |
| if (transferable_ && channel_) { |
| channel_->EnsureLazyInitialized(); |
| } else if (!transferable_) { |
| if (non_transferable_state_ == WAITING_FOR_READ_OR_WRITE) { |
| RequestNontransferableChannel(); |
| return MOJO_RESULT_SHOULD_WAIT; |
| } else if (non_transferable_state_ == CONNECT_CALLED) { |
| return MOJO_RESULT_SHOULD_WAIT; |
| } |
| } |
| |
| DCHECK(!dispatchers || dispatchers->empty()); |
| |
| const uint32_t max_bytes = !num_bytes ? 0 : *num_bytes; |
| const uint32_t max_num_dispatchers = num_dispatchers ? *num_dispatchers : 0; |
| |
| if (message_queue_.IsEmpty()) |
| return channel_ ? MOJO_RESULT_SHOULD_WAIT : MOJO_RESULT_FAILED_PRECONDITION; |
| |
| // TODO(vtl): If |flags & MOJO_READ_MESSAGE_FLAG_MAY_DISCARD|, we could pop |
| // and release the lock immediately. |
| bool enough_space = true; |
| MessageInTransit* message = message_queue_.PeekMessage(); |
| if (num_bytes) |
| *num_bytes = message->num_bytes(); |
| if (message->num_bytes() <= max_bytes) |
| memcpy(bytes, message->bytes(), message->num_bytes()); |
| else |
| enough_space = false; |
| |
| if (DispatcherVector* queued_dispatchers = message->dispatchers()) { |
| if (num_dispatchers) |
| *num_dispatchers = static_cast<uint32_t>(queued_dispatchers->size()); |
| if (enough_space) { |
| if (queued_dispatchers->empty()) { |
| // Nothing to do. |
| } else if (queued_dispatchers->size() <= max_num_dispatchers) { |
| DCHECK(dispatchers); |
| dispatchers->swap(*queued_dispatchers); |
| } else { |
| enough_space = false; |
| } |
| } |
| } else { |
| if (num_dispatchers) |
| *num_dispatchers = 0; |
| } |
| |
| message = nullptr; |
| |
| if (enough_space || (flags & MOJO_READ_MESSAGE_FLAG_MAY_DISCARD)) { |
| message_queue_.DiscardMessage(); |
| |
| // Now it's empty, thus no longer readable. |
| if (message_queue_.IsEmpty()) { |
| // It's currently not possible to wait for non-readability, but we should |
| // do the state change anyway. |
| awakable_list_.AwakeForStateChange(GetHandleSignalsStateImplNoLock()); |
| } |
| } |
| |
| if (!enough_space) |
| return MOJO_RESULT_RESOURCE_EXHAUSTED; |
| |
| return MOJO_RESULT_OK; |
| } |
| |
| HandleSignalsState MessagePipeDispatcher::GetHandleSignalsStateImplNoLock() |
| const { |
| lock().AssertAcquired(); |
| |
| HandleSignalsState rv; |
| if (!message_queue_.IsEmpty()) |
| rv.satisfied_signals |= MOJO_HANDLE_SIGNAL_READABLE; |
| if (!message_queue_.IsEmpty() || |
| (transferable_ && channel_) || |
| (!transferable_ && non_transferable_state_ != CLOSED)) |
| rv.satisfiable_signals |= MOJO_HANDLE_SIGNAL_READABLE; |
| if (!write_error_ && |
| ((transferable_ && channel_) || |
| (!transferable_ && non_transferable_state_ != CLOSED))) { |
| rv.satisfied_signals |= MOJO_HANDLE_SIGNAL_WRITABLE; |
| rv.satisfiable_signals |= MOJO_HANDLE_SIGNAL_WRITABLE; |
| } |
| if (write_error_ || |
| (transferable_ && !channel_) || |
| (!transferable_ && |
| ((non_transferable_state_ == CLOSED) || is_closed()))) { |
| rv.satisfied_signals |= MOJO_HANDLE_SIGNAL_PEER_CLOSED; |
| } |
| rv.satisfiable_signals |= MOJO_HANDLE_SIGNAL_PEER_CLOSED; |
| return rv; |
| } |
| |
| MojoResult MessagePipeDispatcher::AddAwakableImplNoLock( |
| Awakable* awakable, |
| MojoHandleSignals signals, |
| uintptr_t context, |
| HandleSignalsState* signals_state) { |
| lock().AssertAcquired(); |
| if (transferable_ && channel_) { |
| channel_->EnsureLazyInitialized(); |
| } else if (!transferable_ && |
| non_transferable_state_ == WAITING_FOR_READ_OR_WRITE) { |
| RequestNontransferableChannel(); |
| } |
| |
| HandleSignalsState state = GetHandleSignalsStateImplNoLock(); |
| if (state.satisfies(signals)) { |
| if (signals_state) |
| *signals_state = state; |
| return MOJO_RESULT_ALREADY_EXISTS; |
| } |
| if (!state.can_satisfy(signals)) { |
| if (signals_state) |
| *signals_state = state; |
| return MOJO_RESULT_FAILED_PRECONDITION; |
| } |
| |
| awakable_list_.Add(awakable, signals, context); |
| return MOJO_RESULT_OK; |
| } |
| |
| void MessagePipeDispatcher::RemoveAwakableImplNoLock( |
| Awakable* awakable, |
| HandleSignalsState* signals_state) { |
| lock().AssertAcquired(); |
| |
| awakable_list_.Remove(awakable); |
| if (signals_state) |
| *signals_state = GetHandleSignalsStateImplNoLock(); |
| } |
| |
| void MessagePipeDispatcher::StartSerializeImplNoLock( |
| size_t* max_size, |
| size_t* max_platform_handles) { |
| if (!serialized_) |
| SerializeInternal(); |
| |
| *max_platform_handles = 0; |
| if (serialized_platform_handle_.is_valid()) |
| (*max_platform_handles)++; |
| if (!serialized_read_buffer_.empty() || |
| !serialized_write_buffer_.empty() || |
| !serialized_message_queue_.empty()) |
| (*max_platform_handles)++; |
| *max_platform_handles += serialized_fds_.size(); |
| *max_size = sizeof(SerializedMessagePipeHandleDispatcher); |
| } |
| |
| bool MessagePipeDispatcher::EndSerializeAndCloseImplNoLock( |
| void* destination, |
| size_t* actual_size, |
| PlatformHandleVector* platform_handles) { |
| CloseImplNoLock(); |
| SerializedMessagePipeHandleDispatcher* serialization = |
| static_cast<SerializedMessagePipeHandleDispatcher*>(destination); |
| serialization->transferable = transferable_; |
| serialization->pipe_id = pipe_id_; |
| if (serialized_platform_handle_.is_valid()) { |
| serialization->platform_handle_index = platform_handles->size(); |
| platform_handles->push_back(serialized_platform_handle_.release()); |
| } else { |
| serialization->platform_handle_index = kInvalidMessagePipeHandleIndex; |
| } |
| |
| serialization->write_error = write_error_; |
| serialization->serialized_read_buffer_size = serialized_read_buffer_.size(); |
| serialization->serialized_write_buffer_size = serialized_write_buffer_.size(); |
| serialization->serialized_message_queue_size = |
| serialized_message_queue_.size(); |
| |
| serialization->shared_memory_size = static_cast<uint32_t>( |
| serialization->serialized_read_buffer_size + |
| serialization->serialized_write_buffer_size + |
| serialization->serialized_message_queue_size); |
| if (serialization->shared_memory_size) { |
| scoped_refptr<PlatformSharedBuffer> shared_buffer( |
| internal::g_platform_support->CreateSharedBuffer( |
| serialization->shared_memory_size)); |
| scoped_ptr<PlatformSharedBufferMapping> mapping( |
| shared_buffer->Map(0, serialization->shared_memory_size)); |
| char* start = static_cast<char*>(mapping->GetBase()); |
| start = SerializeBuffer(start, &serialized_read_buffer_); |
| start = SerializeBuffer(start, &serialized_write_buffer_); |
| start = SerializeBuffer(start, &serialized_message_queue_); |
| |
| serialization->shared_memory_handle_index = platform_handles->size(); |
| platform_handles->push_back(shared_buffer->PassPlatformHandle().release()); |
| } else { |
| serialization->shared_memory_handle_index = kInvalidMessagePipeHandleIndex; |
| } |
| |
| serialization->serialized_read_fds_length = serialized_read_fds_length_; |
| serialization->serialized_write_fds_length = serialized_write_fds_length_; |
| serialization->serialized_message_fds_length = serialized_message_fds_length_; |
| if (serialized_fds_.empty()) { |
| serialization->serialized_fds_index = kInvalidMessagePipeHandleIndex; |
| } else { |
| #if defined(OS_POSIX) |
| serialization->serialized_fds_index = platform_handles->size(); |
| for (size_t i = 0; i < serialized_fds_.size(); ++i) |
| platform_handles->push_back(PlatformHandle(serialized_fds_[i])); |
| serialized_fds_.clear(); |
| #endif |
| } |
| |
| *actual_size = sizeof(SerializedMessagePipeHandleDispatcher); |
| return true; |
| } |
| |
| void MessagePipeDispatcher::TransportStarted() { |
| started_transport_.Acquire(); |
| } |
| |
| void MessagePipeDispatcher::TransportEnded() { |
| started_transport_.Release(); |
| |
| base::AutoLock locker(lock()); |
| |
| // If transporting of MPD failed, we might have got more data and didn't |
| // awake for. |
| // TODO(jam): should we care about only alerting if it was empty before |
| // TransportStarted? |
| if (!message_queue_.IsEmpty()) |
| awakable_list_.AwakeForStateChange(GetHandleSignalsStateImplNoLock()); |
| } |
| |
| void MessagePipeDispatcher::OnReadMessage( |
| const MessageInTransit::View& message_view, |
| ScopedPlatformHandleVectorPtr platform_handles) { |
| scoped_ptr<MessageInTransit> message(new MessageInTransit(message_view)); |
| if (message_view.transport_data_buffer_size() > 0) { |
| DCHECK(message_view.transport_data_buffer()); |
| message->SetDispatchers(TransportData::DeserializeDispatchers( |
| message_view.transport_data_buffer(), |
| message_view.transport_data_buffer_size(), |
| std::move(platform_handles))); |
| } |
| |
| if (started_transport_.Try()) { |
| // we're not in the middle of being sent |
| |
| // Can get synchronously called back in Init if there was initial data. |
| scoped_ptr<base::AutoLock> locker; |
| if (!calling_init_) { |
| locker.reset(new base::AutoLock(lock())); |
| } |
| |
| bool was_empty = message_queue_.IsEmpty(); |
| message_queue_.AddMessage(std::move(message)); |
| if (was_empty) |
| awakable_list_.AwakeForStateChange(GetHandleSignalsStateImplNoLock()); |
| |
| started_transport_.Release(); |
| } else { |
| // If RawChannel is calling OnRead, that means it has its read_lock_ |
| // acquired. That means StartSerialize can't be accessing message queue as |
| // it waits on ReleaseHandle first which acquires readlock_. |
| message_queue_.AddMessage(std::move(message)); |
| } |
| } |
| |
| void MessagePipeDispatcher::OnError(Error error) { |
| // If there's a read error, then the other side of the pipe is closed. By |
| // definition, we can't write since there's no one to read it. And we can't |
| // read anymore, since we just got a read erorr. So we close the pipe. |
| // If there's a write error, then we stop writing. But we keep the pipe open |
| // until we finish reading everything in it. This is because it's valid for |
| // one endpoint to write some data and close their pipe immediately. Even |
| // though the other end can't write anymore, it should still get all the data. |
| switch (error) { |
| case ERROR_READ_SHUTDOWN: |
| // The other side was cleanly closed, so this isn't actually an error. |
| DVLOG(1) << "MessagePipeDispatcher read error (shutdown)"; |
| break; |
| case ERROR_READ_BROKEN: |
| LOG(ERROR) << "MessagePipeDispatcher read error (connection broken)"; |
| break; |
| case ERROR_READ_BAD_MESSAGE: |
| // Receiving a bad message means either a bug, data corruption, or |
| // malicious attack (probably due to some other bug). |
| LOG(ERROR) << "MessagePipeDispatcher read error (received bad message)"; |
| break; |
| case ERROR_READ_UNKNOWN: |
| LOG(ERROR) << "MessagePipeDispatcher read error (unknown)"; |
| break; |
| case ERROR_WRITE: |
| // Write errors are slightly notable: they probably shouldn't happen under |
| // normal operation (but maybe the other side crashed). |
| LOG(WARNING) << "MessagePipeDispatcher write error"; |
| DCHECK_EQ(write_error_, false) << "Should only get one write error."; |
| write_error_ = true; |
| break; |
| } |
| |
| if (started_transport_.Try()) { |
| base::AutoLock locker(lock()); |
| // We can get two OnError callbacks before the post task below completes. |
| // Although RawChannel still has a pointer to this object until Shutdown is |
| // called, that is safe since this class always does a PostTask to the IO |
| // thread to self destruct. |
| if (channel_ && error != ERROR_WRITE) { |
| if (transferable_) { |
| channel_->Shutdown(); |
| } else { |
| CHECK_NE(non_transferable_state_, CLOSED); |
| internal::g_broker->CloseMessagePipe(pipe_id_, this); |
| non_transferable_state_ = CLOSED; |
| } |
| channel_ = nullptr; |
| } |
| awakable_list_.AwakeForStateChange(GetHandleSignalsStateImplNoLock()); |
| started_transport_.Release(); |
| } else { |
| // We must be waiting to call ReleaseHandle. It will call Shutdown. |
| } |
| } |
| |
| MojoResult MessagePipeDispatcher::AttachTransportsNoLock( |
| MessageInTransit* message, |
| std::vector<DispatcherTransport>* transports) { |
| DCHECK(!message->has_dispatchers()); |
| |
| // You're not allowed to send either handle to a message pipe over the message |
| // pipe, so check for this. (The case of trying to write a handle to itself is |
| // taken care of by |Core|. That case kind of makes sense, but leads to |
| // complications if, e.g., both sides try to do the same thing with their |
| // respective handles simultaneously. The other case, of trying to write the |
| // peer handle to a handle, doesn't make sense -- since no handle will be |
| // available to read the message from.) |
| for (size_t i = 0; i < transports->size(); i++) { |
| if (!(*transports)[i].is_valid()) |
| continue; |
| if ((*transports)[i].GetType() == Dispatcher::Type::MESSAGE_PIPE) { |
| MessagePipeDispatcher* mp = |
| static_cast<MessagePipeDispatcher*>(((*transports)[i]).dispatcher()); |
| if (transferable_ && mp->transferable_ && |
| channel_ && mp->channel_ && channel_->IsOtherEndOf(mp->channel_)) { |
| // The other case should have been disallowed by |Core|. (Note: |port| |
| // is the peer port of the handle given to |WriteMessage()|.) |
| return MOJO_RESULT_INVALID_ARGUMENT; |
| } else if (!transferable_ && !mp->transferable_ && |
| pipe_id_ == mp->pipe_id_) { |
| return MOJO_RESULT_INVALID_ARGUMENT; |
| } |
| } |
| } |
| |
| // Clone the dispatchers and attach them to the message. (This must be done as |
| // a separate loop, since we want to leave the dispatchers alone on failure.) |
| scoped_ptr<DispatcherVector> dispatchers(new DispatcherVector()); |
| dispatchers->reserve(transports->size()); |
| for (size_t i = 0; i < transports->size(); i++) { |
| if ((*transports)[i].is_valid()) { |
| dispatchers->push_back( |
| (*transports)[i].CreateEquivalentDispatcherAndClose()); |
| } else { |
| LOG(WARNING) << "Enqueueing null dispatcher"; |
| dispatchers->push_back(nullptr); |
| } |
| } |
| message->SetDispatchers(std::move(dispatchers)); |
| return MOJO_RESULT_OK; |
| } |
| |
| void MessagePipeDispatcher::RequestNontransferableChannel() { |
| lock().AssertAcquired(); |
| CHECK(!transferable_); |
| CHECK_EQ(non_transferable_state_, WAITING_FOR_READ_OR_WRITE); |
| non_transferable_state_ = CONNECT_CALLED; |
| #if !defined(OFFICIAL_BUILD) |
| non_transferable_bound_stack_.reset(new base::debug::StackTrace); |
| #endif |
| |
| // PostTask since the broker can call us back synchronously. |
| internal::g_io_thread_task_runner->PostTask( |
| FROM_HERE, |
| base::Bind(&Broker::ConnectMessagePipe, |
| base::Unretained(internal::g_broker), pipe_id_, |
| base::Unretained(this))); |
| } |
| |
| } // namespace edk |
| } // namespace mojo |