blob: 45857fe477a9ab5f1371fc1cf71835e4cae55a72 [file] [log] [blame]
// Copyright 2020 The LUCI Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package gaeemulation
import (
"context"
"crypto/sha1"
"encoding/hex"
"fmt"
"strings"
"time"
"github.com/gomodule/redigo/redis"
"go.chromium.org/luci/common/errors"
"go.chromium.org/luci/common/trace"
"go.chromium.org/luci/gae/filter/dscache"
)
const (
lockPrefix = 'L' // items that hold locks start with this byte
dataPrefix = 'D' // items that hold data start with this byte
// A prefix byte + nonce.
maxLockItemLen = 1 + dscache.NonceBytes
)
// To avoid allocations in Prefix() below.
var dataPrefixBuf = []byte{dataPrefix}
// The script does "compare prefix and swap".
//
// Arguments:
// KEYS[1]: the key to operate on.
// ARGV[1]: the old value to compare to (its first maxLockItemLen bytes).
// ARGV[2]: the new value to write.
// ARGV[3]: expiration time (in sec) of the new value.
var casScript = strings.TrimSpace(fmt.Sprintf(`
if redis.call("GETRANGE", KEYS[1], 0, %d) == ARGV[1] then
return redis.call("SET", KEYS[1], ARGV[2], "EX", ARGV[3])
end
`, maxLockItemLen))
// casScriptSHA1 is SHA1 of `casScript`, to be used with EVALSHA to save on
// a round trip to redis per CAS.
var casScriptSHA1 string
func init() {
dgst := sha1.Sum([]byte(casScript))
casScriptSHA1 = hex.EncodeToString(dgst[:])
}
// redisCache implements dscache.Cache via Redis.
type redisCache struct {
pool *redis.Pool
}
func (c redisCache) do(ctx context.Context, op string, cb func(conn redis.Conn) error) (err error) {
ctx, ts := trace.StartSpan(ctx, "go.chromium.org/luci/server/redisCache."+op)
defer func() { ts.End(err) }()
conn, err := c.pool.GetContext(ctx)
if err != nil {
return errors.Annotate(err, "dscache %s", op).Err()
}
defer conn.Close()
if err = cb(conn); err != nil {
return errors.Annotate(err, "dscache %s", op).Err()
}
return nil
}
func (c redisCache) PutLocks(ctx context.Context, keys []string, timeout time.Duration) error {
if len(keys) == 0 {
return nil
}
return c.do(ctx, "PutLocks", func(conn redis.Conn) error {
for _, key := range keys {
conn.Send("SET", key, []byte{lockPrefix}, "EX", int(timeout.Seconds()))
}
_, err := conn.Do("")
return err
})
}
func (c redisCache) DropLocks(ctx context.Context, keys []string) error {
if len(keys) == 0 {
return nil
}
return c.do(ctx, "DropLocks", func(conn redis.Conn) error {
for _, key := range keys {
conn.Send("DEL", key)
}
_, err := conn.Do("")
return err
})
}
func (c redisCache) TryLockAndFetch(ctx context.Context, keys []string, nonce []byte, timeout time.Duration) ([]dscache.CacheItem, error) {
if len(keys) == 0 {
return nil, nil
}
// Prepopulate the response with nil items which mean "cache miss". It is
// always safe to return them, the dscache will fallback to using datastore
// (without touching the cache in the end).
items := make([]dscache.CacheItem, len(keys))
err := c.do(ctx, "TryLockAndFetch", func(conn redis.Conn) (err error) {
// Send a pipeline of SET NX+GET pairs.
prefixedNonce := append([]byte{lockPrefix}, nonce...)
for _, key := range keys {
if key == "" {
continue
}
conn.Send("SET", key, prefixedNonce, "NX", "EX", int(timeout.Seconds()))
conn.Send("GET", key)
}
conn.Flush()
// Parse replies.
for i, key := range keys {
if key == "" {
continue
}
conn.Receive() // skip the result of "SET", we want "GET"
if body, err := redis.Bytes(conn.Receive()); err == nil {
items[i] = &cacheItem{key: key, body: body}
}
if conn.Err() != nil {
return conn.Err() // the connection is dropped, can't fetch the rest
}
}
return nil
})
return items, err
}
func (c redisCache) CompareAndSwap(ctx context.Context, items []dscache.CacheItem) error {
if len(items) == 0 {
return nil
}
return c.do(ctx, "CompareAndSwap", func(conn redis.Conn) error {
// Preload the script to make sure Redis knows about it. Redis guarantees
// that scripts loaded in a connection survive at least until this
// connection is dropped (in practice they survive until the server is
// restarted or the script cache is manually flushed).
conn.Send("SCRIPT", "LOAD", casScript)
for _, item := range items {
item := item.(*cacheItem)
if item.lock == nil {
panic("dscache violated Cache contract: can CAS only promoted items")
}
conn.Send("EVALSHA",
casScriptSHA1, // the script to execute
1, // number of key-typed arguments (see casScript)
item.key, // the key to operate one
item.lock, // will be compared to what's in the cache right now
item.body, // the new value if comparison succeeds
int(item.exp.Seconds()),
)
}
_, err := conn.Do("")
return err
})
}
////////////////////////////////////////////////////////////////////////////////
type cacheItem struct {
key string
body []byte
lock []byte // set to the previous value of `body` after the promotion
exp time.Duration // set after the promotion
}
func (ci *cacheItem) Key() string {
return ci.key
}
func (ci *cacheItem) Nonce() []byte {
if len(ci.body) > 0 && ci.body[0] == lockPrefix {
return ci.body[1:]
}
return nil
}
func (ci *cacheItem) Data() []byte {
if len(ci.body) > 0 && ci.body[0] == dataPrefix {
return ci.body[1:]
}
return nil
}
func (ci *cacheItem) Prefix() []byte {
return dataPrefixBuf
}
func (ci *cacheItem) PromoteToData(data []byte, exp time.Duration) {
if len(data) == 0 || data[0] != dataPrefix {
panic("dscache violated CacheItem contract: data is not prefixed by Prefix()")
}
ci.promote(data, exp)
}
func (ci *cacheItem) PromoteToIndefiniteLock() {
ci.promote([]byte{lockPrefix}, time.Hour*24*30)
}
func (ci *cacheItem) promote(body []byte, exp time.Duration) {
if ci.lock != nil {
panic("already promoted")
}
if len(ci.body) == 0 || ci.body[0] != lockPrefix {
panic("not a lock item")
}
ci.lock = ci.body
// Note: this should not normally happen, but may happen if some items were
// written with different value of dscache.NonceBytes constant. We need to
// trim it for the casScript that compares only up to maxLockItemLen bytes.
if len(ci.lock) > maxLockItemLen {
ci.lock = ci.lock[:maxLockItemLen]
}
ci.body = body
ci.exp = exp
}