Reworks Channel pausing behavior

It turns out creating a Channel paused is not good enough; we actually
need to be able to send some messages, then pause, send more messages,
unpause, send even more messages, and then flush.

This is because of subtleties in how RPHI handles queuing. It queues
messages before the Channel is created, then there is a brief period
between Channel creation and process launch where messages aren't
queued but are sent immediately (but previously queued messages remain
in queue), and then new messages are queued again during process
launch, and finally queueing is turned off after process launch.

This changes Channel to have an explicit Pause() API and gets rid
of the recently added ConnectPaused().

BUG=612500

Review-Url: https://codereview.chromium.org/2316963005
Cr-Commit-Position: refs/heads/master@{#417344}
diff --git a/ipc/ipc_channel.h b/ipc/ipc_channel.h
index c1e163c..11b9119 100644
--- a/ipc/ipc_channel.h
+++ b/ipc/ipc_channel.h
@@ -222,21 +222,15 @@
   // implementation.
   virtual bool Connect() WARN_UNUSED_RESULT = 0;
 
-  // Similar to (and exclusive to) Connect() above, but blocks outgoing messages
-  // until a future call to Unpause().
-  //
-  // Not all implementations support ConnectPaused(). Note that this interface
-  // exists only to facilitate weird behavior where some Channel consumers want
-  // to force some early messages to be transmitted before ones which were sent
-  // earlier. This allows that to be done without the consumer implementing
-  // their own message queueing support which may be incompatible with the
-  // Channel's internal queueing behavior.
-  virtual bool ConnectPaused() WARN_UNUSED_RESULT;
+  // Pause the channel. Subsequent sends will be queued internally until
+  // Unpause() is called and the channel is flushed either by Unpause() or a
+  // subsequent call to Flush().
+  virtual void Pause();
 
-  // Unpause the pipe. This allows subsequent Send() calls to transmit messages
-  // immediately, without queueing. If |flush| is true, any messages queued
-  // while paused will be flushed immediately upon unpausing. Otherwise you must
-  // call Flush() explicitly.
+  // Unpause the channel. This allows subsequent Send() calls to transmit
+  // messages immediately, without queueing. If |flush| is true, any messages
+  // queued while paused will be flushed immediately upon unpausing. Otherwise
+  // you must call Flush() explicitly.
   //
   // Not all implementations support Unpause(). See ConnectPaused() above for
   // details.
diff --git a/ipc/ipc_channel_common.cc b/ipc/ipc_channel_common.cc
index 7fb51a2..24bb543 100644
--- a/ipc/ipc_channel_common.cc
+++ b/ipc/ipc_channel_common.cc
@@ -80,10 +80,7 @@
   return nullptr;
 }
 
-bool Channel::ConnectPaused() {
-  NOTREACHED();
-  return false;
-}
+void Channel::Pause() { NOTREACHED(); }
 
 void Channel::Unpause(bool flush) { NOTREACHED(); }
 
diff --git a/ipc/ipc_channel_mojo.cc b/ipc/ipc_channel_mojo.cc
index 80bbfb56..d4fdca5 100644
--- a/ipc/ipc_channel_mojo.cc
+++ b/ipc/ipc_channel_mojo.cc
@@ -282,14 +282,6 @@
 }
 
 bool ChannelMojo::Connect() {
-  if (!ConnectPaused())
-    return false;
-
-  Unpause(true);
-  return true;
-}
-
-bool ChannelMojo::ConnectPaused() {
   WillConnect();
 
   DCHECK(!task_runner_);
@@ -300,14 +292,12 @@
   return true;
 }
 
+void ChannelMojo::Pause() {
+  bootstrap_->Pause();
+}
+
 void ChannelMojo::Unpause(bool flush) {
-  bootstrap_->Start();
-
-  // Ensure that no matter what messages have been queued so far, the first
-  // message we send is always the peer PID.
-  DCHECK(message_reader_);
-  message_reader_->sender()->SetPeerPid(GetSelfPID());
-
+  bootstrap_->Unpause();
   if (flush)
     Flush();
 }
@@ -330,6 +320,7 @@
 // MojoBootstrap::Delegate implementation
 void ChannelMojo::OnPipesAvailable(mojom::ChannelAssociatedPtr sender,
                                    mojom::ChannelAssociatedRequest receiver) {
+  sender->SetPeerPid(GetSelfPID());
   message_reader_.reset(new internal::MessagePipeReader(
       pipe_, std::move(sender), std::move(receiver), this));
 }
diff --git a/ipc/ipc_channel_mojo.h b/ipc/ipc_channel_mojo.h
index 44877b2..8dbc23e 100644
--- a/ipc/ipc_channel_mojo.h
+++ b/ipc/ipc_channel_mojo.h
@@ -68,7 +68,7 @@
 
   // Channel implementation
   bool Connect() override;
-  bool ConnectPaused() override;
+  void Pause() override;
   void Unpause(bool flush) override;
   void Flush() override;
   void Close() override;
diff --git a/ipc/ipc_channel_mojo_unittest.cc b/ipc/ipc_channel_mojo_unittest.cc
index 07b8fe1d..ddb0e09 100644
--- a/ipc/ipc_channel_mojo_unittest.cc
+++ b/ipc/ipc_channel_mojo_unittest.cc
@@ -739,7 +739,7 @@
         listener, io_thread_.task_runner(), &never_signaled_);
   }
 
-  void RunProxy(bool create_paused) {
+  void RunProxy() {
     std::unique_ptr<IPC::ChannelFactory> factory;
     if (for_server_) {
       factory = IPC::ChannelMojo::CreateServerFactory(
@@ -748,7 +748,7 @@
       factory = IPC::ChannelMojo::CreateClientFactory(
           std::move(handle_), io_thread_.task_runner());
     }
-    proxy_->Init(std::move(factory), true, create_paused);
+    proxy_->Init(std::move(factory), true);
   }
 
   IPC::ChannelProxy* proxy() { return proxy_.get(); }
@@ -771,8 +771,8 @@
     runner_.reset(new ChannelProxyRunner(TakeHandle(), true));
   }
   void CreateProxy(IPC::Listener* listener) { runner_->CreateProxy(listener); }
-  void RunProxy(bool create_paused = false) {
-    runner_->RunProxy(create_paused);
+  void RunProxy() {
+    runner_->RunProxy();
   }
   void DestroyProxy() {
     runner_.reset();
@@ -878,7 +878,7 @@
 
   void CreateProxy(IPC::Listener* listener) { runner_->CreateProxy(listener); }
 
-  void RunProxy() { runner_->RunProxy(false); }
+  void RunProxy() { runner_->RunProxy(); }
 
   void DestroyProxy() {
     runner_.reset();
@@ -1252,19 +1252,22 @@
   DestroyProxy();
 }
 
-TEST_F(IPCChannelProxyMojoTest, CreatePaused) {
-  // Ensures that creating a paused channel elicits the expected behavior when
-  // sending messages, unpausing, sending more messages, and then manually
-  // flushing. Specifically a sequence like:
+TEST_F(IPCChannelProxyMojoTest, Pause) {
+  // Ensures that pausing a channel elicits the expected behavior when sending
+  // messages, unpausing, sending more messages, and then manually flushing.
+  // Specifically a sequence like:
   //
   //   Connect()
   //   Send(A)
+  //   Pause()
   //   Send(B)
-  //   Unpause(false)
   //   Send(C)
+  //   Unpause(false)
+  //   Send(D)
+  //   Send(E)
   //   Flush()
   //
-  // must result in the other end receiving messages C, A, and then B, in that
+  // must result in the other end receiving messages A, D, E, B, D; in that
   // order.
   //
   // This behavior is required by some consumers of IPC::Channel, and it is not
@@ -1275,17 +1278,22 @@
 
   DummyListener listener;
   CreateProxy(&listener);
-  RunProxy(true /* create_paused */);
+  RunProxy();
+
+  // This message must be sent immediately since the channel is unpaused.
+  SendValue(proxy(), 1);
+
+  proxy()->Pause();
 
   // These messages must be queued internally since the channel is paused.
-  SendValue(proxy(), 1);
   SendValue(proxy(), 2);
+  SendValue(proxy(), 3);
 
   proxy()->Unpause(false /* flush */);
 
   // These messages must be sent immediately since the channel is unpaused.
-  SendValue(proxy(), 3);
   SendValue(proxy(), 4);
+  SendValue(proxy(), 5);
 
   // Now we flush the previously queued messages.
   proxy()->Flush();
@@ -1323,10 +1331,11 @@
   std::queue<int32_t> expected_values;
   ExpectValueSequenceListener listener(&expected_values);
   CreateProxy(&listener);
-  expected_values.push(3);
-  expected_values.push(4);
   expected_values.push(1);
+  expected_values.push(4);
+  expected_values.push(5);
   expected_values.push(2);
+  expected_values.push(3);
   RunProxy();
   base::RunLoop().Run();
   EXPECT_TRUE(expected_values.empty());
diff --git a/ipc/ipc_channel_proxy.cc b/ipc/ipc_channel_proxy.cc
index db51da8..9bbeb85 100644
--- a/ipc/ipc_channel_proxy.cc
+++ b/ipc/ipc_channel_proxy.cc
@@ -115,6 +115,12 @@
 }
 
 // Called on the IPC::Channel thread
+void ChannelProxy::Context::PauseChannel() {
+  DCHECK(channel_);
+  channel_->Pause();
+}
+
+// Called on the IPC::Channel thread
 void ChannelProxy::Context::UnpauseChannel(bool flush) {
   DCHECK(channel_);
   channel_->Unpause(flush);
@@ -171,15 +177,14 @@
 }
 
 // Called on the IPC::Channel thread
-void ChannelProxy::Context::OnChannelOpened(bool pause) {
+void ChannelProxy::Context::OnChannelOpened() {
   DCHECK(channel_ != NULL);
 
   // Assume a reference to ourselves on behalf of this thread.  This reference
   // will be released when we are closed.
   AddRef();
 
-  bool success = pause ? channel_->ConnectPaused() : channel_->Connect();
-  if (!success) {
+  if (!channel_->Connect()) {
     OnChannelError();
     return;
   }
@@ -475,8 +480,7 @@
 
 void ChannelProxy::Init(const IPC::ChannelHandle& channel_handle,
                         Channel::Mode mode,
-                        bool create_pipe_now,
-                        bool create_paused) {
+                        bool create_pipe_now) {
 #if defined(OS_POSIX)
   // When we are creating a server on POSIX, we need its file descriptor
   // to be created immediately so that it can be accessed and passed
@@ -488,12 +492,11 @@
 #endif  // defined(OS_POSIX)
   Init(
       ChannelFactory::Create(channel_handle, mode, context_->ipc_task_runner()),
-      create_pipe_now, create_paused);
+      create_pipe_now);
 }
 
 void ChannelProxy::Init(std::unique_ptr<ChannelFactory> factory,
-                        bool create_pipe_now,
-                        bool create_paused) {
+                        bool create_pipe_now) {
   DCHECK(CalledOnValidThread());
   DCHECK(!did_init_);
 
@@ -512,12 +515,17 @@
   // complete initialization on the background thread
   context_->ipc_task_runner()->PostTask(
       FROM_HERE,
-      base::Bind(&Context::OnChannelOpened, context_, create_paused));
+      base::Bind(&Context::OnChannelOpened, context_));
 
   did_init_ = true;
   OnChannelInit();
 }
 
+void ChannelProxy::Pause() {
+  context_->ipc_task_runner()->PostTask(
+      FROM_HERE, base::Bind(&Context::PauseChannel, context_));
+}
+
 void ChannelProxy::Unpause(bool flush) {
   context_->ipc_task_runner()->PostTask(
       FROM_HERE, base::Bind(&Context::UnpauseChannel, context_, flush));
diff --git a/ipc/ipc_channel_proxy.h b/ipc/ipc_channel_proxy.h
index b544a19..06b3267 100644
--- a/ipc/ipc_channel_proxy.h
+++ b/ipc/ipc_channel_proxy.h
@@ -113,20 +113,21 @@
   // Initializes the channel proxy. Only call this once to initialize a channel
   // proxy that was not initialized in its constructor. If |create_pipe_now| is
   // true, the pipe is created synchronously. Otherwise it's created on the IO
-  // thread. If |create_paused| is true, outgoing messages will continue to be
-  // queued until Unpause() is called.
+  // thread.
   void Init(const IPC::ChannelHandle& channel_handle,
             Channel::Mode mode,
-            bool create_pipe_now,
-            bool create_paused = false);
+            bool create_pipe_now);
   void Init(std::unique_ptr<ChannelFactory> factory,
-            bool create_pipe_now,
-            bool create_paused = false);
+            bool create_pipe_now);
 
-  // Unpause the channel. Only useful if Init was called with |create_paused|.
-  // If |flush| is true the channel will be flushed as soon as it's unpaused.
-  // Otherwise you must explicitly call Flush() to flush messages which were
-  // queued while the channel was paused.
+  // Pause the channel. Subsequent calls to Send() will be internally queued
+  // until Unpause() is called. Queued messages will not be sent until the
+  // channel is flushed.
+  void Pause();
+
+  // Unpause the channel. If |flush| is true the channel will be flushed as soon
+  // as it's unpaused (see Flush() below.) Otherwise you must explicitly call
+  // Flush() to flush messages which were queued while the channel was paused.
   void Unpause(bool flush);
 
   // Flush the channel. This sends any messages which were queued before calling
@@ -288,11 +289,12 @@
     // Returns true if the message was processed, false otherwise.
     bool TryFilters(const Message& message);
 
+    void PauseChannel();
     void UnpauseChannel(bool flush);
     void FlushChannel();
 
     // Like Open and Close, but called on the IPC thread.
-    virtual void OnChannelOpened(bool pause);
+    virtual void OnChannelOpened();
     virtual void OnChannelClosed();
 
     // Called on the consumers thread when the ChannelProxy is closed.  At that
diff --git a/ipc/ipc_mojo_bootstrap.cc b/ipc/ipc_mojo_bootstrap.cc
index affcf18..1552f31 100644
--- a/ipc/ipc_mojo_bootstrap.cc
+++ b/ipc/ipc_mojo_bootstrap.cc
@@ -72,9 +72,14 @@
                    base::Unretained(this)));
   }
 
-  void Start() {
-    DCHECK(!started_);
-    started_ = true;
+  void Pause() {
+    DCHECK(!paused_);
+    paused_ = true;
+  }
+
+  void Unpause() {
+    DCHECK(paused_);
+    paused_ = false;
   }
 
   void FlushOutgoingMessages() {
@@ -473,9 +478,7 @@
   bool SendMessage(mojo::Message* message) {
     if (task_runner_->BelongsToCurrentThread()) {
       DCHECK(thread_checker_.CalledOnValidThread());
-      if (!connector_ || !started_) {
-        // Pipe may not be bound yet or the channel may still be paused, so we
-        // queue the message.
+      if (!connector_ || paused_) {
         outgoing_messages_.emplace_back(std::move(*message));
         return true;
       }
@@ -760,7 +763,7 @@
 
   scoped_refptr<base::SingleThreadTaskRunner> proxy_task_runner_;
   const bool set_interface_id_namespace_bit_;
-  bool started_ = false;
+  bool paused_ = false;
   std::unique_ptr<mojo::Connector> connector_;
   mojo::FilterChain filters_;
   mojo::PipeControlMessageHandler control_message_handler_;
@@ -812,8 +815,12 @@
     delegate_->OnPipesAvailable(std::move(sender), std::move(receiver));
   }
 
-  void Start() override {
-    controller_->Start();
+  void Pause() override {
+    controller_->Pause();
+  }
+
+  void Unpause() override {
+    controller_->Unpause();
   }
 
   void Flush() override {
diff --git a/ipc/ipc_mojo_bootstrap.h b/ipc/ipc_mojo_bootstrap.h
index fb71e8a..5188cd0 100644
--- a/ipc/ipc_mojo_bootstrap.h
+++ b/ipc/ipc_mojo_bootstrap.h
@@ -52,8 +52,11 @@
   // Start the handshake over the underlying message pipe.
   virtual void Connect() = 0;
 
+  // Stop transmitting messages and start queueing them instead.
+  virtual void Pause() = 0;
+
   // Stop queuing new messages and start transmitting them instead.
-  virtual void Start() = 0;
+  virtual void Unpause() = 0;
 
   // Flush outgoing messages which were queued before Start().
   virtual void Flush() = 0;
diff --git a/ipc/ipc_sync_channel.cc b/ipc/ipc_sync_channel.cc
index 682001e8..a627d42 100644
--- a/ipc/ipc_sync_channel.cc
+++ b/ipc/ipc_sync_channel.cc
@@ -450,12 +450,12 @@
   Context::OnChannelError();
 }
 
-void SyncChannel::SyncContext::OnChannelOpened(bool pause) {
+void SyncChannel::SyncContext::OnChannelOpened() {
   shutdown_watcher_.StartWatching(
       shutdown_event_,
       base::Bind(&SyncChannel::SyncContext::OnShutdownEventSignaled,
                  base::Unretained(this)));
-  Context::OnChannelOpened(pause);
+  Context::OnChannelOpened();
 }
 
 void SyncChannel::SyncContext::OnChannelClosed() {
diff --git a/ipc/ipc_sync_channel.h b/ipc/ipc_sync_channel.h
index 52d4ae9..e8c96d20 100644
--- a/ipc/ipc_sync_channel.h
+++ b/ipc/ipc_sync_channel.h
@@ -189,7 +189,7 @@
     // Called on the IPC thread.
     bool OnMessageReceived(const Message& msg) override;
     void OnChannelError() override;
-    void OnChannelOpened(bool pause) override;
+    void OnChannelOpened() override;
     void OnChannelClosed() override;
 
     // Cancels all pending Send calls.