| /* |
| * Copyright 2017 Dgraph Labs, Inc. and Contributors |
| * |
| * Licensed under the Apache License, Version 2.0 (the "License"); |
| * you may not use this file except in compliance with the License. |
| * You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| package table |
| |
| import ( |
| "crypto/aes" |
| "encoding/binary" |
| "fmt" |
| "io" |
| "math" |
| "os" |
| "path" |
| "path/filepath" |
| "strconv" |
| "strings" |
| "sync" |
| "sync/atomic" |
| "unsafe" |
| |
| "github.com/golang/protobuf/proto" |
| "github.com/golang/snappy" |
| "github.com/pkg/errors" |
| |
| "github.com/dgraph-io/badger/v2/options" |
| "github.com/dgraph-io/badger/v2/pb" |
| "github.com/dgraph-io/badger/v2/y" |
| "github.com/dgraph-io/ristretto" |
| "github.com/dgraph-io/ristretto/z" |
| ) |
| |
const (
	// fileSuffix is the extension carried by every table file name.
	fileSuffix = ".sst"

	// intSize is the size, in bytes, of the platform's native int.
	intSize = int(unsafe.Sizeof(int(0)))
)
| |
// sizeOfOffsetStruct is the size, in bytes, budgeted for one pb.BlockOffset.
// It is counted in 8-byte words:
//   - 3 words for the key slice,
//   - 1 word shared by offset and len,
//   - 3 words for the XXX_unrecognized slice,
//   - 1 padding word, so the 7 words above round up to a full slab.
const sizeOfOffsetStruct int64 = (3 + 1 + 3 + 1) * 8
| |
| // Options contains configurable options for Table/Builder. |
| type Options struct { |
| // Options for Opening/Building Table. |
| |
| // Maximum size of the table. |
| TableSize uint64 |
| |
| // ChkMode is the checksum verification mode for Table. |
| ChkMode options.ChecksumVerificationMode |
| |
| // LoadingMode is the mode to be used for loading Table. |
| LoadingMode options.FileLoadingMode |
| |
| // Options for Table builder. |
| |
| // BloomFalsePositive is the false positive probabiltiy of bloom filter. |
| BloomFalsePositive float64 |
| |
| // BlockSize is the size of each block inside SSTable in bytes. |
| BlockSize int |
| |
| // DataKey is the key used to decrypt the encrypted text. |
| DataKey *pb.DataKey |
| |
| // Compression indicates the compression algorithm used for block compression. |
| Compression options.CompressionType |
| |
| BlockCache *ristretto.Cache |
| IndexCache *ristretto.Cache |
| |
| // ZSTDCompressionLevel is the ZSTD compression level used for compressing blocks. |
| ZSTDCompressionLevel int |
| |
| // When LoadBloomsOnOpen is set, bloom filters will be read only when they are accessed. |
| // Otherwise they will be loaded on table open. |
| LoadBloomsOnOpen bool |
| } |
| |
// TableInterface is the subset of Table behaviour exposed so that tests can
// substitute their own implementation.
type TableInterface interface {
	// Smallest returns the smallest key of the table.
	Smallest() []byte
	// Biggest returns the biggest key of the table.
	Biggest() []byte
	// DoesNotHave reports whether the table definitely lacks the key hash.
	DoesNotHave(hash uint64) bool
}
| |
| // cachedIndex represent a cached index. |
| type cachedIndex struct { |
| blockOffset []*pb.BlockOffset |
| bf *z.Bloom |
| } |
| |
| // Table represents a loaded table file with the info we have about it. |
| type Table struct { |
| sync.Mutex |
| |
| fd *os.File // Own fd. |
| tableSize int // Initialized in OpenTable, using fd.Stat(). |
| |
| index cachedIndex |
| ref int32 // For file garbage collection. Atomic. |
| |
| mmap []byte // Memory mapped. |
| |
| // The following are initialized once and const. |
| smallest, biggest []byte // Smallest and largest keys (with timestamps). |
| id uint64 // file id, part of filename |
| |
| Checksum []byte |
| // Stores the total size of key-values stored in this table (including the size on vlog). |
| estimatedSize uint64 |
| indexStart int |
| indexLen int |
| hasBloomFilter bool |
| |
| IsInmemory bool // Set to true if the table is on level 0 and opened in memory. |
| opt *Options |
| |
| noOfBlocks int // Total number of blocks. |
| } |
| |
| // CompressionType returns the compression algorithm used for block compression. |
| func (t *Table) CompressionType() options.CompressionType { |
| return t.opt.Compression |
| } |
| |
| // IncrRef increments the refcount (having to do with whether the file should be deleted) |
| func (t *Table) IncrRef() { |
| atomic.AddInt32(&t.ref, 1) |
| } |
| |
| // DecrRef decrements the refcount and possibly deletes the table |
| func (t *Table) DecrRef() error { |
| newRef := atomic.AddInt32(&t.ref, -1) |
| if newRef == 0 { |
| // We can safely delete this file, because for all the current files, we always have |
| // at least one reference pointing to them. |
| |
| // It's necessary to delete windows files. |
| if t.opt.LoadingMode == options.MemoryMap { |
| if err := y.Munmap(t.mmap); err != nil { |
| return err |
| } |
| t.mmap = nil |
| } |
| // fd can be nil if the table belongs to L0 and it is opened in memory. See |
| // OpenTableInMemory method. |
| if t.fd == nil { |
| return nil |
| } |
| if err := t.fd.Truncate(0); err != nil { |
| // This is very important to let the FS know that the file is deleted. |
| return err |
| } |
| filename := t.fd.Name() |
| if err := t.fd.Close(); err != nil { |
| return err |
| } |
| if err := os.Remove(filename); err != nil { |
| return err |
| } |
| // Delete all blocks from the cache. |
| for i := 0; i < t.noOfBlocks; i++ { |
| t.opt.BlockCache.Del(t.blockCacheKey(i)) |
| } |
| // Delete bloom filter from the cache. |
| t.opt.IndexCache.Del(t.ID()) |
| |
| } |
| return nil |
| } |
| |
| // BlockEvictHandler is used to reuse the byte slice stored in the block on cache eviction. |
| func BlockEvictHandler(value interface{}) { |
| if b, ok := value.(*block); ok { |
| b.decrRef() |
| } |
| } |
| |
// block is one decoded block of a table, together with the bookkeeping needed
// to search it and to share it through the block cache.
type block struct {
	// offset is the position of the block inside the table.
	offset int
	// data is the block payload: entries, entry offsets and entry count.
	data []byte
	// checksum is the serialized checksum stored at the end of the block.
	checksum []byte
	// entriesIndexStart is the start index of the entryOffsets list in data.
	entriesIndexStart int
	// entryOffsets is used to binary search an entry in the block.
	entryOffsets []uint32
	// chkLen is the checksum length.
	chkLen int
	// freeMe marks data as manually allocated, so it must be released with
	// z.Free once the last reference is dropped.
	freeMe bool
	// ref is the reference count; it must only be accessed atomically.
	ref int32
}
| |
// NumBlocks counts the blocks that are currently alive: it is incremented when
// a block is created by Table.block and decremented when a block's reference
// count drops to zero. It must only be accessed atomically.
var NumBlocks int32
| |
| // incrRef increments the ref of a block and return a bool indicating if the |
| // increment was successful. A true value indicates that the block can be used. |
| func (b *block) incrRef() bool { |
| for { |
| // We can't blindly add 1 to ref. We need to check whether it has |
| // reached zero first, because if it did, then we should absolutely not |
| // use this block. |
| ref := atomic.LoadInt32(&b.ref) |
| // The ref would not be equal to 0 unless the existing |
| // block get evicted before this line. If the ref is zero, it means that |
| // the block is already added the the blockPool and cannot be used |
| // anymore. The ref of a new block is 1 so the following condition will |
| // be true only if the block got reused before we could increment its |
| // ref. |
| if ref == 0 { |
| return false |
| } |
| // Increment the ref only if it is not zero and has not changed between |
| // the time we read it and we're updating it. |
| // |
| if atomic.CompareAndSwapInt32(&b.ref, ref, ref+1) { |
| return true |
| } |
| } |
| } |
| func (b *block) decrRef() { |
| if b == nil { |
| return |
| } |
| |
| // Insert the []byte into pool only if the block is resuable. When a block |
| // is reusable a new []byte is used for decompression and this []byte can |
| // be reused. |
| // In case of an uncompressed block, the []byte is a reference to the |
| // table.mmap []byte slice. Any attempt to write data to the mmap []byte |
| // will lead to SEGFAULT. |
| if atomic.AddInt32(&b.ref, -1) == 0 { |
| if b.freeMe { |
| z.Free(b.data) |
| } |
| atomic.AddInt32(&NumBlocks, -1) |
| // blockPool.Put(&b.data) |
| } |
| y.AssertTrue(atomic.LoadInt32(&b.ref) >= 0) |
| } |
| func (b *block) size() int64 { |
| return int64(3*intSize /* Size of the offset, entriesIndexStart and chkLen */ + |
| cap(b.data) + cap(b.checksum) + cap(b.entryOffsets)*4) |
| } |
| |
| func (b block) verifyCheckSum() error { |
| cs := &pb.Checksum{} |
| if err := proto.Unmarshal(b.checksum, cs); err != nil { |
| return y.Wrapf(err, "unable to unmarshal checksum for block") |
| } |
| return y.VerifyChecksum(b.data, cs) |
| } |
| |
| // OpenTable assumes file has only one table and opens it. Takes ownership of fd upon function |
| // entry. Returns a table with one reference count on it (decrementing which may delete the file! |
| // -- consider t.Close() instead). The fd has to writeable because we call Truncate on it before |
| // deleting. Checksum for all blocks of table is verified based on value of chkMode. |
| func OpenTable(fd *os.File, opts Options) (*Table, error) { |
| // BlockSize is used to compute the approximate size of the decompressed |
| // block. It should not be zero if the table is compressed. |
| if opts.BlockSize == 0 && opts.Compression != options.None { |
| return nil, errors.New("Block size cannot be zero") |
| } |
| fileInfo, err := fd.Stat() |
| if err != nil { |
| // It's OK to ignore fd.Close() errs in this function because we have only read |
| // from the file. |
| _ = fd.Close() |
| return nil, y.Wrap(err) |
| } |
| |
| filename := fileInfo.Name() |
| id, ok := ParseFileID(filename) |
| if !ok { |
| _ = fd.Close() |
| return nil, errors.Errorf("Invalid filename: %s", filename) |
| } |
| t := &Table{ |
| fd: fd, |
| ref: 1, // Caller is given one reference. |
| id: id, |
| opt: &opts, |
| IsInmemory: false, |
| } |
| |
| t.tableSize = int(fileInfo.Size()) |
| |
| switch opts.LoadingMode { |
| case options.LoadToRAM: |
| if _, err := t.fd.Seek(0, io.SeekStart); err != nil { |
| return nil, err |
| } |
| t.mmap = make([]byte, t.tableSize) |
| n, err := t.fd.Read(t.mmap) |
| if err != nil { |
| // It's OK to ignore fd.Close() error because we have only read from the file. |
| _ = t.fd.Close() |
| return nil, y.Wrapf(err, "Failed to load file into RAM") |
| } |
| if n != t.tableSize { |
| return nil, errors.Errorf("Failed to read all bytes from the file."+ |
| "Bytes in file: %d Bytes actually Read: %d", t.tableSize, n) |
| } |
| case options.MemoryMap: |
| t.mmap, err = y.Mmap(fd, false, fileInfo.Size()) |
| if err != nil { |
| _ = fd.Close() |
| return nil, y.Wrapf(err, "Unable to map file: %q", fileInfo.Name()) |
| } |
| case options.FileIO: |
| t.mmap = nil |
| default: |
| panic(fmt.Sprintf("Invalid loading mode: %v", opts.LoadingMode)) |
| } |
| |
| if err := t.initBiggestAndSmallest(); err != nil { |
| return nil, errors.Wrapf(err, "failed to initialize table") |
| } |
| |
| if opts.ChkMode == options.OnTableRead || opts.ChkMode == options.OnTableAndBlockRead { |
| if err := t.VerifyChecksum(); err != nil { |
| _ = fd.Close() |
| return nil, errors.Wrapf(err, "failed to verify checksum") |
| } |
| } |
| |
| return t, nil |
| } |
| |
| // OpenInMemoryTable is similar to OpenTable but it opens a new table from the provided data. |
| // OpenInMemoryTable is used for L0 tables. |
| func OpenInMemoryTable(data []byte, id uint64, opt *Options) (*Table, error) { |
| opt.LoadingMode = options.LoadToRAM |
| t := &Table{ |
| ref: 1, // Caller is given one reference. |
| opt: opt, |
| mmap: data, |
| tableSize: len(data), |
| IsInmemory: true, |
| id: id, // It is important that each table gets a unique ID. |
| } |
| |
| if err := t.initBiggestAndSmallest(); err != nil { |
| return nil, err |
| } |
| return t, nil |
| } |
| |
| func (t *Table) initBiggestAndSmallest() error { |
| var err error |
| var ko *pb.BlockOffset |
| if ko, err = t.initIndex(); err != nil { |
| return errors.Wrapf(err, "failed to read index.") |
| } |
| |
| t.smallest = ko.Key |
| |
| it2 := t.NewIterator(REVERSED | NOCACHE) |
| defer it2.Close() |
| it2.Rewind() |
| if !it2.Valid() { |
| return errors.Wrapf(it2.err, "failed to initialize biggest for table %s", t.Filename()) |
| } |
| t.biggest = it2.Key() |
| return nil |
| } |
| |
| // Close closes the open table. (Releases resources back to the OS.) |
| func (t *Table) Close() error { |
| if t.opt.LoadingMode == options.MemoryMap { |
| if err := y.Munmap(t.mmap); err != nil { |
| return err |
| } |
| t.mmap = nil |
| } |
| if t.fd == nil { |
| return nil |
| } |
| return t.fd.Close() |
| } |
| |
| func (t *Table) read(off, sz int) ([]byte, error) { |
| if len(t.mmap) > 0 { |
| if len(t.mmap[off:]) < sz { |
| return nil, y.ErrEOF |
| } |
| return t.mmap[off : off+sz], nil |
| } |
| |
| res := make([]byte, sz) |
| nbr, err := t.fd.ReadAt(res, int64(off)) |
| y.NumReads.Add(1) |
| y.NumBytesRead.Add(int64(nbr)) |
| return res, err |
| } |
| |
| func (t *Table) readNoFail(off, sz int) []byte { |
| res, err := t.read(off, sz) |
| y.Check(err) |
| return res |
| } |
| |
| // initIndex reads the index and populate the necessary table fields and returns |
| // first block offset |
| func (t *Table) initIndex() (*pb.BlockOffset, error) { |
| readPos := t.tableSize |
| |
| // Read checksum len from the last 4 bytes. |
| readPos -= 4 |
| buf := t.readNoFail(readPos, 4) |
| checksumLen := int(y.BytesToU32(buf)) |
| if checksumLen < 0 { |
| return nil, errors.New("checksum length less than zero. Data corrupted") |
| } |
| |
| // Read checksum. |
| expectedChk := &pb.Checksum{} |
| readPos -= checksumLen |
| buf = t.readNoFail(readPos, checksumLen) |
| if err := proto.Unmarshal(buf, expectedChk); err != nil { |
| return nil, err |
| } |
| |
| // Read index size from the footer. |
| readPos -= 4 |
| buf = t.readNoFail(readPos, 4) |
| t.indexLen = int(y.BytesToU32(buf)) |
| |
| // Read index. |
| readPos -= t.indexLen |
| t.indexStart = readPos |
| data := t.readNoFail(readPos, t.indexLen) |
| |
| if err := y.VerifyChecksum(data, expectedChk); err != nil { |
| return nil, y.Wrapf(err, "failed to verify checksum for table: %s", t.Filename()) |
| } |
| |
| index, err := t.readTableIndex() |
| if err != nil { |
| return nil, err |
| } |
| |
| t.estimatedSize = index.EstimatedSize |
| t.hasBloomFilter = len(index.BloomFilter) > 0 |
| t.noOfBlocks = len(index.Offsets) |
| |
| // No cache |
| if t.opt.IndexCache == nil { |
| // Keep blooms in memory. |
| if t.hasBloomFilter { |
| bf, err := z.JSONUnmarshal(index.BloomFilter) |
| y.Check(err) |
| |
| t.index.bf = bf |
| } |
| // Keep block offsets in memory. |
| t.index.blockOffset = index.Offsets |
| } |
| |
| // We don't need to put anything in the indexCache here. Table.Open will |
| // create an iterator and that iterator will push the indices in cache. |
| return index.Offsets[0], nil |
| } |
| |
| // blockOffsets returns block offsets of this table. |
| func (t *Table) blockOffsets() []*pb.BlockOffset { |
| if t.opt.IndexCache == nil { |
| return t.index.blockOffset |
| } |
| |
| if val, ok := t.opt.IndexCache.Get(t.ID()); ok && val != nil { |
| idx := val.(*cachedIndex) |
| return idx.blockOffset |
| } |
| |
| offsets, _ := t.readAndSetIndex() |
| return offsets |
| } |
| |
| // calculateOffsetsSize returns the size of *pb.BlockOffset array |
| func calculateOffsetsSize(offsets []*pb.BlockOffset) int64 { |
| totalSize := sizeOfOffsetStruct * int64(len(offsets)) |
| |
| for _, ko := range offsets { |
| // add key size. |
| totalSize += int64(cap(ko.Key)) |
| // add XXX_unrecognized size. |
| totalSize += int64(cap(ko.XXX_unrecognized)) |
| } |
| // Add three words for array size. |
| return totalSize + 3*8 |
| } |
| |
| // block function return a new block. Each block holds a ref and the byte |
| // slice stored in the block will be reused when the ref becomes zero. The |
| // caller should release the block by calling block.decrRef() on it. |
| func (t *Table) block(idx int, useCache bool) (*block, error) { |
| y.AssertTruef(idx >= 0, "idx=%d", idx) |
| if idx >= t.noOfBlocks { |
| return nil, errors.New("block out of index") |
| } |
| if t.opt.BlockCache != nil { |
| key := t.blockCacheKey(idx) |
| blk, ok := t.opt.BlockCache.Get(key) |
| if ok && blk != nil { |
| // Use the block only if the increment was successful. The block |
| // could get evicted from the cache between the Get() call and the |
| // incrRef() call. |
| if b := blk.(*block); b.incrRef() { |
| return b, nil |
| } |
| } |
| } |
| |
| // Read the block index if it's nil |
| ko := t.blockOffsets()[idx] |
| blk := &block{ |
| offset: int(ko.Offset), |
| ref: 1, |
| } |
| defer blk.decrRef() // Deal with any errors, where blk would not be returned. |
| atomic.AddInt32(&NumBlocks, 1) |
| |
| var err error |
| if blk.data, err = t.read(blk.offset, int(ko.Len)); err != nil { |
| return nil, errors.Wrapf(err, |
| "failed to read from file: %s at offset: %d, len: %d", t.fd.Name(), blk.offset, ko.Len) |
| } |
| |
| if t.shouldDecrypt() { |
| // Decrypt the block if it is encrypted. |
| if blk.data, err = t.decrypt(blk.data); err != nil { |
| return nil, err |
| } |
| } |
| |
| if err = t.decompress(blk); err != nil { |
| return nil, errors.Wrapf(err, |
| "failed to decode compressed data in file: %s at offset: %d, len: %d", |
| t.fd.Name(), blk.offset, ko.Len) |
| } |
| |
| // Read meta data related to block. |
| readPos := len(blk.data) - 4 // First read checksum length. |
| blk.chkLen = int(y.BytesToU32(blk.data[readPos : readPos+4])) |
| |
| // Checksum length greater than block size could happen if the table was compressed and |
| // it was opened with an incorrect compression algorithm (or the data was corrupted). |
| if blk.chkLen > len(blk.data) { |
| return nil, errors.New("invalid checksum length. Either the data is " + |
| "corrupted or the table options are incorrectly set") |
| } |
| |
| // Read checksum and store it |
| readPos -= blk.chkLen |
| blk.checksum = blk.data[readPos : readPos+blk.chkLen] |
| // Move back and read numEntries in the block. |
| readPos -= 4 |
| numEntries := int(y.BytesToU32(blk.data[readPos : readPos+4])) |
| entriesIndexStart := readPos - (numEntries * 4) |
| entriesIndexEnd := entriesIndexStart + numEntries*4 |
| |
| blk.entryOffsets = y.BytesToU32Slice(blk.data[entriesIndexStart:entriesIndexEnd]) |
| |
| blk.entriesIndexStart = entriesIndexStart |
| |
| // Drop checksum and checksum length. |
| // The checksum is calculated for actual data + entry index + index length |
| blk.data = blk.data[:readPos+4] |
| |
| // Verify checksum on if checksum verification mode is OnRead on OnStartAndRead. |
| if t.opt.ChkMode == options.OnBlockRead || t.opt.ChkMode == options.OnTableAndBlockRead { |
| if err = blk.verifyCheckSum(); err != nil { |
| return nil, err |
| } |
| } |
| |
| blk.incrRef() |
| if useCache && t.opt.BlockCache != nil { |
| key := t.blockCacheKey(idx) |
| // incrRef should never return false here because we're calling it on a |
| // new block with ref=1. |
| y.AssertTrue(blk.incrRef()) |
| |
| // Decrement the block ref if we could not insert it in the cache. |
| if !t.opt.BlockCache.Set(key, blk, blk.size()) { |
| blk.decrRef() |
| } |
| // We have added an OnReject func in our cache, which gets called in case the block is not |
| // admitted to the cache. So, every block would be accounted for. |
| } |
| return blk, nil |
| } |
| |
| func (t *Table) blockCacheKey(idx int) []byte { |
| y.AssertTrue(t.id < math.MaxUint32) |
| y.AssertTrue(uint32(idx) < math.MaxUint32) |
| |
| buf := make([]byte, 8) |
| // Assume t.ID does not overflow uint32. |
| binary.BigEndian.PutUint32(buf[:4], uint32(t.ID())) |
| binary.BigEndian.PutUint32(buf[4:], uint32(idx)) |
| return buf |
| } |
| |
| // EstimatedSize returns the total size of key-values stored in this table (including the |
| // disk space occupied on the value log). |
| func (t *Table) EstimatedSize() uint64 { return t.estimatedSize } |
| |
| // Size is its file size in bytes |
| func (t *Table) Size() int64 { return int64(t.tableSize) } |
| |
| // Smallest is its smallest key, or nil if there are none |
| func (t *Table) Smallest() []byte { return t.smallest } |
| |
| // Biggest is its biggest key, or nil if there are none |
| func (t *Table) Biggest() []byte { return t.biggest } |
| |
| // Filename is NOT the file name. Just kidding, it is. |
| func (t *Table) Filename() string { return t.fd.Name() } |
| |
| // ID is the table's ID number (used to make the file name). |
| func (t *Table) ID() uint64 { return t.id } |
| |
| // DoesNotHave returns true if and only if the table does not have the key hash. |
| // It does a bloom filter lookup. |
| func (t *Table) DoesNotHave(hash uint64) bool { |
| if !t.hasBloomFilter { |
| return false |
| } |
| |
| // Return fast if the cache is absent. |
| if t.opt.IndexCache == nil { |
| // TODO check why did we have a lock here. |
| return !t.index.bf.Has(hash) |
| } |
| |
| // Check if the index exists in the cache. |
| if idx, ok := t.opt.IndexCache.Get(t.ID()); idx != nil && ok { |
| index := idx.(*cachedIndex) |
| return !index.bf.Has(hash) |
| } |
| |
| // We couldn't find the index in cache. Read index and bloom filter and |
| // push both of them. |
| _, bf := t.readAndSetIndex() |
| return !bf.Has(hash) |
| } |
| |
| // readAndSetIndex reads table index from the sst and returns its pb format. |
| func (t *Table) readAndSetIndex() ([]*pb.BlockOffset, *z.Bloom) { |
| index, err := t.readTableIndex() |
| y.Check(err) |
| |
| bf, err := z.JSONUnmarshal(index.BloomFilter) |
| y.Check(err) |
| |
| t.opt.IndexCache.Set(t.ID(), &cachedIndex{ |
| blockOffset: index.Offsets, |
| bf: bf, |
| }, calculateOffsetsSize(index.Offsets)) |
| return index.Offsets, bf |
| } |
| |
| // readTableIndex reads table index from the sst and returns its pb format. |
| func (t *Table) readTableIndex() (*pb.TableIndex, error) { |
| data := t.readNoFail(t.indexStart, t.indexLen) |
| index := pb.TableIndex{} |
| var err error |
| // Decrypt the table index if it is encrypted. |
| if t.shouldDecrypt() { |
| if data, err = t.decrypt(data); err != nil { |
| return nil, y.Wrapf(err, |
| "Error while decrypting table index for the table %d in readTableIndex", t.id) |
| } |
| } |
| y.Check(proto.Unmarshal(data, &index)) |
| return &index, nil |
| } |
| |
| // VerifyChecksum verifies checksum for all blocks of table. This function is called by |
| // OpenTable() function. This function is also called inside levelsController.VerifyChecksum(). |
| func (t *Table) VerifyChecksum() error { |
| for i, os := range t.blockOffsets() { |
| b, err := t.block(i, true) |
| if err != nil { |
| return y.Wrapf(err, "checksum validation failed for table: %s, block: %d, offset:%d", |
| t.Filename(), i, os.Offset) |
| } |
| // We should not call incrRef here, because the block already has one ref when created. |
| defer b.decrRef() |
| // OnBlockRead or OnTableAndBlockRead, we don't need to call verify checksum |
| // on block, verification would be done while reading block itself. |
| if !(t.opt.ChkMode == options.OnBlockRead || t.opt.ChkMode == options.OnTableAndBlockRead) { |
| if err = b.verifyCheckSum(); err != nil { |
| return y.Wrapf(err, |
| "checksum validation failed for table: %s, block: %d, offset:%d", |
| t.Filename(), i, os.Offset) |
| } |
| } |
| } |
| return nil |
| } |
| |
| // shouldDecrypt tells whether to decrypt or not. We decrypt only if the datakey exist |
| // for the table. |
| func (t *Table) shouldDecrypt() bool { |
| return t.opt.DataKey != nil |
| } |
| |
| // KeyID returns data key id. |
| func (t *Table) KeyID() uint64 { |
| if t.opt.DataKey != nil { |
| return t.opt.DataKey.KeyId |
| } |
| // By default it's 0, if it is plain text. |
| return 0 |
| } |
| |
| // decrypt decrypts the given data. It should be called only after checking shouldDecrypt. |
| func (t *Table) decrypt(data []byte) ([]byte, error) { |
| // Last BlockSize bytes of the data is the IV. |
| iv := data[len(data)-aes.BlockSize:] |
| // Rest all bytes are data. |
| data = data[:len(data)-aes.BlockSize] |
| |
| // TODO: Check if this is done via Calloc. Otherwise, we'll have a memory leak. |
| dst := make([]byte, len(data)) |
| if err := y.XORBlock(dst, data, t.opt.DataKey.Data, iv); err != nil { |
| return nil, errors.Wrapf(err, "while decrypt") |
| } |
| return dst, nil |
| } |
| |
| // ParseFileID reads the file id out of a filename. |
| func ParseFileID(name string) (uint64, bool) { |
| name = path.Base(name) |
| if !strings.HasSuffix(name, fileSuffix) { |
| return 0, false |
| } |
| // suffix := name[len(fileSuffix):] |
| name = strings.TrimSuffix(name, fileSuffix) |
| id, err := strconv.Atoi(name) |
| if err != nil { |
| return 0, false |
| } |
| y.AssertTrue(id >= 0) |
| return uint64(id), true |
| } |
| |
| // IDToFilename does the inverse of ParseFileID |
| func IDToFilename(id uint64) string { |
| return fmt.Sprintf("%06d", id) + fileSuffix |
| } |
| |
| // NewFilename should be named TableFilepath -- it combines the dir with the ID to make a table |
| // filepath. |
| func NewFilename(id uint64, dir string) string { |
| return filepath.Join(dir, IDToFilename(id)) |
| } |
| |
| // decompress decompresses the data stored in a block. |
| func (t *Table) decompress(b *block) error { |
| var dst []byte |
| var err error |
| |
| switch t.opt.Compression { |
| case options.None: |
| // Nothing to be done here. |
| return nil |
| case options.Snappy: |
| if sz, err := snappy.DecodedLen(b.data); err == nil { |
| dst = z.Calloc(sz) |
| } else { |
| dst = z.Calloc(len(b.data) * 4) // Take a guess. |
| } |
| b.data, err = snappy.Decode(dst, b.data) |
| if err != nil { |
| z.Free(dst) |
| return errors.Wrap(err, "failed to decompress") |
| } |
| case options.ZSTD: |
| sz := int(float64(t.opt.BlockSize) * 1.2) |
| dst = z.Calloc(sz) |
| b.data, err = y.ZSTDDecompress(dst, b.data) |
| if err != nil { |
| z.Free(dst) |
| return errors.Wrap(err, "failed to decompress") |
| } |
| default: |
| return errors.New("Unsupported compression type") |
| } |
| |
| if len(b.data) > 0 && len(dst) > 0 && &dst[0] != &b.data[0] { |
| z.Free(dst) |
| } else { |
| b.freeMe = true |
| } |
| return nil |
| } |