package main

import (
	"compress/gzip"
	"context"
	"fmt"
	"io"
	"os"
	"path/filepath"
	"sync"
	"time"

	"cloud.google.com/go/storage"
	"go.chromium.org/luci/common/logging"
	"go.chromium.org/luci/common/retry"
	"go.chromium.org/luci/common/retry/transient"
	"go.chromium.org/luci/common/tsmon/metric"
	"go.chromium.org/luci/common/tsmon/types"
)
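
// tsmon counters tracking backup progress. bytesBackedUp counts uncompressed
// bytes read from source files; bytesStored counts the gzip-compressed bytes
// actually landed in GCS.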
var (
	bytesBackedUp = metric.NewCounter("backups/bytes_backed_up",
		"Cumulative size of files backed up",
		&types.MetricMetadata{Units: types.Bytes})
	filesBackedUp = metric.NewCounter("backups/files_backed_up",
		"Number of files actually backed up",
		&types.MetricMetadata{})
	bytesStored = metric.NewCounter("backups/bytes_stored",
		"Cumulative size of objects stored in GCS",
		&types.MetricMetadata{Units: types.Bytes})
)

// backupToGS backs up each file named on filenameChan to GCS. It spawns
// the given number of worker goroutines and returns a channel that is
// closed once all workers have finished. Per-file failures are reported
// on errorChan instead of aborting the run.
func backupToGS(
	ctx context.Context,
	filenameChan <-chan string,
	bucket *storage.BucketHandle,
	prefix string,
	key []byte,
	workers int,
	errorChan chan<- error) <-chan struct{} {
	doneChan := make(chan struct{})
	var wg sync.WaitGroup
	wg.Add(workers)
	for i := 0; i < workers; i++ {
		go func() {
			defer wg.Done()
			for filename := range filenameChan {
				// Stop draining filenameChan early if the context has
				// been cancelled. A bare "break" in the select would
				// only exit the select itself, not this loop.
				select {
				case <-ctx.Done():
					return
				default:
				}
				objName := prefix + filepath.ToSlash(filename)
				written, err := writeToGS(ctx, filename, bucket, objName, key)
				if err != nil {
					if os.IsNotExist(err) { // Non-existent files are tolerable.
						logging.Warningf(ctx, "File intended for backup was not found: %s", filename)
						continue
					}
					errorChan <- fmt.Errorf("failed to copy file '%s' to GCS: %v", filename, err)
					continue
				}
				filesBackedUp.Add(ctx, 1)
				bytesBackedUp.Add(ctx, written)
			}
		}()
	}
	go func() {
		wg.Wait()
		logging.Debugf(ctx, "All backup workers have finished")
		// Close doneChan (rather than sending on it) so the caller's
		// receive never blocks, matching the contract in the doc comment.
		close(doneChan)
	}()
	return doneChan
}
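
// runBackup is a minimal usage sketch of a hypothetical caller: the name
// runBackup, the "backups/" prefix, and the worker count of 4 are all
// illustrative, not taken from a real call site. It shows one way to feed
// backupToGS, collect errors, and wait for completion.
func runBackup(ctx context.Context, bucket *storage.BucketHandle, filenames []string) error {
	filenameChan := make(chan string)
	// Buffer errorChan so workers never block while reporting errors.
	errorChan := make(chan error, len(filenames))
	go func() {
		// Closing filenameChan lets the workers drain it and exit.
		defer close(filenameChan)
		for _, f := range filenames {
			select {
			case filenameChan <- f:
			case <-ctx.Done():
				return // stop feeding work once the context is cancelled
			}
		}
	}()
	doneChan := backupToGS(ctx, filenameChan, bucket, "backups/", nil, 4, errorChan)
	<-doneChan // closed once every worker has returned
	close(errorChan)
	nErrs := 0
	for err := range errorChan {
		logging.Errorf(ctx, "backup error: %v", err)
		nErrs++
	}
	if nErrs > 0 {
		return fmt.Errorf("backup finished with %d errors", nErrs)
	}
	return nil
}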

// writeToGS gzip-compresses the named file and writes it to object objname
// in bucket, retrying transient failures. A non-nil key is used as a
// customer-supplied encryption key for the object. The returned int64 is
// the number of uncompressed bytes read from the file.
func writeToGS(ctx context.Context, filename string, bucket *storage.BucketHandle, objname string, key []byte) (int64, error) {
	obj := bucket.Object(objname)
	if key != nil {
		obj = obj.Key(key)
	}
	// TODO set metadata on obj
	var written int64
	err := retry.Retry(ctx, transient.Only(retry.Default), func() (err error) {
		gsWriter := obj.NewWriter(ctx)
		// Closing the GCS writer is what commits the upload, so a failure
		// here is tagged transient and retried. Defers run LIFO, so this
		// Close runs after zipWriter's, once all gzip output is flushed.
		defer func() {
			if errGs := gsWriter.Close(); errGs != nil {
				err = transient.Tag.Apply(fmt.Errorf("failed to close gsWriter: %v", errGs))
				return
			}
			bytesStored.Add(ctx, gsWriter.Attrs().Size)
		}()
		zipWriter := gzip.NewWriter(gsWriter)
		defer func() {
			if errZip := zipWriter.Close(); errZip != nil {
				err = fmt.Errorf("failed to close zipWriter: %v", errZip)
			}
		}()
		f, err := os.Open(filename)
		if err != nil {
			logging.Warningf(ctx, "Failed to open file '%s': %v", filename, err)
			return err
		}
		defer func() {
			// Don't let a Close error mask an earlier copy error.
			if errClose := f.Close(); errClose != nil && err == nil {
				err = errClose
			}
		}()
		if written, err = io.Copy(zipWriter, f); err != nil {
			// FIXME determine if error really is transient
			return transient.Tag.Apply(fmt.Errorf("failed to back up file '%s': %v", filename, err))
		}
		return nil
	}, func(err error, d time.Duration) {
		logging.Warningf(ctx, "Transient error on GS write. Retrying in %.1fs ...: %v", d.Seconds(), err)
	})
	return written, err
}
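
// backupOne is another illustrative sketch (hypothetical helper, not part of
// the tool's real call graph): it backs up a single file with an optional
// customer-supplied encryption key. GCS requires CSEKs to be exactly 32
// bytes (AES-256); a nil key stores the object without one.
func backupOne(ctx context.Context, bucket *storage.BucketHandle, filename string, key []byte) error {
	if key != nil && len(key) != 32 {
		return fmt.Errorf("CSEK must be 32 bytes (AES-256), got %d", len(key))
	}
	objName := "backups/" + filepath.ToSlash(filename)
	written, err := writeToGS(ctx, filename, bucket, objName, key)
	if err != nil {
		return err
	}
	logging.Infof(ctx, "Backed up '%s' (%d uncompressed bytes)", filename, written)
	return nil
}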