// Repository viewer header (not part of the program): blob 68d69768d332ff21a259c56f95ae99ffc8d51c10
// Copyright 2019 The Chromium OS Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
// Package agent implements an agent which talks to a drone queen
// service and manages Swarming bots.
package agent
import (
"context"
"fmt"
"io/ioutil"
"log"
"math"
"os"
"sync"
"time"
"github.com/golang/protobuf/ptypes"
"go.chromium.org/luci/common/errors"
"infra/appengine/drone-queen/api"
"infra/cmd/drone-agent/internal/agent/state"
"infra/cmd/drone-agent/internal/bot"
"infra/cmd/drone-agent/internal/draining"
)
// Agent talks to a drone queen service and manages Swarming bots.
// This struct stores the static configuration for the agent. The
// dynamic state is stored in state.State.
type Agent struct {
// Client is the drone queen client. The agent uses it for the
// ReportDrone and ReleaseDuts RPCs.
Client api.DroneClient
// SwarmingURL is the URL of the Swarming instance. Should be
// a full URL without the path, e.g. https://host.example.com
SwarmingURL string
// WorkingDir is used for Swarming bot working dirs. It is
// the caller's responsibility to create this.
WorkingDir string
// ReportingInterval is how long the reporting loop waits before
// each report to the queen.
ReportingInterval time.Duration
// DUTCapacity is the DUT capacity reported to the queen. Negative
// values are reported as 0, and 0 is reported while the agent is
// draining or its context is done.
DUTCapacity int
// StartBotFunc is used to start Swarming bots.
// This must be set.
StartBotFunc func(bot.Config) (bot.Bot, error)
// logger is used for Agent logging. If nil, use the log package.
logger logger
// wrapStateFunc is called to wrap the agent state. This is
// used for instrumenting the state for testing. If nil, this
// is a no-op.
wrapStateFunc func(*state.State) stateInterface
// Hive is the hive value of the drone agent, sent to the queen in
// every report request. This is used for DUT/drone affinity.
// A drone is assigned DUTs with the same hive value.
Hive string
}
// logger defines the logging interface used by Agent.
// When Agent.logger is nil, Agent.log uses the standard log package
// instead.
type logger interface {
// Printf logs a message given a format string and its arguments.
Printf(string, ...interface{})
}
// stateInterface is the state interface used by the agent. The usual
// implementation of the interface is in the state package.
//
// The per-method notes below describe how this file uses each method;
// the authoritative semantics live with the implementation.
type stateInterface interface {
// UUID returns the drone UUID; the agent sends it in every report
// after registration.
UUID() string
// WithExpire returns the context used for the reporting loop, given
// the expiration time from the queen's registration response.
// NOTE(review): presumably the returned context is canceled when
// the assignment expires — confirm against the state package.
WithExpire(ctx context.Context, t time.Time) context.Context
// SetExpiration is called with the expiration time of every queen
// response that is applied to the state.
SetExpiration(t time.Time)
// AddDUT is called for each assigned DUT that is not draining.
AddDUT(dutID string)
// DrainDUT is called for each DUT the queen reports as draining.
DrainDUT(dutID string)
// TerminateDUT is called for each active DUT that is no longer
// in the queen's assigned list.
TerminateDUT(dutID string)
// DrainAll is called when the agent's context is drained.
DrainAll()
// TerminateAll is called when the agent's context is canceled or
// the queen returns UNKNOWN_UUID.
TerminateAll()
// Wait is called before the reporting loop signals that it is
// ready to exit.
// NOTE(review): presumably blocks until all bots have stopped —
// confirm against the state package.
Wait()
// BlockDUTs is called before draining, terminating or waiting.
// NOTE(review): presumably prevents new DUTs from being added —
// confirm against the state package.
BlockDUTs()
// ActiveDUTs returns the DUT IDs that are checked against the
// queen's assigned list.
ActiveDUTs() []string
}
// Run drives the agent until ctx is canceled or drained.
//
// It repeatedly registers with the queen and maintains the resulting
// assignment (see runOnce). Each lost assignment is logged and followed
// by a fresh registration attempt.
//
// NOTE(review): no delay is inserted between attempts here, so a
// runOnce that fails immediately is retried immediately — confirm that
// the client applies its own backoff.
func (a *Agent) Run(ctx context.Context) {
	a.log("Agent starting")
	// Keep going only while the context is neither drained nor done.
	for !draining.IsDraining(ctx) && ctx.Err() == nil {
		err := a.runOnce(ctx)
		if err == nil {
			continue
		}
		a.log("Lost drone assignment: %v", err)
	}
	a.log("Agent exited")
}
// runOnce performs a single registration with the queen and then keeps
// that drone assignment alive until it ends.
//
// Cancelling the context makes this function wind down quickly and
// gracefully (comparable to handling a SIGTERM or an abort); draining
// the context makes it wind down slowly and gracefully. Both of those
// cases return nil.
//
// An error is returned when registration fails or when the assignment
// is lost or expires for any reason.
func (a *Agent) runOnce(ctx context.Context) error {
	a.log("Registering with queen")
	// An empty UUID in the request asks the queen for a new assignment.
	resp, err := a.Client.ReportDrone(ctx, a.reportRequest(ctx, ""))
	if err != nil {
		return errors.Annotate(err, "register with queen").Err()
	}
	if status := resp.GetStatus(); status != api.ReportDroneResponse_OK {
		return errors.Reason("register with queen: got unexpected status %v", status).Err()
	}

	// Build the per-assignment state keyed by the UUID the queen issued.
	droneUUID := resp.GetDroneUuid()
	if droneUUID == "" {
		return errors.Reason("register with queen: got empty UUID").Err()
	}
	st := a.wrapState(state.New(droneUUID, hook{a: a, uuid: droneUUID}))

	// Tie the rest of the session to the assignment's expiration.
	expiration, err := ptypes.Timestamp(resp.GetExpirationTime())
	if err != nil {
		return errors.Annotate(err, "register with queen: read expiration").Err()
	}
	expiringCtx := st.WithExpire(ctx, expiration)

	// The registration response is handled like any other report response.
	if err := applyUpdateToState(resp, st); err != nil {
		return errors.Annotate(err, "register with queen").Err()
	}
	return a.reportLoop(expiringCtx, st)
}
// reportLoop is the agent's core loop: it reports to the queen every
// ReportingInterval until the state has shut down.
// See also runOnce.
//
// Three helper goroutines react to the context:
//   - on cancellation, new DUTs are blocked and everything is terminated;
//   - on draining, new DUTs are blocked and everything is drained;
//   - on either, new DUTs are blocked, the state is waited on, and
//     readyToExit is closed to stop the loop and the other helpers.
//
// It returns nil once readyToExit is closed, or the error itself when a
// report fails with a fatalError. Other report errors are only logged.
func (a *Agent) reportLoop(ctx context.Context, s stateInterface) error {
	ctx, cancel := context.WithCancel(ctx)
	defer cancel()
	var wg sync.WaitGroup
	// Deferred after cancel, so it runs first: all helpers finish
	// before this function returns.
	defer wg.Wait()
	readyToExit := make(chan struct{})

	// spawn runs f in a goroutine tracked by wg.
	spawn := func(f func()) {
		wg.Add(1)
		go func() {
			defer wg.Done()
			f()
		}()
	}

	// Cancellation: stop everything quickly.
	spawn(func() {
		select {
		case <-ctx.Done():
			s.BlockDUTs()
			s.TerminateAll()
		case <-readyToExit:
		}
	})
	// Draining: let everything finish gracefully.
	spawn(func() {
		select {
		case <-draining.C(ctx):
			s.BlockDUTs()
			s.DrainAll()
		case <-readyToExit:
		}
	})
	// Either signal: wait on the state, then release the loop.
	spawn(func() {
		select {
		case <-draining.C(ctx):
		case <-ctx.Done():
		}
		s.BlockDUTs()
		s.Wait()
		close(readyToExit)
	})

	for {
		select {
		case <-readyToExit:
			return nil
		case <-time.After(a.ReportingInterval):
		}
		a.log("Reporting to queen")
		err := a.reportDrone(ctx, s)
		if err == nil {
			continue
		}
		if _, fatal := err.(fatalError); fatal {
			a.log("Terminating due to fatal error: %s", err)
			// Cancel explicitly so the helpers unblock before the
			// deferred wg.Wait runs.
			cancel()
			return err
		}
		a.log("Error reporting to queen: %s", err)
	}
}
// reportDrone performs one ReportDrone RPC to the queen and acts on the
// response.
//
// An OK response is applied to the state. UNKNOWN_UUID terminates all
// DUTs and yields a fatalError. Any other status, an RPC failure, or a
// failure to apply the response yields an ordinary error.
func (a *Agent) reportDrone(ctx context.Context, s stateInterface) error {
	resp, err := a.Client.ReportDrone(ctx, a.reportRequest(ctx, s.UUID()))
	if err != nil {
		return errors.Annotate(err, "report to queen").Err()
	}

	status := resp.GetStatus()
	if status == api.ReportDroneResponse_UNKNOWN_UUID {
		// The queen no longer knows this assignment; give up on it.
		s.TerminateAll()
		return fatalError{reason: "queen returned UNKNOWN_UUID"}
	}
	if status != api.ReportDroneResponse_OK {
		return errors.Reason("report to queen: got unexpected status %v", status).Err()
	}

	err = applyUpdateToState(resp, s)
	if err == nil {
		return nil
	}
	return errors.Annotate(err, "report to queen").Err()
}
// applyUpdateToState brings the agent state in line with a ReportDrone
// response.
//
// In order, it updates the expiration time, drains every DUT listed as
// draining, adds every assigned DUT that is not draining, and finally
// terminates every active DUT that is missing from the assigned list.
// It fails only when the expiration timestamp cannot be read, in which
// case the state is left untouched.
func applyUpdateToState(res *api.ReportDroneResponse, s stateInterface) error {
	expiration, err := ptypes.Timestamp(res.GetExpirationTime())
	if err != nil {
		return errors.Annotate(err, "apply update to state").Err()
	}
	s.SetExpiration(expiration)

	drainingDUTs := make(map[string]bool)
	for _, dutID := range res.GetDrainingDuts() {
		s.DrainDUT(dutID)
		drainingDUTs[dutID] = true
	}

	assignedDUTs := make(map[string]bool)
	for _, dutID := range res.GetAssignedDuts() {
		assignedDUTs[dutID] = true
		if drainingDUTs[dutID] {
			// Draining DUTs are not added.
			continue
		}
		s.AddDUT(dutID)
	}

	for _, dutID := range s.ActiveDUTs() {
		if assignedDUTs[dutID] {
			continue
		}
		s.TerminateDUT(dutID)
	}
	return nil
}
// reportRequest builds the api.ReportDroneRequest sent to the drone
// queen for the given drone UUID (empty when registering).
//
// The machine hostname is used as the drone description; if it cannot
// be read the error is logged and whatever os.Hostname returned is sent
// anyway. The reported DUT capacity is 0 whenever new DUTs should be
// refused, and the configured DUTCapacity otherwise.
func (a *Agent) reportRequest(ctx context.Context, uuid string) *api.ReportDroneRequest {
	hostname, err := os.Hostname()
	if err != nil {
		a.log("Error getting drone hostname: %s", err)
	}

	var capacity uint32
	if !shouldRefuseNewDUTs(ctx) {
		capacity = intToUint32(a.DUTCapacity)
	}

	return &api.ReportDroneRequest{
		DroneUuid: uuid,
		LoadIndicators: &api.ReportDroneRequest_LoadIndicators{
			DutCapacity: capacity,
		},
		DroneDescription: hostname,
		Hive:             a.Hive,
	}
}
// shouldRefuseNewDUTs reports whether the agent should stop accepting
// new DUTs, which is the case once ctx is draining or done.
func shouldRefuseNewDUTs(ctx context.Context) bool {
	if draining.IsDraining(ctx) {
		return true
	}
	return ctx.Err() != nil
}
// log writes a formatted message to the agent's logger, or to the
// standard log package when no logger is configured.
func (a *Agent) log(format string, args ...interface{}) {
	if a.logger == nil {
		log.Printf(format, args...)
		return
	}
	a.logger.Printf(format, args...)
}
// wrapState passes s through wrapStateFunc when one is configured and
// returns s unchanged otherwise.
func (a *Agent) wrapState(s *state.State) stateInterface {
	if wrap := a.wrapStateFunc; wrap != nil {
		return wrap(s)
	}
	return s
}
// hook implements state.ControllerHook.
// It ties the controller callbacks to an Agent and to the drone UUID
// of one assignment.
type hook struct {
// a is the agent whose configuration (WorkingDir, StartBotFunc,
// SwarmingURL, Client) the callbacks use.
a *Agent
// uuid is the drone UUID sent in ReleaseDuts requests.
uuid string
}
// StartBot implements state.ControllerHook.
//
// It creates a fresh working directory for the DUT under the agent's
// WorkingDir and starts a Swarming bot in it via StartBotFunc. If the
// bot fails to start, the directory is removed again and the error is
// returned.
func (h hook) StartBot(dutID string) (bot.Bot, error) {
	workDir, err := ioutil.TempDir(h.a.WorkingDir, dutID+".")
	if err != nil {
		return nil, errors.Annotate(err, "start bot %v", dutID).Err()
	}

	cfg := h.botConfig(dutID, workDir)
	started, err := h.a.StartBotFunc(cfg)
	if err == nil {
		return started, nil
	}

	// Best-effort cleanup; the start error is what gets reported.
	_ = os.RemoveAll(workDir)
	return nil, errors.Annotate(err, "start bot %v", dutID).Err()
}
// botConfig assembles the configuration for starting a Swarming bot
// for dutID in workDir. The bot ID is the DUT ID with a fixed prefix.
func (h hook) botConfig(dutID string, workDir string) bot.Config {
	const botIDPrefix = "crossk-"
	cfg := bot.Config{
		SwarmingURL:   h.a.SwarmingURL,
		WorkDirectory: workDir,
	}
	cfg.BotID = botIDPrefix + dutID
	return cfg
}
// ReleaseDUT implements state.ControllerHook.
//
// It tells the queen that this drone has released dutID. The call runs
// on a fresh background context with a one-minute timeout, so it is not
// tied to the agent's own context.
//
// Releasing DUTs is best-effort: there is no way to handle a failure
// here, so the error is only logged.
func (h hook) ReleaseDUT(dutID string) {
	const releaseDUTsTimeout = time.Minute
	ctx, cancel := context.WithTimeout(context.Background(), releaseDUTsTimeout)
	defer cancel()

	req := api.ReleaseDutsRequest{
		DroneUuid: h.uuid,
		Duts:      []string{dutID},
	}
	if _, err := h.a.Client.ReleaseDuts(ctx, &req); err != nil {
		h.a.log("Error releasing DUT %v: %s", dutID, err)
	}
}
// fatalError signals that the agent must abandon its current UUID
// assignment session and register with the queen again.
type fatalError struct {
	// reason is a short description of why the session is being abandoned.
	reason string
}

// Error implements the error interface, prefixing the reason with a
// fixed marker.
func (fe fatalError) Error() string {
	const format = "agent fatal error: %s"
	return fmt.Sprintf(format, fe.reason)
}
// intToUint32 converts an int to a uint32, clamping to the valid range.
// If the value is negative, return 0.
// If the value overflows, return the max value.
func intToUint32(a int) uint32 {
	if a < 0 {
		return 0
	}
	// Compare in 64 bits. Comparing the int directly against
	// math.MaxUint32 does not compile where int is 32 bits wide, because
	// the constant overflows int. The conversion is safe because a is
	// known to be non-negative here.
	if uint64(a) > math.MaxUint32 {
		return math.MaxUint32
	}
	return uint32(a)
}