package dispatcher
import (
// ErrorFn is called to handle the error from SendFn.
// This is also invoked with buffer.ErrItemTooLarge if your supplied
// ItemSizeFunc returns a size larger than Buffer.BatchSizeMax (i.e. you pushed
// an item which couldn't fit inside of a Batch). Similarly, if your
// ItemSizeFunc returns <=0, this is invoked with buffer.ErrItemTooSmall.
// Channel ignores the `retry` return value of this function in these cases.
// It executes in the main handler loop of the dispatcher so it can make
// synchronous decisions about the dispatcher state.
// Blocking in this function will block ALL dispatcher actions, so be quick
// :).
// likely cause deadlocks.
// This may:
// - inspect/log the error
// - manipulate the contents of failedBatch
// - return a boolean of whether this Batch should be retried or not. If
// this is false then the Batch is dropped. If it's true, then it will be
// re-queued as-is for transmission according to BufferFullBehavior.
// - pass the Batch.Data to another goroutine (in a non-blocking way!) to be
// re-queued through Channel.WriteChan.
// Args:
// - failedBatch - The Batch for which SendFn produced a non-nil error.
// - err - The error SendFn produced.
// Returns true iff the dispatcher should re-try sending this Batch, according
// to Buffer.Retry.
type ErrorFn func(failedBatch *buffer.Batch, err error) (retry bool)
// Options is the configuration options for NewChannel.
type Options struct {
// [OPTIONAL] The ErrorFn to use (see ErrorFn docs for details).
// Default: Logs the error (at Info for retryable errors, and Error for
// non-retryable errors) and returns true on a transient error.
ErrorFn ErrorFn
// [OPTIONAL] Called with the dropped batch any time the Channel drops a batch.
// This includes:
// * When FullBehavior==DropOldestBatch and we get new data.
// * When FullBehavior==DropOldestBatch and we attempt to retry old data.
// * When ErrorFn returns false for a batch.
// It executes in the main handler loop of the dispatcher so it can make
// synchronous decisions about the dispatcher state.
// Blocking in this function will block ALL dispatcher actions, so be quick
// :).
// likely cause deadlocks.
// When the channel is fully drained, this will be invoked exactly once with
// `(nil, true)`. This will occur immediately before the DrainedFn is called.
// Some drop functions buffer their information, and this gives them an
// opportunity to flush out any buffered data.
// Default: logs (at Info level if FullBehavior==DropOldestBatch, or Warning
// level otherwise) the number of data items in the Batch being dropped.
DropFn func(b *buffer.Batch, flush bool)
// [OPTIONAL] Called exactly once when the associated Channel is closed and
// has fully drained its buffer, but before DrainC is closed.
// Note that this takes effect whether the Channel is shut down via Context
// cancellation or explicitly by closing Channel.C.
// This is useful for performing final state synchronization tasks/metrics
// finalization/helpful "everything is done!" messages/etc. without having to
// poll the Channel to see if it's done and also maintain external
// synchronization around the finalization action.
// Called in the main handler loop, but it's called after all other work is
// done by the Channel, so the only thing it blocks is the closure of DrainC.
// Default: No action.
DrainedFn func()
// [OPTIONAL] A rate limiter for how frequently this will invoke SendFn.
// Default: No limit.
QPSLimit *rate.Limiter
// [OPTIONAL] The minimal frequency of invoking SendFn.
// If greater than zero, this Channel will invoke SendFn at least this often.
// If there's a period of time longer than this with no work items, Channel
// will invoke SendFn with a nil batch.
// Errors returned from these nil SendFn invocations are still processed
// normally, and nil batches still count against QPSLimit.
// It is an error to specify a MinQPS value which is
// * greater than QPSLimit.Limit(),
// * or is rate.Inf.
// Default: No minimum QPS, no nil batches will be sent.
MinQPS rate.Limit
// Should return the size of the given buffer item (i.e. what you push into
// Channel.C) in whatever units you like (see Buffer.BatchSizeMax).
// The function will only ever be called once per pushed item.
// Must be non-nil if Buffer.BatchSizeMax is specified.
// Must return a positive value less than Buffer.BatchSizeMax. Failure to do
// so will cause `itm` to be immediately rejected from the dispatcher.Channel
// and routed to ErrorFn with no further processing.
ItemSizeFunc func(itm any) int
Buffer buffer.Options
// Debug output for tests.
testingDbg func(string, ...any)
func defaultDropFnFactory(ctx context.Context, fullBehavior buffer.FullBehavior) func(*buffer.Batch, bool) {
return func(dropped *buffer.Batch, flush bool) {
if flush {
logFn := logging.Warningf
if _, ok := fullBehavior.(*buffer.DropOldestBatch); ok {
logFn = logging.Infof
"dropping Batch(len(Data): %d, Meta: %+v)",
len(dropped.Data), dropped.Meta)
func defaultErrorFnFactory(ctx context.Context) ErrorFn {
return func(failedBatch *buffer.Batch, err error) (retry bool) {
retry = transient.Tag.In(err)
logFn := logging.Errorf
if retry {
logFn = logging.Infof
"failed to send Batch(len(Data): %d, Meta: %+v): %s",
len(failedBatch.Data), failedBatch.Meta, err)
// ErrorFnQuiet is an implementation of Options.ErrorFn which doesn't log the
// batch, but does check for `transient.Tag` to determine `retry`.
func ErrorFnQuiet(b *buffer.Batch, err error) (retry bool) {
return transient.Tag.In(err)
// ErrorFnReport is an implementation of Options.ErrorFn which sends all errors
// to a buffered channel. The channel MUST be drained as quickly as possible.
// Otherwise, it may block all dispatcher actions.
// If `inner` error function is provided, it is used to determine `retry`.
// Otherwise, `retry` is always false.
func ErrorFnReport(bufferSize int, inner ErrorFn) (ErrorFn, <-chan error) {
errCh := make(chan error, bufferSize)
return func(b *buffer.Batch, err error) bool {
errCh <- err
if inner != nil {
return inner(b, err)
return false
}, errCh
// DropFnQuiet is an implementation of Options.DropFn which drops batches
// without logging anything.
func DropFnQuiet(*buffer.Batch, bool) {}
// DropFnSummarized returns an implementation of Options.DropFn which counts the
// number of dropped batches, and only reports it at the rate provided.
// Unlike the default log function, this only logs the number of dropped items
// and the duration that they were collected over.
func DropFnSummarized(ctx context.Context, lim *rate.Limiter) func(*buffer.Batch, bool) {
durationStart := clock.Now(ctx)
dropCount := 0
return func(b *buffer.Batch, flush bool) {
dataLen := 0
if b != nil {
dataLen = len(b.Data)
if lim.Allow() || flush {
now := clock.Now(ctx)
ctx, "dropped %d items over %s", dropCount+dataLen, now.Sub(durationStart))
durationStart = now
dropCount = 0
} else {
dropCount += dataLen