// Copyright 2019 The LUCI Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

// Package buffer implements a batching buffer with batch lease and retry
// management.
//
// It is meant to be used by something which handles synchronization
// (primarily the "common/sync/dispatcher.Channel"). As such, it IS NOT
// GOROUTINE-SAFE. If you use this outside of dispatcher.Channel, you must
// synchronize all access to the methods of Buffer.
package buffer

import (
"context"
"time"
"go.chromium.org/luci/common/clock"
"go.chromium.org/luci/common/errors"
"go.chromium.org/luci/common/retry"
)

// These errors can be returned from AddNoBlock.
var (
ErrBufferFull = errors.New("buffer is full")
ErrItemTooLarge = errors.New("item exceeds buffer's BatchSizeMax")
ErrItemTooSmall = errors.New("item has zero or negative size, and BatchSizeMax is set")
)

// Buffer batches individual data items into Batch objects.
//
// All access to the Buffer (as well as invoking ACK/NACK on LeasedBatches) must
// be synchronized because Buffer is not goroutine-safe.
type Buffer struct {
// User-provided Options block; dictates all policy for this Buffer.
opts Options
	// A moving average of the number of items per batch so far; helps reduce
	// the number of spurious re-allocations as long as the average number of
	// items per batch stays within roughly 20% of a constant amount.
batchItemsGuess *movingAverage
// Current statistics for this Buffer. Can be read with the Stats() method,
// and is kept up to date by various functions in the Buffer.
stats Stats
	// The currently-accumulating batch; data added to the Buffer will extend
	// this batch.
//
// NOTE: It is possible for this to be nil; if AddNoBlock fills this Batch up
// until Batch.canAccept returns false, it will be removed and pushed into
// `unleased`.
currentBatch *Batch
// Contains all of the cut, but not currently leased, Batches.
//
// Kept ordered by (nextSend, id), so the 0th element is always the
// next-batch-to-be-sent.
unleased batchHeap
// Contains all of the live leased batches.
liveLeases map[*Batch]struct{}
// Tracks the liveLeases which haven't been ack'd yet.
//
// Batches can be removed from liveLeases without being removed from
// unAckedLeases when the Batch is dropped from the Buffer due to the
// FullBehavior policy.
unAckedLeases map[*Batch]struct{}
// The `id` of the last Batch we cut. If currentBatch is non-nil, this is
// `currentBatch.id`.
//
// NOTE: 0 is not a valid Batch.id.
lastBatchID uint64
}

// NewBuffer returns a new Buffer configured with the given Options.
//
// If there's an issue with the provided Options, this returns an error.
func NewBuffer(o *Options) (*Buffer, error) {
if o == nil {
o = &Options{}
}
ret := &Buffer{opts: *o} // copy o before normalizing it
if err := ret.opts.normalize(); err != nil {
return nil, errors.Annotate(err, "normalizing buffer.Options").Err()
}
ret.unleased.onlyID = o.FIFO
ret.batchItemsGuess = newMovingAverage(10, ret.opts.batchItemsGuess())
ret.liveLeases = map[*Batch]struct{}{}
ret.unAckedLeases = map[*Batch]struct{}{}
return ret, nil
}
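
// A minimal construction sketch (a caller-side assumption, not part of this
// file's API; nil selects the all-default Options per the nil-check above):
//
//	buf, err := NewBuffer(nil)
//	if err != nil {
//		// Only reachable when caller-supplied Options fail to normalize.
//	}
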
// oldestLiveLease returns the oldest *Batch from liveLeases.
func (buf *Buffer) oldestLiveLease() (oldest *Batch) {
for lb := range buf.liveLeases {
if oldest == nil || lb.id < oldest.id {
oldest = lb
}
}
return
}

// dropOldest will drop the oldest un-dropped batch.
//
// Returns the dropped Batch.
func (buf *Buffer) dropOldest() (dropped *Batch) {
unleased, unleasedIdx := buf.unleased.Oldest()
leased := buf.oldestLiveLease()
switch {
case unleased != nil && (leased == nil || unleased.id < leased.id):
buf.unleased.RemoveAt(unleasedIdx)
buf.stats.del(unleased, categoryUnleased)
return unleased
case leased != nil:
delete(buf.liveLeases, leased)
buf.stats.mv(leased, categoryLeased, categoryDropped)
return leased
default:
// At this point, the only thing left to drop is the currentBatch.
current := buf.currentBatch
if current == nil {
panic(errors.New(
"impossible; must drop Batch, but there's NO undropped data"))
}
buf.currentBatch = nil
buf.stats.del(current, categoryUnleased)
return current
}
}

// AddNoBlock adds the item to the Buffer with the given size.
//
// Possibly drops a batch according to FullBehavior.
//
// Returns an error under the following conditions:
//
// - ErrBufferFull - If FullBehavior.ComputeState returns okToInsert=false.
// - ErrItemTooLarge - If this buffer has a BatchSizeMax configured and
// `itemSize` is larger than this.
// - ErrItemTooSmall - If this buffer has a BatchSizeMax configured and
// `itemSize` is zero, or if `itemSize` is negative.
func (buf *Buffer) AddNoBlock(now time.Time, item any, itemSize int) (dropped *Batch, err error) {
if err = buf.opts.checkItemSize(itemSize); err != nil {
return
}
okToInsert, dropBatch := buf.opts.FullBehavior.ComputeState(buf.stats)
if !okToInsert {
return nil, ErrBufferFull
}
// All error checks done above before we modify any state in buf.
if dropBatch {
dropped = buf.dropOldest()
}
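	// NOTE: canAccept is assumed to treat a nil *Batch as unable to accept
	// anything, so a nil currentBatch falls through to the allocation below.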
if !buf.currentBatch.canAccept(&buf.opts, itemSize) {
buf.Flush(now)
}
if buf.currentBatch == nil {
buf.currentBatch = &Batch{
// Try to minimize allocations by allocating 20% more slots in Data than
// the moving average for the last 10 batches' actual use.
Data: make([]BatchItem, 0, int(buf.batchItemsGuess.get()*1.2)),
id: buf.lastBatchID + 1,
retry: buf.opts.Retry(),
nextSend: now.Add(buf.opts.BatchAgeMax),
}
buf.lastBatchID++
}
buf.currentBatch.Data = append(buf.currentBatch.Data, BatchItem{item, itemSize})
buf.currentBatch.countedItems++
buf.currentBatch.countedSize += itemSize
buf.stats.addOneUnleased(itemSize)
// If the currentBatch couldn't even accept the smallest size item, go ahead
// and Flush it now.
if !buf.currentBatch.canAccept(&buf.opts, 1) {
buf.Flush(now)
}
return
}
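
// A sketch of the expected call pattern (`ctx`, `item` and `itemSize` are
// hypothetical; callers must synchronize access, e.g. via dispatcher.Channel):
//
//	dropped, err := buf.AddNoBlock(clock.Now(ctx), item, itemSize)
//	switch {
//	case err == ErrBufferFull:
//		// Back off; retry after NextSendTime() or after a lease is ACK'd.
//	case err != nil:
//		// ErrItemTooLarge / ErrItemTooSmall; this item can never be accepted.
//	case dropped != nil:
//		// FullBehavior chose to drop `dropped` to make room; do any
//		// bookkeeping for its items here.
//	}
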
// Flush causes any buffered-but-not-batched data to be immediately cut into
// a Batch.
//
// No-op if there's no such data.
func (buf *Buffer) Flush(now time.Time) {
batch := buf.currentBatch
if batch == nil {
return
}
batch.nextSend = now // immediately make available to send
buf.unleased.PushBatch(batch)
buf.batchItemsGuess.record(batch.countedItems)
buf.currentBatch = nil
}

// NextSendTime returns the send time for the next-most-available-to-send Batch,
// or a Zero time.Time if no batches are available to send.
//
// NOTE: Because LeaseOne enforces MaxLeases, this time may not guarantee an
// available lease.
func (buf *Buffer) NextSendTime() time.Time {
ret := time.Time{}
if next := buf.unleased.Peek(); next != nil {
ret = next.nextSend
}
if buf.currentBatch != nil {
if ns := buf.currentBatch.nextSend; ret.IsZero() || ns.Before(ret) {
ret = ns
}
}
return ret
}
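
// A scheduling sketch (assumes a single-goroutine driver such as
// dispatcher.Channel, with `ctx` in scope):
//
//	if nst := buf.NextSendTime(); !nst.IsZero() {
//		if d := nst.Sub(clock.Now(ctx)); d > 0 {
//			clock.Sleep(ctx, d)
//		}
//		// A Batch *may* now be leasable; LeaseOne can still return nil if
//		// MaxLeases has been reached (see the NOTE above).
//	}
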
// Stats returns information about the Buffer's state.
func (buf *Buffer) Stats() Stats {
return buf.stats
}

// CanAddItem returns true iff the Buffer will accept an item from AddNoBlock
// without returning ErrBufferFull.
func (buf *Buffer) CanAddItem() bool {
okToInsert, _ := buf.opts.FullBehavior.ComputeState(buf.stats)
return okToInsert
}

// LeaseOne returns the most-available-to-send Batch from this Buffer.
//
// The caller must invoke one of ACK/NACK on the Batch. The Batch will count
// against this Buffer's Stats().Total() until the caller does so.
//
// Returns nil if no batch is available to lease, or if the Buffer has reached
// MaxLeases.
func (buf *Buffer) LeaseOne(now time.Time) (leased *Batch) {
cur, next := buf.currentBatch, buf.unleased.Peek()
switch {
case len(buf.unAckedLeases) == int(buf.opts.MaxLeases):
// too many outstanding leases
return
case next != nil && !now.Before(next.nextSend):
// next unleased batch is fine to use
case cur != nil && !now.Before(cur.nextSend):
// currentBatch has data we can send
buf.Flush(now)
default:
// Nothing's ready.
return
}
return buf.forceLeaseOne()
}
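
// The lease lifecycle, sketched (`send` stands in for a hypothetical
// user-supplied delivery function):
//
//	if batch := buf.LeaseOne(clock.Now(ctx)); batch != nil {
//		if err := send(batch.Data); err != nil {
//			buf.NACK(ctx, err, batch) // re-enqueued, subject to the Retry policy
//		} else {
//			buf.ACK(batch) // the Buffer forgets the Batch entirely
//		}
//	}
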
// forceLeaseOne leases the next available batch, regardless of its marked
// nextSend time.
func (buf *Buffer) forceLeaseOne() (leased *Batch) {
if buf.unleased.Len() == 0 {
return nil
}
leased = buf.unleased.PopBatch()
buf.unAckedLeases[leased] = struct{}{}
buf.liveLeases[leased] = struct{}{}
buf.stats.mv(leased, categoryUnleased, categoryLeased)
return
}

// ForceLeaseAll leases and returns all unleased Batches immediately, regardless
// of their send times.
//
// This is useful for cancelation scenarios where you no longer want to do
// full processing on the remaining batches.
//
// NOTE: It's helpful to call Flush before ForceLeaseAll to include the
// currently buffered, but unbatched, data.
func (buf *Buffer) ForceLeaseAll() []*Batch {
if buf.unleased.Len() == 0 {
return nil
}
ret := make([]*Batch, 0, buf.unleased.Len())
for buf.unleased.Len() > 0 {
ret = append(ret, buf.forceLeaseOne())
}
return ret
}
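
// A drain-on-cancelation sketch (per the NOTE above, Flush first so the
// still-accumulating, unbatched data is included):
//
//	buf.Flush(clock.Now(ctx))
//	for _, batch := range buf.ForceLeaseAll() {
//		// ... best-effort handling of batch.Data ...
//		buf.ACK(batch)
//	}
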
// ACK records that all the items in the batch have been processed.
//
// The Batch is no longer tracked in any form by the Buffer.
//
// Calling ACK/NACK on the same Batch twice will panic.
// Calling ACK/NACK on a Batch not returned from LeaseOne will panic.
func (buf *Buffer) ACK(leased *Batch) {
buf.removeLease(leased)
}

func (buf *Buffer) removeLease(leased *Batch) (live bool) {
if leased == nil {
// Nil batch. Bail out.
return
}
if _, known := buf.unAckedLeases[leased]; !known {
panic(errors.New("unknown *Batch; are you ACK/NACK'ing multiple times?"))
}
delete(buf.unAckedLeases, leased)
if _, live = buf.liveLeases[leased]; live {
delete(buf.liveLeases, leased)
buf.stats.del(leased, categoryLeased)
} else {
buf.stats.del(leased, categoryDropped)
}
return
}

// NACK analyzes the current state of Batch.Data, potentially reducing
// UnleasedItemCount in the Buffer's Stats if the given Batch.Data length is
// smaller than when the Batch was originally leased.
//
// The Batch will be re-enqueued unless:
// - The Batch's retry Iterator returns retry.Stop
// - The Batch has been dropped already due to FullBehavior policy. If this
// is the case, AddNoBlock would already have returned this *Batch pointer
// to you.
//
// Calling ACK/NACK on the same Batch twice will panic.
// Calling ACK/NACK on a Batch not returned from LeaseOne will panic.
func (buf *Buffer) NACK(ctx context.Context, err error, leased *Batch) {
if live := buf.removeLease(leased); !live {
return
}
// TODO(iannucci): decouple retry from context (pass in 'now' instead)
switch toWait := leased.retry.Next(ctx, err); {
case toWait == retry.Stop:
return
default:
leased.nextSend = clock.Now(ctx).Add(toWait)
}
leased.countedItems = intMin(len(leased.Data), leased.countedItems)
var newSize int
for i := range leased.Data {
newSize += leased.Data[i].Size
}
leased.countedSize = intMin(newSize, leased.countedSize)
buf.unleased.PushBatch(leased)
buf.stats.add(leased, categoryUnleased)
}
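
// A partial-progress sketch: a sender which delivered the first `n` items
// (`n` and `sendErr` are hypothetical) may shrink Data before NACK'ing, so
// that only the remainder is counted and retried:
//
//	batch.Data = batch.Data[n:]
//	buf.NACK(ctx, sendErr, batch)
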
func intMin(a, b int) int {
if a < b {
return a
}
return b
}

func intMax(a, b int) int {
if a > b {
return a
}
return b
}