// Copyright 2020 The LUCI Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package tq
import (
"context"
"flag"
"strings"
"go.chromium.org/luci/common/errors"
luciflag "go.chromium.org/luci/common/flag"
"go.chromium.org/luci/common/logging"
"go.chromium.org/luci/server/auth"
"go.chromium.org/luci/server/module"
"go.chromium.org/luci/server/tq/internal/db"
"go.chromium.org/luci/server/tq/tqtesting"
)
// ModuleName can be used to refer to this module when declaring dependencies.
//
// It is the name registered for "go.chromium.org/luci/server/tq" and the value
// returned by the module's Name() method.
var ModuleName = module.RegisterName("go.chromium.org/luci/server/tq")
// ModuleOptions contain configuration of the TQ server module.
//
// It will be used to initialize the dispatcher: either the one set in the
// Dispatcher field or, if it is nil, the global Default instance.
type ModuleOptions struct {
	// Dispatcher is a dispatcher to use.
	//
	// Default is the global Default instance.
	Dispatcher *Dispatcher

	// CloudProject is an ID of a project to use to construct full queue names.
	//
	// Default is the project the server is running in.
	CloudProject string

	// CloudRegion is an ID of a region to use to construct full queue names.
	//
	// Default is the region the server is running in.
	CloudRegion string

	// Namespace is a namespace for tasks that use DeduplicationKey.
	//
	// This is needed if two otherwise independent deployments share a single
	// Cloud Tasks instance.
	//
	// Default is "".
	Namespace string

	// DefaultTargetHost is a hostname to dispatch Cloud Tasks to by default.
	//
	// Individual task classes may override it with their own specific host.
	//
	// On GAE defaults to the GAE application itself. Elsewhere has no default:
	// if the dispatcher can't figure out where to send the task, the task
	// submission fails.
	DefaultTargetHost string

	// PushAs is a service account email to be used for generating OIDC tokens.
	//
	// The service account must be within the same project. The server account
	// must have "iam.serviceAccounts.actAs" permission for `PushAs` account.
	//
	// Default is the server's own account.
	PushAs string

	// AuthorizedPushers is a list of service account emails to accept pushes from
	// in addition to PushAs.
	//
	// This is handy when migrating from one PushAs account to another, or when
	// submitting tasks from one service, but handling them in another.
	//
	// Optional.
	AuthorizedPushers []string

	// ServingPrefix is a URL path prefix to serve registered task handlers from.
	//
	// POSTs to a URL under this prefix (regardless which one) will be treated
	// as Cloud Tasks pushes.
	//
	// Must start with "/internal/". Default is "/internal/tasks". If set to
	// literal "-", no routes will be registered at all.
	ServingPrefix string

	// SweepMode defines how to perform sweeps of the transactional task
	// reminders.
	//
	// This process is necessary to make sure all transactionally submitted tasks
	// eventually execute, even if Cloud Tasks RPCs fail. When enqueueing a task
	// the client transactionally commits a special "reminder" record, which
	// indicates an intent to submit a Cloud Task. If the subsequent Cloud Tasks
	// RPC fails (or the process crashes before attempting it), the reminder
	// record is discovered by the sweep process and used to ensure the task is
	// eventually submitted.
	//
	// There are two stages: the sweep initiation and the actual processing.
	//
	// The initiation should happen periodically and centrally: no matter how
	// many replicas of the process are running, there needs to be only one sweep
	// initiator. But it doesn't have to be the same process each time. Also
	// multiple concurrent initiations are not catastrophic, though they impose
	// huge overhead and should be avoided.
	//
	// Two ways to do sweep initiations are:
	//   * Based on a periodic external signal such as a Cloud Scheduler job or
	//     GAE cron handler. See SweepInitiationEndpoint and
	//     SweepInitiationLaunchers.
	//   * Based on a timer inside some *single* primary process. For example
	//     on Kubernetes this may be a single pod Deployment, or a zero-indexed
	//     replica in a StatefulSet. See Sweep().
	//
	// Once the initiation happens, there are two ways to process the sweep (and
	// this is what SweepMode defines):
	//   * "inproc" - do all the processing right inside the replica that
	//     performed the initiation. This has scalability and reliability limits,
	//     but it doesn't require any additional infrastructure setup and has
	//     somewhat better observability.
	//   * "distributed" - use Cloud Tasks itself to distribute the work across
	//     many replicas. This requires some configuration. See SweepTaskQueue,
	//     SweepTaskPrefix and SweepTargetHost.
	//
	// Default is "distributed" mode.
	SweepMode string

	// SweepInitiationEndpoint is a URL path that can be hit to initiate a sweep.
	//
	// GET requests to this endpoint (if they have proper authentication headers)
	// will initiate sweeps. If SweepMode is "inproc" the sweep will happen in
	// the same process that handled the request.
	//
	// On GAE, and on any server that is not running in production mode, the
	// default is "/internal/tasks/c/sweep". On non-GAE production servers it is
	// "-", meaning the endpoint is not exposed. When not using the endpoint
	// there should be some single process somewhere that calls Sweep() to
	// periodically initiate sweeps.
	SweepInitiationEndpoint string

	// SweepInitiationLaunchers is a list of service account emails authorized to
	// launch sweeps via SweepInitiationEndpoint.
	//
	// Additionally on GAE the Appengine service itself is always authorized to
	// launch sweeps via cron or task queues.
	//
	// Default is the server's own account.
	SweepInitiationLaunchers []string

	// SweepTaskQueue is a Cloud Tasks queue name to use to distribute sweep
	// subtasks when running in "distributed" SweepMode.
	//
	// Can be in short or full form. See Queue in TaskClass for details. The queue
	// should be configured to allow at least 10 QPS.
	//
	// Default is "tq-sweep".
	SweepTaskQueue string

	// SweepTaskPrefix is a URL prefix to use for sweep subtasks when running
	// in "distributed" SweepMode.
	//
	// There should be a Dispatcher instance somewhere that is configured to
	// receive such tasks (via non-default ServingPrefix). This is useful if
	// you want to limit what processes process the sweeps.
	//
	// Must start with "/internal/". If unset defaults to the value of
	// ServingPrefix, or to "/internal/tasks" if ServingPrefix is "-".
	SweepTaskPrefix string

	// SweepTargetHost is a hostname to dispatch sweep subtasks to when running
	// in "distributed" SweepMode.
	//
	// This usually should be DefaultTargetHost, but it may be different if you
	// want to route sweep subtasks somewhere else.
	//
	// If unset defaults to the value of DefaultTargetHost.
	SweepTargetHost string

	// SweepShards defines how many subtasks are submitted when initiating
	// a sweep.
	//
	// It is safe to change it any time. Default is 16.
	SweepShards int
}
// Register exposes the module options as command line flags in `f`.
//
// Options that are still unset (ServingPrefix, SweepMode, SweepTaskQueue and
// SweepShards) are given their default values first, which mutates `o`. The
// current value of every option then becomes the default of its flag.
func (o *ModuleOptions) Register(f *flag.FlagSet) {
	// Populate defaults up front so they show up as the flags' default values.
	if o.ServingPrefix == "" {
		o.ServingPrefix = "/internal/tasks"
	}
	if o.SweepMode == "" {
		o.SweepMode = "distributed"
	}
	if o.SweepTaskQueue == "" {
		o.SweepTaskQueue = "tq-sweep"
	}
	if o.SweepShards == 0 {
		o.SweepShards = 16
	}

	// Plain string-valued flags, each bound directly to its option field.
	stringFlags := []struct {
		dest  *string
		name  string
		usage string
	}{
		{&o.CloudProject, "tq-cloud-project",
			`Cloud Project to use to construct full queue names, default is the same as -cloud-project.`},
		{&o.CloudRegion, "tq-cloud-region",
			`Cloud Region to use to construct full queue names, default is the same as -cloud-region.`},
		{&o.Namespace, "tq-namespace",
			`Namespace for tasks that use deduplication keys (optional).`},
		{&o.DefaultTargetHost, "tq-default-target-host",
			`Hostname to dispatch Cloud Tasks to by default.`},
		{&o.PushAs, "tq-push-as",
			`Service account email to be used for generating OIDC tokens. ` +
				`Default is server's own account.`},
		{&o.ServingPrefix, "tq-serving-prefix",
			`URL prefix to serve registered task handlers from, must start with '/internal/'. Set to '-' to disable serving.`},
		{&o.SweepMode, "tq-sweep-mode",
			`How to do sweeps of transactional task reminders: either "distributed" or "inproc".`},
		{&o.SweepInitiationEndpoint, "tq-sweep-initiation-endpoint",
			`URL path of an endpoint that launches sweeps.`},
		{&o.SweepTaskQueue, "tq-sweep-task-queue",
			`A queue name to use to distribute sweep subtasks`},
		{&o.SweepTaskPrefix, "tq-sweep-task-prefix",
			`URL prefix to use for sweep subtasks, must start with '/internal/'. Defaults to -tq-serving-prefix.`},
		{&o.SweepTargetHost, "tq-sweep-target-host",
			`Hostname to dispatch sweep subtasks to. Defaults to -tq-default-target-host.`},
	}
	for _, sf := range stringFlags {
		f.StringVar(sf.dest, sf.name, *sf.dest, sf.usage)
	}

	// Repeatable flags that accumulate into string slices.
	f.Var(luciflag.StringSlice(&o.AuthorizedPushers), "tq-authorized-pusher",
		`Service account email to accept pushes from (in addition to -tq-push-as). May be repeated.`)
	f.Var(luciflag.StringSlice(&o.SweepInitiationLaunchers), "tq-sweep-initiation-launcher",
		`Service account email allowed to hit -tq-sweep-initiation-endpoint. May be repeated.`)

	f.IntVar(&o.SweepShards, "tq-sweep-shards", o.SweepShards,
		`How many subtasks are submitted when initiating a sweep.`)
}
// NewModule returns a server module that sets up a TQ dispatcher.
//
// A nil `opts` is treated the same as a pointer to empty ModuleOptions.
func NewModule(opts *ModuleOptions) module.Module {
	mod := &tqModule{opts: opts}
	if mod.opts == nil {
		mod.opts = &ModuleOptions{}
	}
	return mod
}
// NewModuleFromFlags is a variant of NewModule that initializes options through
// command line flags.
//
// Calling this function registers flags in flag.CommandLine. They are usually
// parsed in server.Main(...).
func NewModuleFromFlags() module.Module {
	var opts ModuleOptions
	opts.Register(flag.CommandLine)
	return NewModule(&opts)
}
// tqModule implements module.Module.
//
// It holds the options it was constructed with and mutates them during
// Initialize to fill in defaults.
type tqModule struct {
	// opts is the module configuration; never nil when built via NewModule.
	opts *ModuleOptions
}
// Name is part of module.Module interface.
//
// It returns ModuleName.
func (*tqModule) Name() module.Name {
	return ModuleName
}
// Dependencies is part of module.Module interface.
//
// Every database implementation visited by db.VisitImpls contributes its
// module as a required dependency.
func (*tqModule) Dependencies() []module.Dependency {
	var required []module.Dependency
	db.VisitImpls(func(impl *db.Impl) {
		required = append(required, module.RequiredDependency(impl.Module))
	})
	return required
}
// Initialize is part of module.Module interface.
//
// It picks the dispatcher to configure (the global Default if none was given),
// fills in defaults for options left at their zero values, sets up task
// dispatching and sweeping, and returns a context that carries the submitter.
// Any setup error is returned as is and the returned context is nil.
func (m *tqModule) Initialize(ctx context.Context, host module.Host, opts module.HostOptions) (context.Context, error) {
	if m.opts.Dispatcher == nil {
		m.opts.Dispatcher = &Default
	}

	// These defaults are otherwise populated only by ModuleOptions.Register,
	// which is not called when options are handed directly to NewModule
	// (including NewModule(nil)). Without them an empty ServingPrefix or
	// SweepMode is rejected as invalid during the setup below.
	if m.opts.ServingPrefix == "" {
		m.opts.ServingPrefix = "/internal/tasks"
	}
	if m.opts.SweepMode == "" {
		m.opts.SweepMode = "distributed"
	}
	if m.opts.SweepTaskQueue == "" {
		m.opts.SweepTaskQueue = "tq-sweep"
	}
	if m.opts.SweepShards == 0 {
		m.opts.SweepShards = 16
	}

	submitter, err := m.initDispatching(ctx, host, opts)
	if err != nil {
		return nil, err
	}
	if err := m.initSweeping(ctx, host, opts); err != nil {
		return nil, err
	}
	return UseSubmitter(ctx, submitter), nil
}
// initDispatching configures the dispatcher from the module and host options,
// chooses a Submitter and installs the task handling routes.
//
// When opts.Prod is set, tasks are submitted through a Cloud submitter that is
// closed during server cleanup. Otherwise an in-memory tqtesting.Scheduler is
// started in the background and executes tasks through the server's own
// routes.
//
// Returns the Submitter to use, or an error if the namespace or serving prefix
// is invalid, the server's own account can't be determined, or the Cloud
// submitter can't be created.
func (m *tqModule) initDispatching(ctx context.Context, host module.Host, opts module.HostOptions) (Submitter, error) {
	disp := m.opts.Dispatcher
	disp.GAE = opts.GAE
	// NoAuth is set whenever the server is not running in production mode.
	disp.NoAuth = !opts.Prod
	disp.DefaultTargetHost = m.opts.DefaultTargetHost
	disp.AuthorizedPushers = m.opts.AuthorizedPushers

	// Module-level overrides win; otherwise inherit from the host options.
	disp.CloudProject = m.opts.CloudProject
	if disp.CloudProject == "" {
		disp.CloudProject = opts.CloudProject
	}
	disp.CloudRegion = m.opts.CloudRegion
	if disp.CloudRegion == "" {
		disp.CloudRegion = opts.CloudRegion
	}

	if err := ValidateNamespace(m.opts.Namespace); err != nil {
		return nil, errors.Annotate(err, "bad TQ namespace %q", m.opts.Namespace).Err()
	}
	disp.Namespace = m.opts.Namespace

	if m.opts.PushAs != "" {
		disp.PushAs = m.opts.PushAs
	} else {
		// Fall back to the server's own service account.
		info, err := auth.GetSigner(ctx).ServiceInfo(ctx)
		if err != nil {
			return nil, errors.Annotate(err, "failed to get own service account email").Err()
		}
		disp.PushAs = info.ServiceAccountName
	}

	var submitter Submitter
	if opts.Prod {
		// When running for real use real services.
		creds, err := auth.GetPerRPCCredentials(ctx, auth.AsSelf, auth.WithScopes(auth.CloudOAuthScopes...))
		if err != nil {
			return nil, errors.Annotate(err, "failed to get PerRPCCredentials").Err()
		}
		cloudSub, err := NewCloudSubmitter(ctx, creds)
		if err != nil {
			return nil, err
		}
		host.RegisterCleanup(func(ctx context.Context) { cloudSub.Close() })
		submitter = cloudSub
	} else {
		// When running locally use a simple in-memory scheduler, but go through
		// HTTP layer to pick up logging, middlewares, etc.
		scheduler := &tqtesting.Scheduler{
			Executor: &tqtesting.LoopbackHTTPExecutor{
				Handler: host.Routes(),
			},
		}
		host.RunInBackground("luci.tq", func(ctx context.Context) {
			scheduler.Run(ctx, tqtesting.ParallelExecute())
		})
		// disp.NoAuth is already true here (it was set from !opts.Prod above).
		// The global Default dispatcher is deliberately left alone: it may not
		// be the dispatcher this module was configured with.
		submitter = scheduler
	}

	if m.opts.ServingPrefix != "-" {
		// Validate before announcing, so a rejected prefix is never logged as
		// being served.
		if !strings.HasPrefix(m.opts.ServingPrefix, "/internal/") {
			return nil, errors.Reason(`-tq-serving-prefix must start with "/internal/", got %q`, m.opts.ServingPrefix).Err()
		}
		logging.Infof(ctx, "TQ is serving tasks from %q", m.opts.ServingPrefix)
		disp.InstallTasksRoutes(host.Routes(), m.opts.ServingPrefix)
	}

	return submitter, nil
}
// initSweeping fills in sweep-related defaults, installs the sweeper selected
// by SweepMode on the dispatcher and, unless disabled with "-", exposes the
// sweep initiation endpoint.
//
// Returns an error if the server's own account can't be determined, if the
// sweep task prefix doesn't start with "/internal/", or if SweepMode is
// neither "distributed" nor "inproc".
func (m *tqModule) initSweeping(ctx context.Context, host module.Host, opts module.HostOptions) error {
	// Scan parameters shared by both sweeper kinds.
	const (
		tasksPerScan        = 2048 // TODO: make configurable if necessary
		secondaryScanShards = 16   // TODO: make configurable if necessary
	)

	// Fill in defaults.
	if m.opts.SweepInitiationEndpoint == "" {
		// Exposed by default on GAE and outside of production mode; hidden on
		// other production servers.
		if opts.GAE || !opts.Prod {
			m.opts.SweepInitiationEndpoint = "/internal/tasks/c/sweep"
		} else {
			m.opts.SweepInitiationEndpoint = "-"
		}
	}
	if len(m.opts.SweepInitiationLaunchers) == 0 {
		info, err := auth.GetSigner(ctx).ServiceInfo(ctx)
		if err != nil {
			return errors.Annotate(err, "failed to get own service account email").Err()
		}
		m.opts.SweepInitiationLaunchers = []string{info.ServiceAccountName}
	}
	if m.opts.SweepTaskPrefix == "" {
		// "-" means task serving is disabled here, so it can't be reused as a
		// URL prefix for sweep subtasks.
		if m.opts.ServingPrefix != "-" {
			m.opts.SweepTaskPrefix = m.opts.ServingPrefix
		} else {
			m.opts.SweepTaskPrefix = "/internal/tasks"
		}
	}
	if !strings.HasPrefix(m.opts.SweepTaskPrefix, "/internal/") {
		return errors.Reason(`-tq-sweep-task-prefix must start with "/internal/", got %q`, m.opts.SweepTaskPrefix).Err()
	}
	if m.opts.SweepTargetHost == "" {
		m.opts.SweepTargetHost = m.opts.DefaultTargetHost // may be "" on GAE
	}

	disp := m.opts.Dispatcher

	// Setup the sweep processing.
	switch m.opts.SweepMode {
	case "distributed":
		logging.Infof(ctx, "TQ sweep task queue is %q", m.opts.SweepTaskQueue)
		disp.Sweeper = NewDistributedSweeper(disp, DistributedSweeperOptions{
			SweepShards:         m.opts.SweepShards,
			TasksPerScan:        tasksPerScan,
			SecondaryScanShards: secondaryScanShards,
			LessorID:            "", // TODO: make configurable if necessary
			TaskQueue:           m.opts.SweepTaskQueue,
			TaskPrefix:          m.opts.SweepTaskPrefix,
			TaskHost:            m.opts.SweepTargetHost,
		})
	case "inproc":
		logging.Infof(ctx, "TQ is using inproc sweeper")
		disp.Sweeper = NewInProcSweeper(InProcSweeperOptions{
			SweepShards:             m.opts.SweepShards,
			TasksPerScan:            tasksPerScan,
			SecondaryScanShards:     secondaryScanShards,
			SubmitBatchSize:         128, // TODO: make configurable if necessary
			SubmitConcurrentBatches: 32,  // TODO: make configurable if necessary
		})
	default:
		// The flag is registered as -tq-sweep-mode; name it exactly so the
		// operator can find it.
		return errors.Reason(`invalid -tq-sweep-mode %q, must be either "distributed" or "inproc"`, m.opts.SweepMode).Err()
	}

	// Setup the sweep initiation.
	if m.opts.SweepInitiationEndpoint != "-" {
		logging.Infof(ctx, "TQ sweep initiation endpoint is %q", m.opts.SweepInitiationEndpoint)
		disp.InstallSweepRoute(host.Routes(), m.opts.SweepInitiationEndpoint)
	}

	return nil
}