blob: 5d315e38f2202c7a7a6312a2acc0b178e1398054 [file] [log] [blame]
// Copyright 2022 The LUCI Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package submit
import (
"context"
"fmt"
"strings"
"time"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
"google.golang.org/protobuf/types/known/timestamppb"
"go.chromium.org/luci/common/clock"
"go.chromium.org/luci/common/errors"
"go.chromium.org/luci/common/logging"
gerritpb "go.chromium.org/luci/common/proto/gerrit"
"go.chromium.org/luci/common/retry"
"go.chromium.org/luci/common/retry/transient"
"go.chromium.org/luci/gae/service/datastore"
"go.chromium.org/luci/cv/internal/common"
"go.chromium.org/luci/cv/internal/gerrit"
"go.chromium.org/luci/cv/internal/run"
"go.chromium.org/luci/cv/internal/run/eventpb"
)
// RM encapsulates interaction with Run Manager by `RunCLsSubmitter`.
type RM interface {
Invoke(ctx context.Context, runID common.RunID, eta time.Time) error
NotifyReadyForSubmission(ctx context.Context, runID common.RunID, eta time.Time) error
NotifyCLsSubmitted(ctx context.Context, runID common.RunID, clids common.CLIDs) error
NotifySubmissionCompleted(ctx context.Context, runID common.RunID, sc *eventpb.SubmissionCompleted, invokeRM bool) error
}
// RunCLsSubmitter submits CLs for a Run.
type RunCLsSubmitter struct {
// All fields are immutable.
// runID is the ID of the Run to be submitted.
runID common.RunID
// deadline is when this submission should be stopped.
deadline time.Time
// clids contains ids of cls to be submitted in submission order.
clids common.CLIDs
// rm is used to interact with Run Manager.
rm RM
// gFactory is used to interact with Gerrit.
gFactory gerrit.Factory
}
// NewSubmitter creates a new RunCLsSubmitter.
func NewSubmitter(ctx context.Context, runID common.RunID, submission *run.Submission, rm RM, g gerrit.Factory) *RunCLsSubmitter {
notSubmittedCLs := make(common.CLIDs, 0, len(submission.GetCls())-len(submission.GetSubmittedCls()))
submitted := common.MakeCLIDs(submission.GetSubmittedCls()...).Set()
for _, cl := range submission.GetCls() {
clid := common.CLID(cl)
if _, ok := submitted[clid]; !ok {
notSubmittedCLs = append(notSubmittedCLs, clid)
}
}
return &RunCLsSubmitter{
runID: runID,
deadline: submission.GetDeadline().AsTime(),
clids: notSubmittedCLs,
rm: rm,
gFactory: g,
}
}
// ErrTransientSubmissionFailure indicates that the submission has failed
// transiently and the same task should be retried.
var ErrTransientSubmissionFailure = errors.New("submission failed transiently", transient.Tag)
// Submit tries to incrementally submits all CLs within deadline.
//
// Notifies RunManager whenever a CL is submitted successfully. Reports the
// final submission result to RunManager and releases the submit queue at the
// end.
//
// Submission is considered as failure if this Run no longer holds the submit
// queue or can't submit all CLs within deadline.
//
// Returns `ErrTransientSubmissionFailure` or error that happened when reporting
// final result to Run Manager, or releasing submit queue only. The rest of the
// errors that happened during the submission will be reported to Run Manager as
// part of the submission result.
func (s RunCLsSubmitter) Submit(ctx context.Context) error {
switch cur, err := CurrentRun(ctx, s.runID.LUCIProject()); {
case err != nil:
return s.endSubmission(ctx, classifyErr(ctx, err))
case cur != s.runID:
logging.Errorf(ctx, "BUG: run no longer holds submit queue, currently held by %q", cur)
return s.endSubmission(ctx, &eventpb.SubmissionCompleted{
Result: eventpb.SubmissionResult_FAILED_PERMANENT,
})
}
cls, err := run.LoadRunCLs(ctx, s.runID, s.clids)
if err != nil {
return s.endSubmission(ctx, classifyErr(ctx, err))
}
dctx, cancel := clock.WithDeadline(ctx, s.deadline)
defer cancel()
sc := s.submitCLs(dctx, cls)
return s.endSubmission(ctx, sc)
}
// endSubmission notifies RM about submission result and release Submit Queue
// if necessary.
func (s RunCLsSubmitter) endSubmission(ctx context.Context, sc *eventpb.SubmissionCompleted) error {
if sc.GetResult() == eventpb.SubmissionResult_FAILED_TRANSIENT {
// Do not release queue for transient failure.
if err := s.rm.NotifySubmissionCompleted(ctx, s.runID, sc, true); err != nil {
return err
}
return ErrTransientSubmissionFailure
}
var innerErr error
now := clock.Now(ctx)
err := datastore.RunInTransaction(ctx, func(ctx context.Context) error {
if sc.Result == eventpb.SubmissionResult_SUCCEEDED {
if innerErr = ReleaseOnSuccess(ctx, s.rm.NotifyReadyForSubmission, s.runID, now); innerErr != nil {
return innerErr
}
} else {
if innerErr = Release(ctx, s.rm.NotifyReadyForSubmission, s.runID); innerErr != nil {
return innerErr
}
}
sc.QueueReleaseTimestamp = timestamppb.New(now)
if innerErr = s.rm.NotifySubmissionCompleted(ctx, s.runID, sc, false); innerErr != nil {
return innerErr
}
return nil
}, nil)
switch {
case innerErr != nil:
return innerErr
case err != nil:
return errors.Annotate(err, "failed to release submit queue and notify RM").Tag(transient.Tag).Err()
}
return s.rm.Invoke(ctx, s.runID, time.Time{})
}
var perCLRetryFactory retry.Factory = transient.Only(func() retry.Iterator {
return &retry.ExponentialBackoff{
Limited: retry.Limited{
Delay: 200 * time.Millisecond,
Retries: 10,
},
Multiplier: 2,
}
})
// submitCLs sequentially submits the provided CLs.
//
// Retries on transient failure of submitting individual CL based on
// `perCLRetryFactory`.
func (s RunCLsSubmitter) submitCLs(ctx context.Context, cls []*run.RunCL) *eventpb.SubmissionCompleted {
for _, cl := range cls {
if clock.Now(ctx).After(s.deadline) {
return &eventpb.SubmissionCompleted{
Result: eventpb.SubmissionResult_FAILED_PERMANENT,
FailureReason: &eventpb.SubmissionCompleted_Timeout{
Timeout: true,
},
}
}
var submitted bool
var msg string
err := retry.Retry(ctx, perCLRetryFactory, func() error {
if !submitted {
switch err := s.submitCL(ctx, cl); {
case err == nil:
submitted = true
default:
var isTransient bool
msg, isTransient = classifyGerritErr(ctx, err)
if isTransient {
return transient.Tag.Apply(err)
}
// Ensure err is not tagged with transient.
return transient.Tag.Off().Apply(err)
}
}
return s.rm.NotifyCLsSubmitted(ctx, s.runID, common.CLIDs{cl.ID})
}, retry.LogCallback(ctx, fmt.Sprintf("submit cl [id=%d, external_id=%q]", cl.ID, cl.ExternalID)))
if err != nil {
evt := classifyErr(ctx, err)
if !submitted {
evt.FailureReason = &eventpb.SubmissionCompleted_ClFailures{
ClFailures: &eventpb.SubmissionCompleted_CLSubmissionFailures{
Failures: []*eventpb.SubmissionCompleted_CLSubmissionFailure{
{Clid: int64(cl.ID), Message: msg},
},
},
}
}
return evt
}
}
return &eventpb.SubmissionCompleted{
Result: eventpb.SubmissionResult_SUCCEEDED,
}
}
// submitCL makes a request to Gerrit to submit (merge) the CL.
func (s RunCLsSubmitter) submitCL(ctx context.Context, cl *run.RunCL) error {
gc, err := s.gFactory.MakeClient(ctx, cl.Detail.GetGerrit().GetHost(), s.runID.LUCIProject())
if err != nil {
return err
}
ci := cl.Detail.GetGerrit().GetInfo()
submitErr := s.gFactory.MakeMirrorIterator(ctx).RetryIfStale(func(opt grpc.CallOption) error {
_, err := gc.SubmitRevision(ctx, &gerritpb.SubmitRevisionRequest{
Number: ci.GetNumber(),
RevisionId: ci.GetCurrentRevision(),
Project: ci.GetProject(),
})
grpcStatus, ok := status.FromError(errors.Unwrap(err))
if !ok {
return err
}
switch code, msg := grpcStatus.Code(), grpcStatus.Message(); {
case code == codes.OK:
return nil
case code == codes.Internal && strings.Contains(msg, "LOCK_FAILURE"):
// 503 Lock Failure is normally caused by stale replica serving the
// request.
// Reference: go/gob/users/user-faq#an-update-of-the-change-is-failing-with-the-503-lock-failure
return gerrit.ErrStaleData
default:
return err
}
})
if submitErr == nil {
return nil
}
// Sometimes, Gerrit may return error but change is actually merged.
// Load the change again to check whether it is actually merged.
merged := false
_ = s.gFactory.MakeMirrorIterator(ctx).RetryIfStale(func(opt grpc.CallOption) error {
latest, err := gc.GetChange(ctx, &gerritpb.GetChangeRequest{
Number: ci.GetNumber(),
Project: ci.GetProject(),
}, opt)
switch {
case err != nil:
return err
case latest.GetStatus() == gerritpb.ChangeStatus_MERGED:
// It is possible that somebody else submitted the change, but this is
// unlikely enough that we presume CV did it. If necessary, it's possible
// to examine Change messages to see who actually did it.
merged = true
return nil
case latest.GetUpdated().AsTime().Before(ci.GetUpdated().AsTime()):
return gerrit.ErrStaleData
default:
merged = false
return nil
}
})
if merged {
return nil
}
return submitErr
}
// Determines the type of submission completion result based on the given error.
func classifyErr(ctx context.Context, err error) *eventpb.SubmissionCompleted {
switch {
case err == nil:
return &eventpb.SubmissionCompleted{
Result: eventpb.SubmissionResult_SUCCEEDED,
}
case transient.Tag.In(err):
logging.Warningf(ctx, "Submission ended with FAILED_TRANSIENT: %s", err)
return &eventpb.SubmissionCompleted{
Result: eventpb.SubmissionResult_FAILED_TRANSIENT,
}
default:
logging.Warningf(ctx, "Submission ended with FAILED_PERMANENT: %s", err)
return &eventpb.SubmissionCompleted{
Result: eventpb.SubmissionResult_FAILED_PERMANENT,
}
}
}
const (
failedPreconditionMsgFmt = "Gerrit rejected submission of this CL with " +
"error: %s\nHint: rebasing CL in Gerrit UI and re-submitting usually " +
"works"
gerritTimeoutMsg = "Failed to submit this CL because Gerrit took too " +
"long to respond. If this error persists after retries, rebasing and " +
"then re-submitting your CL might help."
permDeniedMsg = "Failed to submit this CL due to permission denied. " +
"Please contact your project admin to grant the submit access to " +
"your LUCI project scoped account in Gerrit config."
resourceExhaustedMsg = "Failed to submit this CL because Gerrit " +
"throttled the submission."
unexpectedMsgFmt = "Failed to submit this CL because of unexpected error " +
"from Gerrit: %s"
)
// classifyGerritErr returns the message to be posted on the CL for the given
// submission error and whether the error is transient.
func classifyGerritErr(ctx context.Context, err error) (msg string, isTransient bool) {
grpcStatus, ok := status.FromError(errors.Unwrap(err))
if !ok {
return fmt.Sprintf(unexpectedMsgFmt, err), transient.Tag.In(err)
}
switch code, msg := grpcStatus.Code(), grpcStatus.Message(); code {
case codes.PermissionDenied:
return permDeniedMsg, false
case codes.FailedPrecondition:
// Gerrit returns 409. Either because the change can't be merged, or
// this revision isn't the latest.
return fmt.Sprintf(failedPreconditionMsgFmt, msg), false
case codes.ResourceExhausted:
return resourceExhaustedMsg, true
case codes.Internal:
return fmt.Sprintf(unexpectedMsgFmt, msg), true
case codes.DeadlineExceeded:
return gerritTimeoutMsg, true
default:
logging.Warningf(ctx, "unclassified grpc code [%s] received from Gerrit. Full error: %s", code, err)
return fmt.Sprintf(unexpectedMsgFmt, msg), false
}
}