blob: 0fc082262187bd6577f4ba00a884dfb4b6c11e02 [file] [log] [blame]
// Copyright 2021 The LUCI Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package tryjob
import (
"context"
"fmt"
"strings"
"sync"
"google.golang.org/protobuf/proto"
"go.chromium.org/luci/common/clock"
"go.chromium.org/luci/common/errors"
"go.chromium.org/luci/common/retry/transient"
"go.chromium.org/luci/gae/service/datastore"
"go.chromium.org/luci/server/tq"
"go.chromium.org/luci/cv/internal/common"
)
// TaskClass is the ID of the TQ task class whose tasks (UpdateTryjobTask)
// refresh a single Tryjob from its backend.
const TaskClass = "update-tryjob"
// updaterBackend is implemented by every system that Updater can refresh
// Tryjobs from. Backends are looked up by the kind of a Tryjob's ExternalID.
type updaterBackend interface {
	// Update fetches the latest status and result of a Tryjob, given the
	// Tryjob's current state as stored in Datastore.
	//
	// Implementations MUST NOT modify saved.
	Update(ctx context.Context, saved *Tryjob) (Status, *Result, error)

	// Kind returns the name identifying this backend, e.g. "buildbucket".
	//
	// The same name is the first part of the ExternalID of the Tryjobs this
	// backend handles, so it must not contain a slash.
	Kind() string
}
// rmNotifier is the Run Manager notifier as seen by Updater; only the single
// method Updater calls is required.
type rmNotifier interface {
	// NotifyTryjobsUpdated informs the Run identified by runID about the
	// Tryjob updates listed in events.
	NotifyTryjobsUpdated(ctx context.Context, runID common.RunID, events *TryjobUpdatedEvents) error
}
// Updater refreshes Tryjobs from their backends and tells the Run Manager
// about every Tryjob whose stored status or result changed.
type Updater struct {
	// tqd is used both to schedule and to handle update-tryjob tasks.
	tqd *tq.Dispatcher
	// rmNotifier delivers update events to the Runs watching a Tryjob.
	rmNotifier rmNotifier

	// rwmutex guards backends: RegisterBackend writes it, backendFor reads it.
	rwmutex sync.RWMutex
	// backends maps a backend's Kind() to the backend itself.
	backends map[string]updaterBackend
}
// NewUpdater returns an Updater that schedules and handles its tasks via tqd
// and reports Tryjob updates through rm.
//
// As a side effect, the update-tryjob task class is registered with tqd.
// The returned Updater has no backends; add them with RegisterBackend().
func NewUpdater(tqd *tq.Dispatcher, rm rmNotifier) *Updater {
	u := &Updater{
		tqd:        tqd,
		rmNotifier: rm,
		backends:   make(map[string]updaterBackend, 1),
	}
	handler := func(ctx context.Context, payload proto.Message) error {
		task := payload.(*UpdateTryjobTask)
		return common.TQifyError(ctx, u.handleTask(ctx, task))
	}
	tqd.RegisterTaskClass(tq.TaskClass{
		ID:        TaskClass,
		Prototype: &UpdateTryjobTask{},
		Queue:     "update-tryjob",
		Kind:      tq.FollowsContext,
		Handler:   handler,
	})
	return u
}
// RegisterBackend makes b available to the Updater under the name b.Kind().
//
// Panics if b.Kind() contains a '/' or if a backend with the same kind has
// already been registered.
func (u *Updater) RegisterBackend(b updaterBackend) {
	k := b.Kind()
	if strings.IndexByte(k, '/') >= 0 {
		panic(fmt.Errorf("backend %T of kind %q must not contain '/'", b, k))
	}

	u.rwmutex.Lock()
	defer u.rwmutex.Unlock()
	if _, dup := u.backends[k]; dup {
		panic(fmt.Errorf("backend %q is already registered", k))
	}
	u.backends[k] = b
}
// Schedule dispatches a TQ task to update the given Tryjob.
//
// At least one of id and eid must be given. If both are given, the task
// checks that they refer to the same Tryjob.
func (u *Updater) Schedule(ctx context.Context, id common.TryjobID, eid ExternalID) error {
	if id == 0 && eid == "" {
		return errors.New("at least one of the tryjob's IDs must be given")
	}
	// Either ID may be unset here. The task resolves the Tryjob from
	// whichever one is present, and prefers id when both are set.
	return u.tqd.AddTask(ctx, &tq.Task{
		Title:   fmt.Sprintf("%d/%s", id, eid),
		Payload: &UpdateTryjobTask{ExternalId: string(eid), Id: int64(id)},
	})
}
// handleTask processes a single UpdateTryjobTask.
//
// It works in three steps:
//   - Load the Tryjob the task refers to.
//   - Ask the backend matching the kind of the Tryjob's ExternalID for the
//     Tryjob's current status and result.
//   - If either differs from what is stored, save the new values and notify
//     every Run watching the Tryjob, all in one Datastore transaction.
//
// Errors that may go away on retry are tagged with transient.Tag.
func (u *Updater) handleTask(ctx context.Context, task *UpdateTryjobTask) error {
	tj, err := u.loadTaskTryjob(ctx, task)
	if err != nil {
		return err
	}
	// The backend's answer is based on this version. The transaction below
	// rejects the write if the entity has moved on since.
	loadedEVer := tj.EVersion

	backend, err := u.backendFor(tj)
	if err != nil {
		return errors.Annotate(err, "resolving backend for %v", tj).Err()
	}
	status, result, err := backend.Update(ctx, tj)
	switch {
	case err != nil:
		return errors.Annotate(err, "reading status and result from %q", tj.ExternalID).Err()
	case status == tj.Status && proto.Equal(tj.Result, result):
		// Nothing changed, so skip both the write and the notifications.
		return nil
	}

	// innerErr records the error returned by the transaction body (if any).
	// That error is returned with its own annotations and tags instead of
	// being reported as a commit failure.
	var innerErr error
	err = datastore.RunInTransaction(ctx, func(ctx context.Context) (err error) {
		defer func() {
			innerErr = err
		}()
		// Work on a fresh local entity, so that a retried body always starts
		// from Datastore and the Tryjob loaded above stays untouched.
		latest := &Tryjob{ID: tj.ID}
		if err := datastore.Get(ctx, latest); err != nil {
			ann := errors.Annotate(err, "failed to load Tryjob %d", latest.ID)
			// A missing entity will not reappear on retry. loadTaskTryjob
			// treats this case as permanent too.
			if err != datastore.ErrNoSuchEntity {
				ann = ann.Tag(transient.Tag)
			}
			return ann.Err()
		}
		if latest.EVersion != loadedEVer {
			// A parallel task must have already updated this Tryjob; retry.
			return errors.Reason("the tryjob data has changed").Tag(transient.Tag).Err()
		}
		latest.EntityUpdateTime = datastore.RoundTime(clock.Now(ctx).UTC())
		latest.EVersion++
		latest.Status = status
		latest.Result = result
		if err := datastore.Put(ctx, latest); err != nil {
			return errors.Annotate(err, "failed to save Tryjob %d", latest.ID).Tag(transient.Tag).Err()
		}
		for _, run := range latest.AllWatchingRuns() {
			events := &TryjobUpdatedEvents{Events: []*TryjobUpdatedEvent{{TryjobId: int64(latest.ID)}}}
			if err := u.rmNotifier.NotifyTryjobsUpdated(ctx, run, events); err != nil {
				return err
			}
		}
		return nil
	}, nil)
	switch {
	case innerErr != nil:
		return innerErr
	case err != nil:
		return errors.Annotate(err, "failed to commit transaction updating tryjob %v", tj.ExternalID).Tag(transient.Tag).Err()
	}
	return nil
}

// loadTaskTryjob loads the Tryjob referenced by task.
//
// When both Id and ExternalId are set, the Tryjob is loaded by Id and its
// ExternalID must match the task's ExternalId. An unknown Tryjob is a
// permanent error. Other Datastore failures are tagged transient.
func (u *Updater) loadTaskTryjob(ctx context.Context, task *UpdateTryjobTask) (*Tryjob, error) {
	switch {
	case task.GetId() != 0:
		tj := &Tryjob{ID: common.TryjobID(task.GetId())}
		switch err := datastore.Get(ctx, tj); {
		case err == datastore.ErrNoSuchEntity:
			return nil, errors.Annotate(err, "unknown Tryjob with id %d", task.Id).Err()
		case err != nil:
			return nil, errors.Annotate(err, "loading Tryjob with id %d", task.Id).Tag(transient.Tag).Err()
		case task.GetExternalId() != "" && task.GetExternalId() != string(tj.ExternalID):
			return nil, errors.Reason("the given internal and external ids for the tryjob do not match").Err()
		}
		return tj, nil
	case task.GetExternalId() != "":
		tj, err := ExternalID(task.GetExternalId()).Load(ctx)
		switch {
		case err != nil:
			return nil, errors.Annotate(err, "loading Tryjob with ExternalID %s", task.ExternalId).Tag(transient.Tag).Err()
		case tj == nil:
			return nil, errors.Reason("unknown Tryjob with ExternalID %s", task.ExternalId).Err()
		}
		return tj, nil
	default:
		return nil, errors.Reason("expected at least one of {Id, ExternalId} in %+v", task).Err()
	}
}
// backendFor returns the registered backend for the kind of t's ExternalID.
//
// It returns an error if the kind cannot be determined from the ExternalID
// or if no backend of that kind has been registered.
func (u *Updater) backendFor(t *Tryjob) (updaterBackend, error) {
	kind, err := t.ExternalID.Kind()
	if err != nil {
		return nil, err
	}

	u.rwmutex.RLock()
	b, ok := u.backends[kind]
	u.rwmutex.RUnlock()

	if !ok {
		return nil, errors.Reason("%q backend is not supported", kind).Err()
	}
	return b, nil
}