blob: a35454b97df59a7078c38641ff8a675f1d654c86 [file] [log] [blame]
// Copyright 2016 The LUCI Authors.
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// See the License for the specific language governing permissions and
// limitations under the License.
package collector
import (
cc ""
. ""
. ""
// TestCollector runs through a series of end-to-end Collector workflows and
// ensures that the Collector behaves appropriately.
func testCollectorImpl(t *testing.T, caching bool) {
Convey(fmt.Sprintf(`Using a test configuration with caching == %v`, caching), t, func() {
c, _ := testclock.UseTime(context.Background(), testclock.TestTimeLocal)
tcc := &testCoordinator{}
st := &testStorage{Storage: &memory.Storage{}}
coll := &Collector{
Coordinator: tcc,
Storage: st,
defer coll.Close()
bb := bundleBuilder{
Context: c,
if caching {
coll.Coordinator = cc.NewCache(coll.Coordinator, 0, 0)
Convey(`Can process multiple single full streams from a Butler bundle.`, func() {
bb.addFullStream("foo/+/bar", 128)
bb.addFullStream("foo/+/baz", 256)
So(coll.Process(c, bb.bundle()), ShouldBeNil)
So(tcc, shouldHaveRegisteredStream, "test-project", "foo/+/bar", 127)
So(st, shouldHaveStoredStream, "test-project", "foo/+/bar", indexRange{0, 127})
So(tcc, shouldHaveRegisteredStream, "test-project", "foo/+/baz", 255)
So(st, shouldHaveStoredStream, "test-project", "foo/+/baz", indexRange{0, 255})
Convey(`Will return a transient error if a transient error happened while registering.`, func() {
tcc.registerCallback = func(cc.LogStreamState) error { return errors.New("test error", transient.Tag) }
bb.addFullStream("foo/+/bar", 128)
err := coll.Process(c, bb.bundle())
So(err, ShouldNotBeNil)
So(transient.Tag.In(err), ShouldBeTrue)
Convey(`Will return an error if a non-transient error happened while registering.`, func() {
tcc.registerCallback = func(cc.LogStreamState) error { return errors.New("test error") }
bb.addFullStream("foo/+/bar", 128)
err := coll.Process(c, bb.bundle())
So(err, ShouldNotBeNil)
So(transient.Tag.In(err), ShouldBeFalse)
// This will happen when one registration request registers non-terminal,
// and a follow-on registration request registers with a terminal index. The
// latter registration request will idempotently succeed, but not accept the
// terminal index, so termination is still required.
Convey(`Will terminate a stream if a terminal registration returns a non-terminal response.`, func() {
terminateCalled := false
tcc.terminateCallback = func(cc.TerminateRequest) error {
terminateCalled = true
return nil
bb.addStreamEntries("foo/+/bar", -1, 0, 1)
So(coll.Process(c, bb.bundle()), ShouldBeNil)
bb.addStreamEntries("foo/+/bar", 3, 2, 3)
So(coll.Process(c, bb.bundle()), ShouldBeNil)
So(terminateCalled, ShouldBeTrue)
Convey(`Will return a transient error if a transient error happened while terminating.`, func() {
tcc.terminateCallback = func(cc.TerminateRequest) error { return errors.New("test error", transient.Tag) }
// Register independently from terminate so we don't bundle RPC.
bb.addStreamEntries("foo/+/bar", -1, 0, 1, 2, 3, 4)
So(coll.Process(c, bb.bundle()), ShouldBeNil)
// Add terminal index.
bb.addStreamEntries("foo/+/bar", 5, 5)
err := coll.Process(c, bb.bundle())
So(err, ShouldNotBeNil)
So(transient.Tag.In(err), ShouldBeTrue)
Convey(`Will return an error if a non-transient error happened while terminating.`, func() {
tcc.terminateCallback = func(cc.TerminateRequest) error { return errors.New("test error") }
// Register independently from terminate so we don't bundle RPC.
bb.addStreamEntries("foo/+/bar", -1, 0, 1, 2, 3, 4)
So(coll.Process(c, bb.bundle()), ShouldBeNil)
// Add terminal index.
bb.addStreamEntries("foo/+/bar", 5, 5)
err := coll.Process(c, bb.bundle())
So(err, ShouldNotBeNil)
So(transient.Tag.In(err), ShouldBeFalse)
Convey(`Will return a transient error if a transient error happened on storage.`, func() {
// Single transient error.
count := int32(0)
st.err = func() error {
if atomic.AddInt32(&count, 1) == 1 {
return errors.New("test error", transient.Tag)
return nil
bb.addFullStream("foo/+/bar", 128)
err := coll.Process(c, bb.bundle())
So(err, ShouldNotBeNil)
So(transient.Tag.In(err), ShouldBeTrue)
Convey(`Will drop invalid LogStreamDescriptor bundle entries and process the valid ones.`, func() {
be := bb.genBundleEntry("foo/+/trash", 1337, 4, 5, 6, 7, 8)
bb.addStreamEntries("foo/+/trash", 0, 1, 3) // Invalid: non-contiguous
bb.addFullStream("foo/+/bar", 32)
err := coll.Process(c, bb.bundle())
So(err, ShouldNotBeNil)
So(transient.Tag.In(err), ShouldBeFalse)
So(tcc, shouldHaveRegisteredStream, "test-project", "foo/+/bar", 32)
So(st, shouldHaveStoredStream, "test-project", "foo/+/bar", indexRange{0, 31})
So(tcc, shouldHaveRegisteredStream, "test-project", "foo/+/trash", 1337)
So(st, shouldHaveStoredStream, "test-project", "foo/+/trash", 4, 5, 6, 7, 8)
Convey(`Will drop streams with missing (invalid) secrets.`, func() {
b := bb.genBase()
b.Secret = nil
bb.addFullStream("foo/+/bar", 4)
err := coll.Process(c, bb.bundle())
So(err, ShouldErrLike, "invalid prefix secret")
So(transient.Tag.In(err), ShouldBeFalse)
Convey(`Will drop messages with mismatching secrets.`, func() {
bb.addStreamEntries("foo/+/bar", -1, 0, 1, 2)
So(coll.Process(c, bb.bundle()), ShouldBeNil)
// Push another bundle with a different secret.
b := bb.genBase()
b.Secret = bytes.Repeat([]byte{0xAA}, types.PrefixSecretLength)
be := bb.genBundleEntry("foo/+/bar", 4, 3, 4)
be.TerminalIndex = 1337
bb.addFullStream("foo/+/baz", 3)
So(coll.Process(c, bb.bundle()), ShouldBeNil)
So(tcc, shouldHaveRegisteredStream, "test-project", "foo/+/bar", -1)
So(st, shouldHaveStoredStream, "test-project", "foo/+/bar", indexRange{0, 2})
So(tcc, shouldHaveRegisteredStream, "test-project", "foo/+/baz", 2)
So(st, shouldHaveStoredStream, "test-project", "foo/+/baz", indexRange{0, 2})
Convey(`With an empty project name, will drop the stream.`, func() {
b := bb.genBase()
b.Project = ""
bb.addFullStream("foo/+/baz", 3)
err := coll.Process(c, bb.bundle())
So(err, ShouldErrLike, "invalid bundle project name")
So(transient.Tag.In(err), ShouldBeFalse)
Convey(`Will drop streams with invalid project names.`, func() {
b := bb.genBase()
b.Project = "!!!invalid name!!!"
So(types.ProjectName(b.Project).Validate(), ShouldNotBeNil)
err := coll.Process(c, bb.bundle())
So(err, ShouldErrLike, "invalid bundle project name")
So(transient.Tag.In(err), ShouldBeFalse)
Convey(`Will drop streams with empty bundle prefixes.`, func() {
b := bb.genBase()
b.Prefix = ""
err := coll.Process(c, bb.bundle())
So(err, ShouldErrLike, "invalid bundle prefix")
So(transient.Tag.In(err), ShouldBeFalse)
Convey(`Will drop streams with invalid bundle prefixes.`, func() {
b := bb.genBase()
b.Prefix = "!!!invalid prefix!!!"
So(types.StreamName(b.Prefix).Validate(), ShouldNotBeNil)
err := coll.Process(c, bb.bundle())
So(err, ShouldErrLike, "invalid bundle prefix")
So(transient.Tag.In(err), ShouldBeFalse)
Convey(`Will drop streams whose descriptor prefix doesn't match its bundle's prefix.`, func() {
bb.addStreamEntries("baz/+/bar", 3, 0, 1, 2, 3, 4)
err := coll.Process(c, bb.bundle())
So(err, ShouldErrLike, "mismatched bundle and entry prefixes")
So(transient.Tag.In(err), ShouldBeFalse)
Convey(`Will return no error if the data has a corrupt bundle header.`, func() {
So(coll.Process(c, []byte{0x00}), ShouldBeNil)
So(tcc, shouldNotHaveRegisteredStream, "test-project", "foo/+/bar")
Convey(`Will drop bundles with unknown ProtoVersion string.`, func() {
buf := bytes.Buffer{}
w := butlerproto.Writer{ProtoVersion: "!!!invalid!!!"}
w.Write(&buf, &logpb.ButlerLogBundle{})
So(coll.Process(c, buf.Bytes()), ShouldBeNil)
So(tcc, shouldNotHaveRegisteredStream, "test-project", "foo/+/bar")
Convey(`Will not ingest records if the stream is archived.`, func() {
Project: "test-project",
Path: "foo/+/bar",
Secret: testSecret,
TerminalIndex: -1,
Archived: true,
bb.addStreamEntries("foo/+/bar", 3, 0, 1, 2, 3, 4)
So(coll.Process(c, bb.bundle()), ShouldBeNil)
So(tcc, shouldHaveRegisteredStream, "test-project", "foo/+/bar", -1)
So(st, shouldHaveStoredStream, "test-project", "foo/+/bar")
Convey(`Will not ingest records if the stream is purged.`, func() {
Project: "test-project",
Path: "foo/+/bar",
Secret: testSecret,
TerminalIndex: -1,
Purged: true,
So(coll.Process(c, bb.bundle()), ShouldBeNil)
So(tcc, shouldHaveRegisteredStream, "test-project", "foo/+/bar", -1)
So(st, shouldHaveStoredStream, "test-project", "foo/+/bar")
Convey(`Will not ingest a bundle with no bundle entries.`, func() {
So(coll.Process(c, bb.bundle()), ShouldBeNil)
Convey(`Will not ingest a bundle whose log entries don't match their descriptor.`, func() {
be := bb.genBundleEntry("foo/+/bar", 4, 0, 1, 2, 3, 4)
// Add a binary log entry. This does NOT match the text descriptor, and
// should fail validation.
be.Logs = append(be.Logs, &logpb.LogEntry{
StreamIndex: 2,
Sequence: 2,
Content: &logpb.LogEntry_Binary{
Data: []byte{0xd0, 0x6f, 0x00, 0xd5},
So(coll.Process(c, bb.bundle()), ShouldErrLike, "invalid log entry")
So(tcc, shouldNotHaveRegisteredStream, "test-project", "foo/+/bar")
// TestCollector runs through a series of end-to-end Collector workflows and
// ensures that the Collector behaves appropriately.
func TestCollector(t *testing.T) {
testCollectorImpl(t, false)
// TestCollectorWithCaching runs through a series of end-to-end Collector
// workflows and ensures that the Collector behaves appropriately.
func TestCollectorWithCaching(t *testing.T) {
testCollectorImpl(t, true)