blob: 1f8ecca39d963be68a1c264101a210211f61c4a4 [file] [log] [blame]
// Copyright 2021 The LUCI Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//      http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package state
import (
// concurrentComponentProcessing caps the size of the worker pools used when
// triaging components and when creating Runs.
const concurrentComponentProcessing = 16

// errCaughtPanic is the error reported in place of a panic recovered while
// triaging a component or creating a Run.
var errCaughtPanic = errors.New("caught panic")
// earliestDecisionTime returns the earliest decision time of all components.
// Returns the same time as time.Time and as proto, and boolean indicating that
// earliestDecisionTime is as soon as possible.
// Re-uses DecisionTime of one of the components, assuming that components are
// modified copy-on-write.
func earliestDecisionTime(cs []*prjpb.Component) (time.Time, *timestamppb.Timestamp, bool) {
var ret time.Time
var retPB *timestamppb.Timestamp
for _, c := range cs {
if c.GetTriageRequired() {
return time.Time{}, nil, true
if dt := c.GetDecisionTime(); dt != nil {
if t := dt.AsTime(); ret.IsZero() || ret.After(t) {
ret = t
retPB = dt
return ret, retPB, false
// cAction is a component action to be taken during current PM mutation.
// An action may involve creating one or more new Runs,
// or removing CQ votes from CL(s) which can't form a Run for some reason.
type cAction struct {
componentIndex int
// runsFailed is modified during actOnComponents.
runsFailed int32
// triageComponents triages components.
// Doesn't modify the state itself.
// Returns:
// * an action per each component that needs acting upon;
// * indication whether the state should be stored for debugging purpose;
// * error, if any.
func (h *Handler) triageComponents(ctx context.Context, s *State) ([]*cAction, bool, error) {
var sup itriager.PMState
sup, err := s.makeTriageSupporter(ctx)
if err != nil {
return nil, false, err
poolSize := min(concurrentComponentProcessing, len(s.PB.GetComponents()))
now := clock.Now(ctx)
var mutex sync.Mutex
var actions []*cAction
poolErr := parallel.WorkPool(poolSize, func(work chan<- func() error) {
for i, oldC := range s.PB.GetComponents() {
i, oldC := i, oldC
if !needsTriage(oldC, now) {
work <- func() error {
switch res, err := h.triageOneComponent(ctx, s, oldC, sup); {
case itriager.IsErrOutdatedPMState(err):
return nil
case err != nil:
// Log error here since only total errs count will be propagated up
// the stack.
level := logging.Error
if transient.Tag.In(err) {
level = logging.Warning
logging.Logf(ctx, level, "%s while processing component: %s", err, protojson.Format(oldC))
return err
actions = append(actions, &cAction{componentIndex: i, Result: res})
return nil
switch merrs, ok := poolErr.(errors.MultiError); {
case poolErr == nil || (ok && len(merrs) == 0):
return actions, false, nil
case !ok:
panic(fmt.Errorf("unexpected return from parallel.WorkPool: %s", poolErr))
case len(actions) > 0:
// Components are independent, so proceed despite errors on some components
// since partial progress is better than none.
logging.Warningf(ctx, "triageComponents: %d errors, but proceeding to act on %d components", len(merrs), len(actions))
return actions, errors.Contains(merrs, errCaughtPanic), nil
err := common.MostSevereError(merrs)
return nil, false, errors.Annotate(err, "failed to triage %d components, keeping the most severe error", len(merrs)).Err()
func needsTriage(c *prjpb.Component, now time.Time) bool {
if c.GetTriageRequired() {
return true // external event arrived
next := c.GetDecisionTime()
if next == nil {
return false // wait for an external event
if next.AsTime().After(now) {
return false // too early
return true
func (h *Handler) triageOneComponent(ctx context.Context, s *State, oldC *prjpb.Component, sup itriager.PMState) (res itriager.Result, err error) {
defer paniccatcher.Catch(func(p *paniccatcher.Panic) {
logging.Errorf(ctx, "caught panic %s:\n\n%s", p.Reason, p.Stack)
// Log as a separate message under debug level to avoid sending it to Cloud
// Error.
logging.Debugf(ctx, "caught panic current state:\n%s", protojson.Format(s.PB))
err = errCaughtPanic
res, err = h.ComponentTriage(ctx, oldC, sup)
if err != nil {
if res.NewValue != nil && res.NewValue == oldC {
panic(fmt.Errorf("new value re-uses prior component object, must use copy-on-write instead"))
if len(res.CLsToPurge) > 0 {
s.validatePurgeCLTasks(oldC, res.CLsToPurge)
// actOnComponents executes actions on components produced by triageComponents.
// Expects the state to already be shallow cloned.
func (h *Handler) actOnComponents(ctx context.Context, s *State, actions []*cAction) (SideEffect, error) {
// First, create Runs in parallel.
// As Run creation may take considerable time, use an earlier deadline to have
// enough time to save state for other components.
ctxRunCreation, cancel := earlierDeadline(ctx, 5*time.Second)
defer cancel()
poolSize := min(concurrentComponentProcessing, len(actions))
runsErr := parallel.WorkPool(poolSize, func(work chan<- func() error) {
for _, action := range actions {
for _, rc := range action.RunsToCreate {
action, rc := action, rc
c := s.PB.GetComponents()[action.componentIndex]
work <- func() error {
err := h.createOneRun(ctxRunCreation, rc, c)
if err != nil {
atomic.AddInt32(&action.runsFailed, 1)
// Log error here since only total errs count will be propagated up
// the stack.
level := logging.Error
if transient.Tag.In(err) {
level = logging.Warning
logging.Logf(ctx, level, "Failed to create a Run in component\n%s\ndue to error: %s", protojson.Format(c), err)
return err
// Keep runsErr for now and try to make progress on actions/components
// without errors.
// Shallow-copy the components slice, as one or more components are highly
// likely to be modified in a loop below.
s.PB.Components = append(([]*prjpb.Component)(nil), s.PB.GetComponents()...)
runsCreated, componentsUpdated := 0, 0
var clsToPurge []*prjpb.PurgeCLTask
for _, action := range actions {
if action.runsFailed > 0 {
runsCreated += len(action.RunsToCreate)
clsToPurge = append(clsToPurge, action.CLsToPurge...)
if action.NewValue != nil {
s.PB.Components[action.componentIndex] = action.NewValue
var sideEffect SideEffect
if len(clsToPurge) > 0 {
sideEffect = h.addCLsToPurge(ctx, s, clsToPurge)
proceedMsg := fmt.Sprintf("proceeding to save %d components and purge %d CLs", componentsUpdated, len(clsToPurge))
// Finally, decide the final result.
switch merrs, ok := runsErr.(errors.MultiError); {
case runsErr == nil || (ok && len(merrs) == 0):
logging.Infof(ctx, "actOnComponents: created %d Runs, %s", runsCreated, proceedMsg)
return sideEffect, nil
case !ok:
panic(fmt.Errorf("unexpected return from parallel.WorkPool"))
logging.Warningf(ctx, "actOnComponents: created %d Runs, failed to create %d Runs", runsCreated, len(merrs))
if componentsUpdated+len(clsToPurge) == 0 {
err := common.MostSevereError(merrs)
return nil, errors.Annotate(err, "failed to actOnComponents, most severe error").Err()
// All actions are independent, so proceed despite the errors since partial
// progress is better than none.
logging.Debugf(ctx, "actOnComponents: %s", proceedMsg)
return sideEffect, nil
func (h *Handler) createOneRun(ctx context.Context, rc *runcreator.Creator, c *prjpb.Component) (err error) {
defer paniccatcher.Catch(func(p *paniccatcher.Panic) {
logging.Errorf(ctx, "caught panic while creating a Run %s\n\n%s", p.Reason, p.Stack)
err = errCaughtPanic
switch _, err = rc.Create(ctx, h.CLMutator, h.PMNotifier, h.RunNotifier); {
case err == nil:
return nil
case runcreator.StateChangedTag.In(err):
// This is a transient error at component action level: on retry, the Triage()
// function will re-evaulate the state.
return transient.Tag.Apply(err)
return err
// validatePurgeCLTasks verifies correctness of tasks from Triage.
// Modifies given tasks in place.
// Panics in case of problems.
func (s *State) validatePurgeCLTasks(c *prjpb.Component, ts []*prjpb.PurgeCLTask) {
// First, verify individual tasks have expected fields set.
m := make(common.CLIDsSet, len(ts))
for _, t := range ts {
id := t.GetPurgingCl().GetClid()
switch {
case id == 0:
panic(fmt.Errorf("clid must be set"))
case m.HasI64(id):
panic(fmt.Errorf("duplicated clid %d", id))
case t.GetReasons() == nil:
panic(fmt.Errorf("at least 1 reason must be given"))
for i, r := range t.GetReasons() {
if r.GetKind() == nil {
panic(fmt.Errorf("Reason #%d is nil", i))
// Verify only CLs not yet purged are being purged.
// NOTE: This iterates all CLs currently being purged, but there should be
// very few such CLs compared to the total number of tracked CLs.
for _, p := range s.PB.GetPurgingCls() {
if m.HasI64(p.GetClid()) {
panic(fmt.Errorf("can't purge %d CL which is already being purged", p.GetClid()))
// Verify only CLs from the component are being purged.
for _, clid := range c.GetClids() {
if len(m) > 0 {
panic(fmt.Errorf("purging %v CLs outside the component", m))
// addCLsToPurge changes PB.PurgingCLs and prepares for atomic creation of TQ
// tasks to do actual purge.
// Expects given tasks to be correct (see validatePurgeCLTasks).
func (h *Handler) addCLsToPurge(ctx context.Context, s *State, ts []*prjpb.PurgeCLTask) SideEffect {
if len(ts) == 0 {
return nil
s.populatePurgeCLTasks(ctx, ts)
purgingCLs := make([]*prjpb.PurgingCL, len(ts))
for i, t := range ts {
purgingCLs[i] = t.GetPurgingCl()
s.PB.PurgingCls, _ = s.PB.COWPurgingCLs(nil, purgingCLs)
return &TriggerPurgeCLTasks{payloads: ts, clPurger: h.CLPurger}
// maxPurgingCLDuration limits the time that a TQ task has to execute
// PurgeCLTask.
//
// It is added to the current time to compute the deadline recorded on each
// PurgingCL by populatePurgeCLTasks.
const maxPurgingCLDuration = 10 * time.Minute
// populatePurgeCLTasks populates all remaining fields in PurgeCLsTasks created
// by Triage.
// Modifies given tasks in place.
func (s *State) populatePurgeCLTasks(ctx context.Context, ts []*prjpb.PurgeCLTask) {
deadline := timestamppb.New(clock.Now(ctx).Add(maxPurgingCLDuration))
opInt := deadline.AsTime().Unix()
for _, t := range ts {
id := t.GetPurgingCl().GetClid()
pcl := s.PB.GetPcls()[s.pclIndex[common.CLID(id)]]
t.Trigger = pcl.GetTriggers().GetCqVoteTrigger()
t.LuciProject = s.PB.GetLuciProject()
t.PurgingCl.Deadline = deadline
t.PurgingCl.OperationId = fmt.Sprintf("%d-%d", opInt, id)
t.ConfigGroups = make([]string, len(pcl.GetConfigGroupIndexes()))
for i, idx := range pcl.GetConfigGroupIndexes() {
id := prjcfg.MakeConfigGroupID(s.PB.GetConfigHash(), s.PB.ConfigGroupNames[idx])
t.ConfigGroups[i] = string(id)
func (s *State) makeTriageSupporter(ctx context.Context) (*triageSupporter, error) {
if s.configGroups == nil {
meta, err := prjcfg.GetHashMeta(ctx, s.PB.GetLuciProject(), s.PB.GetConfigHash())
if err != nil {
return nil, err
if s.configGroups, err = meta.GetConfigGroups(ctx); err != nil {
return nil, err
purging := make(map[int64]*prjpb.PurgingCL, len(s.PB.GetPurgingCls()))
for _, p := range s.PB.GetPurgingCls() {
purging[p.GetClid()] = p
return &triageSupporter{
pcls: s.PB.GetPcls(),
pclIndex: s.pclIndex,
purging: purging,
configGroups: s.configGroups,
}, nil
// triageSupporter provides limited access to resources of PM state.
// Implements itriager.PMState.
type triageSupporter struct {
pcls []*prjpb.PCL
pclIndex map[common.CLID]int
purging map[int64]*prjpb.PurgingCL
configGroups []*prjcfg.ConfigGroup
var _ itriager.PMState = (*triageSupporter)(nil)
func (a *triageSupporter) PCL(clid int64) *prjpb.PCL {
i, ok := a.pclIndex[common.CLID(clid)]
if !ok {
return nil
return a.pcls[i]
func (a *triageSupporter) PurgingCL(clid int64) *prjpb.PurgingCL {
return a.purging[clid]
func (a *triageSupporter) ConfigGroup(index int32) *prjcfg.ConfigGroup {
return a.configGroups[index]
func markForTriage(in []*prjpb.Component) []*prjpb.Component {
out := make([]*prjpb.Component, len(in))
for i, c := range in {
if !c.GetTriageRequired() {
c = c.CloneShallow()
c.TriageRequired = true
out[i] = c
return out
func markForTriageOnChangedPCLs(in []*prjpb.Component, pcls []*prjpb.PCL, changed common.CLIDsSet) []*prjpb.Component {
// For each changed CL `A`, expand changed set to include all CLs `B` such
// that B depends on A.
reverseDeps := make(map[int64][]int64, len(pcls)) // `A` -> all such `B` CLs
for _, p := range pcls {
for _, dep := range p.GetDeps() {
reverseDeps[dep.GetClid()] = append(reverseDeps[dep.GetClid()], p.GetClid())
expanded := make(common.CLIDsSet, len(changed))
var expand func(int64)
expand = func(clid int64) {
if expanded.HasI64(clid) {
for _, revDep := range reverseDeps[clid] {
for clid := range changed {
out := make([]*prjpb.Component, len(in))
for i, c := range in {
if !c.GetTriageRequired() {
for _, clid := range c.GetClids() {
if expanded.HasI64(clid) {
c = c.CloneShallow()
c.TriageRequired = true
out[i] = c
return out
func earlierDeadline(ctx context.Context, reserve time.Duration) (context.Context, context.CancelFunc) {
deadline, ok := ctx.Deadline()
if !ok {
return ctx, func() {} // no deadline
return clock.WithDeadline(ctx, deadline.Add(-reserve))
// min returns the smaller of the two ints.
func min(i, j int) int {
	if i < j {
		return i
	}
	return j
}