blob: 06137c024bf0eb3c44becb99d9eed34de971c5b9 [file] [log] [blame]
// Copyright 2015 The LUCI Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package coordinator
import (
"bytes"
"compress/zlib"
"fmt"
"io/ioutil"
"strings"
"time"
"github.com/luci/luci-go/common/errors"
log "github.com/luci/luci-go/common/logging"
"github.com/luci/luci-go/logdog/common/storage/caching"
"github.com/luci/gae/service/memcache"
"golang.org/x/net/context"
)
const (
schemaVersion = "v1"
// defaultCompressionThreshold is the threshold where entries will become
// compressed. If the size of data exceeds this threshold, it will be
// compressed with zlib in the cache.
defaultCompressionThreshold = 64 * 1024 // 64KiB
)
// StorageCache implements a generic caching.Cache for Storage instances.
type StorageCache struct {
compressionThreshold int
}
// Get implements caching.Cache.
func (sc *StorageCache) Get(c context.Context, items ...*caching.Item) {
mcItems := make([]memcache.Item, len(items))
for i, itm := range items {
mcItems[i] = memcache.NewItem(c, sc.mkCacheKey(itm))
}
err := memcache.Get(c, mcItems...)
sc.memcacheErrCB(err, len(mcItems), func(err error, i int) {
// By default, no data.
items[i].Data = nil
switch err {
case nil:
itemData := mcItems[i].Value()
if len(itemData) == 0 {
log.Warningf(c, "Cached storage missing compression byte.")
return
}
isCompressed, itemData := itemData[0], itemData[1:]
if isCompressed != 0x00 {
// This entry is compressed.
zr, err := zlib.NewReader(bytes.NewReader(itemData))
if err != nil {
log.Fields{
log.ErrorKey: err,
"key": mcItems[i].Key(),
}.Warningf(c, "Failed to create ZLIB reader.")
return
}
defer zr.Close()
if itemData, err = ioutil.ReadAll(zr); err != nil {
log.Fields{
log.ErrorKey: err,
"key": mcItems[i].Key(),
}.Warningf(c, "Failed to decompress cached item.")
return
}
}
items[i].Data = itemData
case memcache.ErrCacheMiss:
break
default:
log.Fields{
log.ErrorKey: err,
"key": mcItems[i].Key(),
}.Warningf(c, "Error retrieving cached entry.")
}
})
}
// Put implements caching.Cache.
func (sc *StorageCache) Put(c context.Context, exp time.Duration, items ...*caching.Item) {
threshold := sc.compressionThreshold
if threshold == 0 {
threshold = defaultCompressionThreshold
}
var (
buf bytes.Buffer
zw zlib.Writer
usedZW bool
)
defer func() {
if usedZW {
zw.Close()
}
}()
mcItems := make([]memcache.Item, 0, len(items))
for _, itm := range items {
if itm.Data == nil {
continue
}
// Compress the data in the cache item.
writeItemData := func(d []byte) bool {
buf.Reset()
buf.Grow(len(d) + 1)
if len(d) < threshold {
// Do not compress the item. Write a "0x00" to indicate that it is
// not compressed.
if err := buf.WriteByte(0x00); err != nil {
log.WithError(err).Warningf(c, "Failed to write compression byte.")
return false
}
if _, err := buf.Write(d); err != nil {
log.WithError(err).Warningf(c, "Failed to write storage cache data.")
return false
}
return true
}
// Compress the item. Write a "0x01" to indicate that it is compressed.
zw := zlib.NewWriter(&buf)
if err := buf.WriteByte(0x01); err != nil {
log.WithError(err).Warningf(c, "Failed to write compression byte.")
return false
}
defer zw.Close()
if _, err := zw.Write(d); err != nil {
log.WithError(err).Warningf(c, "Failed to compress storage cache data.")
return false
}
if err := zw.Flush(); err != nil {
log.WithError(err).Warningf(c, "Failed to flush compressed storage cache data.")
return false
}
return true
}
if !writeItemData(itm.Data) {
continue
}
mcItem := memcache.NewItem(c, sc.mkCacheKey(itm))
mcItem.SetValue(append([]byte(nil), buf.Bytes()...))
if exp > 0 {
mcItem.SetExpiration(exp)
}
mcItems = append(mcItems, mcItem)
}
err := memcache.Set(c, mcItems...)
sc.memcacheErrCB(err, len(mcItems), func(err error, i int) {
switch err {
case nil, memcache.ErrNotStored:
break
default:
log.Fields{
log.ErrorKey: err,
"key": mcItems[i].Key(),
}.Warningf(c, "Error storing cached entry.")
}
})
}
func (*StorageCache) memcacheErrCB(err error, count int, cb func(error, int)) {
merr, _ := err.(errors.MultiError)
if merr != nil && len(merr) != count {
panic(fmt.Errorf("MultiError count mismatch (%d != %d)", len(merr), count))
}
for i := 0; i < count; i++ {
switch {
case err == nil:
cb(nil, i)
case merr == nil:
cb(err, i)
default:
cb(merr[i], i)
}
}
}
func (*StorageCache) mkCacheKey(itm *caching.Item) string {
return strings.Join([]string{
"storage_cache",
schemaVersion,
itm.Schema,
itm.Type,
itm.Key,
}, "_")
}