| // Copyright 2019 The LUCI Authors. |
| // |
| // Licensed under the Apache License, Version 2.0 (the "License"); |
| // you may not use this file except in compliance with the License. |
| // You may obtain a copy of the License at |
| // |
| // http://www.apache.org/licenses/LICENSE-2.0 |
| // |
| // Unless required by applicable law or agreed to in writing, software |
| // distributed under the License is distributed on an "AS IS" BASIS, |
| // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| // See the License for the specific language governing permissions and |
| // limitations under the License. |
| |
| // Command bbagent is Buildbucket's agent running in Swarming. |
| // |
| // This executable creates a luciexe 'host' environment, and runs the |
| // Buildbucket build's exe within this environment. Please see |
| // https://go.chromium.org/luci/luciexe for details about the 'luciexe' |
| // protocol. |
| // |
| // This command is an implementation detail of Buildbucket. |
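| // |
| // bbagent is not normally run by hand. For illustration only (the hostname and |
| // build ID below are hypothetical), the task command looks roughly like one of: |
| // |
| //     bbagent -host cr-buildbucket.appspot.com -build-id 8765432109123456789 |
| //     bbagent <encoded BBAgentArgs>   (legacy single-argument mode) |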
| package main |
| |
| import ( |
| "context" |
| "flag" |
| "fmt" |
| "log" |
| "os" |
| "os/exec" |
| "path" |
| "path/filepath" |
| "runtime" |
| "strings" |
| "sync" |
| "sync/atomic" |
| "time" |
| |
| "net/http" |
| _ "net/http/pprof" |
| |
| "github.com/golang/protobuf/jsonpb" |
| "google.golang.org/grpc/codes" |
| "google.golang.org/protobuf/proto" |
| "google.golang.org/protobuf/types/known/durationpb" |
| "google.golang.org/protobuf/types/known/fieldmaskpb" |
| "google.golang.org/protobuf/types/known/timestamppb" |
| |
| "go.chromium.org/luci/buildbucket" |
| "go.chromium.org/luci/buildbucket/cmd/bbagent/bbinput" |
| bbpb "go.chromium.org/luci/buildbucket/proto" |
| "go.chromium.org/luci/buildbucket/protoutil" |
| "go.chromium.org/luci/cipd/client/cipd/platform" |
| "go.chromium.org/luci/common/clock" |
| "go.chromium.org/luci/common/data/stringset" |
| "go.chromium.org/luci/common/errors" |
| "go.chromium.org/luci/common/logging" |
| "go.chromium.org/luci/common/logging/gologger" |
| "go.chromium.org/luci/common/retry" |
| "go.chromium.org/luci/common/sync/dispatcher" |
| "go.chromium.org/luci/common/system/environ" |
| "go.chromium.org/luci/grpc/grpcutil" |
| "go.chromium.org/luci/lucictx" |
| "go.chromium.org/luci/luciexe" |
| "go.chromium.org/luci/luciexe/host" |
| "go.chromium.org/luci/luciexe/invoke" |
| ) |
| |
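| // closeOnceCh wraps a channel whose close() may be called multiple times; |
| // only the first call actually closes the channel. |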
| type closeOnceCh struct { |
| ch chan struct{} |
| once sync.Once |
| } |
| |
| func newCloseOnceCh() *closeOnceCh { |
| return &closeOnceCh{ch: make(chan struct{})} |
| } |
| |
| func (c *closeOnceCh) close() { |
| c.once.Do(func() { close(c.ch) }) |
| } |
| |
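| // clientInput bundles the Buildbucket client with the parsed bbagent input. |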
| type clientInput struct { |
| bbclient BuildsClient |
| input *bbpb.BBAgentArgs |
| } |
| |
| // stopInfo contains the channels involved in stopping the build and shutting down the luciexe. |
| type stopInfo struct { |
| invokeErr chan error |
| shutdownCh *closeOnceCh |
| canceledBuildCh *closeOnceCh |
| dispatcherErrCh <-chan error |
| } |
| |
| // stopEvents monitors events that cause the build to stop. |
| //   - Monitors `dispatcherErrCh` and checks for fatal errors. Stops the build |
| //     shuttling and shuts down the luciexe if a fatal error is received. |
| //   - Monitors the builds returned by UpdateBuild RPCs and checks whether the |
| //     build has been canceled. Shuts down the luciexe if the build is canceled. |
| func (si stopInfo) stopEvents(ctx context.Context, c clientInput, fatalUpdateBuildErrorSlot *atomic.Value) { |
| stopped := false |
| for { |
| select { |
| case err, ok := <-si.invokeErr: |
| if !ok { |
| // invokeErr is closed after a successful invocation; stop watching it to |
| // avoid busy-looping on the closed channel. |
| si.invokeErr = nil |
| continue |
| } |
| checkReport(ctx, c, errors.Annotate(err, "could not invoke luciexe").Err()) |
| case err := <-si.dispatcherErrCh: |
| if !stopped && grpcutil.Code(err) == codes.InvalidArgument { |
| si.shutdownCh.close() |
| fatalUpdateBuildErrorSlot.Store(err) |
| stopped = true |
| } |
| case <-si.canceledBuildCh.ch: |
| // The build has been canceled, bail out early. |
| si.shutdownCh.close() |
| case <-ctx.Done(): |
| return |
| } |
| } |
| } |
| |
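| // check logs err and exits the process if err is non-nil. |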
| func check(ctx context.Context, err error) { |
| if err != nil { |
| logging.Errorf(ctx, err.Error()) |
| os.Exit(1) |
| } |
| } |
| |
| // checkReport logs err, tries to report it to Buildbucket as an INFRA_FAILURE, then exits. It is a no-op if err is nil. |
| func checkReport(ctx context.Context, c clientInput, err error) { |
| if err != nil { |
| logging.Errorf(ctx, err.Error()) |
| if _, bbErr := c.bbclient.UpdateBuild( |
| ctx, |
| &bbpb.UpdateBuildRequest{ |
| Build: &bbpb.Build{ |
| Id: c.input.Build.Id, |
| Status: bbpb.Status_INFRA_FAILURE, |
| SummaryMarkdown: fmt.Sprintf("fatal error in startup: %s", err), |
| }, |
| UpdateMask: &fieldmaskpb.FieldMask{Paths: []string{"build.status", "build.summary_markdown"}}, |
| }); bbErr != nil { |
| logging.Errorf(ctx, "Failed to report INFRA_FAILURE status to Buildbucket: %s", bbErr) |
| } |
| os.Exit(1) |
| } |
| } |
| |
| // cancelBuild marks the build as CANCELED in Buildbucket and returns the process retcode. |
| func cancelBuild(ctx context.Context, bbclient BuildsClient, bld *bbpb.Build) (retCode int) { |
| logging.Infof(ctx, "The build is being canceled (cancel time: %s); actually canceling it now.", bld.CancelTime.AsTime()) |
| _, err := bbclient.UpdateBuild( |
| ctx, |
| &bbpb.UpdateBuildRequest{ |
| Build: &bbpb.Build{ |
| Id: bld.Id, |
| Status: bbpb.Status_CANCELED, |
| }, |
| UpdateMask: &fieldmaskpb.FieldMask{Paths: []string{"build.status"}}, |
| }) |
| if err != nil { |
| logging.Errorf(ctx, "failed to actually cancel the build: %s", err) |
| return 1 |
| } |
| return 0 |
| } |
| |
| func parseBbAgentArgs(ctx context.Context, arg string) (clientInput, *bbpb.BuildSecrets) { |
| // TODO(crbug/1219018): Remove CLI BBAgentArgs mode in favor of -host + -build-id. |
| logging.Debugf(ctx, "parsing BBAgentArgs") |
| input, err := bbinput.Parse(arg) |
| check(ctx, errors.Annotate(err, "could not unmarshal BBAgentArgs").Err()) |
| bbclient, secrets, err := newBuildsClient(ctx, input.Build.Infra.Buildbucket.GetHostname(), retry.Default) |
| check(ctx, errors.Annotate(err, "could not connect to Buildbucket").Err()) |
| return clientInput{bbclient, input}, secrets |
| } |
| |
| func parseHostBuildID(ctx context.Context, hostname *string, buildID *int64) (clientInput, *bbpb.BuildSecrets) { |
| logging.Debugf(ctx, "fetching build %d", *buildID) |
| bbclient, secrets, err := newBuildsClient(ctx, *hostname, defaultRetryStrategy) |
| check(ctx, errors.Annotate(err, "could not connect to Buildbucket").Err()) |
| // Get everything from the build. |
| // Here we use UpdateBuild instead of GetBuild, so that: |
| //   - bbagent can always get the build thanks to the build token. This is not |
| //     guaranteed for GetBuild, because it's possible that a service account |
| //     has permission to run a build but doesn't have permission to view it. |
| //   - bbagent can tear down the build earlier if the parent build is canceled. |
| // TODO(crbug.com/1019272): we should also use this RPC to set the initial |
| // status of the build to STARTED (and also be prepared to quit in the case |
| // that this build got double-scheduled). |
| build, err := bbclient.UpdateBuild( |
| ctx, |
| &bbpb.UpdateBuildRequest{ |
| Build: &bbpb.Build{ |
| Id: *buildID, |
| }, |
| Mask: &bbpb.BuildMask{ |
| AllFields: true, |
| }, |
| }) |
| |
| check(ctx, errors.Annotate(err, "failed to fetch build").Err()) |
| input := &bbpb.BBAgentArgs{ |
| Build: build, |
| CacheDir: protoutil.CacheDir(build), |
| KnownPublicGerritHosts: build.Infra.Buildbucket.KnownPublicGerritHosts, |
| PayloadPath: protoutil.ExePayloadPath(build), |
| } |
| return clientInput{bbclient, input}, secrets |
| } |
| |
| // backendTaskInfoExists checks if the backend task info exists in the build proto. |
| // |
| // Currently it only checks input.Infra.Swarming.BotDimensions. |
| // TODO(crbug.com/1370221): also check the info from input.Infra.Backend.Task. |
| func backendTaskInfoExists(ci clientInput) bool { |
| return len(ci.input.Build.GetInfra().GetSwarming().GetBotDimensions()) > 0 |
| } |
| |
| // backFillTaskInfo gets the task info from LUCI_CONTEXT and backfills it into input.Build. |
| // |
| // Currently it only reads the swarming section of LUCI_CONTEXT and only updates |
| // input.Infra.Swarming.BotDimensions. |
| // TODO(crbug.com/1370221): read LUCI_CONTEXT based on the backend task target, |
| // and then update input.Infra.Backend.Task. |
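| // |
| // For illustration (hypothetical dimension value), a swarming context entry |
| // like "os:Ubuntu-22.04" becomes &bbpb.StringPair{Key: "os", Value: "Ubuntu-22.04"}. |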
| func backFillTaskInfo(ctx context.Context, ci clientInput) int { |
| swarmingCtx := lucictx.GetSwarming(ctx) |
| if swarmingCtx == nil || swarmingCtx.GetTask() == nil || len(swarmingCtx.Task.GetBotDimensions()) == 0 { |
| logging.Errorf(ctx, "incomplete swarming context") |
| return 1 |
| } |
| |
| botDimensions := make([]*bbpb.StringPair, 0, len(swarmingCtx.Task.BotDimensions)) |
| for _, dim := range swarmingCtx.Task.BotDimensions { |
| parts := strings.SplitN(dim, ":", 2) |
| if len(parts) != 2 { |
| logging.Errorf(ctx, "bot_dimensions %s in swarming context is malformed", dim) |
| continue |
| } |
| botDimensions = append(botDimensions, &bbpb.StringPair{Key: parts[0], Value: parts[1]}) |
| } |
| ci.input.Build.Infra.Swarming.BotDimensions = botDimensions |
| return 0 |
| } |
| |
| // prepareInputBuild copies status=STARTED (and its timestamps) from updatedBuild and adds the top-level log entries. |
| func prepareInputBuild(ctx context.Context, build, updatedBuild *bbpb.Build) { |
| build.Status = updatedBuild.Status |
| build.StartTime = updatedBuild.StartTime |
| build.UpdateTime = updatedBuild.UpdateTime |
| |
| // TODO(iannucci): this is sketchy, but we preemptively add the log entries |
| // for the top level user stdout/stderr streams. |
| // |
| // Really, `invoke.Start` is the one that knows how to arrange the |
| // Output.Logs, but host.Run makes a copy of this build immediately. Find |
| // a way to set these up nicely (maybe have opts.BaseBuild be a function |
| // returning an immutable bbpb.Build?). |
| build.Output = &bbpb.Build_Output{ |
| Logs: []*bbpb.Log{ |
| {Name: "stdout", Url: "stdout"}, |
| {Name: "stderr", Url: "stderr"}, |
| }, |
| } |
| populateSwarmingInfoFromEnv(build, environ.System()) |
| } |
| |
| // downloadInputs downloads CIPD and CAS inputs then updates the build. |
| func downloadInputs(ctx context.Context, cwd string, c clientInput) int { |
| // This most likely happens in the `led get-build` flow, where the build was created |
| // from an old build that predates the Agent field. The download feature is not |
| // supported for those builds. |
| if c.input.Build.Infra.Buildbucket.Agent == nil { |
| checkReport(ctx, c, errors.New("Cannot enable downloading cipd pkgs feature; Build Agent field is not set")) |
| } |
| |
| agent := c.input.Build.Infra.Buildbucket.Agent |
| agent.Output = &bbpb.BuildInfra_Buildbucket_Agent_Output{ |
| Status: bbpb.Status_STARTED, |
| } |
| updateReq := &bbpb.UpdateBuildRequest{ |
| Build: &bbpb.Build{ |
| Id: c.input.Build.Id, |
| Infra: &bbpb.BuildInfra{ |
| Buildbucket: &bbpb.BuildInfra_Buildbucket{ |
| Agent: agent, |
| }, |
| }, |
| }, |
| UpdateMask: &fieldmaskpb.FieldMask{Paths: []string{"build.infra.buildbucket.agent.output"}}, |
| Mask: readMask, |
| } |
| bldStartCipd, err := c.bbclient.UpdateBuild(ctx, updateReq) |
| if err != nil { |
| // Carry on and bear the non-fatal update failure. |
| logging.Warningf(ctx, "Failed to report build agent STARTED status: %s", err) |
| } |
| // The build has been canceled, bail out early. GetCancelTime is nil-safe, which |
| // matters if the update above failed and bldStartCipd is nil. |
| if bldStartCipd.GetCancelTime() != nil { |
| return cancelBuild(ctx, c.bbclient, bldStartCipd) |
| } |
| |
| agent.Output.AgentPlatform = platform.CurrentPlatform() |
| |
| // Encapsulate all the installation logic with a defer to set the |
| // TotalDuration. As we add more installation logic (e.g. RBE-CAS), |
| // TotalDuration should continue to surround that logic. |
| err = func() error { |
| start := clock.Now(ctx) |
| defer func() { |
| agent.Output.TotalDuration = &durationpb.Duration{ |
| Seconds: int64(clock.Since(ctx, start).Round(time.Second).Seconds()), |
| } |
| }() |
| if err := prependPath(c.input.Build, cwd); err != nil { |
| return err |
| } |
| if err := installCipdPackages(ctx, c.input.Build, cwd); err != nil { |
| return err |
| } |
| return downloadCasFiles(ctx, c.input.Build, cwd) |
| }() |
| |
| if err != nil { |
| logging.Errorf(ctx, "Failure in installing user packages: %s", err) |
| agent.Output.Status = bbpb.Status_FAILURE |
| agent.Output.SummaryHtml = err.Error() |
| updateReq.Build.Status = bbpb.Status_INFRA_FAILURE |
| updateReq.Build.SummaryMarkdown = "Failed to install user packages for this build" |
| updateReq.UpdateMask.Paths = append(updateReq.UpdateMask.Paths, "build.status", "build.summary_markdown") |
| } else { |
| agent.Output.Status = bbpb.Status_SUCCESS |
| if c.input.Build.Exe != nil { |
| updateReq.UpdateMask.Paths = append(updateReq.UpdateMask.Paths, "build.infra.buildbucket.agent.purposes") |
| } |
| } |
| |
| bldCompleteCipd, bbErr := c.bbclient.UpdateBuild(ctx, updateReq) |
| if bbErr != nil { |
| logging.Warningf(ctx, "Failed to report build agent output status: %s", bbErr) |
| } |
| if err != nil { |
| os.Exit(-1) |
| } |
| // The build has been canceled, bail out early. |
| if bldCompleteCipd.GetCancelTime() != nil { |
| return cancelBuild(ctx, c.bbclient, bldCompleteCipd) |
| } |
| return 0 |
| } |
| |
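| // resolveExe resolves a Windows executable path that has no extension by probing |
| // for the ".exe" and ".bat" variants on disk. For illustration (hypothetical path): |
| // |
| //     resolveExe(`C:\task\payload\luciexe`) // -> `C:\task\payload\luciexe.exe`, if that file exists |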
| func resolveExe(path string) (string, error) { |
| if filepath.Ext(path) != "" { |
| return path, nil |
| } |
| |
| lme := errors.NewLazyMultiError(2) |
| for i, ext := range []string{".exe", ".bat"} { |
| candidate := path + ext |
| if _, err := os.Stat(candidate); !lme.Assign(i, err) { |
| return candidate, nil |
| } |
| } |
| |
| me := lme.Get().(errors.MultiError) |
| return path, errors.Reason("cannot find .exe (%q) or .bat (%q)", me[0], me[1]).Err() |
| } |
| |
| // processCmd resolves the cmd by constructing the absolute path and resolving |
| // the exe suffix. |
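| // |
| // For illustration only (hypothetical values, on a POSIX host with cwd /task/root): |
| // |
| //     processCmd("payload_dir", "luciexe") // -> "/task/root/payload_dir/luciexe" |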
| func processCmd(path, cmd string) (string, error) { |
| relPath := filepath.Join(path, cmd) |
| absPath, err := filepath.Abs(relPath) |
| if err != nil { |
| return "", errors.Annotate(err, "absoluting %q", relPath).Err() |
| } |
| if runtime.GOOS == "windows" { |
| absPath, err = resolveExe(absPath) |
| if err != nil { |
| return "", errors.Annotate(err, "resolving %q", absPath).Err() |
| } |
| } |
| return absPath, nil |
| } |
| |
| // processExeArgs processes the given "Executable" message into a single command |
| // which bbagent will invoke as a luciexe. |
| // |
| // This includes resolving paths relative to the current working directory |
| // (expected to be the task's root). |
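| // |
| // For illustration (hypothetical wrapper and payload), the returned slice has the shape: |
| // |
| //     ["/abs/path/to/wrapper", "--", "/task/root/<payload>/luciexe", "<cmd args...>"] |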
| func processExeArgs(ctx context.Context, c clientInput) []string { |
| exeArgs := make([]string, 0, len(c.input.Build.Exe.Wrapper)+len(c.input.Build.Exe.Cmd)+1) |
| |
| if len(c.input.Build.Exe.Wrapper) != 0 { |
| exeArgs = append(exeArgs, c.input.Build.Exe.Wrapper...) |
| exeArgs = append(exeArgs, "--") |
| |
| if strings.Contains(exeArgs[0], "/") || strings.Contains(exeArgs[0], "\\") { |
| absPath, err := filepath.Abs(exeArgs[0]) |
| checkReport(ctx, c, errors.Annotate(err, "absoluting wrapper path: %q", exeArgs[0]).Err()) |
| exeArgs[0] = absPath |
| } |
| |
| cmdPath, err := exec.LookPath(exeArgs[0]) |
| checkReport(ctx, c, errors.Annotate(err, "wrapper not found: %q", exeArgs[0]).Err()) |
| exeArgs[0] = cmdPath |
| } |
| |
| payloadPath := c.input.PayloadPath |
| var exeCmd string |
| if len(c.input.Build.Exe.Cmd) == 0 { |
| // TODO(iannucci): delete me with ExecutablePath. |
| payloadPath, exeCmd = path.Split(c.input.ExecutablePath) |
| } else { |
| // Index Cmd only after confirming it is non-empty. |
| exeCmd = c.input.Build.Exe.Cmd[0] |
| for p, purpose := range c.input.Build.GetInfra().GetBuildbucket().GetAgent().GetPurposes() { |
| if purpose == bbpb.BuildInfra_Buildbucket_Agent_PURPOSE_EXE_PAYLOAD { |
| payloadPath = p |
| break |
| } |
| } |
| } |
| exePath, err := processCmd(payloadPath, exeCmd) |
| checkReport(ctx, c, err) |
| exeArgs = append(exeArgs, exePath) |
| if len(c.input.Build.Exe.Cmd) > 1 { |
| exeArgs = append(exeArgs, c.input.Build.Exe.Cmd[1:]...) |
| } |
| |
| return exeArgs |
| } |
| |
| // readyToFinalize returns true if fatalErr is nil and there are no additional |
| // errors while finalizing the build. |
| func readyToFinalize(ctx context.Context, finalBuild *bbpb.Build, fatalErr error, statusDetails *bbpb.StatusDetails, outputFile *luciexe.OutputFlag) bool { |
| if statusDetails != nil { |
| if finalBuild.StatusDetails == nil { |
| finalBuild.StatusDetails = &bbpb.StatusDetails{} |
| } |
| proto.Merge(finalBuild.StatusDetails, statusDetails) |
| } |
| |
| // set final times |
| now := timestamppb.New(clock.Now(ctx)) |
| finalBuild.UpdateTime = now |
| finalBuild.EndTime = now |
| |
| var finalErrs errors.MultiError |
| if fatalErr != nil { |
| finalErrs = append(finalErrs, errors.Annotate(fatalErr, "fatal error in buildbucket.UpdateBuild").Err()) |
| } |
| if err := outputFile.Write(finalBuild); err != nil { |
| finalErrs = append(finalErrs, errors.Annotate(err, "writing final build").Err()) |
| } |
| |
| if len(finalErrs) > 0 { |
| errors.Log(ctx, finalErrs) |
| |
| // we had some really bad error, just downgrade status and add a message to |
| // summary markdown. |
| finalBuild.Status = bbpb.Status_INFRA_FAILURE |
| originalSM := finalBuild.SummaryMarkdown |
| finalBuild.SummaryMarkdown = fmt.Sprintf("FATAL: %s", finalErrs.Error()) |
| if originalSM != "" { |
| finalBuild.SummaryMarkdown += "\n\n" + originalSM |
| } |
| } |
| |
| return len(finalErrs) == 0 |
| } |
| |
| func finalizeBuild(ctx context.Context, bbclient BuildsClient, finalBuild *bbpb.Build, updateMask []string) int { |
| var retcode int |
| // No need to check the returned build here because it's already finalizing |
| // the build. |
| _, bbErr := bbclient.UpdateBuild( |
| ctx, |
| &bbpb.UpdateBuildRequest{ |
| Build: finalBuild, |
| UpdateMask: &fieldmaskpb.FieldMask{Paths: updateMask}, |
| }) |
| if bbErr != nil { |
| logging.Errorf(ctx, "Failed to update Buildbucket due to %s", bbErr) |
| retcode = 2 |
| } |
| return retcode |
| } |
| |
| func mainImpl() int { |
| ctx := logging.SetLevel(gologger.StdConfig.Use(context.Background()), logging.Info) |
| |
| hostname := flag.String("host", "", "Buildbucket server hostname") |
| buildID := flag.Int64("build-id", 0, "Buildbucket build ID") |
| useGCEAccount := flag.Bool("use-gce-account", false, "Use GCE metadata service account for all calls") |
| outputFile := luciexe.AddOutputFlagToSet(flag.CommandLine) |
| |
| flag.Parse() |
| args := flag.Args() |
| |
| if *useGCEAccount { |
| ctx = setLocalAuth(ctx) |
| } |
| |
| var bbclientInput clientInput |
| var secrets *bbpb.BuildSecrets |
| var err error |
| |
| switch { |
| case len(args) == 1: |
| bbclientInput, secrets = parseBbAgentArgs(ctx, args[0]) |
| case *hostname != "" && *buildID > 0: |
| bbclientInput, secrets = parseHostBuildID(ctx, hostname, buildID) |
| default: |
| check(ctx, errors.Reason("-host and -build-id are required").Err()) |
| } |
| |
| // Add Buildbucket and realm information to the context. |
| ctx = setBuildbucketContext(ctx, hostname, secrets) |
| ctx = setRealmContext(ctx, bbclientInput.input) |
| |
| logdogOutput, err := mkLogdogOutput(ctx, bbclientInput.input.Build.Infra.Logdog) |
| check(ctx, errors.Annotate(err, "could not create logdog output").Err()) |
| |
| var ( |
| cctx context.Context |
| cancel func() |
| ) |
| if dl := lucictx.GetDeadline(ctx); dl.GetSoftDeadline() != 0 { |
| softDeadline := dl.SoftDeadlineTime() |
| gracePeriod := time.Duration(dl.GetGracePeriod() * float64(time.Second)) |
| cctx, cancel = context.WithDeadline(ctx, softDeadline.Add(gracePeriod)) |
| } else { |
| cctx, cancel = context.WithCancel(ctx) |
| } |
| defer cancel() |
| |
| // We send a single status=STARTED here, and will send the final build status |
| // after the user executable completes. |
| updatedBuild, err := bbclientInput.bbclient.UpdateBuild( |
| cctx, |
| &bbpb.UpdateBuildRequest{ |
| Build: &bbpb.Build{ |
| Id: bbclientInput.input.Build.Id, |
| Status: bbpb.Status_STARTED, |
| }, |
| UpdateMask: &fieldmaskpb.FieldMask{Paths: []string{"build.status"}}, |
| Mask: readMask, |
| }) |
| check(ctx, errors.Annotate(err, "failed to report status STARTED to Buildbucket").Err()) |
| |
| // The build has been canceled, bail out early. |
| if updatedBuild.CancelTime != nil { |
| return cancelBuild(ctx, bbclientInput.bbclient, updatedBuild) |
| } |
| |
| prepareInputBuild(cctx, bbclientInput.input.Build, updatedBuild) |
| |
| cctx = setResultDBContext(cctx, bbclientInput.input.Build, secrets) |
| |
| // TODO(crbug.com/1211789) - As part of adding 'dry_run' functionality |
| // to ScheduleBuild, it was necessary to start saving `tags` in the |
| // Build message (previously they were ephemeral in the datastore model). |
| // This had the side effect that setting host.Options.BaseBuild = input.Build |
| // would cause bbagent to regurgitate input tags back to Buildbucket. |
| // |
| // Normally this would be fine (Buildbucket would deduplicate them), |
| // except in the case of some special tags (like "buildset"). |
| // |
| // We strip the input tags here just for host.Options.BaseBuild to |
| // avoid this scenario; however it has the side effect that led tasks |
| // which are scheduled directly on Swarming will not show the tags on |
| // the Milo UI. When led jobs eventually become "real" buildbucket jobs |
| // this discrepancy would go away (and it may also make sense to remove |
| // BaseBuild from host.Options, since it really only needs to carry the |
| // user-code-generated-delta at that point). |
| hostOptionsBaseBuild := proto.Clone(bbclientInput.input.Build).(*bbpb.Build) |
| hostOptionsBaseBuild.Tags = nil |
| |
| opts := &host.Options{ |
| BaseBuild: hostOptionsBaseBuild, |
| ButlerLogLevel: logging.Warning, |
| // TODO(crbug.com/1219086) - generate a correct URL for LED tasks. |
| ViewerURL: fmt.Sprintf("https://%s/build/%d", |
| bbclientInput.input.Build.Infra.Buildbucket.Hostname, bbclientInput.input.Build.Id), |
| LogdogOutput: logdogOutput, |
| ExeAuth: host.DefaultExeAuth("bbagent", bbclientInput.input.KnownPublicGerritHosts), |
| } |
| cwd, err := os.Getwd() |
| checkReport(ctx, bbclientInput, errors.Annotate(err, "getting cwd").Err()) |
| opts.BaseDir = filepath.Join(cwd, "x") |
| |
| initialJSONPB, err := (&jsonpb.Marshaler{ |
| OrigName: true, Indent: " ", |
| }).MarshalToString(bbclientInput.input) |
| checkReport(ctx, bbclientInput, errors.Annotate(err, "marshalling input args").Err()) |
| logging.Infof(ctx, "Input args:\n%s", initialJSONPB) |
| |
| // Download CIPD packages, if any. |
| if stringset.NewFromSlice(bbclientInput.input.Build.Input.Experiments...).Has(buildbucket.ExperimentBBAgentDownloadCipd) { |
| if retcode := downloadInputs(cctx, cwd, bbclientInput); retcode != 0 { |
| return retcode |
| } |
| } |
| |
| // Backfill backend task info if it is missing. |
| if !backendTaskInfoExists(bbclientInput) { |
| if retcode := backFillTaskInfo(ctx, bbclientInput); retcode != 0 { |
| return retcode |
| } |
| } |
| |
| exeArgs := processExeArgs(ctx, bbclientInput) |
| dispatcherOpts, dispatcherErrCh := channelOpts(cctx) |
| canceledBuildCh := newCloseOnceCh() |
| invokeErr := make(chan error) |
| // Use a dedicated BuildsClient for dispatcher, which turns off retries. |
| // dispatcher.Channel will handle retries instead. |
| bbclientForDispatcher, _, err := newBuildsClient(cctx, bbclientInput.input.Build.Infra.Buildbucket.GetHostname(), func() retry.Iterator { return nil }) |
| if err != nil { |
| checkReport(ctx, bbclientInput, errors.Annotate(err, "could not connect to Buildbucket").Err()) |
| } |
| buildsCh, err := dispatcher.NewChannel(cctx, dispatcherOpts, mkSendFn(cctx, bbclientForDispatcher, bbclientInput.input.Build.Id, canceledBuildCh)) |
| checkReport(ctx, bbclientInput, errors.Annotate(err, "could not create builds dispatcher channel").Err()) |
| defer buildsCh.CloseAndDrain(cctx) |
| |
| shutdownCh := newCloseOnceCh() |
| var statusDetails *bbpb.StatusDetails |
| var subprocErr error |
| builds, err := host.Run(cctx, opts, func(ctx context.Context, hostOpts host.Options) { |
| logging.Infof(ctx, "running luciexe: %q", exeArgs) |
| logging.Infof(ctx, " (cache dir): %q", bbclientInput.input.CacheDir) |
| invokeOpts := &invoke.Options{ |
| BaseDir: hostOpts.BaseDir, |
| CacheDir: bbclientInput.input.CacheDir, |
| Env: environ.System(), |
| } |
| experiments := stringset.NewFromSlice(bbclientInput.input.Build.Input.Experiments...) |
| nopy2 := experiments.Has(buildbucket.ExperimentOmitPython2) |
| if nopy2 { |
| invokeOpts.Env.Set("LUCI_OMIT_PYTHON2", "true") |
| } |
| if nopy2 || experiments.Has(buildbucket.ExperimentRecipePY3) { |
| invokeOpts.Env.Set("RECIPES_USE_PY3", "true") |
| } |
| // Buildbucket assigns some grace period to the surrounding task which is |
| // more than what the user requested in `input.Build.GracePeriod`. We |
| // reserve the difference here so the user task only gets what they asked |
| // for. |
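| // For example (hypothetical numbers): with a 240s task grace period and a |
| // 30s user grace period, 210s are reserved for bbagent itself. |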
| deadline := lucictx.GetDeadline(ctx) |
| toReserve := deadline.GracePeriodDuration() - bbclientInput.input.Build.GracePeriod.AsDuration() |
| logging.Infof( |
| ctx, "Reserving %s out of %s of grace_period from LUCI_CONTEXT.", |
| toReserve, lucictx.GetDeadline(ctx).GracePeriodDuration()) |
| dctx, shutdown := lucictx.TrackSoftDeadline(ctx, toReserve) |
| go func() { |
| select { |
| case <-shutdownCh.ch: |
| shutdown() |
| case <-dctx.Done(): |
| } |
| }() |
| defer close(invokeErr) |
| subp, err := invoke.Start(dctx, exeArgs, bbclientInput.input.Build, invokeOpts) |
| if err != nil { |
| invokeErr <- err |
| return |
| } |
| |
| var build *bbpb.Build |
| build, subprocErr = subp.Wait() |
| statusDetails = build.StatusDetails |
| }) |
| if err != nil { |
| checkReport(ctx, bbclientInput, errors.Annotate(err, "could not start luciexe host environment").Err()) |
| } |
| |
| var ( |
| finalBuild *bbpb.Build = proto.Clone(bbclientInput.input.Build).(*bbpb.Build) |
| fatalUpdateBuildErrorSlot atomic.Value |
| ) |
| |
| si := stopInfo{ |
| invokeErr, |
| shutdownCh, |
| canceledBuildCh, |
| dispatcherErrCh, |
| } |
| |
| go si.stopEvents(cctx, bbclientInput, &fatalUpdateBuildErrorSlot) |
| |
| // Now all we do is shuttle builds through to the buildbucket client channel |
| // until there are no more builds to shuttle. |
| for build := range builds { |
| if fatalUpdateBuildErrorSlot.Load() == nil { |
| buildsCh.C <- build |
| finalBuild = build |
| } |
| } |
| buildsCh.CloseAndDrain(cctx) |
| |
| // Now that the builds channel has been closed, update bb directly. |
| updateMask := []string{ |
| "build.status", |
| "build.status_details", |
| "build.summary_markdown", |
| } |
| var retcode int |
| |
| fatalUpdateBuildErr, _ := fatalUpdateBuildErrorSlot.Load().(error) |
| if readyToFinalize(cctx, finalBuild, fatalUpdateBuildErr, statusDetails, outputFile) { |
| updateMask = append(updateMask, "build.steps", "build.output") |
| } else { |
| // readyToFinalize indicated that something is really wrong; omit steps and |
| // output from the final push to minimize potential issues. |
| retcode = 1 |
| } |
| |
| // Don't let a successful final update mask an earlier failure retcode. |
| if finalRetcode := finalizeBuild(cctx, bbclientInput.bbclient, finalBuild, updateMask); finalRetcode != 0 { |
| retcode = finalRetcode |
| } |
| |
| if retcode == 0 && subprocErr != nil { |
| errors.Walk(subprocErr, func(err error) bool { |
| exit, ok := err.(*exec.ExitError) |
| if ok { |
| retcode = exit.ExitCode() |
| logging.Infof(cctx, "Returning exit code from user subprocess: %d", retcode) |
| } |
| return !ok |
| }) |
| if retcode == 0 { |
| retcode = 3 |
| logging.Errorf(cctx, "Non retcode-containing error from user subprocess: %s", subprocErr) |
| } |
| } |
| return retcode |
| } |
| |
| func main() { |
| go func() { |
| // Serves "/debug" endpoints for pprof on localhost:6060. |
| log.Println(http.ListenAndServe("localhost:6060", nil)) |
| }() |
| os.Exit(mainImpl()) |
| } |