blob: 316be4dcd9187404846d9eaec2cb5ebf63ad02cc [file] [log] [blame]
// Copyright 2017 The LUCI Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package tq
import (
"bytes"
"fmt"
"net/http/httptest"
"testing"
"time"
"golang.org/x/net/context"
"github.com/golang/protobuf/proto"
"github.com/golang/protobuf/ptypes/duration"
"github.com/golang/protobuf/ptypes/empty"
"github.com/luci/gae/impl/memory"
"github.com/luci/gae/service/taskqueue"
"github.com/luci/luci-go/common/clock"
"github.com/luci/luci-go/common/clock/testclock"
"github.com/luci/luci-go/common/retry/transient"
"github.com/luci/luci-go/server/router"
. "github.com/smartystreets/goconvey/convey"
)
var epoch = time.Unix(1500000000, 0).UTC()
func TestDispatcher(t *testing.T) {
t.Parallel()
Convey("With dispatcher", t, func() {
ctx := memory.Use(context.Background())
ctx = clock.Set(ctx, testclock.New(epoch))
taskqueue.GetTestable(ctx).CreateQueue("another-q")
d := Dispatcher{}
r := router.New()
installRoutes := func() {
d.InstallRoutes(r, router.NewMiddlewareChain(func(c *router.Context, next router.Handler) {
c.Context = ctx
next(c)
}))
}
runTasks := func(ctx context.Context) []int {
var codes []int
for _, tasks := range taskqueue.GetTestable(ctx).GetScheduledTasks() {
for _, task := range tasks {
// Execute the task.
req := httptest.NewRequest("POST", "http://example.com"+task.Path, bytes.NewReader(task.Payload))
rw := httptest.NewRecorder()
r.ServeHTTP(rw, req)
codes = append(codes, rw.Code)
}
}
return codes
}
Convey("Single task", func() {
var calls []proto.Message
handler := func(c context.Context, payload proto.Message, execCount int) error {
calls = append(calls, payload)
return nil
}
// Abuse some well-known proto type to simplify the test. It's doesn't
// matter what proto type we use here as long as it is registered in
// protobuf type registry.
d.RegisterTask(&duration.Duration{}, handler, "", nil)
installRoutes()
err := d.AddTask(ctx, &Task{
Payload: &duration.Duration{Seconds: 123},
DeduplicationKey: "abc",
NamePrefix: "prefix",
Title: "abc-def",
Delay: 30 * time.Second,
})
So(err, ShouldBeNil)
// Added the task.
expectedPath := "/internal/tasks/default/abc-def"
expectedName := "prefix-afc6f8271b8598ee04e359916e6c584a9bc3c520a11dd5244e3399346ac0d3a7"
expectedBody := []byte(`{"type":"google.protobuf.Duration","body":"123.000s"}`)
tasks := taskqueue.GetTestable(ctx).GetScheduledTasks()
So(tasks, ShouldResemble, taskqueue.QueueData{
"default": map[string]*taskqueue.Task{
expectedName: {
Path: expectedPath,
Payload: expectedBody,
Name: expectedName,
Method: "POST",
ETA: epoch.Add(30 * time.Second),
},
},
"another-q": {},
})
// Readd a task with same dedup key. Should be silently ignored.
err = d.AddTask(ctx, &Task{
Payload: &duration.Duration{Seconds: 123},
DeduplicationKey: "abc",
NamePrefix: "prefix",
})
So(err, ShouldBeNil)
// No new tasks.
tasks = taskqueue.GetTestable(ctx).GetScheduledTasks()
So(len(tasks["default"]), ShouldResemble, 1)
Convey("Executed", func() {
// Execute the task.
So(runTasks(ctx), ShouldResemble, []int{200})
So(calls, ShouldResemble, []proto.Message{
&duration.Duration{Seconds: 123},
})
})
Convey("Deleted", func() {
So(d.DeleteTask(ctx, &Task{
Payload: &duration.Duration{Seconds: 123},
DeduplicationKey: "abc",
NamePrefix: "prefix",
}), ShouldBeNil)
// Did not execute any tasks.
So(runTasks(ctx), ShouldHaveLength, 0)
So(calls, ShouldHaveLength, 0)
})
})
Convey("Deleting unknown task returns nil", func() {
handler := func(c context.Context, payload proto.Message, execCount int) error { return nil }
d.RegisterTask(&duration.Duration{}, handler, "default", nil)
So(d.DeleteTask(ctx, &Task{
Payload: &duration.Duration{Seconds: 123},
DeduplicationKey: "something",
}), ShouldBeNil)
})
Convey("Many tasks", func() {
handler := func(c context.Context, payload proto.Message, execCount int) error { return nil }
d.RegisterTask(&duration.Duration{}, handler, "default", nil)
d.RegisterTask(&empty.Empty{}, handler, "another-q", nil)
installRoutes()
t := []*Task{}
for i := 0; i < 200; i++ {
var task *Task
if i%2 == 0 {
task = &Task{
DeduplicationKey: fmt.Sprintf("%d", i/2),
Payload: &duration.Duration{},
Delay: time.Duration(i) * time.Second,
}
} else {
task = &Task{
DeduplicationKey: fmt.Sprintf("%d", (i-1)/2),
Payload: &empty.Empty{},
Delay: time.Duration(i) * time.Second,
}
}
t = append(t, task)
// Mix in some duplicates.
if i > 0 && i%100 == 0 {
t = append(t, task)
}
}
err := d.AddTask(ctx, t...)
So(err, ShouldBeNil)
// Added all the tasks.
allTasks := taskqueue.GetTestable(ctx).GetScheduledTasks()
delaysDefault := map[time.Duration]struct{}{}
for _, task := range allTasks["default"] {
delaysDefault[task.ETA.Sub(epoch)/time.Second] = struct{}{}
}
delaysAnotherQ := map[time.Duration]struct{}{}
for _, task := range allTasks["another-q"] {
delaysAnotherQ[task.ETA.Sub(epoch)/time.Second] = struct{}{}
}
So(len(delaysDefault), ShouldEqual, 100)
So(len(delaysAnotherQ), ShouldEqual, 100)
// Delete the tasks.
So(d.DeleteTask(ctx, t...), ShouldBeNil)
So(runTasks(ctx), ShouldHaveLength, 0)
})
Convey("Execution errors", func() {
var returnErr error
panicNow := false
handler := func(c context.Context, payload proto.Message, execCount int) error {
if panicNow {
panic("must not be called")
}
return returnErr
}
d.RegisterTask(&duration.Duration{}, handler, "", nil)
installRoutes()
goodBody := `{"type":"google.protobuf.Duration","body":"123.000s"}`
execute := func(body string) *httptest.ResponseRecorder {
req := httptest.NewRequest(
"POST",
"http://example.com/internal/tasks/default/abc-def",
bytes.NewReader([]byte(body)))
rw := httptest.NewRecorder()
r.ServeHTTP(rw, req)
return rw
}
// Error conditions inside the task body.
returnErr = nil
rw := execute(goodBody)
So(rw.Code, ShouldEqual, 200)
So(rw.Body.String(), ShouldEqual, "OK\n")
returnErr = fmt.Errorf("fatal err")
rw = execute(goodBody)
So(rw.Code, ShouldEqual, 202) // no retry!
So(rw.Body.String(), ShouldEqual, "Fatal error: fatal err\n")
returnErr = transient.Tag.Apply(fmt.Errorf("transient err"))
rw = execute(goodBody)
So(rw.Code, ShouldEqual, 500) // 500 for retry
So(rw.Body.String(), ShouldEqual, "Transient error: transient err\n")
// Error conditions when routing the task.
panicNow = true
rw = execute("not a json")
So(rw.Code, ShouldEqual, 202) // no retry!
So(rw.Body.String(), ShouldStartWith, "Bad payload, can't deserialize")
rw = execute(`{"type":"google.protobuf.Duration"}`)
So(rw.Code, ShouldEqual, 202) // no retry!
So(rw.Body.String(), ShouldStartWith, "Bad payload, can't deserialize")
rw = execute(`{"type":"google.protobuf.Duration","body":"blah"}`)
So(rw.Code, ShouldEqual, 202) // no retry!
So(rw.Body.String(), ShouldStartWith, "Bad payload, can't deserialize")
rw = execute(`{"type":"unknown.proto.type","body":"{}"}`)
So(rw.Code, ShouldEqual, 202) // no retry!
So(rw.Body.String(), ShouldStartWith, "Bad payload, can't deserialize")
})
})
}