// blob: 3a76dc0bd2e10d37692f797eb9c4ce4a70887c7e [file] [log] [blame]
// Copyright 2015 The Chromium Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
package memory
import (
"errors"
"fmt"
"net/http"
"sync"
"sync/atomic"
"golang.org/x/net/context"
ds "github.com/luci/gae/service/datastore"
tq "github.com/luci/gae/service/taskqueue"
"github.com/luci/luci-go/common/clock"
"github.com/luci/luci-go/common/stringset"
)
// HTTP header keys, in canonical MIME header form, used to carry App Engine
// namespace information on a task.
var (
// currentNamespace is the canonical form of "X-AppEngine-Current-Namespace".
// prepTask sets this header from the supplied namespace when the task does
// not already carry it.
currentNamespace = http.CanonicalHeaderKey("X-AppEngine-Current-Namespace")
// defaultNamespace is the canonical form of "X-AppEngine-Default-Namespace".
// prepTask does not populate it yet (see the TODO there).
defaultNamespace = http.CanonicalHeaderKey("X-AppEngine-Default-Namespace")
)
//////////////////////////////// taskQueueData /////////////////////////////////
// taskQueueData holds the non-transactional state of the in-memory task
// queue implementation: one map of tasks per queue name.
type taskQueueData struct {
// Mutex is taken by the exported accessors (CreateQueue, ResetTasks,
// GetScheduledTasks, GetTombstonedTasks) before touching the maps below.
// NOTE(review): helpers with a Locked/WithLock suffix presumably expect the
// caller to already hold it — confirm against callers.
sync.Mutex
// named maps queue name -> task name -> task. It is what
// GetScheduledTasks snapshots.
named tq.QueueData
// archived has the same shape as named and is what GetTombstonedTasks
// snapshots.
archived tq.QueueData
}
// Compile-time check that *taskQueueData satisfies both memContextObj and
// tq.Testable.
var _ interface {
memContextObj
tq.Testable
} = (*taskQueueData)(nil)
// newTaskQueueData builds a fresh taskQueueData whose scheduled and archived
// maps each contain a single, empty "default" queue.
func newTaskQueueData() memContextObj {
	data := &taskQueueData{}
	data.named = tq.QueueData{"default": {}}
	data.archived = tq.QueueData{"default": {}}
	return data
}
// canApplyTxn reports whether the transaction state in obj may be applied to
// t. For the task queue this is unconditionally true.
func (t *taskQueueData) canApplyTxn(obj memContextObj) bool {
	return true
}
// endTxn is a no-op: the non-transactional task queue state has nothing to
// close.
func (t *taskQueueData) endTxn() {
}
// applyTxn moves every task buffered in the transaction obj into t's
// scheduled queues. Each task's Name is replaced with whatever mkName returns
// for it against the destination queue, and the task is stored under that
// name. Afterwards the transaction's buffer is dropped.
//
// obj must be a *txnTaskQueueData; any other type panics on the type
// assertion. This method does not take t's lock itself.
// NOTE(review): presumably the caller serializes access — confirm.
func (t *taskQueueData) applyTxn(c context.Context, obj memContextObj) {
	txn := obj.(*txnTaskQueueData)
	for queueName, pending := range txn.anony {
		dest := t.named[queueName]
		for i := range pending {
			task := pending[i]
			task.Name = mkName(c, task.Name, dest)
			dest[task.Name] = task
		}
	}
	txn.anony = nil
}
// mkTxn creates the transactional view of t: a txnTaskQueueData that points
// back at t and starts with an empty buffer of pending tasks. The transaction
// options are ignored.
func (t *taskQueueData) mkTxn(*ds.TransactionOptions) memContextObj {
	txn := new(txnTaskQueueData)
	txn.parent = t
	txn.anony = make(tq.AnonymousQueueData)
	return txn
}
// GetTransactionTasks always returns nil on the non-transactional state;
// pending transactional tasks are only tracked by txnTaskQueueData.
func (t *taskQueueData) GetTransactionTasks() tq.AnonymousQueueData {
	var none tq.AnonymousQueueData
	return none
}
// CreateQueue registers a new, empty queue called queueName in both the
// scheduled and the archived maps. It holds t's lock for the duration.
//
// It panics if a queue with that name already exists.
func (t *taskQueueData) CreateQueue(queueName string) {
	t.Lock()
	defer t.Unlock()

	_, exists := t.named[queueName]
	if exists {
		panic(fmt.Errorf("memory/taskqueue: cannot add the same queue twice! %q", queueName))
	}
	t.named[queueName] = make(map[string]*tq.Task)
	t.archived[queueName] = make(map[string]*tq.Task)
}
// GetScheduledTasks returns the result of dupQueue on the scheduled-task map,
// computed while holding t's lock.
func (t *taskQueueData) GetScheduledTasks() tq.QueueData {
	t.Lock()
	defer t.Unlock()

	scheduled := dupQueue(t.named)
	return scheduled
}
// GetTombstonedTasks returns the result of dupQueue on the archived-task map,
// computed while holding t's lock.
func (t *taskQueueData) GetTombstonedTasks() tq.QueueData {
	t.Lock()
	defer t.Unlock()

	tombstoned := dupQueue(t.archived)
	return tombstoned
}
// resetTasksWithLock empties every known queue, replacing both its scheduled
// and its archived task map with a fresh one. The set of queue names is kept.
//
// It does no locking of its own; ResetTasks calls it with t's lock held.
func (t *taskQueueData) resetTasksWithLock() {
	for name := range t.named {
		t.named[name] = make(map[string]*tq.Task)
		t.archived[name] = make(map[string]*tq.Task)
	}
}
// ResetTasks clears all scheduled and archived tasks from every queue while
// holding t's lock. The queues themselves remain registered.
func (t *taskQueueData) ResetTasks() {
	t.Mutex.Lock()
	defer t.Mutex.Unlock()

	t.resetTasksWithLock()
}
// getQueueNameLocked resolves queueName to the name of a registered queue.
// An empty queueName stands for "default". If the resolved name is not a
// registered queue, it returns "" and an UNKNOWN_QUEUE error.
//
// It reads t.named without locking.
// NOTE(review): per the name, the caller presumably holds t's lock — confirm.
func (t *taskQueueData) getQueueNameLocked(queueName string) (string, error) {
	resolved := queueName
	if resolved == "" {
		resolved = "default"
	}
	if _, known := t.named[resolved]; known {
		return resolved, nil
	}
	return "", errors.New("UNKNOWN_QUEUE")
}
// purgeLocked removes all scheduled and archived tasks from the queue named
// queueName (resolved through getQueueNameLocked, so "" means "default").
// It returns the resolution error, leaving state untouched, when the queue is
// unknown.
//
// It does no locking of its own.
// NOTE(review): per the name, the caller presumably holds t's lock — confirm.
func (t *taskQueueData) purgeLocked(queueName string) error {
	resolved, err := t.getQueueNameLocked(queueName)
	if err == nil {
		t.named[resolved] = make(map[string]*tq.Task)
		t.archived[resolved] = make(map[string]*tq.Task)
	}
	return err
}
// tqOkMethods is the set of HTTP methods prepTask accepts for a task; any
// other method is rejected with a "bad method" error.
var tqOkMethods = stringset.NewFromSlice("GET", "POST", "HEAD", "PUT", "DELETE")
// prepTask returns a normalized copy of task, ready to be scheduled on
// queueName. The input task is not modified; all work happens on the result
// of task.Duplicate().
//
// Normalization, in order:
//   - an empty Path becomes "/_ah/queue/" + queueName;
//   - a zero ETA is computed as the context clock's now + Delay, and Delay is
//     then cleared;
//   - an empty Method becomes "POST"; methods outside tqOkMethods are
//     rejected with an error;
//   - Payload is dropped for every method other than POST and PUT;
//   - if ns is non-empty and the currentNamespace header is absent, that
//     header is set to ns (allocating Header if needed);
//   - an empty Name is filled in by mkName; a non-empty Name must match
//     validTaskName or an INVALID_TASK_NAME error is returned.
//
// It panics if both ETA and Delay are set on the task.
//
// t.named is read without locking.
// NOTE(review): presumably the caller holds t's lock — confirm.
func (t *taskQueueData) prepTask(c context.Context, ns string, task *tq.Task, queueName string) (*tq.Task, error) {
	prepared := task.Duplicate()

	if prepared.Path == "" {
		prepared.Path = "/_ah/queue/" + queueName
	}

	switch {
	case prepared.ETA.IsZero():
		prepared.ETA = clock.Now(c).Add(prepared.Delay)
	case prepared.Delay != 0:
		panic("taskqueue: both Delay and ETA are set")
	}
	prepared.Delay = 0

	if prepared.Method == "" {
		prepared.Method = "POST"
	}
	if !tqOkMethods.Has(prepared.Method) {
		return nil, fmt.Errorf("taskqueue: bad method %q", prepared.Method)
	}
	switch prepared.Method {
	case "POST", "PUT":
		// These methods keep their payload.
	default:
		prepared.Payload = nil
	}

	if _, present := prepared.Header[currentNamespace]; !present && ns != "" {
		if prepared.Header == nil {
			prepared.Header = http.Header{}
		}
		prepared.Header[currentNamespace] = []string{ns}
	}
	// TODO(riannucci): implement DefaultNamespace

	switch {
	case prepared.Name == "":
		prepared.Name = mkName(c, "", t.named[queueName])
	case !validTaskName.MatchString(prepared.Name):
		return nil, errors.New("INVALID_TASK_NAME")
	}
	return prepared, nil
}
/////////////////////////////// txnTaskQueueData ///////////////////////////////
// txnTaskQueueData is the transactional view of a taskQueueData. It buffers
// tasks in anony until the parent applies them (see taskQueueData.applyTxn)
// and forwards queue inspection and creation to the parent.
type txnTaskQueueData struct {
// lock is this transaction's own mutex. The Lock/Unlock methods acquire it
// together with the parent's mutex.
lock sync.Mutex
// boolean 0 or 1, use atomic.*Int32 to access.
// Set to 1 by endTxn.
closed int32
// anony maps queue name -> tasks buffered in this transaction.
anony tq.AnonymousQueueData
// parent is the non-transactional state this transaction was created from.
parent *taskQueueData
}
// Compile-time check that *txnTaskQueueData satisfies both memContextObj and
// tq.Testable.
var _ interface {
memContextObj
tq.Testable
} = (*txnTaskQueueData)(nil)
// canApplyTxn always reports false: nothing can be applied onto a
// transactional task queue state (see applyTxn).
func (t *txnTaskQueueData) canApplyTxn(obj memContextObj) bool {
	return false
}
// applyTxn must never be called on a transactional state; it reports the
// call through impossible with a "cannot apply nested transaction" error.
func (t *txnTaskQueueData) applyTxn(context.Context, memContextObj) {
	err := fmt.Errorf("cannot apply nested transaction")
	impossible(err)
}
// mkTxn must never be called on a transactional state; it reports the call
// through impossible with a "cannot start nested transaction" error and, if
// impossible returns, yields nil.
func (t *txnTaskQueueData) mkTxn(*ds.TransactionOptions) memContextObj {
	err := fmt.Errorf("cannot start nested transaction")
	impossible(err)
	return nil
}
// endTxn marks the transaction as closed by flipping t.closed from 0 to 1.
//
// It panics with "cannot end transaction twice" if the transaction was
// already closed. The check and the update are a single compare-and-swap, so
// when several goroutines race to end the same transaction exactly one
// succeeds and every other caller panics. A separate load followed by a
// store would let two callers both observe 0 and both "succeed".
func (t *txnTaskQueueData) endTxn() {
	if !atomic.CompareAndSwapInt32(&t.closed, 0, 1) {
		panic("cannot end transaction twice")
	}
}
// ResetTasks discards the tasks buffered in this transaction and clears all
// tasks in the parent, with both the transaction's and the parent's locks
// held. Queue names stay present in the transaction buffer, mapped to nil.
func (t *txnTaskQueueData) ResetTasks() {
	t.Lock()
	defer t.Unlock()

	for name := range t.anony {
		t.anony[name] = nil
	}
	// t.Lock above already holds the parent's mutex, so use the variant that
	// does not lock again.
	t.parent.resetTasksWithLock()
}
// Lock acquires the transaction's own mutex first and the parent's mutex
// second. Unlock releases them in the reverse order.
func (t *txnTaskQueueData) Lock() {
	t.lock.Lock()
	t.parent.Mutex.Lock()
}
// Unlock releases the parent's mutex first and the transaction's own mutex
// second, the reverse of the order in which Lock acquires them.
func (t *txnTaskQueueData) Unlock() {
	t.parent.Mutex.Unlock()
	t.lock.Unlock()
}
// GetTransactionTasks returns a copy of the tasks buffered in this
// transaction, keyed by queue name. Every task in the result is a duplicate
// of the buffered one with its Name cleared, so callers cannot mutate the
// transaction's state through it. Both locks are held while copying.
func (t *txnTaskQueueData) GetTransactionTasks() tq.AnonymousQueueData {
	t.Lock()
	defer t.Unlock()

	snapshot := make(tq.AnonymousQueueData, len(t.anony))
	for queueName, tasks := range t.anony {
		copies := make([]*tq.Task, 0, len(tasks))
		for _, task := range tasks {
			dup := task.Duplicate()
			dup.Name = ""
			copies = append(copies, dup)
		}
		snapshot[queueName] = copies
	}
	return snapshot
}
// GetTombstonedTasks forwards to the parent's GetTombstonedTasks; the
// transaction keeps no archived tasks of its own.
func (t *txnTaskQueueData) GetTombstonedTasks() tq.QueueData {
	parent := t.parent
	return parent.GetTombstonedTasks()
}
// GetScheduledTasks forwards to the parent's GetScheduledTasks. Tasks still
// buffered in this transaction are not included; use GetTransactionTasks for
// those.
func (t *txnTaskQueueData) GetScheduledTasks() tq.QueueData {
	parent := t.parent
	return parent.GetScheduledTasks()
}
// CreateQueue forwards to the parent's CreateQueue, so the new queue is
// registered on the non-transactional state immediately.
func (t *txnTaskQueueData) CreateQueue(queueName string) {
	parent := t.parent
	parent.CreateQueue(queueName)
}