blob: abe7f36e8dfae68b3c8e2591743484f3f95f6114 [file] [log] [blame]
// Copyright 2015 The LUCI Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package coordinator
import (
"fmt"
"testing"
"time"
"github.com/luci/gae/impl/memory"
ds "github.com/luci/gae/service/datastore"
"github.com/luci/luci-go/common/clock/testclock"
"github.com/luci/luci-go/common/proto/google"
"github.com/luci/luci-go/logdog/api/logpb"
"github.com/luci/luci-go/logdog/common/types"
"golang.org/x/net/context"
. "github.com/luci/luci-go/common/testing/assertions"
. "github.com/smartystreets/goconvey/convey"
)
// shouldHaveLogStreams is a custom GoConvey assertion that checks that a set
// of log streams has exactly the expected names, in order.
//
// actual must be either an error, in which case its text is returned as the
// failure message, or a []*LogStream, whose Name fields are compared against
// expected. Any other type produces a failure message naming that type.
//
// Every value in expected must be a string stream name. Supplying anything
// else is a mistake in the calling test and panics.
//
// The return value is whatever ShouldResemble reports for the two name slices.
func shouldHaveLogStreams(actual interface{}, expected ...interface{}) string {
	var names []string
	switch t := actual.(type) {
	case error:
		return t.Error()

	case []*LogStream:
		// Always allocate, even for zero streams: the expected slice built
		// below is never nil, and a nil slice does not resemble an empty one.
		// Without this, asserting "no streams" could never succeed.
		names = make([]string, 0, len(t))
		for _, ls := range t {
			names = append(names, ls.Name)
		}

	default:
		return fmt.Sprintf("unknown 'actual' type: %T", t)
	}

	exp := make([]string, len(expected))
	for i, v := range expected {
		s, ok := v.(string)
		if !ok {
			panic("non-string stream name specified")
		}
		exp[i] = s
	}
	return ShouldResemble(names, exp)
}
// sps converts each supplied value with ds.MkProperty and returns the results,
// in argument order, as a ds.PropertySlice.
func sps(values ...interface{}) ds.PropertySlice {
	props := make(ds.PropertySlice, 0, len(values))
	for _, value := range values {
		props = append(props, ds.MkProperty(value))
	}
	return props
}
// updateLogStreamID recomputes ls.ID from the stream's current Path. The tests
// call it after changing Prefix or Name so that the ID matches the new path.
func updateLogStreamID(ls *LogStream) {
	path := ls.Path()
	ls.ID = LogStreamID(path)
}
// TestLogStream exercises the LogStream datastore model against an in-memory
// datastore and a test clock. It covers tag encoding and decoding, populating
// a stream from a descriptor, validation failures, a Put/Get round trip, and
// queries over several stored streams filtered by path and by creation time.
func TestLogStream(t *testing.T) {
	t.Parallel()

	Convey(`A testing log stream`, t, func() {
		c, tc := testclock.UseTime(context.Background(), testclock.TestTimeLocal)
		c = memory.Use(c)

		// Turn on the test datastore's AutoIndex and Consistent modes
		// (presumably so the queries below need no hand-written indexes and
		// observe earlier writes immediately).
		ds.GetTestable(c).AutoIndex(true)
		ds.GetTestable(c).Consistent(true)

		now := ds.RoundTime(tc.Now().UTC())

		// Baseline stream and matching descriptor shared by every case below.
		ls := LogStream{
			ID:          LogStreamID("testing/+/log/stream"),
			Prefix:      "testing",
			Name:        "log/stream",
			Created:     now.UTC(),
			ContentType: string(types.ContentTypeText),
		}

		desc := logpb.LogStreamDescriptor{
			Prefix:      "testing",
			Name:        "log/stream",
			StreamType:  logpb.StreamType_TEXT,
			ContentType: "application/text",
			Timestamp:   google.NewTimestamp(now),
			Tags: map[string]string{
				"foo":  "bar",
				"baz":  "qux",
				"quux": "",
			},
		}

		Convey(`Will skip invalid tags when loading.`, func() {
			pmap, err := ls.Save(false)
			So(err, ShouldBeNil)

			// Replace the saved tags with a single encoded key that is not a
			// valid tag; loading must drop it rather than fail.
			pmap["_Tags"] = sps(encodeKey("!!!invalid key!!!"))

			So(ls.Load(pmap), ShouldBeNil)
			So(ls.Tags, ShouldResemble, TagMap(nil))
		})

		Convey(`With invalid tags will fail to encode.`, func() {
			ls.Tags = TagMap{
				"!!!invalid key!!!": "value",
			}

			ls.SetDSValidate(false)
			_, err := ls.Save(true)
			So(err, ShouldErrLike, "failed to encode tags")
		})

		Convey(`Can populate the LogStream with descriptor state.`, func() {
			So(ls.LoadDescriptor(&desc), ShouldBeNil)
			So(ls.Validate(), ShouldBeNil)

			// Each case below breaks exactly one field of the valid stream
			// and expects the matching validation error.
			Convey(`Will not validate`, func() {
				Convey(`Without a valid Prefix`, func() {
					ls.Prefix = "!!!not a valid prefix!!!"
					updateLogStreamID(&ls)

					So(ls.Validate(), ShouldErrLike, "invalid prefix")
				})

				Convey(`Without a valid Name`, func() {
					ls.Name = "!!!not a valid name!!!"
					updateLogStreamID(&ls)

					So(ls.Validate(), ShouldErrLike, "invalid name")
				})

				Convey(`Without a valid content type`, func() {
					ls.ContentType = ""
					So(ls.Validate(), ShouldErrLike, "empty content type")
				})

				Convey(`Without a valid created time`, func() {
					ls.Created = time.Time{}
					So(ls.Validate(), ShouldErrLike, "created time is not set")
				})

				Convey(`Without a valid stream type`, func() {
					ls.StreamType = -1
					So(ls.Validate(), ShouldErrLike, "unsupported stream type")
				})

				Convey(`With an invalid tag: empty key`, func() {
					ls.Tags[""] = "empty"
					So(ls.Validate(), ShouldErrLike, "invalid tag")
				})

				Convey(`With an invalid tag: bad key`, func() {
					ls.Tags["!"] = "bad-value"
					So(ls.Validate(), ShouldErrLike, "invalid tag")
				})

				Convey(`With an invalid descriptor protobuf`, func() {
					ls.Descriptor = []byte{0x00} // Invalid tag, "0".
					So(ls.Validate(), ShouldErrLike, "could not unmarshal descriptor")
				})
			})

			Convey(`Can write the LogStream to the Datastore.`, func() {
				So(ds.Put(c, &ls), ShouldBeNil)

				Convey(`Can read the LogStream back from the Datastore.`, func() {
					ls2 := LogStream{ID: ls.ID}

					So(ds.Get(c, &ls2), ShouldBeNil)
					So(ls2, ShouldResemble, ls)
				})
			})
		})

		Convey(`Will refuse to populate from an invalid descriptor.`, func() {
			desc.StreamType = -1
			So(ls.LoadDescriptor(&desc), ShouldErrLike, "invalid descriptor")
		})

		Convey(`Writing multiple LogStream entries`, func() {
			// times records each stream's Created timestamp by name so the
			// time-filter cases can use them as query boundaries.
			times := map[string]time.Time{}
			streamNames := []string{
				"foo/bar",
				"foo/bar/baz",
				"baz/qux",
				"cat/dog",
				"cat/bird/dog",
				"bird/plane",
			}

			// Store one stream per name, each created one second after the
			// previous one, so creation order matches streamNames order.
			for i, name := range streamNames {
				lsCopy := ls
				lsCopy.Name = name
				lsCopy.Created = ds.RoundTime(now.Add(time.Duration(i) * time.Second))
				updateLogStreamID(&lsCopy)

				descCopy := desc
				descCopy.Name = name

				// Report a fixture failure as an assertion failure, like the
				// rest of this test, instead of panicking.
				So(lsCopy.LoadDescriptor(&descCopy), ShouldBeNil)
				So(ds.Put(c, &lsCopy), ShouldBeNil)

				times[name] = lsCopy.Created
			}

			Convey(`When querying LogStream by -Created`, func() {
				q := ds.NewQuery("LogStream").Order("-Created")

				Convey(`LogStream path queries`, func() {
					Convey(`A query for "foo/bar" should return "foo/bar".`, func() {
						q, err := AddLogStreamPathFilter(q, "**/+/foo/bar")
						So(err, ShouldBeNil)

						var streams []*LogStream
						So(ds.GetAll(c, q, &streams), ShouldBeNil)
						So(streams, shouldHaveLogStreams, "foo/bar")
					})

					Convey(`A query for "foo/bar/*" should return "foo/bar/baz".`, func() {
						q, err := AddLogStreamPathFilter(q, "**/+/foo/bar/*")
						So(err, ShouldBeNil)

						var streams []*LogStream
						So(ds.GetAll(c, q, &streams), ShouldBeNil)
						So(streams, shouldHaveLogStreams, "foo/bar/baz")
					})

					Convey(`A query for "foo/**" should return "foo/bar/baz" and "foo/bar".`, func() {
						q, err := AddLogStreamPathFilter(q, "**/+/foo/**")
						So(err, ShouldBeNil)

						var streams []*LogStream
						So(ds.GetAll(c, q, &streams), ShouldBeNil)
						So(streams, shouldHaveLogStreams, "foo/bar/baz", "foo/bar")
					})

					Convey(`A query for "cat/**/dog" should return "cat/dog" and "cat/bird/dog".`, func() {
						q, err := AddLogStreamPathFilter(q, "**/+/cat/**/dog")
						So(err, ShouldBeNil)

						var streams []*LogStream
						So(ds.GetAll(c, q, &streams), ShouldBeNil)
						So(streams, shouldHaveLogStreams, "cat/bird/dog", "cat/dog")
					})
				})

				Convey(`A timestamp inequality query for all records returns them in reverse order.`, func() {
					// Reverse "streamNames".
					si := make([]interface{}, len(streamNames))
					for i := 0; i < len(streamNames); i++ {
						si[i] = interface{}(streamNames[len(streamNames)-i-1])
					}

					var streams []*LogStream
					So(ds.GetAll(c, q, &streams), ShouldBeNil)
					So(streams, shouldHaveLogStreams, si...)
				})

				Convey(`A query for "cat/**/dog" should return "cat/bird/dog" and "cat/dog".`, func() {
					q, err := AddLogStreamPathFilter(q, "**/+/cat/**/dog")
					So(err, ShouldBeNil)

					var streams []*LogStream
					So(ds.GetAll(c, q, &streams), ShouldBeNil)
					So(streams, shouldHaveLogStreams, "cat/bird/dog", "cat/dog")
				})

				Convey(`A query for streams older than "baz/qux" returns {"foo/bar/baz", and "foo/bar"}.`, func() {
					q = AddOlderFilter(q, times["baz/qux"])

					var streams []*LogStream
					So(ds.GetAll(c, q, &streams), ShouldBeNil)
					So(streams, shouldHaveLogStreams, "foo/bar/baz", "foo/bar")
				})

				Convey(`A query for streams newer than "cat/dog" returns {"bird/plane", "cat/bird/dog"}.`, func() {
					q = AddNewerFilter(q, times["cat/dog"])

					var streams []*LogStream
					So(ds.GetAll(c, q, &streams), ShouldBeNil)
					So(streams, shouldHaveLogStreams, "bird/plane", "cat/bird/dog")
				})

				Convey(`A query for "cat/**/dog" newer than "cat/dog" returns {"cat/bird/dog"}.`, func() {
					q = AddNewerFilter(q, times["cat/dog"])

					q, err := AddLogStreamPathFilter(q, "**/+/cat/**/dog")
					So(err, ShouldBeNil)

					var streams []*LogStream
					So(ds.GetAll(c, q, &streams), ShouldBeNil)
					So(streams, shouldHaveLogStreams, "cat/bird/dog")
				})
			})
		})
	})
}
// TestLogStreamPathFilter checks which equality filters
// AddLogStreamPathFilter adds to a query for various stream path expressions,
// and which expressions it rejects with an error.
func TestLogStreamPathFilter(t *testing.T) {
	t.Parallel()

	Convey(`A testing query`, t, func() {
		base := ds.NewQuery("LogStream")

		Convey(`Will construct a non-globbing query as Prefix/Name equality.`, func() {
			filtered, err := AddLogStreamPathFilter(base, "foo/bar/+/baz/qux")
			So(err, ShouldBeNil)

			final, err := filtered.Finalize()
			So(err, ShouldBeNil)

			got := final.EqFilters()
			want := map[string]ds.PropertySlice{
				"Prefix": sps("foo/bar"),
				"Name":   sps("baz/qux"),
			}
			So(got, ShouldResemble, want)
		})

		Convey(`Will refuse to query an invalid Prefix/Name.`, func() {
			_, prefixErr := AddLogStreamPathFilter(base, "////+/baz/qux")
			So(prefixErr, ShouldErrLike, "invalid Prefix component")

			_, nameErr := AddLogStreamPathFilter(base, "foo/bar/+//////")
			So(nameErr, ShouldErrLike, "invalid Name component")
		})

		Convey(`Will not impose any filters on an empty Prefix.`, func() {
			filtered, err := AddLogStreamPathFilter(base, "/+/baz/qux")
			So(err, ShouldBeNil)

			final, err := filtered.Finalize()
			So(err, ShouldBeNil)

			got := final.EqFilters()
			want := map[string]ds.PropertySlice{
				"Name": sps("baz/qux"),
			}
			So(got, ShouldResemble, want)
		})

		Convey(`Will not impose any filters on an empty Name.`, func() {
			filtered, err := AddLogStreamPathFilter(base, "baz/qux")
			So(err, ShouldBeNil)

			final, err := filtered.Finalize()
			So(err, ShouldBeNil)

			got := final.EqFilters()
			want := map[string]ds.PropertySlice{
				"Prefix": sps("baz/qux"),
			}
			So(got, ShouldResemble, want)
		})

		Convey(`Will glob out single Prefix components.`, func() {
			filtered, err := AddLogStreamPathFilter(base, "foo/*/*/bar/*/baz/qux/*")
			So(err, ShouldBeNil)

			final, err := filtered.Finalize()
			So(err, ShouldBeNil)

			got := final.EqFilters()
			want := map[string]ds.PropertySlice{
				"_C": sps("PC:8", "PF:0:foo", "PF:3:bar", "PF:5:baz", "PF:6:qux"),
			}
			So(got, ShouldResemble, want)
		})

		Convey(`Will handle end-of-query globbing.`, func() {
			filtered, err := AddLogStreamPathFilter(base, "foo/*/bar/**")
			So(err, ShouldBeNil)

			final, err := filtered.Finalize()
			So(err, ShouldBeNil)

			got := final.EqFilters()
			want := map[string]ds.PropertySlice{
				"_C": sps("PF:0:foo", "PF:2:bar"),
			}
			So(got, ShouldResemble, want)
		})

		Convey(`Will handle beginning-of-query globbing.`, func() {
			filtered, err := AddLogStreamPathFilter(base, "**/foo/*/bar")
			So(err, ShouldBeNil)

			final, err := filtered.Finalize()
			So(err, ShouldBeNil)

			got := final.EqFilters()
			want := map[string]ds.PropertySlice{
				"_C": sps("PR:0:bar", "PR:2:foo"),
			}
			So(got, ShouldResemble, want)
		})

		Convey(`Can handle middle-of-query globbing.`, func() {
			filtered, err := AddLogStreamPathFilter(base, "*/foo/*/**/bar/*/baz/*")
			So(err, ShouldBeNil)

			final, err := filtered.Finalize()
			So(err, ShouldBeNil)

			got := final.EqFilters()
			want := map[string]ds.PropertySlice{
				"_C": sps("PF:1:foo", "PR:1:baz", "PR:3:bar"),
			}
			So(got, ShouldResemble, want)
		})

		Convey(`Will error if more than one greedy glob is present.`, func() {
			_, globErr := AddLogStreamPathFilter(base, "*/foo/**/bar/**")
			So(globErr, ShouldErrLike, "cannot have more than one greedy glob")
		})
	})
}