blob: cb226746529bc960f9fb7e69cf3cf400060b71d3 [file] [log] [blame]
// Copyright 2015 The LUCI Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package main
import (
"context"
"encoding/json"
"io"
"os"
"os/exec"
"sync"
"go.chromium.org/luci/common/errors"
"go.chromium.org/luci/common/flag/nestedflagset"
log "go.chromium.org/luci/common/logging"
"go.chromium.org/luci/common/system/environ"
"go.chromium.org/luci/common/system/exitcode"
"go.chromium.org/luci/logdog/client/bootstrapResult"
"go.chromium.org/luci/logdog/client/butler"
"go.chromium.org/luci/logdog/client/butler/streamserver"
"github.com/maruel/subcommands"
)
var subcommandRun = &subcommands.Command{
UsageLine: "run",
ShortDesc: "Bootstraps an application, passing its output through the Butler.",
LongDesc: "Bootstraps an application to stream through the Butler.",
CommandRun: func() subcommands.CommandRun {
cmd := &runCommandRun{}
cmd.Flags.StringVar(&cmd.resultPath, "result-path", "",
"If supplied, a JSON file describing the bootstrap result will be written here if the bootstrapped process "+
"is successfully executed.")
cmd.Flags.StringVar(&cmd.jsonArgsPath, "json-args-path", "",
"If specified, this is a JSON file containing the full command to run as an "+
"array of strings.")
cmd.Flags.StringVar(&cmd.chdir, "chdir", "",
"If specified, switch to this directory prior to running the command.")
cmd.Flags.BoolVar(&cmd.attach, "attach", true,
"If true, attaches the bootstrapped process's STDOUT and STDERR streams.")
cmd.Flags.BoolVar(&cmd.stdin, "forward-stdin", false,
"If true, forward STDIN to the bootstrapped process.")
// "stdout" command-line option.
stdoutFlag := new(nestedflagset.FlagSet)
cmd.stdout.addFlags(&stdoutFlag.F)
cmd.Flags.Var(stdoutFlag, "stdout", "STDOUT stream parameters.")
// "stderr" command-line option.
stderrFlag := new(nestedflagset.FlagSet)
cmd.stderr.addFlags(&stderrFlag.F)
cmd.Flags.Var(stderrFlag, "stderr", "STDERR stream parameters.")
return cmd
},
}
type runCommandRun struct {
subcommands.CommandRunBase
// If not empty, write bootstrap result JSON here.
resultPath string
// jsonArgsPath, if not empty, is the path to a JSON file containing an array
// of strings, each of which is a command-line argument to the bootstrapped
// command.
jsonArgsPath string
// chdir, if not empty, is the directory to switch to before running the
// bootstrapped command.
chdir string
// attach, if true, automatically attaches the subprocess's STDOUT and STDERR
// streams to the Butler.
attach bool
stdin bool
stdout streamConfig // Stream configuration for STDOUT.
stderr streamConfig // Stream configuration for STDERR.
}
func (cmd *runCommandRun) Run(app subcommands.Application, args []string, _ subcommands.Env) int {
a := app.(*application)
if cmd.jsonArgsPath != "" {
if len(args) > 0 {
log.Errorf(a, "Cannot supply both JSON and command-line arguments.")
return configErrorReturnCode
}
err := error(nil)
args, err = cmd.loadJSONArgs()
if err != nil {
log.Fields{
log.ErrorKey: err,
"path": cmd.jsonArgsPath,
}.Errorf(a, "Failed to load JSON arguments.")
return configErrorReturnCode
}
log.Fields{
"args": args,
"path": cmd.jsonArgsPath,
}.Debugf(a, "Loaded arguments from JSON file.")
}
if len(args) == 0 {
log.Errorf(a, "A command must be specified.")
return configErrorReturnCode
}
// Resolve
streamServer, err := streamserver.New(a, "")
if err != nil {
log.WithError(err).Errorf(a, "Invalid stream server URI.")
return configErrorReturnCode
}
// Get the actual path to the command
commandPath, err := exec.LookPath(args[0])
if err != nil {
log.Fields{
log.ErrorKey: err,
"command": args[0],
}.Errorf(a, "Failed to identify command path.")
return runtimeErrorReturnCode
}
args = args[1:]
// Verify our current working directory. "cwd" is only used for validation and
// logging.
cwd := cmd.chdir
if cwd != "" {
fi, err := os.Stat(cwd)
if err != nil {
log.Fields{
log.ErrorKey: err,
"path": cwd,
}.Errorf(a, "Failed to stat `chdir` directory.")
return runtimeErrorReturnCode
}
if !fi.IsDir() {
log.Fields{
"path": cwd,
}.Errorf(a, "Target `chdir` path is not a directory.")
return runtimeErrorReturnCode
}
} else {
// Just for output.
cwd, _ = os.Getwd()
}
// Get our output.
of, err := a.getOutputFactory()
if err != nil {
log.WithError(err).Errorf(a, "Failed to get output factory instance.")
return runtimeErrorReturnCode
}
output, err := of.configOutput(a)
if err != nil {
log.WithError(err).Errorf(a, "Failed to create output instance.")
return runtimeErrorReturnCode
}
defer output.Close()
// Update our environment for the child process to inherit
bsEnv := output.URLConstructionEnv()
// Configure stream server
streamServerOwned := true
log.Fields{
"url": streamServer.Address(),
}.Infof(a, "Creating stream server.")
if err := streamServer.Listen(); err != nil {
log.WithError(err).Errorf(a, "Failed to connect to stream server.")
return runtimeErrorReturnCode
}
log.Infof(a, "listening on %q", streamServer.Address())
defer func() {
if streamServerOwned {
streamServer.Close()
}
}()
bsEnv.StreamServerURI = streamServer.Address()
// Build our command environment.
env := environ.System()
bsEnv.Augment(env)
// Construct and execute the command.
procCtx, procCancelFunc := context.WithCancel(a)
defer procCancelFunc()
proc := exec.CommandContext(procCtx, commandPath, args...)
proc.Dir = cmd.chdir
proc.Env = env.Sorted()
if cmd.stdin {
proc.Stdin = os.Stdin
}
// Attach STDOUT / STDERR pipes if configured to do so.
//
// This will happen in one of two ways:
// - If no stream name is provided, we will not create a Butler stream for
// this output; instead, we will connect them directly to the bootstrapped
// process.
// - If a stream name is provided, we will create a Butler stream for this
// output, and tee as necessary.
//
// When connecting the stream to the Butler, we track them with a
// sync.WaitGroup because we have to wait for them to close before reaping the
// process (see exec.Cmd's StdoutPipe method).
//
// In this case, we will hand them to the Butler, which will Close then when
// it counters io.EOF.
var (
stdout, stderr io.ReadCloser
streamWG sync.WaitGroup
)
if cmd.attach {
// STDOUT
if cmd.stdout.Name == "" {
// Forward directly.
proc.Stdout = os.Stdout
} else {
// Connect to Butler.
stdout, err = proc.StdoutPipe()
if err != nil {
log.WithError(err).Errorf(a, "Failed to get STDOUT pipe.")
return runtimeErrorReturnCode
}
stdout = &callbackReadCloser{stdout, streamWG.Done}
streamWG.Add(1)
}
// STDERR
if cmd.stderr.Name == "" {
// Forward directly.
proc.Stderr = os.Stderr
} else {
// Connect to Butler.
stderr, err = proc.StderrPipe()
if err != nil {
log.WithError(err).Errorf(a, "Failed to get STDERR pipe.")
return runtimeErrorReturnCode
}
stderr = &callbackReadCloser{stderr, streamWG.Done}
streamWG.Add(1)
}
}
if log.IsLogging(a, log.Debug) {
log.Fields{
"commandPath": commandPath,
"cwd": cwd,
"args": args,
}.Debugf(a, "Executing application.")
for _, entry := range proc.Env {
log.Debugf(a, "Environment variable: %s", entry)
}
}
// Attach and run our Butler instance.
var (
executed = false
returnCode = runtimeErrorReturnCode
)
err = a.runWithButler(output, func(b *butler.Butler) error {
// We want to kill our process early if our Butler run finishes.
defer procCancelFunc()
// If we have configured a stream server, add it.
if streamServer != nil {
b.AddStreamServer(streamServer)
streamServerOwned = false
}
// Add our pipes as direct streams, if configured.
if stdout != nil {
if err := b.AddStream(stdout, cmd.stdout.properties()); err != nil {
return errors.Annotate(err, "failed to attach STDOUT pipe stream").Err()
}
}
if stderr != nil {
if err := b.AddStream(stderr, cmd.stderr.properties()); err != nil {
return errors.Annotate(err, "failed to attach STDERR pipe stream").Err()
}
}
// Execute the command. The bootstrapped application will begin executing
// in the background.
if err := proc.Start(); err != nil {
return errors.Annotate(err, "failed to start bootstrapped process").Err()
}
// Wait for the process's streams to finish. We must do this before Wait()
// on the process itself.
streamWG.Wait()
// Reap the process.
err := proc.Wait()
if rc, ok := exitcode.Get(err); ok {
if rc != 0 {
log.Fields{
"returnCode": rc,
}.Errorf(a, "Command completed with non-zero return code.")
}
returnCode = rc
executed = true
} else {
log.WithError(err).Errorf(a, "Command failed.")
}
// Wait for our Butler to finish.
b.Activate()
if err := b.Wait(); err != nil {
if err == context.Canceled {
return err
}
return errors.Annotate(err, "failed to Wait() for Butler").Err()
}
return nil
})
if err != nil {
logAnnotatedErr(a, err, "Error running bootstrapped Butler process:")
}
if !executed {
return runtimeErrorReturnCode
}
// Output our bootstrap result.
br := bootstrapResult.Result{
ReturnCode: returnCode,
Command: append([]string{commandPath}, args...),
}
if err := cmd.maybeWriteResult(a, &br); err != nil {
logAnnotatedErr(a, err, "Failed to write bootstrap result:")
}
return returnCode
}
func (cmd *runCommandRun) maybeWriteResult(ctx context.Context, r *bootstrapResult.Result) error {
if cmd.resultPath == "" {
return nil
}
log.Fields{
"path": cmd.resultPath,
}.Debugf(ctx, "Writing bootstrap result.")
if err := r.WriteJSON(cmd.resultPath); err != nil {
return errors.Annotate(err, "failed to write JSON file").
InternalReason("path(%s)", cmd.resultPath).Err()
}
return nil
}
func (cmd *runCommandRun) loadJSONArgs() ([]string, error) {
fd, err := os.Open(cmd.jsonArgsPath)
if err != nil {
return nil, err
}
defer fd.Close()
dec := json.NewDecoder(fd)
args := []string(nil)
if err := dec.Decode(&args); err != nil {
return nil, err
}
return args, nil
}
// callbackReadCloser invokes a callback method when closed.
type callbackReadCloser struct {
io.ReadCloser
callback func()
}
func (c *callbackReadCloser) Close() error {
defer c.callback()
return c.ReadCloser.Close()
}