blob: e490428efff2e319469345523dc2f844df3a6b4d [file] [log] [blame]
// Copyright 2017 The Goma Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
package cache
import (
"bytes"
"context"
"errors"
"expvar"
"sync"
"cloud.google.com/go/storage"
"github.com/golang/groupcache/lru"
"go.opencensus.io/trace"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"go.chromium.org/goma/server/cache/gcs"
"go.chromium.org/goma/server/log"
cachepb "go.chromium.org/goma/server/proto/cache"
)
// memcache is a wrapper around an *lru.Cache that adds synchronization,
// and counts the size of all keys and values.
// The lru field is created lazily by add on the first Put.
type memcache struct {
// MaxBytes is the size limit that Put enforces by evicting old entries.
// Put performs no eviction when it is 0.
MaxBytes int64
// mu is held by Put, Get and stats while they access the fields below.
mu sync.RWMutex
nbytes int64 // of all keys and values
lru *lru.Cache
nhit, nget int64 // number of Get hits, number of Get calls
nevict int64 // number of evictions
nreplace int64 // number of times add overwrote a key with a different value
}
// errNoChange is returned by memcache.Put when the key is already stored
// with an identical value.
var errNoChange = errors.New("cache: no change")
// replaceError is returned by memcache.Put when an existing key is
// overwritten with a different value.
type replaceError struct {
// old is the value that was stored under the key before the replacement.
old []byte
}
// Error implements the error interface.
func (r replaceError) Error() string {
return "cache: value is replaced"
}
// Put puts key-value pair in memcache.
// It returns errNoChange if key-value pair was already stored.
// It returns replaceError if value is replaced.
//
// When MaxBytes is non-zero, the oldest entries are evicted after the
// insertion until the accounted size drops below MaxBytes.  Eviction also
// stops once the cache is empty, so a limit that can never be satisfied
// (e.g. a negative MaxBytes) cannot spin forever while holding the lock.
func (c *memcache) Put(ctx context.Context, key string, value []byte) error {
	span := trace.FromContext(ctx)
	span.Annotatef(nil, "put %s (size:%d)", key, len(value))
	c.mu.Lock()
	defer c.mu.Unlock()
	err := c.add(ctx, key, value)
	if c.MaxBytes == 0 {
		// MaxBytes of 0 disables eviction.
		return err
	}
	// c.lru is non-nil here: add initializes it before doing anything else.
	for c.nbytes >= c.MaxBytes && c.lru.Len() > 0 {
		span.Annotatef(nil, "eviction %d exceeding max=%d", c.nbytes, c.MaxBytes)
		// RemoveOldest triggers the OnEvicted callback installed by add,
		// which decreases c.nbytes.
		c.lru.RemoveOldest()
	}
	return err
}
// add stores value under key in the underlying LRU, creating the LRU on
// first use, and keeps c.nbytes in sync with the stored keys and values.
// It returns errNoChange when key already maps to an identical value, and
// a replaceError carrying the previous value when key mapped to a different
// one (the new value is stored in that case).
// add does no locking itself; Put calls it with c.mu held.
func (c *memcache) add(ctx context.Context, key string, value []byte) error {
	logger := log.FromContext(ctx)
	if c.lru == nil {
		// Eviction callback: log the entry and release its accounted size.
		onEvicted := func(k lru.Key, v interface{}) {
			evictLogger := log.FromContext(context.Background())
			name, data := k.(string), v.([]byte)
			evictLogger.Infof("mem.evict %s %d", name, len(data))
			c.nbytes -= int64(len(name)) + int64(len(data))
			c.nevict++
		}
		c.lru = &lru.Cache{OnEvicted: onEvicted}
	}

	// entrySize is the accounted size of key together with one of its values.
	entrySize := func(v []byte) int64 {
		return int64(len(key)) + int64(len(v))
	}

	var result error
	if cached, found := c.lru.Get(key); !found {
		logger.Infof("mem.put %s %d", key, len(value))
	} else {
		previous := cached.([]byte)
		if bytes.Equal(previous, value) {
			logger.Infof("mem.put2 %s %d", key, len(value))
			return errNoChange
		}
		logger.Errorf("mem.repl %s %d <= %d", key, len(value), len(previous))
		// Overwriting an existing key does not call OnEvicted, so the old
		// entry's size is released here instead.
		c.nbytes -= entrySize(previous)
		c.nreplace++
		result = replaceError{old: previous}
	}
	c.lru.Add(key, value)
	c.nbytes += entrySize(value)
	return result
}
// Get looks up key in memcache.
// It returns the stored value and true on a hit, or nil and false on a miss
// (including when nothing has been stored yet).
// Every call is counted in nget and every hit in nhit.
func (c *memcache) Get(ctx context.Context, key string) (value []byte, ok bool) {
	trace.FromContext(ctx).Annotatef(nil, "get %s", key)
	logger := log.FromContext(ctx)

	// Write lock: the counters below are modified.
	c.mu.Lock()
	defer c.mu.Unlock()
	c.nget++

	var cached interface{}
	if c.lru != nil {
		cached, ok = c.lru.Get(key)
	}
	if !ok {
		logger.Infof("mem.miss %s", key)
		return nil, false
	}
	value = cached.([]byte)
	c.nhit++
	logger.Infof("mem.hit %s %d", key, len(value))
	return value, true
}
// memstats is a snapshot of the memcache counters, as returned by
// memcache.stats.
//
// TODO: use opencensus stats, view.
type memstats struct {
MaxBytes int64 // memcache.MaxBytes
Bytes int64 // accounted size of all keys and values
Num int // number of entries in the LRU (0 if the LRU is not created yet)
Hits int64 // number of Get calls that found the key
Gets int64 // number of Get calls
Evicts int64 // number of evictions
Replaces int64 // number of values replaced by a different value
}
// stats returns a snapshot of the memcache counters, taken under the
// read lock.
func (c *memcache) stats() memstats {
	c.mu.RLock()
	defer c.mu.RUnlock()

	snapshot := memstats{
		MaxBytes: c.MaxBytes,
		Bytes:    c.nbytes,
		Hits:     c.nhit,
		Gets:     c.nget,
		Evicts:   c.nevict,
		Replaces: c.nreplace,
	}
	// The LRU is created lazily, so it may not exist yet; Num stays 0 then.
	if c.lru != nil {
		snapshot.Num = c.lru.Len()
	}
	return snapshot
}
// Config is a configuration for Cache.
type Config struct {
// MaxBytes is maximum number of bytes used for cache.
// New uses it as the size limit of the in-memory cache.
MaxBytes int64
// TODO:
// Dir string
// MaxDiskBytes int64

// Bucket is the Cloud Storage bucket used for the cloud cache.
// If it is nil, New creates a memory-only Cache.
Bucket *storage.BucketHandle
}
// writeBackSemaphore is the capacity of Cache.wbsema, i.e. the maximum
// number of write-back goroutines started by Cache.Put that may call
// gcs.Put at the same time.
// TODO: put it in Config?
const writeBackSemaphore = 8
// Cache represents key-value cache.
// It consists of an in-memory cache and, when configured, a cloud storage
// cache behind it.
type Cache struct {
mem memcache // in-memory layer
gcs *gcs.Cache // cloud storage layer; nil if Config.Bucket was nil
wbsema chan bool // semaphore for write-back goroutines; created only when gcs is set
}
var (
// mutex guards caches.
mutex sync.RWMutex
// caches holds every Cache created by New; publishStats iterates over it.
caches []*Cache
)
// New creates new Cache for Config.
// The cache is memory-only unless c.Bucket is set, in which case the cloud
// storage layer and the write-back semaphore are set up as well.
// Every created Cache is appended to the package-level caches list.
// The returned error is always nil.
func New(c Config) (*Cache, error) {
	created := new(Cache)
	created.mem.MaxBytes = c.MaxBytes
	if bucket := c.Bucket; bucket != nil {
		created.gcs = gcs.New(bucket)
		created.wbsema = make(chan bool, writeBackSemaphore)
	}

	// Register the cache so that publishStats can report on it.
	mutex.Lock()
	caches = append(caches, created)
	mutex.Unlock()

	return created, nil
}
// Put puts new key-value pair in memcache (always; i.e. overwrite existing one)
// and cloud cache (if gcs is configured, and new value is put).
// It returns error if it fails to put cache in cloud storage.
//
// If req.WriteBack is set, the cloud put runs in a background goroutine
// and Put returns immediately with a nil error; a failure of the
// background put is only logged.  At most cap(c.wbsema) background puts
// call gcs at the same time.
func (c *Cache) Put(ctx context.Context, req *cachepb.PutReq) (*cachepb.PutResp, error) {
	// errNoChange means the identical pair is already in memcache, so the
	// cloud put is skipped too.  Any other result (nil or replaceError)
	// means memcache now holds the new value.
	if err := c.mem.Put(ctx, req.Kv.Key, req.Kv.Value); err == errNoChange {
		return &cachepb.PutResp{}, nil
	}
	if c.gcs == nil {
		return &cachepb.PutResp{}, nil
	}
	if !req.WriteBack {
		return c.gcs.Put(ctx, req)
	}

	// The write back is detached from the request context (it starts from
	// context.Background()), but its span is linked to the request's span
	// as a remote parent.
	wbctx, span := trace.StartSpanWithRemoteParent(context.Background(), "go.chromium.org/goma/server/cache.Cache.Put.WriteBack", trace.FromContext(ctx).SpanContext())
	// TODO: pass tag?
	go func(ctx context.Context) {
		// End the span started above once the write back is done;
		// a span that is never ended is never finished.
		defer span.End()
		logger := log.FromContext(ctx)
		// Acquire a semaphore slot; blocks while too many write backs
		// are in flight.
		c.wbsema <- true
		defer func() {
			<-c.wbsema
		}()
		logger.Infof("gcs.put write back %s", req.Kv.Key)
		if _, err := c.gcs.Put(ctx, req); err != nil {
			logger.Errorf("gcs.put write back %s: %v", req.Kv.Key, err)
			return
		}
		logger.Infof("gcs.put write back %s: OK", req.Kv.Key)
	}(wbctx)
	return &cachepb.PutResp{}, nil
}
// Get gets key-value for requested key.
// It returns codes.NotFound if value not found in cache.
//
// The in-memory cache is consulted first; a hit is returned with
// InMemory set.  On a miss, the cloud cache is consulted unless req.Fast
// is set or no cloud cache is configured.  Any failure of the cloud lookup
// is also reported as codes.NotFound.  A value found in the cloud cache is
// stored in the in-memory cache before it is returned.
func (c *Cache) Get(ctx context.Context, req *cachepb.GetReq) (*cachepb.GetResp, error) {
	if v, ok := c.mem.Get(ctx, req.Key); ok {
		return &cachepb.GetResp{
			Kv: &cachepb.KV{
				Key:   req.Key,
				Value: v,
			},
			InMemory: true,
		}, nil
	}
	if req.Fast || c.gcs == nil {
		return nil, grpc.Errorf(codes.NotFound, "cache.Get: not found %s", req.Key)
	}
	resp, err := c.gcs.Get(ctx, req)
	if err != nil {
		return nil, grpc.Errorf(codes.NotFound, "cache.Get(%s): %v", req.Key, err)
	}
	// A response without a value is a miss as well.  It is reported
	// separately so that the message does not render a nil error as "<nil>",
	// and so that a nil response is never dereferenced.
	if resp == nil || resp.Kv == nil {
		return nil, grpc.Errorf(codes.NotFound, "cache.Get(%s): no value in gcs response", req.Key)
	}
	// Populate memcache for later lookups.  The result is deliberately
	// ignored: errNoChange and replaceError only describe what happened to
	// the in-memory entry.
	_ = c.mem.Put(ctx, req.Key, resp.Kv.Value)
	return resp, nil
}
// stats is the per-Cache snapshot returned by Cache.stats and collected by
// publishStats.
type stats struct {
Mem memstats // in-memory cache counters
GCS gcs.Stats // cloud storage cache stats
}
// stats returns a snapshot of the stats of both cache layers.
// GCS is left at its zero value for a memory-only Cache (c.gcs == nil),
// consistent with the nil checks Put and Get do before using c.gcs.
func (c *Cache) stats() stats {
	s := stats{Mem: c.mem.stats()}
	if c.gcs != nil {
		s.GCS = c.gcs.Stats()
	}
	return s
}
// publishStats collects the stats of every Cache registered by New.
// init publishes it as an expvar.Func under the name "cache".
// The result is a []stats, which is nil when no Cache has been created.
func publishStats() interface{} {
	mutex.RLock()
	defer mutex.RUnlock()

	var all []stats
	for i := range caches {
		all = append(all, caches[i].stats())
	}
	return all
}
// init publishes the stats of all caches created by New as the expvar
// variable "cache".
func init() {
expvar.Publish("cache", expvar.Func(publishStats))
}