blob: 5fc37176a82d3d2dc2eb6766a592f8fc1082270b [file] [log] [blame]
// Copyright 2021 The LUCI Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package longops
import (
"context"
"fmt"
"sync"
"time"
"golang.org/x/sync/errgroup"
"google.golang.org/protobuf/types/known/timestamppb"
"go.chromium.org/luci/common/clock"
"go.chromium.org/luci/common/errors"
"go.chromium.org/luci/common/logging"
gerritpb "go.chromium.org/luci/common/proto/gerrit"
"go.chromium.org/luci/common/retry/transient"
"go.chromium.org/luci/common/sync/parallel"
"go.chromium.org/luci/cv/internal/common"
"go.chromium.org/luci/cv/internal/configs/prjcfg"
"go.chromium.org/luci/cv/internal/gerrit"
"go.chromium.org/luci/cv/internal/gerrit/botdata"
"go.chromium.org/luci/cv/internal/run"
"go.chromium.org/luci/cv/internal/run/eventpb"
"go.chromium.org/luci/cv/internal/run/impl/util"
"go.chromium.org/luci/cv/internal/usertext"
)
// notPosted means start message wasn't posted yet.
//
// exists for readability only.
var notPosted time.Time
// PostStartMessageOp posts a start message on each of the Run CLs.
//
// PostStartMessageOp is a single-use object.
type PostStartMessageOp struct {
// All public fields must be set.
*Base
GFactory gerrit.Factory
Env *common.Env
// These private fields are set internally as implementation details.
rcls []*run.RunCL
cfg *prjcfg.ConfigGroup
botdataCLs []botdata.ChangeID
lock sync.Mutex
latestPostedAt time.Time
}
// Do actually posts the start messages.
func (op *PostStartMessageOp) Do(ctx context.Context) (*eventpb.LongOpCompleted, error) {
op.assertCalledOnce()
if op.IsCancelRequested() {
return &eventpb.LongOpCompleted{Status: eventpb.LongOpCompleted_CANCELLED}, nil
}
if err := op.prepare(ctx); err != nil {
return nil, err
}
errs := make(errors.MultiError, len(op.rcls))
poolError := parallel.WorkPool(min(len(op.rcls), 8), func(work chan<- func() error) {
for i, rcl := range op.rcls {
i, rcl := i, rcl
work <- func() error {
switch posted, err := op.doCL(ctx, rcl); {
case err != nil:
// all errors will be aggregated across all CLs below.
errs[i] = err
default:
op.lock.Lock()
if op.latestPostedAt.IsZero() || op.latestPostedAt.Before(posted) {
op.latestPostedAt = posted
}
op.lock.Unlock()
}
return nil
}
}
})
if poolError != nil {
panic(fmt.Errorf("unexpected WorkPool error %s", poolError))
}
// Aggregate all errors into the final result.
psm := &eventpb.LongOpCompleted_PostStartMessage{}
result := &eventpb.LongOpCompleted{
Status: eventpb.LongOpCompleted_SUCCEEDED, // be hopeful by default
Result: &eventpb.LongOpCompleted_PostStartMessage_{
PostStartMessage: psm,
},
}
var firstError error
for i, rcl := range op.rcls {
switch err := errs[i]; {
case err == nil:
psm.Posted = append(psm.Posted, int64(rcl.ID))
case errors.Unwrap(err) == errCancelHonored:
result.Status = eventpb.LongOpCompleted_CANCELLED
errs[i] = nil
case !transient.Tag.In(err):
if psm.GetPermanentErrors() == nil {
psm.PermanentErrors = make(map[int64]string, 1)
firstError = err
}
psm.GetPermanentErrors()[int64(rcl.ID)] = err.Error()
logging.Errorf(ctx, "failed to post start message on CL %d %q: %s", rcl.ID, rcl.ExternalID, err)
case firstError == nil:
// First transient error.
firstError = err
}
}
switch {
case len(psm.GetPermanentErrors()) > 0:
result.Status = eventpb.LongOpCompleted_FAILED
return result, firstError
case firstError != nil:
// Must be a transient error, expect a retry.
return nil, firstError
default:
psm.Time = timestamppb.New(op.latestPostedAt)
return result, nil
}
}
func (op *PostStartMessageOp) prepare(ctx context.Context) error {
eg, eCtx := errgroup.WithContext(ctx)
eg.Go(func() (err error) {
op.rcls, err = run.LoadRunCLs(eCtx, op.Run.ID, op.Run.CLs)
return
})
eg.Go(func() (err error) {
op.cfg, err = prjcfg.GetConfigGroup(eCtx, op.Run.ID.LUCIProject(), op.Run.ConfigGroupID)
return
})
if err := eg.Wait(); err != nil {
return err
}
op.botdataCLs = make([]botdata.ChangeID, len(op.rcls))
for i, rcl := range op.rcls {
op.botdataCLs[i].Host = rcl.Detail.GetGerrit().GetHost()
op.botdataCLs[i].Number = rcl.Detail.GetGerrit().GetInfo().GetNumber()
}
return nil
}
func (op *PostStartMessageOp) doCL(ctx context.Context, rcl *run.RunCL) (time.Time, error) {
if rcl.Detail.GetGerrit() == nil {
panic(fmt.Errorf("CL %d is not a Gerrit CL", rcl.ID))
}
if op.IsCancelRequested() {
return notPosted, errCancelHonored
}
queryOpts := []gerritpb.QueryOption{gerritpb.QueryOption_MESSAGES}
switch postedAt, err := util.IsActionTakenOnGerritCL(ctx, op.GFactory, rcl, queryOpts, op.hasStartMessagePosted); {
case err != nil:
return notPosted, errors.Annotate(err, "failed to check if message was already posted").Err()
case postedAt != notPosted:
logging.Debugf(ctx, "CL %d %s already has the starting message at %s", rcl.ID, rcl.ExternalID, postedAt)
return postedAt, nil
}
if op.IsCancelRequested() {
return notPosted, errCancelHonored
}
req, err := op.makeSetReviewReq(rcl)
if err != nil {
return notPosted, err
}
if err := util.MutateGerritCL(ctx, op.GFactory, rcl, req, 2*time.Minute, "post-start-message"); err != nil {
return notPosted, err
}
// NOTE: to avoid another round-trip to Gerrit, use the CV time here even
// though it isn't the same as what Gerrit recorded.
return clock.Now(ctx).Truncate(time.Second), nil
}
// hasStartMessagePosted returns when the start message was posted on a CL or
// zero time.
func (op *PostStartMessageOp) hasStartMessagePosted(rcl *run.RunCL, ci *gerritpb.ChangeInfo) time.Time {
// Look at latest messages first for efficiency,
// and skip all messages which are too old.
clTriggeredAt := rcl.Trigger.Time.AsTime()
for i := len(ci.GetMessages()) - 1; i >= 0; i-- {
m := ci.GetMessages()[i]
t := m.GetDate().AsTime()
if t.Before(clTriggeredAt) {
// i-th message is too old, no need to check even older ones.
return notPosted
}
switch data, ok := botdata.Parse(m); {
case !ok:
case data.Action != botdata.Start:
case data.TriggeredAt != clTriggeredAt:
case data.Revision != rcl.Detail.GetGerrit().GetInfo().GetCurrentRevision():
case len(data.CLs) != len(op.Run.CLs):
default:
return t
}
}
return notPosted
}
func (op *PostStartMessageOp) makeSetReviewReq(rcl *run.RunCL) (*gerritpb.SetReviewRequest, error) {
humanMsg := usertext.OnRunStartedGerritMessage(op.Run, op.cfg, op.Env)
bd := botdata.BotData{
Action: botdata.Start,
CLs: op.botdataCLs,
Revision: rcl.Detail.GetGerrit().GetInfo().GetCurrentRevision(),
TriggeredAt: rcl.Trigger.Time.AsTime(),
}
msg, err := botdata.Append(humanMsg, bd)
if err != nil {
return nil, errors.Annotate(err, "failed to generate the starting message").Err()
}
return &gerritpb.SetReviewRequest{
Project: rcl.Detail.GetGerrit().GetInfo().GetProject(),
Number: rcl.Detail.GetGerrit().GetInfo().GetNumber(),
RevisionId: rcl.Detail.GetGerrit().GetInfo().GetCurrentRevision(),
Tag: op.Run.Mode.GerritMessageTag(),
Notify: gerritpb.Notify_NOTIFY_NONE,
Message: msg,
}, nil
}