blob: c0861780c7946bce34c9309c1de2f4d625404d98 [file] [log] [blame]
// Copyright 2021 The LUCI Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package e2e
import (
"fmt"
"sort"
"testing"
"time"
"google.golang.org/protobuf/types/known/timestamppb"
bbpb "go.chromium.org/luci/buildbucket/proto"
bbutil "go.chromium.org/luci/buildbucket/protoutil"
"go.chromium.org/luci/common/clock"
gerritpb "go.chromium.org/luci/common/proto/gerrit"
"go.chromium.org/luci/gae/service/datastore"
cfgpb "go.chromium.org/luci/cv/api/config/v2"
"go.chromium.org/luci/cv/internal/configs/prjcfg/prjcfgtest"
gf "go.chromium.org/luci/cv/internal/gerrit/gerritfake"
"go.chromium.org/luci/cv/internal/run"
. "github.com/smartystreets/goconvey/convey"
)
// TODO(tandrii): this is a slow test (~0.6s on my laptop),
// but it will become faster once LoadGerritRuns is optimized.
func TestConcurrentRunsSingular(t *testing.T) {
t.Parallel()
Convey("CV juggles a bunch of concurrent Runs", t, func() {
ct := Test{}
ctx, cancel := ct.SetUp(t)
defer cancel()
const lProject = "infra"
const gHost = "g-review"
const gRepo = "re/po"
const gRef = "refs/heads/main"
const gChangeFirst = 1001
const N = 50
cfg := MakeCfgSingular("cg0", gHost, gRepo, gRef, &cfgpb.Verifiers_Tryjob_Builder{
Host: buildbucketHost,
Name: fmt.Sprintf("%s/try/test-builder", lProject),
})
ct.BuildbucketFake.EnsureBuilders(cfg)
prjcfgtest.Create(ctx, lProject, cfg)
So(ct.PMNotifier.UpdateConfig(ctx, lProject), ShouldBeNil)
// Prepare a bunch of actions to play out over time.
actions := make([]struct {
gChange int
user string
mode run.Mode
triggerTime time.Time // ever-increasing
finishTime time.Time // pseudo-random
finalStatus run.Status
}, N)
var expectSubmitted, expectFinished, expectFailed []int
for i := range actions {
a := &actions[i]
a.gChange = gChangeFirst + i
a.user = fmt.Sprintf("user-%d", (i%10)+1)
ct.GFake.AddLinkedAccountMapping([]*gerritpb.EmailInfo{
&gerritpb.EmailInfo{Email: fmt.Sprintf("%s@example.com", a.user)},
})
ct.GFake.AddFrom(gf.WithCIs(gHost, gf.ACLRestricted(lProject), gf.CI(
a.gChange, gf.Project(gRepo), gf.Ref(gRef), gf.PS(1), gf.Owner(a.user),
gf.Updated(ct.Clock.Now()),
)))
// DryRunner(s) can trigger a DryRun w/o an approval and
// a FullRun w/ approval.
a.mode = run.DryRun
ct.AddDryRunner(a.user)
if i%3 == 0 {
a.mode = run.FullRun
}
a.triggerTime = ct.Clock.Now().Add(time.Duration(i*3) * time.Second)
a.finishTime = a.triggerTime.Add(time.Duration(i*13%5) * time.Minute)
if i%2 == 0 {
a.finalStatus = run.Status_SUCCEEDED
if a.mode == run.FullRun {
expectSubmitted = append(expectSubmitted, a.gChange)
} else {
expectFinished = append(expectFinished, a.gChange)
}
} else {
a.finalStatus = run.Status_FAILED
expectFailed = append(expectFailed, a.gChange)
}
}
indexesByFinishTime := make([]int, len(actions))
for i := range actions {
indexesByFinishTime[i] = i
}
sort.Slice(indexesByFinishTime, func(i, j int) bool {
idxI, idxJ := indexesByFinishTime[i], indexesByFinishTime[j]
return actions[idxI].finishTime.Before(actions[idxJ].finishTime)
})
// Making sure the Tryjob ends at ~the finish time.
bbClient := ct.BuildbucketFake.MustNewClient(ctx, buildbucketHost, lProject)
go func() {
timer := clock.NewTimer(ctx)
for {
timer.Reset(1 * time.Minute)
select {
case <-ctx.Done():
return
case <-timer.GetC():
req := &bbpb.SearchBuildsRequest{
Predicate: &bbpb.BuildPredicate{},
}
for {
res, err := bbClient.SearchBuilds(ctx, req)
if err != nil {
panic(err)
}
for _, b := range res.GetBuilds() {
if bbutil.IsEnded(b.Status) {
continue
}
gChange := b.GetInput().GetGerritChanges()[0].GetChange()
action := actions[gChange-gChangeFirst]
if !clock.Now(ctx).Before(action.finishTime) {
ct.BuildbucketFake.MutateBuild(ctx, buildbucketHost, b.Id, func(b *bbpb.Build) {
if action.finalStatus == run.Status_SUCCEEDED {
b.Status = bbpb.Status_SUCCESS
} else {
b.Status = bbpb.Status_FAILURE
}
b.StartTime = timestamppb.New(ct.Clock.Now())
b.EndTime = timestamppb.New(ct.Clock.Now())
})
}
}
req.PageToken = res.GetNextPageToken()
if req.GetPageToken() == "" {
break
}
}
}
}
}()
ct.LogPhase(ctx, fmt.Sprintf("Triggering CQ on %d CLs", len(actions)))
for i := range actions {
a := &actions[i]
if !ct.Clock.Now().After(a.triggerTime) {
ct.RunUntil(ctx, func() bool { return ct.Clock.Now().After(a.triggerTime) })
}
ct.GFake.MutateChange(gHost, a.gChange, func(c *gf.Change) {
val := 1
if a.mode == run.FullRun {
val = 2
// FullRun requires an approval; self-stamp it
gf.Approve()(c.Info)
}
gf.CQ(val, a.triggerTime, gf.U(a.user))(c.Info)
gf.Updated(a.triggerTime)(c.Info)
})
}
// Now iterate in increasing finishAt, checking state of Gerrit CL.
var actualSubmitted, actualFinished, actualFailed, actualWeird []int
for _, i := range indexesByFinishTime {
a := actions[i]
ct.LogPhase(ctx, fmt.Sprintf("Checking state of #%d %s expected state %s", a.gChange, a.mode, a.finalStatus))
var runs []*run.Run
ct.RunUntil(ctx, func() bool {
if !ct.Clock.Now().After(a.finishTime) {
return false
}
runs = ct.LoadGerritRuns(ctx, gHost, int64(a.gChange))
return len(runs) > 0
})
So(runs, ShouldHaveLength, 1)
r := runs[0]
ct.RunUntil(ctx, func() bool {
r = ct.LoadRun(ctx, runs[0].ID)
return run.IsEnded(r.Status)
})
switch {
case r.Status == run.Status_FAILED:
actualFailed = append(actualFailed, a.gChange)
So(ct.MaxCQVote(ctx, gHost, int64(a.gChange)), ShouldEqual, 0)
case r.Status == run.Status_SUCCEEDED && a.mode == run.DryRun:
actualFinished = append(actualFinished, a.gChange)
So(ct.MaxCQVote(ctx, gHost, int64(a.gChange)), ShouldEqual, 0)
case r.Status == run.Status_SUCCEEDED && a.mode == run.FullRun:
actualSubmitted = append(actualSubmitted, a.gChange)
So(ct.GFake.GetChange(gHost, a.gChange).Info.GetStatus(), ShouldEqual, gerritpb.ChangeStatus_MERGED)
default:
actualWeird = append(actualWeird, a.gChange)
}
So(r.CreateTime, ShouldEqual, datastore.RoundTime(a.triggerTime.UTC()))
So(r.EndTime, ShouldHappenAfter, a.finishTime)
}
sort.Ints(actualSubmitted)
sort.Ints(actualFailed)
sort.Ints(actualFinished)
So(actualSubmitted, ShouldResemble, expectSubmitted)
So(actualFailed, ShouldResemble, expectFailed)
So(actualFinished, ShouldResemble, expectFinished)
So(actualWeird, ShouldBeEmpty)
So(ct.LoadRunsOf(ctx, lProject), ShouldHaveLength, len(actions))
ct.LogPhase(ctx, "Wait for all BQ exports to complete")
ct.RunUntil(ctx, func() bool { return ct.ExportedBQAttemptsCount() == len(actions) })
ct.LogPhase(ctx, "Wait for all PubSub messages are sent")
ct.RunUntil(ctx, func() bool { return len(ct.RunEndedPubSubTasks()) == len(actions) })
})
}