blob: ca6ae497c79da6317e9d06e1624c75c1eda11686 [file] [log] [blame]
// Copyright 2015 The LUCI Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package mutate
import (
"context"
"go.chromium.org/luci/common/errors"
"go.chromium.org/luci/common/logging"
dm "go.chromium.org/luci/dm/api/service/v1"
"go.chromium.org/luci/dm/appengine/model"
ds "go.chromium.org/luci/gae/service/datastore"
"go.chromium.org/luci/grpc/grpcutil"
"go.chromium.org/luci/tumble"
"go.chromium.org/luci/tumble/bitfield"
"google.golang.org/grpc/codes"
)
// AddDeps transactionally stops the current execution and adds one or more
// dependencies.
//
// It is a tumble mutation: Root selects the attempt derived from Auth, and
// RollForward performs the work.
type AddDeps struct {
// Auth is the execution authentication of the caller. RollForward hands it
// to model.InvalidateExecution, and Auth.Id is used to derive the attempt
// this mutation operates on.
Auth *dm.Execution_Auth
// Quests are quests for which RollForward emits a MergeQuest mutation each,
// keyed by the quest's ID.
Quests []*model.Quest
// Attempts lists the attempts we think are missing from the global graph.
// RollForward emits an EnsureAttempt mutation for each new dependency whose
// dependee appears in this list.
Attempts *dm.AttemptList
// Deps lists the forward dependencies we think are missing from the auth'd
// attempt.
Deps *dm.AttemptList
}
// Root implements tumble.Mutation.
//
// The returned key is the one model.AttemptKeyFromID produces for the attempt
// identified by the execution auth's ID.
func (a *AddDeps) Root(c context.Context) *ds.Key {
	attemptID := a.Auth.Id.AttemptID()
	return model.AttemptKeyFromID(c, attemptID)
}
// RollForward implements tumble.Mutation.
//
// This mutation is called directly.
//
// Steps, in order:
//  1. Invalidate the caller's execution and reset its timeout.
//  2. Drop the requested forward deps that filterExisting reports as already
//     present; if nothing remains, return with no mutations.
//  3. Give the attempt a fresh DepMap sized to the new deps, stamp each dep
//     with its bit index and the attempt's current execution, and store the
//     deps, the attempt and the execution in a single Put.
//  4. Emit follow-up mutations: an AddBackDep per new dep (preceded by an
//     EnsureAttempt when the dependee is listed in a.Attempts), and one
//     MergeQuest per entry of a.Quests. Mutations for a dep whose quest is
//     being merged are chained onto that MergeQuest's AndThen list instead of
//     being returned directly.
//
// Filtering and Put failures are annotated and tagged codes.Internal.
func (a *AddDeps) RollForward(c context.Context) (muts []tumble.Mutation, err error) {
	// Invalidate the execution key so that they can't make more API calls.
	attempt, execution, err := model.InvalidateExecution(c, a.Auth)
	if err != nil {
		return nil, err
	}

	err = ResetExecutionTimeout(c, execution)
	if err != nil {
		logging.WithError(err).Errorf(c, "could not reset timeout")
		return nil, err
	}

	requested := model.FwdDepsFromList(c, a.Auth.Id.AttemptID(), a.Deps)
	newDeps, err := filterExisting(c, requested)
	err = errors.Annotate(err, "while filtering deps").Tag(grpcutil.Tag.With(codes.Internal)).Err()
	if err != nil {
		return nil, err
	}
	if len(newDeps) == 0 {
		// Nothing new to depend on.
		return nil, nil
	}

	logging.Fields{"aid": attempt.ID, "count": len(newDeps)}.Infof(c, "added deps")

	// One bit per new dependency; each dep records which bit it owns.
	attempt.DepMap = bitfield.Make(uint32(len(newDeps)))
	for idx, fwd := range newDeps {
		fwd.BitIndex = uint32(idx)
		fwd.ForExecution = attempt.CurExecution
	}

	if putErr := ds.Put(c, newDeps, attempt, execution); putErr != nil {
		return nil, errors.Annotate(putErr, "putting stuff").Tag(grpcutil.Tag.With(codes.Internal)).Err()
	}

	// Index the MergeQuest mutations by quest ID so that per-dep mutations can
	// be chained behind the merge of their quest.
	var questMerges map[string]*MergeQuest
	if questCount := len(a.Quests); questCount > 0 {
		questMerges = make(map[string]*MergeQuest, questCount)
		for _, quest := range a.Quests {
			questMerges[quest.ID] = &MergeQuest{Quest: quest}
		}
	}

	missingAttempts := a.Attempts.GetTo()
	muts = make([]tumble.Mutation, 0, len(newDeps)+len(missingAttempts)+len(a.Quests))

	for _, dep := range newDeps {
		questID := dep.Dependee.Quest

		// Default to the top-level list; redirect to the quest's MergeQuest when
		// one exists for this dependee.
		sink := &muts
		if merge := questMerges[questID]; merge != nil {
			sink = &merge.AndThen
		}

		if nums, listed := missingAttempts[questID]; listed {
			for _, attemptNum := range nums.Nums {
				if attemptNum != dep.Dependee.Id {
					continue
				}
				*sink = append(*sink, &EnsureAttempt{ID: &dep.Dependee})
				break
			}
		}

		*sink = append(*sink, &AddBackDep{
			Dep:      dep.Edge(),
			NeedsAck: true,
		})
	}

	// TODO(iannucci): This could run into datastore transaction limits. We could
	// alleviate this by only emitting a single mutation which does tail-calls to
	// decrease its own, unprocessed size by emitting new MergeQuest mutations.
	for _, merge := range questMerges {
		muts = append(muts, merge)
	}
	return muts, nil
}
func init() {
tumble.Register((*AddDeps)(nil))
}