Handle sync loads with DRP's URLLoaderThrottle

After https://chromium-review.googlesource.com/c/chromium/src/+/1565873,
another interesting sequencing scenario is a synchronous load (as in
blink::ResourceLoader::RequestSynchronously() triggered by a synchronous
XMLHttpRequest() or WorkerGlobalScope.importScripts()).  In this case,
the URLLoaderThrottle is not used on the sequence where it was created
(see https://chromium-review.googlesource.com/c/chromium/src/+/631040/).
If it calls back to the throttle manager
(manager_->MarkProxiesAsBad()), we're trying to make a mojo call on the
wrong sequence again.

The solution is similar to safe_browsing::RendererURLLoaderThrottle:
"private" mojo pipes to mojom::DataReductionProxy.  We take care to make
them private in the synchronous load case only: it's not unusual for a
single renderer to create tens of throttles, but only a small percentage
of those is for a synchronous load (and there should be fewer and fewer
of them given that synchronous XHR is deprecated).

Bug: 963353
Change-Id: Ie502b881e946d188d6613119de02b1f6c0874a30
Reviewed-on: https://chromium-review.googlesource.com/c/chromium/src/+/1609834
Commit-Queue: Wojciech Dzierżanowski <wdzierzanowski@opera.com>
Reviewed-by: Tarun Bansal <tbansal@chromium.org>
Cr-Commit-Position: refs/heads/master@{#659873}
diff --git a/chrome/browser/data_reduction_proxy/data_reduction_proxy_browsertest.cc b/chrome/browser/data_reduction_proxy/data_reduction_proxy_browsertest.cc
index e9eba24..74cf42f 100644
--- a/chrome/browser/data_reduction_proxy/data_reduction_proxy_browsertest.cc
+++ b/chrome/browser/data_reduction_proxy/data_reduction_proxy_browsertest.cc
@@ -437,17 +437,27 @@
 }
 
 // Gets the response body for an XHR to |url| (as seen by the renderer).
-std::string ReadSubresourceFromRenderer(Browser* browser, const GURL& url) {
-  std::string script = R"((url => {
+std::string ReadSubresourceFromRenderer(Browser* browser,
+                                        const GURL& url,
+                                        bool asynchronous_xhr = true) {
+  static const char asynchronous_script[] = R"((url => {
     var xhr = new XMLHttpRequest();
     xhr.open('GET', url, true);
     xhr.onload = () => domAutomationController.send(xhr.responseText);
     xhr.send();
   }))";
+  static const char synchronous_script[] = R"((url => {
+    var xhr = new XMLHttpRequest();
+    xhr.open('GET', url, false);
+    xhr.send();
+    domAutomationController.send(xhr.responseText);
+  }))";
   std::string result;
   EXPECT_TRUE(ExecuteScriptAndExtractString(
       browser->tab_strip_model()->GetActiveWebContents(),
-      script + "('" + url.spec() + "')", &result));
+      base::StrCat({asynchronous_xhr ? asynchronous_script : synchronous_script,
+                    "('", url.spec(), "')"}),
+      &result));
   return result;
 }
 
@@ -490,6 +500,29 @@
   EXPECT_THAT(result, HasSubstr("p="));
 }
 
+IN_PROC_BROWSER_TEST_F(DataReductionProxyBrowsertest,
+                       ChromeProxyHeaderSetForSubresourceSync) {
+  net::EmbeddedTestServer test_server;
+  test_server.RegisterRequestHandler(
+      base::BindRepeating(&BasicResponse, kDummyBody));
+  ASSERT_TRUE(test_server.Start());
+
+  ui_test_utils::NavigateToURL(browser(),
+                               GetURLWithMockHost(test_server, "/echo"));
+
+  const bool asynchronous_xhr = false;
+  std::string result = ReadSubresourceFromRenderer(
+      browser(), GetURLWithMockHost(test_server, "/echoheader?Chrome-Proxy"),
+      asynchronous_xhr);
+
+  EXPECT_THAT(result, HasSubstr(kSessionKey));
+  EXPECT_THAT(result, Not(HasSubstr("pid=")));
+  EXPECT_THAT(result, HasSubstr("s="));
+  EXPECT_THAT(result, HasSubstr("c="));
+  EXPECT_THAT(result, HasSubstr("b="));
+  EXPECT_THAT(result, HasSubstr("p="));
+}
+
 IN_PROC_BROWSER_TEST_F(DataReductionProxyBrowsertest, ChromeProxyEctHeaderSet) {
   // Proxy will be used, so it shouldn't matter if the host cannot be resolved.
   ui_test_utils::NavigateToURL(
diff --git a/components/data_reduction_proxy/content/common/BUILD.gn b/components/data_reduction_proxy/content/common/BUILD.gn
index 87e8d32..f33a342 100644
--- a/components/data_reduction_proxy/content/common/BUILD.gn
+++ b/components/data_reduction_proxy/content/common/BUILD.gn
@@ -10,9 +10,13 @@
     "header_util.h",
   ]
 
-  deps = [
+  public_deps = [
     "//components/data_reduction_proxy/core/common",
     "//content/public/common",
+    "//mojo/public/cpp/bindings",
+  ]
+
+  deps = [
     "//net",
   ]
 }
@@ -29,6 +33,7 @@
     "//base/test:test_support",
     "//components/data_reduction_proxy/core/common",
     "//content/public/common",
+    "//mojo/public/cpp/bindings",
     "//net",
     "//testing/gtest",
   ]
diff --git a/components/data_reduction_proxy/content/common/data_reduction_proxy_url_loader_throttle.cc b/components/data_reduction_proxy/content/common/data_reduction_proxy_url_loader_throttle.cc
index 206011b..32526960 100644
--- a/components/data_reduction_proxy/content/common/data_reduction_proxy_url_loader_throttle.cc
+++ b/components/data_reduction_proxy/content/common/data_reduction_proxy_url_loader_throttle.cc
@@ -35,17 +35,57 @@
 DataReductionProxyURLLoaderThrottle::DataReductionProxyURLLoaderThrottle(
     const net::HttpRequestHeaders& post_cache_headers,
     DataReductionProxyThrottleManager* manager)
-    : post_cache_headers_(post_cache_headers), manager_(manager) {
-  DCHECK(manager);
+    : post_cache_headers_(post_cache_headers),
+      manager_(manager),
+      data_reduction_proxy_(manager_->data_reduction_proxy()),
+      private_config_observer_binding_(this) {
+  DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_);
+  DCHECK(manager_);
+
+  manager_->AddSameSequenceObserver(this);
+  OnThrottleConfigChanged(manager_->last_proxy_config());
 }
 
-DataReductionProxyURLLoaderThrottle::~DataReductionProxyURLLoaderThrottle() {}
+DataReductionProxyURLLoaderThrottle::~DataReductionProxyURLLoaderThrottle() {
+  DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_);
 
-void DataReductionProxyURLLoaderThrottle::DetachFromCurrentSequence() {}
+  if (!private_data_reduction_proxy_info_ && !private_data_reduction_proxy_)
+    manager_->RemoveSameSequenceObserver(this);
+}
+
+void DataReductionProxyURLLoaderThrottle::DetachFromCurrentSequence() {
+  DETACH_FROM_SEQUENCE(sequence_checker_);
+
+  manager_->RemoveSameSequenceObserver(this);
+
+  data_reduction_proxy_->Clone(
+      mojo::MakeRequest(&private_data_reduction_proxy_info_));
+  data_reduction_proxy_ = nullptr;
+}
+
+void DataReductionProxyURLLoaderThrottle::SetUpPrivateMojoPipes() {
+  DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_);
+  DCHECK_EQ(data_reduction_proxy_, nullptr);
+
+  // Bind the pipe created in DetachFromCurrentSequence() to the current
+  // sequence.
+  private_data_reduction_proxy_.Bind(
+      std::move(private_data_reduction_proxy_info_));
+  data_reduction_proxy_ = private_data_reduction_proxy_.get();
+
+  mojom::DataReductionProxyThrottleConfigObserverPtr observer_ptr;
+  private_config_observer_binding_.Bind(mojo::MakeRequest(&observer_ptr));
+  data_reduction_proxy_->AddThrottleConfigObserver(std::move(observer_ptr));
+}
 
 void DataReductionProxyURLLoaderThrottle::WillStartRequest(
     network::ResourceRequest* request,
     bool* defer) {
+  DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_);
+
+  if (private_data_reduction_proxy_info_)
+    SetUpPrivateMojoPipes();
+
   url_chain_.clear();
   url_chain_.push_back(request->url);
   request_method_ = request->method;
@@ -68,6 +108,8 @@
     bool* defer,
     std::vector<std::string>* to_be_removed_request_headers,
     net::HttpRequestHeaders* modified_request_headers) {
+  DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_);
+
   url_chain_.push_back(redirect_info->new_url);
   request_method_ = redirect_info->new_method;
 }
@@ -76,6 +118,8 @@
     const GURL& response_url,
     const network::ResourceResponseHead& response_head,
     bool* defer) {
+  DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_);
+
   before_will_process_response_received_ = true;
   if (response_head.was_fetched_via_cache)
     return;
@@ -99,6 +143,8 @@
     const net::HttpResponseHeaders* headers,
     net::Error net_error,
     bool* defer) {
+  DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_);
+
   // The set of data reduction proxy servers to mark as bad prior to
   // restarting the request.
   std::vector<net::ProxyServer> bad_proxies;
@@ -117,9 +163,9 @@
   DataReductionProxyBypassProtocol protocol;
   pending_restart_ = protocol.MaybeBypassProxyAndPrepareToRetry(
       request_method_, url_chain_, headers, proxy_server, net_error,
-      proxy_retry_info,
-      manager_->FindConfiguredDataReductionProxy(proxy_server), &bypass_type,
-      &data_reduction_proxy_info, &bad_proxies, &pending_restart_load_flags_);
+      proxy_retry_info, FindConfiguredDataReductionProxy(proxy_server),
+      &bypass_type, &data_reduction_proxy_info, &bad_proxies,
+      &pending_restart_load_flags_);
 
   if (!bad_proxies.empty())
     MarkProxiesAsBad(bad_proxies, data_reduction_proxy_info.bypass_duration);
@@ -140,8 +186,10 @@
     const GURL& response_url,
     network::ResourceResponseHead* response_head,
     bool* defer) {
+  DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_);
+
   base::Optional<DataReductionProxyTypeInfo> proxy_info =
-      manager_->FindConfiguredDataReductionProxy(response_head->proxy_server);
+      FindConfiguredDataReductionProxy(response_head->proxy_server);
   if (!proxy_info || (final_load_flags_ & net::LOAD_BYPASS_PROXY) != 0)
     return;
 
@@ -152,6 +200,8 @@
 void DataReductionProxyURLLoaderThrottle::WillOnCompleteWithError(
     const network::URLLoaderCompletionStatus& status,
     bool* defer) {
+  DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_);
+
   if (!before_will_process_response_received_) {
     MaybeRetry(status.proxy_server, nullptr,
                static_cast<net::Error>(status.error_code), defer);
@@ -161,6 +211,7 @@
 void DataReductionProxyURLLoaderThrottle::MarkProxiesAsBad(
     const std::vector<net::ProxyServer>& bad_proxies,
     base::TimeDelta bypass_duration) {
+  DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_);
   DCHECK(!waiting_for_mark_proxies_);
   DCHECK(!bad_proxies.empty());
 
@@ -175,11 +226,18 @@
       weak_factory_.GetWeakPtr());
 
   waiting_for_mark_proxies_ = true;
-  manager_->MarkProxiesAsBad(bypass_duration, proxy_list, std::move(callback));
+
+  // There is no need to handle the case where |callback| is never invoked
+  // (possible on connection error). That would imply disconnection from the
+  // browser, which is not recoverable.
+  data_reduction_proxy_->MarkProxiesAsBad(bypass_duration, proxy_list,
+                                          std::move(callback));
 }
 
 void DataReductionProxyURLLoaderThrottle::OnMarkProxiesAsBadComplete() {
+  DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_);
   DCHECK(waiting_for_mark_proxies_);
+
   waiting_for_mark_proxies_ = false;
 
   DoPendingRestart();
@@ -188,7 +246,34 @@
   delegate_->Resume();
 }
 
+void DataReductionProxyURLLoaderThrottle::OnThrottleConfigChanged(
+    mojom::DataReductionProxyThrottleConfigPtr config) {
+  DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_);
+
+  proxies_for_http_.clear();
+
+  if (!config)
+    return;
+
+  // TODO(eroman): Use typemappings instead of converting here?
+  for (const auto& entry : config->proxies_for_http) {
+    proxies_for_http_.push_back(DataReductionProxyServer(entry->proxy_server));
+  }
+}
+
+base::Optional<DataReductionProxyTypeInfo>
+DataReductionProxyURLLoaderThrottle::FindConfiguredDataReductionProxy(
+    const net::ProxyServer& proxy_server) const {
+  DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_);
+
+  // TODO(https://crbug.com/721403): The non-NS code also searches through the
+  // recently seen proxies, not just the current ones.
+  return params::FindConfiguredProxyInVector(proxies_for_http_, proxy_server);
+}
+
 void DataReductionProxyURLLoaderThrottle::DoPendingRestart() {
+  DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_);
+
   if (!pending_restart_)
     return;
 
diff --git a/components/data_reduction_proxy/content/common/data_reduction_proxy_url_loader_throttle.h b/components/data_reduction_proxy/content/common/data_reduction_proxy_url_loader_throttle.h
index bd99a50..4e83f3fe 100644
--- a/components/data_reduction_proxy/content/common/data_reduction_proxy_url_loader_throttle.h
+++ b/components/data_reduction_proxy/content/common/data_reduction_proxy_url_loader_throttle.h
@@ -5,12 +5,18 @@
 #ifndef COMPONENTS_DATA_REDUCTION_PROXY_CONTENT_COMMON_DATA_REDUCTION_PROXY_URL_LOADER_THROTTLE_H_
 #define COMPONENTS_DATA_REDUCTION_PROXY_CONTENT_COMMON_DATA_REDUCTION_PROXY_URL_LOADER_THROTTLE_H_
 
+#include <vector>
+
+#include "base/sequence_checker.h"
 #include "components/data_reduction_proxy/core/common/data_reduction_proxy.mojom.h"
+#include "components/data_reduction_proxy/core/common/data_reduction_proxy_server.h"
 #include "content/public/common/url_loader_throttle.h"
+#include "mojo/public/cpp/bindings/binding.h"
 
 namespace data_reduction_proxy {
 
 class DataReductionProxyThrottleManager;
+struct DataReductionProxyTypeInfo;
 
 // Handles Data Reduction Proxy logic that needs to be applied to each request.
 //
@@ -19,10 +25,11 @@
 //   * Processing response headers from a data reduction proxy.
 //   * Restarting the URL loader in order to use a different proxy.
 //   * Marking data reduction proxies to be bypassed for future requests.
-class DataReductionProxyURLLoaderThrottle : public content::URLLoaderThrottle {
+class DataReductionProxyURLLoaderThrottle
+    : public content::URLLoaderThrottle,
+      public mojom::DataReductionProxyThrottleConfigObserver {
  public:
-  // |manager| is shared between all the DRP Throttles and used to access shared
-  // state such as the current DRP configuration.
+  // |manager| is shared between all the DRP Throttles.
   DataReductionProxyURLLoaderThrottle(
       const net::HttpRequestHeaders& post_cache_headers,
       DataReductionProxyThrottleManager* manager);
@@ -48,7 +55,16 @@
   void WillOnCompleteWithError(const network::URLLoaderCompletionStatus& status,
                                bool* defer) override;
 
+  // mojom::DataReductionProxyThrottleConfigObserver:
+  void OnThrottleConfigChanged(
+      mojom::DataReductionProxyThrottleConfigPtr config) override;
+
  private:
+  // As the throttle instance is being moved to another sequence, this
+  // functions arranges for mojom::DataReductionProxy interactions to happen
+  // through this throttle's private mojo pipes.
+  void SetUpPrivateMojoPipes();
+
   // Retry the request bypassing proxies or falling back to next proxy based on
   // |net_error| and the response headers.
   void MaybeRetry(const net::ProxyServer& proxy_server,
@@ -64,6 +80,9 @@
   // Tells |delegate_| to restart the URL loader if |pending_restart_| was set.
   void DoPendingRestart();
 
+  base::Optional<DataReductionProxyTypeInfo> FindConfiguredDataReductionProxy(
+      const net::ProxyServer& proxy_server) const;
+
   net::HttpRequestHeaders post_cache_headers_;
 
   // List of URLs in the redirect chain. |.front()| is the original URL
@@ -73,6 +92,22 @@
 
   DataReductionProxyThrottleManager* manager_ = nullptr;
 
+  // Throttles that run on the same sequence as the manager share the manager's
+  // mojo pipes. In this case, |data_reduction_proxy_| routes calls through the
+  // manager's connection to mojom::DataReductionProxy, and
+  // mojom::DataReductionProxyThrottleConfigObserver events received by the
+  // manager are forwarded to the same-sequence throttles.
+  mojom::DataReductionProxy* data_reduction_proxy_;
+
+  // Throttles that run on different sequences need "private" mojo pipes.
+  mojom::DataReductionProxyPtrInfo private_data_reduction_proxy_info_;
+  mojom::DataReductionProxyPtr private_data_reduction_proxy_;
+  mojo::Binding<mojom::DataReductionProxyThrottleConfigObserver>
+      private_config_observer_binding_;
+
+  // The last seen config values.
+  std::vector<DataReductionProxyServer> proxies_for_http_;
+
   // |pending_restart_| is set to true if the URL loader needs to be restarted
   // using |pending_restart_load_flags_|.
   int pending_restart_load_flags_ = 0;
@@ -90,6 +125,8 @@
   // True if BeforeWillProcessResponse has been called.
   bool before_will_process_response_received_ = false;
 
+  SEQUENCE_CHECKER(sequence_checker_);
+
   base::WeakPtrFactory<DataReductionProxyURLLoaderThrottle> weak_factory_{this};
 };
 
diff --git a/components/data_reduction_proxy/content/common/data_reduction_proxy_url_loader_throttle_unittest.cc b/components/data_reduction_proxy/content/common/data_reduction_proxy_url_loader_throttle_unittest.cc
index c83aefe..d44e544 100644
--- a/components/data_reduction_proxy/content/common/data_reduction_proxy_url_loader_throttle_unittest.cc
+++ b/components/data_reduction_proxy/content/common/data_reduction_proxy_url_loader_throttle_unittest.cc
@@ -13,6 +13,7 @@
 #include "components/data_reduction_proxy/core/common/data_reduction_proxy_server.h"
 #include "components/data_reduction_proxy/core/common/data_reduction_proxy_throttle_manager.h"
 #include "content/public/common/previews_state.h"
+#include "mojo/public/cpp/bindings/binding_set.h"
 #include "net/base/load_flags.h"
 #include "net/http/http_request_headers.h"
 #include "testing/gtest/include/gtest/gtest.h"
@@ -27,16 +28,54 @@
   void MarkProxiesAsBad(base::TimeDelta bypass_duration,
                         const net::ProxyList& bad_proxies,
                         MarkProxiesAsBadCallback callback) override {
+    if (waiting_for_mark_as_bad_closure_) {
+      // We are being called via mojo.
+      std::move(callback).Run();
+      base::ThreadTaskRunnerHandle::Get()->PostTask(
+          FROM_HERE, std::move(waiting_for_mark_as_bad_closure_));
+      return;
+    }
+
+    // We are being called via the mojom::DataReductionProxy interface
+    // directly.
     base::ThreadTaskRunnerHandle::Get()->PostTask(FROM_HERE,
                                                   std::move(callback));
   }
 
   void AddThrottleConfigObserver(
-      mojom::DataReductionProxyThrottleConfigObserverPtr observer) override {}
+      mojom::DataReductionProxyThrottleConfigObserverPtr /* observer */)
+      override {
+    ++observer_count_;
+  }
 
-  void Clone(mojom::DataReductionProxyRequest request) override {}
+  void Clone(mojom::DataReductionProxyRequest request) override {
+    bindings_.AddBinding(this, std::move(request));
+  }
+
+  // When mojom::DataReductionProxy methods are called via mojo, things are
+  // asynchronous by nature.  Test code can then use this function to wait for
+  // MarkProxiesAsBad() to be called.
+  void WaitUntilMarkProxiesAsBadCalled() {
+    ASSERT_TRUE(waiting_for_mark_as_bad_closure_.is_null());
+
+    base::RunLoop run_loop;
+    waiting_for_mark_as_bad_closure_ = run_loop.QuitClosure();
+    run_loop.Run();
+  }
+
+  // +1 because MockMojoDataReductionProxy uses direct calls through the
+  // mojom::DataReductionProxy interface by default, and mojo is only involved
+  // after Clone().
+  size_t pipe_count() const { return bindings_.size() + 1u; }
+
+  size_t observer_count() const { return observer_count_; }
 
  private:
+  mojo::BindingSet<mojom::DataReductionProxy> bindings_;
+  size_t observer_count_ = 0;
+
+  base::OnceClosure waiting_for_mark_as_bad_closure_;
+
   DISALLOW_COPY_AND_ASSIGN(MockMojoDataReductionProxy);
 };
 
@@ -331,6 +370,53 @@
   EXPECT_EQ(0, delegate.restart_additional_load_flags);
 }
 
+TEST_F(DataReductionProxyURLLoaderThrottleTest, MarkProxyAsBadOnNewSequence) {
+  auto drp_server = MakeCoreDrpServer("HTTPS localhost");
+
+  auto manager = CreateManager(mock_mojo_data_reduction_proxy(), {drp_server});
+  MockDelegate delegate;
+  DataReductionProxyURLLoaderThrottle throttle((net::HttpRequestHeaders()),
+                                               manager.get());
+  throttle.set_delegate(&delegate);
+
+  // The URLLoaderThrottle is about to move to another sequence.  It should
+  // create a private pipe to the DRP interface in preparation.
+  throttle.DetachFromCurrentSequence();
+  EXPECT_EQ(mock_mojo_data_reduction_proxy()->pipe_count(), 2u);
+
+  network::ResourceRequest request;
+  request.resource_type = static_cast<int>(content::ResourceType::kMainFrame);
+  request.url = GURL("http://www.example.com/");
+  bool defer = false;
+
+  // WillStartRequest() is the first call on the new sequence.  This is when
+  // the throttle should bind the private pipes to the sequence.
+  throttle.WillStartRequest(&request, &defer);
+  EXPECT_FALSE(defer);
+
+  network::ResourceResponseHead response_head;
+  response_head.proxy_server = drp_server.proxy_server();
+  response_head.headers = base::MakeRefCounted<net::HttpResponseHeaders>(
+      "HTTP/1.1 500 Server error\n");
+
+  throttle.BeforeWillProcessResponse(request.url, response_head, &defer);
+
+  // The throttle should have marked the proxy as bad.
+  EXPECT_TRUE(defer);
+  EXPECT_EQ(0u, delegate.resume_called);
+  EXPECT_EQ(0u, delegate.restart_with_flags_called);
+
+  // The DRP calls are carried by mojo now, which makes them asynchronous.
+  mock_mojo_data_reduction_proxy()->WaitUntilMarkProxiesAsBadCalled();
+
+  EXPECT_EQ(mock_mojo_data_reduction_proxy()->observer_count(), 2u);
+
+  // The throttle should restart and resume.
+  EXPECT_EQ(1u, delegate.resume_called);
+  EXPECT_EQ(1u, delegate.restart_with_flags_called);
+  EXPECT_EQ(0, delegate.restart_additional_load_flags);
+}
+
 }  // namespace
 
 }  // namespace data_reduction_proxy
diff --git a/components/data_reduction_proxy/core/common/data_reduction_proxy_throttle_manager.cc b/components/data_reduction_proxy/core/common/data_reduction_proxy_throttle_manager.cc
index 2c37c32..70d8c93 100644
--- a/components/data_reduction_proxy/core/common/data_reduction_proxy_throttle_manager.cc
+++ b/components/data_reduction_proxy/core/common/data_reduction_proxy_throttle_manager.cc
@@ -17,13 +17,13 @@
 DataReductionProxyThrottleManager::DataReductionProxyThrottleManager(
     mojom::DataReductionProxy* data_reduction_proxy,
     mojom::DataReductionProxyThrottleConfigPtr initial_config)
-    : data_reduction_proxy_(data_reduction_proxy), binding_(this) {
+    : shared_data_reduction_proxy_(data_reduction_proxy), binding_(this) {
   DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_);
 
   mojom::DataReductionProxyThrottleConfigObserverPtr observer;
   binding_.Bind(mojo::MakeRequest(&observer));
 
-  data_reduction_proxy_->AddThrottleConfigObserver(std::move(observer));
+  shared_data_reduction_proxy_->AddThrottleConfigObserver(std::move(observer));
 
   OnThrottleConfigChanged(std::move(initial_config));
 }
@@ -32,44 +32,29 @@
   DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_);
 }
 
-base::Optional<DataReductionProxyTypeInfo>
-DataReductionProxyThrottleManager::FindConfiguredDataReductionProxy(
-    const net::ProxyServer& proxy_server) {
-  DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_);
-
-  // TODO(https://crbug.com/721403): The non-NS code also searches through the
-  // recently seen proxies, not just the current ones.
-  return params::FindConfiguredProxyInVector(proxies_for_http_, proxy_server);
-}
-
-void DataReductionProxyThrottleManager::MarkProxiesAsBad(
-    base::TimeDelta bypass_duration,
-    const net::ProxyList& bad_proxies,
-    mojom::DataReductionProxy::MarkProxiesAsBadCallback callback) {
-  DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_);
-
-  // There is no need to handle the case where |callback| is never invoked
-  // (possible on connection error). That would imply disconnection from the
-  // browser, which is not recoverable.
-  data_reduction_proxy_->MarkProxiesAsBad(bypass_duration, bad_proxies,
-                                          std::move(callback));
-}
-
 void DataReductionProxyThrottleManager::OnThrottleConfigChanged(
     mojom::DataReductionProxyThrottleConfigPtr config) {
   DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_);
+  last_proxy_config_ = config.Clone();
 
-  proxies_for_http_.clear();
-
-  if (!config)
-    return;
-
-  // TODO(eroman): Use typemappings instead of converting here?
-  for (const auto& entry : config->proxies_for_http) {
-    proxies_for_http_.push_back(DataReductionProxyServer(entry->proxy_server));
+  for (mojom::DataReductionProxyThrottleConfigObserver& observer :
+       same_sequence_observers_) {
+    observer.OnThrottleConfigChanged(config.Clone());
   }
 }
 
+void DataReductionProxyThrottleManager::AddSameSequenceObserver(
+    mojom::DataReductionProxyThrottleConfigObserver* observer) {
+  DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_);
+  same_sequence_observers_.AddObserver(observer);
+}
+
+void DataReductionProxyThrottleManager::RemoveSameSequenceObserver(
+    mojom::DataReductionProxyThrottleConfigObserver* observer) {
+  DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_);
+  same_sequence_observers_.RemoveObserver(observer);
+}
+
 // static
 mojom::DataReductionProxyThrottleConfigPtr
 DataReductionProxyThrottleManager::CreateConfig(
diff --git a/components/data_reduction_proxy/core/common/data_reduction_proxy_throttle_manager.h b/components/data_reduction_proxy/core/common/data_reduction_proxy_throttle_manager.h
index f4a14656..54cabcfd 100644
--- a/components/data_reduction_proxy/core/common/data_reduction_proxy_throttle_manager.h
+++ b/components/data_reduction_proxy/core/common/data_reduction_proxy_throttle_manager.h
@@ -5,6 +5,7 @@
 #ifndef COMPONENTS_DATA_REDUCTION_PROXY_CORE_COMMON_DATA_REDUCTION_PROXY_THROTTLE_MANAGER_H_
 #define COMPONENTS_DATA_REDUCTION_PROXY_CORE_COMMON_DATA_REDUCTION_PROXY_THROTTLE_MANAGER_H_
 
+#include "base/observer_list.h"
 #include "base/sequence_checker.h"
 #include "components/data_reduction_proxy/core/common/data_reduction_proxy.mojom.h"
 #include "mojo/public/cpp/bindings/binding.h"
@@ -12,49 +13,60 @@
 namespace data_reduction_proxy {
 
 class DataReductionProxyServer;
-struct DataReductionProxyTypeInfo;
 
 // Helper that encapsulates the shared state between
-// DataReductionProxyURLThrottles, whose main responsibility is keeping an up to
-// date view of the config.
+// DataReductionProxyURLThrottles, whose main responsibility is keeping the
+// shared mojo connections required by the throttles.
 class DataReductionProxyThrottleManager
     : public mojom::DataReductionProxyThrottleConfigObserver {
  public:
   // Observes |data_reduction_proxy| for changes to the config, and starts
   // off with the initial value (possibly empty) |initial_config|.
   DataReductionProxyThrottleManager(
-      mojom::DataReductionProxy* data_reduction_proxy_info,
+      mojom::DataReductionProxy* data_reduction_proxy,
       mojom::DataReductionProxyThrottleConfigPtr initial_config);
 
   ~DataReductionProxyThrottleManager() override;
 
-  base::Optional<DataReductionProxyTypeInfo> FindConfiguredDataReductionProxy(
-      const net::ProxyServer& proxy_server);
-
-  void MarkProxiesAsBad(
-      base::TimeDelta bypass_duration,
-      const net::ProxyList& bad_proxies,
-      mojom::DataReductionProxy::MarkProxiesAsBadCallback callback);
-
   // mojom::DataReductionProxyThrottleConfigObserver implementation.
   void OnThrottleConfigChanged(
       mojom::DataReductionProxyThrottleConfigPtr config) override;
 
+  // Called by throttles living on the manager's sequence when they want to
+  // sign up for / sign out of receiving
+  // mojom::DataReductionProxyThrottleConfigObserver events.
+  void AddSameSequenceObserver(
+      mojom::DataReductionProxyThrottleConfigObserver* observer);
+  void RemoveSameSequenceObserver(
+      mojom::DataReductionProxyThrottleConfigObserver* observer);
+
+  mojom::DataReductionProxy* data_reduction_proxy() {
+    return shared_data_reduction_proxy_;
+  }
+
+  mojom::DataReductionProxyThrottleConfigPtr last_proxy_config() const {
+    return last_proxy_config_.Clone();
+  }
+
   static mojom::DataReductionProxyThrottleConfigPtr CreateConfig(
       const std::vector<DataReductionProxyServer>& proxies_for_http);
 
  private:
-  void SetDataReductionProxy(mojom::DataReductionProxyPtr data_reduction_proxy);
-
-  mojom::DataReductionProxy* const data_reduction_proxy_;
-
-  // The last seen config values.
-  std::vector<DataReductionProxyServer> proxies_for_http_;
+  // Most DataReductionProxyURLThrottles will live on the manager's sequence.
+  // It makes sense for all of them to reuse the manager's mojo pipes set up
+  // for the interfaces mojom::DataReductionProxy and
+  // mojom::DataReductionProxyThrottleConfigObserver.
+  mojom::DataReductionProxy* const shared_data_reduction_proxy_;
+  base::ObserverList<mojom::DataReductionProxyThrottleConfigObserver>::Unchecked
+      same_sequence_observers_;
 
   mojo::Binding<
       data_reduction_proxy::mojom::DataReductionProxyThrottleConfigObserver>
       binding_;
 
+  // The last seen config values.
+  mojom::DataReductionProxyThrottleConfigPtr last_proxy_config_;
+
   SEQUENCE_CHECKER(sequence_checker_);
 
   DISALLOW_COPY_AND_ASSIGN(DataReductionProxyThrottleManager);