blob: bd030a723b6a95808f0d60387790f6fbc409073a [file] [log] [blame]
// Copyright 2015 The LUCI Authors. All rights reserved.
// Use of this source code is governed under the Apache License, Version 2.0
// that can be found in the LICENSE file.
package dscache
import (
"time"
ds "github.com/luci/gae/service/datastore"
mc "github.com/luci/gae/service/memcache"
"github.com/luci/luci-go/common/errors"
log "github.com/luci/luci-go/common/logging"
"golang.org/x/net/context"
)
type dsCache struct {
ds.RawInterface
*supportContext
}
var _ ds.RawInterface = (*dsCache)(nil)
func (d *dsCache) DeleteMulti(keys []*ds.Key, cb ds.DeleteMultiCB) error {
return d.mutation(keys, func() error {
return d.RawInterface.DeleteMulti(keys, cb)
})
}
func (d *dsCache) PutMulti(keys []*ds.Key, vals []ds.PropertyMap, cb ds.NewKeyCB) error {
return d.mutation(keys, func() error {
return d.RawInterface.PutMulti(keys, vals, cb)
})
}
func (d *dsCache) GetMulti(keys []*ds.Key, metas ds.MultiMetaGetter, cb ds.GetMultiCB) error {
lockItems, nonce := d.mkRandLockItems(keys, metas)
if len(lockItems) == 0 {
return d.RawInterface.GetMulti(keys, metas, cb)
}
if err := mc.Add(d.c, lockItems...); err != nil {
// Ignore this error. Either we couldn't add them because they exist
// (so, not an issue), or because memcache is having sad times (in which
// case we'll see so in the Get which immediately follows this).
}
if err := errors.Filter(mc.Get(d.c, lockItems...), mc.ErrCacheMiss); err != nil {
(log.Fields{log.ErrorKey: err}).Debugf(
d.c, "dscache: GetMulti: memcache.Get")
}
p := d.makeFetchPlan(&facts{keys, metas, lockItems, nonce})
if !p.empty() {
// looks like we have something to pull from datastore, and maybe some work
// to save stuff back to memcache.
toCas := []mc.Item{}
j := 0
err := d.RawInterface.GetMulti(p.toGet, p.toGetMeta, func(pm ds.PropertyMap, err error) error {
i := p.idxMap[j]
toSave := p.toSave[j]
j++
data := []byte(nil)
// true: save entity to memcache
// false: lock entity in memcache forever
shouldSave := true
if err == nil {
p.decoded[i] = pm
if toSave != nil {
data = encodeItemValue(pm)
if len(data) > internalValueSizeLimit {
shouldSave = false
log.Warningf(
d.c, "dscache: encoded entity too big (%d/%d)!",
len(data), internalValueSizeLimit)
}
}
} else {
p.lme.Assign(i, err)
if err != ds.ErrNoSuchEntity {
return nil // aka continue to the next entry
}
}
if toSave != nil {
if shouldSave { // save
mg := metas.GetSingle(i)
expSecs := ds.GetMetaDefault(mg, CacheExpirationMeta, CacheTimeSeconds).(int64)
toSave.SetFlags(uint32(ItemHasData))
toSave.SetExpiration(time.Duration(expSecs) * time.Second)
toSave.SetValue(data)
} else {
// Set a lock with an infinite timeout. No one else should try to
// serialize this item to memcache until something Put/Delete's it.
toSave.SetFlags(uint32(ItemHasLock))
toSave.SetExpiration(0)
toSave.SetValue(nil)
}
toCas = append(toCas, toSave)
}
return nil
})
if err != nil {
return err
}
if len(toCas) > 0 {
// we have entries to save back to memcache.
if err := mc.CompareAndSwap(d.c, toCas...); err != nil {
(log.Fields{log.ErrorKey: err}).Debugf(
d.c, "dscache: GetMulti: memcache.CompareAndSwap")
}
}
}
// finally, run the callback for all of the decoded items and the errors,
// if any.
for i, dec := range p.decoded {
if err := cb(dec, p.lme.GetOne(i)); err != nil {
return err
}
}
return nil
}
func (d *dsCache) RunInTransaction(f func(context.Context) error, opts *ds.TransactionOptions) error {
txnState := dsTxnState{}
err := d.RawInterface.RunInTransaction(func(ctx context.Context) error {
txnState.reset()
err := f(context.WithValue(ctx, &dsTxnCacheKey, &txnState))
if err == nil {
err = txnState.apply(d.supportContext)
}
return err
}, opts)
if err == nil {
txnState.release(d.supportContext)
}
return err
}