| // Copyright 2015 The LUCI Authors. |
| // |
| // Licensed under the Apache License, Version 2.0 (the "License"); |
| // you may not use this file except in compliance with the License. |
| // You may obtain a copy of the License at |
| // |
| // http://www.apache.org/licenses/LICENSE-2.0 |
| // |
| // Unless required by applicable law or agreed to in writing, software |
| // distributed under the License is distributed on an "AS IS" BASIS, |
| // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| // See the License for the specific language governing permissions and |
| // limitations under the License. |
| |
| package datastore |
| |
| import ( |
| "container/heap" |
| "context" |
| "fmt" |
| "reflect" |
| "sort" |
| |
| "golang.org/x/sync/errgroup" |
| |
| "go.chromium.org/luci/common/data/stringset" |
| "go.chromium.org/luci/common/errors" |
| |
| multicursor "go.chromium.org/luci/gae/service/datastore/internal/protos/multicursor" |
| ) |
| |
| type resolvedRunCallback func(reflect.Value, CursorCB) error |
| |
| func parseRunCallback(cbIface any) (rcb resolvedRunCallback, isKey bool, mat *multiArgType, hasCursorCB bool) { |
| badSig := func() { |
| panic(fmt.Errorf( |
| "cb does not match the required callback signature: `%T` != `func(TYPE, [CursorCB]) [error]`", |
| cbIface)) |
| } |
| |
| if cbIface == nil { |
| badSig() |
| } |
| |
| // TODO(riannucci): Profile and determine if any of this is causing a real |
| // slowdown. Could potentially cache reflection stuff by cbTyp? |
| cbVal := reflect.ValueOf(cbIface) |
| cbTyp := cbVal.Type() |
| |
| if cbTyp.Kind() != reflect.Func { |
| badSig() |
| } |
| |
| numIn := cbTyp.NumIn() |
| if numIn != 1 && numIn != 2 { |
| badSig() |
| } |
| |
| firstArg := cbTyp.In(0) |
| if firstArg == typeOfKey { |
| isKey = true |
| } else { |
| mat = mustParseArg(firstArg, false) |
| if mat.newElem == nil { |
| badSig() |
| } |
| } |
| |
| hasCursorCB = numIn == 2 |
| if hasCursorCB && cbTyp.In(1) != typeOfCursorCB { |
| badSig() |
| } |
| |
| if cbTyp.NumOut() > 1 { |
| badSig() |
| } else if cbTyp.NumOut() == 1 && cbTyp.Out(0) != typeOfError { |
| badSig() |
| } |
| hasErr := cbTyp.NumOut() == 1 |
| |
| // Resolve to generic function. |
| switch { |
| case hasErr && hasCursorCB: |
| // func(reflect.Value, CursorCB) error |
| rcb = func(v reflect.Value, cb CursorCB) error { |
| err := cbVal.Call([]reflect.Value{v, reflect.ValueOf(cb)})[0].Interface() |
| if err != nil { |
| return err.(error) |
| } |
| return nil |
| } |
| |
| case hasErr && !hasCursorCB: |
| // func(reflect.Value) error |
| rcb = func(v reflect.Value, _ CursorCB) error { |
| err := cbVal.Call([]reflect.Value{v})[0].Interface() |
| if err != nil { |
| return err.(error) |
| } |
| return nil |
| } |
| |
| case !hasErr && hasCursorCB: |
| // func(reflect.Value, CursorCB) |
| rcb = func(v reflect.Value, cb CursorCB) error { |
| cbVal.Call([]reflect.Value{v, reflect.ValueOf(cb)}) |
| return nil |
| } |
| |
| case !hasErr && !hasCursorCB: |
| // func(reflect.Value) |
| rcb = func(v reflect.Value, _ CursorCB) error { |
| cbVal.Call([]reflect.Value{v}) |
| return nil |
| } |
| |
| default: |
| badSig() |
| } |
| |
| return |
| } |
| |
| // AllocateIDs allows you to allocate IDs from the datastore without putting |
| // any data. |
| // |
| // A partial valid key will be constructed from each entity's kind and parent, |
| // if present. An allocation will then be performed against the datastore for |
| // each key, and the partial key will be populated with a unique integer ID. |
| // The resulting keys will be applied to their objects using PopulateKey. If |
| // successful, any existing ID will be destroyed. |
| // |
| // If the object is supplied that cannot accept an integer key, this method |
| // will panic. |
| // |
| // ent must be one of: |
| // - *S where S is a struct |
| // - *P where *P is a concrete type implementing PropertyLoadSaver |
| // - []S or []*S where S is a struct |
| // - []P or []*P where *P is a concrete type implementing PropertyLoadSaver |
| // - []I, where I is some interface type. Each element of the slice must have |
| // either *S or *P as its underlying type. |
| // - []*Key, to populate a slice of partial-valid keys. |
| // |
| // nil values (or interface-typed nils) are not allowed, neither as standalone |
| // arguments nor inside slices. Passing them will cause a panic. |
| // |
| // If an error is encountered, the returned error value will depend on the |
| // input arguments. If one argument is supplied, the result will be the |
| // encountered error type. If multiple arguments are supplied, the result will |
| // be a MultiError whose error index corresponds to the argument in which the |
| // error was encountered. |
| // |
| // If an ent argument is a slice, its error type will be a MultiError. Note |
| // that in the scenario where multiple slices are provided, this will return a |
| // MultiError containing a nested MultiError for each slice argument. |
| func AllocateIDs(c context.Context, ent ...any) error { |
| if len(ent) == 0 { |
| return nil |
| } |
| |
| mma, err := makeMetaMultiArg(ent, mmaWriteKeys) |
| if err != nil { |
| panic(err) |
| } |
| |
| keys, _, et := mma.getKeysPMs(GetKeyContext(c), false) |
| if len(keys) == 0 { |
| return nil |
| } |
| |
| var dat DroppedArgTracker |
| dat.MarkNilKeys(keys) |
| keys, dal := dat.DropKeys(keys) |
| |
| // Convert each key to be partial valid, assigning an integer ID of 0. |
| // Confirm that each object can be populated with such a key. |
| for compressedIdx, key := range keys { |
| keys[compressedIdx] = key.Incomplete() |
| } |
| |
| err = Raw(c).AllocateIDs(keys, func(compressedIdx int, key *Key, err error) { |
| idx := dal.OriginalIndex(compressedIdx) |
| |
| index := mma.index(idx) |
| |
| if err != nil { |
| et.trackError(index, err) |
| return |
| } |
| |
| mat, v := mma.get(index) |
| if !mat.setKey(v, key) { |
| et.trackError(index, MakeErrInvalidKey("failed to export key [%s]", key).Err()) |
| return |
| } |
| }) |
| if err == nil { |
| err = et.error() |
| } |
| return maybeSingleError(err, ent) |
| } |
| |
| // KeyForObj extracts a key from src. |
| // |
| // It is the same as KeyForObjErr, except that if KeyForObjErr would have |
| // returned an error, this method panics. It's safe to use if you know that |
| // src statically meets the metadata constraints described by KeyForObjErr. |
| func KeyForObj(c context.Context, src any) *Key { |
| ret, err := KeyForObjErr(c, src) |
| if err != nil { |
| panic(err) |
| } |
| return ret |
| } |
| |
| // KeyForObjErr extracts a key from src. |
| // |
| // src must be one of: |
| // - *S, where S is a struct |
| // - a PropertyLoadSaver |
| // |
| // It is expected that the struct exposes the following metadata (as retrieved |
| // by MetaGetter.GetMeta): |
| // - "key" (type: Key) - The full datastore key to use. Must not be nil. |
| // OR |
| // - "id" (type: int64 or string) - The id of the Key to create. |
| // - "kind" (optional, type: string) - The kind of the Key to create. If |
| // blank or not present, KeyForObjErr will extract the name of the src |
| // object's type. |
| // - "parent" (optional, type: Key) - The parent key to use. |
| // |
| // By default, the metadata will be extracted from the struct and its tagged |
| // properties. However, if the struct implements MetaGetterSetter it is |
| // wholly responsible for exporting the required fields. A struct that |
| // implements GetMeta to make some minor tweaks can evoke the defualt behavior |
| // by using GetPLS(s).GetMeta. |
| // |
| // If a required metadata item is missing or of the wrong type, then this will |
| // return an error. |
| func KeyForObjErr(c context.Context, src any) (*Key, error) { |
| return GetKeyContext(c).NewKeyFromMeta(getMGS(src)) |
| } |
| |
| // MakeKey is a convenience method for manufacturing a *Key. It should only be |
| // used when elems... is known statically (e.g. in the code) to be correct. |
| // |
| // elems is pairs of (string, string|int|int32|int64) pairs, which correspond |
| // to Kind/id pairs. Example: |
| // |
| // dstore.MakeKey("Parent", 1, "Child", "id") |
| // |
| // Would create the key: |
| // |
| // <current appID>:<current Namespace>:/Parent,1/Child,id |
| // |
| // If elems is not parsable (e.g. wrong length, wrong types, etc.) this method |
| // will panic. |
| func MakeKey(c context.Context, elems ...any) *Key { |
| kc := GetKeyContext(c) |
| return kc.MakeKey(elems...) |
| } |
| |
| // NewKey constructs a new key in the current appID/Namespace, using the |
| // specified parameters. |
| func NewKey(c context.Context, kind, stringID string, intID int64, parent *Key) *Key { |
| kc := GetKeyContext(c) |
| return kc.NewKey(kind, stringID, intID, parent) |
| } |
| |
| // NewIncompleteKeys allocates count incomplete keys sharing the same kind and |
| // parent. It is useful as input to AllocateIDs. |
| func NewIncompleteKeys(c context.Context, count int, kind string, parent *Key) (keys []*Key) { |
| kc := GetKeyContext(c) |
| if count > 0 { |
| keys = make([]*Key, count) |
| for i := range keys { |
| keys[i] = kc.NewKey(kind, "", 0, parent) |
| } |
| } |
| return |
| } |
| |
| // NewKeyToks constructs a new key in the current appID/Namespace, using the |
| // specified key tokens. |
| func NewKeyToks(c context.Context, toks []KeyTok) *Key { |
| kc := GetKeyContext(c) |
| return kc.NewKeyToks(toks) |
| } |
| |
| // PopulateKey loads key into obj. |
| // |
| // obj is any object that Interface.Get is able to accept. |
| // |
| // Upon successful application, this method will return true. If the key could |
| // not be applied to the object, this method will return false. It will panic if |
| // obj is an invalid datastore model. |
| func PopulateKey(obj any, key *Key) bool { |
| return populateKeyMGS(getMGS(obj), key) |
| } |
| |
| func populateKeyMGS(mgs MetaGetterSetter, key *Key) bool { |
| setViaKey := mgs.SetMeta("key", key) |
| |
| lst := key.LastTok() |
| mgs.SetMeta("kind", lst.Kind) |
| mgs.SetMeta("parent", key.Parent()) |
| |
| setViaID := false |
| if lst.StringID != "" { |
| setViaID = mgs.SetMeta("id", lst.StringID) |
| } else { |
| setViaID = mgs.SetMeta("id", lst.IntID) |
| } |
| |
| return setViaKey || setViaID |
| } |
| |
| // RunInTransaction runs f inside of a transaction. See the appengine SDK's |
| // documentation for full details on the behavior of transactions in the |
| // datastore. |
| // |
| // Note that the behavior of transactions may change depending on what filters |
| // have been installed. It's possible that we'll end up implementing things |
| // like nested/buffered transactions as filters. |
| func RunInTransaction(c context.Context, f func(c context.Context) error, opts *TransactionOptions) error { |
| return Raw(c).RunInTransaction(f, opts) |
| } |
| |
| // Run executes the given query, and calls `cb` for each successfully |
| // retrieved item. |
| // |
| // By default, datastore applies a short (~5s) timeout to queries. This can be |
| // increased, usually to around several minutes, by explicitly setting a |
| // deadline on the supplied Context. |
| // |
| // cb is a callback function whose signature is |
| // |
| // func(obj TYPE[, getCursor CursorCB]) [error] |
| // |
| // Where TYPE is one of: |
| // - S or *S, where S is a struct |
| // - P or *P, where *P is a concrete type implementing PropertyLoadSaver |
| // - *Key (implies a keys-only query) |
| // |
| // If the error is omitted from the signature, this will run until the query |
| // returns all its results, or has an error/times out. |
| // |
| // If error is in the signature, the query will continue as long as the |
| // callback returns nil. If it returns `Stop`, the query will stop and Run |
| // will return nil. Otherwise, the query will stop and Run will return the |
| // user's error. |
| // |
| // Run may also stop on the first datastore error encountered, which can occur |
| // due to flakiness, timeout, etc. If it encounters such an error, it will |
| // be returned. |
| func Run(c context.Context, q *Query, cb any) error { |
| rcb, isKey, mat, _ := parseRunCallback(cb) |
| |
| if isKey { |
| q = q.KeysOnly(true) |
| } |
| fq, err := q.Finalize() |
| if err != nil { |
| return err |
| } |
| |
| raw := Raw(c) |
| |
| if isKey { |
| err = raw.Run(fq, func(k *Key, _ PropertyMap, gc CursorCB) error { |
| return rcb(reflect.ValueOf(k), gc) |
| }) |
| } else { |
| err = raw.Run(fq, func(k *Key, pm PropertyMap, gc CursorCB) error { |
| itm := mat.newElem() |
| if err := mat.setPM(itm, pm); err != nil { |
| return err |
| } |
| mat.setKey(itm, k) |
| return rcb(itm, gc) |
| }) |
| } |
| return filterStop(err) |
| } |
| |
| // RunMulti executes the logical OR of multiple queries, calling `cb` for each |
| // unique entity (by *Key) that it finds. Results will be returned in the order |
| // of the provided queries; All queries must have matching Orders. |
| // |
| // cb is a callback function (please refer to the `Run` function comments for |
| // formats and restrictions for `cb` in this file). |
| // |
| // The cursor that is returned by the callback cannot be used on a single query |
| // by doing `query.Start(cursor)` (In some cases it may not even complain when |
| // you try to do this. But the results are undefined). Apply the cursor to the |
| // same list of queries using ApplyCursors. |
| // |
| // Note: projection queries are not supported, as they are non-trivial in |
| // complexity and haven't been needed yet. |
| // |
| // Note: The cb is called for every unique entity (by *Key) that is retrieved |
| // on the current run. It is possible to get the same entity twice over two |
| // calls to RunMulti with different cursors. |
| // |
| // DANGER: Cursors are buggy when using Cloud Datastore production backend. |
| // Paginated queries skip entities sitting on page boundaries. This doesn't |
| // happen when using `impl/memory` and thus hard to spot in unit tests. See |
| // queryIterator doc for more details. |
| func RunMulti(c context.Context, queries []*Query, cb any) error { |
| rcb, isKey, mat, hasCursorCB := parseRunCallback(cb) |
| |
| // A helper that passes an entity to the user callback. |
| var dispatchEntity func(key *Key, pm PropertyMap, ccb CursorCB) error |
| if isKey { |
| dispatchEntity = func(key *Key, _ PropertyMap, ccb CursorCB) error { |
| return rcb(reflect.ValueOf(key), ccb) |
| } |
| } else { |
| dispatchEntity = func(key *Key, pm PropertyMap, ccb CursorCB) error { |
| itm := mat.newElem() |
| if err := mat.setPM(itm, pm); err != nil { |
| return err |
| } |
| mat.setKey(itm, key) |
| return rcb(itm, ccb) |
| } |
| } |
| |
| // Finalize queries and do some basic validation. At very least queries must |
| // use the same kind and ordering, otherwise putting their results in a single |
| // sorted heap makes no sense. |
| finalized := make([]*FinalizedQuery, len(queries)) |
| overallKind := "" |
| overallOrder := "" |
| for i, q := range queries { |
| if isKey { |
| q = q.KeysOnly(true) |
| } |
| fq, err := q.Finalize() |
| if err != nil { |
| return err |
| } |
| finalized[i] = fq |
| // Build a string identifying ordering of this query, e.g. |
| // "-field1,field2,__key__". |
| order := "" |
| for j, col := range fq.orders { |
| if j != 0 { |
| order += "," |
| } |
| order += col.String() |
| } |
| switch { |
| case i == 0: |
| overallKind = fq.kind |
| overallOrder = order |
| case fq.kind != overallKind: |
| return fmt.Errorf("all RunMulti queries should query the same kind, but got %q and %q", fq.kind, overallKind) |
| case order != overallOrder: |
| return fmt.Errorf("all RunMulti queries should use the same order, but got %q and %q", order, overallOrder) |
| } |
| } |
| |
| // No queries to run => no results to return. This is an edge case. |
| if len(finalized) == 0 { |
| return nil |
| } |
| |
| // If we have only one query, just run it directly without any extra |
| // synchronization overhead. Just make sure to use the correct cursor format. |
| // This is worth optimizing since running only one query is a very very |
| // common case. |
| if len(finalized) == 1 { |
| var err error |
| if hasCursorCB { |
| err = Raw(c).Run(finalized[0], func(key *Key, pm PropertyMap, cursorCB CursorCB) error { |
| return dispatchEntity(key, pm, func() (Cursor, error) { |
| cur, err := cursorCB() |
| if err != nil { |
| return nil, err |
| } |
| cursorStr := "" |
| if cur != nil { |
| cursorStr = cur.String() |
| } |
| return multiCursor{ |
| curs: &multicursor.Cursors{ |
| Version: multiCursorVersion, |
| MagicNumber: multiCursorMagic, |
| Cursors: []string{cursorStr}, |
| }, |
| }, nil |
| }) |
| }) |
| } else { |
| err = Raw(c).Run(finalized[0], func(key *Key, pm PropertyMap, _ CursorCB) error { |
| return dispatchEntity(key, pm, nil) |
| }) |
| } |
| return filterStop(err) |
| } |
| |
| // All iterators (active and exhausted) in some arbitrary order. |
| iterators := make([]*queryIterator, 0, len(finalized)) |
| |
| c, cancel := context.WithCancel(c) |
| eg, ectx := errgroup.WithContext(c) |
| |
| // Make sure all spawned goroutines have fully stopped before returning. |
| defer func() { |
| // Signal all iterators to stop ASAP. |
| cancel() |
| // Wait for all of them to stop. Calling Next makes sure internal goroutines |
| // are not getting stuck trying to write to a channel that nothing is |
| // reading from (this blocks forever). |
| for _, iter := range iterators { |
| for done := false; !done; done, _ = iter.Next() { |
| } |
| } |
| // All goroutines should be stopping now. Wait until they are fully stopped. |
| _ = eg.Wait() |
| }() |
| |
| // Launch all queries in parallel. Do it before ordering them as a heap, since |
| // to build a heap we need to have the first result from each query. We want |
| // all such first results to be fetched *in parallel*. |
| for _, fq := range finalized { |
| iterators = append(iterators, startQueryIterator(ectx, eg, fq)) |
| } |
| |
| // Wait for first items from all iterators. Gather all non-exhausted iterators |
| // to make a sorted heap out of them. |
| iHeap := make(iteratorHeap, 0, len(iterators)) |
| for _, iter := range iterators { |
| switch done, err := iter.Next(); { |
| case err != nil: |
| return err // the defer will clean up everything |
| case !done: |
| iHeap = append(iHeap, iter) |
| } |
| } |
| heap.Init(&iHeap) |
| |
| // ccb is the cursor callback for RunMulti. This grabs all the cursors for the |
| // queries involved and returns a single cursor. It is only executed if the |
| // user callback invokes it. |
| var ccb CursorCB |
| if hasCursorCB { |
| ccb = func() (Cursor, error) { |
| // Sort the list of queries. It is OK to update `iterators` in-place here. |
| // It is only used in the defer, the order doesn't matter there. |
| sort.Slice(iterators, func(i, j int) bool { |
| queryI := iterators[i].Query() |
| queryJ := iterators[j].Query() |
| return queryI.Less(queryJ) |
| }) |
| // Create the cursor. It points to all items currently sitting in heap. |
| // We'll need to refetch them all again to repopulate the heap when |
| // resuming the query. |
| var curs multicursor.Cursors |
| curs.MagicNumber = multiCursorMagic |
| curs.Version = multiCursorVersion |
| for _, iter := range iterators { |
| switch cur, err := iter.CurrentCursor(); { |
| case err != nil: |
| return nil, err |
| case cur != nil: |
| curs.Cursors = append(curs.Cursors, cur.String()) |
| default: |
| curs.Cursors = append(curs.Cursors, "") |
| } |
| } |
| return multiCursor{curs: &curs}, nil |
| } |
| } |
| |
| // If queries are ordered only by key, all duplicates will be returned from |
| // the heap one after another and we can use a simple check to skip them. This |
| // is important for CountMulti(...) that can be visiting tens of thousands |
| // of entities: storing them all in a hash map for deduplication is a waste of |
| // memory. |
| // |
| // Use a hash map for any other ordering. There may be weird results if this |
| // is running non-transactionally and two different subqueries see two |
| // different versions of the same entity (with different values of fields |
| // affecting the order). Such entity will appear twice in the output, with |
| // some other entities in between these appearances. A simple check will not |
| // detect such deduplication. |
| var seenKey func(keyStr string) bool |
| if overallOrder == "__key__" || overallOrder == "-__key__" { |
| lastSeen := "" |
| seenKey = func(keyStr string) bool { |
| if lastSeen == keyStr { |
| return true |
| } |
| lastSeen = keyStr |
| return false |
| } |
| } else { |
| seenKeys := stringset.New(128) |
| seenKey = func(keyStr string) bool { |
| return !seenKeys.Add(keyStr) |
| } |
| } |
| |
| // Merge query results. |
| for iHeap.Len() > 0 { |
| pm, key, keyStr, err := iHeap.nextData() |
| if err != nil { |
| return err |
| } |
| if !seenKey(keyStr) { |
| if err := dispatchEntity(key, pm, ccb); err != nil { |
| return filterStop(err) |
| } |
| } |
| } |
| return nil |
| } |
| |
| // Count executes the given query and returns the number of entries which |
| // match it. |
| // |
| // If the query is marked as eventually consistent via EventualConsistency(true) |
| // will use a fast server-side aggregation, with the downside that such queries |
| // may return slightly stale results and can't be used inside transactions. |
| // |
| // If the query is strongly consistent, will essentially do a full keys-only |
| // query and count the number of matches locally. |
| func Count(c context.Context, q *Query) (int64, error) { |
| fq, err := q.Finalize() |
| if err != nil { |
| return 0, err |
| } |
| v, err := Raw(c).Count(fq) |
| return v, filterStop(err) |
| } |
| |
| // CountMulti runs multiple queries in parallel and counts the total number of |
| // unique entities produced by them. |
| // |
| // Unlike Count, this method doesn't support server-side aggregation. It always |
| // does full keys-only queries. If you have only one query and don't care about |
| // strong consistency, use `Count(c, q.EventualConsistency(true))`: it will use |
| // the server-side aggregation which is orders of magnitude faster than the |
| // local counting. |
| func CountMulti(c context.Context, queries []*Query) (int64, error) { |
| var count int64 |
| err := RunMulti(c, queries, |
| func(_ *Key) error { |
| // RunMulti already does deduplication, we just need to count unique hits. |
| count++ |
| return nil |
| }, |
| ) |
| if err != nil { |
| return 0, err |
| } |
| return count, nil |
| } |
| |
| // DecodeCursor converts a string returned by a Cursor into a Cursor instance. |
| // It will return an error if the supplied string is not valid, or could not |
| // be decoded by the implementation. |
| func DecodeCursor(c context.Context, s string) (Cursor, error) { |
| return Raw(c).DecodeCursor(s) |
| } |
| |
| // GetAll retrieves all of the Query results into dst. |
| // |
| // By default, datastore applies a short (~5s) timeout to queries. This can be |
| // increased, usually to around several minutes, by explicitly setting a |
| // deadline on the supplied Context. |
| // |
| // dst must be one of: |
| // - *[]S or *[]*S, where S is a struct |
| // - *[]P or *[]*P, where *P is a concrete type implementing |
| // PropertyLoadSaver |
| // - *[]*Key implies a keys-only query. |
| func GetAll(c context.Context, q *Query, dst any) error { |
| return getAllRaw(Raw(c), q, dst) |
| } |
| |
| func getAllRaw(raw RawInterface, q *Query, dst any) error { |
| v := reflect.ValueOf(dst) |
| if v.Kind() != reflect.Ptr { |
| panic(fmt.Errorf("invalid GetAll dst: must have a ptr-to-slice: %T", dst)) |
| } |
| if !v.IsValid() || v.IsNil() { |
| panic(errors.New("invalid GetAll dst: <nil>")) |
| } |
| |
| if keys, ok := dst.(*[]*Key); ok { |
| fq, err := q.KeysOnly(true).Finalize() |
| if err != nil { |
| return err |
| } |
| |
| return raw.Run(fq, func(k *Key, _ PropertyMap, _ CursorCB) error { |
| *keys = append(*keys, k) |
| return nil |
| }) |
| } |
| fq, err := q.Finalize() |
| if err != nil { |
| return err |
| } |
| |
| slice := v.Elem() |
| mat := mustParseMultiArg(slice.Type()) |
| if mat.newElem == nil { |
| panic(fmt.Errorf("invalid GetAll dst (non-concrete element type): %T", dst)) |
| } |
| |
| errs := map[int]error{} |
| i := 0 |
| err = filterStop(raw.Run(fq, func(k *Key, pm PropertyMap, _ CursorCB) error { |
| slice.Set(reflect.Append(slice, mat.newElem())) |
| itm := slice.Index(i) |
| mat.setKey(itm, k) |
| err := mat.setPM(itm, pm) |
| if err != nil { |
| errs[i] = err |
| } |
| i++ |
| return nil |
| })) |
| if err == nil { |
| if len(errs) > 0 { |
| me := make(errors.MultiError, slice.Len()) |
| for i, e := range errs { |
| me[i] = e |
| } |
| err = me |
| } |
| } |
| return err |
| } |
| |
| // Exists tests if the supplied objects are present in the datastore. |
| // |
| // ent must be one of: |
| // - *S, where S is a struct |
| // - *P, where *P is a concrete type implementing PropertyLoadSaver |
| // - []S or []*S, where S is a struct |
| // - []P or []*P, where *P is a concrete type implementing PropertyLoadSaver |
| // - []I, where I is some interface type. Each element of the slice must have |
| // either *S or *P as its underlying type. |
| // - *Key, to check a specific key from the datastore. |
| // - []*Key, to check a slice of keys from the datastore. |
| // |
| // nil values (or interface-typed nils) are not allowed, neither as standalone |
| // arguments nor inside slices. Passing them will cause a panic. |
| // |
| // If an error is encountered, the returned error value will depend on the |
| // input arguments. If one argument is supplied, the result will be the |
| // encountered error type. If multiple arguments are supplied, the result will |
| // be a MultiError whose error index corresponds to the argument in which the |
| // error was encountered. |
| // |
| // If an ent argument is a slice, its error type will be a MultiError. Note |
| // that in the scenario, where multiple slices are provided, this will return a |
| // MultiError containing a nested MultiError for each slice argument. |
| func Exists(c context.Context, ent ...any) (*ExistsResult, error) { |
| if len(ent) == 0 { |
| return nil, nil |
| } |
| |
| mma, err := makeMetaMultiArg(ent, mmaKeysOnly) |
| if err != nil { |
| panic(err) |
| } |
| |
| keys, _, et := mma.getKeysPMs(GetKeyContext(c), false) |
| if len(keys) == 0 { |
| return nil, nil |
| } |
| |
| var dat DroppedArgTracker |
| dat.MarkNilKeys(keys) |
| keys, dal := dat.DropKeys(keys) |
| |
| bt := newBoolTracker(mma, et) |
| err = Raw(c).GetMulti(keys, nil, func(compressedIdx int, _ PropertyMap, err error) { |
| idx := dal.OriginalIndex(compressedIdx) |
| bt.trackExistsResult(mma.index(idx), err) |
| }) |
| |
| if err == nil { |
| err = bt.error() |
| } |
| return bt.result(), maybeSingleError(err, ent) |
| } |
| |
| // Get retrieves objects from the datastore. |
| // |
| // Each element in dst must be one of: |
| // - *S, where S is a struct |
| // - *P, where *P is a concrete type implementing PropertyLoadSaver |
| // - []S or []*S, where S is a struct |
| // - []P or []*P, where *P is a concrete type implementing PropertyLoadSaver |
| // - []I, where I is some interface type. Each element of the slice must have |
| // either *S or *P as its underlying type. |
| // |
| // nil values (or interface-typed nils) are not allowed, neither as standalone |
| // arguments nor inside slices. Passing them will cause a panic. |
| // |
| // If an error is encountered, the returned error value will depend on the |
| // input arguments. If one argument is supplied, the result will be the |
| // encountered error type. If multiple arguments are supplied, the result will |
| // be a MultiError whose error index corresponds to the argument in which the |
| // error was encountered. |
| // |
| // If a dst argument is a slice, its error type will be a MultiError. Note |
| // that in the scenario where multiple slices are provided, this will return a |
| // MultiError containing a nested MultiError for each slice argument. |
| // |
| // If there was an issue retrieving the entity, the input `dst` objects will |
| // not be affected. This means that you can populate an object for dst with some |
| // values, do a Get, and on an ErrNoSuchEntity, do a Put (inside a transaction, |
| // of course :)). |
| func Get(c context.Context, dst ...any) error { |
| if len(dst) == 0 { |
| return nil |
| } |
| |
| mma, err := makeMetaMultiArg(dst, mmaReadWrite) |
| if err != nil { |
| panic(err) |
| } |
| |
| keys, pms, et := mma.getKeysPMs(GetKeyContext(c), true) |
| if len(keys) == 0 { |
| return nil |
| } |
| |
| var dat DroppedArgTracker |
| dat.MarkNilKeysVals(keys, pms) |
| keys, pms, dal := dat.DropKeysAndVals(keys, pms) |
| |
| meta := NewMultiMetaGetter(pms) |
| err = Raw(c).GetMulti(keys, meta, func(compressedIdx int, pm PropertyMap, err error) { |
| idx := dal.OriginalIndex(compressedIdx) |
| index := mma.index(idx) |
| if err != nil { |
| et.trackError(index, err) |
| return |
| } |
| |
| mat, v := mma.get(index) |
| if err := mat.setPM(v, pm); err != nil { |
| et.trackError(index, err) |
| return |
| } |
| }) |
| |
| if err == nil { |
| err = et.error() |
| } |
| return maybeSingleError(err, dst) |
| } |
| |
| // Put writes objects into the datastore. |
| // |
| // src must be one of: |
| // - *S, where S is a struct |
| // - *P, where *P is a concrete type implementing PropertyLoadSaver |
| // - []S or []*S, where S is a struct |
| // - []P or []*P, where *P is a concrete type implementing PropertyLoadSaver |
| // - []I, where I is some interface type. Each element of the slice must have |
| // either *S or *P as its underlying type. |
| // |
| // nil values (or interface-typed nils) are not allowed, neither as standalone |
| // arguments nor inside slices. Passing them will cause a panic. |
| // |
| // A *Key will be extracted from src via KeyForObj. If |
| // extractedKey.IsIncomplete() is true, and the object is put to the datastore |
| // successfully, then Put will write the resolved (datastore-generated) *Key |
| // back to src. |
| // |
| // NOTE: The datastore only autogenerates *Keys with integer IDs. Only models |
| // which use a raw `$key` or integer-typed `$id` field are elegible for this. |
| // A model with a string-typed `$id` field will not accept an integer id'd *Key |
| // and will cause the Put to fail. |
| // |
| // If an error is encountered, the returned error value will depend on the |
| // input arguments. If one argument is supplied, the result will be the |
| // encountered error type. If multiple arguments are supplied, the result will |
| // be a MultiError whose error index corresponds to the argument in which the |
| // error was encountered. |
| // |
| // If a src argument is a slice, its error type will be a MultiError. Note |
| // that in the scenario where multiple slices are provided, this will return a |
| // MultiError containing a nested MultiError for each slice argument. |
| func Put(c context.Context, src ...any) error { |
| return putRaw(Raw(c), GetKeyContext(c), src) |
| } |
| |
| func putRaw(raw RawInterface, kctx KeyContext, src []any) error { |
| if len(src) == 0 { |
| return nil |
| } |
| |
| mma, err := makeMetaMultiArg(src, mmaReadWrite) |
| if err != nil { |
| panic(err) |
| } |
| |
| keys, vals, et := mma.getKeysPMs(kctx, false) |
| if len(keys) == 0 { |
| return nil |
| } |
| |
| var dat DroppedArgTracker |
| dat.MarkNilKeysVals(keys, vals) |
| keys, vals, dal := dat.DropKeysAndVals(keys, vals) |
| |
| err = raw.PutMulti(keys, vals, func(compressedIdx int, key *Key, err error) { |
| idx := dal.OriginalIndex(compressedIdx) |
| index := mma.index(idx) |
| |
| if err != nil { |
| et.trackError(index, err) |
| return |
| } |
| |
| if !key.Equal(keys[compressedIdx]) { |
| mat, v := mma.get(index) |
| mat.setKey(v, key) |
| } |
| }) |
| if err == nil { |
| err = et.error() |
| } |
| return maybeSingleError(err, src) |
| } |
| |
| // Delete removes the supplied entities from the datastore. |
| // |
| // ent must be one of: |
| // - *S, where S is a struct |
| // - *P, where *P is a concrete type implementing PropertyLoadSaver |
| // - []S or []*S, where S is a struct |
| // - []P or []*P, where *P is a concrete type implementing PropertyLoadSaver |
| // - []I, where I is some interface type. Each element of the slice must have |
| // either *S or *P as its underlying type. |
| // - *Key, to remove a specific key from the datastore. |
| // - []*Key, to remove a slice of keys from the datastore. |
| // |
| // nil values (or interface-typed nils) are not allowed, neither as standalone |
| // arguments nor inside slices. Passing them will cause a panic. |
| // |
| // If an error is encountered, the returned error value will depend on the |
| // input arguments. If one argument is supplied, the result will be the |
| // encountered error type. If multiple arguments are supplied, the result will |
| // be a MultiError whose error index corresponds to the argument in which the |
| // error was encountered. |
| // |
| // If an ent argument is a slice, its error type will be a MultiError. Note |
| // that in the scenario where multiple slices are provided, this will return a |
| // MultiError containing a nested MultiError for each slice argument. |
| func Delete(c context.Context, ent ...any) error { |
| if len(ent) == 0 { |
| return nil |
| } |
| |
| mma, err := makeMetaMultiArg(ent, mmaKeysOnly) |
| if err != nil { |
| panic(err) |
| } |
| |
| keys, _, et := mma.getKeysPMs(GetKeyContext(c), false) |
| if len(keys) == 0 { |
| return nil |
| } |
| |
| var dat DroppedArgTracker |
| dat.MarkNilKeys(keys) |
| keys, dal := dat.DropKeys(keys) |
| |
| err = Raw(c).DeleteMulti(keys, func(compressedIdx int, err error) { |
| idx := dal.OriginalIndex(compressedIdx) |
| |
| if err != nil { |
| index := mma.index(idx) |
| et.trackError(index, err) |
| } |
| }) |
| |
| if err == nil { |
| err = et.error() |
| } |
| return maybeSingleError(err, ent) |
| } |
| |
| // GetTestable returns the Testable interface for the implementation, or nil if |
| // there is none. |
| func GetTestable(c context.Context) Testable { |
| return Raw(c).GetTestable() |
| } |
| |
| // maybeSingleError normalizes the error experience between single- and |
| // multi-element API calls. |
| // |
| // Single-element API calls will return a single error for that element, while |
| // multi-element API calls will return a MultiError, one for each element. This |
| // accepts the slice of elements that is being operated on and determines what |
| // sort of error to return. |
| func maybeSingleError(err error, elems []any) error { |
| if err == nil { |
| return nil |
| } |
| if len(elems) == 1 { |
| return errors.SingleError(err) |
| } |
| return err |
| } |
| |
| func filterStop(err error) error { |
| if err == Stop { |
| err = nil |
| } |
| return err |
| } |
| |
| // a min heap for a slice of queryIterator. |
| // |
| // All iterators are in "not done" state. |
| type iteratorHeap []*queryIterator |
| |
| var _ heap.Interface = &iteratorHeap{} |
| |
| func (h iteratorHeap) Len() int { return len(h) } |
| |
| func (h iteratorHeap) Less(i, j int) bool { return h[i].CurrentItemOrder() < h[j].CurrentItemOrder() } |
| |
| func (h iteratorHeap) Swap(i, j int) { h[i], h[j] = h[j], h[i] } |
| |
| func (h *iteratorHeap) Push(x any) { |
| *h = append(*h, x.(*queryIterator)) |
| } |
| |
| func (h *iteratorHeap) Pop() any { |
| old := *h |
| n := len(old) |
| item := old[n-1] |
| *h = old[0 : n-1] |
| return item |
| } |
| |
| // nextData returns data of the peak queryIterator, advances the queryIterator |
| // and either removes it from the heap (if it has no results left) or adjusts |
| // its position in the heap. |
| // |
| // Must be called only with a non-empty heap. |
| func (h *iteratorHeap) nextData() (pm PropertyMap, key *Key, keyStr string, err error) { |
| if len(*h) == 0 { |
| panic("the heap is empty") |
| } |
| |
| qi := (*h)[0] |
| key, pm = qi.CurrentItem() |
| keyStr = qi.CurrentItemKey() |
| |
| var done bool |
| done, err = qi.Next() |
| if !done { |
| heap.Fix(h, 0) |
| } else { |
| heap.Remove(h, 0) |
| } |
| |
| return |
| } |