blob: 8ac73d84b4be2ec3cbff2a6513afa429f2797e86 [file] [log] [blame]
// Copyright 2020 The LUCI Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package main
import (
"context"
"fmt"
"net/http"
"os"
"path/filepath"
"reflect"
"strings"
"sync"
"time"
"cloud.google.com/go/bigquery"
"github.com/maruel/subcommands"
"golang.org/x/sync/errgroup"
"golang.org/x/time/rate"
"google.golang.org/api/option"
"go.chromium.org/luci/auth"
"go.chromium.org/luci/common/api/gerrit"
"go.chromium.org/luci/common/cli"
"go.chromium.org/luci/common/data/caching/lru"
"go.chromium.org/luci/common/data/text"
"go.chromium.org/luci/common/errors"
luciflag "go.chromium.org/luci/common/flag"
gerritpb "go.chromium.org/luci/common/proto/gerrit"
"go.chromium.org/luci/rts/presubmit/eval/history"
evalpb "go.chromium.org/luci/rts/presubmit/eval/proto"
)
// cmdPresubmitHistory returns the "presubmit-history" subcommand, which
// fetches presubmit history suitable for RTS evaluation.
//
// authOpt is stored on every command run created by the returned command and
// is used by Run to construct the authenticator.
func cmdPresubmitHistory(authOpt *auth.Options) *subcommands.Command {
	return &subcommands.Command{
		UsageLine: `presubmit-history`,
		ShortDesc: "fetch presubmit history",
		LongDesc: text.Doc(`
Fetch presubmit history, suitable for RTS evaluation.
`),
		CommandRun: func() subcommands.CommandRun {
			r := &presubmitHistoryRun{authOpt: authOpt}

			// Output location and the time range to fetch.
			r.Flags.StringVar(&r.out, "out", "", "Path to the output file")
			r.Flags.Var(luciflag.Date(&r.startTime), "from", "Fetch results starting from this date; format: 2020-01-02")
			r.Flags.Var(luciflag.Date(&r.endTime), "to", "Fetch results until this date; format: 2020-01-02")

			// Filters and sampling knobs.
			r.Flags.Float64Var(&r.durationDataFrac, "duration-data-frac", 0.001, "Fraction of duration data to fetch")
			r.Flags.DurationVar(&r.minDuration, "min-duration", time.Second, "Minimum duration to fetch")
			r.Flags.StringVar(&r.builderRegex, "builder", ".*", "A regular expression for builder. Implicitly wrapped with ^ and $.")
			r.Flags.StringVar(&r.testIDRegex, "test", ".*", "A regular expression for test. Implicitly wrapped with ^ and $.")
			r.Flags.IntVar(&r.minCLFlakes, "min-cl-flakes", 5, text.Doc(`
In order to conclude that a test variant is flaky and exclude it from analysis,
it must have mixed results in <min-cl-flakes> unique CLs.
`))
			return r
		},
	}
}
// presubmitHistoryRun is the command run for the presubmit-history
// subcommand. It holds the parsed flag values, the clients created in Run,
// and the state of the output writer.
type presubmitHistoryRun struct {
// NOTE(review): presumably provides Flags and done(), which are used by this
// type but defined elsewhere — confirm.
baseCommandRun
// out is the value of -out: the path to the output file.
out string
// startTime and endTime are the values of -from and -to.
startTime time.Time
endTime time.Time
// durationDataFrac is the value of -duration-data-frac. Run fetches test
// durations only if it is positive.
durationDataFrac float64
// builderRegex and testIDRegex are the values of -builder and -test.
builderRegex string
testIDRegex string
// minDuration is the value of -min-duration.
minDuration time.Duration
// minCLFlakes is the value of -min-cl-flakes.
minCLFlakes int
// gerrit is the Gerrit client; initialized in Run via newGerritClient.
gerrit *gerritClient
// authenticator is created in Run from authOpt.
authenticator *auth.Authenticator
// authOpt is the auth options passed to cmdPresubmitHistory.
authOpt *auth.Options
// mu is held by write while it uses w, recordsWrote and
// recordCountNextReport.
mu sync.Mutex
// w is the writer for the output file; created in Run.
w *history.Writer
// recordsWrote is the number of records successfully written so far.
recordsWrote int
// recordCountNextReport is the earliest time at which write prints the next
// progress message. Zero until the first record is written.
recordCountNextReport time.Time
}
// validate checks the flag values. It returns an error describing the first
// problem found, or nil if -out, -from and -to are all set and the -to date is
// not before the -from date.
func (r *presubmitHistoryRun) validate() error {
	if r.out == "" {
		return errors.New("-out is required")
	}
	if r.startTime.IsZero() {
		return errors.New("-from is required")
	}
	if r.endTime.IsZero() {
		return errors.New("-to is required")
	}
	if r.endTime.Before(r.startTime) {
		return errors.New("the -to date must not be before the -from date")
	}
	return nil
}
// Run implements subcommands.CommandRun.
//
// It rejects positional arguments, validates the flags, and then sets up the
// authenticator, the Gerrit client and the output file. After that it fetches
// rejections and, if -duration-data-frac is positive, test durations
// concurrently, writing each fetched item to the output file as a record.
// On success it prints the totals and closes the output file.
//
// The return value is whatever r.done returns for the resulting error.
func (r *presubmitHistoryRun) Run(a subcommands.Application, args []string, env subcommands.Env) int {
	ctx := cli.GetContext(a, r, env)
	if len(args) > 0 {
		return r.done(errors.New("unexpected positional arguments"))
	}
	if flagErr := r.validate(); flagErr != nil {
		return r.done(flagErr)
	}

	r.authenticator = auth.NewAuthenticator(ctx, auth.InteractiveLogin, *r.authOpt)

	var err error
	r.gerrit, err = r.newGerritClient()
	if err != nil {
		return r.done(errors.Annotate(err, "failed to init Gerrit client").Err())
	}

	// Create the history file.
	r.w, err = history.CreateFile(r.out)
	if err != nil {
		return r.done(errors.Annotate(err, "failed to create the output file").Err())
	}
	defer r.w.Close()

	fmt.Printf("starting BigQuery queries...\n")
	group, ctx := errgroup.WithContext(ctx)

	// Each counter is modified only by its own fetcher's callback and is read
	// only after group.Wait() returns.
	var terminalRejections, durationCount int

	// Fetch the rejections; only terminal fragments are counted.
	onRejection := func(frag *evalpb.RejectionFragment) error {
		if frag.Terminal {
			terminalRejections++
		}
		rec := &evalpb.Record{
			Data: &evalpb.Record_RejectionFragment{RejectionFragment: frag},
		}
		return r.write(rec)
	}
	group.Go(func() error {
		fetchErr := r.rejections(ctx, onRejection)
		return errors.Annotate(fetchErr, "failed to fetch rejections").Err()
	})

	// Fetch test durations.
	if r.durationDataFrac > 0 {
		onDuration := func(td *evalpb.TestDuration) error {
			durationCount++
			rec := &evalpb.Record{
				Data: &evalpb.Record_TestDuration{TestDuration: td},
			}
			return r.write(rec)
		}
		group.Go(func() error {
			fetchErr := r.durations(ctx, onDuration)
			return errors.Annotate(fetchErr, "failed to fetch test durations").Err()
		})
	}

	if err = group.Wait(); err != nil {
		return r.done(err)
	}

	// r.recordsWrote is updated by write under r.mu, so read it under the lock.
	r.mu.Lock()
	fmt.Printf("total: %d rejections, %d durations, %d records\n", terminalRejections, durationCount, r.recordsWrote)
	r.mu.Unlock()

	return r.done(r.w.Close())
}
// write writes rec to the output file and increments the record counter.
// Once the scheduled report time has passed, it also prints the number of
// records written so far and schedules the next report one second later.
//
// The whole call holds r.mu.
func (r *presubmitHistoryRun) write(rec *evalpb.Record) error {
	r.mu.Lock()
	defer r.mu.Unlock()

	err := r.w.Write(rec)
	if err != nil {
		return err
	}
	r.recordsWrote++

	// Nothing more to do until the scheduled report time has passed.
	now := time.Now()
	if !r.recordCountNextReport.Before(now) {
		return nil
	}

	// A zero report time means no report has been scheduled yet: skip the
	// message for the very first record and only schedule the next report.
	if !r.recordCountNextReport.IsZero() {
		fmt.Printf("wrote %d records\n", r.recordsWrote)
	}
	r.recordCountNextReport = now.Add(time.Second)
	return nil
}
// bqQuery prepares a BigQuery query for the given SQL in the
// "chrome-trooper-analytics" project, using an HTTP client obtained from
// r.authenticator.
//
// The returned query carries four named parameters: startTime, endTime,
// builder_regexp and test_id_regexp. The regexp parameters are the flag
// values wrapped as ^(...)$, except that an empty or ".*" pattern is passed as
// an empty string.
//
// A new BigQuery client is created on every call and is not closed here.
func (r *presubmitHistoryRun) bqQuery(ctx context.Context, sql string) (*bigquery.Query, error) {
	httpClient, err := r.authenticator.Client()
	if err != nil {
		return nil, err
	}

	bq, err := bigquery.NewClient(ctx, "chrome-trooper-analytics", option.WithHTTPClient(httpClient))
	if err != nil {
		return nil, err
	}

	// anchored wraps a pattern so that it must match the whole string.
	// Match-everything patterns collapse to "".
	anchored := func(pattern string) string {
		switch pattern {
		case "", ".*":
			return ""
		default:
			return fmt.Sprintf("^(%s)$", pattern)
		}
	}

	query := bq.Query(sql)
	query.Parameters = []bigquery.QueryParameter{
		{Name: "startTime", Value: r.startTime},
		{Name: "endTime", Value: r.endTime},
		{Name: "builder_regexp", Value: anchored(r.builderRegex)},
		{Name: "test_id_regexp", Value: anchored(r.testIDRegex)},
	}
	return query, nil
}
// errPatchsetDeleted is returned by populateChangedFiles when Gerrit reports
// no changed files for a patchset.
var errPatchsetDeleted = errors.New("patchset deleted")
// populateChangedFiles sets ps.ChangedFiles to the files that Gerrit reports
// as changed in the patchset. Each file gets the repo URL
// "https://<host>/<project>" (with any ".git" suffix removed from the project)
// and its path prefixed with "//".
//
// Returns errPatchsetDeleted if Gerrit reports no changed files, and any error
// from the Gerrit client as is. ps.ChangedFiles is left untouched on error.
//
// TODO(nodir): delete this function, gerrit.go and cache.go in February 2021,
// when we have enough BigQuery data with gerrit info.
func (r *presubmitHistoryRun) populateChangedFiles(ctx context.Context, ps *evalpb.GerritPatchset) error {
	paths, err := r.gerrit.ChangedFiles(ctx, ps)
	if err != nil {
		return err
	}
	if len(paths) == 0 {
		return errPatchsetDeleted
	}

	project := strings.TrimSuffix(ps.Change.Project, ".git")
	repoURL := fmt.Sprintf("https://%s/%s", ps.Change.Host, project)

	files := make([]*evalpb.SourceFile, 0, len(paths))
	for _, p := range paths {
		files = append(files, &evalpb.SourceFile{
			Repo: repoURL,
			Path: "//" + p,
		})
	}
	ps.ChangedFiles = files
	return nil
}
// newGerritClient creates a new Gerrit client. Does not mutate r.
//
// The client lists files through a Gerrit REST client built on
// r.authenticator's transport; a REST client is created for each call, for the
// requested host. File lists are cached under
// <user cache dir>/chrome-rts/gerrit-changed-files with an in-memory LRU of
// 1024 entries. The client also gets a rate limiter created with limit 10 and
// burst 1.
//
// Returns an error if the authenticated transport or the user cache directory
// cannot be obtained.
func (r *presubmitHistoryRun) newGerritClient() (*gerritClient, error) {
	rt, err := r.authenticator.Transport()
	if err != nil {
		return nil, err
	}

	cacheRoot, err := os.UserCacheDir()
	if err != nil {
		return nil, err
	}

	// One HTTP client is shared by all REST clients created below.
	authedHTTP := &http.Client{Transport: rt}
	listFiles := func(ctx context.Context, host string, req *gerritpb.ListFilesRequest) (*gerritpb.ListFilesResponse, error) {
		restClient, err := gerrit.NewRESTClient(authedHTTP, host, true)
		if err != nil {
			return nil, errors.Annotate(err, "failed to create a Gerrit client").Err()
		}
		return restClient.ListFiles(ctx, req)
	}

	cacheDir := filepath.Join(cacheRoot, "chrome-rts", "gerrit-changed-files")
	return &gerritClient{
		listFilesRPC: listFiles,
		fileListCache: cache{
			dir:       cacheDir,
			memory:    lru.New(1024),
			valueType: reflect.TypeOf(changedFiles{}),
		},
		limiter: rate.NewLimiter(10, 1),
	}, nil
}