| // Copyright 2018, OpenCensus Authors |
| // |
| // Licensed under the Apache License, Version 2.0 (the "License"); |
| // you may not use this file except in compliance with the License. |
| // You may obtain a copy of the License at |
| // |
| // http://www.apache.org/licenses/LICENSE-2.0 |
| // |
| // Unless required by applicable law or agreed to in writing, software |
| // distributed under the License is distributed on an "AS IS" BASIS, |
| // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| // See the License for the specific language governing permissions and |
| // limitations under the License. |
| |
| package stackdriver |
| |
| /* |
| The code in this file is responsible for converting OpenCensus Proto metrics |
| directly to Stackdriver Metrics. |
| */ |
| |
| import ( |
| "context" |
| "errors" |
| "fmt" |
| "path" |
| "strings" |
| |
| "go.opencensus.io/resource" |
| |
| commonpb "github.com/census-instrumentation/opencensus-proto/gen-go/agent/common/v1" |
| metricspb "github.com/census-instrumentation/opencensus-proto/gen-go/metrics/v1" |
| resourcepb "github.com/census-instrumentation/opencensus-proto/gen-go/resource/v1" |
| timestamppb "github.com/golang/protobuf/ptypes/timestamp" |
| distributionpb "google.golang.org/genproto/googleapis/api/distribution" |
| labelpb "google.golang.org/genproto/googleapis/api/label" |
| googlemetricpb "google.golang.org/genproto/googleapis/api/metric" |
| monitoredrespb "google.golang.org/genproto/googleapis/api/monitoredres" |
| monitoringpb "google.golang.org/genproto/googleapis/monitoring/v3" |
| ) |
| |
| var errNilMetricOrMetricDescriptor = errors.New("non-nil metric or metric descriptor") |
| var percentileLabelKey = &metricspb.LabelKey{ |
| Key: "percentile", |
| Description: "the value at a given percentile of a distribution", |
| } |
| var globalResource = &resource.Resource{Type: "global"} |
| var domains = []string{"googleapis.com", "kubernetes.io", "istio.io", "knative.dev"} |
| |
| // PushMetricsProto exports OpenCensus Metrics Proto to Stackdriver Monitoring synchronously, |
| // without de-duping or adding proto metrics to the bundler. |
| func (se *statsExporter) PushMetricsProto(ctx context.Context, node *commonpb.Node, rsc *resourcepb.Resource, metrics []*metricspb.Metric) (int, error) { |
| if len(metrics) == 0 { |
| return 0, errNilMetricOrMetricDescriptor |
| } |
| |
| // Caches the resources seen so far |
| seenResources := make(map[*resourcepb.Resource]*monitoredrespb.MonitoredResource) |
| |
| mb := newMetricsBatcher(ctx, se.o.ProjectID, se.o.NumberOfWorkers, se.c, se.o.Timeout) |
| for _, metric := range metrics { |
| if len(metric.GetTimeseries()) == 0 { |
| // No TimeSeries to export, skip this metric. |
| continue |
| } |
| mappedRsc := se.getResource(rsc, metric, seenResources) |
| if metric.GetMetricDescriptor().GetType() == metricspb.MetricDescriptor_SUMMARY { |
| summaryMtcs := se.convertSummaryMetrics(metric) |
| for _, summaryMtc := range summaryMtcs { |
| if err := se.createMetricDescriptorFromMetricProto(ctx, summaryMtc); err != nil { |
| mb.recordDroppedTimeseries(len(summaryMtc.GetTimeseries()), err) |
| continue |
| } |
| se.protoMetricToTimeSeries(ctx, mappedRsc, summaryMtc, mb) |
| } |
| } else { |
| if err := se.createMetricDescriptorFromMetricProto(ctx, metric); err != nil { |
| mb.recordDroppedTimeseries(len(metric.GetTimeseries()), err) |
| continue |
| } |
| se.protoMetricToTimeSeries(ctx, mappedRsc, metric, mb) |
| } |
| } |
| |
| return mb.droppedTimeSeries, mb.close(ctx) |
| } |
| |
| func (se *statsExporter) convertSummaryMetrics(summary *metricspb.Metric) []*metricspb.Metric { |
| var metrics []*metricspb.Metric |
| |
| for _, ts := range summary.Timeseries { |
| var percentileTss []*metricspb.TimeSeries |
| var countTss []*metricspb.TimeSeries |
| var sumTss []*metricspb.TimeSeries |
| lvs := ts.GetLabelValues() |
| |
| startTime := ts.StartTimestamp |
| for _, pt := range ts.GetPoints() { |
| ptTimestamp := pt.GetTimestamp() |
| summaryValue := pt.GetSummaryValue() |
| if summaryValue.Sum != nil { |
| sumTs := &metricspb.TimeSeries{ |
| LabelValues: lvs, |
| StartTimestamp: startTime, |
| Points: []*metricspb.Point{ |
| { |
| Value: &metricspb.Point_DoubleValue{ |
| DoubleValue: summaryValue.Sum.Value, |
| }, |
| Timestamp: ptTimestamp, |
| }, |
| }, |
| } |
| sumTss = append(sumTss, sumTs) |
| } |
| |
| if summaryValue.Count != nil { |
| countTs := &metricspb.TimeSeries{ |
| LabelValues: lvs, |
| StartTimestamp: startTime, |
| Points: []*metricspb.Point{ |
| { |
| Value: &metricspb.Point_Int64Value{ |
| Int64Value: summaryValue.Count.Value, |
| }, |
| Timestamp: ptTimestamp, |
| }, |
| }, |
| } |
| countTss = append(countTss, countTs) |
| } |
| |
| snapshot := summaryValue.GetSnapshot() |
| for _, percentileValue := range snapshot.GetPercentileValues() { |
| lvsWithPercentile := lvs[0:] |
| lvsWithPercentile = append(lvsWithPercentile, &metricspb.LabelValue{ |
| HasValue: true, |
| Value: fmt.Sprintf("%f", percentileValue.Percentile), |
| }) |
| percentileTs := &metricspb.TimeSeries{ |
| LabelValues: lvsWithPercentile, |
| StartTimestamp: nil, |
| Points: []*metricspb.Point{ |
| { |
| Value: &metricspb.Point_DoubleValue{ |
| DoubleValue: percentileValue.Value, |
| }, |
| Timestamp: ptTimestamp, |
| }, |
| }, |
| } |
| percentileTss = append(percentileTss, percentileTs) |
| } |
| } |
| |
| if len(sumTss) > 0 { |
| metric := &metricspb.Metric{ |
| MetricDescriptor: &metricspb.MetricDescriptor{ |
| Name: fmt.Sprintf("%s_summary_sum", summary.GetMetricDescriptor().GetName()), |
| Description: summary.GetMetricDescriptor().GetDescription(), |
| Type: metricspb.MetricDescriptor_CUMULATIVE_DOUBLE, |
| Unit: summary.GetMetricDescriptor().GetUnit(), |
| LabelKeys: summary.GetMetricDescriptor().GetLabelKeys(), |
| }, |
| Timeseries: sumTss, |
| Resource: summary.Resource, |
| } |
| metrics = append(metrics, metric) |
| } |
| if len(countTss) > 0 { |
| metric := &metricspb.Metric{ |
| MetricDescriptor: &metricspb.MetricDescriptor{ |
| Name: fmt.Sprintf("%s_summary_count", summary.GetMetricDescriptor().GetName()), |
| Description: summary.GetMetricDescriptor().GetDescription(), |
| Type: metricspb.MetricDescriptor_CUMULATIVE_INT64, |
| Unit: "1", |
| LabelKeys: summary.GetMetricDescriptor().GetLabelKeys(), |
| }, |
| Timeseries: countTss, |
| Resource: summary.Resource, |
| } |
| metrics = append(metrics, metric) |
| } |
| if len(percentileTss) > 0 { |
| lks := summary.GetMetricDescriptor().GetLabelKeys()[0:] |
| lks = append(lks, percentileLabelKey) |
| metric := &metricspb.Metric{ |
| MetricDescriptor: &metricspb.MetricDescriptor{ |
| Name: fmt.Sprintf("%s_summary_percentile", summary.GetMetricDescriptor().GetName()), |
| Description: summary.GetMetricDescriptor().GetDescription(), |
| Type: metricspb.MetricDescriptor_GAUGE_DOUBLE, |
| Unit: summary.GetMetricDescriptor().GetUnit(), |
| LabelKeys: lks, |
| }, |
| Timeseries: percentileTss, |
| Resource: summary.Resource, |
| } |
| metrics = append(metrics, metric) |
| } |
| } |
| return metrics |
| } |
| |
| func (se *statsExporter) getResource(rsc *resourcepb.Resource, metric *metricspb.Metric, seenRscs map[*resourcepb.Resource]*monitoredrespb.MonitoredResource) *monitoredrespb.MonitoredResource { |
| var resource = rsc |
| if metric.Resource != nil { |
| resource = metric.Resource |
| } |
| mappedRsc, ok := seenRscs[resource] |
| if !ok { |
| mappedRsc = se.o.MapResource(resourcepbToResource(resource)) |
| seenRscs[resource] = mappedRsc |
| } |
| return mappedRsc |
| } |
| |
| func resourcepbToResource(rsc *resourcepb.Resource) *resource.Resource { |
| if rsc == nil { |
| return globalResource |
| } |
| res := &resource.Resource{ |
| Type: rsc.Type, |
| Labels: make(map[string]string, len(rsc.Labels)), |
| } |
| |
| for k, v := range rsc.Labels { |
| res.Labels[k] = v |
| } |
| return res |
| } |
| |
| // protoMetricToTimeSeries converts a metric into a Stackdriver Monitoring v3 API CreateTimeSeriesRequest |
| // but it doesn't invoke any remote API. |
| func (se *statsExporter) protoMetricToTimeSeries(ctx context.Context, mappedRsc *monitoredrespb.MonitoredResource, metric *metricspb.Metric, mb *metricsBatcher) { |
| if metric == nil || metric.MetricDescriptor == nil { |
| mb.recordDroppedTimeseries(len(metric.GetTimeseries()), errNilMetricOrMetricDescriptor) |
| } |
| |
| metricType := se.metricTypeFromProto(metric.GetMetricDescriptor().GetName()) |
| metricLabelKeys := metric.GetMetricDescriptor().GetLabelKeys() |
| metricKind, valueType := protoMetricDescriptorTypeToMetricKind(metric) |
| labelKeys := make([]string, 0, len(metricLabelKeys)) |
| for _, key := range metricLabelKeys { |
| labelKeys = append(labelKeys, sanitize(key.GetKey())) |
| } |
| |
| for _, protoTimeSeries := range metric.Timeseries { |
| if len(protoTimeSeries.Points) == 0 { |
| // No points to send just move forward. |
| continue |
| } |
| |
| sdPoints, err := se.protoTimeSeriesToMonitoringPoints(protoTimeSeries, metricKind) |
| if err != nil { |
| mb.recordDroppedTimeseries(1, err) |
| continue |
| } |
| |
| // Each TimeSeries has labelValues which MUST be correlated |
| // with that from the MetricDescriptor |
| labels, err := labelsPerTimeSeries(se.defaultLabels, labelKeys, protoTimeSeries.GetLabelValues()) |
| if err != nil { |
| mb.recordDroppedTimeseries(1, err) |
| continue |
| } |
| mb.addTimeSeries(&monitoringpb.TimeSeries{ |
| Metric: &googlemetricpb.Metric{ |
| Type: metricType, |
| Labels: labels, |
| }, |
| MetricKind: metricKind, |
| ValueType: valueType, |
| Resource: mappedRsc, |
| Points: sdPoints, |
| }) |
| } |
| } |
| |
| func labelsPerTimeSeries(defaults map[string]labelValue, labelKeys []string, labelValues []*metricspb.LabelValue) (map[string]string, error) { |
| if len(labelKeys) != len(labelValues) { |
| return nil, fmt.Errorf("length mismatch: len(labelKeys)=%d len(labelValues)=%d", len(labelKeys), len(labelValues)) |
| } |
| |
| if len(defaults)+len(labelKeys) == 0 { |
| // No labels for this metric |
| return nil, nil |
| } |
| |
| labels := make(map[string]string) |
| // Fill in the defaults firstly, irrespective of if the labelKeys and labelValues are mismatched. |
| for key, label := range defaults { |
| labels[key] = label.val |
| } |
| |
| for i, labelKey := range labelKeys { |
| labelValue := labelValues[i] |
| if !labelValue.GetHasValue() { |
| continue |
| } |
| labels[labelKey] = labelValue.GetValue() |
| } |
| |
| return labels, nil |
| } |
| |
| func (se *statsExporter) createMetricDescriptorFromMetricProto(ctx context.Context, metric *metricspb.Metric) error { |
| // Skip create metric descriptor if configured |
| if se.o.SkipCMD { |
| return nil |
| } |
| |
| ctx, cancel := newContextWithTimeout(ctx, se.o.Timeout) |
| defer cancel() |
| |
| se.protoMu.Lock() |
| defer se.protoMu.Unlock() |
| |
| name := metric.GetMetricDescriptor().GetName() |
| if _, created := se.protoMetricDescriptors[name]; created { |
| return nil |
| } |
| |
| if builtinMetric(se.metricTypeFromProto(name)) { |
| se.protoMetricDescriptors[name] = true |
| return nil |
| } |
| |
| // Otherwise, we encountered a cache-miss and |
| // should create the metric descriptor remotely. |
| inMD, err := se.protoToMonitoringMetricDescriptor(metric, se.defaultLabels) |
| if err != nil { |
| return err |
| } |
| |
| if err = se.createMetricDescriptor(ctx, inMD); err != nil { |
| return err |
| } |
| |
| se.protoMetricDescriptors[name] = true |
| return nil |
| } |
| |
| func (se *statsExporter) protoTimeSeriesToMonitoringPoints(ts *metricspb.TimeSeries, metricKind googlemetricpb.MetricDescriptor_MetricKind) ([]*monitoringpb.Point, error) { |
| sptl := make([]*monitoringpb.Point, 0, len(ts.Points)) |
| for _, pt := range ts.Points { |
| // If we have a last value aggregation point i.e. MetricDescriptor_GAUGE |
| // StartTime should be nil. |
| startTime := ts.StartTimestamp |
| if metricKind == googlemetricpb.MetricDescriptor_GAUGE { |
| startTime = nil |
| } |
| spt, err := fromProtoPoint(startTime, pt) |
| if err != nil { |
| return nil, err |
| } |
| sptl = append(sptl, spt) |
| } |
| return sptl, nil |
| } |
| |
| func (se *statsExporter) protoToMonitoringMetricDescriptor(metric *metricspb.Metric, additionalLabels map[string]labelValue) (*googlemetricpb.MetricDescriptor, error) { |
| if metric == nil || metric.MetricDescriptor == nil { |
| return nil, errNilMetricOrMetricDescriptor |
| } |
| |
| md := metric.GetMetricDescriptor() |
| metricName := md.GetName() |
| unit := md.GetUnit() |
| description := md.GetDescription() |
| metricType := se.metricTypeFromProto(metricName) |
| displayName := se.displayName(metricName) |
| metricKind, valueType := protoMetricDescriptorTypeToMetricKind(metric) |
| |
| sdm := &googlemetricpb.MetricDescriptor{ |
| Name: fmt.Sprintf("projects/%s/metricDescriptors/%s", se.o.ProjectID, metricType), |
| DisplayName: displayName, |
| Description: description, |
| Unit: unit, |
| Type: metricType, |
| MetricKind: metricKind, |
| ValueType: valueType, |
| Labels: labelDescriptorsFromProto(additionalLabels, metric.GetMetricDescriptor().GetLabelKeys()), |
| } |
| |
| return sdm, nil |
| } |
| |
| func labelDescriptorsFromProto(defaults map[string]labelValue, protoLabelKeys []*metricspb.LabelKey) []*labelpb.LabelDescriptor { |
| labelDescriptors := make([]*labelpb.LabelDescriptor, 0, len(defaults)+len(protoLabelKeys)) |
| |
| // Fill in the defaults first. |
| for key, lbl := range defaults { |
| labelDescriptors = append(labelDescriptors, &labelpb.LabelDescriptor{ |
| Key: sanitize(key), |
| Description: lbl.desc, |
| ValueType: labelpb.LabelDescriptor_STRING, |
| }) |
| } |
| |
| // Now fill in those from the metric. |
| for _, protoKey := range protoLabelKeys { |
| labelDescriptors = append(labelDescriptors, &labelpb.LabelDescriptor{ |
| Key: sanitize(protoKey.GetKey()), |
| Description: protoKey.GetDescription(), |
| ValueType: labelpb.LabelDescriptor_STRING, // We only use string tags |
| }) |
| } |
| return labelDescriptors |
| } |
| |
| func (se *statsExporter) metricTypeFromProto(name string) string { |
| prefix := se.o.MetricPrefix |
| if se.o.GetMetricPrefix != nil { |
| prefix = se.o.GetMetricPrefix(name) |
| } |
| if prefix != "" { |
| name = path.Join(prefix, name) |
| } |
| if !hasDomain(name) { |
| // Still needed because the name may or may not have a "/" at the beginning. |
| name = path.Join(defaultDomain, name) |
| } |
| return name |
| } |
| |
| // hasDomain checks if the metric name already has a domain in it. |
| func hasDomain(name string) bool { |
| for _, domain := range domains { |
| if strings.Contains(name, domain) { |
| return true |
| } |
| } |
| return false |
| } |
| |
| func fromProtoPoint(startTime *timestamppb.Timestamp, pt *metricspb.Point) (*monitoringpb.Point, error) { |
| if pt == nil { |
| return nil, nil |
| } |
| |
| mptv, err := protoToMetricPoint(pt.Value) |
| if err != nil { |
| return nil, err |
| } |
| |
| endTime := pt.Timestamp |
| interval := &monitoringpb.TimeInterval{ |
| StartTime: startTime, |
| EndTime: endTime, |
| } |
| if startTime != nil && endTime != nil { |
| interval = toValidTimeIntervalpb(startTime.AsTime(), endTime.AsTime()) |
| } |
| |
| return &monitoringpb.Point{ |
| Value: mptv, |
| Interval: interval, |
| }, nil |
| } |
| |
| func protoToMetricPoint(value interface{}) (*monitoringpb.TypedValue, error) { |
| if value == nil { |
| return nil, nil |
| } |
| |
| switch v := value.(type) { |
| default: |
| // All the other types are not yet handled. |
| // TODO: (@odeke-em, @songy23) talk to the Stackdriver team to determine |
| // the use cases for: |
| // |
| // *TypedValue_BoolValue |
| // *TypedValue_StringValue |
| // |
| // and then file feature requests on OpenCensus-Specs and then OpenCensus-Proto, |
| // lest we shall error here. |
| // |
| // TODO: Add conversion from SummaryValue when |
| // https://github.com/census-ecosystem/opencensus-go-exporter-stackdriver/issues/66 |
| // has been figured out. |
| return nil, fmt.Errorf("protoToMetricPoint: unknown Data type: %T", value) |
| |
| case *metricspb.Point_Int64Value: |
| return &monitoringpb.TypedValue{ |
| Value: &monitoringpb.TypedValue_Int64Value{ |
| Int64Value: v.Int64Value, |
| }, |
| }, nil |
| |
| case *metricspb.Point_DoubleValue: |
| return &monitoringpb.TypedValue{ |
| Value: &monitoringpb.TypedValue_DoubleValue{ |
| DoubleValue: v.DoubleValue, |
| }, |
| }, nil |
| |
| case *metricspb.Point_DistributionValue: |
| dv := v.DistributionValue |
| var mv *monitoringpb.TypedValue_DistributionValue |
| if dv != nil { |
| var mean float64 |
| if dv.Count > 0 { |
| mean = float64(dv.Sum) / float64(dv.Count) |
| } |
| mv = &monitoringpb.TypedValue_DistributionValue{ |
| DistributionValue: &distributionpb.Distribution{ |
| Count: dv.Count, |
| Mean: mean, |
| SumOfSquaredDeviation: dv.SumOfSquaredDeviation, |
| }, |
| } |
| |
| insertZeroBound := false |
| if bopts := dv.BucketOptions; bopts != nil && bopts.Type != nil { |
| bexp, ok := bopts.Type.(*metricspb.DistributionValue_BucketOptions_Explicit_) |
| if ok && bexp != nil && bexp.Explicit != nil { |
| insertZeroBound = shouldInsertZeroBound(bexp.Explicit.Bounds...) |
| mv.DistributionValue.BucketOptions = &distributionpb.Distribution_BucketOptions{ |
| Options: &distributionpb.Distribution_BucketOptions_ExplicitBuckets{ |
| ExplicitBuckets: &distributionpb.Distribution_BucketOptions_Explicit{ |
| // The first bucket bound should be 0.0 because the Metrics first bucket is |
| // [0, first_bound) but Stackdriver monitoring bucket bounds begin with -infinity |
| // (first bucket is (-infinity, 0)) |
| Bounds: addZeroBoundOnCondition(insertZeroBound, bexp.Explicit.Bounds...), |
| }, |
| }, |
| } |
| } |
| } |
| mv.DistributionValue.BucketCounts = addZeroBucketCountOnCondition(insertZeroBound, bucketCounts(dv.Buckets)...) |
| |
| } |
| return &monitoringpb.TypedValue{Value: mv}, nil |
| } |
| } |
| |
| func bucketCounts(buckets []*metricspb.DistributionValue_Bucket) []int64 { |
| bucketCounts := make([]int64, len(buckets)) |
| for i, bucket := range buckets { |
| if bucket != nil { |
| bucketCounts[i] = bucket.Count |
| } |
| } |
| return bucketCounts |
| } |
| |
| func protoMetricDescriptorTypeToMetricKind(m *metricspb.Metric) (googlemetricpb.MetricDescriptor_MetricKind, googlemetricpb.MetricDescriptor_ValueType) { |
| dt := m.GetMetricDescriptor() |
| if dt == nil { |
| return googlemetricpb.MetricDescriptor_METRIC_KIND_UNSPECIFIED, googlemetricpb.MetricDescriptor_VALUE_TYPE_UNSPECIFIED |
| } |
| |
| switch dt.Type { |
| case metricspb.MetricDescriptor_CUMULATIVE_INT64: |
| return googlemetricpb.MetricDescriptor_CUMULATIVE, googlemetricpb.MetricDescriptor_INT64 |
| |
| case metricspb.MetricDescriptor_CUMULATIVE_DOUBLE: |
| return googlemetricpb.MetricDescriptor_CUMULATIVE, googlemetricpb.MetricDescriptor_DOUBLE |
| |
| case metricspb.MetricDescriptor_CUMULATIVE_DISTRIBUTION: |
| return googlemetricpb.MetricDescriptor_CUMULATIVE, googlemetricpb.MetricDescriptor_DISTRIBUTION |
| |
| case metricspb.MetricDescriptor_GAUGE_DOUBLE: |
| return googlemetricpb.MetricDescriptor_GAUGE, googlemetricpb.MetricDescriptor_DOUBLE |
| |
| case metricspb.MetricDescriptor_GAUGE_INT64: |
| return googlemetricpb.MetricDescriptor_GAUGE, googlemetricpb.MetricDescriptor_INT64 |
| |
| case metricspb.MetricDescriptor_GAUGE_DISTRIBUTION: |
| return googlemetricpb.MetricDescriptor_GAUGE, googlemetricpb.MetricDescriptor_DISTRIBUTION |
| |
| default: |
| return googlemetricpb.MetricDescriptor_METRIC_KIND_UNSPECIFIED, googlemetricpb.MetricDescriptor_VALUE_TYPE_UNSPECIFIED |
| } |
| } |