// Copyright 2015 The LUCI Authors.
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
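// You may obtain a copy of the License at
//
//      http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and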
// limitations under the License.
package logs
import (
ds ""
logdog ""
ct ""
. ""
. ""
func shouldHaveLogPaths(actual interface{}, expected ...interface{}) string {
resp := actual.(*logdog.QueryResponse)
var paths []string
if len(resp.Streams) > 0 {
paths = make([]string, len(resp.Streams))
for i, s := range resp.Streams {
paths[i] = s.Path
var exp []string
for _, e := range expected {
switch t := e.(type) {
case []string:
exp = append(exp, t...)
case string:
exp = append(exp, t)
case types.StreamPath:
exp = append(exp, string(t))
panic(fmt.Errorf("unsupported expected type %T: %v", t, t))
return ShouldResemble(paths, exp)
// TestQuery exercises the Query RPC of the logs service through nested
// GoConvey scenarios: request validation, project/realm authorization,
// cursor-based pagination, timestamp constraints, purged-stream visibility,
// and filtering by stream type, content type and tags.
func TestQuery(t *testing.T) {
Convey(`With a testing configuration, a Query request`, t, func() {
// env supplies the test clock and the service ID used further down.
c, env := ct.Install(true)
// Wrap the context so that individual datastore features can be forced to
// fail through fb.BreakFeatures.
c, fb := featureBreaker.FilterRDS(c, nil)
svr := New()
// Unwrap the decorated service to reach the concrete *server, whose
// resultLimit is adjusted by the pagination scenarios.
svrBase := svr.(*logdog.DecoratedLogs).Service.(*server)
const project = "proj-foo"
// Stock query request, will be modified by each test.
req := logdog.QueryRequest{
Project: project,
Tags: map[string]string{},
// Install a set of stock log streams to query against.
// prefixToStreamPaths holds only non-purged streams per prefix, while
// prefixToAllStreamPaths holds every stream per prefix.
prefixToStreamPaths := map[string][]string{}
prefixToAllStreamPaths := map[string][]string{}
// streams indexes each installed test stream by its full path.
streams := map[string]*ct.TestStream{}
// NOTE(review): the list of stream paths iterated here is not visible in
// this view of the file.
for i, v := range []types.StreamPath{
} {
tls := ct.MakeStream(c, project, "", v)
// The content type and the "prefix"/"name" tags mirror the stream's own
// prefix and name, giving the filter scenarios predictable values.
tls.Desc.ContentType = tls.Stream.Prefix
tls.Desc.Tags = map[string]string{
"prefix": tls.Stream.Prefix,
"name": tls.Stream.Name,
// Set an empty tag for each name segment.
for _, p := range types.StreamName(tls.Stream.Name).Segments() {
tls.Desc.Tags[p] = ""
now := env.Clock.Now().UTC()
prefix := tls.Stream.Prefix
psegs := types.StreamName(tls.Stream.Name).Segments()
// Streams under the "meta" prefix get their state and stream type from
// the segments of their name.
if prefix == "meta" {
for _, p := range psegs {
switch p {
case "purged":
tls.Stream.Purged = true
case "archived":
tls.State.ArchiveStreamURL = ""
tls.State.ArchivedTime = now
So(tls.State.ArchivalState().Archived(), ShouldBeTrue)
fallthrough // Archived streams are also terminated.
case "terminated":
tls.State.TerminalIndex = 1337
tls.State.TerminatedTime = now
So(tls.State.Terminated(), ShouldBeTrue)
case "datagram":
tls.Desc.StreamType = logpb.StreamType_DATAGRAM
case "binary":
tls.Desc.StreamType = logpb.StreamType_BINARY
// Failing to store a fixture stream aborts the whole test via panic.
if err := tls.Put(c); err != nil {
panic(fmt.Errorf("failed to put log stream %d: %v", i, err))
streams[string(v)] = tls
// Purged streams are recorded only in the "all" map.
if !tls.Stream.Purged {
prefixToStreamPaths[prefix] = append(
prefixToStreamPaths[prefix], string(v))
prefixToAllStreamPaths[prefix] = append(
prefixToAllStreamPaths[prefix], string(v))
// Invert streamPaths since we will return results in descending Created
// order.
// invert reverses s in place by swapping mirrored elements.
invert := func(s []string) {
for i := 0; i < len(s)/2; i++ {
eidx := len(s) - i - 1
s[i], s[eidx] = s[eidx], s[i]
// NOTE(review): the bodies of the two loops below are not visible in this
// view; presumably each one applies invert to paths - confirm.
for _, paths := range prefixToStreamPaths {
for _, paths := range prefixToAllStreamPaths {
// The stock request has no Path set, which the service rejects.
Convey(`An empty query will return an error.`, func() {
_, err := svr.Query(c, &req)
So(err, ShouldBeRPCInvalidArgument, "invalid query `path`")
Convey(`Handles non-existent project.`, func() {
req.Project = "does-not-exist"
req.Path = "testing/+/**"
// NOTE(review): the identity setup that distinguishes the Anon and User
// cases is not visible in this view.
Convey(`Anon`, func() {
_, err := svr.Query(c, &req)
So(err, ShouldBeRPCUnauthenticated)
Convey(`User`, func() {
_, err := svr.Query(c, &req)
So(err, ShouldBeRPCPermissionDenied)
Convey(`Authorization rules`, func() {
// "proj-exclusive" has restricted legacy ACLs, see coordinatorTest.
// This constant shadows the outer project for the rest of this scenario.
const project = "proj-exclusive"
req.Project = "proj-exclusive"
req.Path = "exclusive/+/**"
// Identities that make the call in the table below.
const (
User = ""
Anon = identity.AnonymousIdentity
Legacy = "" // present in @legacy realm ACL
// Short realm names a test stream can be placed in.
const (
NoRealm = ""
AllowedRealm = "allowed"
ForbiddenRealm = "forbidden"
// Readable names for the allowLegacy column of the table below.
const (
AllowLegacy = true
ForbidLegacy = false
// realm joins the scenario's project with a short realm name.
realm := func(short string) string {
return realms.Join(project, short)
// Table of authorization cases; each row is run as its own sub-scenario.
cases := []struct {
enforce bool // is enforce_realms_in ON or OFF
ident identity.Identity // who's making the call
realm string // the realm to put the stream in
allowLegacy bool // mock legacy ACLs to allow access
code codes.Code // the expected gRPC code
// Enforcement is on, legacy ACLs are ignored.
{true, User, NoRealm, AllowLegacy, codes.PermissionDenied},
{true, User, NoRealm, ForbidLegacy, codes.PermissionDenied},
{true, Legacy, NoRealm, AllowLegacy, codes.OK},
{true, Legacy, NoRealm, ForbidLegacy, codes.OK},
{true, User, AllowedRealm, AllowLegacy, codes.OK},
{true, User, AllowedRealm, ForbidLegacy, codes.OK},
{true, User, ForbiddenRealm, AllowLegacy, codes.PermissionDenied},
{true, User, ForbiddenRealm, ForbidLegacy, codes.PermissionDenied},
// Enforcement is off, have a fallback on legacy ACLs.
{false, User, NoRealm, AllowLegacy, codes.OK},
{false, User, NoRealm, ForbidLegacy, codes.PermissionDenied},
{false, Legacy, NoRealm, AllowLegacy, codes.OK},
{false, Legacy, NoRealm, ForbidLegacy, codes.OK},
{false, User, AllowedRealm, AllowLegacy, codes.OK},
{false, User, AllowedRealm, ForbidLegacy, codes.OK},
{false, User, ForbiddenRealm, AllowLegacy, codes.OK},
{false, User, ForbiddenRealm, ForbidLegacy, codes.PermissionDenied},
// Permission denied for anon => Unauthenticated.
{true, Anon, ForbiddenRealm, AllowLegacy, codes.Unauthenticated},
{false, Anon, NoRealm, ForbidLegacy, codes.Unauthenticated},
for i, test := range cases {
Convey(fmt.Sprintf("Case #%d", i), func() {
// Build the single stream for this case in the realm under test.
tls := ct.MakeStream(c, project, test.realm, "exclusive/+/foo")
// The caller always has the list permission in AllowedRealm, and the
// Legacy identity always has it in the legacy realm.
mocks := []authtest.MockedDatum{
authtest.MockPermission(test.ident, realm(AllowedRealm), coordinator.PermLogsList),
authtest.MockPermission(Legacy, realm(realms.LegacyRealm), coordinator.PermLogsList),
// With enforcement on, add realm data listing this service in
// EnforceInService.
if test.enforce {
mocks = append(mocks, authtest.MockRealmData(
&protocol.RealmData{EnforceInService: []string{env.ServiceID}},
// Legacy ACL access is mocked as membership in the "auth" group.
if test.allowLegacy {
mocks = append(mocks, authtest.MockMembership(test.ident, "auth"))
// Shadow c with a context carrying the fake auth state for this case.
c := auth.WithState(c, &authtest.FakeState{
Identity: test.ident,
FakeDB: authtest.NewFakeDB(mocks...),
resp, err := svr.Query(c, &req)
So(status.Code(err), ShouldEqual, test.code)
// The response is only inspected when the call succeeded.
if err == nil {
So(resp.Project, ShouldEqual, project)
So(resp.Realm, ShouldEqual, test.realm)
// NOTE(review): the step that makes the caller an admin is not visible in
// this view.
Convey(`An empty query will include purged streams if admin.`, func() {
req.Path = "meta/+/**"
resp, err := svr.Query(c, &req)
So(err, ShouldBeRPCOK)
So(resp, shouldHaveLogPaths, prefixToAllStreamPaths["meta"])
Convey(`A query with an invalid path will return BadRequest error.`, func() {
req.Path = "***"
_, err := svr.Query(c, &req)
So(err, ShouldBeRPCInvalidArgument, "invalid query `path`")
Convey(`A query with an invalid Next cursor will return BadRequest error.`, func() {
req.Next = "invalid"
req.Path = "testing/+/**"
// Force cursor decoding to fail regardless of the cursor value.
fb.BreakFeatures(errors.New("testing error"), "DecodeCursor")
_, err := svr.Query(c, &req)
So(err, ShouldBeRPCInvalidArgument, "invalid `next` value")
Convey(`A datastore query error will return InternalServer error.`, func() {
req.Path = "testing/+/**"
// Force the datastore "Run" feature to fail.
fb.BreakFeatures(errors.New("testing error"), "Run")
_, err := svr.Query(c, &req)
So(err, ShouldBeRPCInternal)
Convey(`When querying for "testing/+/baz"`, func() {
req.Path = "testing/+/baz"
// Fixture stream used as the expected value in the state assertions below.
tls := streams["testing/+/baz"]
// Without req.State, only the path is populated on each result.
Convey(`State is not returned.`, func() {
resp, err := svr.Query(c, &req)
So(err, ShouldBeRPCOK)
So(resp, shouldHaveLogPaths, "testing/+/baz")
So(resp.Streams, ShouldHaveLength, 1)
So(resp.Streams[0].State, ShouldBeNil)
So(resp.Streams[0].Desc, ShouldBeNil)
So(resp.Streams[0].DescProto, ShouldBeNil)
Convey(`When requesting state`, func() {
req.State = true
Convey(`When not requesting protobufs, returns a descriptor structure.`, func() {
resp, err := svr.Query(c, &req)
So(err, ShouldBeRPCOK)
So(resp, shouldHaveLogPaths, "testing/+/baz")
So(resp.Streams, ShouldHaveLength, 1)
So(resp.Streams[0].State, ShouldResemble, buildLogStreamState(tls.Stream, tls.State))
So(resp.Streams[0].Desc, ShouldResembleProto, tls.Desc)
So(resp.Streams[0].DescProto, ShouldBeNil)
Convey(`When not requesting protobufs, and with a corrupt descriptor, returns InternalServer error.`, func() {
tls.Stream.Descriptor = []byte{0x00} // Invalid protobuf, zero tag.
// NOTE(review): the error-handling body of this if statement is not visible
// in this view.
if err := tls.Put(c); err != nil {
_, err := svr.Query(c, &req)
So(err, ShouldBeRPCInternal)
Convey(`When requesting protobufs, returns the raw protobuf descriptor.`, func() {
req.Proto = true
resp, err := svr.Query(c, &req)
So(err, ShouldBeNil)
So(resp, shouldHaveLogPaths, "testing/+/baz")
So(resp.Streams, ShouldHaveLength, 1)
So(resp.Streams[0].State, ShouldResemble, buildLogStreamState(tls.Stream, tls.State))
So(resp.Streams[0].Desc, ShouldResembleProto, tls.Desc)
Convey(`With a query limit of 3`, func() {
svrBase.resultLimit = 3
Convey(`Can iteratively query to retrieve all stream paths.`, func() {
var seen []string
req.Path = "testing/+/**"
// Work on a copy so the shared fixture slice is never aliased.
streamPaths := append([]string(nil), prefixToStreamPaths["testing"]...)
next := ""
// Follow the Next cursor page by page, collecting every returned path.
for {
req.Next = next
resp, err := svr.Query(c, &req)
So(err, ShouldBeRPCOK)
// Note: this loop variable shadows the outer svr (the service) within the
// loop body.
for _, svr := range resp.Streams {
seen = append(seen, svr.Path)
next = resp.Next
// An empty cursor marks the final page.
if next == "" {
So(seen, ShouldResemble, streamPaths)
Convey(`When querying against timestamp constraints`, func() {
// Upper bound: the current test-clock time.
req.Older = timestamppb.New(env.Clock.Now())
Convey(`Querying for entries created at or after 2 seconds ago (latest 2 entries).`, func() {
req.Path = "testing/+/**"
// Lower bound: 2s plus 1ms before now; the extra millisecond presumably
// keeps an entry created exactly 2 seconds ago inside the range - confirm.
req.Newer = timestamppb.New(env.Clock.Now().Add(-2*time.Second - time.Millisecond))
resp, err := svr.Query(c, &req)
So(err, ShouldBeRPCOK)
So(resp, shouldHaveLogPaths, "testing/+/baz", "testing/+/foo/bar/baz")
Convey(`With a query limit of 2`, func() {
svrBase.resultLimit = 2
Convey(`A query request will return the newest 2 entries and have a Next cursor for the next 2.`, func() {
req.Path = "testing/+/**"
resp, err := svr.Query(c, &req)
So(err, ShouldBeRPCOK)
So(resp, shouldHaveLogPaths, "testing/+/baz", "testing/+/foo/bar/baz")
So(resp.Next, ShouldNotEqual, "")
// Iterate.
req.Next = resp.Next
resp, err = svr.Query(c, &req)
So(err, ShouldBeRPCOK)
So(resp, shouldHaveLogPaths, "testing/+/foo/bar", "testing/+/foo")
Convey(`A datastore query error will return InternalServer error.`, func() {
req.Path = "testing/+/**"
// Force the datastore "Run" feature to fail.
fb.BreakFeatures(errors.New("testing error"), "Run")
_, err := svr.Query(c, &req)
So(err, ShouldBeRPCInternal)
Convey(`When querying for meta streams`, func() {
req.Path = "meta/+/**"
// A caller without admin rights may not ask for purged streams.
Convey(`When purged=yes, returns BadRequest error.`, func() {
req.Purged = logdog.QueryRequest_YES
_, err := svr.Query(c, &req)
So(err, ShouldBeRPCInvalidArgument, "non-admin user cannot request purged log streams")
// NOTE(review): the step that grants administrator rights is not visible in
// this view.
Convey(`When the user is an administrator`, func() {
Convey(`When purged=yes, returns [terminated/archived/purged, purged]`, func() {
req.Purged = logdog.QueryRequest_YES
resp, err := svr.Query(c, &req)
So(err, ShouldBeRPCOK)
// NOTE(review): the expected path arguments of the next assertion are not
// visible in this view.
So(resp, shouldHaveLogPaths,
Convey(`When purged=no, returns [binary, datagram, archived, terminated]`, func() {
req.Purged = logdog.QueryRequest_NO
resp, err := svr.Query(c, &req)
So(err, ShouldBeRPCOK)
// NOTE(review): the expected path arguments of the next assertion are not
// visible in this view.
So(resp, shouldHaveLogPaths,
Convey(`When querying for text streams, returns [archived, terminated]`, func() {
req.StreamType = &logdog.QueryRequest_StreamTypeFilter{Value: logpb.StreamType_TEXT}
resp, err := svr.Query(c, &req)
So(err, ShouldBeRPCOK)
So(resp, shouldHaveLogPaths, "meta/+/archived/foo", "meta/+/terminated/foo")
Convey(`When querying for binary streams, returns [binary]`, func() {
req.StreamType = &logdog.QueryRequest_StreamTypeFilter{Value: logpb.StreamType_BINARY}
resp, err := svr.Query(c, &req)
So(err, ShouldBeRPCOK)
So(resp, shouldHaveLogPaths, "meta/+/binary/foo")
Convey(`When querying for datagram streams, returns [datagram]`, func() {
req.StreamType = &logdog.QueryRequest_StreamTypeFilter{Value: logpb.StreamType_DATAGRAM}
resp, err := svr.Query(c, &req)
So(err, ShouldBeRPCOK)
So(resp, shouldHaveLogPaths, "meta/+/datagram/foo")
Convey(`When querying for an invalid stream type, returns a BadRequest error.`, func() {
// -1 is used as a stream type value the service is expected to reject.
req.StreamType = &logdog.QueryRequest_StreamTypeFilter{Value: -1}
_, err := svr.Query(c, &req)
So(err, ShouldBeRPCInvalidArgument)
// Fixture streams use their prefix as content type, so "other" selects the
// streams under the "other" prefix.
Convey(`When querying for content type "other", returns [other/+/baz, other/+/foo/bar].`, func() {
req.Path = "other/+/**"
req.ContentType = "other"
resp, err := svr.Query(c, &req)
So(err, ShouldBeRPCOK)
So(resp, shouldHaveLogPaths, "other/+/baz", "other/+/foo/bar")
Convey(`When querying for tags`, func() {
// Every fixture stream carries an empty-valued tag per name segment, so
// the tag "baz" selects streams with a "baz" segment in their name.
Convey(`Tag "baz", returns [testing/+/baz, testing/+/foo/bar/baz]`, func() {
req.Path = "testing/+/**"
req.Tags["baz"] = ""
resp, err := svr.Query(c, &req)
So(err, ShouldBeRPCOK)
So(resp, shouldHaveLogPaths, "testing/+/baz", "testing/+/foo/bar/baz")
Convey(`Tags "prefix=testing", "baz", returns [testing/+/baz, testing/+/foo/bar/baz]`, func() {
req.Path = "testing/+/**"
req.Tags["baz"] = ""
req.Tags["prefix"] = "testing"
resp, err := svr.Query(c, &req)
So(err, ShouldBeRPCOK)
So(resp, shouldHaveLogPaths, "testing/+/baz", "testing/+/foo/bar/baz")
Convey(`When an invalid tag is specified, returns BadRequest error`, func() {
req.Path = "testing/+/**"
req.Tags["+++not a valid tag+++"] = ""
_, err := svr.Query(c, &req)
So(err, ShouldBeRPCInvalidArgument, "invalid tag constraint")