blob: d188bb467ecdfbce595bd93849623212f1ad312e [file] [log] [blame]
// Copyright 2020 The LUCI Authors.
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
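// You may obtain a copy of the License at
//
//      http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.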
// See the License for the specific language governing permissions and
// limitations under the License.
package job
import (
durpb ""
bbpb ""
logdog_types ""
swarmingpb ""
// isoInput is the JSON form of an isolated input reference. It is serialized
// with the keys "server", "namespace" and "hash"; addLedProperties fills it
// from the user payload's server, namespace and digest.
//
// NOTE(review): the closing brace of this struct is not present in this copy
// of the file; confirm against the upstream source.
type isoInput struct {
Server string `json:"server"`
Namespace string `json:"namespace"`
Hash string `json:"hash"`
// cipdInput is the JSON form of a CIPD package reference. It is serialized
// with the keys "package" and "version"; addLedProperties fills it from the
// build Exe's CIPD package and CIPD version.
//
// NOTE(review): the closing brace of this struct is not present in this copy
// of the file; confirm against the upstream source.
type cipdInput struct {
Package string `json:"package"`
Version string `json:"version"`
// ledProperties is the value stored under the "$recipe_engine/led" property
// key by addLedProperties.
//
// LedRunID is always serialized; IsolatedInput and CIPDInput are pointers
// tagged omitempty, so each is omitted from the JSON when nil.
//
// NOTE(review): the closing brace of this struct is not present in this copy
// of the file; confirm against the upstream source.
type ledProperties struct {
// LedRunID is set to the job's logdog prefix.
LedRunID string `json:"led_run_id"`
// IsolatedInput is set when the user payload has a non-empty digest.
IsolatedInput *isoInput `json:"isolated_input,omitempty"`
// CIPDInput is set when there is no payload digest but the build has an Exe.
CIPDInput *cipdInput `json:"cipd_input,omitempty"`
// addLedProperties prepares the Buildbucket portion of this Definition for a
// led run.
//
// It reads 32 random bytes, combines them (hex-encoded) with uid into a logdog
// stream name, and uses that name both as the build's logdog prefix and as the
// LedRunID of the "$recipe_engine/led" properties. It then records where the
// recipes code comes from (the user payload if it has a digest, otherwise the
// build's Exe if one is set) and finally adds a "log_location:logdog://..."
// tag plus "allow_milo:1" through jd.Edit.
//
// Returns an annotated error if the random bytes cannot be read or the stream
// name cannot be built; otherwise returns whatever jd.Edit returns. Panics if
// jd.GetBuildbucket() is nil.
//
// NOTE(review): this copy of the function is missing lines (closing braces and
// at least the call that wraps the property map below); confirm against the
// upstream source before changing any code here.
func (jd *Definition) addLedProperties(ctx context.Context, uid string) error {
// Set the "$recipe_engine/led" recipe properties.
// 32 random bytes make the stream name (and therefore the run ID) unique.
buf := make([]byte, 32)
if _, err := cryptorand.Read(ctx, buf); err != nil {
return errors.Annotate(err, "generating random token").Err()
// Stream name components: "led", the caller-supplied uid, and the random hex.
streamName, err := logdog_types.MakeStreamName("", "led", uid, hex.EncodeToString(buf))
if err != nil {
return errors.Annotate(err, "generating logdog token").Err()
logdogPrefix := string(streamName)
bb := jd.GetBuildbucket()
if bb == nil {
panic("impossible: Buildbucket is nil while flattening to swarming")
// TODO(iannucci): change logdog project to something reserved to 'led' tasks.
// Though if we merge logdog into resultdb, this hopefully becomes moot.
// Direct field access: this dereferences BbagentArgs, Build, Infra and Logdog
// without nil checks.
bb.BbagentArgs.Build.Infra.Logdog.Prefix = logdogPrefix
// Pass the CIPD package or isolate containing the recipes code into
// the led recipe module. This gives the build the information it needs
// to launch child builds using the same version of the recipes code.
//
// The logdog prefix is unique to each led job, so it can be used as an
// ID for the job.
props := ledProperties{LedRunID: logdogPrefix}
// Prefer the isolated user payload; fall back to the build's CIPD Exe.
if payload := jd.GetUserPayload(); payload.GetDigest() != "" {
props.IsolatedInput = &isoInput{
Server: payload.GetServer(),
Namespace: payload.GetNamespace(),
Hash: payload.GetDigest(),
} else if pkg := bb.GetBbagentArgs().GetBuild().GetExe(); pkg != nil {
props.CIPDInput = &cipdInput{
Package: pkg.GetCipdPackage(),
Version: pkg.GetCipdVersion(),
// NOTE(review): the statement that consumes this map entry is not visible in
// this copy; presumably the properties are written onto the build — confirm.
"$recipe_engine/led": props,
// The tag suffix depends on whether the job runs under legacy kitchen.
logdogTag := "log_location:logdog://" + logdogPrefix
if bb.LegacyKitchen {
logdogTag += "/+/annotations"
} else {
logdogTag += "/+/build.proto"
return jd.Edit(func(je Editor) {
je.Tags([]string{logdogTag, "allow_milo:1"})
// expiringData collects the dimensions and caches associated with a single
// expiration bucket. makeExpiringSliceData builds one per distinct expiration,
// and FlattenToSwarming turns each into one swarming task slice.
//
// NOTE(review): the field name `dimesions` (sic) is used consistently across
// this file, so renaming it means touching every user. The closing brace of
// this struct is not present in this copy of the file.
type expiringData struct {
absolute time.Duration // from scheduling task
relative time.Duration // from previous slice
// dimesions holds the key/value dimension pairs assigned to this bucket.
dimesions []*swarmingpb.StringPair
// caches holds the Buildbucket cache entries assigned to this bucket.
caches []*bbpb.BuildInfra_Swarming_CacheEntry
// createWith returns a clone of template populated with this bucket's
// dimensions and named caches. template itself is not modified.
//
// Every entry of ed.dimesions contributes its value under its key. Every entry
// of ed.caches is appended to the clone's NamedCaches (Name and DestPath) and
// also contributes its name under the "caches" dimension key. Values for each
// key are gathered in a string set and emitted through ToSortedSlice; keys are
// iterated in the order returned by keysOf.
//
// Panics if template already has dimensions set.
//
// NOTE(review): keysOf is defined outside this view; presumably it returns the
// keys in sorted order — confirm. The body of the `else` branch in addDims and
// several closing braces are missing from this copy of the file.
func (ed *expiringData) createWith(template *swarmingpb.TaskProperties) *swarmingpb.TaskProperties {
if len(template.Dimensions) != 0 {
panic("impossible; createWith called with dimensions already set")
ret := proto.Clone(template).(*swarmingpb.TaskProperties)
// dimMap accumulates the set of values seen for each dimension key.
dimMap := map[string]stringset.Set{}
addDims := func(key string, values ...string) {
if set, ok := dimMap[key]; !ok {
dimMap[key] = stringset.NewFromSlice(values...)
} else {
// NOTE(review): the existing-key case is not visible here; presumably the
// values are added to `set` — confirm against upstream.
for _, dim := range ed.dimesions {
addDims(dim.Key, dim.Value)
for _, nc := range ed.caches {
ret.NamedCaches = append(ret.NamedCaches, &swarmingpb.NamedCacheEntry{
Name: nc.Name,
DestPath: nc.Path,
// Each named cache also becomes a value of the "caches" dimension.
addDims("caches", nc.Name)
newDims := make([]*swarmingpb.StringListPair, 0, len(dimMap))
for _, key := range keysOf(dimMap) {
newDims = append(newDims, &swarmingpb.StringListPair{
Key: key, Values: dimMap[key].ToSortedSlice()})
ret.Dimensions = newDims
return ret
// makeExpiringSliceData groups the build's swarming caches and task dimensions
// by expiration and returns the resulting buckets ordered by increasing
// absolute expiration.
//
// Caches are bucketed by WaitForWarmCache and dimensions by Expiration; a nil
// duration is treated as a zero duration. Positive durations share one
// expiringData per distinct value (tracked in expirationSet); nonExpiring is
// the bucket for the remainder. After sorting, each bucket's relative duration
// is its absolute duration minus that of the bucket before it (the first
// bucket's relative equals its absolute). nonExpiring is then either appended
// as a final slice or merged into the last one, depending on the build's
// SchedulingTimeout. Finally, every bucket is extended with the dimensions and
// caches of all buckets after it.
//
// Returns an annotated error if any cache or dimension duration fails to
// convert.
//
// NOTE(review): this copy of the function is missing lines: the invocations of
// the `add` callback inside addExpiration, the terminators of several closures
// and blocks, and the final return. Confirm against upstream before editing.
func (jd *Definition) makeExpiringSliceData() (ret []*expiringData, err error) {
bb := jd.GetBuildbucket()
// expirationSet maps each distinct positive expiration to its bucket.
expirationSet := map[time.Duration]*expiringData{}
nonExpiring := expiringData{}
// addExpiration converts protoDuration and selects the bucket for it; `name`
// is only used in the error annotation.
addExpiration := func(name string, protoDuration *durpb.Duration, add func(d *expiringData)) error {
if protoDuration == nil {
protoDuration = &durpb.Duration{}
dur, err := ptypes.Duration(protoDuration)
if err != nil {
return errors.Annotate(err, "parsing %s expiration", name).Err()
if dur > 0 {
// Reuse the bucket for this duration, creating it on first use.
data, ok := expirationSet[dur]
if !ok {
data = &expiringData{absolute: dur}
expirationSet[dur] = data
// NOTE(review): presumably `add` is called with `data` here and with
// &nonExpiring in the else branch; neither call is visible in this copy.
} else {
return nil
for _, cache := range bb.BbagentArgs.GetBuild().GetInfra().GetSwarming().GetCaches() {
err := addExpiration("cache", cache.WaitForWarmCache, func(data *expiringData) {
data.caches = append(data.caches, cache)
if err != nil {
return nil, err
for _, dim := range bb.BbagentArgs.GetBuild().GetInfra().GetSwarming().GetTaskDimensions() {
err := addExpiration("dimension", dim.Expiration, func(data *expiringData) {
data.dimesions = append(data.dimesions, &swarmingpb.StringPair{Key: dim.Key, Value: dim.Value})
if err != nil {
return nil, err
// Map iteration order is unspecified, so collect the buckets and sort them by
// absolute expiration.
ret = make([]*expiringData, 0, len(expirationSet))
for _, data := range expirationSet {
ret = append(ret, data)
sort.Slice(ret, func(i, j int) bool {
return ret[i].absolute < ret[j].absolute
// NOTE(review): as shown, ret[0] and ret[len(ret)-1] are indexed without a
// visible emptiness check; a guard may have been lost from this copy — confirm.
ret[0].relative = ret[0].absolute
for i := range ret[1:] {
ret[i+1].relative = ret[i+1].absolute - ret[i].absolute
// A SchedulingTimeout that fails to convert is skipped (err == nil guard).
if total, err := ptypes.Duration(bb.BbagentArgs.Build.SchedulingTimeout); err == nil {
if ret[len(ret)-1].absolute < total {
// if the task's total expiration time is greater than the last slice's
// expiration, then use nonExpiring as the last slice.
nonExpiring.absolute = total
nonExpiring.relative = total - ret[len(ret)-1].absolute
ret = append(ret, &nonExpiring)
} else {
// otherwise, add all of nonExpiring's guts to the last slice.
last := ret[len(ret)-1]
last.caches = append(last.caches, nonExpiring.caches...)
last.dimesions = append(last.dimesions, nonExpiring.dimesions...)
// Ret now looks like:
// rel @ 20s - caches:[a b c]
// rel @ 40s - caches:[d e]
// rel @ inf - caches:[f]
// We need to transform this into:
// rel @ 20s - caches:[a b c d e f]
// rel @ 40s - caches:[d e f]
// rel @ inf - caches:[f]
// Since a slice expiring at 20s includes all the caches (and dimensions) of
// all slices expiring after it.
//
// Walking from the second-to-last bucket backwards means ret[i+1] has already
// absorbed everything after it by the time it is appended to ret[i].
for i := len(ret) - 2; i >= 0; i-- {
ret[i].dimesions = append(ret[i].dimesions, ret[i+1].dimesions...)
ret[i].caches = append(ret[i].caches, ret[i+1].caches...)
// generateCommand returns the command line for the swarming task.
//
// For legacy kitchen jobs it delegates to ks.GenerateCommand. Otherwise it
// returns a two-element command: the bbagent executable name (with the
// "${EXECUTABLE_SUFFIX}" placeholder left in the string) followed by the
// encoded BbagentArgs, and a nil error.
//
// NOTE(review): closing braces are missing from this copy of the function;
// confirm against the upstream source.
func (jd *Definition) generateCommand(ctx context.Context, ks KitchenSupport) ([]string, error) {
bb := jd.GetBuildbucket()
if bb.LegacyKitchen {
return ks.GenerateCommand(ctx, bb)
// TODO(iannucci): have bbagent set 'logdog.viewer_url' to the milo build
// view URL if there's no buildbucket build associated with it.
return []string{
"bbagent${EXECUTABLE_SUFFIX}", bbinput.Encode(bb.BbagentArgs),
}, nil
// FlattenToSwarming modifies this Definition to populate the Swarming field
// from the Buildbucket field.
//
// If the Definition already has a Swarming job this is a no-op returning nil.
// Otherwise it adds the led properties, computes the expiration buckets,
// builds one swarming task slice per bucket from a shared set of base task
// properties, and replaces jd.JobType with the resulting Swarming job.
//
// uid is passed to addLedProperties and is also used as the task's User. ks is
// used to generate the command for legacy kitchen jobs.
//
// Returns an annotated error if adding led properties, calculating
// expirations, or generating the command fails.
//
// After flattening, HighLevelEdit functionality will no longer work on this
// Definition.
//
// NOTE(review): closing braces and literal terminators are missing from this
// copy of the function; confirm against the upstream source.
func (jd *Definition) FlattenToSwarming(ctx context.Context, uid string, ks KitchenSupport) error {
if jd.GetSwarming() != nil {
return nil
err := jd.addLedProperties(ctx, uid)
if err != nil {
return errors.Annotate(err, "adding led properties").Err()
// Note: this local shadows the expiringData type for the rest of the function.
expiringData, err := jd.makeExpiringSliceData()
if err != nil {
return errors.Annotate(err, "calculating expirations").Err()
bb := jd.GetBuildbucket()
bbi := bb.GetBbagentArgs().GetBuild().GetInfra()
sw := &Swarming{
Hostname: jd.Info().SwarmingHostname(),
Task: &swarmingpb.TaskRequest{
Name: jd.Info().TaskName(),
Priority: jd.Info().Priority(),
ServiceAccount: bbi.GetSwarming().GetTaskServiceAccount(),
Tags: jd.Info().Tags(),
User: uid,
// One slice per expiration bucket; filled in by the loop below.
TaskSlices: make([]*swarmingpb.TaskSlice, len(expiringData)),
// baseProperties is the template each slice clones via createWith.
baseProperties := &swarmingpb.TaskProperties{
// Appending to a nil slice copies bb.CipdPackages, so the append of the Exe
// package below does not write into the Buildbucket definition's slice.
CipdInputs: append(([]*swarmingpb.CIPDPackage)(nil), bb.CipdPackages...),
CasInputs: jd.UserPayload,
EnvPaths: bb.EnvPrefixes,
ExecutionTimeout: bb.BbagentArgs.Build.ExecutionTimeout,
GracePeriod: bb.GracePeriod,
Containment: bb.Containment,
baseProperties.Command, err = jd.generateCommand(ctx, ks)
if err != nil {
return errors.Annotate(err, "generating Command").Err()
// If the build has an Exe, install its CIPD package into the directory that
// contains the executable path.
if exe := bb.BbagentArgs.Build.Exe; exe != nil {
baseProperties.CipdInputs = append(baseProperties.CipdInputs, &swarmingpb.CIPDPackage{
PackageName: exe.CipdPackage,
Version: exe.CipdVersion,
DestPath: path.Dir(bb.BbagentArgs.ExecutablePath),
// Each slice expires after the bucket's duration relative to the previous one.
for i, dat := range expiringData {
sw.Task.TaskSlices[i] = &swarmingpb.TaskSlice{
Expiration: ptypes.DurationProto(dat.relative),
Properties: dat.createWith(baseProperties),
jd.JobType = &Definition_Swarming{Swarming: sw}
return nil