blob: d4d851d79893321a1d18130c264cf3df2ee47121 [file] [log] [blame]
// Copyright 2017 The LUCI Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package tq
import (
"bytes"
"context"
"fmt"
"net/http/httptest"
"sort"
"testing"
"time"
"github.com/golang/protobuf/proto"
"google.golang.org/protobuf/types/known/durationpb"
"google.golang.org/protobuf/types/known/emptypb"
"go.chromium.org/luci/gae/impl/memory"
"go.chromium.org/luci/gae/service/taskqueue"
"go.chromium.org/luci/common/clock"
"go.chromium.org/luci/common/clock/testclock"
"go.chromium.org/luci/common/errors"
"go.chromium.org/luci/common/retry/transient"
"go.chromium.org/luci/server/router"
. "github.com/smartystreets/goconvey/convey"
. "go.chromium.org/luci/common/testing/assertions"
)
var epoch = time.Unix(1500000000, 0).UTC()
func TestDispatcher(t *testing.T) {
t.Parallel()
Convey("With dispatcher", t, func() {
ctx := memory.Use(context.Background())
ctx = clock.Set(ctx, testclock.New(epoch))
taskqueue.GetTestable(ctx).CreateQueue("another-q")
d := Dispatcher{}
r := router.New()
installRoutes := func() {
d.InstallRoutes(r, router.NewMiddlewareChain(func(c *router.Context, next router.Handler) {
c.Context = ctx
next(c)
}))
}
runTasks := func(ctx context.Context) []int {
var codes []int
for _, tasks := range taskqueue.GetTestable(ctx).GetScheduledTasks() {
for _, task := range tasks {
// Execute the task.
req := httptest.NewRequest("POST", "http://example.com"+task.Path, bytes.NewReader(task.Payload))
rw := httptest.NewRecorder()
r.ServeHTTP(rw, req)
codes = append(codes, rw.Code)
}
}
return codes
}
Convey("Single task", func() {
var calls []proto.Message
handler := func(ctx context.Context, payload proto.Message) error {
hdr, err := RequestHeaders(ctx)
So(err, ShouldBeNil)
So(hdr, ShouldResemble, &taskqueue.RequestHeaders{})
calls = append(calls, payload)
return nil
}
// Abuse some well-known proto type to simplify the test. It's doesn't
// matter what proto type we use here as long as it is registered in
// protobuf type registry.
d.RegisterTask(&durationpb.Duration{}, handler, "", nil)
installRoutes()
err := d.AddTask(ctx, &Task{
Payload: &durationpb.Duration{Seconds: 123},
DeduplicationKey: "abc",
NamePrefix: "prefix",
Title: "abc-def",
Delay: 30 * time.Second,
})
So(err, ShouldBeNil)
// Added the task.
expectedPath := "/internal/tasks/default/abc-def"
expectedName := "prefix-afc6f8271b8598ee04e359916e6c584a9bc3c520a11dd5244e3399346ac0d3a7"
expectedBody := []byte(`{"type":"google.protobuf.Duration","body":"123s"}`)
tasks := taskqueue.GetTestable(ctx).GetScheduledTasks()
So(tasks, ShouldResemble, taskqueue.QueueData{
"default": map[string]*taskqueue.Task{
expectedName: {
Path: expectedPath,
Payload: expectedBody,
Name: expectedName,
Method: "POST",
ETA: epoch.Add(30 * time.Second),
},
},
"another-q": {},
})
// Read a task with same dedup key. Should be silently ignored.
err = d.AddTask(ctx, &Task{
Payload: &durationpb.Duration{Seconds: 123},
DeduplicationKey: "abc",
NamePrefix: "prefix",
})
So(err, ShouldBeNil)
// No new tasks.
tasks = taskqueue.GetTestable(ctx).GetScheduledTasks()
So(len(tasks["default"]), ShouldResemble, 1)
Convey("Executed", func() {
// Execute the task.
So(runTasks(ctx), ShouldResemble, []int{200})
So(calls, ShouldResembleProto, []proto.Message{
&durationpb.Duration{Seconds: 123},
})
})
Convey("Deleted", func() {
So(d.DeleteTask(ctx, &Task{
Payload: &durationpb.Duration{Seconds: 123},
DeduplicationKey: "abc",
NamePrefix: "prefix",
}), ShouldBeNil)
// Did not execute any tasks.
So(runTasks(ctx), ShouldHaveLength, 0)
So(calls, ShouldHaveLength, 0)
})
})
Convey("Deleting unknown task returns nil", func() {
handler := func(ctx context.Context, payload proto.Message) error { return nil }
d.RegisterTask(&durationpb.Duration{}, handler, "default", nil)
So(d.DeleteTask(ctx, &Task{
Payload: &durationpb.Duration{Seconds: 123},
DeduplicationKey: "something",
}), ShouldBeNil)
})
Convey("Many tasks", func() {
handler := func(ctx context.Context, payload proto.Message) error { return nil }
d.RegisterTask(&durationpb.Duration{}, handler, "default", nil)
d.RegisterTask(&emptypb.Empty{}, handler, "another-q", nil)
installRoutes()
t := []*Task{}
for i := 0; i < 200; i++ {
var task *Task
if i%2 == 0 {
task = &Task{
DeduplicationKey: fmt.Sprintf("%d", i/2),
Payload: &durationpb.Duration{},
Delay: time.Duration(i) * time.Second,
}
} else {
task = &Task{
DeduplicationKey: fmt.Sprintf("%d", (i-1)/2),
Payload: &emptypb.Empty{},
Delay: time.Duration(i) * time.Second,
}
}
t = append(t, task)
// Mix in some duplicates.
if i > 0 && i%100 == 0 {
t = append(t, task)
}
}
err := d.AddTask(ctx, t...)
So(err, ShouldBeNil)
// Added all the tasks.
allTasks := taskqueue.GetTestable(ctx).GetScheduledTasks()
delaysDefault := map[time.Duration]struct{}{}
for _, task := range allTasks["default"] {
delaysDefault[task.ETA.Sub(epoch)/time.Second] = struct{}{}
}
delaysAnotherQ := map[time.Duration]struct{}{}
for _, task := range allTasks["another-q"] {
delaysAnotherQ[task.ETA.Sub(epoch)/time.Second] = struct{}{}
}
So(len(delaysDefault), ShouldEqual, 100)
So(len(delaysAnotherQ), ShouldEqual, 100)
// Delete the tasks.
So(d.DeleteTask(ctx, t...), ShouldBeNil)
So(runTasks(ctx), ShouldHaveLength, 0)
})
Convey("Execution errors", func() {
var returnErr error
panicNow := false
handler := func(ctx context.Context, payload proto.Message) error {
if panicNow {
panic("must not be called")
}
return returnErr
}
d.RegisterTask(&durationpb.Duration{}, handler, "", nil)
installRoutes()
goodBody := `{"type":"google.protobuf.Duration","body":"123.000s"}`
execute := func(body string) *httptest.ResponseRecorder {
req := httptest.NewRequest(
"POST",
"http://example.com/internal/tasks/default/abc-def",
bytes.NewReader([]byte(body)))
rw := httptest.NewRecorder()
r.ServeHTTP(rw, req)
return rw
}
// Error conditions inside the task body.
returnErr = nil
rw := execute(goodBody)
So(rw.Code, ShouldEqual, 200)
So(rw.Body.String(), ShouldEqual, "OK\n")
returnErr = fmt.Errorf("fatal err")
rw = execute(goodBody)
So(rw.Code, ShouldEqual, 202) // no retry!
So(rw.Body.String(), ShouldEqual, "Fatal error: fatal err\n")
// 500 for retry on transient errors.
returnErr = errors.New("transient err", transient.Tag)
rw = execute(goodBody)
So(rw.Code, ShouldEqual, 500)
So(rw.Body.String(), ShouldEqual, "Transient error: transient err\n")
// 409 for retry on Retry-tagged errors. Retry tag trumps transient.Tag.
returnErr = errors.New("retry me", transient.Tag, Retry)
rw = execute(goodBody)
So(rw.Code, ShouldEqual, 409)
So(rw.Body.String(), ShouldEqual, "The handler asked for retry: retry me\n")
// Error conditions when routing the task.
panicNow = true
rw = execute("not a json")
So(rw.Code, ShouldEqual, 202) // no retry!
So(rw.Body.String(), ShouldStartWith, "Bad payload, can't deserialize")
rw = execute(`{"type":"google.protobuf.Duration"}`)
So(rw.Code, ShouldEqual, 202) // no retry!
So(rw.Body.String(), ShouldStartWith, "Bad payload, can't deserialize")
rw = execute(`{"type":"google.protobuf.Duration","body":"blah"}`)
So(rw.Code, ShouldEqual, 202) // no retry!
So(rw.Body.String(), ShouldStartWith, "Bad payload, can't deserialize")
rw = execute(`{"type":"unknown.proto.type","body":"{}"}`)
So(rw.Code, ShouldEqual, 202) // no retry!
So(rw.Body.String(), ShouldStartWith, "Bad payload, can't deserialize")
})
Convey("GetQueues", func() {
// Never called.
handler := func(ctx context.Context, payload proto.Message) error {
panic("handler was called in GetQueues")
return nil
}
Convey("empty queue name", func() {
d.RegisterTask(&durationpb.Duration{}, handler, "", nil)
So(d.GetQueues(), ShouldResemble, []string{"default"})
})
Convey("multiple queue names", func() {
d.RegisterTask(&durationpb.Duration{}, handler, "default", nil)
d.RegisterTask(&emptypb.Empty{}, handler, "another", nil)
queues := d.GetQueues()
sort.Strings(queues)
So(queues, ShouldResemble, []string{"another", "default"})
})
Convey("duplicated queue names", func() {
d.RegisterTask(&durationpb.Duration{}, handler, "default", nil)
d.RegisterTask(&emptypb.Empty{}, handler, "default", nil)
queues := d.GetQueues()
sort.Strings(queues)
So(queues, ShouldResemble, []string{"default"})
})
})
})
}