blob: a78f9cbf115c41978fcee8958c20c6ca18e168e7 [file] [log] [blame]
// Copyright 2021 The LUCI Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package handler
import (
"context"
"sort"
"sync"
"testing"
"time"
"google.golang.org/protobuf/types/known/timestamppb"
"go.chromium.org/luci/gae/service/datastore"
"go.chromium.org/luci/server/tq/tqtesting"
"go.chromium.org/luci/common/errors"
cfgpb "go.chromium.org/luci/cv/api/config/v2"
"go.chromium.org/luci/cv/internal/changelist"
"go.chromium.org/luci/cv/internal/common"
"go.chromium.org/luci/cv/internal/configs/prjcfg/prjcfgtest"
"go.chromium.org/luci/cv/internal/cvtesting"
gcancel "go.chromium.org/luci/cv/internal/gerrit/cancel"
gf "go.chromium.org/luci/cv/internal/gerrit/gerritfake"
"go.chromium.org/luci/cv/internal/gerrit/trigger"
"go.chromium.org/luci/cv/internal/prjmanager"
"go.chromium.org/luci/cv/internal/prjmanager/pmtest"
"go.chromium.org/luci/cv/internal/run"
"go.chromium.org/luci/cv/internal/run/bq"
"go.chromium.org/luci/cv/internal/run/eventpb"
"go.chromium.org/luci/cv/internal/run/impl/state"
"go.chromium.org/luci/cv/internal/run/pubsub"
. "github.com/smartystreets/goconvey/convey"
. "go.chromium.org/luci/common/testing/assertions"
)
func TestEndRun(t *testing.T) {
t.Parallel()
Convey("EndRun", t, func() {
ct := cvtesting.Test{}
ctx, cancel := ct.SetUp()
defer cancel()
const clid = 1
rid := common.MakeRunID("infra", ct.Clock.Now(), 1, []byte("deadbeef"))
rs := &state.RunState{
Run: run.Run{
ID: rid,
Status: run.Status_RUNNING,
CreateTime: ct.Clock.Now().Add(-2 * time.Minute),
StartTime: ct.Clock.Now().Add(-1 * time.Minute),
CLs: common.CLIDs{1},
OngoingLongOps: &run.OngoingLongOps{
Ops: map[string]*run.OngoingLongOps_Op{
"11-22": {
CancelRequested: false,
Work: &run.OngoingLongOps_Op_PostStartMessage{PostStartMessage: true},
},
},
},
},
}
anotherRID := common.MakeRunID("infra", ct.Clock.Now(), 1, []byte("cafecafe"))
cl := changelist.CL{
ID: clid,
IncompleteRuns: common.RunIDs{rid, anotherRID},
EVersion: 3,
UpdateTime: ct.Clock.Now().UTC(),
}
sort.Sort(cl.IncompleteRuns)
So(datastore.Put(ctx, &cl), ShouldBeNil)
impl, deps := makeImpl(&ct)
se := impl.endRun(ctx, rs, run.Status_FAILED)
So(rs.Status, ShouldEqual, run.Status_FAILED)
So(rs.EndTime, ShouldEqual, ct.Clock.Now())
So(rs.OngoingLongOps.GetOps()["11-22"].GetCancelRequested(), ShouldBeTrue)
So(datastore.RunInTransaction(ctx, se, nil), ShouldBeNil)
cl = changelist.CL{ID: clid}
So(datastore.Get(ctx, &cl), ShouldBeNil)
So(cl, ShouldResemble, changelist.CL{
ID: clid,
IncompleteRuns: common.RunIDs{anotherRID},
EVersion: 4,
UpdateTime: ct.Clock.Now().UTC(),
})
ct.TQ.Run(ctx, tqtesting.StopAfterTask(changelist.BatchOnCLUpdatedTaskClass))
pmtest.AssertReceivedRunFinished(ctx, rid)
pmtest.AssertReceivedCLsNotified(ctx, rid.LUCIProject(), []*changelist.CL{&cl})
So(deps.clUpdater.refreshedCLs, ShouldResemble, common.MakeCLIDs(clid))
Convey("Publish RunEnded event", func() {
var task *pubsub.PublishRunEndedTask
for _, t := range ct.TQ.Tasks() {
if p, ok := t.Payload.(*pubsub.PublishRunEndedTask); ok {
task = p
break
}
}
So(task, ShouldResembleProto, &pubsub.PublishRunEndedTask{
PublicId: rs.ID.PublicID(),
LuciProject: rs.ID.LUCIProject(),
Status: rs.Status,
Eversion: int64(rs.EVersion + 1),
})
})
})
}
func TestCancelTriggers(t *testing.T) {
t.Parallel()
Convey("CancelTriggers on a CL just deleted by its owner force-updates CL", t, func() {
ct := cvtesting.Test{}
ctx, cancel := ct.SetUp()
defer cancel()
const gHost = "x-review.example.com"
const gRepo = "luci-go"
const gChange = 123
const lProject = "infra"
const clid = 1
prjcfgtest.Create(ctx, lProject, &cfgpb.Config{
ConfigGroups: []*cfgpb.ConfigGroup{
{
Name: "main",
Gerrit: []*cfgpb.ConfigGroup_Gerrit{{
Url: "https://" + gHost,
Projects: []*cfgpb.ConfigGroup_Gerrit_Project{
{Name: gRepo, RefRegexp: []string{"refs/heads/.+"}},
},
}},
},
},
})
cgs, err := prjcfgtest.MustExist(ctx, lProject).GetConfigGroups(ctx)
So(err, ShouldBeNil)
cg := cgs[0]
rid := common.MakeRunID("infra", ct.Clock.Now(), 1, []byte("deadbeef"))
ci := gf.CI(gChange, gf.CQ(+2))
cl := changelist.CL{
ID: clid,
ExternalID: changelist.MustGobID(gHost, gChange),
IncompleteRuns: common.RunIDs{rid},
EVersion: 3,
UpdateTime: ct.Clock.Now().UTC(),
Snapshot: &changelist.Snapshot{
Kind: &changelist.Snapshot_Gerrit{Gerrit: &changelist.Gerrit{
Host: gHost,
Info: ci,
}},
LuciProject: lProject,
ExternalUpdateTime: timestamppb.New(ct.Clock.Now()),
},
}
rcl := run.RunCL{
ID: clid,
Run: datastore.MakeKey(ctx, run.RunKind, string(rid)),
ExternalID: cl.ExternalID,
Detail: cl.Snapshot,
Trigger: trigger.Find(ci, cg.Content),
}
So(datastore.Put(ctx, &cl, &rcl), ShouldBeNil)
// Simulate CL no longer existing in Gerrit (e.g. ct.GFake) 1 minute later,
// just as Run Manager decides to cancel the CL triggers.
ct.Clock.Add(time.Minute)
impl, deps := makeImpl(&ct)
meta := reviewInputMeta{
notify: gcancel.NONE,
message: "Dry Run OK",
attention: gcancel.NONE,
}
err = impl.cancelCLTriggers(ctx, rid, []*run.RunCL{&rcl}, []changelist.ExternalID{cl.ExternalID}, cg, meta)
// The cancelation errors out, but the CL refresh is scheduled.
// TODO(crbug/1227369): fail transiently or better yet schedule another
// retry in the future and fail with tq.Ignore.
So(gcancel.ErrPermanentTag.In(err), ShouldBeTrue)
So(deps.clUpdater.refreshedCLs, ShouldResemble, common.MakeCLIDs(clid))
})
}
type dependencies struct {
pm *prjmanager.Notifier
rm *run.Notifier
clUpdater *clUpdaterMock
}
type testHandler struct {
inner Handler
}
func validateStateMutation(passed, initialCopy, result *state.RunState) {
switch {
case cvtesting.SafeShouldResemble(result, initialCopy) == "":
// no state change, doesn't matter whether shallow copy is created or not.
return
case passed == result:
So(errors.New("handler mutated the input state but doesn't create a shallow copy before mutation"), ShouldBeNil)
case cvtesting.SafeShouldResemble(initialCopy, passed) != "":
So(errors.New("handler created a shallow copy but modifed addressable property in place; forgot to clone a proto?"), ShouldBeNil)
}
}
func (t *testHandler) Start(ctx context.Context, rs *state.RunState) (*Result, error) {
initialCopy := rs.DeepCopy()
res, err := t.inner.Start(ctx, rs)
if err != nil {
return nil, err
}
validateStateMutation(rs, initialCopy, res.State)
return res, err
}
func (t *testHandler) Cancel(ctx context.Context, rs *state.RunState, reasons []string) (*Result, error) {
initialCopy := rs.DeepCopy()
res, err := t.inner.Cancel(ctx, rs, reasons)
if err != nil {
return nil, err
}
validateStateMutation(rs, initialCopy, res.State)
return res, err
}
func (t *testHandler) OnCLsUpdated(ctx context.Context, rs *state.RunState, cls common.CLIDs) (*Result, error) {
initialCopy := rs.DeepCopy()
res, err := t.inner.OnCLsUpdated(ctx, rs, cls)
if err != nil {
return nil, err
}
validateStateMutation(rs, initialCopy, res.State)
return res, err
}
func (t *testHandler) UpdateConfig(ctx context.Context, rs *state.RunState, ver string) (*Result, error) {
initialCopy := rs.DeepCopy()
res, err := t.inner.UpdateConfig(ctx, rs, ver)
if err != nil {
return nil, err
}
validateStateMutation(rs, initialCopy, res.State)
return res, err
}
func (t *testHandler) OnCQDTryjobsUpdated(ctx context.Context, rs *state.RunState) (*Result, error) {
initialCopy := rs.DeepCopy()
res, err := t.inner.OnCQDTryjobsUpdated(ctx, rs)
if err != nil {
return nil, err
}
validateStateMutation(rs, initialCopy, res.State)
return res, err
}
func (t *testHandler) OnCQDVerificationCompleted(ctx context.Context, rs *state.RunState) (*Result, error) {
initialCopy := rs.DeepCopy()
res, err := t.inner.OnCQDVerificationCompleted(ctx, rs)
if err != nil {
return nil, err
}
validateStateMutation(rs, initialCopy, res.State)
return res, err
}
func (t *testHandler) OnReadyForSubmission(ctx context.Context, rs *state.RunState) (*Result, error) {
initialCopy := rs.DeepCopy()
res, err := t.inner.OnReadyForSubmission(ctx, rs)
if err != nil {
return nil, err
}
validateStateMutation(rs, initialCopy, res.State)
return res, err
}
func (t *testHandler) OnCLSubmitted(ctx context.Context, rs *state.RunState, cls common.CLIDs) (*Result, error) {
initialCopy := rs.DeepCopy()
res, err := t.inner.OnCLSubmitted(ctx, rs, cls)
if err != nil {
return nil, err
}
validateStateMutation(rs, initialCopy, res.State)
return res, err
}
func (t *testHandler) OnSubmissionCompleted(ctx context.Context, rs *state.RunState, sc *eventpb.SubmissionCompleted) (*Result, error) {
initialCopy := rs.DeepCopy()
res, err := t.inner.OnSubmissionCompleted(ctx, rs, sc)
if err != nil {
return nil, err
}
validateStateMutation(rs, initialCopy, res.State)
return res, err
}
func (t *testHandler) OnLongOpCompleted(ctx context.Context, rs *state.RunState, result *eventpb.LongOpCompleted) (*Result, error) {
initialCopy := rs.DeepCopy()
res, err := t.inner.OnLongOpCompleted(ctx, rs, result)
if err != nil {
return nil, err
}
validateStateMutation(rs, initialCopy, res.State)
return res, err
}
func (t *testHandler) TryResumeSubmission(ctx context.Context, rs *state.RunState) (*Result, error) {
initialCopy := rs.DeepCopy()
res, err := t.inner.TryResumeSubmission(ctx, rs)
if err != nil {
return nil, err
}
validateStateMutation(rs, initialCopy, res.State)
return res, err
}
func (t *testHandler) Poke(ctx context.Context, rs *state.RunState) (*Result, error) {
initialCopy := rs.DeepCopy()
res, err := t.inner.Poke(ctx, rs)
if err != nil {
return nil, err
}
validateStateMutation(rs, initialCopy, res.State)
return res, err
}
func makeTestHandler(ct *cvtesting.Test) (Handler, dependencies) {
handler, dependencies := makeImpl(ct)
return &testHandler{inner: handler}, dependencies
}
// makeImpl should only be used to test common functions. For testing handler,
// please use makeTestHandler instead.
func makeImpl(ct *cvtesting.Test) (*Impl, dependencies) {
deps := dependencies{
pm: prjmanager.NewNotifier(ct.TQDispatcher),
rm: run.NewNotifier(ct.TQDispatcher),
clUpdater: &clUpdaterMock{},
}
impl := &Impl{
PM: deps.pm,
RM: deps.rm,
CLMutator: changelist.NewMutator(ct.TQDispatcher, deps.pm, deps.rm),
CLUpdater: deps.clUpdater,
TreeClient: ct.TreeFake.Client(),
GFactory: ct.GFactory(),
BQExporter: bq.NewExporter(ct.TQDispatcher, ct.BQFake, ct.Env),
Publisher: pubsub.NewPublisher(ct.TQDispatcher, ct.Env),
}
return impl, deps
}
type clUpdaterMock struct {
m sync.Mutex
refreshedCLs common.CLIDs
}
func (c *clUpdaterMock) ScheduleBatch(ctx context.Context, luciProject string, cls []*changelist.CL) error {
c.m.Lock()
for _, cl := range cls {
c.refreshedCLs = append(c.refreshedCLs, cl.ID)
}
c.m.Unlock()
return nil
}