Merge pull request #19569 from jtattermusch/csharp_internal_span_use
C#: add System.Memory dependency and use Span<> internally for all target frameworks
diff --git a/CMakeLists.txt b/CMakeLists.txt
index 22c926d..8dfa4e8 100644
--- a/CMakeLists.txt
+++ b/CMakeLists.txt
@@ -332,7 +332,6 @@
add_dependencies(buildtests_c grpc_completion_queue_test)
add_dependencies(buildtests_c grpc_completion_queue_threading_test)
add_dependencies(buildtests_c grpc_credentials_test)
-add_dependencies(buildtests_c grpc_fetch_oauth2)
add_dependencies(buildtests_c grpc_ipv6_loopback_available_test)
if(_gRPC_PLATFORM_LINUX OR _gRPC_PLATFORM_MAC OR _gRPC_PLATFORM_POSIX)
add_dependencies(buildtests_c grpc_json_token_test)
@@ -634,6 +633,7 @@
add_dependencies(buildtests_cxx grpc_alts_credentials_options_test)
add_dependencies(buildtests_cxx grpc_cli)
add_dependencies(buildtests_cxx grpc_core_map_test)
+add_dependencies(buildtests_cxx grpc_fetch_oauth2)
add_dependencies(buildtests_cxx grpc_linux_system_roots_test)
add_dependencies(buildtests_cxx grpc_tool_test)
add_dependencies(buildtests_cxx grpclb_api_test)
@@ -8270,40 +8270,6 @@
endif (gRPC_BUILD_TESTS)
if (gRPC_BUILD_TESTS)
-add_executable(grpc_fetch_oauth2
- test/core/security/fetch_oauth2.cc
-)
-
-
-target_include_directories(grpc_fetch_oauth2
- PRIVATE ${CMAKE_CURRENT_SOURCE_DIR}
- PRIVATE ${CMAKE_CURRENT_SOURCE_DIR}/include
- PRIVATE ${_gRPC_SSL_INCLUDE_DIR}
- PRIVATE ${_gRPC_PROTOBUF_INCLUDE_DIR}
- PRIVATE ${_gRPC_ZLIB_INCLUDE_DIR}
- PRIVATE ${_gRPC_BENCHMARK_INCLUDE_DIR}
- PRIVATE ${_gRPC_CARES_INCLUDE_DIR}
- PRIVATE ${_gRPC_GFLAGS_INCLUDE_DIR}
- PRIVATE ${_gRPC_ADDRESS_SORTING_INCLUDE_DIR}
- PRIVATE ${_gRPC_NANOPB_INCLUDE_DIR}
-)
-
-target_link_libraries(grpc_fetch_oauth2
- ${_gRPC_ALLTARGETS_LIBRARIES}
- grpc_test_util
- grpc
- gpr
-)
-
- # avoid dependency on libstdc++
- if (_gRPC_CORE_NOSTDCXX_FLAGS)
- set_target_properties(grpc_fetch_oauth2 PROPERTIES LINKER_LANGUAGE C)
- target_compile_options(grpc_fetch_oauth2 PRIVATE $<$<COMPILE_LANGUAGE:CXX>:${_gRPC_CORE_NOSTDCXX_FLAGS}>)
- endif()
-
-endif (gRPC_BUILD_TESTS)
-if (gRPC_BUILD_TESTS)
-
add_executable(grpc_ipv6_loopback_available_test
test/core/iomgr/grpc_ipv6_loopback_available_test.cc
)
@@ -14080,6 +14046,45 @@
endif (gRPC_BUILD_CODEGEN)
if (gRPC_BUILD_TESTS)
+add_executable(grpc_fetch_oauth2
+ test/core/security/fetch_oauth2.cc
+ third_party/googletest/googletest/src/gtest-all.cc
+ third_party/googletest/googlemock/src/gmock-all.cc
+)
+
+
+target_include_directories(grpc_fetch_oauth2
+ PRIVATE ${CMAKE_CURRENT_SOURCE_DIR}
+ PRIVATE ${CMAKE_CURRENT_SOURCE_DIR}/include
+ PRIVATE ${_gRPC_SSL_INCLUDE_DIR}
+ PRIVATE ${_gRPC_PROTOBUF_INCLUDE_DIR}
+ PRIVATE ${_gRPC_ZLIB_INCLUDE_DIR}
+ PRIVATE ${_gRPC_BENCHMARK_INCLUDE_DIR}
+ PRIVATE ${_gRPC_CARES_INCLUDE_DIR}
+ PRIVATE ${_gRPC_GFLAGS_INCLUDE_DIR}
+ PRIVATE ${_gRPC_ADDRESS_SORTING_INCLUDE_DIR}
+ PRIVATE ${_gRPC_NANOPB_INCLUDE_DIR}
+ PRIVATE third_party/googletest/googletest/include
+ PRIVATE third_party/googletest/googletest
+ PRIVATE third_party/googletest/googlemock/include
+ PRIVATE third_party/googletest/googlemock
+ PRIVATE ${_gRPC_PROTO_GENS_DIR}
+)
+
+target_link_libraries(grpc_fetch_oauth2
+ ${_gRPC_PROTOBUF_LIBRARIES}
+ ${_gRPC_ALLTARGETS_LIBRARIES}
+ grpc_test_util
+ grpc++
+ grpc
+ gpr
+ ${_gRPC_GFLAGS_LIBRARIES}
+)
+
+
+endif (gRPC_BUILD_TESTS)
+if (gRPC_BUILD_TESTS)
+
add_executable(grpc_linux_system_roots_test
test/core/security/linux_system_roots_test.cc
third_party/googletest/googletest/src/gtest-all.cc
diff --git a/Makefile b/Makefile
index c11462f..2a1bfad 100644
--- a/Makefile
+++ b/Makefile
@@ -1056,7 +1056,6 @@
grpc_completion_queue_threading_test: $(BINDIR)/$(CONFIG)/grpc_completion_queue_threading_test
grpc_create_jwt: $(BINDIR)/$(CONFIG)/grpc_create_jwt
grpc_credentials_test: $(BINDIR)/$(CONFIG)/grpc_credentials_test
-grpc_fetch_oauth2: $(BINDIR)/$(CONFIG)/grpc_fetch_oauth2
grpc_ipv6_loopback_available_test: $(BINDIR)/$(CONFIG)/grpc_ipv6_loopback_available_test
grpc_json_token_test: $(BINDIR)/$(CONFIG)/grpc_json_token_test
grpc_jwt_verifier_test: $(BINDIR)/$(CONFIG)/grpc_jwt_verifier_test
@@ -1219,6 +1218,7 @@
grpc_core_map_test: $(BINDIR)/$(CONFIG)/grpc_core_map_test
grpc_cpp_plugin: $(BINDIR)/$(CONFIG)/grpc_cpp_plugin
grpc_csharp_plugin: $(BINDIR)/$(CONFIG)/grpc_csharp_plugin
+grpc_fetch_oauth2: $(BINDIR)/$(CONFIG)/grpc_fetch_oauth2
grpc_linux_system_roots_test: $(BINDIR)/$(CONFIG)/grpc_linux_system_roots_test
grpc_node_plugin: $(BINDIR)/$(CONFIG)/grpc_node_plugin
grpc_objective_c_plugin: $(BINDIR)/$(CONFIG)/grpc_objective_c_plugin
@@ -1489,7 +1489,6 @@
$(BINDIR)/$(CONFIG)/grpc_completion_queue_test \
$(BINDIR)/$(CONFIG)/grpc_completion_queue_threading_test \
$(BINDIR)/$(CONFIG)/grpc_credentials_test \
- $(BINDIR)/$(CONFIG)/grpc_fetch_oauth2 \
$(BINDIR)/$(CONFIG)/grpc_ipv6_loopback_available_test \
$(BINDIR)/$(CONFIG)/grpc_json_token_test \
$(BINDIR)/$(CONFIG)/grpc_jwt_verifier_test \
@@ -1693,6 +1692,7 @@
$(BINDIR)/$(CONFIG)/grpc_alts_credentials_options_test \
$(BINDIR)/$(CONFIG)/grpc_cli \
$(BINDIR)/$(CONFIG)/grpc_core_map_test \
+ $(BINDIR)/$(CONFIG)/grpc_fetch_oauth2 \
$(BINDIR)/$(CONFIG)/grpc_linux_system_roots_test \
$(BINDIR)/$(CONFIG)/grpc_tool_test \
$(BINDIR)/$(CONFIG)/grpclb_api_test \
@@ -1857,6 +1857,7 @@
$(BINDIR)/$(CONFIG)/grpc_alts_credentials_options_test \
$(BINDIR)/$(CONFIG)/grpc_cli \
$(BINDIR)/$(CONFIG)/grpc_core_map_test \
+ $(BINDIR)/$(CONFIG)/grpc_fetch_oauth2 \
$(BINDIR)/$(CONFIG)/grpc_linux_system_roots_test \
$(BINDIR)/$(CONFIG)/grpc_tool_test \
$(BINDIR)/$(CONFIG)/grpclb_api_test \
@@ -10987,38 +10988,6 @@
endif
-GRPC_FETCH_OAUTH2_SRC = \
- test/core/security/fetch_oauth2.cc \
-
-GRPC_FETCH_OAUTH2_OBJS = $(addprefix $(OBJDIR)/$(CONFIG)/, $(addsuffix .o, $(basename $(GRPC_FETCH_OAUTH2_SRC))))
-ifeq ($(NO_SECURE),true)
-
-# You can't build secure targets if you don't have OpenSSL.
-
-$(BINDIR)/$(CONFIG)/grpc_fetch_oauth2: openssl_dep_error
-
-else
-
-
-
-$(BINDIR)/$(CONFIG)/grpc_fetch_oauth2: $(GRPC_FETCH_OAUTH2_OBJS) $(LIBDIR)/$(CONFIG)/libgrpc_test_util.a $(LIBDIR)/$(CONFIG)/libgrpc.a $(LIBDIR)/$(CONFIG)/libgpr.a
- $(E) "[LD] Linking $@"
- $(Q) mkdir -p `dirname $@`
- $(Q) $(LD) $(LDFLAGS) $(GRPC_FETCH_OAUTH2_OBJS) $(LIBDIR)/$(CONFIG)/libgrpc_test_util.a $(LIBDIR)/$(CONFIG)/libgrpc.a $(LIBDIR)/$(CONFIG)/libgpr.a $(LDLIBS) $(LDLIBS_SECURE) -o $(BINDIR)/$(CONFIG)/grpc_fetch_oauth2
-
-endif
-
-$(OBJDIR)/$(CONFIG)/test/core/security/fetch_oauth2.o: $(LIBDIR)/$(CONFIG)/libgrpc_test_util.a $(LIBDIR)/$(CONFIG)/libgrpc.a $(LIBDIR)/$(CONFIG)/libgpr.a
-
-deps_grpc_fetch_oauth2: $(GRPC_FETCH_OAUTH2_OBJS:.o=.dep)
-
-ifneq ($(NO_SECURE),true)
-ifneq ($(NO_DEPS),true)
--include $(GRPC_FETCH_OAUTH2_OBJS:.o=.dep)
-endif
-endif
-
-
GRPC_IPV6_LOOPBACK_AVAILABLE_TEST_SRC = \
test/core/iomgr/grpc_ipv6_loopback_available_test.cc \
@@ -17134,6 +17103,49 @@
endif
+GRPC_FETCH_OAUTH2_SRC = \
+ test/core/security/fetch_oauth2.cc \
+
+GRPC_FETCH_OAUTH2_OBJS = $(addprefix $(OBJDIR)/$(CONFIG)/, $(addsuffix .o, $(basename $(GRPC_FETCH_OAUTH2_SRC))))
+ifeq ($(NO_SECURE),true)
+
+# You can't build secure targets if you don't have OpenSSL.
+
+$(BINDIR)/$(CONFIG)/grpc_fetch_oauth2: openssl_dep_error
+
+else
+
+
+
+
+ifeq ($(NO_PROTOBUF),true)
+
+# You can't build the protoc plugins or protobuf-enabled targets if you don't have protobuf 3.5.0+.
+
+$(BINDIR)/$(CONFIG)/grpc_fetch_oauth2: protobuf_dep_error
+
+else
+
+$(BINDIR)/$(CONFIG)/grpc_fetch_oauth2: $(PROTOBUF_DEP) $(GRPC_FETCH_OAUTH2_OBJS) $(LIBDIR)/$(CONFIG)/libgrpc_test_util.a $(LIBDIR)/$(CONFIG)/libgrpc++.a $(LIBDIR)/$(CONFIG)/libgrpc.a $(LIBDIR)/$(CONFIG)/libgpr.a
+ $(E) "[LD] Linking $@"
+ $(Q) mkdir -p `dirname $@`
+ $(Q) $(LDXX) $(LDFLAGS) $(GRPC_FETCH_OAUTH2_OBJS) $(LIBDIR)/$(CONFIG)/libgrpc_test_util.a $(LIBDIR)/$(CONFIG)/libgrpc++.a $(LIBDIR)/$(CONFIG)/libgrpc.a $(LIBDIR)/$(CONFIG)/libgpr.a $(LDLIBSXX) $(LDLIBS_PROTOBUF) $(LDLIBS) $(LDLIBS_SECURE) $(GTEST_LIB) -o $(BINDIR)/$(CONFIG)/grpc_fetch_oauth2
+
+endif
+
+endif
+
+$(OBJDIR)/$(CONFIG)/test/core/security/fetch_oauth2.o: $(LIBDIR)/$(CONFIG)/libgrpc_test_util.a $(LIBDIR)/$(CONFIG)/libgrpc++.a $(LIBDIR)/$(CONFIG)/libgrpc.a $(LIBDIR)/$(CONFIG)/libgpr.a
+
+deps_grpc_fetch_oauth2: $(GRPC_FETCH_OAUTH2_OBJS:.o=.dep)
+
+ifneq ($(NO_SECURE),true)
+ifneq ($(NO_DEPS),true)
+-include $(GRPC_FETCH_OAUTH2_OBJS:.o=.dep)
+endif
+endif
+
+
GRPC_LINUX_SYSTEM_ROOTS_TEST_SRC = \
test/core/security/linux_system_roots_test.cc \
diff --git a/build.yaml b/build.yaml
index 1c7be4e..4134ecd 100644
--- a/build.yaml
+++ b/build.yaml
@@ -2863,16 +2863,6 @@
- grpc_test_util
- grpc
- gpr
-- name: grpc_fetch_oauth2
- build: test
- run: false
- language: c
- src:
- - test/core/security/fetch_oauth2.cc
- deps:
- - grpc_test_util
- - grpc
- - gpr
- name: grpc_ipv6_loopback_available_test
build: test
language: c
@@ -4945,6 +4935,17 @@
deps:
- grpc_plugin_support
secure: false
+- name: grpc_fetch_oauth2
+ build: test
+ run: false
+ language: c++
+ src:
+ - test/core/security/fetch_oauth2.cc
+ deps:
+ - grpc_test_util
+ - grpc++
+ - grpc
+ - gpr
- name: grpc_linux_system_roots_test
gtest: true
build: test
diff --git a/include/grpc/grpc_security.h b/include/grpc/grpc_security.h
index 8e4f26a..777142f 100644
--- a/include/grpc/grpc_security.h
+++ b/include/grpc/grpc_security.h
@@ -330,20 +330,20 @@
/** Options for creating STS Oauth Token Exchange credentials following the IETF
draft https://tools.ietf.org/html/draft-ietf-oauth-token-exchange-16.
- Optional fields may be set to NULL. It is the responsibility of the caller to
- ensure that the subject and actor tokens are refreshed on disk at the
- specified paths. This API is used for experimental purposes for now and may
- change in the future. */
+ Optional fields may be set to NULL or empty string. It is the responsibility
+ of the caller to ensure that the subject and actor tokens are refreshed on
+ disk at the specified paths. This API is used for experimental purposes for
+ now and may change in the future. */
typedef struct {
- const char* sts_endpoint_url; /* Required. */
- const char* resource; /* Optional. */
- const char* audience; /* Optional. */
- const char* scope; /* Optional. */
- const char* requested_token_type; /* Optional. */
- const char* subject_token_path; /* Required. */
- const char* subject_token_type; /* Required. */
- const char* actor_token_path; /* Optional. */
- const char* actor_token_type; /* Optional. */
+ const char* token_exchange_service_uri; /* Required. */
+ const char* resource; /* Optional. */
+ const char* audience; /* Optional. */
+ const char* scope; /* Optional. */
+ const char* requested_token_type; /* Optional. */
+ const char* subject_token_path; /* Required. */
+ const char* subject_token_type; /* Required. */
+ const char* actor_token_path; /* Optional. */
+ const char* actor_token_type; /* Optional. */
} grpc_sts_credentials_options;
/** Creates an STS credentials following the STS Token Exchanged specifed in the
diff --git a/include/grpcpp/security/credentials.h b/include/grpcpp/security/credentials.h
index b124d3d..5190b1b 100644
--- a/include/grpcpp/security/credentials.h
+++ b/include/grpcpp/security/credentials.h
@@ -106,6 +106,24 @@
namespace experimental {
+typedef ::grpc_impl::experimental::StsCredentialsOptions StsCredentialsOptions;
+
+static inline grpc::Status StsCredentialsOptionsFromJson(
+ const grpc::string& json_string, StsCredentialsOptions* options) {
+ return ::grpc_impl::experimental::StsCredentialsOptionsFromJson(json_string,
+ options);
+}
+
+static inline grpc::Status StsCredentialsOptionsFromEnv(
+ StsCredentialsOptions* options) {
+ return grpc_impl::experimental::StsCredentialsOptionsFromEnv(options);
+}
+
+static inline std::shared_ptr<grpc_impl::CallCredentials> StsCredentials(
+ const StsCredentialsOptions& options) {
+ return grpc_impl::experimental::StsCredentials(options);
+}
+
typedef ::grpc_impl::experimental::AltsCredentialsOptions
AltsCredentialsOptions;
diff --git a/include/grpcpp/security/credentials_impl.h b/include/grpcpp/security/credentials_impl.h
index 34920a5..e236512 100644
--- a/include/grpcpp/security/credentials_impl.h
+++ b/include/grpcpp/security/credentials_impl.h
@@ -259,6 +259,70 @@
namespace experimental {
+/// Options for creating STS Oauth Token Exchange credentials following the IETF
+/// draft https://tools.ietf.org/html/draft-ietf-oauth-token-exchange-16.
+/// Optional fields may be set to empty string. It is the responsibility of the
+/// caller to ensure that the subject and actor tokens are refreshed on disk at
+/// the specified paths.
+struct StsCredentialsOptions {
+ grpc::string token_exchange_service_uri; // Required.
+ grpc::string resource; // Optional.
+ grpc::string audience; // Optional.
+ grpc::string scope; // Optional.
+ grpc::string requested_token_type; // Optional.
+ grpc::string subject_token_path; // Required.
+ grpc::string subject_token_type; // Required.
+ grpc::string actor_token_path; // Optional.
+ grpc::string actor_token_type; // Optional.
+};
+
+/// Creates STS Options from a JSON string. The JSON schema is as follows:
+/// {
+/// "title": "STS Credentials Config",
+/// "type": "object",
+/// "required": ["token_exchange_service_uri", "subject_token_path",
+/// "subject_token_type"],
+/// "properties": {
+/// "token_exchange_service_uri": {
+/// "type": "string"
+/// },
+/// "resource": {
+/// "type": "string"
+/// },
+/// "audience": {
+/// "type": "string"
+/// },
+/// "scope": {
+/// "type": "string"
+/// },
+/// "requested_token_type": {
+/// "type": "string"
+/// },
+/// "subject_token_path": {
+/// "type": "string"
+/// },
+/// "subject_token_type": {
+/// "type": "string"
+/// },
+/// "actor_token_path" : {
+/// "type": "string"
+/// },
+/// "actor_token_type": {
+/// "type": "string"
+/// }
+/// }
+/// }
+grpc::Status StsCredentialsOptionsFromJson(const grpc::string& json_string,
+ StsCredentialsOptions* options);
+
+/// Creates STS credentials options from the $STS_CREDENTIALS environment
+/// variable. This environment variable points to the path of a JSON file
+/// conforming to the schema described above.
+grpc::Status StsCredentialsOptionsFromEnv(StsCredentialsOptions* options);
+
+std::shared_ptr<CallCredentials> StsCredentials(
+ const StsCredentialsOptions& options);
+
/// Options used to build AltsCredentials.
struct AltsCredentialsOptions {
/// service accounts of target endpoint that will be acceptable
diff --git a/src/core/ext/filters/client_channel/resolver/dns/c_ares/dns_resolver_ares.cc b/src/core/ext/filters/client_channel/resolver/dns/c_ares/dns_resolver_ares.cc
index 43b28bc..4e7bb5e 100644
--- a/src/core/ext/filters/client_channel/resolver/dns/c_ares/dns_resolver_ares.cc
+++ b/src/core/ext/filters/client_channel/resolver/dns/c_ares/dns_resolver_ares.cc
@@ -433,6 +433,8 @@
class AresDnsResolverFactory : public ResolverFactory {
public:
+ bool IsValidUri(const grpc_uri* uri) const override { return true; }
+
OrphanablePtr<Resolver> CreateResolver(ResolverArgs args) const override {
return OrphanablePtr<Resolver>(New<AresDnsResolver>(std::move(args)));
}
diff --git a/src/core/ext/filters/client_channel/resolver/dns/native/dns_resolver.cc b/src/core/ext/filters/client_channel/resolver/dns/native/dns_resolver.cc
index 1abe9ad..bc6d73a 100644
--- a/src/core/ext/filters/client_channel/resolver/dns/native/dns_resolver.cc
+++ b/src/core/ext/filters/client_channel/resolver/dns/native/dns_resolver.cc
@@ -258,11 +258,16 @@
class NativeDnsResolverFactory : public ResolverFactory {
public:
- OrphanablePtr<Resolver> CreateResolver(ResolverArgs args) const override {
- if (GPR_UNLIKELY(0 != strcmp(args.uri->authority, ""))) {
+ bool IsValidUri(const grpc_uri* uri) const override {
+ if (GPR_UNLIKELY(0 != strcmp(uri->authority, ""))) {
gpr_log(GPR_ERROR, "authority based dns uri's not supported");
- return OrphanablePtr<Resolver>(nullptr);
+ return false;
}
+ return true;
+ }
+
+ OrphanablePtr<Resolver> CreateResolver(ResolverArgs args) const override {
+ if (!IsValidUri(args.uri)) return nullptr;
return OrphanablePtr<Resolver>(New<NativeDnsResolver>(std::move(args)));
}
diff --git a/src/core/ext/filters/client_channel/resolver/fake/fake_resolver.cc b/src/core/ext/filters/client_channel/resolver/fake/fake_resolver.cc
index ff728a3..c5a27f1 100644
--- a/src/core/ext/filters/client_channel/resolver/fake/fake_resolver.cc
+++ b/src/core/ext/filters/client_channel/resolver/fake/fake_resolver.cc
@@ -89,9 +89,15 @@
: Resolver(args.combiner, std::move(args.result_handler)) {
GRPC_CLOSURE_INIT(&reresolution_closure_, ReturnReresolutionResult, this,
grpc_combiner_scheduler(combiner()));
- channel_args_ = grpc_channel_args_copy(args.args);
FakeResolverResponseGenerator* response_generator =
FakeResolverResponseGenerator::GetFromArgs(args.args);
+ // Channels sharing the same subchannels may have different resolver response
+ // generators. If we don't remove this arg, subchannel pool will create new
+ // subchannels for the same address instead of reusing existing ones because
+ // of different values of this channel arg.
+ const char* args_to_remove[] = {GRPC_ARG_FAKE_RESOLVER_RESPONSE_GENERATOR};
+ channel_args_ = grpc_channel_args_copy_and_remove(
+ args.args, args_to_remove, GPR_ARRAY_SIZE(args_to_remove));
if (response_generator != nullptr) {
response_generator->resolver_ = this;
if (response_generator->has_result_) {
@@ -315,6 +321,8 @@
class FakeResolverFactory : public ResolverFactory {
public:
+ bool IsValidUri(const grpc_uri* uri) const override { return true; }
+
OrphanablePtr<Resolver> CreateResolver(ResolverArgs args) const override {
return OrphanablePtr<Resolver>(New<FakeResolver>(std::move(args)));
}
diff --git a/src/core/ext/filters/client_channel/resolver/sockaddr/sockaddr_resolver.cc b/src/core/ext/filters/client_channel/resolver/sockaddr/sockaddr_resolver.cc
index 517b929..532fd6d 100644
--- a/src/core/ext/filters/client_channel/resolver/sockaddr/sockaddr_resolver.cc
+++ b/src/core/ext/filters/client_channel/resolver/sockaddr/sockaddr_resolver.cc
@@ -80,24 +80,23 @@
void DoNothing(void* ignored) {}
-OrphanablePtr<Resolver> CreateSockaddrResolver(
- ResolverArgs args,
- bool parse(const grpc_uri* uri, grpc_resolved_address* dst)) {
- if (0 != strcmp(args.uri->authority, "")) {
+bool ParseUri(const grpc_uri* uri,
+ bool parse(const grpc_uri* uri, grpc_resolved_address* dst),
+ ServerAddressList* addresses) {
+ if (0 != strcmp(uri->authority, "")) {
gpr_log(GPR_ERROR, "authority-based URIs not supported by the %s scheme",
- args.uri->scheme);
- return nullptr;
+ uri->scheme);
+ return false;
}
// Construct addresses.
grpc_slice path_slice =
- grpc_slice_new(args.uri->path, strlen(args.uri->path), DoNothing);
+ grpc_slice_new(uri->path, strlen(uri->path), DoNothing);
grpc_slice_buffer path_parts;
grpc_slice_buffer_init(&path_parts);
grpc_slice_split(path_slice, ",", &path_parts);
- ServerAddressList addresses;
bool errors_found = false;
for (size_t i = 0; i < path_parts.count; i++) {
- grpc_uri ith_uri = *args.uri;
+ grpc_uri ith_uri = *uri;
UniquePtr<char> part_str(grpc_slice_to_c_string(path_parts.slices[i]));
ith_uri.path = part_str.get();
grpc_resolved_address addr;
@@ -105,13 +104,20 @@
errors_found = true;
break;
}
- addresses.emplace_back(addr, nullptr /* args */);
+ if (addresses != nullptr) {
+ addresses->emplace_back(addr, nullptr /* args */);
+ }
}
grpc_slice_buffer_destroy_internal(&path_parts);
grpc_slice_unref_internal(path_slice);
- if (errors_found) {
- return OrphanablePtr<Resolver>(nullptr);
- }
+ return !errors_found;
+}
+
+OrphanablePtr<Resolver> CreateSockaddrResolver(
+ ResolverArgs args,
+ bool parse(const grpc_uri* uri, grpc_resolved_address* dst)) {
+ ServerAddressList addresses;
+ if (!ParseUri(args.uri, parse, &addresses)) return nullptr;
// Instantiate resolver.
return OrphanablePtr<Resolver>(
New<SockaddrResolver>(std::move(addresses), std::move(args)));
@@ -119,6 +125,10 @@
class IPv4ResolverFactory : public ResolverFactory {
public:
+ bool IsValidUri(const grpc_uri* uri) const override {
+ return ParseUri(uri, grpc_parse_ipv4, nullptr);
+ }
+
OrphanablePtr<Resolver> CreateResolver(ResolverArgs args) const override {
return CreateSockaddrResolver(std::move(args), grpc_parse_ipv4);
}
@@ -128,6 +138,10 @@
class IPv6ResolverFactory : public ResolverFactory {
public:
+ bool IsValidUri(const grpc_uri* uri) const override {
+ return ParseUri(uri, grpc_parse_ipv6, nullptr);
+ }
+
OrphanablePtr<Resolver> CreateResolver(ResolverArgs args) const override {
return CreateSockaddrResolver(std::move(args), grpc_parse_ipv6);
}
@@ -138,6 +152,10 @@
#ifdef GRPC_HAVE_UNIX_SOCKET
class UnixResolverFactory : public ResolverFactory {
public:
+ bool IsValidUri(const grpc_uri* uri) const override {
+ return ParseUri(uri, grpc_parse_unix, nullptr);
+ }
+
OrphanablePtr<Resolver> CreateResolver(ResolverArgs args) const override {
return CreateSockaddrResolver(std::move(args), grpc_parse_unix);
}
diff --git a/src/core/ext/filters/client_channel/resolver_factory.h b/src/core/ext/filters/client_channel/resolver_factory.h
index 273fd8d..7fed48b 100644
--- a/src/core/ext/filters/client_channel/resolver_factory.h
+++ b/src/core/ext/filters/client_channel/resolver_factory.h
@@ -47,6 +47,10 @@
class ResolverFactory {
public:
+ /// Returns a bool indicating whether the input uri is valid to create a
+ /// resolver.
+ virtual bool IsValidUri(const grpc_uri* uri) const GRPC_ABSTRACT;
+
/// Returns a new resolver instance.
virtual OrphanablePtr<Resolver> CreateResolver(ResolverArgs args) const
GRPC_ABSTRACT;
diff --git a/src/core/ext/filters/client_channel/resolver_registry.cc b/src/core/ext/filters/client_channel/resolver_registry.cc
index 5b00eab..509c4ef 100644
--- a/src/core/ext/filters/client_channel/resolver_registry.cc
+++ b/src/core/ext/filters/client_channel/resolver_registry.cc
@@ -132,6 +132,17 @@
return g_state->LookupResolverFactory(scheme);
}
+bool ResolverRegistry::IsValidTarget(const char* target) {
+ grpc_uri* uri = nullptr;
+ char* canonical_target = nullptr;
+ ResolverFactory* factory =
+ g_state->FindResolverFactory(target, &uri, &canonical_target);
+ bool result = factory == nullptr ? false : factory->IsValidUri(uri);
+ grpc_uri_destroy(uri);
+ gpr_free(canonical_target);
+ return result;
+}
+
OrphanablePtr<Resolver> ResolverRegistry::CreateResolver(
const char* target, const grpc_channel_args* args,
grpc_pollset_set* pollset_set, grpc_combiner* combiner,
diff --git a/src/core/ext/filters/client_channel/resolver_registry.h b/src/core/ext/filters/client_channel/resolver_registry.h
index 0eec678..4248a06 100644
--- a/src/core/ext/filters/client_channel/resolver_registry.h
+++ b/src/core/ext/filters/client_channel/resolver_registry.h
@@ -50,6 +50,9 @@
static void RegisterResolverFactory(UniquePtr<ResolverFactory> factory);
};
+ /// Checks whether the user input \a target is valid to create a resolver.
+ static bool IsValidTarget(const char* target);
+
/// Creates a resolver given \a target.
/// First tries to parse \a target as a URI. If this succeeds, tries
/// to locate a registered resolver factory based on the URI scheme.
diff --git a/src/core/lib/security/credentials/oauth2/oauth2_credentials.cc b/src/core/lib/security/credentials/oauth2/oauth2_credentials.cc
index 06fa8bb..7a58483 100644
--- a/src/core/lib/security/credentials/oauth2/oauth2_credentials.cc
+++ b/src/core/lib/security/credentials/oauth2/oauth2_credentials.cc
@@ -18,6 +18,7 @@
#include <grpc/support/port_platform.h>
+#include "src/core/lib/json/json.h"
#include "src/core/lib/security/credentials/oauth2/oauth2_credentials.h"
#include <string.h>
@@ -641,8 +642,8 @@
*sts_url_out = nullptr;
InlinedVector<grpc_error*, 3> error_list;
UniquePtr<grpc_uri, GrpcUriDeleter> sts_url(
- options->sts_endpoint_url != nullptr
- ? grpc_uri_parse(options->sts_endpoint_url, false)
+ options->token_exchange_service_uri != nullptr
+ ? grpc_uri_parse(options->token_exchange_service_uri, false)
: nullptr);
if (sts_url == nullptr) {
error_list.push_back(GRPC_ERROR_CREATE_FROM_STATIC_STRING(
diff --git a/src/core/lib/slice/slice_buffer.cc b/src/core/lib/slice/slice_buffer.cc
index 2b69f57..7eedad3 100644
--- a/src/core/lib/slice/slice_buffer.cc
+++ b/src/core/lib/slice/slice_buffer.cc
@@ -262,9 +262,9 @@
src->length = 0;
}
+template <bool incref>
static void slice_buffer_move_first_maybe_ref(grpc_slice_buffer* src, size_t n,
- grpc_slice_buffer* dst,
- bool incref) {
+ grpc_slice_buffer* dst) {
GPR_ASSERT(src->length >= n);
if (src->length == n) {
grpc_slice_buffer_move_into(src, dst);
@@ -304,12 +304,12 @@
void grpc_slice_buffer_move_first(grpc_slice_buffer* src, size_t n,
grpc_slice_buffer* dst) {
- slice_buffer_move_first_maybe_ref(src, n, dst, true);
+ slice_buffer_move_first_maybe_ref<true>(src, n, dst);
}
void grpc_slice_buffer_move_first_no_ref(grpc_slice_buffer* src, size_t n,
grpc_slice_buffer* dst) {
- slice_buffer_move_first_maybe_ref(src, n, dst, false);
+ slice_buffer_move_first_maybe_ref<false>(src, n, dst);
}
void grpc_slice_buffer_move_first_into_buffer(grpc_slice_buffer* src, size_t n,
diff --git a/src/cpp/client/secure_credentials.cc b/src/cpp/client/secure_credentials.cc
index d73b3e0..5de5a76 100644
--- a/src/cpp/client/secure_credentials.cc
+++ b/src/cpp/client/secure_credentials.cc
@@ -17,13 +17,23 @@
*/
#include "src/cpp/client/secure_credentials.h"
+
+#include <grpc/impl/codegen/slice.h>
+#include <grpc/slice.h>
#include <grpc/support/log.h>
#include <grpc/support/string_util.h>
#include <grpcpp/channel.h>
+#include <grpcpp/impl/codegen/status_code_enum.h>
#include <grpcpp/impl/grpc_library.h>
#include <grpcpp/support/channel_arguments.h>
+
+#include "src/core/lib/gpr/env.h"
+#include "src/core/lib/iomgr/error.h"
#include "src/core/lib/iomgr/executor.h"
+#include "src/core/lib/iomgr/load_file.h"
+#include "src/core/lib/json/json.h"
#include "src/core/lib/security/transport/auth_filters.h"
+#include "src/core/lib/security/util/json_util.h"
#include "src/cpp/client/create_channel_internal.h"
#include "src/cpp/common/secure_auth_context.h"
@@ -105,6 +115,144 @@
namespace experimental {
+namespace {
+
+void ClearStsCredentialsOptions(StsCredentialsOptions* options) {
+ if (options == nullptr) return;
+ options->token_exchange_service_uri.clear();
+ options->resource.clear();
+ options->audience.clear();
+ options->scope.clear();
+ options->requested_token_type.clear();
+ options->subject_token_path.clear();
+ options->subject_token_type.clear();
+ options->actor_token_path.clear();
+ options->actor_token_type.clear();
+}
+
+} // namespace
+
+// Builds STS credentials options from JSON.
+grpc::Status StsCredentialsOptionsFromJson(const grpc::string& json_string,
+ StsCredentialsOptions* options) {
+ struct GrpcJsonDeleter {
+ void operator()(grpc_json* json) { grpc_json_destroy(json); }
+ };
+ if (options == nullptr) {
+ return grpc::Status(grpc::INVALID_ARGUMENT, "options cannot be nullptr.");
+ }
+ ClearStsCredentialsOptions(options);
+ std::vector<char> scratchpad(json_string.c_str(),
+ json_string.c_str() + json_string.size() + 1);
+ std::unique_ptr<grpc_json, GrpcJsonDeleter> json(
+ grpc_json_parse_string(&scratchpad[0]));
+ if (json == nullptr) {
+ return grpc::Status(grpc::INVALID_ARGUMENT, "Invalid json.");
+ }
+
+ // Required fields.
+ const char* value = grpc_json_get_string_property(
+ json.get(), "token_exchange_service_uri", nullptr);
+ if (value == nullptr) {
+ ClearStsCredentialsOptions(options);
+ return grpc::Status(grpc::INVALID_ARGUMENT,
+ "token_exchange_service_uri must be specified.");
+ }
+ options->token_exchange_service_uri.assign(value);
+ value =
+ grpc_json_get_string_property(json.get(), "subject_token_path", nullptr);
+ if (value == nullptr) {
+ ClearStsCredentialsOptions(options);
+ return grpc::Status(grpc::INVALID_ARGUMENT,
+ "subject_token_path must be specified.");
+ }
+ options->subject_token_path.assign(value);
+ value =
+ grpc_json_get_string_property(json.get(), "subject_token_type", nullptr);
+ if (value == nullptr) {
+ ClearStsCredentialsOptions(options);
+ return grpc::Status(grpc::INVALID_ARGUMENT,
+ "subject_token_type must be specified.");
+ }
+ options->subject_token_type.assign(value);
+
+ // Optional fields.
+ value = grpc_json_get_string_property(json.get(), "resource", nullptr);
+ if (value != nullptr) options->resource.assign(value);
+ value = grpc_json_get_string_property(json.get(), "audience", nullptr);
+ if (value != nullptr) options->audience.assign(value);
+ value = grpc_json_get_string_property(json.get(), "scope", nullptr);
+ if (value != nullptr) options->scope.assign(value);
+ value = grpc_json_get_string_property(json.get(), "requested_token_type",
+ nullptr);
+ if (value != nullptr) options->requested_token_type.assign(value);
+ value =
+ grpc_json_get_string_property(json.get(), "actor_token_path", nullptr);
+ if (value != nullptr) options->actor_token_path.assign(value);
+ value =
+ grpc_json_get_string_property(json.get(), "actor_token_type", nullptr);
+ if (value != nullptr) options->actor_token_type.assign(value);
+
+ return grpc::Status();
+}
+
+// Builds STS credentials options from the $STS_CREDENTIALS env var.
+grpc::Status StsCredentialsOptionsFromEnv(StsCredentialsOptions* options) {
+ if (options == nullptr) {
+ return grpc::Status(grpc::INVALID_ARGUMENT, "options cannot be nullptr.");
+ }
+ ClearStsCredentialsOptions(options);
+ grpc_slice json_string = grpc_empty_slice();
+ char* sts_creds_path = gpr_getenv("STS_CREDENTIALS");
+ grpc_error* error = GRPC_ERROR_NONE;
+ grpc::Status status;
+ auto cleanup = [&json_string, &sts_creds_path, &error, &status]() {
+ grpc_slice_unref_internal(json_string);
+ gpr_free(sts_creds_path);
+ GRPC_ERROR_UNREF(error);
+ return status;
+ };
+
+ if (sts_creds_path == nullptr) {
+ status = grpc::Status(grpc::NOT_FOUND,
+ "STS_CREDENTIALS environment variable not set.");
+ return cleanup();
+ }
+ error = grpc_load_file(sts_creds_path, 1, &json_string);
+ if (error != GRPC_ERROR_NONE) {
+ status = grpc::Status(grpc::NOT_FOUND, grpc_error_string(error));
+ return cleanup();
+ }
+ status = StsCredentialsOptionsFromJson(
+ reinterpret_cast<const char*>(GRPC_SLICE_START_PTR(json_string)),
+ options);
+ return cleanup();
+}
+
+// C++ to Core STS Credentials options.
+grpc_sts_credentials_options StsCredentialsCppToCoreOptions(
+ const StsCredentialsOptions& options) {
+ grpc_sts_credentials_options opts;
+ memset(&opts, 0, sizeof(opts));
+ opts.token_exchange_service_uri = options.token_exchange_service_uri.c_str();
+ opts.resource = options.resource.c_str();
+ opts.audience = options.audience.c_str();
+ opts.scope = options.scope.c_str();
+ opts.requested_token_type = options.requested_token_type.c_str();
+ opts.subject_token_path = options.subject_token_path.c_str();
+ opts.subject_token_type = options.subject_token_type.c_str();
+ opts.actor_token_path = options.actor_token_path.c_str();
+ opts.actor_token_type = options.actor_token_type.c_str();
+ return opts;
+}
+
+// Builds STS credentials.
+std::shared_ptr<CallCredentials> StsCredentials(
+ const StsCredentialsOptions& options) {
+ auto opts = StsCredentialsCppToCoreOptions(options);
+ return WrapCallCredentials(grpc_sts_credentials_create(&opts, nullptr));
+}
+
// Builds ALTS Credentials given ALTS specific options
std::shared_ptr<ChannelCredentials> AltsCredentials(
const AltsCredentialsOptions& options) {
diff --git a/src/cpp/client/secure_credentials.h b/src/cpp/client/secure_credentials.h
index dd379ca..ed14df4 100644
--- a/src/cpp/client/secure_credentials.h
+++ b/src/cpp/client/secure_credentials.h
@@ -22,6 +22,7 @@
#include <grpc/grpc_security.h>
#include <grpcpp/security/credentials.h>
+#include <grpcpp/security/credentials_impl.h>
#include <grpcpp/support/config.h>
#include "src/core/lib/security/credentials/credentials.h"
@@ -68,6 +69,16 @@
grpc_call_credentials* const c_creds_;
};
+namespace experimental {
+
+// Transforms C++ STS credentials options into core options.
+//
+// The string pointers in the returned core options are borrowed from
+// `options` (they are not copies), so `options` must be kept alive and
+// unmodified until the core credentials have been created from the result.
+grpc_sts_credentials_options StsCredentialsCppToCoreOptions(
+ const StsCredentialsOptions& options);
+
+} // namespace experimental
+
} // namespace grpc_impl
namespace grpc {
diff --git a/src/python/grpcio/grpc/_channel.py b/src/python/grpcio/grpc/_channel.py
index 0bf8e03..24f928e 100644
--- a/src/python/grpcio/grpc/_channel.py
+++ b/src/python/grpcio/grpc/_channel.py
@@ -13,7 +13,6 @@
# limitations under the License.
"""Invocation-side implementation of gRPC Python."""
-import functools
import logging
import sys
import threading
@@ -82,6 +81,17 @@
unknown_cygrpc_code, details)
+def _wait_once_until(condition, until):
+ if until is None:
+ condition.wait()
+ else:
+ remaining = until - time.time()
+ if remaining < 0:
+ raise grpc.FutureTimeoutError()
+ else:
+ condition.wait(timeout=remaining)
+
+
class _RPCState(object):
def __init__(self, due, initial_metadata, trailing_metadata, code, details):
@@ -168,11 +178,12 @@
#pylint: disable=too-many-statements
def _consume_request_iterator(request_iterator, state, call, request_serializer,
event_handler):
- """Consume a request iterator supplied by the user."""
+ if cygrpc.is_fork_support_enabled():
+ condition_wait_timeout = 1.0
+ else:
+ condition_wait_timeout = None
def consume_request_iterator(): # pylint: disable=too-many-branches
- # Iterate over the request iterator until it is exhausted or an error
- # condition is encountered.
while True:
return_from_user_request_generator_invoked = False
try:
@@ -213,19 +224,14 @@
state.due.add(cygrpc.OperationType.send_message)
else:
return
-
- def _done():
- return (state.code is not None or
- cygrpc.OperationType.send_message not in
- state.due)
-
- _common.wait(
- state.condition.wait,
- _done,
- spin_cb=functools.partial(
- cygrpc.block_if_fork_in_progress, state))
- if state.code is not None:
- return
+ while True:
+ state.condition.wait(condition_wait_timeout)
+ cygrpc.block_if_fork_in_progress(state)
+ if state.code is None:
+ if cygrpc.OperationType.send_message not in state.due:
+ break
+ else:
+ return
else:
return
with state.condition:
@@ -275,21 +281,13 @@
with self._state.condition:
return self._state.code is not None
- def _is_complete(self):
- return self._state.code is not None
-
def result(self, timeout=None):
- """Returns the result of the computation or raises its exception.
-
- See grpc.Future.result for the full API contract.
- """
+ until = None if timeout is None else time.time() + timeout
with self._state.condition:
- timed_out = _common.wait(
- self._state.condition.wait, self._is_complete, timeout=timeout)
- if timed_out:
- raise grpc.FutureTimeoutError()
- else:
- if self._state.code is grpc.StatusCode.OK:
+ while True:
+ if self._state.code is None:
+ _wait_once_until(self._state.condition, until)
+ elif self._state.code is grpc.StatusCode.OK:
return self._state.response
elif self._state.cancelled:
raise grpc.FutureCancelledError()
@@ -297,17 +295,12 @@
raise self
def exception(self, timeout=None):
- """Return the exception raised by the computation.
-
- See grpc.Future.exception for the full API contract.
- """
+ until = None if timeout is None else time.time() + timeout
with self._state.condition:
- timed_out = _common.wait(
- self._state.condition.wait, self._is_complete, timeout=timeout)
- if timed_out:
- raise grpc.FutureTimeoutError()
- else:
- if self._state.code is grpc.StatusCode.OK:
+ while True:
+ if self._state.code is None:
+ _wait_once_until(self._state.condition, until)
+ elif self._state.code is grpc.StatusCode.OK:
return None
elif self._state.cancelled:
raise grpc.FutureCancelledError()
@@ -315,17 +308,12 @@
return self
def traceback(self, timeout=None):
- """Access the traceback of the exception raised by the computation.
-
- See grpc.future.traceback for the full API contract.
- """
+ until = None if timeout is None else time.time() + timeout
with self._state.condition:
- timed_out = _common.wait(
- self._state.condition.wait, self._is_complete, timeout=timeout)
- if timed_out:
- raise grpc.FutureTimeoutError()
- else:
- if self._state.code is grpc.StatusCode.OK:
+ while True:
+ if self._state.code is None:
+ _wait_once_until(self._state.condition, until)
+ elif self._state.code is grpc.StatusCode.OK:
return None
elif self._state.cancelled:
raise grpc.FutureCancelledError()
@@ -357,23 +345,17 @@
raise StopIteration()
else:
raise self
-
- def _response_ready():
- return (
- self._state.response is not None or
- (cygrpc.OperationType.receive_message not in self._state.due
- and self._state.code is not None))
-
- _common.wait(self._state.condition.wait, _response_ready)
- if self._state.response is not None:
- response = self._state.response
- self._state.response = None
- return response
- elif cygrpc.OperationType.receive_message not in self._state.due:
- if self._state.code is grpc.StatusCode.OK:
- raise StopIteration()
- elif self._state.code is not None:
- raise self
+ while True:
+ self._state.condition.wait()
+ if self._state.response is not None:
+ response = self._state.response
+ self._state.response = None
+ return response
+ elif cygrpc.OperationType.receive_message not in self._state.due:
+ if self._state.code is grpc.StatusCode.OK:
+ raise StopIteration()
+ elif self._state.code is not None:
+ raise self
def __iter__(self):
return self
@@ -404,47 +386,32 @@
def initial_metadata(self):
with self._state.condition:
-
- def _done():
- return self._state.initial_metadata is not None
-
- _common.wait(self._state.condition.wait, _done)
+ while self._state.initial_metadata is None:
+ self._state.condition.wait()
return self._state.initial_metadata
def trailing_metadata(self):
with self._state.condition:
-
- def _done():
- return self._state.trailing_metadata is not None
-
- _common.wait(self._state.condition.wait, _done)
+ while self._state.trailing_metadata is None:
+ self._state.condition.wait()
return self._state.trailing_metadata
def code(self):
with self._state.condition:
-
- def _done():
- return self._state.code is not None
-
- _common.wait(self._state.condition.wait, _done)
+ while self._state.code is None:
+ self._state.condition.wait()
return self._state.code
def details(self):
with self._state.condition:
-
- def _done():
- return self._state.details is not None
-
- _common.wait(self._state.condition.wait, _done)
+ while self._state.details is None:
+ self._state.condition.wait()
return _common.decode(self._state.details)
def debug_error_string(self):
with self._state.condition:
-
- def _done():
- return self._state.debug_error_string is not None
-
- _common.wait(self._state.condition.wait, _done)
+ while self._state.debug_error_string is None:
+ self._state.condition.wait()
return _common.decode(self._state.debug_error_string)
def _repr(self):
diff --git a/src/python/grpcio/grpc/_common.py b/src/python/grpcio/grpc/_common.py
index b4b2473..f69127e 100644
--- a/src/python/grpcio/grpc/_common.py
+++ b/src/python/grpcio/grpc/_common.py
@@ -15,7 +15,6 @@
import logging
-import time
import six
import grpc
@@ -61,8 +60,6 @@
CYGRPC_STATUS_CODE_TO_STATUS_CODE)
}
-MAXIMUM_WAIT_TIMEOUT = 0.1
-
def encode(s):
if isinstance(s, bytes):
@@ -99,50 +96,3 @@
def fully_qualified_method(group, method):
return '/{}/{}'.format(group, method)
-
-
-def _wait_once(wait_fn, timeout, spin_cb):
- wait_fn(timeout=timeout)
- if spin_cb is not None:
- spin_cb()
-
-
-def wait(wait_fn, wait_complete_fn, timeout=None, spin_cb=None):
- """Blocks waiting for an event without blocking the thread indefinitely.
-
- See https://github.com/grpc/grpc/issues/19464 for full context. CPython's
- `threading.Event.wait` and `threading.Condition.wait` methods, if invoked
- without a timeout kwarg, may block the calling thread indefinitely. If the
- call is made from the main thread, this means that signal handlers may not
- run for an arbitrarily long period of time.
-
- This wrapper calls the supplied wait function with an arbitrary short
- timeout to ensure that no signal handler has to wait longer than
- MAXIMUM_WAIT_TIMEOUT before executing.
-
- Args:
- wait_fn: A callable acceptable a single float-valued kwarg named
- `timeout`. This function is expected to be one of `threading.Event.wait`
- or `threading.Condition.wait`.
- wait_complete_fn: A callable taking no arguments and returning a bool.
- When this function returns true, it indicates that waiting should cease.
- timeout: An optional float-valued number of seconds after which the wait
- should cease.
- spin_cb: An optional Callable taking no arguments and returning nothing.
- This callback will be called on each iteration of the spin. This may be
- used for, e.g. work related to forking.
-
- Returns:
- True if a timeout was supplied and it was reached. False otherwise.
- """
- if timeout is None:
- while not wait_complete_fn():
- _wait_once(wait_fn, MAXIMUM_WAIT_TIMEOUT, spin_cb)
- else:
- end = time.time() + timeout
- while not wait_complete_fn():
- remaining = min(end - time.time(), MAXIMUM_WAIT_TIMEOUT)
- if remaining < 0:
- return True
- _wait_once(wait_fn, remaining, spin_cb)
- return False
diff --git a/src/python/grpcio_tests/commands.py b/src/python/grpcio_tests/commands.py
index 166cea1..dc0795d 100644
--- a/src/python/grpcio_tests/commands.py
+++ b/src/python/grpcio_tests/commands.py
@@ -145,8 +145,6 @@
'unit._exit_test.ExitTest.test_in_flight_partial_unary_stream_call',
'unit._exit_test.ExitTest.test_in_flight_partial_stream_unary_call',
'unit._exit_test.ExitTest.test_in_flight_partial_stream_stream_call',
- # TODO(https://github.com/grpc/grpc/issues/18980): Reenable.
- 'unit._signal_handling_test.SignalHandlingTest',
'unit._metadata_flags_test',
'health_check._health_servicer_test.HealthServicerTest.test_cancelled_watch_removed_from_watch_list',
# TODO(https://github.com/grpc/grpc/issues/17330) enable these three tests
diff --git a/src/python/grpcio_tests/tests/tests.json b/src/python/grpcio_tests/tests/tests.json
index 16ba484..cc08d56 100644
--- a/src/python/grpcio_tests/tests/tests.json
+++ b/src/python/grpcio_tests/tests/tests.json
@@ -67,7 +67,6 @@
"unit._server_ssl_cert_config_test.ServerSSLCertReloadTestWithoutClientAuth",
"unit._server_test.ServerTest",
"unit._session_cache_test.SSLSessionCacheTest",
- "unit._signal_handling_test.SignalHandlingTest",
"unit._version_test.VersionTest",
"unit.beta._beta_features_test.BetaFeaturesTest",
"unit.beta._beta_features_test.ContextManagementAndLifecycleTest",
diff --git a/src/python/grpcio_tests/tests/unit/BUILD.bazel b/src/python/grpcio_tests/tests/unit/BUILD.bazel
index d21f5a5..a161794 100644
--- a/src/python/grpcio_tests/tests/unit/BUILD.bazel
+++ b/src/python/grpcio_tests/tests/unit/BUILD.bazel
@@ -16,7 +16,6 @@
"_credentials_test.py",
"_dns_resolver_test.py",
"_empty_message_test.py",
- "_error_message_encoding_test.py",
"_exit_test.py",
"_interceptor_test.py",
"_invalid_metadata_test.py",
@@ -28,7 +27,6 @@
# "_reconnect_test.py",
"_resource_exhausted_test.py",
"_rpc_test.py",
- "_signal_handling_test.py",
# TODO(ghostwriternr): To be added later.
# "_server_ssl_cert_config_test.py",
"_server_test.py",
@@ -42,11 +40,6 @@
)
py_library(
- name = "_signal_client",
- srcs = ["_signal_client.py"],
-)
-
-py_library(
name = "resources",
srcs = ["resources.py"],
data=[
@@ -94,7 +87,6 @@
":_server_shutdown_scenarios",
":_from_grpc_import_star",
":_tcp_proxy",
- ":_signal_client",
"//src/python/grpcio_tests/tests/unit/framework/common",
"//src/python/grpcio_tests/tests/testing",
requirement('six'),
diff --git a/src/python/grpcio_tests/tests/unit/_signal_client.py b/src/python/grpcio_tests/tests/unit/_signal_client.py
deleted file mode 100644
index 65ddd6d..0000000
--- a/src/python/grpcio_tests/tests/unit/_signal_client.py
+++ /dev/null
@@ -1,84 +0,0 @@
-# Copyright 2019 the gRPC authors.
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-"""Client for testing responsiveness to signals."""
-
-from __future__ import print_function
-
-import argparse
-import functools
-import logging
-import signal
-import sys
-
-import grpc
-
-SIGTERM_MESSAGE = "Handling sigterm!"
-
-UNARY_UNARY = "/test/Unary"
-UNARY_STREAM = "/test/ServerStreaming"
-
-_MESSAGE = b'\x00\x00\x00'
-
-_ASSERTION_MESSAGE = "Control flow should never reach here."
-
-# NOTE(gnossen): We use a global variable here so that the signal handler can be
-# installed before the RPC begins. If we do not do this, then we may receive the
-# SIGINT before the signal handler is installed. I'm not happy with per-process
-# global state, but the per-process global state that is signal handlers
-# somewhat forces my hand.
-per_process_rpc_future = None
-
-
-def handle_sigint(unused_signum, unused_frame):
- print(SIGTERM_MESSAGE)
- if per_process_rpc_future is not None:
- per_process_rpc_future.cancel()
- sys.stderr.flush()
- sys.exit(0)
-
-
-def main_unary(server_target):
- """Initiate a unary RPC to be interrupted by a SIGINT."""
- global per_process_rpc_future # pylint: disable=global-statement
- with grpc.insecure_channel(server_target) as channel:
- multicallable = channel.unary_unary(UNARY_UNARY)
- signal.signal(signal.SIGINT, handle_sigint)
- per_process_rpc_future = multicallable.future(
- _MESSAGE, wait_for_ready=True)
- result = per_process_rpc_future.result()
- assert False, _ASSERTION_MESSAGE
-
-
-def main_streaming(server_target):
- """Initiate a streaming RPC to be interrupted by a SIGINT."""
- global per_process_rpc_future # pylint: disable=global-statement
- with grpc.insecure_channel(server_target) as channel:
- signal.signal(signal.SIGINT, handle_sigint)
- per_process_rpc_future = channel.unary_stream(UNARY_STREAM)(
- _MESSAGE, wait_for_ready=True)
- for result in per_process_rpc_future:
- pass
- assert False, _ASSERTION_MESSAGE
-
-
-if __name__ == '__main__':
- parser = argparse.ArgumentParser(description='Signal test client.')
- parser.add_argument('server', help='Server target')
- parser.add_argument(
- 'arity', help='RPC arity', choices=('unary', 'streaming'))
- args = parser.parse_args()
- if args.arity == 'unary':
- main_unary(args.server)
- else:
- main_streaming(args.server)
diff --git a/src/python/grpcio_tests/tests/unit/_signal_handling_test.py b/src/python/grpcio_tests/tests/unit/_signal_handling_test.py
deleted file mode 100644
index 9d0d647..0000000
--- a/src/python/grpcio_tests/tests/unit/_signal_handling_test.py
+++ /dev/null
@@ -1,158 +0,0 @@
-# Copyright 2019 the gRPC authors.
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-"""Test of responsiveness to signals."""
-
-from __future__ import print_function
-
-import logging
-import os
-import signal
-import subprocess
-import tempfile
-import threading
-import unittest
-import sys
-
-import grpc
-
-from tests.unit import test_common
-from tests.unit import _signal_client
-
-_CLIENT_PATH = os.path.abspath(os.path.realpath(_signal_client.__file__))
-_HOST = 'localhost'
-
-
-class _GenericHandler(grpc.GenericRpcHandler):
-
- def __init__(self):
- self._connected_clients_lock = threading.RLock()
- self._connected_clients_event = threading.Event()
- self._connected_clients = 0
-
- self._unary_unary_handler = grpc.unary_unary_rpc_method_handler(
- self._handle_unary_unary)
- self._unary_stream_handler = grpc.unary_stream_rpc_method_handler(
- self._handle_unary_stream)
-
- def _on_client_connect(self):
- with self._connected_clients_lock:
- self._connected_clients += 1
- self._connected_clients_event.set()
-
- def _on_client_disconnect(self):
- with self._connected_clients_lock:
- self._connected_clients -= 1
- if self._connected_clients == 0:
- self._connected_clients_event.clear()
-
- def await_connected_client(self):
- """Blocks until a client connects to the server."""
- self._connected_clients_event.wait()
-
- def _handle_unary_unary(self, request, servicer_context):
- """Handles a unary RPC.
-
- Blocks until the client disconnects and then echoes.
- """
- stop_event = threading.Event()
-
- def on_rpc_end():
- self._on_client_disconnect()
- stop_event.set()
-
- servicer_context.add_callback(on_rpc_end)
- self._on_client_connect()
- stop_event.wait()
- return request
-
- def _handle_unary_stream(self, request, servicer_context):
- """Handles a server streaming RPC.
-
- Blocks until the client disconnects and then echoes.
- """
- stop_event = threading.Event()
-
- def on_rpc_end():
- self._on_client_disconnect()
- stop_event.set()
-
- servicer_context.add_callback(on_rpc_end)
- self._on_client_connect()
- stop_event.wait()
- yield request
-
- def service(self, handler_call_details):
- if handler_call_details.method == _signal_client.UNARY_UNARY:
- return self._unary_unary_handler
- elif handler_call_details.method == _signal_client.UNARY_STREAM:
- return self._unary_stream_handler
- else:
- return None
-
-
-def _read_stream(stream):
- stream.seek(0)
- return stream.read()
-
-
-class SignalHandlingTest(unittest.TestCase):
-
- def setUp(self):
- self._server = test_common.test_server()
- self._port = self._server.add_insecure_port('{}:0'.format(_HOST))
- self._handler = _GenericHandler()
- self._server.add_generic_rpc_handlers((self._handler,))
- self._server.start()
-
- def tearDown(self):
- self._server.stop(None)
-
- @unittest.skipIf(os.name == 'nt', 'SIGINT not supported on windows')
- def testUnary(self):
- """Tests that the server unary code path does not stall signal handlers."""
- server_target = '{}:{}'.format(_HOST, self._port)
- with tempfile.TemporaryFile(mode='r') as client_stdout:
- with tempfile.TemporaryFile(mode='r') as client_stderr:
- client = subprocess.Popen(
- (sys.executable, _CLIENT_PATH, server_target, 'unary'),
- stdout=client_stdout,
- stderr=client_stderr)
- self._handler.await_connected_client()
- client.send_signal(signal.SIGINT)
- self.assertFalse(client.wait(), msg=_read_stream(client_stderr))
- client_stdout.seek(0)
- self.assertIn(_signal_client.SIGTERM_MESSAGE,
- client_stdout.read())
-
- @unittest.skipIf(os.name == 'nt', 'SIGINT not supported on windows')
- def testStreaming(self):
- """Tests that the server streaming code path does not stall signal handlers."""
- server_target = '{}:{}'.format(_HOST, self._port)
- with tempfile.TemporaryFile(mode='r') as client_stdout:
- with tempfile.TemporaryFile(mode='r') as client_stderr:
- client = subprocess.Popen(
- (sys.executable, _CLIENT_PATH, server_target, 'streaming'),
- stdout=client_stdout,
- stderr=client_stderr)
- self._handler.await_connected_client()
- client.send_signal(signal.SIGINT)
- self.assertFalse(client.wait(), msg=_read_stream(client_stderr))
- client_stdout.seek(0)
- self.assertIn(_signal_client.SIGTERM_MESSAGE,
- client_stdout.read())
-
-
-if __name__ == '__main__':
- logging.basicConfig()
- unittest.main(verbosity=2)
diff --git a/test/core/security/BUILD b/test/core/security/BUILD
index d8dcdc2..835c0ad 100644
--- a/test/core/security/BUILD
+++ b/test/core/security/BUILD
@@ -171,6 +171,7 @@
":oauth2_utils",
"//:gpr",
"//:grpc",
+ "//:grpc++",
"//test/core/util:grpc_test_util",
],
)
diff --git a/test/core/security/fetch_oauth2.cc b/test/core/security/fetch_oauth2.cc
index d404368..1aa9997 100644
--- a/test/core/security/fetch_oauth2.cc
+++ b/test/core/security/fetch_oauth2.cc
@@ -26,53 +26,40 @@
#include <grpc/support/log.h>
#include <grpc/support/sync.h>
+#include "grpcpp/security/credentials_impl.h"
#include "src/core/lib/iomgr/error.h"
#include "src/core/lib/iomgr/load_file.h"
#include "src/core/lib/security/credentials/credentials.h"
#include "src/core/lib/security/util/json_util.h"
+#include "src/cpp/client/secure_credentials.h"
#include "test/core/security/oauth2_utils.h"
#include "test/core/util/cmdline.h"
-static grpc_sts_credentials_options sts_options_from_json(grpc_json* json) {
- grpc_sts_credentials_options options;
- memset(&options, 0, sizeof(options));
- grpc_error* error = GRPC_ERROR_NONE;
- options.sts_endpoint_url =
- grpc_json_get_string_property(json, "sts_endpoint_url", &error);
- GRPC_LOG_IF_ERROR("STS credentials parsing", error);
- options.resource = grpc_json_get_string_property(json, "resource", nullptr);
- options.audience = grpc_json_get_string_property(json, "audience", nullptr);
- options.scope = grpc_json_get_string_property(json, "scope", nullptr);
- options.requested_token_type =
- grpc_json_get_string_property(json, "requested_token_type", nullptr);
- options.subject_token_path =
- grpc_json_get_string_property(json, "subject_token_path", &error);
- GRPC_LOG_IF_ERROR("STS credentials parsing", error);
- options.subject_token_type =
- grpc_json_get_string_property(json, "subject_token_type", &error);
- GRPC_LOG_IF_ERROR("STS credentials parsing", error);
- options.actor_token_path =
- grpc_json_get_string_property(json, "actor_token_path", nullptr);
- options.actor_token_type =
- grpc_json_get_string_property(json, "actor_token_type", nullptr);
- return options;
-}
-
static grpc_call_credentials* create_sts_creds(const char* json_file_path) {
- grpc_slice sts_options_slice;
- GPR_ASSERT(GRPC_LOG_IF_ERROR(
- "load_file", grpc_load_file(json_file_path, 1, &sts_options_slice)));
- grpc_json* json = grpc_json_parse_string(
- reinterpret_cast<char*>(GRPC_SLICE_START_PTR(sts_options_slice)));
- if (json == nullptr) {
- gpr_log(GPR_ERROR, "Invalid json");
- return nullptr;
+ grpc_impl::experimental::StsCredentialsOptions options;
+ if (strlen(json_file_path) == 0) {
+ auto status =
+ grpc_impl::experimental::StsCredentialsOptionsFromEnv(&options);
+ if (!status.ok()) {
+ gpr_log(GPR_ERROR, "%s", status.error_message().c_str());
+ return nullptr;
+ }
+ } else {
+ grpc_slice sts_options_slice;
+ GPR_ASSERT(GRPC_LOG_IF_ERROR(
+ "load_file", grpc_load_file(json_file_path, 1, &sts_options_slice)));
+ auto status = grpc_impl::experimental::StsCredentialsOptionsFromJson(
+ reinterpret_cast<const char*>(GRPC_SLICE_START_PTR(sts_options_slice)),
+ &options);
+ gpr_slice_unref(sts_options_slice);
+ if (!status.ok()) {
+ gpr_log(GPR_ERROR, "%s", status.error_message().c_str());
+ return nullptr;
+ }
}
- grpc_sts_credentials_options options = sts_options_from_json(json);
- grpc_call_credentials* result =
- grpc_sts_credentials_create(&options, nullptr);
- grpc_json_destroy(json);
- gpr_slice_unref(sts_options_slice);
+ grpc_sts_credentials_options opts =
+ grpc_impl::experimental::StsCredentialsCppToCoreOptions(options);
+ grpc_call_credentials* result = grpc_sts_credentials_create(&opts, nullptr);
return result;
}
@@ -99,9 +86,12 @@
gpr_cmdline_add_string(cl, "json_refresh_token",
"File path of the json refresh token.",
&json_refresh_token_file_path);
- gpr_cmdline_add_string(cl, "json_sts_options",
- "File path of the json sts options.",
- &json_sts_options_file_path);
+ gpr_cmdline_add_string(
+ cl, "json_sts_options",
+ "File path of the json sts options. If the path is empty, the program "
+ "will attempt to use the $STS_CREDENTIALS environment variable to access "
+ "a file containing the options.",
+ &json_sts_options_file_path);
gpr_cmdline_add_flag(
cl, "gce",
"Get a token from the GCE metadata server (only works in GCE).",
diff --git a/test/cpp/client/credentials_test.cc b/test/cpp/client/credentials_test.cc
index e64e260..d560fb6 100644
--- a/test/cpp/client/credentials_test.cc
+++ b/test/cpp/client/credentials_test.cc
@@ -20,9 +20,14 @@
#include <memory>
+#include <gmock/gmock.h>
#include <grpc/grpc.h>
#include <gtest/gtest.h>
+#include "src/core/lib/gpr/env.h"
+#include "src/core/lib/gpr/tmpfile.h"
+#include "src/cpp/client/secure_credentials.h"
+
namespace grpc {
namespace testing {
@@ -39,6 +44,158 @@
auto creds = GoogleDefaultCredentials();
}
+// Checks that every field of the C++ options is carried over unchanged into
+// the core options struct.
+TEST_F(CredentialsTest, StsCredentialsOptionsCppToCore) {
+  grpc::experimental::StsCredentialsOptions cpp_options;
+  cpp_options.token_exchange_service_uri = "https://foo.com/exchange";
+  cpp_options.resource = "resource";
+  cpp_options.audience = "audience";
+  cpp_options.scope = "scope";
+  // cpp_options.requested_token_type explicitly not set.
+  cpp_options.subject_token_path = "/foo/bar";
+  cpp_options.subject_token_type = "nice_token_type";
+  cpp_options.actor_token_path = "/foo/baz";
+  cpp_options.actor_token_type = "even_nicer_token_type";
+
+  const grpc_sts_credentials_options core =
+      grpc_impl::experimental::StsCredentialsCppToCoreOptions(cpp_options);
+
+  // Each comparison is std::string == const char*, i.e. a comparison of the
+  // string contents rather than of pointer values.
+  EXPECT_EQ(cpp_options.token_exchange_service_uri,
+            core.token_exchange_service_uri);
+  EXPECT_EQ(cpp_options.resource, core.resource);
+  EXPECT_EQ(cpp_options.audience, core.audience);
+  EXPECT_EQ(cpp_options.scope, core.scope);
+  EXPECT_EQ(cpp_options.requested_token_type, core.requested_token_type);
+  EXPECT_EQ(cpp_options.subject_token_path, core.subject_token_path);
+  EXPECT_EQ(cpp_options.subject_token_type, core.subject_token_type);
+  EXPECT_EQ(cpp_options.actor_token_path, core.actor_token_path);
+  EXPECT_EQ(cpp_options.actor_token_type, core.actor_token_type);
+}
+
+// Exercises StsCredentialsOptionsFromJson with a complete document, a minimal
+// document, malformed input, and documents missing one required field each.
+// All cases share the same `options` object, so their order matters.
+TEST_F(CredentialsTest, StsCredentialsOptionsJson) {
+ // Case 1: every supported field is present and must be parsed verbatim.
+ const char valid_json[] = R"(
+ {
+ "token_exchange_service_uri": "https://foo/exchange",
+ "resource": "resource",
+ "audience": "audience",
+ "scope": "scope",
+ "requested_token_type": "requested_token_type",
+ "subject_token_path": "subject_token_path",
+ "subject_token_type": "subject_token_type",
+ "actor_token_path": "actor_token_path",
+ "actor_token_type": "actor_token_type"
+ })";
+ grpc::experimental::StsCredentialsOptions options;
+ EXPECT_TRUE(
+ grpc::experimental::StsCredentialsOptionsFromJson(valid_json, &options)
+ .ok());
+ EXPECT_EQ(options.token_exchange_service_uri, "https://foo/exchange");
+ EXPECT_EQ(options.resource, "resource");
+ EXPECT_EQ(options.audience, "audience");
+ EXPECT_EQ(options.scope, "scope");
+ EXPECT_EQ(options.requested_token_type, "requested_token_type");
+ EXPECT_EQ(options.subject_token_path, "subject_token_path");
+ EXPECT_EQ(options.subject_token_type, "subject_token_type");
+ EXPECT_EQ(options.actor_token_path, "actor_token_path");
+ EXPECT_EQ(options.actor_token_type, "actor_token_type");
+
+ // Case 2: only the three fields that the later cases show to be required.
+ // `options` still holds the values parsed in case 1, so the empty-string
+ // expectations below only hold if parsing overwrites the absent fields.
+ const char minimum_valid_json[] = R"(
+ {
+ "token_exchange_service_uri": "https://foo/exchange",
+ "subject_token_path": "subject_token_path",
+ "subject_token_type": "subject_token_type"
+ })";
+ EXPECT_TRUE(grpc::experimental::StsCredentialsOptionsFromJson(
+ minimum_valid_json, &options)
+ .ok());
+ EXPECT_EQ(options.token_exchange_service_uri, "https://foo/exchange");
+ EXPECT_EQ(options.resource, "");
+ EXPECT_EQ(options.audience, "");
+ EXPECT_EQ(options.scope, "");
+ EXPECT_EQ(options.requested_token_type, "");
+ EXPECT_EQ(options.subject_token_path, "subject_token_path");
+ EXPECT_EQ(options.subject_token_type, "subject_token_type");
+ EXPECT_EQ(options.actor_token_path, "");
+ EXPECT_EQ(options.actor_token_type, "");
+
+ // Case 3: input that is not JSON at all is rejected as INVALID_ARGUMENT.
+ const char invalid_json[] = R"(
+ I'm not a valid JSON.
+ )";
+ EXPECT_EQ(
+ grpc::INVALID_ARGUMENT,
+ grpc::experimental::StsCredentialsOptionsFromJson(invalid_json, &options)
+ .error_code());
+
+ // Case 4: subject_token_type missing; the error message must name it.
+ const char invalid_json_missing_subject_token_type[] = R"(
+ {
+ "token_exchange_service_uri": "https://foo/exchange",
+ "subject_token_path": "subject_token_path"
+ })";
+ auto status = grpc::experimental::StsCredentialsOptionsFromJson(
+ invalid_json_missing_subject_token_type, &options);
+ EXPECT_EQ(grpc::INVALID_ARGUMENT, status.error_code());
+ EXPECT_THAT(status.error_message(),
+ ::testing::HasSubstr("subject_token_type"));
+
+ // Case 5: subject_token_path missing; the error message must name it.
+ const char invalid_json_missing_subject_token_path[] = R"(
+ {
+ "token_exchange_service_uri": "https://foo/exchange",
+ "subject_token_type": "subject_token_type"
+ })";
+ status = grpc::experimental::StsCredentialsOptionsFromJson(
+ invalid_json_missing_subject_token_path, &options);
+ EXPECT_EQ(grpc::INVALID_ARGUMENT, status.error_code());
+ EXPECT_THAT(status.error_message(),
+ ::testing::HasSubstr("subject_token_path"));
+
+ // Case 6: token_exchange_service_uri missing; the error message must name
+ // it.
+ const char invalid_json_missing_token_exchange_uri[] = R"(
+ {
+ "subject_token_path": "subject_token_path",
+ "subject_token_type": "subject_token_type"
+ })";
+ status = grpc::experimental::StsCredentialsOptionsFromJson(
+ invalid_json_missing_token_exchange_uri, &options);
+ EXPECT_EQ(grpc::INVALID_ARGUMENT, status.error_code());
+ EXPECT_THAT(status.error_message(),
+ ::testing::HasSubstr("token_exchange_service_uri"));
+}
+
+// Checks StsCredentialsOptionsFromEnv: it must fail with NOT_FOUND when
+// $STS_CREDENTIALS is unset, and parse the referenced file when it is set.
+TEST_F(CredentialsTest, StsCredentialsOptionsFromEnv) {
+  // Unset env and check expected failure.
+  gpr_unsetenv("STS_CREDENTIALS");
+  grpc::experimental::StsCredentialsOptions options;
+  auto status = grpc::experimental::StsCredentialsOptionsFromEnv(&options);
+  EXPECT_EQ(grpc::NOT_FOUND, status.error_code());
+
+  // Set env and check for success.
+  const char valid_json[] = R"(
+ {
+ "token_exchange_service_uri": "https://foo/exchange",
+ "subject_token_path": "subject_token_path",
+ "subject_token_type": "subject_token_type"
+ })";
+  char* creds_file_name;
+  FILE* creds_file = gpr_tmpfile("sts_creds_options", &creds_file_name);
+  ASSERT_NE(creds_file_name, nullptr);
+  ASSERT_NE(creds_file, nullptr);
+  // sizeof() counts the terminating NUL of the literal, so it is written to
+  // the file as well.
+  ASSERT_EQ(sizeof(valid_json),
+            fwrite(valid_json, 1, sizeof(valid_json), creds_file));
+  fclose(creds_file);
+  gpr_setenv("STS_CREDENTIALS", creds_file_name);
+  status = grpc::experimental::StsCredentialsOptionsFromEnv(&options);
+  // The file has been read by now: delete it so the test does not leave a
+  // temporary file behind, and only then release its name.
+  remove(creds_file_name);
+  gpr_free(creds_file_name);
+  EXPECT_TRUE(status.ok());
+  EXPECT_EQ(options.token_exchange_service_uri, "https://foo/exchange");
+  EXPECT_EQ(options.resource, "");
+  EXPECT_EQ(options.audience, "");
+  EXPECT_EQ(options.scope, "");
+  EXPECT_EQ(options.requested_token_type, "");
+  EXPECT_EQ(options.subject_token_path, "subject_token_path");
+  EXPECT_EQ(options.subject_token_type, "subject_token_type");
+  EXPECT_EQ(options.actor_token_path, "");
+  EXPECT_EQ(options.actor_token_type, "");
+
+  // Cleanup.
+  gpr_unsetenv("STS_CREDENTIALS");
+}
+
} // namespace testing
} // namespace grpc
diff --git a/test/cpp/end2end/client_lb_end2end_test.cc b/test/cpp/end2end/client_lb_end2end_test.cc
index d499f13..8fc0eac 100644
--- a/test/cpp/end2end/client_lb_end2end_test.cc
+++ b/test/cpp/end2end/client_lb_end2end_test.cc
@@ -133,6 +133,59 @@
std::set<grpc::string> clients_;
};
+// Holds a ref-counted FakeResolverResponseGenerator and provides helpers that
+// hand it resolver results built from a list of local ports. Declaring the
+// move constructor makes the wrapper move-only.
+class FakeResolverResponseGeneratorWrapper {
+ public:
+  // Creates a fresh response generator owned by this wrapper.
+  FakeResolverResponseGeneratorWrapper()
+      : response_generator_(grpc_core::MakeRefCounted<
+                            grpc_core::FakeResolverResponseGenerator>()) {}
+
+  // Takes over other's generator. The member is initialized directly from
+  // the moved value instead of being default-constructed and then assigned.
+  FakeResolverResponseGeneratorWrapper(
+      FakeResolverResponseGeneratorWrapper&& other)
+      : response_generator_(std::move(other.response_generator_)) {}
+
+  // Passes a result with one address per entry of ports to the generator's
+  // SetResponse().
+  void SetNextResolution(const std::vector<int>& ports) {
+    grpc_core::ExecCtx exec_ctx;
+    response_generator_->SetResponse(BuildFakeResults(ports));
+  }
+
+  // Passes a result with one address per entry of ports to the generator's
+  // SetReresolutionResponse().
+  void SetNextResolutionUponError(const std::vector<int>& ports) {
+    grpc_core::ExecCtx exec_ctx;
+    response_generator_->SetReresolutionResponse(BuildFakeResults(ports));
+  }
+
+  // Forwards to the generator's SetFailureOnReresolution().
+  void SetFailureOnReresolution() {
+    grpc_core::ExecCtx exec_ctx;
+    response_generator_->SetFailureOnReresolution();
+  }
+
+  // Returns the raw generator pointer held by this wrapper.
+  grpc_core::FakeResolverResponseGenerator* Get() const {
+    return response_generator_.get();
+  }
+
+ private:
+  // Builds a resolver result holding the parsed form of
+  // "ipv4:127.0.0.1:<port>" for every port, in the order given. Asserts that
+  // each URI parses and converts to a resolved address.
+  static grpc_core::Resolver::Result BuildFakeResults(
+      const std::vector<int>& ports) {
+    grpc_core::Resolver::Result result;
+    for (const int port : ports) {
+      char* lb_uri_str;
+      gpr_asprintf(&lb_uri_str, "ipv4:127.0.0.1:%d", port);
+      grpc_uri* lb_uri = grpc_uri_parse(lb_uri_str, true);
+      GPR_ASSERT(lb_uri != nullptr);
+      grpc_resolved_address address;
+      GPR_ASSERT(grpc_parse_uri(lb_uri, &address));
+      result.addresses.emplace_back(address.addr, address.len,
+                                    nullptr /* args */);
+      // Both the parsed URI and its formatted string are heap allocated.
+      grpc_uri_destroy(lb_uri);
+      gpr_free(lb_uri_str);
+    }
+    return result;
+  }
+
+  grpc_core::RefCountedPtr<grpc_core::FakeResolverResponseGenerator>
+      response_generator_;
+};
+
+
class ClientLbEnd2endTest : public ::testing::Test {
protected:
ClientLbEnd2endTest()
@@ -147,11 +200,7 @@
GPR_GLOBAL_CONFIG_SET(grpc_client_channel_backup_poll_interval_ms, 1);
}
- void SetUp() override {
- grpc_init();
- response_generator_ =
- grpc_core::MakeRefCounted<grpc_core::FakeResolverResponseGenerator>();
- }
+ void SetUp() override { grpc_init(); }
void TearDown() override {
for (size_t i = 0; i < servers_.size(); ++i) {
@@ -186,38 +235,6 @@
}
}
- grpc_core::Resolver::Result BuildFakeResults(const std::vector<int>& ports) {
- grpc_core::Resolver::Result result;
- for (const int& port : ports) {
- char* lb_uri_str;
- gpr_asprintf(&lb_uri_str, "ipv4:127.0.0.1:%d", port);
- grpc_uri* lb_uri = grpc_uri_parse(lb_uri_str, true);
- GPR_ASSERT(lb_uri != nullptr);
- grpc_resolved_address address;
- GPR_ASSERT(grpc_parse_uri(lb_uri, &address));
- result.addresses.emplace_back(address.addr, address.len,
- nullptr /* args */);
- grpc_uri_destroy(lb_uri);
- gpr_free(lb_uri_str);
- }
- return result;
- }
-
- void SetNextResolution(const std::vector<int>& ports) {
- grpc_core::ExecCtx exec_ctx;
- response_generator_->SetResponse(BuildFakeResults(ports));
- }
-
- void SetNextResolutionUponError(const std::vector<int>& ports) {
- grpc_core::ExecCtx exec_ctx;
- response_generator_->SetReresolutionResponse(BuildFakeResults(ports));
- }
-
- void SetFailureOnReresolution() {
- grpc_core::ExecCtx exec_ctx;
- response_generator_->SetFailureOnReresolution();
- }
-
std::vector<int> GetServersPorts(size_t start_index = 0) {
std::vector<int> ports;
for (size_t i = start_index; i < servers_.size(); ++i) {
@@ -226,6 +243,10 @@
return ports;
}
+ FakeResolverResponseGeneratorWrapper BuildResolverResponseGenerator() {
+ return FakeResolverResponseGeneratorWrapper();
+ }
+
std::unique_ptr<grpc::testing::EchoTestService::Stub> BuildStub(
const std::shared_ptr<Channel>& channel) {
return grpc::testing::EchoTestService::NewStub(channel);
@@ -233,12 +254,13 @@
std::shared_ptr<Channel> BuildChannel(
const grpc::string& lb_policy_name,
+ const FakeResolverResponseGeneratorWrapper& response_generator,
ChannelArguments args = ChannelArguments()) {
if (lb_policy_name.size() > 0) {
args.SetLoadBalancingPolicyName(lb_policy_name);
} // else, default to pick first
args.SetPointer(GRPC_ARG_FAKE_RESOLVER_RESPONSE_GENERATOR,
- response_generator_.get());
+ response_generator.Get());
return ::grpc::CreateCustomChannel("fake:///", creds_, args);
}
@@ -401,8 +423,6 @@
const grpc::string server_host_;
std::unique_ptr<grpc::testing::EchoTestService::Stub> stub_;
std::vector<std::unique_ptr<ServerData>> servers_;
- grpc_core::RefCountedPtr<grpc_core::FakeResolverResponseGenerator>
- response_generator_;
const grpc::string kRequestMessage_;
std::shared_ptr<ChannelCredentials> creds_;
};
@@ -410,7 +430,8 @@
TEST_F(ClientLbEnd2endTest, ChannelStateConnectingWhenResolving) {
const int kNumServers = 3;
StartServers(kNumServers);
- auto channel = BuildChannel("");
+ auto response_generator = BuildResolverResponseGenerator();
+ auto channel = BuildChannel("", response_generator);
auto stub = BuildStub(channel);
// Initial state should be IDLE.
EXPECT_EQ(channel->GetState(false /* try_to_connect */), GRPC_CHANNEL_IDLE);
@@ -423,7 +444,7 @@
EXPECT_EQ(channel->GetState(false /* try_to_connect */),
GRPC_CHANNEL_CONNECTING);
// Return a resolver result, which allows the connection attempt to proceed.
- SetNextResolution(GetServersPorts());
+ response_generator.SetNextResolution(GetServersPorts());
// We should eventually transition into state READY.
EXPECT_TRUE(WaitForChannelReady(channel.get()));
}
@@ -432,9 +453,11 @@
// Start servers and send one RPC per server.
const int kNumServers = 3;
StartServers(kNumServers);
- auto channel = BuildChannel(""); // test that pick first is the default.
+ auto response_generator = BuildResolverResponseGenerator();
+ auto channel = BuildChannel(
+ "", response_generator); // test that pick first is the default.
auto stub = BuildStub(channel);
- SetNextResolution(GetServersPorts());
+ response_generator.SetNextResolution(GetServersPorts());
for (size_t i = 0; i < servers_.size(); ++i) {
CheckRpcSendOk(stub, DEBUG_LOCATION);
}
@@ -454,19 +477,22 @@
}
TEST_F(ClientLbEnd2endTest, PickFirstProcessPending) {
- StartServers(1); // Single server
- auto channel = BuildChannel(""); // test that pick first is the default.
+ StartServers(1); // Single server
+ auto response_generator = BuildResolverResponseGenerator();
+ auto channel = BuildChannel(
+ "", response_generator); // test that pick first is the default.
auto stub = BuildStub(channel);
- SetNextResolution({servers_[0]->port_});
+ response_generator.SetNextResolution({servers_[0]->port_});
WaitForServer(stub, 0, DEBUG_LOCATION);
// Create a new channel and its corresponding PF LB policy, which will pick
// the subchannels in READY state from the previous RPC against the same
// target (even if it happened over a different channel, because subchannels
// are globally reused). Progress should happen without any transition from
// this READY state.
- auto second_channel = BuildChannel("");
+ auto second_response_generator = BuildResolverResponseGenerator();
+ auto second_channel = BuildChannel("", second_response_generator);
auto second_stub = BuildStub(second_channel);
- SetNextResolution({servers_[0]->port_});
+ second_response_generator.SetNextResolution({servers_[0]->port_});
CheckRpcSendOk(second_stub, DEBUG_LOCATION);
}
@@ -479,16 +505,18 @@
grpc_pick_unused_port_or_die()};
CreateServers(2, ports);
StartServer(1);
- auto channel1 = BuildChannel("pick_first", args);
+ auto response_generator1 = BuildResolverResponseGenerator();
+ auto channel1 = BuildChannel("pick_first", response_generator1, args);
auto stub1 = BuildStub(channel1);
- SetNextResolution(ports);
+ response_generator1.SetNextResolution(ports);
// Wait for second server to be ready.
WaitForServer(stub1, 1, DEBUG_LOCATION);
// Create a second channel with the same addresses. Its PF instance
// should immediately pick the second subchannel, since it's already
// in READY state.
- auto channel2 = BuildChannel("pick_first", args);
- SetNextResolution(ports);
+ auto response_generator2 = BuildResolverResponseGenerator();
+ auto channel2 = BuildChannel("pick_first", response_generator2, args);
+ response_generator2.SetNextResolution(ports);
// Check that the channel reports READY without waiting for the
// initial backoff.
EXPECT_TRUE(WaitForChannelReady(channel2.get(), 1 /* timeout_seconds */));
@@ -500,9 +528,10 @@
args.SetInt(GRPC_ARG_INITIAL_RECONNECT_BACKOFF_MS, kInitialBackOffMs);
const std::vector<int> ports = {grpc_pick_unused_port_or_die()};
const gpr_timespec t0 = gpr_now(GPR_CLOCK_MONOTONIC);
- auto channel = BuildChannel("pick_first", args);
+ auto response_generator = BuildResolverResponseGenerator();
+ auto channel = BuildChannel("pick_first", response_generator, args);
auto stub = BuildStub(channel);
- SetNextResolution(ports);
+ response_generator.SetNextResolution(ports);
// The channel won't become connected (there's no server).
ASSERT_FALSE(channel->WaitForConnected(
grpc_timeout_milliseconds_to_deadline(kInitialBackOffMs * 2)));
@@ -529,9 +558,10 @@
constexpr int kMinReconnectBackOffMs = 1000;
args.SetInt(GRPC_ARG_MIN_RECONNECT_BACKOFF_MS, kMinReconnectBackOffMs);
const std::vector<int> ports = {grpc_pick_unused_port_or_die()};
- auto channel = BuildChannel("pick_first", args);
+ auto response_generator = BuildResolverResponseGenerator();
+ auto channel = BuildChannel("pick_first", response_generator, args);
auto stub = BuildStub(channel);
- SetNextResolution(ports);
+ response_generator.SetNextResolution(ports);
// Make connection delay a 10% longer than it's willing to in order to make
// sure we are hitting the codepath that waits for the min reconnect backoff.
gpr_atm_rel_store(&g_connection_delay_ms, kMinReconnectBackOffMs * 1.10);
@@ -554,9 +584,10 @@
constexpr int kInitialBackOffMs = 1000;
args.SetInt(GRPC_ARG_INITIAL_RECONNECT_BACKOFF_MS, kInitialBackOffMs);
const std::vector<int> ports = {grpc_pick_unused_port_or_die()};
- auto channel = BuildChannel("pick_first", args);
+ auto response_generator = BuildResolverResponseGenerator();
+ auto channel = BuildChannel("pick_first", response_generator, args);
auto stub = BuildStub(channel);
- SetNextResolution(ports);
+ response_generator.SetNextResolution(ports);
// The channel won't become connected (there's no server).
EXPECT_FALSE(
channel->WaitForConnected(grpc_timeout_milliseconds_to_deadline(10)));
@@ -585,9 +616,10 @@
constexpr int kInitialBackOffMs = 1000;
args.SetInt(GRPC_ARG_INITIAL_RECONNECT_BACKOFF_MS, kInitialBackOffMs);
const std::vector<int> ports = {grpc_pick_unused_port_or_die()};
- auto channel = BuildChannel("pick_first", args);
+ auto response_generator = BuildResolverResponseGenerator();
+ auto channel = BuildChannel("pick_first", response_generator, args);
auto stub = BuildStub(channel);
- SetNextResolution(ports);
+ response_generator.SetNextResolution(ports);
// Wait for connect, which should fail ~immediately, because the server
// is not up.
gpr_log(GPR_INFO, "=== INITIAL CONNECTION ATTEMPT");
@@ -628,21 +660,22 @@
// Start servers and send one RPC per server.
const int kNumServers = 3;
StartServers(kNumServers);
- auto channel = BuildChannel("pick_first");
+ auto response_generator = BuildResolverResponseGenerator();
+ auto channel = BuildChannel("pick_first", response_generator);
auto stub = BuildStub(channel);
std::vector<int> ports;
// Perform one RPC against the first server.
ports.emplace_back(servers_[0]->port_);
- SetNextResolution(ports);
+ response_generator.SetNextResolution(ports);
gpr_log(GPR_INFO, "****** SET [0] *******");
CheckRpcSendOk(stub, DEBUG_LOCATION);
EXPECT_EQ(servers_[0]->service_.request_count(), 1);
// An empty update will result in the channel going into TRANSIENT_FAILURE.
ports.clear();
- SetNextResolution(ports);
+ response_generator.SetNextResolution(ports);
gpr_log(GPR_INFO, "****** SET none *******");
grpc_connectivity_state channel_state;
do {
@@ -654,7 +687,7 @@
// Next update introduces servers_[1], making the channel recover.
ports.clear();
ports.emplace_back(servers_[1]->port_);
- SetNextResolution(ports);
+ response_generator.SetNextResolution(ports);
gpr_log(GPR_INFO, "****** SET [1] *******");
WaitForServer(stub, 1, DEBUG_LOCATION);
EXPECT_EQ(servers_[0]->service_.request_count(), 0);
@@ -662,7 +695,7 @@
// And again for servers_[2]
ports.clear();
ports.emplace_back(servers_[2]->port_);
- SetNextResolution(ports);
+ response_generator.SetNextResolution(ports);
gpr_log(GPR_INFO, "****** SET [2] *******");
WaitForServer(stub, 2, DEBUG_LOCATION);
EXPECT_EQ(servers_[0]->service_.request_count(), 0);
@@ -676,14 +709,15 @@
// Start servers and send one RPC per server.
const int kNumServers = 3;
StartServers(kNumServers);
- auto channel = BuildChannel("pick_first");
+ auto response_generator = BuildResolverResponseGenerator();
+ auto channel = BuildChannel("pick_first", response_generator);
auto stub = BuildStub(channel);
std::vector<int> ports;
// Perform one RPC against the first server.
ports.emplace_back(servers_[0]->port_);
- SetNextResolution(ports);
+ response_generator.SetNextResolution(ports);
gpr_log(GPR_INFO, "****** SET [0] *******");
CheckRpcSendOk(stub, DEBUG_LOCATION);
EXPECT_EQ(servers_[0]->service_.request_count(), 1);
@@ -693,7 +727,7 @@
ports.clear();
ports.emplace_back(servers_[1]->port_);
ports.emplace_back(servers_[0]->port_);
- SetNextResolution(ports);
+ response_generator.SetNextResolution(ports);
gpr_log(GPR_INFO, "****** SET superset *******");
CheckRpcSendOk(stub, DEBUG_LOCATION);
// We stick to the previously connected server.
@@ -710,12 +744,14 @@
StartServers(kNumServers);
std::vector<int> ports = GetServersPorts();
// Create two channels that (by default) use the global subchannel pool.
- auto channel1 = BuildChannel("pick_first");
+ auto response_generator1 = BuildResolverResponseGenerator();
+ auto channel1 = BuildChannel("pick_first", response_generator1);
auto stub1 = BuildStub(channel1);
- SetNextResolution(ports);
- auto channel2 = BuildChannel("pick_first");
+ response_generator1.SetNextResolution(ports);
+ auto response_generator2 = BuildResolverResponseGenerator();
+ auto channel2 = BuildChannel("pick_first", response_generator2);
auto stub2 = BuildStub(channel2);
- SetNextResolution(ports);
+ response_generator2.SetNextResolution(ports);
WaitForServer(stub1, 0, DEBUG_LOCATION);
// Send one RPC on each channel.
CheckRpcSendOk(stub1, DEBUG_LOCATION);
@@ -735,12 +771,14 @@
// Create two channels that use local subchannel pool.
ChannelArguments args;
args.SetInt(GRPC_ARG_USE_LOCAL_SUBCHANNEL_POOL, 1);
- auto channel1 = BuildChannel("pick_first", args);
+ auto response_generator1 = BuildResolverResponseGenerator();
+ auto channel1 = BuildChannel("pick_first", response_generator1, args);
auto stub1 = BuildStub(channel1);
- SetNextResolution(ports);
- auto channel2 = BuildChannel("pick_first", args);
+ response_generator1.SetNextResolution(ports);
+ auto response_generator2 = BuildResolverResponseGenerator();
+ auto channel2 = BuildChannel("pick_first", response_generator2, args);
auto stub2 = BuildStub(channel2);
- SetNextResolution(ports);
+ response_generator2.SetNextResolution(ports);
WaitForServer(stub1, 0, DEBUG_LOCATION);
// Send one RPC on each channel.
CheckRpcSendOk(stub1, DEBUG_LOCATION);
@@ -756,13 +794,14 @@
const int kNumUpdates = 1000;
const int kNumServers = 3;
StartServers(kNumServers);
- auto channel = BuildChannel("pick_first");
+ auto response_generator = BuildResolverResponseGenerator();
+ auto channel = BuildChannel("pick_first", response_generator);
auto stub = BuildStub(channel);
std::vector<int> ports = GetServersPorts();
for (size_t i = 0; i < kNumUpdates; ++i) {
std::shuffle(ports.begin(), ports.end(),
std::mt19937(std::random_device()()));
- SetNextResolution(ports);
+ response_generator.SetNextResolution(ports);
// We should re-enter core at the end of the loop to give the resolution
// setting closure a chance to run.
if ((i + 1) % 10 == 0) CheckRpcSendOk(stub, DEBUG_LOCATION);
@@ -784,16 +823,17 @@
dead_ports.emplace_back(grpc_pick_unused_port_or_die());
}
}
- auto channel = BuildChannel("pick_first");
+ auto response_generator = BuildResolverResponseGenerator();
+ auto channel = BuildChannel("pick_first", response_generator);
auto stub = BuildStub(channel);
// The initial resolution only contains dead ports. There won't be any
// selected subchannel. Re-resolution will return the same result.
- SetNextResolution(dead_ports);
+ response_generator.SetNextResolution(dead_ports);
gpr_log(GPR_INFO, "****** INITIAL RESOLUTION SET *******");
for (size_t i = 0; i < 10; ++i) CheckRpcSendFailure(stub);
// Set a re-resolution result that contains reachable ports, so that the
// pick_first LB policy can recover soon.
- SetNextResolutionUponError(alive_ports);
+ response_generator.SetNextResolutionUponError(alive_ports);
gpr_log(GPR_INFO, "****** RE-RESOLUTION SET *******");
WaitForServer(stub, 0, DEBUG_LOCATION, true /* ignore_failure */);
CheckRpcSendOk(stub, DEBUG_LOCATION);
@@ -805,9 +845,10 @@
TEST_F(ClientLbEnd2endTest, PickFirstReconnectWithoutNewResolverResult) {
std::vector<int> ports = {grpc_pick_unused_port_or_die()};
StartServers(1, ports);
- auto channel = BuildChannel("pick_first");
+ auto response_generator = BuildResolverResponseGenerator();
+ auto channel = BuildChannel("pick_first", response_generator);
auto stub = BuildStub(channel);
- SetNextResolution(ports);
+ response_generator.SetNextResolution(ports);
gpr_log(GPR_INFO, "****** INITIAL CONNECTION *******");
WaitForServer(stub, 0, DEBUG_LOCATION);
gpr_log(GPR_INFO, "****** STOPPING SERVER ******");
@@ -824,9 +865,10 @@
grpc_pick_unused_port_or_die()};
CreateServers(2, ports);
StartServer(1);
- auto channel = BuildChannel("pick_first");
+ auto response_generator = BuildResolverResponseGenerator();
+ auto channel = BuildChannel("pick_first", response_generator);
auto stub = BuildStub(channel);
- SetNextResolution(ports);
+ response_generator.SetNextResolution(ports);
gpr_log(GPR_INFO, "****** INITIAL CONNECTION *******");
WaitForServer(stub, 1, DEBUG_LOCATION);
gpr_log(GPR_INFO, "****** STOPPING SERVER ******");
@@ -840,9 +882,10 @@
TEST_F(ClientLbEnd2endTest, PickFirstCheckStateBeforeStartWatch) {
std::vector<int> ports = {grpc_pick_unused_port_or_die()};
StartServers(1, ports);
- auto channel_1 = BuildChannel("pick_first");
+ auto response_generator = BuildResolverResponseGenerator();
+ auto channel_1 = BuildChannel("pick_first", response_generator);
auto stub_1 = BuildStub(channel_1);
- SetNextResolution(ports);
+ response_generator.SetNextResolution(ports);
gpr_log(GPR_INFO, "****** RESOLUTION SET FOR CHANNEL 1 *******");
WaitForServer(stub_1, 0, DEBUG_LOCATION);
gpr_log(GPR_INFO, "****** CHANNEL 1 CONNECTED *******");
@@ -851,13 +894,10 @@
// create a new subchannel and hold a ref to it.
StartServers(1, ports);
gpr_log(GPR_INFO, "****** SERVER RESTARTED *******");
- auto channel_2 = BuildChannel("pick_first");
+ auto response_generator_2 = BuildResolverResponseGenerator();
+ auto channel_2 = BuildChannel("pick_first", response_generator_2);
auto stub_2 = BuildStub(channel_2);
- // TODO(juanlishen): This resolution result will only be visible to channel 2
- // since the response generator is only associated with channel 2 now. We
- // should change the response generator to be able to deliver updates to
- // multiple channels at once.
- SetNextResolution(ports);
+ response_generator_2.SetNextResolution(ports);
gpr_log(GPR_INFO, "****** RESOLUTION SET FOR CHANNEL 2 *******");
WaitForServer(stub_2, 0, DEBUG_LOCATION, true);
gpr_log(GPR_INFO, "****** CHANNEL 2 CONNECTED *******");
@@ -883,13 +923,15 @@
// Start server, send RPC, and make sure channel is READY.
const int kNumServers = 1;
StartServers(kNumServers);
- auto channel = BuildChannel(""); // pick_first is the default.
+ auto response_generator = BuildResolverResponseGenerator();
+ auto channel =
+ BuildChannel("", response_generator); // pick_first is the default.
auto stub = BuildStub(channel);
- SetNextResolution(GetServersPorts());
+ response_generator.SetNextResolution(GetServersPorts());
CheckRpcSendOk(stub, DEBUG_LOCATION);
EXPECT_EQ(channel->GetState(false), GRPC_CHANNEL_READY);
// Stop server. Channel should go into state IDLE.
- SetFailureOnReresolution();
+ response_generator.SetFailureOnReresolution();
servers_[0]->Shutdown();
EXPECT_TRUE(WaitForChannelNotReady(channel.get()));
EXPECT_EQ(channel->GetState(false), GRPC_CHANNEL_IDLE);
@@ -897,14 +939,16 @@
}
TEST_F(ClientLbEnd2endTest, PickFirstPendingUpdateAndSelectedSubchannelFails) {
- auto channel = BuildChannel(""); // pick_first is the default.
+ auto response_generator = BuildResolverResponseGenerator();
+ auto channel =
+ BuildChannel("", response_generator); // pick_first is the default.
auto stub = BuildStub(channel);
// Create a number of servers, but only start 1 of them.
CreateServers(10);
StartServer(0);
// Initially resolve to first server and make sure it connects.
gpr_log(GPR_INFO, "Phase 1: Connect to first server.");
- SetNextResolution({servers_[0]->port_});
+ response_generator.SetNextResolution({servers_[0]->port_});
CheckRpcSendOk(stub, DEBUG_LOCATION, true /* wait_for_ready */);
EXPECT_EQ(channel->GetState(false), GRPC_CHANNEL_READY);
// Send a resolution update with the remaining servers, none of which are
@@ -916,7 +960,7 @@
gpr_log(GPR_INFO,
"Phase 2: Resolver update pointing to remaining "
"(not started) servers.");
- SetNextResolution(GetServersPorts(1 /* start_index */));
+ response_generator.SetNextResolution(GetServersPorts(1 /* start_index */));
// RPCs will continue to be sent to the first server.
CheckRpcSendOk(stub, DEBUG_LOCATION);
// Now stop the first server, so that the current subchannel list
@@ -947,9 +991,11 @@
// Start server, send RPC, and make sure channel is READY.
const int kNumServers = 1;
StartServers(kNumServers);
- auto channel = BuildChannel(""); // pick_first is the default.
+ auto response_generator = BuildResolverResponseGenerator();
+ auto channel =
+ BuildChannel("", response_generator); // pick_first is the default.
auto stub = BuildStub(channel);
- SetNextResolution(GetServersPorts());
+ response_generator.SetNextResolution(GetServersPorts());
CheckRpcSendOk(stub, DEBUG_LOCATION);
EXPECT_EQ(channel->GetState(false), GRPC_CHANNEL_READY);
// Stop server. Channel should go into state IDLE.
@@ -958,13 +1004,13 @@
EXPECT_EQ(channel->GetState(false), GRPC_CHANNEL_IDLE);
// Now send resolver update that includes no addresses. Channel
// should stay in state IDLE.
- SetNextResolution({});
+ response_generator.SetNextResolution({});
EXPECT_FALSE(channel->WaitForStateChange(
GRPC_CHANNEL_IDLE, grpc_timeout_seconds_to_deadline(3)));
// Now bring the backend back up and send a non-empty resolver update,
// and then try to send an RPC. Channel should go back into state READY.
StartServer(0);
- SetNextResolution(GetServersPorts());
+ response_generator.SetNextResolution(GetServersPorts());
CheckRpcSendOk(stub, DEBUG_LOCATION);
EXPECT_EQ(channel->GetState(false), GRPC_CHANNEL_READY);
}
@@ -973,9 +1019,10 @@
// Start servers and send one RPC per server.
const int kNumServers = 3;
StartServers(kNumServers);
- auto channel = BuildChannel("round_robin");
+ auto response_generator = BuildResolverResponseGenerator();
+ auto channel = BuildChannel("round_robin", response_generator);
auto stub = BuildStub(channel);
- SetNextResolution(GetServersPorts());
+ response_generator.SetNextResolution(GetServersPorts());
// Wait until all backends are ready.
do {
CheckRpcSendOk(stub, DEBUG_LOCATION);
@@ -999,18 +1046,20 @@
TEST_F(ClientLbEnd2endTest, RoundRobinProcessPending) {
StartServers(1); // Single server
- auto channel = BuildChannel("round_robin");
+ auto response_generator = BuildResolverResponseGenerator();
+ auto channel = BuildChannel("round_robin", response_generator);
auto stub = BuildStub(channel);
- SetNextResolution({servers_[0]->port_});
+ response_generator.SetNextResolution({servers_[0]->port_});
WaitForServer(stub, 0, DEBUG_LOCATION);
// Create a new channel and its corresponding RR LB policy, which will pick
// the subchannels in READY state from the previous RPC against the same
// target (even if it happened over a different channel, because subchannels
// are globally reused). Progress should happen without any transition from
// this READY state.
- auto second_channel = BuildChannel("round_robin");
+ auto second_response_generator = BuildResolverResponseGenerator();
+ auto second_channel = BuildChannel("round_robin", second_response_generator);
auto second_stub = BuildStub(second_channel);
- SetNextResolution({servers_[0]->port_});
+ second_response_generator.SetNextResolution({servers_[0]->port_});
CheckRpcSendOk(second_stub, DEBUG_LOCATION);
}
@@ -1018,13 +1067,14 @@
// Start servers and send one RPC per server.
const int kNumServers = 3;
StartServers(kNumServers);
- auto channel = BuildChannel("round_robin");
+ auto response_generator = BuildResolverResponseGenerator();
+ auto channel = BuildChannel("round_robin", response_generator);
auto stub = BuildStub(channel);
std::vector<int> ports;
// Start with a single server.
gpr_log(GPR_INFO, "*** FIRST BACKEND ***");
ports.emplace_back(servers_[0]->port_);
- SetNextResolution(ports);
+ response_generator.SetNextResolution(ports);
WaitForServer(stub, 0, DEBUG_LOCATION);
// Send RPCs. They should all go servers_[0]
for (size_t i = 0; i < 10; ++i) CheckRpcSendOk(stub, DEBUG_LOCATION);
@@ -1036,7 +1086,7 @@
gpr_log(GPR_INFO, "*** SECOND BACKEND ***");
ports.clear();
ports.emplace_back(servers_[1]->port_);
- SetNextResolution(ports);
+ response_generator.SetNextResolution(ports);
// Wait until update has been processed, as signaled by the second backend
// receiving a request.
EXPECT_EQ(0, servers_[1]->service_.request_count());
@@ -1050,7 +1100,7 @@
gpr_log(GPR_INFO, "*** THIRD BACKEND ***");
ports.clear();
ports.emplace_back(servers_[2]->port_);
- SetNextResolution(ports);
+ response_generator.SetNextResolution(ports);
WaitForServer(stub, 2, DEBUG_LOCATION);
for (size_t i = 0; i < 10; ++i) CheckRpcSendOk(stub, DEBUG_LOCATION);
EXPECT_EQ(0, servers_[0]->service_.request_count());
@@ -1063,7 +1113,7 @@
ports.emplace_back(servers_[0]->port_);
ports.emplace_back(servers_[1]->port_);
ports.emplace_back(servers_[2]->port_);
- SetNextResolution(ports);
+ response_generator.SetNextResolution(ports);
WaitForServer(stub, 0, DEBUG_LOCATION);
WaitForServer(stub, 1, DEBUG_LOCATION);
WaitForServer(stub, 2, DEBUG_LOCATION);
@@ -1075,7 +1125,7 @@
// An empty update will result in the channel going into TRANSIENT_FAILURE.
gpr_log(GPR_INFO, "*** NO BACKENDS ***");
ports.clear();
- SetNextResolution(ports);
+ response_generator.SetNextResolution(ports);
grpc_connectivity_state channel_state;
do {
channel_state = channel->GetState(true /* try to connect */);
@@ -1086,7 +1136,7 @@
gpr_log(GPR_INFO, "*** BACK TO SECOND BACKEND ***");
ports.clear();
ports.emplace_back(servers_[1]->port_);
- SetNextResolution(ports);
+ response_generator.SetNextResolution(ports);
WaitForServer(stub, 1, DEBUG_LOCATION);
channel_state = channel->GetState(false /* try to connect */);
ASSERT_EQ(channel_state, GRPC_CHANNEL_READY);
@@ -1097,13 +1147,14 @@
TEST_F(ClientLbEnd2endTest, RoundRobinUpdateInError) {
const int kNumServers = 3;
StartServers(kNumServers);
- auto channel = BuildChannel("round_robin");
+ auto response_generator = BuildResolverResponseGenerator();
+ auto channel = BuildChannel("round_robin", response_generator);
auto stub = BuildStub(channel);
std::vector<int> ports;
// Start with a single server.
ports.emplace_back(servers_[0]->port_);
- SetNextResolution(ports);
+ response_generator.SetNextResolution(ports);
WaitForServer(stub, 0, DEBUG_LOCATION);
// Send RPCs. They should all go to servers_[0]
for (size_t i = 0; i < 10; ++i) SendRpc(stub);
@@ -1116,7 +1167,7 @@
servers_[1]->Shutdown();
ports.emplace_back(servers_[1]->port_);
ports.emplace_back(servers_[2]->port_);
- SetNextResolution(ports);
+ response_generator.SetNextResolution(ports);
WaitForServer(stub, 0, DEBUG_LOCATION);
WaitForServer(stub, 2, DEBUG_LOCATION);
@@ -1130,13 +1181,14 @@
// Start servers and send one RPC per server.
const int kNumServers = 3;
StartServers(kNumServers);
- auto channel = BuildChannel("round_robin");
+ auto response_generator = BuildResolverResponseGenerator();
+ auto channel = BuildChannel("round_robin", response_generator);
auto stub = BuildStub(channel);
std::vector<int> ports = GetServersPorts();
for (size_t i = 0; i < 1000; ++i) {
std::shuffle(ports.begin(), ports.end(),
std::mt19937(std::random_device()()));
- SetNextResolution(ports);
+ response_generator.SetNextResolution(ports);
if (i % 10 == 0) CheckRpcSendOk(stub, DEBUG_LOCATION);
}
// Check LB policy name for the channel.
@@ -1162,9 +1214,10 @@
second_ports.push_back(grpc_pick_unused_port_or_die());
}
StartServers(kNumServers, first_ports);
- auto channel = BuildChannel("round_robin");
+ auto response_generator = BuildResolverResponseGenerator();
+ auto channel = BuildChannel("round_robin", response_generator);
auto stub = BuildStub(channel);
- SetNextResolution(first_ports);
+ response_generator.SetNextResolution(first_ports);
// Send a number of RPCs, which succeed.
for (size_t i = 0; i < 100; ++i) {
CheckRpcSendOk(stub, DEBUG_LOCATION);
@@ -1188,7 +1241,7 @@
StartServers(kNumServers, second_ports);
// Don't notify of the update. Wait for the LB policy's re-resolution to
// "pull" the new ports.
- SetNextResolutionUponError(second_ports);
+ response_generator.SetNextResolutionUponError(second_ports);
gpr_log(GPR_INFO, "****** SERVERS RESTARTED *******");
gpr_log(GPR_INFO, "****** SENDING REQUEST TO SUCCEED *******");
// Client request should eventually (but still fairly soon) succeed.
@@ -1205,9 +1258,10 @@
const int kNumServers = 3;
StartServers(kNumServers);
const auto ports = GetServersPorts();
- auto channel = BuildChannel("round_robin");
+ auto response_generator = BuildResolverResponseGenerator();
+ auto channel = BuildChannel("round_robin", response_generator);
auto stub = BuildStub(channel);
- SetNextResolution(ports);
+ response_generator.SetNextResolution(ports);
for (size_t i = 0; i < kNumServers; ++i) {
WaitForServer(stub, i, DEBUG_LOCATION);
}
@@ -1251,9 +1305,10 @@
args.SetServiceConfigJSON(
"{\"healthCheckConfig\": "
"{\"serviceName\": \"health_check_service_name\"}}");
- auto channel = BuildChannel("round_robin", args);
+ auto response_generator = BuildResolverResponseGenerator();
+ auto channel = BuildChannel("round_robin", response_generator, args);
auto stub = BuildStub(channel);
- SetNextResolution({servers_[0]->port_});
+ response_generator.SetNextResolution({servers_[0]->port_});
EXPECT_TRUE(WaitForChannelReady(channel.get()));
CheckRpcSendOk(stub, DEBUG_LOCATION);
}
@@ -1267,9 +1322,10 @@
args.SetServiceConfigJSON(
"{\"healthCheckConfig\": "
"{\"serviceName\": \"health_check_service_name\"}}");
- auto channel = BuildChannel("round_robin", args);
+ auto response_generator = BuildResolverResponseGenerator();
+ auto channel = BuildChannel("round_robin", response_generator, args);
auto stub = BuildStub(channel);
- SetNextResolution(GetServersPorts());
+ response_generator.SetNextResolution(GetServersPorts());
// Channel should not become READY, because health checks should be failing.
gpr_log(GPR_INFO,
"*** initial state: unknown health check service name for "
@@ -1341,15 +1397,17 @@
args.SetServiceConfigJSON(
"{\"healthCheckConfig\": "
"{\"serviceName\": \"health_check_service_name\"}}");
- auto channel1 = BuildChannel("round_robin", args);
+ auto response_generator1 = BuildResolverResponseGenerator();
+ auto channel1 = BuildChannel("round_robin", response_generator1, args);
auto stub1 = BuildStub(channel1);
std::vector<int> ports = GetServersPorts();
- SetNextResolution(ports);
+ response_generator1.SetNextResolution(ports);
// Create a channel with health checking enabled but inhibited.
args.SetInt(GRPC_ARG_INHIBIT_HEALTH_CHECKING, 1);
- auto channel2 = BuildChannel("round_robin", args);
+ auto response_generator2 = BuildResolverResponseGenerator();
+ auto channel2 = BuildChannel("round_robin", response_generator2, args);
auto stub2 = BuildStub(channel2);
- SetNextResolution(ports);
+ response_generator2.SetNextResolution(ports);
// First channel should not become READY, because health checks should be
// failing.
EXPECT_FALSE(WaitForChannelReady(channel1.get(), 1));
@@ -1376,19 +1434,21 @@
args.SetServiceConfigJSON(
"{\"healthCheckConfig\": "
"{\"serviceName\": \"health_check_service_name\"}}");
- auto channel1 = BuildChannel("round_robin", args);
+ auto response_generator1 = BuildResolverResponseGenerator();
+ auto channel1 = BuildChannel("round_robin", response_generator1, args);
auto stub1 = BuildStub(channel1);
std::vector<int> ports = GetServersPorts();
- SetNextResolution(ports);
+ response_generator1.SetNextResolution(ports);
// Create a channel with health-checking enabled with a different
// service name.
ChannelArguments args2;
args2.SetServiceConfigJSON(
"{\"healthCheckConfig\": "
"{\"serviceName\": \"health_check_service_name2\"}}");
- auto channel2 = BuildChannel("round_robin", args2);
+ auto response_generator2 = BuildResolverResponseGenerator();
+ auto channel2 = BuildChannel("round_robin", response_generator2, args2);
auto stub2 = BuildStub(channel2);
- SetNextResolution(ports);
+ response_generator2.SetNextResolution(ports);
// Allow health checks from channel 2 to succeed.
servers_[0]->SetServingStatus("health_check_service_name2", true);
// First channel should not become READY, because health checks should be
@@ -1438,9 +1498,11 @@
const int kNumServers = 1;
const int kNumRpcs = 10;
StartServers(kNumServers);
- auto channel = BuildChannel("intercept_trailing_metadata_lb");
+ auto response_generator = BuildResolverResponseGenerator();
+ auto channel =
+ BuildChannel("intercept_trailing_metadata_lb", response_generator);
auto stub = BuildStub(channel);
- SetNextResolution(GetServersPorts());
+ response_generator.SetNextResolution(GetServersPorts());
for (size_t i = 0; i < kNumRpcs; ++i) {
CheckRpcSendOk(stub, DEBUG_LOCATION);
}
@@ -1470,9 +1532,11 @@
" }\n"
" } ]\n"
"}");
- auto channel = BuildChannel("intercept_trailing_metadata_lb", args);
+ auto response_generator = BuildResolverResponseGenerator();
+ auto channel =
+ BuildChannel("intercept_trailing_metadata_lb", response_generator, args);
auto stub = BuildStub(channel);
- SetNextResolution(GetServersPorts());
+ response_generator.SetNextResolution(GetServersPorts());
for (size_t i = 0; i < kNumRpcs; ++i) {
CheckRpcSendOk(stub, DEBUG_LOCATION);
}
diff --git a/tools/distrib/python/bazel_deps.sh b/tools/distrib/python/bazel_deps.sh
index 5f4ee1d..67896f1 100755
--- a/tools/distrib/python/bazel_deps.sh
+++ b/tools/distrib/python/bazel_deps.sh
@@ -26,6 +26,7 @@
docker build -t bazel_local_img tools/dockerfile/test/sanity
docker run -v "$(realpath .):/src/grpc/:ro" \
-w /src/grpc/third_party/protobuf \
+ --rm=true \
bazel_local_img \
bazel query 'deps('$1')'
fi
diff --git a/tools/run_tests/generated/sources_and_headers.json b/tools/run_tests/generated/sources_and_headers.json
index 92b3757..0f560fd 100644
--- a/tools/run_tests/generated/sources_and_headers.json
+++ b/tools/run_tests/generated/sources_and_headers.json
@@ -1033,22 +1033,6 @@
"headers": [],
"is_filegroup": false,
"language": "c",
- "name": "grpc_fetch_oauth2",
- "src": [
- "test/core/security/fetch_oauth2.cc"
- ],
- "third_party": false,
- "type": "target"
- },
- {
- "deps": [
- "gpr",
- "grpc",
- "grpc_test_util"
- ],
- "headers": [],
- "is_filegroup": false,
- "language": "c",
"name": "grpc_ipv6_loopback_available_test",
"src": [
"test/core/iomgr/grpc_ipv6_loopback_available_test.cc"
@@ -3882,6 +3866,23 @@
"deps": [
"gpr",
"grpc",
+ "grpc++",
+ "grpc_test_util"
+ ],
+ "headers": [],
+ "is_filegroup": false,
+ "language": "c++",
+ "name": "grpc_fetch_oauth2",
+ "src": [
+ "test/core/security/fetch_oauth2.cc"
+ ],
+ "third_party": false,
+ "type": "target"
+ },
+ {
+ "deps": [
+ "gpr",
+ "grpc",
"grpc_test_util"
],
"headers": [],