blob: ca8d02148ddc329cb38b6bc4a3f308e13102b406 [file]
//
//
// Copyright 2023 gRPC authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
//
#include "src/cpp/ext/otel/otel_plugin.h"
#include <grpc/support/port_platform.h>
#include <grpcpp/ext/otel_plugin.h>
#include <grpcpp/version_info.h>
#include <memory>
#include <type_traits>
#include <utility>
#include <variant>
#include "opentelemetry/metrics/async_instruments.h"
#include "opentelemetry/metrics/meter.h"
#include "opentelemetry/metrics/meter_provider.h"
#include "opentelemetry/metrics/sync_instruments.h"
#include "opentelemetry/nostd/string_view.h"
#include "opentelemetry/nostd/variant.h"
#include "opentelemetry/trace/context.h"
#include "opentelemetry/trace/span_context.h"
#include "src/core/client_channel/client_channel_filter.h"
#include "src/core/config/core_configuration.h"
#include "src/core/lib/channel/channel_args.h"
#include "src/core/lib/surface/channel_stack_type.h"
#include "src/core/telemetry/call_tracer.h"
#include "src/core/telemetry/instrument.h"
#include "src/core/util/grpc_check.h"
#include "src/core/util/match.h"
#include "src/cpp/ext/otel/key_value_iterable.h"
#include "src/cpp/ext/otel/otel_client_call_tracer.h"
#include "src/cpp/ext/otel/otel_server_call_tracer.h"
#include "absl/log/log.h"
#include "absl/strings/escaping.h"
using opentelemetry::context::propagation::TextMapPropagator;
using opentelemetry::trace::SpanContext;
using opentelemetry::trace::SpanId;
using opentelemetry::trace::TraceId;
namespace grpc {
namespace internal {
namespace {
// Reports whether the named instrument belongs to the default metric set.
// The name is ignored: this always answers "no".
bool IsMetricEnabledByDefault(absl::string_view /*metric_name*/) {
  return false;
}
} // namespace
// Reports whether `label_key` is treated as an optional label. At present only
// keys that begin with "test_optional." qualify.
bool IsOpenTelemetryLabelOptional(absl::string_view label_key) {
  // TODO(ctiller): register other optional labels here with
  // `if (label_key == "xyz") return true;` checks.
  constexpr absl::string_view kTestOptionalPrefix = "test_optional.";
  return absl::StartsWith(label_key, kTestOptionalPrefix);
}
// Returns the attribute key string "grpc.method".
absl::string_view OpenTelemetryMethodKey() {
  return absl::string_view("grpc.method");
}

// Returns the attribute key string "grpc.status".
absl::string_view OpenTelemetryStatusKey() {
  return absl::string_view("grpc.status");
}

// Returns the attribute key string "grpc.target".
absl::string_view OpenTelemetryTargetKey() {
  return absl::string_view("grpc.target");
}
namespace {
// Computes the set of metric names a freshly constructed builder starts with:
//  - the eight per-call client-attempt / server-call instruments listed below,
//  - every GlobalInstrumentsRegistry instrument flagged `enable_by_default`,
//  - every InstrumentMetadata instrument accepted by
//    IsMetricEnabledByDefault().
absl::flat_hash_set<std::string> BaseMetrics() {
  using PluginBuilder = grpc::OpenTelemetryPluginBuilder;
  absl::flat_hash_set<std::string> enabled;
  for (const absl::string_view per_call_metric :
       {PluginBuilder::kClientAttemptStartedInstrumentName,
        PluginBuilder::kClientAttemptDurationInstrumentName,
        PluginBuilder::
            kClientAttemptSentTotalCompressedMessageSizeInstrumentName,
        PluginBuilder::
            kClientAttemptRcvdTotalCompressedMessageSizeInstrumentName,
        PluginBuilder::kServerCallStartedInstrumentName,
        PluginBuilder::kServerCallDurationInstrumentName,
        PluginBuilder::kServerCallSentTotalCompressedMessageSizeInstrumentName,
        PluginBuilder::
            kServerCallRcvdTotalCompressedMessageSizeInstrumentName}) {
    enabled.emplace(per_call_metric);
  }
  // Registry instruments that declare themselves on-by-default.
  grpc_core::GlobalInstrumentsRegistry::ForEach(
      [&enabled](const grpc_core::GlobalInstrumentsRegistry::
                     GlobalInstrumentDescriptor& descriptor) {
        if (!descriptor.enable_by_default) return;
        enabled.emplace(descriptor.name);
      });
  // Metadata-described instruments that pass the default filter.
  grpc_core::InstrumentMetadata::ForEachInstrument(
      [&enabled](
          const grpc_core::InstrumentMetadata::Description* description) {
        if (!IsMetricEnabledByDefault(description->name)) return;
        enabled.emplace(description->name);
      });
  return enabled;
}
} // namespace
// KeyValueIterable that presents an instrument's required labels followed by
// the optional labels whose bit is set in `optional_labels_bits`.
//
// The object only keeps spans of, and a reference to, the arguments it is
// constructed with; it copies none of them.
class OpenTelemetryPluginImpl::NPCMetricsKeyValueIterable
    : public opentelemetry::common::KeyValueIterable {
 public:
  NPCMetricsKeyValueIterable(
      absl::Span<const absl::string_view> label_keys,
      absl::Span<const absl::string_view> label_values,
      absl::Span<const absl::string_view> optional_label_keys,
      absl::Span<const absl::string_view> optional_label_values,
      const OptionalLabelsBitSet& optional_labels_bits)
      : label_keys_(label_keys),
        label_values_(label_values),
        optional_label_keys_(optional_label_keys),
        optional_label_values_(optional_label_values),
        optional_labels_bits_(optional_labels_bits) {}

  // Invokes `callback` for each required label and then for each enabled
  // optional label. Stops and returns false as soon as `callback` returns
  // false; returns true otherwise.
  bool ForEachKeyValue(opentelemetry::nostd::function_ref<
                       bool(opentelemetry::nostd::string_view,
                            opentelemetry::common::AttributeValue)>
                           callback) const noexcept override {
    const size_t required_count = label_keys_.size();
    for (size_t idx = 0; idx != required_count; ++idx) {
      const bool keep_going =
          callback(AbslStrViewToOpenTelemetryStrView(label_keys_[idx]),
                   AbslStrViewToOpenTelemetryStrView(label_values_[idx]));
      if (!keep_going) return false;
    }
    // Since we are saving the optional label values as std::string for
    // callback gauges, disabled optional label values may already have been
    // filtered out to minimize memory usage. In that case the values are
    // packed and must be consumed in order; otherwise they line up
    // index-for-index with the optional keys.
    const bool values_prefiltered =
        optional_label_values_.size() < optional_label_keys_.size();
    size_t next_packed_value = 0;
    for (size_t idx = 0; idx < optional_label_keys_.size(); ++idx) {
      if (!optional_labels_bits_.test(idx)) continue;
      const size_t value_idx = values_prefiltered ? next_packed_value++ : idx;
      const bool keep_going = callback(
          AbslStrViewToOpenTelemetryStrView(optional_label_keys_[idx]),
          AbslStrViewToOpenTelemetryStrView(
              optional_label_values_[value_idx]));
      if (!keep_going) return false;
    }
    return true;
  }

  // Number of required labels plus the number of enabled optional labels.
  size_t size() const noexcept override {
    return label_keys_.size() + optional_labels_bits_.count();
  }

 private:
  absl::Span<const absl::string_view> label_keys_;
  absl::Span<const absl::string_view> label_values_;
  absl::Span<const absl::string_view> optional_label_keys_;
  absl::Span<const absl::string_view> optional_label_values_;
  const OptionalLabelsBitSet& optional_labels_bits_;
};
//
// OpenTelemetryPluginBuilderImpl
//
// Starts the builder with the metric set returned by BaseMetrics(); callers
// adjust it afterwards with EnableMetrics()/DisableMetrics()/
// DisableAllMetrics().
OpenTelemetryPluginBuilderImpl::OpenTelemetryPluginBuilderImpl()
: metrics_(BaseMetrics()) {}
OpenTelemetryPluginBuilderImpl::~OpenTelemetryPluginBuilderImpl() = default;
// Stores the meter provider that Build()/BuildAndRegisterGlobal() pass on to
// the plugin. Returns *this so that calls can be chained.
OpenTelemetryPluginBuilderImpl&
OpenTelemetryPluginBuilderImpl::SetMeterProvider(
std::shared_ptr<opentelemetry::metrics::MeterProvider> meter_provider) {
meter_provider_ = std::move(meter_provider);
return *this;
}
// Adds every name in `metric_names` to the enabled-metric set. Names that are
// already present are left as they are. Returns *this for chaining.
OpenTelemetryPluginBuilderImpl& OpenTelemetryPluginBuilderImpl::EnableMetrics(
    absl::Span<const absl::string_view> metric_names) {
  for (const absl::string_view name : metric_names) {
    metrics_.emplace(name);
  }
  return *this;
}
// Removes every name in `metric_names` from the enabled-metric set. Names that
// are not present are ignored. Returns *this for chaining.
OpenTelemetryPluginBuilderImpl& OpenTelemetryPluginBuilderImpl::DisableMetrics(
    absl::Span<const absl::string_view> metric_names) {
  for (const absl::string_view name : metric_names) {
    metrics_.erase(name);
  }
  return *this;
}
// Empties the enabled-metric set, including the defaults installed by the
// constructor. Returns *this for chaining.
OpenTelemetryPluginBuilderImpl&
OpenTelemetryPluginBuilderImpl::DisableAllMetrics() {
metrics_.clear();
return *this;
}
// Stores the target-attribute predicate that is later moved into the plugin by
// Build()/BuildAndRegisterGlobal(). Returns *this for chaining.
OpenTelemetryPluginBuilderImpl&
OpenTelemetryPluginBuilderImpl::SetTargetAttributeFilter(
absl::AnyInvocable<bool(absl::string_view /*target*/) const>
target_attribute_filter) {
target_attribute_filter_ = std::move(target_attribute_filter);
return *this;
}
// Stores the generic-method predicate that is later moved into the plugin by
// Build()/BuildAndRegisterGlobal(). Returns *this for chaining.
OpenTelemetryPluginBuilderImpl&
OpenTelemetryPluginBuilderImpl::SetGenericMethodAttributeFilter(
absl::AnyInvocable<bool(absl::string_view /*generic_method*/) const>
generic_method_attribute_filter) {
generic_method_attribute_filter_ = std::move(generic_method_attribute_filter);
return *this;
}
// Stores the server selector that is later moved into the plugin by
// Build()/BuildAndRegisterGlobal(). Returns *this for chaining.
OpenTelemetryPluginBuilderImpl&
OpenTelemetryPluginBuilderImpl::SetServerSelector(
absl::AnyInvocable<bool(const grpc_core::ChannelArgs& /*args*/) const>
server_selector) {
server_selector_ = std::move(server_selector);
return *this;
}
// Appends `option` to the list of plugin options, taking ownership of it.
// The check below fails if 64 options are already registered. Returns *this
// for chaining.
OpenTelemetryPluginBuilderImpl& OpenTelemetryPluginBuilderImpl::AddPluginOption(
std::unique_ptr<InternalOpenTelemetryPluginOption> option) {
// We allow a limit of 64 plugin options to be registered at this time.
GRPC_CHECK_LT(plugin_options_.size(), 64u);
plugin_options_.push_back(std::move(option));
return *this;
}
// Records `optional_label_key` as an optional label to enable. Returns *this
// for chaining.
// NOTE(review): the plugin constructor receives these keys as a set of
// absl::string_view, so presumably only the view is stored here and the caller
// must keep the underlying characters alive — confirm against the header.
OpenTelemetryPluginBuilderImpl&
OpenTelemetryPluginBuilderImpl::AddOptionalLabel(
absl::string_view optional_label_key) {
optional_label_keys_.emplace(optional_label_key);
return *this;
}
// Stores the tracer provider that is later moved into the plugin by
// Build()/BuildAndRegisterGlobal(). Returns *this for chaining.
OpenTelemetryPluginBuilderImpl&
OpenTelemetryPluginBuilderImpl::SetTracerProvider(
std::shared_ptr<opentelemetry::trace::TracerProvider> tracer_provider) {
tracer_provider_ = std::move(tracer_provider);
return *this;
}
// Takes ownership of the text-map propagator that is later moved into the
// plugin by Build()/BuildAndRegisterGlobal(). Returns *this for chaining.
OpenTelemetryPluginBuilderImpl&
OpenTelemetryPluginBuilderImpl::SetTextMapPropagator(
std::unique_ptr<TextMapPropagator> text_map_propagator) {
text_map_propagator_ = std::move(text_map_propagator);
return *this;
}
// Stores the channel-scope predicate that is later moved into the plugin by
// Build()/BuildAndRegisterGlobal(). Returns *this for chaining.
OpenTelemetryPluginBuilderImpl&
OpenTelemetryPluginBuilderImpl::SetChannelScopeFilter(
absl::AnyInvocable<
bool(const OpenTelemetryPluginBuilder::ChannelScope& /*scope*/) const>
channel_scope_filter) {
channel_scope_filter_ = std::move(channel_scope_filter);
return *this;
}
// Constructs the plugin from the builder's current configuration and registers
// it with the global stats plugin registry.
//
// Returns InvalidArgumentError when neither a meter provider nor a tracer
// provider has been set; OkStatus otherwise. The metric set and meter provider
// are passed as they are; every other configured member is moved out of the
// builder.
absl::Status OpenTelemetryPluginBuilderImpl::BuildAndRegisterGlobal() {
  const bool has_provider =
      meter_provider_ != nullptr || tracer_provider_ != nullptr;
  if (!has_provider) {
    return absl::InvalidArgumentError(
        "Need to configure a valid meter provider or tracer provider.");
  }
  auto plugin = std::make_shared<OpenTelemetryPluginImpl>(
      metrics_, meter_provider_, std::move(target_attribute_filter_),
      std::move(generic_method_attribute_filter_),
      std::move(server_selector_), std::move(plugin_options_),
      std::move(optional_label_keys_), std::move(tracer_provider_),
      std::move(text_map_propagator_), std::move(channel_scope_filter_));
  grpc_core::GlobalStatsPluginRegistry::RegisterStatsPlugin(std::move(plugin));
  return absl::OkStatus();
}
// Constructs the plugin from the builder's current configuration and returns
// it without registering it anywhere.
//
// Returns InvalidArgumentError when neither a meter provider nor a tracer
// provider has been set. The metric set and meter provider are passed as they
// are; every other configured member is moved out of the builder.
absl::StatusOr<std::shared_ptr<grpc::experimental::OpenTelemetryPlugin>>
OpenTelemetryPluginBuilderImpl::Build() {
  const bool has_provider =
      meter_provider_ != nullptr || tracer_provider_ != nullptr;
  if (!has_provider) {
    return absl::InvalidArgumentError(
        "Need to configure a valid meter provider or tracer provider.");
  }
  return std::make_shared<OpenTelemetryPluginImpl>(
      metrics_, meter_provider_, std::move(target_attribute_filter_),
      std::move(generic_method_attribute_filter_), std::move(server_selector_),
      std::move(plugin_options_), std::move(optional_label_keys_),
      std::move(tracer_provider_), std::move(text_map_propagator_),
      std::move(channel_scope_filter_));
}
// Binds the reporter to `ot_plugin` and the registered callback `key`, then
// resets the per-callback cache of every gauge that `key` covers.
OpenTelemetryPluginImpl::CallbackMetricReporter::CallbackMetricReporter(
    OpenTelemetryPluginImpl* ot_plugin,
    grpc_core::RegisteredMetricCallback* key)
    : ot_plugin_(ot_plugin), key_(key) {
  using Registry = grpc_core::GlobalInstrumentsRegistry;
  // Since we are updating the timestamp and updating the cache for all
  // registered instruments in a RegisteredMetricCallback, we will need to
  // clear all the cache cells for this RegisteredMetricCallback first, so
  // that if a particular combination of labels was previously present but
  // is no longer present, we won't continue to report it.
  for (const auto& handle : key->metrics()) {
    const auto& descriptor = Registry::GetInstrumentDescriptor(handle);
    GRPC_CHECK(
        descriptor.instrument_type ==
        grpc_core::GlobalInstrumentsRegistry::InstrumentType::kCallbackGauge);
    // `caches[key]` intentionally uses operator[]: it creates the (empty)
    // cache cell for this callback if it does not exist yet.
    if (descriptor.value_type == Registry::ValueType::kInt64) {
      auto& int_gauge = std::get<std::unique_ptr<CallbackGaugeState<int64_t>>>(
          ot_plugin_->instruments_data_.at(handle.index).instrument);
      int_gauge->caches[key].clear();
    } else if (descriptor.value_type == Registry::ValueType::kDouble) {
      auto& double_gauge =
          std::get<std::unique_ptr<CallbackGaugeState<double>>>(
              ot_plugin_->instruments_data_.at(handle.index).instrument);
      double_gauge->caches[key].clear();
    } else {
      grpc_core::Crash(absl::StrFormat(
          "Unknown or unsupported value type: %d", descriptor.value_type));
    }
  }
}
// Records `value` for the int64 callback gauge `handle` in this callback's
// cache cell, keyed by the required label values followed by the enabled
// optional label values.
//
// The instrument must hold an int64 callback gauge and the label counts must
// match the instrument descriptor (checked). If this callback has no cache
// cell for the gauge, an error is logged and the value is dropped.
void OpenTelemetryPluginImpl::CallbackMetricReporter::ReportInt64(
    grpc_core::GlobalInstrumentsRegistry::GlobalInstrumentHandle handle,
    int64_t value, absl::Span<const absl::string_view> label_values,
    absl::Span<const absl::string_view> optional_values) {
  const auto& instrument_data = ot_plugin_->instruments_data_.at(handle.index);
  auto* callback_gauge_state =
      std::get_if<std::unique_ptr<CallbackGaugeState<int64_t>>>(
          &instrument_data.instrument);
  GRPC_CHECK_NE(callback_gauge_state, nullptr);
  const auto& descriptor =
      grpc_core::GlobalInstrumentsRegistry::GetInstrumentDescriptor(handle);
  GRPC_CHECK(descriptor.label_keys.size() == label_values.size());
  GRPC_CHECK(descriptor.optional_label_keys.size() == optional_values.size());
  auto& caches = (*callback_gauge_state)->caches;
  const auto cache_it = caches.find(key_);
  if (cache_it == caches.end()) {
    LOG(ERROR) << "This may occur when the gauge used in AddCallback is "
                  "different from the gauge used in Report. This indicates a "
                  "misuse of the API. The value "
               << value << " will not be recorded for instrument "
               << handle.index;
    return;
  }
  // Cache key: required label values, then enabled optional label values.
  std::vector<std::string> cell_key;
  cell_key.reserve(label_values.size() +
                   instrument_data.optional_labels_bits.count());
  for (const absl::string_view label_value : label_values) {
    cell_key.emplace_back(label_value);
  }
  for (size_t idx = 0; idx < optional_values.size(); ++idx) {
    if (!instrument_data.optional_labels_bits.test(idx)) continue;
    cell_key.emplace_back(optional_values[idx]);
  }
  cache_it->second.insert_or_assign(std::move(cell_key), value);
}
// Records `value` for the double callback gauge `handle` in this callback's
// cache cell, keyed by the required label values followed by the enabled
// optional label values.
//
// The instrument must hold a double callback gauge and the label counts must
// match the instrument descriptor (checked). If this callback has no cache
// cell for the gauge, an error is logged and the value is dropped.
void OpenTelemetryPluginImpl::CallbackMetricReporter::ReportDouble(
    grpc_core::GlobalInstrumentsRegistry::GlobalInstrumentHandle handle,
    double value, absl::Span<const absl::string_view> label_values,
    absl::Span<const absl::string_view> optional_values) {
  const auto& instrument_data = ot_plugin_->instruments_data_.at(handle.index);
  auto* callback_gauge_state =
      std::get_if<std::unique_ptr<CallbackGaugeState<double>>>(
          &instrument_data.instrument);
  GRPC_CHECK_NE(callback_gauge_state, nullptr);
  const auto& descriptor =
      grpc_core::GlobalInstrumentsRegistry::GetInstrumentDescriptor(handle);
  GRPC_CHECK(descriptor.label_keys.size() == label_values.size());
  GRPC_CHECK(descriptor.optional_label_keys.size() == optional_values.size());
  auto& caches = (*callback_gauge_state)->caches;
  const auto cache_it = caches.find(key_);
  if (cache_it == caches.end()) {
    LOG(ERROR) << "This may occur when the gauge used in AddCallback is "
                  "different from the gauge used in Report. This indicates a "
                  "misuse of the API. The value "
               << value << " will not be recorded for instrument "
               << handle.index;
    return;
  }
  // Cache key: required label values, then enabled optional label values.
  std::vector<std::string> cell_key;
  cell_key.reserve(label_values.size() +
                   instrument_data.optional_labels_bits.count());
  for (const absl::string_view label_value : label_values) {
    cell_key.emplace_back(label_value);
  }
  for (size_t idx = 0; idx < optional_values.size(); ++idx) {
    if (!instrument_data.optional_labels_bits.test(idx)) continue;
    cell_key.emplace_back(optional_values[idx]);
  }
  cache_it->second.insert_or_assign(std::move(cell_key), value);
}
// Forwards to the owning plugin's AddToChannelArguments(), which is given the
// chance to modify `args`.
void OpenTelemetryPluginImpl::ServerBuilderOption::UpdateArguments(
grpc::ChannelArguments* args) {
plugin_->AddToChannelArguments(args);
}
// Type-erased base for ExporterCallbackImpl<Exporter>. It exposes only a
// virtual destructor so instances can be destroyed through a base pointer.
class OpenTelemetryPluginImpl::ExporterCallback {
public:
virtual ~ExporterCallback() = default;
};
// Ties an `Exporter` to an OpenTelemetry observable instrument.
//
// The constructor registers a callback on `instrument` with `this` as its
// argument; the destructor removes it again. Each time the instrument invokes
// the callback, the observer result is handed to `Exporter::Export()`.
//
// Because the instrument holds a raw pointer to this object, the object must
// not be copied: a copy would never have been registered, yet its destructor
// would still try to unregister. Copying is therefore disabled.
template <class Exporter>
class OpenTelemetryPluginImpl::ExporterCallbackImpl final
    : public ExporterCallback {
 public:
  // `instrument` is the observable instrument to attach to; `args` are
  // forwarded to the `Exporter` constructor.
  template <class... Args>
  explicit ExporterCallbackImpl(
      opentelemetry::nostd::shared_ptr<
          opentelemetry::metrics::ObservableInstrument>
          instrument,
      Args&&... args)
      : exporter_(std::forward<Args>(args)...),
        instrument_(std::move(instrument)) {
    instrument_->AddCallback(Callback, this);
  }

  ~ExporterCallbackImpl() override {
    instrument_->RemoveCallback(Callback, this);
  }

  ExporterCallbackImpl(const ExporterCallbackImpl&) = delete;
  ExporterCallbackImpl& operator=(const ExporterCallbackImpl&) = delete;

 private:
  // Trampoline registered with the instrument; `arg` is the `this` pointer
  // passed to AddCallback().
  static void Callback(opentelemetry::metrics::ObserverResult result,
                       void* arg) {
    static_cast<ExporterCallbackImpl*>(arg)->exporter_.Export(
        std::move(result));
  }

  Exporter exporter_;
  const opentelemetry::nostd::shared_ptr<
      opentelemetry::metrics::ObservableInstrument>
      instrument_;
};
// KeyValueIterable over two parallel spans: `label_keys[i]` is paired with
// `label_values[i]`.
//
// Only the spans are stored; the strings they refer to are not copied. The
// two spans must have the same length (checked on construction).
class OpenTelemetryPluginImpl::ExportedMetricKeyValueIterable final
    : public opentelemetry::common::KeyValueIterable {
 public:
  explicit ExportedMetricKeyValueIterable(
      absl::Span<const std::string> label_keys,
      absl::Span<const std::string> label_values)
      : label_keys_(label_keys), label_values_(label_values) {
    // Use the GRPC_CHECK family, like the rest of this file.
    GRPC_CHECK_EQ(label_keys_.size(), label_values_.size());
  }

  // Invokes `callback` for each key/value pair in order. Stops and returns
  // false as soon as `callback` returns false; returns true otherwise.
  bool ForEachKeyValue(opentelemetry::nostd::function_ref<
                       bool(opentelemetry::nostd::string_view,
                            opentelemetry::common::AttributeValue)>
                           callback) const noexcept override {
    for (size_t i = 0; i < label_keys_.size(); ++i) {
      if (!callback(label_keys_[i],
                    opentelemetry::common::AttributeValue(label_values_[i]))) {
        return false;
      }
    }
    return true;
  }

  // Number of key/value pairs.
  size_t size() const noexcept override { return label_values_.size(); }

 private:
  const absl::Span<const std::string> label_keys_;
  const absl::Span<const std::string> label_values_;
};
// Exporter for instruments described as Counter or UpDownCounter shapes.
//
// Export() queries the plugin for the single metric named by `md` and forwards
// every reported value to an int64 OpenTelemetry observer result.
class OpenTelemetryPluginImpl::CounterExporter final {
 public:
  // `impl` is the plugin used to run the query; `md` describes the instrument
  // being exported. Both are stored as raw pointers and are not owned.
  explicit CounterExporter(OpenTelemetryPluginImpl* impl,
                           const grpc_core::InstrumentMetadata::Description* md)
      : impl_(impl), md_(md) {}

  // Runs the query for this instrument, sending each data point to `result`.
  // `result` must hold the int64 observer alternative.
  void Export(opentelemetry::metrics::ObserverResult result) {
    Sink sink(std::get<opentelemetry::nostd::shared_ptr<
                  opentelemetry::metrics::ObserverResultT<int64_t>>>(result),
              md_->shape);
    impl_->QueryMetrics({md_->name}, sink);
  }

 private:
  // MetricsSink that accepts only counter-like data points; any histogram or
  // gauge callback is a fatal error.
  class Sink final : public grpc_core::MetricsSink {
   public:
    explicit Sink(opentelemetry::nostd::shared_ptr<
                      opentelemetry::metrics::ObserverResultT<int64_t>>
                      observer,
                  grpc_core::InstrumentMetadata::Shape shape)
        : observer_(std::move(observer)), shape_(shape) {}

    void Counter(grpc_core::InstrumentLabelList label_keys,
                 absl::Span<const std::string> label_values, absl::string_view,
                 uint64_t value) override {
      GRPC_DCHECK(
          std::holds_alternative<grpc_core::InstrumentMetadata::CounterShape>(
              shape_));
      ObserveValue("Counter", label_keys, label_values, value);
    }

    void UpDownCounter(grpc_core::InstrumentLabelList label_keys,
                       absl::Span<const std::string> label_values,
                       absl::string_view, uint64_t value) override {
      GRPC_DCHECK(std::holds_alternative<
                  grpc_core::InstrumentMetadata::UpDownCounterShape>(shape_));
      ObserveValue("UpDownCounter", label_keys, label_values, value);
    }

    void Histogram(grpc_core::InstrumentLabelList,
                   absl::Span<const std::string>, absl::string_view,
                   grpc_core::HistogramBuckets,
                   absl::Span<const uint64_t>) override {
      LOG(FATAL) << "Expected a counter, got a histogram";
    }

    void DoubleGauge(grpc_core::InstrumentLabelList,
                     absl::Span<const std::string>, absl::string_view,
                     double) override {
      LOG(FATAL) << "Expected a counter, got a double gauge";
    }

    void IntGauge(grpc_core::InstrumentLabelList, absl::Span<const std::string>,
                  absl::string_view, int64_t) override {
      LOG(FATAL) << "Expected a counter, got an int gauge";
    }

    void UintGauge(grpc_core::InstrumentLabelList,
                   absl::Span<const std::string>, absl::string_view,
                   uint64_t) override {
      LOG(FATAL) << "Expected a counter, got a uint gauge";
    }

   private:
    // Shared body of Counter() and UpDownCounter(): materializes the label
    // keys as strings and reports `value` with those labels to the observer.
    // `kind` only labels the verbose log line.
    void ObserveValue(absl::string_view kind,
                      grpc_core::InstrumentLabelList label_keys,
                      absl::Span<const std::string> label_values,
                      uint64_t value) {
      std::vector<std::string> label_key_strings;
      label_key_strings.reserve(label_keys.size());
      for (const auto& label : label_keys) {
        label_key_strings.emplace_back(label.label());
      }
      // This runs for every exported data point, so it is verbose-only
      // rather than an error-level message.
      VLOG(2) << kind << ": " << value
              << " label_keys: " << absl::StrJoin(label_key_strings, ",")
              << " label_values: " << absl::StrJoin(label_values, ",");
      ExportedMetricKeyValueIterable labels_iterable(label_key_strings,
                                                     label_values);
      observer_->Observe(value, labels_iterable);
    }

    const opentelemetry::nostd::shared_ptr<
        opentelemetry::metrics::ObserverResultT<int64_t>>
        observer_;
    const grpc_core::InstrumentMetadata::Shape shape_;
  };

  OpenTelemetryPluginImpl* const impl_;
  const grpc_core::InstrumentMetadata::Description* const md_;
};
// Runs a metrics query restricted to the names in `metrics` against this
// plugin's collection scope, delivering the results to `sink`.
void OpenTelemetryPluginImpl::QueryMetrics(
    absl::Span<const absl::string_view> metrics, grpc_core::MetricsSink& sink) {
  // The query wants owned strings, so materialize the views first.
  std::vector<std::string> metric_names(metrics.begin(), metrics.end());
  grpc_core::MetricsQuery()
      .OnlyMetrics(std::move(metric_names))
      .Run(collection_scope_, sink);
}
namespace {
// Returns the "grpc-c++" tracer (versioned with GRPC_CPP_VERSION_STRING) from
// `tracer_provider`, or an empty pointer when no provider is given.
opentelemetry::nostd::shared_ptr<opentelemetry::trace::Tracer> MaybeMakeTracer(
    opentelemetry::trace::TracerProvider* tracer_provider) {
  if (tracer_provider != nullptr) {
    return tracer_provider->GetTracer("grpc-c++", GRPC_CPP_VERSION_STRING);
  }
  return opentelemetry::nostd::shared_ptr<opentelemetry::trace::Tracer>();
}
} // namespace
// Builds the plugin.
//
// All arguments except `metrics` and `optional_label_keys` are moved into
// members. When a meter provider is present, the constructor then:
//   1. creates the per-call client/server instruments named in `metrics`;
//   2. records which per-call optional labels were requested;
//   3. if the OtelExportTelemetryDomains experiment is on, creates observable
//      counters (with exporter callbacks) for the enabled metadata-described
//      instruments and a collection scope over their labels;
//   4. creates an instrument for every enabled GlobalInstrumentsRegistry
//      descriptor and records its enabled optional labels.
// Without a meter provider no instruments are created.
OpenTelemetryPluginImpl::OpenTelemetryPluginImpl(
    const absl::flat_hash_set<std::string>& metrics,
    opentelemetry::nostd::shared_ptr<opentelemetry::metrics::MeterProvider>
        meter_provider,
    absl::AnyInvocable<bool(absl::string_view /*target*/) const>
        target_attribute_filter,
    absl::AnyInvocable<bool(absl::string_view /*generic_method*/) const>
        generic_method_attribute_filter,
    absl::AnyInvocable<bool(const grpc_core::ChannelArgs& /*args*/) const>
        server_selector,
    std::vector<std::unique_ptr<InternalOpenTelemetryPluginOption>>
        plugin_options,
    const std::set<absl::string_view>& optional_label_keys,
    std::shared_ptr<opentelemetry::trace::TracerProvider> tracer_provider,
    std::unique_ptr<TextMapPropagator> text_map_propagator,
    absl::AnyInvocable<
        bool(const OpenTelemetryPluginBuilder::ChannelScope& /*scope*/) const>
        channel_scope_filter)
    : meter_provider_(std::move(meter_provider)),
      server_selector_(std::move(server_selector)),
      target_attribute_filter_(std::move(target_attribute_filter)),
      generic_method_attribute_filter_(
          std::move(generic_method_attribute_filter)),
      plugin_options_(std::move(plugin_options)),
      tracer_provider_(std::move(tracer_provider)),
      tracer_(MaybeMakeTracer(tracer_provider_.get())),
      text_map_propagator_(std::move(text_map_propagator)),
      channel_scope_filter_(std::move(channel_scope_filter)) {
  // Everything below creates metric instruments, which needs a meter.
  if (meter_provider_ == nullptr) return;

  using PluginBuilder = grpc::OpenTelemetryPluginBuilder;
  using Registry = grpc_core::GlobalInstrumentsRegistry;
  using Metadata = grpc_core::InstrumentMetadata;

  auto meter = meter_provider_->GetMeter("grpc-c++", GRPC_CPP_VERSION_STRING);
  const auto is_enabled = [&metrics](absl::string_view metric_name) {
    return metrics.contains(metric_name);
  };

  // Per-call metrics.
  if (is_enabled(PluginBuilder::kClientAttemptStartedInstrumentName)) {
    client_.attempt.started = meter->CreateUInt64Counter(
        std::string(PluginBuilder::kClientAttemptStartedInstrumentName),
        "Number of client call attempts started", "{attempt}");
  }
  if (is_enabled(PluginBuilder::kClientAttemptDurationInstrumentName)) {
    client_.attempt.duration = meter->CreateDoubleHistogram(
        std::string(PluginBuilder::kClientAttemptDurationInstrumentName),
        "End-to-end time taken to complete a client call attempt", "s");
  }
  if (is_enabled(
          PluginBuilder::
              kClientAttemptSentTotalCompressedMessageSizeInstrumentName)) {
    client_.attempt.sent_total_compressed_message_size =
        meter->CreateUInt64Histogram(
            std::string(
                PluginBuilder::
                    kClientAttemptSentTotalCompressedMessageSizeInstrumentName),
            "Compressed message bytes sent per client call attempt", "By");
  }
  if (is_enabled(
          PluginBuilder::
              kClientAttemptRcvdTotalCompressedMessageSizeInstrumentName)) {
    client_.attempt.rcvd_total_compressed_message_size =
        meter->CreateUInt64Histogram(
            std::string(
                PluginBuilder::
                    kClientAttemptRcvdTotalCompressedMessageSizeInstrumentName),
            "Compressed message bytes received per call attempt", "By");
  }
  if (is_enabled(PluginBuilder::kServerCallStartedInstrumentName)) {
    server_.call.started = meter->CreateUInt64Counter(
        std::string(PluginBuilder::kServerCallStartedInstrumentName),
        "Number of server calls started", "{call}");
  }
  if (is_enabled(PluginBuilder::kServerCallDurationInstrumentName)) {
    server_.call.duration = meter->CreateDoubleHistogram(
        std::string(PluginBuilder::kServerCallDurationInstrumentName),
        "End-to-end time taken to complete a call from server transport's "
        "perspective",
        "s");
  }
  if (is_enabled(
          PluginBuilder::
              kServerCallSentTotalCompressedMessageSizeInstrumentName)) {
    server_.call.sent_total_compressed_message_size =
        meter->CreateUInt64Histogram(
            std::string(
                PluginBuilder::
                    kServerCallSentTotalCompressedMessageSizeInstrumentName),
            "Compressed message bytes sent per server call", "By");
  }
  if (is_enabled(
          PluginBuilder::
              kServerCallRcvdTotalCompressedMessageSizeInstrumentName)) {
    server_.call.rcvd_total_compressed_message_size =
        meter->CreateUInt64Histogram(
            std::string(
                PluginBuilder::
                    kServerCallRcvdTotalCompressedMessageSizeInstrumentName),
            "Compressed message bytes received per server call", "By");
  }
  if (is_enabled(PluginBuilder::kClientCallRetriesInstrumentName)) {
    client_.call.retries = meter->CreateUInt64Histogram(
        std::string(PluginBuilder::kClientCallRetriesInstrumentName),
        "EXPERIMENTAL: Number of retries during the client call. If there "
        "were no retries, 0 is not reported.",
        "{retry}");
  }
  if (is_enabled(PluginBuilder::kClientCallTransparentRetriesInstrumentName)) {
    client_.call.transparent_retries = meter->CreateUInt64Histogram(
        std::string(PluginBuilder::kClientCallTransparentRetriesInstrumentName),
        "EXPERIMENTAL: Number of transparent retries during the client call. "
        "If there were no transparent retries, 0 is not reported.",
        "{transparent_retry}");
  }
  if (is_enabled(PluginBuilder::kClientCallRetryDelayInstrumentName)) {
    client_.call.retry_delay = meter->CreateDoubleHistogram(
        std::string(PluginBuilder::kClientCallRetryDelayInstrumentName),
        "EXPERIMENTAL: Total time of delay while there is no active attempt "
        "during the client call",
        "s");
  }

  // Store optional label keys for per call metrics
  GRPC_CHECK(static_cast<size_t>(
                 grpc_core::ClientCallTracerInterface::CallAttemptTracer::
                     OptionalLabelKey::kSize) <= kOptionalLabelsSizeLimit);
  for (const auto& requested_key : optional_label_keys) {
    // Keys that do not name a per-call optional label are skipped here.
    const auto per_call_key = OptionalLabelStringToKey(requested_key);
    if (!per_call_key.has_value()) continue;
    per_call_optional_label_bits_.set(static_cast<size_t>(*per_call_key));
  }

  if (grpc_core::IsOtelExportTelemetryDomainsEnabled()) {
    // gRPC metrics.
    absl::flat_hash_set<std::string> labels;
    Metadata::ForEachInstrument([&](const Metadata::Description* description) {
      if (!metrics.contains(description->name)) return;
      // Collect this instrument's labels; optional ones only when requested.
      for (const auto& label : description->domain->label_names()) {
        const bool wanted =
            !internal::IsOpenTelemetryLabelOptional(label.label()) ||
            optional_label_keys.count(label.label()) > 0;
        if (wanted) labels.insert(std::string(label.label()));
      }
      grpc_core::Match(
          description->shape,
          [&](Metadata::CounterShape) {
            auto instrument = meter->CreateInt64ObservableCounter(
                std::string(description->name),
                std::string(description->description),
                std::string(description->unit));
            exporter_callbacks_.push_back(
                std::make_unique<ExporterCallbackImpl<CounterExporter>>(
                    instrument, this, description));
          },
          [&](Metadata::UpDownCounterShape) {
            auto instrument = meter->CreateInt64ObservableUpDownCounter(
                std::string(description->name),
                std::string(description->description),
                std::string(description->unit));
            exporter_callbacks_.push_back(
                std::make_unique<ExporterCallbackImpl<CounterExporter>>(
                    instrument, this, description));
          },
          [&](Metadata::DoubleGaugeShape) {
            LOG(FATAL) << "Double gauge shape is not supported yet";
          },
          [&](Metadata::IntGaugeShape) {
            LOG(FATAL) << "Int gauge shape is not supported yet";
          },
          [&](Metadata::UintGaugeShape) {
            LOG(FATAL) << "Uint gauge shape is not supported yet";
          },
          [&](Metadata::HistogramShape) {
            LOG(FATAL) << "Histogram shape is not supported yet";
          });
    });
    grpc_core::InstrumentLabelSet label_set;
    for (const auto& label : labels) {
      label_set.Set(grpc_core::InstrumentLabel(label));
    }
    collection_scope_ = grpc_core::CreateCollectionScope({}, label_set);
  }

  // Non-per-call metrics.
  Registry::ForEach([&, this](const Registry::GlobalInstrumentDescriptor&
                                  descriptor) {
    GRPC_CHECK(descriptor.optional_label_keys.size() <=
               kOptionalLabelsSizeLimit);
    // Keep a slot for every registered descriptor, enabled or not, so that
    // handles can index instruments_data_ directly.
    if (instruments_data_.size() < descriptor.index + 1) {
      instruments_data_.resize(descriptor.index + 1);
    }
    if (!metrics.contains(descriptor.name)) return;

    auto& data = instruments_data_[descriptor.index];
    const std::string name(descriptor.name);
    const std::string description(descriptor.description);
    const std::string unit(descriptor.unit);
    switch (descriptor.instrument_type) {
      case Registry::InstrumentType::kCounter:
        switch (descriptor.value_type) {
          case Registry::ValueType::kUInt64:
            data.instrument =
                meter->CreateUInt64Counter(name, description, unit);
            break;
          case Registry::ValueType::kDouble:
            data.instrument =
                meter->CreateDoubleCounter(name, description, unit);
            break;
          default:
            grpc_core::Crash(
                absl::StrFormat("Unknown or unsupported value type: %d",
                                descriptor.value_type));
        }
        break;
      case Registry::InstrumentType::kHistogram:
        switch (descriptor.value_type) {
          case Registry::ValueType::kUInt64:
            data.instrument =
                meter->CreateUInt64Histogram(name, description, unit);
            break;
          case Registry::ValueType::kDouble:
            data.instrument =
                meter->CreateDoubleHistogram(name, description, unit);
            break;
          default:
            grpc_core::Crash(
                absl::StrFormat("Unknown or unsupported value type: %d",
                                descriptor.value_type));
        }
        break;
      case Registry::InstrumentType::kCallbackGauge:
        switch (descriptor.value_type) {
          case Registry::ValueType::kInt64: {
            auto gauge_state = std::make_unique<CallbackGaugeState<int64_t>>();
            gauge_state->id = descriptor.index;
            gauge_state->ot_plugin = this;
            gauge_state->instrument =
                meter->CreateInt64ObservableGauge(name, description, unit);
            data.instrument = std::move(gauge_state);
            break;
          }
          case Registry::ValueType::kDouble: {
            auto gauge_state = std::make_unique<CallbackGaugeState<double>>();
            gauge_state->id = descriptor.index;
            gauge_state->ot_plugin = this;
            gauge_state->instrument =
                meter->CreateDoubleObservableGauge(name, description, unit);
            data.instrument = std::move(gauge_state);
            break;
          }
          default:
            grpc_core::Crash(
                absl::StrFormat("Unknown or unsupported value type: %d",
                                descriptor.value_type));
        }
        break;
      default:
        grpc_core::Crash(absl::StrFormat("Unknown instrument_type: %d",
                                         descriptor.instrument_type));
    }
    // Remember which of the instrument's optional labels were requested.
    for (size_t i = 0; i < descriptor.optional_label_keys.size(); ++i) {
      if (optional_label_keys.count(descriptor.optional_label_keys[i]) > 0) {
        data.optional_labels_bits.set(i);
      }
    }
  });
}
// Detaches every callback gauge from its OpenTelemetry instrument.
//
// Each callback gauge must have no remaining caches by now (checked). Other
// instrument kinds need no explicit teardown here.
OpenTelemetryPluginImpl::~OpenTelemetryPluginImpl() {
  // Shared teardown for CallbackGaugeState<int64_t> and <double>.
  const auto remove_gauge_callback = [](const auto& state) {
    using GaugeState = std::decay_t<decltype(*state)>;
    GRPC_CHECK(state->caches.empty());
    if (!state->ot_callback_registered) return;
    state->instrument->RemoveCallback(&GaugeState::CallbackGaugeCallback,
                                      state.get());
    state->ot_callback_registered = false;
  };
  for (const auto& instrument_data : instruments_data_) {
    grpc_core::Match(
        instrument_data.instrument,
        [](const Disabled&) {},
        [](const std::unique_ptr<opentelemetry::metrics::Counter<double>>&) {},
        [](const std::unique_ptr<
            opentelemetry::metrics::Counter<uint64_t>>&) {},
        [](const std::unique_ptr<
            opentelemetry::metrics::Histogram<uint64_t>>&) {},
        [](const std::unique_ptr<
            opentelemetry::metrics::Histogram<double>>&) {},
        [&](const std::unique_ptr<CallbackGaugeState<int64_t>>& state) {
          remove_gauge_callback(state);
        },
        [&](const std::unique_ptr<CallbackGaugeState<double>>& state) {
          remove_gauge_callback(state);
        });
  }
}
namespace {

// Attribute names of the optional labels that can be translated to and from
// CallAttemptTracer::OptionalLabelKey values.
constexpr absl::string_view kLocality{"grpc.lb.locality"};
constexpr absl::string_view kBackendService{"grpc.lb.backend_service"};

}  // namespace
// Maps an optional label key to its attribute name. Crashes on any key other
// than kLocality and kBackendService.
absl::string_view OpenTelemetryPluginImpl::OptionalLabelKeyToString(
    grpc_core::ClientCallTracerInterface::CallAttemptTracer::OptionalLabelKey
        key) {
  using LabelKey =
      grpc_core::ClientCallTracerInterface::CallAttemptTracer::OptionalLabelKey;
  if (key == LabelKey::kLocality) return kLocality;
  if (key == LabelKey::kBackendService) return kBackendService;
  grpc_core::Crash("Illegal OptionalLabelKey index");
}
// Inverse of OptionalLabelKeyToString: maps an attribute name to its optional
// label key, or returns std::nullopt when the name is not recognized.
std::optional<
    grpc_core::ClientCallTracerInterface::CallAttemptTracer::OptionalLabelKey>
OpenTelemetryPluginImpl::OptionalLabelStringToKey(absl::string_view key) {
  using LabelKey =
      grpc_core::ClientCallTracerInterface::CallAttemptTracer::OptionalLabelKey;
  if (key == kLocality) return LabelKey::kLocality;
  if (key == kBackendService) return LabelKey::kBackendService;
  return std::nullopt;
}
// Returns the method portion of `path`: the path with a single leading '/'
// removed when one is present. The returned view refers to `path`'s bytes.
absl::string_view OpenTelemetryPluginImpl::GetMethodFromPath(
    const grpc_core::Slice& path) {
  const absl::string_view full_path = path.as_string_view();
  return absl::StripPrefix(full_path, "/");
}
// Decides whether this plugin applies to the channel described by `scope`.
// The plugin is enabled when no channel scope filter is installed or when the
// filter accepts `scope`; in that case a fresh ClientScopeConfig is returned
// alongside `true`. Otherwise returns {false, nullptr}.
std::pair<bool, std::shared_ptr<grpc_core::StatsPlugin::ScopeConfig>>
OpenTelemetryPluginImpl::IsEnabledForChannel(
    const OpenTelemetryPluginBuilder::ChannelScope& scope) const {
  const bool filtered_out =
      channel_scope_filter_ != nullptr && !channel_scope_filter_(scope);
  if (filtered_out) {
    return {false, nullptr};
  }
  return {true, std::make_shared<ClientScopeConfig>(this, scope)};
}
// Decides whether this plugin applies to the server described by `args`.
// The plugin is enabled when no server selector is registered or when the
// selector accepts `args`; in that case a fresh ServerScopeConfig is returned
// alongside `true`. Otherwise returns {false, nullptr}.
std::pair<bool, std::shared_ptr<grpc_core::StatsPlugin::ScopeConfig>>
OpenTelemetryPluginImpl::IsEnabledForServer(
    const grpc_core::ChannelArgs& args) const {
  const bool filtered_out =
      server_selector_ != nullptr && !server_selector_(args);
  if (filtered_out) {
    return {false, nullptr};
  }
  return {true, std::make_shared<ServerScopeConfig>(this, args)};
}
// Builds the scope config for a channel that this plugin is enabled for.
// Crashes if a channel scope filter is installed and rejects `scope`.
std::shared_ptr<grpc_core::StatsPlugin::ScopeConfig>
OpenTelemetryPluginImpl::GetChannelScopeConfig(
    const OpenTelemetryPluginBuilder::ChannelScope& scope) const {
  GRPC_CHECK(channel_scope_filter_ == nullptr || channel_scope_filter_(scope));
  auto client_config = std::make_shared<ClientScopeConfig>(this, scope);
  return client_config;
}
// Builds the scope config for a server that this plugin is enabled for.
// Crashes if a server selector is installed and rejects `args`.
std::shared_ptr<grpc_core::StatsPlugin::ScopeConfig>
OpenTelemetryPluginImpl::GetServerScopeConfig(
    const grpc_core::ChannelArgs& args) const {
  GRPC_CHECK(server_selector_ == nullptr || server_selector_(args));
  auto server_config = std::make_shared<ServerScopeConfig>(this, args);
  return server_config;
}
// Adds `value` to the uint64 counter identified by `handle`.
//
// No-op when the plugin has no meter provider or the instrument is disabled.
// Crashes if the instrument is not a uint64 counter, or if the number of
// label / optional values does not match the instrument descriptor.
void OpenTelemetryPluginImpl::AddCounter(
    grpc_core::GlobalInstrumentsRegistry::GlobalInstrumentHandle handle,
    uint64_t value, absl::Span<const absl::string_view> label_values,
    absl::Span<const absl::string_view> optional_values) {
  using CounterPtr = std::unique_ptr<opentelemetry::metrics::Counter<uint64_t>>;
  if (meter_provider_ == nullptr) return;
  const auto& instrument_data = instruments_data_.at(handle.index);
  if (std::holds_alternative<Disabled>(instrument_data.instrument)) {
    return;  // This instrument is disabled.
  }
  GRPC_CHECK(std::holds_alternative<
             std::unique_ptr<opentelemetry::metrics::Counter<uint64_t>>>(
      instrument_data.instrument));
  const auto& descriptor =
      grpc_core::GlobalInstrumentsRegistry::GetInstrumentDescriptor(handle);
  GRPC_CHECK(descriptor.label_keys.size() == label_values.size());
  GRPC_CHECK(descriptor.optional_label_keys.size() == optional_values.size());
  const CounterPtr& counter = std::get<CounterPtr>(instrument_data.instrument);
  if (label_values.empty() && optional_values.empty()) {
    // No attributes to attach.
    counter->Add(value);
    return;
  }
  counter->Add(value,
               NPCMetricsKeyValueIterable(
                   descriptor.label_keys, label_values,
                   descriptor.optional_label_keys, optional_values,
                   instrument_data.optional_labels_bits));
}
// Adds `value` to the double counter identified by `handle`.
//
// No-op when the plugin has no meter provider or the instrument is disabled.
// Crashes if the instrument is not a double counter, or if the number of
// label / optional values does not match the instrument descriptor.
void OpenTelemetryPluginImpl::AddCounter(
    grpc_core::GlobalInstrumentsRegistry::GlobalInstrumentHandle handle,
    double value, absl::Span<const absl::string_view> label_values,
    absl::Span<const absl::string_view> optional_values) {
  using CounterPtr = std::unique_ptr<opentelemetry::metrics::Counter<double>>;
  if (meter_provider_ == nullptr) return;
  const auto& instrument_data = instruments_data_.at(handle.index);
  if (std::holds_alternative<Disabled>(instrument_data.instrument)) {
    return;  // This instrument is disabled.
  }
  GRPC_CHECK(std::holds_alternative<
             std::unique_ptr<opentelemetry::metrics::Counter<double>>>(
      instrument_data.instrument));
  const auto& descriptor =
      grpc_core::GlobalInstrumentsRegistry::GetInstrumentDescriptor(handle);
  GRPC_CHECK(descriptor.label_keys.size() == label_values.size());
  GRPC_CHECK(descriptor.optional_label_keys.size() == optional_values.size());
  const CounterPtr& counter = std::get<CounterPtr>(instrument_data.instrument);
  if (label_values.empty() && optional_values.empty()) {
    // No attributes to attach.
    counter->Add(value);
    return;
  }
  counter->Add(value,
               NPCMetricsKeyValueIterable(
                   descriptor.label_keys, label_values,
                   descriptor.optional_label_keys, optional_values,
                   instrument_data.optional_labels_bits));
}
// Records `value` in the uint64 histogram identified by `handle`, using an
// empty OpenTelemetry context.
//
// No-op when the plugin has no meter provider or the instrument is disabled.
// Crashes if the instrument is not a uint64 histogram, or if the number of
// label / optional values does not match the instrument descriptor.
void OpenTelemetryPluginImpl::RecordHistogram(
    grpc_core::GlobalInstrumentsRegistry::GlobalInstrumentHandle handle,
    uint64_t value, absl::Span<const absl::string_view> label_values,
    absl::Span<const absl::string_view> optional_values) {
  using HistogramPtr =
      std::unique_ptr<opentelemetry::metrics::Histogram<uint64_t>>;
  if (meter_provider_ == nullptr) return;
  const auto& instrument_data = instruments_data_.at(handle.index);
  if (std::holds_alternative<Disabled>(instrument_data.instrument)) {
    return;  // This instrument is disabled.
  }
  GRPC_CHECK(std::holds_alternative<
             std::unique_ptr<opentelemetry::metrics::Histogram<uint64_t>>>(
      instrument_data.instrument));
  const auto& descriptor =
      grpc_core::GlobalInstrumentsRegistry::GetInstrumentDescriptor(handle);
  GRPC_CHECK(descriptor.label_keys.size() == label_values.size());
  GRPC_CHECK(descriptor.optional_label_keys.size() == optional_values.size());
  const HistogramPtr& histogram =
      std::get<HistogramPtr>(instrument_data.instrument);
  if (label_values.empty() && optional_values.empty()) {
    // No attributes to attach.
    histogram->Record(value, opentelemetry::context::Context{});
    return;
  }
  histogram->Record(value,
                    NPCMetricsKeyValueIterable(
                        descriptor.label_keys, label_values,
                        descriptor.optional_label_keys, optional_values,
                        instrument_data.optional_labels_bits),
                    opentelemetry::context::Context{});
}
// Records `value` in the double histogram identified by `handle`, using an
// empty OpenTelemetry context.
//
// No-op when the plugin has no meter provider or the instrument is disabled.
// Crashes if the instrument is not a double histogram, or if the number of
// label / optional values does not match the instrument descriptor.
void OpenTelemetryPluginImpl::RecordHistogram(
    grpc_core::GlobalInstrumentsRegistry::GlobalInstrumentHandle handle,
    double value, absl::Span<const absl::string_view> label_values,
    absl::Span<const absl::string_view> optional_values) {
  using HistogramPtr =
      std::unique_ptr<opentelemetry::metrics::Histogram<double>>;
  if (meter_provider_ == nullptr) return;
  const auto& instrument_data = instruments_data_.at(handle.index);
  if (std::holds_alternative<Disabled>(instrument_data.instrument)) {
    return;  // This instrument is disabled.
  }
  GRPC_CHECK(std::holds_alternative<
             std::unique_ptr<opentelemetry::metrics::Histogram<double>>>(
      instrument_data.instrument));
  const auto& descriptor =
      grpc_core::GlobalInstrumentsRegistry::GetInstrumentDescriptor(handle);
  GRPC_CHECK(descriptor.label_keys.size() == label_values.size());
  GRPC_CHECK(descriptor.optional_label_keys.size() == optional_values.size());
  const HistogramPtr& histogram =
      std::get<HistogramPtr>(instrument_data.instrument);
  if (label_values.empty() && optional_values.empty()) {
    // No attributes to attach.
    histogram->Record(value, opentelemetry::context::Context{});
    return;
  }
  histogram->Record(value,
                    NPCMetricsKeyValueIterable(
                        descriptor.label_keys, label_values,
                        descriptor.optional_label_keys, optional_values,
                        instrument_data.optional_labels_bits),
                    opentelemetry::context::Context{});
}
// Registers `callback` with every enabled callback gauge it reports on.
//
// Under mu_: records the callback with an "infinitely old" last-run timestamp
// and gives it an empty cache in each gauge. Any gauge that did not yet have
// an OpenTelemetry callback installed is marked as registered and remembered;
// the actual OpenTelemetry registration happens after mu_ is released.
//
// No-op when the plugin has no meter provider. Crashes if a metric of
// `callback` is not a callback gauge or has an unsupported value type.
void OpenTelemetryPluginImpl::AddCallback(
    grpc_core::RegisteredMetricCallback* callback) {
  using Int64Gauge = CallbackGaugeState<int64_t>;
  using DoubleGauge = CallbackGaugeState<double>;
  using GaugeValueType = grpc_core::GlobalInstrumentsRegistry::ValueType;
  if (meter_provider_ == nullptr) return;
  std::vector<std::variant<Int64Gauge*, DoubleGauge*>> newly_registered_gauges;
  {
    grpc_core::MutexLock lock(&mu_);
    callback_timestamps_.emplace(callback, grpc_core::Timestamp::InfPast());
    for (const auto& handle : callback->metrics()) {
      const auto& descriptor =
          grpc_core::GlobalInstrumentsRegistry::GetInstrumentDescriptor(handle);
      GRPC_CHECK(
          descriptor.instrument_type ==
          grpc_core::GlobalInstrumentsRegistry::InstrumentType::kCallbackGauge);
      switch (descriptor.value_type) {
        case GaugeValueType::kInt64: {
          const auto& instrument_data = instruments_data_.at(handle.index);
          if (std::holds_alternative<Disabled>(instrument_data.instrument)) {
            continue;  // This instrument is disabled.
          }
          auto* callback_gauge_state = std::get_if<std::unique_ptr<Int64Gauge>>(
              &instrument_data.instrument);
          GRPC_CHECK_NE(callback_gauge_state, nullptr);
          (*callback_gauge_state)
              ->caches.emplace(callback, Int64Gauge::Cache{});
          const bool already_registered = std::exchange(
              (*callback_gauge_state)->ot_callback_registered, true);
          if (!already_registered) {
            newly_registered_gauges.push_back(callback_gauge_state->get());
          }
          break;
        }
        case GaugeValueType::kDouble: {
          const auto& instrument_data = instruments_data_.at(handle.index);
          if (std::holds_alternative<Disabled>(instrument_data.instrument)) {
            continue;  // This instrument is disabled.
          }
          auto* callback_gauge_state =
              std::get_if<std::unique_ptr<DoubleGauge>>(
                  &instrument_data.instrument);
          GRPC_CHECK_NE(callback_gauge_state, nullptr);
          (*callback_gauge_state)
              ->caches.emplace(callback, DoubleGauge::Cache{});
          const bool already_registered = std::exchange(
              (*callback_gauge_state)->ot_callback_registered, true);
          if (!already_registered) {
            newly_registered_gauges.push_back(callback_gauge_state->get());
          }
          break;
        }
        default:
          grpc_core::Crash(absl::StrFormat(
              "Unknown or unsupported value type: %d", descriptor.value_type));
      }
    }
  }
  // OpenTelemetry's AddCallback takes its observable registry's lock, and
  // OpenTelemetry invokes our gauge callback with that lock held. Register
  // only after releasing mu_, otherwise the two locks could deadlock.
  for (const auto& pending : newly_registered_gauges) {
    std::visit(
        [](auto* gauge) {
          using GaugeState = std::remove_pointer_t<decltype(gauge)>;
          gauge->instrument->AddCallback(&GaugeState::CallbackGaugeCallback,
                                         gauge);
        },
        pending);
  }
}
// Unregisters `callback`: under mu_, drops its last-run timestamp and erases
// its cache from every enabled callback gauge it reports on.
//
// No-op when the plugin has no meter provider. Crashes if a metric of
// `callback` is not a callback gauge, has an unsupported value type, was never
// registered with OpenTelemetry, or holds no cache entry for `callback`.
void OpenTelemetryPluginImpl::RemoveCallback(
    grpc_core::RegisteredMetricCallback* callback) {
  using Int64Gauge = CallbackGaugeState<int64_t>;
  using DoubleGauge = CallbackGaugeState<double>;
  using GaugeValueType = grpc_core::GlobalInstrumentsRegistry::ValueType;
  if (meter_provider_ == nullptr) return;
  {
    grpc_core::MutexLock lock(&mu_);
    callback_timestamps_.erase(callback);
    for (const auto& handle : callback->metrics()) {
      const auto& descriptor =
          grpc_core::GlobalInstrumentsRegistry::GetInstrumentDescriptor(handle);
      GRPC_CHECK(
          descriptor.instrument_type ==
          grpc_core::GlobalInstrumentsRegistry::InstrumentType::kCallbackGauge);
      switch (descriptor.value_type) {
        case GaugeValueType::kInt64: {
          const auto& instrument_data = instruments_data_.at(handle.index);
          if (std::holds_alternative<Disabled>(instrument_data.instrument)) {
            continue;  // This instrument is disabled.
          }
          auto* callback_gauge_state = std::get_if<std::unique_ptr<Int64Gauge>>(
              &instrument_data.instrument);
          GRPC_CHECK_NE(callback_gauge_state, nullptr);
          GRPC_CHECK((*callback_gauge_state)->ot_callback_registered);
          GRPC_CHECK_EQ((*callback_gauge_state)->caches.erase(callback), 1u);
          break;
        }
        case GaugeValueType::kDouble: {
          const auto& instrument_data = instruments_data_.at(handle.index);
          if (std::holds_alternative<Disabled>(instrument_data.instrument)) {
            continue;  // This instrument is disabled.
          }
          auto* callback_gauge_state =
              std::get_if<std::unique_ptr<DoubleGauge>>(
                  &instrument_data.instrument);
          GRPC_CHECK_NE(callback_gauge_state, nullptr);
          GRPC_CHECK((*callback_gauge_state)->ot_callback_registered);
          GRPC_CHECK_EQ((*callback_gauge_state)->caches.erase(callback), 1u);
          break;
        }
        default:
          grpc_core::Crash(absl::StrFormat(
              "Unknown or unsupported value type: %d", descriptor.value_type));
      }
    }
  }
  // The OpenTelemetry-side callback is deliberately left installed here and is
  // only removed when the plugin is destroyed. There is a single callback per
  // OpenTelemetry instrument, so the number kept around is small. Removing it
  // eagerly would require 1) doing so without holding mu_ and 2) guarding
  // against a race with a concurrent `AddCallback`; a WorkSerializer is one
  // potential way to achieve that.
}
// Reports every entry of `cache` to the OpenTelemetry observer `result`.
//
// Each cache key is a sequence of label values: the first
// `descriptor.label_keys.size()` entries are the required label values and the
// remainder are the optional label values. Instruments without any label keys
// are observed without attributes.
template <typename ValueType>
void OpenTelemetryPluginImpl::CallbackGaugeState<ValueType>::Observe(
    opentelemetry::metrics::ObserverResult& result, const Cache& cache) {
  using TypedObserver = opentelemetry::nostd::shared_ptr<
      opentelemetry::metrics::ObserverResultT<ValueType>>;
  const auto& descriptor =
      grpc_core::GlobalInstrumentsRegistry::GetInstrumentDescriptor({id});
  const size_t num_required_labels = descriptor.label_keys.size();
  const bool has_no_label_keys =
      descriptor.label_keys.empty() && descriptor.optional_label_keys.empty();
  for (const auto& pair : cache) {
    GRPC_CHECK(pair.first.size() <= (descriptor.label_keys.size() +
                                     descriptor.optional_label_keys.size()));
    if (has_no_label_keys) {
      opentelemetry::nostd::get<TypedObserver>(result)->Observe(pair.second);
      continue;
    }
    auto& instrument_data = ot_plugin->instruments_data_.at(id);
    // Split the cache key into its required and optional label values.
    const auto optional_values_begin = pair.first.begin() + num_required_labels;
    const absl::FixedArray<absl::string_view> required_values(
        pair.first.begin(), optional_values_begin);
    const absl::FixedArray<absl::string_view> optional_values(
        optional_values_begin, pair.first.end());
    opentelemetry::nostd::get<TypedObserver>(result)->Observe(
        pair.second,
        NPCMetricsKeyValueIterable(descriptor.label_keys, required_values,
                                   descriptor.optional_label_keys,
                                   optional_values,
                                   instrument_data.optional_labels_bits));
  }
}
// Callback installed on the OpenTelemetry observable gauge; `arg` is the
// CallbackGaugeState that was passed when the callback was added.
//
// For every RegisteredMetricCallback attached to this gauge, reports the
// gauge's cached values to `result`. If at least `min_interval()` has elapsed
// since that callback last ran, the callback is run first and its timestamp
// is updated; otherwise the existing cache is reported as is.
//
// OpenTelemetry calls our callback with its observable_registry's lock
// held. The plugin's mu_ is additionally held here for the whole function.
template <typename ValueType>
void OpenTelemetryPluginImpl::CallbackGaugeState<ValueType>::
CallbackGaugeCallback(opentelemetry::metrics::ObserverResult result,
void* arg) {
auto* callback_gauge_state = static_cast<CallbackGaugeState<ValueType>*>(arg);
// One timestamp is used for every callback handled by this invocation.
auto now = grpc_core::Timestamp::Now();
grpc_core::MutexLock plugin_lock(&callback_gauge_state->ot_plugin->mu_);
for (auto& elem : callback_gauge_state->caches) {
auto* registered_metric_callback = elem.first;
// Every callback that has a cache must also have a recorded timestamp.
auto iter = callback_gauge_state->ot_plugin->callback_timestamps_.find(
registered_metric_callback);
GRPC_CHECK(iter !=
callback_gauge_state->ot_plugin->callback_timestamps_.end());
if (now - iter->second < registered_metric_callback->min_interval()) {
// Use cached value.
callback_gauge_state->Observe(result, elem.second);
continue;
}
// Otherwise update and use the cache.
iter->second = now;
// NOTE(review): the reporter is presumably what refreshes the caches while
// the callback runs; confirm against CallbackMetricReporter.
CallbackMetricReporter reporter(callback_gauge_state->ot_plugin,
registered_metric_callback);
registered_metric_callback->Run(reporter);
callback_gauge_state->Observe(result, elem.second);
}
}
// Creates a client call tracer for the call on `path`. The tracer is
// allocated with ManagedNew on the Arena of the current context.
// `scope_config` is downcast to this plugin's ClientScopeConfig.
grpc_core::ClientCallTracerInterface*
OpenTelemetryPluginImpl::GetClientCallTracer(
    const grpc_core::Slice& path, bool registered_method,
    std::shared_ptr<grpc_core::StatsPlugin::ScopeConfig> scope_config) {
  auto* const arena = grpc_core::GetContext<grpc_core::Arena>();
  auto client_scope_config =
      std::static_pointer_cast<OpenTelemetryPluginImpl::ClientScopeConfig>(
          scope_config);
  return arena->ManagedNew<ClientCallTracerInterface>(
      path, arena, registered_method, this, std::move(client_scope_config));
}
// Creates a server call tracer on the Arena of the current context and
// returns it as a raw pointer released from its ref-counted handle.
// `scope_config` is downcast to this plugin's ServerScopeConfig.
grpc_core::ServerCallTracerInterface*
OpenTelemetryPluginImpl::GetServerCallTracer(
    std::shared_ptr<grpc_core::StatsPlugin::ScopeConfig> scope_config) {
  auto* const arena = grpc_core::GetContext<grpc_core::Arena>();
  auto server_scope_config =
      std::static_pointer_cast<OpenTelemetryPluginImpl::ServerScopeConfig>(
          scope_config);
  auto tracer = arena->MakeRefCounted<ServerCallTracerInterface>(
      this, arena, std::move(server_scope_config));
  return tracer.release();
}
// Returns true unless the instrument identified by `handle` is stored as
// Disabled in this plugin.
bool OpenTelemetryPluginImpl::IsInstrumentEnabled(
    grpc_core::GlobalInstrumentsRegistry::GlobalInstrumentHandle handle) const {
  const auto& instrument = instruments_data_.at(handle.index).instrument;
  const bool is_disabled = std::holds_alternative<Disabled>(instrument);
  return !is_disabled;
}
// Appends this plugin to the stats plugin list carried in `args` under
// GRPC_ARG_EXPERIMENTAL_STATS_PLUGINS, creating and installing that list
// first when `args` does not have one yet.
void OpenTelemetryPluginImpl::AddToChannelArguments(
    grpc::ChannelArguments* args) {
  using StatsPluginList = std::vector<std::shared_ptr<grpc_core::StatsPlugin>>;
  const grpc_channel_args c_args = args->c_channel_args();
  auto* existing_list =
      grpc_channel_args_find_pointer<std::shared_ptr<StatsPluginList>>(
          &c_args, GRPC_ARG_EXPERIMENTAL_STATS_PLUGINS);
  if (existing_list != nullptr) {
    (*existing_list)->emplace_back(shared_from_this());
    return;
  }
  // No list yet: install a new one, then add this plugin to it.
  auto new_list = std::make_shared<StatsPluginList>();
  args->SetPointerWithVtable(
      GRPC_ARG_EXPERIMENTAL_STATS_PLUGINS, &new_list,
      grpc_core::ChannelArgTypeTraits<decltype(new_list)>::VTable());
  new_list->emplace_back(shared_from_this());
}
// Attaches this plugin to `builder` by setting a ServerBuilderOption that
// holds a shared reference to the plugin.
void OpenTelemetryPluginImpl::AddToServerBuilder(grpc::ServerBuilder* builder) {
  auto plugin_option =
      std::make_unique<ServerBuilderOption>(shared_from_this());
  builder->SetOption(std::move(plugin_option));
}
// TextMapPropagator that transports the span context through the
// "grpc-trace-bin" key. The carrier is given (and returns) the base64 text
// form of a 29-byte binary header laid out as:
//   byte 0      : 0 (version)
//   byte 1      : 0 (trace id field marker)
//   bytes 2-17  : trace id
//   byte 18     : 1 (span id field marker)
//   bytes 19-26 : span id
//   byte 27     : 2 (trace flags field marker)
//   byte 28     : trace flags
class GrpcTraceBinTextMapPropagator : public TextMapPropagator {
 public:
  // Reads "grpc-trace-bin" from `carrier` and returns `context` with a
  // DefaultSpan wrapping the decoded span context. A missing or malformed
  // header produces an invalid span context.
  opentelemetry::context::Context Extract(
      const opentelemetry::context::propagation::TextMapCarrier& carrier,
      opentelemetry::context::Context& context) noexcept override {
    const opentelemetry::nostd::string_view encoded_header =
        carrier.Get("grpc-trace-bin");
    std::string base64_unescaped_val;
    absl::Base64Unescape(NoStdStringViewToAbslStringView(encoded_header),
                         &base64_unescaped_val);
    // Non-throwing allocation: this function is noexcept.
    std::shared_ptr<opentelemetry::trace::Span> extracted_span(
        new (std::nothrow) opentelemetry::trace::DefaultSpan(
            GrpcTraceBinHeaderToSpanContext(base64_unescaped_val)));
    return opentelemetry::trace::SetSpan(context, std::move(extracted_span));
  }

  // Writes the span context of the span in `context` to `carrier` under
  // "grpc-trace-bin". Nothing is written when the span context is invalid.
  void Inject(
      opentelemetry::context::propagation::TextMapCarrier& carrier,
      const opentelemetry::context::Context& context) noexcept override {
    const auto span_context =
        opentelemetry::trace::GetSpan(context)->GetContext();
    if (!span_context.IsValid()) {
      return;
    }
    auto binary_header = SpanContextToGrpcTraceBinHeader(span_context);
    carrier.Set("grpc-trace-bin",
                absl::Base64Escape(absl::string_view(
                    reinterpret_cast<char*>(binary_header.data()),
                    kGrpcTraceBinHeaderLen)));
  }

  // Reports the single key this propagator uses.
  bool Fields(opentelemetry::nostd::function_ref<
              bool(opentelemetry::nostd::string_view)>
                  callback) const noexcept override {
    return callback("grpc-trace-bin");
  }

 private:
  static constexpr int kGrpcTraceBinHeaderLen = 29;
  // Byte offsets and lengths within the binary header.
  static constexpr int kVersionOffset = 0;
  static constexpr int kTraceIdFieldOffset = 1;
  static constexpr int kTraceIdOffset = 2;
  static constexpr int kTraceIdLen = 16;
  static constexpr int kSpanIdFieldOffset = 18;
  static constexpr int kSpanIdOffset = 19;
  static constexpr int kSpanIdLen = 8;
  static constexpr int kTraceFlagsFieldOffset = 27;
  static constexpr int kTraceFlagsOffset = 28;

  // Serializes `ctx` into the binary header layout.
  std::array<uint8_t, kGrpcTraceBinHeaderLen> SpanContextToGrpcTraceBinHeader(
      const SpanContext& ctx) {
    std::array<uint8_t, kGrpcTraceBinHeaderLen> header;
    header[kVersionOffset] = 0;
    header[kTraceIdFieldOffset] = 0;
    ctx.trace_id().CopyBytesTo(opentelemetry::nostd::span<uint8_t, kTraceIdLen>(
        &header[kTraceIdOffset], kTraceIdLen));
    header[kSpanIdFieldOffset] = 1;
    ctx.span_id().CopyBytesTo(opentelemetry::nostd::span<uint8_t, kSpanIdLen>(
        &header[kSpanIdOffset], kSpanIdLen));
    header[kTraceFlagsFieldOffset] = 2;
    header[kTraceFlagsOffset] = ctx.trace_flags().flags();
    return header;
  }

  // Parses a binary header into a remote SpanContext. Returns the invalid
  // span context when the length or any marker byte does not match.
  SpanContext GrpcTraceBinHeaderToSpanContext(
      opentelemetry::nostd::string_view header) {
    const bool well_formed = header.size() == kGrpcTraceBinHeaderLen &&
                             header[kVersionOffset] == 0 &&
                             header[kTraceIdFieldOffset] == 0 &&
                             header[kSpanIdFieldOffset] == 1 &&
                             header[kTraceFlagsFieldOffset] == 2;
    if (!well_formed) {
      return SpanContext::GetInvalid();
    }
    const TraceId trace_id(
        opentelemetry::nostd::span<const uint8_t, kTraceIdLen>(
            reinterpret_cast<const uint8_t*>(&header[kTraceIdOffset]),
            kTraceIdLen));
    const SpanId span_id(opentelemetry::nostd::span<const uint8_t, kSpanIdLen>(
        reinterpret_cast<const uint8_t*>(&header[kSpanIdOffset]), kSpanIdLen));
    return SpanContext(trace_id, span_id,
                       opentelemetry::trace::TraceFlags(
                           header[kTraceFlagsOffset]),
                       /*is_remote*/ true);
  }
};
// Looks up `key` in the call metadata.
//
// "grpc-trace-bin" is returned base64-escaped. Any other key ending in "-bin"
// is unsupported: an error is logged and the result is empty. All remaining
// keys are returned verbatim. A non-empty result is stored in a std::string
// owned by the current Arena, which is what the returned view refers to.
opentelemetry::nostd::string_view GrpcTextMapCarrier::Get(
    opentelemetry::nostd::string_view key) const noexcept {
  const absl::string_view absl_key = NoStdStringViewToAbslStringView(key);
  std::string scratch;
  std::string ret_val;
  if (absl_key == "grpc-trace-bin") {
    ret_val = absl::Base64Escape(
        metadata_->GetStringValue(absl_key, &scratch).value_or(""));
  } else if (absl::EndsWith(absl_key, "-bin")) {
    // Supporting a custom binary propagator might be fine, but it would need
    // base64 encoding validation. Not supported for now.
    LOG(ERROR) << "Binary propagator other than GrpcTraceBinPropagator is "
                  "not supported.";
  } else {
    ret_val = std::string(AbslStringViewToNoStdStringView(
        metadata_->GetStringValue(absl_key, &scratch).value_or("")));
  }
  if (ret_val.empty()) {
    return "";
  }
  // A string_view is returned, so the backing string must outlive this call:
  // keep it on the arena.
  std::string* const arena_stored_string =
      grpc_core::GetContext<grpc_core::Arena>()->ManagedNew<std::string>(
          std::move(ret_val));
  return *arena_stored_string;
}
// Stores `value` under `key` in the call metadata.
//
// "grpc-trace-bin" values are base64-unescaped before being stored; a value
// that fails to unescape is silently dropped. Any other key ending in "-bin"
// is unsupported and only logs an error. All remaining keys are appended
// verbatim, logging an error if the append fails.
void GrpcTextMapCarrier::Set(opentelemetry::nostd::string_view key,
                             opentelemetry::nostd::string_view value) noexcept {
  const absl::string_view absl_key = NoStdStringViewToAbslStringView(key);
  const absl::string_view absl_value = NoStdStringViewToAbslStringView(value);
  if (absl_key == "grpc-trace-bin") {
    std::string unescaped_value;
    if (absl::Base64Unescape(absl_value, &unescaped_value)) {
      metadata_->Set(
          grpc_core::GrpcTraceBinMetadata(),
          grpc_core::Slice::FromCopiedString(std::move(unescaped_value)));
    }
    return;
  }
  if (absl::EndsWith(absl_key, "-bin")) {
    LOG(ERROR) << "Binary propagator other than GrpcTraceBinPropagator is "
                  "not supported.";
    return;
  }
  // A propagator other than GrpcTraceBinTextMapPropagator was used.
  metadata_->Append(
      absl_key, grpc_core::Slice::FromCopiedString(absl_value),
      [](absl::string_view, const grpc_core::Slice&) {
        LOG(ERROR) << "Failed to add tracing information in metadata.";
      });
}
// Returns the trace id of `span` as a hex string, or an empty string when
// `span` is null or its trace id is not valid.
std::string OTelSpanTraceIdToString(opentelemetry::trace::Span* span) {
  if (span == nullptr) return std::string();
  const auto trace_id = span->GetContext().trace_id();
  if (!trace_id.IsValid()) return std::string();
  const auto id_bytes = trace_id.Id();
  const absl::string_view raw_bytes(
      reinterpret_cast<const char*>(id_bytes.data()), id_bytes.size());
  return absl::BytesToHexString(raw_bytes);
}
// Returns the span id of `span` as a hex string, or an empty string when
// `span` is null or its span id is not valid.
std::string OTelSpanSpanIdToString(opentelemetry::trace::Span* span) {
  if (span == nullptr) return std::string();
  const auto span_id = span->GetContext().span_id();
  if (!span_id.IsValid()) return std::string();
  const auto id_bytes = span_id.Id();
  const absl::string_view raw_bytes(
      reinterpret_cast<const char*>(id_bytes.data()), id_bytes.size());
  return absl::BytesToHexString(raw_bytes);
}
} // namespace internal
// Namespace-scope definitions of OpenTelemetryPluginBuilder's static constexpr
// instrument-name members. Their initializers are not given here, so they
// must come from the in-class declarations (presumably in
// grpcpp/ext/otel_plugin.h; confirm). Before C++17 such definitions are
// required whenever a member is odr-used; from C++17 on, static constexpr
// data members are implicitly inline and these redeclarations are redundant
// but still permitted.
constexpr absl::string_view
OpenTelemetryPluginBuilder::kClientAttemptStartedInstrumentName;
constexpr absl::string_view
OpenTelemetryPluginBuilder::kClientAttemptDurationInstrumentName;
constexpr absl::string_view OpenTelemetryPluginBuilder::
kClientAttemptSentTotalCompressedMessageSizeInstrumentName;
constexpr absl::string_view OpenTelemetryPluginBuilder::
kClientAttemptRcvdTotalCompressedMessageSizeInstrumentName;
constexpr absl::string_view
OpenTelemetryPluginBuilder::kServerCallStartedInstrumentName;
constexpr absl::string_view
OpenTelemetryPluginBuilder::kServerCallDurationInstrumentName;
constexpr absl::string_view OpenTelemetryPluginBuilder::
kServerCallSentTotalCompressedMessageSizeInstrumentName;
constexpr absl::string_view OpenTelemetryPluginBuilder::
kServerCallRcvdTotalCompressedMessageSizeInstrumentName;
//
// OpenTelemetryPluginBuilder
//
// Creates a builder backed by a fresh internal builder implementation. Every
// public setter of this class forwards to that implementation.
OpenTelemetryPluginBuilder::OpenTelemetryPluginBuilder()
    : impl_(std::make_unique<internal::OpenTelemetryPluginBuilderImpl>()) {
}
// Defaulted out of line. NOTE(review): presumably kept in this file because
// the implementation type owned by impl_ is incomplete in the public header;
// confirm.
OpenTelemetryPluginBuilder::~OpenTelemetryPluginBuilder() = default;
// Hands `meter_provider` to the underlying builder implementation.
// Returns *this to allow call chaining.
OpenTelemetryPluginBuilder& OpenTelemetryPluginBuilder::SetMeterProvider(
    std::shared_ptr<opentelemetry::metrics::MeterProvider> meter_provider) {
  impl_->SetMeterProvider(std::move(meter_provider));
  return *this;
}
// Hands the target attribute filter to the underlying builder implementation.
// Returns *this to allow call chaining.
OpenTelemetryPluginBuilder&
OpenTelemetryPluginBuilder::SetTargetAttributeFilter(
    absl::AnyInvocable<bool(absl::string_view /*target*/) const>
        target_attribute_filter) {
  impl_->SetTargetAttributeFilter(std::move(target_attribute_filter));
  return *this;
}
// Hands the generic method attribute filter to the underlying builder
// implementation. Returns *this to allow call chaining.
OpenTelemetryPluginBuilder&
OpenTelemetryPluginBuilder::SetGenericMethodAttributeFilter(
    absl::AnyInvocable<bool(absl::string_view /*generic_method*/) const>
        generic_method_attribute_filter) {
  impl_->SetGenericMethodAttributeFilter(
      std::move(generic_method_attribute_filter));
  return *this;
}
// Forwards the list of metric names to enable to the underlying builder
// implementation. Returns *this to allow call chaining.
OpenTelemetryPluginBuilder& OpenTelemetryPluginBuilder::EnableMetrics(
    absl::Span<const absl::string_view> metric_names) {
  impl_->EnableMetrics(metric_names);
  return *this;
}
// Forwards the list of metric names to disable to the underlying builder
// implementation. Returns *this to allow call chaining.
OpenTelemetryPluginBuilder& OpenTelemetryPluginBuilder::DisableMetrics(
    absl::Span<const absl::string_view> metric_names) {
  impl_->DisableMetrics(metric_names);
  return *this;
}
// Forwards to the underlying builder implementation's DisableAllMetrics().
// Returns *this to allow call chaining.
OpenTelemetryPluginBuilder& OpenTelemetryPluginBuilder::DisableAllMetrics() {
  impl_->DisableAllMetrics();
  return *this;
}
// Transfers ownership of `option` to the underlying builder implementation.
// The option is downcast with static_cast to the internal option type, so
// the object passed in must actually be an InternalOpenTelemetryPluginOption.
// Returns *this to allow call chaining.
OpenTelemetryPluginBuilder& OpenTelemetryPluginBuilder::AddPluginOption(
    std::unique_ptr<OpenTelemetryPluginOption> option) {
  auto* const internal_option =
      static_cast<grpc::internal::InternalOpenTelemetryPluginOption*>(
          option.release());
  impl_->AddPluginOption(
      std::unique_ptr<grpc::internal::InternalOpenTelemetryPluginOption>(
          internal_option));
  return *this;
}
// Forwards `optional_label_key` to the underlying builder implementation.
// Returns *this to allow call chaining.
OpenTelemetryPluginBuilder& OpenTelemetryPluginBuilder::AddOptionalLabel(
    absl::string_view optional_label_key) {
  impl_->AddOptionalLabel(optional_label_key);
  return *this;
}
// Hands `tracer_provider` to the underlying builder implementation.
// Returns *this to allow call chaining.
OpenTelemetryPluginBuilder& OpenTelemetryPluginBuilder::SetTracerProvider(
    std::shared_ptr<opentelemetry::trace::TracerProvider> tracer_provider) {
  impl_->SetTracerProvider(std::move(tracer_provider));
  return *this;
}
// Transfers ownership of `text_map_propagator` to the underlying builder
// implementation. Returns *this to allow call chaining.
OpenTelemetryPluginBuilder& OpenTelemetryPluginBuilder::SetTextMapPropagator(
    std::unique_ptr<TextMapPropagator> text_map_propagator) {
  impl_->SetTextMapPropagator(std::move(text_map_propagator));
  return *this;
}
// Creates a new propagator that uses the "grpc-trace-bin" key
// (internal::GrpcTraceBinTextMapPropagator).
std::unique_ptr<TextMapPropagator>
OpenTelemetryPluginBuilder::MakeGrpcTraceBinTextMapPropagator() {
  auto propagator = std::make_unique<internal::GrpcTraceBinTextMapPropagator>();
  return propagator;
}
// Hands the channel scope filter to the underlying builder implementation.
// Returns *this to allow call chaining.
OpenTelemetryPluginBuilder& OpenTelemetryPluginBuilder::SetChannelScopeFilter(
    absl::AnyInvocable<bool(const ChannelScope& /*scope*/) const>
        channel_scope_filter) {
  impl_->SetChannelScopeFilter(std::move(channel_scope_filter));
  return *this;
}
// Forwards to the underlying builder implementation's
// BuildAndRegisterGlobal() and returns its status unchanged.
absl::Status OpenTelemetryPluginBuilder::BuildAndRegisterGlobal() {
  return impl_->BuildAndRegisterGlobal();
}
// Forwards to the underlying builder implementation's Build() and returns its
// result (the plugin, or an error status) unchanged.
absl::StatusOr<std::shared_ptr<grpc::experimental::OpenTelemetryPlugin>>
OpenTelemetryPluginBuilder::Build() {
  return impl_->Build();
}
} // namespace grpc