// Copyright 2020 The LUCI Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//      http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package tq

import (
	"context"
	"flag"
	"strings"

	"go.chromium.org/luci/common/errors"
	luciflag "go.chromium.org/luci/common/flag"
	"go.chromium.org/luci/common/logging"

	"go.chromium.org/luci/server/auth"
	"go.chromium.org/luci/server/module"

	"go.chromium.org/luci/server/tq/internal/db"
	"go.chromium.org/luci/server/tq/tqtesting"
)

// ModuleName can be used to refer to this module when declaring dependencies.
//
// It is registered with the server module registry at package initialization
// time and is what (*tqModule).Name returns.
var ModuleName = module.RegisterName("go.chromium.org/luci/server/tq")

// ModuleOptions contains configuration of the TQ server module.
//
// It is used to initialize the dispatcher: the global Default one, unless
// the Dispatcher field says otherwise. The module fills in some unset fields
// with defaults in place during initialization.
type ModuleOptions struct {
	// Dispatcher is a dispatcher to use.
	//
	// Default is the global Default instance.
	Dispatcher *Dispatcher

	// CloudProject is ID of a project to use to construct full queue names.
	//
	// Default is the project the server is running in.
	CloudProject string

	// CloudRegion is an ID of a region to use to construct full queue names.
	//
	// Default is the region the server is running in.
	CloudRegion string

	// Namespace is a namespace for tasks that use DeduplicationKey.
	//
	// This is needed if two otherwise independent deployments share a single
	// Cloud Tasks instance.
	//
	// Default is "".
	Namespace string

	// DefaultTargetHost is a hostname to dispatch Cloud Tasks to by default.
	//
	// Individual task classes may override it with their own specific host.
	//
	// On GAE defaults to the GAE application itself. Elsewhere has no default:
	// if the dispatcher can't figure out where to send the task, the task
	// submission fails.
	DefaultTargetHost string

	// PushAs is a service account email to be used for generating OIDC tokens.
	//
	// The service account must be within the same project. The server account
	// must have "iam.serviceAccounts.actAs" permission for `PushAs` account.
	//
	// Default is the server's own account.
	PushAs string

	// AuthorizedPushers is a list of service account emails to accept pushes from
	// in addition to PushAs.
	//
	// This is handy when migrating from one PushAs account to another, or when
	// submitting tasks from one service, but handling them in another.
	//
	// Optional.
	AuthorizedPushers []string

	// ServingPrefix is a URL path prefix to serve registered task handlers from.
	//
	// POSTs to a URL under this prefix (regardless of which one) will be treated
	// as Cloud Tasks pushes.
	//
	// Must start with "/internal/". Default is "/internal/tasks". If set to
	// literal "-", no routes will be registered at all.
	ServingPrefix string

	// SweepMode defines how to perform sweeps of the transactional task
	// reminders.
	//
	// This process is necessary to make sure all transactionally submitted tasks
	// eventually execute, even if Cloud Tasks RPCs fail. When enqueueing a task
	// the client transactionally commits a special "reminder" record, which
	// indicates an intent to submit a Cloud Task. If the subsequent Cloud Tasks
	// RPC fails (or the process crashes before attempting it), the reminder
	// record is discovered by the sweep process and used to ensure the task is
	// eventually submitted.
	//
	// There are two stages: the sweep initiation and the actual processing.
	//
	// The initiation should happen periodically and centrally: no matter how many
	// replicas of the process are running, there needs to be only one sweep
	// initiator. But it doesn't have to be the same process each time. Also
	// multiple concurrent initiations are not catastrophic, though they impose
	// huge overhead and should be avoided.
	//
	// Two ways to do sweep initiations are:
	//   * Based on a periodic external signal such as a Cloud Scheduler job or
	//     GAE cron handler. See SweepInitiationEndpoint and
	//     SweepInitiationLaunchers.
	//   * Based on a timer inside some *single* primary process. For example
	//     on Kubernetes this may be a single pod Deployment, or a zero-indexed
	//     replica in a StatefulSet. See Sweep().
	//
	// Once the initiation happens, there are two ways to process the sweep (and
	// this is what SweepMode defines):
	//   * "inproc" - do all the processing right inside the replica that
	//     performed the initiation. This has scalability and reliability limits,
	//     but it doesn't require any additional infrastructure setup and has
	//     somewhat better observability.
	//   * "distributed" - use Cloud Tasks itself to distribute the work across
	//     many replicas. This requires some configuration. See SweepTaskQueue,
	//     SweepTaskPrefix and SweepTargetHost.
	//
	// Default is "distributed" mode.
	SweepMode string

	// SweepInitiationEndpoint is a URL path that can be hit to initiate a sweep.
	//
	// GET requests to this endpoint (if they have proper authentication headers)
	// will initiate sweeps. If SweepMode is "inproc" the sweep will happen in
	// the same process that handled the request.
	//
	// On GAE, and when running locally (i.e. not in production mode), the
	// default is "/internal/tasks/c/sweep". Otherwise it is "-", meaning the
	// endpoint is not exposed. When not using the endpoint there should be
	// some single process somewhere that calls Sweep() to periodically initiate
	// sweeps.
	SweepInitiationEndpoint string

	// SweepInitiationLaunchers is a list of service account emails authorized to
	// launch sweeps via SweepInitiationEndpoint.
	//
	// Additionally on GAE the App Engine service itself is always authorized to
	// launch sweeps via cron or task queues.
	//
	// Default is the server's own account.
	SweepInitiationLaunchers []string

	// SweepTaskQueue is a Cloud Tasks queue name to use to distribute sweep
	// subtasks when running in "distributed" SweepMode.
	//
	// Can be in short or full form. See Queue in TaskClass for details. The queue
	// should be configured to allow at least 10 QPS.
	//
	// Default is "tq-sweep".
	SweepTaskQueue string

	// SweepTaskPrefix is a URL prefix to use for sweep subtasks when running
	// in "distributed" SweepMode.
	//
	// There should be a Dispatcher instance somewhere that is configured to
	// receive such tasks (via non-default ServingPrefix). This is useful if
	// you want to limit which processes handle the sweeps.
	//
	// Must start with "/internal/". If unset defaults to the value of
	// ServingPrefix (or "/internal/tasks" if ServingPrefix is "-").
	SweepTaskPrefix string

	// SweepTargetHost is a hostname to dispatch sweep subtasks to when running
	// in "distributed" SweepMode.
	//
	// This usually should be DefaultTargetHost, but it may be different if you
	// want to route sweep subtasks somewhere else.
	//
	// If unset defaults to the value of DefaultTargetHost.
	SweepTargetHost string

	// SweepShards defines how many subtasks are submitted when initiating
	// a sweep.
	//
	// It is safe to change it any time. Default is 16.
	SweepShards int
}

// Register registers the command line flags.
//
// Mutates `o` by populating defaults: every option that has a default and is
// still unset gets it before its flag is declared, so the default shows up in
// the flag's help text and survives if the flag is never passed.
func (o *ModuleOptions) Register(f *flag.FlagSet) {
	// Defaults first; each flag below captures the field's current value.
	if o.ServingPrefix == "" {
		o.ServingPrefix = "/internal/tasks"
	}
	if o.SweepMode == "" {
		o.SweepMode = "distributed"
	}
	if o.SweepTaskQueue == "" {
		o.SweepTaskQueue = "tq-sweep"
	}
	if o.SweepShards == 0 {
		o.SweepShards = 16
	}

	// Queue naming, routing and authentication.
	f.StringVar(&o.CloudProject, "tq-cloud-project", o.CloudProject,
		`Cloud Project to use to construct full queue names, default is the same as -cloud-project.`)
	f.StringVar(&o.CloudRegion, "tq-cloud-region", o.CloudRegion,
		`Cloud Region to use to construct full queue names, default is the same as -cloud-region.`)
	f.StringVar(&o.Namespace, "tq-namespace", o.Namespace,
		`Namespace for tasks that use deduplication keys (optional).`)
	f.StringVar(&o.DefaultTargetHost, "tq-default-target-host", o.DefaultTargetHost,
		`Hostname to dispatch Cloud Tasks to by default.`)
	f.StringVar(&o.PushAs, "tq-push-as", o.PushAs,
		`Service account email to be used for generating OIDC tokens. `+
			`Default is server's own account.`)
	f.Var(luciflag.StringSlice(&o.AuthorizedPushers), "tq-authorized-pusher",
		`Service account email to accept pushes from (in addition to -tq-push-as). May be repeated.`)
	f.StringVar(&o.ServingPrefix, "tq-serving-prefix", o.ServingPrefix,
		`URL prefix to serve registered task handlers from, must start with '/internal/'. Set to '-' to disable serving.`)

	// Sweeping of transactional task reminders.
	f.StringVar(&o.SweepMode, "tq-sweep-mode", o.SweepMode,
		`How to do sweeps of transactional task reminders: either "distributed" or "inproc".`)
	f.StringVar(&o.SweepInitiationEndpoint, "tq-sweep-initiation-endpoint", o.SweepInitiationEndpoint,
		`URL path of an endpoint that launches sweeps.`)
	f.Var(luciflag.StringSlice(&o.SweepInitiationLaunchers), "tq-sweep-initiation-launcher",
		`Service account email allowed to hit -tq-sweep-initiation-endpoint. May be repeated.`)
	f.StringVar(&o.SweepTaskQueue, "tq-sweep-task-queue", o.SweepTaskQueue,
		`A queue name to use to distribute sweep subtasks`)
	f.StringVar(&o.SweepTaskPrefix, "tq-sweep-task-prefix", o.SweepTaskPrefix,
		`URL prefix to use for sweep subtasks, must start with '/internal/'. Defaults to -tq-serving-prefix.`)
	f.StringVar(&o.SweepTargetHost, "tq-sweep-target-host", o.SweepTargetHost,
		`Hostname to dispatch sweep subtasks to. Defaults to -tq-default-target-host.`)
	f.IntVar(&o.SweepShards, "tq-sweep-shards", o.SweepShards,
		`How many subtasks are submitted when initiating a sweep.`)
}

// NewModule returns a server module that sets up a TQ dispatcher.
//
// A nil opts is treated as an empty ModuleOptions. The module keeps the given
// pointer and fills in defaults in it during initialization.
func NewModule(opts *ModuleOptions) module.Module {
	mod := &tqModule{opts: opts}
	if mod.opts == nil {
		mod.opts = &ModuleOptions{}
	}
	return mod
}

// NewModuleFromFlags is a variant of NewModule that initializes options through
// command line flags.
//
// Calling this function registers flags in flag.CommandLine (and populates
// their defaults). They are usually parsed in server.Main(...).
func NewModuleFromFlags() module.Module {
	var opts ModuleOptions
	opts.Register(flag.CommandLine)
	return NewModule(&opts)
}

// tqModule implements module.Module.
//
// It wires a TQ Dispatcher configured by ModuleOptions into the server; see
// Initialize.
type tqModule struct {
	// opts is the module configuration. NewModule never stores nil here, and
	// initialization fills in defaults in this struct in place.
	opts *ModuleOptions
}

// Compile-time check that tqModule satisfies module.Module.
var _ module.Module = (*tqModule)(nil)

// Name is part of module.Module interface.
//
// It reports ModuleName, the name this package registered for the module.
func (*tqModule) Name() module.Name {
	return ModuleName
}

// Dependencies is part of module.Module interface.
//
// For every database implementation visited by db.VisitImpls that carries a
// valid server module reference, it declares a required dependency on that
// module.
func (*tqModule) Dependencies() []module.Dependency {
	var required []module.Dependency
	db.VisitImpls(func(impl *db.Impl) {
		if !impl.Module.Valid() {
			return // this implementation doesn't come with a server module
		}
		required = append(required, module.RequiredDependency(impl.Module))
	})
	return required
}

// Initialize is part of module.Module interface.
//
// It selects the dispatcher (the global Default unless ModuleOptions specify
// one), configures task dispatching and then sweeping, and returns a context
// carrying the resulting Submitter (via UseSubmitter).
func (m *tqModule) Initialize(ctx context.Context, host module.Host, opts module.HostOptions) (context.Context, error) {
	if m.opts.Dispatcher == nil {
		m.opts.Dispatcher = &Default
	}

	sub, err := m.initDispatching(ctx, host, opts)
	if err == nil {
		// Sweeping is configured only once dispatching succeeded.
		err = m.initSweeping(ctx, host, opts)
	}
	if err != nil {
		return nil, err
	}
	return UseSubmitter(ctx, sub), nil
}

// initDispatching configures the dispatcher for submitting and serving tasks.
//
// It copies the module options into the dispatcher, falling back to the host
// options for the Cloud project and region and to the server's own service
// account for PushAs. It also picks a Submitter for the environment:
//   - in production, a Cloud Tasks submitter that is closed on server cleanup;
//   - otherwise, an in-memory tqtesting.Scheduler running in the background
//     that delivers tasks back to this server over HTTP.
//
// Unless ServingPrefix is "-", task push handlers are installed under it.
//
// All option validation happens before the dispatcher is touched or any
// clients are created, so a misconfiguration fails fast.
func (m *tqModule) initDispatching(ctx context.Context, host module.Host, opts module.HostOptions) (Submitter, error) {
	// Register populates this default for flag-based setups; apply it here too
	// so options passed directly to NewModule (including nil) work.
	if m.opts.ServingPrefix == "" {
		m.opts.ServingPrefix = "/internal/tasks"
	}
	if m.opts.ServingPrefix != "-" && !strings.HasPrefix(m.opts.ServingPrefix, "/internal/") {
		return nil, errors.Reason(`-tq-serving-prefix must start with "/internal/", got %q`, m.opts.ServingPrefix).Err()
	}
	if err := ValidateNamespace(m.opts.Namespace); err != nil {
		return nil, errors.Annotate(err, "bad TQ namespace %q", m.opts.Namespace).Err()
	}

	disp := m.opts.Dispatcher

	disp.GAE = opts.GAE
	disp.DisableAuth = !opts.Prod
	disp.DefaultTargetHost = m.opts.DefaultTargetHost
	disp.AuthorizedPushers = m.opts.AuthorizedPushers
	disp.Namespace = m.opts.Namespace

	disp.CloudProject = m.opts.CloudProject
	if disp.CloudProject == "" {
		disp.CloudProject = opts.CloudProject
	}
	disp.CloudRegion = m.opts.CloudRegion
	if disp.CloudRegion == "" {
		disp.CloudRegion = opts.CloudRegion
	}

	if m.opts.PushAs != "" {
		disp.PushAs = m.opts.PushAs
	} else {
		info, err := auth.GetSigner(ctx).ServiceInfo(ctx)
		if err != nil {
			return nil, errors.Annotate(err, "failed to get own service account email").Err()
		}
		disp.PushAs = info.ServiceAccountName
	}

	var submitter Submitter
	if opts.Prod {
		// When running for real use real services.
		creds, err := auth.GetPerRPCCredentials(ctx, auth.AsSelf, auth.WithScopes(auth.CloudOAuthScopes...))
		if err != nil {
			return nil, errors.Annotate(err, "failed to get PerRPCCredentials").Err()
		}
		cloudSub, err := NewCloudSubmitter(ctx, creds)
		if err != nil {
			return nil, err
		}
		host.RegisterCleanup(func(ctx context.Context) { cloudSub.Close() })
		submitter = cloudSub
	} else {
		// When running locally use a simple in-memory scheduler, but go through
		// HTTP layer to pick up logging, middlewares, etc.
		scheduler := &tqtesting.Scheduler{
			Executor: &tqtesting.LoopbackHTTPExecutor{
				ServerAddr: host.HTTPAddr(),
			},
		}
		host.RunInBackground("luci.tq", func(ctx context.Context) {
			scheduler.Run(ctx, tqtesting.ParallelExecute())
		})
		submitter = scheduler
	}

	if m.opts.ServingPrefix != "-" {
		logging.Infof(ctx, "TQ is serving tasks from %q", m.opts.ServingPrefix)
		disp.InstallTasksRoutes(host.Routes(), m.opts.ServingPrefix)
	}

	return submitter, nil
}

// initSweeping configures processing and initiation of reminder sweeps.
//
// It fills in unset sweep options with their defaults (in m.opts), validates
// them, and installs a Sweeper on the dispatcher according to SweepMode:
// "distributed" (subtasks go through SweepTaskQueue) or "inproc". If the
// resolved SweepInitiationEndpoint is not "-", it also installs the sweep
// initiation route there.
func (m *tqModule) initSweeping(ctx context.Context, host module.Host, opts module.HostOptions) error {
	// Fill in defaults. Register populates the first three for flag-based
	// setups; apply them here too so options passed directly to NewModule work.
	if m.opts.SweepMode == "" {
		m.opts.SweepMode = "distributed"
	}
	if m.opts.SweepTaskQueue == "" {
		m.opts.SweepTaskQueue = "tq-sweep"
	}
	switch {
	case m.opts.SweepShards == 0:
		m.opts.SweepShards = 16
	case m.opts.SweepShards < 0:
		return errors.Reason("-tq-sweep-shards must be positive, got %d", m.opts.SweepShards).Err()
	}

	if m.opts.SweepInitiationEndpoint == "" {
		// Exposed by default on GAE (cron) and in local dev mode; elsewhere some
		// process is expected to call Sweep() itself.
		if opts.GAE || !opts.Prod {
			m.opts.SweepInitiationEndpoint = "/internal/tasks/c/sweep"
		} else {
			m.opts.SweepInitiationEndpoint = "-"
		}
	}

	// NOTE(review): the resolved launchers are stored only in m.opts; nothing in
	// this file hands them to the dispatcher. Confirm InstallSweepRoute enforces
	// them (e.g. via a Dispatcher field), otherwise this option has no effect.
	if len(m.opts.SweepInitiationLaunchers) == 0 {
		info, err := auth.GetSigner(ctx).ServiceInfo(ctx)
		if err != nil {
			return errors.Annotate(err, "failed to get own service account email").Err()
		}
		m.opts.SweepInitiationLaunchers = []string{info.ServiceAccountName}
	}

	if m.opts.SweepTaskPrefix == "" {
		// An empty ServingPrefix means "not configured", same as "-" here.
		if m.opts.ServingPrefix != "" && m.opts.ServingPrefix != "-" {
			m.opts.SweepTaskPrefix = m.opts.ServingPrefix
		} else {
			m.opts.SweepTaskPrefix = "/internal/tasks"
		}
	}
	if !strings.HasPrefix(m.opts.SweepTaskPrefix, "/internal/") {
		return errors.Reason(`-tq-sweep-task-prefix must start with "/internal/", got %q`, m.opts.SweepTaskPrefix).Err()
	}

	if m.opts.SweepTargetHost == "" {
		m.opts.SweepTargetHost = m.opts.DefaultTargetHost // may be "" on GAE
	}

	disp := m.opts.Dispatcher

	// Setup the sweep processing.
	switch m.opts.SweepMode {
	case "distributed":
		logging.Infof(ctx, "TQ sweep task queue is %q", m.opts.SweepTaskQueue)
		disp.Sweeper = NewDistributedSweeper(disp, DistributedSweeperOptions{
			SweepShards:         m.opts.SweepShards,
			TasksPerScan:        2048, // TODO: make configurable if necessary
			SecondaryScanShards: 16,   // TODO: make configurable if necessary
			LessorID:            "",   // TODO: make configurable if necessary
			TaskQueue:           m.opts.SweepTaskQueue,
			TaskPrefix:          m.opts.SweepTaskPrefix,
			TaskHost:            m.opts.SweepTargetHost,
		})
	case "inproc":
		logging.Infof(ctx, "TQ is using inproc sweeper")
		disp.Sweeper = NewInProcSweeper(InProcSweeperOptions{
			SweepShards:             m.opts.SweepShards,
			TasksPerScan:            2048, // TODO: make configurable if necessary
			SecondaryScanShards:     16,   // TODO: make configurable if necessary
			SubmitBatchSize:         128,  // TODO: make configurable if necessary
			SubmitConcurrentBatches: 32,   // TODO: make configurable if necessary
		})
	default:
		return errors.Reason(`invalid -tq-sweep-mode %q, must be either "distributed" or "inproc"`, m.opts.SweepMode).Err()
	}

	// Setup the sweep initiation.
	if m.opts.SweepInitiationEndpoint != "-" {
		logging.Infof(ctx, "TQ sweep initiation endpoint is %q", m.opts.SweepInitiationEndpoint)
		disp.InstallSweepRoute(host.Routes(), m.opts.SweepInitiationEndpoint)
	}

	return nil
}
