blob: f31cfe7f4f22281490752745220a5c21ccf196ae [file] [log] [blame]
// Copyright 2017 The LUCI Authors.
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
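// You may obtain a copy of the License at
//
//      http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and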
// limitations under the License.
package engine
import (
// errUpdateConflict means Invocation is being modified by two TaskController's
// concurrently. It should not be happening often. The most likely situation
// is when the same invocation is poked via a timer and via an external PubSub
// event at the exact same time.
// This error is marked as transient to indicate to the scheduler guts that this
// is a transient condition (there's a bunch of transient.Tag.In checks which
// are sensitive to this). At the same time we don't want to keep retrying
// the second part of a two-part transaction: it's useless, the result will be
// the same conflict. So the error is tagged with abortTransaction to exit the
// inner retry loop in runTxn.
// Finally, we do want to retry the entire two-part transaction. To indicate
// this, the error is tagged with tq.Retry. It signals to the task queue layer
// (and to the pubsub) that the task should be redelivered later.
var errUpdateConflict = errors.New(
"concurrent modifications of an Invocation",
// taskController manages execution of single invocation.
// It is short-lived object spawned to handle some single event in the lifecycle
// of the invocation.
type taskController struct {
ctx context.Context // for DebugLog logging only
eng *engineImpl // for prepareTopic and invChanging
task proto.Message // extracted from saved.Task blob
manager task.Manager
request task.Request
saved Invocation // what have been given initially or saved in Save()
state task.State // state mutated by TaskManager
debugLog string // mutated by DebugLog
timers []*internal.Timer // new timers, mutated by AddTimer
triggers []*internal.Trigger // new outgoing triggers, mutated by EmitTrigger
triggerIndex int64 // incremented with each new emitted trigger
consumedTimers stringset.Set // timers popped in consumeTimer
// controllerForInvocation returns new instance of taskController configured
// to work with given invocation.
// If task definition can't be deserialized, returns both controller and error.
func controllerForInvocation(c context.Context, e *engineImpl, inv *Invocation) (*taskController, error) {
ctl := &taskController{
ctx: c,
eng: e,
saved: *inv,
var err error
if ctl.task, err = e.cfg.Catalog.UnmarshalTask(c, inv.Task); err != nil {
return ctl, fmt.Errorf("failed to unmarshal the task - %s", err)
if ctl.manager = e.cfg.Catalog.GetTaskManager(ctl.task); ctl.manager == nil {
return ctl, fmt.Errorf("TaskManager is unexpectedly missing")
if err = ctl.populateState(); err != nil {
return ctl, fmt.Errorf("failed to construct task.State - %s", err)
if ctl.request, err = getRequestFromInv(&ctl.saved); err != nil {
return ctl, fmt.Errorf("failed to construct task.Request - %s", err)
return ctl, nil
// populateState populates 'state' using data in 'saved'.
func (ctl *taskController) populateState() error {
ctl.state = task.State{
Status: ctl.saved.Status,
ViewURL: ctl.saved.ViewURL,
TaskData: append([]byte(nil), ctl.saved.TaskData...), // copy
return nil
// consumeTimer removes the given timer from the invocation, if it is there.
// Returns true if it was there, false if it wasn't, or an error on
// deserialization errors.
// We can't modify 'saved' in place, since it is supposed to always match
// what is (or has been) in the datastore. So delay the entity modification
// until Save() call.
func (ctl *taskController) consumeTimer(timerID string) (bool, error) {
if ctl.consumedTimers != nil && ctl.consumedTimers.Has(timerID) {
return false, nil // already consumed
timers, err := unmarshalTimersList(ctl.saved.PendingTimersRaw)
if err != nil {
return false, err
for _, t := range timers {
if t.Id == timerID {
if ctl.consumedTimers == nil {
ctl.consumedTimers = stringset.New(1)
return true, nil
return false, nil
// JobID is part of task.Controller interface.
func (ctl *taskController) JobID() string {
return ctl.saved.JobID
// InvocationID is part of task.Controller interface.
func (ctl *taskController) InvocationID() int64 {
return ctl.saved.ID
// Task is part of task.Controller interface.
func (ctl *taskController) Task() proto.Message {
return ctl.task
// Request is part of task.Controller interface.
func (ctl *taskController) Request() task.Request {
return ctl.request
// State is part of task.Controller interface.
func (ctl *taskController) State() *task.State {
return &ctl.state
// DebugLog is part of task.Controller interface.
func (ctl *taskController) DebugLog(format string, args ...interface{}) {
logging.Infof(ctl.ctx, format, args...)
debugLog(ctl.ctx, &ctl.debugLog, format, args...)
// AddTimer is part of Controller interface.
func (ctl *taskController) AddTimer(ctx context.Context, delay time.Duration, title string, payload []byte) {
// ID for the new timer. It is guaranteed to be unique when we land the
// transaction because all ctl.saved modifications are serialized in time (see
// MutationsCount checks in Save), and we include serial number of such
// modification into the timer id (and suffix it with the index of the timer
// emitted by this particular modification). If two modification happen
// concurrently, they may temporary get same timer ID, but only one will
// actually land.
timerID := fmt.Sprintf("%s:%d:%d:%d",
ctl.JobID(), ctl.InvocationID(),
ctl.saved.MutationsCount, len(ctl.timers))
ctl.DebugLog("Scheduling timer %q (%s) after %s", title, timerID, delay)
now := clock.Now(ctx)
ctl.timers = append(ctl.timers, &internal.Timer{
Id: timerID,
Created: google.NewTimestamp(now),
Eta: google.NewTimestamp(now.Add(delay)),
Title: title,
Payload: payload,
// PrepareTopic is part of task.Controller interface.
func (ctl *taskController) PrepareTopic(ctx context.Context, publisher string) (topic string, token string, err error) {
return ctl.eng.prepareTopic(ctx, &topicParams{
jobID: ctl.JobID(),
invID: ctl.InvocationID(),
manager: ctl.manager,
publisher: publisher,
// GetClient is part of task.Controller interface
func (ctl *taskController) GetClient(ctx context.Context, opts ...auth.RPCOption) (*http.Client, error) {
opts = append(opts, auth.WithProject(ctl.saved.GetProjectID()))
t, err := auth.GetRPCTransport(ctx, auth.AsProject, opts...)
if err != nil {
return nil, err
return &http.Client{Transport: t}, nil
// EmitTrigger is part of task.Controller interface.
func (ctl *taskController) EmitTrigger(ctx context.Context, trigger *internal.Trigger) {
ctl.DebugLog("Emitting a trigger %s", trigger.Id)
trigger.JobId = ctl.JobID()
trigger.InvocationId = ctl.InvocationID()
// See docs for internal.Trigger proto. Tuple (created, order_in_batch) used
// for casual ordering of triggers emitted by an invocation. Callers of
// EmitTrigger are free to override this by supplying their own timestamp. In
// such case they also should provide order_in_batch. Otherwise we build the
// tuple for them based on the order of EmitTrigger calls.
if trigger.Created == nil {
trigger.Created = google.NewTimestamp(clock.Now(ctx))
trigger.OrderInBatch = ctl.triggerIndex
ctl.triggers = append(ctl.triggers, trigger)
ctl.triggerIndex++ // note: this is NOT reset in Save, unlike ctl.triggers.
// Save uploads updated Invocation to the datastore, updating the state of the
// corresponding job, if necessary.
// Returns errUpdateConflict if the invocation was modified by some other task
// controller concurrently.
// May also return transient errors (on RPC timeouts, etc).
func (ctl *taskController) Save(ctx context.Context) (err error) {
// Log the intent to trigger jobs, if any.
if len(ctl.triggers) != 0 && len(ctl.saved.TriggeredJobIDs) != 0 {
ctl.DebugLog("Dispatching triggers (%d of them) to the following jobs:", len(ctl.triggers))
for _, jobID := range ctl.saved.TriggeredJobIDs {
ctl.DebugLog(" %s", jobID)
// Mutate copy in case transaction below fails. Also unpacks ctl.state back
// into the entity (reverse of 'populateState').
saving := ctl.saved
saving.Status = ctl.state.Status
saving.TaskData = append([]byte(nil), ctl.state.TaskData...)
saving.ViewURL = ctl.state.ViewURL
saving.DebugLog += ctl.debugLog
// Cleanup all consumed timers.
if ctl.consumedTimers != nil {
err := mutateTimersList(&saving.PendingTimersRaw, func(out *[]*internal.Timer) {
filtered := make([]*internal.Timer, 0, len(*out))
for _, t := range *out {
if !ctl.consumedTimers.Has(t.Id) {
filtered = append(filtered, t)
*out = filtered
if err != nil {
return err
// No changes at all? Skip transaction.
if saving.isEqual(&ctl.saved) && len(ctl.timers) == 0 && len(ctl.triggers) == 0 {
return nil
hasFinished := !ctl.saved.Status.Final() && saving.Status.Final()
// Update local copy of Invocation with what's in the datastore on success.
defer func() {
if err != nil {
ctl.saved = saving
ctl.debugLog = "" // debug log was successfully flushed
ctl.timers = nil // new timers were successfully scheduled
ctl.triggers = nil // new triggers were successfully emitted
ctl.consumedTimers = nil // pending timers were consumed
if hasFinished {
saving.Finished = clock.Now(ctx).UTC()
ctx, "Invocation finished in %s with status %s",
saving.Finished.Sub(saving.Started), saving.Status)
// Finished invocations can't schedule timers.
for _, t := range ctl.timers {
saving.debugLog(ctx, "Ignoring timer %q (%s)...", t.Title, t.Id)
ctl.timers = nil
// Update the invocation entity, notifying the engine about all changes.
err = runTxn(ctx, func(c context.Context) error {
// Grab what's currently in the store to compare MutationsCount to what we
// expect it to be.
mostRecent := Invocation{ID: saving.ID}
switch err := datastore.Get(c, &mostRecent); {
case err == datastore.ErrNoSuchEntity: // should not happen
logging.Errorf(c, "Invocation is suddenly gone")
return errors.New("invocation is suddenly gone")
case err != nil:
return transient.Tag.Apply(err)
// Make sure no one touched it while we were handling the invocation.
if saving.MutationsCount != mostRecent.MutationsCount+1 {
logging.Errorf(c, "Invocation was modified by someone else while we were handling it")
return errUpdateConflict
// Notify the engine about the invocation state change and all timers and
// triggers. The engine may decide to update the corresponding job.
if err := ctl.eng.invChanging(c, &ctl.saved, &saving, ctl.timers, ctl.triggers); err != nil {
return err
// Persist all changes to the invocation.
return transient.Tag.Apply(datastore.Put(c, &saving))
switch {
case err != nil:
return err
case hasFinished:
return nil