blob: 6c2f1804329237a95eb7892e5400a387f6aadcfa [file] [log] [blame]
// Copyright 2020 The Chromium Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
// Package gs exports helpers to upload log data to Google Storage.
package gs
import (
	"context"
	"fmt"
	"io"
	"net/url"
	"os"
	"path/filepath"
	"strings"
	"sync"
	"time"

	"go.chromium.org/luci/common/errors"
	"go.chromium.org/luci/common/gcloud/gs"
	gcgs "go.chromium.org/luci/common/gcloud/gs"
	"go.chromium.org/luci/common/logging"
	"go.chromium.org/luci/common/retry"
	"go.chromium.org/luci/common/sync/parallel"
)
// ImageArchivePrefix is the base URL for the Google Storage bucket for
// ChromeOS image archives.
//
// The value carries no trailing slash.
const ImageArchivePrefix = "gs://chromeos-image-archive"
// DirWriter exposes methods to write a local directory to Google Storage.
//
// Use NewDirWriter to construct one.
type DirWriter struct {
// client creates the Google Storage writers used for each uploaded file.
client gsClient
// maxConcurrentUploads is the worker count passed to parallel.WorkPool by
// WriteDir.
maxConcurrentUploads int
// retryIterator decides whether, and after what delay, a failed upload is
// retried. A single iterator is shared by every upload of this DirWriter.
retryIterator retry.Iterator
}
// gsClient is a Google Storage client.
//
// This interface is a subset of the gcgs.Client interface.
//
// Note: this file imports the same package both as gs and as gcgs, so
// gs.Writer and gcgs.Path below come from one package.
type gsClient interface {
// NewWriter returns a writer for the Google Storage path p, or an error.
NewWriter(p gcgs.Path) (gs.Writer, error)
}
// NewDirWriter returns a DirWriter that uploads through client using at most
// maxConcurrentUploads parallel workers.
//
// Failed uploads are retried with an exponential backoff that starts at
// 100ms, doubles on each step, is capped at 30s and allows up to 100 retries.
// The backoff is wrapped in a mutex-guarded iterator because it is shared by
// all upload workers.
func NewDirWriter(client gsClient, maxConcurrentUploads int) *DirWriter {
	backoff := &retry.ExponentialBackoff{
		Limited: retry.Limited{
			Delay:   100 * time.Millisecond,
			Retries: 100,
		},
		MaxDelay:   30 * time.Second,
		Multiplier: 2,
	}
	w := &DirWriter{
		client:               client,
		maxConcurrentUploads: maxConcurrentUploads,
	}
	w.retryIterator = &concurrencySafeRetryIterator{i: backoff}
	return w
}
// verifyPaths checks that localPath can be stat'ed and that gsPath parses as
// a URL.
//
// Both checks always run; every failure is reported in a single combined
// error. It returns nil when both paths are acceptable.
func verifyPaths(localPath string, gsPath string) error {
	var issues []string

	_, statErr := os.Stat(localPath)
	if statErr != nil {
		issues = append(issues, fmt.Sprintf("invalid local path (%s)", localPath))
	}

	_, parseErr := url.Parse(gsPath)
	if parseErr != nil {
		issues = append(issues, fmt.Sprintf("invalid GS path (%s)", gsPath))
	}

	if len(issues) == 0 {
		return nil
	}
	return errors.Reason("path errors: %s", strings.Join(issues, ", ")).Err()
}
// WriteDir writes a local directory to Google Storage.
//
// Every entry found under srcDir is uploaded to the matching relative path
// under dstDir, using up to maxConcurrentUploads concurrent workers. Errors
// from directory discovery and from individual uploads do not stop the
// remaining uploads; they are collected and returned together.
//
// If ctx is canceled, WriteDir() stops scheduling new uploads, returns after
// completing in-flight uploads, and the returned error includes ctx.Err().
func (w *DirWriter) WriteDir(ctx context.Context, srcDir string, dstDir gcgs.Path) error {
	logging.Debugf(ctx, "Writing %s and subtree to %s.", srcDir, dstDir)
	if err := verifyPaths(srcDir, string(dstDir)); err != nil {
		return err
	}
	files, merr := discoverFiles(srcDir, dstDir)
	// terr records cancellation observed while scheduling work. It is set by
	// the generator closure below and read after WorkPool returns.
	var terr error
	err := parallel.WorkPool(w.maxConcurrentUploads, func(items chan<- func() error) {
		for _, f := range files {
			// Create a loop-local variable for capture in the lambda.
			f := f
			item := func() error {
				return w.writeOne(ctx, f)
			}
			select {
			case items <- item:
			case <-ctx.Done():
				terr = ctx.Err()
				// Return rather than break: a bare break would only leave
				// the select and the loop would keep scheduling uploads.
				return
			}
		}
	})
	if err != nil {
		merr = append(merr, err)
	}
	if terr != nil {
		// Report the cancellation itself, not the (possibly nil) pool error.
		merr = append(merr, terr)
	}
	if len(merr) > 0 {
		return errors.Annotate(merr, "writing dir %s to %s", srcDir, dstDir).Err()
	}
	return nil
}
// discoverFiles walks srcDir and returns one *file per visited entry
// (directories included), each paired with its destination under dstDir.
//
// Errors hit while walking, or while computing a relative path, are collected
// in the returned MultiError instead of aborting the walk, so that as many
// files as possible are uploaded.
func discoverFiles(srcDir string, dstDir gcgs.Path) ([]*file, errors.MultiError) {
	var problems errors.MultiError
	found := []*file{}

	visit := func(path string, fi os.FileInfo, walkErr error) error {
		// Always return nil so the walk carries on past broken entries.
		if walkErr != nil {
			problems = append(problems, errors.Annotate(walkErr, "list files to upload: %s", path).Err())
			return nil
		}
		rel, relErr := filepath.Rel(srcDir, path)
		if relErr != nil {
			problems = append(problems, errors.Annotate(relErr, "writing from %s to %s", path, dstDir).Err())
			return nil
		}
		found = append(found, &file{
			Src:  path,
			Dest: dstDir.Concat(rel),
			Info: fi,
		})
		return nil
	}

	// visit never returns an error, so a non-nil result here means the
	// invariant above was broken.
	if err := filepath.Walk(srcDir, visit); err != nil {
		panic(fmt.Sprintf("Directory walk leaked error: %s", err))
	}
	return found, problems
}
// writeOne uploads f, retrying on failure for as long as the DirWriter's
// retry iterator allows.
//
// It returns nil on the first successful attempt, or the error of the last
// attempt once the iterator answers retry.Stop.
func (w *DirWriter) writeOne(ctx context.Context, f *file) error {
	for {
		err := f.Write(ctx, w.client)
		if err == nil {
			return nil
		}
		delay := w.retryIterator.Next(ctx, err)
		if delay == retry.Stop {
			return err
		}
		logging.Warningf(ctx, "%s failed upload: %s. Will retry after %s", f.Src, err, delay.String())
		// Sleeping here blocks the worker goroutine that is uploading this
		// file. Because we use parallel.WorkPool(), one of the fixed number
		// of concurrent goroutines stays blocked for the whole delay.
		//
		// This is intentional: most errors are due to transient service
		// degradation in Google Storage. Blocking the worker goroutines
		// throttles our overall upload throughput while such transient
		// errors last.
		time.Sleep(delay)
	}
}
// file describes one directory entry to upload and where it should go.
type file struct {
// Src is the local path of the entry, as reported by the directory walk.
Src string
// Dest is the Google Storage path the entry is written to.
Dest gcgs.Path
// Info is the entry's file info; Write uses it to skip directories and
// non-regular files and to size the upload.
Info os.FileInfo
}
// Write uploads the local file f.Src to f.Dest using client.
//
// Directories and entries rejected by shouldSkipUpload (symlinks, devices,
// pipes, sockets, ...) are not uploaded and are reported as success.
//
// The upload covers exactly f.Info.Size() bytes. An error is returned if the
// source cannot be opened or fully read, if the writer cannot be created,
// written to or closed, or if the number of bytes written does not match the
// recorded file size.
func (f *file) Write(ctx context.Context, client gsClient) error {
	if f.Info.IsDir() {
		return nil
	}
	if skip, reason := shouldSkipUpload(f.Info); skip {
		logging.Debugf(ctx, "Skipped %s because: %s.", f.Src, reason)
		return nil
	}
	r, err := os.Open(f.Src)
	if err != nil {
		return errors.Annotate(err, "writing from %s to %s", f.Src, f.Dest).Err()
	}
	defer r.Close()
	writer, err := client.NewWriter(f.Dest)
	if err != nil {
		return errors.Annotate(err, "writing from %s to %s", f.Src, f.Dest).Err()
	}
	// Ignore errors as we may have already closed writer by the time this runs.
	defer writer.Close()
	bs := make([]byte, f.Info.Size())
	// A single Read may return fewer bytes than requested without an error,
	// which would leave the tail of bs zero-filled. ReadFull keeps reading
	// until bs is full and fails if the file turns out to be shorter.
	if _, err = io.ReadFull(r, bs); err != nil {
		return errors.Annotate(err, "writing from %s to %s", f.Src, f.Dest).Err()
	}
	n, err := writer.Write(bs)
	if err != nil {
		return errors.Annotate(err, "writing from %s to %s", f.Src, f.Dest).Err()
	}
	if int64(n) != f.Info.Size() {
		return errors.Reason("length written to %s does not match source file size", f.Dest).Err()
	}
	// Close explicitly so that a failure to finalize the upload is reported.
	err = writer.Close()
	if err != nil {
		return errors.Annotate(err, "writer for %s failed to close", f.Dest).Err()
	}
	return nil
}
// shouldSkipUpload reports whether the entry described by i must not be
// uploaded, together with a human-readable reason.
//
// Entries with no type bits set in their mode (regular files) are never
// skipped and get an empty reason. Anything else is skipped; the reason names
// the first matching type in the order symlink, device, named pipe, socket,
// irregular, with a generic reason as fallback.
func shouldSkipUpload(i os.FileInfo) (bool, string) {
	mode := i.Mode()
	if mode&os.ModeType == 0 {
		return false, ""
	}

	// Checked in order; the first type bit that is set wins.
	skipped := []struct {
		bit    os.FileMode
		reason string
	}{
		{os.ModeSymlink, "file is a symlink"},
		{os.ModeDevice, "file is a device"},
		{os.ModeNamedPipe, "file is a named pipe"},
		{os.ModeSocket, "file is a unix domain socket"},
		{os.ModeIrregular, "file is an irregular file of unknown type"},
	}
	for _, s := range skipped {
		if mode&s.bit != 0 {
			return true, s.reason
		}
	}
	return true, "file is a non-file of unknown type"
}
// concurrencySafeRetryIterator wraps a retry.Iterator so that its Next method
// can be called from several goroutines: every call is serialized by a mutex.
type concurrencySafeRetryIterator struct {
// i is the wrapped iterator that actually computes the retry delay.
i retry.Iterator
// m serializes calls to i.Next.
m sync.Mutex
}
// Next forwards to the wrapped iterator's Next while holding the mutex, and
// returns the delay it produced.
func (it *concurrencySafeRetryIterator) Next(ctx context.Context, err error) time.Duration {
	it.m.Lock()
	defer it.m.Unlock()
	delay := it.i.Next(ctx, err)
	return delay
}