blob: 65bcc7f63dda5f3f63b371be577b26b54d27742f [file] [log] [blame]
// Copyright 2016 The LUCI Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package datastorecache
import (
"context"
"fmt"
"testing"
"time"
"go.chromium.org/luci/common/clock"
"go.chromium.org/luci/common/clock/testclock"
"go.chromium.org/luci/common/errors"
"go.chromium.org/gae/service/datastore"
"go.chromium.org/gae/service/info"
. "github.com/smartystreets/goconvey/convey"
. "go.chromium.org/luci/common/testing/assertions"
)
func TestCache(t *testing.T) {
t.Parallel()
Convey(`A testing Cache setup`, t, withTestEnv(func(te *testEnv) {
currentValue := Value{
Schema: "testSchema",
Description: "test cache entry",
Data: []byte("first"),
}
cache := makeTestCache("test")
cache.refreshFn = func(context.Context, []byte, Value) (Value, error) { return currentValue, nil }
// otherLocker is a memLocker instance (used by makeTestCache) that
// identifies itself as a different client than the one running the test.
//
// This is used to create lock conflicts.
otherLocker := memLocker{"other client ID"}
Convey(`Will panic if it has neither a name or namespace configured.`, func() {
cache.Name = ""
cache.Namespace = ""
So(func() {
cache.Get(te, []byte("foo"))
}, ShouldPanic)
})
Convey(`Will error on Get if no Handler is installed.`, func() {
cache.HandlerFunc = nil
_, err := cache.Get(te, []byte("foo"))
So(err, ShouldErrLike, "unable to generate Handler")
})
Convey(`Can perform an initial Get.`, func() {
v, err := cache.Get(te, []byte("foo"))
So(err, ShouldBeNil)
So(v, ShouldResemble, currentValue)
So(cache.refreshes, ShouldEqual, 1)
Convey(`A successive Get will load from datastore.`, func() {
cache.reset()
v, err := cache.Get(te, []byte("foo"))
So(err, ShouldBeNil)
So(v, ShouldResemble, currentValue)
So(cache.refreshes, ShouldEqual, 0)
})
Convey(`After that entry has expired, a successive Get will return the stale data.`, func() {
cache.reset()
te.Clock.Add(expireFactor * cache.refreshInterval)
v, err := cache.Get(te, []byte("foo"))
So(err, ShouldBeNil)
So(v, ShouldResemble, currentValue)
So(cache.refreshes, ShouldEqual, 0)
})
Convey(`When GetMulti is broken, a successive Get will refresh.`, func() {
cache.reset()
te.DatastoreFB.BreakFeatures(errors.New("test error"), "GetMulti")
v, err := cache.Get(te, []byte("foo"))
So(err, ShouldBeNil)
So(v, ShouldResemble, currentValue)
So(cache.refreshes, ShouldEqual, 1)
})
})
Convey(`Can perform a Get while in another entity's transaction`, func() {
err := datastore.RunInTransaction(te, func(ctx context.Context) error {
// Hit some random entity first. Since we're not cross-group, this means
// that any invalid single-group datastore accesses from the cache will
// error.
r, err := datastore.Exists(ctx, datastore.PropertyMap{
"$kind": datastore.MkProperty("SomeEntitySomewhere"),
"$id": datastore.MkProperty(1),
})
if err != nil {
return err
}
So(r.Any(), ShouldBeFalse)
// Perform a cache Get.
v, err := cache.Get(ctx, []byte("foo"))
if err != nil {
return err
}
So(v, ShouldResemble, currentValue)
return nil
}, nil)
So(err, ShouldBeNil)
So(cache.refreshes, ShouldEqual, 1)
})
Convey(`The refresh function is called with the original namespace.`, func() {
c := info.MustNamespace(te, "dog")
cache.refreshFn = func(ctx context.Context, key []byte, v Value) (Value, error) {
So(info.GetNamespace(ctx), ShouldEqual, "dog")
return currentValue, nil
}
v, err := cache.Get(c, []byte("foo"))
So(err, ShouldBeNil)
So(v, ShouldResemble, currentValue)
So(cache.refreshes, ShouldEqual, 1)
})
Convey(`When PutMulti is broken, will fail open.`, func() {
te.DatastoreFB.BreakFeatures(errors.New("test error"), "PutMulti")
cache.reset()
v, err := cache.Get(te, []byte("foo"))
So(err, ShouldBeNil)
So(v, ShouldResemble, currentValue)
So(cache.refreshes, ShouldEqual, 1)
cache.reset()
v, err = cache.Get(te, []byte("foo"))
So(err, ShouldBeNil)
So(v, ShouldResemble, currentValue)
So(cache.refreshes, ShouldEqual, 1)
})
Convey(`When Refresh returns an error, it is propagated.`, func() {
testErr := errors.New("test error")
cache.refreshFn = func(context.Context, []byte, Value) (Value, error) { return Value{}, testErr }
_, err := cache.Get(te, []byte("foo"))
So(err, ShouldEqual, testErr)
_, err = cache.Get(te, []byte("foo"))
So(err, ShouldEqual, testErr)
})
Convey(`When the entry's lock is held`, func() {
e := entry{
CacheName: cache.Name,
Key: []byte("foo"),
}
// When a sleep is requested, automatically jump into the future.
var sleepCB func()
sleeps := 0
te.Clock.SetTimerCallback(func(d time.Duration, t clock.Timer) {
for _, tag := range testclock.GetTags(t) {
if tag == "datastoreCacheLockRetry" {
if sleepCB != nil {
sleepCB()
}
sleeps++
break
}
}
te.Clock.Add(d)
})
Convey(`Will manually refresh with original namespace after retries.`, func() {
cache.refreshFn = func(ctx context.Context, key []byte, v Value) (Value, error) {
So(info.GetNamespace(ctx), ShouldEqual, "dog")
return currentValue, nil
}
// Hold the entity's refresh lock. Our cache will not be able to acquire
// it.
var v Value
err := otherLocker.TryWithLock(cache.withNamespace(te), e.lockKey(), func(ctx context.Context) (err error) {
v, err = cache.Get(info.MustNamespace(te, "dog"), []byte("foo"))
return
})
So(err, ShouldBeNil)
So(v, ShouldResemble, currentValue)
So(sleeps, ShouldEqual, initialLoadLockRetries)
So(cache.refreshes, ShouldEqual, 1)
})
Convey(`Will propagate a lock-less refresh failure.`, func() {
terr := errors.New("test error")
cache.refreshFn = func(context.Context, []byte, Value) (Value, error) { return Value{}, terr }
// Hold the entity's refresh lock. Our cache will not be able to acquire
// it.
err := otherLocker.TryWithLock(cache.withNamespace(te), e.lockKey(), func(ctx context.Context) (err error) {
_, err = cache.Get(te, []byte("foo"))
return
})
So(err, ShouldEqual, terr)
So(sleeps, ShouldEqual, initialLoadLockRetries)
So(cache.refreshes, ShouldEqual, 1)
})
Convey(`Will not refresh if the value is put in between sleeps.`, func() {
sleepCB = func() {
So(datastore.Put(cache.withNamespace(te), &entry{
CacheName: cache.Name,
Key: []byte("foo"),
Data: []byte("ohaithere"),
}), ShouldBeNil)
}
// Hold the entity's refresh lock. Our cache will not be able to acquire
// it.
err := otherLocker.TryWithLock(cache.withNamespace(te), e.lockKey(), func(ctx context.Context) (err error) {
_, err = cache.Get(te, []byte("foo"))
return
})
So(err, ShouldBeNil)
So(sleeps, ShouldEqual, 1)
So(cache.refreshes, ShouldEqual, 0)
})
Convey(`(Fail Open) Will refresh if GetMulti is broken during retries.`, func() {
te.DatastoreFB.BreakFeatures(errors.New("test error"), "GetMulti")
// Hold the entity's refresh lock. Our cache will not be able to acquire
// it.
err := otherLocker.TryWithLock(cache.withNamespace(te), e.lockKey(), func(ctx context.Context) (err error) {
_, err = cache.Get(te, []byte("foo"))
return
})
So(err, ShouldBeNil)
So(sleeps, ShouldEqual, initialLoadLockRetries)
So(cache.refreshes, ShouldEqual, 1)
})
})
Convey(`When multiple entries refresh at the same time, only one call is made.`, func() {
const agents = 32
// The first agent will acquire a memlock on this cache item. All of the
// other ones will enter a retry loop, attempting to get the lock. Each
// retry round will sleep a little in between attempts.
//
// Our first agent will acquire the lock and refresh. We'll block it there
// to let the other agents catch up. The other agents will fail to acquire
// the lock and sleep, where we'll intercept them in the sleep timer
// callback.
//
// After all of our agents are synchronized at the same point, we'll
// release our first agent and let it finish. This will Put the result in
// datastore. After the first agent finishes, we'll start time again and
// let the other agents retry. Their next attempt will find the result
// from the first agent in datastore and return it without refreshing
// themselves.
agentSleepingC := make(chan struct{})
agentReleaseC := make(chan struct{})
te.Clock.SetTimerCallback(func(d time.Duration, t clock.Timer) {
for _, tag := range testclock.GetTags(t) {
if tag == "datastoreCacheLockRetry" {
agentSleepingC <- struct{}{}
<-agentReleaseC
break
}
}
te.Clock.Add(d)
})
// Refresh will block pending refreshC. We'll release this when all of
// the other agents have tried to sleep following failure to lock.
refreshC := make(chan struct{})
cache.refreshFn = func(context.Context, []byte, Value) (Value, error) {
<-refreshC
return currentValue, nil
}
// Start our refresh agents in parallel.
values, errors := make([]Value, agents), make([]error, agents)
doneC := make(chan int)
for i := 0; i < agents; i++ {
go func(idx int) {
defer func() {
doneC <- idx
}()
c := info.GetTestable(te).SetRequestID(fmt.Sprintf("agent-%d", idx))
values[idx], errors[idx] = cache.Get(c, []byte("foo"))
}(i)
}
// Wait for all but one of our agents to sleep. The non-sleeping one will
// have gotten the lock on the entry.
for i := 1; i < agents; i++ {
<-agentSleepingC
}
// Allow our agent-with-lock to complete its Refresh.
close(refreshC)
// Wait for our agent-with-lock to finish.
<-doneC
// Release our sleeping agents.
close(agentReleaseC)
// Reap the remaining agents.
for i := 1; i < agents; i++ {
<-doneC
}
So(cache.refreshes, ShouldEqual, 1)
for i := 0; i < agents; i++ {
So(values[i], ShouldResemble, currentValue)
So(errors[i], ShouldBeNil)
}
})
Convey(`When accessing an entry past its access update threshold`, func() {
accessedAWhileAgo := te.Clock.Now().Add(-cache.AccessUpdateInterval)
ev := Value{
Schema: "test schema",
Data: []byte("bar"),
Description: "test entry",
}
e := entry{
CacheName: cache.Name,
Key: []byte("foo"),
LastRefreshed: te.Clock.Now(),
LastAccessed: accessedAWhileAgo,
}
e.loadValue(ev)
So(datastore.Put(cache.withNamespace(te), &e), ShouldBeNil)
Convey(`Will update its LastAccessed timestamp.`, func() {
v, err := cache.Get(te, []byte("foo"))
So(err, ShouldBeNil)
So(v, ShouldResemble, ev)
So(datastore.Get(cache.withNamespace(te), &e), ShouldBeNil)
So(e.LastAccessed, ShouldResemble, te.Clock.Now())
})
Convey(`Will ignore accessed timestamp PutMulti failures.`, func() {
te.DatastoreFB.BreakFeatures(errors.New("test error"), "PutMulti")
v, err := cache.Get(te, []byte("foo"))
So(err, ShouldBeNil)
So(v, ShouldResemble, ev)
So(datastore.Get(cache.withNamespace(te), &e), ShouldBeNil)
So(e.LastAccessed, ShouldResemble, accessedAWhileAgo)
})
Convey(`Will ignore accessed timestamp if the entry's lock is held.`, func() {
var v Value
err := otherLocker.TryWithLock(cache.withNamespace(te), e.lockKey(), func(ctx context.Context) (err error) {
v, err = cache.Get(te, []byte("foo"))
return
})
So(err, ShouldBeNil)
So(v, ShouldResemble, ev)
So(datastore.Get(cache.withNamespace(te), &e), ShouldBeNil)
So(e.LastAccessed, ShouldResemble, accessedAWhileAgo)
})
})
}))
}