blob: 9c3e55db482eb8f521663b2435017e50357bc9dd [file] [log] [blame]
// Copyright 2015 The LUCI Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package memory
import (
"context"
"fmt"
"net/http"
"time"
"go.chromium.org/luci/common/clock"
"go.chromium.org/luci/common/errors"
"go.chromium.org/luci/gae/service/info"
tq "go.chromium.org/luci/gae/service/taskqueue"
)
/////////////////////////////// public functions ///////////////////////////////
// useTQ returns a context derived from c that has the in-memory taskqueue
// factory installed.
//
// Each time the factory runs it fetches the taskqueue slot of the current
// memory context and reads the namespace from the context it was handed. When
// cur reports a transaction the slot is wrapped in a taskqueueTxnImpl,
// otherwise in a taskqueueImpl.
func useTQ(c context.Context) context.Context {
	factory := func(ic context.Context) tq.RawInterface {
		memCtx, inTxn := cur(ic)
		data := memCtx.Get(memContextTQIdx)
		namespace := info.GetNamespace(ic)
		if !inTxn {
			return &taskqueueImpl{data.(*taskQueueData), ic, namespace}
		}
		return &taskqueueTxnImpl{data.(*txnTaskQueueData), ic, namespace}
	}
	return tq.SetRawFactory(c, factory)
}
//////////////////////////////// taskqueueImpl /////////////////////////////////
// taskqueueImpl is the tq.RawInterface handed out by useTQ when the context is
// not inside a transaction.
//
// NOTE: useTQ builds this struct with a positional composite literal, so the
// field order below must not change.
type taskqueueImpl struct {
// Embedded queue state; its lock guards every queue operation in the methods
// of this type.
*taskQueueData
// ctx is the context the factory was invoked with; the methods use it to read
// the current time (clock.Now) and pass it to prepTask.
ctx context.Context
// ns is the namespace obtained from ctx when the implementation was created.
ns string
}
// Compile-time check that *taskqueueImpl satisfies tq.RawInterface.
var _ tq.RawInterface = (*taskqueueImpl)(nil)
// AddMulti adds tasks to the queue named queueName.
//
// The batch is validated first and rejected as a whole (returning the
// validation error, without invoking cb) if at least one task is bad; that's
// how the prod API behaves too. A failed queue lookup is returned the same
// way. Otherwise cb is invoked once per task, in order, with either a copy of
// the task as stored or the error produced while adding it, and the method
// returns nil.
//
// Tasks with an empty Name get a name generated by the queue.
func (t *taskqueueImpl) AddMulti(tasks []*tq.Task, queueName string, cb tq.RawTaskCB) error {
	if err := checkManyTasks(tasks, false); err != nil {
		return err
	}

	t.lock.Lock()
	defer t.lock.Unlock()

	queue, err := t.getQueueLocked(queueName)
	if err != nil {
		return err
	}

	for _, src := range tasks {
		taskName := src.Name
		if taskName == "" {
			taskName = queue.genTaskName()
		}

		prepared := prepTask(t.ctx, src, taskName, queue.name, t.ns)
		if addErr := queue.addTask(prepared); addErr != nil {
			cb(nil, addErr)
			continue
		}
		cb(prepared.Duplicate(), nil)
	}
	return nil
}
// DeleteMulti removes tasks from the queue named queueName.
//
// A failed queue lookup is returned directly. Otherwise each task is deleted
// in order and cb is invoked, with the task's index, only for the deletions
// that fail; successful deletions produce no callback. The method then
// returns nil.
func (t *taskqueueImpl) DeleteMulti(tasks []*tq.Task, queueName string, cb tq.RawCB) error {
	t.lock.Lock()
	defer t.lock.Unlock()

	queue, err := t.getQueueLocked(queueName)
	if err != nil {
		return err
	}

	for idx := range tasks {
		delErr := queue.deleteTask(tasks[idx])
		if delErr == nil {
			continue
		}
		cb(idx, delErr)
	}
	return nil
}
// Lease looks up the queue named queueName under the lock and delegates to
// its leaseTasks with the current time taken from the context clock, maxTasks
// and leaseTime. The tag flag is passed as false with an empty tag.
//
// A failed queue lookup yields a nil slice and the lookup error.
func (t *taskqueueImpl) Lease(maxTasks int, queueName string, leaseTime time.Duration) ([]*tq.Task, error) {
	t.lock.Lock()
	defer t.lock.Unlock()

	queue, err := t.getQueueLocked(queueName)
	if err != nil {
		return nil, err
	}

	now := clock.Now(t.ctx)
	return queue.leaseTasks(now, maxTasks, leaseTime, false, "")
}
// LeaseByTag looks up the queue named queueName under the lock and delegates
// to its leaseTasks with the current time taken from the context clock,
// maxTasks and leaseTime. The tag flag is passed as true together with tag.
//
// A failed queue lookup yields a nil slice and the lookup error.
func (t *taskqueueImpl) LeaseByTag(maxTasks int, queueName string, leaseTime time.Duration, tag string) ([]*tq.Task, error) {
	t.lock.Lock()
	defer t.lock.Unlock()

	queue, err := t.getQueueLocked(queueName)
	if err != nil {
		return nil, err
	}

	now := clock.Now(t.ctx)
	return queue.leaseTasks(now, maxTasks, leaseTime, true, tag)
}
// ModifyLease looks up the queue named queueName under the lock and delegates
// to its modifyTaskLease with the current time taken from the context clock,
// the task and the new leaseTime.
//
// A failed queue lookup is returned as is.
func (t *taskqueueImpl) ModifyLease(task *tq.Task, queueName string, leaseTime time.Duration) error {
	t.lock.Lock()
	defer t.lock.Unlock()

	queue, err := t.getQueueLocked(queueName)
	if err != nil {
		return err
	}

	now := clock.Now(t.ctx)
	return queue.modifyTaskLease(now, task, leaseTime)
}
// Purge takes the lock and forwards to purgeLocked for queueName, returning
// whatever purgeLocked returns.
func (t *taskqueueImpl) Purge(queueName string) error {
	t.lock.Lock()
	defer t.lock.Unlock()

	err := t.purgeLocked(queueName)
	return err
}
// Stats reports statistics for every queue listed in queueNames.
//
// cb is invoked once per name, in order: with the queue's stats and a nil
// error when the queue is found, or with nil stats and the lookup error when
// it is not. The method itself always returns nil.
func (t *taskqueueImpl) Stats(queueNames []string, cb tq.RawStatsCB) error {
	t.lock.Lock()
	defer t.lock.Unlock()

	for i := range queueNames {
		queue, err := t.getQueueLocked(queueNames[i])
		if err != nil {
			cb(nil, err)
			continue
		}
		cb(queue.getStats(), nil)
	}
	return nil
}
// SetConstraints forwards c to setConstraints on the embedded queue data.
// It always returns nil.
func (t *taskqueueImpl) SetConstraints(c *tq.Constraints) error {
	t.setConstraints(c)

	return nil
}
// Constraints returns the constraints reported by getConstraints on the
// embedded queue data.
func (t *taskqueueImpl) Constraints() tq.Constraints {
	current := t.getConstraints()
	return current
}
// GetTestable returns a taskQueueTestable built from this implementation and
// its namespace.
func (t *taskqueueImpl) GetTestable() tq.Testable {
	return &taskQueueTestable{t.ns, t}
}
/////////////////////////////// taskqueueTxnImpl ///////////////////////////////
// taskqueueTxnImpl is the tq.RawInterface handed out by useTQ when the context
// is inside a transaction. Only AddMulti, Constraints and GetTestable do real
// work; the remaining operations return an error.
//
// NOTE: useTQ builds this struct with a positional composite literal, so the
// field order below must not change.
type taskqueueTxnImpl struct {
// Embedded transactional state: its own lock, the per-queue list of staged
// tasks (anony) and a reference to the parent queue data (parent).
*txnTaskQueueData
// ctx is the context the factory was invoked with; it is checked with
// assertTxnValid and passed to prepTask.
ctx context.Context
// ns is the namespace obtained from ctx when the implementation was created.
ns string
}
// Compile-time check that *taskqueueTxnImpl satisfies tq.RawInterface.
var _ tq.RawInterface = (*taskqueueTxnImpl)(nil)
// addLocked stages task, under the name taskName, for queueName within the
// transaction. As the name suggests, the caller is expected to hold t.lock.
//
// It returns a copy of the staged task, or errBadRequest when the transaction
// already holds five staged tasks across all queues.
func (t *taskqueueTxnImpl) addLocked(task *tq.Task, taskName, queueName string) (*tq.Task, error) {
	pending := 0
	for _, staged := range t.anony {
		pending += len(staged)
	}

	// Transactional tasks are actually implemented 'for real' as Actions which
	// ride on the datastore. The current datastore implementation only allows
	// a maximum of 5 Actions per transaction, and more than that results in a
	// BAD_REQUEST.
	if pending >= 5 {
		return nil, errBadRequest
	}

	scheduled := prepTask(t.ctx, task, taskName, queueName, t.ns)
	t.anony[queueName] = append(t.anony[queueName], scheduled)
	return scheduled.Duplicate(), nil
}
// AddMulti stages tasks for the queue named queueName inside the current
// transaction.
//
// It returns early, without invoking cb, with:
//   - the error from assertTxnValid if the transaction context is not valid;
//   - the batch validation error from checkManyTasks (in transactions only
//     tasks with an empty name are accepted);
//   - the lookup error if the parent has no such queue.
//
// Otherwise a name is generated for every task from the parent queue, each
// task is staged via addLocked, and cb receives the per-task result in order.
// The method then returns nil.
func (t *taskqueueTxnImpl) AddMulti(tasks []*tq.Task, queueName string, cb tq.RawTaskCB) error {
	if err := assertTxnValid(t.ctx); err != nil {
		return err
	}

	// Reject the entire batch if at least one task is bad. That's how prod API
	// behaves too.
	if err := checkManyTasks(tasks, true); err != nil {
		return err
	}

	// Resolve the queue and generate names for all tasks while holding the
	// parent lock. The critical section lives in its own function so the lock
	// is released via defer even if the lookup or name generation panics, and
	// so it is released before t.lock is taken below.
	names, resolvedQueue, err := func() ([]string, string, error) {
		t.parent.lock.Lock()
		defer t.parent.lock.Unlock()

		q, err := t.parent.getQueueLocked(queueName)
		if err != nil {
			return nil, "", err
		}
		generated := make([]string, len(tasks))
		for i := range generated {
			generated[i] = q.genTaskName()
		}
		return generated, q.name, nil
	}()
	if err != nil {
		return err
	}

	t.lock.Lock()
	defer t.lock.Unlock()

	for i, task := range tasks {
		cb(t.addLocked(task, names[i], resolvedQueue))
	}
	return nil
}
// DeleteMulti is not available inside a transaction: it ignores its arguments
// and always returns an error.
func (t *taskqueueTxnImpl) DeleteMulti(_ []*tq.Task, _ string, _ tq.RawCB) error {
	return errors.New("taskqueue: cannot DeleteMulti from a transaction")
}
// Lease is not available inside a transaction: it ignores its arguments and
// always returns a nil slice together with an error.
func (t *taskqueueTxnImpl) Lease(maxTasks int, queueName string, leaseTime time.Duration) ([]*tq.Task, error) {
	err := errors.New("taskqueue: cannot Lease from a transaction")
	return nil, err
}
// LeaseByTag is not available inside a transaction: it ignores its arguments
// and always returns a nil slice together with an error.
func (t *taskqueueTxnImpl) LeaseByTag(maxTasks int, queueName string, leaseTime time.Duration, tag string) ([]*tq.Task, error) {
	err := errors.New("taskqueue: cannot LeaseByTag from a transaction")
	return nil, err
}
// ModifyLease is not available inside a transaction: it ignores its arguments
// and always returns an error.
func (t *taskqueueTxnImpl) ModifyLease(task *tq.Task, queueName string, leaseTime time.Duration) error {
	err := errors.New("taskqueue: cannot ModifyLease from a transaction")
	return err
}
// Constraints returns the constraints reported by getConstraints on the
// parent (non-transactional) queue data.
func (t *taskqueueTxnImpl) Constraints() tq.Constraints {
	current := t.parent.getConstraints()
	return current
}
// Purge is not available inside a transaction: it ignores its argument and
// always returns an error.
func (t *taskqueueTxnImpl) Purge(_ string) error {
	return errors.New("taskqueue: cannot Purge from a transaction")
}
// Stats is not available inside a transaction: it ignores its arguments,
// never invokes the callback and always returns an error.
func (t *taskqueueTxnImpl) Stats(_ []string, _ tq.RawStatsCB) error {
	return errors.New("taskqueue: cannot Stats from a transaction")
}
// GetTestable returns a taskQueueTestable built from this transactional
// implementation and its namespace.
func (t *taskqueueTxnImpl) GetTestable() tq.Testable {
	return &taskQueueTestable{t.ns, t}
}
////////////////////////// private functions ///////////////////////////////////
// checkTask validates the user-supplied properties of task, namely its name
// and its method.
//
// It returns an error when:
//   - task is nil;
//   - isTxn is set and the task has a name (transactional tasks must be
//     unnamed, the name is auto generated later);
//   - the task has a name that does not match validTaskName;
//   - the method is not one of "", POST, PUT, PULL, GET, HEAD or DELETE.
//
// Otherwise it returns nil.
func checkTask(task *tq.Task, isTxn bool) error {
	if task == nil {
		return fmt.Errorf("taskqueue: the task can't be nil")
	}

	if name := task.Name; name != "" {
		if isTxn {
			return fmt.Errorf("taskqueue: INVALID_TASK_NAME: cannot add named task %q in transaction", name)
		}
		if !validTaskName.MatchString(name) {
			return errInvalidTaskName
		}
	}

	switch method := task.Method; method {
	case "", "POST", "PUT", "PULL", "GET", "HEAD", "DELETE":
		// Accepted methods.
		return nil
	default:
		return fmt.Errorf("taskqueue: bad method %q", method)
	}
}
// checkManyTasks runs checkTask over every element of tasks and collects the
// per-task results into a lazy multi error, indexed like tasks. It returns
// whatever that multi error's Get yields.
func checkManyTasks(tasks []*tq.Task, isTxn bool) error {
	merr := errors.NewLazyMultiError(len(tasks))
	for idx := range tasks {
		merr.Assign(idx, checkTask(tasks[idx], isTxn))
	}
	return merr.Get()
}
// prepTask returns a filled-in clone of task, leaving the original untouched.
//
// Cloning is required because, per the Task Queues API, AddMulti returns a
// modified copy of the task rather than modifying the one it was given.
//
// The clone is adjusted as follows:
//   - Name is overridden with taskName, whatever task.Name was;
//   - ETA defaults to now+Delay when unset, and Delay is always cleared;
//   - an empty Method becomes POST; GET, HEAD and DELETE drop the payload;
//   - PULL tasks lose Path and Header, all other tasks get a default Path of
//     "/_ah/queue/<queueName>" and, when ns is non-empty and the header is
//     not already present, a currentNamespace header set to ns.
//
// The task is assumed to have been validated via checkTask already. The
// function panics if taskName is empty, if both ETA and Delay are set, or if
// Method is not one of the known methods.
func prepTask(c context.Context, task *tq.Task, taskName, queueName, ns string) *tq.Task {
	if taskName == "" {
		panic("taskqueue: taskName should be auto-generated already, if necessary")
	}

	out := task.Duplicate()
	out.Name = taskName

	switch {
	case out.ETA.IsZero():
		out.ETA = clock.Now(c).Add(out.Delay)
	case out.Delay != 0:
		panic("taskqueue: both Delay and ETA are set")
	}
	out.Delay = 0

	switch out.Method {
	case "":
		// Unspecified method defaults to POST, which may carry a payload.
		out.Method = "POST"
	case "POST", "PUT", "PULL":
		// Methods that can have payloads: nothing to adjust.
	case "GET", "HEAD", "DELETE":
		// Methods that can not have payloads.
		out.Payload = nil
	default:
		panic("taskqueue: task.Method should have been validated already")
	}

	// PULL tasks have no HTTP related stuff in them (Path and Header).
	if out.Method == "PULL" {
		out.Path = ""
		out.Header = nil
		return out
	}

	if out.Path == "" {
		out.Path = "/_ah/queue/" + queueName
	}
	if _, present := out.Header[currentNamespace]; !present && ns != "" {
		if out.Header == nil {
			out.Header = http.Header{}
		}
		out.Header[currentNamespace] = []string{ns}
	}
	// TODO(riannucci): implement DefaultNamespace
	return out
}
// taskNamespace returns the value of the currentNamespace header of task, as
// reported by Header.Get.
func taskNamespace(task *tq.Task) string {
	return task.Header.Get(currentNamespace)
}