blob: 0ae9af0efc48a904b1fe9957c9cd92f8cf59af1e [file] [log] [blame]
// Copyright 2022 The LUCI Authors.
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
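// You may obtain a copy of the License at
//
//      http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.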
// See the License for the specific language governing permissions and
// limitations under the License.
package tasks
import (
pb ""
type pushRequest struct {
Message pubsub.PubsubMessage `json:"message"`
Subscription string `json:"subscription"`
type buildTaskUpdate struct {
// Message id of the pubsub message that sent this request.
msgID string
// Subscription of the pubsub message that sent this request.
subscription string
func unpackUpdateBuildTaskMsg(ctx context.Context, body io.Reader) (req buildTaskUpdate, err error) {
req = buildTaskUpdate{}
blob, err := io.ReadAll(body)
if err != nil {
return req, errors.Annotate(err, "failed to read the request body").Tag(transient.Tag).Err()
// process pubsub message
var msg pushRequest
if err := json.Unmarshal(blob, &msg); err != nil {
return req, errors.Annotate(err, "failed to unmarshal UpdateBuildTask PubSub message").Err()
// process UpdateBuildTask message data
data, err := base64.StdEncoding.DecodeString(msg.Message.Data)
if err != nil {
return req, errors.Annotate(err, "cannot decode UpdateBuildTask message data as base64").Err()
bldTskUpdte := &pb.BuildTaskUpdate{}
if err := (proto.UnmarshalOptions{DiscardUnknown: true}).Unmarshal(data, bldTskUpdte); err != nil {
return req, errors.Annotate(err, "failed to unmarshal the BuildTaskUpdate pubsub data").Err()
req.BuildTaskUpdate = bldTskUpdte
req.subscription = msg.Subscription
req.msgID = msg.Message.MessageId
return req, nil
func validateTaskStatus(taskStatus pb.Status, allowPending bool) error {
switch taskStatus {
case pb.Status_ENDED_MASK,
return errors.Reason("task.status: invalid status %s for UpdateBuildTask", taskStatus).Err()
if !allowPending && taskStatus == pb.Status_SCHEDULED {
return errors.Reason("task.status: invalid status %s for UpdateBuildTask", taskStatus).Err()
if _, ok := pb.Status_value[taskStatus.String()]; !ok {
return errors.Reason("task.status: invalid status %s for UpdateBuildTask", taskStatus).Err()
return nil
func validatePubsubSubscription(ctx context.Context, req buildTaskUpdate) error {
globalCfg, err := config.GetSettingsCfg(ctx)
if err != nil {
return errors.Annotate(err, "error fetching service config").Err()
target := req.GetTask().GetId().GetTarget()
if target == "" {
return errors.Reason("could not validate message. not provided.").Err()
isValid := false
for _, backend := range globalCfg.Backends {
if backend.Target == target {
var subscription string
switch backend.Mode.(type) {
case *pb.BackendSetting_LiteMode_:
return errors.Reason("backend target %s is in lite mode. The task update isn't supported", target).Err()
case *pb.BackendSetting_FullMode_:
subscription = fmt.Sprintf("projects/%s/subscriptions/%s", info.AppID(ctx), backend.GetFullMode().GetPubsubId())
if subscription == req.subscription {
isValid = true
if !isValid {
return errors.Reason("pubsub subscription %s did not match the one configured for target %s", req.subscription, target).Err()
return nil
func validateTask(task *pb.Task, allowPending bool) error {
if task.GetId().GetId() == "" {
return errors.Reason(" required").Err()
if task.GetUpdateId() == 0 {
return errors.Reason("task.UpdateId: required").Err()
if err := validateTaskStatus(task.Status, allowPending); err != nil {
return errors.Annotate(err, "task.Status").Err()
detailsInKb := float64(len(task.GetDetails().String()) / 1024)
if detailsInKb > 10 {
return errors.Reason("task.details is greater than 10 kb").Err()
return nil
// validateBuildTaskUpdate ensures that the build_id, task, status, and details
// are correctly set be sender.
func validateBuildTaskUpdate(ctx context.Context, req *pb.BuildTaskUpdate) error {
if req.BuildId == "" {
return errors.Reason("build_id required").Err()
return validateTask(req.Task, false)
// validateBuildTask ensures that the taskID provided in the request matches
// the taskID that is stored in the build model. If there is no task associated
// with the build, an error is returned and the update message is lost.
func validateBuildTask(ctx context.Context, req *pb.BuildTaskUpdate, infra *model.BuildInfra) error {
switch {
case infra.Proto.GetBackend() == nil:
return appstatus.Errorf(codes.NotFound, "Build %s does not support task backend", req.BuildId)
case infra.Proto.Backend.GetTask().GetId().GetId() == "":
return appstatus.Errorf(codes.NotFound, "No task is associated with the build. Cannot update.")
case infra.Proto.Backend.Task.Id.GetTarget() != req.Task.Id.GetTarget() || (infra.Proto.Backend.Task.Id.GetId() != "" && infra.Proto.Backend.Task.Id.GetId() != req.Task.Id.GetId()):
return errors.Reason("TaskID in request does not match TaskID associated with build").Err()
if protoutil.IsEnded(infra.Proto.Backend.Task.Status) {
return appstatus.Errorf(codes.FailedPrecondition, "cannot update an ended task")
return nil
func prepareUpdate(ctx context.Context, build *model.Build, infra *model.BuildInfra, task *pb.Task) ([]any, error) {
if task.UpdateId <= infra.Proto.Backend.Task.UpdateId {
// Returning nil since there is no work to do here.
// The task in the request is outdated.
return nil, nil
// Required fields to change
now := clock.Now(ctx)
build.Proto.UpdateTime = timestamppb.New(now)
proto.Merge(infra.Proto.Backend.Task, task)
toSave := []any{build, infra}
bs, steps, err := updateBuildStatusOnTaskStatusChange(ctx, build, nil, &buildstatus.StatusWithDetails{Status: task.Status}, now)
if err != nil {
return nil, err
if bs != nil {
toSave = append(toSave, bs)
if steps != nil {
toSave = append(toSave, steps)
return toSave, nil
func updateTaskEntity(ctx context.Context, req *pb.BuildTaskUpdate, buildID int64) error {
var build *model.Build
setBuildToEnd := false
txErr := datastore.RunInTransaction(ctx, func(ctx context.Context) error {
entities, err := common.GetBuildEntities(ctx, buildID, model.BuildKind, model.BuildInfraKind)
if err != nil {
return errors.Annotate(err, "invalid Build or BuildInfra").Err()
build = entities[0].(*model.Build)
infra := entities[1].(*model.BuildInfra)
if protoutil.IsEnded(build.Status) {
// Cannot update an ended build.
logging.Infof(ctx, "build %d is ended", build.ID)
return nil
toSave, err := prepareUpdate(ctx, build, infra, req.Task)
if err != nil {
return err
setBuildToEnd = protoutil.IsEnded(req.Task.Status)
return datastore.Put(ctx, toSave)
}, nil)
if txErr != nil {
return txErr
if setBuildToEnd {
metrics.BuildCompleted(ctx, build)
return nil
// updateBuildTask allows the Backend to preemptively update the
// status of the task (e.g. if it knows that the task has crashed, etc.).
func updateBuildTask(ctx context.Context, req buildTaskUpdate) error {
buildID, err := strconv.ParseInt(req.GetBuildId(), 10, 64)
if err != nil {
return errors.Annotate(err, "bad build id").Err()
if err := validatePubsubSubscription(ctx, req); err != nil {
return errors.Annotate(err, "pubsub subscription").Err()
if err := validateBuildTaskUpdate(ctx, req.BuildTaskUpdate); err != nil {
return errors.Annotate(err, "invalid BuildTaskUpdate").Err()
logging.Infof(ctx, "Received an BuildTaskUpdate message for build %q", req.BuildId)
// TODO(b/288158829): remove it once the root cause for the Skia failure is found.
if strings.Contains(req.subscription, "skia") {
logging.Debugf(ctx, "BuildTaskUpdate.Task: %v", req.BuildTaskUpdate.Task)
entities, err := common.GetBuildEntities(ctx, buildID, model.BuildInfraKind)
if err != nil {
return errors.Annotate(err, "invalid buildInfra").Err()
infra := entities[0].(*model.BuildInfra)
// Pre-check if the task can be updated before updating it with a transaction.
// Ensures that the taskID provided in the request matches the taskID that is
// stored in the build model. If there is no task associated with the build model,
// an error is returned and the update message is lost.
err = validateBuildTask(ctx, req.BuildTaskUpdate, infra)
if err != nil {
return errors.Annotate(err, "invalid task").Err()
err = updateTaskEntity(ctx, req.BuildTaskUpdate, buildID)
if err != nil {
if _, isAppStatusErr := appstatus.Get(err); isAppStatusErr {
return err
} else {
return appstatus.Errorf(codes.Internal, "failed to update the build entity: %s", err)
return nil
// UpdateBuildTask handles task backend PubSub push messages produced by various
// task backends.
// For a retryable error, it will be tagged with transient.Tag.
func UpdateBuildTask(ctx context.Context, body io.Reader) error {
req, err := unpackUpdateBuildTaskMsg(ctx, body)
if err != nil {
return err
// Try not to process same message more than once.
cache := caching.GlobalCache(ctx, "update-build-task-pubsub-msg-id")
if cache == nil {
return errors.Reason("global cache is not found").Tag(transient.Tag).Err()
msgCached, err := cache.Get(ctx, req.msgID)
switch {
case err == caching.ErrCacheMiss: // no-op, continue
case err != nil:
return errors.Annotate(err, "failed to read %s from the global cache", req.msgID).Tag(transient.Tag).Err()
case msgCached != nil:
logging.Infof(ctx, "seen this message %s before, ignoring", req.msgID)
return nil
err = updateBuildTask(ctx, req)
if err != nil {
return err
return cache.Set(ctx, req.msgID, []byte{1}, 10*time.Minute)