// Copyright 2024 The LUCI Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package bqutil

import (
"context"
"time"
"cloud.google.com/go/bigquery/storage/managedwriter"
"go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc"
"google.golang.org/api/option"
"google.golang.org/grpc"
"google.golang.org/grpc/keepalive"
"google.golang.org/protobuf/proto"
"google.golang.org/protobuf/types/descriptorpb"
"go.chromium.org/luci/common/errors"
"go.chromium.org/luci/grpc/grpcmon"
"go.chromium.org/luci/server/auth"
)

// RowMaxBytes is the maximum number of row bytes to send in one
// BigQuery Storage Write API - AppendRows request. At the time of writing,
// the request size limit for this RPC is 10 MB:
// https://cloud.google.com/bigquery/quotas#write-api-limits.
// The total size of rows in a request must be kept somewhat below this
// limit, as each request carries some additional overhead.
const RowMaxBytes = 9 * 1024 * 1024 // 9 MB

// InvalidRowTagKey is an error tag key used to mark errors caused by invalid
// rows, e.g. rows that cannot be marshaled or that exceed the maximum
// AppendRows request size.
var InvalidRowTagKey = errors.NewTagKey("InvalidRow")

// NewWriterClient returns a new BigQuery managedwriter client for use with
// the given GCP project. The client authenticates as ResultDB itself.
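//
// A minimal usage sketch (the GCP project ID is illustrative, not taken from
// this package):
//
//	client, err := NewWriterClient(ctx, "my-gcp-project")
//	if err != nil {
//		return err
//	}
//	defer client.Close()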
func NewWriterClient(ctx context.Context, gcpProject string) (*managedwriter.Client, error) {
// Create shared client for all writes.
// This will ensure a shared connection pool is used for all writes,
// as recommended by:
// https://cloud.google.com/bigquery/docs/write-api-best-practices#limit_the_number_of_concurrent_connections
creds, err := auth.GetPerRPCCredentials(ctx, auth.AsSelf, auth.WithScopes(auth.CloudOAuthScopes...))
if err != nil {
return nil, errors.Annotate(err, "failed to initialize credentials").Err()
}
return managedwriter.NewClient(ctx, gcpProject,
option.WithGRPCDialOption(grpc.WithStatsHandler(&grpcmon.ClientRPCStatsMonitor{})),
option.WithGRPCDialOption(grpc.WithUnaryInterceptor(otelgrpc.UnaryClientInterceptor())),
option.WithGRPCDialOption(grpc.WithStreamInterceptor(otelgrpc.StreamClientInterceptor())),
option.WithGRPCDialOption(grpc.WithPerRPCCredentials(creds)),
option.WithGRPCDialOption(grpc.WithKeepaliveParams(keepalive.ClientParameters{
Time: time.Minute,
})))
}

// Writer is used to export rows to a BigQuery table.
type Writer struct {
client *managedwriter.Client
tableName string
tableSchemaDescriptor *descriptorpb.DescriptorProto
}

// NewWriter creates a writer for exporting rows to the provided BigQuery
// table via the provided managedwriter client.
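//
// A construction sketch, assuming client is a *managedwriter.Client from
// NewWriterClient and mypb.MyRow is a hypothetical row proto (adapt is
// cloud.google.com/go/bigquery/storage/managedwriter/adapt):
//
//	tableName := managedwriter.TableParentFromParts("my-project", "my_dataset", "my_table")
//	desc, err := adapt.NormalizeDescriptor((&mypb.MyRow{}).ProtoReflect().Descriptor())
//	if err != nil {
//		return err
//	}
//	w := NewWriter(client, tableName, desc)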
func NewWriter(
client *managedwriter.Client,
tableName string,
tableSchemaDescriptor *descriptorpb.DescriptorProto,
) *Writer {
return &Writer{
client: client,
tableName: tableName,
tableSchemaDescriptor: tableSchemaDescriptor,
}
}

// AppendRowsWithDefaultStream writes the given rows to the table's default
// stream. This provides at-least-once semantics (instead of exactly-once).
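//
// A usage sketch, assuming w was built with NewWriter and mypb.MyRow is a
// hypothetical row proto matching the table schema descriptor:
//
//	rows := []proto.Message{&mypb.MyRow{}, &mypb.MyRow{}}
//	if err := w.AppendRowsWithDefaultStream(ctx, rows); err != nil {
//		return err
//	}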
func (s *Writer) AppendRowsWithDefaultStream(ctx context.Context, rows []proto.Message) error {
ms, err := s.client.NewManagedStream(ctx,
managedwriter.WithType(managedwriter.DefaultStream),
managedwriter.WithSchemaDescriptor(s.tableSchemaDescriptor),
managedwriter.WithDestinationTable(s.tableName))
if err != nil {
return err
}
defer ms.Close()
return s.batchAppendRows(ctx, ms, rows)
}

// batchAppendRows chunks rows into batches and appends each batch to the
// provided managed stream.
func (s *Writer) batchAppendRows(ctx context.Context, ms *managedwriter.ManagedStream, rows []proto.Message) error {
batches, err := batch(rows)
if err != nil {
return errors.Annotate(err, "batching rows").Tag(errors.BoolTag{Key: InvalidRowTagKey}).Err()
}
results := make([]*managedwriter.AppendResult, 0, len(batches))
for _, batch := range batches {
encoded := make([][]byte, 0, len(batch))
for _, r := range batch {
b, err := proto.Marshal(r)
if err != nil {
return errors.Annotate(err, "marshal proto").Tag(errors.BoolTag{Key: InvalidRowTagKey}).Err()
}
encoded = append(encoded, b)
}
result, err := ms.AppendRows(ctx, encoded)
if err != nil {
return errors.Annotate(err, "start appending rows").Err()
}
// Defer waiting on AppendRows until after all batches sent out.
// https://cloud.google.com/bigquery/docs/write-api-best-practices#do_not_block_on_appendrows_calls
results = append(results, result)
}
for _, result := range results {
_, err := result.GetResult(ctx)
if err != nil {
return errors.Annotate(err, "appending rows").Err()
}
}
return nil
}

// batch divides the rows to be inserted into batches, with each
// batch having an on-the-wire size not exceeding RowMaxBytes.
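//
// For example (sizes illustrative): rows with estimated sizes of 4 MB, 4 MB
// and 3 MB are split into two batches, [4 MB, 4 MB] and [3 MB], because
// adding the third row would push the first batch over RowMaxBytes (9 MB).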
func batch(rows []proto.Message) ([][]proto.Message, error) {
var result [][]proto.Message
batchStartIndex := 0
batchSizeInBytes := 0
for i, row := range rows {
rowSize := RowSize(row)
if (batchSizeInBytes + rowSize) > RowMaxBytes {
if rowSize > RowMaxBytes {
return nil, errors.Reason("a single row exceeds the maximum BigQuery AppendRows request size of %v bytes", RowMaxBytes).Err()
}
// Output batch from batchStartIndex (inclusive) to i (exclusive).
result = append(result, rows[batchStartIndex:i])
// The current row becomes part of the next batch.
batchStartIndex = i
batchSizeInBytes = 0
}
batchSizeInBytes += rowSize
}
lastBatch := rows[batchStartIndex:]
if len(lastBatch) > 0 {
result = append(result, lastBatch)
}
return result, nil
}

// RowSize returns the estimated on-the-wire size of a row, as used for batching.
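//
// For example, a row whose marshaled proto is 1,000 bytes is counted as
// 1,016 bytes when assembling batches.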
func RowSize(row proto.Message) int {
// Assume 16 bytes of overhead per row not captured here.
return proto.Size(row) + 16
}