blob: 93632aa195fd569b1c6a7093c099cee4994bea75 [file] [log] [blame]
// Copyright 2019 The LUCI Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
// Package server implements an environment for running LUCI servers.
//
// It interprets command line flags and initializes the serving environment with
// the following core services:
//
// • go.chromium.org/luci/common/logging: logging via Google Cloud Logging and
// error reporting via Google Cloud Error Reporting.
//
// • go.chromium.org/luci/common/trace: Tracing via Google Cloud Trace and
// profiling Google Cloud Profiler.
//
// • go.chromium.org/luci/server/tsmon: monitoring metrics via ProdX.
//
// • go.chromium.org/luci/server/auth: sending and receiving RPCs authenticated
// with Google OAuth2 or OpenID tokens. Support for authorization via LUCI
// groups and LUCI realms.
//
// • go.chromium.org/luci/server/caching: in-process caching.
//
// • go.chromium.org/luci/server/warmup: allows other server components to
// register warmup callbacks that run before the server starts handling
// requests.
//
// • go.chromium.org/luci/server/experiments: simple feature flags support.
//
// • go.chromium.org/luci/grpc/prpc: pRPC server and RPC Explorer UI.
//
// Other functionality is optional and provided by modules (objects implementing
// module.Module interface). They should be passed to the server when it starts
// (see the example below). Modules usually expose their configuration via
// command line flags, and provide functionality by injecting state into
// the server's global context.Context or by exposing pRPC endpoints.
//
// Usage example:
//
// import (
// ...
//
// "go.chromium.org/luci/server"
// "go.chromium.org/luci/server/gaeemulation"
// "go.chromium.org/luci/server/module"
// "go.chromium.org/luci/server/redisconn"
// )
//
// func main() {
// modules := []module.Module{
// gaeemulation.NewModuleFromFlags(),
// redisconn.NewModuleFromFlags(),
// }
// server.Main(nil, modules, func(srv *server.Server) error {
// // Initialize global state, change root context (if necessary).
// if err := initializeGlobalStuff(srv.Context); err != nil {
// return err
// }
// srv.Context = injectGlobalStuff(srv.Context)
//
// // Install regular HTTP routes.
// srv.Routes.GET("/", nil, func(c *router.Context) {
// // ...
// })
//
// // Install pRPC services.
// servicepb.RegisterSomeServer(srv.PRPC, &SomeServer{})
// return nil
// })
// }
//
// More examples can be found in the code search: https://source.chromium.org/search?q=%22server.Main(nil,%20modules,%22
//
// Known modules
//
// The following modules (in alphabetical order) are a part of the LUCI
// repository and can be used in any server binary:
//
// • go.chromium.org/luci/config/server/cfgmodule: provides LUCI Config client,
// exposes config validation endpoints used by LUCI Config service.
//
// • go.chromium.org/luci/server/bqlog: implements best effort low-overhead
// structured logging to BigQuery suitable for debug data like access logs.
//
// • go.chromium.org/luci/server/cron: allows registering Cloud Scheduler (aka
// Appengine cron.yaml) handlers, with proper authentication and monitoring
// metrics.
//
// • go.chromium.org/luci/server/encryptedcookies: implements an authentication
// scheme for HTTP routes based on encrypted cookies and user sessions in
// some session store.
//
// • go.chromium.org/luci/server/dsmapper: provides a way to apply some function
// to all datastore entities of some particular kind, in parallel, distributing
// work via Cloud Tasks.
//
// • go.chromium.org/luci/server/gaeemulation: implements
// go.chromium.org/luci/gae Datastore interface via Google Cloud Datastore API.
// Named so because because it enables migration of GAEv1 apps to GAEv2 without
// touching datastore-related code.
//
// • go.chromium.org/luci/server/limiter: a simple load shedding mechanism that
// puts a limit on a number of concurrent gRPC/pRPC requests the server is
// handling.
//
// • go.chromium.org/luci/server/redisconn: a Redis client. Also enables Redis
// as a caching backend for go.chromium.org/luci/server/caching and for
// go.chromium.org/luci/gae/filter/dscache.
//
// • go.chromium.org/luci/server/secrets: enables generation and validation of
// HMAC-tagged tokens via go.chromium.org/luci/server/tokens.
//
// • go.chromium.org/luci/server/span: a Cloud Spanner client. Wraps Spanner API
// a bit to improve interoperability with other modules (in particular the TQ
// module).
//
// • go.chromium.org/luci/server/tq: implements a task queue mechanism on top of
// Cloud Tasks and Cloud PubSub. Also implements transactional task enqueuing
// when submitting tasks in a Cloud Datastore or a Cloud Spanner transaction.
//
// Most of them need to be configured via corresponding CLI flags to be useful.
// See implementation of individual modules for details.
//
// An up-to-date list of all known module implementations can be found here:
// https://source.chromium.org/search?q=%22NewModuleFromFlags()%20module.Module%22
//
// pRPC services
//
// The server.PRPC field is the primary grpc.ServiceRegistrar that should be
// used to expose server's public gRPC/pRPC APIs. It is pre-configured with a
// set of gRPC interceptors that collect performance metrics, catch panics and
// authenticate requests using OAuth2 access tokens. Modules can add more
// interceptors to the default interceptor chain.
//
// Security considerations
//
// The expected deployment environments are Kubernetes, Google App Engine and
// Google Cloud Run. In all cases the server is expected to be behind a load
// balancer (or a series of load balancers) that terminates TLS and sets
// X-Forwarded-For header as:
//
// [<untrusted part>,]<IP that connected to the LB>,<unimportant>[,<more>].
//
// Where <untrusted part> may be present if the original request from the
// Internet comes with X-Forwarded-For header. The IP specified there is not
// trusted, but the server assumes the load balancer at least sanitizes the
// format of this field.
//
// <IP that connected to the LB> is the end-client IP that can be used by the
// server for logs and for IP-allowlist checks.
//
// <unimportant> is "global forwarding rule external IP" for GKE or
// the constant "169.254.1.1" for GAE. It is unused. See
// https://cloud.google.com/load-balancing/docs/https for more info.
//
// <more> may be present if the request was proxied through more layers of
// load balancers while already inside the cluster. The server currently assumes
// this is not happening (i.e. <more> is absent, or, in other words, the client
// IP is second to last in the X-Forwarded-For list). If you need to recognize
// more layers of load balancing, please file a feature request to add a CLI
// flag specifying how many layers of load balancers to skip to get to the
// original IP.
package server
import (
"context"
cryptorand "crypto/rand"
"crypto/sha256"
"encoding/binary"
"encoding/hex"
"flag"
"fmt"
"math/rand"
"net"
"net/http"
"net/http/pprof"
"os"
"runtime"
"strings"
"sync"
"sync/atomic"
"time"
"cloud.google.com/go/errorreporting"
credentials "cloud.google.com/go/iam/credentials/apiv1"
"cloud.google.com/go/profiler"
"golang.org/x/oauth2"
"google.golang.org/api/option"
credentialspb "google.golang.org/genproto/googleapis/iam/credentials/v1"
"google.golang.org/grpc"
"contrib.go.opencensus.io/exporter/stackdriver"
"go.opencensus.io/exporter/stackdriver/propagation"
octrace "go.opencensus.io/trace"
"go.chromium.org/luci/common/clock"
"go.chromium.org/luci/common/data/rand/mathrand"
"go.chromium.org/luci/common/errors"
luciflag "go.chromium.org/luci/common/flag"
"go.chromium.org/luci/common/iotools"
"go.chromium.org/luci/common/logging"
"go.chromium.org/luci/common/logging/gologger"
"go.chromium.org/luci/common/logging/sdlogger"
"go.chromium.org/luci/common/system/signals"
tsmoncommon "go.chromium.org/luci/common/tsmon"
"go.chromium.org/luci/common/tsmon/metric"
"go.chromium.org/luci/common/tsmon/monitor"
"go.chromium.org/luci/common/tsmon/target"
"go.chromium.org/luci/hardcoded/chromeinfra" // should be used ONLY in Main()
"go.chromium.org/luci/grpc/discovery"
"go.chromium.org/luci/grpc/grpcmon"
"go.chromium.org/luci/grpc/grpcutil"
"go.chromium.org/luci/grpc/prpc"
"go.chromium.org/luci/web/gowrappers/rpcexplorer"
clientauth "go.chromium.org/luci/auth"
"go.chromium.org/luci/server/auth"
"go.chromium.org/luci/server/auth/authdb"
"go.chromium.org/luci/server/auth/authdb/dump"
"go.chromium.org/luci/server/auth/signing"
"go.chromium.org/luci/server/caching"
"go.chromium.org/luci/server/experiments"
"go.chromium.org/luci/server/internal"
"go.chromium.org/luci/server/middleware"
"go.chromium.org/luci/server/module"
"go.chromium.org/luci/server/portal"
"go.chromium.org/luci/server/router"
"go.chromium.org/luci/server/secrets"
"go.chromium.org/luci/server/tsmon"
"go.chromium.org/luci/server/warmup"
)
const (
// Path of the health check endpoint.
healthEndpoint = "/healthz"
// Log a warning if health check is slower than this.
healthTimeLogThreshold = 50 * time.Millisecond
)
var (
versionMetric = metric.NewString(
"server/version",
"Version of the running container image (taken from -container-image-id).",
nil)
)
// cloudRegionFromGAERegion maps GAE region codes (e.g. `s`) to corresponding
// cloud regions (e.g. `us-central1`), which may be defined as regions where GAE
// creates resources associated with the app, such as Task Queues or Flex VMs.
//
// Sadly this mapping is not documented, thus the below map is incomplete. Feel
// free to modify it if you deployed to some new GAE region.
//
// This mapping is unused if `-cloud-region` flag is passed explicitly.
var cloudRegionFromGAERegion = map[string]string{
"e": "europe-west1",
"g": "europe-west2",
"h": "europe-west3",
"m": "us-west2",
"p": "us-east1",
"s": "us-central1",
}
// Main initializes the server and runs its serving loop until SIGTERM.
//
// Registers all options in the default flag set and uses `flag.Parse` to parse
// them. If 'opts' is nil, the default options will be used. Only flags are
// allowed in the command line (no positional arguments).
//
// Additionally recognizes GAE_* env vars as an indicator that the server is
// running on GAE. This slightly tweaks its behavior to match what GAE expects
// from servers.
//
// On errors, logs them and aborts the process with non-zero exit code.
func Main(opts *Options, mods []module.Module, init func(srv *Server) error) {
mathrand.SeedRandomly()
if opts == nil {
opts = &Options{
ClientAuth: chromeinfra.SetDefaultAuthOptions(clientauth.Options{
Scopes: auth.CloudOAuthScopes, // matters only when using UserCredentialsMethod
}),
}
}
opts.Register(flag.CommandLine)
flag.Parse()
opts.FromGAEEnv()
srv, err := New(context.Background(), *opts, mods)
if err != nil {
srv.Fatal(err)
}
if args := flag.Args(); len(args) > 0 {
srv.Fatal(errors.Reason("got unexpected positional command line arguments: %v", args).Err())
}
if init != nil {
if err = init(srv); err != nil {
srv.Fatal(err)
}
}
if err = srv.ListenAndServe(); err != nil {
srv.Fatal(err)
}
}
// Options are used to configure the server.
//
// Most of them are exposed as command line flags (see Register implementation).
// Some (mostly GAE-specific) are only settable through code or are derived from
// the environment.
type Options struct {
Prod bool // set when running in production (not on a dev workstation)
GAE bool // set when running on GAE, implies Prod
Hostname string // used for logging and metric fields, default is os.Hostname
HTTPAddr string // address to bind the main listening socket to
AdminAddr string // address to bind the admin socket to, ignored on GAE
DefaultRequestTimeout time.Duration // how long non-internal HTTP handlers are allowed to run, 1 min by default
InternalRequestTimeout time.Duration // how long "/internal/*" HTTP handlers are allowed to run, 10 min by default
ShutdownDelay time.Duration // how long to wait after SIGTERM before shutting down
ClientAuth clientauth.Options // base settings for client auth options
TokenCacheDir string // where to cache auth tokens (optional)
AuthDBPath string // if set, load AuthDB from a file
AuthServiceHost string // hostname of an Auth Service to use
AuthDBDump string // Google Storage path to fetch AuthDB dumps from
AuthDBSigner string // service account that signs AuthDB dumps
FrontendClientID string // OAuth2 ClientID for frontend (e.g. user sign in)
CloudProject string // name of the hosting Google Cloud Project
CloudRegion string // name of the hosting Google Cloud region
TraceSampling string // what portion of traces to upload to Stackdriver (ignored on GAE)
TsMonAccount string // service account to flush metrics as
TsMonServiceName string // service name of tsmon target
TsMonJobName string // job name of tsmon target
ProfilingDisable bool // set to true to explicitly disable Stackdriver Profiler
ProfilingServiceID string // service name to associated with profiles in Stackdriver Profiler
ContainerImageID string // ID of the container image with this binary, for logs (optional)
EnableExperiments []string // names of go.chromium.org/luci/server/experiments to enable
CloudErrorReporting bool // set to true to enable Cloud Error Reporting
testSeed int64 // used to seed rng in tests
testStdout sdlogger.LogEntryWriter // mocks stdout in tests
testStderr sdlogger.LogEntryWriter // mocks stderr in tests
testListeners map[string]net.Listener // addr => net.Listener, for tests
testAuthDB authdb.DB // AuthDB to use in tests
testDisableTracing bool // don't install a tracing backend
}
// Register registers the command line flags.
func (o *Options) Register(f *flag.FlagSet) {
if o.HTTPAddr == "" {
o.HTTPAddr = "127.0.0.1:8800"
}
if o.AdminAddr == "" {
o.AdminAddr = "127.0.0.1:8900"
}
if o.DefaultRequestTimeout == 0 {
o.DefaultRequestTimeout = time.Minute
}
if o.InternalRequestTimeout == 0 {
o.InternalRequestTimeout = 10 * time.Minute
}
if o.ShutdownDelay == 0 {
o.ShutdownDelay = 15 * time.Second
}
f.BoolVar(&o.Prod, "prod", o.Prod, "Switch the server into production mode")
f.StringVar(&o.HTTPAddr, "http-addr", o.HTTPAddr, "Address to bind the main listening socket to or '-' to disable")
f.StringVar(&o.AdminAddr, "admin-addr", o.AdminAddr, "Address to bind the admin socket to or '-' to disable")
f.DurationVar(&o.DefaultRequestTimeout, "default-request-timeout", o.DefaultRequestTimeout, "How long incoming requests are allowed to run before being canceled (or 0 for infinity)")
f.DurationVar(&o.InternalRequestTimeout, "internal-request-timeout", o.InternalRequestTimeout, "How long incoming /internal/* requests are allowed to run before being canceled (or 0 for infinity)")
f.DurationVar(&o.ShutdownDelay, "shutdown-delay", o.ShutdownDelay, "How long to wait after SIGTERM before shutting down")
f.StringVar(
&o.ClientAuth.ServiceAccountJSONPath,
"service-account-json",
o.ClientAuth.ServiceAccountJSONPath,
"Path to a JSON file with service account private key",
)
f.StringVar(
&o.ClientAuth.ActAsServiceAccount,
"act-as",
o.ClientAuth.ActAsServiceAccount,
"Act as this service account",
)
f.StringVar(
&o.TokenCacheDir,
"token-cache-dir",
o.TokenCacheDir,
"Where to cache auth tokens (optional)",
)
f.StringVar(
&o.AuthDBPath,
"auth-db-path",
o.AuthDBPath,
"If set, load AuthDB text proto from this file (incompatible with -auth-service-host)",
)
f.StringVar(
&o.AuthServiceHost,
"auth-service-host",
o.AuthServiceHost,
"Hostname of an Auth Service to use (incompatible with -auth-db-path)",
)
f.StringVar(
&o.AuthDBDump,
"auth-db-dump",
o.AuthDBDump,
"Google Storage path to fetch AuthDB dumps from. Default is gs://<auth-service-host>/auth-db",
)
f.StringVar(
&o.AuthDBSigner,
"auth-db-signer",
o.AuthDBSigner,
"Service account that signs AuthDB dumps. Default is derived from -auth-service-host if it is *.appspot.com",
)
f.StringVar(
&o.FrontendClientID,
"frontend-client-id",
o.FrontendClientID,
"OAuth2 clientID for use in frontend, e.g. for user sign in (optional)",
)
f.StringVar(
&o.CloudProject,
"cloud-project",
o.CloudProject,
"Name of hosting Google Cloud Project (optional)",
)
f.StringVar(
&o.CloudRegion,
"cloud-region",
o.CloudRegion,
"Name of hosting Google Cloud region, e.g. 'us-central1' (optional)",
)
f.StringVar(
&o.TraceSampling,
"trace-sampling",
o.TraceSampling,
"What portion of traces to upload to Stackdriver. Either a percent (i.e. '0.1%') or a QPS (i.e. '1qps'). Ignored on GAE. Default is 0.1qps.",
)
f.StringVar(
&o.TsMonAccount,
"ts-mon-account",
o.TsMonAccount,
"Collect and flush tsmon metrics using this account for auth (disables tsmon if not set)",
)
f.StringVar(
&o.TsMonServiceName,
"ts-mon-service-name",
o.TsMonServiceName,
"Service name of tsmon target (disables tsmon if not set)",
)
f.StringVar(
&o.TsMonJobName,
"ts-mon-job-name",
o.TsMonJobName,
"Job name of tsmon target (disables tsmon if not set)",
)
f.BoolVar(
&o.ProfilingDisable,
"profiling-disable",
o.ProfilingDisable,
"Pass to explicitly disable Stackdriver Profiler",
)
f.StringVar(
&o.ProfilingServiceID,
"profiling-service-id",
o.ProfilingServiceID,
"Service name to associated with profiles in Stackdriver Profiler. Defaults to the value of -ts-mon-job-name.",
)
f.StringVar(
&o.ContainerImageID,
"container-image-id",
o.ContainerImageID,
"ID of the container image with this binary, for logs (optional)",
)
f.BoolVar(
&o.CloudErrorReporting,
"cloud-error-reporting",
o.CloudErrorReporting,
"Enable Cloud Error Reporting",
)
// See go.chromium.org/luci/server/experiments.
f.Var(luciflag.StringSlice(&o.EnableExperiments), "enable-experiment",
`A name of the experiment to enable. May be repeated.`)
}
// FromGAEEnv uses the GAE_* env vars to configure the server for the GAE
// environment.
//
// Does nothing if GAE_VERSION is not set.
//
// Equivalent to passing the following flags:
// -prod
// -http-addr 0.0.0.0:${PORT}
// -admin-addr -
// -shutdown-delay 0s
// -cloud-project ${GOOGLE_CLOUD_PROJECT}
// -cloud-region <derived from the region code in GAE_APPLICATION>
// -service-account-json :gce
// -ts-mon-service-name ${GOOGLE_CLOUD_PROJECT}
// -ts-mon-job-name ${GAE_SERVICE}
//
// Additionally the hostname and -container-image-id (used in metric and trace
// fields) are derived from available GAE_* env vars to be semantically similar
// to what they represent in the GKE environment.
//
// Note that a mapping between a region code in GAE_APPLICATION and
// the corresponding cloud region is not documented anywhere, so if you see
// warnings when your app starts up either update the code to recognize your
// region code or pass '-cloud-region' argument explicitly in app.yaml.
//
// See https://cloud.google.com/appengine/docs/standard/go/runtime.
func (o *Options) FromGAEEnv() {
if os.Getenv("GAE_VERSION") == "" {
return
}
o.GAE = true
o.Prod = true
o.Hostname = uniqueGAEHostname()
o.HTTPAddr = fmt.Sprintf("0.0.0.0:%s", os.Getenv("PORT"))
o.AdminAddr = "-"
o.ShutdownDelay = 0
o.CloudProject = os.Getenv("GOOGLE_CLOUD_PROJECT")
o.ClientAuth.ServiceAccountJSONPath = clientauth.GCEServiceAccount
o.TsMonServiceName = os.Getenv("GOOGLE_CLOUD_PROJECT")
o.TsMonJobName = os.Getenv("GAE_SERVICE")
o.ContainerImageID = fmt.Sprintf("appengine/%s/%s:%s",
os.Getenv("GOOGLE_CLOUD_PROJECT"),
os.Getenv("GAE_SERVICE"),
os.Getenv("GAE_VERSION"),
)
// Note: GAE_APPLICATION is missing on Flex.
if appID := os.Getenv("GAE_APPLICATION"); appID != "" && o.CloudRegion == "" {
o.CloudRegion = cloudRegionFromGAERegion[strings.Split(appID, "~")[0]]
}
}
// uniqueGAEHostname uses GAE_* env vars to derive a unique enough string that
// is used as a hostname in monitoring metrics.
func uniqueGAEHostname() string {
// GAE_INSTANCE is huge, hash it to get a small reasonably unique string.
id := sha256.Sum256([]byte(os.Getenv("GAE_INSTANCE")))
return fmt.Sprintf("%s-%s-%s",
os.Getenv("GAE_SERVICE"),
os.Getenv("GAE_DEPLOYMENT_ID"),
hex.EncodeToString(id[:])[:16],
)
}
// imageVersion extracts image tag or digest from ContainerImageID.
//
// This is eventually reported as a value of 'server/version' metric.
//
// Returns "unknown" if ContainerImageID is empty or malformed.
func (o *Options) imageVersion() string {
// Recognize "<path>@sha256:<digest>" and "<path>:<tag>".
idx := strings.LastIndex(o.ContainerImageID, "@")
if idx == -1 {
idx = strings.LastIndex(o.ContainerImageID, ":")
}
if idx == -1 {
return "unknown"
}
return o.ContainerImageID[idx+1:]
}
// userAgent derives a user-agent like string identifying the server.
func (o *Options) userAgent() string {
return fmt.Sprintf("LUCI-Server (service: %s; job: %s; ver: %s);", o.TsMonServiceName, o.TsMonJobName, o.imageVersion())
}
// shouldEnableTracing is true if options indicate we should enable tracing.
func (o *Options) shouldEnableTracing() bool {
switch {
case o.CloudProject == "":
return false // nowhere to upload traces to
case !o.Prod && o.TraceSampling == "":
return false // in dev mode don't upload samples by default
default:
return !o.testDisableTracing
}
}
// hostOptions constructs HostOptions for module.Initialize(...).
func (o *Options) hostOptions() module.HostOptions {
return module.HostOptions{
Prod: o.Prod,
GAE: o.GAE,
CloudProject: o.CloudProject,
CloudRegion: o.CloudRegion,
}
}
// Server is responsible for initializing and launching the serving environment.
//
// Generally assumed to be a singleton: do not launch multiple Server instances
// within the same process, use AddPort instead if you want to expose multiple
// ports.
//
// Doesn't do TLS. Should be sitting behind a load balancer that terminates
// TLS.
type Server struct {
// Context is the root context used by all requests and background activities.
//
// Can be replaced (by a derived context) before ListenAndServe call, for
// example to inject values accessible to all request handlers.
Context context.Context
// Routes is a router for requests hitting HTTPAddr port.
//
// This router is used for all requests whose Host header does not match any
// specially registered per-host routers (see VirtualHost). Normally, there
// are no such per-host routers, so usually Routes is used for all requests.
//
// This router is also accessible to the server modules and they can install
// routes into it.
//
// Should be populated before ListenAndServe call.
Routes *router.Router
// PRPC is pRPC server with APIs exposed on HTTPAddr port via Routes router.
//
// Should be populated before ListenAndServe call.
PRPC *prpc.Server
// CookieAuth is an authentication method implemented via cookies.
//
// It is initialized only if the server has a module implementing such scheme
// (e.g. "go.chromium.org/luci/server/encryptedcookies").
CookieAuth auth.Method
// Options is a copy of options passed to New.
Options Options
startTime time.Time // for calculating uptime for /healthz
lastReqTime atomic.Value // time.Time when the last request started
stdout sdlogger.LogEntryWriter // for logging to stdout, nil in dev mode
stderr sdlogger.LogEntryWriter // for logging to stderr, nil in dev mode
errRptClient *errorreporting.Client // for reporting to the cloud Error Reporting
mainPort *Port // pre-registered main port, see initMainPort
mu sync.Mutex // protects fields below
ports []*Port // all non-dummy ports (each one hosts an HTTP server)
started bool // true inside and after ListenAndServe
stopped bool // true inside and after Shutdown
ready chan struct{} // closed right before starting the serving loop
done chan struct{} // closed after Shutdown returns
// See RegisterUnaryServerInterceptor and ListenAndServe.
unaryInterceptors []grpc.UnaryServerInterceptor
rndM sync.Mutex // protects rnd
rnd *rand.Rand // used to generate trace and operation IDs
bgrDone chan struct{} // closed to stop background activities
bgrWg sync.WaitGroup // waits for RunInBackground goroutines to stop
warmupM sync.Mutex // protects 'warmup' and the actual warmup critical section
warmup []func(context.Context)
cleanupM sync.Mutex // protects 'cleanup' and the actual cleanup critical section
cleanup []func(context.Context)
tsmon *tsmon.State // manages flushing of tsmon metrics
sampler octrace.Sampler // trace sampler to use for top level spans
cloudTS oauth2.TokenSource // source of cloud-scoped tokens for Cloud APIs
signer *signerImpl // the signer used by the auth system
actorTokens *actorTokensImpl // for impersonating service accounts
authDB atomic.Value // last known good authdb.DB instance
runningAs string // email of an account the server runs as
}
// moduleHostImpl implements module.Host via server.Server.
//
// Just a tiny wrapper to make sure modules consume only curated limited set of
// the server API and do not retain the pointer to the server.
type moduleHostImpl struct {
srv *Server
mod module.Module
invalid bool
cookieAuth auth.Method
}
func (h *moduleHostImpl) panicIfInvalid() {
if h.invalid {
panic("module.Host must not be used outside of Initialize")
}
}
func (h *moduleHostImpl) ServiceRegistrar() grpc.ServiceRegistrar {
h.panicIfInvalid()
return h.srv.PRPC
}
func (h *moduleHostImpl) Routes() *router.Router {
h.panicIfInvalid()
return h.srv.Routes
}
func (h *moduleHostImpl) RunInBackground(activity string, f func(context.Context)) {
h.panicIfInvalid()
h.srv.RunInBackground(activity, f)
}
func (h *moduleHostImpl) RegisterWarmup(cb func(context.Context)) {
h.panicIfInvalid()
h.srv.RegisterWarmup(cb)
}
func (h *moduleHostImpl) RegisterCleanup(cb func(context.Context)) {
h.panicIfInvalid()
h.srv.RegisterCleanup(cb)
}
func (h *moduleHostImpl) RegisterUnaryServerInterceptor(intr grpc.UnaryServerInterceptor) {
h.panicIfInvalid()
h.srv.RegisterUnaryServerInterceptor(intr)
}
func (h *moduleHostImpl) RegisterCookieAuth(method auth.Method) {
h.panicIfInvalid()
h.cookieAuth = method
}
// New constructs a new server instance.
//
// It hosts one or more HTTP servers and starts and stops them in unison. It is
// also responsible for preparing contexts for incoming requests.
//
// The given context will become the root context of the server and will be
// inherited by all handlers.
//
// On errors returns partially initialized server (always non-nil). At least
// its logging will be configured and can be used to report the error. Trying
// to use such partially initialized server for anything else is undefined
// behavior.
func New(ctx context.Context, opts Options, mods []module.Module) (srv *Server, err error) {
seed := opts.testSeed
if seed == 0 {
if err := binary.Read(cryptorand.Reader, binary.BigEndian, &seed); err != nil {
panic(err)
}
}
// Do this very early, so that various transports created during the
// initialization are already wrapped with tracing. The rest of the tracing
// infra (e.g. actual uploads) is initialized later in initTracing.
if opts.shouldEnableTracing() {
internal.EnableOpenCensusTracing()
}
srv = &Server{
Context: ctx,
Options: opts,
startTime: clock.Now(ctx).UTC(),
ready: make(chan struct{}),
done: make(chan struct{}),
rnd: rand.New(rand.NewSource(seed)),
bgrDone: make(chan struct{}),
sampler: octrace.NeverSample(),
}
// Cleanup what we can on failures.
defer func() {
if err != nil {
srv.runCleanup()
}
}()
// Logging is needed to report any errors during the early initialization.
srv.initLogging()
logging.Infof(srv.Context, "Server starting...")
if srv.Options.ContainerImageID != "" {
logging.Infof(srv.Context, "Container image is %s", srv.Options.ContainerImageID)
}
// Need the hostname (e.g. pod name on k8s) for logs and metrics.
if srv.Options.Hostname == "" {
srv.Options.Hostname, err = os.Hostname()
if err != nil {
return srv, errors.Annotate(err, "failed to get own hostname").Err()
}
}
// On k8s log pod IPs too, this is useful when debugging k8s routing.
if !srv.Options.GAE {
logging.Infof(srv.Context, "Running on %s (%s)", srv.Options.Hostname, networkAddrsForLog())
} else {
logging.Infof(srv.Context, "Running on %s", srv.Options.Hostname)
logging.Infof(srv.Context, "Instance is %q", os.Getenv("GAE_INSTANCE"))
if srv.Options.CloudRegion == "" {
if appID := os.Getenv("GAE_APPLICATION"); appID != "" {
logging.Warningf(srv.Context, "Could not figure out the primary Cloud region based "+
"on the region code in GAE_APPLICATION %q, consider passing the region name "+
"via -cloud-region flag explicitly", appID)
}
} else {
logging.Infof(srv.Context, "Cloud region is %s", srv.Options.CloudRegion)
}
}
// Log enabled experiments, warn if some of them are unknown now.
var exps []experiments.ID
for _, name := range opts.EnableExperiments {
if exp, ok := experiments.GetByName(name); ok {
logging.Infof(ctx, "Enabling experiment %q", name)
exps = append(exps, exp)
} else {
logging.Warningf(ctx, "Skipping unknown experiment %q", name)
}
}
srv.Context = experiments.Enable(srv.Context, exps...)
// Configure base server subsystems by injecting them into the root context
// inherited later by all requests.
srv.Context = caching.WithProcessCacheData(srv.Context, caching.NewProcessCacheData())
if err := srv.initAuthStart(); err != nil {
return srv, errors.Annotate(err, "failed to initialize auth").Err()
}
if err := srv.initTSMon(); err != nil {
return srv, errors.Annotate(err, "failed to initialize tsmon").Err()
}
if err := srv.initAuthFinish(); err != nil {
return srv, errors.Annotate(err, "failed to finish auth initialization").Err()
}
if err := srv.initTracing(); err != nil {
return srv, errors.Annotate(err, "failed to initialize tracing").Err()
}
if err := srv.initErrorReporting(); err != nil {
return srv, errors.Annotate(err, "failed to initialize error reporting").Err()
}
if err := srv.initProfiling(); err != nil {
return srv, errors.Annotate(err, "failed to initialize profiling").Err()
}
if err := srv.initMainPort(); err != nil {
return srv, errors.Annotate(err, "failed to initialize the main port").Err()
}
if err := srv.initAdminPort(); err != nil {
return srv, errors.Annotate(err, "failed to initialize the admin port").Err()
}
if err := srv.initWarmup(); err != nil {
return srv, errors.Annotate(err, "failed to initialize warmup callbacks").Err()
}
// Sort modules by their initialization order based on declared dependencies,
// discover unfulfilled required dependencies.
sorted, err := resolveDependencies(mods)
if err != nil {
return srv, err
}
// Initialize all modules in their topological order.
impls := make([]*moduleHostImpl, len(sorted))
for i, mod := range sorted {
impls[i] = &moduleHostImpl{srv: srv, mod: mod}
switch ctx, err := mod.Initialize(srv.Context, impls[i], srv.Options.hostOptions()); {
case err != nil:
return srv, errors.Annotate(err, "failed to initialize module %q", mod.Name()).Err()
case ctx != nil:
srv.Context = ctx
}
impls[i].invalid = true // make sure the module does not retain it
}
// Ensure there's only one CookieAuth method registered.
var cookieAuthMod module.Module
for _, impl := range impls {
if impl.cookieAuth != nil {
if cookieAuthMod != nil {
return srv, errors.Annotate(err,
"conflict between %q and %q: both register a cookie auth scheme - pick one",
cookieAuthMod.Name(), impl.mod.Name(),
).Err()
}
cookieAuthMod = impl.mod
srv.CookieAuth = impl.cookieAuth
}
}
return srv, nil
}
// AddPort prepares an additional serving HTTP port.
//
// Can be used to open more listening HTTP ports (in addition to opts.HTTPAddr
// and opts.AdminAddr). The returned Port object can be used to populate the
// router that serves requests hitting the added port.
//
// If opts.ListenAddr is '-', a dummy port will be added: it is a valid *Port
// object, but it is not actually exposed as a listening TCP socket. This is
// useful to disable listening ports without changing any code.
//
// Should be called before ListenAndServe (panics otherwise).
func (s *Server) AddPort(opts PortOptions) *Port {
port := &Port{
Routes: s.newRouter(opts),
parent: s,
opts: opts,
}
s.mu.Lock()
defer s.mu.Unlock()
if s.started {
s.Fatal(errors.Reason("the server has already been started").Err())
}
if opts.ListenAddr != "-" {
s.ports = append(s.ports, port)
}
return port
}
// VirtualHost returns a router (registering it if necessary) used for requests
// that hit the main port (opts.HTTPAddr) and have the given Host header.
//
// Should be used in rare cases when the server is exposed through multiple
// domain names and requests should be routed differently based on what domain
// was used. If your server is serving only one domain name, or you don't care
// what domain name is used to access it, do not use VirtualHost.
//
// Note that requests that match some registered virtual host router won't
// reach the default router (server.Routes), even if the virtual host router
// doesn't have a route for them. Such requests finish with HTTP 404.
//
// Also the router created by VirtualHost is initially completely empty: the
// server and its modules don't install anything into it (there's intentionally
// no mechanism to do this). For that reason VirtualHost should never by used to
// register a router for the "main" domain name: it will make the default
// server.Routes (and all handlers installed there by server modules) useless,
// probably breaking the server. Put routes for the main server functionality
// directly into server.Routes instead, using VirtualHost only for routes that
// critically depend on Host header.
//
// Should be called before ListenAndServe (panics otherwise).
func (s *Server) VirtualHost(host string) *router.Router {
return s.mainPort.VirtualHost(host)
}
// newRouter creates a Router with the default middleware chain and routes.
func (s *Server) newRouter(opts PortOptions) *router.Router {
s.mu.Lock()
defer s.mu.Unlock()
if s.started {
s.Fatal(errors.Reason("the server has already been started").Err())
}
mw := router.NewMiddlewareChain(
s.rootMiddleware, // prepares the per-request context
middleware.WithPanicCatcher, // transforms panics into HTTP 500
)
if s.tsmon != nil && !opts.DisableMetrics {
mw = mw.Extend(s.tsmon.Middleware) // collect HTTP requests metrics
}
// Setup middleware chain used by ALL requests.
r := router.New()
r.Use(mw)
// Mandatory health check/readiness probe endpoint.
r.GET(healthEndpoint, nil, func(c *router.Context) {
c.Writer.Write([]byte(s.healthResponse(c.Context)))
})
// Add NotFound handler wrapped in our middlewares so that unrecognized
// requests are at least logged. If we don't do that they'll be handled
// completely silently and this is very confusing when debugging 404s.
r.NotFound(nil, func(c *router.Context) {
http.NotFound(c.Writer, c.Request)
})
return r
}
// RunInBackground launches the given callback in a separate goroutine right
// before starting the serving loop.
//
// If the server is already running, launches it right away. If the server
// fails to start, the goroutines will never be launched.
//
// Should be used for background asynchronous activities like reloading configs.
//
// All logs lines emitted by the callback are annotated with "activity" field
// which can be arbitrary, but by convention has format "<namespace>.<name>",
// where "luci" namespace is reserved for internal activities.
//
// The context passed to the callback is canceled when the server is shutting
// down. It is expected the goroutine will exit soon after the context is
// canceled.
func (s *Server) RunInBackground(activity string, f func(context.Context)) {
s.bgrWg.Add(1)
go func() {
defer s.bgrWg.Done()
select {
case <-s.ready:
// Construct the context after the server is fully initialized. Cancel it
// as soon as bgrDone is signaled.
ctx, cancel := context.WithCancel(s.Context)
if activity != "" {
ctx = logging.SetField(ctx, "activity", activity)
}
defer cancel()
go func() {
select {
case <-s.bgrDone:
cancel()
case <-ctx.Done():
}
}()
f(ctx)
case <-s.bgrDone:
// the server is closed, no need to run f() anymore
}
}()
}
// RegisterUnaryServerInterceptor registers an grpc.UnaryServerInterceptor
// applied to all unary RPCs that hit the server.
//
// Interceptors are chained in order they are registered, i.e. the first
// registered interceptor becomes the outermost. The initial chain already
// contains some base interceptors (e.g. for monitoring) and all interceptors
// registered by server modules. RegisterUnaryServerInterceptor extends this
// chain.
//
// An interceptor set in server.PRPC.UnaryServerInterceptor (if any) is
// automatically registered as the last (innermost) one right before the server
// starts listening for requests in ListenAndServe.
//
// Should be called before ListenAndServe (panics otherwise).
func (s *Server) RegisterUnaryServerInterceptor(intr grpc.UnaryServerInterceptor) {
s.mu.Lock()
defer s.mu.Unlock()
if s.started {
s.Fatal(errors.Reason("the server has already been started").Err())
}
s.unaryInterceptors = append(s.unaryInterceptors, intr)
}
// ListenAndServe launches the serving loop.
//
// Blocks forever or until the server is stopped via Shutdown (from another
// goroutine or from a SIGTERM handler). Returns nil if the server was shutdown
// correctly or an error if it failed to start or unexpectedly died. The error
// is logged inside.
//
// Should be called only once. Panics otherwise.
func (s *Server) ListenAndServe() error {
s.mu.Lock()
if s.started {
s.mu.Unlock()
s.Fatal(errors.Reason("the server has already been started").Err())
}
s.started = true
ports := append(make([]*Port, 0, len(s.ports)), s.ports...)
// Assemble the interceptor chain. Put our base interceptors in front of
// whatever interceptors were installed by modules and by the user of Server
// via public s.PRPC.UnaryServerInterceptor.
interceptors := []grpc.UnaryServerInterceptor{
grpcmon.UnaryServerInterceptor,
grpcutil.UnaryServerPanicCatcherInterceptor,
}
interceptors = append(interceptors, s.unaryInterceptors...)
if s.PRPC.UnaryServerInterceptor != nil {
interceptors = append(interceptors, s.PRPC.UnaryServerInterceptor)
}
s.mu.Unlock()
// Install the interceptor chain.
s.PRPC.UnaryServerInterceptor = grpcutil.ChainUnaryServerInterceptors(interceptors...)
// Run registered best-effort warmup callbacks right before serving.
s.runWarmup()
// Catch SIGTERM while inside the serving loop. Upon receiving SIGTERM, wait
// until the pod is removed from the load balancer before actually shutting
// down and refusing new connections. If we shutdown immediately, some clients
// may see connection errors, because they are not aware yet the server is
// closing: Pod shutdown sequence and Endpoints list updates are racing with
// each other, we want Endpoints list updates to win, i.e. we want the pod to
// actually be fully alive as long as it is still referenced in Endpoints
// list. We can't guarantee this, but we can improve chances.
stop := signals.HandleInterrupt(func() {
if s.Options.Prod {
s.waitUntilNotServing()
}
s.Shutdown()
})
defer stop()
// Log how long it took from 'New' to the serving loop.
logging.Infof(s.Context, "Startup done in %s", clock.Now(s.Context).Sub(s.startTime))
// Unblock all pending RunInBackground goroutines, so they can start.
close(s.ready)
// Run serving loops in parallel.
errs := make(errors.MultiError, len(ports))
wg := sync.WaitGroup{}
wg.Add(len(ports))
for i, port := range ports {
logging.Infof(s.Context, "Serving %s", port.nameForLog())
i := i
port := port
go func() {
defer wg.Done()
if err := s.serveLoop(port.httpServer()); err != http.ErrServerClosed {
logging.WithError(err).Errorf(s.Context, "Server %s failed", port.nameForLog())
errs[i] = err
s.Shutdown() // close all other servers
}
}()
}
wg.Wait()
// Per http.Server docs, we end up here *immediately* after Shutdown call was
// initiated. Some requests can still be in-flight. We block until they are
// done (as indicated by Shutdown call itself exiting).
logging.Infof(s.Context, "Waiting for the server to stop...")
<-s.done
logging.Infof(s.Context, "The serving loop stopped, running the final cleanup...")
s.runCleanup()
logging.Infof(s.Context, "The server has stopped")
if errs.First() != nil {
return errs
}
return nil
}
// Shutdown gracefully stops the server if it was running.
//
// Blocks until the server is stopped. Can be called multiple times.
func (s *Server) Shutdown() {
s.mu.Lock()
defer s.mu.Unlock()
if s.stopped {
return
}
logging.Infof(s.Context, "Shutting down the server...")
// Tell all RunInBackground goroutines to stop.
close(s.bgrDone)
// Stop all http.Servers in parallel. Each Shutdown call blocks until the
// corresponding server is stopped.
wg := sync.WaitGroup{}
wg.Add(len(s.ports))
for _, port := range s.ports {
port := port
go func() {
defer wg.Done()
port.httpServer().Shutdown(s.Context)
}()
}
wg.Wait()
// Wait for all background goroutines to stop.
s.bgrWg.Wait()
// Notify ListenAndServe that it can exit now.
s.stopped = true
close(s.done)
}
// Fatal logs the error and immediately shuts down the process with exit code 3.
//
// No cleanup is performed. Deferred statements are not run. Not recoverable.
func (s *Server) Fatal(err error) {
errors.Log(s.Context, err)
os.Exit(3)
}
// healthResponse prepares text/plan response for the health check endpoints.
//
// It additionally contains some easy to obtain information that may help in
// debugging deployments.
func (s *Server) healthResponse(c context.Context) string {
maybeEmpty := func(s string) string {
if s == "" {
return "<unknown>"
}
return s
}
return strings.Join([]string{
"OK",
"",
"uptime: " + clock.Now(c).Sub(s.startTime).String(),
"image: " + maybeEmpty(s.Options.ContainerImageID),
"",
"service: " + maybeEmpty(s.Options.TsMonServiceName),
"job: " + maybeEmpty(s.Options.TsMonJobName),
"host: " + s.Options.Hostname,
"",
}, "\n")
}
// serveLoop binds the socket and launches the serving loop.
//
// Basically srv.ListenAndServe with some testing helpers.
func (s *Server) serveLoop(srv *http.Server) error {
// If not running tests, let http.Server bind the socket as usual.
if s.Options.testListeners == nil {
return srv.ListenAndServe()
}
// In test mode the listener MUST be prepared already.
if l, _ := s.Options.testListeners[srv.Addr]; l != nil {
return srv.Serve(l)
}
return errors.Reason("test listener is not set").Err()
}
// waitUntilNotServing is called during the graceful shutdown and it tries to
// figure out when the traffic stops flowing to the server (i.e. when it is
// removed from the load balancer).
//
// It's a heuristic optimization for the case when the load balancer keeps
// sending traffic to a terminating Pod for some time after the Pod entered
// "Terminating" state. It can happen due to latencies in Endpoints list
// updates. We want to keep the listening socket open as long as there are
// incoming requests (but no longer than 1 min).
func (s *Server) waitUntilNotServing() {
logging.Infof(s.Context, "Received SIGTERM, waiting for the traffic to stop...")
// When the server is idle the loop below exits immediately and the server
// enters the shutdown path, rejecting new connections. Since we gave
// Kubernetes no time to update the Endpoints list, it is possible someone
// still might send a request to the server (and it will be rejected).
// To avoid that we always sleep a bit here to give Kubernetes a chance to
// propagate the Endpoints list update everywhere. The loop below then
// verifies clients got the update and stopped sending requests.
time.Sleep(s.Options.ShutdownDelay)
deadline := clock.Now(s.Context).Add(time.Minute)
for {
now := clock.Now(s.Context)
lastReq, ok := s.lastReqTime.Load().(time.Time)
if !ok || now.Sub(lastReq) > 15*time.Second {
logging.Infof(s.Context, "No requests received in the last 15 sec, proceeding with the shutdown...")
break
}
if now.After(deadline) {
logging.Warningf(s.Context, "Gave up waiting for the traffic to stop, proceeding with the shutdown...")
break
}
time.Sleep(100 * time.Millisecond)
}
}
// RegisterWarmup registers a callback that is run in server's ListenAndServe
// right before the serving loop.
//
// It receives the global server context (including all customizations made
// by the user code in server.Main). Intended for best-effort warmups: there's
// no way to gracefully abort the server startup from a warmup callback.
//
// Registering a new warmup callback from within a warmup causes a deadlock,
// don't do that.
func (s *Server) RegisterWarmup(cb func(context.Context)) {
s.warmupM.Lock()
defer s.warmupM.Unlock()
s.warmup = append(s.warmup, cb)
}
// runWarmup runs all registered warmup functions (sequentially in registration
// order).
func (s *Server) runWarmup() {
s.warmupM.Lock()
defer s.warmupM.Unlock()
ctx := logging.SetField(s.Context, "activity", "luci.warmup")
for _, cb := range s.warmup {
cb(ctx)
}
}
// RegisterCleanup registers a callback that is run in ListenAndServe after the
// server has exited the serving loop.
//
// Registering a new cleanup callback from within a cleanup causes a deadlock,
// don't do that.
func (s *Server) RegisterCleanup(cb func(context.Context)) {
s.cleanupM.Lock()
defer s.cleanupM.Unlock()
s.cleanup = append(s.cleanup, cb)
}
// runCleanup runs all registered cleanup functions (sequentially in reverse
// order).
func (s *Server) runCleanup() {
s.cleanupM.Lock()
defer s.cleanupM.Unlock()
for i := len(s.cleanup) - 1; i >= 0; i-- {
s.cleanup[i](s.Context)
}
}
// genUniqueID returns pseudo-random hex string of given even length.
func (s *Server) genUniqueID(l int) string {
b := make([]byte, l/2)
s.rndM.Lock()
s.rnd.Read(b)
s.rndM.Unlock()
return hex.EncodeToString(b)
}
var cloudTraceFormat = propagation.HTTPFormat{}
// rootMiddleware prepares the per-request context.
func (s *Server) rootMiddleware(c *router.Context, next router.Handler) {
started := clock.Now(s.Context)
// Wrap the request in a tracing span. The span is closed in the defer below
// (where we know the response status code). If this is a health check, open
// the span nonetheless, but do not record it (health checks are spammy and
// not interesting). This way the code is simpler ('span' is always non-nil
// and has TraceID). Additionally if some of health check code opens a span
// of its own, it will be ignored (as a child of not-recorded span).
healthCheck := isHealthCheckRequest(c.Request)
ctx, span := s.startRequestSpan(s.Context, c.Request, healthCheck)
// This is used in waitUntilNotServing.
if !healthCheck {
s.lastReqTime.Store(started)
}
// Associate all logs with the span via its Trace ID. Use the full ID if we
// can derive it. This is important to groups logs generated by us with logs
// generated by the GAE service itself (which uses the full trace ID), when
// running on GAE. Outside of GAE it doesn't really matter and works either
// way.
spanCtx := span.SpanContext()
traceID := hex.EncodeToString(spanCtx.TraceID[:])
if s.Options.CloudProject != "" {
traceID = fmt.Sprintf("projects/%s/traces/%s", s.Options.CloudProject, traceID)
}
// Track how many response bytes are sent and what status is set.
rw := iotools.NewResponseWriter(c.Writer)
c.Writer = rw
// Observe maximum emitted severity to use it as an overall severity for the
// request log entry.
severityTracker := sdlogger.SeverityTracker{Out: s.stdout}
// Log the overall request information when the request finishes. Use TraceID
// to correlate this log entry with entries emitted by the request handler
// below.
defer func() {
now := clock.Now(s.Context)
latency := now.Sub(started)
statusCode := rw.Status()
if healthCheck {
// Do not log fast health check calls AT ALL, they just spam logs.
if latency < healthTimeLogThreshold {
return
}
// Emit a warning if the health check is slow, this likely indicates
// high CPU load.
logging.Warningf(c.Context, "Health check is slow: %s > %s", latency, healthTimeLogThreshold)
}
// When running behind Envoy, log its request IDs to simplify debugging.
var extraFields logging.Fields
if xrid := c.Request.Header.Get("X-Request-Id"); xrid != "" {
extraFields = logging.Fields{"requestId": xrid}
}
entry := sdlogger.LogEntry{
Severity: severityTracker.MaxSeverity(),
Timestamp: sdlogger.ToTimestamp(now),
TraceID: traceID,
TraceSampled: span.IsRecordingEvents(),
SpanID: spanCtx.SpanID.String(), // the top-level span ID
Fields: extraFields,
RequestInfo: &sdlogger.RequestInfo{
Method: c.Request.Method,
URL: getRequestURL(c.Request),
Status: statusCode,
RequestSize: fmt.Sprintf("%d", c.Request.ContentLength),
ResponseSize: fmt.Sprintf("%d", rw.ResponseSize()),
UserAgent: c.Request.UserAgent(),
RemoteIP: getRemoteIP(c.Request),
Latency: fmt.Sprintf("%fs", latency.Seconds()),
},
}
if s.Options.Prod {
// Skip writing the root request log entry on GAE, since GAE writes it
// itself (in "appengine.googleapis.com/request_log" log). See also
// comments for initLogging(...).
if !s.Options.GAE {
s.stderr.Write(&entry)
}
} else {
logging.Infof(s.Context, "%d %s %q (%s)",
entry.RequestInfo.Status,
entry.RequestInfo.Method,
entry.RequestInfo.URL,
entry.RequestInfo.Latency,
)
}
span.AddAttributes(
octrace.Int64Attribute("/http/status_code", int64(statusCode)),
octrace.Int64Attribute("/http/request/size", c.Request.ContentLength),
octrace.Int64Attribute("/http/response/size", rw.ResponseSize()),
)
span.End()
}()
timeout := s.Options.DefaultRequestTimeout
if strings.HasPrefix(c.Request.URL.Path, "/internal/") {
timeout = s.Options.InternalRequestTimeout
}
if timeout != 0 {
var cancel context.CancelFunc
ctx, cancel = context.WithTimeout(ctx, timeout)
defer cancel()
}
// Make the request logger emit log entries associated with the tracing span.
if s.Options.Prod {
annotateWithSpan := func(ctx context.Context, e *sdlogger.LogEntry) {
// Note: here 'span' is some inner span from where logging.Log(...) was
// called. We annotate log lines with spans that emitted them.
if span := octrace.FromContext(ctx); span != nil {
e.SpanID = span.SpanContext().SpanID.String()
}
}
var logSink sdlogger.LogEntryWriter
if s.errRptClient != nil {
logSink = &sdlogger.CloudErrorsSink{
Client: s.errRptClient,
Out: &severityTracker,
}
} else {
logSink = &severityTracker
}
ctx = logging.SetFactory(ctx, sdlogger.Factory(logSink, sdlogger.LogEntry{
TraceID: traceID,
Operation: &sdlogger.Operation{ID: s.genUniqueID(32)},
}, annotateWithSpan))
}
c.Context = caching.WithRequestCache(ctx)
next(c)
}
// initLogging initializes the server logging.
//
// Called very early during server startup process. Many server fields may not
// be initialized yet, be careful.
//
// When running in production uses the ugly looking JSON format that is hard to
// read by humans but which is parsed by google-fluentd and GAE hosting
// environment.
//
// To support per-request log grouping in Stackdriver Logs UI there must be
// two different log streams:
// * A stream with top-level HTTP request entries (conceptually like Apache's
// access.log, i.e. with one log entry per request).
// * A stream with logs produced within requests (correlated with HTTP request
// logs via the trace ID field).
//
// Both streams are expected to have a particular format and use particular
// fields for Stackdriver UI to display them correctly. This technique is
// primarily intended for GAE Flex, but it works in many Google environments:
// https://cloud.google.com/appengine/articles/logging#linking_app_logs_and_requests
//
// On GKE we use 'stderr' stream for top-level HTTP request entries and 'stdout'
// stream for logs produced by requests.
//
// On GAE, the stream with top-level HTTP request entries is produced by the GAE
// runtime itself (as 'appengine.googleapis.com/request_log'). So we emit only
// logs produced within requests (also to 'stdout', just like on GKE).
//
// In all environments 'stderr' stream is used to log all global activities that
// happens outside of any request handler (stuff like initialization, shutdown,
// background goroutines, etc).
//
// In non-production mode we use the human-friendly format and a single 'stderr'
// log stream for everything.
func (s *Server) initLogging() {
if !s.Options.Prod {
s.Context = gologger.StdConfig.Use(s.Context)
s.Context = logging.SetLevel(s.Context, logging.Debug)
return
}
if s.Options.testStdout != nil {
s.stdout = s.Options.testStdout
} else {
s.stdout = &sdlogger.Sink{Out: os.Stdout}
}
if s.Options.testStderr != nil {
s.stderr = s.Options.testStderr
} else {
s.stderr = &sdlogger.Sink{Out: os.Stderr}
}
s.Context = logging.SetFactory(s.Context,
sdlogger.Factory(s.stderr, sdlogger.LogEntry{
Operation: &sdlogger.Operation{
ID: s.genUniqueID(32), // correlate all global server logs together
},
}, nil),
)
s.Context = logging.SetLevel(s.Context, logging.Debug)
}
// initAuthStart initializes the core auth system by preparing the context
// and verifying auth tokens can actually be minted (i.e. supplied credentials
// are valid).
//
// It is called before the tsmon monitoring is initialized: tsmon needs auth.
// The rest of the auth initialization (the part that needs tsmon) happens in
// initAuthFinish after tsmon is initialized.
func (s *Server) initAuthStart() error {
// Make a transport that appends information about the server as User-Agent.
ua := s.Options.userAgent()
rootTransport := clientauth.NewModifyingTransport(http.DefaultTransport, func(req *http.Request) error {
newUA := ua
if cur := req.UserAgent(); cur != "" {
newUA += " " + cur
}
req.Header.Set("User-Agent", newUA)
return nil
})
// Initialize the token generator based on s.Options.ClientAuth.
opts := s.Options.ClientAuth
// Use `rootTransport` for calls made by the token generator (e.g. when
// refreshing tokens).
opts.Transport = rootTransport
// We aren't going to use the authenticator's transport (and thus its
// monitoring), only the token source. DisableMonitoring == true removes some
// log spam.
opts.DisableMonitoring = true
// GAE v2 is very aggressive in caching the token internally (in the metadata
// server) and refreshing it only when it is very close to its expiration. We
// need to match this behavior in our in-process cache, otherwise
// GetAccessToken complains that the token refresh procedure doesn't actually
// change the token (because the metadata server returned the cached one).
opts.MinTokenLifetime = 20 * time.Second
// The default value for ClientAuth.SecretsDir is usually hardcoded to point
// to where the token cache is located on developer machines (~/.config/...).
// This location often doesn't exist when running from inside a container.
// The token cache is also not really needed for production services that use
// service accounts (they don't need cached refresh tokens). So in production
// mode totally ignore default ClientAuth.SecretsDir and use whatever was
// passed as -token-cache-dir. If it is empty (default), then no on-disk token
// cache is used at all.
//
// If -token-cache-dir was explicitly set, always use it (even in dev mode).
// This is useful when running containers locally: developer's credentials
// on the host machine can be mounted inside the container.
if s.Options.Prod || s.Options.TokenCacheDir != "" {
opts.SecretsDir = s.Options.TokenCacheDir
}
// Annotate the context used for logging from the token generator.
ctx := logging.SetField(s.Context, "activity", "luci.auth")
tokens := clientauth.NewTokenGenerator(ctx, opts)
// Prepare partially initialized structs for the auth.Config. They will be
// fully initialized in initAuthFinish once we have a sufficiently working
// auth context that can call Cloud IAM.
s.signer = &signerImpl{srv: s}
s.actorTokens = &actorTokensImpl{}
// Initialize the state in the context.
s.Context = auth.Initialize(s.Context, &auth.Config{
DBProvider: func(context.Context) (authdb.DB, error) {
db, _ := s.authDB.Load().(authdb.DB) // refreshed asynchronously in refreshAuthDB
return db, nil
},
Signer: s.signer,
AccessTokenProvider: func(ctx context.Context, scopes []string) (*oauth2.Token, error) {
return tokens.GenerateOAuthToken(ctx, scopes, 0)
},
IDTokenProvider: func(ctx context.Context, audience string) (*oauth2.Token, error) {
return tokens.GenerateIDToken(ctx, audience, 0)
},
ActorTokensProvider: s.actorTokens,
AnonymousTransport: func(context.Context) http.RoundTripper { return rootTransport },
FrontendClientID: func(context.Context) (string, error) { return s.Options.FrontendClientID, nil },
EndUserIP: getRemoteIP,
IsDevMode: !s.Options.Prod,
})
// Note: we initialize a token source for one arbitrary set of scopes here. In
// many practical cases this is sufficient to verify that credentials are
// valid. For example, when we use service account JSON key, if we can
// generate a token with *some* scope (meaning Cloud accepted our signature),
// we can generate tokens with *any* scope, since there's no restrictions on
// what scopes are accessible to a service account, as long as the private key
// is valid (which we just verified by generating some token).
_, err := tokens.GenerateOAuthToken(ctx, auth.CloudOAuthScopes, 0)
if err != nil {
// ErrLoginRequired may happen only when running the server locally using
// developer's credentials. Let them know how the problem can be fixed.
if !s.Options.Prod && err == clientauth.ErrLoginRequired {
scopes := fmt.Sprintf("-scopes %q", strings.Join(auth.CloudOAuthScopes, " "))
if opts.ActAsServiceAccount != "" && opts.ActViaLUCIRealm == "" {
scopes = "-scopes-iam"
}
logging.Errorf(s.Context, "Looks like you run the server locally and it doesn't have credentials for some OAuth scopes")
logging.Errorf(s.Context, "Run the following command to set them up: ")
logging.Errorf(s.Context, " $ luci-auth login %s", scopes)
}
return errors.Annotate(err, "failed to initialize the token source").Err()
}
// Report who we are running as. Useful when debugging access issues.
switch email, err := tokens.GetEmail(); {
case err == nil:
logging.Infof(s.Context, "Running as %s", email)
s.runningAs = email
case err == clientauth.ErrNoEmail:
logging.Warningf(s.Context, "Running as <unknown>, cautiously proceeding...")
case err != nil:
return errors.Annotate(err, "failed to check the service account email").Err()
}
return nil
}
// initAuthFinish finishes auth system initialization.
//
// It is called after tsmon is initialized.
func (s *Server) initAuthFinish() error {
// We should be able to make basic authenticated requests now and can
// construct a token source used by server's own guts to call Cloud APIs,
// such us Cloud Trace and Cloud Error Reporting (and others).
var err error
s.cloudTS, err = auth.GetTokenSource(s.Context, auth.AsSelf, auth.WithScopes(auth.CloudOAuthScopes...))
if err != nil {
return errors.Annotate(err, "failed to initialize the cloud token source").Err()
}
// Finish constructing `signer` and `actorTokens` that were waiting for
// an IAM client.
iamClient, err := credentials.NewIamCredentialsClient(s.Context, option.WithTokenSource(s.cloudTS))
if err != nil {
return errors.Annotate(err, "failed to construct IAM client").Err()
}
s.RegisterCleanup(func(ctx context.Context) { iamClient.Close() })
s.signer.iamClient = iamClient
s.actorTokens.iamClient = iamClient
// Now initialize the AuthDB (a database with groups and auth config) and
// start a goroutine to periodically refresh it.
if err := s.initAuthDB(); err != nil {
return errors.Annotate(err, "failed to initialize AuthDB").Err()
}
return nil
}
// initAuthDB interprets -auth-db-* flags and sets up fetching of AuthDB.
func (s *Server) initAuthDB() error {
// Check flags are compatible.
switch {
case s.Options.AuthDBPath != "" && s.Options.AuthServiceHost != "":
return errors.Reason("-auth-db-path and -auth-service-host can't be used together").Err()
case s.Options.AuthServiceHost == "" && (s.Options.AuthDBDump != "" || s.Options.AuthDBSigner != ""):
return errors.Reason("-auth-db-dump and -auth-db-signer can be used only with -auth-service-host").Err()
case s.Options.AuthDBDump != "" && !strings.HasPrefix(s.Options.AuthDBDump, "gs://"):
return errors.Reason("-auth-db-dump value should start with gs://, got %q", s.Options.AuthDBDump).Err()
case strings.Contains(s.Options.AuthServiceHost, "/"):
return errors.Reason("-auth-service-host should be a plain hostname, got %q", s.Options.AuthServiceHost).Err()
}
// Fill in defaults.
if s.Options.AuthServiceHost != "" {
if s.Options.AuthDBDump == "" {
s.Options.AuthDBDump = fmt.Sprintf("gs://%s/auth-db", s.Options.AuthServiceHost)
}
if s.Options.AuthDBSigner == "" {
if !strings.HasSuffix(s.Options.AuthServiceHost, ".appspot.com") {
return errors.Reason("-auth-db-signer is required if -auth-service-host is not *.appspot.com").Err()
}
s.Options.AuthDBSigner = fmt.Sprintf("%s@appspot.gserviceaccount.com",
strings.TrimSuffix(s.Options.AuthServiceHost, ".appspot.com"))
}
}
// Fetch the initial copy of AuthDB. Note that this happens before we start
// the serving loop, to make sure incoming requests have some AuthDB to use.
if err := s.refreshAuthDB(s.Context); err != nil {
return errors.Annotate(err, "failed to load the initial AuthDB version").Err()
}
// Periodically refresh it in the background.
s.RunInBackground("luci.authdb", func(c context.Context) {
for {
if r := <-clock.After(c, 30*time.Second); r.Err != nil {
return // the context is canceled
}
if err := s.refreshAuthDB(c); err != nil {
logging.WithError(err).Errorf(c, "Failed to reload AuthDB, using the cached one")
}
}
})
return nil
}
// refreshAuthDB reloads AuthDB from the source and stores it in memory.
func (s *Server) refreshAuthDB(c context.Context) error {
cur, _ := s.authDB.Load().(authdb.DB)
db, err := s.fetchAuthDB(c, cur)
if err != nil {
return err
}
s.authDB.Store(db)
return nil
}
// fetchAuthDB fetches the most recent copy of AuthDB from the external source.
//
// 'cur' is the currently used AuthDB or nil if fetching it for the first time.
// Returns 'cur' as is if it's already fresh.
func (s *Server) fetchAuthDB(c context.Context, cur authdb.DB) (authdb.DB, error) {
if s.Options.testAuthDB != nil {
return s.Options.testAuthDB, nil
}
// Loading from a local file (useful in integration tests).
if s.Options.AuthDBPath != "" {
r, err := os.Open(s.Options.AuthDBPath)
if err != nil {
return nil, errors.Annotate(err, "failed to open AuthDB file").Err()
}
defer r.Close()
db, err := authdb.SnapshotDBFromTextProto(r)
if err != nil {
return nil, errors.Annotate(err, "failed to load AuthDB file").Err()
}
return db, nil
}
// Loading from a GCS dump (s.Options.AuthDB* are validated here already).
if s.Options.AuthDBDump != "" {
c, cancel := clock.WithTimeout(c, 5*time.Minute)
defer cancel()
fetcher := dump.Fetcher{
StorageDumpPath: s.Options.AuthDBDump[len("gs://"):],
AuthServiceURL: "https://" + s.Options.AuthServiceHost,
AuthServiceAccount: s.Options.AuthDBSigner,
OAuthScopes: auth.CloudOAuthScopes,
}
curSnap, _ := cur.(*authdb.SnapshotDB)
snap, err := fetcher.FetchAuthDB(c, curSnap)
if err != nil {
return nil, errors.Annotate(err, "fetching from GCS dump failed").Err()
}
return snap, nil
}
// In dev mode default to "allow everything".
if !s.Options.Prod {
return authdb.DevServerDB{}, nil
}
// In prod mode default to "fail on any non-trivial check". Some services may
// not need to use AuthDB at all and configuring it for them is a hassle. If
// they try to use it for something vital, they'll see the error.
return authdb.UnconfiguredDB{
Error: errors.Reason("a source of AuthDB is not configured, see -auth-* server flags").Err(),
}, nil
}
// initTSMon initializes time series monitoring state.
func (s *Server) initTSMon() error {
// We keep tsmon always enabled (flushing to /dev/null if no -ts-mon-* flags
// are set) so that tsmon's in-process store is populated, and metrics there
// can be examined via /admin/tsmon. This is useful when developing/debugging
// tsmon metrics.
var customMonitor monitor.Monitor
if s.Options.TsMonAccount == "" || s.Options.TsMonServiceName == "" || s.Options.TsMonJobName == "" {
logging.Infof(s.Context, "tsmon is in the debug mode: metrics are collected, but flushed to /dev/null (pass -ts-mon-* flags to start uploading metrics)")
customMonitor = monitor.NewNilMonitor()
}
s.tsmon = &tsmon.State{
CustomMonitor: customMonitor,
Settings: &tsmon.Settings{
Enabled: true,
ProdXAccount: s.Options.TsMonAccount,
FlushIntervalSec: 60,
ReportRuntimeStats: true,
},
Target: func(c context.Context) target.Task {
// TODO(vadimsh): We pretend to be a GAE app for now to be able to
// reuse existing dashboards. Each pod pretends to be a separate GAE
// version. That way we can stop worrying about TaskNumAllocator and just
// use 0 (since there'll be only one task per "version"). This looks
// chaotic for deployments with large number of pods.
return target.Task{
DataCenter: "appengine",
ServiceName: s.Options.TsMonServiceName,
JobName: s.Options.TsMonJobName,
HostName: s.Options.Hostname,
}
},
}
if customMonitor != nil {
tsmon.PortalPage.SetReadOnlySettings(s.tsmon.Settings,
"Running in the debug mode. Pass all -ts-mon-* command line flags to start uploading metrics.")
} else {
tsmon.PortalPage.SetReadOnlySettings(s.tsmon.Settings,
"Settings are controlled through -ts-mon-* command line flags.")
}
// Enable this configuration in s.Context so all transports created during
// the server startup have tsmon instrumentation.
s.tsmon.Activate(s.Context)
// Report our image version as a metric, useful to monitor rollouts.
tsmoncommon.RegisterCallbackIn(s.Context, func(ctx context.Context) {
versionMetric.Set(ctx, s.Options.imageVersion())
})
// Periodically flush metrics.
s.RunInBackground("luci.tsmon", s.tsmon.FlushPeriodically)
return nil
}
// initTracing initialized Stackdriver opencensus.io trace exporter.
func (s *Server) initTracing() error {
if !s.Options.shouldEnableTracing() {
return nil
}
if !s.Options.GAE {
// Parse -trace-sampling spec to get a sampler.
sampling := s.Options.TraceSampling
if sampling == "" {
sampling = "0.1qps"
}
logging.Infof(s.Context, "Setting up Stackdriver trace exports to %q (%s)", s.Options.CloudProject, sampling)
var err error
if s.sampler, err = internal.Sampler(sampling); err != nil {
return errors.Annotate(err, "bad -trace-sampling").Err()
}
} else {
// On GAE let the GAE make decisions about sampling. If it decides to sample
// a trace, it will let us know through options of the parent span in
// X-Cloud-Trace-Context. We will collect only traces from requests that
// GAE wants to sample itself.
logging.Infof(s.Context, "Setting up Stackdriver trace exports to %q using GAE sampling strategy", s.Options.CloudProject)
s.sampler = func(p octrace.SamplingParameters) octrace.SamplingDecision {
return octrace.SamplingDecision{Sample: p.ParentContext.IsSampled()}
}
}
// Set the token source to call Stackdriver API.
opts := []option.ClientOption{option.WithTokenSource(s.cloudTS)}
// Register the trace uploader. It is also accidentally metrics uploader, but
// we shouldn't be using metrics (we have tsmon instead).
exporter, err := stackdriver.NewExporter(stackdriver.Options{
ProjectID: s.Options.CloudProject,
MonitoringClientOptions: opts, // note: this should be effectively unused
TraceClientOptions: opts,
BundleDelayThreshold: 10 * time.Second,
BundleCountThreshold: 512,
DefaultTraceAttributes: map[string]interface{}{
"cr.dev/image": s.Options.ContainerImageID,
"cr.dev/service": s.Options.TsMonServiceName,
"cr.dev/job": s.Options.TsMonJobName,
"cr.dev/host": s.Options.Hostname,
},
OnError: func(err error) {
logging.Errorf(s.Context, "Stackdriver error: %s", err)
},
})
if err != nil {
return err
}
octrace.RegisterExporter(exporter)
// No matter what, do not sample "random" top-level spans from background
// goroutines we don't control. We'll start top spans ourselves in
// startRequestSpan.
octrace.ApplyConfig(octrace.Config{DefaultSampler: octrace.NeverSample()})
// Do the final flush before exiting.
s.RegisterCleanup(func(context.Context) { exporter.Flush() })
return nil
}
// initProfiling initialized Stackdriver Profiler.
func (s *Server) initProfiling() error {
// Skip if not enough configuration is given.
switch {
case !s.Options.Prod:
return nil // silently skip, no need for log spam in dev mode
case s.Options.CloudProject == "":
logging.Infof(s.Context, "Stackdriver profiler is disabled: -cloud-project is not set")
return nil
case s.Options.ProfilingDisable:
logging.Infof(s.Context, "Stackdriver profiler is disabled: -profiling-disable is set")
return nil
case s.Options.ProfilingServiceID == "" && s.Options.TsMonJobName == "":
logging.Infof(s.Context, "Stackdriver profiler is disabled: neither -profiling-service-id nor -ts-mon-job-name are set")
return nil
}
cfg := profiler.Config{
ProjectID: s.Options.CloudProject,
Service: s.getServiceID(),
ServiceVersion: s.Options.imageVersion(),
Instance: s.Options.Hostname,
// Note: these two options may potentially have impact on performance, but
// it is likely small enough not to bother.
MutexProfiling: true,
AllocForceGC: true,
}
// Launch the agent that runs in the background and periodically collects and
// uploads profiles. It fails to launch if Service or ServiceVersion do not
// pass regexp validation. Make it non-fatal, but still log.
if err := profiler.Start(cfg, option.WithTokenSource(s.cloudTS)); err != nil {
logging.Errorf(s.Context, "Stackdriver profiler is disabled: failed do start - %s", err)
return nil
}
logging.Infof(s.Context, "Setting up Stackdriver profiler (service %q, version %q)", cfg.Service, cfg.ServiceVersion)
return nil
}
// getServiceID get the service id from either ProfilingServiceID or TsMonJobName.
func (s *Server) getServiceID() string {
// Prefer ProfilingServiceID if given, fall back to TsMonJobName. Replace
// forbidden '/' symbol.
serviceID := s.Options.ProfilingServiceID
if serviceID == "" {
serviceID = s.Options.TsMonJobName
}
serviceID = strings.ReplaceAll(serviceID, "/", "-")
return serviceID
}
// initMainPort initializes the server on options.HTTPAddr port.
func (s *Server) initMainPort() error {
s.mainPort = s.AddPort(PortOptions{
Name: "main",
ListenAddr: s.Options.HTTPAddr,
})
s.Routes = s.mainPort.Routes
// Install auth info handlers (under "/auth/api/v1/server/").
auth.InstallHandlers(s.Routes, nil)
// Expose public pRPC endpoints (see also ListenAndServe where we put the
// final interceptors).
s.PRPC = &prpc.Server{
Authenticator: &auth.Authenticator{
Methods: []auth.Method{
&auth.GoogleOAuth2Method{
Scopes: []string{clientauth.OAuthScopeEmail},
},
},
},
// Allow compression when not running on GAE. On GAE compression for text
// responses is done by GAE itself and doing it in our code would be
// wasteful.
EnableCompression: !s.Options.GAE,
}
discovery.Enable(s.PRPC)
s.PRPC.InstallHandlers(s.Routes, nil)
// Install RPCExplorer web app at "/rpcexplorer/".
rpcexplorer.Install(s.Routes)
return nil
}
// initAdminPort initializes the server on options.AdminAddr port.
func (s *Server) initAdminPort() error {
if s.Options.AdminAddr == "-" {
return nil // the admin port is disabled
}
// Admin portal uses XSRF tokens that require a secret key. We generate this
// key randomly during process startup (i.e. now). It means XSRF tokens in
// admin HTML pages rendered by a server process are understood only by the
// exact same process. This is OK for admin pages (they are not behind load
// balancers and we don't care that a server restart invalidates all tokens).
secret := make([]byte, 20)
if _, err := cryptorand.Read(secret); err != nil {
return err
}
store := secrets.NewDerivedStore(secrets.Secret{Current: secret})
withAdminSecret := router.NewMiddlewareChain(func(c *router.Context, next router.Handler) {
c.Context = secrets.Use(c.Context, store)
next(c)
})
// Install endpoints accessible through the admin port only.
adminPort := s.AddPort(PortOptions{
Name: "admin",
ListenAddr: s.Options.AdminAddr,
DisableMetrics: true, // do not pollute HTTP metrics with admin-only routes
})
routes := adminPort.Routes
routes.GET("/", nil, func(c *router.Context) {
http.Redirect(c.Writer, c.Request, "/admin/portal", http.StatusFound)
})
portal.InstallHandlers(routes, withAdminSecret, portal.AssumeTrustedPort)
// Install pprof endpoints on the admin port. Note that they must not be
// exposed via the main serving port, since they do no authentication and
// may leak internal information. Also note that pprof handlers rely on
// routing structure not supported by our router, so we do a bit of manual
// routing.
//
// See also internal/pprof.go for more profiling goodies exposed through the
// admin portal.
routes.GET("/debug/pprof/*path", nil, func(c *router.Context) {
switch strings.TrimPrefix(c.Params.ByName("path"), "/") {
case "cmdline":
pprof.Cmdline(c.Writer, c.Request)
case "profile":
pprof.Profile(c.Writer, c.Request)
case "symbol":
pprof.Symbol(c.Writer, c.Request)
case "trace":
pprof.Trace(c.Writer, c.Request)
default:
pprof.Index(c.Writer, c.Request)
}
})
return nil
}
// initErrorReporting initializes an Error Report client.
func (s *Server) initErrorReporting() error {
if !s.Options.CloudErrorReporting || s.Options.CloudProject == "" {
return nil
}
// Get token source to call Error Reporting API.
var err error
s.errRptClient, err = errorreporting.NewClient(s.Context, s.Options.CloudProject, errorreporting.Config{
ServiceName: s.getServiceID(),
ServiceVersion: s.Options.imageVersion(),
OnError: func(err error) { logging.Errorf(s.Context, "Error Reporting could not log error: %s", err) },
}, option.WithTokenSource(s.cloudTS))
if err != nil {
return err
}
s.RegisterCleanup(func(ctx context.Context) { s.errRptClient.Close() })
return nil
}
// initWarmup schedules execution of global warmup callbacks.
//
// On GAE also registers /_ah/warmup route.
func (s *Server) initWarmup() error {
// See https://cloud.google.com/appengine/docs/standard/go/configuring-warmup-requests.
// All warmups should happen *before* the serving loop and /_ah/warmup should
// just always return OK.
if s.Options.GAE {
s.Routes.GET("/_ah/warmup", nil, func(*router.Context) {})
}
s.RegisterWarmup(func(ctx context.Context) { warmup.Warmup(ctx) })
return nil
}
// startRequestSpan opens a new per-request trace span.
//
// Reuses the existing trace (if specified in the request headers) or starts
// a new one.
func (s *Server) startRequestSpan(ctx context.Context, r *http.Request, skipSampling bool) (context.Context, *octrace.Span) {
var sampler octrace.Sampler
if skipSampling {
sampler = octrace.NeverSample()
} else {
sampler = s.sampler
}
// Add this span as a child to a span propagated through X-Cloud-Trace-Context
// header (if any). Start a new root span otherwise.
var span *octrace.Span
if parent, hasParent := cloudTraceFormat.SpanContextFromRequest(r); hasParent {
ctx, span = octrace.StartSpanWithRemoteParent(ctx, "HTTP:"+r.URL.Path, parent,
octrace.WithSpanKind(octrace.SpanKindServer),
octrace.WithSampler(sampler),
)
} else {
ctx, span = octrace.StartSpan(ctx, "HTTP:"+r.URL.Path,
octrace.WithSpanKind(octrace.SpanKindServer),
octrace.WithSampler(sampler),
)
}
// Request info (these are recognized by Stackdriver natively).
span.AddAttributes(
octrace.StringAttribute("/http/host", r.Host),
octrace.StringAttribute("/http/method", r.Method),
octrace.StringAttribute("/http/path", r.URL.Path),
)
return ctx, span
}
// signerImpl implements signing.Signer on top of *Server.
type signerImpl struct {
srv *Server
iamClient *credentials.IamCredentialsClient
}
// SignBytes signs the blob with some active private key.
func (s *signerImpl) SignBytes(ctx context.Context, blob []byte) (keyName string, signature []byte, err error) {
resp, err := s.iamClient.SignBlob(ctx, &credentialspb.SignBlobRequest{
Name: "projects/-/serviceAccounts/" + s.srv.runningAs,
Payload: blob,
})
if err != nil {
return "", nil, grpcutil.WrapIfTransient(err)
}
return resp.KeyId, resp.SignedBlob, nil
}
// Certificates returns a bundle with public certificates for all active keys.
func (s *signerImpl) Certificates(ctx context.Context) (*signing.PublicCertificates, error) {
return signing.FetchCertificatesForServiceAccount(ctx, s.srv.runningAs)
}
// ServiceInfo returns information about the current service.
func (s *signerImpl) ServiceInfo(ctx context.Context) (*signing.ServiceInfo, error) {
return &signing.ServiceInfo{
AppID: s.srv.Options.CloudProject,
AppRuntime: "go",
AppRuntimeVersion: runtime.Version(),
AppVersion: s.srv.Options.imageVersion(),
ServiceAccountName: s.srv.runningAs,
}, nil
}
// actorTokensImpl implements auth.ActorTokensProvider using IAM Credentials.
type actorTokensImpl struct {
iamClient *credentials.IamCredentialsClient
}
// GenerateAccessToken generates an access token for the given account.
func (a *actorTokensImpl) GenerateAccessToken(ctx context.Context, serviceAccount string, scopes []string) (*oauth2.Token, error) {
resp, err := a.iamClient.GenerateAccessToken(ctx, &credentialspb.GenerateAccessTokenRequest{
Name: "projects/-/serviceAccounts/" + serviceAccount,
Scope: scopes,
})
if err != nil {
return nil, grpcutil.WrapIfTransient(err)
}
return &oauth2.Token{
AccessToken: resp.AccessToken,
TokenType: "Bearer",
Expiry: resp.ExpireTime.AsTime(),
}, nil
}
// GenerateIDToken generates an ID token for the given account.
func (a *actorTokensImpl) GenerateIDToken(ctx context.Context, serviceAccount, audience string) (string, error) {
resp, err := a.iamClient.GenerateIdToken(ctx, &credentialspb.GenerateIdTokenRequest{
Name: "projects/-/serviceAccounts/" + serviceAccount,
Audience: audience,
IncludeEmail: true,
})
if err != nil {
return "", grpcutil.WrapIfTransient(err)
}
return resp.Token, nil
}
// networkAddrsForLog returns a string with IPv4 addresses of local network
// interfaces, if possible.
func networkAddrsForLog() string {
addrs, err := net.InterfaceAddrs()
if err != nil {
return fmt.Sprintf("failed to enumerate network interfaces: %s", err)
}
var ips []string
for _, address := range addrs {
if ipnet, ok := address.(*net.IPNet); ok && !ipnet.IP.IsLoopback() {
if ipv4 := ipnet.IP.To4(); ipv4 != nil {
ips = append(ips, ipv4.String())
}
}
}
if len(ips) == 0 {
return "<no IPv4 interfaces>"
}
return strings.Join(ips, ", ")
}
// getRemoteIP extracts end-user IP address from X-Forwarded-For header.
func getRemoteIP(r *http.Request) string {
// X-Forwarded-For header is set by Cloud Load Balancer and GAE frontend and
// has format:
// [<untrusted part>,]<IP that connected to LB>,<unimportant>[,<more>].
//
// <untrusted part> may be present if the original request from the Internet
// comes with X-Forwarded-For header. We can't trust IPs specified there. We
// assume Cloud Load Balancer and GAE sanitize the format of this field
// though.
//
// <IP that connected to LB> is what we are after.
//
// <unimportant> is "global forwarding rule external IP" for GKE or
// the constant "169.254.1.1" for GAE. We don't care about these.
//
// <more> is present only if we proxy the request through more layers of
// load balancers *while it is already inside GKE cluster*. We assume we don't
// do that (if we ever do, Options{...} should be extended with a setting that
// specifies how many layers of load balancers to skip to get to the original
// IP). On GAE <more> is always empty.
//
// See https://cloud.google.com/load-balancing/docs/https for more info.
forwardedFor := strings.Split(r.Header.Get("X-Forwarded-For"), ",")
if len(forwardedFor) >= 2 {
return strings.TrimSpace(forwardedFor[len(forwardedFor)-2])
}
// Fallback to the peer IP if X-Forwarded-For is not set. Happens when
// connecting to the server's port directly from within the cluster.
ip, _, err := net.SplitHostPort(r.RemoteAddr)
if err != nil {
return "0.0.0.0"
}
return ip
}
// getRequestURL reconstructs original request URL to log it (best effort).
func getRequestURL(r *http.Request) string {
proto := r.Header.Get("X-Forwarded-Proto")
if proto != "https" {
proto = "http"
}
host := r.Host
if r.Host == "" {
host = "127.0.0.1"
}
return fmt.Sprintf("%s://%s%s", proto, host, r.RequestURI)
}
// isHealthCheckRequest is true if the request appears to be coming from
// a known health check probe.
func isHealthCheckRequest(r *http.Request) bool {
if r.URL.Path == healthEndpoint {
switch ua := r.UserAgent(); {
case strings.HasPrefix(ua, "kube-probe/"): // Kubernetes
return true
case strings.HasPrefix(ua, "GoogleHC"): // Cloud Load Balancer
return true
}
}
return false
}
// resolveDependencies sorts modules based on their dependencies.
//
// Discovers unfulfilled required dependencies.
func resolveDependencies(mods []module.Module) ([]module.Module, error) {
// Build a map: module.Name => module.Module
modules := make(map[module.Name]module.Module, len(mods))
for _, m := range mods {
if _, ok := modules[m.Name()]; ok {
return nil, errors.Reason("duplicate module %q", m.Name()).Err()
}
modules[m.Name()] = m
}
// Ensure all required dependencies exist, throw away missing optional
// dependencies. The result is a directed graph that can be topo-sorted.
graph := map[module.Name][]module.Name{}
for _, m := range mods {
for _, d := range m.Dependencies() {
name := d.Dependency()
if _, exists := modules[name]; !exists {
if !d.Required() {
continue
}
return nil, errors.Reason("module %q requires module %q which is not provided", m.Name(), name).Err()
}
graph[m.Name()] = append(graph[m.Name()], name)
}
}
sorted := make([]module.Module, 0, len(graph))
visited := make(map[module.Name]bool, len(graph))
var visit func(n module.Name)
visit = func(n module.Name) {
if !visited[n] {
visited[n] = true
for _, dep := range graph[n] {
visit(dep)
}
sorted = append(sorted, modules[n])
}
}
for _, m := range mods {
visit(m.Name())
}
return sorted, nil
}