blob: a35454b97df59a7078c38641ff8a675f1d654c86 [file] [log] [blame]
// Copyright 2016 The LUCI Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package collector
import (
"bytes"
"context"
"fmt"
"sync/atomic"
"testing"
"go.chromium.org/luci/common/clock/testclock"
"go.chromium.org/luci/common/errors"
"go.chromium.org/luci/common/retry/transient"
"go.chromium.org/luci/logdog/api/logpb"
"go.chromium.org/luci/logdog/client/butlerproto"
"go.chromium.org/luci/logdog/common/storage/memory"
"go.chromium.org/luci/logdog/common/types"
cc "go.chromium.org/luci/logdog/server/collector/coordinator"
. "github.com/smartystreets/goconvey/convey"
. "go.chromium.org/luci/common/testing/assertions"
)
// TestCollector runs through a series of end-to-end Collector workflows and
// ensures that the Collector behaves appropriately.
func testCollectorImpl(t *testing.T, caching bool) {
Convey(fmt.Sprintf(`Using a test configuration with caching == %v`, caching), t, func() {
c, _ := testclock.UseTime(context.Background(), testclock.TestTimeLocal)
tcc := &testCoordinator{}
st := &testStorage{Storage: &memory.Storage{}}
coll := &Collector{
Coordinator: tcc,
Storage: st,
}
defer coll.Close()
bb := bundleBuilder{
Context: c,
}
if caching {
coll.Coordinator = cc.NewCache(coll.Coordinator, 0, 0)
}
Convey(`Can process multiple single full streams from a Butler bundle.`, func() {
bb.addFullStream("foo/+/bar", 128)
bb.addFullStream("foo/+/baz", 256)
So(coll.Process(c, bb.bundle()), ShouldBeNil)
So(tcc, shouldHaveRegisteredStream, "test-project", "foo/+/bar", 127)
So(st, shouldHaveStoredStream, "test-project", "foo/+/bar", indexRange{0, 127})
So(tcc, shouldHaveRegisteredStream, "test-project", "foo/+/baz", 255)
So(st, shouldHaveStoredStream, "test-project", "foo/+/baz", indexRange{0, 255})
})
Convey(`Will return a transient error if a transient error happened while registering.`, func() {
tcc.registerCallback = func(cc.LogStreamState) error { return errors.New("test error", transient.Tag) }
bb.addFullStream("foo/+/bar", 128)
err := coll.Process(c, bb.bundle())
So(err, ShouldNotBeNil)
So(transient.Tag.In(err), ShouldBeTrue)
})
Convey(`Will return an error if a non-transient error happened while registering.`, func() {
tcc.registerCallback = func(cc.LogStreamState) error { return errors.New("test error") }
bb.addFullStream("foo/+/bar", 128)
err := coll.Process(c, bb.bundle())
So(err, ShouldNotBeNil)
So(transient.Tag.In(err), ShouldBeFalse)
})
// This will happen when one registration request registers non-terminal,
// and a follow-on registration request registers with a terminal index. The
// latter registration request will idempotently succeed, but not accept the
// terminal index, so termination is still required.
Convey(`Will terminate a stream if a terminal registration returns a non-terminal response.`, func() {
terminateCalled := false
tcc.terminateCallback = func(cc.TerminateRequest) error {
terminateCalled = true
return nil
}
bb.addStreamEntries("foo/+/bar", -1, 0, 1)
So(coll.Process(c, bb.bundle()), ShouldBeNil)
bb.addStreamEntries("foo/+/bar", 3, 2, 3)
So(coll.Process(c, bb.bundle()), ShouldBeNil)
So(terminateCalled, ShouldBeTrue)
})
Convey(`Will return a transient error if a transient error happened while terminating.`, func() {
tcc.terminateCallback = func(cc.TerminateRequest) error { return errors.New("test error", transient.Tag) }
// Register independently from terminate so we don't bundle RPC.
bb.addStreamEntries("foo/+/bar", -1, 0, 1, 2, 3, 4)
So(coll.Process(c, bb.bundle()), ShouldBeNil)
// Add terminal index.
bb.addStreamEntries("foo/+/bar", 5, 5)
err := coll.Process(c, bb.bundle())
So(err, ShouldNotBeNil)
So(transient.Tag.In(err), ShouldBeTrue)
})
Convey(`Will return an error if a non-transient error happened while terminating.`, func() {
tcc.terminateCallback = func(cc.TerminateRequest) error { return errors.New("test error") }
// Register independently from terminate so we don't bundle RPC.
bb.addStreamEntries("foo/+/bar", -1, 0, 1, 2, 3, 4)
So(coll.Process(c, bb.bundle()), ShouldBeNil)
// Add terminal index.
bb.addStreamEntries("foo/+/bar", 5, 5)
err := coll.Process(c, bb.bundle())
So(err, ShouldNotBeNil)
So(transient.Tag.In(err), ShouldBeFalse)
})
Convey(`Will return a transient error if a transient error happened on storage.`, func() {
// Single transient error.
count := int32(0)
st.err = func() error {
if atomic.AddInt32(&count, 1) == 1 {
return errors.New("test error", transient.Tag)
}
return nil
}
bb.addFullStream("foo/+/bar", 128)
err := coll.Process(c, bb.bundle())
So(err, ShouldNotBeNil)
So(transient.Tag.In(err), ShouldBeTrue)
})
Convey(`Will drop invalid LogStreamDescriptor bundle entries and process the valid ones.`, func() {
be := bb.genBundleEntry("foo/+/trash", 1337, 4, 5, 6, 7, 8)
bb.addBundleEntry(be)
bb.addStreamEntries("foo/+/trash", 0, 1, 3) // Invalid: non-contiguous
bb.addFullStream("foo/+/bar", 32)
err := coll.Process(c, bb.bundle())
So(err, ShouldNotBeNil)
So(transient.Tag.In(err), ShouldBeFalse)
So(tcc, shouldHaveRegisteredStream, "test-project", "foo/+/bar", 32)
So(st, shouldHaveStoredStream, "test-project", "foo/+/bar", indexRange{0, 31})
So(tcc, shouldHaveRegisteredStream, "test-project", "foo/+/trash", 1337)
So(st, shouldHaveStoredStream, "test-project", "foo/+/trash", 4, 5, 6, 7, 8)
})
Convey(`Will drop streams with missing (invalid) secrets.`, func() {
b := bb.genBase()
b.Secret = nil
bb.addFullStream("foo/+/bar", 4)
err := coll.Process(c, bb.bundle())
So(err, ShouldErrLike, "invalid prefix secret")
So(transient.Tag.In(err), ShouldBeFalse)
})
Convey(`Will drop messages with mismatching secrets.`, func() {
bb.addStreamEntries("foo/+/bar", -1, 0, 1, 2)
So(coll.Process(c, bb.bundle()), ShouldBeNil)
// Push another bundle with a different secret.
b := bb.genBase()
b.Secret = bytes.Repeat([]byte{0xAA}, types.PrefixSecretLength)
be := bb.genBundleEntry("foo/+/bar", 4, 3, 4)
be.TerminalIndex = 1337
bb.addBundleEntry(be)
bb.addFullStream("foo/+/baz", 3)
So(coll.Process(c, bb.bundle()), ShouldBeNil)
So(tcc, shouldHaveRegisteredStream, "test-project", "foo/+/bar", -1)
So(st, shouldHaveStoredStream, "test-project", "foo/+/bar", indexRange{0, 2})
So(tcc, shouldHaveRegisteredStream, "test-project", "foo/+/baz", 2)
So(st, shouldHaveStoredStream, "test-project", "foo/+/baz", indexRange{0, 2})
})
Convey(`With an empty project name, will drop the stream.`, func() {
b := bb.genBase()
b.Project = ""
bb.addFullStream("foo/+/baz", 3)
err := coll.Process(c, bb.bundle())
So(err, ShouldErrLike, "invalid bundle project name")
So(transient.Tag.In(err), ShouldBeFalse)
})
Convey(`Will drop streams with invalid project names.`, func() {
b := bb.genBase()
b.Project = "!!!invalid name!!!"
So(types.ProjectName(b.Project).Validate(), ShouldNotBeNil)
err := coll.Process(c, bb.bundle())
So(err, ShouldErrLike, "invalid bundle project name")
So(transient.Tag.In(err), ShouldBeFalse)
})
Convey(`Will drop streams with empty bundle prefixes.`, func() {
b := bb.genBase()
b.Prefix = ""
err := coll.Process(c, bb.bundle())
So(err, ShouldErrLike, "invalid bundle prefix")
So(transient.Tag.In(err), ShouldBeFalse)
})
Convey(`Will drop streams with invalid bundle prefixes.`, func() {
b := bb.genBase()
b.Prefix = "!!!invalid prefix!!!"
So(types.StreamName(b.Prefix).Validate(), ShouldNotBeNil)
err := coll.Process(c, bb.bundle())
So(err, ShouldErrLike, "invalid bundle prefix")
So(transient.Tag.In(err), ShouldBeFalse)
})
Convey(`Will drop streams whose descriptor prefix doesn't match its bundle's prefix.`, func() {
bb.addStreamEntries("baz/+/bar", 3, 0, 1, 2, 3, 4)
err := coll.Process(c, bb.bundle())
So(err, ShouldErrLike, "mismatched bundle and entry prefixes")
So(transient.Tag.In(err), ShouldBeFalse)
})
Convey(`Will return no error if the data has a corrupt bundle header.`, func() {
So(coll.Process(c, []byte{0x00}), ShouldBeNil)
So(tcc, shouldNotHaveRegisteredStream, "test-project", "foo/+/bar")
})
Convey(`Will drop bundles with unknown ProtoVersion string.`, func() {
buf := bytes.Buffer{}
w := butlerproto.Writer{ProtoVersion: "!!!invalid!!!"}
w.Write(&buf, &logpb.ButlerLogBundle{})
So(coll.Process(c, buf.Bytes()), ShouldBeNil)
So(tcc, shouldNotHaveRegisteredStream, "test-project", "foo/+/bar")
})
Convey(`Will not ingest records if the stream is archived.`, func() {
tcc.register(cc.LogStreamState{
Project: "test-project",
Path: "foo/+/bar",
Secret: testSecret,
TerminalIndex: -1,
Archived: true,
})
bb.addStreamEntries("foo/+/bar", 3, 0, 1, 2, 3, 4)
So(coll.Process(c, bb.bundle()), ShouldBeNil)
So(tcc, shouldHaveRegisteredStream, "test-project", "foo/+/bar", -1)
So(st, shouldHaveStoredStream, "test-project", "foo/+/bar")
})
Convey(`Will not ingest records if the stream is purged.`, func() {
tcc.register(cc.LogStreamState{
Project: "test-project",
Path: "foo/+/bar",
Secret: testSecret,
TerminalIndex: -1,
Purged: true,
})
So(coll.Process(c, bb.bundle()), ShouldBeNil)
So(tcc, shouldHaveRegisteredStream, "test-project", "foo/+/bar", -1)
So(st, shouldHaveStoredStream, "test-project", "foo/+/bar")
})
Convey(`Will not ingest a bundle with no bundle entries.`, func() {
So(coll.Process(c, bb.bundle()), ShouldBeNil)
})
Convey(`Will not ingest a bundle whose log entries don't match their descriptor.`, func() {
be := bb.genBundleEntry("foo/+/bar", 4, 0, 1, 2, 3, 4)
// Add a binary log entry. This does NOT match the text descriptor, and
// should fail validation.
be.Logs = append(be.Logs, &logpb.LogEntry{
StreamIndex: 2,
Sequence: 2,
Content: &logpb.LogEntry_Binary{
&logpb.Binary{
Data: []byte{0xd0, 0x6f, 0x00, 0xd5},
},
},
})
bb.addBundleEntry(be)
So(coll.Process(c, bb.bundle()), ShouldErrLike, "invalid log entry")
So(tcc, shouldNotHaveRegisteredStream, "test-project", "foo/+/bar")
})
})
}
// TestCollector runs through a series of end-to-end Collector workflows and
// ensures that the Collector behaves appropriately.
func TestCollector(t *testing.T) {
t.Parallel()
testCollectorImpl(t, false)
}
// TestCollectorWithCaching runs through a series of end-to-end Collector
// workflows and ensures that the Collector behaves appropriately.
func TestCollectorWithCaching(t *testing.T) {
t.Parallel()
testCollectorImpl(t, true)
}