// Copyright 2015 The LUCI Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package bundler

import (
	"io"

	"go.chromium.org/luci/common/data/recordio"
	"go.chromium.org/luci/logdog/api/logpb"
)

// datagramParser is a parser implementation for the LogDog datagram stream
// type.
type datagramParser struct {
	baseParser

	// maxSize is the maximum allowed datagram size. Datagrams larger than this
	// will result in a processing error.
	maxSize int64

	// seq is the current datagram sequence number.
	seq int64

	// remaining is the amount of data remaining in a datagram that has
	// previously been emitted partially.
	//
	// This will be zero if we're not continuing a partial datagram.
	remaining int64

	// index is the index of this datagram. This is zero unless the datagram is
	// a continuation of a previous partial datagram, in which case this is the
	// continuation's index.
	index int64

	// size is the size of the current partial datagram.
	//
	// This value is only valid if we're continuing a partial datagram (i.e., if
	// remaining is non-zero).
	size int64
}

var _ parser = (*datagramParser)(nil)
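
// nextEntry implements parser. It emits at most one LogEntry per call: either
// a complete datagram, or, when the constraints require splitting, a single
// partial fragment of the current datagram. It returns nil with no error when
// there isn't enough buffered data to emit anything yet.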
func (s *datagramParser) nextEntry(c *constraints) (*logpb.LogEntry, error) {
// Use the current Buffer timestamp.
ts, has := s.firstChunkTime()
if !has {
// No chunks, so no data.
return nil, nil
	}

	// If remaining is zero, we haven't yet read a size header for the next
	// datagram.
	//
	// Note that a zero-size datagram also leaves zero here after its header is
	// read; however, such a datagram always emits its LogEntry within the same
	// call, so a zero remaining still correctly means "not mid-datagram" on the
	// next call.
if s.remaining == 0 {
bv := s.View()
// Read the next datagram size header.
rio := recordio.NewReader(bv, s.maxSize)
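		// ReadFrame decodes just the frame's size header; the frame body stays
		// in the buffer, so bv.Consumed() below reflects only the header bytes.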
size, _, err := rio.ReadFrame()
if err != nil {
switch err {
case io.EOF:
// Not enough data for a size header.
return nil, nil
case recordio.ErrFrameTooLarge:
return nil, recordio.ErrFrameTooLarge
}
// Other errors should not be possible, since all operations are against
// in-memory buffers.
memoryCorruption(err)
}
s.index = 0
s.size = size
s.remaining = size
// Don't need to read the size header again.
s.Consume(bv.Consumed())
	}

	// Figure out how much we can emit now. If it's less than the rest of the
	// datagram, this entry will be a partial fragment.
emitCount := s.remaining
continued := false
if emitCount > int64(c.limit) {
continued = true
emitCount = int64(c.limit)
}
bv := s.ViewLimit(s.remaining)
if r := bv.Remaining(); r < emitCount {
// Not enough buffered data to complete the datagram in one round.
continued = true
emitCount = r
}
if s.remaining > 0 && emitCount == 0 {
// The datagram has data, but we can't emit any of it. No point in issuing
// a zero-size partial datagram.
return nil, nil
}
// We're not willing to emit a partial datagram unless we're allowed to
// split.
if continued && !c.allowSplit {
return nil, nil
	}

	dg := logpb.Datagram{}
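	// Attach partial-datagram metadata if this entry is a fragment: either
	// we're splitting now (continued) or we're finishing a datagram that was
	// split on an earlier call (s.index > 0).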
if continued || s.index > 0 {
dg.Partial = &logpb.Datagram_Partial{
Index: uint32(s.index),
Size: uint64(s.size),
Last: !continued,
}
}
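
	// Copy this fragment's bytes out of the buffer and consume them.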
if emitCount > 0 {
dg.Data = make([]byte, emitCount)
bv.Read(dg.Data)
s.Consume(emitCount)
	}

	le := s.baseLogEntry(ts)
le.Sequence = uint64(s.seq)
le.Content = &logpb.LogEntry_Datagram{Datagram: &dg}
if !continued {
s.seq++
		s.remaining = 0
		// The index and size fields will be reset on the next read, since
		// remaining == 0.
} else {
s.index++
s.remaining -= emitCount
}
return le, nil
}