blob: 181b54b5f21e11abab5c82e5d45995e11fe2fd06 [file] [log] [blame]
// Copyright 2015 The LUCI Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package lib
import (
"context"
"flag"
"fmt"
"net/http"
"os"
"sort"
"strings"
"sync"
"time"
rbeclient "github.com/bazelbuild/remote-apis-sdks/go/pkg/client"
"github.com/bazelbuild/remote-apis-sdks/go/pkg/digest"
"github.com/bazelbuild/remote-apis-sdks/go/pkg/filemetadata"
"github.com/maruel/subcommands"
"google.golang.org/api/googleapi"
"google.golang.org/api/option"
"go.chromium.org/luci/client/downloader"
"go.chromium.org/luci/client/internal/common"
"go.chromium.org/luci/common/api/swarming/swarming/v1"
"go.chromium.org/luci/common/errors"
"go.chromium.org/luci/common/isolated"
"go.chromium.org/luci/common/isolatedclient"
"go.chromium.org/luci/common/lhttp"
"go.chromium.org/luci/common/logging"
"go.chromium.org/luci/common/retry"
"go.chromium.org/luci/common/retry/transient"
"go.chromium.org/luci/common/system/signals"
)
const (
// Define environment variables used in Swarming client.
// ServerEnvVar is Swarming server host to which a client connect.
// Example: "chromium-swarm.appspot.com"
ServerEnvVar = "SWARMING_SERVER"
// TaskIDEnvVar is Swarming task ID in which this task is running.
// The `swarming` command line tool uses this to populate `ParentTaskId`
// when being used to trigger new tasks from within a swarming task.
TaskIDEnvVar = "SWARMING_TASK_ID"
// UserEnvVar is user name.
// The `swarming` command line tool uses this to populate `User`
// when being used to trigger new tasks.
UserEnvVar = "USER"
)
// TriggerResults is a set of results from using the trigger subcommand,
// describing all of the tasks that were triggered successfully.
type TriggerResults struct {
// Tasks is a list of successfully triggered tasks represented as
// TriggerResult values.
Tasks []*swarming.SwarmingRpcsTaskRequestMetadata `json:"tasks"`
}
// The swarming server has an internal 60-second deadline for responding to
// requests, so 90 seconds shouldn't cause any requests to fail that would
// otherwise succeed.
const swarmingRPCRequestTimeout = 90 * time.Second
const swarmingAPISuffix = "/_ah/api/swarming/v1/"
// swarmingService is an interface intended to stub out the swarming API
// bindings for testing.
type swarmingService interface {
NewTask(ctx context.Context, req *swarming.SwarmingRpcsNewTaskRequest) (*swarming.SwarmingRpcsTaskRequestMetadata, error)
CountTasks(ctx context.Context, start float64, state string, tags ...string) (*swarming.SwarmingRpcsTasksCount, error)
ListTasks(ctx context.Context, limit int64, start float64, state string, tags []string, fields []googleapi.Field) ([]*swarming.SwarmingRpcsTaskResult, error)
CancelTask(ctx context.Context, taskID string, req *swarming.SwarmingRpcsTaskCancelRequest) (*swarming.SwarmingRpcsCancelResponse, error)
TaskRequest(ctx context.Context, taskID string) (*swarming.SwarmingRpcsTaskRequest, error)
TaskResult(ctx context.Context, taskID string, perf bool) (*swarming.SwarmingRpcsTaskResult, error)
TaskOutput(ctx context.Context, taskID string) (*swarming.SwarmingRpcsTaskOutput, error)
FilesFromIsolate(ctx context.Context, outdir string, isolateRef *swarming.SwarmingRpcsFilesRef) ([]string, error)
FilesFromCAS(ctx context.Context, outdir string, cascli *rbeclient.Client, casRef *swarming.SwarmingRpcsCASReference) ([]string, error)
CountBots(ctx context.Context, dimensions ...string) (*swarming.SwarmingRpcsBotsCount, error)
ListBots(ctx context.Context, dimensions []string, fields []googleapi.Field) ([]*swarming.SwarmingRpcsBotInfo, error)
DeleteBot(ctx context.Context, botID string) (*swarming.SwarmingRpcsDeletedResponse, error)
TerminateBot(ctx context.Context, botID string) (*swarming.SwarmingRpcsTerminateResponse, error)
}
type swarmingServiceImpl struct {
client *http.Client
service *swarming.Service
worker int
}
func (s *swarmingServiceImpl) NewTask(ctx context.Context, req *swarming.SwarmingRpcsNewTaskRequest) (res *swarming.SwarmingRpcsTaskRequestMetadata, err error) {
err = retryGoogleRPC(ctx, "NewTask", func() (ierr error) {
res, ierr = s.service.Tasks.New(req).Context(ctx).Do()
return
})
return
}
func (s *swarmingServiceImpl) CountTasks(ctx context.Context, start float64, state string, tags ...string) (res *swarming.SwarmingRpcsTasksCount, err error) {
err = retryGoogleRPC(ctx, "CountTasks", func() (ierr error) {
res, ierr = s.service.Tasks.Count().Context(ctx).Start(start).State(state).Tags(tags...).Do()
return
})
return
}
func (s *swarmingServiceImpl) ListTasks(ctx context.Context, limit int64, start float64, state string, tags []string, fields []googleapi.Field) ([]*swarming.SwarmingRpcsTaskResult, error) {
// Create an empty array so that if serialized to JSON it's an empty list,
// not null.
tasks := []*swarming.SwarmingRpcsTaskResult{}
// If no fields are specified, all fields will be returned. If any fields are
// specified, ensure the cursor is specified so we can get subsequent pages.
if len(fields) > 0 {
fields = append(fields, "cursor")
}
call := s.service.Tasks.List().Context(ctx).Limit(limit).Start(start).State(state).Tags(tags...).Fields(fields...)
// Keep calling as long as there's a cursor indicating more tasks to list.
for {
var res *swarming.SwarmingRpcsTaskList
err := retryGoogleRPC(ctx, "ListTasks", func() (ierr error) {
res, ierr = call.Do()
return
})
if err != nil {
return tasks, err
}
tasks = append(tasks, res.Items...)
if res.Cursor == "" || int64(len(tasks)) >= limit || len(res.Items) == 0 {
break
}
call.Cursor(res.Cursor)
}
if int64(len(tasks)) > limit {
tasks = tasks[0:limit]
}
return tasks, nil
}
func (s *swarmingServiceImpl) CancelTask(ctx context.Context, taskID string, req *swarming.SwarmingRpcsTaskCancelRequest) (res *swarming.SwarmingRpcsCancelResponse, err error) {
err = retryGoogleRPC(ctx, "CancelTask", func() (ierr error) {
res, ierr = s.service.Task.Cancel(taskID, req).Context(ctx).Do()
return ierr
})
return res, err
}
func (s *swarmingServiceImpl) TaskRequest(ctx context.Context, taskID string) (res *swarming.SwarmingRpcsTaskRequest, err error) {
err = retryGoogleRPC(ctx, "TaskRequest", func() (ierr error) {
res, ierr = s.service.Task.Request(taskID).Context(ctx).Do()
return ierr
})
return res, err
}
func (s *swarmingServiceImpl) TaskResult(ctx context.Context, taskID string, perf bool) (res *swarming.SwarmingRpcsTaskResult, err error) {
err = retryGoogleRPC(ctx, "TaskResult", func() (ierr error) {
res, ierr = s.service.Task.Result(taskID).IncludePerformanceStats(perf).Context(ctx).Do()
return
})
return res, err
}
func (s *swarmingServiceImpl) TaskOutput(ctx context.Context, taskID string) (res *swarming.SwarmingRpcsTaskOutput, err error) {
err = retryGoogleRPC(ctx, "TaskOutput", func() (ierr error) {
res, ierr = s.service.Task.Stdout(taskID).Context(ctx).Do()
return ierr
})
return res, err
}
func (s *swarmingServiceImpl) FilesFromIsolate(ctx context.Context, outdir string, isolateRef *swarming.SwarmingRpcsFilesRef) ([]string, error) {
isolatedOpts := []isolatedclient.Option{
isolatedclient.WithAuthClient(s.client),
isolatedclient.WithNamespace(isolateRef.Namespace),
isolatedclient.WithUserAgent(SwarmingUserAgent),
}
isolatedClient := isolatedclient.NewClient(isolateRef.Isolatedserver, isolatedOpts...)
var filesMu sync.Mutex
var files []string
ctx, cancel := context.WithCancel(ctx)
defer cancel()
defer signals.HandleInterrupt(cancel)()
opts := &downloader.Options{
MaxConcurrentJobs: s.worker,
FileCallback: func(name string, _ *isolated.File) {
filesMu.Lock()
files = append(files, name)
filesMu.Unlock()
},
FileStatsCallback: func(fileStats downloader.FileStats, _ time.Duration) {
logging.Debugf(ctx, "Downloaded %d of %d bytes in %d of %d files to %s",
fileStats.BytesCompleted, fileStats.BytesScheduled,
fileStats.CountCompleted, fileStats.CountScheduled,
outdir)
},
}
dl := downloader.New(ctx, isolatedClient, isolated.HexDigest(isolateRef.Isolated), outdir, opts)
return files, dl.Wait()
}
// FilesFromCAS downloads outputs from CAS.
func (s *swarmingServiceImpl) FilesFromCAS(ctx context.Context, outdir string, cascli *rbeclient.Client, casRef *swarming.SwarmingRpcsCASReference) ([]string, error) {
d := digest.Digest{
Hash: casRef.Digest.Hash,
Size: casRef.Digest.SizeBytes,
}
outputs, _, err := cascli.DownloadDirectory(ctx, d, outdir, filemetadata.NewNoopCache())
if err != nil {
return nil, errors.Annotate(err, "failed to download directory").Err()
}
files := make([]string, 0, len(outputs))
for path := range outputs {
files = append(files, path)
}
sort.Strings(files)
return files, nil
}
func (s *swarmingServiceImpl) CountBots(ctx context.Context, dimensions ...string) (res *swarming.SwarmingRpcsBotsCount, err error) {
err = retryGoogleRPC(ctx, "CountBots", func() (ierr error) {
res, ierr = s.service.Bots.Count().Context(ctx).Dimensions(dimensions...).Do()
return
})
return
}
func (s *swarmingServiceImpl) ListBots(ctx context.Context, dimensions []string, fields []googleapi.Field) ([]*swarming.SwarmingRpcsBotInfo, error) {
// Create an empty array so that if serialized to JSON it's an empty list,
// not null.
bots := []*swarming.SwarmingRpcsBotInfo{}
// If no fields are specified, all fields will be returned. If any fields are
// specified, ensure the cursor is specified so we can get subsequent pages.
if len(fields) > 0 {
fields = append(fields, "cursor")
}
call := s.service.Bots.List().Context(ctx).Dimensions(dimensions...).Fields(fields...)
// Keep calling as long as there's a cursor indicating more bots to list.
for {
var res *swarming.SwarmingRpcsBotList
err := retryGoogleRPC(ctx, "ListBots", func() (ierr error) {
res, ierr = call.Do()
return
})
if err != nil {
return bots, err
}
bots = append(bots, res.Items...)
if res.Cursor == "" {
break
}
call.Cursor(res.Cursor)
}
return bots, nil
}
func (s *swarmingServiceImpl) DeleteBot(ctx context.Context, botID string) (res *swarming.SwarmingRpcsDeletedResponse, err error) {
err = retryGoogleRPC(ctx, "DeleteBot", func() (ierr error) {
res, ierr = s.service.Bot.Delete(botID).Context(ctx).Do()
return
})
return
}
func (s *swarmingServiceImpl) TerminateBot(ctx context.Context, botID string) (res *swarming.SwarmingRpcsTerminateResponse, err error) {
err = retryGoogleRPC(ctx, "TerminateBot", func() (ierr error) {
res, ierr = s.service.Bot.Terminate(botID).Context(ctx).Do()
return
})
return
}
type taskState int32
const (
maskAlive = 1
stateBotDied taskState = 1 << 1
stateCancelled taskState = 1 << 2
stateCompleted taskState = 1 << 3
stateExpired taskState = 1 << 4
statePending taskState = 1<<5 | maskAlive
stateRunning taskState = 1<<6 | maskAlive
stateTimedOut taskState = 1 << 7
stateNoResource taskState = 1 << 8
stateKilled taskState = 1 << 9
stateUnknown taskState = -1
)
func parseTaskState(state string) (taskState, error) {
switch state {
case "BOT_DIED":
return stateBotDied, nil
case "CANCELED":
return stateCancelled, nil
case "COMPLETED":
return stateCompleted, nil
case "EXPIRED":
return stateExpired, nil
case "PENDING":
return statePending, nil
case "RUNNING":
return stateRunning, nil
case "TIMED_OUT":
return stateTimedOut, nil
case "NO_RESOURCE":
return stateNoResource, nil
case "KILLED":
return stateKilled, nil
default:
return stateUnknown, errors.Reason("unrecognized state: %q", state).Err()
}
}
func (t taskState) Alive() bool {
return (t & maskAlive) != 0
}
func (t taskState) Completed() bool {
return (t & stateCompleted) != 0
}
// AuthFlags is an interface to register auth flags and create http.Client and CAS Client.
type AuthFlags interface {
// Register registers auth flags to the given flag set. e.g. -service-account-json.
Register(f *flag.FlagSet)
// Parse parses auth flags.
Parse() error
// NewHTTPClient creates an authroised http.Client.
NewHTTPClient(ctx context.Context) (*http.Client, error)
// NewRBEClient creates an authroised RBE Client.
NewRBEClient(ctx context.Context, addr string, instance string) (*rbeclient.Client, error)
}
type commonFlags struct {
subcommands.CommandRunBase
defaultFlags common.Flags
authFlags AuthFlags
serverURL string
worker int
}
// Init initializes common flags.
func (c *commonFlags) Init(authFlags AuthFlags) {
c.defaultFlags.Init(&c.Flags)
c.authFlags = authFlags
c.authFlags.Register(&c.Flags)
c.Flags.StringVar(&c.serverURL, "server", os.Getenv(ServerEnvVar), fmt.Sprintf("Server URL; required. Set $%s to set a default.", ServerEnvVar))
c.Flags.StringVar(&c.serverURL, "S", os.Getenv(ServerEnvVar), "Alias for -server.")
c.Flags.IntVar(&c.worker, "worker", 8, "Number of workers used to download isolated files.")
}
// Parse parses the common flags.
func (c *commonFlags) Parse() error {
if err := c.defaultFlags.Parse(); err != nil {
return err
}
if err := c.authFlags.Parse(); err != nil {
return err
}
if c.serverURL == "" {
return errors.Reason("must provide -server").Err()
}
s, err := lhttp.CheckURL(c.serverURL)
if err != nil {
return err
}
c.serverURL = s
return nil
}
func (c *commonFlags) createSwarmingClient(ctx context.Context) (swarmingService, error) {
authcli, err := c.authFlags.NewHTTPClient(ctx)
if err != nil {
return nil, err
}
// Create a copy of the client so that the timeout only applies to Swarming
// RPC requests, not to Isolate requests made by this service. A shallow
// copy is ok because only the timeout needs to be different.
rpcClient := *authcli
rpcClient.Timeout = swarmingRPCRequestTimeout
s, err := swarming.NewService(ctx, option.WithHTTPClient(&rpcClient))
if err != nil {
return nil, err
}
s.BasePath = c.serverURL + swarmingAPISuffix
s.UserAgent = SwarmingUserAgent
return &swarmingServiceImpl{
client: authcli,
service: s,
worker: c.worker}, nil
}
func tagTransientGoogleAPIError(err error) error {
// Responses with HTTP codes < 500, if we got them, indicate fatal errors.
if gerr, _ := err.(*googleapi.Error); gerr != nil && gerr.Code < 500 {
return err
}
// HTTP error already has transient.Tag if it is retryable.
if _, ok := lhttp.IsHTTPError(err); ok {
return err
}
// Everything else (timeouts, DNS issues, etc) is considered
// a transient error.
return transient.Tag.Apply(err)
}
func printError(a subcommands.Application, err error) {
fmt.Fprintf(a.GetErr(), "%s: %s\n%s\n", a.GetName(), err, strings.Join(errors.RenderStack(err), "\n"))
}
// retryGoogleRPC retries an RPC on transient errors, such as HTTP 500.
func retryGoogleRPC(ctx context.Context, rpcName string, rpc func() error) error {
return retry.Retry(ctx, transient.Only(retry.Default), func() error {
err := rpc()
if gerr, ok := err.(*googleapi.Error); ok && gerr.Code >= 500 {
return transient.Tag.Apply(err)
}
if errors.Contains(err, context.DeadlineExceeded) {
return transient.Tag.Apply(err)
}
var temporary bool
errors.Walk(err, func(err error) bool {
if terr, ok := err.(interface{ Temporary() bool }); ok && terr.Temporary() {
temporary = true
return false
}
return true
})
if temporary {
return transient.Tag.Apply(err)
}
if err != nil {
return errors.Annotate(err, "failed to call %s", rpcName).Err()
}
return nil
}, retry.LogCallback(ctx, rpcName))
}