// Copyright 2015 The LUCI Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package gs
import (
"context"
"fmt"
"io"
"net/http"
"time"
"go.chromium.org/luci/common/errors"
log "go.chromium.org/luci/common/logging"
"go.chromium.org/luci/common/retry"
"go.chromium.org/luci/common/retry/transient"
gs "cloud.google.com/go/storage"
"google.golang.org/api/googleapi"
"google.golang.org/api/option"
)
var (
// ReadWriteScopes is the set of scopes needed for read/write Google Storage
// access.
ReadWriteScopes = []string{gs.ScopeReadWrite}
// ReadOnlyScopes is the set of scopes needed for read-only Google Storage
// access.
ReadOnlyScopes = []string{gs.ScopeReadOnly}
)
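
// A minimal sketch of how these scope sets can be used to build an
// authenticated RoundTripper for NewProdClient. It relies on the
// golang.org/x/oauth2 and golang.org/x/oauth2/google packages, which are not
// imported by this file; the snippet is illustrative only and not part of
// this package's API:
//
//	ts, err := google.DefaultTokenSource(ctx, ReadOnlyScopes...)
//	if err != nil {
//		return err
//	}
//	client, err := NewProdClient(ctx, oauth2.NewClient(ctx, ts).Transport)
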
// Client abstracts the functionality used to connect to and interact with
// Google Storage, decoupling callers from the actual Google Storage client.
//
// Non-production implementations are used primarily for testing.
type Client interface {
io.Closer
// Attrs retrieves Object attributes for a given path.
Attrs(p Path) (*gs.ObjectAttrs, error)
// NewReader instantiates a new Reader instance for the named bucket/path.
//
// The supplied offset must be >= 0, or else this function will panic.
//
// If the supplied length is <0, no upper byte bound will be set.
NewReader(p Path, offset, length int64) (io.ReadCloser, error)
// NewWriter instantiates a new Writer instance for the named bucket/path.
NewWriter(p Path) (Writer, error)
// Delete deletes the object at the specified path.
//
// If the object does not exist, it is considered a success.
Delete(p Path) error
// Rename renames an object from one path to another.
//
// NOTE: The object should be removed from its original path, but because the
// current implementation uses two operations (Copy + Delete), removal of the
// source object may occasionally fail.
Rename(src, dst Path) error
}
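
// The sketch below shows typical read-only use of the Client interface via
// the production implementation. It assumes a Path can be built from its
// "gs://bucket/object" string form; the bucket and object names are
// hypothetical and used only for illustration:
//
//	func readObject(ctx context.Context) ([]byte, error) {
//		client, err := NewProdClient(ctx, nil)
//		if err != nil {
//			return nil, err
//		}
//		defer client.Close()
//
//		// A negative length reads to the end of the object.
//		r, err := client.NewReader(Path("gs://example-bucket/some/object"), 0, -1)
//		if err != nil {
//			return nil, err
//		}
//		defer r.Close()
//		return io.ReadAll(r)
//	}
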
// prodClient is an implementation of the Client interface using the production
// Google Storage client.
type prodClient struct {
context.Context
// rt is the RoundTripper to use, or nil for the cloud service default.
rt http.RoundTripper
// baseClient is a basic Google Storage client instance. It is used for
// operations that don't need custom header injections.
baseClient *gs.Client
}
// NewProdClient creates a new Client instance that uses production Cloud
// Storage.
//
// The supplied RoundTripper will be used to make connections. If nil, the
// default HTTP client will be used.
func NewProdClient(ctx context.Context, rt http.RoundTripper) (Client, error) {
c := prodClient{
Context: ctx,
rt: rt,
}
var err error
c.baseClient, err = c.newClient()
if err != nil {
return nil, err
}
return &c, nil
}
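
// A custom RoundTripper can be injected to observe or augment every Google
// Storage request, e.g. for logging. This is a minimal sketch; the
// loggingTransport type is hypothetical and not part of this package:
//
//	type loggingTransport struct {
//		base http.RoundTripper
//	}
//
//	func (t loggingTransport) RoundTrip(req *http.Request) (*http.Response, error) {
//		log.Infof(req.Context(), "GS request: %s %s", req.Method, req.URL)
//		return t.base.RoundTrip(req)
//	}
//
//	client, err := NewProdClient(ctx, loggingTransport{http.DefaultTransport})
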
func (c *prodClient) Close() error {
return c.baseClient.Close()
}
func (c *prodClient) NewWriter(p Path) (Writer, error) {
bucket, filename, err := splitPathErr(p)
if err != nil {
return nil, err
}
return &prodWriter{
Context: c,
client: c,
bucket: bucket,
relpath: filename,
}, nil
}
func (c *prodClient) Attrs(p Path) (*gs.ObjectAttrs, error) {
obj, err := c.handleForPath(p)
if err != nil {
return nil, err
}
return obj.Attrs(c)
}
func (c *prodClient) NewReader(p Path, offset, length int64) (io.ReadCloser, error) {
if offset < 0 {
panic(fmt.Errorf("offset (%d) must be >= 0", offset))
}
obj, err := c.handleForPath(p)
if err != nil {
return nil, err
}
return obj.NewRangeReader(c, offset, length)
}
func (c *prodClient) Rename(src, dst Path) error {
srcObj, err := c.handleForPath(src)
if err != nil {
return fmt.Errorf("invalid source path: %s", err)
}
dstObj, err := c.handleForPath(dst)
if err != nil {
return fmt.Errorf("invalid destination path: %s", err)
}
// First stage: copy the object to its destination path.
err = retry.Retry(c, transient.Only(retry.Default), func() error {
if _, err := dstObj.CopierFrom(srcObj).Run(c); err != nil {
// The storage library doesn't return gs.ErrObjectNotExist when the copy
// fails because the source is missing; it returns a 404 googleapi.Error.
// Catch that explicitly. 404 and 403 errors are non-transient.
if isNotFoundError(err) || isForbidden(err) {
return err
}
// Assume all unexpected errors are transient.
return transient.Tag.Apply(err)
}
return nil
}, func(err error, d time.Duration) {
log.Fields{
log.ErrorKey: err,
"delay": d,
"src": src,
"dst": dst,
}.Warningf(c, "Transient error copying GS file. Retrying...")
})
if err != nil {
return err
}
// Second stage: delete the source object. Failure here is not fatal.
if err := c.deleteObject(srcObj); err != nil {
log.Fields{
log.ErrorKey: err,
"path": src,
}.Warningf(c, "(Non-fatal) Failed to delete source during rename.")
}
return nil
}
func (c *prodClient) Delete(p Path) error {
dstObj, err := c.handleForPath(p)
if err != nil {
return fmt.Errorf("invalid path: %s", err)
}
return c.deleteObject(dstObj)
}
func (c *prodClient) deleteObject(o *gs.ObjectHandle) error {
return retry.Retry(c, transient.Only(retry.Default), func() error {
if err := o.Delete(c); err != nil {
// The storage library doesn't return gs.ErrObjectNotExist when Delete
// returns a 404. Catch that explicitly.
if isNotFoundError(err) {
// If the file wasn't found, then the delete "succeeded".
return nil
}
// 403 errors are non-transient.
if isForbidden(err) {
return err
}
// Assume all unexpected errors are transient.
return transient.Tag.Apply(err)
}
return nil
}, func(err error, d time.Duration) {
log.Fields{
log.ErrorKey: err,
"delay": d,
}.Warningf(c, "Transient error deleting file. Retrying...")
})
}
func (c *prodClient) newClient() (*gs.Client, error) {
var optsArray [1]option.ClientOption
opts := optsArray[:0]
if c.rt != nil {
opts = append(opts, option.WithHTTPClient(&http.Client{
Transport: c.rt,
}))
}
return gs.NewClient(c, opts...)
}
func (c *prodClient) handleForPath(p Path) (*gs.ObjectHandle, error) {
bucket, filename, err := splitPathErr(p)
if err != nil {
return nil, err
}
return c.baseClient.Bucket(bucket).Object(filename), nil
}
func splitPathErr(p Path) (bucket, filename string, err error) {
bucket, filename = p.Split()
switch {
case bucket == "":
err = errors.New("path has no bucket")
case filename == "":
err = errors.New("path has no filename")
}
return
}
func isForbidden(err error) bool {
if t, ok := err.(*googleapi.Error); ok {
switch t.Code {
case http.StatusForbidden:
return true
}
}
return false
}
func isNotFoundError(err error) bool {
if err == gs.ErrObjectNotExist {
return true
}
// The storage library doesn't return gs.ErrObjectNotExist when Delete
// returns a 404. Catch that explicitly.
if t, ok := err.(*googleapi.Error); ok {
switch t.Code {
case http.StatusNotFound:
return true
}
}
return false
}