blob: 2c29eb79d411bbbd62f5507406883b933bbe3624 [file]
// Copyright 2016 The LUCI Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package archivist
import (
"bytes"
"encoding/hex"
"fmt"
"io"
"time"
"github.com/golang/protobuf/proto"
"golang.org/x/net/context"
"github.com/luci/luci-go/common/clock"
"github.com/luci/luci-go/common/errors"
"github.com/luci/luci-go/common/gcloud/gs"
log "github.com/luci/luci-go/common/logging"
"github.com/luci/luci-go/common/proto/google"
"github.com/luci/luci-go/common/retry/transient"
"github.com/luci/luci-go/common/sync/parallel"
"github.com/luci/luci-go/common/tsmon/distribution"
"github.com/luci/luci-go/common/tsmon/field"
"github.com/luci/luci-go/common/tsmon/metric"
tsmon_types "github.com/luci/luci-go/common/tsmon/types"
"github.com/luci/luci-go/logdog/api/endpoints/coordinator/services/v1"
"github.com/luci/luci-go/logdog/api/logpb"
"github.com/luci/luci-go/logdog/common/archive"
"github.com/luci/luci-go/logdog/common/storage"
"github.com/luci/luci-go/logdog/common/types"
"github.com/luci/luci-go/luci_config/common/cfgtypes"
)
const (
// tsEntriesField, tsIndexField, and tsDataField are the values supplied for
// the "archive" metric field when recording the log entries, index, and data
// archive files respectively.
tsEntriesField = "entries"
tsIndexField = "index"
tsDataField = "data"
// dispatchThreshold is the minimum absolute distance required between a
// task's dispatch time and the current time. If the archive dispatch is
// within this range of the current time, archival is not performed and the
// task is returned to the queue.
dispatchThreshold = 5 * time.Minute
)
// Monitoring metrics emitted by the Archivist.
var (
// tsCount counts the raw number of archival tasks that this instance has
// processed, regardless of success/failure.
//
// The "successful" field is false when the task ended with an error that
// isFailure classifies as a failure, and true otherwise (including status
// errors).
tsCount = metric.NewCounter("logdog/archivist/archive/count",
"The number of archival tasks processed.",
nil,
field.Bool("successful"))
// tsSize tracks the archive binary file size distribution of completed
// archives.
//
// The "archive" field is the specific type of archive (entries, index, data)
// that is being tracked.
//
// The "stream" field is the type of log stream that is being archived.
tsSize = metric.NewCumulativeDistribution("logdog/archivist/archive/size",
"The size (in bytes) of each archive file.",
&tsmon_types.MetricMetadata{Units: tsmon_types.Bytes},
distribution.DefaultBucketer,
field.String("archive"),
field.String("stream"))
// tsTotalBytes tracks the cumulative total number of bytes that have
// been archived by this instance.
//
// The "archive" field is the specific type of archive (entries, index, data)
// that is being tracked.
//
// The "stream" field is the type of log stream that is being archived.
tsTotalBytes = metric.NewCounter("logdog/archivist/archive/total_bytes",
"The total number of archived bytes.",
&tsmon_types.MetricMetadata{Units: tsmon_types.Bytes},
field.String("archive"),
field.String("stream"))
// tsLogEntries tracks the number of log entries per individual
// archival.
//
// The "stream" field is the type of log stream that is being archived.
tsLogEntries = metric.NewCumulativeDistribution("logdog/archivist/archive/log_entries",
"The total number of log entries per archive.",
nil,
distribution.DefaultBucketer,
field.String("stream"))
// tsTotalLogEntries tracks the total number of log entries that have
// been archived by this instance.
//
// The "stream" field is the type of log stream that is being archived.
tsTotalLogEntries = metric.NewCounter("logdog/archivist/archive/total_log_entries",
"The total number of log entries.",
nil,
field.String("stream"))
)
// Task is a single archive task.
//
// It is the unit of work that is handed to Archivist.ArchiveTask.
type Task interface {
// UniqueID returns a task-unique value. Other tasks, and other retries of
// this task, should (try to) not reuse this ID.
//
// The Archivist incorporates this value into its staging paths.
UniqueID() string
// Task returns the archive task to execute.
Task() *logdog.ArchiveTask
// Consume marks that this task's processing is complete and that it should be
// consumed. This may be called multiple times with no additional effect.
Consume()
// AssertLease asserts that the lease for this Task is still held.
//
// On failure, it will return an error. If successful, the Archivist may
// assume that it holds the lease longer.
AssertLease(context.Context) error
}
// Settings defines the archival parameters for a specific archival operation.
//
// In practice, this will be formed from service and project settings.
type Settings struct {
// GSBase is the base Google Storage path. This includes the bucket name
// and any associated path.
//
// This must be unique to this archival project. In practice, it will be
// composed of the project's archival bucket and project ID.
//
// Settings whose GSBase has an empty bucket are rejected when loaded.
GSBase gs.Path
// GSStagingBase is the base Google Storage path for archive staging. This
// includes the bucket name and any associated path.
//
// This must be unique to this archival project. In practice, it will be
// composed of the project's staging archival bucket and project ID.
//
// Settings whose GSStagingBase has an empty bucket are rejected when loaded.
GSStagingBase gs.Path
// AlwaysRender, if true, means that a binary should be archived
// regardless of whether a specific binary file extension has been supplied
// with the log stream. When no extension was supplied, "bin" is used.
AlwaysRender bool
// IndexStreamRange is the maximum number of stream indexes in between index
// entries. See archive.Manifest for more information.
IndexStreamRange int
// IndexPrefixRange is the maximum number of prefix indexes in between index
// entries. See archive.Manifest for more information.
IndexPrefixRange int
// IndexByteRange is the maximum number of stream data bytes in between index
// entries. See archive.Manifest for more information.
IndexByteRange int
}
// SettingsLoader returns archival Settings for a given project.
//
// The returned Settings are validated by the Archivist before use; a non-nil
// error aborts the archival attempt.
type SettingsLoader func(context.Context, cfgtypes.ProjectName) (*Settings, error)
// Archivist is a stateless configuration capable of archiving individual log
// streams.
type Archivist struct {
// Service is the client to use to communicate with Coordinator's Services
// endpoint. It is used to load log stream state and to report archival
// results.
Service logdog.ServicesClient
// SettingsLoader loads archival settings for a specific project. It must be
// set; loading settings without one panics.
SettingsLoader SettingsLoader
// Storage is the archival source Storage instance.
Storage storage.Storage
// GSClient is the Google Storage client to use for archive generation.
GSClient gs.Client
}
// storageBufferSize is the size, in bytes, of the LogEntry buffer that is used
// during archival. This should be greater than the maximum LogEntry size.
//
// NOTE(review): this constant is not referenced in the visible portion of this
// file; presumably it is consumed by the storage source — confirm.
const storageBufferSize = types.MaxLogEntryDataSize * 64
// ArchiveTask processes and executes a single log stream archive task.
//
// The task's project and ID are attached to the logging Context, the archival
// is executed, and the outcome is logged and recorded in the tsCount metric.
// An outcome that isFailure does not classify as a failure (no error, or a
// status error) is counted as successful.
//
// During processing, the Task's Consume method may be called to indicate that
// it should be consumed.
//
// ArchiveTask has no return value. If the supplied Context is Done, operation
// may terminate before completion; the resulting error is only logged.
func (a *Archivist) ArchiveTask(c context.Context, task Task) {
	// Tag every log line emitted for this task with its identity.
	identity := log.Fields{
		"project": task.Task().Project,
		"id":      task.Task().Id,
	}
	c = log.SetFields(c, identity)
	log.Debugf(c, "Received archival task.")

	result := a.archiveTaskImpl(c, task)
	failed := isFailure(result)

	summary := log.Fields{
		log.ErrorKey: result,
		"failure":    failed,
	}
	summary.Infof(c, "Finished archive task.")

	// Add a result metric.
	tsCount.Add(c, 1, !failed)
}
// archiveTaskImpl performs the actual task archival.
//
// It validates the task, loads the log stream's state from the Coordinator,
// stages the archive files, finalizes them, and reports the result back to the
// Coordinator. The task is consumed when it has been fully handled or is
// determined to be superfluous or invalid; otherwise it is left unconsumed so
// that it is returned to the queue.
//
// Its error return value is used to indicate how the archive failed. isFailure
// will be called to determine if the returned error value is a failure or a
// status error.
func (a *Archivist) archiveTaskImpl(c context.Context, task Task) error {
at := task.Task()
// Validate the project name. A task with an invalid project name is consumed,
// and the validation error is returned as a failure.
if err := cfgtypes.ProjectName(at.Project).Validate(); err != nil {
task.Consume()
return fmt.Errorf("invalid project name %q: %s", at.Project, err)
}
// Get the local time. If we are within the dispatchThreshold, retry this
// archival later.
if ad := google.TimeFromProto(at.DispatchedAt); !ad.IsZero() {
now := clock.Now(c)
// Use the absolute difference so that a dispatch time slightly ahead of the
// local clock is treated the same as one slightly behind it.
delta := now.Sub(ad)
if delta < 0 {
delta = -delta
}
if delta < dispatchThreshold {
log.Fields{
"localTime": now.Local(),
"dispatchTime": ad.Local(),
"delta": delta,
"threshold": dispatchThreshold,
}.Infof(c, "Log stream is within dispatch threshold. Returning task to queue.")
return statusErr(errors.New("log stream is within dispatch threshold"))
}
}
// Load the log stream's current state, including its descriptor. If it is
// already archived or purged, the task is consumed and a status error is
// returned.
ls, err := a.Service.LoadStream(c, &logdog.LoadStreamRequest{
Project: at.Project,
Id: at.Id,
Desc: true,
})
switch {
case err != nil:
log.WithError(err).Errorf(c, "Failed to load log stream.")
return err
case ls.State == nil:
log.Errorf(c, "Log stream did not include state.")
return errors.New("log stream did not include state")
case ls.State.Purged:
log.Warningf(c, "Log stream is purged. Discarding archival request.")
task.Consume()
return statusErr(errors.New("log stream is purged"))
case ls.State.Archived:
log.Infof(c, "Log stream is already archived. Discarding archival request.")
task.Consume()
return statusErr(errors.New("log stream is archived"))
case !bytes.Equal(ls.ArchivalKey, at.Key):
if len(ls.ArchivalKey) == 0 {
// The log stream is not registering as "archive pending" state.
//
// This can happen if the eventually-consistent datastore hasn't updated
// its log stream state by the time this Pub/Sub task is received. In
// this case, we will continue retrying the task until datastore registers
// that some key is associated with it.
log.Infof(c, "Archival request received before log stream has its key.")
return statusErr(errors.New("premature archival request"))
}
// This can happen if a Pub/Sub message is dispatched during a transaction,
// but that specific transaction failed. In this case, the Pub/Sub message
// will have a key that doesn't match the key that was transactionally
// encoded, and can be discarded.
log.Fields{
"logStreamArchivalKey": hex.EncodeToString(ls.ArchivalKey),
"requestArchivalKey": hex.EncodeToString(at.Key),
}.Infof(c, "Superfluous archival request (keys do not match). Discarding.")
task.Consume()
return statusErr(errors.New("superfluous archival request"))
case ls.State.ProtoVersion != logpb.Version:
log.Fields{
"protoVersion": ls.State.ProtoVersion,
"expectedVersion": logpb.Version,
}.Errorf(c, "Unsupported log stream protobuf version.")
return errors.New("unsupported log stream protobuf version")
case ls.Desc == nil:
log.Errorf(c, "Log stream did not include a descriptor.")
return errors.New("log stream did not include a descriptor")
}
// If the log stream is younger than the task's settle delay, kick the task
// back to retry later.
age := google.DurationFromProto(ls.Age)
if settle := google.DurationFromProto(at.SettleDelay); age < settle {
log.Fields{
"age": age,
"settleDelay": settle,
}.Infof(c, "Log stream is younger than the settle delay. Returning task to queue.")
return statusErr(errors.New("log stream is within settle delay"))
}
// Load archival settings for this project.
settings, err := a.loadSettings(c, cfgtypes.ProjectName(at.Project))
if err != nil {
log.Fields{
log.ErrorKey: err,
"project": at.Project,
}.Errorf(c, "Failed to load settings for project.")
return err
}
// ar is the archival result that will be reported to the Coordinator. It is
// filled in either with an error message or by finalize.
ar := logdog.ArchiveStreamRequest{
Project: at.Project,
Id: at.Id,
}
// Build our staged archival plan. This doesn't actually do any archiving.
staged, err := a.makeStagedArchival(c, cfgtypes.ProjectName(at.Project), settings, ls, task.UniqueID())
if err != nil {
log.WithError(err).Errorf(c, "Failed to create staged archival plan.")
return err
}
// Are we required to archive a complete log stream?
if age <= google.DurationFromProto(at.CompletePeriod) {
// If we're requiring completeness, perform a keys-only scan of intermediate
// storage to ensure that we have all of the records before we bother
// streaming to storage only to find that we are missing data.
if err := staged.checkComplete(c); err != nil {
return err
}
}
// Archive to staging.
//
// If a non-transient failure occurs here, we will report it to the
// Coordinator under the assumption that it will continue occurring.
//
// Errors creating the plan were already handled above; the switch statement
// below handles the outcome of executing the plan.
switch err = staged.stage(c); {
case transient.Tag.In(err):
// If this is a transient error, exit immediately and do not delete the
// archival task.
log.WithError(err).Warningf(c, "TRANSIENT error during archival operation.")
return err
case err != nil:
// This is a non-transient error, so we are confident that any future
// Archival will also encounter this error. We will mark this archival
// as an error and report it to the Coordinator.
log.WithError(err).Errorf(c, "Archival failed with non-transient error.")
ar.Error = err.Error()
if ar.Error == "" {
// This needs to be non-empty, so if our actual error has an empty string,
// fill in a generic message.
ar.Error = "archival error"
}
default:
// In case something fails, clean up our staged archival (best effort).
// This runs when archiveTaskImpl returns, so it also removes any staged
// objects that finalize did not rename.
defer staged.cleanup(c)
// Finalize the archival. First, extend our lease to confirm that we still
// hold it.
if err := task.AssertLease(c); err != nil {
log.WithError(err).Errorf(c, "Failed to extend task lease before finalizing.")
return err
}
// Finalize the archival.
if err := staged.finalize(c, a.GSClient, &ar); err != nil {
log.WithError(err).Errorf(c, "Failed to finalize archival.")
return err
}
// Add metrics for this successful archival.
streamType := staged.desc.StreamType.String()
staged.stream.addMetrics(c, tsEntriesField, streamType)
staged.index.addMetrics(c, tsIndexField, streamType)
staged.data.addMetrics(c, tsDataField, streamType)
tsLogEntries.Add(c, float64(staged.logEntryCount), streamType)
tsTotalLogEntries.Add(c, staged.logEntryCount, streamType)
}
// Both the non-transient error case and the success case reach this point
// and report their result to the Coordinator.
log.Fields{
"streamURL": ar.StreamUrl,
"indexURL": ar.IndexUrl,
"dataURL": ar.DataUrl,
"terminalIndex": ar.TerminalIndex,
"logEntryCount": ar.LogEntryCount,
"hadError": ar.Error,
"complete": ar.Complete(),
}.Debugf(c, "Finished archival round. Reporting archive state.")
// Extend the lease again to confirm that we still hold it.
if err := task.AssertLease(c); err != nil {
log.WithError(err).Errorf(c, "Failed to extend task lease before reporting.")
return err
}
if _, err := a.Service.ArchiveStream(c, &ar); err != nil {
log.WithError(err).Errorf(c, "Failed to report archive state.")
return err
}
// Archival is complete and acknowledged by Coordinator. Consume the archival
// task.
task.Consume()
return nil
}
// loadSettings loads and validates archival settings for project.
//
// It panics if no SettingsLoader is configured, since that is a programming
// error. It returns an error if the loader fails, if the loader returns no
// settings, or if either Google Storage base path has an empty bucket.
func (a *Archivist) loadSettings(c context.Context, project cfgtypes.ProjectName) (*Settings, error) {
	if a.SettingsLoader == nil {
		panic("no settings loader configured")
	}

	st, err := a.SettingsLoader(c, project)
	switch {
	case err != nil:
		return nil, err

	case st == nil:
		// A loader that returns (nil, nil) would otherwise cause a nil pointer
		// dereference in the validation below.
		log.Errorf(c, "Settings loader returned no settings.")
		return nil, errors.New("settings loader returned no settings")

	case st.GSBase.Bucket() == "":
		// The loader's error is nil in this branch, so it is not logged.
		log.Fields{
			"gsBase": st.GSBase,
		}.Errorf(c, "Invalid storage base.")
		return nil, errors.New("invalid storage base")

	case st.GSStagingBase.Bucket() == "":
		log.Fields{
			"gsStagingBase": st.GSStagingBase,
		}.Errorf(c, "Invalid storage staging base.")
		return nil, errors.New("invalid storage staging base")

	default:
		return st, nil
	}
}
// makeStagedArchival builds the archival plan for the log stream described by
// ls, without performing any archiving.
//
// The stream descriptor in ls is unmarshalled and used to derive the stream
// path. Staging paths are then constructed for the log entries and index
// files, and, when the descriptor names a binary file extension or the
// settings request AlwaysRender, for the binary data file as well. uid is
// incorporated into every staging path.
//
// An error is returned if the descriptor cannot be unmarshalled.
func (a *Archivist) makeStagedArchival(c context.Context, project cfgtypes.ProjectName,
	st *Settings, ls *logdog.LoadStreamResponse, uid string) (*stagedArchival, error) {

	plan := &stagedArchival{
		Archivist:     a,
		Settings:      st,
		project:       project,
		terminalIndex: types.MessageIndex(ls.State.TerminalIndex),
	}

	// Deserialize and validate the descriptor protobuf. If this fails, it is a
	// non-transient error.
	unmarshalErr := proto.Unmarshal(ls.Desc, &plan.desc)
	if unmarshalErr != nil {
		log.Fields{
			log.ErrorKey:   unmarshalErr,
			"protoVersion": ls.State.ProtoVersion,
		}.Errorf(c, "Failed to unmarshal descriptor protobuf.")
		return nil, unmarshalErr
	}
	plan.path = plan.desc.Path()

	// The log entries and index files are always produced.
	plan.stream = plan.makeStagingPaths("logstream.entries", uid)
	plan.index = plan.makeStagingPaths("logstream.index", uid)

	// The binary data file is optional. Fall back to a default extension when
	// rendering is forced but the stream did not supply one.
	ext := plan.desc.BinaryFileExt
	if ext == "" && plan.AlwaysRender {
		ext = "bin"
	}
	if ext != "" {
		plan.data = plan.makeStagingPaths("data."+ext, uid)
	}
	return plan, nil
}
// stagedArchival is the working state for archiving a single log stream: the
// stream's identity and descriptor, the staging and final paths of each archive
// file, and the results recorded once staging has run.
//
// Archivist and Settings are embedded so that their fields (e.g., Storage,
// GSClient, GSBase) can be accessed directly.
type stagedArchival struct {
*Archivist
*Settings
// project is the project that the log stream belongs to.
project cfgtypes.ProjectName
// path is the log stream's path, derived from desc.
path types.StreamPath
// desc is the log stream's descriptor, unmarshalled from the loaded stream.
desc logpb.LogStreamDescriptor
// stream, index, and data hold the paths and written sizes of the log
// entries, index, and binary data files respectively. data is left as its
// zero value when no binary data file is to be emitted.
stream stagingPaths
index stagingPaths
data stagingPaths
// NOTE(review): finalized is not read or written anywhere in the visible
// code — confirm whether it is still needed.
finalized bool
// terminalIndex is initialized from the loaded stream state's terminal index.
// After a successful stage it holds the storage source's last index.
terminalIndex types.MessageIndex
// logEntryCount is the number of log entries reported by the storage source
// after a successful stage.
logEntryCount int64
}
// makeStagingPaths returns the stagingPaths for the archive file called name
// within this log stream.
//
// The staged path is rooted at GSStagingBase and includes uid, which
// differentiates it from other staging paths for the same stream and name. The
// final path is rooted at GSBase and does not include uid.
func (sa *stagedArchival) makeStagingPaths(name, uid string) stagingPaths {
	// Either base path may be shared between projects. To enforce an absence of
	// conflicts, the project name is inserted as part of each path.
	var (
		project = string(sa.project)
		stream  = string(sa.path)
	)

	var sp stagingPaths
	sp.staged = sa.GSStagingBase.Concat(project, stream, uid, name)
	sp.final = sa.GSBase.Concat(project, stream, name)
	return sp
}
// checkComplete performs a quick keys-only scan of intermediate storage to
// ensure that all of the log stream's records are available.
//
// The stream is complete when storage yields every index from 0 up to and
// including the terminal index, in order and without gaps.
//
// It returns a status error if the stream has no terminal index, and a regular
// error if an entry's index cannot be read, if an index is missing (including
// when storage runs out of entries before the terminal index is reached), or if
// the storage scan itself fails.
func (sa *stagedArchival) checkComplete(c context.Context) error {
	if sa.terminalIndex < 0 {
		log.Warningf(c, "Cannot archive complete stream with no terminal index.")
		return statusErr(errors.New("completeness required, but stream has no terminal index"))
	}

	sreq := storage.GetRequest{
		Project:  sa.project,
		Path:     sa.path,
		KeysOnly: true,
	}

	var (
		// nextIndex is the index that the next storage entry is expected to have.
		nextIndex types.MessageIndex
		// reachedTerminal records whether the terminal index was observed.
		reachedTerminal bool
		// ierr carries an error out of the storage callback.
		ierr error
	)
	err := sa.Storage.Get(sreq, func(e *storage.Entry) bool {
		idx, err := e.GetStreamIndex()
		if err != nil {
			ierr = errors.Annotate(err, "could not get stream index").Err()
			return false
		}

		switch {
		case idx != nextIndex:
			ierr = fmt.Errorf("missing log entry index %d (next %d)", nextIndex, idx)
			return false

		case idx == sa.terminalIndex:
			// We have hit our terminal index, so all of the log data is here!
			reachedTerminal = true
			return false

		default:
			nextIndex++
			return true
		}
	})
	switch {
	case ierr != nil:
		return ierr

	case err != nil:
		return err

	case !reachedTerminal:
		// Storage ran out of entries before the terminal index was seen, so the
		// tail of the stream is missing even though the prefix had no gaps.
		return fmt.Errorf("missing log entry index %d (terminal index %d)", nextIndex, sa.terminalIndex)
	}
	return nil
}
// stage executes the archival process, archiving to the staged storage paths.
//
// It opens a writer for each enabled staging path, streams the log entries
// from intermediate storage through archive.Archive, and on success records
// the last index, log entry count, and bytes written on sa.
//
// If stage fails, it may return a transient error. In particular, a failure
// to close any writer is reported as a transient error when no other error
// occurred.
func (sa *stagedArchival) stage(c context.Context) (err error) {
log.Fields{
"streamURL": sa.stream.staged,
"indexURL": sa.index.staged,
"dataURL": sa.data.staged,
}.Debugf(c, "Staging log stream...")
// Group any transient errors that occur during cleanup. If we aren't
// returning a non-transient error, return a transient "terr".
//
// This is the first deferred function, so it runs last, after every writer
// has been closed.
var terr errors.MultiError
defer func() {
if err == nil && len(terr) > 0 {
err = transient.Tag.Apply(terr)
}
}()
// Close our writers on exit. If any of them fail to close, mark the archival
// as a transient failure.
closeWriter := func(closer io.Closer, path gs.Path) {
// Close the Writer. If this results in an error, append it to our transient
// error MultiError.
if ierr := closer.Close(); ierr != nil {
terr = append(terr, ierr)
}
// If we have an archival error, also delete the path associated with this
// stream. This is a non-fatal failure, since we've already hit a fatal
// one.
//
// err is the named result, so this observes the value being returned.
if err != nil || len(terr) > 0 {
if ierr := sa.GSClient.Delete(path); ierr != nil {
log.Fields{
log.ErrorKey: ierr,
"path": path,
}.Warningf(c, "Failed to delete stream on error.")
}
}
}
// createWriter is a shorthand function for creating a writer to a path and
// reporting an error if it failed.
createWriter := func(p gs.Path) (gs.Writer, error) {
w, ierr := sa.GSClient.NewWriter(p)
if ierr != nil {
log.Fields{
log.ErrorKey: ierr,
"path": p,
}.Errorf(c, "Failed to create writer.")
return nil, ierr
}
return w, nil
}
// Each writer's close is deferred as soon as the writer exists, so writers
// are closed in the reverse of the order in which they were created.
var streamWriter, indexWriter, dataWriter gs.Writer
if streamWriter, err = createWriter(sa.stream.staged); err != nil {
return
}
defer closeWriter(streamWriter, sa.stream.staged)
if indexWriter, err = createWriter(sa.index.staged); err != nil {
return err
}
defer closeWriter(indexWriter, sa.index.staged)
if sa.data.enabled() {
// Only emit a data stream if we are configured to do so.
if dataWriter, err = createWriter(sa.data.staged); err != nil {
return err
}
defer closeWriter(dataWriter, sa.data.staged)
}
// Read our log entries from intermediate storage.
ss := storageSource{
Context: c,
st: sa.Storage,
project: sa.project,
path: sa.path,
terminalIndex: sa.terminalIndex,
lastIndex: -1,
}
// dataWriter is left nil in the manifest when no data stream is enabled.
m := archive.Manifest{
Desc: &sa.desc,
Source: &ss,
LogWriter: streamWriter,
IndexWriter: indexWriter,
DataWriter: dataWriter,
StreamIndexRange: sa.IndexStreamRange,
PrefixIndexRange: sa.IndexPrefixRange,
ByteRange: sa.IndexByteRange,
Logger: log.Get(c),
}
if err = archive.Archive(m); err != nil {
log.WithError(err).Errorf(c, "Failed to archive log stream.")
return
}
switch {
case ss.logEntryCount == 0:
// The source reported no log entries, so no logs were archived.
log.Warningf(c, "No log entries were archived.")
default:
// Log the last archived index and the number of entries archived.
log.Fields{
"terminalIndex": ss.lastIndex,
"logEntryCount": ss.logEntryCount,
}.Debugf(c, "Finished archiving log stream.")
}
// Update our state with archival results. The terminal index is replaced
// with the source's last index.
sa.terminalIndex = ss.lastIndex
sa.logEntryCount = ss.logEntryCount
sa.stream.bytesWritten = streamWriter.Count()
sa.index.bytesWritten = indexWriter.Count()
if dataWriter != nil {
sa.data.bytesWritten = dataWriter.Count()
}
return
}
// stagingPaths tracks a single archive file: where it is staged, where it is
// finalized to, and how many bytes were written to it.
type stagingPaths struct {
// staged is the path that the file is written to during staging. It is
// cleared once the staged object has been renamed or deleted.
staged gs.Path
// final is the path that the staged file is renamed to on finalization. An
// empty final path means that this file is not enabled.
final gs.Path
// bytesWritten is the writer's byte count, recorded after a successful stage.
bytesWritten int64
}
// clearStaged empties the staged path, marking that the staged object no longer
// exists.
func (d *stagingPaths) clearStaged() {
	d.staged = ""
}
// enabled reports whether this file has a final path, i.e., whether it is to
// be produced at all.
func (d *stagingPaths) enabled() bool {
	return len(d.final) > 0
}
// addMetrics records this file's written byte count in the tsSize distribution
// and the tsTotalBytes counter, using archiveField and streamField as the
// "archive" and "stream" metric fields.
func (d *stagingPaths) addMetrics(c context.Context, archiveField, streamField string) {
	written := d.bytesWritten
	tsSize.Add(c, float64(written), archiveField, streamField)
	tsTotalBytes.Add(c, written, archiveField, streamField)
}
// finalize moves every enabled, non-empty staged file to its final path using
// client, then fills ar with the archival results.
//
// The renames are dispatched through parallel.FanOutIn. A file whose rename
// succeeds has its staged path cleared. If any rename fails, the error is
// returned and ar is left unmodified.
func (sa *stagedArchival) finalize(c context.Context, client gs.Client, ar *logdog.ArchiveStreamRequest) error {
	// promote renames a single staged object to its final location.
	promote := func(sp *stagingPaths) error {
		renameErr := client.Rename(sp.staged, sp.final)
		if renameErr == nil {
			// Clear the staged value to indicate that it no longer exists.
			sp.clearStaged()
			return nil
		}

		log.Fields{
			log.ErrorKey: renameErr,
			"stagedPath": sp.staged,
			"finalPath":  sp.final,
		}.Errorf(c, "Failed to rename GS object.")
		return renameErr
	}

	// dispatch queues one rename per file that needs finalizing.
	dispatch := func(taskC chan<- func() error) {
		for _, sp := range sa.getStagingPaths() {
			// Don't finalize disabled or zero-sized streams.
			if sp.enabled() && sp.bytesWritten != 0 {
				sp := sp
				taskC <- func() error { return promote(sp) }
			}
		}
	}
	if err := parallel.FanOutIn(dispatch); err != nil {
		return err
	}

	// Every rename succeeded; describe the archive in the request.
	ar.TerminalIndex = int64(sa.terminalIndex)
	ar.LogEntryCount = sa.logEntryCount

	ar.StreamUrl = string(sa.stream.final)
	ar.StreamSize = sa.stream.bytesWritten

	ar.IndexUrl = string(sa.index.final)
	ar.IndexSize = sa.index.bytesWritten

	if sa.data.enabled() {
		ar.DataUrl = string(sa.data.final)
		ar.DataSize = sa.data.bytesWritten
	}
	return nil
}
// cleanup deletes every staged object that still has a staged path, then
// clears that path.
//
// Deletion is best-effort: a failure is logged as a warning and the staged
// path is cleared regardless.
func (sa *stagedArchival) cleanup(c context.Context) {
	for _, sp := range sa.getStagingPaths() {
		if sp.staged != "" {
			if delErr := sa.GSClient.Delete(sp.staged); delErr != nil {
				log.Fields{
					log.ErrorKey: delErr,
					"path":       sp.staged,
				}.Warningf(c, "Failed to clean up staged path.")
			}
			sp.clearStaged()
		}
	}
}
// getStagingPaths returns pointers to the stream, index, and data staging
// paths, in that order, so that callers can modify them in place.
func (sa *stagedArchival) getStagingPaths() []*stagingPaths {
	paths := make([]*stagingPaths, 0, 3)
	paths = append(paths, &sa.stream, &sa.index, &sa.data)
	return paths
}
// statusErrorWrapper is an error wrapper. It is detected by isFailure and used to
// determine whether the supplied error represents a failure or just a status
// error.
type statusErrorWrapper struct {
// inner is the wrapped error. It may be nil.
inner error
}
// Compile-time assertion that *statusErrorWrapper implements both error and
// errors.Wrapped.
var _ interface {
error
errors.Wrapped
} = (*statusErrorWrapper)(nil)
// statusErr wraps inner so that isFailure classifies it as a status error
// rather than a failure.
func statusErr(inner error) error {
	return &statusErrorWrapper{inner: inner}
}
// Error implements error. It returns the wrapped error's message, or the empty
// string if no error is wrapped.
func (e *statusErrorWrapper) Error() string {
	if e.inner == nil {
		return ""
	}
	return e.inner.Error()
}
// InnerError implements errors.Wrapped. It returns the wrapped error, which
// may be nil.
func (e *statusErrorWrapper) InnerError() error {
	wrapped := e.inner
	return wrapped
}
// isFailure reports whether err represents a genuine failure.
//
// A nil error and an error whose outermost type is *statusErrorWrapper are not
// failures; every other error is.
func isFailure(err error) bool {
	switch err.(type) {
	case nil:
		return false
	case *statusErrorWrapper:
		return false
	default:
		return true
	}
}