blob: bc676a8c934361894d841e4809b9c9f22bb335c1 [file] [log] [blame]
// Copyright 2015 The LUCI Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package bundler
import (
"bytes"
"time"
"unicode/utf8"
"go.chromium.org/luci/logdog/api/logpb"
)
// Newline sequences recognized by the text parser. POSIX-style "\n" is the
// scan delimiter; Windows-style "\r\n" is detected by looking back one byte.
const (
	posixNewline   = "\n"
	windowsNewline = "\r\n"
)

// posixNewlineBytes is the byte form of posixNewline, precomputed for
// delimiter searches over raw buffer data.
var posixNewlineBytes = []byte(posixNewline)
// textParser is a parser implementation for the LogDog TEXT stream type.
//
// It splits buffered stream data on newlines (handling both "\n" and
// "\r\n") and packages runs of lines into LogEntry messages, one entry per
// contiguous run sharing a chunk timestamp.
type textParser struct {
	// baseParser supplies the shared buffered-data machinery used below
	// (ViewLimit, Consume, firstChunkTime, Len, baseLogEntry).
	baseParser

	// sequence is the line-based sequence number assigned to the next
	// emitted LogEntry; it advances by the number of complete (delimited)
	// lines in each entry.
	sequence int64

	// buf is scratch space used to stage one line's bytes while trimming
	// delimiters and UTF-8 fragments; it is Reset and reused per line.
	buf bytes.Buffer
}

// Compile-time assertion that *textParser satisfies the parser interface.
var _ parser = (*textParser)(nil)
// nextEntry builds the next text LogEntry from buffered stream data, subject
// to the supplied constraints.
//
// It accumulates as many lines as fit within c.limit, stopping early when:
//   - the buffered data or the byte limit is exhausted;
//   - the next chunk's timestamp differs from the entry's (each LogEntry
//     carries a single timestamp); or
//   - no newline is found and c.allowSplit is false.
//
// Returns (nil, nil) when no line data could be extracted. The error return
// is always nil in the current implementation; it is part of the parser
// contract.
func (p *textParser) nextEntry(c *constraints) (*logpb.LogEntry, error) {
	limit := int64(c.limit)

	ts := time.Time{}
	txt := logpb.Text{}
	lineCount := 0
	for limit > 0 {
		br := p.ViewLimit(limit)
		if br.Remaining() == 0 {
			// Exceeded either limit or available buffer data.
			break
		}

		// Use the timestamp of the first data chunk.
		// NOTE(review): firstChunkTime's second return (presumably an
		// ok/error indicator) is deliberately ignored here — best-effort.
		if len(txt.Lines) == 0 {
			ts, _ = p.firstChunkTime()
		} else if ct, _ := p.firstChunkTime(); !ct.Equal(ts) {
			// New timestamp, so need new LogEntry.
			break
		}

		// Find the index of our delimiter.
		//
		// We do this using a cross-platform approach that works on POSIX systems,
		// Mac (>=OSX), and Windows: we scan for "\n", then look backwards to see if
		// it was preceded by "\r" (for Windows-style newlines, "\r\n").
		idx := br.Index(posixNewlineBytes)
		newline := ""
		if idx >= 0 {
			// Restrict the view to the bytes before the "\n".
			br = br.CloneLimit(idx)
			newline = posixNewline
		} else if !c.allowSplit {
			// No delimiter within our limit, and we're not allowed to split, so we're
			// done.
			break
		}

		// Load the exportable data into our buffer.
		// (bytes.Buffer.ReadFrom's error is ignored; the view reader is
		// in-memory, so this is a straight copy.)
		p.buf.Reset()
		p.buf.ReadFrom(br)

		// Does our exportable buffer end with "\r"? If so, treat it as a possible
		// Windows newline sequence.
		if p.buf.Len() > 0 && p.buf.Bytes()[p.buf.Len()-1] == byte('\r') {
			split := false
			if newline != "" {
				// "\n" => "\r\n": the trailing "\r" belongs to the delimiter,
				// not the line content.
				newline = windowsNewline
				split = true
			} else {
				// If we're closed and this is the last byte in the stream, it is a
				// dangling "\r" and we should include it. Otherwise, leave it for the
				// next round (it may be the start of a "\r\n" whose "\n" has
				// not arrived yet).
				split = !(c.closed && int64(p.buf.Len()) == p.Len())
			}
			if split {
				// Drop the "\r" from the emitted line content.
				p.buf.Truncate(p.buf.Len() - 1)
			}
		}

		// A line is "partial" when it carries no delimiter (split mid-line).
		// Only complete lines count toward the sequence advance below.
		partial := (idx < 0)
		if !partial {
			lineCount++
		}

		// If we didn't have a delimiter, make sure we don't terminate in the middle
		// of a UTF8 character.
		if partial {
			// Scan rune-by-rune, tracking the byte offset just past the last
			// successfully-decoded rune (utf8.DecodeRune yields RuneError for
			// an invalid or incomplete sequence). Trailing bytes past that
			// offset are an unfinished rune and are left for the next round.
			count := 0
			lidx := -1
			b := p.buf.Bytes()
			for len(b) > 0 {
				r, sz := utf8.DecodeRune(b)
				count += sz
				if r != utf8.RuneError {
					lidx = count
				}
				b = b[sz:]
			}
			if lidx < 0 {
				// Not even one complete rune; wait for more data.
				break
			}
			p.buf.Truncate(lidx)
		}

		txt.Lines = append(txt.Lines, &logpb.Text_Line{
			Value:     append([]byte(nil), p.buf.Bytes()...), // Make a copy.
			Delimiter: newline,
		})

		// Consume the emitted content plus its delimiter from the underlying
		// buffer, and charge the same amount against our remaining limit.
		p.Consume(int64(p.buf.Len() + len(newline)))
		limit -= int64(p.buf.Len() + len(newline))
	}
	if len(txt.Lines) == 0 {
		// Nothing extracted this round.
		return nil, nil
	}

	le := p.baseLogEntry(ts)
	le.Sequence = uint64(p.sequence)
	le.Content = &logpb.LogEntry_Text{Text: &txt}
	// Advance the sequence by the number of complete lines emitted.
	p.sequence += int64(lineCount)
	return le, nil
}