blob: f72255d06d0693dc5d945e33c2002ea931db0c50 [file] [log] [blame]
// Copyright 2020 The LUCI Authors.
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
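// You may obtain a copy of the License at
//
//      http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.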
// See the License for the specific language governing permissions and
// limitations under the License.
package tq
import (
taskspb ""
// TestingContext creates a scheduler that executes tasks through the given
// dispatcher (or Default one if nil) and puts it into the context as Submitter,
// so AddTask calls eventually submit tasks into this scheduler.
// The end result is that tasks submitted using such context end up in the
// returned Scheduler (allowing them to be examined), and when the Scheduler
// delivers them, they result in calls to corresponding handlers registered in
// the Dispatcher.
func TestingContext(ctx context.Context, d *Dispatcher) (context.Context, *tqtesting.Scheduler) {
if d == nil {
d = &Default
sched := &tqtesting.Scheduler{Executor: &directExecutor{d, 0}}
return UseSubmitter(ctx, sched), sched
// directExecutor implements tqtesting.Executor via handlePush.
type directExecutor struct {
d *Dispatcher
cnt int64
func (e *directExecutor) Execute(ctx context.Context, t *tqtesting.Task, done func(retry bool)) {
retry := false
defer func() { done(retry) }()
if t.Message != nil {
panic("Executing PubSub tasks is not supported yet") // break tests loudly
var body []byte
var headers map[string]string
switch mt := t.Task.MessageType.(type) {
case *taskspb.Task_HttpRequest:
body = mt.HttpRequest.Body
headers = mt.HttpRequest.Headers
case *taskspb.Task_AppEngineHttpRequest:
body = mt.AppEngineHttpRequest.Body
headers = mt.AppEngineHttpRequest.Headers
panic(fmt.Sprintf("Bad task, no payload: %q", t.Task))
hdr := make(http.Header, len(headers))
for k, v := range headers {
hdr.Set(k, v)
info := parseHeaders(hdr)
// The direct executor doesn't emulate X-CloudTasks-* headers.
info.ExecutionCount = t.Attempts - 1
ctx = logging.SetField(ctx, "TQ#", strconv.FormatInt(atomic.AddInt64(&e.cnt, 1), 10))
err := e.d.handlePush(ctx, body, info)
if err != nil {
logging.Errorf(ctx, "server/tq task error: %s", err)
retry = !Fatal.In(err)