blob: e543fc129e4200e5011ab05472c7fe04b8f5324a [file] [log] [blame]
// Copyright 2020 The LUCI Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
// Executable tq-sweeper-spanner runs an inproc sweeping driver that scans
// Spanner database for TQ transactional tasks reminders.
//
// It should run as a single replica per Spanner DB. It is OK if it temporarily
// goes offline or runs on multiple replicas: this will just delay the task
// reminders sweeping.
package main
import (
"context"
"flag"
"time"
"go.chromium.org/luci/common/clock"
"go.chromium.org/luci/common/logging"
"go.chromium.org/luci/server"
"go.chromium.org/luci/server/module"
"go.chromium.org/luci/server/span"
"go.chromium.org/luci/server/tq"
// Sweep Spanner DB.
_ "go.chromium.org/luci/server/tq/txn/spanner"
)
func main() {
sweepInterval := flag.Duration("tq-sweep-interval", time.Minute, "How often to run full sweeps")
opts := &tq.ModuleOptions{
SweepMode: "inproc",
ServingPrefix: "-", // not serving any tasks
SweepInitiationEndpoint: "-", // sweeps are launched by the timer only
}
opts.Register(flag.CommandLine)
modules := []module.Module{
span.NewModuleFromFlags(),
tq.NewModule(opts),
}
server.Main(nil, modules, func(srv *server.Server) error {
if *sweepInterval < 0 {
*sweepInterval = 0
}
srv.RunInBackground("", func(ctx context.Context) {
count := 0
for ctx.Err() == nil {
count++
start := clock.Now(ctx)
logging.Infof(ctx, "--------------- Starting the sweep #%d ------------------", count)
tq.Default.Sweep(ctx)
logging.Infof(ctx, "--------------- Sweep #%d done in %s ---------------", count, clock.Now(ctx).Sub(start))
if sleep := start.Add(*sweepInterval).Sub(clock.Now(ctx)); sleep > 0 && ctx.Err() == nil {
logging.Infof(ctx, "Waiting %s until the next sweep", sleep)
clock.Sleep(ctx, sleep)
}
}
})
return nil
})
}