blob: ff380569c8dd316d15a4fdfb320da76269af678c [file] [log] [blame]
// Copyright 2015 The LUCI Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package archive
import (
"bytes"
"fmt"
"io"
"strconv"
"testing"
"time"
"github.com/golang/protobuf/proto"
"go.chromium.org/luci/common/data/recordio"
"go.chromium.org/luci/common/errors"
"go.chromium.org/luci/common/iotools"
"go.chromium.org/luci/common/proto/google"
"go.chromium.org/luci/logdog/api/logpb"
. "github.com/smartystreets/goconvey/convey"
. "go.chromium.org/luci/common/testing/assertions"
)
func gen(i int) *logpb.LogEntry {
return &logpb.LogEntry{
TimeOffset: google.NewDuration(time.Duration(i) * time.Second),
PrefixIndex: uint64(i) * 2,
StreamIndex: uint64(i),
Sequence: uint64(i),
Content: &logpb.LogEntry_Text{
Text: &logpb.Text{
Lines: []*logpb.Text_Line{
{Value: []byte(strconv.Itoa(i)), Delimiter: "\n"},
},
},
},
}
}
type testSource struct {
logs []*logpb.LogEntry
err error
}
func (s *testSource) addLogEntry(le *logpb.LogEntry) {
s.logs = append(s.logs, le)
}
func (s *testSource) add(indices ...int) {
for _, i := range indices {
s.addLogEntry(gen(i))
}
}
func (s *testSource) addEntries(entries ...*logpb.LogEntry) {
s.logs = append(s.logs, entries...)
}
func (s *testSource) NextLogEntry() (le *logpb.LogEntry, err error) {
if err = s.err; err != nil {
return
}
if len(s.logs) == 0 {
err = io.EOF
return
}
le, s.logs = s.logs[0], s.logs[1:]
return
}
type errWriter struct {
io.Writer
err error
}
func (w *errWriter) Write(d []byte) (int, error) {
if w.err != nil {
return 0, w.err
}
return w.Writer.Write(d)
}
// indexParams umarshals an index from a byte stream and removes any entries.
func indexParams(d []byte) *logpb.LogIndex {
var index logpb.LogIndex
if err := proto.Unmarshal(d, &index); err != nil {
panic(fmt.Errorf("failed to unmarshal index protobuf: %v", err))
}
index.Entries = nil
return &index
}
type indexChecker struct {
fixedSize int
}
func (ic *indexChecker) size(pb proto.Message) int {
if ic.fixedSize > 0 {
return ic.fixedSize
}
return proto.Size(pb)
}
// shouldContainIndexFor validates the correctness and completeness of the
// supplied index.
//
// actual should be a *bytes.Buffer that contains a serialized index protobuf.
//
// expected[0] should be the log stream descriptor.
// expected[1] should be a *bytes.Buffer that contains the log RecordIO stream.
//
// If additional expected elements are supplied, they are the specific integers
// that should appear in the index. Otherwise, it is assumed that the log is a
// complete index.
func (ic *indexChecker) shouldContainIndexFor(actual interface{}, expected ...interface{}) string {
indexB := actual.(*bytes.Buffer)
if len(expected) < 2 {
return "at least two expected arguments are required"
}
desc := expected[0].(*logpb.LogStreamDescriptor)
logB := expected[1].(*bytes.Buffer)
expected = expected[2:]
// Load our log index.
index := logpb.LogIndex{}
if err := proto.Unmarshal(indexB.Bytes(), &index); err != nil {
return fmt.Sprintf("failed to unmarshal index protobuf: %v", err)
}
// Descriptors must match.
if err := ShouldResembleProto(index.Desc, desc); err != "" {
return err
}
// Catalog the log entries in "expected".
entries := map[uint64]*logpb.LogEntry{}
offsets := map[uint64]int64{}
csizes := map[uint64]uint64{}
var eidx []uint64
// Skip the first frame (descriptor).
cr := iotools.CountingReader{
Reader: logB,
}
csize := uint64(0)
r := recordio.NewReader(&cr, 1024*1024)
d, err := r.ReadFrameAll()
if err != nil {
return "failed to skip descriptor frame"
}
for {
offset := cr.Count
d, err = r.ReadFrameAll()
if err == io.EOF {
break
}
if err != nil {
return fmt.Sprintf("failed to read entry #%d: %v", len(entries), err)
}
le := logpb.LogEntry{}
if err := proto.Unmarshal(d, &le); err != nil {
return fmt.Sprintf("failed to unmarshal entry #%d: %v", len(entries), err)
}
entries[le.StreamIndex] = &le
offsets[le.StreamIndex] = offset
csizes[le.StreamIndex] = csize
csize += uint64(ic.size(&le))
eidx = append(eidx, le.StreamIndex)
}
// Determine our expected archive indexes.
if len(expected) > 0 {
eidx = make([]uint64, 0, len(expected))
for _, e := range expected {
eidx = append(eidx, uint64(e.(int)))
}
}
iidx := make([]uint64, len(index.Entries))
for i, e := range index.Entries {
iidx[i] = e.StreamIndex
}
if err := ShouldResemble(iidx, eidx); err != "" {
return err
}
for i, cur := range index.Entries {
cidx := eidx[i]
le, offset := entries[uint64(cidx)], offsets[cidx]
if le == nil {
return fmt.Sprintf("no log entry for stream index %d", cidx)
}
if cur.StreamIndex != le.StreamIndex {
return fmt.Sprintf("index entry %d has incorrect stream index (%d != %d)", i, cur.StreamIndex, le.StreamIndex)
}
if cur.Offset != uint64(offset) {
return fmt.Sprintf("index entry %d has incorrect offset (%d != %d)", i, cur.Offset, offset)
}
if cur.PrefixIndex != le.PrefixIndex {
return fmt.Sprintf("index entry %d has incorrect prefix index (%d != %d)", i, cur.StreamIndex, le.PrefixIndex)
}
if curOff, leOff := google.DurationFromProto(cur.TimeOffset), google.DurationFromProto(le.TimeOffset); curOff != leOff {
return fmt.Sprintf("index entry %d has incorrect time offset (%v != %v)", i, curOff, leOff)
}
if cur.Sequence != le.Sequence {
return fmt.Sprintf("index entry %d has incorrect sequence (%d != %d)", i, cur.Sequence, le.Sequence)
}
}
return ""
}
func TestArchive(t *testing.T) {
Convey(`A Manifest connected to Buffer Writers`, t, func() {
var logB, indexB bytes.Buffer
desc := &logpb.LogStreamDescriptor{
Prefix: "test",
Name: "foo",
}
ic := indexChecker{}
ts := testSource{}
m := Manifest{
Desc: desc,
Source: &ts,
LogWriter: &logB,
IndexWriter: &indexB,
}
Convey(`A sequence of logs will build a complete index.`, func() {
ts.add(0, 1, 2, 3, 4, 5, 6)
So(Archive(m), ShouldBeNil)
So(&indexB, ic.shouldContainIndexFor, desc, &logB)
So(indexParams(indexB.Bytes()), ShouldResembleProto, &logpb.LogIndex{
Desc: desc,
LastPrefixIndex: 12,
LastStreamIndex: 6,
LogEntryCount: 7,
})
})
Convey(`A sequence of non-contiguous logs will build a complete index.`, func() {
ts.add(0, 1, 3, 6)
So(Archive(m), ShouldBeNil)
So(&indexB, ic.shouldContainIndexFor, desc, &logB, 0, 1, 3, 6)
So(indexParams(indexB.Bytes()), ShouldResembleProto, &logpb.LogIndex{
Desc: desc,
LastPrefixIndex: 12,
LastStreamIndex: 6,
LogEntryCount: 4,
})
})
Convey(`Out of order logs are ignored`, func() {
Convey(`When StreamIndex is out of order.`, func() {
ts.add(0, 2, 1, 3)
So(Archive(m), ShouldBeNil)
So(&indexB, ic.shouldContainIndexFor, desc, &logB, 0, 2, 3)
So(indexParams(indexB.Bytes()), ShouldResembleProto, &logpb.LogIndex{
Desc: desc,
LastPrefixIndex: 6,
LastStreamIndex: 3,
LogEntryCount: 3,
})
})
Convey(`When PrefixIndex is out of order.`, func() {
ts.add(0, 1)
le := gen(2)
le.PrefixIndex = 1
ts.addEntries(le)
ts.add(3, 4)
So(Archive(m), ShouldBeNil)
So(&indexB, ic.shouldContainIndexFor, desc, &logB, 0, 1, 3, 4)
So(indexParams(indexB.Bytes()), ShouldResembleProto, &logpb.LogIndex{
Desc: desc,
LastPrefixIndex: 8,
LastStreamIndex: 4,
LogEntryCount: 4,
})
})
Convey(`When Sequence is out of order.`, func() {
ts.add(0, 1)
le := gen(2)
le.Sequence = 0
ts.addEntries(le)
ts.add(3, 4)
So(Archive(m), ShouldBeNil)
So(&indexB, ic.shouldContainIndexFor, desc, &logB, 0, 1, 3, 4)
So(indexParams(indexB.Bytes()), ShouldResembleProto, &logpb.LogIndex{
Desc: desc,
LastPrefixIndex: 8,
LastStreamIndex: 4,
LogEntryCount: 4,
})
})
})
Convey(`When TimeOffset is out of order, it is bumped to max.`, func() {
ts.add(0, 1)
le := gen(2)
le.TimeOffset = nil // 0
ts.addEntries(le)
ts.add(3, 4)
So(Archive(m), ShouldBeNil)
So(&indexB, ic.shouldContainIndexFor, desc, &logB, 0, 1, 2, 3, 4)
So(indexParams(indexB.Bytes()), ShouldResembleProto, &logpb.LogIndex{
Desc: desc,
LastPrefixIndex: 8,
LastStreamIndex: 4,
LogEntryCount: 5,
})
})
Convey(`Source errors will be returned`, func() {
Convey(`nil LogEntry`, func() {
ts.addLogEntry(nil)
So(Archive(m), ShouldErrLike, "nil LogEntry")
})
Convey(`Error returned`, func() {
ts.add(0, 1, 2, 3, 4, 5)
ts.err = errors.New("test error")
So(Archive(m), ShouldErrLike, "test error")
})
})
Convey(`Writer errors will be returned`, func() {
ts.add(0, 1, 2, 3, 4, 5)
Convey(`For log writer errors.`, func() {
m.LogWriter = &errWriter{m.LogWriter, errors.New("test error")}
So(errors.SingleError(Archive(m)), ShouldErrLike, "test error")
})
Convey(`For index writer errors.`, func() {
m.IndexWriter = &errWriter{m.IndexWriter, errors.New("test error")}
So(errors.SingleError(Archive(m)), ShouldErrLike, "test error")
})
Convey(`When all Writers fail.`, func() {
m.LogWriter = &errWriter{m.LogWriter, errors.New("test error")}
m.IndexWriter = &errWriter{m.IndexWriter, errors.New("test error")}
So(Archive(m), ShouldNotBeNil)
})
})
Convey(`When building sparse index`, func() {
ts.add(0, 1, 2, 3, 4, 5)
Convey(`Can build an index for every 3 StreamIndex.`, func() {
m.StreamIndexRange = 3
So(Archive(m), ShouldBeNil)
So(&indexB, ic.shouldContainIndexFor, desc, &logB, 0, 3, 5)
So(indexParams(indexB.Bytes()), ShouldResembleProto, &logpb.LogIndex{
Desc: desc,
LastPrefixIndex: 10,
LastStreamIndex: 5,
LogEntryCount: 6,
})
})
Convey(`Can build an index for every 3 PrefixIndex.`, func() {
m.PrefixIndexRange = 3
So(Archive(m), ShouldBeNil)
// Note that in our generated logs, PrefixIndex = 2*StreamIndex.
So(&indexB, ic.shouldContainIndexFor, desc, &logB, 0, 2, 4, 5)
So(indexParams(indexB.Bytes()), ShouldResembleProto, &logpb.LogIndex{
Desc: desc,
LastPrefixIndex: 10,
LastStreamIndex: 5,
LogEntryCount: 6,
})
})
Convey(`Can build an index for every 13 bytes.`, func() {
ic.fixedSize = 5
m.ByteRange = 13
m.sizeFunc = func(pb proto.Message) int {
// Stub all LogEntry to be 5 bytes.
return 5
}
So(Archive(m), ShouldBeNil)
So(&indexB, ic.shouldContainIndexFor, desc, &logB, 0, 2, 5)
So(indexParams(indexB.Bytes()), ShouldResembleProto, &logpb.LogIndex{
Desc: desc,
LastPrefixIndex: 10,
LastStreamIndex: 5,
LogEntryCount: 6,
})
})
})
})
}