| // Copyright 2017 The LUCI Authors. |
| // |
| // Licensed under the Apache License, Version 2.0 (the "License"); |
| // you may not use this file except in compliance with the License. |
| // You may obtain a copy of the License at |
| // |
| // http://www.apache.org/licenses/LICENSE-2.0 |
| // |
| // Unless required by applicable law or agreed to in writing, software |
| // distributed under the License is distributed on an "AS IS" BASIS, |
| // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| // See the License for the specific language governing permissions and |
| // limitations under the License. |
| |
| package policy |
| |
| import ( |
| "context" |
| "crypto/sha256" |
| "encoding/hex" |
| "time" |
| |
| "google.golang.org/protobuf/proto" |
| |
| "go.chromium.org/luci/common/data/caching/lazyslot" |
| "go.chromium.org/luci/common/data/rand/mathrand" |
| "go.chromium.org/luci/common/errors" |
| "go.chromium.org/luci/common/logging" |
| "go.chromium.org/luci/config/validation" |
| ) |
| |
| // ErrNoPolicy is returned by Queryable(...) if a policy is not yet available. |
| // |
| // This happens when the service is deployed for the first time and policy |
| // configs aren't fetched yet. This error will not show up if ImportConfigs |
| // succeeded at least once. |
| var ErrNoPolicy = errors.New("policy config is not imported yet") |
| |
| // Policy describes how to fetch, store and parse policy documents. |
| // |
| // This is a singleton-like object that should be shared by multiple requests. |
| // |
| // Each instance corresponds to one kind of a policy and it keeps a Queryable |
| // form if the corresponding policy cached in local memory, occasionally |
| // updating it based on the configs stored in the datastore (that are in turn |
| // periodically updated from a cron). |
| type Policy struct { |
| // Name defines the name of the policy, e.g. "delegation rules". |
| // |
| // It is used in datastore IDs and for logging. |
| Name string |
| |
| // Fetch fetches and parses all relevant text proto files. |
| // |
| // This is a user-supplied callback. |
| // |
| // Called from cron when ingesting new configs. It must return either a non |
| // empty bundle with configs or an error. |
| Fetch func(c context.Context, f ConfigFetcher) (ConfigBundle, error) |
| |
| // Validate verifies the fetched config files are semantically valid. |
| // |
| // This is a user-supplied callback. Must be a pure function. |
| // |
| // Reports all errors through the given validation.Context object. The config |
| // is considered valid if there are no errors reported. A valid config must be |
| // accepted by Prepare without errors. |
| // |
| // Called from cron when ingesting new configs. |
| Validate func(v *validation.Context, cfg ConfigBundle) |
| |
| // Prepare converts validated configs into an optimized queryable form. |
| // |
| // This is a user-supplied callback. Must be a pure function. |
| // |
| // The result of the processing is cached in local instance memory for 1 min. |
| // It is supposed to be a read-only object, optimized for performing queries |
| // over it. |
| // |
| // Users of Policy should type-cast it to an appropriate type. |
| Prepare func(c context.Context, cfg ConfigBundle, revision string) (Queryable, error) |
| |
| cache lazyslot.Slot // holds and updates in-memory cache of Queryable |
| } |
| |
| // Queryable is validated and parsed configs in a form optimized for queries. |
| // |
| // This object is shared between multiple requests and kept in memory for as |
| // long as it still matches the current config. |
| type Queryable interface { |
| // ConfigRevision returns the revision passed to Policy.Prepare. |
| // |
| // It is a revision of configs used to construct this object. Used for |
| // logging. |
| ConfigRevision() string |
| } |
| |
| // ConfigFetcher hides details of interaction with LUCI Config. |
| // |
| // Passed to Fetch callback. |
| type ConfigFetcher interface { |
| // FetchTextProto fetches text-serialized protobuf message at a given path. |
| // |
| // The path is relative to the token server config set root in LUCI config. |
| // |
| // On success returns nil and fills in 'out' (which should be a pointer to |
| // a concrete proto message class). May return transient error (e.g. timeouts) |
| // and fatal ones (e.g. bad proto file). |
| FetchTextProto(c context.Context, path string, out proto.Message) error |
| } |
| |
| // ImportConfigs updates configs stored in the datastore. |
| // |
| // Is should be periodically called from a cron. |
| // |
| // Returns the revision of the configs that are now in the datastore. It's |
| // either the imported revision, if configs change, or a previously known |
| // revision, if configs at HEAD are same. |
| // |
| // Validation errors are returned as *validation.Error struct. Use type cast to |
| // sniff them, if necessary. |
| func (p *Policy) ImportConfigs(c context.Context) (rev string, err error) { |
| c = logging.SetField(c, "policy", p.Name) |
| |
| // Fetch and parse text protos stored in LUCI config. The fetcher will also |
| // record the revision of the fetched files. |
| fetcher := luciConfigFetcher{} |
| bundle, err := p.Fetch(c, &fetcher) |
| if err == nil && len(bundle) == 0 { |
| err = errors.New("no configs fetched by the callback") |
| } |
| if err != nil { |
| return "", errors.Annotate(err, "failed to fetch policy configs").Err() |
| } |
| rev = fetcher.Revision() |
| |
| // Convert configs into a form appropriate for the datastore. We'll skip the |
| // rest of the import if this exact blob is already in the datastore (based on |
| // SHA256 digest). |
| cfgBlob, err := serializeBundle(bundle) |
| if err != nil { |
| return "", errors.Annotate(err, "failed to serialize the configs").Err() |
| } |
| digest := sha256.Sum256(cfgBlob) |
| digestHex := hex.EncodeToString(digest[:]) |
| logging.Infof(c, "Got %d bytes of configs (SHA256 %s)", len(cfgBlob), digestHex) |
| |
| // Do we have it already? |
| existingHdr, err := getImportedPolicyHeader(c, p.Name) |
| if err != nil { |
| return "", errors.Annotate(err, "failed to grab ImportedPolicyHeader").Err() |
| } |
| if existingHdr != nil && digestHex == existingHdr.SHA256 { |
| logging.Infof( |
| c, "Configs are up-to-date. Last changed at rev %s, last checked rev is %s.", |
| existingHdr.Revision, rev) |
| return existingHdr.Revision, nil |
| } |
| |
| existingRev := "(nil)" |
| if existingHdr != nil { |
| existingRev = existingHdr.Revision |
| } |
| logging.Infof(c, "Policy config changed: %s -> %s", existingRev, rev) |
| |
| if p.Validate != nil { |
| ctx := &validation.Context{Context: c} |
| p.Validate(ctx, bundle) |
| if err := ctx.Finalize(); err != nil { |
| return "", errors.Annotate(err, "configs at rev %s are invalid", rev).Err() |
| } |
| } |
| |
| // Double check that they actually can be parsed into a queryable form. If |
| // not, the Policy callbacks are buggy. |
| queriable, err := p.Prepare(c, bundle, rev) |
| if err == nil && queriable.ConfigRevision() != rev { |
| err = errors.New("wrong revision in result of Prepare callback") |
| } |
| if err != nil { |
| return "", errors.Annotate(err, "failed to convert configs into a queryable form").Err() |
| } |
| |
| logging.Infof(c, "Storing new configs") |
| if err := updateImportedPolicy(c, p.Name, rev, digestHex, cfgBlob); err != nil { |
| return "", err |
| } |
| |
| return rev, nil |
| } |
| |
| // Queryable returns a form of the policy document optimized for queries. |
| // |
| // This is hot function called from each RPC handler. It uses local in-memory |
| // cache to store the configs, synchronizing it with the state stored in the |
| // datastore once a minute. |
| // |
| // Returns ErrNoPolicy if the policy config wasn't imported yet. |
| func (p *Policy) Queryable(c context.Context) (Queryable, error) { |
| val, err := p.cache.Get(c, func(prev any) (newQ any, exp time.Duration, err error) { |
| prevQ, _ := prev.(Queryable) |
| newQ, err = p.grabQueryable(c, prevQ) |
| if err == nil { |
| exp = cacheExpiry(c) |
| } |
| return |
| }) |
| if err != nil { |
| return nil, err |
| } |
| return val.(Queryable), nil |
| } |
| |
| // grabQueryable is called whenever cached Queryable in p.cache expires. |
| func (p *Policy) grabQueryable(c context.Context, prevQ Queryable) (Queryable, error) { |
| c = logging.SetField(c, "policy", p.Name) |
| |
| logging.Infof(c, "Checking version of the policy in the datastore") |
| hdr, err := getImportedPolicyHeader(c, p.Name) |
| switch { |
| case err != nil: |
| return nil, errors.Annotate(err, "failed to fetch importedPolicyHeader entity").Err() |
| case hdr == nil: |
| return nil, ErrNoPolicy |
| } |
| |
| // Reuse existing Queryable object if configs didn't change. |
| if prevQ != nil && prevQ.ConfigRevision() == hdr.Revision { |
| return prevQ, nil |
| } |
| |
| // Fetch new configs. |
| logging.Infof(c, "Fetching policy configs from the datastore") |
| body, err := getImportedPolicyBody(c, p.Name) |
| switch { |
| case err != nil: |
| return nil, errors.Annotate(err, "failed to fetch importedPolicyBody entity").Err() |
| case body == nil: // this is rare, the body shouldn't disappear |
| logging.Errorf(c, "The policy body is unexpectedly gone") |
| return nil, ErrNoPolicy |
| } |
| |
| // An error here and below can happen if previously validated config is no |
| // longer valid (e.g. if the service code is updated and new code doesn't like |
| // the stored config anymore). |
| // |
| // If this check fails, the service is effectively offline until configs are |
| // updated. Presumably, it is better than silently using no longer valid |
| // config. |
| logging.Infof(c, "Using configs at rev %s", body.Revision) |
| configs, unknown, err := deserializeBundle(body.Data) |
| if err != nil { |
| return nil, errors.Annotate(err, "failed to deserialize cached configs").Err() |
| } |
| if len(unknown) != 0 { |
| for _, cfg := range unknown { |
| logging.Errorf(c, "Unknown proto type %q in cached config %q", cfg.Kind, cfg.Path) |
| } |
| return nil, errors.New("failed to deserialize some cached configs") |
| } |
| queryable, err := p.Prepare(c, configs, body.Revision) |
| if err != nil { |
| return nil, errors.Annotate(err, "failed to process cached configs").Err() |
| } |
| |
| return queryable, nil |
| } |
| |
| // cacheExpiry returns a random duration from [4 min, 5 min). |
| // |
| // It's used to define when to refresh in-memory Queryable cache. We randomize |
| // it to desynchronize cache updates of different Policy instances. |
| func cacheExpiry(c context.Context) time.Duration { |
| rnd := time.Duration(mathrand.Int63n(c, int64(time.Minute))) |
| return 4*time.Minute + rnd |
| } |