blob: b6e685be9fc9de090945dfcfe2edd8d57a82fa29 [file] [log] [blame]
// Copyright 2015 The LUCI Authors.
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
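// You may obtain a copy of the License at
//
//      http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.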
// See the License for the specific language governing permissions and
// limitations under the License.
package bundler
import (
// Config is the Bundler configuration.
type Config struct {
// Clock is the clock instance that will be used for Bundler and stream
// timing.
Clock clock.Clock
// MaxBufferedBytes is the maximum number of bytes to buffer in memory per
// stream.
MaxBufferedBytes int64
// MaxBundleSize is the maximum bundle size in bytes that may be generated.
// If this value is zero, no size constraint will be applied to generated
// bundles.
MaxBundleSize int
// MaxBufferDelay is the maximum amount of time we're willing to buffer
// bundled data. Other factors can cause the bundle to be sent before this,
// but it is an upper bound.
MaxBufferDelay time.Duration
type bundlerStream interface {
isDrained() bool
name() string
expireTime() (time.Time, bool)
nextBundleEntry(*builder, bool) bool
streamDesc() *logpb.LogStreamDescriptor
// Bundler is the main Bundler instance. It exposes goroutine-safe endpoints for
// stream registration and bundle consumption.
type Bundler struct {
c *Config
// finishedC is closed when makeBundles goroutine has terminated.
finishedC chan struct{}
bundleC chan *logpb.ButlerLogBundle
// streamsLock is a lock around the `streams` map and its contents. You must
// also hold this lock in order to push into streamsNotify.
streamsLock sync.Mutex
// streamsNotify has a buffer size of 1 and acts as a select-able semaphore.
streamsNotify chan struct{}
// streams is the set of currently-registered Streams.
streams map[string]bundlerStream
// flushing is true if we're blocking on CloseAndFlush().
flushing bool
// prefixCounter is a global counter for Prefix-wide streams.
prefixCounter counter
// New instantiates a new Bundler instance.
func New(c Config) *Bundler {
b := Bundler{
c: &c,
finishedC: make(chan struct{}),
bundleC: make(chan *logpb.ButlerLogBundle),
streams: map[string]bundlerStream{},
streamsNotify: make(chan struct{}, 1),
go b.makeBundles()
return &b
// Register adds a new stream to the Bundler, returning a reference to the
// registered stream.
// The Bundler takes ownership of the supplied Properties, and may modify them
// as needed.
func (b *Bundler) Register(d *logpb.LogStreamDescriptor) (Stream, error) {
// Our Properties must validate.
if err := d.Validate(false); err != nil {
return nil, err
// Enforce that the log stream descriptor's Prefix is empty.
d.Prefix = ""
// Construct a parser for this stream.
c := streamConfig{
name: d.Name,
template: logpb.ButlerLogBundle_Entry{
Desc: d,
maximumBufferDuration: b.c.MaxBufferDelay,
maximumBufferedBytes: b.c.MaxBufferedBytes,
onAppend: func(appended bool) {
if appended {
err := error(nil)
c.parser, err = newParser(d, &b.prefixCounter)
if err != nil {
return nil, fmt.Errorf("failed to create stream parser: %s", err)
defer b.streamsLock.Unlock()
// Ensure that this is not a duplicate stream name.
if s := b.streams[d.Name]; s != nil {
return nil, fmt.Errorf("a Stream is already registered for %q", d.Name)
// Create a new stream. This will kick off its processing goroutine, which
// will not stop until it is closed.
s := newStream(c)
return s, nil
// GetStreamDescs returns the set of registered stream names mapped to their
// descriptors.
// This is intended for testing purposes. DO NOT modify the resulting
// descriptors.
func (b *Bundler) GetStreamDescs() map[string]*logpb.LogStreamDescriptor {
defer b.streamsLock.Unlock()
if len(b.streams) == 0 {
return nil
streams := make(map[string]*logpb.LogStreamDescriptor, len(b.streams))
for k, s := range b.streams {
streams[k] = s.streamDesc()
return streams
// CloseAndFlush closes the Bundler, alerting it that no more streams will be
// added and that existing data may be aggressively output.
// CloseAndFlush will block until all buffered data has been consumed.
func (b *Bundler) CloseAndFlush() {
// Mark that we're flushing. This will cause us to perform more aggressive
// bundling in Next().
// Next returns the next bundle, blocking until it is available.
func (b *Bundler) Next() *logpb.ButlerLogBundle {
return <-b.bundleC
func (b *Bundler) startFlushing() {
defer b.streamsLock.Unlock()
if !b.flushing {
b.flushing = true
// makeBundles is run in its own goroutine. It runs continuously, responding
// to Stream constraints and availability and sending ButlerLogBundles through
// bundleC when available.
// makeBundles will terminate when closeC is closed and all streams are drained.
func (b *Bundler) makeBundles() {
defer close(b.finishedC)
defer close(b.bundleC)
defer b.streamsLock.Unlock()
var bb *builder
defer func() {
if bb != nil && bb.hasContent() {
b.bundleC <- bb.bundle()
for {
bb = &builder{
size: b.c.MaxBundleSize,
template: logpb.ButlerLogBundle{
Timestamp: timestamppb.New(b.getClock().Now()),
var oldestContentTime time.Time
for {
state := b.getStreamStateLocked()
// Attempt to create more bundles.
sendNow := b.bundleRoundLocked(bb, state)
// Prune and unregister any drained streams.
state.forEachStream(func(s bundlerStream) bool {
if s.isDrained() {
return true
if b.flushing && len(b.streams) == 0 {
// We're flushing, and there are no more registered streams, so we're
// completely finished.
// If we have any content in our builder, it will be exported via defer.
// If we have content, consider emitting this bundle.
if bb.hasContent() && (b.c.MaxBufferDelay == 0 || sendNow || bb.ready()) {
// Mark the first time this round where we actually saw data.
if oldestContentTime.IsZero() && bb.hasContent() {
oldestContentTime =
// We will yield our stream lock and sleep, waiting for either:
// 1) The earliest expiration time.
// 2) A streams channel signal.
// We use a Cond here because we want Streams to be able to be added
// while we're waiting for stream data.
nextExpire, has := state.nextExpire()
// If we have an oldest content time, that also means that we have
// content. Factor this constraint in.
if !oldestContentTime.IsZero() {
roundExpire := oldestContentTime.Add(b.c.MaxBufferDelay)
if !roundExpire.After( {
if !has || roundExpire.Before(nextExpire) {
nextExpire = roundExpire
has = true
// If we had no data or expire constraints, wait indefinitely for
// something to change.
// This will release our state lock during switch execution. The lock will
// be held after the switch statement has finished.
switch {
case has && nextExpire.After(
// No immediate data, so block until the next known data expiration
// time.
cctx, cancel := context.WithDeadline(context.Background(), nextExpire)
select {
case <-b.streamsNotify:
case <-cctx.Done():
case has:
// There is more data, and it has already expired, so go immediately.
// No data, and no enqueued stream data, so block indefinitely until we
// get a signal.
// If our bundler has contents, send them.
if bb.hasContent() {
b.bundleC <- bb.bundle()
// bundleRoundLocked implements a single bundle building round. This
// incrementally adds data from the stream state to the supplied builder.
//
// NOTE(review): the description below is written in terms of Next and of
// blocking behaviour; confirm how much of it still applies to this method.
//
// This method will block until a suitable bundle is available. Availability
// is subject both to time and data constraints:
// - If buffered data, which is timestamped at ingest, has exceeded its
// buffer duration threshold, a Bundle will be cut immediately.
// - If no data is set to expire, the Bundler may wait for more data to
// produce a more optimally-packed bundle.
//
// At a high level, Next operates as follows:
// 1. Freeze all stream state.
// 2. Scan streams for data that has exceeded its threshold; if data is found:
// - Aggressively pack expired data into a Bundle until the stream is
// drained (which will be unregistered later) or can't generate a new
// bundle entry with the current data in the stream buffer (e.g. only
// partial size header exists in buffer). This will allow more data
// coming in when the stream is revisited in the next bundle round.
// - Optimally pack the remainder of the Bundle with any available data.
// - Return the Bundle.
// 3. Examine the remaining data sizes, waiting for either:
// - Enough stream data to fill our Bundle.
// - Our timeout, if the Bundler is not closed.
// 4. Pack a Bundle with the remaining data optimally, emphasizing streams
// with older data.
//
// Returns true if some bundle data was added that should be sent immediately.
//
// NOTE(review): this block appears to have lost lines and sub-expressions
// (closing braces, loop exits, operands). The code is left exactly as found;
// the notes below mark the visible gaps and must be resolved against the
// authoritative source before this compiles.
func (b *Bundler) bundleRoundLocked(bb *builder, state *streamState) bool {
sendNow := false
// First pass: non-blocking data that has exceeded its storage threshold.
for bb.remaining() > 0 {
// NOTE(review): the right-hand side of this assignment is missing;
// presumably the earliest-expiring stream from state — confirm.
s :=
// NOTE(review): the body and closing brace of this check are missing;
// presumably it exits the first pass — confirm.
if s == nil || s.isDrained() {
// NOTE(review): the argument to After is missing; presumably the snapshot
// time held in state — confirm. The body of this check is also missing.
if et, has := s.expireTime(); !has || et.After( {
// This stream (and all other streams, since we're sorted) expires in
// the future, so we're done with the first pass.
// Pull bundles from this stream.
if modified := s.nextBundleEntry(bb, true); modified {
// NOTE(review): "since we're sorted" suggests the stream's position should
// be refreshed after it yields data; no such statement is visible here.
// We have at least one time-sensitive bundle, so send this round.
sendNow = true
} else {
// Remove the stream from current stream snapshot, the stream will be
// skipped in this round to allow more data coming in.
// NOTE(review): the removal statement described above is not visible, and
// it is unclear whether the following check belongs inside this else
// branch or after it; its body is missing as well.
if s.isDrained() {
// Second pass: bundle any available data.
state.forEachStream(func(s bundlerStream) bool {
// Stop iterating once the builder has no remaining capacity.
if bb.remaining() == 0 {
return false
// NOTE(review): the body of this check is missing.
if modified := s.nextBundleEntry(bb, b.flushing); modified {
return true
return sendNow
func (b *Bundler) getStreamStateLocked() *streamState {
// Lock and collect each stream.
state := &streamState{
streams: make([]bundlerStream, 0, len(b.streams)),
now: b.getClock().Now(),
for _, s := range b.streams {
state.streams = append(state.streams, s)
return state
func (b *Bundler) registerStreamLocked(s bundlerStream) {
b.streams[] = s
func (b *Bundler) unregisterStreamLocked(s bundlerStream) {
func (b *Bundler) signalStreamUpdate() {
defer b.streamsLock.Unlock()
func (b *Bundler) signalStreamUpdateLocked() {
select {
case b.streamsNotify <- struct{}{}:
func (b *Bundler) getClock() clock.Clock {
c := b.c.Clock
if c != nil {
return c
return clock.GetSystemClock()
// streamState is a snapshot of the current stream registration. All operations
// performed on the state require streamLock to be held.
// streamState implements heap.Interface for its streams array. Streams without
// data times (nil) are considered to be greater than those with times.
type streamState struct {
streams []bundlerStream
now time.Time
var _ heap.Interface = (*streamState)(nil)
func (s *streamState) next() bundlerStream {
if len(s.streams) == 0 {
return nil
return s.streams[0]
func (s *streamState) nextExpire() (time.Time, bool) {
if next :=; next != nil {
if ts, ok := next.expireTime(); ok {
return ts, true
return time.Time{}, false
func (s *streamState) streamUpdated(name string) {
if si, idx := s.streamIndex(name); si != nil {
heap.Fix(s, idx)
func (s *streamState) forEachStream(f func(bundlerStream) bool) {
// Clone our streams, since the callback may mutate their order.
streams := make([]bundlerStream, len(s.streams))
for i, s := range s.streams {
streams[i] = s
for _, s := range streams {
if !f(s) {
// removeStream removes a stream from the stream state.
func (s *streamState) removeStream(name string) bundlerStream {
if si, idx := s.streamIndex(name); si != nil {
heap.Remove(s, idx)
return si
return nil
func (s *streamState) streamIndex(name string) (bundlerStream, int) {
for i, si := range s.streams {
if == name {
return si, i
return nil, -1
func (s *streamState) Len() int {
return len(s.streams)
func (s *streamState) Less(i, j int) bool {
si, sj := s.streams[i], s.streams[j]
if it, ok := si.expireTime(); ok {
if jt, ok := sj.expireTime(); ok {
return it.Before(jt)
// i has data, but j does not, so i < j.
return true
// i has no data, so i us greater than all other streams.
return false
func (s *streamState) Swap(i, j int) {
s.streams[i], s.streams[j] = s.streams[j], s.streams[i]
func (s *streamState) Push(x any) {
s.streams = append(s.streams, x.(bundlerStream))
func (s *streamState) Pop() any {
last := s.streams[len(s.streams)-1]
s.streams = s.streams[:len(s.streams)-1]
return last