blob: 0a8cf35162c1e3198fa3b0ce14a1c31cc772c0f1 [file] [log] [blame]
// Copyright 2022 The LUCI Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package tq
import (
"context"
"testing"
"time"
"google.golang.org/protobuf/proto"
"google.golang.org/protobuf/types/known/durationpb"
"go.chromium.org/luci/auth/integration/authtest"
"go.chromium.org/luci/auth/integration/localauth"
"go.chromium.org/luci/lucictx"
"go.chromium.org/luci/server"
"go.chromium.org/luci/server/module"
)
func TestLoopbackHTTPExecutor(t *testing.T) {
t.Parallel()
ctx := context.Background()
incomingTasks := make(chan proto.Message)
// Register a task class.
disp := Dispatcher{}
disp.RegisterTaskClass(TaskClass{
ID: "test-dur",
Prototype: &durationpb.Duration{}, // just some proto type
Kind: NonTransactional,
Queue: "queue-1",
Handler: func(ctx context.Context, payload proto.Message) error {
incomingTasks <- payload
return nil
},
})
// The server needs an auth context to run in (it won't actually use any
// tokens though in this particular test).
authSrv := localauth.Server{
TokenGenerators: map[string]localauth.TokenGenerator{
"authtest": &authtest.FakeTokenGenerator{Email: "test@example.com"},
},
DefaultAccountID: "authtest",
}
la, err := authSrv.Start(ctx)
if err != nil {
t.Fatalf("Failed to launch localauth.Server: %s", err)
}
ctx = lucictx.SetLocalAuth(ctx, la)
t.Cleanup(func() { _ = authSrv.Stop(ctx) })
// Actually run the server with the TQ module.
srv, err := server.New(ctx, server.Options{
Prod: false,
HTTPAddr: "127.0.0.1:0",
AdminAddr: "-",
}, []module.Module{
NewModule(&ModuleOptions{
Dispatcher: &disp,
ServingPrefix: "/internal/tasks",
SweepMode: "inproc",
}),
})
if err != nil {
t.Fatalf("failed to initialize the server: %s", err)
}
// Run its loop in background, then kill it.
go func() { _ = srv.Serve() }()
defer srv.Shutdown()
const TaskCount = 5
// Emit a bunch of tasks via the submitter assigned to the server (it lives
// in the server's context).
for i := time.Duration(0); i < time.Duration(TaskCount); i++ {
err = disp.AddTask(srv.Context, &Task{Payload: durationpb.New(i)})
if err != nil {
t.Fatalf("failed to add a task: %s", err)
}
}
// Make sure they eventually are handled.
seen := map[time.Duration]struct{}{}
for i := 0; i < TaskCount; i++ {
select {
case got := <-incomingTasks:
seen[got.(*durationpb.Duration).AsDuration()] = struct{}{}
case <-time.After(time.Minute): // gross overestimate to deflake
t.Fatalf("timeout while waiting for a task handler to be called")
}
}
if len(seen) != TaskCount {
t.Fatalf("expected %d tasks, got %v", TaskCount, seen)
}
}