blob: 109dec6b984d38fc1ee39aeffd1e4d7107c9f61d [file] [log] [blame]
/*
 * Copyright 2017 Dgraph Labs, Inc. and Contributors
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package badger
import (
// oracle issues read and commit timestamps for transactions and keeps the
// bookkeeping (recently committed transactions, watermarks, discard timestamp)
// used for conflict detection and for deciding which versions may be discarded.
type oracle struct {
isManaged bool // Does not change value, so no locking required.
detectConflicts bool // Determines if the txns should be checked for conflicts.
sync.Mutex // For nextTxnTs and commits.
// writeChLock lock is for ensuring that transactions go to the write
// channel in the same order as their commit timestamps.
writeChLock sync.Mutex
// nextTxnTs is the next timestamp to hand out: newCommitTs uses it as the
// commit timestamp in non-managed mode, and readTs returns nextTxnTs - 1.
nextTxnTs uint64
// Used to block NewTransaction, so all previous commits are visible to a new read.
txnMark *y.WaterMark
// Either of these is used to determine which versions can be permanently
// discarded during compaction.
discardTs uint64 // Used by ManagedDB.
readMark *y.WaterMark // Used by DB.
// committedTxns contains all committed writes (contains fingerprints
// of keys written and their latest commit counter).
committedTxns []committedTxn
// lastCleanupTs is the maxReadTs recorded by the most recent run of
// cleanupCommittedTransactions; newCommitTs asserts that commit timestamps
// never fall below it.
lastCleanupTs uint64
// closer is used to stop watermarks.
closer *z.Closer
// committedTxn records one committed transaction for conflict detection: its
// commit timestamp and the fingerprints of the keys it wrote.
type committedTxn struct {
// ts is the commit timestamp assigned by oracle.newCommitTs.
ts uint64
// ConflictKeys Keeps track of the entries written at timestamp ts.
conflictKeys map[uint64]struct{}
// newOracle builds an oracle from opt. It copies the managed-mode and
// conflict-detection settings, allocates the read and txn watermarks, and
// creates the closer. Timestamps are left at their zero value.
func newOracle(opt Options) *oracle {
orc := &oracle{
isManaged: opt.managedTxns,
detectConflicts: opt.DetectConflicts,
// We're not initializing nextTxnTs and readOnlyTs. It would be done after replay in Open.
// WaterMarks must be 64-bit aligned for atomic package, hence we must use pointers here.
// See the 64-bit alignment note in the sync/atomic package documentation.
readMark: &y.WaterMark{Name: "badger.PendingReads"},
txnMark: &y.WaterMark{Name: "badger.TxnTimestamp"},
// NOTE(review): the count of 2 presumably matches the two watermarks above — confirm.
closer: z.NewCloser(2),
return orc
// Stop shuts the oracle down.
// NOTE(review): the body is not visible here; given the closer field ("used to
// stop watermarks") it presumably signals and waits on o.closer — confirm.
func (o *oracle) Stop() {
// readTs returns the read timestamp for a new transaction, which is
// nextTxnTs - 1. It panics when the oracle is in managed mode. Before
// returning it blocks in txnMark.WaitForMark for that timestamp, and any error
// from the wait is passed to y.Check.
func (o *oracle) readTs() uint64 {
if o.isManaged {
panic("ReadTs should not be retrieved for managed DB")
var readTs uint64
// NOTE(review): nextTxnTs is documented as guarded by the embedded mutex, but
// no lock acquisition is visible around this read — confirm.
readTs = o.nextTxnTs - 1
// Wait for all txns which have no conflicts, have been assigned a commit
// timestamp and are going through the write to value log and LSM tree
// process. Not waiting here could mean that some txns which have been
// committed would not be read.
y.Check(o.txnMark.WaitForMark(context.Background(), readTs))
return readTs
// nextTs returns the current value of nextTxnTs.
// NOTE(review): the deferred Unlock has no visible matching o.Lock() — confirm
// the mutex is acquired before it.
func (o *oracle) nextTs() uint64 {
defer o.Unlock()
return o.nextTxnTs
// incrementNextTs advances the oracle's next timestamp.
// NOTE(review): only the deferred Unlock is visible; the o.Lock() call and the
// increment of nextTxnTs implied by the name are not — confirm.
func (o *oracle) incrementNextTs() {
defer o.Unlock()
// setDiscardTs stores ts as the oracle's discard timestamp.
// Any deleted or invalid versions at or below ts would be discarded during
// compaction to reclaim disk space in LSM tree and thence value log.
// NOTE(review): the deferred Unlock has no visible matching o.Lock() — confirm.
func (o *oracle) setDiscardTs(ts uint64) {
defer o.Unlock()
o.discardTs = ts
// discardAtOrBelow returns the timestamp at or below which versions may be
// discarded: discardTs in managed mode, otherwise readMark.DoneUntil().
func (o *oracle) discardAtOrBelow() uint64 {
if o.isManaged {
// NOTE(review): no matching o.Lock() is visible before this deferred Unlock — confirm.
defer o.Unlock()
return o.discardTs
return o.readMark.DoneUntil()
// hasConflict reports whether any key fingerprint in txn.reads is present in
// the conflictKeys of a transaction recorded in o.committedTxns. A transaction
// with no recorded reads never conflicts.
// hasConflict must be called while having a lock.
func (o *oracle) hasConflict(txn *Txn) bool {
if len(txn.reads) == 0 {
return false
for _, committedTxn := range o.committedTxns {
// If the committedTxn.ts is less than or equal to txn.readTs that implies
// that the committedTxn finished before the current transaction started.
// We don't need to check for conflict in that case.
// This change assumes linearizability. Lack of linearizability could
// cause the read ts of a new txn to be lower than the commit ts of
// a txn before it (@mrjn).
// NOTE(review): no statement skipping this committedTxn is visible under the
// condition below — confirm it is skipped as the comment describes.
if committedTxn.ts <= txn.readTs {
for _, ro := range txn.reads {
if _, has := committedTxn.conflictKeys[ro]; has {
return true
return false
// newCommitTs returns the commit timestamp for txn, or 0 if hasConflict
// reports a conflict. In non-managed mode the timestamp is nextTxnTs; in
// managed mode it is txn.commitTs. The timestamp is asserted to be at least
// lastCleanupTs. When conflict detection is enabled, the transaction's
// conflictKeys are appended to committedTxns under that timestamp.
// NOTE(review): the deferred Unlock has no visible matching o.Lock(), and
// nothing visible here advances nextTxnTs after handing it out — confirm.
func (o *oracle) newCommitTs(txn *Txn) uint64 {
defer o.Unlock()
if o.hasConflict(txn) {
return 0
var ts uint64
if !o.isManaged {
// This is the general case, when user doesn't specify the read and commit ts.
ts = o.nextTxnTs
} else {
// If commitTs is set, use it instead.
ts = txn.commitTs
y.AssertTrue(ts >= o.lastCleanupTs)
if o.detectConflicts {
// We should ensure that txns are not added to o.committedTxns slice when
// conflict detection is disabled otherwise this slice would keep growing.
o.committedTxns = append(o.committedTxns, committedTxn{
ts: ts,
conflictKeys: txn.conflictKeys,
return ts
// doneRead marks txn as having finished reading. The txn.doneRead flag makes
// repeated calls for the same transaction a no-op.
// NOTE(review): only the flag update is visible; presumably the read watermark
// is also told that txn.readTs is done — confirm.
func (o *oracle) doneRead(txn *Txn) {
if !txn.doneRead {
txn.doneRead = true
// cleanupCommittedTransactions prunes o.committedTxns using the current
// maximum read timestamp (discardTs in managed mode, readMark.DoneUntil()
// otherwise). It records that timestamp in lastCleanupTs and rebuilds
// committedTxns in place, reusing the backing array.
func (o *oracle) cleanupCommittedTransactions() { // Must be called under o.Lock
if !o.detectConflicts {
// When detectConflicts is set to false, we do not store any
// committedTxns and so there's nothing to clean up.
// Same logic as discardAtOrBelow but unlocked
var maxReadTs uint64
if o.isManaged {
maxReadTs = o.discardTs
} else {
maxReadTs = o.readMark.DoneUntil()
y.AssertTrue(maxReadTs >= o.lastCleanupTs)
// do not run clean up if the maxReadTs (read timestamp of the
// oldest transaction that is still in flight) has not increased
if maxReadTs == o.lastCleanupTs {
o.lastCleanupTs = maxReadTs
// Filter in place: tmp shares committedTxns' backing array.
tmp := o.committedTxns[:0]
for _, txn := range o.committedTxns {
// NOTE(review): entries at or below maxReadTs are presumably the ones dropped
// (skipped before the append) — the skipping statement is not visible; confirm.
if txn.ts <= maxReadTs {
tmp = append(tmp, txn)
o.committedTxns = tmp
// doneCommit is told that the commit with timestamp cts has finished. In
// managed mode there is nothing to update.
// NOTE(review): the handling for non-managed mode is not visible here;
// presumably it marks cts as done on txnMark — confirm.
func (o *oracle) doneCommit(cts uint64) {
if o.isManaged {
// No need to update anything.
// Txn represents a Badger transaction.
type Txn struct {
// readTs is the timestamp at which the transaction reads; commitTs is the
// commit timestamp used by the oracle in managed mode.
readTs uint64
commitTs uint64
// size and count accumulate the estimated size and number of entries added
// so far; checkSize compares them against the batch limits.
size int64
count int64
db *DB
reads []uint64 // contains fingerprints of keys read.
// contains fingerprints of keys written. This is used for conflict detection.
conflictKeys map[uint64]struct{}
readsLock sync.Mutex // guards the reads slice. See addReadKey.
pendingWrites map[string]*Entry // cache stores any writes done by txn.
duplicateWrites []*Entry // Used in managed mode to store duplicate entries.
// numIterators is read atomically in Discard, which panics if it is above zero.
numIterators int32
// discarded is set by Discard; a discarded transaction rejects further operations.
discarded bool
// doneRead is set once by oracle.doneRead.
doneRead bool
update bool // update is used to conditionally keep track of reads.
// pendingWritesIterator iterates over a transaction's pending writes, which
// newPendingWritesIterator sorts by key.
type pendingWritesIterator struct {
// entries are sorted by key, ascending or descending according to reversed.
entries []*Entry
// nextIdx is the index of the current entry; the iterator is valid while it
// is less than len(entries).
nextIdx int
// readTs is the transaction's read timestamp, reported as the version of
// every entry.
readTs uint64
reversed bool
// Next moves the iterator to the following entry.
// NOTE(review): the body is not visible here; presumably it increments
// pi.nextIdx — confirm.
func (pi *pendingWritesIterator) Next() {
// Rewind resets the iterator to the first entry.
func (pi *pendingWritesIterator) Rewind() {
pi.nextIdx = 0
// Seek positions the iterator with a binary search over the sorted entries:
// at the first entry whose key is >= key when iterating forward, or <= key
// when reversed. If no entry qualifies, the iterator becomes invalid.
func (pi *pendingWritesIterator) Seek(key []byte) {
// NOTE(review): y.ParseKey presumably strips the timestamp suffix so the
// comparison is against the bare user key — confirm.
key = y.ParseKey(key)
pi.nextIdx = sort.Search(len(pi.entries), func(idx int) bool {
cmp := bytes.Compare(pi.entries[idx].Key, key)
if !pi.reversed {
return cmp >= 0
return cmp <= 0
// Key returns the current entry's key combined with the iterator's readTs via
// y.KeyWithTs. It indexes entries directly, so call it only while Valid().
func (pi *pendingWritesIterator) Key() []byte {
entry := pi.entries[pi.nextIdx]
return y.KeyWithTs(entry.Key, pi.readTs)
// Value returns the current entry as a y.ValueStruct, copying its value,
// meta, user meta and expiry, and reporting the iterator's readTs as the
// version. It indexes entries directly, so call it only while Valid().
func (pi *pendingWritesIterator) Value() y.ValueStruct {
entry := pi.entries[pi.nextIdx]
return y.ValueStruct{
Value: entry.Value,
Meta: entry.meta,
UserMeta: entry.UserMeta,
ExpiresAt: entry.ExpiresAt,
Version: pi.readTs,
// Valid reports whether the iterator currently points at an entry.
func (pi *pendingWritesIterator) Valid() bool {
return pi.nextIdx < len(pi.entries)
// Close is a no-op and always returns nil.
func (pi *pendingWritesIterator) Close() error {
return nil
// newPendingWritesIterator returns an iterator over the transaction's pending
// writes, sorted by key in ascending order, or descending when reversed is
// true. It returns nil for read-only transactions and when there are no
// pending writes.
func (txn *Txn) newPendingWritesIterator(reversed bool) *pendingWritesIterator {
if !txn.update || len(txn.pendingWrites) == 0 {
return nil
// Copy the map values into a slice so they can be sorted.
entries := make([]*Entry, 0, len(txn.pendingWrites))
for _, e := range txn.pendingWrites {
entries = append(entries, e)
// Number of pending writes per transaction shouldn't be too big in general.
sort.Slice(entries, func(i, j int) bool {
cmp := bytes.Compare(entries[i].Key, entries[j].Key)
if !reversed {
return cmp < 0
return cmp > 0
return &pendingWritesIterator{
readTs: txn.readTs,
entries: entries,
reversed: reversed,
// checkSize accounts for adding e to the transaction. It returns ErrTxnTooBig,
// leaving the counters unchanged, if the new entry count or estimated size
// would reach maxBatchCount or maxBatchSize. Otherwise it stores the new
// totals and returns nil.
func (txn *Txn) checkSize(e *Entry) error {
count := txn.count + 1
// Extra bytes for the version in key.
size := txn.size + int64(e.estimateSize(txn.db.opt.ValueThreshold)) + 10
if count >= txn.db.opt.maxBatchCount || size >= txn.db.opt.maxBatchSize {
return ErrTxnTooBig
txn.count, txn.size = count, size
return nil
func exceedsSize(prefix string, max int64, key []byte) error {
return errors.Errorf("%s with size %d exceeded %d limit. %s:\n%s",
prefix, len(key), max, prefix, hex.Dump(key[:1<<10]))
// modify validates e and records it as a pending write of the transaction.
// It returns ErrReadOnlyTxn, ErrDiscardedTxn, ErrEmptyKey or ErrInvalidKey for
// the corresponding invalid states, an exceedsSize error for oversized keys
// or values, and ErrTxnTooBig (via checkSize) when the transaction would grow
// past the batch limits. On success the entry is stored in pendingWrites
// under its key.
func (txn *Txn) modify(e *Entry) error {
const maxKeySize = 65000
switch {
case !txn.update:
return ErrReadOnlyTxn
case txn.discarded:
return ErrDiscardedTxn
case len(e.Key) == 0:
return ErrEmptyKey
case bytes.HasPrefix(e.Key, badgerPrefix):
return ErrInvalidKey
case len(e.Key) > maxKeySize:
// Key length can't be more than uint16, as determined by table::header. To
// keep things safe and allow badger move prefix and a timestamp suffix, let's
// cut it down to 65000, instead of using 65536.
return exceedsSize("Key", maxKeySize, e.Key)
case int64(len(e.Value)) > txn.db.opt.ValueLogFileSize:
return exceedsSize("Value", txn.db.opt.ValueLogFileSize, e.Value)
// With InMemory set, values larger than ValueThreshold are rejected.
case txn.db.opt.InMemory && len(e.Value) > txn.db.opt.ValueThreshold:
return exceedsSize("Value", int64(txn.db.opt.ValueThreshold), e.Value)
if err := txn.checkSize(e); err != nil {
return err
// The txn.conflictKeys is used for conflict detection. If conflict detection
// is disabled, we don't need to store key hashes in this map.
if txn.db.opt.DetectConflicts {
fp := z.MemHash(e.Key) // Avoid dealing with byte arrays.
txn.conflictKeys[fp] = struct{}{}
// If a duplicate entry was inserted in managed mode, move it to the duplicate writes slice.
// Add the entry to duplicateWrites only if both the entries have different versions. For
// same versions, we will overwrite the existing entry.
if oldEntry, ok := txn.pendingWrites[string(e.Key)]; ok && oldEntry.version != e.version {
txn.duplicateWrites = append(txn.duplicateWrites, oldEntry)
txn.pendingWrites[string(e.Key)] = e
return nil
// Set adds a key-value pair to the database. It is shorthand for calling
// SetEntry with NewEntry(key, val).
// It will return ErrReadOnlyTxn if update flag was set to false when creating the transaction.
// The current transaction keeps a reference to the key and val byte slice
// arguments. Users must not modify key and val until the end of the transaction.
func (txn *Txn) Set(key, val []byte) error {
return txn.SetEntry(NewEntry(key, val))
// SetEntry takes an Entry struct and adds the key-value pair in the struct,
// along with other metadata to the database. It returns whatever error the
// internal modify step reports (for example ErrReadOnlyTxn or ErrTxnTooBig).
// The current transaction keeps a reference to the entry passed in argument.
// Users must not modify the entry until the end of the transaction.
func (txn *Txn) SetEntry(e *Entry) error {
return txn.modify(e)
// Delete deletes a key.
// This is done by adding a delete marker for the key at commit timestamp. Any
// reads happening before this timestamp would be unaffected. Any reads after
// this commit would see the deletion.
// The current transaction keeps a reference to the key byte slice argument.
// Users must not modify the key until the end of the transaction.
func (txn *Txn) Delete(key []byte) error {
// The delete marker is an entry with no value whose meta is bitDelete.
e := &Entry{
Key: key,
meta: bitDelete,
return txn.modify(e)
// Get looks for key and returns corresponding Item.
// If key is not found, ErrKeyNotFound is returned. ErrEmptyKey is returned for
// an empty key and ErrDiscardedTxn if the transaction was discarded.
// In an update transaction the pending writes are consulted first; otherwise
// the key is looked up in the DB at the transaction's read timestamp.
func (txn *Txn) Get(key []byte) (item *Item, rerr error) {
if len(key) == 0 {
return nil, ErrEmptyKey
} else if txn.discarded {
return nil, ErrDiscardedTxn
item = new(Item)
if txn.update {
if e, has := txn.pendingWrites[string(key)]; has && bytes.Equal(key, e.Key) {
if isDeletedOrExpired(e.meta, e.ExpiresAt) {
return nil, ErrKeyNotFound
// Fulfill from cache.
item.meta = e.meta
item.val = e.Value
item.userMeta = e.UserMeta
item.key = key
item.status = prefetched
item.version = txn.readTs
item.expiresAt = e.ExpiresAt
// We probably don't need to set db on item here.
return item, nil
// Only track reads if this is update txn. No need to track read if txn serviced it
// internally.
// NOTE(review): no call recording the read (see addReadKey) is visible at this
// point — confirm the key is tracked here.
seek := y.KeyWithTs(key, txn.readTs)
vs, err := txn.db.get(seek)
if err != nil {
return nil, y.Wrapf(err, "DB::Get key: %q", key)
// A result with neither value nor meta means nothing was found for the key.
if vs.Value == nil && vs.Meta == 0 {
return nil, ErrKeyNotFound
if isDeletedOrExpired(vs.Meta, vs.ExpiresAt) {
return nil, ErrKeyNotFound
item.key = key
item.version = vs.Version
item.meta = vs.Meta
item.userMeta = vs.UserMeta
item.vptr = y.SafeCopy(item.vptr, vs.Value)
item.txn = txn
item.expiresAt = vs.ExpiresAt
return item, nil
// addReadKey records the fingerprint (z.MemHash) of key in txn.reads so that
// the read can take part in conflict detection. It does nothing for read-only
// transactions.
func (txn *Txn) addReadKey(key []byte) {
if txn.update {
fp := z.MemHash(key)
// Because of the possibility of multiple iterators it is now possible
// for multiple threads within a read-write transaction to read keys at
// the same time. The reads slice is not currently thread-safe and
// needs to be locked whenever we mark a key as read.
// NOTE(review): no txn.readsLock acquisition is visible around the append
// below — confirm it is held here.
txn.reads = append(txn.reads, fp)
// Discard discards a created transaction. This method is very important and must be called. Commit
// method calls this internally, however, calling this multiple times doesn't cause any issues. So,
// this can safely be called via a defer right when transaction is created.
// Discard panics if an iterator created from the transaction is still open.
// NOTE: If any operations are run on a discarded transaction, ErrDiscardedTxn is returned.
func (txn *Txn) Discard() {
if txn.discarded { // Avoid a re-run.
if atomic.LoadInt32(&txn.numIterators) > 0 {
panic("Unclosed iterator at time of Txn.Discard.")
txn.discarded = true
// NOTE(review): the body of this branch is not visible; presumably the oracle
// is told that this transaction is done reading — confirm.
if !txn.db.orc.isManaged {
// commitAndSend obtains a commit timestamp from the oracle, stamps the
// transaction's entries with it, and pushes them to the DB's write channel.
// It returns ErrConflict when the oracle returns a zero timestamp in
// non-managed mode, or the error from sendToWriteCh. On success it returns a
// function that waits for the write request to complete and returns its error.
func (txn *Txn) commitAndSend() (func() error, error) {
orc := txn.db.orc
// Ensure that the order in which we get the commit timestamp is the same as
// the order in which we push these updates to the write channel. So, we
// acquire a writeChLock before getting a commit timestamp, and only release
// it after pushing the entries to it.
// NOTE(review): the acquisition of writeChLock is not visible before this
// deferred Unlock — confirm.
defer orc.writeChLock.Unlock()
commitTs := orc.newCommitTs(txn)
// The commitTs can be zero if the transaction is running in managed mode.
// Individual entries might have their own timestamps.
if commitTs == 0 && !txn.db.opt.managedTxns {
return nil, ErrConflict
// keepTogether stays true only while no entry carries its own version; it
// decides below whether transaction markers are added.
keepTogether := true
// setVersion gives an entry the commit timestamp unless it already has a
// version of its own.
setVersion := func(e *Entry) {
if e.version == 0 {
e.version = commitTs
} else {
keepTogether = false
// NOTE(review): the bodies of the two loops below are not visible; presumably
// they apply setVersion to each entry — confirm.
for _, e := range txn.pendingWrites {
// The duplicateWrites slice will be non-empty only if there are duplicate
// entries with different versions.
for _, e := range txn.duplicateWrites {
// The +1 leaves room for the transaction-end marker entry appended below.
entries := make([]*Entry, 0, len(txn.pendingWrites)+len(txn.duplicateWrites)+1)
processEntry := func(e *Entry) {
// Suffix the keys with commit ts, so the key versions are sorted in
// descending order of commit timestamp.
e.Key = y.KeyWithTs(e.Key, e.version)
// Add bitTxn only if these entries are part of a transaction. We
// support SetEntryAt(..) in managed mode which means a single
// transaction can have entries with different timestamps. If entries
// in a single transaction have different timestamps, we don't add the
// transaction markers.
if keepTogether {
e.meta |= bitTxn
entries = append(entries, e)
// The following debug information is what led to determining the cause of
// bank txn violation bug, and it took a whole bunch of effort to narrow it
// down to here. So, keep this around for at least a couple of months.
// var b strings.Builder
// fmt.Fprintf(&b, "Read: %d. Commit: %d. reads: %v. writes: %v. Keys: ",
// txn.readTs, commitTs, txn.reads, txn.conflictKeys)
// NOTE(review): the bodies of the two loops below are not visible; presumably
// they apply processEntry to each entry — confirm.
for _, e := range txn.pendingWrites {
for _, e := range txn.duplicateWrites {
if keepTogether {
// CommitTs should not be zero if we're inserting transaction markers.
y.AssertTrue(commitTs != 0)
// Transaction-end marker: keyed by txnKey at commitTs, with the commit
// timestamp in decimal as its value.
e := &Entry{
Key: y.KeyWithTs(txnKey, commitTs),
Value: []byte(strconv.FormatUint(commitTs, 10)),
meta: bitFinTxn,
entries = append(entries, e)
req, err := txn.db.sendToWriteCh(entries)
if err != nil {
return nil, err
ret := func() error {
err := req.Wait()
// Wait before marking commitTs as done.
// We can't defer doneCommit above, because it is being called from a
// callback here.
// NOTE(review): the doneCommit call this comment refers to is not visible — confirm.
return err
return ret, nil
// commitPrecheck validates that the transaction can be committed. It returns
// an error if the transaction was already discarded, or if it runs in managed
// mode with a zero commitTs while no pending write carries its own version.
func (txn *Txn) commitPrecheck() error {
if txn.discarded {
return errors.New("Trying to commit a discarded txn")
keepTogether := true
for _, e := range txn.pendingWrites {
if e.version != 0 {
keepTogether = false
// If keepTogether is True, it implies transaction markers will be added.
// In that case, commitTs should never be zero. This might happen if
// someone uses txn.Commit instead of txn.CommitAt in managed mode. This
// should happen only in managed mode. In normal mode, keepTogether will
// always be true.
if keepTogether && txn.db.opt.managedTxns && txn.commitTs == 0 {
return errors.New("CommitTs cannot be zero. Please use commitAt instead")
return nil
// Commit commits the transaction, following these steps:
// 1. If there are no writes, return immediately.
// 2. Check if read rows were updated since txn started. If so, return ErrConflict.
// 3. If no conflict, generate a commit timestamp and update written rows' commit ts.
// 4. Batch up all writes, write them to value log and LSM tree.
// 5. If callback is provided, Badger will return immediately after checking
// for conflicts. Writes to the database will happen in the background. If
// there is a conflict, an error will be returned and the callback will not
// run. If there are no conflicts, the callback will be called in the
// background upon successful completion of writes or any error during write.
// If error is nil, the transaction is successfully committed. In case of a non-nil error, the LSM
// tree won't be updated, so there's no need for any rollback.
func (txn *Txn) Commit() error {
// txn.conflictKeys can be zero if conflict detection is turned off. So we
// should check txn.pendingWrites.
if len(txn.pendingWrites) == 0 {
return nil // Nothing to do.
// Precheck before discarding txn.
if err := txn.commitPrecheck(); err != nil {
return err
// From here on the transaction is discarded when Commit returns, whether or
// not the commit succeeds.
defer txn.Discard()
txnCb, err := txn.commitAndSend()
if err != nil {
return err
// If batchSet failed, LSM would not have been updated. So, no need to rollback anything.
// TODO: What if some of the txns successfully make it to value log, but others fail.
// Nothing gets updated to LSM, until a restart happens.
// Block until the write request has completed and return its result.
return txnCb()
// txnCb bundles what runTxnCallback needs to finish a CommitWith call.
type txnCb struct {
// commit, when set, is the wait function returned by commitAndSend.
commit func() error
// user is the callback supplied to CommitWith; it must not be nil.
user func(error)
// err, when set, is an error that was already determined before the write.
err error
// runTxnCallback completes a CommitWith call. It panics if cb or cb.user is
// nil. Otherwise it handles, in order of precedence, an already known error
// (cb.err) or a pending commit (cb.commit), which it runs to obtain the result.
// NOTE(review): the statements that invoke cb.user with the resulting error
// are not visible in the cases below — confirm.
func runTxnCallback(cb *txnCb) {
switch {
case cb == nil:
panic("txn callback is nil")
case cb.user == nil:
panic("Must have caught a nil callback for txn.CommitWith")
case cb.err != nil:
case cb.commit != nil:
err := cb.commit()
// CommitWith acts like Commit, but takes a callback, which gets run via a
// goroutine to avoid blocking this function. The callback is guaranteed to run,
// so it is safe to increment sync.WaitGroup before calling CommitWith, and
// decrementing it in the callback; to block until all callbacks are run.
// CommitWith panics if cb is nil.
func (txn *Txn) CommitWith(cb func(error)) {
if cb == nil {
panic("Nil callback provided to CommitWith")
if len(txn.pendingWrites) == 0 {
// Do not run these callbacks from here, because the CommitWith and the
// callback might be acquiring the same locks. Instead run the callback
// from another goroutine.
go runTxnCallback(&txnCb{user: cb, err: nil})
// Precheck before discarding txn.
// NOTE(review): how a precheck error reaches cb is not visible in this
// branch — confirm the callback is invoked with err before returning.
if err := txn.commitPrecheck(); err != nil {
defer txn.Discard()
commitCb, err := txn.commitAndSend()
if err != nil {
// Report the failure (e.g. a conflict) through the callback, asynchronously.
go runTxnCallback(&txnCb{user: cb, err: err})
// Wait for the write and invoke the callback in the background.
go runTxnCallback(&txnCb{user: cb, commit: commitCb})
// ReadTs returns the read timestamp of the transaction.
func (txn *Txn) ReadTs() uint64 {
return txn.readTs
// NewTransaction creates a new transaction. Badger supports concurrent execution of transactions,
// providing serializable snapshot isolation, avoiding write skews. Badger achieves this by tracking
// the keys read and at Commit time, ensuring that these read keys weren't concurrently modified by
// another transaction.
// For read-only transactions, set update to false. In this mode, we don't track the rows read for
// any changes. Thus, any long running iterations done in this mode wouldn't pay this overhead.
// Running transactions concurrently is OK. However, a transaction itself isn't thread safe, and
// should only be run serially. It doesn't matter if a transaction is created by one goroutine and
// passed down to other, as long as the Txn APIs are called serially.
// When you create a new transaction, it is absolutely essential to call
// Discard(). This should be done irrespective of what the update param is set
// to. Commit API internally runs Discard, but running it twice wouldn't cause
// any issues.
// Example:
// txn := db.NewTransaction(false)
// defer txn.Discard()
// // Call various APIs.
func (db *DB) NewTransaction(update bool) *Txn {
// Non-managed: the read timestamp is assigned by the oracle in newTransaction.
return db.newTransaction(update, false)
// newTransaction creates a Txn bound to db. If the DB was opened read-only,
// update is forced to false. Update transactions get a pendingWrites map and,
// when conflict detection is enabled, a conflictKeys map. When isManaged is
// false the read timestamp is obtained from the oracle; otherwise readTs is
// left at zero for the caller to set.
func (db *DB) newTransaction(update, isManaged bool) *Txn {
if db.opt.ReadOnly && update {
// DB is read-only, force read-only transaction.
update = false
txn := &Txn{
update: update,
db: db,
count: 1, // One extra entry for BitFin.
size: int64(len(txnKey) + 10), // Some buffer for the extra entry.
if update {
if db.opt.DetectConflicts {
txn.conflictKeys = make(map[uint64]struct{})
txn.pendingWrites = make(map[string]*Entry)
if !isManaged {
txn.readTs = db.orc.readTs()
return txn
// View executes a function creating and managing a read-only transaction for the user. Error
// returned by the function is relayed by the View method.
// If View is used with managed transactions, it would assume a read timestamp of MaxUint64.
// View returns ErrDBClosed if the DB has been closed. The transaction is
// discarded when fn returns.
func (db *DB) View(fn func(txn *Txn) error) error {
if db.IsClosed() {
return ErrDBClosed
var txn *Txn
if db.opt.managedTxns {
txn = db.NewTransactionAt(math.MaxUint64, false)
} else {
txn = db.NewTransaction(false)
defer txn.Discard()
return fn(txn)
// Update executes a function, creating and managing a read-write transaction
// for the user. Error returned by the function is relayed by the Update method.
// Update cannot be used with managed transactions; it panics if the DB was
// opened with managed transactions. It returns ErrDBClosed if the DB has been
// closed. If fn succeeds, the transaction is committed and the result of
// Commit is returned; if fn fails, the transaction is discarded without
// committing.
func (db *DB) Update(fn func(txn *Txn) error) error {
if db.IsClosed() {
return ErrDBClosed
if db.opt.managedTxns {
panic("Update can only be used with managedDB=false.")
txn := db.NewTransaction(true)
defer txn.Discard()
if err := fn(txn); err != nil {
return err
return txn.Commit()