blob: 9f04e6ee2290b8fbb0dd8b11ba42b47855334a02 [file] [log] [blame]
// Copyright 2015 The LUCI Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//      http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package bundler
import (
// dataBufferSize is the size, in bytes, of each Data buffer that a Stream
// leases through LeaseData.
var dataBufferSize = 4096
// Stream is an individual Bundler Stream. Data is added to the Stream as a
// series of ordered binary chunks.
// A Stream is not goroutine-safe.
type Stream interface {
// LeaseData allocates and returns a Data block that stream data can be
// loaded into. The caller should Release() the Data, or transfer ownership to
// something that will (e.g., Append()).
// If the leased data is not Released, it is merely inefficient, not fatal.
LeaseData() Data
// Append adds a sequential chunk of data to the Stream. Append may block if
// the data isn't ready to be consumed.
// Append takes ownership of the data regardless of whether or not it returns
// an error. The supplied Data must not be referenced after calling Append.
Append(Data) error
// Close closes the Stream, flushing any remaining data.
// streamConfig is the set of static configuration parameters for the stream.
type streamConfig struct {
// name is the name of this stream.
name string
// parser is the stream parser to use.
parser parser
// maximumBufferedBytes is the maximum number of bytes that this stream will
// retain in its parser before blocking subsequent Append attempts.
maximumBufferedBytes int64
// maximumBufferDuration is the maximum amount of time that a block of data
// can be comfortably buffered in the stream.
maximumBufferDuration time.Duration
// template is the minimally-populated Butler log bundle entry.
template logpb.ButlerLogBundle_Entry
// onAppend, if not nil, is invoked when an attempt to append data to the
// stream occurs. If true is passed, the data was successfully appended. If
// false was passed, the data could not be appended immediately and the stream
// will block pending data consumption.
// The stream's append lock will be held when this method is called.
onAppend func(bool)
// streamImpl is a Stream implementation that is bound to a Bundler.
type streamImpl struct {
c *streamConfig
// drained is true if the stream is finished emitting data, including its
// terminal state.
// It is an atomic value, with zero indicating not drained and non-zero
// indicating drained. It should be accessed via isDrained, and set with
// setDrained.
drained int32
// parserLock is a Mutex protecting the stream's parser instance and its
// underlying chunk.Buffer. Any access to either of these fields must be done
// while holding this lock.
parserLock sync.Mutex
// dataConsumedSignalC is a channel that can be used to signal when data has
// been consumed. It is set via signalDataConsumed.
dataConsumedSignalC chan struct{}
// stateLock protects stream state against concurrent access.
stateLock sync.Mutex
// closed, if non-zero, indicates that we have been closed and our stream has
// finished reading.
// stateLock must be held when accessing this field.
closed bool
// lastLogEntry is a pointer to the last LogEntry that was exported.
// stateLock must be held when accessing this field.
lastLogEntry *logpb.LogEntry
// appendErr is the error that should be returned by Append. It is set when
// stream content processing hits a fatal state.
appendErr error
func newStream(c streamConfig) *streamImpl {
return &streamImpl{
c: &c,
dataConsumedSignalC: make(chan struct{}, 1),
func (s *streamImpl) LeaseData() Data {
return globalDataPoolRegistry.getPool(dataBufferSize).getData()
func (s *streamImpl) Append(d Data) error {
// Block/loop until we've successfully appended the data.
for {
dLen := int64(len(d.Bytes()))
if err := s.appendError(); err != nil || dLen == 0 {
return err
s.withParserLock(func() error {
if s.c.parser.bufferedBytes() == 0 ||
s.c.parser.bufferedBytes()+dLen <= s.c.maximumBufferedBytes {
d = nil
return nil
// The data was appended; we're done.
if s.c.onAppend != nil {
s.c.onAppend(d == nil)
if d == nil {
// Not ready to append; wait for a data event and re-evaluate.
if d != nil {
return nil
// Signals our Append loop that data has been consumed.
func (s *streamImpl) signalDataConsumed() {
select {
case s.dataConsumedSignalC <- struct{}{}:
func (s *streamImpl) Close() {
defer s.stateLock.Unlock()
func (s *streamImpl) closeLocked() {
s.closed = true
if s.c.onAppend != nil {
// If anyone is listening, notify that our state has changed; it doesn't
// actually matter WHEN this state notification happens, just that it
// happens after closed=true.
// The current implementation of Bundler has this as b.signalStreamUpdate(),
// which is synchronized with Bundler.streamsLock so doing this without
// a goroutine can lead to deadlock.
go s.c.onAppend(true)
func (s *streamImpl) name() string {
// isDrained returns true if this stream is finished emitting data, including
// its terminal state.
// This can happen if either:
// - The stream is closed and has no more buffered data, or
// - The stream has encountered a fatal error during processing.
func (s *streamImpl) isDrained() bool {
return atomic.LoadInt32(&s.drained) != 0
// setDrained marks this stream as drained.
func (s *streamImpl) setDrained() {
atomic.StoreInt32(&s.drained, 1)
// noMoreDataLocked returns true if our stream has been closed and its buffer
// is empty.
// The stream's stateLock must be held when calling this method.
func (s *streamImpl) noMoreDataLocked() bool {
if !s.closed {
return false
// If we have an append error, we will no longer accept or consume data.
if s.appendErr != nil {
return true
var bufSize int64
s.withParserLock(func() error {
bufSize = s.c.parser.bufferedBytes()
return nil
return bufSize == 0
// expireTime returns the Time when the oldest chunk in the stream will expire.
// This is calculated ask:
// oldest.Timestamp + stream.maximumBufferDuration
// If there is no buffered data, oldest will return nil.
func (s *streamImpl) expireTime() (t time.Time, has bool) {
s.withParserLock(func() error {
t, has = s.c.parser.firstChunkTime()
return nil
if has {
t = t.Add(s.c.maximumBufferDuration)
// nextBundleEntry generates bundles for this stream. The total bundle data size
// must not exceed the supplied size.
// If no bundle entry could be generated given the constraints, nil will be
// returned.
// It is possible for some entries to be returned alongside an error.
func (s *streamImpl) nextBundleEntry(bb *builder, aggressive bool) bool {
defer s.stateLock.Unlock()
// If we're not drained, try and get the next bundle.
modified := false
if !s.noMoreDataLocked() {
err := error(nil)
modified, err = s.nextBundleEntryLocked(bb, aggressive)
if err != nil {
if modified {
// If we're drained, populate our terminal state.
if s.noMoreDataLocked() {
if s.lastLogEntry != nil {
bb.setStreamTerminal(&s.c.template, s.lastLogEntry.StreamIndex)
return modified
func (s *streamImpl) nextBundleEntryLocked(bb *builder, aggressive bool) (bool, error) {
c := constraints{
allowSplit: aggressive,
closed: s.closed,
// Extract as many entries as possible from the stream. As we extract, adjust
// our byte size.
// If we're closed, this will continue to consume until finished. If an error
// occurs, shut down data collection.
modified := false
for c.limit = bb.remaining(); c.limit > 0; c.limit = bb.remaining() {
emittedLog := false
err := s.withParserLock(func() error {
le, err := s.c.parser.nextEntry(&c)
if err != nil {
return err
if le == nil {
return nil
// Enforce basic log entry consistency.
if err := s.fixupLogEntry(s.lastLogEntry, le); err != nil {
return err
emittedLog = true
modified = true
bb.add(&s.c.template, le)
s.lastLogEntry = le
return nil
if err != nil || !emittedLog {
return modified, err
return modified, nil
// fixupLogEntry asserts and corrects a log entry's stream offset and ordering
// given the previous entry in the stream.
// If prev is nil, that means that cur is expected to be the first log entry
// in the stream.
func (s *streamImpl) fixupLogEntry(prev, cur *logpb.LogEntry) error {
if prev == nil {
if cur.StreamIndex != 0 {
return fmt.Errorf("first log entry is not zero index (%d)", cur.StreamIndex)
} else {
if cur.StreamIndex != prev.StreamIndex+1 {
return fmt.Errorf("non-contiguous stream indices (%d != %d)", cur.StreamIndex, prev.StreamIndex+1)
if cur.TimeOffset.AsDuration() < prev.TimeOffset.AsDuration() {
to := *prev.TimeOffset
cur.TimeOffset = &to
return nil
func (s *streamImpl) withParserLock(f func() error) error {
defer s.parserLock.Unlock()
return f()
func (s *streamImpl) appendError() error {
defer s.stateLock.Unlock()
return s.appendErr
func (s *streamImpl) setAppendErrorLocked(err error) {
s.appendErr = err
func (s *streamImpl) streamDesc() *logpb.LogStreamDescriptor {
return s.c.template.Desc