blob: d2f587c00d6bdf1f24987180e85b932456e51cbb [file] [log] [blame]
// Copyright 2015 The LUCI Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package memory
import (
"bytes"
"fmt"
"sort"
"go.chromium.org/luci/common/data/cmpbin"
ds "go.chromium.org/luci/gae/service/datastore"
)
type qIndexSlice []*ds.IndexDefinition
func (s qIndexSlice) Len() int { return len(s) }
func (s qIndexSlice) Swap(i, j int) { s[i], s[j] = s[j], s[i] }
func (s qIndexSlice) Less(i, j int) bool { return s[i].Less(s[j]) }
func defaultIndexes(kind string, sip ds.IndexedProperties) []*ds.IndexDefinition {
ret := make(qIndexSlice, 0, 2*len(sip)+1)
ret = append(ret, &ds.IndexDefinition{Kind: kind})
for name := range sip {
// Skip top-level magical properties. But keep "nested_entity.__key__".
if name == "__key__" || name == "__ancestor__" {
continue
}
ret = append(ret, &ds.IndexDefinition{Kind: kind, SortBy: []ds.IndexColumn{{Property: name}}})
ret = append(ret, &ds.IndexDefinition{Kind: kind, SortBy: []ds.IndexColumn{{Property: name, Descending: true}}})
}
if serializationDeterministic {
sort.Sort(ret)
}
return ret
}
// indexEntriesWithBuiltins generates a new memStore containing the default
// indexes for (k, pm) combined with complexIdxs.
//
// If "pm" is nil, this indicates an absence of a value. This is used
// specifically for deletion.
func indexEntriesWithBuiltins(k *ds.Key, pm ds.PropertyMap, complexIdxs []*ds.IndexDefinition) memStore {
var sip ds.IndexedProperties
if pm == nil {
return newMemStore()
}
sip = ds.Serialize.IndexedProperties(k, pm)
return indexEntries(k, sip, append(defaultIndexes(k.Kind(), sip), complexIdxs...))
}
// indexRowGen contains enough information to generate all of the index rows which
// correspond with a propertyList and a ds.IndexDefinition.
type indexRowGen struct {
propVec []ds.IndexedPropertySlice
decending []bool
}
// permute calls cb for each index row, in some arbitrary order.
func (s indexRowGen) permute(collSetFn func(k, v []byte)) {
iVec := make([]int, len(s.propVec))
iVecLim := make([]int, len(s.propVec))
incPos := func() bool {
for i := len(iVec) - 1; i >= 0; i-- {
var done bool
var newVal int
if !s.decending[i] {
newVal = (iVec[i] + 1) % iVecLim[i]
done = newVal != 0
} else {
newVal = (iVec[i] - 1)
if newVal < 0 {
newVal = iVecLim[i] - 1
} else {
done = true
}
}
iVec[i] = newVal
if done {
return true
}
}
return false
}
for i, sps := range s.propVec {
iVecLim[i] = len(sps)
}
for i := range iVec {
if s.decending[i] {
iVec[i] = iVecLim[i] - 1
}
}
for {
bufsiz := 0
for pvalSliceIdx, pvalIdx := range iVec {
bufsiz += len(s.propVec[pvalSliceIdx][pvalIdx])
}
buf := cmpbin.Invertible(bytes.NewBuffer(make([]byte, 0, bufsiz)))
for pvalSliceIdx, pvalIdx := range iVec {
data := s.propVec[pvalSliceIdx][pvalIdx]
buf.SetInvert(s.decending[pvalSliceIdx])
_, _ = buf.Write(data)
}
collSetFn(buf.Bytes(), []byte{})
if !incPos() {
break
}
}
}
type matcher struct {
buf indexRowGen
}
// matcher.match checks to see if the mapped, serialized property values
// match the index. If they do, it returns a indexRowGen. Do not write or modify
// the data in the indexRowGen.
func (m *matcher) match(sortBy []ds.IndexColumn, sip ds.IndexedProperties) (indexRowGen, bool) {
m.buf.propVec = m.buf.propVec[:0]
m.buf.decending = m.buf.decending[:0]
for _, sb := range sortBy {
if pv, ok := sip[sb.Property]; ok {
m.buf.propVec = append(m.buf.propVec, pv)
m.buf.decending = append(m.buf.decending, sb.Descending)
} else {
return indexRowGen{}, false
}
}
return m.buf, true
}
// indexEntries generates a new memStore containing index entries for sip for
// the supplied index definitions.
func indexEntries(key *ds.Key, sip ds.IndexedProperties, idxs []*ds.IndexDefinition) memStore {
ret := newMemStore()
idxColl := ret.GetOrCreateCollection("idx")
mtch := matcher{}
for _, idx := range idxs {
idx = idx.Normalize()
if idx.Kind != "" && idx.Kind != key.Kind() {
continue
}
if irg, ok := mtch.match(idx.GetFullSortOrder(), sip); ok {
idxBin := ds.Serialize.ToBytes(*idx.PrepForIdxTable())
idxColl.Set(idxBin, []byte{})
coll := ret.GetOrCreateCollection(
fmt.Sprintf("idx:%s:%s", key.Namespace(), idxBin))
irg.permute(coll.Set)
}
}
return ret
}
// walkCompIdxs walks the table of compound indexes in the store. If `endsWith`
// is provided, this will only walk over compound indexes which match
// Kind, Ancestor, and whose SortBy has `endsWith.SortBy` as a suffix.
func walkCompIdxs(store memStore, endsWith *ds.IndexDefinition, cb func(*ds.IndexDefinition) bool) {
idxColl := store.GetCollection("idx")
if idxColl == nil {
return
}
itrDef := iterDefinition{c: idxColl}
if endsWith != nil {
full := ds.Serialize.ToBytes(*endsWith.Flip())
// chop off the null terminating byte
itrDef.prefix = full[:len(full)-1]
}
it := itrDef.mkIter()
for ent := it.next(); ent != nil; ent = it.next() {
qi, err := ds.Deserialize.IndexDefinition(bytes.NewReader(ent.key))
memoryCorruption(err)
if !cb(qi.Flip()) {
break
}
}
}
func mergeIndexes(ns string, store, oldIdx, newIdx memStore) {
prefixBuf := []byte("idx:" + ns + ":")
origPrefixBufLen := len(prefixBuf)
oldIdx = oldIdx.Snapshot()
newIdx = newIdx.Snapshot()
memStoreCollide(oldIdx.GetCollection("idx"), newIdx.GetCollection("idx"), func(k, ov, nv []byte) {
prefixBuf = append(prefixBuf[:origPrefixBufLen], k...)
ks := string(prefixBuf)
coll := store.GetOrCreateCollection(ks)
oldColl := oldIdx.GetCollection(ks)
newColl := newIdx.GetCollection(ks)
switch {
case ov == nil && nv != nil: // all additions
newColl.ForEachItem(func(k, _ []byte) bool {
coll.Set(k, []byte{})
return true
})
case ov != nil && nv == nil: // all deletions
oldColl.ForEachItem(func(k, _ []byte) bool {
coll.Delete(k)
return true
})
case ov != nil && nv != nil: // merge
memStoreCollide(oldColl, newColl, func(k, ov, nv []byte) {
if nv == nil {
coll.Delete(k)
} else {
coll.Set(k, []byte{})
}
})
default:
impossible(fmt.Errorf("both values from memStoreCollide were nil?"))
}
// TODO(riannucci): remove entries from idxColl and remove index collections
// when there are no index entries for that index any more.
})
}
func addIndexes(store memStore, aid string, compIdx []*ds.IndexDefinition) {
normalized := make([]*ds.IndexDefinition, len(compIdx))
idxColl := store.GetOrCreateCollection("idx")
for i, idx := range compIdx {
normalized[i] = idx.Normalize()
idxColl.Set(ds.Serialize.ToBytes(*normalized[i].PrepForIdxTable()), []byte{})
}
for _, ns := range namespaces(store) {
kctx := ds.MkKeyContext(aid, ns)
if allEnts := store.Snapshot().GetCollection("ents:" + ns); allEnts != nil {
allEnts.ForEachItem(func(ik, iv []byte) bool {
pm, err := readPropMap(iv)
memoryCorruption(err)
prop, err := ds.Deserializer{KeyContext: kctx}.Property(bytes.NewBuffer(ik))
memoryCorruption(err)
k := prop.Value().(*ds.Key)
sip := ds.Serialize.IndexedProperties(k, pm)
mergeIndexes(ns, store,
newMemStore(),
indexEntries(k, sip, normalized))
return true
})
}
}
}
// updateIndexes updates the indexes in store to accommodate a change in entity
// value.
//
// oldEnt is the previous entity value, and newEnt is the new entity value. If
// newEnt is nil, that signifies deletion.
func updateIndexes(store memStore, key *ds.Key, oldEnt, newEnt ds.PropertyMap) {
// load all current complex query index definitions.
var compIdx []*ds.IndexDefinition
walkCompIdxs(store.Snapshot(), nil, func(i *ds.IndexDefinition) bool {
compIdx = append(compIdx, i)
return true
})
mergeIndexes(key.Namespace(), store,
indexEntriesWithBuiltins(key, oldEnt, compIdx),
indexEntriesWithBuiltins(key, newEnt, compIdx))
}