blob: 117f1f2f3fbe473c6fec2ad416fb1c296700d5e8 [file] [log] [blame]
// Copyright 2016 The LUCI Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package cli
import (
"context"
"io"
"os"
"time"
"go.chromium.org/luci/common/clock"
"go.chromium.org/luci/common/errors"
log "go.chromium.org/luci/common/logging"
"go.chromium.org/luci/logdog/api/logpb"
"go.chromium.org/luci/logdog/client/coordinator"
"go.chromium.org/luci/logdog/common/renderer"
"go.chromium.org/luci/logdog/common/types"
"github.com/maruel/subcommands"
)
type latestCommandRun struct {
subcommands.CommandRunBase
raw bool
}
func newLatestCommand() *subcommands.Command {
return &subcommands.Command{
UsageLine: "latest [options] stream",
ShortDesc: "Write the latest full log record in a stream to STDOUT.",
LongDesc: "Write the latest full log record in a stream to STDOUT. If the stream " +
"doesn't have any log entries, will block until a log entry is available.",
CommandRun: func() subcommands.CommandRun {
cmd := &latestCommandRun{}
cmd.Flags.BoolVar(&cmd.raw, "raw", false,
"Reproduce original log stream, instead of attempting to render for humans.")
return cmd
},
}
}
func (cmd *latestCommandRun) Run(scApp subcommands.Application, args []string, _ subcommands.Env) int {
a := scApp.(*application)
// User-friendly: trim any leading or trailing slashes from the path.
if len(args) != 1 {
log.Errorf(a, "Exactly one argument, the stream path, must be supplied.")
return 1
}
var addr *types.StreamAddr
var err error
if addr, err = types.ParseURL(args[0]); err != nil {
// Not a log stream address.
project, path, _, err := a.splitPath(args[0])
if err != nil {
log.WithError(err).Errorf(a, "Invalid path specifier.")
return 1
}
addr = &types.StreamAddr{Project: project, Path: types.StreamPath(path)}
if err := addr.Path.Validate(); err != nil {
log.Fields{
log.ErrorKey: err,
"project": addr.Project,
"path": addr.Path,
}.Errorf(a, "Invalid command-line stream path.")
return 1
}
}
coord, err := a.coordinatorClient(addr.Host)
if err != nil {
errors.Log(a, errors.Annotate(err, "failed to create Coordinator client").Err())
return 1
}
stream := coord.Stream(addr.Project, addr.Path)
tctx, _ := a.timeoutCtx(a)
le, st, err := cmd.getTailEntry(tctx, stream)
if err != nil {
log.Fields{
log.ErrorKey: err,
"project": addr.Project,
"path": addr.Path,
}.Errorf(a, "Failed to load latest record.")
if err == context.DeadlineExceeded {
return 2
}
return 1
}
// Render the entry.
r := renderer.Renderer{
Source: &renderer.StaticSource{le},
Raw: cmd.raw,
DatagramWriter: getDatagramWriter(a, &st.Desc),
}
if _, err := io.Copy(os.Stdout, &r); err != nil {
log.WithError(err).Errorf(a, "failed to write to output")
return 1
}
return 0
}
func (cmd *latestCommandRun) getTailEntry(c context.Context, s *coordinator.Stream) (
*logpb.LogEntry, *coordinator.LogStream, error) {
// Loop until we either hard fail or succeed.
var st coordinator.LogStream
delayTimer := clock.NewTimer(c)
defer delayTimer.Stop()
for {
ls, err := s.Tail(c, coordinator.Complete(), coordinator.WithState(&st))
// TODO(iannucci,dnj): use retry module + transient tags instead
delayTimer.Reset(5 * time.Second)
switch {
case err == nil:
return ls, &st, nil
case err == coordinator.ErrNoSuchStream, ls == nil:
log.WithError(err).Warningf(c, "No log entries, sleeping and retry.")
if ar := <-delayTimer.GetC(); ar.Incomplete() {
// Timer stopped prematurely.
return nil, nil, ar.Err
}
default:
return nil, nil, err
}
}
}