// Copyright 2018, OpenCensus Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package stackdriver
import (
"context"
"encoding/json"
"errors"
"net"
"reflect"
"sync"
"testing"
"time"
"contrib.go.opencensus.io/exporter/ocagent"
"go.opencensus.io/exemplar"
"go.opencensus.io/stats"
"go.opencensus.io/stats/view"
"google.golang.org/api/option"
"google.golang.org/grpc"
agentmetricspb "github.com/census-instrumentation/opencensus-proto/gen-go/agent/metrics/v1"
"github.com/golang/protobuf/ptypes/empty"
googlemetricpb "google.golang.org/genproto/googleapis/api/metric"
monitoredrespb "google.golang.org/genproto/googleapis/api/monitoredres"
monitoringpb "google.golang.org/genproto/googleapis/monitoring/v3"
)
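// TestStatsAndMetricsEquivalence checks, for each generated view.Data, that the
// CreateMetricDescriptorRequest and CreateTimeSeriesRequest derived directly from
// stats match the ones derived from the equivalent OpenCensus-Proto metrics
// received by the agent.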
func TestStatsAndMetricsEquivalence(t *testing.T) {
ma, addr, stop := createMockAgent(t)
defer stop()
oce, err := ocagent.NewExporter(ocagent.WithInsecure(),
ocagent.WithAddress(addr),
ocagent.WithReconnectionPeriod(1*time.Millisecond))
if err != nil {
t.Fatalf("Failed to create the ocagent exporter: %v", err)
}
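// Give the exporter a moment to establish its connection to the agent.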
time.Sleep(5 * time.Millisecond)
startTime := time.Date(2018, 11, 25, 15, 38, 18, 997, time.UTC)
mLatencyMs := stats.Float64("latency", "The latency for various methods", "ms")
// Generate the view.Data.
var vdl []*view.Data
for i := 0; i < 100; i++ {
vdl = append(vdl, &view.Data{
Start: startTime,
End: startTime.Add(time.Duration(1+i) * time.Second),
View: &view.View{
Name: "ocagent.io/latency",
Description: "The latency of the various methods",
Aggregation: view.Count(),
Measure: mLatencyMs,
},
Rows: []*view.Row{
{
Data: &view.CountData{Value: int64(4 * (i + 2))},
},
},
})
}
// Now perform some exporting.
for i, vd := range vdl {
oce.ExportView(vd)
oce.Flush()
time.Sleep(30 * time.Millisecond)
oce.Flush()
var last *agentmetricspb.ExportMetricsServiceRequest
ma.forEachRequest(func(emr *agentmetricspb.ExportMetricsServiceRequest) {
last = emr
})
if last == nil || len(last.Metrics) == 0 {
t.Errorf("#%d: Failed to retrieve any metrics", i)
continue
}
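// A bare statsExporter is enough here: only its conversion helpers are used and
// nothing is uploaded.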
se := &statsExporter{
o: Options{ProjectID: "equivalence"},
}
ctx := context.Background()
sMD, err := se.viewToCreateMetricDescriptorRequest(ctx, vd.View)
if err != nil {
t.Errorf("#%d: Stats.viewToMetricDescriptor: %v", i, err)
}
pMD, err := se.protoMetricDescriptorToCreateMetricDescriptorRequest(ctx, last.Metrics[0])
if err != nil {
t.Errorf("#%d: Stats.protoMetricDescriptorToMetricDescriptor: %v", i, err)
}
if !reflect.DeepEqual(sMD, pMD) {
t.Errorf("MetricDescriptor Mismatch\nStats MetricDescriptor:\n\t%v\nProto MetricDescriptor:\n\t%v\n", sMD, pMD)
}
sctreql := se.makeReq([]*view.Data{vd}, maxTimeSeriesPerUpload)
tsl, _ := se.protoMetricToTimeSeries(ctx, last.Node, last.Resource, last.Metrics[0])
pctreql := se.combineTimeSeriesToCreateTimeSeriesRequest(tsl)
if !reflect.DeepEqual(sctreql, pctreql) {
t.Errorf("#%d: TimeSeries Mismatch\nStats CreateTimeSeriesRequest:\n\t%v\nProto CreateTimeSeriesRequest:\n\t%v\n",
i, sctreql, pctreql)
}
}
}
// TestEquivalenceStatsVsMetricsUploads creates and uses a mock "Stackdriver
// backend" that receives the CreateTimeSeriesRequest and
// CreateMetricDescriptorRequest messages the Stackdriver Metrics Proto client
// would otherwise send to the real Google Stackdriver backend.
//
// The test ensures that the final requests produced by direct stats (view.Data)
// exporting are exactly equal to those produced by the
// view.Data-->OpenCensus-Proto.Metrics exporting path.
func TestEquivalenceStatsVsMetricsUploads(t *testing.T) {
ma, addr, doneFn := createMockAgent(t)
defer doneFn()
// Now create a gRPC connection to the agent.
conn, err := grpc.Dial(addr, grpc.WithInsecure())
if err != nil {
t.Fatalf("Failed to make a gRPC connection to the agent: %v", err)
}
defer conn.Close()
// Finally create the OpenCensus stats exporter
exporterOptions := Options{
ProjectID: "equivalence",
MonitoringClientOptions: []option.ClientOption{option.WithGRPCConn(conn)},
// Setting this time delay threshold to a very large value
// so that batching is performed deterministically and flushing is
// fully controlled by us.
BundleDelayThreshold: 2 * time.Hour,
}
se, err := newStatsExporter(exporterOptions)
if err != nil {
t.Fatalf("Failed to create the statsExporter: %v", err)
}
startTime := time.Date(2019, 1, 16, 15, 04, 23, 73, time.UTC)
mLatencyMs := stats.Float64("latency", "The latency for various methods", "ms")
mConnections := stats.Float64("connections", "The count of various connections at a point in time", "1")
mTimeMs := stats.Float64("time", "Counts time in milliseconds", "ms")
// Generate the view.Data.
var vdl []*view.Data
for i := 0; i < 10; i++ {
vdl = append(vdl,
&view.Data{
Start: startTime,
End: startTime.Add(time.Duration(1+i) * time.Second),
View: &view.View{
Name: "ocagent.io/calls",
Description: "The number of the various calls",
Aggregation: view.Count(),
Measure: mLatencyMs,
},
Rows: []*view.Row{
{
Data: &view.CountData{Value: int64(4 * (i + 2))},
},
},
},
&view.Data{
Start: startTime,
End: startTime.Add(time.Duration(2+i) * time.Second),
View: &view.View{
Name: "ocagent.io/latency",
Description: "The latency of the various methods",
Aggregation: view.Distribution(100, 500, 1000, 2000, 4000, 8000, 16000),
Measure: mLatencyMs,
},
Rows: []*view.Row{
{
Data: &view.DistributionData{
Count: 1,
Min: 100,
Max: 500,
Mean: 125.9,
CountPerBucket: []int64{0, 1, 0, 0, 0, 0, 0},
ExemplarsPerBucket: []*exemplar.Exemplar{
nil,
{
Value: 125.9, Timestamp: startTime.Add(time.Duration(1+i) * time.Second),
},
nil, nil, nil, nil, nil,
},
},
},
},
},
&view.Data{
Start: startTime,
End: startTime.Add(time.Duration(3+i) * time.Second),
View: &view.View{
Name: "ocagent.io/connections",
Description: "The count of various connections instantaneously",
Aggregation: view.LastValue(),
Measure: mConnections,
},
Rows: []*view.Row{
{Data: &view.LastValueData{Value: 99}},
},
},
&view.Data{
Start: startTime,
End: startTime.Add(time.Duration(1+i) * time.Second),
View: &view.View{
Name: "ocagent.io/uptime",
Description: "The total uptime at any instance",
Aggregation: view.Sum(),
Measure: mTimeMs,
},
Rows: []*view.Row{
{Data: &view.SumData{Value: 199903.97}},
},
})
}
for _, vd := range vdl {
// Export the view.Data to the Stackdriver backend.
se.ExportView(vd)
}
se.Flush()
// Collect the Stackdriver time series and metric descriptors recorded from the stats-based export.
var stackdriverTimeSeriesFromStats []*monitoringpb.CreateTimeSeriesRequest
ma.forEachStackdriverTimeSeries(func(sdt *monitoringpb.CreateTimeSeriesRequest) {
stackdriverTimeSeriesFromStats = append(stackdriverTimeSeriesFromStats, sdt)
})
var stackdriverMetricDescriptorsFromStats []*monitoringpb.CreateMetricDescriptorRequest
ma.forEachStackdriverMetricDescriptor(func(sdmd *monitoringpb.CreateMetricDescriptorRequest) {
stackdriverMetricDescriptorsFromStats = append(stackdriverMetricDescriptorsFromStats, sdmd)
})
// Reset the stackdriverTimeSeries to enable fresh collection
// and then comparison with the results from metrics uploads.
ma.resetStackdriverTimeSeries()
ma.resetStackdriverMetricDescriptors()
// Now for the metrics sent by the metrics exporter.
oce, err := ocagent.NewExporter(ocagent.WithInsecure(),
ocagent.WithAddress(addr),
ocagent.WithReconnectionPeriod(1*time.Millisecond))
if err != nil {
t.Fatalf("Failed to create the ocagent exporter: %v", err)
}
time.Sleep(5 * time.Millisecond)
for _, vd := range vdl {
// Perform the view.Data --> metricspb.Metric transformation.
oce.ExportView(vd)
oce.Flush()
time.Sleep(2 * time.Millisecond)
}
oce.Flush()
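// Replay every request recorded by the agent through the proto-metrics export path.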
ma.forEachRequest(func(emr *agentmetricspb.ExportMetricsServiceRequest) {
_ = se.ExportMetricsProto(context.Background(), emr.Node, emr.Resource, emr.Metrics)
})
se.Flush()
var stackdriverTimeSeriesFromMetrics []*monitoringpb.CreateTimeSeriesRequest
ma.forEachStackdriverTimeSeries(func(sdt *monitoringpb.CreateTimeSeriesRequest) {
stackdriverTimeSeriesFromMetrics = append(stackdriverTimeSeriesFromMetrics, sdt)
})
var stackdriverMetricDescriptorsFromMetrics []*monitoringpb.CreateMetricDescriptorRequest
ma.forEachStackdriverMetricDescriptor(func(sdmd *monitoringpb.CreateMetricDescriptorRequest) {
stackdriverMetricDescriptorsFromMetrics = append(stackdriverMetricDescriptorsFromMetrics, sdmd)
})
// The requests recorded from both paths should now be identical.
if !reflect.DeepEqual(stackdriverTimeSeriesFromMetrics, stackdriverTimeSeriesFromStats) {
blobFromMetrics := jsonBlob(stackdriverTimeSeriesFromMetrics)
blobFromStats := jsonBlob(stackdriverTimeSeriesFromStats)
t.Errorf("StackdriverTimeSeriesFromMetrics (%d):\n%s\n\nStackdriverTimeSeriesFromStats (%d):\n%s\n\n",
len(stackdriverTimeSeriesFromMetrics), blobFromMetrics,
len(stackdriverTimeSeriesFromStats), blobFromStats)
}
// Examining the metric descriptors too.
if !reflect.DeepEqual(stackdriverMetricDescriptorsFromMetrics, stackdriverMetricDescriptorsFromStats) {
t.Errorf("StackdriverMetricDescriptorsFromMetrics:\n%v\nStackdriverMetricDescriptors:\n%v\n\n",
stackdriverMetricDescriptorsFromMetrics, stackdriverMetricDescriptorsFromStats)
}
}
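// metricsAgent is an in-process mock that implements both the OpenCensus Agent
// MetricsService and the Stackdriver MetricService, recording the requests it
// receives so that tests can inspect and compare them.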
type metricsAgent struct {
mu sync.RWMutex
metrics []*agentmetricspb.ExportMetricsServiceRequest
stackdriverTimeSeries []*monitoringpb.CreateTimeSeriesRequest
stackdriverMetricDescriptors []*monitoringpb.CreateMetricDescriptorRequest
}
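// createMockAgent starts the mock agent on an ephemeral local port and returns
// the agent, the address to dial it on, and a function that shuts it down.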
func createMockAgent(t *testing.T) (*metricsAgent, string, func()) {
ln, err := net.Listen("tcp", ":0")
if err != nil {
t.Fatalf("Failed to bind to an available address: %v", err)
}
ma := new(metricsAgent)
srv := grpc.NewServer()
agentmetricspb.RegisterMetricsServiceServer(srv, ma)
monitoringpb.RegisterMetricServiceServer(srv, ma)
go func() {
_ = srv.Serve(ln)
}()
stop := func() {
srv.Stop()
_ = ln.Close()
}
_, agentPortStr, _ := net.SplitHostPort(ln.Addr().String())
return ma, ":" + agentPortStr, stop
}
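// Export implements agentmetricspb.MetricsServiceServer, recording every request
// received on the stream until the client closes it.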
func (ma *metricsAgent) Export(mes agentmetricspb.MetricsService_ExportServer) error {
// Expecting the first message to contain the Node information
firstMetric, err := mes.Recv()
if err != nil {
return err
}
if firstMetric == nil || firstMetric.Node == nil {
return errors.New("Expecting a non-nil Node in the first message")
}
ma.addMetric(firstMetric)
for {
msg, err := mes.Recv()
if err != nil {
return err
}
ma.addMetric(msg)
}
}
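// addMetric appends an export request to the recorded metrics under the write lock.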
func (ma *metricsAgent) addMetric(metric *agentmetricspb.ExportMetricsServiceRequest) {
ma.mu.Lock()
ma.metrics = append(ma.metrics, metric)
ma.mu.Unlock()
}
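// forEachRequest invokes fn for every recorded export request while holding a read lock.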
func (ma *metricsAgent) forEachRequest(fn func(*agentmetricspb.ExportMetricsServiceRequest)) {
ma.mu.RLock()
defer ma.mu.RUnlock()
for _, req := range ma.metrics {
fn(req)
}
}
func (ma *metricsAgent) forEachStackdriverTimeSeries(fn func(sdt *monitoringpb.CreateTimeSeriesRequest)) {
ma.mu.RLock()
defer ma.mu.RUnlock()
for _, sdt := range ma.stackdriverTimeSeries {
fn(sdt)
}
}
func (ma *metricsAgent) forEachStackdriverMetricDescriptor(fn func(sdmd *monitoringpb.CreateMetricDescriptorRequest)) {
ma.mu.RLock()
defer ma.mu.RUnlock()
for _, sdmd := range ma.stackdriverMetricDescriptors {
fn(sdmd)
}
}
func (ma *metricsAgent) resetStackdriverTimeSeries() {
ma.mu.Lock()
ma.stackdriverTimeSeries = ma.stackdriverTimeSeries[:0]
ma.mu.Unlock()
}
func (ma *metricsAgent) resetStackdriverMetricDescriptors() {
ma.mu.Lock()
ma.stackdriverMetricDescriptors = ma.stackdriverMetricDescriptors[:0]
ma.mu.Unlock()
}
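// metricsAgent doubles as the mock Stackdriver backend: CreateMetricDescriptor and
// CreateTimeSeries record their requests, while the remaining MetricService methods
// are no-op stubs returning empty responses.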
var _ monitoringpb.MetricServiceServer = (*metricsAgent)(nil)
func (ma *metricsAgent) GetMetricDescriptor(ctx context.Context, req *monitoringpb.GetMetricDescriptorRequest) (*googlemetricpb.MetricDescriptor, error) {
return new(googlemetricpb.MetricDescriptor), nil
}
func (ma *metricsAgent) CreateMetricDescriptor(ctx context.Context, req *monitoringpb.CreateMetricDescriptorRequest) (*googlemetricpb.MetricDescriptor, error) {
ma.mu.Lock()
ma.stackdriverMetricDescriptors = append(ma.stackdriverMetricDescriptors, req)
ma.mu.Unlock()
return req.MetricDescriptor, nil
}
func (ma *metricsAgent) CreateTimeSeries(ctx context.Context, req *monitoringpb.CreateTimeSeriesRequest) (*empty.Empty, error) {
ma.mu.Lock()
ma.stackdriverTimeSeries = append(ma.stackdriverTimeSeries, req)
ma.mu.Unlock()
return new(empty.Empty), nil
}
func (ma *metricsAgent) ListTimeSeries(ctx context.Context, req *monitoringpb.ListTimeSeriesRequest) (*monitoringpb.ListTimeSeriesResponse, error) {
return new(monitoringpb.ListTimeSeriesResponse), nil
}
func (ma *metricsAgent) DeleteMetricDescriptor(ctx context.Context, req *monitoringpb.DeleteMetricDescriptorRequest) (*empty.Empty, error) {
return new(empty.Empty), nil
}
func (ma *metricsAgent) ListMetricDescriptors(ctx context.Context, req *monitoringpb.ListMetricDescriptorsRequest) (*monitoringpb.ListMetricDescriptorsResponse, error) {
return new(monitoringpb.ListMetricDescriptorsResponse), nil
}
func (ma *metricsAgent) GetMonitoredResourceDescriptor(ctx context.Context, req *monitoringpb.GetMonitoredResourceDescriptorRequest) (*monitoredrespb.MonitoredResourceDescriptor, error) {
return new(monitoredrespb.MonitoredResourceDescriptor), nil
}
func (ma *metricsAgent) ListMonitoredResourceDescriptors(ctx context.Context, req *monitoringpb.ListMonitoredResourceDescriptorsRequest) (*monitoringpb.ListMonitoredResourceDescriptorsResponse, error) {
return new(monitoringpb.ListMonitoredResourceDescriptorsResponse), nil
}
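// jsonBlob renders v as indented JSON for readable test failure output.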
func jsonBlob(v interface{}) []byte {
blob, _ := json.MarshalIndent(v, "", " ")
return blob
}