blob: a7221f92d882c1dcb6c76bdabffd29ccf4009e78 [file] [log] [blame]
// Copyright 2019 The LUCI Authors.
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// See the License for the specific language governing permissions and
// limitations under the License.
// Package buffer implements a batching buffer with batch lease and retry
// management.
// It is meant to be used by something which handles synchronization
// (primarially the "common/sync/dispatcher.Channel"). As such, it IS NOT
// GOROUTINE-SAFE. If you use this outside of dispatcher.Channel, you must
// synchronize all access to the methods of Buffer.
package buffer
import (
// These errors can be returned from AddNoBlock.
var (
ErrBufferFull = errors.New("buffer is full")
ErrItemTooLarge = errors.New("item exceeds buffer's BatchSizeMax")
ErrItemTooSmall = errors.New("item has zero or negative size, and BatchSizeMax is set")
// Buffer batches individual data items into Batch objects.
// All access to the Buffer (as well as invoking ACK/NACK on LeasedBatches) must
// be synchronized because Buffer is not goroutine-safe.
type Buffer struct {
// User-provided Options block; dictates all policy for this Buffer.
opts Options
// A moving average of the number of batch items so far; Helps reduce the
// number of spurious re-allocations if the average number of batch items is
// roughly (within 20%) of a constant amount.
batchItemsGuess *movingAverage
// Current statistics for this Buffer. Can be read with the Stats() method,
// and is kept up to date by various functions in the Buffer.
stats Stats
// Currently-accumulating batch; Data added to the Buffer will extend this
// batch.
// NOTE: It is possible for this to be nil; if AddNoBlock fills this Batch up
// until Batch.canAccept returns false, it will be removed and pushed into
// `unleased`.
currentBatch *Batch
// Contains all of the cut, but not currently leased, Batches.
// Kept ordered by (nextSend, id), so the 0th element is always the
// next-batch-to-be-sent.
unleased batchHeap
// Contains all of the live leased batches.
liveLeases map[*Batch]struct{}
// Tracks the liveLeases which haven't been ack'd yet.
// Batches can be removed from liveLeases without being removed from
// unAckedLeases when the Batch is dropped from the Buffer due to the
// FullBehavior policy.
unAckedLeases map[*Batch]struct{}
// The `id` of the last Batch we cut. If currentBatch is non-nil, this is
// ``.
// NOTE: 0 is not a valid
lastBatchID uint64
// NewBuffer returns a new Buffer configured with the given Options.
// If there's an issue with the provided Options, this returns an error.
func NewBuffer(o *Options) (*Buffer, error) {
if o == nil {
o = &Options{}
ret := &Buffer{opts: *o} // copy o before normalizing it
if err := ret.opts.normalize(); err != nil {
return nil, errors.Annotate(err, "normalizing buffer.Options").Err()
ret.unleased.onlyID = o.FIFO
ret.batchItemsGuess = newMovingAverage(10, ret.opts.batchItemsGuess())
ret.liveLeases = map[*Batch]struct{}{}
ret.unAckedLeases = map[*Batch]struct{}{}
return ret, nil
// Returns the oldest *Batch from liveLeases.
func (buf *Buffer) oldestLiveLease() (oldest *Batch) {
for lb := range buf.liveLeases {
if oldest == nil || < {
oldest = lb
// dropOldest will drop the oldest un-dropped batch.
// Returns the dropped Batch.
func (buf *Buffer) dropOldest() (dropped *Batch) {
unleased, unleasedIdx := buf.unleased.Oldest()
leased := buf.oldestLiveLease()
switch {
case unleased != nil && (leased == nil || <
buf.stats.del(unleased, categoryUnleased)
return unleased
case leased != nil:
delete(buf.liveLeases, leased), categoryLeased, categoryDropped)
return leased
// At this point, the only thing left to drop is the currentBatch.
current := buf.currentBatch
if current == nil {
"impossible; must drop Batch, but there's NO undropped data"))
buf.currentBatch = nil
buf.stats.del(current, categoryUnleased)
return current
// AddNoBlock adds the item to the Buffer with the given size.
// Possibly drops a batch according to FullBehavior.
// Returns an error under the following conditions:
// - ErrBufferFull - If FullBehavior.ComputeState returns okToInsert=false.
// - ErrItemTooLarge - If this buffer has a BatchSizeMax configured and
// `itemSize` is larger than this.
// - ErrItemTooSmall - If this buffer has a BatchSizeMax configured and
// `itemSize` is zero, or if `itemSize` is negative.
func (buf *Buffer) AddNoBlock(now time.Time, item any, itemSize int) (dropped *Batch, err error) {
if err = buf.opts.checkItemSize(itemSize); err != nil {
okToInsert, dropBatch := buf.opts.FullBehavior.ComputeState(buf.stats)
if !okToInsert {
return nil, ErrBufferFull
// All error checks done above before we modify any state in buf.
if dropBatch {
dropped = buf.dropOldest()
if !buf.currentBatch.canAccept(&buf.opts, itemSize) {
if buf.currentBatch == nil {
buf.currentBatch = &Batch{
// Try to minimize allocations by allocating 20% more slots in Data than
// the moving average for the last 10 batches' actual use.
Data: make([]BatchItem, 0, int(buf.batchItemsGuess.get()*1.2)),
id: buf.lastBatchID + 1,
retry: buf.opts.Retry(),
nextSend: now.Add(buf.opts.BatchAgeMax),
buf.currentBatch.Data = append(buf.currentBatch.Data, BatchItem{item, itemSize})
buf.currentBatch.countedSize += itemSize
// If the currentBatch couldn't even accept the smallest size item, go ahead
// and Flush it now.
if !buf.currentBatch.canAccept(&buf.opts, 1) {
// Flush causes any buffered-but-not-batched data to be immediately cut into
// a Batch.
// No-op if there's no such data.
func (buf *Buffer) Flush(now time.Time) {
batch := buf.currentBatch
if batch == nil {
batch.nextSend = now // immediately make available to send
buf.currentBatch = nil
// NextSendTime returns the send time for the next-most-available-to-send Batch,
// or a Zero time.Time if no batches are available to send.
// NOTE: Because LeaseOne enforces MaxLeases, this time may not guarantee an
// available lease.
func (buf *Buffer) NextSendTime() time.Time {
ret := time.Time{}
if next := buf.unleased.Peek(); next != nil {
ret = next.nextSend
if buf.currentBatch != nil {
if ns := buf.currentBatch.nextSend; ret.IsZero() || ns.Before(ret) {
ret = ns
return ret
// Stats returns information about the Buffer's state.
func (buf *Buffer) Stats() Stats {
return buf.stats
// CanAddItem returns true iff the Buffer will accept an item from AddNoBlock
// without returning ErrBufferFull.
func (buf *Buffer) CanAddItem() bool {
okToInsert, _ := buf.opts.FullBehavior.ComputeState(buf.stats)
return okToInsert
// LeaseOne returns the most-available-to-send Batch from this Buffer.
// The caller must invoke one of ACK/NACK on the Batch. The Batch will count
// against this Buffer's Stats().Total() until the caller does so.
// Returns nil if no batch is available to lease, or if the Buffer has reached
// MaxLeases.
func (buf *Buffer) LeaseOne(now time.Time) (leased *Batch) {
cur, next := buf.currentBatch, buf.unleased.Peek()
switch {
case len(buf.unAckedLeases) == int(buf.opts.MaxLeases):
// too many outstanding leases
case next != nil && !now.Before(next.nextSend):
// next unleased batch is fine to use
case cur != nil && !now.Before(cur.nextSend):
// currentBatch has data we can send
// Nothing's ready.
return buf.forceLeaseOne()
// leases the next available batch, regardless of its marked nextSend time.
func (buf *Buffer) forceLeaseOne() (leased *Batch) {
if buf.unleased.Len() == 0 {
return nil
leased = buf.unleased.PopBatch()
buf.unAckedLeases[leased] = struct{}{}
buf.liveLeases[leased] = struct{}{}, categoryUnleased, categoryLeased)
// ForceLeaseAll leases and returns all unleased Batches immediately, regardless
// of their send times.
// This is useful for cancelation scenarios where you no longer want to do
// full processing on the remaining batches.
// NOTE: It's helpful to call Flush before ForceLeaseAll to include the
// currently buffered, but unbatched, data.
func (buf *Buffer) ForceLeaseAll() []*Batch {
if buf.unleased.Len() == 0 {
return nil
ret := make([]*Batch, 0, buf.unleased.Len())
for buf.unleased.Len() > 0 {
ret = append(ret, buf.forceLeaseOne())
return ret
// ACK records that all the items in the batch have been processed.
// The Batch is no longer tracked in any form by the Buffer.
// Calling ACK/NACK on the same Batch twice will panic.
// Calling ACK/NACK on a Batch not returned from LeaseOne will panic.
func (buf *Buffer) ACK(leased *Batch) {
func (buf *Buffer) removeLease(leased *Batch) (live bool) {
if leased == nil {
// Nil batch. Bail out.
if _, known := buf.unAckedLeases[leased]; !known {
panic(errors.New("unknown *Batch; are you ACK/NACK'ing multiple times?"))
delete(buf.unAckedLeases, leased)
if _, live = buf.liveLeases[leased]; live {
delete(buf.liveLeases, leased)
buf.stats.del(leased, categoryLeased)
} else {
buf.stats.del(leased, categoryDropped)
// NACK analyzes the current state of Batch.Data, potentially reducing
// UnleasedItemCount in the Buffer's if the given Batch.Data length is
// smaller than when the Batch was originally leased.
// The Batch will be re-enqueued unless:
// - The Batch's retry Iterator returns retry.Stop
// - The Batch has been dropped already due to FullBehavior policy. If this
// is the case, AddNoBlock would already have returned this *Batch pointer
// to you.
// Calling ACK/NACK on the same Batch twice will panic.
// Calling ACK/NACK on a Batch not returned from LeaseOne will panic.
func (buf *Buffer) NACK(ctx context.Context, err error, leased *Batch) {
if live := buf.removeLease(leased); !live {
// TODO(iannucci): decouple retry from context (pass in 'now' instead)
switch toWait := leased.retry.Next(ctx, err); {
case toWait == retry.Stop:
leased.nextSend = clock.Now(ctx).Add(toWait)
leased.countedItems = intMin(len(leased.Data), leased.countedItems)
var newSize int
for i := range leased.Data {
newSize += leased.Data[i].Size
leased.countedSize = intMin(newSize, leased.countedSize)
buf.stats.add(leased, categoryUnleased)
func intMin(a, b int) int {
if a < b {
return a
return b
func intMax(a, b int) int {
if a > b {
return a
return b