| // Copyright 2016 The Chromium Authors. All rights reserved. |
| // Use of this source code is governed by a BSD-style license that can be |
| // found in the LICENSE file. |
| |
| #include "mojo/core/node_channel.h" |
| |
| #include <cstring> |
| #include <limits> |
| #include <sstream> |
| |
| #include "base/bind.h" |
| #include "base/location.h" |
| #include "base/logging.h" |
| #include "base/memory/ptr_util.h" |
| #include "mojo/core/broker_host.h" |
| #include "mojo/core/channel.h" |
| #include "mojo/core/configuration.h" |
| #include "mojo/core/core.h" |
| #include "mojo/core/request_context.h" |
| |
| namespace mojo { |
| namespace core { |
| |
| namespace { |
| |
| // NOTE: Please ONLY append messages to the end of this enum. |
| enum class MessageType : uint32_t { |
| ACCEPT_INVITEE, |
| ACCEPT_INVITATION, |
| ADD_BROKER_CLIENT, |
| BROKER_CLIENT_ADDED, |
| ACCEPT_BROKER_CLIENT, |
| EVENT_MESSAGE, |
| REQUEST_PORT_MERGE, |
| REQUEST_INTRODUCTION, |
| INTRODUCE, |
| #if defined(OS_WIN) |
| RELAY_EVENT_MESSAGE, |
| #endif |
| BROADCAST_EVENT, |
| #if defined(OS_WIN) |
| EVENT_MESSAGE_FROM_RELAY, |
| #endif |
| ACCEPT_PEER, |
| BIND_BROKER_HOST, |
| }; |
| |
| #pragma pack(push, 1) |
| |
| struct alignas(8) Header { |
| MessageType type; |
| }; |
| |
| static_assert(IsAlignedForChannelMessage(sizeof(Header)), |
| "Invalid header size."); |
| |
| struct alignas(8) AcceptInviteeDataV0 { |
| ports::NodeName inviter_name; |
| ports::NodeName token; |
| }; |
| |
| using AcceptInviteeData = AcceptInviteeDataV0; |
| |
| struct alignas(8) AcceptInvitationDataV0 { |
| ports::NodeName token; |
| ports::NodeName invitee_name; |
| }; |
| |
| using AcceptInvitationData = AcceptInvitationDataV0; |
| |
| struct alignas(8) AcceptPeerDataV0 { |
| ports::NodeName token; |
| ports::NodeName peer_name; |
| ports::PortName port_name; |
| }; |
| |
| using AcceptPeerData = AcceptPeerDataV0; |
| |
| // This message may include a process handle on platforms that require it. |
| struct alignas(8) AddBrokerClientDataV0 { |
| ports::NodeName client_name; |
| #if !defined(OS_WIN) |
| uint32_t process_handle = 0; |
| #endif |
| }; |
| |
| using AddBrokerClientData = AddBrokerClientDataV0; |
| |
| #if !defined(OS_WIN) |
| static_assert(sizeof(base::ProcessHandle) == sizeof(uint32_t), |
| "Unexpected pid size"); |
| static_assert(sizeof(AddBrokerClientData) % kChannelMessageAlignment == 0, |
| "Invalid AddBrokerClientData size."); |
| #endif |
| |
| // This data is followed by a platform channel handle to the broker. |
| struct alignas(8) BrokerClientAddedDataV0 { |
| ports::NodeName client_name; |
| }; |
| |
| using BrokerClientAddedData = BrokerClientAddedDataV0; |
| |
| // This data may be followed by a platform channel handle to the broker. If not, |
| // then the inviter is the broker and its channel should be used as such. |
| struct alignas(8) AcceptBrokerClientDataV0 { |
| ports::NodeName broker_name; |
| }; |
| |
| using AcceptBrokerClientData = AcceptBrokerClientDataV0; |
| |
| // This is followed by arbitrary payload data which is interpreted as a token |
| // string for port location. |
| // NOTE: Because this field is variable length it cannot be versioned. |
| struct alignas(8) RequestPortMergeData { |
| ports::PortName connector_port_name; |
| }; |
| |
| // Used for both REQUEST_INTRODUCTION and INTRODUCE. |
| // |
| // For INTRODUCE the message also includes a valid platform handle for a channel |
| // the receiver may use to communicate with the named node directly, or an |
| // invalid platform handle if the node is unknown to the sender or otherwise |
| // cannot be introduced. |
| struct alignas(8) IntroductionDataV0 { |
| ports::NodeName name; |
| }; |
| |
| using IntroductionData = IntroductionDataV0; |
| |
| // This message is just a PlatformHandle. The data struct alignas(8) here has |
| // only a padding field to ensure an aligned, non-zero-length payload. |
| struct alignas(8) BindBrokerHostDataV0 {}; |
| |
| using BindBrokerHostData = BindBrokerHostDataV0; |
| |
| #if defined(OS_WIN) |
| // This struct alignas(8) is followed by the full payload of a message to be |
| // relayed. |
| // NOTE: Because this field is variable length it cannot be versioned. |
| struct alignas(8) RelayEventMessageData { |
| ports::NodeName destination; |
| }; |
| |
| // This struct alignas(8) is followed by the full payload of a relayed message. |
| struct alignas(8) EventMessageFromRelayDataV0 { |
| ports::NodeName source; |
| }; |
| |
| using EventMessageFromRelayData = EventMessageFromRelayDataV0; |
| |
| #endif |
| |
| #pragma pack(pop) |
| |
| Channel::MessagePtr CreateMessage(MessageType type, |
| size_t payload_size, |
| size_t num_handles, |
| void** out_data, |
| size_t capacity = 0) { |
| const size_t total_size = payload_size + sizeof(Header); |
| if (capacity == 0) |
| capacity = total_size; |
| else |
| capacity = std::max(total_size, capacity); |
| auto message = |
| std::make_unique<Channel::Message>(capacity, total_size, num_handles); |
| Header* header = reinterpret_cast<Header*>(message->mutable_payload()); |
| |
| // Make sure any header padding gets zeroed. |
| memset(header, 0, sizeof(Header)); |
| header->type = type; |
| |
| // The out_data starts beyond the header. |
| *out_data = reinterpret_cast<void*>(header + 1); |
| return message; |
| } |
| |
| template <typename DataType> |
| Channel::MessagePtr CreateMessage(MessageType type, |
| size_t payload_size, |
| size_t num_handles, |
| DataType** out_data, |
| size_t capacity = 0) { |
| auto msg_ptr = CreateMessage(type, payload_size, num_handles, |
| reinterpret_cast<void**>(out_data), capacity); |
| |
| // Since we know the type let's make sure any padding areas are zeroed. |
| memset(*out_data, 0, sizeof(DataType)); |
| |
| return msg_ptr; |
| } |
| |
| // This method takes a second template argument which is another datatype which |
| // represents the smallest size this payload can be to be considered valid this |
| // MUST be used when there is more than one version of a message to specify the |
| // oldest version of the message. |
| template <typename DataType, typename MinSizedDataType> |
| bool GetMessagePayloadMinimumSized(const void* bytes, |
| size_t num_bytes, |
| DataType* out_data) { |
| static_assert(sizeof(DataType) > 0, "DataType must have non-zero size."); |
| if (num_bytes < sizeof(Header) + sizeof(MinSizedDataType)) |
| return false; |
| |
| // Always make sure that the full object is zeored and default constructed as |
| // we may not have the complete type. The default construction allows fields |
| // to be default initialized to be resilient to older message versions. |
| memset(out_data, 0, sizeof(*out_data)); |
| new (out_data) DataType; |
| |
| // Overwrite any fields we received. |
| memcpy(out_data, static_cast<const uint8_t*>(bytes) + sizeof(Header), |
| std::min(sizeof(DataType), num_bytes - sizeof(Header))); |
| return true; |
| } |
| |
| template <typename DataType> |
| bool GetMessagePayload(const void* bytes, |
| size_t num_bytes, |
| DataType* out_data) { |
| return GetMessagePayloadMinimumSized<DataType, DataType>(bytes, num_bytes, |
| out_data); |
| } |
| |
| } // namespace |
| |
| // static |
| scoped_refptr<NodeChannel> NodeChannel::Create( |
| Delegate* delegate, |
| ConnectionParams connection_params, |
| Channel::HandlePolicy channel_handle_policy, |
| scoped_refptr<base::SingleThreadTaskRunner> io_task_runner, |
| const ProcessErrorCallback& process_error_callback) { |
| #if defined(OS_NACL_SFI) |
| LOG(FATAL) << "Multi-process not yet supported on NaCl-SFI"; |
| return nullptr; |
| #else |
| return new NodeChannel(delegate, std::move(connection_params), |
| channel_handle_policy, io_task_runner, |
| process_error_callback); |
| #endif |
| } |
| |
| // static |
| Channel::MessagePtr NodeChannel::CreateEventMessage(size_t capacity, |
| size_t payload_size, |
| void** payload, |
| size_t num_handles) { |
| return CreateMessage(MessageType::EVENT_MESSAGE, payload_size, num_handles, |
| payload, capacity); |
| } |
| |
| // static |
| void NodeChannel::GetEventMessageData(Channel::Message* message, |
| void** data, |
| size_t* num_data_bytes) { |
| // NOTE: OnChannelMessage guarantees that we never accept a Channel::Message |
| // with a payload of fewer than |sizeof(Header)| bytes. |
| *data = reinterpret_cast<Header*>(message->mutable_payload()) + 1; |
| *num_data_bytes = message->payload_size() - sizeof(Header); |
| } |
| |
| void NodeChannel::Start() { |
| base::AutoLock lock(channel_lock_); |
| // ShutDown() may have already been called, in which case |channel_| is null. |
| if (channel_) |
| channel_->Start(); |
| } |
| |
| void NodeChannel::ShutDown() { |
| base::AutoLock lock(channel_lock_); |
| if (channel_) { |
| channel_->ShutDown(); |
| channel_ = nullptr; |
| } |
| } |
| |
| void NodeChannel::LeakHandleOnShutdown() { |
| base::AutoLock lock(channel_lock_); |
| if (channel_) { |
| channel_->LeakHandle(); |
| } |
| } |
| |
| void NodeChannel::NotifyBadMessage(const std::string& error) { |
| DCHECK(HasBadMessageHandler()); |
| process_error_callback_.Run("Received bad user message: " + error); |
| } |
| |
| void NodeChannel::SetRemoteProcessHandle(base::Process process_handle) { |
| DCHECK(owning_task_runner()->RunsTasksInCurrentSequence()); |
| { |
| base::AutoLock lock(channel_lock_); |
| if (channel_) |
| channel_->set_remote_process(process_handle.Duplicate()); |
| } |
| base::AutoLock lock(remote_process_handle_lock_); |
| DCHECK(!remote_process_handle_.IsValid()); |
| CHECK_NE(remote_process_handle_.Handle(), base::GetCurrentProcessHandle()); |
| remote_process_handle_ = std::move(process_handle); |
| } |
| |
| bool NodeChannel::HasRemoteProcessHandle() { |
| base::AutoLock lock(remote_process_handle_lock_); |
| return remote_process_handle_.IsValid(); |
| } |
| |
| base::Process NodeChannel::CloneRemoteProcessHandle() { |
| base::AutoLock lock(remote_process_handle_lock_); |
| return remote_process_handle_.Duplicate(); |
| } |
| |
| void NodeChannel::SetRemoteNodeName(const ports::NodeName& name) { |
| DCHECK(owning_task_runner()->RunsTasksInCurrentSequence()); |
| remote_node_name_ = name; |
| } |
| |
| void NodeChannel::AcceptInvitee(const ports::NodeName& inviter_name, |
| const ports::NodeName& token) { |
| AcceptInviteeData* data; |
| Channel::MessagePtr message = CreateMessage( |
| MessageType::ACCEPT_INVITEE, sizeof(AcceptInviteeData), 0, &data); |
| data->inviter_name = inviter_name; |
| data->token = token; |
| WriteChannelMessage(std::move(message)); |
| } |
| |
| void NodeChannel::AcceptInvitation(const ports::NodeName& token, |
| const ports::NodeName& invitee_name) { |
| AcceptInvitationData* data; |
| Channel::MessagePtr message = CreateMessage( |
| MessageType::ACCEPT_INVITATION, sizeof(AcceptInvitationData), 0, &data); |
| data->token = token; |
| data->invitee_name = invitee_name; |
| WriteChannelMessage(std::move(message)); |
| } |
| |
| void NodeChannel::AcceptPeer(const ports::NodeName& sender_name, |
| const ports::NodeName& token, |
| const ports::PortName& port_name) { |
| AcceptPeerData* data; |
| Channel::MessagePtr message = |
| CreateMessage(MessageType::ACCEPT_PEER, sizeof(AcceptPeerData), 0, &data); |
| data->token = token; |
| data->peer_name = sender_name; |
| data->port_name = port_name; |
| WriteChannelMessage(std::move(message)); |
| } |
| |
| void NodeChannel::AddBrokerClient(const ports::NodeName& client_name, |
| base::Process process_handle) { |
| AddBrokerClientData* data; |
| std::vector<PlatformHandle> handles; |
| #if defined(OS_WIN) |
| handles.emplace_back(base::win::ScopedHandle(process_handle.Release())); |
| #endif |
| Channel::MessagePtr message = |
| CreateMessage(MessageType::ADD_BROKER_CLIENT, sizeof(AddBrokerClientData), |
| handles.size(), &data); |
| message->SetHandles(std::move(handles)); |
| data->client_name = client_name; |
| #if !defined(OS_WIN) |
| data->process_handle = process_handle.Handle(); |
| #endif |
| WriteChannelMessage(std::move(message)); |
| } |
| |
| void NodeChannel::BrokerClientAdded(const ports::NodeName& client_name, |
| PlatformHandle broker_channel) { |
| BrokerClientAddedData* data; |
| std::vector<PlatformHandle> handles; |
| if (broker_channel.is_valid()) |
| handles.emplace_back(std::move(broker_channel)); |
| Channel::MessagePtr message = |
| CreateMessage(MessageType::BROKER_CLIENT_ADDED, |
| sizeof(BrokerClientAddedData), handles.size(), &data); |
| message->SetHandles(std::move(handles)); |
| data->client_name = client_name; |
| WriteChannelMessage(std::move(message)); |
| } |
| |
| void NodeChannel::AcceptBrokerClient(const ports::NodeName& broker_name, |
| PlatformHandle broker_channel) { |
| AcceptBrokerClientData* data; |
| std::vector<PlatformHandle> handles; |
| if (broker_channel.is_valid()) |
| handles.emplace_back(std::move(broker_channel)); |
| Channel::MessagePtr message = |
| CreateMessage(MessageType::ACCEPT_BROKER_CLIENT, |
| sizeof(AcceptBrokerClientData), handles.size(), &data); |
| message->SetHandles(std::move(handles)); |
| data->broker_name = broker_name; |
| WriteChannelMessage(std::move(message)); |
| } |
| |
| void NodeChannel::RequestPortMerge(const ports::PortName& connector_port_name, |
| const std::string& token) { |
| RequestPortMergeData* data; |
| Channel::MessagePtr message = |
| CreateMessage(MessageType::REQUEST_PORT_MERGE, |
| sizeof(RequestPortMergeData) + token.size(), 0, &data); |
| data->connector_port_name = connector_port_name; |
| memcpy(data + 1, token.data(), token.size()); |
| WriteChannelMessage(std::move(message)); |
| } |
| |
| void NodeChannel::RequestIntroduction(const ports::NodeName& name) { |
| IntroductionData* data; |
| Channel::MessagePtr message = CreateMessage( |
| MessageType::REQUEST_INTRODUCTION, sizeof(IntroductionData), 0, &data); |
| data->name = name; |
| WriteChannelMessage(std::move(message)); |
| } |
| |
| void NodeChannel::Introduce(const ports::NodeName& name, |
| PlatformHandle channel_handle) { |
| IntroductionData* data; |
| std::vector<PlatformHandle> handles; |
| if (channel_handle.is_valid()) |
| handles.emplace_back(std::move(channel_handle)); |
| Channel::MessagePtr message = CreateMessage( |
| MessageType::INTRODUCE, sizeof(IntroductionData), handles.size(), &data); |
| message->SetHandles(std::move(handles)); |
| data->name = name; |
| WriteChannelMessage(std::move(message)); |
| } |
| |
| void NodeChannel::SendChannelMessage(Channel::MessagePtr message) { |
| WriteChannelMessage(std::move(message)); |
| } |
| |
| void NodeChannel::Broadcast(Channel::MessagePtr message) { |
| DCHECK(!message->has_handles()); |
| void* data; |
| Channel::MessagePtr broadcast_message = CreateMessage( |
| MessageType::BROADCAST_EVENT, message->data_num_bytes(), 0, &data); |
| memcpy(data, message->data(), message->data_num_bytes()); |
| WriteChannelMessage(std::move(broadcast_message)); |
| } |
| |
| void NodeChannel::BindBrokerHost(PlatformHandle broker_host_handle) { |
| #if !defined(OS_APPLE) && !defined(OS_NACL) && !defined(OS_FUCHSIA) |
| DCHECK(broker_host_handle.is_valid()); |
| BindBrokerHostData* data; |
| std::vector<PlatformHandle> handles; |
| handles.push_back(std::move(broker_host_handle)); |
| Channel::MessagePtr message = |
| CreateMessage(MessageType::BIND_BROKER_HOST, sizeof(BindBrokerHostData), |
| handles.size(), &data); |
| message->SetHandles(std::move(handles)); |
| WriteChannelMessage(std::move(message)); |
| #endif |
| } |
| |
| #if defined(OS_WIN) |
| void NodeChannel::RelayEventMessage(const ports::NodeName& destination, |
| Channel::MessagePtr message) { |
| DCHECK(message->has_handles()); |
| |
| // Note that this is only used on Windows, and on Windows all platform |
| // handles are included in the message data. We blindly copy all the data |
| // here and the relay node (the broker) will duplicate handles as needed. |
| size_t num_bytes = sizeof(RelayEventMessageData) + message->data_num_bytes(); |
| RelayEventMessageData* data; |
| Channel::MessagePtr relay_message = |
| CreateMessage(MessageType::RELAY_EVENT_MESSAGE, num_bytes, 0, &data); |
| data->destination = destination; |
| memcpy(data + 1, message->data(), message->data_num_bytes()); |
| |
| // When the handles are duplicated in the broker, the source handles will |
| // be closed. If the broker never receives this message then these handles |
| // will leak, but that means something else has probably broken and the |
| // sending process won't likely be around much longer. |
| // |
| // TODO(https://crbug.com/813112): We would like to be able to violate the |
| // above stated assumption. We should not leak handles in cases where we |
| // outlive the broker, as we may continue existing and eventually accept a new |
| // broker invitation. |
| std::vector<PlatformHandleInTransit> handles = message->TakeHandles(); |
| for (auto& handle : handles) |
| handle.TakeHandle().release(); |
| |
| WriteChannelMessage(std::move(relay_message)); |
| } |
| |
| void NodeChannel::EventMessageFromRelay(const ports::NodeName& source, |
| Channel::MessagePtr message) { |
| size_t num_bytes = |
| sizeof(EventMessageFromRelayData) + message->payload_size(); |
| EventMessageFromRelayData* data; |
| Channel::MessagePtr relayed_message = |
| CreateMessage(MessageType::EVENT_MESSAGE_FROM_RELAY, num_bytes, |
| message->num_handles(), &data); |
| data->source = source; |
| if (message->payload_size()) |
| memcpy(data + 1, message->payload(), message->payload_size()); |
| relayed_message->SetHandles(message->TakeHandles()); |
| WriteChannelMessage(std::move(relayed_message)); |
| } |
| #endif // defined(OS_WIN) |
| |
| NodeChannel::NodeChannel( |
| Delegate* delegate, |
| ConnectionParams connection_params, |
| Channel::HandlePolicy channel_handle_policy, |
| scoped_refptr<base::SingleThreadTaskRunner> io_task_runner, |
| const ProcessErrorCallback& process_error_callback) |
| : base::RefCountedDeleteOnSequence<NodeChannel>(io_task_runner), |
| delegate_(delegate), |
| process_error_callback_(process_error_callback) |
| #if !defined(OS_NACL_SFI) |
| , |
| channel_(Channel::Create(this, |
| std::move(connection_params), |
| channel_handle_policy, |
| std::move(io_task_runner))) |
| #endif |
| { |
| } |
| |
| NodeChannel::~NodeChannel() { |
| ShutDown(); |
| } |
| |
| void NodeChannel::CreateAndBindLocalBrokerHost( |
| PlatformHandle broker_host_handle) { |
| #if !defined(OS_APPLE) && !defined(OS_NACL) && !defined(OS_FUCHSIA) |
| // Self-owned. |
| ConnectionParams connection_params( |
| PlatformChannelEndpoint(std::move(broker_host_handle))); |
| new BrokerHost(remote_process_handle_.Duplicate(), |
| std::move(connection_params), process_error_callback_); |
| #endif |
| } |
| |
| void NodeChannel::OnChannelMessage(const void* payload, |
| size_t payload_size, |
| std::vector<PlatformHandle> handles) { |
| DCHECK(owning_task_runner()->RunsTasksInCurrentSequence()); |
| |
| RequestContext request_context(RequestContext::Source::SYSTEM); |
| |
| if (payload_size <= sizeof(Header)) { |
| delegate_->OnChannelError(remote_node_name_, this); |
| return; |
| } |
| |
| const Header* header = static_cast<const Header*>(payload); |
| switch (header->type) { |
| case MessageType::ACCEPT_INVITEE: { |
| AcceptInviteeData data; |
| if (GetMessagePayload(payload, payload_size, &data)) { |
| delegate_->OnAcceptInvitee(remote_node_name_, data.inviter_name, |
| data.token); |
| return; |
| } |
| break; |
| } |
| |
| case MessageType::ACCEPT_INVITATION: { |
| AcceptInvitationData data; |
| if (GetMessagePayload(payload, payload_size, &data)) { |
| delegate_->OnAcceptInvitation(remote_node_name_, data.token, |
| data.invitee_name); |
| return; |
| } |
| break; |
| } |
| |
| case MessageType::ADD_BROKER_CLIENT: { |
| AddBrokerClientData data; |
| if (GetMessagePayload(payload, payload_size, &data)) { |
| #if defined(OS_WIN) |
| if (handles.size() != 1) { |
| DLOG(ERROR) << "Dropping invalid AddBrokerClient message."; |
| break; |
| } |
| delegate_->OnAddBrokerClient(remote_node_name_, data.client_name, |
| handles[0].ReleaseHandle()); |
| #else |
| if (!handles.empty()) { |
| DLOG(ERROR) << "Dropping invalid AddBrokerClient message."; |
| break; |
| } |
| delegate_->OnAddBrokerClient(remote_node_name_, data.client_name, |
| data.process_handle); |
| #endif |
| return; |
| } |
| break; |
| } |
| |
| case MessageType::BROKER_CLIENT_ADDED: { |
| BrokerClientAddedData data; |
| if (GetMessagePayload(payload, payload_size, &data)) { |
| if (handles.size() != 1) { |
| DLOG(ERROR) << "Dropping invalid BrokerClientAdded message."; |
| break; |
| } |
| delegate_->OnBrokerClientAdded(remote_node_name_, data.client_name, |
| std::move(handles[0])); |
| return; |
| } |
| break; |
| } |
| |
| case MessageType::ACCEPT_BROKER_CLIENT: { |
| AcceptBrokerClientData data; |
| if (GetMessagePayload(payload, payload_size, &data)) { |
| PlatformHandle broker_channel; |
| if (handles.size() > 1) { |
| DLOG(ERROR) << "Dropping invalid AcceptBrokerClient message."; |
| break; |
| } |
| if (handles.size() == 1) |
| broker_channel = std::move(handles[0]); |
| |
| delegate_->OnAcceptBrokerClient(remote_node_name_, data.broker_name, |
| std::move(broker_channel)); |
| return; |
| } |
| break; |
| } |
| |
| case MessageType::EVENT_MESSAGE: { |
| Channel::MessagePtr message( |
| new Channel::Message(payload_size, handles.size())); |
| message->SetHandles(std::move(handles)); |
| memcpy(message->mutable_payload(), payload, payload_size); |
| delegate_->OnEventMessage(remote_node_name_, std::move(message)); |
| return; |
| } |
| |
| case MessageType::REQUEST_PORT_MERGE: { |
| RequestPortMergeData data; |
| if (GetMessagePayload(payload, payload_size, &data)) { |
| // Don't accept an empty token. |
| size_t token_size = payload_size - sizeof(data) - sizeof(Header); |
| if (token_size == 0) |
| break; |
| std::string token(reinterpret_cast<const char*>(payload) + |
| sizeof(Header) + sizeof(data), |
| token_size); |
| delegate_->OnRequestPortMerge(remote_node_name_, |
| data.connector_port_name, token); |
| return; |
| } |
| break; |
| } |
| |
| case MessageType::REQUEST_INTRODUCTION: { |
| IntroductionData data; |
| if (GetMessagePayload(payload, payload_size, &data)) { |
| delegate_->OnRequestIntroduction(remote_node_name_, data.name); |
| return; |
| } |
| break; |
| } |
| |
| case MessageType::INTRODUCE: { |
| IntroductionData data; |
| if (GetMessagePayload(payload, payload_size, &data)) { |
| if (handles.size() > 1) { |
| DLOG(ERROR) << "Dropping invalid introduction message."; |
| break; |
| } |
| PlatformHandle channel_handle; |
| if (handles.size() == 1) |
| channel_handle = std::move(handles[0]); |
| |
| delegate_->OnIntroduce(remote_node_name_, data.name, |
| std::move(channel_handle)); |
| return; |
| } |
| break; |
| } |
| |
| #if defined(OS_WIN) |
| case MessageType::RELAY_EVENT_MESSAGE: { |
| base::ProcessHandle from_process; |
| { |
| base::AutoLock lock(remote_process_handle_lock_); |
| // NOTE: It's safe to retain a weak reference to this process handle |
| // through the extent of this call because |this| is kept alive and |
| // |remote_process_handle_| is never reset once set. |
| from_process = remote_process_handle_.Handle(); |
| } |
| RelayEventMessageData data; |
| if (GetMessagePayload(payload, payload_size, &data)) { |
| // Don't try to relay an empty message. |
| if (payload_size <= sizeof(Header) + sizeof(data)) |
| break; |
| |
| const void* message_start = reinterpret_cast<const uint8_t*>(payload) + |
| sizeof(Header) + sizeof(data); |
| Channel::MessagePtr message = Channel::Message::Deserialize( |
| message_start, payload_size - sizeof(Header) - sizeof(data), |
| from_process); |
| if (!message) { |
| DLOG(ERROR) << "Dropping invalid relay message."; |
| break; |
| } |
| delegate_->OnRelayEventMessage(remote_node_name_, from_process, |
| data.destination, std::move(message)); |
| return; |
| } |
| break; |
| } |
| #endif |
| |
| case MessageType::BROADCAST_EVENT: { |
| if (payload_size <= sizeof(Header)) |
| break; |
| const void* data = static_cast<const void*>( |
| reinterpret_cast<const Header*>(payload) + 1); |
| Channel::MessagePtr message = |
| Channel::Message::Deserialize(data, payload_size - sizeof(Header)); |
| if (!message || message->has_handles()) { |
| DLOG(ERROR) << "Dropping invalid broadcast message."; |
| break; |
| } |
| delegate_->OnBroadcast(remote_node_name_, std::move(message)); |
| return; |
| } |
| |
| #if defined(OS_WIN) |
| case MessageType::EVENT_MESSAGE_FROM_RELAY: { |
| EventMessageFromRelayData data; |
| if (GetMessagePayload(payload, payload_size, &data)) { |
| if (payload_size < (sizeof(Header) + sizeof(data))) |
| break; |
| |
| size_t num_bytes = payload_size - sizeof(data) - sizeof(Header); |
| |
| Channel::MessagePtr message( |
| new Channel::Message(num_bytes, handles.size())); |
| message->SetHandles(std::move(handles)); |
| if (num_bytes) |
| memcpy(message->mutable_payload(), |
| static_cast<const uint8_t*>(payload) + sizeof(Header) + |
| sizeof(data), |
| num_bytes); |
| delegate_->OnEventMessageFromRelay(remote_node_name_, data.source, |
| std::move(message)); |
| return; |
| } |
| break; |
| } |
| #endif // defined(OS_WIN) |
| |
| case MessageType::ACCEPT_PEER: { |
| AcceptPeerData data; |
| if (GetMessagePayload(payload, payload_size, &data)) { |
| delegate_->OnAcceptPeer(remote_node_name_, data.token, data.peer_name, |
| data.port_name); |
| return; |
| } |
| break; |
| } |
| |
| case MessageType::BIND_BROKER_HOST: |
| if (handles.size() == 1) { |
| CreateAndBindLocalBrokerHost(std::move(handles[0])); |
| return; |
| } |
| break; |
| |
| default: |
| // Ignore unrecognized message types, allowing for future extensibility. |
| return; |
| } |
| |
| DLOG(ERROR) << "Received invalid message. Closing channel."; |
| if (process_error_callback_) |
| process_error_callback_.Run("NodeChannel received a malformed message"); |
| delegate_->OnChannelError(remote_node_name_, this); |
| } |
| |
| void NodeChannel::OnChannelError(Channel::Error error) { |
| DCHECK(owning_task_runner()->RunsTasksInCurrentSequence()); |
| |
| RequestContext request_context(RequestContext::Source::SYSTEM); |
| |
| ShutDown(); |
| |
| if (process_error_callback_ && |
| error == Channel::Error::kReceivedMalformedData) { |
| process_error_callback_.Run("Channel received a malformed message"); |
| } |
| |
| // |OnChannelError()| may cause |this| to be destroyed, but still need access |
| // to the name after that destruction. So make a copy of |
| // |remote_node_name_| so it can be used if |this| becomes destroyed. |
| ports::NodeName node_name = remote_node_name_; |
| delegate_->OnChannelError(node_name, this); |
| } |
| |
| void NodeChannel::WriteChannelMessage(Channel::MessagePtr message) { |
| base::AutoLock lock(channel_lock_); |
| if (!channel_) |
| DLOG(ERROR) << "Dropping message on closed channel."; |
| else |
| channel_->Write(std::move(message)); |
| } |
| |
| } // namespace core |
| } // namespace mojo |