blob: 10d0992a5a5732e704cc673f133c59374227d0a5 [file] [log] [blame]
// Copyright 2021 The LUCI Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package submit
import (
"context"
"testing"
"time"
"google.golang.org/protobuf/types/known/durationpb"
"go.chromium.org/luci/common/clock"
"go.chromium.org/luci/gae/service/datastore"
cfgpb "go.chromium.org/luci/cv/api/config/v2"
"go.chromium.org/luci/cv/internal/common"
"go.chromium.org/luci/cv/internal/cvtesting"
. "github.com/smartystreets/goconvey/convey"
)
func TestQueue(t *testing.T) {
t.Parallel()
Convey("Queue", t, func() {
ct := cvtesting.Test{}
ctx, cancel := ct.SetUp()
defer cancel()
notifier := &fakeNotifier{}
const lProject = "lProject"
run1 := common.MakeRunID(lProject, clock.Now(ctx), 1, []byte("deaddead"))
run2 := common.MakeRunID(lProject, clock.Now(ctx), 1, []byte("beefbeef"))
run3 := common.MakeRunID(lProject, clock.Now(ctx), 1, []byte("cafecafe"))
mustLoadQueue := func() *queue {
q, err := loadQueue(ctx, lProject)
So(err, ShouldBeNil)
return q
}
mustTryAcquire := func(ctx context.Context, runID common.RunID, opts *cfgpb.SubmitOptions) bool {
var waitlisted bool
var innerErr error
err := datastore.RunInTransaction(ctx, func(ctx context.Context) error {
waitlisted, innerErr = TryAcquire(ctx, notifier.notify, runID, opts)
return innerErr
}, nil)
So(innerErr, ShouldBeNil)
So(err, ShouldBeNil)
return waitlisted
}
Convey("TryAcquire", func() {
Convey("When queue is empty", func() {
waitlisted := mustTryAcquire(ctx, run1, nil)
So(waitlisted, ShouldBeFalse)
So(mustLoadQueue(), cvtesting.SafeShouldResemble, &queue{
ID: lProject,
Current: run1,
})
Convey("And acquire same Run again", func() {
waitlisted := mustTryAcquire(ctx, run1, nil)
So(waitlisted, ShouldBeFalse)
So(mustLoadQueue(), cvtesting.SafeShouldResemble, &queue{
ID: lProject,
Current: run1,
})
})
})
Convey("Waitlisted", func() {
So(datastore.Put(ctx, &queue{
ID: lProject,
Current: run1,
Opts: nil,
}), ShouldBeNil)
waitlisted := mustTryAcquire(ctx, run2, nil)
So(waitlisted, ShouldBeTrue)
So(mustLoadQueue(), cvtesting.SafeShouldResemble, &queue{
ID: lProject,
Current: run1,
Waitlist: common.RunIDs{run2},
})
waitlisted = mustTryAcquire(ctx, run3, nil)
So(waitlisted, ShouldBeTrue)
So(mustLoadQueue(), cvtesting.SafeShouldResemble, &queue{
ID: lProject,
Current: run1,
Waitlist: common.RunIDs{run2, run3},
})
Convey("And acquire same Run in waitlist again", func() {
for _, r := range []common.RunID{run2, run3} {
waitlisted := mustTryAcquire(ctx, r, nil)
So(waitlisted, ShouldBeTrue)
So(mustLoadQueue(), cvtesting.SafeShouldResemble, &queue{
ID: lProject,
Current: run1,
Waitlist: common.RunIDs{run2, run3},
})
}
})
})
Convey("Promote the first in the Waitlist if Current is empty", func() {
So(datastore.Put(ctx, &queue{
ID: lProject,
Waitlist: common.RunIDs{run2, run3},
Opts: nil,
}), ShouldBeNil)
waitlisted := mustTryAcquire(ctx, run2, nil)
So(waitlisted, ShouldBeFalse)
So(mustLoadQueue(), cvtesting.SafeShouldResemble, &queue{
ID: lProject,
Current: run2,
Waitlist: common.RunIDs{run3},
})
})
})
Convey("Release", func() {
mustRelease := func(ctx context.Context, runID common.RunID) {
var innerErr error
err := datastore.RunInTransaction(ctx, func(ctx context.Context) error {
innerErr = Release(ctx, notifier.notify, runID)
return innerErr
}, nil)
So(innerErr, ShouldBeNil)
So(err, ShouldBeNil)
}
So(datastore.Put(ctx, &queue{
ID: lProject,
Current: run1,
Waitlist: common.RunIDs{run2, run3},
}), ShouldBeNil)
Convey("Current slot", func() {
mustRelease(ctx, run1)
So(mustLoadQueue(), cvtesting.SafeShouldResemble, &queue{
ID: lProject,
Waitlist: common.RunIDs{run2, run3},
})
So(notifier.notifyETAs(ctx, run2), ShouldResemble, []time.Time{ct.Clock.Now()})
for _, r := range []common.RunID{run1, run3} {
So(notifier.notifyETAs(ctx, r), ShouldBeEmpty)
}
})
Convey("Run in waitlist", func() {
mustRelease(ctx, run2)
So(mustLoadQueue(), cvtesting.SafeShouldResemble, &queue{
ID: lProject,
Current: run1,
Waitlist: common.RunIDs{run3},
})
for _, r := range []common.RunID{run1, run2, run3} {
So(notifier.notifyETAs(ctx, r), ShouldBeEmpty)
}
})
Convey("Non-existing run", func() {
nonExisting := common.MakeRunID(lProject, clock.Now(ctx), 1, []byte("badbadbad"))
mustRelease(ctx, nonExisting)
So(mustLoadQueue(), cvtesting.SafeShouldResemble, &queue{
ID: lProject,
Current: run1,
Waitlist: common.RunIDs{run2, run3},
})
for _, r := range []common.RunID{run1, run2, run3} {
So(notifier.notifyETAs(ctx, r), ShouldBeEmpty)
}
})
})
submitOpts := &cfgpb.SubmitOptions{
MaxBurst: 2,
BurstDelay: durationpb.New(1 * time.Minute),
}
now := datastore.RoundTime(ct.Clock.Now()).UTC()
Convey("With SubmitOpts", func() {
Convey("ReleaseOnSuccess", func() {
mustReleaseOnSuccess := func(ctx context.Context, runID common.RunID, submittedAt time.Time) {
var innerErr error
err := datastore.RunInTransaction(ctx, func(ctx context.Context) error {
innerErr = ReleaseOnSuccess(ctx, notifier.notify, runID, submittedAt)
return innerErr
}, nil)
So(innerErr, ShouldBeNil)
So(err, ShouldBeNil)
}
q := &queue{
ID: lProject,
Current: run1,
Waitlist: common.RunIDs{run2},
Opts: submitOpts,
}
So(datastore.Put(ctx, q), ShouldBeNil)
Convey("Records submitted timestamp", func() {
mustReleaseOnSuccess(ctx, run1, now.Add(-1*time.Second))
So(mustLoadQueue().History, ShouldResemble, []time.Time{now.Add(-1 * time.Second)})
})
Convey("Cleanup old history", func() {
q.History = []time.Time{
now.Add(-1 * time.Hour), // removed
now.Add(-1 * time.Minute), // kept
}
So(datastore.Put(ctx, q), ShouldBeNil)
mustReleaseOnSuccess(ctx, run1, now.Add(-1*time.Second))
So(mustLoadQueue().History, ShouldResemble, []time.Time{
now.Add(-1 * time.Minute),
now.Add(-1 * time.Second),
})
})
Convey("Ensure order", func() {
q.History = []time.Time{now.Add(-100 * time.Millisecond)}
So(datastore.Put(ctx, q), ShouldBeNil)
mustReleaseOnSuccess(ctx, run1, now.Add(-200*time.Millisecond))
So(mustLoadQueue().History, ShouldResemble, []time.Time{
now.Add(-200 * time.Millisecond),
now.Add(-100 * time.Millisecond),
})
})
Convey("Notify the next Run immediately if below max burst", func() {
q.History = []time.Time{now.Add(-2 * time.Minute)}
So(datastore.Put(ctx, q), ShouldBeNil)
mustReleaseOnSuccess(ctx, run1, now.Add(-1*time.Second))
So(notifier.notifyETAs(ctx, run2), ShouldResemble, []time.Time{ct.Clock.Now()})
})
Convey("Notify the next Run with later ETA if quota has been exhausted", func() {
q.History = []time.Time{
now.Add(-45 * time.Second),
now.Add(-30 * time.Second),
// next submit should happen on (now - 30s) + 1min = now + 30s
}
So(datastore.Put(ctx, q), ShouldBeNil)
mustReleaseOnSuccess(ctx, run1, now.Add(-1*time.Second))
So(notifier.notifyETAs(ctx, run2), ShouldResemble, []time.Time{now.Add(30 * time.Second)})
})
})
Convey("TryAcquire", func() {
q := &queue{
ID: lProject,
Opts: submitOpts,
}
So(datastore.Put(ctx, q), ShouldBeNil)
Convey("Succeeds if below max burst", func() {
q.History = []time.Time{
now.Add(-2 * time.Minute),
now.Add(-30 * time.Second),
}
So(datastore.Put(ctx, q), ShouldBeNil)
So(mustTryAcquire(ctx, run1, submitOpts), ShouldBeFalse)
So(notifier.notifyETAs(ctx, run1), ShouldBeNil)
})
Convey("Waitlist the Run and notify it with later ETA if quota has been exhausted", func() {
q.History = []time.Time{
now.Add(-2 * time.Minute),
now.Add(-45 * time.Second),
now.Add(-30 * time.Second),
// next submit should happen on (now - 45s) + 1min = now + 15s
}
So(datastore.Put(ctx, q), ShouldBeNil)
So(mustTryAcquire(ctx, run1, submitOpts), ShouldBeTrue)
So(notifier.notifyETAs(ctx, run1), ShouldResemble, []time.Time{now.Add(15 * time.Second)})
})
})
})
})
}
type fakeNotifier struct {
notifications map[common.RunID][]time.Time
}
func (f *fakeNotifier) notify(ctx context.Context, runID common.RunID, eta time.Time) error {
if f.notifications == nil {
f.notifications = make(map[common.RunID][]time.Time, 1)
}
if _, ok := f.notifications[runID]; !ok {
f.notifications[runID] = make([]time.Time, 0, 1)
}
f.notifications[runID] = append(f.notifications[runID], eta)
return nil
}
func (f *fakeNotifier) notifyETAs(ctx context.Context, runID common.RunID) []time.Time {
etas, ok := f.notifications[runID]
if !ok {
return nil
}
ret := make([]time.Time, len(etas))
copy(ret, etas)
return ret
}