blob: 46885ed71008755e1781619c220afd2cc614a01f [file] [log] [blame]
// Copyright 2023 The LUCI Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package rbe
import (
"context"
"fmt"
"testing"
"time"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
"google.golang.org/protobuf/proto"
"google.golang.org/protobuf/types/known/anypb"
"google.golang.org/protobuf/types/known/durationpb"
"google.golang.org/protobuf/types/known/emptypb"
"google.golang.org/protobuf/types/known/timestamppb"
"go.chromium.org/luci/common/clock"
"go.chromium.org/luci/common/clock/testclock"
"go.chromium.org/luci/common/retry/transient"
"go.chromium.org/luci/gae/impl/memory"
"go.chromium.org/luci/gae/service/datastore"
"go.chromium.org/luci/server/tq"
"go.chromium.org/luci/swarming/internal/remoteworkers"
internalspb "go.chromium.org/luci/swarming/proto/internals"
"go.chromium.org/luci/swarming/server/model"
. "github.com/smartystreets/goconvey/convey"
. "go.chromium.org/luci/common/testing/assertions"
)
func TestReservationServer(t *testing.T) {
t.Parallel()
Convey("With mocks", t, func() {
const rbeInstance = "projects/x/instances/y"
const rbeReservation = "reservation-id"
ctx := memory.Use(context.Background())
ctx, _ = testclock.UseTime(ctx, testclock.TestRecentTimeUTC)
rbe := mockedReservationClient{
newState: remoteworkers.ReservationState_RESERVATION_PENDING,
}
internals := mockedInternalsClient{}
srv := ReservationServer{
rbe: &rbe,
internals: &internals,
serverVersion: "go-version",
}
expirationTimeout := time.Hour
executionTimeout := 10 * time.Minute
expiry := clock.Now(ctx).Add(expirationTimeout).UTC()
enqueueTask := &internalspb.EnqueueRBETask{
Payload: &internalspb.TaskPayload{
ReservationId: rbeReservation,
TaskId: "60b2ed0a43023110",
TaskToRunShard: 14,
TaskToRunId: 1,
DebugInfo: &internalspb.TaskPayload_DebugInfo{
PySwarmingVersion: "py-version",
},
},
RbeInstance: rbeInstance,
Expiry: timestamppb.New(expiry),
ExecutionTimeout: durationpb.New(executionTimeout),
RequestedBotId: "some-bot-id",
Constraints: []*internalspb.EnqueueRBETask_Constraint{
{Key: "key1", AllowedValues: []string{"v1", "v2"}},
{Key: "key2", AllowedValues: []string{"v3"}},
},
Priority: 123,
}
taskReqKey, err := model.UnpackTaskRequestKey(ctx, enqueueTask.Payload.TaskId)
So(err, ShouldBeNil)
taskToRun := &model.TaskToRun{
Key: model.TaskToRunKey(ctx, taskReqKey,
enqueueTask.Payload.TaskToRunShard,
enqueueTask.Payload.TaskToRunId,
),
Expiration: datastore.NewIndexedOptional(expiry),
}
So(datastore.Put(ctx, taskToRun), ShouldBeNil)
Convey("handleEnqueueRBETask ok", func() {
err := srv.handleEnqueueRBETask(ctx, enqueueTask)
So(err, ShouldBeNil)
expectedPayload, _ := anypb.New(&internalspb.TaskPayload{
ReservationId: rbeReservation,
TaskId: "60b2ed0a43023110",
TaskToRunShard: 14,
TaskToRunId: 1,
DebugInfo: &internalspb.TaskPayload_DebugInfo{
PySwarmingVersion: "py-version",
GoSwarmingVersion: "go-version",
},
})
So(rbe.reservation, ShouldResembleProto, &remoteworkers.Reservation{
Name: fmt.Sprintf("%s/reservations/%s", rbeInstance, rbeReservation),
State: remoteworkers.ReservationState_RESERVATION_PENDING,
Payload: expectedPayload,
Constraints: []*remoteworkers.Constraint{
{Key: "label:key1", AllowedValues: []string{"v1", "v2"}},
{Key: "label:key2", AllowedValues: []string{"v3"}},
},
ExpireTime: timestamppb.New(expiry.Add(executionTimeout)),
QueuingTimeout: durationpb.New(expirationTimeout),
ExecutionTimeout: durationpb.New(executionTimeout),
Priority: 123,
RequestedBotId: "some-bot-id",
})
})
Convey("handleEnqueueRBETask TaskToRun is gone", func() {
So(datastore.Delete(ctx, datastore.KeyForObj(ctx, taskToRun)), ShouldBeNil)
err := srv.handleEnqueueRBETask(ctx, enqueueTask)
So(err, ShouldBeNil)
// Didn't call RBE.
So(rbe.reservation, ShouldBeNil)
})
Convey("handleEnqueueRBETask TaskToRun is claimed", func() {
taskToRun.Expiration.Unset()
So(datastore.Put(ctx, taskToRun), ShouldBeNil)
err := srv.handleEnqueueRBETask(ctx, enqueueTask)
So(err, ShouldBeNil)
// Didn't call RBE.
So(rbe.reservation, ShouldBeNil)
})
Convey("handleEnqueueRBETask transient err", func() {
rbe.errCreate = status.Errorf(codes.Internal, "boom")
err := srv.handleEnqueueRBETask(ctx, enqueueTask)
So(err, ShouldNotBeNil)
So(transient.Tag.In(err), ShouldBeTrue)
})
Convey("handleEnqueueRBETask already exists", func() {
rbe.errCreate = status.Errorf(codes.AlreadyExists, "boom")
err := srv.handleEnqueueRBETask(ctx, enqueueTask)
So(err, ShouldBeNil)
})
Convey("handleEnqueueRBETask fatal error", func() {
Convey("expected error, report ok", func() {
rbe.errCreate = status.Errorf(codes.FailedPrecondition, "boom")
internals.expireSlice = func(req *internalspb.ExpireSliceRequest) error {
So(req, ShouldResembleProto, &internalspb.ExpireSliceRequest{
TaskId: enqueueTask.Payload.TaskId,
TaskToRunShard: enqueueTask.Payload.TaskToRunShard,
TaskToRunId: enqueueTask.Payload.TaskToRunId,
Reason: internalspb.ExpireSliceRequest_NO_RESOURCE,
Details: "rpc error: code = FailedPrecondition desc = boom",
})
return nil
}
err := srv.handleEnqueueRBETask(ctx, enqueueTask)
So(tq.Ignore.In(err), ShouldBeTrue)
})
Convey("unexpected error, report ok", func() {
rbe.errCreate = status.Errorf(codes.PermissionDenied, "boom")
internals.expireSlice = func(req *internalspb.ExpireSliceRequest) error {
So(req, ShouldResembleProto, &internalspb.ExpireSliceRequest{
TaskId: enqueueTask.Payload.TaskId,
TaskToRunShard: enqueueTask.Payload.TaskToRunShard,
TaskToRunId: enqueueTask.Payload.TaskToRunId,
Reason: internalspb.ExpireSliceRequest_PERMISSION_DENIED,
Details: "rpc error: code = PermissionDenied desc = boom",
})
return nil
}
err := srv.handleEnqueueRBETask(ctx, enqueueTask)
So(tq.Fatal.In(err), ShouldBeTrue)
})
Convey("expected, report failed", func() {
rbe.errCreate = status.Errorf(codes.FailedPrecondition, "boom")
internals.expireSlice = func(_ *internalspb.ExpireSliceRequest) error {
return status.Errorf(codes.InvalidArgument, "boom")
}
err := srv.handleEnqueueRBETask(ctx, enqueueTask)
So(err, ShouldNotBeNil)
So(tq.Ignore.In(err), ShouldBeFalse)
So(tq.Fatal.In(err), ShouldBeFalse)
})
})
Convey("handleCancelRBETask ok", func() {
err := srv.handleCancelRBETask(ctx, &internalspb.CancelRBETask{
RbeInstance: rbeInstance,
ReservationId: rbeReservation,
})
So(err, ShouldBeNil)
So(rbe.lastCancel, ShouldResembleProto, &remoteworkers.CancelReservationRequest{
Name: fmt.Sprintf("%s/reservations/%s", rbeInstance, rbeReservation),
Intent: remoteworkers.CancelReservationIntent_ANY,
})
})
Convey("handleCancelRBETask not found", func() {
rbe.errCancel = status.Errorf(codes.NotFound, "boo")
err := srv.handleCancelRBETask(ctx, &internalspb.CancelRBETask{
RbeInstance: rbeInstance,
ReservationId: rbeReservation,
})
So(tq.Ignore.In(err), ShouldBeTrue)
})
Convey("handleCancelRBETask internal", func() {
rbe.errCancel = status.Errorf(codes.Internal, "boo")
err := srv.handleCancelRBETask(ctx, &internalspb.CancelRBETask{
RbeInstance: rbeInstance,
ReservationId: rbeReservation,
})
So(transient.Tag.In(err), ShouldBeTrue)
})
Convey("ExpireSliceBasedOnReservation", func() {
const (
reservationName = "projects/.../instances/.../reservations/..."
taskSliceIndex = 1
taskToRunShard = 5
taskToRunID = 678
taskID = "637f8e221100aa10"
)
var (
expireSliceReason internalspb.ExpireSliceRequest_Reason
expireSliceDetails string
)
internals.expireSlice = func(r *internalspb.ExpireSliceRequest) error {
So(r.TaskId, ShouldEqual, taskID)
So(r.TaskToRunShard, ShouldEqual, taskToRunShard)
So(r.TaskToRunId, ShouldEqual, taskToRunID)
So(r.Reason, ShouldNotEqual, internalspb.ExpireSliceRequest_REASON_UNSPECIFIED)
expireSliceReason = r.Reason
expireSliceDetails = r.Details
return nil
}
prepTaskToRun := func(reapable bool) {
var exp datastore.Optional[time.Time, datastore.Indexed]
if reapable {
exp.Set(testclock.TestRecentTimeUTC.Add(time.Hour))
}
taskReqKey, _ := model.UnpackTaskRequestKey(ctx, taskID)
So(datastore.Put(ctx, &model.TaskToRun{
Key: model.TaskToRunKey(ctx, taskReqKey, taskToRunShard, taskToRunID),
Expiration: exp,
}), ShouldBeNil)
}
prepReapableTaskToRun := func() { prepTaskToRun(true) }
prepClaimedTaskToRun := func() { prepTaskToRun(false) }
expireBasedOnReservation := func(state remoteworkers.ReservationState, statusErr error, result *internalspb.TaskResult) {
rbe.reservation = &remoteworkers.Reservation{
Name: reservationName,
State: state,
Status: status.Convert(statusErr).Proto(),
}
rbe.reservation.Payload, _ = anypb.New(&internalspb.TaskPayload{
ReservationId: "",
TaskId: taskID,
SliceIndex: taskSliceIndex,
TaskToRunShard: taskToRunShard,
TaskToRunId: taskToRunID,
})
if result != nil {
rbe.reservation.Result, _ = anypb.New(result)
}
expireSliceReason = internalspb.ExpireSliceRequest_REASON_UNSPECIFIED
expireSliceDetails = ""
So(srv.ExpireSliceBasedOnReservation(ctx, reservationName), ShouldBeNil)
}
expectNoExpireSlice := func() {
So(expireSliceReason, ShouldEqual, internalspb.ExpireSliceRequest_REASON_UNSPECIFIED)
}
expectExpireSlice := func(r internalspb.ExpireSliceRequest_Reason, details string) {
So(expireSliceReason, ShouldEqual, r)
So(expireSliceDetails, ShouldContainSubstring, details)
}
Convey("Still pending", func() {
prepReapableTaskToRun()
expireBasedOnReservation(
remoteworkers.ReservationState_RESERVATION_PENDING,
nil,
nil,
)
expectNoExpireSlice()
})
Convey("Successful", func() {
prepClaimedTaskToRun()
expireBasedOnReservation(
remoteworkers.ReservationState_RESERVATION_COMPLETED,
nil,
&internalspb.TaskResult{},
)
expectNoExpireSlice()
})
Convey("Canceled #1", func() {
prepClaimedTaskToRun()
expireBasedOnReservation(
remoteworkers.ReservationState_RESERVATION_COMPLETED,
status.Errorf(codes.Canceled, "canceled"),
nil,
)
expectNoExpireSlice()
})
Convey("Canceled #2", func() {
prepClaimedTaskToRun()
expireBasedOnReservation(
remoteworkers.ReservationState_RESERVATION_CANCELLED,
nil,
nil,
)
expectNoExpireSlice()
})
Convey("Expired", func() {
prepReapableTaskToRun()
expireBasedOnReservation(
remoteworkers.ReservationState_RESERVATION_COMPLETED,
status.Errorf(codes.DeadlineExceeded, "deadline"),
nil,
)
expectExpireSlice(internalspb.ExpireSliceRequest_EXPIRED, "deadline")
})
Convey("No resources", func() {
prepReapableTaskToRun()
expireBasedOnReservation(
remoteworkers.ReservationState_RESERVATION_COMPLETED,
status.Errorf(codes.FailedPrecondition, "no bots"),
nil,
)
expectExpireSlice(internalspb.ExpireSliceRequest_NO_RESOURCE, "no bots")
})
Convey("Bot internal error", func() {
prepReapableTaskToRun()
expireBasedOnReservation(
remoteworkers.ReservationState_RESERVATION_COMPLETED,
status.Errorf(codes.DeadlineExceeded, "ignored"),
&internalspb.TaskResult{BotInternalError: "boom"},
)
expectExpireSlice(internalspb.ExpireSliceRequest_BOT_INTERNAL_ERROR, "boom")
})
Convey("Aborted before claimed", func() {
prepReapableTaskToRun()
expireBasedOnReservation(
remoteworkers.ReservationState_RESERVATION_COMPLETED,
status.Errorf(codes.Aborted, "bot died"),
nil,
)
expectExpireSlice(internalspb.ExpireSliceRequest_BOT_INTERNAL_ERROR, "bot died")
})
Convey("Unexpectedly successful reservations", func() {
prepReapableTaskToRun()
expireBasedOnReservation(
remoteworkers.ReservationState_RESERVATION_COMPLETED,
nil,
nil,
)
expectExpireSlice(internalspb.ExpireSliceRequest_BOT_INTERNAL_ERROR, "unexpectedly finished")
})
Convey("Unexpectedly canceled reservations", func() {
prepReapableTaskToRun()
expireBasedOnReservation(
remoteworkers.ReservationState_RESERVATION_COMPLETED,
status.Errorf(codes.Canceled, "ignored"),
nil,
)
expectNoExpireSlice()
})
Convey("Skips already claimed TaskToRun", func() {
prepClaimedTaskToRun()
expireBasedOnReservation(
remoteworkers.ReservationState_RESERVATION_COMPLETED,
status.Errorf(codes.FailedPrecondition, "no bots"),
nil,
)
expectNoExpireSlice()
})
Convey("Skips missing TaskToRun", func() {
expireBasedOnReservation(
remoteworkers.ReservationState_RESERVATION_COMPLETED,
status.Errorf(codes.FailedPrecondition, "no bots"),
nil,
)
expectNoExpireSlice()
})
})
})
}
type mockedReservationClient struct {
lastCreate *remoteworkers.CreateReservationRequest
lastGet *remoteworkers.GetReservationRequest
lastCancel *remoteworkers.CancelReservationRequest
errCreate error
errGet error
errCancel error
newState remoteworkers.ReservationState
reservation *remoteworkers.Reservation
}
func (m *mockedReservationClient) CreateReservation(ctx context.Context, in *remoteworkers.CreateReservationRequest, opts ...grpc.CallOption) (*remoteworkers.Reservation, error) {
m.lastCreate = in
m.reservation = proto.Clone(in.Reservation).(*remoteworkers.Reservation)
m.reservation.State = m.newState
if m.errCreate != nil {
return nil, m.errCreate
}
return m.reservation, nil
}
func (m *mockedReservationClient) GetReservation(ctx context.Context, in *remoteworkers.GetReservationRequest, opts ...grpc.CallOption) (*remoteworkers.Reservation, error) {
m.lastGet = in
if m.errGet != nil {
return nil, m.errGet
}
return m.reservation, nil
}
func (m *mockedReservationClient) CancelReservation(ctx context.Context, in *remoteworkers.CancelReservationRequest, opts ...grpc.CallOption) (*remoteworkers.CancelReservationResponse, error) {
m.lastCancel = in
if m.errCancel != nil {
return nil, m.errCancel
}
return &remoteworkers.CancelReservationResponse{}, nil
}
type mockedInternalsClient struct {
expireSlice func(*internalspb.ExpireSliceRequest) error
}
func (m *mockedInternalsClient) ExpireSlice(ctx context.Context, in *internalspb.ExpireSliceRequest, opts ...grpc.CallOption) (*emptypb.Empty, error) {
if m.expireSlice == nil {
panic("must not be called")
}
if err := m.expireSlice(in); err != nil {
return nil, err
}
return &emptypb.Empty{}, nil
}