From 119134b59db03172a9b3022bc240d653a5966add Mon Sep 17 00:00:00 2001 From: Jingyi Hu Date: Mon, 10 Jun 2019 21:26:04 -0700 Subject: [PATCH] mvcc: add TestConcurrentReadTxAndWrite Add TestConcurrentReadTxAndWrite which creates random reads and writes, and ensures reads always see latest writes. --- mvcc/kvstore_test.go | 113 ++++++++++++++++++++++++++++++++++++++++++- 1 file changed, 112 insertions(+), 1 deletion(-) diff --git a/mvcc/kvstore_test.go b/mvcc/kvstore_test.go index 27d72b63d51b..7f97197548bc 100644 --- a/mvcc/kvstore_test.go +++ b/mvcc/kvstore_test.go @@ -15,6 +15,7 @@ package mvcc import ( + "bytes" "crypto/rand" "encoding/binary" "fmt" @@ -22,6 +23,8 @@ import ( mrand "math/rand" "os" "reflect" + "sort" + "strconv" "sync" "testing" "time" @@ -645,7 +648,8 @@ func TestTxnPut(t *testing.T) { } } -func TestConcurrentReadAndWrite(t *testing.T) { +// TestConcurrentReadNotBlockingWrite ensures Read does not blocking Write after its creation +func TestConcurrentReadNotBlockingWrite(t *testing.T) { b, tmpPath := backend.NewDefaultTmpBackend() s := NewStore(zap.NewExample(), b, &lease.FakeLessor{}, nil) defer os.Remove(tmpPath) @@ -706,6 +710,113 @@ func TestConcurrentReadAndWrite(t *testing.T) { readTx1.End() } +type kv struct { + key []byte + val []byte +} + +type kvs []kv + +func (kvs kvs) Len() int { return len(kvs) } +func (kvs kvs) Less(i, j int) bool { return bytes.Compare(kvs[i].key, kvs[j].key) < 0 } +func (kvs kvs) Swap(i, j int) { kvs[i], kvs[j] = kvs[j], kvs[i] } + +// TestConcurrentReadTxAndWrite creates random concurrent Reads and Writes, and ensures Reads always see latest Writes +func TestConcurrentReadTxAndWrite(t *testing.T) { + var ( + numOfReads = 1000 + numOfWrites = 1000 + maxNumOfPutsPerWrite = 100 + committedKVs kvs // committedKVs records the key-value pairs written by the finished Write Txns + mu sync.Mutex // mu protectes wKVs + ) + b, tmpPath := backend.NewDefaultTmpBackend() + s := NewStore(zap.NewExample(), b, &lease.FakeLessor{}, nil) + defer os.Remove(tmpPath) + + var wg sync.WaitGroup + wg.Add(numOfWrites) + for i := 0; i < numOfWrites; i++ { + go func() { + defer wg.Done() + time.Sleep(time.Duration(mrand.Intn(100)) * time.Millisecond) // random starting time + + tx := s.Write() + numOfPuts := mrand.Intn(maxNumOfPutsPerWrite) + 1 + var pendingKvs kvs + for j := 0; j < numOfPuts; j++ { + k := []byte(strconv.Itoa(mrand.Int())) + v := []byte(strconv.Itoa(mrand.Int())) + tx.Put(k, v, lease.NoLease) + pendingKvs = append(pendingKvs, kv{k, v}) + } + // reads should not see above Puts until write is finished + mu.Lock() + committedKVs = merge(committedKVs, pendingKvs) // update shared data structure + tx.End() + mu.Unlock() + }() + } + + wg.Add(numOfReads) + for i := 0; i < numOfReads; i++ { + go func() { + defer wg.Done() + time.Sleep(time.Duration(mrand.Intn(100)) * time.Millisecond) // random starting time + + mu.Lock() + wKVs := make(kvs, len(committedKVs)) + copy(wKVs, committedKVs) + tx := s.Read() + mu.Unlock() + // get all keys in backend store, and compare with wKVs + ret, err := tx.Range([]byte("\x00000000"), []byte("\xffffffff"), RangeOptions{}) + tx.End() + if err != nil { + t.Errorf("failed to range keys: %v", err) + return + } + if len(wKVs) == 0 && len(ret.KVs) == 0 { // no committed KVs yet + return + } + var result kvs + for _, keyValue := range ret.KVs { + result = append(result, kv{keyValue.Key, keyValue.Value}) + } + if !reflect.DeepEqual(wKVs, result) { + t.Errorf("unexpected range result") // too many key value pairs, skip printing them + } + }() + } + + // wait until go routines finish or timeout + doneC := make(chan struct{}) + go func() { + wg.Wait() + close(doneC) + }() + select { + case <-doneC: + case <-time.After(5 * time.Minute): + testutil.FatalStack(t, "timeout") + } +} + +func merge(dst, src kvs) kvs { + dst = append(dst, src...) + sort.Stable(dst) + // remove duplicates, using only the newest value + // ref: tx_buffer.go + widx := 0 + for ridx := 1; ridx < len(dst); ridx++ { + if !bytes.Equal(dst[widx].key, dst[ridx].key) { + widx++ + } + dst[widx] = dst[ridx] + } + return dst[:widx+1] +} + // TODO: test attach key to lessor func newTestRevBytes(rev revision) []byte {