blob: 6d8640a9845aeb1829717b19a7eb29abd2ff5b4b [file] [log] [blame]
// Copyright 2022 The LUCI Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package cache
import (
"context"
"sort"
"time"
"go.opentelemetry.io/otel/attribute"
"go.chromium.org/luci/common/clock"
"go.chromium.org/luci/common/errors"
"go.chromium.org/luci/server/span"
"go.chromium.org/luci/analysis/internal/clustering/rules"
"go.chromium.org/luci/analysis/internal/clustering/rules/lang"
"go.chromium.org/luci/analysis/internal/tracing"
)
// CachedRule represents a "compiled" version of a failure
// association rule.
// It should be treated as immutable, and is therefore safe to
// share across multiple threads.
type CachedRule struct {
// The failure association rule.
Rule rules.Entry
// The parsed and compiled failure association rule.
Expr *lang.Expr
}
// NewCachedRule initialises a new CachedRule from the given failure
// association rule.
func NewCachedRule(rule *rules.Entry) (*CachedRule, error) {
expr, err := lang.Parse(rule.RuleDefinition)
if err != nil {
return nil, err
}
return &CachedRule{
Rule: *rule,
Expr: expr,
}, nil
}
// Ruleset represents a version of the set of failure
// association rules in use by a LUCI Project.
// It should be treated as immutable, and therefore safe to share
// across multiple threads.
type Ruleset struct {
// The LUCI Project.
Project string
// ActiveRulesSorted is the set of active failure association rules
// (should be used by LUCI Analysis for matching), sorted in descending
// PredicateLastUpdated time order.
ActiveRulesSorted []*CachedRule
// ActiveRulesByID stores active failure association
// rules by their Rule ID.
ActiveRulesByID map[string]*CachedRule
// Version versions the contents of the Ruleset. These timestamps only
// change if a rule is modified.
Version rules.Version
// LastRefresh contains the monotonic clock reading when the last ruleset
// refresh was initiated. The refresh is guaranteed to contain all rules
// changes made prior to this timestamp.
LastRefresh time.Time
}
// ActiveRulesWithPredicateUpdatedSince returns the set of rules that are
// active and whose predicates have been updated since (but not including)
// the given time.
// Rules which have been made inactive since the given time will NOT be
// returned. To check if a previous rule has been made inactive, consider
// using IsRuleActive instead.
// The returned slice must not be mutated.
func (r *Ruleset) ActiveRulesWithPredicateUpdatedSince(t time.Time) []*CachedRule {
// Use the property that ActiveRules is sorted by descending
// LastUpdated time.
for i, rule := range r.ActiveRulesSorted {
if !rule.Rule.PredicateLastUpdateTime.After(t) {
// This is the first rule that has not been updated since time t.
// Return all rules up to (but not including) this rule.
return r.ActiveRulesSorted[:i]
}
}
return r.ActiveRulesSorted
}
// Returns whether the given ruleID is an active rule.
func (r *Ruleset) IsRuleActive(ruleID string) bool {
_, ok := r.ActiveRulesByID[ruleID]
return ok
}
// newEmptyRuleset initialises a new empty ruleset.
// This initial ruleset is invalid and must be refreshed before use.
func newEmptyRuleset(project string) *Ruleset {
return &Ruleset{
Project: project,
ActiveRulesSorted: nil,
ActiveRulesByID: make(map[string]*CachedRule),
// The zero predicate last updated time is not valid and will be
// rejected by clustering state validation if we ever try to save
// it to Spanner as a chunk's RulesVersion.
Version: rules.Version{},
LastRefresh: time.Time{},
}
}
// NewRuleset creates a new ruleset with the given project,
// active rules, rules last updated and last refresh time.
func NewRuleset(project string, activeRules []*CachedRule, version rules.Version, lastRefresh time.Time) *Ruleset {
return &Ruleset{
Project: project,
ActiveRulesSorted: sortByDescendingPredicateLastUpdated(activeRules),
ActiveRulesByID: rulesByID(activeRules),
Version: version,
LastRefresh: lastRefresh,
}
}
// refresh updates the ruleset. To ensure existing users of the rulset
// do not observe changes while they are using it, a new copy is returned.
func (r *Ruleset) refresh(ctx context.Context) (ruleset *Ruleset, err error) {
// Under our design assumption of 10,000 active rules per project,
// pulling and compiling all rules could take a meaningful amount
// of time (@ 1KB per rule, = ~10MB).
ctx, s := tracing.Start(ctx, "go.chromium.org/luci/analysis/internal/clustering/rules/cache.Refresh",
attribute.String("project", r.Project),
)
defer func() { tracing.End(s, err) }()
// Use clock reading before refresh. The refresh is guaranteed
// to contain all rule changes committed to Spanner prior to
// this timestamp.
lastRefresh := clock.Now(ctx)
txn, cancel := span.ReadOnlyTransaction(ctx)
defer cancel()
var activeRules []*CachedRule
if r.Version == (rules.Version{}) {
// On the first refresh, query all active rules.
ruleRows, err := rules.ReadActive(txn, r.Project)
if err != nil {
return nil, err
}
activeRules, err = cachedRulesFromFullRead(ruleRows)
if err != nil {
return nil, err
}
} else {
// On subsequent refreshes, query just the differences.
delta, err := rules.ReadDelta(txn, r.Project, r.Version.Total)
if err != nil {
return nil, err
}
activeRules, err = cachedRulesFromDelta(r.ActiveRulesSorted, delta)
if err != nil {
return nil, err
}
}
// Get the version of set of rules read by ReadActive/ReadDelta.
// Must occur in the same spanner transaction as ReadActive/ReadDelta.
// If the project has no rules, this returns rules.StartingEpoch.
rulesVersion, err := rules.ReadVersion(txn, r.Project)
if err != nil {
return nil, err
}
return NewRuleset(r.Project, activeRules, rulesVersion, lastRefresh), nil
}
// cachedRulesFromFullRead obtains a set of cached rules from the given set of
// active failure association rules.
func cachedRulesFromFullRead(activeRules []*rules.Entry) ([]*CachedRule, error) {
var result []*CachedRule
for _, r := range activeRules {
cr, err := NewCachedRule(r)
if err != nil {
return nil, errors.Annotate(err, "rule %s is invalid", r.RuleID).Err()
}
result = append(result, cr)
}
return result, nil
}
// cachedRulesFromDelta applies deltas to an existing list of rules,
// to obtain an updated set of rules.
func cachedRulesFromDelta(existing []*CachedRule, delta []*rules.Entry) ([]*CachedRule, error) {
ruleByID := make(map[string]*CachedRule)
for _, r := range existing {
ruleByID[r.Rule.RuleID] = r
}
for _, d := range delta {
if d.IsActive {
cr, err := NewCachedRule(d)
if err != nil {
return nil, errors.Annotate(err, "rule %s is invalid", d.RuleID).Err()
}
ruleByID[d.RuleID] = cr
} else {
// Delete the rule, if it exists.
delete(ruleByID, d.RuleID)
}
}
var results []*CachedRule
for _, r := range ruleByID {
results = append(results, r)
}
return results, nil
}
// sortByDescendingPredicateLastUpdated sorts the given rules in descending
// predicate last-updated time order. If two rules have the same
// PredicateLastUpdated time, they are sorted in RuleID order.
func sortByDescendingPredicateLastUpdated(rules []*CachedRule) []*CachedRule {
result := make([]*CachedRule, len(rules))
copy(result, rules)
sort.Slice(result, func(i, j int) bool {
if result[i].Rule.PredicateLastUpdateTime.Equal(result[j].Rule.PredicateLastUpdateTime) {
return result[i].Rule.RuleID < result[j].Rule.RuleID
}
return result[i].Rule.PredicateLastUpdateTime.After(result[j].Rule.PredicateLastUpdateTime)
})
return result
}
// rulesByID creates a mapping from rule ID to rules for the given list
// of failure association rules.
func rulesByID(rules []*CachedRule) map[string]*CachedRule {
result := make(map[string]*CachedRule)
for _, r := range rules {
result[r.Rule.RuleID] = r
}
return result
}