blob: 65acab98e59b03ede204ba2ba3f624e4ea3a802f [file] [log] [blame]
// Copyright 2020 The LUCI Authors.
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// See the License for the specific language governing permissions and
// limitations under the License.
package tq
import (
taskspb ""
// TestingContext creates a scheduler that executes tasks through the given
// dispatcher (or Default one if nil) and puts it into the context as Submitter,
// so AddTask calls eventually submit tasks into this scheduler.
// The end result is that tasks submitted using such context end up in the
// returned Scheduler (allowing them to be examined), and when the Scheduler
// delivers them, they result in calls to corresponding handlers registered in
// the Dispatcher.
func TestingContext(ctx context.Context, d *Dispatcher) (context.Context, *tqtesting.Scheduler) {
if d == nil {
d = &Default
sched := &tqtesting.Scheduler{Executor: directExecutor{d}}
return UseSubmitter(ctx, sched), sched
// directExecutor implements tqtesting.Executor via handlePush.
type directExecutor struct {
d *Dispatcher
func (e directExecutor) Execute(ctx context.Context, t *tqtesting.Task, done func(retry bool)) {
retry := false
defer done(retry)
if t.Message != nil {
panic("Executing PubSub tasks is not supported yet") // break tests loudly
var body []byte
var headers map[string]string
switch mt := t.Task.MessageType.(type) {
case *taskspb.Task_HttpRequest:
body = mt.HttpRequest.Body
headers = mt.HttpRequest.Headers
case *taskspb.Task_AppEngineHttpRequest:
body = mt.AppEngineHttpRequest.Body
headers = mt.AppEngineHttpRequest.Headers
panic(fmt.Sprintf("Bad task, no payload: %q", t.Task))
hdr := make(http.Header, len(headers))
for k, v := range headers {
hdr.Set(k, v)
info := parseHeaders(hdr)
// The direct executor doesn't emulate X-CloudTasks-* headers.
info.ExecutionCount = t.Attempts - 1
err := e.d.handlePush(ctx, body, info)
retry = err != nil && !Fatal.In(err)