blob: dc23cb9d7f6c3a99f8fa029170f1572f2219e6d9 [file] [log] [blame]
// Copyright 2015 The Chromium Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
package main
import (
"context"
"encoding/json"
"flag"
"fmt"
"io"
"os"
"os/signal"
"runtime"
"strings"
"time"
"cloud.google.com/go/compute/metadata"
"github.com/maruel/subcommands"
"go.chromium.org/luci/auth"
"go.chromium.org/luci/auth/client/authcli"
"go.chromium.org/luci/client/versioncli"
"go.chromium.org/luci/common/cli"
"go.chromium.org/luci/common/clock"
"go.chromium.org/luci/common/data/rand/mathrand"
"go.chromium.org/luci/common/logging"
"go.chromium.org/luci/common/logging/gologger"
"go.chromium.org/luci/common/tsmon"
"go.chromium.org/luci/common/tsmon/target"
"infra/libs/infraenv"
"infra/tools/cloudtail"
)
// Where to look for service account JSON creds if not provided via CLI.
const (
defaultServiceAccountNewMac = "/opt/creds/service_accounts/service-account-cloudtail.json"
defaultServiceAccountPosix = "/creds/service_accounts/service-account-cloudtail.json"
defaultServiceAccountWin = "C:\\creds\\service_accounts\\service-account-cloudtail.json"
)
////////////////////////////////////////////////////////////////////////////////
// Common functions and structs.
type commonOptions struct {
authFlags authcli.Flags
tsmonFlags tsmon.Flags
localLogLevel logging.Level
localLogFile string
flushTimeout time.Duration
bufferingTime time.Duration
debug bool
teeToStdout bool
projectID string
resourceType string
resourceID string
logID string
}
type state struct {
id cloudtail.ClientID
client cloudtail.Client
buffer cloudtail.PushBuffer
cleanups []func()
}
func (s *state) cleanup() {
for _, f := range s.cleanups {
f()
}
}
// registerFlags adds all CLI flags to the flag set.
func (opts *commonOptions) registerFlags(f *flag.FlagSet, defaultAuthOpts auth.Options, defaultAutoFlush bool) {
// Default log level.
opts.localLogLevel = logging.Warning
opts.authFlags.Register(f, defaultAuthOpts)
f.Var(&opts.localLogLevel, "local-log-level",
"The logging level of local logger (for cloudtail own logs): debug, info, warning, error")
f.StringVar(&opts.localLogFile, "local-log-file", "",
"If set, local logger (for cloudtail own logs) will log into given file. Else, logs to stderr")
f.DurationVar(&opts.flushTimeout, "flush-timeout", 5*time.Second,
"How long to wait for all pending data to be flushed when exiting")
f.DurationVar(&opts.bufferingTime, "buffering-time", cloudtail.DefaultFlushTimeout,
"How long to buffer a log line before flushing it (larger values improve batching at the cost of latency)")
f.BoolVar(&opts.teeToStdout, "tee-to-stdout", false,
"If set, will reprint all consumed input to stdout (in addition to sending it to Cloud Logging)")
f.BoolVar(&opts.debug, "debug", false,
"If set, will print Cloud Logging calls to stdout instead of sending them")
f.StringVar(&opts.projectID, "project-id", "", "Cloud project ID to push logs to")
f.StringVar(&opts.resourceType, "resource-type", "machine", "What kind of entity produces the log (e.g. 'master')")
f.StringVar(&opts.resourceID, "resource-id", "", "Identifier of the entity producing the log")
f.StringVar(&opts.logID, "log-id", "default", "ID of the log")
opts.tsmonFlags = tsmon.NewFlags()
opts.tsmonFlags.Target.TargetType = target.TaskType
opts.tsmonFlags.Target.TaskServiceName = "cloudtail"
if defaultAutoFlush {
opts.tsmonFlags.Flush = "auto"
}
opts.tsmonFlags.Register(f)
}
// processFlags validates flags, creates and configures logger, client, etc.
func (opts *commonOptions) processFlags(ctx context.Context) (context.Context, state, error) {
state := state{}
// Logger.
// gologger.StdConfig to stderr is already configured in getApplication.
if opts.localLogFile != "" {
f, err := os.OpenFile(opts.localLogFile, os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0644)
if err != nil {
return ctx, state, fmt.Errorf("Local log file %q not writable: %s", opts.localLogFile, err)
}
ctx = (&gologger.LoggerConfig{
Out: f,
}).Use(ctx)
state.cleanups = append(state.cleanups, func() { _ = f.Close() })
}
logging.SetLevel(ctx, opts.localLogLevel)
// Auth options.
authOpts, err := opts.authFlags.Options()
if err != nil {
return ctx, state, err
}
if opts.projectID == "" {
if authOpts.ServiceAccountJSONPath != "" {
opts.projectID = projectIDFromServiceAccountJSON(authOpts.ServiceAccountJSONPath)
if opts.projectID != "" {
logging.Debugf(ctx, "Guessed project ID from the service account JSON: %s", opts.projectID)
}
}
if opts.projectID == "" {
return ctx, state, fmt.Errorf("-project-id is required")
}
}
// Tsmon options.
if opts.tsmonFlags.Target.TaskJobName == "" {
opts.tsmonFlags.Target.TaskJobName = fmt.Sprintf(
"%s-%s-%s", opts.logID, opts.resourceType, opts.resourceID)
}
if err = tsmon.InitializeFromFlags(ctx, &opts.tsmonFlags); err != nil {
return ctx, state, err
}
state.cleanups = append(state.cleanups, func() { tsmon.Shutdown(ctx) })
// Client.
httpClient, err := auth.NewAuthenticator(ctx, auth.SilentLogin, authOpts).Client()
if err != nil {
return ctx, state, err
}
state.id = cloudtail.ClientID{
ResourceType: opts.resourceType,
ResourceID: opts.resourceID,
LogID: opts.logID,
}
state.client, err = cloudtail.NewClient(cloudtail.ClientOptions{
ClientID: state.id,
Client: httpClient,
ProjectID: opts.projectID,
Debug: opts.debug,
})
if err != nil {
return ctx, state, err
}
// Buffer.
state.buffer = cloudtail.NewPushBuffer(cloudtail.PushBufferOptions{
Client: state.client,
FlushTimeout: opts.bufferingTime,
})
return ctx, state, nil
}
// defaultServiceAccountJSON returns path to a default service account
// credentials file if it exists.
func defaultServiceAccountJSONPath() string {
path := defaultServiceAccountPosix
switch runtime.GOOS {
case "windows":
path = defaultServiceAccountWin
case "darwin":
if _, err := os.Stat(defaultServiceAccountNewMac); err == nil {
path = defaultServiceAccountNewMac
}
}
// Ensure its readable by opening it.
f, err := os.Open(path)
if err != nil {
return ""
}
f.Close()
return path
}
// projectIDFromServiceAccountJSON extracts Cloud Project ID from the service
// account JSON.
//
// It tries to use 'project_id' key, if present, and falls back to email
// parsing otherwise (for old JSON files that don't have project_id field, but
// use "<projectid>-stuff@developer.gserviceaccount.com" email format).
//
// Returns empty string if can't do it.
func projectIDFromServiceAccountJSON(path string) string {
if path == auth.GCEServiceAccount {
p, err := metadata.ProjectID()
if err != nil {
return ""
}
return p
}
f, err := os.Open(path)
if err != nil {
return ""
}
defer f.Close()
var sa struct {
ProjectID string `json:"project_id"`
ClientEmail string `json:"client_email"`
}
if err := json.NewDecoder(f).Decode(&sa); err != nil {
return ""
}
if sa.ProjectID != "" {
return sa.ProjectID
}
// Expected form: <projectid>-stuff@developer.gserviceaccount.com.
chunks := strings.Split(sa.ClientEmail, "@")
if len(chunks) != 2 || chunks[1] != "developer.gserviceaccount.com" {
return ""
}
chunks = strings.Split(chunks[0], "-")
if len(chunks) != 2 {
return ""
}
return chunks[0]
}
func catchCtrlC(handler func() error) {
ctrlC := make(chan os.Signal, 1)
signal.Notify(ctrlC, os.Interrupt)
go func() {
stopCalled := false
for range ctrlC {
if !stopCalled {
stopCalled = true
fmt.Fprintln(os.Stderr, "\nCaught Ctrl+C, flushing and exiting... Send another Ctrl+C to kill.")
if err := handler(); err != nil {
fmt.Fprintln(os.Stderr, "\n", err)
}
} else {
os.Exit(2)
}
}
}()
}
////////////////////////////////////////////////////////////////////////////////
// 'send' subcommand: sends a single line passed as CLI argument.
func cmdSend(defaultAuthOpts auth.Options) *subcommands.Command {
return &subcommands.Command{
UsageLine: "send [options] -severity SEVERITY -text TEXT",
ShortDesc: "sends a single entry to a cloud log",
LongDesc: "Sends a single entry to a cloud log.",
CommandRun: func() subcommands.CommandRun {
c := &sendRun{}
c.commonOptions.registerFlags(&c.Flags, defaultAuthOpts, false)
c.Flags.Var(&c.severity, "severity", "Log entry severity")
c.Flags.StringVar(&c.text, "text", "", "Log entry to send")
return c
},
}
}
type sendRun struct {
subcommands.CommandRunBase
commonOptions
severity cloudtail.Severity
text string
}
func (c *sendRun) Run(a subcommands.Application, args []string, env subcommands.Env) int {
if len(args) != 0 {
fmt.Fprintf(os.Stderr, "Cloudtail send doesn't accept positional command line arguments %q\n", args)
return 1
}
if c.text == "" {
fmt.Fprintln(os.Stderr, "-text is required")
return 1
}
ctx, state, err := c.commonOptions.processFlags(cli.GetContext(a, c, env))
if err != nil {
fmt.Fprintln(os.Stderr, err.Error())
return 1
}
defer tsmon.Shutdown(ctx)
if c.teeToStdout {
fmt.Println(c.text)
}
// Sending one item shouldn't involve any buffering. So make SIGINT
// (or timeout) abort the whole pipeline right away.
ctx, abort := context.WithCancel(ctx)
catchCtrlC(func() error {
abort()
return nil
})
ctx, _ = clock.WithTimeout(ctx, c.flushTimeout)
state.buffer.Start(ctx)
state.buffer.Send(ctx, cloudtail.Entry{
Timestamp: time.Now(),
Severity: c.severity,
TextPayload: c.text,
})
if err := state.buffer.Stop(ctx); err != nil {
fmt.Fprintln(os.Stderr, err.Error())
return 1
}
return 0
}
////////////////////////////////////////////////////////////////////////////////
// 'pipe' subcommand: reads stdin and sends each line as a separate log entry.
func cmdPipe(defaultAuthOpts auth.Options) *subcommands.Command {
return &subcommands.Command{
UsageLine: "pipe [options]",
ShortDesc: "sends each line of stdin as a separate log entry",
LongDesc: "Sends each line of stdin as a separate log entry",
CommandRun: func() subcommands.CommandRun {
c := &pipeRun{}
c.commonOptions.registerFlags(&c.Flags, defaultAuthOpts, true)
c.Flags.IntVar(&c.lineBufferSize, "line-buffer-size", 100000,
"Number of log lines to buffer in-memory. 0 disables buffering and "+
"makes cloudtail block while flushing lines to the API.")
return c
},
}
}
type pipeRun struct {
subcommands.CommandRunBase
commonOptions
lineBufferSize int
}
func (c *pipeRun) Run(a subcommands.Application, args []string, env subcommands.Env) int {
if len(args) != 0 {
fmt.Fprintf(os.Stderr, "Cloudtail pipe doesn't accept positional command line arguments %q\n", args)
return 1
}
ctx, state, err := c.commonOptions.processFlags(cli.GetContext(a, c, env))
if err != nil {
fmt.Fprintln(os.Stderr, err)
return 1
}
defer tsmon.Shutdown(ctx)
var input io.Reader = os.Stdin
if c.teeToStdout {
input = io.TeeReader(input, os.Stdout)
}
// We need to wrap stdin in a io.Pipe to be able to prematurely abort reads on
// SIGINT. There seem to be no reliable way of aborting pending os.Stdin read
// on Linux. Closing the file descriptor doesn't work. So on SIGINT we keep
// the blocked stdin read hanging in the goroutine, but forcefully close the
// write end of the pipe to shutdown cloudtail.PipeReader below. Eventually
// stdin read unblocks, finds a closed io.Pipe and aborts the goroutine.
pipeR, pipeW := io.Pipe()
go func() {
defer pipeW.Close()
io.Copy(pipeW, input)
}()
catchCtrlC(pipeW.Close)
pipeReader := cloudtail.PipeReader{
ClientID: state.id,
Source: pipeR,
PushBuffer: state.buffer,
Parser: cloudtail.StdParser(),
LineBufferSize: c.lineBufferSize,
}
// On EOF (which also happens on SIGINT) start a countdown that will abort
// the context and unblock everything even if some data wasn't sent.
ctx, abort := context.WithCancel(ctx)
pipeReader.OnEOF = func() {
logging.Debugf(ctx, "EOF detected, aborting in %s", c.flushTimeout)
go func() {
time.Sleep(c.flushTimeout)
abort()
}()
}
state.buffer.Start(ctx)
err1 := pipeReader.Run(ctx)
if err1 != nil {
fmt.Fprintln(os.Stderr, err1)
}
err2 := state.buffer.Stop(ctx)
if err2 != nil {
fmt.Fprintln(os.Stderr, err2)
}
if err1 != nil || err2 != nil {
return 1
}
return 0
}
////////////////////////////////////////////////////////////////////////////////
// 'tail' subcommand: tails a file and sends each line as a log entry.
func cmdTail(defaultAuthOpts auth.Options) *subcommands.Command {
return &subcommands.Command{
UsageLine: "tail [options] -path PATH",
ShortDesc: "tails a file and sends each line as a log entry",
LongDesc: "Tails a file and sends each line as a log entry. Stops by SIGINT.",
CommandRun: func() subcommands.CommandRun {
c := &tailRun{}
c.commonOptions.registerFlags(&c.Flags, defaultAuthOpts, true)
c.Flags.StringVar(&c.path, "path", "", "Path to a file to tail")
return c
},
}
}
type tailRun struct {
subcommands.CommandRunBase
commonOptions
path string
}
func (c *tailRun) Run(a subcommands.Application, args []string, env subcommands.Env) int {
if len(args) != 0 {
fmt.Fprintf(os.Stderr, "Cloudtail tail doesn't accept positional command line arguments %q\n", args)
return 1
}
if c.path == "" {
fmt.Fprintln(os.Stderr, "-path is required")
return 1
}
ctx, state, err := c.commonOptions.processFlags(cli.GetContext(a, c, env))
defer state.cleanup()
if err != nil {
fmt.Fprintln(os.Stderr, err)
return 1
}
var teeOutput io.Writer
if c.teeToStdout {
teeOutput = os.Stdout
}
ctx, abort := context.WithCancel(ctx)
defer abort()
state.buffer.Start(ctx)
tailer, err := cloudtail.NewTailer(cloudtail.TailerOptions{
Path: c.path,
Parser: cloudtail.StdParser(),
TeeOutput: teeOutput,
PushBuffer: state.buffer,
SeekToEnd: true,
})
if err != nil {
fmt.Fprintln(os.Stderr, err)
return 1
}
// Send the stop signal through tailer.Stop to properly flush everything.
// In 'tail' mode, unlike 'pipe' mode, Ctrl+C is the only stop signal
// (there's no EOF). At the same time start a countdown that will abort
// the context and unblock everything even if some data wasn't sent.
catchCtrlC(func() error {
go func() {
time.Sleep(c.flushTimeout)
abort()
}()
tailer.Stop()
return nil
})
tailer.Run(ctx) // this will block until some time after tailer.Stop is called
// Wait until we flush everything or until the timeout. Second SIGINT
// kills the process immediately anyhow (see catchCtrlC).
if err := state.buffer.Stop(ctx); err != nil {
fmt.Fprintln(os.Stderr, err)
return 1
}
return 0
}
////////////////////////////////////////////////////////////////////////////////
func getApplication(defaultAuthOpts auth.Options) *cli.Application {
return &cli.Application{
Name: "cloudtail",
Title: "Tail logs and send them to Cloud Logging",
Context: func(ctx context.Context) context.Context {
return gologger.StdConfig.Use(ctx)
},
EnvVars: map[string]subcommands.EnvVarDefinition{
"CLOUDTAIL_DEBUG_EMULATE_429": {
Advanced: true,
ShortDesc: "DEBUG/TEST: Non-empty values emulate a server response of 'Too Many Requests'.",
},
},
Commands: []*subcommands.Command{
subcommands.CmdHelp,
versioncli.CmdVersion("cloudtail"),
// Main commands.
cmdSend(defaultAuthOpts),
cmdPipe(defaultAuthOpts),
cmdTail(defaultAuthOpts),
// Authentication related commands.
authcli.SubcommandInfo(defaultAuthOpts, "whoami", false),
authcli.SubcommandLogin(defaultAuthOpts, "login", false),
authcli.SubcommandLogout(defaultAuthOpts, "logout", false),
},
}
}
func main() {
mathrand.SeedRandomly()
authOpts := infraenv.DefaultAuthOptions()
authOpts.ServiceAccountJSONPath = defaultServiceAccountJSONPath()
authOpts.Scopes = []string{
auth.OAuthScopeEmail,
"https://www.googleapis.com/auth/logging.write",
}
os.Exit(subcommands.Run(getApplication(authOpts), nil))
}