diff --git a/tools/pd-heartbeat-bench/main.go b/tools/pd-heartbeat-bench/main.go index 72a58fdd3da..1ae8505e864 100644 --- a/tools/pd-heartbeat-bench/main.go +++ b/tools/pd-heartbeat-bench/main.go @@ -19,6 +19,7 @@ import ( "flag" "fmt" "log" + "math/rand" "time" "github.com/pingcap/kvproto/pkg/metapb" @@ -29,13 +30,22 @@ import ( var ( pdAddr = flag.String("pd", "127.0.0.1:2379", "pd address") - storeCount = flag.Int("store", 20, "store count") - regionCount = flag.Uint64("region", 1000000, "region count") - keyLen = flag.Int("keylen", 56, "key length") + storeCount = flag.Int("store", 40, "store count") + regionCount = flag.Int("region", 1000000, "region count") + keyLen = flag.Int("key-len", 56, "key length") replica = flag.Int("replica", 3, "replica count") - regionUpdateRatio = flag.Float64("region-update-ratio", 0.05, "ratio of the region need to update") + leaderUpdateRatio = flag.Float64("leader", 0.06, "ratio of the region leader need to update, they need save-tree") + epochUpdateRatio = flag.Float64("epoch", 0.04, "ratio of the region epoch need to update, they need save-kv") + spaceUpdateRatio = flag.Float64("space", 0.15, "ratio of the region space need to update") + flowUpdateRatio = flag.Float64("flow", 0.35, "ratio of the region flow need to update") sample = flag.Bool("sample", false, "sample per second") - heartbeatRounds = flag.Int("heartbeat-rounds", 5, "total rounds of heartbeat") + heartbeatRounds = flag.Int("heartbeat-rounds", 4, "total rounds of heartbeat") +) + +const ( + bytesUnit = 1 << 23 // 8MB + keysUint = 1 << 13 // 8K + intervalUint = 60 // 60s ) var clusterID uint64 @@ -136,61 +146,168 @@ func newEndKey(id uint64) []byte { return k } -// Store simulates a TiKV to heartbeat. -type Store struct { - id uint64 +// Regions simulates all regions to heartbeat. +type Regions struct { + regions []*pdpb.RegionHeartbeatRequest + + updateRound int + + updateLeader []int + updateEpoch []int + updateSpace []int + updateFlow []int +} + +func (rs *Regions) init() { + rs.regions = make([]*pdpb.RegionHeartbeatRequest, 0, *regionCount) + rs.updateRound = 0 + + // Generate regions + id := uint64(1) + now := uint64(time.Now().Unix()) + + for i := 0; i < *regionCount; i++ { + region := &pdpb.RegionHeartbeatRequest{ + Header: header(), + Region: &metapb.Region{ + Id: id, + StartKey: newStartKey(id), + EndKey: newEndKey(id), + RegionEpoch: &metapb.RegionEpoch{ConfVer: 2, Version: 1}, + }, + ApproximateSize: bytesUnit, + Interval: &pdpb.TimeInterval{ + StartTimestamp: now, + EndTimestamp: now + intervalUint, + }, + ApproximateKeys: keysUint, + Term: 1, + } + id += 1 + + peers := make([]*metapb.Peer, 0, *replica) + for j := 0; j < *replica; j++ { + peers = append(peers, &metapb.Peer{Id: id, StoreId: uint64((i+j)%*storeCount + 1)}) + id += 1 + } + + region.Region.Peers = peers + region.Leader = peers[0] + rs.regions = append(rs.regions, region) + } + + // Generate sample index + slice := make([]int, *regionCount) + for i := range slice { + slice[i] = i + } + + rand.Seed(0) // Ensure consistent behavior multiple times + pick := func(ratio float64) []int { + rand.Shuffle(*regionCount, func(i, j int) { + slice[i], slice[j] = slice[j], slice[i] + }) + return append(slice[:0:0], slice[0:int(float64(*regionCount)*ratio)]...) + } + + rs.updateLeader = pick(*leaderUpdateRatio) + rs.updateEpoch = pick(*epochUpdateRatio) + rs.updateSpace = pick(*spaceUpdateRatio) + rs.updateFlow = pick(*flowUpdateRatio) +} + +func (rs *Regions) update() { + rs.updateRound += 1 + + // update leader + for _, i := range rs.updateLeader { + region := rs.regions[i] + region.Leader = region.Region.Peers[rs.updateRound%*replica] + } + // update epoch + for _, i := range rs.updateEpoch { + region := rs.regions[i] + region.Region.RegionEpoch.Version += 1 + } + // update space + for _, i := range rs.updateSpace { + region := rs.regions[i] + region.ApproximateSize += bytesUnit + region.ApproximateKeys += keysUint + } + // update flow + for _, i := range rs.updateFlow { + region := rs.regions[i] + region.BytesWritten += bytesUnit + region.BytesRead += bytesUnit + region.KeysWritten += keysUint + region.KeysRead += keysUint + } + // update interval + for _, region := range rs.regions { + region.Interval.StartTimestamp = region.Interval.EndTimestamp + region.Interval.EndTimestamp = region.Interval.StartTimestamp + intervalUint + } } -// Run runs the store. -func (s *Store) Run(startNotifier chan report.Report, endNotifier chan struct{}) { +func (rs *Regions) send(storeID uint64, startNotifier chan report.Report, endNotifier chan struct{}) { cli := newClient() stream, err := cli.RegionHeartbeat(context.TODO()) if err != nil { log.Fatal(err) } - var peers []*metapb.Peer - for i := 0; i < *replica; i++ { - storeID := s.id + uint64(i) - if storeID > uint64(*storeCount) { - storeID -= uint64(*storeCount) - } - peers = append(peers, &metapb.Peer{Id: uint64(i + 1), StoreId: storeID}) - } - count := 1 for r := range startNotifier { startTime := time.Now() - for regionID := s.id; regionID <= *regionCount+uint64(*storeCount); regionID += uint64(*storeCount) { - updateRegionCount := uint64(float64(*regionCount) * (*regionUpdateRatio) / float64(*storeCount)) - storeUpdateRegionMaxID := s.id + updateRegionCount*uint64(*storeCount) - meta := &metapb.Region{ - Id: regionID, - Peers: peers, - RegionEpoch: &metapb.RegionEpoch{ConfVer: 2, Version: 1}, - StartKey: newStartKey(regionID), - EndKey: newEndKey(regionID), - } - if regionID < storeUpdateRegionMaxID { - meta.RegionEpoch.Version = uint64(count) + count := 0 + for _, region := range rs.regions { + if region.Leader.StoreId != storeID { + continue } + count += 1 reqStart := time.Now() - err = stream.Send(&pdpb.RegionHeartbeatRequest{ - Header: header(), - Region: meta, - Leader: peers[0], - }) - + err = stream.Send(region) r.Results() <- report.Result{Start: reqStart, End: time.Now(), Err: err} if err != nil { log.Fatal(err) } } - log.Printf("store %v finish heartbeat, cost time: %v", s.id, time.Since(startTime)) - count++ + log.Printf("store %v finish heartbeat, count: %v, cost time: %v", storeID, count, time.Since(startTime)) endNotifier <- struct{}{} } } +func (rs *Regions) result(sec float64) string { + if rs.updateRound == 0 { + // There was no difference in the first round + return "" + } + + updated := make(map[int]struct{}) + for _, i := range rs.updateLeader { + updated[i] = struct{}{} + } + for _, i := range rs.updateEpoch { + updated[i] = struct{}{} + } + for _, i := range rs.updateSpace { + updated[i] = struct{}{} + } + for _, i := range rs.updateFlow { + updated[i] = struct{}{} + } + inactiveCount := *regionCount - len(updated) + + ret := "Update speed of each category:\n" + ret += fmt.Sprintf(" Requests/sec: %12.4f\n", float64(*regionCount)/sec) + ret += fmt.Sprintf(" Save-Tree/sec: %12.4f\n", float64(len(rs.updateLeader))/sec) + ret += fmt.Sprintf(" Save-KV/sec: %12.4f\n", float64(len(rs.updateEpoch))/sec) + ret += fmt.Sprintf(" Save-Space/sec: %12.4f\n", float64(len(rs.updateSpace))/sec) + ret += fmt.Sprintf(" Save-Flow/sec: %12.4f\n", float64(len(rs.updateFlow))/sec) + ret += fmt.Sprintf(" Skip/sec: %12.4f\n", float64(inactiveCount)/sec) + return ret +} + func main() { log.SetFlags(0) flag.Parse() @@ -203,30 +320,35 @@ func main() { log.Println("finish put stores") groupStartNotify := make([]chan report.Report, *storeCount+1) groupEndNotify := make([]chan struct{}, *storeCount+1) + regions := new(Regions) + regions.init() + for i := 1; i <= *storeCount; i++ { - s := Store{id: uint64(i)} startNotifier := make(chan report.Report) endNotifier := make(chan struct{}) groupStartNotify[i] = startNotifier groupEndNotify[i] = endNotifier - go s.Run(startNotifier, endNotifier) + go regions.send(uint64(i), startNotifier, endNotifier) } for i := 0; i < *heartbeatRounds; i++ { log.Printf("\n--------- Bench heartbeat (Round %d) ----------\n", i+1) - report := newReport() - rs := report.Run() + repo := newReport() + rs := repo.Run() // All stores start heartbeat. + startTime := time.Now() for storeID := 1; storeID <= *storeCount; storeID++ { startNotifier := groupStartNotify[storeID] - startNotifier <- report + startNotifier <- repo } // All stores finished heartbeat once. for storeID := 1; storeID <= *storeCount; storeID++ { <-groupEndNotify[storeID] } - - close(report.Results()) + since := time.Since(startTime).Seconds() + close(repo.Results()) log.Println(<-rs) + log.Println(regions.result(since)) + regions.update() } }