// Copyright 2015 The LUCI Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package logs

import (
	"context"
	"time"

	"google.golang.org/grpc/codes"
	"google.golang.org/grpc/status"
	"google.golang.org/protobuf/types/known/timestamppb"

	"go.chromium.org/luci/common/clock"
	"go.chromium.org/luci/common/errors"
	log "go.chromium.org/luci/common/logging"
	"go.chromium.org/luci/common/retry"
	"go.chromium.org/luci/common/retry/transient"
	logdog "go.chromium.org/luci/logdog/api/endpoints/coordinator/logs/v1"
	"go.chromium.org/luci/logdog/api/logpb"
	"go.chromium.org/luci/logdog/appengine/coordinator"
	"go.chromium.org/luci/logdog/appengine/coordinator/flex"
	"go.chromium.org/luci/logdog/common/storage"
	"go.chromium.org/luci/logdog/common/types"
	"go.chromium.org/luci/server/auth/realms"
)

const (
	// getInitialArraySize is the initial number of log slots to allocate for a
	// Get request.
	getInitialArraySize = 256

	// getBytesLimit is the maximum amount of data that we are willing to query.
	//
	// We limit byte responses to 16MB, based on the following constraints:
	//   - AppEngine cannot respond with more than 32MB of data. This includes
	//     JSON overhead such as notation and base64 data expansion.
	//   - `urlfetch`, which is used for Google Cloud Storage (archival)
	//     responses, cannot handle responses larger than 32MB.
	getBytesLimit = 16 * 1024 * 1024
)

// Get returns state and log data for a single log stream.
func (s *server) Get(c context.Context, req *logdog.GetRequest) (*logdog.GetResponse, error) {
	return s.getImpl(c, req, false)
}

// Tail returns the last log entry for a given log stream.
func (s *server) Tail(c context.Context, req *logdog.TailRequest) (*logdog.GetResponse, error) {
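	// Tail is a thin wrapper over getImpl: it issues a Get request with no
	// index or count, and the tail flag selects the tail-fetch path.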
	r := logdog.GetRequest{
		Project: req.Project,
		Path:    req.Path,
		State:   req.State,
	}
	return s.getImpl(c, &r, true)
}

// getImpl is common code shared between the Get and Tail endpoints.
func (s *server) getImpl(c context.Context, req *logdog.GetRequest, tail bool) (*logdog.GetResponse, error) {
	log.Fields{
		"project": req.Project,
		"path":    req.Path,
		"index":   req.Index,
		"tail":    tail,
	}.Debugf(c, "Received get request.")

	// Fetch the stream metadata, including its state.
	fetcher := coordinator.MetadataFetcher{
		Path:      types.StreamPath(req.Path),
		WithState: true,
	}
	if err := fetcher.FetchWithACLCheck(c); err != nil {
		return nil, err
	}

	// If this log stream is Purged and we're not an admin, pretend it doesn't
	// exist.
	if fetcher.Stream.Purged {
		switch yes, err := coordinator.CheckAdminUser(c); {
		case err != nil:
			return nil, status.Error(codes.Internal, "internal server error")
		case !yes:
			log.Warningf(c, "Non-superuser requested purged log.")
			return nil, status.Errorf(codes.NotFound, "path not found")
		}
	}
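
	// Assemble the response, recording the stream's realm (without the project
	// prefix) when the prefix has one.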
	resp := logdog.GetResponse{Project: req.Project}
	if fetcher.Prefix.Realm != "" {
		_, resp.Realm = realms.Split(fetcher.Prefix.Realm)
	}
	if req.State {
		resp.State = buildLogStreamState(fetcher.Stream, fetcher.State)
		var err error
		resp.Desc, err = fetcher.Stream.DescriptorProto()
		if err != nil {
			log.WithError(err).Errorf(c, "Failed to deserialize descriptor protobuf.")
			return nil, status.Error(codes.Internal, "internal server error")
		}
	}

	// Retrieve the requested logs from storage, if any were requested.
	startTime := clock.Now(c)
	if err := s.getLogs(c, req, &resp, tail, fetcher.Stream, fetcher.State); err != nil {
		log.WithError(err).Errorf(c, "Failed to get logs.")
		return nil, status.Error(codes.Internal, "internal server error")
	}

	log.Fields{
		"duration": clock.Now(c).Sub(startTime).String(),
		"logCount": len(resp.Logs),
	}.Debugf(c, "Get request completed successfully.")
	return &resp, nil
}

func (s *server) getLogs(c context.Context, req *logdog.GetRequest, resp *logdog.GetResponse,
	tail bool, ls *coordinator.LogStream, lst *coordinator.LogStreamState) error {

	// Identify our URL signing parameters.
	var signingRequest coordinator.URLSigningRequest
	if sr := req.GetSignedUrls; sr != nil {
		signingRequest.Lifetime = sr.Lifetime.AsDuration()
		signingRequest.Stream = sr.Stream
		signingRequest.Index = sr.Index
	}

	if !tail && req.LogCount < 0 && !signingRequest.HasWork() {
		// No log operations are actually needed, so don't bother instantiating
		// our Storage instance only to do nothing.
		return nil
	}
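
	// Obtain the storage instance for this stream, closing it when we're done.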
	svc := flex.GetServices(c)
	st, err := svc.StorageForStream(c, lst, coordinator.Project(c))
	if err != nil {
		return errors.Annotate(err, "failed to create storage instance").Err()
	}
	defer st.Close()

	project, path := coordinator.Project(c), ls.Path()
	if tail {
		resp.Logs, err = getTail(c, st, project, path)
	} else if req.LogCount >= 0 {
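		// Clamp the caller-supplied byte limit to the server-side maximum.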
		byteLimit := int(req.ByteCount)
		if byteLimit <= 0 || byteLimit > getBytesLimit {
			byteLimit = getBytesLimit
		}
		resp.Logs, err = getHead(c, req, st, project, path, byteLimit)
	}
	if err != nil {
		log.WithError(err).Errorf(c, "Failed to fetch log records.")
		return err
	}

	// If signed URLs were requested, try to get those too.
	if signingRequest.HasWork() {
		signedURLs, err := st.GetSignedURLs(c, &signingRequest)
		switch {
		case err != nil:
			return errors.Annotate(err, "failed to generate signed URL").Err()
		case signedURLs == nil:
			log.Debugf(c, "Signed URL was requested, but is not supported by storage.")
		default:
			resp.SignedUrls = &logdog.GetResponse_SignedUrls{
				Expiration: timestamppb.New(signedURLs.Expiration),
				Stream:     signedURLs.Stream,
				Index:      signedURLs.Index,
			}
		}
	}
	return nil
}

func getHead(c context.Context, req *logdog.GetRequest, st coordinator.SigningStorage, project string,
	path types.StreamPath, byteLimit int) ([]*logpb.LogEntry, error) {

	log.Fields{
		"project":       project,
		"path":          path,
		"index":         req.Index,
		"count":         req.LogCount,
		"bytes":         req.ByteCount,
		"noncontiguous": req.NonContiguous,
	}.Debugf(c, "Issuing Get request.")

	// Allocate the result logs array, capping the initial allocation.
	logCount := int(req.LogCount)
	asz := getInitialArraySize
	if logCount > 0 && logCount < asz {
		asz = logCount
	}
	logs := make([]*logpb.LogEntry, 0, asz)

	sreq := storage.GetRequest{
		Project: project,
		Path:    path,
		Index:   types.MessageIndex(req.Index),
		Limit:   logCount,
	}

	var ierr error
	count := 0
	err := retry.Retry(c, transient.Only(retry.Default), func() error {
		// Issue the Get request. This may return a transient error, in which
		// case we will retry.
		return st.Get(c, sreq, func(e *storage.Entry) bool {
			var le *logpb.LogEntry
			if le, ierr = e.GetLogEntry(); ierr != nil {
				return false
			}
			if count > 0 && byteLimit-len(e.D) < 0 {
				// Not the first log, and we've exceeded our byte limit.
				return false
			}

			sidx, _ := e.GetStreamIndex() // GetLogEntry succeeded, so this must too.
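			// Unless the caller accepts non-contiguous results, stop at the
			// first gap in the stream.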
			if !(req.NonContiguous || sidx == sreq.Index) {
				return false
			}
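
			// Accept the entry, then advance the index and byte budget so a
			// retry resumes after the last buffered log.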
			logs = append(logs, le)
			sreq.Index = sidx + 1
			byteLimit -= len(e.D)
			count++
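
			// Stop iterating once we've collected the requested number of logs.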
			return !(logCount > 0 && count >= logCount)
		})
	}, func(err error, delay time.Duration) {
		log.Fields{
			log.ErrorKey:   err,
			"delay":        delay,
			"initialIndex": req.Index,
			"nextIndex":    sreq.Index,
			"count":        len(logs),
		}.Warningf(c, "Transient error while loading logs; retrying.")
	})

	switch err {
	case nil:
		if ierr != nil {
			log.WithError(ierr).Errorf(c, "Bad log entry data.")
			return nil, ierr
		}
		return logs, nil

	case storage.ErrDoesNotExist:
		return nil, nil

	default:
		log.Fields{
			log.ErrorKey:   err,
			"initialIndex": req.Index,
			"nextIndex":    sreq.Index,
			"count":        len(logs),
		}.Errorf(c, "Failed to execute range request.")
		return nil, err
	}
}

func getTail(c context.Context, st coordinator.SigningStorage, project string, path types.StreamPath) (
	[]*logpb.LogEntry, error) {

	log.Fields{
		"project": project,
		"path":    path,
	}.Debugf(c, "Issuing Tail request.")
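
	// Fetch the last entry in the stream, retrying on transient errors.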
	var e *storage.Entry
	err := retry.Retry(c, transient.Only(retry.Default), func() (err error) {
		e, err = st.Tail(c, project, path)
		return
	}, func(err error, delay time.Duration) {
		log.Fields{
			log.ErrorKey: err,
			"delay":      delay,
		}.Warningf(c, "Transient error while fetching tail log; retrying.")
	})

	switch err {
	case nil:
		le, err := e.GetLogEntry()
		if err != nil {
			log.WithError(err).Errorf(c, "Failed to load tail entry data.")
			return nil, err
		}
		return []*logpb.LogEntry{le}, nil

	case storage.ErrDoesNotExist:
		return nil, nil

	default:
		log.WithError(err).Errorf(c, "Failed to fetch tail log.")
		return nil, err
	}
}