blob: ebe802f510445b863983b9de56b25a0beb7b6d45 [file] [log] [blame]
// Copyright 2018 The LUCI Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//      http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package buildbucket
import (
buildbucketpb ""
swarmingAPI ""
func getPool(c context.Context, bid *buildbucketpb.BuilderID) (*ui.MachinePool, error) {
// Get PoolKey
builderPool := model.BuilderPool{
BuilderID: datastore.MakeKey(c, model.BuilderSummaryKind, common.LegacyBuilderIDString(bid)),
// These are eventually consistent, so just log an error and pass if not found.
switch err := datastore.Get(c, &builderPool); {
case datastore.IsErrNoSuchEntity(err):
logging.Warningf(c, "builder pool not found")
return nil, nil
case err != nil:
return nil, err
// Get BotPool
botPool := &model.BotPool{PoolID: builderPool.PoolKey.StringID()}
switch err := datastore.Get(c, botPool); {
case datastore.IsErrNoSuchEntity(err):
logging.Warningf(c, "bot pool not found")
return nil, nil
case err != nil:
return nil, err
return ui.NewMachinePool(c, botPool), nil
// stripDimensionExpiration removes dimension expiration if it exists.
// e.g. "60:key:value" -> "key:value".
func stripDimensionExpiration(dims []string) []string {
result := strpair.Map{}
for _, dim := range dims {
splitted := strings.Split(dim, ":")
key := splitted[0]
value := splitted[1]
if len(splitted) == 3 {
key = splitted[1]
value = splitted[2]
result.Add(key, value)
return result.Format()
// processBuilders saves the builder information into the datastore then returns
// a list of PoolDescriptors that needs to be fetched and saved.
func processBuilders(c context.Context, builders []*buildbucketpb.BuilderItem) ([]model.PoolDescriptor, error) {
var builderPools []model.BuilderPool
var descriptors []model.PoolDescriptor
seen := stringset.New(0)
for _, builder := range builders {
id := common.LegacyBuilderIDString(builder.Id)
dimensions := stripDimensionExpiration(builder.Config.Dimensions)
descriptor := model.NewPoolDescriptor(builder.Config.SwarmingHost, dimensions)
dID := descriptor.PoolID()
builderPools = append(builderPools, model.BuilderPool{
BuilderID: datastore.MakeKey(c, model.BuilderSummaryKind, id),
PoolKey: datastore.MakeKey(c, model.BotPoolKind, dID),
if added := seen.Add(dID); added {
descriptors = append(descriptors, descriptor)
return descriptors, datastore.Put(c, builderPools)
// parseBot parses a Swarming BotInfo response into the structure we will
// save into the datastore. Since BotInfo doesn't have an explicit status
// field that matches Milo's abstraction of a Bot, the status is inferred:
// * A bot with TaskID is Busy
// * A bot that is dead or quarantined is Offline
// * Otherwise, it is implicitly connected and Idle.
func parseBot(c context.Context, swarmingHost string, botInfo *swarmingAPI.SwarmingRpcsBotInfo) (*model.Bot, error) {
lastSeen, err := time.Parse(swarming.SwarmingTimeLayout, botInfo.LastSeenTs)
if err != nil {
return nil, err
result := &model.Bot{
Name: botInfo.BotId,
URL: fmt.Sprintf("https://%s/bot?id=%s", swarmingHost, botInfo.BotId),
LastSeen: lastSeen,
switch {
case botInfo.TaskId != "" || botInfo.MaintenanceMsg != "":
result.Status = milostatus.Busy
case botInfo.IsDead || botInfo.Quarantined:
result.Status = milostatus.Offline
// Defaults to idle.
return result, nil
// processBot retrieves the Bot pool details from Swarming for a given set of
// dimensions for its respective Swarming host, and saves the data into datastore.
func processBot(c context.Context, desc model.PoolDescriptor) error {
t, err := auth.GetRPCTransport(c, auth.AsSelf)
if err != nil {
return err
sc, err := swarmingAPI.New(&http.Client{Transport: t})
if err != nil {
return err
sc.BasePath = fmt.Sprintf("https://%s/_ah/api/swarming/v1/", desc.Host())
var bots []model.Bot
bl := sc.Bots.List().Dimensions(desc.Dimensions().Format()...)
// Keep fetching until the cursor is empty.
for {
botList, err := bl.Do()
if err != nil {
return err
for _, botInfo := range botList.Items {
// Ignore deleted bots.
if botInfo.Deleted {
bot, err := parseBot(c, desc.Host(), botInfo)
if err != nil {
return err
bots = append(bots, *bot)
if botList.Cursor == "" {
bl = bl.Cursor(botList.Cursor)
// If there are too many bots, then it won't fit in datastore.
// Only store a subset of the bots.
// TODO(hinoka): This is inaccurate, but will only affect few builders.
// Instead of chopping this list off, just store the statistics.
if len(bots) > 1000 {
bots = bots[:1000]
// This is a large RPC, don't try to batch it.
return datastore.Put(c, &model.BotPool{
PoolID: desc.PoolID(),
Descriptor: desc,
Bots: bots,
LastUpdate: clock.Now(c),
// fetchBotPools resolves the descriptors into actual BotPool information.
// The input is a list of descriptors to fetch from swarming.
// Basically this just runs processBot() a bunch of times.
func processBots(c context.Context, descriptors []model.PoolDescriptor) error {
return parallel.WorkPool(8, func(ch chan<- func() error) {
for _, desc := range descriptors {
desc := desc
ch <- func() error {
return processBot(c, desc)
// UpdatePools is a cron job endpoint that:
// 1. Fetches all the builders from our associated buildbucket instance.
// 2. Consolidates all known descriptors (host+dimensions), saves BuilderPool.
// 3. Fetches and saves BotPool data from swarming for all known descriptors.
func UpdatePools(c context.Context) error {
host, err := getHost(c)
if err != nil {
return err
buildersClient, err := ProdBuildersClientFactory(c, host, auth.AsSelf)
if err != nil {
return err
// Get all the builders from buildbucket.
builders := make([]*buildbucketpb.BuilderItem, 0)
req := &buildbucketpb.ListBuildersRequest{PageSize: 1000}
for {
r, err := buildersClient.ListBuilders(c, req)
if err != nil {
return err
builders = append(builders, r.Builders...)
if r.NextPageToken == "" {
req.PageToken = r.NextPageToken
logging.Infof(c, "got %d builders from buildbucket", len(builders))
// Process builders and save them. We get back the descriptors that we have
// to fetch next.
descriptors, err := processBuilders(c, builders)
if err != nil {
return errors.Annotate(err, "processing builders").Err()
// And now also fetch and save the BotPools.
return processBots(c, descriptors)