blob: 12eabf2c00cc5bf06b7928418e471f0b746ad3b5 [file] [log] [blame]
// Copyright 2018 The LUCI Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package admin
import (
"context"
"crypto/sha256"
"encoding/base64"
"fmt"
"go.chromium.org/luci/cipd/appengine/impl/model"
"go.chromium.org/luci/common/errors"
"go.chromium.org/luci/common/retry/transient"
"go.chromium.org/luci/gae/service/datastore"
"go.chromium.org/luci/server/dsmapper"
)
// markedTag is a root entity created for each tag marked by some particular
// mapper operation.
//
// We can create many of these at arbitrary rate. Once the mapper finishes
// running and (eventually consistent) datastore indexes settle, we can query
// for all such marked tags using Eq() filter and deal with them in arbitrary
// order or at arbitrary rate.
//
// The core assumption here is that number of "interesting" tags found by
// a mapper is much less than total number of tags, but they may be clustered
// close together (so naively updating them inside the mapper causes transaction
// collisions).
type markedTag struct {
_kind string `gae:"$kind,mapper.MarkedTag"`
_extra datastore.PropertyMap `gae:"-,extra"`
ID string `gae:"$id"` // see .genID()
Job dsmapper.JobID // ID of a mapping job that produced it, for queries
Key *datastore.Key `gae:",noindex"` // key of the corresponding Tag entity
Tag string `gae:",noindex"` // the original tag string (k:v)
Why string `gae:",noindex"` // why the tag was marked, for humans
}
// genID derives an ID for this markedTag entity.
//
// Called internally by visitAndMarkTags().
//
// Each unique tag marked by some particular job gets its own key, i.e. each
// job has its own namespace for marked tags.
func (t *markedTag) genID() {
h := sha256.New()
fmt.Fprintf(h, "%d\n%s", t.Job, t.Key.Encode())
t.ID = base64.RawURLEncoding.EncodeToString(h.Sum(nil))
}
// queryMarkedTags returns a query for markedTags entities produced by a job.
func queryMarkedTags(job dsmapper.JobID) *datastore.Query {
return datastore.NewQuery("mapper.MarkedTag").Eq("Job", job)
}
// multiGetTags fetches a bunch of tags using GetMulti.
//
// On overall RPC error returns a transient error.
//
// If it managed to fetch something, calls cb(key, tag) sequentially for
// all tags that were found (silently skipping missing ones). If a tag was not
// fetched for some other reason, returns the error right away.
//
// If the callback returns an error this error is returned immediately by
// multiGetTags.
func multiGetTags(ctx context.Context, keys []*datastore.Key, cb func(*datastore.Key, *model.Tag) error) error {
tags := make([]model.Tag, len(keys))
for i, k := range keys {
tags[i] = model.Tag{
ID: k.StringID(),
Instance: k.Parent(),
}
}
errAt := func(idx int) error { return nil }
if err := datastore.Get(ctx, tags); err != nil {
merr, ok := err.(errors.MultiError)
if !ok {
return errors.Annotate(err, "GetMulti RPC error when fetching %d tags", len(tags)).Tag(transient.Tag).Err()
}
errAt = func(idx int) error { return merr[idx] }
}
for i, t := range tags {
switch err := errAt(i); {
case err == datastore.ErrNoSuchEntity:
continue
case err != nil:
return errors.Annotate(err, "failed to fetch tag entity with key %s", keys[i]).Err()
}
if err := cb(keys[i], &t); err != nil {
return err
}
}
return nil
}
// visitAndMarkTags fetches Tag entities given their keys and passes them to
// the callback, which may choose to mark a tag by returning non-empty string
// with the human-readable reason why the tag was marked.
//
// Such marked tags are then stored in the datastore and later can be queried.
func visitAndMarkTags(ctx context.Context, job dsmapper.JobID, keys []*datastore.Key, cb func(*model.Tag) string) error {
var marked []*markedTag
err := multiGetTags(ctx, keys, func(key *datastore.Key, tag *model.Tag) error {
if why := cb(tag); why != "" {
mt := &markedTag{
Job: job,
Key: key,
Tag: tag.Tag,
Why: why,
}
mt.genID()
marked = append(marked, mt)
}
return nil
})
if err != nil {
return err
}
if err := datastore.Put(ctx, marked); err != nil {
return errors.Annotate(err, "failed to store %d markedTag(s)", len(marked)).Tag(transient.Tag).Err()
}
return nil
}