blob: b429059c32d019a9c95ddbb4a706f63afaf8e453 [file] [log] [blame]
// Copyright 2015 The LUCI Authors.
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// See the License for the specific language governing permissions and
// limitations under the License.
package annotee
import (
log ""
annopb ""
const (
// DefaultBufferSize is the Stream BufferSize value that will be used if no
// buffer size is provided.
DefaultBufferSize = 8192
const (
// STDOUT is the system STDOUT stream name.
STDOUT = types.StreamName("stdout")
// STDERR is the system STDERR stream.
STDERR = types.StreamName("stderr")
// DefaultAnnotationSubpath is the default annotation subpath. It will be used
// if an explicit subpath is not provided.
DefaultAnnotationSubpath = types.StreamName("annotations")
// Stream describes a single process stream.
type Stream struct {
// Reader is the stream data reader. It will be processed until it returns
// an error or io.EOF.
Reader io.Reader
// Name is the logdog stream name.
Name types.StreamName
// Tee, if not nil, is a writer where all consumed stream data should be
// forwarded.
Tee io.Writer
// Alias is the base stream name that this stream should alias to.
Alias string
// Annotate, if true, causes annotations in this Stream to be captured and
// an annotation LogDog stream to be emitted.
Annotate bool
// StripAnnotations, if true, causes all encountered annotations to be
// stripped from incoming stream data. Otherwise, those annotations will
// still advnace the annotation state (if Annotate is true), but will not be
// included in any output streams.
StripAnnotations bool
// EmitAllLink, if true, instructs an "all STDOUT/STDERR" link to be injected
// into this Stream.
EmitAllLink bool
// BufferSize is the size of the read buffer that will be used when processing
// this stream's data.
BufferSize int
// Options are the configuration options for a Processor.
type Options struct {
// Base is the base log stream name. This is prepended to every log name, as
// well as any generate log names.
Base types.StreamName
// AnnotationSubpath is the path underneath of Base where the annotation
// stream will be written.
// If empty, DefaultAnnotationSubpath will be used.
AnnotationSubpath types.StreamName
// LinkGenerator generates links to alias for a given log stream.
// If nil, no link annotations will be injected.
LinkGenerator LinkGenerator
// Client is the LogDog Butler Client to use for stream creation.
Client *streamclient.Client
// Execution describes the current applicaton's execution parameters. This
// will be used to construct annotation state.
Execution *annotation.Execution
// TeeAnnotations, if true, causes all encountered annotations to be
// tee'd, if teeing is configured.
TeeAnnotations bool
// TeeText, if true, causes all encountered non-annotation lines to be
// tee'd, if teeing is configured.
TeeText bool
// MetadataUpdateInterval is the amount of time to wait after stream metadata
// updates to push the updated metadata protobuf to the metadata stream.
// - If this is < 0, metadata will only be pushed at the beginning and end of
// a step.
// - If this equals 0, metadata will be pushed every time it's updated.
// - If this is 0, DefaultMetadataUpdateInterval will be used.
MetadataUpdateInterval time.Duration
// Offline specifies whether parsing happens not at the same time as
// emitting. If true and CURRENT_TIMESTAMP annotations are not provided
// then step start/end times are left empty.
Offline bool
// CloseSteps specified whether outstanding open steps must be closed.
CloseSteps bool
// AnnotationUpdated is synchronously called when the annotation message
// changes.
// stepBinary is binary-serialized annopb.Step.
// stepBinary must not be mutated.
// The call blocks writing datagrams to the output stream.
AnnotationUpdated func(stepBinary []byte)
// Processor consumes data from a list of Stream entries and interacts with the
// supplied Client instance.
// A Processor must be instantiated with New.
type Processor struct {
ctx context.Context
o *Options
astate *annotation.State
stepHandlers map[*annotation.Step]*stepHandler
annotationStream streamclient.DatagramStream
annotationC chan annotationSignal
annotationFinishedC chan struct{}
allEmittedStreams map[*Stream]struct{}
type annotationSignal struct {
data []byte
updateType annotation.UpdateType
// New instantiates a new Processor.
func New(c context.Context, o Options) *Processor {
p := Processor{
ctx: c,
o: &o,
stepHandlers: make(map[*annotation.Step]*stepHandler),
p.astate = &annotation.State{
LogNameBase: o.Base,
Callbacks: &annotationCallbacks{&p},
Execution: o.Execution,
Clock: clock.Get(c),
Offline: o.Offline,
return &p
// initialize initializes p's annotation stream handling system. If it is called
// more than once, it is a no-op.
func (p *Processor) initialize() (err error) {
// If we're already initialized, do nothing.
if p.annotationStream != nil {
return nil
annotationPath := p.o.AnnotationSubpath
if annotationPath == "" {
annotationPath = DefaultAnnotationSubpath
annotationPath = p.astate.RootStep().BaseStream(annotationPath)
// Create our annotation stream.
p.annotationStream, err = p.o.Client.NewDatagramStream(
p.ctx, annotationPath,
if err != nil {
log.WithError(err).Errorf(p.ctx, "Failed to create annotation stream.")
// Complete initialization and start our annotation meter.
p.annotationC = make(chan annotationSignal)
p.annotationFinishedC = make(chan struct{})
p.allEmittedStreams = map[*Stream]struct{}{}
// Run our annotation meter in a separate goroutine.
go p.runAnnotationMeter(p.annotationStream, p.o.MetadataUpdateInterval)
return nil
// RunStreams executes the Processor, consuming data from its configured streams
// and forwarding it to LogDog. Run will block until all streams have
// terminated.
// If a stream terminates with an error, or if there is an error processing the
// stream data, Run will return an error. If multiple Streams fail with errors,
// an errors.MultiError will be returned. io.EOF does not count as an error.
func (p *Processor) RunStreams(streams []*Stream) error {
ingestMu := sync.Mutex{}
ingest := func(s *Stream, l string) error {
defer ingestMu.Unlock()
return p.IngestLine(s, l)
// Read from all configured streams until they are finished.
return parallel.FanOutIn(func(taskC chan<- func() error) {
for _, s := range streams {
s := s
bufferSize := s.BufferSize
if bufferSize <= 0 {
bufferSize = DefaultBufferSize
taskC <- func() error {
lr := newLineReader(s.Reader, bufferSize)
for {
line, err := lr.readLine()
if err != nil {
if err == io.EOF {
return nil
return err
if err := ingest(s, line); err != nil {
log.ErrorKey: err,
"stream": s.Name,
"line": line,
}.Errorf(p.ctx, "Failed to ingest line.")
// IngestLine ingests a single line of text from an input stream, responding to
// any annotations encountered.
// This method is not goroutine-safe.
func (p *Processor) IngestLine(s *Stream, line string) error {
// Initialize our annotation stream.
if err := p.initialize(); err != nil {
log.WithError(err).Errorf(p.ctx, "Failed to initialize.")
return err
a := extractAnnotation(line)
if a != "" {
log.Debugf(p.ctx, "Annotation: %q", a)
var step *annotation.Step
if s.Annotate {
if a != "" {
// Append our annotation to the annotation state. This may cause our
// annotation callbacks to be invoked.
if err := p.astate.Append(a); err != nil {
log.ErrorKey: err,
"stream": s.Name,
"annotation": a,
}.Errorf(p.ctx, "Failed to process annotation.")
// Use the step handler for the current step.
step = p.astate.CurrentStep()
} else {
// Not handling annotations. Use our root step handler.
step = p.astate.RootStep()
h, err := p.getStepHandler(step, true)
if err != nil {
return err
// Determine our injected annotations.
injectedAnnotations := h.flushInjectedAnnotations()
// Emit the "all" link if configured (at most once).
if lg := p.o.LinkGenerator; lg != nil && s.EmitAllLink {
injectedAnnotations = append(injectedAnnotations,
buildAliasAnnotation("all", "stdio", lg.GetLink("**/stdout", "**/stderr")))
s.EmitAllLink = false
// Get our root log stream handler. As an optimization, if "step" is
// the root step, then "h" is already the root handler, so we don't need
// to duplicate the lookup.
// We only need the handler if we're going to emit annotations to the root
// stream.
var rootHandler *stepHandler
if !s.StripAnnotations && (a != "" || len(injectedAnnotations) > 0) {
if rootStep := p.astate.RootStep(); rootStep != step {
rootHandler, err = p.getStepHandler(rootStep, true)
if err != nil {
return err
} else {
rootHandler = h
// Handle annotation line.
if a != "" {
// If we're teeing annotations, emit this annotation.
// Some annotations (notably, STEP_LOG_LINE) contain actual content instead
// of structural build hinting. We don't want to tee those unless we're also
// teeing text.
if s.Tee != nil && p.o.TeeAnnotations && (p.o.TeeText || !isContentAnnotation(a)) {
if err := writeTextLine(s.Tee, line); err != nil {
log.WithError(err).Errorf(h, "Failed to tee annotation line.")
return err
// If we're not stripping annotations, emit this to the root handler.
if !s.StripAnnotations {
if err := rootHandler.writeBaseStream(s, line); err != nil {
log.WithError(err).Errorf(p.ctx, "Failed to send line to LogDog.")
return err
// Emit any injected annotations.
for _, anno := range injectedAnnotations {
// Teeing?
if s.Tee != nil && p.o.TeeAnnotations {
if err := writeTextLine(s.Tee, anno); err != nil {
log.WithError(err).Errorf(h, "Failed to tee annotation line.")
return err
// Not stripping?
if !s.StripAnnotations {
if err := rootHandler.writeBaseStream(s, anno); err != nil {
log.WithError(err).Errorf(h, "Failed to send injected annotation line to LogDog.")
return err
// If we're stripping text, write a warning message noting that this stream
// will not have text in it.
if !p.o.TeeText && s.Tee != nil {
if !h.textStrippedNote {
err := writeTextLine(s.Tee, "This build is configured to send log data exclusively to LogDog. "+
"Please use the LogDog link on the build page to view this log stream.")
if err != nil {
log.WithError(err).Errorf(h, "Failed to write text stripped notice.")
return err
h.textStrippedNote = true
// Add links to specific log streams as they are generated.
injectTextStreamLines := h.flushInjectedTextStreamLines()
for _, line := range injectTextStreamLines {
if err := writeTextLine(s.Tee, line); err != nil {
log.WithError(err).Errorf(h, "Failed to inject text stream line: %s", line)
return err
// If this is a text line, and we're teeing text, emit this line.
if a == "" {
if s.Tee != nil && p.o.TeeText {
if err := writeTextLine(s.Tee, line); err != nil {
log.WithError(err).Errorf(h, "Failed to tee text line.")
return err
// Write to our LogDog stream.
if err := h.writeBaseStream(s, line); err != nil {
log.WithError(err).Errorf(p.ctx, "Failed to send line to LogDog.")
return err
return err
// Finish instructs the Processor to close any outstanding state. This should be
// called when all automatic state updates have completed in case any steps
// didn't properly close their state.
// Finish will return the closed annotation state that was accumulated during
// processing.
func (p *Processor) Finish() *annotation.State {
// Finish our step handlers.
var closeTime *timestamp.Timestamp
if !p.astate.Offline {
// Note: p.astate.Clock is never nil here, see astate setup in New().
closeTime = google.NewTimestamp(p.astate.Clock.Now())
for _, h := range p.stepHandlers {
p.finishStepHandler(h, p.o.CloseSteps, closeTime)
// If we're initialized, shut down our annotation handling.
if p.annotationStream != nil {
// Close and reap our annotation meter goroutine.
// Close and destruct our annotation stream.
if err := p.annotationStream.Close(); err != nil {
log.WithError(err).Errorf(p.ctx, "Failed to close annotation stream.")
p.annotationStream = nil
return p.astate
func (p *Processor) getStepHandler(step *annotation.Step, create bool) (*stepHandler, error) {
if h := p.stepHandlers[step]; h != nil {
return h, nil
if !create {
return nil, nil
h, err := newStepHandler(p, step)
if err != nil {
log.ErrorKey: err,
"step": step,
}.Errorf(p.ctx, "Failed to create step handler.")
return nil, err
p.stepHandlers[step] = h
return h, nil
func (p *Processor) finishStepHandler(h *stepHandler, closeSteps bool, closeTime *timestamp.Timestamp) {
// Remove this handler from our list. This will stop us from
// double-finishing when we call Close() below, which calls the StepClosed
// callback, which calls finishStepHandler if the step is still in the map.
delete(p.stepHandlers, h.step)
if h.finished {
// Finish the step.
if closeSteps {
// Note: this is noop if the step is already closed. In particular, when
// we end up here through StepClosed callback.
// Close all streams associated with this handler.
if closeSteps {
// Notify that the annotation state has updated (closed).
h.finished = true
func (p *Processor) annotationStateUpdated(ut annotation.UpdateType) {
// Serialize our annotation state immediately, as the Step's internal state
// may change in future annotation processing iterations.
data, err := proto.Marshal(p.astate.RootStep().Proto())
if err != nil {
log.WithError(err).Errorf(p.ctx, "Failed to marshal state.")
// Send the data to our meter for transmission.
p.annotationC <- annotationSignal{data, ut}
func (p *Processor) runAnnotationMeter(s streamclient.DatagramStream, interval time.Duration) {
defer close(p.annotationFinishedC)
var latest []byte
sendLatest := func() {
if latest == nil {
if err := s.WriteDatagram(latest); err != nil {
log.WithError(err).Errorf(p.ctx, "Failed to write annotation.")
if p.o.AnnotationUpdated != nil {
latest = nil
// Make sure we send any buffered annotation stream before exiting.
defer sendLatest()
// Timer to handle metering (if enabled).
t := clock.NewTimer(p.ctx)
timerRunning := false
defer t.Stop()
first := true
for {
select {
case as, ok := <-p.annotationC:
if !ok {
// Handle the new annotation data.
latest =
switch {
case first, as.updateType == annotation.UpdateStructural, interval == 0:
// If
// - This is the first, we always send the first datagram immediately.
// - This is a structural update, we send it quickly.
// - We're not metering, so send every annotation immediately.
first = false
case interval > 0 && !timerRunning:
// Metering. Start our timer, if it's not already running from a
// previous annotation.
timerRunning = true
case <-t.GetC():
timerRunning = false
type annotationCallbacks struct {
func (c *annotationCallbacks) StepClosed(step *annotation.Step) {
if h, _ := c.getStepHandler(step, false); h != nil {
c.finishStepHandler(h, true, nil)
func (c *annotationCallbacks) Updated(step *annotation.Step, ut annotation.UpdateType) {
if h, _ := c.getStepHandler(step, false); h != nil {
func (c *annotationCallbacks) StepLogLine(step *annotation.Step, name types.StreamName, label, line string) {
h, err := c.getStepHandler(step, true)
if err != nil {
s, created, err := h.getTextStream(name)
if err != nil {
log.ErrorKey: err,
"step": h,
"stream": name,
}.Errorf(c.ctx, "Failed to get log substream.")
if created {
h.maybeInjectLink(label, "logdog", name)
if err := writeTextLine(s, line); err != nil {
log.ErrorKey: err,
"stream": name,
}.Errorf(c.ctx, "Failed to export log line.")
func (c *annotationCallbacks) StepLogEnd(step *annotation.Step, name types.StreamName) {
if h, _ := c.getStepHandler(step, true); h != nil {
// stepHandler handles the steps associated with a specified stream.
type stepHandler struct {
processor *Processor
step *annotation.Step
client *streamclient.Client
injectedAnnotations []string
injectedTextStreamLines []string
streams map[types.StreamName]io.WriteCloser
finished bool
allEmitted bool
textStrippedNote bool
func newStepHandler(p *Processor, step *annotation.Step) (*stepHandler, error) {
h := stepHandler{
Context: log.SetField(p.ctx, "step", step),
processor: p,
step: step,
client: p.o.Client,
streams: make(map[types.StreamName]io.WriteCloser),
// Send our initial annotation state.
return &h, nil
func (h *stepHandler) String() string {
return h.step.String()
func (h *stepHandler) writeBaseStream(s *Stream, line string) error {
name := h.step.BaseStream(s.Name)
stream, created, err := h.getTextStream(name)
if err != nil {
return err
if created {
switch s.Name {
case STDOUT:
if h.step.SetSTDOUTStream(&annopb.LogdogStream{Name: string(name)}) {
case STDERR:
if h.step.SetSTDERRStream(&annopb.LogdogStream{Name: string(name)}) {
segs := s.Name.Segments()
h.maybeInjectLink("stdio", segs[len(segs)-1], name)
return writeTextLine(stream, line)
func (h *stepHandler) updated(ut annotation.UpdateType) {
if !h.finished {
func (h *stepHandler) getTextStream(name types.StreamName) (s io.WriteCloser, created bool, err error) {
if h.finished {
err = fmt.Errorf("refusing to get stream %q for finished handler", name)
if s = h.streams[name]; s != nil {
// Create a new stream. Clone the properties archetype and customize.
s, err = h.client.NewStream(h.processor.ctx, name)
if err == nil {
created = true
h.streams[name] = s
func (h *stepHandler) closeStream(name types.StreamName) {
s := h.streams[name]
if s != nil {
h.closeStreamImpl(name, s)
delete(h.streams, name)
func (h *stepHandler) closeAllStreams() {
for name, s := range h.streams {
h.closeStreamImpl(name, s)
h.streams = make(map[types.StreamName]io.WriteCloser)
func (h *stepHandler) closeStreamImpl(name types.StreamName, s io.WriteCloser) {
if err := s.Close(); err != nil {
log.ErrorKey: err,
"stream": name,
}.Errorf(h, "Failed to close step stream.")
func (h *stepHandler) flushInjectedAnnotations() []string {
return flushStringSlice(&h.injectedAnnotations)
func (h *stepHandler) flushInjectedTextStreamLines() []string {
return flushStringSlice(&h.injectedTextStreamLines)
func (h *stepHandler) maybeInjectLink(base, text string, names ...types.StreamName) {
if lg := h.processor.o.LinkGenerator; lg != nil {
link := lg.GetLink(names...)
h.injectedAnnotations = append(h.injectedAnnotations, buildAliasAnnotation(base, text, link))
h.injectedTextStreamLines = append(h.injectedTextStreamLines, fmt.Sprintf("LogDog Link [%s]: %s", base, link))
func (h *stepHandler) maybeInjectTextStreamLink(name string, stream types.StreamName) {
if lg := h.processor.o.LinkGenerator; lg != nil {
func buildAliasAnnotation(base, text, link string) string {
return buildAnnotation("STEP_LINK", fmt.Sprintf("%s-->%s", text, base), link)
func flushStringSlice(sp *[]string) []string {
if sp == nil {
return nil
s := *sp
if len(s) == 0 {
return nil
lines := make([]string, len(s))
copy(lines, s)
*sp = s[:0]
return lines
// lineReader reads from an input stream and returns the data line-by-line.
// We don't use a Scanner because we want to be able to handle lines that may
// exceed the buffer length. We don't use ReadBytes here because we need to
// capture the last line in the stream, even if it doesn't end with a newline.
type lineReader struct {
r *bufio.Reader
buf bytes.Buffer
func newLineReader(r io.Reader, bufSize int) *lineReader {
return &lineReader{
r: bufio.NewReaderSize(r, bufSize),
func (l *lineReader) readLine() (string, error) {
for {
line, isPrefix, err := l.r.ReadLine()
if err != nil {
return "", err
l.buf.Write(line) // Can't (reasonably) fail.
if !isPrefix {
return l.buf.String(), nil
func writeTextLine(w io.Writer, line string) error {
_, err := fmt.Fprintln(w, line)
return err
func extractAnnotation(line string) string {
line = strings.TrimRightFunc(line, unicode.IsSpace)
if len(line) <= 6 || !(strings.HasPrefix(line, "@@@") && strings.HasSuffix(line, "@@@")) {
return ""
return strings.TrimSpace(line[3 : len(line)-3])
func buildAnnotation(name string, params ...string) string {
v := make([]string, 1, 1+len(params))
v[0] = name
return "@@@" + strings.Join(append(v, params...), "@") + "@@@"
func isContentAnnotation(a string) bool {
// Strip out any annotation arguments.
if idx := strings.IndexRune(a, '@'); idx > 0 {
a = a[:idx]
return a == "STEP_LOG_LINE"