// Copyright 2017 The LUCI Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

// Package layered provides a two-layer cache for serializable objects.
package layered
import (
"context"
"encoding/binary"
"fmt"
"time"
"go.chromium.org/luci/common/clock"
"go.chromium.org/luci/common/data/caching/lru"
"go.chromium.org/luci/common/data/rand/mathrand"
"go.chromium.org/luci/common/errors"
"go.chromium.org/luci/common/logging"
"go.chromium.org/luci/server/caching"
)
// ErrCantSatisfyMinTTL is returned by GetOrCreate if the factory function
// produces an item that expires sooner than the requested MinTTL.
var ErrCantSatisfyMinTTL = errors.New("new item produced by the factory has insufficient TTL")
// Cache implements a cache of serializable objects on top of process and
// global caches.
//
// If the global cache is not available or fails, it degrades to using only the
// process cache.
//
// Since global cache errors are ignored, it gives no guarantees of consistency
// or item uniqueness. It is thus supposed to be used only for caching results
// of computations without side effects.
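//
// A minimal configuration sketch (hedged: the capacity, the namespace, the
// "thing" type and its JSON (de)serialization are assumptions for
// illustration, not part of this package; caching.RegisterLRUCache is assumed
// to be the process-cache registration helper in this version):
//
//  var thingsCache = layered.Cache{
//      ProcessLRUCache: caching.RegisterLRUCache(1024),
//      GlobalNamespace: "things_v1",
//      Marshal: func(item interface{}) ([]byte, error) {
//          return json.Marshal(item.(*thing))
//      },
//      Unmarshal: func(blob []byte) (interface{}, error) {
//          t := &thing{}
//          if err := json.Unmarshal(blob, t); err != nil {
//              return nil, err
//          }
//          return t, nil
//      },
//  }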
type Cache struct {
// ProcessLRUCache is a handle to a process LRU cache that holds the data.
ProcessLRUCache caching.LRUHandle
// GlobalNamespace is a global cache namespace to use for the data.
GlobalNamespace string
// Marshal converts an item being cached to a byte blob.
Marshal func(item interface{}) ([]byte, error)
// Unmarshal takes output of Marshal and converts it to an item to return.
Unmarshal func(blob []byte) (interface{}, error)
}
// Option is a base interface of options for GetOrCreate call.
type Option interface {
apply(opts *options)
}
// WithMinTTL specifies minimal acceptable TTL (Time To Live) of the returned
// cached item.
//
// If the currently cached item expires sooner than the requested TTL, it will
// be forcefully refreshed. If the new (refreshed) item also expires sooner
// than the requested min TTL, GetOrCreate will return ErrCantSatisfyMinTTL.
func WithMinTTL(ttl time.Duration) Option {
if ttl <= 0 {
panic("ttl must be positive")
}
return minTTLOpt(ttl)
}
// WithRandomizedExpiration enables randomized early expiration.
//
// This is only useful if cached items are used highly concurrently from many
// goroutines.
//
// On each cache access if the remaining TTL of the cached item is less than
// 'threshold', it may randomly be considered already expired (with probability
// increasing when item nears its true expiration).
//
// This is useful to avoid a situation when many concurrent consumers discover
// at the same time that the item has expired, and then all proceed waiting
// for a refresh. With randomized early expiration only the most unlucky
// consumer will trigger the refresh and will be blocked on it.
func WithRandomizedExpiration(threshold time.Duration) Option {
if threshold < 0 {
panic("threshold must be positive")
}
return expRandThresholdOpt(threshold)
}
// GetOrCreate attempts to grab an item from process or global cache, or create
// it if it's not cached yet.
//
// Fetching an item from the global cache or instantiating a new item happens
// under a per-key lock.
//
// Expiration time is used with seconds precision. Zero expiration time means
// the item doesn't expire on its own.
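//
// A hedged usage sketch (thingsCache and the "thing" type are from the example
// on Cache above; fetchThing is a hypothetical helper that produces the value):
//
//  v, err := thingsCache.GetOrCreate(ctx, "thing:123", func() (interface{}, time.Duration, error) {
//      t, err := fetchThing(ctx, "123") // produce the real value on a cache miss
//      return t, time.Hour, err         // let it live in the cache for an hour
//  }, layered.WithMinTTL(10*time.Minute), layered.WithRandomizedExpiration(time.Minute))
//  if err != nil {
//      return err
//  }
//  t := v.(*thing)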
func (c *Cache) GetOrCreate(ctx context.Context, key string, fn lru.Maker, opts ...Option) (interface{}, error) {
if c.GlobalNamespace == "" {
panic("empty namespace is forbidden, please specify GlobalNamespace")
}
o := options{}
for _, opt := range opts {
opt.apply(&o)
}
now := clock.Now(ctx)
lru := c.ProcessLRUCache.LRU(ctx)
// Check whether the item is in the local cache, its TTL is acceptable, and we
// don't want to randomly prematurely expire it (see WithRandomizedExpiration).
var ignored *itemWithExp
if v, ok := lru.Get(ctx, key); ok {
item := v.(*itemWithExp)
if item.isAcceptableTTL(now, o.minTTL) && !item.randomlyExpired(ctx, now, o.expRandThreshold) {
return item.val, nil
}
ignored = item
}
// Either the item is not in the local cache, or the cached copy expires too
// soon or we randomly decided that we want to prematurely refresh it. Attempt
// to fetch from the global cache or create a new one. Disable expiration
// randomization at this point, it has served its purpose already, since only
// unlucky callers will reach this code path.
v, err := lru.Create(ctx, key, func() (interface{}, time.Duration, error) {
// Now that we have the lock, recheck that the item still needs a refresh.
// Purposely ignore an item we decided we want to prematurely expire.
if v, ok := lru.Get(ctx, key); ok {
if item := v.(*itemWithExp); item != ignored && item.isAcceptableTTL(now, o.minTTL) {
return item, item.expiration(now), nil
}
}
// Attempt to grab it from the global cache, verifying TTL is acceptable.
if item := c.maybeFetchItem(ctx, key); item != nil && item.isAcceptableTTL(now, o.minTTL) {
return item, item.expiration(now), nil
}
// Either it's a cache miss, there are problems with the cached item, or its
// TTL is not acceptable. Need to make a new item.
var item itemWithExp
val, exp, err := fn()
item.val = val
switch {
case err != nil:
return nil, 0, err
case exp < 0:
panic("the expiration time must be non-negative")
case exp > 0: // note: if exp == 0 we want item.exp to be zero
item.exp = now.Add(exp)
if !item.isAcceptableTTL(now, o.minTTL) {
// If 'fn' is incapable of generating an item with sufficient TTL there's
// nothing else we can do.
return nil, 0, ErrCantSatisfyMinTTL
}
}
// Store the new item in the global cache. We may accidentally overwrite an
// item here if someone else has refreshed it already, but this is unavoidable
// given GlobalCache semantics, and it is generally rare and harmless (given
// Cache guarantees, or rather the lack thereof).
if err := c.maybeStoreItem(ctx, key, &item, now); err != nil {
return nil, 0, err
}
return &item, item.expiration(now), nil
})
if err != nil {
return nil, err
}
return v.(*itemWithExp).val, nil
}
////////////////////////////////////////////////////////////////////////////////
// formatVersionByte indicates what serialization format is used; it is stored
// as the first byte of the serialized data.
//
// Serialized items with a different value of the first byte are rejected.
const formatVersionByte = 1
// options is a collection of options for GetOrCreate.
type options struct {
minTTL time.Duration
expRandThreshold time.Duration
}
type minTTLOpt time.Duration
type expRandThresholdOpt time.Duration
func (o minTTLOpt) apply(opts *options) { opts.minTTL = time.Duration(o) }
func (o expRandThresholdOpt) apply(opts *options) { opts.expRandThreshold = time.Duration(o) }
// itemWithExp is what is actually stored (as a pointer) in the process cache.
//
// It is a user-generated value plus its expiration time (or zero time if it
// doesn't expire).
type itemWithExp struct {
val interface{}
exp time.Time
}
// isAcceptableTTL returns true if item's TTL is large enough.
func (i *itemWithExp) isAcceptableTTL(now time.Time, minTTL time.Duration) bool {
if i.exp.IsZero() {
return true // never expires
}
// Note: '>=' must not be used here, since minTTL may be 0, and we don't want
// to return true on zero expiration.
return i.exp.Sub(now) > minTTL
}
// randomlyExpired returns true if the item must be considered already expired.
//
// See WithRandomizedExpiration for the rationale. The context is used only to
// grab RNG.
func (i *itemWithExp) randomlyExpired(ctx context.Context, now time.Time, threshold time.Duration) bool {
if i.exp.IsZero() {
return false // never expires
}
ttl := i.exp.Sub(now)
if ttl > threshold {
return false // far from expiration, no need to enable randomization
}
// TODO(vadimsh): The choice of distribution here was made arbitrarily. Some
// literature suggests using an exponential distribution instead, but it's not
// clear how to pick its parameters. In practice what we do here seems good
// enough. On each check we randomly expire the item with probability
// p = (threshold - ttl) / threshold. The closer the item is to its true
// expiration (the smaller the ttl), the higher the probability.
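//
// For example, with threshold = 10s and a remaining ttl = 2s, the item is
// treated as already expired with probability (10s - 2s) / 10s = 0.8 on each
// access; with ttl = 9s the probability is only 0.1.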
rnd := time.Duration(mathrand.Int63n(ctx, int64(threshold)))
return rnd > ttl
}
// expiration returns expiration time to use when storing this item.
//
// Zero return value means "does not expire" (as understood by both LRU and
// Global caches). Panics if the calculated expiration is negative. Use
// isAcceptableTTL to detect this case beforehand.
func (i *itemWithExp) expiration(now time.Time) time.Duration {
if i.exp.IsZero() {
return 0 // never expires
}
d := i.exp.Sub(now)
if d <= 0 {
panic("item is already expired, isAcceptableTTL should have detected this")
}
return d
}
// maybeFetchItem attempts to fetch the item from the global cache.
//
// If the global cache is not available or the cached item there is broken,
// returns nil. Logs errors inside.
func (c *Cache) maybeFetchItem(ctx context.Context, key string) *itemWithExp {
g := caching.GlobalCache(ctx, c.GlobalNamespace)
if g == nil {
return nil
}
blob, err := g.Get(ctx, key)
if err != nil {
if err != caching.ErrCacheMiss {
logging.WithError(err).Errorf(ctx, "Failed to read item %q from the global cache", key)
}
return nil
}
item, err := c.deserializeItem(blob)
if err != nil {
logging.WithError(err).Errorf(ctx, "Failed to deserialize item %q", key)
return nil
}
return item
}
// maybeStoreItem puts the item in the global cache, if possible.
//
// It returns an error only if the serialization fails. It generally means the
// serialization code is buggy and should be adjusted.
//
// Global cache errors are logged and ignored.
func (c *Cache) maybeStoreItem(ctx context.Context, key string, item *itemWithExp, now time.Time) error {
g := caching.GlobalCache(ctx, c.GlobalNamespace)
if g == nil {
return nil
}
blob, err := c.serializeItem(item)
if err != nil {
return err
}
if err = g.Set(ctx, key, blob, item.expiration(now)); err != nil {
logging.WithError(err).Errorf(ctx, "Failed to store item %q in the global cache", key)
}
return nil
}
// serializeItem packs item and its expiration time into a byte blob.
func (c *Cache) serializeItem(item *itemWithExp) ([]byte, error) {
blob, err := c.Marshal(item.val)
if err != nil {
return nil, err
}
var deadline uint64
if !item.exp.IsZero() {
deadline = uint64(item.exp.Unix())
}
// <version_byte> + <uint64 deadline timestamp> + <blob>
output := make([]byte, 9+len(blob))
output[0] = formatVersionByte
binary.LittleEndian.PutUint64(output[1:], deadline)
copy(output[9:], blob)
return output, nil
}
// deserializeItem is the reverse of serializeItem.
func (c *Cache) deserializeItem(blob []byte) (item *itemWithExp, err error) {
if len(blob) < 9 {
err = fmt.Errorf("the received buffer is too small")
return
}
if blob[0] != formatVersionByte {
err = fmt.Errorf("bad format version, expecting %d, got %d", formatVersionByte, blob[0])
return
}
item = &itemWithExp{}
deadline := binary.LittleEndian.Uint64(blob[1:])
if deadline != 0 {
item.exp = time.Unix(int64(deadline), 0)
}
item.val, err = c.Unmarshal(blob[9:])
return
}