// Copyright 2023 The LUCI Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package rbe

import (
"context"
"fmt"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
"google.golang.org/protobuf/encoding/prototext"
"google.golang.org/protobuf/proto"
"google.golang.org/protobuf/types/known/anypb"
"google.golang.org/protobuf/types/known/durationpb"
"google.golang.org/protobuf/types/known/timestamppb"
"go.chromium.org/luci/common/clock"
"go.chromium.org/luci/common/errors"
"go.chromium.org/luci/common/logging"
"go.chromium.org/luci/common/retry/transient"
"go.chromium.org/luci/gae/service/datastore"
"go.chromium.org/luci/grpc/grpcutil"
"go.chromium.org/luci/server/tq"
"go.chromium.org/luci/swarming/internal/remoteworkers"
internalspb "go.chromium.org/luci/swarming/proto/internals"
"go.chromium.org/luci/swarming/server/model"
)

// ReservationServer is responsible for creating and canceling RBE reservations.
type ReservationServer struct {
// rbe is the RBE Reservations API client used to create, query and cancel
// reservations.
rbe remoteworkers.ReservationsClient
// internals is the client of the Swarming Python internals service, used to
// report slice expiration back to Swarming.
internals internalspb.InternalsClient
// serverVersion identifies this server's version; it is attached to
// reservation payloads for debugging.
serverVersion string
}

// NewReservationServer creates a new reservation server given an RBE client
// connection.
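//
// A rough wiring sketch (the connection, client and dispatcher variables
// below are illustrative, not defined in this package):
//
//	srv := NewReservationServer(ctx, rbeConn, internalsClient, serverVersion)
//	srv.RegisterTQTasks(&tq.Default)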
func NewReservationServer(ctx context.Context, cc grpc.ClientConnInterface, internals internalspb.InternalsClient, serverVersion string) *ReservationServer {
return &ReservationServer{
rbe: remoteworkers.NewReservationsClient(cc),
internals: internals,
serverVersion: serverVersion,
}
}

// RegisterTQTasks registers task queue handlers.
//
// Tasks are actually submitted from the Python side.
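//
// Concretely, this wires up the "rbe-enqueue" (create a reservation) and
// "rbe-cancel" (cancel a reservation) task classes, served from queues of
// the same names.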
func (s *ReservationServer) RegisterTQTasks(disp *tq.Dispatcher) {
disp.RegisterTaskClass(tq.TaskClass{
ID: "rbe-enqueue",
Prototype: &internalspb.EnqueueRBETask{},
Kind: tq.Transactional,
Queue: "rbe-enqueue",
Handler: func(ctx context.Context, payload proto.Message) error {
return s.handleEnqueueRBETask(ctx, payload.(*internalspb.EnqueueRBETask))
},
})
disp.RegisterTaskClass(tq.TaskClass{
ID: "rbe-cancel",
Prototype: &internalspb.CancelRBETask{},
Kind: tq.Transactional,
Queue: "rbe-cancel",
Handler: func(ctx context.Context, payload proto.Message) error {
return s.handleCancelRBETask(ctx, payload.(*internalspb.CancelRBETask))
},
})
}

// handleEnqueueRBETask is responsible for creating a reservation.
func (s *ReservationServer) handleEnqueueRBETask(ctx context.Context, task *internalspb.EnqueueRBETask) (err error) {
// On fatal errors we need to mark the TaskToRun in the Swarming DB as
// failed. On transient errors just let TQ call us again for a retry. Note
// that CreateReservation will fail with AlreadyExists if we actually managed
// to create the reservation on the previous attempt.
defer func() {
if err != nil && !transient.Tag.In(err) {
// Something is fatally broken, report this to Swarming.
if derr := s.reservationDenied(ctx, task.Payload, err); derr != nil {
// If the report itself failed for whatever reason, ask TQ to retry by
// returning this error as is (i.e. not tagged with Fatal or Ignore).
err = derr
} else {
// Swarming was successfully notified and we need to tell TQ not to
// retry the task by tagging the error. Mark expected fatal errors (like
// FailedPrecondition due to missing bots) with Ignore, and all
// unexpected ones (e.g. PermissionDenied due to misconfiguration) with
// Fatal tag. This will make them show up differently in monitoring and
// logs.
if isExpectedRBEError(err) {
err = tq.Ignore.Apply(err)
} else {
err = tq.Fatal.Apply(err)
}
}
}
}()

// Fetch TaskToRun and verify it is still pending. It may have been canceled
// already. This check is especially important if handleEnqueueRBETask is
// being retried in a loop due to some repeating error. Canceling TaskToRun
// should stop this retry loop.
ttr, err := newTaskToRunFromPayload(ctx, task.Payload)
if err != nil {
return errors.Annotate(err, "bad EnqueueRBETask payload").Err()
}
switch err := datastore.Get(ctx, ttr); {
case err == datastore.ErrNoSuchEntity:
logging.Warningf(ctx, "TaskToRun entity is already gone")
return nil
case err != nil:
return errors.Annotate(err, "failed to fetch TaskToRun").Tag(transient.Tag).Err()
case !ttr.IsReapable():
logging.Warningf(ctx, "TaskToRun is no longer pending")
return nil
}
// This will show up in e.g. Swarming bot logs.
if task.Payload.DebugInfo != nil {
task.Payload.DebugInfo.GoSwarmingVersion = s.serverVersion
}
payload, err := anypb.New(task.Payload)
if err != nil {
return errors.Annotate(err, "failed to serialize the payload").Err()
}
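// Convert the task's dimension constraints into RBE worker constraints. For
// example (values purely illustrative), a constraint on key "pool" with
// allowed values ["Default"] becomes Key "label:pool" with the same allowed
// values.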
constraints := make([]*remoteworkers.Constraint, len(task.Constraints))
for i, ctr := range task.Constraints {
constraints[i] = &remoteworkers.Constraint{
Key: "label:" + ctr.Key,
AllowedValues: ctr.AllowedValues,
}
}
var overallExpiration *timestamppb.Timestamp
var queuingTimeout *durationpb.Duration
var executionTimeout *durationpb.Duration
if task.ExecutionTimeout == nil {
// TODO(vadimsh): Get rid of this code path when it is no longer being hit.
logging.Warningf(ctx, "No execution timeout set")
overallExpiration = task.Expiry
} else {
// How much time is left to sit in the queue.
untilExpired := task.Expiry.AsTime().Sub(clock.Now(ctx))
if untilExpired < 0 {
untilExpired = 0
}
queuingTimeout = durationpb.New(untilExpired)
// How much time there is to run once started.
executionTimeout = task.ExecutionTimeout
// The task must be done by that time.
overallExpiration = timestamppb.New(
task.Expiry.AsTime().Add(task.ExecutionTimeout.AsDuration()))
}
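// For illustration (hypothetical numbers): with ExecutionTimeout of 1h and a
// slice expiring 5 min from now, queuingTimeout is 5 min, executionTimeout
// is 1h and overallExpiration is task.Expiry + 1h.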
logging.Infof(ctx, "Creating reservation %q", task.Payload.ReservationId)
reservationName := fmt.Sprintf("%s/reservations/%s", task.RbeInstance, task.Payload.ReservationId)
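// Create the reservation. Its name embeds the Swarming-chosen reservation
// ID, so a retry of this handler hits AlreadyExists (handled below) instead
// of creating a duplicate reservation.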
reservation, err := s.rbe.CreateReservation(ctx, &remoteworkers.CreateReservationRequest{
Parent: task.RbeInstance,
Reservation: &remoteworkers.Reservation{
Name: reservationName,
Payload: payload,
ExpireTime: overallExpiration,
QueuingTimeout: queuingTimeout,
ExecutionTimeout: executionTimeout,
Priority: task.Priority,
Constraints: constraints,
RequestedBotId: task.RequestedBotId,
},
})
if status.Code(err) == codes.AlreadyExists {
logging.Warningf(ctx, "Reservation already exists, this is likely a retry: %s", err)
reservation, err = s.rbe.GetReservation(ctx, &remoteworkers.GetReservationRequest{
Name: reservationName,
WaitSeconds: 0,
})
}
// Ask TQ to retry on transient gRPC errors.
err = grpcutil.WrapIfTransientOr(err, codes.DeadlineExceeded)
if err != nil {
return err
}
logging.Infof(ctx, "Reservation: %v", reservation)
return nil
}

// reservationDenied notifies the Swarming Python side that the reservation
// cannot be created due to some fatal error (in particular if there are no
// RBE bots that can execute it).
func (s *ReservationServer) reservationDenied(ctx context.Context, task *internalspb.TaskPayload, reason error) error {
level := logging.Error
if isExpectedRBEError(reason) {
level = logging.Warning
}
logging.Logf(ctx, level, "Failed to submit RBE reservation %q: %s", task.ReservationId, reason)
// Convert the RBE reply to a slice expiration reason. Note that there is
// deliberately no generic "UNKNOWN" reason: all possible RBE errors should
// map to known reasons.
var reasonCode internalspb.ExpireSliceRequest_Reason
switch grpcutil.Code(reason) {
case codes.FailedPrecondition:
// There are no bots alive matching the task.
reasonCode = internalspb.ExpireSliceRequest_NO_RESOURCE
case codes.ResourceExhausted:
// QueuingTimeout is 0 and there are no non-busy bots matching the task.
reasonCode = internalspb.ExpireSliceRequest_EXPIRED
case codes.PermissionDenied:
// Likely an RBE instance misconfiguration (i.e. should not happen).
reasonCode = internalspb.ExpireSliceRequest_PERMISSION_DENIED
case codes.InvalidArgument:
// RBE doesn't like format of dimensions (i.e. should not happen).
reasonCode = internalspb.ExpireSliceRequest_INVALID_ARGUMENT
default:
return errors.Reason("unexpected RBE gRPC status code in %s", reason).Err()
}
// Tell Swarming to switch to the next slice, if necessary.
return s.expireSlice(ctx, task, reasonCode, reason.Error())
}

// ExpireSliceBasedOnReservation checks the reservation status by calling
// the Reservations API and invokes ExpireSlice if the reservation is dead.
//
// It is ultimately invoked from a PubSub push handler when handling
// notifications from the RBE scheduler.
//
// `reservationName` is a full reservation name, including the project and
// RBE instance IDs: `projects/.../instances/.../reservations/...`.
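// For example (an illustrative name, not a real one):
// `projects/example-project/instances/default_instance/reservations/<reservation-id>`.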
func (s *ReservationServer) ExpireSliceBasedOnReservation(ctx context.Context, reservationName string) error {
// Get the up-to-date state of the reservation.
reservation, err := s.rbe.GetReservation(ctx, &remoteworkers.GetReservationRequest{
Name: reservationName,
WaitSeconds: 0,
})
err = grpcutil.WrapIfTransientOr(err, codes.DeadlineExceeded)
if err != nil {
return errors.Annotate(err, "failed to fetch reservation %s", reservationName).Err()
}
// Skip reservations that are still in a non-terminal (e.g. pending) state.
if reservation.State != remoteworkers.ReservationState_RESERVATION_COMPLETED &&
reservation.State != remoteworkers.ReservationState_RESERVATION_CANCELLED {
logging.Warningf(ctx, "Ignoring reservation %s in non-terminal state: %s", reservationName, reservation.State)
return nil
}
// Get the final reservation status. Note that if a bot picked up a lease and
// then failed it, the status is still OK and the error is propagated through
// the `result` field.
//
// Observed non-OK statuses:
// * FAILED_PRECONDITION if there are no matching bots at all.
// * DEADLINE_EXCEEDED if the reservation wasn't finished before its expiry.
// * INTERNAL if the reservation was completed, but with unset result field.
// * CANCELLED if the reservation was canceled before being assigned.
// * ABORTED if the bot picked up the lease and then stopped sending pings.
//
// Note that canceling the reservation with the bot's acknowledgment (i.e.
// while it is already running) results in OK status.
statusErr := status.ErrorProto(reservation.Status)
if statusErr == nil && reservation.State == remoteworkers.ReservationState_RESERVATION_CANCELLED {
statusErr = status.Error(codes.Canceled, "reservation was canceled")
}
// TaskPayload contains exact "coordinates" of the TaskToRun in the datastore.
// It must be present in all leases.
var payload internalspb.TaskPayload
if err := reservation.Payload.UnmarshalTo(&payload); err != nil {
return errors.Annotate(err, "failed to unmarshal reservation %s payload", reservationName).Err()
}
logging.Infof(ctx, "TaskPayload:\n%s", prettyProto(&payload))
// TaskResult contains extra information supplied by the bot when it was
// closing the lease. It is empty if the lease didn't reach the bot at all.
if reservation.Result != nil {
var result internalspb.TaskResult
if err := reservation.Result.UnmarshalTo(&result); err != nil {
return errors.Annotate(err, "failed to unmarshal reservation result").Err()
}
if result.BotInternalError != "" {
if statusErr != nil {
logging.Errorf(ctx, "Overriding with a bot internal error: %s", statusErr)
}
statusErr = status.Errorf(codes.Internal, "%s: %s", reservation.AssignedBotId, result.BotInternalError)
}
}
// Log the final derived status.
if statusErr != nil {
logging.Infof(ctx, "Reservation is %s by %q: %s", reservation.State, reservation.AssignedBotId, statusErr)
} else {
logging.Infof(ctx, "Reservation is %s by %q", reservation.State, reservation.AssignedBotId)
}
// Ignore noop reservations: they are not associated with Swarming slices and
// are used during manual testing only.
if payload.Noop {
return nil
}
// See if the TaskToRun is still pending. If it was already assigned or
// canceled per Swarming datastore state, there's nothing to do.
ttr, err := newTaskToRunFromPayload(ctx, &payload)
if err != nil {
return errors.Annotate(err, "bad TaskPayload").Err()
}
switch err := datastore.Get(ctx, ttr); {
case err == datastore.ErrNoSuchEntity:
logging.Warningf(ctx, "TaskToRun entity is already gone")
return nil
case err != nil:
return errors.Annotate(err, "failed to fetch TaskToRun").Tag(transient.Tag).Err()
case !ttr.IsReapable():
return nil
}
// We already checked the TaskToRun slice is still pending, which means the
// bot hasn't called `/bot/claim` yet and hasn't started working on a task
// (it doesn't know what task to work on). But if `statusErr` is nil, the
// reservation has already finished (and successfully at that). This should
// not be possible and indicates a buggy or non-compliant bot.
if statusErr == nil {
logging.Errorf(ctx, "Unexpected completion notification")
statusErr = status.Errorf(codes.Internal, "the reservation is unexpectedly finished by %q", reservation.AssignedBotId)
}
// Convert an RBE error condition to a slice expiration status.
var reasonCode internalspb.ExpireSliceRequest_Reason
switch status.Code(statusErr) {
case codes.FailedPrecondition:
reasonCode = internalspb.ExpireSliceRequest_NO_RESOURCE
case codes.DeadlineExceeded:
reasonCode = internalspb.ExpireSliceRequest_EXPIRED
case codes.Internal, codes.Aborted:
reasonCode = internalspb.ExpireSliceRequest_BOT_INTERNAL_ERROR
default:
// Note that this branch includes codes.Canceled, which happens when a
// reservation is canceled before it is assigned to a bot. Currently the
// cancellation is always initiated through Swarming, and Swarming already
// marks the corresponding TaskToRun as consumed before canceling the
// reservation, so we should never end up here thanks to the
// ttr.IsReapable() check above.
logging.Errorf(ctx, "Ignoring unexpected reservation status: %s", statusErr)
return nil
}
// Tell Swarming to switch to the next slice, if necessary.
logging.Warningf(ctx, "Expiring slice with %s: %s", reasonCode, statusErr)
if err := s.expireSlice(ctx, &payload, reasonCode, statusErr.Error()); err != nil {
return errors.Annotate(err, "failed to expire the slice").Tag(transient.Tag).Err()
}
return nil
}

// expireSlice calls Swarming Python's ExpireSlice RPC.
func (s *ReservationServer) expireSlice(ctx context.Context, task *internalspb.TaskPayload, code internalspb.ExpireSliceRequest_Reason, details string) error {
_, err := s.internals.ExpireSlice(ctx, &internalspb.ExpireSliceRequest{
TaskId: task.TaskId,
TaskToRunShard: task.TaskToRunShard,
TaskToRunId: task.TaskToRunId,
Reason: code,
Details: details,
})
return err
}

// handleCancelRBETask cancels a reservation.
func (s *ReservationServer) handleCancelRBETask(ctx context.Context, task *internalspb.CancelRBETask) error {
logging.Infof(ctx, "Cancelling reservation %q", task.ReservationId)
resp, err := s.rbe.CancelReservation(ctx, &remoteworkers.CancelReservationRequest{
Name: fmt.Sprintf("%s/reservations/%s", task.RbeInstance, task.ReservationId),
Intent: remoteworkers.CancelReservationIntent_ANY,
})
switch status.Code(err) {
case codes.OK:
logging.Infof(ctx, "Cancel result: %s", resp.Result)
return nil
case codes.NotFound:
logging.Warningf(ctx, "No such reservation, nothing to cancel")
return tq.Ignore.Apply(err)
default:
return grpcutil.WrapIfTransientOr(err, codes.DeadlineExceeded)
}
}

// newTaskToRunFromPayload returns an empty TaskToRun struct with populated
// entity key using information from the TaskPayload proto.
func newTaskToRunFromPayload(ctx context.Context, p *internalspb.TaskPayload) (*model.TaskToRun, error) {
taskReqKey, err := model.TaskIDToRequestKey(ctx, p.TaskId)
if err != nil {
return nil, err
}
return &model.TaskToRun{
Key: model.TaskToRunKey(ctx, taskReqKey, p.TaskToRunShard, p.TaskToRunId),
}, nil
}

// isExpectedRBEError returns true for errors that are expected to happen during
// normal code flow.
func isExpectedRBEError(err error) bool {
code := grpcutil.Code(err)
return code == codes.FailedPrecondition || code == codes.ResourceExhausted
}

// prettyProto formats a proto message for logs.
func prettyProto(msg proto.Message) string {
blob, err := prototext.MarshalOptions{
Multiline: true,
Indent: " ",
}.Marshal(msg)
if err != nil {
return fmt.Sprintf("<error: %s>", err)
}
return string(blob)
}