Add MessagePort Implementation over Mojo

This CL adds a new MessagePort implementation backed by a Mojo IPC.
This MessagePort is to be used solely for E2E testing of the Cast
Runtime.

Bug: internal b/190564826
Change-Id: I60a4c28c6dda7afacdc74eefacc9700f24432dab
Reviewed-on: https://chromium-review.googlesource.com/c/chromium/src/+/2947893
Reviewed-by: Will Cassella <cassew@google.com>
Reviewed-by: Matthew Denton <mpdenton@chromium.org>
Reviewed-by: Stefan Zager <szager@chromium.org>
Reviewed-by: Oksana Zhuravlova <oksamyt@chromium.org>
Reviewed-by: Fabrice de Gans <fdegans@chromium.org>
Commit-Queue: Ryan Keane <rwkeane@google.com>
Cr-Commit-Position: refs/heads/master@{#893141}
diff --git a/components/cast/message_port/BUILD.gn b/components/cast/message_port/BUILD.gn
index 392580a..e9df1dc0 100644
--- a/components/cast/message_port/BUILD.gn
+++ b/components/cast/message_port/BUILD.gn
@@ -3,10 +3,14 @@
 # found in the LICENSE file.
 
 import("//build/config/features.gni")
+import("//chromecast/chromecast.gni")
+import("//media/media_options.gni")
 import("//testing/test.gni")
 
 source_set("message_port") {
-  if (is_fuchsia) {
+  if (enable_cast_runtime_e2e_testing) {
+    public_deps = [ ":message_port_mojo" ]
+  } else if (is_fuchsia) {
     public_deps = [ ":message_port_fuchsia" ]
   } else {
     public_deps = [ ":message_port_cast" ]
@@ -61,6 +65,19 @@
   ]
 }
 
+source_set("message_port_mojo") {
+  public = [ "message_port_mojo.h" ]
+
+  sources = [ "message_port_mojo.cc" ]
+
+  public_deps = [
+    ":public",
+    "//components/cast/message_port/mojom",
+  ]
+
+  deps = [ ":public" ]
+}
+
 source_set("message_port_unittest") {
   testonly = true
   sources = [ "message_port_unittest.cc" ]
@@ -68,6 +85,9 @@
     ":message_port",
     ":test_message_port_receiver",
     "//base/test:test_support",
+    "//build:buildflag_header_h",
+    "//components/cast/message_port/mojom",
+    "//media:media_buildflags",
     "//testing/gtest",
   ]
 }
diff --git a/components/cast/message_port/DEPS b/components/cast/message_port/DEPS
index 362bdec..955f55b 100644
--- a/components/cast/message_port/DEPS
+++ b/components/cast/message_port/DEPS
@@ -1,5 +1,8 @@
 include_rules = [
   # TODO(crbug.com/1120731): Remove dependencies on //fuchsia.
   "+fuchsia/base/mem_buffer_util.h",
+
+  "+media/media_buildflags.h",
+  "+mojo/public",
   "+third_party/blink/public/common/messaging",
 ]
diff --git a/components/cast/message_port/message_port_mojo.cc b/components/cast/message_port/message_port_mojo.cc
new file mode 100644
index 0000000..98309a8
--- /dev/null
+++ b/components/cast/message_port/message_port_mojo.cc
@@ -0,0 +1,123 @@
+// Copyright 2021 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#include "components/cast/message_port/message_port_mojo.h"
+
+#include <vector>
+
+#include "mojo/public/cpp/bindings/pending_receiver.h"
+#include "mojo/public/cpp/bindings/pending_remote.h"
+
+namespace cast_api_bindings {
+
+// static
+void MessagePort::CreatePair(std::unique_ptr<MessagePort>* client,
+                             std::unique_ptr<MessagePort>* server) {
+  mojo::PendingRemote<mojom::UnidirectionalMessagePort> client_remote;
+  mojo::PendingRemote<mojom::UnidirectionalMessagePort> server_remote;
+
+  auto client_receiver = server_remote.InitWithNewPipeAndPassReceiver();
+  auto server_receiver = client_remote.InitWithNewPipeAndPassReceiver();
+
+  *client = std::make_unique<MessagePortMojo>(std::move(client_receiver),
+                                              std::move(client_remote));
+  *server = std::make_unique<MessagePortMojo>(std::move(server_receiver),
+                                              std::move(server_remote));
+}
+
+MessagePortMojo::MessagePortMojo(
+    mojo::PendingReceiver<mojom::UnidirectionalMessagePort> receiver,
+    mojo::PendingRemote<mojom::UnidirectionalMessagePort> remote)
+    : mojo_remote_(std::move(remote)),
+      mojo_receiver_(this, std::move(receiver)),
+      weak_factory_(this) {
+  mojo_receiver_.set_disconnect_handler(base::BindOnce(
+      &MessagePortMojo::OnMojoDisconnected, weak_factory_.GetWeakPtr()));
+  mojo_remote_.set_disconnect_handler(base::BindOnce(
+      &MessagePortMojo::OnMojoDisconnected, weak_factory_.GetWeakPtr()));
+}
+
+MessagePortMojo::~MessagePortMojo() = default;
+
+bool MessagePortMojo::PostMessage(base::StringPiece message) {
+  return PostMessageWithTransferables(std::move(message), {});
+}
+
+bool MessagePortMojo::PostMessageWithTransferables(
+    base::StringPiece message,
+    std::vector<std::unique_ptr<MessagePort>> ports) {
+  if (!CanPostMessage()) {
+    return false;
+  }
+
+  std::vector<mojom::MessagePortPtr> mojo_ports;
+  mojo_ports.reserve(ports.size());
+  for (std::unique_ptr<MessagePort>& port : ports) {
+    DCHECK(port);
+
+    // The |mojo_remote_| object used below would be invalidated in this
+    // process.
+    DCHECK(port.get() != this);
+
+    // This is safe because there is one MessagePort implementation used at a
+    // time.
+    MessagePortMojo* casted_port = static_cast<MessagePortMojo*>(port.get());
+
+    mojo_ports.push_back(casted_port->Unbind());
+  }
+
+  mojo_remote_->PostMessageWithTransferables(
+      std::string(message.begin(), message.end()), std::move(mojo_ports));
+  return true;
+}
+
+void MessagePortMojo::SetReceiver(MessagePort::Receiver* receiver) {
+  receiver_ = receiver;
+}
+
+void MessagePortMojo::Close() {
+  mojo_remote_.reset();
+  mojo_receiver_.reset();
+}
+
+bool MessagePortMojo::CanPostMessage() const {
+  return mojo_remote_.is_bound();
+}
+
+void MessagePortMojo::PostMessageWithTransferables(
+    const std::string& message,
+    std::vector<mojom::MessagePortPtr> ports) {
+  if (!receiver_) {
+    return;
+  }
+
+  std::vector<std::unique_ptr<MessagePort>> mojo_ports;
+  mojo_ports.reserve(ports.size());
+  for (mojom::MessagePortPtr& port : ports) {
+    mojo_ports.push_back(std::make_unique<MessagePortMojo>(
+        std::move(port.get()->receiver), std::move(port.get()->remote)));
+  }
+
+  receiver_->OnMessage(message, std::move(mojo_ports));
+}
+
+mojom::MessagePortPtr MessagePortMojo::Unbind() {
+  if (!mojo_remote_.is_bound() || !mojo_receiver_.is_bound()) {
+    return nullptr;
+  }
+
+  return mojom::MessagePort::New(mojo_receiver_.Unbind(),
+                                 mojo_remote_.Unbind());
+}
+
+void MessagePortMojo::OnMojoDisconnected() {
+  if (!receiver_) {
+    return;
+  }
+
+  receiver_->OnPipeError();
+  Close();
+}
+
+}  // namespace cast_api_bindings
diff --git a/components/cast/message_port/message_port_mojo.h b/components/cast/message_port/message_port_mojo.h
new file mode 100644
index 0000000..a1b792a
--- /dev/null
+++ b/components/cast/message_port/message_port_mojo.h
@@ -0,0 +1,58 @@
+// Copyright 2021 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#ifndef COMPONENTS_CAST_MESSAGE_PORT_MESSAGE_PORT_MOJO_H_
+#define COMPONENTS_CAST_MESSAGE_PORT_MESSAGE_PORT_MOJO_H_
+
+#include <string>
+
+#include "components/cast/message_port/message_port.h"
+#include "components/cast/message_port/mojom/unidirectional_message_port.mojom.h"
+#include "mojo/public/cpp/bindings/receiver.h"
+#include "mojo/public/cpp/bindings/remote.h"
+
+namespace cast_api_bindings {
+
+// A MessagePort implementation built on top of a Mojo stream.
+class MessagePortMojo : public mojom::UnidirectionalMessagePort,
+                        public MessagePort {
+ public:
+  MessagePortMojo(
+      mojo::PendingReceiver<mojom::UnidirectionalMessagePort> receiver,
+      mojo::PendingRemote<mojom::UnidirectionalMessagePort> remote);
+  ~MessagePortMojo() override;
+
+  // MessagePort overrides.
+  bool PostMessage(base::StringPiece message) override;
+  bool PostMessageWithTransferables(
+      base::StringPiece message,
+      std::vector<std::unique_ptr<MessagePort>> ports) override;
+  void SetReceiver(MessagePort::Receiver* receiver) override;
+  void Close() override;
+  bool CanPostMessage() const override;
+
+ private:
+  // mojom::UnidirectionalMessagePort overrides.
+  void PostMessageWithTransferables(
+      const std::string& message,
+      std::vector<mojom::MessagePortPtr> ports) override;
+
+  // Returns the mojo::MessagePort containing both the |sender_| and |receiver_|
+  // associated with this instance, which is invalidated by this method.
+  mojom::MessagePortPtr Unbind();
+
+  // Called as a disconnect handler for |sender_| and |receiver_|.
+  void OnMojoDisconnected();
+
+  MessagePort::Receiver* receiver_ = nullptr;
+
+  mojo::Remote<mojom::UnidirectionalMessagePort> mojo_remote_;
+  mojo::Receiver<mojom::UnidirectionalMessagePort> mojo_receiver_;
+
+  base::WeakPtrFactory<MessagePortMojo> weak_factory_;
+};
+
+}  // namespace cast_api_bindings
+
+#endif  // COMPONENTS_CAST_MESSAGE_PORT_MESSAGE_PORT_MOJO_H_
diff --git a/components/cast/message_port/message_port_unittest.cc b/components/cast/message_port/message_port_unittest.cc
index 4b8a190..02250bc 100644
--- a/components/cast/message_port/message_port_unittest.cc
+++ b/components/cast/message_port/message_port_unittest.cc
@@ -6,15 +6,19 @@
 #include "base/run_loop.h"
 #include "base/test/task_environment.h"
 #include "build/build_config.h"
+#include "build/buildflag.h"
 #include "components/cast/message_port/test_message_port_receiver.h"
+#include "media/media_buildflags.h"
 #include "testing/gtest/include/gtest/gtest.h"
 
-#if defined(OS_FUCHSIA)
+#if BUILDFLAG(ENABLE_CAST_RUNTIME_E2E_TESTING)
+#include "components/cast/message_port/message_port_mojo.h"  // nogncheck
+#elif defined(OS_FUCHSIA)
 #include "components/cast/message_port/message_port_fuchsia.h"
 #else
 #include "components/cast/message_port/message_port_cast.h"  // nogncheck
 #include "third_party/blink/public/common/messaging/web_message_port.h"  // nogncheck
-#endif  // defined(OS_FUCHSIA)
+#endif  // BUILDFLAG(ENABLE_CAST_RUNTIME_E2E_TESTING)
 
 #ifdef PostMessage
 #undef PostMessage
@@ -92,21 +96,6 @@
   ASSERT_FALSE(server_->CanPostMessage());
 }
 
-TEST_F(MessagePortTest, OnError) {
-  server_receiver_.SetOnMessageResult(false);
-  SetDefaultReceivers();
-  client_->PostMessage("");
-
-#if defined(OS_FUCHSIA)
-  // blink::WebMessagePort reports failure when PostMessage returns false, but
-  // fuchsia::web::MessagePort will not report the error until the port closes
-  server_receiver_.RunUntilMessageCountEqual(1);
-  server_.reset();
-#endif
-
-  client_receiver_.RunUntilDisconnected();
-}
-
 TEST_F(MessagePortTest, PostMessage) {
   TestPostMessage();
 }
@@ -139,6 +128,22 @@
   PostMessages({"from port1"}, port1.get(), &port0_receiver);
 }
 
+#if !BUILDFLAG(ENABLE_CAST_RUNTIME_E2E_TESTING)
+TEST_F(MessagePortTest, OnError) {
+  server_receiver_.SetOnMessageResult(false);
+  SetDefaultReceivers();
+  client_->PostMessage("");
+
+#if defined(OS_FUCHSIA)
+  // blink::WebMessagePort reports failure when PostMessage returns false, but
+  // fuchsia::web::MessagePort will not report the error until the port closes
+  server_receiver_.RunUntilMessageCountEqual(1);
+  server_.reset();
+#endif
+
+  client_receiver_.RunUntilDisconnected();
+}
+
 TEST_F(MessagePortTest, WrapPlatformPort) {
   // Initialize ports from the platform type instead of agnostic CreatePair
 #if defined(OS_FUCHSIA)
@@ -171,5 +176,6 @@
 
   TestPostMessage();
 }
+#endif  // !BUILDFLAG(ENABLE_CAST_RUNTIME_E2E_TESTING)
 
 }  // namespace cast_api_bindings
diff --git a/components/cast/message_port/mojom/BUILD.gn b/components/cast/message_port/mojom/BUILD.gn
new file mode 100644
index 0000000..7016dc4
--- /dev/null
+++ b/components/cast/message_port/mojom/BUILD.gn
@@ -0,0 +1,10 @@
+# Copyright 2021 The Chromium Authors. All rights reserved.
+# Use of this source code is governed by a BSD-style license that can be
+# found in the LICENSE file.
+
+import("//mojo/public/tools/bindings/mojom.gni")
+
+mojom("mojom") {
+  sources = [ "unidirectional_message_port.mojom" ]
+  public_deps = [ "//mojo/public/mojom/base" ]
+}
diff --git a/components/cast/message_port/mojom/OWNERS b/components/cast/message_port/mojom/OWNERS
new file mode 100644
index 0000000..08850f4
--- /dev/null
+++ b/components/cast/message_port/mojom/OWNERS
@@ -0,0 +1,2 @@
+per-file *.mojom=set noparent
+per-file *.mojom=file://ipc/SECURITY_OWNERS
diff --git a/components/cast/message_port/mojom/unidirectional_message_port.mojom b/components/cast/message_port/mojom/unidirectional_message_port.mojom
new file mode 100644
index 0000000..438face
--- /dev/null
+++ b/components/cast/message_port/mojom/unidirectional_message_port.mojom
@@ -0,0 +1,21 @@
+// Copyright 2021 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+module cast_api_bindings.mojom;
+
+// Mojo MessagePort representation based on a pair of
+// UnidirectionalMessagePorts.
+struct MessagePort {
+  pending_receiver<UnidirectionalMessagePort> receiver;
+  pending_remote<UnidirectionalMessagePort> remote;
+};
+
+// A UnidirectionalMessagePort represents a single flow direction in a
+// bidirectional MessagePort instance. MessagePorts can both send and receive
+// data and MessagePorts so a single MessagePort corresponds to the above
+// struct, which contains both a receiver and a remote endpoint of this
+// interface.
+interface UnidirectionalMessagePort {
+  PostMessageWithTransferables(string message, array<MessagePort> ports);
+};
diff --git a/components/cast_streaming/browser/BUILD.gn b/components/cast_streaming/browser/BUILD.gn
index 6be496b..468d1849 100644
--- a/components/cast_streaming/browser/BUILD.gn
+++ b/components/cast_streaming/browser/BUILD.gn
@@ -2,6 +2,7 @@
 # Use of this source code is governed by a BSD-style license that can be
 # found in the LICENSE file.
 
+import("//media/media_options.gni")
 import("//testing/test.gni")
 
 # TODO(crbug.com/1207718): Remove cast_message_port_impl code from this
@@ -186,5 +187,8 @@
     "//components/cast/message_port:test_message_port_receiver",
     "//testing/gtest",
   ]
-  sources = [ "cast_message_port_impl_unittest.cc" ]
+  sources = []
+  if (!enable_cast_runtime_e2e_testing) {
+    sources += [ "cast_message_port_impl_unittest.cc" ]
+  }
 }
diff --git a/content/renderer/media/media_factory.cc b/content/renderer/media/media_factory.cc
index c2686a42..91a4bda 100644
--- a/content/renderer/media/media_factory.cc
+++ b/content/renderer/media/media_factory.cc
@@ -762,18 +762,16 @@
 
 #if BUILDFLAG(ENABLE_CAST_STREAMING_RENDERER)
   if (cast_streaming::IsCastStreamingMediaSourceUrl(url)) {
-#if BUILDFLAG(ENABLE_CAST_RENDERER)
+#if BUILDFLAG(ENABLE_CAST_RUNTIME_E2E_TESTING)
+    // TODO(b/187332037): Correct issues preventing playback when using this
+    // renderer.
+    auto default_factory_cast_streaming = CreateDefaultRendererFactory(
+        media_log, decoder_factory, render_thread, render_frame_);
+#else   // BUILDFLAG(ENABLE_CAST_RUNTIME_E2E_TESTING)
     auto default_factory_cast_streaming =
         std::make_unique<CastRendererClientFactory>(
             media_log, CreateMojoRendererFactory());
-#else   // BUILDFLAG(ENABLE_CAST_RENDERER)
-    // NOTE: Prior to the resolution of b/187332037, playback will not work
-    // correctly with this renderer.
-    // NOTE: This renderer is only expected to be used in TEST scenarios and
-    // should not be used in production.
-    auto default_factory_cast_streaming = CreateDefaultRendererFactory(
-        media_log, decoder_factory, render_thread, render_frame_);
-#endif  // BUILDFLAG(ENABLE_CAST_RENDERER)
+#endif  // BUILDFLAG(ENABLE_CAST_RUNTIME_E2E_TESTING)
 
     auto cast_streaming_renderer_factory =
         std::make_unique<media::cast::CastStreamingRendererFactory>(
diff --git a/media/BUILD.gn b/media/BUILD.gn
index 393cf68f..ed17ee4 100644
--- a/media/BUILD.gn
+++ b/media/BUILD.gn
@@ -42,6 +42,7 @@
     "ENABLE_PLATFORM_MPEG_H_AUDIO=$enable_platform_mpeg_h_audio",
     "ENABLE_MSE_MPEG2TS_STREAM_PARSER=$enable_mse_mpeg2ts_stream_parser",
     "ENABLE_CAST_STREAMING_RENDERER=$enable_cast_streaming_renderer",
+    "ENABLE_CAST_RUNTIME_E2E_TESTING=$enable_cast_runtime_e2e_testing",
     "USE_CHROMEOS_MEDIA_ACCELERATION=$use_vaapi||$use_v4l2_codec",
     "USE_CHROMEOS_PROTECTED_MEDIA=$use_chromeos_protected_media",
     "USE_PROPRIETARY_CODECS=$proprietary_codecs",
diff --git a/media/media_options.gni b/media/media_options.gni
index a8cac26..bfeac801 100644
--- a/media/media_options.gni
+++ b/media/media_options.gni
@@ -246,8 +246,20 @@
   enable_cast_streaming_renderer = false
 }
 
+# Use an additional declare_args to pick up overrides of
+# |enable_cast_streaming_renderer|.
+declare_args() {
+  # Signifies whether E2E testing for the Cast Media Runtime is being performed
+  # with this build.
+  enable_cast_runtime_e2e_testing =
+      enable_cast_streaming_renderer && !enable_cast_renderer
+}
+
 assert(!enable_cast_streaming_renderer || is_chromecast,
        "Currently, libcast mirroring is only supported for chromecast.")
+assert(!enable_cast_streaming_renderer || enable_cast_runtime_e2e_testing ||
+           enable_cast_renderer,
+       "No valid renderer enabled for use with the Cast Streaming Renderer.")
 
 # Do not expand this list without double-checking with OWNERS, this is a list of
 # all targets which roll up into the //media component. It controls visibility