blob: 25407245f220e42fd64e8e625a26b35f8c522434 [file] [log] [blame]
// Copyright 2015 The LUCI Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

// Package archive constructs a LogDog archive out of log stream components.
// Records are read from the stream and emitted as an archive.
package archive
import (
"crypto/sha256"
"encoding/hex"
"io"
"reflect"
cl "cloud.google.com/go/logging"
"github.com/golang/protobuf/proto"
"go.chromium.org/luci/common/data/recordio"
"go.chromium.org/luci/common/logging"
"go.chromium.org/luci/common/sync/parallel"
"go.chromium.org/luci/logdog/api/logpb"
"go.chromium.org/luci/logdog/common/renderer"
)
// CloudLogging entry has a limit of 256KB in the internal byte representation.
// If an entry is larger, CloudLogging will reject the entry with an error.
//
// To minimize the chance of a LogEntry exceeding the limit, Archive applies
// the following limits to the entry before exporting logpb.LogEntry
// to CloudLogging.
const (
	// maxPayload is the maximum size for the payload of a CloudLogging entry.
	//
	// If a single line exceeds the limit in size, the line likely contains
	// a dump of a serialized object, which wouldn't be useful in searches, and
	// the line will get truncated when being exported to CloudLogging.
	maxPayload = 128 * 1024

	// maxTagSum is the maximum size sum of tag keys and values that can be
	// attached to a CloudLogging Entry. If the sum exceeds the limit,
	// the stream won't be exported to CloudLogging.
	maxTagSum = 96 * 1024
)
// CLLogger is a general interface for a CloudLogging logger, intended to
// enable unit tests to stub out CloudLogging.
type CLLogger interface {
	// Log submits a single entry for export to Cloud Logging.
	Log(cl.Entry)
}
// Manifest is a set of archival parameters.
type Manifest struct {
	// LUCIProject is the LUCI project for the stream.
	LUCIProject string

	// Desc is the logpb.LogStreamDescriptor for the stream.
	Desc *logpb.LogStreamDescriptor

	// Source is the LogEntry Source for the stream.
	Source renderer.Source

	// LogWriter, if not nil, is the Writer to which the log stream record stream
	// will be written. If nil, Archive is a no-op.
	LogWriter io.Writer

	// IndexWriter, if not nil, is the Writer to which the log stream Index
	// protobuf stream will be written.
	IndexWriter io.Writer

	// StreamIndexRange, if >0, is the maximum number of log entry stream indices
	// in between successive index entries.
	//
	// If no index constraints are set, an index entry will be emitted for each
	// LogEntry.
	StreamIndexRange int

	// PrefixIndexRange, if >0, is the maximum number of log entry prefix indices
	// in between successive index entries.
	PrefixIndexRange int

	// ByteRange, if >0, is the maximum number of log entry bytes in between
	// successive index entries.
	ByteRange int

	// Logger, if not nil, will be used to log status during archival.
	Logger logging.Logger

	// CloudLogger, if not nil, will be used to export archived log entries to
	// Cloud Logging.
	CloudLogger CLLogger

	// sizeFunc is a size method override used for testing.
	sizeFunc func(proto.Message) int
}
// logger returns the Logger to use for archival status messages.
//
// It returns logging.Null when m.Logger is unset, including the case where
// the interface holds a typed-nil pointer.
func (m *Manifest) logger() logging.Logger {
	l := m.Logger
	if l == nil {
		return logging.Null
	}
	if v := reflect.ValueOf(l); v.Kind() == reflect.Ptr && v.IsNil() {
		return logging.Null
	}
	return l
}
// Archive performs the log archival described in the supplied Manifest.
//
// It writes the log stream (descriptor followed by each LogEntry) to
// m.LogWriter as a recordio stream, optionally builds and emits an index to
// m.IndexWriter, and optionally exports entries to Cloud Logging via
// m.CloudLogger. The first error encountered is returned.
func Archive(m Manifest) error {
	// Wrap our log source in a safeLogEntrySource to protect our index order.
	m.Source = &safeLogEntrySource{
		Manifest: &m,
		Source:   m.Source,
	}

	// If no constraints are applied, index every LogEntry.
	if m.StreamIndexRange <= 0 && m.PrefixIndexRange <= 0 && m.ByteRange <= 0 {
		m.StreamIndexRange = 1
	}

	// With no log writer there is nothing to archive; treat as a no-op.
	if m.LogWriter == nil {
		return nil
	}

	// If we're constructing an index, allocate a stateful index builder.
	var idx *indexBuilder
	if m.IndexWriter != nil {
		idx = &indexBuilder{
			Manifest: &m,
			index: logpb.LogIndex{
				Desc: m.Desc,
			},
			sizeFunc: m.sizeFunc,
		}
	}

	// Compute a hash to be used as the ID of the stream in Cloud Logging.
	sha := sha256.New()
	sha.Write([]byte(m.LUCIProject))
	sha.Write([]byte(m.Desc.Prefix))
	sha.Write([]byte(m.Desc.Name))
	streamIDHash := sha.Sum(nil)

	// Run the consumer and producer concurrently; FanOutIn collects the first
	// error from either task.
	return parallel.FanOutIn(func(taskC chan<- func() error) {
		logC := make(chan *logpb.LogEntry)

		// Consumer: write logs (and optionally the index) as they arrive.
		taskC <- func() error {
			if err := archiveLogs(m.LogWriter, m.Desc, logC, idx, m.CloudLogger, streamIDHash, m.logger()); err != nil {
				return err
			}

			// If we're building an index, emit it now that the log stream has
			// finished.
			if idx != nil {
				return idx.emit(m.IndexWriter)
			}
			return nil
		}

		// Producer: iterate through all of our Source's logs and process them.
		// As the sender, this task owns logC and closes it when the source is
		// exhausted, unblocking the consumer.
		taskC <- func() error {
			defer close(logC)
			for {
				le, err := m.Source.NextLogEntry()
				if le != nil {
					logC <- le
				}

				switch err {
				case nil:

				case io.EOF:
					return nil

				default:
					return err
				}
			}
		}
	})
}
// archiveLogs reads LogEntry records from logC and writes them to w as a
// recordio stream, preceded by the stream descriptor d.
//
// If idx is not nil, each entry's byte offset within the stream is recorded
// in the index. If cloudLogger is usable, entries are also exported to Cloud
// Logging, identified by streamIDHash.
//
// archiveLogs always drains logC, even after a write error, so that the
// producing goroutine never blocks on a send; the first write error is
// returned.
func archiveLogs(w io.Writer, d *logpb.LogStreamDescriptor, logC <-chan *logpb.LogEntry, idx *indexBuilder, cloudLogger CLLogger, streamIDHash []byte, logger logging.Logger) error {
	offset := int64(0)
	// out marshals pb and writes it to w as a recordio frame, advancing
	// offset by the number of bytes actually written.
	out := func(pb proto.Message) error {
		d, err := proto.Marshal(pb)
		if err != nil {
			return err
		}
		count, err := recordio.WriteFrame(w, d)
		offset += int64(count)
		return err
	}

	// Cloud Logging export is disabled when no logger was provided, including
	// the case of a typed-nil pointer stored in the interface.
	isCLDisabled := cloudLogger == nil ||
		(reflect.ValueOf(cloudLogger).Kind() == reflect.Ptr &&
			reflect.ValueOf(cloudLogger).IsNil())
	if !isCLDisabled {
		// Skip the export entirely if the stream's tags exceed the Cloud
		// Logging tag-size budget.
		tsum := 0
		for k, v := range d.GetTags() {
			tsum += len(k) + len(v)
			if tsum > maxTagSum {
				logger.Errorf("sum(tags) > %d; skipping the stream for CloudLogging export", maxTagSum)
				isCLDisabled = true
				break
			}
		}
	}

	// Start with our descriptor protobuf. Defer error handling until later, as
	// we are still responsible for draining "logC".
	err := out(d)

	eb := newEntryBuffer(maxPayload, hex.EncodeToString(streamIDHash), d)
	for le := range logC {
		if err != nil {
			// A prior write failed; keep draining logC without processing.
			continue
		}

		// Add this LogEntry to our index, noting the current offset.
		if idx != nil {
			idx.addLogEntry(le, offset)
		}

		err = out(le)

		// Skip CloudLogging export, if disabled.
		if isCLDisabled {
			continue
		}
		for _, entry := range eb.append(le) {
			cloudLogger.Log(*entry)
		}
	}

	// Export the last buffered entry.
	//
	// If there was an error, the buffered line can possibly contain
	// an incomplete line, which was going to be completed in the next LogEntry.
	//
	// If so, skip flushing out the buffered line to prevent the complete
	// version of the line from being deduped on the next retry of
	// the archival task.
	//
	// Also honor isCLDisabled here: previously a non-nil flushed entry would
	// have been passed to a nil or disabled cloudLogger.
	if err == nil && !isCLDisabled {
		if entry := eb.flush(); entry != nil {
			cloudLogger.Log(*entry)
		}
	}
	return err
}