blob: 39c588ad2d3a4f33b51f87a6a8efc61c0ca92151 [file] [log] [blame]
// Copyright 2018 The LUCI Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package butler
import (
"go.chromium.org/luci/common/errors"
"go.chromium.org/luci/logdog/api/logpb"
)
// assertGetText panics if the passed LogEntry does not contain Text data, or returns it.
func assertGetText(le *logpb.LogEntry) *logpb.Text {
if txt := le.GetText(); txt == nil {
panic(errors.Reason(
"wrong StreamType: got %T, expected *logpb.LogEntry_Text", le.Content,
).Err())
} else {
return txt
}
}
// getWrappedTextCallback wraps a passed callback meant to be called at the
// ends of Text lines so that it is actually called at the end of Text lines.
//
// Does not wrap callback to guarantee being called at the end of *every* Text
// line.
//
// The wrapped callback panics if:
// - the passed LogEntry is not a Text LogEntry
// - the passed LogEntry has lines in a form other than described in log.proto
func getWrappedTextCallback(cb StreamChunkCallback) StreamChunkCallback {
if cb == nil {
return nil
}
var flushed bool
var buf []*logpb.Text_Line
var streamIdx uint64
var sequence uint64
flushBuffer := func() {
if len(buf) == 0 {
return
}
data := &logpb.LogEntry{
Content: &logpb.LogEntry_Text{
Text: &logpb.Text{
Lines: buf,
},
},
StreamIndex: streamIdx,
Sequence: sequence,
}
cb(data)
streamIdx++
sequence += uint64(len(buf))
buf = nil
}
return func(le *logpb.LogEntry) {
if le == nil && !flushed { // "flush"
flushed = true
flushBuffer()
cb(nil)
return
}
if flushed {
panic(errors.New("called with nil multiple times"))
}
txt := assertGetText(le)
if len(txt.Lines) == 0 {
panic(errors.New("called with no lines"))
}
// Process the first line, which may be partial.
firstLine := txt.Lines[0]
buf = append(buf, firstLine)
if firstLine.Delimiter == "" {
if len(txt.Lines) > 1 {
panic(errors.New("partial line not last in LogEntry"))
}
return
}
// Convert buf's contents into a single line and store that.
if len(buf) > 1 {
bufSize := 0
for _, line := range buf {
bufSize += len(line.Value)
}
wholeFirstLine := &logpb.Text_Line{
Value: make([]byte, 0, bufSize),
Delimiter: firstLine.Delimiter,
}
for _, line := range buf {
wholeFirstLine.Value = append(wholeFirstLine.Value, line.Value...)
}
buf = []*logpb.Text_Line{wholeFirstLine}
}
// Process the next lines, which should be all complete with at most one partial at the end.
wholeLines := txt.Lines[1:]
var lastPartialLine *logpb.Text_Line
if lastIdx := len(wholeLines) - 1; lastIdx >= 0 && wholeLines[lastIdx].Delimiter == "" {
lastPartialLine = wholeLines[lastIdx]
wholeLines = wholeLines[:lastIdx]
}
for _, line := range wholeLines {
if line.Delimiter == "" {
panic(errors.New("partial line not last in LogEntry"))
}
buf = append(buf, line)
}
flushBuffer()
// If the last line is partial, record it.
if lastPartialLine != nil {
buf = []*logpb.Text_Line{lastPartialLine}
}
}
}