// Copyright 2021 The LUCI Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package migration
import (
"context"
"strings"
"time"
"go.chromium.org/luci/common/clock"
"go.chromium.org/luci/common/errors"
"go.chromium.org/luci/common/logging"
gerritpb "go.chromium.org/luci/common/proto/gerrit"
"go.chromium.org/luci/common/retry/transient"
"go.chromium.org/luci/common/sync/parallel"
"go.chromium.org/luci/gae/service/datastore"
cvbqpb "go.chromium.org/luci/cv/api/bigquery/v1"
migrationpb "go.chromium.org/luci/cv/api/migration"
"go.chromium.org/luci/cv/internal/changelist"
"go.chromium.org/luci/cv/internal/common"
"go.chromium.org/luci/cv/internal/gerrit/trigger"
"go.chromium.org/luci/cv/internal/run"
)
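// fetchActiveRuns returns all RUNNING Runs of the given LUCI project that are
// still active from CQDaemon's perspective (see pruneInactiveRuns), with their
// CLs and FYI deps populated.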
func fetchActiveRuns(ctx context.Context, project string) ([]*migrationpb.ActiveRun, error) {
runs, err := fetchRunsWithStatus(ctx, project, run.Status_RUNNING)
switch {
case err != nil:
return nil, err
case len(runs) == 0:
return nil, nil
}
// Remove Runs that already have a corresponding VerifiedCQDRun entity or that
// are being cancelled.
runs, err = pruneInactiveRuns(ctx, runs)
switch {
case err != nil:
return nil, err
case len(runs) == 0:
return nil, nil
}
// Load all RunCLs and populate ActiveRuns concurrently, but leave filling in
// FyiDeps details for later, since those can't be filled from RunCLs anyway.
ret := make([]*migrationpb.ActiveRun, len(runs))
err = parallel.WorkPool(min(len(runs), 32), func(workCh chan<- func() error) {
for i, r := range runs {
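// Copy loop variables so that each closure captures its own values.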
i, r := i, r
workCh <- func() error {
var err error
ret[i], err = makeActiveRun(ctx, r)
return err
}
}
})
if err != nil {
return nil, common.MostSevereError(err)
}
// Finally, process all FyiDeps at once.
cls := map[common.CLID]*changelist.CL{}
for _, r := range ret {
for _, d := range r.GetFyiDeps() {
clid := common.CLID(d.GetId())
if _, exists := cls[clid]; !exists {
cls[clid] = &changelist.CL{ID: clid}
}
}
}
if len(cls) == 0 {
return ret, nil
}
if _, err := changelist.LoadCLsMap(ctx, cls); err != nil {
return nil, err
}
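// Fill in the FYI dep details from the loaded CL snapshots.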
for _, r := range ret {
for _, d := range r.GetFyiDeps() {
cl := cls[common.CLID(d.GetId())]
d.Gc = &cvbqpb.GerritChange{
Host: cl.Snapshot.GetGerrit().GetHost(),
Project: cl.Snapshot.GetGerrit().GetInfo().GetProject(),
Change: cl.Snapshot.GetGerrit().GetInfo().GetNumber(),
Patchset: int64(cl.Snapshot.GetPatchset()),
EarliestEquivalentPatchset: int64(cl.Snapshot.GetMinEquivalentPatchset()),
}
d.Files = cl.Snapshot.GetGerrit().GetFiles()
d.Info = cl.Snapshot.GetGerrit().GetInfo()
}
}
return ret, nil
}
// makeActiveRun makes an ActiveRun except for filling FYI deps with details,
// which is done later for all Runs at once in order to de-dupe common FYI
// deps. This is especially helpful for large CL stacks in single-CL Run mode.
func makeActiveRun(ctx context.Context, r *run.Run) (*migrationpb.ActiveRun, error) {
runCLs, err := run.LoadRunCLs(ctx, r.ID, r.CLs)
if err != nil {
return nil, err
}
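// Track the Run's own CLs and all of their deps so that FYI deps
// (deps which aren't part of the Run) can be computed below.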
known := make(common.CLIDsSet, len(r.CLs))
allDeps := common.CLIDsSet{}
mcls := make([]*migrationpb.RunCL, len(runCLs))
for i, cl := range runCLs {
known.Add(cl.ID)
trigger := &migrationpb.RunCL_Trigger{
Email: cl.Trigger.GetEmail(),
Time: cl.Trigger.GetTime(),
AccountId: cl.Trigger.GetGerritAccountId(),
}
mcl := &migrationpb.RunCL{
Id: int64(cl.ID),
Gc: &cvbqpb.GerritChange{
Host: cl.Detail.GetGerrit().GetHost(),
Project: cl.Detail.GetGerrit().GetInfo().GetProject(),
Change: cl.Detail.GetGerrit().GetInfo().GetNumber(),
Patchset: int64(cl.Detail.GetPatchset()),
EarliestEquivalentPatchset: int64(cl.Detail.GetMinEquivalentPatchset()),
Mode: r.Mode.BQAttemptMode(),
},
Files: cl.Detail.GetGerrit().GetFiles(),
Info: cl.Detail.GetGerrit().GetInfo(),
Trigger: trigger,
Deps: make([]*migrationpb.RunCL_Dep, len(cl.Detail.GetDeps())),
}
for i, dep := range cl.Detail.GetDeps() {
allDeps.AddI64(dep.GetClid())
mcl.Deps[i] = &migrationpb.RunCL_Dep{
Id: dep.GetClid(),
}
if dep.GetKind() == changelist.DepKind_HARD {
mcl.Deps[i].Hard = true
}
}
mcls[i] = mcl
}
var fyiDeps []*migrationpb.RunCL
for clid := range allDeps {
if known.Has(clid) {
continue
}
fyiDeps = append(fyiDeps, &migrationpb.RunCL{Id: int64(clid)})
}
return &migrationpb.ActiveRun{
Id: string(r.ID),
Cls: mcls,
FyiDeps: fyiDeps,
}, nil
}
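// fetchRunsWithStatus returns all Runs of the given LUCI project with the
// given status.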
func fetchRunsWithStatus(ctx context.Context, project string, status run.Status) ([]*run.Run, error) {
var runs []*run.Run
q := run.NewQueryWithLUCIProject(ctx, project).Eq("Status", status)
if err := datastore.GetAll(ctx, q, &runs); err != nil {
return nil, errors.Annotate(err, "failed to fetch Run entities").Tag(transient.Tag).Err()
}
return runs, nil
}
// fetchAttempt loads a Run from Datastore given its CQD attempt key hash.
//
// Returns nil, nil if such Run doesn't exist.
func fetchAttempt(ctx context.Context, key string) (*run.Run, error) {
q := datastore.NewQuery(run.RunKind).Eq("CQDAttemptKey", key)
var out []*run.Run
if err := datastore.GetAll(ctx, q, &out); err != nil {
return nil, errors.Annotate(err, "failed to fetch Run with CQDAttemptKeyHash=%q", key).Tag(transient.Tag).Err()
}
switch l := len(out); l {
case 0:
return nil, nil
case 1:
return out[0], nil
default:
sb := strings.Builder{}
for _, r := range out {
sb.WriteRune(' ')
sb.WriteString(string(r.ID))
}
logging.Errorf(ctx, "Found %d Runs with CQDAttemptKey=%q: [%s]", l, key, sb.String())
// To unblock CQDaemon, choose the latest Run, which, given the ID generation
// scheme, must be the first in the output.
return out[0], nil
}
}
// fetchRun loads a Run from Datastore by ID if given, falling back to the CQD
// attempt key hash otherwise.
//
// Returns nil, nil if such Run doesn't exist.
func fetchRun(ctx context.Context, id common.RunID, attemptKey string) (*run.Run, error) {
if id == "" {
return fetchAttempt(ctx, attemptKey)
}
res := &run.Run{ID: id}
switch err := datastore.Get(ctx, res); {
case err == datastore.ErrNoSuchEntity:
return nil, nil
case err != nil:
return nil, errors.Annotate(err, "failed to fetch Run entity").Tag(transient.Tag).Err()
default:
return res, nil
}
}
// VerifiedCQDRun is the Run reported by CQDaemon after verification completes.
type VerifiedCQDRun struct {
_kind string `gae:"$kind,migration.VerifiedCQDRun"`
// ID is the ID of this Run in CV.
ID common.RunID `gae:"$id"`
// Payload is what CQDaemon has reported.
Payload *migrationpb.ReportVerifiedRunRequest
// UpdateTime is when this entity was inserted.
UpdateTime time.Time `gae:",noindex"`
}
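// saveVerifiedCQDRun transactionally saves a VerifiedCQDRun and calls notify
// in the same transaction, unless such an entity already exists.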
func saveVerifiedCQDRun(ctx context.Context, req *migrationpb.ReportVerifiedRunRequest, notify func(context.Context) error) error {
runID := common.RunID(req.GetRun().GetId())
req.GetRun().Id = "" // will be stored as VerifiedCQDRun.ID
try := 0
err := datastore.RunInTransaction(ctx, func(ctx context.Context) error {
try++
v := VerifiedCQDRun{ID: runID}
switch err := datastore.Get(ctx, &v); {
case err == datastore.ErrNoSuchEntity:
// expected.
case err != nil:
return err
default:
// Do not overwrite the existing entity, since CV must already be finalizing it.
logging.Warningf(ctx, "VerifiedCQDRun %q already exists (try %d)", runID, try)
return nil
}
v = VerifiedCQDRun{
ID: runID,
UpdateTime: datastore.RoundTime(clock.Now(ctx).UTC()),
Payload: req,
}
if err := datastore.Put(ctx, &v); err != nil {
return err
}
return notify(ctx)
}, nil)
return errors.Annotate(err, "failed to record VerifiedCQDRun %q after %d tries", runID, try).Tag(transient.Tag).Err()
}
// pruneInactiveRuns removes Runs for which a VerifiedCQDRun has already been
// written or which are about to be cancelled.
//
// Modifies the Runs slice in place, but also returns it for readability.
func pruneInactiveRuns(ctx context.Context, in []*run.Run) ([]*run.Run, error) {
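// Reuse the backing array of the input slice for the output.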
out := in[:0]
keys := make([]*datastore.Key, len(in))
for i, r := range in {
keys[i] = datastore.MakeKey(ctx, "migration.VerifiedCQDRun", string(r.ID))
}
exists, err := datastore.Exists(ctx, keys)
if err != nil {
return nil, errors.Annotate(err, "failed to check VerifiedCQDRun existence").Tag(transient.Tag).Err()
}
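// Keep only the Runs without an existing VerifiedCQDRun entity.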
for i, r := range in {
if !exists.Get(0, i) {
out = append(out, r)
}
}
return out, nil
}
// FinishedCQDRun contains info about a finished Run reported by the CQDaemon.
//
// To be removed after the first milestone is reached.
//
// TODO(crbug/1227523): wipe all such entities.
type FinishedCQDRun struct {
_kind string `gae:"$kind,migration.FinishedCQDRun"`
// AttemptKey is the CQD ID of the Run.
//
// Once CV starts creating Runs, the CV Run for the same attempt will contain
// the AttemptKey as a substring.
AttemptKey string `gae:"$id"`
// RunID may be set if CQD's Attempt has corresponding CV Run at the time of
// saving of this entity.
//
// Although the CV RunID is also stored in the Payload, a separate field is
// necessary for Datastore indexing.
RunID common.RunID
// UpdateTime is when this entity was inserted.
UpdateTime time.Time `gae:",noindex"`
// Payload is everything that CQD has sent.
Payload *migrationpb.ReportedRun
}
// LoadFinishedCQDRun loads a FinishedCQDRun from Datastore.
//
// Expects exactly 1 FinishedCQDRun to exist for the given Run ID.
func LoadFinishedCQDRun(ctx context.Context, rid common.RunID) (*FinishedCQDRun, error) {
var frs []*FinishedCQDRun
q := datastore.NewQuery("migration.FinishedCQDRun").Eq("RunID", rid).Limit(2)
switch err := datastore.GetAll(ctx, q, &frs); {
case err != nil:
return nil, errors.Annotate(err, "failed to fetch FinishedCQDRun").Tag(transient.Tag).Err()
case len(frs) == 1:
return frs[0], nil
// The two cases below are defensive coding: neither is supposed to happen in
// practice unless the way the Attempt key is generated differs between CV and
// CQDaemon.
case len(frs) > 1:
return nil, errors.Reason(">1 FinishedCQDRun for Run %s", rid).Err()
default:
return nil, errors.Reason("no FinishedCQDRun for Run %s", rid).Err()
}
}
// LoadUnclaimedFinishedCQDRun returns the FinishedCQDRun with the matching
// attemptKey that is not yet associated with a Run, or nil if no such entity
// exists.
func LoadUnclaimedFinishedCQDRun(ctx context.Context, attemptKey string) (*FinishedCQDRun, error) {
f := &FinishedCQDRun{AttemptKey: attemptKey}
switch err := datastore.Get(ctx, f); {
case err == datastore.ErrNoSuchEntity:
return nil, nil
case err != nil:
return nil, errors.Annotate(err, "failed to load FinishedCQDRun for %q", attemptKey).Tag(transient.Tag).Err()
case f.RunID == "":
logging.Warningf(ctx, "Detected previously unclaimed FinishedCQDRun %q", attemptKey)
return f, nil
default:
return nil, nil
}
}
// ClaimFinishedCQRun associates a FinishedCQDRun with a Run.
func ClaimFinishedCQRun(ctx context.Context, fr *FinishedCQDRun, rid common.RunID) error {
if fr.RunID != "" {
return errors.Reason("given FinishedCQDRun must not be assocaited with a Run yet, but it is %q", fr.RunID).Err()
}
var innerErr error
err := datastore.RunInTransaction(ctx, func(ctx context.Context) (err error) {
defer func() { innerErr = err }()
tmp := FinishedCQDRun{AttemptKey: fr.AttemptKey}
switch err := datastore.Get(ctx, &tmp); {
case err != nil:
return errors.Annotate(err, "failed to load FinishedCQDRun for %q", fr.AttemptKey).Tag(transient.Tag).Err()
case tmp.RunID == rid:
return nil // already claimed, probably this is a retry of DS transaction.
case tmp.RunID != "":
return errors.Reason("FinishedCQDRun %q in Datastore is already associated with %q, not %q", fr.AttemptKey, tmp.RunID, rid).Err()
}
fr.RunID = rid
fr.Payload.Id = string(rid)
if err := datastore.Put(ctx, fr); err != nil {
return errors.Annotate(err, "failed to save FinishedCQDRun for %q", fr.AttemptKey).Tag(transient.Tag).Err()
}
return nil
}, nil)
switch {
case innerErr != nil:
return innerErr
case err != nil:
return errors.Annotate(err, "failed to ClaimFinishedCQRun %q for %q", fr.AttemptKey, rid).Tag(transient.Tag).Err()
default:
logging.Warningf(ctx, "ClaimFinishedCQRun %q FinishedCQDRun with Run %q", fr.AttemptKey, rid)
return nil
}
}
// makeGerritSetReviewRequest creates a SetReviewRequest to post a message to
// Gerrit at CQDaemon's request.
func makeGerritSetReviewRequest(r *run.Run, ci *gerritpb.ChangeInfo, msg, curRevision string, sendEmail bool) *gerritpb.SetReviewRequest {
req := &gerritpb.SetReviewRequest{
Number: ci.GetNumber(),
Project: ci.GetProject(),
RevisionId: curRevision,
Message: msg,
Tag: "autogenerated:cq",
Notify: gerritpb.Notify_NOTIFY_OWNER, // by default
}
switch {
case !sendEmail:
req.Notify = gerritpb.Notify_NOTIFY_NONE
case r.Mode == run.FullRun:
req.Notify = gerritpb.Notify_NOTIFY_OWNER_REVIEWERS
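// Fall through to also notify CQ label voters.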
fallthrough
default:
// notify CQ label voters, too.
// This doesn't take into account additional labels, but it's good enough
// during the migration.
var accounts []int64
for _, vote := range ci.GetLabels()[trigger.CQLabelName].GetAll() {
if vote.GetValue() != 0 {
accounts = append(accounts, vote.GetUser().GetAccountId())
}
}
req.NotifyDetails = &gerritpb.NotifyDetails{
Recipients: []*gerritpb.NotifyDetails_Recipient{
{
RecipientType: gerritpb.NotifyDetails_RECIPIENT_TYPE_TO,
Info: &gerritpb.NotifyDetails_Info{
Accounts: accounts,
},
},
},
}
}
return req
}
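// min returns the smaller of the two ints.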
func min(i, j int) int {
if i < j {
return i
}
return j
}