blob: c61c8153ec77e2577761aa2a2619183509830dcd [file] [log] [blame]
/* Copyright 2018 Google Inc. All Rights Reserved. */
package digest
import (
"context"
"errors"
"fmt"
"path/filepath"
"strings"
"sync"
"time"
rpb "github.com/bazelbuild/remote-apis/build/bazel/remote/execution/v2"
"github.com/golang/groupcache/lru"
"go.opencensus.io/stats"
"go.opencensus.io/stats/view"
"go.opencensus.io/tag"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
"google.golang.org/protobuf/proto"
"go.chromium.org/goma/server/log"
cachepb "go.chromium.org/goma/server/proto/cache"
)
var (
cacheStats = stats.Int64(
"go.chromium.org/goma/server/remoteexec/digest.cache",
"digest cache operations",
stats.UnitDimensionless)
opKey = tag.MustNewKey("op")
fileExtKey = tag.MustNewKey("file_ext")
DefaultViews = []*view.View{
{
Name: "go.chromium.org/goma/server/remoteexec/digest.cache-entries",
Description: `number of digest cache entries`,
Measure: cacheStats,
Aggregation: view.Sum(),
},
{
Name: "go.chromium.org/goma/server/remoteexec/digest.cache-ops",
Description: `digest cache operations`,
Measure: cacheStats,
TagKeys: []tag.Key{
opKey,
fileExtKey,
},
Aggregation: view.Count(),
},
}
)
// Cache caches file's digest data.
type Cache struct {
c cachepb.CacheServiceClient
mu sync.Mutex
lru lru.Cache
}
// NewCache creates new cache for digest data.
func NewCache(c cachepb.CacheServiceClient, maxEntries int) *Cache {
cache := &Cache{
c: c,
}
cache.lru.MaxEntries = maxEntries
cache.lru.OnEvicted = cache.onEvicted
return cache
}
var errNoCacheClient = errors.New("no cache client")
func (c *Cache) cacheGet(ctx context.Context, key string) (*rpb.Digest, error) {
if c == nil || c.c == nil {
return nil, errNoCacheClient
}
resp, err := c.c.Get(ctx, &cachepb.GetReq{
Key: key,
})
if err != nil {
return nil, err
}
d := &rpb.Digest{}
err = proto.Unmarshal(resp.Kv.Value, d)
if err != nil {
return nil, err
}
return d, nil
}
func (c *Cache) cacheSet(ctx context.Context, key string, d *rpb.Digest) error {
if c == nil || c.c == nil {
return errNoCacheClient
}
v, err := proto.Marshal(d)
if err != nil {
return err
}
_, err = c.c.Put(ctx, &cachepb.PutReq{
Kv: &cachepb.KV{
Key: key,
Value: v,
},
})
return err
}
// Get gets source's digest.
func (c *Cache) Get(ctx context.Context, key string, src Source) (Data, error) {
var fileExt string
if gi, ok := src.(interface {
Filename() string
}); ok {
fileExt = filepathExt(gi.Filename())
}
if c != nil {
c.mu.Lock()
data, ok := c.lru.Get(lru.Key(key))
c.mu.Unlock()
if ok {
stats.RecordWithTags(ctx, []tag.Mutator{
tag.Upsert(opKey, "hit"),
tag.Upsert(fileExtKey, fileExt),
}, cacheStats.M(0))
// stochastically put it to cache client
// to make lru/lfu work?
return data.(Data), nil
}
}
var keystr string
if s := src.String(); strings.Contains(s, key) {
keystr = s
} else {
keystr = fmt.Sprintf("key:%s src:%s", key, src)
}
start := time.Now()
logger := log.FromContext(ctx)
// singleflight?
dk, err := c.cacheGet(ctx, key)
if err == nil {
logger.Infof("digest cache get %s => %v: %s", keystr, dk, time.Since(start))
d := New(src, dk)
if c != nil {
c.mu.Lock()
c.lru.Add(lru.Key(key), d)
c.mu.Unlock()
stats.RecordWithTags(ctx, []tag.Mutator{
tag.Upsert(opKey, "cache-get"),
tag.Upsert(fileExtKey, fileExt),
}, cacheStats.M(1))
}
return d, nil
}
op := "get-error"
if status.Code(err) == codes.NotFound {
op = "miss"
}
logger.Infof("digest cache %s %s %v: %s", op, keystr, err, time.Since(start))
stats.RecordWithTags(ctx, []tag.Mutator{
tag.Upsert(opKey, op),
tag.Upsert(fileExtKey, fileExt),
}, cacheStats.M(0))
d, err := FromSource(ctx, src)
if err != nil {
logger.Warnf("digest from source %s %v: %s", keystr, err, time.Since(start))
return nil, err
}
if c != nil {
c.mu.Lock()
c.lru.Add(lru.Key(key), d)
c.mu.Unlock()
logger.Infof("digest cache set %s => %v: %s", keystr, d, time.Since(start))
err = c.cacheSet(ctx, key, d.Digest())
op := "cache-set"
if err != nil {
logger.Warnf("digest cache set fail %s => %v: %v: %s", keystr, d, err, time.Since(start))
op = "cache-set-fail"
}
stats.RecordWithTags(ctx, []tag.Mutator{
tag.Upsert(opKey, op),
tag.Upsert(fileExtKey, fileExt),
}, cacheStats.M(1))
}
return d, nil
}
func (c *Cache) onEvicted(k lru.Key, value interface{}) {
ctx := context.Background()
logger := log.FromContext(ctx)
key := k.(string)
var filename, fileExt string
d := value.(Data)
if dd, ok := d.(data); ok {
src := dd.source
if gi, ok := src.(interface {
Filename() string
}); ok {
filename = gi.Filename()
fileExt = filepathExt(gi.Filename())
}
}
logger.Infof("digest cache evict %s %s %q", key, d.Digest(), filename)
stats.RecordWithTags(ctx, []tag.Mutator{
tag.Upsert(opKey, "evict"),
tag.Upsert(fileExtKey, fileExt),
}, cacheStats.M(-1))
}
func filepathExt(fname string) string {
ext := filepath.Ext(fname)
if strings.ContainsAny(ext, `/\`) {
// ext should not have path separtor.
// if it has, it would be in directory part.
ext = ""
}
return ext
}