blob: 286c97569613f7f60d0c1bd4046182c83a2aa1d0 [file] [log] [blame]
// Copyright 2015 The LUCI Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package memory
import (
"container/heap"
"context"
"errors"
"fmt"
"hash/fnv"
"net/http"
"regexp"
"sync"
"sync/atomic"
"time"
prodConstraints "go.chromium.org/luci/gae/impl/prod/constraints"
ds "go.chromium.org/luci/gae/service/datastore"
tq "go.chromium.org/luci/gae/service/taskqueue"
)
var (
// currentNamespace is the canonical MIME-header form of
// "X-AppEngine-Current-Namespace".
currentNamespace = http.CanonicalHeaderKey("X-AppEngine-Current-Namespace")
// validTaskName matches strings of 0 to 500 characters made only of ASCII
// letters, digits, '-' and '_'. Note that the empty string matches.
validTaskName = regexp.MustCompile("^[0-9a-zA-Z\\-\\_]{0,500}$")
// Sentinel errors of the in-memory task queue. The messages are upper-case
// status-like codes.
// NOTE(review): presumably they mirror production API error codes - confirm.
errBadRequest = errors.New("BAD_REQUEST")
errInvalidTaskName = errors.New("INVALID_TASK_NAME")
errUnknownQueue = errors.New("UNKNOWN_QUEUE")
errTombstonedTask = errors.New("TOMBSTONED_TASK")
errUnknownTask = errors.New("UNKNOWN_TASK")
errInvalidQueueMode = errors.New("INVALID_QUEUE_MODE")
errTaskLeaseExpired = errors.New("TASK_LEASE_EXPIRED")
)
//////////////////////////////// sortedQueue ///////////////////////////////////
// sortedQueue is the state of one named queue: its live tasks, the tombstones
// of deleted tasks and, for pull queues, heaps ordered by (ETA, name) that are
// used when leasing tasks.
type sortedQueue struct {
// name is the queue name passed to newSortedQueue.
name string
// isPullQueue is true for pull queues (accept only "PULL" tasks) and false
// for push queues (accept everything except "PULL" tasks).
isPullQueue bool
// nextAutoGenID is the counter incremented by genTaskName.
nextAutoGenID uint64
tasks map[string]*tq.Task // added, but not deleted
archived map[string]*tq.Task // tombstones
// The two indexes below are only maintained when isPullQueue is true.
sorted taskIndex // sorted by (ETA, name)
sortedPerTag map[string]*taskIndex // tag => tasks sorted by (ETA, name)
}
// newSortedQueue returns an empty queue with the given name and mode.
//
// The counter behind auto-generated task names starts at the 64-bit FNV-1 hash
// of the queue name. That makes the generated IDs look opaque and differ from
// queue to queue, so users are not tempted to "guess" IDs or to correlate them
// between queues.
func newSortedQueue(name string, isPullQueue bool) *sortedQueue {
	hasher := fnv.New64()
	hasher.Write([]byte(name))

	q := &sortedQueue{
		name:         name,
		isPullQueue:  isPullQueue,
		tasks:        make(map[string]*tq.Task),
		archived:     make(map[string]*tq.Task),
		sortedPerTag: make(map[string]*taskIndex),
	}
	q.nextAutoGenID = hasher.Sum64()
	return q
}
// All sortedQueue methods are assumed to be called under taskQueueData lock.

// genTaskName advances the auto-generated ID counter and returns its new value
// formatted as a decimal string.
func (q *sortedQueue) genTaskName() string {
	next := q.nextAutoGenID + 1
	q.nextAutoGenID = next
	return fmt.Sprintf("%d", next)
}
// addTask registers the task under task.Name.
//
// Returns errInvalidQueueMode when a "PULL" task is added to a push queue or a
// non-"PULL" task to a pull queue, and tq.ErrTaskAlreadyAdded when the name is
// already taken by a live or a tombstoned task.
//
// For pull queues the task is also inserted into the global and the per-tag
// ETA indexes.
func (q *sortedQueue) addTask(task *tq.Task) error {
	if isPullTask := task.Method == "PULL"; isPullTask != q.isPullQueue {
		return errInvalidQueueMode
	}

	// SDK converts TOMBSTONE -> already added too, so both collisions produce
	// the same error.
	_, tombstoned := q.archived[task.Name]
	_, live := q.tasks[task.Name]
	if tombstoned || live {
		return tq.ErrTaskAlreadyAdded
	}

	q.tasks[task.Name] = task
	if !q.isPullQueue {
		return nil
	}

	q.sorted.add(task)
	tagIndex, known := q.sortedPerTag[task.Tag]
	if !known {
		tagIndex = &taskIndex{}
		q.sortedPerTag[task.Tag] = tagIndex
	}
	tagIndex.add(task)
	return nil
}
// deleteTask moves the task named task.Name from the live set to the
// tombstones and, for pull queues, drops it from the ETA indexes.
//
// Only task.Name is consulted; the stored task object is the one archived.
// Returns errTombstonedTask if the task has already been deleted and
// errUnknownTask if there is no such task.
func (q *sortedQueue) deleteTask(task *tq.Task) error {
	if _, dead := q.archived[task.Name]; dead {
		return errTombstonedTask
	}
	stored, found := q.tasks[task.Name]
	if !found {
		return errUnknownTask
	}

	delete(q.tasks, task.Name)
	q.archived[task.Name] = stored

	if q.isPullQueue {
		q.sorted.remove(stored)
		q.sortedPerTag[stored.Tag].remove(stored)
	}
	return nil
}
// leaseTasks leases up to maxTasks tasks whose ETA is <= now, oldest first.
//
// Leasing a task means moving its ETA to now+leaseTime (truncated to whole
// seconds) and re-indexing it. Copies of the leased tasks are returned.
//
// Tag handling:
//   - useTag == false: tasks are picked regardless of tag; passing a non-empty
//     tag in this mode is a programming error and panics.
//   - useTag == true, tag != "": only tasks with that tag are picked.
//   - useTag == true, tag == "": the tag of the oldest ready task is used (and
//     if that task is untagged, only untagged tasks are picked).
//
// Returns errInvalidQueueMode for push queues and errBadRequest when
// maxTasks <= 0 or the lease duration is negative.
func (q *sortedQueue) leaseTasks(now time.Time, maxTasks int, leaseTime time.Duration, useTag bool, tag string) ([]*tq.Task, error) {
	leaseSec := int(leaseTime / time.Second)
	switch {
	case !q.isPullQueue:
		return nil, errInvalidQueueMode
	case maxTasks <= 0, leaseSec < 0:
		return nil, errBadRequest
	case !useTag && tag != "":
		panic("taskqueue: impossible leaseTasks call")
	}

	// useTag == true and tag == "" is a VALID request. It means "find the first
	// task by ETA and behave as if its tag was passed to leaseTasks, or fetch
	// only untagged tasks if that tag is not set". That's how production API
	// works too.
	if useTag && tag == "" {
		oldest := q.sorted.peek()
		if oldest == nil || oldest.ETA.After(now) {
			return nil, nil // no ready tasks at all
		}
		// May still be "" here, which means "fetch only untagged tasks".
		tag = oldest.Tag
	}

	// Pull matching tasks out of one index and mirror the removal in the other
	// one. They are pushed back below at their updated positions.
	var leased []*tq.Task
	if !useTag {
		leased = q.sorted.extract(now, maxTasks)
		for _, task := range leased {
			q.sortedPerTag[task.Tag].remove(task)
		}
	} else if tagIndex := q.sortedPerTag[tag]; tagIndex != nil {
		leased = tagIndex.extract(now, maxTasks)
		for _, task := range leased {
			q.sorted.remove(task)
		}
	}

	// Seconds precision is important, that's how production API works.
	newETA := now.Add(time.Duration(leaseSec) * time.Second)
	out := make([]*tq.Task, len(leased))
	for i, task := range leased {
		task.ETA = newETA
		q.sorted.add(task)
		q.sortedPerTag[task.Tag].add(task)
		out[i] = task.Duplicate()
	}
	return out, nil
}
// modifyTaskLease moves the lease of the task named t.Name so that it expires
// at now+leaseTime (truncated to whole seconds).
//
// The ETA doubles as an ownership "cookie": t.ETA must equal the ETA currently
// stored for the task, i.e. the value handed out by leaseTasks or by a previous
// modifyTaskLease call. On success t.ETA is updated to the new value so the
// caller can use it for the next call.
//
// Errors: errInvalidQueueMode (push queue), errBadRequest (negative lease),
// errTombstonedTask, errUnknownTask, and errTaskLeaseExpired when the cookie
// doesn't match or the lease has already run out.
func (q *sortedQueue) modifyTaskLease(now time.Time, t *tq.Task, leaseTime time.Duration) error {
	if !q.isPullQueue {
		return errInvalidQueueMode
	}
	leaseSec := int(leaseTime / time.Second)
	if leaseSec < 0 {
		return errBadRequest
	}
	if _, dead := q.archived[t.Name]; dead {
		return errTombstonedTask
	}

	stored, found := q.tasks[t.Name]
	switch {
	case !found:
		return errUnknownTask
	case !stored.ETA.Equal(t.ETA):
		// The caller didn't round-trip the ETA it got from 'leaseTasks', so it
		// doesn't own the lease. Production API works the same way.
		return errTaskLeaseExpired
	case now.After(stored.ETA):
		// The lease has been lost by timeout.
		return errTaskLeaseExpired
	}

	// Re-index the task under its new ETA. Seconds precision is important,
	// that's how production API works.
	tagIndex := q.sortedPerTag[stored.Tag]
	q.sorted.remove(stored)
	tagIndex.remove(stored)
	stored.ETA = now.Add(time.Duration(leaseSec) * time.Second)
	q.sorted.add(stored)
	tagIndex.add(stored)

	// Hand the new cookie back to the caller.
	t.ETA = stored.ETA
	return nil
}
// purge drops all live tasks, all tombstones and both ETA indexes. The queue
// name, mode and auto-generated ID counter are left as they are.
func (q *sortedQueue) purge() {
	q.tasks, q.archived = make(map[string]*tq.Task), make(map[string]*tq.Task)
	q.sorted, q.sortedPerTag = taskIndex{}, make(map[string]*taskIndex)
}
// getStats reports the number of live tasks and the earliest ETA among them.
//
// OldestETA stays zero for an empty queue. A zero running value is treated as
// "not set yet" and is overwritten by the next task's ETA.
func (q *sortedQueue) getStats() *tq.Statistics {
	stats := &tq.Statistics{Tasks: len(q.tasks)}
	for _, task := range q.tasks {
		if stats.OldestETA.IsZero() || task.ETA.Before(stats.OldestETA) {
			stats.OldestETA = task.ETA
		}
	}
	return stats
}
/////////////////////////////// Indexing helpers ///////////////////////////////
// taskIndex is a heap of tasks sorted by (ETA, name), oldest first.
//
// The zero value is an empty index ready for use.
type taskIndex struct {
// data is the heap storage; it is manipulated through container/heap.
data taskIndexData
}
// add inserts the task into the heap at the position matching its current
// (ETA, name). Complexity is O(log(N)).
func (idx *taskIndex) add(t *tq.Task) {
	storage := &idx.data
	heap.Push(storage, t)
}
// remove drops the given task (matched by pointer identity) from the index.
// It is a no-op when the task is not in the index.
//
// Complexity is O(N) because of the linear scan for the task position.
func (idx *taskIndex) remove(t *tq.Task) {
	for pos := range idx.data {
		if idx.data[pos] != t {
			continue
		}
		heap.Remove(&idx.data, pos)
		return
	}
}
// peek returns the task at the top of the heap (the one with the minimum ETA)
// without removing it, or nil if the index is empty. Complexity is O(1).
func (idx *taskIndex) peek() *tq.Task {
	if len(idx.data) > 0 {
		return idx.data[0]
	}
	return nil
}
// extract pops up to 'max' tasks with ETA <= now off the index.
//
// The popped tasks are returned oldest first. The result is nil when nothing
// is ready or when max <= 0.
//
// Complexity is O(log(N)*max).
func (idx *taskIndex) extract(now time.Time, max int) []*tq.Task {
	var ready []*tq.Task
	for len(idx.data) > 0 && len(ready) < max {
		if idx.data[0].ETA.After(now) {
			break // the oldest remaining task is not ready yet
		}
		popped := heap.Pop(&idx.data).(*tq.Task)
		ready = append(ready, popped)
	}
	return ready
}
// taskIndexData implements heap.Interface.
//
// Elements are ordered by ETA first and by Name for equal ETAs (see Less).
type taskIndexData []*tq.Task
// Len is part of heap.Interface; it returns the number of indexed tasks.
func (d taskIndexData) Len() int {
	return len(d)
}
// Swap is part of heap.Interface; it exchanges the elements at i and j.
func (d taskIndexData) Swap(i, j int) {
	d[i], d[j] = d[j], d[i]
}
// Less is part of heap.Interface. Element i sorts before element j if its ETA
// is earlier or, for equal ETAs, if its Name is lexicographically smaller.
func (d taskIndexData) Less(i, j int) bool {
	a, b := d[i], d[j]
	if !a.ETA.Equal(b.ETA) {
		return a.ETA.Before(b.ETA)
	}
	return a.Name < b.Name
}
// Push is part of heap.Interface; it appends x, which must be a *tq.Task, to
// the end of the slice. Call heap.Push rather than this method directly.
func (d *taskIndexData) Push(x interface{}) {
	task := x.(*tq.Task)
	*d = append(*d, task)
}
// Pop is part of heap.Interface; it removes and returns the last element of
// the slice. Call heap.Pop rather than this method directly.
//
// The vacated slot of the backing array is cleared, so the array does not keep
// the popped task reachable after it has left the index.
func (d *taskIndexData) Pop() interface{} {
	old := *d
	last := len(old) - 1
	x := old[last]
	old[last] = nil // don't stop the GC from reclaiming the task eventually
	*d = old[:last]
	return x
}
//////////////////////////////// taskQueueData /////////////////////////////////
// taskQueueData is the root (non-transactional) task queue state: the set of
// known queues and the constraints currently in effect.
type taskQueueData struct {
// lock guards queues, the sortedQueue values they point to, and constraints.
// Methods with the "Locked" suffix do not take it themselves.
lock sync.Mutex
// queues maps a queue name to its state. newTaskQueueData pre-creates the
// "default" push queue.
queues map[string]*sortedQueue
// constraints is read via getConstraints and replaced via setConstraints.
constraints tq.Constraints
}
// Compile-time check that *taskQueueData satisfies memContextObj.
var _ memContextObj = (*taskQueueData)(nil)
// newTaskQueueData returns a fresh task queue state that contains a single
// push queue named "default" and uses the constraints returned by
// prodConstraints.TQ().
func newTaskQueueData() memContextObj {
	queues := map[string]*sortedQueue{}
	queues["default"] = newSortedQueue("default", false)
	return &taskQueueData{
		queues:      queues,
		constraints: prodConstraints.TQ(),
	}
}
// endTxn does nothing for the root task queue state.
func (t *taskQueueData) endTxn() {}
// beginCommit starts committing the transaction state txnCtxObj (which must be
// a *txnTaskQueueData) into t.
//
// The transaction's lock is acquired right away and is released by the
// returned op's 'unlock'. The op's 'apply' adds every task buffered in the
// transaction to the corresponding queue of t (under t.lock) and then clears
// the buffer. A failure to add a task is considered impossible and panics.
func (t *taskQueueData) beginCommit(c context.Context, txnCtxObj memContextObj) txnCommitOp {
	txn := txnCtxObj.(*txnTaskQueueData)
	txn.lock.Lock() // no need to hold t.lock, since no collisions are possible

	apply := func() {
		t.lock.Lock()
		defer t.lock.Unlock()

		for queueName, pending := range txn.anony {
			queue := t.queues[queueName]
			for _, task := range pending {
				// prepped in txnTaskQueueData.AddMulti, must be good
				if err := queue.addTask(task); err != nil {
					panic(err)
				}
			}
		}
		txn.anony = nil
	}

	return &txnCommitCallback{
		unlock: txn.lock.Unlock,
		apply:  apply,
	}
}
// mkTxn returns a new transaction state that has t as its parent and an empty
// buffer of pending tasks. The transaction options are ignored.
func (t *taskQueueData) mkTxn(*ds.TransactionOptions) memContextObj {
	txn := &txnTaskQueueData{parent: t}
	txn.anony = tq.AnonymousQueueData{}
	return txn
}
// getTransactionTasks always returns nil: the root state buffers no
// transactional tasks. The namespace argument is ignored.
func (t *taskQueueData) getTransactionTasks(ns string) tq.AnonymousQueueData {
	return nil
}
// createQueue registers a new push queue under the given name. It panics if a
// queue with this name already exists (see createQueueInternal).
func (t *taskQueueData) createQueue(queueName string) {
	const isPullQueue = false
	t.createQueueInternal(queueName, isPullQueue)
}
// createPullQueue registers a new pull queue under the given name. It panics
// if a queue with this name already exists (see createQueueInternal).
func (t *taskQueueData) createPullQueue(queueName string) {
	const isPullQueue = true
	t.createQueueInternal(queueName, isPullQueue)
}
// createQueueInternal registers a new, empty queue of the requested mode under
// t.lock.
//
// Registering the same name twice is treated as a programming error: the
// function panics with an error value naming the queue.
func (t *taskQueueData) createQueueInternal(queueName string, isPullQueue bool) {
	t.lock.Lock()
	defer t.lock.Unlock()

	_, exists := t.queues[queueName]
	if exists {
		panic(fmt.Errorf("memory/taskqueue: cannot add the same queue twice! %q", queueName))
	}
	t.queues[queueName] = newSortedQueue(queueName, isPullQueue)
}
// getScheduledTasks returns copies of all live tasks that belong to namespace
// ns (as reported by taskNamespace), keyed by queue name and then task name.
//
// Every known queue gets an entry in the result, even if none of its tasks
// match.
func (t *taskQueueData) getScheduledTasks(ns string) tq.QueueData {
	t.lock.Lock()
	defer t.lock.Unlock()

	out := make(tq.QueueData, len(t.queues))
	for queueName, queue := range t.queues {
		perQueue := make(map[string]*tq.Task, len(queue.tasks))
		for taskName, task := range queue.tasks {
			if taskNamespace(task) != ns {
				continue
			}
			perQueue[taskName] = task.Duplicate()
		}
		out[queueName] = perQueue
	}
	return out
}
// getTombstonedTasks returns copies of all tombstoned (deleted) tasks that
// belong to namespace ns (as reported by taskNamespace), keyed by queue name
// and then task name.
//
// Every known queue gets an entry in the result, even if none of its
// tombstones match.
func (t *taskQueueData) getTombstonedTasks(ns string) tq.QueueData {
	t.lock.Lock()
	defer t.lock.Unlock()

	out := make(tq.QueueData, len(t.queues))
	for queueName, queue := range t.queues {
		perQueue := make(map[string]*tq.Task, len(queue.archived))
		for taskName, task := range queue.archived {
			if taskNamespace(task) != ns {
				continue
			}
			perQueue[taskName] = task.Duplicate()
		}
		out[queueName] = perQueue
	}
	return out
}
// resetTasks purges every known queue under t.lock. The queues themselves stay
// registered.
func (t *taskQueueData) resetTasks() {
	t.lock.Lock()
	defer t.lock.Unlock()

	for name := range t.queues {
		t.queues[name].purge()
	}
}
// getQueueLocked looks up a queue by name; the empty name is an alias for
// "default". Returns errUnknownQueue if there is no such queue.
//
// It does not take t.lock itself.
func (t *taskQueueData) getQueueLocked(queueName string) (*sortedQueue, error) {
	name := queueName
	if name == "" {
		name = "default"
	}
	if q, ok := t.queues[name]; ok {
		return q, nil
	}
	return nil, errUnknownQueue
}
// purgeLocked purges the named queue (see getQueueLocked for name resolution).
// It returns the lookup error, if any, and leaves all queues untouched in that
// case.
//
// It does not take t.lock itself.
func (t *taskQueueData) purgeLocked(queueName string) error {
	q, err := t.getQueueLocked(queueName)
	if err == nil {
		q.purge()
	}
	return err
}
// setConstraints replaces the current constraints with a copy of *c. A nil c
// resets them to the zero value of tq.Constraints.
func (t *taskQueueData) setConstraints(c *tq.Constraints) {
	t.lock.Lock()
	defer t.lock.Unlock()

	var next tq.Constraints
	if c != nil {
		next = *c
	}
	t.constraints = next
}
// getConstraints returns a copy of the current constraints, read under t.lock.
func (t *taskQueueData) getConstraints() tq.Constraints {
	t.lock.Lock()
	snapshot := t.constraints
	t.lock.Unlock()
	return snapshot
}
/////////////////////////////// txnTaskQueueData ///////////////////////////////
// txnTaskQueueData is the task queue state of a single transaction. Tasks are
// buffered in anony and moved into the parent's queues by
// taskQueueData.beginCommit.
type txnTaskQueueData struct {
// lock guards anony.
lock sync.Mutex
// boolean 0 or 1, use atomic.*Int32 to access.
closed int32
// anony holds the tasks added within the transaction, keyed by queue name.
anony tq.AnonymousQueueData
// parent is the root state this transaction was created from (see
// taskQueueData.mkTxn).
parent *taskQueueData
}
// Compile-time check that *txnTaskQueueData satisfies memContextObj.
var _ memContextObj = (*txnTaskQueueData)(nil)
// mkTxn must never be called on a transaction state: nested transactions are
// not supported. It reports the call via impossible and returns nil.
func (t *txnTaskQueueData) mkTxn(*ds.TransactionOptions) memContextObj {
	err := fmt.Errorf("cannot start nested transaction")
	impossible(err)
	return nil
}
// beginCommit must never be called on a transaction state: there are no nested
// transactions to commit into it. It reports the call via impossible and
// returns nil.
func (*txnTaskQueueData) beginCommit(c context.Context, txnCtxObj memContextObj) txnCommitOp {
	err := fmt.Errorf("cannot commit a nested transaction")
	impossible(err)
	return nil
}
// endTxn marks the transaction as closed. It panics if the transaction has
// already been closed.
//
// The check and the update are done as one compare-and-swap, so when endTxn is
// called concurrently exactly one caller succeeds and every other one panics.
// A separate load followed by a store would let two callers both observe
// "open".
func (t *txnTaskQueueData) endTxn() {
	if !atomic.CompareAndSwapInt32(&t.closed, 0, 1) {
		panic("cannot end transaction twice")
	}
}
// resetTasks drops all tasks buffered in the transaction and then resets the
// parent state too.
//
// Queue names stay in the buffer as keys with nil task lists. The
// transaction's lock is released before the parent is touched.
func (t *txnTaskQueueData) resetTasks() {
	t.lock.Lock()
	for name := range t.anony {
		t.anony[name] = nil
	}
	t.lock.Unlock()

	parent := t.parent
	parent.resetTasks()
}
// getTransactionTasks returns copies of the tasks buffered in the transaction
// that belong to namespace ns (as reported by taskNamespace), keyed by queue
// name. Within a queue the tasks keep the order they are stored in.
//
// Every queue present in the buffer gets an entry in the result, possibly an
// empty (non-nil) list. Tasks from other namespaces are omitted rather than
// left as nil placeholders, so the returned lists never contain nil pointers.
func (t *txnTaskQueueData) getTransactionTasks(ns string) tq.AnonymousQueueData {
	t.lock.Lock()
	defer t.lock.Unlock()

	ret := make(tq.AnonymousQueueData, len(t.anony))
	for queueName, tasks := range t.anony {
		matching := make([]*tq.Task, 0, len(tasks))
		for _, task := range tasks {
			if taskNamespace(task) == ns {
				matching = append(matching, task.Duplicate())
			}
		}
		ret[queueName] = matching
	}
	return ret
}
// getTombstonedTasks forwards to the parent state: tombstones are not tracked
// per transaction.
func (t *txnTaskQueueData) getTombstonedTasks(ns string) tq.QueueData {
	parent := t.parent
	return parent.getTombstonedTasks(ns)
}
// getScheduledTasks forwards to the parent state; tasks still buffered in the
// transaction are not included (see getTransactionTasks for those).
func (t *txnTaskQueueData) getScheduledTasks(ns string) tq.QueueData {
	parent := t.parent
	return parent.getScheduledTasks(ns)
}
// createQueue registers a new push queue in the parent state.
func (t *txnTaskQueueData) createQueue(queueName string) {
	parent := t.parent
	parent.createQueue(queueName)
}
// createPullQueue registers a new pull queue in the parent state.
func (t *txnTaskQueueData) createPullQueue(queueName string) {
	parent := t.parent
	parent.createPullQueue(queueName)
}
// taskQueueTestable is a tq.Testable implementation that is bound to a
// specified namespace.
type taskQueueTestable struct {
// ns is the namespace passed to every namespace-aware call on data.
ns string
// data is the backing state. Both *taskQueueData and *txnTaskQueueData
// provide these methods.
data interface {
resetTasks()
getTombstonedTasks(ns string) tq.QueueData
getScheduledTasks(ns string) tq.QueueData
getTransactionTasks(ns string) tq.AnonymousQueueData
createQueue(queueName string)
createPullQueue(queueName string)
}
}
// ResetTasks resets the tasks of the backing state. The bound namespace is not
// taken into account.
func (t *taskQueueTestable) ResetTasks() {
	t.data.resetTasks()
}
// GetTombstonedTasks returns the tombstoned tasks of the backing state for the
// bound namespace.
func (t *taskQueueTestable) GetTombstonedTasks() tq.QueueData {
	ns := t.ns
	return t.data.getTombstonedTasks(ns)
}
// GetScheduledTasks returns the scheduled tasks of the backing state for the
// bound namespace.
func (t *taskQueueTestable) GetScheduledTasks() tq.QueueData {
	ns := t.ns
	return t.data.getScheduledTasks(ns)
}
// GetTransactionTasks returns the transactional tasks of the backing state for
// the bound namespace.
func (t *taskQueueTestable) GetTransactionTasks() tq.AnonymousQueueData {
	ns := t.ns
	return t.data.getTransactionTasks(ns)
}
// CreateQueue registers a new push queue in the backing state. The bound
// namespace is not taken into account.
func (t *taskQueueTestable) CreateQueue(queueName string) {
	t.data.createQueue(queueName)
}
// CreatePullQueue registers a new pull queue in the backing state. The bound
// namespace is not taken into account.
func (t *taskQueueTestable) CreatePullQueue(queueName string) {
	t.data.createPullQueue(queueName)
}