blob: d0d1dc6841d1e3bfbc429336b9a396b286fb65b9 [file] [log] [blame]
// Copyright 2016 The LUCI Authors. All rights reserved.
// Use of this source code is governed under the Apache License, Version 2.0
// that can be found in the LICENSE file.
package cloud
import (
"fmt"
"reflect"
"strings"
"time"
"github.com/luci/luci-go/common/errors"
"github.com/luci/gae/impl/prod/constraints"
ds "github.com/luci/gae/service/datastore"
"cloud.google.com/go/datastore"
"google.golang.org/api/iterator"
"golang.org/x/net/context"
)
type cloudDatastore struct {
client *datastore.Client
}
func (cds *cloudDatastore) use(c context.Context) context.Context {
return ds.SetRawFactory(c, func(ic context.Context) ds.RawInterface {
return &boundDatastore{
Context: ic,
cloudDatastore: cds,
transaction: datastoreTransaction(ic),
kc: ds.GetKeyContext(ic),
}
})
}
// boundDatastore is a bound instance of the cloudDatastore installed in the
// Context.
type boundDatastore struct {
context.Context
// Context is the bound user Context. It includes the datastore namespace, if
// one is set.
*cloudDatastore
transaction *datastore.Transaction
kc ds.KeyContext
}
func (bds *boundDatastore) AllocateIDs(keys []*ds.Key, cb ds.NewKeyCB) error {
nativeKeys, err := bds.client.AllocateIDs(bds, bds.gaeKeysToNative(keys...))
if err != nil {
return normalizeError(err)
}
keys = bds.nativeKeysToGAE(nativeKeys...)
for _, key := range keys {
cb(key, nil)
}
return nil
}
func (bds *boundDatastore) RunInTransaction(fn func(context.Context) error, opts *ds.TransactionOptions) error {
if bds.transaction != nil {
return errors.New("nested transactions are not supported")
}
// The cloud datastore SDK does not expose any transaction options.
if opts != nil {
switch {
case opts.XG:
return errors.New("cross-group transactions are not supported")
}
}
attempts := 3
if opts != nil && opts.Attempts > 0 {
attempts = opts.Attempts
}
for i := 0; i < attempts; i++ {
_, err := bds.client.RunInTransaction(bds, func(tx *datastore.Transaction) error {
return fn(withDatastoreTransaction(bds, tx))
})
if err = normalizeError(err); err != ds.ErrConcurrentTransaction {
return err
}
}
return ds.ErrConcurrentTransaction
}
func (bds *boundDatastore) DecodeCursor(s string) (ds.Cursor, error) {
cursor, err := datastore.DecodeCursor(s)
return cursor, normalizeError(err)
}
func (bds *boundDatastore) Run(q *ds.FinalizedQuery, cb ds.RawRunCB) error {
it := bds.client.Run(bds, bds.prepareNativeQuery(q))
cursorFn := func() (ds.Cursor, error) {
return it.Cursor()
}
for {
var npls *nativePropertyLoadSaver
if !q.KeysOnly() {
npls = bds.mkNPLS(nil)
}
nativeKey, err := it.Next(npls)
if err != nil {
if err == iterator.Done {
return nil
}
return normalizeError(err)
}
var pmap ds.PropertyMap
if npls != nil {
pmap = npls.pmap
}
if err := cb(bds.nativeKeysToGAE(nativeKey)[0], pmap, cursorFn); err != nil {
if err == ds.Stop {
return nil
}
return normalizeError(err)
}
}
}
func (bds *boundDatastore) Count(q *ds.FinalizedQuery) (int64, error) {
v, err := bds.client.Count(bds, bds.prepareNativeQuery(q))
if err != nil {
return -1, normalizeError(err)
}
return int64(v), nil
}
func idxCallbacker(err error, amt int, cb func(idx int, err error) error) error {
if err == nil {
for i := 0; i < amt; i++ {
if err := cb(i, nil); err != nil {
return err
}
}
return nil
}
err = errors.Fix(err)
if me, ok := err.(errors.MultiError); ok {
for i, err := range me {
if err := cb(i, normalizeError(err)); err != nil {
return err
}
}
return nil
}
return normalizeError(err)
}
func (bds *boundDatastore) GetMulti(keys []*ds.Key, _meta ds.MultiMetaGetter, cb ds.GetMultiCB) error {
nativeKeys := bds.gaeKeysToNative(keys...)
nativePLS := make([]*nativePropertyLoadSaver, len(nativeKeys))
for i := range nativePLS {
nativePLS[i] = bds.mkNPLS(nil)
}
var err error
if bds.transaction != nil {
// Transactional GetMulti.
err = bds.transaction.GetMulti(nativeKeys, nativePLS)
} else {
// Non-transactional GetMulti.
err = bds.client.GetMulti(bds, nativeKeys, nativePLS)
}
return idxCallbacker(err, len(nativePLS), func(idx int, err error) error {
return cb(nativePLS[idx].pmap, err)
})
}
func (bds *boundDatastore) PutMulti(keys []*ds.Key, vals []ds.PropertyMap, cb ds.NewKeyCB) error {
nativeKeys := bds.gaeKeysToNative(keys...)
nativePLS := make([]*nativePropertyLoadSaver, len(vals))
for i := range nativePLS {
nativePLS[i] = bds.mkNPLS(vals[i])
}
var err error
if bds.transaction != nil {
// Transactional PutMulti.
//
// In order to simulate the presence of mid-transaction key allocation, we
// will identify any incomplete keys and allocate IDs for them. This is
// potentially wasteful in the event of failed or retried transactions, but
// it is required to maintain API compatibility with the datastore
// interface.
var incompleteKeys []*datastore.Key
var incompleteKeyMap map[int]int
for i, k := range nativeKeys {
if k.Incomplete() {
if incompleteKeyMap == nil {
// Optimization: if there are any incomplete keys, allocate room for
// the full range.
incompleteKeyMap = make(map[int]int, len(nativeKeys)-i)
incompleteKeys = make([]*datastore.Key, 0, len(nativeKeys)-i)
}
incompleteKeyMap[len(incompleteKeys)] = i
incompleteKeys = append(incompleteKeys, k)
}
}
if len(incompleteKeys) > 0 {
idKeys, err := bds.client.AllocateIDs(bds, incompleteKeys)
if err != nil {
return err
}
for i, idKey := range idKeys {
nativeKeys[incompleteKeyMap[i]] = idKey
}
}
_, err = bds.transaction.PutMulti(nativeKeys, nativePLS)
} else {
// Non-transactional PutMulti.
nativeKeys, err = bds.client.PutMulti(bds, nativeKeys, nativePLS)
}
return idxCallbacker(err, len(nativeKeys), func(idx int, err error) error {
if err == nil {
return cb(bds.nativeKeysToGAE(nativeKeys[idx])[0], nil)
}
return cb(nil, err)
})
}
func (bds *boundDatastore) DeleteMulti(keys []*ds.Key, cb ds.DeleteMultiCB) error {
nativeKeys := bds.gaeKeysToNative(keys...)
var err error
if bds.transaction != nil {
// Transactional DeleteMulti.
err = bds.transaction.DeleteMulti(nativeKeys)
} else {
// Non-transactional DeleteMulti.
err = bds.client.DeleteMulti(bds, nativeKeys)
}
return idxCallbacker(err, len(nativeKeys), func(_ int, err error) error {
return cb(err)
})
}
func (bds *boundDatastore) WithoutTransaction() context.Context {
return withDatastoreTransaction(bds, nil)
}
func (bds *boundDatastore) CurrentTransaction() ds.Transaction { return bds.transaction }
func (bds *boundDatastore) Constraints() ds.Constraints { return constraints.DS() }
func (bds *boundDatastore) GetTestable() ds.Testable { return nil }
func (bds *boundDatastore) prepareNativeQuery(fq *ds.FinalizedQuery) *datastore.Query {
nq := datastore.NewQuery(fq.Kind())
if bds.transaction != nil {
nq = nq.Transaction(bds.transaction)
}
if ns := bds.kc.Namespace; ns != "" {
nq = nq.Namespace(ns)
}
// nativeFilter translates a filter field. If the translation fails, we'll
// pass the result through to the underlying datastore and allow it to
// reject it.
nativeFilter := func(prop ds.Property) interface{} {
if np, err := bds.gaePropertyToNative("", prop); err == nil {
return np.Value
}
return prop.Value()
}
// Equality filters.
for field, props := range fq.EqFilters() {
for _, prop := range props {
nq = nq.Filter(fmt.Sprintf("%s =", field), nativeFilter(prop))
}
}
// Inequality filters.
if ineq := fq.IneqFilterProp(); ineq != "" {
if field, op, prop := fq.IneqFilterLow(); field != "" {
nq = nq.Filter(fmt.Sprintf("%s %s", field, op), nativeFilter(prop))
}
if field, op, prop := fq.IneqFilterHigh(); field != "" {
nq = nq.Filter(fmt.Sprintf("%s %s", field, op), nativeFilter(prop))
}
}
start, end := fq.Bounds()
if start != nil {
nq = nq.Start(start.(datastore.Cursor))
}
if end != nil {
nq = nq.End(end.(datastore.Cursor))
}
if fq.Distinct() {
nq = nq.Distinct()
}
if fq.KeysOnly() {
nq = nq.KeysOnly()
}
if limit, ok := fq.Limit(); ok {
nq = nq.Limit(int(limit))
}
if offset, ok := fq.Offset(); ok {
nq = nq.Offset(int(offset))
}
if proj := fq.Project(); proj != nil {
nq = nq.Project(proj...)
}
if ancestor := fq.Ancestor(); ancestor != nil {
nq = nq.Ancestor(bds.gaeKeysToNative(ancestor)[0])
}
if fq.EventuallyConsistent() {
nq = nq.EventualConsistency()
}
for _, ic := range fq.Orders() {
prop := ic.Property
if ic.Descending {
prop = "-" + prop
}
nq = nq.Order(prop)
}
return nq
}
func (bds *boundDatastore) mkNPLS(base ds.PropertyMap) *nativePropertyLoadSaver {
return &nativePropertyLoadSaver{
bds: bds,
pmap: clonePropertyMap(base),
}
}
func (bds *boundDatastore) gaePropertyToNative(name string, pdata ds.PropertyData) (
nativeProp datastore.Property, err error) {
nativeProp.Name = name
convert := func(prop *ds.Property) (interface{}, error) {
switch pt := prop.Type(); pt {
case ds.PTNull, ds.PTInt, ds.PTTime, ds.PTBool, ds.PTBytes, ds.PTString, ds.PTFloat:
return prop.Value(), nil
case ds.PTKey:
return bds.gaeKeysToNative(prop.Value().(*ds.Key))[0], nil
default:
return nil, fmt.Errorf("unsupported property type: %v", pt)
}
}
switch t := pdata.(type) {
case ds.Property:
if nativeProp.Value, err = convert(&t); err != nil {
return
}
nativeProp.NoIndex = (t.IndexSetting() != ds.ShouldIndex)
case ds.PropertySlice:
// Don't index by default. If *any* sub-property requests being indexed,
// then we will index.
nativeProp.NoIndex = true
// Pack this into an interface{} so it is marked as a multi-value.
multiProp := make([]interface{}, len(t))
for i := range t {
prop := &t[i]
if multiProp[i], err = convert(prop); err != nil {
return
}
if prop.IndexSetting() == ds.ShouldIndex {
nativeProp.NoIndex = false
}
}
nativeProp.Value = multiProp
default:
err = fmt.Errorf("unsupported PropertyData type for %q: %T", name, pdata)
}
return
}
func (bds *boundDatastore) nativePropertyToGAE(nativeProp datastore.Property) (
name string, pdata ds.PropertyData, err error) {
name = nativeProp.Name
convert := func(nv interface{}, prop *ds.Property) error {
switch nvt := nv.(type) {
case nil:
nv = nil
case int64, bool, string, float64, []byte:
break
case time.Time:
// Cloud datastore library returns local time.
nv = nvt.UTC()
case *datastore.Key:
nv = bds.nativeKeysToGAE(nvt)[0]
default:
return fmt.Errorf("unsupported datastore.Value type for %q: %T", name, nvt)
}
indexSetting := ds.ShouldIndex
if nativeProp.NoIndex {
indexSetting = ds.NoIndex
}
prop.SetValue(nv, indexSetting)
return nil
}
// Slice of supported native type. Break this into a slice of datastore
// properties.
//
// It must be an []interface{}.
if rv := reflect.ValueOf(nativeProp.Value); rv.Kind() == reflect.Slice && rv.Type().Elem().Kind() == reflect.Interface {
// []interface{}, which is a multi-valued property with a single name.
// Convert to a PropertySlice.
nativeValues := rv.Interface().([]interface{})
pslice := make(ds.PropertySlice, len(nativeValues))
for i, nv := range nativeValues {
if err = convert(nv, &pslice[i]); err != nil {
return
}
}
pdata = pslice
return
}
var prop ds.Property
if err = convert(nativeProp.Value, &prop); err != nil {
return
}
pdata = prop
return
}
func (bds *boundDatastore) gaeKeysToNative(keys ...*ds.Key) []*datastore.Key {
nativeKeys := make([]*datastore.Key, len(keys))
for i, key := range keys {
_, _, toks := key.Split()
var nativeKey *datastore.Key
for _, tok := range toks {
nativeKey = &datastore.Key{
Kind: tok.Kind,
ID: tok.IntID,
Name: tok.StringID,
Parent: nativeKey,
Namespace: key.Namespace(),
}
}
nativeKeys[i] = nativeKey
}
return nativeKeys
}
func (bds *boundDatastore) nativeKeysToGAE(nativeKeys ...*datastore.Key) []*ds.Key {
keys := make([]*ds.Key, len(nativeKeys))
toks := make([]ds.KeyTok, 1)
kc := bds.kc
for i, nativeKey := range nativeKeys {
toks = toks[:0]
cur := nativeKey
for {
toks = append(toks, ds.KeyTok{Kind: cur.Kind, IntID: cur.ID, StringID: cur.Name})
cur = cur.Parent
if cur == nil {
break
}
}
// Reverse "toks" so we have ancestor-to-child lineage.
for i := 0; i < len(toks)/2; i++ {
ri := len(toks) - i - 1
toks[i], toks[ri] = toks[ri], toks[i]
}
kc.Namespace = nativeKey.Namespace
keys[i] = kc.NewKeyToks(toks)
}
return keys
}
// nativePropertyLoadSaver is a ds.PropertyMap which implements
// datastore.PropertyLoadSaver.
//
// It naturally converts between native and GAE properties and values.
type nativePropertyLoadSaver struct {
bds *boundDatastore
pmap ds.PropertyMap
}
var _ datastore.PropertyLoadSaver = (*nativePropertyLoadSaver)(nil)
func (npls *nativePropertyLoadSaver) Load(props []datastore.Property) error {
if npls.pmap == nil {
// Allocate for common case: one property per property name.
npls.pmap = make(ds.PropertyMap, len(props))
}
for _, nativeProp := range props {
name, pdata, err := npls.bds.nativePropertyToGAE(nativeProp)
if err != nil {
return err
}
if _, ok := npls.pmap[name]; ok {
return fmt.Errorf("duplicate properties for %q", name)
}
npls.pmap[name] = pdata
}
return nil
}
func (npls *nativePropertyLoadSaver) Save() ([]datastore.Property, error) {
if len(npls.pmap) == 0 {
return nil, nil
}
props := make([]datastore.Property, 0, len(npls.pmap))
for name, pdata := range npls.pmap {
// Strip meta.
if strings.HasPrefix(name, "$") {
continue
}
nativeProp, err := npls.bds.gaePropertyToNative(name, pdata)
if err != nil {
return nil, err
}
props = append(props, nativeProp)
}
return props, nil
}
var datastoreTransactionKey = "*datastore.Transaction"
func withDatastoreTransaction(c context.Context, tx *datastore.Transaction) context.Context {
return context.WithValue(c, &datastoreTransactionKey, tx)
}
func datastoreTransaction(c context.Context) *datastore.Transaction {
if tx, ok := c.Value(&datastoreTransactionKey).(*datastore.Transaction); ok {
return tx
}
return nil
}
func clonePropertyMap(pmap ds.PropertyMap) ds.PropertyMap {
if pmap == nil {
return nil
}
clone := make(ds.PropertyMap, len(pmap))
for k, pdata := range pmap {
clone[k] = pdata.Clone()
}
return clone
}
func normalizeError(err error) error {
switch err {
case datastore.ErrNoSuchEntity:
return ds.ErrNoSuchEntity
case datastore.ErrConcurrentTransaction:
return ds.ErrConcurrentTransaction
case datastore.ErrInvalidKey:
return ds.MakeErrInvalidKey().Err()
default:
return err
}
}