blob: 0d5f9eeac84aef3203f4c3f949ebfe0ee46e97ce [file] [log] [blame]
// Copyright 2015 The LUCI Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package fetcher
import (
"context"
"errors"
"io"
"sync"
"testing"
"time"
"github.com/golang/protobuf/proto"
. "github.com/smartystreets/goconvey/convey"
"go.chromium.org/luci/common/clock"
"go.chromium.org/luci/common/clock/testclock"
"go.chromium.org/luci/common/testing/assertions"
"go.chromium.org/luci/logdog/api/logpb"
"go.chromium.org/luci/logdog/common/types"
)
type testSourceCommand struct {
logEntries []*logpb.LogEntry
err error
panic bool
tidx types.MessageIndex
tidxSet bool
}
// addLogs adds a text log entry for each named indices. The text entry contains
// a single line, "#x", where "x" is the log index.
func (cmd *testSourceCommand) logs(indices ...int64) *testSourceCommand {
for _, idx := range indices {
cmd.logEntries = append(cmd.logEntries, &logpb.LogEntry{
StreamIndex: uint64(idx),
})
}
return cmd
}
func (cmd *testSourceCommand) terminalIndex(v types.MessageIndex) *testSourceCommand {
cmd.tidx, cmd.tidxSet = v, true
return cmd
}
func (cmd *testSourceCommand) error(err error, panic bool) *testSourceCommand {
cmd.err, cmd.panic = err, panic
return cmd
}
type testSource struct {
sync.Mutex
logs map[types.MessageIndex]*logpb.LogEntry
err error
panic bool
terminal types.MessageIndex
history []int
}
func newTestSource() *testSource {
return &testSource{
terminal: -1,
logs: make(map[types.MessageIndex]*logpb.LogEntry),
}
}
func (ts *testSource) Descriptor() *logpb.LogStreamDescriptor { return nil }
func (ts *testSource) LogEntries(c context.Context, req *LogRequest) ([]*logpb.LogEntry, types.MessageIndex, error) {
ts.Lock()
defer ts.Unlock()
if ts.err != nil {
if ts.panic {
panic(ts.err)
}
return nil, 0, ts.err
}
// We have at least our next log. Build our return value.
maxCount := req.Count
if maxCount <= 0 {
maxCount = len(ts.logs)
}
maxBytes := req.Bytes
var logs []*logpb.LogEntry
bytes := int64(0)
index := req.Index
for {
if len(logs) >= maxCount {
break
}
log, ok := ts.logs[index]
if !ok {
break
}
size := int64(5) // We've rigged all logs to have size 5.
if len(logs) > 0 && maxBytes > 0 && (bytes+size) > maxBytes {
break
}
logs = append(logs, log)
index++
bytes += size
}
ts.history = append(ts.history, len(logs))
return logs, ts.terminal, nil
}
func (ts *testSource) send(cmd *testSourceCommand) {
ts.Lock()
defer ts.Unlock()
if cmd.err != nil {
ts.err, ts.panic = cmd.err, cmd.panic
}
if cmd.tidxSet {
ts.terminal = cmd.tidx
}
for _, le := range cmd.logEntries {
ts.logs[types.MessageIndex(le.StreamIndex)] = le
}
}
func (ts *testSource) getHistory() []int {
ts.Lock()
defer ts.Unlock()
h := make([]int, len(ts.history))
copy(h, ts.history)
return h
}
func loadLogs(f *Fetcher, count int) (result []types.MessageIndex, err error) {
for {
// Specific limit, hit that limit.
if count > 0 && len(result) >= count {
return
}
var le *logpb.LogEntry
le, err = f.NextLogEntry()
if le != nil {
result = append(result, types.MessageIndex(le.StreamIndex))
}
if err != nil {
return
}
}
}
func TestFetcher(t *testing.T) {
t.Parallel()
Convey(`A testing log Source`, t, func() {
c, tc := testclock.UseTime(context.Background(), testclock.TestTimeLocal)
c, cancelFunc := context.WithCancel(c)
ts := newTestSource()
o := Options{
Source: ts,
// All message byte sizes will be 5.
sizeFunc: func(proto.Message) int {
return 5
},
}
newFetcher := func() *Fetcher { return New(c, o) }
reap := func(f *Fetcher) {
cancelFunc()
loadLogs(f, 0)
}
Convey(`Uses defaults values when not overridden, and stops when cancelled.`, func() {
f := newFetcher()
defer reap(f)
So(f.o.BufferCount, ShouldEqual, 0)
So(f.o.BufferBytes, ShouldEqual, DefaultBufferBytes)
So(f.o.PrefetchFactor, ShouldEqual, 1)
})
Convey(`With a Count limit of 3.`, func() {
o.BufferCount = 3
Convey(`Will pull 6 sequential log records.`, func() {
var cmd testSourceCommand
ts.send(cmd.logs(0, 1, 2, 3, 4, 5).terminalIndex(5))
f := newFetcher()
defer reap(f)
logs, err := loadLogs(f, 0)
So(err, ShouldEqual, io.EOF)
So(logs, ShouldResemble, []types.MessageIndex{0, 1, 2, 3, 4, 5})
})
Convey(`Will immediately bail out if RequireCompleteStream is set`, func() {
var cmd testSourceCommand
ts.send(cmd.logs(0, 1, 2, 3, 4, 5).terminalIndex(-1))
o.RequireCompleteStream = true
f := newFetcher()
defer reap(f)
logs, err := loadLogs(f, 0)
So(err, ShouldEqual, ErrIncompleteStream)
So(logs, ShouldBeNil)
})
Convey(`Can read two log records and be cancelled.`, func() {
var cmd testSourceCommand
ts.send(cmd.logs(0, 1, 2, 3, 4, 5))
f := newFetcher()
defer reap(f)
logs, err := loadLogs(f, 2)
So(err, ShouldBeNil)
So(logs, ShouldResemble, []types.MessageIndex{0, 1})
cancelFunc()
_, err = loadLogs(f, 0)
So(err, ShouldEqual, context.Canceled)
})
Convey(`Will delay for more log records if none are available.`, func() {
delayed := false
tc.SetTimerCallback(func(d time.Duration, t clock.Timer) {
// Add the remaining logs.
delayed = true
var cmd testSourceCommand
ts.send(cmd.logs(1, 2).terminalIndex(2))
tc.Add(d)
})
var cmd testSourceCommand
ts.send(cmd.logs(0))
f := newFetcher()
defer reap(f)
logs, err := loadLogs(f, 1)
So(err, ShouldBeNil)
So(logs, ShouldResemble, []types.MessageIndex{0})
logs, err = loadLogs(f, 0)
So(err, ShouldEqual, io.EOF)
So(logs, ShouldResemble, []types.MessageIndex{1, 2})
So(delayed, ShouldBeTrue)
})
Convey(`When an error is countered getting the terminal index, returns the error.`, func() {
var cmd testSourceCommand
ts.send(cmd.error(errors.New("test error"), false))
f := newFetcher()
defer reap(f)
_, err := loadLogs(f, 0)
So(err, assertions.ShouldErrLike, "test error")
})
Convey(`When an error is countered fetching logs, returns the error.`, func() {
var cmd testSourceCommand
ts.send(cmd.logs(0, 1, 2).error(errors.New("test error"), false))
f := newFetcher()
defer reap(f)
_, err := loadLogs(f, 0)
So(err, assertions.ShouldErrLike, "test error")
})
Convey(`If the source panics, it is caught and returned as an error.`, func() {
var cmd testSourceCommand
ts.send(cmd.error(errors.New("test error"), true))
f := newFetcher()
defer reap(f)
_, err := loadLogs(f, 0)
So(err, assertions.ShouldErrLike, "panic during fetch")
})
})
Convey(`With a byte limit of 15`, func() {
o.BufferBytes = 15
o.PrefetchFactor = 2
var cmd testSourceCommand
ts.send(cmd.logs(0, 1, 2, 3, 4, 5, 6).terminalIndex(6))
f := newFetcher()
defer reap(f)
// First fetch should have asked for 30 bytes (2*15), so 6 logs. After
// first log was kicked, there is a deficit of one log.
logs, err := loadLogs(f, 0)
So(err, ShouldEqual, io.EOF)
So(logs, ShouldResemble, []types.MessageIndex{0, 1, 2, 3, 4, 5, 6})
So(ts.getHistory(), ShouldResemble, []int{6, 1})
})
Convey(`With an index of 1 and a maximum count of 1, fetches exactly 1 log.`, func() {
o.Index = 1
o.Count = 1
var cmd testSourceCommand
ts.send(cmd.logs(0, 1, 2, 3, 4, 5, 6).terminalIndex(6))
f := newFetcher()
defer reap(f)
// First fetch will ask for exactly one log.
logs, err := loadLogs(f, 0)
So(err, ShouldEqual, io.EOF)
So(logs, ShouldResemble, []types.MessageIndex{1})
So(ts.getHistory(), ShouldResemble, []int{1})
})
})
}