Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

mvcc: fully concurrent read #10523

Merged
merged 5 commits into from
Jun 14, 2019
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
46 changes: 32 additions & 14 deletions integration/metrics_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ package integration

import (
"context"
"fmt"
"net/http"
"strconv"
"testing"
Expand Down Expand Up @@ -103,22 +104,39 @@ func testMetricDbSizeDefrag(t *testing.T, name string) {
t.Fatal(kerr)
}

// Put to move PendingPages to FreePages
if _, err = kvc.Put(context.TODO(), putreq); err != nil {
t.Fatal(err)
}
time.Sleep(500 * time.Millisecond)
validateAfterCompactionInUse := func() error {
// Put to move PendingPages to FreePages
if _, err = kvc.Put(context.TODO(), putreq); err != nil {
t.Fatal(err)
}
time.Sleep(500 * time.Millisecond)

afterCompactionInUse, err := clus.Members[0].Metric("etcd_mvcc_db_total_size_in_use_in_bytes")
if err != nil {
t.Fatal(err)
}
aciu, err := strconv.Atoi(afterCompactionInUse)
if err != nil {
t.Fatal(err)
afterCompactionInUse, err := clus.Members[0].Metric("etcd_mvcc_db_total_size_in_use_in_bytes")
if err != nil {
t.Fatal(err)
}
aciu, err := strconv.Atoi(afterCompactionInUse)
if err != nil {
t.Fatal(err)
}
if biu <= aciu {
return fmt.Errorf("expected less than %d, got %d after compaction", biu, aciu)
}
return nil
}
if biu <= aciu {
t.Fatalf("expected less than %d, got %d after compaction", biu, aciu)

// backend rollbacks read transaction asynchronously (PR #10523),
// which causes the result to be flaky. Retry 3 times.
maxRetry, retry := 3, 0
for {
err := validateAfterCompactionInUse()
if err == nil {
break
}
retry++
if retry >= maxRetry {
t.Fatalf(err.Error())
}
}

// defrag should give freed space back to fs
Expand Down
30 changes: 30 additions & 0 deletions mvcc/backend/backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,8 @@ var (
type Backend interface {
ReadTx() ReadTx
jpbetz marked this conversation as resolved.
Show resolved Hide resolved
BatchTx() BatchTx
// ConcurrentReadTx returns a non-blocking read transaction.
ConcurrentReadTx() ReadTx

Snapshot() Snapshot
Hash(ignores map[IgnoreKey]struct{}) (uint32, error)
Expand All @@ -63,6 +65,8 @@ type Backend interface {
// Since the backend can manage free space in a non-byte unit such as
// number of pages, the returned value can be not exactly accurate in bytes.
SizeInUse() int64
// OpenReadTxN returns the number of currently open read transactions in the backend.
OpenReadTxN() int64
Defrag() error
ForceCommit()
Close() error
Expand All @@ -87,6 +91,8 @@ type backend struct {
sizeInUse int64
// commits counts number of commits since start
commits int64
// openReadTxN is the number of currently open read transactions in the backend
openReadTxN int64

mu sync.RWMutex
db *bolt.DB
Expand Down Expand Up @@ -166,6 +172,7 @@ func newBackend(bcfg BackendConfig) *backend {
txBuffer: txBuffer{make(map[string]*bucketBuffer)},
},
buckets: make(map[string]*bolt.Bucket),
txWg: new(sync.WaitGroup),
},

stopc: make(chan struct{}),
Expand All @@ -187,6 +194,24 @@ func (b *backend) BatchTx() BatchTx {

func (b *backend) ReadTx() ReadTx { return b.readTx }

// ConcurrentReadTx creates and returns a new ReadTx, which:
// A) creates and keeps a copy of backend.readTx.txReadBuffer,
// B) references the boltdb read Tx (and its bucket cache) of current batch interval.
func (b *backend) ConcurrentReadTx() ReadTx {
b.readTx.RLock()
defer b.readTx.RUnlock()
// prevent boltdb read Tx from been rolled back until store read Tx is done.
b.readTx.txWg.Add(1)
// TODO: might want to copy the read buffer lazily - create copy when A) end of a write transaction B) end of a batch interval.
return &concurrentReadTx{
buf: b.readTx.buf.unsafeCopy(),
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

i remember the design doc mentioned that this is lazy copy. the first read wont need to do the copy?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Making a copy at the beginning is just a natural way of preserving the correct view [1] throughout the life cycle of a read transaction. I think lazy copy could be a nice follow up improvement, especially if copying the buffer is time or memory consuming. So far in my benchmark tests I do not feel like this is an issue.

[1] By correct view, I mean the read buffer's state at the beginning of the read transaction, combined with the view of boltdb Tx opened at the beginning of the current batch interval.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ok. can you add a todo or something? just in case we want to do an optimization in future.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sounds good. I will add a TODO.

tx: b.readTx.tx,
txMu: &b.readTx.txMu,
buckets: b.readTx.buckets,
txWg: b.readTx.txWg,
}
}

// ForceCommit forces the current batching tx to commit.
func (b *backend) ForceCommit() {
b.batchTx.Commit()
Expand Down Expand Up @@ -493,6 +518,7 @@ func (b *backend) begin(write bool) *bolt.Tx {
db := tx.DB()
atomic.StoreInt64(&b.size, size)
atomic.StoreInt64(&b.sizeInUse, size-(int64(db.Stats().FreePageN)*int64(db.Info().PageSize)))
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: Since the db.Stats() call acquires a read lock, only call it once here (instead of once for FreePageN and once for OpenTxN) ?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sounds good.

atomic.StoreInt64(&b.openReadTxN, int64(db.Stats().OpenTxN))

return tx
}
Expand All @@ -509,6 +535,10 @@ func (b *backend) unsafeBegin(write bool) *bolt.Tx {
return tx
}

func (b *backend) OpenReadTxN() int64 {
return atomic.LoadInt64(&b.openReadTxN)
}

// NewTmpBackend creates a backend implementation for testing.
func NewTmpBackend(batchInterval time.Duration, batchLimit int) (*backend, string) {
dir, err := ioutil.TempDir(os.TempDir(), "etcd_backend_test")
Expand Down
29 changes: 29 additions & 0 deletions mvcc/backend/backend_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -250,6 +250,35 @@ func TestBackendWriteback(t *testing.T) {
}
}

// TestConcurrentReadTx ensures that current read transaction can see all prior writes stored in read buffer
func TestConcurrentReadTx(t *testing.T) {
b, tmpPath := NewTmpBackend(time.Hour, 10000)
defer cleanup(b, tmpPath)

wtx1 := b.BatchTx()
wtx1.Lock()
wtx1.UnsafeCreateBucket([]byte("key"))
wtx1.UnsafePut([]byte("key"), []byte("abc"), []byte("ABC"))
wtx1.UnsafePut([]byte("key"), []byte("overwrite"), []byte("1"))
wtx1.Unlock()

wtx2 := b.BatchTx()
wtx2.Lock()
wtx2.UnsafePut([]byte("key"), []byte("def"), []byte("DEF"))
wtx2.UnsafePut([]byte("key"), []byte("overwrite"), []byte("2"))
wtx2.Unlock()

rtx := b.ConcurrentReadTx()
rtx.RLock() // no-op
k, v := rtx.UnsafeRange([]byte("key"), []byte("abc"), []byte("\xff"), 0)
rtx.RUnlock()
wKey := [][]byte{[]byte("abc"), []byte("def"), []byte("overwrite")}
wVal := [][]byte{[]byte("ABC"), []byte("DEF"), []byte("2")}
if !reflect.DeepEqual(wKey, k) || !reflect.DeepEqual(wVal, v) {
t.Errorf("want k=%+v, v=%+v; got k=%+v, v=%+v", wKey, wVal, k, v)
}
}

// TestBackendWritebackForEach checks that partially written / buffered
// data is visited in the same order as fully committed data.
func TestBackendWritebackForEach(t *testing.T) {
Expand Down
21 changes: 14 additions & 7 deletions mvcc/backend/batch_tx.go
Original file line number Diff line number Diff line change
Expand Up @@ -306,13 +306,9 @@ func (t *batchTxBuffered) commit(stop bool) {

func (t *batchTxBuffered) unsafeCommit(stop bool) {
if t.backend.readTx.tx != nil {
if err := t.backend.readTx.tx.Rollback(); err != nil {
if t.backend.lg != nil {
t.backend.lg.Fatal("failed to rollback tx", zap.Error(err))
} else {
plog.Fatalf("cannot rollback tx (%s)", err)
}
}
// wait all store read transactions using the current boltdb tx to finish,
// then close the boltdb tx
go waitAndRollback(t.backend.readTx.tx, t.backend.readTx.txWg, t.backend.lg)
t.backend.readTx.reset()
}

Expand All @@ -323,6 +319,17 @@ func (t *batchTxBuffered) unsafeCommit(stop bool) {
}
}

func waitAndRollback(tx *bolt.Tx, wg *sync.WaitGroup, lg *zap.Logger) {
jingyih marked this conversation as resolved.
Show resolved Hide resolved
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should this be a receiver function of batchTxBuffered? The parameters don't all appear necessary.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

tx and wg need to be passes into the function. batchTxBuffered's tx and wg will be reset / re-created immediate after this function call, whereas this function waits until all ongoing reads using the input tx is done.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Making this a receiver and moving the logger out of the params might make that even more clear.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Did you mean something like func (lg *zap.Logger) waitAndRollback(tx *bolt.Tx, wg *sync.WaitGroup)? I have mixed feeling about this. By convention (in this repo), zap logger is passed to functions as input parameter.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I was thinking more like func (t *batchTxBuffered) waitAndRollback(tx *bolt.Tx, wg *sync.WaitGroup) so that the only params that need to be passed are the ones that are semantically important to the call. But this is minor,

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Or just inline the function in unsafeCommit? it doesn't seem to benefit from being separate.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good idea.

wg.Wait()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we need to create a metric for this. make sure there is no leaky transactions.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Agreed. Thanks!

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this is important since a leaky read tx will keep boltdb size growing.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

got it.

if err := tx.Rollback(); err != nil {
if lg != nil {
lg.Fatal("failed to rollback tx", zap.Error(err))
} else {
plog.Fatalf("cannot rollback tx (%s)", err)
}
}
}

func (t *batchTxBuffered) UnsafePut(bucketName []byte, key []byte, value []byte) {
t.batchTx.UnsafePut(bucketName, key, value)
t.buf.put(bucketName, key, value)
Expand Down
102 changes: 92 additions & 10 deletions mvcc/backend/read_tx.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,10 +42,13 @@ type readTx struct {
mu sync.RWMutex
buf txReadBuffer

// txmu protects accesses to buckets and tx on Range requests.
txmu sync.RWMutex
// TODO: group and encapsulate {txMu, tx, buckets, txWg}, as they share the same lifecycle.
// txMu protects accesses to buckets and tx on Range requests.
jingyih marked this conversation as resolved.
Show resolved Hide resolved
txMu sync.RWMutex
tx *bolt.Tx
buckets map[string]*bolt.Bucket
// txWg protects tx from being rolled back at the end of a batch interval until all reads using this tx are done.
txWg *sync.WaitGroup
}

func (rt *readTx) Lock() { rt.mu.Lock() }
Expand All @@ -71,23 +74,23 @@ func (rt *readTx) UnsafeRange(bucketName, key, endKey []byte, limit int64) ([][]

// find/cache bucket
bn := string(bucketName)
rt.txmu.RLock()
rt.txMu.RLock()
bucket, ok := rt.buckets[bn]
rt.txmu.RUnlock()
rt.txMu.RUnlock()
if !ok {
rt.txmu.Lock()
rt.txMu.Lock()
bucket = rt.tx.Bucket(bucketName)
rt.buckets[bn] = bucket
rt.txmu.Unlock()
rt.txMu.Unlock()
}

// ignore missing bucket since may have been created in this batch
if bucket == nil {
return keys, vals
}
rt.txmu.Lock()
rt.txMu.Lock()
c := bucket.Cursor()
rt.txmu.Unlock()
rt.txMu.Unlock()

k2, v2 := unsafeRange(c, key, endKey, limit-int64(len(keys)))
return append(k2, keys...), append(v2, vals...)
Expand All @@ -108,9 +111,9 @@ func (rt *readTx) UnsafeForEach(bucketName []byte, visitor func(k, v []byte) err
if err := rt.buf.ForEach(bucketName, getDups); err != nil {
return err
}
rt.txmu.Lock()
rt.txMu.Lock()
err := unsafeForEach(rt.tx, bucketName, visitNoDup)
rt.txmu.Unlock()
rt.txMu.Unlock()
if err != nil {
return err
}
Expand All @@ -121,4 +124,83 @@ func (rt *readTx) reset() {
rt.buf.reset()
rt.buckets = make(map[string]*bolt.Bucket)
rt.tx = nil
rt.txWg = new(sync.WaitGroup)
}

// TODO: create a base type for readTx and concurrentReadTx to avoid duplicated function implementation?
jingyih marked this conversation as resolved.
Show resolved Hide resolved
type concurrentReadTx struct {
buf txReadBuffer
txMu *sync.RWMutex
tx *bolt.Tx
buckets map[string]*bolt.Bucket // note: A map value is a pointer
jpbetz marked this conversation as resolved.
Show resolved Hide resolved
txWg *sync.WaitGroup
}

func (rt *concurrentReadTx) Lock() {}
func (rt *concurrentReadTx) Unlock() {}
func (rt *concurrentReadTx) RLock() {}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm a bit surprised we don't do anything in RLock(). Should we try to adhere to the lock contract more closely (i.e. move logic from ConcurrentReadTx to here) ? If not we should clearly document what's going on.

Should Lock() and Unlock() be noops? Alternatives would be to delegate to RLock()/RUnock() or to panic.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This one is tricky. ConcurrentReadTx() holds a lock when it is being created (and everything needs to happen when holding that lock, so cannot really move part of the logic to a separate function). After creation, there is no need / concept of locking and unlocking. So these functions end up being no-op. They only exist so that *concurrentReadTx implements ReadTx interface.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Adding some comments with what you just said would certainly make it easier to understand.

The violation of the ReadTx interface guarantees do make for some odd behavior, e.g.:

tx := ConcurrentReadTx()
tx.RLock()
// do something
tx.RUnlock()
tx.RLock()
// do something
tx.RUnlock()

Would certainly not do what a developer might expect..

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I agree. I am not happy with the current state. The current implementation of ConcurrentReadTx is compromised in readability because I wanted to do minimum change to various other existing interfaces. (Tried to define a new interface for concurrent read Tx, it caused ripple effects and the end result is kind of messy). Fortunately ReadTx interface is used internally in mvcc backend. Developer normally do not need to interact with this level. Instead, the mvcc store level transaction interface [1] should be enough (which is kept unchanged).

But I will add some comments try to clarify the implementation.

[1]

etcd/mvcc/kv.go

Lines 55 to 57 in 0de9b8a

// TxnRead represents a read-only transaction with operations that will not
// block other read transactions.
type TxnRead interface {

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Okay, let's loop back on this interface later. The comments should be sufficient for now.

func (rt *concurrentReadTx) RUnlock() { rt.txWg.Done() }

func (rt *concurrentReadTx) UnsafeForEach(bucketName []byte, visitor func(k, v []byte) error) error {
dups := make(map[string]struct{})
getDups := func(k, v []byte) error {
dups[string(k)] = struct{}{}
return nil
}
visitNoDup := func(k, v []byte) error {
if _, ok := dups[string(k)]; ok {
return nil
}
return visitor(k, v)
}
if err := rt.buf.ForEach(bucketName, getDups); err != nil {
return err
}
rt.txMu.Lock()
err := unsafeForEach(rt.tx, bucketName, visitNoDup)
rt.txMu.Unlock()
if err != nil {
return err
}
return rt.buf.ForEach(bucketName, visitor)
}

func (rt *concurrentReadTx) UnsafeRange(bucketName, key, endKey []byte, limit int64) ([][]byte, [][]byte) {
if endKey == nil {
// forbid duplicates for single keys
limit = 1
}
if limit <= 0 {
limit = math.MaxInt64
}
if limit > 1 && !bytes.Equal(bucketName, safeRangeBucket) {
panic("do not use unsafeRange on non-keys bucket")
}
keys, vals := rt.buf.Range(bucketName, key, endKey, limit)
if int64(len(keys)) == limit {
return keys, vals
}

// find/cache bucket
bn := string(bucketName)
rt.txMu.RLock()
bucket, ok := rt.buckets[bn]
rt.txMu.RUnlock()
if !ok {
rt.txMu.Lock()
bucket = rt.tx.Bucket(bucketName)
rt.buckets[bn] = bucket
rt.txMu.Unlock()
}

// ignore missing bucket since may have been created in this batch
if bucket == nil {
return keys, vals
}
rt.txMu.Lock()
c := bucket.Cursor()
rt.txMu.Unlock()

k2, v2 := unsafeRange(c, key, endKey, limit-int64(len(keys)))
return append(k2, keys...), append(v2, vals...)
}
22 changes: 22 additions & 0 deletions mvcc/backend/tx_buffer.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,19 @@ func (txr *txReadBuffer) ForEach(bucketName []byte, visitor func(k, v []byte) er
return nil
}

// unsafeCopy returns a copy of txReadBuffer, caller should acquire backend.readTx.RLock()
func (txr *txReadBuffer) unsafeCopy() txReadBuffer {
txrCopy := txReadBuffer{
txBuffer: txBuffer{
buckets: make(map[string]*bucketBuffer, len(txr.txBuffer.buckets)),
},
}
for bucketName, bucket := range txr.txBuffer.buckets {
txrCopy.txBuffer.buckets[bucketName] = bucket.Copy()
}
return txrCopy
}

type kv struct {
key []byte
val []byte
Expand Down Expand Up @@ -179,3 +192,12 @@ func (bb *bucketBuffer) Less(i, j int) bool {
return bytes.Compare(bb.buf[i].key, bb.buf[j].key) < 0
}
func (bb *bucketBuffer) Swap(i, j int) { bb.buf[i], bb.buf[j] = bb.buf[j], bb.buf[i] }

func (bb *bucketBuffer) Copy() *bucketBuffer {
bbCopy := bucketBuffer{
buf: make([]kv, len(bb.buf)),
used: bb.used,
}
copy(bbCopy.buf, bb.buf)
return &bbCopy
}
3 changes: 3 additions & 0 deletions mvcc/kvstore.go
Original file line number Diff line number Diff line change
Expand Up @@ -354,6 +354,9 @@ func (s *store) restore() error {
reportDbTotalSizeInUseInBytesMu.Lock()
reportDbTotalSizeInUseInBytes = func() float64 { return float64(b.SizeInUse()) }
reportDbTotalSizeInUseInBytesMu.Unlock()
reportDbOpenReadTxNMu.Lock()
reportDbOpenReadTxN = func() float64 { return float64(b.OpenReadTxN()) }
reportDbOpenReadTxNMu.Unlock()

min, max := newRevBytes(), newRevBytes()
revToBytes(revision{main: 1}, min)
Expand Down
Loading