blob: 4c0413cc9bfb98c5dfbdb08d79b21504fa583990 [file]
// Copyright 2020 The Chromium OS Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
package inventory
import (
"context"
"fmt"
"time"
"github.com/golang/protobuf/proto"
"go.chromium.org/chromiumos/infra/proto/go/device"
"go.chromium.org/luci/common/errors"
"go.chromium.org/luci/common/logging"
"go.chromium.org/luci/common/retry"
"go.chromium.org/luci/gae/service/datastore"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
api "infra/appengine/cros/lab_inventory/api/v1"
fleet "infra/appengine/crosskylabadmin/api/fleet/v1"
dsinventory "infra/appengine/crosskylabadmin/app/frontend/internal/datastore/inventory"
"infra/appengine/crosskylabadmin/app/gitstore"
"infra/libs/skylab/inventory"
)
type inventoryClient interface {
addManyDUTsToFleet(context.Context, []*inventory.CommonDeviceSpecs, bool) (string, []*inventory.CommonDeviceSpecs, error)
getAssetsFromRegistration(ctx context.Context, assetList *api.AssetIDList) (*api.AssetResponse, error)
updateAssetsInRegistration(ctx context.Context, assetList *api.AssetList) (*api.AssetResponse, error)
updateDUTSpecs(context.Context, *inventory.CommonDeviceSpecs, *inventory.CommonDeviceSpecs, bool) (string, error)
deleteDUTsFromFleet(context.Context, []string) (string, []string, error)
selectDutsFromInventory(context.Context, *fleet.DutSelector) ([]*inventory.DeviceUnderTest, error)
commitBalancePoolChanges(context.Context, []*fleet.PoolChange) (string, error)
getDutInfo(context.Context, *fleet.GetDutInfoRequest) ([]byte, time.Time, error)
deviceConfigsExists(context.Context, []*device.ConfigId) (map[int32]bool, error)
}
type gitStoreClient struct {
store *gitstore.InventoryStore
}
func newGitStoreClient(ctx context.Context, gs *gitstore.InventoryStore) (inventoryClient, error) {
return &gitStoreClient{
store: gs,
}, nil
}
func (client *gitStoreClient) addManyDUTsToFleet(ctx context.Context, nds []*inventory.CommonDeviceSpecs, pickServoPort bool) (string, []*inventory.CommonDeviceSpecs, error) {
return addManyDUTsToFleet(ctx, client.store, nds, pickServoPort)
}
func (client *gitStoreClient) getAssetsFromRegistration(ctx context.Context, assetList *api.AssetIDList) (*api.AssetResponse, error) {
// Nothing to get from registration for gitStoreClient
return nil, errors.New("gitStoreClient - nothing to get from registration")
}
func (client *gitStoreClient) updateAssetsInRegistration(ctx context.Context, assetList *api.AssetList) (*api.AssetResponse, error) {
// Nothing to update in registration for gitStoreClient
return nil, errors.New("gitStoreClient - nothing to update in registration")
}
func (client *gitStoreClient) updateDUTSpecs(ctx context.Context, od, nd *inventory.CommonDeviceSpecs, pickServoPort bool) (string, error) {
return updateDUTSpecs(ctx, client.store, od, nd, pickServoPort)
}
func (client *gitStoreClient) deleteDUTsFromFleet(ctx context.Context, ids []string) (string, []string, error) {
return deleteDUTsFromFleet(ctx, client.store, ids)
}
func (client *gitStoreClient) selectDutsFromInventory(ctx context.Context, sel *fleet.DutSelector) ([]*inventory.DeviceUnderTest, error) {
return selectDutsFromInventory(ctx, client.store, sel)
}
func (client *gitStoreClient) commitBalancePoolChanges(ctx context.Context, changes []*fleet.PoolChange) (string, error) {
return commitBalancePoolChanges(ctx, client.store, changes)
}
func (client *gitStoreClient) deviceConfigsExists(ctx context.Context, configIds []*device.ConfigId) (map[int32]bool, error) {
// Nothing to check for device configs in gitStore
return nil, errors.New("gitStoreClient - nothing to check for device configs in gitStore")
}
func (client *gitStoreClient) getDutInfo(ctx context.Context, req *fleet.GetDutInfoRequest) ([]byte, time.Time, error) {
var dut *dsinventory.DeviceUnderTest
var now time.Time
var err error
if req.Id != "" {
dut, err = dsinventory.GetSerializedDUTByID(ctx, req.Id)
} else {
dut, err = dsinventory.GetSerializedDUTByHostname(ctx, req.Hostname)
}
if err != nil {
if datastore.IsErrNoSuchEntity(err) {
return nil, now, status.Errorf(codes.NotFound, err.Error())
}
return nil, now, err
}
return dut.Data, dut.Updated, nil
}
func addManyDUTsToFleet(ctx context.Context, s *gitstore.InventoryStore, nds []*inventory.CommonDeviceSpecs, pickServoPort bool) (string, []*inventory.CommonDeviceSpecs, error) {
var respURL string
newDeviceToID := make(map[*inventory.CommonDeviceSpecs]string)
f := func() error {
var ds []*inventory.CommonDeviceSpecs
for _, nd := range nds {
ds = append(ds, proto.Clone(nd).(*inventory.CommonDeviceSpecs))
}
if err := s.Refresh(ctx); err != nil {
return errors.Annotate(err, "add dut to fleet").Err()
}
// New cache after refreshing store.
c := newGlobalInvCache(ctx, s)
for i, d := range ds {
hostname := d.GetHostname()
logging.Infof(ctx, "add device to fleet: %s", hostname)
if _, ok := c.hostnameToID[hostname]; ok {
logging.Infof(ctx, "dut with hostname %s already exists, skip adding", hostname)
continue
}
if pickServoPort && !hasServoPortAttribute(d) {
if err := assignNewServoPort(s.Lab.Duts, d); err != nil {
logging.Infof(ctx, "fail to assign new servo port, skip adding")
continue
}
}
nid := addDUTToStore(s, d)
newDeviceToID[nds[i]] = nid
}
// TODO(ayatane): Implement this better than just regenerating the cache.
c = newGlobalInvCache(ctx, s)
for _, id := range newDeviceToID {
if _, err := assignDUT(ctx, c, id); err != nil {
return errors.Annotate(err, "add dut to fleet").Err()
}
}
firstHostname := "<empty>"
if len(ds) > 0 {
firstHostname = ds[0].GetHostname()
}
url, err := s.Commit(ctx, fmt.Sprintf("Add %d new DUT(s) : %s ...", len(ds), firstHostname))
if err != nil {
return errors.Annotate(err, "add dut to fleet").Err()
}
respURL = url
for _, nd := range nds {
id := newDeviceToID[nd]
nd.Id = &id
}
return nil
}
err := retry.Retry(ctx, transientErrorRetries(), f, retry.LogCallback(ctx, "addManyDUTsToFleet"))
newDevices := make([]*inventory.CommonDeviceSpecs, 0)
for nd := range newDeviceToID {
newDevices = append(newDevices, nd)
}
return respURL, newDevices, err
}
// updateDUTSpecs updates the DUT specs for an existing DUT in the inventory.
func updateDUTSpecs(ctx context.Context, s *gitstore.InventoryStore, od, nd *inventory.CommonDeviceSpecs, pickServoPort bool) (string, error) {
var respURL string
f := func() error {
// Clone device specs before modifications so that changes don't leak
// across retries.
d := proto.Clone(nd).(*inventory.CommonDeviceSpecs)
if err := s.Refresh(ctx); err != nil {
return errors.Annotate(err, "add new dut to inventory").Err()
}
if pickServoPort && !hasServoPortAttribute(d) {
if err := assignNewServoPort(s.Lab.Duts, d); err != nil {
return errors.Annotate(err, "add dut to fleet").Err()
}
}
dut, exists := getDUTByID(s.Lab, od.GetId())
if !exists {
return status.Errorf(codes.NotFound, "no DUT with ID %s", od.GetId())
}
// TODO(crbug/929776) DUTs under deployment are not marked specially in the
// inventory yet. This causes two problems:
// - Another admin task (say repair) may get scheduled on the new bot
// before the deploy task we create.
// - If the deploy task fails, the DUT will still enter the fleet, but may
// not be ready for use.
if !proto.Equal(dut.GetCommon(), od) {
return errors.Reason("DUT specs update conflict").Err()
}
dut.Common = d
url, err := s.Commit(ctx, fmt.Sprintf("Update DUT %s", od.GetId()))
if err != nil {
return errors.Annotate(err, "update DUT specs").Err()
}
respURL = url
return nil
}
err := retry.Retry(ctx, transientErrorRetries(), f, retry.LogCallback(ctx, "updateDUTSpecs"))
return respURL, err
}
func deleteDUTsFromFleet(ctx context.Context, s *gitstore.InventoryStore, ids []string) (string, []string, error) {
var changeURL string
var removedIDs []string
f := func() error {
if err2 := s.Refresh(ctx); err2 != nil {
return err2
}
removedDUTs := removeDUTWithHostnames(s, ids)
url, err2 := s.Commit(ctx, fmt.Sprintf("delete %d duts", len(removedDUTs)))
if gitstore.IsEmptyErr(err2) {
return nil
}
if err2 != nil {
return err2
}
// Captured variables only on success, hence at most once.
changeURL = url
removedIDs = make([]string, 0, len(removedDUTs))
for _, d := range removedDUTs {
removedIDs = append(removedIDs, d.GetCommon().GetId())
}
return nil
}
err := retry.Retry(ctx, transientErrorRetries(), f, retry.LogCallback(ctx, "DeleteDut"))
return changeURL, removedIDs, err
}
func selectDutsFromInventory(ctx context.Context, store *gitstore.InventoryStore, sel *fleet.DutSelector) ([]*inventory.DeviceUnderTest, error) {
if err := store.Refresh(ctx); err != nil {
return nil, err
}
duts, err := GetDutsByEnvironment(ctx, store)
if err != nil {
return nil, nil
}
var filteredDuts []*inventory.DeviceUnderTest
for _, d := range duts {
if sel == nil || dutMatchesSelector(d, sel) {
filteredDuts = append(filteredDuts, d)
}
}
return filteredDuts, nil
}
func commitBalancePoolChanges(ctx context.Context, store *gitstore.InventoryStore, changes []*fleet.PoolChange) (string, error) {
if len(changes) == 0 {
// No inventory changes are required.
// TODO(pprabhu) add a unittest enforcing this.
return "", nil
}
if err := applyChanges(ctx, store.Lab, changes); err != nil {
return "", errors.Annotate(err, "apply balance pool changes").Err()
}
return store.Commit(ctx, "balance pool")
}