// Copyright 2019 The LUCI Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
// Package server implements an environment for running LUCI servers.
//
// It interprets command line flags and initializes the serving environment with
// the following core services:
//
// - go.chromium.org/luci/common/logging: logging via Google Cloud Logging.
// - go.opentelemetry.io/otel/trace: OpenTelemetry tracing with export to
// Google Cloud Trace.
// - go.chromium.org/luci/server/tsmon: monitoring metrics via ProdX.
// - go.chromium.org/luci/server/auth: sending and receiving RPCs
// authenticated with Google OAuth2 or OpenID tokens. Support for
// authorization via LUCI groups and LUCI realms.
// - go.chromium.org/luci/server/caching: in-process caching.
// - go.chromium.org/luci/server/warmup: allows other server components to
// register warmup callbacks that run before the server starts handling
// requests.
// - go.chromium.org/luci/server/experiments: simple feature flags support.
// - go.chromium.org/luci/grpc/prpc: pRPC server and RPC Explorer UI.
// - Error reporting via Google Cloud Error Reporting.
// - Continuous profiling via Google Cloud Profiler.
//
// Other functionality is optional and provided by modules (objects implementing
// module.Module interface). They should be passed to the server when it starts
// (see the example below). Modules usually expose their configuration via
// command line flags, and provide functionality by injecting state into
// the server's global context.Context or by exposing gRPC endpoints.
//
// Usage example:
//
// import (
// ...
//
// "go.chromium.org/luci/server"
// "go.chromium.org/luci/server/gaeemulation"
// "go.chromium.org/luci/server/module"
// "go.chromium.org/luci/server/redisconn"
// )
//
// func main() {
// modules := []module.Module{
// gaeemulation.NewModuleFromFlags(),
// redisconn.NewModuleFromFlags(),
// }
// server.Main(nil, modules, func(srv *server.Server) error {
// // Initialize global state, change root context (if necessary).
// if err := initializeGlobalStuff(srv.Context); err != nil {
// return err
// }
// srv.Context = injectGlobalStuff(srv.Context)
//
// // Install regular HTTP routes.
// srv.Routes.GET("/", nil, func(c *router.Context) {
// // ...
// })
//
// // Install gRPC services.
// servicepb.RegisterSomeServer(srv, &SomeServer{})
// return nil
// })
// }
//
// More examples can be found in the code search: https://source.chromium.org/search?q=%22server.Main%28nil%2C%20modules%2C%22
//
// # Known modules
//
// The following modules (in alphabetical order) are a part of the LUCI
// repository and can be used in any server binary:
//
// - go.chromium.org/luci/config/server/cfgmodule: provides LUCI Config
// client, exposes config validation endpoints used by LUCI Config service.
// - go.chromium.org/luci/server/analytics: generates Google Analytics js
// snippets for inclusion in a service's web pages.
// - go.chromium.org/luci/server/bqlog: implements best effort low-overhead
// structured logging to BigQuery suitable for debug data like access logs.
// - go.chromium.org/luci/server/cron: allows registering Cloud Scheduler (aka
// Appengine cron.yaml) handlers, with proper authentication and monitoring
// metrics.
// - go.chromium.org/luci/server/encryptedcookies: implements an
// authentication scheme for HTTP routes based on encrypted cookies and user
// sessions in some session store.
// - go.chromium.org/luci/server/dsmapper: provides a way to apply some
// function to all datastore entities of some particular kind, in parallel,
// distributing work via Cloud Tasks.
// - go.chromium.org/luci/server/gaeemulation: implements
// go.chromium.org/luci/gae Datastore interface via Google Cloud Datastore
// API. Named so because it enables migration of GAEv1 apps to GAEv2
// without touching datastore-related code.
// - go.chromium.org/luci/server/gerritauth: implements authentication using
// Gerrit JWTs. Useful if a service is used by a Gerrit frontend plugin.
// - go.chromium.org/luci/server/limiter: a simple load shedding mechanism
// that puts a limit on the number of concurrent gRPC requests the server
// is handling.
// - go.chromium.org/luci/server/mailer: sending simple emails.
// - go.chromium.org/luci/server/redisconn: a Redis client. Also enables Redis
// as a caching backend for go.chromium.org/luci/server/caching and for
// go.chromium.org/luci/gae/filter/dscache.
// - go.chromium.org/luci/server/secrets: enables generation and validation of
// HMAC-tagged tokens via go.chromium.org/luci/server/tokens.
// - go.chromium.org/luci/server/span: a Cloud Spanner client. Wraps Spanner
// API a bit to improve interoperability with other modules (in particular
// the TQ module).
// - go.chromium.org/luci/server/tq: implements a task queue mechanism on top
// of Cloud Tasks and Cloud PubSub. Also implements transactional task
// enqueuing when submitting tasks in a Cloud Datastore or a Cloud Spanner
// transaction.
//
// Most of them need to be configured via corresponding CLI flags to be useful.
// See implementation of individual modules for details.
//
// An up-to-date list of all known module implementations can be found here:
// https://source.chromium.org/search?q=%22NewModuleFromFlags()%20module.Module%22
//
// # gRPC services
//
// The server implements the grpc.ServiceRegistrar interface, which means it
// can be used to register gRPC service implementations. The registered services
// will be exposed via gRPC protocol over the gRPC port (if the gRPC serving
// port is configured in options) and via pRPC protocol over the main HTTP port
// (if the main HTTP serving port is configured in options). The server is also
// pre-configured with a set of gRPC interceptors that collect performance
// metrics, catch panics and authenticate requests. More interceptors can be
// added via RegisterUnaryServerInterceptors.
//
// # Security considerations
//
// The expected deployment environments are Kubernetes, Google App Engine and
// Google Cloud Run. In all cases the server is expected to be behind a load
// balancer or proxy (or a series of load balancers and proxies) that terminate
// TLS and set `X-Forwarded-For` and `X-Forwarded-Proto` headers. In particular
// `X-Forwarded-For` header should look like:
//
// [<untrusted part>,]<IP that connected to the LB>,<unimportant>[,<more>].
//
// Where `<untrusted part>` may be present if the original request from the
// Internet comes with `X-Forwarded-For` header. The IP specified there is not
// trusted, but the server assumes the load balancer at least sanitizes the
// format of this field.
//
// `<IP that connected to the LB>` is the end-client IP that can be used by the
// server for logs and for IP-allowlist checks.
//
// `<unimportant>` is a "global forwarding rule external IP" for GKE or
// the constant "169.254.1.1" for GAE and Cloud Run. It is unused. See
// https://cloud.google.com/load-balancing/docs/https for more info.
//
// `<more>` may be present if the request was proxied through more layers of
// load balancers while already inside the cluster. The server currently assumes
// this is not happening (i.e. `<more>` is absent, or, in other words, the
// client IP is the second to last in the `X-Forwarded-For` list). If you need
// to recognize more layers of load balancing, please file a feature request to
// add a CLI flag specifying how many layers of load balancers to skip to get to
// the original IP.
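//
// For illustration only (the server performs an equivalent of this parsing
// internally; this is not a public API), the end-client IP under this
// contract could be extracted roughly like:
//
//	parts := strings.Split(r.Header.Get("X-Forwarded-For"), ",")
//	clientIP := strings.TrimSpace(parts[len(parts)-2])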
package server
import (
"context"
cryptorand "crypto/rand"
"crypto/sha256"
"encoding/binary"
"encoding/hex"
"flag"
"fmt"
"math/rand"
"net"
"net/http"
"net/http/pprof"
"os"
"runtime"
"strings"
"sync"
"sync/atomic"
"time"
gcemetadata "cloud.google.com/go/compute/metadata"
"cloud.google.com/go/errorreporting"
credentials "cloud.google.com/go/iam/credentials/apiv1"
"cloud.google.com/go/iam/credentials/apiv1/credentialspb"
"cloud.google.com/go/profiler"
texporter "github.com/GoogleCloudPlatform/opentelemetry-operations-go/exporter/trace"
gcppropagator "github.com/GoogleCloudPlatform/opentelemetry-operations-go/propagator"
"go.opentelemetry.io/contrib/detectors/gcp"
"go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc"
"go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp"
"go.opentelemetry.io/otel"
otelmetricnoop "go.opentelemetry.io/otel/metric/noop"
"go.opentelemetry.io/otel/propagation"
"go.opentelemetry.io/otel/sdk/resource"
"go.opentelemetry.io/otel/sdk/trace"
semconv "go.opentelemetry.io/otel/semconv/v1.17.0"
oteltrace "go.opentelemetry.io/otel/trace"
oteltracenoop "go.opentelemetry.io/otel/trace/noop"
"golang.org/x/oauth2"
"google.golang.org/api/option"
codepb "google.golang.org/genproto/googleapis/rpc/code"
"google.golang.org/grpc"
"google.golang.org/grpc/status"
clientauth "go.chromium.org/luci/auth"
"go.chromium.org/luci/common/clock"
"go.chromium.org/luci/common/errors"
luciflag "go.chromium.org/luci/common/flag"
"go.chromium.org/luci/common/flag/stringlistflag"
"go.chromium.org/luci/common/iotools"
"go.chromium.org/luci/common/logging"
"go.chromium.org/luci/common/logging/gologger"
"go.chromium.org/luci/common/logging/sdlogger"
"go.chromium.org/luci/common/system/signals"
tsmoncommon "go.chromium.org/luci/common/tsmon"
"go.chromium.org/luci/common/tsmon/metric"
"go.chromium.org/luci/common/tsmon/monitor"
"go.chromium.org/luci/common/tsmon/target"
"go.chromium.org/luci/grpc/discovery"
"go.chromium.org/luci/grpc/grpcmon"
"go.chromium.org/luci/grpc/grpcutil"
"go.chromium.org/luci/grpc/prpc"
"go.chromium.org/luci/hardcoded/chromeinfra" // should be used ONLY in Main()
"go.chromium.org/luci/web/rpcexplorer"
"go.chromium.org/luci/server/auth"
"go.chromium.org/luci/server/auth/authdb"
"go.chromium.org/luci/server/auth/authdb/dump"
"go.chromium.org/luci/server/auth/openid"
"go.chromium.org/luci/server/auth/signing"
"go.chromium.org/luci/server/caching"
"go.chromium.org/luci/server/experiments"
"go.chromium.org/luci/server/internal"
"go.chromium.org/luci/server/internal/gae"
"go.chromium.org/luci/server/middleware"
"go.chromium.org/luci/server/module"
"go.chromium.org/luci/server/portal"
"go.chromium.org/luci/server/router"
"go.chromium.org/luci/server/secrets"
"go.chromium.org/luci/server/tsmon"
"go.chromium.org/luci/server/warmup"
)
const (
// Path of the health check endpoint.
healthEndpoint = "/healthz"
// Log a warning if health check is slower than this.
healthTimeLogThreshold = 50 * time.Millisecond
defaultTsMonFlushInterval = 60 * time.Second
defaultTsMonFlushTimeout = 15 * time.Second
)
var (
versionMetric = metric.NewString(
"server/version",
"Version of the running container image (taken from -container-image-id).",
nil)
)
// cloudRegionFromGAERegion maps GAE region codes (e.g. `s`) to corresponding
// cloud regions (e.g. `us-central1`), which may be defined as regions where GAE
// creates resources associated with the app, such as Task Queues or Flex VMs.
//
// Sadly this mapping is not documented, thus the below map is incomplete. Feel
// free to modify it if you deployed to some new GAE region.
//
// This mapping is unused if `-cloud-region` flag is passed explicitly.
var cloudRegionFromGAERegion = map[string]string{
"e": "europe-west1",
"g": "europe-west2",
"h": "europe-west3",
"m": "us-west2",
"p": "us-east1",
"s": "us-central1",
}
// Context key of *incomingRequest{...}, see httpRoot(...) and grpcRoot(...).
var incomingRequestKey = "go.chromium.org/luci/server.incomingRequest"
// Main initializes the server and runs its serving loop until SIGTERM.
//
// Registers all options in the default flag set and uses `flag.Parse` to parse
// them. If 'opts' is nil, the default options will be used. Only flags are
// allowed in the command line (no positional arguments).
//
// Additionally recognizes GAE_* and K_* env vars as an indicator that the
// server is running in the corresponding serverless runtime. This slightly
// tweaks its behavior to match what these runtimes expect from servers.
//
// On errors, logs them and aborts the process with non-zero exit code.
func Main(opts *Options, mods []module.Module, init func(srv *Server) error) {
// Prepopulate defaults for flags based on the runtime environment.
opts, err := OptionsFromEnv(opts)
if err != nil {
fmt.Fprintf(os.Stderr, "When constructing options: %s\n", err)
os.Exit(3)
}
// Register and parse server flags.
opts.Register(flag.CommandLine)
flag.Parse()
if args := flag.Args(); len(args) > 0 {
fmt.Fprintf(os.Stderr, "got unexpected positional command line arguments: %v\n", args)
os.Exit(3)
}
// Construct the server and run its serving loop.
srv, err := New(context.Background(), *opts, mods)
if err != nil {
srv.Fatal(err)
}
if init != nil {
if err = init(srv); err != nil {
srv.Fatal(err)
}
}
if err = srv.Serve(); err != nil {
srv.Fatal(err)
}
}
// Options are used to configure the server.
//
// Most of them are exposed as command line flags (see Register implementation).
// Some (specific to serverless runtimes) are only settable through code or are
// derived from the environment.
type Options struct {
Prod bool // set when running in production (not on a dev workstation)
Serverless module.Serverless // set when running in a serverless environment, implies Prod
Hostname string // used for logging and metric fields, default is os.Hostname
HTTPAddr string // address to bind the main listening socket to
GRPCAddr string // address to bind the gRPC listening socket to
AdminAddr string // address to bind the admin socket to, ignored on GAE and Cloud Run
AllowH2C bool // if true, allow HTTP/2 Cleartext traffic on non-gRPC HTTP ports
DefaultRequestTimeout time.Duration // how long non-internal HTTP handlers are allowed to run, 1 min by default
InternalRequestTimeout time.Duration // how long "/internal/*" HTTP handlers are allowed to run, 10 min by default
ShutdownDelay time.Duration // how long to wait after SIGTERM before shutting down
ClientAuth clientauth.Options // base settings for client auth options
TokenCacheDir string // where to cache auth tokens (optional)
AuthDBProvider auth.DBProvider // source of the AuthDB: if set all Auth* options below are ignored
AuthDBPath string // if set, load AuthDB from a file
AuthServiceHost string // hostname of an Auth Service to use
AuthDBDump string // Google Storage path to fetch AuthDB dumps from
AuthDBSigner string // service account that signs AuthDB dumps
FrontendClientID string // OAuth2 ClientID for frontend (e.g. user sign in)
OpenIDRPCAuthEnable bool // if true, use OIDC identity tokens for RPC authentication
OpenIDRPCAuthAudience stringlistflag.Flag // additional allowed OIDC token audiences
CloudProject string // name of the hosting Google Cloud Project
CloudRegion string // name of the hosting Google Cloud region
TraceSampling string // what portion of traces to upload to Cloud Trace (ignored on GAE and Cloud Run)
TsMonAccount string // service account to flush metrics as
TsMonServiceName string // service name of tsmon target
TsMonJobName string // job name of tsmon target
TsMonFlushInterval time.Duration // how often to flush metrics
TsMonFlushTimeout time.Duration // timeout for flushing
ProfilingProbability float64 // an [0; 1.0] float with a chance to enable Cloud Profiler in the process
ProfilingServiceID string // service name to associate with profiles in Cloud Profiler
ContainerImageID string // ID of the container image with this binary, for logs (optional)
EnableExperiments []string // names of go.chromium.org/luci/server/experiments to enable
CloudErrorReporting bool // set to true to enable Cloud Error Reporting
testSeed int64 // used to seed rng in tests
testStdout sdlogger.LogEntryWriter // mocks stdout in tests
testStderr sdlogger.LogEntryWriter // mocks stderr in tests
testListeners map[string]net.Listener // addr => net.Listener, for tests
testDisableTracing bool // don't install a tracing backend
}
// OptionsFromEnv prepopulates options based on the runtime environment.
//
// It detects if the process is running on GAE or Cloud Run and adjusts options
// accordingly. See FromGAEEnv and FromCloudRunEnv for exact details of how it
// happens.
//
// Either mutates the given `opts`, returning it in the end, or (if `opts` is
// nil) creates new Options.
func OptionsFromEnv(opts *Options) (*Options, error) {
if opts == nil {
opts = &Options{}
}
// Populate unset ClientAuth fields with hardcoded defaults.
authDefaults := chromeinfra.DefaultAuthOptions()
if opts.ClientAuth.ClientID == "" {
opts.ClientAuth.ClientID = authDefaults.ClientID
opts.ClientAuth.ClientSecret = authDefaults.ClientSecret
}
if opts.ClientAuth.TokenServerHost == "" {
opts.ClientAuth.TokenServerHost = authDefaults.TokenServerHost
}
if opts.ClientAuth.SecretsDir == "" {
opts.ClientAuth.SecretsDir = authDefaults.SecretsDir
}
if opts.ClientAuth.LoginSessionsHost == "" {
opts.ClientAuth.LoginSessionsHost = authDefaults.LoginSessionsHost
}
// Use CloudOAuthScopes by default when using UserCredentialsMethod auth mode.
// This is ignored when running in the cloud (the server uses the ambient
// credentials provided by the environment).
if len(opts.ClientAuth.Scopes) == 0 {
opts.ClientAuth.Scopes = auth.CloudOAuthScopes
}
// Prepopulate defaults for flags based on the runtime environment.
opts.FromGAEEnv()
if err := opts.FromCloudRunEnv(); err != nil {
return nil, errors.Annotate(err, "failed to probe Cloud Run environment").Err()
}
return opts, nil
}
// Register registers the command line flags.
func (o *Options) Register(f *flag.FlagSet) {
if o.HTTPAddr == "" {
o.HTTPAddr = "localhost:8800"
}
if o.GRPCAddr == "" {
o.GRPCAddr = "-" // disabled by default
}
if o.AdminAddr == "" {
o.AdminAddr = "localhost:8900"
}
if o.DefaultRequestTimeout == 0 {
o.DefaultRequestTimeout = time.Minute
}
if o.InternalRequestTimeout == 0 {
o.InternalRequestTimeout = 10 * time.Minute
}
if o.ShutdownDelay == 0 {
o.ShutdownDelay = 15 * time.Second
}
if o.TsMonFlushInterval == 0 {
o.TsMonFlushInterval = defaultTsMonFlushInterval
}
if o.TsMonFlushTimeout == 0 {
o.TsMonFlushTimeout = defaultTsMonFlushTimeout
}
if o.ProfilingProbability == 0 {
o.ProfilingProbability = 1.0
} else if o.ProfilingProbability < 0 {
o.ProfilingProbability = 0
}
f.BoolVar(&o.Prod, "prod", o.Prod, "Switch the server into production mode")
f.StringVar(&o.HTTPAddr, "http-addr", o.HTTPAddr, "Address to bind the main listening socket to or '-' to disable")
f.StringVar(&o.GRPCAddr, "grpc-addr", o.GRPCAddr, "Address to bind the gRPC listening socket to or '-' to disable")
f.StringVar(&o.AdminAddr, "admin-addr", o.AdminAddr, "Address to bind the admin socket to or '-' to disable")
f.BoolVar(&o.AllowH2C, "allow-h2c", o.AllowH2C, "If set, allow HTTP/2 Cleartext traffic on non-gRPC HTTP ports (in addition to HTTP/1 traffic). The gRPC port always allows it, as it is essential for gRPC")
f.DurationVar(&o.DefaultRequestTimeout, "default-request-timeout", o.DefaultRequestTimeout, "How long incoming HTTP requests are allowed to run before being canceled (or 0 for infinity)")
f.DurationVar(&o.InternalRequestTimeout, "internal-request-timeout", o.InternalRequestTimeout, "How long incoming /internal/* HTTP requests are allowed to run before being canceled (or 0 for infinity)")
f.DurationVar(&o.ShutdownDelay, "shutdown-delay", o.ShutdownDelay, "How long to wait after SIGTERM before shutting down")
f.StringVar(
&o.ClientAuth.ServiceAccountJSONPath,
"service-account-json",
o.ClientAuth.ServiceAccountJSONPath,
"Path to a JSON file with service account private key",
)
f.StringVar(
&o.ClientAuth.ActAsServiceAccount,
"act-as",
o.ClientAuth.ActAsServiceAccount,
"Act as this service account",
)
f.StringVar(
&o.TokenCacheDir,
"token-cache-dir",
o.TokenCacheDir,
"Where to cache auth tokens (optional)",
)
f.StringVar(
&o.AuthDBPath,
"auth-db-path",
o.AuthDBPath,
"If set, load AuthDB text proto from this file (incompatible with -auth-service-host)",
)
f.StringVar(
&o.AuthServiceHost,
"auth-service-host",
o.AuthServiceHost,
"Hostname of an Auth Service to use (incompatible with -auth-db-path)",
)
f.StringVar(
&o.AuthDBDump,
"auth-db-dump",
o.AuthDBDump,
"Google Storage path to fetch AuthDB dumps from. Default is gs://<auth-service-host>/auth-db",
)
f.StringVar(
&o.AuthDBSigner,
"auth-db-signer",
o.AuthDBSigner,
"Service account that signs AuthDB dumps. Default is derived from -auth-service-host if it is *.appspot.com",
)
f.StringVar(
&o.FrontendClientID,
"frontend-client-id",
o.FrontendClientID,
"OAuth2 clientID for use in frontend, e.g. for user sign in (optional)",
)
f.BoolVar(
&o.OpenIDRPCAuthEnable,
"open-id-rpc-auth-enable",
o.OpenIDRPCAuthEnable,
"If set accept OpenID Connect ID tokens as per-RPC credentials",
)
f.Var(
&o.OpenIDRPCAuthAudience,
"open-id-rpc-auth-audience",
"Additional accepted value of `aud` claim in OpenID tokens, can be repeated",
)
f.StringVar(
&o.CloudProject,
"cloud-project",
o.CloudProject,
"Name of hosting Google Cloud Project (optional)",
)
f.StringVar(
&o.CloudRegion,
"cloud-region",
o.CloudRegion,
"Name of hosting Google Cloud region, e.g. 'us-central1' (optional)",
)
f.StringVar(
&o.TraceSampling,
"trace-sampling",
o.TraceSampling,
"What portion of traces to upload to Cloud Trace. Either a percent (i.e. '0.1%') or a QPS (i.e. '1qps'). Ignored on GAE and Cloud Run. Default is 0.1qps.",
)
f.StringVar(
&o.TsMonAccount,
"ts-mon-account",
o.TsMonAccount,
"Collect and flush tsmon metrics using this account for auth (disables tsmon if not set)",
)
f.StringVar(
&o.TsMonServiceName,
"ts-mon-service-name",
o.TsMonServiceName,
"Service name of tsmon target (disables tsmon if not set)",
)
f.StringVar(
&o.TsMonJobName,
"ts-mon-job-name",
o.TsMonJobName,
"Job name of tsmon target (disables tsmon if not set)",
)
f.DurationVar(
&o.TsMonFlushInterval,
"ts-mon-flush-interval",
o.TsMonFlushInterval,
fmt.Sprintf("How often to flush tsmon metrics. Default to %s if < 1s or unset", o.TsMonFlushInterval),
)
f.DurationVar(
&o.TsMonFlushTimeout,
"ts-mon-flush-timeout",
o.TsMonFlushTimeout,
fmt.Sprintf("Timeout for tsmon flush. Default to %s if < 1s or unset. Must be shorter than --ts-mon-flush-interval.", o.TsMonFlushTimeout),
)
f.Float64Var(
&o.ProfilingProbability,
"profiling-probability",
o.ProfilingProbability,
fmt.Sprintf("A float [0; 1.0] with probability to enable Cloud Profiler for the current process. Default is %f.", o.ProfilingProbability),
)
f.StringVar(
&o.ProfilingServiceID,
"profiling-service-id",
o.ProfilingServiceID,
"Service name to associated with profiles in Cloud Profiler. Defaults to the value of -ts-mon-job-name.",
)
f.StringVar(
&o.ContainerImageID,
"container-image-id",
o.ContainerImageID,
"ID of the container image with this binary, for logs (optional)",
)
f.BoolVar(
&o.CloudErrorReporting,
"cloud-error-reporting",
o.CloudErrorReporting,
"Enable Cloud Error Reporting",
)
// See go.chromium.org/luci/server/experiments.
f.Var(luciflag.StringSlice(&o.EnableExperiments), "enable-experiment",
`A name of the experiment to enable. May be repeated.`)
}
// FromGAEEnv uses the GAE_* env vars to configure the server for the GAE
// environment.
//
// Does nothing if GAE_VERSION is not set.
//
// Equivalent to passing the following flags:
//
// -prod
// -http-addr 0.0.0.0:${PORT}
// -admin-addr -
// -shutdown-delay 1s
// -cloud-project ${GOOGLE_CLOUD_PROJECT}
// -cloud-region <derived from the region code in GAE_APPLICATION>
// -service-account-json :gce
// -ts-mon-service-name ${GOOGLE_CLOUD_PROJECT}
// -ts-mon-job-name ${GAE_SERVICE}
//
// Additionally the hostname and -container-image-id (used in metric and trace
// fields) are derived from available GAE_* env vars to be semantically similar
// to what they represent in the GKE environment.
//
// Note that a mapping between a region code in GAE_APPLICATION and
// the corresponding cloud region is not documented anywhere, so if you see
// warnings when your app starts up either update the code to recognize your
// region code or pass '-cloud-region' argument explicitly in app.yaml.
//
// See https://cloud.google.com/appengine/docs/standard/go/runtime.
func (o *Options) FromGAEEnv() {
if os.Getenv("GAE_VERSION") == "" {
return
}
o.Serverless = module.GAE
o.Prod = true
o.Hostname = uniqueServerlessHostname(
os.Getenv("GAE_SERVICE"),
os.Getenv("GAE_DEPLOYMENT_ID"),
os.Getenv("GAE_INSTANCE"),
)
o.HTTPAddr = fmt.Sprintf("0.0.0.0:%s", os.Getenv("PORT"))
o.GRPCAddr = "-"
o.AdminAddr = "-"
o.ShutdownDelay = time.Second
o.CloudProject = os.Getenv("GOOGLE_CLOUD_PROJECT")
o.ClientAuth.ServiceAccountJSONPath = clientauth.GCEServiceAccount
o.ClientAuth.GCESupportsArbitraryScopes = true
o.TsMonServiceName = os.Getenv("GOOGLE_CLOUD_PROJECT")
o.TsMonJobName = os.Getenv("GAE_SERVICE")
o.ContainerImageID = fmt.Sprintf("appengine/%s/%s:%s",
os.Getenv("GOOGLE_CLOUD_PROJECT"),
os.Getenv("GAE_SERVICE"),
os.Getenv("GAE_VERSION"),
)
// Note: GAE_APPLICATION is missing on Flex.
if appID := os.Getenv("GAE_APPLICATION"); appID != "" && o.CloudRegion == "" {
o.CloudRegion = cloudRegionFromGAERegion[strings.Split(appID, "~")[0]]
}
}
// FromCloudRunEnv recognizes the K_SERVICE environment variable and configures
// some options based on what it discovers in the environment.
//
// Does nothing if K_SERVICE is not set.
//
// Equivalent to passing the following flags:
//
// -prod
// -http-addr -
// -grpc-addr -
// -admin-addr -
// -allow-h2c
// -shutdown-delay 1s
// -cloud-project <cloud project Cloud Run container is running in>
// -cloud-region <cloud region Cloud Run container is running in>
// -service-account-json :gce
// -open-id-rpc-auth-enable
// -ts-mon-service-name <cloud project Cloud Run container is running in>
// -ts-mon-job-name ${K_SERVICE}
//
// Flags passed via the actual command line in the Cloud Run manifest override
// these prefilled defaults. In particular pass either `-http-addr` or
// `-grpc-addr` (or both) to enable corresponding ports.
//
// Additionally the hostname (used in metric and trace fields) is derived from
// the environment to be semantically similar to what it looks like in the GKE
// environment.
func (o *Options) FromCloudRunEnv() error {
if os.Getenv("K_SERVICE") == "" {
return nil
}
// See https://cloud.google.com/run/docs/container-contract.
project, err := gcemetadata.Get("project/project-id")
if err != nil {
return errors.Annotate(err, "failed to get the project ID").Err()
}
region, err := gcemetadata.Get("instance/region")
if err != nil {
return errors.Annotate(err, "failed to get the cloud region").Err()
}
// Region format returned by Cloud Run is `projects/PROJECT-NUMBER/regions/REGION`
parts := strings.Split(region, "/")
region = parts[len(parts)-1]
instance, err := gcemetadata.Get("instance/id")
if err != nil {
return errors.Annotate(err, "failed to get the instance ID").Err()
}
o.Serverless = module.CloudRun
o.Prod = true
o.Hostname = uniqueServerlessHostname(os.Getenv("K_REVISION"), instance)
o.HTTPAddr = "-"
o.GRPCAddr = "-"
o.AdminAddr = "-"
o.AllowH2C = true // to allow using HTTP2 end-to-end with `--use-http2` deployment flag
o.ShutdownDelay = time.Second
o.CloudProject = project
o.CloudRegion = region
o.ClientAuth.ServiceAccountJSONPath = clientauth.GCEServiceAccount
o.ClientAuth.GCESupportsArbitraryScopes = true
o.OpenIDRPCAuthEnable = true
o.TsMonServiceName = project
o.TsMonJobName = os.Getenv("K_SERVICE")
return nil
}
// uniqueServerlessHostname generates a hostname to use when running in a GCP
// serverless environment.
//
// Unlike GKE or GCE environments, serverless containers do not have a proper
// unique hostname set, but we still need to identify them uniquely in logs
// and monitoring metrics. They do have a giant hex instance ID string, but it
// is not informative on its own and cumbersome to use.
//
// This function produces a reasonably readable and unique string that looks
// like `parts[0]-parts[1]-...-hash(parts[last])`. It assumes the last string
// in `parts` is the giant instance ID.
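//
// For example (illustrative values), uniqueServerlessHostname("svc", "dep", "<long hex id>")
// returns "svc-dep-<first 16 hex chars of the SHA-256 of the id>".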
func uniqueServerlessHostname(parts ...string) string {
id := sha256.Sum256([]byte(parts[len(parts)-1]))
parts[len(parts)-1] = hex.EncodeToString(id[:])[:16]
return strings.Join(parts, "-")
}
// ImageVersion extracts image tag or digest from ContainerImageID.
//
// This is eventually reported as a value of 'server/version' metric.
//
// On GAE it would return the service version name based on GAE_VERSION env var,
// since ContainerImageID is artificially constructed to look like
// "appengine/${CLOUD_PROJECT}/${GAE_SERVICE}:${GAE_VERSION}".
//
// On Cloud Run it is the responsibility of the deployment layer to correctly
// populate the -container-image-id command line flag.
//
// Returns "unknown" if ContainerImageID is empty or malformed.
func (o *Options) ImageVersion() string {
// Recognize "<path>@sha256:<digest>" and "<path>:<tag>".
idx := strings.LastIndex(o.ContainerImageID, "@")
if idx == -1 {
idx = strings.LastIndex(o.ContainerImageID, ":")
}
if idx == -1 {
return "unknown"
}
return o.ContainerImageID[idx+1:]
}
// ImageName extracts image name from ContainerImageID.
//
// This is the part of ContainerImageID before ':' or '@'.
func (o *Options) ImageName() string {
// Recognize "<path>@sha256:<digest>" and "<path>:<tag>".
idx := strings.LastIndex(o.ContainerImageID, "@")
if idx == -1 {
idx = strings.LastIndex(o.ContainerImageID, ":")
}
if idx == -1 {
return "unknown"
}
return o.ContainerImageID[:idx]
}
// userAgent derives a user-agent like string identifying the server.
func (o *Options) userAgent() string {
return fmt.Sprintf("LUCI-Server (service: %s; job: %s; ver: %s);", o.TsMonServiceName, o.TsMonJobName, o.ImageVersion())
}
// shouldEnableTracing is true if options indicate we should enable tracing.
func (o *Options) shouldEnableTracing() bool {
switch {
case o.CloudProject == "":
return false // nowhere to upload traces to
case !o.Prod && o.TraceSampling == "":
return false // in dev mode don't upload samples by default
default:
return !o.testDisableTracing
}
}
// hostOptions constructs HostOptions for module.Initialize(...).
func (o *Options) hostOptions() module.HostOptions {
return module.HostOptions{
Prod: o.Prod,
Serverless: o.Serverless,
CloudProject: o.CloudProject,
CloudRegion: o.CloudRegion,
}
}
// Server is responsible for initializing and launching the serving environment.
//
// Generally assumed to be a singleton: do not launch multiple Server instances
// within the same process, use AddPort instead if you want to expose multiple
// HTTP ports with different routers.
//
// Server can serve plain HTTP endpoints, routing them through a router.Router,
// and gRPC APIs (exposing them over gRPC and pRPC protocols). Use an instance
// of Server as a grpc.ServiceRegistrar when registering gRPC services. Services
// registered that way will be available via gRPC protocol over the gRPC port
// and via pRPC protocol over the main HTTP port. Interceptors can be added via
// RegisterUnaryServerInterceptors. RPC authentication can be configured via
// SetRPCAuthMethods.
//
// pRPC protocol is served on the same port as the main HTTP router, making it
// possible to expose just a single HTTP port for everything (which is a
// requirement on Appengine).
//
// Native gRPC protocol is always served through a dedicated gRPC h2c port since
// the gRPC library has its own HTTP/2 server implementation not compatible
// with the net/http package used everywhere else. There's an assortment of
// hacks to work around this, but many ultimately depend on the experimental
// and slow grpc.Server.ServeHTTP method. See https://github.com/grpc/grpc-go/issues/586
// and https://github.com/grpc/grpc-go/issues/4620. Another often recommended
// workaround is https://github.com/soheilhy/cmux, which decides if a new
// connection is a gRPC one or a regular HTTP/2 one. It doesn't work when the
// server is running behind a load balancer that understands HTTP/2, since it
// just opens a **single** backend connection and sends both gRPC and regular
// HTTP/2 requests over it. This happens on Cloud Run, for example. See e.g.
// https://ahmet.im/blog/grpc-http-mux-go/.
//
// If you want to serve HTTP and gRPC over the same public port, configure your
// HTTP load balancer (e.g. https://cloud.google.com/load-balancing/docs/https)
// to route requests into appropriate containers and ports. Another alternative
// is to put an HTTP/2 proxy (e.g. Envoy) right into the pod with the server
// process and route traffic "locally" there. This option would also allow
// adding a local grpc-web proxy into the mix if necessary.
//
// The server doesn't do TLS termination (even for gRPC traffic). It must be
// sitting behind a load balancer or a proxy that terminates TLS and sends clear
// text (HTTP/1 or HTTP/2 for gRPC) requests to corresponding ports, injecting
// `X-Forwarded-*` headers. See "Security considerations" section above for more
// details.
type Server struct {
// Context is the root context used by all requests and background activities.
//
// Can be replaced (by a derived context) before Serve call, for example to
// inject values accessible to all request handlers.
Context context.Context
// Routes is a router for requests hitting HTTPAddr port.
//
// This router is used for all requests whose Host header does not match any
// specially registered per-host routers (see VirtualHost). Normally, there
// are no such per-host routers, so usually Routes is used for all requests.
//
// This router is also accessible to the server modules and they can install
// routes into it.
//
// Should be populated before Serve call.
Routes *router.Router
// CookieAuth is an authentication method implemented via cookies.
//
// It is initialized only if the server has a module implementing such scheme
// (e.g. "go.chromium.org/luci/server/encryptedcookies").
CookieAuth auth.Method
// Options is a copy of options passed to New.
Options Options
startTime time.Time // for calculating uptime for /healthz
lastReqTime atomic.Value // time.Time when the last request started
stdout sdlogger.LogEntryWriter // for logging to stdout, nil in dev mode
stderr sdlogger.LogEntryWriter // for logging to stderr, nil in dev mode
errRptClient *errorreporting.Client // for reporting to the cloud Error Reporting
logRequestCB func(context.Context, *sdlogger.LogEntry) // if non-nil, need to emit request log entries via it
mainPort *Port // pre-registered main HTTP port, see initMainPort
grpcPort *grpcPort // non-nil when exposing a gRPC port
prpc *prpc.Server // pRPC server implementation exposed on the main port
mu sync.Mutex // protects fields below
ports []servingPort // all non-dummy ports (each one bound to a TCP socket)
started bool // true inside and after Serve
stopped bool // true inside and after Shutdown
ready chan struct{} // closed right before starting the serving loop
done chan struct{} // closed after Shutdown returns
// gRPC/pRPC configuration.
unaryInterceptors []grpc.UnaryServerInterceptor
streamInterceptors []grpc.StreamServerInterceptor
rpcAuthMethods []auth.Method
rndM sync.Mutex // protects rnd
rnd *rand.Rand // used to generate trace and operation IDs
bgrDone chan struct{} // closed to stop background activities
bgrWg sync.WaitGroup // waits for RunInBackground goroutines to stop
warmupM sync.Mutex // protects 'warmup' and the actual warmup critical section
warmup []func(context.Context)
cleanupM sync.Mutex // protects 'cleanup' and the actual cleanup critical section
cleanup []func(context.Context)
tsmon *tsmon.State // manages flushing of tsmon metrics
propagator propagation.TextMapPropagator // knows how to propagate trace headers
cloudTS oauth2.TokenSource // source of cloud-scoped tokens for Cloud APIs
signer *signerImpl // the signer used by the auth system
actorTokens *actorTokensImpl // for impersonating service accounts
authDB atomic.Value // if not using AuthDBProvider, the last known good authdb.DB instance
runningAs string // email of an account the server runs as
}
// servingPort represents either an HTTP or gRPC serving port.
type servingPort interface {
nameForLog() string
serve(baseCtx func() context.Context) error
shutdown(ctx context.Context)
}
// moduleHostImpl implements module.Host via server.Server.
//
// Just a tiny wrapper to make sure modules consume only a curated, limited set of
// the server API and do not retain the pointer to the server.
type moduleHostImpl struct {
srv *Server
mod module.Module
invalid bool
cookieAuth auth.Method
}
func (h *moduleHostImpl) panicIfInvalid() {
if h.invalid {
panic("module.Host must not be used outside of Initialize")
}
}
func (h *moduleHostImpl) HTTPAddr() net.Addr {
h.panicIfInvalid()
if h.srv.mainPort.listener != nil {
return h.srv.mainPort.listener.Addr()
}
return nil
}
func (h *moduleHostImpl) GRPCAddr() net.Addr {
h.panicIfInvalid()
if h.srv.grpcPort != nil {
return h.srv.grpcPort.listener.Addr()
}
return nil
}
func (h *moduleHostImpl) Routes() *router.Router {
h.panicIfInvalid()
return h.srv.Routes
}
func (h *moduleHostImpl) RunInBackground(activity string, f func(context.Context)) {
h.panicIfInvalid()
h.srv.RunInBackground(activity, f)
}
func (h *moduleHostImpl) RegisterWarmup(cb func(context.Context)) {
h.panicIfInvalid()
h.srv.RegisterWarmup(cb)
}
func (h *moduleHostImpl) RegisterCleanup(cb func(context.Context)) {
h.panicIfInvalid()
h.srv.RegisterCleanup(cb)
}
func (h *moduleHostImpl) RegisterService(desc *grpc.ServiceDesc, impl any) {
h.panicIfInvalid()
h.srv.RegisterService(desc, impl)
}
func (h *moduleHostImpl) RegisterUnaryServerInterceptors(intr ...grpc.UnaryServerInterceptor) {
h.panicIfInvalid()
h.srv.RegisterUnaryServerInterceptors(intr...)
}
func (h *moduleHostImpl) RegisterStreamServerInterceptors(intr ...grpc.StreamServerInterceptor) {
h.panicIfInvalid()
h.srv.RegisterStreamServerInterceptors(intr...)
}
func (h *moduleHostImpl) RegisterCookieAuth(method auth.Method) {
h.panicIfInvalid()
h.cookieAuth = method
}
// New constructs a new server instance.
//
// It hosts one or more HTTP servers and starts and stops them in unison. It is
// also responsible for preparing contexts for incoming requests.
//
// The given context will become the root context of the server and will be
// inherited by all handlers.
//
// On errors returns a partially initialized server (always non-nil). At least
// its logging will be configured and can be used to report the error. Trying
// to use such partially initialized server for anything else is undefined
// behavior.
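//
// A minimal sketch of using New directly instead of Main (flag parsing and
// module construction elided; `opts`, `mods`, `servicepb` and `SomeServer`
// are placeholders, as in the package-level usage example):
//
//	srv, err := server.New(context.Background(), opts, mods)
//	if err != nil {
//		srv.Fatal(err) // logs the error and aborts the process
//	}
//	servicepb.RegisterSomeServer(srv, &SomeServer{})
//	if err := srv.Serve(); err != nil {
//		srv.Fatal(err)
//	}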
func New(ctx context.Context, opts Options, mods []module.Module) (srv *Server, err error) {
seed := opts.testSeed
if seed == 0 {
if err := binary.Read(cryptorand.Reader, binary.BigEndian, &seed); err != nil {
panic(err)
}
}
srv = &Server{
Context: ctx,
Options: opts,
startTime: clock.Now(ctx).UTC(),
ready: make(chan struct{}),
done: make(chan struct{}),
rnd: rand.New(rand.NewSource(seed)),
bgrDone: make(chan struct{}),
}
// Cleanup what we can on failures.
defer func() {
if err != nil {
srv.runCleanup()
}
}()
// Logging is needed to report any errors during the early initialization.
srv.initLogging()
logging.Infof(srv.Context, "Server starting...")
if srv.Options.ContainerImageID != "" {
logging.Infof(srv.Context, "Container image is %s", srv.Options.ContainerImageID)
}
// Need the hostname (e.g. pod name on k8s) for logs and metrics.
if srv.Options.Hostname == "" {
srv.Options.Hostname, err = os.Hostname()
if err != nil {
return srv, errors.Annotate(err, "failed to get own hostname").Err()
}
}
switch srv.Options.Serverless {
case module.GAE:
logging.Infof(srv.Context, "Running on %s", srv.Options.Hostname)
logging.Infof(srv.Context, "Instance is %q", os.Getenv("GAE_INSTANCE"))
if srv.Options.CloudRegion == "" {
if appID := os.Getenv("GAE_APPLICATION"); appID != "" {
logging.Warningf(srv.Context, "Could not figure out the primary Cloud region based "+
"on the region code in GAE_APPLICATION %q, consider passing the region name "+
"via -cloud-region flag explicitly", appID)
}
} else {
logging.Infof(srv.Context, "Cloud region is %s", srv.Options.CloudRegion)
}
// Initialize default tickets for background activities. These tickets are
// overridden in per-request contexts with request-specific tickets.
srv.Context = gae.WithTickets(srv.Context, gae.DefaultTickets())
case module.CloudRun:
logging.Infof(srv.Context, "Running on %s", srv.Options.Hostname)
logging.Infof(srv.Context, "Revision is %q", os.Getenv("K_REVISION"))
default:
// On k8s log pod IPs too, this is useful when debugging k8s routing.
logging.Infof(srv.Context, "Running on %s (%s)", srv.Options.Hostname, networkAddrsForLog())
}
// Log enabled experiments, warn if some of them are unknown now.
var exps []experiments.ID
for _, name := range opts.EnableExperiments {
if exp, ok := experiments.GetByName(name); ok {
logging.Infof(ctx, "Enabling experiment %q", name)
exps = append(exps, exp)
} else {
logging.Warningf(ctx, "Skipping unknown experiment %q", name)
}
}
srv.Context = experiments.Enable(srv.Context, exps...)
// Configure base server subsystems by injecting them into the root context
// inherited later by all requests.
srv.Context = caching.WithProcessCacheData(srv.Context, caching.NewProcessCacheData())
if err := srv.initAuthStart(); err != nil {
return srv, errors.Annotate(err, "failed to initialize auth").Err()
}
if err := srv.initTSMon(); err != nil {
return srv, errors.Annotate(err, "failed to initialize tsmon").Err()
}
if err := srv.initAuthFinish(); err != nil {
return srv, errors.Annotate(err, "failed to finish auth initialization").Err()
}
if err := srv.initOpenTelemetry(); err != nil {
return srv, errors.Annotate(err, "failed to initialize tracing").Err()
}
if err := srv.initErrorReporting(); err != nil {
return srv, errors.Annotate(err, "failed to initialize error reporting").Err()
}
if err := srv.initProfiling(); err != nil {
return srv, errors.Annotate(err, "failed to initialize profiling").Err()
}
if err := srv.initMainPort(); err != nil {
return srv, errors.Annotate(err, "failed to initialize the main port").Err()
}
if err := srv.initGrpcPort(); err != nil {
return srv, errors.Annotate(err, "failed to initialize the gRPC port").Err()
}
if err := srv.initAdminPort(); err != nil {
return srv, errors.Annotate(err, "failed to initialize the admin port").Err()
}
if err := srv.initWarmup(); err != nil {
return srv, errors.Annotate(err, "failed to initialize warmup callbacks").Err()
}
// Sort modules by their initialization order based on declared dependencies,
// discover unfulfilled required dependencies.
sorted, err := resolveDependencies(mods)
if err != nil {
return srv, err
}
// Initialize all modules in their topological order.
impls := make([]*moduleHostImpl, len(sorted))
for i, mod := range sorted {
impls[i] = &moduleHostImpl{srv: srv, mod: mod}
switch ctx, err := mod.Initialize(srv.Context, impls[i], srv.Options.hostOptions()); {
case err != nil:
return srv, errors.Annotate(err, "failed to initialize module %q", mod.Name()).Err()
case ctx != nil:
srv.Context = ctx
}
impls[i].invalid = true // make sure the module does not retain it
}
// Ensure there's only one CookieAuth method registered.
var cookieAuthMod module.Module
for _, impl := range impls {
if impl.cookieAuth != nil {
if cookieAuthMod != nil {
return srv, errors.Reason(
"conflict between %q and %q: both register a cookie auth scheme - pick one",
cookieAuthMod.Name(), impl.mod.Name(),
).Err()
}
cookieAuthMod = impl.mod
srv.CookieAuth = impl.cookieAuth
}
}
// Install the RPC Explorer, using the registered auth method if it is
// compatible.
rpcExpAuth, _ := srv.CookieAuth.(rpcexplorer.AuthMethod)
rpcexplorer.Install(srv.Routes, rpcExpAuth)
return srv, nil
}
// AddPort prepares and binds an additional serving HTTP port.
//
// Can be used to open more listening HTTP ports (in addition to opts.HTTPAddr
// and opts.AdminAddr). The returned Port object can be used to populate the
// router that serves requests hitting the added port.
//
// If opts.ListenAddr is '-', a dummy port will be added: it is a valid *Port
// object, but it is not actually exposed as a listening TCP socket. This is
// useful to disable listening ports without changing any code.
//
// Must be called before Serve (panics otherwise).
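//
// A minimal sketch (the port name, address and handler are placeholders):
//
//	port, err := srv.AddPort(server.PortOptions{
//		Name:       "extra",
//		ListenAddr: "localhost:8801",
//	})
//	if err != nil {
//		return err
//	}
//	port.Routes.GET("/extra", nil, handler)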
func (s *Server) AddPort(opts PortOptions) (*Port, error) {
port := &Port{
Routes: s.newRouter(opts),
parent: s,
opts: opts,
allowH2C: s.Options.AllowH2C,
}
s.mu.Lock()
defer s.mu.Unlock()
if s.started {
s.Fatal(errors.Reason("the server has already been started").Err())
}
if opts.ListenAddr != "-" {
var err error
if port.listener, err = s.createListener(opts.ListenAddr); err != nil {
return nil, errors.Annotate(err, "failed to bind the listening port for %q at %q", opts.Name, opts.ListenAddr).Err()
}
// Add to the list of ports that actually have sockets listening.
s.ports = append(s.ports, port)
}
return port, nil
}
// VirtualHost returns a router (registering it if necessary) used for requests
// that hit the main port (opts.HTTPAddr) and have the given Host header.
//
// Should be used in rare cases when the server is exposed through multiple
// domain names and requests should be routed differently based on what domain
// was used. If your server is serving only one domain name, or you don't care
// what domain name is used to access it, do not use VirtualHost.
//
// Note that requests that match some registered virtual host router won't
// reach the default router (server.Routes), even if the virtual host router
// doesn't have a route for them. Such requests finish with HTTP 404.
//
// Also the router created by VirtualHost is initially completely empty: the
// server and its modules don't install anything into it (there's intentionally
// no mechanism to do this). For that reason VirtualHost should never be used to
// register a router for the "main" domain name: it will make the default
// server.Routes (and all handlers installed there by server modules) useless,
// probably breaking the server. Put routes for the main server functionality
// directly into server.Routes instead, using VirtualHost only for routes that
// critically depend on Host header.
//
// Must be called before Serve (panics otherwise).
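//
// A minimal sketch (the host name and handler body are placeholders):
//
//	srv.VirtualHost("docs.example.com").GET("/", nil, func(c *router.Context) {
//		// serve the landing page for this particular domain
//	})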
func (s *Server) VirtualHost(host string) *router.Router {
return s.mainPort.VirtualHost(host)
}
// createListener creates a TCP listener on the given address.
func (s *Server) createListener(addr string) (net.Listener, error) {
// If not running tests, bind the socket as usual.
if s.Options.testListeners == nil {
return net.Listen("tcp", addr)
}
// In test mode the listener MUST be prepared already.
l := s.Options.testListeners[addr]
if l == nil {
return nil, errors.Reason("test listener is not set").Err()
}
return l, nil
}
// newRouter creates a Router with the default middleware chain and routes.
func (s *Server) newRouter(opts PortOptions) *router.Router {
s.mu.Lock()
defer s.mu.Unlock()
if s.started {
s.Fatal(errors.Reason("the server has already been started").Err())
}
// This is a chain of router.Middleware. It is preceded by a chain of raw
// net/http middlewares (see wrapHTTPHandler):
// * s.httpRoot: initializes *incomingRequest in the context.
// * otelhttp.NewHandler: opens a tracing span.
// * s.httpDispatch: finishes the context initialization.
mw := router.NewMiddlewareChain(
middleware.WithPanicCatcher, // transforms panics into HTTP 500
)
if s.tsmon != nil && !opts.DisableMetrics {
mw = mw.Extend(s.tsmon.Middleware) // collect HTTP requests metrics
}
// Setup middleware chain used by ALL requests.
r := router.New()
r.Use(mw)
// Mandatory health check/readiness probe endpoint.
r.GET(healthEndpoint, nil, func(c *router.Context) {
c.Writer.Write([]byte(s.healthResponse(c.Request.Context())))
})
// Add NotFound handler wrapped in our middlewares so that unrecognized
// requests are at least logged. If we don't do that they'll be handled
// completely silently and this is very confusing when debugging 404s.
r.NotFound(nil, func(c *router.Context) {
http.NotFound(c.Writer, c.Request)
})
return r
}
// RunInBackground launches the given callback in a separate goroutine right
// before starting the serving loop.
//
// If the server is already running, launches it right away. If the server
// fails to start, the goroutines will never be launched.
//
// Should be used for background asynchronous activities like reloading configs.
//
// All logs lines emitted by the callback are annotated with "activity" field
// which can be arbitrary, but by convention has format "<namespace>.<name>",
// where "luci" namespace is reserved for internal activities.
//
// The context passed to the callback is canceled when the server is shutting
// down. It is expected the goroutine will exit soon after the context is
// canceled.
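//
// A minimal sketch (the activity name and callback body are placeholders):
//
//	srv.RunInBackground("myapp.configs", func(ctx context.Context) {
//		// e.g. periodically reload configs until ctx is canceled
//	})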
func (s *Server) RunInBackground(activity string, f func(context.Context)) {
s.bgrWg.Add(1)
go func() {
defer s.bgrWg.Done()
select {
case <-s.ready:
// Construct the context after the server is fully initialized. Cancel it
// as soon as bgrDone is signaled.
ctx, cancel := context.WithCancel(s.Context)
if activity != "" {
ctx = logging.SetField(ctx, "activity", activity)
}
defer cancel()
go func() {
select {
case <-s.bgrDone:
cancel()
case <-ctx.Done():
}
}()
f(ctx)
case <-s.bgrDone:
// the server is closed, no need to run f() anymore
}
}()
}
// RegisterService is part of grpc.ServiceRegistrar interface.
//
// The registered service will be exposed through both gRPC and pRPC protocols
// on corresponding ports. See Server doc.
//
// Must be called before Serve (panics otherwise).
func (s *Server) RegisterService(desc *grpc.ServiceDesc, impl any) {
s.mu.Lock()
defer s.mu.Unlock()
if s.started {
s.Fatal(errors.Reason("the server has already been started").Err())
}
s.prpc.RegisterService(desc, impl)
if s.grpcPort != nil {
s.grpcPort.registerService(desc, impl)
}
}
// RegisterUnaryServerInterceptors registers grpc.UnaryServerInterceptor's
// applied to all unary RPCs that hit the server.
//
// Interceptors are chained in order they are registered, i.e. the first
// registered interceptor becomes the outermost. The initial chain already
// contains some base interceptors (e.g. for monitoring) and all interceptors
// registered by server modules. RegisterUnaryServerInterceptors extends this
// chain. Subsequent calls to RegisterUnaryServerInterceptors add more
// interceptors into the chain.
//
// Must be called before Serve (panics otherwise).
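//
// A minimal sketch of an interceptor that just logs the method name and
// forwards the call (the log line is a placeholder):
//
//	srv.RegisterUnaryServerInterceptors(func(ctx context.Context, req any, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (any, error) {
//		logging.Debugf(ctx, "Handling %s", info.FullMethod)
//		return handler(ctx, req)
//	})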
func (s *Server) RegisterUnaryServerInterceptors(intr ...grpc.UnaryServerInterceptor) {
s.mu.Lock()
defer s.mu.Unlock()
if s.started {
s.Fatal(errors.Reason("the server has already been started").Err())
}
s.unaryInterceptors = append(s.unaryInterceptors, intr...)
}
// RegisterStreamServerInterceptors registers grpc.StreamServerInterceptor's
// applied to all streaming RPCs that hit the server.
//
// Interceptors are chained in order they are registered, i.e. the first
// registered interceptor becomes the outermost. The initial chain already
// contains some base interceptors (e.g. for monitoring) and all interceptors
// registered by server modules. RegisterStreamServerInterceptors extends this
// chain. Subsequent calls to RegisterStreamServerInterceptors add more
// interceptors into the chain.
//
// Must be called before Serve (panics otherwise).
func (s *Server) RegisterStreamServerInterceptors(intr ...grpc.StreamServerInterceptor) {
s.mu.Lock()
defer s.mu.Unlock()
if s.started {
s.Fatal(errors.Reason("the server has already been started").Err())
}
s.streamInterceptors = append(s.streamInterceptors, intr...)
}
// RegisterUnifiedServerInterceptors registers given interceptors into both
// unary and stream interceptor chains.
//
// It is just a convenience helper for UnifiedServerInterceptor's that usually
// need to be registered in both unary and stream interceptor chains. This
// method is equivalent to calling RegisterUnaryServerInterceptors and
// RegisterStreamServerInterceptors, passing corresponding flavors of
// interceptors to them.
//
// Must be called before Serve (panics otherwise).
func (s *Server) RegisterUnifiedServerInterceptors(intr ...grpcutil.UnifiedServerInterceptor) {
s.mu.Lock()
defer s.mu.Unlock()
if s.started {
s.Fatal(errors.Reason("the server has already been started").Err())
}
for _, cb := range intr {
s.unaryInterceptors = append(s.unaryInterceptors, cb.Unary())
s.streamInterceptors = append(s.streamInterceptors, cb.Stream())
}
}
// ConfigurePRPC allows tweaking pRPC-specific server configuration.
//
// Use it only for changing pRPC-specific options (usually ones that are related
// to HTTP protocol in some way). This method **must not be used** for
// registering interceptors or setting authentication options (changes to them
// done here will cause a panic). Instead use RegisterUnaryServerInterceptors to
// register interceptors or SetRPCAuthMethods to change how the server
// authenticates RPC requests. Changes done through these methods will apply
// to both gRPC and pRPC servers.
//
// Must be called before Serve (panics otherwise).
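//
// A minimal sketch:
//
//	srv.ConfigurePRPC(func(p *prpc.Server) {
//		// tweak HTTP-level pRPC options here, e.g. its CORS policy
//	})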
func (s *Server) ConfigurePRPC(cb func(srv *prpc.Server)) {
s.mu.Lock()
defer s.mu.Unlock()
if s.started {
s.Fatal(errors.Reason("the server has already been started").Err())
}
cb(s.prpc)
if s.prpc.UnaryServerInterceptor != nil {
panic("use Server.RegisterUnaryServerInterceptors to register interceptors")
}
}
// SetRPCAuthMethods overrides how the server authenticates incoming gRPC and
// pRPC requests.
//
// It receives a list of auth.Method implementations which will be applied
// one after another to try to authenticate the request until the first
// successful hit. If all methods turn out to be non-applicable (i.e. none of the
// methods notice any headers they recognize), the request will be passed
// through to the handler as anonymous (coming from an "anonymous identity").
// Rejecting anonymous requests (if necessary) is the job of an authorization
// layer, often implemented as a gRPC interceptor. For simple cases use
// go.chromium.org/luci/server/auth/rpcacl interceptor.
//
// By default (if SetRPCAuthMethods is never called) the server will check
// incoming requests have an `Authorization` header with a Google OAuth2 access
// token that has `https://www.googleapis.com/auth/userinfo.email` scope (see
// auth.GoogleOAuth2Method). Requests without `Authorization` header will be
// considered anonymous.
//
// If OpenIDRPCAuthEnable option is set (matching `-open-id-rpc-auth-enable`
// flag), the service will recognize ID tokens as well. This is important for
// e.g. Cloud Run where this is the only authentication method supported
// natively by the platform. ID tokens are also generally faster to check than
// access tokens.
//
// Note that this call completely overrides the previously configured list of
// methods instead of appending to it, since chaining auth methods is often
// tricky and it is safer to just always provide the whole list at once.
//
// Passing an empty list of methods is allowed. All requests will be considered
// anonymous in that case.
//
// Note that this call **doesn't affect** how plain HTTP requests (hitting the
// main HTTP port and routed through s.Routes) are authenticated. Very often
// RPC requests and plain HTTP requests need different authentication methods
// and using an RPC authentication for everything is incorrect. To authenticate
// plain HTTP requests use auth.Authenticate(...) HTTP router middleware,
// perhaps in combination with s.CookieAuth (which is non-nil if there is a
// server module installed that provides a cookie-based authentication scheme).
//
// Must be called before Serve (panics otherwise).
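//
// A minimal sketch (`myMethod` is a placeholder for some auth.Method
// implementation, e.g. an instance of auth.GoogleOAuth2Method mentioned
// above):
//
//	srv.SetRPCAuthMethods([]auth.Method{myMethod})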
func (s *Server) SetRPCAuthMethods(methods []auth.Method) {
s.mu.Lock()
defer s.mu.Unlock()
if s.started {
s.Fatal(errors.Reason("the server has already been started").Err())
}
s.rpcAuthMethods = methods
}
// Serve launches the serving loop.
//
// Blocks forever or until the server is stopped via Shutdown (from another
// goroutine or from a SIGTERM handler). Returns nil if the server was shutdown
// correctly or an error if it failed to start or unexpectedly died. The error
// is logged inside.
//
// Should be called only once. Panics otherwise.
func (s *Server) Serve() error {
// Set s.started flag to "lock" the configuration. This allows reading
// fields like `s.ports` without fear of race conditions.
s.mu.Lock()
if s.started {
s.mu.Unlock()
s.Fatal(errors.Reason("the server has already been started").Err())
}
s.started = true
s.mu.Unlock()
// The configuration is "locked" now and we can finish the setup.
authInterceptor := auth.AuthenticatingInterceptor(s.rpcAuthMethods)
// Assemble the final interceptor chains: base interceptors => auth =>
// whatever was installed by users of server.Server. Note we put grpcmon
// before the panic catcher to make sure panics are actually reported to
// the monitoring. grpcmon is also before the authentication to make sure
// auth errors are reported as well.
unaryInterceptors := append([]grpc.UnaryServerInterceptor{
grpcmon.UnaryServerInterceptor,
grpcutil.UnaryServerPanicCatcherInterceptor,
authInterceptor.Unary(),
}, s.unaryInterceptors...)
streamInterceptors := append([]grpc.StreamServerInterceptor{
grpcmon.StreamServerInterceptor,
grpcutil.StreamServerPanicCatcherInterceptor,
authInterceptor.Stream(),
}, s.streamInterceptors...)
// Finish setting up the pRPC server. It supports only unary RPCs. The root
// request context is created in the HTTP land using base HTTP middlewares.
s.prpc.UnaryServerInterceptor = grpcutil.ChainUnaryServerInterceptors(unaryInterceptors...)
// Finish setting up the gRPC server, if enabled.
if s.grpcPort != nil {
grpcRoot := s.grpcRoot()
grpcDispatch := s.grpcDispatch()
s.grpcPort.addServerOptions(
grpc.ChainUnaryInterceptor(
grpcRoot.Unary(),
otelgrpc.UnaryServerInterceptor(),
grpcDispatch.Unary(),
),
grpc.ChainUnaryInterceptor(unaryInterceptors...),
grpc.ChainStreamInterceptor(
grpcRoot.Stream(),
otelgrpc.StreamServerInterceptor(),
grpcDispatch.Stream(),
),
grpc.ChainStreamInterceptor(streamInterceptors...),
)
}
// Run registered best-effort warmup callbacks right before serving.
s.runWarmup()
// Catch SIGTERM while inside the serving loop. Upon receiving SIGTERM, wait
// until the pod is removed from the load balancer before actually shutting
// down and refusing new connections. If we shut down immediately, some clients
// may see connection errors, because they are not yet aware the server is
// closing: Pod shutdown sequence and Endpoints list updates are racing with
// each other, we want Endpoints list updates to win, i.e. we want the pod to
// actually be fully alive as long as it is still referenced in Endpoints
// list. We can't guarantee this, but we can improve chances.
stop := signals.HandleInterrupt(func() {
if s.Options.Prod {
s.waitUntilNotServing()
}
s.Shutdown()
})
defer stop()
// Log how long it took from 'New' to the serving loop.
logging.Infof(s.Context, "Startup done in %s", clock.Now(s.Context).Sub(s.startTime))
// Unblock all pending RunInBackground goroutines, so they can start.
close(s.ready)
// Run serving loops in parallel.
errs := make(errors.MultiError, len(s.ports))
wg := sync.WaitGroup{}
wg.Add(len(s.ports))
for i, port := range s.ports {
logging.Infof(s.Context, "Serving %s", port.nameForLog())
i := i
port := port
go func() {
defer wg.Done()
if err := port.serve(func() context.Context { return s.Context }); err != nil {
logging.WithError(err).Errorf(s.Context, "Server %s failed", port.nameForLog())
errs[i] = err
s.Shutdown() // close all other servers
}
}()
}
wg.Wait()
// Per http.Server docs, we end up here *immediately* after the Shutdown call was
// initiated. Some requests can still be in-flight. We block until they are
// done (as indicated by Shutdown call itself exiting).
logging.Infof(s.Context, "Waiting for the server to stop...")
<-s.done
logging.Infof(s.Context, "The serving loop stopped, running the final cleanup...")
s.runCleanup()
logging.Infof(s.Context, "The server has stopped")
if errs.First() != nil {
return errs
}
return nil
}
// Shutdown gracefully stops the server if it was running.
//
// Blocks until the server is stopped. Can be called multiple times.
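//
// Example (an illustrative sketch): stop the server from another goroutine
// when some external signal arrives.
//
//     go func() {
//         <-stopSignal // `stopSignal` is a hypothetical channel
//         srv.Shutdown()
//     }()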
func (s *Server) Shutdown() {
s.mu.Lock()
defer s.mu.Unlock()
if s.stopped {
return
}
logging.Infof(s.Context, "Shutting down the server...")
// Tell all RunInBackground goroutines to stop.
close(s.bgrDone)
// Stop all http.Servers in parallel. Each Shutdown call blocks until the
// corresponding server is stopped.
wg := sync.WaitGroup{}
wg.Add(len(s.ports))
for _, port := range s.ports {
port := port
go func() {
defer wg.Done()
port.shutdown(s.Context)
}()
}
wg.Wait()
// Wait for all background goroutines to stop.
s.bgrWg.Wait()
// Notify Serve that it can exit now.
s.stopped = true
close(s.done)
}
// Fatal logs the error and immediately shuts down the process with exit code 3.
//
// No cleanup is performed. Deferred statements are not run. Not recoverable.
func (s *Server) Fatal(err error) {
errors.Log(s.Context, err)
os.Exit(3)
}
// healthResponse prepares a text/plain response for the health check endpoints.
//
// It additionally contains some easy-to-obtain information that may help in
// debugging deployments.
func (s *Server) healthResponse(c context.Context) string {
maybeEmpty := func(s string) string {
if s == "" {
return "<unknown>"
}
return s
}
return strings.Join([]string{
"OK",
"",
"uptime: " + clock.Now(c).Sub(s.startTime).String(),
"image: " + maybeEmpty(s.Options.ContainerImageID),
"",
"service: " + maybeEmpty(s.Options.TsMonServiceName),
"job: " + maybeEmpty(s.Options.TsMonJobName),
"host: " + s.Options.Hostname,
"",
}, "\n")
}
// waitUntilNotServing is called during the graceful shutdown and it tries to
// figure out when the traffic stops flowing to the server (i.e. when it is
// removed from the load balancer).
//
// It's a heuristic optimization for the case when the load balancer keeps
// sending traffic to a terminating Pod for some time after the Pod entered
// "Terminating" state. It can happen due to latencies in Endpoints list
// updates. We want to keep the listening socket open as long as there are
// incoming requests (but no longer than 1 min).
func (s *Server) waitUntilNotServing() {
logging.Infof(s.Context, "Received SIGTERM, waiting for the traffic to stop...")
// When the server is idle the loop below exits immediately and the server
// enters the shutdown path, rejecting new connections. Since we gave
// Kubernetes no time to update the Endpoints list, it is possible someone
// still might send a request to the server (and it will be rejected).
// To avoid that we always sleep a bit here to give Kubernetes a chance to
// propagate the Endpoints list update everywhere. The loop below then
// verifies clients got the update and stopped sending requests.
time.Sleep(s.Options.ShutdownDelay)
deadline := clock.Now(s.Context).Add(time.Minute)
for {
now := clock.Now(s.Context)
lastReq, ok := s.lastReqTime.Load().(time.Time)
if !ok || now.Sub(lastReq) > 15*time.Second {
logging.Infof(s.Context, "No requests received in the last 15 sec, proceeding with the shutdown...")
break
}
if now.After(deadline) {
logging.Warningf(s.Context, "Gave up waiting for the traffic to stop, proceeding with the shutdown...")
break
}
time.Sleep(100 * time.Millisecond)
}
}
// RegisterWarmup registers a callback that is run in server's Serve right
// before the serving loop.
//
// It receives the global server context (including all customizations made
// by the user code in server.Main). Intended for best-effort warmups: there's
// no way to gracefully abort the server startup from a warmup callback.
//
// Registering a new warmup callback from within a warmup causes a deadlock,
// don't do that.
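//
// Example (an illustrative sketch, `primeCache` is hypothetical):
//
//     srv.RegisterWarmup(func(ctx context.Context) {
//         if err := primeCache(ctx); err != nil {
//             logging.Warningf(ctx, "Cache warmup failed: %s", err)
//         }
//     })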
func (s *Server) RegisterWarmup(cb func(context.Context)) {
s.warmupM.Lock()
defer s.warmupM.Unlock()
s.warmup = append(s.warmup, cb)
}
// runWarmup runs all registered warmup functions (sequentially in registration
// order).
func (s *Server) runWarmup() {
s.warmupM.Lock()
defer s.warmupM.Unlock()
ctx := logging.SetField(s.Context, "activity", "luci.warmup")
for _, cb := range s.warmup {
cb(ctx)
}
}
// RegisterCleanup registers a callback that is run in Serve after the server
// has exited the serving loop.
//
// Registering a new cleanup callback from within a cleanup causes a deadlock,
// don't do that.
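//
// Example (an illustrative sketch, `client` is hypothetical): close a client
// constructed during initialization, similar to how the server handles its
// own IAM and Error Reporting clients.
//
//     srv.RegisterCleanup(func(ctx context.Context) { client.Close() })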
func (s *Server) RegisterCleanup(cb func(context.Context)) {
s.cleanupM.Lock()
defer s.cleanupM.Unlock()
s.cleanup = append(s.cleanup, cb)
}
// runCleanup runs all registered cleanup functions (sequentially in reverse
// order).
func (s *Server) runCleanup() {
s.cleanupM.Lock()
defer s.cleanupM.Unlock()
for i := len(s.cleanup) - 1; i >= 0; i-- {
s.cleanup[i](s.Context)
}
}
// genUniqueBlob writes a pseudo-random byte blob into the given slice.
func (s *Server) genUniqueBlob(b []byte) {
s.rndM.Lock()
s.rnd.Read(b)
s.rndM.Unlock()
}
// genUniqueID returns a pseudo-random hex string of the given even length.
func (s *Server) genUniqueID(l int) string {
b := make([]byte, l/2)
s.genUniqueBlob(b)
return hex.EncodeToString(b)
}
// incomingRequest is a request received by the server.
//
// It is either an HTTP or a gRPC request.
type incomingRequest struct {
url string // the full URL for logs
method string // HTTP method verb for logs, e.g. "POST"
metadata auth.RequestMetadata // headers etc.
healthCheck bool // true if this is a health check request
}
// requestResult is logged after completion of a request.
type requestResult struct {
statusCode int // the HTTP status code to log
requestSize int64 // the request size in bytes if known
responseSize int64 // the response size in bytes if known
extraFields logging.Fields // extra fields to log (will be mutated!)
}
// wrapHTTPHandler wraps port's router into net/http middlewares.
//
// TODO(vadimsh): Get rid of router.Middleware and move this to newRouter(...).
// Since the introduction of http.Request.Context() there's no reason for
// router.Middleware to exist anymore.
func (s *Server) wrapHTTPHandler(next http.Handler) http.Handler {
return s.httpRoot(
otelhttp.NewHandler(
s.httpDispatch(next),
"",
otelhttp.WithMessageEvents(otelhttp.ReadEvents, otelhttp.WriteEvents),
otelhttp.WithSpanNameFormatter(func(_ string, r *http.Request) string {
return r.URL.Path
}),
),
)
}
// httpRoot is the entry point for non-gRPC HTTP requests.
//
// It is a net/http middleware for interoperability with other existing
// net/http middlewares (currently only the OpenTelemetry otelhttp middleware).
//
// Its job is to initialize *incomingRequest in the context which is then
// examined by other middlewares (and the tracing sampler), in particular in
// httpDispatch.
//
// See grpcRoot(...) for a gRPC counterpart.
func (s *Server) httpRoot(next http.Handler) http.Handler {
return http.HandlerFunc(func(rw http.ResponseWriter, r *http.Request) {
// This context is derived from s.Context (see Serve) and has various server
// systems injected into it already. Its only difference from s.Context is
// that http.Server cancels it when the client disconnects, which we want.
ctx := r.Context()
// Apply per-request HTTP timeout, if any.
timeout := s.Options.DefaultRequestTimeout
if strings.HasPrefix(r.URL.Path, "/internal/") {
timeout = s.Options.InternalRequestTimeout
}
if timeout != 0 {
var cancelCtx context.CancelFunc
ctx, cancelCtx = context.WithTimeout(ctx, timeout)
defer cancelCtx()
}
// Reconstruct the original URL for logging.
protocol := r.Header.Get("X-Forwarded-Proto")
if protocol != "https" {
protocol = "http"
}
url := fmt.Sprintf("%s://%s%s", protocol, r.Host, r.RequestURI)
// incomingRequest is used by middlewares that work with both HTTP and gRPC
// requests, in particular it is used by startRequest(...).
next.ServeHTTP(rw, r.WithContext(context.WithValue(ctx, &incomingRequestKey, &incomingRequest{
url: url,
method: r.Method,
metadata: auth.RequestMetadataForHTTP(r),
healthCheck: r.RequestURI == healthEndpoint && isHealthCheckerUA(r.UserAgent()),
})))
})
}
// httpDispatch finishes HTTP request context initialization.
//
// Its primary purpose is to set up logging, but it also does some other context
// touches. See startRequest(...) where the bulk of the work happens.
//
// The next stop is the router.Middleware chain as registered in newRouter(...)
// and by the user code.
//
// See grpcDispatch(...) for a gRPC counterpart.
func (s *Server) httpDispatch(next http.Handler) http.Handler {
return http.HandlerFunc(func(rw http.ResponseWriter, r *http.Request) {
// Track how many response bytes are sent and what status is set, for logs.
trackingRW := iotools.NewResponseWriter(rw)
// Initialize per-request context (logging, GAE tickets, etc).
ctx, done := s.startRequest(r.Context())
// Log the result when done.
defer func() {
done(&requestResult{
statusCode: trackingRW.Status(),
requestSize: r.ContentLength,
responseSize: trackingRW.ResponseSize(),
})
}()
next.ServeHTTP(trackingRW, r.WithContext(ctx))
})
}
// grpcRoot is the entry point for gRPC requests.
//
// Its job is to initialize *incomingRequest in the context which is then
// examined by other middlewares (and the tracing sampler), in particular in
// grpcDispatch.
//
// See httpRoot(...) for an HTTP counterpart.
func (s *Server) grpcRoot() grpcutil.UnifiedServerInterceptor {
return func(ctx context.Context, fullMethod string, handler func(ctx context.Context) error) (err error) {
// incomingRequest is used by middlewares that work with both HTTP and gRPC
// requests, in particular it is used by startRequest(...).
//
// Note that here `ctx` is already derived from s.Context (except it is
// canceled if the client disconnects). See grpcPort{} implementation.
md := auth.RequestMetadataForGRPC(ctx)
return handler(context.WithValue(ctx, &incomingRequestKey, &incomingRequest{
url: fmt.Sprintf("grpc://%s%s", md.Host(), fullMethod),
method: "POST",
metadata: md,
healthCheck: strings.HasPrefix(fullMethod, "/grpc.health.") && isHealthCheckerUA(md.Header("User-Agent")),
}))
}
}
// grpcDispatch finishes gRPC request context initialization.
//
// Its primary purpose is to set up logging, but it also does some other context
// touches. See startRequest(...) where the bulk of the work happens.
//
// The next stop is the gRPC middleware chain as registered via server's API.
//
// See httpDispatch(...) for an HTTP counterpart.
func (s *Server) grpcDispatch() grpcutil.UnifiedServerInterceptor {
return func(ctx context.Context, fullMethod string, handler func(ctx context.Context) error) (err error) {
// Initialize per-request context (logging, GAE tickets, etc).
ctx, done := s.startRequest(ctx)
// Log the result when done.
defer func() {
code := status.Code(err)
httpStatusCode := grpcutil.CodeStatus(code)
// Log errors (for parity with pRPC server behavior).
switch {
case httpStatusCode >= 400 && httpStatusCode < 500:
logging.Warningf(ctx, "%s", err)
case httpStatusCode >= 500:
logging.Errorf(ctx, "%s", err)
}
// Report canonical GRPC code as a log entry field for filtering by it.
canonical, ok := codepb.Code_name[int32(code)]
if !ok {
canonical = fmt.Sprintf("%d", int64(code))
}
done(&requestResult{
statusCode: httpStatusCode, // this is an approximation
extraFields: logging.Fields{"code": canonical},
})
}()
return handler(ctx)
}
}
// startRequest finishes preparing the per-request context.
//
// It returns a callback that must be called after finishing processing this
// request.
//
// The incoming context is assumed to be derived by either httpRoot(...) or
// grpcRoot(...) and have *incomingRequest inside.
func (s *Server) startRequest(ctx context.Context) (context.Context, func(*requestResult)) {
// The value *must* be there. Let it panic if it is not.
req := ctx.Value(&incomingRequestKey).(*incomingRequest)
// If running on GAE, initialize the per-request API tickets needed to make
// RPCs to the GAE service bridge.
if s.Options.Serverless == module.GAE {
ctx = gae.WithTickets(ctx, gae.RequestTickets(req.metadata))
}
// This is used in waitUntilNotServing.
started := clock.Now(ctx)
if !req.healthCheck {
s.lastReqTime.Store(started)
}
// If the tracing is completely disabled we'll have an empty span context.
// But we need a trace ID in the context anyway for correlating logs (see
// below). Open a noop non-recording span with a randomly generated trace ID.
span := oteltrace.SpanFromContext(ctx)
spanCtx := span.SpanContext()
if !spanCtx.HasTraceID() {
var traceID oteltrace.TraceID
s.genUniqueBlob(traceID[:])
spanCtx = oteltrace.NewSpanContext(oteltrace.SpanContextConfig{
TraceID: traceID,
})
ctx = oteltrace.ContextWithSpanContext(ctx, spanCtx)
}
// Associate all logs with one another by using the same trace ID, which also
// matches the trace ID extracted by the propagator from incoming headers.
// Make sure to use the full trace ID format that includes the project name.
// This is important to group logs generated by us with logs generated by
// GCP (which uses the full trace ID) when running in the Cloud. Outside of
// Cloud it doesn't really matter what trace ID is used as long as all log
// entries use the same one.
traceID := spanCtx.TraceID().String()
if s.Options.CloudProject != "" {
traceID = fmt.Sprintf("projects/%s/traces/%s", s.Options.CloudProject, traceID)
}
// SpanID can be missing if there's no actual tracing. This is fine.
spanID := ""
if spanCtx.HasSpanID() {
spanID = spanCtx.SpanID().String()
}
// When running in prod, make the logger emit log entries in a JSON format that
// Cloud Logging collectors understand natively.
var severityTracker *sdlogger.SeverityTracker
if s.Options.Prod {
// Start assembling logging sink layers starting with the innermost one.
logSink := s.stdout
// If we are going to log the overall request status, install the tracker
// that observes the maximum emitted severity to use it as an overall
// severity for the request log entry.
if s.logRequestCB != nil {
severityTracker = &sdlogger.SeverityTracker{Out: logSink}
logSink = severityTracker
}
// If Cloud Error Reporting is enabled, intercept errors to upload them.
// TODO(vadimsh): Fill in `CloudErrorsSink.Request` with something.
if s.errRptClient != nil {
logSink = &sdlogger.CloudErrorsSink{
Client: s.errRptClient,
Out: logSink,
}
}
// Associate log entries with the tracing span where they were emitted.
annotateWithSpan := func(ctx context.Context, e *sdlogger.LogEntry) {
if spanID := oteltrace.SpanContextFromContext(ctx).SpanID(); spanID.IsValid() {
e.SpanID = spanID.String()
}
}
// Finally install all this into the request context.
ctx = logging.SetFactory(ctx, sdlogger.Factory(logSink, sdlogger.LogEntry{
TraceID: traceID,
Operation: &sdlogger.Operation{ID: s.genUniqueID(32)},
}, annotateWithSpan))
}
// Do final context touches.
ctx = caching.WithRequestCache(ctx)
// This will be called once the request is fully processed.
return ctx, func(res *requestResult) {
now := clock.Now(ctx)
latency := now.Sub(started)
if req.healthCheck {
// Do not log fast health check calls AT ALL, they just spam logs.
if latency < healthTimeLogThreshold {
return
}
// Emit a warning if the health check is slow, this likely indicates
// high CPU load.
logging.Warningf(ctx, "Health check is slow: %s > %s", latency, healthTimeLogThreshold)
}
// If there's no need to emit the overall request log entry, we are done.
// See initLogging(...) for where this is decided.
if s.logRequestCB == nil {
return
}
// When running behind Envoy, log its request IDs to simplify debugging.
extraFields := res.extraFields
if xrid := req.metadata.Header("X-Request-Id"); xrid != "" {
if extraFields == nil {
extraFields = make(logging.Fields, 1)
}
extraFields["requestId"] = xrid
}
// If we were tracking the overall severity, collect the outcome.
severity := sdlogger.InfoSeverity
if severityTracker != nil {
severity = severityTracker.MaxSeverity()
}
// Log the final outcome of the processed request.
s.logRequestCB(ctx, &sdlogger.LogEntry{
Severity: severity,
Timestamp: sdlogger.ToTimestamp(now),
TraceID: traceID,
TraceSampled: span.IsRecording(),
SpanID: spanID, // the top-level span ID if present
Fields: extraFields,
RequestInfo: &sdlogger.RequestInfo{
Method: req.method,
URL: req.url,
Status: res.statusCode,
RequestSize: fmt.Sprintf("%d", res.requestSize),
ResponseSize: fmt.Sprintf("%d", res.responseSize),
UserAgent: req.metadata.Header("User-Agent"),
RemoteIP: endUserIP(req.metadata),
Latency: fmt.Sprintf("%fs", latency.Seconds()),
},
})
}
}
// initLogging initializes the server logging.
//
// Called very early during server startup process. Many server fields may not
// be initialized yet, be careful.
//
// When running in production it uses the ugly-looking JSON format that is hard
// for humans to read, but which is parsed by google-fluentd and the GCP
// serverless hosting environment.
//
// To support per-request log grouping in Cloud Logging UI there must be
// two different log streams:
// - A stream with top-level HTTP request entries (conceptually like Apache's
// access.log, i.e. with one log entry per request).
// - A stream with logs produced within requests (correlated with HTTP request
// logs via the trace ID field).
//
// Both streams are expected to have a particular format and use particular
// fields for Cloud Logging UI to display them correctly. This technique is
// primarily intended for GAE Flex, but it works in many Google environments:
// https://cloud.google.com/appengine/articles/logging#linking_app_logs_and_requests
//
// On GKE we use 'stderr' stream for top-level HTTP request entries and 'stdout'
// stream for logs produced by requests.
//
// On GAE and Cloud Run, the stream with top-level HTTP request entries is
// produced by the GCP runtime itself. So we emit only logs produced within
// requests (also to 'stdout', just like on GKE).
//
// In all environments the 'stderr' stream is used to log all global activity
// that happens outside of any request handler (initialization, shutdown,
// background goroutines, etc).
//
// In non-production mode we use the human-friendly format and a single 'stderr'
// log stream for everything.
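//
// An illustrative sketch of how the two streams correlate (field names are
// sdlogger.LogEntry fields used elsewhere in this file, values are
// hypothetical):
//
//     // Stream with top-level HTTP request entries (one entry per request).
//     sdlogger.LogEntry{TraceID: "projects/p/traces/abc", RequestInfo: &sdlogger.RequestInfo{Status: 200}}
//     // Stream with logs produced within that request (same TraceID).
//     sdlogger.LogEntry{TraceID: "projects/p/traces/abc", Operation: &sdlogger.Operation{ID: "f00d..."}}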
func (s *Server) initLogging() {
if !s.Options.Prod {
s.Context = gologger.StdConfig.Use(s.Context)
s.Context = logging.SetLevel(s.Context, logging.Debug)
s.logRequestCB = func(ctx context.Context, entry *sdlogger.LogEntry) {
logging.Infof(ctx, "%d %s %q (%s)",
entry.RequestInfo.Status,
entry.RequestInfo.Method,
entry.RequestInfo.URL,
entry.RequestInfo.Latency,
)
}
return
}
if s.Options.testStdout != nil {
s.stdout = s.Options.testStdout
} else {
s.stdout = &sdlogger.Sink{Out: os.Stdout}
}
if s.Options.testStderr != nil {
s.stderr = s.Options.testStderr
} else {
s.stderr = &sdlogger.Sink{Out: os.Stderr}
}
s.Context = logging.SetFactory(s.Context,
sdlogger.Factory(s.stderr, sdlogger.LogEntry{
Operation: &sdlogger.Operation{
ID: s.genUniqueID(32), // correlate all global server logs together
},
}, nil),
)
s.Context = logging.SetLevel(s.Context, logging.Debug)
// Skip writing the root request log entry on Serverless GCP since the load
// balancer there writes the entry itself.
switch s.Options.Serverless {
case module.GAE:
// Skip. GAE writes it to "appengine.googleapis.com/request_log" itself.
case module.CloudRun:
// Skip. Cloud Run writes it to "run.googleapis.com/requests" itself.
default:
// Emit to stderr where Cloud Logging collectors pick it up.
s.logRequestCB = func(_ context.Context, entry *sdlogger.LogEntry) { s.stderr.Write(entry) }
}
}
// initAuthStart initializes the core auth system by preparing the context
// and verifying auth tokens can actually be minted (i.e. supplied credentials
// are valid).
//
// It is called before the tsmon monitoring is initialized: tsmon needs auth.
// The rest of the auth initialization (the part that needs tsmon) happens in
// initAuthFinish after tsmon is initialized.
func (s *Server) initAuthStart() error {
// Make a transport that appends information about the server as User-Agent.
ua := s.Options.userAgent()
rootTransport := clientauth.NewModifyingTransport(http.DefaultTransport, func(req *http.Request) error {
newUA := ua
if cur := req.UserAgent(); cur != "" {
newUA += " " + cur
}
req.Header.Set("User-Agent", newUA)
return nil
})
// Initialize the token generator based on s.Options.ClientAuth.
opts := s.Options.ClientAuth
// Use `rootTransport` for calls made by the token generator (e.g. when
// refreshing tokens).
opts.Transport = rootTransport
// We aren't going to use the authenticator's transport (and thus its
// monitoring), only the token source. DisableMonitoring == true removes some
// log spam.
opts.DisableMonitoring = true
// GCP is very aggressive in caching the token internally (in the metadata
// server) and refreshing it only when it is very close to its expiration. We
// need to match this behavior in our in-process cache, otherwise
// GetAccessToken complains that the token refresh procedure doesn't actually
// change the token (because the metadata server returned the cached one).
opts.MinTokenLifetime = 20 * time.Second
// The default value for ClientAuth.SecretsDir is usually hardcoded to point
// to where the token cache is located on developer machines (~/.config/...).
// This location often doesn't exist when running from inside a container.
// The token cache is also not really needed for production services that use
// service accounts (they don't need cached refresh tokens). So in production
// mode totally ignore default ClientAuth.SecretsDir and use whatever was
// passed as -token-cache-dir. If it is empty (default), then no on-disk token
// cache is used at all.
//
// If -token-cache-dir was explicitly set, always use it (even in dev mode).
// This is useful when running containers locally: developer's credentials
// on the host machine can be mounted inside the container.
if s.Options.Prod || s.Options.TokenCacheDir != "" {
opts.SecretsDir = s.Options.TokenCacheDir
}
// Annotate the context used for logging from the token generator.
ctx := logging.SetField(s.Context, "activity", "luci.auth")
tokens := clientauth.NewTokenGenerator(ctx, opts)
// Prepare partially initialized structs for the auth.Config. They will be
// fully initialized in initAuthFinish once we have a sufficiently working
// auth context that can call Cloud IAM.
s.signer = &signerImpl{srv: s}
s.actorTokens = &actorTokensImpl{}
// Either use the explicitly passed AuthDB provider or the one initialized
// by initAuthDB.
provider := s.Options.AuthDBProvider
if provider == nil {
provider = func(context.Context) (authdb.DB, error) {
db, _ := s.authDB.Load().(authdb.DB) // refreshed asynchronously in refreshAuthDB
return db, nil
}
}
// Initialize the state in the context.
s.Context = auth.Initialize(s.Context, &auth.Config{
DBProvider: provider,
Signer: s.signer,
AccessTokenProvider: func(ctx context.Context, scopes []string) (*oauth2.Token, error) {
return tokens.GenerateOAuthToken(ctx, scopes, 0)
},
IDTokenProvider: func(ctx context.Context, audience string) (*oauth2.Token, error) {
return tokens.GenerateIDToken(ctx, audience, 0)
},
ActorTokensProvider: s.actorTokens,
AnonymousTransport: func(context.Context) http.RoundTripper { return rootTransport },
FrontendClientID: func(context.Context) (string, error) { return s.Options.FrontendClientID, nil },
EndUserIP: endUserIP,
IsDevMode: !s.Options.Prod,
})
// Note: we initialize a token source for one arbitrary set of scopes here. In
// many practical cases this is sufficient to verify that credentials are
// valid. For example, when we use service account JSON key, if we can
// generate a token with *some* scope (meaning Cloud accepted our signature),
// we can generate tokens with *any* scope, since there's no restrictions on
// what scopes are accessible to a service account, as long as the private key
// is valid (which we just verified by generating some token).
_, err := tokens.GenerateOAuthToken(ctx, auth.CloudOAuthScopes, 0)
if err != nil {
// ErrLoginRequired may happen only when running the server locally using
// developer's credentials. Let them know how the problem can be fixed.
if !s.Options.Prod && err == clientauth.ErrLoginRequired {
scopes := fmt.Sprintf("-scopes %q", strings.Join(auth.CloudOAuthScopes, " "))
if opts.ActAsServiceAccount != "" && opts.ActViaLUCIRealm == "" {
scopes = "-scopes-iam"
}
logging.Errorf(s.Context, "Looks like you run the server locally and it doesn't have credentials for some OAuth scopes")
logging.Errorf(s.Context, "Run the following command to set them up: ")
logging.Errorf(s.Context, " $ luci-auth login %s", scopes)
}
return errors.Annotate(err, "failed to initialize the token source").Err()
}
// Report who we are running as. Useful when debugging access issues.
switch email, err := tokens.GetEmail(); {
case err == nil:
logging.Infof(s.Context, "Running as %s", email)
s.runningAs = email
case err == clientauth.ErrNoEmail:
logging.Warningf(s.Context, "Running as <unknown>, cautiously proceeding...")
case err != nil:
return errors.Annotate(err, "failed to check the service account email").Err()
}
return nil
}
// initAuthFinish finishes auth system initialization.
//
// It is called after tsmon is initialized.
func (s *Server) initAuthFinish() error {
// We should be able to make basic authenticated requests now and can
// construct a token source used by the server's own guts to call Cloud APIs,
// such as Cloud Trace and Cloud Error Reporting (and others).
var err error
s.cloudTS, err = auth.GetTokenSource(s.Context, auth.AsSelf, auth.WithScopes(auth.CloudOAuthScopes...))
if err != nil {
return errors.Annotate(err, "failed to initialize the cloud token source").Err()
}
// Finish constructing `signer` and `actorTokens` that were waiting for
// an IAM client.
iamClient, err := credentials.NewIamCredentialsClient(
s.Context,
option.WithTokenSource(s.cloudTS),
option.WithGRPCDialOption(grpc.WithStatsHandler(&grpcmon.ClientRPCStatsMonitor{})),
option.WithGRPCDialOption(grpc.WithUnaryInterceptor(otelgrpc.UnaryClientInterceptor())),
option.WithGRPCDialOption(grpc.WithStreamInterceptor(otelgrpc.StreamClientInterceptor())),
)
if err != nil {
return errors.Annotate(err, "failed to construct IAM client").Err()
}
s.RegisterCleanup(func(ctx context.Context) { iamClient.Close() })
s.signer.iamClient = iamClient
s.actorTokens.iamClient = iamClient
// If not using a custom AuthDB provider, initialize the standard one that
// fetches AuthDB (a database with groups and auth config) from a central
// place. This also starts a goroutine to periodically refresh it.
if s.Options.AuthDBProvider == nil {
if err := s.initAuthDB(); err != nil {
return errors.Annotate(err, "failed to initialize AuthDB").Err()
}
}
// Default RPC authentication methods. See also SetRPCAuthMethods.
s.rpcAuthMethods = make([]auth.Method, 0, 2)
if s.Options.OpenIDRPCAuthEnable {
// The preferred authentication method.
s.rpcAuthMethods = append(s.rpcAuthMethods, &openid.GoogleIDTokenAuthMethod{
AudienceCheck: openid.AudienceMatchesHost,
Audience: s.Options.OpenIDRPCAuthAudience,
SkipNonJWT: true, // pass OAuth2 access tokens through
})
}
// Backward compatibility for the RPC Explorer and old clients.
s.rpcAuthMethods = append(s.rpcAuthMethods, &auth.GoogleOAuth2Method{
Scopes: []string{clientauth.OAuthScopeEmail},
})
return nil
}
// initAuthDB interprets -auth-db-* flags and sets up fetching of AuthDB.
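//
// Example flag combinations (values are illustrative):
//
//     -auth-service-host auth-service.example.com   (fetch AuthDB dumps from GCS)
//     -auth-db-path ./authdb.textpb                  (load AuthDB from a local text proto file)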
func (s *Server) initAuthDB() error {
// Check flags are compatible.
switch {
case s.Options.AuthDBPath != "" && s.Options.AuthServiceHost != "":
return errors.Reason("-auth-db-path and -auth-service-host can't be used together").Err()
case s.Options.AuthServiceHost == "" && (s.Options.AuthDBDump != "" || s.Options.AuthDBSigner != ""):
return errors.Reason("-auth-db-dump and -auth-db-signer can be used only with -auth-service-host").Err()
case s.Options.AuthDBDump != "" && !strings.HasPrefix(s.Options.AuthDBDump, "gs://"):
return errors.Reason("-auth-db-dump value should start with gs://, got %q", s.Options.AuthDBDump).Err()
case strings.Contains(s.Options.AuthServiceHost, "/"):
return errors.Reason("-auth-service-host should be a plain hostname, got %q", s.Options.AuthServiceHost).Err()
}
// Fill in defaults.
if s.Options.AuthServiceHost != "" {
if s.Options.AuthDBDump == "" {
s.Options.AuthDBDump = fmt.Sprintf("gs://%s/auth-db", s.Options.AuthServiceHost)
}
if s.Options.AuthDBSigner == "" {
if !strings.HasSuffix(s.Options.AuthServiceHost, ".appspot.com") {
return errors.Reason("-auth-db-signer is required if -auth-service-host is not *.appspot.com").Err()
}
s.Options.AuthDBSigner = fmt.Sprintf("%s@appspot.gserviceaccount.com",
strings.TrimSuffix(s.Options.AuthServiceHost, ".appspot.com"))
}
}
// Fetch the initial copy of AuthDB. Note that this happens before we start
// the serving loop, to make sure incoming requests have some AuthDB to use.
if err := s.refreshAuthDB(s.Context); err != nil {
return errors.Annotate(err, "failed to load the initial AuthDB version").Err()
}
// Periodically refresh it in the background.
s.RunInBackground("luci.authdb", func(c context.Context) {
for {
jitter := time.Duration(rand.Int63n(int64(10 * time.Second)))
if r := <-clock.After(c, 30*time.Second+jitter); r.Err != nil {
return // the context is canceled
}
if err := s.refreshAuthDB(c); err != nil {
// Don't log the error if the server is shutting down.
if !errors.Is(err, context.Canceled) {
logging.WithError(err).Errorf(c, "Failed to reload AuthDB, using the cached one")
}
}
}
})
return nil
}
// refreshAuthDB reloads AuthDB from the source and stores it in memory.
func (s *Server) refreshAuthDB(c context.Context) error {
cur, _ := s.authDB.Load().(authdb.DB)
db, err := s.fetchAuthDB(c, cur)
if err != nil {
return err
}
s.authDB.Store(db)
return nil
}
// fetchAuthDB fetches the most recent copy of AuthDB from the external source.
//
// Used only if Options.AuthDBProvider is nil.
//
// 'cur' is the currently used AuthDB or nil if fetching it for the first time.
// Returns 'cur' as is if it's already fresh.
func (s *Server) fetchAuthDB(c context.Context, cur authdb.DB) (authdb.DB, error) {
// Loading from a local file (useful in integration tests).
if s.Options.AuthDBPath != "" {
r, err := os.Open(s.Options.AuthDBPath)
if err != nil {
return nil, errors.Annotate(err, "failed to open AuthDB file").Err()
}
defer r.Close()
db, err := authdb.SnapshotDBFromTextProto(r)
if err != nil {
return nil, errors.Annotate(err, "failed to load AuthDB file").Err()
}
return db, nil
}
// Loading from a GCS dump (s.Options.AuthDB* are validated here already).
if s.Options.AuthDBDump != "" {
c, cancel := clock.WithTimeout(c, 5*time.Minute)
defer cancel()
fetcher := dump.Fetcher{
StorageDumpPath: s.Options.AuthDBDump[len("gs://"):],
AuthServiceURL: "https://" + s.Options.AuthServiceHost,
AuthServiceAccount: s.Options.AuthDBSigner,
OAuthScopes: auth.CloudOAuthScopes,
}
curSnap, _ := cur.(*authdb.SnapshotDB)
snap, err := fetcher.FetchAuthDB(c, curSnap)
if err != nil {
return nil, errors.Annotate(err, "fetching from GCS dump failed").Err()
}
return snap, nil
}
// In dev mode default to "allow everything".
if !s.Options.Prod {
return authdb.DevServerDB{}, nil
}
// In prod mode default to "fail on any non-trivial check". Some services may
// not need to use AuthDB at all and configuring it for them is a hassle. If
// they try to use it for something vital, they'll see the error.
return authdb.UnconfiguredDB{
Error: errors.Reason("a source of AuthDB is not configured, see -auth-* server flags").Err(),
}, nil
}
// initTSMon initializes time series monitoring state.
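//
// Example flags to start uploading metrics (values are illustrative; the exact
// flag names are assumed to mirror the corresponding Options fields):
//
//     -ts-mon-account metrics-uploader@example.iam.gserviceaccount.com
//     -ts-mon-service-name my-service
//     -ts-mon-job-name default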
func (s *Server) initTSMon() error {
// We keep tsmon always enabled (flushing to /dev/null if no -ts-mon-* flags
// are set) so that tsmon's in-process store is populated, and metrics there
// can be examined via /admin/tsmon. This is useful when developing/debugging
// tsmon metrics.
var customMonitor monitor.Monitor
if s.Options.TsMonAccount == "" || s.Options.TsMonServiceName == "" || s.Options.TsMonJobName == "" {
logging.Infof(s.Context, "tsmon is in the debug mode: metrics are collected, but flushed to /dev/null (pass -ts-mon-* flags to start uploading metrics)")
customMonitor = monitor.NewNilMonitor()
}
interval := int(s.Options.TsMonFlushInterval.Seconds())
if interval == 0 {
interval = int(defaultTsMonFlushInterval.Seconds())
}
timeout := int(s.Options.TsMonFlushTimeout.Seconds())
if timeout == 0 {
timeout = int(defaultTsMonFlushTimeout.Seconds())
}
if timeout >= interval {
return errors.Reason("-ts-mon-flush-timeout (%ds) must be shorter than -ts-mon-flush-interval (%ds)", timeout, interval).Err()
}
s.tsmon = &tsmon.State{
CustomMonitor: customMonitor,
Settings: &tsmon.Settings{
Enabled: true,
ProdXAccount: s.Options.TsMonAccount,
FlushIntervalSec: interval,
FlushTimeoutSec: timeout,
ReportRuntimeStats: true,
},
Target: func(c context.Context) target.Task {
// TODO(vadimsh): We pretend to be a GAE app for now to be able to
// reuse existing dashboards. Each pod pretends to be a separate GAE
// version. That way we can stop worrying about TaskNumAllocator and just
// use 0 (since there'll be only one task per "version"). This looks
// chaotic for deployments with a large number of pods.
return target.Task{
DataCenter: "appengine",
ServiceName: s.Options.TsMonServiceName,
JobName: s.Options.TsMonJobName,
HostName: s.Options.Hostname,
}
},
}
if customMonitor != nil {
tsmon.PortalPage.SetReadOnlySettings(s.tsmon.Settings,
"Running in the debug mode. Pass all -ts-mon-* command line flags to start uploading metrics.")
} else {
tsmon.PortalPage.SetReadOnlySettings(s.tsmon.Settings,
"Settings are controlled through -ts-mon-* command line flags.")
}
// Enable this configuration in s.Context so all transports created during
// the server startup have tsmon instrumentation.
s.tsmon.Activate(s.Context)
// Report our image version as a metric, useful to monitor rollouts.
tsmoncommon.RegisterCallbackIn(s.Context, func(ctx context.Context) {
versionMetric.Set(ctx, s.Options.ImageVersion())
})
// Periodically flush metrics.
s.RunInBackground("luci.tsmon", s.tsmon.FlushPeriodically)
return nil
}
// otelResource returns an OTEL resource identifying this server instance.
//
// It is essentially just a bunch of labels reported to monitoring backends
// together with traces.
func (s *Server) otelResource(ctx context.Context) (*resource.Resource, error) {
return resource.New(
ctx,
resource.WithTelemetrySDK(),
resource.WithDetectors(gcp.NewDetector()),
resource.WithAttributes(
semconv.ServiceName(fmt.Sprintf("%s/%s", s.Options.TsMonServiceName, s.Options.TsMonJobName)),
semconv.ServiceInstanceID(s.Options.Hostname),
semconv.ContainerImageName(s.Options.ImageName()),
semconv.ContainerImageTag(s.Options.ImageVersion()),
),
)
}
// otelErrorHandler returns a top-level OTEL error catcher.
//
// It just logs errors (with some deduplication to avoid spam).
func (s *Server) otelErrorHandler(ctx context.Context) otel.ErrorHandlerFunc {
// State for suppressing repeated ResourceExhausted error messages, otherwise
// logs may get flooded with them. They are usually not super important, but
// ignoring them completely is also not great.
errorDedup := struct {
lock sync.Mutex
report time.Time
count int
}{}
return func(err error) {
if !strings.Contains(err.Error(), "ResourceExhausted") {
logging.Warningf(ctx, "Error in Cloud Trace exporter: %s", err)
return
}
errorDedup.lock.Lock()
defer errorDedup.lock.Unlock()
errorDedup.count++
if errorDedup.report.IsZero() || time.Since(errorDedup.report) > 5*time.Minute {
if errorDedup.report.IsZero() {
logging.Warningf(ctx, "Error in Cloud Trace exporter: %s", err)
} else {
logging.Warningf(ctx, "Error in Cloud Trace exporter: %s (%d occurrences in %s since the last report)", err, errorDedup.count, time.Since(errorDedup.report))
}
errorDedup.report = time.Now()
errorDedup.count = 0
}
}
}
// otelSampler prepares a sampler based on CLI flags and environment.
func (s *Server) otelSampler(ctx context.Context) (trace.Sampler, error) {
// On GCP Serverless let the GCP load balancer make decisions about
// sampling. If it decides to sample a trace, it will let us know through
// options of the parent span in X-Cloud-Trace-Context. We will collect only
// traces from requests that GCP wants to sample itself. Traces without
// a parent context are never sampled. This also means traces from random
// background goroutines aren't sampled either (i.e. we don't need GateSampler
// as used below).
if s.Options.Serverless.IsGCP() {
logging.Infof(ctx, "Setting up Cloud Trace exports to %q using GCP Serverless sampling strategy", s.Options.CloudProject)
return trace.ParentBased(trace.NeverSample()), nil
}
// Parse -trace-sampling spec to get the base sampler.
sampling := s.Options.TraceSampling
if sampling == "" {
sampling = "0.1qps"
}
logging.Infof(ctx, "Setting up Cloud Trace exports to %q (%s)", s.Options.CloudProject, sampling)
sampler, err := internal.BaseSampler(sampling)
if err != nil {
return nil, errors.Annotate(err, "bad -trace-sampling").Err()
}
// Sample only if the context is an incoming request context. This is needed
// to avoid various background goroutines spamming with top-level spans. This
// usually happens if a library is oblivious of tracing, but uses an
// instrumented HTTP or gRPC client it got from outside, and the passes
// context.Background() (or some unrelated context) to it. The end result is
// lots and lots of non-informative disconnected top-level spans.
//
// Also skip sampling health check requests, they end up being spammy as well.
sampler = internal.GateSampler(sampler, func(ctx context.Context) bool {
req, _ := ctx.Value(&incomingRequestKey).(*incomingRequest)
return req != nil && !req.healthCheck
})
// Inherit the sampling decision from a parent span. Note this totally ignores
// `sampler` if there's a parent span (local or remote). This is usually what
// we want to get complete trace trees with well-defined root and no gaps.
return trace.ParentBased(sampler), nil
}
// otelSpanExporter initializes a trace spans exporter.
func (s *Server) otelSpanExporter(ctx context.Context) (trace.SpanExporter, error) {
return texporter.New(
texporter.WithContext(ctx),
texporter.WithProjectID(s.Options.CloudProject),
texporter.WithTraceClientOptions([]option.ClientOption{
option.WithTokenSource(s.cloudTS),
}),
)
}
// initOpenTelemetry initializes OpenTelemetry to export to Cloud Trace.
func (s *Server) initOpenTelemetry() error {
// Initialize a transformer that knows how to extract span info from the
// context and serialize it as a bunch of headers and vice-versa. It is
// invoked by otelhttp and otelgrpc middleware and when creating instrumented
// HTTP clients. Recognize X-Cloud-Trace-Context for compatibility with traces
// created by GCLB.
//
// It is used to parse incoming headers even when tracing is disabled, so
// initialize it unconditionally, just don't install it as a global propagator.
s.propagator = propagation.NewCompositeTextMapPropagator(
gcppropagator.CloudTraceOneWayPropagator{},
propagation.TraceContext{},
)
// If tracing is disabled, initialize OTEL with noop providers that just
// silently discard everything. Otherwise OTEL leaks memory, see
// https://github.com/open-telemetry/opentelemetry-go-contrib/issues/5190
if !s.Options.shouldEnableTracing() {
otel.SetTracerProvider(oteltracenoop.TracerProvider{})
otel.SetMeterProvider(otelmetricnoop.MeterProvider{})
return nil
}
// Annotate logs from OpenTelemetry so they can be filtered in Cloud Logging.
ctx := logging.SetField(s.Context, "activity", "luci.trace")
// TODO(vadimsh): Install OpenTelemetry global logger using otel.SetLogger().
// This will require implementing a hefty logr.LogSink interface on top of
// the LUCI logger. Not doing that results in garbled stderr when OTEL wants
// to log something (unclear when it happens exactly, if at all).
res, err := s.otelResource(ctx)
if err != nil {
return errors.Annotate(err, "failed to init OpenTelemetry resource").Err()
}
sampler, err := s.otelSampler(ctx)
if err != nil {
return errors.Annotate(err, "failed to init OpenTelemetry sampler").Err()
}
exp, err := s.otelSpanExporter(ctx)
if err != nil {
return errors.Annotate(err, "failed to init OpenTelemetry span exporter").Err()
}
tp := trace.NewTracerProvider(
trace.WithResource(res),
trace.WithSampler(sampler),
trace.WithBatcher(exp,
trace.WithMaxQueueSize(8192), // how much to buffer before dropping
trace.WithBatchTimeout(30*time.Second), // how long to buffer before flushing
trace.WithExportTimeout(time.Minute), // deadline for the export RPC call
trace.WithMaxExportBatchSize(2048), // size of a single RPC
),
)
s.RegisterCleanup(func(ctx context.Context) {
ctx = logging.SetField(ctx, "activity", "luci.trace")
if err := tp.ForceFlush(ctx); err != nil {
logging.Errorf(ctx, "Final trace flush failed: %s", err)
}
if err := tp.Shutdown(ctx); err != nil {
logging.Errorf(ctx, "Error shutting down TracerProvider: %s", err)
}
})
// Register all globals to make them be used by default.
otel.SetErrorHandler(s.otelErrorHandler(ctx))
otel.SetTracerProvider(tp)
otel.SetTextMapPropagator(s.propagator)
// We don't use OTEL metrics. Set the noop provider to avoid leaking memory.
// See https://github.com/open-telemetry/opentelemetry-go-contrib/issues/5190.
otel.SetMeterProvider(otelmetricnoop.MeterProvider{})
return nil
}
// initProfiling initializes Cloud Profiler.
func (s *Server) initProfiling() error {
// Skip if not enough configuration is given.
switch {
case !s.Options.Prod:
return nil // silently skip, no need for log spam in dev mode
case s.Options.CloudProject == "":
logging.Infof(s.Context, "Cloud Profiler is disabled: -cloud-project is not set")
return nil
case s.Options.ProfilingServiceID == "" && s.Options.TsMonJobName == "":
logging.Infof(s.Context, "Cloud Profiler is disabled: neither -profiling-service-id nor -ts-mon-job-name are set")
return nil
}
// Enable the profiler based on the given probability. Low probabilities are
// useful to avoid hitting Cloud Profiler quotas when running services with
// many replicas. Profiles are aggregated anyway; for a large enough number of
// servers it doesn't matter if only a random subset of them is sampled.
sample := rand.Float64()
if sample < s.Options.ProfilingProbability {
if s.Options.ProfilingProbability >= 1.0 {
logging.Infof(s.Context, "Cloud Profiler is enabled")
} else {
logging.Infof(s.Context,
"Cloud Profiler is enabled: rand %.2f < profiling-probability %.2f",
sample, s.Options.ProfilingProbability)
}
} else {
if s.Options.ProfilingProbability <= 0 {
logging.Infof(s.Context, "Cloud Profiler is disabled")
} else {
logging.Infof(s.Context,
"Cloud Profiler is disabled: rand %.2f >= profiling-probability %.2f",
sample, s.Options.ProfilingProbability)
}
return nil
}
cfg := profiler.Config{
ProjectID: s.Options.CloudProject,
Service: s.getServiceID(),
ServiceVersion: s.Options.ImageVersion(),
Instance: s.Options.Hostname,
// Note: these two options may potentially have impact on performance, but
// it is likely small enough not to bother.
MutexProfiling: true,
AllocForceGC: true,
}
// Launch the agent that runs in the background and periodically collects and
// uploads profiles. It fails to launch if Service or ServiceVersion do not
// pass regexp validation. Make it non-fatal, but still log.
if err := profiler.Start(cfg, option.WithTokenSource(s.cloudTS)); err != nil {
logging.Errorf(s.Context, "Cloud Profiler is disabled: failed do start - %s", err)
return nil
}
logging.Infof(s.Context, "Set up Cloud Profiler (service %q, version %q)", cfg.Service, cfg.ServiceVersion)
return nil
}
// getServiceID gets the service ID from either ProfilingServiceID or TsMonJobName.
func (s *Server) getServiceID() string {
// Prefer ProfilingServiceID if given, fall back to TsMonJobName. Replace
// forbidden '/' symbol.
serviceID := s.Options.ProfilingServiceID
if serviceID == "" {
serviceID = s.Options.TsMonJobName
}
serviceID = strings.ReplaceAll(serviceID, "/", "-")
return serviceID
}
// initMainPort initializes the server on options.HTTPAddr port.
func (s *Server) initMainPort() error {
var err error
s.mainPort, err = s.AddPort(PortOptions{
Name: "main",
ListenAddr: s.Options.HTTPAddr,
})
if err != nil {
return err
}
s.Routes = s.mainPort.Routes
// Install auth info handlers (under "/auth/api/v1/server/").
auth.InstallHandlers(s.Routes, nil)
// Prepare the pRPC server. Its configuration will be finished in Serve after
// all interceptors and authentication methods are registered.
s.prpc = &prpc.Server{
// Allow compression when not running on GAE. On GAE compression for text
// responses is done by GAE itself and doing it in our code would be
// wasteful.
EnableResponseCompression: s.Options.Serverless != module.GAE,
}
discovery.Enable(s.prpc)
s.prpc.InstallHandlers(s.Routes, nil)
return nil
}
// initGrpcPort initializes the listening gRPC port.
func (s *Server) initGrpcPort() error {
if s.Options.GRPCAddr == "" || s.Options.GRPCAddr == "-" {
return nil // the gRPC port is disabled
}
listener, err := s.createListener(s.Options.GRPCAddr)
if err != nil {
return errors.Annotate(err, `failed to bind the listening port for "grpc" at %q`, s.Options.GRPCAddr).Err()
}
s.grpcPort = &grpcPort{listener: listener}
s.ports = append(s.ports, s.grpcPort)
return nil
}
// initAdminPort initializes the server on options.AdminAddr port.
func (s *Server) initAdminPort() error {
if s.Options.AdminAddr == "-" {
return nil // the admin port is disabled
}
// Admin portal uses XSRF tokens that require a secret key. We generate this
// key randomly during process startup (i.e. now). It means XSRF tokens in
// admin HTML pages rendered by a server process are understood only by the
// exact same process. This is OK for admin pages (they are not behind load
// balancers and we don't care that a server restart invalidates all tokens).
secret := make([]byte, 20)
if _, err := cryptorand.Read(secret); err != nil {
return err
}
store := secrets.NewDerivedStore(secrets.Secret{Active: secret})
withAdminSecret := router.NewMiddlewareChain(func(c *router.Context, next router.Handler) {
c.Request = c.Request.WithContext(secrets.Use(c.Request.Context(), store))
next(c)
})
// Install endpoints accessible through the admin port only.
adminPort, err := s.AddPort(PortOptions{
Name: "admin",
ListenAddr: s.Options.AdminAddr,
DisableMetrics: true, // do not pollute HTTP metrics with admin-only routes
})
if err != nil {
return err
}
routes := adminPort.Routes
routes.GET("/", nil, func(c *router.Context) {
http.Redirect(c.Writer, c.Request, "/admin/portal", http.StatusFound)
})
portal.InstallHandlers(routes, withAdminSecret, portal.AssumeTrustedPort)
// Install pprof endpoints on the admin port. Note that they must not be
// exposed via the main serving port, since they do no authentication and
// may leak internal information. Also note that pprof handlers rely on
// a routing structure not supported by our router, so we do a bit of manual
// routing.
//
// See also internal/pprof.go for more profiling goodies exposed through the
// admin portal.
routes.GET("/debug/pprof/*path", nil, func(c *router.Context) {
switch strings.TrimPrefix(c.Params.ByName("path"), "/") {
case "cmdline":
pprof.Cmdline(c.Writer, c.Request)
case "profile":
pprof.Profile(c.Writer, c.Request)
case "symbol":
pprof.Symbol(c.Writer, c.Request)
case "trace":
pprof.Trace(c.Writer, c.Request)
default:
pprof.Index(c.Writer, c.Request)
}
})
return nil
}
// initErrorReporting initializes the Cloud Error Reporting client.
func (s *Server) initErrorReporting() error {
if !s.Options.CloudErrorReporting || s.Options.CloudProject == "" {
return nil
}
// Get token source to call Error Reporting API.
var err error
s.errRptClient, err = errorreporting.NewClient(s.Context, s.Options.CloudProject, errorreporting.Config{
ServiceName: s.getServiceID(),
ServiceVersion: s.Options.ImageVersion(),
OnError: func(err error) {
// TODO(crbug/1204640): s/Warningf/Errorf once "Error Reporting" is itself
// more reliable.
logging.Warningf(s.Context, "Error Reporting could not log error: %s", err)
},
}, option.WithTokenSource(s.cloudTS))
if err != nil {
return err
}
s.RegisterCleanup(func(ctx context.Context) { s.errRptClient.Close() })
return nil
}
// initWarmup schedules execution of global warmup callbacks.
//
// On GAE also registers /_ah/warmup route.
func (s *Server) initWarmup() error {
// See https://cloud.google.com/appengine/docs/standard/go/configuring-warmup-requests.
// All warmups should happen *before* the serving loop and /_ah/warmup should
// just always return OK.
if s.Options.Serverless == module.GAE {
s.Routes.GET("/_ah/warmup", nil, func(*router.Context) {})
}
s.RegisterWarmup(func(ctx context.Context) { warmup.Warmup(ctx) })
return nil
}
// signerImpl implements signing.Signer on top of *Server.
type signerImpl struct {
srv *Server
iamClient *credentials.IamCredentialsClient
}
// SignBytes signs the blob with some active private key.
func (s *signerImpl) SignBytes(ctx context.Context, blob []byte) (keyName string, signature []byte, err error) {
resp, err := s.iamClient.SignBlob(ctx, &credentialspb.SignBlobRequest{
Name: "projects/-/serviceAccounts/" + s.srv.runningAs,
Payload: blob,
})
if err != nil {
return "", nil, grpcutil.WrapIfTransient(err)
}
return resp.KeyId, resp.SignedBlob, nil
}
// Certificates returns a bundle with public certificates for all active keys.
func (s *signerImpl) Certificates(ctx context.Context) (*signing.PublicCertificates, error) {
return signing.FetchCertificatesForServiceAccount(ctx, s.srv.runningAs)
}
// ServiceInfo returns information about the current service.
func (s *signerImpl) ServiceInfo(ctx context.Context) (*signing.ServiceInfo, error) {
return &signing.ServiceInfo{
AppID: s.srv.Options.CloudProject,
AppRuntime: "go",
AppRuntimeVersion: runtime.Version(),
AppVersion: s.srv.Options.ImageVersion(),
ServiceAccountName: s.srv.runningAs,
}, nil
}
// actorTokensImpl implements auth.ActorTokensProvider using IAM Credentials.
type actorTokensImpl struct {
iamClient *credentials.IamCredentialsClient
}
// GenerateAccessToken generates an access token for the given account.
func (a *actorTokensImpl) GenerateAccessToken(ctx context.Context, serviceAccount string, scopes, delegates []string) (*oauth2.Token, error) {
resp, err := a.iamClient.GenerateAccessToken(ctx, &credentialspb.GenerateAccessTokenRequest{
Name: "projects/-/serviceAccounts/" + serviceAccount,
Scope: scopes,
Delegates: delegatesList(delegates),
})
if err != nil {
return nil, grpcutil.WrapIfTransient(err)
}
return &oauth2.Token{
AccessToken: resp.AccessToken,
TokenType: "Bearer",
Expiry: resp.ExpireTime.AsTime(),
}, nil
}
// GenerateIDToken generates an ID token for the given account.
func (a *actorTokensImpl) GenerateIDToken(ctx context.Context, serviceAccount, audience string, delegates []string) (string, error) {
resp, err := a.iamClient.GenerateIdToken(ctx, &credentialspb.GenerateIdTokenRequest{
Name: "projects/-/serviceAccounts/" + serviceAccount,
Audience: audience,
Delegates: delegatesList(delegates),
IncludeEmail: true,
})
if err != nil {
return "", grpcutil.WrapIfTransient(err)
}
return resp.Token, nil
}
// delegatesList prepends `projects/-/serviceAccounts/` to emails.
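//
// For example (the email is illustrative):
//
//     delegatesList([]string{"robot@example.iam.gserviceaccount.com"})
//     // => ["projects/-/serviceAccounts/robot@example.iam.gserviceaccount.com"]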
func delegatesList(emails []string) []string {
if len(emails) == 0 {
return nil
}
out := make([]string, len(emails))
for i, email := range emails {
out[i] = "projects/-/serviceAccounts/" + email
}
return out
}
// networkAddrsForLog returns a string with IPv4 addresses of local network
// interfaces, if possible.
func networkAddrsForLog() string {
addrs, err := net.InterfaceAddrs()
if err != nil {
return fmt.Sprintf("failed to enumerate network interfaces: %s", err)
}
var ips []string
for _, address := range addrs {
if ipnet, ok := address.(*net.IPNet); ok && !ipnet.IP.IsLoopback() {
if ipv4 := ipnet.IP.To4(); ipv4 != nil {
ips = append(ips, ipv4.String())
}
}
}
if len(ips) == 0 {
return "<no IPv4 interfaces>"
}
return strings.Join(ips, ", ")
}
// endUserIP extracts the end-user IP address from the X-Forwarded-For header.
func endUserIP(r auth.RequestMetadata) string {
// X-Forwarded-For header is set by Cloud Load Balancer and GCP Serverless
// load balancer and has format:
// [<untrusted part>,]<IP that connected to LB>,<unimportant>[,<more>].
//
// <untrusted part> may be present if the original request from the Internet
// comes with X-Forwarded-For header. We can't trust IPs specified there. We
// assume GCP load balancers sanitize the format of this field though.
//
// <IP that connected to LB> is what we are after.
//
// <unimportant> is "global forwarding rule external IP" for GKE or
// the constant "169.254.1.1" for GCP Serverless. We don't care about these.
//
// <more> is present only if we proxy the request through more layers of
// load balancers *while it is already inside GKE cluster*. We assume we don't
// do that (if we ever do, Options{...} should be extended with a setting that
// specifies how many layers of load balancers to skip to get to the original
// IP). On GCP Serverless <more> is always empty.
//
// See https://cloud.google.com/load-balancing/docs/https for more info.
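//
// For example (the addresses are illustrative), given
//
//     X-Forwarded-For: 203.0.113.9, 198.51.100.7, 169.254.1.1
//
// the returned end-user IP is "198.51.100.7" (the second-to-last entry, i.e.
// the IP that connected to the load balancer).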
forwardedFor := strings.Split(r.Header("X-Forwarded-For"), ",")
if len(forwardedFor) >= 2 {
return strings.TrimSpace(forwardedFor[len(forwardedFor)-2])
}
// Fallback to the peer IP if X-Forwarded-For is not set. Happens when
// connecting to the server's port directly from within the cluster.
ip, _, err := net.SplitHostPort(r.RemoteAddr())
if err != nil {
return "0.0.0.0"
}
return ip
}
// isHealthCheckerUA returns true for known user agents of health probers.
func isHealthCheckerUA(ua string) bool {
switch {
case strings.HasPrefix(ua, "kube-probe/"): // Kubernetes
return true
case strings.HasPrefix(ua, "GoogleHC"): // Cloud Load Balancer
return true
default:
return false
}
}
// resolveDependencies sorts modules based on their dependencies.
//
// Discovers unfulfilled required dependencies.
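//
// For example (module names are illustrative), if module "a" requires "b" and
// "b" requires "c", the returned order is [c, b, a]: dependencies come before
// their dependents.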
func resolveDependencies(mods []module.Module) ([]module.Module, error) {
// Build a map: module.Name => module.Module
modules := make(map[module.Name]module.Module, len(mods))
for _, m := range mods {
if _, ok := modules[m.Name()]; ok {
return nil, errors.Reason("duplicate module %q", m.Name()).Err()
}
modules[m.Name()] = m
}
// Ensure all required dependencies exist, throw away missing optional
// dependencies. The result is a directed graph that can be topo-sorted.
graph := map[module.Name][]module.Name{}
for _, m := range mods {
for _, d := range m.Dependencies() {
name := d.Dependency()
if _, exists := modules[name]; !exists {
if !d.Required() {
continue
}
return nil, errors.Reason("module %q requires module %q which is not provided", m.Name(), name).Err()
}
graph[m.Name()] = append(graph[m.Name()], name)
}
}
sorted := make([]module.Module, 0, len(graph))
visited := make(map[module.Name]bool, len(graph))
var visit func(n module.Name)
visit = func(n module.Name) {
if !visited[n] {
visited[n] = true
for _, dep := range graph[n] {
visit(dep)
}
sorted = append(sorted, modules[n])
}
}
for _, m := range mods {
visit(m.Name())
}
return sorted, nil
}