blob: e211211ddac60a27c74ac3a086d6df9cc4938b14 [file] [log] [blame]
// Copyright 2020 The Chromium Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
package main
import (
"context"
"flag"
"fmt"
"io"
"net/http"
"os"
"path"
"path/filepath"
"time"
"cloud.google.com/go/bigquery"
"cloud.google.com/go/storage"
"google.golang.org/api/iterator"
"google.golang.org/api/option"
"go.chromium.org/luci/auth"
"go.chromium.org/luci/common/errors"
luciflag "go.chromium.org/luci/common/flag"
"go.chromium.org/luci/common/logging"
"go.chromium.org/luci/common/sync/parallel"
)
// baseHistoryRun holds flag values, authentication state and an HTTP client
// used to run a BigQuery query and download its results to a local directory.
type baseHistoryRun struct {
// out is the path to the output directory (-out flag). Required.
out string
// startTime is the date to fetch results from (-from flag). Required.
startTime time.Time
// endTime is the date to fetch results until (-to flag). Required, and must
// not be before startTime.
endTime time.Time
// builderRegex is a regular expression for builder (-builder flag,
// default ".*"). It is wrapped with ^ and $ before being sent to BigQuery.
builderRegex string
// testIDRegex is a regular expression for test (-test flag, default ".*").
// It is wrapped with ^ and $ before being sent to BigQuery.
testIDRegex string
// clOwner is the CL owner, e.g. someone@chromium.org (-cl-owner flag).
clOwner string
// authOpt is dereferenced by Init to construct the authenticator. It is not
// assigned by any method visible here, so presumably the caller must set it
// before calling Init — TODO confirm.
authOpt *auth.Options
// authenticator is created by Init.
authenticator *auth.Authenticator
// http is the client obtained from authenticator in Init; it is used to
// talk to GCS when fetching results.
http *http.Client
}
// RegisterBaseFlags registers the -out, -from, -to, -builder, -test and
// -cl-owner flags on fs, binding each one to the corresponding field of r.
func (r *baseHistoryRun) RegisterBaseFlags(fs *flag.FlagSet) {
	// The date range is parsed by a custom flag.Value, so it goes through fs.Var.
	fs.Var(luciflag.Date(&r.startTime), "from", "Fetch results starting from this date; format: 2020-01-15")
	fs.Var(luciflag.Date(&r.endTime), "to", "Fetch results until this date; format: 2020-01-15")

	// Everything else is a plain string flag.
	stringFlags := []struct {
		dst              *string
		name, def, usage string
	}{
		{&r.out, "out", "", "Path to the output directory"},
		{&r.builderRegex, "builder", ".*", "A regular expression for builder. Implicitly wrapped with ^ and $."},
		{&r.testIDRegex, "test", ".*", "A regular expression for test. Implicitly wrapped with ^ and $."},
		{&r.clOwner, "cl-owner", "", "CL owner, e.g. someone@chromium.org"},
	}
	for _, f := range stringFlags {
		fs.StringVar(f.dst, f.name, f.def, f.usage)
	}
}
// ValidateBaseFlags checks the flag-bound fields of r and returns an error for
// the first problem found: a missing -out, a missing -from, a missing -to, or
// a -to date that is before the -from date. It returns nil if every check
// passes.
func (r *baseHistoryRun) ValidateBaseFlags() error {
	if r.out == "" {
		return errors.New("-out is required")
	}
	if r.startTime.IsZero() {
		return errors.New("-from is required")
	}
	if r.endTime.IsZero() {
		return errors.New("-to is required")
	}
	if r.endTime.Before(r.startTime) {
		return errors.New("the -to date must not be before the -from date")
	}
	return nil
}
// Init validates the base flags and then sets up r.authenticator and r.http.
// It returns the validation error, if any, or the error from obtaining the
// authenticated HTTP client.
func (r *baseHistoryRun) Init(ctx context.Context) error {
	err := r.ValidateBaseFlags()
	if err != nil {
		return err
	}

	r.authenticator = auth.NewAuthenticator(ctx, auth.InteractiveLogin, *r.authOpt)
	r.http, err = r.authenticator.Client()
	return err
}
// runAndFetchResults executes sql in BigQuery (passing extraParams along with
// the standard parameters) and then downloads the results into the r.out
// directory as GZIP-compressed JSON Lines files.
func (r *baseHistoryRun) runAndFetchResults(ctx context.Context, sql string, extraParams ...bigquery.QueryParameter) error {
	logging.Infof(ctx, "starting a BigQuery query...\n")
	resultTable, queryErr := r.runQuery(ctx, sql, extraParams...)
	if queryErr == nil {
		logging.Infof(ctx, "fetching results...\n")
		return r.fetchResults(ctx, resultTable)
	}
	return queryErr
}
// runQuery executes sql as a BigQuery query, waits for it to succeed and
// returns the table holding its results.
//
// The query always receives the named parameters startTime, endTime,
// builderRegexp, testIdRegexp, minChangedFiles, maxChangedFiles and clOwner;
// extraParams are appended after them.
func (r *baseHistoryRun) runQuery(ctx context.Context, sql string, extraParams ...bigquery.QueryParameter) (*bigquery.Table, error) {
	bq, err := newBQClient(ctx, r.authenticator)
	if err != nil {
		return nil, err
	}

	// anchored wraps a pattern with ^( and )$. An empty or match-everything
	// pattern is sent as an empty string instead.
	anchored := func(pattern string) string {
		switch pattern {
		case "", ".*":
			return ""
		default:
			return fmt.Sprintf("^(%s)$", pattern)
		}
	}

	// Assemble the standard parameters, followed by the caller's extras.
	params := []bigquery.QueryParameter{
		{Name: "startTime", Value: r.startTime},
		{Name: "endTime", Value: r.endTime},
		{Name: "builderRegexp", Value: anchored(r.builderRegex)},
		{Name: "testIdRegexp", Value: anchored(r.testIDRegex)},
		{Name: "minChangedFiles", Value: minChangedFiles},
		{Name: "maxChangedFiles", Value: maxChangedFiles},
		{Name: "clOwner", Value: r.clOwner},
	}
	query := bq.Query(sql)
	query.Parameters = append(params, extraParams...)

	// Start the job and block until it finishes successfully.
	job, err := query.Run(ctx)
	if err != nil {
		return nil, err
	}
	logging.Infof(ctx, "BigQuery job: https://console.cloud.google.com/bigquery?project=chrome-rts&page=queryresults&j=bq:US:%s", job.ID())
	if err = waitForSuccess(ctx, job); err != nil {
		return nil, err
	}

	// The results table is the destination recorded in the job's query config.
	jobCfg, err := job.Config()
	if err != nil {
		return nil, err
	}
	queryCfg := jobCfg.(*bigquery.QueryConfig)
	return queryCfg.Dst, nil
}
// fetchResults fetches the table to GZIP-compressed JSON Lines files in r.out
// directory.
//
// The fetching is done in two phases:
//  1. Extract the table to a temporary directory in GCS. This also takes care
//     of converting from the tabular format to a file format.
//  2. Download the extracted files to r.out, concurrently.
//
// Setup and extract errors are returned directly. Errors from listing or
// downloading the objects are returned via parallel.WorkPool. A failure to
// close a downloaded file is reported as an error, because the file on disk
// may then be incomplete.
func (r *baseHistoryRun) fetchResults(ctx context.Context, table *bigquery.Table) error {
	if err := prepareOutDir(r.out, "*.jsonl.gz"); err != nil {
		return err
	}

	// Extract the table to GCS.
	bucketName := "chrome-rts"
	dirName := fmt.Sprintf("tmp/extract-%s", table.TableID)
	gcsDir := fmt.Sprintf("gs://%s/%s/", bucketName, dirName)
	logging.Infof(ctx, "extracting results to %s", gcsDir)
	gcsRef := &bigquery.GCSReference{
		// Start the object name with a date, so that the user can merge
		// data directories if needed.
		URIs:              []string{fmt.Sprintf("%s%s-*.jsonl.gz", gcsDir, r.startTime.Format("2006-01-02"))},
		DestinationFormat: bigquery.JSON,
		Compression:       bigquery.Gzip,
	}
	job, err := table.ExtractorTo(gcsRef).Run(ctx)
	if err != nil {
		return err
	}
	if err := waitForSuccess(ctx, job); err != nil {
		return errors.Annotate(err, "extract job %q failed", job.ID()).Err()
	}

	// Fetch the extracted files from GCS to the file system.
	gcs, err := storage.NewClient(ctx, option.WithHTTPClient(r.http))
	if err != nil {
		return err
	}
	// WorkPool below returns only after every download task has completed, so
	// the client is no longer in use when this runs. The client is only read
	// from, so its Close error carries no information worth failing on.
	defer gcs.Close()
	bucket := gcs.Bucket(bucketName)

	// download copies a single GCS object into r.out, naming the local file
	// after the last path component of the object name.
	download := func(objName string) (err error) {
		// Prepare the source.
		rd, err := bucket.Object(objName).NewReader(ctx)
		if err != nil {
			return err
		}
		defer rd.Close()

		// Prepare the sink.
		f, err := os.Create(filepath.Join(r.out, path.Base(objName)))
		if err != nil {
			return err
		}
		defer func() {
			// A failed Close may mean written data never reached the disk, so
			// surface it unless an earlier error is already being returned.
			if closeErr := f.Close(); err == nil {
				err = closeErr
			}
		}()

		_, err = io.Copy(f, rd)
		return err
	}

	// Find all files in the temp GCS dir and fetch them all, concurrently.
	// The trailing slash restricts the listing to this directory; without it,
	// a sibling directory whose name merely starts with dirName would match.
	iter := bucket.Objects(ctx, &storage.Query{Prefix: dirName + "/"})
	return parallel.WorkPool(100, func(work chan<- func() error) {
		for {
			objAttrs, err := iter.Next()
			switch {
			case err == iterator.Done:
				return
			case err != nil:
				// Report the listing error through the pool and stop iterating.
				work <- func() error { return err }
				return
			}

			name := objAttrs.Name
			work <- func() error { return download(name) }
		}
	})
}
// waitForSuccess blocks until job finishes. It returns the error from waiting,
// if any; otherwise it returns the error recorded in the job's final status,
// which is nil when the job succeeded.
func waitForSuccess(ctx context.Context, job *bigquery.Job) error {
	status, err := job.Wait(ctx)
	if err == nil {
		err = status.Err()
	}
	return err
}