// Copyright 2015 The LUCI Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package bundler

import (
	"fmt"
	"time"

	"google.golang.org/protobuf/types/known/durationpb"

	"go.chromium.org/luci/common/data/chunkstream"
	"go.chromium.org/luci/logdog/api/logpb"
	"go.chromium.org/luci/logdog/common/types"
)

// constraints is the set of constraints to apply when generating a LogEntry.
type constraints struct {
	// limit is the maximum size, in bytes, of the serialized LogEntry protobuf
	// that may be produced.
	limit int

	// allowSplit indicates that bundles should be generated to fill as much of
	// the specified space as possible, splitting them across multiple bundles
	// if necessary.
	//
	// The parser may choose to forgo bundling if the result is very
	// suboptimal, but is encouraged to fill the space if it's reasonable.
	allowSplit bool

	// closed means that bundles should be aggressively generated with the
	// expectation that no further data will be buffered. It is only relevant
	// if allowSplit is also true.
	closed bool
}
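
// Illustrative sketch (not from the original file): typical constraint values
// a caller might pass to nextEntry. The 1 MiB budget is a hypothetical
// figure, not a value taken from this package.
var exampleConstraints = &constraints{
	limit:      1024 * 1024, // hypothetical serialized-size budget, in bytes
	allowSplit: true,        // fill the budget, splitting entries if needed
	closed:     false,       // more data may still arrive
}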

// parser is a stateful entity bound to a single log stream. A parser yields
// LogEntry messages one at a time and shapes them based on constraints.
//
// parser instances are owned by a single Stream and are not goroutine-safe.
type parser interface {
	// appendData adds a data chunk to this parser's chunkstream.Buffer, taking
	// ownership of the Data.
	appendData(Data)

	// nextEntry returns the next LogEntry in the stream.
	//
	// This method may return nil if there is insufficient data to produce a
	// LogEntry given the supplied constraints.
	nextEntry(*constraints) (*logpb.LogEntry, error)

	// bufferedBytes returns the number of bytes currently buffered by this
	// parser.
	bufferedBytes() int64

	// firstChunkTime returns the timestamp of the first buffered data chunk.
	// The boolean return value is false if no data is buffered.
	firstChunkTime() (time.Time, bool)
}
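
// Illustrative sketch (hypothetical helper, not in the original file): how an
// owner such as a Stream might drain a parser, collecting complete LogEntry
// messages until nextEntry signals that more data is needed by returning nil.
// The drainParser name and the onEntry callback are assumptions.
func drainParser(p parser, c *constraints, onEntry func(*logpb.LogEntry)) error {
	for {
		le, err := p.nextEntry(c)
		if err != nil {
			return err
		}
		if le == nil {
			// Not enough buffered data to produce another entry under c.
			return nil
		}
		onEntry(le)
	}
}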

// newParser instantiates the parser implementation appropriate for the stream
// type described by d, using c to assign prefix indices.
func newParser(d *logpb.LogStreamDescriptor, c *counter) (parser, error) {
	base := baseParser{
		counter:  c,
		timeBase: d.Timestamp.AsTime(),
	}

	switch d.StreamType {
	case logpb.StreamType_TEXT:
		return &textParser{
			baseParser: base,
		}, nil
	case logpb.StreamType_BINARY:
		return &binaryParser{
			baseParser: base,
		}, nil
	case logpb.StreamType_DATAGRAM:
		return &datagramParser{
			baseParser: base,
			maxSize:    int64(types.MaxDatagramSize),
		}, nil
	default:
		return nil, fmt.Errorf("unknown stream type: %v", d.StreamType)
	}
}
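
// Illustrative sketch (hypothetical helper, not in the original file):
// constructing a text-stream parser from a minimal descriptor. A real
// descriptor would also carry a Timestamp; AsTime on a nil timestamp is
// nil-safe and yields the Unix epoch, so time offsets here would be computed
// from the epoch.
func exampleNewTextParser(c *counter) (parser, error) {
	desc := &logpb.LogStreamDescriptor{
		StreamType: logpb.StreamType_TEXT,
	}
	return newParser(desc, c)
}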

// baseParser is a common set of parser capabilities.
type baseParser struct {
	chunkstream.Buffer

	counter   *counter
	timeBase  time.Time
	nextIndex uint64
}

// baseLogEntry constructs a LogEntry stamped with this stream's next stream
// index, the next prefix index from the shared counter, and a time offset
// relative to the stream's timestamp base.
func (p *baseParser) baseLogEntry(ts time.Time) *logpb.LogEntry {
	e := logpb.LogEntry{
		TimeOffset:  durationpb.New(ts.Sub(p.timeBase)),
		PrefixIndex: uint64(p.counter.next()),
		StreamIndex: p.nextIndex,
	}
	p.nextIndex++
	return &e
}
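
// Illustrative sketch (hypothetical helper, not in the original file): a
// concrete parser would typically stamp an entry with the timestamp of its
// first buffered chunk, so TimeOffset becomes that chunk's time minus the
// stream's timestamp base, and StreamIndex increments once per call.
func exampleEmit(p *baseParser) *logpb.LogEntry {
	ts, ok := p.firstChunkTime()
	if !ok {
		// Nothing is buffered; there is no entry to emit.
		return nil
	}
	return p.baseLogEntry(ts)
}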

func (p *baseParser) appendData(d Data) {
	p.Append(d)
}

func (p *baseParser) bufferedBytes() int64 {
	return p.Len()
}

func (p *baseParser) firstChunkTime() (time.Time, bool) {
	// Get the first data chunk in our Buffer.
	chunk := p.FirstChunk()
	if chunk == nil {
		return time.Time{}, false
	}

	// Every chunk in our Buffer was added via appendData, so it is always a
	// Data instance and this type assertion is safe.
	return chunk.(Data).Timestamp(), true
}

// memoryCorruptionIf panics with a memory corruption error if cond is true
// and err is non-nil.
func memoryCorruptionIf(cond bool, err error) {
	if cond {
		memoryCorruption(err)
	}
}

// memoryCorruption panics if err is non-nil. It is used to surface internal
// state errors that should be impossible absent a bug.
func memoryCorruption(err error) {
	if err != nil {
		panic(fmt.Errorf("bundler: memory corruption: %s", err))
	}
}
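
// Illustrative sketch (hypothetical helper, not in the original file): how a
// concrete parser might use these guards to assert an internal invariant.
func exampleInvariantCheck(want, got int) {
	memoryCorruptionIf(want != got,
		fmt.Errorf("inconsistent byte count: want %d, got %d", want, got))
}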