blob: 18f60eb75587020f1d923cfc05e8a50c0086ac3c [file] [log] [blame]
// Copyright 2018 The LUCI Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package buildbucket
import (
"context"
"fmt"
"net/http"
"time"
"go.chromium.org/luci/buildbucket/deprecated"
buildbucketpb "go.chromium.org/luci/buildbucket/proto"
swarmbucketAPI "go.chromium.org/luci/common/api/buildbucket/swarmbucket/v1"
swarmingAPI "go.chromium.org/luci/common/api/swarming/swarming/v1"
"go.chromium.org/luci/common/clock"
"go.chromium.org/luci/common/data/stringset"
"go.chromium.org/luci/common/data/strpair"
"go.chromium.org/luci/common/errors"
"go.chromium.org/luci/common/logging"
"go.chromium.org/luci/common/sync/parallel"
"go.chromium.org/luci/gae/service/datastore"
"go.chromium.org/luci/milo/buildsource/swarming"
"go.chromium.org/luci/milo/common"
"go.chromium.org/luci/milo/common/model"
"go.chromium.org/luci/milo/frontend/ui"
"go.chromium.org/luci/server/auth"
)
func getPool(c context.Context, bid *buildbucketpb.BuilderID) (*ui.MachinePool, error) {
// Get PoolKey
builderPool := model.BuilderPool{
BuilderID: datastore.MakeKey(c, model.BuilderSummaryKind, common.LegacyBuilderIDString(bid)),
}
// These are eventually consistent, so just log an error and pass if not found.
switch err := datastore.Get(c, &builderPool); {
case datastore.IsErrNoSuchEntity(err):
logging.Warningf(c, "builder pool not found")
return nil, nil
case err != nil:
return nil, err
}
// Get BotPool
botPool := &model.BotPool{PoolID: builderPool.PoolKey.StringID()}
switch err := datastore.Get(c, botPool); {
case datastore.IsErrNoSuchEntity(err):
logging.Warningf(c, "bot pool not found")
return nil, nil
case err != nil:
return nil, err
}
return ui.NewMachinePool(c, botPool), nil
}
// stripEmptyDimensions removes dimensions that are empty, such as "cores:".
func stripEmptyDimensions(dims []string) []string {
source := strpair.ParseMap(dims)
result := strpair.Map{}
for k, ds := range source {
for _, dim := range ds {
if dim != "" {
result.Add(k, dim)
}
}
}
return result.Format()
}
// processBuilders parses out all of the builder pools from the Swarmbucket get_builders response,
// and saves the BuilderPool information into the datastore.
// It returns a list of PoolDescriptors that needs to be fetched and saved.
func processBuilders(c context.Context, r *swarmbucketAPI.LegacySwarmbucketApiGetBuildersResponseMessage) ([]model.PoolDescriptor, error) {
var builderPools []model.BuilderPool
var descriptors []model.PoolDescriptor
seen := stringset.New(0)
for _, bucket := range r.Buckets {
project, bucketName := deprecated.BucketNameToV2(bucket.Name)
if project == "" {
// This may happen if the bucket or builder does not fulfill the LUCI
// naming convention.
logging.Warningf(c, "invalid bucket/builder %q/, skipping", bucket.Name)
continue
}
for _, builder := range bucket.Builders {
id := common.LegacyBuilderIDString(&buildbucketpb.BuilderID{
Project: project,
Bucket: bucketName,
Builder: builder.Name,
})
dimensions := stripEmptyDimensions(builder.SwarmingDimensions)
descriptor := model.NewPoolDescriptor(builder.SwarmingHostname, dimensions)
dID := descriptor.PoolID()
builderPools = append(builderPools, model.BuilderPool{
BuilderID: datastore.MakeKey(c, model.BuilderSummaryKind, id),
PoolKey: datastore.MakeKey(c, model.BotPoolKind, dID),
})
if added := seen.Add(dID); added {
descriptors = append(descriptors, descriptor)
}
}
}
return descriptors, datastore.Put(c, builderPools)
}
// parseBot parses a Swarming BotInfo response into the structure we will
// save into the datastore. Since BotInfo doesn't have an explicit status
// field that matches Milo's abstraction of a Bot, the status is inferred:
// * A bot with TaskID is Busy
// * A bot that is dead or quarantined is Offline
// * Otherwise, it is implicitly connected and Idle.
func parseBot(c context.Context, swarmingHost string, botInfo *swarmingAPI.SwarmingRpcsBotInfo) (*model.Bot, error) {
lastSeen, err := time.Parse(swarming.SwarmingTimeLayout, botInfo.LastSeenTs)
if err != nil {
return nil, err
}
result := &model.Bot{
Name: botInfo.BotId,
URL: fmt.Sprintf("https://%s/bot?id=%s", swarmingHost, botInfo.BotId),
LastSeen: lastSeen,
}
switch {
case botInfo.TaskId != "" || botInfo.MaintenanceMsg != "":
result.Status = model.Busy
case botInfo.IsDead || botInfo.Quarantined:
result.Status = model.Offline
default:
// Defaults to idle.
}
return result, nil
}
// processBot retrieves the Bot pool details from Swarming for a given set of
// dimensions for its respective Swarming host, and saves the data into datastore.
func processBot(c context.Context, desc model.PoolDescriptor) error {
t, err := auth.GetRPCTransport(c, auth.AsSelf)
if err != nil {
return err
}
sc, err := swarmingAPI.New(&http.Client{Transport: t})
if err != nil {
return err
}
sc.BasePath = fmt.Sprintf("https://%s/_ah/api/swarming/v1/", desc.Host())
var bots []model.Bot
bl := sc.Bots.List().Dimensions(desc.Dimensions().Format()...)
// Keep fetching until the cursor is empty.
for {
botList, err := bl.Do()
if err != nil {
return err
}
for _, botInfo := range botList.Items {
// Ignore deleted bots.
if botInfo.Deleted {
continue
}
bot, err := parseBot(c, desc.Host(), botInfo)
if err != nil {
return err
}
bots = append(bots, *bot)
}
if botList.Cursor == "" {
break
}
bl = bl.Cursor(botList.Cursor)
}
// If there are too many bots, then it won't fit in datastore.
// Only store a subset of the bots.
// TODO(hinoka): This is inaccurate, but will only affect few builders.
// Instead of chopping this list off, just store the statistics.
if len(bots) > 1000 {
bots = bots[:1000]
}
// This is a large RPC, don't try to batch it.
return datastore.Put(c, &model.BotPool{
PoolID: desc.PoolID(),
Descriptor: desc,
Bots: bots,
LastUpdate: clock.Now(c),
})
}
// fetchBotPools resolves the descriptors into actual BotPool information.
// The input is a list of descriptors to fetch from swarming.
// Basically this just runs processBot() a bunch of times.
func processBots(c context.Context, descriptors []model.PoolDescriptor) error {
return parallel.WorkPool(8, func(ch chan<- func() error) {
for _, desc := range descriptors {
desc := desc
ch <- func() error {
return processBot(c, desc)
}
}
})
}
// UpdatePools is a cron job endpoint that:
// 1. Fetches all the builders from our associated Swarmbucket instance.
// 2. Consolidates all known descriptors (host+dimensions), saves BuilderPool.
// 3. Fetches and saves BotPool data from swarming for all known descriptors.
func UpdatePools(c context.Context) error {
host, err := getHost(c)
if err != nil {
return err
}
sc, err := newSwarmbucketClient(c, host)
if err != nil {
return err
}
r, err := sc.GetBuilders().Do()
if err != nil {
return err
}
// Process builders and save them. We get back the descriptors that we have
// to fetch next.
descriptors, err := processBuilders(c, r)
if err != nil {
return errors.Annotate(err, "processing builders").Err()
}
// And now also fetch and save the BotPools.
return processBots(c, descriptors)
}