| // Copyright 2024 The Chromium Authors |
| // Use of this source code is governed by a BSD-style license that can be |
| // found in the LICENSE file. |
| package dumper |
| |
| import ( |
| "context" |
| _ "embed" |
| "fmt" |
| "strconv" |
| "strings" |
| "sync" |
| |
| "cloud.google.com/go/bigquery" |
| "google.golang.org/api/iterator" |
| "google.golang.org/genproto/protobuf/field_mask" |
| |
| "go.chromium.org/luci/common/errors" |
| "go.chromium.org/luci/common/logging" |
| "go.chromium.org/luci/common/sync/parallel" |
| |
| ufspb "go.chromium.org/infra/unifiedfleet/api/v1/models" |
| "go.chromium.org/infra/unifiedfleet/app/config" |
| "go.chromium.org/infra/unifiedfleet/app/controller" |
| "go.chromium.org/infra/unifiedfleet/app/model/registration" |
| "go.chromium.org/infra/unifiedfleet/app/util" |
| ) |
| |
// nlyteMaterialIDChromeOSAsset is the Nlyte "DUT STD" Material ID.
const nlyteMaterialIDChromeOSAsset = 25144
| |
| var ( |
| //go:embed nlyte_os_bq_sync.sql |
| nlyteQueryGetMountedDuts string |
| ) |
| |
| type assetResult struct { |
| ID int |
| Type string |
| Tag, Serial, Row, Rack bigquery.NullString |
| HostNumber int |
| Model, Board, Zone bigquery.NullString |
| } |
| |
| func (a assetResult) toAssetProto() (*ufspb.Asset, error) { |
| var merr []error |
| |
| if !a.Tag.Valid { |
| merr = append(merr, errors.New("missing asset tag")) |
| return nil, errors.Join(merr...) |
| } |
| if !a.Zone.Valid { |
| merr = append(merr, errors.New("missing zone")) |
| return nil, errors.Join(merr...) |
| } |
| zoneString := strings.ToUpper(a.Zone.StringVal) |
| if !strings.HasPrefix(zoneString, "ZONE_") { |
| zoneString = fmt.Sprintf("ZONE_%s", zoneString) |
| } |
| zoneInt, ok := ufspb.Zone_value[zoneString] |
| if !ok { |
| merr = append(merr, errors.New(fmt.Sprintf("unknown zone %q", zoneString))) |
| } |
| typeInt, ok := ufspb.AssetType_value[strings.ToUpper(a.Type)] |
| if !ok { |
| merr = append(merr, errors.New(fmt.Sprintf("unknown asset type %q", a.Type))) |
| } |
| |
| asset := &ufspb.Asset{ |
| Name: a.Tag.StringVal, |
| Type: ufspb.AssetType(typeInt), |
| Info: &ufspb.AssetInfo{ |
| AssetTag: a.Tag.StringVal, |
| }, |
| Location: &ufspb.Location{ |
| Zone: ufspb.Zone(zoneInt), |
| Position: strconv.Itoa(a.HostNumber), |
| }, |
| } |
| if a.Row.Valid { |
| asset.Location.Row = a.Row.StringVal |
| } |
| if a.Rack.Valid { |
| asset.Location.Rack = a.Rack.StringVal |
| } |
| if a.Model.Valid { |
| asset.Info.Model = a.Model.StringVal |
| asset.Model = a.Model.StringVal |
| } |
| if a.Board.Valid { |
| asset.Info.BuildTarget = a.Board.StringVal |
| } |
| if a.Serial.Valid { |
| asset.Info.SerialNumber = a.Serial.StringVal |
| } |
| return asset, errors.Join(merr...) |
| } |
| |
| func fetchNlyteBigQueryOsData(ctx context.Context) (rErr error) { |
| logging.Debugf(ctx, "Entering OS Nlyte Sync") |
| defer logging.Debugf(ctx, "Exiting OS Nlyte sync") |
| defer func() { |
| fetchNlyteBigQueryDataTick.Add(ctx, 1, rErr == nil) |
| }() |
| logging.Infof(ctx, "Setting namespce as OS") |
| ctx, err := util.SetupDatastoreNamespace(ctx, util.OSNamespace) |
| if err != nil { |
| return errors.Fmt("setting DataStore namespace: %w", err) |
| } |
| client, err := bigquery.NewClient(ctx, "nlyte-tng-prod") |
| if err != nil { |
| return fmt.Errorf("bigquery.NewClient: %w", err) |
| } |
| defer client.Close() |
| |
| nlyteSyncCfg := config.Get(ctx).GetNlyteSyncConfig() |
| if nlyteSyncCfg == nil { |
| return errors.New("Nlyte Sync configuration not set") |
| } |
| if nlyteSyncCfg.GetZoneConfig() == nil { |
| logging.Warningf(ctx, "No zones configured for Nlyte sync") |
| return nil |
| } |
| |
| // Location must match that of the dataset(s) referenced in the query. |
| client.Location = "US" |
| q := client.Query(nlyteQueryGetMountedDuts) |
| q.Parameters = []bigquery.QueryParameter{ |
| { |
| Name: "material_ids", |
| Value: []int{nlyteMaterialIDChromeOSAsset}, |
| }, |
| } |
| it, err := q.Read(ctx) |
| if err != nil { |
| return errors.Fmt("executing nlyte query: %w", err) |
| } |
| |
| var merr []error |
| |
| // Use sync.Map to ensure we don't have a data race while writing |
| nlyteAssetMap := &sync.Map{} |
| err = parallel.WorkPool(16, func(c chan<- func() error) { |
| for { |
| var row assetResult |
| if err := it.Next(&row); err != nil { |
| if errors.Is(err, iterator.Done) { |
| break |
| } |
| logging.Errorf(ctx, "Unable to get results for Nlyte asset ID %d: %s", row.ID, err) |
| c <- func() error { |
| return errors.Fmt("reading nlyte query results: %w", err) |
| } |
| continue |
| } |
| logging.Debugf(ctx, "Nlyte sync attempt got data: %+v", row) |
| c <- func() error { |
| na, err := row.toAssetProto() |
| if err != nil { |
| return err |
| } |
| nlyteAssetMap.Store(row.Tag.StringVal, struct{}{}) |
| return nlyteUpcertAsset(ctx, nlyteSyncCfg, na) |
| } |
| |
| } |
| }) |
| if err != nil { |
| merr = append(merr, errors.Fmt("upcerting Nlyte assets: %w", err)) |
| } |
| |
| for _, zoneCfg := range nlyteSyncCfg.GetZoneConfig() { |
| if !zoneCfg.GetEnabled() { |
| logging.Warningf(ctx, "Skipping zone %q due to enable being false", zoneCfg.GetZone()) |
| continue |
| } |
| filter := map[string][]any{ |
| "zone": {zoneCfg.GetZone()}, |
| } |
| var getAssetsFn func(context.Context, int32, string, map[string][]any, bool) ([]*ufspb.Asset, string, error) |
| // When listing assets we can use registration as we aren't trying to modify any entries in Datastore. |
| switch zoneCfg.GetDefaultSyncAssetType() { |
| case config.NlyteSyncConfig_ZoneConfig_NLYTE_ASSET: |
| getAssetsFn = registration.ListNlyteAssets |
| case config.NlyteSyncConfig_ZoneConfig_ASSET: |
| getAssetsFn = registration.ListAssets |
| default: |
| merr = append(merr, errors.New("unknown asset sync type")) |
| continue |
| } |
| |
| var assetPageToken string |
| var assets []*ufspb.Asset |
| for { |
| // There is a limit of changes that can be made in one transaction, so limit our page to 100 assets to ensure |
| // we don't change more than 100 at a time. |
| // This limit may be able to be increased; needs investigation. |
| assets, assetPageToken, err = getAssetsFn(ctx, 100, assetPageToken, filter, false) |
| if err != nil { |
| logging.Errorf(ctx, "Failure listing zone assets for removal: %v", err) |
| merr = append(merr, errors.Fmt("listing zone assets for removal: %w", err)) |
| break |
| } |
| // Don't use a transaction here as it is already implemented per action in controller |
| err := genRemoveStaleZoneAssetsFunc(nlyteAssetMap, assets, zoneCfg)(ctx) |
| if err != nil { |
| logging.Errorf(ctx, "Failure removing stale zone assets: %v", err) |
| merr = append(merr, errors.Fmt("removing stale zone assets: %w", err)) |
| } |
| if assetPageToken == "" { |
| break |
| } |
| } |
| } |
| return errors.Join(merr...) |
| } |
| |
| // Create a new nlyte asset Kind if one does not exist, otherwise update an existing nlyte asset Kind. |
| func nlyteUpcertAsset(ctx context.Context, nlyteSyncCfg *config.NlyteSyncConfig, nlyteAsset *ufspb.Asset) error { |
| zoneCfgs := nlyteSyncCfg.GetZoneConfig() |
| assetZone := nlyteAsset.GetLocation().GetZone().String() |
| var zoneCfg *config.NlyteSyncConfig_ZoneConfig |
| for _, z := range zoneCfgs { |
| if assetZone == z.GetZone() { |
| zoneCfg = z |
| break |
| } |
| } |
| if zoneCfg == nil { |
| logging.Debugf(ctx, "Ignoring upcert for asset %q as it is not in a configured zone", nlyteAsset.GetName()) |
| return nil |
| } |
| if !zoneCfg.GetEnabled() { |
| logging.Warningf(ctx, "Ignoring upcert for asset %q as zone %q enable is false", nlyteAsset.GetName(), assetZone) |
| return nil |
| } |
| |
| exists, enabled, assetSyncType := getRowSyncOptions(nlyteAsset.GetLocation().GetRow(), zoneCfg) |
| if !exists { |
| logging.Debugf(ctx, "Ignoring upcert for asset %q as row %q is not configured", nlyteAsset.GetName(), nlyteAsset.GetLocation().GetRow()) |
| return nil |
| } |
| if exists && !enabled { |
| logging.Warningf(ctx, "Ignoring upcert for asset %q as row %q enable is false", nlyteAsset.GetName(), nlyteAsset.GetLocation().GetRow()) |
| return nil |
| } |
| |
| if assetSyncType == config.NlyteSyncConfig_ZoneConfig_WRITE_ASSET_AS_UNSPECIFIED { |
| logging.Debugf(ctx, "Using zone default asset sync type for asset %q in row %q", nlyteAsset.GetName(), nlyteAsset.GetLocation().GetRow()) |
| assetSyncType = zoneCfg.GetDefaultSyncAssetType() |
| } |
| |
| var getAssetFn func(ctx context.Context, name string) (*ufspb.Asset, error) |
| var updateAssetFn func(ctx context.Context, asset *ufspb.Asset, mask *field_mask.FieldMask) (*ufspb.Asset, error) |
| var registerAssetFn func(ctx context.Context, asset *ufspb.Asset) (*ufspb.Asset, error) |
| switch assetSyncType { |
| case config.NlyteSyncConfig_ZoneConfig_NLYTE_ASSET: |
| logging.Debugf(ctx, "Upcerting as NlyteAsset") |
| getAssetFn = controller.GetNlyteAsset |
| updateAssetFn = controller.UpdateNlyteAsset |
| registerAssetFn = controller.NlyteAssetRegistration |
| case config.NlyteSyncConfig_ZoneConfig_ASSET: |
| logging.Debugf(ctx, "Upcerting as Asset") |
| getAssetFn = controller.GetAsset |
| updateAssetFn = controller.UpdateAsset |
| registerAssetFn = controller.AssetRegistration |
| default: |
| logging.Errorf(ctx, "Invalid sync asset processing type was specified for asset %q of row %q", nlyteAsset.GetName(), nlyteAsset.GetLocation().GetRow()) |
| return errors.Fmt("Invalid sync asset processing type was specified for asset %q of row %q", nlyteAsset.GetName(), nlyteAsset.GetLocation().GetRow()) |
| } |
| |
| fieldMask := &field_mask.FieldMask{} |
| fieldMask.Paths = []string{util.LocationRowPath, util.LocationRackPath, util.LocationPositionPath, util.LocationZonePath} |
| |
| ufsAsset, err := getAssetFn(ctx, nlyteAsset.Name) |
| if err == nil { |
| logging.Debugf(ctx, "Nlyte synced asset %q found", nlyteAsset.Name) |
| if isUpToDate(ufsAsset, nlyteAsset) { |
| logging.Debugf(ctx, "Nlyte synced asset %q is up to date", nlyteAsset.Name) |
| } else { |
| logging.Debugf(ctx, "Nlyte synced asset %q is out of date, updating", nlyteAsset.Name) |
| _, err = updateAssetFn(ctx, nlyteAsset, fieldMask) |
| } |
| } else if util.IsNotFoundError(err) { |
| logging.Debugf(ctx, "Unable to find existing Nlyte synced asset %q, creating", nlyteAsset.Name) |
| _, err = registerAssetFn(ctx, nlyteAsset) |
| } |
| if err != nil { |
| logging.Errorf(ctx, "Unable to upscert Nlyte synced asset %q: %s", nlyteAsset.Name, err) |
| } |
| return err |
| } |
| |
| func genRemoveStaleZoneAssetsFunc(nlyteAssetsMap *sync.Map, ufsAssets []*ufspb.Asset, zoneCfg *config.NlyteSyncConfig_ZoneConfig) func(context.Context) error { |
| return func(ctx context.Context) error { |
| var merr []error |
| var zoneRmFunc func(context.Context, string) error |
| switch zoneCfg.GetDefaultSyncAssetType() { |
| case config.NlyteSyncConfig_ZoneConfig_NLYTE_ASSET: |
| logging.Debugf(ctx, "Zone asset sync type is NLYTE_ASSET; using registration.DeleteNlyteAsset") |
| zoneRmFunc = controller.DeleteNlyteAsset |
| case config.NlyteSyncConfig_ZoneConfig_ASSET: |
| logging.Debugf(ctx, "Zone asset sync type is ASSET; using registration.DeleteAsset") |
| zoneRmFunc = controller.DeleteAsset |
| default: |
| return errors.New("unknown asset sync type") |
| } |
| rs := zoneCfg.GetRowSyncAssetTypes() |
| for _, asset := range ufsAssets { |
| var rmFunc func(context.Context, string) error |
| rowConfig, ok := rs[asset.Location.GetRow()] |
| if !ok { |
| logging.Debugf(ctx, "Skipping stale asset check for asset %q as row %q is not configured", asset.GetName(), asset.GetLocation().GetRow()) |
| continue |
| } |
| // Asset has a specific row configuration, we need to check enable and type |
| if !rowConfig.GetEnabled() { |
| logging.Infof(ctx, "Skipping stale asset check for asset %q as row %q enable is false", asset.GetName(), asset.GetLocation().GetRow()) |
| continue |
| } |
| switch rowConfig.GetRowSyncAssetType() { |
| case config.NlyteSyncConfig_ZoneConfig_NLYTE_ASSET: |
| logging.Debugf(ctx, "Row asset sync type is NLYTE_ASSET; using registration.DeleteNlyteAsset") |
| rmFunc = controller.DeleteNlyteAsset |
| case config.NlyteSyncConfig_ZoneConfig_ASSET: |
| logging.Debugf(ctx, "Row asset sync type is ASSET; using registration.DeleteAsset") |
| rmFunc = controller.DeleteAsset |
| case config.NlyteSyncConfig_ZoneConfig_WRITE_ASSET_AS_UNSPECIFIED: |
| logging.Debugf(ctx, "Using zone default asset sync type for asset %q of row %q", asset.GetName(), asset.GetLocation().GetRow()) |
| rmFunc = zoneRmFunc |
| default: |
| logging.Warningf(ctx, "Unknown asset sync type for row %q; using zone default", asset.GetLocation().GetRow()) |
| rmFunc = zoneRmFunc |
| } |
| if _, ok := nlyteAssetsMap.Load(asset.GetName()); ok { |
| // UFS asset was found in Nlyte during sync. |
| continue |
| } |
| // If not found, UFS asset is stale and needs to be deleted. |
| logging.Debugf(ctx, "Marking asset %q for removal", asset.GetName()) |
| err := rmFunc(ctx, asset.GetName()) |
| if err != nil { |
| merr = append(merr, errors.Fmt("unable to delete stale UFS asset %q: %w", asset.GetName(), err)) |
| } |
| } |
| return errors.Join(merr...) |
| } |
| } |
| |
| func getRowSyncOptions(rowName string, zoneCfg *config.NlyteSyncConfig_ZoneConfig) (exists, enabled bool, assetType config.NlyteSyncConfig_ZoneConfig_WriteAssetAs) { |
| assetType = zoneCfg.GetDefaultSyncAssetType() |
| exists, enabled = false, false |
| rowCfg := zoneCfg.GetRowSyncAssetTypes() |
| if rowCfg == nil { |
| return |
| } |
| if v, ok := rowCfg[rowName]; ok { |
| exists, enabled = true, v.GetEnabled() |
| assetType = v.GetRowSyncAssetType() |
| } |
| return |
| } |
| |
| func isUpToDate(a *ufspb.Asset, b *ufspb.Asset) bool { |
| // We can't just do a straight compare with proto.Equals as UFS will change fields like |
| // last update time as part of normal operations. |
| isUpToDate := true |
| |
| isUpToDate = isUpToDate && (a.GetLocation().GetRow() == b.GetLocation().GetRow()) |
| isUpToDate = isUpToDate && (a.GetLocation().GetRack() == b.GetLocation().GetRack()) |
| isUpToDate = isUpToDate && (a.GetLocation().GetPosition() == b.GetLocation().GetPosition()) |
| isUpToDate = isUpToDate && (a.GetLocation().GetZone() == b.GetLocation().GetZone()) |
| |
| return isUpToDate |
| } |