blob: df7d5ccc0b24719ad7b6e06c99c00de22588e5c1 [file] [log] [blame]
// Copyright 2019 The LUCI Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package buffer
import (
"time"
"go.chromium.org/luci/common/retry"
)
// BatchItem is just a container for the user-provided work items.
//
// This includes the Size of `Data` at the time the work item was added to
// the buffer. This will not be modified by Buffer, but can be adjusted
// by your application while handling a Batch if your handler needs to trim
// this somehow.
type BatchItem struct {
Item any
Size int
}
// Batch represents a collection of individual work items and associated
// metadata.
//
// Batches are are cut by the Channel according to Options.Buffer, and can be
// manipulated by ErrorFn and SendFn.
//
// ErrorFn and SendFn may manipulate the contents of the Batch (Data and Meta)
// to do things such as:
// - Associate a UID with the Batch (e.g. in the Meta field) to identify it to
// remote services for deduplication.
// - Remove already-processed items from Data in case the SendFn partially
// succeeded.
//
// The dispatcher accounts for the number of work items in the Batch as it
// leases the Batch out; initially the Batch's length will be len(Data). If the
// SendFn reduces the length of Data before the NACK, the accounted number of
// work items will be accordingly reduced. The accounted length can never grow
// (i.e. extending Data doesn't do anything).
//
// Similarly, if the buffer is configured with BatchSize, the accounted Size of
// the batch is defined as the sum of the cached sizes in Data. Reducing this
// amount (by removing items, or potentially reducing the Size in a BatchItem)
// will reduce the effective Size of this Batch, but adding to Data cannot
// increase the Size of the batch.
type Batch struct {
// Data is the work items pushed into the Buffer, plus their Size as provided
// to AddNoBlock.
Data []BatchItem
// Meta is an object which dispatcher.Channel will treat as totally opaque;
// You may manipulate it in SendFn or ErrorFn as you see fit. This can be used
// for e.g. associating a nonce with the Batch for retries, or stashing
// a constructed RPC proto, etc.
Meta any
// id is a 1-based counter which is generated by Buffer when the Batch
// is created. Within a Buffer it is monotonically increasing.
id uint64
// retry is the retry.Iterator associated with this Batch. Its Next method
// will be called when it is NACK'd.
retry retry.Iterator
// nextSend is the next timestamp after which this Batch is eligible for
// sending.
//
// While the batch is the `currentBatch` in the buffer, this timestamp
// represents the deadline for cutting this batch.
nextSend time.Time
// countedItems is the number of items in this Batch as the Buffer counts it.
// It starts as the original value of len(Batch.Data) and can decrease if
// len(Batch.Data) is smaller on a NACK().
countedItems int
// countedSize is the number of size units of this Batch as the Buffer counts
// it. It starts as the sum of the Size of Data, and can decrease if
// BatchItems in Data have their Size reduced or if items are removed
// from BatchItems.
countedSize int
}
func (b *Batch) canAccept(o *Options, itemSize int) bool {
switch {
case b == nil:
return false
case o.BatchItemsMax > -1 && b.countedItems+1 > o.BatchItemsMax:
return false
case o.BatchSizeMax > -1 && b.countedSize+itemSize > o.BatchSizeMax:
return false
}
return true
}