blob: fbabf4117ff435c35b939bd6a10ece1b7b072746 [file] [log] [blame]
// Copyright 2020 The LUCI Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package config
import (
"context"
"sync"
"github.com/golang/protobuf/proto"
"go.chromium.org/luci/common/errors"
"go.chromium.org/luci/common/logging"
luciproto "go.chromium.org/luci/common/proto"
"go.chromium.org/luci/common/retry/transient"
"go.chromium.org/luci/config"
"go.chromium.org/luci/config/cfgclient"
"go.chromium.org/luci/gae/service/datastore"
"go.chromium.org/luci/gae/service/info"
"go.chromium.org/luci/logdog/api/config/svcconfig"
)
const (
serviceConfigKind = "logdog.ServiceConfig"
projectConfigKind = "logdog.ProjectConfig"
serviceConfigPath = "services.cfg"
)
type cachedConfig struct {
Kind string `gae:"$kind"` // logdog.ServiceConfig or logdog.ProjectConfig
ID string `gae:"$id"` // e.g. "services.cfg" or "chromium"
Config []byte `gae:",noindex"` // binary-serialized proto
Meta config.Meta `gae:",noindex"` // mostly for Revision
_extra datastore.PropertyMap `gae:"-,extra"`
}
func (c *cachedConfig) update(cfg *config.Config, typ proto.Message) error {
if err := luciproto.UnmarshalTextML(cfg.Content, typ); err != nil {
return err
}
blob, err := proto.Marshal(typ)
if err != nil {
return err
}
c.Config = blob
c.Meta = cfg.Meta
return nil
}
// Sync makes sure configs in the datastore match what's in LUCI Config.
//
// Performs as much work as possible (even in presence of errors). Logs errors
// inside in addition to returning them as a single errors.MultiError.
func Sync(ctx context.Context) error {
wg := sync.WaitGroup{}
wg.Add(2)
var prjErr errors.MultiError
go func() {
defer wg.Done()
if prjErr = syncProjectConfigs(ctx); prjErr != nil {
logging.Errorf(ctx, "Failed to sync project configs: %s", prjErr)
}
}()
var svcErr error
go func() {
defer wg.Done()
if svcErr = syncServiceConfig(ctx); svcErr != nil {
logging.Errorf(ctx, "Failed to sync services.cfg: %s", svcErr)
}
}()
wg.Wait()
// Reuse projErr as our final multierror.
if svcErr != nil {
prjErr = append(prjErr, svcErr)
}
if len(prjErr) != 0 {
return prjErr
}
return nil
}
func syncProjectConfigs(ctx context.Context) (merr errors.MultiError) {
wg := sync.WaitGroup{}
wg.Add(2)
// Fetch freshest configs from the LUCI Config.
var freshMap map[string]config.Config
var freshErr error
go func() {
defer wg.Done()
var fresh []config.Config
fresh, freshErr = cfgclient.Client(ctx).GetProjectConfigs(ctx, "${appid}.cfg", false)
if freshErr == nil {
freshMap = make(map[string]config.Config, len(fresh))
for _, cfg := range fresh {
if p := cfg.ConfigSet.Project(); p != "" {
freshMap[p] = cfg
}
}
}
}()
// Grab existing configs in the datastore.
var existingMap map[string]*cachedConfig
var existingErr error
go func() {
defer wg.Done()
var existing []*cachedConfig
existingErr = datastore.GetAll(ctx, datastore.NewQuery(projectConfigKind), &existing)
if existingErr == nil {
existingMap = make(map[string]*cachedConfig, len(existing))
for _, ent := range existing {
existingMap[ent.ID] = ent
}
}
}()
wg.Wait()
if freshErr != nil {
merr = append(merr, errors.Annotate(freshErr, "failed to fetch project configs from LUCI Config").Err())
}
if existingErr != nil {
merr = append(merr, errors.Annotate(existingErr, "failed to fetch cached project configs from datastore").Err())
}
if len(merr) != 0 {
return merr
}
// Create new and update existing entities.
var toPut []*cachedConfig
for projectID, cfg := range freshMap {
ent, ok := existingMap[projectID]
if !ok {
ent = &cachedConfig{
Kind: projectConfigKind,
ID: projectID,
}
}
if ent.Meta.Revision == cfg.Meta.Revision {
logging.Infof(ctx, "Project config of %q is up-to-date at rev %s", projectID, cfg.Meta.Revision)
continue
}
logging.Infof(ctx, "Updating project config of %q: %s => %s", projectID, ent.Meta.Revision, cfg.Meta.Revision)
if err := ent.update(&cfg, &svcconfig.ProjectConfig{}); err != nil {
logging.Errorf(ctx, "Skipping bad project config %q at rev %s: %s", projectID, cfg.Meta.Revision, err)
merr = append(merr, errors.Annotate(err, "bad project config %q at rev %s", projectID, cfg.Meta.Revision).Err())
} else {
toPut = append(toPut, ent)
}
}
if err := datastore.Put(ctx, toPut); err != nil {
merr = append(merr, errors.Annotate(err, "failed to update some project configs in datastore").Err())
}
// Delete stale entities.
var toDelete []*datastore.Key
for projectID, ent := range existingMap {
if _, ok := freshMap[projectID]; !ok {
toDelete = append(toDelete, datastore.KeyForObj(ctx, ent))
}
}
if err := datastore.Delete(ctx, toDelete); err != nil {
merr = append(merr, errors.Annotate(err, "failed to delete some stale project configs").Err())
}
return merr
}
func syncServiceConfig(ctx context.Context) error {
cfg, err := cfgclient.Client(ctx).GetConfig(ctx, "services/${appid}", serviceConfigPath, false)
if err != nil {
return errors.Annotate(err, "failed to fetch service config from LUCI Config").Err()
}
ent := cachedConfig{
Kind: serviceConfigKind,
ID: serviceConfigPath,
}
if err := datastore.Get(ctx, &ent); err != nil && err != datastore.ErrNoSuchEntity {
return errors.Annotate(err, "failed to fetch service config from datastore").Err()
}
if ent.Meta.Revision == cfg.Meta.Revision {
logging.Infof(ctx, "Service config %q is up-to-date at rev %s", serviceConfigPath, cfg.Meta.Revision)
return nil
}
logging.Infof(ctx, "Updating service config %q: %s => %s", serviceConfigPath, ent.Meta.Revision, cfg.Meta.Revision)
if err := ent.update(cfg, &svcconfig.Config{}); err != nil {
return errors.Annotate(err, "bad service config at rev %s", cfg.Meta.Revision).Err()
}
if err := datastore.Put(ctx, &ent); err != nil {
return errors.Annotate(err, "failed to store updated config").Err()
}
return nil
}
// fromDatastore fetches the config from datastore and deserializes it.
//
// Returns datastore.ErrNoSuchEntity if the config doesn't exist, a transient
// error if the operation fails or a fatal error if the proto can't be
// deserialized.
func fromDatastore(ctx context.Context, kind, id string, msg proto.Message) error {
ctx = datastore.WithoutTransaction(info.MustNamespace(ctx, ""))
ent := cachedConfig{Kind: kind, ID: id}
switch err := datastore.Get(ctx, &ent); {
case err == datastore.ErrNoSuchEntity:
return err
case err != nil:
return errors.Annotate(err, "failed to access datastore").Tag(transient.Tag).Err()
default:
return proto.Unmarshal(ent.Config, msg)
}
}