blob: 3d3e6988af111103b4fee57acdff9e364b657a94 [file] [log] [blame]
// Copyright 2018 The LUCI Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package model
import (
"context"
"encoding/json"
"fmt"
"io"
"mime"
"net/http"
"sort"
"strings"
"time"
"cloud.google.com/go/bigquery"
"google.golang.org/api/option"
"google.golang.org/protobuf/encoding/protojson"
"google.golang.org/protobuf/encoding/prototext"
"google.golang.org/protobuf/proto"
"google.golang.org/protobuf/types/known/timestamppb"
"go.chromium.org/luci/common/bq"
"go.chromium.org/luci/common/clock"
"go.chromium.org/luci/common/data/stringset"
"go.chromium.org/luci/common/errors"
"go.chromium.org/luci/common/logging"
"go.chromium.org/luci/common/retry/transient"
"go.chromium.org/luci/common/sync/parallel"
"go.chromium.org/luci/gae/service/datastore"
"go.chromium.org/luci/server/auth"
"go.chromium.org/luci/server/tq"
api "go.chromium.org/luci/cipd/api/cipd/v1"
"go.chromium.org/luci/cipd/appengine/impl/metadata"
"go.chromium.org/luci/cipd/appengine/impl/model/tasks"
)
// textContentTypes is a list of text-like content types (in addition to
// "text/*").
//
// Metadata entries with such content types will have their values exported to
// the event log, see IsTextContentType.
//
// Entries are matched against the media type as normalized by
// mime.ParseMediaType (lowercased, parameters stripped).
var textContentTypes = stringset.NewFromSlice(
	"application/json",
	"application/jwt",
)
// EventsEpoch is used to calculate timestamps used to order entities in the
// datastore.
//
// time.Time is stored with millisecond precision in the datastore. This is not
// enough to preserve the order of cipd.Event protos (that use nanoseconds for
// ordering events within a transactional group).
//
// So instead we store (event.When-EventsEpoch).Nanos() as int64. The original
// timestamp is still available in cipd.Event proto. This is only for datastore
// index.
//
// NOTE: changing this value changes the meaning of all already stored 'When'
// fields, so it must stay fixed.
var EventsEpoch = time.Date(2018, time.January, 1, 0, 0, 0, 0, time.UTC)
// NoInstance is used in place of empty Instance field in Event entity, so that
// we can query for all events that are not associated with instances and only
// them (these are package-level events).
//
// See Event.FromProto for where it is assigned.
const NoInstance = "NONE"
// Event in a global structured event log.
//
// It exists in both BigQuery (for adhoc queries) and in Datastore (for showing
// in web UI, e.g. for "recent tags" feature).
//
// ID as auto-generated. The parent entity is the corresponding Package entity
// (so that events can be committed transactionally with actual changes).
type Event struct {
	_kind string `gae:"$kind,Event"`
	_extra datastore.PropertyMap `gae:"-,extra"`
	ID int64 `gae:"$id"` // auto-generated
	Package *datastore.Key `gae:"$parent"` // see PackageKey()
	Event []byte `gae:",noindex"` // serialized cipd.Event proto
	// Fields extracted from cipd.Event for indexing.
	Kind api.EventKind // what sort of event this is
	Who string // identity string of whoever caused the event, see Flush
	When int64 // nanoseconds since EventsEpoch, used for ordering queries
	Instance string // literal 'NONE' for package events, to be able to query them
	Ref string // ref name, if the event is about a ref
	Tag string // tag string, if the event is about a tag
	MdKey string // metadata key, if the event is about a metadata entry
	MdFingerprint string // metadata fingerprint, if the event is about a metadata entry
}
// FromProto fills in the entity based on the proto message.
//
// Panics if the proto can't be serialized. This should never happen.
//
// Returns the entity itself for easier chaining.
func (e *Event) FromProto(ctx context.Context, p *api.Event) *Event {
	blob, err := proto.Marshal(p)
	if err != nil {
		panic(err) // marshaling an in-memory proto should never fail
	}

	// Package-level events get the sentinel value so they stay queryable
	// through the Instance index.
	inst := p.Instance
	if inst == "" {
		inst = NoInstance
	}

	e.Package = PackageKey(ctx, p.Package)
	e.Event = blob
	e.Kind = p.Kind
	e.Who = p.Who
	e.When = p.When.AsTime().Sub(EventsEpoch).Nanoseconds()
	e.Instance = inst
	e.Ref = p.Ref
	e.Tag = p.Tag
	e.MdKey = p.MdKey
	e.MdFingerprint = p.MdFingerprint
	return e
}
// NewEventsQuery returns an unfiltered query for Event entities.
//
// Callers are expected to add filters and pass the result to QueryEvents.
func NewEventsQuery() *datastore.Query {
	q := datastore.NewQuery("Event")
	return q
}
// QueryEvents fetches and deserializes all events matching the query, sorting
// them by timestamp (most recent first).
//
// Start the query with NewEventsQuery.
func QueryEvents(ctx context.Context, q *datastore.Query) ([]*api.Event, error) {
	var entities []Event
	if err := datastore.GetAll(ctx, q.Order("-When"), &entities); err != nil {
		return nil, transient.Tag.Apply(err)
	}
	events := make([]*api.Event, len(entities))
	for i := range entities {
		ev := &api.Event{}
		if err := proto.Unmarshal(entities[i].Event, ev); err != nil {
			return nil, errors.Annotate(err, "failed to unmarshal Event with ID %d", entities[i].ID).Err()
		}
		events[i] = ev
	}
	return events, nil
}
// Events collects events emitted in some transaction and then flushes them.
//
// Note: there is no internal locking, callers are expected to serialize
// Emit and Flush calls themselves.
type Events struct {
	ev []*api.Event // pending events, in the order Emit was called
}
// Emit adds an event to be flushed later in Flush.
//
// 'Who' and 'When' fields are populated in Flush using values from the context.
// The event is retained by reference, so it must not be mutated by the caller
// until Flush is called.
func (t *Events) Emit(e *api.Event) {
	t.ev = append(t.ev, e)
}
// Flush sends all pending events to the datastore/bigquery.
//
// Returned errors are tagged as transient. On success, clears the pending
// events queue.
func (t *Events) Flush(ctx context.Context) error {
	if len(t.ev) == 0 {
		return nil
	}

	now := clock.Now(ctx).UTC()
	caller := string(auth.CurrentIdentity(ctx))

	entities := make([]*Event, 0, len(t.ev))
	for i, e := range t.ev {
		// Stamp each event with a distinct timestamp by abusing nanoseconds
		// precision, so events in one batch stay ordered by emission order.
		e.When = timestamppb.New(now.Add(time.Duration(i)))
		e.Who = caller
		entities = append(entities, (&Event{}).FromProto(ctx, e))
	}

	// Write to the datastore and to the BQ sink (if any) in parallel.
	err := parallel.FanOutIn(func(work chan<- func() error) {
		work <- func() error { return datastore.Put(ctx, entities) }
		work <- func() error { return flushToSink(ctx, t.ev) }
	})
	if err != nil {
		return transient.Tag.Apply(err)
	}

	t.ev = t.ev[:0]
	return nil
}
// EmitEvent adds a single event to the event log.
//
// Prefer using Events to add multiple events at once.
func EmitEvent(ctx context.Context, e *api.Event) error {
	var ev Events
	ev.Emit(e)
	return ev.Flush(ctx)
}
// IsTextContentType checks the metadata content type against a list of
// known-good values.
//
// Returns true for any "text/*" type and for the extra types listed in
// textContentTypes. Unparsable content types are rejected.
func IsTextContentType(ct string) bool {
	mediaType, _, err := mime.ParseMediaType(ct)
	switch {
	case err != nil:
		return false
	case strings.HasPrefix(mediaType, "text/"):
		return true
	default:
		return textContentTypes.Has(mediaType)
	}
}
////////////////////////////////////////////////////////////////////////////////
// sinkCtxKey is the context key under which the active events sink lives.
var sinkCtxKey = "cipd model events sink"

// sink is a callback that receives batches of events to be logged.
type sink func(context.Context, []*api.Event) error

// installSink returns a context with the given sink installed in it.
func installSink(ctx context.Context, s sink) context.Context {
	return context.WithValue(ctx, &sinkCtxKey, s)
}

// flushToSink sends the events to the sink in the context, if any.
//
// Does nothing (and returns nil) if no sink is installed.
func flushToSink(ctx context.Context, ev []*api.Event) error {
	s, ok := ctx.Value(&sinkCtxKey).(sink)
	if !ok || s == nil {
		return nil
	}
	return s(ctx, ev)
}
////////////////////////////////////////////////////////////////////////////////
// BigQueryEventLogger moves events from PubSub to BQ.
type BigQueryEventLogger struct {
	bq *bigquery.Client // writes to the "cipd.events" table, see insert()
}
// NewBigQueryEventLogger constructs a logger that writes to the given project.
//
// It authenticates BigQuery calls as the service itself using cloud OAuth
// scopes.
func NewBigQueryEventLogger(ctx context.Context, projectID string) (*BigQueryEventLogger, error) {
	tr, err := auth.GetRPCTransport(ctx, auth.AsSelf, auth.WithScopes(auth.CloudOAuthScopes...))
	if err != nil {
		return nil, err
	}
	httpClient := &http.Client{Transport: tr}
	client, err := bigquery.NewClient(ctx, projectID, option.WithHTTPClient(httpClient))
	if err != nil {
		return nil, err
	}
	return &BigQueryEventLogger{bq: client}, nil
}
// RegisterSink registers this event logger as events sink in the context.
//
// It will use the given dispatcher to route messages between processes.
// When 'prod' is false, events are logged (at debug level) but not actually
// submitted as tasks.
func (l *BigQueryEventLogger) RegisterSink(ctx context.Context, disp *tq.Dispatcher, prod bool) context.Context {
	disp.RegisterTaskClass(tq.TaskClass{
		ID:        "log-events",
		Prototype: &tasks.LogEvents{},
		Kind:      tq.Transactional,
		Topic:     "bigquery-log",
		Custom: func(ctx context.Context, m proto.Message) (*tq.CustomPayload, error) {
			// Serialize as indented JSONPB: this is the format the PubSub push
			// handler decodes, see HandlePubSubPush.
			body, marshalErr := (protojson.MarshalOptions{Indent: "\t"}).Marshal(m)
			if marshalErr != nil {
				return nil, marshalErr
			}
			return &tq.CustomPayload{Body: body}, nil
		},
	})
	return installSink(ctx, func(ctx context.Context, ev []*api.Event) error {
		task := &tasks.LogEvents{Events: ev}
		if logging.IsLogging(ctx, logging.Debug) {
			if blob, err := (prototext.MarshalOptions{Indent: "\t"}).Marshal(task); err != nil {
				logging.Errorf(ctx, "Failed to marshal events to proto text: %s", err)
			} else {
				logging.Debugf(ctx, "Logged events:\n%s", blob)
			}
		}
		if !prod {
			return nil // dev mode: log only, do not submit the task
		}
		return disp.AddTask(ctx, &tq.Task{Payload: task})
	})
}
// HandlePubSubPush handles incoming PubSub push messages produced by the sink.
//
// It decodes the PubSub envelope, extracts the serialized tasks.LogEvents task
// from it and inserts its events into BigQuery, using the PubSub message ID
// for best-effort deduplication of redelivered messages.
func (l *BigQueryEventLogger) HandlePubSubPush(ctx context.Context, body io.Reader) error {
	blob, err := io.ReadAll(body)
	if err != nil {
		return errors.Annotate(err, "failed to read the request body").Err()
	}
	// See https://cloud.google.com/pubsub/docs/push#receiving_messages
	var msg struct {
		Message struct {
			Attributes map[string]string `json:"attributes"`
			Data       []byte            `json:"data"`
			MessageID  string            `json:"messageId"`
		} `json:"message"`
	}
	// Bug fix: the error was previously evaluated but never assigned
	// (`if json.Unmarshal(...); err != nil`), so malformed envelopes were
	// silently accepted and processed as empty messages.
	if err := json.Unmarshal(blob, &msg); err != nil {
		return errors.Annotate(err, "failed to unmarshal PubSub message").Err()
	}
	task := &tasks.LogEvents{}
	if err := protojson.Unmarshal(msg.Message.Data, task); err != nil {
		return errors.Annotate(err, "failed to unmarshal the task").Err()
	}
	return l.insert(ctx, task.Events, msg.Message.MessageID)
}
// insert inserts rows into the BQ table.
//
// dedupID is mixed into per-row insert IDs so redelivered batches are
// deduplicated by BigQuery on a best-effort basis.
func (l *BigQueryEventLogger) insert(ctx context.Context, ev []*api.Event, dedupID string) error {
	rows := make([]bigquery.ValueSaver, 0, len(ev))
	for i, e := range ev {
		rows = append(rows, &bq.Row{
			Message:  e,
			InsertID: fmt.Sprintf("v1:%s:%d", dedupID, i),
		})
	}
	return l.bq.Dataset("cipd").Table("events").Inserter().Put(ctx, rows)
}
////////////////////////////////////////////////////////////////////////////////
// sortedRoles enumerates all defined roles except ROLE_UNSPECIFIED, in
// increasing order, for deterministic iteration over per-role ACLs.
var sortedRoles []api.Role

func init() {
	// api.Role_name is a map, so collect keys first and then sort.
	for r := range api.Role_name {
		if role := api.Role(r); role != api.Role_ROLE_UNSPECIFIED {
			sortedRoles = append(sortedRoles, role)
		}
	}
	sort.Slice(sortedRoles, func(i, j int) bool { return sortedRoles[i] < sortedRoles[j] })
}
// EmitMetadataEvents compares two metadatum of a prefix and emits events for
// fields that have changed.
//
// Panics if 'before' and 'after' describe different prefixes. Emits nothing
// if the ACLs are identical.
func EmitMetadataEvents(ctx context.Context, before, after *api.PrefixMetadata) error {
	if before.Prefix != after.Prefix {
		panic(fmt.Sprintf("comparing metadata for different prefixes: %q != %q", before.Prefix, after.Prefix))
	}

	prevACLs := metadata.GetACLs(before)
	nextACLs := metadata.GetACLs(after)

	// Walk roles in a fixed order so emitted events are deterministic.
	var granted, revoked []*api.PrefixMetadata_ACL
	for _, role := range sortedRoles {
		if added := stringSetDiff(nextACLs[role], prevACLs[role]); len(added) != 0 {
			granted = append(granted, &api.PrefixMetadata_ACL{Role: role, Principals: added})
		}
		if removed := stringSetDiff(prevACLs[role], nextACLs[role]); len(removed) != 0 {
			revoked = append(revoked, &api.PrefixMetadata_ACL{Role: role, Principals: removed})
		}
	}
	if len(granted) == 0 && len(revoked) == 0 {
		return nil // no ACL changes at all, nothing to log
	}

	return EmitEvent(ctx, &api.Event{
		Kind:        api.EventKind_PREFIX_ACL_CHANGED,
		Package:     after.Prefix,
		GrantedRole: granted,
		RevokedRole: revoked,
	})
}
// stringSetDiff returns sorted(a-b).
func stringSetDiff(a, b stringset.Set) []string {
	out := a.Difference(b).ToSlice()
	sort.Strings(out)
	return out
}