blob: 7841bd24d55ab12d08c90ab5d879c560192d9344 [file]
//
// Copyright 2017 gRPC authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
#include "test/cpp/end2end/xds/xds_server.h"
#include <deque>
#include <optional>
#include <set>
#include <string>
#include <thread>
#include <vector>
#include "envoy/service/discovery/v3/discovery.pb.h"
#include "src/core/lib/address_utils/parse_address.h"
#include "src/core/util/crash.h"
#include "src/core/util/grpc_check.h"
#include "src/core/util/sync.h"
#include "absl/log/log.h"
namespace grpc {
namespace testing {
//
// AdsServiceImpl
//
void AdsServiceImpl::SetResource(google::protobuf::Any resource,
const std::string& type_url,
const std::string& name) {
grpc_core::MutexLock lock(&ads_mu_);
ResourceTypeState& resource_type_state = resource_map_[type_url];
++resource_type_state.resource_type_version;
ResourceState& resource_state = resource_type_state.resource_name_map[name];
resource_state.resource_type_version =
resource_type_state.resource_type_version;
resource_state.resource = std::move(resource);
LOG(INFO) << "ADS[" << debug_label_ << "]: Updating " << type_url
<< " resource " << name << "; resource_type_version now "
<< resource_type_state.resource_type_version;
for (Reactor* reactor : resource_state.subscriptions) {
reactor->MaybeStartWrite(type_url);
}
}
void AdsServiceImpl::UnsetResource(const std::string& type_url,
const std::string& name) {
grpc_core::MutexLock lock(&ads_mu_);
ResourceTypeState& resource_type_state = resource_map_[type_url];
++resource_type_state.resource_type_version;
ResourceState& resource_state = resource_type_state.resource_name_map[name];
resource_state.resource_type_version =
resource_type_state.resource_type_version;
resource_state.resource.reset();
LOG(INFO) << "ADS[" << debug_label_ << "]: Unsetting " << type_url
<< " resource " << name << "; resource_type_version now "
<< resource_type_state.resource_type_version;
for (Reactor* reactor : resource_state.subscriptions) {
reactor->MaybeStartWrite(type_url);
}
}
void AdsServiceImpl::Shutdown() {
{
grpc_core::MutexLock lock(&ads_mu_);
resource_type_response_state_.clear();
}
LOG(INFO) << "ADS[" << debug_label_ << "]: shut down";
}
//
// AdsServiceImpl::Reactor
//
AdsServiceImpl::Reactor::Reactor(
std::shared_ptr<AdsServiceImpl> ads_service_impl,
CallbackServerContext* context)
: ads_service_impl_(std::move(ads_service_impl)), context_(context) {
LOG(INFO) << "ADS[" << ads_service_impl_->debug_label_ << "]: reactor "
<< this << ": StreamAggregatedResources starts";
{
grpc_core::MutexLock lock(&ads_service_impl_->ads_mu_);
if (ads_service_impl_->forced_ads_failure_.has_value()) {
LOG(INFO) << "ADS[" << ads_service_impl_->debug_label_ << "]: reactor "
<< this
<< ": StreamAggregatedResources forcing early failure "
"with status code: "
<< ads_service_impl_->forced_ads_failure_->error_code()
<< ", message: "
<< ads_service_impl_->forced_ads_failure_->error_message();
MaybeFinish(*ads_service_impl_->forced_ads_failure_);
return;
}
if (ads_service_impl_->call_creds_cb_ != nullptr) {
ads_service_impl_->call_creds_cb_(context->client_metadata());
}
}
ads_service_impl_->AddClient(context->peer());
StartRead(&request_);
}
void AdsServiceImpl::Reactor::OnDone() {
LOG(INFO) << "ADS[" << ads_service_impl_->debug_label_ << "]: reactor "
<< this << ": OnDone()";
delete this;
}
void AdsServiceImpl::Reactor::OnCancel() {
LOG(INFO) << "ADS[" << ads_service_impl_->debug_label_ << "]: reactor "
<< this << ": OnCancel()";
// Clean up any subscriptions that were still active when the call
// finished.
{
grpc_core::MutexLock lock(&ads_service_impl_->ads_mu_);
for (auto& [type_url, type_state] : type_state_map_) {
auto& resource_name_map =
ads_service_impl_->resource_map_[type_url].resource_name_map;
for (auto& [resource_name, _] : type_state.subscriptions) {
ResourceState& resource_state = resource_name_map[resource_name];
resource_state.subscriptions.erase(this);
}
}
}
ads_service_impl_->RemoveClient(context_->peer());
MaybeFinish(Status::OK);
}
void AdsServiceImpl::Reactor::OnReadDone(bool ok) {
LOG(INFO) << "ADS[" << ads_service_impl_->debug_label_ << "]: reactor "
<< this << ": OnReadDone(" << ok << ")";
if (!ok) return;
grpc_core::MutexLock lock(&ads_service_impl_->ads_mu_);
if (!seen_first_request_) {
if (ads_service_impl_->check_first_request_ != nullptr) {
ads_service_impl_->check_first_request_(request_);
}
seen_first_request_ = true;
}
LOG(INFO) << "ADS[" << ads_service_impl_->debug_label_ << "]: reactor "
<< this << ": Received request for type " << request_.type_url()
<< " with content " << request_.DebugString();
auto& type_state = type_state_map_[request_.type_url()];
// Check the nonce sent by the client, if any.
// (This will be absent on the first request on a stream.)
if (request_.response_nonce().empty()) {
int client_resource_type_version = 0;
if (!request_.version_info().empty()) {
GRPC_CHECK(absl::SimpleAtoi(request_.version_info(),
&client_resource_type_version));
}
if (ads_service_impl_->check_version_callback_ != nullptr) {
ads_service_impl_->check_version_callback_(request_.type_url(),
client_resource_type_version);
}
} else {
int client_nonce;
GRPC_CHECK(absl::SimpleAtoi(request_.response_nonce(), &client_nonce));
// Check for ACK or NACK.
ResponseState response_state;
if (!request_.has_error_detail()) {
response_state.state = ResponseState::ACKED;
LOG(INFO) << "ADS[" << ads_service_impl_->debug_label_ << "]: reactor "
<< this
<< ": client ACKed resource_type=" << request_.type_url()
<< " version=" << request_.version_info();
} else {
response_state.state = ResponseState::NACKED;
if (ads_service_impl_->check_nack_status_code_ != nullptr) {
ads_service_impl_->check_nack_status_code_(
static_cast<absl::StatusCode>(request_.error_detail().code()));
}
response_state.error_message = request_.error_detail().message();
LOG(INFO) << "ADS[" << ads_service_impl_->debug_label_ << "]: reactor "
<< this
<< ": client NACKed resource_type=" << request_.type_url()
<< " version=" << request_.version_info() << ": "
<< response_state.error_message;
}
ads_service_impl_->resource_type_response_state_[request_.type_url()]
.emplace_back(std::move(response_state));
// Ignore requests with stale nonces.
if (client_nonce < type_state.nonce) {
request_.Clear();
StartRead(&request_);
return;
}
}
// Ignore resource types as requested by tests.
if (ads_service_impl_->resource_types_to_ignore_.find(request_.type_url()) !=
ads_service_impl_->resource_types_to_ignore_.end()) {
request_.Clear();
StartRead(&request_);
return;
}
// Get the map of resources for this type.
auto& resource_type_state =
ads_service_impl_->resource_map_[request_.type_url()];
auto& resource_name_map = resource_type_state.resource_name_map;
// Subscribe to any new resource names in the request.
for (const std::string& resource_name : request_.resource_names()) {
auto& resource_state = resource_name_map[resource_name];
if (!type_state.subscriptions.contains(resource_name)) {
type_state.subscriptions.emplace(resource_name, true);
resource_state.subscriptions.emplace(this);
LOG(INFO) << "ADS[" << ads_service_impl_->debug_label_ << "]: reactor "
<< this << ": subscribe to resource type "
<< request_.type_url() << " name " << resource_name;
}
}
// Unsubscribe from any resource not present in the request.
const absl::flat_hash_set<absl::string_view> resources_in_request(
request_.resource_names().begin(), request_.resource_names().end());
for (auto it = type_state.subscriptions.begin();
it != type_state.subscriptions.end();) {
const std::string& resource_name = it->first;
if (resources_in_request.contains(resource_name)) {
++it;
continue;
}
LOG(INFO) << "ADS[" << ads_service_impl_->debug_label_ << "]: reactor "
<< this << ": Unsubscribe to type=" << request_.type_url()
<< " name=" << resource_name;
auto it2 = resource_name_map.find(resource_name);
GRPC_CHECK(it2 != resource_name_map.end()) << resource_name;
auto& resource_state = it2->second;
resource_state.subscriptions.erase(this);
if (resource_state.subscriptions.empty() &&
!resource_state.resource.has_value()) {
resource_name_map.erase(it2);
}
type_state.subscriptions.erase(it++);
}
MaybeStartWrite(request_.type_url());
request_.Clear();
StartRead(&request_);
}
void AdsServiceImpl::Reactor::MaybeStartWrite(
const std::string& resource_type) {
LOG(INFO) << "ADS[" << ads_service_impl_->debug_label_ << "]: reactor "
<< this << ": MaybeStartWrite(" << resource_type << ")";
if (write_pending_) {
response_needed_.insert(resource_type);
return;
}
LOG(INFO) << "ADS[" << ads_service_impl_->debug_label_ << "]: reactor "
<< this << ": Constructing response";
auto& type_state = type_state_map_[resource_type];
auto& resource_type_state = ads_service_impl_->resource_map_[resource_type];
auto& resource_name_map = resource_type_state.resource_name_map;
bool resource_needed_update = false;
for (auto& [resource_name, new_subscription] : type_state.subscriptions) {
auto& resource_state = resource_name_map[resource_name];
bool needs_update =
new_subscription || (type_state.resource_type_version <
resource_type_state.resource_type_version &&
resource_state.resource_type_version <=
resource_type_state.resource_type_version);
new_subscription = false;
if (needs_update) {
LOG(INFO) << "ADS[" << ads_service_impl_->debug_label_ << "]: reactor "
<< this << ": Sending update for: " << resource_name;
resource_needed_update = true;
}
if (resource_type == kLdsTypeUrl || resource_type == kCdsTypeUrl ||
needs_update) {
if (resource_state.resource.has_value()) {
auto* resource = response_.add_resources();
resource->CopyFrom(*resource_state.resource);
if (ads_service_impl_->wrap_resources_) {
envoy::service::discovery::v3::Resource resource_wrapper;
*resource_wrapper.mutable_resource() = std::move(*resource);
resource->PackFrom(resource_wrapper);
}
}
} else {
LOG(INFO) << "ADS[" << ads_service_impl_->debug_label_ << "]: reactor "
<< this
<< ": client does not need update for: " << resource_name;
}
}
if (!resource_needed_update) {
LOG(INFO) << "ADS[" << ads_service_impl_->debug_label_ << "]: reactor "
<< this << ": no resources to send for type=" << resource_type;
response_.Clear();
MaybeStartNextWrite();
return;
}
response_.set_type_url(resource_type);
response_.set_nonce(std::to_string(++type_state.nonce));
response_.set_version_info(
std::to_string(resource_type_state.resource_type_version));
type_state.resource_type_version = resource_type_state.resource_type_version;
LOG(INFO) << "ADS[" << ads_service_impl_->debug_label_ << "]: reactor "
<< this << ": sending response: " << response_.DebugString();
write_pending_ = true;
StartWrite(&response_);
}
void AdsServiceImpl::Reactor::OnWriteDone(bool ok) {
LOG(INFO) << "ADS[" << ads_service_impl_->debug_label_ << "]: reactor "
<< this << ": OnWriteDone(" << ok << ")";
grpc_core::MutexLock lock(&ads_service_impl_->ads_mu_);
write_pending_ = false;
response_.Clear();
if (!ok) return;
MaybeStartNextWrite();
}
void AdsServiceImpl::Reactor::MaybeStartNextWrite() {
auto it = response_needed_.begin();
if (it == response_needed_.end()) return;
std::string resource_type = *it;
response_needed_.erase(it);
MaybeStartWrite(resource_type);
}
void AdsServiceImpl::Reactor::MaybeFinish(const grpc::Status& status) {
if (called_finish_.exchange(true)) return;
LOG(INFO) << "ADS[" << ads_service_impl_->debug_label_ << "]: reactor "
<< this << ": calling Finish()";
Finish(status);
}
//
// LrsServiceImpl::ClientStats
//
uint64_t LrsServiceImpl::ClientStats::total_successful_requests() const {
uint64_t sum = 0;
for (auto& [_, stats] : locality_stats_) {
sum += stats.total_successful_requests;
}
return sum;
}
uint64_t LrsServiceImpl::ClientStats::total_requests_in_progress() const {
uint64_t sum = 0;
for (auto& [_, stats] : locality_stats_) {
sum += stats.total_requests_in_progress;
}
return sum;
}
uint64_t LrsServiceImpl::ClientStats::total_error_requests() const {
uint64_t sum = 0;
for (auto& [_, stats] : locality_stats_) {
sum += stats.total_error_requests;
}
return sum;
}
uint64_t LrsServiceImpl::ClientStats::total_issued_requests() const {
uint64_t sum = 0;
for (auto& [_, stats] : locality_stats_) {
sum += stats.total_issued_requests;
}
return sum;
}
uint64_t LrsServiceImpl::ClientStats::dropped_requests(
const std::string& category) const {
auto iter = dropped_requests_.find(category);
GRPC_CHECK(iter != dropped_requests_.end());
return iter->second;
}
LrsServiceImpl::ClientStats& LrsServiceImpl::ClientStats::operator+=(
const ClientStats& other) {
for (const auto& [name, stats] : other.locality_stats_) {
locality_stats_[name] += stats;
}
total_dropped_requests_ += other.total_dropped_requests_;
for (const auto& [category, count] : other.dropped_requests_) {
dropped_requests_[category] += count;
}
return *this;
}
//
// LrsServiceImpl
//
void LrsServiceImpl::Start() {
grpc_core::MutexLock lock(&load_report_mu_);
result_queue_.clear();
}
void LrsServiceImpl::Shutdown() {
LOG(INFO) << "LRS[" << debug_label_ << "]: shut down";
}
std::vector<LrsServiceImpl::ClientStats> LrsServiceImpl::WaitForLoadReport(
absl::Duration timeout) {
timeout *= grpc_test_slowdown_factor();
grpc_core::MutexLock lock(&load_report_mu_);
grpc_core::CondVar cv;
if (result_queue_.empty()) {
load_report_cond_ = &cv;
while (result_queue_.empty()) {
if (cv.WaitWithTimeout(&load_report_mu_, timeout)) {
LOG(ERROR) << "timed out waiting for load report";
return {};
}
}
load_report_cond_ = nullptr;
}
std::vector<ClientStats> result = std::move(result_queue_.front());
result_queue_.pop_front();
return result;
}
//
// LrsServiceImpl::Reactor
//
LrsServiceImpl::Reactor::Reactor(
std::shared_ptr<LrsServiceImpl> lrs_service_impl)
: lrs_service_impl_(std::move(lrs_service_impl)) {
LOG(INFO) << "LRS[" << lrs_service_impl_->debug_label_ << "]: reactor "
<< this << ": StreamLoadStats starts";
if (lrs_service_impl_->stream_started_callback_ != nullptr) {
lrs_service_impl_->stream_started_callback_();
}
StartRead(&request_);
}
void LrsServiceImpl::Reactor::OnReadDone(bool ok) {
if (!ok) return;
grpc_core::MutexLock lock(&mu_);
if (finished_) return;
if (!seen_first_request_) {
seen_first_request_ = true;
// Handle initial request.
LOG(INFO) << "LRS[" << lrs_service_impl_->debug_label_ << "]: reactor "
<< this << ": read initial request: " << request_.DebugString();
lrs_service_impl_->IncreaseRequestCount();
if (lrs_service_impl_->check_first_request_ != nullptr) {
lrs_service_impl_->check_first_request_(request_);
}
// Send initial response.
if (lrs_service_impl_->send_all_clusters_) {
response_.set_send_all_clusters(true);
} else {
for (const std::string& cluster_name :
lrs_service_impl_->cluster_names_) {
response_.add_clusters(cluster_name);
}
}
response_.mutable_load_reporting_interval()->set_seconds(
lrs_service_impl_->client_load_reporting_interval_seconds_ *
grpc_test_slowdown_factor());
StartWrite(&response_);
} else {
// Handle load reports.
LOG(INFO) << "LRS[" << lrs_service_impl_->debug_label_ << "]: reactor "
<< this << ": received load report: " << request_.DebugString();
std::vector<ClientStats> stats;
for (const auto& cluster_stats : request_.cluster_stats()) {
stats.emplace_back(cluster_stats);
}
grpc_core::MutexLock lock(&lrs_service_impl_->load_report_mu_);
lrs_service_impl_->result_queue_.emplace_back(std::move(stats));
if (lrs_service_impl_->load_report_cond_ != nullptr) {
lrs_service_impl_->load_report_cond_->Signal();
}
}
StartRead(&request_);
}
void LrsServiceImpl::Reactor::OnWriteDone(bool /*ok*/) {
LOG(INFO) << "LRS[" << lrs_service_impl_->debug_label_ << "]: reactor "
<< this << ": OnWriteDone()";
lrs_service_impl_->IncreaseResponseCount();
}
void LrsServiceImpl::Reactor::OnDone() {
LOG(INFO) << "LRS[" << lrs_service_impl_->debug_label_ << "]: reactor "
<< this << ": OnDone()";
delete this;
}
void LrsServiceImpl::Reactor::OnCancel() {
LOG(INFO) << "LRS[" << lrs_service_impl_->debug_label_ << "]: reactor "
<< this << ": OnCancel()";
grpc_core::MutexLock lock(&mu_);
finished_ = true;
Finish(Status::OK);
}
} // namespace testing
} // namespace grpc