blob: d9ca26579e21f36afe70d2eda63b2b660fbb26ed [file] [log] [blame]
// Copyright 2023 The LUCI Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package swarming
import (
"context"
"fmt"
"math/rand"
"testing"
"time"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
"go.chromium.org/luci/common/clock"
"go.chromium.org/luci/common/clock/testclock"
"go.chromium.org/luci/common/data/rand/mathrand"
swarmingv2 "go.chromium.org/luci/swarming/proto/api_v2"
. "github.com/smartystreets/goconvey/convey"
)
func TestGetOne(t *testing.T) {
t.Parallel()
Convey("With context", t, func() {
sleeps := 0
contextTimeout := 2 * time.Hour
ctx := mockContext(&sleeps, &contextTimeout, nil)
const doneTaskID = "done"
const doneLaterID = "later"
const doneNever = "never"
const fatalTaskID = "fatal"
client := clientMock{
taskResultMock: func(ctx context.Context, taskID string, fields *TaskResultFields) (*swarmingv2.TaskResultResponse, error) {
state := swarmingv2.TaskState_PENDING
switch {
case taskID == doneTaskID:
state = swarmingv2.TaskState_COMPLETED
case taskID == doneLaterID && clock.Since(ctx, testclock.TestRecentTimeUTC) > time.Hour:
state = swarmingv2.TaskState_COMPLETED
case taskID == fatalTaskID:
return nil, status.Errorf(codes.PermissionDenied, "boo")
}
return &swarmingv2.TaskResultResponse{
TaskId: taskID,
State: state,
}, nil
},
}
Convey("Already done task: Wait", func() {
res, err := GetOne(ctx, client, doneTaskID, nil, WaitAll)
So(err, ShouldBeNil)
So(res.State, ShouldEqual, swarmingv2.TaskState_COMPLETED)
So(sleeps, ShouldEqual, 0)
})
Convey("Already done task: Poll", func() {
res, err := GetOne(ctx, client, doneTaskID, nil, NoWait)
So(err, ShouldBeNil)
So(res.State, ShouldEqual, swarmingv2.TaskState_COMPLETED)
So(sleeps, ShouldEqual, 0)
})
Convey("Pending task: Wait", func() {
res, err := GetOne(ctx, client, doneLaterID, nil, WaitAll)
So(err, ShouldBeNil)
So(res.State, ShouldEqual, swarmingv2.TaskState_COMPLETED)
So(sleeps, ShouldEqual, 254) // ~= 1h / 15s sans ramp up and random
})
Convey("Pending task: Poll", func() {
res, err := GetOne(ctx, client, doneLaterID, nil, NoWait)
So(err, ShouldBeNil)
So(res.State, ShouldEqual, swarmingv2.TaskState_PENDING)
So(sleeps, ShouldEqual, 0)
})
Convey("Context canceled while waiting", func() {
res, err := GetOne(ctx, client, doneNever, nil, WaitAll)
So(err, ShouldEqual, context.Canceled)
So(res.State, ShouldEqual, swarmingv2.TaskState_PENDING)
So(sleeps, ShouldEqual, 459) // ~= 2h / 15s sans ramp up and random
})
Convey("Fatal RPC failure", func() {
res, err := GetOne(ctx, client, fatalTaskID, nil, WaitAll)
So(status.Code(err), ShouldEqual, codes.PermissionDenied)
So(res, ShouldBeNil)
So(sleeps, ShouldEqual, 0)
})
})
}
func TestGetMany(t *testing.T) {
t.Parallel()
Convey("With context", t, func() {
sleeps := 0
contextTimeout := 2 * time.Hour
onSleep := func() {}
ctx := mockContext(&sleeps, &contextTimeout, func() { onSleep() })
mockedState := map[string]swarmingv2.TaskState{}
mockedErr := map[string]codes.Code{}
mockedRPCErr := codes.OK
client := clientMock{
taskResultsMock: func(ctx context.Context, taskIDs []string, fields *TaskResultFields) ([]ResultOrErr, error) {
if mockedRPCErr != codes.OK {
return nil, status.Errorf(mockedRPCErr, "RPC error")
}
out := make([]ResultOrErr, len(taskIDs))
for i, taskID := range taskIDs {
if code, ok := mockedErr[taskID]; ok {
out[i] = ResultOrErr{Err: status.Errorf(code, "some error")}
} else if state, ok := mockedState[taskID]; ok {
out[i] = ResultOrErr{
Result: &swarmingv2.TaskResultResponse{
TaskId: taskID,
State: state,
},
}
} else {
panic(fmt.Sprintf("unexpected task %q", taskID))
}
}
return out, nil
},
}
type stateOrErr struct {
state swarmingv2.TaskState
err codes.Code
}
call := func(ctx context.Context, taskIDs []string, mode WaitMode) map[string]stateOrErr {
out := map[string]stateOrErr{}
GetMany(ctx, client, taskIDs, nil, mode, func(taskID string, res *swarmingv2.TaskResultResponse, err error) {
if _, ok := out[taskID]; ok {
panic(fmt.Sprintf("task %q reported twice", taskID))
}
s := stateOrErr{}
if err != nil {
if err == context.Canceled || err == context.DeadlineExceeded {
s.err = codes.DeadlineExceeded
} else {
s.err = status.Code(err)
}
}
if res != nil {
s.state = res.State
}
out[taskID] = s
})
So(len(out), ShouldEqual, len(taskIDs))
return out
}
Convey("NoWait: no RPC errors", func() {
mockedState["done"] = swarmingv2.TaskState_COMPLETED
mockedState["pending"] = swarmingv2.TaskState_PENDING
mockedErr["missing"] = codes.NotFound
out := call(ctx, []string{"done", "pending", "missing"}, NoWait)
So(out, ShouldResemble, map[string]stateOrErr{
"done": {state: swarmingv2.TaskState_COMPLETED},
"pending": {state: swarmingv2.TaskState_PENDING},
"missing": {err: codes.NotFound},
})
})
Convey("NoWait: fatal RPC error", func() {
mockedRPCErr = codes.PermissionDenied
out := call(ctx, []string{"t1", "t2"}, NoWait)
So(out, ShouldResemble, map[string]stateOrErr{
"t1": {err: codes.PermissionDenied},
"t2": {err: codes.PermissionDenied},
})
})
Convey("NoWait: transient RPC errors", func() {
mockedState["done"] = swarmingv2.TaskState_COMPLETED
mockedState["pending"] = swarmingv2.TaskState_PENDING
mockedRPCErr = codes.Internal
onSleep = func() {
if sleeps > 5 {
mockedRPCErr = codes.OK
}
}
out := call(ctx, []string{"done", "pending"}, NoWait)
So(out, ShouldResemble, map[string]stateOrErr{
"done": {state: swarmingv2.TaskState_COMPLETED},
"pending": {state: swarmingv2.TaskState_PENDING},
})
So(sleeps, ShouldEqual, 6)
})
Convey("WaitAll: no RPC errors", func() {
mockedState["t1"] = swarmingv2.TaskState_COMPLETED
mockedState["t2"] = swarmingv2.TaskState_PENDING
mockedState["t3"] = swarmingv2.TaskState_PENDING
mockedState["t4"] = swarmingv2.TaskState_PENDING
onSleep = func() {
switch sleeps {
case 2:
mockedErr["t2"] = codes.NotFound
case 3:
mockedState["t3"] = swarmingv2.TaskState_COMPLETED
case 4:
mockedState["t4"] = swarmingv2.TaskState_COMPLETED
}
}
out := call(ctx, []string{"t1", "t2", "t3", "t4"}, WaitAll)
So(out, ShouldResemble, map[string]stateOrErr{
"t1": {state: swarmingv2.TaskState_COMPLETED},
"t2": {state: swarmingv2.TaskState_PENDING, err: codes.NotFound},
"t3": {state: swarmingv2.TaskState_COMPLETED},
"t4": {state: swarmingv2.TaskState_COMPLETED},
})
So(sleeps, ShouldEqual, 4)
})
Convey("WaitAll: fatal RPC error", func() {
mockedState["t1"] = swarmingv2.TaskState_COMPLETED
mockedState["t2"] = swarmingv2.TaskState_PENDING
mockedState["t3"] = swarmingv2.TaskState_PENDING
mockedState["t4"] = swarmingv2.TaskState_PENDING
onSleep = func() {
switch sleeps {
case 2:
mockedState["t4"] = swarmingv2.TaskState_COMPLETED
case 3:
mockedRPCErr = codes.PermissionDenied
}
}
out := call(ctx, []string{"t1", "t2", "t3", "t4"}, WaitAll)
So(out, ShouldResemble, map[string]stateOrErr{
"t1": {state: swarmingv2.TaskState_COMPLETED},
"t2": {state: swarmingv2.TaskState_PENDING, err: codes.PermissionDenied},
"t3": {state: swarmingv2.TaskState_PENDING, err: codes.PermissionDenied},
"t4": {state: swarmingv2.TaskState_COMPLETED},
})
So(sleeps, ShouldEqual, 3)
})
Convey("WaitAll: transient RPC errors", func() {
mockedState["t1"] = swarmingv2.TaskState_COMPLETED
mockedState["t2"] = swarmingv2.TaskState_PENDING
mockedState["t3"] = swarmingv2.TaskState_PENDING
mockedState["t4"] = swarmingv2.TaskState_PENDING
onSleep = func() {
switch sleeps {
case 5:
mockedRPCErr = codes.Internal
case 8:
mockedRPCErr = codes.OK
case 10:
mockedErr["t2"] = codes.NotFound
case 11:
mockedState["t3"] = swarmingv2.TaskState_COMPLETED
case 12:
mockedState["t4"] = swarmingv2.TaskState_COMPLETED
}
}
out := call(ctx, []string{"t1", "t2", "t3", "t4"}, WaitAll)
So(out, ShouldResemble, map[string]stateOrErr{
"t1": {state: swarmingv2.TaskState_COMPLETED},
"t2": {state: swarmingv2.TaskState_PENDING, err: codes.NotFound},
"t3": {state: swarmingv2.TaskState_COMPLETED},
"t4": {state: swarmingv2.TaskState_COMPLETED},
})
So(sleeps, ShouldEqual, 12)
})
Convey("WaitAll: context deadline", func() {
mockedState["t1"] = swarmingv2.TaskState_COMPLETED
mockedState["t2"] = swarmingv2.TaskState_PENDING
out := call(ctx, []string{"t1", "t2"}, WaitAll)
So(out, ShouldResemble, map[string]stateOrErr{
"t1": {state: swarmingv2.TaskState_COMPLETED},
"t2": {state: swarmingv2.TaskState_PENDING, err: codes.DeadlineExceeded},
})
So(sleeps, ShouldEqual, 459)
})
Convey("WaitAny: no RPC errors, no waiting", func() {
mockedState["t1"] = swarmingv2.TaskState_COMPLETED
mockedState["t2"] = swarmingv2.TaskState_PENDING
out := call(ctx, []string{"t1", "t2"}, WaitAny)
So(out, ShouldResemble, map[string]stateOrErr{
"t1": {state: swarmingv2.TaskState_COMPLETED},
"t2": {state: swarmingv2.TaskState_PENDING},
})
So(sleeps, ShouldEqual, 0)
})
Convey("WaitAny: no RPC errors, waiting", func() {
mockedState["t1"] = swarmingv2.TaskState_PENDING
mockedState["t2"] = swarmingv2.TaskState_PENDING
mockedState["t3"] = swarmingv2.TaskState_PENDING
mockedState["t4"] = swarmingv2.TaskState_PENDING
onSleep = func() {
switch sleeps {
case 5:
mockedErr["t2"] = codes.NotFound
case 6:
mockedState["t1"] = swarmingv2.TaskState_COMPLETED
mockedState["t4"] = swarmingv2.TaskState_COMPLETED
}
}
out := call(ctx, []string{"t1", "t2", "t3", "t4"}, WaitAny)
So(out, ShouldResemble, map[string]stateOrErr{
"t1": {state: swarmingv2.TaskState_COMPLETED},
"t2": {state: swarmingv2.TaskState_PENDING, err: codes.NotFound},
"t3": {state: swarmingv2.TaskState_PENDING},
"t4": {state: swarmingv2.TaskState_COMPLETED},
})
So(sleeps, ShouldEqual, 6)
})
Convey("WaitAny: fatal RPC error", func() {
mockedState["t1"] = swarmingv2.TaskState_PENDING
mockedState["t2"] = swarmingv2.TaskState_PENDING
mockedState["t3"] = swarmingv2.TaskState_PENDING
mockedState["t4"] = swarmingv2.TaskState_PENDING
onSleep = func() {
switch sleeps {
case 5:
mockedErr["t2"] = codes.NotFound
case 6:
mockedRPCErr = codes.PermissionDenied
}
}
out := call(ctx, []string{"t1", "t2", "t3", "t4"}, WaitAny)
So(out, ShouldResemble, map[string]stateOrErr{
"t1": {state: swarmingv2.TaskState_PENDING, err: codes.PermissionDenied},
"t2": {state: swarmingv2.TaskState_PENDING, err: codes.NotFound},
"t3": {state: swarmingv2.TaskState_PENDING, err: codes.PermissionDenied},
"t4": {state: swarmingv2.TaskState_PENDING, err: codes.PermissionDenied},
})
So(sleeps, ShouldEqual, 6)
})
Convey("WaitAny: transient RPC errors", func() {
mockedState["t1"] = swarmingv2.TaskState_PENDING
mockedState["t2"] = swarmingv2.TaskState_PENDING
mockedRPCErr = codes.Internal
onSleep = func() {
switch sleeps {
case 5:
mockedRPCErr = codes.OK
case 10:
mockedState["t2"] = swarmingv2.TaskState_COMPLETED
}
}
out := call(ctx, []string{"t1", "t2"}, WaitAny)
So(out, ShouldResemble, map[string]stateOrErr{
"t1": {state: swarmingv2.TaskState_PENDING},
"t2": {state: swarmingv2.TaskState_COMPLETED},
})
So(sleeps, ShouldEqual, 10)
})
Convey("WaitAny: context deadline", func() {
mockedState["t1"] = swarmingv2.TaskState_PENDING
mockedState["t2"] = swarmingv2.TaskState_PENDING
out := call(ctx, []string{"t1", "t2"}, WaitAny)
So(out, ShouldResemble, map[string]stateOrErr{
"t1": {state: swarmingv2.TaskState_PENDING, err: codes.DeadlineExceeded},
"t2": {state: swarmingv2.TaskState_PENDING, err: codes.DeadlineExceeded},
})
So(sleeps, ShouldEqual, 459)
})
})
}
func mockContext(sleeps *int, timeout *time.Duration, cb func()) context.Context {
ctx, cancel := context.WithCancel(context.Background())
ctx = mathrand.Set(ctx, rand.New(rand.NewSource(0)))
ctx, clk := testclock.UseTime(ctx, testclock.TestRecentTimeUTC)
clk.SetTimerCallback(func(d time.Duration, t clock.Timer) {
if clk.Now().Sub(testclock.TestRecentTimeUTC) > *timeout {
cancel()
} else {
if sleeps != nil {
*sleeps += 1
}
if cb != nil {
cb()
}
clk.Add(d)
}
})
return ctx
}
// Unfortunately we can't use swarmingtest.Client here due to module import
// cycles, so set up a separate more targeted mock.
type clientMock struct {
Client // to "implement" all other methods by panicing with nil dereference
taskResultMock func(ctx context.Context, taskID string, fields *TaskResultFields) (*swarmingv2.TaskResultResponse, error)
taskResultsMock func(ctx context.Context, taskIDs []string, fields *TaskResultFields) ([]ResultOrErr, error)
}
func (m clientMock) TaskResult(ctx context.Context, taskID string, fields *TaskResultFields) (*swarmingv2.TaskResultResponse, error) {
return m.taskResultMock(ctx, taskID, fields)
}
func (m clientMock) TaskResults(ctx context.Context, taskIDs []string, fields *TaskResultFields) ([]ResultOrErr, error) {
return m.taskResultsMock(ctx, taskIDs, fields)
}