blob: 96434d813c90710bb0d70cfa5df2ade71cbcac18 [file] [log] [blame]
package badger
import (
"fmt"
"io/ioutil"
"math"
"math/rand"
"runtime"
"sync"
"sync/atomic"
"testing"
"time"
"github.com/dgraph-io/badger/v3/y"
"github.com/dgraph-io/ristretto/z"
"github.com/stretchr/testify/require"
)
func val(large bool) []byte {
var buf []byte
if large {
buf = make([]byte, 8192)
} else {
buf = make([]byte, 16)
}
rand.Read(buf)
return buf
}
func numKeys(db *DB) int {
var count int
err := db.View(func(txn *Txn) error {
itr := txn.NewIterator(DefaultIteratorOptions)
defer itr.Close()
for itr.Rewind(); itr.Valid(); itr.Next() {
count++
}
return nil
})
y.Check(err)
return count
}
func numKeysManaged(db *DB, readTs uint64) int {
txn := db.NewTransactionAt(readTs, false)
defer txn.Discard()
itr := txn.NewIterator(DefaultIteratorOptions)
defer itr.Close()
var count int
for itr.Rewind(); itr.Valid(); itr.Next() {
count++
}
return count
}
func TestDropAllManaged(t *testing.T) {
dir, err := ioutil.TempDir("", "badger-test")
require.NoError(t, err)
defer removeDir(dir)
opts := getTestOptions(dir)
opts.managedTxns = true
opts.ValueLogFileSize = 5 << 20
db, err := Open(opts)
require.NoError(t, err)
N := uint64(10000)
populate := func(db *DB, start uint64) {
var wg sync.WaitGroup
for i := start; i < start+N; i++ {
wg.Add(1)
txn := db.NewTransactionAt(math.MaxUint64, true)
require.NoError(t, txn.SetEntry(NewEntry([]byte(key("key", int(i))), val(true))))
require.NoError(t, txn.CommitAt(uint64(i), func(err error) {
require.NoError(t, err)
wg.Done()
}))
}
wg.Wait()
}
populate(db, N)
require.Equal(t, int(N), numKeysManaged(db, math.MaxUint64))
require.NoError(t, db.DropAll())
require.NoError(t, db.DropAll()) // Just call it twice, for fun.
require.Equal(t, 0, numKeysManaged(db, math.MaxUint64))
// Check that we can still write to db, and using lower timestamps.
populate(db, 1)
require.Equal(t, int(N), numKeysManaged(db, math.MaxUint64))
require.NoError(t, db.Close())
// Ensure that value log is correctly replayed, that we are preserving badgerHead.
opts.managedTxns = true
db2, err := Open(opts)
require.NoError(t, err)
require.Equal(t, int(N), numKeysManaged(db2, math.MaxUint64))
require.NoError(t, db2.Close())
}
func TestDropAll(t *testing.T) {
dir, err := ioutil.TempDir("", "badger-test")
require.NoError(t, err)
defer removeDir(dir)
opts := getTestOptions(dir)
opts.ValueLogFileSize = 5 << 20
db, err := Open(opts)
require.NoError(t, err)
N := uint64(10000)
populate := func(db *DB) {
writer := db.NewWriteBatch()
for i := uint64(0); i < N; i++ {
require.NoError(t, writer.Set([]byte(key("key", int(i))), val(true)))
}
require.NoError(t, writer.Flush())
}
populate(db)
require.Equal(t, int(N), numKeys(db))
require.NoError(t, db.DropAll())
require.Equal(t, 0, numKeys(db))
// Check that we can still write to mdb, and using lower timestamps.
populate(db)
require.Equal(t, int(N), numKeys(db))
require.NoError(t, db.Close())
// Ensure that value log is correctly replayed.
db2, err := Open(opts)
require.NoError(t, err)
require.Equal(t, int(N), numKeys(db2))
require.NoError(t, db2.Close())
}
func TestDropAllTwice(t *testing.T) {
test := func(t *testing.T, opts Options) {
db, err := Open(opts)
require.NoError(t, err)
defer func() {
require.NoError(t, db.Close())
}()
N := uint64(10000)
populate := func(db *DB) {
writer := db.NewWriteBatch()
for i := uint64(0); i < N; i++ {
require.NoError(t, writer.Set([]byte(key("key", int(i))), val(false)))
}
require.NoError(t, writer.Flush())
}
populate(db)
require.Equal(t, int(N), numKeys(db))
require.NoError(t, db.DropAll())
require.Equal(t, 0, numKeys(db))
// Call DropAll again.
require.NoError(t, db.DropAll())
require.NoError(t, db.Close())
}
t.Run("disk mode", func(t *testing.T) {
dir, err := ioutil.TempDir("", "badger-test")
require.NoError(t, err)
defer removeDir(dir)
opts := getTestOptions(dir)
opts.ValueLogFileSize = 5 << 20
test(t, opts)
})
t.Run("InMemory mode", func(t *testing.T) {
opts := getTestOptions("")
opts.InMemory = true
test(t, opts)
})
}
func TestDropAllWithPendingTxn(t *testing.T) {
dir, err := ioutil.TempDir("", "badger-test")
require.NoError(t, err)
defer removeDir(dir)
opts := getTestOptions(dir)
opts.ValueLogFileSize = 5 << 20
db, err := Open(opts)
require.NoError(t, err)
defer func() {
require.NoError(t, db.Close())
}()
N := uint64(10000)
populate := func(db *DB) {
writer := db.NewWriteBatch()
for i := uint64(0); i < N; i++ {
require.NoError(t, writer.Set([]byte(key("key", int(i))), val(true)))
}
require.NoError(t, writer.Flush())
}
populate(db)
require.Equal(t, int(N), numKeys(db))
txn := db.NewTransaction(true)
var wg sync.WaitGroup
wg.Add(1)
go func() {
defer wg.Done()
itr := txn.NewIterator(DefaultIteratorOptions)
defer itr.Close()
var keys []string
for {
var count int
for itr.Rewind(); itr.Valid(); itr.Next() {
count++
item := itr.Item()
keys = append(keys, string(item.KeyCopy(nil)))
_, err := item.ValueCopy(nil)
if err != nil {
t.Logf("Got error during value copy: %v", err)
return
}
}
t.Logf("Got number of keys: %d\n", count)
for _, key := range keys {
item, err := txn.Get([]byte(key))
if err != nil {
t.Logf("Got error during key lookup: %v", err)
return
}
if _, err := item.ValueCopy(nil); err != nil {
t.Logf("Got error during second value copy: %v", err)
return
}
}
}
}()
// Do not cancel txn.
wg.Add(1)
go func() {
defer wg.Done()
time.Sleep(2 * time.Second)
require.NoError(t, db.DropAll())
}()
wg.Wait()
}
func TestDropReadOnly(t *testing.T) {
dir, err := ioutil.TempDir("", "badger-test")
require.NoError(t, err)
defer removeDir(dir)
opts := getTestOptions(dir)
opts.ValueLogFileSize = 5 << 20
db, err := Open(opts)
require.NoError(t, err)
N := uint64(1000)
populate := func(db *DB) {
writer := db.NewWriteBatch()
for i := uint64(0); i < N; i++ {
require.NoError(t, writer.Set([]byte(key("key", int(i))), val(true)))
}
require.NoError(t, writer.Flush())
}
populate(db)
require.Equal(t, int(N), numKeys(db))
require.NoError(t, db.Close())
opts.ReadOnly = true
db2, err := Open(opts)
// acquireDirectoryLock returns ErrWindowsNotSupported on Windows. It can be ignored safely.
if runtime.GOOS == "windows" {
require.Equal(t, err, ErrWindowsNotSupported)
} else {
require.NoError(t, err)
require.Panics(t, func() { db2.DropAll() })
require.NoError(t, db2.Close())
}
}
func TestWriteAfterClose(t *testing.T) {
dir, err := ioutil.TempDir("", "badger-test")
require.NoError(t, err)
defer removeDir(dir)
opts := getTestOptions(dir)
opts.ValueLogFileSize = 5 << 20
db, err := Open(opts)
require.NoError(t, err)
N := uint64(1000)
populate := func(db *DB) {
writer := db.NewWriteBatch()
for i := uint64(0); i < N; i++ {
require.NoError(t, writer.Set([]byte(key("key", int(i))), val(true)))
}
require.NoError(t, writer.Flush())
}
populate(db)
require.Equal(t, int(N), numKeys(db))
require.NoError(t, db.Close())
err = db.Update(func(txn *Txn) error {
return txn.SetEntry(NewEntry([]byte("a"), []byte("b")))
})
require.Equal(t, ErrDBClosed, err)
}
func TestDropAllRace(t *testing.T) {
dir, err := ioutil.TempDir("", "badger-test")
require.NoError(t, err)
defer removeDir(dir)
opts := getTestOptions(dir)
opts.managedTxns = true
db, err := Open(opts)
require.NoError(t, err)
N := 10000
// Start a goroutine to keep trying to write to DB while DropAll happens.
closer := z.NewCloser(1)
go func() {
defer closer.Done()
ticker := time.NewTicker(time.Millisecond)
defer ticker.Stop()
i := N + 1 // Writes would happen above N.
var errors int32
for {
select {
case <-ticker.C:
i++
txn := db.NewTransactionAt(math.MaxUint64, true)
require.NoError(t, txn.SetEntry(NewEntry([]byte(key("key", i)), val(false))))
if err := txn.CommitAt(uint64(i), func(err error) {
if err != nil {
atomic.AddInt32(&errors, 1)
}
}); err != nil {
atomic.AddInt32(&errors, 1)
}
case <-closer.HasBeenClosed():
// The following causes a data race.
// t.Logf("i: %d. Number of (expected) write errors: %d.\n", i, errors)
return
}
}
}()
var wg sync.WaitGroup
for i := 1; i <= N; i++ {
wg.Add(1)
txn := db.NewTransactionAt(math.MaxUint64, true)
require.NoError(t, txn.SetEntry(NewEntry([]byte(key("key", i)), val(false))))
require.NoError(t, txn.CommitAt(uint64(i), func(err error) {
require.NoError(t, err)
wg.Done()
}))
}
wg.Wait()
before := numKeysManaged(db, math.MaxUint64)
require.True(t, before > N)
require.NoError(t, db.DropAll())
closer.SignalAndWait()
after := numKeysManaged(db, math.MaxUint64)
t.Logf("Before: %d. After dropall: %d\n", before, after)
require.True(t, after < before)
db.Close()
}
func TestDropPrefix(t *testing.T) {
dir, err := ioutil.TempDir("", "badger-test")
require.NoError(t, err)
defer removeDir(dir)
opts := getTestOptions(dir)
opts.ValueLogFileSize = 5 << 20
db, err := Open(opts)
require.NoError(t, err)
N := uint64(10000)
populate := func(db *DB) {
writer := db.NewWriteBatch()
for i := uint64(0); i < N; i++ {
require.NoError(t, writer.Set([]byte(key("key", int(i))), val(true)))
}
require.NoError(t, writer.Flush())
}
populate(db)
require.Equal(t, int(N), numKeys(db))
require.NoError(t, db.DropPrefix([]byte("key000")))
require.Equal(t, int(N)-10, numKeys(db))
require.NoError(t, db.DropPrefix([]byte("key00")))
require.Equal(t, int(N)-100, numKeys(db))
expected := int(N)
for i := 0; i < 10; i++ {
require.NoError(t, db.DropPrefix([]byte(fmt.Sprintf("key%d", i))))
expected -= 1000
require.Equal(t, expected, numKeys(db))
}
require.NoError(t, db.DropPrefix([]byte("key1")))
require.Equal(t, 0, numKeys(db))
require.NoError(t, db.DropPrefix([]byte("key")))
require.Equal(t, 0, numKeys(db))
// Check that we can still write to mdb, and using lower timestamps.
populate(db)
require.Equal(t, int(N), numKeys(db))
require.NoError(t, db.DropPrefix([]byte("key")))
require.Equal(t, 0, numKeys(db))
require.NoError(t, db.Close())
// Ensure that value log is correctly replayed.
db2, err := Open(opts)
require.NoError(t, err)
require.Equal(t, 0, numKeys(db2))
require.NoError(t, db2.Close())
}
func TestDropPrefixWithPendingTxn(t *testing.T) {
dir, err := ioutil.TempDir("", "badger-test")
require.NoError(t, err)
defer removeDir(dir)
opts := getTestOptions(dir)
opts.ValueLogFileSize = 5 << 20
db, err := Open(opts)
require.NoError(t, err)
defer func() {
require.NoError(t, db.Close())
}()
N := uint64(10000)
populate := func(db *DB) {
writer := db.NewWriteBatch()
for i := uint64(0); i < N; i++ {
require.NoError(t, writer.Set([]byte(key("key", int(i))), val(true)))
}
require.NoError(t, writer.Flush())
}
populate(db)
require.Equal(t, int(N), numKeys(db))
txn := db.NewTransaction(true)
var wg sync.WaitGroup
wg.Add(2)
go func() {
defer wg.Done()
itr := txn.NewIterator(DefaultIteratorOptions)
defer itr.Close()
var keys []string
for {
var count int
for itr.Rewind(); itr.Valid(); itr.Next() {
count++
item := itr.Item()
keys = append(keys, string(item.KeyCopy(nil)))
_, err := item.ValueCopy(nil)
if err != nil {
t.Logf("Got error during value copy: %v", err)
return
}
}
t.Logf("Got number of keys: %d\n", count)
for _, key := range keys {
item, err := txn.Get([]byte(key))
if err != nil {
t.Logf("Got error during key lookup: %v", err)
return
}
if _, err := item.ValueCopy(nil); err != nil {
t.Logf("Got error during second value copy: %v", err)
return
}
}
}
}()
// Do not cancel txn.
go func() {
defer wg.Done()
time.Sleep(2 * time.Second)
require.NoError(t, db.DropPrefix([]byte("key0")))
require.NoError(t, db.DropPrefix([]byte("key00")))
require.NoError(t, db.DropPrefix([]byte("key")))
}()
wg.Wait()
}
func TestDropPrefixReadOnly(t *testing.T) {
dir, err := ioutil.TempDir("", "badger-test")
require.NoError(t, err)
defer removeDir(dir)
opts := getTestOptions(dir)
opts.ValueLogFileSize = 5 << 20
db, err := Open(opts)
require.NoError(t, err)
N := uint64(1000)
populate := func(db *DB) {
writer := db.NewWriteBatch()
for i := uint64(0); i < N; i++ {
require.NoError(t, writer.Set([]byte(key("key", int(i))), val(true)))
}
require.NoError(t, writer.Flush())
}
populate(db)
require.Equal(t, int(N), numKeys(db))
require.NoError(t, db.Close())
opts.ReadOnly = true
db2, err := Open(opts)
// acquireDirectoryLock returns ErrWindowsNotSupported on Windows. It can be ignored safely.
if runtime.GOOS == "windows" {
require.Equal(t, err, ErrWindowsNotSupported)
} else {
require.NoError(t, err)
require.Panics(t, func() { db2.DropPrefix([]byte("key0")) })
require.NoError(t, db2.Close())
}
}
func TestDropPrefixRace(t *testing.T) {
dir, err := ioutil.TempDir("", "badger-test")
require.NoError(t, err)
defer removeDir(dir)
opts := getTestOptions(dir)
opts.managedTxns = true
db, err := Open(opts)
require.NoError(t, err)
N := 10000
// Start a goroutine to keep trying to write to DB while DropPrefix happens.
closer := z.NewCloser(1)
go func() {
defer closer.Done()
ticker := time.NewTicker(time.Millisecond)
defer ticker.Stop()
i := N + 1 // Writes would happen above N.
var errors int32
for {
select {
case <-ticker.C:
i++
txn := db.NewTransactionAt(math.MaxUint64, true)
require.NoError(t, txn.SetEntry(NewEntry([]byte(key("key", i)), val(false))))
if err := txn.CommitAt(uint64(i), func(err error) {
if err != nil {
atomic.AddInt32(&errors, 1)
}
}); err != nil {
atomic.AddInt32(&errors, 1)
}
case <-closer.HasBeenClosed():
// The following causes a data race.
// t.Logf("i: %d. Number of (expected) write errors: %d.\n", i, errors)
return
}
}
}()
var wg sync.WaitGroup
for i := 1; i <= N; i++ {
wg.Add(1)
txn := db.NewTransactionAt(math.MaxUint64, true)
require.NoError(t, txn.SetEntry(NewEntry([]byte(key("key", i)), val(false))))
require.NoError(t, txn.CommitAt(uint64(i), func(err error) {
require.NoError(t, err)
wg.Done()
}))
}
wg.Wait()
before := numKeysManaged(db, math.MaxUint64)
require.True(t, before > N)
require.NoError(t, db.DropPrefix([]byte("key00")))
require.NoError(t, db.DropPrefix([]byte("key1")))
require.NoError(t, db.DropPrefix([]byte("key")))
closer.SignalAndWait()
after := numKeysManaged(db, math.MaxUint64)
t.Logf("Before: %d. After dropprefix: %d\n", before, after)
require.True(t, after < before)
require.NoError(t, db.Close())
}
func TestWriteBatchManagedMode(t *testing.T) {
key := func(i int) []byte {
return []byte(fmt.Sprintf("%10d", i))
}
val := func(i int) []byte {
return []byte(fmt.Sprintf("%128d", i))
}
opt := DefaultOptions("")
opt.managedTxns = true
opt.BaseTableSize = 1 << 20 // This would create multiple transactions in write batch.
runBadgerTest(t, &opt, func(t *testing.T, db *DB) {
wb := db.NewWriteBatchAt(1)
defer wb.Cancel()
N, M := 50000, 1000
start := time.Now()
for i := 0; i < N; i++ {
require.NoError(t, wb.Set(key(i), val(i)))
}
for i := 0; i < M; i++ {
require.NoError(t, wb.Delete(key(i)))
}
require.NoError(t, wb.Flush())
t.Logf("Time taken for %d writes (w/ test options): %s\n", N+M, time.Since(start))
err := db.View(func(txn *Txn) error {
itr := txn.NewIterator(DefaultIteratorOptions)
defer itr.Close()
i := M
for itr.Rewind(); itr.Valid(); itr.Next() {
item := itr.Item()
require.Equal(t, string(key(i)), string(item.Key()))
require.Equal(t, item.Version(), uint64(1))
valcopy, err := item.ValueCopy(nil)
require.NoError(t, err)
require.Equal(t, val(i), valcopy)
i++
}
require.Equal(t, N, i)
return nil
})
require.NoError(t, err)
})
}
func TestWriteBatchManaged(t *testing.T) {
key := func(i int) []byte {
return []byte(fmt.Sprintf("%10d", i))
}
val := func(i int) []byte {
return []byte(fmt.Sprintf("%128d", i))
}
opt := DefaultOptions("")
opt.managedTxns = true
opt.BaseTableSize = 1 << 15 // This would create multiple transactions in write batch.
runBadgerTest(t, &opt, func(t *testing.T, db *DB) {
wb := db.NewManagedWriteBatch()
defer wb.Cancel()
N, M := 50000, 1000
start := time.Now()
for i := 0; i < N; i++ {
require.NoError(t, wb.SetEntryAt(&Entry{Key: key(i), Value: val(i)}, 1))
}
for i := 0; i < M; i++ {
require.NoError(t, wb.DeleteAt(key(i), 2))
}
require.NoError(t, wb.Flush())
t.Logf("Time taken for %d writes (w/ test options): %s\n", N+M, time.Since(start))
err := db.View(func(txn *Txn) error {
itr := txn.NewIterator(DefaultIteratorOptions)
defer itr.Close()
i := M
for itr.Rewind(); itr.Valid(); itr.Next() {
item := itr.Item()
require.Equal(t, string(key(i)), string(item.Key()))
require.Equal(t, item.Version(), uint64(1))
valcopy, err := item.ValueCopy(nil)
require.NoError(t, err)
require.Equal(t, val(i), valcopy)
i++
}
require.Equal(t, N, i)
return nil
})
require.NoError(t, err)
})
}
func TestWriteBatchDuplicate(t *testing.T) {
N := 10
k := []byte("key")
v := []byte("val")
readVerify := func(t *testing.T, db *DB, n int, versions []int) {
err := db.View(func(txn *Txn) error {
iopt := DefaultIteratorOptions
iopt.AllVersions = true
itr := txn.NewIterator(iopt)
defer itr.Close()
i := 0
for itr.Rewind(); itr.Valid(); itr.Next() {
item := itr.Item()
require.Equal(t, k, item.Key())
require.Equal(t, uint64(versions[i]), item.Version())
err := item.Value(func(val []byte) error {
require.Equal(t, v, val)
return nil
})
require.NoError(t, err)
i++
}
require.Equal(t, n, i)
return nil
})
require.NoError(t, err)
}
t.Run("writebatch", func(t *testing.T) {
opt := DefaultOptions("")
opt.BaseTableSize = 1 << 15 // This would create multiple transactions in write batch.
runBadgerTest(t, &opt, func(t *testing.T, db *DB) {
wb := db.NewWriteBatch()
defer wb.Cancel()
for i := uint64(0); i < uint64(N); i++ {
// Multiple versions of the same key.
require.NoError(t, wb.SetEntry(&Entry{Key: k, Value: v}))
}
require.NoError(t, wb.Flush())
readVerify(t, db, 1, []int{1})
})
})
t.Run("writebatch at", func(t *testing.T) {
opt := DefaultOptions("")
opt.BaseTableSize = 1 << 15 // This would create multiple transactions in write batch.
opt.managedTxns = true
runBadgerTest(t, &opt, func(t *testing.T, db *DB) {
wb := db.NewWriteBatchAt(10)
defer wb.Cancel()
for i := uint64(0); i < uint64(N); i++ {
// Multiple versions of the same key.
require.NoError(t, wb.SetEntry(&Entry{Key: k, Value: v}))
}
require.NoError(t, wb.Flush())
readVerify(t, db, 1, []int{10})
})
})
t.Run("managed writebatch", func(t *testing.T) {
opt := DefaultOptions("")
opt.managedTxns = true
opt.BaseTableSize = 1 << 15 // This would create multiple transactions in write batch.
runBadgerTest(t, &opt, func(t *testing.T, db *DB) {
wb := db.NewManagedWriteBatch()
defer wb.Cancel()
for i := uint64(0); i < uint64(N); i++ {
// Multiple versions of the same key.
require.NoError(t, wb.SetEntryAt(&Entry{Key: k, Value: v}, i))
}
require.NoError(t, wb.Flush())
readVerify(t, db, N, []int{9, 8, 7, 6, 5, 4, 3, 2, 1, 0})
})
})
}
func TestZeroDiscardStats(t *testing.T) {
N := uint64(10000)
populate := func(t *testing.T, db *DB) {
writer := db.NewWriteBatch()
for i := uint64(0); i < N; i++ {
require.NoError(t, writer.Set([]byte(key("key", int(i))), val(true)))
}
require.NoError(t, writer.Flush())
}
t.Run("after rewrite", func(t *testing.T) {
opts := getTestOptions("")
opts.ValueLogFileSize = 5 << 20
runBadgerTest(t, &opts, func(t *testing.T, db *DB) {
populate(t, db)
require.Equal(t, int(N), numKeys(db))
fids := db.vlog.sortedFids()
for _, fid := range fids {
db.vlog.discardStats.Update(uint32(fid), 1)
}
// Ensure we have some valid fids.
require.True(t, len(fids) > 2)
fid := fids[0]
require.NoError(t, db.vlog.rewrite(db.vlog.filesMap[fid]))
// All data should still be present.
require.Equal(t, int(N), numKeys(db))
db.vlog.discardStats.iterate(func(id, val uint64) {
// Vlog with id=fid has been re-written, it's discard stats should be zero.
if uint32(id) == fid {
require.Zero(t, val)
}
})
})
})
t.Run("after dropall", func(t *testing.T) {
opts := getTestOptions("")
opts.ValueLogFileSize = 5 << 20
runBadgerTest(t, &opts, func(t *testing.T, db *DB) {
populate(t, db)
require.Equal(t, int(N), numKeys(db))
// Fill discard stats. Normally these are filled by compaction.
fids := db.vlog.sortedFids()
for _, fid := range fids {
db.vlog.discardStats.Update(uint32(fid), 1)
}
db.vlog.discardStats.iterate(func(id, val uint64) { require.NotZero(t, val) })
require.NoError(t, db.DropAll())
require.Equal(t, 0, numKeys(db))
// We've deleted everything. DS should be zero.
db.vlog.discardStats.iterate(func(id, val uint64) { require.Zero(t, val) })
})
})
}