blob: 5ed1eb19d8b5deb0f3d036e03b8f938708f67826 [file] [log] [blame]
// Copyright 2021 The LUCI Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package state
import (
"context"
"fmt"
"sort"
"testing"
"time"
"google.golang.org/protobuf/types/known/timestamppb"
"go.chromium.org/luci/auth/identity"
"go.chromium.org/luci/common/clock/testclock"
"go.chromium.org/luci/common/errors"
"go.chromium.org/luci/gae/service/datastore"
cfgpb "go.chromium.org/luci/cv/api/config/v2"
"go.chromium.org/luci/cv/internal/changelist"
"go.chromium.org/luci/cv/internal/common"
"go.chromium.org/luci/cv/internal/configs/prjcfg/prjcfgtest"
"go.chromium.org/luci/cv/internal/cvtesting"
gf "go.chromium.org/luci/cv/internal/gerrit/gerritfake"
"go.chromium.org/luci/cv/internal/gerrit/trigger"
"go.chromium.org/luci/cv/internal/prjmanager"
"go.chromium.org/luci/cv/internal/prjmanager/itriager"
"go.chromium.org/luci/cv/internal/prjmanager/pmtest"
"go.chromium.org/luci/cv/internal/prjmanager/prjpb"
"go.chromium.org/luci/cv/internal/run"
"go.chromium.org/luci/cv/internal/run/runcreator"
. "github.com/smartystreets/goconvey/convey"
. "go.chromium.org/luci/common/testing/assertions"
)
// TestEarliestDecisionTime grows a component list one element at a time and
// checks what earliestDecisionTime reports after every addition.
func TestEarliestDecisionTime(t *testing.T) {
	t.Parallel()

	Convey("earliestDecisionTime works", t, func() {
		now := testclock.TestRecentTimeUTC
		base := now.Add(time.Hour)

		// earliest collapses the three results of earliestDecisionTime into a
		// single time.Time: `now` stands for "as soon as possible". It also
		// cross-checks that the time.Time and the proto timestamp agree.
		earliest := func(components []*prjpb.Component) time.Time {
			when, whenPB, asap := earliestDecisionTime(components)
			switch {
			case asap:
				return now
			case when.IsZero():
				So(whenPB, ShouldBeNil)
			default:
				So(whenPB.AsTime(), ShouldResemble, when)
			}
			return when
		}

		// Each step appends one more component to the list and states the
		// result expected for the list accumulated so far.
		steps := []struct {
			add  *prjpb.Component
			want time.Time
		}{
			// No decision time at all => zero time.
			{&prjpb.Component{DecisionTime: nil}, time.Time{}},
			// First real decision time wins.
			{&prjpb.Component{DecisionTime: timestamppb.New(base.Add(time.Second))}, base.Add(time.Second)},
			// Components without a decision time don't affect the result.
			{&prjpb.Component{}, base.Add(time.Second)},
			// Later decision time doesn't affect the result either.
			{&prjpb.Component{DecisionTime: timestamppb.New(base.Add(time.Hour))}, base.Add(time.Second)},
			// Earlier decision time becomes the new result.
			{&prjpb.Component{DecisionTime: timestamppb.New(base)}, base},
			{
				&prjpb.Component{
					TriageRequired: true,
					// DecisionTime in this case doesn't matter.
					DecisionTime: timestamppb.New(base.Add(10 * time.Hour)),
				},
				now,
			},
		}

		var components []*prjpb.Component
		for _, step := range steps {
			components = append(components, step.add)
			So(earliest(components), ShouldResemble, step.want)
		}
	})
}
// TestComponentsActions exercises Handler.triageComponents and
// Handler.ExecDeferred on a project state made of four single-CL components.
//
// Each scenario replaces Handler.ComponentTriage with a fake and then checks
// the returned actions, the resulting state, the side effects and the tasks
// scheduled in the test task queue.
func TestComponentsActions(t *testing.T) {
	t.Parallel()

	Convey("Component actions logic work in the abstract", t, func() {
		ct := cvtesting.Test{}
		ctx, cancel := ct.SetUp()
		defer cancel()
		now := ct.Clock.Now()

		const lProject = "luci-project"
		prjcfgtest.Create(ctx, lProject, &cfgpb.Config{ConfigGroups: []*cfgpb.ConfigGroup{{Name: "main"}}})
		meta := prjcfgtest.MustExist(ctx, lProject)
		pmNotifier := prjmanager.NewNotifier(ct.TQDispatcher)
		runNotifier := run.NewNotifier(ct.TQDispatcher)

		h := Handler{
			PMNotifier:  pmNotifier,
			RunNotifier: runNotifier,
			CLMutator:   changelist.NewMutator(ct.TQDispatcher, pmNotifier, runNotifier),
		}
		// NOTE: Pcls and Components are indexed differently.
		// Pcls[0..2] are CLs 1..3, whereas Components[1..3] hold CLs 1..3
		// because Components[0] is the CL 999 component.
		state := &State{
			PB: &prjpb.PState{
				LuciProject: lProject,
				Status:      prjpb.Status_STARTED,
				ConfigHash:  meta.Hash(),
				Pcls: []*prjpb.PCL{
					{Clid: 1},
					{Clid: 2},
					{Clid: 3},
					{Clid: 999},
				},
				Components: []*prjpb.Component{
					{Clids: []int64{999}}, // never sees any action.
					{Clids: []int64{1}, DecisionTime: timestamppb.New(now.Add(1 * time.Minute))},
					{Clids: []int64{2}, DecisionTime: timestamppb.New(now.Add(2 * time.Minute))},
					{Clids: []int64{3}, DecisionTime: timestamppb.New(now.Add(3 * time.Minute))},
				},
				NextEvalTime: timestamppb.New(now.Add(1 * time.Minute)),
			},
		}

		// pb is the backup of state.PB against which scenarios verify that the
		// input state wasn't modified, and from which they derive the expected
		// output state.
		pb := backupPB(state)

		// markComponentsForTriage sets TriageRequired on the components at the
		// given indexes of state.PB.Components and refreshes the pb backup.
		markComponentsForTriage := func(indexes ...int) {
			for _, i := range indexes {
				state.PB.GetComponents()[i].TriageRequired = true
			}
			pb = backupPB(state)
		}

		// markTriaged returns a shallow copy of the component with
		// TriageRequired unset. It panics if the component wasn't marked for
		// triage, since that would indicate a mistake in the test itself.
		markTriaged := func(c *prjpb.Component) *prjpb.Component {
			if !c.GetTriageRequired() {
				panic(fmt.Errorf("must require triage"))
			}
			o := c.CloneShallow()
			o.TriageRequired = false
			return o
		}

		// calledOn records components passed to the fake triage function.
		// It is buffered for every component, so the fakes never block on send.
		calledOn := make(chan *prjpb.Component, len(state.PB.Components))

		// collectCalledOn drains calledOn without blocking and returns the
		// sorted first CL IDs of the recorded components.
		collectCalledOn := func() []int {
			var out []int
		loop:
			for {
				select {
				case c := <-calledOn:
					out = append(out, int(c.GetClids()[0]))
				default:
					break loop
				}
			}
			sort.Ints(out)
			return out
		}

		Convey("noop at triage", func() {
			// No component requires triage and all decision times are in the
			// future, hence the triage function is expected not to be called.
			h.ComponentTriage = func(_ context.Context, c *prjpb.Component, _ itriager.PMState) (itriager.Result, error) {
				calledOn <- c
				return itriager.Result{}, nil
			}
			actions, saveForDebug, err := h.triageComponents(ctx, state)
			So(err, ShouldBeNil)
			So(saveForDebug, ShouldBeFalse)
			So(actions, ShouldBeNil)
			So(state.PB, ShouldResembleProto, pb)
			So(collectCalledOn(), ShouldBeEmpty)

			Convey("ExecDeferred", func() {
				state2, sideEffect, err := h.ExecDeferred(ctx, state)
				So(err, ShouldBeNil)
				So(state.PB, ShouldResembleProto, pb)
				So(state2, ShouldEqual, state) // pointer comparison
				So(sideEffect, ShouldBeNil)
				// Always creates new task iff there is NextEvalTime.
				So(pmtest.ETAsOF(ct.TQ.Tasks(), lProject), ShouldNotBeEmpty)
			})
		})

		Convey("triage called on TriageRequired components or when decision time is <= now", func() {
			// Advance the clock exactly to the decision time of component #1
			// and explicitly request triage of component #3.
			ct.Clock.Set(state.PB.Components[1].DecisionTime.AsTime())
			c1next := state.PB.Components[1].DecisionTime.AsTime().Add(time.Hour)
			markComponentsForTriage(3)
			h.ComponentTriage = func(_ context.Context, c *prjpb.Component, _ itriager.PMState) (itriager.Result, error) {
				calledOn <- c
				switch c.GetClids()[0] {
				case 1:
					c = c.CloneShallow()
					c.DecisionTime = timestamppb.New(c1next)
					return itriager.Result{NewValue: c}, nil
				case 3:
					return itriager.Result{NewValue: markTriaged(c)}, nil
				}
				panic("unreachable")
			}
			actions, saveForDebug, err := h.triageComponents(ctx, state)
			So(err, ShouldBeNil)
			So(saveForDebug, ShouldBeFalse)
			So(actions, ShouldHaveLength, 2)
			So(collectCalledOn(), ShouldResemble, []int{1, 3})

			Convey("ExecDeferred", func() {
				state2, sideEffect, err := h.ExecDeferred(ctx, state)
				So(err, ShouldBeNil)
				So(sideEffect, ShouldBeNil)
				// Component #2 now has the earliest decision time.
				pb.NextEvalTime = timestamppb.New(now.Add(2 * time.Minute))
				pb.Components[1].DecisionTime = timestamppb.New(c1next)
				pb.Components[3].TriageRequired = false
				So(state2.PB, ShouldResembleProto, pb)
				So(pmtest.ETAsWithin(ct.TQ.Tasks(), lProject, time.Second, now.Add(2*time.Minute)), ShouldNotBeEmpty)
			})
		})

		Convey("purges CLs", func() {
			markComponentsForTriage(1, 2, 3)
			// Components #1 and #3 request a purge of their CL, #2 is a noop.
			h.ComponentTriage = func(_ context.Context, c *prjpb.Component, _ itriager.PMState) (itriager.Result, error) {
				switch clid := c.GetClids()[0]; clid {
				case 1, 3:
					return itriager.Result{CLsToPurge: []*prjpb.PurgeCLTask{
						{
							PurgingCl: &prjpb.PurgingCL{Clid: clid},
							Reasons: []*changelist.CLError{
								{Kind: &changelist.CLError_OwnerLacksEmail{OwnerLacksEmail: true}},
							},
						},
					}}, nil
				case 2:
					return itriager.Result{}, nil
				}
				panic("unreachable")
			}
			actions, saveForDebug, err := h.triageComponents(ctx, state)
			So(err, ShouldBeNil)
			So(saveForDebug, ShouldBeFalse)
			So(actions, ShouldHaveLength, 3)
			So(state.PB, ShouldResembleProto, pb)

			Convey("ExecDeferred", func() {
				state2, sideEffect, err := h.ExecDeferred(ctx, state)
				So(err, ShouldBeNil)
				expectedDeadline := timestamppb.New(now.Add(maxPurgingCLDuration))
				// The operation IDs below are the values the handler is
				// expected to produce under the fake test clock.
				So(state2.PB.GetPurgingCls(), ShouldResembleProto, []*prjpb.PurgingCL{
					{Clid: 1, OperationId: "1580640000-1", Deadline: expectedDeadline},
					{Clid: 3, OperationId: "1580640000-3", Deadline: expectedDeadline},
				})
				So(sideEffect, ShouldHaveSameTypeAs, &TriggerPurgeCLTasks{})
				ps := sideEffect.(*TriggerPurgeCLTasks).payloads
				So(ps, ShouldHaveLength, 2)
				// Unlike PB.PurgingCls, the tasks aren't necessarily sorted.
				sort.Slice(ps, func(i, j int) bool { return ps[i].GetPurgingCl().GetClid() < ps[j].GetPurgingCl().GetClid() })
				So(ps[0].GetPurgingCl(), ShouldResembleProto, state2.PB.GetPurgingCls()[0]) // CL#1
				// CL#1 is the first PCL (Pcls are ordered 1, 2, 3, 999).
				So(ps[0].GetTrigger(), ShouldResembleProto, state2.PB.GetPcls()[0 /*CL#1*/].GetTrigger())
				So(ps[0].GetLuciProject(), ShouldEqual, lProject)
				So(ps[1].GetPurgingCl(), ShouldResembleProto, state2.PB.GetPurgingCls()[1]) // CL#3
			})
		})

		Convey("partial failure in triage", func() {
			markComponentsForTriage(1, 2, 3)
			// Triage of #1 fails, while #2 and #3 are triaged successfully.
			h.ComponentTriage = func(_ context.Context, c *prjpb.Component, _ itriager.PMState) (itriager.Result, error) {
				switch c.GetClids()[0] {
				case 1:
					return itriager.Result{}, errors.New("oops1")
				case 2, 3:
					return itriager.Result{NewValue: markTriaged(c)}, nil
				}
				panic("unreachable")
			}
			actions, saveForDebug, err := h.triageComponents(ctx, state)
			So(err, ShouldBeNil)
			So(saveForDebug, ShouldBeFalse)
			So(actions, ShouldHaveLength, 2)
			So(state.PB, ShouldResembleProto, pb)

			Convey("ExecDeferred", func() {
				// Execute slightly after #1 component decision time.
				ct.Clock.Set(pb.Components[1].DecisionTime.AsTime().Add(time.Microsecond))
				state2, sideEffect, err := h.ExecDeferred(ctx, state)
				So(err, ShouldBeNil)
				So(sideEffect, ShouldBeNil)
				// Component #1 stays marked for triage since its triage failed.
				pb.Components[2].TriageRequired = false
				pb.Components[3].TriageRequired = false
				pb.NextEvalTime = timestamppb.New(ct.Clock.Now()) // re-triage ASAP.
				So(state2.PB, ShouldResembleProto, pb)
				// Self-poke task must be scheduled for earliest possible from now.
				So(pmtest.ETAsWithin(ct.TQ.Tasks(), lProject, time.Second, ct.Clock.Now().Add(prjpb.PMTaskInterval)), ShouldNotBeEmpty)
			})
		})

		Convey("outdated PMState detected during triage", func() {
			markComponentsForTriage(1, 2, 3)
			// Triage of #1 reports an outdated PMState, while #2 and #3 are
			// triaged successfully.
			h.ComponentTriage = func(_ context.Context, c *prjpb.Component, _ itriager.PMState) (itriager.Result, error) {
				switch c.GetClids()[0] {
				case 1:
					return itriager.Result{}, errors.Annotate(itriager.ErrOutdatedPMState, "smth changed").Err()
				case 2, 3:
					return itriager.Result{NewValue: markTriaged(c)}, nil
				}
				panic("unreachable")
			}
			actions, saveForDebug, err := h.triageComponents(ctx, state)
			So(err, ShouldBeNil)
			So(saveForDebug, ShouldBeFalse)
			So(actions, ShouldHaveLength, 2)
			So(state.PB, ShouldResembleProto, pb)

			Convey("ExecDeferred", func() {
				state2, sideEffect, err := h.ExecDeferred(ctx, state)
				So(err, ShouldBeNil)
				So(sideEffect, ShouldBeNil)
				// Component #1 stays marked for triage.
				pb.Components[2].TriageRequired = false
				pb.Components[3].TriageRequired = false
				pb.NextEvalTime = timestamppb.New(ct.Clock.Now()) // re-triage ASAP.
				So(state2.PB, ShouldResembleProto, pb)
				// Self-poke task must be scheduled for earliest possible from now.
				So(pmtest.ETAsWithin(ct.TQ.Tasks(), lProject, time.Second, ct.Clock.Now().Add(prjpb.PMTaskInterval)), ShouldNotBeEmpty)
			})
		})

		Convey("100% failure in triage", func() {
			markComponentsForTriage(1, 2)
			h.ComponentTriage = func(_ context.Context, _ *prjpb.Component, _ itriager.PMState) (itriager.Result, error) {
				return itriager.Result{}, errors.New("oops")
			}
			_, _, err := h.triageComponents(ctx, state)
			So(err, ShouldErrLike, "failed to triage 2 components")
			So(state.PB, ShouldResembleProto, pb)

			Convey("ExecDeferred", func() {
				state2, sideEffect, err := h.ExecDeferred(ctx, state)
				So(err, ShouldNotBeNil)
				So(sideEffect, ShouldBeNil)
				So(state2, ShouldBeNil)
			})
		})

		Convey("Catches panic in triage", func() {
			markComponentsForTriage(1)
			h.ComponentTriage = func(_ context.Context, _ *prjpb.Component, _ itriager.PMState) (itriager.Result, error) {
				panic(errors.New("oops"))
			}
			_, _, err := h.ExecDeferred(ctx, state)
			So(err, ShouldErrLike, errCaughtPanic)
			So(state.PB, ShouldResembleProto, pb)
		})

		Convey("With Run Creation", func() {
			// Run creation requires ProjectStateOffload entity to exist.
			So(datastore.Put(ctx, &prjmanager.ProjectStateOffload{
				ConfigHash: prjcfgtest.MustExist(ctx, lProject).ConfigGroupIDs[0].Hash(),
				Project:    datastore.MakeKey(ctx, prjmanager.ProjectKind, lProject),
				Status:     prjpb.Status_STARTED,
			}), ShouldBeNil)

			// makeRunCreator saves a CL with the given ID in datastore and
			// returns a Creator of a dry Run on it. If fail is true, the CL is
			// saved with EVersion different from the one the Creator expects.
			makeRunCreator := func(clid int64, fail bool) *runcreator.Creator {
				cfgGroups, err := prjcfgtest.MustExist(ctx, lProject).GetConfigGroups(ctx)
				if err != nil {
					panic(err)
				}
				ci := gf.CI(int(clid), gf.PS(1), gf.CQ(+1, ct.Clock.Now(), gf.U("user-1")))
				cl := &changelist.CL{
					ID:       common.CLID(clid),
					EVersion: 1,
					Snapshot: &changelist.Snapshot{Kind: &changelist.Snapshot_Gerrit{Gerrit: &changelist.Gerrit{
						Host: "gerrit-review.example.com",
						Info: ci,
					}}},
				}
				if fail {
					// Simulate EVersion mismatch to fail run creation.
					cl.EVersion = 2
				}
				err = datastore.Put(ctx, cl)
				if err != nil {
					panic(err)
				}
				// Restore the in-memory value; only the saved entity carries
				// the mismatching EVersion.
				cl.EVersion = 1
				return &runcreator.Creator{
					LUCIProject:   lProject,
					ConfigGroupID: cfgGroups[0].ID,
					Mode:          run.DryRun,
					OperationID:   fmt.Sprintf("op-%d-%t", clid, fail),
					Owner:         identity.Identity("user:user-1@example.com"),
					Options:       &run.Options{},
					InputCLs: []runcreator.CL{{
						ID:               common.CLID(clid),
						ExpectedEVersion: 1,
						Snapshot:         cl.Snapshot,
						TriggerInfo:      trigger.Find(ci, cfgGroups[0].Content),
					}},
				}
			}

			// findRunOf returns the only Run of the given CL or nil if there
			// is none. It panics on a load error or if there are several Runs.
			findRunOf := func(clid int) *run.Run {
				switch runs, _, err := (run.CLQueryBuilder{CLID: common.CLID(clid)}).LoadRuns(ctx); {
				case err != nil:
					panic(err)
				case len(runs) == 0:
					return nil
				case len(runs) > 1:
					panic(fmt.Errorf("%d Runs for given CL", len(runs)))
				default:
					return runs[0]
				}
			}

			Convey("100% success", func() {
				markComponentsForTriage(1)
				h.ComponentTriage = func(_ context.Context, c *prjpb.Component, _ itriager.PMState) (itriager.Result, error) {
					rc := makeRunCreator(1, false /* succeed */)
					return itriager.Result{NewValue: markTriaged(c), RunsToCreate: []*runcreator.Creator{rc}}, nil
				}

				state2, sideEffect, err := h.ExecDeferred(ctx, state)
				So(err, ShouldBeNil)
				So(sideEffect, ShouldBeNil)
				pb.Components[1].TriageRequired = false // must be saved, since Run Creation succeeded.
				So(state2.PB, ShouldResembleProto, pb)
				So(findRunOf(1), ShouldNotBeNil)
			})

			Convey("100% failure", func() {
				markComponentsForTriage(1)
				h.ComponentTriage = func(_ context.Context, c *prjpb.Component, _ itriager.PMState) (itriager.Result, error) {
					rc := makeRunCreator(1, true /* fail */)
					return itriager.Result{NewValue: markTriaged(c), RunsToCreate: []*runcreator.Creator{rc}}, nil
				}

				_, sideEffect, err := h.ExecDeferred(ctx, state)
				So(err, ShouldErrLike, "failed to actOnComponents")
				So(sideEffect, ShouldBeNil)
				So(findRunOf(1), ShouldBeNil)
			})

			Convey("Partial failure", func() {
				markComponentsForTriage(1, 2, 3)
				h.ComponentTriage = func(_ context.Context, c *prjpb.Component, _ itriager.PMState) (itriager.Result, error) {
					clid := c.GetClids()[0]
					// Set up each component trying to create a Run,
					// and #2 and #3 additionally purging a CL,
					// but #2 failing to create a Run.
					failIf := clid == 2
					rc := makeRunCreator(clid, failIf)
					res := itriager.Result{NewValue: markTriaged(c), RunsToCreate: []*runcreator.Creator{rc}}
					if clid != 1 {
						// Contrived example, since in practice purging a CL concurrently
						// with Run creation in the same component ought to happen only iff
						// there are several CLs and presumably on different CLs.
						res.CLsToPurge = []*prjpb.PurgeCLTask{
							{
								PurgingCl: &prjpb.PurgingCL{Clid: clid},
								Reasons: []*changelist.CLError{
									{Kind: &changelist.CLError_OwnerLacksEmail{OwnerLacksEmail: true}},
								},
							},
						}
					}
					return res, nil
				}

				state2, sideEffect, err := h.ExecDeferred(ctx, state)
				So(err, ShouldBeNil)
				// Only #3 component purge must be a SideEffect.
				So(sideEffect, ShouldHaveSameTypeAs, &TriggerPurgeCLTasks{})
				ps := sideEffect.(*TriggerPurgeCLTasks).payloads
				So(ps, ShouldHaveLength, 1)
				So(ps[0].GetPurgingCl().GetClid(), ShouldEqual, 3)

				So(findRunOf(1), ShouldNotBeNil)
				pb.Components[1].TriageRequired = false
				// Component #2 must remain unchanged.
				So(findRunOf(3), ShouldNotBeNil)
				pb.Components[3].TriageRequired = false
				pb.PurgingCls = []*prjpb.PurgingCL{
					{
						Clid: 3, OperationId: "1580640000-3",
						Deadline: timestamppb.New(ct.Clock.Now().Add(maxPurgingCLDuration)),
					},
				}
				pb.NextEvalTime = timestamppb.New(ct.Clock.Now()) // re-triage ASAP.
				So(state2.PB, ShouldResembleProto, pb)
			})

			Convey("Catches panic", func() {
				markComponentsForTriage(1)
				h.ComponentTriage = func(_ context.Context, c *prjpb.Component, _ itriager.PMState) (itriager.Result, error) {
					rc := makeRunCreator(1, false)
					rc.LUCIProject = "" // causes panic because of incorrect usage.
					return itriager.Result{NewValue: markTriaged(c), RunsToCreate: []*runcreator.Creator{rc}}, nil
				}

				_, _, err := h.ExecDeferred(ctx, state)
				So(err, ShouldErrLike, errCaughtPanic)
				So(state.PB, ShouldResembleProto, pb)
			})
		})
	})
}