// Copyright 2016 The LUCI Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package internal
import (
"bytes"
"context"
"os"
"sort"
"sync"
"go.chromium.org/luci/common/errors"
"go.chromium.org/luci/common/logging"
api "go.chromium.org/luci/cipd/api/cipd/v1"
"go.chromium.org/luci/cipd/client/cipd/fs"
"go.chromium.org/luci/cipd/client/cipd/internal/messages"
"go.chromium.org/luci/cipd/common"
"go.chromium.org/luci/cipd/common/cipderr"
)
const (
tagCacheMaxSize = 300
tagCacheMaxExeSize = 20
tagCacheFilename = "tagcache.db"
)
// TagCache provides a mapping (package name, tag) -> instance ID.
//
// This mapping is safe to cache because tags are not detachable: once a tag is
// successfully resolved to an instance ID, it is guaranteed to either resolve
// to the same instance ID later or not resolve at all (e.g. if the tag is
// attached to multiple instances, in which case the tag is misused anyway). In
// either case returning a cached instance ID makes sense. The primary purpose
// of this cache is to avoid round trips to the service and thus increase the
// reliability of 'cipd ensure' calls that use only tags to specify versions.
// This is by far the most common way 'cipd ensure' is used.
//
// Additionally, this TagCache stores a mapping of (pin, file_name) ->
// encode(ObjectRef of extracted file) to assist in the `selfupdate` flow.
//
// Whenever selfupdate resolves what CIPD package instance ID (pin) it SHOULD be
// at, it looks at the '(pin, 'cipd') => binary hash' map to figure out what
// hash the client itself SHOULD have for this instance ID. The client then
// calculates the hash of itself to see whether it is actually already at that
// instance ID.
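//
// A minimal usage sketch (ctx, fileSystem, and resolvedPin are assumed to be
// provided by the caller; the service URL, package name, and tag below are
// purely illustrative):
//
//	tc := NewTagCache(fileSystem, "https://cipd.example.com")
//	pin, err := tc.ResolveTag(ctx, "infra/tools/cipd/linux-amd64", "git_revision:deadbeef")
//	if err == nil && pin.InstanceID == "" {
//		// Cache miss: resolve the tag via the backend into resolvedPin,
//		// then record and persist the mapping for future runs.
//		if err = tc.AddTag(ctx, resolvedPin, "git_revision:deadbeef"); err == nil {
//			err = tc.Save(ctx)
//		}
//	}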
type TagCache struct {
fs fs.FileSystem
service string
lock sync.Mutex
cache *messages.TagCache // the last loaded state, if not nil.
addedTags map[tagKey]*messages.TagCache_Entry // entries added by AddTag
addedFiles map[fileKey]*messages.TagCache_FileEntry // entries added by AddExtractedObjectRef
}
type tagKey string
type fileKey string
// makeTagKey constructs a key for the TagCache.addedTags map.
//
// We use string concatenation instead of a struct to avoid reimplementing the
// sorting interface, ain't nobody got time for that.
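//
// For example, makeTagKey("a/b", "k:v") produces the key "a/b:k:v".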
func makeTagKey(pkg, tag string) tagKey {
return tagKey(pkg + ":" + tag)
}
// makeFileKey constructs a key for the TagCache.addedFiles map.
//
// It is structurally distinct from tagKey and thus uses a different type.
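//
// For example, makeFileKey("a/b", "iid", "cipd") produces "a/b:iid:cipd".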
func makeFileKey(pkg, instance, file string) fileKey {
return fileKey(pkg + ":" + instance + ":" + file)
}
// NewTagCache initializes a TagCache.
//
// fs is the root of the cache; it will be searched for the tagcache.db file.
func NewTagCache(fs fs.FileSystem, service string) *TagCache {
return &TagCache{fs: fs, service: service}
}
func (c *TagCache) lazyLoadLocked(ctx context.Context) (err error) {
// Lazy-load the cache the first time it is used. We reload it again in Save
// right before overwriting the file. We don't reload it anywhere else though,
// so the implementation essentially assumes Save is called relatively soon
// after a ResolveTag call. If that's not the case, cache updates made by other
// processes will be "invisible" to the TagCache. It still tries not to
// overwrite them in Save, so it's fine.
if c.cache == nil {
c.cache, err = c.loadFromDisk(ctx, false)
}
return err
}
// ResolveTag returns the cached pin or an empty Pin{} if the tag is not in the cache.
//
// Returns error if the cache can't be read.
func (c *TagCache) ResolveTag(ctx context.Context, pkg, tag string) (pin common.Pin, err error) {
if err = common.ValidatePackageName(pkg); err != nil {
return pin, err
}
if err = common.ValidateInstanceTag(tag); err != nil {
return pin, err
}
c.lock.Lock()
defer c.lock.Unlock()
// Already added with AddTag recently?
if e := c.addedTags[makeTagKey(pkg, tag)]; e != nil {
return common.Pin{
PackageName: e.Package,
InstanceID: e.InstanceId,
}, nil
}
if err = c.lazyLoadLocked(ctx); err != nil {
return pin, err
}
// Most recently used tags are usually at the end, search in reverse as a
// silly optimization.
for i := len(c.cache.Entries) - 1; i >= 0; i-- {
e := c.cache.Entries[i]
if e.Package == pkg && e.Tag == tag {
return common.Pin{
PackageName: pkg,
InstanceID: e.InstanceId,
}, nil
}
}
return pin, err
}
// ResolveExtractedObjectRef returns the ObjectRef of the extracted file or nil
// if that file is not in the cache.
//
// Returns error if the cache can't be read.
func (c *TagCache) ResolveExtractedObjectRef(ctx context.Context, pin common.Pin, fileName string) (*api.ObjectRef, error) {
if err := common.ValidatePin(pin, common.AnyHash); err != nil {
return nil, err
}
ignoreBrokenObjectRef := func(iid string) *api.ObjectRef {
if err := common.ValidateInstanceID(iid, common.AnyHash); err != nil {
logging.Errorf(ctx, "Stored object_ref %q for %q in %s is invalid, ignoring it: %s", iid, fileName, pin, err)
return nil
}
return common.InstanceIDToObjectRef(iid)
}
c.lock.Lock()
defer c.lock.Unlock()
key := makeFileKey(pin.PackageName, pin.InstanceID, fileName)
if e := c.addedFiles[key]; e != nil {
return ignoreBrokenObjectRef(e.ObjectRef), nil
}
if err := c.lazyLoadLocked(ctx); err != nil {
return nil, err
}
// Most recently used entries are usually at the end, search in reverse as a
// silly optimization.
for i := len(c.cache.FileEntries) - 1; i >= 0; i-- {
e := c.cache.FileEntries[i]
if e.Package == pin.PackageName && e.InstanceId == pin.InstanceID && e.FileName == fileName {
return ignoreBrokenObjectRef(e.ObjectRef), nil
}
}
return nil, nil
}
// AddTag records that (pin.PackageName, tag) maps to pin.InstanceID.
//
// Call 'Save' later to persist these changes to the cache file on disk.
func (c *TagCache) AddTag(ctx context.Context, pin common.Pin, tag string) error {
if err := common.ValidatePin(pin, common.AnyHash); err != nil {
return err
}
if err := common.ValidateInstanceTag(tag); err != nil {
return err
}
c.lock.Lock()
defer c.lock.Unlock()
if c.addedTags == nil {
c.addedTags = make(map[tagKey]*messages.TagCache_Entry, 1)
}
// 'Save' will merge this into 'c.cache' before dumping to disk.
c.addedTags[makeTagKey(pin.PackageName, tag)] = &messages.TagCache_Entry{
Service: c.service,
Package: pin.PackageName,
Tag: tag,
InstanceId: pin.InstanceID,
}
return nil
}
// AddExtractedObjectRef records that fileName extracted from the package at
// the given pin has the given hash.
//
// The hash is represented as an ObjectRef, which is a (hash algo, hex digest)
// tuple.
//
// Call 'Save' later to persist these changes to the cache file on disk.
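//
// A sketch of the selfupdate-style bookkeeping described in the TagCache doc
// (ctx, tc, pin, and digestOfExtractedFile are assumed to be provided by the
// caller; field names follow the api.ObjectRef proto):
//
//	ref := &api.ObjectRef{
//		HashAlgo:  api.HashAlgo_SHA256,
//		HexDigest: digestOfExtractedFile, // hex digest of the extracted file
//	}
//	if err := tc.AddExtractedObjectRef(ctx, pin, "cipd", ref); err == nil {
//		err = tc.Save(ctx)
//	}
//	// Later, ResolveExtractedObjectRef(ctx, pin, "cipd") returns an equal ref.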
func (c *TagCache) AddExtractedObjectRef(ctx context.Context, pin common.Pin, fileName string, ref *api.ObjectRef) error {
if err := common.ValidatePin(pin, common.AnyHash); err != nil {
return err
}
if err := common.ValidateObjectRef(ref, common.AnyHash); err != nil {
return err
}
c.lock.Lock()
defer c.lock.Unlock()
if c.addedFiles == nil {
c.addedFiles = make(map[fileKey]*messages.TagCache_FileEntry)
}
c.addedFiles[makeFileKey(pin.PackageName, pin.InstanceID, fileName)] = &messages.TagCache_FileEntry{
Service: c.service,
Package: pin.PackageName,
InstanceId: pin.InstanceID,
FileName: fileName,
ObjectRef: common.ObjectRefToInstanceID(ref),
}
return nil
}
// Save stores all pending cache updates to the file system.
//
// It effectively resets the object to the initial state.
func (c *TagCache) Save(ctx context.Context) error {
c.lock.Lock()
defer c.lock.Unlock()
// Nothing to store? Just clean the state, so that ResolveTag can fetch the
// up-to-date cache from disk later if needed.
if len(c.addedTags) == 0 && len(c.addedFiles) == 0 {
c.cache = nil
return nil
}
// Sort all new entries, for consistency.
sortedTags := make([]string, 0, len(c.addedTags))
for k := range c.addedTags {
sortedTags = append(sortedTags, string(k))
}
sort.Strings(sortedTags)
sortedFiles := make([]string, 0, len(c.addedFiles))
for k := range c.addedFiles {
sortedFiles = append(sortedFiles, string(k))
}
sort.Strings(sortedFiles)
// Load the most recent data to avoid overwriting it. Load ALL entries, even
// if they belong to a different service: there is one global cache file and
// we should preserve all entries in it.
recent, err := c.loadFromDisk(ctx, true)
if err != nil {
return err
}
// Copy existing entries, except the ones we are moving to the tail. Carefully
// copy entries belonging to other services too; we must not overwrite them.
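// A hypothetical illustration (with tagCacheMaxSize shrunk to 3 for brevity):
// if the file currently holds entries [A, B, C] and B was just re-added via
// AddTag, the merge below produces [A, C, B], i.e. B moves to the tail as the
// most recently used entry and, once the cap is exceeded, the oldest entries
// at the head are trimmed first.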
mergedTags := make([]*messages.TagCache_Entry, 0, len(recent.Entries)+len(c.addedTags))
for _, e := range recent.Entries {
key := makeTagKey(e.Package, e.Tag)
if e.Service != c.service || c.addedTags[key] == nil {
mergedTags = append(mergedTags, e)
}
}
// Add new entries to the tail.
for _, k := range sortedTags {
mergedTags = append(mergedTags, c.addedTags[tagKey(k)])
}
// Trim the end result, discard the head: it's where old items are.
if len(mergedTags) > tagCacheMaxSize {
mergedTags = mergedTags[len(mergedTags)-tagCacheMaxSize:]
}
// Do the same for file entries.
mergedFiles := make([]*messages.TagCache_FileEntry, 0, len(recent.FileEntries)+len(c.addedFiles))
for _, e := range recent.FileEntries {
key := makeFileKey(e.Package, e.InstanceId, e.FileName)
if e.Service != c.service || c.addedFiles[key] == nil {
mergedFiles = append(mergedFiles, e)
}
}
for _, k := range sortedFiles {
mergedFiles = append(mergedFiles, c.addedFiles[fileKey(k)])
}
if len(mergedFiles) > tagCacheMaxExeSize {
mergedFiles = mergedFiles[len(mergedFiles)-tagCacheMaxExeSize:]
}
// Serialize and write to disk. We can still accidentally replace someone
// else's changes, but the probability should be relatively low. It can happen
// only if two processes call 'Save' at the exact same time.
updated := &messages.TagCache{Entries: mergedTags, FileEntries: mergedFiles}
if err := c.dumpToDisk(ctx, updated); err != nil {
return err
}
// The state is persisted now.
c.cache = updated
c.addedTags = nil
c.addedFiles = nil
return nil
}
// loadFromDisk loads and parses the tag cache file.
//
// If 'allServices' is true, returns all cache entries (regardless of what
// service they correspond to), otherwise filters out all entries that don't
// match c.service.
//
// Returns an empty struct if the file is missing or corrupted. Returns an
// error if the file can't be read.
func (c *TagCache) loadFromDisk(ctx context.Context, allServices bool) (*messages.TagCache, error) {
path, err := c.fs.RootRelToAbs(tagCacheFilename)
if err != nil {
return nil, errors.Annotate(err, "bad tag cache path").Tag(cipderr.BadArgument).Err()
}
blob, err := os.ReadFile(path)
switch {
case os.IsNotExist(err):
return &messages.TagCache{}, nil
case err != nil:
return nil, errors.Annotate(err, "reading tag cache").Tag(cipderr.IO).Err()
}
// Just ignore the corrupted cache file.
cache := messages.TagCache{}
if err := UnmarshalWithSHA256(blob, &cache); err != nil {
logging.Warningf(ctx, "Can't deserialize tag cache: %s", err)
return &messages.TagCache{}, nil
}
// Throw away all entries with an empty Service. They are no longer valid.
// Also apply the 'allServices' filter.
filtered := &messages.TagCache{}
for _, e := range cache.Entries {
if e.Service != "" && (e.Service == c.service || allServices) {
filtered.Entries = append(filtered.Entries, e)
}
}
for _, e := range cache.FileEntries {
if e.Service != "" && (e.Service == c.service || allServices) {
filtered.FileEntries = append(filtered.FileEntries, e)
}
}
return filtered, nil
}
// dumpToDisk serializes the tag cache and writes it to the file system.
func (c *TagCache) dumpToDisk(ctx context.Context, msg *messages.TagCache) error {
path, err := c.fs.RootRelToAbs(tagCacheFilename)
if err != nil {
return errors.Annotate(err, "bad tag cache path").Tag(cipderr.BadArgument).Err()
}
blob, err := MarshalWithSHA256(msg)
if err != nil {
return errors.Annotate(err, "serializing tag cache").Tag(cipderr.BadArgument).Err()
}
if err := fs.EnsureFile(ctx, c.fs, path, bytes.NewReader(blob)); err != nil {
return errors.Annotate(err, "writing tag cache").Tag(cipderr.IO).Err()
}
return nil
}