blob: 8f3788fc6bc24fa4ec29322bdd93011a720d149c [file] [log] [blame]
// Copyright 2020 The LUCI Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package sweep
import (
"context"
"fmt"
"sync/atomic"
"testing"
taskspb "google.golang.org/genproto/googleapis/cloud/tasks/v2"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
"go.chromium.org/luci/server/tq/internal/reminder"
"go.chromium.org/luci/server/tq/internal/testutil"
. "github.com/smartystreets/goconvey/convey"
)
func TestBatching(t *testing.T) {
t.Parallel()
Convey("With BatchProcessor", t, func() {
ctx := context.Background()
db := &testutil.FakeDB{}
sub := &submitter{}
p := BatchProcessor{
Context: ctx,
DB: db,
Submitter: sub,
BatchSize: 3,
ConcurrentBatches: 20,
}
makeRem := func(id string) *reminder.Reminder {
r := &reminder.Reminder{ID: id}
r.AttachPayload(&reminder.Payload{
CreateTaskRequest: &taskspb.CreateTaskRequest{
Parent: id + " body",
},
})
So(db.SaveReminder(ctx, r), ShouldBeNil)
return r
}
var r []*reminder.Reminder
for i := 0; i < 100; i++ {
r = append(r, makeRem(fmt.Sprintf("rem-%d", i)))
}
So(db.AllReminders(), ShouldHaveLength, len(r))
Convey("Works", func() {
So(p.Start(), ShouldBeNil)
p.Enqueue(ctx, r)
So(p.Stop(), ShouldEqual, len(r))
So(sub.req, ShouldHaveLength, len(r))
So(db.AllReminders(), ShouldHaveLength, 0)
})
Convey("Waits for processing to finish", func() {
p.BatchSize = 1000 // to make sure Enqueue doesn't block
var stopped int32
ch := make(chan struct{})
sub.cb = func(*reminder.Payload) error {
if atomic.LoadInt32(&stopped) == 1 {
panic("processing while stopped")
}
ch <- struct{}{}
return nil
}
So(p.Start(), ShouldBeNil)
p.Enqueue(ctx, r)
result := make(chan int, 1)
go func() {
result <- p.Stop()
atomic.StoreInt32(&stopped, 1)
}()
for i := 0; i < len(r); i++ {
<-ch
}
So(<-result, ShouldEqual, len(r))
})
Convey("Context cancel", func() {
ctx, cancel := context.WithCancel(ctx)
p.Context = ctx
var stopped int32
var count int32
sub.cb = func(*reminder.Payload) error {
if atomic.LoadInt32(&stopped) == 1 {
panic("processing while stopped")
}
if count := atomic.AddInt32(&count, 1); count >= 50 {
if count > 50 {
return status.Errorf(codes.Canceled, "boo")
}
cancel()
}
return nil
}
p.Start()
p.Enqueue(ctx, r)
p.Stop()
atomic.StoreInt32(&stopped, 1)
So(sub.req, ShouldHaveLength, 50)
})
})
}