// blob: 72018dbd628d49a0f356ae63097761a4946cf410 [file] [log] [blame]
// Copyright 2020 The LUCI Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package run
import (
"container/heap"
"context"
"fmt"
"time"
"go.chromium.org/luci/common/clock"
"go.chromium.org/luci/common/errors"
"go.chromium.org/luci/common/logging"
"go.chromium.org/luci/common/retry/transient"
"go.chromium.org/luci/common/sync/parallel"
"go.chromium.org/luci/gae/service/datastore"
"go.chromium.org/luci/cv/internal/common"
"go.chromium.org/luci/cv/internal/configs/prjcfg"
)
// CLQueryBuilder builds datastore.Query for searching Runs of a given CL.
//
// The constraint methods (AfterInProject, BeforeInProject, PageToken) take the
// builder by value and return a modified copy.
type CLQueryBuilder struct {
// CLID of the CL being searched for. Required.
CLID common.CLID
// AdditionalCLIDs optionally lists extra CLs, all of which must also be
// included in a Run for it to match.
//
// This is checked by isSatisfied on loaded Runs; BuildKeysOnly doesn't
// reference it.
AdditionalCLIDs common.CLIDsSet
// Project optionally restricts Runs to the given LUCI project.
Project string
// MaxExcl restricts query to Runs with ID lexicographically smaller.
//
// This means query will return union of:
// * all Runs created after this Run in the same project,
// * all Runs in lexicographically smaller projects,
// unless .Project is set to the same project (recommended).
MaxExcl common.RunID
// MinExcl restricts query to Runs with ID lexicographically larger.
//
// This means query will return union of:
// * all Runs created before this Run in the same project,
// * all Runs in lexicographically larger projects,
// unless .Project is set to the same project (recommended).
MinExcl common.RunID
// Limit limits the number of results if positive. Ignored otherwise.
Limit int32
}
// isSatisfied returns whether the given Run satisfies the query.
//
// A nil Run never satisfies the query. Otherwise, the Run must:
//   - include every CL in AdditionalCLIDs, if any are set;
//   - belong to Project, if set;
//   - have an ID strictly greater than MinExcl and strictly smaller than
//     MaxExcl, for each of the bounds that is set.
func (b CLQueryBuilder) isSatisfied(r *Run) bool {
	// Reject nil first: the AdditionalCLIDs check below reads r.CLs and would
	// otherwise panic on a nil Run.
	if r == nil {
		return false
	}
	// If there are additional CLs that must be included,
	// check whether all of these are indeed included.
	if len(b.AdditionalCLIDs) > 0 {
		count := 0
		for _, clid := range r.CLs {
			if b.AdditionalCLIDs.Has(clid) {
				count++
			}
		}
		if count != len(b.AdditionalCLIDs) {
			return false
		}
	}
	switch {
	case b.Project != "" && r.ID.LUCIProject() != b.Project:
	case b.MinExcl != "" && r.ID <= b.MinExcl:
	case b.MaxExcl != "" && r.ID >= b.MaxExcl:
	default:
		return true
	}
	return false
}
// AfterInProject constrains CLQueryBuilder to Runs created after this Run but
// belonging to the same LUCI project.
//
// It sets MaxExcl to the given ID and Project to the ID's LUCI project.
//
// Panics if CLQueryBuilder is already constrained to a different LUCI Project.
func (b CLQueryBuilder) AfterInProject(id common.RunID) CLQueryBuilder {
	project := id.LUCIProject()
	if b.Project != "" && b.Project != project {
		panic(fmt.Errorf("invalid CLQueryBuilder.AfterInProject(%q): .Project is already set to %q", id, b.Project))
	}
	b.Project = project
	b.MaxExcl = id
	return b
}
// BeforeInProject constrains CLQueryBuilder to Runs created before this Run but
// belonging to the same LUCI project.
//
// It sets MinExcl to the given ID and Project to the ID's LUCI project.
//
// Panics if CLQueryBuilder is already constrained to a different LUCI Project.
func (b CLQueryBuilder) BeforeInProject(id common.RunID) CLQueryBuilder {
	project := id.LUCIProject()
	if b.Project != "" && b.Project != project {
		panic(fmt.Errorf("invalid CLQueryBuilder.BeforeInProject(%q): .Project is already set to %q", id, b.Project))
	}
	b.Project = project
	b.MinExcl = id
	return b
}
// PageToken constrains CLQueryBuilder to continue searching from the prior
// search.
//
// A nil page token leaves the builder unchanged. Otherwise, MinExcl is set to
// the Run ID stored in the token.
func (b CLQueryBuilder) PageToken(pt *PageToken) CLQueryBuilder {
	if pt == nil {
		return b
	}
	b.MinExcl = common.RunID(pt.GetRun())
	return b
}
// BuildKeysOnly returns keys-only query on RunCL entities.
//
// It's exposed primarily for debugging reasons.
//
// The query matches RunCL entities whose IndexedID equals CLID. MinExcl and
// MaxExcl, if set, become exclusive bounds on the entity key; if Project is
// set, the bounds are additionally tightened to the ID range of that project.
func (b CLQueryBuilder) BuildKeysOnly(ctx context.Context) *datastore.Query {
	// Work out the exclusive Run ID bounds; an empty string means "unbounded".
	lower, upper := string(b.MinExcl), string(b.MaxExcl)
	if b.Project != "" {
		projLower, projUpper := rangeOfProjectIDs(b.Project)
		if lower == "" || lower < projLower {
			lower = projLower
		}
		if upper == "" || upper > projUpper {
			upper = projUpper
		}
	}

	// runCLKey builds the key of the RunCL entity for this CL under the Run
	// with the given ID.
	runCLKey := func(runID string) *datastore.Key {
		return datastore.MakeKey(ctx, common.RunKind, runID, RunCLKind, int64(b.CLID))
	}

	query := datastore.NewQuery(RunCLKind).Eq("IndexedID", b.CLID).KeysOnly(true)
	if b.Limit > 0 {
		query = query.Limit(b.Limit)
	}
	if lower != "" {
		query = query.Gt("__key__", runCLKey(lower))
	}
	if upper != "" {
		query = query.Lt("__key__", runCLKey(upper))
	}
	return query
}
// GetAllRunKeys runs the query across all matched RunCLs entities and returns
// Datastore keys to corresponding Run entities.
//
// A Datastore failure is returned annotated and tagged as transient.
func (b CLQueryBuilder) GetAllRunKeys(ctx context.Context) ([]*datastore.Key, error) {
	var found []*datastore.Key
	err := datastore.GetAll(ctx, b.BuildKeysOnly(ctx), &found)
	if err != nil {
		return nil, errors.Annotate(err, "failed to fetch RunCLs IDs").Tag(transient.Tag).Err()
	}
	// The query yields RunCL keys; swap each, in place, for the key of its
	// parent Run entity.
	for i, clKey := range found {
		found[i] = clKey.Parent()
	}
	return found, nil
}
// LoadRuns returns matched Runs and the page token to continue search later.
//
// It delegates to loadRunsFromQuery, which panics if more than one checker is
// given.
func (b CLQueryBuilder) LoadRuns(ctx context.Context, checkers ...LoadRunChecker) ([]*Run, *PageToken, error) {
return loadRunsFromQuery(ctx, b, checkers...)
}
// qLimit implements runKeysQuery interface.
//
// It returns the Limit field as is.
func (b CLQueryBuilder) qLimit() int32 { return b.Limit }
// qPageToken implements runKeysQuery interface.
//
// It returns the copy of the builder produced by PageToken.
func (b CLQueryBuilder) qPageToken(pt *PageToken) runKeysQuery { return b.PageToken(pt) }
// ProjectQueryBuilder builds datastore.Query for searching Runs scoped to a
// LUCI project.
//
// The constraint methods (After, Before, PageToken) take the builder by value
// and return a modified copy.
type ProjectQueryBuilder struct {
// Project is the LUCI project. Required.
Project string
// Status optionally restricts query to Runs with this status.
//
// Status_STATUS_UNSPECIFIED means no restriction. The special value
// Status_ENDED_MASK matches Runs for which IsEnded reports true; it is
// supported by GetAllRunKeys, but BuildKeysOnly panics on it.
Status Status
// MaxExcl restricts query to Runs with ID lexicographically smaller. Optional.
//
// This means query is restricted to Runs created after this Run.
//
// This Run must belong to the same LUCI project.
MaxExcl common.RunID
// MinExcl restricts query to Runs with ID lexicographically larger. Optional.
//
// This means query is restricted to Runs created before this Run.
//
// This Run must belong to the same LUCI project.
MinExcl common.RunID
// Limit limits the number of results if positive. Ignored otherwise.
Limit int32
}
// isSatisfied returns whether the given Run satisfies the query.
//
// A nil Run or a Run of a different LUCI project never does. Otherwise, the
// Run must pass the Status restriction (if any) and have an ID strictly within
// whichever of the MinExcl / MaxExcl bounds are set.
func (b ProjectQueryBuilder) isSatisfied(r *Run) bool {
	if r == nil || r.ID.LUCIProject() != b.Project {
		return false
	}
	switch b.Status {
	case Status_STATUS_UNSPECIFIED:
		// Any status is acceptable.
	case Status_ENDED_MASK:
		if !IsEnded(r.Status) {
			return false
		}
	default:
		if r.Status != b.Status {
			return false
		}
	}
	if b.MinExcl != "" && r.ID <= b.MinExcl {
		return false
	}
	if b.MaxExcl != "" && r.ID >= b.MaxExcl {
		return false
	}
	return true
}
// After restricts the query to Runs created after the given Run.
//
// It sets MaxExcl to the given ID and Project to the ID's LUCI project.
//
// Panics if ProjectQueryBuilder is already constrained to a different Project.
func (b ProjectQueryBuilder) After(id common.RunID) ProjectQueryBuilder {
	project := id.LUCIProject()
	if b.Project != "" && b.Project != project {
		panic(fmt.Errorf("invalid ProjectQueryBuilder.After(%q): .Project is already set to %q", id, b.Project))
	}
	b.Project = project
	b.MaxExcl = id
	return b
}
// Before restricts the query to Runs created before the given Run.
//
// It sets MinExcl to the given ID and Project to the ID's LUCI project.
//
// Panics if ProjectQueryBuilder is already constrained to a different Project.
func (b ProjectQueryBuilder) Before(id common.RunID) ProjectQueryBuilder {
	project := id.LUCIProject()
	if b.Project != "" && b.Project != project {
		panic(fmt.Errorf("invalid ProjectQueryBuilder.Before(%q): .Project is already set to %q", id, b.Project))
	}
	b.Project = project
	b.MinExcl = id
	return b
}
// PageToken constrains ProjectQueryBuilder to continue searching from the
// prior search.
//
// A nil page token leaves the builder unchanged. Otherwise, MinExcl is set to
// the Run ID stored in the token.
func (b ProjectQueryBuilder) PageToken(pt *PageToken) ProjectQueryBuilder {
	if pt == nil {
		return b
	}
	b.MinExcl = common.RunID(pt.GetRun())
	return b
}
// BuildKeysOnly returns keys-only query on Run entities.
//
// It's exposed primarily for debugging reasons.
//
// The key range always starts from the ID range of the Project and is
// narrowed by MinExcl / MaxExcl when they are set.
//
// WARNING: panics if Status is magic Status_ENDED_MASK,
// as it's not feasible to perform this as 1 query.
//
// Also panics if Project is unset, or if MinExcl or MaxExcl is set to a Run
// ID of a different LUCI project.
func (b ProjectQueryBuilder) BuildKeysOnly(ctx context.Context) *datastore.Query {
	query := datastore.NewQuery(common.RunKind).KeysOnly(true)
	if b.Status == Status_ENDED_MASK {
		panic(fmt.Errorf("Status=Status_ENDED_MASK is not yet supported"))
	}
	if b.Status != Status_STATUS_UNSPECIFIED {
		query = query.Eq("Status", int(b.Status))
	}
	if b.Limit > 0 {
		query = query.Limit(b.Limit)
	}

	if b.Project == "" {
		panic(fmt.Errorf("Project is not set"))
	}
	// Default to the bounds enclosing all Run IDs of the project.
	lower, upper := rangeOfProjectIDs(b.Project)

	if b.MinExcl != "" {
		if b.MinExcl.LUCIProject() != b.Project {
			panic(fmt.Errorf("MinExcl %q doesn't match Project %q", b.MinExcl, b.Project))
		}
		lower = string(b.MinExcl)
	}
	query = query.Gt("__key__", datastore.MakeKey(ctx, common.RunKind, lower))

	if b.MaxExcl != "" {
		if b.MaxExcl.LUCIProject() != b.Project {
			panic(fmt.Errorf("MaxExcl %q doesn't match Project %q", b.MaxExcl, b.Project))
		}
		upper = string(b.MaxExcl)
	}
	query = query.Lt("__key__", datastore.MakeKey(ctx, common.RunKind, upper))
	return query
}
// GetAllRunKeys runs the query and returns Datastore keys to Run entities.
//
// For Status_ENDED_MASK, one query per status in finalStatuses is built and
// they are all run through datastore.RunMulti, stopping once Limit keys (if
// positive) are collected. Any other Status is served by a single query.
//
// A Datastore failure is returned annotated and tagged as transient.
func (b ProjectQueryBuilder) GetAllRunKeys(ctx context.Context) ([]*datastore.Key, error) {
	var (
		found []*datastore.Key
		err   error
	)
	if b.Status == Status_ENDED_MASK {
		// Status_ENDED_MASK requires several dedicated queries.
		queries := make([]*datastore.Query, 0, len(finalStatuses))
		for _, status := range finalStatuses {
			perStatus := b
			perStatus.Status = status
			queries = append(queries, perStatus.BuildKeysOnly(ctx))
		}
		limit := int(b.Limit)
		err = datastore.RunMulti(ctx, queries, func(k *datastore.Key) error {
			found = append(found, k)
			if limit > 0 && len(found) == limit {
				return datastore.Stop
			}
			return nil
		})
	} else {
		err = datastore.GetAll(ctx, b.BuildKeysOnly(ctx), &found)
	}
	if err != nil {
		return nil, errors.Annotate(err, "failed to fetch Runs IDs").Tag(transient.Tag).Err()
	}
	return found, nil
}
// LoadRuns returns matched Runs and the page token to continue search later.
//
// It delegates to loadRunsFromQuery, which panics if more than one checker is
// given.
func (b ProjectQueryBuilder) LoadRuns(ctx context.Context, checkers ...LoadRunChecker) ([]*Run, *PageToken, error) {
return loadRunsFromQuery(ctx, b, checkers...)
}
// qLimit implements runKeysQuery interface.
//
// It returns the Limit field as is.
func (b ProjectQueryBuilder) qLimit() int32 { return b.Limit }
// qPageToken implements runKeysQuery interface.
//
// It returns the copy of the builder produced by PageToken.
func (b ProjectQueryBuilder) qPageToken(pt *PageToken) runKeysQuery { return b.PageToken(pt) }
// RecentQueryBuilder builds a VERY SLOW query for searching recent Runs
// across all LUCI projects.
//
// If two runs have the same timestamp, orders Runs first by the LUCI Project
// name and then by the remainder of the Run's ID.
//
// Beware: two Runs having the same timestamp is actually quite likely with
// Google Gerrit because it rounds updates to second granularity, which then
// makes its way as Run Creation time.
//
// **WARNING**: this is the most inefficient way to search, to be used
// infrequently and for CV admin needs only. Behind the scenes, it issues a
// Datastore query per active LUCI project.
//
// Doesn't yet support restricting search to a specific time range, but it can
// be easily implemented if necessary.
type RecentQueryBuilder struct {
// Status optionally restricts query to Runs with this status.
//
// Status_STATUS_UNSPECIFIED means no restriction. The special value
// Status_ENDED_MASK matches Runs for which IsEnded reports true.
Status Status
// Limit limits the number of results if positive. Ignored otherwise.
Limit int32
// CheckProjectAccess checks if the calling user has access to the LUCI
// project. Optional.
//
// If provided, Runs from the LUCI Projects that requested user doesn't have
// access to won't be returned without even querying.
// If not provided, search is done across all projects.
CheckProjectAccess func(context.Context, string) (bool, error)
// lastFoundRunID is basically a page token.
lastFoundRunID common.RunID
// boundaryInverseTS / boundaryProject are just caches based on
// lastFoundRunID set by PageToken(). See keysForProject() for their use.
boundaryInverseTS string
boundaryProject string
// availableProjects is set on a local temp copy of the RecentQueryBuilder by
// the LoadRuns(). It's then used by loadAvailableProjects as a cache if not
// nil. Empty slice means a cached value of no available projects,
// possibly because caller has no access to any project.
availableProjects []string
}
// PageToken constrains RecentQueryBuilder to continue searching from the
// prior search.
//
// A nil page token leaves the builder unchanged. Otherwise, the Run ID stored
// in the token is remembered together with its LUCI project and InverseTS
// parts, which keysForProject uses to compute per-project boundaries.
func (b RecentQueryBuilder) PageToken(pt *PageToken) RecentQueryBuilder {
	if pt == nil {
		return b
	}
	last := common.RunID(pt.GetRun())
	b.lastFoundRunID = last
	b.boundaryProject = last.LUCIProject()
	b.boundaryInverseTS = last.InverseTS()
	return b
}
// GetAllRunKeys runs the query and returns Datastore keys to Run entities.
//
// WARNING: very slow.
//
// Since RunID includes LUCI project, RunIDs aren't lexicographically ordered by
// creation time across LUCI projects. So, the brute force is to query each
// known to CV LUCI project for most recent Run IDs, and then merge and select
// the next page of resulting keys.
//
// Returns (nil, nil) if there are no available projects.
func (b RecentQueryBuilder) GetAllRunKeys(ctx context.Context) ([]*datastore.Key, error) {
	projects, err := b.loadAvailableProjects(ctx)
	if err != nil {
		return nil, err
	}
	if len(projects) == 0 {
		return nil, nil
	}

	// Query every project concurrently with at most 16 workers; each worker
	// writes only to its own slot of perProject.
	// KeysOnly queries are cheap in both time and money.
	perProject := make([][]*datastore.Key, len(projects))
	workers := min(16, len(projects))
	errs := parallel.WorkPool(workers, func(work chan<- func() error) {
		for slot, project := range projects {
			slot, project := slot, project
			work <- func() error {
				keys, err := b.keysForProject(ctx, project)
				perProject[slot] = keys
				return err
			}
		}
	})
	if errs != nil {
		return nil, common.MostSevereError(errs)
	}

	// Finally, merge resulting keys maintaining the documented order.
	return b.selectLatest(perProject...), nil
}
// LoadRuns returns matched Runs and the page token to continue search later.
//
// The list of available projects is loaded once up front and cached for the
// duration of this call.
func (b RecentQueryBuilder) LoadRuns(ctx context.Context, checkers ...LoadRunChecker) ([]*Run, *PageToken, error) {
	projects, err := b.loadAvailableProjects(ctx)
	if err != nil {
		return nil, nil, err
	}
	// loadAvailableProjects treats only a non-nil slice as a cached value, so
	// "no projects" must be stored as an empty, non-nil slice.
	if len(projects) == 0 {
		projects = []string{}
	}
	// NOTE: the cache is bound to this copy of RecentQueryBuilder and whichever
	// copies are made from it by the loadRunsFromQuery, but the caller of the
	// RecentQueryBuilder.LoadRuns never gets to see such a copy.
	b.availableProjects = projects
	return loadRunsFromQuery(ctx, b, checkers...)
}
// loadAvailableProjects returns the LUCI projects to search in.
//
// If availableProjects cache is set (non-nil), it's returned as is. Otherwise,
// all project IDs are loaded via prjcfg.GetAllProjectIDs and, if
// CheckProjectAccess is provided, filtered down to those it approves.
// The first error from CheckProjectAccess aborts the filtering.
func (b RecentQueryBuilder) loadAvailableProjects(ctx context.Context) ([]string, error) {
	// NOTE: Empty slice is a valid cache value of 0 available projects.
	if cached := b.availableProjects; cached != nil {
		return cached, nil
	}

	// Load all enabled & disabled projects.
	all, err := prjcfg.GetAllProjectIDs(ctx, false)
	switch {
	case err != nil:
		return nil, err
	case b.CheckProjectAccess == nil:
		return all, nil
	}

	// Filter in place, reusing the backing array of the loaded slice.
	visible := all[:0]
	for _, project := range all {
		ok, err := b.CheckProjectAccess(ctx, project)
		if err != nil {
			return nil, err
		}
		if ok {
			visible = append(visible, project)
		}
	}
	return visible, nil
}
// qLimit implements runKeysQuery interface.
//
// It returns the Limit field as is.
func (b RecentQueryBuilder) qLimit() int32 { return b.Limit }
// qPageToken implements runKeysQuery interface.
//
// It returns the copy of the builder produced by PageToken.
func (b RecentQueryBuilder) qPageToken(pt *PageToken) runKeysQuery { return b.PageToken(pt) }
// isSatisfied returns whether the given Run satisfies the query.
//
// Only the Status restriction is checked: a nil Run never matches, an
// unspecified Status matches any Run, Status_ENDED_MASK matches Runs for which
// IsEnded reports true, and any other Status must match exactly.
func (b RecentQueryBuilder) isSatisfied(r *Run) bool {
	if r == nil {
		return false
	}
	switch b.Status {
	case Status_STATUS_UNSPECIFIED:
		return true
	case Status_ENDED_MASK:
		return IsEnded(r.Status)
	default:
		return r.Status == b.Status
	}
}
// keysForProject returns matching Run keys for a specific project.
//
// It runs a ProjectQueryBuilder query with the same Status and Limit, whose
// MinExcl is derived from the page token (if any) so that only the Runs
// following the last found Run in the documented order are returned.
func (b RecentQueryBuilder) keysForProject(ctx context.Context, project string) ([]*datastore.Key, error) {
	// boundary makes a (non-existent) Run ID in the given project out of the
	// boundaryInverseTS followed by the given character.
	boundary := func(suffix rune) common.RunID {
		return common.RunID(fmt.Sprintf("%s/%s%c", project, b.boundaryInverseTS, suffix))
	}

	query := ProjectQueryBuilder{
		Project: project,
		Status:  b.Status,
		Limit:   b.Limit,
	}
	if b.boundaryProject != "" { // Otherwise, there is no page token.
		switch {
		case b.boundaryProject == project:
			// Same LUCI project, can use the last found Run as the boundary.
			// This is actually important in case there are several Runs in this
			// project with the same timestamp.
			query.MinExcl = b.lastFoundRunID
		case b.boundaryProject > project:
			// Must be a strictly older Run, i.e. have strictly higher InverseTS
			// than the boundaryInverseTS. Since '-' (ASCII code 45) follows the
			// InverseTS in RunID schema, all Run IDs with the same InverseTS will
			// be smaller than 'InverseTS.' ('.' has ASCII code 46).
			query.MinExcl = boundary('-' + 1)
		default:
			// b.boundaryProject < project: must have the same or higher
			// InverseTS. Since '-' follows the InverseTS in RunID schema, any
			// RunID with the same InverseTS will be strictly greater than
			// "InverseTS-".
			query.MinExcl = boundary('-')
		}
	}
	return query.GetAllRunKeys(ctx)
}
// selectLatest returns up to the limit of Run IDs ordered by:
//   - DESC Created (== ASC InverseTS, or latest first)
//   - ASC Project
//   - ASC RunID (the remaining part of RunID)
//
// IDs in each input slice must be in Created DESC order.
//
// This is a k-way merge: a heap holds at most one head element per input and
// is refilled from the input whose element was just emitted.
//
// Mutates inputs.
func (b RecentQueryBuilder) selectLatest(inputs ...[]*datastore.Key) []*datastore.Key {
	// next detaches the head of the given input and wraps it into a heap
	// element. Returns false if that input is exhausted.
	next := func(src int) (runHeapKey, bool) {
		keys := inputs[src]
		if len(keys) == 0 {
			return runHeapKey{}, false
		}
		head := keys[0]
		inputs[src] = keys[1:]

		id := common.RunID(head.StringID())
		inverseTS := id.InverseTS()
		project := id.LUCIProject()
		// Skip "<project>/<inverseTS>" to get whatever follows in the ID.
		rest := id[len(project)+1+len(inverseTS):]
		sortKey := fmt.Sprintf("%s/%s/%s", inverseTS, project, rest)
		return runHeapKey{head, sortKey, src}, true
	}

	// Init the heap with the latest element from each non-empty input.
	pending := make(runHeap, 0, len(inputs))
	for src := range inputs {
		if item, ok := next(src); ok {
			pending = append(pending, item)
		}
	}
	heap.Init(&pending)

	limit := int(b.Limit)
	var merged []*datastore.Key
	for len(pending) > 0 {
		latest := heap.Pop(&pending).(runHeapKey)
		merged = append(merged, latest.dsKey)
		// If Limit is <= 0, this never holds and the loop continues until the
		// heap is empty.
		if len(merged) == limit {
			break
		}
		if item, ok := next(latest.idx); ok {
			heap.Push(&pending, item)
		}
	}
	return merged
}
// rangeOfProjectIDs returns (min..max) non-existent Run IDs, such that
// the following
//
//	min < $RunID < max
//
// holds for all valid $RunID belonging to the given project.
func rangeOfProjectIDs(project string) (string, string) {
	// ID starts with the LUCI Project name followed by '/' and a 13-digit
	// number. So it must be lexicographically greater than "project/0" and
	// less than "project/:" (int(':') == int('9') + 1).
	prefix := project + "/"
	return prefix + "0", prefix + ":"
}
// runKeysQuery abstracts out existing ...QueryBuilder in this file.
//
// It is what loadRunsFromQuery needs in order to page through the results.
type runKeysQuery interface {
// GetAllRunKeys returns keys of Run entities matched by the query.
GetAllRunKeys(context.Context) ([]*datastore.Key, error)
// isSatisfied reports whether an already loaded Run matches the query.
isSatisfied(*Run) bool
// qPageToken returns a query continuing from the given page token.
qPageToken(*PageToken) runKeysQuery // must return a copy
// qLimit returns the max number of results; non-positive means no limit.
qLimit() int32
}
// queryStopAfterDuration and queryStopAfterIterations gracefully stop
// loadRunsFromQuery after both are reached.
//
// That is, loadRunsFromQuery returns an incomplete page together with a page
// token once it has done at least queryStopAfterIterations iterations and has
// been running for longer than queryStopAfterDuration.
const queryStopAfterDuration = 5 * time.Second
const queryStopAfterIterations = 5
// loadRunsFromQuery returns matched Runs and a page token.
//
// If limit is specified, continues loading Runs until the limit is reached.
//
// Each iteration fetches the next batch of keys via q.GetAllRunKeys, loads the
// corresponding Runs (applying the checker, if given) and keeps those Runs for
// which q.isSatisfied holds.
//
// The returned page token is nil if the query has no limit or if there is
// nothing more to fetch. An incomplete page with a non-nil token is returned
// if both queryStopAfterIterations and queryStopAfterDuration are exceeded.
//
// Panics if more than one checker is given.
func loadRunsFromQuery(ctx context.Context, q runKeysQuery, checkers ...LoadRunChecker) ([]*Run, *PageToken, error) {
if l := len(checkers); l > 1 {
panic(fmt.Errorf("at most 1 LoadRunChecker allowed, %d given", l))
}
var out []*Run
// Loop until we fetch the limit.
startTime := clock.Now(ctx)
// Kept for logging only, since q is replaced on each iteration.
originalQuery := q
limit := int(q.qLimit())
for iteration := 1; ; iteration++ {
// Fetch the next `limit` of keys in all iterations, even though we may
// already have some in `out` since the keys-only queries are cheap,
// and this way code is simpler.
keys, err := q.GetAllRunKeys(ctx)
switch {
case err != nil:
return nil, nil, err
case len(keys) == 0:
// Search space exhausted.
return out, nil, nil
}
loader := LoadRunsFromKeys(keys...)
if len(checkers) == 1 {
loader = loader.Checker(checkers[0])
}
runs, err := loader.DoIgnoreNotFound(ctx)
if err != nil {
return nil, nil, err
}
// Even for queries which can do everything using native Datastore
// query, there is a window of time between the Datastore query
// fetching keys of satisfying Runs and actual Runs being fetched.
// During this window, the Runs ultimately fetched could have been
// modified, so check again and skip all fetched Runs which no longer
// satisfy the query.
for _, r := range runs {
if q.isSatisfied(r) {
out = append(out, r)
}
}
switch {
case limit <= 0:
// No limit: a single batch is everything the query matches.
return out, nil, nil
case len(out) < limit:
// Prepare to iterate from the last considered key.
pt := &PageToken{Run: keys[len(keys)-1].StringID()}
q = q.qPageToken(pt)
if iteration >= queryStopAfterIterations && clock.Since(ctx, startTime) > queryStopAfterDuration {
// Avoid excessive looping when most Runs are filtered out by
// returning an incomplete page of results earlier with a valid
// page token, so that if necessary, the caller can continue
// later.
// NOTE(review): originalQuery is formatted with %s; unless the query
// builders implement fmt.Stringer (not visible here), %v may be what
// is intended — confirm.
logging.Debugf(ctx, "loadRunsFromQuery stops after %d iterations and %s time, returning %d out of %d requested Runs [query: %s]",
iteration, clock.Since(ctx, startTime), len(out), limit, originalQuery)
return out, pt, nil
}
case len(out) == limit && len(keys) < limit:
// Even though some Runs may have been filtered by checkers and/or
// isSatisfied called, we know we processed all the keys most recently
// fetched and there are no more keys.
// So, page token is not necessary.
return out, nil, nil
case len(out) == limit:
return out, &PageToken{Run: keys[len(keys)-1].StringID()}, nil
default:
// len(out) > limit: the last batch yielded more Runs than needed.
firstNotReturnedID := string(out[limit].ID)
// The firstNotReturnedID would have been a perfect page token iff
// *inclusive* PageToken was supported. But since we need an *exclusive*
// page token, find a key preceding the key corresponding to
// firstNotReturnedID.
//
// NOTE: the keys themselves aren't necessarily ordered by ASC Run ID,
// e.g. in case of RecentQueryBuilder. Therefore, we must iterate them in
// order and can't do binary search.
//
// NOTE(review): keys[i-1] relies on i > 0. Since len(out) was below the
// limit before this batch, that presumably holds as long as the loader
// returns Runs in the order of keys — confirm.
for i, k := range keys {
if k.StringID() == firstNotReturnedID {
return out[:limit], &PageToken{Run: keys[i-1].StringID()}, nil
}
}
panic("unreachable")
}
}
}