blob: 811aaf0fa19ffbe599b39d44cfd89f8dfc783321 [file] [log] [blame]
// Copyright 2021 The ChromiumOS Authors
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
package storage
import (
"context"
"errors"
"fmt"
"io/fs"
"log"
"net/url"
"os"
"path"
"path/filepath"
"strings"
"go.chromium.org/luci/common/retry"
"go.chromium.org/luci/common/retry/transient"
"go.chromium.org/luci/common/sync/parallel"
"google.golang.org/api/googleapi"
)
// maxConcurrentUploads is the worker count Upload passes to
// parallel.WorkPool, i.e. the maximum number of files uploaded at once.
const maxConcurrentUploads = 10

// GSClientInterface is the upload/close surface of GSClient (whose Upload and
// Close methods match it), declared so callers can substitute a fake in tests.
type GSClientInterface interface {
	Upload(ctx context.Context, localFolder string, gsUrl string) error
	Close()
}
// GSClient uploads to and downloads from gs:// locations through an
// underlying StorageClientInterface. Construct it with NewGSClient, or with
// NewGSTestClient to supply an arbitrary StorageClientInterface (e.g. a
// fake), and call Close when done.
type GSClient struct {
	client StorageClientInterface
}
// LocalObject describes one local file selected for upload.
type LocalObject struct {
	// FullPath is the path handed to the storage client to read the file.
	FullPath string
	// RelPath is the file's path relative to the upload root; Upload appends
	// it to the destination object name.
	RelPath string
}
// GSObject names an object in a Google Cloud Storage bucket. When used as an
// upload root, Object acts as a prefix that per-file names are appended to.
type GSObject struct {
	Bucket string
	Object string
}

// Extend returns a copy of o in the same bucket whose object name has
// addendum appended as a further slash-separated element. The joined name is
// normalized by path.Join, so duplicate and trailing slashes disappear.
// The receiver is not modified.
func (o GSObject) Extend(addendum string) GSObject {
	extended := o
	extended.Object = path.Join(o.Object, addendum)
	return extended
}
// NewGSClient returns a GSClient backed by a freshly created StorageClient.
// A non-empty credentialsFile is passed to NewStorageClientWithCredsFile;
// an empty one selects NewStorageClientWithDefaultAccount. Any error from
// the chosen constructor is returned unchanged, together with a nil client.
func NewGSClient(ctx context.Context, credentialsFile string) (*GSClient, error) {
	var (
		sc  *StorageClient
		err error
	)
	if credentialsFile == "" {
		sc, err = NewStorageClientWithDefaultAccount(ctx)
	} else {
		sc, err = NewStorageClientWithCredsFile(ctx, credentialsFile)
	}
	if err != nil {
		return nil, err
	}
	return &GSClient{client: sc}, nil
}
// NewGSTestClient wraps an already-constructed StorageClientInterface (for
// example a test fake) in a GSClient.
func NewGSTestClient(client StorageClientInterface) *GSClient {
	gc := new(GSClient)
	gc.client = client
	return gc
}
// Close closes the underlying storage client by calling its Close method.
func (c *GSClient) Close() {
	c.client.Close()
}
// Upload uploads localFolder (which may be a single file) to gsUrl, which
// must have the form "gs://bucket/prefix". Each local file is written to the
// object prefix/<RelPath>, where RelPath comes from
// getAllFilesInFolderRecursively.
//
// Up to maxConcurrentUploads files are uploaded in parallel. A failed write
// that isTransientError classifies as transient is tagged with transient.Tag
// and retried under retry.Default; other failures are not retried.
//
// If ctx is done before every file has been handed to a worker, no more files
// are enqueued and ctx.Err() is returned, taking precedence over any upload
// error. Otherwise the aggregated error from parallel.WorkPool (nil when all
// uploads succeed) is returned.
func (c *GSClient) Upload(ctx context.Context, localFolder string, gsUrl string) error {
	log.Printf("Starting upload of %s to %s", localFolder, gsUrl)
	rootObject, err := c.parseGSURL(gsUrl)
	if err != nil {
		return fmt.Errorf("unable to parse gs url, %w", err)
	}
	log.Printf("Parsed URL: %s", rootObject)
	allFiles, err := c.getAllFilesInFolderRecursively(localFolder)
	if err != nil {
		return fmt.Errorf("unable to get local files, %w", err)
	}
	log.Printf("Uploading files (max %d workers): %s", maxConcurrentUploads, allFiles)

	// uploadOne makes a single upload attempt. Retriable failures are tagged
	// transient so that the retry loop below tries again.
	uploadOne := func(currentFile LocalObject) error {
		currentObject := rootObject.Extend(currentFile.RelPath)
		if err := c.client.Write(ctx, currentFile.FullPath, currentObject); err != nil {
			decoratedError := fmt.Errorf("failed upload for file %s to %s, %w", currentFile, gsUrl, err)
			if isTransientError(decoratedError) {
				return transient.Tag.Apply(decoratedError)
			}
			return decoratedError
		}
		return nil
	}
	uploadOneWithRetries := func(currentFile LocalObject) error {
		return retry.Retry(ctx, transient.Only(retry.Default), func() error {
			return uploadOne(currentFile)
		}, nil)
	}

	// ctxErr records why enqueuing stopped early. It is set inside the
	// generator and read only after WorkPool returns.
	// NOTE(review): this assumes WorkPool does not return before the
	// generator has finished — confirm against the parallel package.
	var ctxErr error
	err = parallel.WorkPool(maxConcurrentUploads, func(items chan<- func() error) {
		for _, aFile := range allFiles {
			// Loop-local copy for capture in the closure (needed before Go 1.22).
			f := aFile
			item := func() error {
				return uploadOneWithRetries(f)
			}
			// An already-cancelled context always wins over a free worker.
			if cerr := ctx.Err(); cerr != nil {
				ctxErr = cerr
				return
			}
			// Keep watching ctx while blocked waiting for a free worker. A bare
			// send would sit here after cancellation and then enqueue the file
			// anyway once a worker became available.
			select {
			case <-ctx.Done():
				ctxErr = ctx.Err()
				return
			case items <- item:
			}
		}
	})
	if ctxErr != nil {
		return ctxErr
	}
	return err
}
// DownloadFile downloads the object at gsUrl ("gs://bucket/object") to
// destLocalPath on the local file system.
//
// If the bucket does not exist it returns ErrBucketNotExist, and if the
// object does not exist it returns ErrObjectNotExist. Both sentinels are
// returned bare even when the storage client reports them wrapped, so callers
// may compare with == or errors.Is. A malformed gsUrl yields an error
// prefixed "unable to parse gs url, "; any other read failure is wrapped with
// "reading file: ".
func (c *GSClient) DownloadFile(ctx context.Context, gsUrl, destLocalPath string) error {
	log.Printf("Starting download of %s to %s", gsUrl, destLocalPath)
	object, err := c.parseGSURL(gsUrl)
	if err != nil {
		return fmt.Errorf("unable to parse gs url, %w", err)
	}
	log.Printf("Parsed URL: %s", object)
	err = c.client.Read(ctx, object, destLocalPath)
	switch {
	case err == nil:
		return nil
	case errors.Is(err, ErrBucketNotExist):
		return ErrBucketNotExist
	case errors.Is(err, ErrObjectNotExist):
		return ErrObjectNotExist
	default:
		return fmt.Errorf("reading file: %w", err)
	}
}
// parseGSURL splits a GCS URL of the form "gs://bucket/object" into its
// bucket and object name. The leading '/' of the object is dropped, so
// "gs://bucket" and "gs://bucket/" both yield an empty object name. The GCS
// client library has no equivalent helper, hence this bespoke one.
//
// An error is returned if gsUrl does not start with "gs://", contains '?' or
// '#', cannot be parsed as a URL, or names no bucket.
func (c *GSClient) parseGSURL(gsUrl string) (GSObject, error) {
	if !strings.HasPrefix(gsUrl, "gs://") {
		return GSObject{}, fmt.Errorf("gs url must begin with 'gs://', instead have, %s", gsUrl)
	}
	// url.Parse treats '?' and '#' as query/fragment delimiters and would
	// silently drop everything after them from the object name.
	if strings.ContainsAny(gsUrl, "?#") {
		return GSObject{}, fmt.Errorf("gs url must not contain '?' or '#', instead have, %s", gsUrl)
	}
	u, err := url.Parse(gsUrl)
	if err != nil {
		return GSObject{}, fmt.Errorf("unable to parse url, %w", err)
	}
	// Host corresponds to bucket. An empty one ("gs://", "gs:///obj") cannot
	// address any bucket, so reject it here rather than at request time.
	if u.Host == "" {
		return GSObject{}, fmt.Errorf("gs url must name a bucket, instead have, %s", gsUrl)
	}
	// Path corresponds to object (though we need to remove the leading '/').
	return GSObject{
		Bucket: u.Host,
		Object: cleanObjectPath(u.Path),
	}, nil
}
// cleanObjectPath drops a single leading '/' from objectPath, if present,
// turning a URL path such as "/dir/obj" into the object name "dir/obj". Only
// one slash is removed and the rest of the string is returned unchanged
// (path.Clean is not used because it would also rewrite the remainder).
func cleanObjectPath(objectPath string) string {
	return strings.TrimPrefix(objectPath, "/")
}
// getAllFilesInFolderRecursively lists the files to upload from localFolder.
//
// If localFolder is not a directory (os.Stat follows symlinks, so this
// includes a symlink to a file), the result is that single file with RelPath
// set to its base name. Otherwise localFolder is walked with filepath.Walk and
// every entry that is neither a directory nor a symlink is returned. Its
// RelPath is the path relative to localFolder, using '/' separators as GCS
// object names expect.
//
// NOTE(review): filepath.Walk does not follow symbolic links, including
// localFolder itself, so a localFolder that is a symlink to a directory
// currently yields no files. Confirm whether that case should be supported.
func (c *GSClient) getAllFilesInFolderRecursively(localFolder string) ([]LocalObject, error) {
	fi, err := os.Stat(localFolder)
	if err != nil {
		return nil, fmt.Errorf("could not find file %w", err)
	}
	if !fi.IsDir() {
		// filepath.Base instead of slicing past path.Dir: that slicing cut
		// characters off names like "file.txt" (Dir ".") or "/file.txt" (Dir "/").
		return []LocalObject{{
			FullPath: localFolder,
			RelPath:  filepath.Base(localFolder),
		}}, nil
	}
	var files []LocalObject
	err = filepath.Walk(localFolder, func(currPath string, info os.FileInfo, err error) error {
		if err != nil {
			return err
		}
		if info.IsDir() || info.Mode()&fs.ModeSymlink != 0 {
			return nil
		}
		// filepath.Rel handles roots such as "." or "/", where a prefix-length
		// computation produced wrong names or sliced out of range.
		rel, err := filepath.Rel(localFolder, currPath)
		if err != nil {
			return err
		}
		files = append(files, LocalObject{
			FullPath: currPath,
			RelPath:  filepath.ToSlash(rel),
		})
		return nil
	})
	return files, err
}
// isTransientError reports whether e is worth retrying. It returns true when
// e's error chain contains a *googleapi.Error with HTTP status 429
// (Too Many Requests) or any 5xx code, and false otherwise.
//
// errors.As is used instead of unwrapping to the innermost error. A
// *googleapi.Error can itself wrap another error (googleapi Error.Wrap /
// Unwrap), and unwrapping to the bottom would then skip past it. errors.As
// also descends into errors that wrap several others.
func isTransientError(e error) bool {
	var gerr *googleapi.Error
	// The nil check guards against a typed-nil *googleapi.Error in the chain.
	if !errors.As(e, &gerr) || gerr == nil {
		return false
	}
	return gerr.Code >= 500 || gerr.Code == 429
}