blob: 0cf52a467d68c01609ee652afcbe9c99bdc3ea0b [file] [log] [blame]
// Copyright 2024 The LUCI Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package artifactexporter
import (
"context"
"fmt"
"cloud.google.com/go/bigquery"
"cloud.google.com/go/bigquery/storage/managedwriter"
"google.golang.org/protobuf/proto"
"go.chromium.org/luci/common/bq"
"go.chromium.org/luci/common/errors"
"go.chromium.org/luci/resultdb/bqutil"
bqpb "go.chromium.org/luci/resultdb/proto/bq"
)
// NewClient builds a Client that exports artifacts through the
// BigQuery Write API, using clients scoped to the given GCP project.
//
// An error is returned if projectID is empty, or if either the BigQuery
// client or the managed writer client cannot be created.
func NewClient(ctx context.Context, projectID string) (s *Client, reterr error) {
	if projectID == "" {
		return nil, errors.New("GCP Project must be specified")
	}

	bqc, err := bqutil.Client(ctx, projectID)
	if err != nil {
		return nil, errors.Annotate(err, "creating BQ client").Err()
	}

	mwc, err := bqutil.NewWriterClient(ctx, projectID)
	if err != nil {
		// The BigQuery client was already created; release it before
		// bailing out. This is best-effort cleanup, so any error from
		// Close() is deliberately dropped in favour of the original one.
		_ = bqc.Close()
		return nil, errors.Annotate(err, "creating managed writer client").Err()
	}

	client := &Client{
		projectID: projectID,
		bqClient:  bqc,
		mwClient:  mwc,
	}
	return client, nil
}
// Close releases the resources held by the client by closing both the
// BigQuery client and the managed writer client.
//
// An error from closing the BigQuery client takes precedence; the
// managed writer's close error is reported only when the BigQuery
// client closed cleanly.
func (c *Client) Close() (reterr error) {
	// The managed writer is closed from a deferred function so that it
	// is still attempted when closing the BigQuery client fails or panics.
	defer func() {
		if mwErr := c.mwClient.Close(); reterr == nil {
			reterr = mwErr
		}
	}()

	reterr = c.bqClient.Close()
	return reterr
}
// Client provides methods to export artifacts to BigQuery
// via the BigQuery Write API.
//
// Instances are created with NewClient and must be released with Close.
type Client struct {
	// projectID is the name of the GCP project that contains ResultDB
	// BigQuery datasets.
	projectID string
	// bqClient is the BigQuery client used to look up the destination
	// table when ensuring its schema.
	bqClient *bigquery.Client
	// mwClient is the managed writer client handed to the row writer
	// when inserting rows.
	mwClient *managedwriter.Client
}
// schemaApplyer ensures BQ schema matches the row proto definitions.
// It is a package-level value, so it is shared by every Client.
// NOTE(review): the argument 1 passed to RegisterSchemaApplyerCache is
// presumably the cache capacity — confirm against the bq package.
var schemaApplyer = bq.NewSchemaApplyer(bq.RegisterSchemaApplyerCache(1))
// EnsureSchema asks the shared schemaApplyer to ensure the text
// artifacts table in the internal dataset, passing it the package's
// tableMetadata together with the bq.UpdateMetadata() option.
//
// Any failure is returned annotated with "ensuring text artifacts table".
func (c *Client) EnsureSchema(ctx context.Context) error {
	dataset := c.bqClient.Dataset(bqutil.InternalDatasetID)
	err := schemaApplyer.EnsureTable(ctx, dataset.Table(tableName), tableMetadata, bq.UpdateMetadata())
	if err != nil {
		return errors.Annotate(err, "ensuring text artifacts table").Err()
	}
	return nil
}
// InsertArtifactRows writes the given rows to the text artifacts table
// in BigQuery.
//
// The table schema is ensured first (see EnsureSchema); if that fails,
// nothing is written and the error is returned annotated with
// "ensure schema". Otherwise the rows are appended through the
// writer's default stream and its result is returned as-is.
func (c *Client) InsertArtifactRows(ctx context.Context, rows []*bqpb.TextArtifactRow) error {
	if err := c.EnsureSchema(ctx); err != nil {
		return errors.Annotate(err, "ensure schema").Err()
	}

	// Fully-qualified resource path of the destination table. Kept under
	// a distinct name so it does not shadow the package-level tableName.
	destination := fmt.Sprintf("projects/%s/datasets/%s/tables/%s", c.projectID, bqutil.InternalDatasetID, tableName)
	w := bqutil.NewWriter(c.mwClient, destination, tableSchemaDescriptor)

	// The writer takes generic proto messages, so widen each row.
	messages := make([]proto.Message, 0, len(rows))
	for _, row := range rows {
		messages = append(messages, row)
	}
	return w.AppendRowsWithDefaultStream(ctx, messages)
}