blob: f117b6c56a982cdd8baa2082338c721e22466334 [file] [log] [blame]
// Copyright 2024 The Chromium Authors
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
package dumper
import (
"context"
_ "embed"
"fmt"
"strconv"
"strings"
"sync"
"cloud.google.com/go/bigquery"
"google.golang.org/api/iterator"
"google.golang.org/genproto/protobuf/field_mask"
"go.chromium.org/luci/common/errors"
"go.chromium.org/luci/common/logging"
"go.chromium.org/luci/common/sync/parallel"
ufspb "go.chromium.org/infra/unifiedfleet/api/v1/models"
"go.chromium.org/infra/unifiedfleet/app/config"
"go.chromium.org/infra/unifiedfleet/app/controller"
"go.chromium.org/infra/unifiedfleet/app/model/registration"
"go.chromium.org/infra/unifiedfleet/app/util"
)
// nlyteMaterialIDChromeOSAsset is the Nlyte Material ID of "DUT STD" assets.
// The OS sync passes it to the mounted-DUT query as the sole entry of the
// "material_ids" parameter.
const nlyteMaterialIDChromeOSAsset = 25144
var (
// nlyteQueryGetMountedDuts holds the contents of nlyte_os_bq_sync.sql,
// embedded at build time. fetchNlyteBigQueryOsData runs it as a BigQuery
// query with a "material_ids" parameter.
//go:embed nlyte_os_bq_sync.sql
nlyteQueryGetMountedDuts string
)
// assetResult receives one row of the Nlyte mounted-DUT BigQuery query; the
// sync decodes rows into it with RowIterator.Next and converts them with
// toAssetProto.
type assetResult struct {
	// ID is reported as the Nlyte asset ID when a row cannot be read.
	ID int
	// Type is upper-cased and looked up in ufspb.AssetType_value.
	Type string
	// Tag is required; it becomes both the asset name and its asset tag.
	Tag bigquery.NullString
	// Serial, when valid, becomes the asset's serial number.
	Serial bigquery.NullString
	// Row, when valid, becomes the asset location's row.
	Row bigquery.NullString
	// Rack, when valid, becomes the asset location's rack.
	Rack bigquery.NullString
	// HostNumber is written, in decimal, to the asset location's position.
	HostNumber int
	// Model, when valid, becomes the asset's model.
	Model bigquery.NullString
	// Board, when valid, becomes the asset's build target.
	Board bigquery.NullString
	// Zone is required; it is upper-cased, given a "ZONE_" prefix when
	// missing, and looked up in ufspb.Zone_value.
	Zone bigquery.NullString
}
// toAssetProto converts a Nlyte query row into a UFS Asset proto.
//
// A row without a valid asset tag or without a valid zone is rejected
// immediately with a nil asset. An unrecognised zone or asset type is reported
// in the returned (joined) error, but the asset is still built, with the zero
// enum value standing in for the unknown one, and returned alongside that
// error. Callers must therefore check the error before using the asset.
func (a assetResult) toAssetProto() (*ufspb.Asset, error) {
	// Without a tag there is no asset name, and without a zone no location.
	if !a.Tag.Valid {
		return nil, errors.New("missing asset tag")
	}
	if !a.Zone.Valid {
		return nil, errors.New("missing zone")
	}

	// Problems found from here on are collected so that a single row reports
	// every unknown enum value at once.
	var merr []error

	// The zone is accepted with or without the "ZONE_" prefix carried by the
	// enum value names.
	zoneName := strings.ToUpper(a.Zone.StringVal)
	if !strings.HasPrefix(zoneName, "ZONE_") {
		zoneName = "ZONE_" + zoneName
	}
	zoneInt, ok := ufspb.Zone_value[zoneName]
	if !ok {
		merr = append(merr, errors.Fmt("unknown zone %q", zoneName))
	}

	typeInt, ok := ufspb.AssetType_value[strings.ToUpper(a.Type)]
	if !ok {
		merr = append(merr, errors.Fmt("unknown asset type %q", a.Type))
	}

	asset := &ufspb.Asset{
		Name: a.Tag.StringVal,
		Type: ufspb.AssetType(typeInt),
		Info: &ufspb.AssetInfo{
			AssetTag: a.Tag.StringVal,
		},
		Location: &ufspb.Location{
			Zone:     ufspb.Zone(zoneInt),
			Position: strconv.Itoa(a.HostNumber),
		},
	}

	// Nullable columns are copied only when BigQuery returned a value.
	if a.Row.Valid {
		asset.Location.Row = a.Row.StringVal
	}
	if a.Rack.Valid {
		asset.Location.Rack = a.Rack.StringVal
	}
	if a.Model.Valid {
		// The model is recorded both on the asset and in its info.
		asset.Info.Model = a.Model.StringVal
		asset.Model = a.Model.StringVal
	}
	if a.Board.Valid {
		asset.Info.BuildTarget = a.Board.StringVal
	}
	if a.Serial.Valid {
		asset.Info.SerialNumber = a.Serial.StringVal
	}

	return asset, errors.Join(merr...)
}
// fetchNlyteBigQueryOsData syncs ChromeOS assets from the Nlyte BigQuery data
// into UFS, working in the OS datastore namespace.
//
// The sync has two phases:
//
//  1. Every row returned by the embedded mounted-DUT query is converted with
//     toAssetProto and created or updated through nlyteUpcertAsset, using a
//     work pool of 16 workers. The tag of every successfully converted row is
//     remembered.
//  2. For every enabled zone of the Nlyte sync config, the zone's UFS assets
//     are listed 100 at a time and handed, together with the remembered tags,
//     to genRemoveStaleZoneAssetsFunc for stale-asset removal.
//
// Phase 2 is skipped when the query results could not be read to the end,
// because the remembered tags are then incomplete and assets that still exist
// in Nlyte would be treated as stale.
//
// Failures of individual assets or zones do not abort the sync; they are
// collected and returned joined. The outcome is recorded in
// fetchNlyteBigQueryDataTick when the function returns. It returns nil without
// querying when no zones are configured.
func fetchNlyteBigQueryOsData(ctx context.Context) (rErr error) {
	logging.Debugf(ctx, "Entering OS Nlyte Sync")
	defer logging.Debugf(ctx, "Exiting OS Nlyte sync")
	defer func() {
		fetchNlyteBigQueryDataTick.Add(ctx, 1, rErr == nil)
	}()

	logging.Infof(ctx, "Setting namespce as OS")
	ctx, err := util.SetupDatastoreNamespace(ctx, util.OSNamespace)
	if err != nil {
		return errors.Fmt("setting DataStore namespace: %w", err)
	}

	client, err := bigquery.NewClient(ctx, "nlyte-tng-prod")
	if err != nil {
		return fmt.Errorf("bigquery.NewClient: %w", err)
	}
	defer client.Close()

	nlyteSyncCfg := config.Get(ctx).GetNlyteSyncConfig()
	if nlyteSyncCfg == nil {
		return errors.New("Nlyte Sync configuration not set")
	}
	if nlyteSyncCfg.GetZoneConfig() == nil {
		logging.Warningf(ctx, "No zones configured for Nlyte sync")
		return nil
	}

	// Location must match that of the dataset(s) referenced in the query.
	client.Location = "US"
	q := client.Query(nlyteQueryGetMountedDuts)
	q.Parameters = []bigquery.QueryParameter{
		{
			Name:  "material_ids",
			Value: []int{nlyteMaterialIDChromeOSAsset},
		},
	}
	it, err := q.Read(ctx)
	if err != nil {
		return errors.Fmt("executing nlyte query: %w", err)
	}

	var merr []error
	// readFailed is written only by the generator below and read only after
	// WorkPool has returned.
	var readFailed bool
	// Use sync.Map to ensure we don't have a data race while writing
	nlyteAssetMap := &sync.Map{}
	err = parallel.WorkPool(16, func(c chan<- func() error) {
		for {
			var row assetResult
			if err := it.Next(&row); err != nil {
				if errors.Is(err, iterator.Done) {
					return
				}
				logging.Errorf(ctx, "Unable to get results for Nlyte asset ID %d: %s", row.ID, err)
				readFailed = true
				c <- func() error {
					return errors.Fmt("reading nlyte query results: %w", err)
				}
				// A failed Next is not retried: an iterator that failed to
				// fetch results keeps returning that same error, so looping
				// on it would never reach iterator.Done.
				return
			}
			logging.Debugf(ctx, "Nlyte sync attempt got data: %+v", row)
			c <- func() error {
				na, err := row.toAssetProto()
				if err != nil {
					return err
				}
				nlyteAssetMap.Store(row.Tag.StringVal, struct{}{})
				return nlyteUpcertAsset(ctx, nlyteSyncCfg, na)
			}
		}
	})
	if err != nil {
		merr = append(merr, errors.Fmt("upcerting Nlyte assets: %w", err))
	}
	if readFailed {
		// nlyteAssetMap only covers the rows read before the failure, so
		// removing "stale" assets now could delete assets that still exist.
		logging.Warningf(ctx, "Skipping stale asset removal as Nlyte query results were not read completely")
		return errors.Join(merr...)
	}

	for _, zoneCfg := range nlyteSyncCfg.GetZoneConfig() {
		if !zoneCfg.GetEnabled() {
			logging.Warningf(ctx, "Skipping zone %q due to enable being false", zoneCfg.GetZone())
			continue
		}
		filter := map[string][]any{
			"zone": {zoneCfg.GetZone()},
		}
		var getAssetsFn func(context.Context, int32, string, map[string][]any, bool) ([]*ufspb.Asset, string, error)
		// When listing assets we can use registration as we aren't trying to modify any entries in Datastore.
		switch zoneCfg.GetDefaultSyncAssetType() {
		case config.NlyteSyncConfig_ZoneConfig_NLYTE_ASSET:
			getAssetsFn = registration.ListNlyteAssets
		case config.NlyteSyncConfig_ZoneConfig_ASSET:
			getAssetsFn = registration.ListAssets
		default:
			merr = append(merr, errors.New("unknown asset sync type"))
			continue
		}

		var assetPageToken string
		for {
			// There is a limit of changes that can be made in one transaction, so limit our page to 100 assets to ensure
			// we don't change more than 100 at a time.
			// This limit may be able to be increased; needs investigation.
			assets, nextPageToken, listErr := getAssetsFn(ctx, 100, assetPageToken, filter, false)
			if listErr != nil {
				logging.Errorf(ctx, "Failure listing zone assets for removal: %v", listErr)
				merr = append(merr, errors.Fmt("listing zone assets for removal: %w", listErr))
				break
			}
			assetPageToken = nextPageToken

			// Don't use a transaction here as it is already implemented per action in controller
			if rmErr := genRemoveStaleZoneAssetsFunc(nlyteAssetMap, assets, zoneCfg)(ctx); rmErr != nil {
				logging.Errorf(ctx, "Failure removing stale zone assets: %v", rmErr)
				merr = append(merr, errors.Fmt("removing stale zone assets: %w", rmErr))
			}
			// An empty token marks the last page.
			if assetPageToken == "" {
				break
			}
		}
	}
	return errors.Join(merr...)
}
// nlyteUpcertAsset registers nlyteAsset in UFS when it does not exist yet, or
// updates the location (row, rack, position, zone) of the existing entry when
// it differs from nlyteAsset.
//
// The asset is silently skipped (nil is returned) when its zone is not
// configured or not enabled, or when its row is not configured or not enabled.
// The row's sync type decides whether the asset is handled as a NlyteAsset or
// as an Asset; an unspecified row type falls back to the zone default. Any
// other type, and any lookup, update or registration failure, is returned as
// an error.
func nlyteUpcertAsset(ctx context.Context, nlyteSyncCfg *config.NlyteSyncConfig, nlyteAsset *ufspb.Asset) error {
	zone := nlyteAsset.GetLocation().GetZone().String()

	// Pick the first zone configuration whose name matches the asset's zone.
	var zoneCfg *config.NlyteSyncConfig_ZoneConfig
	for _, candidate := range nlyteSyncCfg.GetZoneConfig() {
		if candidate.GetZone() == zone {
			zoneCfg = candidate
			break
		}
	}
	if zoneCfg == nil {
		logging.Debugf(ctx, "Ignoring upcert for asset %q as it is not in a configured zone", nlyteAsset.GetName())
		return nil
	}
	if !zoneCfg.GetEnabled() {
		logging.Warningf(ctx, "Ignoring upcert for asset %q as zone %q enable is false", nlyteAsset.GetName(), zone)
		return nil
	}

	row := nlyteAsset.GetLocation().GetRow()
	rowKnown, rowEnabled, syncType := getRowSyncOptions(row, zoneCfg)
	if !rowKnown {
		logging.Debugf(ctx, "Ignoring upcert for asset %q as row %q is not configured", nlyteAsset.GetName(), row)
		return nil
	}
	if !rowEnabled {
		logging.Warningf(ctx, "Ignoring upcert for asset %q as row %q enable is false", nlyteAsset.GetName(), row)
		return nil
	}
	if syncType == config.NlyteSyncConfig_ZoneConfig_WRITE_ASSET_AS_UNSPECIFIED {
		logging.Debugf(ctx, "Using zone default asset sync type for asset %q in row %q", nlyteAsset.GetName(), row)
		syncType = zoneCfg.GetDefaultSyncAssetType()
	}

	// Select the controller entry points matching the asset kind to write.
	var (
		lookup   func(ctx context.Context, name string) (*ufspb.Asset, error)
		update   func(ctx context.Context, asset *ufspb.Asset, mask *field_mask.FieldMask) (*ufspb.Asset, error)
		register func(ctx context.Context, asset *ufspb.Asset) (*ufspb.Asset, error)
	)
	switch syncType {
	case config.NlyteSyncConfig_ZoneConfig_NLYTE_ASSET:
		logging.Debugf(ctx, "Upcerting as NlyteAsset")
		lookup, update, register = controller.GetNlyteAsset, controller.UpdateNlyteAsset, controller.NlyteAssetRegistration
	case config.NlyteSyncConfig_ZoneConfig_ASSET:
		logging.Debugf(ctx, "Upcerting as Asset")
		lookup, update, register = controller.GetAsset, controller.UpdateAsset, controller.AssetRegistration
	default:
		logging.Errorf(ctx, "Invalid sync asset processing type was specified for asset %q of row %q", nlyteAsset.GetName(), row)
		return errors.Fmt("Invalid sync asset processing type was specified for asset %q of row %q", nlyteAsset.GetName(), row)
	}

	// Only the location fields are overwritten on update.
	locationMask := &field_mask.FieldMask{
		Paths: []string{
			util.LocationRowPath,
			util.LocationRackPath,
			util.LocationPositionPath,
			util.LocationZonePath,
		},
	}

	assetName := nlyteAsset.Name
	current, err := lookup(ctx, assetName)
	switch {
	case err == nil:
		logging.Debugf(ctx, "Nlyte synced asset %q found", assetName)
		if !isUpToDate(current, nlyteAsset) {
			logging.Debugf(ctx, "Nlyte synced asset %q is out of date, updating", assetName)
			_, err = update(ctx, nlyteAsset, locationMask)
		} else {
			logging.Debugf(ctx, "Nlyte synced asset %q is up to date", assetName)
		}
	case util.IsNotFoundError(err):
		logging.Debugf(ctx, "Unable to find existing Nlyte synced asset %q, creating", assetName)
		_, err = register(ctx, nlyteAsset)
	}
	// err is now the lookup failure (when it was not a not-found error) or
	// the result of the update/registration.
	if err != nil {
		logging.Errorf(ctx, "Unable to upscert Nlyte synced asset %q: %s", assetName, err)
	}
	return err
}
// genRemoveStaleZoneAssetsFunc returns a function that deletes every asset of
// ufsAssets whose name is not a key of nlyteAssetsMap.
//
// Assets whose row has no entry in zoneCfg's row configuration, or whose row
// is not enabled, are left alone. The delete call is chosen from the row's
// sync type; an unspecified or unrecognised row type uses the one chosen from
// the zone's default sync type. The returned function fails outright when the
// zone default type is neither NLYTE_ASSET nor ASSET; otherwise it returns the
// joined deletion errors, or nil.
func genRemoveStaleZoneAssetsFunc(nlyteAssetsMap *sync.Map, ufsAssets []*ufspb.Asset, zoneCfg *config.NlyteSyncConfig_ZoneConfig) func(context.Context) error {
	return func(ctx context.Context) error {
		// Resolve the zone-wide delete call first; rows fall back to it.
		var zoneDelete func(context.Context, string) error
		switch zoneCfg.GetDefaultSyncAssetType() {
		case config.NlyteSyncConfig_ZoneConfig_NLYTE_ASSET:
			logging.Debugf(ctx, "Zone asset sync type is NLYTE_ASSET; using registration.DeleteNlyteAsset")
			zoneDelete = controller.DeleteNlyteAsset
		case config.NlyteSyncConfig_ZoneConfig_ASSET:
			logging.Debugf(ctx, "Zone asset sync type is ASSET; using registration.DeleteAsset")
			zoneDelete = controller.DeleteAsset
		default:
			return errors.New("unknown asset sync type")
		}

		var failures []error
		rowCfgs := zoneCfg.GetRowSyncAssetTypes()
		for _, asset := range ufsAssets {
			row := asset.Location.GetRow()
			name := asset.GetName()

			rowCfg, configured := rowCfgs[row]
			if !configured {
				logging.Debugf(ctx, "Skipping stale asset check for asset %q as row %q is not configured", name, row)
				continue
			}
			// The row is configured, so its enable flag and sync type apply.
			if !rowCfg.GetEnabled() {
				logging.Infof(ctx, "Skipping stale asset check for asset %q as row %q enable is false", name, row)
				continue
			}

			var remove func(context.Context, string) error
			switch rowCfg.GetRowSyncAssetType() {
			case config.NlyteSyncConfig_ZoneConfig_NLYTE_ASSET:
				logging.Debugf(ctx, "Row asset sync type is NLYTE_ASSET; using registration.DeleteNlyteAsset")
				remove = controller.DeleteNlyteAsset
			case config.NlyteSyncConfig_ZoneConfig_ASSET:
				logging.Debugf(ctx, "Row asset sync type is ASSET; using registration.DeleteAsset")
				remove = controller.DeleteAsset
			case config.NlyteSyncConfig_ZoneConfig_WRITE_ASSET_AS_UNSPECIFIED:
				logging.Debugf(ctx, "Using zone default asset sync type for asset %q of row %q", name, row)
				remove = zoneDelete
			default:
				logging.Warningf(ctx, "Unknown asset sync type for row %q; using zone default", row)
				remove = zoneDelete
			}

			// An asset seen in Nlyte during this sync is still current.
			if _, seen := nlyteAssetsMap.Load(name); seen {
				continue
			}

			// Anything else is stale and gets deleted.
			logging.Debugf(ctx, "Marking asset %q for removal", name)
			if err := remove(ctx, name); err != nil {
				failures = append(failures, errors.Fmt("unable to delete stale UFS asset %q: %w", name, err))
			}
		}
		return errors.Join(failures...)
	}
}
// getRowSyncOptions looks up rowName in zoneCfg's per-row sync configuration.
//
// When the row has an entry, it returns exists=true together with that entry's
// enabled flag and sync asset type. When it has none, it returns exists=false,
// enabled=false and the zone's default sync asset type.
func getRowSyncOptions(rowName string, zoneCfg *config.NlyteSyncConfig_ZoneConfig) (exists, enabled bool, assetType config.NlyteSyncConfig_ZoneConfig_WriteAssetAs) {
	// Indexing a nil map is safe, so a missing row configuration needs no
	// separate check.
	rowEntry, found := zoneCfg.GetRowSyncAssetTypes()[rowName]
	if !found {
		return false, false, zoneCfg.GetDefaultSyncAssetType()
	}
	return true, rowEntry.GetEnabled(), rowEntry.GetRowSyncAssetType()
}
// isUpToDate reports whether a and b have the same location row, rack,
// position and zone.
//
// Only these fields are compared: a whole-message comparison would always
// differ, because UFS changes fields such as the last update time as part of
// normal operations.
func isUpToDate(a *ufspb.Asset, b *ufspb.Asset) bool {
	have, want := a.GetLocation(), b.GetLocation()
	return have.GetRow() == want.GetRow() &&
		have.GetRack() == want.GetRack() &&
		have.GetPosition() == want.GetPosition() &&
		have.GetZone() == want.GetZone()
}