// blob: b7a802346e56ba009cdfb416ca653e92482c0edd (source-browser page header; not part of the original Go source)
// Copyright 2021 The LUCI Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package longops
import (
"fmt"
"testing"
"time"
"google.golang.org/protobuf/types/known/timestamppb"
"go.chromium.org/luci/common/clock"
gerritpb "go.chromium.org/luci/common/proto/gerrit"
"go.chromium.org/luci/gae/service/datastore"
cfgpb "go.chromium.org/luci/cv/api/config/v2"
"go.chromium.org/luci/cv/internal/changelist"
"go.chromium.org/luci/cv/internal/common"
"go.chromium.org/luci/cv/internal/common/lease"
"go.chromium.org/luci/cv/internal/configs/prjcfg/prjcfgtest"
"go.chromium.org/luci/cv/internal/cvtesting"
gf "go.chromium.org/luci/cv/internal/gerrit/gerritfake"
"go.chromium.org/luci/cv/internal/gerrit/trigger"
"go.chromium.org/luci/cv/internal/run"
"go.chromium.org/luci/cv/internal/run/eventpb"
. "github.com/smartystreets/goconvey/convey"
)
// TestCancelTriggers exercises CancelTriggersOp.Do against the Gerrit fake
// provided by cvtesting.Test.
//
// Every scenario stores a RUNNING dry-run Run in the "infra" LUCI project
// together with its CLs, executes the long operation, and then inspects the
// per-CL results it reports.
func TestCancelTriggers(t *testing.T) {
	t.Parallel()

	Convey("CancelTriggers works", t, func() {
		ct := cvtesting.Test{}
		ctx, cancel := ct.SetUp()
		defer cancel()

		mutator := changelist.NewMutator(ct.TQDispatcher, nil, nil, nil)

		const (
			lProject = "infra"
			gHost    = "g-review.example.com"
		)
		runCreateTime := clock.Now(ctx)
		runID := common.MakeRunID(lProject, runCreateTime, 1, []byte("deadbeef"))
		cfg := cfgpb.Config{
			ConfigGroups: []*cfgpb.ConfigGroup{
				{Name: "test"},
			},
		}
		prjcfgtest.Create(ctx, lProject, &cfg)

		// makeCI builds the change used by all scenarios: change `number`
		// with gf.CQ(+1) applied and an update time one minute before the
		// Run creation time.
		makeCI := func(number int) *gerritpb.ChangeInfo {
			return gf.CI(number, gf.CQ(+1), gf.Updated(runCreateTime.Add(-1*time.Minute)))
		}

		// initRunAndCLs registers every given change in the Gerrit fake
		// (restricted to lProject), creates the matching CL and RunCL
		// entities, and stores them together with a RUNNING dry-run Run
		// that references all of the CLs.
		//
		// It returns the Run and the CL IDs in the same order as `cis`.
		initRunAndCLs := func(cis []*gerritpb.ChangeInfo) (*run.Run, common.CLIDs) {
			clids := make(common.CLIDs, len(cis))
			cls := make([]*changelist.CL, len(cis))
			runCLs := make([]*run.RunCL, len(cis))
			for i, ci := range cis {
				So(ci.GetNumber(), ShouldBeGreaterThan, 0)
				So(ci.GetNumber(), ShouldBeLessThan, 1000)
				// The change must carry a CQ vote trigger, otherwise there
				// would be nothing to cancel.
				triggers := trigger.Find(&trigger.FindInput{ChangeInfo: ci, ConfigGroup: cfg.GetConfigGroups()[0]})
				So(triggers.GetCqVoteTrigger(), ShouldNotBeNil)
				// Each change may be added to the fake only once.
				So(ct.GFake.Has(gHost, int(ci.GetNumber())), ShouldBeFalse)
				ct.GFake.AddFrom(gf.WithCIs(gHost, gf.ACLRestricted(lProject), ci))
				cl := changelist.MustGobID(gHost, ci.GetNumber()).MustCreateIfNotExists(ctx)
				cl.Snapshot = &changelist.Snapshot{
					Kind: &changelist.Snapshot_Gerrit{Gerrit: &changelist.Gerrit{
						Host: gHost,
						Info: ci,
					}},
					LuciProject:        lProject,
					ExternalUpdateTime: timestamppb.New(runCreateTime),
				}
				cl.EVersion++
				clids[i] = cl.ID
				runCLs[i] = &run.RunCL{
					ID:         cl.ID,
					ExternalID: cl.ExternalID,
					IndexedID:  cl.ID,
					Trigger:    triggers.GetCqVoteTrigger(),
					Run:        datastore.MakeKey(ctx, common.RunKind, string(runID)),
					Detail:     cl.Snapshot,
				}
				cls[i] = cl
			}
			r := &run.Run{
				ID:            runID,
				Status:        run.Status_RUNNING,
				CLs:           clids,
				Mode:          run.DryRun,
				ConfigGroupID: prjcfgtest.MustExist(ctx, lProject).ConfigGroupIDs[0],
			}
			So(datastore.Put(ctx, r, cls, runCLs), ShouldBeNil)
			return r, clids
		}

		// makeOp builds a CancelTriggersOp holding one cancellation request
		// per CL of the given Run. The op is not cancelled and its deadline
		// is far enough in the future to never be reached by a test.
		makeOp := func(r *run.Run) *CancelTriggersOp {
			reqs := make([]*run.OngoingLongOps_Op_TriggersCancellation_Request, len(r.CLs))
			for i, clid := range r.CLs {
				reqs[i] = &run.OngoingLongOps_Op_TriggersCancellation_Request{
					Clid:    int64(clid),
					Message: fmt.Sprintf("cancel message for CL %d", clid),
					Notify: []run.OngoingLongOps_Op_TriggersCancellation_Whom{
						run.OngoingLongOps_Op_TriggersCancellation_OWNER,
						run.OngoingLongOps_Op_TriggersCancellation_REVIEWERS,
					},
					AddToAttention: []run.OngoingLongOps_Op_TriggersCancellation_Whom{
						run.OngoingLongOps_Op_TriggersCancellation_OWNER,
						run.OngoingLongOps_Op_TriggersCancellation_CQ_VOTERS,
					},
					AddToAttentionReason: fmt.Sprintf("attention reason for CL %d", clid),
				}
			}
			return &CancelTriggersOp{
				Base: &Base{
					Op: &run.OngoingLongOps_Op{
						Deadline:        timestamppb.New(clock.Now(ctx).Add(10000 * time.Hour)), // infinite
						CancelRequested: false,
						Work: &run.OngoingLongOps_Op_CancelTriggers{
							CancelTriggers: &run.OngoingLongOps_Op_TriggersCancellation{
								Requests: reqs,
							},
						},
					},
					IsCancelRequested: func() bool { return false },
					Run:               r,
				},
				GFactory:  ct.GFactory(),
				CLMutator: mutator,
			}
		}

		// assertTriggerRemoved checks that the change identified by `eid`
		// lives on gHost and that the Gerrit fake no longer reports any
		// trigger on it.
		assertTriggerRemoved := func(eid changelist.ExternalID) {
			host, changeID, err := eid.ParseGobID()
			So(err, ShouldBeNil)
			So(host, ShouldEqual, gHost)
			changeInfo := ct.GFake.GetChange(gHost, int(changeID)).Info
			So(trigger.Find(&trigger.FindInput{ChangeInfo: changeInfo, ConfigGroup: cfg.GetConfigGroups()[0]}), ShouldBeNil)
		}

		// testHappyPath registers a scenario that cancels the triggers of
		// `clCount` CLs using the given CancelConcurrency and expects every
		// CL to be processed successfully exactly once.
		testHappyPath := func(prefix string, clCount, concurrency int) {
			Convey(fmt.Sprintf("%s [%d CLs with concurrency %d]", prefix, clCount, concurrency), func() {
				cis := make([]*gerritpb.ChangeInfo, clCount)
				for i := range cis {
					cis[i] = makeCI(i + 1)
				}
				r, _ := initRunAndCLs(cis)
				startTime := clock.Now(ctx)
				op := makeOp(r)
				op.CancelConcurrency = concurrency
				res, err := op.Do(ctx)
				So(err, ShouldBeNil)
				So(res.GetStatus(), ShouldEqual, eventpb.LongOpCompleted_SUCCEEDED)
				results := res.GetCancelTriggers().GetResults()
				So(results, ShouldHaveLength, clCount)
				processedCLIDs := make(common.CLIDsSet, clCount)
				for _, result := range results {
					So(processedCLIDs.HasI64(result.Id), ShouldBeFalse) // duplicate processing
					processedCLIDs.AddI64(result.Id)
					assertTriggerRemoved(changelist.ExternalID(result.ExternalId))
					So(result.GetSuccessInfo().GetCancelledAt().AsTime(), ShouldHappenOnOrAfter, startTime)
				}
			})
		}

		testHappyPath("single", 1, 1)
		testHappyPath("serial", 4, 1)
		testHappyPath("concurrent", 80, 8)

		// TODO(crbug/1297723): re-enable this test after fixing the flake.
		SkipConvey("Retry on alreadyInLease failure", func() {
			// Create changes numbered 1 to `clCount` and lease each CL for
			// (change number * time.Minute).
			clCount := 6
			cis := make([]*gerritpb.ChangeInfo, clCount)
			for i := 1; i <= clCount; i++ {
				cis[i-1] = makeCI(i)
			}
			r, clids := initRunAndCLs(cis)
			for i, clid := range clids {
				_, _, err := lease.ApplyOnCL(ctx, clid, time.Duration(cis[i].GetNumber())*time.Minute, "FooBar")
				So(err, ShouldBeNil)
			}
			startTime := clock.Now(ctx)
			op := makeOp(r)
			op.CancelConcurrency = clCount
			op.testAfterTryCancelFn = func() {
				// Advance the clock by 1 minute + 1 second so that the lease
				// will be guaranteed to expire in the next attempt.
				ct.Clock.Add(1*time.Minute + 1*time.Second)
			}
			res, err := op.Do(ctx)
			So(err, ShouldBeNil)
			So(res.GetStatus(), ShouldEqual, eventpb.LongOpCompleted_SUCCEEDED)
			results := res.GetCancelTriggers().GetResults()
			So(results, ShouldHaveLength, len(cis))
			for i, result := range results {
				So(result.Id, ShouldEqual, clids[i])
				// A CL can only be cancelled once its lease has expired.
				So(result.GetSuccessInfo().GetCancelledAt().AsTime(), ShouldHappenAfter, startTime.Add(time.Duration(cis[i].GetNumber())*time.Minute))
				assertTriggerRemoved(changelist.ExternalID(result.ExternalId))
			}
		})

		// TODO(crbug/1199880): test can retry transient failure once Gerrit fake
		// gain the flakiness mode.

		Convey("Failed permanently for non-transient error", func() {
			cis := []*gerritpb.ChangeInfo{
				makeCI(1),
				makeCI(2),
			}
			r, clids := initRunAndCLs(cis)
			ct.GFake.MutateChange(gHost, 2, func(c *gf.Change) {
				c.ACLs = gf.ACLReadOnly(lProject) // can't mutate
			})
			op := makeOp(r)
			startTime := clock.Now(ctx)
			res, err := op.Do(ctx)
			So(err, ShouldNotBeNil)
			So(res.GetStatus(), ShouldEqual, eventpb.LongOpCompleted_FAILED)
			results := res.GetCancelTriggers().GetResults()
			So(results, ShouldHaveLength, len(cis))
			for _, result := range results {
				switch common.CLID(result.Id) {
				case clids[0]: // Change 1
					So(result.GetSuccessInfo().GetCancelledAt().AsTime(), ShouldHappenAfter, startTime)
				case clids[1]: // Change 2
					So(result.GetFailureInfo().GetFailureMessage(), ShouldNotBeEmpty)
				default:
					// A result for a CL outside of this Run must not pass
					// silently; this assertion always fails when reached.
					So(common.CLID(result.Id), ShouldBeIn, clids)
				}
				So(result.ExternalId, ShouldNotBeEmpty)
			}
		})

		Convey("Doesn't obey long op cancellation", func() {
			ci := makeCI(1)
			cis := []*gerritpb.ChangeInfo{ci}
			r, clids := initRunAndCLs(cis)
			op := makeOp(r)
			// Even with cancellation requested, the op is expected to finish
			// its work and report success.
			op.IsCancelRequested = func() bool { return true }
			res, err := op.Do(ctx)
			So(err, ShouldBeNil)
			So(res.GetStatus(), ShouldEqual, eventpb.LongOpCompleted_SUCCEEDED)
			results := res.GetCancelTriggers().GetResults()
			So(results, ShouldHaveLength, len(cis))
			for i, result := range results {
				So(result.Id, ShouldEqual, clids[i])
				assertTriggerRemoved(changelist.ExternalID(result.ExternalId))
				So(result.GetSuccessInfo().GetCancelledAt(), ShouldNotBeNil)
			}
		})
	})
}
// TestGerritWhoms runs convertToGerritWhoms on every value declared in the
// TriggersCancellation Whom enum, one value at a time: converting NONE must
// panic, while every other value must yield exactly one non-zero element.
func TestGerritWhoms(t *testing.T) {
	Convey("Test gerrit whoms conversion", t, func() {
		for enumName, enumNumber := range run.OngoingLongOps_Op_TriggersCancellation_Whom_value {
			// Single-element input holding the enum value under test.
			input := []run.OngoingLongOps_Op_TriggersCancellation_Whom{
				run.OngoingLongOps_Op_TriggersCancellation_Whom(enumNumber),
			}

			if enumName == "NONE" {
				convert := func() {
					_ = convertToGerritWhoms(input)
				}
				So(convert, ShouldPanic)
				continue
			}

			converted := convertToGerritWhoms(input)
			So(converted, ShouldHaveLength, 1)
			So(converted[0], ShouldNotEqual, 0)
		}
	})
}