blob: 2c1f41ce0d760d7ed2329321d4cbe8ed214567fe [file] [log] [blame]
// Copyright 2022 The LUCI Authors.
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// See the License for the specific language governing permissions and
// limitations under the License.
package bbfacade
import (
bbpb ""
bbutil ""
// AcceptedAdditionalPropKeys are additional properties keys that if present
// in the requested properties of the build, LUCI CV should still consider the
// build as reusable.
// LUCI CV checks requested properties rather than input properties because
// LUCI CV only cares about whether the properties used by a build is different
// from the pre-defined properties in Project Config (assuming change in
// properties may result in change in build result). Requested properties are
// properties provided in ScheduleBuild which currently is the only way
// to add/modify build properties. LUCI CV premits certain keys which are either
// added by LUCI CV itself or known that they won't change build behavior.
var AcceptedAdditionalPropKeys = stringset.NewFromSlice(
"$recipe_engine/cv", // future proof
var searchBuildsMask *bbpb.BuildMask
func init() {
searchBuildsMask = proto.Clone(defaultMask).(*bbpb.BuildMask)
if err := searchBuildsMask.Fields.Append((*bbpb.Build)(nil),
); err != nil {
// Search searches Buildbucket for builds that match all provided CLs and
// any of the provided definitions.
// Also filters out builds that specify extra properties. See:
// `AcceptedAdditionalPropKeys`.
// `cb` is invoked for each matching Tryjob converted from Buildbucket build
// until `cb` returns false or all matching Tryjobs are exhausted or error
// occurs. The Tryjob `cb` receives only populates following fields:
// * ExternalID
// * Definition
// * Status
// * Result
// Also, the Tryjobs are guaranteed to have decreasing build ID (in other
// word, from newest to oldest) ONLY within the same host.
// For example, for following matching builds:
// * host: A, build: 100, create time: now
// * host: A, build: 101, create time: now - 2min
// * host: B, build: 1000, create time: now - 1min
// * host: B, build: 1001, create time: now - 3min
// It is possible that `cb` is called in following orders:
// * host: B, build: 1000, create time: now - 1min
// * host: A, build: 100, create time: now
// * host: B, build: 1001, create time: now - 3min
// * host: A, build: 101, create time: now - 2min
// TODO(yiwzhang): ensure `cb` get called from newest to oldest builds across
// all hosts.
// Uses the provided `luciProject` for authentication. If any of the given
// definitions defines builder from other LUCI Project, this other LUCI Project
// should grant bucket READ permission to the provided `luciProject`.
// Otherwise, the builds won't show up in the search result.
func (f *Facade) Search(ctx context.Context, cls []*run.RunCL, definitions []*tryjob.Definition, luciProject string, cb func(*tryjob.Tryjob) bool) error {
shouldStop, stop := makeStopFunction()
workers, err := f.makeSearchWorkers(ctx, cls, definitions, luciProject, shouldStop)
if err != nil {
return err
var wg sync.WaitGroup
resultCh := make(chan searchResult)
for _, worker := range workers {
worker := worker
go func() {
defer wg.Done(), resultCh)
go func() {
for res := range resultCh {
switch {
case shouldStop(): // draining
case res.err != nil:
err = res.err
case !cb(res.tryjob):
return err
func makeStopFunction() (shouldStop func() bool, stop func()) {
var stopIndicator int32
shouldStop = func() bool {
return atomic.LoadInt32(&stopIndicator) > 0
stop = func() {
atomic.AddInt32(&stopIndicator, 1)
return shouldStop, stop
// searchWorker is a worker search builds against a single Buildbucket host.
// The matching build will be pushed to `resultCh` one by one including any
// error occurred during the search. `resultCh` is closed when either a error
// is returned or all matching builds have been exhausted.
// Algorithm for searching:
// * Pick the CL with smallest (patchset - min_equivalent_patchset) as the
// search predicate
// * Page the search response and accept a build if
// * The Gerrit changes of the build matches the input CLs. Match means
// host and change number are the same and the patchset is in between
// cl.min_equivalent_patchset and cl.patchset
// * The builder of the build should be either the main builder or the
// equivalent builder of any of the input definitions
// * The requested properties only have keys specified in
// `AcceptedPropertyKeys`
type searchWorker struct {
bbHost string
luciProject string
bbClient buildbucket.Client
clSearchTarget *run.RunCL
acceptedCLRanges map[string]patchsetRange
builderToDefinition map[string]*tryjob.Definition
shouldStop func() bool
type patchsetRange struct {
minIncl, maxIncl int64
func (f *Facade) makeSearchWorkers(ctx context.Context, cls []*run.RunCL, definitions []*tryjob.Definition, luciProject string, shouldStop func() bool) ([]searchWorker, error) {
var hostToWorker = make(map[string]searchWorker)
for _, def := range definitions {
if def.GetBuildbucket() == nil {
panic(fmt.Errorf("call buildbucket backend for non-buildbucket definition: %s", def))
bbHost := def.GetBuildbucket().GetHost()
worker, ok := hostToWorker[bbHost]
if !ok {
bbClient, err := f.ClientFactory.MakeClient(ctx, bbHost, luciProject)
if err != nil {
return nil, err
clRanges, clWithSmallestRange := computeCLRangesAndPickSmallest(cls)
worker = searchWorker{
bbHost: bbHost,
luciProject: luciProject,
bbClient: bbClient,
acceptedCLRanges: clRanges,
clSearchTarget: clWithSmallestRange,
builderToDefinition: make(map[string]*tryjob.Definition),
shouldStop: shouldStop,
hostToWorker[bbHost] = worker
worker.builderToDefinition[bbutil.FormatBuilderID(def.GetBuildbucket().GetBuilder())] = def
if def.GetEquivalentTo() != nil {
worker.builderToDefinition[bbutil.FormatBuilderID(def.GetEquivalentTo().GetBuildbucket().GetBuilder())] = def
ret := make([]searchWorker, 0, len(hostToWorker))
for _, worker := range hostToWorker {
ret = append(ret, worker)
return ret, nil
func computeCLRangesAndPickSmallest(cls []*run.RunCL) (map[string]patchsetRange, *run.RunCL) {
clToRange := make(map[string]patchsetRange, len(cls))
var clWithSmallestPatchsetRange *run.RunCL
var smallestRange int64
for _, cl := range cls {
psRange := struct {
minIncl int64
maxIncl int64
}{int64(cl.Detail.GetMinEquivalentPatchset()), int64(cl.Detail.GetPatchset())}
clToRange[formatChangeID(cl.Detail.GetGerrit().GetHost(), cl.Detail.GetGerrit().GetInfo().GetNumber())] = psRange
if r := psRange.maxIncl - psRange.minIncl + 1; smallestRange == 0 || r < smallestRange {
clWithSmallestPatchsetRange = cl
smallestRange = r
return clToRange, clWithSmallestPatchsetRange
type searchResult struct {
tryjob *tryjob.Tryjob
err error
func (sw *searchWorker) search(ctx context.Context, resultCh chan<- searchResult) {
changeDetail := sw.clSearchTarget.Detail
gc := &bbpb.GerritChange{
Host: changeDetail.GetGerrit().GetHost(),
Project: changeDetail.GetGerrit().GetInfo().GetProject(),
Change: changeDetail.GetGerrit().GetInfo().GetNumber(),
req := &bbpb.SearchBuildsRequest{
Predicate: &bbpb.BuildPredicate{
GerritChanges: []*bbpb.GerritChange{gc},
IncludeExperimental: true,
Mask: searchBuildsMask,
for ps := changeDetail.GetPatchset(); ps >= changeDetail.GetMinEquivalentPatchset(); ps-- {
gc.Patchset = int64(ps)
req.PageToken = ""
for {
if sw.shouldStop() {
res, err := sw.bbClient.SearchBuilds(ctx, req)
if err != nil {
resultCh <- searchResult{err: errors.Annotate(err, "failed to call buildbucket.SearchBuilds").Tag(transient.Tag).Err()}
for _, build := range res.GetBuilds() {
if def, ok := sw.canUseBuild(build); ok {
tj, err := sw.toTryjob(ctx, build, def)
if err != nil {
resultCh <- searchResult{err: err}
resultCh <- searchResult{tryjob: tj}
if res.NextPageToken == "" {
req.PageToken = res.NextPageToken
func (sw searchWorker) canUseBuild(build *bbpb.Build) (*tryjob.Definition, bool) {
switch def, matchBuilder := sw.builderToDefinition[bbutil.FormatBuilderID(build.GetBuilder())]; {
case !matchBuilder:
case !sw.matchCLs(build):
case hasAdditionalProperties(build):
return def, true
return nil, false
func (sw searchWorker) matchCLs(build *bbpb.Build) bool {
gcs := build.GetInput().GetGerritChanges()
changeToPatchset := make(map[string]int64, len(gcs))
for _, gc := range gcs {
changeToPatchset[formatChangeID(gc.GetHost(), gc.GetChange())] = gc.GetPatchset()
if len(changeToPatchset) != len(sw.acceptedCLRanges) {
return false
for changeID, ps := range changeToPatchset {
switch psRange, ok := sw.acceptedCLRanges[changeID]; {
case !ok:
return false
case ps < psRange.minIncl:
return false
case ps > psRange.maxIncl:
return false
return true
func hasAdditionalProperties(build *bbpb.Build) bool {
props := build.GetInfra().GetBuildbucket().GetRequestedProperties().GetFields()
for key := range props {
if !AcceptedAdditionalPropKeys.Has(key) {
return true
return false
func formatChangeID(host string, changeNum int64) string {
return fmt.Sprintf("%s/%d", host, changeNum)
func (sw *searchWorker) toTryjob(ctx context.Context, build *bbpb.Build, def *tryjob.Definition) (*tryjob.Tryjob, error) {
status, result, err := parseStatusAndResult(ctx, build)
if err != nil {
return nil, err
return &tryjob.Tryjob{
ExternalID: tryjob.MustBuildbucketID(sw.bbHost, build.Id),
Definition: def,
Status: status,
Result: result,
}, nil