blob: f985e507de3ad13dd5fcd13334f68c53cc9d959e [file] [log] [blame]
// Copyright 2016 The LUCI Authors.
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
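// You may obtain a copy of the License at
//
//      http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and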
// limitations under the License.
package tsmon
import (
// DatastoreNamespace is a datastore namespace with all tsmon state.
//
// dsContext switches to this namespace before any 'instance' entity is read
// or written.
const DatastoreNamespace = "ts_mon_instance_namespace"
// DatastoreTaskNumAllocator implements TaskNumAllocator on top of datastore.
//
// Its NotifyTaskIsAlive registers a claim for a task number, which is later
// fulfilled by the housekeeping cron (see AssignTaskNumbers).
//
// The type carries no state: its zero value is ready to use.
type DatastoreTaskNumAllocator struct{}
// NotifyTaskIsAlive is part of TaskNumAllocator interface.
func (DatastoreTaskNumAllocator) NotifyTaskIsAlive(ctx context.Context, task *target.Task, instanceID string) (taskNum int, err error) {
ctx = dsContext(ctx)
// Exact values here are not important. Important properties are:
// * 'entityID' is unique, and depends on both 'task' and 'instanceID'.
// * All instances from same task have same 'target' value.
// The cron ('AssignTaskNumbers') will fetch all entities and will group them
// by 'target' value before assigning task numbers.
target := fmt.Sprintf("%s|%s|%s|%s", task.DataCenter, task.ServiceName, task.JobName, task.HostName)
entityID := fmt.Sprintf("%s|%s", target, instanceID)
err = datastore.RunInTransaction(ctx, func(ctx context.Context) error {
entity := instance{ID: entityID}
switch err := datastore.Get(ctx, &entity); {
case err == datastore.ErrNoSuchEntity:
entity.Target = target
entity.TaskNum = -1
case err != nil:
return err
entity.LastUpdated = clock.Now(ctx).UTC()
taskNum = entity.TaskNum
return datastore.Put(ctx, &entity)
}, nil)
if err == nil && taskNum == -1 {
err = tsmon.ErrNoTaskNumber
// AssignTaskNumbers updates the set of task number requests created with
// DatastoreTaskNumAllocator.
// It assigns unique task numbers to those without ones set, and expires old
// ones (thus reclaiming task numbers assigned to them).
// Must be used from some (global per project) cron if DatastoreTaskNumAllocator
// is used. Use 'InstallHandlers' to install the corresponding cron handler.
func AssignTaskNumbers(ctx context.Context) error {
ctx = dsContext(ctx)
now := clock.Now(ctx)
cutoff := now.Add(-instanceExpirationTimeout)
perTarget := map[string]*workingSet{}
// Enumerate all instances stored in the datastore and expire old ones (in
// batches). Collect a set of used task numbers and a set of instances not
// assigned a number yet. Group by 'Target' (can be "" for old entities, this
// is fine).
q := datastore.NewQuery("Instance")
err := datastore.RunBatch(ctx, int32(taskQueryBatchSize), q, func(entity *instance) error {
set := perTarget[entity.Target]
if set == nil {
set = newWorkingSet(cutoff)
perTarget[entity.Target] = set
set.addInstance(ctx, entity)
if len(set.expired) >= taskQueryBatchSize {
if err := set.cleanupExpired(ctx); err != nil {
return err
return nil
if err != nil {
return errors.Annotate(err, "failed to enumerate or expire entries").Err()
return parallel.FanOutIn(func(tasks chan<- func() error) {
for target, set := range perTarget {
target := target
set := set
tasks <- func() error {
// "Flush" all pending expired instances.
if err := set.cleanupExpired(ctx); err != nil {
logging.WithError(err).Errorf(ctx, "Failed to delete expired entries for target %q", target)
return err
// Assign task numbers to those that don't have one assigned yet.
logging.Debugf(ctx, "Found %d expired and %d unassigned instances for target %q", set.totalExpired, len(set.pending), target)
if err := set.assignTaskNumbers(ctx); err != nil {
logging.WithError(err).Errorf(ctx, "Failed to assign task numbers for target %q", target)
return err
return nil
const (
	// instanceExpirationTimeout is how long an instance may go without
	// refreshing its LastUpdated timestamp before AssignTaskNumbers considers
	// it expired.
	instanceExpirationTimeout = 30 * time.Minute

	// taskQueryBatchSize is the batch size of the query that enumerates
	// instances, and also the number of expired keys accumulated per target
	// before they are deleted.
	taskQueryBatchSize = 500
)
// dsContext is used for all datastore accesses that touch 'instance' entities.
// It switches the namespace and disables dscache, since these entities are
// updated from Flex, which doesn't work with dscache. Besides, all reads happen
// either in transactions or through queries - dscache is useless anyhow.
func dsContext(ctx context.Context) context.Context {
ctx = info.MustNamespace(ctx, DatastoreNamespace)
return dscache.AddShardFunctions(ctx, func(*datastore.Key) (shards int, ok bool) {
return 0, true
// instance corresponds to one process that flushes metrics.
type instance struct {
_kind string `gae:"$kind,Instance"`
_extra datastore.PropertyMap `gae:"-,extra"`
ID string `gae:"$id"`
Target string `gae:"target,noindex"`
TaskNum int `gae:"task_num,noindex"`
LastUpdated time.Time `gae:"last_updated,noindex"`
// Disable dscache to allow these entities be updated from Flex, which doesn't
// work with dscache. Besides, we update these entities from transactions or
// based on queries - dscache is useless anywhere.
_ datastore.Toggle `gae:"$dscache.enable,false"`
// workingSet is used internally by AssignTaskNumbers.
type workingSet struct {
cutoff time.Time // if LastUpdate is before => instance has expired
expired []*datastore.Key // entities with LastUpdate too long ago
pending []*instance // entities with TaskNum is still -1
assignedNums map[int]struct{} // assigned already task numbers
totalExpired int // total number of entities deleted
func newWorkingSet(cutoff time.Time) *workingSet {
return &workingSet{
cutoff: cutoff,
assignedNums: map[int]struct{}{},
func (s *workingSet) addInstance(ctx context.Context, entity *instance) {
switch {
case entity.LastUpdated.Before(s.cutoff):
logging.Debugf(ctx, "Expiring %q (task_num %d), inactive since %s",
entity.ID, entity.TaskNum, entity.LastUpdated)
s.expired = append(s.expired, datastore.KeyForObj(ctx, entity))
case entity.TaskNum < 0:
s.pending = append(s.pending, entity)
s.assignedNums[entity.TaskNum] = struct{}{}
func (s *workingSet) cleanupExpired(ctx context.Context) error {
if len(s.expired) == 0 {
return nil
logging.Debugf(ctx, "Expiring %d instance(s)", len(s.expired))
if err := datastore.Delete(ctx, s.expired); err != nil {
return err
s.totalExpired += len(s.expired)
s.expired = s.expired[:0]
return nil
func (s *workingSet) assignTaskNumbers(ctx context.Context) error {
if len(s.pending) == 0 {
return nil
nextNum := gapFinder(s.assignedNums)
for _, entity := range s.pending {
entity.TaskNum = nextNum()
logging.Debugf(ctx, "Assigned %q task_num %d", entity.ID, entity.TaskNum)
// Update all pending entities. This is non-transactional, meaning:
// * We may override newer LastUpdated - no big deal.
// * If there are two parallel 'AssignTaskNumbers', they'll screw up each
// other. GAE cron gives some protection against concurrent cron job
// executions though.
return datastore.Put(ctx, s.pending)
// gapFinder returns a generator of non-negative integers that are not in the
// 'used' set.
//
// Each call of the returned function yields the smallest such integer that is
// larger than all previously yielded ones, so the produced sequence is strictly
// increasing and never repeats. 'used' is only read, a nil map is fine.
func gapFinder(used map[int]struct{}) func() int {
	next := 0
	return func() int {
		for {
			n := next
			// Always advance: otherwise the same candidate would be examined
			// (and returned) over and over.
			next++
			if _, has := used[n]; !has {
				return n
			}
		}
	}
}