// Copyright 2020 The LUCI Authors.
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// See the License for the specific language governing permissions and
// limitations under the License.
// Package cfgcache provides a datastore-based cache of individual config files.
//
// It is intended to be used to cache a small number (usually 1) of service
// configuration files fetched from the service's config set and used in every
// request.
//
// Configuration files are assumed to be stored in text protobuf encoding.
package cfgcache
import (
protov1 ""
luciproto ""
// Entry describes what service configuration file to fetch and how to
// deserialize and validate it.
//
// Must be defined as a global variable and registered via Register(...)
// to enable the process cache and the config validation hook.
//
// You **must** set up periodic calls to Update to use Get or Fetch. They will
// be returning stale data if Update is not called periodically.
type Entry struct {
// Path is a path within the service config set to fetch.
Path string
// ConfigSet allows providing a custom config set.
//
// If empty, defaults to this service's config set.
ConfigSet string
// Type identifies proto message type with the config file schema.
//
// The actual value will never be used, only its type. All methods will
// return proto.Message of the exact same type.
Type proto.Message
// Validator is called to validate the config correctness.
//
// If nil, Update will just validate the config file matches the protobuf
// message kind identified by Type.
//
// If not nil, the Validator will be called with a deserialized message to
// continue its validation.
//
// See the documentation of the validation package for more details.
Validator func(c *validation.Context, msg proto.Message) error
// Rules is a config validation ruleset to register the validator in.
//
// If nil, the default validation.Rules will be used. This is usually what you
// want outside of unit tests.
Rules *validation.RuleSet
// cacheSlot holds in-process cache of the config.
//
// It is assigned by Register and consulted by Get.
cacheSlot caching.SlotHandle
// eagerUpdateOnce guards the single bootstrap Update attempted when Fetch
// finds no cached entity. See comments for Fetch and attemptEagerUpdate.
eagerUpdateOnce sync.Once
// eagerUpdateOK records whether that bootstrap Update finished successfully.
eagerUpdateOK bool
// Register registers the process cache slot and the validation hook.
// Must be called during the init time i.e. when initializing a global variable
// or in a package init() function. Get() will panic if used with an
// unregistered entry.
// Panics if called twice. Returns `e` itself.
func Register(e *Entry) *Entry {
if e.cacheSlot.Valid() {
panic("Register has already been called")
e.cacheSlot = caching.RegisterCacheSlot()
rules := e.Rules
if rules == nil {
rules = &validation.Rules
rules.Add(e.configSet(), e.Path, func(ctx *validation.Context, _, _ string, content []byte) error {
_, err := e.validate(ctx, string(content))
return err
return e
// Update fetches the freshest config and caches it in the datastore.
// **Must** be called periodically and asynchronously (e.g. from a GAE cron job)
// to keep the cache fresh. Get and Fetch will return stale data if Update isn't
// called periodically.
// This is a slow operation. Do not put it on hot code paths.
// Performs validation before storing the fetched config.
// If `meta` is non-nil, it will receive the config metadata.
func (e *Entry) Update(ctx context.Context, meta *config.Meta) (proto.Message, error) {
// Fetch the raw text body from LUCI Config service.
var raw string
var fetchedMeta config.Meta
err := cfgclient.Get(ctx, config.Set(e.configSet()), e.Path, cfgclient.String(&raw), &fetchedMeta)
if err != nil {
return nil, errors.Annotate(err, "failed to fetch %s:%s", e.configSet(), e.Path).Err()
// Make sure there are no blocking validation errors. This also deserializes
// the message.
valCtx := validation.Context{Context: ctx}
msg, err := e.validate(&valCtx, raw)
if err != nil {
return nil, errors.Annotate(err, "failed to perform config validation").Err()
if err := valCtx.Finalize(); err != nil {
blocking := err.(*validation.Error).WithSeverity(validation.Blocking)
if blocking != nil {
return nil, errors.Annotate(blocking, "validation errors").Err()
// Drop out of any namespaces or transactions.
ctx = cleanContext(ctx)
// Quick check if we have it in the datastore already. Useful to skip some
// transactions if Update is called concurrently by many processes (perhaps
// through attemptEagerUpdate).
cur := cachedConfig{ID: e.entityID()}
if datastore.Get(ctx, &cur); err == nil && cur.Meta.Revision == fetchedMeta.Revision {
logging.Infof(ctx, "Cached config %s is up-to-date at rev %q", cur.ID, cur.Meta.Revision)
if meta != nil {
*meta = fetchedMeta
return msg, nil
// Reserialize it into a binary proto to make sure older/newer client versions
// can safely use the proto message when some fields are added/deleted. Text
// protos do not guarantee that.
blob, err := proto.Marshal(msg)
if err != nil {
return nil, errors.Annotate(err, "failed to reserialize into binary proto").Err()
// Update the datastore entry if necessary. Do not just unconditionally
// overwrite it since it will unnecessarily flush memcache dscache entry.
err = datastore.RunInTransaction(ctx, func(ctx context.Context) error {
cur := cachedConfig{ID: e.entityID()}
if err := datastore.Get(ctx, &cur); err != nil && err != datastore.ErrNoSuchEntity {
return err
if cur.Meta.Revision == fetchedMeta.Revision {
logging.Infof(ctx, "Cached config %s is up-to-date at rev %q", cur.ID, cur.Meta.Revision)
return nil
logging.Infof(ctx, "Updating cached config %s: %q -> %q", cur.ID, cur.Meta.Revision, fetchedMeta.Revision)
return datastore.Put(ctx, &cachedConfig{
ID: cur.ID,
Config: blob,
Meta: fetchedMeta,
}, nil)
if err != nil {
return nil, errors.Annotate(err, "failed to update the datastore copy").Err()
if meta != nil {
*meta = fetchedMeta
return msg, nil
// Set overrides the cached config in the datastore.
// Primarily intended for tests to mock the cached config in the datastore.
func (e *Entry) Set(ctx context.Context, cfg proto.Message, meta *config.Meta) error {
if cfg.ProtoReflect().Descriptor() != e.Type.ProtoReflect().Descriptor() {
panic(fmt.Sprintf("got %s, want %s", cfg.ProtoReflect().Descriptor(), e.Type.ProtoReflect().Descriptor()))
blob, err := proto.Marshal(cfg)
if err != nil {
return err
var m config.Meta
if meta != nil {
m = *meta
return datastore.Put(cleanContext(ctx), &cachedConfig{
ID: e.entityID(),
Config: blob,
Meta: m,
// Get returns the cached config.
// Note: you **must** setup periodical calls to Update to use Get or Fetch. They
// will be returning stale data if Update is not called periodically.
// Uses in-memory cache to avoid hitting datastore all the time. As a result it
// may keep returning a stale config up to 1 min after the Update call. Uses
// Fetch to fetch the config if the in-memory cache is stale, see Fetch doc
// to learn its caveats, they apply to Get as well.
// If there's no in-memory cache available in the context, falls back to Fetch.
// This happens in tests that don't call caching.WithEmptyProcessCache to setup
// the cache.
// Panics if the entry wasn't registered in the process cache via Register.
// If `meta` is non-nil, it will receive the config metadata.
func (e *Entry) Get(ctx context.Context, meta *config.Meta) (proto.Message, error) {
val, err := e.cacheSlot.Fetch(ctx, func(any) (val any, exp time.Duration, err error) {
pc := procCache{}
if pc.Config, err = e.Fetch(ctx, &pc.Meta); err != nil {
return nil, 0, err
return &pc, time.Minute, nil
switch {
case err == caching.ErrNoProcessCache:
// A fallback useful in unit tests that may not have the process cache
// available. Production environments usually have the cache installed
// by the framework code that initializes the root context.
return e.Fetch(ctx, meta)
case err != nil:
return nil, err
pc := val.(*procCache)
if meta != nil {
*meta = pc.Meta
return pc.Config, nil
// Fetch fetches the config from the datastore cache.
// Note: you **must** setup periodical calls to Update to use Get or Fetch. They
// will be returning stale data if Update is not called periodically.
// Strongly consistent with Update: calling Fetch right after Update will always
// return the most recent config.
// Prefer to use Get if possible.
// To simplify deploying code that uses new configs, Fetch will call Update
// itself if it notices there's no cached config in the datastore. To avoid
// overloading LUCI Config, it will do it under the lock and exactly once. If
// this attempt fails, for whatever reason, it will not be retried. Callers of
// Fetch will have to wait until the cache is explicitly updated by Update. If
// you can't risk this situation, deploy code that calls Update before deploying
// code that uses Get or Fetch.
// If `meta` is non-nil, it will receive the config metadata.
func (e *Entry) Fetch(ctx context.Context, meta *config.Meta) (proto.Message, error) {
ctx = cleanContext(ctx)
cached := cachedConfig{ID: e.entityID()}
err := datastore.Get(ctx, &cached)
if err == datastore.ErrNoSuchEntity && e.attemptEagerUpdate(ctx) {
err = datastore.Get(ctx, &cached)
if err != nil {
return nil, errors.Annotate(err, "failed to fetch cached config").Err()
cfg := e.newMessage()
if err := proto.Unmarshal(cached.Config, cfg); err != nil {
return nil, errors.Annotate(err, "failed to unmarshal cached config").Err()
if meta != nil {
*meta = cached.Meta
return cfg, nil
// defaultServiceConfigSet is the config set used when Entry.ConfigSet is empty.
//
// NOTE(review): "${appid}" is presumably a placeholder that the config client
// resolves to the current service's app ID; confirm against cfgclient.
const defaultServiceConfigSet = "services/${appid}"
// cleanContext returns a context with datastore using the default namespace
// and not using transactions.
func cleanContext(ctx context.Context) context.Context {
return datastore.WithoutTransaction(info.MustNamespace(ctx, ""))
// cachedConfig holds binary-serialized config and its metadata.
//
// It is the datastore entity written by Update and Set and read by Fetch.
type cachedConfig struct {
// _kind carries the struct tag that names the datastore entity kind.
_kind string `gae:"$kind,cfgcache.CachedConfig"`
// NOTE(review): _extra presumably collects properties that are not mapped to
// any other field; confirm against the gae datastore documentation.
_extra datastore.PropertyMap `gae:"-,extra"`
// ID is the entity ID, as produced by Entry.entityID.
ID string `gae:"$id"`
// Config is the config message serialized with proto.Marshal.
Config []byte `gae:",noindex"`
// Meta is the metadata stored alongside the config.
Meta config.Meta `gae:",noindex"`
// procCache is stored in the process cache.
//
// Get stores a pointer to it in the entry's cache slot and reads it back on
// subsequent calls.
type procCache struct {
// Config is the deserialized config message returned by Fetch.
Config proto.Message
// Meta is the config metadata returned by Fetch alongside Config.
Meta config.Meta
// configSet returns overridden ConfigSet or the default.
func (e *Entry) configSet() string {
if e.ConfigSet != "" {
return e.ConfigSet
return defaultServiceConfigSet
// entityID returns an ID to use for cachedConfig entity.
func (e *Entry) entityID() string {
return fmt.Sprintf("%s:%s", e.configSet(), e.Path)
// newMessage creates a new empty proto message of e.Type.
func (e *Entry) newMessage() proto.Message {
return e.Type.ProtoReflect().New().Interface()
// validate deserializes the message and passes it through the validator.
func (e *Entry) validate(ctx *validation.Context, content string) (proto.Message, error) {
msg := e.newMessage()
if err := luciproto.UnmarshalTextML(content, protov1.MessageV1(msg)); err != nil {
ctx.Errorf("failed to unmarshal as text proto: %s", err)
return nil, nil
if e.Validator != nil {
if err := e.Validator(ctx, msg); err != nil {
return nil, err
return msg, nil
// attemptEagerUpdate is called from Fetch if there's no cached config entry.
// Will attempt to update the cache exactly once. Returns true if the update
// happened (now or previously) and finished successfully.
func (e *Entry) attemptEagerUpdate(ctx context.Context) bool {
e.eagerUpdateOnce.Do(func() {
id := e.entityID()
meta := config.Meta{}
logging.Infof(ctx, "Attempting to bootstrap config cache %s", id)
_, err := e.Update(ctx, &meta)
if err != nil {
logging.Errorf(ctx, "Failed to bootstrap config cache %s: %s", id, err)
} else {
logging.Infof(ctx, "Successfully bootstrapped config cache %s at rev %s", id, meta.Revision)
e.eagerUpdateOK = err == nil
return e.eagerUpdateOK