| // Copyright 2015 The LUCI Authors. |
| // |
| // Licensed under the Apache License, Version 2.0 (the "License"); |
| // you may not use this file except in compliance with the License. |
| // You may obtain a copy of the License at |
| // |
| // http://www.apache.org/licenses/LICENSE-2.0 |
| // |
| // Unless required by applicable law or agreed to in writing, software |
| // distributed under the License is distributed on an "AS IS" BASIS, |
| // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| // See the License for the specific language governing permissions and |
| // limitations under the License. |
| |
| package txnBuf |
| |
| import ( |
| "bytes" |
| "context" |
| "sync" |
| |
| "go.chromium.org/luci/common/data/stringset" |
| "go.chromium.org/luci/common/errors" |
| "go.chromium.org/luci/common/sync/parallel" |
| "go.chromium.org/luci/gae/impl/memory" |
| "go.chromium.org/luci/gae/service/datastore" |
| "go.chromium.org/luci/gae/service/info" |
| ) |
| |
| // DefaultSizeBudget is the size budget for the root transaction. |
| // |
| // Because our estimation algorithm isn't entirely accurate, we take 5% off |
| // the limit to account for encoding overhead and estimation inaccuracies. |
| // |
| // 10MB taken on 2015/09/24: |
| // https://cloud.google.com/appengine/docs/go/datastore/#Go_Quotas_and_limits |
| const DefaultSizeBudget = int64((10 * 1000 * 1000) * 0.95) |
| |
| // DefaultWriteCountBudget is the maximum number of entities that can be written |
| // in a single call. |
| // |
| // This is not known to be documented, and has instead been extracted from a |
| // datastore error message. |
| const DefaultWriteCountBudget = 500 |
| |
| // XGTransactionGroupLimit is the number of transaction groups to allow in an |
| // XG transaction. |
| // |
| // 25 taken on 2015/09/24: |
| // https://cloud.google.com/appengine/docs/go/datastore/transactions#Go_What_can_be_done_in_a_transaction |
| const XGTransactionGroupLimit = 25 |
| |
| // sizeTracker tracks the size of a buffered transaction. The rules are simple: |
| // * deletes count for the size of their key, but 0 data |
| // * puts count for the size of their key plus the 'EstimateSize' for their |
| // data. |
| type sizeTracker struct { |
| keyToSize map[string]int64 |
| total int64 |
| } |
| |
| // set records that the given key is being set to an entity with the size `val`. |
| // A val of 0 means "I'm deleting this key". |
| func (s *sizeTracker) set(key string, val int64) { |
| if s.keyToSize == nil { |
| s.keyToSize = make(map[string]int64) |
| } |
| prev, existed := s.keyToSize[key] |
| s.keyToSize[key] = val |
| s.total += val - prev |
| if !existed { |
| s.total += int64(len(key)) |
| } |
| } |
| |
| // get returns the currently tracked size for key, and whether or not the key |
| // has any tracked value. |
| func (s *sizeTracker) get(key string) (int64, bool) { |
| size, has := s.keyToSize[key] |
| return size, has |
| } |
| |
| // has returns true iff key has a tracked value. |
| func (s *sizeTracker) has(key string) bool { |
| _, has := s.keyToSize[key] |
| return has |
| } |
| |
| // numWrites returns the number of tracked write operations. |
| func (s *sizeTracker) numWrites() int { |
| return len(s.keyToSize) |
| } |
| |
| // dup returns a duplicate sizeTracker. |
| func (s *sizeTracker) dup() *sizeTracker { |
| if len(s.keyToSize) == 0 { |
| return &sizeTracker{} |
| } |
| k2s := make(map[string]int64, len(s.keyToSize)) |
| for k, v := range s.keyToSize { |
| k2s[k] = v |
| } |
| return &sizeTracker{k2s, s.total} |
| } |
| |
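| // txnBufState holds the state of a single buffered transaction: the |
| // in-memory datastore buffering its writes, a sizeTracker for those writes, |
| // the set of root entity groups touched so far, and the size/write-count |
| // budgets inherited from (or defaulted for) the parent transaction. |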
| type txnBufState struct { |
| sync.Mutex |
| |
| // encoded key -> size of entity. A size of 0 means that the entity is |
| // deleted. |
| entState *sizeTracker |
| bufDS datastore.RawInterface |
| |
| roots stringset.Set |
| rootLimit int |
| |
| kc datastore.KeyContext |
| parentDS datastore.RawInterface |
| |
| // sizeBudget is the number of bytes that this transaction has to operate |
| // within. It's only used when attempting to apply() the transaction, and |
| // it is the threshold for the delta of applying this transaction to the |
| // parent transaction. Note that a buffered transaction could actually have |
| // a negative delta if the parent transaction had many large entities which |
| // the inner transaction deleted. |
| sizeBudget int64 |
| // writeCountBudget is the maximum number of entity writes that this |
| // transaction may perform. |
| writeCountBudget int |
| } |
| |
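| // withTxnBuf installs a fresh buffered-transaction state into the context |
| // and runs cb with it. If cb succeeds and there is no parent buffered |
| // transaction, the buffered writes are flushed to the real datastore; |
| // otherwise they are folded into the parent's buffer, subject to its size, |
| // write-count and entity-group budgets. If cb returns an error, the buffer |
| // is discarded. |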
| func withTxnBuf(ctx context.Context, cb func(context.Context) error, opts *datastore.TransactionOptions) error { |
| parentState, _ := ctx.Value(&dsTxnBufParent).(*txnBufState) |
| roots := stringset.New(0) |
| rootLimit := XGTransactionGroupLimit |
| sizeBudget, writeCountBudget := DefaultSizeBudget, DefaultWriteCountBudget |
| if parentState != nil { |
| // TODO(riannucci): this is a bit wonky since it means that a child |
| // transaction declaring XG=true will only get to modify 25 groups IF |
| // they're the same groups affected by the parent transaction. So instead of |
| // respecting opts.XG for inner transactions, we just dup everything from |
| // the parent transaction. |
| roots = parentState.roots.Dup() |
| rootLimit = parentState.rootLimit |
| |
| sizeBudget = parentState.sizeBudget - parentState.entState.total |
| writeCountBudget = parentState.writeCountBudget - parentState.entState.numWrites() |
| } |
| |
| state := &txnBufState{ |
| entState: &sizeTracker{}, |
| bufDS: memory.NewDatastore(ctx, info.Raw(ctx)), |
| roots: roots, |
| rootLimit: rootLimit, |
| kc: datastore.GetKeyContext(ctx), |
| parentDS: datastore.Raw(context.WithValue(ctx, &dsTxnBufHaveLock, true)), |
| sizeBudget: sizeBudget, |
| writeCountBudget: writeCountBudget, |
| } |
| if err := cb(context.WithValue(ctx, &dsTxnBufParent, state)); err != nil { |
| return err |
| } |
| |
| // no reason to unlock this ever. At this point it's toast. |
| state.Lock() |
| |
| if parentState == nil { |
| return commitToReal(state) |
| } |
| |
| if err := parentState.canApplyLocked(state); err != nil { |
| return err |
| } |
| |
| parentState.commitLocked(state) |
| return nil |
| } |
| |
| // item is a temporary object for representing key/entity pairs and their cache |
| // state (e.g. if they exist in the in-memory datastore buffer or not). |
| // Additionally item memoizes some common comparison strings. item objects |
| // must never be persisted outside of a single function/query context. |
| type item struct { |
| key *datastore.Key |
| data datastore.PropertyMap |
| buffered bool |
| |
| encKey string |
| |
| // cmpRow is used to hold the toComparableString value for this item during |
| // a query. |
| cmpRow string |
| |
| // err is a bit of a hack for passing back synchronized errors from |
| // queryToIter. |
| err error |
| } |
| |
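| // getEncKey lazily computes and memoizes the serialized form of the item's |
| // key. |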
| func (i *item) getEncKey() string { |
| if i.encKey == "" { |
| i.encKey = string(datastore.Serialize.ToBytes(i.key)) |
| } |
| return i.encKey |
| } |
| |
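| // getCmpRow lazily computes and memoizes the comparable-string form of this |
| // item for the given query bounds and sort order, memoizing the encoded key |
| // as a side effect. |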
| func (i *item) getCmpRow(lower, upper []byte, order []datastore.IndexColumn) string { |
| if i.cmpRow == "" { |
| row, key := toComparableString(lower, upper, order, i.key, i.data) |
| i.cmpRow = string(row) |
| if i.encKey == "" { |
| i.encKey = string(key) |
| } |
| } |
| return i.cmpRow |
| } |
| |
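| // updateRootsLocked adds any new root entity groups in `roots` to the |
| // tracked set. It returns ErrTooManyRoots if doing so would exceed |
| // t.rootLimit. The caller must hold t's lock. |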
| func (t *txnBufState) updateRootsLocked(roots stringset.Set) error { |
| curRootLen := t.roots.Len() |
| proposedRoots := stringset.New(1) |
| roots.Iter(func(root string) bool { |
| if !t.roots.Has(root) { |
| proposedRoots.Add(root) |
| } |
| return proposedRoots.Len()+curRootLen <= t.rootLimit |
| }) |
| if proposedRoots.Len()+curRootLen > t.rootLimit { |
| return ErrTooManyRoots |
| } |
| // only need to update the roots if there are new roots to add |
| if proposedRoots.Len() > 0 { |
| proposedRoots.Iter(func(root string) bool { |
| t.roots.Add(root) |
| return true |
| }) |
| } |
| return nil |
| } |
| |
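| // getMulti fetches the requested entities, preferring buffered versions |
| // when present and falling back to the parent datastore for the rest. |
| // Results (or per-key errors) are delivered through cb. |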
| func (t *txnBufState) getMulti(keys []*datastore.Key, metas datastore.MultiMetaGetter, cb datastore.GetMultiCB, haveLock bool) error { |
| encKeys, roots := toEncoded(keys) |
| data := make([]item, len(keys)) |
| |
| idxMap := []int(nil) |
| toGetKeys := []*datastore.Key(nil) |
| |
| lme := errors.NewLazyMultiError(len(keys)) |
| err := func() error { |
| if !haveLock { |
| t.Lock() |
| defer t.Unlock() |
| } |
| |
| if err := t.updateRootsLocked(roots); err != nil { |
| return err |
| } |
| |
| for i, key := range keys { |
| data[i].key = key |
| data[i].encKey = encKeys[i] |
| if size, ok := t.entState.get(data[i].getEncKey()); ok { |
| data[i].buffered = true |
| if size > 0 { |
| idxMap = append(idxMap, i) |
| toGetKeys = append(toGetKeys, key) |
| } |
| } |
| } |
| |
| if len(toGetKeys) > 0 { |
| t.bufDS.GetMulti(toGetKeys, nil, func(j int, pm datastore.PropertyMap, err error) { |
| impossible(err) |
| data[idxMap[j]].data = pm |
| }) |
| } |
| |
| idxMap = nil |
| getKeys := []*datastore.Key(nil) |
| getMetas := datastore.MultiMetaGetter(nil) |
| |
| for i, itm := range data { |
| if !itm.buffered { |
| idxMap = append(idxMap, i) |
| getKeys = append(getKeys, itm.key) |
| getMetas = append(getMetas, metas.GetSingle(i)) |
| } |
| } |
| |
| if len(idxMap) > 0 { |
| err := t.parentDS.GetMulti(getKeys, getMetas, func(j int, pm datastore.PropertyMap, err error) { |
| if err != datastore.ErrNoSuchEntity { |
| i := idxMap[j] |
| if !lme.Assign(i, err) { |
| data[i].data = pm |
| } |
| } |
| }) |
| if err != nil { |
| return err |
| } |
| } |
| return nil |
| }() |
| if err != nil { |
| return err |
| } |
| |
| for i, itm := range data { |
| err := lme.GetOne(i) |
| if err != nil { |
| cb(i, nil, err) |
| } else if itm.data == nil { |
| cb(i, nil, datastore.ErrNoSuchEntity) |
| } else { |
| cb(i, itm.data, nil) |
| } |
| } |
| return nil |
| } |
| |
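| // deleteMulti records a deletion for each key in the buffer (a tracked size |
| // of 0 means "deleted") and reports success for every key through cb. |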
| func (t *txnBufState) deleteMulti(keys []*datastore.Key, cb datastore.DeleteMultiCB, haveLock bool) error { |
| encKeys, roots := toEncoded(keys) |
| |
| err := func() error { |
| if !haveLock { |
| t.Lock() |
| defer t.Unlock() |
| } |
| |
| if err := t.updateRootsLocked(roots); err != nil { |
| return err |
| } |
| |
| err := t.bufDS.DeleteMulti(keys, func(i int, err error) { |
| impossible(err) |
| t.entState.set(encKeys[i], 0) |
| }) |
| impossible(err) |
| return nil |
| }() |
| if err != nil { |
| return err |
| } |
| |
| for i := range keys { |
| cb(i, nil) |
| } |
| |
| return nil |
| } |
| |
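| // fixKeys returns a copy of keys in which any incomplete keys have had IDs |
| // allocated from the parent datastore. If every key is already complete, |
| // the original slice is returned unchanged. |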
| func (t *txnBufState) fixKeys(keys []*datastore.Key) ([]*datastore.Key, error) { |
| // Identify any incomplete keys and allocate IDs for them. |
| // |
| // To facilitate this, we maintain a mapping from each index in |
| // incompleteKeys to the key's corresponding index in the keys slice. |
| // Any errors or allocated keys from the incompleteKeys operation are |
| // propagated to the correct keys index using this map. |
| var ( |
| incompleteKeys []*datastore.Key |
| incompleteMap map[int]int |
| ) |
| |
| for i, key := range keys { |
| if key.IsIncomplete() { |
| if incompleteMap == nil { |
| incompleteMap = make(map[int]int) |
| } |
| incompleteMap[len(incompleteKeys)] = i |
| incompleteKeys = append(incompleteKeys, key) |
| } |
| } |
| if len(incompleteKeys) == 0 { |
| return keys, nil |
| } |
| |
| // We're going to update keys, so clone it. |
| keys, origKeys := make([]*datastore.Key, len(keys)), keys |
| copy(keys, origKeys) |
| |
| // Intentionally call AllocateIDs without lock. |
| outerErr := errors.NewLazyMultiError(len(keys)) |
| err := t.parentDS.AllocateIDs(incompleteKeys, func(i int, key *datastore.Key, err error) { |
| outerIdx := incompleteMap[i] |
| |
| if err != nil { |
| outerErr.Assign(outerIdx, err) |
| } else { |
| keys[outerIdx] = key |
| } |
| }) |
| if err != nil { |
| return nil, err |
| } |
| return keys, outerErr.Get() |
| } |
| |
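| // putMulti writes the given entities to the buffer, allocating IDs for any |
| // incomplete keys first and recording each entity's estimated size, then |
| // reports the finalized keys through cb. |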
| func (t *txnBufState) putMulti(keys []*datastore.Key, vals []datastore.PropertyMap, cb datastore.NewKeyCB, haveLock bool) error { |
| keys, err := t.fixKeys(keys) |
| if err != nil { |
| for i, e := range err.(errors.MultiError) { |
| cb(i, nil, e) |
| } |
| return nil |
| } |
| |
| encKeys, roots := toEncoded(keys) |
| |
| err = func() error { |
| if !haveLock { |
| t.Lock() |
| defer t.Unlock() |
| } |
| |
| if err := t.updateRootsLocked(roots); err != nil { |
| return err |
| } |
| |
| err := t.bufDS.PutMulti(keys, vals, func(i int, k *datastore.Key, err error) { |
| impossible(err) |
| t.entState.set(encKeys[i], vals[i].EstimateSize()) |
| }) |
| impossible(err) |
| return nil |
| }() |
| if err != nil { |
| return err |
| } |
| |
| for i, k := range keys { |
| cb(i, k, nil) |
| } |
| return nil |
| } |
| |
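| // commitToReal flushes a root buffered transaction to the underlying |
| // datastore, issuing the accumulated puts and deletes in parallel. |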
| func commitToReal(s *txnBufState) error { |
| toPut, toPutKeys, toDel := s.effect() |
| |
| return parallel.FanOutIn(func(ch chan<- func() error) { |
| if len(toPut) > 0 { |
| ch <- func() error { |
| mErr := errors.NewLazyMultiError(len(toPut)) |
| err := s.parentDS.PutMulti(toPutKeys, toPut, func(i int, _ *datastore.Key, err error) { |
| mErr.Assign(i, err) |
| }) |
| if err == nil { |
| err = mErr.Get() |
| } |
| return err |
| } |
| } |
| if len(toDel) > 0 { |
| ch <- func() error { |
| mErr := errors.NewLazyMultiError(len(toDel)) |
| err := s.parentDS.DeleteMulti(toDel, func(i int, err error) { |
| mErr.Assign(i, err) |
| }) |
| if err == nil { |
| err = mErr.Get() |
| } |
| return err |
| } |
| } |
| }) |
| } |
| |
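| // effect computes the net effect of the buffered transaction: every entity |
| // currently stored in bufDS (to put) and every key whose tracked size is 0 |
| // (to delete). |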
| func (t *txnBufState) effect() (toPut []datastore.PropertyMap, toPutKeys, toDel []*datastore.Key) { |
| // TODO(riannucci): preallocate return slices |
| |
| // We need to pull all items out of the in-memory datastore. Fortunately we |
| // have kindless queries, and all the special entities are disabled, so |
| // running a kindless query without any filters returns all data currently |
| // in bufDS :). |
| fq, err := datastore.NewQuery("").Finalize() |
| impossible(err) |
| |
| err = t.bufDS.Run(fq, func(key *datastore.Key, data datastore.PropertyMap, _ datastore.CursorCB) error { |
| toPutKeys = append(toPutKeys, key) |
| toPut = append(toPut, data) |
| return nil |
| }) |
| memoryCorruption(err) |
| |
| for keyStr, size := range t.entState.keyToSize { |
| if size == 0 { |
| k, err := datastore.Deserializer{KeyContext: t.kc}.Key(bytes.NewBufferString(keyStr)) |
| memoryCorruption(err) |
| toDel = append(toDel, k) |
| } |
| } |
| |
| return |
| } |
| |
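| // canApplyLocked returns an error if folding the child transaction s into t |
| // would exceed t's write-count or size budget. |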
| func (t *txnBufState) canApplyLocked(s *txnBufState) error { |
| proposedState := t.entState.dup() |
| |
| for k, v := range s.entState.keyToSize { |
| proposedState.set(k, v) |
| } |
| switch { |
| case proposedState.numWrites() > t.writeCountBudget: |
| // The new net number of writes must not exceed the parent's write count |
| // budget. |
| fallthrough |
| |
| case proposedState.total > t.sizeBudget: |
| // Make sure our new calculated size is within the parent's size budget. |
| // |
| // We have: |
| // - proposedState.total: The "new world" total bytes if this child |
| // transaction were committed to the parent. |
| // - t.sizeBudget: The maximum number of bytes that this parent can |
| // accommodate. |
| return ErrTransactionTooLarge |
| } |
| |
| return nil |
| } |
| |
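| // commitLocked folds the child transaction s into t by replaying s's net |
| // puts and deletes against t's buffer. |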
| func (t *txnBufState) commitLocked(s *txnBufState) { |
| toPut, toPutKeys, toDel := s.effect() |
| |
| if len(toPut) > 0 { |
| impossible(t.putMulti(toPutKeys, toPut, |
| func(_ int, _ *datastore.Key, err error) { impossible(err) }, true)) |
| } |
| |
| if len(toDel) > 0 { |
| impossible(t.deleteMulti(toDel, |
| func(_ int, err error) { impossible(err) }, true)) |
| } |
| } |
| |
| // toEncoded returns a list of all of the serialized versions of these keys, |
| // plus a stringset of all the encoded root keys that `keys` represents. |
| func toEncoded(keys []*datastore.Key) (full []string, roots stringset.Set) { |
| roots = stringset.New(len(keys)) |
| full = make([]string, len(keys)) |
| for i, k := range keys { |
| roots.Add(string(datastore.Serialize.ToBytes(k.Root()))) |
| full[i] = string(datastore.Serialize.ToBytes(k)) |
| } |
| return |
| } |