| // |
| // |
| // Copyright 2023 gRPC authors. |
| // |
| // Licensed under the Apache License, Version 2.0 (the "License"); |
| // you may not use this file except in compliance with the License. |
| // You may obtain a copy of the License at |
| // |
| // http://www.apache.org/licenses/LICENSE-2.0 |
| // |
| // Unless required by applicable law or agreed to in writing, software |
| // distributed under the License is distributed on an "AS IS" BASIS, |
| // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| // See the License for the specific language governing permissions and |
| // limitations under the License. |
| // |
| // |
| |
| #include "src/cpp/ext/otel/otel_plugin.h" |
| |
| #include <grpc/support/port_platform.h> |
| #include <grpcpp/ext/otel_plugin.h> |
| #include <grpcpp/version_info.h> |
| |
| #include <memory> |
| #include <type_traits> |
| #include <utility> |
| #include <variant> |
| |
| #include "opentelemetry/metrics/async_instruments.h" |
| #include "opentelemetry/metrics/meter.h" |
| #include "opentelemetry/metrics/meter_provider.h" |
| #include "opentelemetry/metrics/sync_instruments.h" |
| #include "opentelemetry/nostd/string_view.h" |
| #include "opentelemetry/nostd/variant.h" |
| #include "opentelemetry/trace/context.h" |
| #include "opentelemetry/trace/span_context.h" |
| #include "src/core/client_channel/client_channel_filter.h" |
| #include "src/core/config/core_configuration.h" |
| #include "src/core/lib/channel/channel_args.h" |
| #include "src/core/lib/surface/channel_stack_type.h" |
| #include "src/core/telemetry/call_tracer.h" |
| #include "src/core/telemetry/instrument.h" |
| #include "src/core/util/grpc_check.h" |
| #include "src/core/util/match.h" |
| #include "src/cpp/ext/otel/key_value_iterable.h" |
| #include "src/cpp/ext/otel/otel_client_call_tracer.h" |
| #include "src/cpp/ext/otel/otel_server_call_tracer.h" |
| #include "absl/strings/escaping.h" |
| |
| using opentelemetry::context::propagation::TextMapPropagator; |
| using opentelemetry::trace::SpanContext; |
| using opentelemetry::trace::SpanId; |
| using opentelemetry::trace::TraceId; |
| |
| namespace grpc { |
| namespace internal { |
| |
| namespace { |
| bool IsMetricEnabledByDefault(absl::string_view) { return false; } |
| } // namespace |
| |
| bool IsOpenTelemetryLabelOptional(absl::string_view label_key) { |
| // TODO(ctiller): register other optional labels here with |
| // `if (label_key =="xyz") return true;` checks. |
| return absl::StartsWith(label_key, "test_optional."); |
| } |
| |
| absl::string_view OpenTelemetryMethodKey() { return "grpc.method"; } |
| |
| absl::string_view OpenTelemetryStatusKey() { return "grpc.status"; } |
| |
| absl::string_view OpenTelemetryTargetKey() { return "grpc.target"; } |
| |
| namespace { |
| absl::flat_hash_set<std::string> BaseMetrics() { |
| absl::flat_hash_set<std::string> base_metrics{ |
| std::string(grpc::OpenTelemetryPluginBuilder:: |
| kClientAttemptStartedInstrumentName), |
| std::string(grpc::OpenTelemetryPluginBuilder:: |
| kClientAttemptDurationInstrumentName), |
| std::string( |
| grpc::OpenTelemetryPluginBuilder:: |
| kClientAttemptSentTotalCompressedMessageSizeInstrumentName), |
| std::string( |
| grpc::OpenTelemetryPluginBuilder:: |
| kClientAttemptRcvdTotalCompressedMessageSizeInstrumentName), |
| std::string( |
| grpc::OpenTelemetryPluginBuilder::kServerCallStartedInstrumentName), |
| std::string( |
| grpc::OpenTelemetryPluginBuilder::kServerCallDurationInstrumentName), |
| std::string(grpc::OpenTelemetryPluginBuilder:: |
| kServerCallSentTotalCompressedMessageSizeInstrumentName), |
| std::string(grpc::OpenTelemetryPluginBuilder:: |
| kServerCallRcvdTotalCompressedMessageSizeInstrumentName)}; |
| grpc_core::GlobalInstrumentsRegistry::ForEach( |
| [&](const grpc_core::GlobalInstrumentsRegistry:: |
| GlobalInstrumentDescriptor& descriptor) { |
| if (descriptor.enable_by_default) { |
| base_metrics.emplace(descriptor.name); |
| } |
| }); |
| grpc_core::InstrumentMetadata::ForEachInstrument( |
| [&](const grpc_core::InstrumentMetadata::Description* description) { |
| if (IsMetricEnabledByDefault(description->name)) { |
| base_metrics.emplace(description->name); |
| } |
| }); |
| return base_metrics; |
| } |
| |
| } // namespace |
| |
| class OpenTelemetryPluginImpl::NPCMetricsKeyValueIterable |
| : public opentelemetry::common::KeyValueIterable { |
| public: |
| NPCMetricsKeyValueIterable( |
| absl::Span<const absl::string_view> label_keys, |
| absl::Span<const absl::string_view> label_values, |
| absl::Span<const absl::string_view> optional_label_keys, |
| absl::Span<const absl::string_view> optional_label_values, |
| const OptionalLabelsBitSet& optional_labels_bits) |
| : label_keys_(label_keys), |
| label_values_(label_values), |
| optional_label_keys_(optional_label_keys), |
| optional_label_values_(optional_label_values), |
| optional_labels_bits_(optional_labels_bits) {} |
| |
| bool ForEachKeyValue(opentelemetry::nostd::function_ref< |
| bool(opentelemetry::nostd::string_view, |
| opentelemetry::common::AttributeValue)> |
| callback) const noexcept override { |
| for (size_t i = 0; i < label_keys_.size(); ++i) { |
| if (!callback(AbslStrViewToOpenTelemetryStrView(label_keys_[i]), |
| AbslStrViewToOpenTelemetryStrView(label_values_[i]))) { |
| return false; |
| } |
| } |
| // Since we are saving the optional label values as std::string for callback |
| // gauges, we want to minimize memory usage by filtering out the disabled |
| // optional label values. |
| bool filtered = optional_label_values_.size() < optional_label_keys_.size(); |
| for (size_t i = 0, j = 0; i < optional_label_keys_.size(); ++i) { |
| if (!optional_labels_bits_.test(i)) { |
| if (!filtered) ++j; |
| continue; |
| } |
| if (!callback( |
| AbslStrViewToOpenTelemetryStrView(optional_label_keys_[i]), |
| AbslStrViewToOpenTelemetryStrView(optional_label_values_[j++]))) { |
| return false; |
| } |
| } |
| return true; |
| } |
| |
| size_t size() const noexcept override { |
| return label_keys_.size() + optional_labels_bits_.count(); |
| } |
| |
| private: |
| absl::Span<const absl::string_view> label_keys_; |
| absl::Span<const absl::string_view> label_values_; |
| absl::Span<const absl::string_view> optional_label_keys_; |
| absl::Span<const absl::string_view> optional_label_values_; |
| const OptionalLabelsBitSet& optional_labels_bits_; |
| }; |
| |
| // |
| // OpenTelemetryPluginBuilderImpl |
| // |
| |
| OpenTelemetryPluginBuilderImpl::OpenTelemetryPluginBuilderImpl() |
| : metrics_(BaseMetrics()) {} |
| |
| OpenTelemetryPluginBuilderImpl::~OpenTelemetryPluginBuilderImpl() = default; |
| |
| OpenTelemetryPluginBuilderImpl& |
| OpenTelemetryPluginBuilderImpl::SetMeterProvider( |
| std::shared_ptr<opentelemetry::metrics::MeterProvider> meter_provider) { |
| meter_provider_ = std::move(meter_provider); |
| return *this; |
| } |
| |
| OpenTelemetryPluginBuilderImpl& OpenTelemetryPluginBuilderImpl::EnableMetrics( |
| absl::Span<const absl::string_view> metric_names) { |
| for (const auto& metric_name : metric_names) { |
| metrics_.emplace(metric_name); |
| } |
| return *this; |
| } |
| |
| OpenTelemetryPluginBuilderImpl& OpenTelemetryPluginBuilderImpl::DisableMetrics( |
| absl::Span<const absl::string_view> metric_names) { |
| for (const auto& metric_name : metric_names) { |
| metrics_.erase(metric_name); |
| } |
| return *this; |
| } |
| |
| OpenTelemetryPluginBuilderImpl& |
| OpenTelemetryPluginBuilderImpl::DisableAllMetrics() { |
| metrics_.clear(); |
| return *this; |
| } |
| |
| OpenTelemetryPluginBuilderImpl& |
| OpenTelemetryPluginBuilderImpl::SetTargetAttributeFilter( |
| absl::AnyInvocable<bool(absl::string_view /*target*/) const> |
| target_attribute_filter) { |
| target_attribute_filter_ = std::move(target_attribute_filter); |
| return *this; |
| } |
| |
| OpenTelemetryPluginBuilderImpl& |
| OpenTelemetryPluginBuilderImpl::SetGenericMethodAttributeFilter( |
| absl::AnyInvocable<bool(absl::string_view /*generic_method*/) const> |
| generic_method_attribute_filter) { |
| generic_method_attribute_filter_ = std::move(generic_method_attribute_filter); |
| return *this; |
| } |
| |
| OpenTelemetryPluginBuilderImpl& |
| OpenTelemetryPluginBuilderImpl::SetServerSelector( |
| absl::AnyInvocable<bool(const grpc_core::ChannelArgs& /*args*/) const> |
| server_selector) { |
| server_selector_ = std::move(server_selector); |
| return *this; |
| } |
| |
| OpenTelemetryPluginBuilderImpl& OpenTelemetryPluginBuilderImpl::AddPluginOption( |
| std::unique_ptr<InternalOpenTelemetryPluginOption> option) { |
| // We allow a limit of 64 plugin options to be registered at this time. |
| GRPC_CHECK_LT(plugin_options_.size(), 64u); |
| plugin_options_.push_back(std::move(option)); |
| return *this; |
| } |
| |
| OpenTelemetryPluginBuilderImpl& |
| OpenTelemetryPluginBuilderImpl::AddOptionalLabel( |
| absl::string_view optional_label_key) { |
| optional_label_keys_.emplace(optional_label_key); |
| return *this; |
| } |
| |
| OpenTelemetryPluginBuilderImpl& |
| OpenTelemetryPluginBuilderImpl::SetTracerProvider( |
| std::shared_ptr<opentelemetry::trace::TracerProvider> tracer_provider) { |
| tracer_provider_ = std::move(tracer_provider); |
| return *this; |
| } |
| |
| OpenTelemetryPluginBuilderImpl& |
| OpenTelemetryPluginBuilderImpl::SetTextMapPropagator( |
| std::unique_ptr<TextMapPropagator> text_map_propagator) { |
| text_map_propagator_ = std::move(text_map_propagator); |
| return *this; |
| } |
| |
| OpenTelemetryPluginBuilderImpl& |
| OpenTelemetryPluginBuilderImpl::SetChannelScopeFilter( |
| absl::AnyInvocable< |
| bool(const OpenTelemetryPluginBuilder::ChannelScope& /*scope*/) const> |
| channel_scope_filter) { |
| channel_scope_filter_ = std::move(channel_scope_filter); |
| return *this; |
| } |
| |
| absl::Status OpenTelemetryPluginBuilderImpl::BuildAndRegisterGlobal() { |
| if (meter_provider_ == nullptr && tracer_provider_ == nullptr) { |
| return absl::InvalidArgumentError( |
| "Need to configure a valid meter provider or tracer provider."); |
| } |
| grpc_core::GlobalStatsPluginRegistry::RegisterStatsPlugin( |
| std::make_shared<OpenTelemetryPluginImpl>( |
| metrics_, meter_provider_, std::move(target_attribute_filter_), |
| std::move(generic_method_attribute_filter_), |
| std::move(server_selector_), std::move(plugin_options_), |
| std::move(optional_label_keys_), std::move(tracer_provider_), |
| std::move(text_map_propagator_), std::move(channel_scope_filter_))); |
| return absl::OkStatus(); |
| } |
| |
| absl::StatusOr<std::shared_ptr<grpc::experimental::OpenTelemetryPlugin>> |
| OpenTelemetryPluginBuilderImpl::Build() { |
| if (meter_provider_ == nullptr && tracer_provider_ == nullptr) { |
| return absl::InvalidArgumentError( |
| "Need to configure a valid meter provider or tracer provider."); |
| } |
| return std::make_shared<OpenTelemetryPluginImpl>( |
| metrics_, meter_provider_, std::move(target_attribute_filter_), |
| std::move(generic_method_attribute_filter_), std::move(server_selector_), |
| std::move(plugin_options_), std::move(optional_label_keys_), |
| std::move(tracer_provider_), std::move(text_map_propagator_), |
| std::move(channel_scope_filter_)); |
| } |
| |
| OpenTelemetryPluginImpl::CallbackMetricReporter::CallbackMetricReporter( |
| OpenTelemetryPluginImpl* ot_plugin, |
| grpc_core::RegisteredMetricCallback* key) |
| : ot_plugin_(ot_plugin), key_(key) { |
| // Since we are updating the timestamp and updating the cache for all |
| // registered instruments in a RegisteredMetricCallback, we will need to |
| // clear all the cache cells for this RegisteredMetricCallback first, so |
| // that if a particular combination of labels was previously present but |
| // is no longer present, we won't continue to report it. |
| for (const auto& handle : key->metrics()) { |
| const auto& descriptor = |
| grpc_core::GlobalInstrumentsRegistry::GetInstrumentDescriptor(handle); |
| GRPC_CHECK( |
| descriptor.instrument_type == |
| grpc_core::GlobalInstrumentsRegistry::InstrumentType::kCallbackGauge); |
| switch (descriptor.value_type) { |
| case grpc_core::GlobalInstrumentsRegistry::ValueType::kInt64: { |
| auto& callback_gauge_state = |
| std::get<std::unique_ptr<CallbackGaugeState<int64_t>>>( |
| ot_plugin_->instruments_data_.at(handle.index).instrument); |
| callback_gauge_state->caches[key].clear(); |
| break; |
| } |
| case grpc_core::GlobalInstrumentsRegistry::ValueType::kDouble: { |
| auto& callback_gauge_state = |
| std::get<std::unique_ptr<CallbackGaugeState<double>>>( |
| ot_plugin_->instruments_data_.at(handle.index).instrument); |
| callback_gauge_state->caches[key].clear(); |
| break; |
| } |
| default: |
| grpc_core::Crash(absl::StrFormat( |
| "Unknown or unsupported value type: %d", descriptor.value_type)); |
| } |
| } |
| } |
| |
| void OpenTelemetryPluginImpl::CallbackMetricReporter::ReportInt64( |
| grpc_core::GlobalInstrumentsRegistry::GlobalInstrumentHandle handle, |
| int64_t value, absl::Span<const absl::string_view> label_values, |
| absl::Span<const absl::string_view> optional_values) { |
| const auto& instrument_data = ot_plugin_->instruments_data_.at(handle.index); |
| auto* callback_gauge_state = |
| std::get_if<std::unique_ptr<CallbackGaugeState<int64_t>>>( |
| &instrument_data.instrument); |
| GRPC_CHECK_NE(callback_gauge_state, nullptr); |
| const auto& descriptor = |
| grpc_core::GlobalInstrumentsRegistry::GetInstrumentDescriptor(handle); |
| GRPC_CHECK(descriptor.label_keys.size() == label_values.size()); |
| GRPC_CHECK(descriptor.optional_label_keys.size() == optional_values.size()); |
| if ((*callback_gauge_state)->caches.find(key_) == |
| (*callback_gauge_state)->caches.end()) { |
| LOG(ERROR) << "This may occur when the gauge used in AddCallback is " |
| "different from the gauge used in Report. This indicates a " |
| "misuse of the API. The value " |
| << value << " will not be recorded for instrument " |
| << handle.index; |
| return; |
| } |
| auto& cell = (*callback_gauge_state)->caches.at(key_); |
| std::vector<std::string> key; |
| key.reserve(label_values.size() + |
| instrument_data.optional_labels_bits.count()); |
| for (const absl::string_view value : label_values) { |
| key.emplace_back(value); |
| } |
| for (size_t i = 0; i < optional_values.size(); ++i) { |
| if (instrument_data.optional_labels_bits.test(i)) { |
| key.emplace_back(optional_values[i]); |
| } |
| } |
| cell.insert_or_assign(std::move(key), value); |
| } |
| |
| void OpenTelemetryPluginImpl::CallbackMetricReporter::ReportDouble( |
| grpc_core::GlobalInstrumentsRegistry::GlobalInstrumentHandle handle, |
| double value, absl::Span<const absl::string_view> label_values, |
| absl::Span<const absl::string_view> optional_values) { |
| const auto& instrument_data = ot_plugin_->instruments_data_.at(handle.index); |
| auto* callback_gauge_state = |
| std::get_if<std::unique_ptr<CallbackGaugeState<double>>>( |
| &instrument_data.instrument); |
| GRPC_CHECK_NE(callback_gauge_state, nullptr); |
| const auto& descriptor = |
| grpc_core::GlobalInstrumentsRegistry::GetInstrumentDescriptor(handle); |
| GRPC_CHECK(descriptor.label_keys.size() == label_values.size()); |
| GRPC_CHECK(descriptor.optional_label_keys.size() == optional_values.size()); |
| if ((*callback_gauge_state)->caches.find(key_) == |
| (*callback_gauge_state)->caches.end()) { |
| LOG(ERROR) << "This may occur when the gauge used in AddCallback is " |
| "different from the gauge used in Report. This indicates a " |
| "misuse of the API. The value " |
| << value << " will not be recorded for instrument " |
| << handle.index; |
| return; |
| } |
| auto& cell = (*callback_gauge_state)->caches.at(key_); |
| std::vector<std::string> key; |
| key.reserve(label_values.size() + |
| instrument_data.optional_labels_bits.count()); |
| for (const absl::string_view value : label_values) { |
| key.emplace_back(value); |
| } |
| for (size_t i = 0; i < optional_values.size(); ++i) { |
| if (instrument_data.optional_labels_bits.test(i)) { |
| key.emplace_back(optional_values[i]); |
| } |
| } |
| cell.insert_or_assign(std::move(key), value); |
| } |
| |
| void OpenTelemetryPluginImpl::ServerBuilderOption::UpdateArguments( |
| grpc::ChannelArguments* args) { |
| plugin_->AddToChannelArguments(args); |
| } |
| |
| class OpenTelemetryPluginImpl::ExporterCallback { |
| public: |
| virtual ~ExporterCallback() = default; |
| }; |
| |
| template <class Exporter> |
| class OpenTelemetryPluginImpl::ExporterCallbackImpl final |
| : public ExporterCallback { |
| public: |
| template <class... Args> |
| explicit ExporterCallbackImpl( |
| opentelemetry::nostd::shared_ptr< |
| opentelemetry::metrics::ObservableInstrument> |
| instrument, |
| Args&&... args) |
| : exporter_(std::forward<Args>(args)...), |
| instrument_(std::move(instrument)) { |
| instrument_->AddCallback(Callback, this); |
| } |
| |
| ~ExporterCallbackImpl() override { |
| instrument_->RemoveCallback(Callback, this); |
| } |
| |
| private: |
| static void Callback(opentelemetry::metrics::ObserverResult result, |
| void* arg) { |
| static_cast<ExporterCallbackImpl*>(arg)->exporter_.Export( |
| std::move(result)); |
| } |
| |
| Exporter exporter_; |
| const opentelemetry::nostd::shared_ptr< |
| opentelemetry::metrics::ObservableInstrument> |
| instrument_; |
| }; |
| |
| class OpenTelemetryPluginImpl::ExportedMetricKeyValueIterable final |
| : public opentelemetry::common::KeyValueIterable { |
| public: |
| explicit ExportedMetricKeyValueIterable( |
| absl::Span<const std::string> label_keys, |
| absl::Span<const std::string> label_values) |
| : label_keys_(label_keys), label_values_(label_values) { |
| CHECK_EQ(label_keys_.size(), label_values_.size()); |
| } |
| |
| bool ForEachKeyValue(opentelemetry::nostd::function_ref< |
| bool(opentelemetry::nostd::string_view, |
| opentelemetry::common::AttributeValue)> |
| callback) const noexcept override { |
| for (size_t i = 0; i < label_keys_.size(); ++i) { |
| if (!callback(label_keys_[i], |
| opentelemetry::common::AttributeValue(label_values_[i]))) { |
| return false; |
| } |
| } |
| return true; |
| } |
| |
| size_t size() const noexcept override { return label_values_.size(); } |
| |
| private: |
| const absl::Span<const std::string> label_keys_; |
| const absl::Span<const std::string> label_values_; |
| }; |
| |
| class OpenTelemetryPluginImpl::CounterExporter final { |
| public: |
| explicit CounterExporter(OpenTelemetryPluginImpl* impl, |
| const grpc_core::InstrumentMetadata::Description* md) |
| : impl_(impl), md_(md) {} |
| |
| void Export(opentelemetry::metrics::ObserverResult result) { |
| Sink sink(std::get<opentelemetry::nostd::shared_ptr< |
| opentelemetry::metrics::ObserverResultT<int64_t>>>(result), |
| md_->shape); |
| impl_->QueryMetrics({md_->name}, sink); |
| } |
| |
| private: |
| class Sink final : public grpc_core::MetricsSink { |
| public: |
| explicit Sink(opentelemetry::nostd::shared_ptr< |
| opentelemetry::metrics::ObserverResultT<int64_t>> |
| observer, |
| grpc_core::InstrumentMetadata::Shape shape) |
| : observer_(std::move(observer)), shape_(shape) {} |
| void Counter(grpc_core::InstrumentLabelList label_keys, |
| absl::Span<const std::string> label_values, absl::string_view, |
| uint64_t value) override { |
| GRPC_DCHECK( |
| std::holds_alternative<grpc_core::InstrumentMetadata::CounterShape>( |
| shape_)); |
| std::vector<std::string> label_key_strings; |
| label_key_strings.reserve(label_keys.size()); |
| for (const auto& label : label_keys) { |
| label_key_strings.push_back(std::string(label.label())); |
| } |
| LOG(ERROR) << "Counter: " << value |
| << " label_keys: " << absl::StrJoin(label_key_strings, ",") |
| << " label_values: " << absl::StrJoin(label_values, ","); |
| ExportedMetricKeyValueIterable labels_iterable(label_key_strings, |
| label_values); |
| observer_->Observe(value, labels_iterable); |
| } |
| void UpDownCounter(grpc_core::InstrumentLabelList label_keys, |
| absl::Span<const std::string> label_values, |
| absl::string_view, uint64_t value) override { |
| GRPC_DCHECK(std::holds_alternative< |
| grpc_core::InstrumentMetadata::UpDownCounterShape>(shape_)); |
| std::vector<std::string> label_key_strings; |
| label_key_strings.reserve(label_keys.size()); |
| for (const auto& label : label_keys) { |
| label_key_strings.push_back(std::string(label.label())); |
| } |
| LOG(ERROR) << "UpDownCounter: " << value |
| << " label_keys: " << absl::StrJoin(label_key_strings, ",") |
| << " label_values: " << absl::StrJoin(label_values, ","); |
| ExportedMetricKeyValueIterable labels_iterable(label_key_strings, |
| label_values); |
| observer_->Observe(value, labels_iterable); |
| } |
| void Histogram(grpc_core::InstrumentLabelList, |
| absl::Span<const std::string>, absl::string_view, |
| grpc_core::HistogramBuckets, |
| absl::Span<const uint64_t>) override { |
| LOG(FATAL) << "Expected a counter, got a histogram"; |
| } |
| void DoubleGauge(grpc_core::InstrumentLabelList, |
| absl::Span<const std::string>, absl::string_view, |
| double) override { |
| LOG(FATAL) << "Expected a counter, got a double gauge"; |
| } |
| void IntGauge(grpc_core::InstrumentLabelList, absl::Span<const std::string>, |
| absl::string_view, int64_t) override { |
| LOG(FATAL) << "Expected a counter, got an int gauge"; |
| } |
| void UintGauge(grpc_core::InstrumentLabelList, |
| absl::Span<const std::string>, absl::string_view, |
| uint64_t) override { |
| LOG(FATAL) << "Expected a counter, got a uint gauge"; |
| } |
| |
| private: |
| const opentelemetry::nostd::shared_ptr< |
| opentelemetry::metrics::ObserverResultT<int64_t>> |
| observer_; |
| const grpc_core::InstrumentMetadata::Shape shape_; |
| }; |
| |
| OpenTelemetryPluginImpl* const impl_; |
| const grpc_core::InstrumentMetadata::Description* const md_; |
| }; |
| |
| void OpenTelemetryPluginImpl::QueryMetrics( |
| absl::Span<const absl::string_view> metrics, grpc_core::MetricsSink& sink) { |
| grpc_core::MetricsQuery() |
| .OnlyMetrics(std::vector<std::string>(metrics.begin(), metrics.end())) |
| .Run(collection_scope_, sink); |
| } |
| |
| namespace { |
| opentelemetry::nostd::shared_ptr<opentelemetry::trace::Tracer> MaybeMakeTracer( |
| opentelemetry::trace::TracerProvider* tracer_provider) { |
| if (tracer_provider == nullptr) { |
| return opentelemetry::nostd::shared_ptr<opentelemetry::trace::Tracer>(); |
| } |
| return tracer_provider->GetTracer("grpc-c++", GRPC_CPP_VERSION_STRING); |
| } |
| } // namespace |
| |
| OpenTelemetryPluginImpl::OpenTelemetryPluginImpl( |
| const absl::flat_hash_set<std::string>& metrics, |
| opentelemetry::nostd::shared_ptr<opentelemetry::metrics::MeterProvider> |
| meter_provider, |
| absl::AnyInvocable<bool(absl::string_view /*target*/) const> |
| target_attribute_filter, |
| absl::AnyInvocable<bool(absl::string_view /*generic_method*/) const> |
| generic_method_attribute_filter, |
| absl::AnyInvocable<bool(const grpc_core::ChannelArgs& /*args*/) const> |
| server_selector, |
| std::vector<std::unique_ptr<InternalOpenTelemetryPluginOption>> |
| plugin_options, |
| const std::set<absl::string_view>& optional_label_keys, |
| std::shared_ptr<opentelemetry::trace::TracerProvider> tracer_provider, |
| std::unique_ptr<TextMapPropagator> text_map_propagator, |
| absl::AnyInvocable< |
| bool(const OpenTelemetryPluginBuilder::ChannelScope& /*scope*/) const> |
| channel_scope_filter) |
| : meter_provider_(std::move(meter_provider)), |
| server_selector_(std::move(server_selector)), |
| target_attribute_filter_(std::move(target_attribute_filter)), |
| generic_method_attribute_filter_( |
| std::move(generic_method_attribute_filter)), |
| plugin_options_(std::move(plugin_options)), |
| tracer_provider_(std::move(tracer_provider)), |
| tracer_(MaybeMakeTracer(tracer_provider_.get())), |
| text_map_propagator_(std::move(text_map_propagator)), |
| channel_scope_filter_(std::move(channel_scope_filter)) { |
| if (meter_provider_ != nullptr) { |
| auto meter = meter_provider_->GetMeter("grpc-c++", GRPC_CPP_VERSION_STRING); |
| // Per-call metrics. |
| if (metrics.contains(grpc::OpenTelemetryPluginBuilder:: |
| kClientAttemptStartedInstrumentName)) { |
| client_.attempt.started = meter->CreateUInt64Counter( |
| std::string(grpc::OpenTelemetryPluginBuilder:: |
| kClientAttemptStartedInstrumentName), |
| "Number of client call attempts started", "{attempt}"); |
| } |
| if (metrics.contains(grpc::OpenTelemetryPluginBuilder:: |
| kClientAttemptDurationInstrumentName)) { |
| client_.attempt.duration = meter->CreateDoubleHistogram( |
| std::string(grpc::OpenTelemetryPluginBuilder:: |
| kClientAttemptDurationInstrumentName), |
| "End-to-end time taken to complete a client call attempt", "s"); |
| } |
| if (metrics.contains( |
| grpc::OpenTelemetryPluginBuilder:: |
| kClientAttemptSentTotalCompressedMessageSizeInstrumentName)) { |
| client_.attempt |
| .sent_total_compressed_message_size = meter->CreateUInt64Histogram( |
| std::string( |
| grpc::OpenTelemetryPluginBuilder:: |
| kClientAttemptSentTotalCompressedMessageSizeInstrumentName), |
| "Compressed message bytes sent per client call attempt", "By"); |
| } |
| if (metrics.contains( |
| grpc::OpenTelemetryPluginBuilder:: |
| kClientAttemptRcvdTotalCompressedMessageSizeInstrumentName)) { |
| client_.attempt |
| .rcvd_total_compressed_message_size = meter->CreateUInt64Histogram( |
| std::string( |
| grpc::OpenTelemetryPluginBuilder:: |
| kClientAttemptRcvdTotalCompressedMessageSizeInstrumentName), |
| "Compressed message bytes received per call attempt", "By"); |
| } |
| if (metrics.contains(grpc::OpenTelemetryPluginBuilder:: |
| kServerCallStartedInstrumentName)) { |
| server_.call.started = meter->CreateUInt64Counter( |
| std::string(grpc::OpenTelemetryPluginBuilder:: |
| kServerCallStartedInstrumentName), |
| "Number of server calls started", "{call}"); |
| } |
| if (metrics.contains(grpc::OpenTelemetryPluginBuilder:: |
| kServerCallDurationInstrumentName)) { |
| server_.call.duration = meter->CreateDoubleHistogram( |
| std::string(grpc::OpenTelemetryPluginBuilder:: |
| kServerCallDurationInstrumentName), |
| "End-to-end time taken to complete a call from server transport's " |
| "perspective", |
| "s"); |
| } |
| if (metrics.contains( |
| grpc::OpenTelemetryPluginBuilder:: |
| kServerCallSentTotalCompressedMessageSizeInstrumentName)) { |
| server_.call.sent_total_compressed_message_size = |
| meter->CreateUInt64Histogram( |
| std::string( |
| grpc::OpenTelemetryPluginBuilder:: |
| kServerCallSentTotalCompressedMessageSizeInstrumentName), |
| "Compressed message bytes sent per server call", "By"); |
| } |
| if (metrics.contains( |
| grpc::OpenTelemetryPluginBuilder:: |
| kServerCallRcvdTotalCompressedMessageSizeInstrumentName)) { |
| server_.call.rcvd_total_compressed_message_size = |
| meter->CreateUInt64Histogram( |
| std::string( |
| grpc::OpenTelemetryPluginBuilder:: |
| kServerCallRcvdTotalCompressedMessageSizeInstrumentName), |
| "Compressed message bytes received per server call", "By"); |
| } |
| if (metrics.contains(grpc::OpenTelemetryPluginBuilder:: |
| kClientCallRetriesInstrumentName)) { |
| client_.call.retries = meter->CreateUInt64Histogram( |
| std::string(grpc::OpenTelemetryPluginBuilder:: |
| kClientCallRetriesInstrumentName), |
| "EXPERIMENTAL: Number of retries during the client call. If there " |
| "were no retries, 0 is not reported.", |
| "{retry}"); |
| } |
| if (metrics.contains(grpc::OpenTelemetryPluginBuilder:: |
| kClientCallTransparentRetriesInstrumentName)) { |
| client_.call.transparent_retries = meter->CreateUInt64Histogram( |
| std::string(grpc::OpenTelemetryPluginBuilder:: |
| kClientCallTransparentRetriesInstrumentName), |
| "EXPERIMENTAL: Number of transparent retries during the client call. " |
| "If there were no transparent retries, 0 is not reported.", |
| "{transparent_retry}"); |
| } |
| if (metrics.contains(grpc::OpenTelemetryPluginBuilder:: |
| kClientCallRetryDelayInstrumentName)) { |
| client_.call.retry_delay = meter->CreateDoubleHistogram( |
| std::string(grpc::OpenTelemetryPluginBuilder:: |
| kClientCallRetryDelayInstrumentName), |
| "EXPERIMENTAL: Total time of delay while there is no active attempt " |
| "during the client call", |
| "s"); |
| } |
| // Store optional label keys for per call metrics |
| GRPC_CHECK(static_cast<size_t>( |
| grpc_core::ClientCallTracerInterface::CallAttemptTracer:: |
| OptionalLabelKey::kSize) <= kOptionalLabelsSizeLimit); |
| for (const auto& key : optional_label_keys) { |
| auto optional_key = OptionalLabelStringToKey(key); |
| if (optional_key.has_value()) { |
| per_call_optional_label_bits_.set( |
| static_cast<size_t>(optional_key.value())); |
| } |
| } |
| if (grpc_core::IsOtelExportTelemetryDomainsEnabled()) { |
| // gRPC metrics. |
| absl::flat_hash_set<std::string> labels; |
| grpc_core::InstrumentMetadata::ForEachInstrument( |
| [&](const grpc_core::InstrumentMetadata::Description* description) { |
| if (!metrics.contains(description->name)) return; |
| for (const auto& label : description->domain->label_names()) { |
| if (!internal::IsOpenTelemetryLabelOptional(label.label()) || |
| optional_label_keys.find(label.label()) != |
| optional_label_keys.end()) { |
| labels.insert(std::string(label.label())); |
| } |
| } |
| grpc_core::Match( |
| description->shape, |
| [&](grpc_core::InstrumentMetadata::CounterShape) { |
| auto instrument = meter->CreateInt64ObservableCounter( |
| std::string(description->name), |
| std::string(description->description), |
| std::string(description->unit)); |
| exporter_callbacks_.push_back( |
| std::make_unique<ExporterCallbackImpl<CounterExporter>>( |
| instrument, this, description)); |
| }, |
| [&](grpc_core::InstrumentMetadata::UpDownCounterShape) { |
| auto instrument = meter->CreateInt64ObservableUpDownCounter( |
| std::string(description->name), |
| std::string(description->description), |
| std::string(description->unit)); |
| exporter_callbacks_.push_back( |
| std::make_unique<ExporterCallbackImpl<CounterExporter>>( |
| instrument, this, description)); |
| }, |
| [&](grpc_core::InstrumentMetadata::DoubleGaugeShape) { |
| LOG(FATAL) << "Double gauge shape is not supported yet"; |
| }, |
| [&](grpc_core::InstrumentMetadata::IntGaugeShape) { |
| LOG(FATAL) << "Int gauge shape is not supported yet"; |
| }, |
| [&](grpc_core::InstrumentMetadata::UintGaugeShape) { |
| LOG(FATAL) << "Uint gauge shape is not supported yet"; |
| }, |
| [&](grpc_core::InstrumentMetadata::HistogramShape) { |
| LOG(FATAL) << "Histogram shape is not supported yet"; |
| }); |
| }); |
| grpc_core::InstrumentLabelSet label_set; |
| for (const auto& label : labels) { |
| label_set.Set(grpc_core::InstrumentLabel(label)); |
| } |
| collection_scope_ = grpc_core::CreateCollectionScope({}, label_set); |
| } |
| // Non-per-call metrics. |
| grpc_core::GlobalInstrumentsRegistry::ForEach( |
| [&, this](const grpc_core::GlobalInstrumentsRegistry:: |
| GlobalInstrumentDescriptor& descriptor) { |
| GRPC_CHECK(descriptor.optional_label_keys.size() <= |
| kOptionalLabelsSizeLimit); |
| if (instruments_data_.size() < descriptor.index + 1) { |
| instruments_data_.resize(descriptor.index + 1); |
| } |
| if (!metrics.contains(descriptor.name)) { |
| return; |
| } |
| switch (descriptor.instrument_type) { |
| case grpc_core::GlobalInstrumentsRegistry::InstrumentType::kCounter: |
| switch (descriptor.value_type) { |
| case grpc_core::GlobalInstrumentsRegistry::ValueType::kUInt64: |
| instruments_data_[descriptor.index].instrument = |
| meter->CreateUInt64Counter( |
| std::string(descriptor.name), |
| std::string(descriptor.description), |
| std::string(descriptor.unit)); |
| break; |
| case grpc_core::GlobalInstrumentsRegistry::ValueType::kDouble: |
| instruments_data_[descriptor.index].instrument = |
| meter->CreateDoubleCounter( |
| std::string(descriptor.name), |
| std::string(descriptor.description), |
| std::string(descriptor.unit)); |
| break; |
| default: |
| grpc_core::Crash( |
| absl::StrFormat("Unknown or unsupported value type: %d", |
| descriptor.value_type)); |
| } |
| break; |
| case grpc_core::GlobalInstrumentsRegistry::InstrumentType:: |
| kHistogram: |
| switch (descriptor.value_type) { |
| case grpc_core::GlobalInstrumentsRegistry::ValueType::kUInt64: |
| instruments_data_[descriptor.index].instrument = |
| meter->CreateUInt64Histogram( |
| std::string(descriptor.name), |
| std::string(descriptor.description), |
| std::string(descriptor.unit)); |
| break; |
| case grpc_core::GlobalInstrumentsRegistry::ValueType::kDouble: |
| instruments_data_[descriptor.index].instrument = |
| meter->CreateDoubleHistogram( |
| std::string(descriptor.name), |
| std::string(descriptor.description), |
| std::string(descriptor.unit)); |
| break; |
| default: |
| grpc_core::Crash( |
| absl::StrFormat("Unknown or unsupported value type: %d", |
| descriptor.value_type)); |
| } |
| break; |
| case grpc_core::GlobalInstrumentsRegistry::InstrumentType:: |
| kCallbackGauge: |
| switch (descriptor.value_type) { |
| case grpc_core::GlobalInstrumentsRegistry::ValueType::kInt64: { |
| auto observable_state = |
| std::make_unique<CallbackGaugeState<int64_t>>(); |
| observable_state->id = descriptor.index; |
| observable_state->ot_plugin = this; |
| observable_state->instrument = |
| meter->CreateInt64ObservableGauge( |
| std::string(descriptor.name), |
| std::string(descriptor.description), |
| std::string(descriptor.unit)); |
| instruments_data_[descriptor.index].instrument = |
| std::move(observable_state); |
| break; |
| } |
| case grpc_core::GlobalInstrumentsRegistry::ValueType::kDouble: { |
| auto observable_state = |
| std::make_unique<CallbackGaugeState<double>>(); |
| observable_state->id = descriptor.index; |
| observable_state->ot_plugin = this; |
| observable_state->instrument = |
| meter->CreateDoubleObservableGauge( |
| std::string(descriptor.name), |
| std::string(descriptor.description), |
| std::string(descriptor.unit)); |
| instruments_data_[descriptor.index].instrument = |
| std::move(observable_state); |
| break; |
| } |
| default: |
| grpc_core::Crash( |
| absl::StrFormat("Unknown or unsupported value type: %d", |
| descriptor.value_type)); |
| } |
| break; |
| default: |
| grpc_core::Crash(absl::StrFormat("Unknown instrument_type: %d", |
| descriptor.instrument_type)); |
| } |
| for (size_t i = 0; i < descriptor.optional_label_keys.size(); ++i) { |
| if (optional_label_keys.find(descriptor.optional_label_keys[i]) != |
| optional_label_keys.end()) { |
| instruments_data_[descriptor.index].optional_labels_bits.set(i); |
| } |
| } |
| }); |
| } |
| } |
| |
| OpenTelemetryPluginImpl::~OpenTelemetryPluginImpl() { |
| for (const auto& instrument_data : instruments_data_) { |
| grpc_core::Match( |
| instrument_data.instrument, [](const Disabled&) {}, |
| [](const std::unique_ptr<opentelemetry::metrics::Counter<double>>&) {}, |
| [](const std::unique_ptr<opentelemetry::metrics::Counter<uint64_t>>&) { |
| }, |
| [](const std::unique_ptr< |
| opentelemetry::metrics::Histogram<uint64_t>>&) {}, |
| [](const std::unique_ptr<opentelemetry::metrics::Histogram<double>>&) { |
| }, |
| [](const std::unique_ptr<CallbackGaugeState<int64_t>>& state) { |
| GRPC_CHECK(state->caches.empty()); |
| if (state->ot_callback_registered) { |
| state->instrument->RemoveCallback( |
| &CallbackGaugeState<int64_t>::CallbackGaugeCallback, |
| state.get()); |
| state->ot_callback_registered = false; |
| } |
| }, |
| [](const std::unique_ptr<CallbackGaugeState<double>>& state) { |
| GRPC_CHECK(state->caches.empty()); |
| if (state->ot_callback_registered) { |
| state->instrument->RemoveCallback( |
| &CallbackGaugeState<double>::CallbackGaugeCallback, |
| state.get()); |
| state->ot_callback_registered = false; |
| } |
| }); |
| } |
| } |
| |
| namespace { |
| constexpr absl::string_view kLocality = "grpc.lb.locality"; |
| constexpr absl::string_view kBackendService = "grpc.lb.backend_service"; |
| } // namespace |
| |
| absl::string_view OpenTelemetryPluginImpl::OptionalLabelKeyToString( |
| grpc_core::ClientCallTracerInterface::CallAttemptTracer::OptionalLabelKey |
| key) { |
| switch (key) { |
| case grpc_core::ClientCallTracerInterface::CallAttemptTracer:: |
| OptionalLabelKey::kLocality: |
| return kLocality; |
| case grpc_core::ClientCallTracerInterface::CallAttemptTracer:: |
| OptionalLabelKey::kBackendService: |
| return kBackendService; |
| default: |
| grpc_core::Crash("Illegal OptionalLabelKey index"); |
| } |
| } |
| |
| std::optional< |
| grpc_core::ClientCallTracerInterface::CallAttemptTracer::OptionalLabelKey> |
| OpenTelemetryPluginImpl::OptionalLabelStringToKey(absl::string_view key) { |
| if (key == kLocality) { |
| return grpc_core::ClientCallTracerInterface::CallAttemptTracer:: |
| OptionalLabelKey::kLocality; |
| } else if (key == kBackendService) { |
| return grpc_core::ClientCallTracerInterface::CallAttemptTracer:: |
| OptionalLabelKey::kBackendService; |
| } |
| return std::nullopt; |
| } |
| |
| absl::string_view OpenTelemetryPluginImpl::GetMethodFromPath( |
| const grpc_core::Slice& path) { |
| // Check for leading '/' and trim it if present. |
| return absl::StripPrefix(path.as_string_view(), "/"); |
| } |
| |
| std::pair<bool, std::shared_ptr<grpc_core::StatsPlugin::ScopeConfig>> |
| OpenTelemetryPluginImpl::IsEnabledForChannel( |
| const OpenTelemetryPluginBuilder::ChannelScope& scope) const { |
| if (channel_scope_filter_ == nullptr || channel_scope_filter_(scope)) { |
| return {true, std::make_shared<ClientScopeConfig>(this, scope)}; |
| } |
| return {false, nullptr}; |
| } |
| |
| std::pair<bool, std::shared_ptr<grpc_core::StatsPlugin::ScopeConfig>> |
| OpenTelemetryPluginImpl::IsEnabledForServer( |
| const grpc_core::ChannelArgs& args) const { |
| // Return true only if there is no server selector registered or if the |
| // server selector returns true. |
| if (server_selector_ == nullptr || server_selector_(args)) { |
| return {true, std::make_shared<ServerScopeConfig>(this, args)}; |
| } |
| return {false, nullptr}; |
| } |
| |
| std::shared_ptr<grpc_core::StatsPlugin::ScopeConfig> |
| OpenTelemetryPluginImpl::GetChannelScopeConfig( |
| const OpenTelemetryPluginBuilder::ChannelScope& scope) const { |
| GRPC_CHECK(channel_scope_filter_ == nullptr || channel_scope_filter_(scope)); |
| return std::make_shared<ClientScopeConfig>(this, scope); |
| } |
| |
| std::shared_ptr<grpc_core::StatsPlugin::ScopeConfig> |
| OpenTelemetryPluginImpl::GetServerScopeConfig( |
| const grpc_core::ChannelArgs& args) const { |
| GRPC_CHECK(server_selector_ == nullptr || server_selector_(args)); |
| return std::make_shared<ServerScopeConfig>(this, args); |
| } |
| |
| void OpenTelemetryPluginImpl::AddCounter( |
| grpc_core::GlobalInstrumentsRegistry::GlobalInstrumentHandle handle, |
| uint64_t value, absl::Span<const absl::string_view> label_values, |
| absl::Span<const absl::string_view> optional_values) { |
| if (meter_provider_ == nullptr) return; |
| const auto& instrument_data = instruments_data_.at(handle.index); |
| if (std::holds_alternative<Disabled>(instrument_data.instrument)) { |
| // This instrument is disabled. |
| return; |
| } |
| GRPC_CHECK(std::holds_alternative< |
| std::unique_ptr<opentelemetry::metrics::Counter<uint64_t>>>( |
| instrument_data.instrument)); |
| const auto& descriptor = |
| grpc_core::GlobalInstrumentsRegistry::GetInstrumentDescriptor(handle); |
| GRPC_CHECK(descriptor.label_keys.size() == label_values.size()); |
| GRPC_CHECK(descriptor.optional_label_keys.size() == optional_values.size()); |
| if (label_values.empty() && optional_values.empty()) { |
| std::get<std::unique_ptr<opentelemetry::metrics::Counter<uint64_t>>>( |
| instrument_data.instrument) |
| ->Add(value); |
| } else { |
| std::get<std::unique_ptr<opentelemetry::metrics::Counter<uint64_t>>>( |
| instrument_data.instrument) |
| ->Add(value, NPCMetricsKeyValueIterable( |
| descriptor.label_keys, label_values, |
| descriptor.optional_label_keys, optional_values, |
| instrument_data.optional_labels_bits)); |
| } |
| } |
| |
| void OpenTelemetryPluginImpl::AddCounter( |
| grpc_core::GlobalInstrumentsRegistry::GlobalInstrumentHandle handle, |
| double value, absl::Span<const absl::string_view> label_values, |
| absl::Span<const absl::string_view> optional_values) { |
| if (meter_provider_ == nullptr) return; |
| const auto& instrument_data = instruments_data_.at(handle.index); |
| if (std::holds_alternative<Disabled>(instrument_data.instrument)) { |
| // This instrument is disabled. |
| return; |
| } |
| GRPC_CHECK(std::holds_alternative< |
| std::unique_ptr<opentelemetry::metrics::Counter<double>>>( |
| instrument_data.instrument)); |
| const auto& descriptor = |
| grpc_core::GlobalInstrumentsRegistry::GetInstrumentDescriptor(handle); |
| GRPC_CHECK(descriptor.label_keys.size() == label_values.size()); |
| GRPC_CHECK(descriptor.optional_label_keys.size() == optional_values.size()); |
| if (label_values.empty() && optional_values.empty()) { |
| std::get<std::unique_ptr<opentelemetry::metrics::Counter<double>>>( |
| instrument_data.instrument) |
| ->Add(value); |
| } else { |
| std::get<std::unique_ptr<opentelemetry::metrics::Counter<double>>>( |
| instrument_data.instrument) |
| ->Add(value, NPCMetricsKeyValueIterable( |
| descriptor.label_keys, label_values, |
| descriptor.optional_label_keys, optional_values, |
| instrument_data.optional_labels_bits)); |
| } |
| } |
| |
| void OpenTelemetryPluginImpl::RecordHistogram( |
| grpc_core::GlobalInstrumentsRegistry::GlobalInstrumentHandle handle, |
| uint64_t value, absl::Span<const absl::string_view> label_values, |
| absl::Span<const absl::string_view> optional_values) { |
| if (meter_provider_ == nullptr) return; |
| const auto& instrument_data = instruments_data_.at(handle.index); |
| if (std::holds_alternative<Disabled>(instrument_data.instrument)) { |
| // This instrument is disabled. |
| return; |
| } |
| GRPC_CHECK(std::holds_alternative< |
| std::unique_ptr<opentelemetry::metrics::Histogram<uint64_t>>>( |
| instrument_data.instrument)); |
| const auto& descriptor = |
| grpc_core::GlobalInstrumentsRegistry::GetInstrumentDescriptor(handle); |
| GRPC_CHECK(descriptor.label_keys.size() == label_values.size()); |
| GRPC_CHECK(descriptor.optional_label_keys.size() == optional_values.size()); |
| if (label_values.empty() && optional_values.empty()) { |
| std::get<std::unique_ptr<opentelemetry::metrics::Histogram<uint64_t>>>( |
| instrument_data.instrument) |
| ->Record(value, opentelemetry::context::Context{}); |
| } else { |
| std::get<std::unique_ptr<opentelemetry::metrics::Histogram<uint64_t>>>( |
| instrument_data.instrument) |
| ->Record(value, |
| NPCMetricsKeyValueIterable( |
| descriptor.label_keys, label_values, |
| descriptor.optional_label_keys, optional_values, |
| instrument_data.optional_labels_bits), |
| opentelemetry::context::Context{}); |
| } |
| } |
| |
| void OpenTelemetryPluginImpl::RecordHistogram( |
| grpc_core::GlobalInstrumentsRegistry::GlobalInstrumentHandle handle, |
| double value, absl::Span<const absl::string_view> label_values, |
| absl::Span<const absl::string_view> optional_values) { |
| if (meter_provider_ == nullptr) return; |
| const auto& instrument_data = instruments_data_.at(handle.index); |
| if (std::holds_alternative<Disabled>(instrument_data.instrument)) { |
| // This instrument is disabled. |
| return; |
| } |
| GRPC_CHECK(std::holds_alternative< |
| std::unique_ptr<opentelemetry::metrics::Histogram<double>>>( |
| instrument_data.instrument)); |
| const auto& descriptor = |
| grpc_core::GlobalInstrumentsRegistry::GetInstrumentDescriptor(handle); |
| GRPC_CHECK(descriptor.label_keys.size() == label_values.size()); |
| GRPC_CHECK(descriptor.optional_label_keys.size() == optional_values.size()); |
| if (label_values.empty() && optional_values.empty()) { |
| std::get<std::unique_ptr<opentelemetry::metrics::Histogram<double>>>( |
| instrument_data.instrument) |
| ->Record(value, opentelemetry::context::Context{}); |
| } else { |
| std::get<std::unique_ptr<opentelemetry::metrics::Histogram<double>>>( |
| instrument_data.instrument) |
| ->Record(value, |
| NPCMetricsKeyValueIterable( |
| descriptor.label_keys, label_values, |
| descriptor.optional_label_keys, optional_values, |
| instrument_data.optional_labels_bits), |
| opentelemetry::context::Context{}); |
| } |
| } |
| |
| void OpenTelemetryPluginImpl::AddCallback( |
| grpc_core::RegisteredMetricCallback* callback) { |
| if (meter_provider_ == nullptr) return; |
| std::vector< |
| std::variant<CallbackGaugeState<int64_t>*, CallbackGaugeState<double>*>> |
| gauges_that_need_to_add_callback; |
| { |
| grpc_core::MutexLock lock(&mu_); |
| callback_timestamps_.emplace(callback, grpc_core::Timestamp::InfPast()); |
| for (const auto& handle : callback->metrics()) { |
| const auto& descriptor = |
| grpc_core::GlobalInstrumentsRegistry::GetInstrumentDescriptor(handle); |
| GRPC_CHECK( |
| descriptor.instrument_type == |
| grpc_core::GlobalInstrumentsRegistry::InstrumentType::kCallbackGauge); |
| switch (descriptor.value_type) { |
| case grpc_core::GlobalInstrumentsRegistry::ValueType::kInt64: { |
| const auto& instrument_data = instruments_data_.at(handle.index); |
| if (std::holds_alternative<Disabled>(instrument_data.instrument)) { |
| // This instrument is disabled. |
| continue; |
| } |
| auto* callback_gauge_state = |
| std::get_if<std::unique_ptr<CallbackGaugeState<int64_t>>>( |
| &instrument_data.instrument); |
| GRPC_CHECK_NE(callback_gauge_state, nullptr); |
| (*callback_gauge_state) |
| ->caches.emplace(callback, CallbackGaugeState<int64_t>::Cache{}); |
| if (!std::exchange((*callback_gauge_state)->ot_callback_registered, |
| true)) { |
| gauges_that_need_to_add_callback.push_back( |
| callback_gauge_state->get()); |
| } |
| break; |
| } |
| case grpc_core::GlobalInstrumentsRegistry::ValueType::kDouble: { |
| const auto& instrument_data = instruments_data_.at(handle.index); |
| if (std::holds_alternative<Disabled>(instrument_data.instrument)) { |
| // This instrument is disabled. |
| continue; |
| } |
| auto* callback_gauge_state = |
| std::get_if<std::unique_ptr<CallbackGaugeState<double>>>( |
| &instrument_data.instrument); |
| GRPC_CHECK_NE(callback_gauge_state, nullptr); |
| (*callback_gauge_state) |
| ->caches.emplace(callback, CallbackGaugeState<double>::Cache{}); |
| if (!std::exchange((*callback_gauge_state)->ot_callback_registered, |
| true)) { |
| gauges_that_need_to_add_callback.push_back( |
| callback_gauge_state->get()); |
| } |
| break; |
| } |
| default: |
| grpc_core::Crash(absl::StrFormat( |
| "Unknown or unsupported value type: %d", descriptor.value_type)); |
| } |
| } |
| } |
| // AddCallback internally grabs OpenTelemetry's observable_registry's |
| // lock. So we need to call it without our plugin lock otherwise we may |
| // deadlock. |
| for (const auto& gauge : gauges_that_need_to_add_callback) { |
| grpc_core::Match( |
| gauge, |
| [](CallbackGaugeState<int64_t>* gauge) { |
| gauge->instrument->AddCallback( |
| &CallbackGaugeState<int64_t>::CallbackGaugeCallback, gauge); |
| }, |
| [](CallbackGaugeState<double>* gauge) { |
| gauge->instrument->AddCallback( |
| &CallbackGaugeState<double>::CallbackGaugeCallback, gauge); |
| }); |
| } |
| } |
| |
| void OpenTelemetryPluginImpl::RemoveCallback( |
| grpc_core::RegisteredMetricCallback* callback) { |
| if (meter_provider_ == nullptr) return; |
| { |
| grpc_core::MutexLock lock(&mu_); |
| callback_timestamps_.erase(callback); |
| for (const auto& handle : callback->metrics()) { |
| const auto& descriptor = |
| grpc_core::GlobalInstrumentsRegistry::GetInstrumentDescriptor(handle); |
| GRPC_CHECK( |
| descriptor.instrument_type == |
| grpc_core::GlobalInstrumentsRegistry::InstrumentType::kCallbackGauge); |
| switch (descriptor.value_type) { |
| case grpc_core::GlobalInstrumentsRegistry::ValueType::kInt64: { |
| const auto& instrument_data = instruments_data_.at(handle.index); |
| if (std::holds_alternative<Disabled>(instrument_data.instrument)) { |
| // This instrument is disabled. |
| continue; |
| } |
| auto* callback_gauge_state = |
| std::get_if<std::unique_ptr<CallbackGaugeState<int64_t>>>( |
| &instrument_data.instrument); |
| GRPC_CHECK_NE(callback_gauge_state, nullptr); |
| GRPC_CHECK((*callback_gauge_state)->ot_callback_registered); |
| GRPC_CHECK_EQ((*callback_gauge_state)->caches.erase(callback), 1u); |
| break; |
| } |
| case grpc_core::GlobalInstrumentsRegistry::ValueType::kDouble: { |
| const auto& instrument_data = instruments_data_.at(handle.index); |
| if (std::holds_alternative<Disabled>(instrument_data.instrument)) { |
| // This instrument is disabled. |
| continue; |
| } |
| auto* callback_gauge_state = |
| std::get_if<std::unique_ptr<CallbackGaugeState<double>>>( |
| &instrument_data.instrument); |
| GRPC_CHECK_NE(callback_gauge_state, nullptr); |
| GRPC_CHECK((*callback_gauge_state)->ot_callback_registered); |
| GRPC_CHECK_EQ((*callback_gauge_state)->caches.erase(callback), 1u); |
| break; |
| } |
| default: |
| grpc_core::Crash(absl::StrFormat( |
| "Unknown or unsupported value type: %d", descriptor.value_type)); |
| } |
| } |
| } |
| // Note that we are not removing the callback from OpenTelemetry immediately, |
| // and instead remove it when the plugin is destroyed. We just have a single |
| // callback per OpenTelemetry instrument which is a small number. If we decide |
| // to remove the callback immediately at this point, we need to make sure that |
| // 1) the callback is removed without holding mu_ and 2) we make sure that |
| // this does not race against a possible `AddCallback` operation. A potential |
| // way to do this is to use WorkSerializer. |
| } |
| |
| template <typename ValueType> |
| void OpenTelemetryPluginImpl::CallbackGaugeState<ValueType>::Observe( |
| opentelemetry::metrics::ObserverResult& result, const Cache& cache) { |
| const auto& descriptor = |
| grpc_core::GlobalInstrumentsRegistry::GetInstrumentDescriptor({id}); |
| for (const auto& pair : cache) { |
| GRPC_CHECK(pair.first.size() <= (descriptor.label_keys.size() + |
| descriptor.optional_label_keys.size())); |
| if (descriptor.label_keys.empty() && |
| descriptor.optional_label_keys.empty()) { |
| opentelemetry::nostd::get<opentelemetry::nostd::shared_ptr< |
| opentelemetry::metrics::ObserverResultT<ValueType>>>(result) |
| ->Observe(pair.second); |
| } else { |
| auto& instrument_data = ot_plugin->instruments_data_.at(id); |
| opentelemetry::nostd::get<opentelemetry::nostd::shared_ptr< |
| opentelemetry::metrics::ObserverResultT<ValueType>>>(result) |
| ->Observe(pair.second, |
| NPCMetricsKeyValueIterable( |
| descriptor.label_keys, |
| absl::FixedArray<absl::string_view>( |
| pair.first.begin(), |
| pair.first.begin() + descriptor.label_keys.size()), |
| descriptor.optional_label_keys, |
| absl::FixedArray<absl::string_view>( |
| pair.first.begin() + descriptor.label_keys.size(), |
| pair.first.end()), |
| instrument_data.optional_labels_bits)); |
| } |
| } |
| } |
| |
| // OpenTelemetry calls our callback with its observable_registry's lock |
| // held. |
| template <typename ValueType> |
| void OpenTelemetryPluginImpl::CallbackGaugeState<ValueType>:: |
| CallbackGaugeCallback(opentelemetry::metrics::ObserverResult result, |
| void* arg) { |
| auto* callback_gauge_state = static_cast<CallbackGaugeState<ValueType>*>(arg); |
| auto now = grpc_core::Timestamp::Now(); |
| grpc_core::MutexLock plugin_lock(&callback_gauge_state->ot_plugin->mu_); |
| for (auto& elem : callback_gauge_state->caches) { |
| auto* registered_metric_callback = elem.first; |
| auto iter = callback_gauge_state->ot_plugin->callback_timestamps_.find( |
| registered_metric_callback); |
| GRPC_CHECK(iter != |
| callback_gauge_state->ot_plugin->callback_timestamps_.end()); |
| if (now - iter->second < registered_metric_callback->min_interval()) { |
| // Use cached value. |
| callback_gauge_state->Observe(result, elem.second); |
| continue; |
| } |
| // Otherwise update and use the cache. |
| iter->second = now; |
| CallbackMetricReporter reporter(callback_gauge_state->ot_plugin, |
| registered_metric_callback); |
| registered_metric_callback->Run(reporter); |
| callback_gauge_state->Observe(result, elem.second); |
| } |
| } |
| |
| grpc_core::ClientCallTracerInterface* |
| OpenTelemetryPluginImpl::GetClientCallTracer( |
| const grpc_core::Slice& path, bool registered_method, |
| std::shared_ptr<grpc_core::StatsPlugin::ScopeConfig> scope_config) { |
| return grpc_core::GetContext<grpc_core::Arena>() |
| ->ManagedNew<ClientCallTracerInterface>( |
| path, grpc_core::GetContext<grpc_core::Arena>(), registered_method, |
| this, |
| std::static_pointer_cast<OpenTelemetryPluginImpl::ClientScopeConfig>( |
| scope_config)); |
| } |
| |
| grpc_core::ServerCallTracerInterface* |
| OpenTelemetryPluginImpl::GetServerCallTracer( |
| std::shared_ptr<grpc_core::StatsPlugin::ScopeConfig> scope_config) { |
| auto arena = grpc_core::GetContext<grpc_core::Arena>(); |
| return arena |
| ->MakeRefCounted<ServerCallTracerInterface>( |
| this, arena, |
| std::static_pointer_cast<OpenTelemetryPluginImpl::ServerScopeConfig>( |
| scope_config)) |
| .release(); |
| } |
| |
| bool OpenTelemetryPluginImpl::IsInstrumentEnabled( |
| grpc_core::GlobalInstrumentsRegistry::GlobalInstrumentHandle handle) const { |
| return !std::holds_alternative<Disabled>( |
| instruments_data_.at(handle.index).instrument); |
| } |
| |
| void OpenTelemetryPluginImpl::AddToChannelArguments( |
| grpc::ChannelArguments* args) { |
| const grpc_channel_args c_args = args->c_channel_args(); |
| auto* stats_plugin_list = grpc_channel_args_find_pointer< |
| std::shared_ptr<std::vector<std::shared_ptr<grpc_core::StatsPlugin>>>>( |
| &c_args, GRPC_ARG_EXPERIMENTAL_STATS_PLUGINS); |
| if (stats_plugin_list != nullptr) { |
| (*stats_plugin_list)->emplace_back(shared_from_this()); |
| } else { |
| auto stats_plugin_list = std::make_shared< |
| std::vector<std::shared_ptr<grpc_core::StatsPlugin>>>(); |
| args->SetPointerWithVtable( |
| GRPC_ARG_EXPERIMENTAL_STATS_PLUGINS, &stats_plugin_list, |
| grpc_core::ChannelArgTypeTraits<decltype(stats_plugin_list)>::VTable()); |
| stats_plugin_list->emplace_back(shared_from_this()); |
| } |
| } |
| |
| void OpenTelemetryPluginImpl::AddToServerBuilder(grpc::ServerBuilder* builder) { |
| builder->SetOption(std::make_unique<ServerBuilderOption>(shared_from_this())); |
| } |
| |
| class GrpcTraceBinTextMapPropagator : public TextMapPropagator { |
| public: |
| opentelemetry::context::Context Extract( |
| const opentelemetry::context::propagation::TextMapCarrier& carrier, |
| opentelemetry::context::Context& context) noexcept override { |
| opentelemetry::nostd::string_view grpc_trace_bin_val = |
| carrier.Get("grpc-trace-bin"); |
| std::string base64_unescaped_val; |
| absl::Base64Unescape(NoStdStringViewToAbslStringView(grpc_trace_bin_val), |
| &base64_unescaped_val); |
| return opentelemetry::trace::SetSpan( |
| context, |
| std::shared_ptr<opentelemetry::trace::Span>( |
| new (std::nothrow) opentelemetry::trace::DefaultSpan( |
| GrpcTraceBinHeaderToSpanContext(base64_unescaped_val)))); |
| } |
| |
| void Inject( |
| opentelemetry::context::propagation::TextMapCarrier& carrier, |
| const opentelemetry::context::Context& context) noexcept override { |
| auto span_context = opentelemetry::trace::GetSpan(context)->GetContext(); |
| if (!span_context.IsValid()) { |
| return; |
| } |
| carrier.Set("grpc-trace-bin", |
| absl::Base64Escape(absl::string_view( |
| reinterpret_cast<char*>( |
| SpanContextToGrpcTraceBinHeader(span_context).data()), |
| kGrpcTraceBinHeaderLen))); |
| } |
| |
| bool Fields(opentelemetry::nostd::function_ref< |
| bool(opentelemetry::nostd::string_view)> |
| callback) const noexcept override { |
| return callback("grpc-trace-bin"); |
| } |
| |
| private: |
| static constexpr int kGrpcTraceBinHeaderLen = 29; |
| |
| std::array<uint8_t, kGrpcTraceBinHeaderLen> SpanContextToGrpcTraceBinHeader( |
| const SpanContext& ctx) { |
| std::array<uint8_t, kGrpcTraceBinHeaderLen> header; |
| header[0] = 0; |
| header[1] = 0; |
| ctx.trace_id().CopyBytesTo( |
| opentelemetry::nostd::span<uint8_t, 16>(&header[2], 16)); |
| header[18] = 1; |
| ctx.span_id().CopyBytesTo( |
| opentelemetry::nostd::span<uint8_t, 8>(&header[19], 8)); |
| header[27] = 2; |
| header[28] = ctx.trace_flags().flags(); |
| return header; |
| } |
| |
| SpanContext GrpcTraceBinHeaderToSpanContext( |
| opentelemetry::nostd::string_view header) { |
| if (header.size() != kGrpcTraceBinHeaderLen || header[0] != 0 || |
| header[1] != 0 || header[18] != 1 || header[27] != 2) { |
| return SpanContext::GetInvalid(); |
| } |
| return SpanContext(TraceId(opentelemetry::nostd::span<const uint8_t, 16>( |
| reinterpret_cast<const uint8_t*>(&header[2]), 16)), |
| SpanId(opentelemetry::nostd::span<const uint8_t, 8>( |
| reinterpret_cast<const uint8_t*>(&header[19]), 8)), |
| opentelemetry::trace::TraceFlags(header[28]), |
| /*is_remote*/ true); |
| } |
| }; |
| |
| opentelemetry::nostd::string_view GrpcTextMapCarrier::Get( |
| opentelemetry::nostd::string_view key) const noexcept { |
| absl::string_view absl_key = NoStdStringViewToAbslStringView(key); |
| std::string scratch; |
| std::string ret_val; |
| if (absl_key == "grpc-trace-bin") { |
| ret_val = absl::Base64Escape( |
| metadata_->GetStringValue(absl_key, &scratch).value_or("")); |
| } else if (absl::EndsWith(absl_key, "-bin")) { |
| // Maybe ok to support a custom binary propagator. Needs based64 encoding |
| // validation if so. Not for now. |
| LOG(ERROR) << "Binary propagator other than GrpcTraceBinPropagator is " |
| "not supported."; |
| } else { |
| ret_val = std::string(AbslStringViewToNoStdStringView( |
| metadata_->GetStringValue(absl_key, &scratch).value_or(""))); |
| } |
| if (ret_val.empty()) { |
| return ""; |
| } |
| // Store the string on the arena since we are returning a string_view. |
| std::string* arena_stored_string = |
| grpc_core::GetContext<grpc_core::Arena>()->ManagedNew<std::string>( |
| std::move(ret_val)); |
| return *arena_stored_string; |
| } |
| |
| void GrpcTextMapCarrier::Set(opentelemetry::nostd::string_view key, |
| opentelemetry::nostd::string_view value) noexcept { |
| absl::string_view absl_key = NoStdStringViewToAbslStringView(key); |
| absl::string_view absl_value = NoStdStringViewToAbslStringView(value); |
| if (absl_key == "grpc-trace-bin") { |
| std::string unescaped_value; |
| if (!absl::Base64Unescape(absl_value, &unescaped_value)) { |
| return; |
| } |
| metadata_->Set( |
| grpc_core::GrpcTraceBinMetadata(), |
| grpc_core::Slice::FromCopiedString(std::move(unescaped_value))); |
| } else if (absl::EndsWith(absl_key, "-bin")) { |
| LOG(ERROR) << "Binary propagator other than GrpcTraceBinPropagator is " |
| "not supported."; |
| return; |
| } else { |
| // A propagator other than GrpcTraceBinTextMapPropagator was used. |
| metadata_->Append(absl_key, grpc_core::Slice::FromCopiedString(absl_value), |
| [](absl::string_view, const grpc_core::Slice&) { |
| LOG(ERROR) |
| << "Failed to add tracing information in metadata."; |
| }); |
| } |
| } |
| |
| std::string OTelSpanTraceIdToString(opentelemetry::trace::Span* span) { |
| if (span == nullptr) { |
| return ""; |
| } |
| auto context = span->GetContext(); |
| auto trace_id = context.trace_id(); |
| if (!trace_id.IsValid()) { |
| return ""; |
| } |
| auto trace_id_span = trace_id.Id(); |
| return absl::BytesToHexString( |
| absl::string_view(reinterpret_cast<const char*>(trace_id_span.data()), |
| trace_id_span.size())); |
| } |
| |
| std::string OTelSpanSpanIdToString(opentelemetry::trace::Span* span) { |
| if (span == nullptr) { |
| return ""; |
| } |
| auto context = span->GetContext(); |
| auto span_id = context.span_id(); |
| if (!span_id.IsValid()) { |
| return ""; |
| } |
| auto span_id_span = span_id.Id(); |
| return absl::BytesToHexString(absl::string_view( |
| reinterpret_cast<const char*>(span_id_span.data()), span_id_span.size())); |
| } |
| |
| } // namespace internal |
| |
| constexpr absl::string_view |
| OpenTelemetryPluginBuilder::kClientAttemptStartedInstrumentName; |
| constexpr absl::string_view |
| OpenTelemetryPluginBuilder::kClientAttemptDurationInstrumentName; |
| constexpr absl::string_view OpenTelemetryPluginBuilder:: |
| kClientAttemptSentTotalCompressedMessageSizeInstrumentName; |
| constexpr absl::string_view OpenTelemetryPluginBuilder:: |
| kClientAttemptRcvdTotalCompressedMessageSizeInstrumentName; |
| constexpr absl::string_view |
| OpenTelemetryPluginBuilder::kServerCallStartedInstrumentName; |
| constexpr absl::string_view |
| OpenTelemetryPluginBuilder::kServerCallDurationInstrumentName; |
| constexpr absl::string_view OpenTelemetryPluginBuilder:: |
| kServerCallSentTotalCompressedMessageSizeInstrumentName; |
| constexpr absl::string_view OpenTelemetryPluginBuilder:: |
| kServerCallRcvdTotalCompressedMessageSizeInstrumentName; |
| |
| // |
| // OpenTelemetryPluginBuilder |
| // |
| |
| OpenTelemetryPluginBuilder::OpenTelemetryPluginBuilder() |
| : impl_(std::make_unique<internal::OpenTelemetryPluginBuilderImpl>()) {} |
| |
| OpenTelemetryPluginBuilder::~OpenTelemetryPluginBuilder() = default; |
| |
| OpenTelemetryPluginBuilder& OpenTelemetryPluginBuilder::SetMeterProvider( |
| std::shared_ptr<opentelemetry::metrics::MeterProvider> meter_provider) { |
| impl_->SetMeterProvider(std::move(meter_provider)); |
| return *this; |
| } |
| |
| OpenTelemetryPluginBuilder& |
| OpenTelemetryPluginBuilder::SetTargetAttributeFilter( |
| absl::AnyInvocable<bool(absl::string_view /*target*/) const> |
| target_attribute_filter) { |
| impl_->SetTargetAttributeFilter(std::move(target_attribute_filter)); |
| return *this; |
| } |
| |
| OpenTelemetryPluginBuilder& |
| OpenTelemetryPluginBuilder::SetGenericMethodAttributeFilter( |
| absl::AnyInvocable<bool(absl::string_view /*generic_method*/) const> |
| generic_method_attribute_filter) { |
| impl_->SetGenericMethodAttributeFilter( |
| std::move(generic_method_attribute_filter)); |
| return *this; |
| } |
| |
| OpenTelemetryPluginBuilder& OpenTelemetryPluginBuilder::EnableMetrics( |
| absl::Span<const absl::string_view> metric_names) { |
| impl_->EnableMetrics(metric_names); |
| return *this; |
| } |
| |
| OpenTelemetryPluginBuilder& OpenTelemetryPluginBuilder::DisableMetrics( |
| absl::Span<const absl::string_view> metric_names) { |
| impl_->DisableMetrics(metric_names); |
| return *this; |
| } |
| |
| OpenTelemetryPluginBuilder& OpenTelemetryPluginBuilder::DisableAllMetrics() { |
| impl_->DisableAllMetrics(); |
| return *this; |
| } |
| |
| OpenTelemetryPluginBuilder& OpenTelemetryPluginBuilder::AddPluginOption( |
| std::unique_ptr<OpenTelemetryPluginOption> option) { |
| impl_->AddPluginOption( |
| std::unique_ptr<grpc::internal::InternalOpenTelemetryPluginOption>( |
| static_cast<grpc::internal::InternalOpenTelemetryPluginOption*>( |
| option.release()))); |
| return *this; |
| } |
| |
| OpenTelemetryPluginBuilder& OpenTelemetryPluginBuilder::AddOptionalLabel( |
| absl::string_view optional_label_key) { |
| impl_->AddOptionalLabel(optional_label_key); |
| return *this; |
| } |
| |
| OpenTelemetryPluginBuilder& OpenTelemetryPluginBuilder::SetTracerProvider( |
| std::shared_ptr<opentelemetry::trace::TracerProvider> tracer_provider) { |
| impl_->SetTracerProvider(std::move(tracer_provider)); |
| return *this; |
| } |
| |
| OpenTelemetryPluginBuilder& OpenTelemetryPluginBuilder::SetTextMapPropagator( |
| std::unique_ptr<TextMapPropagator> text_map_propagator) { |
| impl_->SetTextMapPropagator(std::move(text_map_propagator)); |
| return *this; |
| } |
| |
| std::unique_ptr<TextMapPropagator> |
| OpenTelemetryPluginBuilder::MakeGrpcTraceBinTextMapPropagator() { |
| return std::make_unique<internal::GrpcTraceBinTextMapPropagator>(); |
| } |
| |
| OpenTelemetryPluginBuilder& OpenTelemetryPluginBuilder::SetChannelScopeFilter( |
| absl::AnyInvocable<bool(const ChannelScope& /*scope*/) const> |
| channel_scope_filter) { |
| impl_->SetChannelScopeFilter(std::move(channel_scope_filter)); |
| return *this; |
| } |
| |
| absl::Status OpenTelemetryPluginBuilder::BuildAndRegisterGlobal() { |
| return impl_->BuildAndRegisterGlobal(); |
| } |
| |
| absl::StatusOr<std::shared_ptr<grpc::experimental::OpenTelemetryPlugin>> |
| OpenTelemetryPluginBuilder::Build() { |
| return impl_->Build(); |
| } |
| |
| } // namespace grpc |