blob: addd95f7e3d6d32f7549ad5767044bb138f9bc2f [file] [log] [blame]
// Copyright 2020 The LUCI Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package model
import (
"bytes"
"context"
"sort"
"time"
"unicode/utf8"
"google.golang.org/protobuf/types/known/timestamppb"
"go.chromium.org/luci/common/clock"
"go.chromium.org/luci/common/data/stringset"
"go.chromium.org/luci/common/errors"
"go.chromium.org/luci/common/retry/transient"
"go.chromium.org/luci/gae/service/datastore"
"go.chromium.org/luci/grpc/grpcutil"
"go.chromium.org/luci/server/auth"
api "go.chromium.org/luci/cipd/api/cipd/v1"
"go.chromium.org/luci/cipd/common"
)
// InstanceMetadata represents one instance metadata entry.
//
// It is a key-value pair (along with some additional attributes).
//
// The parent entity is the instance entity. ID is derived from
// the key-value pair, see common.InstanceMetadataFingerprint.
type InstanceMetadata struct {
_kind string `gae:"$kind,InstanceMetadata"`
_extra datastore.PropertyMap `gae:"-,extra"`
Fingerprint string `gae:"$id"` // see common.InstanceMetadataFingerprint
Instance *datastore.Key `gae:"$parent"` // a key of the corresponding Instance entity
Key string `gae:"key"` // the metadata key
Value []byte `gae:"value,noindex"` // the metadata payload, can be big
ContentType string `gae:"content_type,noindex"` // a content type (perhaps guessed)
AttachedBy string `gae:"attached_by"` // who added this metadata
AttachedTs time.Time `gae:"attached_ts"` // when it was added
}
// Proto returns cipd.InstanceMetadata proto with information from this entity.
//
// Assumes the entity is valid.
func (md *InstanceMetadata) Proto() *api.InstanceMetadata {
return &api.InstanceMetadata{
Key: md.Key,
Value: md.Value,
ContentType: md.ContentType,
Fingerprint: md.Fingerprint,
AttachedBy: md.AttachedBy,
AttachedTs: timestamppb.New(md.AttachedTs),
}
}
// AttachMetadata transactionally attaches metadata to an instance.
//
// Mutates `md` in place by calculating fingerprints and "guessing" content
// type if necessary.
//
// Assumes inputs are already validated. Launches a transaction inside (and thus
// can't be a part of a transaction itself). Updates 'inst' in-place with the
// most recent instance state.
//
// Returns gRPC-tagged errors:
//
// NotFound if there's no such instance or package.
// FailedPrecondition if some processors are still running.
// Aborted if some processors have failed.
// Internal on fingerprint collision.
func AttachMetadata(ctx context.Context, inst *Instance, md []*api.InstanceMetadata) error {
now := clock.Now(ctx).UTC()
who := string(auth.CurrentIdentity(ctx))
// Calculate fingerprints and guess content type before the transaction, it is
// relatively slow. Throw away duplicate entries.
seen := stringset.New(len(md))
filtered := md[:0]
for _, m := range md {
m.Fingerprint = common.InstanceMetadataFingerprint(m.Key, m.Value)
if seen.Add(m.Fingerprint) {
if m.ContentType == "" {
if guessPlainText(m.Value) {
m.ContentType = "text/plain"
} else {
m.ContentType = "application/octet-stream"
}
}
filtered = append(filtered, m)
}
}
md = filtered
return Txn(ctx, "AttachMetadata", func(ctx context.Context) error {
if err := CheckInstanceReady(ctx, inst); err != nil {
return err
}
// Prepare to fetch everything from the datastore.
instKey := datastore.KeyForObj(ctx, inst)
ents := make([]*InstanceMetadata, len(md))
for i, m := range md {
ents[i] = &InstanceMetadata{
Fingerprint: m.Fingerprint,
Instance: instKey,
}
}
// For all existing entries, double check their key-value pair matches
// the one we try to attach. If not, we've got a hash collision in the
// fingerprint. This should be super rare, but it doesn't hurt to check
// since we fetched the entity already.
checkExisting := func(ent *InstanceMetadata, msg *api.InstanceMetadata) error {
if ent.Key != msg.Key {
return errors.Reason("fingerprint %q matches two metadata keys %q and %q, aborting", ent.Fingerprint, ent.Key, msg.Key).
Tag(grpcutil.InternalTag).Err()
}
if !bytes.Equal(ent.Value, msg.Value) {
return errors.Reason("fingerprint %q matches metadata key %q with two different values, aborting", ent.Fingerprint, ent.Key).
Tag(grpcutil.InternalTag).Err()
}
return nil
}
// Find entries that don't exist yet. We don't want to blindly overwrite
// existing entries, since we want to preserve their AttachedBy/AttachedTs
// etc. and skip emitting INSTANCE_METADATA_ATTACHED event log entries.
missing := make([]*InstanceMetadata, 0, len(ents))
if err := datastore.Get(ctx, ents); err != nil {
merr, ok := err.(errors.MultiError)
if !ok {
return errors.Annotate(err, "failed to fetch metadata").Tag(transient.Tag).Err()
}
for i, err := range merr {
switch err {
case nil:
if err := checkExisting(ents[i], md[i]); err != nil {
return err
}
case datastore.ErrNoSuchEntity:
// Populate the rest of the entity fields from input proto fields.
ent, msg := ents[i], md[i]
ent.Key = msg.Key
ent.Value = msg.Value
ent.ContentType = msg.ContentType
ent.AttachedBy = who
ent.AttachedTs = now
missing = append(missing, ent)
default:
return errors.Annotate(err, "failed to fetch metadata %q", ents[i].Fingerprint).Tag(transient.Tag).Err()
}
}
} else {
// No error at all => all entries already exist, just check them.
for i := range ents {
if err := checkExisting(ents[i], md[i]); err != nil {
return err
}
}
}
if len(missing) == 0 {
return nil
}
// Store everything.
if err := datastore.Put(ctx, missing); err != nil {
return transient.Tag.Apply(err)
}
return flushToEventLog(ctx, missing, api.EventKind_INSTANCE_METADATA_ATTACHED, inst, who, now)
})
}
// DetachMetadata detaches a bunch of metadata entries from an instance.
//
// Assumes inputs are already validated. If Fingerprint is populated, uses it
// to identifies entries to detach. Otherwise calculates it from Key and Value
// (which must be populated in this case).
//
// Launches a transaction inside (and thus can't be a part of a transaction
// itself).
func DetachMetadata(ctx context.Context, inst *Instance, md []*api.InstanceMetadata) error {
now := clock.Now(ctx).UTC()
who := string(auth.CurrentIdentity(ctx))
// Calculate fingerprints before the transaction, it is relatively slow. Throw
// away duplicate entries.
seen := stringset.New(len(md))
filtered := md[:0]
for _, m := range md {
if m.Fingerprint == "" {
m.Fingerprint = common.InstanceMetadataFingerprint(m.Key, m.Value)
}
if seen.Add(m.Fingerprint) {
filtered = append(filtered, m)
}
}
md = filtered
return Txn(ctx, "DetachMetadata", func(ctx context.Context) error {
// Prepare to fetch everything from the datastore to figure out what entries
// actually exist, for the event log.
instKey := datastore.KeyForObj(ctx, inst)
ents := make([]*InstanceMetadata, len(md))
for i, m := range md {
ents[i] = &InstanceMetadata{
Fingerprint: m.Fingerprint,
Instance: instKey,
}
}
existing := make([]*InstanceMetadata, 0, len(ents))
if err := datastore.Get(ctx, ents); err != nil {
merr, ok := err.(errors.MultiError)
if !ok {
return errors.Annotate(err, "failed to fetch metadata").Tag(transient.Tag).Err()
}
for i, err := range merr {
switch err {
case nil:
existing = append(existing, ents[i])
case datastore.ErrNoSuchEntity:
// Skip, that's ok.
default:
return errors.Annotate(err, "failed to fetch metadata %q", ents[i].Fingerprint).Tag(transient.Tag).Err()
}
}
} else {
existing = ents
}
if len(existing) == 0 {
return nil
}
// Store everything.
if err := datastore.Delete(ctx, existing); err != nil {
return transient.Tag.Apply(err)
}
return flushToEventLog(ctx, existing, api.EventKind_INSTANCE_METADATA_DETACHED, inst, who, now)
})
}
// ListMetadata lists all instance metadata.
//
// The result is ordered by AttachedTs (the most recent first).
func ListMetadata(ctx context.Context, inst *Instance) ([]*InstanceMetadata, error) {
// Note: 'Order' here is unnecessary, since we sort in memory later anyhow.
// But it is here in an anticipation of eventually implementing pagination.
q := datastore.NewQuery("InstanceMetadata").
Ancestor(datastore.KeyForObj(ctx, inst)).
Order("-attached_ts")
var out []*InstanceMetadata
if err := datastore.GetAll(ctx, q, &out); err != nil {
return nil, errors.Annotate(err, "datastore query failed").Tag(transient.Tag).Err()
}
orderByTsAndKey(out)
return out, nil
}
// ListMetadataWithKeys lists instance metadata with any of the given keys.
//
// The result is ordered by AttachedTs (the most recent first).
func ListMetadataWithKeys(ctx context.Context, inst *Instance, keys []string) ([]*InstanceMetadata, error) {
if len(keys) == 0 {
panic("must not be empty")
}
qs := make([]*datastore.Query, len(keys))
for i, key := range keys {
qs[i] = datastore.NewQuery("InstanceMetadata").
Ancestor(datastore.KeyForObj(ctx, inst)).
Eq("key", key).
Order("-attached_ts")
}
var out []*InstanceMetadata
err := datastore.RunMulti(ctx, qs, func(md *InstanceMetadata) {
out = append(out, md)
})
if err != nil {
return nil, errors.Annotate(err, "datastore query failed").Tag(transient.Tag).Err()
}
orderByTsAndKey(out)
return out, nil
}
// orderByTsAndKey order entries by (-AttachedTs, Key).
func orderByTsAndKey(md []*InstanceMetadata) {
sort.Slice(md, func(i, j int) bool {
l, r := md[i], md[j]
if l.AttachedTs.Equal(r.AttachedTs) {
return l.Key < r.Key
}
return l.AttachedTs.After(r.AttachedTs)
})
}
// flushToEventLog emits a bunch of event log entries with metadata.
func flushToEventLog(ctx context.Context, ents []*InstanceMetadata, kind api.EventKind, inst *Instance, who string, now time.Time) error {
nowTS := timestamppb.New(now)
events := Events{}
for _, ent := range ents {
// Export only valid UTF-8 values of known text-like content types.
mdValue := ""
if IsTextContentType(ent.ContentType) {
mdValue = string(ent.Value)
if !utf8.ValidString(mdValue) {
mdValue = ""
}
}
events.Emit(&api.Event{
Kind: kind,
Package: inst.Package.StringID(),
Instance: inst.InstanceID,
Who: who,
When: nowTS,
MdKey: ent.Key,
MdValue: mdValue,
MdContentType: ent.ContentType,
MdFingerprint: ent.Fingerprint,
})
}
return events.Flush(ctx)
}
// guessPlainText returns true for smallish printable ASCII strings.
func guessPlainText(v []byte) bool {
if len(v) >= 32768 {
return false
}
for _, b := range v {
// Acceptable non-printable chars.
if b == '\r' || b == '\n' || b == '\t' {
continue
}
// Everything else should be from a printable ASCII range.
if b < ' ' || b >= 0x7F {
return false
}
}
return true
}