[tq] Enable task deletion.
Enable named task deletion. This generalizes the task queue batching
function.
Add the concept of a name prefix to the task. This allows the user to
supply information without discarding sharding utility.
BUG=chromium:751925
TEST=unit
Review-Url: https://codereview.chromium.org/2986373002
diff --git a/appengine/tq/tq.go b/appengine/tq/tq.go
index 6f974dd..397c465 100644
--- a/appengine/tq/tq.go
+++ b/appengine/tq/tq.go
@@ -59,12 +59,28 @@
// Tasks are routed based on type of the payload message, see RegisterTask.
Payload proto.Message
+ // NamePrefix, if not empty, is a string that will be prefixed to the task's
+ // name. Characters in NamePrefix must be appropriate task queue name
+ // characters. NamePrefix can be useful because the Task Queue system allows
+ // users to search for tasks by prefix.
+ //
+ // Lexicographically close names can cause hot spots in the Task Queues
+ // backend. If NamePrefix is specified, users should try to ensure that
+ // it is friendly to sharding (e.g., begins with a hash string).
+ //
+ // Setting NamePrefix and/or DeduplicationKey will result in a named task
+ // being generated. This task can be cancelled using DeleteTask.
+ NamePrefix string
+
// DeduplicationKey is optional unique key of the task.
//
// If a task of a given proto type with a given key has already been enqueued
// recently, this task will be silently ignored.
//
// Such tasks can only be used outside of transactions.
+ //
+ // Setting NamePrefix and/or DeduplicationKey will result in a named task
+ // being generated. This task can be cancelled using DeleteTask.
DeduplicationKey string
// Title is optional string that identifies the task in HTTP logs.
@@ -93,6 +109,40 @@
RetryOptions *taskqueue.RetryOptions
}
+// Name generates and returns the task's name.
+//
+// If the task is not a named task (doesn't have NamePrefix or DeduplicationKey
+// set), this will return an empty string.
+func (task *Task) Name() string {
+ if task.NamePrefix == "" && task.DeduplicationKey == "" {
+ return ""
+ }
+
+ parts := make([]string, 0, 2)
+
+ if task.NamePrefix != "" {
+ parts = append(parts, task.NamePrefix)
+ }
+
+ // There are some weird restrictions on what characters are allowed inside task
+ // names. Lexicographically close names also cause hot spot problems in the
+ // Task Queues backend. To avoid these two issues, we always use SHA256 hashes
+ // as task names. Also each task kind owns its own namespace of deduplication
+ // keys, so add task type to the digest as well.
+ if task.DeduplicationKey != "" {
+ h := sha256.New()
+ if task.Payload == nil {
+ panic("task must have a Payload")
+ }
+ h.Write([]byte(proto.MessageName(task.Payload)))
+ h.Write([]byte{0})
+ h.Write([]byte(task.DeduplicationKey))
+ parts = append(parts, hex.EncodeToString(h.Sum(nil)))
+ }
+
+ return strings.Join(parts, "-")
+}
+
// Handler is called to handle one enqueued task.
//
// The passed context is produced by a middleware chain installed with
@@ -144,19 +194,22 @@
}
}
-// AddTask submits given tasks to an appropriate task queue.
+// runBatchesPerQueue is a generic parallel task distributor. It solves the
+// problems that:
+// - "tasks" may be assigned to different queues, and tasks assigned to the
+// same queue should be batched together.
+// - Any given batch may exceed queue operation limits, and thus needs to be
+// broken into multiple operations on sub-batches.
//
-// It means, at some later time in some other GAE process, callbacks registered
-// as handlers for corresponding proto types will be called.
-//
-// If the given context is transactional, inherits the transaction. Note if
-// running outside of a transaction and multiple tasks are passed, the operation
-// is not atomic: it returns an error if at least one enqueue operation failed
-// (there's no way to figure out which one exactly).
-//
-// Returns only transient errors. Unlike regular Task Queue's Add,
-// ErrTaskAlreadyAdded is not considered an error.
-func (d *Dispatcher) AddTask(c context.Context, tasks ...*Task) error {
+// fn is called for each sub-batch assigned to each queue. All resulting errors
+// are then flattened. If no fn invocation returns any errors, nil will be
+// returned. If a single error is returned, this function will return that
+// error. If an errors.MultiError is returned, it and any embedded
+// MultiError (recursively) will be flattened into a single MultiError
+// containing only the non-nil errors. This simplifies user expectations.
+func (d *Dispatcher) runBatchesPerQueue(c context.Context, tasks []*Task,
+ fn func(c context.Context, queue string, tasks []*taskqueue.Task) error) error {
+
if len(tasks) == 0 {
return nil
}
@@ -167,10 +220,7 @@
if err != nil {
return err
}
- if err := taskqueue.Add(c, queue, t); err != nil {
- if err == taskqueue.ErrTaskAlreadyAdded {
- return nil
- }
+ if err := fn(c, queue, []*taskqueue.Task{t}); err != nil {
return transient.Tag.Apply(err)
}
return nil
@@ -187,42 +237,82 @@
// Enqueue in parallel, per-queue, split into batches based on Task Queue
// RPC limits (100 tasks per batch).
+ const maxBatchSize = 100
errs := make(chan error)
ops := 0
for q, tasks := range perQueue {
for len(tasks) > 0 {
- count := 100
+ count := maxBatchSize
if count > len(tasks) {
count = len(tasks)
}
go func(q string, batch []*taskqueue.Task) {
- errs <- taskqueue.Add(c, q, batch...)
+ errs <- fn(c, q, batch)
}(q, tasks[:count])
tasks = tasks[count:]
ops++
}
}
- // Gather all errors throwing away ErrTaskAlreadyAdded.
- var all errors.MultiError
+ all := errors.NewLazyMultiError(ops)
for i := 0; i < ops; i++ {
err := <-errs
- if merr, yep := err.(errors.MultiError); yep {
- for _, e := range merr {
- if e != nil && e != taskqueue.ErrTaskAlreadyAdded {
- all = append(all, e)
- }
- }
- } else if err != nil && err != taskqueue.ErrTaskAlreadyAdded {
- all = append(all, err)
+ if err != nil {
+ all.Assign(i, err)
}
}
- if len(all) == 0 {
- return nil
+ if err := flattenErrors(all.Get()); err != nil {
+ return transient.Tag.Apply(err)
}
+ return nil
+}
- return transient.Tag.Apply(all)
+// AddTask submits given tasks to an appropriate task queue.
+//
+// It means, at some later time in some other GAE process, callbacks registered
+// as handlers for corresponding proto types will be called.
+//
+// If the given context is transactional or namespaced, inherits the
+// transaction/namespace. Note if running outside of a transaction and multiple
+// tasks are passed, the operation is not atomic: it returns an error if at
+// least one enqueue operation failed (there's no way to figure out which one
+// exactly).
+//
+// Returns only transient errors. Unlike regular Task Queue's Add,
+// ErrTaskAlreadyAdded is not considered an error.
+func (d *Dispatcher) AddTask(c context.Context, tasks ...*Task) error {
+ return d.runBatchesPerQueue(c, tasks, func(c context.Context, queue string, tasks []*taskqueue.Task) error {
+ if err := taskqueue.Add(c, queue, tasks...); err != nil {
+ return errors.Filter(err, taskqueue.ErrTaskAlreadyAdded)
+ }
+ return nil
+ })
+}
+
+// DeleteTask deletes the specified tasks from their queues.
+//
+// If the given context is transactional or namespaced, inherits the
+// transaction/namespace. Note if running outside of a transaction and multiple
+// tasks are passed, the operation is not atomic: it returns an error if at
+// least one delete operation failed (there's no way to figure out which one
+// exactly).
+//
+// Returns only transient errors. Unlike regular Task Queue's Delete,
+// attempts to delete an unknown or tombstoned task are not considered errors.
+func (d *Dispatcher) DeleteTask(c context.Context, tasks ...*Task) error {
+ return d.runBatchesPerQueue(c, tasks, func(c context.Context, queue string, tasks []*taskqueue.Task) error {
+ return errors.FilterFunc(taskqueue.Delete(c, queue, tasks...), func(err error) bool {
+ // Currently, the best way to detect an attempt to delete an unknown task
+ // is to search the error string for known, tolerable error phrases.
+ for _, phrase := range []string{"UNKNOWN_TASK", "TOMBSTONED_TASK"} {
+ if strings.Contains(err.Error(), phrase) {
+ return true
+ }
+ }
+ return false
+ })
+ })
}
// InstallRoutes installs appropriate HTTP routes in the router.
@@ -276,23 +366,9 @@
retryOpts = task.RetryOptions
}
- // There's some weird restrictions on what characters are allowed inside task
- // names. Lexicographically close names also cause hot spot problems in the
- // Task Queues backend. To avoid these two issues, we always use SHA256 hashes
- // as task names. Also each task kind owns its own namespace of deduplication
- // keys, so add task type to the digest as well.
- name := ""
- if task.DeduplicationKey != "" {
- h := sha256.New()
- h.Write([]byte(handler.typeName))
- h.Write([]byte{0})
- h.Write([]byte(task.DeduplicationKey))
- name = hex.EncodeToString(h.Sum(nil))
- }
-
return &taskqueue.Task{
Path: fmt.Sprintf("%s%s/%s", d.baseURL(), handler.queue, title),
- Name: name,
+ Name: task.Name(),
Method: "POST",
Payload: blob,
ETA: task.ETA,
@@ -414,3 +490,32 @@
return task, nil
}
+
+////////////////////////////////////////////////////////////////////////////////
+
+// flattenErrors collapses a multi-dimensional MultiError space into a flat
+// MultiError, removing "nil" errors.
+//
+// If err is not an errors.MultiError, it is wrapped in a one-element MultiError.
+//
+// As a special case, if err is nil or contains no non-nil errors, nil will be returned.
+func flattenErrors(err error) error {
+ var ret errors.MultiError
+ flattenErrorsRec(&ret, err)
+ if len(ret) == 0 {
+ return nil
+ }
+ return ret
+}
+
+func flattenErrorsRec(ret *errors.MultiError, err error) {
+ switch et := err.(type) {
+ case nil:
+ case errors.MultiError:
+ for _, e := range et {
+ flattenErrorsRec(ret, e)
+ }
+ default:
+ *ret = append(*ret, et)
+ }
+}
diff --git a/appengine/tq/tq_test.go b/appengine/tq/tq_test.go
index e3176d0..316be4d 100644
--- a/appengine/tq/tq_test.go
+++ b/appengine/tq/tq_test.go
@@ -57,6 +57,19 @@
next(c)
}))
}
+ runTasks := func(ctx context.Context) []int {
+ var codes []int
+ for _, tasks := range taskqueue.GetTestable(ctx).GetScheduledTasks() {
+ for _, task := range tasks {
+ // Execute the task.
+ req := httptest.NewRequest("POST", "http://example.com"+task.Path, bytes.NewReader(task.Payload))
+ rw := httptest.NewRecorder()
+ r.ServeHTTP(rw, req)
+ codes = append(codes, rw.Code)
+ }
+ }
+ return codes
+ }
Convey("Single task", func() {
var calls []proto.Message
@@ -74,6 +87,7 @@
err := d.AddTask(ctx, &Task{
Payload: &duration.Duration{Seconds: 123},
DeduplicationKey: "abc",
+ NamePrefix: "prefix",
Title: "abc-def",
Delay: 30 * time.Second,
})
@@ -81,7 +95,7 @@
// Added the task.
expectedPath := "/internal/tasks/default/abc-def"
- expectedName := "afc6f8271b8598ee04e359916e6c584a9bc3c520a11dd5244e3399346ac0d3a7"
+ expectedName := "prefix-afc6f8271b8598ee04e359916e6c584a9bc3c520a11dd5244e3399346ac0d3a7"
expectedBody := []byte(`{"type":"google.protobuf.Duration","body":"123.000s"}`)
tasks := taskqueue.GetTestable(ctx).GetScheduledTasks()
So(tasks, ShouldResemble, taskqueue.QueueData{
@@ -101,6 +115,7 @@
err = d.AddTask(ctx, &Task{
Payload: &duration.Duration{Seconds: 123},
DeduplicationKey: "abc",
+ NamePrefix: "prefix",
})
So(err, ShouldBeNil)
@@ -108,16 +123,35 @@
tasks = taskqueue.GetTestable(ctx).GetScheduledTasks()
So(len(tasks["default"]), ShouldResemble, 1)
- // Execute the task.
- req := httptest.NewRequest("POST", "http://example.com"+expectedPath, bytes.NewReader(expectedBody))
- rw := httptest.NewRecorder()
- r.ServeHTTP(rw, req)
-
- // Executed.
- So(calls, ShouldResemble, []proto.Message{
- &duration.Duration{Seconds: 123},
+ Convey("Executed", func() {
+ // Execute the task.
+ So(runTasks(ctx), ShouldResemble, []int{200})
+ So(calls, ShouldResemble, []proto.Message{
+ &duration.Duration{Seconds: 123},
+ })
})
- So(rw.Code, ShouldEqual, 200)
+
+ Convey("Deleted", func() {
+ So(d.DeleteTask(ctx, &Task{
+ Payload: &duration.Duration{Seconds: 123},
+ DeduplicationKey: "abc",
+ NamePrefix: "prefix",
+ }), ShouldBeNil)
+
+ // Did not execute any tasks.
+ So(runTasks(ctx), ShouldHaveLength, 0)
+ So(calls, ShouldHaveLength, 0)
+ })
+ })
+
+ Convey("Deleting unknown task returns nil", func() {
+ handler := func(c context.Context, payload proto.Message, execCount int) error { return nil }
+ d.RegisterTask(&duration.Duration{}, handler, "default", nil)
+
+ So(d.DeleteTask(ctx, &Task{
+ Payload: &duration.Duration{Seconds: 123},
+ DeduplicationKey: "something",
+ }), ShouldBeNil)
})
Convey("Many tasks", func() {
@@ -166,6 +200,10 @@
}
So(len(delaysDefault), ShouldEqual, 100)
So(len(delaysAnotherQ), ShouldEqual, 100)
+
+ // Delete the tasks.
+ So(d.DeleteTask(ctx, t...), ShouldBeNil)
+ So(runTasks(ctx), ShouldHaveLength, 0)
})
Convey("Execution errors", func() {