| // Copyright 2015 The LUCI Authors. All rights reserved. |
| // Use of this source code is governed under the Apache License, Version 2.0 |
| // that can be found in the LICENSE file. |
| |
| package memory |
| |
| import ( |
| "fmt" |
| "math/rand" |
| "net/http" |
| "testing" |
| "time" |
| |
| ds "github.com/luci/gae/service/datastore" |
| "github.com/luci/gae/service/info" |
| tq "github.com/luci/gae/service/taskqueue" |
| |
| "github.com/luci/luci-go/common/clock" |
| "github.com/luci/luci-go/common/clock/testclock" |
| "github.com/luci/luci-go/common/data/rand/mathrand" |
| |
| "golang.org/x/net/context" |
| |
| . "github.com/luci/luci-go/common/testing/assertions" |
| . "github.com/smartystreets/goconvey/convey" |
| ) |
| |
| func TestTaskQueue(t *testing.T) { |
| t.Parallel() |
| |
| Convey("TaskQueue", t, func() { |
| now := time.Date(2000, time.January, 1, 1, 1, 1, 1, time.UTC) |
| c, tc := testclock.UseTime(context.Background(), now) |
| c = mathrand.Set(c, rand.New(rand.NewSource(clock.Now(c).UnixNano()))) |
| c = Use(c) |
| |
| tqt := tq.GetTestable(c) |
| So(tqt, ShouldNotBeNil) |
| |
| So(tq.Raw(c), ShouldNotBeNil) |
| |
| Convey("implements TQMultiReadWriter", func() { |
| Convey("Add", func() { |
| t := &tq.Task{Path: "/hello/world"} |
| |
| Convey("works", func() { |
| t.Delay = 4 * time.Second |
| t.Header = http.Header{} |
| t.Header.Add("Cat", "tabby") |
| t.Payload = []byte("watwatwat") |
| t.RetryOptions = &tq.RetryOptions{AgeLimit: 7 * time.Second} |
| So(tq.Add(c, "", t), ShouldBeNil) |
| |
| var scheduled *tq.Task |
| for _, t := range tqt.GetScheduledTasks()["default"] { |
| scheduled = t |
| break |
| } |
| So(scheduled, ShouldResemble, &tq.Task{ |
| ETA: now.Add(4 * time.Second), |
| Header: http.Header{"Cat": []string{"tabby"}}, |
| Method: "POST", |
| Name: "7569393727548727334", |
| Path: "/hello/world", |
| Payload: []byte("watwatwat"), |
| RetryOptions: &tq.RetryOptions{AgeLimit: 7 * time.Second}, |
| }) |
| }) |
| |
| Convey("picks up namespace", func() { |
| c, err := info.Namespace(c, "coolNamespace") |
| So(err, ShouldBeNil) |
| |
| t := &tq.Task{} |
| So(tq.Add(c, "", t), ShouldBeNil) |
| So(t.Header, ShouldResemble, http.Header{ |
| "X-Appengine-Current-Namespace": {"coolNamespace"}, |
| }) |
| |
| }) |
| |
| Convey("cannot add to bad queues", func() { |
| So(tq.Add(c, "waaat", nil).Error(), ShouldContainSubstring, "UNKNOWN_QUEUE") |
| |
| Convey("but you can add Queues when testing", func() { |
| tqt.CreateQueue("waaat") |
| So(tq.Add(c, "waaat", t), ShouldBeNil) |
| |
| Convey("you just can't add them twice", func() { |
| So(func() { tqt.CreateQueue("waaat") }, ShouldPanic) |
| }) |
| }) |
| }) |
| |
| Convey("supplies a URL if it's missing", func() { |
| t.Path = "" |
| So(tq.Add(c, "", t), ShouldBeNil) |
| So(t.Path, ShouldEqual, "/_ah/queue/default") |
| }) |
| |
| Convey("cannot add twice", func() { |
| t.Name = "bob" |
| So(tq.Add(c, "", t), ShouldBeNil) |
| |
| // can't add the same one twice! |
| So(tq.Add(c, "", t), ShouldEqual, tq.ErrTaskAlreadyAdded) |
| }) |
| |
| Convey("cannot add deleted task", func() { |
| t.Name = "bob" |
| So(tq.Add(c, "", t), ShouldBeNil) |
| |
| So(tq.Delete(c, "", t), ShouldBeNil) |
| |
| // can't add a deleted task! |
| So(tq.Add(c, "", t), ShouldEqual, tq.ErrTaskAlreadyAdded) |
| }) |
| |
| Convey("cannot set ETA+Delay", func() { |
| t.ETA = clock.Now(c).Add(time.Hour) |
| tc.Add(time.Second) |
| t.Delay = time.Hour |
| So(func() { |
| So(tq.Add(c, "", t), ShouldBeNil) |
| }, ShouldPanic) |
| }) |
| |
| Convey("must use a reasonable method", func() { |
| t.Method = "Crystal" |
| So(tq.Add(c, "", t).Error(), ShouldContainSubstring, "bad method") |
| }) |
| |
| Convey("payload gets dumped for non POST/PUT methods", func() { |
| t.Method = "HEAD" |
| t.Payload = []byte("coool") |
| So(tq.Add(c, "", t), ShouldBeNil) |
| So(t.Payload, ShouldBeNil) |
| }) |
| |
| Convey("invalid names are rejected", func() { |
| t.Name = "happy times" |
| So(tq.Add(c, "", t).Error(), ShouldContainSubstring, "INVALID_TASK_NAME") |
| }) |
| |
| Convey("AddMulti also works", func() { |
| t2 := t.Duplicate() |
| t2.Path = "/hi/city" |
| |
| expect := []*tq.Task{t, t2} |
| |
| So(tq.Add(c, "default", expect...), ShouldBeNil) |
| So(len(expect), ShouldEqual, 2) |
| So(len(tqt.GetScheduledTasks()["default"]), ShouldEqual, 2) |
| |
| for i := range expect { |
| Convey(fmt.Sprintf("task %d: %s", i, expect[i].Path), func() { |
| So(expect[i].Method, ShouldEqual, "POST") |
| So(expect[i].ETA, ShouldHappenOnOrBefore, now) |
| So(len(expect[i].Name), ShouldEqual, 19) |
| }) |
| } |
| |
| Convey("stats work too", func() { |
| delay := -time.Second * 400 |
| |
| t := &tq.Task{Path: "/somewhere"} |
| t.Delay = delay |
| So(tq.Add(c, "", t), ShouldBeNil) |
| |
| stats, err := tq.Stats(c, "") |
| So(err, ShouldBeNil) |
| So(stats[0].Tasks, ShouldEqual, 3) |
| So(stats[0].OldestETA, ShouldHappenOnOrBefore, clock.Now(c).Add(delay)) |
| |
| _, err = tq.Stats(c, "noexist") |
| So(err.Error(), ShouldContainSubstring, "UNKNOWN_QUEUE") |
| }) |
| |
| Convey("can purge all tasks", func() { |
| So(tq.Add(c, "", &tq.Task{Path: "/wut/nerbs"}), ShouldBeNil) |
| So(tq.Purge(c, ""), ShouldBeNil) |
| |
| So(len(tqt.GetScheduledTasks()["default"]), ShouldEqual, 0) |
| So(len(tqt.GetTombstonedTasks()["default"]), ShouldEqual, 0) |
| So(len(tqt.GetTransactionTasks()["default"]), ShouldEqual, 0) |
| |
| Convey("purging a queue which DNE fails", func() { |
| So(tq.Purge(c, "noexist").Error(), ShouldContainSubstring, "UNKNOWN_QUEUE") |
| }) |
| }) |
| |
| }) |
| }) |
| |
| Convey("Delete", func() { |
| t := &tq.Task{Path: "/hello/world"} |
| So(tq.Add(c, "", t), ShouldBeNil) |
| |
| Convey("works", func() { |
| err := tq.Delete(c, "", t) |
| So(err, ShouldBeNil) |
| So(len(tqt.GetScheduledTasks()["default"]), ShouldEqual, 0) |
| So(len(tqt.GetTombstonedTasks()["default"]), ShouldEqual, 1) |
| So(tqt.GetTombstonedTasks()["default"][t.Name], ShouldResemble, t) |
| }) |
| |
| Convey("cannot delete a task twice", func() { |
| So(tq.Delete(c, "", t), ShouldBeNil) |
| |
| So(tq.Delete(c, "", t).Error(), ShouldContainSubstring, "TOMBSTONED_TASK") |
| |
| Convey("but you can if you do a reset", func() { |
| tqt.ResetTasks() |
| |
| So(tq.Add(c, "", t), ShouldBeNil) |
| So(tq.Delete(c, "", t), ShouldBeNil) |
| }) |
| }) |
| |
| Convey("cannot delete from bogus queues", func() { |
| err := tq.Delete(c, "wat", t) |
| So(err.Error(), ShouldContainSubstring, "UNKNOWN_QUEUE") |
| }) |
| |
| Convey("cannot delete a missing task", func() { |
| t.Name = "tarntioarenstyw" |
| err := tq.Delete(c, "", t) |
| So(err.Error(), ShouldContainSubstring, "UNKNOWN_TASK") |
| }) |
| |
| Convey("DeleteMulti also works", func() { |
| t2 := t.Duplicate() |
| t2.Name = "" |
| t2.Path = "/hi/city" |
| So(tq.Add(c, "", t2), ShouldBeNil) |
| |
| Convey("usually works", func() { |
| So(tq.Delete(c, "", t, t2), ShouldBeNil) |
| So(len(tqt.GetScheduledTasks()["default"]), ShouldEqual, 0) |
| So(len(tqt.GetTombstonedTasks()["default"]), ShouldEqual, 2) |
| }) |
| }) |
| }) |
| }) |
| |
| Convey("works with transactions", func() { |
| t := &tq.Task{Path: "/hello/world"} |
| So(tq.Add(c, "", t), ShouldBeNil) |
| |
| t2 := &tq.Task{Path: "/hi/city"} |
| So(tq.Add(c, "", t2), ShouldBeNil) |
| |
| So(tq.Delete(c, "", t2), ShouldBeNil) |
| |
| Convey("can view regular tasks", func() { |
| So(ds.RunInTransaction(c, func(c context.Context) error { |
| tqt := tq.Raw(c).GetTestable() |
| |
| So(tqt.GetScheduledTasks()["default"][t.Name], ShouldResemble, t) |
| So(tqt.GetTombstonedTasks()["default"][t2.Name], ShouldResemble, t2) |
| So(tqt.GetTransactionTasks()["default"], ShouldBeNil) |
| return nil |
| }, nil), ShouldBeNil) |
| }) |
| |
| Convey("can add a new task", func() { |
| t3 := &tq.Task{Path: "/sandwitch/victory"} |
| |
| err := ds.RunInTransaction(c, func(c context.Context) error { |
| tqt := tq.GetTestable(c) |
| |
| So(tq.Add(c, "", t3), ShouldBeNil) |
| So(t3.Name, ShouldEqual, "") |
| |
| So(tqt.GetScheduledTasks()["default"][t.Name], ShouldResemble, t) |
| So(tqt.GetTombstonedTasks()["default"][t2.Name], ShouldResemble, t2) |
| So(tqt.GetTransactionTasks()["default"][0], ShouldResemble, t3) |
| return nil |
| }, nil) |
| So(err, ShouldBeNil) |
| |
| for _, tsk := range tqt.GetScheduledTasks()["default"] { |
| if tsk.Name == t.Name { |
| So(tsk, ShouldResemble, t) |
| } else { |
| tsk.Name = "" |
| So(tsk, ShouldResemble, t3) |
| } |
| } |
| |
| So(tqt.GetTombstonedTasks()["default"][t2.Name], ShouldResemble, t2) |
| So(tqt.GetTransactionTasks()["default"], ShouldBeNil) |
| }) |
| |
| Convey("can add a new task (but reset the state in a test)", func() { |
| t3 := &tq.Task{Path: "/sandwitch/victory"} |
| |
| var txnCtx context.Context |
| So(ds.RunInTransaction(c, func(c context.Context) error { |
| txnCtx = c |
| tqt := tq.GetTestable(c) |
| |
| So(tq.Add(c, "", t3), ShouldBeNil) |
| |
| So(tqt.GetScheduledTasks()["default"][t.Name], ShouldResemble, t) |
| So(tqt.GetTombstonedTasks()["default"][t2.Name], ShouldResemble, t2) |
| So(tqt.GetTransactionTasks()["default"][0], ShouldResemble, t3) |
| |
| tqt.ResetTasks() |
| |
| So(len(tqt.GetScheduledTasks()["default"]), ShouldEqual, 0) |
| So(len(tqt.GetTombstonedTasks()["default"]), ShouldEqual, 0) |
| So(len(tqt.GetTransactionTasks()["default"]), ShouldEqual, 0) |
| |
| return nil |
| }, nil), ShouldBeNil) |
| |
| So(len(tqt.GetScheduledTasks()["default"]), ShouldEqual, 0) |
| So(len(tqt.GetTombstonedTasks()["default"]), ShouldEqual, 0) |
| So(len(tqt.GetTransactionTasks()["default"]), ShouldEqual, 0) |
| |
| Convey("and reusing a closed context is bad times", func() { |
| So(tq.Add(txnCtx, "", nil).Error(), ShouldContainSubstring, "expired") |
| }) |
| }) |
| |
| Convey("you can AddMulti as well", func() { |
| So(ds.RunInTransaction(c, func(c context.Context) error { |
| tqt := tq.GetTestable(c) |
| |
| t.Name = "" |
| tasks := []*tq.Task{t.Duplicate(), t.Duplicate(), t.Duplicate()} |
| So(tq.Add(c, "", tasks...), ShouldBeNil) |
| So(len(tqt.GetScheduledTasks()["default"]), ShouldEqual, 1) |
| So(len(tqt.GetTransactionTasks()["default"]), ShouldEqual, 3) |
| return nil |
| }, nil), ShouldBeNil) |
| So(len(tqt.GetScheduledTasks()["default"]), ShouldEqual, 4) |
| So(len(tqt.GetTransactionTasks()["default"]), ShouldEqual, 0) |
| }) |
| |
| Convey("unless you add too many things", func() { |
| So(ds.RunInTransaction(c, func(c context.Context) error { |
| for i := 0; i < 5; i++ { |
| So(tq.Add(c, "", t.Duplicate()), ShouldBeNil) |
| } |
| So(tq.Add(c, "", t).Error(), ShouldContainSubstring, "BAD_REQUEST") |
| return nil |
| }, nil), ShouldBeNil) |
| }) |
| |
| Convey("unless you Add to a bad queue", func() { |
| So(ds.RunInTransaction(c, func(c context.Context) error { |
| So(tq.Add(c, "meat", t).Error(), ShouldContainSubstring, "UNKNOWN_QUEUE") |
| |
| Convey("unless you add it!", func() { |
| tq.Raw(c).GetTestable().CreateQueue("meat") |
| So(tq.Add(c, "meat", t), ShouldBeNil) |
| }) |
| |
| return nil |
| }, nil), ShouldBeNil) |
| }) |
| |
| Convey("No other features are available, however", func() { |
| So(ds.RunInTransaction(c, func(c context.Context) error { |
| So(tq.Delete(c, "", t).Error(), ShouldContainSubstring, "cannot DeleteMulti from a transaction") |
| So(tq.Purge(c, "").Error(), ShouldContainSubstring, "cannot Purge from a transaction") |
| _, err := tq.Stats(c, "") |
| So(err.Error(), ShouldContainSubstring, "cannot Stats from a transaction") |
| return nil |
| }, nil), ShouldBeNil) |
| }) |
| |
| Convey("can get the non-transactional taskqueue context though", func() { |
| So(ds.RunInTransaction(c, func(c context.Context) error { |
| noTxn := ds.WithoutTransaction(c) |
| So(tq.Delete(noTxn, "", t), ShouldBeNil) |
| So(tq.Purge(noTxn, ""), ShouldBeNil) |
| _, err := tq.Stats(noTxn, "") |
| So(err, ShouldBeNil) |
| return nil |
| }, nil), ShouldBeNil) |
| }) |
| |
| Convey("adding a new task only happens if we don't errout", func() { |
| So(ds.RunInTransaction(c, func(c context.Context) error { |
| t3 := &tq.Task{Path: "/sandwitch/victory"} |
| So(tq.Add(c, "", t3), ShouldBeNil) |
| return fmt.Errorf("nooooo") |
| }, nil), ShouldErrLike, "nooooo") |
| |
| So(tqt.GetScheduledTasks()["default"][t.Name], ShouldResemble, t) |
| So(tqt.GetTombstonedTasks()["default"][t2.Name], ShouldResemble, t2) |
| So(tqt.GetTransactionTasks()["default"], ShouldBeNil) |
| }) |
| |
| Convey("likewise, a panic doesn't schedule anything", func() { |
| func() { |
| defer func() { _ = recover() }() |
| So(ds.RunInTransaction(c, func(c context.Context) error { |
| So(tq.Add(c, "", &tq.Task{Path: "/sandwitch/victory"}), ShouldBeNil) |
| |
| panic(fmt.Errorf("nooooo")) |
| }, nil), ShouldBeNil) |
| }() |
| |
| So(tqt.GetScheduledTasks()["default"][t.Name], ShouldResemble, t) |
| So(tqt.GetTombstonedTasks()["default"][t2.Name], ShouldResemble, t2) |
| So(tqt.GetTransactionTasks()["default"], ShouldBeNil) |
| }) |
| |
| }) |
| |
| Convey("Pull queues", func() { |
| tqt.CreatePullQueue("pull") |
| tqt.CreateQueue("push") |
| |
| Convey("One task scenarios", func() { |
| Convey("enqueue, lease, delete", func() { |
| // Enqueue. |
| err := tq.Add(c, "pull", &tq.Task{ |
| Method: "PULL", |
| Payload: []byte("zzz"), |
| Tag: "tag", |
| }) |
| So(err, ShouldBeNil) |
| |
| // Lease. |
| tasks, err := tq.Lease(c, 1, "pull", time.Minute) |
| So(err, ShouldBeNil) |
| So(len(tasks), ShouldEqual, 1) |
| So(tasks[0].Payload, ShouldResemble, []byte("zzz")) |
| |
| // "Disappears" from the queue while leased. |
| tc.Add(30 * time.Second) |
| tasks2, err := tq.Lease(c, 1, "pull", time.Minute) |
| So(err, ShouldBeNil) |
| So(len(tasks2), ShouldEqual, 0) |
| |
| // Remove after "processing". |
| So(tq.Delete(c, "pull", tasks[0]), ShouldBeNil) |
| |
| // Still nothing there even after lease expires. |
| tc.Add(50 * time.Second) |
| tasks3, err := tq.Lease(c, 1, "pull", time.Minute) |
| So(err, ShouldBeNil) |
| So(len(tasks3), ShouldEqual, 0) |
| }) |
| |
| Convey("enqueue, lease, loose", func() { |
| // Enqueue. |
| err := tq.Add(c, "pull", &tq.Task{ |
| Method: "PULL", |
| Payload: []byte("zzz"), |
| }) |
| So(err, ShouldBeNil) |
| |
| // Lease. |
| tasks, err := tq.Lease(c, 1, "pull", time.Minute) |
| So(err, ShouldBeNil) |
| So(len(tasks), ShouldEqual, 1) |
| So(tasks[0].Payload, ShouldResemble, []byte("zzz")) |
| |
| // Time passes, lease expires. |
| tc.Add(61 * time.Second) |
| |
| // Available again, someone grabs it. |
| tasks2, err := tq.Lease(c, 1, "pull", time.Minute) |
| So(err, ShouldBeNil) |
| So(len(tasks2), ShouldEqual, 1) |
| So(tasks2[0].Payload, ShouldResemble, []byte("zzz")) |
| |
| // Previously leased task is no longer owned. |
| err = tq.ModifyLease(c, tasks[0], "pull", time.Minute) |
| So(err, ShouldErrLike, "TASK_LEASE_EXPIRED") |
| }) |
| |
| Convey("enqueue, lease, sleep", func() { |
| // Enqueue. |
| err := tq.Add(c, "pull", &tq.Task{ |
| Method: "PULL", |
| Payload: []byte("zzz"), |
| }) |
| So(err, ShouldBeNil) |
| |
| // Lease. |
| tasks, err := tq.Lease(c, 1, "pull", time.Minute) |
| So(err, ShouldBeNil) |
| So(len(tasks), ShouldEqual, 1) |
| So(tasks[0].Payload, ShouldResemble, []byte("zzz")) |
| |
| // Time passes, the lease expires. |
| tc.Add(61 * time.Second) |
| err = tq.ModifyLease(c, tasks[0], "pull", time.Minute) |
| So(err, ShouldErrLike, "TASK_LEASE_EXPIRED") |
| }) |
| |
| Convey("enqueue, lease, extend", func() { |
| // Enqueue. |
| err := tq.Add(c, "pull", &tq.Task{ |
| Method: "PULL", |
| Payload: []byte("zzz"), |
| }) |
| So(err, ShouldBeNil) |
| |
| // Lease. |
| tasks, err := tq.Lease(c, 1, "pull", time.Minute) |
| So(err, ShouldBeNil) |
| So(len(tasks), ShouldEqual, 1) |
| So(tasks[0].Payload, ShouldResemble, []byte("zzz")) |
| |
| // Time passes, the lease is updated. |
| tc.Add(59 * time.Second) |
| err = tq.ModifyLease(c, tasks[0], "pull", time.Minute) |
| So(err, ShouldBeNil) |
| |
| // Not available, still leased. |
| tc.Add(30 * time.Second) |
| tasks2, err := tq.Lease(c, 1, "pull", time.Minute) |
| So(err, ShouldBeNil) |
| So(len(tasks2), ShouldEqual, 0) |
| }) |
| |
| Convey("enqueue, lease, return", func() { |
| // Enqueue. |
| err := tq.Add(c, "pull", &tq.Task{ |
| Method: "PULL", |
| Payload: []byte("zzz"), |
| }) |
| So(err, ShouldBeNil) |
| |
| // Lease. |
| tasks, err := tq.Lease(c, 1, "pull", time.Minute) |
| So(err, ShouldBeNil) |
| So(len(tasks), ShouldEqual, 1) |
| So(tasks[0].Payload, ShouldResemble, []byte("zzz")) |
| |
| // Put back by using 0 sec lease. |
| err = tq.ModifyLease(c, tasks[0], "pull", 0) |
| So(err, ShouldBeNil) |
| |
| // Available again. |
| tasks2, err := tq.Lease(c, 1, "pull", time.Minute) |
| So(err, ShouldBeNil) |
| So(len(tasks2), ShouldEqual, 1) |
| }) |
| |
| Convey("lease by existing tag", func() { |
| // Enqueue. |
| err := tq.Add(c, "pull", &tq.Task{ |
| Method: "PULL", |
| Payload: []byte("zzz"), |
| Tag: "tag", |
| }) |
| So(err, ShouldBeNil) |
| |
| // Try different tag first, should return nothing. |
| tasks, err := tq.LeaseByTag(c, 1, "pull", time.Minute, "wrong_tag") |
| So(err, ShouldBeNil) |
| So(len(tasks), ShouldEqual, 0) |
| |
| // Leased. |
| tasks, err = tq.LeaseByTag(c, 1, "pull", time.Minute, "tag") |
| So(err, ShouldBeNil) |
| So(len(tasks), ShouldEqual, 1) |
| |
| // No ready tasks anymore. |
| tasks2, err := tq.LeaseByTag(c, 1, "pull", time.Minute, "tag") |
| So(err, ShouldBeNil) |
| So(len(tasks2), ShouldEqual, 0) |
| tasks2, err = tq.Lease(c, 1, "pull", time.Minute) |
| So(err, ShouldBeNil) |
| So(len(tasks2), ShouldEqual, 0) |
| |
| // Return back to the queue later. |
| tc.Add(30 * time.Second) |
| So(tq.ModifyLease(c, tasks[0], "pull", 0), ShouldBeNil) |
| |
| // Available again. |
| tasks, err = tq.LeaseByTag(c, 1, "pull", time.Minute, "tag") |
| So(err, ShouldBeNil) |
| So(len(tasks), ShouldEqual, 1) |
| }) |
| |
| Convey("transactions (success)", func() { |
| So(ds.RunInTransaction(c, func(c context.Context) error { |
| return tq.Add(c, "pull", &tq.Task{ |
| Method: "PULL", |
| Payload: []byte("zzz"), |
| }) |
| }, nil), ShouldBeNil) |
| |
| tasks, err := tq.Lease(c, 1, "pull", time.Minute) |
| So(err, ShouldBeNil) |
| So(len(tasks), ShouldEqual, 1) |
| }) |
| |
| Convey("transactions (rollback)", func() { |
| So(ds.RunInTransaction(c, func(c context.Context) error { |
| err := tq.Add(c, "pull", &tq.Task{ |
| Method: "PULL", |
| Payload: []byte("zzz"), |
| }) |
| So(err, ShouldBeNil) |
| return fmt.Errorf("meh") |
| }, nil), ShouldErrLike, "meh") |
| |
| tasks, err := tq.Lease(c, 1, "pull", time.Minute) |
| So(err, ShouldBeNil) |
| So(len(tasks), ShouldEqual, 0) |
| }) |
| |
| Convey("transactions (invalid ops)", func() { |
| So(ds.RunInTransaction(c, func(c context.Context) error { |
| _, err := tq.Lease(c, 1, "pull", time.Minute) |
| So(err, ShouldErrLike, "cannot Lease") |
| |
| _, err = tq.LeaseByTag(c, 1, "pull", time.Minute, "tag") |
| So(err, ShouldErrLike, "cannot LeaseByTag") |
| |
| err = tq.ModifyLease(c, &tq.Task{}, "pull", time.Minute) |
| So(err, ShouldErrLike, "cannot ModifyLease") |
| |
| return nil |
| }, nil), ShouldBeNil) |
| }) |
| |
| Convey("wrong queue mode", func() { |
| err := tq.Add(c, "pull", &tq.Task{ |
| Method: "POST", |
| Payload: []byte("zzz"), |
| }) |
| So(err, ShouldErrLike, "INVALID_QUEUE_MODE") |
| |
| err = tq.Add(c, "push", &tq.Task{ |
| Method: "PULL", |
| Payload: []byte("zzz"), |
| }) |
| So(err, ShouldErrLike, "INVALID_QUEUE_MODE") |
| |
| _, err = tq.Lease(c, 1, "push", time.Minute) |
| So(err, ShouldErrLike, "INVALID_QUEUE_MODE") |
| |
| err = tq.ModifyLease(c, &tq.Task{}, "push", time.Minute) |
| So(err, ShouldErrLike, "INVALID_QUEUE_MODE") |
| }) |
| |
| Convey("bad requests", func() { |
| _, err := tq.Lease(c, 0, "pull", time.Minute) |
| So(err, ShouldErrLike, "BAD_REQUEST") |
| |
| _, err = tq.Lease(c, 1, "pull", -time.Minute) |
| So(err, ShouldErrLike, "BAD_REQUEST") |
| |
| err = tq.ModifyLease(c, &tq.Task{}, "pull", -time.Minute) |
| So(err, ShouldErrLike, "BAD_REQUEST") |
| }) |
| |
| Convey("tombstoned task", func() { |
| task := &tq.Task{ |
| Method: "PULL", |
| Name: "deleted", |
| Payload: []byte("zzz"), |
| } |
| So(tq.Add(c, "pull", task), ShouldBeNil) |
| So(tq.Delete(c, "pull", task), ShouldBeNil) |
| |
| err := tq.ModifyLease(c, task, "pull", time.Minute) |
| So(err, ShouldErrLike, "TOMBSTONED_TASK") |
| }) |
| |
| Convey("missing task", func() { |
| err := tq.ModifyLease(c, &tq.Task{Name: "missing"}, "pull", time.Minute) |
| So(err, ShouldErrLike, "UNKNOWN_TASK") |
| }) |
| }) |
| |
| Convey("Many-tasks scenarios (sorting)", func() { |
| Convey("Lease sorts by ETA (no tags)", func() { |
| now := clock.Now(c) |
| |
| tasks := []*tq.Task{} |
| for i := 0; i < 5; i++ { |
| tasks = append(tasks, &tq.Task{ |
| Method: "PULL", |
| Name: fmt.Sprintf("task-%d", i), |
| ETA: now.Add(time.Duration(i+1) * time.Second), |
| }) |
| } |
| |
| // Add in some "random" order. |
| err := tq.Add(c, "pull", tasks[4], tasks[2], tasks[0], tasks[1], tasks[3]) |
| So(err, ShouldBeNil) |
| |
| // Nothing to pull, no available tasks yet. |
| leased, err := tq.Lease(c, 1, "pull", time.Minute) |
| So(err, ShouldBeNil) |
| So(len(leased), ShouldEqual, 0) |
| |
| tc.Add(time.Second) |
| |
| // First task appears. |
| leased, err = tq.Lease(c, 1, "pull", time.Minute) |
| So(err, ShouldBeNil) |
| So(len(leased), ShouldEqual, 1) |
| So(leased[0].Name, ShouldEqual, "task-0") |
| |
| tc.Add(4 * time.Second) |
| |
| // The rest of them appear, in sorted order. |
| leased, err = tq.Lease(c, 100, "pull", time.Minute) |
| So(err, ShouldBeNil) |
| So(len(leased), ShouldEqual, 4) |
| for i := 0; i < 4; i++ { |
| So(leased[i].Name, ShouldEqual, fmt.Sprintf("task-%d", i+1)) |
| } |
| }) |
| }) |
| |
| Convey("Lease and forget (no tags)", func() { |
| now := clock.Now(c) |
| |
| for i := 0; i < 5; i++ { |
| err := tq.Add(c, "pull", &tq.Task{ |
| Method: "PULL", |
| Name: fmt.Sprintf("task-%d", i), |
| ETA: now.Add(time.Duration(i+1) * time.Second), |
| }) |
| So(err, ShouldBeNil) |
| } |
| |
| tc.Add(time.Second) |
| |
| // Lease the first task for 3 sec. |
| leased, err := tq.Lease(c, 1, "pull", 3*time.Second) |
| So(err, ShouldBeNil) |
| So(len(leased), ShouldEqual, 1) |
| So(leased[0].Name, ShouldEqual, "task-0") |
| |
| // "Forget" about the lease. |
| tc.Add(10 * time.Second) |
| |
| // Lease all we have there. |
| leased, err = tq.Lease(c, 100, "pull", time.Minute) |
| So(err, ShouldBeNil) |
| So(len(leased), ShouldEqual, 5) |
| So(leased[0].Name, ShouldEqual, "task-1") |
| So(leased[1].Name, ShouldEqual, "task-2") |
| So(leased[2].Name, ShouldEqual, "task-0") |
| So(leased[3].Name, ShouldEqual, "task-3") |
| So(leased[4].Name, ShouldEqual, "task-4") |
| }) |
| |
| Convey("Modify lease moves the task (no tags)", func() { |
| now := clock.Now(c) |
| |
| for i := 0; i < 5; i++ { |
| err := tq.Add(c, "pull", &tq.Task{ |
| Method: "PULL", |
| Name: fmt.Sprintf("task-%d", i), |
| ETA: now.Add(time.Duration(i+1) * time.Second), |
| }) |
| So(err, ShouldBeNil) |
| } |
| |
| tc.Add(time.Second) |
| |
| // Lease the first task for 1 minute. |
| leased, err := tq.Lease(c, 1, "pull", time.Minute) |
| So(err, ShouldBeNil) |
| So(len(leased), ShouldEqual, 1) |
| So(leased[0].Name, ShouldEqual, "task-0") |
| |
| // 3 sec later release the lease. |
| tc.Add(3 * time.Second) |
| So(tq.ModifyLease(c, leased[0], "pull", 0), ShouldEqual, nil) |
| |
| tc.Add(10 * time.Second) |
| |
| // Lease all we have there. |
| leased, err = tq.Lease(c, 100, "pull", time.Minute) |
| So(err, ShouldBeNil) |
| So(len(leased), ShouldEqual, 5) |
| So(leased[0].Name, ShouldEqual, "task-1") |
| So(leased[1].Name, ShouldEqual, "task-2") |
| So(leased[2].Name, ShouldEqual, "task-0") |
| So(leased[3].Name, ShouldEqual, "task-3") |
| So(leased[4].Name, ShouldEqual, "task-4") |
| }) |
| |
| Convey("Delete task deletes from the middle", func() { |
| now := clock.Now(c) |
| |
| for i := 0; i < 5; i++ { |
| err := tq.Add(c, "pull", &tq.Task{ |
| Method: "PULL", |
| Name: fmt.Sprintf("task-%d", i), |
| ETA: now.Add(time.Duration(i+1) * time.Second), |
| }) |
| So(err, ShouldBeNil) |
| } |
| |
| tc.Add(time.Second) |
| |
| // Lease the first task for 3 sec. |
| leased, err := tq.Lease(c, 1, "pull", 3*time.Second) |
| So(err, ShouldBeNil) |
| So(len(leased), ShouldEqual, 1) |
| So(leased[0].Name, ShouldEqual, "task-0") |
| |
| // Kill it. |
| So(tq.Delete(c, "pull", leased[0]), ShouldBeNil) |
| |
| tc.Add(10 * time.Second) |
| |
| // Lease all we have there. |
| leased, err = tq.Lease(c, 100, "pull", time.Minute) |
| So(err, ShouldBeNil) |
| So(len(leased), ShouldEqual, 4) |
| So(leased[0].Name, ShouldEqual, "task-1") |
| So(leased[1].Name, ShouldEqual, "task-2") |
| So(leased[2].Name, ShouldEqual, "task-3") |
| So(leased[3].Name, ShouldEqual, "task-4") |
| }) |
| |
| Convey("Tags work", func() { |
| now := clock.Now(c) |
| |
| for i := 0; i < 5; i++ { |
| err := tq.Add(c, "pull", |
| &tq.Task{ |
| Method: "PULL", |
| Name: fmt.Sprintf("task-%d", i), |
| ETA: now.Add(time.Duration(i+1) * time.Second), |
| }, |
| &tq.Task{ |
| Method: "PULL", |
| Name: fmt.Sprintf("task-a-%d", i), |
| ETA: now.Add(time.Duration(i+1) * time.Second), |
| Tag: "a", |
| }, |
| &tq.Task{ |
| Method: "PULL", |
| Name: fmt.Sprintf("task-b-%d", i), |
| ETA: now.Add(time.Duration(i+1) * time.Second), |
| Tag: "b", |
| }) |
| So(err, ShouldBeNil) |
| } |
| |
| tc.Add(time.Second) |
| |
| // Lease leases all regardless of tags. |
| leased, err := tq.Lease(c, 100, "pull", time.Minute) |
| So(err, ShouldBeNil) |
| So(len(leased), ShouldEqual, 3) |
| So(leased[0].Name, ShouldEqual, "task-0") |
| So(leased[1].Name, ShouldEqual, "task-a-0") |
| So(leased[2].Name, ShouldEqual, "task-b-0") |
| |
| // Nothing to least per tag for now. |
| leased, err = tq.LeaseByTag(c, 100, "pull", time.Minute, "a") |
| So(err, ShouldBeNil) |
| So(len(leased), ShouldEqual, 0) |
| |
| tc.Add(10 * time.Second) |
| |
| // Grab all "a" tasks. |
| leased, err = tq.LeaseByTag(c, 100, "pull", time.Minute, "a") |
| So(err, ShouldBeNil) |
| So(len(leased), ShouldEqual, 4) |
| for i := 0; i < 4; i++ { |
| So(leased[i].Name, ShouldEqual, fmt.Sprintf("task-a-%d", i+1)) |
| } |
| |
| // Only "b" and untagged tasks left. |
| leased, err = tq.Lease(c, 100, "pull", time.Minute) |
| So(err, ShouldBeNil) |
| So(len(leased), ShouldEqual, 8) |
| for i := 0; i < 4; i++ { |
| So(leased[i*2].Name, ShouldEqual, fmt.Sprintf("task-%d", i+1)) |
| So(leased[i*2+1].Name, ShouldEqual, fmt.Sprintf("task-b-%d", i+1)) |
| } |
| }) |
| |
| Convey("LeaseByTag with empty tag, hitting tag first", func() { |
| now := clock.Now(c) |
| |
| // Nothing to pull, nothing there yet. |
| leased, err := tq.LeaseByTag(c, 1, "pull", time.Minute, "") |
| So(err, ShouldBeNil) |
| So(len(leased), ShouldEqual, 0) |
| |
| for i := 0; i < 5; i++ { |
| err := tq.Add(c, "pull", |
| &tq.Task{ |
| Method: "PULL", |
| Name: fmt.Sprintf("task-a-%d", i), |
| ETA: now.Add(time.Duration(i*2+1) * time.Second), |
| Tag: "a", |
| }, |
| &tq.Task{ |
| Method: "PULL", |
| Name: fmt.Sprintf("task-%d", i), |
| ETA: now.Add(time.Duration(i*2+2) * time.Second), |
| }) |
| So(err, ShouldBeNil) |
| } |
| |
| // Nothing to pull, no available tasks yet. |
| leased, err = tq.LeaseByTag(c, 1, "pull", time.Minute, "") |
| So(err, ShouldBeNil) |
| So(len(leased), ShouldEqual, 0) |
| |
| tc.Add(time.Minute) |
| |
| // Hits "a" first and fetches only "a" tasks. |
| leased, err = tq.LeaseByTag(c, 100, "pull", time.Minute, "") |
| So(err, ShouldBeNil) |
| So(len(leased), ShouldEqual, 5) |
| for i := 0; i < 5; i++ { |
| So(leased[i].Name, ShouldEqual, fmt.Sprintf("task-a-%d", i)) |
| } |
| }) |
| |
| Convey("LeaseByTag with empty tag, hitting untagged first", func() { |
| now := clock.Now(c) |
| |
| for i := 0; i < 5; i++ { |
| err := tq.Add(c, "pull", |
| &tq.Task{ |
| Method: "PULL", |
| Name: fmt.Sprintf("task-%d", i), |
| ETA: now.Add(time.Duration(i*2+1) * time.Second), |
| }, |
| &tq.Task{ |
| Method: "PULL", |
| Name: fmt.Sprintf("task-a-%d", i), |
| ETA: now.Add(time.Duration(i*2+2) * time.Second), |
| Tag: "a", |
| }) |
| So(err, ShouldBeNil) |
| } |
| |
| tc.Add(time.Minute) |
| |
| // Hits "" first and fetches only "" tasks. |
| leased, err := tq.LeaseByTag(c, 100, "pull", time.Minute, "") |
| So(err, ShouldBeNil) |
| So(len(leased), ShouldEqual, 5) |
| for i := 0; i < 5; i++ { |
| So(leased[i].Name, ShouldEqual, fmt.Sprintf("task-%d", i)) |
| } |
| }) |
| }) |
| }) |
| } |