blob: 4377ae1543e680eb92d042a6692dd15870acc1ca [file] [log] [blame]
package main
import (
"context"
"fmt"
"path/filepath"
"sync"
"time"
"cloud.google.com/go/storage"
"go.chromium.org/luci/common/logging"
"go.chromium.org/luci/common/retry"
"go.chromium.org/luci/common/retry/transient"
"go.chromium.org/luci/common/tsmon/metric"
"go.chromium.org/luci/common/tsmon/types"
)
var (
filesDeleted = metric.NewCounter("backups/files_deleted",
"Number of files deleted from GCS",
&types.MetricMetadata{})
)
// deleteFromGS deletes each filename read from filenamesChan
func delFromGS(ctx context.Context, bucket *storage.BucketHandle, prefix string, filenamesChan <-chan string, workers int, errorChan chan<- error) <-chan struct{} {
doneChan := make(chan struct{})
var wg sync.WaitGroup
wg.Add(workers)
for i := 0; i < workers; i++ {
go func() {
defer wg.Done()
for filename := range filenamesChan {
select {
case _ = <-ctx.Done():
break
default:
}
objName := prefix + filepath.ToSlash(filename)
errorChan <- retry.Retry(ctx, transient.Only(retry.Default), func() error {
if err := bucket.Object(objName).Delete(ctx); err != nil {
if err == storage.ErrObjectNotExist {
return nil
}
return fmt.Errorf("Failed to delete object '%s': %v", objName, err)
}
filesDeleted.Add(ctx, 1)
return nil
}, func(err error, d time.Duration) {
logging.Warningf(ctx, "Transient error on GS delete. Retrying in %.1fs ...: %v", d.Seconds(), err)
})
}
}()
}
go func() {
wg.Wait()
logging.Debugf(ctx, "WaitGroup is finished")
doneChan <- struct{}{}
}()
return doneChan
}