blob: e8fbef4e6733203d69d8ab5c7efac3d2e666e8ad [file] [log] [blame]
// Copyright 2021 The LUCI Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package longops
import (
"context"
"fmt"
"time"
"golang.org/x/sync/errgroup"
"google.golang.org/protobuf/types/known/timestamppb"
"go.chromium.org/luci/common/clock"
"go.chromium.org/luci/common/errors"
"go.chromium.org/luci/common/logging"
"go.chromium.org/luci/common/retry"
"go.chromium.org/luci/common/retry/transient"
"go.chromium.org/luci/common/sync/dispatcher"
"go.chromium.org/luci/common/sync/dispatcher/buffer"
"go.chromium.org/luci/cv/internal/changelist"
"go.chromium.org/luci/cv/internal/common"
"go.chromium.org/luci/cv/internal/common/lease"
"go.chromium.org/luci/cv/internal/configs/prjcfg"
"go.chromium.org/luci/cv/internal/gerrit"
"go.chromium.org/luci/cv/internal/gerrit/cancel"
"go.chromium.org/luci/cv/internal/run"
"go.chromium.org/luci/cv/internal/run/eventpb"
)
// CancelTriggersOp cancels the triggers for the provided CLs.
//
// CancelTriggersOp keeps retrying on lease error and transient failure for each
// CL till the long op deadline is exceeded or cancellation either succeeds
// or fails non-transiently.
//
// CancelTriggersOp doesn't obey longop's cancellation request because if
// this long op is left half-done, for example, triggers on half of the CLs are
// untouched, a new Run may be created for those CLs.
//
// CancelTriggersOp is a single-use object.
type CancelTriggersOp struct {
*Base
GFactory gerrit.Factory
CLMutator *changelist.Mutator
// CancelConcurrency is the number of CLs that will be cancelled concurrently.
//
// Default is 8.
CancelConcurrency int
// Private fields that will be populated internally during long op execution.
inputs []cancel.Input
results []cancelResult
// testAfterTryCancelFn is always called after each try to cancel the trigger
// of a CL.
//
// This is only set for testing purpose.
testAfterTryCancelFn func()
}
const defaultCancelConcurrency = 8
// Do actually cancels the triggers.
func (op *CancelTriggersOp) Do(ctx context.Context) (*eventpb.LongOpCompleted, error) {
op.assertCalledOnce()
if err := op.loadInputs(ctx); err != nil {
return nil, err
}
op.executeInParallel(ctx)
longOpStatus := eventpb.LongOpCompleted_SUCCEEDED // be optimistic
ct := &eventpb.LongOpCompleted_CancelTriggers{
Results: make([]*eventpb.LongOpCompleted_CancelTriggers_Result, len(op.results)),
}
var lastTransErr, lastPermErr error
for i, result := range op.results {
cl := op.inputs[i].CL
ct.Results[i] = &eventpb.LongOpCompleted_CancelTriggers_Result{
Id: int64(cl.ID),
ExternalId: string(cl.ExternalID),
}
switch err := result.err; {
case err == nil:
ct.Results[i].Detail = &eventpb.LongOpCompleted_CancelTriggers_Result_SuccessInfo{
SuccessInfo: &eventpb.LongOpCompleted_CancelTriggers_Result_Success{
CancelledAt: timestamppb.New(result.cancelledAt),
},
}
default:
longOpStatus = eventpb.LongOpCompleted_FAILED
ct.Results[i].Detail = &eventpb.LongOpCompleted_CancelTriggers_Result_FailureInfo{
FailureInfo: &eventpb.LongOpCompleted_CancelTriggers_Result_Failure{
FailureMessage: err.Error(),
},
}
logging.Errorf(ctx, "failed to cancel the trigger of CL %d %q: %s", cl.ID, cl.ExternalID, err)
if transient.Tag.In(err) {
lastTransErr = err
} else {
lastPermErr = err
}
}
}
ret := &eventpb.LongOpCompleted{
Status: longOpStatus,
Result: &eventpb.LongOpCompleted_CancelTriggers_{
CancelTriggers: ct,
},
}
switch ctxErr := ctx.Err(); {
// Returns the event in error case as well because the event will be
// reported back to Run Manager.
case ctxErr == context.DeadlineExceeded:
logging.Errorf(ctx, "running out of time to cancel triggers")
return ret, ctxErr
case ctxErr == context.Canceled:
logging.Errorf(ctx, "context is cancelled while cancelling triggers")
return ret, ctxErr
case ctxErr != nil:
panic(fmt.Errorf("unexpected context error: %s", ctxErr))
case lastPermErr != nil:
return ret, lastPermErr
case lastTransErr != nil:
// Don't return a transient error to prevent long op from retrying.
// The transient error should have been retried many times in this long op.
return ret, transient.Tag.Off().Apply(lastTransErr)
default:
return ret, nil
}
}
func (op *CancelTriggersOp) loadInputs(ctx context.Context) error {
var (
clsToCancel []*changelist.CL
triggers map[common.CLID]*run.Triggers
cfg *prjcfg.ConfigGroup
allRunCLExternalIDs []changelist.ExternalID
)
eg, ctx := errgroup.WithContext(ctx)
requests := op.Op.GetCancelTriggers().GetRequests()
eg.Go(func() (err error) {
clids := make(common.CLIDs, len(requests))
for i, req := range requests {
clids[i] = common.CLID(req.Clid)
}
clsToCancel, err = changelist.LoadCLsByIDs(ctx, clids)
return err
})
eg.Go(func() error {
runCLs, err := run.LoadRunCLs(ctx, op.Run.ID, op.Run.CLs)
if err != nil {
return err
}
triggers = make(map[common.CLID]*run.Triggers, len(runCLs))
allRunCLExternalIDs = make([]changelist.ExternalID, len(runCLs))
for i, runCL := range runCLs {
triggers[runCL.ID] = triggers[runCL.ID].WithTrigger(runCL.Trigger)
allRunCLExternalIDs[i] = runCL.ExternalID
}
return nil
})
eg.Go(func() (err error) {
cfg, err = prjcfg.GetConfigGroup(ctx, op.Run.ID.LUCIProject(), op.Run.ConfigGroupID)
return err
})
if err := eg.Wait(); err != nil {
return err
}
op.inputs = make([]cancel.Input, len(requests))
op.results = make([]cancelResult, len(requests))
luciProject := op.Run.ID.LUCIProject()
for i := range requests {
cl, req := clsToCancel[i], requests[i]
op.inputs[i] = cancel.Input{
CL: cl,
Triggers: triggers[cl.ID],
LUCIProject: luciProject,
Message: req.Message,
Requester: "Trigger Cancellation",
Notify: convertToGerritWhoms(req.Notify),
LeaseDuration: time.Minute,
ConfigGroups: []*prjcfg.ConfigGroup{cfg},
RunCLExternalIDs: allRunCLExternalIDs,
AddToAttentionSet: convertToGerritWhoms(req.AddToAttention),
AttentionReason: req.AddToAttentionReason,
GFactory: op.GFactory,
CLMutator: op.CLMutator,
}
op.results[i] = cancelResult{
err: notAttemptedYetErr,
}
}
return nil
}
func convertToGerritWhoms(whoms []run.OngoingLongOps_Op_TriggersCancellation_Whom) gerrit.Whoms {
ret := make(gerrit.Whoms, len(whoms))
for i, whom := range whoms {
switch whom {
case run.OngoingLongOps_Op_TriggersCancellation_OWNER:
ret[i] = gerrit.Owner
case run.OngoingLongOps_Op_TriggersCancellation_REVIEWERS:
ret[i] = gerrit.Reviewers
case run.OngoingLongOps_Op_TriggersCancellation_CQ_VOTERS:
ret[i] = gerrit.CQVoters
case run.OngoingLongOps_Op_TriggersCancellation_PS_UPLOADER:
ret[i] = gerrit.PSUploader
default:
panic(fmt.Errorf("unrecognized whom [%s] in trigger cancellation", whom))
}
}
return ret
}
type cancelItem struct {
index int
input cancel.Input
}
type cancelResult struct {
cancelledAt time.Time
err error
}
// notAttemptedYetErr is the initial error set in cancelResult.
var notAttemptedYetErr = errors.New("not attempted cancellation yet")
// executeInParallel cancels the triggers of the provided CLs in parallel
// and keeps retrying on transient or alreadyInLease failure until the context
// is done.
func (op *CancelTriggersOp) executeInParallel(ctx context.Context) {
dc := op.makeDispatcherChannel(ctx)
for i, input := range op.inputs {
dc.C <- cancelItem{index: i, input: input}
}
dc.Close()
<-dc.DrainC
}
func (op *CancelTriggersOp) makeDispatcherChannel(ctx context.Context) dispatcher.Channel {
concurrency := op.CancelConcurrency
if concurrency == 0 {
concurrency = defaultCancelConcurrency
}
concurrency = min(concurrency, len(op.inputs))
dc, err := dispatcher.NewChannel(ctx, &dispatcher.Options{
ErrorFn: func(failedBatch *buffer.Batch, err error) (retry bool) {
_, isLeaseErr := lease.IsAlreadyInLeaseErr(err)
return isLeaseErr || transient.Tag.In(err)
},
DropFn: dispatcher.DropFnQuiet,
Buffer: buffer.Options{
MaxLeases: concurrency,
BatchItemsMax: 1,
FullBehavior: &buffer.BlockNewItems{
MaxItems: len(op.results),
},
Retry: op.makeRetryFactory(),
},
}, func(data *buffer.Batch) error {
ci, ok := data.Data[0].Item.(cancelItem)
if !ok {
panic(fmt.Errorf("unexpected batch data item %s", data.Data[0].Item))
}
result := &op.results[ci.index]
result.err = cancel.Cancel(ctx, ci.input)
if result.err == nil {
result.cancelledAt = clock.Now(ctx)
}
if op.testAfterTryCancelFn != nil {
op.testAfterTryCancelFn()
}
return result.err
})
if err != nil {
panic(fmt.Errorf("unexpected failure when creating dispatcher channel: %s", err))
}
return dc
}
func (op *CancelTriggersOp) makeRetryFactory() retry.Factory {
return func() retry.Iterator {
return &cancellationRetryIterator{
inner: &retry.ExponentialBackoff{
Limited: retry.Limited{
Delay: 100 * time.Millisecond,
Retries: -1, // unlimited
},
Multiplier: 2,
MaxDelay: 1 * time.Minute,
},
}
}
}
// cancellationRetryIterator retries on transient failure in an exponential
// fashion. If the error is alreadyInLease, this iterator returns the earliest
// time of the lease expiry and the inner's iterator next retry.
type cancellationRetryIterator struct {
inner retry.Iterator
}
// Next implements retry.Iterator
func (c cancellationRetryIterator) Next(ctx context.Context, err error) time.Duration {
switch leaseErr, isLeaseErr := lease.IsAlreadyInLeaseErr(err); {
case isLeaseErr:
innerNext := c.inner.Next(ctx, err)
if timeToExpire := clock.Until(ctx, leaseErr.ExpireTime); timeToExpire > 0 && timeToExpire < innerNext {
return timeToExpire
}
return innerNext
case transient.Tag.In(err):
return c.inner.Next(ctx, err)
default:
return retry.Stop
}
}