// Copyright 2022 The LUCI Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//      http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package tasks

import (
	"context"
	"encoding/base64"
	"encoding/json"
	"fmt"
	"io"
	"path/filepath"
	"sort"
	"strconv"
	"strings"
	"time"

	"github.com/google/uuid"
	"google.golang.org/api/googleapi"
	"google.golang.org/protobuf/proto"

	"go.chromium.org/luci/common/clock"
	"go.chromium.org/luci/common/errors"
	"go.chromium.org/luci/common/logging"
	"go.chromium.org/luci/common/retry/transient"
	"go.chromium.org/luci/gae/service/datastore"
	"go.chromium.org/luci/gae/service/info"
	"go.chromium.org/luci/server/auth/realms"
	"go.chromium.org/luci/server/caching"
	"go.chromium.org/luci/server/tq"
	apipb "go.chromium.org/luci/swarming/proto/api_v2"

	"go.chromium.org/luci/buildbucket"
	"go.chromium.org/luci/buildbucket/appengine/internal/buildstatus"
	"go.chromium.org/luci/buildbucket/appengine/internal/buildtoken"
	"go.chromium.org/luci/buildbucket/appengine/internal/clients"
	"go.chromium.org/luci/buildbucket/appengine/internal/metrics"
	"go.chromium.org/luci/buildbucket/appengine/model"
	taskdefs "go.chromium.org/luci/buildbucket/appengine/tasks/defs"
	"go.chromium.org/luci/buildbucket/cmd/bbagent/bbinput"
	pb "go.chromium.org/luci/buildbucket/proto"
	"go.chromium.org/luci/buildbucket/protoutil"
)

const (
	// cacheDir is the path, relative to the swarming run dir, to the directory that
	// contains the mounted swarming named caches. It will be prepended to paths of
	// caches defined in global or builder configs.
	cacheDir = "cache"

	// pubsubTopicTemplate is the topic template where Swarming publishes
	// notifications on the task update.
	pubsubTopicTemplate = "projects/%s/topics/swarming-go"

	// swarmingCreateTaskGiveUpTimeout indicates how long to retry
	// createSwarmingTask before giving up with INFRA_FAILURE.
	swarmingCreateTaskGiveUpTimeout = 10 * 60 * time.Second

	// swarmingTimeFormat is the time format used by swarming.
	swarmingTimeFormat = "2006-01-02T15:04:05.999999999"
)

// userdata is sent back and forth between Swarming and Buildbucket as the
// PubSub notification userdata.
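// A hypothetical example of the serialized form (all values illustrative only):
//
//	{"build_id":1234567890,"created_ts":1660000000000000,"swarming_hostname":"example-swarm.appspot.com"}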
type userdata struct {
	BuildID          int64  `json:"build_id"`
	CreatedTS        int64  `json:"created_ts"`
	SwarmingHostname string `json:"swarming_hostname"`
}

// notification captures all the fields that Buildbucket needs from a message
// of the Swarming notification subscription.
type notification struct {
	messageID string
	taskID    string
	*userdata
}

// SyncBuild synchronizes the build with Swarming.
// If the swarming task does not exist yet, it creates the task.
// Otherwise, it updates the build state to match the swarming task state.
// It enqueues a new sync push task if the build has not ended yet.
//
// The Cloud Tasks handler will retry the task if any error is returned, unless
// it's tagged with tq.Fatal.
func SyncBuild(ctx context.Context, buildID int64, generation int64) error {
	bld := &model.Build{ID: buildID}
	infra := &model.BuildInfra{Build: datastore.KeyForObj(ctx, bld)}
	switch err := datastore.Get(ctx, bld, infra); {
	case errors.Contains(err, datastore.ErrNoSuchEntity):
		return tq.Fatal.Apply(errors.Annotate(err, "build %d or buildInfra not found", buildID).Err())
	case err != nil:
		return transient.Tag.Apply(errors.Annotate(err, "failed to fetch build %d or buildInfra", buildID).Err())
	}
	if protoutil.IsEnded(bld.Status) {
		logging.Infof(ctx, "build %d is ended", buildID)
		return nil
	}
	if clock.Now(ctx).Sub(bld.CreateTime) > model.BuildMaxCompletionTime {
		logging.Infof(ctx, "build %d (create_time:%s) has passed the sync deadline: %s", buildID, bld.CreateTime, model.BuildMaxCompletionTime)
		return nil
	}

	bld.Proto.Infra = infra.Proto
	swarm, err := clients.NewSwarmingClient(ctx, infra.Proto.Swarming.Hostname, bld.Project)
	if err != nil {
		logging.Errorf(ctx, "failed to create a swarming client for build %d (%s), in %s: %s", buildID, bld.Project, infra.Proto.Swarming.Hostname, err)
		return failBuild(ctx, buildID, fmt.Sprintf("failed to create a swarming client:%s", err))
	}
	if bld.Proto.Infra.Swarming.TaskId == "" {
		if err := createSwarmingTask(ctx, bld, swarm); err != nil {
			// Mark the build as INFRA_FAILURE for fatal and non-retryable errors.
			if tq.Fatal.In(err) {
				return failBuild(ctx, bld.ID, err.Error())
			}
			return err
		}
	} else {
		if err := syncBuildWithTaskResult(ctx, bld.ID, bld.Proto.Infra.Swarming.TaskId, swarm); err != nil {
			// TQ should retry non-fatal errors.
			if !tq.Fatal.In(err) {
				return err
			}
			// For fatal errors, just log them and continue to enqueue the next
			// generation of the sync task.
			logging.Errorf(ctx, "Dropping the sync task due to the fatal error: %s", err)
		}
	}

	// Enqueue a continuation sync task in 5m.
	if clock.Now(ctx).Sub(bld.CreateTime) < model.BuildMaxCompletionTime {
		if err := SyncSwarmingBuildTask(ctx, &taskdefs.SyncSwarmingBuildTask{BuildId: buildID, Generation: generation + 1}, 5*time.Minute); err != nil {
			return transient.Tag.Apply(errors.Annotate(err, "failed to enqueue the continuation sync task for build %d", buildID).Err())
		}
	}
	return nil
}

// SubNotify handles swarming-go PubSub push messages produced by Swarming.
// Retryable errors are tagged with transient.Tag.
func SubNotify(ctx context.Context, body io.Reader) error {
	nt, err := unpackMsg(ctx, body)
	if err != nil {
		return err
	}
	// TODO(crbug/1328646): delete the log once the new Go flow becomes stable.
	logging.Infof(ctx, "Received message - messageID:%s, taskID:%s, userdata:%+v", nt.messageID, nt.taskID, nt.userdata)

	// Try not to process the same message more than once.
	cache := caching.GlobalCache(ctx, "swarming-pubsub-msg-id")
	if cache == nil {
		return errors.Reason("global cache is not found").Tag(transient.Tag).Err()
	}
	msgCached, err := cache.Get(ctx, nt.messageID)
	switch {
	case err == caching.ErrCacheMiss: // no-op, continue
	case err != nil:
		return errors.Annotate(err, "failed to read %s from the global cache", nt.messageID).Tag(transient.Tag).Err()
	case msgCached != nil:
		logging.Infof(ctx, "seen this message %s before, ignoring", nt.messageID)
		return nil
	}

	taskURL := func(hostname, taskID string) string {
		return fmt.Sprintf("https://%s/task?id=%s", hostname, taskID)
	}
	// Load the build and the build infra.
	logging.Infof(ctx, "received swarming notification for build %d", nt.BuildID)
	bld := &model.Build{ID: nt.BuildID}
	infra := &model.BuildInfra{Build: datastore.KeyForObj(ctx, bld)}
	switch err := datastore.Get(ctx, bld, infra); {
	case errors.Contains(err, datastore.ErrNoSuchEntity):
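		// If the notification arrived within a minute of the task creation, the
		// entities may simply not be visible yet; return a transient error so
		// the message is retried. Otherwise treat the error as permanent.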
		if clock.Now(ctx).Sub(time.Unix(0, nt.CreatedTS*int64(time.Microsecond)).UTC()) < time.Minute {
			return errors.Annotate(err, "Build %d or BuildInfra for task %s not found yet", nt.BuildID, taskURL(nt.SwarmingHostname, nt.taskID)).Tag(transient.Tag).Err()
		}
		return errors.Annotate(err, "Build %d or BuildInfra for task %s not found", nt.BuildID, taskURL(nt.SwarmingHostname, nt.taskID)).Err()
	case err != nil:
		return errors.Annotate(err, "failed to fetch build %d or buildInfra", nt.BuildID).Tag(transient.Tag).Err()
	}
	if protoutil.IsEnded(bld.Status) {
		logging.Infof(ctx, "build(%d) is completed and immutable.", nt.BuildID)
		return nil
	}

	// Ensure the loaded build is associated with the task.
	bld.Proto.Infra = infra.Proto
	sw := bld.Proto.GetInfra().GetSwarming()
	if nt.SwarmingHostname != sw.GetHostname() {
		return errors.Reason("swarming_hostname %s of build %d does not match %s", sw.Hostname, nt.BuildID, nt.SwarmingHostname).Err()
	}
	if strings.TrimSpace(sw.GetTaskId()) == "" {
		return errors.Reason("build %d is not associated with a task", nt.BuildID).Tag(transient.Tag).Err()
	}
	if nt.taskID != sw.GetTaskId() {
		return errors.Reason("swarming_task_id %s of build %d does not match %s", sw.TaskId, nt.BuildID, nt.taskID).Err()
	}

	// Update the build.
	swarm, err := clients.NewSwarmingClient(ctx, sw.Hostname, bld.Project)
	if err != nil {
		return errors.Annotate(err, "failed to create a swarming client for build %d (%s), in %s", nt.BuildID, bld.Project, sw.Hostname).Err()
	}
	if err := syncBuildWithTaskResult(ctx, nt.BuildID, sw.TaskId, swarm); err != nil {
		return err
	}

	return cache.Set(ctx, nt.messageID, []byte{1}, 10*time.Minute)
}

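// HandleCancelSwarmingTask sends a cancellation request for the given swarming
// task, killing it even if it is already running. Invalid inputs and permanent
// failures are tagged with tq.Fatal; Swarming 5xx responses are tagged with
// transient.Tag so the task queue retries them.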
func HandleCancelSwarmingTask(ctx context.Context, hostname string, taskID string, realm string) error {
	// Validate the inputs.
	switch err := realms.ValidateRealmName(realm, realms.GlobalScope); {
	case err != nil:
		return tq.Fatal.Apply(err)
	case hostname == "":
		return errors.Reason("hostname is empty").Tag(tq.Fatal).Err()
	case taskID == "":
		return errors.Reason("taskID is empty").Tag(tq.Fatal).Err()
	}

	// Send the cancellation request.
	project, _ := realms.Split(realm)
	swarm, err := clients.NewSwarmingClient(ctx, hostname, project)
	if err != nil {
		return errors.Annotate(err, "failed to create a swarming client for task %s in %s", taskID, hostname).Tag(tq.Fatal).Err()
	}
	res, err := swarm.CancelTask(ctx, &apipb.TaskCancelRequest{KillRunning: true, TaskId: taskID})
	if err != nil {
		if apiErr, ok := err.(*googleapi.Error); ok && apiErr.Code >= 500 {
			return errors.Annotate(err, "transient error in cancelling the task %s", taskID).Tag(transient.Tag).Err()
		}
		return errors.Annotate(err, "fatal error in cancelling the task %s", taskID).Tag(tq.Fatal).Err()
	}

	// A non-canceled response indicates the task may have already ended, so
	// just log it.
	if !res.Canceled {
		logging.Warningf(ctx, "Swarming response for cancelling task %s: %+v", taskID, res)
	}
	return nil
}

// unpackMsg unpacks a swarming-go PubSub message and extracts the message ID,
// swarming hostname, creation time, task ID and build ID.
func unpackMsg(ctx context.Context, body io.Reader) (*notification, error) {
	blob, err := io.ReadAll(body)
	if err != nil {
		return nil, errors.Annotate(err, "failed to read the request body").Tag(transient.Tag).Err()
	}

	// Process the PubSub message.
	// See https://cloud.google.com/pubsub/docs/push#receive_push
	var msg struct {
		Message struct {
			Attributes map[string]string `json:"attributes,omitempty"`
			Data       string            `json:"data,omitempty"`
			MessageID  string            `json:"messageId,omitempty"`
		} `json:"message"`
	}
	if err := json.Unmarshal(blob, &msg); err != nil {
		return nil, errors.Annotate(err, "failed to unmarshal swarming PubSub message").Err()
	}

	// Process the swarming message data.
	swarmData, err := base64.StdEncoding.DecodeString(msg.Message.Data)
	if err != nil {
		return nil, errors.Annotate(err, "cannot decode message data as base64").Err()
	}
	var data struct {
		TaskID   string `json:"task_id"`
		Userdata string `json:"userdata"`
	}
	if err := json.Unmarshal(swarmData, &data); err != nil {
		return nil, errors.Annotate(err, "failed to unmarshal the swarming pubsub data").Err()
	}
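	// The userdata field is itself a JSON-encoded string: it is the payload set
	// in PubsubUserdata by computeSwarmingNewTaskReq when the task was created,
	// so it is decoded separately here.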
	ud := &userdata{}
	if err := json.Unmarshal([]byte(data.Userdata), ud); err != nil {
		return nil, errors.Annotate(err, "failed to unmarshal userdata").Err()
	}

	// Validate the swarming message data.
	switch {
	case strings.TrimSpace(data.TaskID) == "":
		return nil, errors.Reason("task_id not found in message data").Err()
	case ud.BuildID <= 0:
		return nil, errors.Reason("invalid build_id %d", ud.BuildID).Err()
	case ud.CreatedTS <= 0:
		return nil, errors.Reason("invalid created_ts %d", ud.CreatedTS).Err()
	case strings.TrimSpace(ud.SwarmingHostname) == "":
		return nil, errors.Reason("swarming hostname not found in userdata").Err()
	case strings.Contains(ud.SwarmingHostname, "://"):
		return nil, errors.Reason("swarming hostname %s must not contain '://'", ud.SwarmingHostname).Err()
	}

	return &notification{
		messageID: msg.Message.MessageID,
		taskID:    data.TaskID,
		userdata:  ud,
	}, nil
}

// syncBuildWithTaskResult syncs the Build entity in the datastore with the
// result of its swarming task.
func syncBuildWithTaskResult(ctx context.Context, buildID int64, taskID string, swarm clients.SwarmingClient) error {
	taskResult, err := swarm.GetTaskResult(ctx, taskID)
	if err != nil {
		logging.Errorf(ctx, "failed to fetch swarming task %s for build %d: %s", taskID, buildID, err)
		if apiErr, ok := err.(*googleapi.Error); ok && apiErr.Code >= 400 && apiErr.Code < 500 {
			return failBuild(ctx, buildID, fmt.Sprintf("invalid swarming task %s", taskID))
		}
		return transient.Tag.Apply(err)
	}
	if taskResult == nil {
		return failBuild(ctx, buildID, fmt.Sprintf("Swarming task %s unexpectedly disappeared", taskID))
	}

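	// statusChanged records whether the transaction below changed the build's
	// top-level status; metrics are reported outside the transaction only when
	// it did.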
	var statusChanged bool
	bld := &model.Build{
		ID: buildID,
	}
	err = datastore.RunInTransaction(ctx, func(ctx context.Context) (err error) {
		infra := &model.BuildInfra{Build: datastore.KeyForObj(ctx, bld)}
		if err := datastore.Get(ctx, bld, infra); err != nil {
			return transient.Tag.Apply(errors.Annotate(err, "failed to fetch build or buildInfra: %d", bld.ID).Err())
		}

		if protoutil.IsEnded(bld.Status) {
			return nil
		}
		if bld.Status == pb.Status_STARTED && taskResult.State == apipb.TaskState_PENDING {
			// Most probably a race between the PubSub push handler and the Cron
			// job. With swarming, a build cannot go from STARTED back to
			// PENDING, so ignore this.
			return nil
		}

		botDimsChanged := updateBotDimensions(infra, taskResult)

		bs, steps, err := updateBuildStatusFromTaskResult(ctx, bld, taskResult)
		if err != nil {
			return tq.Fatal.Apply(err)
		}

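		// Persist the entities only when the top-level status changed, or when
		// the bot dimensions changed while the build is still running.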
		shouldUpdate := false
		if bs != nil {
			shouldUpdate = true
			statusChanged = true
		}
		if bs == nil && botDimsChanged && bld.Proto.Status == pb.Status_STARTED {
			shouldUpdate = true
		}
		if !shouldUpdate {
			return nil
		}

		toPut := []any{bld, infra}
		if bs != nil {
			toPut = append(toPut, bs)
		}
		if steps != nil {
			toPut = append(toPut, steps)
		}
		return transient.Tag.Apply(datastore.Put(ctx, toPut...))
	}, nil)

	switch {
	case err != nil:
	case !statusChanged:
	case bld.Status == pb.Status_STARTED:
		metrics.BuildStarted(ctx, bld)
	case protoutil.IsEnded(bld.Status):
		metrics.BuildCompleted(ctx, bld)
	}
	return err
}

// updateBotDimensions mutates the infra entity to update the bot dimensions
// according to the given task result.
// Note: it does not write the entity to Datastore.
func updateBotDimensions(infra *model.BuildInfra, taskResult *apipb.TaskResultResponse) bool {
	sw := infra.Proto.Swarming
	botDimsChanged := false

	// Update BotDimensions.
	oldBotDimsMap := protoutil.StringPairMap(sw.BotDimensions)
	newBotDims := []*pb.StringPair{}
	for _, dim := range taskResult.BotDimensions {
		for _, v := range dim.Value {
			if !botDimsChanged && !oldBotDimsMap.Contains(dim.Key, v) {
				botDimsChanged = true
			}
			newBotDims = append(newBotDims, &pb.StringPair{Key: dim.Key, Value: v})
		}
	}
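	// The containment check above only detects added or changed values, so
	// removed dimensions are caught by comparing lengths.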
	if len(newBotDims) != len(sw.BotDimensions) {
		botDimsChanged = true
	}
	sw.BotDimensions = newBotDims

	sort.Slice(sw.BotDimensions, func(i, j int) bool {
		if sw.BotDimensions[i].Key == sw.BotDimensions[j].Key {
			return sw.BotDimensions[i].Value < sw.BotDimensions[j].Value
		}
		return sw.BotDimensions[i].Key < sw.BotDimensions[j].Key
	})
	return botDimsChanged
}

// updateBuildStatusFromTaskResult mutates the build entity to update its top
// level status and update time. It returns the BuildStatus and BuildSteps
// entities (if any) for the caller to persist.
// Note: it does not write the entities to Datastore.
func updateBuildStatusFromTaskResult(ctx context.Context, bld *model.Build, taskResult *apipb.TaskResultResponse) (bs *model.BuildStatus, steps *model.BuildSteps, err error) {
	now := clock.Now(ctx)
	oldStatus := bld.Status
	// A helper function to correctly set the ended status of the Build from
	// taskResult. It backfills the build start_time only if start_time is empty
	// and taskResult has started_ts populated.
	setEndStatus := func(st pb.Status, details *pb.StatusDetails) {
		if !protoutil.IsEnded(st) {
			return
		}
		if bld.Proto.StartTime == nil && taskResult.StartedTs.AsTime().Unix() != 0 {
			startTime := taskResult.StartedTs.AsTime()
			// Backfill the build start time.
			protoutil.SetStatus(startTime, bld.Proto, pb.Status_STARTED)
		}

		endTime := now
		if t := taskResult.CompletedTs; t != nil {
			endTime = t.AsTime()
		} else if t := taskResult.AbandonedTs; t != nil {
			endTime = t.AsTime()
		}
		// It is possible that the swarming task was marked as NO_RESOURCE the
		// moment it was created. Swarming VM time is not synchronized with
		// Buildbucket VM time, so adjust end_time if needed.
		if endTime.Before(bld.Proto.CreateTime.AsTime()) {
			endTime = bld.Proto.CreateTime.AsTime()
		}

		stWithDetails := &buildstatus.StatusWithDetails{Status: st, Details: details}
		bs, steps, err = updateBuildStatusOnTaskStatusChange(ctx, bld, stWithDetails, stWithDetails, endTime)
	}

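	// Swarming task state -> build status, as handled by the switch below:
	//   PENDING                           -> SCHEDULED (only if still unspecified)
	//   RUNNING                           -> STARTED
	//   CANCELED, KILLED                  -> CANCELED
	//   NO_RESOURCE, EXPIRED, TIMED_OUT,
	//   BOT_DIED, CLIENT_ERROR            -> INFRA_FAILURE
	//   COMPLETED                         -> SUCCESS, FAILURE, CANCELED or
	//                                        INFRA_FAILURE, depending on
	//                                        taskResult.Failure and the build
	//                                        output status.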
	// Update the build status.
	switch taskResult.State {
	case apipb.TaskState_PENDING:
		if bld.Status == pb.Status_STATUS_UNSPECIFIED {
			// A scheduled Build should already have the SCHEDULED status, so in
			// theory this should not happen. Log it to confirm.
			logging.Debugf(ctx, "build %d has unspecified status, setting it to SCHEDULED", bld.ID)
			protoutil.SetStatus(now, bld.Proto, pb.Status_SCHEDULED)
		} else {
			// Most probably a race between the PubSub push handler and the Cron
			// job. With swarming, a build cannot go from STARTED/ended back to
			// PENDING, so ignore this.
			return
		}
	case apipb.TaskState_RUNNING:
		updateTime := now
		if t := taskResult.StartedTs; t != nil {
			updateTime = t.AsTime()
		}
		stWithDetails := &buildstatus.StatusWithDetails{Status: pb.Status_STARTED}
		bs, steps, err = updateBuildStatusOnTaskStatusChange(ctx, bld, stWithDetails, stWithDetails, updateTime)
	case apipb.TaskState_CANCELED, apipb.TaskState_KILLED:
		setEndStatus(pb.Status_CANCELED, nil)
	case apipb.TaskState_NO_RESOURCE:
		setEndStatus(pb.Status_INFRA_FAILURE, &pb.StatusDetails{
			ResourceExhaustion: &pb.StatusDetails_ResourceExhaustion{},
		})
	case apipb.TaskState_EXPIRED:
		setEndStatus(pb.Status_INFRA_FAILURE, &pb.StatusDetails{
			ResourceExhaustion: &pb.StatusDetails_ResourceExhaustion{},
			Timeout:            &pb.StatusDetails_Timeout{},
		})
	case apipb.TaskState_TIMED_OUT:
		setEndStatus(pb.Status_INFRA_FAILURE, &pb.StatusDetails{
			Timeout: &pb.StatusDetails_Timeout{},
		})
	case apipb.TaskState_BOT_DIED, apipb.TaskState_CLIENT_ERROR:
		// BB only supplies bbagent CIPD packages in a task, no other user
		// packages, so the CLIENT_ERROR task state should be treated as a build
		// INFRA_FAILURE.
		setEndStatus(pb.Status_INFRA_FAILURE, nil)
	case apipb.TaskState_COMPLETED:
		if taskResult.Failure {
			switch bld.Proto.Output.GetStatus() {
			case pb.Status_FAILURE:
				setEndStatus(pb.Status_FAILURE, nil)
			case pb.Status_CANCELED:
				setEndStatus(pb.Status_CANCELED, nil)
			default:
				// If this truly was a non-infra failure, bbagent would have
				// caught it and marked the build as FAILURE. That did not
				// happen, so this is an infra failure.
				setEndStatus(pb.Status_INFRA_FAILURE, nil)
			}
		} else {
			finalStatus := pb.Status_SUCCESS
			if protoutil.IsEnded(bld.Proto.Output.GetStatus()) {
				// The swarming task ended with COMPLETED (success), so use the
				// build output status as the final status.
				finalStatus = bld.Proto.Output.GetStatus()
			}
			setEndStatus(finalStatus, nil)
		}
	default:
		err = errors.Reason("Unexpected task state: %s", taskResult.State).Err()
		return
	}

	if bld.Proto.Status != oldStatus {
		logging.Infof(ctx, "Build %d status: %s -> %s", bld.ID, oldStatus, bld.Proto.Status)
	}
	return
}

// createSwarmingTask creates a swarming task for the build.
// Requires build.Proto.Infra to be populated.
// If the returned error is fatal and non-retryable, it is tagged with tq.Fatal.
func createSwarmingTask(ctx context.Context, build *model.Build, swarm clients.SwarmingClient) error {
	taskReq, err := computeSwarmingNewTaskReq(ctx, build)
	if err != nil {
		return tq.Fatal.Apply(err)
	}

	// Insert secret bytes.
	token, err := buildtoken.GenerateToken(ctx, build.ID, pb.TokenBody_BUILD)
	if err != nil {
		return tq.Fatal.Apply(err)
	}
	secrets := &pb.BuildSecrets{
		StartBuildToken:               token,
		BuildToken:                    token,
		ResultdbInvocationUpdateToken: build.ResultDBUpdateToken,
	}
	secretsBytes, err := proto.Marshal(secrets)
	if err != nil {
		return tq.Fatal.Apply(err)
	}
	for _, t := range taskReq.TaskSlices {
		t.Properties.SecretBytes = secretsBytes
	}

	// Create the swarming task.
	res, err := swarm.CreateTask(ctx, taskReq)
	if err != nil {
		// Give up if HTTP 500s keep happening past the timeout. Otherwise
		// return the error so Cloud Tasks retries the task.
		if apiErr, _ := err.(*googleapi.Error); apiErr == nil || apiErr.Code >= 500 {
			if clock.Now(ctx).Sub(build.CreateTime) < swarmingCreateTaskGiveUpTimeout {
				return transient.Tag.Apply(errors.Annotate(err, "failed to create a swarming task").Err())
			}
			logging.Errorf(ctx, "Giving up Swarming task creation retries after %s", swarmingCreateTaskGiveUpTimeout.String())
		}
		// Strip out secret bytes and dump the task definition to the log.
		for _, t := range taskReq.TaskSlices {
			t.Properties.SecretBytes = nil
		}
		logging.Errorf(ctx, "Swarming task creation failure:%s. CreateTask request: %+v\nResponse: %+v", err, taskReq, res)
		return tq.Fatal.Apply(errors.Annotate(err, "failed to create a swarming task").Err())
	}

	// Update the build with the build token and the new task id.
	err = datastore.RunInTransaction(ctx, func(ctx context.Context) error {
		bld := &model.Build{
			ID: build.ID,
		}
		infra := &model.BuildInfra{Build: datastore.KeyForObj(ctx, bld)}
		if err := datastore.Get(ctx, bld, infra); err != nil {
			return errors.Annotate(err, "failed to fetch build or buildInfra: %d", bld.ID).Err()
		}

		if infra.Proto.Swarming.TaskId != "" {
			return errors.Reason("build already has a task %s", infra.Proto.Swarming.TaskId).Err()
		}
		infra.Proto.Swarming.TaskId = res.TaskId
		bld.UpdateToken = token

		return datastore.Put(ctx, bld, infra)
	}, nil)
	if err != nil {
		// Now that swarm.CreateTask is idempotent, we should reuse the task
		// instead of cancelling it.
		logging.Errorf(ctx, "created a task %s, but failed to update datastore with the error:%s", res.TaskId, err)
		return transient.Tag.Apply(errors.Annotate(err, "failed to update build %d", build.ID).Err())
	}
	return nil
}

func computeSwarmingNewTaskReq(ctx context.Context, build *model.Build) (*apipb.NewTaskRequest, error) {
	sw := build.Proto.GetInfra().GetSwarming()
	if sw == nil {
		return nil, errors.New("build.Proto.Infra.Swarming isn't set")
	}
	taskReq := &apipb.NewTaskRequest{
		// A deterministic UUID derived from the build ID, so that retried
		// CreateTask calls do not accidentally create multiple tasks.
		RequestUuid:      uuid.NewSHA1(uuid.Nil, []byte(strconv.FormatInt(build.ID, 10))).String(),
		Name:             fmt.Sprintf("bb-%d-%s", build.ID, build.BuilderID),
		Realm:            build.Realm(),
		Tags:             computeTags(ctx, build),
		Priority:         int32(sw.Priority),
		PoolTaskTemplate: apipb.NewTaskRequest_SKIP,
	}

	if build.Proto.Number > 0 {
		taskReq.Name = fmt.Sprintf("%s-%d", taskReq.Name, build.Proto.Number)
	}

	taskSlices, err := computeTaskSlice(build)
	if err != nil {
		return nil, errors.Annotate(err, "failed to compute task slices").Err()
	}
	taskReq.TaskSlices = taskSlices

	// Only make swarming track the build's parent if Buildbucket doesn't track
	// it.
	// Buildbucket should track the parent/child build relationships for all
	// Buildbucket builds, except for children of led builds, whose parents are
	// still tracked by swarming using sw.parent_run_id.
	// TODO(crbug.com/1031205): remove the check on
	// luci.buildbucket.parent_tracking after this experiment is on globally and
	// we're ready to remove it.
	if sw.ParentRunId != "" && (len(build.Proto.AncestorIds) == 0 ||
		!strings.Contains(build.ExperimentsString(), buildbucket.ExperimentParentTracking)) {
		taskReq.ParentTaskId = sw.ParentRunId
	}

	if sw.TaskServiceAccount != "" {
		taskReq.ServiceAccount = sw.TaskServiceAccount
	}

	taskReq.PubsubTopic = fmt.Sprintf(pubsubTopicTemplate, info.AppID(ctx))
	ud := &userdata{
		BuildID:          build.ID,
		CreatedTS:        clock.Now(ctx).UnixNano() / int64(time.Microsecond),
		SwarmingHostname: sw.Hostname,
	}
	udBytes, err := json.Marshal(ud)
	if err != nil {
		return nil, errors.Annotate(err, "failed to marshal pubsub userdata").Err()
	}
	taskReq.PubsubUserdata = string(udBytes)
	return taskReq, nil
}

// computeTags computes the Swarming task request tags to use.
// Note: it doesn't compute kitchen-related tags.
func computeTags(ctx context.Context, build *model.Build) []string {
	tags := []string{
		"buildbucket_bucket:" + build.BucketID,
		fmt.Sprintf("buildbucket_build_id:%d", build.ID),
		fmt.Sprintf("buildbucket_hostname:%s", build.Proto.GetInfra().GetBuildbucket().GetHostname()),
		"luci_project:" + build.Project,
	}
	if build.Canary {
		tags = append(tags, "buildbucket_template_canary:1")
	} else {
		tags = append(tags, "buildbucket_template_canary:0")
	}

	tags = append(tags, build.Tags...)
	sort.Strings(tags)
	return tags
}

// computeTaskSlice computes the swarming task slices.
// build.Proto.Infra must be set.
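//
// For a hypothetical build with a 3600s scheduling timeout and two named
// caches with wait_for_warm_cache of 60s and 300s, this produces three slices
// (values are illustrative only):
//   - slice 1: expires after 60s and waits on both caches plus the base dimensions,
//   - slice 2: expires after a further 240s and waits only on the 300s cache,
//   - slice 3: the base slice with the remaining 3300s and the base dimensions only.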
func computeTaskSlice(build *model.Build) ([]*apipb.TaskSlice, error) {
	// expiration_secs -> []*StringPair
	dims := map[int64][]*apipb.StringPair{}
	for _, cache := range build.Proto.GetInfra().GetSwarming().GetCaches() {
		expSecs := cache.WaitForWarmCache.GetSeconds()
		if expSecs <= 0 {
			continue
		}
		if _, ok := dims[expSecs]; !ok {
			dims[expSecs] = []*apipb.StringPair{}
		}
		dims[expSecs] = append(dims[expSecs], &apipb.StringPair{
			Key:   "caches",
			Value: cache.Name,
		})
	}
	for _, dim := range build.Proto.GetInfra().GetSwarming().GetTaskDimensions() {
		expSecs := dim.Expiration.GetSeconds()
		if _, ok := dims[expSecs]; !ok {
			dims[expSecs] = []*apipb.StringPair{}
		}
		dims[expSecs] = append(dims[expSecs], &apipb.StringPair{
			Key:   dim.Key,
			Value: dim.Value,
		})
	}

	// Extract the base dimensions (expiration 0) and delete them from the map.
	baseDim, ok := dims[0]
	if !ok {
		baseDim = []*apipb.StringPair{}
	}
	delete(dims, 0)
	if len(dims) > 6 {
		return nil, errors.New("at most 6 different expiration_secs are allowed in swarming")
	}

	baseSlice := &apipb.TaskSlice{
		ExpirationSecs:  int32(build.Proto.GetSchedulingTimeout().GetSeconds()),
		WaitForCapacity: build.Proto.GetWaitForCapacity(),
		Properties: &apipb.TaskProperties{
			CipdInput:            computeCipdInput(build),
			ExecutionTimeoutSecs: int32(build.Proto.GetExecutionTimeout().GetSeconds()),
			GracePeriodSecs:      int32(build.Proto.GetGracePeriod().GetSeconds() + bbagentReservedGracePeriod),
			Caches:               computeTaskSliceCaches(build),
			Dimensions:           baseDim,
			EnvPrefixes:          computeEnvPrefixes(build),
			Env: []*apipb.StringPair{
				{Key: "BUILDBUCKET_EXPERIMENTAL", Value: strings.ToUpper(strconv.FormatBool(build.Experimental))},
			},
			Command: computeCommand(build),
		},
	}

	// Sort the dims map keys by expiration_sec.
	var expSecs []int
	for expSec := range dims {
		expSecs = append(expSecs, int(expSec))
	}
	sort.Ints(expSecs)

	// TODO(vadimsh): Remove this when no longer needed, ETA Oct 2022. This is
	// used to load test Swarming's slice expiration mechanism.
	sliceWaitForCapacity := build.Proto.GetWaitForCapacity() &&
		strings.Contains(build.ExperimentsString(), buildbucket.ExperimentWaitForCapacity)

	// Create the extra task slices by copying the base task slice and adding
	// the corresponding expiration and desired dimensions.
	lastExp := 0
	taskSlices := make([]*apipb.TaskSlice, len(expSecs)+1)
	for i, sec := range expSecs {
		prop := &apipb.TaskProperties{}
		if err := deepCopy(baseSlice.Properties, prop); err != nil {
			return nil, err
		}
		taskSlices[i] = &apipb.TaskSlice{
			WaitForCapacity: sliceWaitForCapacity,
			ExpirationSecs:  int32(sec - lastExp),
			Properties:      prop,
		}
		// dims[sec] should be added to all previous non-expired task slices.
		for j := 0; j <= i; j++ {
			taskSlices[j].Properties.Dimensions = append(taskSlices[j].Properties.Dimensions, dims[int64(sec)]...)
		}
		lastExp = sec
	}

	// Tweak expiration on the baseSlice, which is the last slice.
	exp := max(int(baseSlice.ExpirationSecs)-lastExp, 60)
	baseSlice.ExpirationSecs = int32(exp)
	taskSlices[len(taskSlices)-1] = baseSlice

	sortDim := func(strPairs []*apipb.StringPair) {
		sort.Slice(strPairs, func(i, j int) bool {
			if strPairs[i].Key == strPairs[j].Key {
				return strPairs[i].Value < strPairs[j].Value
			}
			return strPairs[i].Key < strPairs[j].Key
		})
	}
	// Sort the dimensions in each task slice.
	for _, t := range taskSlices {
		sortDim(t.Properties.Dimensions)
	}
	return taskSlices, nil
}

// computeTaskSliceCaches computes the task slice caches.
func computeTaskSliceCaches(build *model.Build) []*apipb.CacheEntry {
	infra := build.Proto.Infra
	caches := make([]*apipb.CacheEntry, 0, len(infra.Swarming.GetCaches())+2)
	for _, c := range build.Proto.Infra.Swarming.GetCaches() {
		caches = append(caches, &apipb.CacheEntry{
			Name: c.Name,
			Path: filepath.Join(cacheDir, c.Path),
		})
	}
	if cipdClientCache := infra.Buildbucket.GetAgent().GetCipdClientCache(); cipdClientCache != nil {
		caches = append(caches, &apipb.CacheEntry{
			Name: cipdClientCache.Name,
			Path: filepath.Join(cacheDir, cipdClientCache.Path),
		})
	}
	if cipdPackagesCache := infra.Buildbucket.GetAgent().GetCipdPackagesCache(); cipdPackagesCache != nil {
		caches = append(caches, &apipb.CacheEntry{
			Name: cipdPackagesCache.Name,
			Path: filepath.Join(cacheDir, cipdPackagesCache.Path),
		})
	}

	return caches
}

// computeCipdInput returns the swarming task CIPD input.
// Note: this function only considers v2 bbagent builds.
// build.Proto.Infra.Buildbucket.Agent.Source must be set.
func computeCipdInput(build *model.Build) *apipb.CipdInput {
	return &apipb.CipdInput{
		Packages: []*apipb.CipdPackage{{
			PackageName: build.Proto.GetInfra().GetBuildbucket().GetAgent().GetSource().GetCipd().GetPackage(),
			Version:     build.Proto.GetInfra().GetBuildbucket().GetAgent().GetSource().GetCipd().GetVersion(),
			Path:        ".",
		}},
	}
}

// computeEnvPrefixes returns the env_prefixes key in swarming properties.
// Note: this function only considers v2 bbagent builds.
func computeEnvPrefixes(build *model.Build) []*apipb.StringListPair {
	prefixesMap := map[string][]string{}
	for _, c := range build.Proto.GetInfra().GetSwarming().GetCaches() {
		if c.EnvVar != "" {
			if _, ok := prefixesMap[c.EnvVar]; !ok {
				prefixesMap[c.EnvVar] = []string{}
			}
			prefixesMap[c.EnvVar] = append(prefixesMap[c.EnvVar], filepath.Join(cacheDir, c.Path))
		}
	}
	var keys []string
	for key := range prefixesMap {
		keys = append(keys, key)
	}
	sort.Strings(keys)
	prefixes := make([]*apipb.StringListPair, len(keys))
	for i, key := range keys {
		prefixes[i] = &apipb.StringListPair{
			Key:   key,
			Value: prefixesMap[key],
		}
	}
	return prefixes
}

// computeCommand computes the command for bbagent.
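//
// If the build has the buildbucket.ExperimentBBAgentGetBuild experiment,
// bbagent is pointed at the Buildbucket host and build ID and fetches the
// build itself; otherwise the entire pb.BBAgentArgs proto is encoded into a
// single CLI argument via bbinput.Encode.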
func computeCommand(build *model.Build) []string {
	if strings.Contains(build.ExperimentsString(), buildbucket.ExperimentBBAgentGetBuild) {
		return []string{
			"bbagent${EXECUTABLE_SUFFIX}",
			"-host",
			build.Proto.GetInfra().GetBuildbucket().GetHostname(),
			"-build-id",
			strconv.FormatInt(build.ID, 10),
		}
	}

	return []string{
		"bbagent${EXECUTABLE_SUFFIX}",
		bbinput.Encode(&pb.BBAgentArgs{
			Build:                  build.Proto,
			CacheDir:               build.Proto.GetInfra().GetBbagent().GetCacheDir(),
			KnownPublicGerritHosts: build.Proto.GetInfra().GetBuildbucket().GetKnownPublicGerritHosts(),
			PayloadPath:            build.Proto.GetInfra().GetBbagent().GetPayloadPath(),
		}),
	}
}

func max(a, b int) int {
	if a > b {
		return a
	}
	return b
}

// deepCopy deep copies src to dst using JSON marshaling for non-proto messages.
func deepCopy(src, dst any) error {
	srcBytes, err := json.Marshal(src)
	if err != nil {
		return err
	}
	return json.Unmarshal(srcBytes, dst)
}