package stackdriver
import (
opencensus ""
monitoring ""
distributionpb ""
labelpb ""
metricpb ""
monitoredrespb ""
monitoringpb ""
const (
maxTimeSeriesPerUpload = 200
opencensusTaskKey = "opencensus_task"
opencensusTaskDescription = "Opencensus task identifier"
defaultDisplayNamePrefix = "OpenCensus"
version = "0.10.0"
var userAgent = fmt.Sprintf("opencensus-go %s; stackdriver-exporter %s", opencensus.Version(), version)
// statsExporter exports stats to the Stackdriver Monitoring.
type statsExporter struct {
o Options
viewDataBundler *bundler.Bundler
protoMetricsBundler *bundler.Bundler
createdViewsMu sync.Mutex
createdViews map[string]*metricpb.MetricDescriptor // Views already created remotely
protoMu sync.Mutex
protoMetricDescriptors map[string]*metricpb.MetricDescriptor // Saves the metric descriptors that were already created remotely
c *monitoring.MetricClient
defaultLabels map[string]labelValue
var (
errBlankProjectID = errors.New("expecting a non-blank ProjectID")
// newStatsExporter returns an exporter that uploads stats data to Stackdriver Monitoring.
// Only one Stackdriver exporter should be created per ProjectID per process, any subsequent
// invocations of NewExporter with the same ProjectID will return an error.
func newStatsExporter(o Options) (*statsExporter, error) {
if strings.TrimSpace(o.ProjectID) == "" {
return nil, errBlankProjectID
opts := append(o.MonitoringClientOptions, option.WithUserAgent(userAgent))
ctx, cancel := o.newContextWithTimeout()
defer cancel()
client, err := monitoring.NewMetricClient(ctx, opts...)
if err != nil {
return nil, err
e := &statsExporter{
c: client,
o: o,
createdViews: make(map[string]*metricpb.MetricDescriptor),
protoMetricDescriptors: make(map[string]*metricpb.MetricDescriptor),
if o.DefaultMonitoringLabels != nil {
e.defaultLabels = o.DefaultMonitoringLabels.m
} else {
e.defaultLabels = map[string]labelValue{
opencensusTaskKey: {val: getTaskValue(), desc: opencensusTaskDescription},
e.viewDataBundler = bundler.NewBundler((*view.Data)(nil), func(bundle interface{}) {
vds := bundle.([]*view.Data)
e.protoMetricsBundler = bundler.NewBundler((*metricProtoPayload)(nil), func(bundle interface{}) {
payloads := bundle.([]*metricProtoPayload)
if delayThreshold := e.o.BundleDelayThreshold; delayThreshold > 0 {
e.viewDataBundler.DelayThreshold = delayThreshold
e.protoMetricsBundler.DelayThreshold = delayThreshold
if countThreshold := e.o.BundleCountThreshold; countThreshold > 0 {
e.viewDataBundler.BundleCountThreshold = countThreshold
e.protoMetricsBundler.BundleCountThreshold = countThreshold
return e, nil
func (e *statsExporter) getMonitoredResource(v *view.View, tags []tag.Tag) ([]tag.Tag, *monitoredrespb.MonitoredResource) {
if get := e.o.GetMonitoredResource; get != nil {
newTags, mr := get(v, tags)
return newTags, convertMonitoredResourceToPB(mr)
} else {
resource := e.o.Resource
if resource == nil {
resource = &monitoredrespb.MonitoredResource{
Type: "global",
return tags, resource
// ExportView exports to the Stackdriver Monitoring if view data
// has one or more rows.
func (e *statsExporter) ExportView(vd *view.Data) {
if len(vd.Rows) == 0 {
err := e.viewDataBundler.Add(vd, 1)
switch err {
case nil:
case bundler.ErrOverflow:
e.o.handleError(errors.New("failed to upload: buffer full"))
// getTaskValue returns a task label value in the format of
// "go-<pid>@<hostname>".
func getTaskValue() string {
hostname, err := os.Hostname()
if err != nil {
hostname = "localhost"
return "go-" + strconv.Itoa(os.Getpid()) + "@" + hostname
// handleUpload handles uploading a slice
// of Data, as well as error handling.
func (e *statsExporter) handleUpload(vds ...*view.Data) {
if err := e.uploadStats(vds); err != nil {
// Flush waits for exported view data and metrics to be uploaded.
// This is useful if your program is ending and you do not
// want to lose data that hasn't yet been exported.
func (e *statsExporter) Flush() {
func (e *statsExporter) uploadStats(vds []*view.Data) error {
ctx, cancel := e.o.newContextWithTimeout()
defer cancel()
ctx, span := trace.StartSpan(
defer span.End()
for _, vd := range vds {
if err := e.createMeasure(ctx, vd.View); err != nil {
span.SetStatus(trace.Status{Code: 2, Message: err.Error()})
return err
for _, req := range e.makeReq(vds, maxTimeSeriesPerUpload) {
if err := createTimeSeries(ctx, e.c, req); err != nil {
span.SetStatus(trace.Status{Code: 2, Message: err.Error()})
// TODO(jbd): Don't fail fast here, batch errors?
return err
return nil
func (se *statsExporter) makeReq(vds []*view.Data, limit int) []*monitoringpb.CreateTimeSeriesRequest {
var reqs []*monitoringpb.CreateTimeSeriesRequest
var allTimeSeries []*monitoringpb.TimeSeries
for _, vd := range vds {
for _, row := range vd.Rows {
tags, resource := se.getMonitoredResource(vd.View, append([]tag.Tag(nil), row.Tags...))
ts := &monitoringpb.TimeSeries{
Metric: &metricpb.Metric{
Type: se.metricType(vd.View),
Labels: newLabels(se.defaultLabels, tags),
Resource: resource,
Points: []*monitoringpb.Point{newPoint(vd.View, row, vd.Start, vd.End)},
allTimeSeries = append(allTimeSeries, ts)
var timeSeries []*monitoringpb.TimeSeries
for _, ts := range allTimeSeries {
timeSeries = append(timeSeries, ts)
if len(timeSeries) == limit {
ctsreql := se.combineTimeSeriesToCreateTimeSeriesRequest(timeSeries)
reqs = append(reqs, ctsreql...)
timeSeries = timeSeries[:0]
if len(timeSeries) > 0 {
ctsreql := se.combineTimeSeriesToCreateTimeSeriesRequest(timeSeries)
reqs = append(reqs, ctsreql...)
return reqs
func (e *statsExporter) viewToMetricDescriptor(ctx context.Context, v *view.View) (*metricpb.MetricDescriptor, error) {
m := v.Measure
agg := v.Aggregation
viewName := v.Name
metricType := e.metricType(v)
var valueType metricpb.MetricDescriptor_ValueType
unit := m.Unit()
// Default metric Kind
metricKind := metricpb.MetricDescriptor_CUMULATIVE
switch agg.Type {
case view.AggTypeCount:
valueType = metricpb.MetricDescriptor_INT64
// If the aggregation type is count, which counts the number of recorded measurements, the unit must be "1",
// because this view does not apply to the recorded values.
unit = stats.UnitDimensionless
case view.AggTypeSum:
switch m.(type) {
case *stats.Int64Measure:
valueType = metricpb.MetricDescriptor_INT64
case *stats.Float64Measure:
valueType = metricpb.MetricDescriptor_DOUBLE
case view.AggTypeDistribution:
valueType = metricpb.MetricDescriptor_DISTRIBUTION
case view.AggTypeLastValue:
metricKind = metricpb.MetricDescriptor_GAUGE
switch m.(type) {
case *stats.Int64Measure:
valueType = metricpb.MetricDescriptor_INT64
case *stats.Float64Measure:
valueType = metricpb.MetricDescriptor_DOUBLE
return nil, fmt.Errorf("unsupported aggregation type: %s", agg.Type.String())
var displayName string
if e.o.GetMetricDisplayName == nil {
displayName = e.displayName(viewName)
} else {
displayName = e.o.GetMetricDisplayName(v)
res := &metricpb.MetricDescriptor{
Name: fmt.Sprintf("projects/%s/metricDescriptors/%s", e.o.ProjectID, metricType),
DisplayName: displayName,
Description: v.Description,
Unit: unit,
Type: metricType,
MetricKind: metricKind,
ValueType: valueType,
Labels: newLabelDescriptors(e.defaultLabels, v.TagKeys),
return res, nil
func (e *statsExporter) viewToCreateMetricDescriptorRequest(ctx context.Context, v *view.View) (*monitoringpb.CreateMetricDescriptorRequest, error) {
inMD, err := e.viewToMetricDescriptor(ctx, v)
if err != nil {
return nil, err
cmrdesc := &monitoringpb.CreateMetricDescriptorRequest{
Name: fmt.Sprintf("projects/%s", e.o.ProjectID),
MetricDescriptor: inMD,
return cmrdesc, nil
// createMeasure creates a MetricDescriptor for the given view data in Stackdriver Monitoring.
// An error will be returned if there is already a metric descriptor created with the same name
// but it has a different aggregation or keys.
func (e *statsExporter) createMeasure(ctx context.Context, v *view.View) error {
defer e.createdViewsMu.Unlock()
viewName := v.Name
if md, ok := e.createdViews[viewName]; ok {
// [TODO:rghetia] Temporary fix for
if builtinMetric(md.Type) {
return nil
return e.equalMeasureAggTagKeys(md, v.Measure, v.Aggregation, v.TagKeys)
inMD, err := e.viewToMetricDescriptor(ctx, v)
if err != nil {
return err
var dmd *metric.MetricDescriptor
if builtinMetric(inMD.Type) {
gmrdesc := &monitoringpb.GetMetricDescriptorRequest{
Name: inMD.Name,
dmd, err = getMetricDescriptor(ctx, e.c, gmrdesc)
} else {
cmrdesc := &monitoringpb.CreateMetricDescriptorRequest{
Name: fmt.Sprintf("projects/%s", e.o.ProjectID),
MetricDescriptor: inMD,
dmd, err = createMetricDescriptor(ctx, e.c, cmrdesc)
if err != nil {
return err
// Now cache the metric descriptor
e.createdViews[viewName] = dmd
return err
func (e *statsExporter) displayName(suffix string) string {
displayNamePrefix := defaultDisplayNamePrefix
if e.o.MetricPrefix != "" {
displayNamePrefix = e.o.MetricPrefix
return path.Join(displayNamePrefix, suffix)
func newPoint(v *view.View, row *view.Row, start, end time.Time) *monitoringpb.Point {
switch v.Aggregation.Type {
case view.AggTypeLastValue:
return newGaugePoint(v, row, end)
return newCumulativePoint(v, row, start, end)
func newCumulativePoint(v *view.View, row *view.Row, start, end time.Time) *monitoringpb.Point {
return &monitoringpb.Point{
Interval: &monitoringpb.TimeInterval{
StartTime: &timestamp.Timestamp{
Seconds: start.Unix(),
Nanos: int32(start.Nanosecond()),
EndTime: &timestamp.Timestamp{
Seconds: end.Unix(),
Nanos: int32(end.Nanosecond()),
Value: newTypedValue(v, row),
func newGaugePoint(v *view.View, row *view.Row, end time.Time) *monitoringpb.Point {
gaugeTime := &timestamp.Timestamp{
Seconds: end.Unix(),
Nanos: int32(end.Nanosecond()),
return &monitoringpb.Point{
Interval: &monitoringpb.TimeInterval{
EndTime: gaugeTime,
Value: newTypedValue(v, row),
func newTypedValue(vd *view.View, r *view.Row) *monitoringpb.TypedValue {
switch v := r.Data.(type) {
case *view.CountData:
return &monitoringpb.TypedValue{Value: &monitoringpb.TypedValue_Int64Value{
Int64Value: v.Value,
case *view.SumData:
switch vd.Measure.(type) {
case *stats.Int64Measure:
return &monitoringpb.TypedValue{Value: &monitoringpb.TypedValue_Int64Value{
Int64Value: int64(v.Value),
case *stats.Float64Measure:
return &monitoringpb.TypedValue{Value: &monitoringpb.TypedValue_DoubleValue{
DoubleValue: v.Value,
case *view.DistributionData:
insertZeroBound := shouldInsertZeroBound(vd.Aggregation.Buckets...)
return &monitoringpb.TypedValue{Value: &monitoringpb.TypedValue_DistributionValue{
DistributionValue: &distributionpb.Distribution{
Count: v.Count,
Mean: v.Mean,
SumOfSquaredDeviation: v.SumOfSquaredDev,
// TODO(songya): uncomment this once Stackdriver supports min/max.
// Range: &distributionpb.Distribution_Range{
// Min: v.Min,
// Max: v.Max,
// },
BucketOptions: &distributionpb.Distribution_BucketOptions{
Options: &distributionpb.Distribution_BucketOptions_ExplicitBuckets{
ExplicitBuckets: &distributionpb.Distribution_BucketOptions_Explicit{
Bounds: addZeroBoundOnCondition(insertZeroBound, vd.Aggregation.Buckets...),
BucketCounts: addZeroBucketCountOnCondition(insertZeroBound, v.CountPerBucket...),
case *view.LastValueData:
switch vd.Measure.(type) {
case *stats.Int64Measure:
return &monitoringpb.TypedValue{Value: &monitoringpb.TypedValue_Int64Value{
Int64Value: int64(v.Value),
case *stats.Float64Measure:
return &monitoringpb.TypedValue{Value: &monitoringpb.TypedValue_DoubleValue{
DoubleValue: v.Value,
return nil
func shouldInsertZeroBound(bounds ...float64) bool {
if len(bounds) > 0 && bounds[0] != 0.0 {
return true
return false
func addZeroBucketCountOnCondition(insert bool, counts ...int64) []int64 {
if insert {
return append([]int64{0}, counts...)
return counts
func addZeroBoundOnCondition(insert bool, bounds ...float64) []float64 {
if insert {
return append([]float64{0.0}, bounds...)
return bounds
func (e *statsExporter) metricType(v *view.View) string {
if formatter := e.o.GetMetricType; formatter != nil {
return formatter(v)
} else {
return path.Join("", "opencensus", v.Name)
func newLabels(defaults map[string]labelValue, tags []tag.Tag) map[string]string {
labels := make(map[string]string)
for k, lbl := range defaults {
labels[sanitize(k)] = lbl.val
for _, tag := range tags {
labels[sanitize(tag.Key.Name())] = tag.Value
return labels
func newLabelDescriptors(defaults map[string]labelValue, keys []tag.Key) []*labelpb.LabelDescriptor {
labelDescriptors := make([]*labelpb.LabelDescriptor, 0, len(keys)+len(defaults))
for key, lbl := range defaults {
labelDescriptors = append(labelDescriptors, &labelpb.LabelDescriptor{
Key: sanitize(key),
Description: lbl.desc,
ValueType: labelpb.LabelDescriptor_STRING,
for _, key := range keys {
labelDescriptors = append(labelDescriptors, &labelpb.LabelDescriptor{
Key: sanitize(key.Name()),
ValueType: labelpb.LabelDescriptor_STRING, // We only use string tags
return labelDescriptors
func (e *statsExporter) equalMeasureAggTagKeys(md *metricpb.MetricDescriptor, m stats.Measure, agg *view.Aggregation, keys []tag.Key) error {
var aggTypeMatch bool
switch md.ValueType {
case metricpb.MetricDescriptor_INT64:
if _, ok := m.(*stats.Int64Measure); !(ok || agg.Type == view.AggTypeCount) {
return fmt.Errorf("stackdriver metric descriptor was not created as int64")
aggTypeMatch = agg.Type == view.AggTypeCount || agg.Type == view.AggTypeSum || agg.Type == view.AggTypeLastValue
case metricpb.MetricDescriptor_DOUBLE:
if _, ok := m.(*stats.Float64Measure); !ok {
return fmt.Errorf("stackdriver metric descriptor was not created as double")
aggTypeMatch = agg.Type == view.AggTypeSum || agg.Type == view.AggTypeLastValue
case metricpb.MetricDescriptor_DISTRIBUTION:
aggTypeMatch = agg.Type == view.AggTypeDistribution
if !aggTypeMatch {
return fmt.Errorf("stackdriver metric descriptor was not created with aggregation type %T", agg.Type)
labels := make(map[string]struct{}, len(keys)+len(e.defaultLabels))
for _, k := range keys {
labels[sanitize(k.Name())] = struct{}{}
for k := range e.defaultLabels {
labels[sanitize(k)] = struct{}{}
for _, k := range md.Labels {
if _, ok := labels[k.Key]; !ok {
return fmt.Errorf("stackdriver metric descriptor %q was not created with label %q", md.Type, k)
delete(labels, k.Key)
if len(labels) > 0 {
extra := make([]string, 0, len(labels))
for k := range labels {
extra = append(extra, k)
return fmt.Errorf("stackdriver metric descriptor %q contains unexpected labels: %s", md.Type, strings.Join(extra, ", "))
return nil
var createMetricDescriptor = func(ctx context.Context, c *monitoring.MetricClient, mdr *monitoringpb.CreateMetricDescriptorRequest) (*metric.MetricDescriptor, error) {
return c.CreateMetricDescriptor(ctx, mdr)
var getMetricDescriptor = func(ctx context.Context, c *monitoring.MetricClient, mdr *monitoringpb.GetMetricDescriptorRequest) (*metric.MetricDescriptor, error) {
return c.GetMetricDescriptor(ctx, mdr)
var createTimeSeries = func(ctx context.Context, c *monitoring.MetricClient, ts *monitoringpb.CreateTimeSeriesRequest) error {
return c.CreateTimeSeries(ctx, ts)
var knownExternalMetricPrefixes = []string{
// builtinMetric returns true if a MetricType is a heuristically known
// built-in Stackdriver metric
func builtinMetric(metricType string) bool {
for _, knownExternalMetric := range knownExternalMetricPrefixes {
if strings.HasPrefix(metricType, knownExternalMetric) {
return false
return true