blob: 395735ce5683f9b84731e5b718a8a4f49da86efc [file] [log] [blame]
// Copyright 2021 The LUCI Authors.
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// See the License for the specific language governing permissions and
// limitations under the License.
package triager
import (
// stageNewRuns returns Run Creators for immediate Run creation or the earliest
// time for the next Run to be created.
// Guarantees that returned Run Creators are CL-wise disjoint, and thus can be
// created totally independently.
// In exceptional cases, also marks some CLs for purging if their trigger
// matches the existing finalized Run.
func stageNewRuns(ctx context.Context, c *prjpb.Component, cls map[int64]*clInfo, pm pmState) ([]*runcreator.Creator, time.Time, error) {
var next time.Time
var out []*runcreator.Creator
rs := runStage{
pm: pm,
c: c,
cls: cls,
visitedCLs: make(map[int64]struct{}, len(cls)),
// For determinism, iterate in fixed order:
for _, clid := range c.GetClids() {
info := cls[clid]
switch rc, nt, err := rs.stageNewRunsFrom(ctx, clid, info); {
case err != nil:
return nil, time.Time{}, err
case rc != nil:
out = append(out, rc)
next = earliest(next, nt)
return out, next, nil
type runStage struct {
// immutable
pm pmState
c *prjpb.Component
cls map[int64]*clInfo
// mutable
// visitedCLs tracks CLs already considered. Ensures that 1 CL can appear in
// at most 1 new Run.
visitedCLs map[int64]struct{}
// cachedReverseDeps maps clid to clids of CLs which depend on it.
// lazily-initialized.
cachedReverseDeps map[int64][]int64
func (rs *runStage) stageNewRunsFrom(ctx context.Context, clid int64, info *clInfo) (*runcreator.Creator, time.Time, error) {
// Only start with ready CLs. Non-ready ones can't form new Runs anyway.
if !info.ready {
return nil, time.Time{}, nil
if !rs.markVisited(clid) {
return nil, time.Time{}, nil
combo := combo{}
cgIndex := info.pcl.GetConfigGroupIndexes()[0]
cg :=
if cg.Content.GetCombineCls() != nil {
// Maximize Run's CL # to include not only all reachable dependencies, but
// also reachable dependents, recursively.
rs.expandComboVisited(info, &combo)
// Shall the decision be delayed?
delay := cg.Content.GetCombineCls().GetStabilizationDelay().AsDuration()
if next := combo.maxTriggeredTime.Add(delay); next.After(clock.Now(ctx)) {
return nil, next, nil
if combo.withNotYetLoadedDeps != nil {
return rs.postponeDueToNotYetLoadedDeps(ctx, &combo)
if len(combo.notReady) > 0 {
return rs.postponeDueToNotReadyCLs(ctx, &combo)
// At this point all CLs in combo are stable, ready and with valid deps.
if cg.Content.GetCombineCls() != nil {
// For multi-CL runs, this means all non-submitted deps are already inside
// combo.
if missing := combo.missingDeps(); len(missing) > 0 {
panic(fmt.Errorf("%s has missing deps %s", combo, missing))
// Furthermore, since all CLs are ready and related, they must belong to
// the exact same config group as the initial CL.
if cgIndexes := combo.configGroupsIndexes(); len(cgIndexes) > 1 {
panic(fmt.Errorf("%s has >1 config groups: %v", combo, cgIndexes))
// Check whether combo overlaps with any existing Runs.
// TODO(tandrii): support >1 concurrent Run on the same CL(s).
if runs := combo.overlappingRuns(); len(runs) > 0 {
for runIndex, sharedCLsCount := range runs { // take the first and only Run
prun := rs.c.GetPruns()[runIndex]
switch l := len(prun.GetClids()); {
case l < sharedCLsCount:
case l > sharedCLsCount:
// Run's scope is larger or different than this combo. Run Manager will
// soon be finalizing the Run as not all of its CLs are triggered.
// This may happen in a many cases of multi-CL Runs, for example:
// * during submitted: some CLs have already been submitted;
// * during cancellation: some CLs' votes have already been removed;
// * a newly ingested LUCI project config splits Run across multiple
// ConfigGroups or even makes one CL unwatched by the project.
return rs.postponeDueToExistingRunDiffScope(ctx, &combo, prun)
case sharedCLsCount == len(combo.all):
// The combo scope is exactly the same as Run. This is the most likely
// situation -- there is nothing for PM to do but wait.
// Note, that it's possible that Run's mode is different from the combo,
// in which case Run Manager will be finalizing the Run soon, so wait
// for notification from Run Manager anyway.
return nil, time.Time{}, nil
case sharedCLsCount > len(combo.all):
// The combo scope is larger than this Run.
// CQDaemon in this case aborts existing Run **without** removing the
// triggering CQ votes, and then immediately starts working on the
// larger-scoped Run. This isn't what user usually want though, as this
// usually means re-running all the tryjobs from scratch.
// TODO(tandrii): decide if it's OK to just purge a CL which isn't in
// active Run once CV is in charge AND just waiting if CV isn't in
// charge. This is definitely easier to implement.
// However, the problem with this potential approach is that if user
// really wants to stop existing Run of N CLs and start a larger Run on
// N+1 CLs instead, then user has to first remove all existing CQ votes,
// and then re-vote on all CLs from scratch. Worse, during the removal,
// CV/CQDaemon may temporarily see CQ votes on K < N CLs, and since
// these CQ votes are >> stabilization delay, CV/CQDaemon will happily
// start a spurious Run on K CLs, and even potentially trigger redundant
// tryjobs, which won't even be cancelled. Grrr.
// TODO(tandrii): alternatively, consider canceling the existing Run,
// similar to CQDaemon.
return rs.postponeExpandingExistingRunScope(ctx, &combo, prun)
rc, err := rs.makeCreator(ctx, &combo, cg)
if err != nil {
return nil, time.Time{}, err
// Check if Run about to be created already exists in order to detect avoid
// infinite retries if CL triggers are somehow re-used.
existing := run.Run{ID: rc.ExpectedRunID()}
switch err := datastore.Get(ctx, &existing); {
case err == datastore.ErrNoSuchEntity:
// This is the expected case.
// NOTE: actual creation may still fail due to a race, and that's fine.
return rc, time.Time{}, nil
case err != nil:
return nil, time.Time{}, errors.Annotate(err, "failed to check for existing Run %q", existing.ID).Tag(transient.Tag).Err()
case !run.IsEnded(existing.Status):
// The Run already exists. Most likely another triager called from another
// TQ was first. Check again in a few seconds, at which point PM should
// incorporate existing Run into its state.
logging.Warningf(ctx, "Run %q already exists. If this warning persists, there is a bug in PM which appears to not see this Run", existing.ID)
return nil, clock.Now(ctx).Add(5 * time.Second), nil
since := clock.Since(ctx, existing.EndTime)
if since < time.Minute {
logging.Warningf(ctx, "Recently finalized Run %q already exists, will check later", existing.ID)
return nil, existing.EndTime.Add(time.Minute), nil
logging.Warningf(ctx, "Run %q already exists, finalized %s ago; will purge CLs with reused triggers", existing.ID, since)
for _, info := range combo.all {
info.purgeReasons = append(info.purgeReasons, &changelist.CLError{
Kind: &changelist.CLError_ReusedTrigger_{
ReusedTrigger: &changelist.CLError_ReusedTrigger{
Run: string(existing.ID),
return nil, time.Time{}, nil
func (rs *runStage) reverseDeps() map[int64][]int64 {
if rs.cachedReverseDeps != nil {
return rs.cachedReverseDeps
rs.cachedReverseDeps = map[int64][]int64{}
for clid, info := range rs.cls {
if info.deps == nil {
// CL is or will be purged, so its deps weren't even triaged.
info.deps.iterateNotSubmitted(info.pcl, func(dep *changelist.Dep) {
did := dep.GetClid()
rs.cachedReverseDeps[did] = append(rs.cachedReverseDeps[did], clid)
return rs.cachedReverseDeps
func (rs *runStage) expandComboVisited(info *clInfo, result *combo) {
if info.deps != nil {
info.deps.iterateNotSubmitted(info.pcl, func(dep *changelist.Dep) {
rs.expandCombo(dep.GetClid(), result)
for _, clid := range rs.reverseDeps()[info.pcl.GetClid()] {
rs.expandCombo(clid, result)
func (rs *runStage) expandCombo(clid int64, result *combo) {
info := rs.cls[clid]
if info == nil {
// Can only happen if clid is a dep that's not yet loaded (otherwise dep
// would be in this component, and hence info would be set).
if !rs.markVisited(clid) {
rs.expandComboVisited(info, result)
func (rs *runStage) postponeDueToNotYetLoadedDeps(ctx context.Context, combo *combo) (*runcreator.Creator, time.Time, error) {
// TODO(crbug/1211576): this waiting can last forever. Component needs to
// record how long it has been waiting and abort with clear message to the
// user.
logging.Warningf(ctx, "%s waits for not yet loaded deps", combo)
return nil, time.Time{}, nil
func (rs *runStage) postponeDueToNotReadyCLs(ctx context.Context, combo *combo) (*runcreator.Creator, time.Time, error) {
// TODO(crbug/1211576): for safety, this should not wait forever.
logging.Warningf(ctx, "%s waits for not yet ready CLs", combo)
return nil, time.Time{}, nil
func (rs *runStage) postponeDueToExistingRunDiffScope(ctx context.Context, combo *combo, r *prjpb.PRun) (*runcreator.Creator, time.Time, error) {
// TODO(crbug/1211576): for safety, this should not wait forever.
logging.Warningf(ctx, "%s is waiting for a differently scoped run %q to finish", combo, r.GetId())
return nil, time.Time{}, nil
func (rs *runStage) postponeExpandingExistingRunScope(ctx context.Context, combo *combo, r *prjpb.PRun) (*runcreator.Creator, time.Time, error) {
// TODO(crbug/1211576): for safety, this should not wait forever.
logging.Warningf(ctx, "%s is waiting for smaller scoped run %q to finish", combo, r.GetId())
return nil, time.Time{}, nil
func (rs *runStage) makeCreator(ctx context.Context, combo *combo, cg *prjcfg.ConfigGroup) (*runcreator.Creator, error) {
latestIndex := -1
cls := make([]*changelist.CL, len(combo.all))
for i, info := range combo.all {
cls[i] = &changelist.CL{ID: common.CLID(info.pcl.GetClid())}
if info == combo.latestTriggeredByCQVote {
latestIndex = i
if err := datastore.Get(ctx, cls); err != nil {
// Even if one of errors is ErrEntityNotFound, this is a temporary situation as
// such CL(s) should be removed from PM state soon.
return nil, errors.Annotate(err, "failed to load CLs").Tag(transient.Tag).Err()
// Run's owner is whoever owns the latest triggered CL.
// It's guaranteed to be set because otherwise CL would have been sent for
// purging and not marked as ready.
owner, err := cls[latestIndex].Snapshot.OwnerIdentity()
if err != nil {
return nil, errors.Annotate(err, "failed to get OwnerIdentity of %d", cls[latestIndex].ID).Err()
bcls := make([]runcreator.CL, len(cls))
var opts *run.Options
for i, cl := range cls {
pcl := combo.all[i].pcl
exp, act := pcl.GetEversion(), cl.EVersion
if exp != act {
return nil, errors.Annotate(itriager.ErrOutdatedPMState, "CL %d EVersion changed %d => %d", cl.ID, exp, act).Err()
opts = run.MergeOptions(opts, run.ExtractOptions(cl.Snapshot))
// Restore email, which Project Manager doesn't track inside PCLs.
tr := trigger.Find(cl.Snapshot.GetGerrit().GetInfo(), cg.Content).GetCqVoteTrigger()
if tr.GetMode() != pcl.GetTriggers().GetCqVoteTrigger().GetMode() {
panic(fmt.Errorf("inconsistent Trigger in PM (%s) vs freshly extracted (%s)", pcl.GetTriggers().GetCqVoteTrigger(), tr))
bcls[i] = runcreator.CL{
ID: common.CLID(pcl.GetClid()),
ExpectedEVersion: pcl.GetEversion(),
TriggerInfo: tr,
Snapshot: cl.Snapshot,
cqTrigger := combo.latestTriggeredByCQVote.pcl.GetTriggers().GetCqVoteTrigger()
return &runcreator.Creator{
ConfigGroupID: cg.ID,
LUCIProject: cg.ProjectString(),
Mode: run.Mode(cqTrigger.GetMode()),
CreateTime: cqTrigger.GetTime().AsTime(),
Owner: owner,
Options: opts,
ExpectedIncompleteRunIDs: nil, // no Run is expected
OperationID: fmt.Sprintf("PM-%d", mathrand.Int63(ctx)),
InputCLs: bcls,
}, nil
// markVisited makes CL visited if not already and returns if action was taken.
func (rs *runStage) markVisited(clid int64) bool {
if _, visited := rs.visitedCLs[clid]; visited {
return false
rs.visitedCLs[clid] = struct{}{}
return true
// combo is a set of related CLs that will together form a new Run.
// The CLs in a combo are a subset of those from the component.
type combo struct {
all []*clInfo
clids map[int64]struct{}
notReady []*clInfo
withNotYetLoadedDeps *clInfo // nil if none; any one otherwise.
latestTriggeredByCQVote *clInfo
maxTriggeredTime time.Time
func (c combo) String() string {
sb := strings.Builder{}
sb.WriteString("combo(CLIDs: [")
for _, a := range c.all {
fmt.Fprintf(&sb, "%d ", a.pcl.GetClid())
if len(c.notReady) > 0 {
sb.WriteString(" notReady=[")
for _, a := range c.notReady {
fmt.Fprintf(&sb, "%d ", a.pcl.GetClid())
if c.withNotYetLoadedDeps != nil {
fmt.Fprintf(&sb, " notYetLoadedDeps of %d [", c.withNotYetLoadedDeps.pcl.GetClid())
for _, d := range c.withNotYetLoadedDeps.deps.notYetLoaded {
fmt.Fprintf(&sb, "%d ", d.GetClid())
if c.latestTriggeredByCQVote != nil {
t := c.latestTriggeredByCQVote.pcl.GetTriggers().GetCqVoteTrigger()
fmt.Fprintf(&sb, " latestTriggered=%d at %s", c.latestTriggeredByCQVote.pcl.GetClid(), t.GetTime().AsTime())
return sb.String()
func (c *combo) add(info *clInfo) {
c.all = append(c.all, info)
if c.clids == nil {
c.clids = map[int64]struct{}{info.pcl.GetClid(): {}}
} else {
c.clids[info.pcl.GetClid()] = struct{}{}
if !info.ready {
c.notReady = append(c.notReady, info)
if info.deps != nil && len(info.deps.notYetLoaded) > 0 {
c.withNotYetLoadedDeps = info
if pb := info.pcl.GetTriggers().GetCqVoteTrigger().GetTime(); pb != nil {
t := pb.AsTime()
if c.maxTriggeredTime.IsZero() || t.After(c.maxTriggeredTime) {
c.maxTriggeredTime = t
c.latestTriggeredByCQVote = info
func (c *combo) missingDeps() []*changelist.Dep {
var missing []*changelist.Dep
for _, info := range c.all {
info.deps.iterateNotSubmitted(info.pcl, func(dep *changelist.Dep) {
if _, in := c.clids[dep.GetClid()]; !in {
missing = append(missing, dep)
return missing
func (c *combo) configGroupsIndexes() []int32 {
res := make([]int32, 0, 1)
for _, info := range c.all {
idx := info.pcl.GetConfigGroupIndexes()[0]
found := false
for _, v := range res {
if v == idx {
found = true
if !found {
res = append(res, idx)
return res
// overlappingRuns returns number of CLs shared with each Run identified by its
// index.
func (c *combo) overlappingRuns() map[int32]int {
res := map[int32]int{}
for _, info := range c.all {
for _, index := range info.runIndexes {
return res