/*
* Copyright 2017 Dgraph Labs, Inc. and Contributors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package badger
import (
"bytes"
"context"
"encoding/binary"
"flag"
"fmt"
"io/ioutil"
"math"
"math/rand"
"os"
"path/filepath"
"runtime"
"sort"
"sync"
"testing"
"time"
"github.com/stretchr/testify/require"
"github.com/dgraph-io/badger/v3/options"
"github.com/dgraph-io/badger/v3/pb"
"github.com/dgraph-io/badger/v3/y"
"github.com/dgraph-io/ristretto/z"
)
// summary is produced when DB is closed. Currently it is used only for testing.
type summary struct {
fileIDs map[uint64]bool
}
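// getSummary collects the IDs of all tables across every level.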
func (s *levelsController) getSummary() *summary {
out := &summary{
fileIDs: make(map[uint64]bool),
}
for _, l := range s.levels {
l.getSummary(out)
}
return out
}
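// getSummary records the IDs of this level's tables into sum.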
func (s *levelHandler) getSummary(sum *summary) {
s.RLock()
defer s.RUnlock()
for _, t := range s.tables {
sum.fileIDs[t.ID()] = true
}
}
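// validate delegates to the levels controller to check LSM tree invariants.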
func (s *DB) validate() error { return s.lc.validate() }
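// getTestOptions returns DefaultOptions tuned for tests: tiny memtables and
// tables to force frequent compactions, async writes, and warning-level logs.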
func getTestOptions(dir string) Options {
opt := DefaultOptions(dir).
WithMemTableSize(1 << 15).
WithBaseTableSize(1 << 15). // Force more compaction.
WithBaseLevelSize(4 << 15). // Force more compaction.
WithSyncWrites(false).
WithLoggingLevel(WARNING)
return opt
}
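// getItemValue returns the item's value, cross-checking Value against ValueCopy.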
func getItemValue(t *testing.T, item *Item) (val []byte) {
t.Helper()
var v []byte
err := item.Value(func(val []byte) error {
v = append(v, val...)
return nil
})
if err != nil {
t.Error(err)
}
if v == nil {
return nil
}
another, err := item.ValueCopy(nil)
require.NoError(t, err)
require.Equal(t, v, another)
return v
}
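// txnSet writes a single key in its own read-write transaction and commits it.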
func txnSet(t *testing.T, kv *DB, key []byte, val []byte, meta byte) {
txn := kv.NewTransaction(true)
require.NoError(t, txn.SetEntry(NewEntry(key, val).WithMeta(meta)))
require.NoError(t, txn.Commit())
}
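// txnDelete removes a single key in its own read-write transaction and commits it.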
func txnDelete(t *testing.T, kv *DB, key []byte) {
txn := kv.NewTransaction(true)
require.NoError(t, txn.Delete(key))
require.NoError(t, txn.Commit())
}
// runBadgerTest opens a badger DB in a temporary directory and runs a test against it.
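//
// A typical call looks like this (a sketch; passing nil opts falls back to
// getTestOptions):
//
//	runBadgerTest(t, nil, func(t *testing.T, db *DB) {
//	    txnSet(t, db, []byte("k"), []byte("v"), 0x00)
//	})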
func runBadgerTest(t *testing.T, opts *Options, test func(t *testing.T, db *DB)) {
dir, err := ioutil.TempDir("", "badger-test")
require.NoError(t, err)
defer removeDir(dir)
if opts == nil {
opts = new(Options)
*opts = getTestOptions(dir)
} else {
opts.Dir = dir
opts.ValueDir = dir
}
if opts.InMemory {
opts.Dir = ""
opts.ValueDir = ""
}
db, err := Open(*opts)
require.NoError(t, err)
defer func() {
require.NoError(t, db.Close())
}()
test(t, db)
}
func TestWrite(t *testing.T) {
runBadgerTest(t, nil, func(t *testing.T, db *DB) {
for i := 0; i < 100; i++ {
txnSet(t, db, []byte(fmt.Sprintf("key%d", i)), []byte(fmt.Sprintf("val%d", i)), 0x00)
}
})
}
func TestUpdateAndView(t *testing.T) {
runBadgerTest(t, nil, func(t *testing.T, db *DB) {
err := db.Update(func(txn *Txn) error {
for i := 0; i < 10; i++ {
entry := NewEntry([]byte(fmt.Sprintf("key%d", i)), []byte(fmt.Sprintf("val%d", i)))
if err := txn.SetEntry(entry); err != nil {
return err
}
}
return nil
})
require.NoError(t, err)
err = db.View(func(txn *Txn) error {
for i := 0; i < 10; i++ {
item, err := txn.Get([]byte(fmt.Sprintf("key%d", i)))
if err != nil {
return err
}
expected := []byte(fmt.Sprintf("val%d", i))
if err := item.Value(func(val []byte) error {
require.Equal(t, expected, val,
"Invalid value for key %q. expected: %q, actual: %q",
item.Key(), expected, val)
return nil
}); err != nil {
return err
}
}
return nil
})
require.NoError(t, err)
})
}
func TestConcurrentWrite(t *testing.T) {
runBadgerTest(t, nil, func(t *testing.T, db *DB) {
// Not a benchmark. Just a simple test for concurrent writes.
n := 20
m := 500
var wg sync.WaitGroup
for i := 0; i < n; i++ {
wg.Add(1)
go func(i int) {
defer wg.Done()
for j := 0; j < m; j++ {
txnSet(t, db, []byte(fmt.Sprintf("k%05d_%08d", i, j)),
[]byte(fmt.Sprintf("v%05d_%08d", i, j)), byte(j%127))
}
}(i)
}
wg.Wait()
t.Log("Starting iteration")
opt := IteratorOptions{}
opt.Reverse = false
opt.PrefetchSize = 10
opt.PrefetchValues = true
txn := db.NewTransaction(true)
it := txn.NewIterator(opt)
defer it.Close()
var i, j int
for it.Rewind(); it.Valid(); it.Next() {
item := it.Item()
k := item.Key()
if k == nil {
break // end of iteration.
}
require.EqualValues(t, fmt.Sprintf("k%05d_%08d", i, j), string(k))
v := getItemValue(t, item)
require.EqualValues(t, fmt.Sprintf("v%05d_%08d", i, j), string(v))
require.Equal(t, item.UserMeta(), byte(j%127))
j++
if j == m {
i++
j = 0
}
}
require.EqualValues(t, n, i)
require.EqualValues(t, 0, j)
})
}
func TestGet(t *testing.T) {
test := func(t *testing.T, db *DB) {
txnSet(t, db, []byte("key1"), []byte("val1"), 0x08)
txn := db.NewTransaction(false)
item, err := txn.Get([]byte("key1"))
require.NoError(t, err)
require.EqualValues(t, "val1", getItemValue(t, item))
require.Equal(t, byte(0x08), item.UserMeta())
txn.Discard()
txnSet(t, db, []byte("key1"), []byte("val2"), 0x09)
txn = db.NewTransaction(false)
item, err = txn.Get([]byte("key1"))
require.NoError(t, err)
require.EqualValues(t, "val2", getItemValue(t, item))
require.Equal(t, byte(0x09), item.UserMeta())
txn.Discard()
txnDelete(t, db, []byte("key1"))
txn = db.NewTransaction(false)
_, err = txn.Get([]byte("key1"))
require.Equal(t, ErrKeyNotFound, err)
txn.Discard()
txnSet(t, db, []byte("key1"), []byte("val3"), 0x01)
txn = db.NewTransaction(false)
item, err = txn.Get([]byte("key1"))
require.NoError(t, err)
require.EqualValues(t, "val3", getItemValue(t, item))
require.Equal(t, byte(0x01), item.UserMeta())
longVal := make([]byte, 1000)
txnSet(t, db, []byte("key1"), longVal, 0x00)
txn = db.NewTransaction(false)
item, err = txn.Get([]byte("key1"))
require.NoError(t, err)
require.EqualValues(t, longVal, getItemValue(t, item))
txn.Discard()
}
t.Run("disk mode", func(t *testing.T) {
runBadgerTest(t, nil, func(t *testing.T, db *DB) {
test(t, db)
})
})
t.Run("InMemory mode", func(t *testing.T) {
opts := DefaultOptions("").WithInMemory(true)
db, err := Open(opts)
require.NoError(t, err)
test(t, db)
require.NoError(t, db.Close())
})
t.Run("cache enabled", func(t *testing.T) {
opts := DefaultOptions("").WithBlockCacheSize(10 << 20)
runBadgerTest(t, &opts, func(t *testing.T, db *DB) {
test(t, db)
})
})
}
func TestGetAfterDelete(t *testing.T) {
runBadgerTest(t, nil, func(t *testing.T, db *DB) {
// populate with one entry
key := []byte("key")
txnSet(t, db, key, []byte("val1"), 0x00)
require.NoError(t, db.Update(func(txn *Txn) error {
err := txn.Delete(key)
require.NoError(t, err)
_, err = txn.Get(key)
require.Equal(t, ErrKeyNotFound, err)
return nil
}))
})
}
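// TestTxnTooBig exercises the commit-and-retry pattern: when SetEntry or
// Delete returns ErrTxnTooBig, the current transaction is committed and a
// fresh one picks up where the last one left off.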
func TestTxnTooBig(t *testing.T) {
runBadgerTest(t, nil, func(t *testing.T, db *DB) {
data := func(i int) []byte {
return []byte(fmt.Sprintf("%b", i))
}
// n := 500000
n := 1000
txn := db.NewTransaction(true)
for i := 0; i < n; {
if err := txn.SetEntry(NewEntry(data(i), data(i))); err != nil {
require.NoError(t, txn.Commit())
txn = db.NewTransaction(true)
} else {
i++
}
}
require.NoError(t, txn.Commit())
txn = db.NewTransaction(true)
for i := 0; i < n; {
if err := txn.Delete(data(i)); err != nil {
require.NoError(t, txn.Commit())
txn = db.NewTransaction(true)
} else {
i++
}
}
require.NoError(t, txn.Commit())
})
}
func TestForceCompactL0(t *testing.T) {
dir, err := ioutil.TempDir("", "badger-test")
require.NoError(t, err)
defer removeDir(dir)
// This test relies on CompactL0OnClose
opts := getTestOptions(dir).WithCompactL0OnClose(true)
opts.ValueLogFileSize = 15 << 20
opts.managedTxns = true
db, err := Open(opts)
require.NoError(t, err)
data := func(i int) []byte {
return []byte(fmt.Sprintf("%b", i))
}
n := 80
m := 45 // Increasing would cause ErrTxnTooBig
sz := 32 << 10
v := make([]byte, sz)
for i := 0; i < n; i += 2 {
version := uint64(i)
txn := db.NewTransactionAt(version, true)
for j := 0; j < m; j++ {
require.NoError(t, txn.SetEntry(NewEntry(data(j), v)))
}
require.NoError(t, txn.CommitAt(version+1, nil))
}
db.Close()
opts.managedTxns = true
db, err = Open(opts)
require.NoError(t, err)
require.Equal(t, len(db.lc.levels[0].tables), 0)
require.NoError(t, db.Close())
}
func TestStreamDB(t *testing.T) {
check := func(db *DB) {
for i := 0; i < 100; i++ {
key := []byte(fmt.Sprintf("key%d", i))
val := []byte(fmt.Sprintf("val%d", i))
txn := db.NewTransactionAt(1, false)
item, err := txn.Get(key)
require.NoError(t, err)
require.EqualValues(t, val, getItemValue(t, item))
require.Equal(t, byte(0x00), item.UserMeta())
txn.Discard()
}
}
dir, err := ioutil.TempDir("", "badger-test")
require.NoError(t, err)
defer removeDir(dir)
opts := getTestOptions(dir).
WithCompression(options.ZSTD).
WithBlockCacheSize(100 << 20)
db, err := OpenManaged(opts)
require.NoError(t, err)
defer func() {
require.NoError(t, db.Close())
}()
writer := db.NewManagedWriteBatch()
for i := 0; i < 100; i++ {
key := []byte(fmt.Sprintf("key%d", i))
val := []byte(fmt.Sprintf("val%d", i))
require.NoError(t, writer.SetEntryAt(NewEntry(key, val).WithMeta(0x00), 1))
}
require.NoError(t, writer.Flush())
check(db)
outDir, err := ioutil.TempDir("", "badger-test")
require.NoError(t, err)
outOpt := getTestOptions(outDir)
require.NoError(t, db.StreamDB(outOpt))
outDB, err := OpenManaged(outOpt)
require.NoError(t, err)
defer func() {
require.NoError(t, outDB.Close())
}()
check(outDB)
}
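// dirSize returns the total size in MiB of all files under path, ignoring
// files that disappear while the walk is in progress.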
func dirSize(path string) (int64, error) {
var size int64
err := filepath.Walk(path, func(_ string, info os.FileInfo, err error) error {
if err != nil {
if os.IsNotExist(err) {
return nil
}
return err
}
if !info.IsDir() {
size += info.Size()
}
return err
})
return (size >> 20), err
}
// BenchmarkDbGrowth ensures DB does not grow with repeated adds and deletes.
//
// New keys are created with each for-loop iteration. During each
// iteration, the previous for-loop iteration's keys are deleted.
//
// To reproduce the continuous growth problem due to `badgerMove` keys,
// update `value.go` `discardEntry` line 1628 to return false.
//
// Also, with PR #1303, deleted keys are properly cleaned up, which
// further reduces disk size.
func BenchmarkDbGrowth(b *testing.B) {
dir, err := ioutil.TempDir("", "badger-test")
require.NoError(b, err)
defer removeDir(dir)
start := 0
lastStart := 0
numKeys := 2000
valueSize := 1024
value := make([]byte, valueSize)
discardRatio := 0.001
maxWrites := 200
opts := getTestOptions(dir)
opts.ValueLogFileSize = 64 << 15
opts.BaseTableSize = 4 << 15
opts.BaseLevelSize = 16 << 15
opts.NumVersionsToKeep = 1
opts.NumLevelZeroTables = 1
opts.NumLevelZeroTablesStall = 2
db, err := Open(opts)
require.NoError(b, err)
for numWrites := 0; numWrites < maxWrites; numWrites++ {
txn := db.NewTransaction(true)
if start > 0 {
for i := lastStart; i < start; i++ {
key := make([]byte, 8)
binary.BigEndian.PutUint64(key[:], uint64(i))
err := txn.Delete(key)
if err == ErrTxnTooBig {
require.NoError(b, txn.Commit())
txn = db.NewTransaction(true)
} else {
require.NoError(b, err)
}
}
}
for i := start; i < numKeys+start; i++ {
key := make([]byte, 8)
binary.BigEndian.PutUint64(key[:], uint64(i))
err := txn.SetEntry(NewEntry(key, value))
if err == ErrTxnTooBig {
require.NoError(b, txn.Commit())
txn = db.NewTransaction(true)
} else {
require.NoError(b, err)
}
}
require.NoError(b, txn.Commit())
require.NoError(b, db.Flatten(1))
for {
err = db.RunValueLogGC(discardRatio)
if err == ErrNoRewrite {
break
} else {
require.NoError(b, err)
}
}
size, err := dirSize(dir)
require.NoError(b, err)
fmt.Printf("Badger DB Size = %dMB\n", size)
lastStart = start
start += numKeys
}
db.Close()
size, err := dirSize(dir)
require.NoError(b, err)
require.LessOrEqual(b, size, int64(16))
fmt.Printf("Badger DB Size = %dMB\n", size)
}
// Put a lot of data to move some data to disk.
// WARNING: This test might take a while but it should pass!
func TestGetMore(t *testing.T) {
if !*manual {
t.Skip("Skipping test meant to be run manually.")
return
}
runBadgerTest(t, nil, func(t *testing.T, db *DB) {
data := func(i int) []byte {
return []byte(fmt.Sprintf("%b", i))
}
n := 200000
m := 45 // Increasing would cause ErrTxnTooBig
for i := 0; i < n; i += m {
if (i % 10000) == 0 {
fmt.Printf("Inserting i=%d\n", i)
}
txn := db.NewTransaction(true)
for j := i; j < i+m && j < n; j++ {
require.NoError(t, txn.SetEntry(NewEntry(data(j), data(j))))
}
require.NoError(t, txn.Commit())
}
require.NoError(t, db.validate())
for i := 0; i < n; i++ {
txn := db.NewTransaction(false)
item, err := txn.Get(data(i))
if err != nil {
t.Error(err)
}
require.EqualValues(t, string(data(i)), string(getItemValue(t, item)))
txn.Discard()
}
// Overwrite
for i := 0; i < n; i += m {
txn := db.NewTransaction(true)
for j := i; j < i+m && j < n; j++ {
require.NoError(t, txn.SetEntry(NewEntry(data(j),
// Use a long value that will certainly exceed value threshold.
[]byte(fmt.Sprintf("zzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzz%9d", j)))))
}
require.NoError(t, txn.Commit())
}
require.NoError(t, db.validate())
for i := 0; i < n; i++ {
expectedValue := fmt.Sprintf("zzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzz%9d", i)
k := data(i)
txn := db.NewTransaction(false)
item, err := txn.Get(k)
if err != nil {
t.Error(err)
}
got := string(getItemValue(t, item))
if expectedValue != got {
vs, err := db.get(y.KeyWithTs(k, math.MaxUint64))
require.NoError(t, err)
fmt.Printf("wanted=%q Item: %s\n", k, item)
fmt.Printf("on re-run, got version: %+v\n", vs)
txn := db.NewTransaction(false)
itr := txn.NewIterator(DefaultIteratorOptions)
for itr.Seek(k); itr.Valid(); itr.Next() {
item := itr.Item()
fmt.Printf("item=%s\n", item)
if !bytes.Equal(item.Key(), k) {
break
}
}
itr.Close()
txn.Discard()
}
require.EqualValues(t, expectedValue, string(getItemValue(t, item)), "wanted=%q Item: %s\n", k, item)
txn.Discard()
}
// "Delete" key.
for i := 0; i < n; i += m {
if (i % 10000) == 0 {
fmt.Printf("Deleting i=%d\n", i)
}
txn := db.NewTransaction(true)
for j := i; j < i+m && j < n; j++ {
require.NoError(t, txn.Delete(data(j)))
}
require.NoError(t, txn.Commit())
}
require.NoError(t, db.validate())
for i := 0; i < n; i++ {
if (i % 10000) == 0 {
// Display some progress. Right now, it's not very fast with no caching.
fmt.Printf("Testing i=%d\n", i)
}
k := data(i)
txn := db.NewTransaction(false)
_, err := txn.Get([]byte(k))
require.Equal(t, ErrKeyNotFound, err, "should not have found k: %q", k)
txn.Discard()
}
})
}
// Put a lot of data to move some data to disk.
// WARNING: This test might take a while but it should pass!
func TestExistsMore(t *testing.T) {
test := func(t *testing.T, db *DB) {
// n := 500000
n := 10000
m := 45
for i := 0; i < n; i += m {
if (i % 1000) == 0 {
t.Logf("Putting i=%d\n", i)
}
txn := db.NewTransaction(true)
for j := i; j < i+m && j < n; j++ {
require.NoError(t, txn.SetEntry(NewEntry([]byte(fmt.Sprintf("%09d", j)),
[]byte(fmt.Sprintf("%09d", j)))))
}
require.NoError(t, txn.Commit())
}
require.NoError(t, db.validate())
for i := 0; i < n; i++ {
if (i % 1000) == 0 {
fmt.Printf("Testing i=%d\n", i)
}
k := fmt.Sprintf("%09d", i)
require.NoError(t, db.View(func(txn *Txn) error {
_, err := txn.Get([]byte(k))
require.NoError(t, err)
return nil
}))
}
require.NoError(t, db.View(func(txn *Txn) error {
_, err := txn.Get([]byte("non-exists"))
require.Error(t, err)
return nil
}))
// "Delete" key.
for i := 0; i < n; i += m {
if (i % 1000) == 0 {
fmt.Printf("Deleting i=%d\n", i)
}
txn := db.NewTransaction(true)
for j := i; j < i+m && j < n; j++ {
require.NoError(t, txn.Delete([]byte(fmt.Sprintf("%09d", j))))
}
require.NoError(t, txn.Commit())
}
require.NoError(t, db.validate())
for i := 0; i < n; i++ {
if (i % 10000) == 0 {
// Display some progress. Right now, it's not very fast with no caching.
fmt.Printf("Testing i=%d\n", i)
}
k := fmt.Sprintf("%09d", i)
require.NoError(t, db.View(func(txn *Txn) error {
_, err := txn.Get([]byte(k))
require.Error(t, err)
return nil
}))
}
fmt.Println("Done and closing")
}
t.Run("disk mode", func(t *testing.T) {
runBadgerTest(t, nil, func(t *testing.T, db *DB) {
test(t, db)
})
})
t.Run("InMemory mode", func(t *testing.T) {
opt := DefaultOptions("").WithInMemory(true)
db, err := Open(opt)
require.NoError(t, err)
test(t, db)
require.NoError(t, db.Close())
})
}
func TestIterate2Basic(t *testing.T) {
test := func(t *testing.T, db *DB) {
bkey := func(i int) []byte {
return []byte(fmt.Sprintf("%09d", i))
}
bval := func(i int) []byte {
return []byte(fmt.Sprintf("%025d", i))
}
// n := 500000
n := 10000
for i := 0; i < n; i++ {
if (i % 1000) == 0 {
t.Logf("Put i=%d\n", i)
}
txnSet(t, db, bkey(i), bval(i), byte(i%127))
}
opt := IteratorOptions{}
opt.PrefetchValues = true
opt.PrefetchSize = 10
txn := db.NewTransaction(false)
it := txn.NewIterator(opt)
{
var count int
rewind := true
t.Log("Starting first basic iteration")
for it.Rewind(); it.Valid(); it.Next() {
item := it.Item()
key := item.Key()
if rewind && count == 5000 {
// Rewind would skip /head/ key, and it.Next() would skip 0.
count = 1
it.Rewind()
t.Log("Rewinding from 5000 to zero.")
rewind = false
continue
}
require.EqualValues(t, bkey(count), string(key))
val := getItemValue(t, item)
require.EqualValues(t, bval(count), string(val))
require.Equal(t, byte(count%127), item.UserMeta())
count++
}
require.EqualValues(t, n, count)
}
{
t.Log("Starting second basic iteration")
idx := 5030
for it.Seek(bkey(idx)); it.Valid(); it.Next() {
item := it.Item()
require.EqualValues(t, bkey(idx), string(item.Key()))
require.EqualValues(t, bval(idx), string(getItemValue(t, item)))
idx++
}
}
it.Close()
}
t.Run("disk mode", func(t *testing.T) {
runBadgerTest(t, nil, func(t *testing.T, db *DB) {
test(t, db)
})
})
t.Run("InMemory mode", func(t *testing.T) {
opt := DefaultOptions("").WithInMemory(true)
db, err := Open(opt)
require.NoError(t, err)
test(t, db)
require.NoError(t, db.Close())
})
}
func TestLoad(t *testing.T) {
testLoad := func(t *testing.T, opt Options) {
dir, err := ioutil.TempDir("", "badger-test")
require.NoError(t, err)
defer removeDir(dir)
opt.Dir = dir
opt.ValueDir = dir
n := 10000
{
kv, err := Open(opt)
require.NoError(t, err)
for i := 0; i < n; i++ {
if (i % 10000) == 0 {
fmt.Printf("Putting i=%d\n", i)
}
k := []byte(fmt.Sprintf("%09d", i))
txnSet(t, kv, k, k, 0x00)
}
require.Equal(t, 10000, int(kv.orc.readTs()))
kv.Close()
}
kv, err := Open(opt)
require.NoError(t, err)
require.Equal(t, 10000, int(kv.orc.readTs()))
for i := 0; i < n; i++ {
if (i % 10000) == 0 {
fmt.Printf("Testing i=%d\n", i)
}
k := fmt.Sprintf("%09d", i)
require.NoError(t, kv.View(func(txn *Txn) error {
item, err := txn.Get([]byte(k))
require.NoError(t, err)
require.EqualValues(t, k, string(getItemValue(t, item)))
return nil
}))
}
kv.Close()
summary := kv.lc.getSummary()
// Check that files are garbage collected.
idMap := getIDMap(dir)
for fileID := range idMap {
// Check that the ID is present in summary.fileIDs.
require.True(t, summary.fileIDs[fileID], "%d", fileID)
}
require.EqualValues(t, len(idMap), len(summary.fileIDs))
var fileIDs []uint64
for k := range summary.fileIDs { // Map to array.
fileIDs = append(fileIDs, k)
}
sort.Slice(fileIDs, func(i, j int) bool { return fileIDs[i] < fileIDs[j] })
fmt.Printf("FileIDs: %v\n", fileIDs)
}
t.Run("TestLoad Without Encryption/Compression", func(t *testing.T) {
opt := getTestOptions("")
opt.Compression = options.None
testLoad(t, opt)
})
t.Run("TestLoad With Encryption and no compression", func(t *testing.T) {
key := make([]byte, 32)
_, err := rand.Read(key)
require.NoError(t, err)
opt := getTestOptions("")
opt.EncryptionKey = key
opt.BlockCacheSize = 100 << 20
opt.IndexCacheSize = 100 << 20
opt.Compression = options.None
testLoad(t, opt)
})
t.Run("TestLoad With Encryption and compression", func(t *testing.T) {
key := make([]byte, 32)
_, err := rand.Read(key)
require.NoError(t, err)
opt := getTestOptions("")
opt.EncryptionKey = key
opt.Compression = options.ZSTD
opt.BlockCacheSize = 100 << 20
opt.IndexCacheSize = 100 << 20
testLoad(t, opt)
})
t.Run("TestLoad without Encryption and with compression", func(t *testing.T) {
opt := getTestOptions("")
opt.Compression = options.ZSTD
opt.BlockCacheSize = 100 << 20
testLoad(t, opt)
})
}
func TestIterateDeleted(t *testing.T) {
runBadgerTest(t, nil, func(t *testing.T, db *DB) {
txnSet(t, db, []byte("Key1"), []byte("Value1"), 0x00)
txnSet(t, db, []byte("Key2"), []byte("Value2"), 0x00)
iterOpt := DefaultIteratorOptions
iterOpt.PrefetchValues = false
txn := db.NewTransaction(false)
idxIt := txn.NewIterator(iterOpt)
defer idxIt.Close()
count := 0
txn2 := db.NewTransaction(true)
prefix := []byte("Key")
for idxIt.Seek(prefix); idxIt.ValidForPrefix(prefix); idxIt.Next() {
key := idxIt.Item().Key()
count++
newKey := make([]byte, len(key))
copy(newKey, key)
require.NoError(t, txn2.Delete(newKey))
}
require.Equal(t, 2, count)
require.NoError(t, txn2.Commit())
for _, prefetch := range [...]bool{true, false} {
t.Run(fmt.Sprintf("Prefetch=%t", prefetch), func(t *testing.T) {
txn := db.NewTransaction(false)
iterOpt = DefaultIteratorOptions
iterOpt.PrefetchValues = prefetch
idxIt = txn.NewIterator(iterOpt)
var estSize int64
var idxKeys []string
for idxIt.Seek(prefix); idxIt.Valid(); idxIt.Next() {
item := idxIt.Item()
key := item.Key()
estSize += item.EstimatedSize()
if !bytes.HasPrefix(key, prefix) {
break
}
idxKeys = append(idxKeys, string(key))
t.Logf("%+v\n", idxIt.Item())
}
require.Equal(t, 0, len(idxKeys))
require.Equal(t, int64(0), estSize)
})
}
})
}
func TestIterateParallel(t *testing.T) {
if !*manual {
t.Skip("Skipping test meant to be run manually.")
return
}
key := func(account int) []byte {
var b [4]byte
binary.BigEndian.PutUint32(b[:], uint32(account))
return append([]byte("account-"), b[:]...)
}
N := 100000
iterate := func(txn *Txn, wg *sync.WaitGroup) {
defer wg.Done()
itr := txn.NewIterator(DefaultIteratorOptions)
defer itr.Close()
var count int
for itr.Rewind(); itr.Valid(); itr.Next() {
count++
item := itr.Item()
require.Equal(t, "account-", string(item.Key()[0:8]))
err := item.Value(func(val []byte) error {
require.Equal(t, "1000", string(val))
return nil
})
require.NoError(t, err)
}
require.Equal(t, N, count)
itr.Close() // Double close.
}
opt := DefaultOptions("")
runBadgerTest(t, &opt, func(t *testing.T, db *DB) {
var wg sync.WaitGroup
var txns []*Txn
for i := 0; i < N; i++ {
wg.Add(1)
txn := db.NewTransaction(true)
require.NoError(t, txn.SetEntry(NewEntry(key(i), []byte("1000"))))
txns = append(txns, txn)
}
for _, txn := range txns {
txn.CommitWith(func(err error) {
y.Check(err)
wg.Done()
})
}
wg.Wait()
// Check that a RW txn can run multiple iterators.
txn := db.NewTransaction(true)
itr := txn.NewIterator(DefaultIteratorOptions)
require.NotPanics(t, func() {
// Now that multiple iterators are supported in read-write
// transactions, make sure this does not panic anymore. Then just
// close the iterator.
txn.NewIterator(DefaultIteratorOptions).Close()
})
// The transaction should still panic since there is still one pending
// iterator that is open.
require.Panics(t, txn.Discard)
itr.Close()
txn.Discard()
// (Regression) Make sure that creating multiple concurrent iterators
// within a read only transaction continues to work.
t.Run("multiple read-only iterators", func(t *testing.T) {
// Run multiple iterators for a RO txn.
txn = db.NewTransaction(false)
defer txn.Discard()
wg.Add(3)
go iterate(txn, &wg)
go iterate(txn, &wg)
go iterate(txn, &wg)
wg.Wait()
})
// Make sure that when we create multiple concurrent iterators within a
// read-write transaction that it actually iterates successfully.
t.Run("multiple read-write iterators", func(t *testing.T) {
// Run multiple iterators for a RW txn.
txn = db.NewTransaction(true)
defer txn.Discard()
wg.Add(3)
go iterate(txn, &wg)
go iterate(txn, &wg)
go iterate(txn, &wg)
wg.Wait()
})
})
}
func TestDeleteWithoutSyncWrite(t *testing.T) {
dir, err := ioutil.TempDir("", "badger-test")
require.NoError(t, err)
defer removeDir(dir)
kv, err := Open(DefaultOptions(dir))
if err != nil {
t.Error(err)
t.Fail()
}
key := []byte("k1")
// Set a value with size > value threshold so that it's written to the value log.
txnSet(t, kv, key, []byte("ABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789FOOBARZOGZOG"), 0x00)
txnDelete(t, kv, key)
kv.Close()
// Reopen KV
kv, err = Open(DefaultOptions(dir))
if err != nil {
t.Error(err)
t.Fail()
}
defer kv.Close()
require.NoError(t, kv.View(func(txn *Txn) error {
_, err := txn.Get(key)
require.Equal(t, ErrKeyNotFound, err)
return nil
}))
}
func TestPidFile(t *testing.T) {
runBadgerTest(t, nil, func(t *testing.T, db *DB) {
// Reopen database
_, err := Open(getTestOptions(db.opt.Dir))
require.Error(t, err)
require.Contains(t, err.Error(), "Another process is using this Badger database")
})
}
func TestInvalidKey(t *testing.T) {
runBadgerTest(t, nil, func(t *testing.T, db *DB) {
err := db.Update(func(txn *Txn) error {
err := txn.SetEntry(NewEntry([]byte("!badger!head"), nil))
require.Equal(t, ErrInvalidKey, err)
err = txn.SetEntry(NewEntry([]byte("!badger!"), nil))
require.Equal(t, ErrInvalidKey, err)
err = txn.SetEntry(NewEntry([]byte("!badger"), []byte("BadgerDB")))
require.NoError(t, err)
return err
})
require.NoError(t, err)
require.NoError(t, db.View(func(txn *Txn) error {
item, err := txn.Get([]byte("!badger"))
if err != nil {
return err
}
require.NoError(t, item.Value(func(val []byte) error {
require.Equal(t, []byte("BadgerDB"), val)
return nil
}))
return nil
}))
})
}
func TestIteratorPrefetchSize(t *testing.T) {
runBadgerTest(t, nil, func(t *testing.T, db *DB) {
bkey := func(i int) []byte {
return []byte(fmt.Sprintf("%09d", i))
}
bval := func(i int) []byte {
return []byte(fmt.Sprintf("%025d", i))
}
n := 100
for i := 0; i < n; i++ {
// if (i % 10) == 0 {
// t.Logf("Put i=%d\n", i)
// }
txnSet(t, db, bkey(i), bval(i), byte(i%127))
}
getIteratorCount := func(prefetchSize int) int {
opt := IteratorOptions{}
opt.PrefetchValues = true
opt.PrefetchSize = prefetchSize
var count int
txn := db.NewTransaction(false)
it := txn.NewIterator(opt)
{
t.Log("Starting first basic iteration")
for it.Rewind(); it.Valid(); it.Next() {
count++
}
require.EqualValues(t, n, count)
}
return count
}
var sizes = []int{-10, 0, 1, 10}
for _, size := range sizes {
c := getIteratorCount(size)
require.Equal(t, 100, c)
}
})
}
func TestSetIfAbsentAsync(t *testing.T) {
dir, err := ioutil.TempDir("", "badger-test")
require.NoError(t, err)
defer removeDir(dir)
kv, err := Open(getTestOptions(dir))
require.NoError(t, err)
bkey := func(i int) []byte {
return []byte(fmt.Sprintf("%09d", i))
}
f := func(err error) {}
n := 1000
for i := 0; i < n; i++ {
// if (i % 10) == 0 {
// t.Logf("Put i=%d\n", i)
// }
txn := kv.NewTransaction(true)
_, err = txn.Get(bkey(i))
require.Equal(t, ErrKeyNotFound, err)
require.NoError(t, txn.SetEntry(NewEntry(bkey(i), nil).WithMeta(byte(i%127))))
txn.CommitWith(f)
}
require.NoError(t, kv.Close())
kv, err = Open(getTestOptions(dir))
require.NoError(t, err)
opt := DefaultIteratorOptions
txn := kv.NewTransaction(false)
var count int
it := txn.NewIterator(opt)
{
t.Log("Starting first basic iteration")
for it.Rewind(); it.Valid(); it.Next() {
count++
}
require.EqualValues(t, n, count)
}
require.Equal(t, n, count)
require.NoError(t, kv.Close())
}
func TestGetSetRace(t *testing.T) {
runBadgerTest(t, nil, func(t *testing.T, db *DB) {
data := make([]byte, 4096)
_, err := rand.Read(data)
require.NoError(t, err)
var (
numOp = 100
wg sync.WaitGroup
keyCh = make(chan string)
)
// writer
wg.Add(1)
go func() {
defer func() {
wg.Done()
close(keyCh)
}()
for i := 0; i < numOp; i++ {
key := fmt.Sprintf("%d", i)
txnSet(t, db, []byte(key), data, 0x00)
keyCh <- key
}
}()
// reader
wg.Add(1)
go func() {
defer wg.Done()
for key := range keyCh {
require.NoError(t, db.View(func(txn *Txn) error {
item, err := txn.Get([]byte(key))
require.NoError(t, err)
err = item.Value(nil)
require.NoError(t, err)
return nil
}))
}
}()
wg.Wait()
})
}
func TestDiscardVersionsBelow(t *testing.T) {
runBadgerTest(t, nil, func(t *testing.T, db *DB) {
// Write 4 versions of the same key
for i := 0; i < 4; i++ {
err := db.Update(func(txn *Txn) error {
return txn.SetEntry(NewEntry([]byte("answer"), []byte(fmt.Sprintf("%d", i))))
})
require.NoError(t, err)
}
opts := DefaultIteratorOptions
opts.AllVersions = true
opts.PrefetchValues = false
// Verify that all 4 versions of the key are present.
db.View(func(txn *Txn) error {
it := txn.NewIterator(opts)
defer it.Close()
var count int
for it.Rewind(); it.Valid(); it.Next() {
count++
item := it.Item()
require.Equal(t, []byte("answer"), item.Key())
if item.DiscardEarlierVersions() {
break
}
}
require.Equal(t, 4, count)
return nil
})
// Set new version and discard older ones.
err := db.Update(func(txn *Txn) error {
return txn.SetEntry(NewEntry([]byte("answer"), []byte("5")).WithDiscard())
})
require.NoError(t, err)
// Verify that only the latest version is seen, since it carries the
// discard-earlier-versions flag and iteration stops there.
db.View(func(txn *Txn) error {
it := txn.NewIterator(opts)
defer it.Close()
var count int
for it.Rewind(); it.Valid(); it.Next() {
count++
item := it.Item()
require.Equal(t, []byte("answer"), item.Key())
if item.DiscardEarlierVersions() {
break
}
}
require.Equal(t, 1, count)
return nil
})
})
}
func TestExpiry(t *testing.T) {
runBadgerTest(t, nil, func(t *testing.T, db *DB) {
// Write two keys, one with a TTL
err := db.Update(func(txn *Txn) error {
return txn.SetEntry(NewEntry([]byte("answer1"), []byte("42")))
})
require.NoError(t, err)
err = db.Update(func(txn *Txn) error {
return txn.SetEntry(NewEntry([]byte("answer2"), []byte("43")).WithTTL(1 * time.Second))
})
require.NoError(t, err)
time.Sleep(2 * time.Second)
// Verify that only unexpired key is found during iteration
err = db.View(func(txn *Txn) error {
_, err := txn.Get([]byte("answer1"))
require.NoError(t, err)
_, err = txn.Get([]byte("answer2"))
require.Equal(t, ErrKeyNotFound, err)
return nil
})
require.NoError(t, err)
// Verify that only one key is found during iteration
opts := DefaultIteratorOptions
opts.PrefetchValues = false
err = db.View(func(txn *Txn) error {
it := txn.NewIterator(opts)
defer it.Close()
var count int
for it.Rewind(); it.Valid(); it.Next() {
count++
item := it.Item()
require.Equal(t, []byte("answer1"), item.Key())
}
require.Equal(t, 1, count)
return nil
})
require.NoError(t, err)
})
}
func TestExpiryImproperDBClose(t *testing.T) {
testReplay := func(opt Options) {
// L0 compaction doesn't affect the test in any way. It is set to allow
// graceful shutdown of db0.
db0, err := Open(opt.WithCompactL0OnClose(false))
require.NoError(t, err)
dur := 1 * time.Hour
expiryTime := uint64(time.Now().Add(dur).Unix())
err = db0.Update(func(txn *Txn) error {
err = txn.SetEntry(NewEntry([]byte("test_key"), []byte("test_value")).WithTTL(dur))
require.NoError(t, err)
return nil
})
require.NoError(t, err)
// Simulate a crash by not closing db0, but releasing the locks.
if db0.dirLockGuard != nil {
require.NoError(t, db0.dirLockGuard.release())
db0.dirLockGuard = nil
}
if db0.valueDirGuard != nil {
require.NoError(t, db0.valueDirGuard.release())
db0.valueDirGuard = nil
}
require.NoError(t, db0.Close())
db1, err := Open(opt)
require.NoError(t, err)
err = db1.View(func(txn *Txn) error {
itm, err := txn.Get([]byte("test_key"))
require.NoError(t, err)
require.True(t, expiryTime <= itm.ExpiresAt() && itm.ExpiresAt() <= uint64(time.Now().Add(dur).Unix()),
"expiry time of entry is invalid")
return nil
})
require.NoError(t, err)
require.NoError(t, db1.Close())
}
t.Run("Test plain text", func(t *testing.T) {
dir, err := ioutil.TempDir("", "badger-test")
require.NoError(t, err)
defer removeDir(dir)
opt := getTestOptions(dir)
testReplay(opt)
})
t.Run("Test encryption", func(t *testing.T) {
dir, err := ioutil.TempDir("", "badger-test")
require.NoError(t, err)
defer removeDir(dir)
opt := getTestOptions(dir)
key := make([]byte, 32)
_, err = rand.Read(key)
require.NoError(t, err)
opt.EncryptionKey = key
opt.BlockCacheSize = 10 << 20
opt.IndexCacheSize = 10 << 20
testReplay(opt)
})
}
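// randBytes returns a buffer of n random bytes, panicking if the read fails.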
func randBytes(n int) []byte {
recv := make([]byte, n)
in, err := rand.Read(recv)
if err != nil {
panic(err)
}
return recv[:in]
}
var benchmarkData = []struct {
key, value []byte
success bool // represent if KV should be inserted successfully or not
}{
{randBytes(100), nil, true},
{randBytes(1000), []byte("foo"), true},
{[]byte("foo"), randBytes(1000), true},
{[]byte(""), randBytes(1000), false},
{nil, randBytes(1000000), false},
{randBytes(100000), nil, false},
{randBytes(1000000), nil, false},
}
func TestLargeKeys(t *testing.T) {
test := func(t *testing.T, opt Options) {
db, err := Open(opt)
require.NoError(t, err)
for i := 0; i < 1000; i++ {
tx := db.NewTransaction(true)
for _, kv := range benchmarkData {
k := make([]byte, len(kv.key))
copy(k, kv.key)
v := make([]byte, len(kv.value))
copy(v, kv.value)
if err := tx.SetEntry(NewEntry(k, v)); err != nil {
// SetEntry failed; this is expected only when success is false.
if kv.success {
t.Fatalf("failed with: %s", err)
}
} else if !kv.success {
t.Fatal("insertion should fail")
}
}
if err := tx.Commit(); err != nil {
t.Fatalf("#%d: batchSet err: %v", i, err)
}
}
require.NoError(t, db.Close())
}
t.Run("disk mode", func(t *testing.T) {
dir, err := ioutil.TempDir("", "badger-test")
require.NoError(t, err)
defer removeDir(dir)
opt := DefaultOptions(dir).WithValueLogFileSize(1024 * 1024 * 1024)
test(t, opt)
})
t.Run("InMemory mode", func(t *testing.T) {
opt := DefaultOptions("").WithValueLogFileSize(1024 * 1024 * 1024)
opt.InMemory = true
test(t, opt)
})
}
func TestCreateDirs(t *testing.T) {
dir, err := ioutil.TempDir("", "parent")
require.NoError(t, err)
defer removeDir(dir)
db, err := Open(DefaultOptions(filepath.Join(dir, "badger")))
require.NoError(t, err)
require.NoError(t, db.Close())
_, err = os.Stat(dir)
require.NoError(t, err)
}
func TestGetSetDeadlock(t *testing.T) {
dir, err := ioutil.TempDir("", "badger-test")
fmt.Println(dir)
require.NoError(t, err)
defer removeDir(dir)
db, err := Open(DefaultOptions(dir).WithValueLogFileSize(1 << 20))
require.NoError(t, err)
defer db.Close()
val := make([]byte, 1<<19)
key := []byte("key1")
require.NoError(t, db.Update(func(txn *Txn) error {
rand.Read(val)
require.NoError(t, txn.SetEntry(NewEntry(key, val)))
return nil
}))
timeout, done := time.After(10*time.Second), make(chan bool)
go func() {
db.Update(func(txn *Txn) error {
item, err := txn.Get(key)
require.NoError(t, err)
err = item.Value(nil) // This takes an RLock on the value log file.
require.NoError(t, err)
rand.Read(val)
require.NoError(t, txn.SetEntry(NewEntry(key, val)))
require.NoError(t, txn.SetEntry(NewEntry([]byte("key2"), val)))
return nil
})
done <- true
}()
select {
case <-timeout:
t.Fatal("db.Update did not finish within 10s, assuming deadlock.")
case <-done:
t.Log("db.Update finished.")
}
}
func TestWriteDeadlock(t *testing.T) {
dir, err := ioutil.TempDir("", "badger-test")
require.NoError(t, err)
defer removeDir(dir)
db, err := Open(DefaultOptions(dir).WithValueLogFileSize(10 << 20))
require.NoError(t, err)
defer db.Close()
print := func(count *int) {
*count++
if *count%100 == 0 {
fmt.Printf("%05d\r", *count)
}
}
var count int
val := make([]byte, 10000)
require.NoError(t, db.Update(func(txn *Txn) error {
for i := 0; i < 1500; i++ {
key := fmt.Sprintf("%d", i)
rand.Read(val)
require.NoError(t, txn.SetEntry(NewEntry([]byte(key), val)))
print(&count)
}
return nil
}))
count = 0
fmt.Println("\nWrites done. Iteration and updates starting...")
err = db.Update(func(txn *Txn) error {
opt := DefaultIteratorOptions
opt.PrefetchValues = false
it := txn.NewIterator(opt)
defer it.Close()
for it.Rewind(); it.Valid(); it.Next() {
item := it.Item()
// Using Value() would cause deadlock.
// item.Value()
out, err := item.ValueCopy(nil)
require.NoError(t, err)
require.Equal(t, len(val), len(out))
key := y.Copy(item.Key())
rand.Read(val)
require.NoError(t, txn.SetEntry(NewEntry(key, val)))
print(&count)
}
return nil
})
require.NoError(t, err)
}
func TestSequence(t *testing.T) {
key0 := []byte("seq0")
key1 := []byte("seq1")
runBadgerTest(t, nil, func(t *testing.T, db *DB) {
seq0, err := db.GetSequence(key0, 10)
require.NoError(t, err)
seq1, err := db.GetSequence(key1, 100)
require.NoError(t, err)
for i := uint64(0); i < uint64(105); i++ {
num, err := seq0.Next()
require.NoError(t, err)
require.Equal(t, i, num)
num, err = seq1.Next()
require.NoError(t, err)
require.Equal(t, i, num)
}
err = db.View(func(txn *Txn) error {
item, err := txn.Get(key0)
if err != nil {
return err
}
var num0 uint64
if err := item.Value(func(val []byte) error {
num0 = binary.BigEndian.Uint64(val)
return nil
}); err != nil {
return err
}
require.Equal(t, uint64(110), num0)
item, err = txn.Get(key1)
if err != nil {
return err
}
var num1 uint64
if err := item.Value(func(val []byte) error {
num1 = binary.BigEndian.Uint64(val)
return nil
}); err != nil {
return err
}
require.Equal(t, uint64(200), num1)
return nil
})
require.NoError(t, err)
})
}
func TestSequence_Release(t *testing.T) {
runBadgerTest(t, nil, func(t *testing.T, db *DB) {
// get sequence, use once and release
key := []byte("key")
seq, err := db.GetSequence(key, 1000)
require.NoError(t, err)
num, err := seq.Next()
require.NoError(t, err)
require.Equal(t, uint64(0), num)
require.NoError(t, seq.Release())
// We used up 0, so 1 should be stored now.
err = db.View(func(txn *Txn) error {
item, err := txn.Get(key)
if err != nil {
return err
}
val, err := item.ValueCopy(nil)
if err != nil {
return err
}
require.Equal(t, num+1, binary.BigEndian.Uint64(val))
return nil
})
require.NoError(t, err)
// using it again will lease 1+1000
num, err = seq.Next()
require.NoError(t, err)
require.Equal(t, uint64(1), num)
err = db.View(func(txn *Txn) error {
item, err := txn.Get(key)
if err != nil {
return err
}
val, err := item.ValueCopy(nil)
if err != nil {
return err
}
require.Equal(t, uint64(1001), binary.BigEndian.Uint64(val))
return nil
})
require.NoError(t, err)
})
}
func TestSequence2(t *testing.T) {
runBadgerTest(t, nil, func(t *testing.T, db *DB) {
key := []byte("key")
seq1, err := db.GetSequence(key, 2)
require.NoError(t, err)
seq2, err := db.GetSequence(key, 2)
require.NoError(t, err)
num, err := seq2.Next()
require.NoError(t, err)
require.Equal(t, uint64(2), num)
require.NoError(t, seq2.Release())
require.NoError(t, seq1.Release())
seq3, err := db.GetSequence(key, 2)
require.NoError(t, err)
for i := 0; i < 5; i++ {
num2, err := seq3.Next()
require.NoError(t, err)
require.Equal(t, uint64(i)+3, num2)
}
require.NoError(t, seq3.Release())
})
}
func TestReadOnly(t *testing.T) {
t.Skipf("TODO: ReadOnly needs truncation, so this fails")
dir, err := ioutil.TempDir("", "badger-test")
require.NoError(t, err)
defer removeDir(dir)
opts := getTestOptions(dir)
// Create the DB
db, err := Open(opts)
require.NoError(t, err)
for i := 0; i < 10000; i++ {
txnSet(t, db, []byte(fmt.Sprintf("key%d", i)), []byte(fmt.Sprintf("value%d", i)), 0x00)
}
// Attempt a read-only open while it's open read-write.
opts.ReadOnly = true
_, err = Open(opts)
require.Error(t, err)
if err == ErrWindowsNotSupported {
require.NoError(t, db.Close())
return
}
require.Contains(t, err.Error(), "Another process is using this Badger database")
db.Close()
// Open one read-only
opts.ReadOnly = true
kv1, err := Open(opts)
require.NoError(t, err)
defer kv1.Close()
// Open another read-only
kv2, err := Open(opts)
require.NoError(t, err)
defer kv2.Close()
// Attempt a read-write open while it's open for read-only
opts.ReadOnly = false
_, err = Open(opts)
require.Error(t, err)
require.Contains(t, err.Error(), "Another process is using this Badger database")
// Get a thing from the DB
txn1 := kv1.NewTransaction(true)
v1, err := txn1.Get([]byte("key1"))
require.NoError(t, err)
b1, err := v1.ValueCopy(nil)
require.NoError(t, err)
require.Equal(t, b1, []byte("value1"))
err = txn1.Commit()
require.NoError(t, err)
// Get a thing from the DB via the other connection
txn2 := kv2.NewTransaction(true)
v2, err := txn2.Get([]byte("key2000"))
require.NoError(t, err)
b2, err := v2.ValueCopy(nil)
require.NoError(t, err)
require.Equal(t, b2, []byte("value2000"))
err = txn2.Commit()
require.NoError(t, err)
// Attempt to set a value on a read-only connection
txn := kv1.NewTransaction(true)
err = txn.SetEntry(NewEntry([]byte("key"), []byte("value")))
require.Error(t, err)
require.Contains(t, err.Error(), "No sets or deletes are allowed in a read-only transaction")
err = txn.Commit()
require.NoError(t, err)
}
func TestLSMOnly(t *testing.T) {
dir, err := ioutil.TempDir("", "badger-test")
require.NoError(t, err)
defer removeDir(dir)
opts := LSMOnlyOptions(dir)
dopts := DefaultOptions(dir)
require.NotEqual(t, dopts.ValueThreshold, opts.ValueThreshold)
dopts.ValueThreshold = 1 << 21
_, err = Open(dopts)
require.Contains(t, err.Error(), "Invalid ValueThreshold")
// Also test for an error when ValueThreshold is greater than maxBatchSize.
dopts.ValueThreshold = LSMOnlyOptions(dir).ValueThreshold
// maxBatchSize is calculated from MemTableSize.
dopts.MemTableSize = int64(LSMOnlyOptions(dir).ValueThreshold)
_, err = Open(dopts)
require.Error(t, err, "db creation should have been failed")
require.Contains(t, err.Error(),
fmt.Sprintf("Valuethreshold %d greater than max batch size", dopts.ValueThreshold))
opts.ValueLogMaxEntries = 100
db, err := Open(opts)
require.NoError(t, err)
value := make([]byte, 128)
_, err = rand.Read(value)
for i := 0; i < 500; i++ {
require.NoError(t, err)
txnSet(t, db, []byte(fmt.Sprintf("key%d", i)), value, 0x00)
}
require.NoError(t, db.Close())
}
// This test function is doing some intricate sorcery.
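// It tracks how the read watermark (db.orc.readMark.DoneUntil) advances as
// read transactions are opened and discarded.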
func TestMinReadTs(t *testing.T) {
runBadgerTest(t, nil, func(t *testing.T, db *DB) {
for i := 0; i < 10; i++ {
require.NoError(t, db.Update(func(txn *Txn) error {
return txn.SetEntry(NewEntry([]byte("x"), []byte("y")))
}))
}
time.Sleep(time.Millisecond)
readTxn0 := db.NewTransaction(false)
require.Equal(t, uint64(10), readTxn0.readTs)
min := db.orc.readMark.DoneUntil()
require.Equal(t, uint64(9), min)
readTxn := db.NewTransaction(false)
for i := 0; i < 10; i++ {
require.NoError(t, db.Update(func(txn *Txn) error {
return txn.SetEntry(NewEntry([]byte("x"), []byte("y")))
}))
}
require.Equal(t, uint64(20), db.orc.readTs())
time.Sleep(time.Millisecond)
require.Equal(t, min, db.orc.readMark.DoneUntil())
readTxn0.Discard()
readTxn.Discard()
time.Sleep(time.Millisecond)
require.Equal(t, uint64(19), db.orc.readMark.DoneUntil())
db.orc.readMark.Done(uint64(20)) // Because we called readTs.
for i := 0; i < 10; i++ {
db.View(func(txn *Txn) error {
return nil
})
}
time.Sleep(time.Millisecond)
require.Equal(t, uint64(20), db.orc.readMark.DoneUntil())
})
}
func TestGoroutineLeak(t *testing.T) {
if !*manual {
t.Skip("Skipping test meant to be run manually.")
return
}
test := func(t *testing.T, opt *Options) {
time.Sleep(1 * time.Second)
before := runtime.NumGoroutine()
t.Logf("Num go: %d", before)
for i := 0; i < 12; i++ {
runBadgerTest(t, nil, func(t *testing.T, db *DB) {
updated := false
ctx, cancel := context.WithCancel(context.Background())
var wg sync.WaitGroup
wg.Add(1)
go func() {
err := db.Subscribe(ctx, func(kvs *pb.KVList) error {
require.Equal(t, []byte("value"), kvs.Kv[0].GetValue())
updated = true
wg.Done()
return nil
}, []byte("key"))
if err != nil {
require.Equal(t, err.Error(), context.Canceled.Error())
}
}()
// Wait for the goroutine to be scheduled.
time.Sleep(time.Second)
err := db.Update(func(txn *Txn) error {
return txn.SetEntry(NewEntry([]byte("key"), []byte("value")))
})
require.NoError(t, err)
wg.Wait()
cancel()
require.Equal(t, true, updated)
})
}
time.Sleep(2 * time.Second)
require.Equal(t, before, runtime.NumGoroutine())
}
t.Run("disk mode", func(t *testing.T) {
test(t, nil)
})
t.Run("InMemory mode", func(t *testing.T) {
opt := DefaultOptions("").WithInMemory(true)
test(t, &opt)
})
}
func ExampleOpen() {
dir, err := ioutil.TempDir("", "badger-test")
if err != nil {
panic(err)
}
defer removeDir(dir)
db, err := Open(DefaultOptions(dir))
if err != nil {
panic(err)
}
defer db.Close()
err = db.View(func(txn *Txn) error {
_, err := txn.Get([]byte("key"))
// We expect ErrKeyNotFound
fmt.Println(err)
return nil
})
if err != nil {
panic(err)
}
txn := db.NewTransaction(true) // Read-write txn
err = txn.SetEntry(NewEntry([]byte("key"), []byte("value")))
if err != nil {
panic(err)
}
err = txn.Commit()
if err != nil {
panic(err)
}
err = db.View(func(txn *Txn) error {
item, err := txn.Get([]byte("key"))
if err != nil {
return err
}
val, err := item.ValueCopy(nil)
if err != nil {
return err
}
fmt.Printf("%s\n", string(val))
return nil
})
if err != nil {
panic(err)
}
// Output:
// Key not found
// value
}
func ExampleTxn_NewIterator() {
dir, err := ioutil.TempDir("", "badger-test")
if err != nil {
panic(err)
}
defer removeDir(dir)
db, err := Open(DefaultOptions(dir))
if err != nil {
panic(err)
}
defer db.Close()
bkey := func(i int) []byte {
return []byte(fmt.Sprintf("%09d", i))
}
bval := func(i int) []byte {
return []byte(fmt.Sprintf("%025d", i))
}
txn := db.NewTransaction(true)
// Fill in 1000 items
n := 1000
for i := 0; i < n; i++ {
err := txn.SetEntry(NewEntry(bkey(i), bval(i)))
if err != nil {
panic(err)
}
}
err = txn.Commit()
if err != nil {
panic(err)
}
opt := DefaultIteratorOptions
opt.PrefetchSize = 10
// Iterate over 1000 items
var count int
err = db.View(func(txn *Txn) error {
it := txn.NewIterator(opt)
defer it.Close()
for it.Rewind(); it.Valid(); it.Next() {
count++
}
return nil
})
if err != nil {
panic(err)
}
fmt.Printf("Counted %d elements", count)
// Output:
// Counted 1000 elements
}
func TestSyncForRace(t *testing.T) {
dir, err := ioutil.TempDir("", "badger-test")
require.NoError(t, err)
defer removeDir(dir)
db, err := Open(DefaultOptions(dir).WithSyncWrites(false))
require.NoError(t, err)
defer db.Close()
closeChan := make(chan struct{})
doneChan := make(chan struct{})
go func() {
ticker := time.NewTicker(100 * time.Microsecond)
for {
select {
case <-ticker.C:
if err := db.Sync(); err != nil {
require.NoError(t, err)
}
db.opt.Debugf("Sync Iteration completed")
case <-closeChan:
close(doneChan)
return
}
}
}()
sz := 128 << 10 // 5 entries per value log file.
v := make([]byte, sz)
rand.Read(v[:rand.Intn(sz)])
txn := db.NewTransaction(true)
for i := 0; i < 10000; i++ {
require.NoError(t, txn.SetEntry(NewEntry([]byte(fmt.Sprintf("key%d", i)), v)))
if i%3 == 0 {
require.NoError(t, txn.Commit())
txn = db.NewTransaction(true)
}
if i%100 == 0 {
db.opt.Debugf("next 100 entries added to DB")
}
}
require.NoError(t, txn.Commit())
close(closeChan)
<-doneChan
}
func TestForceFlushMemtable(t *testing.T) {
dir, err := ioutil.TempDir("", "badger-test")
require.NoError(t, err, "temp dir for badger could not be created")
ops := getTestOptions(dir)
ops.ValueLogMaxEntries = 1
db, err := Open(ops)
require.NoError(t, err, "error while opening db")
defer db.Close()
for i := 0; i < 3; i++ {
err = db.Update(func(txn *Txn) error {
return txn.SetEntry(NewEntry([]byte(fmt.Sprintf("key-%d", i)),
[]byte(fmt.Sprintf("value-%d", i))))
})
require.NoError(t, err, "unable to set key and value")
}
time.Sleep(1 * time.Second)
// We want to make sure that the memtable is flushed to disk. While flushing the
// memtable to disk, the latest head is also stored in it. Hence we will try to
// read the head back from disk. To ensure this, we truncate all memtables.
db.Lock()
db.mt.DecrRef()
for _, mt := range db.imm {
mt.DecrRef()
}
db.imm = db.imm[:0]
db.mt, err = db.newMemTable()
require.NoError(t, err)
db.Unlock()
// Since we are inserting 3 entries and ValueLogMaxEntries is 1, there will be 3 rotations.
require.True(t, db.nextMemFid == 3,
fmt.Sprintf("expected fid: %d, actual fid: %d", 3, db.nextMemFid))
}
func TestVerifyChecksum(t *testing.T) {
testVerifyChecksum := func(t *testing.T, opt Options) {
path, err := ioutil.TempDir("", "badger-test")
require.NoError(t, err)
defer os.Remove(path)
opt.ValueDir = path
opt.Dir = path
// use stream write for writing.
runBadgerTest(t, &opt, func(t *testing.T, db *DB) {
value := make([]byte, 32)
y.Check2(rand.Read(value))
st := 0
buf := z.NewBuffer(10 << 20)
defer buf.Release()
for i := 0; i < 1000; i++ {
key := make([]byte, 8)
binary.BigEndian.PutUint64(key, uint64(i))
KVToBuffer(&pb.KV{
Key: key,
Value: value,
StreamId: uint32(st),
Version: 1,
}, buf)
if i%100 == 0 {
st++
}
}
sw := db.NewStreamWriter()
require.NoError(t, sw.Prepare(), "sw.Prepare() failed")
require.NoError(t, sw.Write(buf), "sw.Write() failed")
require.NoError(t, sw.Flush(), "sw.Flush() failed")
require.NoError(t, db.VerifyChecksum(), "checksum verification failed for DB")
})
}
t.Run("Testing Verify Checksum without encryption", func(t *testing.T) {
testVerifyChecksum(t, getTestOptions(""))
})
t.Run("Testing Verify Checksum with Encryption", func(t *testing.T) {
key := make([]byte, 32)
_, err := rand.Read(key)
require.NoError(t, err)
opt := getTestOptions("")
opt.EncryptionKey = key
opt.BlockCacheSize = 1 << 20
opt.IndexCacheSize = 1 << 20
testVerifyChecksum(t, opt)
})
}
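// TestMain parses command-line flags (such as the manual flag gating the
// long-running tests above) before running the suite.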
func TestMain(m *testing.M) {
flag.Parse()
os.Exit(m.Run())
}
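// removeDir deletes dir, panicking on failure; used to clean up test dirs.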
func removeDir(dir string) {
if err := os.RemoveAll(dir); err != nil {
panic(err)
}
}
func TestWriteInMemory(t *testing.T) {
opt := DefaultOptions("").WithInMemory(true)
db, err := Open(opt)
require.NoError(t, err)
defer func() {
require.NoError(t, db.Close())
}()
for i := 0; i < 100; i++ {
txnSet(t, db, []byte(fmt.Sprintf("key%d", i)), []byte(fmt.Sprintf("val%d", i)), 0x00)
}
err = db.View(func(txn *Txn) error {
for j := 0; j < 100; j++ {
item, err := txn.Get([]byte(fmt.Sprintf("key%d", j)))
require.NoError(t, err)
expected := []byte(fmt.Sprintf("val%d", j))
require.NoError(t, item.Value(func(val []byte) error {
require.Equal(t, expected, val,
"Invalid value for key %q. expected: %q, actual: %q",
item.Key(), expected, val)
return nil
}))
}
return nil
})
require.NoError(t, err)
}
func TestMinCacheSize(t *testing.T) {
opt := DefaultOptions("").
WithInMemory(true).
WithIndexCacheSize(16).
WithBlockCacheSize(16)
db, err := Open(opt)
require.NoError(t, err)
defer func() {
require.NoError(t, db.Close())
}()
}
func TestUpdateMaxCost(t *testing.T) {
dir, err := ioutil.TempDir("", "badger-test")
require.NoError(t, err, "temp dir for badger could not be created")
defer os.RemoveAll(dir)
ops := getTestOptions(dir).
WithBlockCacheSize(1 << 20).
WithIndexCacheSize(2 << 20)
db, err := Open(ops)
require.NoError(t, err)
cost, err := db.CacheMaxCost(BlockCache, -1)
require.NoError(t, err)
require.Equal(t, int64(1<<20), cost)
cost, err = db.CacheMaxCost(IndexCache, -1)
require.NoError(t, err)
require.Equal(t, int64(2<<20), cost)
_, err = db.CacheMaxCost(BlockCache, 2<<20)
require.NoError(t, err)
cost, err = db.CacheMaxCost(BlockCache, -1)
require.NoError(t, err)
require.Equal(t, int64(2<<20), cost)
_, err = db.CacheMaxCost(IndexCache, 4<<20)
require.NoError(t, err)
cost, err = db.CacheMaxCost(IndexCache, -1)
require.NoError(t, err)
require.Equal(t, int64(4<<20), cost)
}
func TestOpenDBReadOnly(t *testing.T) {
dir, err := ioutil.TempDir("", "badger-test")
require.NoError(t, err)
defer os.RemoveAll(dir)
mp := make(map[string][]byte)
ops := getTestOptions(dir)
ops.ReadOnly = false
db, err := Open(ops)
require.NoError(t, err)
// Add a bunch of entries that don't go into the value log.
require.NoError(t, db.Update(func(txn *Txn) error {
val := make([]byte, 10)
rand.Read(val)
for i := 0; i < 10; i++ {
key := fmt.Sprintf("key-%05d", i)
require.NoError(t, txn.Set([]byte(key), val))
mp[key] = val
}
return nil
}))
require.NoError(t, db.Close())
ops.ReadOnly = true
db, err = Open(ops)
require.NoError(t, err)
require.NoError(t, db.Close())
db, err = Open(ops)
require.NoError(t, err)
var count int
read := func() {
count = 0
db.View(func(txn *Txn) error {
it := txn.NewIterator(DefaultIteratorOptions)
defer it.Close()
for it.Rewind(); it.Valid(); it.Next() {
item := it.Item()
require.NoError(t, item.Value(func(val []byte) error {
require.Equal(t, mp[string(item.Key())], val)
return nil
}))
count++
}
return nil
})
}
read()
require.Equal(t, 10, count)
require.NoError(t, db.Close())
ops.ReadOnly = false
db, err = Open(ops)
require.NoError(t, err)
// Add a bunch of entries that go into the value log.
require.NoError(t, db.Update(func(txn *Txn) error {
require.Greater(t, db.opt.ValueThreshold, int64(10))
val := make([]byte, db.opt.ValueThreshold+10)
rand.Read(val)
for i := 0; i < 10; i++ {
key := fmt.Sprintf("KEY-%05d", i)
require.NoError(t, txn.Set([]byte(key), val))
mp[key] = val
}
return nil
}))
require.NoError(t, db.Close())
ops.ReadOnly = true
db, err = Open(ops)
require.NoError(t, err)
read()
require.Equal(t, 20, count)
require.NoError(t, db.Close())
}