| // Copyright 2020 The LUCI Authors. |
| // |
| // Licensed under the Apache License, Version 2.0 (the "License"); |
| // you may not use this file except in compliance with the License. |
| // You may obtain a copy of the License at |
| // |
| // http://www.apache.org/licenses/LICENSE-2.0 |
| // |
| // Unless required by applicable law or agreed to in writing, software |
| // distributed under the License is distributed on an "AS IS" BASIS, |
| // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| // See the License for the specific language governing permissions and |
| // limitations under the License. |
| |
| package sink |
| |
| import ( |
| "context" |
| "io/ioutil" |
| "net/http" |
| "os" |
| "testing" |
| |
| . "github.com/smartystreets/goconvey/convey" |
| . "go.chromium.org/luci/common/testing/assertions" |
| pb "go.chromium.org/luci/resultdb/proto/v1" |
| sinkpb "go.chromium.org/luci/resultdb/sink/proto/v1" |
| ) |
| |
| func TestArtifactChannel(t *testing.T) { |
| t.Parallel() |
| |
| Convey("schedule", t, func() { |
| cfg := testServerConfig("127.0.0.1:123", "secret") |
| ctx := context.Background() |
| |
| streamCh := make(chan *http.Request, 10) |
| cfg.ArtifactStreamClient.Transport = mockTransport( |
| func(in *http.Request) (*http.Response, error) { |
| streamCh <- in |
| return &http.Response{StatusCode: http.StatusNoContent}, nil |
| }, |
| ) |
| batchCh := make(chan *pb.BatchCreateArtifactsRequest, 10) |
| cfg.Recorder.(*mockRecorder).batchCreateArtifacts = func(ctx context.Context, in *pb.BatchCreateArtifactsRequest) (*pb.BatchCreateArtifactsResponse, error) { |
| batchCh <- in |
| return nil, nil |
| } |
| createTask := func(name, content string) *uploadTask { |
| art := testArtifactWithContents([]byte(content)) |
| task, err := newUploadTask(name, art) |
| So(err, ShouldBeNil) |
| return task |
| } |
| |
| Convey("with a small artifact", func() { |
| task := createTask("invocations/inv/artifacts/art1", "content") |
| ac := newArtifactChannel(ctx, &cfg) |
| ac.schedule(task) |
| ac.closeAndDrain(ctx) |
| |
| // The artifact should have been sent to the batch channel. |
| So(<-batchCh, ShouldResembleProto, &pb.BatchCreateArtifactsRequest{ |
| Requests: []*pb.CreateArtifactRequest{{ |
| Parent: "invocations/inv", |
| Artifact: &pb.Artifact{ |
| ArtifactId: "art1", |
| ContentType: task.art.ContentType, |
| SizeBytes: int64(len("content")), |
| Contents: []byte("content"), |
| }, |
| }}, |
| }) |
| }) |
| |
| Convey("with multiple, small artifacts", func() { |
| cfg.MaxBatchableArtifactSize = 10 |
| ac := newArtifactChannel(ctx, &cfg) |
| |
| t1 := createTask("invocations/inv/artifacts/art1", "1234") |
| t2 := createTask("invocations/inv/artifacts/art2", "5678") |
| t3 := createTask("invocations/inv/artifacts/art3", "9012") |
| ac.schedule(t1) |
| ac.schedule(t2) |
| ac.schedule(t3) |
| ac.closeAndDrain(ctx) |
| |
| // The 1st request should contain the first two artifacts. |
| So(<-batchCh, ShouldResembleProto, &pb.BatchCreateArtifactsRequest{ |
| Requests: []*pb.CreateArtifactRequest{ |
| // art1 |
| { |
| Parent: "invocations/inv", |
| Artifact: &pb.Artifact{ |
| ArtifactId: "art1", |
| ContentType: t1.art.ContentType, |
| SizeBytes: int64(len("1234")), |
| Contents: []byte("1234"), |
| }, |
| }, |
| // art2 |
| { |
| Parent: "invocations/inv", |
| Artifact: &pb.Artifact{ |
| ArtifactId: "art2", |
| ContentType: t2.art.ContentType, |
| SizeBytes: int64(len("5678")), |
| Contents: []byte("5678"), |
| }, |
| }, |
| }, |
| }) |
| |
| // The 2nd request should only contain the last one. |
| So(<-batchCh, ShouldResembleProto, &pb.BatchCreateArtifactsRequest{ |
| Requests: []*pb.CreateArtifactRequest{ |
| // art3 |
| { |
| Parent: "invocations/inv", |
| Artifact: &pb.Artifact{ |
| ArtifactId: "art3", |
| ContentType: t3.art.ContentType, |
| SizeBytes: int64(len("9012")), |
| Contents: []byte("9012"), |
| }, |
| }, |
| }, |
| }) |
| }) |
| |
| Convey("with a large artifact", func() { |
| cfg.MaxBatchableArtifactSize = 10 |
| ac := newArtifactChannel(ctx, &cfg) |
| |
| t1 := createTask("invocations/inv/artifacts/art1", "content-foo-bar") |
| ac.schedule(t1) |
| ac.closeAndDrain(ctx) |
| |
| // The artifact should have been sent to the stream channel. |
| req := <-streamCh |
| So(req, ShouldNotBeNil) |
| So(req.URL.String(), ShouldEqual, |
| "https://"+cfg.ArtifactStreamHost+"/invocations/inv/artifacts/art1") |
| body, err := ioutil.ReadAll(req.Body) |
| So(err, ShouldBeNil) |
| So(body, ShouldResemble, []byte("content-foo-bar")) |
| }) |
| }) |
| |
| } |
| |
| func TestUploadTask(t *testing.T) { |
| t.Parallel() |
| |
| Convey("newUploadTask", t, func() { |
| name := "invocations/inv/artifacts/art1" |
| fArt := testArtifactWithFile(func(f *os.File) { |
| _, err := f.Write([]byte("content")) |
| So(err, ShouldBeNil) |
| }) |
| fArt.ContentType = "plain/text" |
| defer os.Remove(fArt.GetFilePath()) |
| |
| Convey("works", func() { |
| t, err := newUploadTask(name, fArt) |
| So(err, ShouldBeNil) |
| So(t, ShouldResemble, &uploadTask{art: fArt, artName: name, size: int64(len("content"))}) |
| }) |
| |
| Convey("fails", func() { |
| // stat error |
| So(os.Remove(fArt.GetFilePath()), ShouldBeNil) |
| _, err := newUploadTask(name, fArt) |
| So(err, ShouldErrLike, "querying file info") |
| |
| // is a directory |
| path, err := ioutil.TempDir("", "foo") |
| So(err, ShouldBeNil) |
| defer os.RemoveAll(path) |
| fArt.Body.(*sinkpb.Artifact_FilePath).FilePath = path |
| _, err = newUploadTask(name, fArt) |
| So(err, ShouldErrLike, "is a directory") |
| }) |
| }) |
| |
| Convey("CreateRequest", t, func() { |
| name := "invocations/inv/tests/t1/results/r1/artifacts/a1" |
| fArt := testArtifactWithFile(func(f *os.File) { |
| _, err := f.Write([]byte("content")) |
| So(err, ShouldBeNil) |
| }) |
| fArt.ContentType = "plain/text" |
| defer os.Remove(fArt.GetFilePath()) |
| ut, err := newUploadTask(name, fArt) |
| So(err, ShouldBeNil) |
| |
| Convey("works", func() { |
| req, err := ut.CreateRequest() |
| So(err, ShouldBeNil) |
| So(req, ShouldResembleProto, &pb.CreateArtifactRequest{ |
| Parent: "invocations/inv/tests/t1/results/r1", |
| Artifact: &pb.Artifact{ |
| ArtifactId: "a1", |
| ContentType: "plain/text", |
| SizeBytes: int64(len("content")), |
| Contents: []byte("content"), |
| }, |
| }) |
| }) |
| |
| Convey("fails", func() { |
| // the artifact content changed. |
| So(ioutil.WriteFile(fArt.GetFilePath(), []byte("surprise!!"), 0), ShouldBeNil) |
| _, err := ut.CreateRequest() |
| So(err, ShouldErrLike, "the size of the artifact contents changed") |
| |
| // the file no longer exists. |
| So(os.Remove(fArt.GetFilePath()), ShouldBeNil) |
| _, err = ut.CreateRequest() |
| So(err, ShouldErrLike, "open "+fArt.GetFilePath()) // no such file or directory |
| }) |
| }) |
| } |