blob: 56ab6883166228e76583491e3e1c05ef0029452d [file] [log] [blame]
// Copyright 2015 The LUCI Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package txnBuf
import (
"bytes"
"sort"
"go.chromium.org/luci/common/data/cmpbin"
"go.chromium.org/luci/common/data/stringset"
"go.chromium.org/luci/gae/impl/memory"
ds "go.chromium.org/luci/gae/service/datastore"
)
// queryToIter takes a FinalizedQuery and returns an iterator function which
// will produce either *items or errors.
//
//   - stopChan is closed by the caller to tell the background query to stop
//     producing results.
//   - fq is the query to run.
//   - d is the raw datastore to run this query on.
//
// The query runs in its own goroutine and hands results over an unbuffered
// channel, so it only advances as fast as the returned function is called.
//
// The returned function yields (itm, nil) for each result, (nil, err) if the
// query reported an error, and (nil, nil) once the channel has been closed,
// i.e. when the results are exhausted or the query was stopped.
func queryToIter(stopChan chan struct{}, fq *ds.FinalizedQuery, d ds.RawInterface) func() (*item, error) {
	c := make(chan *item)

	go func() {
		// Closing c is what makes the iterator return (nil, nil).
		defer close(c)

		err := d.Run(fq, func(k *ds.Key, pm ds.PropertyMap, _ ds.CursorCB) error {
			select {
			case c <- &item{key: k, data: pm}:
				return nil
			case <-stopChan:
				return ds.Stop
			}
		})
		if err != nil {
			// Deliver the error only if someone is still listening. An
			// unconditional send here would block this goroutine forever
			// when the consumer has already signalled stop and gone away.
			select {
			case c <- &item{err: err}:
			case <-stopChan:
			}
		}
	}()

	return func() (*item, error) {
		itm := <-c
		if itm == nil {
			return nil, nil
		}
		if itm.err != nil {
			return nil, itm.err
		}
		return itm, nil
	}
}
// adjustQuery derives, from the user's query, the query that is actually run
// against each datastore so that the result streams can be merged.
//
// The derived query has no limit, no offset and no 'distinct' modifier. If
// the user's query is a projection query, or a keys-only query with more than
// one sort order, the derived query becomes a projection over every sort
// order other than `__key__`, so that each result carries all of the data
// needed to order it. A query that is neither a projection nor keys-only is
// never converted into a projection query.
//
// The higher layers of the stack are expected to strip the extra data again:
// keys-only queries discard all PropertyMap data, and projection queries
// discard any field the user didn't ask for.
func adjustQuery(fq *ds.FinalizedQuery) (*ds.FinalizedQuery, error) {
	// Limit and offset must be applied in-memory: otherwise we may request
	// too few entities from the underlying store if many matching entities
	// have been deleted in the buffered transaction.
	//
	// Distinct-ness must be applied in-memory as well, because there is no
	// way to make the datastore account for in-flight changes when it picks
	// which entity stands for a given distinct projection (we cannot ask it
	// to pretend that the (1, 2) result did not come from `/Bob,1` and to
	// hand us the next candidate instead).
	adjusted := fq.Original().Limit(-1).Offset(-1).Distinct(false)

	// Merging needs every order-related field in every result. Those fields
	// can only be missing for projection queries (whose projected fields may
	// be a subset of the orders) and for keys-only queries that sort by
	// something besides __key__. Everything else is returned as is.
	sortCols := fq.Orders()
	if len(fq.Project()) == 0 && !(fq.KeysOnly() && len(sortCols) > 1) {
		return adjusted.Finalize()
	}

	// Project on /all/ orders; __key__ is skipped because it is not
	// projected like an ordinary property.
	adjusted = adjusted.KeysOnly(false)
	for _, col := range sortCols {
		if col.Property != "__key__" {
			adjusted = adjusted.Project(col.Property)
		}
	}
	return adjusted.Finalize()
}
// runMergedQueries executes a user query `fq` against the parent datastore as
// well as the in-memory datastore, calling `cb` with the merged result set.
//
// Both datastores are queried with the adjusted form of `fq` (see
// adjustQuery). The two result streams are then merged by comparing the
// comparable row of the pending item from each stream; the parent item is
// emitted only when its row sorts strictly before the in-memory item's row.
//
// - fq is the user's original query.
// - sizes is consulted for every parent result; parent items whose encoded
// key it reports are skipped (presumably keys already touched in this
// buffered transaction; confirm against sizeTracker).
// - memDS and parentDS are the in-memory and parent datastores to query.
// - cb is invoked once per merged result; a non-nil error from it stops the
// merge and is returned unchanged.
//
// It's expected that the caller of this function will apply limit and offset
// if the query contains those restrictions. This may convert the query to
// an expanded projection query with more data than the user asked for. It's the
// caller's responsibility to prune away the extra data.
//
// See also `dsTxnBuf.Run()`.
func runMergedQueries(fq *ds.FinalizedQuery, sizes *sizeTracker,
memDS, parentDS ds.RawInterface, cb func(k *ds.Key, data ds.PropertyMap) error) error {
toRun, err := adjustQuery(fq)
if err != nil {
return err
}
// Ordering between items is derived from the original query's binary bounds
// and sort orders.
cmpLower, cmpUpper := memory.GetBinaryBounds(fq)
cmpOrder := fq.Orders()
cmpFn := func(i *item) string {
return i.getCmpRow(cmpLower, cmpUpper, cmpOrder)
}
// dedup holds the encoded keys which were already selected for output and
// distinct holds the projected rows which were already emitted. A nil set
// means that kind of filtering is disabled for this query.
dedup := stringset.Set(nil)
distinct := stringset.Set(nil)
// distinctOrder has one column per projected field; it is only populated for
// distinct projection queries.
distinctOrder := []ds.IndexColumn(nil)
if len(fq.Project()) > 0 { // the original query was a projection query
if fq.Distinct() {
// it was a distinct projection query, so we need to dedup by distinct
// options.
distinct = stringset.New(0)
proj := fq.Project()
distinctOrder = make([]ds.IndexColumn, len(proj))
for i, p := range proj {
distinctOrder[i].Property = p
}
}
} else {
// the original was a normal or keys-only query, so we need to dedup by keys.
dedup = stringset.New(0)
}
// stopChan is closed on exit (see the defer below) to tell both background
// queries to stop producing results.
stopChan := make(chan struct{})
parIter := queryToIter(stopChan, toRun, parentDS)
memIter := queryToIter(stopChan, toRun, memDS)
// parItemGet returns the next parent item which is neither reported by sizes
// nor already present in dedup. It returns (nil, err) when the parent stream
// ends (err == nil) or fails.
parItemGet := func() (*item, error) {
for {
itm, err := parIter()
if itm == nil || err != nil {
return nil, err
}
encKey := itm.getEncKey()
if sizes.has(encKey) || (dedup != nil && dedup.Has(encKey)) {
continue
}
return itm, nil
}
}
// memItemGet returns the next in-memory item whose key is not already present
// in dedup. It returns (nil, err) when the in-memory stream ends (err == nil)
// or fails.
memItemGet := func() (*item, error) {
for {
itm, err := memIter()
if itm == nil || err != nil {
return nil, err
}
if dedup != nil && dedup.Has(itm.getEncKey()) {
continue
}
return itm, nil
}
}
// On every exit path, signal the producers to stop and then receive from each
// stream once more, so that a producer which is blocked sending on its channel
// can make progress. The results of these calls are intentionally discarded.
defer func() {
close(stopChan)
parItemGet()
memItemGet()
}()
// Prime the merge with the first candidate from each stream.
pitm, err := parItemGet()
if err != nil {
return err
}
mitm, err := memItemGet()
if err != nil {
return err
}
for {
// the err can be set during the loop below. If we come around the bend and
// it's set, then we need to return it. We don't check it immediately
// because it's set after we already have a good result to return to the
// user.
if err != nil {
return err
}
// With only one stream left, take from it. With both available, take the
// parent item only if it sorts strictly first. With neither, we're done.
usePitm := pitm != nil
if pitm != nil && mitm != nil {
usePitm = cmpFn(pitm) < cmpFn(mitm)
} else if pitm == nil && mitm == nil {
break
}
toUse := (*item)(nil)
// we check the error at the beginning of the loop.
if usePitm {
toUse = pitm
pitm, err = parItemGet()
} else {
toUse = mitm
mitm, err = memItemGet()
}
if dedup != nil {
// The candidates are fetched ahead of time, so the same key may have been
// emitted from the other stream in the meantime; skip it if Add reports
// that it was already present.
if !dedup.Add(toUse.getEncKey()) {
continue
}
}
if distinct != nil {
// NOTE: We know that toUse will not be used after this point for
// comparison purposes, so re-use its cmpRow property for our distinct
// filter here.
toUse.cmpRow = ""
if !distinct.Add(toUse.getCmpRow(nil, nil, distinctOrder)) {
continue
}
}
// This err is scoped to the if statement and does not touch the outer err
// carried over from the getters above.
if err := cb(toUse.key, toUse.data); err != nil {
return err
}
}
return nil
}
// toComparableString computes the byte-sortable 'order' string for the given
// key/PropertyMap.
//
//   - start/end are byte sequences which are the inequality bounds of the
//     query, if any. These are a serialized datastore.Property. If the
//     inequality column is inverted, then start and end are also inverted
//     and swapped with each other.
//   - order is the list of sort orders in the actual executing queries.
//   - k / pm are the data to derive a sortable string for.
//
// The returned row is the concatenation of one serialized property per order
// column, representing this key/pm's first entry in the composite index that
// would point to it (e.g. the one with `order` sort orders). The returned key
// is the first serialized `__key__` value for k.
//
// Both results are nil when some order column has no usable value (none at
// all, or none that keeps the row at or above start), or when the finished
// row is not strictly below a non-nil end.
//
// NOTE(review): the values of each column are sorted ascending before the
// Descending inversion is applied to them individually; confirm this selects
// the intended entry for multi-valued properties with a descending order.
func toComparableString(start, end []byte, order []ds.IndexColumn, k *ds.Key, pm ds.PropertyMap) (row, key []byte) {
	// Key-derived properties take precedence over same-named entries in pm.
	keyProps := ds.Serialize.PropertyMapPartially(k, nil)

	var (
		prefix = []byte{}
		// checkStart stays true until the accumulated prefix is at least as
		// long as start; after that the lower bound needs no more checking.
		checkStart = true
	)
	for _, col := range order {
		vals, fromKey := keyProps[col.Property]
		if !fromKey {
			if props := pm.Slice(col.Property); len(props) > 0 {
				vals = ds.Serialize.PropertySlicePartially(props)
			}
		}
		sort.Sort(vals)

		matched := false
		for _, val := range vals {
			if col.Descending {
				val = cmpbin.InvertBytes(val)
			}
			candidate := cmpbin.ConcatBytes(prefix, val)
			if checkStart {
				if bytes.Compare(candidate, start) < 0 {
					// Still below the lower bound; try the next value.
					continue
				}
				checkStart = len(candidate) < len(start)
			}
			prefix, matched = candidate, true
			break
		}
		if !matched {
			return nil, nil
		}
	}

	if end != nil && bytes.Compare(prefix, end) >= 0 {
		return nil, nil
	}
	return prefix, keyProps["__key__"][0]
}