Merge pull request #19569 from jtattermusch/csharp_internal_span_use

C#: add System.Memory dependency and use Span<> internally for all target frameworks 
diff --git a/CMakeLists.txt b/CMakeLists.txt
index 22c926d..8dfa4e8 100644
--- a/CMakeLists.txt
+++ b/CMakeLists.txt
@@ -332,7 +332,6 @@
 add_dependencies(buildtests_c grpc_completion_queue_test)
 add_dependencies(buildtests_c grpc_completion_queue_threading_test)
 add_dependencies(buildtests_c grpc_credentials_test)
-add_dependencies(buildtests_c grpc_fetch_oauth2)
 add_dependencies(buildtests_c grpc_ipv6_loopback_available_test)
 if(_gRPC_PLATFORM_LINUX OR _gRPC_PLATFORM_MAC OR _gRPC_PLATFORM_POSIX)
 add_dependencies(buildtests_c grpc_json_token_test)
@@ -634,6 +633,7 @@
 add_dependencies(buildtests_cxx grpc_alts_credentials_options_test)
 add_dependencies(buildtests_cxx grpc_cli)
 add_dependencies(buildtests_cxx grpc_core_map_test)
+add_dependencies(buildtests_cxx grpc_fetch_oauth2)
 add_dependencies(buildtests_cxx grpc_linux_system_roots_test)
 add_dependencies(buildtests_cxx grpc_tool_test)
 add_dependencies(buildtests_cxx grpclb_api_test)
@@ -8270,40 +8270,6 @@
 endif (gRPC_BUILD_TESTS)
 if (gRPC_BUILD_TESTS)
 
-add_executable(grpc_fetch_oauth2
-  test/core/security/fetch_oauth2.cc
-)
-
-
-target_include_directories(grpc_fetch_oauth2
-  PRIVATE ${CMAKE_CURRENT_SOURCE_DIR}
-  PRIVATE ${CMAKE_CURRENT_SOURCE_DIR}/include
-  PRIVATE ${_gRPC_SSL_INCLUDE_DIR}
-  PRIVATE ${_gRPC_PROTOBUF_INCLUDE_DIR}
-  PRIVATE ${_gRPC_ZLIB_INCLUDE_DIR}
-  PRIVATE ${_gRPC_BENCHMARK_INCLUDE_DIR}
-  PRIVATE ${_gRPC_CARES_INCLUDE_DIR}
-  PRIVATE ${_gRPC_GFLAGS_INCLUDE_DIR}
-  PRIVATE ${_gRPC_ADDRESS_SORTING_INCLUDE_DIR}
-  PRIVATE ${_gRPC_NANOPB_INCLUDE_DIR}
-)
-
-target_link_libraries(grpc_fetch_oauth2
-  ${_gRPC_ALLTARGETS_LIBRARIES}
-  grpc_test_util
-  grpc
-  gpr
-)
-
-  # avoid dependency on libstdc++
-  if (_gRPC_CORE_NOSTDCXX_FLAGS)
-    set_target_properties(grpc_fetch_oauth2 PROPERTIES LINKER_LANGUAGE C)
-    target_compile_options(grpc_fetch_oauth2 PRIVATE $<$<COMPILE_LANGUAGE:CXX>:${_gRPC_CORE_NOSTDCXX_FLAGS}>)
-  endif()
-
-endif (gRPC_BUILD_TESTS)
-if (gRPC_BUILD_TESTS)
-
 add_executable(grpc_ipv6_loopback_available_test
   test/core/iomgr/grpc_ipv6_loopback_available_test.cc
 )
@@ -14080,6 +14046,45 @@
 endif (gRPC_BUILD_CODEGEN)
 if (gRPC_BUILD_TESTS)
 
+add_executable(grpc_fetch_oauth2
+  test/core/security/fetch_oauth2.cc
+  third_party/googletest/googletest/src/gtest-all.cc
+  third_party/googletest/googlemock/src/gmock-all.cc
+)
+
+
+target_include_directories(grpc_fetch_oauth2
+  PRIVATE ${CMAKE_CURRENT_SOURCE_DIR}
+  PRIVATE ${CMAKE_CURRENT_SOURCE_DIR}/include
+  PRIVATE ${_gRPC_SSL_INCLUDE_DIR}
+  PRIVATE ${_gRPC_PROTOBUF_INCLUDE_DIR}
+  PRIVATE ${_gRPC_ZLIB_INCLUDE_DIR}
+  PRIVATE ${_gRPC_BENCHMARK_INCLUDE_DIR}
+  PRIVATE ${_gRPC_CARES_INCLUDE_DIR}
+  PRIVATE ${_gRPC_GFLAGS_INCLUDE_DIR}
+  PRIVATE ${_gRPC_ADDRESS_SORTING_INCLUDE_DIR}
+  PRIVATE ${_gRPC_NANOPB_INCLUDE_DIR}
+  PRIVATE third_party/googletest/googletest/include
+  PRIVATE third_party/googletest/googletest
+  PRIVATE third_party/googletest/googlemock/include
+  PRIVATE third_party/googletest/googlemock
+  PRIVATE ${_gRPC_PROTO_GENS_DIR}
+)
+
+target_link_libraries(grpc_fetch_oauth2
+  ${_gRPC_PROTOBUF_LIBRARIES}
+  ${_gRPC_ALLTARGETS_LIBRARIES}
+  grpc_test_util
+  grpc++
+  grpc
+  gpr
+  ${_gRPC_GFLAGS_LIBRARIES}
+)
+
+
+endif (gRPC_BUILD_TESTS)
+if (gRPC_BUILD_TESTS)
+
 add_executable(grpc_linux_system_roots_test
   test/core/security/linux_system_roots_test.cc
   third_party/googletest/googletest/src/gtest-all.cc
diff --git a/Makefile b/Makefile
index c11462f..2a1bfad 100644
--- a/Makefile
+++ b/Makefile
@@ -1056,7 +1056,6 @@
 grpc_completion_queue_threading_test: $(BINDIR)/$(CONFIG)/grpc_completion_queue_threading_test
 grpc_create_jwt: $(BINDIR)/$(CONFIG)/grpc_create_jwt
 grpc_credentials_test: $(BINDIR)/$(CONFIG)/grpc_credentials_test
-grpc_fetch_oauth2: $(BINDIR)/$(CONFIG)/grpc_fetch_oauth2
 grpc_ipv6_loopback_available_test: $(BINDIR)/$(CONFIG)/grpc_ipv6_loopback_available_test
 grpc_json_token_test: $(BINDIR)/$(CONFIG)/grpc_json_token_test
 grpc_jwt_verifier_test: $(BINDIR)/$(CONFIG)/grpc_jwt_verifier_test
@@ -1219,6 +1218,7 @@
 grpc_core_map_test: $(BINDIR)/$(CONFIG)/grpc_core_map_test
 grpc_cpp_plugin: $(BINDIR)/$(CONFIG)/grpc_cpp_plugin
 grpc_csharp_plugin: $(BINDIR)/$(CONFIG)/grpc_csharp_plugin
+grpc_fetch_oauth2: $(BINDIR)/$(CONFIG)/grpc_fetch_oauth2
 grpc_linux_system_roots_test: $(BINDIR)/$(CONFIG)/grpc_linux_system_roots_test
 grpc_node_plugin: $(BINDIR)/$(CONFIG)/grpc_node_plugin
 grpc_objective_c_plugin: $(BINDIR)/$(CONFIG)/grpc_objective_c_plugin
@@ -1489,7 +1489,6 @@
   $(BINDIR)/$(CONFIG)/grpc_completion_queue_test \
   $(BINDIR)/$(CONFIG)/grpc_completion_queue_threading_test \
   $(BINDIR)/$(CONFIG)/grpc_credentials_test \
-  $(BINDIR)/$(CONFIG)/grpc_fetch_oauth2 \
   $(BINDIR)/$(CONFIG)/grpc_ipv6_loopback_available_test \
   $(BINDIR)/$(CONFIG)/grpc_json_token_test \
   $(BINDIR)/$(CONFIG)/grpc_jwt_verifier_test \
@@ -1693,6 +1692,7 @@
   $(BINDIR)/$(CONFIG)/grpc_alts_credentials_options_test \
   $(BINDIR)/$(CONFIG)/grpc_cli \
   $(BINDIR)/$(CONFIG)/grpc_core_map_test \
+  $(BINDIR)/$(CONFIG)/grpc_fetch_oauth2 \
   $(BINDIR)/$(CONFIG)/grpc_linux_system_roots_test \
   $(BINDIR)/$(CONFIG)/grpc_tool_test \
   $(BINDIR)/$(CONFIG)/grpclb_api_test \
@@ -1857,6 +1857,7 @@
   $(BINDIR)/$(CONFIG)/grpc_alts_credentials_options_test \
   $(BINDIR)/$(CONFIG)/grpc_cli \
   $(BINDIR)/$(CONFIG)/grpc_core_map_test \
+  $(BINDIR)/$(CONFIG)/grpc_fetch_oauth2 \
   $(BINDIR)/$(CONFIG)/grpc_linux_system_roots_test \
   $(BINDIR)/$(CONFIG)/grpc_tool_test \
   $(BINDIR)/$(CONFIG)/grpclb_api_test \
@@ -10987,38 +10988,6 @@
 endif
 
 
-GRPC_FETCH_OAUTH2_SRC = \
-    test/core/security/fetch_oauth2.cc \
-
-GRPC_FETCH_OAUTH2_OBJS = $(addprefix $(OBJDIR)/$(CONFIG)/, $(addsuffix .o, $(basename $(GRPC_FETCH_OAUTH2_SRC))))
-ifeq ($(NO_SECURE),true)
-
-# You can't build secure targets if you don't have OpenSSL.
-
-$(BINDIR)/$(CONFIG)/grpc_fetch_oauth2: openssl_dep_error
-
-else
-
-
-
-$(BINDIR)/$(CONFIG)/grpc_fetch_oauth2: $(GRPC_FETCH_OAUTH2_OBJS) $(LIBDIR)/$(CONFIG)/libgrpc_test_util.a $(LIBDIR)/$(CONFIG)/libgrpc.a $(LIBDIR)/$(CONFIG)/libgpr.a
-	$(E) "[LD]      Linking $@"
-	$(Q) mkdir -p `dirname $@`
-	$(Q) $(LD) $(LDFLAGS) $(GRPC_FETCH_OAUTH2_OBJS) $(LIBDIR)/$(CONFIG)/libgrpc_test_util.a $(LIBDIR)/$(CONFIG)/libgrpc.a $(LIBDIR)/$(CONFIG)/libgpr.a $(LDLIBS) $(LDLIBS_SECURE) -o $(BINDIR)/$(CONFIG)/grpc_fetch_oauth2
-
-endif
-
-$(OBJDIR)/$(CONFIG)/test/core/security/fetch_oauth2.o:  $(LIBDIR)/$(CONFIG)/libgrpc_test_util.a $(LIBDIR)/$(CONFIG)/libgrpc.a $(LIBDIR)/$(CONFIG)/libgpr.a
-
-deps_grpc_fetch_oauth2: $(GRPC_FETCH_OAUTH2_OBJS:.o=.dep)
-
-ifneq ($(NO_SECURE),true)
-ifneq ($(NO_DEPS),true)
--include $(GRPC_FETCH_OAUTH2_OBJS:.o=.dep)
-endif
-endif
-
-
 GRPC_IPV6_LOOPBACK_AVAILABLE_TEST_SRC = \
     test/core/iomgr/grpc_ipv6_loopback_available_test.cc \
 
@@ -17134,6 +17103,49 @@
 endif
 
 
+GRPC_FETCH_OAUTH2_SRC = \
+    test/core/security/fetch_oauth2.cc \
+
+GRPC_FETCH_OAUTH2_OBJS = $(addprefix $(OBJDIR)/$(CONFIG)/, $(addsuffix .o, $(basename $(GRPC_FETCH_OAUTH2_SRC))))
+ifeq ($(NO_SECURE),true)
+
+# You can't build secure targets if you don't have OpenSSL.
+
+$(BINDIR)/$(CONFIG)/grpc_fetch_oauth2: openssl_dep_error
+
+else
+
+
+
+
+ifeq ($(NO_PROTOBUF),true)
+
+# You can't build the protoc plugins or protobuf-enabled targets if you don't have protobuf 3.5.0+.
+
+$(BINDIR)/$(CONFIG)/grpc_fetch_oauth2: protobuf_dep_error
+
+else
+
+$(BINDIR)/$(CONFIG)/grpc_fetch_oauth2: $(PROTOBUF_DEP) $(GRPC_FETCH_OAUTH2_OBJS) $(LIBDIR)/$(CONFIG)/libgrpc_test_util.a $(LIBDIR)/$(CONFIG)/libgrpc++.a $(LIBDIR)/$(CONFIG)/libgrpc.a $(LIBDIR)/$(CONFIG)/libgpr.a
+	$(E) "[LD]      Linking $@"
+	$(Q) mkdir -p `dirname $@`
+	$(Q) $(LDXX) $(LDFLAGS) $(GRPC_FETCH_OAUTH2_OBJS) $(LIBDIR)/$(CONFIG)/libgrpc_test_util.a $(LIBDIR)/$(CONFIG)/libgrpc++.a $(LIBDIR)/$(CONFIG)/libgrpc.a $(LIBDIR)/$(CONFIG)/libgpr.a $(LDLIBSXX) $(LDLIBS_PROTOBUF) $(LDLIBS) $(LDLIBS_SECURE) $(GTEST_LIB) -o $(BINDIR)/$(CONFIG)/grpc_fetch_oauth2
+
+endif
+
+endif
+
+$(OBJDIR)/$(CONFIG)/test/core/security/fetch_oauth2.o:  $(LIBDIR)/$(CONFIG)/libgrpc_test_util.a $(LIBDIR)/$(CONFIG)/libgrpc++.a $(LIBDIR)/$(CONFIG)/libgrpc.a $(LIBDIR)/$(CONFIG)/libgpr.a
+
+deps_grpc_fetch_oauth2: $(GRPC_FETCH_OAUTH2_OBJS:.o=.dep)
+
+ifneq ($(NO_SECURE),true)
+ifneq ($(NO_DEPS),true)
+-include $(GRPC_FETCH_OAUTH2_OBJS:.o=.dep)
+endif
+endif
+
+
 GRPC_LINUX_SYSTEM_ROOTS_TEST_SRC = \
     test/core/security/linux_system_roots_test.cc \
 
diff --git a/build.yaml b/build.yaml
index 1c7be4e..4134ecd 100644
--- a/build.yaml
+++ b/build.yaml
@@ -2863,16 +2863,6 @@
   - grpc_test_util
   - grpc
   - gpr
-- name: grpc_fetch_oauth2
-  build: test
-  run: false
-  language: c
-  src:
-  - test/core/security/fetch_oauth2.cc
-  deps:
-  - grpc_test_util
-  - grpc
-  - gpr
 - name: grpc_ipv6_loopback_available_test
   build: test
   language: c
@@ -4945,6 +4935,17 @@
   deps:
   - grpc_plugin_support
   secure: false
+- name: grpc_fetch_oauth2
+  build: test
+  run: false
+  language: c++
+  src:
+  - test/core/security/fetch_oauth2.cc
+  deps:
+  - grpc_test_util
+  - grpc++
+  - grpc
+  - gpr
 - name: grpc_linux_system_roots_test
   gtest: true
   build: test
diff --git a/include/grpc/grpc_security.h b/include/grpc/grpc_security.h
index 8e4f26a..777142f 100644
--- a/include/grpc/grpc_security.h
+++ b/include/grpc/grpc_security.h
@@ -330,20 +330,20 @@
 
 /** Options for creating STS Oauth Token Exchange credentials following the IETF
    draft https://tools.ietf.org/html/draft-ietf-oauth-token-exchange-16.
-   Optional fields may be set to NULL. It is the responsibility of the caller to
-   ensure that the subject and actor tokens are refreshed on disk at the
-   specified paths. This API is used for experimental purposes for now and may
-   change in the future. */
+   Optional fields may be set to NULL or empty string. It is the responsibility
+   of the caller to ensure that the subject and actor tokens are refreshed on
+   disk at the specified paths. This API is used for experimental purposes for
+   now and may change in the future. */
 typedef struct {
-  const char* sts_endpoint_url;     /* Required. */
-  const char* resource;             /* Optional. */
-  const char* audience;             /* Optional. */
-  const char* scope;                /* Optional. */
-  const char* requested_token_type; /* Optional. */
-  const char* subject_token_path;   /* Required. */
-  const char* subject_token_type;   /* Required. */
-  const char* actor_token_path;     /* Optional. */
-  const char* actor_token_type;     /* Optional. */
+  const char* token_exchange_service_uri; /* Required. */
+  const char* resource;                   /* Optional. */
+  const char* audience;                   /* Optional. */
+  const char* scope;                      /* Optional. */
+  const char* requested_token_type;       /* Optional. */
+  const char* subject_token_path;         /* Required. */
+  const char* subject_token_type;         /* Required. */
+  const char* actor_token_path;           /* Optional. */
+  const char* actor_token_type;           /* Optional. */
 } grpc_sts_credentials_options;
 
 /** Creates an STS credentials following the STS Token Exchanged specifed in the
diff --git a/include/grpcpp/security/credentials.h b/include/grpcpp/security/credentials.h
index b124d3d..5190b1b 100644
--- a/include/grpcpp/security/credentials.h
+++ b/include/grpcpp/security/credentials.h
@@ -106,6 +106,24 @@
 
 namespace experimental {
 
+typedef ::grpc_impl::experimental::StsCredentialsOptions StsCredentialsOptions;
+
+static inline grpc::Status StsCredentialsOptionsFromJson(
+    const grpc::string& json_string, StsCredentialsOptions* options) {
+  return ::grpc_impl::experimental::StsCredentialsOptionsFromJson(json_string,
+                                                                  options);
+}
+
+static inline grpc::Status StsCredentialsOptionsFromEnv(
+    StsCredentialsOptions* options) {
+  return grpc_impl::experimental::StsCredentialsOptionsFromEnv(options);
+}
+
+static inline std::shared_ptr<grpc_impl::CallCredentials> StsCredentials(
+    const StsCredentialsOptions& options) {
+  return grpc_impl::experimental::StsCredentials(options);
+}
+
 typedef ::grpc_impl::experimental::AltsCredentialsOptions
     AltsCredentialsOptions;
 
diff --git a/include/grpcpp/security/credentials_impl.h b/include/grpcpp/security/credentials_impl.h
index 34920a5..e236512 100644
--- a/include/grpcpp/security/credentials_impl.h
+++ b/include/grpcpp/security/credentials_impl.h
@@ -259,6 +259,70 @@
 
 namespace experimental {
 
+/// Options for creating STS Oauth Token Exchange credentials following the IETF
+/// draft https://tools.ietf.org/html/draft-ietf-oauth-token-exchange-16.
+/// Optional fields may be set to empty string. It is the responsibility of the
+/// caller to ensure that the subject and actor tokens are refreshed on disk at
+/// the specified paths.
+struct StsCredentialsOptions {
+  grpc::string token_exchange_service_uri;  // Required.
+  grpc::string resource;                    // Optional.
+  grpc::string audience;                    // Optional.
+  grpc::string scope;                       // Optional.
+  grpc::string requested_token_type;        // Optional.
+  grpc::string subject_token_path;          // Required.
+  grpc::string subject_token_type;          // Required.
+  grpc::string actor_token_path;            // Optional.
+  grpc::string actor_token_type;            // Optional.
+};
+
+/// Creates STS Options from a JSON string. The JSON schema is as follows:
+/// {
+///   "title": "STS Credentials Config",
+///   "type": "object",
+///   "required": ["token_exchange_service_uri", "subject_token_path",
+///                "subject_token_type"],
+///   "properties": {
+///     "token_exchange_service_uri": {
+///       "type": "string"
+///     },
+///     "resource": {
+///       "type": "string"
+///     },
+///     "audience": {
+///       "type": "string"
+///     },
+///     "scope": {
+///       "type": "string"
+///     },
+///     "requested_token_type": {
+///       "type": "string"
+///     },
+///     "subject_token_path": {
+///       "type": "string"
+///     },
+///     "subject_token_type": {
+///       "type": "string"
+///     },
+///     "actor_token_path" : {
+///       "type": "string"
+///     },
+///     "actor_token_type": {
+///       "type": "string"
+///     }
+///   }
+/// }
+grpc::Status StsCredentialsOptionsFromJson(const grpc::string& json_string,
+                                           StsCredentialsOptions* options);
+
+/// Creates STS credentials options from the $STS_CREDENTIALS environment
+/// variable. This environment variable points to the path of a JSON file
+/// conforming to the schema described above.
+grpc::Status StsCredentialsOptionsFromEnv(StsCredentialsOptions* options);
+
+std::shared_ptr<CallCredentials> StsCredentials(
+    const StsCredentialsOptions& options);
+
 /// Options used to build AltsCredentials.
 struct AltsCredentialsOptions {
   /// service accounts of target endpoint that will be acceptable
diff --git a/src/core/ext/filters/client_channel/resolver/dns/c_ares/dns_resolver_ares.cc b/src/core/ext/filters/client_channel/resolver/dns/c_ares/dns_resolver_ares.cc
index 43b28bc..4e7bb5e 100644
--- a/src/core/ext/filters/client_channel/resolver/dns/c_ares/dns_resolver_ares.cc
+++ b/src/core/ext/filters/client_channel/resolver/dns/c_ares/dns_resolver_ares.cc
@@ -433,6 +433,8 @@
 
 class AresDnsResolverFactory : public ResolverFactory {
  public:
+  bool IsValidUri(const grpc_uri* uri) const override { return true; }
+
   OrphanablePtr<Resolver> CreateResolver(ResolverArgs args) const override {
     return OrphanablePtr<Resolver>(New<AresDnsResolver>(std::move(args)));
   }
diff --git a/src/core/ext/filters/client_channel/resolver/dns/native/dns_resolver.cc b/src/core/ext/filters/client_channel/resolver/dns/native/dns_resolver.cc
index 1abe9ad..bc6d73a 100644
--- a/src/core/ext/filters/client_channel/resolver/dns/native/dns_resolver.cc
+++ b/src/core/ext/filters/client_channel/resolver/dns/native/dns_resolver.cc
@@ -258,11 +258,16 @@
 
 class NativeDnsResolverFactory : public ResolverFactory {
  public:
-  OrphanablePtr<Resolver> CreateResolver(ResolverArgs args) const override {
-    if (GPR_UNLIKELY(0 != strcmp(args.uri->authority, ""))) {
+  bool IsValidUri(const grpc_uri* uri) const override {
+    if (GPR_UNLIKELY(0 != strcmp(uri->authority, ""))) {
       gpr_log(GPR_ERROR, "authority based dns uri's not supported");
-      return OrphanablePtr<Resolver>(nullptr);
+      return false;
     }
+    return true;
+  }
+
+  OrphanablePtr<Resolver> CreateResolver(ResolverArgs args) const override {
+    if (!IsValidUri(args.uri)) return nullptr;
     return OrphanablePtr<Resolver>(New<NativeDnsResolver>(std::move(args)));
   }
 
diff --git a/src/core/ext/filters/client_channel/resolver/fake/fake_resolver.cc b/src/core/ext/filters/client_channel/resolver/fake/fake_resolver.cc
index ff728a3..c5a27f1 100644
--- a/src/core/ext/filters/client_channel/resolver/fake/fake_resolver.cc
+++ b/src/core/ext/filters/client_channel/resolver/fake/fake_resolver.cc
@@ -89,9 +89,15 @@
     : Resolver(args.combiner, std::move(args.result_handler)) {
   GRPC_CLOSURE_INIT(&reresolution_closure_, ReturnReresolutionResult, this,
                     grpc_combiner_scheduler(combiner()));
-  channel_args_ = grpc_channel_args_copy(args.args);
   FakeResolverResponseGenerator* response_generator =
       FakeResolverResponseGenerator::GetFromArgs(args.args);
+  // Channels sharing the same subchannels may have different resolver response
+  // generators. If we don't remove this arg, subchannel pool will create new
+  // subchannels for the same address instead of reusing existing ones because
+  // of different values of this channel arg.
+  const char* args_to_remove[] = {GRPC_ARG_FAKE_RESOLVER_RESPONSE_GENERATOR};
+  channel_args_ = grpc_channel_args_copy_and_remove(
+      args.args, args_to_remove, GPR_ARRAY_SIZE(args_to_remove));
   if (response_generator != nullptr) {
     response_generator->resolver_ = this;
     if (response_generator->has_result_) {
@@ -315,6 +321,8 @@
 
 class FakeResolverFactory : public ResolverFactory {
  public:
+  bool IsValidUri(const grpc_uri* uri) const override { return true; }
+
   OrphanablePtr<Resolver> CreateResolver(ResolverArgs args) const override {
     return OrphanablePtr<Resolver>(New<FakeResolver>(std::move(args)));
   }
diff --git a/src/core/ext/filters/client_channel/resolver/sockaddr/sockaddr_resolver.cc b/src/core/ext/filters/client_channel/resolver/sockaddr/sockaddr_resolver.cc
index 517b929..532fd6d 100644
--- a/src/core/ext/filters/client_channel/resolver/sockaddr/sockaddr_resolver.cc
+++ b/src/core/ext/filters/client_channel/resolver/sockaddr/sockaddr_resolver.cc
@@ -80,24 +80,23 @@
 
 void DoNothing(void* ignored) {}
 
-OrphanablePtr<Resolver> CreateSockaddrResolver(
-    ResolverArgs args,
-    bool parse(const grpc_uri* uri, grpc_resolved_address* dst)) {
-  if (0 != strcmp(args.uri->authority, "")) {
+bool ParseUri(const grpc_uri* uri,
+              bool parse(const grpc_uri* uri, grpc_resolved_address* dst),
+              ServerAddressList* addresses) {
+  if (0 != strcmp(uri->authority, "")) {
     gpr_log(GPR_ERROR, "authority-based URIs not supported by the %s scheme",
-            args.uri->scheme);
-    return nullptr;
+            uri->scheme);
+    return false;
   }
   // Construct addresses.
   grpc_slice path_slice =
-      grpc_slice_new(args.uri->path, strlen(args.uri->path), DoNothing);
+      grpc_slice_new(uri->path, strlen(uri->path), DoNothing);
   grpc_slice_buffer path_parts;
   grpc_slice_buffer_init(&path_parts);
   grpc_slice_split(path_slice, ",", &path_parts);
-  ServerAddressList addresses;
   bool errors_found = false;
   for (size_t i = 0; i < path_parts.count; i++) {
-    grpc_uri ith_uri = *args.uri;
+    grpc_uri ith_uri = *uri;
     UniquePtr<char> part_str(grpc_slice_to_c_string(path_parts.slices[i]));
     ith_uri.path = part_str.get();
     grpc_resolved_address addr;
@@ -105,13 +104,20 @@
       errors_found = true;
       break;
     }
-    addresses.emplace_back(addr, nullptr /* args */);
+    if (addresses != nullptr) {
+      addresses->emplace_back(addr, nullptr /* args */);
+    }
   }
   grpc_slice_buffer_destroy_internal(&path_parts);
   grpc_slice_unref_internal(path_slice);
-  if (errors_found) {
-    return OrphanablePtr<Resolver>(nullptr);
-  }
+  return !errors_found;
+}
+
+OrphanablePtr<Resolver> CreateSockaddrResolver(
+    ResolverArgs args,
+    bool parse(const grpc_uri* uri, grpc_resolved_address* dst)) {
+  ServerAddressList addresses;
+  if (!ParseUri(args.uri, parse, &addresses)) return nullptr;
   // Instantiate resolver.
   return OrphanablePtr<Resolver>(
       New<SockaddrResolver>(std::move(addresses), std::move(args)));
@@ -119,6 +125,10 @@
 
 class IPv4ResolverFactory : public ResolverFactory {
  public:
+  bool IsValidUri(const grpc_uri* uri) const override {
+    return ParseUri(uri, grpc_parse_ipv4, nullptr);
+  }
+
   OrphanablePtr<Resolver> CreateResolver(ResolverArgs args) const override {
     return CreateSockaddrResolver(std::move(args), grpc_parse_ipv4);
   }
@@ -128,6 +138,10 @@
 
 class IPv6ResolverFactory : public ResolverFactory {
  public:
+  bool IsValidUri(const grpc_uri* uri) const override {
+    return ParseUri(uri, grpc_parse_ipv6, nullptr);
+  }
+
   OrphanablePtr<Resolver> CreateResolver(ResolverArgs args) const override {
     return CreateSockaddrResolver(std::move(args), grpc_parse_ipv6);
   }
@@ -138,6 +152,10 @@
 #ifdef GRPC_HAVE_UNIX_SOCKET
 class UnixResolverFactory : public ResolverFactory {
  public:
+  bool IsValidUri(const grpc_uri* uri) const override {
+    return ParseUri(uri, grpc_parse_unix, nullptr);
+  }
+
   OrphanablePtr<Resolver> CreateResolver(ResolverArgs args) const override {
     return CreateSockaddrResolver(std::move(args), grpc_parse_unix);
   }
diff --git a/src/core/ext/filters/client_channel/resolver_factory.h b/src/core/ext/filters/client_channel/resolver_factory.h
index 273fd8d..7fed48b 100644
--- a/src/core/ext/filters/client_channel/resolver_factory.h
+++ b/src/core/ext/filters/client_channel/resolver_factory.h
@@ -47,6 +47,10 @@
 
 class ResolverFactory {
  public:
+  /// Returns a bool indicating whether the input uri is valid to create a
+  /// resolver.
+  virtual bool IsValidUri(const grpc_uri* uri) const GRPC_ABSTRACT;
+
   /// Returns a new resolver instance.
   virtual OrphanablePtr<Resolver> CreateResolver(ResolverArgs args) const
       GRPC_ABSTRACT;
diff --git a/src/core/ext/filters/client_channel/resolver_registry.cc b/src/core/ext/filters/client_channel/resolver_registry.cc
index 5b00eab..509c4ef 100644
--- a/src/core/ext/filters/client_channel/resolver_registry.cc
+++ b/src/core/ext/filters/client_channel/resolver_registry.cc
@@ -132,6 +132,17 @@
   return g_state->LookupResolverFactory(scheme);
 }
 
+bool ResolverRegistry::IsValidTarget(const char* target) {
+  grpc_uri* uri = nullptr;
+  char* canonical_target = nullptr;
+  ResolverFactory* factory =
+      g_state->FindResolverFactory(target, &uri, &canonical_target);
+  bool result = factory == nullptr ? false : factory->IsValidUri(uri);
+  grpc_uri_destroy(uri);
+  gpr_free(canonical_target);
+  return result;
+}
+
 OrphanablePtr<Resolver> ResolverRegistry::CreateResolver(
     const char* target, const grpc_channel_args* args,
     grpc_pollset_set* pollset_set, grpc_combiner* combiner,
diff --git a/src/core/ext/filters/client_channel/resolver_registry.h b/src/core/ext/filters/client_channel/resolver_registry.h
index 0eec678..4248a06 100644
--- a/src/core/ext/filters/client_channel/resolver_registry.h
+++ b/src/core/ext/filters/client_channel/resolver_registry.h
@@ -50,6 +50,9 @@
     static void RegisterResolverFactory(UniquePtr<ResolverFactory> factory);
   };
 
+  /// Checks whether the user input \a target is valid to create a resolver.
+  static bool IsValidTarget(const char* target);
+
   /// Creates a resolver given \a target.
   /// First tries to parse \a target as a URI. If this succeeds, tries
   /// to locate a registered resolver factory based on the URI scheme.
diff --git a/src/core/lib/security/credentials/oauth2/oauth2_credentials.cc b/src/core/lib/security/credentials/oauth2/oauth2_credentials.cc
index 06fa8bb..7a58483 100644
--- a/src/core/lib/security/credentials/oauth2/oauth2_credentials.cc
+++ b/src/core/lib/security/credentials/oauth2/oauth2_credentials.cc
@@ -18,6 +18,7 @@
 
 #include <grpc/support/port_platform.h>
 
+#include "src/core/lib/json/json.h"
 #include "src/core/lib/security/credentials/oauth2/oauth2_credentials.h"
 
 #include <string.h>
@@ -641,8 +642,8 @@
   *sts_url_out = nullptr;
   InlinedVector<grpc_error*, 3> error_list;
   UniquePtr<grpc_uri, GrpcUriDeleter> sts_url(
-      options->sts_endpoint_url != nullptr
-          ? grpc_uri_parse(options->sts_endpoint_url, false)
+      options->token_exchange_service_uri != nullptr
+          ? grpc_uri_parse(options->token_exchange_service_uri, false)
           : nullptr);
   if (sts_url == nullptr) {
     error_list.push_back(GRPC_ERROR_CREATE_FROM_STATIC_STRING(
diff --git a/src/core/lib/slice/slice_buffer.cc b/src/core/lib/slice/slice_buffer.cc
index 2b69f57..7eedad3 100644
--- a/src/core/lib/slice/slice_buffer.cc
+++ b/src/core/lib/slice/slice_buffer.cc
@@ -262,9 +262,9 @@
   src->length = 0;
 }
 
+template <bool incref>
 static void slice_buffer_move_first_maybe_ref(grpc_slice_buffer* src, size_t n,
-                                              grpc_slice_buffer* dst,
-                                              bool incref) {
+                                              grpc_slice_buffer* dst) {
   GPR_ASSERT(src->length >= n);
   if (src->length == n) {
     grpc_slice_buffer_move_into(src, dst);
@@ -304,12 +304,12 @@
 
 void grpc_slice_buffer_move_first(grpc_slice_buffer* src, size_t n,
                                   grpc_slice_buffer* dst) {
-  slice_buffer_move_first_maybe_ref(src, n, dst, true);
+  slice_buffer_move_first_maybe_ref<true>(src, n, dst);
 }
 
 void grpc_slice_buffer_move_first_no_ref(grpc_slice_buffer* src, size_t n,
                                          grpc_slice_buffer* dst) {
-  slice_buffer_move_first_maybe_ref(src, n, dst, false);
+  slice_buffer_move_first_maybe_ref<false>(src, n, dst);
 }
 
 void grpc_slice_buffer_move_first_into_buffer(grpc_slice_buffer* src, size_t n,
diff --git a/src/cpp/client/secure_credentials.cc b/src/cpp/client/secure_credentials.cc
index d73b3e0..5de5a76 100644
--- a/src/cpp/client/secure_credentials.cc
+++ b/src/cpp/client/secure_credentials.cc
@@ -17,13 +17,23 @@
  */
 
 #include "src/cpp/client/secure_credentials.h"
+
+#include <grpc/impl/codegen/slice.h>
+#include <grpc/slice.h>
 #include <grpc/support/log.h>
 #include <grpc/support/string_util.h>
 #include <grpcpp/channel.h>
+#include <grpcpp/impl/codegen/status_code_enum.h>
 #include <grpcpp/impl/grpc_library.h>
 #include <grpcpp/support/channel_arguments.h>
+
+#include "src/core/lib/gpr/env.h"
+#include "src/core/lib/iomgr/error.h"
 #include "src/core/lib/iomgr/executor.h"
+#include "src/core/lib/iomgr/load_file.h"
+#include "src/core/lib/json/json.h"
 #include "src/core/lib/security/transport/auth_filters.h"
+#include "src/core/lib/security/util/json_util.h"
 #include "src/cpp/client/create_channel_internal.h"
 #include "src/cpp/common/secure_auth_context.h"
 
@@ -105,6 +115,144 @@
 
 namespace experimental {
 
+namespace {
+
+void ClearStsCredentialsOptions(StsCredentialsOptions* options) {
+  if (options == nullptr) return;
+  options->token_exchange_service_uri.clear();
+  options->resource.clear();
+  options->audience.clear();
+  options->scope.clear();
+  options->requested_token_type.clear();
+  options->subject_token_path.clear();
+  options->subject_token_type.clear();
+  options->actor_token_path.clear();
+  options->actor_token_type.clear();
+}
+
+}  // namespace
+
+// Builds STS credentials options from JSON.
+grpc::Status StsCredentialsOptionsFromJson(const grpc::string& json_string,
+                                           StsCredentialsOptions* options) {
+  struct GrpcJsonDeleter {
+    void operator()(grpc_json* json) { grpc_json_destroy(json); }
+  };
+  if (options == nullptr) {
+    return grpc::Status(grpc::INVALID_ARGUMENT, "options cannot be nullptr.");
+  }
+  ClearStsCredentialsOptions(options);
+  std::vector<char> scratchpad(json_string.c_str(),
+                               json_string.c_str() + json_string.size() + 1);
+  std::unique_ptr<grpc_json, GrpcJsonDeleter> json(
+      grpc_json_parse_string(&scratchpad[0]));
+  if (json == nullptr) {
+    return grpc::Status(grpc::INVALID_ARGUMENT, "Invalid json.");
+  }
+
+  // Required fields.
+  const char* value = grpc_json_get_string_property(
+      json.get(), "token_exchange_service_uri", nullptr);
+  if (value == nullptr) {
+    ClearStsCredentialsOptions(options);
+    return grpc::Status(grpc::INVALID_ARGUMENT,
+                        "token_exchange_service_uri must be specified.");
+  }
+  options->token_exchange_service_uri.assign(value);
+  value =
+      grpc_json_get_string_property(json.get(), "subject_token_path", nullptr);
+  if (value == nullptr) {
+    ClearStsCredentialsOptions(options);
+    return grpc::Status(grpc::INVALID_ARGUMENT,
+                        "subject_token_path must be specified.");
+  }
+  options->subject_token_path.assign(value);
+  value =
+      grpc_json_get_string_property(json.get(), "subject_token_type", nullptr);
+  if (value == nullptr) {
+    ClearStsCredentialsOptions(options);
+    return grpc::Status(grpc::INVALID_ARGUMENT,
+                        "subject_token_type must be specified.");
+  }
+  options->subject_token_type.assign(value);
+
+  // Optional fields.
+  value = grpc_json_get_string_property(json.get(), "resource", nullptr);
+  if (value != nullptr) options->resource.assign(value);
+  value = grpc_json_get_string_property(json.get(), "audience", nullptr);
+  if (value != nullptr) options->audience.assign(value);
+  value = grpc_json_get_string_property(json.get(), "scope", nullptr);
+  if (value != nullptr) options->scope.assign(value);
+  value = grpc_json_get_string_property(json.get(), "requested_token_type",
+                                        nullptr);
+  if (value != nullptr) options->requested_token_type.assign(value);
+  value =
+      grpc_json_get_string_property(json.get(), "actor_token_path", nullptr);
+  if (value != nullptr) options->actor_token_path.assign(value);
+  value =
+      grpc_json_get_string_property(json.get(), "actor_token_type", nullptr);
+  if (value != nullptr) options->actor_token_type.assign(value);
+
+  return grpc::Status();
+}
+
+// Builds STS credentials Options from the $STS_CREDENTIALS env var.
+grpc::Status StsCredentialsOptionsFromEnv(StsCredentialsOptions* options) {
+  if (options == nullptr) {
+    return grpc::Status(grpc::INVALID_ARGUMENT, "options cannot be nullptr.");
+  }
+  ClearStsCredentialsOptions(options);
+  grpc_slice json_string = grpc_empty_slice();
+  char* sts_creds_path = gpr_getenv("STS_CREDENTIALS");
+  grpc_error* error = GRPC_ERROR_NONE;
+  grpc::Status status;
+  auto cleanup = [&json_string, &sts_creds_path, &error, &status]() {
+    grpc_slice_unref_internal(json_string);
+    gpr_free(sts_creds_path);
+    GRPC_ERROR_UNREF(error);
+    return status;
+  };
+
+  if (sts_creds_path == nullptr) {
+    status = grpc::Status(grpc::NOT_FOUND,
+                          "STS_CREDENTIALS environment variable not set.");
+    return cleanup();
+  }
+  error = grpc_load_file(sts_creds_path, 1, &json_string);
+  if (error != GRPC_ERROR_NONE) {
+    status = grpc::Status(grpc::NOT_FOUND, grpc_error_string(error));
+    return cleanup();
+  }
+  status = StsCredentialsOptionsFromJson(
+      reinterpret_cast<const char*>(GRPC_SLICE_START_PTR(json_string)),
+      options);
+  return cleanup();
+}
+
+// C++ to Core STS Credentials options.
+grpc_sts_credentials_options StsCredentialsCppToCoreOptions(
+    const StsCredentialsOptions& options) {
+  grpc_sts_credentials_options opts;
+  memset(&opts, 0, sizeof(opts));
+  opts.token_exchange_service_uri = options.token_exchange_service_uri.c_str();
+  opts.resource = options.resource.c_str();
+  opts.audience = options.audience.c_str();
+  opts.scope = options.scope.c_str();
+  opts.requested_token_type = options.requested_token_type.c_str();
+  opts.subject_token_path = options.subject_token_path.c_str();
+  opts.subject_token_type = options.subject_token_type.c_str();
+  opts.actor_token_path = options.actor_token_path.c_str();
+  opts.actor_token_type = options.actor_token_type.c_str();
+  return opts;
+}
+
+// Builds STS credentials.
+std::shared_ptr<CallCredentials> StsCredentials(
+    const StsCredentialsOptions& options) {
+  auto opts = StsCredentialsCppToCoreOptions(options);
+  return WrapCallCredentials(grpc_sts_credentials_create(&opts, nullptr));
+}
+
 // Builds ALTS Credentials given ALTS specific options
 std::shared_ptr<ChannelCredentials> AltsCredentials(
     const AltsCredentialsOptions& options) {
diff --git a/src/cpp/client/secure_credentials.h b/src/cpp/client/secure_credentials.h
index dd379ca..ed14df4 100644
--- a/src/cpp/client/secure_credentials.h
+++ b/src/cpp/client/secure_credentials.h
@@ -22,6 +22,7 @@
 #include <grpc/grpc_security.h>
 
 #include <grpcpp/security/credentials.h>
+#include <grpcpp/security/credentials_impl.h>
 #include <grpcpp/support/config.h>
 
 #include "src/core/lib/security/credentials/credentials.h"
@@ -68,6 +69,16 @@
   grpc_call_credentials* const c_creds_;
 };
 
+namespace experimental {
+
+// Transforms C++ STS Credentials options to core options. The pointers in the
+// resulting core options point to memory held by the C++ options, so the C++
+// options must be kept alive until after the core credentials are created.
+grpc_sts_credentials_options StsCredentialsCppToCoreOptions(
+    const StsCredentialsOptions& options);
+
+}  // namespace experimental
+
 }  // namespace grpc_impl
 
 namespace grpc {
diff --git a/src/python/grpcio/grpc/_channel.py b/src/python/grpcio/grpc/_channel.py
index 0bf8e03..24f928e 100644
--- a/src/python/grpcio/grpc/_channel.py
+++ b/src/python/grpcio/grpc/_channel.py
@@ -13,7 +13,6 @@
 # limitations under the License.
 """Invocation-side implementation of gRPC Python."""
 
-import functools
 import logging
 import sys
 import threading
@@ -82,6 +81,17 @@
         unknown_cygrpc_code, details)
 
 
+def _wait_once_until(condition, until):  # Wait once, bounded by `until`.
+    if until is None:
+        condition.wait()  # No deadline: untimed wait.
+    else:
+        remaining = until - time.time()  # Seconds until the deadline.
+        if remaining < 0:
+            raise grpc.FutureTimeoutError()  # Deadline has already passed.
+        else:
+            condition.wait(timeout=remaining)  # Callers loop and re-check.
+
+
 class _RPCState(object):
 
     def __init__(self, due, initial_metadata, trailing_metadata, code, details):
@@ -168,11 +178,12 @@
 #pylint: disable=too-many-statements
 def _consume_request_iterator(request_iterator, state, call, request_serializer,
                               event_handler):
-    """Consume a request iterator supplied by the user."""
+    if cygrpc.is_fork_support_enabled():
+        condition_wait_timeout = 1.0
+    else:
+        condition_wait_timeout = None
 
     def consume_request_iterator():  # pylint: disable=too-many-branches
-        # Iterate over the request iterator until it is exhausted or an error
-        # condition is encountered.
         while True:
             return_from_user_request_generator_invoked = False
             try:
@@ -213,19 +224,14 @@
                             state.due.add(cygrpc.OperationType.send_message)
                         else:
                             return
-
-                        def _done():
-                            return (state.code is not None or
-                                    cygrpc.OperationType.send_message not in
-                                    state.due)
-
-                        _common.wait(
-                            state.condition.wait,
-                            _done,
-                            spin_cb=functools.partial(
-                                cygrpc.block_if_fork_in_progress, state))
-                        if state.code is not None:
-                            return
+                        while True:
+                            state.condition.wait(condition_wait_timeout)
+                            cygrpc.block_if_fork_in_progress(state)
+                            if state.code is None:
+                                if cygrpc.OperationType.send_message not in state.due:
+                                    break
+                            else:
+                                return
                 else:
                     return
         with state.condition:
@@ -275,21 +281,13 @@
         with self._state.condition:
             return self._state.code is not None
 
-    def _is_complete(self):
-        return self._state.code is not None
-
     def result(self, timeout=None):
-        """Returns the result of the computation or raises its exception.
-
-        See grpc.Future.result for the full API contract.
-        """
+        until = None if timeout is None else time.time() + timeout
         with self._state.condition:
-            timed_out = _common.wait(
-                self._state.condition.wait, self._is_complete, timeout=timeout)
-            if timed_out:
-                raise grpc.FutureTimeoutError()
-            else:
-                if self._state.code is grpc.StatusCode.OK:
+            while True:
+                if self._state.code is None:
+                    _wait_once_until(self._state.condition, until)
+                elif self._state.code is grpc.StatusCode.OK:
                     return self._state.response
                 elif self._state.cancelled:
                     raise grpc.FutureCancelledError()
@@ -297,17 +295,12 @@
                     raise self
 
     def exception(self, timeout=None):
-        """Return the exception raised by the computation.
-
-        See grpc.Future.exception for the full API contract.
-        """
+        until = None if timeout is None else time.time() + timeout
         with self._state.condition:
-            timed_out = _common.wait(
-                self._state.condition.wait, self._is_complete, timeout=timeout)
-            if timed_out:
-                raise grpc.FutureTimeoutError()
-            else:
-                if self._state.code is grpc.StatusCode.OK:
+            while True:
+                if self._state.code is None:
+                    _wait_once_until(self._state.condition, until)
+                elif self._state.code is grpc.StatusCode.OK:
                     return None
                 elif self._state.cancelled:
                     raise grpc.FutureCancelledError()
@@ -315,17 +308,12 @@
                     return self
 
     def traceback(self, timeout=None):
-        """Access the traceback of the exception raised by the computation.
-
-        See grpc.future.traceback for the full API contract.
-        """
+        until = None if timeout is None else time.time() + timeout
         with self._state.condition:
-            timed_out = _common.wait(
-                self._state.condition.wait, self._is_complete, timeout=timeout)
-            if timed_out:
-                raise grpc.FutureTimeoutError()
-            else:
-                if self._state.code is grpc.StatusCode.OK:
+            while True:
+                if self._state.code is None:
+                    _wait_once_until(self._state.condition, until)
+                elif self._state.code is grpc.StatusCode.OK:
                     return None
                 elif self._state.cancelled:
                     raise grpc.FutureCancelledError()
@@ -357,23 +345,17 @@
                 raise StopIteration()
             else:
                 raise self
-
-            def _response_ready():
-                return (
-                    self._state.response is not None or
-                    (cygrpc.OperationType.receive_message not in self._state.due
-                     and self._state.code is not None))
-
-            _common.wait(self._state.condition.wait, _response_ready)
-            if self._state.response is not None:
-                response = self._state.response
-                self._state.response = None
-                return response
-            elif cygrpc.OperationType.receive_message not in self._state.due:
-                if self._state.code is grpc.StatusCode.OK:
-                    raise StopIteration()
-                elif self._state.code is not None:
-                    raise self
+            while True:
+                self._state.condition.wait()
+                if self._state.response is not None:
+                    response = self._state.response
+                    self._state.response = None
+                    return response
+                elif cygrpc.OperationType.receive_message not in self._state.due:
+                    if self._state.code is grpc.StatusCode.OK:
+                        raise StopIteration()
+                    elif self._state.code is not None:
+                        raise self
 
     def __iter__(self):
         return self
@@ -404,47 +386,32 @@
 
     def initial_metadata(self):
         with self._state.condition:
-
-            def _done():
-                return self._state.initial_metadata is not None
-
-            _common.wait(self._state.condition.wait, _done)
+            while self._state.initial_metadata is None:
+                self._state.condition.wait()  # Untimed; loop re-checks.
             return self._state.initial_metadata
 
     def trailing_metadata(self):
         with self._state.condition:
-
-            def _done():
-                return self._state.trailing_metadata is not None
-
-            _common.wait(self._state.condition.wait, _done)
+            while self._state.trailing_metadata is None:
+                self._state.condition.wait()  # Untimed; loop re-checks.
             return self._state.trailing_metadata
 
     def code(self):
         with self._state.condition:
-
-            def _done():
-                return self._state.code is not None
-
-            _common.wait(self._state.condition.wait, _done)
+            while self._state.code is None:
+                self._state.condition.wait()  # Untimed; loop re-checks.
             return self._state.code
 
     def details(self):
         with self._state.condition:
-
-            def _done():
-                return self._state.details is not None
-
-            _common.wait(self._state.condition.wait, _done)
+            while self._state.details is None:
+                self._state.condition.wait()  # Untimed; loop re-checks.
             return _common.decode(self._state.details)
 
     def debug_error_string(self):
         with self._state.condition:
-
-            def _done():
-                return self._state.debug_error_string is not None
-
-            _common.wait(self._state.condition.wait, _done)
+            while self._state.debug_error_string is None:
+                self._state.condition.wait()  # Untimed; loop re-checks.
             return _common.decode(self._state.debug_error_string)
 
     def _repr(self):
diff --git a/src/python/grpcio/grpc/_common.py b/src/python/grpcio/grpc/_common.py
index b4b2473..f69127e 100644
--- a/src/python/grpcio/grpc/_common.py
+++ b/src/python/grpcio/grpc/_common.py
@@ -15,7 +15,6 @@
 
 import logging
 
-import time
 import six
 
 import grpc
@@ -61,8 +60,6 @@
         CYGRPC_STATUS_CODE_TO_STATUS_CODE)
 }
 
-MAXIMUM_WAIT_TIMEOUT = 0.1
-
 
 def encode(s):
     if isinstance(s, bytes):
@@ -99,50 +96,3 @@
 
 def fully_qualified_method(group, method):
     return '/{}/{}'.format(group, method)
-
-
-def _wait_once(wait_fn, timeout, spin_cb):
-    wait_fn(timeout=timeout)
-    if spin_cb is not None:
-        spin_cb()
-
-
-def wait(wait_fn, wait_complete_fn, timeout=None, spin_cb=None):
-    """Blocks waiting for an event without blocking the thread indefinitely.
-
-    See https://github.com/grpc/grpc/issues/19464 for full context. CPython's
-    `threading.Event.wait` and `threading.Condition.wait` methods, if invoked
-    without a timeout kwarg, may block the calling thread indefinitely. If the
-    call is made from the main thread, this means that signal handlers may not
-    run for an arbitrarily long period of time.
-
-    This wrapper calls the supplied wait function with an arbitrary short
-    timeout to ensure that no signal handler has to wait longer than
-    MAXIMUM_WAIT_TIMEOUT before executing.
-
-    Args:
-      wait_fn: A callable acceptable a single float-valued kwarg named
-        `timeout`. This function is expected to be one of `threading.Event.wait`
-        or `threading.Condition.wait`.
-      wait_complete_fn: A callable taking no arguments and returning a bool.
-        When this function returns true, it indicates that waiting should cease.
-      timeout: An optional float-valued number of seconds after which the wait
-        should cease.
-      spin_cb: An optional Callable taking no arguments and returning nothing.
-        This callback will be called on each iteration of the spin. This may be
-        used for, e.g. work related to forking.
-
-    Returns:
-      True if a timeout was supplied and it was reached. False otherwise.
-    """
-    if timeout is None:
-        while not wait_complete_fn():
-            _wait_once(wait_fn, MAXIMUM_WAIT_TIMEOUT, spin_cb)
-    else:
-        end = time.time() + timeout
-        while not wait_complete_fn():
-            remaining = min(end - time.time(), MAXIMUM_WAIT_TIMEOUT)
-            if remaining < 0:
-                return True
-            _wait_once(wait_fn, remaining, spin_cb)
-    return False
diff --git a/src/python/grpcio_tests/commands.py b/src/python/grpcio_tests/commands.py
index 166cea1..dc0795d 100644
--- a/src/python/grpcio_tests/commands.py
+++ b/src/python/grpcio_tests/commands.py
@@ -145,8 +145,6 @@
         'unit._exit_test.ExitTest.test_in_flight_partial_unary_stream_call',
         'unit._exit_test.ExitTest.test_in_flight_partial_stream_unary_call',
         'unit._exit_test.ExitTest.test_in_flight_partial_stream_stream_call',
-        # TODO(https://github.com/grpc/grpc/issues/18980): Reenable.
-        'unit._signal_handling_test.SignalHandlingTest',
         'unit._metadata_flags_test',
         'health_check._health_servicer_test.HealthServicerTest.test_cancelled_watch_removed_from_watch_list',
         # TODO(https://github.com/grpc/grpc/issues/17330) enable these three tests
diff --git a/src/python/grpcio_tests/tests/tests.json b/src/python/grpcio_tests/tests/tests.json
index 16ba484..cc08d56 100644
--- a/src/python/grpcio_tests/tests/tests.json
+++ b/src/python/grpcio_tests/tests/tests.json
@@ -67,7 +67,6 @@
   "unit._server_ssl_cert_config_test.ServerSSLCertReloadTestWithoutClientAuth",
   "unit._server_test.ServerTest",
   "unit._session_cache_test.SSLSessionCacheTest",
-  "unit._signal_handling_test.SignalHandlingTest",
   "unit._version_test.VersionTest",
   "unit.beta._beta_features_test.BetaFeaturesTest",
   "unit.beta._beta_features_test.ContextManagementAndLifecycleTest",
diff --git a/src/python/grpcio_tests/tests/unit/BUILD.bazel b/src/python/grpcio_tests/tests/unit/BUILD.bazel
index d21f5a5..a161794 100644
--- a/src/python/grpcio_tests/tests/unit/BUILD.bazel
+++ b/src/python/grpcio_tests/tests/unit/BUILD.bazel
@@ -16,7 +16,6 @@
     "_credentials_test.py",
     "_dns_resolver_test.py",
     "_empty_message_test.py",
-    "_error_message_encoding_test.py",
     "_exit_test.py",
     "_interceptor_test.py",
     "_invalid_metadata_test.py",
@@ -28,7 +27,6 @@
     # "_reconnect_test.py",
     "_resource_exhausted_test.py",
     "_rpc_test.py",
-    "_signal_handling_test.py",
     # TODO(ghostwriternr): To be added later.
     # "_server_ssl_cert_config_test.py",
     "_server_test.py",
@@ -42,11 +40,6 @@
 )
 
 py_library(
-    name = "_signal_client",
-    srcs = ["_signal_client.py"],
-)
-
-py_library(
     name = "resources",
     srcs = ["resources.py"],
     data=[
@@ -94,7 +87,6 @@
             ":_server_shutdown_scenarios",
             ":_from_grpc_import_star",
             ":_tcp_proxy",
-            ":_signal_client",
             "//src/python/grpcio_tests/tests/unit/framework/common",
             "//src/python/grpcio_tests/tests/testing",
             requirement('six'),
diff --git a/src/python/grpcio_tests/tests/unit/_signal_client.py b/src/python/grpcio_tests/tests/unit/_signal_client.py
deleted file mode 100644
index 65ddd6d..0000000
--- a/src/python/grpcio_tests/tests/unit/_signal_client.py
+++ /dev/null
@@ -1,84 +0,0 @@
-# Copyright 2019 the gRPC authors.
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-#     http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-"""Client for testing responsiveness to signals."""
-
-from __future__ import print_function
-
-import argparse
-import functools
-import logging
-import signal
-import sys
-
-import grpc
-
-SIGTERM_MESSAGE = "Handling sigterm!"
-
-UNARY_UNARY = "/test/Unary"
-UNARY_STREAM = "/test/ServerStreaming"
-
-_MESSAGE = b'\x00\x00\x00'
-
-_ASSERTION_MESSAGE = "Control flow should never reach here."
-
-# NOTE(gnossen): We use a global variable here so that the signal handler can be
-# installed before the RPC begins. If we do not do this, then we may receive the
-# SIGINT before the signal handler is installed. I'm not happy with per-process
-# global state, but the per-process global state that is signal handlers
-# somewhat forces my hand.
-per_process_rpc_future = None
-
-
-def handle_sigint(unused_signum, unused_frame):
-    print(SIGTERM_MESSAGE)
-    if per_process_rpc_future is not None:
-        per_process_rpc_future.cancel()
-    sys.stderr.flush()
-    sys.exit(0)
-
-
-def main_unary(server_target):
-    """Initiate a unary RPC to be interrupted by a SIGINT."""
-    global per_process_rpc_future  # pylint: disable=global-statement
-    with grpc.insecure_channel(server_target) as channel:
-        multicallable = channel.unary_unary(UNARY_UNARY)
-        signal.signal(signal.SIGINT, handle_sigint)
-        per_process_rpc_future = multicallable.future(
-            _MESSAGE, wait_for_ready=True)
-        result = per_process_rpc_future.result()
-        assert False, _ASSERTION_MESSAGE
-
-
-def main_streaming(server_target):
-    """Initiate a streaming RPC to be interrupted by a SIGINT."""
-    global per_process_rpc_future  # pylint: disable=global-statement
-    with grpc.insecure_channel(server_target) as channel:
-        signal.signal(signal.SIGINT, handle_sigint)
-        per_process_rpc_future = channel.unary_stream(UNARY_STREAM)(
-            _MESSAGE, wait_for_ready=True)
-        for result in per_process_rpc_future:
-            pass
-        assert False, _ASSERTION_MESSAGE
-
-
-if __name__ == '__main__':
-    parser = argparse.ArgumentParser(description='Signal test client.')
-    parser.add_argument('server', help='Server target')
-    parser.add_argument(
-        'arity', help='RPC arity', choices=('unary', 'streaming'))
-    args = parser.parse_args()
-    if args.arity == 'unary':
-        main_unary(args.server)
-    else:
-        main_streaming(args.server)
diff --git a/src/python/grpcio_tests/tests/unit/_signal_handling_test.py b/src/python/grpcio_tests/tests/unit/_signal_handling_test.py
deleted file mode 100644
index 9d0d647..0000000
--- a/src/python/grpcio_tests/tests/unit/_signal_handling_test.py
+++ /dev/null
@@ -1,158 +0,0 @@
-# Copyright 2019 the gRPC authors.
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-#     http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-"""Test of responsiveness to signals."""
-
-from __future__ import print_function
-
-import logging
-import os
-import signal
-import subprocess
-import tempfile
-import threading
-import unittest
-import sys
-
-import grpc
-
-from tests.unit import test_common
-from tests.unit import _signal_client
-
-_CLIENT_PATH = os.path.abspath(os.path.realpath(_signal_client.__file__))
-_HOST = 'localhost'
-
-
-class _GenericHandler(grpc.GenericRpcHandler):
-
-    def __init__(self):
-        self._connected_clients_lock = threading.RLock()
-        self._connected_clients_event = threading.Event()
-        self._connected_clients = 0
-
-        self._unary_unary_handler = grpc.unary_unary_rpc_method_handler(
-            self._handle_unary_unary)
-        self._unary_stream_handler = grpc.unary_stream_rpc_method_handler(
-            self._handle_unary_stream)
-
-    def _on_client_connect(self):
-        with self._connected_clients_lock:
-            self._connected_clients += 1
-            self._connected_clients_event.set()
-
-    def _on_client_disconnect(self):
-        with self._connected_clients_lock:
-            self._connected_clients -= 1
-            if self._connected_clients == 0:
-                self._connected_clients_event.clear()
-
-    def await_connected_client(self):
-        """Blocks until a client connects to the server."""
-        self._connected_clients_event.wait()
-
-    def _handle_unary_unary(self, request, servicer_context):
-        """Handles a unary RPC.
-
-        Blocks until the client disconnects and then echoes.
-        """
-        stop_event = threading.Event()
-
-        def on_rpc_end():
-            self._on_client_disconnect()
-            stop_event.set()
-
-        servicer_context.add_callback(on_rpc_end)
-        self._on_client_connect()
-        stop_event.wait()
-        return request
-
-    def _handle_unary_stream(self, request, servicer_context):
-        """Handles a server streaming RPC.
-
-        Blocks until the client disconnects and then echoes.
-        """
-        stop_event = threading.Event()
-
-        def on_rpc_end():
-            self._on_client_disconnect()
-            stop_event.set()
-
-        servicer_context.add_callback(on_rpc_end)
-        self._on_client_connect()
-        stop_event.wait()
-        yield request
-
-    def service(self, handler_call_details):
-        if handler_call_details.method == _signal_client.UNARY_UNARY:
-            return self._unary_unary_handler
-        elif handler_call_details.method == _signal_client.UNARY_STREAM:
-            return self._unary_stream_handler
-        else:
-            return None
-
-
-def _read_stream(stream):
-    stream.seek(0)
-    return stream.read()
-
-
-class SignalHandlingTest(unittest.TestCase):
-
-    def setUp(self):
-        self._server = test_common.test_server()
-        self._port = self._server.add_insecure_port('{}:0'.format(_HOST))
-        self._handler = _GenericHandler()
-        self._server.add_generic_rpc_handlers((self._handler,))
-        self._server.start()
-
-    def tearDown(self):
-        self._server.stop(None)
-
-    @unittest.skipIf(os.name == 'nt', 'SIGINT not supported on windows')
-    def testUnary(self):
-        """Tests that the server unary code path does not stall signal handlers."""
-        server_target = '{}:{}'.format(_HOST, self._port)
-        with tempfile.TemporaryFile(mode='r') as client_stdout:
-            with tempfile.TemporaryFile(mode='r') as client_stderr:
-                client = subprocess.Popen(
-                    (sys.executable, _CLIENT_PATH, server_target, 'unary'),
-                    stdout=client_stdout,
-                    stderr=client_stderr)
-                self._handler.await_connected_client()
-                client.send_signal(signal.SIGINT)
-                self.assertFalse(client.wait(), msg=_read_stream(client_stderr))
-                client_stdout.seek(0)
-                self.assertIn(_signal_client.SIGTERM_MESSAGE,
-                              client_stdout.read())
-
-    @unittest.skipIf(os.name == 'nt', 'SIGINT not supported on windows')
-    def testStreaming(self):
-        """Tests that the server streaming code path does not stall signal handlers."""
-        server_target = '{}:{}'.format(_HOST, self._port)
-        with tempfile.TemporaryFile(mode='r') as client_stdout:
-            with tempfile.TemporaryFile(mode='r') as client_stderr:
-                client = subprocess.Popen(
-                    (sys.executable, _CLIENT_PATH, server_target, 'streaming'),
-                    stdout=client_stdout,
-                    stderr=client_stderr)
-                self._handler.await_connected_client()
-                client.send_signal(signal.SIGINT)
-                self.assertFalse(client.wait(), msg=_read_stream(client_stderr))
-                client_stdout.seek(0)
-                self.assertIn(_signal_client.SIGTERM_MESSAGE,
-                              client_stdout.read())
-
-
-if __name__ == '__main__':
-    logging.basicConfig()
-    unittest.main(verbosity=2)
diff --git a/test/core/security/BUILD b/test/core/security/BUILD
index d8dcdc2..835c0ad 100644
--- a/test/core/security/BUILD
+++ b/test/core/security/BUILD
@@ -171,6 +171,7 @@
         ":oauth2_utils",
         "//:gpr",
         "//:grpc",
+        "//:grpc++",
         "//test/core/util:grpc_test_util",
     ],
 )
diff --git a/test/core/security/fetch_oauth2.cc b/test/core/security/fetch_oauth2.cc
index d404368..1aa9997 100644
--- a/test/core/security/fetch_oauth2.cc
+++ b/test/core/security/fetch_oauth2.cc
@@ -26,53 +26,40 @@
 #include <grpc/support/log.h>
 #include <grpc/support/sync.h>
 
+#include "grpcpp/security/credentials_impl.h"
 #include "src/core/lib/iomgr/error.h"
 #include "src/core/lib/iomgr/load_file.h"
 #include "src/core/lib/security/credentials/credentials.h"
 #include "src/core/lib/security/util/json_util.h"
+#include "src/cpp/client/secure_credentials.h"
 #include "test/core/security/oauth2_utils.h"
 #include "test/core/util/cmdline.h"
 
-static grpc_sts_credentials_options sts_options_from_json(grpc_json* json) {
-  grpc_sts_credentials_options options;
-  memset(&options, 0, sizeof(options));
-  grpc_error* error = GRPC_ERROR_NONE;
-  options.sts_endpoint_url =
-      grpc_json_get_string_property(json, "sts_endpoint_url", &error);
-  GRPC_LOG_IF_ERROR("STS credentials parsing", error);
-  options.resource = grpc_json_get_string_property(json, "resource", nullptr);
-  options.audience = grpc_json_get_string_property(json, "audience", nullptr);
-  options.scope = grpc_json_get_string_property(json, "scope", nullptr);
-  options.requested_token_type =
-      grpc_json_get_string_property(json, "requested_token_type", nullptr);
-  options.subject_token_path =
-      grpc_json_get_string_property(json, "subject_token_path", &error);
-  GRPC_LOG_IF_ERROR("STS credentials parsing", error);
-  options.subject_token_type =
-      grpc_json_get_string_property(json, "subject_token_type", &error);
-  GRPC_LOG_IF_ERROR("STS credentials parsing", error);
-  options.actor_token_path =
-      grpc_json_get_string_property(json, "actor_token_path", nullptr);
-  options.actor_token_type =
-      grpc_json_get_string_property(json, "actor_token_type", nullptr);
-  return options;
-}
-
 static grpc_call_credentials* create_sts_creds(const char* json_file_path) {
-  grpc_slice sts_options_slice;
-  GPR_ASSERT(GRPC_LOG_IF_ERROR(
-      "load_file", grpc_load_file(json_file_path, 1, &sts_options_slice)));
-  grpc_json* json = grpc_json_parse_string(
-      reinterpret_cast<char*>(GRPC_SLICE_START_PTR(sts_options_slice)));
-  if (json == nullptr) {
-    gpr_log(GPR_ERROR, "Invalid json");
-    return nullptr;
+  grpc_impl::experimental::StsCredentialsOptions options;
+  if (strlen(json_file_path) == 0) {  // Empty path: use the env-based loader.
+    auto status =
+        grpc_impl::experimental::StsCredentialsOptionsFromEnv(&options);
+    if (!status.ok()) {
+      gpr_log(GPR_ERROR, "%s", status.error_message().c_str());
+      return nullptr;
+    }
+  } else {
+    grpc_slice sts_options_slice;
+    GPR_ASSERT(GRPC_LOG_IF_ERROR(
+        "load_file", grpc_load_file(json_file_path, 1, &sts_options_slice)));
+    auto status = grpc_impl::experimental::StsCredentialsOptionsFromJson(
+        reinterpret_cast<const char*>(GRPC_SLICE_START_PTR(sts_options_slice)),
+        &options);
+    gpr_slice_unref(sts_options_slice);  // Release the loaded file contents.
+    if (!status.ok()) {
+      gpr_log(GPR_ERROR, "%s", status.error_message().c_str());
+      return nullptr;
+    }
   }
-  grpc_sts_credentials_options options = sts_options_from_json(json);
-  grpc_call_credentials* result =
-      grpc_sts_credentials_create(&options, nullptr);
-  grpc_json_destroy(json);
-  gpr_slice_unref(sts_options_slice);
+  grpc_sts_credentials_options opts =  // Borrows strings from `options`.
+      grpc_impl::experimental::StsCredentialsCppToCoreOptions(options);
+  grpc_call_credentials* result = grpc_sts_credentials_create(&opts, nullptr);
   return result;
 }
 
@@ -99,9 +86,12 @@
   gpr_cmdline_add_string(cl, "json_refresh_token",
                          "File path of the json refresh token.",
                          &json_refresh_token_file_path);
-  gpr_cmdline_add_string(cl, "json_sts_options",
-                         "File path of the json sts options.",
-                         &json_sts_options_file_path);
+  gpr_cmdline_add_string(
+      cl, "json_sts_options",
+      "File path of the json sts options. If the path is empty, the program "
+      "will attempt to use the $STS_CREDENTIALS environment variable to access "
+      "a file containing the options.",
+      &json_sts_options_file_path);
   gpr_cmdline_add_flag(
       cl, "gce",
       "Get a token from the GCE metadata server (only works in GCE).",
diff --git a/test/cpp/client/credentials_test.cc b/test/cpp/client/credentials_test.cc
index e64e260..d560fb6 100644
--- a/test/cpp/client/credentials_test.cc
+++ b/test/cpp/client/credentials_test.cc
@@ -20,9 +20,14 @@
 
 #include <memory>
 
+#include <gmock/gmock.h>
 #include <grpc/grpc.h>
 #include <gtest/gtest.h>
 
+#include "src/core/lib/gpr/env.h"
+#include "src/core/lib/gpr/tmpfile.h"
+#include "src/cpp/client/secure_credentials.h"
+
 namespace grpc {
 namespace testing {
 
@@ -39,6 +44,158 @@
   auto creds = GoogleDefaultCredentials();
 }
 
+// Each C++ option field must compare equal to its converted core counterpart.
+TEST_F(CredentialsTest, StsCredentialsOptionsCppToCore) {
+  grpc::experimental::StsCredentialsOptions opts;
+  opts.token_exchange_service_uri = "https://foo.com/exchange";
+  opts.resource = "resource";
+  opts.audience = "audience";
+  opts.scope = "scope";
+  // opts.requested_token_type is deliberately left unset.
+  opts.subject_token_path = "/foo/bar";
+  opts.subject_token_type = "nice_token_type";
+  opts.actor_token_path = "/foo/baz";
+  opts.actor_token_type = "even_nicer_token_type";
+  grpc_sts_credentials_options core =
+      grpc_impl::experimental::StsCredentialsCppToCoreOptions(opts);
+  EXPECT_EQ(opts.token_exchange_service_uri, core.token_exchange_service_uri);
+  EXPECT_EQ(opts.resource, core.resource);
+  EXPECT_EQ(opts.audience, core.audience);
+  EXPECT_EQ(opts.scope, core.scope);
+  EXPECT_EQ(opts.requested_token_type, core.requested_token_type);
+  EXPECT_EQ(opts.subject_token_path, core.subject_token_path);
+  EXPECT_EQ(opts.subject_token_type, core.subject_token_type);
+  EXPECT_EQ(opts.actor_token_path, core.actor_token_path);
+  EXPECT_EQ(opts.actor_token_type, core.actor_token_type);
+}
+
+TEST_F(CredentialsTest, StsCredentialsOptionsJson) {
+  const char valid_json[] = R"(
+  {
+    "token_exchange_service_uri": "https://foo/exchange",
+    "resource": "resource",
+    "audience": "audience",
+    "scope": "scope",
+    "requested_token_type": "requested_token_type",
+    "subject_token_path": "subject_token_path",
+    "subject_token_type": "subject_token_type",
+    "actor_token_path": "actor_token_path",
+    "actor_token_type": "actor_token_type"
+  })";
+  grpc::experimental::StsCredentialsOptions options;
+  EXPECT_TRUE(
+      grpc::experimental::StsCredentialsOptionsFromJson(valid_json, &options)
+          .ok());
+  EXPECT_EQ(options.token_exchange_service_uri, "https://foo/exchange");
+  EXPECT_EQ(options.resource, "resource");
+  EXPECT_EQ(options.audience, "audience");
+  EXPECT_EQ(options.scope, "scope");
+  EXPECT_EQ(options.requested_token_type, "requested_token_type");
+  EXPECT_EQ(options.subject_token_path, "subject_token_path");
+  EXPECT_EQ(options.subject_token_type, "subject_token_type");
+  EXPECT_EQ(options.actor_token_path, "actor_token_path");
+  EXPECT_EQ(options.actor_token_type, "actor_token_type");
+  // Three fields only; the rest must read back empty in the reused `options`.
+  const char minimum_valid_json[] = R"(
+  {
+    "token_exchange_service_uri": "https://foo/exchange",
+    "subject_token_path": "subject_token_path",
+    "subject_token_type": "subject_token_type"
+  })";
+  EXPECT_TRUE(grpc::experimental::StsCredentialsOptionsFromJson(
+                  minimum_valid_json, &options)
+                  .ok());
+  EXPECT_EQ(options.token_exchange_service_uri, "https://foo/exchange");
+  EXPECT_EQ(options.resource, "");
+  EXPECT_EQ(options.audience, "");
+  EXPECT_EQ(options.scope, "");
+  EXPECT_EQ(options.requested_token_type, "");
+  EXPECT_EQ(options.subject_token_path, "subject_token_path");
+  EXPECT_EQ(options.subject_token_type, "subject_token_type");
+  EXPECT_EQ(options.actor_token_path, "");
+  EXPECT_EQ(options.actor_token_type, "");
+  // Text that is not JSON at all is expected to yield INVALID_ARGUMENT.
+  const char invalid_json[] = R"(
+  I'm not a valid JSON.
+  )";
+  EXPECT_EQ(
+      grpc::INVALID_ARGUMENT,
+      grpc::experimental::StsCredentialsOptionsFromJson(invalid_json, &options)
+          .error_code());
+  // No subject_token_type: INVALID_ARGUMENT, with the field named in the error.
+  const char invalid_json_missing_subject_token_type[] = R"(
+  {
+    "token_exchange_service_uri": "https://foo/exchange",
+    "subject_token_path": "subject_token_path"
+  })";
+  auto status = grpc::experimental::StsCredentialsOptionsFromJson(
+      invalid_json_missing_subject_token_type, &options);
+  EXPECT_EQ(grpc::INVALID_ARGUMENT, status.error_code());
+  EXPECT_THAT(status.error_message(),
+              ::testing::HasSubstr("subject_token_type"));
+  // No subject_token_path: INVALID_ARGUMENT, with the field named in the error.
+  const char invalid_json_missing_subject_token_path[] = R"(
+  {
+    "token_exchange_service_uri": "https://foo/exchange",
+    "subject_token_type": "subject_token_type"
+  })";
+  status = grpc::experimental::StsCredentialsOptionsFromJson(
+      invalid_json_missing_subject_token_path, &options);
+  EXPECT_EQ(grpc::INVALID_ARGUMENT, status.error_code());
+  EXPECT_THAT(status.error_message(),
+              ::testing::HasSubstr("subject_token_path"));
+  // No token_exchange_service_uri: INVALID_ARGUMENT, field named in the error.
+  const char invalid_json_missing_token_exchange_uri[] = R"(
+  {
+    "subject_token_path": "subject_token_path",
+    "subject_token_type": "subject_token_type"
+  })";
+  status = grpc::experimental::StsCredentialsOptionsFromJson(
+      invalid_json_missing_token_exchange_uri, &options);
+  EXPECT_EQ(grpc::INVALID_ARGUMENT, status.error_code());
+  EXPECT_THAT(status.error_message(),
+              ::testing::HasSubstr("token_exchange_service_uri"));
+}
+
+TEST_F(CredentialsTest, StsCredentialsOptionsFromEnv) {
+  // With $STS_CREDENTIALS unset, loading the options must report NOT_FOUND.
+  gpr_unsetenv("STS_CREDENTIALS");
+  grpc::experimental::StsCredentialsOptions options;
+  auto status = grpc::experimental::StsCredentialsOptionsFromEnv(&options);
+  EXPECT_EQ(grpc::NOT_FOUND, status.error_code());
+
+  // Write valid options to a temp file: JSON text only, no trailing NUL.
+  const char valid_json[] = R"(
+  {
+    "token_exchange_service_uri": "https://foo/exchange",
+    "subject_token_path": "subject_token_path",
+    "subject_token_type": "subject_token_type"
+  })";
+  const size_t json_len = sizeof(valid_json) - 1;
+  char* creds_file_name;
+  FILE* creds_file = gpr_tmpfile("sts_creds_options", &creds_file_name);
+  ASSERT_NE(creds_file_name, nullptr);
+  ASSERT_NE(creds_file, nullptr);
+  ASSERT_EQ(json_len, fwrite(valid_json, 1, json_len, creds_file));
+  fclose(creds_file);
+  gpr_setenv("STS_CREDENTIALS", creds_file_name);
+  status = grpc::experimental::StsCredentialsOptionsFromEnv(&options);
+  EXPECT_TRUE(status.ok());
+  EXPECT_EQ(options.token_exchange_service_uri, "https://foo/exchange");
+  EXPECT_EQ(options.resource, "");
+  EXPECT_EQ(options.audience, "");
+  EXPECT_EQ(options.scope, "");
+  EXPECT_EQ(options.requested_token_type, "");
+  EXPECT_EQ(options.subject_token_path, "subject_token_path");
+  EXPECT_EQ(options.subject_token_type, "subject_token_type");
+  EXPECT_EQ(options.actor_token_path, "");
+  EXPECT_EQ(options.actor_token_type, "");
+  // Cleanup: unset the variable and delete the temp file, then free its name.
+  gpr_unsetenv("STS_CREDENTIALS");
+  remove(creds_file_name);
+  gpr_free(creds_file_name);
+}
+
 }  // namespace testing
 }  // namespace grpc
 
diff --git a/test/cpp/end2end/client_lb_end2end_test.cc b/test/cpp/end2end/client_lb_end2end_test.cc
index d499f13..8fc0eac 100644
--- a/test/cpp/end2end/client_lb_end2end_test.cc
+++ b/test/cpp/end2end/client_lb_end2end_test.cc
@@ -133,6 +133,59 @@
   std::set<grpc::string> clients_;
 };
 
+// Test helper owning a ref-counted FakeResolverResponseGenerator; its setters
+// publish resolver results built from lists of 127.0.0.1 ports.
+class FakeResolverResponseGeneratorWrapper {
+ public:
+  FakeResolverResponseGeneratorWrapper()
+      : response_generator_(grpc_core::MakeRefCounted<
+                            grpc_core::FakeResolverResponseGenerator>()) {}
+
+  FakeResolverResponseGeneratorWrapper(
+      FakeResolverResponseGeneratorWrapper&& other)
+      : response_generator_(std::move(other.response_generator_)) {}
+
+  void SetNextResolution(const std::vector<int>& ports) {
+    grpc_core::ExecCtx ctx;
+    response_generator_->SetResponse(BuildFakeResults(ports));
+  }
+
+  void SetNextResolutionUponError(const std::vector<int>& ports) {
+    grpc_core::ExecCtx ctx;
+    response_generator_->SetReresolutionResponse(BuildFakeResults(ports));
+  }
+
+  void SetFailureOnReresolution() {
+    grpc_core::ExecCtx ctx;
+    response_generator_->SetFailureOnReresolution();
+  }
+
+  grpc_core::FakeResolverResponseGenerator* Get() const {
+    return response_generator_.get();
+  }
+
+ private:
+  static grpc_core::Resolver::Result BuildFakeResults(
+      const std::vector<int>& ports) {
+    grpc_core::Resolver::Result results;
+    for (const int port : ports) {
+      char* uri_text;
+      gpr_asprintf(&uri_text, "ipv4:127.0.0.1:%d", port);
+      grpc_uri* uri = grpc_uri_parse(uri_text, true);
+      GPR_ASSERT(uri != nullptr);
+      grpc_resolved_address addr;
+      GPR_ASSERT(grpc_parse_uri(uri, &addr));
+      results.addresses.emplace_back(addr.addr, addr.len, nullptr /* args */);
+      grpc_uri_destroy(uri);
+      gpr_free(uri_text);
+    }
+    return results;
+  }
+
+  grpc_core::RefCountedPtr<grpc_core::FakeResolverResponseGenerator>
+      response_generator_;
+};
+
 class ClientLbEnd2endTest : public ::testing::Test {
  protected:
   ClientLbEnd2endTest()
@@ -147,11 +200,7 @@
     GPR_GLOBAL_CONFIG_SET(grpc_client_channel_backup_poll_interval_ms, 1);
   }
 
-  void SetUp() override {
-    grpc_init();
-    response_generator_ =
-        grpc_core::MakeRefCounted<grpc_core::FakeResolverResponseGenerator>();
-  }
+  void SetUp() override { grpc_init(); }
 
   void TearDown() override {
     for (size_t i = 0; i < servers_.size(); ++i) {
@@ -186,38 +235,6 @@
     }
   }
 
-  grpc_core::Resolver::Result BuildFakeResults(const std::vector<int>& ports) {
-    grpc_core::Resolver::Result result;
-    for (const int& port : ports) {
-      char* lb_uri_str;
-      gpr_asprintf(&lb_uri_str, "ipv4:127.0.0.1:%d", port);
-      grpc_uri* lb_uri = grpc_uri_parse(lb_uri_str, true);
-      GPR_ASSERT(lb_uri != nullptr);
-      grpc_resolved_address address;
-      GPR_ASSERT(grpc_parse_uri(lb_uri, &address));
-      result.addresses.emplace_back(address.addr, address.len,
-                                    nullptr /* args */);
-      grpc_uri_destroy(lb_uri);
-      gpr_free(lb_uri_str);
-    }
-    return result;
-  }
-
-  void SetNextResolution(const std::vector<int>& ports) {
-    grpc_core::ExecCtx exec_ctx;
-    response_generator_->SetResponse(BuildFakeResults(ports));
-  }
-
-  void SetNextResolutionUponError(const std::vector<int>& ports) {
-    grpc_core::ExecCtx exec_ctx;
-    response_generator_->SetReresolutionResponse(BuildFakeResults(ports));
-  }
-
-  void SetFailureOnReresolution() {
-    grpc_core::ExecCtx exec_ctx;
-    response_generator_->SetFailureOnReresolution();
-  }
-
   std::vector<int> GetServersPorts(size_t start_index = 0) {
     std::vector<int> ports;
     for (size_t i = start_index; i < servers_.size(); ++i) {
@@ -226,6 +243,10 @@
     return ports;
   }
 
+  FakeResolverResponseGeneratorWrapper BuildResolverResponseGenerator() {
+    return FakeResolverResponseGeneratorWrapper();
+  }
+
   std::unique_ptr<grpc::testing::EchoTestService::Stub> BuildStub(
       const std::shared_ptr<Channel>& channel) {
     return grpc::testing::EchoTestService::NewStub(channel);
@@ -233,12 +254,13 @@
 
   std::shared_ptr<Channel> BuildChannel(
       const grpc::string& lb_policy_name,
+      const FakeResolverResponseGeneratorWrapper& response_generator,
       ChannelArguments args = ChannelArguments()) {
     if (lb_policy_name.size() > 0) {
       args.SetLoadBalancingPolicyName(lb_policy_name);
     }  // else, default to pick first
     args.SetPointer(GRPC_ARG_FAKE_RESOLVER_RESPONSE_GENERATOR,
-                    response_generator_.get());
+                    response_generator.Get());
     return ::grpc::CreateCustomChannel("fake:///", creds_, args);
   }
 
@@ -401,8 +423,6 @@
   const grpc::string server_host_;
   std::unique_ptr<grpc::testing::EchoTestService::Stub> stub_;
   std::vector<std::unique_ptr<ServerData>> servers_;
-  grpc_core::RefCountedPtr<grpc_core::FakeResolverResponseGenerator>
-      response_generator_;
   const grpc::string kRequestMessage_;
   std::shared_ptr<ChannelCredentials> creds_;
 };
@@ -410,7 +430,8 @@
 TEST_F(ClientLbEnd2endTest, ChannelStateConnectingWhenResolving) {
   const int kNumServers = 3;
   StartServers(kNumServers);
-  auto channel = BuildChannel("");
+  auto response_generator = BuildResolverResponseGenerator();
+  auto channel = BuildChannel("", response_generator);
   auto stub = BuildStub(channel);
   // Initial state should be IDLE.
   EXPECT_EQ(channel->GetState(false /* try_to_connect */), GRPC_CHANNEL_IDLE);
@@ -423,7 +444,7 @@
   EXPECT_EQ(channel->GetState(false /* try_to_connect */),
             GRPC_CHANNEL_CONNECTING);
   // Return a resolver result, which allows the connection attempt to proceed.
-  SetNextResolution(GetServersPorts());
+  response_generator.SetNextResolution(GetServersPorts());
   // We should eventually transition into state READY.
   EXPECT_TRUE(WaitForChannelReady(channel.get()));
 }
@@ -432,9 +453,11 @@
   // Start servers and send one RPC per server.
   const int kNumServers = 3;
   StartServers(kNumServers);
-  auto channel = BuildChannel("");  // test that pick first is the default.
+  auto response_generator = BuildResolverResponseGenerator();
+  auto channel = BuildChannel(
+      "", response_generator);  // test that pick first is the default.
   auto stub = BuildStub(channel);
-  SetNextResolution(GetServersPorts());
+  response_generator.SetNextResolution(GetServersPorts());
   for (size_t i = 0; i < servers_.size(); ++i) {
     CheckRpcSendOk(stub, DEBUG_LOCATION);
   }
@@ -454,19 +477,22 @@
 }
 
 TEST_F(ClientLbEnd2endTest, PickFirstProcessPending) {
-  StartServers(1);                  // Single server
-  auto channel = BuildChannel("");  // test that pick first is the default.
+  StartServers(1);  // Single server
+  auto response_generator = BuildResolverResponseGenerator();
+  auto channel = BuildChannel(
+      "", response_generator);  // test that pick first is the default.
   auto stub = BuildStub(channel);
-  SetNextResolution({servers_[0]->port_});
+  response_generator.SetNextResolution({servers_[0]->port_});
   WaitForServer(stub, 0, DEBUG_LOCATION);
   // Create a new channel and its corresponding PF LB policy, which will pick
   // the subchannels in READY state from the previous RPC against the same
   // target (even if it happened over a different channel, because subchannels
   // are globally reused). Progress should happen without any transition from
   // this READY state.
-  auto second_channel = BuildChannel("");
+  auto second_response_generator = BuildResolverResponseGenerator();
+  auto second_channel = BuildChannel("", second_response_generator);
   auto second_stub = BuildStub(second_channel);
-  SetNextResolution({servers_[0]->port_});
+  second_response_generator.SetNextResolution({servers_[0]->port_});
   CheckRpcSendOk(second_stub, DEBUG_LOCATION);
 }
 
@@ -479,16 +505,18 @@
                             grpc_pick_unused_port_or_die()};
   CreateServers(2, ports);
   StartServer(1);
-  auto channel1 = BuildChannel("pick_first", args);
+  auto response_generator1 = BuildResolverResponseGenerator();
+  auto channel1 = BuildChannel("pick_first", response_generator1, args);
   auto stub1 = BuildStub(channel1);
-  SetNextResolution(ports);
+  response_generator1.SetNextResolution(ports);
   // Wait for second server to be ready.
   WaitForServer(stub1, 1, DEBUG_LOCATION);
   // Create a second channel with the same addresses.  Its PF instance
   // should immediately pick the second subchannel, since it's already
   // in READY state.
-  auto channel2 = BuildChannel("pick_first", args);
-  SetNextResolution(ports);
+  auto response_generator2 = BuildResolverResponseGenerator();
+  auto channel2 = BuildChannel("pick_first", response_generator2, args);
+  response_generator2.SetNextResolution(ports);
   // Check that the channel reports READY without waiting for the
   // initial backoff.
   EXPECT_TRUE(WaitForChannelReady(channel2.get(), 1 /* timeout_seconds */));
@@ -500,9 +528,10 @@
   args.SetInt(GRPC_ARG_INITIAL_RECONNECT_BACKOFF_MS, kInitialBackOffMs);
   const std::vector<int> ports = {grpc_pick_unused_port_or_die()};
   const gpr_timespec t0 = gpr_now(GPR_CLOCK_MONOTONIC);
-  auto channel = BuildChannel("pick_first", args);
+  auto response_generator = BuildResolverResponseGenerator();
+  auto channel = BuildChannel("pick_first", response_generator, args);
   auto stub = BuildStub(channel);
-  SetNextResolution(ports);
+  response_generator.SetNextResolution(ports);
   // The channel won't become connected (there's no server).
   ASSERT_FALSE(channel->WaitForConnected(
       grpc_timeout_milliseconds_to_deadline(kInitialBackOffMs * 2)));
@@ -529,9 +558,10 @@
   constexpr int kMinReconnectBackOffMs = 1000;
   args.SetInt(GRPC_ARG_MIN_RECONNECT_BACKOFF_MS, kMinReconnectBackOffMs);
   const std::vector<int> ports = {grpc_pick_unused_port_or_die()};
-  auto channel = BuildChannel("pick_first", args);
+  auto response_generator = BuildResolverResponseGenerator();
+  auto channel = BuildChannel("pick_first", response_generator, args);
   auto stub = BuildStub(channel);
-  SetNextResolution(ports);
+  response_generator.SetNextResolution(ports);
   // Make connection delay a 10% longer than it's willing to in order to make
   // sure we are hitting the codepath that waits for the min reconnect backoff.
   gpr_atm_rel_store(&g_connection_delay_ms, kMinReconnectBackOffMs * 1.10);
@@ -554,9 +584,10 @@
   constexpr int kInitialBackOffMs = 1000;
   args.SetInt(GRPC_ARG_INITIAL_RECONNECT_BACKOFF_MS, kInitialBackOffMs);
   const std::vector<int> ports = {grpc_pick_unused_port_or_die()};
-  auto channel = BuildChannel("pick_first", args);
+  auto response_generator = BuildResolverResponseGenerator();
+  auto channel = BuildChannel("pick_first", response_generator, args);
   auto stub = BuildStub(channel);
-  SetNextResolution(ports);
+  response_generator.SetNextResolution(ports);
   // The channel won't become connected (there's no server).
   EXPECT_FALSE(
       channel->WaitForConnected(grpc_timeout_milliseconds_to_deadline(10)));
@@ -585,9 +616,10 @@
   constexpr int kInitialBackOffMs = 1000;
   args.SetInt(GRPC_ARG_INITIAL_RECONNECT_BACKOFF_MS, kInitialBackOffMs);
   const std::vector<int> ports = {grpc_pick_unused_port_or_die()};
-  auto channel = BuildChannel("pick_first", args);
+  auto response_generator = BuildResolverResponseGenerator();
+  auto channel = BuildChannel("pick_first", response_generator, args);
   auto stub = BuildStub(channel);
-  SetNextResolution(ports);
+  response_generator.SetNextResolution(ports);
   // Wait for connect, which should fail ~immediately, because the server
   // is not up.
   gpr_log(GPR_INFO, "=== INITIAL CONNECTION ATTEMPT");
@@ -628,21 +660,22 @@
   // Start servers and send one RPC per server.
   const int kNumServers = 3;
   StartServers(kNumServers);
-  auto channel = BuildChannel("pick_first");
+  auto response_generator = BuildResolverResponseGenerator();
+  auto channel = BuildChannel("pick_first", response_generator);
   auto stub = BuildStub(channel);
 
   std::vector<int> ports;
 
   // Perform one RPC against the first server.
   ports.emplace_back(servers_[0]->port_);
-  SetNextResolution(ports);
+  response_generator.SetNextResolution(ports);
   gpr_log(GPR_INFO, "****** SET [0] *******");
   CheckRpcSendOk(stub, DEBUG_LOCATION);
   EXPECT_EQ(servers_[0]->service_.request_count(), 1);
 
   // An empty update will result in the channel going into TRANSIENT_FAILURE.
   ports.clear();
-  SetNextResolution(ports);
+  response_generator.SetNextResolution(ports);
   gpr_log(GPR_INFO, "****** SET none *******");
   grpc_connectivity_state channel_state;
   do {
@@ -654,7 +687,7 @@
   // Next update introduces servers_[1], making the channel recover.
   ports.clear();
   ports.emplace_back(servers_[1]->port_);
-  SetNextResolution(ports);
+  response_generator.SetNextResolution(ports);
   gpr_log(GPR_INFO, "****** SET [1] *******");
   WaitForServer(stub, 1, DEBUG_LOCATION);
   EXPECT_EQ(servers_[0]->service_.request_count(), 0);
@@ -662,7 +695,7 @@
   // And again for servers_[2]
   ports.clear();
   ports.emplace_back(servers_[2]->port_);
-  SetNextResolution(ports);
+  response_generator.SetNextResolution(ports);
   gpr_log(GPR_INFO, "****** SET [2] *******");
   WaitForServer(stub, 2, DEBUG_LOCATION);
   EXPECT_EQ(servers_[0]->service_.request_count(), 0);
@@ -676,14 +709,15 @@
   // Start servers and send one RPC per server.
   const int kNumServers = 3;
   StartServers(kNumServers);
-  auto channel = BuildChannel("pick_first");
+  auto response_generator = BuildResolverResponseGenerator();
+  auto channel = BuildChannel("pick_first", response_generator);
   auto stub = BuildStub(channel);
 
   std::vector<int> ports;
 
   // Perform one RPC against the first server.
   ports.emplace_back(servers_[0]->port_);
-  SetNextResolution(ports);
+  response_generator.SetNextResolution(ports);
   gpr_log(GPR_INFO, "****** SET [0] *******");
   CheckRpcSendOk(stub, DEBUG_LOCATION);
   EXPECT_EQ(servers_[0]->service_.request_count(), 1);
@@ -693,7 +727,7 @@
   ports.clear();
   ports.emplace_back(servers_[1]->port_);
   ports.emplace_back(servers_[0]->port_);
-  SetNextResolution(ports);
+  response_generator.SetNextResolution(ports);
   gpr_log(GPR_INFO, "****** SET superset *******");
   CheckRpcSendOk(stub, DEBUG_LOCATION);
   // We stick to the previously connected server.
@@ -710,12 +744,14 @@
   StartServers(kNumServers);
   std::vector<int> ports = GetServersPorts();
   // Create two channels that (by default) use the global subchannel pool.
-  auto channel1 = BuildChannel("pick_first");
+  auto response_generator1 = BuildResolverResponseGenerator();
+  auto channel1 = BuildChannel("pick_first", response_generator1);
   auto stub1 = BuildStub(channel1);
-  SetNextResolution(ports);
-  auto channel2 = BuildChannel("pick_first");
+  response_generator1.SetNextResolution(ports);
+  auto response_generator2 = BuildResolverResponseGenerator();
+  auto channel2 = BuildChannel("pick_first", response_generator2);
   auto stub2 = BuildStub(channel2);
-  SetNextResolution(ports);
+  response_generator2.SetNextResolution(ports);
   WaitForServer(stub1, 0, DEBUG_LOCATION);
   // Send one RPC on each channel.
   CheckRpcSendOk(stub1, DEBUG_LOCATION);
@@ -735,12 +771,14 @@
   // Create two channels that use local subchannel pool.
   ChannelArguments args;
   args.SetInt(GRPC_ARG_USE_LOCAL_SUBCHANNEL_POOL, 1);
-  auto channel1 = BuildChannel("pick_first", args);
+  auto response_generator1 = BuildResolverResponseGenerator();
+  auto channel1 = BuildChannel("pick_first", response_generator1, args);
   auto stub1 = BuildStub(channel1);
-  SetNextResolution(ports);
-  auto channel2 = BuildChannel("pick_first", args);
+  response_generator1.SetNextResolution(ports);
+  auto response_generator2 = BuildResolverResponseGenerator();
+  auto channel2 = BuildChannel("pick_first", response_generator2, args);
   auto stub2 = BuildStub(channel2);
-  SetNextResolution(ports);
+  response_generator2.SetNextResolution(ports);
   WaitForServer(stub1, 0, DEBUG_LOCATION);
   // Send one RPC on each channel.
   CheckRpcSendOk(stub1, DEBUG_LOCATION);
@@ -756,13 +794,14 @@
   const int kNumUpdates = 1000;
   const int kNumServers = 3;
   StartServers(kNumServers);
-  auto channel = BuildChannel("pick_first");
+  auto response_generator = BuildResolverResponseGenerator();
+  auto channel = BuildChannel("pick_first", response_generator);
   auto stub = BuildStub(channel);
   std::vector<int> ports = GetServersPorts();
   for (size_t i = 0; i < kNumUpdates; ++i) {
     std::shuffle(ports.begin(), ports.end(),
                  std::mt19937(std::random_device()()));
-    SetNextResolution(ports);
+    response_generator.SetNextResolution(ports);
     // We should re-enter core at the end of the loop to give the resolution
     // setting closure a chance to run.
     if ((i + 1) % 10 == 0) CheckRpcSendOk(stub, DEBUG_LOCATION);
@@ -784,16 +823,17 @@
       dead_ports.emplace_back(grpc_pick_unused_port_or_die());
     }
   }
-  auto channel = BuildChannel("pick_first");
+  auto response_generator = BuildResolverResponseGenerator();
+  auto channel = BuildChannel("pick_first", response_generator);
   auto stub = BuildStub(channel);
   // The initial resolution only contains dead ports. There won't be any
   // selected subchannel. Re-resolution will return the same result.
-  SetNextResolution(dead_ports);
+  response_generator.SetNextResolution(dead_ports);
   gpr_log(GPR_INFO, "****** INITIAL RESOLUTION SET *******");
   for (size_t i = 0; i < 10; ++i) CheckRpcSendFailure(stub);
   // Set a re-resolution result that contains reachable ports, so that the
   // pick_first LB policy can recover soon.
-  SetNextResolutionUponError(alive_ports);
+  response_generator.SetNextResolutionUponError(alive_ports);
   gpr_log(GPR_INFO, "****** RE-RESOLUTION SET *******");
   WaitForServer(stub, 0, DEBUG_LOCATION, true /* ignore_failure */);
   CheckRpcSendOk(stub, DEBUG_LOCATION);
@@ -805,9 +845,10 @@
 TEST_F(ClientLbEnd2endTest, PickFirstReconnectWithoutNewResolverResult) {
   std::vector<int> ports = {grpc_pick_unused_port_or_die()};
   StartServers(1, ports);
-  auto channel = BuildChannel("pick_first");
+  auto response_generator = BuildResolverResponseGenerator();
+  auto channel = BuildChannel("pick_first", response_generator);
   auto stub = BuildStub(channel);
-  SetNextResolution(ports);
+  response_generator.SetNextResolution(ports);
   gpr_log(GPR_INFO, "****** INITIAL CONNECTION *******");
   WaitForServer(stub, 0, DEBUG_LOCATION);
   gpr_log(GPR_INFO, "****** STOPPING SERVER ******");
@@ -824,9 +865,10 @@
                             grpc_pick_unused_port_or_die()};
   CreateServers(2, ports);
   StartServer(1);
-  auto channel = BuildChannel("pick_first");
+  auto response_generator = BuildResolverResponseGenerator();
+  auto channel = BuildChannel("pick_first", response_generator);
   auto stub = BuildStub(channel);
-  SetNextResolution(ports);
+  response_generator.SetNextResolution(ports);
   gpr_log(GPR_INFO, "****** INITIAL CONNECTION *******");
   WaitForServer(stub, 1, DEBUG_LOCATION);
   gpr_log(GPR_INFO, "****** STOPPING SERVER ******");
@@ -840,9 +882,10 @@
 TEST_F(ClientLbEnd2endTest, PickFirstCheckStateBeforeStartWatch) {
   std::vector<int> ports = {grpc_pick_unused_port_or_die()};
   StartServers(1, ports);
-  auto channel_1 = BuildChannel("pick_first");
+  auto response_generator = BuildResolverResponseGenerator();
+  auto channel_1 = BuildChannel("pick_first", response_generator);
   auto stub_1 = BuildStub(channel_1);
-  SetNextResolution(ports);
+  response_generator.SetNextResolution(ports);
   gpr_log(GPR_INFO, "****** RESOLUTION SET FOR CHANNEL 1 *******");
   WaitForServer(stub_1, 0, DEBUG_LOCATION);
   gpr_log(GPR_INFO, "****** CHANNEL 1 CONNECTED *******");
@@ -851,13 +894,10 @@
   // create a new subchannel and hold a ref to it.
   StartServers(1, ports);
   gpr_log(GPR_INFO, "****** SERVER RESTARTED *******");
-  auto channel_2 = BuildChannel("pick_first");
+  auto response_generator_2 = BuildResolverResponseGenerator();
+  auto channel_2 = BuildChannel("pick_first", response_generator_2);
   auto stub_2 = BuildStub(channel_2);
-  // TODO(juanlishen): This resolution result will only be visible to channel 2
-  // since the response generator is only associated with channel 2 now. We
-  // should change the response generator to be able to deliver updates to
-  // multiple channels at once.
-  SetNextResolution(ports);
+  response_generator_2.SetNextResolution(ports);
   gpr_log(GPR_INFO, "****** RESOLUTION SET FOR CHANNEL 2 *******");
   WaitForServer(stub_2, 0, DEBUG_LOCATION, true);
   gpr_log(GPR_INFO, "****** CHANNEL 2 CONNECTED *******");
@@ -883,13 +923,15 @@
   // Start server, send RPC, and make sure channel is READY.
   const int kNumServers = 1;
   StartServers(kNumServers);
-  auto channel = BuildChannel("");  // pick_first is the default.
+  auto response_generator = BuildResolverResponseGenerator();
+  auto channel =
+      BuildChannel("", response_generator);  // pick_first is the default.
   auto stub = BuildStub(channel);
-  SetNextResolution(GetServersPorts());
+  response_generator.SetNextResolution(GetServersPorts());
   CheckRpcSendOk(stub, DEBUG_LOCATION);
   EXPECT_EQ(channel->GetState(false), GRPC_CHANNEL_READY);
   // Stop server.  Channel should go into state IDLE.
-  SetFailureOnReresolution();
+  response_generator.SetFailureOnReresolution();
   servers_[0]->Shutdown();
   EXPECT_TRUE(WaitForChannelNotReady(channel.get()));
   EXPECT_EQ(channel->GetState(false), GRPC_CHANNEL_IDLE);
@@ -897,14 +939,16 @@
 }
 
 TEST_F(ClientLbEnd2endTest, PickFirstPendingUpdateAndSelectedSubchannelFails) {
-  auto channel = BuildChannel("");  // pick_first is the default.
+  auto response_generator = BuildResolverResponseGenerator();
+  auto channel =
+      BuildChannel("", response_generator);  // pick_first is the default.
   auto stub = BuildStub(channel);
   // Create a number of servers, but only start 1 of them.
   CreateServers(10);
   StartServer(0);
   // Initially resolve to first server and make sure it connects.
   gpr_log(GPR_INFO, "Phase 1: Connect to first server.");
-  SetNextResolution({servers_[0]->port_});
+  response_generator.SetNextResolution({servers_[0]->port_});
   CheckRpcSendOk(stub, DEBUG_LOCATION, true /* wait_for_ready */);
   EXPECT_EQ(channel->GetState(false), GRPC_CHANNEL_READY);
   // Send a resolution update with the remaining servers, none of which are
@@ -916,7 +960,7 @@
   gpr_log(GPR_INFO,
           "Phase 2: Resolver update pointing to remaining "
           "(not started) servers.");
-  SetNextResolution(GetServersPorts(1 /* start_index */));
+  response_generator.SetNextResolution(GetServersPorts(1 /* start_index */));
   // RPCs will continue to be sent to the first server.
   CheckRpcSendOk(stub, DEBUG_LOCATION);
   // Now stop the first server, so that the current subchannel list
@@ -947,9 +991,11 @@
   // Start server, send RPC, and make sure channel is READY.
   const int kNumServers = 1;
   StartServers(kNumServers);
-  auto channel = BuildChannel("");  // pick_first is the default.
+  auto response_generator = BuildResolverResponseGenerator();
+  auto channel =
+      BuildChannel("", response_generator);  // pick_first is the default.
   auto stub = BuildStub(channel);
-  SetNextResolution(GetServersPorts());
+  response_generator.SetNextResolution(GetServersPorts());
   CheckRpcSendOk(stub, DEBUG_LOCATION);
   EXPECT_EQ(channel->GetState(false), GRPC_CHANNEL_READY);
   // Stop server.  Channel should go into state IDLE.
@@ -958,13 +1004,13 @@
   EXPECT_EQ(channel->GetState(false), GRPC_CHANNEL_IDLE);
   // Now send resolver update that includes no addresses.  Channel
   // should stay in state IDLE.
-  SetNextResolution({});
+  response_generator.SetNextResolution({});
   EXPECT_FALSE(channel->WaitForStateChange(
       GRPC_CHANNEL_IDLE, grpc_timeout_seconds_to_deadline(3)));
   // Now bring the backend back up and send a non-empty resolver update,
   // and then try to send an RPC.  Channel should go back into state READY.
   StartServer(0);
-  SetNextResolution(GetServersPorts());
+  response_generator.SetNextResolution(GetServersPorts());
   CheckRpcSendOk(stub, DEBUG_LOCATION);
   EXPECT_EQ(channel->GetState(false), GRPC_CHANNEL_READY);
 }
@@ -973,9 +1019,10 @@
   // Start servers and send one RPC per server.
   const int kNumServers = 3;
   StartServers(kNumServers);
-  auto channel = BuildChannel("round_robin");
+  auto response_generator = BuildResolverResponseGenerator();
+  auto channel = BuildChannel("round_robin", response_generator);
   auto stub = BuildStub(channel);
-  SetNextResolution(GetServersPorts());
+  response_generator.SetNextResolution(GetServersPorts());
   // Wait until all backends are ready.
   do {
     CheckRpcSendOk(stub, DEBUG_LOCATION);
@@ -999,18 +1046,20 @@
 
 TEST_F(ClientLbEnd2endTest, RoundRobinProcessPending) {
   StartServers(1);  // Single server
-  auto channel = BuildChannel("round_robin");
+  auto response_generator = BuildResolverResponseGenerator();
+  auto channel = BuildChannel("round_robin", response_generator);
   auto stub = BuildStub(channel);
-  SetNextResolution({servers_[0]->port_});
+  response_generator.SetNextResolution({servers_[0]->port_});
   WaitForServer(stub, 0, DEBUG_LOCATION);
   // Create a new channel and its corresponding RR LB policy, which will pick
   // the subchannels in READY state from the previous RPC against the same
   // target (even if it happened over a different channel, because subchannels
   // are globally reused). Progress should happen without any transition from
   // this READY state.
-  auto second_channel = BuildChannel("round_robin");
+  auto second_response_generator = BuildResolverResponseGenerator();
+  auto second_channel = BuildChannel("round_robin", second_response_generator);
   auto second_stub = BuildStub(second_channel);
-  SetNextResolution({servers_[0]->port_});
+  second_response_generator.SetNextResolution({servers_[0]->port_});
   CheckRpcSendOk(second_stub, DEBUG_LOCATION);
 }
 
@@ -1018,13 +1067,14 @@
   // Start servers and send one RPC per server.
   const int kNumServers = 3;
   StartServers(kNumServers);
-  auto channel = BuildChannel("round_robin");
+  auto response_generator = BuildResolverResponseGenerator();
+  auto channel = BuildChannel("round_robin", response_generator);
   auto stub = BuildStub(channel);
   std::vector<int> ports;
   // Start with a single server.
   gpr_log(GPR_INFO, "*** FIRST BACKEND ***");
   ports.emplace_back(servers_[0]->port_);
-  SetNextResolution(ports);
+  response_generator.SetNextResolution(ports);
   WaitForServer(stub, 0, DEBUG_LOCATION);
   // Send RPCs. They should all go servers_[0]
   for (size_t i = 0; i < 10; ++i) CheckRpcSendOk(stub, DEBUG_LOCATION);
@@ -1036,7 +1086,7 @@
   gpr_log(GPR_INFO, "*** SECOND BACKEND ***");
   ports.clear();
   ports.emplace_back(servers_[1]->port_);
-  SetNextResolution(ports);
+  response_generator.SetNextResolution(ports);
   // Wait until update has been processed, as signaled by the second backend
   // receiving a request.
   EXPECT_EQ(0, servers_[1]->service_.request_count());
@@ -1050,7 +1100,7 @@
   gpr_log(GPR_INFO, "*** THIRD BACKEND ***");
   ports.clear();
   ports.emplace_back(servers_[2]->port_);
-  SetNextResolution(ports);
+  response_generator.SetNextResolution(ports);
   WaitForServer(stub, 2, DEBUG_LOCATION);
   for (size_t i = 0; i < 10; ++i) CheckRpcSendOk(stub, DEBUG_LOCATION);
   EXPECT_EQ(0, servers_[0]->service_.request_count());
@@ -1063,7 +1113,7 @@
   ports.emplace_back(servers_[0]->port_);
   ports.emplace_back(servers_[1]->port_);
   ports.emplace_back(servers_[2]->port_);
-  SetNextResolution(ports);
+  response_generator.SetNextResolution(ports);
   WaitForServer(stub, 0, DEBUG_LOCATION);
   WaitForServer(stub, 1, DEBUG_LOCATION);
   WaitForServer(stub, 2, DEBUG_LOCATION);
@@ -1075,7 +1125,7 @@
   // An empty update will result in the channel going into TRANSIENT_FAILURE.
   gpr_log(GPR_INFO, "*** NO BACKENDS ***");
   ports.clear();
-  SetNextResolution(ports);
+  response_generator.SetNextResolution(ports);
   grpc_connectivity_state channel_state;
   do {
     channel_state = channel->GetState(true /* try to connect */);
@@ -1086,7 +1136,7 @@
   gpr_log(GPR_INFO, "*** BACK TO SECOND BACKEND ***");
   ports.clear();
   ports.emplace_back(servers_[1]->port_);
-  SetNextResolution(ports);
+  response_generator.SetNextResolution(ports);
   WaitForServer(stub, 1, DEBUG_LOCATION);
   channel_state = channel->GetState(false /* try to connect */);
   ASSERT_EQ(channel_state, GRPC_CHANNEL_READY);
@@ -1097,13 +1147,14 @@
 TEST_F(ClientLbEnd2endTest, RoundRobinUpdateInError) {
   const int kNumServers = 3;
   StartServers(kNumServers);
-  auto channel = BuildChannel("round_robin");
+  auto response_generator = BuildResolverResponseGenerator();
+  auto channel = BuildChannel("round_robin", response_generator);
   auto stub = BuildStub(channel);
   std::vector<int> ports;
 
   // Start with a single server.
   ports.emplace_back(servers_[0]->port_);
-  SetNextResolution(ports);
+  response_generator.SetNextResolution(ports);
   WaitForServer(stub, 0, DEBUG_LOCATION);
   // Send RPCs. They should all go to servers_[0]
   for (size_t i = 0; i < 10; ++i) SendRpc(stub);
@@ -1116,7 +1167,7 @@
   servers_[1]->Shutdown();
   ports.emplace_back(servers_[1]->port_);
   ports.emplace_back(servers_[2]->port_);
-  SetNextResolution(ports);
+  response_generator.SetNextResolution(ports);
   WaitForServer(stub, 0, DEBUG_LOCATION);
   WaitForServer(stub, 2, DEBUG_LOCATION);
 
@@ -1130,13 +1181,14 @@
   // Start servers and send one RPC per server.
   const int kNumServers = 3;
   StartServers(kNumServers);
-  auto channel = BuildChannel("round_robin");
+  auto response_generator = BuildResolverResponseGenerator();
+  auto channel = BuildChannel("round_robin", response_generator);
   auto stub = BuildStub(channel);
   std::vector<int> ports = GetServersPorts();
   for (size_t i = 0; i < 1000; ++i) {
     std::shuffle(ports.begin(), ports.end(),
                  std::mt19937(std::random_device()()));
-    SetNextResolution(ports);
+    response_generator.SetNextResolution(ports);
     if (i % 10 == 0) CheckRpcSendOk(stub, DEBUG_LOCATION);
   }
   // Check LB policy name for the channel.
@@ -1162,9 +1214,10 @@
     second_ports.push_back(grpc_pick_unused_port_or_die());
   }
   StartServers(kNumServers, first_ports);
-  auto channel = BuildChannel("round_robin");
+  auto response_generator = BuildResolverResponseGenerator();
+  auto channel = BuildChannel("round_robin", response_generator);
   auto stub = BuildStub(channel);
-  SetNextResolution(first_ports);
+  response_generator.SetNextResolution(first_ports);
   // Send a number of RPCs, which succeed.
   for (size_t i = 0; i < 100; ++i) {
     CheckRpcSendOk(stub, DEBUG_LOCATION);
@@ -1188,7 +1241,7 @@
   StartServers(kNumServers, second_ports);
   // Don't notify of the update. Wait for the LB policy's re-resolution to
   // "pull" the new ports.
-  SetNextResolutionUponError(second_ports);
+  response_generator.SetNextResolutionUponError(second_ports);
   gpr_log(GPR_INFO, "****** SERVERS RESTARTED *******");
   gpr_log(GPR_INFO, "****** SENDING REQUEST TO SUCCEED *******");
   // Client request should eventually (but still fairly soon) succeed.
@@ -1205,9 +1258,10 @@
   const int kNumServers = 3;
   StartServers(kNumServers);
   const auto ports = GetServersPorts();
-  auto channel = BuildChannel("round_robin");
+  auto response_generator = BuildResolverResponseGenerator();
+  auto channel = BuildChannel("round_robin", response_generator);
   auto stub = BuildStub(channel);
-  SetNextResolution(ports);
+  response_generator.SetNextResolution(ports);
   for (size_t i = 0; i < kNumServers; ++i) {
     WaitForServer(stub, i, DEBUG_LOCATION);
   }
@@ -1251,9 +1305,10 @@
   args.SetServiceConfigJSON(
       "{\"healthCheckConfig\": "
       "{\"serviceName\": \"health_check_service_name\"}}");
-  auto channel = BuildChannel("round_robin", args);
+  auto response_generator = BuildResolverResponseGenerator();
+  auto channel = BuildChannel("round_robin", response_generator, args);
   auto stub = BuildStub(channel);
-  SetNextResolution({servers_[0]->port_});
+  response_generator.SetNextResolution({servers_[0]->port_});
   EXPECT_TRUE(WaitForChannelReady(channel.get()));
   CheckRpcSendOk(stub, DEBUG_LOCATION);
 }
@@ -1267,9 +1322,10 @@
   args.SetServiceConfigJSON(
       "{\"healthCheckConfig\": "
       "{\"serviceName\": \"health_check_service_name\"}}");
-  auto channel = BuildChannel("round_robin", args);
+  auto response_generator = BuildResolverResponseGenerator();
+  auto channel = BuildChannel("round_robin", response_generator, args);
   auto stub = BuildStub(channel);
-  SetNextResolution(GetServersPorts());
+  response_generator.SetNextResolution(GetServersPorts());
   // Channel should not become READY, because health checks should be failing.
   gpr_log(GPR_INFO,
           "*** initial state: unknown health check service name for "
@@ -1341,15 +1397,17 @@
   args.SetServiceConfigJSON(
       "{\"healthCheckConfig\": "
       "{\"serviceName\": \"health_check_service_name\"}}");
-  auto channel1 = BuildChannel("round_robin", args);
+  auto response_generator1 = BuildResolverResponseGenerator();
+  auto channel1 = BuildChannel("round_robin", response_generator1, args);
   auto stub1 = BuildStub(channel1);
   std::vector<int> ports = GetServersPorts();
-  SetNextResolution(ports);
+  response_generator1.SetNextResolution(ports);
   // Create a channel with health checking enabled but inhibited.
   args.SetInt(GRPC_ARG_INHIBIT_HEALTH_CHECKING, 1);
-  auto channel2 = BuildChannel("round_robin", args);
+  auto response_generator2 = BuildResolverResponseGenerator();
+  auto channel2 = BuildChannel("round_robin", response_generator2, args);
   auto stub2 = BuildStub(channel2);
-  SetNextResolution(ports);
+  response_generator2.SetNextResolution(ports);
   // First channel should not become READY, because health checks should be
   // failing.
   EXPECT_FALSE(WaitForChannelReady(channel1.get(), 1));
@@ -1376,19 +1434,21 @@
   args.SetServiceConfigJSON(
       "{\"healthCheckConfig\": "
       "{\"serviceName\": \"health_check_service_name\"}}");
-  auto channel1 = BuildChannel("round_robin", args);
+  auto response_generator1 = BuildResolverResponseGenerator();
+  auto channel1 = BuildChannel("round_robin", response_generator1, args);
   auto stub1 = BuildStub(channel1);
   std::vector<int> ports = GetServersPorts();
-  SetNextResolution(ports);
+  response_generator1.SetNextResolution(ports);
   // Create a channel with health-checking enabled with a different
   // service name.
   ChannelArguments args2;
   args2.SetServiceConfigJSON(
       "{\"healthCheckConfig\": "
       "{\"serviceName\": \"health_check_service_name2\"}}");
-  auto channel2 = BuildChannel("round_robin", args2);
+  auto response_generator2 = BuildResolverResponseGenerator();
+  auto channel2 = BuildChannel("round_robin", response_generator2, args2);
   auto stub2 = BuildStub(channel2);
-  SetNextResolution(ports);
+  response_generator2.SetNextResolution(ports);
   // Allow health checks from channel 2 to succeed.
   servers_[0]->SetServingStatus("health_check_service_name2", true);
   // First channel should not become READY, because health checks should be
@@ -1438,9 +1498,11 @@
   const int kNumServers = 1;
   const int kNumRpcs = 10;
   StartServers(kNumServers);
-  auto channel = BuildChannel("intercept_trailing_metadata_lb");
+  auto response_generator = BuildResolverResponseGenerator();
+  auto channel =
+      BuildChannel("intercept_trailing_metadata_lb", response_generator);
   auto stub = BuildStub(channel);
-  SetNextResolution(GetServersPorts());
+  response_generator.SetNextResolution(GetServersPorts());
   for (size_t i = 0; i < kNumRpcs; ++i) {
     CheckRpcSendOk(stub, DEBUG_LOCATION);
   }
@@ -1470,9 +1532,11 @@
       "    }\n"
       "  } ]\n"
       "}");
-  auto channel = BuildChannel("intercept_trailing_metadata_lb", args);
+  auto response_generator = BuildResolverResponseGenerator();
+  auto channel =
+      BuildChannel("intercept_trailing_metadata_lb", response_generator, args);
   auto stub = BuildStub(channel);
-  SetNextResolution(GetServersPorts());
+  response_generator.SetNextResolution(GetServersPorts());
   for (size_t i = 0; i < kNumRpcs; ++i) {
     CheckRpcSendOk(stub, DEBUG_LOCATION);
   }
diff --git a/tools/distrib/python/bazel_deps.sh b/tools/distrib/python/bazel_deps.sh
index 5f4ee1d..67896f1 100755
--- a/tools/distrib/python/bazel_deps.sh
+++ b/tools/distrib/python/bazel_deps.sh
@@ -26,6 +26,7 @@
   docker build -t bazel_local_img tools/dockerfile/test/sanity
   docker run -v "$(realpath .):/src/grpc/:ro" \
     -w /src/grpc/third_party/protobuf         \
+    --rm=true                                 \
     bazel_local_img                           \
     bazel query 'deps('$1')'
 fi
diff --git a/tools/run_tests/generated/sources_and_headers.json b/tools/run_tests/generated/sources_and_headers.json
index 92b3757..0f560fd 100644
--- a/tools/run_tests/generated/sources_and_headers.json
+++ b/tools/run_tests/generated/sources_and_headers.json
@@ -1033,22 +1033,6 @@
     "headers": [], 
     "is_filegroup": false, 
     "language": "c", 
-    "name": "grpc_fetch_oauth2", 
-    "src": [
-      "test/core/security/fetch_oauth2.cc"
-    ], 
-    "third_party": false, 
-    "type": "target"
-  }, 
-  {
-    "deps": [
-      "gpr", 
-      "grpc", 
-      "grpc_test_util"
-    ], 
-    "headers": [], 
-    "is_filegroup": false, 
-    "language": "c", 
     "name": "grpc_ipv6_loopback_available_test", 
     "src": [
       "test/core/iomgr/grpc_ipv6_loopback_available_test.cc"
@@ -3882,6 +3866,23 @@
     "deps": [
       "gpr", 
       "grpc", 
+      "grpc++", 
+      "grpc_test_util"
+    ], 
+    "headers": [], 
+    "is_filegroup": false, 
+    "language": "c++", 
+    "name": "grpc_fetch_oauth2", 
+    "src": [
+      "test/core/security/fetch_oauth2.cc"
+    ], 
+    "third_party": false, 
+    "type": "target"
+  }, 
+  {
+    "deps": [
+      "gpr", 
+      "grpc", 
       "grpc_test_util"
     ], 
     "headers": [],