blob: 1901e311b92769b8271fccba3cf051c4daffd912 [file] [log] [blame]
// Copyright 2016 The LUCI Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package datastorecache
import (
"bytes"
"context"
"fmt"
"net/http"
"strconv"
"testing"
"time"
"go.chromium.org/luci/common/errors"
"go.chromium.org/gae/service/datastore"
. "github.com/smartystreets/goconvey/convey"
)
func testManagerImpl(t *testing.T, mgrTemplate *manager) {
t.Parallel()
Convey(`A testing manager setup`, t, withTestEnv(func(te *testEnv) {
// Build our Manager, using template parameters.
cache := makeTestCache("test-cache")
mgr := cache.manager()
if mgrTemplate != nil {
mgr.queryBatchSize = mgrTemplate.queryBatchSize
}
const cronPath = "/cron/manager"
mgr.installCronRoute(cronPath, te.Router, te.Middleware)
runCron := func() int {
datastore.GetTestable(te).CatchupIndexes()
resp, err := http.Get(te.Server.URL + cronPath)
if err != nil {
panic(fmt.Errorf("failed to GET: %s", err))
}
return resp.StatusCode
}
statsForShard := func(id int) *managerShardStats {
st := managerShardStats{
Shard: id + 1,
}
So(datastore.Get(cache.withNamespace(te), &st), ShouldBeNil)
return &st
}
// Useful times.
var (
now = te.Clock.Now()
accessedRecently = now.Add(-time.Second)
accessedNeedsPrune = now.Add(-cache.pruneInterval())
refreshUnnecessary = now.Add(-time.Second)
refreshNeeded = now.Add(-cache.refreshInterval)
)
Convey(`Can run on an empty data set.`, func() {
So(runCron(), ShouldEqual, http.StatusOK)
So(statsForShard(0), ShouldResemble, &managerShardStats{
Shard: 1,
LastSuccessfulRun: now,
})
})
Convey(`Returns error code for cache update errors.`, func() {
So(datastore.Put(cache.withNamespace(te), &entry{
CacheName: cache.Name,
Key: []byte("foo"),
LastAccessed: accessedRecently,
LastRefreshed: refreshNeeded,
}), ShouldBeNil)
datastore.GetTestable(te).CatchupIndexes()
// No refresh function installed, so this will fail.
So(runCron(), ShouldEqual, http.StatusInternalServerError)
})
Convey(`With cache entries installed`, func() {
var entries []*entry
for i, ce := range []struct {
key string
lastAccessed time.Time
lastRefreshed time.Time
}{
{"idle", accessedRecently, refreshUnnecessary},
{"refreshMe", accessedRecently, refreshNeeded},
{"pruneMe", accessedNeedsPrune, refreshUnnecessary},
{"pruneMeInsteeadOfRefresh", accessedNeedsPrune, refreshNeeded},
} {
entries = append(entries, &entry{
CacheName: cache.Name,
Key: []byte(ce.key),
LastAccessed: ce.lastAccessed,
LastRefreshed: ce.lastRefreshed,
Data: []byte(ce.key),
Schema: "test",
Description: fmt.Sprintf("test entry #%d", i),
})
}
So(datastore.Put(cache.withNamespace(te), entries), ShouldBeNil)
Convey(`Will refresh/prune the entries.`, func() {
cache.refreshFn = func(context.Context, []byte, Value) (Value, error) {
return Value{
Schema: "new schema",
Description: "new hotness",
Data: []byte("REFRESHED"),
}, nil
}
So(runCron(), ShouldEqual, http.StatusOK)
So(statsForShard(0), ShouldResemble, &managerShardStats{
Shard: 1,
LastSuccessfulRun: now,
LastEntryCount: 4,
})
So(cache.refreshes, ShouldEqual, 1)
// All "refreshMe" should have their data updated.
for _, e := range entries {
if !bytes.Equal(e.Key, []byte("refreshMe")) {
continue
}
So(datastore.Get(cache.withNamespace(te), e), ShouldBeNil)
So(e.Data, ShouldResemble, []byte("REFRESHED"))
}
Convey(`A second run will not refresh any entries.`, func() {
cache.refreshFn = nil
So(runCron(), ShouldEqual, http.StatusOK)
So(statsForShard(0), ShouldResemble, &managerShardStats{
Shard: 1,
LastSuccessfulRun: now,
LastEntryCount: 2, // Pruned half of them.
})
})
})
Convey(`Will not prune any entries if AccessUpdateInterval is 0.`, func() {
cache.refreshFn = func(ctx context.Context, key []byte, v Value) (Value, error) { return v, nil }
cache.AccessUpdateInterval = 0
So(runCron(), ShouldEqual, http.StatusOK)
So(statsForShard(0), ShouldResemble, &managerShardStats{
Shard: 1,
LastSuccessfulRun: now,
LastEntryCount: 4,
})
So(runCron(), ShouldEqual, http.StatusOK)
So(statsForShard(0), ShouldResemble, &managerShardStats{
Shard: 1,
LastSuccessfulRun: now,
LastEntryCount: 4, // None pruned.
})
})
Convey(`Will not prune any entries if PruneFactor is 0.`, func() {
cache.refreshFn = func(ctx context.Context, key []byte, v Value) (Value, error) { return v, nil }
cache.PruneFactor = 0
So(runCron(), ShouldEqual, http.StatusOK)
So(statsForShard(0), ShouldResemble, &managerShardStats{
Shard: 1,
LastSuccessfulRun: now,
LastEntryCount: 4,
})
So(runCron(), ShouldEqual, http.StatusOK)
So(statsForShard(0), ShouldResemble, &managerShardStats{
Shard: 1,
LastSuccessfulRun: now,
LastEntryCount: 4, // None pruned.
})
})
Convey(`Will error if datastore PutMulti is broken.`, func() {
te.DatastoreFB.BreakFeatures(errors.New("test error"), "PutMulti")
cache.refreshFn = func(context.Context, []byte, Value) (Value, error) { return Value{}, nil }
So(runCron(), ShouldEqual, http.StatusInternalServerError)
So(cache.refreshes, ShouldEqual, 1)
})
Convey(`Will error if datastore DeleteMulti is broken.`, func() {
te.DatastoreFB.BreakFeatures(errors.New("test error"), "DeleteMulti")
cache.refreshFn = func(context.Context, []byte, Value) (Value, error) { return Value{}, nil }
So(runCron(), ShouldEqual, http.StatusInternalServerError)
So(cache.refreshes, ShouldEqual, 1)
})
Convey(`Will not refresh entries if there is an error.`, func() {
// (The error is that we are refreshing, but there is no refresh
// handler installed).
So(runCron(), ShouldEqual, http.StatusInternalServerError)
So(statsForShard(0), ShouldResemble, &managerShardStats{
Shard: 1,
LastSuccessfulRun: time.Time{},
LastEntryCount: 4,
})
})
Convey(`Will delete cache entries, if requested.`, func() {
cache.refreshFn = func(context.Context, []byte, Value) (Value, error) { return Value{}, ErrDeleteCacheEntry }
So(runCron(), ShouldEqual, http.StatusOK)
So(statsForShard(0), ShouldResemble, &managerShardStats{
Shard: 1,
LastSuccessfulRun: now,
LastEntryCount: 4,
})
// One of the four should have been refreshed, and requeted deletion.
So(cache.refreshes, ShouldEqual, 1)
// Do a second run. Nothing should be refreshed, since all refresh
// candidates were deleted.
cache.reset()
So(runCron(), ShouldEqual, http.StatusOK)
So(statsForShard(0), ShouldResemble, &managerShardStats{
Shard: 1,
LastSuccessfulRun: now,
LastEntryCount: 1, // Only "idle" remains.
})
// One of the four should have been refreshed, and requeted
// deletion.
So(cache.refreshes, ShouldEqual, 0)
})
})
Convey(`With cache entries, but no Handler function.`, func() {
cache.HandlerFunc = nil
var entries []*entry
for i, la := range []time.Time{
accessedRecently,
accessedNeedsPrune,
} {
entries = append(entries, &entry{
CacheName: cache.Name,
Key: []byte(strconv.Itoa(i)),
LastAccessed: la,
LastRefreshed: refreshNeeded,
Data: nil,
Schema: "test",
Description: fmt.Sprintf("test entry #%d", i),
})
}
So(datastore.Put(cache.withNamespace(te), entries), ShouldBeNil)
So(runCron(), ShouldEqual, http.StatusOK)
So(statsForShard(0), ShouldResemble, &managerShardStats{
Shard: 1,
LastSuccessfulRun: te.Clock.Now(),
LastEntryCount: 2,
})
// All are expired.
te.Clock.Add(cache.pruneInterval())
So(runCron(), ShouldEqual, http.StatusOK)
So(statsForShard(0), ShouldResemble, &managerShardStats{
Shard: 1,
LastSuccessfulRun: te.Clock.Now(),
LastEntryCount: 1,
})
// All have now been pruned.
So(runCron(), ShouldEqual, http.StatusOK)
So(statsForShard(0), ShouldResemble, &managerShardStats{
Shard: 1,
LastSuccessfulRun: te.Clock.Now(),
LastEntryCount: 0,
})
})
Convey(`Will return an error if Run is broken during Handler query.`, func() {
te.DatastoreFB.BreakFeatures(errors.New("test error"), "Run")
So(runCron(), ShouldEqual, http.StatusInternalServerError)
})
Convey(`Will return an error if Run is broken during entry refresh.`, func() {
// We will Put "queryBatchSize+1" expired entries, then execute our
// handler. The first round will refresh, at which point our refresh
// handler will break Run. The next round will encounter the broken Run
// and
entries := make([]*entry, mgr.queryBatchSize+1)
for i := range entries {
entries[i] = &entry{
CacheName: cache.Name,
Key: []byte(fmt.Sprintf("entry-%d", i)),
LastAccessed: accessedRecently,
}
}
So(datastore.Put(cache.withNamespace(te), entries), ShouldBeNil)
cache.refreshFn = func(context.Context, []byte, Value) (Value, error) {
te.DatastoreFB.BreakFeatures(errors.New("test error"), "Run")
return Value{}, nil
}
So(runCron(), ShouldEqual, http.StatusInternalServerError)
})
}))
}
// TestManager runs the manager tests against the production manager instance.
// This exercises the production settings.
func TestManager(t *testing.T) {
testManagerImpl(t, nil)
}
// TestManagerConstrained runs the manager tests against a resource-constrained
// manager instance. This exercises limited (but value) manager settings.
func TestManagerConstrained(t *testing.T) {
testManagerImpl(t, &manager{
queryBatchSize: 1,
})
}