| // Copyright 2020 The LUCI Authors. |
| // |
| // Licensed under the Apache License, Version 2.0 (the "License"); |
| // you may not use this file except in compliance with the License. |
| // You may obtain a copy of the License at |
| // |
| // http://www.apache.org/licenses/LICENSE-2.0 |
| // |
| // Unless required by applicable law or agreed to in writing, software |
| // distributed under the License is distributed on an "AS IS" BASIS, |
| // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| // See the License for the specific language governing permissions and |
| // limitations under the License. |
| |
| package job |
| |
| import ( |
| "context" |
| "encoding/hex" |
| "fmt" |
| "path" |
| "sort" |
| "strings" |
| "time" |
| |
| "github.com/golang/protobuf/ptypes" |
| "google.golang.org/protobuf/proto" |
| durpb "google.golang.org/protobuf/types/known/durationpb" |
| |
| "go.chromium.org/luci/buildbucket/cmd/bbagent/bbinput" |
| "go.chromium.org/luci/common/clock" |
| "go.chromium.org/luci/common/data/rand/cryptorand" |
| "go.chromium.org/luci/common/data/stringset" |
| "go.chromium.org/luci/common/errors" |
| "go.chromium.org/luci/common/flag/flagenum" |
| "go.chromium.org/luci/led/job/experiments" |
| logdog_types "go.chromium.org/luci/logdog/common/types" |
| swarmingpb "go.chromium.org/luci/swarming/proto/api_v2" |
| ) |
| |
| type cipdInput struct { |
| Package string `json:"package"` |
| Version string `json:"version"` |
| } |
| |
| type ledProperties struct { |
| LedRunID string `json:"led_run_id"` |
| |
| RbeCasInput *swarmingpb.CASReference `json:"rbe_cas_input,omitempty"` |
| |
| CIPDInput *cipdInput `json:"cipd_input,omitempty"` |
| |
| ShadowedBucket string `json:"shadowed_bucket"` |
| } |
| |
| // For accepting the "-resultdb" flag of "led launch". |
| type RDBEnablement string |
| |
| func (r *RDBEnablement) String() string { |
| return string(*r) |
| } |
| |
| func (r *RDBEnablement) Set(v string) error { |
| return RdbChoices.FlagSet(r, v) |
| } |
| |
| const ( |
| // Swarming/ResultDB integration will be forcefully enabled. |
| RDBOn RDBEnablement = "on" |
| // Swarming/ResultDB integration will be forcefully disabled. |
| RDBOff RDBEnablement = "off" |
| ) |
| |
| var RdbChoices = flagenum.Enum{ |
| "on": RDBOn, |
| "off": RDBOff, |
| } |
| |
| func (jd *Definition) addLedProperties(ctx context.Context, uid string) (err error) { |
| // Set the "$recipe_engine/led" recipe properties. |
| bb := jd.GetBuildbucket() |
| if bb == nil { |
| panic("impossible: Buildbucket is nil while flattening to swarming") |
| } |
| bb.EnsureBasics() |
| |
| bb.BbagentArgs.Build.CreateTime, err = ptypes.TimestampProto(clock.Now(ctx)) |
| if err != nil { |
| return errors.Annotate(err, "populating creation time").Err() |
| } |
| |
| buf := make([]byte, 32) |
| if _, err := cryptorand.Read(ctx, buf); err != nil { |
| return errors.Annotate(err, "generating random token").Err() |
| } |
| logdogPrefixSN, err := logdog_types.MakeStreamName("", "led", uid, hex.EncodeToString(buf)) |
| if err != nil { |
| return errors.Annotate(err, "generating logdog token").Err() |
| } |
| logdogPrefix := string(logdogPrefixSN) |
| logdogProjectPrefix := path.Join(bb.BbagentArgs.Build.Infra.Logdog.Project, logdogPrefix) |
| |
| // TODO(iannucci): change logdog project to something reserved to 'led' tasks. |
| // Though if we merge logdog into resultdb, this hopefully becomes moot. |
| bb.BbagentArgs.Build.Infra.Logdog.Prefix = logdogPrefix |
| |
| // Pass the CIPD package or isolate containing the recipes code into |
| // the led recipe module. This gives the build the information it needs |
| // to launch child builds using the same version of the recipes code. |
| // |
| // The logdog prefix is unique to each led job, so it can be used as an |
| // ID for the job. |
| props := ledProperties{LedRunID: logdogProjectPrefix} |
| casUserPayload, err := jd.Info().CurrentIsolated() |
| if err != nil { |
| return errors.Annotate(err, "failed to get CAS user payload for the build").Err() |
| } |
| if exe := bb.GetBbagentArgs().GetBuild().GetExe(); exe.GetCipdPackage() != "" { |
| props.CIPDInput = &cipdInput{ |
| Package: exe.CipdPackage, |
| Version: exe.CipdVersion, |
| } |
| } else if casUserPayload.GetDigest() != nil { |
| props.RbeCasInput = proto.Clone(casUserPayload).(*swarmingpb.CASReference) |
| } |
| |
| // in case both isolate and rbe-cas properties are set in "$recipe_engine/led". |
| bb.WriteProperties(map[string]any{ |
| "$recipe_engine/led": nil, |
| }) |
| bb.WriteProperties(map[string]any{ |
| "$recipe_engine/led": props, |
| }) |
| |
| streamName := "build.proto" |
| if bb.LegacyKitchen { |
| streamName = "annotations" |
| } |
| |
| logdogHost := "logs.chromium.org" |
| if strings.Contains(jd.Info().SwarmingHostname(), "-dev") { |
| logdogHost = "luci-logdog-dev.appspot.com" |
| } |
| |
| logdogTag := "log_location:logdog://" + path.Join( |
| logdogHost, logdogProjectPrefix, "+", streamName) |
| |
| return jd.Edit(func(je Editor) { |
| je.Tags([]string{logdogTag, "allow_milo:1"}) |
| }) |
| } |
| |
| type expiringDims struct { |
| absolute time.Duration // from scheduling task |
| relative time.Duration // from previous slice |
| |
| // key -> values |
| dimensions map[string]stringset.Set |
| } |
| |
| func (ed *expiringDims) addDimVals(key string, values ...string) { |
| if ed.dimensions == nil { |
| ed.dimensions = map[string]stringset.Set{} |
| } |
| if set, ok := ed.dimensions[key]; !ok { |
| ed.dimensions[key] = stringset.NewFromSlice(values...) |
| } else { |
| set.AddAll(values) |
| } |
| } |
| |
| func (ed *expiringDims) updateFrom(other *expiringDims) { |
| for key, values := range other.dimensions { |
| ed.addDimVals(key, values.ToSlice()...) |
| } |
| } |
| |
| func (ed *expiringDims) createWith(template *swarmingpb.TaskProperties) *swarmingpb.TaskProperties { |
| if len(template.Dimensions) != 0 { |
| panic("impossible; createWith called with dimensions already set") |
| } |
| |
| ret := proto.Clone(template).(*swarmingpb.TaskProperties) |
| |
| newDims := make([]*swarmingpb.StringPair, 0, len(ed.dimensions)) |
| for _, key := range keysOf(ed.dimensions) { |
| for _, value := range ed.dimensions[key].ToSortedSlice() { |
| newDims = append(newDims, &swarmingpb.StringPair{ |
| Key: key, Value: value}) |
| } |
| } |
| ret.Dimensions = newDims |
| |
| return ret |
| } |
| |
| func (jd *Definition) makeExpiringSliceData() (ret []*expiringDims, err error) { |
| bb := jd.GetBuildbucket() |
| expirationSet := map[time.Duration]*expiringDims{} |
| nonExpiring := &expiringDims{} |
| getExpiringSlot := func(dimType, name string, protoDuration *durpb.Duration) (*expiringDims, error) { |
| var dur time.Duration |
| if protoDuration != nil { |
| var err error |
| if dur, err = ptypes.Duration(protoDuration); err != nil { |
| return nil, errors.Annotate(err, "parsing %s %q expiration", dimType, name).Err() |
| } |
| } |
| if dur > 0 { |
| data, ok := expirationSet[dur] |
| if !ok { |
| data = &expiringDims{absolute: dur} |
| expirationSet[dur] = data |
| } |
| return data, nil |
| } |
| return nil, nil |
| } |
| // Cache and dimension expiration have opposite defaults for 0 or negative |
| // times. |
| // |
| // Cache entries with WaitForWarmCache <= 0 mean that the dimension for the |
| // cache essentially expires at 0. |
| // |
| // Dimension entries with Expiration <= 0 mean that the dimension expires at |
| // 'infinity' |
| for _, cache := range bb.BbagentArgs.GetBuild().GetInfra().GetSwarming().GetCaches() { |
| slot, err := getExpiringSlot("cache", cache.Name, cache.WaitForWarmCache) |
| if err != nil { |
| return nil, err |
| } |
| if slot != nil { |
| slot.addDimVals("caches", cache.Name) |
| } |
| } |
| for _, dim := range bb.BbagentArgs.GetBuild().GetInfra().GetSwarming().GetTaskDimensions() { |
| slot, err := getExpiringSlot("dimension", dim.Key, dim.Expiration) |
| if err != nil { |
| return nil, err |
| } |
| if slot == nil { |
| slot = nonExpiring |
| } |
| slot.addDimVals(dim.Key, dim.Value) |
| } |
| |
| ret = make([]*expiringDims, 0, len(expirationSet)) |
| if len(expirationSet) > 0 { |
| for _, data := range expirationSet { |
| ret = append(ret, data) |
| } |
| sort.Slice(ret, func(i, j int) bool { |
| return ret[i].absolute < ret[j].absolute |
| }) |
| ret[0].relative = ret[0].absolute |
| for i := range ret[1:] { |
| ret[i+1].relative = ret[i+1].absolute - ret[i].absolute |
| } |
| } |
| if total, err := ptypes.Duration(bb.BbagentArgs.Build.SchedulingTimeout); err == nil { |
| if len(ret) == 0 || ret[len(ret)-1].absolute < total { |
| // if the task's total expiration time is greater than the last slice's |
| // expiration, then use nonExpiring as the last slice. |
| nonExpiring.absolute = total |
| if len(ret) > 0 { |
| nonExpiring.relative = total - ret[len(ret)-1].absolute |
| } else { |
| nonExpiring.relative = total |
| } |
| ret = append(ret, nonExpiring) |
| } else { |
| // otherwise, add all of nonExpiring's guts to the last slice. |
| ret[len(ret)-1].updateFrom(nonExpiring) |
| } |
| } |
| |
| // Ret now looks like: |
| // rel @ 20s - caches:[a b c] |
| // rel @ 40s - caches:[d e] |
| // rel @ inf - caches:[f] |
| // |
| // We need to transform this into: |
| // rel @ 20s - caches:[a b c d e f] |
| // rel @ 40s - caches:[d e f] |
| // rel @ inf - caches:[f] |
| // |
| // Since a slice expiring at 20s includes all the caches (and dimensions) of |
| // all slices expiring after it. |
| for i := len(ret) - 2; i >= 0; i-- { |
| ret[i].updateFrom(ret[i+1]) |
| } |
| |
| return |
| } |
| |
| func (jd *Definition) generateCommand(ctx context.Context, ks KitchenSupport) ([]string, error) { |
| bb := jd.GetBuildbucket() |
| |
| if bb.LegacyKitchen { |
| return ks.GenerateCommand(ctx, bb) |
| } |
| |
| ret := []string{"bbagent${EXECUTABLE_SUFFIX}"} |
| if bb.FinalBuildProtoPath != "" { |
| ret = append(ret, "--output", path.Join("${ISOLATED_OUTDIR}", bb.FinalBuildProtoPath)) |
| } |
| bb.BbagentArgs.Build.Infra.Buildbucket.Hostname = "" |
| if bb.BbagentArgs.CacheDir == "" { |
| bb.BbagentArgs.CacheDir = bb.BbagentArgs.Build.GetInfra().GetBbagent().GetCacheDir() |
| } |
| if bb.BbagentArgs.PayloadPath == "" { |
| bb.BbagentArgs.PayloadPath = "kitchen-checkout" |
| } |
| return append(ret, bbinput.Encode(bb.BbagentArgs)), nil |
| } |
| |
| func (jd *Definition) generateCIPDPackages() (cipdPackages []*swarmingpb.CipdPackage) { |
| cipdPackages = ([]*swarmingpb.CipdPackage)(nil) |
| bb := jd.GetBuildbucket() |
| if !bb.BbagentDownloadCIPDPkgs() { |
| cipdPackages = append(cipdPackages, bb.CipdPackages...) |
| return |
| } |
| |
| if agentSrc := bb.BbagentArgs.GetBuild().GetInfra().GetBuildbucket().GetAgent().GetSource(); agentSrc != nil { |
| if cipdSource := agentSrc.GetCipd(); cipdSource != nil { |
| cipdPackages = append(cipdPackages, &swarmingpb.CipdPackage{ |
| Path: ".", |
| PackageName: cipdSource.Package, |
| Version: cipdSource.Version, |
| }) |
| } |
| } |
| return |
| } |
| |
| // FlattenToSwarming modifies this Definition to populate the Swarming field |
| // from the Buildbucket field. |
| // |
| // After flattening, HighLevelEdit functionality will no longer work on this |
| // Definition. |
| // |
| // `uid` and `parentTaskId`, if specified, override the user and parentTaskId |
| // fields, respectively. |
| func (jd *Definition) FlattenToSwarming(ctx context.Context, uid, parentTaskId string, ks KitchenSupport, resultdb RDBEnablement) error { |
| if sw := jd.GetSwarming(); sw != nil { |
| if uid != "" { |
| sw.Task.User = uid |
| } |
| if parentTaskId != "" { |
| sw.Task.ParentTaskId = parentTaskId |
| } |
| switch resultdb { |
| case RDBOff: |
| sw.Task.Resultdb = nil |
| case RDBOn: |
| if sw.Task.Realm != "" { |
| sw.Task.Resultdb = &swarmingpb.ResultDBCfg{ |
| Enable: true, |
| } |
| } else { |
| return errors.Reason("ResultDB cannot be enabled on raw swarming tasks if the realm field is unset").Err() |
| } |
| default: |
| } |
| return nil |
| } |
| err := jd.addLedProperties(ctx, uid) |
| if err != nil { |
| return errors.Annotate(err, "adding led properties").Err() |
| } |
| |
| expiringDims, err := jd.makeExpiringSliceData() |
| if err != nil { |
| return errors.Annotate(err, "calculating expirations").Err() |
| } |
| |
| bb := jd.GetBuildbucket() |
| bbi := bb.GetBbagentArgs().GetBuild().GetInfra() |
| project := bb.GetBbagentArgs().GetBuild().GetBuilder().GetProject() |
| bucket := bb.GetBbagentArgs().GetBuild().GetBuilder().GetBucket() |
| if project == "" || bucket == "" { |
| return errors.Reason("incomplete Builder ID, need both `project` and `bucket` set").Err() |
| } |
| sw := &Swarming{ |
| Hostname: jd.Info().SwarmingHostname(), |
| Task: &swarmingpb.NewTaskRequest{ |
| Name: jd.Info().TaskName(), |
| Realm: fmt.Sprintf("%s:%s", project, bucket), |
| ParentTaskId: parentTaskId, |
| Priority: jd.Info().Priority(), |
| ServiceAccount: bbi.GetSwarming().GetTaskServiceAccount(), |
| Tags: jd.Info().Tags(), |
| User: uid, |
| TaskSlices: make([]*swarmingpb.TaskSlice, len(expiringDims)), |
| }, |
| } |
| |
| // Enable swarming/resultdb integration. |
| enableRDB := (resultdb == RDBOn || (resultdb == "" && bbi.GetResultdb().GetInvocation() != "")) |
| if enableRDB { |
| // Clear the original build's ResultDB invocation. |
| bbi.Resultdb.Invocation = "" |
| sw.Task.Resultdb = &swarmingpb.ResultDBCfg{ |
| Enable: true, |
| } |
| } |
| |
| var casUserPayload *swarmingpb.CASReference |
| // Do not set CAS input to task slices if bbagent handles downloading packages. |
| if !bb.BbagentDownloadCIPDPkgs() { |
| casUserPayload, err = jd.Info().CurrentIsolated() |
| if err != nil { |
| return errors.Annotate(err, "failed to get CAS user payload for the build").Err() |
| } |
| } |
| baseProperties := &swarmingpb.TaskProperties{ |
| CipdInput: &swarmingpb.CipdInput{ |
| Packages: jd.generateCIPDPackages(), |
| }, |
| CasInputRoot: casUserPayload, |
| |
| EnvPrefixes: bb.EnvPrefixes, |
| ExecutionTimeoutSecs: int32(bb.BbagentArgs.Build.ExecutionTimeout.GetSeconds()), |
| |
| // TODO(iannucci): When build creation is done in Go, share this 3 minute |
| // constant between here and there. Or, better, implement CreateBuild so we |
| // don't have to do this at all. |
| GracePeriodSecs: int32(bb.BbagentArgs.Build.GracePeriod.GetSeconds()) + 180, |
| } |
| |
| if bb.Containment.GetContainmentType() != swarmingpb.ContainmentType_NOT_SPECIFIED { |
| baseProperties.Containment = bb.Containment |
| } |
| |
| baseProperties.Env = make([]*swarmingpb.StringPair, len(bb.EnvVars)+1) |
| copy(baseProperties.Env, bb.EnvVars) |
| expEnvValue := "FALSE" |
| if bb.BbagentArgs.Build.Input.Experimental { |
| expEnvValue = "TRUE" |
| } |
| baseProperties.Env[len(baseProperties.Env)-1] = &swarmingpb.StringPair{ |
| Key: "BUILDBUCKET_EXPERIMENTAL", |
| Value: expEnvValue, |
| } |
| |
| if caches := bb.BbagentArgs.Build.Infra.Swarming.GetCaches(); len(caches) > 0 { |
| baseProperties.Caches = make([]*swarmingpb.CacheEntry, len(caches)) |
| for i, cache := range caches { |
| baseProperties.Caches[i] = &swarmingpb.CacheEntry{ |
| Name: cache.Name, |
| Path: path.Join(bb.CacheDir(), cache.Path), |
| } |
| } |
| } |
| |
| baseProperties.Command, err = jd.generateCommand(ctx, ks) |
| if err != nil { |
| return errors.Annotate(err, "generating Command").Err() |
| } |
| |
| if exe := bb.BbagentArgs.Build.Exe; exe.GetCipdPackage() != "" && !bb.BbagentDownloadCIPDPkgs() { |
| baseProperties.CipdInput.Packages = append(baseProperties.CipdInput.Packages, &swarmingpb.CipdPackage{ |
| PackageName: exe.CipdPackage, |
| Version: exe.CipdVersion, |
| Path: bb.PayloadPath(), |
| }) |
| } |
| |
| for i, dat := range expiringDims { |
| sw.Task.TaskSlices[i] = &swarmingpb.TaskSlice{ |
| ExpirationSecs: int32(dat.relative.Seconds()), |
| Properties: dat.createWith(baseProperties), |
| } |
| } |
| |
| if err := experiments.Apply(ctx, bb.BbagentArgs.Build, sw.Task); err != nil { |
| return errors.Annotate(err, "applying experiments").Err() |
| } |
| |
| jd.JobType = &Definition_Swarming{Swarming: sw} |
| return nil |
| } |