blob: 23aaa1d16d6605daee057f09f4246f92a6047061 [file] [log] [blame]
// Copyright 2017 The LUCI Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package archiver
import (
"bytes"
"crypto"
"encoding/json"
"fmt"
"io"
"log"
"os"
"path/filepath"
"sync"
humanize "github.com/dustin/go-humanize"
"go.chromium.org/luci/common/isolated"
"go.chromium.org/luci/common/isolatedclient"
"go.chromium.org/luci/common/system/filesystem"
)
const (
	// archiveMaxSize is the maximum size of the created archives.
	// 10e6 is an untyped constant equal to 10,000,000 (~10 MB); it bounds
	// the tar bundles produced by shardItems in tarAndUploadFiles.
	archiveMaxSize = 10e6
)
// limitedOS contains a subset of the functions from the os package.
// It exists as a test seam: production code uses standardOS, while tests
// can substitute a fake (see the lOS field on UploadTracker).
type limitedOS interface {
	// Readlink is like os.Readlink: it returns the destination of the
	// named symbolic link.
	Readlink(string) (string, error)
	// Open is like os.Open, but returns an io.ReadCloser since
	// that's all we need and it's easier to implement with a fake.
	Open(string) (io.ReadCloser, error)
	openFiler
}
// openFiler is the file-creation subset of limitedOS, split out so that
// code which only creates files (e.g. writeJSONFile) can accept just the
// capability it needs.
type openFiler interface {
	// OpenFile is like os.OpenFile, but returns an io.WriteCloser since
	// that's all we need and it's easier to implement with a fake.
	OpenFile(string, int, os.FileMode) (io.WriteCloser, error)
}
// standardOS implements limitedOS by delegating to the standard library's os package.
type standardOS struct{}
func (sos standardOS) Readlink(name string) (string, error) {
return os.Readlink(name)
}
func (sos standardOS) Open(name string) (io.ReadCloser, error) {
return os.Open(name)
}
func (sos standardOS) OpenFile(name string, flag int, perm os.FileMode) (io.WriteCloser, error) {
return os.OpenFile(name, flag, perm)
}
// hashResult stores the result of an isolated.Hash call, so that the digest
// and any error can be memoized together as a single value in a sync.Map.
type hashResult struct {
	digest isolated.HexDigest
	err    error
}
// UploadTracker uploads and keeps track of files.
//
// Files are registered with checker, queued on uploader when the server
// does not already have them, and recorded in isol.Files as they are
// processed (see UploadDeps and Finalize).
type UploadTracker struct {
	checker  Checker
	uploader Uploader
	isol     *isolated.Isolated
	// A cache of file hashes, to speed up future requests for a hash of the same file.
	// Keys are file paths (string); values are hashResult.
	fileHashCache *sync.Map
	// Override for testing.
	lOS            limitedOS
	doHashFileImpl func(*UploadTracker, string) (isolated.HexDigest, error)
}
// newUploadTracker constructs an UploadTracker. It tracks uploaded files in
// isol.Files; any entries already present in isol.Files are discarded.
func newUploadTracker(checker Checker, uploader Uploader, isol *isolated.Isolated, fileHashCache *sync.Map) *UploadTracker {
	// Start from an empty file map; UploadDeps and Finalize populate it.
	isol.Files = map[string]isolated.File{}
	ut := &UploadTracker{
		checker:        checker,
		uploader:       uploader,
		isol:           isol,
		fileHashCache:  fileHashCache,
		lOS:            standardOS{},
		doHashFileImpl: doHashFile,
	}
	return ut
}
// UploadDeps uploads all of the items in parts: symlinks are recorded in
// the isolated file map, small files are bundled into tar archives, and
// the remaining files are uploaded individually. Processing stops at the
// first error, which is returned.
func (ut *UploadTracker) UploadDeps(parts partitionedDeps) error {
	steps := []func() error{
		func() error { return ut.populateSymlinks(parts.links.items) },
		func() error { return ut.tarAndUploadFiles(parts.filesToArchive.items) },
		func() error { return ut.uploadFiles(parts.indivFiles.items) },
	}
	for _, step := range steps {
		if err := step(); err != nil {
			return err
		}
	}
	return nil
}
// Files returns the files which have been uploaded.
// Note: files may not have completed uploading until the tracker's Checker and
// Uploader have been closed.
//
// The returned map is the tracker's internal map, not a copy; subsequent
// UploadDeps/Finalize calls mutate it.
func (ut *UploadTracker) Files() map[string]isolated.File {
	return ut.isol.Files
}
// populateSymlinks adds an isolated.File to the tracked file map for each
// provided symlink, recording the link's destination. It returns an error
// if any link cannot be resolved.
func (ut *UploadTracker) populateSymlinks(symlinks []*Item) error {
	for _, link := range symlinks {
		target, err := ut.lOS.Readlink(link.Path)
		if err != nil {
			return fmt.Errorf("unable to resolve symlink for %q: %v", link.Path, err)
		}
		ut.isol.Files[link.RelPath] = isolated.SymLink(target)
	}
	return nil
}
// tarAndUploadFiles creates bundles of files, uploads them, and adds each bundle to files.
func (ut *UploadTracker) tarAndUploadFiles(smallFiles []*Item) error {
	// Partition the small files into bundles bounded by archiveMaxSize.
	bundles := shardItems(smallFiles, archiveMaxSize)
	log.Printf("\t%d TAR archives to be isolated", len(bundles))
	for _, bundle := range bundles {
		// TODO(crbug/854610): Do not create a tarfile when it contains a low
		// number of files, it should upload the file(s) directly instead.
		// Per-iteration copy: bundle is captured by the AddItem callback and
		// the Upload closure below, which may run after the loop advances
		// (pre-Go 1.22 loop-variable semantics).
		bundle := bundle
		// Hash the bundle's tar contents to obtain its digest and size.
		digest, tarSize, err := bundle.Digest(ut.checker.Hash())
		if err != nil {
			return err
		}
		log.Printf("Created tar archive %q (%s)", digest, humanize.Bytes(uint64(tarSize)))
		log.Printf("\tcontains %d files (total %s)", len(bundle.items), humanize.Bytes(uint64(bundle.itemSize)))
		// Mint an item for this tar. The archive exists only in memory, so
		// its Path/RelPath are synthetic names derived from the digest.
		item := &Item{
			Path:    fmt.Sprintf(".%s.tar", digest),
			RelPath: fmt.Sprintf(".%s.tar", digest),
			Size:    tarSize,
			Digest:  digest,
		}
		ut.isol.Files[item.RelPath] = isolated.TarFile(item.Digest, item.Size)
		ut.checker.AddItem(item, false, func(item *Item, ps *isolatedclient.PushState) {
			// A nil PushState means no upload is needed for this item
			// (mirrors the handling in uploadFiles and Finalize).
			if ps == nil {
				return
			}
			// We are about to upload item. If we fail, the entire isolate operation will fail.
			// If we succeed, then item will be on the server by the time that the isolate operation
			// completes. So it is safe for subsequent checks to assume that the item exists.
			ut.checker.PresumeExists(item)
			log.Printf("QUEUED %q for upload", item.RelPath)
			ut.uploader.Upload(item.RelPath, bundle.Contents, ps, func() {
				log.Printf("UPLOADED %q", item.RelPath)
			})
		})
	}
	return nil
}
// uploadFiles hashes each of the given files, records it in the isolated
// file map, and queues it for upload when the server does not already have
// it. It returns the first hashing error encountered, if any.
func (ut *UploadTracker) uploadFiles(files []*Item) error {
	// Handle the large individually-uploaded files.
	for _, file := range files {
		digest, err := ut.hashFile(file.Path)
		if err != nil {
			return err
		}
		file.Digest = digest
		ut.isol.Files[file.RelPath] = isolated.BasicFile(file.Digest, int(file.Mode), file.Size)
		ut.checker.AddItem(file, false, func(item *Item, ps *isolatedclient.PushState) {
			// A nil PushState means no upload is needed for this item.
			if ps == nil {
				return
			}
			// We are about to upload item. If we fail, the entire isolate
			// operation will fail; if we succeed, item will be on the server
			// by the time the operation completes. Either way it is safe for
			// subsequent checks to assume that the item exists.
			ut.checker.PresumeExists(item)
			log.Printf("QUEUED %q for upload", item.RelPath)
			ut.uploader.UploadFile(item, ps, func() {
				log.Printf("UPLOADED %q", item.RelPath)
			})
		})
	}
	return nil
}
// IsolatedSummary contains an isolate name and its digest.
type IsolatedSummary struct {
	// Name is the base name of an isolated file with any extension stripped.
	Name string
	// Digest is the hash of the isolated file's JSON contents.
	Digest isolated.HexDigest
}
// Finalize creates and uploads the isolate JSON, and writes it to
// isolatedPath on the local filesystem when isolatedPath is non-empty.
// It returns the isolate name and digest.
//
// Finalize should only be called after UploadDeps. Note that it does NOT
// close the checker or uploader; the caller remains responsible for that
// (the original doc comment claimed otherwise).
func (ut *UploadTracker) Finalize(isolatedPath string) (IsolatedSummary, error) {
	isolFile, err := newIsolatedFile(ut.isol, isolatedPath)
	if err != nil {
		return IsolatedSummary{}, err
	}

	// Build the Item (and its digest) once. The original called
	// isolFile.item() twice, hashing the JSON bytes a second time just to
	// read the digest for the returned summary.
	item := isolFile.item(ut.checker.Hash())

	// Check and upload isolate JSON.
	ut.checker.AddItem(item, true, func(item *Item, ps *isolatedclient.PushState) {
		// A nil PushState means no upload is needed.
		if ps == nil {
			return
		}
		log.Printf("QUEUED %q for upload", item.RelPath)
		ut.uploader.UploadBytes(item.RelPath, isolFile.contents(), ps, func() {
			log.Printf("UPLOADED %q", item.RelPath)
		})
	})

	// Write the isolated file to disk if a destination path was provided.
	if isolFile.path != "" {
		if err := isolFile.writeJSONFile(ut.lOS); err != nil {
			return IsolatedSummary{}, err
		}
	}
	return IsolatedSummary{
		Name:   isolFile.name(),
		Digest: item.Digest,
	}, nil
}
// hashFile returns the hash of the contents of path, memoizing its results.
// Both successful digests and errors are cached, so a failing path is not
// re-hashed on subsequent calls.
func (ut *UploadTracker) hashFile(path string) (isolated.HexDigest, error) {
	if cached, ok := ut.fileHashCache.Load(path); ok {
		res := cached.(hashResult)
		return res.digest, res.err
	}
	digest, err := ut.doHashFileImpl(ut, path)
	ut.fileHashCache.Store(path, hashResult{digest: digest, err: err})
	return digest, err
}
// doHashFile returns the hash of the contents of path. This should not be
// called directly; call hashFile instead, which memoizes the result.
func doHashFile(ut *UploadTracker, path string) (isolated.HexDigest, error) {
	file, err := ut.lOS.Open(path)
	if err != nil {
		return "", err
	}
	defer file.Close()
	// Stream the file through the checker's hash function.
	return isolated.Hash(ut.checker.Hash(), file)
}
// isolatedFile is an isolated file which is stored in memory.
// It can produce a corresponding Item, for upload to the server,
// and also write its contents to the filesystem.
type isolatedFile struct {
	// json holds the marshalled isolated.Isolated contents.
	json []byte
	// path is the destination on disk; may be empty when the file is
	// never written locally (see Finalize).
	path string
}
// newIsolatedFile marshals isol to JSON and wraps it, together with its
// destination path, in an isolatedFile. It returns a nil *isolatedFile
// when marshalling fails (the original returned a partially-populated
// &isolatedFile{} alongside the error, contrary to Go convention that the
// value is meaningless on a non-nil error).
func newIsolatedFile(isol *isolated.Isolated, path string) (*isolatedFile, error) {
	j, err := json.Marshal(isol)
	if err != nil {
		return nil, err
	}
	return &isolatedFile{json: j, path: path}, nil
}
// item creates an *Item to represent the isolated JSON file, with its
// digest computed from the marshalled bytes using hash h.
func (ij *isolatedFile) item(h crypto.Hash) *Item {
	base := filepath.Base(ij.path)
	return &Item{
		Path:    ij.path,
		RelPath: base,
		Size:    int64(len(ij.json)),
		Digest:  isolated.HashBytes(h, ij.json),
	}
}
// contents returns the marshalled JSON bytes of the isolated file.
// The returned slice aliases ij.json and must not be mutated.
func (ij *isolatedFile) contents() []byte {
	return ij.json
}
// writeJSONFile writes the marshalled JSON to ij.path, creating the file
// if needed and truncating any existing contents. A write error takes
// precedence over a close error, but Close is always attempted.
func (ij *isolatedFile) writeJSONFile(opener openFiler) error {
	f, err := opener.OpenFile(ij.path, os.O_RDWR|os.O_CREATE|os.O_TRUNC, 0644)
	if err != nil {
		return err
	}
	_, writeErr := f.Write(ij.json)
	closeErr := f.Close()
	if writeErr != nil {
		return writeErr
	}
	return closeErr
}
// name returns the base name of the isolated file, extension stripped,
// by delegating to filesystem.GetFilenameNoExt.
func (ij *isolatedFile) name() string {
	return filesystem.GetFilenameNoExt(ij.path)
}