refactor response generator in client_lb_end2end_test
diff --git a/src/core/ext/filters/client_channel/resolver/fake/fake_resolver.cc b/src/core/ext/filters/client_channel/resolver/fake/fake_resolver.cc
index ff728a3..c48f232 100644
--- a/src/core/ext/filters/client_channel/resolver/fake/fake_resolver.cc
+++ b/src/core/ext/filters/client_channel/resolver/fake/fake_resolver.cc
@@ -89,9 +89,15 @@
     : Resolver(args.combiner, std::move(args.result_handler)) {
   GRPC_CLOSURE_INIT(&reresolution_closure_, ReturnReresolutionResult, this,
                     grpc_combiner_scheduler(combiner()));
-  channel_args_ = grpc_channel_args_copy(args.args);
   FakeResolverResponseGenerator* response_generator =
       FakeResolverResponseGenerator::GetFromArgs(args.args);
+  // Channels sharing the same subchannels may have different resolver response
+  // generators. If we don't remove this arg, subchannel pool will create new
+  // subchannels for the same address instead of reusing existing ones because
+  // of different values of this channel arg.
+  const char* args_to_remove[] = {GRPC_ARG_FAKE_RESOLVER_RESPONSE_GENERATOR};
+  channel_args_ = grpc_channel_args_copy_and_remove(
+      args.args, args_to_remove, GPR_ARRAY_SIZE(args_to_remove));
   if (response_generator != nullptr) {
     response_generator->resolver_ = this;
     if (response_generator->has_result_) {
diff --git a/test/cpp/end2end/client_lb_end2end_test.cc b/test/cpp/end2end/client_lb_end2end_test.cc
index d499f13..8fc0eac 100644
--- a/test/cpp/end2end/client_lb_end2end_test.cc
+++ b/test/cpp/end2end/client_lb_end2end_test.cc
@@ -133,6 +133,59 @@
   std::set<grpc::string> clients_;
 };
 
+class FakeResolverResponseGeneratorWrapper {
+ public:
+  FakeResolverResponseGeneratorWrapper()
+      : response_generator_(grpc_core::MakeRefCounted<
+                            grpc_core::FakeResolverResponseGenerator>()) {}
+
+  FakeResolverResponseGeneratorWrapper(
+      FakeResolverResponseGeneratorWrapper&& other) {
+    response_generator_ = std::move(other.response_generator_);
+  }
+
+  void SetNextResolution(const std::vector<int>& ports) {
+    grpc_core::ExecCtx exec_ctx;
+    response_generator_->SetResponse(BuildFakeResults(ports));
+  }
+
+  void SetNextResolutionUponError(const std::vector<int>& ports) {
+    grpc_core::ExecCtx exec_ctx;
+    response_generator_->SetReresolutionResponse(BuildFakeResults(ports));
+  }
+
+  void SetFailureOnReresolution() {
+    grpc_core::ExecCtx exec_ctx;
+    response_generator_->SetFailureOnReresolution();
+  }
+
+  grpc_core::FakeResolverResponseGenerator* Get() const {
+    return response_generator_.get();
+  }
+
+ private:
+  static grpc_core::Resolver::Result BuildFakeResults(
+      const std::vector<int>& ports) {
+    grpc_core::Resolver::Result result;
+    for (const int& port : ports) {
+      char* lb_uri_str;
+      gpr_asprintf(&lb_uri_str, "ipv4:127.0.0.1:%d", port);
+      grpc_uri* lb_uri = grpc_uri_parse(lb_uri_str, true);
+      GPR_ASSERT(lb_uri != nullptr);
+      grpc_resolved_address address;
+      GPR_ASSERT(grpc_parse_uri(lb_uri, &address));
+      result.addresses.emplace_back(address.addr, address.len,
+                                    nullptr /* args */);
+      grpc_uri_destroy(lb_uri);
+      gpr_free(lb_uri_str);
+    }
+    return result;
+  }
+
+  grpc_core::RefCountedPtr<grpc_core::FakeResolverResponseGenerator>
+      response_generator_;
+};
+
 class ClientLbEnd2endTest : public ::testing::Test {
  protected:
   ClientLbEnd2endTest()
@@ -147,11 +200,7 @@
     GPR_GLOBAL_CONFIG_SET(grpc_client_channel_backup_poll_interval_ms, 1);
   }
 
-  void SetUp() override {
-    grpc_init();
-    response_generator_ =
-        grpc_core::MakeRefCounted<grpc_core::FakeResolverResponseGenerator>();
-  }
+  void SetUp() override { grpc_init(); }
 
   void TearDown() override {
     for (size_t i = 0; i < servers_.size(); ++i) {
@@ -186,38 +235,6 @@
     }
   }
 
-  grpc_core::Resolver::Result BuildFakeResults(const std::vector<int>& ports) {
-    grpc_core::Resolver::Result result;
-    for (const int& port : ports) {
-      char* lb_uri_str;
-      gpr_asprintf(&lb_uri_str, "ipv4:127.0.0.1:%d", port);
-      grpc_uri* lb_uri = grpc_uri_parse(lb_uri_str, true);
-      GPR_ASSERT(lb_uri != nullptr);
-      grpc_resolved_address address;
-      GPR_ASSERT(grpc_parse_uri(lb_uri, &address));
-      result.addresses.emplace_back(address.addr, address.len,
-                                    nullptr /* args */);
-      grpc_uri_destroy(lb_uri);
-      gpr_free(lb_uri_str);
-    }
-    return result;
-  }
-
-  void SetNextResolution(const std::vector<int>& ports) {
-    grpc_core::ExecCtx exec_ctx;
-    response_generator_->SetResponse(BuildFakeResults(ports));
-  }
-
-  void SetNextResolutionUponError(const std::vector<int>& ports) {
-    grpc_core::ExecCtx exec_ctx;
-    response_generator_->SetReresolutionResponse(BuildFakeResults(ports));
-  }
-
-  void SetFailureOnReresolution() {
-    grpc_core::ExecCtx exec_ctx;
-    response_generator_->SetFailureOnReresolution();
-  }
-
   std::vector<int> GetServersPorts(size_t start_index = 0) {
     std::vector<int> ports;
     for (size_t i = start_index; i < servers_.size(); ++i) {
@@ -226,6 +243,10 @@
     return ports;
   }
 
+  FakeResolverResponseGeneratorWrapper BuildResolverResponseGenerator() {
+    return FakeResolverResponseGeneratorWrapper();
+  }
+
   std::unique_ptr<grpc::testing::EchoTestService::Stub> BuildStub(
       const std::shared_ptr<Channel>& channel) {
     return grpc::testing::EchoTestService::NewStub(channel);
@@ -233,12 +254,13 @@
 
   std::shared_ptr<Channel> BuildChannel(
       const grpc::string& lb_policy_name,
+      const FakeResolverResponseGeneratorWrapper& response_generator,
       ChannelArguments args = ChannelArguments()) {
     if (lb_policy_name.size() > 0) {
       args.SetLoadBalancingPolicyName(lb_policy_name);
     }  // else, default to pick first
     args.SetPointer(GRPC_ARG_FAKE_RESOLVER_RESPONSE_GENERATOR,
-                    response_generator_.get());
+                    response_generator.Get());
     return ::grpc::CreateCustomChannel("fake:///", creds_, args);
   }
 
@@ -401,8 +423,6 @@
   const grpc::string server_host_;
   std::unique_ptr<grpc::testing::EchoTestService::Stub> stub_;
   std::vector<std::unique_ptr<ServerData>> servers_;
-  grpc_core::RefCountedPtr<grpc_core::FakeResolverResponseGenerator>
-      response_generator_;
   const grpc::string kRequestMessage_;
   std::shared_ptr<ChannelCredentials> creds_;
 };
@@ -410,7 +430,8 @@
 TEST_F(ClientLbEnd2endTest, ChannelStateConnectingWhenResolving) {
   const int kNumServers = 3;
   StartServers(kNumServers);
-  auto channel = BuildChannel("");
+  auto response_generator = BuildResolverResponseGenerator();
+  auto channel = BuildChannel("", response_generator);
   auto stub = BuildStub(channel);
   // Initial state should be IDLE.
   EXPECT_EQ(channel->GetState(false /* try_to_connect */), GRPC_CHANNEL_IDLE);
@@ -423,7 +444,7 @@
   EXPECT_EQ(channel->GetState(false /* try_to_connect */),
             GRPC_CHANNEL_CONNECTING);
   // Return a resolver result, which allows the connection attempt to proceed.
-  SetNextResolution(GetServersPorts());
+  response_generator.SetNextResolution(GetServersPorts());
   // We should eventually transition into state READY.
   EXPECT_TRUE(WaitForChannelReady(channel.get()));
 }
@@ -432,9 +453,11 @@
   // Start servers and send one RPC per server.
   const int kNumServers = 3;
   StartServers(kNumServers);
-  auto channel = BuildChannel("");  // test that pick first is the default.
+  auto response_generator = BuildResolverResponseGenerator();
+  auto channel = BuildChannel(
+      "", response_generator);  // test that pick first is the default.
   auto stub = BuildStub(channel);
-  SetNextResolution(GetServersPorts());
+  response_generator.SetNextResolution(GetServersPorts());
   for (size_t i = 0; i < servers_.size(); ++i) {
     CheckRpcSendOk(stub, DEBUG_LOCATION);
   }
@@ -454,19 +477,22 @@
 }
 
 TEST_F(ClientLbEnd2endTest, PickFirstProcessPending) {
-  StartServers(1);                  // Single server
-  auto channel = BuildChannel("");  // test that pick first is the default.
+  StartServers(1);  // Single server
+  auto response_generator = BuildResolverResponseGenerator();
+  auto channel = BuildChannel(
+      "", response_generator);  // test that pick first is the default.
   auto stub = BuildStub(channel);
-  SetNextResolution({servers_[0]->port_});
+  response_generator.SetNextResolution({servers_[0]->port_});
   WaitForServer(stub, 0, DEBUG_LOCATION);
   // Create a new channel and its corresponding PF LB policy, which will pick
   // the subchannels in READY state from the previous RPC against the same
   // target (even if it happened over a different channel, because subchannels
   // are globally reused). Progress should happen without any transition from
   // this READY state.
-  auto second_channel = BuildChannel("");
+  auto second_response_generator = BuildResolverResponseGenerator();
+  auto second_channel = BuildChannel("", second_response_generator);
   auto second_stub = BuildStub(second_channel);
-  SetNextResolution({servers_[0]->port_});
+  second_response_generator.SetNextResolution({servers_[0]->port_});
   CheckRpcSendOk(second_stub, DEBUG_LOCATION);
 }
 
@@ -479,16 +505,18 @@
                             grpc_pick_unused_port_or_die()};
   CreateServers(2, ports);
   StartServer(1);
-  auto channel1 = BuildChannel("pick_first", args);
+  auto response_generator1 = BuildResolverResponseGenerator();
+  auto channel1 = BuildChannel("pick_first", response_generator1, args);
   auto stub1 = BuildStub(channel1);
-  SetNextResolution(ports);
+  response_generator1.SetNextResolution(ports);
   // Wait for second server to be ready.
   WaitForServer(stub1, 1, DEBUG_LOCATION);
   // Create a second channel with the same addresses.  Its PF instance
   // should immediately pick the second subchannel, since it's already
   // in READY state.
-  auto channel2 = BuildChannel("pick_first", args);
-  SetNextResolution(ports);
+  auto response_generator2 = BuildResolverResponseGenerator();
+  auto channel2 = BuildChannel("pick_first", response_generator2, args);
+  response_generator2.SetNextResolution(ports);
   // Check that the channel reports READY without waiting for the
   // initial backoff.
   EXPECT_TRUE(WaitForChannelReady(channel2.get(), 1 /* timeout_seconds */));
@@ -500,9 +528,10 @@
   args.SetInt(GRPC_ARG_INITIAL_RECONNECT_BACKOFF_MS, kInitialBackOffMs);
   const std::vector<int> ports = {grpc_pick_unused_port_or_die()};
   const gpr_timespec t0 = gpr_now(GPR_CLOCK_MONOTONIC);
-  auto channel = BuildChannel("pick_first", args);
+  auto response_generator = BuildResolverResponseGenerator();
+  auto channel = BuildChannel("pick_first", response_generator, args);
   auto stub = BuildStub(channel);
-  SetNextResolution(ports);
+  response_generator.SetNextResolution(ports);
   // The channel won't become connected (there's no server).
   ASSERT_FALSE(channel->WaitForConnected(
       grpc_timeout_milliseconds_to_deadline(kInitialBackOffMs * 2)));
@@ -529,9 +558,10 @@
   constexpr int kMinReconnectBackOffMs = 1000;
   args.SetInt(GRPC_ARG_MIN_RECONNECT_BACKOFF_MS, kMinReconnectBackOffMs);
   const std::vector<int> ports = {grpc_pick_unused_port_or_die()};
-  auto channel = BuildChannel("pick_first", args);
+  auto response_generator = BuildResolverResponseGenerator();
+  auto channel = BuildChannel("pick_first", response_generator, args);
   auto stub = BuildStub(channel);
-  SetNextResolution(ports);
+  response_generator.SetNextResolution(ports);
   // Make connection delay a 10% longer than it's willing to in order to make
   // sure we are hitting the codepath that waits for the min reconnect backoff.
   gpr_atm_rel_store(&g_connection_delay_ms, kMinReconnectBackOffMs * 1.10);
@@ -554,9 +584,10 @@
   constexpr int kInitialBackOffMs = 1000;
   args.SetInt(GRPC_ARG_INITIAL_RECONNECT_BACKOFF_MS, kInitialBackOffMs);
   const std::vector<int> ports = {grpc_pick_unused_port_or_die()};
-  auto channel = BuildChannel("pick_first", args);
+  auto response_generator = BuildResolverResponseGenerator();
+  auto channel = BuildChannel("pick_first", response_generator, args);
   auto stub = BuildStub(channel);
-  SetNextResolution(ports);
+  response_generator.SetNextResolution(ports);
   // The channel won't become connected (there's no server).
   EXPECT_FALSE(
       channel->WaitForConnected(grpc_timeout_milliseconds_to_deadline(10)));
@@ -585,9 +616,10 @@
   constexpr int kInitialBackOffMs = 1000;
   args.SetInt(GRPC_ARG_INITIAL_RECONNECT_BACKOFF_MS, kInitialBackOffMs);
   const std::vector<int> ports = {grpc_pick_unused_port_or_die()};
-  auto channel = BuildChannel("pick_first", args);
+  auto response_generator = BuildResolverResponseGenerator();
+  auto channel = BuildChannel("pick_first", response_generator, args);
   auto stub = BuildStub(channel);
-  SetNextResolution(ports);
+  response_generator.SetNextResolution(ports);
   // Wait for connect, which should fail ~immediately, because the server
   // is not up.
   gpr_log(GPR_INFO, "=== INITIAL CONNECTION ATTEMPT");
@@ -628,21 +660,22 @@
   // Start servers and send one RPC per server.
   const int kNumServers = 3;
   StartServers(kNumServers);
-  auto channel = BuildChannel("pick_first");
+  auto response_generator = BuildResolverResponseGenerator();
+  auto channel = BuildChannel("pick_first", response_generator);
   auto stub = BuildStub(channel);
 
   std::vector<int> ports;
 
   // Perform one RPC against the first server.
   ports.emplace_back(servers_[0]->port_);
-  SetNextResolution(ports);
+  response_generator.SetNextResolution(ports);
   gpr_log(GPR_INFO, "****** SET [0] *******");
   CheckRpcSendOk(stub, DEBUG_LOCATION);
   EXPECT_EQ(servers_[0]->service_.request_count(), 1);
 
   // An empty update will result in the channel going into TRANSIENT_FAILURE.
   ports.clear();
-  SetNextResolution(ports);
+  response_generator.SetNextResolution(ports);
   gpr_log(GPR_INFO, "****** SET none *******");
   grpc_connectivity_state channel_state;
   do {
@@ -654,7 +687,7 @@
   // Next update introduces servers_[1], making the channel recover.
   ports.clear();
   ports.emplace_back(servers_[1]->port_);
-  SetNextResolution(ports);
+  response_generator.SetNextResolution(ports);
   gpr_log(GPR_INFO, "****** SET [1] *******");
   WaitForServer(stub, 1, DEBUG_LOCATION);
   EXPECT_EQ(servers_[0]->service_.request_count(), 0);
@@ -662,7 +695,7 @@
   // And again for servers_[2]
   ports.clear();
   ports.emplace_back(servers_[2]->port_);
-  SetNextResolution(ports);
+  response_generator.SetNextResolution(ports);
   gpr_log(GPR_INFO, "****** SET [2] *******");
   WaitForServer(stub, 2, DEBUG_LOCATION);
   EXPECT_EQ(servers_[0]->service_.request_count(), 0);
@@ -676,14 +709,15 @@
   // Start servers and send one RPC per server.
   const int kNumServers = 3;
   StartServers(kNumServers);
-  auto channel = BuildChannel("pick_first");
+  auto response_generator = BuildResolverResponseGenerator();
+  auto channel = BuildChannel("pick_first", response_generator);
   auto stub = BuildStub(channel);
 
   std::vector<int> ports;
 
   // Perform one RPC against the first server.
   ports.emplace_back(servers_[0]->port_);
-  SetNextResolution(ports);
+  response_generator.SetNextResolution(ports);
   gpr_log(GPR_INFO, "****** SET [0] *******");
   CheckRpcSendOk(stub, DEBUG_LOCATION);
   EXPECT_EQ(servers_[0]->service_.request_count(), 1);
@@ -693,7 +727,7 @@
   ports.clear();
   ports.emplace_back(servers_[1]->port_);
   ports.emplace_back(servers_[0]->port_);
-  SetNextResolution(ports);
+  response_generator.SetNextResolution(ports);
   gpr_log(GPR_INFO, "****** SET superset *******");
   CheckRpcSendOk(stub, DEBUG_LOCATION);
   // We stick to the previously connected server.
@@ -710,12 +744,14 @@
   StartServers(kNumServers);
   std::vector<int> ports = GetServersPorts();
   // Create two channels that (by default) use the global subchannel pool.
-  auto channel1 = BuildChannel("pick_first");
+  auto response_generator1 = BuildResolverResponseGenerator();
+  auto channel1 = BuildChannel("pick_first", response_generator1);
   auto stub1 = BuildStub(channel1);
-  SetNextResolution(ports);
-  auto channel2 = BuildChannel("pick_first");
+  response_generator1.SetNextResolution(ports);
+  auto response_generator2 = BuildResolverResponseGenerator();
+  auto channel2 = BuildChannel("pick_first", response_generator2);
   auto stub2 = BuildStub(channel2);
-  SetNextResolution(ports);
+  response_generator2.SetNextResolution(ports);
   WaitForServer(stub1, 0, DEBUG_LOCATION);
   // Send one RPC on each channel.
   CheckRpcSendOk(stub1, DEBUG_LOCATION);
@@ -735,12 +771,14 @@
   // Create two channels that use local subchannel pool.
   ChannelArguments args;
   args.SetInt(GRPC_ARG_USE_LOCAL_SUBCHANNEL_POOL, 1);
-  auto channel1 = BuildChannel("pick_first", args);
+  auto response_generator1 = BuildResolverResponseGenerator();
+  auto channel1 = BuildChannel("pick_first", response_generator1, args);
   auto stub1 = BuildStub(channel1);
-  SetNextResolution(ports);
-  auto channel2 = BuildChannel("pick_first", args);
+  response_generator1.SetNextResolution(ports);
+  auto response_generator2 = BuildResolverResponseGenerator();
+  auto channel2 = BuildChannel("pick_first", response_generator2, args);
   auto stub2 = BuildStub(channel2);
-  SetNextResolution(ports);
+  response_generator2.SetNextResolution(ports);
   WaitForServer(stub1, 0, DEBUG_LOCATION);
   // Send one RPC on each channel.
   CheckRpcSendOk(stub1, DEBUG_LOCATION);
@@ -756,13 +794,14 @@
   const int kNumUpdates = 1000;
   const int kNumServers = 3;
   StartServers(kNumServers);
-  auto channel = BuildChannel("pick_first");
+  auto response_generator = BuildResolverResponseGenerator();
+  auto channel = BuildChannel("pick_first", response_generator);
   auto stub = BuildStub(channel);
   std::vector<int> ports = GetServersPorts();
   for (size_t i = 0; i < kNumUpdates; ++i) {
     std::shuffle(ports.begin(), ports.end(),
                  std::mt19937(std::random_device()()));
-    SetNextResolution(ports);
+    response_generator.SetNextResolution(ports);
     // We should re-enter core at the end of the loop to give the resolution
     // setting closure a chance to run.
     if ((i + 1) % 10 == 0) CheckRpcSendOk(stub, DEBUG_LOCATION);
@@ -784,16 +823,17 @@
       dead_ports.emplace_back(grpc_pick_unused_port_or_die());
     }
   }
-  auto channel = BuildChannel("pick_first");
+  auto response_generator = BuildResolverResponseGenerator();
+  auto channel = BuildChannel("pick_first", response_generator);
   auto stub = BuildStub(channel);
   // The initial resolution only contains dead ports. There won't be any
   // selected subchannel. Re-resolution will return the same result.
-  SetNextResolution(dead_ports);
+  response_generator.SetNextResolution(dead_ports);
   gpr_log(GPR_INFO, "****** INITIAL RESOLUTION SET *******");
   for (size_t i = 0; i < 10; ++i) CheckRpcSendFailure(stub);
   // Set a re-resolution result that contains reachable ports, so that the
   // pick_first LB policy can recover soon.
-  SetNextResolutionUponError(alive_ports);
+  response_generator.SetNextResolutionUponError(alive_ports);
   gpr_log(GPR_INFO, "****** RE-RESOLUTION SET *******");
   WaitForServer(stub, 0, DEBUG_LOCATION, true /* ignore_failure */);
   CheckRpcSendOk(stub, DEBUG_LOCATION);
@@ -805,9 +845,10 @@
 TEST_F(ClientLbEnd2endTest, PickFirstReconnectWithoutNewResolverResult) {
   std::vector<int> ports = {grpc_pick_unused_port_or_die()};
   StartServers(1, ports);
-  auto channel = BuildChannel("pick_first");
+  auto response_generator = BuildResolverResponseGenerator();
+  auto channel = BuildChannel("pick_first", response_generator);
   auto stub = BuildStub(channel);
-  SetNextResolution(ports);
+  response_generator.SetNextResolution(ports);
   gpr_log(GPR_INFO, "****** INITIAL CONNECTION *******");
   WaitForServer(stub, 0, DEBUG_LOCATION);
   gpr_log(GPR_INFO, "****** STOPPING SERVER ******");
@@ -824,9 +865,10 @@
                             grpc_pick_unused_port_or_die()};
   CreateServers(2, ports);
   StartServer(1);
-  auto channel = BuildChannel("pick_first");
+  auto response_generator = BuildResolverResponseGenerator();
+  auto channel = BuildChannel("pick_first", response_generator);
   auto stub = BuildStub(channel);
-  SetNextResolution(ports);
+  response_generator.SetNextResolution(ports);
   gpr_log(GPR_INFO, "****** INITIAL CONNECTION *******");
   WaitForServer(stub, 1, DEBUG_LOCATION);
   gpr_log(GPR_INFO, "****** STOPPING SERVER ******");
@@ -840,9 +882,10 @@
 TEST_F(ClientLbEnd2endTest, PickFirstCheckStateBeforeStartWatch) {
   std::vector<int> ports = {grpc_pick_unused_port_or_die()};
   StartServers(1, ports);
-  auto channel_1 = BuildChannel("pick_first");
+  auto response_generator = BuildResolverResponseGenerator();
+  auto channel_1 = BuildChannel("pick_first", response_generator);
   auto stub_1 = BuildStub(channel_1);
-  SetNextResolution(ports);
+  response_generator.SetNextResolution(ports);
   gpr_log(GPR_INFO, "****** RESOLUTION SET FOR CHANNEL 1 *******");
   WaitForServer(stub_1, 0, DEBUG_LOCATION);
   gpr_log(GPR_INFO, "****** CHANNEL 1 CONNECTED *******");
@@ -851,13 +894,10 @@
   // create a new subchannel and hold a ref to it.
   StartServers(1, ports);
   gpr_log(GPR_INFO, "****** SERVER RESTARTED *******");
-  auto channel_2 = BuildChannel("pick_first");
+  auto response_generator_2 = BuildResolverResponseGenerator();
+  auto channel_2 = BuildChannel("pick_first", response_generator_2);
   auto stub_2 = BuildStub(channel_2);
-  // TODO(juanlishen): This resolution result will only be visible to channel 2
-  // since the response generator is only associated with channel 2 now. We
-  // should change the response generator to be able to deliver updates to
-  // multiple channels at once.
-  SetNextResolution(ports);
+  response_generator_2.SetNextResolution(ports);
   gpr_log(GPR_INFO, "****** RESOLUTION SET FOR CHANNEL 2 *******");
   WaitForServer(stub_2, 0, DEBUG_LOCATION, true);
   gpr_log(GPR_INFO, "****** CHANNEL 2 CONNECTED *******");
@@ -883,13 +923,15 @@
   // Start server, send RPC, and make sure channel is READY.
   const int kNumServers = 1;
   StartServers(kNumServers);
-  auto channel = BuildChannel("");  // pick_first is the default.
+  auto response_generator = BuildResolverResponseGenerator();
+  auto channel =
+      BuildChannel("", response_generator);  // pick_first is the default.
   auto stub = BuildStub(channel);
-  SetNextResolution(GetServersPorts());
+  response_generator.SetNextResolution(GetServersPorts());
   CheckRpcSendOk(stub, DEBUG_LOCATION);
   EXPECT_EQ(channel->GetState(false), GRPC_CHANNEL_READY);
   // Stop server.  Channel should go into state IDLE.
-  SetFailureOnReresolution();
+  response_generator.SetFailureOnReresolution();
   servers_[0]->Shutdown();
   EXPECT_TRUE(WaitForChannelNotReady(channel.get()));
   EXPECT_EQ(channel->GetState(false), GRPC_CHANNEL_IDLE);
@@ -897,14 +939,16 @@
 }
 
 TEST_F(ClientLbEnd2endTest, PickFirstPendingUpdateAndSelectedSubchannelFails) {
-  auto channel = BuildChannel("");  // pick_first is the default.
+  auto response_generator = BuildResolverResponseGenerator();
+  auto channel =
+      BuildChannel("", response_generator);  // pick_first is the default.
   auto stub = BuildStub(channel);
   // Create a number of servers, but only start 1 of them.
   CreateServers(10);
   StartServer(0);
   // Initially resolve to first server and make sure it connects.
   gpr_log(GPR_INFO, "Phase 1: Connect to first server.");
-  SetNextResolution({servers_[0]->port_});
+  response_generator.SetNextResolution({servers_[0]->port_});
   CheckRpcSendOk(stub, DEBUG_LOCATION, true /* wait_for_ready */);
   EXPECT_EQ(channel->GetState(false), GRPC_CHANNEL_READY);
   // Send a resolution update with the remaining servers, none of which are
@@ -916,7 +960,7 @@
   gpr_log(GPR_INFO,
           "Phase 2: Resolver update pointing to remaining "
           "(not started) servers.");
-  SetNextResolution(GetServersPorts(1 /* start_index */));
+  response_generator.SetNextResolution(GetServersPorts(1 /* start_index */));
   // RPCs will continue to be sent to the first server.
   CheckRpcSendOk(stub, DEBUG_LOCATION);
   // Now stop the first server, so that the current subchannel list
@@ -947,9 +991,11 @@
   // Start server, send RPC, and make sure channel is READY.
   const int kNumServers = 1;
   StartServers(kNumServers);
-  auto channel = BuildChannel("");  // pick_first is the default.
+  auto response_generator = BuildResolverResponseGenerator();
+  auto channel =
+      BuildChannel("", response_generator);  // pick_first is the default.
   auto stub = BuildStub(channel);
-  SetNextResolution(GetServersPorts());
+  response_generator.SetNextResolution(GetServersPorts());
   CheckRpcSendOk(stub, DEBUG_LOCATION);
   EXPECT_EQ(channel->GetState(false), GRPC_CHANNEL_READY);
   // Stop server.  Channel should go into state IDLE.
@@ -958,13 +1004,13 @@
   EXPECT_EQ(channel->GetState(false), GRPC_CHANNEL_IDLE);
   // Now send resolver update that includes no addresses.  Channel
   // should stay in state IDLE.
-  SetNextResolution({});
+  response_generator.SetNextResolution({});
   EXPECT_FALSE(channel->WaitForStateChange(
       GRPC_CHANNEL_IDLE, grpc_timeout_seconds_to_deadline(3)));
   // Now bring the backend back up and send a non-empty resolver update,
   // and then try to send an RPC.  Channel should go back into state READY.
   StartServer(0);
-  SetNextResolution(GetServersPorts());
+  response_generator.SetNextResolution(GetServersPorts());
   CheckRpcSendOk(stub, DEBUG_LOCATION);
   EXPECT_EQ(channel->GetState(false), GRPC_CHANNEL_READY);
 }
@@ -973,9 +1019,10 @@
   // Start servers and send one RPC per server.
   const int kNumServers = 3;
   StartServers(kNumServers);
-  auto channel = BuildChannel("round_robin");
+  auto response_generator = BuildResolverResponseGenerator();
+  auto channel = BuildChannel("round_robin", response_generator);
   auto stub = BuildStub(channel);
-  SetNextResolution(GetServersPorts());
+  response_generator.SetNextResolution(GetServersPorts());
   // Wait until all backends are ready.
   do {
     CheckRpcSendOk(stub, DEBUG_LOCATION);
@@ -999,18 +1046,20 @@
 
 TEST_F(ClientLbEnd2endTest, RoundRobinProcessPending) {
   StartServers(1);  // Single server
-  auto channel = BuildChannel("round_robin");
+  auto response_generator = BuildResolverResponseGenerator();
+  auto channel = BuildChannel("round_robin", response_generator);
   auto stub = BuildStub(channel);
-  SetNextResolution({servers_[0]->port_});
+  response_generator.SetNextResolution({servers_[0]->port_});
   WaitForServer(stub, 0, DEBUG_LOCATION);
   // Create a new channel and its corresponding RR LB policy, which will pick
   // the subchannels in READY state from the previous RPC against the same
   // target (even if it happened over a different channel, because subchannels
   // are globally reused). Progress should happen without any transition from
   // this READY state.
-  auto second_channel = BuildChannel("round_robin");
+  auto second_response_generator = BuildResolverResponseGenerator();
+  auto second_channel = BuildChannel("round_robin", second_response_generator);
   auto second_stub = BuildStub(second_channel);
-  SetNextResolution({servers_[0]->port_});
+  second_response_generator.SetNextResolution({servers_[0]->port_});
   CheckRpcSendOk(second_stub, DEBUG_LOCATION);
 }
 
@@ -1018,13 +1067,14 @@
   // Start servers and send one RPC per server.
   const int kNumServers = 3;
   StartServers(kNumServers);
-  auto channel = BuildChannel("round_robin");
+  auto response_generator = BuildResolverResponseGenerator();
+  auto channel = BuildChannel("round_robin", response_generator);
   auto stub = BuildStub(channel);
   std::vector<int> ports;
   // Start with a single server.
   gpr_log(GPR_INFO, "*** FIRST BACKEND ***");
   ports.emplace_back(servers_[0]->port_);
-  SetNextResolution(ports);
+  response_generator.SetNextResolution(ports);
   WaitForServer(stub, 0, DEBUG_LOCATION);
   // Send RPCs. They should all go servers_[0]
   for (size_t i = 0; i < 10; ++i) CheckRpcSendOk(stub, DEBUG_LOCATION);
@@ -1036,7 +1086,7 @@
   gpr_log(GPR_INFO, "*** SECOND BACKEND ***");
   ports.clear();
   ports.emplace_back(servers_[1]->port_);
-  SetNextResolution(ports);
+  response_generator.SetNextResolution(ports);
   // Wait until update has been processed, as signaled by the second backend
   // receiving a request.
   EXPECT_EQ(0, servers_[1]->service_.request_count());
@@ -1050,7 +1100,7 @@
   gpr_log(GPR_INFO, "*** THIRD BACKEND ***");
   ports.clear();
   ports.emplace_back(servers_[2]->port_);
-  SetNextResolution(ports);
+  response_generator.SetNextResolution(ports);
   WaitForServer(stub, 2, DEBUG_LOCATION);
   for (size_t i = 0; i < 10; ++i) CheckRpcSendOk(stub, DEBUG_LOCATION);
   EXPECT_EQ(0, servers_[0]->service_.request_count());
@@ -1063,7 +1113,7 @@
   ports.emplace_back(servers_[0]->port_);
   ports.emplace_back(servers_[1]->port_);
   ports.emplace_back(servers_[2]->port_);
-  SetNextResolution(ports);
+  response_generator.SetNextResolution(ports);
   WaitForServer(stub, 0, DEBUG_LOCATION);
   WaitForServer(stub, 1, DEBUG_LOCATION);
   WaitForServer(stub, 2, DEBUG_LOCATION);
@@ -1075,7 +1125,7 @@
   // An empty update will result in the channel going into TRANSIENT_FAILURE.
   gpr_log(GPR_INFO, "*** NO BACKENDS ***");
   ports.clear();
-  SetNextResolution(ports);
+  response_generator.SetNextResolution(ports);
   grpc_connectivity_state channel_state;
   do {
     channel_state = channel->GetState(true /* try to connect */);
@@ -1086,7 +1136,7 @@
   gpr_log(GPR_INFO, "*** BACK TO SECOND BACKEND ***");
   ports.clear();
   ports.emplace_back(servers_[1]->port_);
-  SetNextResolution(ports);
+  response_generator.SetNextResolution(ports);
   WaitForServer(stub, 1, DEBUG_LOCATION);
   channel_state = channel->GetState(false /* try to connect */);
   ASSERT_EQ(channel_state, GRPC_CHANNEL_READY);
@@ -1097,13 +1147,14 @@
 TEST_F(ClientLbEnd2endTest, RoundRobinUpdateInError) {
   const int kNumServers = 3;
   StartServers(kNumServers);
-  auto channel = BuildChannel("round_robin");
+  auto response_generator = BuildResolverResponseGenerator();
+  auto channel = BuildChannel("round_robin", response_generator);
   auto stub = BuildStub(channel);
   std::vector<int> ports;
 
   // Start with a single server.
   ports.emplace_back(servers_[0]->port_);
-  SetNextResolution(ports);
+  response_generator.SetNextResolution(ports);
   WaitForServer(stub, 0, DEBUG_LOCATION);
   // Send RPCs. They should all go to servers_[0]
   for (size_t i = 0; i < 10; ++i) SendRpc(stub);
@@ -1116,7 +1167,7 @@
   servers_[1]->Shutdown();
   ports.emplace_back(servers_[1]->port_);
   ports.emplace_back(servers_[2]->port_);
-  SetNextResolution(ports);
+  response_generator.SetNextResolution(ports);
   WaitForServer(stub, 0, DEBUG_LOCATION);
   WaitForServer(stub, 2, DEBUG_LOCATION);
 
@@ -1130,13 +1181,14 @@
   // Start servers and send one RPC per server.
   const int kNumServers = 3;
   StartServers(kNumServers);
-  auto channel = BuildChannel("round_robin");
+  auto response_generator = BuildResolverResponseGenerator();
+  auto channel = BuildChannel("round_robin", response_generator);
   auto stub = BuildStub(channel);
   std::vector<int> ports = GetServersPorts();
   for (size_t i = 0; i < 1000; ++i) {
     std::shuffle(ports.begin(), ports.end(),
                  std::mt19937(std::random_device()()));
-    SetNextResolution(ports);
+    response_generator.SetNextResolution(ports);
     if (i % 10 == 0) CheckRpcSendOk(stub, DEBUG_LOCATION);
   }
   // Check LB policy name for the channel.
@@ -1162,9 +1214,10 @@
     second_ports.push_back(grpc_pick_unused_port_or_die());
   }
   StartServers(kNumServers, first_ports);
-  auto channel = BuildChannel("round_robin");
+  auto response_generator = BuildResolverResponseGenerator();
+  auto channel = BuildChannel("round_robin", response_generator);
   auto stub = BuildStub(channel);
-  SetNextResolution(first_ports);
+  response_generator.SetNextResolution(first_ports);
   // Send a number of RPCs, which succeed.
   for (size_t i = 0; i < 100; ++i) {
     CheckRpcSendOk(stub, DEBUG_LOCATION);
@@ -1188,7 +1241,7 @@
   StartServers(kNumServers, second_ports);
   // Don't notify of the update. Wait for the LB policy's re-resolution to
   // "pull" the new ports.
-  SetNextResolutionUponError(second_ports);
+  response_generator.SetNextResolutionUponError(second_ports);
   gpr_log(GPR_INFO, "****** SERVERS RESTARTED *******");
   gpr_log(GPR_INFO, "****** SENDING REQUEST TO SUCCEED *******");
   // Client request should eventually (but still fairly soon) succeed.
@@ -1205,9 +1258,10 @@
   const int kNumServers = 3;
   StartServers(kNumServers);
   const auto ports = GetServersPorts();
-  auto channel = BuildChannel("round_robin");
+  auto response_generator = BuildResolverResponseGenerator();
+  auto channel = BuildChannel("round_robin", response_generator);
   auto stub = BuildStub(channel);
-  SetNextResolution(ports);
+  response_generator.SetNextResolution(ports);
   for (size_t i = 0; i < kNumServers; ++i) {
     WaitForServer(stub, i, DEBUG_LOCATION);
   }
@@ -1251,9 +1305,10 @@
   args.SetServiceConfigJSON(
       "{\"healthCheckConfig\": "
       "{\"serviceName\": \"health_check_service_name\"}}");
-  auto channel = BuildChannel("round_robin", args);
+  auto response_generator = BuildResolverResponseGenerator();
+  auto channel = BuildChannel("round_robin", response_generator, args);
   auto stub = BuildStub(channel);
-  SetNextResolution({servers_[0]->port_});
+  response_generator.SetNextResolution({servers_[0]->port_});
   EXPECT_TRUE(WaitForChannelReady(channel.get()));
   CheckRpcSendOk(stub, DEBUG_LOCATION);
 }
@@ -1267,9 +1322,10 @@
   args.SetServiceConfigJSON(
       "{\"healthCheckConfig\": "
       "{\"serviceName\": \"health_check_service_name\"}}");
-  auto channel = BuildChannel("round_robin", args);
+  auto response_generator = BuildResolverResponseGenerator();
+  auto channel = BuildChannel("round_robin", response_generator, args);
   auto stub = BuildStub(channel);
-  SetNextResolution(GetServersPorts());
+  response_generator.SetNextResolution(GetServersPorts());
   // Channel should not become READY, because health checks should be failing.
   gpr_log(GPR_INFO,
           "*** initial state: unknown health check service name for "
@@ -1341,15 +1397,17 @@
   args.SetServiceConfigJSON(
       "{\"healthCheckConfig\": "
       "{\"serviceName\": \"health_check_service_name\"}}");
-  auto channel1 = BuildChannel("round_robin", args);
+  auto response_generator1 = BuildResolverResponseGenerator();
+  auto channel1 = BuildChannel("round_robin", response_generator1, args);
   auto stub1 = BuildStub(channel1);
   std::vector<int> ports = GetServersPorts();
-  SetNextResolution(ports);
+  response_generator1.SetNextResolution(ports);
   // Create a channel with health checking enabled but inhibited.
   args.SetInt(GRPC_ARG_INHIBIT_HEALTH_CHECKING, 1);
-  auto channel2 = BuildChannel("round_robin", args);
+  auto response_generator2 = BuildResolverResponseGenerator();
+  auto channel2 = BuildChannel("round_robin", response_generator2, args);
   auto stub2 = BuildStub(channel2);
-  SetNextResolution(ports);
+  response_generator2.SetNextResolution(ports);
   // First channel should not become READY, because health checks should be
   // failing.
   EXPECT_FALSE(WaitForChannelReady(channel1.get(), 1));
@@ -1376,19 +1434,21 @@
   args.SetServiceConfigJSON(
       "{\"healthCheckConfig\": "
       "{\"serviceName\": \"health_check_service_name\"}}");
-  auto channel1 = BuildChannel("round_robin", args);
+  auto response_generator1 = BuildResolverResponseGenerator();
+  auto channel1 = BuildChannel("round_robin", response_generator1, args);
   auto stub1 = BuildStub(channel1);
   std::vector<int> ports = GetServersPorts();
-  SetNextResolution(ports);
+  response_generator1.SetNextResolution(ports);
   // Create a channel with health-checking enabled with a different
   // service name.
   ChannelArguments args2;
   args2.SetServiceConfigJSON(
       "{\"healthCheckConfig\": "
       "{\"serviceName\": \"health_check_service_name2\"}}");
-  auto channel2 = BuildChannel("round_robin", args2);
+  auto response_generator2 = BuildResolverResponseGenerator();
+  auto channel2 = BuildChannel("round_robin", response_generator2, args2);
   auto stub2 = BuildStub(channel2);
-  SetNextResolution(ports);
+  response_generator2.SetNextResolution(ports);
   // Allow health checks from channel 2 to succeed.
   servers_[0]->SetServingStatus("health_check_service_name2", true);
   // First channel should not become READY, because health checks should be
@@ -1438,9 +1498,11 @@
   const int kNumServers = 1;
   const int kNumRpcs = 10;
   StartServers(kNumServers);
-  auto channel = BuildChannel("intercept_trailing_metadata_lb");
+  auto response_generator = BuildResolverResponseGenerator();
+  auto channel =
+      BuildChannel("intercept_trailing_metadata_lb", response_generator);
   auto stub = BuildStub(channel);
-  SetNextResolution(GetServersPorts());
+  response_generator.SetNextResolution(GetServersPorts());
   for (size_t i = 0; i < kNumRpcs; ++i) {
     CheckRpcSendOk(stub, DEBUG_LOCATION);
   }
@@ -1470,9 +1532,11 @@
       "    }\n"
       "  } ]\n"
       "}");
-  auto channel = BuildChannel("intercept_trailing_metadata_lb", args);
+  auto response_generator = BuildResolverResponseGenerator();
+  auto channel =
+      BuildChannel("intercept_trailing_metadata_lb", response_generator, args);
   auto stub = BuildStub(channel);
-  SetNextResolution(GetServersPorts());
+  response_generator.SetNextResolution(GetServersPorts());
   for (size_t i = 0; i < kNumRpcs; ++i) {
     CheckRpcSendOk(stub, DEBUG_LOCATION);
   }