blob: 8f038b01e97eb81242d6d4c5fa6cb8c2c9fd4567 [file] [log] [blame]
// Copyright 2025 The ChromiumOS Authors
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
package service
import (
"context"
"encoding/json"
"fmt"
"log"
"cloud.google.com/go/bigquery"
"go.chromium.org/chromiumos/config/go/test/api"
"go.chromium.org/chromiumos/config/go/test/artifact"
"google.golang.org/api/iterator"
)
const (
	// TesthausGCPProject is the ID of the Testhaus GCP project.
	TesthausGCPProject = "cros-test-analytics"

	// TestInfoDataset is the name of the test info BigQuery dataset.
	TestInfoDataset = "testinfo"

	// EQCInfoTable is the name of the EqC info BigQuery table.
	EQCInfoTable = "eqc_info"

	// EQCSubjectKey is the 3D EqC subject key under which the EqC info is
	// looked up in the scheduling publish key map.
	EQCSubjectKey = "3D"
)
// EQCPublishService publishes 3D EqC info to BigQuery.
//
// The service holds a BigQuery client, so callers must call Close once they
// are done with the service to release that client.
type EQCPublishService struct {
	// bqClient is the underlying BigQuery client used to query and insert.
	bqClient *bigquery.Client

	// eqcInfo is the EqC info related to the current test results.
	eqcInfo *artifact.EqcInfo
}
// EQCRow is a single row of the EqC info BigQuery table.
type EQCRow struct {
	// EQCHash is the EqC hash.
	EQCHash string

	// EQCName is the EqC name.
	EQCName string

	// EQCCategoryExpression is the EqC category expression as a string map.
	EQCCategoryExpression map[string]string

	// EQCDimensions holds the EqC dimensions as a string map.
	EQCDimensions map[string]string
}
// NewEQCPublishService creates an EqC publish service that talks to BigQuery
// in the Testhaus GCP project.
//
// The EqC info is extracted from req before the BigQuery client is created, so
// a request whose EqC info cannot be extracted fails without a client ever
// being opened and torn down again. On success the caller must call Close on
// the returned service.
//
// It returns an error if req is nil, if the EqC info cannot be extracted from
// req, or if the BigQuery client cannot be created.
func NewEQCPublishService(ctx context.Context, req *api.PublishRequest) (*EQCPublishService, error) {
	if req == nil {
		return nil, fmt.Errorf("publish request is nil")
	}

	// Extract first: if this fails there is no client that needs closing.
	eqcInfo, err := EqcInfo(req)
	if err != nil {
		return nil, fmt.Errorf("extracting the EqC info from test results: %w", err)
	}

	client, err := bigquery.NewClient(ctx, TesthausGCPProject)
	if err != nil {
		return nil, fmt.Errorf("creating the BQ client: %w", err)
	}

	return &EQCPublishService{
		bqClient: client,
		eqcInfo:  eqcInfo,
	}, nil
}
// Close closes the BigQuery client held by the EqC publish service and
// returns any error from doing so, wrapped with context.
func (eps *EQCPublishService) Close() error {
	if err := eps.bqClient.Close(); err != nil {
		return fmt.Errorf("close BQ client: %w", err)
	}
	return nil
}
// ExportToBQ writes the service's EqC info to the EqC info BigQuery table.
//
// The export is skipped, and nil is returned, when the service has no EqC
// info, when the EqC hash is empty, or when checkExists reports that the EqC
// info is already in the table. Otherwise one EQCRow built from the EqC info
// is inserted. An error is returned if the existence check or the insert
// fails.
func (eps *EQCPublishService) ExportToBQ(ctx context.Context) error {
	info := eps.eqcInfo
	log.Printf("Exporting the EqC info: %#v to BQ", info)

	// Without EqC info, or without a hash, there is nothing to export.
	switch {
	case info == nil:
		log.Printf("Skipping the BQ export because the EqC info is empty")
		return nil
	case info.GetEqcHash() == "":
		log.Printf("Skipping the BQ export because the EqC hash is empty: %#v", info)
		return nil
	}

	found, err := eps.checkExists(ctx, info)
	if err != nil {
		return fmt.Errorf("checking if the EqC info already exists: %w", err)
	}
	if found {
		log.Printf("Skipping the BQ export because the EqC info already exists: %#v", info)
		return nil
	}

	// The EqC info is not in the table yet, so insert it as a new row.
	table := eps.bqClient.Dataset(TestInfoDataset).Table(EQCInfoTable)
	row := &EQCRow{
		EQCHash:               info.EqcHash,
		EQCName:               info.EqcName,
		EQCCategoryExpression: info.EqcCategoryExpression,
		EQCDimensions:         info.EqcDimensions,
	}
	if err := table.Inserter().Put(ctx, row); err != nil {
		return fmt.Errorf("exporting the EqC info: %#v to BQ: %w", info, err)
	}

	log.Printf("Successfully exported the EqC info: %#v to BQ", info)
	return nil
}
// checkExists reports whether a row with the hash of the given EqC info is
// already present in the EqC info BigQuery table.
//
// Only existence matters here, so the query selects a constant and stops at
// the first match instead of fetching every column of every matching row. The
// result is read into a generic value slice rather than an EQCRow, because
// the EQCRow fields do not line up with the columns written by EQCRow.Save
// (different names, and maps where Save writes JSON strings).
//
// It returns (true, nil) when a matching row is found, (false, nil) when there
// is none, and (false, err) when the query cannot be run or its results
// cannot be read.
func (eps *EQCPublishService) checkExists(ctx context.Context, eqcInfo *artifact.EqcInfo) (bool, error) {
	// The hash is bound as a named query parameter; only the fixed table path
	// is formatted into the statement text.
	q := eps.bqClient.Query(fmt.Sprintf(
		"SELECT 1 FROM %s.%s.%s WHERE eqc_hash = @eqc_hash LIMIT 1",
		TesthausGCPProject, TestInfoDataset, EQCInfoTable))
	q.Parameters = []bigquery.QueryParameter{
		{
			Name: "eqc_hash",
			// The getter keeps a nil eqcInfo from panicking here.
			Value: eqcInfo.GetEqcHash(),
		},
	}

	// Runs the query and waits for it to finish.
	job, err := q.Run(ctx)
	if err != nil {
		return false, fmt.Errorf("running the BQ query: %w", err)
	}
	status, err := job.Wait(ctx)
	if err != nil {
		return false, fmt.Errorf("waiting for the BQ job: %w", err)
	}
	if err := status.Err(); err != nil {
		return false, fmt.Errorf("checking the BQ job status: %w", err)
	}

	// A single successful Next is enough to prove that a match exists.
	it, err := job.Read(ctx)
	if err != nil {
		return false, fmt.Errorf("reading the BQ job results: %w", err)
	}
	var row []bigquery.Value
	err = it.Next(&row)
	if err == iterator.Done {
		log.Printf("EqC info doesn't exist in the BQ: %#v", eqcInfo)
		return false, nil
	}
	if err != nil {
		return false, fmt.Errorf("checking if the EqC info already exists: %w", err)
	}
	log.Printf("EqC info already exists in the BQ: %#v", eqcInfo)
	return true, nil
}
// Save implements the bigquery.ValueSaver interface for EQCRow.
//
// The hash and name are written as-is to the "eqc_hash" and "eqc_name"
// columns. The category expression and dimensions maps are written to
// "eqc_category_expression" and "eqc_dimensions" as JSON object strings; a nil
// map is written as "{}" rather than "null". The EqC hash is also returned as
// the row's insert ID.
//
// Save does not modify the receiver. It returns an error if either map cannot
// be marshalled to JSON.
func (e *EQCRow) Save() (map[string]bigquery.Value, string, error) {
	// encode renders m as a JSON object string. A nil map is replaced by an
	// empty one locally, so the output is "{}" and the row itself is left
	// untouched.
	encode := func(m map[string]string) (string, error) {
		if m == nil {
			m = map[string]string{}
		}
		b, err := json.Marshal(m)
		if err != nil {
			return "", err
		}
		return string(b), nil
	}

	categoryExpression, err := encode(e.EQCCategoryExpression)
	if err != nil {
		return nil, "", fmt.Errorf("marshalling the EqC category expression: %w", err)
	}
	dimensions, err := encode(e.EQCDimensions)
	if err != nil {
		return nil, "", fmt.Errorf("marshalling the EqC dimensions: %w", err)
	}

	return map[string]bigquery.Value{
		"eqc_hash":                e.EQCHash,
		"eqc_name":                e.EQCName,
		"eqc_category_expression": categoryExpression,
		"eqc_dimensions":          dimensions,
	}, e.EQCHash, nil
}
// EqcInfo builds the EqC info from the publish keys in the metadata of the
// publish request.
//
// The key/values of the first publish key whose subject is EQCSubjectKey are
// used. If there is no such publish key, or it carries no key/values, an
// empty EqcInfo is returned. An error is returned if the metadata cannot be
// unpacked, or if the "eqcCategoryExpression" or "eqcDimensions" value cannot
// be unmarshalled from JSON.
func EqcInfo(req *api.PublishRequest) (*artifact.EqcInfo, error) {
	log.Printf("Started to extract the EqC info from the request: %#v", req)

	metadata, err := UnpackMetadata(req)
	if err != nil {
		return nil, fmt.Errorf("unpacking the metadata in the publish request: %w", err)
	}

	// Pick the key/values published under the 3D EqC subject, if any.
	var kvs map[string]string
	for _, publishKey := range metadata.GetPublishKeys() {
		if publishKey.GetSubject() != EQCSubjectKey {
			continue
		}
		kvs = publishKey.GetKeyValues()
		break
	}
	if len(kvs) == 0 {
		return &artifact.EqcInfo{}, nil
	}

	info := &artifact.EqcInfo{
		EqcHash:               kvs["eqcHash"],
		EqcName:               kvs["eqcName"],
		EqcCategoryExpression: map[string]string{},
		EqcDimensions:         map[string]string{},
	}

	// The content of the "eqcCategoryExpression" field is aligned with the
	// CategoryExpression proto in "/ttcp/protos/ttcp/syntax/syntax.proto".
	if rawExpression, ok := kvs["eqcCategoryExpression"]; ok {
		var expression map[string]interface{}
		if err := json.Unmarshal([]byte(rawExpression), &expression); err != nil {
			return nil, fmt.Errorf("unmarshalling the EqC category expression: %w", err)
		}
		for key, val := range expression {
			// A string corresponds to the name field, i.e. the name of a
			// predefined category. This is by far the most common case
			// (> 99.99%) and the string is stored unchanged.
			if name, isString := val.(string); isString {
				info.EqcCategoryExpression[key] = name
				continue
			}
			// Anything else corresponds to the category proto field, which
			// spells out the category contents; it is stored re-encoded as
			// JSON. A key whose value cannot be encoded is logged and left
			// out.
			encoded, err := json.Marshal(val)
			if err != nil {
				log.Printf("Failed to marshal the category expression key: %s", key)
				continue
			}
			info.EqcCategoryExpression[key] = string(encoded)
		}
	}

	// The dimensions are decoded straight into the string map of the result.
	if rawDimensions, ok := kvs["eqcDimensions"]; ok {
		if err := json.Unmarshal([]byte(rawDimensions), &info.EqcDimensions); err != nil {
			return nil, fmt.Errorf("unmarshalling the EqC dimensions: %w", err)
		}
	}

	log.Printf("Successfully extracted the EqC info: %#v", info)
	return info, nil
}