| // Copyright 2015 The LUCI Authors. |
| // |
| // Licensed under the Apache License, Version 2.0 (the "License"); |
| // you may not use this file except in compliance with the License. |
| // You may obtain a copy of the License at |
| // |
| // http://www.apache.org/licenses/LICENSE-2.0 |
| // |
| // Unless required by applicable law or agreed to in writing, software |
| // distributed under the License is distributed on an "AS IS" BASIS, |
| // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| // See the License for the specific language governing permissions and |
| // limitations under the License. |
| |
| package bigtable |
| |
| import ( |
| "bytes" |
| "crypto/sha256" |
| "encoding/base64" |
| "encoding/hex" |
| "errors" |
| "strings" |
| "sync" |
| |
| "go.chromium.org/luci/common/data/cmpbin" |
| ) |
| |
| // rowKeyBufferPool stores a pool of allocated Buffer instances for reuse when |
| // constructing row keys. |
| var ( |
| // errMalformedRowKey is an error that is returned if the row key in the |
| // tables does not comform to our row key structure. |
| errMalformedRowKey = errors.New("bigtable: malformed row key") |
| |
| // encodedPrefixSize is the size in bytes of the encoded row key prefix. All |
| // rows from the same stream path share this prefix. |
| encodedPrefixSize = base64.URLEncoding.EncodedLen(sha256.Size) |
| // maxEncodedKeySize is the maximum size in bytes of a full row key. |
| maxEncodedKeySize = encodedPrefixSize + (2 * (len("~") + hex.EncodedLen(cmpbin.MaxIntLen64))) |
| |
| rowKeyBufferPool = sync.Pool{ |
| New: func() interface{} { |
| return &rowKeyBuffers{} |
| }, |
| } |
| ) |
| |
| type rowKeyBuffers struct { |
| // binBuf is a Buffer to write binary data for encoding. |
| binBuf bytes.Buffer |
| // key is where the encoded key get built. |
| key []byte |
| // size is the current number of bytes used in "key". |
| size int |
| } |
| |
| func withRowKeyBuffers(f func(rkb *rowKeyBuffers)) { |
| rkb := rowKeyBufferPool.Get().(*rowKeyBuffers) |
| defer rowKeyBufferPool.Put(rkb) |
| |
| rkb.reset() |
| f(rkb) |
| } |
| |
| func (rkb *rowKeyBuffers) reset() { |
| if rkb.key == nil { |
| rkb.key = make([]byte, maxEncodedKeySize) |
| } |
| rkb.size = 0 |
| } |
| |
| func (rkb *rowKeyBuffers) appendPathPrefix(pathHash []byte) { |
| base64.URLEncoding.Encode(rkb.remaining(), pathHash) |
| rkb.size += base64.URLEncoding.EncodedLen(len(pathHash)) |
| } |
| |
| func (rkb *rowKeyBuffers) appendInt64(i int64) { |
| // Encode index to "cmpbin". |
| rkb.binBuf.Reset() |
| cmpbin.WriteInt(&rkb.binBuf, i) |
| |
| rkb.size += hex.Encode(rkb.remaining(), rkb.binBuf.Bytes()) |
| } |
| |
| func (rkb *rowKeyBuffers) appendBytes(d []byte) { |
| rkb.size += copy(rkb.remaining(), d) |
| } |
| |
| func (rkb *rowKeyBuffers) remaining() []byte { |
| return rkb.key[rkb.size:] |
| } |
| |
| func (rkb *rowKeyBuffers) value() string { |
| return string(rkb.key[:rkb.size]) |
| } |
| |
| // rowKey is a BigTable row key. |
| // |
| // The row key is formed from a Path and its Index. The goal: |
| // - Rows with the same path should be clustered. |
| // - Rows with the same path should be sorted according to index. |
| // |
| // The row key index is the index of the LAST entry in the row. Therefore, a |
| // row for a given row key will span log indexes [index-count+1..index]. |
| // |
| // Since BigTable rows must be valid UTF8, and since paths are effectively |
| // unbounded, the row key will be formed by composing: |
| // |
| // [ base64(sha256(path)) ] + '~' + [ hex(cmpbin(index)) ] + '~' + |
| // [hex(cmpbin(count)] |
| type rowKey struct { |
| pathHash []byte |
| index int64 |
| count int64 |
| } |
| |
| // newRowKey generates the row key matching a given entry path and index. |
| func newRowKey(project, path string, index, count int64) *rowKey { |
| h := sha256.New() |
| |
| _, _ = h.Write([]byte(project)) |
| _, _ = h.Write([]byte("/")) |
| _, _ = h.Write([]byte(path)) |
| return &rowKey{ |
| pathHash: h.Sum(nil), |
| index: index, |
| count: count, |
| } |
| } |
| |
| // decodeRowKey decodes an encoded row key into its structural components. |
| func decodeRowKey(v string) (*rowKey, error) { |
| keyParts := strings.SplitN(v, "~", 3) |
| if len(keyParts) != 3 { |
| return nil, errMalformedRowKey |
| } |
| |
| hashEnc, idxEnc, countEnc := keyParts[0], keyParts[1], keyParts[2] |
| if base64.URLEncoding.DecodedLen(len(hashEnc)) < sha256.Size { |
| return nil, errMalformedRowKey |
| } |
| |
| // Decode encoded project/path hash. |
| var err error |
| rk := rowKey{} |
| rk.pathHash, err = base64.URLEncoding.DecodeString(hashEnc) |
| if err != nil { |
| return nil, errMalformedRowKey |
| } |
| |
| // Decode index. |
| rk.index, err = readHexInt64(idxEnc) |
| if err != nil { |
| return nil, err |
| } |
| |
| // If a count is available, decode that as well. |
| rk.count, err = readHexInt64(countEnc) |
| if err != nil { |
| return nil, err |
| } |
| |
| return &rk, nil |
| } |
| |
| func (rk *rowKey) String() string { |
| return rk.encode() |
| } |
| |
| // newRowKey instantiates a new rowKey from its components. |
| func (rk *rowKey) encode() (v string) { |
| // Write the final key to "key": (base64(HASH)~hex(INDEX)) |
| withRowKeyBuffers(func(rkb *rowKeyBuffers) { |
| rkb.appendPathPrefix(rk.pathHash) |
| rkb.appendBytes([]byte("~")) |
| rkb.appendInt64(rk.index) |
| rkb.appendBytes([]byte("~")) |
| rkb.appendInt64(rk.count) |
| v = rkb.value() |
| }) |
| return |
| } |
| |
| // prefix returns the encoded path prefix for the row key, which is the hash of |
| // that row's project/path. |
| func (rk *rowKey) pathPrefix() (v string) { |
| withRowKeyBuffers(func(rkb *rowKeyBuffers) { |
| rkb.appendPathPrefix(rk.pathHash) |
| rkb.appendBytes([]byte("~")) |
| v = rkb.value() |
| }) |
| return |
| } |
| |
| // pathPrefixUpperBound returns the path prefix that is higher than any path |
| // allowed in the row key space. |
| // |
| // This is accomplished by appending a "~" character to the path prefix, |
| // creating something like this: |
| // |
| // prefix~~ |
| // |
| // The "prefix~" is shared with all keys in "rk", but the extra "~" is larger |
| // than any hex-encoded row index, so this key will always be larger. |
| func (rk *rowKey) pathPrefixUpperBound() (v string) { |
| withRowKeyBuffers(func(rkb *rowKeyBuffers) { |
| rkb.appendPathPrefix(rk.pathHash) |
| rkb.appendBytes([]byte("~~")) |
| v = rkb.value() |
| }) |
| return |
| } |
| |
| // firstIndex returns the first log entry index represented by this row key. |
| func (rk *rowKey) firstIndex() int64 { return rk.index - rk.count + 1 } |
| |
| // sharesPrefixWith tests if the "path" component of the row key "rk" matches |
| // the "path" component of "o". |
| func (rk *rowKey) sharesPathWith(o *rowKey) bool { |
| return bytes.Equal(rk.pathHash, o.pathHash) |
| } |
| |
| func readHexInt64(v string) (int64, error) { |
| d, err := hex.DecodeString(v) |
| if err != nil { |
| return 0, errMalformedRowKey |
| } |
| |
| dr := bytes.NewReader(d) |
| value, _, err := cmpbin.ReadInt(dr) |
| if err != nil { |
| return 0, errMalformedRowKey |
| } |
| |
| // There should be no more data. |
| if dr.Len() > 0 { |
| return 0, errMalformedRowKey |
| } |
| |
| return value, nil |
| } |