blob: 6a5f3840f2613d57654a6d0a85acdd75de22c285 [file] [log] [blame]
// Copyright 2016 The LUCI Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package datastorecache
import (
"context"
"math"
"time"
"go.chromium.org/luci/common/clock"
"go.chromium.org/luci/common/data/rand/mathrand"
"go.chromium.org/luci/common/errors"
log "go.chromium.org/luci/common/logging"
"go.chromium.org/luci/common/retry"
"go.chromium.org/luci/common/retry/transient"
"go.chromium.org/luci/server/router"
"go.chromium.org/gae/service/datastore"
"go.chromium.org/gae/service/info"
)
const (
initialLoadLockRetries = 3
initialLoadDelay = 10 * time.Millisecond
// expireFactor determines when a cache entry is considered expired. If
// a cache entry isn't updated within expireFactor refresh intervals, it will
// be treated as expired. In a fail-open system this is a warning. In a fail-
// closed system, this is an error.
expireFactor = 3
// accessUpdateDeltaFactor is the fraction of the access update interval to
// use for prabilistic update staggering.
//
// If this value is N, the delta will be (ACCESS_UPDATE_INTERVAL / N). This
// value should always be >2. Higher values will cause more datastore writes
// but less potential concurrent writes. Lower values will cause more
// potential concurrent conflicts, but will require a lower datastore load.
// It's all generally unimpactful anyway because the operation is deconflicted
// via Locker, so a lot of concurrent writes will end up being no-ops.
accessUpdateDeltaFactor = 20
// DefaultCacheNamespace is the default datastore namespace for cache entries.
DefaultCacheNamespace = "luci.datastoreCache"
)
// Value is a cached value.
type Value struct {
// Schema is an optional schema string that will be encoded in the cache
// entry.
Schema string
// Data is the cache entry's data value.
Data []byte
// Description is an optional description string that will be added to the
// datastore entry for humans.
Description string
}
// Cache defines a generic basic datastore cache. Content that is added to
// the cache is periodically refreshed and (if unused) pruned by a supportive
// cron task.
//
// Using this cache requires a cache handler to be installed and a cron task to
// be configured to hit the handler's endpoint.
//
// The cache works as follows:
// - The cache is scanned for an item
// - Locks are used to provide best-effort deduplication of refreshes for
// the same entity. Inability to lock will not prevent cache operations.
// - Upon access a entry's "Accessed" timestamp will be updated to
// note that it is still in use. This happens probabilistically after an
// "access update interval" so we don't pointlessly incur the update cost
// every time.
//
// The support cron task will execute periodically and maintain the cache:
// - If a cached entry's "Accessed" timestamp falls too far behind, it will be
// deleted as part of a periodic cron task.
// - If the cached entry is near expiration, the cron task will refresh the
// entry's data.
//
// TODO(dnj): This caching scheme intentionally lends itself to sharding. This
// would be implemented by having the maintenance cron task kick off processing
// shard tasks, each querying a subset of the cached entity keyspace, rather
// than handling the whole key space itself. To this end, some areas of code for
// this cache will be programmed to operate on shards, even though currently the
// number of shards will always equal 1 (#0).
type Cache struct {
// Name is the name of this cache. This must be unique from other caches
// managed by this GAE instance, and will be used to differentiate cache
// keys from other caches sharing the same datastore.
//
// If Name is empty, the cache will choose a default name. It is critical
// that either Name or Namespace be unique to this cache, or else its cache
// entries will conflict with other cache instances.
Name string
// Namespace, if not empty, overrides the datastore namespace where entries
// for this cache will be stored. If empty, DefaultCacheNamespace will be
// used.
Namespace string
// AccessUpdateInterval is the amount of time after a cached entry has been
// last marked as accessed before it should be re-marked as accessed. We do
// this sparingly, enough that the entity is not likely to be considered
// candidate for pruning if it's being actively used.
//
// Recommended interval is 1 day. The only hard requirement is that this is
// less than the PruneInterval. If this is <= 0, cached entities will never
// have their access times updated.
AccessUpdateInterval time.Duration
// PruneFactor is the number of additional AccessUpdateInterval periods old
// that a cache entry can be before it becomes candidate for pruning.
//
// An entry becomes candidate for pruning after
// [AccessUpdateInterval * (PruneFactor+1)] time has passed since the last
// AccessUpdateInterval.
//
// If this is <= 0, no entities will ever be pruned. This is potentially
// acceptable when the total number of expected cached entries is expected to
// be low.
PruneFactor int
// Parallel is the number of parallel refreshes that this Handler type should
// execute during the course of a single maintenance run.
//
// If this is <= 0, at most one parallel request will happen per shard.
Parallel int
// HandlerFunc returns a Handler implementation to use for this cache. It is
// used both by the running application and by the manager cron to perform
// cache operations.
//
// If Handler is nil, or if Handler returns nil, the cache will not be
// accessible.
HandlerFunc func(context.Context) Handler
}
func (cache *Cache) withNamespace(ctx context.Context) context.Context {
// Put our basic sanity check here.
if cache.Name == "" && cache.Namespace == "" {
panic(errors.New("a Cache must specify either a Name or a Namespace"))
}
ns := cache.Namespace
if ns == "" {
ns = DefaultCacheNamespace
}
return info.MustNamespace(ctx, ns)
}
// pruneInterval calculates the prune interval. This is a function of the
// cache's AccessUpdateInterval and PruneFactor.
//
// If either AccessUpdateInterval or PruneFactor is <= 0, this will return 0,
// indicating no prune interval.
func (cache *Cache) pruneInterval() time.Duration {
if cache.AccessUpdateInterval > 0 && cache.PruneFactor > 0 {
return cache.AccessUpdateInterval * time.Duration(1+cache.PruneFactor)
}
return 0
}
func (cache *Cache) manager() *manager {
return &manager{
cache: cache,
queryBatchSize: 500,
}
}
// InstallCronRoute installs a handler for this Cache's management cron task
// into the supplied Router at the specified path.
//
// It is recommended to assert in the middleware that this endpoint is only
// accessible from a cron task.
func (cache *Cache) InstallCronRoute(path string, r *router.Router, base router.MiddlewareChain) {
m := cache.manager()
m.installCronRoute(path, r, base)
}
// Get retrieves a cached Value from the cache.
//
// If the value is not defined, the Handler's Refresh function will be used to
// obtain the value and, upon success, the Value will be added to the cache
// and returned.
//
// If the Refresh function returns an error, that error will be propagated
// as-is and returned.
func (cache *Cache) Get(ctx context.Context, key []byte) (Value, error) {
// Leave any current transaction.
ctx = datastore.WithoutTransaction(ctx)
var h Handler
if hf := cache.HandlerFunc; hf != nil {
h = hf(ctx)
}
if h == nil {
return Value{}, errors.New("unable to generate Handler")
}
// Determine which Locker to use.
locker := h.Locker(ctx)
if locker == nil {
locker = nopLocker{}
}
bci := boundCacheInst{
Cache: cache,
h: h,
locker: locker,
}
return bci.get(ctx, key)
}
// boundCacheInst is a Cache instance bound to a specific HTTP request.
//
// It contains a generated Handler and is bound to a specific cache request.
type boundCacheInst struct {
*Cache
h Handler
locker Locker
}
func (bci *boundCacheInst) get(ctx context.Context, key []byte) (Value, error) {
// Enter our cache namespace. Retain the user's namespace so, in the event
// that we have to call into the Handler, we do so with the user's setup.
userNS := info.GetNamespace(ctx)
ctx = bci.withNamespace(ctx)
// Get our current time. We will pass this around so that all methods that
// need to examine or assign time will use the same moment for this round.
now := datastore.RoundTime(clock.Now(ctx).UTC())
// Configure our datastore cache entry to Get.
e := entry{
CacheName: bci.Name,
Key: key,
}
// Install logging fields.
ctx = log.SetFields(ctx, log.Fields{
"namespace": bci.Namespace,
"key": e.keyHash(),
})
// Load the current cache entry.
var (
entryWasMissing = false
createCacheEntry = true
)
switch err := datastore.Get(ctx, &e); err {
case nil:
// We successfully retrieved the entry. Do we need to update its access
// time?
if ai := bci.AccessUpdateInterval; ai > 0 && shouldAttemptUpdate(ai, now, e.LastAccessed, bci.getRandFunc(ctx)) {
bci.tryUpdateLastAccessed(ctx, &e, now)
}
// Check if our cache entry has expired. If it has, we may be using stale
// data.
_ = bci.checkExpired(ctx, &e, now)
return e.toValue(), nil
case datastore.ErrNoSuchEntity:
// No cache entry in the datastore. We will have to Refresh.
entryWasMissing = true
default:
// Unexpected error from datastore.
log.WithError(err).Errorf(ctx, "Failed to retrieve cache entry from datastore.")
// We are failing open, so log the error. We will use Refresh to reload the
// cache entry, but will refrain from actually committing it, since this is
// a failure mode.
createCacheEntry = false
entryWasMissing = true
}
// We've chosen to refresh our entry. Many other parallel processes may also
// have chosen to do this, so let's lock around that refresh in an attempt
// to make this a singleton.
//
// We'll try a few times, sleeping a short time in between if we fail to get
// the lock. We don't want to wait too long and stall our parent application,
// though.
var (
attempts = 0
acquiredLock = false
refreshValue Value
delta time.Duration
)
err := retry.Retry(clock.Tag(ctx, "datastoreCacheLockRetry"),
transient.Only(bci.refreshRetryFactory), func() error {
// This is a retry. If the entry was missing before, check to see if some
// other process has since added it to the datastore.
// We only check this on the 2+ retry because we already checked this
// condition above in the initial datastore Get.
//
// If this fails, we'll continue to try and perform the Refresh.
if entryWasMissing && attempts > 0 {
switch err := datastore.Get(ctx, &e); err {
case nil:
// Successfully loaded the entry!
refreshValue = e.toValue()
createCacheEntry = false
return nil
case datastore.ErrNoSuchEntity:
// The entry doesn't exist yet.
default:
log.WithError(err).Warningf(ctx, "Error checking datastore for cache entry.")
}
}
attempts++
// Take out a lock on this entry and Refresh it.
err := bci.locker.TryWithLock(ctx, e.lockKey(), func(ctx context.Context) (err error) {
acquiredLock = true
// Perform our initial refresh.
refreshValue, delta, err = doRefresh(ctx, bci.h, &e, userNS)
return
})
switch err {
case nil:
// Successfully loaded.
return nil
case ErrFailedToLock:
// Retry after delay.
return transient.Tag.Apply(err)
default:
log.WithError(err).Warningf(ctx, "Unexpected failure obtaining initial load lock.")
return err
}
}, func(err error, d time.Duration) {
log.WithError(err).Debugf(ctx, "Failed to acquire initial refresh lock. Retrying...")
})
if err != nil {
// If we actually acquired the lock, then this refresh was a failure, and
// we will propagate the error (verbatim).
if acquiredLock {
return Value{}, err
}
log.WithError(err).Warningf(ctx, "Failed to cooperatively Refresh entry. Manually Refreshing.")
if refreshValue, delta, err = doRefresh(ctx, bci.h, &e, userNS); err != nil {
// Propagate the Refresh error verbatim.
log.WithError(err).Errorf(ctx, "Refresh returned an error.")
return Value{}, err
}
}
// We successfully performed the initial Refresh! Do we store the entry?
if createCacheEntry {
e.Created = now
e.LastRefreshed = now
e.LastAccessed = now
e.LastRefreshDelta = int64(delta)
e.loadValue(refreshValue)
switch err := datastore.Put(ctx, &e); err {
case nil:
log.Debugf(ctx, "Performed initial cache data Refresh for: %s", e.Description)
default:
log.WithError(err).Warningf(ctx, "Failed to Put initial cache data Refresh entry.")
}
}
return refreshValue, nil
}
func (bci *boundCacheInst) checkExpired(ctx context.Context, e *entry, now time.Time) bool {
// Is this entry significantly past its expiration threshold?
if now.Sub(e.LastRefreshed) >= (expireFactor * bci.h.RefreshInterval(e.Key)) {
log.Fields{
"lastRefreshed": e.LastRefreshed,
}.Infof(ctx, "Stale cache entry is past its refresh interval.")
return true
}
return false
}
func (bci *boundCacheInst) refreshRetryFactory() retry.Iterator {
return &retry.ExponentialBackoff{
Limited: retry.Limited{
Delay: initialLoadDelay,
Retries: initialLoadLockRetries,
},
}
}
func (bci *boundCacheInst) tryUpdateLastAccessed(ctx context.Context, e *entry, now time.Time) {
// Take out a memcache lock on this entry. We do this to prevent multiple
// cache Gets that all examine "e" at the same time don't waste each other's
// time spamming last accessed updates.
err := bci.locker.TryWithLock(ctx, e.lockKey(), func(ctx context.Context) error {
e.LastAccessed = now
return datastore.Put(ctx, e)
})
switch err {
case nil, ErrFailedToLock:
// If we didn't get to update it, no big deal. The next access will try
// again now that the lock is free.
default:
// Something unexpected happened, so let's log it.
log.WithError(err).Warningf(ctx, "Failed to update cache entry 'last accessed' time.")
}
}
func (bci *boundCacheInst) getRandFunc(ctx context.Context) randFunc {
return func() float64 { return mathrand.Float64(ctx) }
}
func doRefresh(ctx context.Context, h Handler, e *entry, userNS string) (v Value, delta time.Duration, err error) {
// Execute the Refresh in the user's namespace.
start := clock.Now(ctx)
if v, err = h.Refresh(info.MustNamespace(ctx, userNS), e.Key, e.toValue()); err != nil {
return
}
delta = clock.Now(ctx).Sub(start)
return
}
// randFunc returns a random number between [0,1).
type randFunc func() float64
// shouldAttemptUpdate returns true if a cache entry update should be attempted
// given the supplied parameters.
//
// We probabilistically update the "last accessed" field based on how close we
// are to the configured AccessUpdateInterval using the xFetch function.
func shouldAttemptUpdate(ai time.Duration, now, lastAccessed time.Time, rf randFunc) bool {
return xFetch(now, lastAccessed.Add(ai), (ai / accessUpdateDeltaFactor), 1, rf)
}
// xFetch returns true if recomputation should be performed.
//
// The decision is probabilistic, based on the evaluation of a probability
// distribution that scales according to the time since the value was last
// recomputed.
//
// delta is a factor representing the amount of time that it takes to
// recalculate the value. Higher delta values allow for earlier recomputation
// for values that take longer to calculate.
//
// beta can be set to >1 to favor earlier recomputations; however, in practice
// beta=1 works well.
func xFetch(now, expiry time.Time, delta time.Duration, beta float64, rf randFunc) bool {
offset := time.Duration(float64(delta) * beta * math.Log(rf()))
return !now.Add(-offset).Before(expiry)
}