Skip to content

Commit

Permalink
mvcc: address comments
Browse files Browse the repository at this point in the history
  • Loading branch information
jingyih committed Jun 13, 2019
1 parent 2a9320e commit a823555
Show file tree
Hide file tree
Showing 3 changed files with 15 additions and 8 deletions.
8 changes: 5 additions & 3 deletions mvcc/backend/backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ var (
)

type Backend interface {
// ReadTx returns a read transaction. It is replaced by ConcurrentReadTx in the main data path, see #10523.
ReadTx() ReadTx
BatchTx() BatchTx
// ConcurrentReadTx returns a non-blocking read transaction.
Expand Down Expand Up @@ -200,7 +201,7 @@ func (b *backend) ReadTx() ReadTx { return b.readTx }
func (b *backend) ConcurrentReadTx() ReadTx {
b.readTx.RLock()
defer b.readTx.RUnlock()
// prevent boltdb read Tx from been rolled back until store read Tx is done.
// prevent boltdb read Tx from been rolled back until store read Tx is done. Needs to be called when holding readTx.RLock().
b.readTx.txWg.Add(1)
// TODO: might want to copy the read buffer lazily - create copy when A) end of a write transaction B) end of a batch interval.
return &concurrentReadTx{
Expand Down Expand Up @@ -516,9 +517,10 @@ func (b *backend) begin(write bool) *bolt.Tx {

size := tx.Size()
db := tx.DB()
stats := db.Stats()
atomic.StoreInt64(&b.size, size)
atomic.StoreInt64(&b.sizeInUse, size-(int64(db.Stats().FreePageN)*int64(db.Info().PageSize)))
atomic.StoreInt64(&b.openReadTxN, int64(db.Stats().OpenTxN))
atomic.StoreInt64(&b.sizeInUse, size-(int64(stats.FreePageN)*int64(db.Info().PageSize)))
atomic.StoreInt64(&b.openReadTxN, int64(stats.OpenTxN))

return tx
}
Expand Down
12 changes: 8 additions & 4 deletions mvcc/backend/read_tx.go
Original file line number Diff line number Diff line change
Expand Up @@ -132,13 +132,17 @@ type concurrentReadTx struct {
buf txReadBuffer
txMu *sync.RWMutex
tx *bolt.Tx
buckets map[string]*bolt.Bucket // note: A map value is a pointer
buckets map[string]*bolt.Bucket
txWg *sync.WaitGroup
}

func (rt *concurrentReadTx) Lock() {}
func (rt *concurrentReadTx) Unlock() {}
func (rt *concurrentReadTx) RLock() {}
func (rt *concurrentReadTx) Lock() {}
func (rt *concurrentReadTx) Unlock() {}

// RLock is no-op. concurrentReadTx does not need to be locked after it is created.
func (rt *concurrentReadTx) RLock() {}

// RUnlock signals the end of concurrentReadTx.
func (rt *concurrentReadTx) RUnlock() { rt.txWg.Done() }

func (rt *concurrentReadTx) UnsafeForEach(bucketName []byte, visitor func(k, v []byte) error) error {
Expand Down
3 changes: 2 additions & 1 deletion mvcc/kvstore_txn.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ func (s *store) Read() TxnRead {
// backend holds b.readTx.RLock() only when creating the concurrentReadTx. After
// ConcurrentReadTx is created, it will not block write transaction.
tx := s.b.ConcurrentReadTx()
tx.RLock() // RLock is no-op. concurrentReadTx does not need to be locked after it is created.
firstRev, rev := s.compactMainRev, s.currentRev
s.revMu.RUnlock()
return newMetricsTxnRead(&storeTxnRead{s, tx, firstRev, rev})
Expand All @@ -48,7 +49,7 @@ func (tr *storeTxnRead) Range(key, end []byte, ro RangeOptions) (r *RangeResult,
}

func (tr *storeTxnRead) End() {
tr.tx.RUnlock()
tr.tx.RUnlock() // RUnlock signals the end of concurrentReadTx.
tr.s.mu.RUnlock()
}

Expand Down

0 comments on commit a823555

Please sign in to comment.