diff --git a/mvcc/backend/backend.go b/mvcc/backend/backend.go
index 0d3cd87ec4c4..b8212a3f30e3 100644
--- a/mvcc/backend/backend.go
+++ b/mvcc/backend/backend.go
@@ -51,6 +51,8 @@ var (
 type Backend interface {
 	ReadTx() ReadTx
 	BatchTx() BatchTx
+	// ConcurrentReadTx returns a non-blocking read transaction.
+	ConcurrentReadTx() ReadTx
 
 	Snapshot() Snapshot
 	Hash(ignores map[IgnoreKey]struct{}) (uint32, error)
@@ -166,6 +168,7 @@ func newBackend(bcfg BackendConfig) *backend {
 				txBuffer: txBuffer{make(map[string]*bucketBuffer)},
 			},
 			buckets: make(map[string]*bolt.Bucket),
+			txWg:    new(sync.WaitGroup),
 		},
 
 		stopc: make(chan struct{}),
@@ -187,6 +190,24 @@ func (b *backend) BatchTx() BatchTx {
 func (b *backend) ReadTx() ReadTx { return b.readTx }
 
+// ConcurrentReadTx creates and returns a new ReadTx, which:
+// A) creates and keeps a copy of txReadBuffer in backend read Tx,
+// B) references the boltdb read Tx (and its bucket cache) of current batch interval.
+func (b *backend) ConcurrentReadTx() ReadTx {
+	b.readTx.RLock()
+	defer b.readTx.RUnlock()
+	// prevent the boltdb read Tx from being rolled back until the store read Tx is done.
+	b.readTx.txWg.Add(1)
+	newTxReadBuf := copyTxReadBuffer(b.readTx.buf)
+	return &concurrentReadTx{
+		buf:     newTxReadBuf,
+		tx:      b.readTx.tx,
+		txMu:    &b.readTx.txMu,
+		buckets: b.readTx.buckets,
+		txWg:    b.readTx.txWg,
+	}
+}
+
 // ForceCommit forces the current batching tx to commit.
 func (b *backend) ForceCommit() {
 	b.batchTx.Commit()
 }
@@ -536,3 +557,16 @@ func (s *snapshot) Close() error {
 	<-s.donec
 	return s.Tx.Rollback()
 }
+
+func copyTxReadBuffer(from txReadBuffer) txReadBuffer {
+	to := txReadBuffer{txBuffer: txBuffer{buckets: make(map[string]*bucketBuffer)}}
+	for k, v := range from.txBuffer.buckets {
+		bufCopy := make([]kv, len(v.buf))
+		copy(bufCopy, v.buf)
+		to.txBuffer.buckets[k] = &bucketBuffer{
+			buf:  bufCopy,
+			used: v.used,
+		}
+	}
+	return to
+}
diff --git a/mvcc/backend/backend_test.go b/mvcc/backend/backend_test.go
index 69bd21423687..f08782464f56 100644
--- a/mvcc/backend/backend_test.go
+++ b/mvcc/backend/backend_test.go
@@ -300,6 +300,8 @@ func TestBackendWritebackForEach(t *testing.T) {
 	}
 }
 
+// TODO: add a unit test for concurrentReadTx
+
 func cleanup(b Backend, path string) {
 	b.Close()
 	os.Remove(path)
diff --git a/mvcc/backend/batch_tx.go b/mvcc/backend/batch_tx.go
index 77d0648b8c4f..364190a0589e 100644
--- a/mvcc/backend/batch_tx.go
+++ b/mvcc/backend/batch_tx.go
@@ -306,13 +306,9 @@ func (t *batchTxBuffered) commit(stop bool) {
 }
 
 func (t *batchTxBuffered) unsafeCommit(stop bool) {
 	if t.backend.readTx.tx != nil {
-		if err := t.backend.readTx.tx.Rollback(); err != nil {
-			if t.backend.lg != nil {
-				t.backend.lg.Fatal("failed to rollback tx", zap.Error(err))
-			} else {
-				plog.Fatalf("cannot rollback tx (%s)", err)
-			}
-		}
+		// wait for all store read transactions using the current boltdb tx to finish,
+		// then roll back the boltdb tx
+		go waitAndRollback(t.backend.readTx.tx, t.backend.readTx.txWg, t.backend.lg)
 		t.backend.readTx.reset()
 	}
@@ -323,6 +319,17 @@
 	}
 }
+
+func waitAndRollback(tx *bolt.Tx, wg *sync.WaitGroup, lg *zap.Logger) {
+	wg.Wait()
+	if err := tx.Rollback(); err != nil {
+		if lg != nil {
+			lg.Fatal("failed to rollback tx", zap.Error(err))
+		} else {
+			plog.Fatalf("cannot rollback tx (%s)", err)
+		}
+	}
+}
+
 func (t *batchTxBuffered) UnsafePut(bucketName []byte, key []byte, value []byte) {
 	t.batchTx.UnsafePut(bucketName, key, value)
 	t.buf.put(bucketName, key, value)
 }
diff --git a/mvcc/backend/read_tx.go b/mvcc/backend/read_tx.go
index 7b8d855eb76f..8409daa75b55 100644
--- a/mvcc/backend/read_tx.go
+++ b/mvcc/backend/read_tx.go
@@ -42,10 +42,12 @@ type readTx struct {
 	mu  sync.RWMutex
 	buf txReadBuffer
 
-	// txmu protects accesses to buckets and tx on Range requests.
-	txmu    sync.RWMutex
+	// txMu protects accesses to buckets and tx on Range requests.
+	txMu    sync.RWMutex
 	tx      *bolt.Tx
 	buckets map[string]*bolt.Bucket
+	// txWg protects tx from being rolled back until all reads using this tx are done.
+	txWg *sync.WaitGroup
 }
 
 func (rt *readTx) Lock()    { rt.mu.Lock() }
@@ -71,23 +73,23 @@ func (rt *readTx) UnsafeRange(bucketName, key, endKey []byte, limit int64) ([][]byte, [][]byte) {
 
 	// find/cache bucket
 	bn := string(bucketName)
-	rt.txmu.RLock()
+	rt.txMu.RLock()
 	bucket, ok := rt.buckets[bn]
-	rt.txmu.RUnlock()
+	rt.txMu.RUnlock()
 	if !ok {
-		rt.txmu.Lock()
+		rt.txMu.Lock()
 		bucket = rt.tx.Bucket(bucketName)
 		rt.buckets[bn] = bucket
-		rt.txmu.Unlock()
+		rt.txMu.Unlock()
 	}
 
 	// ignore missing bucket since may have been created in this batch
 	if bucket == nil {
 		return keys, vals
 	}
-	rt.txmu.Lock()
+	rt.txMu.Lock()
 	c := bucket.Cursor()
-	rt.txmu.Unlock()
+	rt.txMu.Unlock()
 
 	k2, v2 := unsafeRange(c, key, endKey, limit-int64(len(keys)))
 	return append(k2, keys...), append(v2, vals...)
@@ -108,9 +110,9 @@ func (rt *readTx) UnsafeForEach(bucketName []byte, visitor func(k, v []byte) error) error {
 	if err := rt.buf.ForEach(bucketName, getDups); err != nil {
 		return err
 	}
-	rt.txmu.Lock()
+	rt.txMu.Lock()
 	err := unsafeForEach(rt.tx, bucketName, visitNoDup)
-	rt.txmu.Unlock()
+	rt.txMu.Unlock()
 	if err != nil {
 		return err
 	}
@@ -121,4 +123,88 @@ func (rt *readTx) reset() {
 	rt.buf.reset()
 	rt.buckets = make(map[string]*bolt.Bucket)
 	rt.tx = nil
+	rt.txWg = new(sync.WaitGroup)
+}
+
+// TODO: create a base type for readTx and concurrentReadTx to avoid duplicated function implementations?
+type concurrentReadTx struct {
+	buf     txReadBuffer
+	txMu    *sync.RWMutex
+	tx      *bolt.Tx
+	buckets map[string]*bolt.Bucket // note: A map value is a pointer
+	txWg    *sync.WaitGroup
+}
+
+func (rt *concurrentReadTx) Lock() {}
+
+func (rt *concurrentReadTx) Unlock() {}
+
+func (rt *concurrentReadTx) RLock() {}
+
+func (rt *concurrentReadTx) RUnlock() {
+	rt.txWg.Done()
+}
+
+func (rt *concurrentReadTx) UnsafeForEach(bucketName []byte, visitor func(k, v []byte) error) error {
+	dups := make(map[string]struct{})
+	getDups := func(k, v []byte) error {
+		dups[string(k)] = struct{}{}
+		return nil
+	}
+	visitNoDup := func(k, v []byte) error {
+		if _, ok := dups[string(k)]; ok {
+			return nil
+		}
+		return visitor(k, v)
+	}
+	if err := rt.buf.ForEach(bucketName, getDups); err != nil {
+		return err
+	}
+	rt.txMu.Lock()
+	err := unsafeForEach(rt.tx, bucketName, visitNoDup)
+	rt.txMu.Unlock()
+	if err != nil {
+		return err
+	}
+	return rt.buf.ForEach(bucketName, visitor)
+}
+
+func (rt *concurrentReadTx) UnsafeRange(bucketName, key, endKey []byte, limit int64) ([][]byte, [][]byte) {
+	if endKey == nil {
+		// forbid duplicates for single keys
+		limit = 1
+	}
+	if limit <= 0 {
+		limit = math.MaxInt64
+	}
+	if limit > 1 && !bytes.Equal(bucketName, safeRangeBucket) {
+		panic("do not use unsafeRange on non-keys bucket")
+	}
+	keys, vals := rt.buf.Range(bucketName, key, endKey, limit)
+	if int64(len(keys)) == limit {
+		return keys, vals
+	}
+
+	// find/cache bucket
+	bn := string(bucketName)
+	rt.txMu.RLock()
+	bucket, ok := rt.buckets[bn]
+	rt.txMu.RUnlock()
+	if !ok {
+		rt.txMu.Lock()
+		bucket = rt.tx.Bucket(bucketName)
+		rt.buckets[bn] = bucket
+		rt.txMu.Unlock()
+	}
+
+	// ignore missing bucket since may have been created in this batch
+	if bucket == nil {
+		return keys, vals
+	}
+	rt.txMu.Lock()
+	c := bucket.Cursor()
+	rt.txMu.Unlock()
+
+	k2, v2 := unsafeRange(c, key, endKey, limit-int64(len(keys)))
+	return append(k2, keys...), append(v2, vals...)
 }
diff --git a/mvcc/kvstore_test.go b/mvcc/kvstore_test.go
index 91183969901e..7a5a6c4f4b6c 100644
--- a/mvcc/kvstore_test.go
+++ b/mvcc/kvstore_test.go
@@ -645,30 +645,65 @@ func TestTxnPut(t *testing.T) {
 	}
 }
 
-func TestTxnBlockBackendForceCommit(t *testing.T) {
+func TestConcurrentReadAndWrite(t *testing.T) {
 	b, tmpPath := backend.NewDefaultTmpBackend()
 	s := NewStore(zap.NewExample(), b, &lease.FakeLessor{}, nil)
 	defer os.Remove(tmpPath)
 
-	txn := s.Read()
+	// write something to read later
+	s.Put([]byte("foo"), []byte("bar"), lease.NoLease)
 
+	// readTx1 simulates a long read request
+	readTx1 := s.Read()
+
+	// write should not be blocked by reads
 	done := make(chan struct{})
 	go func() {
-		s.b.ForceCommit()
+		s.Put([]byte("foo"), []byte("newBar"), lease.NoLease) // this is a write Txn
 		done <- struct{}{}
 	}()
 	select {
 	case <-done:
-		t.Fatalf("failed to block ForceCommit")
-	case <-time.After(100 * time.Millisecond):
+	case <-time.After(1 * time.Second):
+		t.Fatalf("write should not be blocked by read")
 	}
 
-	txn.End()
-	select {
-	case <-done:
-	case <-time.After(5 * time.Second): // wait 5 seconds for CI with slow IO
-		testutil.FatalStack(t, "failed to execute ForceCommit")
+	// readTx2 simulates a short read request
+	readTx2 := s.Read()
+	ro := RangeOptions{Limit: 1, Rev: 0, Count: false}
+	ret, err := readTx2.Range([]byte("foo"), nil, ro)
+	if err != nil {
+		t.Fatalf("failed to range: %v", err)
+	}
+	// readTx2 should see the result of new write
+	w := mvccpb.KeyValue{
+		Key:            []byte("foo"),
+		Value:          []byte("newBar"),
+		CreateRevision: 2,
+		ModRevision:    3,
+		Version:        2,
+	}
+	if !reflect.DeepEqual(ret.KVs[0], w) {
+		t.Fatalf("range result = %+v, want = %+v", ret.KVs[0], w)
+	}
+	readTx2.End()
+
+	ret, err = readTx1.Range([]byte("foo"), nil, ro)
+	if err != nil {
+		t.Fatalf("failed to range: %v", err)
+	}
+	// readTx1 should not see the result of new write
+	w = mvccpb.KeyValue{
+		Key:            []byte("foo"),
+		Value:          []byte("bar"),
+		CreateRevision: 2,
+		ModRevision:    2,
+		Version:        1,
+	}
+	if !reflect.DeepEqual(ret.KVs[0], w) {
+		t.Fatalf("range result = %+v, want = %+v", ret.KVs[0], w)
 	}
+	readTx1.End()
 }
 
 // TODO: test attach key to lessor
@@ -754,6 +789,7 @@ type fakeBackend struct {
 
 func (b *fakeBackend) BatchTx() backend.BatchTx { return b.tx }
 func (b *fakeBackend) ReadTx() backend.ReadTx { return b.tx }
+func (b *fakeBackend) ConcurrentReadTx() backend.ReadTx { return b.tx }
 func (b *fakeBackend) Hash(ignores map[backend.IgnoreKey]struct{}) (uint32, error) { return 0, nil }
 func (b *fakeBackend) Size() int64 { return 0 }
 func (b *fakeBackend) SizeInUse() int64 { return 0 }
diff --git a/mvcc/kvstore_txn.go b/mvcc/kvstore_txn.go
index 088ea734141b..c97e290f6359 100644
--- a/mvcc/kvstore_txn.go
+++ b/mvcc/kvstore_txn.go
@@ -31,13 +31,8 @@ type storeTxnRead struct {
 
 func (s *store) Read() TxnRead {
 	s.mu.RLock()
-	tx := s.b.ReadTx()
 	s.revMu.RLock()
-	// tx.RLock() blocks txReadBuffer for reading, which could potentially block the following two operations:
-	// A) writeback from txWriteBuffer to txReadBuffer at the end of a write transaction (TxnWrite).
-	// B) starting of a new backend batch transaction, where the pending changes need to be committed to boltdb
-	//    and txReadBuffer needs to be reset.
-	tx.RLock()
+	tx := s.b.ConcurrentReadTx()
 	firstRev, rev := s.compactMainRev, s.currentRev
 	s.revMu.RUnlock()
 	return newMetricsTxnRead(&storeTxnRead{s, tx, firstRev, rev})
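
Reviewer note: backend_test.go above only leaves a TODO for a concurrentReadTx unit test. The sketch below shows one way such a white-box test could look inside mvcc/backend, next to the existing cleanup helper; the test name, the "key" bucket contents, and the key/value pair are illustrative assumptions, not part of this patch. It exercises the lifecycle the patch introduces: ConcurrentReadTx copies txReadBuffer and increments txWg, RUnlock releases txWg, and the next commit hands the old boltdb read Tx to waitAndRollback instead of blocking.

// Sketch only (not part of this patch): a possible unit test for concurrentReadTx.
// It would live in mvcc/backend/backend_test.go, which already imports "testing"
// and defines cleanup; the test name and the "key" bucket contents are hypothetical.
func TestConcurrentReadTxSeesWriteback(t *testing.T) {
	b, tmpPath := NewDefaultTmpBackend()
	defer cleanup(b, tmpPath)

	// Write through the batch tx; Unlock() writes the change back into readTx's buffer.
	tx := b.BatchTx()
	tx.Lock()
	tx.UnsafeCreateBucket([]byte("key"))
	tx.UnsafePut([]byte("key"), []byte("foo"), []byte("bar"))
	tx.Unlock()

	// ConcurrentReadTx copies the read buffer and pins the boltdb read Tx via txWg.Add(1).
	rt := b.ConcurrentReadTx()
	rt.RLock() // no-op for concurrentReadTx, kept to satisfy the ReadTx interface
	keys, vals := rt.UnsafeRange([]byte("key"), []byte("foo"), nil, 0)
	rt.RUnlock() // calls txWg.Done(), allowing waitAndRollback to release the boltdb Tx

	if len(keys) != 1 || string(vals[0]) != "bar" {
		t.Fatalf("unexpected range result: keys=%v, vals=%v", keys, vals)
	}

	// A commit issued while concurrent reads are in flight must not block:
	// unsafeCommit rolls the old boltdb read Tx back in a background goroutine.
	b.ForceCommit()
}

The same txWg handoff is what lets a long store read (readTx1 in TestConcurrentReadAndWrite above) keep its boltdb snapshot alive across later commits without stalling writers.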