blob: 8bf2864639a28fe2a4c481e5dd68aa0f4fadfc76 [file] [log] [blame]
// Copyright 2015 The LUCI Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package tumble
import (
"context"
"fmt"
"go.chromium.org/luci/common/clock"
"go.chromium.org/luci/common/data/stringset"
"go.chromium.org/luci/common/errors"
"go.chromium.org/luci/common/logging"
"go.chromium.org/luci/gae/filter/txnBuf"
ds "go.chromium.org/luci/gae/service/datastore"
)
// RunMutation immediately runs the Mutation `m` in a transaction. This method
// should be used to start a tumble chain when you have transactional checks
// to do (e.g. `m` implements the first transactional link in the chain).
//
// Usually this is called from your application's handlers to begin a tumble
// state machine as a result of some API interaction.
func RunMutation(c context.Context, m Mutation) error {
cfg := getConfig(c)
shardSet, _, _, err := enterTransactionMutation(txnBuf.FilterRDS(c), cfg, m, 0)
if err != nil {
return err
}
fireTasks(c, cfg, shardSet, true)
return nil
}
// RunUnbuffered opens a lightweight unbuffered transaction on "root"
// runs "fn" inside of it. Any mutations returned by "fn" will be registered
// at the end of the transaction if "fn" doesn't return an error.
//
// This is useful as an initial starting point without incurring any of the
// overhead of spinning up a new buffered transaction.
//
// During "fn"'s execution, standard Tumble operations such as PutNamedMutation
// and CancelNamedMutation may be performed.
func RunUnbuffered(c context.Context, root *ds.Key, fn func(context.Context) ([]Mutation, error)) error {
cfg := getConfig(c)
shardSet, _, _, err := enterTransactionInternal(c, cfg, root, fn, 0)
if err != nil {
return err
}
fireTasks(c, cfg, shardSet, true)
return nil
}
func enterTransactionMutation(c context.Context, cfg *Config, m Mutation, round uint64) (
map[taskShard]struct{}, []Mutation, []*ds.Key, error) {
return enterTransactionInternal(c, cfg, m.Root(c), m.RollForward, round)
}
func enterTransactionInternal(c context.Context, cfg *Config, root *ds.Key, fn func(context.Context) ([]Mutation, error), round uint64) (
map[taskShard]struct{}, []Mutation, []*ds.Key, error) {
if root == nil {
return nil, nil, nil, fmt.Errorf("tumble: Passing nil as root is illegal")
}
shardSet := map[taskShard]struct{}(nil)
retMuts := []Mutation(nil)
retMutKeys := []*ds.Key(nil)
err := ds.RunInTransaction(c, func(c context.Context) error {
// do a Get on the root to ensure that this transaction is associated
// with that entity group.
_, _ = ds.Exists(c, root)
muts, err := fn(c)
if err != nil {
return err
}
retMuts = muts
shardSet, retMutKeys, err = putMutations(c, cfg, root, muts, round)
return err
}, nil)
if err != nil {
return nil, nil, nil, err
}
return shardSet, retMuts, retMutKeys, nil
}
// PutNamedMutations writes the provided named mutations to the datastore.
//
// Named mutations are singletons children of a given `parent`. So for any given
// `parent`, there can be only one mutation with any given name. Named
// mutations, by design, cannot collide with unnamed (anonymous) mutations.
// `parent` does NOT need to be a root Key; the mutations will be created as
// direct descendants of the provided parent entity. The names of the various
// mutations should be valid datastore key StringIDs, which generally means that
// they should be UTF-8 strings.
//
// The current implementation reserves 2 characters of the StringID (as of
// writing this means that a named mutation name may only be 498 bytes long).
//
// This can be used to leverage tumble for e.g. cancellable, delayed cleanup
// tasks (like timeouts).
//
// This may be called from within an existing datastore transaction which
// includes `parent` to make this Put atomic with the remainder of the
// transaction.
//
// If called multiple times with the same name, the newly named mutation will
// overwrite the existing mutation (assuming it hasn't run already).
func PutNamedMutations(c context.Context, parent *ds.Key, muts map[string]Mutation) error {
cfg := getConfig(c)
now := clock.Now(c).UTC()
shardSet := map[taskShard]struct{}{}
toPut := make([]*realMutation, 0, len(muts))
for name, m := range muts {
realMut, err := newRealMutation(c, cfg, "n:"+name, parent, m, now)
if err != nil {
logging.WithError(err).Errorf(c, "error creating real mutation for %v", m)
return err
}
toPut = append(toPut, realMut)
shardSet[realMut.shard(cfg)] = struct{}{}
}
err := ds.Put(c, toPut)
if err == nil {
metricCreated.Add(c, int64(len(toPut)), parent.Namespace())
}
fireTasks(c, getConfig(c), shardSet, true)
return err
}
// CancelNamedMutations does a best-effort cancellation of the named mutations.
func CancelNamedMutations(c context.Context, parent *ds.Key, names ...string) error {
toDel := make([]*ds.Key, 0, len(names))
nameSet := stringset.NewFromSlice(names...)
nameSet.Iter(func(name string) bool {
toDel = append(toDel, ds.NewKey(c, "tumble.Mutation", "n:"+name, 0, parent))
return true
})
err := ds.Delete(c, toDel)
numFailed := int64(0)
if me, ok := err.(errors.MultiError); ok {
numFailed = int64(len(me))
}
metricDeleted.Add(c, int64(len(toDel))-numFailed, parent.Namespace())
return errors.Filter(err, ds.ErrNoSuchEntity)
}