blob: cc2f76acbd4b6e875a98c65621aeff3af4e4a295 [file] [log] [blame]
// Copyright 2021 The LUCI Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package aggrmetrics
import (
"context"
"sync"
"time"
"golang.org/x/sync/errgroup"
"go.chromium.org/luci/common/clock"
"go.chromium.org/luci/common/errors"
"go.chromium.org/luci/common/logging"
"go.chromium.org/luci/common/retry/transient"
"go.chromium.org/luci/common/sync/parallel"
"go.chromium.org/luci/common/tsmon/distribution"
"go.chromium.org/luci/common/tsmon/types"
"go.chromium.org/luci/gae/service/datastore"
"go.chromium.org/luci/cv/internal/common"
"go.chromium.org/luci/cv/internal/configs/prjcfg"
"go.chromium.org/luci/cv/internal/metrics"
"go.chromium.org/luci/cv/internal/run"
)
const (
	// maxRuns caps how many Runs of one kind (pending or active) a single
	// report cycle is willing to process in-process before its deadline.
	// NOTE(review): the previous wording gave the deadline as "10s minute";
	// confirm the actual value.
	//
	// The key queries fetch up to maxRuns+1 keys; when more than maxRuns Runs
	// match, runsAggregator returns an error rather than reporting incorrect /
	// partial data.
	maxRuns = 2000

	// maxRunsWorkingSet is the batch size handed to iterRuns, i.e. the upper
	// bound on how many Runs one iterRuns call keeps loaded in RAM at a time.
	maxRunsWorkingSet = 500
)
// runsAggregator aggregates metrics about pending and active Runs.
//
// It carries no state of its own; everything it needs is passed to its
// methods.
type runsAggregator struct{}
// metrics implements aggregator interface.
//
// It lists the five Run metrics that runStats.report sets: pending Run count,
// pending Run duration, max pending Run age, active Run count and active Run
// duration.
func (r *runsAggregator) metrics() []types.Metric {
	out := make([]types.Metric, 0, 5)
	out = append(out,
		metrics.Public.PendingRunCount,
		metrics.Public.PendingRunDuration,
		metrics.Public.MaxPendingRunAge,
		metrics.Public.ActiveRunCount,
		metrics.Public.ActiveRunDuration,
	)
	return out
}
// report implements aggregator interface.
//
// It works in two phases:
//
//  1. Concurrently fetch the keys of all pending Runs, the keys of all active
//     Runs (status strictly between PENDING and ENDED_MASK), and initialize
//     the stats for the given projects.
//  2. Concurrently load the pending and the active Runs in batches of
//     maxRunsWorkingSet and fold each of them into the stats.
//
// The aggregated stats are sent only if both phases succeed. If more than
// maxRuns Runs of either kind exist, an error is returned and nothing is
// reported.
func (r *runsAggregator) report(ctx context.Context, projects []string) error {
	var (
		pendingKeys []*datastore.Key
		activeKeys  []*datastore.Key
		stats       *runStats
	)

	// Phase 1: key-only queries and stats initialization.
	loadGroup, loadCtx := errgroup.WithContext(ctx)
	loadGroup.Go(func() error {
		pendingQ := datastore.NewQuery(common.RunKind).Eq("Status", run.Status_PENDING)
		var err error
		pendingKeys, err = loadRunKeys(loadCtx, pendingQ, maxRuns+1)
		if err != nil {
			return err
		}
		if len(pendingKeys) == maxRuns+1 {
			// Outright refuse sending incomplete data.
			logging.Errorf(ctx, "FIXME: too many pending runs (>%d) to report aggregated metrics for", maxRuns)
			return errors.New("too many pending Runs")
		}
		return nil
	})
	loadGroup.Go(func() error {
		activeQ := datastore.NewQuery(common.RunKind).
			Lt("Status", run.Status_ENDED_MASK).
			Gt("Status", run.Status_PENDING)
		var err error
		activeKeys, err = loadRunKeys(loadCtx, activeQ, maxRuns+1)
		if err != nil {
			return err
		}
		if len(activeKeys) == maxRuns+1 {
			// Outright refuse sending incomplete data.
			logging.Errorf(ctx, "FIXME: too many active runs (>%d) to report aggregated metrics for", maxRuns)
			return errors.New("too many active Runs")
		}
		return nil
	})
	loadGroup.Go(func() error {
		var err error
		stats, err = initRunStats(loadCtx, projects)
		return err
	})
	if err := loadGroup.Wait(); err != nil {
		return err
	}

	// Phase 2: load the Runs themselves. The two goroutines update disjoint
	// maps of stats (pending* vs active*), hence no locking is done here.
	aggGroup, aggCtx := errgroup.WithContext(ctx)
	now := clock.Now(ctx)
	aggGroup.Go(func() error {
		return iterRuns(aggCtx, pendingKeys, maxRunsWorkingSet, func(loaded *run.Run) {
			stats.addPending(loaded, now)
		})
	})
	aggGroup.Go(func() error {
		return iterRuns(aggCtx, activeKeys, maxRunsWorkingSet, func(loaded *run.Run) {
			stats.addActive(loaded, now)
		})
	})
	if err := aggGroup.Wait(); err != nil {
		return err
	}

	stats.report(ctx)
	return nil
}
// runStats accumulates per-(project, config group, mode) statistics of Runs
// before they are sent as metrics by its report method.
type runStats struct {
	// Stats of Runs in PENDING status.

	// pendingRunCounts is the number of pending Runs.
	pendingRunCounts map[runFields]int64
	// pendingRunDurations is the distribution of how long Runs have been
	// pending; samples are added in milliseconds.
	pendingRunDurations map[runFields]*distribution.Distribution
	// maxPendingRunAge is the age of the oldest pending Run; it is converted
	// to milliseconds when reported.
	maxPendingRunAge map[runFields]time.Duration

	// Stats of active (not ended) Runs.

	// activeRunCounts is the number of active Runs.
	activeRunCounts map[runFields]int64
	// activeRunDurations is the distribution of how long Runs have been
	// running since their start time; samples are added in seconds.
	activeRunDurations map[runFields]*distribution.Distribution
}
// runFields is the set of metric field values identifying one stream of Run
// metrics. It is comparable and is used as a map key in runStats.
type runFields struct {
	// project is the LUCI project name.
	project string
	// configGroup is the name of the config group.
	configGroup string
	// mode is the Run mode as a string.
	mode string
}
// makeRunFields derives the metric field values from the given Run: the LUCI
// project of its ID, the name of its config group ID and its mode.
func makeRunFields(r *run.Run) runFields {
	var f runFields
	f.project = r.ID.LUCIProject()
	f.configGroup = r.ConfigGroupID.Name()
	f.mode = string(r.Mode)
	return f
}
// toMetricFields returns the field values in the order project, config group,
// mode, ready to be passed as variadic field arguments to a metric's Set.
func (f runFields) toMetricFields() []interface{} {
	vals := make([]interface{}, 0, 3)
	vals = append(vals, f.project, f.configGroup, f.mode)
	return vals
}
// initRunStats creates runStats pre-populated with zero values for every
// project + config group + (DRY_RUN|FULL_RUN) combination of the given
// projects, so that zeros are reported for combinations without any Runs.
//
// The latest config meta of each project is fetched through a work pool of at
// most 8 workers. Projects whose config is not enabled are skipped. The first
// error of fetching a meta is returned as is.
func initRunStats(ctx context.Context, projects []string) (*runStats, error) {
	stats := &runStats{
		pendingRunCounts:    make(map[runFields]int64),
		pendingRunDurations: make(map[runFields]*distribution.Distribution),
		maxPendingRunAge:    make(map[runFields]time.Duration),
		activeRunCounts:     make(map[runFields]int64),
		activeRunDurations:  make(map[runFields]*distribution.Distribution),
	}
	// statsMu guards the maps of stats, which are written by several workers.
	var statsMu sync.Mutex
	standardModes := []run.Mode{run.DryRun, run.FullRun}

	// TODO: Support `GetLatestMetas` and `GetConfigGroups` in prjcfg package
	// and use those APIs instead of parallel.WorkPool to reduce the call to
	// datastore.
	poolErr := parallel.WorkPool(min(8, len(projects)), func(work chan<- func() error) {
		for _, project := range projects {
			project := project
			work <- func() error {
				meta, err := prjcfg.GetLatestMeta(ctx, project)
				if err != nil {
					return err
				}
				if meta.Status != prjcfg.StatusEnabled {
					// race condition: by the time Project config is loaded, the
					// Project is disabled, skipping this Project as we won't expect
					// any non-ended runs from this Project. Even if there's any,
					// they will be ended very soon.
					return nil
				}

				statsMu.Lock()
				defer statsMu.Unlock()
				// pre-populate all combinations of
				// project+config_group+(DRY_RUN|FULL_RUN) so that zero value is
				// reported.
				for _, cgName := range meta.ConfigGroupNames {
					for _, mode := range standardModes {
						key := runFields{
							project:     project,
							configGroup: cgName,
							mode:        string(mode),
						}
						stats.activeRunCounts[key] = 0
						stats.activeRunDurations[key] = distribution.New(metrics.Public.ActiveRunDuration.Bucketer())
						stats.pendingRunCounts[key] = 0
						stats.pendingRunDurations[key] = distribution.New(metrics.Public.PendingRunDuration.Bucketer())
						stats.maxPendingRunAge[key] = time.Duration(0)
					}
				}
				return nil
			}
		}
	})
	if poolErr != nil {
		return nil, poolErr
	}
	return stats, nil
}
// addPending folds one pending Run into the stats, measuring its age as the
// time elapsed between its creation and now.
//
// The Run is ignored if it is no longer PENDING or if it was created after
// now.
func (rs *runStats) addPending(r *run.Run, now time.Time) {
	if r.Status != run.Status_PENDING {
		// Since the Run is loaded after the query, the Run might change the
		// status. Ignore this type of runs.
		return
	}
	if r.CreateTime.After(now) {
		// This should be rare, yet may happen if this process' time is
		// somewhat behind time of a concurrent process which has just created
		// a new Run. It's better to skip this newly created Run for later than
		// report a negative duration for it.
		return
	}

	key := makeRunFields(r)
	rs.pendingRunCounts[key]++

	dist, found := rs.pendingRunDurations[key]
	if !found {
		// The Run has fields that were not pre-populated; start a new
		// distribution for them.
		dist = distribution.New(metrics.Public.PendingRunDuration.Bucketer())
		rs.pendingRunDurations[key] = dist
	}
	age := now.Sub(r.CreateTime)
	dist.Add(float64(age.Milliseconds()))

	if oldest := rs.maxPendingRunAge[key]; oldest == 0 || age > oldest {
		rs.maxPendingRunAge[key] = age
	}
}
// addActive folds one active Run into the stats, measuring its duration in
// seconds as the time elapsed between its start and now.
//
// The Run is ignored if it has already ended or if it was started after now.
func (rs *runStats) addActive(r *run.Run, now time.Time) {
	if run.IsEnded(r.Status) {
		// Since the Run is loaded after the query, the Run may be already
		// finished. Such Runs are no longer considered active and shouldn't be
		// reported.
		return
	}
	if r.StartTime.After(now) {
		// This should be rare, yet may happen if this process' time is
		// somewhat behind time of a concurrent process which has just started
		// a new Run. It's better to skip this newly started Run for later than
		// report a negative duration for it.
		return
	}

	key := makeRunFields(r)
	rs.activeRunCounts[key]++

	dist, found := rs.activeRunDurations[key]
	if !found {
		// The Run has fields that were not pre-populated; start a new
		// distribution for them.
		dist = distribution.New(metrics.Public.ActiveRunDuration.Bucketer())
		rs.activeRunDurations[key] = dist
	}
	dist.Add(now.Sub(r.StartTime).Seconds())
}
// report sets every accumulated value on its corresponding public metric,
// using the map key as the metric fields.
//
// maxPendingRunAge is converted to milliseconds; all other values are sent as
// stored.
func (rs *runStats) report(ctx context.Context) {
	for key, n := range rs.pendingRunCounts {
		labels := key.toMetricFields()
		metrics.Public.PendingRunCount.Set(ctx, n, labels...)
	}
	for key, d := range rs.pendingRunDurations {
		labels := key.toMetricFields()
		metrics.Public.PendingRunDuration.Set(ctx, d, labels...)
	}
	for key, age := range rs.maxPendingRunAge {
		labels := key.toMetricFields()
		metrics.Public.MaxPendingRunAge.Set(ctx, age.Milliseconds(), labels...)
	}
	for key, n := range rs.activeRunCounts {
		labels := key.toMetricFields()
		metrics.Public.ActiveRunCount.Set(ctx, n, labels...)
	}
	for key, d := range rs.activeRunDurations {
		labels := key.toMetricFields()
		metrics.Public.ActiveRunDuration.Set(ctx, d, labels...)
	}
}
// loadRunKeys returns only the keys of the Runs matching the given query.
//
// At most limit keys are fetched. If the context is done by the time the query
// returns, the context's error is returned regardless of the query outcome.
// Any other query failure is annotated and tagged as transient. Fetching
// exactly limit keys is logged as an error, but the keys are still returned.
func loadRunKeys(ctx context.Context, q *datastore.Query, limit int32) ([]*datastore.Key, error) {
	q = q.Limit(limit).KeysOnly(true)

	var keys []*datastore.Key
	err := datastore.GetAll(ctx, q, &keys)
	if ctxErr := ctx.Err(); ctxErr != nil {
		logging.Warningf(ctx, "%s while fetching %s", ctxErr, q)
		return nil, ctxErr
	}
	if err != nil {
		return nil, errors.Annotate(err, "failed to fetch Runs").Tag(transient.Tag).Err()
	}
	if len(keys) == int(limit) {
		logging.Errorf(ctx, "FIXME: %s fetched exactly the limit of Runs; reported data is incomplete", q)
	}
	return keys, nil
}
// iterRuns calls clbk function on each Run represented by its key.
//
// Loads Runs in batches of at most bufSize to avoid excessive RAM consumption;
// clbk is invoked sequentially, in key order, after each batch is loaded.
// A failure to load a batch stops the iteration and is returned annotated and
// tagged as transient.
//
// bufSize must be positive: with a zero bufSize each batch is empty, so keys
// would never be consumed.
func iterRuns(ctx context.Context, keys []*datastore.Key, bufSize int, clbk func(r *run.Run)) error {
	// A single buffer is reused for all batches.
	runs := make([]*run.Run, 0, min(bufSize, len(keys)))
	for len(keys) > 0 {
		n := min(len(keys), cap(runs))
		runs = runs[:0]
		for _, key := range keys[:n] {
			runs = append(runs, &run.Run{ID: common.RunID(key.StringID())})
		}
		keys = keys[n:]

		if err := datastore.Get(ctx, runs); err != nil {
			return errors.Annotate(err, "failed to load Runs").Tag(transient.Tag).Err()
		}
		for _, loaded := range runs {
			clbk(loaded)
		}
	}
	return nil
}
// min returns the smaller of the two ints.
func min(i, j int) int {
	if j <= i {
		return j
	}
	return i
}