blob: ce1f23888f049e284bbd2b142f942a7d0c564b80 [file] [log] [blame]
// Copyright 2021 The LUCI Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
// Package bq handles sending rows to BigQuery.
package bq
import (
"context"
"net/http"
"cloud.google.com/go/bigquery"
"google.golang.org/api/option"
"google.golang.org/protobuf/proto"
lucibq "go.chromium.org/luci/common/bq"
"go.chromium.org/luci/common/errors"
"go.chromium.org/luci/common/retry/transient"
"go.chromium.org/luci/server/auth"
)
// Row encapsulates destination and actual row to send.
//
// Exists to avoid confusion over multiple string arguments to SendRow.
type Row struct {
	// CloudProject allows sending rows to other projects.
	//
	// Optional. Defaults to the one in the scope of which this process is running
	// (e.g. "luci-change-verifier-dev").
	CloudProject string
	// Dataset is the ID of the BigQuery dataset containing the destination
	// table.
	Dataset string
	// Table is the ID of the destination table within Dataset.
	Table string
	// OperationID is sent along with the row as its insert ID.
	//
	// It is used for de-duplication, but over just a 1 minute window :(
	OperationID string
	// Payload is the proto message to send as the row.
	Payload proto.Message
}
// Client sends rows to BigQuery.
//
// It is an interface so that the production implementation can be swapped
// out for another one.
type Client interface {
	// SendRow appends a row to a BigQuery table synchronously.
	//
	// The destination and the content of the row are described by the given
	// Row. Returns nil if the row was sent successfully.
	SendRow(ctx context.Context, row Row) error
}
// NewProdClient creates a new production client.
//
// The specified cloud project should be the one, in the scope of which this
// code is running, e.g. "luci-change-verifier-dev". It is the project used for
// rows which do not specify Row.CloudProject.
//
// The returned client talks to BigQuery over a transport authenticated as the
// service itself (auth.AsSelf) with the Cloud OAuth scopes.
//
// Returns an annotated error if either the authenticated transport or the
// underlying BigQuery client cannot be created.
func NewProdClient(ctx context.Context, cloudProject string) (*prodClient, error) {
	t, err := auth.GetRPCTransport(ctx, auth.AsSelf, auth.WithScopes(auth.CloudOAuthScopes...))
	if err != nil {
		// Annotate like the error below, so that the caller can tell which step
		// of the client construction failed.
		return nil, errors.Annotate(err, "failed to get RPC transport for BQ client").Err()
	}
	b, err := bigquery.NewClient(ctx, cloudProject, option.WithHTTPClient(&http.Client{Transport: t}))
	if err != nil {
		return nil, errors.Annotate(err, "failed to create BQ client").Err()
	}
	return &prodClient{b: b}, nil
}
// prodClient implements a BigQuery Client for production.
type prodClient struct {
	// b is the underlying BigQuery client used to send rows.
	b *bigquery.Client
}
// SendRow sends a row to a real BigQuery table.
//
// The row goes to row.Table in row.Dataset of row.CloudProject, or of the
// client's own project if row.CloudProject is empty. row.OperationID is used
// as the insert ID of the row.
//
// If the insertion fails with a non-empty bigquery.PutMultiError, the error is
// annotated as a "bad row". Any other error is tagged as transient.
func (c *prodClient) SendRow(ctx context.Context, row Row) error {
	// Resolve the destination dataset, falling back to the client's project.
	var dataset *bigquery.Dataset
	switch row.CloudProject {
	case "":
		dataset = c.b.Dataset(row.Dataset)
	default:
		dataset = c.b.DatasetInProject(row.CloudProject, row.Dataset)
	}

	inserter := dataset.Table(row.Table).Inserter()
	err := inserter.Put(ctx, &lucibq.Row{
		Message:  row.Payload,
		InsertID: row.OperationID,
	})
	if err == nil {
		return nil
	}

	// A non-empty PutMultiError carries per-row insertion errors.
	if rowErrs, ok := err.(bigquery.PutMultiError); ok && len(rowErrs) != 0 {
		return errors.Annotate(err, "bad row").Err()
	}
	return errors.Annotate(err, "unknown error sending row").Tag(transient.Tag).Err()
}