blob: 22953908f4a384c6c101dd979ed95c8a6fdef11f [file] [log] [blame]
// Copyright 2020 The LUCI Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package eventpb
import (
"context"
"fmt"
"time"
"google.golang.org/protobuf/proto"
"google.golang.org/protobuf/types/known/timestamppb"
"go.chromium.org/luci/common/clock"
"go.chromium.org/luci/gae/service/datastore"
"go.chromium.org/luci/server/tq"
"go.chromium.org/luci/cv/internal/common"
)
// Identifiers of the TQ task classes used by Run Manager.
const (
	// ManageRunTaskClass is the ID of the ManageRunTask class.
	ManageRunTaskClass = "manage-run"

	// ManageRunLongOpTaskClass is the ID of the ManageRunLongOp class.
	ManageRunLongOpTaskClass = "manage-run-long-op"
)

// taskInterval is the target period between executions of ManageRunTask.
//
// Dispatch() aligns task ETAs to multiples of this interval; see it for
// details.
const taskInterval = time.Second
// TasksBinding binds Run Manager tasks to a TQ Dispatcher.
//
// This struct exists to separate task creation and handling,
// which in turn avoids a circular dependency.
//
// Register returns a TasksBinding with every field populated.
type TasksBinding struct {
// ManageRun refers to the task class registered under ManageRunTaskClass,
// whose payload prototype is ManageRunTask.
ManageRun tq.TaskClassRef
// KickManage refers to the task class whose payload prototype is
// KickManageRunTask. Register attaches a handler to it that calls Dispatch
// with the run ID and ETA carried by the payload.
KickManage tq.TaskClassRef
// ManageRunLongOp refers to the task class registered under
// ManageRunLongOpTaskClass, whose payload prototype is ManageRunLongOpTask.
ManageRunLongOp tq.TaskClassRef
// TQDispatcher is the dispatcher the task classes were registered with.
// Dispatch adds its tasks through it.
TQDispatcher *tq.Dispatcher
}
// Register registers Run Manager task classes with the given TQ Dispatcher
// and returns the binding that refers to them.
//
// Three classes are registered, in this order: ManageRun (non-transactional),
// KickManage (transactional) and ManageRunLongOp (transactional). Only the
// KickManage class gets a handler here: it unpacks the KickManageRunTask
// payload and forwards the run ID and optional ETA to Dispatch.
func Register(tqd *tq.Dispatcher) TasksBinding {
	manageRun := tqd.RegisterTaskClass(tq.TaskClass{
		ID:           ManageRunTaskClass,
		Prototype:    &ManageRunTask{},
		Queue:        "manage-run",
		Kind:         tq.NonTransactional,
		QuietOnError: true,
		Quiet:        true,
	})
	kickManage := tqd.RegisterTaskClass(tq.TaskClass{
		ID:           fmt.Sprintf("kick-%s", ManageRunTaskClass),
		Prototype:    &KickManageRunTask{},
		Queue:        "kick-manage-run",
		Kind:         tq.Transactional,
		QuietOnError: true,
		Quiet:        true,
	})
	manageRunLongOp := tqd.RegisterTaskClass(tq.TaskClass{
		ID:           ManageRunLongOpTaskClass,
		Prototype:    &ManageRunLongOpTask{},
		Queue:        "manage-run-long-op",
		Kind:         tq.Transactional,
		QuietOnError: true,
		Quiet:        true,
	})

	binding := TasksBinding{
		ManageRun:       manageRun,
		KickManage:      kickManage,
		ManageRunLongOp: manageRunLongOp,
		TQDispatcher:    tqd,
	}

	binding.KickManage.AttachHandler(func(ctx context.Context, payload proto.Message) error {
		kick := payload.(*KickManageRunTask)
		// A missing ETA in the payload is passed on as the zero time.
		var eta time.Time
		if ts := kick.GetEta(); ts != nil {
			eta = ts.AsTime()
		}
		return common.TQifyError(ctx, binding.Dispatch(ctx, kick.GetRunId(), eta))
	})
	return binding
}
// Dispatch ensures invocation of RunManager via ManageRunTask.
//
// RunManager will be invoked at approximately no earlier than both:
//   - eta time (if given)
//   - next possible.
//
// When called inside a datastore transaction, a KickManageRunTask carrying
// runID and the eta (if non-zero) is added instead, without a deduplication
// key. Otherwise a ManageRunTask is added with its ETA aligned to
// taskInterval and a deduplication key built from runID and that ETA.
//
// To avoid actually dispatching TQ tasks in tests, use runtest.MockDispatch().
func (tr TasksBinding) Dispatch(ctx context.Context, runID string, eta time.Time) error {
	mockFn, hasMock := ctx.Value(&mockDispatcherContextKey).(func(string, time.Time))

	if inTxn := datastore.CurrentTransaction(ctx) != nil; inTxn {
		if hasMock {
			// The mock sees the eta exactly as the caller supplied it.
			mockFn(runID, eta)
			return nil
		}
		kick := &KickManageRunTask{RunId: runID}
		if !eta.IsZero() {
			kick.Eta = timestamppb.New(eta)
		}
		return tr.TQDispatcher.AddTask(ctx, &tq.Task{
			Payload:          kick,
			DeduplicationKey: "", // not allowed in a transaction
		})
	}

	// If the actual local clock is more than `clockDrift` behind, the "next"
	// computed ManageRunTask moment might already be executing. Task dedup
	// would then ensure no new task is scheduled AND the already executing
	// run might not have read the Event that was just written.
	// Thus, for safety, this should be large; however, a larger value also
	// leads to higher latency of event processing for a non-busy RunManager.
	// TODO(tandrii/yiwzhang): this can be reduced significantly once safety
	// "ping" events are originated from Config import cron tasks.
	const clockDrift = 100 * time.Millisecond

	when := clock.Now(ctx).Add(clockDrift) // Use the worst possible time.
	if !eta.IsZero() && !eta.Before(when) {
		// The requested eta is far enough in the future to be honored.
		when = eta
	}
	// Move to the taskInterval boundary strictly after `when`, so that calls
	// landing in the same interval produce the same ETA and dedup key.
	when = when.Truncate(taskInterval).Add(taskInterval)

	if hasMock {
		mockFn(runID, when)
		return nil
	}
	task := &tq.Task{
		Title:            runID,
		DeduplicationKey: fmt.Sprintf("%s\n%d", runID, when.UnixNano()),
		ETA:              when,
		Payload:          &ManageRunTask{RunId: runID},
	}
	return tr.TQDispatcher.AddTask(ctx, task)
}
// mockDispatcherContextKey is the variable whose address is used as the
// context key under which a mock dispatch function is stored.
var mockDispatcherContextKey = "eventpb.mockDispatcher"

// InstallMockDispatcher is used in tests to run code emitting RM events
// without actually dispatching RM tasks.
//
// It returns a context derived from ctx that carries f. When Dispatch finds
// such a function in its context, it calls f with the run ID and ETA instead
// of adding a TQ task.
//
// See runtest.MockDispatch().
func InstallMockDispatcher(ctx context.Context, f func(runID string, eta time.Time)) context.Context {
	key := &mockDispatcherContextKey
	return context.WithValue(ctx, key, f)
}