blob: 273dac6d618d7363451e0ec85cbb21668e0a5be1 [file] [log] [blame]
// Copyright 2020 The LUCI Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package impl
import (
"context"
"fmt"
"math/rand"
"testing"
"time"
"google.golang.org/protobuf/types/known/timestamppb"
"go.chromium.org/luci/common/clock"
"go.chromium.org/luci/gae/service/datastore"
"go.chromium.org/luci/server/tq/tqtesting"
"go.chromium.org/luci/cv/internal/changelist"
"go.chromium.org/luci/cv/internal/common"
"go.chromium.org/luci/cv/internal/common/eventbox"
"go.chromium.org/luci/cv/internal/cvtesting"
"go.chromium.org/luci/cv/internal/gerrit/updater"
"go.chromium.org/luci/cv/internal/prjmanager"
"go.chromium.org/luci/cv/internal/run"
"go.chromium.org/luci/cv/internal/run/eventpb"
submitpb "go.chromium.org/luci/cv/internal/run/eventpb"
"go.chromium.org/luci/cv/internal/run/impl/handler"
"go.chromium.org/luci/cv/internal/run/impl/state"
"go.chromium.org/luci/cv/internal/run/runtest"
. "github.com/smartystreets/goconvey/convey"
. "go.chromium.org/luci/common/testing/assertions"
)
func TestRunManager(t *testing.T) {
t.Parallel()
Convey("RunManager", t, func() {
ct := cvtesting.Test{}
ctx, cancel := ct.SetUp()
defer cancel()
const runID = "chromium/222-1-deadbeef"
const initialEVersion = 10
So(datastore.Put(ctx, &run.Run{
ID: runID,
Status: run.Status_RUNNING,
EVersion: initialEVersion,
}), ShouldBeNil)
currentRun := func(ctx context.Context) *run.Run {
ret := &run.Run{ID: runID}
So(datastore.Get(ctx, ret), ShouldBeNil)
return ret
}
notifier := run.NewNotifier(ct.TQDispatcher)
pm := prjmanager.NewNotifier(ct.TQDispatcher)
clMutator := changelist.NewMutator(ct.TQDispatcher, pm, notifier)
u := updater.New(ct.TQDispatcher, ct.GFactory(), clMutator)
_ = New(notifier, pm, clMutator, u, ct.GFactory(), ct.TreeFake.Client(), ct.BQFake)
// sorted by the order of execution.
eventTestcases := []struct {
event *eventpb.Event
sendFn func(context.Context) error
invokedHandlerMethod string
}{
{
&eventpb.Event{
Event: &eventpb.Event_Cancel{
Cancel: &eventpb.Cancel{},
},
},
func(ctx context.Context) error {
return notifier.Cancel(ctx, runID)
},
"Cancel",
},
{
&eventpb.Event{
Event: &eventpb.Event_Start{
Start: &eventpb.Start{},
},
},
func(ctx context.Context) error {
return notifier.Start(ctx, runID)
},
"Start",
},
{
&eventpb.Event{
Event: &eventpb.Event_NewConfig{
NewConfig: &eventpb.NewConfig{
Hash: "deadbeef",
Eversion: 2,
},
},
},
func(ctx context.Context) error {
return notifier.UpdateConfig(ctx, runID, "deadbeef", 2)
},
"UpdateConfig",
},
{
&eventpb.Event{
Event: &eventpb.Event_ClsUpdated{
ClsUpdated: &changelist.CLUpdatedEvents{
Events: []*changelist.CLUpdatedEvent{
{
Clid: int64(1),
Eversion: int64(2),
},
},
},
},
},
func(ctx context.Context) error {
return notifier.NotifyCLsUpdated(ctx, runID, &changelist.CLUpdatedEvents{
Events: []*changelist.CLUpdatedEvent{
{
Clid: int64(1),
Eversion: int64(2),
},
},
})
},
"OnCLsUpdated",
},
{
&eventpb.Event{
Event: &eventpb.Event_CqdTryjobsUpdated{
CqdTryjobsUpdated: &eventpb.CQDTryjobsUpdated{},
},
},
func(ctx context.Context) error {
return notifier.NotifyCQDTryjobsUpdated(ctx, runID)
},
"OnCQDTryjobsUpdated",
},
{
&eventpb.Event{
Event: &eventpb.Event_CqdVerificationCompleted{
CqdVerificationCompleted: &eventpb.CQDVerificationCompleted{},
},
},
func(ctx context.Context) error {
return notifier.NotifyCQDVerificationCompleted(ctx, runID)
},
"OnCQDVerificationCompleted",
},
{
&eventpb.Event{
Event: &eventpb.Event_ClSubmitted{
ClSubmitted: &eventpb.CLSubmitted{
Clid: 1,
},
},
},
func(ctx context.Context) error {
return notifier.SendNow(ctx, runID, &eventpb.Event{
Event: &eventpb.Event_ClSubmitted{
ClSubmitted: &eventpb.CLSubmitted{
Clid: 1,
},
},
})
},
"OnCLSubmitted",
},
{
&eventpb.Event{
Event: &eventpb.Event_SubmissionCompleted{
SubmissionCompleted: &submitpb.SubmissionCompleted{
Result: submitpb.SubmissionResult_SUCCEEDED,
},
},
},
func(ctx context.Context) error {
return notifier.SendNow(ctx, runID, &eventpb.Event{
Event: &eventpb.Event_SubmissionCompleted{
SubmissionCompleted: &submitpb.SubmissionCompleted{
Result: submitpb.SubmissionResult_SUCCEEDED,
},
},
})
},
"OnSubmissionCompleted",
},
{
&eventpb.Event{
Event: &eventpb.Event_ReadyForSubmission{
ReadyForSubmission: &eventpb.ReadyForSubmission{},
},
},
func(ctx context.Context) error {
return notifier.SendNow(ctx, runID, &eventpb.Event{
Event: &eventpb.Event_ReadyForSubmission{
ReadyForSubmission: &eventpb.ReadyForSubmission{},
},
})
},
"OnReadyForSubmission",
},
{
&eventpb.Event{
Event: &eventpb.Event_Poke{
Poke: &eventpb.Poke{},
},
},
func(ctx context.Context) error {
return notifier.PokeNow(ctx, runID)
},
"Poke",
},
}
for _, et := range eventTestcases {
Convey(fmt.Sprintf("Can process Event %T", et.event.GetEvent()), func() {
fh := &fakeHandler{}
ctx = context.WithValue(ctx, &fakeHandlerKey, fh)
So(et.sendFn(ctx), ShouldBeNil)
runtest.AssertInEventbox(ctx, runID, et.event)
So(runtest.Runs(ct.TQ.Tasks()), ShouldResemble, common.RunIDs{runID})
ct.TQ.Run(ctx, tqtesting.StopAfterTask(eventpb.ManageRunTaskClass))
So(fh.invocations[0], ShouldEqual, et.invokedHandlerMethod)
So(currentRun(ctx).EVersion, ShouldEqual, initialEVersion+1)
runtest.AssertNotInEventbox(ctx, runID, et.event) // consumed
})
}
Convey("Process Events in order", func() {
fh := &fakeHandler{}
ctx = context.WithValue(ctx, &fakeHandlerKey, fh)
var expectInvokedMethods []string
for _, et := range eventTestcases {
// skipping Cancel because when Start and Cancel are both present.
// only Cancel will execute. See next test
if et.event.GetCancel() == nil {
expectInvokedMethods = append(expectInvokedMethods, et.invokedHandlerMethod)
}
}
rand.Shuffle(len(eventTestcases), func(i, j int) {
eventTestcases[i], eventTestcases[j] = eventTestcases[j], eventTestcases[i]
})
for _, etc := range eventTestcases {
if etc.event.GetCancel() == nil {
So(etc.sendFn(ctx), ShouldBeNil)
}
}
ct.TQ.Run(ctx, tqtesting.StopAfterTask(eventpb.ManageRunTaskClass))
expectInvokedMethods = append(expectInvokedMethods, "TryResumeSubmission") // always invoked
So(fh.invocations, ShouldResemble, expectInvokedMethods)
So(currentRun(ctx).EVersion, ShouldEqual, initialEVersion+1)
})
Convey("Don't Start if received both Cancel and Start Event", func() {
fh := &fakeHandler{}
ctx = context.WithValue(ctx, &fakeHandlerKey, fh)
notifier.Start(ctx, runID)
notifier.Cancel(ctx, runID)
ct.TQ.Run(ctx, tqtesting.StopAfterTask(eventpb.ManageRunTaskClass))
So(fh.invocations[0], ShouldEqual, "Cancel")
for _, inv := range fh.invocations[1:] {
So(inv, ShouldNotEqual, "Start")
}
So(currentRun(ctx).EVersion, ShouldEqual, initialEVersion+1)
runtest.AssertNotInEventbox(ctx, runID, &eventpb.Event{
Event: &eventpb.Event_Cancel{
Cancel: &eventpb.Cancel{},
},
},
&eventpb.Event{
Event: &eventpb.Event_Start{
Start: &eventpb.Start{},
},
},
)
})
Convey("Can Preserve events", func() {
fh := &fakeHandler{preserveEvents: true}
ctx = context.WithValue(ctx, &fakeHandlerKey, fh)
So(notifier.Start(ctx, runID), ShouldBeNil)
ct.TQ.Run(ctx, tqtesting.StopAfterTask(eventpb.ManageRunTaskClass))
So(currentRun(ctx).EVersion, ShouldEqual, initialEVersion+1)
runtest.AssertInEventbox(ctx, runID,
&eventpb.Event{
Event: &eventpb.Event_Start{
Start: &eventpb.Start{},
},
},
)
})
Convey("Can save RunLog", func() {
fh := &fakeHandler{startAddsLogEntries: []*run.LogEntry{
{
Time: timestamppb.New(clock.Now(ctx)),
Kind: &run.LogEntry_Created_{Created: &run.LogEntry_Created{
ConfigGroupId: "deadbeef/main",
}},
},
}}
ctx = context.WithValue(ctx, &fakeHandlerKey, fh)
So(notifier.Start(ctx, runID), ShouldBeNil)
ct.TQ.Run(ctx, tqtesting.StopAfterTask(eventpb.ManageRunTaskClass))
So(currentRun(ctx).EVersion, ShouldEqual, initialEVersion+1)
entries, err := run.LoadRunLogEntries(ctx, runID)
So(err, ShouldBeNil)
So(entries, ShouldResembleProto, fh.startAddsLogEntries)
})
Convey("Can run PostProcessFn", func() {
var postProcessFnExecuted bool
fh := &fakeHandler{
postProcessFn: func(c context.Context) error {
postProcessFnExecuted = true
return nil
},
}
ctx = context.WithValue(ctx, &fakeHandlerKey, fh)
So(notifier.Start(ctx, runID), ShouldBeNil)
ct.TQ.Run(ctx, tqtesting.StopAfterTask(eventpb.ManageRunTaskClass))
So(postProcessFnExecuted, ShouldBeTrue)
})
})
Convey("Poke", t, func() {
ct := cvtesting.Test{}
ctx, cancel := ct.SetUp()
defer cancel()
const runID = "chromium/222-1-deadbeef"
tCreate := ct.Clock.Now().UTC().Add(-2 * time.Minute)
So(datastore.Put(ctx, &run.Run{
ID: runID,
Status: run.Status_RUNNING,
CreateTime: tCreate,
StartTime: tCreate.Add(1 * time.Minute),
EVersion: 10,
}), ShouldBeNil)
notifier := run.NewNotifier(ct.TQDispatcher)
pm := prjmanager.NewNotifier(ct.TQDispatcher)
clMutator := changelist.NewMutator(ct.TQDispatcher, pm, notifier)
u := updater.New(ct.TQDispatcher, ct.GFactory(), clMutator)
_ = New(notifier, pm, clMutator, u, ct.GFactory(), ct.TreeFake.Client(), ct.BQFake)
Convey("Recursive", func() {
So(notifier.PokeNow(ctx, runID), ShouldBeNil)
So(runtest.Runs(ct.TQ.Tasks()), ShouldResemble, common.RunIDs{runID})
ct.TQ.Run(ctx, tqtesting.StopAfterTask(eventpb.ManageRunTaskClass))
for i := 0; i < 10; i++ {
now := clock.Now(ctx)
runtest.AssertInEventbox(ctx, runID, &eventpb.Event{
Event: &eventpb.Event_Poke{
Poke: &eventpb.Poke{},
},
ProcessAfter: timestamppb.New(now.Add(pokeInterval)),
})
ct.TQ.Run(ctx, tqtesting.StopAfterTask(eventpb.ManageRunTaskClass))
}
Convey("Stops after Run is finalized", func() {
So(datastore.Put(ctx, &run.Run{
ID: runID,
Status: run.Status_CANCELLED,
CreateTime: tCreate,
StartTime: tCreate.Add(1 * time.Minute),
EndTime: ct.Clock.Now().UTC(),
EVersion: 11,
}), ShouldBeNil)
ct.TQ.Run(ctx, tqtesting.StopAfterTask(eventpb.ManageRunTaskClass))
runtest.AssertEventboxEmpty(ctx, runID)
})
})
Convey("Existing event due during the interval", func() {
So(notifier.PokeNow(ctx, runID), ShouldBeNil)
So(notifier.PokeAfter(ctx, runID, 30*time.Second), ShouldBeNil)
ct.TQ.Run(ctx, tqtesting.StopAfterTask(eventpb.ManageRunTaskClass))
runtest.AssertNotInEventbox(ctx, runID, &eventpb.Event{
Event: &eventpb.Event_Poke{
Poke: &eventpb.Poke{},
},
ProcessAfter: timestamppb.New(clock.Now(ctx).Add(pokeInterval)),
})
So(runtest.Tasks(ct.TQ.Tasks()), ShouldHaveLength, 1)
task := runtest.Tasks(ct.TQ.Tasks())[0]
So(task.ETA, ShouldResemble, clock.Now(ctx).UTC().Add(30*time.Second))
So(task.Payload, ShouldResembleProto, &eventpb.ManageRunTask{RunId: string(runID)})
})
})
}
type fakeHandler struct {
invocations []string
preserveEvents bool
postProcessFn eventbox.PostProcessFn
startAddsLogEntries []*run.LogEntry
}
var _ handler.Handler = &fakeHandler{}
func (fh *fakeHandler) Start(ctx context.Context, rs *state.RunState) (*handler.Result, error) {
fh.addInvocation("Start")
rs = rs.ShallowCopy()
if len(fh.startAddsLogEntries) > 0 {
rs.LogEntries = append(rs.LogEntries, fh.startAddsLogEntries...)
}
return &handler.Result{
State: rs,
PreserveEvents: fh.preserveEvents,
PostProcessFn: fh.postProcessFn,
}, nil
}
func (fh *fakeHandler) Cancel(ctx context.Context, rs *state.RunState) (*handler.Result, error) {
fh.addInvocation("Cancel")
return &handler.Result{
State: rs.ShallowCopy(),
PreserveEvents: fh.preserveEvents,
PostProcessFn: fh.postProcessFn,
}, nil
}
func (fh *fakeHandler) OnCLsUpdated(ctx context.Context, rs *state.RunState, _ common.CLIDs) (*handler.Result, error) {
fh.addInvocation("OnCLsUpdated")
return &handler.Result{
State: rs.ShallowCopy(),
PreserveEvents: fh.preserveEvents,
PostProcessFn: fh.postProcessFn,
}, nil
}
func (fh *fakeHandler) OnReadyForSubmission(ctx context.Context, rs *state.RunState) (*handler.Result, error) {
fh.addInvocation("OnReadyForSubmission")
return &handler.Result{
State: rs.ShallowCopy(),
PreserveEvents: fh.preserveEvents,
PostProcessFn: fh.postProcessFn,
}, nil
}
// OnCLSubmitted records provided CLs have been submitted.
func (fh *fakeHandler) OnCLSubmitted(ctx context.Context, rs *state.RunState, clids common.CLIDs) (*handler.Result, error) {
fh.addInvocation("OnCLSubmitted")
return &handler.Result{
State: rs.ShallowCopy(),
PreserveEvents: fh.preserveEvents,
PostProcessFn: fh.postProcessFn,
}, nil
}
func (fh *fakeHandler) OnSubmissionCompleted(ctx context.Context, rs *state.RunState, sc *submitpb.SubmissionCompleted) (*handler.Result, error) {
fh.addInvocation("OnSubmissionCompleted")
return &handler.Result{
State: rs.ShallowCopy(),
PreserveEvents: fh.preserveEvents,
PostProcessFn: fh.postProcessFn,
}, nil
}
func (fh *fakeHandler) TryResumeSubmission(ctx context.Context, rs *state.RunState) (*handler.Result, error) {
fh.addInvocation("TryResumeSubmission")
return &handler.Result{
State: rs.ShallowCopy(),
PreserveEvents: fh.preserveEvents,
PostProcessFn: fh.postProcessFn,
}, nil
}
func (fh *fakeHandler) OnCQDTryjobsUpdated(ctx context.Context, rs *state.RunState) (*handler.Result, error) {
fh.addInvocation("OnCQDTryjobsUpdated")
return &handler.Result{
State: rs.ShallowCopy(),
PreserveEvents: fh.preserveEvents,
PostProcessFn: fh.postProcessFn,
}, nil
}
func (fh *fakeHandler) OnCQDVerificationCompleted(ctx context.Context, rs *state.RunState) (*handler.Result, error) {
fh.addInvocation("OnCQDVerificationCompleted")
return &handler.Result{
State: rs.ShallowCopy(),
PreserveEvents: fh.preserveEvents,
PostProcessFn: fh.postProcessFn,
}, nil
}
func (fh *fakeHandler) Poke(ctx context.Context, rs *state.RunState) (*handler.Result, error) {
fh.addInvocation("Poke")
return &handler.Result{
State: rs.ShallowCopy(),
PreserveEvents: fh.preserveEvents,
PostProcessFn: fh.postProcessFn,
}, nil
}
func (fh *fakeHandler) UpdateConfig(ctx context.Context, rs *state.RunState, hash string) (*handler.Result, error) {
fh.addInvocation("UpdateConfig")
return &handler.Result{
State: rs.ShallowCopy(),
PreserveEvents: fh.preserveEvents,
PostProcessFn: fh.postProcessFn,
}, nil
}
func (fh *fakeHandler) addInvocation(method string) {
fh.invocations = append(fh.invocations, method)
}