blob: 949db3fe60db19a9f780bda8406696f0ef41fdf3 [file] [log] [blame]
// Copyright 2020 The LUCI Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
// Package cfgcache provides a datastore-based cache of individual config files.
//
// It is intended to be used to cache a small number (usually 1) service
// configuration files fetched from the service's config set and used in every
// request.
//
// Configuration files are assumed to be stored in text protobuf encoding.
package cfgcache
import (
"context"
"fmt"
"sync"
"time"
protov1 "github.com/golang/protobuf/proto"
"google.golang.org/protobuf/proto"
"go.chromium.org/luci/gae/service/datastore"
"go.chromium.org/luci/gae/service/info"
"go.chromium.org/luci/common/errors"
"go.chromium.org/luci/common/logging"
luciproto "go.chromium.org/luci/common/proto"
"go.chromium.org/luci/config"
"go.chromium.org/luci/config/cfgclient"
"go.chromium.org/luci/config/validation"
"go.chromium.org/luci/server/caching"
)
// Entry describes what service configuration file to fetch and how to
// deserialize and validate it.
//
// Must be defined as a global variable and registered via Register(...)
// to enable the process cache and the config validation hook.
//
// You **must** setup periodical calls to Update to use Get or Fetch. They will
// be returning stale data if Update is not called periodically.
type Entry struct {
// Path is a path within the service config set to fetch.
Path string
// ConfigSet allows to provider custom config set.
//
// If empty, defaults to this service's config set.
ConfigSet string
// Type identifies proto message type with the config file schema.
//
// The actual value will never be used, only its type. All methods will
// return proto.Message of the exact same type.
Type proto.Message
// Validator is called to validate the config correctness.
//
// If nil, Update will just validate the config file matches the protobuf
// message kind identified by Type.
//
// If not nil, the Validator will be called with a deserialized message to
// continue its validation.
//
// See go.chromium.org/luci/config/validation for more details.
Validator func(c *validation.Context, msg proto.Message) error
// Rules is a config validation ruleset to register the validator in.
//
// If nil, the default validation.Rules will be used. This is usually what you
// want outside of unit tests.
Rules *validation.RuleSet
// cacheSlot holds in-process cache of the config.
cacheSlot caching.SlotHandle
// See comments for Fetch.
eagerUpdateOnce sync.Once
eagerUpdateOK bool
}
// Register registers the process cache slot and the validation hook.
//
// Must be called during the init time i.e. when initializing a global variable
// or in a package init() function. Get() will panic if used with an
// unregistered entry.
//
// Panics if called twice. Returns `e` itself.
func Register(e *Entry) *Entry {
if e.cacheSlot.Valid() {
panic("Register has already been called")
}
e.cacheSlot = caching.RegisterCacheSlot()
rules := e.Rules
if rules == nil {
rules = &validation.Rules
}
rules.Add(e.configSet(), e.Path, func(ctx *validation.Context, _, _ string, content []byte) error {
_, err := e.validate(ctx, string(content))
return err
})
return e
}
// Update fetches the freshest config and caches it in the datastore.
//
// **Must** be called periodically and asynchronously (e.g. from a GAE cron job)
// to keep the cache fresh. Get and Fetch will return stale data if Update isn't
// called periodically.
//
// This is a slow operation. Do not put it on hot code paths.
//
// Performs validation before storing the fetched config.
//
// If `meta` is non-nil, it will receive the config metadata.
func (e *Entry) Update(ctx context.Context, meta *config.Meta) (proto.Message, error) {
// Fetch the raw text body from LUCI Config service.
var raw string
var fetchedMeta config.Meta
err := cfgclient.Get(ctx, config.Set(e.configSet()), e.Path, cfgclient.String(&raw), &fetchedMeta)
if err != nil {
return nil, errors.Annotate(err, "failed to fetch %s:%s", e.configSet(), e.Path).Err()
}
// Make sure there are no blocking validation errors. This also deserializes
// the message.
valCtx := validation.Context{Context: ctx}
valCtx.SetFile(e.Path)
msg, err := e.validate(&valCtx, raw)
if err != nil {
return nil, errors.Annotate(err, "failed to perform config validation").Err()
}
if err := valCtx.Finalize(); err != nil {
blocking := err.(*validation.Error).WithSeverity(validation.Blocking)
if blocking != nil {
return nil, errors.Annotate(blocking, "validation errors").Err()
}
}
// Drop out of any namespaces or transactions.
ctx = cleanContext(ctx)
// Quick check if we have it in the datastore already. Useful to skip some
// transactions if Update is called concurrently by many processes (perhaps
// through attemptEagerUpdate).
cur := cachedConfig{ID: e.entityID()}
if datastore.Get(ctx, &cur); err == nil && cur.Meta.Revision == fetchedMeta.Revision {
logging.Infof(ctx, "Cached config %s is up-to-date at rev %q", cur.ID, cur.Meta.Revision)
if meta != nil {
*meta = fetchedMeta
}
return msg, nil
}
// Reserialize it into a binary proto to make sure older/newer client versions
// can safely use the proto message when some fields are added/deleted. Text
// protos do not guarantee that.
blob, err := proto.Marshal(msg)
if err != nil {
return nil, errors.Annotate(err, "failed to reserialize into binary proto").Err()
}
// Update the datastore entry if necessary. Do not just unconditionally
// overwrite it since it will unnecessarily flush memcache dscache entry.
err = datastore.RunInTransaction(ctx, func(ctx context.Context) error {
cur := cachedConfig{ID: e.entityID()}
if err := datastore.Get(ctx, &cur); err != nil && err != datastore.ErrNoSuchEntity {
return err
}
if cur.Meta.Revision == fetchedMeta.Revision {
logging.Infof(ctx, "Cached config %s is up-to-date at rev %q", cur.ID, cur.Meta.Revision)
return nil
}
logging.Infof(ctx, "Updating cached config %s: %q -> %q", cur.ID, cur.Meta.Revision, fetchedMeta.Revision)
return datastore.Put(ctx, &cachedConfig{
ID: cur.ID,
Config: blob,
Meta: fetchedMeta,
})
}, nil)
if err != nil {
return nil, errors.Annotate(err, "failed to update the datastore copy").Err()
}
if meta != nil {
*meta = fetchedMeta
}
return msg, nil
}
// Set overrides the cached config in the datastore.
//
// Primarily intended for tests to mock the cached config in the datastore.
func (e *Entry) Set(ctx context.Context, cfg proto.Message, meta *config.Meta) error {
if cfg.ProtoReflect().Descriptor() != e.Type.ProtoReflect().Descriptor() {
panic(fmt.Sprintf("got %s, want %s", cfg.ProtoReflect().Descriptor(), e.Type.ProtoReflect().Descriptor()))
}
blob, err := proto.Marshal(cfg)
if err != nil {
return err
}
var m config.Meta
if meta != nil {
m = *meta
}
return datastore.Put(cleanContext(ctx), &cachedConfig{
ID: e.entityID(),
Config: blob,
Meta: m,
})
}
// Get returns the cached config.
//
// Note: you **must** setup periodical calls to Update to use Get or Fetch. They
// will be returning stale data if Update is not called periodically.
//
// Uses in-memory cache to avoid hitting datastore all the time. As a result it
// may keep returning a stale config up to 1 min after the Update call. Uses
// Fetch to fetch the config if the in-memory cache is stale, see Fetch doc
// to learn its caveats, they apply to Get as well.
//
// If there's no in-memory cache available in the context, falls back to Fetch.
// This happens in tests that don't call caching.WithEmptyProcessCache to setup
// the cache.
//
// Panics if the entry wasn't registered in the process cache via Register.
//
// If `meta` is non-nil, it will receive the config metadata.
func (e *Entry) Get(ctx context.Context, meta *config.Meta) (proto.Message, error) {
val, err := e.cacheSlot.Fetch(ctx, func(any) (val any, exp time.Duration, err error) {
pc := procCache{}
if pc.Config, err = e.Fetch(ctx, &pc.Meta); err != nil {
return nil, 0, err
}
return &pc, time.Minute, nil
})
switch {
case err == caching.ErrNoProcessCache:
// A fallback useful in unit tests that may not have the process cache
// available. Production environments usually have the cache installed
// by the framework code that initializes the root context.
return e.Fetch(ctx, meta)
case err != nil:
return nil, err
default:
pc := val.(*procCache)
if meta != nil {
*meta = pc.Meta
}
return pc.Config, nil
}
}
// Fetch fetches the config from the datastore cache.
//
// Note: you **must** setup periodical calls to Update to use Get or Fetch. They
// will be returning stale data if Update is not called periodically.
//
// Strongly consistent with Update: calling Fetch right after Update will always
// return the most recent config.
//
// Prefer to use Get if possible.
//
// To simplify deploying code that uses new configs, Fetch will call Update
// itself if it notices there's no cached config in the datastore. To avoid
// overloading LUCI Config, it will do it under the lock and exactly once. If
// this attempt fails, for whatever reason, it will not be retried. Callers of
// Fetch will have to wait until the cache is explicitly updated by Update. If
// you can't risk this situation, deploy code that calls Update before deploying
// code that uses Get or Fetch.
//
// If `meta` is non-nil, it will receive the config metadata.
func (e *Entry) Fetch(ctx context.Context, meta *config.Meta) (proto.Message, error) {
ctx = cleanContext(ctx)
cached := cachedConfig{ID: e.entityID()}
err := datastore.Get(ctx, &cached)
if err == datastore.ErrNoSuchEntity && e.attemptEagerUpdate(ctx) {
err = datastore.Get(ctx, &cached)
}
if err != nil {
return nil, errors.Annotate(err, "failed to fetch cached config").Err()
}
cfg := e.newMessage()
if err := proto.Unmarshal(cached.Config, cfg); err != nil {
return nil, errors.Annotate(err, "failed to unmarshal cached config").Err()
}
if meta != nil {
*meta = cached.Meta
}
return cfg, nil
}
////////////////////////////////////////////////////////////////////////////////
const defaultServiceConfigSet = "services/${appid}"
// cleanContext returns a context with datastore using the default namespace
// and not using transactions.
func cleanContext(ctx context.Context) context.Context {
return datastore.WithoutTransaction(info.MustNamespace(ctx, ""))
}
// cachedConfig holds binary-serialized config and its metadata.
type cachedConfig struct {
_kind string `gae:"$kind,cfgcache.CachedConfig"`
_extra datastore.PropertyMap `gae:"-,extra"`
ID string `gae:"$id"`
Config []byte `gae:",noindex"`
Meta config.Meta `gae:",noindex"`
}
// procCache is stored in the process cache.
type procCache struct {
Config proto.Message
Meta config.Meta
}
// configSet returns overridden ConfigSet or the default.
func (e *Entry) configSet() string {
if e.ConfigSet != "" {
return e.ConfigSet
}
return defaultServiceConfigSet
}
// entityID returns an ID to use for cachedConfig entity.
func (e *Entry) entityID() string {
return fmt.Sprintf("%s:%s", e.configSet(), e.Path)
}
// newMessage creates a new empty proto message of e.Type.
func (e *Entry) newMessage() proto.Message {
return e.Type.ProtoReflect().New().Interface()
}
// validate deserializes the message and passes it through the validator.
func (e *Entry) validate(ctx *validation.Context, content string) (proto.Message, error) {
msg := e.newMessage()
if err := luciproto.UnmarshalTextML(content, protov1.MessageV1(msg)); err != nil {
ctx.Errorf("failed to unmarshal as text proto: %s", err)
return nil, nil
}
if e.Validator != nil {
if err := e.Validator(ctx, msg); err != nil {
return nil, err
}
}
return msg, nil
}
// attemptEagerUpdate is called from Fetch if there's no cached config entry.
//
// Will attempt to update the cache exactly once. Returns true if the update
// happened (now or previously) and finished successfully.
func (e *Entry) attemptEagerUpdate(ctx context.Context) bool {
e.eagerUpdateOnce.Do(func() {
id := e.entityID()
meta := config.Meta{}
logging.Infof(ctx, "Attempting to bootstrap config cache %s", id)
_, err := e.Update(ctx, &meta)
if err != nil {
logging.Errorf(ctx, "Failed to bootstrap config cache %s: %s", id, err)
} else {
logging.Infof(ctx, "Successfully bootstrapped config cache %s at rev %s", id, meta.Revision)
}
e.eagerUpdateOK = err == nil
})
return e.eagerUpdateOK
}