blob: 8ff525501606f8dd8a1a2f0983f91366ae1557d7 [file] [log] [blame]
// Copyright 2019 The Chromium Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
// Package swarming implements a client for creating skylab-swarming tasks and
// getting their results.
package swarming
import (
	"context"
	"fmt"
	"net"
	"net/http"
	"net/url"
	"sort"
	"strings"
	"time"

	swarming_api "go.chromium.org/luci/common/api/swarming/swarming/v1"
	"go.chromium.org/luci/common/errors"
	"go.chromium.org/luci/common/retry"
	"go.chromium.org/luci/common/retry/transient"
	"google.golang.org/api/googleapi"
)
// SkylabPool names the swarming pool that every skylab bot belongs to.
const SkylabPool = "ChromeOSSkylab"
// Client is a swarming client for creating tasks and waiting for their results.
// Construct it with NewClient.
type Client struct {
	// SwarmingService is the underlying swarming API service used for every RPC.
	SwarmingService *swarming_api.Service
	// server is the swarming server URL passed to NewClient; GetTaskURL uses it
	// to build links to tasks.
	server string
}
// ListedHost describes the DUT that a particular swarming bot manages.
type ListedHost struct {
	// Hostname is the DUT's hostname, taken from the bot's dut_name dimension.
	Hostname string
}

// String implements fmt.Stringer by returning the DUT's hostname.
func (h *ListedHost) String() string {
	name := h.Hostname
	return name
}
// NewClient returns a Client that talks to the swarming server at the given
// URL, sending requests through h. It fails only if the underlying swarming
// service cannot be constructed.
func NewClient(h *http.Client, server string) (*Client, error) {
	svc, err := newSwarmingService(h, server)
	if err != nil {
		return nil, err
	}
	return &Client{SwarmingService: svc, server: server}, nil
}
// swarmingAPISuffix is the path, relative to the swarming server root, at
// which the v1 swarming API is served.
const swarmingAPISuffix = "_ah/api/swarming/v1/"

// newSwarmingService builds a swarming API service that sends requests through
// h to the v1 API hosted by server. server may be given with or without a
// trailing slash; exactly one slash separates it from swarmingAPISuffix.
func newSwarmingService(h *http.Client, server string) (*swarming_api.Service, error) {
	s, err := swarming_api.New(h)
	if err != nil {
		return nil, errors.Annotate(err, "create swarming client").Err()
	}
	// Plain concatenation produced e.g. "https://host_ah/api/..." when the
	// caller omitted the trailing slash on server.
	s.BasePath = strings.TrimSuffix(server, "/") + "/" + swarmingAPISuffix
	return s, nil
}
// CreateTask submits req as a new swarming task and returns the metadata of
// the created request. Transient failures are retried via callWithRetries.
func (c *Client) CreateTask(ctx context.Context, req *swarming_api.SwarmingRpcsNewTaskRequest) (*swarming_api.SwarmingRpcsTaskRequestMetadata, error) {
	var meta *swarming_api.SwarmingRpcsTaskRequestMetadata
	err := callWithRetries(ctx, "create task", func() error {
		var callErr error
		meta, callErr = c.SwarmingService.Tasks.New(req).Context(ctx).Do()
		return callErr
	})
	if err != nil {
		return nil, err
	}
	return meta, nil
}
// getFullTaskList executes call repeatedly, following the returned cursor,
// and returns the concatenated items of every page. The first RPC error
// aborts the walk and is returned as-is; no retrying happens here.
func getFullTaskList(ctx context.Context, call *swarming_api.TasksListCall) ([]*swarming_api.SwarmingRpcsTaskResult, error) {
	var all []*swarming_api.SwarmingRpcsTaskResult
	for {
		page, err := call.Context(ctx).Do()
		if err != nil {
			return nil, err
		}
		all = append(all, page.Items...)
		if page.Cursor == "" {
			return all, nil
		}
		call = call.Cursor(page.Cursor)
	}
}
// getActiveLeaseTasksForDimensions returns every RUNNING or PENDING task whose
// tags match all of dims, with each entry rendered as a "key:value" tag. It
// does no sanity checking on the dimensions provided. RUNNING tasks come
// first in the result. The whole two-query lookup is retried on transient
// errors.
func (c *Client) getActiveLeaseTasksForDimensions(ctx context.Context, dims map[string]string) ([]*swarming_api.SwarmingRpcsTaskResult, error) {
	tags := dimsToTags(dims)
	var tr []*swarming_api.SwarmingRpcsTaskResult
	getResult := func() error {
		// Reset on every attempt so a retry does not duplicate earlier results.
		tr = nil
		for _, state := range []string{"RUNNING", "PENDING"} {
			call := c.SwarmingService.Tasks.List().Tags(tags...).State(state)
			r, err := getFullTaskList(ctx, call)
			if err != nil {
				return err
			}
			tr = append(tr, r...)
		}
		return nil
	}
	if err := callWithRetries(ctx, "get active lease tasks", getResult); err != nil {
		// Pass dims as an argument: Annotate treats its reason as a format string,
		// so a '%' inside a dimension value must not be interpreted as a verb.
		return nil, errors.Annotate(err, "get active leases tasks for dims (%s)", formatDims(dims)).Err()
	}
	return tr, nil
}
// GetActiveLeaseTasksForModel returns the running or pending lease tasks that
// were requested *specifically* for the given model.
// Leases that apply to a specific hostname are not counted here.
// TODO(gregorynisbet): Count leases that target specific hostnames as well.
func (c *Client) GetActiveLeaseTasksForModel(ctx context.Context, model string) ([]*swarming_api.SwarmingRpcsTaskResult, error) {
	// TODO(gregorynisbet): Can we search by tags as well?
	return c.getActiveLeaseTasksForDimensions(ctx, map[string]string{
		"skylab-tool": "lease",
		"label-model": model,
	})
}
// GetActiveLeaseTasksForBoard returns the running or pending lease tasks that
// were requested *specifically* for the given board.
func (c *Client) GetActiveLeaseTasksForBoard(ctx context.Context, board string) ([]*swarming_api.SwarmingRpcsTaskResult, error) {
	leaseDims := map[string]string{
		"skylab-tool": "lease",
		"label-board": board,
	}
	return c.getActiveLeaseTasksForDimensions(ctx, leaseDims)
}
// CancelTask cancels the swarming task with the given ID. KillRunning is set
// on the request, so a task that has already started is targeted too.
// Transient errors are retried, all within an overall 60 second deadline.
// It returns an error if the RPC fails or if swarming does not report the
// cancellation as successful.
func (c *Client) CancelTask(ctx context.Context, taskID string) error {
	// The backoff in retryParams can exceed this deadline, in which case the
	// later retries are cut short by context expiry.
	ctx, cancel := context.WithTimeout(ctx, 60*time.Second)
	defer cancel()
	req := &swarming_api.SwarmingRpcsTaskCancelRequest{
		KillRunning: true,
	}
	var tc *swarming_api.SwarmingRpcsCancelResponse
	cancelTask := func() error {
		var err error
		tc, err = c.SwarmingService.Task.Cancel(taskID, req).Context(ctx).Do()
		return err
	}
	if err := callWithRetries(ctx, "cancel task", cancelTask); err != nil {
		return errors.Annotate(err, "cancel task %s", taskID).Err()
	}
	if tc == nil || !tc.Ok {
		return errors.Reason("task %s is not successfully canceled", taskID).Err()
	}
	return nil
}
// GetResults fetches the result of each task in IDs, returned in the same
// order as IDs. Each lookup is retried on transient errors. The first failure
// that survives retries aborts the whole call.
func (c *Client) GetResults(ctx context.Context, IDs []string) ([]*swarming_api.SwarmingRpcsTaskResult, error) {
	results := make([]*swarming_api.SwarmingRpcsTaskResult, len(IDs))
	for i, ID := range IDs {
		var r *swarming_api.SwarmingRpcsTaskResult
		getResult := func() error {
			var err error
			r, err = c.SwarmingService.Task.Result(ID).Context(ctx).Do()
			return err
		}
		if err := callWithRetries(ctx, "get result", getResult); err != nil {
			// Annotate's reason is a format string; pass the ID as an argument.
			return nil, errors.Annotate(err, "get swarming result for task %s", ID).Err()
		}
		results[i] = r
	}
	return results, nil
}
// GetResultsForTags gets results for all tasks that match every one of the
// given tags, following pagination cursors so that tasks beyond the first
// page are included. The listing is retried from the first page on
// transient errors.
func (c *Client) GetResultsForTags(ctx context.Context, tags []string) ([]*swarming_api.SwarmingRpcsTaskResult, error) {
	var results []*swarming_api.SwarmingRpcsTaskResult
	getResults := func() error {
		// Build a fresh call per attempt: Cursor() mutates the call, so reusing
		// one across retries would resume mid-listing and drop earlier pages.
		call := c.SwarmingService.Tasks.List().Tags(tags...)
		var err error
		results, err = getFullTaskList(ctx, call)
		return err
	}
	if err := callWithRetries(ctx, "get result", getResults); err != nil {
		return nil, errors.Annotate(err, "get swarming result for tags %s", tags).Err()
	}
	return results, nil
}
// GetRequests fetches the task request for each task in IDs, returned in the
// same order as IDs. Each lookup is retried on transient errors. The first
// failure that survives retries aborts the whole call.
func (c *Client) GetRequests(ctx context.Context, IDs []string) ([]*swarming_api.SwarmingRpcsTaskRequest, error) {
	requests := make([]*swarming_api.SwarmingRpcsTaskRequest, len(IDs))
	for i, ID := range IDs {
		var request *swarming_api.SwarmingRpcsTaskRequest
		getRequest := func() error {
			var err error
			request, err = c.SwarmingService.Task.Request(ID).Context(ctx).Do()
			return err
		}
		if err := callWithRetries(ctx, "get request", getRequest); err != nil {
			// The previous message ("rerun task") misdescribed this operation.
			return nil, errors.Annotate(err, "get swarming request for task %s", ID).Err()
		}
		requests[i] = request
	}
	return requests, nil
}
// GetTaskState returns the current state of the task with the given ID,
// retrying transient errors.
func (c *Client) GetTaskState(ctx context.Context, ID string) (*swarming_api.SwarmingRpcsTaskStates, error) {
	var result *swarming_api.SwarmingRpcsTaskStates
	getState := func() error {
		var err error
		result, err = c.SwarmingService.Tasks.GetStates().TaskId(ID).Context(ctx).Do()
		return err
	}
	if err := callWithRetries(ctx, "get state", getState); err != nil {
		// Annotate's reason is a format string; pass the ID as an argument.
		return nil, errors.Annotate(err, "get task state for task ID %s", ID).Err()
	}
	return result, nil
}
// GetTaskOutputs fetches the stdout of each task in IDs, returned in the same
// order as IDs. Each lookup is retried on transient errors. The first failure
// that survives retries aborts the whole call.
func (c *Client) GetTaskOutputs(ctx context.Context, IDs []string) ([]*swarming_api.SwarmingRpcsTaskOutput, error) {
	results := make([]*swarming_api.SwarmingRpcsTaskOutput, len(IDs))
	for i, ID := range IDs {
		var result *swarming_api.SwarmingRpcsTaskOutput
		getOutput := func() error {
			var err error
			result, err = c.SwarmingService.Task.Stdout(ID).Context(ctx).Do()
			return err
		}
		if err := callWithRetries(ctx, "get task output", getOutput); err != nil {
			// Annotate's reason is a format string; pass the ID as an argument.
			return nil, errors.Annotate(err, "get swarming stdout for task %s", ID).Err()
		}
		results[i] = result
	}
	return results, nil
}
// BotExists reports whether at least one bot that is not dead matches all of
// the given dimensions. The lookup is retried on transient errors.
func (c *Client) BotExists(ctx context.Context, dims []*swarming_api.SwarmingRpcsStringPair) (bool, error) {
	var found bool
	check := func() error {
		call := c.SwarmingService.Bots.List().Dimensions(flattenStringPairs(dims)...).IsDead("FALSE").Limit(1)
		list, err := call.Context(ctx).Do()
		switch {
		case err != nil:
			return errors.Annotate(err, "bot exists").Err()
		case list == nil:
			return errors.Reason("bot exists: nil RPC response").Err()
		}
		// Record the answer only once an attempt has succeeded.
		found = len(list.Items) > 0
		return nil
	}
	if err := callWithRetries(ctx, "check bot exists", check); err != nil {
		return false, err
	}
	return found, nil
}
// GetBotIDs returns the "id" dimension of every bot matching dims. It fails if
// any matching bot lacks a single-valued "id" dimension.
func (c *Client) GetBotIDs(ctx context.Context, dims []*swarming_api.SwarmingRpcsStringPair) ([]string, error) {
	bots, err := c.GetBots(ctx, dims)
	if err != nil {
		return nil, err
	}
	var ids []string
	for _, b := range bots {
		id, lookupErr := LookupDimension(b.Dimensions, "id")
		if lookupErr != nil {
			return nil, errors.Annotate(lookupErr, "error with bot id").Err()
		}
		ids = append(ids, id)
	}
	return ids, nil
}
// getSwarmingRpcsBotList executes call once (a single page; no cursor is
// followed) and retries transient swarming errors. The c parameter is
// currently unused.
func getSwarmingRpcsBotList(ctx context.Context, c *Client, call *swarming_api.BotsListCall) (*swarming_api.SwarmingRpcsBotList, error) {
	var list *swarming_api.SwarmingRpcsBotList
	if err := callWithRetries(ctx, "get bot list", func() error {
		resp, err := call.Context(ctx).Do()
		list = resp
		return err
	}); err != nil {
		return nil, err
	}
	return list, nil
}
// GetBots returns every bot matching all of the given dimensions. It follows
// pagination cursors until the listing is exhausted, and each page fetch is
// retried on transient errors.
func (c *Client) GetBots(ctx context.Context, dims []*swarming_api.SwarmingRpcsStringPair) ([]*swarming_api.SwarmingRpcsBotInfo, error) {
	var out []*swarming_api.SwarmingRpcsBotInfo
	call := c.SwarmingService.Bots.List().Dimensions(flattenStringPairs(dims)...)
	for {
		tl, err := getSwarmingRpcsBotList(ctx, c, call)
		if err != nil {
			return nil, err
		}
		out = append(out, tl.Items...)
		if tl.Cursor == "" {
			return out, nil
		}
		// Advance only when there is another page to fetch.
		call = call.Cursor(tl.Cursor)
	}
}
// GetListedBots returns the DUT managed by each bot that matches dims. Bots
// whose "dut_name" dimension is missing or not single-valued are skipped
// silently.
func (c *Client) GetListedBots(ctx context.Context, dims []*swarming_api.SwarmingRpcsStringPair) ([]*ListedHost, error) {
	bots, err := c.GetBots(ctx, dims)
	if err != nil {
		return nil, err
	}
	var hosts []*ListedHost
	for _, b := range bots {
		name, lookupErr := LookupDimension(b.Dimensions, "dut_name")
		if lookupErr != nil {
			continue
		}
		hosts = append(hosts, &ListedHost{Hostname: name})
	}
	return hosts, nil
}
// GetFlatBotDimensionsForTask takes a task ID and returns the bot dimensions
// recorded in that task's result.
//
// The output map has exactly one value per dimension. A key k that maps to
// values v1, v2, v3, ... yields the pair (k, v1). Keys that are present but
// have no values map to the sentinel value "".
//
// The lookup is retried on transient errors. An error is returned only if the
// task result cannot be fetched. If the result carries no bot dimensions
// (presumably because no bot has picked up the task yet; TODO confirm), the
// returned map is empty rather than an error.
func (c *Client) GetFlatBotDimensionsForTask(ctx context.Context, taskID string) (map[string]string, error) {
	var result *swarming_api.SwarmingRpcsTaskResult
	getResult := func() error {
		var err error
		result, err = c.SwarmingService.Task.Result(taskID).Context(ctx).Do()
		return err
	}
	if err := callWithRetries(ctx, "get task result", getResult); err != nil {
		return nil, errors.Annotate(err, "get task for taskID").Err()
	}
	out := make(map[string]string, len(result.BotDimensions))
	for _, dim := range result.BotDimensions {
		if len(dim.Value) == 0 {
			// Honor the documented "" sentinel so callers can distinguish
			// "present without values" from "absent" via the comma-ok form.
			// Do not clobber a value set by an earlier duplicate key.
			if _, seen := out[dim.Key]; !seen {
				out[dim.Key] = ""
			}
			continue
		}
		out[dim.Key] = dim.Value[0]
	}
	return out, nil
}
// DutNameToBotID returns the ID of the single SkylabPool bot whose dut_name
// dimension equals host. Finding zero bots, or more than one, is an error.
func (c *Client) DutNameToBotID(ctx context.Context, host string) (string, error) {
	ids, err := c.GetBotIDs(ctx, []*swarming_api.SwarmingRpcsStringPair{
		{Key: "pool", Value: SkylabPool},
		{Key: "dut_name", Value: host},
	})
	if err != nil {
		return "", errors.Annotate(err, "failed to find bot").Err()
	}
	switch len(ids) {
	case 0:
		return "", errors.Reason("did not find any bot with dut_name %q", host).Err()
	case 1:
		return ids[0], nil
	default:
		return "", errors.Reason("more than one bot with dut_name %q", host).Err()
	}
}
// LookupDimension returns the single value stored under key in dims. Only the
// first pair whose Key equals key is consulted. It fails if key is absent, or
// if that pair holds zero values or more than one value.
func LookupDimension(dims []*swarming_api.SwarmingRpcsStringListPair, key string) (string, error) {
	for _, pair := range dims {
		if pair.Key != key {
			continue
		}
		switch n := len(pair.Value); n {
		case 0:
			return "", fmt.Errorf("found key, 0 values")
		case 1:
			return pair.Value[0], nil
		default:
			return "", fmt.Errorf("found key, (%d) values", n)
		}
	}
	// TODO(gregorynisbet): truncate key if it's too long
	return "", fmt.Errorf("no corresponding value for key (%s)", key)
}
// flattenStringPairs renders each pair as "key:value", preserving input order.
// The result is never nil, even for empty input.
func flattenStringPairs(pairs []*swarming_api.SwarmingRpcsStringPair) []string {
	flat := make([]string, 0, len(pairs))
	for _, p := range pairs {
		flat = append(flat, fmt.Sprintf("%s:%s", p.Key, p.Value))
	}
	return flat
}
// dimsToTags takes swarming dimensions specified as a map and returns them as
// a list of "key:value" strings. The list is sorted so the output, and any
// request built from it, is deterministic despite Go's randomized map
// iteration order. An empty or nil map yields a nil slice.
func dimsToTags(m map[string]string) []string {
	var out []string
	for k, v := range m {
		out = append(out, fmt.Sprintf("%s:%s", k, v))
	}
	sort.Strings(out)
	return out
}
// GetTaskURL returns a URL for inspecting the task with the given ID on the
// swarming server this client was created for.
func (c *Client) GetTaskURL(taskID string) string {
	server := c.server
	return TaskURL(server, taskID)
}
// retryableCodes is the set of HTTP status codes in googleapi errors that
// errIsTransient treats as transient, so that calls failing with them are
// retried.
var retryableCodes = map[int]bool{
	http.StatusInsufficientStorage: true, // 507
	http.StatusGatewayTimeout:      true, // 504
	http.StatusServiceUnavailable:  true, // 503
	http.StatusBadGateway:          true, // 502
	http.StatusInternalServerError: true, // 500
}
// retryParams returns a fresh retry iterator that allows up to 3 retries with
// exponential backoff. The first delay is 15s and each later delay grows by a
// factor of 1.5.
func retryParams() retry.Iterator {
	// crbug.com/1061200: Swarming's response on internal error indicates that
	// we should retry in 30 seconds. We try after (15 + 22.5 + 33.75) seconds
	// to balance responsiveness against additional load due to retries.
	backoff := retry.ExponentialBackoff{Multiplier: 1.5}
	backoff.Delay = 15 * time.Second
	backoff.Retries = 3
	return &backoff
}
// tagErrIfTransient wraps f so that an error it returns is tagged with
// transient.Tag when errIsTransient matches that error or any error reached
// from it via errors.Walk. Other errors, and nil, pass through unchanged.
func tagErrIfTransient(f func() error) func() error {
	return func() error {
		err := f()
		isTransient := false
		errors.Walk(err, func(e error) bool {
			if !errIsTransient(e) {
				return true // keep walking into wrapped errors
			}
			isTransient = true
			return false // found one; stop the walk
		})
		if !isTransient {
			return err
		}
		return transient.Tag.Apply(err)
	}
}
// errIsTransient reports whether err itself (without unwrapping) looks like a
// transient failure worth retrying. That means one of the following:
//   - a net.Error whose Temporary() is true;
//   - a *googleapi.Error whose HTTP code is in retryableCodes;
//   - an error whose message mentions a reset connection or an unexpected EOF.
func errIsTransient(err error) bool {
	if err == nil {
		return false
	}
	// NOTE(review): net.Error.Temporary is deprecated since Go 1.18; kept here
	// to preserve existing retry behavior.
	if netErr, ok := err.(net.Error); ok && netErr.Temporary() {
		return true
	}
	if apiErr, ok := err.(*googleapi.Error); ok && retryableCodes[apiErr.Code] {
		return true
	}
	msg := err.Error()
	for _, marker := range []string{"connection reset by peer", "unexpected EOF"} {
		if strings.Contains(msg, marker) {
			return true
		}
	}
	return false
}
// callWithRetries calls the given function, retrying transient swarming
// errors, with swarming-appropriate backoff and delay.
func callWithRetries(ctx context.Context, opname string, f func() error) error {
return retry.Retry(ctx, transient.Only(retryParams), tagErrIfTransient(f), retry.LogCallback(ctx, opname))
}
// TaskURL returns a URL to inspect the task with the given ID.
// swarmingService is the server's base URL and may be given with or without
// a trailing slash; an empty base is left empty. The task ID is
// query-escaped.
func TaskURL(swarmingService string, taskID string) string {
	base := swarmingService
	// Plain concatenation produced e.g. "https://hosttask?id=..." when the
	// trailing slash was missing.
	if base != "" && !strings.HasSuffix(base, "/") {
		base += "/"
	}
	return base + "task?id=" + url.QueryEscape(taskID)
}
// TaskListURLForTags returns an https URL for the swarming task-list page,
// filtered by each of the given tags (one "f" query parameter per tag).
// swarmingService may be a full URL, of which only the host is kept, or a
// bare host.
func TaskListURLForTags(swarmingService string, tags []string) string {
	host := swarmingService
	if parsed, err := url.Parse(swarmingService); err == nil && parsed.Scheme != "" {
		host = parsed.Host
	}
	filters := url.Values{}
	for _, tag := range tags {
		filters.Add("f", tag)
	}
	u := url.URL{
		Scheme:   "https",
		Host:     host,
		Path:     "tasklist",
		RawQuery: filters.Encode(),
	}
	return u.String()
}
// parseSwarmingHost returns the host part of s when s parses as a URL with a
// scheme. Otherwise, including when s is not a valid URL, s is returned
// unchanged and treated as a bare host.
func parseSwarmingHost(s string) string {
	if u, err := url.Parse(s); err == nil && u.Scheme != "" {
		return u.Host
	}
	return s
}
// formatDims converts dimensions stored in a map into a human-readable,
// comma-separated list of "key:value" pairs. The pairs are sorted so that
// messages built from it are stable across runs (Go randomizes map order).
// An empty or nil map yields "".
func formatDims(m map[string]string) string {
	pairs := make([]string, 0, len(m))
	for k, v := range m {
		pairs = append(pairs, fmt.Sprintf("%s:%s", k, v))
	}
	sort.Strings(pairs)
	return strings.Join(pairs, ", ")
}