blob: a801a5f70de10c5fb76e6cdf5d9bc4fa7b091b2e [file] [log] [blame]
// Copyright 2015 The LUCI Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package coordinator
import (
"context"
"fmt"
"testing"
"time"
"google.golang.org/protobuf/types/known/timestamppb"
"go.chromium.org/luci/common/clock/testclock"
"go.chromium.org/luci/gae/impl/memory"
ds "go.chromium.org/luci/gae/service/datastore"
"go.chromium.org/luci/logdog/api/logpb"
"go.chromium.org/luci/logdog/common/types"
. "github.com/smartystreets/goconvey/convey"
. "go.chromium.org/luci/common/testing/assertions"
)
// shouldHaveLogPaths is a GoConvey-style assertion that compares the paths of
// a slice of log streams, in order, against the expected paths.
//
// actual must be a []*LogStream. An error value is also accepted, in which
// case the assertion fails with that error's message. Any other type fails the
// assertion. Every expected value must be a string.
//
// It returns the empty string on success and a failure description otherwise.
func shouldHaveLogPaths(actual interface{}, expected ...interface{}) string {
	var streams []*LogStream
	switch t := actual.(type) {
	case error:
		return t.Error()
	case []*LogStream:
		streams = t
	default:
		return fmt.Sprintf("unknown 'actual' type: %T", t)
	}

	// Both slices are allocated non-nil: a nil slice is not deeply equal to an
	// empty one, so "no streams" would otherwise never match "no expected paths".
	names := make([]string, 0, len(streams))
	for _, ls := range streams {
		names = append(names, string(ls.Path()))
	}

	exp := make([]string, 0, len(expected))
	for i, v := range expected {
		s, ok := v.(string)
		if !ok {
			// Fail the assertion instead of panicking, matching how an unknown
			// 'actual' type is reported above.
			return fmt.Sprintf("non-string stream name specified at index %d: %T", i, v)
		}
		exp = append(exp, s)
	}
	return ShouldResemble(names, exp)
}
// updateLogStreamID recomputes ls.ID from the value returned by ls.Path().
// The tests in this file call it after changing a stream's Prefix or Name.
func updateLogStreamID(ls *LogStream) {
	path := ls.Path()
	ls.ID = LogStreamID(path)
}
// TestLogStream covers LogStream descriptor loading, validation, datastore
// round-tripping, and LogStreamQuery path/time filtering, all against the
// in-memory datastore with a fixed test clock.
func TestLogStream(t *testing.T) {
	t.Parallel()

	Convey(`A testing log stream`, t, func() {
		// Fixed clock plus an in-memory datastore with AutoIndex and Consistent
		// enabled on its testable interface.
		c, tc := testclock.UseTime(context.Background(), testclock.TestTimeLocal)
		c = memory.Use(c)
		ds.GetTestable(c).AutoIndex(true)
		ds.GetTestable(c).Consistent(true)

		now := ds.RoundTime(tc.Now().UTC())

		// Baseline stream and matching descriptor shared by the cases below.
		ls := LogStream{
			ID:      LogStreamID("testing/+/log/stream"),
			Prefix:  "testing",
			Name:    "log/stream",
			Created: now.UTC(),
		}

		desc := logpb.LogStreamDescriptor{
			Prefix:      "testing",
			Name:        "log/stream",
			StreamType:  logpb.StreamType_TEXT,
			ContentType: string(types.ContentTypeText),
			Timestamp:   timestamppb.New(now),
			Tags: map[string]string{
				"foo":  "bar",
				"baz":  "qux",
				"quux": "",
			},
		}

		Convey(`Can populate the LogStream with descriptor state.`, func() {
			So(ls.LoadDescriptor(&desc), ShouldBeNil)
			So(ls.Validate(), ShouldBeNil)

			Convey(`Will not validate`, func() {
				Convey(`Without a valid Prefix`, func() {
					ls.Prefix = "!!!not a valid prefix!!!"
					updateLogStreamID(&ls)
					So(ls.Validate(), ShouldErrLike, "invalid prefix")
				})
				Convey(`Without a valid Name`, func() {
					ls.Name = "!!!not a valid name!!!"
					updateLogStreamID(&ls)
					So(ls.Validate(), ShouldErrLike, "invalid name")
				})
				Convey(`Without a valid created time`, func() {
					ls.Created = time.Time{}
					So(ls.Validate(), ShouldErrLike, "created time is not set")
				})
				Convey(`With an invalid descriptor protobuf`, func() {
					ls.Descriptor = []byte{0x00} // Invalid tag, "0".
					So(ls.Validate(), ShouldErrLike, "could not unmarshal descriptor")
				})
			})

			Convey(`Can write the LogStream to the Datastore.`, func() {
				So(ds.Put(c, &ls), ShouldBeNil)

				Convey(`Can read the LogStream back from the Datastore.`, func() {
					ls2 := LogStream{ID: ls.ID}
					So(ds.Get(c, &ls2), ShouldBeNil)
					So(ls2, ShouldResemble, ls)
				})
			})
		})

		Convey(`Will refuse to populate from an invalid descriptor.`, func() {
			desc.StreamType = -1
			So(ls.LoadDescriptor(&desc), ShouldErrLike, "invalid descriptor")
		})

		Convey(`Writing multiple LogStream entries`, func() {
			// times maps each stream name to its creation timestamp, for use as
			// query time bounds below.
			times := map[string]*timestamppb.Timestamp{}
			streamPaths := []string{
				"testing/+/foo/bar",
				"testing/+/foo/bar/baz",
				"testing/+/baz/qux",
				"testing/+/cat/dog",
				"testing/+/cat/bird/dog",
				"testing/+/bird/plane",
			}

			// Store one stream per path, each created one second after the
			// previous one.
			for i, path := range streamPaths {
				_, splitName := types.StreamPath(path).Split()
				name := string(splitName)

				lsCopy := ls
				lsCopy.Name = name
				lsCopy.Created = ds.RoundTime(now.Add(time.Duration(i) * time.Second))
				updateLogStreamID(&lsCopy)

				descCopy := desc
				descCopy.Name = name
				So(lsCopy.LoadDescriptor(&descCopy), ShouldBeNil)
				So(ds.Put(c, &lsCopy), ShouldBeNil)

				times[name] = timestamppb.New(lsCopy.Created)
			}

			// getAll runs q and collects every stream passed to the callback, in
			// callback order.
			getAll := func(q *LogStreamQuery) []*LogStream {
				var streams []*LogStream
				err := q.Run(c, func(stream *LogStream, _ ds.CursorCB) error {
					streams = append(streams, stream)
					return nil
				})
				So(err, ShouldBeNil)
				return streams
			}

			Convey(`When querying LogStream`, func() {
				Convey(`LogStream path queries`, func() {
					Convey(`A query for "foo/bar" should return "foo/bar".`, func() {
						q, err := NewLogStreamQuery("testing/+/foo/bar")
						So(err, ShouldBeNil)
						So(getAll(q), shouldHaveLogPaths, "testing/+/foo/bar")
					})

					Convey(`A query for "foo/bar/*" should return "foo/bar/baz".`, func() {
						q, err := NewLogStreamQuery("testing/+/foo/bar/*")
						So(err, ShouldBeNil)
						So(getAll(q), shouldHaveLogPaths, "testing/+/foo/bar/baz")
					})

					Convey(`A query for "foo/**" should return "foo/bar/baz" and "foo/bar".`, func() {
						q, err := NewLogStreamQuery("testing/+/foo/**")
						So(err, ShouldBeNil)
						So(getAll(q), shouldHaveLogPaths,
							"testing/+/foo/bar/baz", "testing/+/foo/bar")
					})

					// The title lists results in the order the assertion checks them.
					Convey(`A query for "cat/**/dog" should return "cat/bird/dog" and "cat/dog".`, func() {
						q, err := NewLogStreamQuery("testing/+/cat/**/dog")
						So(err, ShouldBeNil)
						So(getAll(q), shouldHaveLogPaths,
							"testing/+/cat/bird/dog",
							"testing/+/cat/dog",
						)
					})
				})

				Convey(`A timestamp inequality query for all records returns them in reverse order.`, func() {
					// Reverse "streamPaths".
					si := make([]interface{}, len(streamPaths))
					for i := 0; i < len(streamPaths); i++ {
						si[i] = interface{}(streamPaths[len(streamPaths)-i-1])
					}

					q, err := NewLogStreamQuery("testing")
					So(err, ShouldBeNil)
					So(getAll(q), shouldHaveLogPaths, si...)
				})

				Convey(`A query for "cat/**/dog" should return "cat/bird/dog" and "cat/dog".`, func() {
					q, err := NewLogStreamQuery("testing/+/cat/**/dog")
					So(err, ShouldBeNil)
					So(getAll(q), shouldHaveLogPaths,
						"testing/+/cat/bird/dog", "testing/+/cat/dog")
				})

				Convey(`A query for streams older than "baz/qux" returns {"foo/bar/baz", and "foo/bar"}.`, func() {
					q, err := NewLogStreamQuery("testing")
					So(err, ShouldBeNil)
					q.TimeBound(nil, times["baz/qux"])
					So(getAll(q), shouldHaveLogPaths,
						"testing/+/foo/bar/baz", "testing/+/foo/bar")
				})

				Convey(`A query for streams newer than "cat/dog" returns {"bird/plane", "cat/bird/dog"}.`, func() {
					q, err := NewLogStreamQuery("testing")
					So(err, ShouldBeNil)
					q.TimeBound(times["cat/dog"], nil)
					So(getAll(q), shouldHaveLogPaths,
						"testing/+/bird/plane",
						"testing/+/cat/bird/dog",
					)
				})

				Convey(`A query for "cat/**/dog" newer than "cat/dog" returns {"cat/bird/dog"}.`, func() {
					q, err := NewLogStreamQuery("testing/+/cat/**/dog")
					So(err, ShouldBeNil)
					q.TimeBound(times["cat/dog"], nil)
					So(getAll(q), shouldHaveLogPaths, "testing/+/cat/bird/dog")
				})
			})
		})
	})
}
// TestNewLogStreamGlob checks which stored stream paths a LogStreamQuery built
// by NewLogStreamQuery matches, covering exact paths, invalid input, and `*` /
// `**` globs at the start, middle and end of the name.
func TestNewLogStreamGlob(t *testing.T) {
	t.Parallel()

	// newStream builds a LogStream for path with the given creation time, loads
	// a text descriptor into it, and sets its ID from its path.
	newStream := func(path string, created time.Time) *LogStream {
		prefix, name := types.StreamPath(path).Split()
		stream := &LogStream{Created: created}
		descriptor := &logpb.LogStreamDescriptor{
			Prefix:      string(prefix),
			Name:        string(name),
			ContentType: string(types.ContentTypeText),
			Timestamp:   timestamppb.New(created),
		}
		So(stream.LoadDescriptor(descriptor), ShouldBeNil)
		updateLogStreamID(stream)
		return stream
	}

	// storeAndQuery writes one stream per path into a fresh in-memory datastore,
	// each created one second after the previous one, then runs q and returns
	// the streams passed to the callback, in callback order.
	storeAndQuery := func(q *LogStreamQuery, logPaths ...string) []*LogStream {
		ctx := memory.Use(context.Background())
		ds.GetTestable(ctx).AutoIndex(true)
		ds.GetTestable(ctx).Consistent(true)

		stored := make([]*LogStream, 0, len(logPaths))
		created := testclock.TestTimeUTC
		for _, path := range logPaths {
			stored = append(stored, newStream(path, created))
			created = created.Add(time.Second)
		}
		So(ds.Put(ctx, stored), ShouldBeNil)

		var matched []*LogStream
		err := q.Run(ctx, func(stream *LogStream, _ ds.CursorCB) error {
			matched = append(matched, stream)
			return nil
		})
		So(err, ShouldBeNil)
		return matched
	}

	Convey(`A testing query`, t, func() {
		Convey(`Will construct a non-globbing query as Prefix/Name equality.`, func() {
			q, err := NewLogStreamQuery("foo/bar/+/baz/qux")
			So(err, ShouldBeNil)

			got := storeAndQuery(q,
				"foo/bar/+/baz/qux",
				"foo/bar/+/baz/qux/other",
				"foo/bar/+/baz",
				"other/prefix/+/baz/qux",
			)
			So(got, shouldHaveLogPaths,
				"foo/bar/+/baz/qux",
			)
		})

		Convey(`Will refuse to query an invalid Prefix/Name.`, func() {
			_, prefixErr := NewLogStreamQuery("////+/baz/qux")
			So(prefixErr, ShouldErrLike, "prefix invalid")

			_, nameErr := NewLogStreamQuery("foo/bar/+//////")
			So(nameErr, ShouldErrLike, "name invalid")
		})

		Convey(`Returns error on empty prefix.`, func() {
			_, err := NewLogStreamQuery("/+/baz/qux")
			So(err, ShouldErrLike, "prefix invalid: empty")
		})

		Convey(`Treats empty name like **.`, func() {
			q, err := NewLogStreamQuery("baz/qux")
			So(err, ShouldBeNil)

			got := storeAndQuery(q,
				"baz/qux/+/narp",
				"baz/qux/+/blats/stuff",
				"baz/qux/+/nerds/cool_pants",
				"other/prefix/+/baz/qux",
			)
			So(got, shouldHaveLogPaths,
				"baz/qux/+/nerds/cool_pants",
				"baz/qux/+/blats/stuff",
				"baz/qux/+/narp",
			)
		})

		Convey(`Properly escapes non-* metachars.`, func() {
			q, err := NewLogStreamQuery("baz/qux/+/hi..../**")
			So(err, ShouldBeNil)

			got := storeAndQuery(q,
				"baz/qux/+/hi....",
				"baz/qux/+/hi..../some_stuff",
				"baz/qux/+/hiblat",
				"baz/qux/+/hiblat/some_stuff",
			)
			So(got, shouldHaveLogPaths,
				"baz/qux/+/hi..../some_stuff",
				"baz/qux/+/hi....",
			)
		})

		Convey(`Will glob out single Name components.`, func() {
			q, err := NewLogStreamQuery("pfx/+/foo/*/*/bar/*/baz/qux/*")
			So(err, ShouldBeNil)

			got := storeAndQuery(q,
				"pfx/+/foo/a/b/bar/c/baz/qux/d",
				"pfx/+/foo/bar/baz/qux",
				"pfx/+/foo/a/extra/b/bar/c/baz/qux/d",
			)
			So(got, shouldHaveLogPaths,
				"pfx/+/foo/a/b/bar/c/baz/qux/d",
			)
		})

		Convey(`Will handle end-of-query globbing.`, func() {
			q, err := NewLogStreamQuery("pfx/+/foo/*/bar/**")
			So(err, ShouldBeNil)

			got := storeAndQuery(q,
				"pfx/+/foo/a/bar",
				"pfx/+/foo/a/bar/stuff",
				"pfx/+/foo/a/bar/even/more/stuff",
				"pfx/+/foo/a/extra/bar",
				"pfx/+/nope/a/bar",
			)
			So(got, shouldHaveLogPaths,
				"pfx/+/foo/a/bar/even/more/stuff",
				"pfx/+/foo/a/bar/stuff",
				"pfx/+/foo/a/bar",
			)
		})

		Convey(`Will handle beginning-of-query globbing.`, func() {
			q, err := NewLogStreamQuery("pfx/+/**/foo/*/bar")
			So(err, ShouldBeNil)

			got := storeAndQuery(q,
				"pfx/+/extra/foo/a/bar",
				"pfx/+/even/more/extra/foo/a/bar",
				"pfx/+/foo/a/bar",
				"pfx/+/foo/a/bar/extra",
				"pfx/+/foo/bar",
			)
			So(got, shouldHaveLogPaths,
				"pfx/+/foo/a/bar",
				"pfx/+/even/more/extra/foo/a/bar",
				"pfx/+/extra/foo/a/bar",
			)
		})

		Convey(`Can handle middle-of-query globbing.`, func() {
			q, err := NewLogStreamQuery("pfx/+/*/foo/*/**/bar/*/baz/*")
			So(err, ShouldBeNil)

			got := storeAndQuery(q,
				"pfx/+/a/foo/b/stuff/bar/c/baz/d",
				"pfx/+/a/foo/b/lots/of/stuff/bar/c/baz/d",
				"pfx/+/a/foo/b/bar/c/baz/d",
				"pfx/+/foo/a/bar/b/baz/c",
			)
			So(got, shouldHaveLogPaths,
				"pfx/+/a/foo/b/bar/c/baz/d",
				"pfx/+/a/foo/b/lots/of/stuff/bar/c/baz/d",
				"pfx/+/a/foo/b/stuff/bar/c/baz/d",
			)
		})
	})
}