blob: 35304b2067851bce997e0f11364e6d03e7bfff3c [file] [log] [blame]
// Copyright 2022 The LUCI Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
// Package tjcancel contains code in charge of cancelling stale tryjobs.
//
// Cancellator responds to tasks scheduled when a new patch is uploaded,
// looking for and cancelling stale tryjobs.
package tjcancel
import (
"context"
"fmt"
"strings"
"sync"
"google.golang.org/protobuf/proto"
"go.chromium.org/luci/common/clock"
"go.chromium.org/luci/common/errors"
"go.chromium.org/luci/common/logging"
"go.chromium.org/luci/common/sync/parallel"
"go.chromium.org/luci/gae/service/datastore"
"go.chromium.org/luci/server/tq"
"go.chromium.org/luci/cv/internal/common"
"go.chromium.org/luci/cv/internal/run"
"go.chromium.org/luci/cv/internal/tryjob"
)
// Cancellator is patterned after Updater to support multiple tryjob backends.
//
// It keeps a registry of backends keyed by their kind. Backends are added with
// RegisterBackend and looked up per Tryjob by backendFor.
type Cancellator struct {
// NOTE(review): tqd is never assigned or read by the code visible in this
// file; confirm whether it is still needed.
tqd *tq.Dispatcher
// guards backends map.
rwmutex sync.RWMutex
// backends maps the value returned by a backend's Kind() to that backend.
backends map[string]cancellatorBackend
}
// NewCancellator creates a Cancellator with no backends registered and
// attaches it as the handler of the CancelStale task binding of the given
// notifier.
//
// Backends must be added afterwards with RegisterBackend.
func NewCancellator(tn *tryjob.Notifier) *Cancellator {
	c := &Cancellator{}
	c.backends = make(map[string]cancellatorBackend)

	// The handler converts the outcome of handleTask into an error suitable
	// for the task queue via common.TQifyError.
	handler := func(ctx context.Context, payload proto.Message) error {
		task := payload.(*tryjob.CancelStaleTryjobsTask)
		return common.TQifyError(ctx, c.handleTask(ctx, task))
	}
	tn.Bindings.CancelStale.AttachHandler(handler)
	return c
}
// RegisterBackend adds the given backend to the registry under its kind.
//
// Panics if the kind contains a slash, or if a backend of the same kind has
// already been registered.
func (c *Cancellator) RegisterBackend(b cancellatorBackend) {
	kind := b.Kind()
	// The kind is validated before taking the lock.
	if strings.Contains(kind, "/") {
		panic(fmt.Errorf("backend %T of kind %q must not contain '/'", b, kind))
	}

	c.rwmutex.Lock()
	// Deferred so that the lock is released even when panicking below.
	defer c.rwmutex.Unlock()

	_, registered := c.backends[kind]
	if registered {
		panic(fmt.Errorf("backend %q is already registered", kind))
	}
	c.backends[kind] = b
}
// handleTask finds and cancels stale tryjobs for the CL named in the task.
//
// It queries Tryjob entities whose CLPatchsets property lies in the range
// starting at the task's previous minimum equivalent patchset (inclusive) and
// ending at its current minimum equivalent patchset (exclusive). Tryjobs that
// have already ended, were not triggered by CV, or whose definition sets
// SkipStaleCheck are ignored. The rest are cancelled only when all Runs
// watching them have ended and they have an external ID.
//
// Panics if the previous patchset is not strictly lower than the current one.
//
// If the query fails after yielding some tryjobs, those tryjobs are still
// cancelled. An error from cancellation takes precedence over the query error
// in the returned value.
func (c *Cancellator) handleTask(ctx context.Context, task *tryjob.CancelStaleTryjobsTask) error {
	if task.PreviousMinEquivPatchset >= task.CurrentMinEquivPatchset {
		panic(fmt.Errorf("patchset numbers expected to increase monotonically"))
	}

	clid := common.CLID(task.GetClid())
	lower := tryjob.MakeCLPatchset(clid, task.GetPreviousMinEquivPatchset())
	upper := tryjob.MakeCLPatchset(common.CLID(task.GetClid()), task.GetCurrentMinEquivPatchset())
	q := datastore.NewQuery(tryjob.TryjobKind).
		Gte("CLPatchsets", lower).
		Lt("CLPatchsets", upper)

	// Re-running this task after partial progress is meant to be safe: when a
	// cancellation went through but the entity update did not, the next run
	// cancels again (expected to be a no-op) and retries the update. Tryjobs
	// that were both cancelled and updated are filtered out below because
	// they have ended, so repeated runs make progress.
	var stale []*tryjob.Tryjob
	dsErr := datastore.Run(ctx, q, func(tj *tryjob.Tryjob) error {
		eligible := !tj.IsEnded() && tj.TriggeredByCV && !tj.Definition.SkipStaleCheck
		if !eligible {
			return nil
		}
		ended, err := c.allWatchingRunsEnded(ctx, tj)
		if err != nil {
			return err
		}
		// Cancelling needs the external ID, which may be missing when:
		//  - the task triggering the tryjob is stuck;
		//  - the transaction saving the external ID was rolled back and is
		//    waiting to be retried;
		//  - RM has a bug.
		if ended && tj.ExternalID != "" {
			stale = append(stale, tj)
		}
		return nil
	})

	if len(stale) == 0 {
		return dsErr
	}
	if dsErr != nil {
		logging.Warningf(ctx, "the query to fetch stale tryjobs returned an error along with partial results, will continue to cancel the returned tryjobs first")
	}
	if err := c.cancelAll(ctx, stale); err != nil {
		return err
	}
	return dsErr
}
// cancelAll cancels each of the given tryjobs via its backend and then marks
// the corresponding entity as cancelled.
//
// The work is spread over a pool of at most 8 workers. For every tryjob, the
// backend is resolved, asked to cancel the tryjob, and then the entity is
// reloaded and updated in a transaction. The first failing step aborts the
// processing of that tryjob only.
//
// Returns nil when tjs is empty; otherwise returns whatever
// common.MostSevereError selects from the errors reported by the pool.
func (c *Cancellator) cancelAll(ctx context.Context, tjs []*tryjob.Tryjob) error {
	if len(tjs) == 0 {
		return nil
	}

	// Pool size is the number of tryjobs, capped at 8.
	workers := len(tjs)
	if workers > 8 {
		workers = 8
	}

	// markCancelled reloads the entity inside a transaction and switches it to
	// CANCELLED, unless the freshly loaded entity has already ended.
	markCancelled := func(ctx context.Context, tj *tryjob.Tryjob) error {
		return datastore.RunInTransaction(ctx, func(ctx context.Context) error {
			switch err := datastore.Get(ctx, tj); {
			case err != nil:
				return err
			case tj.IsEnded():
				return nil
			}
			tj.EntityUpdateTime = datastore.RoundTime(clock.Now(ctx).UTC())
			tj.EVersion++
			tj.Status = tryjob.Status_CANCELLED
			return datastore.Put(ctx, tj)
		}, nil)
	}

	// cancelOne performs the backend cancellation followed by the entity
	// update for a single tryjob.
	cancelOne := func(tj *tryjob.Tryjob) error {
		backend, err := c.backendFor(tj)
		if err != nil {
			return err
		}
		// TODO(crbug/1308930): use Buildbucket's batch API to reduce
		// number of RPCs.
		if err := backend.CancelTryjob(ctx, tj); err != nil {
			return err
		}
		return markCancelled(ctx, tj)
	}

	poolErr := parallel.WorkPool(workers, func(work chan<- func() error) {
		for i := range tjs {
			// Per-iteration copy so that each work item sees its own tryjob.
			tj := tjs[i]
			work <- func() error { return cancelOne(tj) }
		}
	})
	return common.MostSevereError(poolErr)
}
// cancellatorBackend is implemented by tryjobs backends, e.g. buildbucket.
//
// Implementations are added to a Cancellator with RegisterBackend and are
// selected per Tryjob by the kind of its ExternalID.
type cancellatorBackend interface {
// Kind identifies the backend.
//
// It's also the first part of the Tryjob's ExternalID, e.g. "buildbucket".
// Must not contain a slash; RegisterBackend panics if it does.
Kind() string
// CancelTryjob should cancel the tryjob given.
//
// MUST not modify the given Tryjob object.
// If the tryjob was already cancelled, it should not return an error.
// handleTask relies on this to stay safe when a task is re-run after
// making partial progress.
CancelTryjob(ctx context.Context, tj *tryjob.Tryjob) error
}
// allWatchingRunsEnded reports whether every Run watching the given tryjob
// has ended.
//
// The Runs are examined in the order returned by tj.AllWatchingRuns(), and the
// first one that is missing, failed to load, or has not ended decides the
// result:
//   - a missing Run entity yields an error naming the tryjob's external ID;
//   - any other load failure yields that error annotated with the Run ID;
//   - a Run that has not ended yields (false, nil).
//
// Returns (true, nil) when none of the above applies, including when the
// tryjob has no watching Runs.
func (c *Cancellator) allWatchingRunsEnded(ctx context.Context, tj *tryjob.Tryjob) (bool, error) {
	// The parameter is named tj rather than tryjob so that it does not shadow
	// the imported tryjob package inside this function.
	runIDs := tj.AllWatchingRuns()
	runs, errs := run.LoadRunsFromIDs(runIDs...).Do(ctx)
	for i, r := range runs {
		switch err := errs[i]; {
		case err == datastore.ErrNoSuchEntity:
			return false, errors.Reason("Tryjob %s is associated with a non-existent Run %s", tj.ExternalID, runIDs[i]).Err()
		case err != nil:
			return false, errors.Annotate(err, "failed to load run %s", runIDs[i]).Err()
		case !run.IsEnded(r.Status):
			return false, nil
		}
	}
	return true, nil
}
// backendFor returns the registered backend whose kind matches the kind of the
// given tryjob's ExternalID.
//
// Returns an error if the kind cannot be derived from the ExternalID, or if no
// backend has been registered for that kind.
func (c *Cancellator) backendFor(t *tryjob.Tryjob) (cancellatorBackend, error) {
	kind, err := t.ExternalID.Kind()
	if err != nil {
		return nil, err
	}

	// Hold the read lock only for the map lookup.
	c.rwmutex.RLock()
	backend, found := c.backends[kind]
	c.rwmutex.RUnlock()

	if !found {
		return nil, errors.Reason("%q backend is not supported", kind).Err()
	}
	return backend, nil
}
// min returns the smaller of x and y.
func min(x, y int) int {
	if y < x {
		return y
	}
	return x
}