// blob: 80f0b7133e8428be11f90c64e1fa91f7df493deb [file] [log] [blame]
/*
* Copyright (c) 2012 The Goon Authors
*
* Permission to use, copy, modify, and distribute this software for any
* purpose with or without fee is hereby granted, provided that the above
* copyright notice and this permission notice appear in all copies.
*
* THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
* WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
* MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
* ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
* WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
* ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
* OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
*/
package goon
import (
"context"
"encoding/ascii85"
"fmt"
"net/http"
"path/filepath"
"reflect"
"runtime"
"sync"
"time"
"golang.org/x/crypto/blake2b"
"google.golang.org/appengine/v2"
"google.golang.org/appengine/v2/datastore"
"google.golang.org/appengine/v2/log"
"google.golang.org/appengine/v2/memcache"
)
var (
// LogErrors controls whether goon reports the errors it encounters
// through log.Errorf. When false, error logging is skipped entirely.
LogErrors = true
// LogTimeoutErrors controls whether memcache timeout errors are reported
// through log.Warningf. Timeouts are not logged by default.
LogTimeoutErrors = false
// MemcachePutTimeoutThreshold is the number of bytes after which the large
// timeout setting is added to the small timeout setting. This repeats for
// every further multiple of the threshold, which means that with a 20 KiB
// threshold and a 100 KiB memcache payload, the final timeout is
// MemcachePutTimeoutSmall + 5*MemcachePutTimeoutLarge.
MemcachePutTimeoutThreshold = 20 * 1024 // 20 KiB
// MemcachePutTimeoutSmall is the minimum time to wait during memcache
// Put operations before aborting them and using the datastore.
MemcachePutTimeoutSmall = 5 * time.Millisecond
// MemcachePutTimeoutLarge is the amount of extra time to wait for larger
// memcache Put requests. See also MemcachePutTimeoutThreshold.
MemcachePutTimeoutLarge = 1 * time.Millisecond
// MemcacheGetTimeout is the amount of time to wait for all memcache Get
// requests, per key fetched. Because we can't really know how big the
// requested entities are, this setting should be sized for the largest
// possible entity. The final timeout is capped at the number of maximum
// sized entities a single RPC result can contain, so the timeout won't
// grow insanely large if you're fetching a ton of small entities.
MemcacheGetTimeout = 31250 * time.Microsecond // 31.25 milliseconds
// IgnoreFieldMismatch decides whether *datastore.ErrFieldMismatch errors
// should be silently ignored. This allows you to easily remove fields from structs.
IgnoreFieldMismatch = true
)
var (
// propagateMemcachePutError determines whether an error from populating
// memcache (see putMemcache) is returned by GetMulti instead of only
// being logged.
// Currently only meant for use during goon development testing.
propagateMemcachePutError = false
)
// Goon holds the app engine context and the request memory cache.
type Goon struct {
// Context is the context passed to every datastore, memcache and log call.
Context context.Context
// cache is the local in-memory cache. It is set by FromContext and left
// nil on the Goon handed to a RunInTransaction callback.
cache *cache
// inTransaction is true only for the Goon handed to a RunInTransaction
// callback; cache updates are then queued instead of applied directly.
inTransaction bool
txnCacheLock sync.Mutex // protects toDelete / toDeleteMC
// toDelete is the set of local cache keys to delete once the
// transaction has completed successfully.
toDelete map[string]struct{}
// toDeleteMC is the set of memcache keys to delete once the
// transaction has completed successfully.
toDeleteMC map[string]struct{}
// KindNameResolver is used to determine what Kind to give an Entity.
// Defaults to DefaultKindName
KindNameResolver KindNameResolver
}
// MemcacheKey converts a datastore key into the string that cacheKey uses as
// the base of its cache keys. It is a package-level variable holding a
// function, and the default implementation returns the key's Encode() form.
var MemcacheKey = func(key *datastore.Key) string {
	encoded := key.Encode()
	return encoded
}
// cacheKeyPrefix is prepended to every key built by cacheKey. It embeds
// serializationFormatVersion (formatted with %X), so that incompatible
// changes to the cache system won't cause problems: entries written under a
// different format version end up under different keys.
var cacheKeyPrefix = fmt.Sprintf("g%X:", serializationFormatVersion)
// cacheKey returns the string under which the entity identified by k is
// stored in the caches.
//
// Normally this is cacheKeyPrefix followed by MemcacheKey(k). When that
// string is longer than memcacheMaxKeySize it is replaced by a fixed-size
// digest of itself, so the result stays within the limit.
//
// It panics if the BLAKE2b hasher cannot be initialized.
func cacheKey(k *datastore.Key) string {
	full := cacheKeyPrefix + MemcacheKey(k)
	if len(full) <= memcacheMaxKeySize {
		// The common case: the plain key is short enough to use as-is.
		return full
	}

	// The key is too long, so shorten it while still staying unique by
	// passing it through the BLAKE2b hash function, which is
	// cryptographically secure but also very fast. A 24 byte (192-bit)
	// output is requested for speed reasons.
	const digestSize = 24
	hasher, err := blake2b.New(digestSize, nil)
	if err != nil {
		panic(fmt.Sprintf("Unexpected error initializing blake2b: %v", err))
	}
	hasher.Write([]byte(full))
	digest := hasher.Sum(make([]byte, 0, digestSize))

	// The digest is encoded with ascii85 rather than used directly, because
	// printable keys are easier for people to observe & manage manually.
	// Ascii85 turns every 4 byte chunk into 5 letters, so the 24 byte digest
	// becomes a 30 letter string. A length of 30 is the target because 32 is
	// the maximum length where the Go std map does key lookups directly
	// without hashing.
	//
	// NOTE(review): the length returned by ascii85.Encode is ignored. Encode
	// writes an all-zero 4 byte group as a single 'z', in which case fewer
	// than 30 bytes are produced and the key keeps trailing zero bytes.
	const encodedSize = 30
	text := make([]byte, encodedSize)
	ascii85.Encode(text, digest)
	return string(text)
}
// memcacheGetTimeout returns the duration that should be used as the timeout
// of a memcache.GetMulti call fetching keyCount keys.
//
// The timeout is MemcacheGetTimeout per key, with the key count capped at
// the number of maximum sized items that fit into one RPC result
// (memcacheMaxRPCSize / memcacheMaxItemSize).
func memcacheGetTimeout(keyCount int) time.Duration {
	maxItemsPerRPC := memcacheMaxRPCSize / memcacheMaxItemSize
	billable := keyCount
	if billable > maxItemsPerRPC {
		billable = maxItemsPerRPC
	}
	return time.Duration(billable) * MemcacheGetTimeout
}
// Returns the duration that should be used for a memcache.PutMulti timeout.
func memcachePutTimeout(payloadSize int) time.Duration {
// Takes the payload size given to memcache.PutMulti,
// and returns a dynamic duration based on that size.
return MemcachePutTimeoutSmall + time.Duration(payloadSize/MemcachePutTimeoutThreshold)*MemcachePutTimeoutLarge
}
// Memcache limits
// These are based on the constants in SDK/google/appengine/api/memcache/memcache_stub.py
// Also documented in https://cloud.google.com/appengine/docs/standard/go/memcache/
const (
// memcacheOverhead is the per-item overhead that is added to the key and
// value lengths when estimating the size of a memcache item.
//
// The Google provided overhead is 73 bytes, which seems wrong in practice.
// Testing has shown that Put/PutMulti will succeed with even a lower overhead.
// However by 'succeed' I mean not erroring. The value never gets stored.
// The actual overhead seems to be 76 at which point the items will
// start being stored in memcache and show up in the statistics.
memcacheOverhead = 76
// memcacheMaxKeySize is the longest key cacheKey will produce without
// falling back to a hashed key.
memcacheMaxKeySize = 250
// memcacheMaxItemSize is the maximum size of a single memcache item.
memcacheMaxItemSize = 1 << 20 // 1 MiB
// memcacheMaxValueSize is what remains of the item size for the value
// after reserving room for a maximum length key and the overhead.
memcacheMaxValueSize = memcacheMaxItemSize - memcacheMaxKeySize - memcacheOverhead
// memcacheMaxRPCSize is the maximum payload of a single memcache RPC.
// putMemcache splits its items into tasks that stay under this size.
memcacheMaxRPCSize = 32 << 20 // 32 MiB
)
// Datastore limits
const (
// datastoreGetMultiMaxItems is the batch size GetMulti uses when
// splitting keys across datastore.GetMulti calls.
datastoreGetMultiMaxItems = 1000
// datastorePutMultiMaxItems is the batch size PutMulti uses when
// splitting entities across datastore.PutMulti calls.
datastorePutMultiMaxItems = 500
// datastoreDeleteMultiMaxItems is the batch size DeleteMulti uses when
// splitting keys across datastore.DeleteMulti calls.
datastoreDeleteMultiMaxItems = 500
// The maximum GetMulti result RPC size was determined experimentally on 2019-05-20
// NOTE(review): not referenced by the code visible here — presumably used
// elsewhere in the package; confirm.
datastoreGetMultiMaxRPCSize = 50 << 20 // 50 MiB
)
// NewGoon creates a new Goon object for the given request. It derives an App
// Engine context from r with appengine.NewContext and passes it to
// FromContext.
func NewGoon(r *http.Request) *Goon {
	ctx := appengine.NewContext(r)
	return FromContext(ctx)
}
// FromContext creates a new Goon object from the given appengine Context.
// Useful with profiling packages like appstats.
//
// The returned Goon gets a fresh local cache limited to defaultCacheLimit
// and uses DefaultKindName as its KindNameResolver.
func FromContext(c context.Context) *Goon {
	g := new(Goon)
	g.Context = c
	g.cache = newCache(defaultCacheLimit)
	g.KindNameResolver = DefaultKindName
	return g
}
// error logs err through log.Errorf, prefixed with the file name and line of
// the code that called this method when that information is available.
// Nothing is logged when LogErrors is false.
func (g *Goon) error(err error) {
	if !LogErrors {
		return
	}
	// Skip one frame, so that the call site of this method is reported
	// rather than this method itself.
	if _, file, line, found := runtime.Caller(1); found {
		log.Errorf(g.Context, "goon - %s:%d - %v", filepath.Base(file), line, err)
		return
	}
	log.Errorf(g.Context, "goon - %v", err)
}
// timeoutError logs a memcache timeout error through log.Warningf.
// Nothing is logged unless LogTimeoutErrors is true.
func (g *Goon) timeoutError(err error) {
	if !LogTimeoutErrors {
		return
	}
	log.Warningf(g.Context, "goon memcache timeout: %v", err)
}
// memcacheDeleteError logs a failed memcache.DeleteMulti call through
// log.Errorf, prefixed with the caller's file name and line when available.
//
// Nothing is logged when err is nil or LogErrors is false. For an
// appengine.MultiError, cache misses are not considered failures: the first
// element that is neither nil nor memcache.ErrCacheMiss is the one reported,
// and if there is no such element nothing is logged.
func (g *Goon) memcacheDeleteError(err error) {
	if err == nil || !LogErrors {
		return
	}
	if multi, isMulti := err.(appengine.MultiError); isMulti {
		var firstReal error
		for _, e := range multi {
			if e != nil && e != memcache.ErrCacheMiss {
				firstReal = e
				break
			}
		}
		if firstReal == nil {
			// Only cache misses (or no errors at all), nothing to report.
			return
		}
		err = firstReal
	}
	// Skip one frame, so that the call site of this method is reported.
	if _, file, line, found := runtime.Caller(1); found {
		log.Errorf(g.Context, "goon - %s:%d - memcache.DeleteMulti failed: %v - the goon cache may be out of sync now!", filepath.Base(file), line, err)
		return
	}
	log.Errorf(g.Context, "goon - memcache.DeleteMulti failed: %v - the goon cache may be out of sync now!", err)
}
// extractKeys returns the datastore key of every element of src, in order.
//
// src must be a slice or a pointer to a slice; each element's key is
// determined with getStructKey. putRequest selects how incomplete keys are
// treated: when false any incomplete key is an error, when true incomplete
// keys are allowed unless getStructKey reports that the struct uses a
// string id (an empty string id cannot be put).
func (g *Goon) extractKeys(src interface{}, putRequest bool) ([]*datastore.Key, error) {
	slice := reflect.Indirect(reflect.ValueOf(src))
	if slice.Kind() != reflect.Slice {
		return nil, fmt.Errorf("goon: value must be a slice or pointer-to-slice")
	}

	keys := make([]*datastore.Key, slice.Len())
	for i := range keys {
		elem := slice.Index(i).Interface()
		key, hasStringID, err := g.getStructKey(elem)
		if err != nil {
			return nil, err
		}
		if key.Incomplete() {
			if !putRequest {
				return nil, fmt.Errorf("goon: cannot find a key for struct - %v", elem)
			}
			if hasStringID {
				return nil, fmt.Errorf("goon: empty string id on put")
			}
		}
		keys[i] = key
	}
	return keys, nil
}
// Key is the same as KeyError, except that the error is dropped and nil is
// returned in its place.
//
// NOTE(review): this was previously documented as also returning nil for an
// incomplete key, but no completeness check is performed here; whether
// KeyError reports incomplete keys as errors should be confirmed.
func (g *Goon) Key(src interface{}) *datastore.Key {
	key, err := g.KeyError(src)
	if err != nil {
		return nil
	}
	return key
}
// Kind returns src's datastore Kind, taken from the key that KeyError
// produces for src, or "" when KeyError fails.
func (g *Goon) Kind(src interface{}) string {
	key, err := g.KeyError(src)
	if err != nil {
		return ""
	}
	return key.Kind()
}
// KeyError returns the key of src based on its properties, as determined by
// getStructKey, together with any error encountered while doing so.
func (g *Goon) KeyError(src interface{}) (*datastore.Key, error) {
	// The middle result (whether the struct uses a string id) is of no
	// interest to callers of this method.
	k, _, keyErr := g.getStructKey(src)
	return k, keyErr
}
// RunInTransaction runs f in a transaction. It calls f with a transaction
// context tg that f should use for all App Engine operations. Neither cache nor
// memcache are used or set during a transaction.
//
// Cache invalidations requested through tg are queued instead, and applied to
// memcache and to g's local cache only after the transaction has completed
// without error. If the transaction fails, the error is logged and returned,
// and the caches are left untouched.
//
// Otherwise similar to appengine/datastore.RunInTransaction:
// https://developers.google.com/appengine/docs/go/datastore/reference#RunInTransaction
func (g *Goon) RunInTransaction(f func(tg *Goon) error, opts *datastore.TransactionOptions) error {
	// txn is re-created on every invocation of the callback, so after
	// completion it holds the queues of the most recent invocation.
	var txn *Goon
	attempt := func(tc context.Context) error {
		txn = &Goon{
			Context:          tc,
			inTransaction:    true,
			toDelete:         make(map[string]struct{}),
			toDeleteMC:       make(map[string]struct{}),
			KindNameResolver: g.KindNameResolver,
		}
		return f(txn)
	}

	if err := datastore.RunInTransaction(g.Context, attempt, opts); err != nil {
		g.error(err)
		return err
	}

	// The transaction went through, flush the queued cache deletions.
	txn.txnCacheLock.Lock()
	defer txn.txnCacheLock.Unlock()
	if pending := len(txn.toDeleteMC); pending > 0 {
		memkeys := make([]string, 0, pending)
		for mk := range txn.toDeleteMC {
			memkeys = append(memkeys, mk)
		}
		g.memcacheDeleteError(memcache.DeleteMulti(g.Context, memkeys))
	}
	for lk := range txn.toDelete {
		g.cache.Delete(lk)
	}
	return nil
}
// Put saves the entity src into the datastore based on src's key k. If k
// is an incomplete key, the returned key will be a unique key generated by
// the datastore.
//
// src must be a pointer. Put is a single element PutMulti: when PutMulti
// fails with an appengine.MultiError, its first element is returned.
func (g *Goon) Put(src interface{}) (*datastore.Key, error) {
	if reflect.ValueOf(src).Kind() != reflect.Ptr {
		return nil, fmt.Errorf("goon: expected pointer to a struct, got %#v", src)
	}
	keys, err := g.PutMulti([]interface{}{src})
	switch e := err.(type) {
	case nil:
		return keys[0], nil
	case appengine.MultiError:
		return nil, e[0]
	default:
		return nil, err
	}
}
// PutMulti is a batch version of Put.
//
// src must be a *[]S, *[]*S, *[]I, []S, []*S, or []I, for some struct type S,
// or some interface type I. If *[]I or []I, each element must be a struct pointer.
//
// The entities are written with concurrent datastore.PutMulti calls of at
// most datastorePutMultiMaxItems entities each. Entities with an incomplete
// key get the key generated by the datastore written back into the struct.
// Afterwards the cache entries of all complete keys are invalidated, or
// queued for invalidation when running inside a transaction.
//
// The returned keys match src by index. On failure the error is either an
// appengine.MultiError matching src by index, or a single error when every
// entity failed in the same way (see realError).
func (g *Goon) PutMulti(src interface{}) ([]*datastore.Key, error) {
	keys, err := g.extractKeys(src, true) // allow incomplete keys on a Put request
	if err != nil {
		return nil, err
	}

	v := reflect.Indirect(reflect.ValueOf(src))
	mu := new(sync.Mutex) // protects anyErr
	multiErr, anyErr := make(appengine.MultiError, len(keys)), false
	goroutines := (len(keys)-1)/datastorePutMultiMaxItems + 1
	var wg sync.WaitGroup
	wg.Add(goroutines)
	for i := 0; i < goroutines; i++ {
		go func(i int) {
			defer wg.Done()
			// Every goroutine works on its own [lo, hi) range, so the
			// writes to keys and multiErr never overlap.
			lo := i * datastorePutMultiMaxItems
			hi := (i + 1) * datastorePutMultiMaxItems
			if hi > len(keys) {
				hi = len(keys)
			}
			rkeys, pmerr := datastore.PutMulti(g.Context, keys[lo:hi], v.Slice(lo, hi).Interface())
			if pmerr != nil {
				mu.Lock()
				anyErr = true // this flag tells PutMulti to return multiErr later
				mu.Unlock()
				merr, ok := pmerr.(appengine.MultiError)
				if !ok {
					g.error(pmerr)
					for j := lo; j < hi; j++ {
						multiErr[j] = pmerr
					}
					return
				}
				copy(multiErr[lo:hi], merr)
			}

			for i, key := range keys[lo:hi] {
				if multiErr[lo+i] != nil {
					continue // there was an error writing this value, go to next
				}
				if !key.Incomplete() {
					continue // the key was already known, nothing to write back
				}
				if i >= len(rkeys) {
					// The datastore reported per-entity errors without
					// returning any keys, so this entity has no generated
					// key. Record that as an error instead of indexing
					// past the end of rkeys.
					multiErr[lo+i] = fmt.Errorf("goon: datastore.PutMulti returned no key for entity at index %d", lo+i)
					mu.Lock()
					anyErr = true
					mu.Unlock()
					continue
				}
				g.setStructKey(v.Index(lo+i).Interface(), rkeys[i])
				keys[lo+i] = rkeys[i]
			}
		}(i)
	}
	wg.Wait()

	// Caches need to be updated after the datastore to prevent a common race condition,
	// where a concurrent request will fetch the not-yet-updated data from the datastore
	// and populate the caches with it.
	cachekeys := make([]string, 0, len(keys))
	for _, key := range keys {
		if !key.Incomplete() {
			cachekeys = append(cachekeys, cacheKey(key))
		}
	}
	if g.inTransaction {
		// Inside a transaction the invalidation is only queued here.
		g.txnCacheLock.Lock()
		for _, ck := range cachekeys {
			g.toDelete[ck] = struct{}{}
			g.toDeleteMC[ck] = struct{}{}
		}
		g.txnCacheLock.Unlock()
	} else {
		g.cache.DeleteMulti(cachekeys)
		g.memcacheDeleteError(memcache.DeleteMulti(g.Context, cachekeys))
	}

	if anyErr {
		return keys, realError(multiErr)
	}
	return keys, nil
}
// FlushLocalCache clears the local memory cache.
//
// A Goon handed to a RunInTransaction callback is created without a local
// cache, so in that case there is nothing to flush and the call is a no-op.
func (g *Goon) FlushLocalCache() {
	if g.cache == nil {
		return
	}
	g.cache.Flush()
}
// ClearCache removes the provided entity from cache.
// Takes either *S or *datastore.Key.
// The 'mem' and 'local' booleans dictate the type of caches to clear.
//
// ClearCache is a single element ClearCacheMulti: should that fail with an
// appengine.MultiError, its first element is returned.
func (g *Goon) ClearCache(src interface{}, mem, local bool) error {
	if !(mem || local) {
		// Nothing to clear ..
		return nil
	}

	var batch interface{}
	switch s := src.(type) {
	case *datastore.Key:
		batch = []*datastore.Key{s}
	default:
		if reflect.ValueOf(src).Kind() != reflect.Ptr {
			return fmt.Errorf("goon: expected pointer to a struct, got %#v", src)
		}
		batch = []interface{}{src}
	}

	err := g.ClearCacheMulti(batch, mem, local)
	// Look for an embedded error if it's multi
	if multi, isMulti := err.(appengine.MultiError); isMulti {
		return multi[0]
	}
	return err
}
// ClearCacheMulti removes the provided entities from cache.
// Takes either []*S or []*datastore.Key.
// The 'mem' and 'local' booleans dictate the type of caches to clear.
//
// Inside a transaction the removals are only queued. Memcache failures are
// logged but not returned; the only errors returned are those from
// determining the keys of the entities.
func (g *Goon) ClearCacheMulti(src interface{}, mem, local bool) error {
	if !(mem || local) {
		// Nothing to clear ..
		return nil
	}

	keys, isKeys := src.([]*datastore.Key)
	if !isKeys {
		// don't allow incomplete keys on a Clear request
		extracted, err := g.extractKeys(src, false)
		if err != nil {
			return err
		}
		keys = extracted
	}
	if len(keys) == 0 {
		// not an error, and it was "successful", so return nil
		return nil
	}

	cachekeys := make([]string, len(keys))
	for i, key := range keys {
		cachekeys[i] = cacheKey(key)
	}

	if !g.inTransaction {
		if local {
			g.cache.DeleteMulti(cachekeys)
		}
		if mem {
			g.memcacheDeleteError(memcache.DeleteMulti(g.Context, cachekeys))
		}
		return nil
	}

	// Inside a transaction: remember what to clear once it has completed.
	g.txnCacheLock.Lock()
	defer g.txnCacheLock.Unlock()
	for _, ck := range cachekeys {
		if local {
			g.toDelete[ck] = struct{}{}
		}
		if mem {
			g.toDeleteMC[ck] = struct{}{}
		}
	}
	return nil
}
// memcacheTask is one batch of memcache items that putMemcache stores with a
// single memcache.SetMulti call.
type memcacheTask struct {
// items are the memcache items belonging to this batch.
items []*memcache.Item
// size is the estimated payload size of the batch (keys, values and
// per-item overhead); it determines the timeout used for the call.
size int
}
// putMemcache stores the given cache items in memcache.
//
// The items are split into tasks whose estimated payload stays within
// memcacheMaxRPCSize, and every task is stored by its own goroutine with a
// timeout that depends on the task's payload size. All failures are logged
// (timeouts through timeoutError, everything else through error), and one of
// them - which one is non-deterministic - is returned.
//
// NB! putMemcache is expected to treat cacheItem as immutable!
func (g *Goon) putMemcache(citems []*cacheItem) error {
	// Convert the cache items into memcache items, starting a new task
	// whenever adding an item would push the current one over the RPC limit.
	items := make([]*memcache.Item, len(citems))
	tasks := make([]memcacheTask, 0, 1) // In most cases there's just a single task
	start, size := 0, 0
	for i, ci := range citems {
		items[i] = &memcache.Item{Key: ci.key, Value: ci.value}
		cost := memcacheOverhead + len(ci.key) + len(ci.value)
		if size+cost > memcacheMaxRPCSize {
			tasks = append(tasks, memcacheTask{items: items[start:i], size: size})
			start, size = i, 0
		}
		size += cost
	}
	tasks = append(tasks, memcacheTask{items: items[start:], size: size})

	// Execute all the tasks with goroutines. The channel is buffered, so
	// that no goroutine ever blocks on delivering its result.
	errc := make(chan error, len(tasks))
	for _, task := range tasks {
		go func(t memcacheTask) {
			tc, cancel := context.WithTimeout(g.Context, memcachePutTimeout(t.size))
			defer cancel()
			errc <- memcache.SetMulti(tc, t.items)
		}(task)
	}

	// Wait for all goroutines to finish and log any errors.
	var result error
	for range tasks {
		err := <-errc
		if err == nil {
			continue
		}
		if appengine.IsTimeoutError(err) {
			g.timeoutError(err)
		} else {
			g.error(err)
		}
		result = err
	}
	return result
}
// Get loads the entity based on dst's key into dst
// If there is no such entity for the key, Get returns
// datastore.ErrNoSuchEntity.
//
// dst must be a pointer. Get is a single element GetMulti: when GetMulti
// fails with an appengine.MultiError, its first element is returned.
func (g *Goon) Get(dst interface{}) error {
	target := reflect.ValueOf(dst)
	if target.Kind() != reflect.Ptr {
		return fmt.Errorf("goon: expected pointer to a struct, got %#v", dst)
	}
	if !target.CanSet() {
		// The pointer value itself can't be assigned to, so work with
		// what it points at.
		target = target.Elem()
	}

	batch := []interface{}{dst}
	err := g.GetMulti(batch)
	switch e := err.(type) {
	case nil:
		// Fetched fine, carry on below.
	case appengine.MultiError:
		// Look for an embedded error if it's multi
		return e[0]
	default:
		// Not multi, normal error
		return err
	}

	target.Set(reflect.Indirect(reflect.ValueOf(batch[0])))
	return nil
}
// GetMulti is a batch version of Get.
//
// dst must be a *[]S, *[]*S, *[]I, []S, []*S, or []I, for some struct type S,
// or some interface type I. If *[]I or []I, each element must be a struct pointer.
//
// Inside a transaction the entities are fetched straight from the datastore.
// Otherwise every entity is looked up in the local cache first, then in
// memcache, and only the remaining ones are fetched from the datastore, in
// concurrent batches of at most datastoreGetMultiMaxItems keys. Whatever the
// datastore returns (including the absence of an entity) is stored in both
// caches.
//
// On failure the error is either an appengine.MultiError matching dst by
// index, or a single error when every entity failed in the same way (see
// realError). *datastore.ErrFieldMismatch errors are dropped when
// IgnoreFieldMismatch is set.
func (g *Goon) GetMulti(dst interface{}) error {
	keys, err := g.extractKeys(dst, false) // don't allow incomplete keys on a Get request
	if err != nil {
		return err
	}

	v := reflect.Indirect(reflect.ValueOf(dst))

	// multiErr is indexed like keys / dst, NOT like the dskeys / mckeys
	// subsets that are built further below.
	multiErr, anyErr := make(appengine.MultiError, len(keys)), false
	var extraErr error

	if g.inTransaction {
		// todo: support getMultiLimit in transactions
		if err := datastore.GetMulti(g.Context, keys, v.Interface()); err != nil {
			if merr, ok := err.(appengine.MultiError); ok {
				for i := 0; i < len(keys); i++ {
					if merr[i] != nil && (!IgnoreFieldMismatch || !errFieldMismatch(merr[i])) {
						anyErr = true // this flag tells GetMulti to return multiErr later
						multiErr[i] = merr[i]
					}
				}
			} else {
				g.error(err)
				anyErr = true // this flag tells GetMulti to return multiErr later
				for i := 0; i < len(keys); i++ {
					multiErr[i] = err
				}
			}
			if anyErr {
				return realError(multiErr)
			}
		}
		return nil
	}

	var dskeys []*datastore.Key
	var dsdst []interface{}
	var dixs []int // dskeys[5] === keys[dixs[5]]

	var mckeys []string
	var mixs []int // mckeys[3] =~= keys[mixs[3]]

	lckeys := make([]string, 0, len(keys))
	for _, key := range keys {
		// NB! Current implementation has optimizations in place
		// that expect memcache & local cache keys to match.
		lckeys = append(lckeys, cacheKey(key))
	}

	// Step 1: the local cache. Anything missing from it becomes a candidate
	// for both the memcache and the datastore fetch.
	lcvalues := g.cache.GetMulti(lckeys)
	for i, key := range keys {
		vi := v.Index(i)
		if vi.Kind() == reflect.Struct {
			vi = vi.Addr()
		}
		d := vi.Interface()

		if data := lcvalues[i]; data != nil {
			// Attempt to deserialize the cached value into the struct
			err := deserializeStruct(d, data)
			if err != nil && (!IgnoreFieldMismatch || !errFieldMismatch(err)) {
				if err == datastore.ErrNoSuchEntity || errFieldMismatch(err) {
					anyErr = true // this flag tells GetMulti to return multiErr later
					multiErr[i] = err
				} else {
					g.error(err)
					return err
				}
			}
		} else {
			mckeys = append(mckeys, lckeys[i])
			mixs = append(mixs, i)
			dskeys = append(dskeys, key)
			dsdst = append(dsdst, d)
			dixs = append(dixs, i)
		}
	}

	if len(mckeys) == 0 {
		if anyErr {
			return realError(multiErr)
		}
		return nil
	}

	// Step 2: memcache.
	// memcache.GetMulti is limited to memcacheMaxRPCSize for the data returned.
	// Thus if the returned data is bigger than memcacheMaxRPCSize - memcacheMaxItemSize
	// then we do another memcache.GetMulti on the missing keys.
	memvalues := make(map[string]*memcache.Item, len(mckeys))
	mcKeysSet := make(map[string]struct{}, len(mckeys))
	for _, mk := range mckeys {
		mcKeysSet[mk] = struct{}{}
	}
	for {
		nextmckeys := make([]string, 0, len(mcKeysSet))
		for mk := range mcKeysSet {
			nextmckeys = append(nextmckeys, mk)
		}

		tc, cf := context.WithTimeout(g.Context, memcacheGetTimeout(len(nextmckeys)))
		mvs, err := memcache.GetMulti(tc, nextmckeys)
		cf()
		// timing out or another error from memcache isn't something to fail over, but do log it
		if appengine.IsTimeoutError(err) {
			g.timeoutError(err)
			break
		} else if err != nil {
			g.error(err)
			break
		}

		payloadSize := 0
		for k, item := range mvs {
			memvalues[k] = item
			payloadSize += memcacheOverhead + len(item.Key) + len(item.Value)
			delete(mcKeysSet, k)
		}
		if len(mcKeysSet) == 0 || payloadSize < memcacheMaxRPCSize-memcacheMaxItemSize {
			break
		}
	}

	if len(memvalues) > 0 {
		// since memcache fetch was successful, reset the datastore fetch list and repopulate it
		dskeys = dskeys[:0]
		dsdst = dsdst[:0]
		dixs = dixs[:0]
		// we only want to check the returned map if there weren't any errors
		// unlike the datastore, memcache will return a smaller map with no error if some of the keys were missed
		for i, m := range mckeys {
			d := v.Index(mixs[i]).Interface()
			if v.Index(mixs[i]).Kind() == reflect.Struct {
				d = v.Index(mixs[i]).Addr().Interface()
			}
			if s, present := memvalues[m]; present {
				// Mirror any memcache entries in local cache
				g.cache.Set(&cacheItem{key: m, value: s.Value})
				// Attempt to deserialize the cached value into the struct
				err := deserializeStruct(d, s.Value)
				if err != nil && (!IgnoreFieldMismatch || !errFieldMismatch(err)) {
					if err == datastore.ErrNoSuchEntity || errFieldMismatch(err) {
						anyErr = true // this flag tells GetMulti to return multiErr later
						multiErr[mixs[i]] = err
					} else {
						g.error(err)
						return err
					}
				}
			} else {
				dskeys = append(dskeys, keys[mixs[i]])
				dsdst = append(dsdst, d)
				dixs = append(dixs, mixs[i])
			}
		}
		if len(dskeys) == 0 {
			if anyErr {
				return realError(multiErr)
			}
			return nil
		}
	}

	// Step 3: the datastore, for everything that no cache could provide.
	mu := new(sync.Mutex) // protects anyErr and extraErr

	// recordErr stores err for the entity at position idx of dst and makes
	// sure that GetMulti reports multiErr. The goroutines below work on
	// disjoint positions, so only the shared flag needs the lock.
	recordErr := func(idx int, err error) {
		multiErr[idx] = err
		mu.Lock()
		anyErr = true // this flag tells GetMulti to return multiErr later
		mu.Unlock()
	}

	goroutines := (len(dskeys)-1)/datastoreGetMultiMaxItems + 1
	var wg sync.WaitGroup
	wg.Add(goroutines)
	for i := 0; i < goroutines; i++ {
		go func(i int) {
			defer wg.Done()
			lo := i * datastoreGetMultiMaxItems
			hi := (i + 1) * datastoreGetMultiMaxItems
			if hi > len(dskeys) {
				hi = len(dskeys)
			}
			toCache := make([]*cacheItem, 0, hi-lo)
			propLists := make([]datastore.PropertyList, hi-lo)

			// handleProp processes the result for the i-th key of this
			// batch, which is the idx-th element of dst. exists tells
			// whether the datastore had an entity for the key.
			handleProp := func(i, idx int, exists bool) {
				// Serialize the properties
				data, err := serializeProperties(propLists[i], exists)
				if err != nil {
					g.error(err)
					recordErr(idx, err)
					return
				}
				// Prepare the properties for caching
				toCache = append(toCache, &cacheItem{key: lckeys[idx], value: data})
				// Deserialize the properties into a struct
				if exists {
					err = deserializeProperties(dsdst[lo+i], propLists[i])
					if err != nil && (!IgnoreFieldMismatch || !errFieldMismatch(err)) {
						recordErr(idx, err)
					}
				}
			}

			gmerr := datastore.GetMulti(g.Context, dskeys[lo:hi], propLists)
			if gmerr != nil {
				mu.Lock()
				anyErr = true // this flag tells GetMulti to return multiErr later
				mu.Unlock()
				merr, ok := gmerr.(appengine.MultiError)
				if !ok {
					g.error(gmerr)
					// The whole batch failed. The batch positions have to
					// be translated through dixs, because multiErr is
					// indexed by the position in dst rather than by the
					// position in dskeys.
					for _, idx := range dixs[lo:hi] {
						multiErr[idx] = gmerr
					}
					return
				}
				for i, idx := range dixs[lo:hi] {
					if merr[i] == nil {
						handleProp(i, idx, true)
					} else {
						if merr[i] == datastore.ErrNoSuchEntity {
							// Cache the fact that there is no such entity
							handleProp(i, idx, false)
						}
						multiErr[idx] = merr[i]
					}
				}
			} else {
				for i, idx := range dixs[lo:hi] {
					handleProp(i, idx, true)
				}
			}

			if len(toCache) > 0 {
				// Populate memcache in a goroutine because there's network involved
				// and we can be doing useful work while waiting for I/O
				errc := make(chan error)
				go func() {
					errc <- g.putMemcache(toCache)
				}()
				// Populate local cache
				g.cache.SetMulti(toCache)
				// Wait for memcache population to finish
				err := <-errc
				// .. but only propagate the memcache error if configured to do so
				if propagateMemcachePutError && err != nil {
					mu.Lock()
					extraErr = err
					mu.Unlock()
				}
			}
		}(i)
	}
	wg.Wait()

	if anyErr {
		return realError(multiErr)
	} else if extraErr != nil {
		return extraErr
	}
	return nil
}
// Delete deletes the provided entity.
// Takes either *S or *datastore.Key.
//
// Delete is a single element DeleteMulti: when DeleteMulti fails with an
// appengine.MultiError, its first element is returned.
func (g *Goon) Delete(src interface{}) error {
	var batch interface{}
	switch s := src.(type) {
	case *datastore.Key:
		batch = []*datastore.Key{s}
	default:
		if reflect.ValueOf(src).Kind() != reflect.Ptr {
			return fmt.Errorf("goon: expected pointer to a struct, got %#v", src)
		}
		batch = []interface{}{src}
	}

	err := g.DeleteMulti(batch)
	// Look for an embedded error if it's multi
	if multi, isMulti := err.(appengine.MultiError); isMulti {
		return multi[0]
	}
	return err
}
// DeleteMulti is a batch version of Delete.
// Takes either []*S or []*datastore.Key.
//
// The keys are deleted with concurrent datastore.DeleteMulti calls of at
// most datastoreDeleteMultiMaxItems keys each. Afterwards the cache entries
// of all the keys are invalidated, or queued for invalidation when running
// inside a transaction.
//
// On failure the error is either an appengine.MultiError matching src by
// index, or a single error when every key failed in the same way (see
// realError).
func (g *Goon) DeleteMulti(src interface{}) error {
	keys, isKeys := src.([]*datastore.Key)
	if !isKeys {
		// don't allow incomplete keys on a Delete request
		extracted, err := g.extractKeys(src, false)
		if err != nil {
			return err
		}
		keys = extracted
	}
	if len(keys) == 0 {
		// not an error, and it was "successful", so return nil
		return nil
	}

	var (
		mu     sync.Mutex // protects anyErr
		wg     sync.WaitGroup
		anyErr bool
	)
	multiErr := make(appengine.MultiError, len(keys))
	batches := (len(keys)-1)/datastoreDeleteMultiMaxItems + 1
	wg.Add(batches)
	for b := 0; b < batches; b++ {
		go func(b int) {
			defer wg.Done()
			// Every goroutine works on its own [lo, hi) range, so the
			// writes to multiErr never overlap.
			lo := b * datastoreDeleteMultiMaxItems
			hi := lo + datastoreDeleteMultiMaxItems
			if hi > len(keys) {
				hi = len(keys)
			}
			dmerr := datastore.DeleteMulti(g.Context, keys[lo:hi])
			if dmerr == nil {
				return
			}
			mu.Lock()
			anyErr = true // this flag tells DeleteMulti to return multiErr later
			mu.Unlock()
			if merr, isMulti := dmerr.(appengine.MultiError); isMulti {
				copy(multiErr[lo:hi], merr)
				return
			}
			// The whole batch failed with a single error.
			g.error(dmerr)
			for j := lo; j < hi; j++ {
				multiErr[j] = dmerr
			}
		}(b)
	}
	wg.Wait()

	// Caches need to be updated after the datastore to prevent a common race condition,
	// where a concurrent request will fetch the not-yet-updated data from the datastore
	// and populate the caches with it.
	cachekeys := make([]string, len(keys))
	for i, key := range keys {
		cachekeys[i] = cacheKey(key)
	}
	if g.inTransaction {
		// Inside a transaction the invalidation is only queued here.
		g.txnCacheLock.Lock()
		for _, ck := range cachekeys {
			g.toDelete[ck] = struct{}{}
			g.toDeleteMC[ck] = struct{}{}
		}
		g.txnCacheLock.Unlock()
	} else {
		g.cache.DeleteMulti(cachekeys)
		g.memcacheDeleteError(memcache.DeleteMulti(g.Context, cachekeys))
	}

	if anyErr {
		return realError(multiErr)
	}
	return nil
}
// NotFound returns true if err is an appengine.MultiError and err[idx] is a datastore.ErrNoSuchEntity.
//
// It returns false for any other kind of error, and for an idx that lies
// outside of the MultiError (negative, or not less than its length).
func NotFound(err error, idx int) bool {
	merr, ok := err.(appengine.MultiError)
	if !ok || idx < 0 || idx >= len(merr) {
		return false
	}
	return merr[idx] == datastore.ErrNoSuchEntity
}
// errFieldMismatch returns true if err is *datastore.ErrFieldMismatch.
// Only the dynamic type of err itself is inspected.
func errFieldMismatch(err error) bool {
	switch err.(type) {
	case *datastore.ErrFieldMismatch:
		return true
	default:
		return false
	}
}
// realError collapses a MultiError into the error that should be returned to
// the caller.
//
// It returns nil for an empty multiError. When the first element is one of
// the errors that the datastore always reports in MultiError form
// (*datastore.ErrFieldMismatch, datastore.ErrInvalidEntityType or
// datastore.ErrNoSuchEntity), multiError is returned as is. Otherwise, if
// every element is the same - all nil, or all having the same message - that
// single element is returned, and if not, multiError is returned.
func realError(multiError appengine.MultiError) error {
	if len(multiError) == 0 {
		return nil
	}
	first := multiError[0]

	// some errors are *always* returned in MultiError form from the datastore
	if _, mismatch := first.(*datastore.ErrFieldMismatch); mismatch { // returned in GetMulti
		return multiError
	}
	if first == datastore.ErrInvalidEntityType || first == datastore.ErrNoSuchEntity { // returned in GetMulti
		return multiError
	}

	// check if all errors are the same
	for _, other := range multiError[1:] {
		switch {
		case first == nil || other == nil:
			// With a nil involved, they only match if both are nil.
			if first != other {
				return multiError
			}
		case first.Error() != other.Error():
			// since type error could hold structs, pointers, etc,
			// the only way to compare non-nil errors is by their string output
			return multiError
		}
	}

	// datastore.ErrInvalidKey is returned as a single error in PutMulti
	return first
}