blob: b80b89940da2d68f894070dbf44dfaa72724f935 [file] [log] [blame]
// Copyright 2021 The LUCI Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package aggrmetrics
import (
"context"
"math"
"time"
"go.chromium.org/luci/common/clock"
"go.chromium.org/luci/common/data/stringset"
"go.chromium.org/luci/common/errors"
"go.chromium.org/luci/common/logging"
"go.chromium.org/luci/common/retry/transient"
"go.chromium.org/luci/common/tsmon/distribution"
"go.chromium.org/luci/common/tsmon/field"
"go.chromium.org/luci/common/tsmon/metric"
"go.chromium.org/luci/common/tsmon/types"
"go.chromium.org/luci/gae/service/datastore"
"go.chromium.org/luci/cv/internal/common"
"go.chromium.org/luci/cv/internal/run"
)
var (
	// TODO(tandrii): remove "internal" from metrics below and make it available
	// to users once we are comfortable with its target & fields.

	// metricActiveRunsCount counts active Runs, broken down by LUCI project
	// and Run status.
	metricActiveRunsCount = metric.NewInt(
		"cv/internal/runs/active/count",
		"Count of active Runs.",
		nil,
		field.String("project"),
		field.String("status"),
	)
	// metricActiveRunsDurationsS records, per LUCI project, a distribution of
	// ages (now - CreateTime, in seconds) of active Runs.
	metricActiveRunsDurationsS = metric.NewNonCumulativeDistribution(
		"cv/internal/runs/active/durations",
		"Ages of active Runs",
		&types.MetricMetadata{Units: types.Seconds},
		// Bucketer for 1s .. ~7d range.
		//
		// Not accurate above 1h.
		// TODO(tandrii): add another metric with a fixed width bucketer spanning
		// range up to 2 hours, which is what most projects care about.
		distribution.GeometricBucketer(math.Pow(10, 0.06), 100),
		field.String("project"),
	)
)
const (
	// maxRuns limits how many Runs can probably be processed in-process
	// reasonably well within the reporting deadline.
	//
	// NOTE(review): the original comment read "given 10s minute deadline",
	// which looks garbled — confirm the actual deadline value.
	//
	// If there are actually equal or more than this many Runs, then
	// runsAggregator will refuse to work to avoid giving incorrect / partial
	// data.
	maxRuns = 2000
	// maxRunsWorkingSet limits how many Runs are loaded into RAM at the same
	// time.
	maxRunsWorkingSet = 500
)
// runsAggregator computes aggregated per-project metrics over all active
// (i.e. not yet ended) Runs.
//
// It is stateless; everything is computed anew in prepare().
type runsAggregator struct {
}
// metrics implements aggregator interface.
//
// It enumerates every metric that the reportFunc produced by prepare() sets.
func (r *runsAggregator) metrics() []types.Metric {
	all := []types.Metric{
		metricActiveRunsCount,
		metricActiveRunsDurationsS,
	}
	return all
}
// prepare implements aggregator interface.
//
// It loads all active Runs — refusing to proceed if there are more than
// maxRuns of them — aggregates per-project counts and age distributions, and
// returns a closure which flushes the aggregated values to the metrics.
func (r *runsAggregator) prepare(ctx context.Context, activeProjects stringset.Set) (reportFunc, error) {
	now := clock.Now(ctx)
	// Ask for one key beyond maxRuns to detect the "too many" condition.
	keys, err := loadActiveRuns(ctx, maxRuns+1)
	switch {
	case err != nil:
		return nil, err
	case len(keys) == maxRuns+1:
		// Outright refuse sending incomplete data.
		logging.Errorf(ctx, "FIXME: too many active runs (>%d) to report aggregated metrics for", maxRuns)
		return nil, errors.New("too many active Runs")
	}
	// Seed stats for every known active project, so each is reported even if
	// it has no active Runs right now (see newProjectStat).
	stats := make(map[string]*projectStat, len(activeProjects))
	for p := range activeProjects {
		stats[p] = newProjectStat()
	}
	err = iterRuns(ctx, keys, maxRunsWorkingSet, func(r *run.Run) {
		name := r.ID.LUCIProject()
		p, exists := stats[name]
		if !exists {
			// Although rare, this can happen if a new project has just been added.
			p = newProjectStat()
			stats[name] = p
		}
		p.add(r, now)
	})
	if err != nil {
		return nil, err
	}
	// Actual reporting is deferred to the returned closure.
	return func(ctx context.Context) {
		for project, ps := range stats {
			ps.report(ctx, project)
		}
	}, nil
}
// projectStat accumulates statistics about one project's active Runs.
type projectStat struct {
	// byStatus counts active Runs per Run status.
	byStatus map[run.Status]int64
	// sinceCreation is a distribution of Run ages (seconds since CreateTime).
	sinceCreation *distribution.Distribution
}
// newProjectStat constructs a projectStat ready to accumulate Runs.
func newProjectStat() *projectStat {
	p := &projectStat{
		byStatus:      make(map[run.Status]int64, 1),
		sinceCreation: distribution.New(metricActiveRunsDurationsS.Bucketer()),
	}
	// Always set Status_RUNNING to 0, as this way all projects will always be
	// reported.
	p.byStatus[run.Status_RUNNING] = 0
	return p
}
// add accounts for one Run, measuring its age relative to `now`.
//
// Ended Runs and Runs created "in the future" are skipped.
func (p *projectStat) add(r *run.Run, now time.Time) {
	if run.IsEnded(r.Status) {
		// Since the Run is loaded after the keys-only query, it may be already
		// finished. Such Runs are no longer active and shouldn't be reported.
		return
	}
	if r.CreateTime.After(now) {
		// This should be rare, yet may happen if this process' time is somewhat
		// behind time of a concurrent process which has just created a new Run.
		// It's better to skip this newly created Run for later than report a
		// negative duration for it.
		return
	}
	p.byStatus[r.Status]++
	p.sinceCreation.Add(now.Sub(r.CreateTime).Seconds())
}
// report flushes the accumulated stats to the metrics for the given project.
func (p *projectStat) report(ctx context.Context, project string) {
	for s, count := range p.byStatus {
		metricActiveRunsCount.Set(ctx, count, project, run.Status_name[int32(s)])
	}
	metricActiveRunsDurationsS.Set(ctx, p.sinceCreation, project)
}
// loadActiveRuns returns only the keys of the active Runs.
//
// This is a cheap query in Datastore, both in terms of time and $ cost.
func loadActiveRuns(ctx context.Context, limit int32) ([]*datastore.Key, error) {
	q := datastore.NewQuery(common.RunKind).Limit(limit).KeysOnly(true).Lt("Status", run.Status_ENDED_MASK)
	var keys []*datastore.Key
	err := datastore.GetAll(ctx, q, &keys)
	// A context error takes precedence: GetAll's error is then likely just a
	// consequence of the expired/canceled context.
	if cerr := ctx.Err(); cerr != nil {
		logging.Warningf(ctx, "%s while fetching %s", cerr, q)
		return nil, cerr
	}
	if err != nil {
		return nil, errors.Annotate(err, "failed to fetch active Runs").Tag(transient.Tag).Err()
	}
	if len(keys) == int(limit) {
		// Hitting the limit exactly means there may be more Runs than fetched.
		logging.Errorf(ctx, "FIXME: %s fetched exactly the limit of Runs; reported data is incomplete", q)
	}
	return keys, nil
}
// iterRuns calls clbk function on each Run represented by its key.
//
// Loads Runs in batches of at most bufSize to avoid excessive RAM consumption.
func iterRuns(ctx context.Context, keys []*datastore.Key, bufSize int, clbk func(r *run.Run)) error {
	buf := make([]*run.Run, 0, min(bufSize, len(keys)))
	for len(keys) > 0 {
		// Carve off the next batch of at most cap(buf) keys.
		n := min(len(keys), cap(buf))
		buf = buf[:0]
		for _, k := range keys[:n] {
			buf = append(buf, &run.Run{ID: common.RunID(k.StringID())})
		}
		keys = keys[n:]
		if err := datastore.Get(ctx, buf); err != nil {
			return errors.Annotate(err, "failed to load Runs").Tag(transient.Tag).Err()
		}
		for _, r := range buf {
			clbk(r)
		}
	}
	return nil
}
// min returns the smaller of two ints.
func min(i, j int) int {
	if j < i {
		return j
	}
	return i
}