blob: e1265212da3a3797295dd4329552791c0c2dc52b [file]
// Copyright 2018 The LUCI Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package inventory
import (
"context"
"sync"
fleet "infra/appengine/crosskylabadmin/api/fleet/v1"
"infra/appengine/crosskylabadmin/app/clients"
"infra/appengine/crosskylabadmin/app/config"
"infra/appengine/crosskylabadmin/app/frontend/internal/dutpool"
"infra/appengine/crosskylabadmin/app/frontend/internal/swarming"
"infra/libs/skylab/inventory"
"go.chromium.org/luci/common/data/strpair"
"go.chromium.org/luci/common/errors"
"go.chromium.org/luci/common/logging"
"go.chromium.org/luci/common/retry"
"go.chromium.org/luci/common/sync/parallel"
"go.chromium.org/luci/grpc/grpcutil"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
)
// BalancePools implements the method from fleet.InventoryServer interface.
//
// It validates req and then calls balancePoolsNoRetry under the
// transientErrorRetries() retry policy. Whatever error results is passed
// through grpcutil.GRPCifyAndLogErr before being returned.
func (is *ServerImpl) BalancePools(ctx context.Context, req *fleet.BalancePoolsRequest) (resp *fleet.BalancePoolsResponse, err error) {
	defer func() {
		err = grpcutil.GRPCifyAndLogErr(ctx, err)
	}()

	if err = req.Validate(); err != nil {
		// status.Error, not Errorf: the validation message is not a format
		// string and may itself contain '%' characters.
		return nil, status.Error(codes.InvalidArgument, err.Error())
	}

	err = retry.Retry(
		ctx,
		transientErrorRetries(),
		func() error {
			var ierr error
			resp, ierr = is.balancePoolsNoRetry(ctx, req)
			return ierr
		},
		retry.LogCallback(ctx, "BalancePools"),
	)
	return resp, err
}
// balancePoolsNoRetry makes a single, non-retried attempt at balancing pools.
//
// It fetches per-DUT health from Swarming and selects DUTs from the inventory
// with req.DutSelector. It then groups the DUTs by model and runs
// ensurePoolHealthyForModel for each model, with at most 10 models in
// flight at once. Unless the dry-run option is set, the union of all
// per-model changes is committed via commitBalancePoolChanges. The returned
// URL is recorded on the response and on every per-model result.
func (is *ServerImpl) balancePoolsNoRetry(ctx context.Context, req *fleet.BalancePoolsRequest) (*fleet.BalancePoolsResponse, error) {
	cfg := config.Get(ctx)
	sc, err := is.newSwarmingClient(ctx, cfg.Swarming.Host)
	if err != nil {
		return nil, err
	}
	botsHealth, err := getBotsHealth(ctx, sc)
	if err != nil {
		return nil, err
	}
	ic, err := is.newInventoryClient(ctx)
	if err != nil {
		return nil, err
	}
	duts, err := ic.selectDutsFromInventory(ctx, req.DutSelector)
	if err != nil {
		return nil, err
	}
	if len(duts) == 0 {
		// Technically correct: No DUTs were selected so both target and spare are
		// empty and healthy and no changes were required.
		logging.Infof(ctx, "no duts were found based on (%s)", req.DutSelector.String())
		return &fleet.BalancePoolsResponse{}, nil
	}

	mds := mapModelsToDUTs(duts)
	resp := &fleet.BalancePoolsResponse{
		ModelResult: make(map[string]*fleet.EnsurePoolHealthyResponse, len(mds)),
	}
	// Guards writes to resp.ModelResult from the concurrent workers below.
	mResp := &sync.Mutex{}
	err = parallel.WorkPool(10, func(workC chan<- func() error) {
		for m, ds := range mds {
			// Per-iteration copies for the closure. Before Go 1.22 the range
			// variables m and ds are shared across iterations, and the closure
			// may run after the loop has moved on. The closure must therefore
			// reference only these copies, never m or ds.
			im := m
			ids := ds
			workC <- func() error {
				logging.Infof(ctx, "balancing pool for model: %s", im)
				iResp, err2 := ensurePoolHealthyForModel(ctx, ids, botsHealth, req.TargetPool, req.SparePool, req.MaxUnhealthyDuts)
				if err2 != nil {
					return err2
				}
				mResp.Lock()
				defer mResp.Unlock()
				resp.ModelResult[im] = iResp
				return nil
			}
		}
	})
	if err != nil {
		return nil, err
	}

	changes := collectChanges(resp.ModelResult)
	if !req.GetOptions().GetDryrun() {
		u, err := ic.commitBalancePoolChanges(ctx, changes)
		if err != nil {
			return nil, err
		}
		resp.GeneratedChangeUrl = u
		// A single commit covers all models, so every per-model result points
		// at the same change.
		for _, res := range resp.ModelResult {
			res.Url = u
		}
	}
	return resp, nil
}
// ResizePool implements the method from fleet.InventoryServer interface.
//
// It validates req and then calls resizePoolNoRetry under the
// transientErrorRetries() retry policy. Whatever error results is passed
// through grpcutil.GRPCifyAndLogErr before being returned.
func (is *ServerImpl) ResizePool(ctx context.Context, req *fleet.ResizePoolRequest) (resp *fleet.ResizePoolResponse, err error) {
	defer func() {
		err = grpcutil.GRPCifyAndLogErr(ctx, err)
	}()

	if err = req.Validate(); err != nil {
		// status.Error, not Errorf: the validation message is not a format
		// string and may itself contain '%' characters.
		return nil, status.Error(codes.InvalidArgument, err.Error())
	}

	err = retry.Retry(
		ctx,
		transientErrorRetries(),
		func() error {
			var ierr error
			resp, ierr = is.resizePoolNoRetry(ctx, req)
			return ierr
		},
		retry.LogCallback(ctx, "resizePoolNoRetry"),
	)
	return resp, err
}
// resizePoolNoRetry makes a single, non-retried attempt at resizing a pool.
//
// It selects DUTs from the inventory with req.DutSelector and computes pool
// changes with dutpool.Resize for req.TargetPool, req.TargetPoolSize and
// req.SparePool. It commits those changes via commitBalancePoolChanges with
// no dry-run check. The response carries the commit URL and the list of
// changes.
func (is *ServerImpl) resizePoolNoRetry(ctx context.Context, req *fleet.ResizePoolRequest) (*fleet.ResizePoolResponse, error) {
	ic, err := is.newInventoryClient(ctx)
	if err != nil {
		// Previously this error was silently overwritten, leaving ic possibly
		// nil for the calls below.
		return nil, err
	}
	duts, err := ic.selectDutsFromInventory(ctx, req.DutSelector)
	if err != nil {
		return nil, err
	}
	changes, err := dutpool.Resize(duts, req.TargetPool, int(req.TargetPoolSize), req.SparePool)
	if err != nil {
		return nil, err
	}
	u, err := ic.commitBalancePoolChanges(ctx, changes)
	if err != nil {
		return nil, err
	}
	return &fleet.ResizePoolResponse{
		Url:     u,
		Changes: changes,
	}, nil
}
// dutMatchesSelector reports whether d satisfies sel. Each of sel's Id,
// Hostname and Model fields is compared against d only when it is non-empty.
// An empty field matches any DUT.
func dutMatchesSelector(d *inventory.DeviceUnderTest, sel *fleet.DutSelector) bool {
	common := d.GetCommon()
	switch {
	case sel.Id != "" && sel.Id != common.GetId():
		return false
	case sel.Hostname != "" && sel.Hostname != common.GetHostname():
		return false
	case sel.Model != "" && sel.Model != common.GetLabels().GetModel():
		return false
	default:
		return true
	}
}
// ensurePoolHealthyForModel builds a dutpool balancer over duts for the target
// and spare pools and annotates it with botsHealth. It then asks the balancer
// to make the target pool healthy, allowing up to maxUnhealthyDUTs unhealthy
// DUTs. The response reports the proposed changes, any failures, and the size
// and healthy count of both pools as computed after EnsureTargetHealthy ran.
func ensurePoolHealthyForModel(ctx context.Context, duts []*inventory.DeviceUnderTest, botsHealth map[string]fleet.Health, target, spare string, maxUnhealthyDUTs int32) (*fleet.EnsurePoolHealthyResponse, error) {
	balancer, err := dutpool.NewBalancer(duts, target, spare)
	if err != nil {
		return nil, errors.Annotate(err, "ensure pool healthy").Err()
	}
	balancer.FillInHealth(botsHealth)
	logging.Debugf(ctx, "initial state: %+v", balancer)

	changes, failures := balancer.EnsureTargetHealthy(int(maxUnhealthyDUTs))

	targetStatus := &fleet.PoolStatus{
		Size:         int32(len(balancer.Target)),
		HealthyCount: int32(balancer.TargetHealthyCount()),
	}
	spareStatus := &fleet.PoolStatus{
		Size:         int32(len(balancer.Spare)),
		HealthyCount: int32(balancer.SpareHealthyCount()),
	}
	result := &fleet.EnsurePoolHealthyResponse{
		Failures:         failures,
		TargetPoolStatus: targetStatus,
		SparePoolStatus:  spareStatus,
		Changes:          changes,
	}
	return result, nil
}
// collectChanges flattens the Changes of every per-model result in mrs into a
// single slice. The returned slice is never nil, even when there are no
// changes. Its order across models follows Go map iteration order and is
// therefore not deterministic.
func collectChanges(mrs map[string]*fleet.EnsurePoolHealthyResponse) []*fleet.PoolChange {
	// Count the changes first so the result is allocated exactly once rather
	// than grown repeatedly by append.
	total := 0
	for _, res := range mrs {
		total += len(res.Changes)
	}
	ret := make([]*fleet.PoolChange, 0, total)
	for _, res := range mrs {
		ret = append(ret, res.Changes...)
	}
	return ret
}
// applyChanges moves DUTs in lab between pools according to changes. It edits
// the DUTs' labels in place.
//
// A change's OldPool/NewPool is treated as a critical pool when it names a
// SchedulableLabels_DUTPool enum value, and as a self-serve pool otherwise.
// For every DUT in lab that has a new pool, the old pool (if any) is removed
// from the matching label list and the new pool is appended to its list. If
// changes give a DUT both a new critical and a new self-serve pool, only the
// critical one is appended.
//
// The returned error is currently always nil.
func applyChanges(ctx context.Context, lab *inventory.Lab, changes []*fleet.PoolChange) error {
	logging.Infof(ctx, "%v", changes)

	oldCritical := make(map[string]inventory.SchedulableLabels_DUTPool)
	oldSelfServe := make(map[string]string)
	newCritical := make(map[string]inventory.SchedulableLabels_DUTPool)
	newSelfServe := make(map[string]string)
	for _, c := range changes {
		// Pool names that are known enum values are critical pools; anything
		// else is a free-form self-serve pool name.
		if v, isCritical := inventory.SchedulableLabels_DUTPool_value[c.OldPool]; isCritical {
			oldCritical[c.DutId] = inventory.SchedulableLabels_DUTPool(v)
		} else {
			logging.Debugf(ctx, "old pool: %s, not a known critical pool", c.OldPool)
			oldSelfServe[c.DutId] = c.OldPool
		}
		if v, isCritical := inventory.SchedulableLabels_DUTPool_value[c.NewPool]; isCritical {
			newCritical[c.DutId] = inventory.SchedulableLabels_DUTPool(v)
		} else {
			logging.Debugf(ctx, "new pool: %s, not a known critical pool", c.NewPool)
			newSelfServe[c.DutId] = c.NewPool
		}
	}

	for _, d := range lab.Duts {
		id := d.GetCommon().GetId()
		np, toCritical := newCritical[id]
		nsp, toSelfServe := newSelfServe[id]
		if !toCritical && !toSelfServe {
			// DUT is untouched by this batch of changes.
			continue
		}

		labels := d.GetCommon().GetLabels()
		critical := labels.GetCriticalPools()
		selfServe := labels.GetSelfServePools()
		if op, ok := oldCritical[id]; ok {
			critical = removeOld(critical, op)
		}
		if osp, ok := oldSelfServe[id]; ok {
			selfServe = removeOldSelfServePool(selfServe, osp)
		}
		if toCritical {
			critical = append(critical, np)
		} else {
			selfServe = append(selfServe, nsp)
		}
		labels.CriticalPools = critical
		labels.SelfServePools = selfServe
	}
	return nil
}
// removeOldSelfServePool deletes the first element of ls equal to old and
// returns the shortened slice. If old is not present, ls is returned as is.
//
// The deletion happens in place: later elements are shifted left and the
// vacated final slot is set to "". Callers must not keep using ls after the
// call.
func removeOldSelfServePool(ls []string, old string) []string {
	idx := -1
	for i := range ls {
		if ls[i] == old {
			idx = i
			break
		}
	}
	if idx < 0 {
		return ls
	}
	last := len(ls) - 1
	copy(ls[idx:], ls[idx+1:])
	ls[last] = ""
	return ls[:last]
}
// removeOld deletes the first element of ls equal to old and returns the
// shortened slice. If old is not present, ls is returned as is.
//
// The deletion happens in place: later elements are shifted left and the
// vacated final slot is reset to DUT_POOL_INVALID. Callers must not keep
// using ls after the call.
func removeOld(ls []inventory.SchedulableLabels_DUTPool, old inventory.SchedulableLabels_DUTPool) []inventory.SchedulableLabels_DUTPool {
	for i := range ls {
		if ls[i] != old {
			continue
		}
		last := len(ls) - 1
		copy(ls[i:], ls[i+1:])
		ls[last] = inventory.SchedulableLabels_DUT_POOL_INVALID
		return ls[:last]
	}
	return ls
}
// mapModelsToDUTs groups duts by the value of their model label. Within each
// group the input order is preserved. A DUT whose model label is empty is
// grouped under the "" key.
func mapModelsToDUTs(duts []*inventory.DeviceUnderTest) map[string][]*inventory.DeviceUnderTest {
	byModel := map[string][]*inventory.DeviceUnderTest{}
	for i := range duts {
		model := duts[i].GetCommon().GetLabels().GetModel()
		byModel[model] = append(byModel[model], duts[i])
	}
	return byModel
}
// getBotsHealth lists the alive bots in the configured Swarming bot pool. It
// returns a map from each bot's DUT ID to Health_Healthy or Health_Unhealthy.
// A bot is Health_Healthy when its state dimension is a key in
// clients.HealthyDutStates with a true value. Bots whose DUT ID dimension
// cannot be extracted are logged and left out of the map.
func getBotsHealth(ctx context.Context, sc clients.SwarmingClient) (map[string]fleet.Health, error) {
	pool := config.Get(ctx).Swarming.BotPool
	bots, err := sc.ListAliveBotsInPool(ctx, pool, strpair.Map{})
	if err != nil {
		return nil, err
	}

	health := make(map[string]fleet.Health, len(bots))
	for _, bot := range bots {
		state := clients.GetStateDimension(bot.Dimensions)
		dutID, err := swarming.ExtractSingleValuedDimension(swarming.DimensionsMap(bot.Dimensions), clients.DutIDDimensionKey)
		if err != nil {
			logging.Errorf(ctx, "fail to get dutID for bot %s: %s", bot.BotId, err.Error())
			continue
		}
		h := fleet.Health_Unhealthy
		if clients.HealthyDutStates[state] {
			h = fleet.Health_Healthy
		}
		health[dutID] = h
	}
	return health, nil
}