blob: f18c64773f43ff697c61cd123e5008f5fec78d80 [file] [log] [blame]
// Copyright 2022 The LUCI Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package quota
import (
"bytes"
"context"
"crypto/sha256"
"fmt"
"strings"
"text/template"
"github.com/gomodule/redigo/redis"
"go.chromium.org/luci/common/clock"
"go.chromium.org/luci/common/errors"
"go.chromium.org/luci/server/redisconn"
pb "go.chromium.org/luci/server/quotabeta/proto"
"go.chromium.org/luci/server/quotabeta/quotaconfig"
)
var (
	// ErrInsufficientQuota is the sentinel error UpdateQuota returns when the
	// requested updates were rejected because not enough quota was available.
	ErrInsufficientQuota = errors.New("insufficient quota")

	// cfgKey's address is the context key under which Use stores the
	// quotaconfig.Interface and from which getInterface reads it back.
	cfgKey = "cfg"
)
// Use derives a context.Context from ctx which carries cfg, making cfg the
// quotaconfig.Interface this package looks policies up in.
func Use(ctx context.Context, cfg quotaconfig.Interface) context.Context {
	key := &cfgKey
	return context.WithValue(ctx, key, cfg)
}
// getInterface extracts the quotaconfig.Interface previously installed in ctx
// by Use. Panics when ctx carries no quotaconfig.Interface.
func getInterface(ctx context.Context) quotaconfig.Interface {
	if cfg, ok := ctx.Value(&cfgKey).(quotaconfig.Interface); ok {
		return cfg
	}
	// A missing config is a server setup error, not a runtime condition.
	panic(errors.Reason("quotaconfig.Interface implementation not found (ensure quota.Use is called in server.Main)").Err())
}
// Options holds optional settings for UpdateQuota. UpdateQuota accepts a nil
// *Options, in which case no deduplication is performed and policy names
// containing ${user} are rejected with an error.
type Options struct {
// RequestID is a string to use for deduplication of successful quota
// adjustments, valid for one hour. Only successful updates are deduplicated,
// mainly for the case where time has passed and quota has replenished
// enough for a previously failed adjustment to succeed. An empty RequestID
// disables deduplication.
//
// Callers should ensure the same Options and function parameters are provided
// each time a request ID is reused, since any reuse within a one hour period
// is subject to deduplication regardless of other Options and parameters.
RequestID string
// User is a value to substitute for ${user} in policy names.
// Policy defaults are sourced from the unsubstituted name.
// Must be non-empty when any policy name passed to UpdateQuota contains
// ${user}.
User string
}
// dedupePrefix is a *template.Template for a Lua script fragment which looks up
// the given deduplication key in the "deduplicationKeys" Redis hash and returns
// from the script early when the stored deadline has not passed yet. Prefix to
// other scripts to create a script which exits early when the deduplication
// key is valid. Scripts should be suffixed with dedupeSuffix to save the
// deduplication key.
//
// The deadline is read with HGET rather than HINCRBY with an increment of 0:
// the latter creates a zero-valued field for unknown keys, which would be left
// behind in the hash whenever a later part of the script fails.
//
// Template variables:
//   Key: A string to use for deduplication. Rendered verbatim between double
//     quotes, so it must already be safe inside a Lua string literal.
//   Now: Current time in seconds since epoch.
var dedupePrefix = template.Must(template.New("dedupePrefix").Parse(`
-- HGET yields false for a missing field and tonumber(false) is nil, hence "or 0".
local deadline = tonumber(redis.call("HGET", "deduplicationKeys", "{{.Key}}")) or 0
if deadline >= {{.Now}} then
	return
end
`))
// dedupeSuffix is a *template.Template for a Lua script fragment which records
// the given deduplication key, together with its deadline, in the
// "deduplicationKeys" Redis hash. Should be used with dedupePrefix, which reads
// the same hash field. UpdateQuota appends this fragment after all setEntry
// fragments, so it is only reached when no earlier fragment returned.
//
// Template variables:
//   Key: A string to use for deduplication. Rendered verbatim between double
//     quotes, so it must already be safe inside a Lua string literal.
//   Deadline: Time in seconds since epoch after which the key no longer dedupes.
var dedupeSuffix = template.Must(template.New("dedupeSuffix").Parse(`
redis.call("HMSET", "deduplicationKeys", "{{.Key}}", {{.Deadline}})
`))
// updateEntry is a *template.Template for a Lua script fragment which loads the
// given quota entry from Redis into a Lua variable, initializing it if it
// doesn't exist, replenishes quota for the time elapsed since the last update,
// and applies the requested amount. The computed values are kept in the Lua
// variable only; the fragment does not write them back to the database. Entries
// must explicitly be stored by concatenating this script with setEntry. This
// enables atomic updates of multiple entries by running a script which
// concatenates multiple updateEntry calls followed by corresponding setEntry
// calls. Such a script updates entries in the database iff all updates would
// succeed.
//
// The fragment returns a Redis error reply, ending the script, when:
//   - the stored "updated" time is later than Now
//     ("... last updated in the future"), or
//   - adding Amount would make the entry's resources negative
//     ("... has insufficient resources"). UpdateQuota matches on this suffix
//     to return ErrInsufficientQuota.
//
// Resources are capped at Default both after replenishment and after applying
// Amount. An entry whose "updated" field reads as 0 is treated as nonexistent:
// the key is deleted again and the entry starts from Default.
//
// Template variables:
//   Var: Name of a Lua variable to store the quota entry in memory.
//   Name: Name of the quota entry to update. Rendered verbatim between double
//     quotes, so it must already be safe inside a Lua string literal.
//   Default: Default number of resources to initialize new entries with. Also
//     used as the cap on resources.
//   Now: Update time in seconds since epoch.
//   Replenishment: Amount of resources to replenish every second.
//   Amount: Amount by which to update resources. Added to the entry's
//     resources, so negative values consume quota.
var updateEntry = template.Must(template.New("updateEntry").Parse(`
{{.Var}} = {}
{{.Var}}["name"] = "{{.Name}}"
-- Check last updated time to determine if this entry exists.
{{.Var}}["updated"] = redis.call("HINCRBY", "{{.Name}}", "updated", 0)
if {{.Var}}["updated"] == 0 then
-- Delete the updated time of 0 to avoid leaving partial entries
-- in the database in case of error.
redis.call("DEL", "{{.Name}}")
{{.Var}}["resources"] = {{.Default}}
{{.Var}}["updated"] = {{.Now}}
elseif {{.Var}}["updated"] > {{.Now}} then
return redis.error_reply("\"{{.Name}}\" last updated in the future")
else
{{.Var}}["resources"] = redis.call("HINCRBY", "{{.Name}}", "resources", 0)
end
-- Replenish resources up to the cap before updating.
{{.Var}}["resources"] = {{.Var}}["resources"] + ({{.Now}} - {{.Var}}["updated"]) * {{.Replenishment}}
{{.Var}}["updated"] = {{.Now}}
-- Cap resources at the default amount.
if {{.Var}}["resources"] > {{.Default}} then
{{.Var}}["resources"] = {{.Default}}
end
-- Check that the update would succeed before updating.
if {{.Var}}["resources"] + {{.Amount}} < 0 then
return redis.error_reply("\"{{.Name}}\" has insufficient resources")
end
-- Update resources up to the cap.
{{.Var}}["resources"] = {{.Var}}["resources"] + {{.Amount}}
{{.Var}}["updated"] = {{.Now}}
-- Cap resources at the default amount.
if {{.Var}}["resources"] > {{.Default}} then
{{.Var}}["resources"] = {{.Default}}
end
`))
// setEntry is a *template.Template for a Lua script fragment which writes the
// "resources" and "updated" values held in the given Lua variable to the Redis
// hash named by the variable's "name" field. Should be used after updateEntry,
// which populates those three fields.
//
// Template variables:
//   Var: Name of a Lua variable holding the quota entry in memory.
var setEntry = template.Must(template.New("setEntry").Parse(`
redis.call("HMSET", {{.Var}}["name"], "resources", {{.Var}}["resources"], "updated", {{.Var}}["updated"])
`))
// UpdateQuota atomically adjusts the given quota entries using the given map of
// policy names to numeric update amounts as well as the given *Options. Each
// amount is added to the corresponding entry's resources, so negative amounts
// consume quota. All adjustments are rendered into a single Lua script, which
// only stores entries if every adjustment would succeed.
//
// Policy names containing ${user} have it replaced by opts.User to form the
// entry name, while the policy itself is looked up under the unsubstituted
// name. If opts.RequestID is set, a successful call is deduplicated for one
// hour.
//
// Returns ErrInsufficientQuota when the adjustments were not made due to
// insufficient quota. Returns other errors when a ${user} policy is given
// without opts.User, when the policy lookup fails, when no Redis connection
// can be established, or when rendering or executing the script fails.
//
// Panics if quotaconfig.Interface is not available in the given context.Context
// (see Use).
func UpdateQuota(ctx context.Context, updates map[string]int64, opts *Options) error {
	now := clock.Now(ctx).Unix()
	cfg := getInterface(ctx)

	// Resolve every policy to its hashed entry name up front so that nothing is
	// sent to Redis unless all policies are valid.
	defs := make(map[string]*pb.Policy, len(updates))
	adjs := make(map[string]int64, len(updates))
	for pol, val := range updates {
		name := pol
		if strings.Contains(pol, "${user}") {
			if opts == nil || opts.User == "" {
				return errors.Reason("user unspecified for %q", pol).Err()
			}
			name = strings.ReplaceAll(name, "${user}", opts.User)
		}
		// Hashing yields a fixed-length hex name which is safe to embed in Lua.
		name = fmt.Sprintf("entry:%x", sha256.Sum256([]byte(name)))
		def, err := cfg.Get(ctx, pol)
		if err != nil {
			return errors.Annotate(err, "fetching config").Err()
		}
		defs[name] = def
		adjs[name] = val
	}

	conn, err := redisconn.Get(ctx)
	if err != nil {
		return errors.Annotate(err, "establishing connection").Err()
	}
	defer conn.Close()

	// The request ID is caller-supplied and the templates render it verbatim
	// inside a double-quoted Lua string, so it has to be escaped first.
	dedupe := opts != nil && opts.RequestID != ""
	dedupeKey := ""
	if dedupe {
		dedupeKey = escapeLuaString(opts.RequestID)
	}

	s := bytes.NewBufferString("local entries = {}\n")
	if dedupe {
		if err := dedupePrefix.Execute(s, map[string]interface{}{
			"Key": dedupeKey,
			"Now": now,
		}); err != nil {
			return errors.Annotate(err, "rendering template %q", dedupePrefix.Name()).Err()
		}
	}

	// First compute all updates in Lua variables, ...
	i := 0
	for name, adj := range adjs {
		if err := updateEntry.Execute(s, map[string]interface{}{
			"Var":           fmt.Sprintf("entries[%d]", i),
			"Name":          name,
			"Default":       defs[name].Resources,
			"Now":           now,
			"Replenishment": defs[name].Replenishment,
			"Amount":        adj,
		}); err != nil {
			return errors.Annotate(err, "rendering template %q", updateEntry.Name()).Err()
		}
		i++
	}
	// ... then store them, which is only reached if no update returned early.
	for i--; i >= 0; i-- {
		if err := setEntry.Execute(s, map[string]interface{}{
			"Var": fmt.Sprintf("entries[%d]", i),
		}); err != nil {
			return errors.Annotate(err, "rendering template %q", setEntry.Name()).Err()
		}
	}

	if dedupe {
		if err := dedupeSuffix.Execute(s, map[string]interface{}{
			"Key":      dedupeKey,
			"Deadline": now + 3600,
		}); err != nil {
			return errors.Annotate(err, "rendering template %q", dedupeSuffix.Name()).Err()
		}
	}

	if _, err := redis.NewScript(0, s.String()).Do(conn); err != nil {
		// updateEntry signals insufficient quota through its error reply text.
		if strings.HasSuffix(err.Error(), "insufficient resources") {
			return ErrInsufficientQuota
		}
		return err
	}
	return nil
}

// escapeLuaString returns s escaped for embedding between double quotes in a
// Lua string literal. Backslashes and double quotes are backslash-escaped and
// control bytes are written as three-digit decimal \ddd escapes, which every
// Lua version understands (unlike \x or \u escapes). Strings without such
// bytes are returned unchanged, and the literal always evaluates to the
// original bytes of s.
func escapeLuaString(s string) string {
	var b strings.Builder
	b.Grow(len(s))
	for i := 0; i < len(s); i++ {
		switch c := s[i]; {
		case c == '\\' || c == '"':
			b.WriteByte('\\')
			b.WriteByte(c)
		case c < 0x20 || c == 0x7f:
			// Always three digits so a following digit isn't absorbed.
			fmt.Fprintf(&b, "\\%03d", c)
		default:
			b.WriteByte(c)
		}
	}
	return b.String()
}