blob: ea791bf8e4cd42f2c04b75038593339dcf19ac43 [file] [log] [blame]
// Copyright 2016 The LUCI Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package tsmon
import (
"net/http"
"golang.org/x/net/context"
ds "github.com/luci/gae/service/datastore"
"github.com/luci/gae/service/info"
"github.com/luci/luci-go/common/clock"
"github.com/luci/luci-go/common/logging"
"github.com/luci/luci-go/common/tsmon"
"github.com/luci/luci-go/server/router"
)
// InstallHandlers registers the tsmon HTTP routes on r.
//
// Currently this is only the housekeeping cron endpoint, served by
// housekeepingHandler behind the caller-supplied middleware chain base.
func InstallHandlers(r *router.Router, base router.MiddlewareChain) {
	const housekeepingPath = "/internal/cron/ts_mon/housekeeping"
	r.GET(housekeepingPath, base, housekeepingHandler)
}
// housekeepingHandler is an HTTP handler that should be run every minute by
// cron on App Engine. It assigns task numbers to datastore entries, and runs
// any global metric callbacks.
//
// Outside the dev appserver, requests lacking the "X-Appengine-Cron: true"
// header are rejected with 403 Forbidden. If task number assignment fails the
// response status is 500, but the global callbacks are still run.
func housekeepingHandler(c *router.Context) {
	if !info.IsDevAppServer(c.Context) && c.Request.Header.Get("X-Appengine-Cron") != "true" {
		// http.Error sets the status code itself. Calling WriteHeader first would
		// make its header write superfluous and drop the Content-Type it sets.
		http.Error(c.Writer, "request not made from cron", http.StatusForbidden)
		return
	}
	if err := assignTaskNumbers(c.Context); err != nil {
		// assignTaskNumbers has already logged the failure; only report it in the
		// status and fall through so metric callbacks still run this minute.
		c.Writer.WriteHeader(http.StatusInternalServerError)
	}
	tsmon.GetState(c.Context).RunGlobalCallbacks(c.Context)
}
// assignTaskNumbers does some housekeeping on the datastore entries for App
// Engine instances. It deletes "Instance" entities whose LastUpdated is older
// than instanceExpirationTimeout, and assigns a task number to every remaining
// entity that does not have one yet (TaskNum < 0).
//
// New task numbers are the smallest non-negative integers not already held by
// any Instance entity seen by the query. Numbers held by instances that are
// expired during this run are still counted as used, so they only become
// available on a later run.
//
// All datastore access happens in instanceNamespace. Errors from the query,
// the deletions, or the final Put are logged and returned.
func assignTaskNumbers(c context.Context) error {
	c = info.MustNamespace(c, instanceNamespace)

	now := clock.Now(c)
	expiredTime := now.Add(-instanceExpirationTimeout)

	usedTaskNums := map[int]struct{}{}
	totalExpired := 0
	expiredKeys := make([]*ds.Key, 0, ds.Raw(c).Constraints().QueryBatchSize)
	var unassigned []*instance

	// expireInstanceBatch processes the set of instances in "expiredKeys",
	// deletes them, and clears the list for the next iteration.
	//
	// We do this in batches to handle large numbers without inflating memory
	// requirements. If there are any timeouts or problems, this will also enable
	// us to iteratively chip away at the problem.
	expireInstanceBatch := func(c context.Context) error {
		if len(expiredKeys) == 0 {
			return nil
		}

		logging.Debugf(c, "Expiring %d instance(s)", len(expiredKeys))
		if err := ds.Delete(c, expiredKeys); err != nil {
			logging.WithError(err).Errorf(c, "Failed to expire instances.")
			return err
		}

		// Clear the instances list for next round, keeping its capacity.
		totalExpired += len(expiredKeys)
		expiredKeys = expiredKeys[:0]
		return nil
	}

	// Query all instances from datastore. expireInstanceBatch is installed as
	// the Batcher callback so expired keys are deleted between query batches.
	q := ds.NewQuery("Instance")
	b := ds.Batcher{
		Callback: expireInstanceBatch,
	}
	if err := b.Run(c, q, func(i *instance) {
		// Record the number before the expiry check, so an expiring instance's
		// number is not handed out again in this same run.
		if i.TaskNum >= 0 {
			usedTaskNums[i.TaskNum] = struct{}{}
		}

		if i.LastUpdated.Before(expiredTime) {
			expiredKeys = append(expiredKeys, ds.NewKey(c, "Instance", i.ID, 0, nil))
			logging.Debugf(c, "Expiring %s task_num %d, inactive since %s",
				i.ID, i.TaskNum, i.LastUpdated.String())
		} else if i.TaskNum < 0 {
			unassigned = append(unassigned, i)
		}
	}); err != nil {
		// NOTE(review): presumably this also surfaces errors returned by the
		// expireInstanceBatch callback, not only query failures.
		logging.WithError(err).Errorf(c, "Failed to get Instance entities from datastore")
		return err
	}

	// Final expiration round for keys collected after the last callback.
	if err := expireInstanceBatch(c); err != nil {
		// expireInstanceBatch has already logged this at error level.
		logging.WithError(err).Debugf(c, "Failed to expire final instance batch.")
		return err
	}

	logging.Debugf(c, "Found %d expired and %d unassigned instances",
		totalExpired, len(unassigned))

	// Usually every live instance already has a number. Skip the datastore
	// write entirely rather than issuing an empty Put on every cron run.
	if len(unassigned) == 0 {
		return nil
	}

	// Assign task numbers to those that don't have one assigned yet.
	nextNum := gapFinder(usedTaskNums)
	for _, i := range unassigned {
		i.TaskNum = nextNum()
		logging.Debugf(c, "Assigned %s task_num %d", i.ID, i.TaskNum)
	}

	// Update all the entities in datastore.
	if err := ds.Put(c, unassigned); err != nil {
		logging.WithError(err).Errorf(c, "Failed to update task numbers")
		return err
	}
	return nil
}
// gapFinder returns a generator that yields, in increasing order, each
// non-negative integer that is not a key of used.
//
// The generator keeps its own cursor, so it never repeats a value, and it does
// not modify used. A nil map is treated as empty.
func gapFinder(used map[int]struct{}) func() int {
	cursor := 0
	return func() int {
		// Skip over every number that is already taken.
		for {
			if _, taken := used[cursor]; !taken {
				break
			}
			cursor++
		}
		free := cursor
		cursor++
		return free
	}
}