blob: 64013c65426facde7faddc57d3c129b4204b42ab [file] [log] [blame]
// Copyright 2021 The LUCI Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package processing
import (
"bytes"
"context"
"io"
"strings"
"testing"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
api "go.chromium.org/luci/cipd/api/cipd/v1"
"go.chromium.org/luci/cipd/appengine/impl/gs"
"go.chromium.org/luci/cipd/appengine/impl/testutil"
"go.chromium.org/luci/cipd/common"
"go.chromium.org/luci/common/retry/transient"
. "github.com/smartystreets/goconvey/convey"
. "go.chromium.org/luci/common/testing/assertions"
)
// TestExtractor exercises Extractor.Run against a mocked CAS service and an
// in-memory upload sink.
//
// The outer "With mocks" block declares the shared fixtures: a package with a
// single "test/file" entry, a testutil.MockCAS whose callbacks record what they
// were asked to do, and a trackingWriter that captures the uploaded bytes along
// with the size of every chunk written to it.
func TestExtractor(t *testing.T) {
	t.Parallel()

	ctx := context.Background()

	Convey("With mocks", t, func() {
		const (
			filePath  = "test/file"
			chunkSize = 64 * 1024
		)

		// 8 * 8960 bytes: one full 64 KiB chunk followed by a 6 KiB tail.
		body := strings.Repeat("01234567", 8960)

		// State recorded by the CAS mock callbacks below.
		var (
			forcedRef      *api.ObjectRef
			uploadCanceled bool
		)
		wantAlgo := api.HashAlgo_SHA256

		mockCAS := &testutil.MockCAS{
			BeginUploadImpl: func(_ context.Context, req *api.BeginUploadRequest) (*api.UploadOperation, error) {
				So(req.HashAlgo, ShouldEqual, wantAlgo)
				op := &api.UploadOperation{
					OperationId: "op_id",
					UploadUrl:   "http://example.com/upload",
				}
				return op, nil
			},
			FinishUploadImpl: func(_ context.Context, req *api.FinishUploadRequest) (*api.UploadOperation, error) {
				So(req.UploadOperationId, ShouldEqual, "op_id")
				forcedRef = req.ForceHash
				return &api.UploadOperation{Status: api.UploadStatus_PUBLISHED}, nil
			},
			CancelUploadImpl: func(_ context.Context, req *api.CancelUploadRequest) (*api.UploadOperation, error) {
				So(req.UploadOperationId, ShouldEqual, "op_id")
				uploadCanceled = true
				return &api.UploadOperation{Status: api.UploadStatus_CANCELED}, nil
			},
		}

		// Everything the extractor uploads ends up in `uploaded`; `sink` also
		// remembers how large each individual write was.
		var uploaded bytes.Buffer
		sink := &trackingWriter{w: &uploaded}

		extractor := Extractor{
			Reader:            packageReader(map[string]string{filePath: body}),
			CAS:               mockCAS,
			PrimaryHash:       api.HashAlgo_SHA256,
			AlternativeHashes: []api.HashAlgo{api.HashAlgo_SHA1},
			Uploader: func(context.Context, int64, string) io.Writer {
				return sink
			},
			BufferSize: chunkSize,
		}

		// sha256Ref builds the object ref expected for the test file body.
		sha256Ref := func() *api.ObjectRef {
			return &api.ObjectRef{
				HashAlgo:  api.HashAlgo_SHA256,
				HexDigest: hexDigest(api.HashAlgo_SHA256, body),
			}
		}

		Convey("Happy path, SHA256", func() {
			res, err := extractor.Run(ctx, filePath)
			So(err, ShouldBeNil)

			// The returned result describes the extracted file.
			So(res.Path, ShouldEqual, filePath)
			So(res.Ref, ShouldResembleProto, sha256Ref())
			So(res.Size, ShouldEqual, len(body))
			So(res.Hashes, ShouldHaveLength, 2)
			for _, algo := range []api.HashAlgo{api.HashAlgo_SHA1, api.HashAlgo_SHA256} {
				So(common.HexDigest(res.Hashes[algo]), ShouldEqual, hexDigest(algo, body))
			}

			// The uploaded bytes and the published ref match the file.
			So(uploaded.String(), ShouldEqual, body)
			So(forcedRef, ShouldResembleProto, sha256Ref())

			// Writes arrive in 64 KiB chunks, NOT the 32 KiB used by zip.Reader.
			So(sink.calls, ShouldResemble, []int{chunkSize, 6 * 1024})
		})

		Convey("No such file in the package", func() {
			_, err := extractor.Run(ctx, "unknown")
			So(err, ShouldErrLike, `failed to open the file for reading: no file "unknown" inside the package`)
			So(transient.Tag.In(err), ShouldBeFalse)
		})

		Convey("Internal error when initiating the upload", func() {
			mockCAS.BeginUploadImpl = func(context.Context, *api.BeginUploadRequest) (*api.UploadOperation, error) {
				return nil, status.Errorf(codes.Internal, "boo")
			}

			_, err := extractor.Run(ctx, filePath)
			So(err, ShouldErrLike, `failed to open a CAS upload: rpc error: code = Internal desc = boo`)
			So(transient.Tag.In(err), ShouldBeTrue)
		})

		Convey("Internal error when finalizing the upload", func() {
			mockCAS.FinishUploadImpl = func(context.Context, *api.FinishUploadRequest) (*api.UploadOperation, error) {
				return nil, status.Errorf(codes.Internal, "boo")
			}

			_, err := extractor.Run(ctx, filePath)
			So(err, ShouldErrLike, `failed to finalize the CAS upload: rpc error: code = Internal desc = boo`)
			So(transient.Tag.In(err), ShouldBeTrue)
		})

		Convey("Asked to restart the upload", func() {
			// Make every write to the sink fail with a restart request.
			sink.err = &gs.RestartUploadError{Offset: 124}

			_, err := extractor.Run(ctx, filePath)
			So(err, ShouldErrLike, `asked to restart the upload from faraway offset: the upload should be restarted from offset 124`)
			So(transient.Tag.In(err), ShouldBeTrue)
			So(uploadCanceled, ShouldBeTrue)
		})
	})
}
// packageReader builds an in-memory zip archive from data with
// testutil.MakeZip and opens it with NewPackageReader.
//
// data is presumably a map from file path inside the package to file body
// (that is how the tests in this file use it).
//
// It panics if NewPackageReader fails. This is a test-only helper, and a
// loud failure here is easier to diagnose than a nil *PackageReader being
// dereferenced later inside the code under test.
func packageReader(data map[string]string) *PackageReader {
	buf := bytes.NewReader(testutil.MakeZip(data))
	r, err := NewPackageReader(buf, int64(buf.Len()))
	if err != nil {
		panic("packageReader: failed to open the test package: " + err.Error())
	}
	return r
}
// trackingWriter is an io.Writer used by tests: it forwards data to w while
// recording the size of every chunk it was asked to write, and can be told to
// fail instead.
type trackingWriter struct {
	w     io.Writer // destination the chunks are forwarded to
	calls []int     // len(p) of each forwarded Write call, in call order
	err   error     // if non-nil, Write returns it without forwarding
}

// Write implements io.Writer.
//
// If the err field is set, it returns (0, err) and neither records the call
// nor touches the underlying writer. Otherwise it records len(p) and returns
// whatever the underlying writer returns.
func (w *trackingWriter) Write(p []byte) (int, error) {
	if injected := w.err; injected != nil {
		return 0, injected
	}
	// Record the requested chunk size before forwarding, regardless of how
	// much the underlying writer ends up accepting.
	chunk := len(p)
	w.calls = append(w.calls, chunk)
	written, err := w.w.Write(p)
	return written, err
}