blob: 7f62bfb4ec1f4d79eb01e94661dc6a62a57510b3 [file] [log] [blame]
// Copyright 2016 The LUCI Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package archivist
import (
"context"
"crypto/sha256"
"encoding/base64"
"io"
"regexp"
cl "cloud.google.com/go/logging"
"github.com/golang/protobuf/proto"
mrpb "google.golang.org/genproto/googleapis/api/monitoredres"
"go.chromium.org/luci/common/errors"
"go.chromium.org/luci/common/gcloud"
"go.chromium.org/luci/common/gcloud/gs"
"go.chromium.org/luci/common/logging"
"go.chromium.org/luci/common/retry/transient"
"go.chromium.org/luci/common/sync/parallel"
"go.chromium.org/luci/common/tsmon/distribution"
"go.chromium.org/luci/common/tsmon/field"
"go.chromium.org/luci/common/tsmon/metric"
tsmon_types "go.chromium.org/luci/common/tsmon/types"
"go.chromium.org/luci/config"
logdog "go.chromium.org/luci/logdog/api/endpoints/coordinator/services/v1"
"go.chromium.org/luci/logdog/api/logpb"
"go.chromium.org/luci/logdog/common/archive"
"go.chromium.org/luci/logdog/common/storage"
"go.chromium.org/luci/logdog/common/types"
"go.chromium.org/luci/logdog/common/viewer"
)
const (
tsEntriesField = "entries"
tsIndexField = "index"
)
var logIDRe = regexp.MustCompile(`^[[:alnum:]._\-][[:alnum:]./_\-]{0,510}`)
// CLClient is a general interface for CloudLogging client and intended to enable
// unit tests to stub out CloudLogging.
type CLClient interface {
Close() error
Logger(logID string, opts ...cl.LoggerOption) *cl.Logger
Ping(context.Context) error
}
var (
// tsCount counts the raw number of archival tasks that this instance has
// processed, regardless of success/failure.
tsCount = metric.NewCounter("logdog/archivist/archive/count",
"The number of archival tasks processed.",
nil,
field.String("project"),
field.Bool("successful"))
// tsSize tracks the archive binary file size distribution of completed
// archives.
//
// The "archive" field is the specific type of archive (entries, index, data)
// that is being tracked.
//
// The "stream" field is the type of log stream that is being archived.
tsSize = metric.NewCumulativeDistribution("logdog/archivist/archive/size",
"The size (in bytes) of each archive file.",
&tsmon_types.MetricMetadata{Units: tsmon_types.Bytes},
distribution.DefaultBucketer,
field.String("project"),
field.String("archive"),
field.String("stream"))
// tsTotalBytes tracks the cumulative total number of bytes that have
// been archived by this instance.
//
// The "archive" field is the specific type of archive (entries, index, data)
// that is being tracked.
//
// The "stream" field is the type of log stream that is being archived.
tsTotalBytes = metric.NewCounter("logdog/archivist/archive/total_bytes",
"The total number of archived bytes.",
&tsmon_types.MetricMetadata{Units: tsmon_types.Bytes},
field.String("project"),
field.String("archive"),
field.String("stream"))
// tsLogEntries tracks the number of log entries per individual
// archival.
//
// The "stream" field is the type of log stream that is being archived.
tsLogEntries = metric.NewCumulativeDistribution("logdog/archivist/archive/log_entries",
"The total number of log entries per archive.",
nil,
distribution.DefaultBucketer,
field.String("project"),
field.String("stream"))
// tsTotalLogEntries tracks the total number of log entries that have
// been archived by this instance.
//
// The "stream" field is the type of log stream that is being archived.
tsTotalLogEntries = metric.NewCounter("logdog/archivist/archive/total_log_entries",
"The total number of log entries.",
nil,
field.String("project"),
field.String("stream"))
)
// Settings defines the archival parameters for a specific archival operation.
//
// In practice, this will be formed from service and project settings.
type Settings struct {
// GSBase is the base Google Storage path. This includes the bucket name
// and any associated path.
GSBase gs.Path
// GSStagingBase is the base Google Storage path for archive staging. This
// includes the bucket name and any associated path.
GSStagingBase gs.Path
// IndexStreamRange is the maximum number of stream indexes in between index
// entries. See archive.Manifest for more information.
IndexStreamRange int
// IndexPrefixRange is the maximum number of prefix indexes in between index
// entries. See archive.Manifest for more information.
IndexPrefixRange int
// IndexByteRange is the maximum number of stream data bytes in between index
// entries. See archive.Manifest for more information.
IndexByteRange int
// CloudLoggingProjectID is the ID of the Google Cloud Platform project to export
// logs to.
//
// May be empty, if no export is configured.
CloudLoggingProjectID string
// CloudLoggingBufferLimit is the maximum number of megabytes that the
// Cloud Logger will keep in memory per concurrent-task before flushing them
// out.
CloudLoggingBufferLimit int
}
// SettingsLoader returns archival Settings for a given project.
type SettingsLoader func(ctx context.Context, project string) (*Settings, error)
// Archivist is a stateless configuration capable of archiving individual log
// streams.
type Archivist struct {
// Service is the client to use to communicate with Coordinator's Services
// endpoint.
Service logdog.ServicesClient
// SettingsLoader loads archival settings for a specific project.
SettingsLoader SettingsLoader
// Storage is the archival source Storage instance.
Storage storage.Storage
// GSClientFactory obtains a Google Storage client for archive generation.
GSClientFactory func(ctx context.Context, luciProject string) (gs.Client, error)
// CLClientFactory obtains a Cloud Logging client for log exports.
// `luciProject` is the ID of the LUCI project to export logs from, and
// `clProject` is the ID of the Google Cloud project to export logs to.
CLClientFactory func(ctx context.Context, luciProject, clProject string, onError func(err error)) (CLClient, error)
}
// storageBufferSize is the size, in bytes, of the LogEntry buffer that is used
// to during archival. This should be greater than the maximum LogEntry size.
const storageBufferSize = types.MaxLogEntryDataSize * 64
// ArchiveTask processes and executes a single log stream archive task.
//
// If the supplied Context is Done, operation may terminate before completion,
// returning the Context's error.
func (a *Archivist) ArchiveTask(ctx context.Context, task *logdog.ArchiveTask) error {
err := a.archiveTaskImpl(ctx, task)
failure := isFailure(err)
// Add a result metric.
tsCount.Add(ctx, 1, task.Project, !failure)
return err
}
// archiveTaskImpl performs the actual task archival.
//
// Its error return value is used to indicate how the archive failed. isFailure
// will be called to determine if the returned error value is a failure or a
// status error.
func (a *Archivist) archiveTaskImpl(ctx context.Context, task *logdog.ArchiveTask) error {
// Validate the project name.
if err := config.ValidateProjectName(task.Project); err != nil {
logging.WithError(err).Errorf(ctx, "invalid project name %q: %s", task.Project)
return nil
}
// Load archival settings for this project.
settings, err := a.loadSettings(ctx, task.Project)
switch {
case err == config.ErrNoConfig:
logging.WithError(err).Errorf(ctx, "The project config doesn't exist; discarding the task.")
return nil
case transient.Tag.In(err):
// If this is a transient error, exit immediately and do not delete the
// archival task.
logging.WithError(err).Warningf(ctx, "TRANSIENT error during loading the project config.")
return err
case err != nil:
// This project has bad or no archival settings, this is non-transient,
// discard the task.
logging.WithError(err).Errorf(ctx, "Failed to load settings for project.")
return nil
}
// Load the log stream's current state. If it is already archived, we will
// return an immediate success.
ls, err := a.Service.LoadStream(ctx, &logdog.LoadStreamRequest{
Project: task.Project,
Id: task.Id,
Desc: true,
})
switch {
case err != nil:
logging.WithError(err).Errorf(ctx, "Failed to load log stream.")
return err
case ls.State == nil:
logging.Errorf(ctx, "Log stream did not include state.")
return errors.New("log stream did not include state")
case ls.State.Purged:
logging.Warningf(ctx, "Log stream is purged. Discarding archival request.")
a.expungeStorage(ctx, task.Project, ls.Desc, ls.State.TerminalIndex)
return nil
case ls.State.Archived:
logging.Infof(ctx, "Log stream is already archived. Discarding archival request.")
a.expungeStorage(ctx, task.Project, ls.Desc, ls.State.TerminalIndex)
return nil
case ls.State.ProtoVersion != logpb.Version:
logging.Fields{
"protoVersion": ls.State.ProtoVersion,
"expectedVersion": logpb.Version,
}.Errorf(ctx, "Unsupported log stream protobuf version.")
return errors.New("unsupported log stream protobuf version")
case ls.Desc == nil:
logging.Errorf(ctx, "Log stream did not include a descriptor.")
return errors.New("log stream did not include a descriptor")
}
ar := logdog.ArchiveStreamRequest{
Project: task.Project,
Id: task.Id,
}
// Build our staged archival plan. This doesn't actually do any archiving.
staged, err := a.makeStagedArchival(ctx, task.Project, task.Realm, settings, ls)
if err != nil {
logging.WithError(err).Errorf(ctx, "Failed to create staged archival plan.")
return err
}
// TODO(crbug.com/1164124) - handle the error from clClient.Close()
defer staged.Close()
// Archive to staging.
//
// If a non-transient failure occurs here, we will report it to the Archivist
// under the assumption that it will continue occurring.
//
// We will handle error creating the plan and executing the plan in the same
// switch statement below.
switch err = staged.stage(); {
case transient.Tag.In(err):
// If this is a transient error, exit immediately and do not delete the
// archival task.
logging.WithError(err).Warningf(ctx, "TRANSIENT error during archival operation.")
return err
case err != nil:
// This is a non-transient error, so we are confident that any future
// Archival will also encounter this error. We will mark this archival
// as an error and report it to the Coordinator.
logging.WithError(err).Errorf(ctx, "Archival failed with non-transient error.")
ar.Error = err.Error()
if ar.Error == "" {
// This needs to be non-nil, so if our actual error has an empty string,
// fill in a generic message.
ar.Error = "archival error"
}
default:
// In case something fails, clean up our staged archival (best effort).
defer staged.cleanup()
// Finalize the archival.
if err := staged.finalize(&ar); err != nil {
logging.WithError(err).Errorf(ctx, "Failed to finalize archival.")
return err
}
// Add metrics for this successful archival.
streamType := staged.desc.StreamType.String()
staged.stream.addMetrics(ctx, task.Project, tsEntriesField, streamType)
staged.index.addMetrics(ctx, task.Project, tsIndexField, streamType)
tsLogEntries.Add(ctx, float64(staged.logEntryCount), task.Project, streamType)
tsTotalLogEntries.Add(ctx, staged.logEntryCount, task.Project, streamType)
}
if _, err := a.Service.ArchiveStream(ctx, &ar); err != nil {
logging.WithError(err).Errorf(ctx, "Failed to report archive state.")
return err
}
a.expungeStorage(ctx, task.Project, ls.Desc, ar.TerminalIndex)
return nil
}
// expungeStorage does a best-effort expunging of the intermediate storage
// (BigTable) rows after successful archival.
//
// `desc` is a binary-encoded LogStreamDescriptor
// `terminalIndex` should be the terminal index of the archived stream. If it's
//
// <0 (an empty stream) we skip the expunge.
func (a *Archivist) expungeStorage(ctx context.Context, project string, desc []byte, terminalIndex int64) {
if terminalIndex < 0 {
// no log rows
return
}
if desc == nil {
logging.Warningf(ctx, "expungeStorage: nil desc")
return
}
var lsd logpb.LogStreamDescriptor
if err := proto.Unmarshal(desc, &lsd); err != nil {
logging.WithError(err).Warningf(ctx, "expungeStorage: decoding desc")
return
}
err := a.Storage.Expunge(ctx, storage.ExpungeRequest{
Path: lsd.Path(),
Project: project,
})
if err != nil {
logging.WithError(err).Warningf(ctx, "expungeStorage: failed")
}
}
// loadSettings loads and validates archival settings.
func (a *Archivist) loadSettings(ctx context.Context, project string) (*Settings, error) {
if a.SettingsLoader == nil {
panic("no settings loader configured")
}
st, err := a.SettingsLoader(ctx, project)
switch {
case err != nil:
return nil, err
case st.GSBase.Bucket() == "":
logging.Fields{
logging.ErrorKey: err,
"gsBase": st.GSBase,
}.Errorf(ctx, "Invalid storage base.")
return nil, errors.New("invalid storage base")
case st.GSStagingBase.Bucket() == "":
logging.Fields{
logging.ErrorKey: err,
"gsStagingBase": st.GSStagingBase,
}.Errorf(ctx, "Invalid storage staging base.")
return nil, errors.New("invalid storage staging base")
default:
return st, nil
}
}
func (a *Archivist) makeStagedArchival(ctx context.Context, project string, realm string,
st *Settings, ls *logdog.LoadStreamResponse) (*stagedArchival, error) {
gsClient, err := a.GSClientFactory(ctx, project)
if err != nil {
logging.Fields{
logging.ErrorKey: err,
"protoVersion": ls.State.ProtoVersion,
}.Errorf(ctx, "Failed to obtain GSClient.")
return nil, err
}
sa := stagedArchival{
Archivist: a,
Settings: st,
ctx: ctx,
project: project,
realm: realm,
gsclient: gsClient,
terminalIndex: types.MessageIndex(ls.State.TerminalIndex),
}
// Deserialize and validate the descriptor protobuf. If this fails, it is a
// non-transient error.
if err := proto.Unmarshal(ls.Desc, &sa.desc); err != nil {
logging.Fields{
logging.ErrorKey: err,
"protoVersion": ls.State.ProtoVersion,
}.Errorf(ctx, "Failed to unmarshal descriptor protobuf.")
return nil, err
}
sa.path = sa.desc.Path()
// Construct staged archival paths sa.stream and sa.index. The path length
// must not exceed 1024 bytes, it is GCS limit.
if err = sa.makeStagingPaths(1024); err != nil {
return nil, err
}
// Construct a CloudLogging client, if the config is set and the input
// stream type is TEXT.
if st.CloudLoggingProjectID != "" && sa.desc.StreamType == logpb.StreamType_TEXT {
// Validate the project ID, and ping the project to verify the auth.
if err = gcloud.ValidateProjectID(st.CloudLoggingProjectID); err != nil {
return nil, errors.Annotate(err, "CloudLoggingProjectID %q", st.CloudLoggingProjectID).Err()
}
onError := func(err error) {
logging.Fields{
"luciProject": project,
"cloudProject": st.CloudLoggingProjectID,
"path": sa.path,
}.Errorf(ctx, "archiving log to Cloud Logging: %v", err)
}
clc, err := a.CLClientFactory(ctx, project, st.CloudLoggingProjectID, onError)
if err != nil {
logging.Fields{
logging.ErrorKey: err,
"protoVersion": ls.State.ProtoVersion,
}.Errorf(ctx, "Failed to obtain CloudLogging client.")
return nil, err
}
if err = clc.Ping(ctx); err != nil {
return nil, errors.Annotate(
err, "failed to ping CloudProject %q for Cloud Logging export",
st.CloudLoggingProjectID).Err()
}
sa.clclient = clc
}
return &sa, nil
}
type stagedArchival struct {
*Archivist
*Settings
ctx context.Context
project string
realm string
path types.StreamPath
desc logpb.LogStreamDescriptor
stream stagingPaths
index stagingPaths
terminalIndex types.MessageIndex
logEntryCount int64
gsclient gs.Client
clclient CLClient
}
func base64Hash(p types.StreamName) string {
hasher := sha256.New()
hasher.Write([]byte(p))
return base64.RawURLEncoding.EncodeToString(hasher.Sum(nil))
}
// makeStagingPaths populates `staged` and `final` fields in sa.stream and
// sa.index.
//
// It prefixes the staging GCS paths with a hash of stream's Logdog prefix to
// make sure we spread the load across GCS namespace to avoid hotspotting its
// metadata server.
//
// These paths may be shared between projects. To enforce an absence of
// conflicts, we will insert the project name as part of the path.
func (sa *stagedArchival) makeStagingPaths(maxGSFilenameLength int) error {
// "<prefix>/+/<name>" => (<prefix>, <name>).
prefix, name := sa.path.Split()
if name == "" {
return errors.Reason("got prefix-only path %q, don't know how to stage it", sa.path).Err()
}
// base64 encoded SHA256 hash of the prefix.
prefixHash := "p/" + base64Hash(prefix)
// GCS paths we need to generate are:
// <GSStagingBase>/<project>/<prefixHash>/+/<name>/logstream.entries
// <GSStagingBase>/<project>/<prefixHash>/+/<name>/logstream.index
// <GSBase>/<project>/<prefix>/+/<name>/logstream.entries
// <GSBase>/<project>/<prefix>/+/<name>/logstream.index
//
// Each path length must be less than maxGSFilenameLength bytes. And we want
// <name> component in all paths to be identical. If some path doesn't fit
// the limit, we replace <name> with "<name-prefix>-TRUNCATED-<hash>"
// everywhere, making it fit the limit.
// Note: len("logstream.entries") > len("logstream.index"), use it for max len.
maxStagingLen := len(sa.GSStagingBase.Concat(sa.project, prefixHash, "+", string(name), "logstream.entries").Filename())
maxFinalLen := len(sa.GSBase.Concat(sa.project, string(prefix), "+", string(name), "logstream.entries").Filename())
// See if we need to truncate <name> to fit GCS paths into limits.
//
// The sa.path is user-provided and is unlimited. It is known to be large
// enough to exceed max ID length (https://crbug.com/1138017).
// So, truncate it if needed while avoiding overwrites by using crypto hash.
maxPathLen := maxStagingLen
if maxFinalLen > maxStagingLen {
maxPathLen = maxFinalLen
}
if bytesToCut := maxPathLen - maxGSFilenameLength; bytesToCut > 0 {
nameSuffix := types.StreamName("-TRUNCATED-" + base64Hash(name)[:16])
// Replace last len(nameSuffix)+bytesToCut bytes with nameSuffix. It will
// reduce the overall name size by `bytesToCut` bytes, as we need.
if len(nameSuffix)+bytesToCut > len(name) {
// There's no enough space even to fit nameSuffix. The prefix is too
// huge. This should be rare, abort.
return errors.Reason("can't stage %q of project %q, prefix is too long", sa.path, sa.project).Err()
}
name = name[:len(name)-len(nameSuffix)-bytesToCut] + nameSuffix
}
// Everything should fit into the limits now.
nameMap := map[string]*stagingPaths{
"logstream.entries": &sa.stream,
"logstream.index": &sa.index,
}
for file, spaths := range nameMap {
spaths.staged = sa.GSStagingBase.Concat(sa.project, prefixHash, "+", string(name), file)
spaths.final = sa.GSBase.Concat(sa.project, string(prefix), "+", string(name), file)
}
return nil
}
// stage executes the archival process, archiving to the staged storage paths.
//
// If stage fails, it may return a transient error.
func (sa *stagedArchival) stage() (err error) {
// Group any transient errors that occur during cleanup. If we aren't
// returning a non-transient error, return a transient "terr".
var terr errors.MultiError
defer func() {
if err == nil && len(terr) > 0 {
logging.Errorf(sa.ctx, "Encountered transient errors: %s", terr)
err = transient.Tag.Apply(terr)
}
}()
// Close our writers on exit. If any of them fail to close, mark the archival
// as a transient failure.
closeWriter := func(closer io.Closer, path gs.Path) {
// Close the Writer. If this results in an error, append it to our transient
// error MultiError.
if ierr := closer.Close(); ierr != nil {
logging.Warningf(sa.ctx, "Error closing writer to %s: %s", path, ierr)
terr = append(terr, ierr)
}
// If we have an archival error, also delete the path associated with this
// stream. This is a non-fatal failure, since we've already hit a fatal
// one.
if err != nil || len(terr) > 0 {
logging.Warningf(sa.ctx, "Cleaning up %s after error", path)
if ierr := sa.gsclient.Delete(path); ierr != nil {
logging.Fields{
logging.ErrorKey: ierr,
"path": path,
}.Warningf(sa.ctx, "Failed to delete stream on error.")
}
}
}
// createWriter is a shorthand function for creating a writer to a path and
// reporting an error if it failed.
createWriter := func(p gs.Path) (gs.Writer, error) {
w, ierr := sa.gsclient.NewWriter(p)
if ierr != nil {
logging.Fields{
logging.ErrorKey: ierr,
"path": p,
}.Errorf(sa.ctx, "Failed to create writer.")
return nil, ierr
}
return w, nil
}
var streamWriter, indexWriter gs.Writer
if streamWriter, err = createWriter(sa.stream.staged); err != nil {
return err
}
defer closeWriter(streamWriter, sa.stream.staged)
if indexWriter, err = createWriter(sa.index.staged); err != nil {
return err
}
defer closeWriter(indexWriter, sa.index.staged)
// Read our log entries from intermediate storage.
ss := storageSource{
Context: sa.ctx,
st: sa.Storage,
project: sa.project,
path: sa.path,
terminalIndex: sa.terminalIndex,
lastIndex: -1,
}
m := archive.Manifest{
LUCIProject: sa.project,
Desc: &sa.desc,
Source: &ss,
LogWriter: streamWriter,
IndexWriter: indexWriter,
StreamIndexRange: sa.IndexStreamRange,
PrefixIndexRange: sa.IndexPrefixRange,
ByteRange: sa.IndexByteRange,
Logger: logging.Get(sa.ctx),
}
if sa.clclient != nil {
logID := "luci-logs"
tags := sa.desc.GetTags()
if tags == nil {
tags = map[string]string{}
}
if sa.realm != "" {
tags["realm"] = sa.realm
}
// bbagent adds viewer.LogDogViewerURLTag to log streams for
// "back to build" link in UI
//
// This URL isn't useful in Cloud Logging UI, and doesn't add any value
// to search capabilities. So, remove it.
delete(tags, viewer.LogDogViewerURLTag)
switch val, ok := tags["luci.CloudLogExportID"]; {
case !ok, len(val) == 0: // skip
// len(LogID) must be < 512, and allows ./_- and alphanumerics.
// If CloudLogExportID is too long or contains unsupported chars, fall back to
// the default LogID.
case len(val) > 511:
logging.Errorf(sa.ctx, "CloudLogExportID: too long - %d", len(val))
case !logIDRe.MatchString(val):
logging.Errorf(sa.ctx, "CloudLogExportID(%s): does not match %s", val, logIDRe)
default:
logID = val
}
m.CloudLogger = sa.clclient.Logger(
logID,
cl.CommonLabels(tags),
cl.CommonResource(&mrpb.MonitoredResource{
Type: "generic_task",
Labels: map[string]string{
"project_id": sa.project,
"location": sa.desc.GetName(),
"namespace": sa.desc.GetPrefix(),
"job": "cloud-logging-export",
},
}),
cl.BufferedByteLimit(sa.CloudLoggingBufferLimit*1024*1024),
)
}
if err = archive.Archive(m); err != nil {
logging.WithError(err).Errorf(sa.ctx, "Failed to archive log stream.")
return err
}
if ss.logEntryCount == 0 {
// If our last log index was <0, then no logs were archived.
logging.Warningf(sa.ctx, "No log entries were archived.")
}
// Update our state with archival results.
sa.terminalIndex = ss.lastIndex
sa.logEntryCount = ss.logEntryCount
sa.stream.bytesWritten = streamWriter.Count()
sa.index.bytesWritten = indexWriter.Count()
return nil
}
type stagingPaths struct {
staged gs.Path
final gs.Path
bytesWritten int64
}
func (d *stagingPaths) clearStaged() { d.staged = "" }
func (d *stagingPaths) enabled() bool { return d.final != "" }
func (d *stagingPaths) addMetrics(ctx context.Context, projectField, archiveField, streamField string) {
tsSize.Add(ctx, float64(d.bytesWritten), projectField, archiveField, streamField)
tsTotalBytes.Add(ctx, d.bytesWritten, projectField, archiveField, streamField)
}
func (sa *stagedArchival) finalize(ar *logdog.ArchiveStreamRequest) error {
err := parallel.FanOutIn(func(taskC chan<- func() error) {
for _, d := range sa.getStagingPaths() {
d := d
// Don't finalize zero-sized streams.
if !d.enabled() || d.bytesWritten == 0 {
continue
}
taskC <- func() error {
if err := sa.gsclient.Rename(d.staged, d.final); err != nil {
logging.Fields{
logging.ErrorKey: err,
"stagedPath": d.staged,
"finalPath": d.final,
}.Errorf(sa.ctx, "Failed to rename GS object.")
return err
}
// Clear the staged value to indicate that it no longer exists.
d.clearStaged()
return nil
}
}
})
if err != nil {
return err
}
ar.TerminalIndex = int64(sa.terminalIndex)
ar.LogEntryCount = sa.logEntryCount
ar.StreamUrl = string(sa.stream.final)
ar.StreamSize = sa.stream.bytesWritten
ar.IndexUrl = string(sa.index.final)
ar.IndexSize = sa.index.bytesWritten
return nil
}
func (sa *stagedArchival) Close() error {
var clErr error
if sa.clclient != nil {
clErr = errors.Annotate(sa.clclient.Close(),
"while closing CloudLogging client for (%s/%s/+/%s)",
sa.project, sa.desc.GetPrefix(), sa.desc.GetName()).Err()
}
return errors.Flatten(errors.MultiError{sa.gsclient.Close(), clErr})
}
func (sa *stagedArchival) cleanup() {
for _, d := range sa.getStagingPaths() {
if d.staged == "" {
continue
}
logging.Warningf(sa.ctx, "Cleaning up staged path %s", d.staged)
if err := sa.gsclient.Delete(d.staged); err != nil {
logging.Fields{
logging.ErrorKey: err,
"path": d.staged,
}.Warningf(sa.ctx, "Failed to clean up staged path.")
}
d.clearStaged()
}
}
func (sa *stagedArchival) getStagingPaths() []*stagingPaths {
return []*stagingPaths{
&sa.stream,
&sa.index,
}
}
// statusErrorWrapper is an error wrapper. It is detected by IsFailure and used to
// determine whether the supplied error represents a failure or just a status
// error.
type statusErrorWrapper struct {
inner error
}
var _ interface {
error
errors.Wrapped
} = (*statusErrorWrapper)(nil)
func (e *statusErrorWrapper) Error() string {
if e.inner != nil {
return e.inner.Error()
}
return ""
}
func (e *statusErrorWrapper) Unwrap() error {
return e.inner
}
func isFailure(err error) bool {
if err == nil {
return false
}
_, ok := err.(*statusErrorWrapper)
return !ok
}