// Copyright 2017 The LUCI Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package downloader

import (
"archive/tar"
"bytes"
"context"
"encoding/json"
"fmt"
"io"
"os"
"path/filepath"
"runtime"
"sort"
"strings"
"sync"
"time"
"github.com/dustin/go-humanize"
"golang.org/x/sync/errgroup"
"go.chromium.org/luci/client/internal/common"
"go.chromium.org/luci/common/clock"
"go.chromium.org/luci/common/data/caching/cache"
"go.chromium.org/luci/common/data/stringset"
"go.chromium.org/luci/common/errors"
"go.chromium.org/luci/common/isolated"
"go.chromium.org/luci/common/isolatedclient"
"go.chromium.org/luci/common/logging"
"go.chromium.org/luci/common/retry"
"go.chromium.org/luci/common/retry/transient"
)
// Downloader is a high level interface to an isolatedclient.Client.
//
// Downloader provides functionality to download full isolated trees.
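//
// A minimal usage sketch (hedged; "client", "hash", and "outDir" below are
// placeholder values, not provided by this package):
//
//	dl := downloader.New(ctx, client, hash, outDir, nil)
//	if err := dl.Wait(); err != nil {
//		// err is an errors.MultiError describing what failed.
//	}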
type Downloader struct {
// Immutable variables.
ctx context.Context
cancel func()
c *isolatedclient.Client
rootHash isolated.HexDigest
outputDir string
options Options
// Mutable variables.
mu sync.Mutex
started bool
finished bool
// err holds at most 10 errors so as not to consume too much memory.
err errors.MultiError
interval time.Duration
isoMap map[isolated.HexDigest]*isolated.Isolated
fileStats FileStats
lastFileStatsCb time.Time
// dirCache is a cache of known existing directories which is extended
// and read from by ensureDir.
muCache sync.RWMutex
dirCache stringset.Set
// pool is a goroutine priority pool which manages jobs to download
// isolated trees and files.
pool *common.GoroutinePriorityPool
}
// Options are some optional bits you can pass to New.
type Options struct {
// FileCallback allows you to set a callback function that will be called with
// every file name and metadata which is extracted to disk by the Downloader.
//
// This callback should execute quickly (e.g. push to channel, append to list),
// as it will partially block the process of the download.
//
// Tarball archives behave a bit differently. The callback will be called for
// individual files in the tarball, but the 'Digest' field will be empty. The
// Size and Mode fields will be populated, however. The callback will ALSO be
// called for the tarfile as a whole (but the tarfile will not actually exist
// on disk).
FileCallback func(string, *isolated.File)
// FileStatsCallback is a callback function that will be called at intervals
// with relevant statistics (see MaxFileStatsInterval).
//
// This callback should execute quickly (e.g. push to channel, append to list,
// etc.) as it will partially block the process of the download. However,
// since it's only called once every "interval" amount of time, being a bit
// slow here (e.g. doing console IO) isn't the worst.
//
// To make this callback useful even for small downloads, it is called more
// frequently at the beginning of the download and tapers off to
// MaxFileStatsInterval.
FileStatsCallback func(FileStats, time.Duration)
// MaxFileStatsInterval changes the maximum interval that the
// FileStatsCallback will be called at.
//
// At the beginning of the download the interval is 100ms, but it will ramp up
// to the provided maxInterval. If you specify a MaxFileStatsInterval
// smaller than 100ms, there will be no ramp up, just a fixed interval at the
// one you specify here.
//
// Default: 5 seconds
MaxFileStatsInterval time.Duration
// MaxConcurrentJobs is the number of parallel worker goroutines the
// downloader will have.
//
// Default: 8
MaxConcurrentJobs int
// Cache is used to save/load isolated items to/from cache.
Cache *cache.Cache
}
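
// A hedged sketch of overriding Options; the "names" slice, its mutex, and
// the client/hash/outDir values are illustrative placeholders:
//
//	var mu sync.Mutex
//	var names []string
//	opts := &downloader.Options{
//		MaxConcurrentJobs: 4,
//		FileCallback: func(name string, f *isolated.File) {
//			mu.Lock() // callbacks may run concurrently from worker goroutines
//			names = append(names, name)
//			mu.Unlock()
//		},
//	}
//	dl := downloader.New(ctx, client, hash, outDir, opts)
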
// normalizePathSeparator returns p rewritten to use the OS-native path separator.
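//
// For example, on Windows "foo/bar" becomes `foo\bar`, while on POSIX
// systems `foo\bar` becomes "foo/bar".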
func normalizePathSeparator(p string) string {
if filepath.Separator == '/' {
return strings.Replace(p, `\`, string(filepath.Separator), -1)
}
return strings.Replace(p, "/", string(filepath.Separator), -1)
}
// New returns a Downloader instance suitable for downloading one isolated tree.
//
// ctx will be used for logging and clock.
//
// The Client, hash and outputDir must be specified.
//
// If options is nil, this will use defaults as described in the Options struct.
func New(ctx context.Context, c *isolatedclient.Client, hash isolated.HexDigest, outputDir string, options *Options) *Downloader {
var opt Options
if options != nil {
opt = *options
}
if opt.MaxConcurrentJobs == 0 {
if runtime.GOARCH == "386" {
// Use a lower value on 32-bit (386) builds to avoid running out of memory.
opt.MaxConcurrentJobs = 2
} else {
opt.MaxConcurrentJobs = 8
}
}
if opt.MaxFileStatsInterval == 0 {
opt.MaxFileStatsInterval = 5 * time.Second
}
interval := 100 * time.Millisecond
if interval > opt.MaxFileStatsInterval {
interval = opt.MaxFileStatsInterval
}
ctx2, cancel := context.WithCancel(ctx)
ret := &Downloader{
ctx: ctx2,
cancel: cancel,
c: c,
options: opt,
interval: interval,
dirCache: stringset.New(0),
rootHash: hash,
outputDir: normalizePathSeparator(outputDir),
isoMap: map[isolated.HexDigest]*isolated.Isolated{},
}
return ret
}
// Start begins downloading the isolated.
func (d *Downloader) Start() {
d.mu.Lock()
defer d.mu.Unlock()
if d.started {
return
}
d.started = true
d.pool = common.NewGoroutinePriorityPool(d.ctx, d.options.MaxConcurrentJobs)
if err := d.ensureDir(d.outputDir); err != nil {
d.addError(isolatedType, "<isolated setup>", err)
d.cancel()
return
}
// Start downloading the isolated tree in the work pool.
d.scheduleIsolatedJob(d.rootHash)
}
// Wait waits for the completion of the download, and returns either `nil` if
// no errors occurred during the operation, or an `errors.MultiError` otherwise.
//
// This will Start() the Downloader, if it hasn't been started already.
//
// Calling this many times is safe (and will always return the same thing).
func (d *Downloader) Wait() error {
d.Start()
_ = d.pool.Wait()
d.updateFileStats(func(s *FileStats) bool {
ret := !d.finished
d.finished = true
return ret
})
if d.err != nil {
return d.err
}
return nil
}
// RootIsolated returns Isolated for rootHash.
func (d *Downloader) RootIsolated() (*isolated.Isolated, error) {
d.mu.Lock()
defer d.mu.Unlock()
finished := d.finished
if !finished {
return nil, errors.New(
"can only call RootIsolated on a finished Downloader")
}
if root, ok := d.isoMap[d.rootHash]; ok {
return root, nil
}
return nil, errors.New("there is no isolated for root hash")
}
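
// addError records err annotated with the download type and name; once 10
// errors have accumulated, further errors are only logged.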
func (d *Downloader) addError(ty downloadType, name string, err error) {
err = errors.Annotate(err, "%s %s", ty, name).Err()
d.mu.Lock()
defer d.mu.Unlock()
if len(d.err) >= 10 {
errors.Log(d.ctx, err)
return
}
d.err = append(d.err, err)
}
func (d *Downloader) startFile(size *int64) {
d.updateFileStats(func(s *FileStats) bool {
s.CountScheduled++
if size != nil && *size > 0 {
s.BytesScheduled += uint64(*size)
}
return false
})
}
func (d *Downloader) completeFile(name string, f *isolated.File) {
d.updateFileStats(func(s *FileStats) bool {
s.CountCompleted++
return false
})
if d.options.FileCallback != nil {
d.options.FileCallback(name, f)
}
}
func (d *Downloader) setIsolated(hash isolated.HexDigest, i isolated.Isolated) {
d.mu.Lock()
defer d.mu.Unlock()
// Omit Files: they are reported via FileCallback and would take up most of
// the space.
i.Files = nil
d.isoMap[hash] = &i
}
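
// updateFileStats runs lockedCb against the stats under d.mu and, when the
// callback returns true or the reporting interval has elapsed, invokes
// FileStatsCallback outside the lock.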
func (d *Downloader) updateFileStats(lockedCb func(*FileStats) bool) {
var stats FileStats
var span time.Duration
doCall := false
func() {
d.mu.Lock()
defer d.mu.Unlock()
maybeCall := lockedCb(&d.fileStats)
if d.options.FileStatsCallback != nil {
doCall = maybeCall
now := clock.Now(d.ctx)
span = now.Sub(d.lastFileStatsCb)
if span >= d.interval {
if d.interval < d.options.MaxFileStatsInterval {
d.interval *= 2
if d.interval > d.options.MaxFileStatsInterval {
d.interval = d.options.MaxFileStatsInterval
}
}
doCall = true
}
if doCall {
d.lastFileStatsCb = now
stats = d.fileStats
}
}
}()
if doCall {
d.options.FileStatsCallback(stats, span)
}
}
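
// track wraps w so that bytes written through it are credited to FileStats
// when a FileStatsCallback is configured.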
func (d *Downloader) track(w io.Writer) io.Writer {
if d.options.FileStatsCallback != nil {
return &writeTracker{d, w}
}
return w
}
// ensureDir ensures that the directory dir exists.
func (d *Downloader) ensureDir(dir string) error {
dir = normalizePathSeparator(dir)
// Fast path: if the cache has the directory, we're done.
d.muCache.RLock()
cached := d.dirCache.Has(dir)
d.muCache.RUnlock()
if cached {
return nil
}
// Slow path: collect the directory and its parents, then create
// them and add them to the cache.
d.muCache.Lock()
defer d.muCache.Unlock()
var parents []string
for i := dir; i != "" && !d.dirCache.Has(i); i = filepath.Dir(i) {
if i == d.outputDir {
break
}
parents = append(parents, i)
}
for i := len(parents) - 1; i >= 0; i-- {
if err := os.Mkdir(parents[i], 0700); err != nil && !os.IsExist(err) {
return err
}
d.dirCache.Add(parents[i])
}
return nil
}
func (d *Downloader) processFile(name string, details isolated.File) {
d.startFile(details.Size)
// Get full local path for file.
name = normalizePathSeparator(name)
filename := filepath.Join(d.outputDir, name)
if details.Link != nil {
d.doSymlink(filename, name, &details)
} else if details.Type == isolated.TarArchive {
d.scheduleTarballJob(name, &details)
} else {
d.scheduleFileJob(filename, name, &details)
}
}
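
// doSymlink creates the symlink described by details at filename and records
// the file as completed.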
func (d *Downloader) doSymlink(filename, name string, details *isolated.File) {
// Ensure dir exists.
if err := d.ensureDir(filepath.Dir(filename)); err != nil {
d.addError(fileType, name, err)
return
}
if err := os.Symlink(*details.Link, filename); err != nil {
d.addError(fileType, name, err)
return
}
d.completeFile(name, details)
}
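
// scheduleFileJob schedules a worker-pool job that downloads a regular file,
// either fetching it straight to disk or, when a cache is configured,
// materializing it as a hardlink out of the cache.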
func (d *Downloader) scheduleFileJob(filename, name string, details *isolated.File) {
d.pool.Schedule(fileType.Priority(), func() {
// Ensure dir exists.
if err := d.ensureDir(filepath.Dir(filename)); err != nil {
d.addError(fileType, name, err)
return
}
mode := 0644
// Ignore all mode bits except the owner-executable bit.
if details.Mode != nil {
mode |= *details.Mode & 0100
}
if d.options.Cache == nil {
if err := retry.Retry(d.ctx, transient.Only(retry.Default), func() (rerr error) {
// No-cache case: fetch directly into the destination file.
f, err := os.OpenFile(filename, os.O_WRONLY|os.O_CREATE, os.FileMode(mode))
if err != nil {
return err
}
defer func() {
if cerr := f.Close(); rerr == nil {
rerr = cerr
}
}()
if err := d.c.Fetch(d.ctx, details.Digest, d.track(f)); err != nil {
f.Close()
os.Remove(filename)
return err
}
return nil
}, nil); err != nil {
d.addError(fileType, name, err)
return
}
d.completeFile(name, details)
return
}
err := d.options.Cache.Hardlink(details.Digest, filename, os.FileMode(mode))
if err != nil {
if !errors.Contains(err, os.ErrNotExist) {
d.addError(fileType, name, errors.Annotate(err, "failed to link from cache").Err())
return
}
retryCnt := 0
var lastRetryErr error
if err := retry.Retry(d.ctx, transient.Only(retry.Default), func() error {
// Cache-miss case: fetch into the cache and hardlink to the destination.
pr, pw := io.Pipe()
wg, ctx := errgroup.WithContext(d.ctx)
wg.Go(func() error {
err := d.c.Fetch(ctx, details.Digest, d.track(pw))
if perr := pw.CloseWithError(err); perr != nil {
return errors.Annotate(perr, "failed to close pipe writer").Err()
}
return err
})
wg.Go(func() error {
err := d.options.Cache.AddWithHardlink(ctx, details.Digest, pr, filename, os.FileMode(mode))
if perr := pr.CloseWithError(err); perr != nil {
return errors.Annotate(perr, "failed to close pipe reader").Err()
}
// An invalid hash may indicate data corruption on the network, so retry.
if errors.Contains(err, cache.ErrInvalidHash) {
return transient.Tag.Apply(err)
}
return err
})
return wg.Wait()
}, func(err error, duration time.Duration) {
retryCnt += 1
lastRetryErr = err
}); err != nil {
d.addError(fileType, name, errors.Annotate(err, "failed to read from cache").Err())
return
}
if lastRetryErr != nil {
logging.WithError(lastRetryErr).Warningf(d.ctx, "failed to fetch %d times for %s(%s)", retryCnt, filename, details.Digest)
}
}
d.completeFile(name, details)
}, func() {
d.addError(fileType, name, d.ctx.Err())
})
}
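
// loadOrAddToCache returns the content for hash, reading it from the local
// cache when possible; on a cache miss it fetches the item (with retry and
// digest verification) and adds it to the cache.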
func (d *Downloader) loadOrAddToCache(hash isolated.HexDigest) (*bytes.Buffer, error) {
var buf bytes.Buffer
if d.options.Cache != nil {
if r, err := d.options.Cache.Read(hash); err == nil {
err := func() error {
defer r.Close()
if _, err := io.Copy(&buf, r); err != nil {
return errors.Annotate(err, "failed to call io.Copy").Err()
}
return nil
}()
if err != nil {
return nil, err
}
return &buf, nil
}
}
if err := retry.Retry(d.ctx, transient.Only(retry.Default), func() error {
buf.Reset()
if err := d.c.Fetch(d.ctx, hash, d.track(&buf)); err != nil {
return err
}
if got, want := isolated.HashBytes(d.c.Hash(), buf.Bytes()), hash; got != want {
return errors.Reason("digest mismatch: got %s, want %s", got, want).Tag(transient.Tag).Err()
}
return nil
}, nil); err != nil {
return nil, err
}
if d.options.Cache != nil {
if err := d.options.Cache.Add(d.ctx, hash, bytes.NewReader(buf.Bytes())); err != nil {
return nil, err
}
}
return &buf, nil
}
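
// scheduleTarballJob schedules a worker-pool job that fetches a tarball and
// extracts its regular-file entries under outputDir.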
func (d *Downloader) scheduleTarballJob(tarname string, details *isolated.File) {
hash := details.Digest
d.pool.Schedule(tarType.Priority(), func() {
buf, err := d.loadOrAddToCache(hash)
if err != nil {
d.addError(tarType, string(hash), err)
return
}
tf := tar.NewReader(buf)
loop:
for {
hdr, err := tf.Next()
switch err {
case io.EOF:
// end of the tarball
break loop
case nil:
default:
d.addError(tarType, string(hash), errors.Annotate(err, "failed to call Next()").Err())
return
}
name := filepath.Clean(normalizePathSeparator(hdr.Name))
// got a file to read
if hdr.Typeflag != tar.TypeReg {
d.addError(tarType, string(hash)+":"+name,
errors.New("not a regular file"))
continue
}
if filepath.IsAbs(name) {
d.addError(tarType, string(hash)+":"+name,
errors.New("absolute path"))
continue
}
if strings.HasPrefix(name, "..") {
d.addError(tarType, string(hash)+":"+name,
errors.New("relative path traversal"))
continue
}
filename := filepath.Join(d.outputDir, name)
// Ignore all mode bits except the owner-executable bit.
mode := 0644 | (int(hdr.Mode) & 0100)
if err := d.ensureDir(filepath.Dir(filename)); err != nil {
d.addError(tarType, string(hash)+":"+filename, err)
continue
}
// This is to close |f| as early as possible.
func() {
f, err := os.OpenFile(filename, os.O_WRONLY|os.O_CREATE, os.FileMode(mode))
if err != nil {
d.addError(tarType, string(hash)+":"+filename, err)
return
}
defer func() {
if err := f.Close(); err != nil {
d.addError(tarType, string(hash)+":"+filename, err)
}
}()
n, err := io.Copy(f, tf)
if err != nil {
d.addError(tarType, string(hash)+":"+filename, errors.Annotate(err, "failed to call io.Copy()").Err())
return
}
if n != hdr.Size {
d.addError(tarType, string(hash)+":"+filename,
errors.New("failed to copy entire file"))
return
}
// Fake a File entry for the subfile. Also call startFile so that the
// started/completed file counts balance.
d.startFile(nil)
d.completeFile(name, &isolated.File{
Mode: &mode,
Size: &hdr.Size,
})
}()
}
// Also issue a callback for the overall tarball.
d.completeFile(tarname, details)
}, func() {
d.addError(tarType, string(hash), d.ctx.Err())
})
}
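
// scheduleIsolatedJob schedules a worker-pool job that fetches and parses an
// isolated JSON document, then recursively schedules its includes and files.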
func (d *Downloader) scheduleIsolatedJob(hash isolated.HexDigest) {
d.pool.Schedule(isolatedType.Priority(), func() {
var buf bytes.Buffer
if err := retry.Retry(d.ctx, transient.Only(retry.Default), func() error {
buf.Reset()
return d.c.Fetch(d.ctx, hash, &buf)
}, nil); err != nil {
d.addError(isolatedType, string(hash), err)
return
}
var root isolated.Isolated
if err := json.Unmarshal(buf.Bytes(), &root); err != nil {
d.addError(isolatedType, string(hash), err)
return
}
d.setIsolated(hash, root)
for _, node := range root.Includes {
d.scheduleIsolatedJob(node)
}
for name, details := range root.Files {
d.processFile(name, details)
}
}, func() {
d.addError(isolatedType, string(hash), d.ctx.Err())
})
}
// FileStats is very basic statistics about the progress of a download
// operation.
type FileStats struct {
// These cover the files that the isolated file says to fetch.
CountScheduled uint64
CountCompleted uint64
// These cover the bytes of the files that the isolated file describes, not
// the bytes of the isolated files themselves.
//
// Note that these are potentially served from the local cache, and so you
// could observe speeds much faster than the network speed :).
BytesScheduled uint64
BytesCompleted uint64
}
// StatLine calculates a simple statistics line suitable for logging.
func (f *FileStats) StatLine(previous *FileStats, span time.Duration) string {
var bytesDownloaded uint64
if previous != nil {
bytesDownloaded = f.BytesCompleted - previous.BytesCompleted
}
return fmt.Sprintf("Files (%d/%d) - %s / %s - %0.1f%% - %s/s",
f.CountCompleted, f.CountScheduled,
humanize.Bytes(f.BytesCompleted), humanize.Bytes(f.BytesScheduled),
100*float64(f.BytesCompleted)/float64(f.BytesScheduled),
humanize.Bytes(uint64(float64(bytesDownloaded)/span.Seconds())),
)
}
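
// A hedged sketch of wiring StatLine into Options.FileStatsCallback for
// periodic progress logging; the "prev" bookkeeping is illustrative:
//
//	var prev downloader.FileStats
//	opts := &downloader.Options{
//		FileStatsCallback: func(s downloader.FileStats, span time.Duration) {
//			fmt.Println(s.StatLine(&prev, span))
//			prev = s
//		},
//	}

// writeTracker wraps an io.Writer and credits bytes written through it to
// FileStats.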
type writeTracker struct {
d *Downloader
w io.Writer
}
func (w *writeTracker) Write(bs []byte) (n int, err error) {
w.d.updateFileStats(func(s *FileStats) bool {
s.BytesCompleted += uint64(len(bs))
return false
})
return w.w.Write(bs)
}
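
// downloadType identifies the kind of work scheduled on the pool; its value
// doubles as the job's priority.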
type downloadType int8
const (
fileType downloadType = iota
tarType
isolatedType
)
func (d downloadType) Priority() int64 {
return int64(d)
}
func (d downloadType) String() string {
switch d {
case fileType:
return "file"
case tarType:
return "tarball"
case isolatedType:
return "isolated"
default:
panic("invalid downloadType")
}
}
// Stats is stats for FetchAndMap.
type Stats struct {
Duration time.Duration `json:"duration"`
ItemsCold []byte `json:"items_cold"`
ItemsHot []byte `json:"items_hot"`
}
// CacheStats returns packed stats for cache misses (cold items) and hits (hot items).
func CacheStats(cache *cache.Cache) ([]byte, []byte, error) {
// TODO(yyanagisawa): refactor this.
added := cache.Added()
used := cache.Used()
sort.Slice(added, func(i, j int) bool { return added[i] < added[j] })
itemsCold, err := isolated.Pack(added)
if err != nil {
return nil, nil, errors.Annotate(err, "failed to call Pack for cold items").Err()
}
hotCounter := make(map[int64]int)
for _, v := range used {
hotCounter[v]++
}
for _, v := range added {
hotCounter[v]--
}
var hot []int64
for k, v := range hotCounter {
for i := 0; i < v; i++ {
hot = append(hot, k)
}
}
sort.Slice(hot, func(i, j int) bool { return hot[i] < hot[j] })
itemsHot, err := isolated.Pack(hot)
if err != nil {
return nil, nil, errors.Annotate(err, "failed to call Pack for hot items").Err()
}
return itemsCold, itemsHot, nil
}
// FetchAndMap fetches an isolated tree, recreates it under outDir, and returns
// the root Isolated along with download Stats.
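//
// A hedged usage sketch (client and disk cache construction elided; "client",
// "diskCache", and "hash" are placeholders):
//
//	iso, stats, err := downloader.FetchAndMap(ctx, hash, client, diskCache, outDir)
//	if err != nil {
//		// handle the error
//	}
//	_, _ = iso, stats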
func FetchAndMap(ctx context.Context, isolatedHash isolated.HexDigest, c *isolatedclient.Client, cache *cache.Cache, outDir string) (*isolated.Isolated, Stats, error) {
start := time.Now()
d := New(ctx, c, isolatedHash, outDir, &Options{
Cache: cache,
})
waitErr := d.Wait()
itemsCold, itemsHot, err := CacheStats(cache)
if err != nil {
return nil, Stats{}, errors.Annotate(err, "failed to call CacheStats").Err()
}
d.mu.Lock()
defer d.mu.Unlock()
return d.isoMap[isolatedHash], Stats{
Duration: time.Since(start),
ItemsCold: itemsCold,
ItemsHot: itemsHot,
}, waitErr
}