| // Copyright 2020 The Chromium Authors. All rights reserved. |
| // Use of this source code is governed by a BSD-style license that can be |
| // found in the LICENSE file. |
| |
| #include "fuchsia/cast_streaming/cast_message_port_impl.h" |
| |
| #include "base/fuchsia/fuchsia_logging.h" |
| #include "base/json/json_reader.h" |
| #include "base/json/json_writer.h" |
| #include "base/logging.h" |
| #include "base/values.h" |
| #include "fuchsia/base/mem_buffer_util.h" |
| #include "third_party/openscreen/src/platform/base/error.h" |
| |
| namespace cast_streaming { |
| |
| namespace { |
| |
// TODO(b/156118960): Remove all these when Cast messages are handled by Open
// Screen.
//
// Message namespaces this port knows about.
constexpr char kMirroringNamespace[] = "urn:x-cast:com.google.cast.webrtc";
constexpr char kRemotingNamespace[] = "urn:x-cast:com.google.cast.remoting";
constexpr char kSystemNamespace[] = "urn:x-cast:com.google.cast.system";

// Keys of the JSON dictionary exchanged over the FIDL MessagePort.
constexpr char kKeySenderId[] = "senderId";
constexpr char kKeyNamespace[] = "namespace";
constexpr char kKeyData[] = "data";

// Sender ID attached to the initial system message.
constexpr char kValueSystemSenderId[] = "SystemSender";

// Payload of the first message posted on the system namespace once the port
// is bound.
constexpr char kInitialConnectMessage[] = R"(
{
"type": "ready",
"activeNamespaces": [
"urn:x-cast:com.google.cast.webrtc",
"urn:x-cast:com.google.cast.remoting"
],
"version": "2.0.0",
"messagesVersion": "1.0"
}
)";

// Cap on the number of messages queued for the FIDL peer. Messages stay
// queued for as long as the other end of the MessagePort has not acknowledged
// the latest message, so a misbehaving peer could otherwise make the queue
// grow without bound. The value is meant to cover the largest burst of
// messages coming from the Open Screen implementation.
constexpr size_t kMaxPendingFidlMessages = 10;
| |
| // Extracts |buffer| data into |sender_id|, |message_namespace| and |message|. |
| // Returns true on success. |
| bool ParseMessageBuffer(const fuchsia::mem::Buffer& buffer, |
| std::string* sender_id, |
| std::string* message_namespace, |
| std::string* message) { |
| std::string string_buffer; |
| if (!cr_fuchsia::StringFromMemBuffer(buffer, &string_buffer)) |
| return false; |
| |
| base::Optional<base::Value> converted_value = |
| base::JSONReader::Read(string_buffer); |
| if (!converted_value) |
| return false; |
| |
| const std::string* sender_id_value = |
| converted_value->FindStringPath(kKeySenderId); |
| if (!sender_id_value) |
| return false; |
| *sender_id = *sender_id_value; |
| |
| const std::string* message_namespace_value = |
| converted_value->FindStringPath(kKeyNamespace); |
| if (!message_namespace_value) |
| return false; |
| *message_namespace = *message_namespace_value; |
| |
| const std::string* message_value = converted_value->FindStringPath(kKeyData); |
| if (!message_value) |
| return false; |
| *message = *message_value; |
| |
| return true; |
| } |
| |
| // Creates a WebMessage out of the |sender_id|, |message_namespace| and |
| // |message|. |
| fuchsia::web::WebMessage CreateWebMessage(absl::string_view sender_id, |
| absl::string_view message_namespace, |
| absl::string_view message) { |
| base::Value value(base::Value::Type::DICTIONARY); |
| value.SetStringKey(kKeyNamespace, std::string(message_namespace)); |
| value.SetStringKey(kKeySenderId, std::string(sender_id)); |
| value.SetStringKey(kKeyData, std::string(message)); |
| |
| std::string json_message; |
| CHECK(base::JSONWriter::Write(value, &json_message)); |
| |
| fuchsia::mem::Buffer buffer; |
| buffer.size = json_message.size(); |
| zx_status_t status = zx::vmo::create(json_message.size(), 0, &buffer.vmo); |
| ZX_DCHECK(status == ZX_OK, status); |
| status = buffer.vmo.write(json_message.data(), 0, json_message.size()); |
| ZX_DCHECK(status == ZX_OK, status); |
| |
| fuchsia::web::WebMessage web_message; |
| web_message.set_data(std::move(buffer)); |
| |
| return web_message; |
| } |
| |
| } // namespace |
| |
| CastMessagePortImpl::CastMessagePortImpl( |
| fidl::InterfaceRequest<fuchsia::web::MessagePort> message_port_request) |
| : message_port_binding_(this, std::move(message_port_request)) { |
| DVLOG(1) << __func__; |
| DCHECK(message_port_binding_.is_bound()); |
| message_port_binding_.set_error_handler([this](zx_status_t status) { |
| ZX_LOG(ERROR, status) << "MessagePort disconnected."; |
| MaybeCloseWithEpitaph(ZX_ERR_BAD_STATE); |
| }); |
| |
| // Initialize the connection with the Cast Streaming Sender. |
| PostMessage(kValueSystemSenderId, kSystemNamespace, kInitialConnectMessage); |
| } |
| |
| CastMessagePortImpl::~CastMessagePortImpl() = default; |
| |
| void CastMessagePortImpl::MaybeSendMessageToFidl() { |
| DVLOG(3) << __func__; |
| if (!receive_message_callback_ || pending_fidl_messages_.empty()) |
| return; |
| |
| receive_message_callback_(std::move(pending_fidl_messages_.front())); |
| receive_message_callback_ = nullptr; |
| pending_fidl_messages_.pop_front(); |
| } |
| |
| void CastMessagePortImpl::MaybeCloseWithEpitaph(zx_status_t epitaph) { |
| if (message_port_binding_.is_bound()) |
| message_port_binding_.Close(epitaph); |
| if (client_) { |
| client_->OnError( |
| openscreen::Error(openscreen::Error::Code::kCastV2CastSocketError)); |
| } |
| pending_fidl_messages_.clear(); |
| } |
| |
| void CastMessagePortImpl::SetClient( |
| openscreen::cast::MessagePort::Client* client) { |
| DVLOG(2) << __func__; |
| DCHECK_NE(!client_, !client); |
| client_ = client; |
| if (!client_) |
| MaybeCloseWithEpitaph(ZX_OK); |
| } |
| |
| void CastMessagePortImpl::PostMessage(absl::string_view sender_id, |
| absl::string_view message_namespace, |
| absl::string_view message) { |
| DVLOG(3) << __func__; |
| if (!message_port_binding_.is_bound()) |
| return; |
| |
| if (pending_fidl_messages_.size() > kMaxPendingFidlMessages) { |
| LOG(ERROR) << "Too many buffered Open Screen messages."; |
| MaybeCloseWithEpitaph(ZX_ERR_BAD_STATE); |
| return; |
| } |
| |
| DVLOG(3) << "Received Open Screen message. SenderId: " << sender_id |
| << ". Namespace: " << message_namespace << ". Message: " << message; |
| |
| pending_fidl_messages_.push_back( |
| CreateWebMessage(sender_id, message_namespace, message)); |
| MaybeSendMessageToFidl(); |
| } |
| |
| void CastMessagePortImpl::PostMessage( |
| fuchsia::web::WebMessage message, |
| fuchsia::web::MessagePort::PostMessageCallback callback) { |
| DVLOG(3) << __func__; |
| |
| // If |client_| was cleared, the binding should have been closed. |
| DCHECK(client_); |
| |
| std::string sender_id; |
| std::string message_namespace; |
| std::string str_message; |
| if (!ParseMessageBuffer(message.data(), &sender_id, &message_namespace, |
| &str_message)) { |
| LOG(ERROR) << "Received bad message."; |
| client_->OnError( |
| openscreen::Error(openscreen::Error::Code::kCastV2InvalidMessage)); |
| return; |
| } |
| DVLOG(3) << "Received FIDL message. SenderId: " << sender_id |
| << ". Namespace: " << message_namespace |
| << ". Message: " << str_message; |
| |
| // TODO(b/156118960): Have Open Screen handle message namespaces. |
| if (message_namespace == kMirroringNamespace || |
| message_namespace == kRemotingNamespace) { |
| client_->OnMessage(sender_id, message_namespace, str_message); |
| } else if (message_namespace.compare(kSystemNamespace) != 0) { |
| // System messages are ignored, log messages from unknown namespaces. |
| DVLOG(2) << "Unknown message from " << sender_id |
| << ", namespace=" << message_namespace |
| << ", message=" << str_message; |
| } |
| } |
| |
| void CastMessagePortImpl::ReceiveMessage( |
| fuchsia::web::MessagePort::ReceiveMessageCallback callback) { |
| DVLOG(3) << __func__; |
| if (receive_message_callback_) { |
| MaybeCloseWithEpitaph(ZX_ERR_BAD_STATE); |
| return; |
| } |
| |
| receive_message_callback_ = std::move(callback); |
| MaybeSendMessageToFidl(); |
| } |
| |
| } // namespace cast_streaming |