// Copyright 2021 The LUCI Authors.
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// See the License for the specific language governing permissions and
// limitations under the License.
package e2e
import (
gerritpb ""
cfgpb ""
migrationpb ""
bbfacade ""
bblistener ""
gf ""
gerritupdater ""
pmimpl ""
adminpb ""
runbq ""
runimpl ""
cvpubsub ""
tjupdate ""
// Names of the command line flags defined by this package, the Buildbucket
// host used by the test, and the group names used by the test configs.
const (
// Flag names; each is registered with the flag package in the var block below.
dsFlakinessFlagName = "cv.dsflakiness"
tqConcurrentFlagName = "cv.tqparallel"
extraVerboseFlagName = "cv.verbose"
fastClockFlagName = "cv.fastclock"
// buildbucketHost is the host under which SetUp registers the Buildbucket
// Pub/Sub topic.
// TODO(crbug/1344711): Change to a dev host or an example host.
buildbucketHost = chromeinfra.BuildbucketHost
// Group names referenced by the MakeCfg* configs and by the Add* helpers.
committers = "committer-group"
dryRunners = "dry-runner-group"
newPatchsetRunners = "new-patchset-runner-group"
// Command line flags tuning how e2e tests are executed.
var (
// dsFlakinessFlag is read by SetUp; a non-zero value enables flaky Datastore.
dsFlakinessFlag = flag.Float64(dsFlakinessFlagName, 0, "DS flakiness probability between 0(default) and 1.0 (always fails)")
// tqParallelFlag is read by RunUntilT to switch to parallel task execution.
tqParallelFlag = flag.Bool(tqConcurrentFlagName, false, "Runs TQ tasks in parallel")
// extraVerbosityFlag makes RunUntilT log every finished task.
extraVerbosityFlag = flag.Bool(extraVerboseFlagName, false, "Extra verbose mode. Use in combination with -v")
// fastClockFlag is read by SetUp; a positive value selects a fast clock.
fastClockFlag = flag.Int(fastClockFlagName, 0, "Use FastClock running at this multiplier over physical clock")
// init raises the eventbox tombstone delay to one hour for the whole package.
func init() {
// HACK: Greatly bump up the eventbox tombstone delay. This is especially
// useful with tqParallelFlag: the fake test clock is run at a much higher
// speed than the real clock, which results in spurious stale eventbox
// listing errors.
eventbox.TombstonesDelay = time.Hour
// Test encapsulates e2e setup for a CV test.
//
// Embeds cvtesting.Test, which sets CV's dependencies and some simple CV
// components (e.g. TreeClient), while this Test focuses on setup of CV's own
// components.
//
// Typical use:
//
//	ct := Test{}
//	ctx, cancel := ct.SetUp()
//	defer cancel()
//	...
//	ct.RunUntil(ctx, func() bool { return len(ct.LoadRunsOf(ctx, "project")) > 0 })
type Test struct {
*cvtesting.Test // auto-initialized if nil
// The notifiers and servers below are populated by SetUp.
PMNotifier *prjmanager.Notifier
RunNotifier *run.Notifier
AdminServer adminpb.AdminServer
MigrationServer migrationpb.MigrationServer
// dsFlakiness enables ds flakiness for "RunUntil".
dsFlakiness float64
// dsFlakinessRand is the random source handed to the flaky Datastore filter.
dsFlakinessRand rand.Source
// tqSweepChannel is created by startTQSweeping and triggers TQ sweeps.
tqSweepChannel dispatcher.Channel
// cqdsMu is the mutex used by MustCQD around accesses to cqds.
cqdsMu sync.Mutex
// cqds are fake CQDaemons indexed by LUCI project name.
cqds map[string]*cqdfake.CQDFake
// SetUp sets up the end to end test.
//
// Delegates most of the setup to the embedded cvtesting.Test and then wires
// CV's own components: notifiers, CL mutator and updater, the Buildbucket
// Pub/Sub topic and listener, and the Migration and Admin servers.
//
// Returns the test context and a cleanup function.
// Panics if the fast clock flag is negative or the DS flakiness flag is
// outside of [0.0, 1.0].
//
// Must be called exactly once.
func (t *Test) SetUp() (context.Context, func()) {
if t.Test == nil {
t.Test = &cvtesting.Test{}
// A positive fast clock flag installs a fast test clock; a negative one is
// rejected.
switch speedUp := *fastClockFlag; {
case speedUp < 0:
panic(fmt.Errorf("invalid %s %d: must be >= 0", fastClockFlagName, speedUp))
case speedUp > 0:
t.Clock = testclock.NewFastClock(
time.Date(2020, time.February, 2, 13, 30, 00, 0, time.FixedZone("Fake local", 3*60*60)),
// Use default testclock.
// Delegate most setup to cvtesting.Test.
ctx, ctxCancel := t.Test.SetUp()
// Cleanup functions are collected in registration order.
cleanupFns := []func(){ctxCancel}
if (*dsFlakinessFlag) != 0 {
t.dsFlakiness = *dsFlakinessFlag
if t.dsFlakiness < 0 || t.dsFlakiness > 1 {
panic(fmt.Errorf("invalid %s %f: must be between 0.0 and 1.0", dsFlakinessFlagName, t.dsFlakiness))
logging.Warningf(ctx, "Using %.4f flaky Datastore", t.dsFlakiness)
// Fixed seed for the flakiness random source.
t.dsFlakinessRand = rand.NewSource(0)
cleanupFns = append(cleanupFns, t.startTQSweeping(ctx))
gFactory := t.GFactory()
t.PMNotifier = prjmanager.NewNotifier(t.TQDispatcher)
t.RunNotifier = run.NewNotifier(t.TQDispatcher)
tjNotifier := tryjob.NewNotifier(t.TQDispatcher)
clMutator := changelist.NewMutator(t.TQDispatcher, t.PMNotifier, t.RunNotifier, tjNotifier)
clUpdater := changelist.NewUpdater(t.TQDispatcher, clMutator)
bbFactory := t.BuildbucketFake.NewClientFactory()
// Create the Pub/Sub topic and subscription, register the topic with the
// Buildbucket fake, and start a listener on the subscription.
topic, sub, cleanupFn := t.makeBuildbucketPubsub(ctx)
cleanupFns = append(cleanupFns, cleanupFn)
t.BuildbucketFake.RegisterPubsubTopic(buildbucketHost, topic)
cleanupFn = bblistener.StartListenerForTest(ctx, sub, tjNotifier)
cleanupFns = append(cleanupFns, cleanupFn)
gerritupdater.RegisterUpdater(clUpdater, gFactory)
// The returned values are discarded here.
// NOTE(review): presumably these constructors are called for their side
// effects (e.g. handler registration) — confirm.
_ = pmimpl.New(t.PMNotifier, t.RunNotifier, clMutator, gFactory, clUpdater)
_ = runimpl.New(t.RunNotifier, t.PMNotifier, tjNotifier, clMutator, clUpdater, gFactory, bbFactory, t.TreeFake.Client(), t.BQFake, t.Env)
// NOTE(review): no use of bbFacade, tryjobUpdater or tryjobCancellator is
// visible in this view — confirm where they are consumed.
bbFacade := &bbfacade.Facade{
ClientFactory: bbFactory,
tryjobUpdater := tjupdate.NewUpdater(t.Env, tjNotifier, t.RunNotifier)
tryjobCancellator := tjcancel.NewCancellator(tjNotifier)
t.MigrationServer = &migration.MigrationServer{
RunNotifier: t.RunNotifier,
GFactory: gFactory,
t.AdminServer = admin.New(t.TQDispatcher, &dsmapper.Controller{}, clUpdater, t.PMNotifier, t.RunNotifier)
return ctx, func() {
// Walk the cleanup functions in reverse order of registration.
for i := len(cleanupFns) - 1; i >= 0; i-- {
// RunUntil runs TQ tasks, while stopIf returns false.
// If `dsFlakinessFlag` is set, uses flaky datastore for running TQ tasks.
// If `tqParallelFlag` is set, runs TQ tasks concurrently.
// Not goroutine safe.
func (t *Test) RunUntil(ctx context.Context, stopIf func() bool) {
t.RunUntilT(ctx, 100, stopIf)
// RunUntilT is the same as RunUntil but with a custom approximate number of
// tasks if run in serial mode.
//
// Depending on command line test options, allows a different number of task
// executions: the budget starts at 10x targetTasksCount and is multiplied
// further when flaky Datastore and/or parallel execution are enabled.
//
// Tasks are run with the (possibly flaky) task context, while logging uses
// the given ctx. Panics if the task budget is exhausted before stopIf
// returns true.
func (t *Test) RunUntilT(ctx context.Context, targetTasksCount int, stopIf func() bool) {
// Default to 10x targetTasksCount tasks s.t. test writers don't need to
// calculate exactly how many tasks they need and also to be more robust
// against future changes in CV impl which may slightly increase number of TQ
// tasks.
maxTasks := 10 * float64(targetTasksCount)
taskCtx := ctx
if t.dsFlakiness > 0 {
// Scale the budget with the flakiness, but never shrink it.
maxTasks *= math.Max(1.0, math.Round(1000*t.dsFlakiness))
taskCtx = t.flakifyDS(ctx)
if *tqParallelFlag {
// Executing tasks concurrently usually leads to Datastore and other
// contention, so allow more retries.
maxTasks *= 10
// i is compared against maxTasks in StopBefore below.
// NOTE(review): the increment of i is not visible in this view — confirm.
i := 0
tooLong := false
var finished []string
tqOpts := []tqtesting.RunOption{
// StopAfter must be first and also always return false s.t. we correctly
// record all finished tasks.
tqtesting.StopAfter(func(task *tqtesting.Task) bool {
finished = append(finished, fmt.Sprintf("%30s (attempt# %d)", task.Class, task.Attempts))
return false
// StopBefore is what is actually used for conditional stopping.
// Note that it can `return true` (meaning stop) before any task was run at
// all.
tqtesting.StopBefore(func(t *tqtesting.Task) bool {
switch {
case stopIf():
return true
case float64(i) >= maxTasks:
// Budget exhausted: remember it so that a panic is raised after the run.
tooLong = true
return true
if i%1000 == 0 {
logging.Debugf(ctx, "RunUntil is running %d task", i)
return false
if *tqParallelFlag {
tqOpts = append(tqOpts, tqtesting.ParallelExecute())
t.TQ.Run(taskCtx, tqOpts...)
// Log only here after all tasks-in-progress are completed.
outstanding := stringset.New(len(t.TQ.Tasks().Pending()))
// NOTE(review): the body of this loop is not visible in this view;
// presumably it records each pending task in outstanding — confirm.
for _, task := range t.TQ.Tasks().Pending() {
logging.Debugf(ctx, "RunUntil ran %d iterations, finished %d tasks, left %d tasks: %s", i, len(finished), len(outstanding), outstanding.ToSortedSlice())
if *extraVerbosityFlag {
// This i is a new loop variable; it shadows the task counter above.
for i, v := range finished {
logging.Debugf(ctx, " finished #%d task: %s", i, v)
if tooLong {
panic(errors.New("RunUntil ran for too long!"))
// NOTE(review): handling of a cancelled/expired ctx is not visible here.
if err := ctx.Err(); err != nil {
// MustCQD returns a CQDaemon fake for the given project, starting a new one if
// necessary, in which case it's lifetime is limited by the given context.
func (t *Test) MustCQD(ctx context.Context, luciProject string) *cqdfake.CQDFake {
defer t.cqdsMu.Unlock()
if cqd, exists := t.cqds[luciProject]; exists {
return cqd
cqd := &cqdfake.CQDFake{
LUCIProject: luciProject,
CV: t.MigrationServer,
GFake: t.GFake,
if t.cqds == nil {
t.cqds = make(map[string]*cqdfake.CQDFake, 1)
t.cqds[luciProject] = cqd
return cqd
// Methods to examine state.
// Now returns test clock time in UTC.
func (t *Test) Now() time.Time {
return t.Clock.Now().UTC()
// LoadProject returns Project entity or nil if not exists.
func (t *Test) LoadProject(ctx context.Context, lProject string) *prjmanager.Project {
p, err := prjmanager.Load(ctx, lProject)
if err != nil {
return p
// LoadRun returns Run entity or nil if not exists.
func (t *Test) LoadRun(ctx context.Context, id common.RunID) *run.Run {
r, err := run.LoadRun(ctx, id)
if err != nil {
return r
// LoadRunsOf loads all Runs of a project from Datastore.
func (t *Test) LoadRunsOf(ctx context.Context, lProject string) []*run.Run {
runs, _, err := run.ProjectQueryBuilder{Project: lProject}.LoadRuns(ctx)
if err != nil {
return runs
// LoadGerritRuns loads all Runs from Datastore which include a Gerrit CL.
func (t *Test) LoadGerritRuns(ctx context.Context, gHost string, gChange int64) []*run.Run {
cl := t.LoadGerritCL(ctx, gHost, gChange)
if cl == nil {
return nil
runs, _, err := run.CLQueryBuilder{CLID: cl.ID}.LoadRuns(ctx)
if err != nil {
return runs
// EarliestCreatedRunOf returns the earliest created Run in a project.
// If there are several such runs, may return any one of them.
// Returns nil if there are no Runs.
func (t *Test) EarliestCreatedRunOf(ctx context.Context, lProject string) *run.Run {
var earliest *run.Run
for _, r := range t.LoadRunsOf(ctx, lProject) {
if earliest == nil || earliest.CreateTime.After(r.CreateTime) {
earliest = r
return earliest
// LatestRunWithGerritCL returns the latest created Run containing given CL.
// If there are several, returns the one with latest .StartTime.
// Returns nil if there is such Runs, including if Gerrit CL isn't yet in DS.
func (t *Test) LatestRunWithGerritCL(ctx context.Context, gHost string, gChange int64) *run.Run {
var ret *run.Run
for _, r := range t.LoadGerritRuns(ctx, gHost, gChange) {
switch {
case ret == nil:
ret = r
case ret.CreateTime.After(r.CreateTime):
ret = r
case ret.CreateTime.Equal(r.CreateTime) && ret.StartTime.Before(r.StartTime):
ret = r
return ret
// LoadCL returns CL entity or nil if not exists.
func (t *Test) LoadCL(ctx context.Context, id common.CLID) *changelist.CL {
cl := &changelist.CL{ID: id}
switch err := datastore.Get(ctx, cl); {
case err == datastore.ErrNoSuchEntity:
return nil
case err != nil:
return cl
// LoadGerritCL returns CL entity or nil if not exists.
func (t *Test) LoadGerritCL(ctx context.Context, gHost string, gChange int64) *changelist.CL {
cl, err := changelist.MustGobID(gHost, gChange).Load(ctx)
if err != nil {
return cl
// MaxVote returns max vote of a Gerrit CL loaded from Gerrit fake.
// Returns 0 if there are no votes.
// Panics if CL doesn't exist.
func (t *Test) MaxVote(ctx context.Context, gHost string, gChange int64, gLabel string) int32 {
c := t.GFake.GetChange(gHost, int(gChange))
if c == nil {
panic(fmt.Errorf("%s/%d doesn't exist", gHost, gChange))
max := int32(0)
for _, v := range c.Info.GetLabels()[gLabel].GetAll() {
if v.GetValue() > max {
max = v.GetValue()
return max
// MaxCQVote returns max CQ vote of a Gerrit CL loaded from Gerrit fake.
// Returns 0 if there are no votes.
// Panics if CL doesn't exist.
func (t *Test) MaxCQVote(ctx context.Context, gHost string, gChange int64) int32 {
return t.MaxVote(ctx, gHost, gChange, trigger.CQLabelName)
// LastMessage returns the last message posted on a Gerrit CL from Gerrit fake.
// Returns nil if there are no messages.
// Panics if the CL doesn't exist.
func (t *Test) LastMessage(gHost string, gChange int64) *gerritpb.ChangeMessageInfo {
return gf.LastMessage(t.GFake.GetChange(gHost, int(gChange)).Info)
// ExportedBQAttemptsCount returns number of exported CQ Attempts.
func (t *Test) ExportedBQAttemptsCount() int {
return t.BQFake.RowsCount("", runbq.CVDataset, runbq.CVTable)
// MigrationFetchActiveRuns fetches active Run(s).
func (t *Test) MigrationFetchActiveRuns(ctx context.Context, project string) []*migrationpb.ActiveRun {
req := &migrationpb.FetchActiveRunsRequest{LuciProject: project}
res, err := t.MigrationServer.FetchActiveRuns(t.MigrationContext(ctx), req)
if err != nil {
return res.GetActiveRuns()
// MigrationContext returns context authorized to call to the Migration API.
func (t *Test) MigrationContext(ctx context.Context) context.Context {
return auth.WithState(ctx, &authtest.FakeState{
Identity: "",
IdentityGroups: []string{migration.AllowGroup},
// AddCommitter adds a given member into the committer group.
func (t *Test) AddCommitter(email string) {
t.AddMember(email, committers)
// AddDryRunner adds a given member into the dry-runner group.
func (t *Test) AddDryRunner(email string) {
t.AddMember(email, dryRunners)
// AddNewPatchsetRunner adds a given member into the new-patchset-runner group.
func (t *Test) AddNewPatchsetRunner(email string) {
t.AddMember(email, newPatchsetRunners)
// LogPhase emits easy to recognize log like
// ===========================
// PHASE: ....
// ===========================
func (t *Test) LogPhase(ctx context.Context, format string, args ...interface{}) {
line := strings.Repeat("=", 80)
format = fmt.Sprintf("\n%s\nPHASE: %s\n%s", line, format, line)
logging.Debugf(ctx, format, args...)
// MakeCfgSingular returns a project config with a single ConfigGroup.
//
// The group is named cgName and watches refs matching gRef of repo gRepo on
// "https://<gHost>/". The Gerrit CQ ability verifier is configured with the
// committers, dryRunners and newPatchsetRunners groups of this package.
func MakeCfgSingular(cgName, gHost, gRepo, gRef string) *cfgpb.Config {
return &cfgpb.Config{
ConfigGroups: []*cfgpb.ConfigGroup{
Name: cgName,
Gerrit: []*cfgpb.ConfigGroup_Gerrit{
Url: "https://" + gHost + "/",
Projects: []*cfgpb.ConfigGroup_Gerrit_Project{
Name: gRepo,
RefRegexp: []string{gRef},
Verifiers: &cfgpb.Verifiers{
GerritCqAbility: &cfgpb.Verifiers_GerritCQAbility{
// Access is granted via the groups managed by the Add* helpers.
CommitterList: []string{committers},
DryRunAccessList: []string{dryRunners},
NewPatchsetRunAccessList: []string{newPatchsetRunners},
// MakeCfgCombinable returns a project config with a combinable ConfigGroup.
//
// The group is named cgName and watches refs matching gRef of repo gRepo on
// "https://<gHost>/". CL combining is enabled with a 5 minute stabilization
// delay.
//
// NOTE(review): unlike MakeCfgSingular, no NewPatchsetRunAccessList is set
// here — confirm this is intended.
func MakeCfgCombinable(cgName, gHost, gRepo, gRef string) *cfgpb.Config {
return &cfgpb.Config{
ConfigGroups: []*cfgpb.ConfigGroup{
Name: cgName,
Gerrit: []*cfgpb.ConfigGroup_Gerrit{
Url: "https://" + gHost + "/",
Projects: []*cfgpb.ConfigGroup_Gerrit_Project{
Name: gRepo,
RefRegexp: []string{gRef},
CombineCls: &cfgpb.CombineCLs{
StabilizationDelay: durationpb.New(5 * time.Minute),
Verifiers: &cfgpb.Verifiers{
GerritCqAbility: &cfgpb.Verifiers_GerritCQAbility{
// Access is granted via the groups managed by the Add* helpers.
CommitterList: []string{committers},
DryRunAccessList: []string{dryRunners},
// DS flakiness & TQ sweep implementation.
// flakifyDS returns context with flaky Datastore.
//
// Two sets of flakiness parameters are visible below, both drawing from
// t.dsFlakinessRand: one with a deadline probability of t.dsFlakiness and one
// with 5x that probability (capped at 1.0), which per the comments is meant
// for mutations.
// NOTE(review): the calls which install these parameters on the feature
// breaker are not visible in this view — confirm which Datastore operations
// each set applies to.
func (t *Test) flakifyDS(ctx context.Context) context.Context {
ctx, fb := featureBreaker.FilterRDS(ctx, nil)
Rand: t.dsFlakinessRand,
DeadlineProbability: t.dsFlakiness,
// NOTE: Feature breaker currently doesn't allow simulating
// an error from actually successful DeleteMulti/PutMulti, a.k.a. submarine
// writes. However, the CommitTransaction feature breaker is simulating
// returning an error from an actually successful transaction, which makes
// ConcurrentTransactionProbability incorrectly simulated.
//
// NOTE: A transaction with 1 Get and 1 Put will roll a dice 4 times:
// BeginTransaction, GetMulti, PutMulti, CommitTransaction.
// However, in Cloud Datastore client, PutMulti within transaction doesn't
// reach Datastore until the end of the transaction.
//
// TODO(tandrii): make realistic feature breaker with submarine writes outside
// of transaction and easier to control probabilities of transaction breaker.
//
// For now, use 5x higher probability of failure for mutations,
// which for simple Get/Put transactions results (2x + 10x) higher probability
// than a non-transactional Get.
Rand: t.dsFlakinessRand,
// The probability is capped at 1.0.
DeadlineProbability: math.Min(t.dsFlakiness*5, 1.0),
ConcurrentTransactionProbability: 0,
return ctx
// startTQSweeping starts asynchronous sweeping for the duration of the test.
//
// Installs an in-process sweeper on the TQ dispatcher and creates
// t.tqSweepChannel, whose handler calls t.TQDispatcher.Sweep with the given
// ctx. The returned function closes and drains that channel.
//
// This is necessary if flaky DS is used.
func (t *Test) startTQSweeping(ctx context.Context) (deferme func()) {
t.TQDispatcher.Sweeper = tq.NewInProcSweeper(tq.InProcSweeperOptions{
SweepShards: 1,
SubmitBatchSize: 1,
var err error
t.tqSweepChannel, err = dispatcher.NewChannel(
Buffer: buffer.Options{
BatchItemsMax: 1, // incoming event => sweep ASAP.
MaxLeases: 1, // at most 1 sweep concurrently
// 2+ outstanding requests to sweep should result in just 1 sweep.
FullBehavior: &buffer.DropOldestBatch{MaxLiveItems: 1},
// This is only useful if something is misconfigured to avoid pointless
// retries, because the individual sweeps must not fail as we use
// non-flaky Datastore for sweeping.
Retry: retry.None,
// Every batch triggers one sweep using the ctx given to startTQSweeping.
func(*buffer.Batch) error { return t.TQDispatcher.Sweep(ctx) },
// NOTE(review): the handling of a channel creation error is not visible in
// this view.
if err != nil {
return func() { t.tqSweepChannel.CloseAndDrain(ctx) }
// enqueueTQSweep ensures a TQ sweep will happen strictly afterwards.
// Noop if TQ sweeping is not required.
func (t *Test) enqueueTQSweep(ctx context.Context) {
if t.TQDispatcher.Sweeper != nil {
t.tqSweepChannel.C <- struct{}{}
// RunEndedPubSubTasks returns all the succeeded TQ tasks with RunEnded pubsub
// events.
func (t *Test) RunEndedPubSubTasks() tqtesting.TaskList {
return t.SucceededTQTasks.Filter(func(t *tqtesting.Task) bool {
_, ok := t.Payload.(*cvpubsub.PublishRunEndedTask)
return ok
// makeBuildbucketPubsub starts a pstest server, creates a Pub/Sub client for
// the test's cloud project, and then creates the "bb-build" topic and a
// subscription to it named bblistener.SubscriptionID.
//
// Returns the topic, the subscription and a cleanup function which closes the
// client and the server, ignoring their errors.
func (t *Test) makeBuildbucketPubsub(ctx context.Context) (*pubsub.Topic, *pubsub.Subscription, func()) {
srv := pstest.NewServer()
// NOTE(review): the remaining client arguments are not visible in this view;
// presumably they point the client at srv — confirm.
client, err := pubsub.NewClient(ctx, t.Env.GAEInfo.CloudProject,
// NOTE(review): the bodies of the error checks below are not visible in this
// view.
if err != nil {
topic, err := client.CreateTopic(ctx, "bb-build")
if err != nil {
sub, err := client.CreateSubscription(ctx, bblistener.SubscriptionID, pubsub.SubscriptionConfig{Topic: topic})
if err != nil {
return topic, sub, func() {
// Best-effort teardown: close errors are deliberately discarded.
_ = client.Close()
_ = srv.Close()