blob: 8d89250d3e4836267ae00152a3be586996cd2e0b [file] [log] [blame]
// Copyright 2019 The LUCI Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package dispatcher
import (
"context"
"golang.org/x/time/rate"
"go.chromium.org/luci/common/clock"
"go.chromium.org/luci/common/logging"
"go.chromium.org/luci/common/retry/transient"
"go.chromium.org/luci/common/sync/dispatcher/buffer"
)
// ErrorFn is called to handle the error from SendFn.
//
// This is also invoked with buffer.ErrItemTooLarge if your supplied
// ItemSizeFunc returns a size larger than Buffer.BatchSizeMax (i.e. you pushed
// an item which couldn't fit inside of a Batch). Similarly, if your
// ItemSizeFunc returns <=0, this is invoked with buffer.ErrItemTooSmall.
// Channel ignores the `retry` return value of this function in these cases.
//
// It executes in the main handler loop of the dispatcher so it can make
// synchronous decisions about the dispatcher state.
//
// Blocking in this function will block ALL dispatcher actions, so be quick
// :).
//
// DO NOT WRITE TO THE CHANNEL DIRECTLY FROM THIS FUNCTION. Doing so will very
// likely cause deadlocks.
//
// This may:
//   - inspect/log the error
//   - manipulate the contents of failedBatch
//   - return a boolean of whether this Batch should be retried or not. If
//     this is false then the Batch is dropped. If it's true, then it will be
//     re-queued as-is for transmission according to Buffer.FullBehavior.
//   - pass the Batch.Data to another goroutine (in a non-blocking way!) to be
//     re-queued through Channel.WriteChan.
//
// Args:
//   - failedBatch - The Batch for which SendFn produced a non-nil error.
//   - err - The error SendFn produced.
//
// Returns true iff the dispatcher should re-try sending this Batch, according
// to Buffer.Retry.
type ErrorFn func(failedBatch *buffer.Batch, err error) (retry bool)
// Options is the configuration options for NewChannel.
//
// Every field marked [OPTIONAL] may be left at its zero value, in which case
// the documented default applies.
type Options struct {
	// [OPTIONAL] The ErrorFn to use (see ErrorFn docs for details).
	//
	// Default: Logs the error (at Info for retryable errors, and Error for
	// non-retryable errors) and returns true on a transient error.
	ErrorFn ErrorFn

	// [OPTIONAL] Called with the dropped batch any time the Channel drops a
	// batch.
	//
	// This includes:
	//   * When FullBehavior==DropOldestBatch and we get new data.
	//   * When FullBehavior==DropOldestBatch and we attempt to retry old data.
	//   * When ErrorFn returns false for a batch.
	//
	// It executes in the main handler loop of the dispatcher so it can make
	// synchronous decisions about the dispatcher state.
	//
	// Blocking in this function will block ALL dispatcher actions, so be quick
	// :).
	//
	// DO NOT WRITE TO THE CHANNEL DIRECTLY FROM THIS FUNCTION. Doing so will
	// very likely cause deadlocks.
	//
	// When the channel is fully drained, this will be invoked exactly once with
	// `(nil, true)`. This will occur immediately before the DrainedFn is
	// called. Some drop functions buffer their information, and this gives them
	// an opportunity to flush out any buffered data. Implementations must
	// therefore tolerate a nil batch.
	//
	// Default: logs (at Info level if FullBehavior==DropOldestBatch, or Warning
	// level otherwise) the number of data items in the Batch being dropped.
	DropFn func(b *buffer.Batch, flush bool)

	// [OPTIONAL] Called exactly once when the associated Channel is closed and
	// has fully drained its buffer, but before DrainC is closed.
	//
	// Note that this takes effect whether the Channel is shut down via Context
	// cancellation or explicitly by closing Channel.C.
	//
	// This is useful for performing final state synchronization tasks/metrics
	// finalization/helpful "everything is done!" messages/etc. without having
	// to poll the Channel to see if it's done and also maintain external
	// synchronization around the finalization action.
	//
	// Called in the main handler loop, but it's called after all other work is
	// done by the Channel, so the only thing it blocks is the closure of
	// DrainC.
	//
	// Default: No action.
	DrainedFn func()

	// [OPTIONAL] A rate limiter for how frequently this will invoke SendFn.
	//
	// Default: No limit.
	QPSLimit *rate.Limiter

	// [OPTIONAL] The minimal frequency of invoking SendFn.
	//
	// If greater than zero, this Channel will invoke SendFn at least this
	// often. If there's a period of time longer than this with no work items,
	// Channel will invoke SendFn with a nil batch.
	//
	// Errors returned from these nil SendFn invocations are still processed
	// normally, and nil batches still count against QPSLimit.
	//
	// It is an error to specify a MinQPS value which is
	//   * greater than QPSLimit.Limit(),
	//   * or is rate.Inf.
	//
	// Default: No minimum QPS, no nil batches will be sent.
	MinQPS rate.Limit

	// [OPTIONAL] unless Buffer.BatchSizeMax is specified, in which case this
	// is [REQUIRED] and must be non-nil.
	//
	// Should return the size of the given buffer item (i.e. what you push into
	// Channel.C) in whatever units you like (see Buffer.BatchSizeMax).
	//
	// The function will only ever be called once per pushed item.
	//
	// Must return a positive value less than Buffer.BatchSizeMax. Failure to do
	// so will cause `itm` to be immediately rejected from the
	// dispatcher.Channel and routed to ErrorFn with no further processing.
	//
	// NOTE(review): the ErrorFn docs describe rejection for sizes *larger than*
	// Buffer.BatchSizeMax, which would permit a size equal to it; confirm which
	// bound is intended.
	ItemSizeFunc func(itm any) int

	// Options for the underlying buffer; the docs in this file refer to its
	// BatchSizeMax and Retry settings. See buffer.Options for the full set of
	// fields and their defaults.
	Buffer buffer.Options

	// Debug output for tests. Unexported, so it can only be set from within
	// this package.
	testingDbg func(string, ...any)
}
// defaultDropFnFactory builds the logging DropFn that the Options.DropFn docs
// describe as the default.
//
// The returned function logs the number of data items and the Meta of each
// dropped batch. It logs at Info level when fullBehavior is a
// *buffer.DropOldestBatch and at Warning level for any other behavior.
//
// The returned function does nothing when:
//   - flush is true, because this implementation buffers nothing and so has
//     nothing to flush; or
//   - dropped is nil. The Options docs state that SendFn may be invoked with
//     a nil batch (see MinQPS) and that a batch is dropped when ErrorFn
//     returns false, so a nil batch may plausibly reach this function; it
//     carries no data, so there is nothing to report and dereferencing it
//     would panic.
func defaultDropFnFactory(ctx context.Context, fullBehavior buffer.FullBehavior) func(*buffer.Batch, bool) {
	// fullBehavior never changes for the lifetime of the returned closure, so
	// the log level is chosen once here rather than on every drop.
	logFn := logging.Warningf
	if _, ok := fullBehavior.(*buffer.DropOldestBatch); ok {
		logFn = logging.Infof
	}

	return func(dropped *buffer.Batch, flush bool) {
		if flush || dropped == nil {
			return
		}
		logFn(
			ctx,
			"dropping Batch(len(Data): %d, Meta: %+v)",
			len(dropped.Data), dropped.Meta)
	}
}
// defaultErrorFnFactory builds the logging ErrorFn that the Options.ErrorFn
// docs describe as the default.
//
// The returned function asks for a retry iff err carries transient.Tag. It
// logs the failure at Info level when the batch will be retried and at Error
// level otherwise.
//
// failedBatch may be nil: the Options.MinQPS docs state that SendFn can be
// invoked with a nil batch and that errors from those invocations are
// processed normally. Such a failure is logged without batch details instead
// of dereferencing the nil pointer.
func defaultErrorFnFactory(ctx context.Context) ErrorFn {
	return func(failedBatch *buffer.Batch, err error) (retry bool) {
		retry = transient.Tag.In(err)

		logFn := logging.Errorf
		if retry {
			logFn = logging.Infof
		}

		if failedBatch == nil {
			logFn(ctx, "failed to send nil Batch: %s", err)
			return retry
		}

		logFn(
			ctx,
			"failed to send Batch(len(Data): %d, Meta: %+v): %s",
			len(failedBatch.Data), failedBatch.Meta, err)
		return retry
	}
}
// ErrorFnQuiet is an implementation of Options.ErrorFn which logs nothing.
//
// The batch itself is ignored; `retry` is true iff err carries
// `transient.Tag`.
func ErrorFnQuiet(b *buffer.Batch, err error) (retry bool) {
	retry = transient.Tag.In(err)
	return retry
}
// ErrorFnReport returns an implementation of Options.ErrorFn which forwards
// every error it receives to a buffered channel of capacity bufferSize, along
// with the receive side of that channel.
//
// The send is blocking, so the channel MUST be drained as quickly as possible;
// once the buffer is full the ErrorFn will block until a reader makes room.
//
// If `inner` is non-nil it is called after the error has been sent, and its
// result decides `retry`. Otherwise `retry` is always false.
func ErrorFnReport(bufferSize int, inner ErrorFn) (ErrorFn, <-chan error) {
	reports := make(chan error, bufferSize)

	report := func(b *buffer.Batch, err error) bool {
		reports <- err
		if inner == nil {
			return false
		}
		return inner(b, err)
	}

	return report, reports
}
// DropFnQuiet is an implementation of Options.DropFn which silently discards
// every batch it is given: nothing is logged or recorded, and flush requests
// are no-ops.
func DropFnQuiet(_ *buffer.Batch, _ bool) {
	// Intentionally empty.
}
// DropFnSummarized returns an implementation of Options.DropFn which tallies
// the number of dropped data items and reports the running total no more often
// than `lim` permits.
//
// Unlike the default drop function, this logs only the number of dropped items
// and the duration over which they were collected. It logs nothing about
// individual batches.
//
// A summary is written (at Info level) whenever `lim` allows an event or the
// call is a flush; the tally and the collection window are then reset. A nil
// batch contributes zero items.
func DropFnSummarized(ctx context.Context, lim *rate.Limiter) func(*buffer.Batch, bool) {
	windowStart := clock.Now(ctx)
	pending := 0

	return func(b *buffer.Batch, flush bool) {
		if b != nil {
			pending += len(b.Data)
		}

		// The limiter is consulted on every call, flushes included, before
		// the flush flag is considered.
		if !lim.Allow() && !flush {
			return
		}

		now := clock.Now(ctx)
		logging.Infof(
			ctx, "dropped %d items over %s", pending, now.Sub(windowStart))
		windowStart, pending = now, 0
	}
}