blob: adf21c5aa18fdb2e2486c2baacbb74abac1aa5ae [file] [log] [blame]
// Copyright 2019 The LUCI Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package buffer
import (
"go.chromium.org/luci/common/errors"
)
// FullBehavior allows you to customize the Buffer's behavior when it gets too
// full.
//
// Generally you'll pick one of DropOldestBatch or BlockNewItems. InfiniteGrowth
// is the third implementation in this file; it never blocks and never drops.
type FullBehavior interface {
	// Check inspects Options to see if it's compatible with this FullBehavior
	// implementation, returning a non-nil error when it is not.
	//
	// Called exactly once during Buffer creation before any other methods are
	// used.
	Check(Options) error

	// ComputeState evaluates the state of the Buffer (via Stats) and returns:
	//
	//   - okToInsert - User can add an item without blocking.
	//   - dropBatch - Buffer needs to drop the oldest batch if the user does
	//     insert data.
	//
	// Called after Check.
	ComputeState(stats Stats) (okToInsert, dropBatch bool)

	// NeedsItemSize should return true iff this FullBehavior requires item sizes
	// to effectively apply its policy.
	//
	// Called after Check.
	NeedsItemSize() bool
}
// BlockNewItems prevents the Buffer from accepting any new items as long as
// it has MaxItems worth of items (or MaxSize worth of size units).
//
// This will never drop batches, but will block inserts.
type BlockNewItems struct {
	// The maximum number of items that this Buffer is allowed to house (including
	// both leased and unleased items).
	//
	// A zero value is replaced with the default by Check.
	//
	// Default: -1 if BatchItemsMax == -1 else max(1000, BatchItemsMax)
	// Required: Must be >= BatchItemsMax or -1 (only MaxSize applies)
	MaxItems int

	// The maximum* number of 'size units' that this Buffer is allowed to house
	// (including both leased and unleased items).
	//
	// NOTE(*): This only blocks addition of new items once the buffer is
	// at-or-past capacity. e.g. if the buffer has a MaxSize of 1000 and
	// a BatchSizeMax of 100, and you insert items worth 999 units, this will
	// still allow the addition of another item (of up to 100 units) before
	// claiming the buffer is full.
	//
	// NOTE: This may only be set if BatchSizeMax is > 0; Otherwise buffer will
	// not enforce item sizes, and so BlockNewItems will not be able to enforce
	// this policy.
	//
	// A zero value is replaced with the default by Check.
	//
	// Default: -1 if BatchSizeMax == -1 else BatchSizeMax * 5
	// Required: Must be >= BatchSizeMax or -1 (only MaxItems applies)
	MaxSize int
}

// Compile-time assertion that *BlockNewItems satisfies FullBehavior.
var _ FullBehavior = (*BlockNewItems)(nil)
// ComputeState implements FullBehavior.ComputeState.
//
// Inserting is allowed only while the buffer is strictly below every enabled
// limit; a limit of -1 is treated as disabled. dropBatch is always false.
func (b *BlockNewItems) ComputeState(stats Stats) (okToInsert, dropBatch bool) {
	itemsFull := b.MaxItems != -1 && stats.Total() >= b.MaxItems
	sizeFull := b.MaxSize != -1 && stats.TotalSize() >= b.MaxSize
	return !(itemsFull || sizeFull), false
}
// Check implements FullBehavior.Check.
//
// Limits left at zero are first filled in from opts: MaxItems via
// negOneOrMax(BatchItemsMax, 1000) and MaxSize via
// negOneOrMult(BatchSizeMax, 5). The resulting limits are then validated
// against the batch limits, and at least one of them must be enabled
// (i.e. not -1).
func (b *BlockNewItems) Check(opts Options) (err error) {
	// Zero means "not set by the caller": derive a default.
	if b.MaxItems == 0 {
		b.MaxItems = negOneOrMax(opts.BatchItemsMax, 1000)
	}
	if b.MaxSize == 0 {
		b.MaxSize = negOneOrMult(opts.BatchSizeMax, 5)
	}

	itemsLimited := b.MaxItems != -1
	sizeLimited := b.MaxSize != -1

	if itemsLimited && b.MaxItems < opts.BatchItemsMax {
		return errors.Reason("BlockNewItems.MaxItems must be >= BatchItemsMax[%d]: got %d",
			opts.BatchItemsMax, b.MaxItems).Err()
	}

	if sizeLimited {
		if b.MaxSize < opts.BatchSizeMax {
			return errors.Reason("BlockNewItems.MaxSize must be >= BatchSizeMax[%d]: got %d",
				opts.BatchSizeMax, b.MaxSize).Err()
		}
		// A size limit is meaningless when batches themselves are not sized.
		if opts.BatchSizeMax == -1 {
			return errors.Reason("BlockNewItems.MaxSize may only be set with BatchSizeMax[%d] > 0",
				opts.BatchSizeMax).Err()
		}
	}

	if !itemsLimited && !sizeLimited {
		return errors.Reason("BlockNewItems must have one of MaxItems or MaxSize > 0").Err()
	}
	return nil
}
// NeedsItemSize implements FullBehavior.NeedsItemSize.
//
// Item sizes are needed exactly when a positive MaxSize limit is configured.
func (b *BlockNewItems) NeedsItemSize() bool {
	return b.MaxSize >= 1
}
// DropOldestBatch will drop buffered data whenever the number of unleased items
// plus leased items would grow beyond MaxLiveItems (or their total size beyond
// MaxLiveSize).
//
// This will never block inserts, but will drop batches.
type DropOldestBatch struct {
	// The maximum number of leased and unleased items that the Buffer may have
	// before dropping data.
	//
	// Once a batch is dropped, it no longer counts against MaxLiveItems, but it
	// may still be in memory if the dropped batch was currently leased.
	//
	// NOTE: The maximum Stats.Total number of items the Buffer could have at
	// a given time is:
	//
	//   MaxLiveItems + (BatchItemsMax * MaxLeases)
	//
	// A zero value is replaced with the default by Check.
	//
	// Default: -1 if BatchItemsMax == -1 else max(1000, BatchItemsMax)
	// Required: Must be >= BatchItemsMax or -1 (only MaxLiveSize applies)
	MaxLiveItems int

	// The maximum number of leased and unleased size units that the Buffer may
	// have before dropping data.
	//
	// Once a batch is dropped, it no longer counts against MaxLiveSize, but it
	// may still be in memory if the dropped batch was currently leased at the
	// time it was dropped.
	//
	// NOTE: The maximum Stats.TotalSize the Buffer could have at a given
	// time is:
	//
	//   MaxLiveSize + (BatchSizeMax * MaxLeases)
	//
	// NOTE: This may only be set if BatchSizeMax is > 0; Otherwise buffer will
	// not size inserted items, and so DropOldestBatch will not be able to enforce
	// this policy.
	//
	// A zero value is replaced with the default by Check.
	//
	// Default: -1 if BatchSizeMax == -1 else BatchSizeMax * 5
	// Required: Must be >= BatchSizeMax or -1 (only MaxLiveItems applies)
	MaxLiveSize int
}

// Compile-time assertion that *DropOldestBatch satisfies FullBehavior.
var _ FullBehavior = (*DropOldestBatch)(nil)
// ComputeState implements FullBehavior.ComputeState.
//
// Inserting is always allowed. A batch must be dropped once the buffer is at
// or above any enabled (non-negative) limit.
func (d *DropOldestBatch) ComputeState(stats Stats) (okToInsert, dropBatch bool) {
	overItems := d.MaxLiveItems >= 0 && d.MaxLiveItems <= stats.Total()
	overSize := d.MaxLiveSize >= 0 && d.MaxLiveSize <= stats.TotalSize()
	return true, overItems || overSize
}
// Check implements FullBehavior.Check.
//
// Limits left at zero are first filled in from opts: MaxLiveItems via
// negOneOrMax(BatchItemsMax, 1000) and MaxLiveSize via
// negOneOrMult(BatchSizeMax, 5). The resulting limits are then validated
// against the batch limits, and at least one of them must be enabled
// (i.e. not -1).
func (d *DropOldestBatch) Check(opts Options) (err error) {
	// Zero means "not set by the caller": derive a default.
	if d.MaxLiveItems == 0 {
		d.MaxLiveItems = negOneOrMax(opts.BatchItemsMax, 1000)
	}
	if d.MaxLiveSize == 0 {
		d.MaxLiveSize = negOneOrMult(opts.BatchSizeMax, 5)
	}

	itemsLimited := d.MaxLiveItems != -1
	sizeLimited := d.MaxLiveSize != -1

	if itemsLimited && d.MaxLiveItems < opts.BatchItemsMax {
		return errors.Reason("DropOldestBatch.MaxLiveItems must be >= BatchItemsMax[%d]: got %d",
			opts.BatchItemsMax, d.MaxLiveItems).Err()
	}

	if sizeLimited {
		if d.MaxLiveSize < opts.BatchSizeMax {
			return errors.Reason("DropOldestBatch.MaxLiveSize must be >= BatchSizeMax[%d]: got %d",
				opts.BatchSizeMax, d.MaxLiveSize).Err()
		}
		// A size limit is meaningless when batches themselves are not sized.
		if opts.BatchSizeMax == -1 {
			return errors.Reason("DropOldestBatch.MaxLiveSize may only be set with BatchSizeMax[%d] > 0",
				opts.BatchSizeMax).Err()
		}
	}

	if !itemsLimited && !sizeLimited {
		return errors.Reason("DropOldestBatch must have one of MaxLiveItems or MaxLiveSize > 0").Err()
	}
	return nil
}
// NeedsItemSize implements FullBehavior.NeedsItemSize.
//
// Item sizes are needed exactly when a positive MaxLiveSize limit is
// configured.
func (d *DropOldestBatch) NeedsItemSize() bool {
	return d.MaxLiveSize >= 1
}
// InfiniteGrowth will not drop data or block new items. It just grows until
// your computer runs out of memory.
//
// This will never block inserts, and will not drop batches. It has no
// configuration, accepts any Options in Check, and never needs item sizes.
type InfiniteGrowth struct{}

// Compile-time assertion that InfiniteGrowth (by value) satisfies FullBehavior.
var _ FullBehavior = InfiniteGrowth{}
// ComputeState implements FullBehavior.ComputeState.
//
// The Stats argument is ignored: inserting is always OK and nothing is ever
// dropped.
func (i InfiniteGrowth) ComputeState(Stats) (okToInsert, dropBatch bool) {
	okToInsert = true
	return okToInsert, dropBatch
}
// Check implements FullBehavior.Check.
//
// InfiniteGrowth has nothing to validate, so every Options value is accepted.
func (i InfiniteGrowth) Check(opts Options) (err error) {
	return nil
}
// NeedsItemSize implements FullBehavior.NeedsItemSize.
//
// Always false: this policy never looks at sizes.
func (i InfiniteGrowth) NeedsItemSize() bool {
	return false
}
// negOneOrMult returns -1 if a is the -1 sentinel, and a*b otherwise.
//
// When a and b are both positive and a*b does not fit in an int, the result
// saturates at the largest int instead of wrapping around. Without this, a
// very large batch limit would yield a derived default that is smaller than
// the batch limit itself (or negative), which Check would then reject even
// though the caller never set the field.
func negOneOrMult(a, b int) int {
	if a == -1 {
		return -1
	}
	// Largest value representable by int on this platform.
	const maxInt = int(^uint(0) >> 1)
	if a > 0 && b > 0 && a > maxInt/b {
		return maxInt
	}
	return a * b
}
// negOneOrMax passes the -1 sentinel through unchanged; for any other a it
// returns intMax(a, b).
func negOneOrMax(a, b int) int {
	if a != -1 {
		return intMax(a, b)
	}
	return -1
}