blob: 0b94a96eb4ca1ff78dc1bee2ecbc9d8038dbce73 [file] [log] [blame]
// Copyright 2016 The LUCI Authors. All rights reserved.
// Use of this source code is governed under the Apache License, Version 2.0
// that can be found in the LICENSE file.
package main
import (
"bytes"
"context"
"encoding/json"
"fmt"
"io"
"io/ioutil"
"net/url"
"os"
"os/exec"
"path/filepath"
"runtime"
"strings"
"time"
"github.com/golang/protobuf/jsonpb"
"github.com/golang/protobuf/proto"
"github.com/golang/protobuf/ptypes"
"github.com/maruel/subcommands"
"go.chromium.org/luci/auth/authctx"
buildbucketpb "go.chromium.org/luci/buildbucket/proto"
"go.chromium.org/luci/buildbucket/protoutil"
"go.chromium.org/luci/common/cli"
"go.chromium.org/luci/common/clock"
"go.chromium.org/luci/common/errors"
log "go.chromium.org/luci/common/logging"
"go.chromium.org/luci/common/system/environ"
"go.chromium.org/luci/common/system/exitcode"
"go.chromium.org/luci/common/system/filesystem"
"go.chromium.org/luci/grpc/prpc"
"go.chromium.org/luci/logdog/client/butler"
"go.chromium.org/luci/logdog/client/butler/bootstrap"
"go.chromium.org/luci/logdog/client/butler/output"
out "go.chromium.org/luci/logdog/client/butler/output/logdog"
"go.chromium.org/luci/logdog/client/butler/output/null"
"go.chromium.org/luci/logdog/client/butler/streamserver"
"go.chromium.org/luci/logdog/client/butlerlib/streamclient"
"go.chromium.org/luci/logdog/common/types"
"go.chromium.org/luci/lucictx"
"go.chromium.org/luci/luciexe/legacy/annotee"
"go.chromium.org/luci/luciexe/legacy/annotee/annotation"
annopb "go.chromium.org/luci/luciexe/legacy/annotee/proto"
"infra/libs/infraenv"
"infra/tools/kitchen/build"
"infra/tools/kitchen/cookflags"
"infra/tools/kitchen/third_party/recipe_engine"
)
const (
// BootstrapStepName is the name of kitchen's step where it makes preparations
// for running a recipe, e.g. fetches a repository.
BootstrapStepName = "recipe bootstrap"
// defaultRPCTimeout is the default LogDog RPC timeout to apply.
defaultRPCTimeout = 30 * time.Second
// logDogViewerURLTag is a special LogDog tag that is recognized by the LogDog
// viewer as a link to the log stream's build page.
logDogViewerURLTag = "logdog.viewer_url"
// Buildbucket rejects RPCs if SummaryMarkdown is longer than this.
maxSummaryLength = 4000
)
// cmdCook runs a recipe.
var cmdCook = &subcommands.Command{
UsageLine: "cook -recipe <recipe>",
ShortDesc: "bootstraps a LUCI job.",
LongDesc: "Bootstraps a LUCI job.",
CommandRun: func() subcommands.CommandRun {
var c cookRun
c.CookFlags.Register(&c.Flags)
return &c
},
}
type cookRun struct {
subcommands.CommandRunBase
cookflags.CookFlags
engine recipeEngine
kitchenProps *kitchenProperties
systemAuth *authctx.Context // used by kitchen itself
recipeAuth *authctx.Context // used by the recipe
buildSecrets *buildbucketpb.BuildSecrets
bu *buildUpdater
}
// kitchenProperties defines the structure of "$kitchen" build property.
//
// It is consumed exclusively by Kitchen and not even passed along to the recipe
// engine.
type kitchenProperties struct {
GitAuth bool `json:"git_auth"`
DevShell bool `json:"devshell"`
EmulateGCE bool `json:"emulate_gce"`
DockerAuth bool `json:"docker_auth"`
FirebaseAuth bool `json:"firebase_auth"`
}
// normalizeFlags validates and normalizes flags.
func (c *cookRun) normalizeFlags() error {
if err := c.CookFlags.Normalize(); err != nil {
return err
}
c.engine.recipeName = c.RecipeName
return nil
}
// runRecipe runs the recipe.
func (c *cookRun) runRecipe(ctx context.Context, env environ.Env) *build.BuildRunResult {
result := &build.BuildRunResult{
Recipe: &build.BuildRunResult_Recipe{
Name: c.RecipeName,
},
}
fail := func(err error) *build.BuildRunResult {
if err == nil {
panic("do not call fail with nil err")
}
if result.InfraFailure != nil {
panic("bug! forgot to return the result on previous error")
}
result.InfraFailure = infraFailure(err)
return result
}
// The ready-to-run recipe must already be present on the file system in
// checkout-dir.
recipesPath, err := exec.LookPath(filepath.Join(c.CheckoutDir, "recipes"))
if err != nil {
return fail(errors.Annotate(err, "could not find bundled recipes").Err())
}
// LookPath can return an absolute OR relative path. Use Abs to make sure.
recipesPath, err = filepath.Abs(recipesPath)
if err != nil {
return fail(errors.Annotate(err, "could not convert bundled recipes to abspath").Err())
}
c.engine.cmdPrefix = []string{recipesPath}
// Setup our working directory. This is cwd for the recipe itself.
// Previously this was unnecessarily configurable; Now we hard-code it to
// "$CWD/k", which is the shortest path we can make. This is important to
// allow tasks on Windows to have as many characters as possible; otherwise
// they run into MAX_PATH issues.
c.engine.workDir, err = prepareRecipeRunWorkDir("k")
if err != nil {
return fail(errors.Annotate(err, "failed to prepare workdir").Err())
}
// Tell the recipe to write the result protobuf message to a file and read
// it below.
c.engine.outputResultJSONFile = filepath.Join(c.TempDir, "recipe-result.json")
// Run the recipe in the appropriate auth context by exporting it into the
// environ of the recipe engine.
exported, err := lucictx.ExportInto(c.recipeAuth.Export(ctx, env), c.TempDir)
if err != nil {
return fail(errors.Annotate(err, "failed to export LUCI_CONTEXT").Err())
}
defer exported.Close()
exported.SetInEnviron(env)
result.AnnotationUrl = c.AnnotationURL.String()
if err = c.runWithLogdogButler(ctx, env, result); err != nil {
return fail(errors.Annotate(err, "failed to run recipe").Err())
}
setAnnotationText(result.Annotations)
// Now read the recipe result file.
recipeResultFile, err := os.Open(c.engine.outputResultJSONFile)
if err != nil {
// The recipe result file must exist and be readable.
// If it is not, it is a fatal error.
return fail(errors.Annotate(err,
"could not read recipe result file at %q", c.engine.outputResultJSONFile).Err())
}
defer recipeResultFile.Close()
if c.RecipeResultByteLimit > 0 {
st, err := recipeResultFile.Stat()
if err != nil {
return fail(errors.Annotate(err,
"could not stat recipe result file at %q", c.engine.outputResultJSONFile).Err())
}
if sz := st.Size(); sz > int64(c.RecipeResultByteLimit) {
return fail(errors.Reason("recipe result file is %d bytes which is more than %d",
sz, c.RecipeResultByteLimit).Err())
}
}
result.RecipeResult = &recipe_engine.Result{}
err = (&jsonpb.Unmarshaler{
AllowUnknownFields: true,
}).Unmarshal(recipeResultFile, result.RecipeResult)
if err != nil {
return fail(errors.Annotate(err, "could not parse recipe result").Err())
}
// TODO(nodir): remove result.
if result.RecipeResult.GetFailure() != nil && result.RecipeResult.GetFailure().GetFailure() == nil {
// The recipe run has failed and the failure type is not step failure.
result.InfraFailure = &build.InfraFailure{
Text: fmt.Sprintf("recipe infra failure: %s", result.RecipeResult.GetFailure().HumanReason),
Type: build.InfraFailure_RECIPE_INFRA_FAILURE,
}
return result
}
return result
}
// pathModuleProperties returns properties for the "recipe_engine/path" module.
func (c *cookRun) pathModuleProperties() (map[string]string, error) {
recipeTempDir := filepath.Join(c.TempDir, "rt")
if err := ensureDir(recipeTempDir); err != nil {
return nil, err
}
paths := []struct{ name, path string }{
{"cache_dir", c.CacheDir},
{"temp_dir", recipeTempDir},
}
props := make(map[string]string, len(paths))
for _, p := range paths {
if p.path == "" {
continue
}
native := filepath.FromSlash(p.path)
if err := filesystem.AbsPath(&native); err != nil {
return nil, err
}
props[p.name] = native
}
return props, nil
}
// prepareProperties parses the properties specified by flags, validates them,
// add some extra properties to describe current build environment and pops
// properties consumed specifically by kitchen.
//
// May mutate some other properties too.
func (c *cookRun) prepareProperties(env environ.Env) (map[string]interface{}, *kitchenProperties, error) {
props, err := parseProperties(c.Properties, c.PropertiesFile)
if err != nil {
return nil, nil, errors.Annotate(err, "could not parse properties").Err()
}
if props == nil {
props = map[string]interface{}{}
}
// Reject reserved properties.
rejectProperties := []string{
"$recipe_engine/path",
"$recipe_engine/step",
"bot_id",
// not specifying path_config means that all paths must be passed
// explicitly. We do that below.
"path_config",
}
for _, p := range rejectProperties {
if _, ok := props[p]; ok {
return nil, nil, errors.Reason("%s property must not be set", p).Err()
}
}
// Configure paths that the recipe will use.
pathProps, err := c.pathModuleProperties()
if err != nil {
return nil, nil, err
}
props["$recipe_engine/path"] = pathProps
// Use "generic" infra path config. See
// https://chromium.googlesource.com/chromium/tools/depot_tools/+/master/recipes/recipe_modules/infra_paths/
props["path_config"] = "generic"
botID, ok := env.Get("SWARMING_BOT_ID")
if !ok {
return nil, nil, errors.Reason("no Swarming bot ID in $SWARMING_BOT_ID").Err()
}
props["bot_id"] = botID
// Extract "$kitchen" properties into more usable struct.
kitchenProps := &kitchenProperties{
GitAuth: true,
DevShell: true,
EmulateGCE: false, // TODO(vadimsh): Make it 'true' if it works reliably.
DockerAuth: true,
FirebaseAuth: false,
}
if val, _ := props["$kitchen"]; val != nil {
blob, err := json.Marshal(val)
if err != nil {
return nil, nil, errors.Annotate(err, "impossible serialization error").Err()
}
if err := json.Unmarshal(blob, kitchenProps); err != nil {
return nil, nil, errors.Annotate(err, "failed to deserialize $kitchen properties").Err()
}
}
delete(props, "$kitchen")
// EmulateGCE supersedes DevShell.
if kitchenProps.EmulateGCE {
kitchenProps.DevShell = false
}
return props, kitchenProps, nil
}
func (c *cookRun) Run(a subcommands.Application, args []string, env subcommands.Env) int {
// The first thing we do is write a result file in case we crash or get killed.
// Note that this code is not reachable if subcommands package could not
// parse flags.
result := &buildbucketpb.Build{
Status: buildbucketpb.Status_INFRA_FAILURE,
SummaryMarkdown: "kitchen crashed or got killed",
}
if err := c.flushResult(result); err != nil {
fmt.Fprintln(os.Stderr, err)
return 1
}
ctx := cli.GetContext(a, c, env)
sysEnv := environ.System()
result, recipeExitCode := c.run(ctx, args, sysEnv)
fmt.Println(strings.Repeat("-", 35), "RESULTS", strings.Repeat("-", 36))
proto.MarshalText(os.Stdout, result)
fmt.Println(strings.Repeat("-", 80))
if err := c.flushResult(result); err != nil {
fmt.Fprintf(os.Stderr, "could not flush result to a file: %s\n", err)
return 1
}
if result.Status == buildbucketpb.Status_INFRA_FAILURE {
fmt.Fprintln(os.Stderr, "run failed because of an infra failure")
return 1
}
return recipeExitCode
}
// run runs the cook subcommmand and returns Build result and recipe exit code.
// If the returned Build.Status == INFRA_FAILURE, then the recipe may not have run
// and the exit code is bogus.
func (c *cookRun) run(ctx context.Context, args []string, env environ.Env) (*buildbucketpb.Build, int) {
fail := func(err error) (*buildbucketpb.Build, int) {
return &buildbucketpb.Build{
Status: buildbucketpb.Status_INFRA_FAILURE,
SummaryMarkdown: err.Error(),
}, 1
}
// Process input.
if len(args) != 0 {
return fail(errors.Reason("unexpected arguments: %v", args).Err())
}
if _, err := os.Getwd(); err != nil {
return fail(errors.Reason("failed to resolve CWD: %s", err).Err())
}
if err := c.normalizeFlags(); err != nil {
return fail(err)
}
// initialize temp dir.
if c.TempDir == "" {
tdir, err := ioutil.TempDir("", "kitchen")
if err != nil {
return fail(errors.Annotate(err, "failed to create temporary directory").Err())
}
c.TempDir = tdir
defer func() {
if rmErr := os.RemoveAll(tdir); rmErr != nil {
log.Warningf(ctx, "Failed to clean up temporary directory at [%s]: %s", tdir, rmErr)
}
}()
}
// Prepare recipe properties. Print them too.
var err error
if c.engine.properties, c.kitchenProps, err = c.prepareProperties(env); err != nil {
return fail(err)
}
if err = c.reportProperties(ctx, "recipe engine", c.engine.properties); err != nil {
return fail(err)
}
if err = c.reportProperties(ctx, "kitchen", c.kitchenProps); err != nil {
return fail(err)
}
if err = c.updateEnv(env); err != nil {
return fail(errors.Annotate(err, "failed to update the environment").Err())
}
// Make kitchen use the new $PATH too. This is needed for exec.LookPath called
// by kitchen to pick up binaries in the modified $PATH. In practice, we do it
// so that kitchen uses the installed git wrapper.
//
// All other env modifications must be performed using 'env' object.
path, _ := env.Get("PATH")
if err = os.Setenv("PATH", path); err != nil {
return fail(errors.Annotate(err, "failed to update process PATH").Err())
}
// Read BuildSecrets message from swarming secret bytes.
if c.buildSecrets, err = readBuildSecrets(ctx); err != nil {
return fail(errors.Annotate(err, "failed to read build secrets").Err())
}
// Get resultdb parameters from the buildbucket property and build secrets.
ctx = c.syncResultDBInfo(ctx)
if err != nil {
return fail(errors.Annotate(err, "failed to sync resultdb info").Err())
}
// Create systemAuth and recipeAuth authentication contexts, since we are
// about to start making authenticated requests now.
if err := c.setupAuth(ctx); err != nil {
return fail(errors.Annotate(err, "failed to setup auth").Err())
}
defer c.recipeAuth.Close(ctx)
defer c.systemAuth.Close(ctx)
// Must happen after c.systemAuth is initialized.
// We create a build updater even if c.CallUpdateBuild is false because we use it to
// construct the req.Build, which is needed by flushResult. Extracting the logic to construct
// req.Build from the BuildUpdater would require large changes, and we plan to replace for
// this code entirely with LUCI runner.
c.bu, err = c.newBuildUpdater()
if err != nil {
return fail(errors.Annotate(err, "failed to create a build updater").Err())
}
// Run the recipe.
result := c.runRecipe(ctx, env)
req, err := c.bu.ParseAnnotations(ctx, result.Annotations)
if err != nil {
return fail(errors.Annotate(err, "failed to parse final annotations").Err())
}
// Mark incomplete steps as canceled.
endTime, err := ptypes.TimestampProto(clock.Now(ctx))
if err != nil {
return fail(err)
}
for _, s := range req.Build.Steps {
if !protoutil.IsEnded(s.Status) {
s.Status = buildbucketpb.Status_CANCELED
if s.SummaryMarkdown != "" {
s.SummaryMarkdown += "\n"
}
s.SummaryMarkdown += "step was canceled because it did not end before build ended"
s.EndTime = endTime
}
}
// If the build failed, update the build status.
// If it succeeded, do not set it just yet, since there are more ways
// the swarming task can fail.
switch {
case result.InfraFailure != nil:
req.Build.Status = buildbucketpb.Status_INFRA_FAILURE
req.Build.SummaryMarkdown = result.InfraFailure.Text
req.UpdateMask.Paths = append(req.UpdateMask.Paths, "build.status", "build.summary_markdown")
case result.RecipeResult.GetFailure() != nil:
// Note: if this recipe failure is an infra failure,
// result.InfraFailure above is non-nil.
req.Build.Status = buildbucketpb.Status_FAILURE
req.Build.SummaryMarkdown = result.RecipeResult.GetFailure().HumanReason
req.UpdateMask.Paths = append(req.UpdateMask.Paths, "build.status", "build.summary_markdown")
}
// Buildbucket rejects the request if SummaryMarkdown is longer than a
// certain length.
if len(req.Build.SummaryMarkdown) > maxSummaryLength {
dots := "..."
req.Build.SummaryMarkdown = req.Build.SummaryMarkdown[:maxSummaryLength-len(dots)] + dots
}
if c.CallUpdateBuild {
// The final UpdateBuild call is critical.
// If it fails, it is fatal to the build.
if err := c.bu.UpdateBuild(ctx, req); err != nil {
return fail(errors.Annotate(err, "failed to send final build state to buildbucket").Err())
}
}
recipeExitCode := 1
if result.RecipeExitCode != nil {
recipeExitCode = int(result.RecipeExitCode.Value)
}
// After the call to UpdateBuild we can safely set the Build successful.
if recipeExitCode == 0 {
req.Build.Status = buildbucketpb.Status_SUCCESS
}
return req.Build, recipeExitCode
}
// flushResult writes the result to c.OutputResultJSOPath file
// if the path is specified.
func (c *cookRun) flushResult(result *buildbucketpb.Build) (err error) {
if c.OutputResultJSONPath == "" {
return nil
}
defer func() {
if err != nil {
err = errors.Annotate(err, "could not write result file at %q", c.OutputResultJSONPath).Err()
}
}()
f, err := os.Create(c.OutputResultJSONPath)
if err != nil {
return err
}
defer func() {
err = f.Close()
if err != nil {
err = errors.Annotate(err, "could not close file").Err()
}
}()
m := jsonpb.Marshaler{EmitDefaults: true}
return m.Marshal(f, result)
}
// updateEnv updates temp path env variables in env.
func (c *cookRun) updateEnv(env environ.Env) error {
if c.TempDir == "" {
// It should have been initialized in c.run.
panic("TempDir was not initialized earlier")
}
// Tell subprocesses to use a subdirectory in Kitchen's temp dir. Note that
// we can't use TempDir directly because some overzealous scripts like to
// remove everything they find under TEMPDIR, and it breaks Kitchen internals
// that keep some files in c.TempDir (in particular git and gsutil configs
// setup by AuthContext).
tmp := filepath.Join(c.TempDir, "t")
if err := os.Mkdir(tmp, 0700); err != nil && !os.IsExist(err) {
return errors.Annotate(err, "failed to create temp dir for the task (%s)", tmp).Err()
}
for _, v := range []string{"TEMPDIR", "TMPDIR", "TEMP", "TMP", "MAC_CHROMIUM_TMPDIR"} {
env.Set(v, tmp)
}
return nil
}
// reportProperties serializes to JSON and logs given properties.
func (c *cookRun) reportProperties(ctx context.Context, realm string, props interface{}) error {
propsJSON, err := json.MarshalIndent(props, "", " ")
if err != nil {
return errors.Annotate(err, "could not marshal properties to JSON").Err()
}
log.Infof(ctx, "using %s properties:\n%s", realm, propsJSON)
return nil
}
// setupAuth prepares systemAuth and recipeAuth contexts based on incoming
// environment and command line flags.
//
// Such contexts can be used by Kitchen itself or by subprocesses launched by
// Kitchen.
//
// There are two such contexts: a system context and a recipe context.
//
// The system auth context is used for running logdog and updating Buildbucket
// build state. On Swarming all these actions will use bot-associated account
// (specified in Swarming bot config), whose logical name (usually "system") is
// provided via "-luci-system-account" command-line flag.
//
// The recipe auth context is used for actually running the recipe. It is the
// context the kitchen starts with by default. On Swarming this will be the
// context associated with service account specified in the Swarming task
// definition.
func (c *cookRun) setupAuth(ctx context.Context) error {
// Construct authentication option with the set of scopes to be used through
// out Kitchen. This is superset of all scopes we might need. It is more
// efficient to create a single token with all the scopes than make a bunch of
// smaller-scoped tokens. We trust Google APIs enough to send widely-scoped
// tokens to them.
//
// Note that kitchen subprocesses (git, recipes engine, etc) are still free to
// request whatever scopes they need (though LUCI_CONTEXT protocol). The
// scopes here are only for parts of Kitchen (LogDog client, BigQuery export,
// Devshell proxy, etc).
//
// See https://developers.google.com/identity/protocols/googlescopes for list of
// available scopes.
authOpts := infraenv.DefaultAuthOptions()
authOpts.Scopes = []string{
"https://www.googleapis.com/auth/cloud-platform",
"https://www.googleapis.com/auth/userinfo.email",
}
if c.kitchenProps.FirebaseAuth {
authOpts.Scopes = append(authOpts.Scopes, "https://www.googleapis.com/auth/firebase")
}
// If we are given -luci-system-account flag, use the corresponding logical
// account if it's in the LUCI_CONTEXT (fail if not).
//
// Otherwise, we run Kitchen with whatever is default account now (don't
// switch to a system one). Happens when running Kitchen manually locally. It
// picks up the developer account.
systemCtx := ctx
if c.SystemAccount != "" {
var err error
systemCtx, err = lucictx.SwitchLocalAccount(ctx, c.SystemAccount)
if err != nil {
return errors.Annotate(err, "failed to prepare system auth context").Err()
}
}
systemAuth := &authctx.Context{
ID: "system",
Options: authOpts,
EnableGitAuth: c.kitchenProps.GitAuth,
EnableDevShell: c.kitchenProps.DevShell,
EnableGCEEmulation: c.kitchenProps.EmulateGCE,
EnableDockerAuth: c.kitchenProps.DockerAuth,
EnableFirebaseAuth: c.kitchenProps.FirebaseAuth,
KnownGerritHosts: c.KnownGerritHost,
}
if err := systemAuth.Launch(systemCtx, c.TempDir); err != nil {
return errors.Annotate(err, "failed to start system auth context").Err()
}
// Recipes always use the account that is set as default when kitchen starts
// (it is a task-associated account on Swarming). So don't switch accounts.
recipeAuth := &authctx.Context{
ID: "task",
Options: authOpts,
EnableGitAuth: c.kitchenProps.GitAuth,
EnableDevShell: c.kitchenProps.DevShell,
EnableGCEEmulation: c.kitchenProps.EmulateGCE,
EnableDockerAuth: c.kitchenProps.DockerAuth,
EnableFirebaseAuth: c.kitchenProps.FirebaseAuth,
KnownGerritHosts: c.KnownGerritHost,
}
if err := recipeAuth.Launch(ctx, c.TempDir); err != nil {
systemAuth.Close(ctx) // best effort cleanup
return errors.Annotate(err, "failed to start recipe auth context").Err()
}
// Log the actual service account emails corresponding to each context.
systemAuth.Report(ctx)
recipeAuth.Report(ctx)
c.systemAuth = systemAuth
c.recipeAuth = recipeAuth
return nil
}
// globalTags returns tags to be applied to all logdog streams by default.
func (c *cookRun) globalTags(env environ.Env) map[string]string {
ret := make(map[string]string, len(c.GlobalLogDogTags)+1)
if c.BuildURL != "" {
ret[logDogViewerURLTag] = c.BuildURL
}
// SWARMING_SERVER is the full URL: https://example.com
// We want just the hostname.
if v, ok := env.Get("SWARMING_SERVER"); ok {
if u, err := url.Parse(v); err == nil && u.Host != "" {
ret["swarming.host"] = u.Host
}
}
if v, ok := env.Get("SWARMING_TASK_ID"); ok {
ret["swarming.run_id"] = v
}
if v, ok := env.Get("SWARMING_BOT_ID"); ok {
ret["bot_id"] = v
}
// Prefer user-supplied tags to our generated ones.
for k, v := range c.GlobalLogDogTags {
ret[k] = v
}
return ret
}
// butlerOutput creates LogDog output destination.
// The caller is responsible for closing it.
func (c *cookRun) butlerOutput(ctx context.Context) (output.Output, error) {
if c.NullOutput {
return &null.Output{}, nil
}
prefix, _ := c.AnnotationURL.Path.Split()
ocfg := out.Config{
Auth: out.LegacyAuth(c.systemAuth.Authenticator()),
Host: c.AnnotationURL.Host,
Project: c.AnnotationURL.Project,
Prefix: prefix,
SourceInfo: []string{"Kitchen"},
RPCTimeout: defaultRPCTimeout,
PublishContext: withNonCancel(ctx),
}
return ocfg.Register(ctx)
}
func (c *cookRun) newButler(ctx context.Context, out output.Output, env environ.Env) (*butler.Butler, error) {
cfg := butler.Config{
Output: out,
BufferLogs: true,
MaxBufferAge: butler.DefaultMaxBufferAge,
GlobalTags: c.globalTags(env),
}
return butler.New(ctx, cfg)
}
// runWithLogdogButler runs the supplied command through the a LogDog Butler
// engine instance. This involves:
// - Configuring / setting up the Butler.
// - Initiating a LogDog Pub/Sub Output, registering with remote server.
// - Running the recipe process.
// - Hook its output streams up through an Annotee processor.
// - Wait for the subprocess to finish.
// - Shut down the Butler instance.
// Writes res.RecipeExitCode and res.Annotations on success.
func (c *cookRun) runWithLogdogButler(ctx context.Context, env environ.Env, res *build.BuildRunResult) (err error) {
log.Infof(ctx, "Using LogDog URL: %s", &c.AnnotationURL)
// Install a global gRPC logger adapter. This routes gRPC log messages that
// are emitted through our logger. We only log gRPC prints if our logger is
// configured to log debug-level or lower.
disableGRPCLogging(ctx)
// Create our stream server instance.
streamServer, err := c.getLogDogStreamServer(withNonCancel(ctx))
if err != nil {
return errors.Annotate(err, "failed to generate stream server").Err()
}
if err = streamServer.Listen(); err != nil {
return errors.Annotate(err, "failed to listen on stream server").Err()
}
defer func() {
if streamServer != nil {
streamServer.Close()
}
}()
log.Debugf(ctx, "Generated stream server at: %s", streamServer.Address())
// Use the annotation stream's prefix component for our Butler run.
prefix, annoName := c.AnnotationURL.Path.Split()
// Augment our environment with Butler parameters.
bsEnv := bootstrap.Environment{
Project: c.AnnotationURL.Project,
Prefix: string(prefix),
StreamServerURI: streamServer.Address(),
CoordinatorHost: c.AnnotationURL.Host,
}
bsEnv.Augment(env)
// Create a Butler.
butlerOutput, err := c.butlerOutput(ctx)
if err != nil {
return errors.Annotate(err, "failed to create LogDog Output instance").Err()
}
defer butlerOutput.Close()
b, err := c.newButler(withNonCancel(ctx), butlerOutput, env)
if err != nil {
return errors.Annotate(err, "failed to create Butler instance").Err()
}
defer func() {
b.Activate()
if ierr := b.Wait(); ierr != nil {
ierr = errors.Annotate(ierr, "failed to Wait() for Butler").Err()
logAnnotatedErr(ctx, ierr)
// Promote to function output error if we don't have one yet.
if err == nil {
err = ierr
}
}
}()
b.AddStreamServer(streamServer)
streamServer = nil
// Start our bootstrapped subprocess.
//
// We need to consume all of its streams prior to waiting for completion (see
// exec.Cmd).
//
// We'll set up our own cancellation function to help ensure that the process
// is properly terminated regardless of any encountered errors.
procCtx, procCancelFunc := context.WithCancel(ctx)
defer procCancelFunc()
proc, err := c.engine.commandRun(procCtx, filepath.Join(c.TempDir, "rr"), env)
if err != nil {
return errors.Annotate(err, "failed to build recipe command").Err()
}
// Build pipes for our STDOUT and STDERR streams.
stdout, err := proc.StdoutPipe()
if err != nil {
return errors.Annotate(err, "failed to get STDOUT pipe").Err()
}
defer stdout.Close()
stderr, err := proc.StderrPipe()
if err != nil {
return errors.Annotate(err, "failed to get STDERR pipe").Err()
}
defer stderr.Close()
// Start our bootstrapped subprocess.
printCommand(ctx, proc)
if err = proc.Start(); err != nil {
return errors.Annotate(err, "failed to start command").Err()
}
// While the subprocess runs, continuously read its output.
execMetadata := annotation.ProbeExecution(proc.Args, proc.Env, proc.Dir)
if res.Annotations, err = c.watchSubprocessOutput(ctx, annoName, stdout, stderr, execMetadata, b, procCancelFunc); err != nil {
log.Errorf(ctx, "failed to read subprocess output: %s", err)
log.Errorf(ctx, "killing it and waiting for it to die")
procCancelFunc()
// do not let the subprocess outlive this one.
if werr := proc.Wait(); werr != nil {
log.Errorf(ctx, "the subprocess died: %s", werr)
}
return errors.Annotate(err, "failed to read subprocess output").Err()
}
// Wait for the subprocess to finish.
// Most likely it already finished because watching stops when stdout/stderr
// are closed.
waitErr := proc.Wait()
switch waitRC, hasRC := exitcode.Get(waitErr); {
case hasRC:
log.Warningf(ctx, "subprocess exited with code %d", waitRC)
res.RecipeExitCode = &build.OptionalInt32{Value: int32(waitRC)}
case waitErr != nil:
return errors.Annotate(waitErr, "failed to Wait() for process").Err()
}
return nil
}
// watchSubprocessOutput annotates stdout/stderr and writes the annotation
// messages to a stream, and optionally sends build info to Buildbucket server
// (if c.CallUpdateBuild is true).
func (c *cookRun) watchSubprocessOutput(ctx context.Context, annStreamName types.StreamName, stdout, stderr io.Reader, execMetadata *annotation.Execution, b *butler.Butler, stopSubprocess func()) (build *annopb.Step, err error) {
ctx, cancel := context.WithCancel(ctx)
defer cancel()
// Determine our base path and annotation subpath.
basePath, annoSubpath := annStreamName.Split()
annoteeOpts := annotee.Options{
Base: basePath,
AnnotationSubpath: annoSubpath,
Client: streamclient.NewLoopback(b, ""),
Execution: execMetadata,
MetadataUpdateInterval: 30 * time.Second,
Offline: false,
CloseSteps: true,
}
var stopBU func() error
if c.CallUpdateBuild {
annoteeOpts.AnnotationUpdated = c.bu.AnnotationUpdated
errC := make(chan error)
done := make(chan struct{})
stopBU = func() error {
close(done)
return <-errC
}
go func() {
err := c.bu.Run(ctx, done)
if err != nil {
log.Errorf(ctx, "build runner failed; killing the subprocess: %s", err)
stopSubprocess()
}
errC <- err
}()
}
annoteeProcessor := annotee.New(withNonCancel(ctx), annoteeOpts)
// Run STDOUT/STDERR streams through the processor.
streams := []*annotee.Stream{
{
Reader: stdout,
Name: annotee.STDOUT,
Annotate: true,
},
{
Reader: stderr,
Name: annotee.STDERR,
Annotate: true,
},
}
// Run the process' output streams through Annotee. This will block until
// they are all consumed.
if err := annoteeProcessor.RunStreams(streams); err != nil {
return nil, errors.Annotate(err, "failed to process streams through Annotee").Err()
}
// Stop the build updater, if any.
if stopBU != nil {
if err := stopBU(); err != nil {
return nil, errors.Annotate(err, "build updater failed").Err()
}
}
// Read the final annotation.
return annoteeProcessor.Finish().RootStep().Proto(), nil
}
// newBuildUpdater creates a buildUpdater that uses system auth for RPCs.
func (c *cookRun) newBuildUpdater() (*buildUpdater, error) {
httpClient, err := c.systemAuth.Authenticator().Client()
if err != nil {
return nil, errors.Annotate(err, "failed to create a system-auth HTTP client for updating build state on the server").Err()
}
return &buildUpdater{
annAddr: &c.AnnotationURL,
buildID: c.BuildbucketBuildID,
buildToken: c.buildSecrets.BuildToken,
client: buildbucketpb.NewBuildsPRPCClient(&prpc.Client{
Host: c.BuildbucketHostname,
C: httpClient,
}),
annotations: make(chan []byte),
}, nil
}
// getLogDogStreamServer returns a LogDog stream server instance configured for
// the current operating system.
//
// Because Windows doesn't have UNIX domain sockets, and Linux doesn't have
// named pipes, this becomes platform-specific.
func (c *cookRun) getLogDogStreamServer(ctx context.Context) (*streamserver.StreamServer, error) {
if runtime.GOOS == "windows" {
return streamserver.New(ctx, "kitchen")
}
return streamserver.New(ctx, filepath.Join(c.TempDir, "ld.sock"))
}
// syncResultDBInfo syncs resultDB info in build proto and lucictx.
// If there are resultdb's parameters in the build proto, copies them and the
// build secrets onto the appropriate section of lucictx (for buildbucket builds).
// Otherwise copies resultdb data from lucictx and update build proto (for led builds).
func (c *cookRun) syncResultDBInfo(ctx context.Context) context.Context {
if bbProp, ok := c.engine.properties["$recipe_engine/buildbucket"]; ok {
// The "build" value of the property above was parsed from json encoded text.
// Marshal it to a byte array and then populate a proto from it with jsonpb.
if buildMap, ok := bbProp.(map[string]interface{})["build"]; ok {
buildJSON, err := json.Marshal(buildMap)
if err != nil {
panic("Impossible marshaling error")
}
buildProto := &buildbucketpb.Build{}
err = (&jsonpb.Unmarshaler{
AllowUnknownFields: true,
}).Unmarshal(bytes.NewReader(buildJSON), buildProto)
if err != nil {
panic(errors.Annotate(err, "while decoding '$recipe_engine/buildbucket' property").Err())
}
if buildProto.GetInfra().GetResultdb().GetInvocation() != "" {
log.Infof(ctx, "found invocation in build proto: %s", buildProto.GetInfra().GetResultdb().GetInvocation())
return lucictx.SetResultDB(ctx, &lucictx.ResultDB{
Hostname: buildProto.GetInfra().GetResultdb().GetHostname(),
CurrentInvocation: &lucictx.ResultDBInvocation{
Name: buildProto.GetInfra().GetResultdb().GetInvocation(),
UpdateToken: c.buildSecrets.ResultdbInvocationUpdateToken,
},
})
} else if resultDBCtx := lucictx.GetResultDB(ctx); resultDBCtx != nil {
log.Infof(ctx, "found invocation in lucictx: %s", resultDBCtx.CurrentInvocation.Name)
buildProto.Infra.Resultdb = &buildbucketpb.BuildInfra_ResultDB{
Hostname: resultDBCtx.Hostname,
Invocation: resultDBCtx.CurrentInvocation.Name,
}
buf := &bytes.Buffer{}
if err := (&jsonpb.Marshaler{}).Marshal(buf, buildProto); err != nil {
panic("Failed to marshal build proto")
}
if err := json.Unmarshal(buf.Bytes(), &buildMap); err != nil {
panic("Failed to unmarshal json")
}
c.engine.properties["$recipe_engine/buildbucket"].(map[string]interface{})["build"] = buildMap
}
}
}
return ctx
}
func parseProperties(properties map[string]interface{}, propertiesFile string) (result map[string]interface{}, err error) {
if len(properties) > 0 {
return properties, nil
}
if propertiesFile != "" {
b, err := ioutil.ReadFile(propertiesFile)
if err != nil {
err = errors.Reason("could not read properties file %s\n%s", propertiesFile, err).Err()
return nil, err
}
err = json.Unmarshal(b, &result)
if err != nil {
err = errors.Reason("could not parse JSON from file %s\n%s\n%s",
propertiesFile, b, err).Err()
}
}
return
}
func setAnnotationText(s *annopb.Step) {
// TODO(nodir,iaanucci): clean this up when we define a new UI proto
s.Text = nil
for _, substep := range s.Substep {
ss := substep.GetStep()
if ss != nil && ss.Status == annopb.Status_FAILURE && ss.Name != "Failure reason" {
s.Text = append(s.Text, fmt.Sprintf("Failure %s", ss.Name))
}
}
}