// Copyright 2018 The LUCI Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package bq

import (
"context"
"fmt"
"math"
"strconv"
"strings"
"sync"
"time"
"cloud.google.com/go/bigquery"
"golang.org/x/exp/slices"
"google.golang.org/protobuf/encoding/protojson"
"google.golang.org/protobuf/proto"
"google.golang.org/protobuf/reflect/protoreflect"
"google.golang.org/protobuf/types/known/durationpb"
"google.golang.org/protobuf/types/known/structpb"
"google.golang.org/protobuf/types/known/timestamppb"
"go.chromium.org/luci/common/errors"
"go.chromium.org/luci/common/logging"
"go.chromium.org/luci/common/sync/parallel"
"go.chromium.org/luci/common/tsmon/field"
"go.chromium.org/luci/common/tsmon/metric"
)

// ID is the global InsertIDGenerator.
var ID InsertIDGenerator

// insertLimit is the maximum number of rows to send in a single insert
// request.
const insertLimit = 10000

// batchDefault is the number of rows per batch when BatchSize is unset.
const batchDefault = 500

// Uploader contains the necessary data for streaming data to BigQuery.
type Uploader struct {
	*bigquery.Inserter
	// Uploader is bound to a specific table. DatasetID and TableID are
	// provided for reference.
	DatasetID string
	TableID   string
// UploadsMetricName is a string used to create a tsmon Counter metric
// for event upload attempts via Put, e.g.
// "/chrome/infra/commit_queue/events/count". If unset, no metric will
// be created.
UploadsMetricName string
// uploads is the Counter metric described by UploadsMetricName. It
	// contains a field "status" set to either "success" or "failure".
uploads metric.Counter
initMetricOnce sync.Once
// BatchSize is the max number of rows to send to BigQuery at a time.
// The default is 500.
BatchSize int
}

// Row implements bigquery.ValueSaver.
type Row struct {
proto.Message // embedded
// InsertID is unique per insert operation to handle deduplication.
InsertID string
}

// Save is used by bigquery.Inserter.Put when inserting values into a table.
func (r *Row) Save() (map[string]bigquery.Value, string, error) {
m, err := mapFromMessage(r.Message, nil)
return m, r.InsertID, err
}

// mapFromMessage returns a {BQ field name: BQ value} map.
// path holds the proto field names (plus list indices and map keys) leading
// to pm; it is used only in error messages.
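//
// For illustration, a hypothetical message
//
//	message Build { string id = 1; repeated string tags = 2; }
//
// with id "b1" and tags ["release"] maps to
//
//	map[string]bigquery.Value{"id": "b1", "tags": []any{"release"}}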
func mapFromMessage(pm proto.Message, path []string) (map[string]bigquery.Value, error) {
type kvPair struct {
key string
val protoreflect.Value
}
m := pm.ProtoReflect()
if !m.IsValid() {
return nil, nil
}
fields := m.Descriptor().Fields()
var row map[string]bigquery.Value // keep it nil unless there are values
path = append(path, "")
for i := 0; i < fields.Len(); i++ {
var bqValue any
var err error
field := fields.Get(i)
fieldValue := m.Get(field)
bqField := string(field.Name())
path[len(path)-1] = bqField
switch {
case field.IsList():
list := fieldValue.List()
elems := make([]any, 0, list.Len())
vPath := append(path, "")
for i := 0; i < list.Len(); i++ {
vPath[len(vPath)-1] = strconv.Itoa(i)
elemValue, err := getValue(field, list.Get(i), vPath)
if err != nil {
return nil, errors.Annotate(err, "%s[%d]", bqField, i).Err()
}
if elemValue == nil {
continue
}
elems = append(elems, elemValue)
}
if len(elems) == 0 {
continue
}
bqValue = elems
case field.IsMap():
if field.MapKey().Kind() != protoreflect.StringKind {
return nil, fmt.Errorf("map key must be a string")
}
mapValue := fieldValue.Map()
if mapValue.Len() == 0 {
continue
}
pairs := make([]kvPair, 0, mapValue.Len())
mapValue.Range(func(key protoreflect.MapKey, value protoreflect.Value) bool {
pairs = append(pairs, kvPair{key.String(), value})
return true
})
			slices.SortFunc(pairs, func(i, j kvPair) int {
				return strings.Compare(i.key, j.key)
			})
valueDesc := field.MapValue()
elems := make([]any, mapValue.Len())
vPath := append(path, "")
for i, pair := range pairs {
vPath[len(vPath)-1] = pair.key
elemValue, err := getValue(valueDesc, pair.val, vPath)
if err != nil {
return nil, errors.Annotate(err, "%s[%s]", bqField, pair.key).Err()
}
elems[i] = map[string]bigquery.Value{
"key": pair.key,
"value": elemValue,
}
}
bqValue = elems
default:
if bqValue, err = getValue(field, fieldValue, path); err != nil {
return nil, errors.Annotate(err, "%s", bqField).Err()
} else if bqValue == nil {
// Omit NULL/nil values
continue
}
}
if row == nil {
row = map[string]bigquery.Value{}
}
row[bqField] = bigquery.Value(bqValue)
}
return row, nil
}
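
// getValue converts a single proto field value to its BigQuery form: enum
// values become their names, google.protobuf.Duration becomes seconds as
// FLOAT64, google.protobuf.Timestamp becomes a time.Time, and
// google.protobuf.Struct becomes a JSONPB string; any other message type
// recurses through mapFromMessage. path is used only for error messages.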
func getValue(field protoreflect.FieldDescriptor, value protoreflect.Value, path []string) (any, error) {
// enums and primitives
	if enumField := field.Enum(); enumField != nil {
		enumValue := enumField.Values().ByNumber(value.Enum())
		if enumValue == nil {
			// Proto3 allows enum numbers with no declared value; render them
			// numerically rather than dereferencing a nil descriptor.
			return strconv.Itoa(int(value.Enum())), nil
		}
		return string(enumValue.Name()), nil
	}
if field.Kind() != protoreflect.MessageKind && field.Kind() != protoreflect.GroupKind {
return value.Interface(), nil
}
// structs
messageInterface := value.Message().Interface()
if dpb, ok := messageInterface.(*durationpb.Duration); ok {
if dpb == nil {
return nil, nil
}
if err := dpb.CheckValid(); err != nil {
return nil, fmt.Errorf("tried to write an invalid duration for [%+v] for field %q", dpb, strings.Join(path, "."))
}
value := dpb.AsDuration()
// Convert to FLOAT64.
return value.Seconds(), nil
}
if tspb, ok := messageInterface.(*timestamppb.Timestamp); ok {
if tspb == nil {
return nil, nil
}
if err := tspb.CheckValid(); err != nil {
return nil, fmt.Errorf("tried to write an invalid timestamp for [%+v] for field %q", tspb, strings.Join(path, "."))
}
value := tspb.AsTime()
return value, nil
}
if s, ok := messageInterface.(*structpb.Struct); ok {
if s == nil {
return nil, nil
}
// Structs are persisted as JSONPB strings.
// See also https://bit.ly/chromium-bq-struct
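		// For example, a Struct holding a single field "a" with value 1 is
		// persisted as the string `{"a":1}` (illustrative; protojson output
		// may vary in insignificant whitespace).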
var buf []byte
var err error
if buf, err = protojson.Marshal(s); err != nil {
return nil, err
}
return string(buf), nil
}
message, err := mapFromMessage(messageInterface, path)
if message == nil {
// a nil map is not nil when converted to any,
// so return nil explicitly.
return nil, err
}
return message, err
}

// NewUploader constructs a new Uploader struct.
//
// DatasetID and TableID are provided to the BigQuery client to
// gain access to a particular table.
//
// You may want to change the default configuration of the bigquery.Inserter.
// Check the documentation for more details.
//
// Set UploadsMetricName on the resulting Uploader to use the default counter
// metric.
//
// Set BatchSize to set a custom batch size.
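//
// A minimal sketch (project, dataset, and table names are illustrative):
//
//	client, err := bigquery.NewClient(ctx, "my-project")
//	if err != nil {
//		// handle the error
//	}
//	up := NewUploader(ctx, client, "my_dataset", "my_table")
//	up.UploadsMetricName = "/chrome/infra/my_app/events/count"
//	up.BatchSize = 100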
func NewUploader(ctx context.Context, c *bigquery.Client, datasetID, tableID string) *Uploader {
return &Uploader{
DatasetID: datasetID,
TableID: tableID,
Inserter: c.Dataset(datasetID).Table(tableID).Inserter(),
}
}
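
// batchSize returns the effective batch size: BatchSize clamped to
// insertLimit, or batchDefault when BatchSize is unset.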
func (u *Uploader) batchSize() int {
switch {
case u.BatchSize > insertLimit:
return insertLimit
case u.BatchSize <= 0:
return batchDefault
default:
return u.BatchSize
}
}
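
// getCounter lazily initializes the uploads counter on first use; it returns
// nil when UploadsMetricName is unset.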
func (u *Uploader) getCounter() metric.Counter {
u.initMetricOnce.Do(func() {
if u.UploadsMetricName != "" {
desc := "Upload attempts; status is 'success' or 'failure'"
field := field.String("status")
u.uploads = metric.NewCounter(u.UploadsMetricName, desc, nil, field)
}
})
return u.uploads
}
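
// updateUploads adds count to the uploads counter, if configured, with the
// given status ("success" or "failure").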
func (u *Uploader) updateUploads(ctx context.Context, count int64, status string) {
if uploads := u.getCounter(); uploads != nil && count != 0 {
uploads.Add(ctx, count, status)
}
}

// Put uploads one or more rows to the BigQuery service. Put takes care of
// adding InsertIDs, used by BigQuery to deduplicate rows.
//
// If any row does not match one of the expected types, Put does not attempt
// to upload any rows and returns an InvalidTypeError.
//
// Put returns a PutMultiError if one or more rows failed to be uploaded.
// The PutMultiError contains a RowInsertionError for each failed row.
//
// Put will retry on transient errors; if the error persists, the call could
// otherwise run indefinitely. Because of this, if ctx does not have a
// deadline, Put adds a one-minute timeout.
//
// See bigquery documentation and source code for detailed information on how
// struct values are mapped to rows.
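//
// A hypothetical call site (MyEvent stands in for any generated proto
// message; "up" is an *Uploader):
//
//	var events []proto.Message
//	events = append(events, &pb.MyEvent{Name: "build-finished"})
//	if err := up.Put(ctx, events...); err != nil {
//		// err may be a bigquery.PutMultiError with per-row failures.
//	}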
func (u *Uploader) Put(ctx context.Context, messages ...proto.Message) error {
if _, ok := ctx.Deadline(); !ok {
var cancel context.CancelFunc
ctx, cancel = context.WithTimeout(ctx, time.Minute)
defer cancel()
}
rows := make([]*Row, len(messages))
for i, m := range messages {
rows[i] = &Row{
Message: m,
InsertID: ID.Generate(),
}
}
return parallel.WorkPool(16, func(workC chan<- func() error) {
for _, rowSet := range batch(rows, u.batchSize()) {
rowSet := rowSet
workC <- func() error {
var failed int
err := u.Inserter.Put(ctx, rowSet)
if err != nil {
logging.WithError(err).Errorf(ctx, "eventupload: Uploader.Put failed")
if merr, ok := err.(bigquery.PutMultiError); ok {
if failed = len(merr); failed > len(rowSet) {
logging.Errorf(ctx, "eventupload: %v failures trying to insert %v rows", failed, len(rowSet))
}
} else {
failed = len(rowSet)
}
u.updateUploads(ctx, int64(failed), "failure")
}
succeeded := len(rowSet) - failed
u.updateUploads(ctx, int64(succeeded), "success")
return err
}
}
})
}
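
// batch splits rows into consecutive subslices of at most batchSize rows
// each; the subslices share rows' backing array.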
func batch(rows []*Row, batchSize int) [][]*Row {
	// Convert to float64 before dividing so the ceiling accounts for a final
	// partial batch (integer division would truncate first).
	rowSetsLen := int(math.Ceil(float64(len(rows)) / float64(batchSize)))
rowSets := make([][]*Row, 0, rowSetsLen)
for len(rows) > 0 {
batch := rows
if len(batch) > batchSize {
batch = batch[:batchSize]
}
rowSets = append(rowSets, batch)
rows = rows[len(batch):]
}
return rowSets
}