// Copyright 2020 The LUCI Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package search
import (
"container/heap"
"context"
"fmt"
"regexp"
"strconv"
"strings"
"time"
"google.golang.org/grpc/codes"
"google.golang.org/protobuf/types/known/timestamppb"
"go.chromium.org/luci/auth/identity"
"go.chromium.org/luci/common/data/stringset"
"go.chromium.org/luci/common/data/strpair"
"go.chromium.org/luci/common/errors"
"go.chromium.org/luci/common/logging"
"go.chromium.org/luci/common/sync/parallel"
"go.chromium.org/luci/gae/service/datastore"
"go.chromium.org/luci/grpc/appstatus"
bb "go.chromium.org/luci/buildbucket"
"go.chromium.org/luci/buildbucket/appengine/internal/buildid"
"go.chromium.org/luci/buildbucket/appengine/internal/perm"
"go.chromium.org/luci/buildbucket/appengine/model"
"go.chromium.org/luci/buildbucket/bbperms"
pb "go.chromium.org/luci/buildbucket/proto"
"go.chromium.org/luci/buildbucket/protoutil"
)
const (
defaultPageSize = 100
maxPageSize = 1000
)
var (
	// PageTokenRegex matches valid page tokens, e.g. "id>123".
	PageTokenRegex = regexp.MustCompile(`^id>\d+$`)
)
// Query stores the arguments for a datastore search query.
type Query struct {
Builder *pb.BuilderID
Tags strpair.Map
Status pb.Status
CreatedBy identity.Identity
StartTime time.Time
EndTime time.Time
ExperimentFilters stringset.Set
BuildIDHigh int64
BuildIDLow int64
DescendantOf int64
ChildOf int64
PageSize int32
PageToken string
}
// NewQuery builds a Query from a pb.SearchBuildsRequest.
// It assumes req is valid; otherwise it may panic.
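//
// For example (illustrative values), a predicate with a gerrit_change and
// canary: YES yields a Query whose Tags include the corresponding "buildset"
// tag and whose ExperimentFilters contain "+"+bb.ExperimentBBCanarySoftware
// and, because IncludeExperimental is unset, "-"+bb.ExperimentNonProduction.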
func NewQuery(req *pb.SearchBuildsRequest) *Query {
if req.GetPredicate() == nil {
return &Query{
PageSize: fixPageSize(req.GetPageSize()),
PageToken: req.GetPageToken(),
}
}
p := req.Predicate
q := &Query{
Builder: p.GetBuilder(),
Tags: protoutil.StringPairMap(p.Tags),
Status: p.Status,
CreatedBy: identity.Identity(fixCreatedBy(p.CreatedBy)),
StartTime: mustTimestamp(p.CreateTime.GetStartTime()),
EndTime: mustTimestamp(p.CreateTime.GetEndTime()),
ExperimentFilters: stringset.NewFromSlice(p.Experiments...),
DescendantOf: p.DescendantOf,
ChildOf: p.ChildOf,
PageSize: fixPageSize(req.PageSize),
PageToken: req.PageToken,
}
// Filter by gerrit changes.
for _, change := range p.GerritChanges {
q.Tags.Add("buildset", protoutil.GerritBuildSet(change))
}
	// Filter by build range.
	// Build IDs less than or equal to 0 mean no boundary.
	// Convert BuildRange to [buildLow, buildHigh).
	// Note that unlike buildLow/buildHigh, BuildRange in req encapsulates the
	// fact that build IDs are decreasing, so we need to reverse the order.
if p.Build.GetStartBuildId() > 0 {
// Add 1 because startBuildId is inclusive and buildHigh is exclusive.
q.BuildIDHigh = p.Build.GetStartBuildId() + 1
}
if p.Build.GetEndBuildId() > 0 {
// Subtract 1 because endBuildId is exclusive and buildLow is inclusive.
q.BuildIDLow = p.Build.GetEndBuildId() - 1
}
// Filter by canary. Note that validateExperiment has already verified that
// p.Experiments doesn't contain a filter for ExperimentBBCanarySoftware.
if c := p.GetCanary(); c == pb.Trinary_YES {
q.ExperimentFilters.Add("+" + bb.ExperimentBBCanarySoftware)
} else if c == pb.Trinary_NO {
q.ExperimentFilters.Add("-" + bb.ExperimentBBCanarySoftware)
}
// Apply IncludeExperimental.
//
// If the user explicitly requested builds which were non_production, then we
// ignore this.
if !p.IncludeExperimental && !q.ExperimentFilters.Has("+"+bb.ExperimentNonProduction) {
q.ExperimentFilters.Add("-" + bb.ExperimentNonProduction)
}
return q
}
// IndexedTags returns the indexed tags ("buildset" and "build_address" keys
// only), formatted as "key:value" pairs and sorted.
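//
// For example (illustrative), tags {"buildset": ["x"], "os": ["Linux"]}
// yield ["buildset:x"]; the "os" key is not indexed.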
func IndexedTags(tags strpair.Map) []string {
set := make(stringset.Set)
for k, vals := range tags {
if k != "buildset" && k != "build_address" {
continue
}
for _, val := range vals {
set.Add(strpair.Format(k, val))
}
}
return set.ToSortedSlice()
}
// UpdateTagIndex updates the tag index for the given builds. Panics if any
// build.Proto.Builder is unspecified.
func UpdateTagIndex(ctx context.Context, builds []*model.Build) errors.MultiError {
merr := make(errors.MultiError, len(builds))
tagToBldIdx := make(map[string][]int) // tag -> builds indexes
indexEntries := make(map[string][]model.TagIndexEntry) // tag -> entries
for i, b := range builds {
for _, t := range IndexedTags(strpair.ParseMap(b.Tags)) {
indexEntries[t] = append(indexEntries[t], model.TagIndexEntry{
BuildID: b.ID,
BucketID: protoutil.FormatBucketID(b.Proto.Builder.Project, b.Proto.Builder.Bucket),
CreatedTime: mustTimestamp(b.Proto.CreateTime),
})
tagToBldIdx[t] = append(tagToBldIdx[t], i)
}
}
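	// Update each tag's index in parallel, with at most 64 concurrent workers.
	// The work items always return nil, so WorkPool's own error is discarded;
	// per-build failures are recorded in merr via tagToBldIdx instead.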
_ = parallel.WorkPool(64, func(work chan<- func() error) {
for tag, ents := range indexEntries {
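			// Rebind the loop variables so each closure captures its own copy.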
tag := tag
ents := ents
work <- func() error {
if err := model.UpdateTagIndex(ctx, tag, ents); err != nil {
for _, i := range tagToBldIdx[tag] {
merr[i] = err
}
}
return nil
}
}
})
if merr.First() != nil {
return merr
}
return nil
}
// Fetch performs the main build search logic.
func (q *Query) Fetch(ctx context.Context) (*pb.SearchBuildsResponse, error) {
if !buildid.MayContainBuilds(q.StartTime, q.EndTime) {
return &pb.SearchBuildsResponse{}, nil
}
// Verify bucket ACL permission.
if q.Builder != nil && q.Builder.Bucket != "" {
if err := perm.HasInBuilder(ctx, bbperms.BuildsList, q.Builder); err != nil {
return nil, err
}
}
cpy := *q
q = &cpy
q.PageSize = fixPageSize(q.PageSize)
	// Determine which subflow to use: query directly on Build entities or on
	// TagIndex.
if len(IndexedTags(q.Tags)) != 0 {
switch res, err := q.fetchOnTagIndex(ctx); {
case model.TagIndexIncomplete.In(err):
logging.Warningf(ctx, "Falling back to querying search on builds")
case err != nil:
return nil, err
default:
return res, nil
}
}
return q.fetchOnBuild(ctx)
}
// fetchOnBuild queries Build entities directly.
func (q *Query) fetchOnBuild(ctx context.Context) (*pb.SearchBuildsResponse, error) {
dq := datastore.NewQuery(model.BuildKind)
for _, tag := range q.Tags.Format() {
dq = dq.Eq("tags", tag)
}
switch {
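	// Status_ENDED_MASK matches any completed build, which the model tracks
	// via the boolean "incomplete" field; a specific status is matched against
	// the "status_v2" property.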
case q.Status == pb.Status_ENDED_MASK:
dq = dq.Eq("incomplete", false)
case q.Status != pb.Status_STATUS_UNSPECIFIED:
dq = dq.Eq("status_v2", q.Status)
}
if q.CreatedBy != "" {
dq = dq.Eq("created_by", q.CreatedBy)
}
var dropExperimental bool
q.ExperimentFilters.Iter(func(filter string) bool {
if filter[0] == '-' && filter[1:] == bb.ExperimentNonProduction {
			// Filter these out in postprocessing below.
dropExperimental = true
} else {
dq = dq.Eq("experiments", filter)
}
return true
})
idLow, idHigh := q.idRange()
if idLow != 0 {
dq = dq.Gte("__key__", datastore.KeyForObj(ctx, &model.Build{ID: idLow}))
}
if idHigh != 0 {
dq = dq.Lt("__key__", datastore.KeyForObj(ctx, &model.Build{ID: idHigh}))
}
if idLow != 0 && idHigh != 0 && idLow >= idHigh {
return &pb.SearchBuildsResponse{}, nil
}
if q.DescendantOf != 0 {
dq = dq.Eq("ancestor_ids", q.DescendantOf)
}
if q.ChildOf != 0 {
dq = dq.Eq("parent_id", q.ChildOf)
}
var queries []*datastore.Query
var buckets []string
var err error
switch {
case q.Builder.GetBuilder() != "":
queries = append(queries, dq.Eq("builder_id", protoutil.FormatBuilderID(q.Builder)))
case q.Builder.GetBucket() != "":
buckets = []string{protoutil.FormatBucketID(q.Builder.Project, q.Builder.Bucket)}
default:
switch buckets, err = perm.BucketsByPerm(ctx, bbperms.BuildersList, q.Builder.GetProject()); {
case err != nil:
return nil, errors.Annotate(err, "error fetching accessible buckets").Err()
case len(buckets) == 0:
return &pb.SearchBuildsResponse{}, nil
}
}
for _, bucket := range buckets {
queries = append(queries, dq.Eq("bucket_id", bucket))
}
rsp := &pb.SearchBuildsResponse{}
	logging.Debugf(ctx, "datastore query for fetchOnBuild: %v", queries)
err = datastore.RunMulti(ctx, queries, func(b *model.Build) error {
if len(rsp.Builds) >= int(q.PageSize) {
return datastore.Stop
}
// Check the build status again, as the index might be stale.
if q.Status != pb.Status_STATUS_UNSPECIFIED &&
q.Status != pb.Status_ENDED_MASK &&
q.Status != b.Status {
return nil
}
// Filter non-production builds here instead of at the datastore level to
// reduce the zigzag merge in index scans as the majority of builds are
// production.
if dropExperimental && b.ExperimentStatus(bb.ExperimentNonProduction) == pb.Trinary_YES {
return nil
}
rsp.Builds = append(rsp.Builds, b.ToSimpleBuildProto(ctx))
return nil
})
if err != nil {
return nil, err
}
if len(rsp.Builds) == int(q.PageSize) {
rsp.NextPageToken = fmt.Sprintf("id>%d", rsp.Builds[q.PageSize-1].Id)
}
return rsp, nil
}
// fetchOnTagIndex searches for builds using the TagIndex entities.
func (q *Query) fetchOnTagIndex(ctx context.Context) (*pb.SearchBuildsResponse, error) {
	// We checked earlier that len(IndexedTags(q.Tags)) > 0.
	// Search by the first indexed tag (ideally the most selective one).
indexedTag := IndexedTags(q.Tags)[0]
k, v := strpair.Parse(indexedTag)
	// Load tag index entries and put them into a min-heap, sorted by build ID.
entries, err := model.SearchTagIndex(ctx, k, v)
if err != nil {
return nil, err
}
var eHeap minHeap
switch filteredEntries, err := q.filterEntries(ctx, entries); {
case err != nil:
return nil, err
case len(filteredEntries) == 0:
return &pb.SearchBuildsResponse{}, nil
default:
eHeap = filteredEntries
}
heap.Init(&eHeap)
// Find the builds.
	results := make([]*pb.Build, 0, q.PageSize) // Ordered by build ID, ascending.
var lastConsideredEntry *model.TagIndexEntry
inconsistentEntries := 0
var entriesToFetch []*model.TagIndexEntry
tags := q.Tags.Format()
// We don't record "-luci.non_production" on every build, so when the user
// asked for this filter, we replace it with a negated filter for the opposite
// experiment (i.e. `"+luci.non_production" not in b.experiments`).
//
// We could use b.ExperimentStatus here, but since we have to convert
// b.Experiments to a stringset anyway, we avoid looping twice by checking
// if nonProdFilter is in that stringset.
expFilter := q.ExperimentFilters.Dup()
nonProdFilter := ""
if expFilter.Del("-" + bb.ExperimentNonProduction) {
nonProdFilter = "+" + bb.ExperimentNonProduction
}
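	// For example, filters {"-"+bb.ExperimentNonProduction, "+some.experiment"}
	// (where "+some.experiment" is an arbitrary filter) become
	// expFilter = {"+some.experiment"} with
	// nonProdFilter = "+"+bb.ExperimentNonProduction.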
for len(results) < int(q.PageSize) {
toFetchCount := int(q.PageSize) - len(results)
entriesToFetch = entriesToFetch[:0]
for eHeap.Len() > 0 && len(entriesToFetch) < toFetchCount {
entry := heap.Pop(&eHeap).(*model.TagIndexEntry)
prev := lastConsideredEntry
lastConsideredEntry = entry
// Tolerate duplicates.
if prev != nil && prev.BuildID == entry.BuildID {
continue
}
entriesToFetch = append(entriesToFetch, entry)
}
if len(entriesToFetch) == 0 {
break
}
// Fetch builds
builds := make([]*model.Build, len(entriesToFetch))
for i, e := range entriesToFetch {
builds[i] = &model.Build{ID: e.BuildID}
}
		// Non-existent builds are filtered out in the loop below, since they have no tags.
if err := model.GetIgnoreMissing(ctx, builds); err != nil {
			logging.Errorf(ctx, "error fetching builds on the fetchOnTagIndex code path: %s", err)
return nil, errors.Annotate(err, "error fetching builds").Err()
}
// Filter builds
for i, b := range builds {
buildTags := stringset.NewFromSlice(b.Tags...)
// Check for inconsistent entries.
if b.BucketID != entriesToFetch[i].BucketID || !buildTags.Has(indexedTag) {
logging.Warningf(ctx, "entry with build_id %d is inconsistent", b.ID)
inconsistentEntries++
continue
}
// Check user-supplied filters.
if !buildTags.HasAll(tags...) ||
(q.Status == pb.Status_ENDED_MASK && b.Incomplete) ||
(q.Status != pb.Status_STATUS_UNSPECIFIED && q.Status != pb.Status_ENDED_MASK && q.Status != b.Status) ||
(q.CreatedBy != "" && q.CreatedBy != b.CreatedBy) ||
(q.Builder.GetBuilder() != "" && b.Proto.Builder.Builder != q.Builder.Builder) ||
(q.Builder.GetProject() != "" && b.Proto.Builder.Project != q.Builder.Project) {
continue
}
bExps := stringset.NewFromSlice(b.Experiments...)
if !bExps.Contains(expFilter) {
continue
}
if nonProdFilter != "" && bExps.Has(nonProdFilter) {
continue
}
results = append(results, b.ToSimpleBuildProto(ctx))
}
}
// TODO(crbug/1090540): add metrics for inconsistentEntries.
rsp := &pb.SearchBuildsResponse{
Builds: results,
}
if len(results) == int(q.PageSize) && lastConsideredEntry != nil {
rsp.NextPageToken = fmt.Sprintf("id>%d", lastConsideredEntry.BuildID)
}
return rsp, nil
}
// filterEntries filters tag index entries by the build ID range and bucket
// conditions in the Query.
func (q *Query) filterEntries(ctx context.Context, entries []*model.TagIndexEntry) ([]*model.TagIndexEntry, error) {
idLow, idHigh := q.idRange()
if idHigh == 0 {
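		// No upper boundary was computed: default to the maximum int64 value.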
idHigh = int64(uint64(1)<<63 - 1)
}
if idLow >= idHigh {
return nil, nil
}
bucketID := protoutil.FormatBucketID(q.Builder.GetProject(), q.Builder.GetBucket())
preprocessed := make([]*model.TagIndexEntry, 0, len(entries))
	// Caches whether the user has access to each bucket.
hasAccessCache := map[string]bool{}
for _, e := range entries {
if e.BuildID < idLow || e.BuildID >= idHigh {
continue
}
		// If the query doesn't specify a bucket, permission was not checked
		// earlier, so check it here.
if q.Builder.GetBucket() == "" {
has, ok := hasAccessCache[e.BucketID]
if !ok {
proj, bkt, _ := protoutil.ParseBucketID(e.BucketID)
if err := perm.HasInBucket(ctx, bbperms.BuildsList, proj, bkt); err == nil {
has = true
} else {
status, ok := appstatus.Get(err)
if !ok || (status.Code() != codes.PermissionDenied && status.Code() != codes.NotFound) {
return nil, err
}
}
hasAccessCache[e.BucketID] = has
}
if !has {
continue
}
} else if bucketID != e.BucketID {
continue
}
preprocessed = append(preprocessed, e)
}
return preprocessed, nil
}
// idRange computes the ID range from q.BuildIDLow/q.BuildIDHigh,
// q.StartTime/q.EndTime and q.PageToken.
// A returned value of 0 means no boundary.
func (q *Query) idRange() (idLow, idHigh int64) {
if q.BuildIDLow != 0 || q.BuildIDHigh != 0 {
idLow, idHigh = q.BuildIDLow, q.BuildIDHigh
} else {
idLow, idHigh = buildid.IDRange(q.StartTime, q.EndTime)
}
if q.PageToken != "" {
if minExclusiveID, _ := strconv.ParseInt(q.PageToken[len("id>"):], 10, 64); minExclusiveID+1 > idLow {
idLow = minExclusiveID + 1
}
}
return
}
// fixPageSize ensures the page size is positive and at most maxPageSize,
// substituting defaultPageSize for non-positive values.
func fixPageSize(size int32) int32 {
switch {
case size <= 0:
return defaultPageSize
case size > maxPageSize:
return maxPageSize
default:
return size
}
}
// fixCreatedBy ensures the createdBy identity string is in the "kind:value" format.
func fixCreatedBy(createdBy string) string {
if createdBy != "" && !strings.Contains(createdBy, ":") {
createdBy = fmt.Sprintf("user:%s", createdBy)
}
return createdBy
}
// mustTimestamp converts a protobuf timestamp to a time.Time and panics on
// failure. It returns the zero time for a nil timestamp.
func mustTimestamp(ts *timestamppb.Timestamp) time.Time {
if ts == nil {
return time.Time{}
}
if err := ts.CheckValid(); err != nil {
panic(err)
}
	return ts.AsTime()
}
// minHeap holds a slice of TagIndexEntry and implements heap.Interface.
type minHeap []*model.TagIndexEntry
var _ heap.Interface = &minHeap{}
func (m minHeap) Len() int { return len(m) }
func (m minHeap) Less(i, j int) bool { return m[i].BuildID < m[j].BuildID }
func (m minHeap) Swap(i, j int) { m[i], m[j] = m[j], m[i] }
func (m *minHeap) Push(x any) {
*m = append(*m, x.(*model.TagIndexEntry))
}
func (m *minHeap) Pop() any {
old := *m
n := len(old)
item := old[n-1]
*m = old[0 : n-1]
return item
}