blob: fb0ce89bc923d3cb4bf2be2e9daf0a2cbef2fc3c [file]
// Copyright 2025 gRPC authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#include "src/core/telemetry/instrument.h"
#include <grpc/support/port_platform.h>
#include <algorithm>
#include <atomic>
#include <cstddef>
#include <cstdint>
#include <memory>
#include <string>
#include <tuple>
#include <utility>
#include <variant>
#include <vector>
#include "src/core/channelz/channelz.h"
#include "src/core/channelz/property_list.h"
#include "src/core/telemetry/histogram.h"
#include "src/core/util/bitset.h"
#include "src/core/util/grpc_check.h"
#include "src/core/util/match.h"
#include "src/core/util/ref_counted_ptr.h"
#include "src/core/util/single_set_ptr.h"
#include "src/core/util/sync.h"
#include "absl/base/thread_annotations.h"
#include "absl/container/flat_hash_map.h"
#include "absl/container/flat_hash_set.h"
#include "absl/functional/function_ref.h"
#include "absl/hash/hash.h"
#include "absl/log/log.h"
#include "absl/strings/str_cat.h"
#include "absl/strings/str_join.h"
#include "absl/strings/string_view.h"
#include "absl/types/span.h"
namespace grpc_core {
std::atomic<const std::string*>* InstrumentLabel::GetLabels() {
static std::atomic<const std::string*>* static_labels = new std::atomic<
const std::string*>[InstrumentLabel::kMaxLabelsPerProcess] {};
return static_labels;
}
std::string InstrumentLabel::RegistrationDebugString() {
return absl::StrJoin(
absl::MakeSpan(GetLabels(), kMaxLabelsPerProcess), ", ",
[](std::string* s, const std::atomic<const std::string*>& p) {
auto* loaded_p = p.load(std::memory_order_acquire);
if (loaded_p == nullptr) {
s->append("<null>");
} else {
s->append(*loaded_p);
}
});
}
InstrumentLabel::InstrumentLabel(absl::string_view label) {
auto* labels = GetLabels();
std::unique_ptr<std::string> label_copy;
for (size_t i = 0; i < InstrumentLabel::kMaxLabelsPerProcess; ++i) {
auto* current_value = labels[i].load(std::memory_order_acquire);
while (current_value == nullptr) {
if (label_copy == nullptr) {
label_copy = std::make_unique<std::string>(label);
}
if (!labels[i].compare_exchange_weak(current_value, label_copy.get(),
std::memory_order_acq_rel)) {
continue;
}
std::ignore = label_copy.release();
index_ = i;
return;
}
GRPC_CHECK(current_value != nullptr);
if (*current_value == label) {
index_ = i;
return;
}
}
GRPC_CHECK(false) << "Too many instrument labels registered";
}
InstrumentLabelList InstrumentLabelSet::ToList() const {
InstrumentLabelList list;
for (size_t i = 0; i < InstrumentLabel::kMaxLabelsPerProcess; ++i) {
if (set_.is_set(i)) list.Append(InstrumentLabel::FromIndex(i));
}
return list;
}
std::string InstrumentLabelList::DebugString() const {
return absl::StrJoin(absl::MakeSpan(labels_, count_), ", ",
[](std::string* s, const InstrumentLabel& label) {
s->append(label.label());
s->append("[idx=");
s->append(std::to_string(label.index()));
s->append("]");
});
}
namespace {
struct Hook {
HistogramCollectionHook hook;
Hook* next;
};
std::atomic<Hook*> hooks = nullptr;
} // namespace
void RegisterHistogramCollectionHook(HistogramCollectionHook hook) {
Hook* new_hook =
new Hook{std::move(hook), hooks.load(std::memory_order_acquire)};
while (!hooks.compare_exchange_weak(new_hook->next, new_hook,
std::memory_order_acq_rel)) {
}
}
namespace instrument_detail {
void CallHistogramCollectionHooks(
const InstrumentMetadata::Description* instrument,
absl::Span<const std::string> labels, int64_t value) {
Hook* hook = hooks.load(std::memory_order_acquire);
while (GPR_UNLIKELY(hook != nullptr)) {
hook->hook(instrument, labels, value);
hook = hook->next;
}
}
} // namespace instrument_detail
namespace {
std::vector<std::string> FilterLabels(
InstrumentLabelList domain_label_names,
InstrumentLabelSet scope_labels_of_interest,
absl::Span<const std::string> full_label_values) {
std::vector<std::string> result;
result.reserve(domain_label_names.size());
for (size_t i = 0; i < domain_label_names.size(); ++i) {
if (scope_labels_of_interest.contains(domain_label_names[i])) {
result.push_back(full_label_values[i]);
} else {
result.push_back(std::string(kOmittedLabel));
}
}
return result;
}
} // namespace
CollectionScope::CollectionScope(
std::vector<RefCountedPtr<CollectionScope>> parents,
InstrumentLabelSet labels_of_interest, size_t child_shards_count,
size_t storage_shards_count)
: parents_(std::move(parents)),
labels_of_interest_(labels_of_interest),
child_shards_(child_shards_count),
storage_shards_(storage_shards_count) {
// Sort parents (by address) and then remove any duplicates.
std::sort(parents_.begin(), parents_.end());
parents_.erase(std::unique(parents_.begin(), parents_.end()), parents_.end());
for (const auto& parent : parents_) {
if (parent != nullptr) {
labels_of_interest_.Merge(parent->labels_of_interest_);
auto& shard = parent->child_shard(this);
MutexLock lock(&shard.mu);
shard.children.insert(this);
}
}
}
CollectionScope::~CollectionScope() {
for (const auto& parent : parents_) {
if (parent != nullptr) {
auto& shard = parent->child_shard(this);
MutexLock lock(&shard.mu);
shard.children.erase(this);
}
}
for (auto& shard : storage_shards_) {
// TODO(ctiller): Consider a different entry point than GetDomainStorage
// for this post-aggregation. We ought to be able to do this step without
// accessing full_labels.
MutexLock lock(&shard.mu);
for (auto& storage_pair : shard.storage) {
for (const auto& parent : parents_) {
if (parent != nullptr) {
storage_pair.second->domain()
->GetDomainStorage(parent, storage_pair.second->label())
->Add(storage_pair.second.get());
}
}
}
}
}
size_t CollectionScope::TestOnlyCountStorageHeld() const {
size_t count = 0;
for (const auto& shard : storage_shards_) {
MutexLock lock(&shard.mu);
count += shard.storage.size();
}
return count;
}
void CollectionScope::ForEachUniqueStorage(
absl::FunctionRef<void(instrument_detail::DomainStorage*)> cb) {
absl::flat_hash_set<instrument_detail::DomainStorage*> visited;
ForEachUniqueStorage(cb, visited);
}
void CollectionScope::ForEachUniqueStorage(
absl::FunctionRef<void(instrument_detail::DomainStorage*)> cb,
absl::flat_hash_set<instrument_detail::DomainStorage*>& visited) {
for (auto& shard : storage_shards_) {
MutexLock lock(&shard.mu);
for (const auto& s : shard.storage) {
if (visited.insert(s.second.get()).second) {
cb(s.second.get());
}
}
}
for (auto& shard : child_shards_) {
MutexLock lock(&shard.mu);
for (auto* child : shard.children) {
child->ForEachUniqueStorage(cb, visited);
}
}
}
void CollectionScope::TestOnlyReset() {
for (auto& shard : storage_shards_) {
MutexLock lock(&shard.mu);
shard.storage.clear();
}
for (auto& shard : child_shards_) {
MutexLock lock(&shard.mu);
shard.children.clear();
}
}
RefCountedPtr<CollectionScope> CreateCollectionScope(
std::vector<RefCountedPtr<CollectionScope>> parents,
InstrumentLabelSet labels, size_t child_shards_count,
size_t storage_shards_count) {
return MakeRefCounted<CollectionScope>(
std::move(parents), labels, child_shards_count, storage_shards_count);
}
////////////////////////////////////////////////////////////////////////////////
// InstrumentMetadata
void InstrumentMetadata::ForEachInstrument(
absl::FunctionRef<void(const InstrumentMetadata::Description*)> fn) {
instrument_detail::QueryableDomain::ForEachInstrument(fn);
}
/////////////////////////////////////////////////////////////////////////////////
// GaugeStorage
namespace instrument_detail {
GaugeStorage::GaugeStorage(QueryableDomain* domain)
: double_gauges_(domain->allocated_double_gauge_slots()),
int_gauges_(domain->allocated_int_gauge_slots()),
uint_gauges_(domain->allocated_uint_gauge_slots()) {}
} // namespace instrument_detail
////////////////////////////////////////////////////////////////////////////////
// MetricsQuery
MetricsQuery& MetricsQuery::WithLabelEq(absl::string_view label,
std::string value) {
label_eqs_.emplace(label, std::move(value));
return *this;
}
MetricsQuery& MetricsQuery::CollapseLabels(
absl::Span<const InstrumentLabel> labels) {
for (const auto& label : labels) {
collapsed_labels_.Set(label);
}
return *this;
}
MetricsQuery& MetricsQuery::OnlyMetrics(std::vector<std::string> metrics) {
only_metrics_.emplace(std::move(metrics));
return *this;
}
void MetricsQuery::Run(RefCountedPtr<CollectionScope> scope,
MetricsSink& sink) const {
GRPC_CHECK_NE(scope.get(), nullptr);
struct DomainInfo {
std::vector<const InstrumentMetadata::Description*> metrics;
std::vector<RefCountedPtr<instrument_detail::DomainStorage>> storage;
};
absl::flat_hash_map<instrument_detail::QueryableDomain*, DomainInfo>
domain_info_map;
auto selected_metrics = this->selected_metrics();
// Calculate list of desired metrics, per domain.
if (selected_metrics.has_value()) {
for (const auto& metric : *selected_metrics) {
const auto* desc = instrument_detail::InstrumentIndex::Get().Find(metric);
GRPC_CHECK_NE(desc, nullptr) << "Metric not found: " << metric;
domain_info_map[desc->domain].metrics.push_back(desc);
}
} else {
instrument_detail::QueryableDomain::ForEachInstrument(
[&](const InstrumentMetadata::Description* desc) {
domain_info_map[desc->domain].metrics.push_back(desc);
});
}
// Calculate list of storage objects, per domain, that have at least one
// desired metric.
scope->ForEachUniqueStorage([&](instrument_detail::DomainStorage* storage) {
auto it = domain_info_map.find(storage->domain());
if (it == domain_info_map.end()) return;
it->second.storage.push_back(storage->Ref());
});
for (const auto& pair : domain_info_map) {
instrument_detail::QueryableDomain* domain = pair.first;
const auto& metrics = pair.second.metrics;
const auto& storages = pair.second.storage;
GRPC_CHECK(!metrics.empty());
if (storages.empty()) continue;
this->Apply(
domain->label_names(),
[&](MetricsSink& sink) {
for (auto& storage : storages) {
const auto label_values = storage->label();
const auto label_keys = domain->label_names();
instrument_detail::GaugeStorage gauge_storage(storage->domain());
storage->FillGaugeStorage(gauge_storage);
for (const auto* metric : metrics) {
Match(
metric->shape,
[metric, &sink, storage, &label_values,
&label_keys](InstrumentMetadata::CounterShape) {
sink.Counter(label_keys, label_values, metric->name,
storage->SumCounter(metric->offset));
},
[metric, &sink, storage, &label_values,
&label_keys](InstrumentMetadata::UpDownCounterShape) {
sink.UpDownCounter(label_keys, label_values, metric->name,
storage->SumCounter(metric->offset));
},
[metric, &sink, storage, &label_values,
&label_keys](InstrumentMetadata::HistogramShape bounds) {
std::vector<uint64_t> counts(bounds.size());
for (size_t i = 0; i < bounds.size(); ++i) {
counts[i] = storage->SumCounter(metric->offset + i);
}
sink.Histogram(label_keys, label_values, metric->name,
bounds, counts);
},
[metric, &sink, &gauge_storage, &label_values,
&label_keys](InstrumentMetadata::DoubleGaugeShape) {
if (auto value = gauge_storage.GetDouble(metric->offset);
value.has_value()) {
sink.DoubleGauge(label_keys, label_values, metric->name,
*value);
}
},
[metric, &sink, &gauge_storage, &label_values,
&label_keys](InstrumentMetadata::IntGaugeShape) {
if (auto value = gauge_storage.GetInt(metric->offset);
value.has_value()) {
sink.IntGauge(label_keys, label_values, metric->name,
*value);
}
},
[metric, &sink, &gauge_storage, &label_values,
&label_keys](InstrumentMetadata::UintGaugeShape) {
if (auto value = gauge_storage.GetUint(metric->offset);
value.has_value()) {
sink.UintGauge(label_keys, label_values, metric->name,
*value);
}
});
}
}
},
sink);
}
}
void MetricsQuery::Apply(InstrumentLabelList label_names,
absl::FunctionRef<void(MetricsSink&)> fn,
MetricsSink& sink) const {
if (collapsed_labels_.empty()) {
ApplyLabelChecks(label_names, fn, sink);
return;
}
std::vector<size_t> include_labels;
InstrumentLabelList label_keys;
for (size_t i = 0; i < label_names.size(); ++i) {
if (!collapsed_labels_.contains(label_names[i])) {
include_labels.push_back(i);
label_keys.Append(label_names[i]);
}
}
if (include_labels.size() == label_names.size()) {
ApplyLabelChecks(label_names, fn, sink);
return;
}
class Filter final : public MetricsSink {
public:
explicit Filter(absl::Span<const size_t> include_labels,
InstrumentLabelList label_keys)
: include_labels_(include_labels), label_keys_(label_keys) {}
void Counter(InstrumentLabelList /* label_keys */,
absl::Span<const std::string> label_values,
absl::string_view name, uint64_t value) override {
uint64_counters_[ConstructKey(label_values, name)] += value;
}
void UpDownCounter(InstrumentLabelList /* label_keys */,
absl::Span<const std::string> label_values,
absl::string_view name, uint64_t value) override {
uint64_up_down_counters_[ConstructKey(label_values, name)] += value;
}
void Histogram(InstrumentLabelList /* label_keys */,
absl::Span<const std::string> label_values,
absl::string_view name, HistogramBuckets bounds,
absl::Span<const uint64_t> counts) override {
GRPC_CHECK_EQ(counts.size(), bounds.size());
auto it = histograms_.find(ConstructKey(label_values, name));
if (it == histograms_.end()) {
histograms_.emplace(std::piecewise_construct,
std::tuple(ConstructKey(label_values, name)),
std::tuple(bounds, counts));
} else {
if (it->second.bounds != bounds) {
LOG(FATAL) << "Histogram bounds mismatch for metric '" << name
<< "': {" << absl::StrJoin(it->second.bounds, ",")
<< "} vs {" << absl::StrJoin(bounds, ",") << "}";
}
for (size_t i = 0; i < counts.size(); ++i) {
it->second.counts[i] += counts[i];
}
}
}
void DoubleGauge(InstrumentLabelList /* label_keys */,
absl::Span<const std::string>, absl::string_view,
double) override {
// Not aggregatable
}
void IntGauge(InstrumentLabelList /* label_keys */,
absl::Span<const std::string>, absl::string_view,
int64_t) override {
// Not aggregatable
}
void UintGauge(InstrumentLabelList /* label_keys */,
absl::Span<const std::string>, absl::string_view,
uint64_t) override {
// Not aggregatable
}
void Publish(MetricsSink& sink) const {
for (const auto& [key, value] : uint64_counters_) {
sink.Counter(label_keys_, std::get<0>(key), std::get<1>(key), value);
}
for (const auto& [key, value] : uint64_up_down_counters_) {
sink.UpDownCounter(label_keys_, std::get<0>(key), std::get<1>(key),
value);
}
for (const auto& [key, value] : histograms_) {
sink.Histogram(label_keys_, std::get<0>(key), std::get<1>(key),
value.bounds, value.counts);
}
}
private:
std::tuple<std::vector<std::string>, absl::string_view> ConstructKey(
absl::Span<const std::string> label_values,
absl::string_view name) const {
std::vector<std::string> key;
key.reserve(include_labels_.size());
for (auto i : include_labels_) {
key.push_back(label_values[i]);
}
return std::tuple(std::move(key), name);
}
absl::Span<const size_t> include_labels_;
InstrumentLabelList label_keys_;
absl::flat_hash_map<std::tuple<std::vector<std::string>, absl::string_view>,
uint64_t>
uint64_counters_;
absl::flat_hash_map<std::tuple<std::vector<std::string>, absl::string_view>,
uint64_t>
uint64_up_down_counters_;
struct HistogramValue {
HistogramValue(HistogramBuckets bounds, absl::Span<const uint64_t> counts)
: bounds(bounds), counts(counts.begin(), counts.end()) {}
HistogramBuckets bounds;
std::vector<uint64_t> counts;
};
absl::flat_hash_map<std::tuple<std::vector<std::string>, absl::string_view>,
HistogramValue>
histograms_;
};
Filter filter(include_labels, label_keys);
ApplyLabelChecks(label_names, fn, filter);
filter.Publish(sink);
}
void MetricsQuery::ApplyLabelChecks(InstrumentLabelList label_names,
absl::FunctionRef<void(MetricsSink&)> fn,
MetricsSink& sink) const {
if (label_eqs_.empty()) {
fn(sink);
return;
}
struct LabelEq {
size_t offset;
absl::string_view value;
};
std::vector<LabelEq> label_eqs;
for (size_t i = 0; i < label_names.size(); ++i) {
const auto& label = label_names[i];
auto it = label_eqs_.find(label);
if (it != label_eqs_.end()) label_eqs.push_back({i, it->second});
}
// If there are labels to match, but this domain doesn't have all the labels
// requested, skip it - it can never match all!
if (label_eqs.size() < label_eqs_.size()) return;
class Filter final : public MetricsSink {
public:
explicit Filter(MetricsSink& sink,
absl::Span<const LabelEq> inclusion_checks)
: inclusion_checks_(inclusion_checks), sink_(sink) {}
void Counter(InstrumentLabelList label_keys,
absl::Span<const std::string> label_values,
absl::string_view name, uint64_t value) override {
if (!Matches(label_values)) return;
sink_.Counter(label_keys, label_values, name, value);
}
void UpDownCounter(InstrumentLabelList label_keys,
absl::Span<const std::string> label_values,
absl::string_view name, uint64_t value) override {
if (!Matches(label_values)) return;
sink_.UpDownCounter(label_keys, label_values, name, value);
}
void Histogram(InstrumentLabelList label_keys,
absl::Span<const std::string> label_values,
absl::string_view name, HistogramBuckets bounds,
absl::Span<const uint64_t> counts) override {
if (!Matches(label_values)) return;
sink_.Histogram(label_keys, label_values, name, bounds, counts);
}
void DoubleGauge(InstrumentLabelList label_keys,
absl::Span<const std::string> label_values,
absl::string_view name, double value) override {
if (!Matches(label_values)) return;
sink_.DoubleGauge(label_keys, label_values, name, value);
}
void IntGauge(InstrumentLabelList label_keys,
absl::Span<const std::string> label_values,
absl::string_view name, int64_t value) override {
if (!Matches(label_values)) return;
sink_.IntGauge(label_keys, label_values, name, value);
}
void UintGauge(InstrumentLabelList label_keys,
absl::Span<const std::string> label_values,
absl::string_view name, uint64_t value) override {
if (!Matches(label_values)) return;
sink_.UintGauge(label_keys, label_values, name, value);
}
private:
bool Matches(absl::Span<const std::string> label_values) const {
for (const auto& check : inclusion_checks_) {
if (label_values[check.offset] != check.value) return false;
}
return true;
}
absl::Span<const LabelEq> inclusion_checks_;
MetricsSink& sink_;
};
Filter filter(sink, label_eqs);
fn(filter);
}
namespace instrument_detail {
////////////////////////////////////////////////////////////////////////////////
// InstrumentIndex
const InstrumentMetadata::Description* InstrumentIndex::Register(
QueryableDomain* domain, uint64_t offset, absl::string_view name,
absl::string_view description, absl::string_view unit,
InstrumentMetadata::Shape shape) {
auto it = metrics_.emplace(
name, InstrumentMetadata::Description{domain, offset, name, description,
unit, shape});
if (!it.second) {
// If this is firing one of two things is true:
// 1. We have code in gRPC that's registering two different metrics with the
// same name. gRPC should fix this.
// 2. gRPC static initialization is executing twice. This is an unsupported
// use of the gRPC library and the application owner should fix it.
LOG(ERROR) << "Metric with name '" << name
<< "' registered more than once. Ignoring later registration.";
}
return &it.first->second;
}
const InstrumentMetadata::Description* InstrumentIndex::Find(
absl::string_view name) const {
auto it = metrics_.find(name);
if (it == metrics_.end()) {
return nullptr;
}
return &it->second;
}
////////////////////////////////////////////////////////////////////////////////
// DomainStorage
DomainStorage::DomainStorage(QueryableDomain* domain,
std::vector<std::string> label)
: DataSource(MakeRefCounted<channelz::MetricsDomainStorageNode>(
absl::StrCat(domain->name(), ":", absl::StrJoin(label, ",")))),
domain_(domain),
label_(std::move(label)) {
channelz_node()->AddParent(domain->channelz_node().get());
SourceConstructed();
}
void DomainStorage::Orphaned() { SourceDestructing(); }
void DomainStorage::AddData(channelz::DataSink sink) {
sink.AddData(
"domain_storage",
channelz::PropertyList()
.Set("label",
[this]() {
channelz::PropertyGrid grid;
for (size_t i = 0; i < label_.size(); ++i) {
grid.SetRow(
domain_->label_names()[i].label(),
channelz::PropertyList().Set("value", label_[i]));
}
return grid;
}())
.Set("metrics", [this]() {
GaugeStorage storage(domain_);
FillGaugeStorage(storage);
channelz::PropertyGrid grid;
for (const auto* metric : domain_->metrics_) {
Match(
metric->shape,
[&, this](InstrumentMetadata::CounterShape) {
grid.SetRow(metric->name,
channelz::PropertyList().Set(
"value", SumCounter(metric->offset)));
},
[&, this](InstrumentMetadata::UpDownCounterShape) {
grid.SetRow(metric->name,
channelz::PropertyList().Set(
"value", SumCounter(metric->offset)));
},
[&](InstrumentMetadata::DoubleGaugeShape) {
grid.SetRow(
metric->name,
channelz::PropertyList().Set(
"value", storage.GetDouble(metric->offset)));
},
[&](InstrumentMetadata::IntGaugeShape) {
grid.SetRow(metric->name,
channelz::PropertyList().Set(
"value", storage.GetInt(metric->offset)));
},
[&](InstrumentMetadata::UintGaugeShape) {
grid.SetRow(metric->name,
channelz::PropertyList().Set(
"value", storage.GetUint(metric->offset)));
},
[&](const InstrumentMetadata::HistogramShape& h) {
channelz::PropertyTable table;
for (size_t i = 0; i < h.size(); ++i) {
table.AppendRow(
channelz::PropertyList()
.Set("bucket_max", h[i])
.Set("count", SumCounter(metric->offset + i)));
}
grid.SetRow(metric->name, channelz::PropertyList().Set(
"value", std::move(table)));
});
}
return grid;
}()));
}
////////////////////////////////////////////////////////////////////////////////
// QueryableDomain
void QueryableDomain::AddData(channelz::DataSink sink) {
sink.AddData(
"domain",
channelz::PropertyList()
.Set("allocated_counter_slots", allocated_counter_slots_)
.Set("allocated_double_gauge_slots", allocated_double_gauge_slots_)
.Set("allocated_int_gauge_slots", allocated_int_gauge_slots_)
.Set("allocated_uint_gauge_slots", allocated_uint_gauge_slots_)
.Set("map_shards", map_shards_size_)
.Set("metrics",
[this]() {
channelz::PropertyGrid grid;
for (auto* metric : metrics_) {
grid.SetRow(
metric->name,
channelz::PropertyList()
.Set("description", metric->description)
.Set("unit", metric->unit)
.Set("offset", metric->offset)
.Set("shape",
Match(
metric->shape,
[](InstrumentMetadata::CounterShape)
-> std::string { return "counter"; },
[](InstrumentMetadata::UpDownCounterShape)
-> std::string {
return "up_down_counter";
},
[](InstrumentMetadata::DoubleGaugeShape)
-> std::string {
return "double_gauge";
},
[](InstrumentMetadata::IntGaugeShape)
-> std::string { return "int_gauge"; },
[](InstrumentMetadata::UintGaugeShape)
-> std::string { return "uint_gauge"; },
[](const InstrumentMetadata::HistogramShape&
h) -> std::string {
return absl::StrCat(
"histogram:", absl::StrJoin(h, ","));
})));
}
return grid;
}())
.Set("labels", absl::StrJoin(label_names_, ",")));
}
void QueryableDomain::Constructed() {
GRPC_CHECK_EQ(prev_, nullptr);
prev_ = last_;
last_ = this;
}
void QueryableDomain::ForEachInstrument(
absl::FunctionRef<void(const InstrumentMetadata::Description*)> fn) {
for (auto* domain = last_; domain != nullptr; domain = domain->prev_) {
for (const auto* metric : domain->metrics_) {
fn(metric);
}
}
}
size_t QueryableDomain::TestOnlyCountStorageHeld() const {
size_t count = 0;
for (size_t i = 0; i < map_shards_size_; ++i) {
MutexLock lock(&map_shards_[i].mu);
map_shards_[i].storage_map.ForEach(
[&count](const auto&, const auto&) { count++; });
}
return count;
}
void QueryableDomain::DomainStorageOrphaned(DomainStorage* /*storage*/) {}
QueryableDomain::MapShard& QueryableDomain::GetMapShard(
absl::Span<const std::string> label) {
size_t shard;
if (map_shards_size_ == 1) {
shard = 0;
} else {
GRPC_CHECK(!label.empty());
// Use the first label to shard, all labels to index.
shard = absl::HashOf(label[0], this) % map_shards_size_;
}
return map_shards_[shard];
}
void QueryableDomain::TestOnlyReset() {
channelz_.Reset();
map_shards_ = std::make_unique<MapShard[]>(map_shards_size_);
}
const InstrumentMetadata::Description* QueryableDomain::AllocateCounter(
absl::string_view name, absl::string_view description,
absl::string_view unit) {
const size_t offset = allocated_counter_slots_++;
auto* desc =
InstrumentIndex::Get().Register(this, offset, name, description, unit,
InstrumentMetadata::CounterShape{});
metrics_.push_back(desc);
return desc;
}
const InstrumentMetadata::Description* QueryableDomain::AllocateUpDownCounter(
absl::string_view name, absl::string_view description,
absl::string_view unit) {
const size_t offset = allocated_counter_slots_++;
auto* desc =
InstrumentIndex::Get().Register(this, offset, name, description, unit,
InstrumentMetadata::UpDownCounterShape{});
metrics_.push_back(desc);
return desc;
}
const InstrumentMetadata::Description* QueryableDomain::AllocateHistogram(
absl::string_view name, absl::string_view description,
absl::string_view unit, HistogramBuckets bounds) {
const size_t offset = AllocateCounterSlots(bounds.size());
auto* desc = InstrumentIndex::Get().Register(this, offset, name, description,
unit, bounds);
metrics_.push_back(desc);
return desc;
}
const InstrumentMetadata::Description* QueryableDomain::AllocateDoubleGauge(
absl::string_view name, absl::string_view description,
absl::string_view unit) {
const size_t offset = allocated_double_gauge_slots_++;
auto* desc =
InstrumentIndex::Get().Register(this, offset, name, description, unit,
InstrumentMetadata::DoubleGaugeShape{});
metrics_.push_back(desc);
return desc;
}
const InstrumentMetadata::Description* QueryableDomain::AllocateIntGauge(
absl::string_view name, absl::string_view description,
absl::string_view unit) {
const size_t offset = allocated_int_gauge_slots_++;
auto* desc =
InstrumentIndex::Get().Register(this, offset, name, description, unit,
InstrumentMetadata::IntGaugeShape{});
metrics_.push_back(desc);
return desc;
}
const InstrumentMetadata::Description* QueryableDomain::AllocateUintGauge(
absl::string_view name, absl::string_view description,
absl::string_view unit) {
const size_t offset = allocated_uint_gauge_slots_++;
auto* desc =
InstrumentIndex::Get().Register(this, offset, name, description, unit,
InstrumentMetadata::UintGaugeShape{});
metrics_.push_back(desc);
return desc;
}
void QueryableDomain::TestOnlyResetAll() {
for (auto* domain = last_; domain != nullptr; domain = domain->prev_) {
domain->TestOnlyReset();
}
}
RefCountedPtr<DomainStorage> QueryableDomain::GetDomainStorage(
RefCountedPtr<CollectionScope> scope,
absl::Span<const std::string> label_values) {
auto key_labels =
FilterLabels(label_names_, scope->labels_of_interest_, label_values);
if (scope->parents_.size() == 1 && scope->parents_[0] != nullptr) {
auto parent_key_labels = FilterLabels(
label_names_, scope->parents_[0]->labels_of_interest_, label_values);
if (key_labels == parent_key_labels) {
return GetDomainStorage(scope->parents_[0], label_values);
}
}
size_t shard_idx = absl::HashOf(key_labels) % scope->storage_shards_.size();
auto& shard = scope->storage_shards_[shard_idx];
MutexLock lock(&shard.mu);
auto it = shard.storage.find({this, key_labels});
if (it != shard.storage.end()) {
return it->second;
}
auto storage = CreateDomainStorage(key_labels);
shard.storage.emplace(std::pair(this, key_labels), storage);
return storage;
}
} // namespace instrument_detail
LowContentionBackend::LowContentionBackend(size_t size)
: counters_(new std::atomic<uint64_t>[size]) {
for (size_t i = 0; i < size; ++i) {
counters_[i].store(0, std::memory_order_relaxed);
}
}
uint64_t LowContentionBackend::Sum(size_t index) {
return counters_[index].load(std::memory_order_relaxed);
}
HighContentionBackend::HighContentionBackend(size_t size) {
for (auto& shard : counters_) {
shard = std::make_unique<std::atomic<int64_t>[]>(size);
for (size_t i = 0; i < size; ++i) {
shard[i].store(0, std::memory_order_relaxed);
}
}
}
uint64_t HighContentionBackend::Sum(size_t index) {
uint64_t positive_sum = 0;
uint64_t negative_sum = 0;
for (auto& shard : counters_) {
int64_t value = shard[index].load(std::memory_order_relaxed);
if (value > 0) {
positive_sum += value;
} else if (value < 0) {
negative_sum += -value;
}
}
// Every decrement should have a corresponding increment.
GRPC_CHECK(positive_sum >= negative_sum);
return positive_sum - negative_sum;
}
class GlobalCollectionScopeManager {
public:
GlobalCollectionScopeManager(const GlobalCollectionScopeManager&) = delete;
GlobalCollectionScopeManager& operator=(const GlobalCollectionScopeManager&) =
delete;
static GlobalCollectionScopeManager& Get() {
static GlobalCollectionScopeManager* manager =
new GlobalCollectionScopeManager();
return *manager;
}
RefCountedPtr<CollectionScope> CreateRootScope(InstrumentLabelSet labels,
size_t child_shards_count,
size_t storage_shards_count) {
MutexLock lock(&mu_);
if (auto* building = std::get_if<Building>(&state_); building != nullptr) {
auto scope = CreateCollectionScope({}, labels, child_shards_count,
storage_shards_count);
building->root_scopes.push_back(scope);
return scope;
} else {
// Global scope is already created, we can no longer subset labels.
auto& published = std::get<Published>(state_);
std::vector<absl::string_view> missing_labels;
for (const auto& label : labels.ToList()) {
if (!published.global_scope->ObservesLabel(label)) {
missing_labels.push_back(label.label());
}
}
if (missing_labels.empty()) {
LOG(ERROR) << "Attempt to create a root scope with labels ["
<< absl::StrJoin(labels.ToList(), ", ")
<< "] after the global scope was already created. "
"All requested labels are collected by the global scope, "
"so this scope will be returned instead. "
"To eliminate this message, ensure the root scope "
"creation that triggered it occurs before the first call "
"to GlobalCollectionScope().";
} else {
LOG(ERROR) << "Attempt to create a root scope with labels ["
<< absl::StrJoin(labels.ToList(), ", ")
<< "] after the global scope was already created. "
"The following labels are not collected by the global "
"scope, and so will not be available: ["
<< absl::StrJoin(missing_labels, ", ")
<< "]."
"To eliminate this message, ensure the root scope "
"creation that triggered it occurs before the first call "
"to GlobalCollectionScope().";
}
return published.global_scope;
}
}
RefCountedPtr<CollectionScope> GetGlobalScope() {
MutexLock lock(&mu_);
if (auto* building = std::get_if<Building>(&state_); building != nullptr) {
auto global_scope = CreateCollectionScope(building->root_scopes,
InstrumentLabelSet(), 32, 32);
state_ = Published{global_scope};
return global_scope;
} else {
return std::get<Published>(state_).global_scope;
}
}
void TestOnlyReset() {
std::variant<Building, Published> state;
MutexLock lock(&mu_);
state = std::exchange(state_, Building{});
if (auto* published = std::get_if<Published>(&state);
published != nullptr) {
published->global_scope->TestOnlyReset();
}
}
private:
GlobalCollectionScopeManager() = default;
struct Building {
std::vector<RefCountedPtr<CollectionScope>> root_scopes;
};
struct Published {
RefCountedPtr<CollectionScope> global_scope;
};
Mutex mu_;
std::variant<Building, Published> state_ ABSL_GUARDED_BY(mu_);
};
RefCountedPtr<CollectionScope> CreateRootCollectionScope(
InstrumentLabelSet labels, size_t child_shards_count,
size_t storage_shards_count) {
return GlobalCollectionScopeManager::Get().CreateRootScope(
labels, child_shards_count, storage_shards_count);
}
RefCountedPtr<CollectionScope> GlobalCollectionScope() {
return GlobalCollectionScopeManager::Get().GetGlobalScope();
}
void TestOnlyResetInstruments() {
Hook* hook = hooks.load(std::memory_order_acquire);
while (hook != nullptr) {
Hook* next = hook->next;
delete hook;
hook = next;
}
hooks.store(nullptr, std::memory_order_release);
instrument_detail::QueryableDomain::TestOnlyResetAll();
GlobalCollectionScopeManager::Get().TestOnlyReset();
}
} // namespace grpc_core