Skip to content

Commit

Permalink
mvcc: add TestConcurrentReadTxAndWrite
Browse files Browse the repository at this point in the history
Add TestConcurrentReadTxAndWrite which creates random reads and writes,
and ensures reads always see latest writes.
  • Loading branch information
jingyih committed Jun 11, 2019
1 parent 693afd8 commit 119134b
Showing 1 changed file with 112 additions and 1 deletion.
113 changes: 112 additions & 1 deletion mvcc/kvstore_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,13 +15,16 @@
package mvcc

import (
"bytes"
"crypto/rand"
"encoding/binary"
"fmt"
"math"
mrand "math/rand"
"os"
"reflect"
"sort"
"strconv"
"sync"
"testing"
"time"
Expand Down Expand Up @@ -645,7 +648,8 @@ func TestTxnPut(t *testing.T) {
}
}

func TestConcurrentReadAndWrite(t *testing.T) {
// TestConcurrentReadNotBlockingWrite ensures Read does not blocking Write after its creation
func TestConcurrentReadNotBlockingWrite(t *testing.T) {
b, tmpPath := backend.NewDefaultTmpBackend()
s := NewStore(zap.NewExample(), b, &lease.FakeLessor{}, nil)
defer os.Remove(tmpPath)
Expand Down Expand Up @@ -706,6 +710,113 @@ func TestConcurrentReadAndWrite(t *testing.T) {
readTx1.End()
}

type kv struct {
key []byte
val []byte
}

type kvs []kv

func (kvs kvs) Len() int { return len(kvs) }
func (kvs kvs) Less(i, j int) bool { return bytes.Compare(kvs[i].key, kvs[j].key) < 0 }
func (kvs kvs) Swap(i, j int) { kvs[i], kvs[j] = kvs[j], kvs[i] }

// TestConcurrentReadTxAndWrite creates random concurrent Reads and Writes, and ensures Reads always see latest Writes
func TestConcurrentReadTxAndWrite(t *testing.T) {
var (
numOfReads = 1000
numOfWrites = 1000
maxNumOfPutsPerWrite = 100
committedKVs kvs // committedKVs records the key-value pairs written by the finished Write Txns
mu sync.Mutex // mu protectes wKVs
)
b, tmpPath := backend.NewDefaultTmpBackend()
s := NewStore(zap.NewExample(), b, &lease.FakeLessor{}, nil)
defer os.Remove(tmpPath)

var wg sync.WaitGroup
wg.Add(numOfWrites)
for i := 0; i < numOfWrites; i++ {
go func() {
defer wg.Done()
time.Sleep(time.Duration(mrand.Intn(100)) * time.Millisecond) // random starting time

tx := s.Write()
numOfPuts := mrand.Intn(maxNumOfPutsPerWrite) + 1
var pendingKvs kvs
for j := 0; j < numOfPuts; j++ {
k := []byte(strconv.Itoa(mrand.Int()))
v := []byte(strconv.Itoa(mrand.Int()))
tx.Put(k, v, lease.NoLease)
pendingKvs = append(pendingKvs, kv{k, v})
}
// reads should not see above Puts until write is finished
mu.Lock()
committedKVs = merge(committedKVs, pendingKvs) // update shared data structure
tx.End()
mu.Unlock()
}()
}

wg.Add(numOfReads)
for i := 0; i < numOfReads; i++ {
go func() {
defer wg.Done()
time.Sleep(time.Duration(mrand.Intn(100)) * time.Millisecond) // random starting time

mu.Lock()
wKVs := make(kvs, len(committedKVs))
copy(wKVs, committedKVs)
tx := s.Read()
mu.Unlock()
// get all keys in backend store, and compare with wKVs
ret, err := tx.Range([]byte("\x00000000"), []byte("\xffffffff"), RangeOptions{})
tx.End()
if err != nil {
t.Errorf("failed to range keys: %v", err)
return
}
if len(wKVs) == 0 && len(ret.KVs) == 0 { // no committed KVs yet
return
}
var result kvs
for _, keyValue := range ret.KVs {
result = append(result, kv{keyValue.Key, keyValue.Value})
}
if !reflect.DeepEqual(wKVs, result) {
t.Errorf("unexpected range result") // too many key value pairs, skip printing them
}
}()
}

// wait until go routines finish or timeout
doneC := make(chan struct{})
go func() {
wg.Wait()
close(doneC)
}()
select {
case <-doneC:
case <-time.After(5 * time.Minute):
testutil.FatalStack(t, "timeout")
}
}

func merge(dst, src kvs) kvs {
dst = append(dst, src...)
sort.Stable(dst)
// remove duplicates, using only the newest value
// ref: tx_buffer.go
widx := 0
for ridx := 1; ridx < len(dst); ridx++ {
if !bytes.Equal(dst[widx].key, dst[ridx].key) {
widx++
}
dst[widx] = dst[ridx]
}
return dst[:widx+1]
}

// TODO: test attach key to lessor

func newTestRevBytes(rev revision) []byte {
Expand Down

0 comments on commit 119134b

Please sign in to comment.