blob: 593ac916d2619f670710779359994724e4c3d053 [file] [log] [blame]
// Copyright 2015 The LUCI Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package bundler
import (
"fmt"
"sort"
"go.chromium.org/luci/logdog/api/logpb"
)
// builderStream is builder data that is tracked for each individual stream.
type builderStream struct {
// ButlerLogBundle_Entry is the stream's in-progress bundle entry.
logpb.ButlerLogBundle_Entry
// size incrementally tracks the size of the stream's entry.
size int
}
// builder incrementally constructs ButlerLogBundle entries.
type builder struct {
// size is the maximum permitted bundle size.
size int
// template is the base bundle template.
template logpb.ButlerLogBundle
// templateCachedSize is the cached size of the ButlerLogBundle template.
templateCachedSize int
// smap maps the builder state for each individual stream by stream name.
streams map[string]*builderStream
}
func (b *builder) remaining() int {
return b.size - b.bundleSize()
}
func (b *builder) ready() bool {
// Have we reached our desired size?
return b.hasContent() && (b.bundleSize() >= b.size)
}
func (b *builder) bundleSize() int {
if b.templateCachedSize == 0 {
b.templateCachedSize = protoSize(&b.template)
}
size := b.templateCachedSize
for _, bs := range b.streams {
size += sizeOfBundleEntryTag + varintLength(uint64(bs.size)) + bs.size
}
return size
}
func (b *builder) hasContent() bool {
return len(b.streams) > 0
}
func (b *builder) add(template *logpb.ButlerLogBundle_Entry, le *logpb.LogEntry) {
bs := b.getCreateBuilderStream(template)
bs.Logs = append(bs.Logs, le)
psize := protoSize(le)
// Pay the cost of the additional LogEntry.
bs.size += sizeOfLogEntryTag + varintLength(uint64(psize)) + psize
}
func (b *builder) setStreamTerminal(template *logpb.ButlerLogBundle_Entry, tidx uint64) {
bs := b.getCreateBuilderStream(template)
if bs.Terminal {
if bs.TerminalIndex != tidx {
panic(fmt.Errorf("attempt to change terminal index %d => %d", bs.TerminalIndex, tidx))
}
return
}
bs.Terminal = true
bs.TerminalIndex = tidx
// Pay the cost of the additional terminal fields.
bs.size += ((sizeOfTerminalTag + sizeOfBoolTrue) +
(sizeOfTerminalIndexTag + varintLength(bs.TerminalIndex)))
}
func (b *builder) bundle() *logpb.ButlerLogBundle {
bundle := b.template
names := make([]string, 0, len(b.streams))
for k := range b.streams {
names = append(names, k)
}
sort.Strings(names)
bundle.Entries = make([]*logpb.ButlerLogBundle_Entry, len(names))
for idx, name := range names {
bundle.Entries[idx] = &b.streams[name].ButlerLogBundle_Entry
}
return &bundle
}
func (b *builder) getCreateBuilderStream(template *logpb.ButlerLogBundle_Entry) *builderStream {
if bs := b.streams[template.Desc.Name]; bs != nil {
return bs
}
// Initialize our maps (first time only).
if b.streams == nil {
b.streams = map[string]*builderStream{}
}
bs := builderStream{
ButlerLogBundle_Entry: *template,
size: protoSize(template),
}
b.streams[template.Desc.Name] = &bs
return &bs
}