blob: d9d0167f5c38526def77559d7fbafb7917c0303c [file]
// Copyright 2016 The LUCI Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package rawpresentation
import (
"strings"
"time"
"github.com/golang/protobuf/proto"
"golang.org/x/net/context"
"github.com/luci/luci-go/common/errors"
log "github.com/luci/luci-go/common/logging"
"github.com/luci/luci-go/common/proto/google"
miloProto "github.com/luci/luci-go/common/proto/milo"
"github.com/luci/luci-go/logdog/api/logpb"
"github.com/luci/luci-go/logdog/client/coordinator"
"github.com/luci/luci-go/logdog/common/types"
"github.com/luci/luci-go/logdog/common/viewer"
"github.com/luci/luci-go/luci_config/common/cfgtypes"
"github.com/luci/luci-go/milo/api/resp"
"github.com/luci/luci-go/milo/buildsource/rawpresentation/internal"
"github.com/luci/luci-go/milo/common"
)
const (
// intermediateCacheLifetime is the amount of time to cache intermediate (non-
// terminal) annotation streams. Terminal annotation streams are cached
// indefinitely.
intermediateCacheLifetime = 10 * time.Second
// defaultLogDogHost is the default LogDog host, if one isn't specified via
// query string.
defaultLogDogHost = "luci-logdog.appspot.com"
)
// AnnotationStream represents a LogDog annotation protobuf stream.
type AnnotationStream struct {
Project cfgtypes.ProjectName
Path types.StreamPath
// Client is the HTTP client to use for LogDog communication.
Client *coordinator.Client
// cs is the unmarshalled annotation stream Step and associated data.
cs internal.CachedStep
}
// Normalize validates and normalizes the stream's parameters.
func (as *AnnotationStream) Normalize() error {
if err := as.Project.Validate(); err != nil {
return errors.Annotate(err, "Invalid project name: %s", as.Project).Tag(common.CodeParameterError).Err()
}
if err := as.Path.Validate(); err != nil {
return errors.Annotate(err, "Invalid log stream path %q", as.Path).Tag(common.CodeParameterError).Err()
}
return nil
}
var errNotMilo = errors.New("Requested stream is not a Milo annotation protobuf")
var errNotDatagram = errors.New("Requested stream is not a datagram stream")
var errNoEntries = errors.New("Log stream has no annotation entries")
// Fetch loads the annotation stream from LogDog.
//
// If the stream does not exist, or is invalid, Fetch will return a Milo error.
// Otherwise, it will return the Step that was loaded.
//
// Fetch caches the step, so multiple calls to Fetch will return the same Step
// value.
func (as *AnnotationStream) Fetch(c context.Context) (*miloProto.Step, error) {
// Cached?
if as.cs.Step != nil {
return as.cs.Step, nil
}
// Load from LogDog directly.
log.Fields{
"host": as.Client.Host,
"project": as.Project,
"path": as.Path,
}.Infof(c, "Making tail request to LogDog to fetch annotation stream.")
var (
state coordinator.LogStream
stream = as.Client.Stream(as.Project, as.Path)
)
le, err := stream.Tail(c, coordinator.WithState(&state), coordinator.Complete())
if err != nil {
log.WithError(err).Errorf(c, "Failed to load stream.")
return nil, err
}
// Make sure that this is an annotation stream.
switch {
case state.Desc.ContentType != miloProto.ContentTypeAnnotations:
return nil, errNotMilo
case state.Desc.StreamType != logpb.StreamType_DATAGRAM:
return nil, errNotDatagram
case le == nil:
// No annotation stream data, so render a minimal page.
return nil, errNoEntries
}
// Get the last log entry in the stream. In reality, this will be index 0,
// since the "Tail" call should only return one log entry.
//
// Because we supplied the "Complete" flag to Tail and suceeded, this datagram
// will be complete even if its source datagram(s) are fragments.
dg := le.GetDatagram()
if dg == nil {
return nil, errors.New("Datagram stream does not have datagram data")
}
// Attempt to decode the Step protobuf.
var step miloProto.Step
if err := proto.Unmarshal(dg.Data, &step); err != nil {
return nil, err
}
var latestEndedTime time.Time
for _, sub := range step.Substep {
switch t := sub.Substep.(type) {
case *miloProto.Step_Substep_AnnotationStream:
// TODO(hinoka,dnj): Implement recursive / embedded substream fetching if
// specified.
log.Warningf(c, "Annotation stream links LogDog substream [%+v], not supported!", t.AnnotationStream)
case *miloProto.Step_Substep_Step:
endedTime := google.TimeFromProto(t.Step.Ended)
if t.Step.Ended != nil && endedTime.After(latestEndedTime) {
latestEndedTime = endedTime
}
}
}
if latestEndedTime.IsZero() {
// No substep had an ended time :(
latestEndedTime = google.TimeFromProto(step.Started)
}
// Build our CachedStep.
as.cs = internal.CachedStep{
Step: &step,
Finished: (state.State.TerminalIndex >= 0 && le.StreamIndex == uint64(state.State.TerminalIndex)),
}
return as.cs.Step, nil
}
func (as *AnnotationStream) toMiloBuild(c context.Context) *resp.MiloBuild {
prefix, name := as.Path.Split()
// Prepare a Streams object with only one stream.
streams := Streams{
MainStream: &Stream{
Server: as.Client.Host,
Prefix: string(prefix),
Path: string(name),
IsDatagram: true,
Data: as.cs.Step,
Closed: as.cs.Finished,
},
}
var (
build resp.MiloBuild
ub = ViewerURLBuilder{
Host: as.Client.Host,
Project: as.Project,
Prefix: prefix,
}
)
AddLogDogToBuild(c, &ub, streams.MainStream.Data, &build)
return &build
}
// ViewerURLBuilder is a URL builder that constructs LogDog viewer URLs.
type ViewerURLBuilder struct {
Host string
Prefix types.StreamName
Project cfgtypes.ProjectName
}
// BuildLink implements URLBuilder.
func (b *ViewerURLBuilder) BuildLink(l *miloProto.Link) *resp.Link {
switch t := l.Value.(type) {
case *miloProto.Link_LogdogStream:
ls := t.LogdogStream
server := ls.Server
if server == "" {
server = b.Host
}
prefix := types.StreamName(ls.Prefix)
if prefix == "" {
prefix = b.Prefix
}
u := viewer.GetURL(server, b.Project, prefix.Join(types.StreamName(ls.Name)))
link := resp.NewLink(l.Label, u)
if link.Label == "" {
link.Label = ls.Name
}
return link
case *miloProto.Link_Url:
link := resp.NewLink(l.Label, t.Url)
if link.Label == "" {
link.Label = "unnamed"
}
return link
default:
// Don't know how to render.
return nil
}
}
// BuildID implements buildsource.ID.
type BuildID struct {
Host string
Project cfgtypes.ProjectName
Path types.StreamPath
}
// GetLog implements buildsource.ID.
func (b *BuildID) GetLog(context.Context, string) (string, bool, error) { panic("not implemented") }
// Get implements buildsource.ID.
func (b *BuildID) Get(c context.Context) (*resp.MiloBuild, error) {
as := AnnotationStream{
Project: b.Project,
Path: b.Path,
}
if err := as.Normalize(); err != nil {
return nil, err
}
// Setup our LogDog client.
var err error
if as.Client, err = NewClient(c, b.Host); err != nil {
return nil, errors.Annotate(err, "generating LogDog Client").Err()
}
// Load the Milo annotation protobuf from the annotation stream.
switch _, err := as.Fetch(c); errors.Unwrap(err) {
case nil, errNoEntries:
case coordinator.ErrNoSuchStream:
return nil, common.CodeNotFound.Tag().Apply(err)
case coordinator.ErrNoAccess:
return nil, common.CodeNoAccess.Tag().Apply(err)
case errNotMilo, errNotDatagram:
// The user requested a LogDog url that isn't a Milo annotation.
return nil, common.CodeParameterError.Tag().Apply(err)
default:
return nil, errors.Annotate(err, "failed to load stream").Err()
}
return as.toMiloBuild(c), nil
}
// NewBuildID generates a new un-validated BuildID.
func NewBuildID(host, project, path string) *BuildID {
return &BuildID{
strings.TrimSpace(host),
cfgtypes.ProjectName(project),
types.StreamPath(strings.Trim(path, "/")),
}
}