| /* |
| * Copyright 2017 Dgraph Labs, Inc. and Contributors |
| * |
| * Licensed under the Apache License, Version 2.0 (the "License"); |
| * you may not use this file except in compliance with the License. |
| * You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| package badger |
| |
| import ( |
| "bytes" |
| "context" |
| "encoding/hex" |
| "math" |
| "sort" |
| "strconv" |
| "sync" |
| "sync/atomic" |
| |
| "github.com/dgraph-io/badger/v3/y" |
| "github.com/dgraph-io/ristretto/z" |
| "github.com/pkg/errors" |
| ) |
| |
// oracle is the central timestamp allocator and conflict detector. It hands
// out read and commit timestamps, tracks recently committed writes so that
// snapshot-isolation conflicts can be detected at commit time, and maintains
// the watermarks that gate reads and compaction.
type oracle struct {
	isManaged       bool // Does not change value, so no locking required.
	detectConflicts bool // Determines if the txns should be checked for conflicts.

	sync.Mutex // For nextTxnTs and commits.
	// writeChLock lock is for ensuring that transactions go to the write
	// channel in the same order as their commit timestamps.
	writeChLock sync.Mutex
	nextTxnTs   uint64

	// Used to block NewTransaction, so all previous commits are visible to a new read.
	txnMark *y.WaterMark

	// Either of these is used to determine which versions can be permanently
	// discarded during compaction.
	discardTs uint64       // Used by ManagedDB.
	readMark  *y.WaterMark // Used by DB.

	// committedTxns contains all committed writes (contains fingerprints
	// of keys written and their latest commit counter).
	committedTxns []committedTxn
	lastCleanupTs uint64

	// closer is used to stop watermarks.
	closer *z.Closer
}
| |
// committedTxn records one committed transaction so that later transactions
// with an older readTs can be checked against it for read-write conflicts.
type committedTxn struct {
	ts uint64
	// ConflictKeys Keeps track of the entries written at timestamp ts.
	conflictKeys map[uint64]struct{}
}
| |
| func newOracle(opt Options) *oracle { |
| orc := &oracle{ |
| isManaged: opt.managedTxns, |
| detectConflicts: opt.DetectConflicts, |
| // We're not initializing nextTxnTs and readOnlyTs. It would be done after replay in Open. |
| // |
| // WaterMarks must be 64-bit aligned for atomic package, hence we must use pointers here. |
| // See https://golang.org/pkg/sync/atomic/#pkg-note-BUG. |
| readMark: &y.WaterMark{Name: "badger.PendingReads"}, |
| txnMark: &y.WaterMark{Name: "badger.TxnTimestamp"}, |
| closer: z.NewCloser(2), |
| } |
| orc.readMark.Init(orc.closer) |
| orc.txnMark.Init(orc.closer) |
| return orc |
| } |
| |
// Stop signals both watermark processing goroutines to exit and waits for
// them to finish.
func (o *oracle) Stop() {
	o.closer.SignalAndWait()
}
| |
| func (o *oracle) readTs() uint64 { |
| if o.isManaged { |
| panic("ReadTs should not be retrieved for managed DB") |
| } |
| |
| var readTs uint64 |
| o.Lock() |
| readTs = o.nextTxnTs - 1 |
| o.readMark.Begin(readTs) |
| o.Unlock() |
| |
| // Wait for all txns which have no conflicts, have been assigned a commit |
| // timestamp and are going through the write to value log and LSM tree |
| // process. Not waiting here could mean that some txns which have been |
| // committed would not be read. |
| y.Check(o.txnMark.WaitForMark(context.Background(), readTs)) |
| return readTs |
| } |
| |
| func (o *oracle) nextTs() uint64 { |
| o.Lock() |
| defer o.Unlock() |
| return o.nextTxnTs |
| } |
| |
| func (o *oracle) incrementNextTs() { |
| o.Lock() |
| defer o.Unlock() |
| o.nextTxnTs++ |
| } |
| |
| // Any deleted or invalid versions at or below ts would be discarded during |
| // compaction to reclaim disk space in LSM tree and thence value log. |
| func (o *oracle) setDiscardTs(ts uint64) { |
| o.Lock() |
| defer o.Unlock() |
| o.discardTs = ts |
| o.cleanupCommittedTransactions() |
| } |
| |
| func (o *oracle) discardAtOrBelow() uint64 { |
| if o.isManaged { |
| o.Lock() |
| defer o.Unlock() |
| return o.discardTs |
| } |
| return o.readMark.DoneUntil() |
| } |
| |
| // hasConflict must be called while having a lock. |
| func (o *oracle) hasConflict(txn *Txn) bool { |
| if len(txn.reads) == 0 { |
| return false |
| } |
| for _, committedTxn := range o.committedTxns { |
| // If the committedTxn.ts is less than txn.readTs that implies that the |
| // committedTxn finished before the current transaction started. |
| // We don't need to check for conflict in that case. |
| // This change assumes linearizability. Lack of linearizability could |
| // cause the read ts of a new txn to be lower than the commit ts of |
| // a txn before it (@mrjn). |
| if committedTxn.ts <= txn.readTs { |
| continue |
| } |
| |
| for _, ro := range txn.reads { |
| if _, has := committedTxn.conflictKeys[ro]; has { |
| return true |
| } |
| } |
| } |
| |
| return false |
| } |
| |
// newCommitTs assigns a commit timestamp to txn, returning 0 if the txn
// conflicts with a concurrently committed transaction. In non-managed mode it
// allocates the next timestamp, marks the txn's read as done, and begins the
// txn watermark entry for the new commit ts; in managed mode the caller's
// commitTs is used as-is. Records the txn's write set for future conflict
// checks when conflict detection is enabled.
func (o *oracle) newCommitTs(txn *Txn) uint64 {
	o.Lock()
	defer o.Unlock()

	if o.hasConflict(txn) {
		return 0
	}

	var ts uint64
	if !o.isManaged {
		o.doneRead(txn)
		o.cleanupCommittedTransactions()

		// This is the general case, when user doesn't specify the read and commit ts.
		ts = o.nextTxnTs
		o.nextTxnTs++
		o.txnMark.Begin(ts)

	} else {
		// If commitTs is set, use it instead.
		ts = txn.commitTs
	}

	// A commit ts older than lastCleanupTs would have been pruned from
	// committedTxns already, breaking conflict detection.
	y.AssertTrue(ts >= o.lastCleanupTs)

	if o.detectConflicts {
		// We should ensure that txns are not added to o.committedTxns slice when
		// conflict detection is disabled otherwise this slice would keep growing.
		o.committedTxns = append(o.committedTxns, committedTxn{
			ts:           ts,
			conflictKeys: txn.conflictKeys,
		})
	}

	return ts
}
| |
| func (o *oracle) doneRead(txn *Txn) { |
| if !txn.doneRead { |
| txn.doneRead = true |
| o.readMark.Done(txn.readTs) |
| } |
| } |
| |
| func (o *oracle) cleanupCommittedTransactions() { // Must be called under o.Lock |
| if !o.detectConflicts { |
| // When detectConflicts is set to false, we do not store any |
| // committedTxns and so there's nothing to clean up. |
| return |
| } |
| // Same logic as discardAtOrBelow but unlocked |
| var maxReadTs uint64 |
| if o.isManaged { |
| maxReadTs = o.discardTs |
| } else { |
| maxReadTs = o.readMark.DoneUntil() |
| } |
| |
| y.AssertTrue(maxReadTs >= o.lastCleanupTs) |
| |
| // do not run clean up if the maxReadTs (read timestamp of the |
| // oldest transaction that is still in flight) has not increased |
| if maxReadTs == o.lastCleanupTs { |
| return |
| } |
| o.lastCleanupTs = maxReadTs |
| |
| tmp := o.committedTxns[:0] |
| for _, txn := range o.committedTxns { |
| if txn.ts <= maxReadTs { |
| continue |
| } |
| tmp = append(tmp, txn) |
| } |
| o.committedTxns = tmp |
| } |
| |
| func (o *oracle) doneCommit(cts uint64) { |
| if o.isManaged { |
| // No need to update anything. |
| return |
| } |
| o.txnMark.Done(cts) |
| } |
| |
// Txn represents a Badger transaction. A Txn is not safe for concurrent use;
// its methods must be called serially.
type Txn struct {
	readTs   uint64 // Snapshot timestamp this txn reads at.
	commitTs uint64 // Commit timestamp; set by the user in managed mode.
	size     int64  // Running size estimate of pending writes.
	count    int64  // Running count of pending writes.
	db       *DB

	reads []uint64 // contains fingerprints of keys read.
	// contains fingerprints of keys written. This is used for conflict detection.
	conflictKeys map[uint64]struct{}
	readsLock    sync.Mutex // guards the reads slice. See addReadKey.

	pendingWrites   map[string]*Entry // cache stores any writes done by txn.
	duplicateWrites []*Entry          // Used in managed mode to store duplicate entries.

	numIterators int32 // Number of open iterators; accessed atomically.
	discarded    bool  // Set once Discard has run; further ops fail.
	doneRead     bool  // Whether readTs has been marked done on the read mark.
	update       bool  // update is used to conditionally keep track of reads.
}
| |
// pendingWritesIterator iterates over a sorted snapshot of a transaction's
// own uncommitted writes, so they can be merged with the LSM iterators.
type pendingWritesIterator struct {
	entries  []*Entry // Pending entries, sorted by key (descending if reversed).
	nextIdx  int      // Index of the current entry.
	readTs   uint64   // Version stamped onto returned keys/values.
	reversed bool     // Iteration direction.
}
| |
// Next advances the iterator to the following entry.
func (pi *pendingWritesIterator) Next() {
	pi.nextIdx++
}
| |
// Rewind resets the iterator to the first entry.
func (pi *pendingWritesIterator) Rewind() {
	pi.nextIdx = 0
}
| |
| func (pi *pendingWritesIterator) Seek(key []byte) { |
| key = y.ParseKey(key) |
| pi.nextIdx = sort.Search(len(pi.entries), func(idx int) bool { |
| cmp := bytes.Compare(pi.entries[idx].Key, key) |
| if !pi.reversed { |
| return cmp >= 0 |
| } |
| return cmp <= 0 |
| }) |
| } |
| |
// Key returns the current entry's key suffixed with the txn's read timestamp,
// matching the versioned key format used by the LSM iterators.
func (pi *pendingWritesIterator) Key() []byte {
	y.AssertTrue(pi.Valid())
	entry := pi.entries[pi.nextIdx]
	return y.KeyWithTs(entry.Key, pi.readTs)
}
| |
// Value returns the current entry as a ValueStruct, versioned at the txn's
// read timestamp.
func (pi *pendingWritesIterator) Value() y.ValueStruct {
	y.AssertTrue(pi.Valid())
	entry := pi.entries[pi.nextIdx]
	return y.ValueStruct{
		Value:     entry.Value,
		Meta:      entry.meta,
		UserMeta:  entry.UserMeta,
		ExpiresAt: entry.ExpiresAt,
		Version:   pi.readTs,
	}
}
| |
// Valid reports whether the iterator is positioned on an entry.
func (pi *pendingWritesIterator) Valid() bool {
	return pi.nextIdx < len(pi.entries)
}
| |
// Close is a no-op; pending writes hold no resources to release.
func (pi *pendingWritesIterator) Close() error {
	return nil
}
| |
| func (txn *Txn) newPendingWritesIterator(reversed bool) *pendingWritesIterator { |
| if !txn.update || len(txn.pendingWrites) == 0 { |
| return nil |
| } |
| entries := make([]*Entry, 0, len(txn.pendingWrites)) |
| for _, e := range txn.pendingWrites { |
| entries = append(entries, e) |
| } |
| // Number of pending writes per transaction shouldn't be too big in general. |
| sort.Slice(entries, func(i, j int) bool { |
| cmp := bytes.Compare(entries[i].Key, entries[j].Key) |
| if !reversed { |
| return cmp < 0 |
| } |
| return cmp > 0 |
| }) |
| return &pendingWritesIterator{ |
| readTs: txn.readTs, |
| entries: entries, |
| reversed: reversed, |
| } |
| } |
| |
| func (txn *Txn) checkSize(e *Entry) error { |
| count := txn.count + 1 |
| // Extra bytes for the version in key. |
| size := txn.size + int64(e.estimateSize(txn.db.opt.ValueThreshold)) + 10 |
| if count >= txn.db.opt.maxBatchCount || size >= txn.db.opt.maxBatchSize { |
| return ErrTxnTooBig |
| } |
| txn.count, txn.size = count, size |
| return nil |
| } |
| |
| func exceedsSize(prefix string, max int64, key []byte) error { |
| return errors.Errorf("%s with size %d exceeded %d limit. %s:\n%s", |
| prefix, len(key), max, prefix, hex.Dump(key[:1<<10])) |
| } |
| |
// modify validates e and stages it in the transaction's pending writes.
// It rejects writes on read-only or discarded txns, empty or reserved keys,
// and keys/values exceeding configured size limits; it records the key's
// fingerprint for conflict detection when that is enabled, and preserves
// prior same-key entries with a different version (managed mode) in
// duplicateWrites.
func (txn *Txn) modify(e *Entry) error {
	const maxKeySize = 65000

	switch {
	case !txn.update:
		return ErrReadOnlyTxn
	case txn.discarded:
		return ErrDiscardedTxn
	case len(e.Key) == 0:
		return ErrEmptyKey
	case bytes.HasPrefix(e.Key, badgerPrefix):
		// Keys with the internal badger prefix are reserved.
		return ErrInvalidKey
	case len(e.Key) > maxKeySize:
		// Key length can't be more than uint16, as determined by table::header.  To
		// keep things safe and allow badger move prefix and a timestamp suffix, let's
		// cut it down to 65000, instead of using 65536.
		return exceedsSize("Key", maxKeySize, e.Key)
	case int64(len(e.Value)) > txn.db.opt.ValueLogFileSize:
		return exceedsSize("Value", txn.db.opt.ValueLogFileSize, e.Value)
	case txn.db.opt.InMemory && len(e.Value) > txn.db.opt.ValueThreshold:
		return exceedsSize("Value", int64(txn.db.opt.ValueThreshold), e.Value)
	}

	if err := txn.checkSize(e); err != nil {
		return err
	}

	// The txn.conflictKeys is used for conflict detection. If conflict detection
	// is disabled, we don't need to store key hashes in this map.
	if txn.db.opt.DetectConflicts {
		fp := z.MemHash(e.Key) // Avoid dealing with byte arrays.
		txn.conflictKeys[fp] = struct{}{}
	}
	// If a duplicate entry was inserted in managed mode, move it to the duplicate writes slice.
	// Add the entry to duplicateWrites only if both the entries have different versions. For
	// same versions, we will overwrite the existing entry.
	if oldEntry, ok := txn.pendingWrites[string(e.Key)]; ok && oldEntry.version != e.version {
		txn.duplicateWrites = append(txn.duplicateWrites, oldEntry)
	}
	txn.pendingWrites[string(e.Key)] = e
	return nil
}
| |
// Set adds a key-value pair to the database.
// It will return ErrReadOnlyTxn if update flag was set to false when creating the transaction.
//
// The current transaction keeps a reference to the key and val byte slice
// arguments. Users must not modify key and val until the end of the transaction.
// This is shorthand for SetEntry(NewEntry(key, val)).
func (txn *Txn) Set(key, val []byte) error {
	return txn.SetEntry(NewEntry(key, val))
}
| |
// SetEntry takes an Entry struct and adds the key-value pair in the struct,
// along with other metadata to the database.
//
// The current transaction keeps a reference to the entry passed in argument.
// Users must not modify the entry until the end of the transaction.
func (txn *Txn) SetEntry(e *Entry) error {
	return txn.modify(e)
}
| |
| // Delete deletes a key. |
| // |
| // This is done by adding a delete marker for the key at commit timestamp. Any |
| // reads happening before this timestamp would be unaffected. Any reads after |
| // this commit would see the deletion. |
| // |
| // The current transaction keeps a reference to the key byte slice argument. |
| // Users must not modify the key until the end of the transaction. |
| func (txn *Txn) Delete(key []byte) error { |
| e := &Entry{ |
| Key: key, |
| meta: bitDelete, |
| } |
| return txn.modify(e) |
| } |
| |
// Get looks for key and returns corresponding Item.
// If key is not found, ErrKeyNotFound is returned.
//
// For update transactions, the txn's own pending writes are consulted first;
// otherwise the read is served from the DB at the txn's snapshot timestamp,
// and the key is tracked for conflict detection.
func (txn *Txn) Get(key []byte) (item *Item, rerr error) {
	if len(key) == 0 {
		return nil, ErrEmptyKey
	} else if txn.discarded {
		return nil, ErrDiscardedTxn
	}

	item = new(Item)
	if txn.update {
		// Serve the read from the pending-writes cache if this txn wrote the key.
		if e, has := txn.pendingWrites[string(key)]; has && bytes.Equal(key, e.Key) {
			if isDeletedOrExpired(e.meta, e.ExpiresAt) {
				return nil, ErrKeyNotFound
			}
			// Fulfill from cache.
			item.meta = e.meta
			item.val = e.Value
			item.userMeta = e.UserMeta
			item.key = key
			item.status = prefetched
			item.version = txn.readTs
			item.expiresAt = e.ExpiresAt
			// We probably don't need to set db on item here.
			return item, nil
		}
		// Only track reads if this is update txn. No need to track read if txn serviced it
		// internally.
		txn.addReadKey(key)
	}

	// Read from the DB at the txn's snapshot timestamp.
	seek := y.KeyWithTs(key, txn.readTs)
	vs, err := txn.db.get(seek)
	if err != nil {
		return nil, y.Wrapf(err, "DB::Get key: %q", key)
	}
	if vs.Value == nil && vs.Meta == 0 {
		return nil, ErrKeyNotFound
	}
	if isDeletedOrExpired(vs.Meta, vs.ExpiresAt) {
		return nil, ErrKeyNotFound
	}

	item.key = key
	item.version = vs.Version
	item.meta = vs.Meta
	item.userMeta = vs.UserMeta
	item.vptr = y.SafeCopy(item.vptr, vs.Value)
	item.txn = txn
	item.expiresAt = vs.ExpiresAt
	return item, nil
}
| |
| func (txn *Txn) addReadKey(key []byte) { |
| if txn.update { |
| fp := z.MemHash(key) |
| |
| // Because of the possibility of multiple iterators it is now possible |
| // for multiple threads within a read-write transaction to read keys at |
| // the same time. The reads slice is not currently thread-safe and |
| // needs to be locked whenever we mark a key as read. |
| txn.readsLock.Lock() |
| txn.reads = append(txn.reads, fp) |
| txn.readsLock.Unlock() |
| } |
| } |
| |
// Discard discards a created transaction. This method is very important and must be called. Commit
// method calls this internally, however, calling this multiple times doesn't cause any issues. So,
// this can safely be called via a defer right when transaction is created.
//
// NOTE: If any operations are run on a discarded transaction, ErrDiscardedTxn is returned.
func (txn *Txn) Discard() {
	if txn.discarded { // Avoid a re-run.
		return
	}
	// Panic before marking discarded, so the caller's bug (an unclosed
	// iterator) is surfaced rather than silently released.
	if atomic.LoadInt32(&txn.numIterators) > 0 {
		panic("Unclosed iterator at time of Txn.Discard.")
	}
	txn.discarded = true
	// In non-managed mode, release the read mark so the read watermark
	// (and hence compaction's discard point) can advance.
	if !txn.db.orc.isManaged {
		txn.db.orc.doneRead(txn)
	}
}
| |
// commitAndSend obtains a commit timestamp for the transaction (checking for
// conflicts), stamps every pending entry with its version, and pushes the
// batch to the write channel. On success it returns a callback that blocks
// until the write has been applied and then marks the commit ts done.
func (txn *Txn) commitAndSend() (func() error, error) {
	orc := txn.db.orc
	// Ensure that the order in which we get the commit timestamp is the same as
	// the order in which we push these updates to the write channel. So, we
	// acquire a writeChLock before getting a commit timestamp, and only release
	// it after pushing the entries to it.
	orc.writeChLock.Lock()
	defer orc.writeChLock.Unlock()

	commitTs := orc.newCommitTs(txn)
	// The commitTs can be zero if the transaction is running in managed mode.
	// Individual entries might have their own timestamps.
	if commitTs == 0 && !txn.db.opt.managedTxns {
		return nil, ErrConflict
	}

	// keepTogether stays true only while every entry adopts commitTs as its
	// version; entries carrying their own version (managed mode) disable the
	// transaction markers below.
	keepTogether := true
	setVersion := func(e *Entry) {
		if e.version == 0 {
			e.version = commitTs
		} else {
			keepTogether = false
		}
	}
	for _, e := range txn.pendingWrites {
		setVersion(e)
	}
	// The duplicateWrites slice will be non-empty only if there are duplicate
	// entries with different versions.
	for _, e := range txn.duplicateWrites {
		setVersion(e)
	}

	// +1 leaves room for the optional bitFinTxn marker entry appended below.
	entries := make([]*Entry, 0, len(txn.pendingWrites)+len(txn.duplicateWrites)+1)

	processEntry := func(e *Entry) {
		// Suffix the keys with commit ts, so the key versions are sorted in
		// descending order of commit timestamp.
		e.Key = y.KeyWithTs(e.Key, e.version)
		// Add bitTxn only if these entries are part of a transaction. We
		// support SetEntryAt(..) in managed mode which means a single
		// transaction can have entries with different timestamps. If entries
		// in a single transaction have different timestamps, we don't add the
		// transaction markers.
		if keepTogether {
			e.meta |= bitTxn
		}
		entries = append(entries, e)
	}

	// The following debug information is what led to determining the cause of
	// bank txn violation bug, and it took a whole bunch of effort to narrow it
	// down to here. So, keep this around for at least a couple of months.
	// var b strings.Builder
	// fmt.Fprintf(&b, "Read: %d. Commit: %d. reads: %v. writes: %v. Keys: ",
	// txn.readTs, commitTs, txn.reads, txn.conflictKeys)
	for _, e := range txn.pendingWrites {
		processEntry(e)
	}
	for _, e := range txn.duplicateWrites {
		processEntry(e)
	}

	if keepTogether {
		// CommitTs should not be zero if we're inserting transaction markers.
		y.AssertTrue(commitTs != 0)
		e := &Entry{
			Key:   y.KeyWithTs(txnKey, commitTs),
			Value: []byte(strconv.FormatUint(commitTs, 10)),
			meta:  bitFinTxn,
		}
		entries = append(entries, e)
	}

	req, err := txn.db.sendToWriteCh(entries)
	if err != nil {
		orc.doneCommit(commitTs)
		return nil, err
	}
	ret := func() error {
		err := req.Wait()
		// Wait before marking commitTs as done.
		// We can't defer doneCommit above, because it is being called from a
		// callback here.
		orc.doneCommit(commitTs)
		return err
	}
	return ret, nil
}
| |
| func (txn *Txn) commitPrecheck() error { |
| if txn.discarded { |
| return errors.New("Trying to commit a discarded txn") |
| } |
| keepTogether := true |
| for _, e := range txn.pendingWrites { |
| if e.version != 0 { |
| keepTogether = false |
| } |
| } |
| |
| // If keepTogether is True, it implies transaction markers will be added. |
| // In that case, commitTs should not be never be zero. This might happen if |
| // someone uses txn.Commit instead of txn.CommitAt in managed mode. This |
| // should happen only in managed mode. In normal mode, keepTogether will |
| // always be true. |
| if keepTogether && txn.db.opt.managedTxns && txn.commitTs == 0 { |
| return errors.New("CommitTs cannot be zero. Please use commitAt instead") |
| } |
| return nil |
| } |
| |
| // Commit commits the transaction, following these steps: |
| // |
| // 1. If there are no writes, return immediately. |
| // |
| // 2. Check if read rows were updated since txn started. If so, return ErrConflict. |
| // |
| // 3. If no conflict, generate a commit timestamp and update written rows' commit ts. |
| // |
| // 4. Batch up all writes, write them to value log and LSM tree. |
| // |
| // 5. If callback is provided, Badger will return immediately after checking |
| // for conflicts. Writes to the database will happen in the background. If |
| // there is a conflict, an error will be returned and the callback will not |
| // run. If there are no conflicts, the callback will be called in the |
| // background upon successful completion of writes or any error during write. |
| // |
| // If error is nil, the transaction is successfully committed. In case of a non-nil error, the LSM |
| // tree won't be updated, so there's no need for any rollback. |
| func (txn *Txn) Commit() error { |
| // txn.conflictKeys can be zero if conflict detection is turned off. So we |
| // should check txn.pendingWrites. |
| if len(txn.pendingWrites) == 0 { |
| return nil // Nothing to do. |
| } |
| // Precheck before discarding txn. |
| if err := txn.commitPrecheck(); err != nil { |
| return err |
| } |
| defer txn.Discard() |
| |
| txnCb, err := txn.commitAndSend() |
| if err != nil { |
| return err |
| } |
| // If batchSet failed, LSM would not have been updated. So, no need to rollback anything. |
| |
| // TODO: What if some of the txns successfully make it to value log, but others fail. |
| // Nothing gets updated to LSM, until a restart happens. |
| return txnCb() |
| } |
| |
// txnCb bundles what is needed to finish a CommitWith in the background: the
// deferred commit function, the user's callback, and any error raised before
// the commit could be scheduled.
type txnCb struct {
	commit func() error
	user   func(error)
	err    error
}
| |
| func runTxnCallback(cb *txnCb) { |
| switch { |
| case cb == nil: |
| panic("txn callback is nil") |
| case cb.user == nil: |
| panic("Must have caught a nil callback for txn.CommitWith") |
| case cb.err != nil: |
| cb.user(cb.err) |
| case cb.commit != nil: |
| err := cb.commit() |
| cb.user(err) |
| default: |
| cb.user(nil) |
| } |
| } |
| |
| // CommitWith acts like Commit, but takes a callback, which gets run via a |
| // goroutine to avoid blocking this function. The callback is guaranteed to run, |
| // so it is safe to increment sync.WaitGroup before calling CommitWith, and |
| // decrementing it in the callback; to block until all callbacks are run. |
| func (txn *Txn) CommitWith(cb func(error)) { |
| if cb == nil { |
| panic("Nil callback provided to CommitWith") |
| } |
| |
| if len(txn.pendingWrites) == 0 { |
| // Do not run these callbacks from here, because the CommitWith and the |
| // callback might be acquiring the same locks. Instead run the callback |
| // from another goroutine. |
| go runTxnCallback(&txnCb{user: cb, err: nil}) |
| return |
| } |
| |
| // Precheck before discarding txn. |
| if err := txn.commitPrecheck(); err != nil { |
| cb(err) |
| return |
| } |
| |
| defer txn.Discard() |
| |
| commitCb, err := txn.commitAndSend() |
| if err != nil { |
| go runTxnCallback(&txnCb{user: cb, err: err}) |
| return |
| } |
| |
| go runTxnCallback(&txnCb{user: cb, commit: commitCb}) |
| } |
| |
// ReadTs returns the read timestamp of the transaction, i.e. the snapshot
// version all of its reads are served at.
func (txn *Txn) ReadTs() uint64 {
	return txn.readTs
}
| |
// NewTransaction creates a new transaction. Badger supports concurrent execution of transactions,
// providing serializable snapshot isolation, avoiding write skews. Badger achieves this by tracking
// the keys read and at Commit time, ensuring that these read keys weren't concurrently modified by
// another transaction.
//
// For read-only transactions, set update to false. In this mode, we don't track the rows read for
// any changes. Thus, any long running iterations done in this mode wouldn't pay this overhead.
//
// Running transactions concurrently is OK. However, a transaction itself isn't thread safe, and
// should only be run serially. It doesn't matter if a transaction is created by one goroutine and
// passed down to other, as long as the Txn APIs are called serially.
//
// When you create a new transaction, it is absolutely essential to call
// Discard(). This should be done irrespective of what the update param is set
// to. Commit API internally runs Discard, but running it twice wouldn't cause
// any issues.
//
//  txn := db.NewTransaction(false)
//  defer txn.Discard()
//  // Call various APIs.
func (db *DB) NewTransaction(update bool) *Txn {
	return db.newTransaction(update, false)
}
| |
| func (db *DB) newTransaction(update, isManaged bool) *Txn { |
| if db.opt.ReadOnly && update { |
| // DB is read-only, force read-only transaction. |
| update = false |
| } |
| |
| txn := &Txn{ |
| update: update, |
| db: db, |
| count: 1, // One extra entry for BitFin. |
| size: int64(len(txnKey) + 10), // Some buffer for the extra entry. |
| } |
| if update { |
| if db.opt.DetectConflicts { |
| txn.conflictKeys = make(map[uint64]struct{}) |
| } |
| txn.pendingWrites = make(map[string]*Entry) |
| } |
| if !isManaged { |
| txn.readTs = db.orc.readTs() |
| } |
| return txn |
| } |
| |
// View executes a function creating and managing a read-only transaction for the user. Error
// returned by the function is relayed by the View method.
// If View is used with managed transactions, it would assume a read timestamp of MaxUint64.
// The transaction is always discarded when fn returns.
func (db *DB) View(fn func(txn *Txn) error) error {
	if db.IsClosed() {
		return ErrDBClosed
	}
	var txn *Txn
	if db.opt.managedTxns {
		txn = db.NewTransactionAt(math.MaxUint64, false)
	} else {
		txn = db.NewTransaction(false)
	}
	defer txn.Discard()

	return fn(txn)
}
| |
// Update executes a function, creating and managing a read-write transaction
// for the user. Error returned by the function is relayed by the Update method.
// The transaction is committed only when fn returns nil, and is always
// discarded afterwards. Update cannot be used with managed transactions.
func (db *DB) Update(fn func(txn *Txn) error) error {
	if db.IsClosed() {
		return ErrDBClosed
	}
	if db.opt.managedTxns {
		panic("Update can only be used with managedDB=false.")
	}
	txn := db.NewTransaction(true)
	defer txn.Discard()

	if err := fn(txn); err != nil {
		return err
	}

	return txn.Commit()
}