blob: 4d625511fb2838e35b37dc7240fa7e9dd3ff8aad [file] [log] [blame]
// Copyright 2016 The LUCI Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package datastore
import (
"fmt"
"sync"
"sync/atomic"
"testing"
"go.chromium.org/gae/service/info"
"golang.org/x/net/context"
. "github.com/smartystreets/goconvey/convey"
)
type counterFilter struct {
run int32
put int32
get int32
delete int32
}
func (cf *counterFilter) filter() RawFilter {
return func(c context.Context, rds RawInterface) RawInterface {
return &counterFilterInst{
RawInterface: rds,
counterFilter: cf,
}
}
}
type counterFilterInst struct {
RawInterface
*counterFilter
}
func (rc *counterFilterInst) Run(fq *FinalizedQuery, cb RawRunCB) error {
atomic.AddInt32(&rc.run, 1)
return rc.RawInterface.Run(fq, cb)
}
func (rc *counterFilterInst) PutMulti(keys []*Key, pmap []PropertyMap, cb NewKeyCB) error {
atomic.AddInt32(&rc.put, 1)
return rc.RawInterface.PutMulti(keys, pmap, cb)
}
func (rc *counterFilterInst) GetMulti(keys []*Key, meta MultiMetaGetter, cb GetMultiCB) error {
atomic.AddInt32(&rc.get, 1)
return rc.RawInterface.GetMulti(keys, meta, cb)
}
func (rc *counterFilterInst) DeleteMulti(keys []*Key, cb DeleteMultiCB) error {
atomic.AddInt32(&rc.delete, 1)
return rc.RawInterface.DeleteMulti(keys, cb)
}
func TestQueryBatch(t *testing.T) {
t.Parallel()
Convey("A testing datastore with a data set installed", t, func() {
c := info.Set(context.Background(), fakeInfo{})
fds := fakeDatastore{
entities: 2048,
}
c = SetRawFactory(c, fds.factory())
cf := counterFilter{}
c = AddRawFilters(c, cf.filter())
// Given query batch size, how many Run calls will be executed to pull
// "total" results?
expectedBatchRunCalls := func(batchSize, total int32) int32 {
if batchSize <= 0 {
return 1
}
exp := total / batchSize
if total%batchSize != 0 {
exp++
}
return exp
}
// Get all items in the query, then reset the counter.
all := []*CommonStruct(nil)
if err := GetAll(c, NewQuery(""), &all); err != nil {
panic(err)
}
cf.run = 0
for _, sizeBase := range []int32{
1,
16,
1024,
2048,
} {
// Adjust to hit edge cases.
for _, delta := range []int32{-1, 0, 1} {
batchSize := sizeBase + delta
if batchSize <= 0 {
continue
}
getAllBatch := func(c context.Context, batchSize int32, query *Query) ([]*CommonStruct, error) {
var out []*CommonStruct
err := RunBatch(c, batchSize, query, func(cs *CommonStruct) {
out = append(out, cs)
})
return out, err
}
Convey(fmt.Sprintf(`Batching with size %d installed`, batchSize), func() {
q := NewQuery("")
Convey(`Can retrieve all of the items.`, func() {
got, err := getAllBatch(c, batchSize, q)
So(err, ShouldBeNil)
So(got, ShouldResemble, all)
// One call for every sub-query, plus one to hit Stop.
runCalls := (int32(len(all)) / batchSize) + 1
So(cf.run, ShouldEqual, runCalls)
})
Convey(`With a limit of 128, will retrieve 128 items.`, func() {
const limit = 128
q = q.Limit(int32(limit))
got, err := getAllBatch(c, batchSize, q)
So(err, ShouldBeNil)
So(got, ShouldResemble, all[:limit])
So(cf.run, ShouldEqual, expectedBatchRunCalls(batchSize, limit))
})
})
}
}
Convey(`Test iterative Run with cursors.`, func() {
// This test will have a naive outer loop that fetches pages in large
// increments using cursors. The outer loop will use the Batcher
// internally, which will fetch smaller page sizes.
testIterativeRun := func(rounds, outerFetchSize, batchSize int32) error {
// Clear state and configure.
cf.run = 0
fds.entities = rounds * outerFetchSize
var (
outerCount int32
cursor Cursor
)
for i := int32(0); i < rounds; i++ {
// Fetch "outerFetchSize" items from our Batcher.
q := NewQuery("").Limit(outerFetchSize)
if cursor != nil {
q = q.Start(cursor)
}
err := RunBatch(c, batchSize, q, func(v CommonStruct, getCursor CursorCB) (err error) {
if v.Value != int64(outerCount) {
return fmt.Errorf("query value doesn't match count (%d != %d)", v.Value, outerCount)
}
outerCount++
// Retain our cursor from this round.
cursor, err = getCursor()
return
})
if err != nil {
return err
}
}
// Make sure we iterated through everything.
if outerCount != fds.entities {
return fmt.Errorf("query returned incomplete results (%d != %d)", outerCount, fds.entities)
}
// Make sure the appropriate number of real queries was executed.
expectedRunCount := expectedBatchRunCalls(batchSize, outerFetchSize) * rounds
if cf.run != expectedRunCount {
return fmt.Errorf("unexpected number of raw Run calls (%d != %d)", cf.run, expectedRunCount)
}
return nil
}
So(testIterativeRun(3, 2, 1), ShouldBeNil)
So(testIterativeRun(3, 5, 2), ShouldBeNil)
So(testIterativeRun(3, 1000, 250), ShouldBeNil)
// We'll use fetch/batch sizes that are not direct multiples of each other
// so we can test some incongruent boundaries.
So(testIterativeRun(3, 900, 250), ShouldBeNil)
})
})
}
func TestBatchFilter(t *testing.T) {
t.Parallel()
type IndexEntity struct {
_kind string `gae:"$kind,Index"`
Key *Key `gae:"$key"`
Value int64
}
Convey("A testing datastore", t, func() {
c := info.Set(context.Background(), fakeInfo{})
fds := fakeDatastore{}
c = SetRawFactory(c, fds.factory())
cf := counterFilter{}
c = AddRawFilters(c, cf.filter())
expectedRounds := func(constraint, size int) int {
v := size / constraint
if size%constraint != 0 {
v++
}
return v
}
for _, sz := range []int32{11, 10, 7, 5, 2} {
Convey(fmt.Sprintf("With maximunm Put size %d", sz), func(convey C) {
fds.convey = convey
fds.constraints.MaxGetSize = 10
fds.constraints.MaxPutSize = 10
fds.constraints.MaxDeleteSize = 10
css := make([]*IndexEntity, 10)
for i := range css {
css[i] = &IndexEntity{Value: int64(i + 1)}
}
So(Put(c, css), ShouldBeNil)
So(cf.put, ShouldEqual, expectedRounds(fds.constraints.MaxPutSize, len(css)))
for i, ent := range css {
So(ent.Key, ShouldNotBeNil)
So(ent.Key.IntID(), ShouldEqual, i+1)
}
Convey(`Get`, func() {
// Clear Value and Get, populating Value from Key.IntID.
for _, ent := range css {
ent.Value = 0
}
So(Get(c, css), ShouldBeNil)
So(cf.get, ShouldEqual, expectedRounds(fds.constraints.MaxGetSize, len(css)))
for i, ent := range css {
So(ent.Value, ShouldEqual, i+1)
}
})
Convey(`Delete`, func() {
// Record which entities get deleted.
var lock sync.Mutex
deleted := make(map[int64]struct{}, len(css))
fds.onDelete = func(k *Key) {
lock.Lock()
defer lock.Unlock()
deleted[k.IntID()] = struct{}{}
}
So(Delete(c, css), ShouldBeNil)
So(cf.delete, ShouldEqual, expectedRounds(fds.constraints.MaxDeleteSize, len(css)))
// Confirm that all entities have been deleted.
So(len(deleted), ShouldEqual, len(css))
for i := range css {
_, ok := deleted[int64(i+1)]
So(ok, ShouldBeTrue)
}
})
})
}
})
}