blob: c0f82283e4653d75efc7943c26322bd23caad8e7 [file] [log] [blame]
// Copyright 2021 The LUCI Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package handler
import (
"context"
"fmt"
"sort"
"strconv"
"testing"
"time"
"go.chromium.org/luci/common/clock"
gerritpb "go.chromium.org/luci/common/proto/gerrit"
"go.chromium.org/luci/gae/service/datastore"
"google.golang.org/protobuf/proto"
"google.golang.org/protobuf/types/known/timestamppb"
cfgpb "go.chromium.org/luci/cv/api/config/v2"
"go.chromium.org/luci/cv/internal/changelist"
"go.chromium.org/luci/cv/internal/common"
"go.chromium.org/luci/cv/internal/common/tree"
"go.chromium.org/luci/cv/internal/configs/prjcfg"
"go.chromium.org/luci/cv/internal/configs/prjcfg/prjcfgtest"
"go.chromium.org/luci/cv/internal/cvtesting"
gf "go.chromium.org/luci/cv/internal/gerrit/gerritfake"
"go.chromium.org/luci/cv/internal/gerrit/trigger"
"go.chromium.org/luci/cv/internal/run"
"go.chromium.org/luci/cv/internal/run/eventpb"
"go.chromium.org/luci/cv/internal/run/impl/state"
"go.chromium.org/luci/cv/internal/run/impl/submit"
"go.chromium.org/luci/cv/internal/run/runtest"
. "github.com/smartystreets/goconvey/convey"
. "go.chromium.org/luci/common/testing/assertions"
)
func TestOnReadyForSubmission(t *testing.T) {
t.Parallel()
Convey("OnReadyForSubmission", t, func() {
ct := cvtesting.Test{}
ctx, cancel := ct.SetUp()
defer cancel()
const lProject = "l_project"
const gHost = "x-review.example.com"
rid := common.MakeRunID(lProject, ct.Clock.Now().Add(-2*time.Minute), 1, []byte("deadbeef"))
runCLs := common.CLIDs{1, 2}
r := run.Run{
ID: rid,
Status: run.Status_RUNNING,
CreateTime: ct.Clock.Now().UTC().Add(-2 * time.Minute),
StartTime: ct.Clock.Now().UTC().Add(-1 * time.Minute),
CLs: runCLs,
}
cg := &cfgpb.Config{
ConfigGroups: []*cfgpb.ConfigGroup{
{
Name: "main",
Verifiers: &cfgpb.Verifiers{
TreeStatus: &cfgpb.Verifiers_TreeStatus{
Url: "tree.example.com",
},
},
},
},
}
prjcfgtest.Create(ctx, rid.LUCIProject(), cg)
meta, err := prjcfg.GetLatestMeta(ctx, rid.LUCIProject())
So(err, ShouldBeNil)
So(meta.ConfigGroupIDs, ShouldHaveLength, 1)
r.ConfigGroupID = meta.ConfigGroupIDs[0]
// 1 depends on 2
ci1 := gf.CI(
1111, gf.PS(2),
gf.CQ(2, ct.Clock.Now().Add(-2*time.Minute), gf.U("user-100")),
gf.Updated(clock.Now(ctx).Add(-1*time.Minute)))
ci2 := gf.CI(
2222, gf.PS(3),
gf.CQ(2, ct.Clock.Now().Add(-2*time.Minute), gf.U("user-100")),
gf.Updated(clock.Now(ctx).Add(-1*time.Minute)))
So(datastore.Put(ctx,
&run.RunCL{
ID: 1,
Run: datastore.MakeKey(ctx, common.RunKind, string(rid)),
ExternalID: changelist.MustGobID(gHost, ci1.GetNumber()),
Detail: &changelist.Snapshot{
Kind: &changelist.Snapshot_Gerrit{
Gerrit: &changelist.Gerrit{
Host: gHost,
Info: proto.Clone(ci1).(*gerritpb.ChangeInfo),
},
},
Deps: []*changelist.Dep{
{Clid: 2, Kind: changelist.DepKind_HARD},
},
},
},
&run.RunCL{
ID: 2,
Run: datastore.MakeKey(ctx, common.RunKind, string(rid)),
ExternalID: changelist.MustGobID(gHost, ci2.GetNumber()),
Detail: &changelist.Snapshot{
Kind: &changelist.Snapshot_Gerrit{
Gerrit: &changelist.Gerrit{
Host: gHost,
Info: proto.Clone(ci2).(*gerritpb.ChangeInfo),
},
},
},
},
), ShouldBeNil)
rs := &state.RunState{Run: r}
h, deps := makeTestHandler(&ct)
statuses := []run.Status{
run.Status_SUCCEEDED,
run.Status_FAILED,
run.Status_CANCELLED,
}
for _, status := range statuses {
Convey(fmt.Sprintf("Release submit queue when Run is %s", status), func() {
So(datastore.RunInTransaction(ctx, func(ctx context.Context) error {
waitlisted, err := submit.TryAcquire(ctx, deps.rm.NotifyReadyForSubmission, rs.ID, nil)
So(waitlisted, ShouldBeFalse)
return err
}, nil), ShouldBeNil)
rs.Status = status
res, err := h.OnReadyForSubmission(ctx, rs)
So(err, ShouldBeNil)
expectedState := &state.RunState{
Run: rs.Run,
LogEntries: []*run.LogEntry{
{
Time: timestamppb.New(clock.Now(ctx)),
Kind: &run.LogEntry_ReleasedSubmitQueue_{
ReleasedSubmitQueue: &run.LogEntry_ReleasedSubmitQueue{},
},
},
},
}
So(res.State, cvtesting.SafeShouldResemble, expectedState)
So(res.SideEffectFn, ShouldBeNil)
So(res.PreserveEvents, ShouldBeFalse)
So(res.PostProcessFn, ShouldBeNil)
current, waitlist, err := submit.LoadCurrentAndWaitlist(ctx, rs.ID)
So(err, ShouldBeNil)
So(current, ShouldBeEmpty)
So(waitlist, ShouldBeEmpty)
})
}
Convey("No-Op when status is SUBMITTING", func() {
rs.Status = run.Status_SUBMITTING
res, err := h.OnReadyForSubmission(ctx, rs)
So(err, ShouldBeNil)
So(res.State, ShouldEqual, rs)
So(res.SideEffectFn, ShouldBeNil)
So(res.PreserveEvents, ShouldBeFalse)
So(res.PostProcessFn, ShouldBeNil)
})
for _, status := range []run.Status{run.Status_RUNNING, run.Status_WAITING_FOR_SUBMISSION} {
now := ct.Clock.Now().UTC()
ctx = context.WithValue(ctx, &fakeTaskIDKey, "task-foo")
Convey(fmt.Sprintf("When status is %s", status), func() {
rs.Status = status
Convey("Mark submitting if Submit Queue is acquired and tree is open", func() {
res, err := h.OnReadyForSubmission(ctx, rs)
So(err, ShouldBeNil)
So(res.State.Status, ShouldEqual, run.Status_SUBMITTING)
So(res.State.Submission, ShouldResembleProto, &run.Submission{
Deadline: timestamppb.New(now.Add(submissionDuration)),
Cls: []int64{2, 1}, // in submission order
TaskId: "task-foo",
TreeOpen: true,
LastTreeCheckTime: timestamppb.New(now),
})
So(res.State.SubmissionScheduled, ShouldBeTrue)
So(res.SideEffectFn, ShouldBeNil)
So(res.PreserveEvents, ShouldBeFalse)
So(res.PostProcessFn, ShouldNotBeNil)
So(submit.MustCurrentRun(ctx, lProject), ShouldEqual, rid)
runtest.AssertReceivedReadyForSubmission(ctx, rid, now.Add(10*time.Second))
So(res.State.LogEntries, ShouldHaveLength, 2)
So(res.State.LogEntries[0].Kind, ShouldHaveSameTypeAs, &run.LogEntry_AcquiredSubmitQueue_{})
So(res.State.LogEntries[1].Kind.(*run.LogEntry_TreeChecked_).TreeChecked.Open, ShouldBeTrue)
// SubmitQueue not yet released.
})
Convey("Add Run to waitlist when Submit Queue is occupied", func() {
// another run has taken the current slot
anotherRunID := common.MakeRunID(lProject, now, 1, []byte("cafecafe"))
So(datastore.RunInTransaction(ctx, func(ctx context.Context) error {
_, err := submit.TryAcquire(ctx, deps.rm.NotifyReadyForSubmission, anotherRunID, nil)
So(err, ShouldBeNil)
return nil
}, nil), ShouldBeNil)
So(submit.MustCurrentRun(ctx, lProject), ShouldEqual, anotherRunID)
res, err := h.OnReadyForSubmission(ctx, rs)
So(err, ShouldBeNil)
So(res.State.Status, ShouldEqual, run.Status_WAITING_FOR_SUBMISSION)
So(res.SideEffectFn, ShouldBeNil)
So(res.PreserveEvents, ShouldBeFalse)
So(res.PostProcessFn, ShouldBeNil)
_, waitlist, err := submit.LoadCurrentAndWaitlist(ctx, rid)
So(err, ShouldBeNil)
So(waitlist.Index(rid), ShouldEqual, 0)
So(res.State.LogEntries, ShouldHaveLength, 1)
So(res.State.LogEntries[0].Kind, ShouldHaveSameTypeAs, &run.LogEntry_Waitlisted_{})
})
Convey("Revisit after 1 mintues if tree is closed", func() {
ct.TreeFake.ModifyState(ctx, tree.Closed)
res, err := h.OnReadyForSubmission(ctx, rs)
So(err, ShouldBeNil)
So(res.State.Status, ShouldEqual, run.Status_WAITING_FOR_SUBMISSION)
So(res.State.Submission, ShouldResembleProto, &run.Submission{
TreeOpen: false,
LastTreeCheckTime: timestamppb.New(now),
})
So(res.SideEffectFn, ShouldBeNil)
So(res.PreserveEvents, ShouldBeFalse)
So(res.PostProcessFn, ShouldBeNil)
runtest.AssertReceivedPoke(ctx, rid, now.Add(1*time.Minute))
// The Run must not occupy the Submit Queue
So(submit.MustCurrentRun(ctx, lProject), ShouldNotEqual, rid)
So(res.State.LogEntries, ShouldHaveLength, 3)
So(res.State.LogEntries[0].Kind, ShouldHaveSameTypeAs, &run.LogEntry_AcquiredSubmitQueue_{})
So(res.State.LogEntries[1].Kind, ShouldHaveSameTypeAs, &run.LogEntry_TreeChecked_{})
So(res.State.LogEntries[2].Kind, ShouldHaveSameTypeAs, &run.LogEntry_ReleasedSubmitQueue_{})
So(res.State.LogEntries[1].Kind.(*run.LogEntry_TreeChecked_).TreeChecked.Open, ShouldBeFalse)
})
Convey("Set TreeErrorSince on first failure", func() {
ct.TreeFake.ModifyState(ctx, tree.StateUnknown)
ct.TreeFake.InjectErr(fmt.Errorf("error while fetching tree status"))
res, err := h.OnReadyForSubmission(ctx, rs)
So(err, ShouldBeNil)
So(res.State.Status, ShouldEqual, run.Status_WAITING_FOR_SUBMISSION)
So(res.State.Submission, ShouldResembleProto, &run.Submission{
TreeOpen: false,
LastTreeCheckTime: timestamppb.New(now),
TreeErrorSince: timestamppb.New(now),
})
So(res.SideEffectFn, ShouldBeNil)
So(res.PreserveEvents, ShouldBeFalse)
So(res.PostProcessFn, ShouldBeNil)
runtest.AssertReceivedPoke(ctx, rid, now.Add(1*time.Minute))
// The Run must not occupy the Submit Queue
So(submit.MustCurrentRun(ctx, lProject), ShouldNotEqual, rid)
So(res.State.LogEntries, ShouldHaveLength, 2)
So(res.State.LogEntries[0].Kind, ShouldHaveSameTypeAs, &run.LogEntry_AcquiredSubmitQueue_{})
So(res.State.LogEntries[1].Kind, ShouldHaveSameTypeAs, &run.LogEntry_ReleasedSubmitQueue_{})
})
})
}
})
}
func TestOnSubmissionCompleted(t *testing.T) {
t.Parallel()
Convey("OnSubmissionCompleted", t, func() {
ct := cvtesting.Test{}
ctx, cancel := ct.SetUp()
defer cancel()
const lProject = "infra"
const gHost = "x-review.example.com"
rid := common.MakeRunID(lProject, ct.Clock.Now().Add(-2*time.Minute), 1, []byte("deadbeef"))
runCLs := common.CLIDs{1, 2}
r := run.Run{
ID: rid,
Mode: run.FullRun,
Status: run.Status_SUBMITTING,
CreateTime: ct.Clock.Now().UTC().Add(-2 * time.Minute),
StartTime: ct.Clock.Now().UTC().Add(-1 * time.Minute),
CLs: runCLs,
}
So(datastore.Put(ctx, &r), ShouldBeNil)
cg := &cfgpb.Config{
ConfigGroups: []*cfgpb.ConfigGroup{
{Name: "main"},
},
}
prjcfgtest.Create(ctx, rid.LUCIProject(), cg)
meta, err := prjcfg.GetLatestMeta(ctx, rid.LUCIProject())
So(err, ShouldBeNil)
So(meta.ConfigGroupIDs, ShouldHaveLength, 1)
r.ConfigGroupID = meta.ConfigGroupIDs[0]
genCL := func(clid common.CLID, change int, deps ...common.CLID) (*gerritpb.ChangeInfo, *changelist.CL, *run.RunCL) {
ci := gf.CI(
change, gf.PS(2),
gf.Owner("user-99"),
gf.CQ(1, ct.Clock.Now().Add(-5*time.Minute), gf.U("user-101")),
gf.CQ(2, ct.Clock.Now().Add(-2*time.Minute), gf.U("user-100")),
gf.Updated(clock.Now(ctx).Add(-1*time.Minute)))
triggers := trigger.Find(&trigger.FindInput{ChangeInfo: ci, ConfigGroup: cg.ConfigGroups[0]})
So(triggers.GetCqVoteTrigger(), ShouldResembleProto, &run.Trigger{
Time: timestamppb.New(ct.Clock.Now().Add(-2 * time.Minute)),
Mode: string(run.FullRun),
Email: "user-100@example.com",
GerritAccountId: 100,
})
cl := &changelist.CL{
ID: clid,
ExternalID: changelist.MustGobID(gHost, ci.GetNumber()),
EVersion: 10,
Snapshot: &changelist.Snapshot{
ExternalUpdateTime: timestamppb.New(clock.Now(ctx).Add(-1 * time.Minute)),
LuciProject: lProject,
Patchset: 2,
MinEquivalentPatchset: 1,
Kind: &changelist.Snapshot_Gerrit{
Gerrit: &changelist.Gerrit{
Host: gHost,
Info: proto.Clone(ci).(*gerritpb.ChangeInfo),
},
},
},
}
runCL := &run.RunCL{
ID: clid,
Run: datastore.MakeKey(ctx, common.RunKind, string(rid)),
ExternalID: changelist.MustGobID(gHost, ci.GetNumber()),
Detail: &changelist.Snapshot{
Kind: &changelist.Snapshot_Gerrit{
Gerrit: &changelist.Gerrit{
Host: gHost,
Info: proto.Clone(ci).(*gerritpb.ChangeInfo),
},
},
},
Trigger: triggers.GetCqVoteTrigger(),
}
if len(deps) > 0 {
cl.Snapshot.Deps = make([]*changelist.Dep, len(deps))
runCL.Detail.Deps = make([]*changelist.Dep, len(deps))
for i, dep := range deps {
cl.Snapshot.Deps[i] = &changelist.Dep{
Clid: int64(dep),
Kind: changelist.DepKind_HARD,
}
runCL.Detail.Deps[i] = &changelist.Dep{
Clid: int64(dep),
Kind: changelist.DepKind_HARD,
}
}
}
return ci, cl, runCL
}
ci1, cl1, runCL1 := genCL(1, 1111, 2)
ci2, cl2, runCL2 := genCL(2, 2222)
So(datastore.Put(ctx, cl1, cl2, runCL1, runCL2), ShouldBeNil)
ct.GFake.CreateChange(&gf.Change{
Host: gHost,
Info: proto.Clone(ci1).(*gerritpb.ChangeInfo),
ACLs: gf.ACLRestricted(lProject),
})
ct.GFake.CreateChange(&gf.Change{
Host: gHost,
Info: proto.Clone(ci2).(*gerritpb.ChangeInfo),
ACLs: gf.ACLRestricted(lProject),
})
ct.GFake.SetDependsOn(gHost, ci1, ci2)
rs := &state.RunState{Run: r}
h, deps := makeTestHandler(&ct)
statuses := []run.Status{
run.Status_SUCCEEDED,
run.Status_FAILED,
run.Status_CANCELLED,
}
for _, status := range statuses {
Convey(fmt.Sprintf("Release submit queue when Run is %s", status), func() {
So(datastore.RunInTransaction(ctx, func(ctx context.Context) error {
waitlisted, err := submit.TryAcquire(ctx, deps.rm.NotifyReadyForSubmission, rs.ID, nil)
So(waitlisted, ShouldBeFalse)
return err
}, nil), ShouldBeNil)
rs.Status = status
res, err := h.OnSubmissionCompleted(ctx, rs, nil)
So(err, ShouldBeNil)
expectedState := &state.RunState{
Run: rs.Run,
LogEntries: []*run.LogEntry{
{
Time: timestamppb.New(clock.Now(ctx)),
Kind: &run.LogEntry_ReleasedSubmitQueue_{
ReleasedSubmitQueue: &run.LogEntry_ReleasedSubmitQueue{},
},
},
},
}
So(res.State, cvtesting.SafeShouldResemble, expectedState)
So(res.SideEffectFn, ShouldBeNil)
So(res.PreserveEvents, ShouldBeFalse)
So(res.PostProcessFn, ShouldBeNil)
current, waitlist, err := submit.LoadCurrentAndWaitlist(ctx, rs.ID)
So(err, ShouldBeNil)
So(current, ShouldBeEmpty)
So(waitlist, ShouldBeEmpty)
})
}
ctx = context.WithValue(ctx, &fakeTaskIDKey, "task-foo")
Convey("Succeeded", func() {
sc := &eventpb.SubmissionCompleted{
Result: eventpb.SubmissionResult_SUCCEEDED,
}
res, err := h.OnSubmissionCompleted(ctx, rs, sc)
So(err, ShouldBeNil)
So(res.State.Status, ShouldEqual, run.Status_SUCCEEDED)
So(res.State.EndTime, ShouldEqual, ct.Clock.Now().UTC())
So(res.SideEffectFn, ShouldNotBeNil)
So(res.PreserveEvents, ShouldBeFalse)
So(res.PostProcessFn, ShouldBeNil)
})
selfSetReviewRequests := func() (ret []*gerritpb.SetReviewRequest) {
for _, req := range ct.GFake.Requests() {
switch r, ok := req.(*gerritpb.SetReviewRequest); {
case !ok:
case r.GetOnBehalfOf() != 0:
default:
ret = append(ret, r)
}
}
sort.SliceStable(ret, func(i, j int) bool {
return ret[i].Number < ret[j].Number
})
return
}
assertNotify := func(req *gerritpb.SetReviewRequest, accts ...int64) {
So(req, ShouldNotBeNil)
So(req.GetNotify(), ShouldEqual, gerritpb.Notify_NOTIFY_NONE)
So(req.GetNotifyDetails(), ShouldResembleProto, &gerritpb.NotifyDetails{
Recipients: []*gerritpb.NotifyDetails_Recipient{
{
RecipientType: gerritpb.NotifyDetails_RECIPIENT_TYPE_TO,
Info: &gerritpb.NotifyDetails_Info{
Accounts: accts,
},
},
},
})
}
assertAttentionSet := func(req *gerritpb.SetReviewRequest, reason string, accs ...int64) {
So(req, ShouldNotBeNil)
expected := []*gerritpb.AttentionSetInput{}
for _, a := range accs {
expected = append(
expected,
&gerritpb.AttentionSetInput{
User: strconv.FormatInt(a, 10),
Reason: "ps#2: " + reason,
},
)
}
actual := req.GetAddToAttentionSet()
sort.SliceStable(actual, func(i, j int) bool {
lhs, _ := strconv.Atoi(actual[i].User)
rhs, _ := strconv.Atoi(actual[j].User)
return lhs < rhs
})
So(actual, ShouldResembleProto, expected)
}
Convey("Transient failure", func() {
sc := &eventpb.SubmissionCompleted{
Result: eventpb.SubmissionResult_FAILED_TRANSIENT,
}
Convey("When deadline is not exceeded", func() {
rs.Submission = &run.Submission{
Deadline: timestamppb.New(ct.Clock.Now().UTC().Add(10 * time.Minute)),
}
Convey("Resume submission if TaskID matches", func() {
rs.Submission.TaskId = "task-foo" // same task ID as the current task
res, err := h.OnSubmissionCompleted(ctx, rs, sc)
So(err, ShouldBeNil)
So(res.State.Status, ShouldEqual, run.Status_SUBMITTING)
So(res.State.Submission, ShouldResembleProto, &run.Submission{
Deadline: timestamppb.New(ct.Clock.Now().UTC().Add(10 * time.Minute)),
TaskId: "task-foo",
}) // unchanged
So(res.State.SubmissionScheduled, ShouldBeTrue)
So(res.SideEffectFn, ShouldBeNil)
So(res.PreserveEvents, ShouldBeFalse)
So(res.PostProcessFn, ShouldNotBeNil)
})
Convey("Invoke RM at deadline if TaskID doesn't match", func() {
ctx, rmDispatcher := runtest.MockDispatch(ctx)
rs.Submission.TaskId = "another-task"
res, err := h.OnSubmissionCompleted(ctx, rs, sc)
So(err, ShouldBeNil)
expectedState := &state.RunState{
Run: rs.Run,
LogEntries: []*run.LogEntry{
{
Time: timestamppb.New(clock.Now(ctx)),
Kind: &run.LogEntry_SubmissionFailure_{
SubmissionFailure: &run.LogEntry_SubmissionFailure{
Event: &eventpb.SubmissionCompleted{Result: eventpb.SubmissionResult_FAILED_TRANSIENT},
},
},
},
},
}
So(res.State, cvtesting.SafeShouldResemble, expectedState)
So(res.SideEffectFn, ShouldBeNil)
So(res.PreserveEvents, ShouldBeTrue)
So(res.PostProcessFn, ShouldBeNil)
So(rmDispatcher.LatestETAof(string(rid)), ShouldHappenOnOrAfter, rs.Submission.Deadline.AsTime())
})
})
Convey("When deadline is exceeded", func() {
rs.Submission = &run.Submission{
Deadline: timestamppb.New(ct.Clock.Now().UTC().Add(-10 * time.Minute)),
TaskId: "task-foo",
}
So(datastore.RunInTransaction(ctx, func(ctx context.Context) error {
waitlisted, err := submit.TryAcquire(ctx, deps.rm.NotifyReadyForSubmission, rid, nil)
So(waitlisted, ShouldBeFalse)
return err
}, nil), ShouldBeNil)
Convey("Single CL Run", func() {
rs.Submission.Cls = []int64{2}
Convey("Not submitted", func() {
runAndVerify := func(verifyMsgFn func(lastMsg string)) {
res, err := h.OnSubmissionCompleted(ctx, rs, sc)
So(err, ShouldBeNil)
So(res.State.Status, ShouldEqual, run.Status_FAILED)
So(res.State.EndTime, ShouldEqual, ct.Clock.Now())
for i, f := range sc.GetClFailures().GetFailures() {
So(res.State.Submission.GetFailedCls()[i], ShouldEqual, f.GetClid())
}
So(res.SideEffectFn, ShouldNotBeNil)
So(res.PreserveEvents, ShouldBeFalse)
So(res.PostProcessFn, ShouldBeNil)
ci := ct.GFake.GetChange(gHost, int(ci2.GetNumber())).Info
verifyMsgFn(gf.LastMessage(ci).GetMessage())
for _, vote := range ci.GetLabels()[trigger.CQLabelName].GetAll() {
So(vote.GetValue(), ShouldEqual, 0)
}
So(submit.MustCurrentRun(ctx, lProject), ShouldNotEqual, rs.ID)
}
Convey("CL failure", func() {
sc.FailureReason = &eventpb.SubmissionCompleted_ClFailures{
ClFailures: &eventpb.SubmissionCompleted_CLSubmissionFailures{
Failures: []*eventpb.SubmissionCompleted_CLSubmissionFailure{
{Clid: 2, Message: "some transient failure"},
},
},
}
runAndVerify(func(lastMsg string) {
So(lastMsg, ShouldContainSubstring, "CL failed to submit because of transient failure: some transient failure. However, submission is running out of time to retry.")
So(lastMsg, ShouldNotContainSubstring, "None of the CLs in the Run has been submitted")
})
reqs := selfSetReviewRequests()
So(reqs, ShouldHaveLength, 1)
So(reqs[0].GetNumber(), ShouldEqual, ci2.GetNumber())
assertNotify(reqs[0], 99, 100, 101)
assertAttentionSet(reqs[0], submissionFailureAttentionReason, 99, 100, 101)
})
Convey("Unclassified failure", func() {
runAndVerify(func(lastMsg string) {
So(lastMsg, ShouldContainSubstring, timeoutMsg)
So(lastMsg, ShouldNotContainSubstring, "None of the CLs in the Run were submitted by CV")
})
})
})
Convey("Submitted", func() {
rs.Submission.SubmittedCls = []int64{2}
res, err := h.OnSubmissionCompleted(ctx, rs, sc)
So(err, ShouldBeNil)
So(res.State.Status, ShouldEqual, run.Status_SUCCEEDED)
So(res.State.EndTime, ShouldEqual, ct.Clock.Now())
So(res.SideEffectFn, ShouldNotBeNil)
So(res.PreserveEvents, ShouldBeFalse)
So(res.PostProcessFn, ShouldBeNil)
So(ct.GFake.GetChange(gHost, int(ci2.GetNumber())).Info, ShouldResembleProto, ci2) // unchanged
So(submit.MustCurrentRun(ctx, lProject), ShouldNotEqual, rs.ID)
})
})
Convey("Multi CLs Run", func() {
rs.Submission.Cls = []int64{2, 1}
runAndVerify := func(verifyMsgFn func(changeNum int64, lastMsg string)) {
res, err := h.OnSubmissionCompleted(ctx, rs, sc)
So(err, ShouldBeNil)
So(res.State.Status, ShouldEqual, run.Status_FAILED)
So(res.State.EndTime, ShouldEqual, ct.Clock.Now())
for i, f := range sc.GetClFailures().GetFailures() {
So(res.State.Submission.GetFailedCls()[i], ShouldEqual, f.GetClid())
}
So(res.SideEffectFn, ShouldNotBeNil)
So(res.PreserveEvents, ShouldBeFalse)
So(res.PostProcessFn, ShouldBeNil)
for _, ci := range []*gerritpb.ChangeInfo{ci1, ci2} {
ci := ct.GFake.GetChange(gHost, int(ci.GetNumber())).Info
verifyMsgFn(ci.GetNumber(), gf.LastMessage(ci).GetMessage())
if ct.GFake.GetChange(gHost, int(ci.GetNumber())).Info.GetStatus() != gerritpb.ChangeStatus_MERGED {
for _, vote := range ci.GetLabels()[trigger.CQLabelName].GetAll() {
So(vote.GetValue(), ShouldEqual, 0)
}
}
}
So(submit.MustCurrentRun(ctx, lProject), ShouldNotEqual, rs.ID)
}
Convey("None of the CLs are submitted", func() {
Convey("CL failure", func() {
sc.FailureReason = &eventpb.SubmissionCompleted_ClFailures{
ClFailures: &eventpb.SubmissionCompleted_CLSubmissionFailures{
Failures: []*eventpb.SubmissionCompleted_CLSubmissionFailure{
{Clid: 2, Message: "some transient failure"},
},
},
}
runAndVerify(func(changeNum int64, lastMsg string) {
switch changeNum {
case ci1.GetNumber():
So(lastMsg, ShouldContainSubstring, "This CL is not submitted because submission has failed for the following CL(s) which this CL depends on.\n* https://x-review.example.com/c/2222")
case ci2.GetNumber():
So(lastMsg, ShouldContainSubstring, "CL failed to submit because of transient failure: some transient failure. However, submission is running out of time to retry.")
default:
panic(fmt.Errorf("unknown change: %d", changeNum))
}
So(lastMsg, ShouldContainSubstring, "None of the CLs in the Run has been submitted")
So(lastMsg, ShouldContainSubstring, "CLs:\n* https://x-review.example.com/c/2222\n* https://x-review.example.com/c/1111")
})
reqs := selfSetReviewRequests()
So(reqs, ShouldHaveLength, 2) // each for CL1 and CL2
So(reqs[0].GetNumber(), ShouldEqual, ci1.GetNumber())
assertNotify(reqs[0], 99, 100, 101)
assertAttentionSet(reqs[0], submissionFailureAttentionReason, 99, 100, 101)
So(reqs[1].GetNumber(), ShouldEqual, ci2.GetNumber())
assertNotify(reqs[0], 99, 100, 101)
assertAttentionSet(reqs[0], submissionFailureAttentionReason, 99, 100, 101)
})
Convey("Unclassified failure", func() {
runAndVerify(func(_ int64, lastMsg string) {
So(lastMsg, ShouldContainSubstring, timeoutMsg)
So(lastMsg, ShouldContainSubstring, "None of the CLs in the Run has been submitted")
So(lastMsg, ShouldContainSubstring, "CLs:\n* https://x-review.example.com/c/2222\n* https://x-review.example.com/c/1111")
})
})
})
Convey("CLs partially submitted", func() {
rs.Submission.SubmittedCls = []int64{2}
ct.GFake.MutateChange(gHost, int(ci2.GetNumber()), func(c *gf.Change) {
gf.PS(int(ci2.GetRevisions()[ci2.GetCurrentRevision()].GetNumber()) + 1)(c.Info)
gf.Status(gerritpb.ChangeStatus_MERGED)(c.Info)
})
Convey("CL failure", func() {
sc.FailureReason = &eventpb.SubmissionCompleted_ClFailures{
ClFailures: &eventpb.SubmissionCompleted_CLSubmissionFailures{
Failures: []*eventpb.SubmissionCompleted_CLSubmissionFailure{
{Clid: 1, Message: "some transient failure"},
},
},
}
runAndVerify(func(changeNum int64, lastMsg string) {
switch changeNum {
case ci1.GetNumber():
So(lastMsg, ShouldContainSubstring, "CL failed to submit because of transient failure: some transient failure. However, submission is running out of time to retry.")
So(lastMsg, ShouldContainSubstring, "CLs in the Run have been submitted partially.")
So(lastMsg, ShouldContainSubstring, "Not submitted:\n* https://x-review.example.com/c/1111")
So(lastMsg, ShouldContainSubstring, "Submitted:\n* https://x-review.example.com/c/2222")
case ci2.GetNumber():
default:
panic(fmt.Errorf("unknown change: %d", changeNum))
}
})
reqs := selfSetReviewRequests()
So(reqs, ShouldHaveLength, 2) // for both submitted and failed CLs
So(reqs[0].GetNumber(), ShouldEqual, ci1.GetNumber())
assertNotify(reqs[0], 99, 100, 101)
assertAttentionSet(reqs[0], submissionFailureAttentionReason, 99, 100, 101)
So(reqs[0].Message, ShouldContainSubstring, "CL failed to submit because of transient failure")
// The 2nd Gerrit message should be for the submitted CL to indicate
// the submission failure on the dependent CLs.
assertNotify(reqs[1], 99, 100, 101)
assertAttentionSet(reqs[1], "failed to submit dependent CLs", 99, 100, 101)
So(reqs[1].Message, ShouldContainSubstring, "This CL is submitted. However, submission has failed for the following CL(s) which depend on this CL.")
})
Convey("Unclassified failure", func() {
runAndVerify(func(changeNum int64, lastMsg string) {
switch changeNum {
case ci1.GetNumber():
So(lastMsg, ShouldContainSubstring, timeoutMsg)
So(lastMsg, ShouldContainSubstring, "CLs in the Run have been submitted partially")
So(lastMsg, ShouldContainSubstring, "Not submitted:\n* https://x-review.example.com/c/1111")
So(lastMsg, ShouldContainSubstring, "Submitted:\n* https://x-review.example.com/c/2222")
case ci2.GetNumber():
default:
panic(fmt.Errorf("unknown change: %d", changeNum))
}
})
})
})
Convey("CLs fully submitted", func() {
rs.Submission.SubmittedCls = []int64{2, 1}
res, err := h.OnSubmissionCompleted(ctx, rs, sc)
So(err, ShouldBeNil)
So(res.State.Status, ShouldEqual, run.Status_SUCCEEDED)
So(res.State.EndTime, ShouldEqual, ct.Clock.Now())
So(res.SideEffectFn, ShouldNotBeNil)
So(res.PreserveEvents, ShouldBeFalse)
So(res.PostProcessFn, ShouldBeNil)
// both untouched
So(ct.GFake.GetChange(gHost, int(ci1.GetNumber())).Info, ShouldResembleProto, ci1)
So(ct.GFake.GetChange(gHost, int(ci2.GetNumber())).Info, ShouldResembleProto, ci2)
So(submit.MustCurrentRun(ctx, lProject), ShouldNotEqual, rs.ID)
})
})
})
})
Convey("Permanent failure", func() {
sc := &eventpb.SubmissionCompleted{
Result: eventpb.SubmissionResult_FAILED_PERMANENT,
}
rs.Submission = &run.Submission{
Deadline: timestamppb.New(ct.Clock.Now().UTC().Add(10 * time.Minute)),
TaskId: "task-foo",
}
Convey("Single CL Run", func() {
rs.Submission.Cls = []int64{2}
runAndVerify := func(verifyMsgFn func(lastMsg string)) {
res, err := h.OnSubmissionCompleted(ctx, rs, sc)
So(err, ShouldBeNil)
So(res.State.Status, ShouldEqual, run.Status_FAILED)
So(res.State.EndTime, ShouldEqual, ct.Clock.Now())
for i, f := range sc.GetClFailures().GetFailures() {
So(res.State.Submission.GetFailedCls()[i], ShouldEqual, f.GetClid())
}
So(res.SideEffectFn, ShouldNotBeNil)
So(res.PreserveEvents, ShouldBeFalse)
So(res.PostProcessFn, ShouldBeNil)
ci := ct.GFake.GetChange(gHost, int(ci2.GetNumber())).Info
verifyMsgFn(gf.LastMessage(ci).GetMessage())
for _, vote := range ci.GetLabels()[trigger.CQLabelName].GetAll() {
So(vote.GetValue(), ShouldEqual, 0)
}
}
Convey("CL Submission failure", func() {
sc.FailureReason = &eventpb.SubmissionCompleted_ClFailures{
ClFailures: &eventpb.SubmissionCompleted_CLSubmissionFailures{
Failures: []*eventpb.SubmissionCompleted_CLSubmissionFailure{
{
Clid: 2,
Message: "CV failed to submit this CL because of merge conflict",
},
},
},
}
runAndVerify(func(lastMsg string) {
So(lastMsg, ShouldContainSubstring, "CV failed to submit this CL because of merge conflict")
So(lastMsg, ShouldNotContainSubstring, "None of the CLs in the Run were submitted by CV")
})
reqs := selfSetReviewRequests()
So(reqs, ShouldHaveLength, 1)
So(reqs[0].GetNumber(), ShouldEqual, ci2.GetNumber())
assertNotify(reqs[0], 99, 100, 101)
assertAttentionSet(reqs[0], submissionFailureAttentionReason, 99, 100, 101)
})
Convey("Unclassified failure", func() {
runAndVerify(func(lastMsg string) {
So(lastMsg, ShouldContainSubstring, defaultMsg)
So(lastMsg, ShouldNotContainSubstring, "None of the CLs in the Run were submitted by CV")
})
})
})
Convey("Multi CLs Run", func() {
rs.Submission.Cls = []int64{2, 1}
runAndVerify := func(verifyMsgFn func(changeNum int64, lastMsg string)) {
res, err := h.OnSubmissionCompleted(ctx, rs, sc)
So(err, ShouldBeNil)
So(res.State.Status, ShouldEqual, run.Status_FAILED)
So(res.State.EndTime, ShouldEqual, ct.Clock.Now())
So(res.SideEffectFn, ShouldNotBeNil)
So(res.PreserveEvents, ShouldBeFalse)
So(res.PostProcessFn, ShouldBeNil)
for _, ci := range []*gerritpb.ChangeInfo{ci1, ci2} {
ci := ct.GFake.GetChange(gHost, int(ci.GetNumber())).Info
verifyMsgFn(ci.GetNumber(), gf.LastMessage(ci).GetMessage())
if ct.GFake.GetChange(gHost, int(ci.GetNumber())).Info.GetStatus() != gerritpb.ChangeStatus_MERGED {
for _, vote := range ci.GetLabels()[trigger.CQLabelName].GetAll() {
So(vote.GetValue(), ShouldEqual, 0)
}
}
}
}
Convey("None of the CLs are submitted", func() {
Convey("CL Submission failure", func() {
sc.FailureReason = &eventpb.SubmissionCompleted_ClFailures{
ClFailures: &eventpb.SubmissionCompleted_CLSubmissionFailures{
Failures: []*eventpb.SubmissionCompleted_CLSubmissionFailure{
{
Clid: 2,
Message: "Failed to submit this CL because of merge conflict",
},
},
},
}
runAndVerify(func(changeNum int64, lastMsg string) {
switch changeNum {
case 1111:
So(lastMsg, ShouldContainSubstring, "This CL is not submitted because submission has failed for the following CL(s) which this CL depends on.\n* https://x-review.example.com/c/2222")
case 2222:
So(lastMsg, ShouldContainSubstring, "Failed to submit this CL because of merge conflict")
}
So(lastMsg, ShouldContainSubstring, "None of the CLs in the Run has been submitted")
So(lastMsg, ShouldContainSubstring, "CLs:\n* https://x-review.example.com/c/2222\n* https://x-review.example.com/c/1111")
})
reqs := selfSetReviewRequests()
So(reqs, ShouldHaveLength, 2) // each for CL1 and CL2
So(reqs[0].GetNumber(), ShouldEqual, ci1.GetNumber())
assertNotify(reqs[0], 99, 100, 101)
assertAttentionSet(reqs[0], submissionFailureAttentionReason, 99, 100, 101)
So(reqs[1].GetNumber(), ShouldEqual, ci2.GetNumber())
assertNotify(reqs[1], 99, 100, 101)
assertAttentionSet(reqs[1], submissionFailureAttentionReason, 99, 100, 101)
})
Convey("Unclassified failure", func() {
runAndVerify(func(changeNum int64, lastMsg string) {
So(lastMsg, ShouldContainSubstring, defaultMsg)
So(lastMsg, ShouldContainSubstring, "None of the CLs in the Run has been submitted")
So(lastMsg, ShouldContainSubstring, "CLs:\n* https://x-review.example.com/c/2222\n* https://x-review.example.com/c/1111")
})
})
})
Convey("CLs partially submitted", func() {
rs.Submission.SubmittedCls = []int64{2}
ct.GFake.MutateChange(gHost, int(ci2.GetNumber()), func(c *gf.Change) {
gf.PS(int(ci2.GetRevisions()[ci2.GetCurrentRevision()].GetNumber()) + 1)(c.Info)
gf.Status(gerritpb.ChangeStatus_MERGED)(c.Info)
})
Convey("CL Submission failure", func() {
sc.FailureReason = &eventpb.SubmissionCompleted_ClFailures{
ClFailures: &eventpb.SubmissionCompleted_CLSubmissionFailures{
Failures: []*eventpb.SubmissionCompleted_CLSubmissionFailure{
{
Clid: 1,
Message: "Failed to submit this CL because of merge conflict",
},
},
},
}
runAndVerify(func(changeNum int64, lastMsg string) {
if changeNum == ci1.GetNumber() {
So(lastMsg, ShouldContainSubstring, "Failed to submit this CL because of merge conflict")
}
})
reqs := selfSetReviewRequests()
So(reqs, ShouldHaveLength, 2) // for both submitted and failed CLs
So(reqs[0].GetNumber(), ShouldEqual, ci1.GetNumber())
assertNotify(reqs[0], 99, 100, 101)
assertAttentionSet(reqs[0], submissionFailureAttentionReason, 99, 100, 101)
So(reqs[0].Message, ShouldContainSubstring, "Failed to submit this CL")
// The 2nd Gerrit message should be for the submitted CL to indicate
// the submission failure on the dependent CLs.
assertNotify(reqs[1], 99, 100, 101)
assertAttentionSet(reqs[1], "failed to submit dependent CLs", 99, 100, 101)
So(reqs[1].Message, ShouldContainSubstring, "This CL is submitted. However, submission has failed for the following CL(s) which depend on this CL.")
})
Convey("don't attempt posting dependent failure message if posted already", func() {
ct.GFake.MutateChange(gHost, int(ci2.GetNumber()), func(c *gf.Change) {
msgs := c.Info.GetMessages()
msgs = append(msgs, &gerritpb.ChangeMessageInfo{
Message: partiallySubmittedMsgForSubmittedCLs,
})
gf.Messages(msgs...)(c.Info)
})
runAndVerify(func(changeNum int64, lastMsg string) {
if changeNum == ci2.GetNumber() {
So(lastMsg, ShouldContainSubstring, partiallySubmittedMsgForSubmittedCLs)
}
})
reqs := selfSetReviewRequests()
So(reqs, ShouldHaveLength, 1)
So(reqs[0].GetNumber(), ShouldEqual, ci1.GetNumber()) // no request to ci2
})
Convey("Unclassified failure", func() {
runAndVerify(func(changeNum int64, lastMsg string) {
if changeNum == ci1.GetNumber() {
So(lastMsg, ShouldContainSubstring, defaultMsg)
}
})
})
})
})
})
})
}
func TestOnCLsSubmitted(t *testing.T) {
t.Parallel()
Convey("OnCLsSubmitted", t, func() {
ct := cvtesting.Test{}
ctx, cancel := ct.SetUp()
defer cancel()
rid := common.MakeRunID("infra", ct.Clock.Now(), 1, []byte("deadbeef"))
rs := &state.RunState{Run: run.Run{
ID: rid,
Status: run.Status_SUBMITTING,
CreateTime: ct.Clock.Now().UTC().Add(-2 * time.Minute),
StartTime: ct.Clock.Now().UTC().Add(-1 * time.Minute),
CLs: common.CLIDs{1, 3, 5, 7},
Submission: &run.Submission{
Cls: []int64{3, 1, 7, 5}, // in submission order
},
}}
h, _ := makeTestHandler(&ct)
Convey("Single", func() {
res, err := h.OnCLsSubmitted(ctx, rs, common.CLIDs{3})
So(err, ShouldBeNil)
So(res.State.Submission.SubmittedCls, ShouldResemble, []int64{3})
})
Convey("Duplicate", func() {
res, err := h.OnCLsSubmitted(ctx, rs, common.CLIDs{3, 3, 3, 3, 1, 1, 1})
So(err, ShouldBeNil)
So(res.State.Submission.SubmittedCls, ShouldResemble, []int64{3, 1})
})
Convey("Obey Submission order", func() {
res, err := h.OnCLsSubmitted(ctx, rs, common.CLIDs{1, 3, 5, 7})
So(err, ShouldBeNil)
So(res.State.Submission.SubmittedCls, ShouldResemble, []int64{3, 1, 7, 5})
})
Convey("Merge to existing", func() {
rs.Submission.SubmittedCls = []int64{3, 1}
// 1 should be deduped
res, err := h.OnCLsSubmitted(ctx, rs, common.CLIDs{1, 7})
So(err, ShouldBeNil)
So(res.State.Submission.SubmittedCls, ShouldResemble, []int64{3, 1, 7})
})
Convey("Last cl arrives first", func() {
res, err := h.OnCLsSubmitted(ctx, rs, common.CLIDs{5})
So(err, ShouldBeNil)
So(res.State.Submission.SubmittedCls, ShouldResemble, []int64{5})
rs = res.State
res, err = h.OnCLsSubmitted(ctx, rs, common.CLIDs{1, 3})
So(err, ShouldBeNil)
So(res.State.Submission.SubmittedCls, ShouldResemble, []int64{3, 1, 5})
rs = res.State
res, err = h.OnCLsSubmitted(ctx, rs, common.CLIDs{7})
So(err, ShouldBeNil)
So(res.State.Submission.SubmittedCls, ShouldResemble, []int64{3, 1, 7, 5})
})
Convey("Error for unknown CLs", func() {
res, err := h.OnCLsSubmitted(ctx, rs, common.CLIDs{1, 3, 5, 7, 9, 11})
So(err, ShouldErrLike, "received CLsSubmitted event for cls not belonging to this Run: [9 11]")
So(res, ShouldBeNil)
})
})
}