blob: d5c02f8fac15c2d703be7a167675654442a006f4 [file]
// Copyright 2018 The LUCI Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package frontend
import (
"context"
"fmt"
"sync"
"time"
swarming "go.chromium.org/luci/common/api/swarming/swarming/v1"
"go.chromium.org/luci/common/data/strpair"
"go.chromium.org/luci/common/errors"
"go.chromium.org/luci/common/logging"
"go.chromium.org/luci/common/retry"
"go.chromium.org/luci/common/sync/parallel"
"go.chromium.org/luci/grpc/grpcutil"
fleet "infra/appengine/crosskylabadmin/api/fleet/v1"
"infra/appengine/crosskylabadmin/app/clients"
"infra/appengine/crosskylabadmin/app/config"
swarming_utils "infra/appengine/crosskylabadmin/app/frontend/internal/swarming"
"infra/cros/lab_inventory/utilization"
)
// SwarmingFactory is a constructor for a SwarmingClient.
//
// It is called with a context and a Swarming host name and returns either a
// clients.SwarmingClient or an error.
type SwarmingFactory func(c context.Context, host string) (clients.SwarmingClient, error)
// TrackerServerImpl implements the fleet.TrackerServer interface.
//
// The zero value is usable: with a nil SwarmingFactory, Swarming clients are
// created with clients.NewSwarmingClient.
type TrackerServerImpl struct {
// SwarmingFactory is an optional factory function for creating clients.
//
// If SwarmingFactory is nil, clients.NewSwarmingClient is used.
SwarmingFactory SwarmingFactory
}
// newSwarmingClient returns a Swarming client for host. The client is built by
// tsi.SwarmingFactory when one is configured, and by clients.NewSwarmingClient
// otherwise.
func (tsi *TrackerServerImpl) newSwarmingClient(c context.Context, host string) (clients.SwarmingClient, error) {
	factory := tsi.SwarmingFactory
	if factory == nil {
		return clients.NewSwarmingClient(c, host)
	}
	return factory(c, host)
}
// PushBotsForAdminTasks implements the fleet.Tracker.pushBotsForAdminTasks() method.
//
// It lists the alive idle bots of the configured Swarming pool whose dut_state
// dimension matches the requested target state, selects the bots accepted by
// identifyBotsForRepair and hands their ids to clients.PushRepairDUTs.
//
// An error is returned when the Swarming client cannot be created, when the
// requested DUT state has no Swarming equivalent, when listing bots fails or
// when pushing fails. Every returned error goes through
// grpcutil.GRPCifyAndLogErr.
func (tsi *TrackerServerImpl) PushBotsForAdminTasks(ctx context.Context, req *fleet.PushBotsForAdminTasksRequest) (res *fleet.PushBotsForAdminTasksResponse, err error) {
	defer func() {
		err = grpcutil.GRPCifyAndLogErr(ctx, err)
	}()

	cfg := config.Get(ctx)
	sc, err := tsi.newSwarmingClient(ctx, cfg.Swarming.Host)
	if err != nil {
		return nil, errors.Annotate(err, "failed to obtain Swarming client").Err()
	}

	dutState, ok := clients.DutStateRevMap[req.GetTargetDutState()]
	if !ok {
		return nil, fmt.Errorf("DutState=%#v does not map to swarming value", req.GetTargetDutState())
	}

	// Schedule admin tasks to idle DUTs.
	dims := make(strpair.Map)
	dims[clients.DutStateDimensionKey] = []string{dutState}
	bots, err := sc.ListAliveIdleBotsInPool(ctx, cfg.Swarming.BotPool, dims)
	if err != nil {
		// Pass the format and its argument directly: a pre-formatted message
		// must not be used as a format string, since any '%' it contains would
		// be interpreted as a verb.
		return nil, errors.Annotate(err, "failed to list alive idle cros bots with dut_state %q", dutState).Err()
	}
	logging.Infof(ctx, "successfully get %d alive idle cros bots with dut_state %q.", len(bots), dutState)

	// Parse BOT id to schedule tasks for readability.
	repairBOTs := identifyBotsForRepair(ctx, bots)
	err = clients.PushRepairDUTs(ctx, repairBOTs, dutState)
	if err != nil {
		// The underlying cause is only logged; the caller gets a generic error.
		logging.Infof(ctx, "push repair bots: %v", err)
		return nil, errors.New("failed to push repair duts")
	}
	return &fleet.PushBotsForAdminTasksResponse{}, nil
}
// PushBotsForAdminAuditTasks implements the fleet.Tracker.pushBotsForAdminAuditTasks() method.
//
// The requested audit task selects the action to run, the task name and the
// set of DUT states eligible for the audit. The alive bots of the configured
// Swarming pool are listed (with retries), filtered by identifyBotsForAudit
// and handed to clients.PushAuditDUTs.
//
// An error is returned when the request carries no known audit task, when the
// Swarming client cannot be created, when listing bots fails or when pushing
// fails. Every returned error goes through grpcutil.GRPCifyAndLogErr.
func (tsi *TrackerServerImpl) PushBotsForAdminAuditTasks(ctx context.Context, req *fleet.PushBotsForAdminAuditTasksRequest) (res *fleet.PushBotsForAdminAuditTasksResponse, err error) {
	defer func() {
		err = grpcutil.GRPCifyAndLogErr(ctx, err)
	}()

	// Default eligibility per DUT state; individual tasks narrow it below.
	dutStates := map[fleet.DutState]bool{
		fleet.DutState_Ready:             true,
		fleet.DutState_NeedsRepair:       true,
		fleet.DutState_NeedsReset:        true,
		fleet.DutState_RepairFailed:      true,
		fleet.DutState_NeedsManualRepair: true,
		fleet.DutState_NeedsReplacement:  false,
		fleet.DutState_NeedsDeploy:       false,
	}

	var actions []string
	var taskname string
	switch req.Task {
	case fleet.AuditTask_ServoUSBKey:
		actions = []string{"verify-servo-usb-drive"}
		taskname = "USB-drive"
	case fleet.AuditTask_DUTStorage:
		actions = []string{"verify-dut-storage"}
		taskname = "Storage"
		dutStates[fleet.DutState_RepairFailed] = false
		dutStates[fleet.DutState_NeedsManualRepair] = false
	case fleet.AuditTask_RPMConfig:
		actions = []string{"verify-rpm-config"}
		taskname = "RPM Config"
		dutStates[fleet.DutState_RepairFailed] = false
		dutStates[fleet.DutState_NeedsManualRepair] = false
	}
	if len(actions) == 0 {
		// No case above matched, so the requested task is not supported.
		logging.Infof(ctx, "No action specified for task: %s", req.Task)
		return nil, errors.New("failed to push audit bots")
	}

	cfg := config.Get(ctx)
	sc, err := tsi.newSwarmingClient(ctx, cfg.Swarming.Host)
	if err != nil {
		return nil, errors.Annotate(err, "failed to obtain Swarming client").Err()
	}

	// Schedule audit tasks to DUTs whose state is enabled in dutStates
	// (by default ready|needs_repair|needs_reset|repair_failed|needs_manual_repair).
	var bots []*swarming.SwarmingRpcsBotInfo
	f := func() (err error) {
		// Empty dimensions: no additional dimension filter is passed.
		dims := make(strpair.Map)
		bots, err = sc.ListAliveBotsInPool(ctx, cfg.Swarming.BotPool, dims)
		return err
	}
	err = retry.Retry(ctx, simple3TimesRetry(), f, retry.LogCallback(ctx, "Try get list of the BOTs"))
	if err != nil {
		return nil, errors.Annotate(err, "failed to list alive cros bots").Err()
	}
	logging.Infof(ctx, "successfully get %d alive cros bots", len(bots))

	botIDs := identifyBotsForAudit(ctx, bots, dutStates, req.Task)
	err = clients.PushAuditDUTs(ctx, botIDs, actions, taskname)
	if err != nil {
		// The underlying cause is only logged; the caller gets a generic error.
		logging.Infof(ctx, "failed push audit bots: %v", err)
		return nil, errors.New("failed to push audit bots")
	}
	return &fleet.PushBotsForAdminAuditTasksResponse{}, nil
}
// PushRepairJobsForLabstations implements the fleet.Tracker.pushLabstationsForRepair() method.
//
// It lists the alive idle bots of the configured Swarming pool that carry the
// labstation OS dimension, resolves their bot ids with
// identifyLabstationsForRepair and hands them to clients.PushRepairLabstations.
// Every returned error goes through grpcutil.GRPCifyAndLogErr.
func (tsi *TrackerServerImpl) PushRepairJobsForLabstations(ctx context.Context, req *fleet.PushRepairJobsForLabstationsRequest) (res *fleet.PushRepairJobsForLabstationsResponse, err error) {
	defer func() {
		err = grpcutil.GRPCifyAndLogErr(ctx, err)
	}()

	cfg := config.Get(ctx)
	client, clientErr := tsi.newSwarmingClient(ctx, cfg.Swarming.Host)
	if clientErr != nil {
		return nil, errors.Annotate(clientErr, "failed to obtain Swarming client").Err()
	}

	// Repair jobs go to idle labstations only: this is a periodic check and
	// reboot meant to keep labstations in a good state.
	labstationDims := strpair.Map{
		clients.DutOSDimensionKey: []string{"OS_TYPE_LABSTATION"},
	}
	idleBots, listErr := client.ListAliveIdleBotsInPool(ctx, cfg.Swarming.BotPool, labstationDims)
	if listErr != nil {
		return nil, errors.Annotate(listErr, "failed to list alive idle labstation bots").Err()
	}
	logging.Infof(ctx, "successfully get %d alive idle labstation bots.", len(idleBots))

	// Tasks are scheduled by BOT id, so resolve the ids first.
	labstationIDs := identifyLabstationsForRepair(ctx, idleBots)
	if pushErr := clients.PushRepairLabstations(ctx, labstationIDs); pushErr != nil {
		logging.Infof(ctx, "push repair labstations: %v", pushErr)
		return nil, errors.New("failed to push repair labstations")
	}
	return &fleet.PushRepairJobsForLabstationsResponse{}, nil
}
// ReportBots reports metrics of swarming bots.
//
// It lists the alive bots of the configured Swarming pool and passes the
// de-duplicated list to utilization.ReportMetrics. If the bots cannot be
// listed, no metrics are reported and the error is returned, so that a
// Swarming failure is not reported as an empty fleet. Every returned error
// goes through grpcutil.GRPCifyAndLogErr.
func (tsi *TrackerServerImpl) ReportBots(ctx context.Context, req *fleet.ReportBotsRequest) (res *fleet.ReportBotsResponse, err error) {
	defer func() {
		err = grpcutil.GRPCifyAndLogErr(ctx, err)
	}()

	cfg := config.Get(ctx)
	sc, err := tsi.newSwarmingClient(ctx, cfg.Swarming.Host)
	if err != nil {
		return nil, errors.Annotate(err, "failed to obtain Swarming client").Err()
	}

	bots, err := sc.ListAliveBotsInPool(ctx, cfg.Swarming.BotPool, strpair.Map{})
	if err != nil {
		return nil, errors.Annotate(err, "failed to list alive cros bots").Err()
	}
	utilization.ReportMetrics(ctx, flattenAndDedpulicateBots([][]*swarming.SwarmingRpcsBotInfo{bots}))
	return &fleet.ReportBotsResponse{}, nil
}
// getBotsFromSwarming lists bots by calling the Swarming service.
//
// With no selectors, all alive bots in pool are returned. Otherwise each
// selector is resolved through getFilteredBotsFromSwarming in a
// parallel.WorkPool limited to clients.MaxConcurrentSwarmingCalls, and the
// results are merged and de-duplicated.
//
// Note that when the work pool reports an error, the bots collected by the
// selectors that succeeded are still returned alongside that error.
func getBotsFromSwarming(ctx context.Context, sc clients.SwarmingClient, pool string, sels []*fleet.BotSelector) ([]*swarming.SwarmingRpcsBotInfo, error) {
	// No filters implies get all bots.
	if len(sels) == 0 {
		all, err := sc.ListAliveBotsInPool(ctx, pool, strpair.Map{})
		if err != nil {
			return nil, errors.Annotate(err, "failed to get bots in pool %s", pool).Err()
		}
		return all, nil
	}

	var (
		// mu guards perSelector, which is appended to from the work items.
		mu          sync.Mutex
		perSelector = make([][]*swarming.SwarmingRpcsBotInfo, 0, len(sels))
	)

	// lookup builds the work item for one selector. The selector is taken as
	// an argument so that every work item owns its own copy.
	lookup := func(sel *fleet.BotSelector) func() error {
		return func() error {
			found, err := getFilteredBotsFromSwarming(ctx, sc, pool, sel)
			if err != nil {
				return err
			}
			mu.Lock()
			perSelector = append(perSelector, found)
			mu.Unlock()
			return nil
		}
	}

	err := parallel.WorkPool(clients.MaxConcurrentSwarmingCalls, func(workC chan<- func() error) {
		for _, sel := range sels {
			workC <- lookup(sel)
		}
	})
	return flattenAndDedpulicateBots(perSelector), err
}
// getFilteredBotsFromSwarming lists bots for a single selector by calling the
// Swarming service.
//
// The selector's DUT id, model, pools and DUT name are translated into
// Swarming dimensions; unset fields are left out. A selector that yields no
// dimension at all is rejected with an error.
//
// This function is intended to be used in a parallel.WorkPool().
func getFilteredBotsFromSwarming(ctx context.Context, sc clients.SwarmingClient, pool string, sel *fleet.BotSelector) ([]*swarming.SwarmingRpcsBotInfo, error) {
	filter := make(strpair.Map)
	// addSingle records a single-valued dimension, ignoring empty values.
	addSingle := func(key, value string) {
		if value != "" {
			filter[key] = []string{value}
		}
	}

	wanted := sel.GetDimensions()
	addSingle(clients.DutIDDimensionKey, sel.GetDutId())
	addSingle(clients.DutModelDimensionKey, wanted.GetModel())
	if pools := wanted.GetPools(); len(pools) > 0 {
		filter[clients.DutPoolDimensionKey] = pools
	}
	addSingle(clients.DutNameDimensionKey, wanted.GetDutName())

	if len(filter) == 0 {
		return nil, fmt.Errorf("empty selector %v", sel)
	}

	matched, err := sc.ListAliveBotsInPool(ctx, pool, filter)
	if err != nil {
		return nil, errors.Annotate(err, "failed to get bots in pool %s with dimensions %s", pool, filter).Err()
	}
	return matched, nil
}
// flattenAndDedpulicateBots merges the given lists of bots into a single list
// holding one entry per BotId. When a BotId occurs more than once, the last
// occurrence wins. The order of the returned bots is unspecified because it
// follows map iteration order.
func flattenAndDedpulicateBots(nb [][]*swarming.SwarmingRpcsBotInfo) []*swarming.SwarmingRpcsBotInfo {
	byID := make(map[string]*swarming.SwarmingRpcsBotInfo)
	for i := range nb {
		for _, bot := range nb[i] {
			byID[bot.BotId] = bot
		}
	}

	unique := make([]*swarming.SwarmingRpcsBotInfo, 0, len(byID))
	for id := range byID {
		unique = append(unique, byID[id])
	}
	return unique
}
// dutStatesForRepairTask is the set of DUT states for which
// identifyBotsForRepair selects a bot for a repair task. States missing from
// the map read as false and are therefore not selected.
var dutStatesForRepairTask = map[fleet.DutState]bool{
fleet.DutState_NeedsRepair: true,
fleet.DutState_RepairFailed: true,
fleet.DutState_NeedsManualRepair: true,
}
// identifyBotsForRepair identifies duts that need run admin repair.
//
// Bots whose OS dimension cannot be read, and labstation bots, are skipped
// silently. Bots without a readable BOT id dimension are skipped with a
// warning. Of the remaining bots, those whose state is listed in
// dutStatesForRepairTask have their BOT id returned.
func identifyBotsForRepair(ctx context.Context, bots []*swarming.SwarmingRpcsBotInfo) (repairBOTs []string) {
	repairBOTs = make([]string, 0, len(bots))
	for _, bot := range bots {
		dimensions := swarming_utils.DimensionsMap(bot.Dimensions)

		osType, osErr := swarming_utils.ExtractSingleValuedDimension(dimensions, clients.DutOSDimensionKey)
		if osErr != nil {
			continue
		}
		if osType == "OS_TYPE_LABSTATION" {
			continue
		}

		botID, idErr := swarming_utils.ExtractSingleValuedDimension(dimensions, clients.BotIDDimensionKey)
		if idErr != nil {
			logging.Warningf(ctx, "failed to obtain BOT id for bot %q", bot.BotId)
			continue
		}

		if !dutStatesForRepairTask[clients.GetStateDimension(bot.Dimensions)] {
			continue
		}
		logging.Infof(ctx, "BOT: %s - Needs repair", botID)
		repairBOTs = append(repairBOTs, botID)
	}
	return repairBOTs
}
// identifyBotsForAudit identifies duts to run admin audit.
//
// Labstation bots and bots whose OS or BOT id dimension cannot be read are
// dropped. Depending on auditTask, bots are also dropped when the relevant
// component is already marked NEED_REPLACEMENT (servo USB key, storage) or
// when the RPM state is anything other than UNKNOWN. Finally only bots whose
// DUT state is enabled in dutStateMap are kept. The BOT ids of the kept bots
// are returned.
func identifyBotsForAudit(ctx context.Context, bots []*swarming.SwarmingRpcsBotInfo, dutStateMap map[fleet.DutState]bool, auditTask fleet.AuditTask) []string {
	logging.Infof(ctx, "Filtering bots for task: %s", auditTask)

	selected := make([]string, 0, len(bots))
	for _, bot := range bots {
		dimensions := swarming_utils.DimensionsMap(bot.Dimensions)

		osType, osErr := swarming_utils.ExtractSingleValuedDimension(dimensions, clients.DutOSDimensionKey)
		if osErr != nil {
			continue
		}
		if osType == "OS_TYPE_LABSTATION" {
			continue
		}

		botID, idErr := swarming_utils.ExtractSingleValuedDimension(dimensions, clients.BotIDDimensionKey)
		if idErr != nil {
			logging.Warningf(ctx, "failed to obtain BOT id for bot %q", bot.BotId)
			continue
		}

		// Task-specific exclusions based on the component state of the bot.
		switch auditTask {
		case fleet.AuditTask_ServoUSBKey:
			if usb := swarming_utils.ExtractBotState(bot).ServoUSBState; len(usb) > 0 && usb[0] == "NEED_REPLACEMENT" {
				logging.Infof(ctx, "Skipping BOT with id: %q as USB-key marked for replacement", bot.BotId)
				continue
			}
		case fleet.AuditTask_DUTStorage:
			if storage := swarming_utils.ExtractBotState(bot).StorageState; len(storage) > 0 && storage[0] == "NEED_REPLACEMENT" {
				logging.Infof(ctx, "Skipping BOT with id: %q as storage marked for replacement", bot.BotId)
				continue
			}
		case fleet.AuditTask_RPMConfig:
			// RPM is expected to be re-checked whenever the setup is updated,
			// so only bots with an UNKNOWN RPM state are audited.
			if rpm := swarming_utils.ExtractBotState(bot).RpmState; len(rpm) > 0 && rpm[0] != "UNKNOWN" {
				logging.Infof(ctx, "Skipping BOT with id: %q as RPM was already audited", bot.BotId)
				continue
			}
		}

		// A state missing from the map reads as false, i.e. not eligible.
		if !dutStateMap[clients.GetStateDimension(bot.Dimensions)] {
			logging.Infof(ctx, "Skipping BOT with id: %q", bot.BotId)
			continue
		}
		selected = append(selected, botID)
	}
	return selected
}
// identifyLabstationsForRepair identifies labstations that need repair.
//
// It returns the BOT id of every bot whose OS dimension is
// OS_TYPE_LABSTATION. Bots whose OS type or BOT id dimension cannot be read
// are skipped with a warning.
func identifyLabstationsForRepair(ctx context.Context, bots []*swarming.SwarmingRpcsBotInfo) []string {
	labstationIDs := make([]string, 0, len(bots))
	for _, bot := range bots {
		dimensions := swarming_utils.DimensionsMap(bot.Dimensions)

		osType, osErr := swarming_utils.ExtractSingleValuedDimension(dimensions, clients.DutOSDimensionKey)
		switch {
		case osErr != nil:
			logging.Warningf(ctx, "failed to obtain os type for bot %q", bot.BotId)
			continue
		case osType != "OS_TYPE_LABSTATION":
			continue
		}

		botID, idErr := swarming_utils.ExtractSingleValuedDimension(dimensions, clients.BotIDDimensionKey)
		if idErr != nil {
			logging.Warningf(ctx, "failed to obtain BOT id for bot %q", bot.BotId)
			continue
		}
		labstationIDs = append(labstationIDs, botID)
	}
	return labstationIDs
}
// simple3TimesRetryIterator is the retry configuration behind
// simple3TimesRetry: a retry.ExponentialBackoff with Delay set to 200ms and
// Retries set to 3.
var simple3TimesRetryIterator = retry.ExponentialBackoff{
Limited: retry.Limited{
Delay: 200 * time.Millisecond,
Retries: 3,
},
}
// simple3TimesRetry returns a retry.Factory based on simple3TimesRetryIterator.
//
// Retry iterators are stateful (each Next call consumes part of the retry
// budget and grows the delay), so the factory hands out a fresh copy of
// simple3TimesRetryIterator on every call instead of a pointer to the shared
// package-level value. Sharing it would let one request exhaust the retries
// of all later requests and would race under concurrent use.
func simple3TimesRetry() retry.Factory {
	return func() retry.Iterator {
		it := simple3TimesRetryIterator
		return &it
	}
}