blob: 9a84881cee93ba8854fb7e3bb52f8a25c08a7f50 [file] [log] [blame]
// Copyright 2022 The LUCI Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package bbfacade
import (
"context"
"encoding/json"
"fmt"
"slices"
"strconv"
"strings"
"sync"
"google.golang.org/grpc/status"
"google.golang.org/protobuf/encoding/protojson"
"google.golang.org/protobuf/types/known/structpb"
"go.chromium.org/luci/auth/identity"
bbpb "go.chromium.org/luci/buildbucket/proto"
"go.chromium.org/luci/common/data/stringset"
"go.chromium.org/luci/common/errors"
"go.chromium.org/luci/common/logging"
"go.chromium.org/luci/common/retry/transient"
"go.chromium.org/luci/common/sync/parallel"
"go.chromium.org/luci/server/auth"
"go.chromium.org/luci/cv/api/recipe/v1"
"go.chromium.org/luci/cv/internal/buildbucket"
"go.chromium.org/luci/cv/internal/run"
"go.chromium.org/luci/cv/internal/tryjob"
)
// TODO: crbug/333811087 - remove after cq module is deleted
const legacyPropertyKey = "$recipe_engine/cq"
const propertyKey = "$recipe_engine/cv"
// Launch schedules requested Tryjobs in Buildbucket.
//
// The Tryjobs will include relevant info from the Run (e.g. Run mode) and
// involves all provided CLs.
//
// Updates the Tryjobs that are scheduled successfully in Buildbucket in place.
// The following fields will be updated:
// - ExternalID
// - Status
// - Result
//
// Returns nil if all tryjobs have been successfully launched. Otherwise,
// returns `errors.MultiError` where each element is the launch error of
// the corresponding Tryjob.
//
// Uses Tryjob ID as the request key for deduplication. This ensures only one
// Buildbucket build will be scheduled for one Tryjob within the deduplication
// window (currently 1 min. in Buildbucket).
func (f *Facade) Launch(ctx context.Context, tryjobs []*tryjob.Tryjob, r *run.Run, cls []*run.RunCL) error {
tryjobsByHost := splitTryjobsByHost(tryjobs)
tryjobToIndex := make(map[*tryjob.Tryjob]int, len(tryjobs))
for i, tj := range tryjobs {
tryjobToIndex[tj] = i
}
launchErrs := errors.NewLazyMultiError(len(tryjobs))
poolErr := parallel.WorkPool(min(len(tryjobsByHost), 8), func(work chan<- func() error) {
for host, tryjobs := range tryjobsByHost {
host, tryjobs := host, tryjobs
work <- func() error {
err := f.schedule(ctx, host, r, cls, tryjobs)
var merrs errors.MultiError
switch ok := errors.As(err, &merrs); {
case err == nil:
case !ok:
// assign singular error to all tryjobs.
for _, tj := range tryjobs {
launchErrs.Assign(tryjobToIndex[tj], err)
}
default:
for i, tj := range tryjobs {
launchErrs.Assign(tryjobToIndex[tj], merrs[i])
}
}
return nil
}
}
})
if poolErr != nil {
panic(fmt.Errorf("impossible"))
}
return launchErrs.Get()
}
func splitTryjobsByHost(tryjobs []*tryjob.Tryjob) map[string][]*tryjob.Tryjob {
ret := make(map[string][]*tryjob.Tryjob, 2) // normally, at most 2 host.
for _, tj := range tryjobs {
bbDef := tj.Definition.GetBuildbucket()
if bbDef == nil {
panic(fmt.Errorf("launch non-Buildbucket Tryjob (%T) with Buildbucket backend", tj.Definition.GetBackend()))
}
ret[bbDef.GetHost()] = append(ret[bbDef.GetHost()], tj)
}
return ret
}
func (f *Facade) schedule(ctx context.Context, host string, r *run.Run, cls []*run.RunCL, tryjobs []*tryjob.Tryjob) error {
bbClient, err := f.ClientFactory.MakeClient(ctx, host, r.ID.LUCIProject())
if err != nil {
return errors.Annotate(err, "failed to create Buildbucket client").Err()
}
batches, err := prepareBatches(ctx, tryjobs, r, cls)
if err != nil {
return errors.Annotate(err, "failed to create batch schedule build requests").Err()
}
scheduleErrs := errors.NewLazyMultiError(len(tryjobs))
var wg sync.WaitGroup
for _, batch := range batches {
batch := batch
wg.Add(1)
go func() {
defer wg.Done()
err := makeRequestAndUpdateTryjobs(ctx, bbClient, batch, host)
var merrs errors.MultiError
switch ok := errors.As(err, &merrs); {
case err == nil:
case !ok:
// assign singular error to all tryjobs.
for i := 0; i < len(batch.tryjobs); i++ {
scheduleErrs.Assign(batch.offset+i, err)
}
default:
for i, merr := range merrs {
scheduleErrs.Assign(batch.offset+i, merr)
}
}
}()
}
wg.Wait()
return scheduleErrs.Get()
}
func makeRequestAndUpdateTryjobs(ctx context.Context, bbClient buildbucket.Client, batch batch, host string) error {
logging.Debugf(ctx, "scheduling a batch of %d builds against buildbucket", len(batch.req.GetRequests()))
res, err := bbClient.Batch(ctx, batch.req)
if err != nil {
return err
}
ret := errors.NewLazyMultiError(len(res.GetResponses()))
for i, res := range res.GetResponses() {
switch res.GetResponse().(type) {
case *bbpb.BatchResponse_Response_ScheduleBuild:
build := res.GetScheduleBuild()
status, result, err := parseStatusAndResult(ctx, build)
if err != nil {
ret.Assign(i, err)
}
tj := batch.tryjobs[i]
tj.ExternalID = tryjob.MustBuildbucketID(host, build.Id)
tj.Status = status
tj.Result = result
case *bbpb.BatchResponse_Response_Error:
ret.Assign(i, status.ErrorProto(res.GetError()))
case nil:
logging.Errorf(ctx, "FIXME(crbug/1359509): received nil response at index %d for batch request:\n%s", i, batch.req)
ret.Assign(i, transient.Tag.Apply(errors.New("unexpected nil response from Buildbucket")))
default:
panic(fmt.Errorf("unexpected response type: %T", res.GetResponse()))
}
}
return ret.Get()
}
// limit comes from buildbucket:
// https://pkg.go.dev/go.chromium.org/luci/buildbucket/proto#BatchRequest
const maxBatchSize = 200
// batch represents a batch request that contains smaller than $maxBatchSize
// of tryjobs
type batch struct {
req *bbpb.BatchRequest
tryjobs []*tryjob.Tryjob // the tryjobs to launch in batch request
offset int // offset of the tryjobs in the overall tryjob slice
}
func prepareBatches(ctx context.Context, tryjobs []*tryjob.Tryjob, r *run.Run, cls []*run.RunCL) ([]batch, error) {
gcs := makeGerritChanges(cls)
nonOptProp, optProp, err := makeProperties(ctx, r.Mode, r.Owner)
if err != nil {
return nil, errors.Annotate(err, "failed to make input properties").Err()
}
nonOptTags, optTags, err := makeTags(r, cls)
if err != nil {
return nil, errors.Annotate(err, "failed to make tags").Err()
}
requests := make([]*bbpb.BatchRequest_Request, len(tryjobs))
for i, tj := range tryjobs {
def := tj.Definition
req := &bbpb.ScheduleBuildRequest{
RequestId: strconv.Itoa(int(tj.ID)),
Builder: def.GetBuildbucket().GetBuilder(),
Properties: nonOptProp,
GerritChanges: gcs,
Tags: nonOptTags,
Mask: defaultMask,
}
if def.GetOptional() {
req.Properties = optProp
req.Tags = optTags
}
if experiments := tj.Definition.GetExperiments(); len(experiments) > 0 {
req.Experiments = make(map[string]bool, len(experiments))
for _, exp := range experiments {
req.Experiments[exp] = true
}
}
requests[i] = &bbpb.BatchRequest_Request{
Request: &bbpb.BatchRequest_Request_ScheduleBuild{
ScheduleBuild: req,
},
}
}
// creating $numBatch of batches with similar number of requests in each
// batch.
numBatch := (len(requests)-1)/maxBatchSize + 1
batchSize := len(requests) / numBatch
ret := make([]batch, 0, numBatch)
for offset := 0; offset < len(requests); {
req := &bbpb.BatchRequest{
Requests: requests[offset:min(offset+batchSize, len(requests))],
}
ret = append(ret, batch{
req: req,
tryjobs: tryjobs[offset:min(offset+batchSize, len(requests))],
offset: offset,
})
offset += len(req.GetRequests())
}
return ret, nil
}
func makeProperties(ctx context.Context, mode run.Mode, owner identity.Identity) (nonOpt, opt *structpb.Struct, err error) {
isGoogler, err := auth.GetState(ctx).DB().IsMember(ctx, owner, []string{"googlers"})
if err != nil {
return nil, nil, err
}
in := &recipe.Input{
Active: true,
DryRun: mode == run.DryRun,
RunMode: string(mode),
TopLevel: true,
OwnerIsGoogler: isGoogler,
}
if nonOpt, err = makeCVProperties(in); err != nil {
return nil, nil, err
}
in.Experimental = true
if opt, err = makeCVProperties(in); err != nil {
return nil, nil, err
}
return nonOpt, opt, nil
}
func makeCVProperties(in *recipe.Input) (*structpb.Struct, error) {
b, err := protojson.Marshal(in)
if err != nil {
return nil, err
}
var raw map[string]any
if err := json.Unmarshal(b, &raw); err != nil {
return nil, err
}
return structpb.NewStruct(map[string]any{
propertyKey: raw,
legacyPropertyKey: raw,
})
}
func makeGerritChanges(cls []*run.RunCL) []*bbpb.GerritChange {
ret := make([]*bbpb.GerritChange, len(cls))
for i, cl := range cls {
g := cl.Detail.GetGerrit()
if g == nil {
panic(fmt.Errorf("change backend (%T) is not supported", cl.Detail.GetKind()))
}
ret[i] = &bbpb.GerritChange{
Host: g.GetHost(),
Project: g.GetInfo().GetProject(),
Change: g.GetInfo().GetNumber(),
Patchset: int64(cl.Detail.GetPatchset()),
}
}
return ret
}
func makeTags(r *run.Run, cls []*run.RunCL) (nonOpt, opt []*bbpb.StringPair, err error) {
var commonTags []*bbpb.StringPair
addTag := func(key string, values ...string) {
for _, v := range values {
commonTags = append(commonTags, &bbpb.StringPair{Key: key, Value: strings.TrimSpace(v)})
}
}
addTag("user_agent", "cq")
addTag("cq_attempt_key", r.ID.AttemptKey())
addTag("cq_cl_group_key", run.ComputeCLGroupKey(cls, false))
addTag("cq_equivalent_cl_group_key", run.ComputeCLGroupKey(cls, true))
owners := stringset.New(1) // normally 1 owner 1 triggerer
triggerers := stringset.New(1)
for _, cl := range cls {
ownerID, err := cl.Detail.OwnerIdentity()
if err != nil {
return nil, nil, err
}
owners.Add(ownerID.Email())
triggerers.Add(cl.Trigger.GetEmail())
}
addTag("cq_cl_owner", owners.ToSlice()...)
addTag("cq_triggerer", triggerers.ToSlice()...)
addTag("cq_cl_tag", r.Options.GetCustomTryjobTags()...)
nonOpt = append([]*bbpb.StringPair{{Key: "cq_experimental", Value: "false"}}, commonTags...)
opt = append([]*bbpb.StringPair{{Key: "cq_experimental", Value: "true"}}, commonTags...)
sortTags(nonOpt)
sortTags(opt)
return nonOpt, opt, nil
}
func sortTags(tags []*bbpb.StringPair) {
slices.SortFunc(tags, func(a, b *bbpb.StringPair) int {
if res := strings.Compare(a.GetKey(), b.GetKey()); res != 0 {
return res
}
return strings.Compare(a.GetValue(), b.GetValue())
})
}