blob: 4dd09797d9f2f91f5a9c8a38f5636833e5af26c4 [file] [log] [blame]
// Copyright 2015 The LUCI Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package tumble
import (
"context"
"fmt"
"math"
"strings"
"time"
ds "go.chromium.org/gae/service/datastore"
"go.chromium.org/gae/service/info"
tq "go.chromium.org/gae/service/taskqueue"
"go.chromium.org/luci/common/clock"
"go.chromium.org/luci/common/errors"
"go.chromium.org/luci/common/logging"
)
type timestamp int64
const minTS timestamp = math.MinInt64
func (t timestamp) Unix() time.Time {
return time.Unix((int64)(t), 0).UTC()
}
func mkTimestamp(cfg *Config, t time.Time) timestamp {
trf := time.Duration(cfg.TemporalRoundFactor)
eta := t.UTC().Add(time.Duration(cfg.TemporalMinDelay) + trf).Round(trf)
return timestamp(eta.Unix())
}
type taskShard struct {
shard uint64
time timestamp
}
func fireTasks(c context.Context, cfg *Config, shards map[taskShard]struct{}, loop bool) bool {
if len(shards) == 0 {
return true
}
nextSlot := mkTimestamp(cfg, clock.Now(c).UTC())
logging.Fields{
"slot": nextSlot,
}.Debugf(c, "got next slot")
tasks := make([]*tq.Task, 0, len(shards))
// Transform our namespace into a valid task queue task name.
ns := info.GetNamespace(c)
taskNS := nsToTaskName(ns)
for shard := range shards {
eta := nextSlot
if cfg.DelayedMutations && shard.time > eta {
eta = shard.time
}
// Generate our task name.
//
// Fold namespace into the task name, since task names must be unique across
// all namespaces.
taskName := fmt.Sprintf("%d_%s_%d", eta, taskNS, shard.shard)
if !loop {
// Differentiate non-loop (cron) tasks from loop (Mutation-scheduled)
// tasks so we don't supplant a long-running task with a cron task due to
// timing.
taskName += "_single"
}
tsk := &tq.Task{
Name: taskName,
Path: processURL(eta, shard.shard, ns, loop),
ETA: eta.Unix(),
// TODO(riannucci): Tune RetryOptions?
}
tasks = append(tasks, tsk)
logging.Infof(c, "added task %q %s %s", tsk.Name, tsk.Path, tsk.ETA)
}
if err := errors.Filter(tq.Add(ds.WithoutTransaction(c), baseName, tasks...), tq.ErrTaskAlreadyAdded); err != nil {
logging.Warningf(c, "attempted to fire tasks %v, but failed: %s", shards, err)
return false
}
return true
}
// nsToTaskName flattens a namespace into a string that can be part of a valid
// task queue task name.
func nsToTaskName(v string) string {
// Escape single underscores in the namespace name.
v = strings.Replace(v, "_", "__", -1)
// Replace any invalid task queue name characters with underscore.
return strings.Map(func(r rune) rune {
switch {
case (r >= 'a' && r <= 'z'),
(r >= 'A' && r <= 'Z'),
r == '_', r == '-':
return r
default:
return '_'
}
}, v)
}