// Copyright 2021 The LUCI Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package changelist

import (
"context"
"fmt"
"math/rand"
"sync"
"testing"
"time"

"golang.org/x/sync/errgroup"
"google.golang.org/protobuf/types/known/timestamppb"

"go.chromium.org/luci/common/errors"
gerritpb "go.chromium.org/luci/common/proto/gerrit"
"go.chromium.org/luci/common/retry/transient"
"go.chromium.org/luci/common/testing/ftt"
"go.chromium.org/luci/common/testing/truth/assert"
"go.chromium.org/luci/common/testing/truth/should"
"go.chromium.org/luci/gae/filter/featureBreaker"
"go.chromium.org/luci/gae/filter/featureBreaker/flaky"
"go.chromium.org/luci/gae/service/datastore"
"go.chromium.org/luci/server/tq/tqtesting"

"go.chromium.org/luci/cv/internal/common"
"go.chromium.org/luci/cv/internal/cvtesting"
)

func TestMutatorSingleCL(t *testing.T) {
t.Parallel()
ftt.Run("Mutator works on a single CL", t, func(t *ftt.Test) {
ct := cvtesting.Test{}
ctx := ct.SetUp(t)
const lProject = "infra"
const run1 = lProject + "/1"
const run2 = lProject + "/2"
const gHost = "x-review.example.com"
const gChange = 44
eid := MustGobID(gHost, gChange)
pm := pmMock{}
rm := rmMock{}
tj := tjMock{}
m := NewMutator(ct.TQDispatcher, &pm, &rm, &tj)
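// execBatchOnCLUpdatedTask asserts that exactly one BatchOnCLUpdatedTask was
// enqueued and runs it.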
execBatchOnCLUpdatedTask := func() {
assert.Loosely(t, ct.TQ.Tasks(), should.HaveLength(1))
assert.Loosely(t, ct.TQ.Tasks()[0].Class, should.Resemble(BatchOnCLUpdatedTaskClass))
ct.TQ.Run(ctx, tqtesting.StopAfterTask(BatchOnCLUpdatedTaskClass))
}
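// expectNoNotifications asserts that no TQ tasks were enqueued and that no
// Project, Run, or tryjob notifications were recorded.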
expectNoNotifications := func() {
assert.Loosely(t, ct.TQ.Tasks(), should.HaveLength(0))
assert.Loosely(t, pm.byProject, should.BeEmpty)
assert.Loosely(t, rm.byRun, should.BeEmpty)
assert.Loosely(t, tj.clsNotified, should.BeEmpty)
}
t.Run("Upsert method", func(t *ftt.Test) {
t.Run("creates", func(t *ftt.Test) {
s := makeSnapshot(lProject, ct.Clock.Now())
cl, err := m.Upsert(ctx, lProject, eid, func(cl *CL) error {
cl.Snapshot = s
return nil
})
assert.NoErr(t, err)
assert.That(t, cl.ExternalID, should.Equal(eid))
assert.Loosely(t, cl.EVersion, should.Equal(1))
assert.Loosely(t, cl.UpdateTime, should.Match(ct.Clock.Now()))
assert.Loosely(t, cl.RetentionKey, should.NotBeEmpty)
assert.Loosely(t, cl.Snapshot, should.Resemble(s))
execBatchOnCLUpdatedTask()
assert.Loosely(t, pm.byProject, should.Resemble(map[string]map[common.CLID]int64{
lProject: {cl.ID: cl.EVersion},
}))
assert.Loosely(t, rm.byRun, should.BeEmpty)
assert.Loosely(t, tj.clsNotified, should.BeEmpty)
})
t.Run("skips creation", func(t *ftt.Test) {
// This is a special case which isn't supposed to be needed,
// but it's kept here for completeness.
cl, err := m.Upsert(ctx, lProject, eid, func(cl *CL) error {
return ErrStopMutation
})
assert.NoErr(t, err)
assert.Loosely(t, cl, should.BeNil)
expectNoNotifications()
})
t.Run("updates", func(t *ftt.Test) {
s1 := makeSnapshot(lProject, ct.Clock.Now())
cl := eid.MustCreateIfNotExists(ctx)
cl.Snapshot = s1
cl.IncompleteRuns = common.MakeRunIDs(run1)
assert.Loosely(t, datastore.Put(ctx, cl), should.BeNil)
ct.Clock.Add(time.Second)
s2 := makeSnapshot(lProject, ct.Clock.Now())
s2.MinEquivalentPatchset++
var priorSnapshot *Snapshot
cl, err := m.Upsert(ctx, lProject, eid, func(cl *CL) error {
if priorSnapshot == nil {
priorSnapshot = cl.Snapshot
}
cl.Snapshot = s2
cl.IncompleteRuns.InsertSorted(run2) // idempotent
return nil
})
assert.NoErr(t, err)
assert.Loosely(t, priorSnapshot, should.Resemble(s1))
assert.That(t, cl.ExternalID, should.Equal(eid))
assert.Loosely(t, cl.EVersion, should.Equal(2))
assert.That(t, cl.UpdateTime, should.Match(ct.Clock.Now().UTC()))
assert.Loosely(t, cl.RetentionKey, should.NotBeEmpty)
assert.Loosely(t, cl.Snapshot, should.Resemble(s2))
assert.Loosely(t, tj.clsNotified, should.HaveLength(1))
for clid, content := range tj.clsNotified {
assert.That(t, clid, should.Equal(cl.ID))
assert.That(t, content.prevMinEquiPS, should.Equal(s1.GetMinEquivalentPatchset()))
assert.That(t, content.curMinEquiPS, should.Equal(s2.GetMinEquivalentPatchset()))
}
execBatchOnCLUpdatedTask()
assert.Loosely(t, pm.byProject, should.Resemble(map[string]map[common.CLID]int64{
lProject: {cl.ID: cl.EVersion},
}))
assert.Loosely(t, rm.byRun, should.Resemble(map[common.RunID]map[common.CLID]int64{
run1: {cl.ID: cl.EVersion},
run2: {cl.ID: cl.EVersion},
}))
})
t.Run("skips an update", func(t *ftt.Test) {
priorCL := eid.MustCreateIfNotExists(ctx)
ct.Clock.Add(time.Second)
cl, err := m.Upsert(ctx, lProject, eid, func(cl *CL) error {
return ErrStopMutation
})
assert.NoErr(t, err)
assert.That(t, cl.ExternalID, should.Equal(eid))
assert.That(t, cl.EVersion, should.Equal(priorCL.EVersion))
assert.That(t, cl.UpdateTime, should.Match(priorCL.UpdateTime))
assert.That(t, cl.RetentionKey, should.Equal(priorCL.RetentionKey))
assert.Loosely(t, pm.byProject, should.BeEmpty)
assert.Loosely(t, rm.byRun, should.BeEmpty)
assert.Loosely(t, tj.clsNotified, should.BeEmpty)
})
t.Run("propagates error without wrapping", func(t *ftt.Test) {
myErr := errors.New("my error")
_, err := m.Upsert(ctx, lProject, eid, func(cl *CL) error {
return myErr
})
assert.That(t, myErr, should.Equal(err))
})
})
t.Run("Update method", func(t *ftt.Test) {
t.Run("updates", func(t *ftt.Test) {
s1 := makeSnapshot("prior-project", ct.Clock.Now())
priorCL := eid.MustCreateIfNotExists(ctx)
priorCL.Snapshot = s1
assert.Loosely(t, datastore.Put(ctx, priorCL), should.BeNil)
ct.Clock.Add(time.Second)
s2 := makeSnapshot(lProject, ct.Clock.Now())
s2.MinEquivalentPatchset++
cl, err := m.Update(ctx, lProject, priorCL.ID, func(cl *CL) error {
cl.Snapshot = s2
return nil
})
assert.NoErr(t, err)
assert.Loosely(t, cl.ID, should.Resemble(priorCL.ID))
assert.That(t, cl.ExternalID, should.Equal(eid))
assert.Loosely(t, cl.EVersion, should.Equal(2))
assert.That(t, cl.UpdateTime, should.Match(ct.Clock.Now().UTC()))
assert.Loosely(t, cl.RetentionKey, should.NotBeEmpty)
assert.Loosely(t, cl.Snapshot, should.Resemble(s2))
execBatchOnCLUpdatedTask()
assert.That(t, pm.byProject[lProject][cl.ID], should.Equal(cl.EVersion))
assert.Loosely(t, pm.byProject, should.Resemble(map[string]map[common.CLID]int64{
"prior-project": {cl.ID: cl.EVersion},
lProject: {cl.ID: cl.EVersion},
}))
assert.Loosely(t, rm.byRun, should.BeEmpty)
assert.Loosely(t, tj.clsNotified, should.HaveLength(1))
for clid, content := range tj.clsNotified {
assert.That(t, clid, should.Equal(cl.ID))
assert.That(t, content.prevMinEquiPS, should.Equal(s1.GetMinEquivalentPatchset()))
assert.That(t, content.curMinEquiPS, should.Equal(s2.GetMinEquivalentPatchset()))
}
})
t.Run("schedule tryjob cancel for abandoned CL", func(t *ftt.Test) {
s1 := makeSnapshot(lProject, ct.Clock.Now())
cl := eid.MustCreateIfNotExists(ctx)
cl.Snapshot = s1
assert.Loosely(t, datastore.Put(ctx, cl), should.BeNil)
ct.Clock.Add(time.Second)
s2 := makeSnapshot(lProject, ct.Clock.Now())
s2.GetGerrit().GetInfo().Status = gerritpb.ChangeStatus_ABANDONED
cl, err := m.Update(ctx, lProject, cl.ID, func(cl *CL) error {
cl.Snapshot = s2
return nil
})
assert.NoErr(t, err)
assert.Loosely(t, cl.Snapshot, should.Resemble(s2))
assert.Loosely(t, tj.clsNotified, should.HaveLength(1))
for clid, content := range tj.clsNotified {
assert.That(t, clid, should.Equal(cl.ID))
assert.That(t, content.prevMinEquiPS, should.Equal(s2.GetMinEquivalentPatchset()))
assert.That(t, content.curMinEquiPS, should.Equal(s2.GetPatchset()+1))
}
})
t.Run("skips an actual update", func(t *ftt.Test) {
priorCL := eid.MustCreateIfNotExists(ctx)
ct.Clock.Add(time.Second)
cl, err := m.Update(ctx, lProject, priorCL.ID, func(cl *CL) error {
return ErrStopMutation
})
assert.NoErr(t, err)
assert.Loosely(t, cl.ID, should.Resemble(priorCL.ID))
assert.That(t, cl.ExternalID, should.Equal(eid))
assert.That(t, cl.EVersion, should.Equal(priorCL.EVersion))
assert.That(t, cl.UpdateTime, should.Match(priorCL.UpdateTime))
assert.That(t, cl.RetentionKey, should.Equal(priorCL.RetentionKey))
expectNoNotifications()
})
t.Run("propagates error without wrapping", func(t *ftt.Test) {
priorCL := eid.MustCreateIfNotExists(ctx)
myErr := errors.New("my error")
_, err := m.Update(ctx, lProject, priorCL.ID, func(cl *CL) error {
return myErr
})
assert.That(t, myErr, should.Equal(err))
})
t.Run("errors out non-transiently if CL doesn't exist", func(t *ftt.Test) {
_, err := m.Update(ctx, lProject, 123, func(cl *CL) error {
panic("must not be called")
})
assert.That(t, err, should.ErrLikeError(datastore.ErrNoSuchEntity))
assert.Loosely(t, transient.Tag.In(err), should.BeFalse)
})
})
t.Run("Invalid MutationCallback", func(t *ftt.Test) {
type badCallback func(cl *CL)
cases := func(kind string, repro func(bad badCallback) error) {
t.Run(kind, func(t *ftt.Test) {
assert.Loosely(t, repro(func(cl *CL) { cl.EVersion = 2 }), should.ErrLike("CL.EVersion"))
assert.Loosely(t, repro(func(cl *CL) { cl.UpdateTime = ct.Clock.Now() }), should.ErrLike("CL.UpdateTime"))
assert.Loosely(t, repro(func(cl *CL) { cl.ID++ }), should.ErrLike("CL.ID"))
assert.Loosely(t, repro(func(cl *CL) { cl.ExternalID = "don't do this" }), should.ErrLike("CL.ExternalID"))
})
}
cases("Upsert creation", func(bad badCallback) error {
_, err := m.Upsert(ctx, lProject, eid, func(cl *CL) error {
bad(cl)
return nil
})
return err
})
cases("Upsert update", func(bad badCallback) error {
eid.MustCreateIfNotExists(ctx)
ct.Clock.Add(time.Second)
_, err := m.Upsert(ctx, lProject, eid, func(cl *CL) error {
bad(cl)
return nil
})
return err
})
cases("Update", func(bad badCallback) error {
cl := eid.MustCreateIfNotExists(ctx)
ct.Clock.Add(time.Second)
_, err := m.Update(ctx, lProject, cl.ID, func(cl *CL) error {
bad(cl)
return nil
})
return err
})
})
})
}

func TestMutatorBatch(t *testing.T) {
t.Parallel()
ftt.Run("Mutator works on batch of CLs", t, func(t *ftt.Test) {
ct := cvtesting.Test{}
ctx := ct.SetUp(t)
const lProjectAlt = "alt"
const lProject = "infra"
const run1 = lProject + "/1"
const run2 = lProject + "/2"
const run3 = lProject + "/3"
const gHost = "x-review.example.com"
const gChangeFirst = 100000
const N = 12
pm := pmMock{}
rm := rmMock{}
tj := tjMock{}
m := NewMutator(ct.TQDispatcher, &pm, &rm, &tj)
t.Run(fmt.Sprintf("with %d CLs already in Datastore", N), func(t *ftt.Test) {
var clids common.CLIDs
var expectedAltProject, expectedRun1, expectedRun2 common.CLIDs
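// Create N CLs with Gerrit change numbers gChangeFirst..gChangeFirst+N-1:
// even changes get a snapshot in lProjectAlt, every 3rd change is on run1,
// and every 5th change is on run2.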
for gChange := gChangeFirst; gChange < gChangeFirst+N; gChange++ {
cl := MustGobID(gHost, int64(gChange)).MustCreateIfNotExists(ctx)
clids = append(clids, cl.ID)
if gChange%2 == 0 {
cl.Snapshot = makeSnapshot(lProjectAlt, ct.Clock.Now())
expectedAltProject = append(expectedAltProject, cl.ID)
} else {
cl.Snapshot = makeSnapshot(lProject, ct.Clock.Now())
}
if gChange%3 == 0 {
cl.IncompleteRuns = append(cl.IncompleteRuns, run1)
expectedRun1 = append(expectedRun1, cl.ID)
}
if gChange%5 == 0 {
cl.IncompleteRuns = append(cl.IncompleteRuns, run2)
expectedRun2 = append(expectedRun2, cl.ID)
}
// Ensure each CL has a unique EVersion later on.
cl.EVersion = int64(10 * gChange)
assert.Loosely(t, datastore.Put(ctx, cl), should.BeNil)
}
ct.Clock.Add(time.Minute)
// In all cases below, run3 is added to the IncompleteRuns of every CL.
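// verify checks that the returned CLs match what is stored in Datastore and
// that Project and Run manager notifications carry the expected EVersions.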
verify := func(resCLs []*CL) {
// Ensure the returned CLs are exactly what was stored in Datastore,
// and compute the EVersion map at the same time.
dsCLs, err := LoadCLsByIDs(ctx, clids)
assert.NoErr(t, err)
eversions := make(map[common.CLID]int64, len(dsCLs))
for i := range dsCLs {
assert.Loosely(t, dsCLs[i].IncompleteRuns.ContainsSorted(run3), should.BeTrue)
assert.That(t, dsCLs[i].ID, should.Equal(resCLs[i].ID))
assert.That(t, dsCLs[i].EVersion, should.Equal(resCLs[i].EVersion))
assert.That(t, dsCLs[i].UpdateTime, should.Match(resCLs[i].UpdateTime))
assert.That(t, dsCLs[i].RetentionKey, should.Equal(resCLs[i].RetentionKey))
assert.Loosely(t, dsCLs[i].IncompleteRuns, should.Resemble(resCLs[i].IncompleteRuns))
eversions[dsCLs[i].ID] = dsCLs[i].EVersion
}
// Ensure Project and Run managers were notified correctly.
assertNotified := func(actual map[common.CLID]int64, expectedIDs common.CLIDs) {
expected := make(map[common.CLID]int64, len(expectedIDs))
for _, id := range expectedIDs {
expected[id] = eversions[id]
}
assert.Loosely(t, actual, should.Resemble(expected))
}
// The project in the context of which CLs were mutated must be notified
// on all CLs.
assertNotified(pm.byProject[lProject], clids)
// Ditto for run3, which was added to all CLs.
assertNotified(rm.byRun[run3], clids)
// Others must be notified only on the relevant CLs.
assertNotified(pm.byProject[lProjectAlt], expectedAltProject)
assertNotified(rm.byRun[run1], expectedRun1)
assertNotified(rm.byRun[run2], expectedRun2)
assert.Loosely(t, tj.clsNotified, should.BeEmpty)
}
t.Run("BeginBatch + FinalizeBatch", func(t *ftt.Test) {
var resCLs []*CL
transErr := datastore.RunInTransaction(ctx, func(ctx context.Context) error {
resCLs = nil // reset in case of retries
muts, err := m.BeginBatch(ctx, lProject, clids)
assert.NoErr(t, err)
eg, _ := errgroup.WithContext(ctx)
for i := range muts {
mut := muts[i]
eg.Go(func() error {
mut.CL.IncompleteRuns = append(mut.CL.IncompleteRuns, run3)
return nil
})
}
assert.Loosely(t, eg.Wait(), should.BeNil)
resCLs, err = m.FinalizeBatch(ctx, muts)
return err
}, nil)
assert.Loosely(t, transErr, should.BeNil)
// Execute the expected BatchOnCLUpdatedTask.
assert.Loosely(t, ct.TQ.Tasks(), should.HaveLength(1))
ct.TQ.Run(ctx, tqtesting.StopWhenDrained())
verify(resCLs)
})
t.Run("Manual Adopt + FinalizeBatch", func(t *ftt.Test) {
var resCLs []*CL
transErr := datastore.RunInTransaction(ctx, func(ctx context.Context) error {
resCLs = nil // reset in case of retries
muts := make([]*CLMutation, len(clids))
eg, egCtx := errgroup.WithContext(ctx)
for i, id := range clids {
i, id := i, id
eg.Go(func() error {
cl := &CL{ID: id}
if err := datastore.Get(egCtx, cl); err != nil {
return err
}
muts[i] = m.Adopt(ctx, lProject, cl)
muts[i].CL.IncompleteRuns = append(muts[i].CL.IncompleteRuns, run3)
return nil
})
}
assert.Loosely(t, eg.Wait(), should.BeNil)
var err error
resCLs, err = m.FinalizeBatch(ctx, muts)
return err
}, nil)
assert.Loosely(t, transErr, should.BeNil)
// Execute the expected BatchOnCLUpdatedTask.
assert.Loosely(t, ct.TQ.Tasks(), should.HaveLength(1))
ct.TQ.Run(ctx, tqtesting.StopWhenDrained())
verify(resCLs)
})
t.Run("BeginBatch + manual finalization", func(t *ftt.Test) {
// This is inefficient and really shouldn't be done in production.
var resCLs []*CL
transErr := datastore.RunInTransaction(ctx, func(ctx context.Context) error {
resCLs = make([]*CL, len(clids)) // reset in case of retries
muts, err := m.BeginBatch(ctx, lProject, clids)
assert.NoErr(t, err)
eg, egCtx := errgroup.WithContext(ctx)
for i, mut := range muts {
i, mut := i, mut
eg.Go(func() error {
mut.CL.IncompleteRuns = append(mut.CL.IncompleteRuns, run3)
var err error
resCLs[i], err = mut.Finalize(egCtx)
return err
})
}
return eg.Wait()
}, nil)
assert.Loosely(t, transErr, should.BeNil)
tasks := ct.TQ.Tasks()
assert.Loosely(t, tasks, should.HaveLength(N))
for _, tsk := range tasks {
assert.Loosely(t, tsk.Class, should.Resemble(BatchOnCLUpdatedTaskClass))
ct.TQ.Run(ctx, tqtesting.StopAfterTask(BatchOnCLUpdatedTaskClass))
}
verify(resCLs)
})
})
})
}

func TestMutatorConcurrent(t *testing.T) {
t.Parallel()
ftt.Run("Mutator works on single CL when called concurrently with flaky datastore", t, func(t *ftt.Test) {
ct := cvtesting.Test{}
ctx := ct.SetUp(t)
// Truncate to seconds to reduce noise in diffs of proto timestamps.
// Use a Seconds value with lots of trailing zeros so that assertion
// failures, which compare protos, are easy to read.
epoch := (&timestamppb.Timestamp{Seconds: 14500000000}).AsTime()
ct.Clock.Set(epoch)
const lProject = "infra"
const gHost = "x-review.example.com"
const gChange = 44
eid := MustGobID(gHost, gChange)
pm := pmMock{}
rm := rmMock{}
tj := tjMock{}
m := NewMutator(ct.TQDispatcher, &pm, &rm, &tj)
ctx, fb := featureBreaker.FilterRDS(ctx, nil)
// Use a single random source for all flaky.Errors(...) instances. Otherwise,
// each instance would repeat the same random pattern.
rnd := rand.NewSource(0)
// Make datastore very faulty.
fb.BreakFeaturesWithCallback(
flaky.Errors(flaky.Params{
Rand: rnd,
DeadlineProbability: 0.4,
ConcurrentTransactionProbability: 0.4,
}),
featureBreaker.DatastoreFeatures...,
)
// Number of tries per worker.
// With probabilities above, it typically takes <60 tries.
//
// This value was set to 300 before 2024-01-16 and it flaked once, so
// let's increase it to 30000.
const R = 30000
// Number of workers.
const N = 20
wg := sync.WaitGroup{}
wg.Add(N)
for d := 0; d < N; d++ {
// Simulate each worker trying to update Snapshot and Access to at least a
// pre-determined timestamp.
// For extra coverage, use different timestamps for them.
// For p co-prime with N:
//   assert sorted(set([((p*d)%N) for d in xrange(N)])) == range(N)
// 47 and 73 are actual primes.
snapTS := epoch.Add(time.Second * time.Duration((47*d)%N))
accTS := epoch.Add(time.Second * time.Duration((73*d)%N))
go func() {
defer wg.Done()
snap := makeSnapshot(lProject, snapTS)
acc := makeAccess(lProject, accTS)
var err error
for i := 0; i < R; i++ {
// Make this a little more robust against flakiness by sleeping for a
// millisecond every so often.
if i%1000 == 0 {
time.Sleep(1 * time.Millisecond)
}
_, err = m.Upsert(ctx, lProject, eid, func(cl *CL) error {
ret := ErrStopMutation
if upTime := cl.Snapshot.GetExternalUpdateTime(); upTime == nil || upTime.AsTime().Before(snapTS) {
cl.Snapshot = snap
ret = nil
}
if upTime := cl.Access.GetByProject()[lProject].GetUpdateTime(); upTime == nil || upTime.AsTime().Before(accTS) {
cl.Access = acc
ret = nil
}
return ret
})
if err == nil {
t.Logf("succeeded after %d tries", i)
return
}
}
panic(errors.Annotate(err, "all %d tries exhausted", R).Err())
}()
}
wg.Wait()
// "Fix" datastore, letting us examine it.
fb.BreakFeaturesWithCallback(
func(context.Context, string) error { return nil },
featureBreaker.DatastoreFeatures...,
)
cl, err := eid.Load(ctx)
assert.NoErr(t, err)
assert.Loosely(t, cl, should.NotBeNil)
// Since all workers have succeeded, the latest snapshot
// (by ExternalUpdateTime) must be the current snapshot in datastore.
latestTS := epoch.Add((N - 1) * time.Second)
assert.That(t, cl.Snapshot.GetExternalUpdateTime().AsTime(), should.Match(latestTS))
assert.Loosely(t, cl.Access.GetByProject()[lProject].GetUpdateTime().AsTime(), should.Resemble(latestTS))
// Furthermore, there must have been at most N non-noop Upsert calls
// (one per worker, and exactly N only if the workers happened to run in
// increasing order of their timestamps).
t.Logf("%d updates done", cl.EVersion)
assert.Loosely(t, cl.EVersion, should.BeLessThan(N+1))
})
}
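
// makeAccess returns an Access record marking the CL as inaccessible to the
// given LUCI project since updatedTime.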
func makeAccess(luciProject string, updatedTime time.Time) *Access {
return &Access{ByProject: map[string]*Access_Project{
luciProject: {
NoAccess: true,
NoAccessTime: timestamppb.New(updatedTime),
UpdateTime: timestamppb.New(updatedTime),
},
}}
}
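
// pmMock mocks the Project Manager notifier, recording the highest CL
// EVersion it was notified about, keyed by LUCI project and CLID.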
type pmMock struct {
m sync.Mutex
byProject map[string]map[common.CLID]int64 // latest max EVersion
}

func (p *pmMock) NotifyCLsUpdated(ctx context.Context, project string, events *CLUpdatedEvents) error {
p.m.Lock()
defer p.m.Unlock()
if p.byProject == nil {
p.byProject = make(map[string]map[common.CLID]int64, 1)
}
m := p.byProject[project]
if m == nil {
m = make(map[common.CLID]int64, len(events.GetEvents()))
p.byProject[project] = m
}
for _, e := range events.GetEvents() {
clid := common.CLID(e.GetClid())
m[clid] = max(m[clid], e.GetEversion())
}
return nil
}
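
// rmMock mocks the Run Manager notifier, recording the highest CL EVersion
// it was notified about, keyed by Run ID and CLID.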
type rmMock struct {
m sync.Mutex
byRun map[common.RunID]map[common.CLID]int64 // latest max EVersion
}

func (r *rmMock) NotifyCLsUpdated(ctx context.Context, rid common.RunID, events *CLUpdatedEvents) error {
r.m.Lock()
defer r.m.Unlock()
if r.byRun == nil {
r.byRun = make(map[common.RunID]map[common.CLID]int64, 1)
}
m := r.byRun[rid]
if m == nil {
m = make(map[common.CLID]int64, 1)
r.byRun[rid] = m
}
for _, e := range events.GetEvents() {
clid := common.CLID(e.GetClid())
m[clid] = max(m[clid], e.GetEversion())
}
return nil
}
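
// tjMock mocks the tryjob updater, recording the min-equivalent patchset
// range for which stale tryjob cancellation was scheduled, keyed by CLID.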
type tjMock struct {
clsNotified map[common.CLID]struct {
prevMinEquiPS, curMinEquiPS int32
}
mutex sync.Mutex
}
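
// ScheduleCancelStale records the min-equivalent patchset range passed for
// the given CL.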
func (t *tjMock) ScheduleCancelStale(ctx context.Context, clid common.CLID, prevMinEquivalentPatchset, currentMinEquivalentPatchset int32, eta time.Time) error {
t.mutex.Lock()
defer t.mutex.Unlock()
if t.clsNotified == nil {
t.clsNotified = make(map[common.CLID]struct {
prevMinEquiPS int32
curMinEquiPS int32
})
}
t.clsNotified[clid] = struct {
prevMinEquiPS int32
curMinEquiPS int32
}{
prevMinEquiPS: prevMinEquivalentPatchset,
curMinEquiPS: currentMinEquivalentPatchset,
}
return nil
}