blob: 7fe2a38d9d5a1f605a6a5a552edfa51b6a1a43dd [file] [log] [blame]
// Copyright 2021 The LUCI Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//go:build !copybara
// +build !copybara
package swarmingimpl
import (
"context"
"flag"
"fmt"
"net/http"
"os"
"os/exec"
"path/filepath"
"strings"
"github.com/maruel/subcommands"
"google.golang.org/grpc"
"google.golang.org/grpc/metadata"
"go.chromium.org/luci/cipd/client/cipd"
"go.chromium.org/luci/cipd/client/cipd/ensure"
"go.chromium.org/luci/cipd/client/cipd/template"
"go.chromium.org/luci/client/cmd/swarming/swarmingimpl/base"
"go.chromium.org/luci/client/cmd/swarming/swarmingimpl/output"
"go.chromium.org/luci/common/errors"
"go.chromium.org/luci/common/logging"
"go.chromium.org/luci/common/system/environ"
"go.chromium.org/luci/grpc/prpc"
"go.chromium.org/luci/hardcoded/chromeinfra"
"go.chromium.org/luci/lucictx"
rdbcli "go.chromium.org/luci/resultdb/cli"
resultpb "go.chromium.org/luci/resultdb/proto/v1"
"go.chromium.org/luci/swarming/client/swarming"
swarmingv2 "go.chromium.org/luci/swarming/proto/api_v2"
"go.chromium.org/luci/swarming/runner"
)
// CmdReproduce returns the `reproduce` subcommand, which fetches a task
// request and runs its command locally.
//
// authFlags is handed to base.NewCommandRun each time a new command run is
// created. The subcommand takes exactly one positional argument (the task ID),
// uses CAS and has JSON output disabled.
func CmdReproduce(authFlags base.AuthFlags) *subcommands.Command {
	newRun := func() subcommands.CommandRun {
		// The real CIPD downloader and invocation creator are wired in here;
		// tests substitute mocks through the same struct fields.
		impl := &reproduceImpl{
			cipdDownloader:   downloadCIPDPackages,
			createInvocation: createInvocation,
		}
		features := base.Features{
			MinArgs:    1,
			MaxArgs:    1,
			UsesCAS:    true,
			OutputJSON: base.OutputJSON{Enabled: false},
		}
		return base.NewCommandRun(authFlags, impl, features)
	}
	return &subcommands.Command{
		UsageLine:  "reproduce -S <server> <task ID>",
		ShortDesc:  "fetches the task command and runs it locally",
		LongDesc:   "Fetches a TaskRequest and runs the same commands that were run on the bot.",
		CommandRun: newRun,
	}
}
// reproduceImpl carries the flag values and the injectable helpers used by the
// `reproduce` subcommand.
type reproduceImpl struct {
// work is the value of the -work flag: the directory the task input files are
// mapped into and where the task is executed. ParseInputs makes it absolute.
work string
// out is the value of the -out flag: the directory that holds the task
// results. ParseInputs makes it absolute.
out string
// realm is the value of the -realm flag: the realm to create the invocation in
// when ResultDB is enabled.
realm string
// resultsHost is the value of the -results-host flag: the hostname of the
// ResultDB service to use.
resultsHost string
// taskID is the ID of the task to reproduce, taken from the first positional
// argument by ParseInputs.
taskID string
// cipdDownloader is used in testing to insert a mock CIPD downloader.
cipdDownloader func(context.Context, string, map[string]ensure.PackageSlice) error
// createInvocation is used in testing to insert a mock method.
createInvocation func(context.Context, *http.Client, string, string) (lucictx.Exported, func(), error)
}
// RegisterFlags binds the subcommand's string flags (-work, -out, -realm and
// -results-host) on fs to the corresponding fields of cmd.
func (cmd *reproduceImpl) RegisterFlags(fs *flag.FlagSet) {
	stringFlags := []struct {
		target *string
		name   string
		def    string
		usage  string
	}{
		{&cmd.work, "work", "work", "Directory to map the task input files into and execute the task. Will be cleared!"},
		{&cmd.out, "out", "out", "Directory that will hold the task results. Will be cleared!"},
		{&cmd.realm, "realm", "", "Realm to create invocation in if ResultDB is enabled."},
		{&cmd.resultsHost, "results-host", chromeinfra.ResultDBHost, "Hostname of the ResultDB service to use. e.g. 'results.api.cr.dev'."},
	}
	for _, f := range stringFlags {
		fs.StringVar(f.target, f.name, f.def, f.usage)
	}
}
// ParseInputs converts the -work and -out directories to absolute paths and
// records args[0] as the ID of the task to reproduce.
//
// It returns an annotated error if either directory cannot be made absolute.
func (cmd *reproduceImpl) ParseInputs(args []string, env subcommands.Env) error {
	absWork, err := filepath.Abs(cmd.work)
	// The field is overwritten before the error check, so it holds whatever
	// filepath.Abs returned even on failure.
	cmd.work = absWork
	if err != nil {
		return errors.Annotate(err, "failed to get absolute representation of work directory").Err()
	}

	absOut, err := filepath.Abs(cmd.out)
	cmd.out = absOut
	if err != nil {
		return errors.Annotate(err, "failed to get absolute representation of out directory").Err()
	}

	cmd.taskID = args[0]
	return nil
}
// Execute fetches the task request identified by cmd.taskID, prepares a local
// environment from the properties of its last task slice and runs the task's
// command.
//
// It returns an error if the request cannot be fetched, if it carries no task
// slices or no properties in its last slice, if the environment cannot be
// prepared, or if the command itself fails.
func (cmd *reproduceImpl) Execute(ctx context.Context, svc swarming.Client, sink *output.Sink, extra base.Extra) error {
	tr, err := svc.TaskRequest(ctx, cmd.taskID)
	if err != nil {
		return errors.Annotate(err, "failed to get task request: %s", cmd.taskID).Err()
	}
	// Guard the index below: a request without slices would otherwise panic.
	if len(tr.TaskSlices) == 0 {
		return errors.Reason("task request %s has no task slices", cmd.taskID).Err()
	}
	// In practice, later slices are less likely to assume that there is a named
	// cache that is not available locally.
	//
	// The generated getter is nil-safe, so a nil slice entry yields nil
	// properties instead of a nil-pointer dereference.
	properties := tr.TaskSlices[len(tr.TaskSlices)-1].GetProperties()
	if properties == nil {
		return errors.Reason("last task slice of task request %s has no properties", cmd.taskID).Err()
	}
	execCmd, err := cmd.prepareTaskRequestEnvironment(ctx, properties, svc, extra.AuthFlags)
	if err != nil {
		return errors.Annotate(err, "failed to create command from task request").Err()
	}
	return cmd.executeTaskRequestCommand(ctx, tr, execCmd, extra.AuthFlags)
}
// executeTaskRequestCommand starts execCmd and waits for it to finish.
//
// When the task request has ResultDB enabled, an invocation is first created
// in cmd.realm (which must be non-empty) via cmd.createInvocation and the
// exported LUCI context is set on execCmd. On return the exported context is
// closed and then the invocation finalizer is run.
func (cmd *reproduceImpl) executeTaskRequestCommand(ctx context.Context, tr *swarmingv2.TaskRequestResponse, execCmd *exec.Cmd, auth base.AuthFlags) error {
	// Enable ResultDB if necessary.
	if rdb := tr.Resultdb; rdb != nil && rdb.Enable {
		if cmd.realm == "" {
			return errors.Reason("must provide -realm if task request has ResultDB enabled").Err()
		}
		httpClient, err := auth.NewHTTPClient(ctx)
		if err != nil {
			return errors.Annotate(err, "failed to create client").Err()
		}
		luciCtx, finalize, err := cmd.createInvocation(ctx, httpClient, cmd.realm, cmd.resultsHost)
		if err != nil {
			return errors.Annotate(err, "failed to create Invocation").Err()
		}
		// Deferred calls run in reverse order: the exported context is closed
		// first, the invocation is finalized afterwards.
		defer finalize()
		luciCtx.SetInCmd(execCmd)
		defer func() { _ = luciCtx.Close() }()
	}

	err := execCmd.Start()
	if err != nil {
		return errors.Annotate(err, "failed to start command: %v", execCmd).Err()
	}
	err = execCmd.Wait()
	if err != nil {
		return errors.Annotate(err, "failed to complete command: %v", execCmd).Err()
	}
	return nil
}
// prepareTaskRequestEnvironment sets up the local directories, environment and
// input files described by properties and returns the command that runs the
// task.
//
// Steps, in order:
//   - the execution directory (cmd.work joined with properties.RelativeCwd)
//     and cmd.out are removed and recreated;
//   - the environment from ctx is updated with properties.Env (an empty value
//     removes the variable) and properties.EnvPrefixes (paths under cmd.work
//     are prepended to any existing value);
//   - CAS inputs, if any, are fetched into cmd.work;
//   - CIPD packages, if any, are installed under cmd.work through
//     cmd.cipdDownloader;
//   - properties.Command is processed with runner.ProcessCommand.
//
// The returned command is not started. Its stdout and stderr are connected to
// those of the current process.
//
// An error is returned if RelativeCwd points outside cmd.work, if the command
// is empty, or if any of the steps above fails.
func (cmd *reproduceImpl) prepareTaskRequestEnvironment(ctx context.Context, properties *swarmingv2.TaskProperties, svc swarming.Client, auth base.AuthFlags) (*exec.Cmd, error) {
	execDir := cmd.work
	if properties.RelativeCwd != "" {
		execDir = filepath.Join(cmd.work, properties.RelativeCwd)
		// execDir is about to be wiped by prepareDir, so a relative cwd with
		// ".." components must not be allowed to point outside the work
		// directory.
		rel, err := filepath.Rel(cmd.work, execDir)
		if err != nil || rel == ".." || strings.HasPrefix(rel, ".."+string(os.PathSeparator)) {
			return nil, errors.Reason("relative cwd %q escapes the work directory", properties.RelativeCwd).Err()
		}
	}
	if err := prepareDir(execDir); err != nil {
		return nil, err
	}
	if err := prepareDir(cmd.out); err != nil {
		return nil, err
	}

	// Set environment variables.
	cmdEnvMap := environ.FromCtx(ctx)
	for _, env := range properties.Env {
		if env.Value == "" {
			cmdEnvMap.Remove(env.Key)
		} else {
			cmdEnvMap.Set(env.Key, env.Value)
		}
	}

	// Set environment prefixes.
	for _, prefix := range properties.EnvPrefixes {
		// One extra slot for the pre-existing value, if there is one.
		paths := make([]string, 0, len(prefix.Value)+1)
		for _, value := range prefix.Value {
			paths = append(paths, filepath.Clean(filepath.Join(cmd.work, value)))
		}
		if cur, ok := cmdEnvMap.Lookup(prefix.Key); ok {
			paths = append(paths, cur)
		}
		cmdEnvMap.Set(prefix.Key, strings.Join(paths, string(os.PathListSeparator)))
	}

	// Support RBE-CAS input in task request.
	if properties.CasInputRoot != nil {
		if _, err := svc.FilesFromCAS(ctx, cmd.work, properties.CasInputRoot); err != nil {
			return nil, errors.Annotate(err, "failed to fetch files from RBE-CAS").Err()
		}
	}

	// Support CIPD package download in task request.
	if properties.CipdInput != nil {
		packages := properties.CipdInput.Packages
		slicesByPath := map[string]ensure.PackageSlice{}
		for _, pkg := range packages {
			path := pkg.Path
			// CIPD deals with 'root' as ''.
			if path == "." {
				path = ""
			}
			if _, ok := slicesByPath[path]; !ok {
				slicesByPath[path] = make(ensure.PackageSlice, 0, len(packages))
			}
			slicesByPath[path] = append(
				slicesByPath[path], ensure.PackageDef{UnresolvedVersion: pkg.Version, PackageTemplate: pkg.PackageName})
		}
		if err := cmd.cipdDownloader(ctx, cmd.work, slicesByPath); err != nil {
			return nil, err
		}
	}

	// Create a Command that can run the task request.
	processedCmds, err := runner.ProcessCommand(ctx, properties.Command, cmd.out, "")
	if err != nil {
		return nil, errors.Annotate(err, "failed to process command in properties").Err()
	}
	// Guard the indexing below: an empty command cannot be executed.
	if len(processedCmds) == 0 {
		return nil, errors.Reason("task request has an empty command").Err()
	}
	execCmd := exec.CommandContext(ctx, processedCmds[0], processedCmds[1:]...)
	execCmd.Env = cmdEnvMap.Sorted()
	execCmd.Dir = execDir
	execCmd.Stdout = os.Stdout
	execCmd.Stderr = os.Stderr
	return execCmd, nil
}
// downloadCIPDPackages resolves the package versions in slicesByPath and
// installs the resolved packages using a CIPD client rooted at workdir.
//
// slicesByPath maps a subdirectory to the packages requested for it. The CIPD
// client is closed before returning.
func downloadCIPDPackages(ctx context.Context, workdir string, slicesByPath map[string]ensure.PackageSlice) error {
	// Create CIPD client.
	cipdClient, err := cipd.NewClientFromEnv(ctx, cipd.ClientOptions{Root: workdir})
	if err != nil {
		return errors.Annotate(err, "failed to create CIPD client").Err()
	}
	defer cipdClient.Close(ctx)

	// Resolve versions.
	ensureFile := &ensure.File{
		ServiceURL:       cipdClient.Options().ServiceURL,
		PackagesBySubdir: slicesByPath,
	}
	versionResolver := cipd.Resolver{Client: cipdClient}
	pinned, err := versionResolver.Resolve(ctx, ensureFile, template.DefaultExpander())
	if err != nil {
		return errors.Annotate(err, "failed to resolve CIPD package versions").Err()
	}

	// Download packages.
	ensureOpts := &cipd.EnsureOptions{Paranoia: pinned.ParanoidMode}
	_, err = cipdClient.EnsurePackages(ctx, pinned.PackagesBySubdir, ensureOpts)
	if err != nil {
		return errors.Annotate(err, "failed to install or update CIPD packages").Err()
	}
	return nil
}
// prepareDir leaves dir as a freshly created directory: whatever exists at
// that path is removed first, then the directory (and any missing parents) is
// created with os.ModePerm permissions.
func prepareDir(dir string) error {
	err := os.RemoveAll(dir)
	if err != nil {
		return errors.Annotate(err, "failed to remove directory: %s", dir).Err()
	}
	err = os.MkdirAll(dir, os.ModePerm)
	if err != nil {
		return errors.Annotate(err, "failed to create directory: %s", dir).Err()
	}
	return nil
}
// createInvocation creates a new ResultDB invocation in realm through the
// Recorder service at resultsHost and exports a LUCI context that points at it.
//
// It returns the exported context and a finalizer. The finalizer finalizes the
// invocation using the update token received on creation (a failure is only
// logged as a warning) and then prints the invocation URL to stdout.
//
// An error is returned if the invocation ID cannot be generated, the create
// call fails, the response does not carry exactly one "update-token" header,
// or the context cannot be exported.
func createInvocation(ctx context.Context, authcli *http.Client, realm string, resultsHost string) (lucictx.Exported, func(), error) {
	// TODO(vadimsh): Construct prpc.Client centrally in cmdbase with correct
	// user agent.
	prpcClient := &prpc.Client{
		C:       authcli,
		Host:    resultsHost,
		Options: prpc.DefaultOptions(),
	}
	rec := resultpb.NewRecorderPRPCClient(prpcClient)

	invID, err := rdbcli.GenInvID(ctx)
	if err != nil {
		return nil, nil, err
	}

	// The update token is delivered in the response headers.
	respHeaders := metadata.MD{}
	createReq := &resultpb.CreateInvocationRequest{
		InvocationId: invID,
		Invocation:   &resultpb.Invocation{Realm: realm},
	}
	inv, err := rec.CreateInvocation(ctx, createReq, grpc.Header(&respHeaders))
	if err != nil {
		return nil, nil, err
	}
	tokens := respHeaders.Get("update-token")
	if len(tokens) != 1 {
		return nil, nil, errors.Reason("Missing header: update-token").Err()
	}
	updateToken := tokens[0]

	rdbCtx := lucictx.SetResultDB(ctx, &lucictx.ResultDB{
		Hostname: resultsHost,
		CurrentInvocation: &lucictx.ResultDBInvocation{
			Name:        inv.Name,
			UpdateToken: updateToken,
		},
	})
	exported, err := lucictx.Export(rdbCtx)
	if err != nil {
		return nil, nil, errors.Annotate(err, "failed to export context").Err()
	}

	finalize := func() {
		// Finalization must present the update token as outgoing metadata.
		ctx = metadata.AppendToOutgoingContext(ctx, "update-token", updateToken)
		finalizeReq := &resultpb.FinalizeInvocationRequest{Name: inv.Name}
		if _, err := rec.FinalizeInvocation(ctx, finalizeReq); err != nil {
			logging.WithError(err).Warningf(ctx, "failed to finalize invocation")
		}
		fmt.Printf("created invocation = %s\n", rdbcli.MustReturnInvURL(resultsHost, inv.Name))
	}
	return exported, finalize, nil
}