/*
* Copyright 2020 Dgraph Labs, Inc. and Contributors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package badger
import (
"bufio"
"bytes"
"crypto/aes"
cryptorand "crypto/rand"
"encoding/binary"
"fmt"
"hash/crc32"
"io"
"io/ioutil"
"os"
"path/filepath"
"sort"
"strconv"
"strings"
"sync"
"sync/atomic"
"github.com/dgraph-io/badger/v3/pb"
"github.com/dgraph-io/badger/v3/skl"
"github.com/dgraph-io/badger/v3/y"
"github.com/dgraph-io/ristretto/z"
"github.com/pkg/errors"
)
// memTable structure stores a skiplist and a corresponding WAL. Writes to memTable are written
// both to the WAL and the skiplist. On a crash, the WAL is replayed to bring the skiplist back to
// its pre-crash form.
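//
// A rough sketch of the write path (illustrative only; key, val and db are assumed to exist,
// and error handling is omitted):
//
//	mt, _ := db.newMemTable()
//	_ = mt.Put(key, y.ValueStruct{Value: val}) // appended to the WAL, then inserted into the skiplist
//	_ = mt.SyncWAL()                           // fsync the WAL if durability is required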
type memTable struct {
// TODO: Give skiplist z.Calloc'd []byte.
sl *skl.Skiplist
wal *logFile
maxVersion uint64
opt Options
buf *bytes.Buffer
}
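// openMemTables opens every existing .mem file in opt.Dir in ascending fid order and adds the
// non-empty ones to db.imm. It also bumps db.nextMemFid past the highest fid found.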
func (db *DB) openMemTables(opt Options) error {
// We don't need to open any tables in in-memory mode.
if db.opt.InMemory {
return nil
}
files, err := ioutil.ReadDir(db.opt.Dir)
if err != nil {
return errFile(err, db.opt.Dir, "Unable to open mem dir.")
}
var fids []int
for _, file := range files {
if !strings.HasSuffix(file.Name(), memFileExt) {
continue
}
fsz := len(file.Name())
fid, err := strconv.ParseInt(file.Name()[:fsz-len(memFileExt)], 10, 64)
if err != nil {
return errFile(err, file.Name(), "Unable to parse log id.")
}
fids = append(fids, int(fid))
}
// Sort in ascending order.
sort.Slice(fids, func(i, j int) bool {
return fids[i] < fids[j]
})
for _, fid := range fids {
flags := os.O_RDWR
if db.opt.ReadOnly {
flags = os.O_RDONLY
}
mt, err := db.openMemTable(fid, flags)
if err != nil {
return y.Wrapf(err, "while opening fid: %d", fid)
}
// If this memtable is empty we don't need to add it. This is a
// memtable that was completely truncated.
if mt.sl.Empty() {
mt.DecrRef()
continue
}
// These should no longer be written to. So, make them part of the imm.
db.imm = append(db.imm, mt)
}
if len(fids) != 0 {
db.nextMemFid = fids[len(fids)-1]
}
db.nextMemFid++
return nil
}
const memFileExt string = ".mem"
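// openMemTable opens (or creates, depending on flags) the memtable with the given fid, setting up
// its skiplist and, unless running in in-memory mode, its WAL. It returns z.NewFile as the error
// when a brand new file was created; callers such as newMemTable rely on that sentinel.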
func (db *DB) openMemTable(fid, flags int) (*memTable, error) {
filepath := db.mtFilePath(fid)
s := skl.NewSkiplist(arenaSize(db.opt))
mt := &memTable{
sl: s,
opt: db.opt,
buf: &bytes.Buffer{},
}
// We don't need to create a WAL for the skiplist in in-memory mode, so return the mt here.
// Returning z.NewFile makes newMemTable treat this as a freshly created memtable.
if db.opt.InMemory {
return mt, z.NewFile
}
mt.wal = &logFile{
fid: uint32(fid),
path: filepath,
registry: db.registry,
writeAt: vlogHeaderSize,
opt: db.opt,
}
lerr := mt.wal.open(filepath, flags, 2*db.opt.MemTableSize)
if lerr != z.NewFile && lerr != nil {
return nil, y.Wrapf(lerr, "While opening memtable: %s", filepath)
}
// Have a callback set to delete WAL when skiplist reference count goes down to zero. That is,
// when it gets flushed to L0.
s.OnClose = func() {
if err := mt.wal.Delete(); err != nil {
db.opt.Errorf("while deleting file: %s, err: %v", filepath, err)
}
}
if lerr == z.NewFile {
return mt, lerr
}
err := mt.UpdateSkipList()
return mt, y.Wrapf(err, "while updating skiplist")
}
var errExpectingNewFile = errors.New("Expecting to create a new file, but found an existing file")
func (db *DB) newMemTable() (*memTable, error) {
mt, err := db.openMemTable(db.nextMemFid, os.O_CREATE|os.O_RDWR)
if err == z.NewFile {
db.nextMemFid++
return mt, nil
}
if err != nil {
db.opt.Errorf("Got error: %v for id: %d\n", err, db.nextMemFid)
return nil, y.Wrapf(err, "newMemTable")
}
return nil, errors.Errorf("File %s already exists", mt.wal.Fd.Name())
}
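// mtFilePath returns the path of the .mem file for the given fid, e.g. fid 7 maps to
// <Dir>/00007.mem.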
func (db *DB) mtFilePath(fid int) string {
return filepath.Join(db.opt.Dir, fmt.Sprintf("%05d%s", fid, memFileExt))
}
func (mt *memTable) SyncWAL() error {
return mt.wal.Sync()
}
func (mt *memTable) isFull() bool {
if mt.sl.MemSize() >= mt.opt.MemTableSize {
return true
}
if mt.opt.InMemory {
// InMemory mode doesn't have any WAL.
return false
}
return int64(mt.wal.writeAt) >= mt.opt.MemTableSize
}
func (mt *memTable) Put(key []byte, value y.ValueStruct) error {
entry := &Entry{
Key: key,
Value: value.Value,
UserMeta: value.UserMeta,
meta: value.Meta,
ExpiresAt: value.ExpiresAt,
}
// wal is nil only when badger is running in in-memory mode, in which case we don't need the wal.
if mt.wal != nil {
// If WAL exceeds opt.ValueLogFileSize, we'll force flush the memTable. See logic in
// ensureRoomForWrite.
if err := mt.wal.writeEntry(mt.buf, entry, mt.opt); err != nil {
return y.Wrapf(err, "cannot write entry to WAL file")
}
}
// We insert the finish marker in the WAL but not in the memtable.
if entry.meta&bitFinTxn > 0 {
return nil
}
// Write to skiplist and update maxVersion encountered.
mt.sl.Put(key, value)
if ts := y.ParseTs(entry.Key); ts > mt.maxVersion {
mt.maxVersion = ts
}
return nil
}
func (mt *memTable) UpdateSkipList() error {
if mt.wal == nil || mt.sl == nil {
return nil
}
endOff, err := mt.wal.iterate(true, 0, mt.replayFunction(mt.opt))
if err != nil {
return y.Wrapf(err, "while iterating wal: %s", mt.wal.Fd.Name())
}
if endOff < mt.wal.size && mt.opt.ReadOnly {
return y.Wrapf(ErrTruncateNeeded, "end offset: %d < size: %d", endOff, mt.wal.size)
}
return mt.wal.Truncate(int64(endOff))
}
// IncrRef increases the refcount
func (mt *memTable) IncrRef() {
mt.sl.IncrRef()
}
// DecrRef decrements the refcount, deallocating the Skiplist when done using it
func (mt *memTable) DecrRef() {
mt.sl.DecrRef()
}
func (mt *memTable) replayFunction(opt Options) func(Entry, valuePointer) error {
first := true
return func(e Entry, _ valuePointer) error { // Function for replaying.
if first {
opt.Debugf("First key=%q\n", e.Key)
}
first = false
if ts := y.ParseTs(e.Key); ts > mt.maxVersion {
mt.maxVersion = ts
}
v := y.ValueStruct{
Value: e.Value,
Meta: e.meta,
UserMeta: e.UserMeta,
ExpiresAt: e.ExpiresAt,
}
// This is already encoded correctly. Value would be either a vptr, or a full value
// depending upon how big the original value was. Skiplist makes a copy of the key and
// value.
mt.sl.Put(e.Key, v)
return nil
}
}
type logFile struct {
*z.MmapFile
path string
// This is a lock on the log file. It guards the fd’s value, the file’s
// existence and the file’s memory map.
//
// Use shared ownership when reading/writing the file or memory map, use
// exclusive ownership to open/close the descriptor, unmap or remove the file.
lock sync.RWMutex
fid uint32
size uint32
dataKey *pb.DataKey
baseIV []byte
registry *KeyRegistry
writeAt uint32
opt Options
}
func (lf *logFile) Truncate(end int64) error {
if fi, err := lf.Fd.Stat(); err != nil {
return fmt.Errorf("while file.stat on file: %s, error: %v\n", lf.Fd.Name(), err)
} else if fi.Size() == end {
return nil
}
y.AssertTrue(!lf.opt.ReadOnly)
lf.size = uint32(end)
return lf.MmapFile.Truncate(end)
}
// encodeEntry will encode the entry into buf.
// Layout of an entry:
// +--------+-----+-------+-------+
// | header | key | value | crc32 |
// +--------+-----+-------+-------+
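//
// The encoded header takes at most maxHeaderSize bytes. The trailing crc32 (Castagnoli) is always
// 4 bytes and is computed over the header and the key/value bytes exactly as written (i.e. over
// the ciphertext when encryption is enabled). Only the key and value are encrypted; the header
// and checksum stay in plain text.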
func (lf *logFile) encodeEntry(buf *bytes.Buffer, e *Entry, offset uint32) (int, error) {
h := header{
klen: uint32(len(e.Key)),
vlen: uint32(len(e.Value)),
expiresAt: e.ExpiresAt,
meta: e.meta,
userMeta: e.UserMeta,
}
hash := crc32.New(y.CastagnoliCrcTable)
writer := io.MultiWriter(buf, hash)
// encode header.
var headerEnc [maxHeaderSize]byte
sz := h.Encode(headerEnc[:])
y.Check2(writer.Write(headerEnc[:sz]))
// we'll encrypt only key and value.
if lf.encryptionEnabled() {
// TODO: There is no need to allocate eBuf. Since we use AES in CTR mode, we could encrypt the
// key and the value one after the other and the ordering wouldn't change. That needs some
// refactoring in XORBlock so that it works like a stream cipher.
eBuf := make([]byte, 0, len(e.Key)+len(e.Value))
eBuf = append(eBuf, e.Key...)
eBuf = append(eBuf, e.Value...)
if err := y.XORBlockStream(
writer, eBuf, lf.dataKey.Data, lf.generateIV(offset)); err != nil {
return 0, y.Wrapf(err, "Error while encoding entry for vlog.")
}
} else {
// Encryption is disabled, so write directly to the buffer.
y.Check2(writer.Write(e.Key))
y.Check2(writer.Write(e.Value))
}
// write crc32 hash.
var crcBuf [crc32.Size]byte
binary.BigEndian.PutUint32(crcBuf[:], hash.Sum32())
y.Check2(buf.Write(crcBuf[:]))
// return encoded length.
return len(headerEnc[:sz]) + len(e.Key) + len(e.Value) + len(crcBuf), nil
}
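// writeEntry encodes e into buf, copies the encoded bytes into the mmap at writeAt, advances
// writeAt, and then zeroes out the next header slot so that a crash mid-write leaves a detectable
// zero entry behind (see zeroNextEntry and iterate).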
func (lf *logFile) writeEntry(buf *bytes.Buffer, e *Entry, opt Options) error {
buf.Reset()
plen, err := lf.encodeEntry(buf, e, lf.writeAt)
if err != nil {
return err
}
y.AssertTrue(plen == copy(lf.Data[lf.writeAt:], buf.Bytes()))
lf.writeAt += uint32(plen)
lf.zeroNextEntry()
return nil
}
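// decodeEntry decodes a single entry previously written by encodeEntry, decrypting the key and
// value when a datakey is present. The returned Key and Value slices alias the decrypted buffer
// when encryption is on, and buf itself otherwise; they are not copies.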
func (lf *logFile) decodeEntry(buf []byte, offset uint32) (*Entry, error) {
var h header
hlen := h.Decode(buf)
kv := buf[hlen:]
if lf.encryptionEnabled() {
var err error
// No need to worry about the mmap, because XORBlockAllocate allocates a new byte array to do
// the XOR, so the given slice is not mutated.
if kv, err = lf.decryptKV(kv, offset); err != nil {
return nil, err
}
}
e := &Entry{
meta: h.meta,
UserMeta: h.userMeta,
ExpiresAt: h.expiresAt,
offset: offset,
Key: kv[:h.klen],
Value: kv[h.klen : h.klen+h.vlen],
}
return e, nil
}
func (lf *logFile) decryptKV(buf []byte, offset uint32) ([]byte, error) {
return y.XORBlockAllocate(buf, lf.dataKey.Data, lf.generateIV(offset))
}
// keyID returns the datakey's ID.
func (lf *logFile) keyID() uint64 {
if lf.dataKey == nil {
// If there is no datakey, return 0, which means no encryption.
return 0
}
return lf.dataKey.KeyId
}
func (lf *logFile) encryptionEnabled() bool {
return lf.dataKey != nil
}
// read returns the value bytes at pointer p, sliced directly out of the mmap. Acquire lf.lock
// (a read lock is enough) on the mmap/file before calling this.
func (lf *logFile) read(p valuePointer, s *y.Slice) (buf []byte, err error) {
var nbr int64
offset := p.Offset
// Do not convert size to uint32, because the lf.Data can be of size
// 4GB, which overflows the uint32 during conversion to make the size 0,
// causing the read to fail with ErrEOF. See issue #585.
size := int64(len(lf.Data))
valsz := p.Len
lfsz := atomic.LoadUint32(&lf.size)
if int64(offset) >= size || int64(offset+valsz) > size ||
// Ensure that the read is within the file's actual size. It might be possible that
// the offset+valsz length is beyond the file's actual size. This could happen when
// dropAll and iterations are running simultaneously.
int64(offset+valsz) > int64(lfsz) {
err = y.ErrEOF
} else {
buf = lf.Data[offset : offset+valsz]
nbr = int64(valsz)
}
y.NumReads.Add(1)
y.NumBytesRead.Add(nbr)
return buf, err
}
// generateIV generates the IV for the given offset by appending the 4-byte big-endian offset to
// the 12-byte base IV, yielding an aes.BlockSize-length IV that is unique per entry.
func (lf *logFile) generateIV(offset uint32) []byte {
iv := make([]byte, aes.BlockSize)
// baseIV is of 12 bytes.
y.AssertTrue(12 == copy(iv[:12], lf.baseIV))
// The remaining 4 bytes are obtained from the offset.
binary.BigEndian.PutUint32(iv[12:], offset)
return iv
}
func (lf *logFile) doneWriting(offset uint32) error {
if lf.opt.SyncWrites {
if err := lf.Sync(); err != nil {
return y.Wrapf(err, "Unable to sync value log: %q", lf.path)
}
}
// Earlier we acquired lf.lock here because we invalidated the file descriptor by reopening the
// file as read-only. Now we don't invalidate the fd, but we do unmap the file, truncate it and
// remap it. That creates a window in which a concurrent reader could hit a segfault because the
// mmap is momentarily invalid. Therefore, we still need the lock here.
lf.lock.Lock()
defer lf.lock.Unlock()
if err := lf.Truncate(int64(offset)); err != nil {
return y.Wrapf(err, "Unable to truncate file: %q", lf.path)
}
// Previously we used to close the file after it was written and reopen it in read-only mode.
// We no longer open files in read-only mode. We keep all vlog files open in read-write mode.
return nil
}
// iterate iterates over the log file. It does not allocate new memory for every kv pair.
// Therefore, the kv pair is only valid for the duration of the fn call.
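// iterate returns the offset just past the last complete transaction (or standalone entry) it
// read; callers such as UpdateSkipList use that offset to truncate the log.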
func (lf *logFile) iterate(readOnly bool, offset uint32, fn logEntry) (uint32, error) {
if offset == 0 {
// If offset is set to zero, let's advance past the encryption key header.
offset = vlogHeaderSize
}
// For now, read the log sequentially through a buffered reader starting at offset.
reader := bufio.NewReader(lf.NewReader(int(offset)))
read := &safeRead{
k: make([]byte, 10),
v: make([]byte, 10),
recordOffset: offset,
lf: lf,
}
var lastCommit uint64
var validEndOffset uint32 = offset
var entries []*Entry
var vptrs []valuePointer
loop:
for {
e, err := read.Entry(reader)
switch {
// The file is zero-padded past the last valid entry (see zeroNextEntry), so reaching EOF or
// reading a zero entry both mean we've hit the end of the valid data.
case err == io.EOF:
break loop
case err == io.ErrUnexpectedEOF || err == errTruncate:
break loop
case err != nil:
return 0, err
case e == nil:
continue
case e.isZero():
break loop
}
var vp valuePointer
vp.Len = uint32(int(e.hlen) + len(e.Key) + len(e.Value) + crc32.Size)
read.recordOffset += vp.Len
vp.Offset = e.offset
vp.Fid = lf.fid
switch {
case e.meta&bitTxn > 0:
txnTs := y.ParseTs(e.Key)
if lastCommit == 0 {
lastCommit = txnTs
}
if lastCommit != txnTs {
break loop
}
entries = append(entries, e)
vptrs = append(vptrs, vp)
case e.meta&bitFinTxn > 0:
txnTs, err := strconv.ParseUint(string(e.Value), 10, 64)
if err != nil || lastCommit != txnTs {
break loop
}
// Got the end of txn. Now we can store them.
lastCommit = 0
validEndOffset = read.recordOffset
for i, e := range entries {
vp := vptrs[i]
if err := fn(*e, vp); err != nil {
if err == errStop {
break
}
return 0, errFile(err, lf.path, "Iteration function")
}
}
entries = entries[:0]
vptrs = vptrs[:0]
default:
if lastCommit != 0 {
// This is most likely an entry which was moved as part of GC.
// We shouldn't get this entry in the middle of a transaction.
break loop
}
validEndOffset = read.recordOffset
if err := fn(*e, vp); err != nil {
if err == errStop {
break
}
return 0, errFile(err, lf.path, "Iteration function")
}
}
}
return validEndOffset, nil
}
// zeroNextEntry zeroes out the next maxHeaderSize bytes at writeAt, so that after a crash a
// partially written log ends in a zero header and iterate stops there (the e.isZero() case).
func (lf *logFile) zeroNextEntry() {
z.ZeroOut(lf.Data, int(lf.writeAt), int(lf.writeAt+maxHeaderSize))
}
func (lf *logFile) open(path string, flags int, fsize int64) error {
mf, ferr := z.OpenMmapFile(path, flags, int(fsize))
lf.MmapFile = mf
if ferr == z.NewFile {
if err := lf.bootstrap(); err != nil {
os.Remove(path)
return err
}
lf.size = vlogHeaderSize
} else if ferr != nil {
return y.Wrapf(ferr, "while opening file: %s", path)
}
lf.size = uint32(len(lf.Data))
if lf.size < vlogHeaderSize {
// Every vlog file should be at least vlogHeaderSize bytes. If it is smaller, it must have been
// corrupted. No need to handle that here: the log replayer will truncate and bootstrap the
// logfile. So we ignore it here.
return nil
}
// Copy over the encryption registry data.
buf := make([]byte, vlogHeaderSize)
y.AssertTruef(vlogHeaderSize == copy(buf, lf.Data),
"Unable to copy from %s, size %d", path, lf.size)
keyID := binary.BigEndian.Uint64(buf[:8])
// retrieve datakey.
if dk, err := lf.registry.DataKey(keyID); err != nil {
return y.Wrapf(err, "While opening vlog file %d", lf.fid)
} else {
lf.dataKey = dk
}
lf.baseIV = buf[8:]
y.AssertTrue(len(lf.baseIV) == 12)
// Return ferr (which may be z.NewFile) so the caller can tell whether this was a new file.
return ferr
}
// bootstrap will initialize the log file with the key id and baseIV.
// The figure below shows the layout of the log file.
// +----------------+------------------+------------------+
// | keyID(8 bytes) | baseIV(12 bytes) | entry...         |
// +----------------+------------------+------------------+
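//
// keyID and baseIV together occupy the first vlogHeaderSize (8 + 12 = 20) bytes of the file.
// Entries written after that are encrypted (when a datakey is present) with an IV derived from
// baseIV and the entry's offset; see generateIV.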
func (lf *logFile) bootstrap() error {
var err error
// generate data key for the log file.
var dk *pb.DataKey
if dk, err = lf.registry.LatestDataKey(); err != nil {
return y.Wrapf(err, "Error while retrieving datakey in logFile.bootstrap")
}
lf.dataKey = dk
// We'll always preserve vlogHeaderSize for key id and baseIV.
buf := make([]byte, vlogHeaderSize)
// Write the key id to buf. The key id will be zero if the logfile is in plain text.
binary.BigEndian.PutUint64(buf[:8], lf.keyID())
// Generate the base IV. It'll be used together with the offset of the vptr to encrypt the entry.
if _, err := cryptorand.Read(buf[8:]); err != nil {
return y.Wrapf(err, "Error while creating base IV, while creating logfile")
}
// Initialize base IV.
lf.baseIV = buf[8:]
y.AssertTrue(len(lf.baseIV) == 12)
// Copy over to the logFile.
y.AssertTrue(vlogHeaderSize == copy(lf.Data[0:], buf))
// Zero out the next entry.
lf.zeroNextEntry()
return nil
}