blob: 952a8f4825204043057b26756c1acde535a776dd [file] [log] [blame]
// Copyright 2015 The LUCI Authors.
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// See the License for the specific language governing permissions and
// limitations under the License.
package bundler
import (
// constraints is the set of Constraints to apply when generating a LogEntry.
type constraints struct {
// limit is the maximum size, in bytes, of the serialized LogEntry protobuf
// that may be produced.
limit int
// allowSplit indicates that bundles should be generated to fill as much of
// the specified space as possible, splitting them across multiple bundles if
// necessary.
// The parser may choose to forego bundling if the result is very suboptimal,
// but is encouraged to fill the space if it's reasonable.
allowSplit bool
// closed means that bundles should be aggressively generated with the
// expectation that no further data will be buffered. It is only relevant
// if allowSplit is also true.
closed bool
// parser is a stateful presence bound to a single log stream. A parser yields
// LogEntry messages one at a time and shapes them based on constraints.
// parser instances are owned by a single Stream and are not goroutine-safe.
type parser interface {
// appendData adds a data chunk to this parser's chunk.Buffer, taking
// ownership of the Data.
// nextEntry returns the next LogEntry in the stream.
// This method may return nil if there is insuffuicient data to produce a
// LogEntry given the
nextEntry(*constraints) (*logpb.LogEntry, error)
bufferedBytes() int64
firstChunkTime() (time.Time, bool)
func newParser(d *logpb.LogStreamDescriptor, c *counter) (parser, error) {
base := baseParser{
counter: c,
timeBase: google.TimeFromProto(d.Timestamp),
switch d.StreamType {
case logpb.StreamType_TEXT:
return &textParser{
baseParser: base,
}, nil
case logpb.StreamType_BINARY:
return &binaryParser{
baseParser: base,
}, nil
case logpb.StreamType_DATAGRAM:
return &datagramParser{
baseParser: base,
maxSize: int64(types.MaxDatagramSize),
}, nil
return nil, fmt.Errorf("unknown stream type: %v", d.StreamType)
// baseParser is a common set of parser capabilities.
type baseParser struct {
counter *counter
timeBase time.Time
nextIndex uint64
func (p *baseParser) baseLogEntry(ts time.Time) *logpb.LogEntry {
e := logpb.LogEntry{
TimeOffset: google.NewDuration(ts.Sub(p.timeBase)),
PrefixIndex: uint64(,
StreamIndex: p.nextIndex,
return &e
func (p *baseParser) appendData(d Data) {
func (p *baseParser) bufferedBytes() int64 {
return p.Len()
func (p *baseParser) firstChunkTime() (time.Time, bool) {
// Get the first data chunk in our Buffer.
chunk := p.FirstChunk()
if chunk == nil {
return time.Time{}, false
return chunk.(Data).Timestamp(), true
func memoryCorruptionIf(cond bool, err error) {
if cond {
func memoryCorruption(err error) {
if err != nil {
panic(fmt.Errorf("bundler: memory corruption: %s", err))