blob: 0ef4e7033d500c8c6e8be3421402129ea9189cba [file] [log] [blame]
// Copyright 2015 The LUCI Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package tumble
import (
"context"
"fmt"
"net/http"
"net/url"
"strconv"
"strings"
"time"
"go.chromium.org/luci/appengine/gaemiddleware"
"go.chromium.org/luci/common/clock"
"go.chromium.org/luci/common/errors"
"go.chromium.org/luci/common/logging"
"go.chromium.org/luci/common/retry/transient"
"go.chromium.org/luci/common/sync/parallel"
"go.chromium.org/luci/server/router"
ds "go.chromium.org/luci/gae/service/datastore"
"go.chromium.org/luci/gae/service/info"
)
const (
baseURL = "/internal/" + baseName
fireAllTasksURL = baseURL + "/fire_all_tasks"
processShardPattern = baseURL + "/process_shard/:shard_id/at/:timestamp"
transientHTTPHeader = "X-LUCI-Tumble-Transient"
)
// Service is an instance of a Tumble service. It installs its handlers into an
// HTTP router and services Tumble request tasks.
type Service struct {
// Namespaces is a function that returns the datastore namespaces that Tumble
// will poll.
//
// If nil, Tumble will be executed against all namespaces registered in the
// datastore.
Namespaces func(context.Context) ([]string, error)
}
// InstallHandlers installs http handlers.
//
// 'base' is usually gaemiddleware.BaseProd(), but can also be its derivative
// if something else it needed in the context.
func (s *Service) InstallHandlers(r *router.Router, base router.MiddlewareChain) {
// GET so that this can be invoked from cron
r.GET(fireAllTasksURL, base.Extend(gaemiddleware.RequireCron), s.FireAllTasksHandler)
r.POST(processShardPattern, base.Extend(gaemiddleware.RequireTaskQueue(baseName)),
func(ctx *router.Context) {
loop := ctx.Request.URL.Query().Get("single") == ""
s.ProcessShardHandler(ctx, loop)
})
}
// FireAllTasksHandler is an HTTP handler that expects `logging` and `luci/gae`
// services to be installed into the context.
//
// FireAllTasksHandler verifies that it was called within an Appengine Cron
// request, and then invokes the FireAllTasks function.
func (s *Service) FireAllTasksHandler(c *router.Context) {
if err := s.FireAllTasks(c.Context); err != nil {
c.Writer.WriteHeader(http.StatusInternalServerError)
fmt.Fprintf(c.Writer, "fire_all_tasks failed: %s", err)
} else {
c.Writer.Write([]byte("ok"))
}
}
// FireAllTasks searches for work in all namespaces, and fires off a process
// task for any shards it finds that have at least one Mutation present to
// ensure that no work languishes forever. This may not be needed in
// a constantly-loaded system with good tumble key distribution.
func (s *Service) FireAllTasks(c context.Context) error {
cfg := getConfig(c)
namespaces, err := s.getNamespaces(c, cfg)
if err != nil {
return err
}
// Probe each namespace in parallel. Each probe function reports its own
// errors, so the work pool will never return any non-nil error response.
var errCount, taskCount counter
_ = parallel.WorkPool(cfg.NumGoroutines, func(ch chan<- func() error) {
for _, ns := range namespaces {
ns := ns
ch <- func() error {
// Generate a list of all shards.
allShards := make([]taskShard, 0, cfg.TotalShardCount(ns))
for i := uint64(0); i < cfg.TotalShardCount(ns); i++ {
allShards = append(allShards, taskShard{i, minTS})
}
s.fireAllTasksForNamespace(c, cfg, ns, allShards, &errCount, &taskCount)
return nil
}
}
})
if errCount > 0 {
logging.Errorf(c, "Encountered %d error(s).", errCount)
return errors.New("errors were encountered while probing for tasks")
}
logging.Debugf(c, "Successfully probed %d namespace(s) and fired %d tasks(s).",
len(namespaces), taskCount)
return err
}
func (s *Service) fireAllTasksForNamespace(c context.Context, cfg *Config, ns string, allShards []taskShard,
errCount, taskCount *counter) {
// Enter the supplied namespace.
logging.Infof(c, "Firing all tasks for namespace %q", ns)
c = info.MustNamespace(c, ns)
if ns != "" {
c = logging.SetField(c, "namespace", ns)
}
// First, check if the namespace has *any* Mutations.
q := ds.NewQuery("tumble.Mutation").KeysOnly(true).Limit(1)
switch amt, err := ds.Count(c, q); {
case err != nil:
logging.WithError(err).Errorf(c, "Error querying for Mutations")
errCount.inc()
return
case amt == 0:
logging.Infof(c, "No Mutations registered for this namespace.")
return
}
// We have at least one Mutation for this namespace. Iterate through all
// shards and dispatch a processing task for each one that has Mutations.
//
// Track shards that we find work for. After scanning is complete, fire off
// tasks for all identified shards.
triggerShards := make(map[taskShard]struct{}, len(allShards))
for _, shrd := range allShards {
amt, err := ds.Count(c, processShardQuery(c, cfg, shrd.shard).Limit(1))
if err != nil {
logging.Fields{
logging.ErrorKey: err,
"shard": shrd.shard,
}.Errorf(c, "Error querying for shards")
errCount.inc()
break
}
if amt > 0 {
logging.Infof(c, "Found work in shard [%d]", shrd.shard)
triggerShards[shrd] = struct{}{}
}
}
// Fire tasks for shards with identified work.
if len(triggerShards) > 0 {
logging.Infof(c, "Firing tasks for %d tasked shard(s).", len(triggerShards))
if !fireTasks(c, cfg, triggerShards, false) {
logging.Errorf(c, "Failed to fire tasks.")
errCount.inc()
} else {
taskCount.add(len(triggerShards))
}
} else {
logging.Infof(c, "No tasked shards were found.")
}
}
func (s *Service) getNamespaces(c context.Context, cfg *Config) ([]string, error) {
// Get the set of namespaces to handle.
nsFn := s.Namespaces
if nsFn == nil {
nsFn = getDatastoreNamespaces
}
namespaces, err := nsFn(c)
if err != nil {
logging.WithError(err).Errorf(c, "Failed to enumerate namespaces.")
return nil, err
}
return namespaces, nil
}
// ProcessShardHandler is an HTTP handler that expects `logging` and `luci/gae`
// services to be installed into the context.
//
// ProcessShardHandler verifies that its being run as a taskqueue task and that
// the following parameters exist and are well-formed:
// * timestamp: decimal-encoded UNIX/UTC timestamp in seconds.
// * shard_id: decimal-encoded shard identifier.
//
// ProcessShardHandler then invokes ProcessShard with the parsed parameters. It
// runs in the namespace of the task which scheduled it and processes mutations
// for that namespace.
func (s *Service) ProcessShardHandler(ctx *router.Context, loop bool) {
c, rw, p := ctx.Context, ctx.Writer, ctx.Params
tstampStr := p.ByName("timestamp")
sidStr := p.ByName("shard_id")
tstamp, err := strconv.ParseInt(tstampStr, 10, 64)
if err != nil {
logging.Errorf(c, "bad timestamp %q", tstampStr)
rw.WriteHeader(http.StatusNotFound)
fmt.Fprintf(rw, "bad timestamp")
return
}
sid, err := strconv.ParseUint(sidStr, 10, 64)
if err != nil {
logging.Errorf(c, "bad shardID %q", tstampStr)
rw.WriteHeader(http.StatusNotFound)
fmt.Fprintf(rw, "bad shardID")
return
}
cfg := getConfig(c)
logging.Infof(c, "Processing tasks in namespace %q", info.GetNamespace(c))
// AppEngine backend instances run for 10 minute at most,
// set the overall context deadline to 9 minutes.
c, cancel := clock.WithDeadline(c, clock.Now(c).Add(9*time.Minute))
defer cancel()
err = processShard(c, cfg, time.Unix(tstamp, 0).UTC(), sid, loop)
if err != nil {
logging.Errorf(c, "failure! %s", err)
if transient.Tag.In(err) {
rw.Header().Add(transientHTTPHeader, "true")
}
rw.WriteHeader(http.StatusInternalServerError)
fmt.Fprintf(rw, "error: %s", err)
} else {
rw.Write([]byte("ok"))
}
}
// getDatastoreNamespaces returns a list of all of the namespaces in the
// datastore.
//
// This is done by issuing a datastore query for kind "__namespace__". The
// resulting keys will have IDs for the namespaces, namely:
// - The default namespace will have integer ID 1.
// - Other namespaces will have string IDs.
func getDatastoreNamespaces(c context.Context) ([]string, error) {
q := ds.NewQuery("__namespace__").KeysOnly(true)
// Query our datastore for the full set of namespaces.
var namespaceKeys []*ds.Key
if err := ds.GetAll(c, q, &namespaceKeys); err != nil {
logging.WithError(err).Errorf(c, "Failed to execute namespace query.")
return nil, err
}
namespaces := make([]string, 0, len(namespaceKeys))
for _, nk := range namespaceKeys {
// Add our namespace ID. For the default namespace, the key will have an
// integer ID of 1, so StringID will correctly be an empty string.
namespaces = append(namespaces, nk.StringID())
}
return namespaces, nil
}
// processURL creates a new url for a process shard taskqueue task, including
// the given timestamp and shard number.
func processURL(ts timestamp, shard uint64, ns string, loop bool) string {
v := strings.NewReplacer(
":shard_id", fmt.Sprint(shard),
":timestamp", strconv.FormatInt(int64(ts), 10),
).Replace(processShardPattern)
// Append our namespace query parameter. This is cosmetic, and the default
// namespace will have this query parameter omitted.
query := url.Values{}
if ns != "" {
query.Set("ns", ns)
}
if !loop {
query.Set("single", "1")
}
if len(query) > 0 {
v += "?" + query.Encode()
}
return v
}