blob: 868e8427199442f65249c8d646cc2b13285472cd [file] [log] [blame]
// Copyright 2020 The LUCI Authors.
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// See the License for the specific language governing permissions and
// limitations under the License.
package limiter
import (
// ErrLimitReached is returned by CheckRequest when some limit is reached.
var ErrLimitReached = errors.New("the server limit reached")
var (
// Number of in-flight requests.
concurrencyCurGauge = metric.NewInt(
"Number of requests being processed right now.",
field.String("limiter"), // name of the limiter that reports the metric
// Configured maximum number of in-flight requests.
concurrencyMaxGauge = metric.NewInt(
"A configured limit on a number of concurrently processed requests.",
field.String("limiter"), // name of the limiter that reports the metric
// Counter with rejected requests.
rejectedCounter = metric.NewCounter(
"Number of rejected requests.",
field.String("limiter"), // name of the limiter that did the rejection
field.String("call"), // an RPC or an endpoint being called (if known)
field.String("peer"), // who's making the request (if known), see also peer.go.
field.String("reason")) // why the request was rejected
// Options contains configuration of a single Limiter instance.
type Options struct {
Name string // used for metric fields, logs and error messages
AdvisoryMode bool // if true, don't actually reject requests, just log
MaxConcurrentRequests int64 // a hard limit on a number of concurrent requests
// Limiter is a stateful runtime object that decides whether to accept or reject
// requests based on the current load (calculated from requests that went
// through it).
// It is also responsible for maintaining monitoring metrics that describe its
// state and what requests it rejects.
// May be running in advisory mode, in which it will do all the usual processing
// and logging, but won't actually reject the requests.
// All methods are safe for concurrent use.
type Limiter struct {
opts Options // options passed to New, as is
titleForLog string // how the limiter is named in logs and error replies
concurrency int64 // atomic int with number of current in-flight requests
// RequestInfo holds information about a single inbound request.
// Used by the limiter to decide whether to accept or reject the request.
// Pretty sparse now, but in the future will contain fields like cost, a QoS
// class and an attempt count, which will help the limiter to decide what
// requests to drop.
// Fields `CallLabel` and `PeerLabel` are intentionally pretty generic, since
// they will be used only as labels in internal maps and metric fields. Their
// internal structure and meaning are not important to the limiter, but the
// cardinality of the set of their possible values must be reasonably bounded.
type RequestInfo struct {
CallLabel string // an RPC or an endpoint being called (if known)
PeerLabel string // who's making the request (if known), see also peer.go
// New returns a new limiter.
// Returns an error if options are invalid.
func New(opts Options) (*Limiter, error) {
if opts.MaxConcurrentRequests <= 0 {
return nil, errors.New("max concurrent requests must be positive")
return &Limiter{
opts: opts,
titleForLog: fmt.Sprintf("%s<=%d", opts.Name, opts.MaxConcurrentRequests),
}, nil
// ReportMetrics updates all limiter's gauge metrics to match the current state.
// Must be called periodically (at least once per every metrics flush).
func (l *Limiter) ReportMetrics(ctx context.Context) {
concurrencyCurGauge.Set(ctx, atomic.LoadInt64(&l.concurrency), l.opts.Name)
concurrencyMaxGauge.Set(ctx, l.opts.MaxConcurrentRequests, l.opts.Name)
// CheckRequest should be called before processing a request.
// If it returns an error, the request should be declined as soon as possible
// with Unavailable/HTTP 503 status and the given error (which is an annotated
// ErrLimitReached).
// If it succeeds, the request should be processed as usual, and the returned
// callback called afterwards to notify the limiter the processing is done.
func (l *Limiter) CheckRequest(ctx context.Context, ri *RequestInfo) (done func(), err error) {
// TODO(vadimsh): This is the simplest limiter implementation possible. It
// will likely learn more tricks once we understand what features we need.
for {
cur := atomic.LoadInt64(&l.concurrency)
if cur >= l.opts.MaxConcurrentRequests && !l.opts.AdvisoryMode {
return nil, l.reject(ctx, ri, "max concurrency")
if !atomic.CompareAndSwapInt64(&l.concurrency, cur, cur+1) {
continue // race, try again
// Now that we have definitely grabbed the execution slot, report the
// advisory rejection message. Doing it sooner may result in duplications.
if cur >= l.opts.MaxConcurrentRequests && l.opts.AdvisoryMode {
_ = l.reject(ctx, ri, "max concurrency") // actually ignore the error
return func() { atomic.AddInt64(&l.concurrency, -1) }, nil
// reject is called when the request is rejected (either for real or in
// advisory mode).
// It updates metrics and logs and returns an annotated ErrLimitReached error.
func (l *Limiter) reject(ctx context.Context, ri *RequestInfo, reason string) error {
rejectedCounter.Add(ctx, 1, l.opts.Name, ri.CallLabel, ri.PeerLabel, reason)
if l.opts.AdvisoryMode {
logging.Warningf(ctx, "limiter %q in advisory mode: the request hit the %s limit", l.titleForLog, reason)
} else {
logging.Errorf(ctx, "limiter %q: the request hit the %s limit", l.titleForLog, reason)
return errors.Annotate(ErrLimitReached, "limiter %q: %s limit", l.titleForLog, reason).Err()