[tq] Enable task deletion.

Enable named task deletion. This generalizes the task queue batching
function.

Add the concept of a name prefix to the task. This allows the user to
supply information without discarding sharding utility.

BUG=chromium:751925
TEST=unit

Review-Url: https://codereview.chromium.org/2986373002
diff --git a/appengine/tq/tq.go b/appengine/tq/tq.go
index 6f974dd..397c465 100644
--- a/appengine/tq/tq.go
+++ b/appengine/tq/tq.go
@@ -59,12 +59,28 @@
 	// Tasks are routed based on type of the payload message, see RegisterTask.
 	Payload proto.Message
 
+	// NamePrefix, if not empty, is a string that will be prefixed to the task's
+	// name. Characters in NamePrefix must be appropriate task queue name
+	// characters. NamePrefix can be useful because the Task Queue system allows
+	// users to search for tasks by prefix.
+	//
+	// Lexicographically close names can cause hot spots in the Task Queues
+	// backend. If NamePrefix is specified, users should try to ensure that
+	// it is friendly to sharding (e.g., begins with a hash string).
+	//
+	// Setting NamePrefix and/or DeduplicationKey will result in a named task
+	// being generated. This task can be cancelled using DeleteTask.
+	NamePrefix string
+
 	// DeduplicationKey is optional unique key of the task.
 	//
 	// If a task of a given proto type with a given key has already been enqueued
 	// recently, this task will be silently ignored.
 	//
 	// Such tasks can only be used outside of transactions.
+	//
+	// Setting NamePrefix and/or DeduplicationKey will result in a named task
+	// being generated. This task can be cancelled using DeleteTask.
 	DeduplicationKey string
 
 	// Title is optional string that identifies the task in HTTP logs.
@@ -93,6 +109,40 @@
 	RetryOptions *taskqueue.RetryOptions
 }
 
+// Name generates and returns the task's name.
+//
+// If the task is not a named task (doesn't have NamePrefix or DeduplicationKey
+// set), this will return an empty string.
+func (task *Task) Name() string {
+	if task.NamePrefix == "" && task.DeduplicationKey == "" {
+		return ""
+	}
+
+	parts := make([]string, 0, 2)
+
+	if task.NamePrefix != "" {
+		parts = append(parts, task.NamePrefix)
+	}
+
+	// There are some weird restrictions on what characters are allowed inside
+	// task names. Lexicographically close names also cause hot spot problems in
+	// the Task Queues backend. To avoid these two issues, the deduplication key
+	// is always SHA256-hashed. Also each task kind owns its own namespace of
+	// deduplication keys, so add task type to the digest as well.
+	if task.DeduplicationKey != "" {
+		h := sha256.New()
+		if task.Payload == nil {
+			panic("task must have a Payload")
+		}
+		h.Write([]byte(proto.MessageName(task.Payload)))
+		h.Write([]byte{0})
+		h.Write([]byte(task.DeduplicationKey))
+		parts = append(parts, hex.EncodeToString(h.Sum(nil)))
+	}
+
+	return strings.Join(parts, "-")
+}
+
 // Handler is called to handle one enqueued task.
 //
 // The passed context is produced by a middleware chain installed with
@@ -144,19 +194,22 @@
 	}
 }
 
-// AddTask submits given tasks to an appropriate task queue.
+// runBatchesPerQueue is a generic parallel task distributor. It solves two
+// problems:
+//	- Tasks may be assigned to different queues, and tasks assigned to the
+//	  same queue should be batched together.
+//	- Any given batch may exceed queue operation limits, and thus needs to be
+//	  broken into multiple operations on sub-batches.
 //
-// It means, at some later time in some other GAE process, callbacks registered
-// as handlers for corresponding proto types will be called.
-//
-// If the given context is transactional, inherits the transaction. Note if
-// running outside of a transaction and multiple tasks are passed, the operation
-// is not atomic: it returns an error if at least one enqueue operation failed
-// (there's no way to figure out which one exactly).
-//
-// Returns only transient errors. Unlike regular Task Queue's Add,
-// ErrTaskAlreadyAdded is not considered an error.
-func (d *Dispatcher) AddTask(c context.Context, tasks ...*Task) error {
+// fn is called for each sub-batch assigned to each queue. All resulting errors
+// are then flattened. If no fn invocation returns any errors, nil will be
+// returned. If a single error is returned, this function will return that
+// error. If an errors.MultiError is returned, it and any embedded
+// MultiError (recursively) will be flattened into a single MultiError
+// containing only the non-nil errors. This simplifies user expectations.
+func (d *Dispatcher) runBatchesPerQueue(c context.Context, tasks []*Task,
+	fn func(c context.Context, queue string, tasks []*taskqueue.Task) error) error {
+
 	if len(tasks) == 0 {
 		return nil
 	}
@@ -167,10 +220,7 @@
 		if err != nil {
 			return err
 		}
-		if err := taskqueue.Add(c, queue, t); err != nil {
-			if err == taskqueue.ErrTaskAlreadyAdded {
-				return nil
-			}
+		if err := fn(c, queue, []*taskqueue.Task{t}); err != nil {
 			return transient.Tag.Apply(err)
 		}
 		return nil
@@ -187,42 +237,82 @@
 
 	// Enqueue in parallel, per-queue, split into batches based on Task Queue
 	// RPC limits (100 tasks per batch).
+	const maxBatchSize = 100
 	errs := make(chan error)
 	ops := 0
 	for q, tasks := range perQueue {
 		for len(tasks) > 0 {
-			count := 100
+			count := maxBatchSize
 			if count > len(tasks) {
 				count = len(tasks)
 			}
 			go func(q string, batch []*taskqueue.Task) {
-				errs <- taskqueue.Add(c, q, batch...)
+				errs <- fn(c, q, batch)
 			}(q, tasks[:count])
 			tasks = tasks[count:]
 			ops++
 		}
 	}
 
-	// Gather all errors throwing away ErrTaskAlreadyAdded.
-	var all errors.MultiError
+	all := errors.NewLazyMultiError(ops)
 	for i := 0; i < ops; i++ {
 		err := <-errs
-		if merr, yep := err.(errors.MultiError); yep {
-			for _, e := range merr {
-				if e != nil && e != taskqueue.ErrTaskAlreadyAdded {
-					all = append(all, e)
-				}
-			}
-		} else if err != nil && err != taskqueue.ErrTaskAlreadyAdded {
-			all = append(all, err)
+		if err != nil {
+			all.Assign(i, err)
 		}
 	}
 
-	if len(all) == 0 {
-		return nil
+	if err := flattenErrors(all.Get()); err != nil {
+		return transient.Tag.Apply(err)
 	}
+	return nil
+}
 
-	return transient.Tag.Apply(all)
+// AddTask submits given tasks to an appropriate task queue.
+//
+// It means, at some later time in some other GAE process, callbacks registered
+// as handlers for corresponding proto types will be called.
+//
+// If the given context is transactional or namespaced, inherits the
+// transaction/namespace. Note if running outside of a transaction and multiple
+// tasks are passed, the operation is not atomic: it returns an error if at
+// least one enqueue operation failed (there's no way to figure out which one
+// exactly).
+//
+// Returns only transient errors. Unlike regular Task Queue's Add,
+// ErrTaskAlreadyAdded is not considered an error.
+func (d *Dispatcher) AddTask(c context.Context, tasks ...*Task) error {
+	return d.runBatchesPerQueue(c, tasks, func(c context.Context, queue string, tasks []*taskqueue.Task) error {
+		if err := taskqueue.Add(c, queue, tasks...); err != nil {
+			return errors.Filter(err, taskqueue.ErrTaskAlreadyAdded)
+		}
+		return nil
+	})
+}
+
+// DeleteTask deletes the specified tasks from their queues.
+//
+// If the given context is transactional or namespaced, inherits the
+// transaction/namespace. Note if running outside of a transaction and multiple
+// tasks are passed, the operation is not atomic: it returns an error if at
+// least one delete operation failed (there's no way to figure out which one
+// exactly).
+//
+// Returns only transient errors. Unlike regular Task Queue's Delete,
+// attempts to delete an unknown or tombstoned task are not considered errors.
+func (d *Dispatcher) DeleteTask(c context.Context, tasks ...*Task) error {
+	return d.runBatchesPerQueue(c, tasks, func(c context.Context, queue string, tasks []*taskqueue.Task) error {
+		return errors.FilterFunc(taskqueue.Delete(c, queue, tasks...), func(err error) bool {
+			// Currently, the best way to detect an attempt to delete an unknown task
+			// is to check the string with tolerable error message phrases.
+			for _, phrase := range []string{"UNKNOWN_TASK", "TOMBSTONED_TASK"} {
+				if strings.Contains(err.Error(), phrase) {
+					return true
+				}
+			}
+			return false
+		})
+	})
 }
 
 // InstallRoutes installs appropriate HTTP routes in the router.
@@ -276,23 +366,9 @@
 		retryOpts = task.RetryOptions
 	}
 
-	// There's some weird restrictions on what characters are allowed inside task
-	// names. Lexicographically close names also cause hot spot problems in the
-	// Task Queues backend. To avoid these two issues, we always use SHA256 hashes
-	// as task names. Also each task kind owns its own namespace of deduplication
-	// keys, so add task type to the digest as well.
-	name := ""
-	if task.DeduplicationKey != "" {
-		h := sha256.New()
-		h.Write([]byte(handler.typeName))
-		h.Write([]byte{0})
-		h.Write([]byte(task.DeduplicationKey))
-		name = hex.EncodeToString(h.Sum(nil))
-	}
-
 	return &taskqueue.Task{
 		Path:         fmt.Sprintf("%s%s/%s", d.baseURL(), handler.queue, title),
-		Name:         name,
+		Name:         task.Name(),
 		Method:       "POST",
 		Payload:      blob,
 		ETA:          task.ETA,
@@ -414,3 +490,32 @@
 
 	return task, nil
 }
+
+////////////////////////////////////////////////////////////////////////////////
+
+// flattenErrors collapses a multi-dimensional MultiError space into a flat
+// MultiError, removing "nil" errors.
+//
+// A non-MultiError err is returned wrapped in a single-element MultiError.
+//
+// As a special case, if err contains no non-nil errors, nil will be returned.
+func flattenErrors(err error) error {
+	var ret errors.MultiError
+	flattenErrorsRec(&ret, err)
+	if len(ret) == 0 {
+		return nil
+	}
+	return ret
+}
+
+func flattenErrorsRec(ret *errors.MultiError, err error) {
+	switch et := err.(type) {
+	case nil:
+	case errors.MultiError:
+		for _, e := range et {
+			flattenErrorsRec(ret, e)
+		}
+	default:
+		*ret = append(*ret, et)
+	}
+}
diff --git a/appengine/tq/tq_test.go b/appengine/tq/tq_test.go
index e3176d0..316be4d 100644
--- a/appengine/tq/tq_test.go
+++ b/appengine/tq/tq_test.go
@@ -57,6 +57,19 @@
 				next(c)
 			}))
 		}
+		runTasks := func(ctx context.Context) []int {
+			var codes []int
+			for _, tasks := range taskqueue.GetTestable(ctx).GetScheduledTasks() {
+				for _, task := range tasks {
+					// Execute the task.
+					req := httptest.NewRequest("POST", "http://example.com"+task.Path, bytes.NewReader(task.Payload))
+					rw := httptest.NewRecorder()
+					r.ServeHTTP(rw, req)
+					codes = append(codes, rw.Code)
+				}
+			}
+			return codes
+		}
 
 		Convey("Single task", func() {
 			var calls []proto.Message
@@ -74,6 +87,7 @@
 			err := d.AddTask(ctx, &Task{
 				Payload:          &duration.Duration{Seconds: 123},
 				DeduplicationKey: "abc",
+				NamePrefix:       "prefix",
 				Title:            "abc-def",
 				Delay:            30 * time.Second,
 			})
@@ -81,7 +95,7 @@
 
 			// Added the task.
 			expectedPath := "/internal/tasks/default/abc-def"
-			expectedName := "afc6f8271b8598ee04e359916e6c584a9bc3c520a11dd5244e3399346ac0d3a7"
+			expectedName := "prefix-afc6f8271b8598ee04e359916e6c584a9bc3c520a11dd5244e3399346ac0d3a7"
 			expectedBody := []byte(`{"type":"google.protobuf.Duration","body":"123.000s"}`)
 			tasks := taskqueue.GetTestable(ctx).GetScheduledTasks()
 			So(tasks, ShouldResemble, taskqueue.QueueData{
@@ -101,6 +115,7 @@
 			err = d.AddTask(ctx, &Task{
 				Payload:          &duration.Duration{Seconds: 123},
 				DeduplicationKey: "abc",
+				NamePrefix:       "prefix",
 			})
 			So(err, ShouldBeNil)
 
@@ -108,16 +123,35 @@
 			tasks = taskqueue.GetTestable(ctx).GetScheduledTasks()
 			So(len(tasks["default"]), ShouldResemble, 1)
 
-			// Execute the task.
-			req := httptest.NewRequest("POST", "http://example.com"+expectedPath, bytes.NewReader(expectedBody))
-			rw := httptest.NewRecorder()
-			r.ServeHTTP(rw, req)
-
-			// Executed.
-			So(calls, ShouldResemble, []proto.Message{
-				&duration.Duration{Seconds: 123},
+			Convey("Executed", func() {
+				// Execute the task.
+				So(runTasks(ctx), ShouldResemble, []int{200})
+				So(calls, ShouldResemble, []proto.Message{
+					&duration.Duration{Seconds: 123},
+				})
 			})
-			So(rw.Code, ShouldEqual, 200)
+
+			Convey("Deleted", func() {
+				So(d.DeleteTask(ctx, &Task{
+					Payload:          &duration.Duration{Seconds: 123},
+					DeduplicationKey: "abc",
+					NamePrefix:       "prefix",
+				}), ShouldBeNil)
+
+				// Did not execute any tasks.
+				So(runTasks(ctx), ShouldHaveLength, 0)
+				So(calls, ShouldHaveLength, 0)
+			})
+		})
+
+		Convey("Deleting unknown task returns nil", func() {
+			handler := func(c context.Context, payload proto.Message, execCount int) error { return nil }
+			d.RegisterTask(&duration.Duration{}, handler, "default", nil)
+
+			So(d.DeleteTask(ctx, &Task{
+				Payload:          &duration.Duration{Seconds: 123},
+				DeduplicationKey: "something",
+			}), ShouldBeNil)
 		})
 
 		Convey("Many tasks", func() {
@@ -166,6 +200,10 @@
 			}
 			So(len(delaysDefault), ShouldEqual, 100)
 			So(len(delaysAnotherQ), ShouldEqual, 100)
+
+			// Delete the tasks.
+			So(d.DeleteTask(ctx, t...), ShouldBeNil)
+			So(runTasks(ctx), ShouldHaveLength, 0)
 		})
 
 		Convey("Execution errors", func() {