blob: aec4de2161f7b9153a574b13510ee9f1ae87e66d [file] [log] [blame]
// Copyright 2017 The LUCI Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package swarmingimpl
import (
"bytes"
"context"
"encoding/json"
"fmt"
"io"
"os"
"path/filepath"
"strings"
"testing"
"time"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
"go.chromium.org/luci/client/cmd/swarming/swarmingimpl/output"
"go.chromium.org/luci/common/clock"
"go.chromium.org/luci/common/clock/testclock"
"go.chromium.org/luci/common/errors"
"go.chromium.org/luci/swarming/client/swarming"
"go.chromium.org/luci/swarming/client/swarming/swarmingtest"
swarmingv2 "go.chromium.org/luci/swarming/proto/api_v2"
. "github.com/smartystreets/goconvey/convey"
)
func TestCollectParse(t *testing.T) {
t.Parallel()
expectErr := func(argv []string, errLike string) {
_, code, _, stderr := SubcommandTest(
context.Background(),
CmdCollect,
append([]string{"-server", "example.com"}, argv...),
nil, nil,
)
So(code, ShouldEqual, 1)
So(stderr, ShouldContainSubstring, errLike)
}
Convey(`Make sure that Parse handles no task IDs given.`, t, func() {
expectErr(nil, "must specify at least one task id")
})
Convey(`Make sure that Parse handles a malformed task ID.`, t, func() {
expectErr([]string{"$$$$$"}, "must be hex")
})
Convey(`Make sure that Parse handles a dup task ID.`, t, func() {
expectErr([]string{"aaaaaaaaa", "aaaaaaaaa"}, "given more than once")
})
Convey(`Make sure that Parse handles a negative timeout.`, t, func() {
expectErr([]string{"-timeout", "-30m", "aaaaaaaaa"}, "negative timeout")
})
}
func TestCollect(t *testing.T) {
t.Parallel()
casOutputDir := t.TempDir()
Convey(`With mocks`, t, func() {
sleeps := 0
onSleep := func() {}
ctx, clk := testclock.UseTime(context.Background(), testclock.TestRecentTimeUTC)
clk.SetTimerCallback(func(d time.Duration, t clock.Timer) {
clk.Add(d)
sleeps += 1
onSleep()
})
mockedState := map[string]swarmingv2.TaskState{}
mockedHasCAS := map[string]bool{}
mockedErr := map[string]codes.Code{}
mockedStdoutErr := error(nil)
mockedCASErr := error(nil)
service := &swarmingtest.Client{
TaskResultsMock: func(ctx context.Context, taskIDs []string, fields *swarming.TaskResultFields) ([]swarming.ResultOrErr, error) {
out := make([]swarming.ResultOrErr, len(taskIDs))
for i, taskID := range taskIDs {
if code, ok := mockedErr[taskID]; ok {
out[i] = swarming.ResultOrErr{Err: status.Errorf(code, "some error")}
} else if state, ok := mockedState[taskID]; ok {
out[i] = swarming.ResultOrErr{
Result: &swarmingv2.TaskResultResponse{
TaskId: taskID,
State: state,
},
}
if mockedHasCAS[taskID] {
out[i].Result.CasOutputRoot = &swarmingv2.CASReference{
CasInstance: "cas-instance",
Digest: &swarmingv2.Digest{
Hash: "cas-" + taskID,
},
}
}
} else {
panic(fmt.Sprintf("unexpected task %q", taskID))
}
}
return out, nil
},
TaskOutputMock: func(ctx context.Context, taskID string, out io.Writer) (swarmingv2.TaskState, error) {
if mockedStdoutErr != nil {
return 0, mockedStdoutErr
}
_, err := fmt.Fprintf(out, "Output of %s", taskID)
return swarmingv2.TaskState_COMPLETED, err
},
FilesFromCASMock: func(ctx context.Context, outdir string, casRef *swarmingv2.CASReference) ([]string, error) {
if casRef.CasInstance != "cas-instance" {
panic("unexpected CAS instance")
}
if !strings.HasPrefix(casRef.Digest.Hash, "cas-") {
panic("unexpected fake digest")
}
taskID := casRef.Digest.Hash[len("cas-"):]
if want := filepath.Join(casOutputDir, taskID); want != outdir {
panic(fmt.Sprintf("expecting out dir %q, got %q", want, outdir))
}
return []string{"out-" + taskID}, mockedCASErr
},
}
Convey(`Happy path`, func() {
mockedState["a0"] = swarmingv2.TaskState_PENDING
mockedHasCAS["a0"] = true
mockedState["a1"] = swarmingv2.TaskState_COMPLETED
mockedHasCAS["a1"] = true
mockedState["a2"] = swarmingv2.TaskState_COMPLETED
mockedHasCAS["a2"] = false
onSleep = func() {
mockedState["a0"] = swarmingv2.TaskState_COMPLETED
}
_, code, stdout, _ := SubcommandTest(
ctx,
CmdCollect,
[]string{
"-server", "example.com",
"-json-output", "-",
"-task-output-stdout", "json",
"-output-dir", casOutputDir,
"a1", "a0", "a2",
},
nil, service,
)
So(code, ShouldEqual, 0)
So(sortJSON(stdout), ShouldEqual, `{
"a0": {
"output": "Output of a0",
"outputs": [
"out-a0"
],
"results": {
"task_id": "a0",
"cas_output_root": {
"cas_instance": "cas-instance",
"digest": {
"hash": "cas-a0"
}
},
"state": "COMPLETED"
}
},
"a1": {
"output": "Output of a1",
"outputs": [
"out-a1"
],
"results": {
"task_id": "a1",
"cas_output_root": {
"cas_instance": "cas-instance",
"digest": {
"hash": "cas-a1"
}
},
"state": "COMPLETED"
}
},
"a2": {
"output": "Output of a2",
"results": {
"task_id": "a2",
"state": "COMPLETED"
}
}
}
`)
// Check actually created output directories, even for tasks with no
// outputs.
for _, taskID := range []string{"a0", "a1", "a2"} {
s, err := os.Stat(filepath.Join(casOutputDir, taskID))
So(err, ShouldBeNil)
So(s.IsDir(), ShouldBeTrue)
}
})
Convey(`Collect error`, func() {
mockedState["a0"] = swarmingv2.TaskState_COMPLETED
mockedErr["a1"] = codes.PermissionDenied
_, code, stdout, _ := SubcommandTest(
ctx,
CmdCollect,
[]string{
"-server", "example.com",
"-json-output", "-",
"-task-output-stdout", "json",
"-output-dir", casOutputDir,
"a0", "a1",
},
nil, service,
)
So(code, ShouldEqual, 0)
So(sortJSON(stdout), ShouldEqual, `{
"a0": {
"output": "Output of a0",
"results": {
"task_id": "a0",
"state": "COMPLETED"
}
},
"a1": {
"error": "rpc error: code = PermissionDenied desc = some error"
}
}
`)
})
Convey(`Stdout fetch error`, func() {
mockedState["a0"] = swarmingv2.TaskState_COMPLETED
mockedStdoutErr = errors.New("boom")
_, code, stdout, _ := SubcommandTest(
ctx,
CmdCollect,
[]string{
"-server", "example.com",
"-json-output", "-",
"-task-output-stdout", "json",
"-output-dir", casOutputDir,
"a0",
},
nil, service,
)
So(code, ShouldEqual, 0)
So(sortJSON(stdout), ShouldEqual, `{
"a0": {
"error": "fetching console output of a0: boom",
"results": {
"task_id": "a0",
"state": "COMPLETED"
}
}
}
`)
})
Convey(`CAS fetch error`, func() {
mockedState["a0"] = swarmingv2.TaskState_COMPLETED
mockedHasCAS["a0"] = true
mockedCASErr = errors.New("boom")
_, code, stdout, _ := SubcommandTest(
ctx,
CmdCollect,
[]string{
"-server", "example.com",
"-json-output", "-",
"-task-output-stdout", "json",
"-output-dir", casOutputDir,
"a0",
},
nil, service,
)
So(code, ShouldEqual, 0)
So(sortJSON(stdout), ShouldEqual, `{
"a0": {
"error": "fetching isolated output of a0: boom",
"output": "Output of a0",
"results": {
"task_id": "a0",
"cas_output_root": {
"cas_instance": "cas-instance",
"digest": {
"hash": "cas-a0"
}
},
"state": "COMPLETED"
}
}
}
`)
})
Convey(`Timeout waiting`, func() {
mockedState["a0"] = swarmingv2.TaskState_PENDING
_, code, stdout, _ := SubcommandTest(
ctx,
CmdCollect,
[]string{
"-server", "example.com",
"-json-output", "-",
"-task-output-stdout", "json",
"-output-dir", casOutputDir,
"-timeout", "1h",
"a0",
},
nil, service,
)
So(code, ShouldEqual, 0)
So(sortJSON(stdout), ShouldEqual, `{
"a0": {
"error": "rpc_timeout",
"results": {
"task_id": "a0",
"state": "PENDING"
}
}
}
`)
})
Convey(`No waiting`, func() {
mockedState["a0"] = swarmingv2.TaskState_PENDING
mockedState["a1"] = swarmingv2.TaskState_PENDING
onSleep = func() {
panic("must not sleep")
}
_, code, stdout, _ := SubcommandTest(
ctx,
CmdCollect,
[]string{
"-server", "example.com",
"-json-output", "-",
"-task-output-stdout", "json",
"-output-dir", casOutputDir,
"-wait=false",
"a1", "a0",
},
nil, service,
)
So(code, ShouldEqual, 0)
So(sortJSON(stdout), ShouldEqual, `{
"a0": {
"results": {
"task_id": "a0",
"state": "PENDING"
}
},
"a1": {
"results": {
"task_id": "a1",
"state": "PENDING"
}
}
}
`)
})
Convey(`Waiting any`, func() {
mockedState["a0"] = swarmingv2.TaskState_PENDING
mockedState["a1"] = swarmingv2.TaskState_PENDING
mockedState["a2"] = swarmingv2.TaskState_PENDING
onSleep = func() {
mockedState["a1"] = swarmingv2.TaskState_COMPLETED
mockedState["a2"] = swarmingv2.TaskState_COMPLETED
}
_, code, stdout, _ := SubcommandTest(
ctx,
CmdCollect,
[]string{
"-server", "example.com",
"-json-output", "-",
"-task-output-stdout", "json",
"-output-dir", casOutputDir,
"-eager",
"a1", "a0", "a2",
},
nil, service,
)
So(code, ShouldEqual, 0)
So(sortJSON(stdout), ShouldEqual, `{
"a0": {
"results": {
"task_id": "a0",
"state": "PENDING"
}
},
"a1": {
"output": "Output of a1",
"results": {
"task_id": "a1",
"state": "COMPLETED"
}
},
"a2": {
"output": "Output of a2",
"results": {
"task_id": "a2",
"state": "COMPLETED"
}
}
}
`)
})
})
}
func TestCollectSummarizeResults(t *testing.T) {
t.Parallel()
Convey(`Generates json.`, t, func() {
result1 := &swarmingv2.TaskResultResponse{
CurrentTaskSlice: 0,
Duration: 1,
ExitCode: 0,
State: swarmingv2.TaskState_COMPLETED,
PerformanceStats: &swarmingv2.PerformanceStats{
BotOverhead: 0.1,
CacheTrim: &swarmingv2.OperationStats{Duration: 0.1},
Cleanup: &swarmingv2.OperationStats{Duration: 0.1},
// Stats with 0 value also should be kept.
IsolatedDownload: &swarmingv2.CASOperationStats{
Duration: 0.1,
InitialNumberItems: 0,
InitialSize: 0,
ItemsCold: []byte(""),
ItemsHot: []byte("download_hot"),
NumItemsCold: 0,
NumItemsHot: 1,
TotalBytesItemsCold: 0,
TotalBytesItemsHot: 1,
},
IsolatedUpload: &swarmingv2.CASOperationStats{
Duration: 0.1,
InitialNumberItems: 0,
InitialSize: 0,
ItemsCold: []byte(""),
ItemsHot: []byte("upload_hot"),
NumItemsCold: 0,
NumItemsHot: 1,
TotalBytesItemsCold: 0,
TotalBytesItemsHot: 1,
},
NamedCachesInstall: &swarmingv2.OperationStats{Duration: 0.1},
NamedCachesUninstall: &swarmingv2.OperationStats{Duration: 0.1},
PackageInstallation: &swarmingv2.OperationStats{Duration: 0.1},
},
}
result2 := &swarmingv2.TaskResultResponse{
CurrentTaskSlice: 1,
Duration: 1,
ExitCode: -1,
State: swarmingv2.TaskState_COMPLETED,
PerformanceStats: &swarmingv2.PerformanceStats{
BotOverhead: 0.1,
CacheTrim: &swarmingv2.OperationStats{},
Cleanup: &swarmingv2.OperationStats{},
IsolatedDownload: &swarmingv2.CASOperationStats{},
IsolatedUpload: &swarmingv2.CASOperationStats{},
NamedCachesInstall: &swarmingv2.OperationStats{},
NamedCachesUninstall: &swarmingv2.OperationStats{},
PackageInstallation: &swarmingv2.OperationStats{},
},
}
result3 := &swarmingv2.TaskResultResponse{
CurrentTaskSlice: 0,
Duration: 1,
ExitCode: -1,
State: swarmingv2.TaskState_KILLED,
PerformanceStats: &swarmingv2.PerformanceStats{
BotOverhead: 0.1,
CacheTrim: &swarmingv2.OperationStats{Duration: 0.1},
Cleanup: &swarmingv2.OperationStats{Duration: 0.1},
IsolatedDownload: &swarmingv2.CASOperationStats{
Duration: 0.1,
InitialNumberItems: 0,
InitialSize: 0,
ItemsCold: []byte(""),
ItemsHot: []byte("download_hot"),
NumItemsCold: 0,
NumItemsHot: 1,
TotalBytesItemsCold: 0,
TotalBytesItemsHot: 1,
},
IsolatedUpload: &swarmingv2.CASOperationStats{},
NamedCachesInstall: &swarmingv2.OperationStats{Duration: 0.1},
NamedCachesUninstall: &swarmingv2.OperationStats{},
PackageInstallation: &swarmingv2.OperationStats{Duration: 0.1},
},
}
result4 := &swarmingv2.TaskResultResponse{
CurrentTaskSlice: 0,
Duration: 1,
State: swarmingv2.TaskState_RUNNING,
}
tmpDir := t.TempDir()
emitted := passThroughEmitter(func(sink *output.Sink) summaryEmitter {
return &defaultSummaryEmitter{
sink: sink,
populateStdout: true,
}
}, []*taskResult{
{
taskID: "task1",
result: result1,
output: fakeTextOutput(tmpDir, "Output"),
},
{
taskID: "task2",
result: result2,
output: fakeTextOutput(tmpDir, "Output"),
},
{
taskID: "task3",
result: result3,
output: fakeTextOutput(tmpDir, "Output"),
},
{
taskID: "task4",
result: result4,
output: fakeTextOutput(tmpDir, "Output"),
},
})
So(emitted, ShouldEqual, `{
"task1": {
"output": "Output",
"results": {
"duration": 1,
"state": "COMPLETED",
"performance_stats": {
"bot_overhead": 0.1,
"isolated_download": {
"duration": 0.1,
"items_hot": "ZG93bmxvYWRfaG90",
"num_items_hot": "1",
"total_bytes_items_hot": "1"
},
"isolated_upload": {
"duration": 0.1,
"items_hot": "dXBsb2FkX2hvdA==",
"num_items_hot": "1",
"total_bytes_items_hot": "1"
},
"package_installation": {
"duration": 0.1
},
"cache_trim": {
"duration": 0.1
},
"named_caches_install": {
"duration": 0.1
},
"named_caches_uninstall": {
"duration": 0.1
},
"cleanup": {
"duration": 0.1
}
}
}
},
"task2": {
"output": "Output",
"results": {
"duration": 1,
"exit_code": "-1",
"state": "COMPLETED",
"performance_stats": {
"bot_overhead": 0.1,
"isolated_download": {},
"isolated_upload": {},
"package_installation": {},
"cache_trim": {},
"named_caches_install": {},
"named_caches_uninstall": {},
"cleanup": {}
},
"current_task_slice": 1
}
},
"task3": {
"output": "Output",
"results": {
"duration": 1,
"exit_code": "-1",
"state": "KILLED",
"performance_stats": {
"bot_overhead": 0.1,
"isolated_download": {
"duration": 0.1,
"items_hot": "ZG93bmxvYWRfaG90",
"num_items_hot": "1",
"total_bytes_items_hot": "1"
},
"isolated_upload": {},
"package_installation": {
"duration": 0.1
},
"cache_trim": {
"duration": 0.1
},
"named_caches_install": {
"duration": 0.1
},
"named_caches_uninstall": {},
"cleanup": {
"duration": 0.1
}
}
}
},
"task4": {
"output": "Output",
"results": {
"duration": 1,
"state": "RUNNING"
}
}
}
`)
})
}
func TestCollectSummarizeResultsPython(t *testing.T) {
t.Parallel()
Convey(`Simple json.`, t, func() {
tmpDir := t.TempDir()
emitted := passThroughEmitter(func(sink *output.Sink) summaryEmitter {
return &legacySummaryEmitter{
sink: sink,
populateStdout: true,
taskIDs: []string{"failed1", "finished", "failed2"},
resultByID: map[string]*taskResult{},
}
}, []*taskResult{
{
taskID: "finished",
result: &swarmingv2.TaskResultResponse{
State: swarmingv2.TaskState_COMPLETED,
Duration: 1,
ExitCode: 0,
},
output: fakeTextOutput(tmpDir, "Output"),
},
{
taskID: "failed1",
err: errors.New("boom"),
},
{
taskID: "failed2",
err: errors.New("boom"),
},
})
So(emitted, ShouldEqual, `{
"shards": [
null,
{
"duration": 1,
"output": "Output",
"state": "COMPLETED"
},
null
]
}
`)
})
}
func sortJSON(s string) string {
var top map[string]json.RawMessage
if err := json.Unmarshal([]byte(s), &top); err != nil {
panic(err)
}
blob, err := json.MarshalIndent(top, "", " ")
if err != nil {
panic(err)
}
return string(blob) + "\n"
}
func passThroughEmitter(emitter func(sink *output.Sink) summaryEmitter, results []*taskResult) string {
var merr errors.MultiError
var buf bytes.Buffer
sink := output.NewSink(&buf)
em := emitter(sink)
em.start(&merr)
for _, res := range results {
em.emit(res, &merr)
}
em.finish(&merr)
So(merr, ShouldBeNil)
So(sink.Finalize(), ShouldBeNil)
return buf.String()
}
func fakeTextOutput(tmpDir string, text string) *textOutput {
file, err := os.CreateTemp(tmpDir, "swarming_collect_test_*.txt")
So(err, ShouldBeNil)
_, err = file.WriteString(text)
So(err, ShouldBeNil)
return &textOutput{
file: file,
temp: true,
}
}