| // Copyright 2020 The LUCI Authors. |
| // |
| // Licensed under the Apache License, Version 2.0 (the "License"); |
| // you may not use this file except in compliance with the License. |
| // You may obtain a copy of the License at |
| // |
| // http://www.apache.org/licenses/LICENSE-2.0 |
| // |
| // Unless required by applicable law or agreed to in writing, software |
| // distributed under the License is distributed on an "AS IS" BASIS, |
| // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| // See the License for the specific language governing permissions and |
| // limitations under the License. |
| |
| package gaeemulation |
| |
| import ( |
| "context" |
| "crypto/sha1" |
| "encoding/hex" |
| "fmt" |
| "strings" |
| "time" |
| |
| "github.com/gomodule/redigo/redis" |
| |
| "go.chromium.org/luci/common/errors" |
| "go.chromium.org/luci/common/trace" |
| "go.chromium.org/luci/gae/filter/dscache" |
| ) |
| |
| const ( |
| lockPrefix = 'L' // items that hold locks start with this byte |
| dataPrefix = 'D' // items that hold data start with this byte |
| |
| // A prefix byte + nonce. |
| maxLockItemLen = 1 + dscache.NonceBytes |
| ) |
| |
| // To avoid allocations in Prefix() below. |
| var dataPrefixBuf = []byte{dataPrefix} |
| |
| // The script does "compare prefix and swap". |
| // |
| // Arguments: |
| // KEYS[1]: the key to operate on. |
| // ARGV[1]: the old value to compare to (its first maxLockItemLen bytes). |
| // ARGV[2]: the new value to write. |
| // ARGV[3]: expiration time (in sec) of the new value. |
| var casScript = strings.TrimSpace(fmt.Sprintf(` |
| if redis.call("GETRANGE", KEYS[1], 0, %d) == ARGV[1] then |
| return redis.call("SET", KEYS[1], ARGV[2], "EX", ARGV[3]) |
| end |
| `, maxLockItemLen)) |
| |
| // casScriptSHA1 is SHA1 of `casScript`, to be used with EVALSHA to save on |
| // a round trip to redis per CAS. |
| var casScriptSHA1 string |
| |
| func init() { |
| dgst := sha1.Sum([]byte(casScript)) |
| casScriptSHA1 = hex.EncodeToString(dgst[:]) |
| } |
| |
| // redisCache implements dscache.Cache via Redis. |
| type redisCache struct { |
| pool *redis.Pool |
| } |
| |
| func (c redisCache) do(ctx context.Context, op string, cb func(conn redis.Conn) error) (err error) { |
| ctx, ts := trace.StartSpan(ctx, "go.chromium.org/luci/server/redisCache."+op) |
| defer func() { ts.End(err) }() |
| |
| conn, err := c.pool.GetContext(ctx) |
| if err != nil { |
| return errors.Annotate(err, "dscache %s", op).Err() |
| } |
| defer conn.Close() |
| |
| if err = cb(conn); err != nil { |
| return errors.Annotate(err, "dscache %s", op).Err() |
| } |
| return nil |
| } |
| |
| func (c redisCache) PutLocks(ctx context.Context, keys []string, timeout time.Duration) error { |
| if len(keys) == 0 { |
| return nil |
| } |
| return c.do(ctx, "PutLocks", func(conn redis.Conn) error { |
| for _, key := range keys { |
| conn.Send("SET", key, []byte{lockPrefix}, "EX", int(timeout.Seconds())) |
| } |
| _, err := conn.Do("") |
| return err |
| }) |
| } |
| |
| func (c redisCache) DropLocks(ctx context.Context, keys []string) error { |
| if len(keys) == 0 { |
| return nil |
| } |
| return c.do(ctx, "DropLocks", func(conn redis.Conn) error { |
| for _, key := range keys { |
| conn.Send("DEL", key) |
| } |
| _, err := conn.Do("") |
| return err |
| }) |
| } |
| |
| func (c redisCache) TryLockAndFetch(ctx context.Context, keys []string, nonce []byte, timeout time.Duration) ([]dscache.CacheItem, error) { |
| if len(keys) == 0 { |
| return nil, nil |
| } |
| |
| // Prepopulate the response with nil items which mean "cache miss". It is |
| // always safe to return them, the dscache will fallback to using datastore |
| // (without touching the cache in the end). |
| items := make([]dscache.CacheItem, len(keys)) |
| |
| err := c.do(ctx, "TryLockAndFetch", func(conn redis.Conn) (err error) { |
| // Send a pipeline of SET NX+GET pairs. |
| prefixedNonce := append([]byte{lockPrefix}, nonce...) |
| for _, key := range keys { |
| if key == "" { |
| continue |
| } |
| conn.Send("SET", key, prefixedNonce, "NX", "EX", int(timeout.Seconds())) |
| conn.Send("GET", key) |
| } |
| conn.Flush() |
| |
| // Parse replies. |
| for i, key := range keys { |
| if key == "" { |
| continue |
| } |
| conn.Receive() // skip the result of "SET", we want "GET" |
| if body, err := redis.Bytes(conn.Receive()); err == nil { |
| items[i] = &cacheItem{key: key, body: body} |
| } |
| if conn.Err() != nil { |
| return conn.Err() // the connection is dropped, can't fetch the rest |
| } |
| } |
| return nil |
| }) |
| |
| return items, err |
| } |
| |
| func (c redisCache) CompareAndSwap(ctx context.Context, items []dscache.CacheItem) error { |
| if len(items) == 0 { |
| return nil |
| } |
| |
| return c.do(ctx, "CompareAndSwap", func(conn redis.Conn) error { |
| // Preload the script to make sure Redis knows about it. Redis guarantees |
| // that scripts loaded in a connection survive at least until this |
| // connection is dropped (in practice they survive until the server is |
| // restarted or the script cache is manually flushed). |
| conn.Send("SCRIPT", "LOAD", casScript) |
| |
| for _, item := range items { |
| item := item.(*cacheItem) |
| if item.lock == nil { |
| panic("dscache violated Cache contract: can CAS only promoted items") |
| } |
| conn.Send("EVALSHA", |
| casScriptSHA1, // the script to execute |
| 1, // number of key-typed arguments (see casScript) |
| item.key, // the key to operate one |
| item.lock, // will be compared to what's in the cache right now |
| item.body, // the new value if comparison succeeds |
| int(item.exp.Seconds()), |
| ) |
| } |
| |
| _, err := conn.Do("") |
| return err |
| }) |
| } |
| |
| //////////////////////////////////////////////////////////////////////////////// |
| |
| type cacheItem struct { |
| key string |
| body []byte |
| |
| lock []byte // set to the previous value of `body` after the promotion |
| exp time.Duration // set after the promotion |
| } |
| |
| func (ci *cacheItem) Key() string { |
| return ci.key |
| } |
| |
| func (ci *cacheItem) Nonce() []byte { |
| if len(ci.body) > 0 && ci.body[0] == lockPrefix { |
| return ci.body[1:] |
| } |
| return nil |
| } |
| |
| func (ci *cacheItem) Data() []byte { |
| if len(ci.body) > 0 && ci.body[0] == dataPrefix { |
| return ci.body[1:] |
| } |
| return nil |
| } |
| |
| func (ci *cacheItem) Prefix() []byte { |
| return dataPrefixBuf |
| } |
| |
| func (ci *cacheItem) PromoteToData(data []byte, exp time.Duration) { |
| if len(data) == 0 || data[0] != dataPrefix { |
| panic("dscache violated CacheItem contract: data is not prefixed by Prefix()") |
| } |
| ci.promote(data, exp) |
| } |
| |
| func (ci *cacheItem) PromoteToIndefiniteLock() { |
| ci.promote([]byte{lockPrefix}, time.Hour*24*30) |
| } |
| |
| func (ci *cacheItem) promote(body []byte, exp time.Duration) { |
| if ci.lock != nil { |
| panic("already promoted") |
| } |
| if len(ci.body) == 0 || ci.body[0] != lockPrefix { |
| panic("not a lock item") |
| } |
| ci.lock = ci.body |
| // Note: this should not normally happen, but may happen if some items were |
| // written with different value of dscache.NonceBytes constant. We need to |
| // trim it for the casScript that compares only up to maxLockItemLen bytes. |
| if len(ci.lock) > maxLockItemLen { |
| ci.lock = ci.lock[:maxLockItemLen] |
| } |
| ci.body = body |
| ci.exp = exp |
| } |