blob: c48fb573fea961aa4125c25f065a5177e26b7732 [file] [log] [blame]
// Copyright 2016 The Chromium Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
#include "mojo/core/node_channel.h"
#include <cstring>
#include <limits>
#include <sstream>
#include "base/bind.h"
#include "base/location.h"
#include "base/logging.h"
#include "base/memory/ptr_util.h"
#include "mojo/core/broker_host.h"
#include "mojo/core/channel.h"
#include "mojo/core/configuration.h"
#include "mojo/core/core.h"
#include "mojo/core/request_context.h"
namespace mojo {
namespace core {
namespace {
// Identifies the kind of a NodeChannel message. The value is stored in
// Header::type at the front of every message payload and is switched on in
// NodeChannel::OnChannelMessage().
//
// NOTE: Please ONLY append messages to the end of this enum.
//
// The entries guarded by OS_WIN are compiled out on other platforms, so the
// numeric values of the enumerators that follow them depend on whether OS_WIN
// is defined.
enum class MessageType : uint32_t {
ACCEPT_INVITEE,
ACCEPT_INVITATION,
ADD_BROKER_CLIENT,
BROKER_CLIENT_ADDED,
ACCEPT_BROKER_CLIENT,
EVENT_MESSAGE,
REQUEST_PORT_MERGE,
REQUEST_INTRODUCTION,
INTRODUCE,
#if defined(OS_WIN)
RELAY_EVENT_MESSAGE,
#endif
BROADCAST_EVENT,
#if defined(OS_WIN)
EVENT_MESSAGE_FROM_RELAY,
#endif
ACCEPT_PEER,
BIND_BROKER_HOST,
};
// The structs below describe the bytes that travel on the wire. They are
// declared under 1-byte member packing and each one is explicitly aligned to
// 8 bytes with alignas(8).
#pragma pack(push, 1)
// Prefix of every NodeChannel message: CreateMessage() writes it at the start
// of the payload and OnChannelMessage() reads |type| to pick a handler.
struct alignas(8) Header {
MessageType type;
};
static_assert(IsAlignedForChannelMessage(sizeof(Header)),
"Invalid header size.");
// Payload of ACCEPT_INVITEE.
struct alignas(8) AcceptInviteeDataV0 {
ports::NodeName inviter_name;
ports::NodeName token;
};
using AcceptInviteeData = AcceptInviteeDataV0;
// Payload of ACCEPT_INVITATION.
struct alignas(8) AcceptInvitationDataV0 {
ports::NodeName token;
ports::NodeName invitee_name;
};
using AcceptInvitationData = AcceptInvitationDataV0;
// Payload of ACCEPT_PEER.
struct alignas(8) AcceptPeerDataV0 {
ports::NodeName token;
ports::NodeName peer_name;
ports::PortName port_name;
};
using AcceptPeerData = AcceptPeerDataV0;
// Payload of ADD_BROKER_CLIENT.
// This message may include a process handle on platforms that require it.
// On non-Windows builds the handle value is carried inline in
// |process_handle|; on Windows the field does not exist and the sender
// attaches the process as a platform handle instead.
struct alignas(8) AddBrokerClientDataV0 {
ports::NodeName client_name;
#if !defined(OS_WIN)
uint32_t process_handle = 0;
#endif
};
using AddBrokerClientData = AddBrokerClientDataV0;
#if !defined(OS_WIN)
// |process_handle| above is a uint32_t, so base::ProcessHandle must fit in it.
static_assert(sizeof(base::ProcessHandle) == sizeof(uint32_t),
"Unexpected pid size");
static_assert(sizeof(AddBrokerClientData) % kChannelMessageAlignment == 0,
"Invalid AddBrokerClientData size.");
#endif
// Payload of BROKER_CLIENT_ADDED.
// This data is followed by a platform channel handle to the broker.
struct alignas(8) BrokerClientAddedDataV0 {
ports::NodeName client_name;
};
using BrokerClientAddedData = BrokerClientAddedDataV0;
// Payload of ACCEPT_BROKER_CLIENT.
// This data may be followed by a platform channel handle to the broker. If not,
// then the inviter is the broker and its channel should be used as such.
struct alignas(8) AcceptBrokerClientDataV0 {
ports::NodeName broker_name;
};
using AcceptBrokerClientData = AcceptBrokerClientDataV0;
// Payload of REQUEST_PORT_MERGE.
// This is followed by arbitrary payload data which is interpreted as a token
// string for port location.
// NOTE: Because this field is variable length it cannot be versioned.
struct alignas(8) RequestPortMergeData {
ports::PortName connector_port_name;
};
// Used for both REQUEST_INTRODUCTION and INTRODUCE.
//
// For INTRODUCE the message also includes a valid platform handle for a channel
// the receiver may use to communicate with the named node directly, or an
// invalid platform handle if the node is unknown to the sender or otherwise
// cannot be introduced.
struct alignas(8) IntroductionDataV0 {
ports::NodeName name;
};
using IntroductionData = IntroductionDataV0;
// Payload of BIND_BROKER_HOST.
// This message is just a PlatformHandle. The data struct declares no fields.
// NOTE(review): its alignas(8) is presumably what still yields an aligned,
// non-zero-length payload -- confirm.
struct alignas(8) BindBrokerHostDataV0 {};
using BindBrokerHostData = BindBrokerHostDataV0;
#if defined(OS_WIN)
// Payload prefix of RELAY_EVENT_MESSAGE.
// This struct is followed by the full payload of a message to be relayed.
// NOTE: Because this field is variable length it cannot be versioned.
struct alignas(8) RelayEventMessageData {
ports::NodeName destination;
};
// Payload prefix of EVENT_MESSAGE_FROM_RELAY.
// This struct is followed by the full payload of a relayed message.
struct alignas(8) EventMessageFromRelayDataV0 {
ports::NodeName source;
};
using EventMessageFromRelayData = EventMessageFromRelayDataV0;
#endif
#pragma pack(pop)
// Allocates a Channel::Message whose payload begins with a zeroed Header
// carrying |type|, followed by |payload_size| bytes for the caller to fill.
//
// |out_data| receives a pointer to the first byte after the Header.
// |capacity| is a requested buffer capacity; it is raised to the full wire
// size (Header + payload) whenever it is smaller, which includes the default
// of zero.
Channel::MessagePtr CreateMessage(MessageType type,
                                  size_t payload_size,
                                  size_t num_handles,
                                  void** out_data,
                                  size_t capacity = 0) {
  const size_t wire_size = payload_size + sizeof(Header);
  const size_t buffer_capacity = capacity < wire_size ? wire_size : capacity;

  auto message = std::make_unique<Channel::Message>(buffer_capacity, wire_size,
                                                    num_handles);

  // Clear the whole Header first so that any padding bytes go out as zero.
  auto* header = reinterpret_cast<Header*>(message->mutable_payload());
  memset(header, 0, sizeof(Header));
  header->type = type;

  // Caller-owned data lives immediately past the Header.
  *out_data = header + 1;
  return message;
}
// Typed convenience wrapper around the untyped CreateMessage(): builds the
// message, points |*out_data| at the region after the Header, and zeroes the
// first sizeof(DataType) bytes there so struct padding is never sent
// uninitialized. |payload_size| may exceed sizeof(DataType) when trailing
// variable-length data follows the struct.
template <typename DataType>
Channel::MessagePtr CreateMessage(MessageType type,
                                  size_t payload_size,
                                  size_t num_handles,
                                  DataType** out_data,
                                  size_t capacity = 0) {
  void** const untyped_out = reinterpret_cast<void**>(out_data);
  auto message =
      CreateMessage(type, payload_size, num_handles, untyped_out, capacity);
  memset(*out_data, 0, sizeof(DataType));
  return message;
}
// Copies the fixed-size payload that follows the Header in |bytes| into
// |out_data|.
//
// MinSizedDataType is the smallest struct a sender may legitimately have
// sent; it MUST name the oldest version of the message whenever more than one
// version exists. Returns false, leaving |out_data| untouched, if fewer than
// sizeof(Header) + sizeof(MinSizedDataType) bytes were received.
template <typename DataType, typename MinSizedDataType>
bool GetMessagePayloadMinimumSized(const void* bytes,
                                   size_t num_bytes,
                                   DataType* out_data) {
  static_assert(sizeof(DataType) > 0, "DataType must have non-zero size.");

  constexpr size_t kMinimumBytes = sizeof(Header) + sizeof(MinSizedDataType);
  if (num_bytes < kMinimumBytes)
    return false;

  // Zero the full object and then default construct it, since the sender may
  // have supplied only a prefix of DataType. Default construction lets member
  // initializers provide values for fields an older sender did not include.
  memset(out_data, 0, sizeof(DataType));
  new (out_data) DataType;

  // Overwrite whichever fields actually arrived, never reading or writing
  // beyond sizeof(DataType).
  const size_t received = num_bytes - sizeof(Header);
  const size_t copy_size =
      received < sizeof(DataType) ? received : sizeof(DataType);
  const uint8_t* source = static_cast<const uint8_t*>(bytes) + sizeof(Header);
  memcpy(out_data, source, copy_size);
  return true;
}
// Reads a payload whose minimum acceptable size is the full DataType, i.e. a
// message with only a single version.
template <typename DataType>
bool GetMessagePayload(const void* bytes,
                       size_t num_bytes,
                       DataType* out_data) {
  using MinSizedDataType = DataType;
  return GetMessagePayloadMinimumSized<DataType, MinSizedDataType>(
      bytes, num_bytes, out_data);
}
} // namespace
// static
// Factory for NodeChannel. On NaCl-SFI builds this logs a fatal error instead
// of constructing a channel.
scoped_refptr<NodeChannel> NodeChannel::Create(
    Delegate* delegate,
    ConnectionParams connection_params,
    Channel::HandlePolicy channel_handle_policy,
    scoped_refptr<base::SingleThreadTaskRunner> io_task_runner,
    const ProcessErrorCallback& process_error_callback) {
#if !defined(OS_NACL_SFI)
  return new NodeChannel(delegate, std::move(connection_params),
                         channel_handle_policy, io_task_runner,
                         process_error_callback);
#else
  LOG(FATAL) << "Multi-process not yet supported on NaCl-SFI";
  return nullptr;
#endif
}
// static
// Builds an EVENT_MESSAGE with room for |payload_size| bytes after the Header;
// |*payload| is set to the start of that region.
Channel::MessagePtr NodeChannel::CreateEventMessage(size_t capacity,
                                                    size_t payload_size,
                                                    void** payload,
                                                    size_t num_handles) {
  constexpr MessageType kType = MessageType::EVENT_MESSAGE;
  return CreateMessage(kType, payload_size, num_handles, payload, capacity);
}
// static
// Exposes the event data stored in |message|: |*data| points just past the
// Header and |*num_data_bytes| is the payload size minus the Header.
void NodeChannel::GetEventMessageData(Channel::Message* message,
                                      void** data,
                                      size_t* num_data_bytes) {
  // NOTE: OnChannelMessage guarantees that we never accept a Channel::Message
  // with a payload of fewer than |sizeof(Header)| bytes.
  Header* const header = reinterpret_cast<Header*>(message->mutable_payload());
  *data = header + 1;
  *num_data_bytes = message->payload_size() - sizeof(Header);
}
// Starts the underlying channel, if there still is one.
void NodeChannel::Start() {
  base::AutoLock lock(channel_lock_);
  // ShutDown() may have already been called, in which case |channel_| is null.
  if (!channel_)
    return;
  channel_->Start();
}
// Shuts the underlying channel down and drops our reference to it. Calling
// this again afterwards is a no-op.
void NodeChannel::ShutDown() {
  base::AutoLock lock(channel_lock_);
  if (!channel_)
    return;
  channel_->ShutDown();
  channel_ = nullptr;
}
// Forwards to Channel::LeakHandle() on the underlying channel, if it still
// exists.
void NodeChannel::LeakHandleOnShutdown() {
  base::AutoLock lock(channel_lock_);
  if (!channel_)
    return;
  channel_->LeakHandle();
}
// Reports |error| through the process error callback, prefixed to mark it as
// a bad user message. Callers must ensure a handler is installed.
void NodeChannel::NotifyBadMessage(const std::string& error) {
  DCHECK(HasBadMessageHandler());
  const std::string report = "Received bad user message: " + error;
  process_error_callback_.Run(report);
}
// Records the process on the other end of this channel. A duplicate of the
// handle is handed to the underlying channel (if any) and the original is
// stored in |remote_process_handle_|. Must run on the owning sequence.
void NodeChannel::SetRemoteProcessHandle(base::Process process_handle) {
  DCHECK(owning_task_runner()->RunsTasksInCurrentSequence());

  // |channel_lock_| is released before |remote_process_handle_lock_| is
  // acquired; the two are never held together here.
  {
    base::AutoLock channel_guard(channel_lock_);
    if (channel_) {
      channel_->set_remote_process(process_handle.Duplicate());
    }
  }

  base::AutoLock handle_guard(remote_process_handle_lock_);
  // The remote process may only be set once.
  DCHECK(!remote_process_handle_.IsValid());
  // NOTE(review): this compares the handle currently stored in the member
  // (expected to be invalid per the DCHECK above), not the incoming
  // |process_handle| -- confirm which one was intended.
  CHECK_NE(remote_process_handle_.Handle(), base::GetCurrentProcessHandle());
  remote_process_handle_ = std::move(process_handle);
}
// Returns whether a valid remote process handle has been stored.
bool NodeChannel::HasRemoteProcessHandle() {
  base::AutoLock lock(remote_process_handle_lock_);
  const bool has_handle = remote_process_handle_.IsValid();
  return has_handle;
}
// Returns a duplicate of the stored remote process handle, taken under
// |remote_process_handle_lock_|.
base::Process NodeChannel::CloneRemoteProcessHandle() {
  base::AutoLock lock(remote_process_handle_lock_);
  base::Process duplicate = remote_process_handle_.Duplicate();
  return duplicate;
}
// Stores |name| as the name of the node on the other end of this channel. It
// is subsequently passed to the delegate as the sender of every dispatched
// message. Must be called on the owning sequence.
void NodeChannel::SetRemoteNodeName(const ports::NodeName& name) {
DCHECK(owning_task_runner()->RunsTasksInCurrentSequence());
remote_node_name_ = name;
}
// Sends an ACCEPT_INVITEE message carrying |inviter_name| and |token|.
void NodeChannel::AcceptInvitee(const ports::NodeName& inviter_name,
                                const ports::NodeName& token) {
  AcceptInviteeData* payload = nullptr;
  auto msg = CreateMessage(MessageType::ACCEPT_INVITEE,
                           sizeof(AcceptInviteeData), 0, &payload);
  payload->inviter_name = inviter_name;
  payload->token = token;
  WriteChannelMessage(std::move(msg));
}
// Sends an ACCEPT_INVITATION message carrying |token| and |invitee_name|.
void NodeChannel::AcceptInvitation(const ports::NodeName& token,
                                   const ports::NodeName& invitee_name) {
  AcceptInvitationData* payload = nullptr;
  auto msg = CreateMessage(MessageType::ACCEPT_INVITATION,
                           sizeof(AcceptInvitationData), 0, &payload);
  payload->token = token;
  payload->invitee_name = invitee_name;
  WriteChannelMessage(std::move(msg));
}
// Sends an ACCEPT_PEER message. |sender_name| is transmitted in the
// |peer_name| field of the payload.
void NodeChannel::AcceptPeer(const ports::NodeName& sender_name,
                             const ports::NodeName& token,
                             const ports::PortName& port_name) {
  AcceptPeerData* payload = nullptr;
  auto msg = CreateMessage(MessageType::ACCEPT_PEER, sizeof(AcceptPeerData), 0,
                           &payload);
  payload->token = token;
  payload->peer_name = sender_name;
  payload->port_name = port_name;
  WriteChannelMessage(std::move(msg));
}
// Sends an ADD_BROKER_CLIENT message for |client_name|. On Windows the
// client's process is attached to the message as a platform handle; elsewhere
// its handle value is written into the payload.
void NodeChannel::AddBrokerClient(const ports::NodeName& client_name,
                                  base::Process process_handle) {
  std::vector<PlatformHandle> attached;
#if defined(OS_WIN)
  attached.emplace_back(base::win::ScopedHandle(process_handle.Release()));
#endif

  AddBrokerClientData* payload = nullptr;
  auto msg = CreateMessage(MessageType::ADD_BROKER_CLIENT,
                           sizeof(AddBrokerClientData), attached.size(),
                           &payload);
  msg->SetHandles(std::move(attached));

  payload->client_name = client_name;
#if !defined(OS_WIN)
  payload->process_handle = process_handle.Handle();
#endif
  WriteChannelMessage(std::move(msg));
}
// Sends a BROKER_CLIENT_ADDED message for |client_name|, attaching
// |broker_channel| when it is valid.
void NodeChannel::BrokerClientAdded(const ports::NodeName& client_name,
                                    PlatformHandle broker_channel) {
  std::vector<PlatformHandle> attached;
  if (broker_channel.is_valid()) {
    attached.emplace_back(std::move(broker_channel));
  }

  BrokerClientAddedData* payload = nullptr;
  auto msg = CreateMessage(MessageType::BROKER_CLIENT_ADDED,
                           sizeof(BrokerClientAddedData), attached.size(),
                           &payload);
  msg->SetHandles(std::move(attached));
  payload->client_name = client_name;
  WriteChannelMessage(std::move(msg));
}
// Sends an ACCEPT_BROKER_CLIENT message naming |broker_name|, attaching
// |broker_channel| when it is valid.
void NodeChannel::AcceptBrokerClient(const ports::NodeName& broker_name,
                                     PlatformHandle broker_channel) {
  std::vector<PlatformHandle> attached;
  if (broker_channel.is_valid()) {
    attached.emplace_back(std::move(broker_channel));
  }

  AcceptBrokerClientData* payload = nullptr;
  auto msg = CreateMessage(MessageType::ACCEPT_BROKER_CLIENT,
                           sizeof(AcceptBrokerClientData), attached.size(),
                           &payload);
  msg->SetHandles(std::move(attached));
  payload->broker_name = broker_name;
  WriteChannelMessage(std::move(msg));
}
// Sends a REQUEST_PORT_MERGE message: the fixed struct holding
// |connector_port_name| followed directly by the raw bytes of |token|.
void NodeChannel::RequestPortMerge(const ports::PortName& connector_port_name,
                                   const std::string& token) {
  const size_t token_size = token.size();
  RequestPortMergeData* payload = nullptr;
  auto msg = CreateMessage(MessageType::REQUEST_PORT_MERGE,
                           sizeof(RequestPortMergeData) + token_size, 0,
                           &payload);
  payload->connector_port_name = connector_port_name;
  // The token bytes trail the fixed-size struct.
  memcpy(payload + 1, token.data(), token_size);
  WriteChannelMessage(std::move(msg));
}
// Sends a REQUEST_INTRODUCTION message for the node called |name|.
void NodeChannel::RequestIntroduction(const ports::NodeName& name) {
  IntroductionData* payload = nullptr;
  auto msg = CreateMessage(MessageType::REQUEST_INTRODUCTION,
                           sizeof(IntroductionData), 0, &payload);
  payload->name = name;
  WriteChannelMessage(std::move(msg));
}
// Sends an INTRODUCE message for the node called |name|. |channel_handle| is
// attached only when valid; otherwise the message carries no handle.
void NodeChannel::Introduce(const ports::NodeName& name,
                            PlatformHandle channel_handle) {
  std::vector<PlatformHandle> attached;
  if (channel_handle.is_valid()) {
    attached.emplace_back(std::move(channel_handle));
  }

  IntroductionData* payload = nullptr;
  auto msg = CreateMessage(MessageType::INTRODUCE, sizeof(IntroductionData),
                           attached.size(), &payload);
  msg->SetHandles(std::move(attached));
  payload->name = name;
  WriteChannelMessage(std::move(msg));
}
// Writes an already-built |message| to the channel unchanged. The message is
// dropped (with a debug log) by WriteChannelMessage() if the channel has
// already been shut down.
void NodeChannel::SendChannelMessage(Channel::MessagePtr message) {
WriteChannelMessage(std::move(message));
}
// Wraps the serialized bytes of |message| in a BROADCAST_EVENT message and
// sends it. |message| must not carry handles.
void NodeChannel::Broadcast(Channel::MessagePtr message) {
  DCHECK(!message->has_handles());

  const size_t serialized_size = message->data_num_bytes();
  void* payload = nullptr;
  auto wrapper = CreateMessage(MessageType::BROADCAST_EVENT, serialized_size,
                               0, &payload);
  memcpy(payload, message->data(), serialized_size);
  WriteChannelMessage(std::move(wrapper));
}
// Sends a BIND_BROKER_HOST message whose only content of interest is the
// attached |broker_host_handle|. Compiles to a no-op on Apple, NaCl and
// Fuchsia builds.
void NodeChannel::BindBrokerHost(PlatformHandle broker_host_handle) {
#if !defined(OS_APPLE) && !defined(OS_NACL) && !defined(OS_FUCHSIA)
  DCHECK(broker_host_handle.is_valid());

  std::vector<PlatformHandle> attached;
  attached.push_back(std::move(broker_host_handle));

  // The payload struct has no fields to fill in; CreateMessage() zeroes it.
  BindBrokerHostData* payload;
  auto msg = CreateMessage(MessageType::BIND_BROKER_HOST,
                           sizeof(BindBrokerHostData), attached.size(),
                           &payload);
  msg->SetHandles(std::move(attached));
  WriteChannelMessage(std::move(msg));
#endif
}
#if defined(OS_WIN)
// Windows only. Sends |message| for relaying to |destination| by embedding
// its complete serialized form in a RELAY_EVENT_MESSAGE.
void NodeChannel::RelayEventMessage(const ports::NodeName& destination,
                                    Channel::MessagePtr message) {
  DCHECK(message->has_handles());

  // Note that this is only used on Windows, and on Windows all platform
  // handles are included in the message data. We blindly copy all the data
  // here and the relay node (the broker) will duplicate handles as needed.
  const size_t serialized_size = message->data_num_bytes();
  RelayEventMessageData* relay_data = nullptr;
  auto relay_message = CreateMessage(
      MessageType::RELAY_EVENT_MESSAGE,
      sizeof(RelayEventMessageData) + serialized_size, 0, &relay_data);
  relay_data->destination = destination;
  memcpy(relay_data + 1, message->data(), serialized_size);

  // When the handles are duplicated in the broker, the source handles will
  // be closed. If the broker never receives this message then these handles
  // will leak, but that means something else has probably broken and the
  // sending process won't likely be around much longer.
  //
  // TODO(https://crbug.com/813112): We would like to be able to violate the
  // above stated assumption. We should not leak handles in cases where we
  // outlive the broker, as we may continue existing and eventually accept a new
  // broker invitation.
  std::vector<PlatformHandleInTransit> released = message->TakeHandles();
  for (PlatformHandleInTransit& in_transit : released) {
    // Deliberately give up ownership without closing.
    in_transit.TakeHandle().release();
  }

  WriteChannelMessage(std::move(relay_message));
}
// Windows only. Forwards a relayed |message| in an EVENT_MESSAGE_FROM_RELAY:
// |source| goes in the fixed struct, the original payload bytes follow it,
// and the original handles are transferred to the new message.
void NodeChannel::EventMessageFromRelay(const ports::NodeName& source,
                                        Channel::MessagePtr message) {
  const size_t inner_size = message->payload_size();

  EventMessageFromRelayData* relay_data = nullptr;
  auto relayed_message = CreateMessage(
      MessageType::EVENT_MESSAGE_FROM_RELAY,
      sizeof(EventMessageFromRelayData) + inner_size, message->num_handles(),
      &relay_data);
  relay_data->source = source;
  if (inner_size != 0) {
    memcpy(relay_data + 1, message->payload(), inner_size);
  }
  relayed_message->SetHandles(message->TakeHandles());
  WriteChannelMessage(std::move(relayed_message));
}
#endif // defined(OS_WIN)
// Constructs a NodeChannel that reports to |delegate| and, except on NaCl-SFI
// builds, creates the underlying Channel from |connection_params| with this
// object as the Channel's delegate.
//
// |io_task_runner| is copied into the RefCountedDeleteOnSequence base and then
// moved into Channel::Create().
NodeChannel::NodeChannel(
Delegate* delegate,
ConnectionParams connection_params,
Channel::HandlePolicy channel_handle_policy,
scoped_refptr<base::SingleThreadTaskRunner> io_task_runner,
const ProcessErrorCallback& process_error_callback)
: base::RefCountedDeleteOnSequence<NodeChannel>(io_task_runner),
delegate_(delegate),
process_error_callback_(process_error_callback)
#if !defined(OS_NACL_SFI)
,
channel_(Channel::Create(this,
std::move(connection_params),
channel_handle_policy,
std::move(io_task_runner)))
#endif
{
}
// Ensures the underlying channel is shut down before the object goes away.
// ShutDown() does nothing if the channel was already shut down.
NodeChannel::~NodeChannel() {
ShutDown();
}
// Creates a BrokerHost for the remote process, connected through
// |broker_host_handle|. Compiles to a no-op on Apple, NaCl and Fuchsia builds.
void NodeChannel::CreateAndBindLocalBrokerHost(
    PlatformHandle broker_host_handle) {
#if !defined(OS_APPLE) && !defined(OS_NACL) && !defined(OS_FUCHSIA)
  PlatformChannelEndpoint endpoint(std::move(broker_host_handle));
  ConnectionParams connection_params(std::move(endpoint));

  // Self-owned.
  new BrokerHost(remote_process_handle_.Duplicate(),
                 std::move(connection_params), process_error_callback_);
#endif
}
void NodeChannel::OnChannelMessage(const void* payload,
size_t payload_size,
std::vector<PlatformHandle> handles) {
DCHECK(owning_task_runner()->RunsTasksInCurrentSequence());
RequestContext request_context(RequestContext::Source::SYSTEM);
if (payload_size <= sizeof(Header)) {
delegate_->OnChannelError(remote_node_name_, this);
return;
}
const Header* header = static_cast<const Header*>(payload);
switch (header->type) {
case MessageType::ACCEPT_INVITEE: {
AcceptInviteeData data;
if (GetMessagePayload(payload, payload_size, &data)) {
delegate_->OnAcceptInvitee(remote_node_name_, data.inviter_name,
data.token);
return;
}
break;
}
case MessageType::ACCEPT_INVITATION: {
AcceptInvitationData data;
if (GetMessagePayload(payload, payload_size, &data)) {
delegate_->OnAcceptInvitation(remote_node_name_, data.token,
data.invitee_name);
return;
}
break;
}
case MessageType::ADD_BROKER_CLIENT: {
AddBrokerClientData data;
if (GetMessagePayload(payload, payload_size, &data)) {
#if defined(OS_WIN)
if (handles.size() != 1) {
DLOG(ERROR) << "Dropping invalid AddBrokerClient message.";
break;
}
delegate_->OnAddBrokerClient(remote_node_name_, data.client_name,
handles[0].ReleaseHandle());
#else
if (!handles.empty()) {
DLOG(ERROR) << "Dropping invalid AddBrokerClient message.";
break;
}
delegate_->OnAddBrokerClient(remote_node_name_, data.client_name,
data.process_handle);
#endif
return;
}
break;
}
case MessageType::BROKER_CLIENT_ADDED: {
BrokerClientAddedData data;
if (GetMessagePayload(payload, payload_size, &data)) {
if (handles.size() != 1) {
DLOG(ERROR) << "Dropping invalid BrokerClientAdded message.";
break;
}
delegate_->OnBrokerClientAdded(remote_node_name_, data.client_name,
std::move(handles[0]));
return;
}
break;
}
case MessageType::ACCEPT_BROKER_CLIENT: {
AcceptBrokerClientData data;
if (GetMessagePayload(payload, payload_size, &data)) {
PlatformHandle broker_channel;
if (handles.size() > 1) {
DLOG(ERROR) << "Dropping invalid AcceptBrokerClient message.";
break;
}
if (handles.size() == 1)
broker_channel = std::move(handles[0]);
delegate_->OnAcceptBrokerClient(remote_node_name_, data.broker_name,
std::move(broker_channel));
return;
}
break;
}
case MessageType::EVENT_MESSAGE: {
Channel::MessagePtr message(
new Channel::Message(payload_size, handles.size()));
message->SetHandles(std::move(handles));
memcpy(message->mutable_payload(), payload, payload_size);
delegate_->OnEventMessage(remote_node_name_, std::move(message));
return;
}
case MessageType::REQUEST_PORT_MERGE: {
RequestPortMergeData data;
if (GetMessagePayload(payload, payload_size, &data)) {
// Don't accept an empty token.
size_t token_size = payload_size - sizeof(data) - sizeof(Header);
if (token_size == 0)
break;
std::string token(reinterpret_cast<const char*>(payload) +
sizeof(Header) + sizeof(data),
token_size);
delegate_->OnRequestPortMerge(remote_node_name_,
data.connector_port_name, token);
return;
}
break;
}
case MessageType::REQUEST_INTRODUCTION: {
IntroductionData data;
if (GetMessagePayload(payload, payload_size, &data)) {
delegate_->OnRequestIntroduction(remote_node_name_, data.name);
return;
}
break;
}
case MessageType::INTRODUCE: {
IntroductionData data;
if (GetMessagePayload(payload, payload_size, &data)) {
if (handles.size() > 1) {
DLOG(ERROR) << "Dropping invalid introduction message.";
break;
}
PlatformHandle channel_handle;
if (handles.size() == 1)
channel_handle = std::move(handles[0]);
delegate_->OnIntroduce(remote_node_name_, data.name,
std::move(channel_handle));
return;
}
break;
}
#if defined(OS_WIN)
case MessageType::RELAY_EVENT_MESSAGE: {
base::ProcessHandle from_process;
{
base::AutoLock lock(remote_process_handle_lock_);
// NOTE: It's safe to retain a weak reference to this process handle
// through the extent of this call because |this| is kept alive and
// |remote_process_handle_| is never reset once set.
from_process = remote_process_handle_.Handle();
}
RelayEventMessageData data;
if (GetMessagePayload(payload, payload_size, &data)) {
// Don't try to relay an empty message.
if (payload_size <= sizeof(Header) + sizeof(data))
break;
const void* message_start = reinterpret_cast<const uint8_t*>(payload) +
sizeof(Header) + sizeof(data);
Channel::MessagePtr message = Channel::Message::Deserialize(
message_start, payload_size - sizeof(Header) - sizeof(data),
from_process);
if (!message) {
DLOG(ERROR) << "Dropping invalid relay message.";
break;
}
delegate_->OnRelayEventMessage(remote_node_name_, from_process,
data.destination, std::move(message));
return;
}
break;
}
#endif
case MessageType::BROADCAST_EVENT: {
if (payload_size <= sizeof(Header))
break;
const void* data = static_cast<const void*>(
reinterpret_cast<const Header*>(payload) + 1);
Channel::MessagePtr message =
Channel::Message::Deserialize(data, payload_size - sizeof(Header));
if (!message || message->has_handles()) {
DLOG(ERROR) << "Dropping invalid broadcast message.";
break;
}
delegate_->OnBroadcast(remote_node_name_, std::move(message));
return;
}
#if defined(OS_WIN)
case MessageType::EVENT_MESSAGE_FROM_RELAY: {
EventMessageFromRelayData data;
if (GetMessagePayload(payload, payload_size, &data)) {
if (payload_size < (sizeof(Header) + sizeof(data)))
break;
size_t num_bytes = payload_size - sizeof(data) - sizeof(Header);
Channel::MessagePtr message(
new Channel::Message(num_bytes, handles.size()));
message->SetHandles(std::move(handles));
if (num_bytes)
memcpy(message->mutable_payload(),
static_cast<const uint8_t*>(payload) + sizeof(Header) +
sizeof(data),
num_bytes);
delegate_->OnEventMessageFromRelay(remote_node_name_, data.source,
std::move(message));
return;
}
break;
}
#endif // defined(OS_WIN)
case MessageType::ACCEPT_PEER: {
AcceptPeerData data;
if (GetMessagePayload(payload, payload_size, &data)) {
delegate_->OnAcceptPeer(remote_node_name_, data.token, data.peer_name,
data.port_name);
return;
}
break;
}
case MessageType::BIND_BROKER_HOST:
if (handles.size() == 1) {
CreateAndBindLocalBrokerHost(std::move(handles[0]));
return;
}
break;
default:
// Ignore unrecognized message types, allowing for future extensibility.
return;
}
DLOG(ERROR) << "Received invalid message. Closing channel.";
if (process_error_callback_)
process_error_callback_.Run("NodeChannel received a malformed message");
delegate_->OnChannelError(remote_node_name_, this);
}
// Channel::Delegate entry point for channel failure. Shuts the channel down,
// reports malformed-data errors through the process error callback (if set),
// and then notifies the delegate. Must run on the owning sequence.
void NodeChannel::OnChannelError(Channel::Error error) {
  DCHECK(owning_task_runner()->RunsTasksInCurrentSequence());

  RequestContext request_context(RequestContext::Source::SYSTEM);

  ShutDown();

  const bool malformed = error == Channel::Error::kReceivedMalformedData;
  if (malformed && process_error_callback_) {
    process_error_callback_.Run("Channel received a malformed message");
  }

  // |OnChannelError()| may cause |this| to be destroyed, but still need access
  // to the name after that destruction. So pass a stack copy of
  // |remote_node_name_| rather than a reference to the member.
  ports::NodeName name_copy = remote_node_name_;
  delegate_->OnChannelError(name_copy, this);
}
// Writes |message| to the underlying channel, or drops it with a debug log if
// the channel has already been shut down.
void NodeChannel::WriteChannelMessage(Channel::MessagePtr message) {
  base::AutoLock lock(channel_lock_);
  if (channel_) {
    channel_->Write(std::move(message));
    return;
  }
  DLOG(ERROR) << "Dropping message on closed channel.";
}
} // namespace core
} // namespace mojo