blob: fa887d957632dfb9a574fe184df47642c88a5d3c [file] [log] [blame]
// Copyright 2017 The LUCI Authors.
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
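// You may obtain a copy of the License at
//
//      http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.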
// See the License for the specific language governing permissions and
// limitations under the License.
package engine
import (
// assertInTransaction panics if the context is not transactional.
func assertInTransaction(c context.Context) {
if datastore.CurrentTransaction(c) == nil {
panic("expecting to be called from inside a transaction")
// assertNotInTransaction panics if the context is transactional.
func assertNotInTransaction(c context.Context) {
if datastore.CurrentTransaction(c) != nil {
panic("expecting to be called from outside transactions")
// debugLog mutates a string by appending a line to it.
func debugLog(c context.Context, str *string, format string, args ...any) {
prefix := clock.Now(c).UTC().Format("[15:04:05.000] ")
*str += prefix + fmt.Sprintf(format+"\n", args...)
// defaultTransactionOptions is used for all transactions.
// Almost all transactions done by the scheduler service happen in background
// task queues, it is fine to retry more there.
var defaultTransactionOptions = datastore.TransactionOptions{
Attempts: 10,
// abortTransaction is an error tag that makes the error abort the transaction
// instead of being retried, even if the error is also marked as transient.
//
// See runTxn for more info. This is used primarily by errUpdateConflict.
var abortTransaction = errors.BoolTag{Key: errors.NewTagKey("this error aborts the transaction")}
// runTxn runs a datastore transaction retrying the body on transient errors or
// when encountering a commit conflict.
// It will NOT retry errors (even if transient) marked with abortTransaction
// tag. This is primarily used to tag errors that are transient at a level
// higher than the transaction: errors marked with both transient.Tag and
// abortTransaction are not retried by runTxn, but may be retried by something
// on top (like Task Queue).
func runTxn(c context.Context, cb func(context.Context) error) error {
var attempt int
var innerErr error
err := datastore.RunInTransaction(c, func(c context.Context) error {
if attempt != 1 {
if innerErr != nil {
logging.Warningf(c, "Retrying the transaction after the error: %s", innerErr)
} else {
logging.Warningf(c, "Retrying the transaction: failed to commit")
innerErr = cb(c)
if transient.Tag.In(innerErr) && !abortTransaction.In(innerErr) {
return datastore.ErrConcurrentTransaction // causes a retry
return innerErr
}, &defaultTransactionOptions)
if err != nil {
logging.WithError(err).Errorf(c, "Transaction failed")
if innerErr != nil {
return innerErr
// Here it can only be a commit error (i.e. produced by RunInTransaction
// itself, not by its callback). We treat them as transient.
return transient.Tag.Apply(err)
return nil
// runIsolatedTxn is like runTxn, except it executes the callback in a new
// isolated transaction (even if the original context is already transactional).
func runIsolatedTxn(c context.Context, cb func(context.Context) error) error {
return runTxn(datastore.WithoutTransaction(c), cb)
// equalSortedLists reports whether a and b hold the same strings in the same
// order. A nil slice and an empty slice are considered equal.
func equalSortedLists(a, b []string) bool {
	n := len(a)
	if n != len(b) {
		return false
	}
	for idx := 0; idx < n; idx++ {
		if a[idx] != b[idx] {
			return false
		}
	}
	return true
}
// equalInt64Lists reports whether a and b hold the same int64 values in the
// same order. A nil slice and an empty slice are considered equal.
func equalInt64Lists(a, b []int64) bool {
	n := len(a)
	if n != len(b) {
		return false
	}
	for idx := 0; idx < n; idx++ {
		if a[idx] != b[idx] {
			return false
		}
	}
	return true
}
// marshalTriggersList serializes list of triggers.
// Panics on errors.
func marshalTriggersList(t []*internal.Trigger) []byte {
if len(t) == 0 {
return nil
blob, err := proto.Marshal(&internal.TriggerList{Triggers: t})
if err != nil {
return blob
// unmarshalTriggersList deserializes list of triggers.
func unmarshalTriggersList(blob []byte) ([]*internal.Trigger, error) {
if len(blob) == 0 {
return nil, nil
list := internal.TriggerList{}
if err := proto.Unmarshal(blob, &list); err != nil {
return nil, err
return list.Triggers, nil
// mutateTriggersList deserializes the list, calls a callback, which modifies
// the list and serializes it back.
func mutateTriggersList(blob *[]byte, cb func(*[]*internal.Trigger)) error {
list, err := unmarshalTriggersList(*blob)
if err != nil {
return err
*blob = marshalTriggersList(list)
return nil
// sortTriggers sorts the triggers by time, most recent last.
func sortTriggers(t []*internal.Trigger) {
sort.Slice(t, func(i, j int) bool { return isTriggerOlder(t[i], t[j]) })
// isTriggerOlder returns true if t1 is older than t2.
// Compares IDs in case of a tie.
func isTriggerOlder(t1, t2 *internal.Trigger) bool {
ts1 := t1.Created.AsTime()
ts2 := t2.Created.AsTime()
switch {
case ts1.After(ts2):
return false
case ts2.After(ts1):
return true
default: // equal timestamps
if t1.OrderInBatch != t2.OrderInBatch {
return t1.OrderInBatch < t2.OrderInBatch
return t1.Id < t2.Id
// marshalTimersList serializes list of timers.
// Panics on errors.
func marshalTimersList(t []*internal.Timer) []byte {
if len(t) == 0 {
return nil
blob, err := proto.Marshal(&internal.TimerList{Timers: t})
if err != nil {
return blob
// unmarshalTimersList deserializes list of timers.
func unmarshalTimersList(blob []byte) ([]*internal.Timer, error) {
if len(blob) == 0 {
return nil, nil
list := internal.TimerList{}
if err := proto.Unmarshal(blob, &list); err != nil {
return nil, err
return list.Timers, nil
// mutateTimersList deserializes the list, calls a callback, which modifies
// the list and serializes it back.
func mutateTimersList(blob *[]byte, cb func(*[]*internal.Timer)) error {
list, err := unmarshalTimersList(*blob)
if err != nil {
return err
*blob = marshalTimersList(list)
return nil
// marshalFinishedInvs marshals list of invocations into FinishedInvocationList.
// Panics on errors.
func marshalFinishedInvs(invs []*internal.FinishedInvocation) []byte {
if len(invs) == 0 {
return nil
blob, err := proto.Marshal(&internal.FinishedInvocationList{Invocations: invs})
if err != nil {
return blob
// unmarshalFinishedInvs unmarshals FinishedInvocationList proto message.
func unmarshalFinishedInvs(raw []byte) ([]*internal.FinishedInvocation, error) {
if len(raw) == 0 {
return nil, nil
invs := internal.FinishedInvocationList{}
if err := proto.Unmarshal(raw, &invs); err != nil {
return nil, err
return invs.Invocations, nil
// filteredFinishedInvocations unmarshals FinishedInvocationList and filters
// it to keep only entries whose Finished timestamp is newer than 'oldest'.
func filteredFinishedInvs(raw []byte, oldest time.Time) ([]*internal.FinishedInvocation, error) {
invs, err := unmarshalFinishedInvs(raw)
if err != nil {
return nil, err
filtered := make([]*internal.FinishedInvocation, 0, len(invs))
for _, inv := range invs {
if inv.Finished.AsTime().After(oldest) {
filtered = append(filtered, inv)
return filtered, nil
// opsCache "remembers" recently executed operations, so that they can be
// skipped if they were already done.
//
// The set of all possible actions is expected to be small, since the cache is
// kept in memory.
type opsCache struct {
	// lock guards doneFlags.
	lock sync.RWMutex
	// doneFlags maps an operation key to true once the operation is known to
	// be done. It is nil until the first flag is stored.
	doneFlags map[string]bool
}
// Do calls callback only if it wasn't called before.
// Works on best effort basis: callback can and will be called multiple times
// (just not the every time 'Do' is called).
// Keeps "done" flag in local memory and in memcache (using 'key' as
// identifier). The callback should be idempotent, since it still may be called
// multiple times if multiple processes attempt to execute the action at once.
func (o *opsCache) Do(c context.Context, key string, cb func() error) error {
// Check the local cache.
if o.getFlag(key) {
return nil
// Check the global cache.
switch _, err := memcache.GetKey(c, key); {
case err == nil:
return nil
case err == memcache.ErrCacheMiss:
logging.WithError(err).Warningf(c, "opsCache failed to check memcache, will proceed executing op")
// Do it.
if err := cb(); err != nil {
return err
// Store in the local cache.
// Store in the global cache. Ignore errors, it's not a big deal.
item := memcache.NewItem(c, key)
item.SetExpiration(24 * time.Hour)
if err := memcache.Set(c, item); err != nil {
logging.WithError(err).Warningf(c, "opsCache failed to write item to memcache")
return nil
func (o *opsCache) getFlag(key string) bool {
defer o.lock.RUnlock()
return o.doneFlags[key]
func (o *opsCache) setFlag(key string) {
defer o.lock.Unlock()
if o.doneFlags == nil {
o.doneFlags = map[string]bool{}
o.doneFlags[key] = true