blob: f1498cc5e1ae259b0b82e71450f504c740111313 [file] [log] [blame]
// Copyright 2021 The Chromium Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package cli
import (
"context"
"flag"
"fmt"
"io"
"os"
"os/exec"
"path/filepath"
"strings"
"time"
"infra/chromeperf/pinpoint"
"infra/chromeperf/pinpoint/proto"
"cloud.google.com/go/storage"
"go.chromium.org/luci/common/data/text"
"go.chromium.org/luci/common/errors"
"go.chromium.org/luci/common/logging"
"google.golang.org/protobuf/encoding/prototext"
)
// downloadResultsToDir copies the results associated with the provided job to the dstDir.
// The file that is written is returned.
func downloadResultsToDir(ctx context.Context, gcs *storage.Client, dstDir string, result *proto.ResultFile) (string, error) {
bucket, path := result.GcsBucket, result.Path
filename := filepath.Base(path)
dstFile, err := filepath.Abs(filepath.Join(dstDir, filename))
if err != nil {
return "", errors.Annotate(err, "failed getting absolute file path from %q %q", dstFile, filename).Err()
}
if err := removeExisting(dstFile); err != nil {
return "", errors.Annotate(err, "cannot download result file").Err()
}
src, err := gcs.Bucket(bucket).Object(path).NewReader(ctx)
if err != nil {
return "", errors.Annotate(err, "error requesting gs://%v/%v", bucket, path).Err()
}
defer src.Close()
tmp, err := os.CreateTemp("", filepath.Base(path))
if err != nil {
return "", err
}
// On early error, make sure to clean up after ourselves; these will error
// out if things are successful, but that shouldn't cause a problem.
defer os.Remove(tmp.Name())
defer tmp.Close()
if _, err := io.Copy(tmp, src); err != nil {
return "", errors.Annotate(err, "error downloading gs://%v/%v to file %q", bucket, path, tmp.Name()).Err()
}
if err := tmp.Close(); err != nil {
return "", errors.Annotate(err, "error flushing %v", tmp.Name()).Err()
}
if err := os.Rename(tmp.Name(), dstFile); err != nil {
return "", err
}
return dstFile, nil
}
type downloadResultsMixin struct {
resultsDir string
downloadResults bool
openResults bool
}
// TODO(chowski): if we are actually going to use mixins a lot, probably should
// add some support in the pinpointCommand wrapper type somehow.
func (drm *downloadResultsMixin) RegisterFlags(flags *flag.FlagSet, userCfg userConfig) {
flags.BoolVar(&drm.downloadResults, "download-results", userCfg.DownloadResults, text.Doc(`
If set, results are downloaded to the -results-dir. Note that files
will be overwritten if they exist already.
Override default from user configuration file.
`))
flags.StringVar(&drm.resultsDir, "results-dir", userCfg.ResultsDir, text.Doc(`
Ignored unless -download-results is set; the directory to store
results in.
`))
flags.BoolVar(&drm.openResults, "open-results", false, text.Doc(`
Ignored unless -download-results is set; if set, the results will
automatically be opened in a browser. Requires xdg-open to be
installed.
`))
}
func (drm *downloadResultsMixin) doDownloadResults(ctx context.Context, job *proto.Job) error {
if !drm.downloadResults || job.GetName() == "" {
return nil
}
if job.GetState() != proto.Job_SUCCEEDED {
logging.Infof(ctx, "Can't download results: must be in state SUCCEEDED, got %s", job.GetState())
return nil
}
gcs, err := storage.NewClient(ctx)
if err != nil {
// The error for not finding credentials is not structured in any way,
// so, we have to guess based on error text.
if strings.Contains(err.Error(), "could not find default credentials") {
return errors.Reason("failed to initialize Google Cloud Storage connection, error was: %v\n\nConsider running this command:\n\n\tgcloud auth application-default login\n", err).Err()
}
return errors.Annotate(err, "couldn't connect to Google Cloud Storage (GCS)").Err()
}
defer gcs.Close()
var errs errors.MultiError
for _, result := range job.GetResultFiles() {
dstFile, err := downloadResultsToDir(ctx, gcs, drm.resultsDir, result)
if err != nil {
errs = append(errs, err)
continue
}
logging.Infof(ctx, "Downloaded result file %v", dstFile)
if drm.openResults {
if err := exec.Command("xdg-open", dstFile).Run(); err != nil {
// Doesn't count as a fatal error, just is inconvenient.
logging.Errorf(ctx, "Couldn't open file with xdg-open: %v", err)
}
}
}
if len(errs) > 0 {
return errs
}
return nil
}
type waitForJobMixin struct {
wait bool
// TODO(dberris): Centralise the logging and allow for quiet mode.
quiet bool
}
func (wjm *waitForJobMixin) RegisterFlags(flags *flag.FlagSet, uc userConfig) {
flags.BoolVar(&wjm.wait, "wait", uc.Wait, text.Doc(`
When enabled, will wait for a job to complete.
`))
flags.BoolVar(&wjm.quiet, "quiet", uc.Quiet, text.Doc(`
Suppress progress output when waiting.
`))
}
// waitForJob can return a nil job in case `j` is also nil, and return a valid
// pointer to a Job in case there's an error, indicating partial success.
func (wjm *waitForJobMixin) waitForJob(
ctx context.Context,
c proto.PinpointClient,
j *proto.Job,
o io.Writer) (*proto.Job, error) {
if !wjm.wait || j == nil {
return j, nil
}
req := &proto.GetJobRequest{Name: pinpoint.LegacyJobName(j.Name)}
poll := time.NewTicker(10 * time.Second)
defer poll.Stop()
lastJob := j
for {
resp, err := c.GetJob(ctx, req)
if err != nil {
return j, errors.Annotate(err, "failed during GetJob").Err()
}
if !wjm.quiet && lastJob.GetLastUpdateTime().AsTime() != resp.GetLastUpdateTime().AsTime() {
out := prototext.MarshalOptions{Multiline: true}.Format(lastJob)
fmt.Fprintln(o, out)
fmt.Fprintln(o, "--------------------------------")
}
lastJob = resp
if s := lastJob.State; s != proto.Job_RUNNING && s != proto.Job_PENDING {
if !wjm.quiet {
fmt.Fprintf(o, "Final state for job %q: %v\n", lastJob.Name, s)
}
break
}
select {
case <-ctx.Done():
return lastJob, errors.Annotate(ctx.Err(), "polling for job wait cancelled").Err()
case <-poll.C:
// loop back around and retry.
}
}
return lastJob, nil
}