blob: d78eb53821d3854fcd24ba3346fa8aacab2f8db4 [file] [log] [blame]
// Copyright 2015 The LUCI Authors. All rights reserved.
// Use of this source code is governed under the Apache License, Version 2.0
// that can be found in the LICENSE file.
package main
import (
"encoding/json"
"errors"
"fmt"
"io/ioutil"
"log"
"os"
"path/filepath"
"strings"
"time"
humanize "github.com/dustin/go-humanize"
"github.com/golang/protobuf/proto"
"github.com/maruel/subcommands"
"golang.org/x/net/context"
"github.com/luci/luci-go/client/isolate"
"github.com/luci/luci-go/common/auth"
logpb "github.com/luci/luci-go/common/eventlog/proto"
"github.com/luci/luci-go/common/isolated"
"github.com/luci/luci-go/common/isolatedclient"
)
const (
	// archiveThreshold is the size (in bytes) used to determine whether to add
	// files to a tar archive before uploading. Files smaller than this size will
	// be combined into archives before being uploaded to the server.
	archiveThreshold = 100e3 // 100kB

	// archiveMaxSize is the maximum size of the created archives.
	archiveMaxSize = 10e6 // 10MB

	// infraFailExit is the exit code used when the exparchive fails due to
	// infrastructure errors (for example, failed server requests).
	infraFailExit = 2
)
// cmdExpArchive returns the definition of the experimental "exparchive"
// subcommand, wiring up its flags and the factory for its CommandRun.
func cmdExpArchive(defaultAuthOpts auth.Options) *subcommands.Command {
	// makeRun builds a fresh run state with all flag sets registered.
	makeRun := func() subcommands.CommandRun {
		run := &expArchiveRun{}
		run.commonServerFlags.Init(defaultAuthOpts)
		run.isolateFlags.Init(&run.Flags)
		run.loggingFlags.Init(&run.Flags)
		run.Flags.StringVar(&run.dumpJSON, "dump-json", "",
			"Write isolated digests of archived trees to this file as JSON")
		return run
	}
	return &subcommands.Command{
		UsageLine:  "exparchive <options>",
		ShortDesc:  "EXPERIMENTAL parses a .isolate file to create a .isolated file, and uploads it and all referenced files to an isolate server",
		LongDesc:   "All the files listed in the .isolated file are put in the isolate server cache. Small files are combined together in a tar archive before uploading.",
		CommandRun: makeRun,
	}
}
// expArchiveRun contains the logic for the experimental archive subcommand.
// It implements subcommand.CommandRun
type expArchiveRun struct {
	commonServerFlags              // Provides the GetFlags method.
	isolateFlags      isolateFlags // -isolate/-isolated and related archive options.
	loggingFlags      loggingFlags // Eventlog endpoint configuration.
	dumpJSON          string       // If non-empty, path to write {name: digest} JSON to.
}
// Item represents a file or symlink referenced by an isolate file.
type Item struct {
	Path    string             // Path used to read the item's contents from disk.
	RelPath string             // Path relative to the isolate root; used as the key in the .isolated file map.
	Size    int64              // Size of the item in bytes.
	Mode    os.FileMode        // File mode (includes the symlink bit for links).
	Digest  isolated.HexDigest // Hash of the contents; populated once computed.
}
// main contains the core logic for experimental archive.
//
// It proceeds in phases: parse the .isolate file; walk every dep and
// partition the entries into symlinks, small files (to be tar-bundled) and
// large files (uploaded individually); feed everything through the
// checker+uploader pipeline; then write the resulting .isolated file locally
// and optionally dump its digest as JSON.
func (c *expArchiveRun) main() error {
	// TODO(djd): This func is long and has a lot of internal complexity (like,
	// such as, archiveCallback). Refactor.
	start := time.Now()
	archiveOpts := &c.isolateFlags.ArchiveOptions
	// Parse the incoming isolate file.
	deps, rootDir, isol, err := isolate.ProcessIsolate(archiveOpts)
	if err != nil {
		return fmt.Errorf("failed to process isolate: %v", err)
	}
	log.Printf("Isolate referenced %d deps", len(deps))

	// Set up a background context which is cancelled when this function returns.
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	// Create the isolated client which connects to the isolate server.
	authCl, err := c.createAuthClient()
	if err != nil {
		return err
	}
	client := isolatedclient.New(nil, authCl, c.isolatedFlags.ServerURL, c.isolatedFlags.Namespace, nil, nil)

	// Set up a checker and uploader. We limit the uploader to one concurrent
	// upload, since the uploads are all coming from disk (with the exception of
	// the isolated JSON itself) and we only want a single goroutine reading from
	// disk at once.
	checker := NewChecker(ctx, client)
	uploader := NewUploader(ctx, client, 1)

	// Walk each of the deps, partitioning the results into symlinks and files
	// categorised by size.
	var links, archiveFiles, indivFiles []*Item
	var archiveSize, indivSize int64 // Cumulative size of archived/individual files.
	for _, dep := range deps {
		// Try to walk dep. If dep is a file (or symlink), the inner function is called exactly once.
		err := filepath.Walk(filepath.Clean(dep), func(path string, info os.FileInfo, err error) error {
			if err != nil {
				return err
			}
			// Directories themselves are not recorded; only their contents are.
			if info.IsDir() {
				return nil
			}
			// Entries are keyed by path relative to the isolate root.
			relPath, err := filepath.Rel(rootDir, path)
			if err != nil {
				return err
			}
			item := &Item{
				Path:    path,
				RelPath: relPath,
				Mode:    info.Mode(),
				Size:    info.Size(),
			}
			switch {
			case item.Mode&os.ModeSymlink == os.ModeSymlink:
				links = append(links, item)
			case item.Size < archiveThreshold:
				// Small file: batched into a tar archive before upload.
				archiveFiles = append(archiveFiles, item)
				archiveSize += item.Size
			default:
				// Large file: uploaded individually.
				indivFiles = append(indivFiles, item)
				indivSize += item.Size
			}
			return nil
		})
		if err != nil {
			return err
		}
	}

	// Construct a map of the files that constitute the isolate.
	files := make(map[string]isolated.File)

	log.Printf("Isolate expanded to %d files (total size %s) and %d symlinks", len(archiveFiles)+len(indivFiles), humanize.Bytes(uint64(archiveSize+indivSize)), len(links))
	log.Printf("\t%d files (%s) to be isolated individually", len(indivFiles), humanize.Bytes(uint64(indivSize)))
	log.Printf("\t%d files (%s) to be isolated in archives", len(archiveFiles), humanize.Bytes(uint64(archiveSize)))

	// Handle the symlinks. Link targets are recorded directly in the isolated
	// file, so no upload is needed for them.
	for _, item := range links {
		l, err := os.Readlink(item.Path)
		if err != nil {
			return fmt.Errorf("unable to resolve symlink for %q: %v", item.Path, err)
		}
		files[item.RelPath] = isolated.SymLink(l)
	}

	// Handle the small to-be-archived files.
	bundles := ShardItems(archiveFiles, archiveMaxSize)
	log.Printf("\t%d TAR archives to be isolated", len(bundles))

	for _, bundle := range bundles {
		bundle := bundle // Rebind so the upload closure below captures this iteration's bundle.
		digest, tarSize, err := bundle.Digest()
		if err != nil {
			return err
		}

		log.Printf("Created tar archive %q (%s)", digest, humanize.Bytes(uint64(tarSize)))
		log.Printf("\tcontains %d files (total %s)", len(bundle.Items), humanize.Bytes(uint64(bundle.ItemSize)))
		// Mint an item for this tar.
		item := &Item{
			Path:    fmt.Sprintf(".%s.tar", digest),
			RelPath: fmt.Sprintf(".%s.tar", digest),
			Size:    tarSize,
			Mode:    0644, // rw-r--r--: the synthetic tar is a plain, non-executable file.
			Digest:  digest,
		}
		files[item.RelPath] = isolated.TarFile(item.Digest, int(item.Mode), item.Size)

		// NOTE(review): a nil PushState appears to mean no upload is required
		// (presumably the server already has the item) — see Checker.AddItem.
		checker.AddItem(item, false, func(item *Item, ps *isolatedclient.PushState) {
			if ps == nil {
				return
			}
			log.Printf("QUEUED %q for upload", item.RelPath)
			uploader.Upload(item.RelPath, bundle.Contents, ps, func() {
				log.Printf("UPLOADED %q", item.RelPath)
			})
		})
	}

	// Handle the large individually-uploaded files.
	for _, item := range indivFiles {
		d, err := hashFile(item.Path)
		if err != nil {
			return err
		}
		item.Digest = d
		files[item.RelPath] = isolated.BasicFile(item.Digest, int(item.Mode), item.Size)
		checker.AddItem(item, false, func(item *Item, ps *isolatedclient.PushState) {
			if ps == nil {
				return
			}
			log.Printf("QUEUED %q for upload", item.RelPath)
			uploader.UploadFile(item, ps, func() {
				log.Printf("UPLOADED %q", item.RelPath)
			})
		})
	}

	// Marshal the isolated file into JSON, and create an Item to describe it.
	isol.Files = files
	isolJSON, err := json.Marshal(isol)
	if err != nil {
		return err
	}
	isolItem := &Item{
		Path:    archiveOpts.Isolated,
		RelPath: filepath.Base(archiveOpts.Isolated),
		Digest:  isolated.HashBytes(isolJSON),
		Size:    int64(len(isolJSON)),
	}

	// Check and upload isolate JSON. The second argument is true here, unlike
	// the calls above; see Checker.AddItem for its exact meaning.
	checker.AddItem(isolItem, true, func(item *Item, ps *isolatedclient.PushState) {
		if ps == nil {
			return
		}
		log.Printf("QUEUED %q for upload", item.RelPath)
		uploader.UploadBytes(item.RelPath, isolJSON, ps, func() {
			log.Printf("UPLOADED %q", item.RelPath)
		})
	})

	// Make sure that all pending items have been checked.
	if err := checker.Close(); err != nil {
		return err
	}

	// Make sure that all the uploads have completed successfully.
	if err := uploader.Close(); err != nil {
		return err
	}

	// Write the isolated file, and emit its digest to stdout.
	if err := ioutil.WriteFile(archiveOpts.Isolated, isolJSON, 0644); err != nil {
		return err
	}
	fmt.Printf("%s\t%s\n", isolItem.Digest, filepath.Base(archiveOpts.Isolated))

	// Optionally, write the digest of the isolated file as JSON (in the same
	// format as batch_archive).
	if c.dumpJSON != "" {
		// The name is the base name of the isolated file, extension stripped.
		name := filepath.Base(archiveOpts.Isolated)
		if i := strings.LastIndex(name, "."); i != -1 {
			name = name[:i]
		}
		j, err := json.Marshal(map[string]isolated.HexDigest{
			name: isolItem.Digest,
		})
		if err != nil {
			return err
		}
		if err := ioutil.WriteFile(c.dumpJSON, j, 0644); err != nil {
			return err
		}
	}

	end := time.Now()

	// Report cache hit/miss statistics to the eventlog service. Logging
	// failures are deliberately non-fatal: the archive itself succeeded.
	archiveDetails := &logpb.IsolateClientEvent_ArchiveDetails{
		HitCount:    proto.Int64(int64(checker.Hit.Count)),
		MissCount:   proto.Int64(int64(checker.Miss.Count)),
		HitBytes:    &checker.Hit.Bytes,
		MissBytes:   &checker.Miss.Bytes,
		IsolateHash: []string{string(isolItem.Digest)},
	}
	eventlogger := NewLogger(ctx, c.loggingFlags.EventlogEndpoint)
	op := logpb.IsolateClientEvent_ARCHIVE.Enum()
	if err := eventlogger.logStats(ctx, op, start, end, archiveDetails); err != nil {
		log.Printf("Failed to log to eventlog: %v", err)
	}

	return nil
}
// parseFlags validates the command line: exparchive takes no positional
// arguments, and requires both the .isolate and .isolated flags to be set.
func (c *expArchiveRun) parseFlags(args []string) error {
	if len(args) != 0 {
		return errors.New("positional arguments not expected")
	}
	if err := c.commonServerFlags.Parse(); err != nil {
		return err
	}
	cwd, err := os.Getwd()
	if err != nil {
		return err
	}
	// Require both files. Note: the flags are distinct bits, so they must be
	// combined with OR; the previous AND evaluated to zero, i.e. no
	// requirement was enforced at all.
	if err := c.isolateFlags.Parse(cwd, RequireIsolateFile|RequireIsolatedFile); err != nil {
		return err
	}
	return nil
}
// Run executes the exparchive subcommand: it validates flags, rejects the
// unsupported blacklist option, and delegates the real work to main.
// It returns 0 on success and 1 on any failure.
func (c *expArchiveRun) Run(a subcommands.Application, args []string, _ subcommands.Env) int {
	stderr := a.GetErr()
	fmt.Fprintln(stderr, "WARNING: this command is experimental")
	if err := c.parseFlags(args); err != nil {
		fmt.Fprintf(stderr, "%s: %s\n", a.GetName(), err)
		return 1
	}
	// Blacklists are handled by other archive paths but not implemented here.
	if len(c.isolateFlags.ArchiveOptions.Blacklist) > 0 {
		fmt.Fprintf(stderr, "%s: blacklist is not supported\n", a.GetName())
		return 1
	}
	if err := c.main(); err != nil {
		fmt.Fprintf(stderr, "%s: %s\n", a.GetName(), err)
		return 1
	}
	return 0
}
// hashFile computes the isolated digest of the file at path by streaming its
// contents through isolated.Hash. It returns an empty digest on open failure.
func hashFile(path string) (isolated.HexDigest, error) {
	file, err := os.Open(path)
	if err != nil {
		return "", err
	}
	defer file.Close()
	return isolated.Hash(file)
}