| // Copyright 2015 The Chromium Authors. All rights reserved. |
| // Use of this source code is governed by a BSD-style license that can be |
| // found in the LICENSE file. |
| |
| package prod |
| |
| import ( |
| ds "github.com/luci/gae/service/datastore" |
| "github.com/luci/gae/service/info" |
| "github.com/luci/luci-go/common/errors" |
| "golang.org/x/net/context" |
| "google.golang.org/appengine/datastore" |
| ) |
| |
| // useRDS adds a gae.RawDatastore implementation to context, accessible |
| // by gae.GetDS(c) |
| func useRDS(c context.Context) context.Context { |
| return ds.SetRawFactory(c, func(ci context.Context, wantTxn bool) ds.RawInterface { |
| ns := info.Get(ci).GetNamespace() |
| maybeTxnCtx := AEContext(ci) |
| |
| if wantTxn { |
| return rdsImpl{ci, maybeTxnCtx, ns} |
| } |
| aeCtx := AEContextNoTxn(ci) |
| if maybeTxnCtx != aeCtx { |
| ci = context.WithValue(ci, prodContextKey, aeCtx) |
| } |
| return rdsImpl{ci, aeCtx, ns} |
| }) |
| } |
| |
| ////////// Datastore |
| |
| type rdsImpl struct { |
| // userCtx is the context that has the luci/gae services and user objects in |
| // it. |
| userCtx context.Context |
| |
| // aeCtx is the context with the appengine connection information in it. |
| aeCtx context.Context |
| |
| ns string |
| } |
| |
| func idxCallbacker(err error, amt int, cb func(idx int, err error)) error { |
| if err == nil { |
| for i := 0; i < amt; i++ { |
| cb(i, nil) |
| } |
| return nil |
| } |
| err = errors.Fix(err) |
| me, ok := err.(errors.MultiError) |
| if ok { |
| for i, err := range me { |
| cb(i, err) |
| } |
| return nil |
| } |
| return err |
| } |
| |
| func (d rdsImpl) AllocateIDs(incomplete *ds.Key, n int) (start int64, err error) { |
| par, err := dsF2R(d.aeCtx, incomplete.Parent()) |
| if err != nil { |
| return |
| } |
| |
| start, _, err = datastore.AllocateIDs(d.aeCtx, incomplete.Kind(), par, n) |
| return |
| } |
| |
| func (d rdsImpl) DeleteMulti(ks []*ds.Key, cb ds.DeleteMultiCB) error { |
| keys, err := dsMF2R(d.aeCtx, ks) |
| if err == nil { |
| err = datastore.DeleteMulti(d.aeCtx, keys) |
| } |
| return idxCallbacker(err, len(ks), func(_ int, err error) { |
| cb(err) |
| }) |
| } |
| |
| func (d rdsImpl) GetMulti(keys []*ds.Key, _meta ds.MultiMetaGetter, cb ds.GetMultiCB) error { |
| vals := make([]datastore.PropertyLoadSaver, len(keys)) |
| rkeys, err := dsMF2R(d.aeCtx, keys) |
| if err == nil { |
| for i := range keys { |
| vals[i] = &typeFilter{d.aeCtx, ds.PropertyMap{}} |
| } |
| err = datastore.GetMulti(d.aeCtx, rkeys, vals) |
| } |
| return idxCallbacker(err, len(keys), func(idx int, err error) { |
| if pls := vals[idx]; pls != nil { |
| cb(pls.(*typeFilter).pm, err) |
| } else { |
| cb(nil, err) |
| } |
| }) |
| } |
| |
| func (d rdsImpl) PutMulti(keys []*ds.Key, vals []ds.PropertyMap, cb ds.PutMultiCB) error { |
| rkeys, err := dsMF2R(d.aeCtx, keys) |
| if err == nil { |
| rvals := make([]datastore.PropertyLoadSaver, len(vals)) |
| for i, val := range vals { |
| rvals[i] = &typeFilter{d.aeCtx, val} |
| } |
| rkeys, err = datastore.PutMulti(d.aeCtx, rkeys, rvals) |
| } |
| return idxCallbacker(err, len(keys), func(idx int, err error) { |
| k := (*ds.Key)(nil) |
| if err == nil { |
| k = dsR2F(rkeys[idx]) |
| } |
| cb(k, err) |
| }) |
| } |
| |
| func (d rdsImpl) fixQuery(fq *ds.FinalizedQuery) (*datastore.Query, error) { |
| ret := datastore.NewQuery(fq.Kind()) |
| |
| start, end := fq.Bounds() |
| if start != nil { |
| ret = ret.Start(start.(datastore.Cursor)) |
| } |
| if end != nil { |
| ret = ret.End(end.(datastore.Cursor)) |
| } |
| |
| for prop, vals := range fq.EqFilters() { |
| if prop == "__ancestor__" { |
| p, err := dsF2RProp(d.aeCtx, vals[0]) |
| if err != nil { |
| return nil, err |
| } |
| ret = ret.Ancestor(p.Value.(*datastore.Key)) |
| } else { |
| filt := prop + "=" |
| for _, v := range vals { |
| p, err := dsF2RProp(d.aeCtx, v) |
| if err != nil { |
| return nil, err |
| } |
| |
| ret = ret.Filter(filt, p.Value) |
| } |
| } |
| } |
| |
| if lnam, lop, lprop := fq.IneqFilterLow(); lnam != "" { |
| p, err := dsF2RProp(d.aeCtx, lprop) |
| if err != nil { |
| return nil, err |
| } |
| ret = ret.Filter(lnam+" "+lop, p.Value) |
| } |
| |
| if hnam, hop, hprop := fq.IneqFilterHigh(); hnam != "" { |
| p, err := dsF2RProp(d.aeCtx, hprop) |
| if err != nil { |
| return nil, err |
| } |
| ret = ret.Filter(hnam+" "+hop, p.Value) |
| } |
| |
| if fq.EventuallyConsistent() { |
| ret = ret.EventualConsistency() |
| } |
| |
| if fq.KeysOnly() { |
| ret = ret.KeysOnly() |
| } |
| |
| if lim, ok := fq.Limit(); ok { |
| ret = ret.Limit(int(lim)) |
| } |
| |
| if off, ok := fq.Offset(); ok { |
| ret = ret.Offset(int(off)) |
| } |
| |
| for _, o := range fq.Orders() { |
| ret = ret.Order(o.String()) |
| } |
| |
| ret = ret.Project(fq.Project()...) |
| if fq.Distinct() { |
| ret = ret.Distinct() |
| } |
| |
| return ret, nil |
| } |
| |
| func (d rdsImpl) DecodeCursor(s string) (ds.Cursor, error) { |
| return datastore.DecodeCursor(s) |
| } |
| |
| func (d rdsImpl) Run(fq *ds.FinalizedQuery, cb ds.RawRunCB) error { |
| q, err := d.fixQuery(fq) |
| if err != nil { |
| return err |
| } |
| |
| t := q.Run(d.aeCtx) |
| |
| cfunc := func() (ds.Cursor, error) { |
| return t.Cursor() |
| } |
| tf := typeFilter{} |
| for { |
| k, err := t.Next(&tf) |
| if err == datastore.Done { |
| return nil |
| } |
| if err != nil { |
| return err |
| } |
| if !cb(dsR2F(k), tf.pm, cfunc) { |
| return nil |
| } |
| } |
| } |
| |
| func (d rdsImpl) Count(fq *ds.FinalizedQuery) (int64, error) { |
| q, err := d.fixQuery(fq) |
| if err != nil { |
| return 0, err |
| } |
| ret, err := q.Count(d.aeCtx) |
| return int64(ret), err |
| } |
| |
| func (d rdsImpl) RunInTransaction(f func(c context.Context) error, opts *ds.TransactionOptions) error { |
| ropts := (*datastore.TransactionOptions)(opts) |
| return datastore.RunInTransaction(d.aeCtx, func(c context.Context) error { |
| return f(context.WithValue(d.userCtx, prodContextKey, c)) |
| }, ropts) |
| } |
| |
| func (d rdsImpl) Testable() ds.Testable { |
| return nil |
| } |