blob: 5c8b755299af7c0039dd6545b3e4184024d3d12b [file] [log] [blame]
// Copyright 2020 The LUCI Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package aggrmetrics
import (
"context"
"errors"
"sync/atomic"
"testing"
"time"
"go.chromium.org/luci/common/data/stringset"
"go.chromium.org/luci/common/logging"
"go.chromium.org/luci/common/tsmon"
"go.chromium.org/luci/common/tsmon/field"
"go.chromium.org/luci/common/tsmon/metric"
"go.chromium.org/luci/common/tsmon/types"
cfgpb "go.chromium.org/luci/cv/api/config/v2"
"go.chromium.org/luci/cv/internal/configs/prjcfg/prjcfgtest"
"go.chromium.org/luci/cv/internal/cvtesting"
. "github.com/smartystreets/goconvey/convey"
. "go.chromium.org/luci/common/testing/assertions"
)
func TestDriver(t *testing.T) {
t.Parallel()
Convey("Driver smoke test", t, func() {
ct := cvtesting.Test{}
ctx, cancel := ct.SetUp()
defer cancel()
_ = New(ctx, ct.TQDispatcher)
})
Convey("Driver works", t, func() {
ct := cvtesting.Test{}
ctx, cancel := ct.SetUp()
defer cancel()
mSent := func(fields ...interface{}) interface{} {
return ct.TSMonSentValue(ctx, testMetric, fields...)
}
d := Driver{
aggregators: []aggregator{
&testAggregator{name: "agg"},
},
}
tsmon.RegisterCallbackIn(ctx, d.tsmonCallback)
So(tsmon.Flush(ctx), ShouldBeNil)
So(ct.TSMonStore.GetAll(ctx), ShouldBeEmpty)
Convey("No projects", func() {
So(d.Cron(ctx), ShouldBeNil)
So(tsmon.Flush(ctx), ShouldBeNil)
So(ct.TSMonStore.GetAll(ctx), ShouldBeEmpty)
})
Convey("With one project", func() {
prjcfgtest.Create(ctx, "first", &cfgpb.Config{})
So(d.Cron(ctx), ShouldBeNil)
So(tsmon.Flush(ctx), ShouldBeNil)
Convey("Reports new data", func() {
So(ct.TSMonStore.GetAll(ctx), ShouldHaveLength, 1)
So(mSent("first", "agg"), ShouldEqual, 1001)
})
Convey("Next flush doesn't send the old data", func() {
ct.Clock.Add(time.Minute)
So(tsmon.Flush(ctx), ShouldBeNil)
So(ct.TSMonStore.GetAll(ctx), ShouldBeEmpty)
})
Convey("Does not report old data", func() {
ct.Clock.Add(time.Minute)
So(d.Cron(ctx), ShouldBeNil)
// Simulate very delayed flush.
ct.Clock.Add(reportTTL + time.Second)
So(tsmon.Flush(ctx), ShouldBeNil)
So(ct.TSMonStore.GetAll(ctx), ShouldBeEmpty)
})
Convey("Reports only the newest data if a flush was delayed", func() {
So(d.Cron(ctx), ShouldBeNil)
// Change aggregator name to detect which data was sent.
d.aggregators[0].(*testAggregator).name = "latest"
ct.Clock.Add(time.Minute)
So(d.Cron(ctx), ShouldBeNil)
// Finally, flush.
So(tsmon.Flush(ctx), ShouldBeNil)
So(ct.TSMonStore.GetAll(ctx), ShouldHaveLength, 1)
So(mSent("first", "latest"), ShouldEqual, 1001)
So(mSent("first", "agg"), ShouldBeNil)
})
Convey("With another project", func() {
ct.Clock.Add(activeProjectsTTL)
prjcfgtest.Create(ctx, "second", &cfgpb.Config{})
So(d.Cron(ctx), ShouldBeNil)
Convey("Reports data for both projects", func() {
So(tsmon.Flush(ctx), ShouldBeNil)
So(ct.TSMonStore.GetAll(ctx), ShouldHaveLength, 2)
So(mSent("first", "agg"), ShouldEqual, 1001)
So(mSent("second", "agg"), ShouldEqual, 1002)
Convey("With the second project disabled, reports just the first project", func() {
ct.Clock.Add(activeProjectsTTL)
prjcfgtest.Disable(ctx, "second")
So(d.Cron(ctx), ShouldBeNil)
So(tsmon.Flush(ctx), ShouldBeNil)
So(ct.TSMonStore.GetAll(ctx), ShouldHaveLength, 1)
So(mSent("first", "agg"), ShouldEqual, 1001)
})
})
})
Convey("Erroring out aggregator is ignored", func() {
d.aggregators = append(d.aggregators, &testAggregator{name: "err", err: errors.New("oops")})
ct.Clock.Add(time.Minute)
So(d.Cron(ctx), ShouldErrLike, "oops")
So(tsmon.Flush(ctx), ShouldBeNil)
So(ct.TSMonStore.GetAll(ctx), ShouldHaveLength, 1)
So(mSent("first", "agg"), ShouldEqual, 1001)
So(mSent("first", "err"), ShouldBeNil)
})
Convey("Another OK aggregator", func() {
d.aggregators = append(d.aggregators, &testAggregator{name: "xxz"})
ct.Clock.Add(time.Minute)
So(d.Cron(ctx), ShouldBeNil)
So(tsmon.Flush(ctx), ShouldBeNil)
So(ct.TSMonStore.GetAll(ctx), ShouldHaveLength, 2)
So(mSent("first", "agg"), ShouldEqual, 1001)
So(mSent("first", "xxz"), ShouldEqual, 1001)
})
})
})
}
var testMetric = metric.NewInt("test/aggrmetrics", "test only", nil, field.String("project"), field.String("name"))
type testAggregator struct {
name string
err error
cnt int32
}
func (t *testAggregator) metrics() []types.Metric {
return []types.Metric{testMetric}
}
func (t *testAggregator) prepare(ctx context.Context, activeProjects stringset.Set) (reportFunc, error) {
cnt := atomic.AddInt32(&t.cnt, 1)
if t.err != nil {
return nil, t.err
}
vals := make(map[string]int64, len(activeProjects))
for rank, p := range activeProjects.ToSortedSlice() {
vals[p] = int64(1000 + 1 + rank)
}
logging.Debugf(ctx, "testAggregator %q [%d]: computed %d values", t.name, cnt, len(vals))
return func(ctx context.Context) {
logging.Debugf(ctx, "testAggregator %q [%d]: reporting %d values", t.name, cnt, len(vals))
for p, v := range vals {
testMetric.Set(ctx, v, p, t.name)
}
}, nil
}