blob: 1c6a1deafeba08f896049c758524562b435d63c3 [file] [log] [blame]
// Copyright 2015 The LUCI Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package prod
import (
"go.chromium.org/gae/impl/prod/constraints"
ds "go.chromium.org/gae/service/datastore"
"go.chromium.org/luci/common/errors"
"golang.org/x/net/context"
"google.golang.org/appengine"
"google.golang.org/appengine/datastore"
)
// useRDS adds a gae.RawDatastore implementation to context, accessible
// by gae.GetDS(c)
func useRDS(c context.Context) context.Context {
return ds.SetRawFactory(c, func(ci context.Context) ds.RawInterface {
rds := rdsImpl{
userCtx: ci,
ps: getProdState(ci),
}
rds.aeCtx = rds.ps.context(ci)
return &rds
})
}
////////// Datastore
type rdsImpl struct {
// userCtx is the context that has the luci/gae services and user objects in
// it.
userCtx context.Context
// aeCtx is the AppEngine Context that will be used in method calls. This is
// derived from ps.
aeCtx context.Context
// ps is the current production state.
ps prodState
}
func fixMultiError(err error) error {
if err == nil {
return nil
}
if baseME, ok := err.(appengine.MultiError); ok {
return errors.NewMultiError(baseME...)
}
return err
}
func idxCallbacker(err error, amt int, cb func(idx int, err error)) error {
if err == nil {
for i := 0; i < amt; i++ {
cb(i, nil)
}
return nil
}
err = fixMultiError(err)
me, ok := err.(errors.MultiError)
if ok {
for i, err := range me {
cb(i, err)
}
return nil
}
return err
}
func (d *rdsImpl) AllocateIDs(keys []*ds.Key, cb ds.NewKeyCB) error {
// Map keys by entity type.
entityMap := make(map[string][]int)
for i, key := range keys {
ks := key.String()
entityMap[ks] = append(entityMap[ks], i)
}
// Allocate a set of IDs for each unique entity type.
errors := errors.NewLazyMultiError(len(keys))
setErrs := func(idxs []int, err error) {
for _, idx := range idxs {
errors.Assign(idx, err)
}
}
for _, idxs := range entityMap {
incomplete := keys[idxs[0]]
par, err := dsF2R(d.aeCtx, incomplete.Parent())
if err != nil {
setErrs(idxs, err)
continue
}
start, _, err := datastore.AllocateIDs(d.aeCtx, incomplete.Kind(), par, len(idxs))
if err != nil {
setErrs(idxs, err)
continue
}
for i, idx := range idxs {
keys[idx] = incomplete.WithID("", start+int64(i))
}
}
for i, key := range keys {
if err := errors.GetOne(i); err != nil {
cb(i, nil, err)
} else {
cb(i, key, nil)
}
}
return nil
}
func (d *rdsImpl) DeleteMulti(ks []*ds.Key, cb ds.DeleteMultiCB) error {
keys, err := dsMF2R(d.aeCtx, ks)
if err == nil {
err = datastore.DeleteMulti(d.aeCtx, keys)
}
return idxCallbacker(err, len(ks), func(idx int, err error) {
cb(idx, err)
})
}
func (d *rdsImpl) GetMulti(keys []*ds.Key, _meta ds.MultiMetaGetter, cb ds.GetMultiCB) error {
vals := make([]datastore.PropertyLoadSaver, len(keys))
rkeys, err := dsMF2R(d.aeCtx, keys)
if err == nil {
for i := range keys {
vals[i] = &typeFilter{d.aeCtx, ds.PropertyMap{}}
}
err = datastore.GetMulti(d.aeCtx, rkeys, vals)
}
return idxCallbacker(err, len(keys), func(idx int, err error) {
if pls := vals[idx]; pls != nil {
cb(idx, pls.(*typeFilter).pm, err)
return
}
cb(idx, nil, err)
})
}
func (d *rdsImpl) PutMulti(keys []*ds.Key, vals []ds.PropertyMap, cb ds.NewKeyCB) error {
rkeys, err := dsMF2R(d.aeCtx, keys)
if err == nil {
rvals := make([]datastore.PropertyLoadSaver, len(vals))
for i, val := range vals {
rvals[i] = &typeFilter{d.aeCtx, val}
}
rkeys, err = datastore.PutMulti(d.aeCtx, rkeys, rvals)
}
return idxCallbacker(err, len(keys), func(idx int, err error) {
k := (*ds.Key)(nil)
if err == nil {
k = dsR2F(rkeys[idx])
}
cb(idx, k, err)
})
}
func (d *rdsImpl) fixQuery(fq *ds.FinalizedQuery) (*datastore.Query, error) {
ret := datastore.NewQuery(fq.Kind())
start, end := fq.Bounds()
if start != nil {
ret = ret.Start(start.(datastore.Cursor))
}
if end != nil {
ret = ret.End(end.(datastore.Cursor))
}
for prop, vals := range fq.EqFilters() {
if prop == "__ancestor__" {
p, err := dsF2RProp(d.aeCtx, vals[0])
if err != nil {
return nil, err
}
ret = ret.Ancestor(p.Value.(*datastore.Key))
} else {
filt := prop + "="
for _, v := range vals {
p, err := dsF2RProp(d.aeCtx, v)
if err != nil {
return nil, err
}
// Filter doesn't like ByteString, even though ByteString is the indexed
// counterpart of []byte... sigh.
if ds, ok := p.Value.(datastore.ByteString); ok {
p.Value = []byte(ds)
}
ret = ret.Filter(filt, p.Value)
}
}
}
if lnam, lop, lprop := fq.IneqFilterLow(); lnam != "" {
p, err := dsF2RProp(d.aeCtx, lprop)
if err != nil {
return nil, err
}
ret = ret.Filter(lnam+" "+lop, p.Value)
}
if hnam, hop, hprop := fq.IneqFilterHigh(); hnam != "" {
p, err := dsF2RProp(d.aeCtx, hprop)
if err != nil {
return nil, err
}
ret = ret.Filter(hnam+" "+hop, p.Value)
}
if fq.EventuallyConsistent() {
ret = ret.EventualConsistency()
}
if fq.KeysOnly() {
ret = ret.KeysOnly()
}
if lim, ok := fq.Limit(); ok {
ret = ret.Limit(int(lim))
}
if off, ok := fq.Offset(); ok {
ret = ret.Offset(int(off))
}
for _, o := range fq.Orders() {
ret = ret.Order(o.String())
}
ret = ret.Project(fq.Project()...)
if fq.Distinct() {
ret = ret.Distinct()
}
return ret, nil
}
func (d *rdsImpl) DecodeCursor(s string) (ds.Cursor, error) {
return datastore.DecodeCursor(s)
}
func (d *rdsImpl) Run(fq *ds.FinalizedQuery, cb ds.RawRunCB) error {
q, err := d.fixQuery(fq)
if err != nil {
return err
}
t := q.Run(d.aeCtx)
cfunc := func() (ds.Cursor, error) {
return t.Cursor()
}
tf := typeFilter{}
for {
k, err := t.Next(&tf)
if err == datastore.Done {
return nil
}
if err != nil {
return err
}
if err := cb(dsR2F(k), tf.pm, cfunc); err != nil {
return err
}
}
}
func (d *rdsImpl) Count(fq *ds.FinalizedQuery) (int64, error) {
q, err := d.fixQuery(fq)
if err != nil {
return 0, err
}
ret, err := q.Count(d.aeCtx)
return int64(ret), err
}
func (d *rdsImpl) RunInTransaction(f func(c context.Context) error, opts *ds.TransactionOptions) error {
ropts := &datastore.TransactionOptions{
// Cloud Datastore no longer exposes the ability to explicitly allow
// cross-group transactions. Since appengine datastore is effectively
// deprecated in favor of Cloud Datastore (e.g. the only way to access
// Datastore in "current" GAE versions), we just set all transactions as
// XG=true now. We believe that this should have no observable difference
// for code which only touches a single entity group. The only difference
// will be that transactions which would error (due to touching multiple
// groups) will now no longer error out.
XG: true,
}
if opts != nil {
ropts.Attempts = opts.Attempts
ropts.ReadOnly = opts.ReadOnly
}
return datastore.RunInTransaction(d.aeCtx, func(c context.Context) error {
// Derive a prodState with this transaction Context.
ps := d.ps
ps.ctx = c
ps.inTxn = true
c = withProdState(d.userCtx, ps)
return f(c)
}, ropts)
}
func (d *rdsImpl) WithoutTransaction() context.Context {
c := d.userCtx
if d.ps.inTxn {
// We're in a transaction. Reset to non-transactional state.
ps := d.ps
ps.ctx = ps.noTxnCtx
ps.inTxn = false
c = withProdState(c, ps)
}
return c
}
func (d *rdsImpl) CurrentTransaction() ds.Transaction {
if d.ps.inTxn {
// Since we don't distinguish between transactions (yet), we just need this
// to be non-nil.
return struct{}{}
}
return nil
}
func (d *rdsImpl) Constraints() ds.Constraints { return constraints.DS() }
func (d *rdsImpl) GetTestable() ds.Testable {
return nil
}