blob: 2f101e9f43c5b7600847174b03a751f8a20bb146 [file] [log] [blame]
// Copyright 2019 The LUCI Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package buffer
import (
"time"
"go.chromium.org/luci/common/errors"
"go.chromium.org/luci/common/retry"
)
// Options configures policy for the Buffer.
//
// See Defaults for default values.
type Options struct {
// [OPTIONAL] The maximum number of outstanding leases permitted.
//
// Attempting additional leases (with LeaseOne) while at the maximum will
// return nil.
//
// Requirement: Must be > 0
MaxLeases int
// [OPTIONAL] The maximum number of items to allow in a Batch before making it
// available to lease.
//
// Special value -1: unlimited
// Requirement: Must be == -1 (i.e. cut batches on BatchAgeMax/BatchSizeMax),
// or > 0
BatchItemsMax int
// [OPTIONAL] The maximum number of "size units" to allow in a Batch before
// making it available to lease.
//
// The units used here are arbitrary and are only checked vs the value
// provided to AddNoBlock.
//
// Size is explicitly provided to AddNoBlock by the caller.
//
// Inserting an item which exceeds BatchSizeMax will result in ErrItemTooLarge.
// It's up to the caller to ensure that this doesn't happen.
//
// Special value -1: unlimited
// Requirement: Must be == -1 (i.e. cut batches on BatchAgeMax/BatchItemsMax),
// or > 0
BatchSizeMax int
// [OPTIONAL] The maximum amount of time to wait before queuing a Batch for
// transmission. Note that batches are only cut by time when a worker is ready
// to process them (i.e. LeaseOne is invoked).
//
// Requirement: Must be > 0
BatchAgeMax time.Duration
// [OPTIONAL] Sets the policy for the Buffer around how many items the Buffer
// is allowed to hold, and what happens when that number is reached.
FullBehavior FullBehavior
// [OPTIONAL] If true, ensures that the next available batch is always the one
// with the oldest data.
//
// If this is false (the default), batches will be leased in the order that
// they're available to send; If a Batch has a retry with a high delay, it's
// possible that the next leased Batch actually contains newer data than
// a later batch.
//
// NOTE: if this is combined with high Retry values, it can lead to a
// head-of-line blocking situation.
//
// Requirement: May only be true if MaxLeases == 1
FIFO bool
// [OPTIONAL] Each batch will have a retry.Iterator assigned to it from this
// retry.Factory.
//
// When a Batch is NACK'd, it will be retried at "now" plus the Duration
// returned by the retry.Iterator.
//
// If the retry.Iterator returns retry.Stop, the Batch will be silently
// dropped.
Retry retry.Factory
}
// Defaults defines the defaults for Options when it contains 0-valued
// fields.
//
// DO NOT ASSIGN/WRITE TO THIS STRUCT.
var Defaults = Options{
MaxLeases: 4,
BatchItemsMax: 20,
BatchSizeMax: -1,
BatchAgeMax: 10 * time.Second,
FullBehavior: &BlockNewItems{
MaxItems: 1000,
},
Retry: func() retry.Iterator {
return &retry.ExponentialBackoff{
Limited: retry.Limited{
Delay: 200 * time.Millisecond, // initial delay
Retries: -1, // no retry cap
},
Multiplier: 1.2,
MaxDelay: 60 * time.Second,
}
},
}
// normalize validates that Options is well formed and populates defaults
// which are missing.
func (o *Options) normalize() error {
switch {
case o.MaxLeases == 0:
o.MaxLeases = Defaults.MaxLeases
case o.MaxLeases > 0:
default:
return errors.Reason("MaxLeases must be > 0: got %d", o.MaxLeases).Err()
}
switch {
case o.BatchItemsMax == -1:
case o.BatchItemsMax == 0:
o.BatchItemsMax = Defaults.BatchItemsMax
case o.BatchItemsMax > 0:
default:
return errors.Reason("BatchItemsMax must be > 0 or == -1: got %d", o.BatchItemsMax).Err()
}
switch {
case o.BatchSizeMax == -1:
case o.BatchSizeMax == 0:
o.BatchSizeMax = Defaults.BatchSizeMax
case o.BatchSizeMax > 0:
default:
return errors.Reason("BatchSizeMax must be > 0 or == -1: got %d", o.BatchSizeMax).Err()
}
switch {
case o.BatchAgeMax == 0:
o.BatchAgeMax = Defaults.BatchAgeMax
case o.BatchAgeMax > 0:
default:
return errors.Reason("BatchAgeMax must be > 0: got %s", o.BatchAgeMax).Err()
}
if o.FIFO && o.MaxLeases != 1 {
return errors.Reason("FIFO is true, but MaxLeases != 1: got %d", o.MaxLeases).Err()
}
if o.FullBehavior == nil {
o.FullBehavior = Defaults.FullBehavior
}
if o.Retry == nil {
o.Retry = Defaults.Retry
}
return errors.Annotate(o.FullBehavior.Check(*o), "FullBehavior.Check").Err()
}
func (o *Options) batchItemsGuess() int {
if o.BatchItemsMax > 0 {
return o.BatchItemsMax
}
return 10
}
func (o *Options) checkItemSize(itemSize int) error {
if itemSize < 0 {
// We don't ever allow negative sizes.
return ErrItemTooSmall
}
switch {
case o.BatchSizeMax == -1:
case itemSize == 0:
return ErrItemTooSmall
case itemSize > o.BatchSizeMax:
return ErrItemTooLarge
}
return nil
}