[dscache] Change Cache interface to reduce data copying.

Also catch serialization errors.

R=tandrii@chromium.org
BUG=1140747

Change-Id: I81843fa2b409279099ca2ce852e025985799bc93
Reviewed-on: https://chromium-review.googlesource.com/c/infra/luci/luci-go/+/2561723
Reviewed-by: Andrii Shyshkalov <tandrii@google.com>
Commit-Queue: Vadim Shtayura <vadimsh@chromium.org>
diff --git a/gae/filter/dscache/ds.go b/gae/filter/dscache/ds.go
index 51226c5..4195528 100644
--- a/gae/filter/dscache/ds.go
+++ b/gae/filter/dscache/ds.go
@@ -19,6 +19,7 @@
 	"time"
 
 	"go.chromium.org/luci/common/data/rand/mathrand"
+	"go.chromium.org/luci/common/errors"
 	"go.chromium.org/luci/common/logging"
 
 	ds "go.chromium.org/luci/gae/service/datastore"
@@ -70,51 +71,55 @@
 		var toCas []CacheItem
 		err := d.RawInterface.GetMulti(p.toGet, p.toGetMeta, func(j int, pm ds.PropertyMap, err error) {
 			i := p.idxMap[j]
-			toSave := p.toSave[j]
 
-			data := []byte(nil)
-
-			// true: save entity to memcache
-			// false: lock entity in memcache forever
-			shouldSave := true
 			if err == nil {
 				p.decoded[i] = pm
-				if toSave != nil {
-					data = encodeItemValue(pm)
-					if len(data) > internalValueSizeLimit {
-						shouldSave = false
-						logging.Warningf(
-							d.c, "dscache: encoded entity too big (%d/%d)!",
-							len(data), internalValueSizeLimit)
-					}
-				}
 			} else {
 				p.lme.Assign(i, err)
 				if err != ds.ErrNoSuchEntity {
 					return
 				}
+				pm = nil
 			}
 
-			if toSave != nil {
-				if shouldSave {
-					// The item was successfully encoded and should be able to fit into
-					// the cache.
-					mg := metas.GetSingle(i)
-					expSecs := ds.GetMetaDefault(mg, CacheExpirationMeta, int64(CacheDuration.Seconds())).(int64)
-					toSave.PromoteToData(data, time.Duration(expSecs)*time.Second)
+			toSave := p.toSave[j]
+			if toSave == nil {
+				return
+			}
+
+			expiry := time.Duration(ds.GetMetaDefault(
+				metas.GetSingle(i),
+				CacheExpirationMeta,
+				int64(CacheDuration.Seconds()),
+			).(int64)) * time.Second
+
+			if pm == nil {
+			// A missing entity is cached as just the item's prefix with no payload bytes.
+				toSave.PromoteToData(toSave.Prefix(), expiry)
+			} else {
+				// Serialize and compress the PropertyMap, bail if too large.
+				buf, err := encodeItemValue(pm, toSave.Prefix())
+				if err == nil && len(buf) > internalValueSizeLimit {
+					err = errors.Reason("encoded entity too big (%d > %d)", len(buf), internalValueSizeLimit).Err()
+				}
+				if err == nil {
+					// The item should be able to fit into the cache.
+					toSave.PromoteToData(buf, expiry)
 				} else {
-					// The item is most likely too big to be cached. Set a lock with an
-					// infinite timeout. No one else should try to serialize this item to
-					// memcache until something Put/Delete's it.
+					// The item is "broken". No one else should try to serialize this item
+					// until something Put/Delete's it. Set a lock on it.
+					logging.WithError(err).Warningf(d.c, "dscache: PropertyMap serialization error")
 					toSave.PromoteToIndefiniteLock()
 				}
-				toCas = append(toCas, toSave)
 			}
+			toCas = append(toCas, toSave)
 		})
+
 		if err != nil {
 			// TODO(vadimsh): Should we drop locks owned by us?
 			return err
 		}
+
 		if len(toCas) > 0 {
 			// Store stuff we fetched back into memcache unless someone (like
 			// a concurrent Put) deleted our locks already.
diff --git a/gae/filter/dscache/dscache.go b/gae/filter/dscache/dscache.go
index 2187895..8954409 100644
--- a/gae/filter/dscache/dscache.go
+++ b/gae/filter/dscache/dscache.go
@@ -92,8 +92,13 @@
 	// Data returns nil for lock items or an item's data for data items.
 	Data() []byte
 
+	// Prefix returns bytes that must appear at the start of the data buffer passed to PromoteToData.
+	Prefix() []byte
+
 	// PromoteToData converts this lock item into a data item.
 	//
+	// `data` must start with whatever Prefix() returned.
+	//
 	// Panics if self is not a lock item.
 	PromoteToData(data []byte, exp time.Duration)
 
diff --git a/gae/filter/dscache/memcache.go b/gae/filter/dscache/memcache.go
index dfc44f8..cee947e 100644
--- a/gae/filter/dscache/memcache.go
+++ b/gae/filter/dscache/memcache.go
@@ -128,6 +128,10 @@
 	return nil
 }
 
+func (m memcacheItem) Prefix() []byte {
+	return nil
+}
+
 func (m memcacheItem) PromoteToData(data []byte, exp time.Duration) {
 	if m.item.Flags() != itemFlagHasLock {
 		panic("only locks should be promoted")
diff --git a/gae/filter/dscache/serialize.go b/gae/filter/dscache/serialize.go
index d60f127..d61eafb 100644
--- a/gae/filter/dscache/serialize.go
+++ b/gae/filter/dscache/serialize.go
@@ -29,25 +29,39 @@
 	compressionZlib
 )
 
-func encodeItemValue(pm ds.PropertyMap) []byte {
-	pm, _ = pm.Save(false)
-
-	buf := bytes.Buffer{}
-	// errs can't happen, since we're using a byte buffer.
-	_ = buf.WriteByte(byte(compressionNone))
-	_ = ds.Serialize.PropertyMap(&buf, pm)
-
-	data := buf.Bytes()
-	if buf.Len() > CompressionThreshold {
-		buf2 := bytes.NewBuffer(make([]byte, 0, len(data)))
-		_ = buf2.WriteByte(byte(compressionZlib))
-		writer := zlib.NewWriter(buf2)
-		_, _ = writer.Write(data[1:]) // skip the compressionNone byte
-		writer.Close()
-		data = buf2.Bytes()
+func encodeItemValue(pm ds.PropertyMap, pfx []byte) ([]byte, error) {
+	var err error
+	if pm, err = pm.Save(false); err != nil {
+		return nil, err
 	}
 
-	return data
+	// Try to write as uncompressed first. Capacity of 256 is picked arbitrarily.
+	// Most entities are pretty small.
+	buf := bytes.NewBuffer(make([]byte, 0, 256))
+	buf.Write(pfx)
+	buf.WriteByte(byte(compressionNone))
+	if err := ds.Serialize.PropertyMap(buf, pm); err != nil {
+		return nil, err
+	}
+
+	// If it is small enough, we are done.
+	if buf.Len() <= CompressionThreshold {
+		return buf.Bytes(), nil
+	}
+
+	// If too big, grab a new buffer and compress data there. Preallocate a new
+	// buffer assuming 2x compression.
+	data := buf.Bytes()[len(pfx)+1:] // skip pfx and compressionNone byte
+	buf2 := bytes.NewBuffer(make([]byte, 0, len(data)/2))
+
+	// Compress into the new buffer.
+	buf2.Write(pfx)
+	buf2.WriteByte(byte(compressionZlib))
+	writer := zlib.NewWriter(buf2)
+	writer.Write(data)
+	writer.Close()
+
+	return buf2.Bytes(), nil
 }
 
 func decodeItemValue(val []byte, kc ds.KeyContext) (ds.PropertyMap, error) {