blob: 317f11297322c3a223bc16d78c4f6d28295f13a7 [file] [log] [blame]
// Copyright 2016 The LUCI Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package cloud
import (
"context"
"fmt"
"sort"
"strings"
"sync"
"time"
"cloud.google.com/go/datastore"
"google.golang.org/api/iterator"
pb "google.golang.org/genproto/googleapis/datastore/v1"
"go.chromium.org/luci/common/errors"
"go.chromium.org/luci/gae/impl/prod/constraints"
ds "go.chromium.org/luci/gae/service/datastore"
)
type cloudDatastore struct {
client *datastore.Client
}
func (cds *cloudDatastore) use(c context.Context) context.Context {
return ds.SetRawFactory(c, func(ic context.Context) ds.RawInterface {
return &boundDatastore{
Context: ic,
cloudDatastore: cds,
transaction: datastoreTransaction(ic),
kc: ds.GetKeyContext(ic),
}
})
}
// boundDatastore is a bound instance of the cloudDatastore installed in the
// Context.
type boundDatastore struct {
context.Context
// Context is the bound user Context. It includes the datastore namespace, if
// one is set.
*cloudDatastore
transaction *transactionWrapper
kc ds.KeyContext
}
func (bds *boundDatastore) AllocateIDs(keys []*ds.Key, cb ds.NewKeyCB) error {
nativeKeys, err := bds.client.AllocateIDs(bds, gaeKeysToNative(keys))
if err != nil {
return normalizeError(err)
}
for i, key := range nativeKeys {
cb(i, nativeKeyToGAE(bds.kc, key), nil)
}
return nil
}
func (bds *boundDatastore) RunInTransaction(fn func(context.Context) error, opts *ds.TransactionOptions) error {
if bds.transaction != nil {
return errors.New("nested transactions are not supported")
}
var txOpts []datastore.TransactionOption
if opts != nil {
if opts.ReadOnly {
txOpts = append(txOpts, datastore.ReadOnly)
}
if opts.Attempts > 0 {
txOpts = append(txOpts, datastore.MaxAttempts(opts.Attempts))
}
}
_, err := bds.client.RunInTransaction(bds, func(tx *datastore.Transaction) error {
return fn(withDatastoreTransaction(bds, tx))
}, txOpts...)
return normalizeError(err)
}
func (bds *boundDatastore) DecodeCursor(s string) (ds.Cursor, error) {
cursor, err := datastore.DecodeCursor(s)
return cursor, normalizeError(err)
}
func (bds *boundDatastore) Run(q *ds.FinalizedQuery, cb ds.RawRunCB) error {
it := bds.client.Run(bds, bds.prepareNativeQuery(q))
cursorFn := func() (ds.Cursor, error) {
return it.Cursor()
}
for {
var npl *nativePropertyLoader
if !q.KeysOnly() {
npl = &nativePropertyLoader{kc: bds.kc}
}
nativeKey, err := it.Next(npl)
if err != nil {
if err == iterator.Done {
return nil
}
return normalizeError(err)
}
var pmap ds.PropertyMap
if npl != nil {
pmap = npl.pmap
}
if err := cb(nativeKeyToGAE(bds.kc, nativeKey), pmap, cursorFn); err != nil {
if err == ds.Stop {
return nil
}
return normalizeError(err)
}
}
}
func (bds *boundDatastore) Count(q *ds.FinalizedQuery) (int64, error) {
// If the query is eventually consistent, use faster server-side aggregation.
// For strongly-consistent queries we'll have to do local counting.
if q.EventuallyConsistent() {
res, err := bds.client.RunAggregationQuery(bds,
bds.prepareNativeQuery(q).
NewAggregationQuery().
WithCount("total"),
)
if err != nil {
return -1, normalizeError(err)
}
total, _ := res["total"].(*pb.Value)
if total == nil {
return -1, fmt.Errorf("aggregation result is unexpectedly missing")
}
return int64(total.GetIntegerValue()), nil
}
// Local counting. It is the only strongly-consistent method.
v, err := bds.client.Count(bds, bds.prepareNativeQuery(q))
if err != nil {
return -1, normalizeError(err)
}
return int64(v), nil
}
func fixMultiError(err error) error {
if err == nil {
return nil
}
if baseME, ok := err.(datastore.MultiError); ok {
return errors.NewMultiError(baseME...)
}
return err
}
func idxCallbacker(err error, amt int, cb func(idx int, err error)) error {
if err == nil {
for i := 0; i < amt; i++ {
cb(i, nil)
}
return nil
}
err = fixMultiError(err)
if me, ok := err.(errors.MultiError); ok {
for i, err := range me {
cb(i, normalizeError(err))
}
return nil
}
return normalizeError(err)
}
func (bds *boundDatastore) GetMulti(keys []*ds.Key, _meta ds.MultiMetaGetter, cb ds.GetMultiCB) error {
nativeKeys := gaeKeysToNative(keys)
nativePLS := make([]*nativePropertyLoader, len(nativeKeys))
for i := range nativePLS {
nativePLS[i] = &nativePropertyLoader{kc: bds.kc}
}
var err error
if bds.transaction != nil {
// Transactional GetMulti.
err = bds.transaction.GetMulti(nativeKeys, nativePLS)
} else {
// Non-transactional GetMulti.
err = bds.client.GetMulti(bds, nativeKeys, nativePLS)
}
return idxCallbacker(err, len(nativePLS), func(idx int, err error) {
cb(idx, nativePLS[idx].pmap, err)
})
}
func (bds *boundDatastore) PutMulti(keys []*ds.Key, vals []ds.PropertyMap, cb ds.NewKeyCB) error {
nativeKeys := gaeKeysToNative(keys)
nativePLS := make([]*nativePropertySaver, len(vals))
for i := range nativePLS {
nativePLS[i] = &nativePropertySaver{kc: bds.kc, pmap: vals[i]}
}
var err error
if bds.transaction != nil {
// Transactional PutMulti.
//
// In order to simulate the presence of mid-transaction key allocation, we
// will identify any incomplete keys and allocate IDs for them. This is
// potentially wasteful in the event of failed or retried transactions, but
// it is required to maintain API compatibility with the datastore
// interface.
var incompleteKeys []*datastore.Key
var incompleteKeyMap map[int]int
for i, k := range nativeKeys {
if k.Incomplete() {
if incompleteKeyMap == nil {
// Optimization: if there are any incomplete keys, allocate room for
// the full range.
incompleteKeyMap = make(map[int]int, len(nativeKeys)-i)
incompleteKeys = make([]*datastore.Key, 0, len(nativeKeys)-i)
}
incompleteKeyMap[len(incompleteKeys)] = i
incompleteKeys = append(incompleteKeys, k)
}
}
if len(incompleteKeys) > 0 {
idKeys, err := bds.client.AllocateIDs(bds, incompleteKeys)
if err != nil {
return err
}
for i, idKey := range idKeys {
nativeKeys[incompleteKeyMap[i]] = idKey
}
}
_, err = bds.transaction.PutMulti(nativeKeys, nativePLS)
} else {
// Non-transactional PutMulti.
nativeKeys, err = bds.client.PutMulti(bds, nativeKeys, nativePLS)
}
return idxCallbacker(err, len(nativeKeys), func(idx int, err error) {
if err == nil {
cb(idx, nativeKeyToGAE(bds.kc, nativeKeys[idx]), nil)
return
}
cb(idx, nil, err)
})
}
func (bds *boundDatastore) DeleteMulti(keys []*ds.Key, cb ds.DeleteMultiCB) error {
nativeKeys := gaeKeysToNative(keys)
var err error
if bds.transaction != nil {
// Transactional DeleteMulti.
err = bds.transaction.DeleteMulti(nativeKeys)
} else {
// Non-transactional DeleteMulti.
err = bds.client.DeleteMulti(bds, nativeKeys)
}
return idxCallbacker(err, len(nativeKeys), cb)
}
func (bds *boundDatastore) WithoutTransaction() context.Context {
return withoutDatastoreTransaction(bds)
}
func (bds *boundDatastore) CurrentTransaction() ds.Transaction {
if bds.transaction == nil {
return nil
}
return bds.transaction
}
func (bds *boundDatastore) Constraints() ds.Constraints { return constraints.DS() }
func (bds *boundDatastore) GetTestable() ds.Testable { return nil }
func (bds *boundDatastore) prepareNativeQuery(fq *ds.FinalizedQuery) *datastore.Query {
nq := datastore.NewQuery(fq.Kind())
if bds.transaction != nil {
// NOTE: As of 2021 Q1 this is safe because it's documented that:
//
// "Queries are re-usable and it is safe to call Query.Run from concurrent
// goroutines"
//
// Inspecting the datastore client code reveals that it only uses the `id`
// field of the *Transaction object, not any of the state within the
// *Transaction object which needs protection via the *transactionWrapper.
nq = nq.Transaction(bds.transaction.tx)
}
if ns := bds.kc.Namespace; ns != "" {
nq = nq.Namespace(ns)
}
// nativeFilter translates a filter field. If the translation fails, we'll
// pass the result through to the underlying datastore and allow it to
// reject it.
nativeFilter := func(prop ds.Property) any {
if np, err := gaePropertyToNative(bds.kc, "", prop); err == nil {
return np.Value
}
return prop.Value()
}
// Equality filters.
for field, props := range fq.EqFilters() {
if field != "__ancestor__" {
for _, prop := range props {
nq = nq.FilterField(field, "=", nativeFilter(prop))
}
}
}
for field, slices := range fq.InFilters() {
for _, slice := range slices {
native := make([]any, len(slice))
for idx, prop := range slice {
native[idx] = nativeFilter(prop)
}
nq = nq.FilterField(field, "in", native)
}
}
// Inequality filters.
if ineq := fq.IneqFilterProp(); ineq != "" {
if field, op, prop := fq.IneqFilterLow(); field != "" {
nq = nq.FilterField(field, op, nativeFilter(prop))
}
if field, op, prop := fq.IneqFilterHigh(); field != "" {
nq = nq.FilterField(field, op, nativeFilter(prop))
}
}
start, end := fq.Bounds()
if start != nil {
nq = nq.Start(start.(datastore.Cursor))
}
if end != nil {
nq = nq.End(end.(datastore.Cursor))
}
if fq.Distinct() {
nq = nq.Distinct()
}
if fq.KeysOnly() {
nq = nq.KeysOnly()
}
if limit, ok := fq.Limit(); ok {
nq = nq.Limit(int(limit))
}
if offset, ok := fq.Offset(); ok {
nq = nq.Offset(int(offset))
}
if proj := fq.Project(); proj != nil {
nq = nq.Project(proj...)
}
if ancestor := fq.Ancestor(); ancestor != nil {
nq = nq.Ancestor(gaeKeyToNative(ancestor))
}
if fq.EventuallyConsistent() {
nq = nq.EventualConsistency()
}
for _, ic := range fq.Orders() {
prop := ic.Property
if ic.Descending {
prop = "-" + prop
}
nq = nq.Order(prop)
}
return nq
}
func gaePropertyToNative(kc ds.KeyContext, name string, pdata ds.PropertyData) (nativeProp datastore.Property, err error) {
nativeProp.Name = name
convert := func(prop *ds.Property) (any, error) {
switch pt := prop.Type(); pt {
case ds.PTNull, ds.PTInt, ds.PTTime, ds.PTBool, ds.PTBytes, ds.PTString, ds.PTFloat:
return prop.Value(), nil
case ds.PTGeoPoint:
gp := prop.Value().(ds.GeoPoint)
return datastore.GeoPoint{Lat: gp.Lat, Lng: gp.Lng}, nil
case ds.PTKey:
return gaeKeyToNative(prop.Value().(*ds.Key)), nil
case ds.PTPropertyMap:
return gaeEntityToNative(kc, prop.Value().(ds.PropertyMap)), nil
default:
return nil, fmt.Errorf("unsupported property type: %v", pt)
}
}
switch t := pdata.(type) {
case ds.Property:
if nativeProp.Value, err = convert(&t); err != nil {
return
}
nativeProp.NoIndex = (t.IndexSetting() != ds.ShouldIndex)
case ds.PropertySlice:
// Don't index by default. If *any* sub-property requests being indexed,
// then we will index.
nativeProp.NoIndex = true
// Pack this into an any so it is marked as a multi-value.
multiProp := make([]any, len(t))
for i := range t {
prop := &t[i]
if multiProp[i], err = convert(prop); err != nil {
return
}
if prop.IndexSetting() == ds.ShouldIndex {
nativeProp.NoIndex = false
}
}
nativeProp.Value = multiProp
default:
err = fmt.Errorf("unsupported PropertyData type for %q: %T", name, pdata)
}
return
}
func nativePropertyToGAE(kc ds.KeyContext, nativeProp datastore.Property) (name string, pdata ds.PropertyData, err error) {
name = nativeProp.Name
convert := func(nv any, prop *ds.Property) error {
switch nvt := nv.(type) {
case nil:
nv = nil
case int64, bool, string, float64:
break
case []byte:
if len(nvt) == 0 {
// Cloud datastore library returns []byte{} if it is empty.
// Make it nil as more convenient to deal with in tests.
nv = []byte(nil)
}
case time.Time:
// Cloud datastore library returns local time.
nv = nvt.UTC()
case datastore.GeoPoint:
nv = ds.GeoPoint{Lat: nvt.Lat, Lng: nvt.Lng}
case *datastore.Key:
nv = nativeKeyToGAE(kc, nvt)
case *datastore.Entity:
nv = nativeEntityToGAE(kc, nvt)
default:
return fmt.Errorf("unsupported datastore.Value type for %q: %T", name, nvt)
}
indexSetting := ds.ShouldIndex
if nativeProp.NoIndex {
indexSetting = ds.NoIndex
}
prop.SetValue(nv, indexSetting)
return nil
}
// Slice of supported native type. Convert this into PropertySlice.
//
// It must be an []any.
if nativeValues, ok := nativeProp.Value.([]any); ok {
pslice := make(ds.PropertySlice, len(nativeValues))
for i, nv := range nativeValues {
if err = convert(nv, &pslice[i]); err != nil {
return
}
}
pdata = pslice
return
}
var prop ds.Property
if err = convert(nativeProp.Value, &prop); err != nil {
return
}
pdata = prop
return
}
func gaeKeyToNative(key *ds.Key) *datastore.Key {
var nativeKey *datastore.Key
_, _, toks := key.Split()
for _, tok := range toks {
nativeKey = &datastore.Key{
Kind: tok.Kind,
ID: tok.IntID,
Name: tok.StringID,
Parent: nativeKey,
Namespace: key.Namespace(),
}
}
return nativeKey
}
func gaeKeysToNative(keys []*ds.Key) []*datastore.Key {
nativeKeys := make([]*datastore.Key, len(keys))
for i, key := range keys {
nativeKeys[i] = gaeKeyToNative(key)
}
return nativeKeys
}
func nativeKeyToGAE(kc ds.KeyContext, nativeKey *datastore.Key) *ds.Key {
toks := make([]ds.KeyTok, 0, 2)
cur := nativeKey
for {
toks = append(toks, ds.KeyTok{Kind: cur.Kind, IntID: cur.ID, StringID: cur.Name})
cur = cur.Parent
if cur == nil {
break
}
}
// Reverse "toks" so we have ancestor-to-child lineage.
for i := 0; i < len(toks)/2; i++ {
ri := len(toks) - i - 1
toks[i], toks[ri] = toks[ri], toks[i]
}
kc.Namespace = nativeKey.Namespace
return kc.NewKeyToks(toks)
}
// nativeEntityToGAE returns a ds.PropertyMap representation of the given
// *datastore.Entity. Since properties can themselves be *datastore.Entities,
// the caller is responsible for ensuring there are no reference cycles.
func nativeEntityToGAE(kc ds.KeyContext, ent *datastore.Entity) ds.PropertyMap {
if ent == nil {
return nil
}
pm := make(ds.PropertyMap, len(ent.Properties)+4)
if ent.Key != nil {
// Populate all potentially supported meta properties. Whatever consumes
// the property map (usually the default struct PLS) will choose properties
// it cares about and ignore the rest.
ds.PopulateKey(pm, nativeKeyToGAE(kc, ent.Key))
}
// Property ordering is lost since it's encoded to a map, but *datastore.Entity is
// sourced from https://godoc.org/google.golang.org/genproto/googleapis/datastore/v1#Entity
// which originally held properties in a map to begin with, meaning order is irrelevant.
for _, p := range ent.Properties {
_, prop, err := nativePropertyToGAE(kc, p)
if err != nil {
// Shouldn't happen. It means the *datastore.Entity contained an unsupported type.
panic(err)
}
pm[p.Name] = prop
}
return pm
}
// gaeEntityToNative returns a *datastore.Entity representation of the given
// PropertyMap (assumed to have been produced by nativeEntityToGAE).
func gaeEntityToNative(kc ds.KeyContext, pm ds.PropertyMap) *datastore.Entity {
// Ensure stable order. Skip meta fields, they'll be used in NewKeyFromMeta.
keys := make([]string, 0, len(pm))
for name := range pm {
if !strings.HasPrefix(name, "$") {
keys = append(keys, name)
}
}
sort.Strings(keys)
ent := &datastore.Entity{
Properties: make([]datastore.Property, 0, len(keys)),
}
// Try to extract the entity key from available meta fields. Ignore incomplete
// keys. This actually happens for structs that don't have any explicitly
// defined meta properties (because `$kind` is implicitly defined, so they end
// up with an incomplete key, since they have no `$id`).
if key, _ := kc.NewKeyFromMeta(pm); key != nil && !key.IsIncomplete() {
ent.Key = gaeKeyToNative(key)
}
// Convert non-meta fields.
for _, name := range keys {
p, err := gaePropertyToNative(kc, name, pm[name])
if err != nil {
// Shouldn't happen. It means nativeEntityToGAE encoded an unsupported type.
panic(err)
}
ent.Properties = append(ent.Properties, p)
}
return ent
}
// nativePropertyLoader is a datastore.PropertyLoadSaver that implement Load
// by writing properties into a ds.PropertyMap.
type nativePropertyLoader struct {
kc ds.KeyContext
pmap ds.PropertyMap // starts as nil, gets created and populated in Load
}
var _ datastore.PropertyLoadSaver = (*nativePropertyLoader)(nil)
func (npl *nativePropertyLoader) Load(props []datastore.Property) error {
if npl.pmap == nil {
npl.pmap = make(ds.PropertyMap, len(props))
}
for _, nativeProp := range props {
name, pdata, err := nativePropertyToGAE(npl.kc, nativeProp)
if err != nil {
return err
}
if _, ok := npl.pmap[name]; ok {
return fmt.Errorf("duplicate properties for %q", name)
}
npl.pmap[name] = pdata
}
return nil
}
func (npl *nativePropertyLoader) Save() ([]datastore.Property, error) {
panic("must not be called")
}
// nativePropertySaver is a datastore.PropertyLoadSaver that implement Save
// by reading properties from a ds.PropertyMap.
type nativePropertySaver struct {
kc ds.KeyContext
pmap ds.PropertyMap // must be set by the caller
}
var _ datastore.PropertyLoadSaver = (*nativePropertySaver)(nil)
func (nps *nativePropertySaver) Load(props []datastore.Property) error {
panic("must not be called")
}
func (nps *nativePropertySaver) Save() ([]datastore.Property, error) {
if len(nps.pmap) == 0 {
return nil, nil
}
props := make([]datastore.Property, 0, len(nps.pmap))
for name, pdata := range nps.pmap {
// Strip meta.
if strings.HasPrefix(name, "$") {
continue
}
nativeProp, err := gaePropertyToNative(nps.kc, name, pdata)
if err != nil {
return nil, err
}
props = append(props, nativeProp)
}
return props, nil
}
// transactionWrapper provides a Mutex around mutation calls on the Transaction.
//
// This is required until https://github.com/googleapis/google-cloud-go/issues/3750
// is fixed.
type transactionWrapper struct {
mu sync.Mutex
tx *datastore.Transaction
}
func (tw *transactionWrapper) GetMulti(keys []*datastore.Key, dst any) (err error) {
// We don't acquire a lock here because as of 2021 Q1 Transaction.GetMulti
// only reads the Transaction.id field, and doesn't make any mutations to the
// *Transaction state at all.
return tw.tx.GetMulti(keys, dst)
}
func (tw *transactionWrapper) PutMulti(keys []*datastore.Key, src any) (ret []*datastore.PendingKey, err error) {
tw.mu.Lock()
defer tw.mu.Unlock()
return tw.tx.PutMulti(keys, src)
}
func (tw *transactionWrapper) DeleteMulti(keys []*datastore.Key) (err error) {
tw.mu.Lock()
defer tw.mu.Unlock()
return tw.tx.DeleteMulti(keys)
}
var datastoreTransactionKey = "*transactionWrapper"
func withDatastoreTransaction(c context.Context, tx *datastore.Transaction) context.Context {
return context.WithValue(c, &datastoreTransactionKey, &transactionWrapper{tx: tx})
}
func withoutDatastoreTransaction(c context.Context) context.Context {
return context.WithValue(c, &datastoreTransactionKey, nil)
}
func datastoreTransaction(c context.Context) *transactionWrapper {
if tw, ok := c.Value(&datastoreTransactionKey).(*transactionWrapper); ok {
return tw
}
return nil
}
func normalizeError(err error) error {
switch err {
case datastore.ErrNoSuchEntity:
return ds.ErrNoSuchEntity
case datastore.ErrConcurrentTransaction:
return ds.ErrConcurrentTransaction
case datastore.ErrInvalidKey:
return ds.MakeErrInvalidKey("").Err()
default:
return err
}
}