Skip to content

Commit

Permalink
statistics: add more test for hot statistics (#4499)
Browse files Browse the repository at this point in the history
* add more test

Signed-off-by: lhy1024 <[email protected]>

* ref #4399

Signed-off-by: lhy1024 <[email protected]>

* add test for cool down transfer leader

Signed-off-by: lhy1024 <[email protected]>

* address comment

Signed-off-by: lhy1024 <[email protected]>

* address comment

Signed-off-by: lhy1024 <[email protected]>

Co-authored-by: Ti Chi Robot <[email protected]>
  • Loading branch information
lhy1024 and ti-chi-bot authored Jan 10, 2022
1 parent 6f8edf2 commit cfb909d
Showing 1 changed file with 116 additions and 55 deletions.
171 changes: 116 additions & 55 deletions server/statistics/hot_peer_cache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,12 @@ package statistics

import (
"math/rand"
"sort"
"testing"
"time"

. "github.com/pingcap/check"
"github.com/pingcap/kvproto/pkg/metapb"
"github.com/pingcap/kvproto/pkg/pdpb"
"github.com/tikv/pd/server/core"
)

Expand All @@ -31,23 +31,9 @@ type testHotPeerCache struct{}

func (t *testHotPeerCache) TestStoreTimeUnsync(c *C) {
cache := NewHotPeerCache(Write)
peers := newPeers(3,
func(i int) uint64 { return uint64(10000 + i) },
func(i int) uint64 { return uint64(i) })
meta := &metapb.Region{
Id: 1000,
Peers: peers,
StartKey: []byte(""),
EndKey: []byte(""),
RegionEpoch: &metapb.RegionEpoch{ConfVer: 6, Version: 6},
}
intervals := []uint64{120, 60}
for _, interval := range intervals {
region := core.NewRegionInfo(meta, peers[0],
// interval is [0, interval]
core.SetReportInterval(interval),
core.SetWrittenBytes(interval*100*1024))

region := buildRegion(Write, 3, interval)
checkAndUpdate(c, cache, region, 3)
{
stats := cache.RegionStats(0)
Expand Down Expand Up @@ -191,6 +177,36 @@ func checkOp(c *C, ret []*HotPeerStat, storeID uint64, actionType ActionType) {
}
}

// checkIntervalSum checks whether the interval sum of the peers are different.
func checkIntervalSum(cache *hotPeerCache, region *core.RegionInfo) bool {
var intervalSums []int
for _, peer := range region.GetPeers() {
oldItem := cache.getOldHotPeerStat(region.GetID(), peer.StoreId)
if oldItem != nil {
intervalSums = append(intervalSums, int(oldItem.getIntervalSum()))
}
}
sort.Ints(intervalSums)
return intervalSums[0] != intervalSums[len(intervalSums)-1]
}

// checkIntervalSumContinuous checks whether the interval sum of the peer is continuous.
func checkIntervalSumContinuous(c *C, intervalSums map[uint64]int, rets []*HotPeerStat, interval uint64) {
for _, ret := range rets {
if ret.actionType == Remove {
delete(intervalSums, ret.StoreID)
continue
}
new := int(ret.getIntervalSum() / 1000000000)
if ret.source == direct {
if old, ok := intervalSums[ret.StoreID]; ok {
c.Assert(new, Equals, (old+int(interval))%RegionHeartBeatReportInterval)
}
}
intervalSums[ret.StoreID] = new
}
}

func schedule(c *C, operator operator, region *core.RegionInfo, targets ...uint64) (srcStore uint64, _ *core.RegionInfo) {
switch operator {
case transferLeader:
Expand Down Expand Up @@ -392,29 +408,28 @@ func (t *testHotPeerCache) testMetrics(c *C, interval, byteRate, expectThreshold
}

func (t *testHotPeerCache) TestRemoveFromCache(c *C) {
peerCount := 3
interval := uint64(5)
checkers := []check{checkAndUpdate, checkAndUpdateWithOrdering}
for _, checker := range checkers {
cache := NewHotPeerCache(Write)
region := buildRegion(Write, 3, 5)
region := buildRegion(Write, peerCount, interval)
// prepare
intervalSums := make(map[uint64]int)
for i := 1; i <= 200; i++ {
checker(c, cache, region)
rets := checker(c, cache, region)
checkIntervalSumContinuous(c, intervalSums, rets, interval)
}
// make the interval sum of peers are different
checkAndUpdateSkipOne(c, cache, region)
var intervalSums []int
for _, peer := range region.GetPeers() {
oldItem := cache.getOldHotPeerStat(region.GetID(), peer.StoreId)
intervalSums = append(intervalSums, int(oldItem.getIntervalSum()))
}
c.Assert(intervalSums, HasLen, 3)
c.Assert(intervalSums[0], Not(Equals), intervalSums[1])
c.Assert(intervalSums[0], Not(Equals), intervalSums[2])
checkIntervalSum(cache, region)
// check whether cold cache is cleared
var isClear bool
intervalSums = make(map[uint64]int)
region = region.Clone(core.SetWrittenBytes(0), core.SetWrittenKeys(0), core.SetWrittenQuery(0))
for i := 1; i <= 200; i++ {
checker(c, cache, region)
rets := checker(c, cache, region)
checkIntervalSumContinuous(c, intervalSums, rets, interval)
if len(cache.storesOfRegion[region.GetID()]) == 0 {
isClear = true
break
Expand All @@ -435,28 +450,38 @@ func (t *testHotPeerCache) TestRemoveFromCacheRandom(c *C) {
region := buildRegion(Write, peerCount, interval)

target := uint64(10)
movePeer := func() {
intervalSums := make(map[uint64]int)
step := func(i int) {
tmp := uint64(0)
tmp, region = schedule(c, removeReplica, region)
_, region = schedule(c, addReplica, region, target)
target = tmp
if i%5 == 0 {
tmp, region = schedule(c, removeReplica, region)
}
rets := checker(c, cache, region)
checkIntervalSumContinuous(c, intervalSums, rets, interval)
if i%5 == 0 {
_, region = schedule(c, addReplica, region, target)
target = tmp
}
}

// prepare with random move peer to make the interval sum of peers are different
for i := 1; i <= 200; i++ {
if i%5 == 0 {
movePeer()
for i := 1; i < 150; i++ {
step(i)
if i > 150 && checkIntervalSum(cache, region) {
break
}
checker(c, cache, region)
}
if interval < RegionHeartBeatReportInterval {
c.Assert(checkIntervalSum(cache, region), IsTrue)
}
c.Assert(cache.storesOfRegion[region.GetID()], HasLen, peerCount)

// check whether cold cache is cleared
var isClear bool
intervalSums = make(map[uint64]int)
region = region.Clone(core.SetWrittenBytes(0), core.SetWrittenKeys(0), core.SetWrittenQuery(0))
for i := 1; i <= 200; i++ {
if i%5 == 0 {
movePeer()
}
checker(c, cache, region)
for i := 1; i < 200; i++ {
step(i)
if len(cache.storesOfRegion[region.GetID()]) == 0 {
isClear = true
break
Expand All @@ -468,6 +493,55 @@ func (t *testHotPeerCache) TestRemoveFromCacheRandom(c *C) {
}
}

func checkCoolDown(c *C, cache *hotPeerCache, region *core.RegionInfo, expect bool) {
item := cache.getOldHotPeerStat(region.GetID(), region.GetLeader().GetStoreId())
c.Assert(item.IsNeedCoolDownTransferLeader(3), Equals, expect)
}

func (t *testHotPeerCache) TestCoolDownTransferLeader(c *C) {
cache := NewHotPeerCache(Read)
region := buildRegion(Read, 3, 60)

moveLeader := func() {
_, region = schedule(c, movePeer, region, 10)
checkAndUpdate(c, cache, region)
checkCoolDown(c, cache, region, false)
_, region = schedule(c, transferLeader, region, 10)
checkAndUpdate(c, cache, region)
checkCoolDown(c, cache, region, true)
}
transferLeader := func() {
_, region = schedule(c, transferLeader, region)
checkAndUpdate(c, cache, region)
checkCoolDown(c, cache, region, true)
}
movePeer := func() {
_, region = schedule(c, movePeer, region, 10)
checkAndUpdate(c, cache, region)
checkCoolDown(c, cache, region, false)
}
addReplica := func() {
_, region = schedule(c, addReplica, region, 10)
checkAndUpdate(c, cache, region)
checkCoolDown(c, cache, region, false)
}
removeReplica := func() {
_, region = schedule(c, removeReplica, region, 10)
checkAndUpdate(c, cache, region)
checkCoolDown(c, cache, region, false)
}
cases := []func(){moveLeader, transferLeader, movePeer, addReplica, removeReplica}
for _, runCase := range cases {
cache = NewHotPeerCache(Read)
region = buildRegion(Read, 3, 60)
for i := 1; i <= 200; i++ {
checkAndUpdate(c, cache, region)
}
checkCoolDown(c, cache, region, false)
runCase()
}
}

// See issue #4510
func (t *testHotPeerCache) TestCacheInherit(c *C) {
cache := NewHotPeerCache(Read)
Expand Down Expand Up @@ -509,22 +583,9 @@ func (t *testHotPeerCache) TestCacheInherit(c *C) {

func BenchmarkCheckRegionFlow(b *testing.B) {
cache := NewHotPeerCache(Read)
region := core.NewRegionInfo(&metapb.Region{
Id: 1,
Peers: []*metapb.Peer{
{Id: 101, StoreId: 1},
{Id: 102, StoreId: 2},
{Id: 103, StoreId: 3},
},
},
&metapb.Peer{Id: 101, StoreId: 1},
)
newRegion := region.Clone(
core.WithInterval(&pdpb.TimeInterval{StartTimestamp: 0, EndTimestamp: 10}),
core.SetReadBytes(30000*10),
core.SetReadKeys(300000*10))
region := buildRegion(Read, 3, 10)
peerInfos := make([]*core.PeerInfo, 0)
for _, peer := range newRegion.GetPeers() {
for _, peer := range region.GetPeers() {
peerInfo := core.NewPeerInfo(peer, region.GetLoads(), 10)
peerInfos = append(peerInfos, peerInfo)
}
Expand Down

0 comments on commit cfb909d

Please sign in to comment.