[datastore] Implement __scatter__ special entity property.

It is useful when implementing datastore mappers, and for queries that need
a random sample of the data.

See https://github.com/GoogleCloudPlatform/appengine-mapreduce/wiki/ScatterPropertyImplementation

R=iannucci@chromium.org

Change-Id: Ie954b123d632867ae2fe0833df51b117d078e06f
Reviewed-on: https://chromium-review.googlesource.com/1189742
Commit-Queue: Vadim Shtayura <vadimsh@chromium.org>
Reviewed-by: Robbie Iannucci <iannucci@chromium.org>
diff --git a/impl/memory/datastore.go b/impl/memory/datastore.go
index ec059ae..2a845cd 100644
--- a/impl/memory/datastore.go
+++ b/impl/memory/datastore.go
@@ -101,6 +101,7 @@
 }
 
 func (d *dsImpl) Run(fq *ds.FinalizedQuery, cb ds.RawRunCB) error {
+	cb = d.data.stripSpecialPropsRunCB(cb)
 	idx, head := d.data.getQuerySnaps(!fq.EventuallyConsistent())
 	err := executeQuery(fq, d.kc, false, idx, head, cb)
 	if d.data.maybeAutoIndex(err) {
@@ -171,6 +172,10 @@
 	d.data.setDisableSpecialEntities(disabled)
 }
 
+func (d *dsImpl) ShowSpecialProperties(show bool) {
+	d.data.setShowSpecialProperties(show)
+}
+
 func (d *dsImpl) SetConstraints(c *ds.Constraints) error {
 	if c == nil {
 		c = &ds.Constraints{}
@@ -227,6 +232,7 @@
 	// It's possible that if you have full-consistency and also auto index enabled
 	// that this would make sense... but at that point you should probably just
 	// add the index up front.
+	cb = d.data.parent.stripSpecialPropsRunCB(cb)
 	return executeQuery(q, d.kc, true, d.data.snap, d.data.snap, cb)
 }
 
diff --git a/impl/memory/datastore_data.go b/impl/memory/datastore_data.go
index 4a17f23..fc84c36 100644
--- a/impl/memory/datastore_data.go
+++ b/impl/memory/datastore_data.go
@@ -16,6 +16,8 @@
 
 import (
 	"bytes"
+	"crypto/sha256"
+	"encoding/binary"
 	"fmt"
 	"strings"
 	"sync"
@@ -64,6 +66,11 @@
 	// key will become an error.
 	disableSpecialEntities bool
 
+	// true means __scatter__ and other internal special properties won't be
+	// stripped off from getMulti and query results. Note that the real datastore
+	// offers no way to expose them.
+	showSpecialProps bool
+
 	// constraints is the fake datastore constraints. By default, this will match
 	// the Constraints of the "impl/prod" datastore.
 	constraints ds.Constraints
@@ -142,6 +149,40 @@
 	return d.disableSpecialEntities
 }
 
+func (d *dataStoreData) setShowSpecialProperties(show bool) {
+	d.rwlock.Lock()
+	defer d.rwlock.Unlock()
+	d.showSpecialProps = show
+}
+
+func (d *dataStoreData) stripSpecialPropsGetCB(cb ds.GetMultiCB) ds.GetMultiCB {
+	d.rwlock.RLock()
+	defer d.rwlock.RUnlock()
+
+	if d.showSpecialProps {
+		return cb
+	}
+
+	return func(idx int, val ds.PropertyMap, err error) error {
+		stripSpecialProps(val)
+		return cb(idx, val, err)
+	}
+}
+
+func (d *dataStoreData) stripSpecialPropsRunCB(cb ds.RawRunCB) ds.RawRunCB {
+	d.rwlock.RLock()
+	defer d.rwlock.RUnlock()
+
+	if d.showSpecialProps {
+		return cb
+	}
+
+	return func(key *ds.Key, val ds.PropertyMap, getCursor ds.CursorCB) error {
+		stripSpecialProps(val)
+		return cb(key, val, getCursor)
+	}
+}
+
 func (d *dataStoreData) getQuerySnaps(consistent bool) (idx, head memStore) {
 	d.rwlock.RLock()
 	defer d.rwlock.RUnlock()
@@ -331,7 +372,6 @@
 
 	for i, k := range keys {
 		newPM, _ := vals[i].Save(false)
-		entityBlob := serialize.ToBytesWithContext(newPM)
 
 		k, err := func() (key *ds.Key, err error) {
 			if !lockedAlready {
@@ -350,13 +390,23 @@
 			}
 			keyBlob := keyBytes(key)
 
+			// Now that we have the complete key, we can use it to generate the
+			// special __scatter__ property, which is a function of the key. We can't
+			// serialize newPM to bytes until we've done this step. Thus we do the
+			// serialization under the lock.
+			//
+			// If this is undesirable, this code can be restructured to grab the lock
+			// twice: once to generate the key, and the second time to actually store
+			// the entity and update indexes.
+			ensureSpecialProps(keyBlob, newPM)
+
 			var oldPM ds.PropertyMap
 			if old := ents.Get(keyBlob); old != nil {
 				if oldPM, err = readPropMap(old); err != nil {
 					return
 				}
 			}
-			ents.Set(keyBlob, entityBlob)
+			ents.Set(keyBlob, serialize.ToBytesWithContext(newPM))
 			updateIndexes(d.head, key, oldPM, newPM)
 			return
 		}()
@@ -390,7 +440,7 @@
 
 func (d *dataStoreData) getMulti(keys []*ds.Key, cb ds.GetMultiCB) error {
 	ents := d.takeSnapshot().GetCollection("ents:" + keys[0].Namespace())
-	getMultiInner(keys, cb, ents)
+	getMultiInner(keys, d.stripSpecialPropsGetCB(cb), ents)
 	return nil
 }
 
@@ -627,7 +677,7 @@
 		}
 	}
 	ents := td.snap.GetCollection("ents:" + keys[0].Namespace())
-	getMultiInner(keys, cb, ents)
+	getMultiInner(keys, td.parent.stripSpecialPropsGetCB(cb), ents)
 	return nil
 }
 
@@ -671,3 +721,34 @@
 	}
 	return v, false
 }
+
+// __scatter__ is a "hidden" indexed byte string property containing a hash of
+// the key (of some unspecified nature). It is added to a small percentage of
+// the datastore entities (0.78% in prod), to be used in .order(__scatter__)
+// queries, to aid mapper frameworks to partition the key space into
+// approximately even ranges. Here we use 50% instead, which is also what
+// dev_appserver does.
+//
+// See https://github.com/GoogleCloudPlatform/appengine-mapreduce/wiki/ScatterPropertyImplementation
+
+func isSpecialProp(prop string) bool {
+	return prop == "__scatter__"
+}
+
+func ensureSpecialProps(keyBlob []byte, pm ds.PropertyMap) {
+	h := sha256.Sum256(keyBlob)
+	i := binary.BigEndian.Uint16(h[:2])
+	if i >= 32768 {
+		pm["__scatter__"] = ds.MkProperty(h[:])
+	} else {
+		// This is for the case when the entity struct has "output only" __scatter__
+		// field. We don't want to store and index empty strings. This is possible
+		// only in testing code, since real datastore never accepts nor returns
+		// __scatter__.
+		delete(pm, "__scatter__")
+	}
+}
+
+func stripSpecialProps(pm ds.PropertyMap) {
+	delete(pm, "__scatter__")
+}
diff --git a/impl/memory/datastore_index_selection.go b/impl/memory/datastore_index_selection.go
index 1603250..7d02c01 100644
--- a/impl/memory/datastore_index_selection.go
+++ b/impl/memory/datastore_index_selection.go
@@ -271,7 +271,7 @@
 		props.Add(col.Property)
 	}
 	for _, prop := range props.ToSlice() {
-		if strings.HasPrefix(prop, "__") && strings.HasSuffix(prop, "__") {
+		if !isSpecialProp(prop) && (strings.HasPrefix(prop, "__") && strings.HasSuffix(prop, "__")) {
 			continue
 		}
 		if idxs.maybeAddDefinition(q, s, missingTerms, &ds.IndexDefinition{
@@ -303,7 +303,7 @@
 		return !idxs.maybeAddDefinition(q, s, missingTerms, def)
 	})
 
-	// this query is impossible to fulfil with the current indexes. Not all the
+	// this query is impossible to fulfill with the current indexes. Not all the
 	// terms (equality + projection) are satisfied.
 	if missingTerms.Len() > 0 || len(idxs) == 0 {
 		remains := &ds.IndexDefinition{
diff --git a/impl/memory/datastore_test.go b/impl/memory/datastore_test.go
index 2c51f29..cd56d69 100644
--- a/impl/memory/datastore_test.go
+++ b/impl/memory/datastore_test.go
@@ -15,6 +15,7 @@
 package memory
 
 import (
+	"encoding/hex"
 	"errors"
 	"fmt"
 	"sync"
@@ -58,6 +59,8 @@
 	Name  string
 	Multi []string
 	Key   *ds.Key
+
+	Scatter []byte `gae:"__scatter__"` // this is normally invisible
 }
 
 func TestDatastoreSingleReadWriter(t *testing.T) {
@@ -668,6 +671,53 @@
 				})
 			}
 		})
+
+		Convey("Testable.ShowSpecialProperties", func() {
+			ds.GetTestable(c).ShowSpecialProperties(true)
+
+			var ents []Foo
+			for i := 0; i < 10; i++ {
+				ent := &Foo{}
+				So(ds.Put(c, ent), ShouldBeNil)
+				ents = append(ents, *ent)
+			}
+			So(ds.Get(c, ents), ShouldBeNil)
+
+			// Some of these entities (~50%) should have __scatter__ property
+			// populated. The algorithm is deterministic.
+			scatter := make([]string, len(ents))
+			for i, e := range ents {
+				scatter[i] = hex.EncodeToString(e.Scatter)
+			}
+			So(scatter, ShouldResemble, []string{
+				"d77e219d0669b1808f236ca5b25127bf8e865e3f0e68b792374526251c873c61",
+				"",
+				"",
+				"",
+				"",
+				"",
+				"b592c9de652ffc3f458910247fc16690ba2ceeef20a8566fda5dd989a5fc160e",
+				"bcefad8a2212ee1cfa3636e94264b8c73c90eaded9f429e27c7384830c1e381c",
+				"d2358c1d9e5951be7117e06eaec96a6a63090f181615e2c51afaf7f214e4d873",
+				"b29a46a6c01adb88d7001fe399d6346d5d2725b190f4fb025c9cb7c73c4ffb15",
+			})
+		})
+
+		Convey("Query by __scatter__", func() {
+			for i := 0; i < 100; i++ {
+				So(ds.Put(c, &Foo{}), ShouldBeNil)
+			}
+			ds.GetTestable(c).CatchupIndexes()
+
+			var ids []int64
+			So(ds.Run(c, ds.NewQuery("Foo").Order("__scatter__").Limit(5), func(f *Foo) {
+				So(f.Scatter, ShouldEqual, nil) // it is "invisible"
+				ids = append(ids, f.ID)
+			}), ShouldBeNil)
+
+			// Approximately "even" distribution within [1, 100] range.
+			So(ids, ShouldResemble, []int64{43, 55, 99, 23, 17})
+		})
 	})
 }
 
diff --git a/service/datastore/query.go b/service/datastore/query.go
index b63ff7e..96e2d16 100644
--- a/service/datastore/query.go
+++ b/service/datastore/query.go
@@ -315,7 +315,7 @@
 }
 
 func (q *Query) reserved(field string) bool {
-	if field == "__key__" {
+	if field == "__key__" || field == "__scatter__" {
 		return false
 	}
 	if field == "" {
diff --git a/service/datastore/testable.go b/service/datastore/testable.go
index e122a98..151621a 100644
--- a/service/datastore/testable.go
+++ b/service/datastore/testable.go
@@ -84,6 +84,14 @@
 	// to the user code.
 	DisableSpecialEntities(bool)
 
+	// ShowSpecialProperties disables stripping of special properties added by
+	// the datastore internally (like __scatter__) from Get and query results.
+	//
+	// Normally such properties are used internally by the datastore or only for
+	// queries. Returning them explicitly is useful for assertions in tests that
+	// rely on queries over special properties.
+	ShowSpecialProperties(bool)
+
 	// SetConstraints sets this instance's constraints. If the supplied
 	// constraints are invalid, an error will be returned.
 	//