blob: 37a7e9991076d96259f77ed321db63af446dacef [file]
// Copyright 2020 The Chromium OS Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
package inventory
import (
"context"
"fmt"
"net/http"
"runtime/debug"
"strings"
"time"
"github.com/golang/protobuf/proto"
"go.chromium.org/chromiumos/infra/proto/go/device"
"go.chromium.org/luci/common/errors"
"go.chromium.org/luci/common/logging"
"go.chromium.org/luci/common/retry"
"go.chromium.org/luci/gae/service/info"
"go.chromium.org/luci/grpc/prpc"
"go.chromium.org/luci/server/auth"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
"go.chromium.org/chromiumos/infra/proto/go/lab"
api "infra/appengine/cros/lab_inventory/api/v1"
invV1Api "infra/appengine/crosskylabadmin/api/fleet/v1"
"infra/libs/skylab/inventory"
)
type invServiceClient struct {
client api.InventoryClient
}
func newInvServiceClient(ctx context.Context, host string) (inventoryClient, error) {
t, err := auth.GetRPCTransport(ctx, auth.AsCredentialsForwarder)
if err != nil {
return nil, errors.Annotate(err, "failed to get RPC transport to inventory service").Err()
}
ic := api.NewInventoryPRPCClient(&prpc.Client{
C: &http.Client{Transport: t},
Host: host,
Options: &prpc.Options{
UserAgent: fmt.Sprintf("%s/%s", info.AppID(ctx), info.VersionID(ctx)),
},
})
return &invServiceClient{client: ic}, nil
}
func (c *invServiceClient) logInfo(ctx context.Context, t string, s ...interface{}) {
logging.Infof(ctx, fmt.Sprintf("[InventoryV2Client]: %s", t), s...)
}
func (c *invServiceClient) addManyDUTsToFleet(ctx context.Context, nds []*inventory.CommonDeviceSpecs, pickServoPort bool) (string, []*inventory.CommonDeviceSpecs, error) {
// In case there's any panic happens in the new code.
defer func() {
if r := recover(); r != nil {
logging.Errorf(ctx, "Recovered in addManyDUTsToFleet(%s)", r)
debug.PrintStack()
}
}()
c.logInfo(ctx, "Access inventory service as user: %s", auth.CurrentUser(ctx))
devicesToAdd, labstations, _, err := api.ImportFromV1DutSpecs(nds)
if err != nil {
logging.Errorf(ctx, "failed to import DUT specs: %s", err)
return "", nil, err
}
if len(devicesToAdd) == 0 {
devicesToAdd = labstations
}
var rsp *api.AddCrosDevicesResponse
f := func() error {
rsp, err = c.client.AddCrosDevices(ctx, &api.AddCrosDevicesRequest{
Devices: devicesToAdd,
PickServoPort: pickServoPort,
})
return err
}
err = retry.Retry(ctx, transientErrorRetries(), f, retry.LogCallback(ctx, "addManyDUTsToFleet v2"))
if err != nil {
return "", nil, err
}
passedDevSpecs := getPassedDevSpecs(nds, rsp.GetPassedDevices())
failedDevices := rsp.GetFailedDevices()
msgs := make([]string, 0, len(failedDevices))
for _, d := range failedDevices {
msgs = append(msgs, d.GetErrorMsg())
}
if len(msgs) > 0 {
err = errors.Reason(strings.Join(msgs, ",")).Err()
}
return "URL N/A", passedDevSpecs, err
}
func (c *invServiceClient) getAssetsFromRegistration(ctx context.Context, assetIDList *api.AssetIDList) (*api.AssetResponse, error) {
// In case there's any panic happens in the new code.
defer func() {
if r := recover(); r != nil {
logging.Errorf(ctx, "Recovered in getAssetsFromRegistration(%s)", r)
debug.PrintStack()
}
}()
c.logInfo(ctx, "Getting Assets from Registration System as user : %s", auth.CurrentUser(ctx))
// getting existing assets from registration system
return c.client.GetAssets(ctx, assetIDList)
}
func (c *invServiceClient) updateAssetsInRegistration(ctx context.Context, assetList *api.AssetList) (*api.AssetResponse, error) {
// In case there's any panic happens in the new code.
defer func() {
if r := recover(); r != nil {
logging.Errorf(ctx, "Recovered in updateAssetsInRegistration(%s)", r)
debug.PrintStack()
}
}()
c.logInfo(ctx, "Updating Registration System as user : %s", auth.CurrentUser(ctx))
// Update existing assets to registration system
return c.client.UpdateAssets(ctx, assetList)
}
func getPassedDevSpecs(allDevSpecs []*inventory.CommonDeviceSpecs, passedDevices []*api.DeviceOpResult) []*inventory.CommonDeviceSpecs {
// The response only has hostname/id, so we need to get other data from the
// input.
nameToSpec := map[string]*inventory.CommonDeviceSpecs{}
passedDevSpecs := make([]*inventory.CommonDeviceSpecs, 0, len(allDevSpecs))
for i := range allDevSpecs {
nameToSpec[allDevSpecs[i].GetHostname()] = allDevSpecs[i]
}
for _, d := range passedDevices {
pd := nameToSpec[d.GetHostname()]
id := d.GetId()
pd.Id = &id // The ID may be newly assigned by inventory service.
passedDevSpecs = append(passedDevSpecs, pd)
}
return passedDevSpecs
}
func (c *invServiceClient) updateDUTSpecs(ctx context.Context, od, nd *inventory.CommonDeviceSpecs, pickServoPort bool) (string, error) {
defer func() {
if r := recover(); r != nil {
logging.Errorf(ctx, "Recovered in updateDUTSpecs(%s)", r)
}
}()
devicesToUpdate, labstations, _, err := api.ImportFromV1DutSpecs([]*inventory.CommonDeviceSpecs{nd})
if err != nil {
logging.Errorf(ctx, "failed to import DUT specs: %s", err)
return "", err
}
if len(devicesToUpdate) == 0 {
devicesToUpdate = labstations
}
cleanPreDeployFields(devicesToUpdate)
f := func() error {
if rsp, err := c.client.UpdateCrosDevicesSetup(ctx, &api.UpdateCrosDevicesSetupRequest{
Devices: devicesToUpdate,
PickServoPort: pickServoPort,
// TODO (guocb) Add reason why update.
}); err != nil {
return err
} else if len(rsp.FailedDevices) > 0 {
// There's only one device under updating.
return errors.Reason(rsp.FailedDevices[0].ErrorMsg).Err()
}
return nil
}
err = retry.Retry(ctx, transientErrorRetries(), f, retry.LogCallback(ctx, "updateDUTSpecs v2"))
if err != nil {
return "", errors.Annotate(err, "update DUT spects").Err()
}
return "URL N/A", nil
}
func (c *invServiceClient) deleteDUTsFromFleet(ctx context.Context, ids []string) (string, []string, error) {
defer func() {
if r := recover(); r != nil {
logging.Errorf(ctx, "Recovered in deleteDUTsFromFleet(%s)", r)
}
}()
return "fake delete URL", []string{}, nil
}
func (c *invServiceClient) selectDutsFromInventory(ctx context.Context, sel *invV1Api.DutSelector) ([]*inventory.DeviceUnderTest, error) {
defer func() {
if r := recover(); r != nil {
logging.Errorf(ctx, "Recovered in deleteDUTsFromFleet(%s)", r)
}
}()
var rsp *api.GetCrosDevicesResponse
f := func() (err error) {
ids := []*api.DeviceID{}
if sel.GetHostname() != "" {
ids = append(ids, &api.DeviceID{
Id: &api.DeviceID_Hostname{Hostname: sel.GetHostname()},
})
}
if sel.GetId() != "" {
ids = append(ids, &api.DeviceID{
Id: &api.DeviceID_ChromeosDeviceId{ChromeosDeviceId: sel.GetId()},
})
}
var models []string
if sel.GetModel() != "" {
models = append(models, sel.GetModel())
}
rsp, err = c.client.GetCrosDevices(ctx, &api.GetCrosDevicesRequest{Ids: ids, Models: models})
return
}
err := retry.Retry(ctx, transientErrorRetries(), f, retry.LogCallback(ctx, "selectorDutsFromInventory v2"))
if err != nil {
return nil, nil
}
result := []*inventory.DeviceUnderTest{}
for _, d := range rsp.GetData() {
if r, err := api.AdaptToV1DutSpec(d); err != nil {
continue
} else {
result = append(result, r)
}
}
return result, nil
}
func (c *invServiceClient) commitBalancePoolChanges(ctx context.Context, changes []*invV1Api.PoolChange) (string, error) {
ids := make([]*api.DeviceID, len(changes))
changesMap := map[string]*invV1Api.PoolChange{}
for i := range changes {
ids[i] = &api.DeviceID{
Id: &api.DeviceID_ChromeosDeviceId{ChromeosDeviceId: changes[i].GetDutId()},
}
changesMap[changes[i].GetDutId()] = changes[i]
}
var rsp *api.GetCrosDevicesResponse
f := func() (err error) {
rsp, err = c.client.GetCrosDevices(ctx, &api.GetCrosDevicesRequest{Ids: ids})
return
}
err := retry.Retry(ctx, transientErrorRetries(), f, retry.LogCallback(ctx, "commitBalancePoolChanges v2: Get Devices"))
if err != nil {
return "", err
}
changedDuts := make([]*lab.ChromeOSDevice, 0, len(rsp.GetData()))
for _, d := range rsp.GetData() {
dev := d.GetLabConfig()
dut := dev.GetDut()
if dut == nil {
continue
}
change := changesMap[dev.GetId().GetValue()]
if changePool(dut, change.OldPool, change.NewPool) {
changedDuts = append(changedDuts, dev)
}
}
if len(changedDuts) == 0 {
logging.Debugf(ctx, "no pool changes to commit")
return "", nil
}
var rspCommit *api.UpdateCrosDevicesSetupResponse
f = func() (err error) {
rspCommit, err = c.client.UpdateCrosDevicesSetup(ctx, &api.UpdateCrosDevicesSetupRequest{
Devices: changedDuts,
Reason: "balance pool",
PickServoPort: false,
})
return
}
err = retry.Retry(ctx, transientErrorRetries(), f, retry.LogCallback(ctx, "commitBalancePoolChanges v2: Commit changes"))
if err != nil {
return "", err
}
failedDevices := rspCommit.GetFailedDevices()
msgs := make([]string, 0, len(failedDevices))
for _, d := range failedDevices {
msgs = append(msgs, d.GetErrorMsg())
}
if len(msgs) > 0 {
return "", errors.Reason(strings.Join(msgs, ",")).Err()
}
return "URL N/A", nil
}
func changePool(dut *lab.DeviceUnderTest, oldPool, newPool string) bool {
pools := dut.GetPools()
modified := false
for i, p := range pools {
if p == oldPool {
pools[i] = newPool
modified = true
}
}
return modified
}
func (c *invServiceClient) getDutInfo(ctx context.Context, req *invV1Api.GetDutInfoRequest) ([]byte, time.Time, error) {
var devID *api.DeviceID
now := time.Now().UTC()
if req.Id != "" {
devID = &api.DeviceID{Id: &api.DeviceID_ChromeosDeviceId{ChromeosDeviceId: req.Id}}
} else {
devID = &api.DeviceID{Id: &api.DeviceID_Hostname{Hostname: req.Hostname}}
}
var rsp *api.GetCrosDevicesResponse
f := func() (err error) {
rsp, err = c.client.GetCrosDevices(ctx, &api.GetCrosDevicesRequest{
Ids: []*api.DeviceID{devID},
})
return err
}
err := retry.Retry(ctx, transientErrorRetries(), f, retry.LogCallback(ctx, "GetDutInfo v2"))
if err != nil {
return nil, now, err
}
if devices := rsp.GetData(); len(devices) == 1 {
dut, err := api.AdaptToV1DutSpec(devices[0])
if err != nil {
return nil, now, errors.Annotate(err, "adapt to v1 format").Err()
}
data, err := proto.Marshal(dut)
if err != nil {
return nil, now, errors.Annotate(err, "marshal dut proto to bytes").Err()
}
return data, now, nil
}
if devices := rsp.GetFailedDevices(); len(devices) == 1 {
if msg := devices[0].ErrorMsg; strings.Contains(msg, "No such host:") || strings.Contains(msg, "datastore: no such entity") {
return nil, now, status.Errorf(codes.NotFound, msg)
}
return nil, now, errors.Reason(devices[0].ErrorMsg).Err()
}
return nil, now, errors.Reason("GetDutInfo %#v failed. No data responsed in either passed or failed list!", req).Err()
}
func (c *invServiceClient) deviceConfigsExists(ctx context.Context, configIds []*device.ConfigId) (map[int32]bool, error) {
// In case there's any panic happens in the new code.
defer func() {
if r := recover(); r != nil {
logging.Errorf(ctx, "Recovered in deviceConfigsExists(%s)", r)
debug.PrintStack()
}
}()
// checking if device configs exists in Inventory V2
resp, err := c.client.DeviceConfigsExists(ctx, &api.DeviceConfigsExistsRequest{ConfigIds: configIds})
if err != nil {
c.logInfo(ctx, "Unable to check if device configs exists", err)
return nil, err
}
return resp.GetExists(), err
}
// cleanPreDeployFields resets values for the fields re-generated during deployment.
func cleanPreDeployFields(devices []*lab.ChromeOSDevice) {
for _, d := range devices {
if dut := d.GetDut(); dut != nil {
servo := dut.GetPeripherals().GetServo()
if servo != nil {
servo.ServoType = ""
servo.ServoTopology = nil
}
}
}
}