blob: 9da8002b9942c496f0bdeaf70fdfb7176670d58b [file] [log] [blame]
// Copyright 2017 The LUCI Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package coordinator
import (
"context"
"sync"
"time"
"go.chromium.org/luci/common/clock"
log "go.chromium.org/luci/common/logging"
"go.chromium.org/luci/logdog/api/logpb"
"go.chromium.org/luci/logdog/common/fetcher"
"go.chromium.org/luci/logdog/common/types"
)
// coordinatorSource is a fetcher.Source implementation that uses the
// Coordinator API.
//
// It fetches log entries through a Stream and remembers the most recently
// fetched stream state so that the terminal index and descriptor can be
// reported.
type coordinatorSource struct {
// Mutex is held by LogEntries for the full duration of each call.
sync.Mutex
// stream is the Stream whose Get method is used to fetch log entries.
stream *Stream
// tidx is the terminal index returned by LogEntries. Fetcher initializes it
// to -1, and LogEntries refreshes it from the fetched stream state.
tidx types.MessageIndex
// requireCompleteStream, when true, makes LogEntries return ErrNoSuchStream
// immediately instead of waiting and retrying.
requireCompleteStream bool
// streamState is the stream state captured by the last successful fetch that
// requested it. It is nil until such a fetch succeeds.
streamState *LogStream
}
// LogEntries fetches log entries for req from the Coordinator and returns them
// together with the terminal index currently recorded for the stream.
//
// While no stream state with a non-negative terminal index has been recorded,
// each request also asks for the stream state, and a successful response
// updates the recorded state and terminal index.
//
// If the Coordinator reports ErrNoSuchStream and requireCompleteStream is
// false, the call waits five seconds and tries again, until the fetch succeeds,
// fails with a different error, or the wait is interrupted. Any other error is
// returned immediately. The source's mutex is held for the whole call,
// including the waits.
func (s *coordinatorSource) LogEntries(c context.Context, req *fetcher.LogRequest) (
	[]*logpb.LogEntry, types.MessageIndex, error) {
	s.Lock()
	defer s.Unlock()

	getParams := make([]GetParam, 0, 4)
	getParams = append(getParams,
		LimitBytes(int(req.Bytes)),
		LimitCount(req.Count),
		Index(req.Index),
	)

	// Piggyback a stream state request on the fetch unless a state with a
	// non-negative terminal index has already been recorded.
	var fetched LogStream
	wantState := true
	if known := s.streamState; known != nil && known.State.TerminalIndex >= 0 {
		wantState = false
	}
	if wantState {
		getParams = append(getParams, WithState(&fetched))
	}

	retryTimer := clock.NewTimer(c)
	defer retryTimer.Stop()

	for {
		entries, err := s.stream.Get(c, getParams...)

		// TODO(iannucci,dnj): use retry module + transient tags instead
		retryTimer.Reset(5 * time.Second)

		if err == nil {
			if wantState {
				s.streamState = &fetched
				s.tidx = fetched.State.TerminalIndex
			}
			return entries, s.tidx, nil
		}

		// Only a missing stream is retried, and only when the caller did not
		// require the stream to already exist.
		if err != ErrNoSuchStream || s.requireCompleteStream {
			return nil, 0, err
		}

		log.WithError(err).Warningf(c, "Stream does not exist. Sleeping pending registration.")

		// Wait out the delay; give up if the timer reports it did not complete.
		if result := <-retryTimer.GetC(); result.Incomplete() {
			return nil, 0, result.Err
		}
	}
}
// Descriptor returns the LogStreamDescriptor for this stream, or nil if no
// stream state has been recorded yet.
//
// NOTE(review): this reads streamState without holding the mutex that
// LogEntries takes while writing it; confirm that callers serialize access.
func (s *coordinatorSource) Descriptor() *logpb.LogStreamDescriptor {
	state := s.streamState
	if state == nil {
		return nil
	}
	return &state.Desc
}
// Fetcher returns a Fetcher implementation for this Stream.
//
// If you pass a nil fetcher.Options, a default option set will be used.
// Otherwise the supplied options are copied, so the caller's struct is never
// modified. The Source field of the options actually used is always
// overwritten with a source based on this stream.
func (s *Stream) Fetcher(c context.Context, o *fetcher.Options) *fetcher.Fetcher {
	// Work on a private value copy. The expression `&(*o)` does not copy
	// anything: it evaluates back to the original pointer, so assigning to
	// o.Source through it would modify the caller's Options.
	var opts fetcher.Options
	if o != nil {
		opts = *o
	}

	opts.Source = &coordinatorSource{
		stream: s,
		// Start at -1; LogEntries treats a negative terminal index in the
		// stream state as "not yet terminated".
		tidx:                  -1,
		requireCompleteStream: opts.RequireCompleteStream,
	}
	return fetcher.New(c, opts)
}