blob: b0bfbcee5f69bb1acc7e6a85c2f5991a9fa38696 [file] [log] [blame]
// Copyright 2022 The LUCI Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package bbfacade
import (
"context"
"encoding/json"
"fmt"
"sort"
"strconv"
"strings"
"google.golang.org/grpc/status"
"google.golang.org/protobuf/encoding/protojson"
"google.golang.org/protobuf/types/known/structpb"
bbpb "go.chromium.org/luci/buildbucket/proto"
"go.chromium.org/luci/common/data/stringset"
"go.chromium.org/luci/common/errors"
"go.chromium.org/luci/common/sync/parallel"
"go.chromium.org/luci/cv/api/recipe/v1"
"go.chromium.org/luci/cv/internal/run"
"go.chromium.org/luci/cv/internal/tryjob"
)
// propertyKey is the top-level input property key under which the JSON form of
// CV's recipe input (recipe.Input) is placed for every launched build.
const propertyKey = "$recipe_engine/cq"
// Launch schedules the given Tryjobs in Buildbucket on behalf of the Run.
//
// Every scheduled build carries information derived from the Run (e.g. the Run
// mode) and includes all provided CLs. Tryjobs are grouped by Buildbucket host
// and the hosts are processed concurrently (at most 8 at a time).
//
// Tryjobs that are scheduled in Buildbucket are updated in place; the
// following fields are set:
//   - ExternalID
//   - Status
//   - Result
//
// Returns nil if every Tryjob was launched successfully. Otherwise, returns an
// `errors.MultiError` index-aligned with `tryjobs`, where each element is the
// launch error of the corresponding Tryjob.
//
// The Tryjob ID is used as the Buildbucket request ID, so at most one build is
// scheduled per Tryjob within Buildbucket's deduplication window (currently
// 1 min.).
func (f *Facade) Launch(ctx context.Context, tryjobs []*tryjob.Tryjob, r *run.Run, cls []*run.RunCL) error {
	grouped := splitTryjobsByHost(tryjobs)
	indexOf := make(map[*tryjob.Tryjob]int, len(tryjobs))
	for idx, tj := range tryjobs {
		indexOf[tj] = idx
	}

	errs := errors.NewLazyMultiError(len(tryjobs))
	poolErr := parallel.WorkPool(min(len(grouped), 8), func(work chan<- func() error) {
		for host, hostTryjobs := range grouped {
			host, hostTryjobs := host, hostTryjobs // per-iteration copies for the closure
			work <- func() error {
				err := f.schedule(ctx, host, r, cls, hostTryjobs)
				if err == nil {
					return nil
				}
				merrs, perTryjob := err.(errors.MultiError)
				for i, tj := range hostTryjobs {
					if perTryjob {
						// schedule reports one error slot per tryjob, in the
						// same order as hostTryjobs.
						errs.Assign(indexOf[tj], merrs[i])
					} else {
						// A whole-batch failure applies to every tryjob.
						errs.Assign(indexOf[tj], err)
					}
				}
				// Failures are recorded in errs; the work item itself never fails.
				return nil
			}
		}
	})
	if poolErr != nil {
		// Every work item returns nil, so the pool cannot report an error.
		panic(fmt.Errorf("impossible"))
	}
	return errs.Get()
}
// splitTryjobsByHost groups tryjobs by the Buildbucket host they target. Within
// each group, tryjobs keep their relative order from the input.
//
// Panics if any tryjob is not defined as a Buildbucket tryjob.
func splitTryjobsByHost(tryjobs []*tryjob.Tryjob) map[string][]*tryjob.Tryjob {
	byHost := make(map[string][]*tryjob.Tryjob, 2) // normally, at most 2 host.
	for _, tj := range tryjobs {
		def := tj.Definition
		bb := def.GetBuildbucket()
		if bb == nil {
			panic(fmt.Errorf("launch non-Buildbucket Tryjob (%T) with Buildbucket backend", def.GetBackend()))
		}
		host := bb.GetHost()
		byHost[host] = append(byHost[host], tj)
	}
	return byHost
}
// schedule sends one Buildbucket batch request to `host` that schedules a build
// for each of the given tryjobs (all of which must target `host`).
//
// For every build Buildbucket returns, the corresponding tryjob's ExternalID,
// Status and Result are updated in place.
//
// Returns a single (non-per-tryjob) error when the whole batch fails: the
// client cannot be created, the request cannot be built, the Batch RPC fails,
// or the response does not contain exactly one entry per request. Otherwise,
// returns nil or an `errors.MultiError` index-aligned with `tryjobs`.
func (f *Facade) schedule(ctx context.Context, host string, r *run.Run, cls []*run.RunCL, tryjobs []*tryjob.Tryjob) error {
	bbClient, err := f.ClientFactory.MakeClient(ctx, host, r.ID.LUCIProject())
	if err != nil {
		return errors.Annotate(err, "failed to create Buildbucket client").Err()
	}
	batchReq, err := prepareBatchRequest(tryjobs, r, cls)
	if err != nil {
		return errors.Annotate(err, "failed to create batch schedule build request").Err()
	}
	batchRes, err := bbClient.Batch(ctx, batchReq)
	if err != nil {
		return err
	}
	responses := batchRes.GetResponses()
	// Responses are matched to tryjobs by position. Without this check, too few
	// responses would leave some tryjobs silently unlaunched (no ID, no error),
	// and too many would index past the end of `tryjobs`.
	if len(responses) != len(tryjobs) {
		return fmt.Errorf("expected %d responses from Buildbucket batch request, got %d", len(tryjobs), len(responses))
	}
	ret := errors.NewLazyMultiError(len(tryjobs))
	for i, res := range responses {
		switch res.GetResponse().(type) {
		case *bbpb.BatchResponse_Response_ScheduleBuild:
			build := res.GetScheduleBuild()
			// `st` (not `status`) avoids shadowing the grpc status package
			// used in the error case below.
			st, result, parseErr := parseStatusAndResult(ctx, build)
			if parseErr != nil {
				ret.Assign(i, parseErr)
			}
			// A build was returned, so record it on the tryjob even when
			// parsing its status failed.
			tj := tryjobs[i]
			tj.ExternalID = tryjob.MustBuildbucketID(host, build.Id)
			tj.Status = st
			tj.Result = result
		case *bbpb.BatchResponse_Response_Error:
			ret.Assign(i, status.ErrorProto(res.GetError()))
		default:
			panic(fmt.Errorf("unexpected response type: %T", res.GetResponse()))
		}
	}
	return ret.Get()
}
// prepareBatchRequest builds a Buildbucket BatchRequest with one ScheduleBuild
// request per tryjob, in the same order as `tryjobs`.
//
// All requests share the same Gerrit changes, and the input properties and tags
// are computed once for the Run. Experimental tryjobs get the experimental
// variant of both. Each request uses the tryjob ID as its request ID so that
// Buildbucket can deduplicate retries.
func prepareBatchRequest(tryjobs []*tryjob.Tryjob, r *run.Run, cls []*run.RunCL) (*bbpb.BatchRequest, error) {
	changes := makeGerritChanges(cls)
	props, expProps, err := makeProperties(r.Mode)
	if err != nil {
		return nil, errors.Annotate(err, "failed to make input properties").Err()
	}
	tags, expTags, err := makeTags(r, cls)
	if err != nil {
		return nil, errors.Annotate(err, "failed to make tags").Err()
	}

	reqs := make([]*bbpb.BatchRequest_Request, 0, len(tryjobs))
	for _, tj := range tryjobs {
		def := tj.Definition
		reqProps, reqTags := props, tags
		if def.GetExperimental() {
			reqProps, reqTags = expProps, expTags
		}
		reqs = append(reqs, &bbpb.BatchRequest_Request{
			Request: &bbpb.BatchRequest_Request_ScheduleBuild{
				ScheduleBuild: &bbpb.ScheduleBuildRequest{
					RequestId:     strconv.Itoa(int(tj.ID)),
					Builder:       def.GetBuildbucket().GetBuilder(),
					Properties:    reqProps,
					GerritChanges: changes,
					Tags:          reqTags,
					Mask:          defaultMask,
				},
			},
		})
	}
	return &bbpb.BatchRequest{Requests: reqs}, nil
}
// makeProperties returns the CV input properties for a Run in the given mode.
// It returns the variant for non-experimental tryjobs first, then the variant
// for experimental tryjobs. The two differ only in the recipe input's
// Experimental field.
func makeProperties(mode run.Mode) (nonexp, exp *structpb.Struct, err error) {
	build := func(experimental bool) (*structpb.Struct, error) {
		return makeCVProperties(&recipe.Input{
			Active:       true,
			DryRun:       mode == run.DryRun,
			RunMode:      string(mode),
			TopLevel:     true,
			Experimental: experimental,
		})
	}
	if nonexp, err = build(false); err != nil {
		return nil, nil, err
	}
	if exp, err = build(true); err != nil {
		return nil, nil, err
	}
	return nonexp, exp, nil
}
// makeCVProperties wraps the recipe input in a Struct of the form
// {propertyKey: <JSON form of in>}.
//
// The proto is round-tripped through protojson and encoding/json so that the
// nested value is a plain JSON object keyed by the proto's JSON field names.
func makeCVProperties(in *recipe.Input) (*structpb.Struct, error) {
	var asMap map[string]interface{}
	encoded, err := protojson.Marshal(in)
	if err == nil {
		err = json.Unmarshal(encoded, &asMap)
	}
	if err != nil {
		return nil, err
	}
	return structpb.NewStruct(map[string]interface{}{
		propertyKey: asMap,
	})
}
// makeGerritChanges converts each Run CL into a Buildbucket GerritChange,
// preserving the order of `cls`.
//
// Panics if any CL is not a Gerrit change; no other change backend is
// supported.
func makeGerritChanges(cls []*run.RunCL) []*bbpb.GerritChange {
	changes := make([]*bbpb.GerritChange, 0, len(cls))
	for _, cl := range cls {
		detail := cl.Detail
		gerrit := detail.GetGerrit()
		if gerrit == nil {
			panic(fmt.Errorf("change backend (%T) is not supported", detail.GetKind()))
		}
		info := gerrit.GetInfo()
		changes = append(changes, &bbpb.GerritChange{
			Host:     gerrit.GetHost(),
			Project:  info.GetProject(),
			Change:   info.GetNumber(),
			Patchset: int64(detail.GetPatchset()),
		})
	}
	return changes
}
// makeTags builds the Buildbucket tags for builds launched on behalf of the
// Run. It returns the tags for non-experimental tryjobs first, then the tags
// for experimental tryjobs.
//
// Both results share the same common tags:
//   - user agent
//   - attempt key
//   - CL group keys
//   - one tag per distinct CL owner
//   - one tag per distinct triggerer
//
// In addition, each result has a "cq_experimental" tag set to "false" or
// "true" respectively. Each result is sorted by key, then by value.
//
// Returns an error if the owner identity of any CL cannot be determined.
func makeTags(r *run.Run, cls []*run.RunCL) (nonExp, exp []*bbpb.StringPair, err error) {
	common := []*bbpb.StringPair{
		{Key: "user_agent", Value: "cq"},
		{Key: "cq_attempt_key", Value: r.ID.AttemptKey()},
		{Key: "cq_cl_group_key", Value: run.ComputeCLGroupKey(cls, false)},
		{Key: "cq_equivalent_cl_group_key", Value: run.ComputeCLGroupKey(cls, true)},
	}

	owners := stringset.New(1) // normally 1 owner 1 triggerer
	triggerers := stringset.New(1)
	for _, cl := range cls {
		ownerID, idErr := cl.Detail.OwnerIdentity()
		if idErr != nil {
			return nil, nil, idErr
		}
		owners.Add(ownerID.Email())
		triggerers.Add(cl.Trigger.GetEmail())
	}
	for _, email := range owners.ToSlice() {
		common = append(common, &bbpb.StringPair{Key: "cq_cl_owner", Value: email})
	}
	for _, email := range triggerers.ToSlice() {
		common = append(common, &bbpb.StringPair{Key: "cq_triggerer", Value: email})
	}
	// TODO(crbug/1323978): support custom tags

	// withExperimental builds a fresh slice with the "cq_experimental" tag set
	// to `value`, appends the common tags to it, and returns it sorted.
	withExperimental := func(value string) []*bbpb.StringPair {
		tags := make([]*bbpb.StringPair, 0, len(common)+1)
		tags = append(tags, &bbpb.StringPair{Key: "cq_experimental", Value: value})
		tags = append(tags, common...)
		sortTags(tags)
		return tags
	}
	return withExperimental("false"), withExperimental("true"), nil
}
// sortTags sorts tags in place in ascending order by key. Tags with equal keys
// are ordered by value.
func sortTags(tags []*bbpb.StringPair) {
	sort.Slice(tags, func(i, j int) bool {
		a, b := tags[i], tags[j]
		if c := strings.Compare(a.GetKey(), b.GetKey()); c != 0 {
			return c < 0
		}
		return strings.Compare(a.GetValue(), b.GetValue()) < 0
	})
}
// min returns the smaller of the two integers.
func min(i, j int) int {
	if j < i {
		return j
	}
	return i
}