blob: b2b8270b89743e8f19ae5592e8855ec2324eecda [file] [log] [blame]
// Copyright 2018 The LUCI Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package main
import (
"context"
"flag"
"fmt"
"io/ioutil"
"log"
"net/url"
"os"
"os/exec"
"path/filepath"
"strings"
"time"
"cloud.google.com/go/bigquery"
"google.golang.org/api/option"
"google.golang.org/protobuf/proto"
"google.golang.org/protobuf/types/descriptorpb"
"go.chromium.org/luci/auth"
"go.chromium.org/luci/common/bq"
"go.chromium.org/luci/common/errors"
luciflag "go.chromium.org/luci/common/flag"
"go.chromium.org/luci/common/flag/stringlistflag"
"go.chromium.org/luci/common/logging"
"go.chromium.org/luci/common/logging/gologger"
"go.chromium.org/luci/common/proto/google/descutil"
"go.chromium.org/luci/common/proto/protoc"
"go.chromium.org/luci/hardcoded/chromeinfra"
)
// canceledByUser tags errors caused by the user declining a confirmation
// prompt. main checks for this tag and exits with status 1 without logging.
var canceledByUser = errors.BoolTag{
	Key: errors.NewTagKey("operation canceled by user"),
}

// errCanceledByUser is returned by updateFromTableDef when the user answers
// anything other than "y"/"Y" to a confirmation prompt.
var errCanceledByUser = errors.Reason("operation canceled by user").Tag(canceledByUser).Err()
// tableDef describes the desired state of a BigQuery table.
type tableDef struct {
	// ProjectID, DataSetID and TableID identify the table. TableID is "-"
	// when the schema should only be dumped to stdout.
	ProjectID, DataSetID, TableID string

	// FriendlyName and Description become the table's name and description
	// metadata, both on creation and on update.
	FriendlyName, Description string

	// Time partitioning settings. They are applied only when a new table is
	// created, and only if PartitioningDisabled is false.
	PartitioningDisabled   bool
	PartitioningExpiration time.Duration
	PartitioningField      string
	PartitioningType       string

	// Schema is the table schema derived from the proto message.
	Schema bigquery.Schema

	// ClusteringFields lists clustering columns in significant order. They are
	// applied only when a new table is created.
	ClusteringFields []string
}
// updateFromTableDef brings the BigQuery table described by td in line with td.
//
// If the table does not exist yet, its schema is printed and, after
// confirmation (skipped when force is true), the table is created with td's
// name, description, schema, optional time partitioning and optional
// clustering.
//
// If the table already exists, fields present in the live table but missing
// from td.Schema are carried over. Any schema diff is printed and must be
// confirmed. Then the name, description and schema are pushed to the table.
//
// It returns errCanceledByUser when the user declines a confirmation prompt.
func updateFromTableDef(ctx context.Context, force bool, ts tableStore, td tableDef) error {
	fullID := fmt.Sprintf("%s.%s.%s", td.ProjectID, td.DataSetID, td.TableID)
	rule := strings.Repeat("=", 80)

	// printFramed prints body between two horizontal rules.
	printFramed := func(body string) {
		fmt.Println(rule)
		fmt.Println(body)
		fmt.Println(rule)
	}
	// approved reports whether a pending change may go ahead; the user is
	// only asked when force is false.
	approved := func() bool {
		return force || confirm("Continue")
	}

	existing, err := ts.getTableMetadata(ctx, td.DataSetID, td.TableID)
	if isNotFound(err) {
		fmt.Printf("Table %q does not exist.\n", fullID)
		fmt.Println("It will be created with the following schema:")
		printFramed(bq.SchemaString(td.Schema))
		if !approved() {
			return errCanceledByUser
		}

		meta := &bigquery.TableMetadata{
			Name:        td.FriendlyName,
			Description: td.Description,
			Schema:      td.Schema,
		}
		if !td.PartitioningDisabled {
			meta.TimePartitioning = &bigquery.TimePartitioning{
				Expiration: td.PartitioningExpiration,
				Field:      td.PartitioningField,
				Type:       bigquery.TimePartitioningType(td.PartitioningType),
			}
		}
		if len(td.ClusteringFields) > 0 {
			meta.Clustering = &bigquery.Clustering{Fields: td.ClusteringFields}
		}
		if err := ts.createTable(ctx, td.DataSetID, td.TableID, meta); err != nil {
			return err
		}
		fmt.Println("Table is created.")
		fmt.Println("Please update the documentation in https://chromium.googlesource.com/infra/infra/+/master/doc/bigquery_tables.md or the internal equivalent.")
		return nil
	}
	if err != nil {
		return err
	}

	fmt.Printf("Updating table %q\n", fullID)
	// BigQuery does not support removing fields anyway, so keep every live
	// field that td.Schema lacks instead of reporting it as a removal.
	bq.AddMissingFields(&td.Schema, existing.Schema)
	diff := bq.SchemaDiff(existing.Schema, td.Schema)
	if diff == "" {
		fmt.Println("No changes to schema detected.")
	} else {
		fmt.Println("The following changes to the schema will be made:")
		printFramed(diff)
		if !approved() {
			return errCanceledByUser
		}
	}

	changes := bigquery.TableMetadataToUpdate{
		Name:        td.FriendlyName,
		Description: td.Description,
		Schema:      td.Schema,
	}
	if err := ts.updateTable(ctx, td.DataSetID, td.TableID, changes); err != nil {
		return err
	}
	fmt.Println("Finished updating the table.")
	return nil
}
// flags holds the parsed command-line options. The embedded tableDef carries
// the table-related options. The remaining fields control how the schema proto
// is located and compiled, and how the update is carried out.
type flags struct {
	tableDef

	protoDir, messageName string // -message-dir and -message
	force, verbose        bool   // -force and -verbose

	importPaths stringlistflag.Flag // -I, may be repeated
	noGoMode    bool                // -no-go-mode
	goModules   stringlistflag.Flag // -go-module, may be repeated
}
// parseFlags registers the command-line flags, parses os.Args and validates
// the result.
//
// -table must be "<project>.<dataset>.<table>" or "-" (dump the schema to
// stdout only). -message is required.
//
// The following are rejected before any proto compilation or BigQuery call:
//   - a partitioning field combined with -disable-partitioning;
//   - -no-go-mode combined with -go-module;
//   - when partitioning is enabled, a negative -partitioning-expiration or an
//     unknown -partitioning-type.
//
// -partitioning-type is matched case-insensitively and normalized to upper case.
func parseFlags() (*flags, error) {
	var f flags
	table := flag.String("table", "", `Table name with format "<project id>.<dataset id>.<table id>". Use "-" to dump the schema to stdout only.`)
	flag.StringVar(&f.FriendlyName, "friendly-name", "", "Friendly name for the table.")
	flag.StringVar(&f.PartitioningField, "partitioning-field", "", "Name of a timestamp field to use for table partitioning (beta).")
	// See: https://pkg.go.dev/cloud.google.com/go/bigquery#TimePartitioning
	flag.StringVar(&f.PartitioningType, "partitioning-type", "DAY", "One of HOUR, DAY, MONTH and YEAR.")
	flag.BoolVar(&f.PartitioningDisabled, "disable-partitioning", false, "Makes the table not time-partitioned.")
	flag.DurationVar(&f.PartitioningExpiration, "partitioning-expiration", 0, "Expiration for partitions. 0 for no expiration.")
	flag.Var(luciflag.StringSlice(&f.ClusteringFields), "clustering-field", "Optional, one or more clustering fields. Can be specified multiple times and order is significant.")
	flag.StringVar(&f.protoDir, "message-dir", ".", "Path to directory with the .proto file that defines the schema message.")
	flag.BoolVar(&f.noGoMode, "no-go-mode", false, "Don't try to recognize active Go module based on cwd.")
	flag.Var(&f.goModules, "go-module", "Make protos in the given module available in proto import path. Can be specified multiple times.")
	flag.BoolVar(&f.force, "force", false, "Proceed without a user confirmation.")
	flag.BoolVar(&f.verbose, "verbose", false, "Print more information in the log.")
	// -I matches protoc's flag and its error message suggesting to pass -I.
	flag.Var(&f.importPaths, "I", "Path to directory with the imported .proto file; can be specified multiple times.")
	flag.StringVar(&f.messageName,
		"message",
		"",
		"Full name of the protobuf message that defines the table schema. The name must contain proto package name.")
	flag.Parse()

	switch {
	case len(flag.Args()) > 0:
		return nil, fmt.Errorf("unexpected arguments: %q", flag.Args())
	case *table == "":
		return nil, fmt.Errorf("-table is required")
	case f.messageName == "":
		return nil, fmt.Errorf("-message is required (the name must contain the proto package name)")
	case f.PartitioningField != "" && f.PartitioningDisabled:
		return nil, fmt.Errorf("partitioning field cannot be non-empty with disabled partitioning")
	case f.noGoMode && len(f.goModules) > 0:
		return nil, fmt.Errorf("-no-go-mode and -go-module flags are not compatible")
	}

	if !f.PartitioningDisabled {
		// Validate here so a bad value fails fast. Otherwise it would only be
		// handed to BigQuery after the user has already confirmed table
		// creation. An empty type is left for the BigQuery client to default.
		if f.PartitioningExpiration < 0 {
			return nil, fmt.Errorf("-partitioning-expiration must not be negative, got %v", f.PartitioningExpiration)
		}
		switch normalized := strings.ToUpper(f.PartitioningType); normalized {
		case "", "HOUR", "DAY", "MONTH", "YEAR":
			f.PartitioningType = normalized
		default:
			return nil, fmt.Errorf("-partitioning-type %q must be one of HOUR, DAY, MONTH or YEAR", f.PartitioningType)
		}
	}

	if parts := strings.Split(*table, "."); len(parts) == 3 {
		f.ProjectID = parts[0]
		f.DataSetID = parts[1]
		f.TableID = parts[2]
	} else if *table == "-" {
		f.TableID = "-"
	} else {
		return nil, fmt.Errorf("table name %q must be '-' or have exactly 2 dots", *table)
	}
	return &f, nil
}
// run parses the flags and derives a BigQuery schema from the requested proto
// message. It then either dumps the schema as JSON (when -table is "-") or
// creates/updates the target table.
func run(ctx context.Context) error {
	// Named cfg rather than "flags" to avoid shadowing the flags type.
	cfg, err := parseFlags()
	if err != nil {
		return errors.Annotate(err, "failed to parse flags").Err()
	}
	if cfg.verbose {
		ctx = logging.SetLevel(ctx, logging.Debug)
	} else {
		ctx = logging.SetLevel(ctx, logging.Error)
	}

	td := cfg.tableDef
	desc, err := loadProtoDescription(ctx, cfg.protoDir, !cfg.noGoMode, cfg.goModules, cfg.importPaths)
	if err != nil {
		return errors.Annotate(err, "failed to load proto descriptor").Err()
	}
	td.Schema, td.Description, err = schemaFromMessage(desc, cfg.messageName)
	if err != nil {
		return errors.Annotate(err, "could not derive schema from message %q at path %q", cfg.messageName, cfg.protoDir).Err()
	}
	if td.TableID == "-" {
		return dumpSchemaJSON(&td.Schema)
	}

	// schemaFromMessage already succeeded for this message name, so only the
	// defining file is needed here. The remaining results are ignored.
	file, _, _ := descutil.Resolve(desc, cfg.messageName)
	td.Description = fmt.Sprintf(
		"Proto: https://cs.chromium.org/%s\nTable Description:\n%s",
		url.PathEscape(fmt.Sprintf("%s file:%s", cfg.messageName, file.GetName())),
		td.Description)

	// Create an Authenticator and use it for BigQuery operations.
	authOpts := chromeinfra.DefaultAuthOptions()
	authOpts.Scopes = []string{bigquery.Scope}
	authenticator := auth.NewAuthenticator(ctx, auth.InteractiveLogin, authOpts)
	authTS, err := authenticator.TokenSource()
	if err != nil {
		return errors.Annotate(err, "could not get authentication credentials").Err()
	}
	c, err := bigquery.NewClient(ctx, td.ProjectID, option.WithTokenSource(authTS))
	if err != nil {
		return errors.Annotate(err, "could not create BigQuery client").Err()
	}
	// Release the client's resources once the table has been updated.
	defer c.Close()
	return updateFromTableDef(ctx, cfg.force, bqTableStore{c}, td)
}
// main runs the tool. A user-declined prompt exits with status 1 and no
// further output; any other error is logged fatally.
func main() {
	ctx := gologger.StdConfig.Use(context.Background())
	err := run(ctx)
	if canceledByUser.In(err) {
		// The user saw and declined the prompt; there is nothing more to report.
		os.Exit(1)
	}
	if err != nil {
		log.Fatal(err)
	}
}
// schemaFromMessage converts the message named messageName (a full name that
// includes the proto package) from the descriptor set desc into a BigQuery
// schema.
//
// The source code info of every file in desc is indexed and given to the
// converter. The returned description is whatever bq.SchemaConverter.Schema
// produces for the message.
func schemaFromMessage(desc *descriptorpb.FileDescriptorSet, messageName string) (schema bigquery.Schema, description string, err error) {
	index := make(map[*descriptorpb.FileDescriptorProto]bq.SourceCodeInfoMap, len(desc.File))
	for _, file := range desc.File {
		info, indexErr := descutil.IndexSourceCodeInfo(file)
		if indexErr != nil {
			return nil, "", errors.Annotate(indexErr, "failed to index source code info in file %q", file.GetName()).Err()
		}
		index[file] = info
	}

	conv := bq.SchemaConverter{
		Desc:           desc,
		SourceCodeInfo: index,
	}
	return conv.Schema(messageName)
}
// checkGoMode reports whether `dir` belongs to a Go module. It does this by
// running `go list -m` there, so `go` must be in PATH.
//
// GOPATH mode is not supported. If `go list -m` fails, GOPATH is set and
// GO111MODULE is not "off", an error is returned. Otherwise a failure simply
// means "not Go mode".
func checkGoMode(dir string) (bool, error) {
	goList := exec.Command("go", "list", "-m")
	goList.Dir = dir
	out, runErr := goList.CombinedOutput()
	if runErr != nil {
		if os.Getenv("GO111MODULE") == "off" || os.Getenv("GOPATH") == "" {
			return false, nil
		}
		return false, errors.Reason("GOPATH mode is not supported").Err()
	}
	// When `dir` is not a Go package, `go list -m` prints
	// "command-line-arguments" instead of a module path.
	// See https://github.com/golang/go/issues/36793.
	return strings.TrimSpace(string(out)) != "command-line-arguments", nil
}
// prepInputs stages the protoc inputs for dir.
//
// Go mode stages *.proto from the Go source tree (plus goModules and
// importPaths). It is used when allowGoMode is true and either goModules is
// non-empty or checkGoMode detects that dir is in a Go module. In every other
// case the generic mode is used, and only the explicitly given importPaths are
// available.
func prepInputs(ctx context.Context, dir string, allowGoMode bool, goModules, importPaths []string) (*protoc.StagedInputs, error) {
	goMode := false
	switch {
	case !allowGoMode:
		// Go mode was explicitly disabled; stay generic.
	case len(goModules) > 0:
		goMode = true
	default:
		detected, err := checkGoMode(dir)
		if err != nil {
			return nil, err
		}
		goMode = detected
	}

	if !goMode {
		logging.Infof(ctx, "Running in generic mode: importing *.proto from explicitly given paths only")
		return protoc.StageGenericInputs(ctx, dir, importPaths)
	}
	logging.Infof(ctx, "Running in Go mode: importing *.proto from Go source tree")
	return protoc.StageGoInputs(ctx, dir, goModules, nil, importPaths)
}
// loadProtoDescription compiles the .proto files in dir with protoc and
// returns the resulting descriptor set.
//
// allowGoMode, goModules and importPaths control how the protoc inputs are
// staged (see prepInputs). The staged inputs are cleaned up, and the temporary
// output directory is removed, before returning. On error the returned
// descriptor set is nil.
func loadProtoDescription(ctx context.Context, dir string, allowGoMode bool, goModules, importPaths []string) (*descriptorpb.FileDescriptorSet, error) {
	// Stage the .proto inputs, in either Go-module or generic layout.
	inputs, err := prepInputs(ctx, dir, allowGoMode, goModules, importPaths)
	if err != nil {
		return nil, errors.Annotate(err, "failed to stage protoc inputs").Err()
	}
	defer inputs.Cleanup()

	// Prep the temp directory for the resulting descriptor file.
	tempDir, err := ioutil.TempDir("", "")
	if err != nil {
		return nil, errors.Annotate(err, "failed to create a temporary directory").Err()
	}
	defer os.RemoveAll(tempDir)
	descFile := filepath.Join(tempDir, "desc")

	// Compile protos to get the descriptor.
	err = protoc.Compile(ctx, &protoc.CompileParams{
		Inputs:              inputs,
		OutputDescriptorSet: descFile,
	})
	if err != nil {
		return nil, errors.Annotate(err, "failed to compile .proto files").Err()
	}

	// Read the resulting descriptor.
	descBytes, err := os.ReadFile(descFile)
	if err != nil {
		return nil, errors.Annotate(err, "failed to read the compiled descriptor set").Err()
	}
	desc := &descriptorpb.FileDescriptorSet{}
	if err := proto.Unmarshal(descBytes, desc); err != nil {
		// Do not hand back a partially decoded descriptor set.
		return nil, errors.Annotate(err, "failed to unmarshal the compiled descriptor set").Err()
	}
	return desc, nil
}
// confirm prints "<action>? [y/N] " and reads a single answer from stdin.
// The default is No: only an answer of exactly "y" or "Y" is treated as yes.
// A blank line or EOF therefore counts as No.
func confirm(action string) (response bool) {
	fmt.Printf("%s? [y/N] ", action)
	var answer string
	// The scan error is ignored on purpose; whatever was read (possibly
	// nothing) is judged below.
	fmt.Scanln(&answer)
	switch answer {
	case "y", "Y":
		return true
	default:
		return false
	}
}
// dumpSchemaJSON prints s to stdout as BigQuery JSON schema fields, followed
// by a newline. It returns an annotated error if s cannot be converted to JSON.
func dumpSchemaJSON(s *bigquery.Schema) error {
	jsonFields, convErr := s.ToJSONFields()
	if convErr != nil {
		return errors.Annotate(convErr, "failed to dump table schema to JSON").Err()
	}
	fmt.Println(string(jsonFields))
	return nil
}