blob: e1f12a2153031f361b4b6ed4f7994e8f109a9a52 [file] [log] [blame]
// Copyright 2021 The Chromium Authors
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
package dumper
import (
"context"
"fmt"
"go.chromium.org/luci/common/errors"
"go.chromium.org/luci/common/logging"
"go.chromium.org/luci/common/tsmon"
"go.chromium.org/luci/common/tsmon/field"
"go.chromium.org/luci/common/tsmon/metric"
"go.chromium.org/luci/common/tsmon/monitor"
"go.chromium.org/luci/common/tsmon/store"
"go.chromium.org/luci/common/tsmon/target"
"go.chromium.org/luci/server"
tsmonsrv "go.chromium.org/luci/server/tsmon"
"infra/cros/dutstate"
invV1 "infra/libs/skylab/inventory"
ufspb "infra/unifiedfleet/api/v1/models"
"infra/unifiedfleet/app/config"
"infra/unifiedfleet/app/model/inventory"
"infra/unifiedfleet/app/model/registration"
"infra/unifiedfleet/app/util"
)
// inventoryCounter maps a bucket (one combination of board, model, pool,
// environment, zone, swarming instance and status) to the number of
// standalone devices or scheduling units counted in that bucket.
type inventoryCounter map[bucket]int
// suMetric is the tsmon integer metric reporting how many scheduling units
// fall into each bucket.
//
// The fields are declared in the same order in which the bucket values are
// passed to suMetric.Set: board, model, pool, environment, zone,
// swarming_instance, status. Keep both in sync when adding a field.
var suMetric = metric.NewInt(
"chromeos/skylab/inventory/scheduling_unit_count",
"The number of scheduling units in a given bucket",
nil,
field.String("board"),
field.String("model"),
field.String("pool"),
field.String("environment"),
field.String("zone"),
field.String("swarming_instance"),
field.String("status"),
)
// utilizationExportNamespaces lists the namespaces whose MachineLSEs are
// reported: only the ChromeOS and browser namespaces.
var utilizationExportNamespaces = []string{util.OSNamespace, util.BrowserNamespace}
// suMetricState is the dedicated tsmon state that stores and flushes
// suMetric. It is created by initializeUFSInventoryTsmonState and stays nil
// until that function has been called.
var suMetricState *tsmon.State
// initializeUFSInventoryTsmonState sets up suMetricState, the tsmon.State
// that tracks suMetric.
//
// The state gets an in-memory store targeting this server's tsmon task and a
// monitor chosen from the server options:
//   - non-prod servers use a debug monitor;
//   - prod servers with a tsmon account use the prod monitor for that account;
//   - prod servers without a tsmon account use a nil monitor.
//
// An error is returned only when the prod monitor cannot be created; in that
// case suMetricState is left with its store set but without a monitor.
func initializeUFSInventoryTsmonState(srv *server.Server) error {
	taskTarget := &target.Task{
		DataCenter:  "appengine",
		ServiceName: srv.Options.TsMonServiceName,
		JobName:     srv.Options.TsMonJobName,
		HostName:    srv.Options.Hostname,
	}

	suMetricState = tsmon.NewState()
	suMetricState.SetStore(store.NewInMemory(taskTarget))
	suMetricState.InhibitGlobalCallbacksOnFlush()

	var mon monitor.Monitor
	if !srv.Options.Prod {
		mon = monitor.NewDebugMonitor("")
	} else if srv.Options.TsMonAccount == "" {
		mon = monitor.NewNilMonitor()
	} else {
		prodMon, err := tsmonsrv.NewProdXMonitor(srv.Context, 4096, srv.Options.TsMonAccount)
		if err != nil {
			return err
		}
		mon = prodMon
	}

	suMetricState.SetMonitor(mon)
	return nil
}
// reportUFSInventoryCronHandler runs the UFS inventory metric collection for
// each namespace in `utilizationExportNamespaces`.
//
// A failure in one namespace does not stop the others: every error is
// collected and the combined result of errors.Append is returned.
func reportUFSInventoryCronHandler(ctx context.Context) (err error) {
	var errs []error
	for _, ns := range utilizationExportNamespaces {
		// Derive a fresh context per namespace from the untouched base ctx.
		// Overwriting ctx here would (a) stack namespaces on top of each
		// other and (b) leave later iterations with whatever context was
		// returned alongside an error, which is not guaranteed to be usable.
		nsCtx, nsErr := util.SetupDatastoreNamespace(ctx, util.ClientToDatastoreNamespace[ns])
		if nsErr != nil {
			errs = append(errs, nsErr)
			continue
		}
		if reportErr := reportUFSInventoryForNamespace(nsCtx, ns); reportErr != nil {
			errs = append(errs, reportErr)
		}
	}
	return errors.Append(errs...)
}
// reportUFSInventoryForNamespace counts the inventory of one namespace per
// bucket and pushes the counts to tsmon through suMetric.
//
// ctx must already be set up for the datastore namespace that corresponds to
// ns. For the OS namespace, scheduling units are counted as a single entity
// and the MachineLSEs that belong to a successfully bucketed scheduling unit
// are not counted again individually. Entities that cannot be bucketed are
// logged as warnings and skipped.
//
// An error is returned when listing the entities fails or when flushing the
// metric fails. The metric is reset when the function returns so that buckets
// that no longer exist are not reported again on the next run.
func reportUFSInventoryForNamespace(ctx context.Context, ns string) (err error) {
	logging.Infof(ctx, "Reporting UFS inventory DUT metrics for namespace %s", ns)
	env := config.Get(ctx).SelfStorageBucket

	// Get all the MachineLSEs, indexed by name.
	lses, err := getAllMachineLSEs(ctx, false)
	if err != nil {
		return err
	}
	idTolseMap := make(map[string]*ufspb.MachineLSE, len(lses))
	for _, lse := range lses {
		idTolseMap[lse.GetName()] = lse
	}

	// Get all Machines, indexed by name.
	machines, err := getAllMachines(ctx, false)
	if err != nil {
		return err
	}
	idTomachineMap := make(map[string]*ufspb.Machine, len(machines))
	for _, machine := range machines {
		idTomachineMap[machine.GetName()] = machine
	}
	logging.Infof(ctx, "Found %d LSEs and %d machines", len(lses), len(machines))

	// Scheduling Units are OS namespace only. lseInSUnitMap stays nil for
	// other namespaces; it is only read when ns is the OS namespace.
	var lseInSUnitMap map[string]bool
	c := make(inventoryCounter)
	if ns == util.OSNamespace {
		sUnits, err := getAllSchedulingUnits(ctx, false)
		if err != nil {
			return err
		}
		// Set of MachineLSEs associated with a SchedulingUnit for easy search.
		lseInSUnitMap = make(map[string]bool)
		for _, su := range sUnits {
			suLseIDs := su.GetMachineLSEs()
			if len(suLseIDs) == 0 {
				continue
			}
			// An ID that is missing from idTolseMap yields a nil entry here;
			// the bucket computation then fails for this unit and it is
			// skipped with a warning below.
			suLses := make([]*ufspb.MachineLSE, len(suLseIDs))
			for i, lseID := range suLseIDs {
				suLses[i] = idTolseMap[lseID]
			}
			b, err := getBucketForSchedulingUnit(su, suLses, idTomachineMap, env)
			if err != nil {
				// Never use the error text as the format string: entity
				// names may contain '%'.
				logging.Warningf(ctx, "%s", err)
				continue
			}
			c[*b]++
			for _, lseName := range suLseIDs {
				lseInSUnitMap[lseName] = true
			}
		}
	}

	// Count every MachineLSE that was not already covered by a scheduling unit.
	for _, lse := range lses {
		if ns == util.OSNamespace && lseInSUnitMap[lse.GetName()] {
			continue
		}
		machine, err := getMachineForLse(lse, idTomachineMap)
		if err != nil {
			logging.Warningf(ctx, "%s", err)
			continue
		}
		b := getBucketForDevice(lse, machine, env, ns)
		c[*b]++
	}
	// len(c) is the number of distinct buckets, not the number of devices.
	logging.Infof(ctx, "report UFS inventory metrics for %d buckets", len(c))

	// Add metric state to context
	mctx := tsmon.WithState(ctx, suMetricState)
	// Reset the metric to stop reporting no-longer-existing devices and states.
	defer suMetricState.Store().Reset(mctx, suMetric)

	// Report the metrics and flush
	for b, count := range c {
		logging.Infof(ctx, "bucket: %s, number: %d", b.String(), count)
		suMetric.Set(mctx, int64(count), b.board, b.model, b.pool, b.environment, b.zone, b.swarmingInstance, b.status)
	}
	if err := suMetricState.ParallelFlush(ctx, nil, 32); err != nil {
		return errors.Annotate(err, "failed to flush values to monitoring").Err()
	}
	return nil
}
// getMachineForLse looks up the Machine attached to the given MachineLSE.
//
// It succeeds only if the MachineLSE references exactly one Machine and that
// Machine is present in idTomachineMap; otherwise an error describing the
// mismatch is returned.
func getMachineForLse(lse *ufspb.MachineLSE, idTomachineMap map[string]*ufspb.Machine) (*ufspb.Machine, error) {
	machineIDs := lse.GetMachines()
	if len(machineIDs) != 1 {
		return nil, errors.Reason("report ufs inventory cron handler: %d machines %v associated with %q", len(machineIDs), machineIDs, lse.GetName()).Err()
	}

	id := machineIDs[0]
	if found, ok := idTomachineMap[id]; ok {
		return found, nil
	}
	return nil, errors.Reason("report ufs inventory cron handler: machine %s not found for LSE %s", id, lse.GetName()).Err()
}
// getBucketForDevice builds the *bucket under which a standalone MachineLSE
// and its Machine are counted.
//
// Board and model come from the Machine, zone and status from the MachineLSE.
// The pool is derived from the DUT or labstation pools when the LSE carries
// one of them and is "[None]" otherwise. The swarming instance is
// "chromeos-swarming" for the OS namespace, the LSE's ownership swarming
// instance for the browser namespace, and "[None]" for anything else.
func getBucketForDevice(lse *ufspb.MachineLSE, machine *ufspb.Machine, env string, ns string) *bucket {
	chromeosMachine := machine.GetChromeosMachine()
	deviceLse := lse.GetChromeosMachineLse().GetDeviceLse()

	pool := "[None]"
	if dut := deviceLse.GetDut(); dut != nil {
		pool = getReportPool(dut.GetPools())
	}
	if labstation := deviceLse.GetLabstation(); labstation != nil {
		pool = getReportPool(labstation.GetPools())
	}

	swarmingInstance := "[None]"
	if ns == util.OSNamespace {
		swarmingInstance = "chromeos-swarming"
	} else if ns == util.BrowserNamespace {
		swarmingInstance = lse.GetOwnership().GetSwarmingInstance()
	}

	return &bucket{
		board:            chromeosMachine.GetBuildTarget(),
		model:            chromeosMachine.GetModel(),
		pool:             pool,
		environment:      env,
		zone:             lse.GetZone(),
		swarmingInstance: swarmingInstance,
		status:           dutstate.ConvertFromUFSState(lse.GetResourceState()).String(),
	}
}
// machineFieldToValueFunc extracts a single string label from a Machine. It
// is the callback type accepted by schedulingUnitLabelForLses when computing
// an aggregated label for a scheduling unit.
type machineFieldToValueFunc func(machine *ufspb.Machine) string
// Label extractors passed to schedulingUnitLabelForLses.
var (
// machineBoardValueFunc returns the build target (board) of a ChromeOS machine.
machineBoardValueFunc = func(machine *ufspb.Machine) string { return machine.GetChromeosMachine().GetBuildTarget() }
// machineModelValueFunc returns the model of a ChromeOS machine.
machineModelValueFunc = func(machine *ufspb.Machine) string { return machine.GetChromeosMachine().GetModel() }
)
// getBucketForSchedulingUnit builds the *bucket under which a SchedulingUnit
// is counted.
//
// The pool comes from the scheduling unit itself, the status is derived from
// all of its MachineLSEs, and the swarming instance is always
// "chromeos-swarming". The remaining dimensions depend on the ExposeType:
//   - DEFAULT: board and model are aggregated over all DUTs; zone stays
//     "[None]".
//   - DEFAULT_PLUS_PRIMARY: board and model are aggregated over all DUTs;
//     zone is taken from the primary DUT.
//   - STRICTLY_PRIMARY_ONLY: board, model and zone are all taken from the
//     primary DUT.
//
// An error is returned for an unknown ExposeType, when a required Machine
// cannot be resolved, or when the primary DUT is needed but is not among lses.
func getBucketForSchedulingUnit(su *ufspb.SchedulingUnit, lses []*ufspb.MachineLSE, idTomachineMap map[string]*ufspb.Machine, env string) (*bucket, error) {
	result := &bucket{
		board:            "[None]",
		model:            "[None]",
		pool:             getReportPool(su.GetPools()),
		environment:      env,
		zone:             "[None]",
		swarmingInstance: "chromeos-swarming",
		status:           schedulingUnitStatusFromLses(lses),
	}
	exposeType := su.GetExposeType()

	// Dimensions aggregated over every DUT of the unit.
	switch exposeType {
	case ufspb.SchedulingUnit_DEFAULT, ufspb.SchedulingUnit_DEFAULT_PLUS_PRIMARY:
		aggBoard, err := schedulingUnitLabelForLses(lses, idTomachineMap, machineBoardValueFunc)
		if err != nil {
			return nil, err
		}
		aggModel, err := schedulingUnitLabelForLses(lses, idTomachineMap, machineModelValueFunc)
		if err != nil {
			return nil, err
		}
		result.board, result.model = aggBoard, aggModel
	case ufspb.SchedulingUnit_STRICTLY_PRIMARY_ONLY:
		// Nothing is aggregated; every dimension comes from the primary DUT.
	default:
		return nil, errors.Reason("Unknown SchedulingUnit Expose Type for %s", su.GetName()).Err()
	}

	// DEFAULT takes nothing from the primary DUT, so the bucket is complete.
	if exposeType == ufspb.SchedulingUnit_DEFAULT {
		return result, nil
	}

	// Both remaining expose types need the primary DUT.
	primaryName := su.GetPrimaryDut()
	var primary *ufspb.MachineLSE
	for _, candidate := range lses {
		if candidate.GetName() == primaryName {
			primary = candidate
			break
		}
	}
	if primary == nil {
		return nil, errors.Reason("Could not find primary MachineLSE %s for scheduling unit %s", su.GetPrimaryDut(), su.GetName()).Err()
	}

	if exposeType == ufspb.SchedulingUnit_STRICTLY_PRIMARY_ONLY {
		primaryMachine, err := getMachineForLse(primary, idTomachineMap)
		if err != nil {
			return nil, err
		}
		result.board = primaryMachine.GetChromeosMachine().GetBuildTarget()
		result.model = primaryMachine.GetChromeosMachine().GetModel()
	}
	result.zone = primary.GetZone()
	return result, nil
}
// schedulingUnitLabelForLses computes one overall label for a scheduling unit
// from the Machines behind the given MachineLSEs.
//
// f extracts the label from each Machine; empty labels are ignored. The
// distinct labels are then condensed by summarizeValues: no label yields
// "[None]", a single distinct label is returned as is, and several distinct
// labels yield "[Multiple]". An error is returned if the Machine of any LSE
// cannot be resolved.
func schedulingUnitLabelForLses(lses []*ufspb.MachineLSE, idTomachineMap map[string]*ufspb.Machine, f machineFieldToValueFunc) (string, error) {
	// Resolve every Machine up front so that an unresolvable LSE fails the
	// whole computation before f is invoked.
	resolved := make([]*ufspb.Machine, 0, len(lses))
	for _, lse := range lses {
		m, err := getMachineForLse(lse, idTomachineMap)
		if err != nil {
			return "", err
		}
		resolved = append(resolved, m)
	}

	seen := make(map[string]struct{})
	var distinct []string
	for _, m := range resolved {
		label := f(m)
		if label == "" {
			continue
		}
		if _, dup := seen[label]; dup {
			continue
		}
		seen[label] = struct{}{}
		distinct = append(distinct, label)
	}
	return summarizeValues(distinct), nil
}
// schedulingUnitStatusFromLses derives the status that represents a whole
// scheduling unit: the resource state of every MachineLSE is converted to its
// DUT state string and the list is handed to util.SchedulingUnitDutState.
func schedulingUnitStatusFromLses(lses []*ufspb.MachineLSE) string {
	dutStates := make([]string, 0, len(lses))
	for _, lse := range lses {
		dutStates = append(dutStates, dutstate.ConvertFromUFSState(lse.GetResourceState()).String())
	}
	return util.SchedulingUnitDutState(dutStates)
}
// bucket is the set of dimensions by which devices and scheduling units are
// grouped for reporting.
//
// It is used as the key of inventoryCounter, and its fields are passed, in
// declaration order, as the field values of suMetric.
type bucket struct {
	// board is the build target, or a summary value for scheduling units.
	board string
	// model is the device model, or a summary value for scheduling units.
	model string
	// pool is the reported pool label.
	pool string
	// environment identifies the deployment the data was read from.
	environment string
	// zone is the zone of the device or of the primary DUT.
	zone string
	// swarmingInstance is the swarming instance serving the entity.
	swarmingInstance string
	// status is the DUT state string of the entity.
	status string
}

// String renders all dimensions of the bucket on one line for logging. The
// zone is quoted; every other dimension is printed verbatim.
func (b *bucket) String() string {
	const layout = "board: %s, model: %s, pool: %s, env: %s, zone: %q, swarmingInstance: %s, status: %s"
	return fmt.Sprintf(
		layout,
		b.board,
		b.model,
		b.pool,
		b.environment,
		b.zone,
		b.swarmingInstance,
		b.status,
	)
}
// summarizeValues condenses a list of values into a single label: "[None]"
// for an empty list, the value itself for a one-element list, and
// "[Multiple]" for anything longer. Values are not de-duplicated.
func summarizeValues(vs []string) string {
	if len(vs) == 0 {
		return "[None]"
	}
	if len(vs) > 1 {
		return "[Multiple]"
	}
	return vs[0]
}
// isManagedPool reports whether p is one of the names listed in the
// inventory v1 SchedulableLabels_DUTPool enum.
func isManagedPool(p string) bool {
	if _, known := invV1.SchedulableLabels_DUTPool_value[p]; known {
		return true
	}
	return false
}
// getReportPool returns the pool label to report for a list of pools. The
// list is first condensed with summarizeValues; if the result is a managed
// pool name it is prefixed with "managed:", otherwise it is returned as is.
func getReportPool(pools []string) string {
	summary := summarizeValues(pools)
	if !isManagedPool(summary) {
		return summary
	}
	return fmt.Sprintf("managed:%s", summary)
}
// getAllMachineLSEs lists every MachineLSE by following the page tokens
// returned by inventory.ListMachineLSEs until no next token is returned.
// keysOnly is forwarded to the listing call. Any listing error aborts the
// walk and is returned annotated.
func getAllMachineLSEs(ctx context.Context, keysOnly bool) ([]*ufspb.MachineLSE, error) {
	var all []*ufspb.MachineLSE
	pageToken := ""
	for {
		page, next, err := inventory.ListMachineLSEs(ctx, pageSize, pageToken, nil, keysOnly)
		if err != nil {
			return nil, errors.Annotate(err, "get all MachineLSEs").Err()
		}
		all = append(all, page...)
		if next == "" {
			return all, nil
		}
		pageToken = next
	}
}
// getAllMachines lists every Machine by following the page tokens returned by
// registration.ListMachines until no next token is returned. keysOnly is
// forwarded to the listing call. Any listing error aborts the walk and is
// returned annotated.
func getAllMachines(ctx context.Context, keysOnly bool) ([]*ufspb.Machine, error) {
	var all []*ufspb.Machine
	pageToken := ""
	for {
		page, next, err := registration.ListMachines(ctx, pageSize, pageToken, nil, keysOnly)
		if err != nil {
			return nil, errors.Annotate(err, "get all Machines").Err()
		}
		all = append(all, page...)
		if next == "" {
			return all, nil
		}
		pageToken = next
	}
}