blob: 1104ce58de06b28abb810daffe4f4f11ff6cec21 [file] [log] [blame]
// Copyright 2016 The LUCI Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//      http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package rawpresentation
import (
bbpb ""
log ""
annopb ""
const (
// DefaultLogDogHost is the default LogDog host, if one isn't specified via
// query string.
DefaultLogDogHost = chromeinfra.LogDogHost
// AnnotationStream represents a LogDog annotation protobuf stream.
type AnnotationStream struct {
Project string
Path types.StreamPath
// Client is the HTTP client to use for LogDog communication.
Client *coordinator.Client
// The cached Step object
step *annopb.Step
// Build is the build.proto, if this annotation stream is Build messages
// instead of Step messages.
build *bbpb.Build
finished bool
// normalize validates and normalizes the stream's parameters.
func (as *AnnotationStream) normalize() error {
if err := config.ValidateProjectName(as.Project); err != nil {
return errors.Annotate(err, "Invalid project name: %s", as.Project).Tag(grpcutil.InvalidArgumentTag).Err()
if err := as.Path.Validate(); err != nil {
return errors.Annotate(err, "Invalid log stream path %q", as.Path).Tag(grpcutil.InvalidArgumentTag).Err()
return nil
// Sentinel errors reported while loading an annotation stream. They are
// compared by identity, so each one must remain a single package-level value.
var (
	// errNotMilo: the stream's content type is not one this package decodes.
	errNotMilo = errors.New("Requested stream is not a recognized protobuf")

	// errNotDatagram: the stream exists but is not a datagram stream.
	errNotDatagram = errors.New("Requested stream is not a datagram stream")

	// errNoEntries: the stream has no log entry to decode yet.
	errNoEntries = errors.New("Log stream has no annotation entries")
)
// populateCache loads the annotation stream from LogDog and caches it on this
// AnnotationStream.
// If the stream does not exist, or is invalid, populateCache will return a Milo error.
func (as *AnnotationStream) populateCache(c context.Context) error {
// Cached?
if as.step != nil || != nil {
return nil
// Load from LogDog directly.
"host": as.Client.Host,
"project": as.Project,
"path": as.Path,
}.Infof(c, "Making tail request to LogDog to populateCache annotation stream.")
var (
state coordinator.LogStream
stream = as.Client.Stream(as.Project, as.Path)
le, err := stream.Tail(c, coordinator.WithState(&state), coordinator.Complete())
if err != nil {
log.WithError(err).Errorf(c, "Failed to load stream.")
return err
// Make sure that this is an annotation stream.
switch {
case state.Desc.StreamType != logpb.StreamType_DATAGRAM:
return errNotDatagram
case le == nil:
// No annotation stream data, so render a minimal page.
return errNoEntries
var toUnmarshal proto.Message
var compressed bool
var followup func()
switch state.Desc.ContentType {
case annopb.ContentTypeAnnotations:
var step annopb.Step
toUnmarshal = &step
followup = func() {
var latestEndedTime time.Time
for _, sub := range step.Substep {
switch t := sub.Substep.(type) {
case *annopb.Step_Substep_AnnotationStream:
// TODO(hinoka,dnj): Implement recursive / embedded substream fetching if
// specified.
log.Warningf(c, "Annotation stream links LogDog substream [%+v], not supported!", t.AnnotationStream)
case *annopb.Step_Substep_Step:
endedTime := t.Step.Ended.AsTime()
if t.Step.Ended != nil && endedTime.After(latestEndedTime) {
latestEndedTime = endedTime
if latestEndedTime.IsZero() {
// No substep had an ended time :(
latestEndedTime = step.Started.AsTime()
as.step = &step
case luciexe.BuildProtoZlibContentType:
var build bbpb.Build
toUnmarshal = &build
compressed = true
followup = func() { = &build
return errNotMilo
// Get the last log entry in the stream. In reality, this will be index 0,
// since the "Tail" call should only return one log entry.
// Because we supplied the "Complete" flag to Tail and succeeded, this
// datagram will be complete even if its source datagram(s) are fragments.
dg := le.GetDatagram()
if dg == nil {
return errors.New("Datagram stream does not have datagram data")
data := dg.Data
if compressed {
z, err := zlib.NewReader(bytes.NewBuffer(dg.Data))
if err != nil {
return errors.Annotate(
err, "Datagram is marked as compressed, but failed to open zlib stream",
if data, err = io.ReadAll(z); err != nil {
return errors.Annotate(
err, "Datagram is marked as compressed, but failed to decompress",
// Attempt to decode the protobuf.
if err := proto.Unmarshal(data, toUnmarshal); err != nil {
return err
as.finished = (state.State.TerminalIndex >= 0 &&
le.StreamIndex == uint64(state.State.TerminalIndex))
return nil
func (as *AnnotationStream) toMiloBuild(c context.Context) *ui.MiloBuildLegacy {
prefix, name := as.Path.Split()
// Prepare a Streams object with only one stream.
streams := Streams{
MainStream: &Stream{
Server: as.Client.Host,
Prefix: string(prefix),
Path: string(name),
IsDatagram: true,
Data: as.step,
Closed: as.finished,
var (
build ui.MiloBuildLegacy
ub = ViewerURLBuilder{
Host: as.Client.Host,
Project: as.Project,
Prefix: prefix,
AddLogDogToBuild(c, &ub, streams.MainStream.Data, &build)
return &build
// ViewerURLBuilder is a URL builder that constructs LogDog viewer URLs.
type ViewerURLBuilder struct {
Host string
Prefix types.StreamName
Project string
// NewURLBuilder creates a new URLBuilder that can generate links to LogDog
// pages given a LogDog StreamAddr.
func NewURLBuilder(addr *types.StreamAddr) *ViewerURLBuilder {
prefix, _ := addr.Path.Split()
return &ViewerURLBuilder{
Host: addr.Host,
Prefix: prefix,
Project: addr.Project,
// BuildLink implements URLBuilder.
func (b *ViewerURLBuilder) BuildLink(l *annopb.AnnotationLink) *ui.Link {
switch t := l.Value.(type) {
case *annopb.AnnotationLink_LogdogStream:
ls := t.LogdogStream
server := ls.Server
if server == "" {
server = b.Host
prefix := types.StreamName(ls.Prefix)
if prefix == "" {
prefix = b.Prefix
u := viewer.GetURL(server, b.Project, prefix.Join(types.StreamName(ls.Name)))
link := ui.NewLink(l.Label, u, fmt.Sprintf("logdog link for %s", l.Label))
if link.Label == "" {
link.Label = ls.Name
return link
case *annopb.AnnotationLink_Url:
link := ui.NewLink(l.Label, t.Url, fmt.Sprintf("step link for %s", l.Label))
if link.Label == "" {
link.Label = "unnamed"
return link
// Don't know how to render.
return nil
// GetBuild returns either a MiloBuildLegacy or a Build from a raw datagram
// stream.
// The type of return value is determined by the content type of the stream.
func GetBuild(c context.Context, host string, project string, path types.StreamPath) (*ui.MiloBuildLegacy, *ui.BuildPage, error) {
as := AnnotationStream{
Project: project,
Path: path,
if err := as.normalize(); err != nil {
return nil, nil, err
// Setup our LogDog client.
var err error
if as.Client, err = NewClient(c, host); err != nil {
return nil, nil, errors.Annotate(err, "generating LogDog Client").Err()
// Load the Milo annotation protobuf from the annotation stream.
switch err := as.populateCache(c); errors.Unwrap(err) {
case nil, errNoEntries:
case coordinator.ErrNoSuchStream:
return nil, nil, grpcutil.NotFoundTag.Apply(err)
case coordinator.ErrNoAccess:
return nil, nil, grpcutil.PermissionDeniedTag.Apply(err)
case errNotMilo, errNotDatagram:
// The user requested a LogDog url that isn't a Milo annotation.
return nil, nil, grpcutil.InvalidArgumentTag.Apply(err)
return nil, nil, errors.Annotate(err, "failed to load stream").Err()
if as.step != nil {
return as.toMiloBuild(c), nil, nil
now := timestamppb.New(clock.Now(c))
return nil, &ui.BuildPage{Build: ui.Build{Build:, Now: now}}, nil
// ReadAnnotations synchronously reads and decodes the latest Step information
// from the provided StreamAddr.
func ReadAnnotations(c context.Context, addr *types.StreamAddr) (*annopb.Step, error) {
log.Infof(c, "Loading build from LogDog stream at: %s", addr)
client, err := NewClient(c, addr.Host)
if err != nil {
return nil, errors.Annotate(err, "failed to create LogDog client").Err()
as := AnnotationStream{
Client: client,
Project: addr.Project,
Path: addr.Path,
if err := as.normalize(); err != nil {
return nil, errors.Annotate(err, "failed to normalize annotation stream parameters").Err()
if err := as.populateCache(c); err != nil {
return nil, err
if as.step == nil {
return nil, errors.New("stream does not contain annopb.Step")
return as.step, nil