blob: c6e9b51c9d9a112dcc4435bad1333b33065bb00c [file] [log] [blame]
// Copyright 2015 The LUCI Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package fetcher
import (
"context"
"errors"
"fmt"
"io"
"time"
"github.com/golang/protobuf/proto"
"go.chromium.org/luci/common/clock"
log "go.chromium.org/luci/common/logging"
"go.chromium.org/luci/logdog/api/logpb"
"go.chromium.org/luci/logdog/common/renderer"
"go.chromium.org/luci/logdog/common/types"
)
const (
	// DefaultDelay is the default Delay value, used when Options.Delay is not
	// positive.
	DefaultDelay = 5 * time.Second

	// DefaultBufferBytes is the default number of bytes to buffer, used when
	// neither Options.BufferBytes nor Options.BufferCount is positive.
	DefaultBufferBytes = int64(1024 * 1024) // 1MB
)
// LogRequest is a structure used by the Fetcher to request a range of logs
// from its Source.
type LogRequest struct {
	// Index is the starting log index to request.
	Index types.MessageIndex

	// Count, if >0, is the maximum number of log entries to request.
	Count int

	// Bytes, if >0, is the maximum number of log bytes to request. At least one
	// log must be returned regardless of the byte limit.
	Bytes int64
}
// Source is the source of log stream and log information.
//
// The Source is responsible for handling retries, backoff, and transient
// errors. An error from the Source will shut down the Fetcher.
type Source interface {
	// LogEntries populates the supplied LogRequest with available sequential
	// log entries as available.
	//
	// This may optionally block pending new log entries, but may also return zero
	// log entries if none are available yet.
	//
	// Upon success, the requested logs and terminal message index is returned. If
	// no terminal index is known, a value <0 will be returned.
	LogEntries(context.Context, *LogRequest) ([]*logpb.LogEntry, types.MessageIndex, error)

	// Descriptor returns the stream's descriptor, if the source knows it.
	//
	// If the source doesn't have this information, this should return nil.
	Descriptor() *logpb.LogStreamDescriptor
}
// Options is the set of configuration parameters for a Fetcher.
type Options struct {
	// Source is the log stream source.
	Source Source

	// Index is the starting stream index to retrieve.
	Index types.MessageIndex

	// Count is the total number of logs to retrieve. If zero, the full stream
	// will be fetched.
	Count int64

	// BufferCount is the minimum amount of LogEntry records to buffer. If the
	// buffered amount dips below BufferCount, more logs will be fetched.
	//
	// If zero, no count target will be applied.
	BufferCount int

	// BufferBytes is the target number of LogEntry bytes to buffer. If the
	// buffered amount dips below Bytes, more logs will be fetched.
	//
	// If zero, no byte target will be applied unless Count is also zero, in which
	// case DefaultBufferBytes byte constraint will be applied.
	BufferBytes int64

	// PrefetchFactor constrains the amount of additional data to fetch when
	// refilling the buffer. Effective Count and Bytes values are multiplied
	// by PrefetchFactor to determine the amount of logs to request.
	PrefetchFactor int

	// Delay is the amount of time to wait in between unsuccessful log requests.
	Delay time.Duration

	// Set this to immediately bail out with ErrIncompleteStream if the stream
	// isn't complete yet. This can be useful when you believe the stream to
	// already be terminal, but haven't done an RPC with LogDog yet to actually
	// confirm this.
	RequireCompleteStream bool

	// sizeFunc is a function that calculates the byte size of a LogEntry
	// protobuf.
	//
	// If nil, proto.Size will be used. This is used for testing.
	sizeFunc func(proto.Message) int
}
// ErrIncompleteStream is returned by Fetcher if Options.RequireCompleteStream
// was true and the underlying Stream is still incomplete (i.e. has not yet
// been terminated by the client, or archived).
var ErrIncompleteStream = errors.New("stream has not yet terminated")
// A Fetcher buffers LogEntry records by querying the Source for log data.
// It attempts to maintain a steady stream of records by prefetching available
// records in advance of consumption.
//
// A Fetcher is not goroutine-safe.
type Fetcher struct {
	// o is the set of Options.
	o *Options

	// logC is the communication channel between the fetch goroutine and
	// the user-facing NextLogEntry. Logs are punted through this channel
	// one-by-one.
	logC chan *logResponse

	// fetchErr is the retained error state. If not nil, fetching has stopped and
	// all Fetcher methods will return this error.
	fetchErr error
}
// New instantiates a new Fetcher instance.
//
// Non-positive option values are normalized to their defaults (DefaultDelay,
// DefaultBufferBytes, a PrefetchFactor of 1).
//
// The Fetcher can be cancelled by cancelling the supplied context.
func New(c context.Context, o Options) *Fetcher {
	if o.Delay <= 0 {
		o.Delay = DefaultDelay
	}
	if o.BufferBytes <= 0 && o.BufferCount <= 0 {
		o.BufferBytes = DefaultBufferBytes
	}
	if o.PrefetchFactor <= 1 {
		o.PrefetchFactor = 1
	}
	if o.Count < 0 {
		// A negative Count would panic in the make(chan ...) call below.
		// Treat it the same as zero: fetch the full stream.
		o.Count = 0
	}
	f := Fetcher{
		o:    &o,
		logC: make(chan *logResponse, o.Count),
	}
	go f.fetch(c)
	return &f
}
// logResponse is a single buffered log (or terminal error) punted from the
// fetch goroutine to NextLogEntry via Fetcher.logC.
type logResponse struct {
	// log is the buffered log entry, or nil if this response carries only err.
	log *logpb.LogEntry
	// size is the byte size of log, as computed by Fetcher.sizeOf.
	size int64
	// index is the stream index of log.
	index types.MessageIndex
	// err, if not nil, is the terminal error state (io.EOF on normal end).
	err error
}
// NextLogEntry returns the next buffered LogEntry, blocking until it becomes
// available.
//
// If the end of the log stream is encountered, NextLogEntry will return
// io.EOF.
//
// If the Fetcher is cancelled, a context.Canceled error will be returned.
func (f *Fetcher) NextLogEntry() (*logpb.LogEntry, error) {
	// Once an error has been latched, all subsequent calls return it without
	// touching the channel.
	if f.fetchErr != nil {
		return nil, f.fetchErr
	}

	resp := <-f.logC
	if resp.err != nil {
		// Latch the terminal error (including io.EOF).
		f.fetchErr = resp.err
	}
	return resp.log, f.fetchErr
}
// fetchRequest is a single round of fetching handed to the fetchLogs
// goroutine by fetch.
type fetchRequest struct {
	// index is the starting log index to fetch.
	index types.MessageIndex
	// count is the maximum number of logs to fetch (0 for no limit).
	count int
	// bytes is the maximum number of log bytes to fetch (0 for no limit).
	bytes int64
	// respC receives exactly one fetchResponse when the round completes.
	respC chan *fetchResponse
}
// fetchResponse is the result of a single fetchLogs round.
type fetchResponse struct {
	// logs are the fetched log entries (may be empty on error).
	logs []*logpb.LogEntry
	// tidx is the stream's terminal index, or <0 if not yet known.
	tidx types.MessageIndex
	// err is any error encountered during the fetch (including a recovered
	// panic).
	err error
}
// fetch is run in a separate goroutine. It handles buffering goroutine and
// punts logs and/or error state to NextLogEntry via logC.
//
// It runs a select-driven loop that simultaneously (a) sends buffered logs to
// the consumer, and (b) keeps at most one fetchLogs goroutine in flight to
// refill the buffer, until the stream's terminal index has been delivered or
// an error/cancellation occurs.
func (f *Fetcher) fetch(c context.Context) {
	defer close(f.logC)

	lb := logBuffer{}
	nextFetchIndex := f.o.Index
	// lastSentIndex / tidx are -1 until a log has been sent / the terminal
	// index is learned, respectively.
	lastSentIndex := types.MessageIndex(-1)
	tidx := types.MessageIndex(-1)

	c, cancelFunc := context.WithCancel(c)

	// bytes is the total size of the logs currently held in lb.
	bytes := int64(0)

	// logFetchC aliases logFetchBaseC while a fetchLogs goroutine is in
	// flight, and is nil otherwise. A nil channel blocks forever in the
	// select below, so the receive case is effectively disabled when no
	// fetch is outstanding.
	logFetchBaseC := make(chan *fetchResponse)
	var logFetchC chan *fetchResponse
	defer func() {
		// Reap our fetch goroutine, if still fetching.
		if logFetchC != nil {
			cancelFunc()
			<-logFetchC
		}
		close(logFetchBaseC)
	}()

	// errOut delivers a terminal error (including io.EOF) to NextLogEntry.
	errOut := func(err error) {
		f.logC <- &logResponse{err: err}
	}

	// count is the number of logs delivered to the consumer so far.
	count := int64(0)
	for tidx < 0 || lastSentIndex < tidx {
		// If we're configured with an upper delivery bound and we've delivered
		// that many entries, we're done.
		remaining := int64(-1)
		if f.o.Count > 0 {
			remaining = f.o.Count - count
			if remaining <= 0 {
				log.Fields{
					"count": count,
				}.Debugf(c, "Exceeded maximum log count.")
				break
			}
		}

		// If we have a buffered log, prepare it for sending. sendC stays nil
		// (disabling the send case below) when the buffer is empty.
		var sendC chan<- *logResponse
		var lr *logResponse
		if le := lb.current(); le != nil {
			lr = &logResponse{
				log:   le,
				size:  f.sizeOf(le),
				index: types.MessageIndex(le.StreamIndex),
			}
			sendC = f.logC
		}

		// If we're not currently fetching logs, and we are below our thresholds,
		// request a new batch of logs.
		if logFetchC == nil && (tidx < 0 || nextFetchIndex <= tidx) {
			// We always have a byte constraint. Are we below it?
			fetchCount := f.applyConstraint(int64(f.o.BufferCount), int64(lb.size()))
			fetchBytes := f.applyConstraint(f.o.BufferBytes, bytes)

			// Apply maximum log count constraint, if one is specified.
			if remaining >= 0 {
				// Factor in our currently-buffered logs.
				remaining -= int64(lb.size())
				switch {
				case remaining <= 0:
					// No more to fetch, we've buffered all the logs we're going to need.
					fetchCount = -1
				case fetchCount == 0, remaining < fetchCount:
					fetchCount = remaining
				}
			}

			// Fetch logs if neither constraint has vetoed (-1 means veto, 0 means
			// unconstrained; see applyConstraint).
			if fetchCount >= 0 && fetchBytes >= 0 {
				logFetchC = logFetchBaseC
				req := fetchRequest{
					index: nextFetchIndex,
					count: int(fetchCount),
					bytes: fetchBytes,
					respC: logFetchC,
				}

				log.Fields{
					"index": req.index,
					"count": req.count,
					"bytes": req.bytes,
				}.Debugf(c, "Buffering next round of logs...")
				go f.fetchLogs(c, &req)
			}
		}

		select {
		case <-c.Done():
			// Our Context has been cancelled. Terminate and propagate that error.
			log.WithError(c.Err()).Debugf(c, "Context has been cancelled.")
			errOut(c.Err())
			return

		case resp := <-logFetchC:
			// A fetch round completed; clear the in-flight marker.
			logFetchC = nil
			if resp.err != nil {
				log.WithError(resp.err).Errorf(c, "Error fetching logs.")
				errOut(resp.err)
				return
			}

			if f.o.RequireCompleteStream && resp.tidx == -1 {
				errOut(ErrIncompleteStream)
				return
			}

			// Account for each log and add it to our buffer.
			if len(resp.logs) > 0 {
				for _, le := range resp.logs {
					bytes += int64(f.sizeOf(le))
				}
				nextFetchIndex = types.MessageIndex(resp.logs[len(resp.logs)-1].StreamIndex) + 1
				lb.append(resp.logs...)
			}
			if resp.tidx >= 0 {
				tidx = resp.tidx
			}

		case sendC <- lr:
			// The consumer accepted the buffered log; update accounting and
			// advance the buffer.
			bytes -= lr.size
			lastSentIndex = lr.index
			count++
			lb.next()
		}
	}

	// We've punted the full log stream.
	errOut(io.EOF)
}
// fetchLogs runs one fetch round in its own goroutine: it repeatedly asks the
// Source for logs starting at req.index, sleeping f.o.Delay between empty
// rounds, until it obtains logs, hits an error, or determines the request can
// never be satisfied. Exactly one fetchResponse is always delivered on
// req.respC (guaranteed by the deferred send, which also converts panics into
// an error response).
func (f *Fetcher) fetchLogs(c context.Context, req *fetchRequest) {
	resp := fetchResponse{}
	defer func() {
		if r := recover(); r != nil {
			resp.err = fmt.Errorf("panic during fetch: %v", r)
		}
		req.respC <- &resp
	}()

	// Request the next chunk of logs from our Source.
	lreq := LogRequest{
		Index: req.index,
		Count: req.count,
		Bytes: req.bytes,
	}
	// Negative limits mean "veto" upstream; here they collapse to 0
	// ("unconstrained") for the Source request.
	if lreq.Count < 0 {
		lreq.Count = 0
	}
	if lreq.Bytes < 0 {
		lreq.Bytes = 0
	}

	for {
		log.Fields{
			"index": req.index,
		}.Debugf(c, "Fetching logs.")

		resp.logs, resp.tidx, resp.err = f.o.Source.LogEntries(c, &lreq)
		switch {
		case resp.err != nil:
			log.Fields{
				log.ErrorKey: resp.err,
				"index":     req.index,
			}.Errorf(c, "Fetch returned error.")
			return

		case len(resp.logs) > 0:
			log.Fields{
				"index":         req.index,
				"count":         len(resp.logs),
				"terminalIndex": resp.tidx,
				"firstLog":      resp.logs[0].StreamIndex,
				"lastLog":       resp.logs[len(resp.logs)-1].StreamIndex,
			}.Debugf(c, "Fetched log entries.")
			return

		case resp.tidx >= 0 && req.index > resp.tidx:
			// Will never be satisfied, since we're requesting beyond the terminal
			// index.
			return

		default:
			// No logs this round. Sleep for more.
			log.Fields{
				"index": req.index,
				"delay": f.o.Delay,
			}.Infof(c, "No logs returned. Sleeping...")
			if tr := clock.Sleep(c, f.o.Delay); tr.Incomplete() {
				// The sleep was interrupted by context cancellation; propagate.
				log.WithError(tr.Err).Warningf(c, "Context was canceled.")
				resp.err = tr.Err
				return
			}
		}
	}
}
// sizeOf returns the byte size of the supplied LogEntry protobuf.
//
// It honors the Options.sizeFunc testing override when set, and falls back to
// proto.Size otherwise.
func (f *Fetcher) sizeOf(le *logpb.LogEntry) int64 {
	if sf := f.o.sizeFunc; sf != nil {
		return int64(sf(le))
	}
	return int64(proto.Size(le))
}
// applyConstraint decides how much to fetch given a buffering target (want)
// and the currently-buffered amount (have).
//
// It returns:
//   - 0 if want is non-positive (no constraint; fetch unbounded),
//   - -1 if the buffer already meets the target (veto fetching),
//   - otherwise the shortfall, scaled up by the configured prefetch factor.
func (f *Fetcher) applyConstraint(want, have int64) int64 {
	if want <= 0 {
		// No constraint, unbounded.
		return 0
	}
	if have >= want {
		// We're within our constraint. Do not fetch logs.
		return -1
	}
	// We have fewer logs buffered than the target. Fetch the difference
	// (including prefetch).
	return want*int64(f.o.PrefetchFactor) - have
}
// Reader returns an io.Reader (a *renderer.Renderer) for this Fetcher.
//
// The Renderer is configured to emit raw log data.
func (f *Fetcher) Reader() io.Reader {
	r := renderer.Renderer{
		Source: f,
		Raw:    true,
	}
	return &r
}
// Descriptor returns the last-known Descriptor from the underlying Stream, if
// known.
//
// If the underlying stream doesn't have this information, this returns nil.
func (f *Fetcher) Descriptor() *logpb.LogStreamDescriptor {
	// Delegate straight to the configured Source.
	src := f.o.Source
	return src.Descriptor()
}