diff --git a/pkg/mock/mockcluster/mockcluster.go b/pkg/mock/mockcluster/mockcluster.go index 896e3606d7c..86169913ba5 100644 --- a/pkg/mock/mockcluster/mockcluster.go +++ b/pkg/mock/mockcluster/mockcluster.go @@ -67,6 +67,11 @@ func (mc *Cluster) LoadRegion(regionID uint64, followerIds ...uint64) { mc.PutRegion(r) } +// GetStoresStats gets stores statistics. +func (mc *Cluster) GetStoresStats() *statistics.StoresStats { + return mc.StoresStats +} + // GetStoreRegionCount gets region count with a given store. func (mc *Cluster) GetStoreRegionCount(storeID uint64) int { return mc.Regions.GetStoreRegionCount(storeID) @@ -363,6 +368,7 @@ func (mc *Cluster) UpdateStorageWrittenBytes(storeID uint64, bytesWritten uint64 interval := &pdpb.TimeInterval{StartTimestamp: uint64(now - statistics.StoreHeartBeatReportInterval), EndTimestamp: uint64(now)} newStats.Interval = interval newStore := store.Clone(core.SetStoreStats(newStats)) + mc.Set(storeID, newStats) mc.PutStore(newStore) } @@ -375,6 +381,7 @@ func (mc *Cluster) UpdateStorageReadBytes(storeID uint64, bytesRead uint64) { interval := &pdpb.TimeInterval{StartTimestamp: uint64(now - statistics.StoreHeartBeatReportInterval), EndTimestamp: uint64(now)} newStats.Interval = interval newStore := store.Clone(core.SetStoreStats(newStats)) + mc.Set(storeID, newStats) mc.PutStore(newStore) } diff --git a/server/api/trend.go b/server/api/trend.go index dc2db77d548..35fb73107e0 100644 --- a/server/api/trend.go +++ b/server/api/trend.go @@ -106,7 +106,7 @@ func (h *trendHandler) Handle(w http.ResponseWriter, r *http.Request) { } func (h *trendHandler) getTrendStores() ([]trendStore, error) { - var readStats, writeStats statistics.StoreHotRegionsStat + var readStats, writeStats statistics.StoreHotPeersStat if hotRead := h.GetHotReadRegions(); hotRead != nil { readStats = hotRead.AsLeader } @@ -140,13 +140,13 @@ func (h *trendHandler) getTrendStores() ([]trendStore, error) { return trendStores, nil } -func (h *trendHandler) getStoreFlow(stats statistics.StoreHotRegionsStat, storeID uint64) (storeFlow float64, regionFlows []float64) { +func (h *trendHandler) getStoreFlow(stats statistics.StoreHotPeersStat, storeID uint64) (storeFlow float64, regionFlows []float64) { if stats == nil { return } if stat, ok := stats[storeID]; ok { storeFlow = stat.TotalBytesRate - for _, flow := range stat.RegionsStat { + for _, flow := range stat.Stats { regionFlows = append(regionFlows, flow.GetBytesRate()) } } diff --git a/server/coordinator.go b/server/coordinator.go index cd9436e4e49..787dfd1a6d6 100644 --- a/server/coordinator.go +++ b/server/coordinator.go @@ -260,11 +260,11 @@ func (c *coordinator) stop() { // Hack to retrieve info from scheduler. // TODO: remove it. type hasHotStatus interface { - GetHotReadStatus() *statistics.StoreHotRegionInfos - GetHotWriteStatus() *statistics.StoreHotRegionInfos + GetHotReadStatus() *statistics.StoreHotPeersInfos + GetHotWriteStatus() *statistics.StoreHotPeersInfos } -func (c *coordinator) getHotWriteRegions() *statistics.StoreHotRegionInfos { +func (c *coordinator) getHotWriteRegions() *statistics.StoreHotPeersInfos { c.RLock() defer c.RUnlock() s, ok := c.schedulers[hotRegionScheduleName] @@ -277,7 +277,7 @@ func (c *coordinator) getHotWriteRegions() *statistics.StoreHotRegionInfos { return nil } -func (c *coordinator) getHotReadRegions() *statistics.StoreHotRegionInfos { +func (c *coordinator) getHotReadRegions() *statistics.StoreHotPeersInfos { c.RLock() defer c.RUnlock() s, ok := c.schedulers[hotRegionScheduleName] @@ -336,7 +336,7 @@ func (c *coordinator) collectHotSpotMetrics() { stat, ok := status.AsPeer[storeID] if ok { hotSpotStatusGauge.WithLabelValues(storeAddress, storeLabel, "total_written_bytes_as_peer").Set(stat.TotalBytesRate) - hotSpotStatusGauge.WithLabelValues(storeAddress, storeLabel, "hot_write_region_as_peer").Set(float64(stat.RegionsCount)) + hotSpotStatusGauge.WithLabelValues(storeAddress, storeLabel, "hot_write_region_as_peer").Set(float64(stat.Count)) } else { hotSpotStatusGauge.WithLabelValues(storeAddress, storeLabel, "total_written_bytes_as_peer").Set(0) hotSpotStatusGauge.WithLabelValues(storeAddress, storeLabel, "hot_write_region_as_peer").Set(0) @@ -345,7 +345,7 @@ func (c *coordinator) collectHotSpotMetrics() { stat, ok = status.AsLeader[storeID] if ok { hotSpotStatusGauge.WithLabelValues(storeAddress, storeLabel, "total_written_bytes_as_leader").Set(stat.TotalBytesRate) - hotSpotStatusGauge.WithLabelValues(storeAddress, storeLabel, "hot_write_region_as_leader").Set(float64(stat.RegionsCount)) + hotSpotStatusGauge.WithLabelValues(storeAddress, storeLabel, "hot_write_region_as_leader").Set(float64(stat.Count)) } else { hotSpotStatusGauge.WithLabelValues(storeAddress, storeLabel, "total_written_bytes_as_leader").Set(0) hotSpotStatusGauge.WithLabelValues(storeAddress, storeLabel, "hot_write_region_as_leader").Set(0) @@ -361,7 +361,7 @@ func (c *coordinator) collectHotSpotMetrics() { stat, ok := status.AsLeader[storeID] if ok { hotSpotStatusGauge.WithLabelValues(storeAddress, storeLabel, "total_read_bytes_as_leader").Set(stat.TotalBytesRate) - hotSpotStatusGauge.WithLabelValues(storeAddress, storeLabel, "hot_read_region_as_leader").Set(float64(stat.RegionsCount)) + hotSpotStatusGauge.WithLabelValues(storeAddress, storeLabel, "hot_read_region_as_leader").Set(float64(stat.Count)) } else { hotSpotStatusGauge.WithLabelValues(storeAddress, storeLabel, "total_read_bytes_as_leader").Set(0) hotSpotStatusGauge.WithLabelValues(storeAddress, storeLabel, "hot_read_region_as_leader").Set(0) diff --git a/server/handler.go b/server/handler.go index 1102f464c70..a16d63b4597 100644 --- a/server/handler.go +++ b/server/handler.go @@ -126,7 +126,7 @@ func (h *Handler) GetStores() ([]*core.StoreInfo, error) { } // GetHotWriteRegions gets all hot write regions stats. -func (h *Handler) GetHotWriteRegions() *statistics.StoreHotRegionInfos { +func (h *Handler) GetHotWriteRegions() *statistics.StoreHotPeersInfos { c, err := h.getCoordinator() if err != nil { return nil @@ -135,7 +135,7 @@ func (h *Handler) GetHotWriteRegions() *statistics.StoreHotRegionInfos { } // GetHotReadRegions gets all hot read regions stats. -func (h *Handler) GetHotReadRegions() *statistics.StoreHotRegionInfos { +func (h *Handler) GetHotReadRegions() *statistics.StoreHotPeersInfos { c, err := h.getCoordinator() if err != nil { return nil diff --git a/server/schedule/opt/opts.go b/server/schedule/opt/opts.go index b27777acb78..bead9004d07 100644 --- a/server/schedule/opt/opts.go +++ b/server/schedule/opt/opts.go @@ -78,6 +78,7 @@ type Cluster interface { core.StoreSetController statistics.RegionStatInformer + statistics.StoreStatInformer Options // TODO: it should be removed. Schedulers don't need to know anything diff --git a/server/schedulers/balance_test.go b/server/schedulers/balance_test.go index c1b68b3b297..58a21ffe187 100644 --- a/server/schedulers/balance_test.go +++ b/server/schedulers/balance_test.go @@ -233,7 +233,7 @@ func (s *testBalanceSpeedSuite) TestTolerantRatio(c *C) { tc := mockcluster.NewCluster(opt) // create a region to control average region size. tc.AddLeaderRegion(1, 1, 2) - regionSize := int64(96 * 1024) + regionSize := int64(96 * KB) region := tc.GetRegion(1).Clone(core.SetApproximateSize(regionSize)) tc.TolerantSizeRatio = 0 @@ -931,10 +931,10 @@ func (s *testReplicaCheckerSuite) TestStorageThreshold(c *C) { tc.AddLabelsStore(1, 1, map[string]string{"zone": "z1"}) tc.UpdateStorageRatio(1, 0.5, 0.5) - tc.UpdateStoreRegionSize(1, 500*1024*1024) + tc.UpdateStoreRegionSize(1, 500*MB) tc.AddLabelsStore(2, 1, map[string]string{"zone": "z1"}) tc.UpdateStorageRatio(2, 0.1, 0.9) - tc.UpdateStoreRegionSize(2, 100*1024*1024) + tc.UpdateStoreRegionSize(2, 100*MB) tc.AddLabelsStore(3, 1, map[string]string{"zone": "z2"}) tc.AddLabelsStore(4, 0, map[string]string{"zone": "z3"}) diff --git a/server/schedulers/hot_region.go b/server/schedulers/hot_region.go index f2052248ce4..04b1364c437 100644 --- a/server/schedulers/hot_region.go +++ b/server/schedulers/hot_region.go @@ -50,10 +50,10 @@ func init() { } const ( - hotRegionLimitFactor = 0.75 - storeHotRegionsDefaultLen = 100 - hotRegionScheduleFactor = 0.9 - balanceHotRegionName = "balance-hot-region-scheduler" + hotRegionLimitFactor = 0.75 + storeHotPeersDefaultLen = 100 + hotRegionScheduleFactor = 0.9 + balanceHotRegionName = "balance-hot-region-scheduler" ) // BalanceType : the perspective of balance @@ -65,16 +65,16 @@ const ( ) type storeStatistics struct { - readStatAsLeader statistics.StoreHotRegionsStat - writeStatAsPeer statistics.StoreHotRegionsStat - writeStatAsLeader statistics.StoreHotRegionsStat + readStatAsLeader statistics.StoreHotPeersStat + writeStatAsPeer statistics.StoreHotPeersStat + writeStatAsLeader statistics.StoreHotPeersStat } func newStoreStaticstics() *storeStatistics { return &storeStatistics{ - readStatAsLeader: make(statistics.StoreHotRegionsStat), - writeStatAsLeader: make(statistics.StoreHotRegionsStat), - writeStatAsPeer: make(statistics.StoreHotRegionsStat), + readStatAsLeader: make(statistics.StoreHotPeersStat), + writeStatAsLeader: make(statistics.StoreHotPeersStat), + writeStatAsPeer: make(statistics.StoreHotPeersStat), } } @@ -157,13 +157,14 @@ func (h *balanceHotRegionsScheduler) Schedule(cluster opt.Cluster) []*operator.O func (h *balanceHotRegionsScheduler) dispatch(typ BalanceType, cluster opt.Cluster) []*operator.Operator { h.Lock() defer h.Unlock() + storesStat := cluster.GetStoresStats() switch typ { case hotReadRegionBalance: - h.stats.readStatAsLeader = calcScore(cluster.RegionReadStats(), cluster, core.LeaderKind) + h.stats.readStatAsLeader = calcScore(cluster.RegionReadStats(), storesStat.GetStoresBytesReadStat(), cluster, core.LeaderKind) return h.balanceHotReadRegions(cluster) case hotWriteRegionBalance: - h.stats.writeStatAsLeader = calcScore(cluster.RegionWriteStats(), cluster, core.LeaderKind) - h.stats.writeStatAsPeer = calcScore(cluster.RegionWriteStats(), cluster, core.RegionKind) + h.stats.writeStatAsLeader = calcScore(cluster.RegionWriteStats(), storesStat.GetStoresBytesWriteStat(), cluster, core.LeaderKind) + h.stats.writeStatAsPeer = calcScore(cluster.RegionWriteStats(), storesStat.GetStoresBytesWriteStat(), cluster, core.RegionKind) return h.balanceHotWriteRegions(cluster) } return nil @@ -230,17 +231,24 @@ func (h *balanceHotRegionsScheduler) balanceHotWriteRegions(cluster opt.Cluster) return nil } -func calcScore(storeItems map[uint64][]*statistics.HotPeerStat, cluster opt.Cluster, kind core.ResourceKind) statistics.StoreHotRegionsStat { - stats := make(statistics.StoreHotRegionsStat) - for storeID, items := range storeItems { - // HotDegree is the update times on the hot cache. If the heartbeat report - // the flow of the region exceeds the threshold, the scheduler will update the region in - // the hot cache and the hotdegree of the region will increase. +func calcScore(storeHotPeers map[uint64][]*statistics.HotPeerStat, storeBytesStat map[uint64]float64, cluster opt.Cluster, kind core.ResourceKind) statistics.StoreHotPeersStat { + stats := make(statistics.StoreHotPeersStat) + for storeID, items := range storeHotPeers { + hotPeers, ok := stats[storeID] + if !ok { + hotPeers = &statistics.HotPeersStat{ + Stats: make([]statistics.HotPeerStat, 0, storeHotPeersDefaultLen), + } + stats[storeID] = hotPeers + } for _, r := range items { if kind == core.LeaderKind && !r.IsLeader() { continue } + // HotDegree is the update times on the hot cache. If the heartbeat report + // the flow of the region exceeds the threshold, the scheduler will update the region in + // the hot cache and the hotdegree of the region will increase. if r.HotDegree < cluster.GetHotRegionCacheHitsThreshold() { continue } @@ -250,14 +258,6 @@ func calcScore(storeItems map[uint64][]*statistics.HotPeerStat, cluster opt.Clus continue } - storeStat, ok := stats[storeID] - if !ok { - storeStat = &statistics.HotRegionsStat{ - RegionsStat: make([]statistics.HotPeerStat, 0, storeHotRegionsDefaultLen), - } - stats[storeID] = storeStat - } - s := statistics.HotPeerStat{ StoreID: storeID, RegionID: r.RegionID, @@ -267,16 +267,20 @@ func calcScore(storeItems map[uint64][]*statistics.HotPeerStat, cluster opt.Clus LastUpdateTime: r.LastUpdateTime, Version: r.Version, } - storeStat.TotalBytesRate += r.GetBytesRate() - storeStat.RegionsCount++ - storeStat.RegionsStat = append(storeStat.RegionsStat, s) + hotPeers.TotalBytesRate += r.GetBytesRate() + hotPeers.Count++ + hotPeers.Stats = append(hotPeers.Stats, s) + } + + if rate, ok := storeBytesStat[storeID]; ok { + hotPeers.StoreBytesRate = rate } } return stats } // balanceByPeer balances the peer distribution of hot regions. -func (h *balanceHotRegionsScheduler) balanceByPeer(cluster opt.Cluster, storesStat statistics.StoreHotRegionsStat) (*core.RegionInfo, *metapb.Peer, *metapb.Peer) { +func (h *balanceHotRegionsScheduler) balanceByPeer(cluster opt.Cluster, storesStat statistics.StoreHotPeersStat) (*core.RegionInfo, *metapb.Peer, *metapb.Peer) { if !h.allowBalanceRegion(cluster) { return nil, nil, nil } @@ -291,8 +295,8 @@ func (h *balanceHotRegionsScheduler) balanceByPeer(cluster opt.Cluster, storesSt // If we can find a target store, then return from this method. stores := cluster.GetStores() var destStoreID uint64 - for _, i := range h.r.Perm(len(storesStat[srcStoreID].RegionsStat)) { - rs := storesStat[srcStoreID].RegionsStat[i] + for _, i := range h.r.Perm(len(storesStat[srcStoreID].Stats)) { + rs := storesStat[srcStoreID].Stats[i] srcRegion := cluster.GetRegion(rs.RegionID) if srcRegion == nil { schedulerCounter.WithLabelValues(h.GetName(), "no-region").Inc() @@ -352,7 +356,7 @@ func (h *balanceHotRegionsScheduler) balanceByPeer(cluster opt.Cluster, storesSt } // balanceByLeader balances the leader distribution of hot regions. -func (h *balanceHotRegionsScheduler) balanceByLeader(cluster opt.Cluster, storesStat statistics.StoreHotRegionsStat) (*core.RegionInfo, *metapb.Peer) { +func (h *balanceHotRegionsScheduler) balanceByLeader(cluster opt.Cluster, storesStat statistics.StoreHotPeersStat) (*core.RegionInfo, *metapb.Peer) { if !h.allowBalanceLeader(cluster) { return nil, nil } @@ -363,8 +367,8 @@ func (h *balanceHotRegionsScheduler) balanceByLeader(cluster opt.Cluster, stores } // select destPeer - for _, i := range h.r.Perm(len(storesStat[srcStoreID].RegionsStat)) { - rs := storesStat[srcStoreID].RegionsStat[i] + for _, i := range h.r.Perm(len(storesStat[srcStoreID].Stats)) { + rs := storesStat[srcStoreID].Stats[i] srcRegion := cluster.GetRegion(rs.RegionID) if srcRegion == nil { schedulerCounter.WithLabelValues(h.GetName(), "no-region").Inc() @@ -404,16 +408,19 @@ func (h *balanceHotRegionsScheduler) balanceByLeader(cluster opt.Cluster, stores // Select the store to move hot regions from. // We choose the store with the maximum number of hot region first. // Inside these stores, we choose the one with maximum flow bytes. -func (h *balanceHotRegionsScheduler) selectSrcStore(stats statistics.StoreHotRegionsStat) (srcStoreID uint64) { +func (h *balanceHotRegionsScheduler) selectSrcStore(stats statistics.StoreHotPeersStat) (srcStoreID uint64) { var ( - maxFlowBytes float64 - maxHotStoreRegionCount int + maxFlowBytes float64 + maxCount int ) - for storeID, statistics := range stats { - count, flowBytes := len(statistics.RegionsStat), statistics.TotalBytesRate - if count >= 2 && (count > maxHotStoreRegionCount || (count == maxHotStoreRegionCount && flowBytes > maxFlowBytes)) { - maxHotStoreRegionCount = count + for storeID, stat := range stats { + count, flowBytes := len(stat.Stats), stat.StoreBytesRate + if count <= 1 { + continue + } + if flowBytes > maxFlowBytes || (flowBytes == maxFlowBytes && count > maxCount) { + maxCount = count maxFlowBytes = flowBytes srcStoreID = storeID } @@ -423,26 +430,19 @@ func (h *balanceHotRegionsScheduler) selectSrcStore(stats statistics.StoreHotReg // selectDestStore selects a target store to hold the region of the source region. // We choose a target store based on the hot region number and flow bytes of this store. -func (h *balanceHotRegionsScheduler) selectDestStore(candidateStoreIDs []uint64, regionFlowBytes float64, srcStoreID uint64, storesStat statistics.StoreHotRegionsStat) (destStoreID uint64) { - sr := storesStat[srcStoreID] - srcFlowBytes := sr.TotalBytesRate - srcHotRegionsCount := len(sr.RegionsStat) +func (h *balanceHotRegionsScheduler) selectDestStore(candidateStoreIDs []uint64, regionBytesRate float64, srcStoreID uint64, storesStat statistics.StoreHotPeersStat) (destStoreID uint64) { + srcBytesRate := storesStat[srcStoreID].StoreBytesRate var ( - minFlowBytes float64 = math.MaxFloat64 - minRegionsCount = int(math.MaxInt32) + minBytesRate float64 = srcBytesRate*hotRegionScheduleFactor - regionBytesRate + minCount = int(math.MaxInt32) ) for _, storeID := range candidateStoreIDs { if s, ok := storesStat[storeID]; ok { - if srcHotRegionsCount-len(s.RegionsStat) > 1 && minRegionsCount > len(s.RegionsStat) { - destStoreID = storeID - minFlowBytes = s.TotalBytesRate - minRegionsCount = len(s.RegionsStat) - continue - } - if minRegionsCount == len(s.RegionsStat) && minFlowBytes > s.TotalBytesRate && - srcFlowBytes*hotRegionScheduleFactor > s.TotalBytesRate+2*regionFlowBytes { - minFlowBytes = s.TotalBytesRate + count, dstBytesRate := len(s.Stats), s.StoreBytesRate + if minBytesRate > dstBytesRate || (minBytesRate == dstBytesRate && minCount > count) { + minCount = count + minBytesRate = dstBytesRate destStoreID = storeID } } else { @@ -453,38 +453,38 @@ func (h *balanceHotRegionsScheduler) selectDestStore(candidateStoreIDs []uint64, return } -func (h *balanceHotRegionsScheduler) adjustBalanceLimit(storeID uint64, storesStat statistics.StoreHotRegionsStat) uint64 { +func (h *balanceHotRegionsScheduler) adjustBalanceLimit(storeID uint64, storesStat statistics.StoreHotPeersStat) uint64 { srcStoreStatistics := storesStat[storeID] var hotRegionTotalCount int for _, m := range storesStat { - hotRegionTotalCount += len(m.RegionsStat) + hotRegionTotalCount += len(m.Stats) } avgRegionCount := float64(hotRegionTotalCount) / float64(len(storesStat)) // Multiplied by hotRegionLimitFactor to avoid transfer back and forth - limit := uint64((float64(len(srcStoreStatistics.RegionsStat)) - avgRegionCount) * hotRegionLimitFactor) + limit := uint64((float64(len(srcStoreStatistics.Stats)) - avgRegionCount) * hotRegionLimitFactor) return maxUint64(limit, 1) } -func (h *balanceHotRegionsScheduler) GetHotReadStatus() *statistics.StoreHotRegionInfos { +func (h *balanceHotRegionsScheduler) GetHotReadStatus() *statistics.StoreHotPeersInfos { h.RLock() defer h.RUnlock() - asLeader := make(statistics.StoreHotRegionsStat, len(h.stats.readStatAsLeader)) + asLeader := make(statistics.StoreHotPeersStat, len(h.stats.readStatAsLeader)) for id, stat := range h.stats.readStatAsLeader { clone := *stat asLeader[id] = &clone } - return &statistics.StoreHotRegionInfos{ + return &statistics.StoreHotPeersInfos{ AsLeader: asLeader, } } -func (h *balanceHotRegionsScheduler) GetHotWriteStatus() *statistics.StoreHotRegionInfos { +func (h *balanceHotRegionsScheduler) GetHotWriteStatus() *statistics.StoreHotPeersInfos { h.RLock() defer h.RUnlock() - asLeader := make(statistics.StoreHotRegionsStat, len(h.stats.writeStatAsLeader)) - asPeer := make(statistics.StoreHotRegionsStat, len(h.stats.writeStatAsPeer)) + asLeader := make(statistics.StoreHotPeersStat, len(h.stats.writeStatAsLeader)) + asPeer := make(statistics.StoreHotPeersStat, len(h.stats.writeStatAsPeer)) for id, stat := range h.stats.writeStatAsLeader { clone := *stat asLeader[id] = &clone @@ -493,7 +493,7 @@ func (h *balanceHotRegionsScheduler) GetHotWriteStatus() *statistics.StoreHotReg clone := *stat asPeer[id] = &clone } - return &statistics.StoreHotRegionInfos{ + return &statistics.StoreHotPeersInfos{ AsLeader: asLeader, AsPeer: asPeer, } diff --git a/server/schedulers/hot_test.go b/server/schedulers/hot_test.go index 228d61ced6e..922e0da16af 100644 --- a/server/schedulers/hot_test.go +++ b/server/schedulers/hot_test.go @@ -40,6 +40,7 @@ func (s *testHotWriteRegionSchedulerSuite) TestSchedule(c *C) { tc := mockcluster.NewCluster(opt) hb, err := schedule.CreateScheduler("hot-write-region", schedule.NewOperatorController(ctx, nil, nil), core.NewStorage(kv.NewMemoryKV()), nil) c.Assert(err, IsNil) + opt.HotRegionCacheHitsThreshold = 0 // Add stores 1, 2, 3, 4, 5, 6 with region counts 3, 2, 2, 2, 0, 0. @@ -52,55 +53,93 @@ func (s *testHotWriteRegionSchedulerSuite) TestSchedule(c *C) { tc.AddLabelsStore(7, 0, map[string]string{"zone": "z5", "host": "h7"}) tc.SetStoreDown(7) - // Report store written bytes. - tc.UpdateStorageWrittenBytes(1, 75*1024*1024) - tc.UpdateStorageWrittenBytes(2, 45*1024*1024) - tc.UpdateStorageWrittenBytes(3, 45*1024*1024) - tc.UpdateStorageWrittenBytes(4, 60*1024*1024) + //| store_id | write_bytes_rate | + //|----------|------------------| + //| 1 | 7.5MB | + //| 2 | 4.5MB | + //| 3 | 4.5MB | + //| 4 | 6MB | + //| 5 | 0MB | + //| 6 | 0MB | + tc.UpdateStorageWrittenBytes(1, 7.5*MB*statistics.StoreHeartBeatReportInterval) + tc.UpdateStorageWrittenBytes(2, 4.5*MB*statistics.StoreHeartBeatReportInterval) + tc.UpdateStorageWrittenBytes(3, 4.5*MB*statistics.StoreHeartBeatReportInterval) + tc.UpdateStorageWrittenBytes(4, 6*MB*statistics.StoreHeartBeatReportInterval) tc.UpdateStorageWrittenBytes(5, 0) tc.UpdateStorageWrittenBytes(6, 0) - // Region 1, 2 and 3 are hot regions. //| region_id | leader_store | follower_store | follower_store | written_bytes | //|-----------|--------------|----------------|----------------|---------------| //| 1 | 1 | 2 | 3 | 512KB | //| 2 | 1 | 3 | 4 | 512KB | //| 3 | 1 | 2 | 4 | 512KB | - tc.AddLeaderRegionWithWriteInfo(1, 1, 512*1024*statistics.RegionHeartBeatReportInterval, statistics.RegionHeartBeatReportInterval, 2, 3) - tc.AddLeaderRegionWithWriteInfo(2, 1, 512*1024*statistics.RegionHeartBeatReportInterval, statistics.RegionHeartBeatReportInterval, 3, 4) - tc.AddLeaderRegionWithWriteInfo(3, 1, 512*1024*statistics.RegionHeartBeatReportInterval, statistics.RegionHeartBeatReportInterval, 2, 4) - opt.HotRegionCacheHitsThreshold = 0 + // Region 1, 2 and 3 are hot regions. + tc.AddLeaderRegionWithWriteInfo(1, 1, 512*KB*statistics.RegionHeartBeatReportInterval, statistics.RegionHeartBeatReportInterval, 2, 3) + tc.AddLeaderRegionWithWriteInfo(2, 1, 512*KB*statistics.RegionHeartBeatReportInterval, statistics.RegionHeartBeatReportInterval, 3, 4) + tc.AddLeaderRegionWithWriteInfo(3, 1, 512*KB*statistics.RegionHeartBeatReportInterval, statistics.RegionHeartBeatReportInterval, 2, 4) // Will transfer a hot region from store 1, because the total count of peers // which is hot for store 1 is more larger than other stores. - op := hb.Schedule(tc) - c.Assert(op, NotNil) - switch op[0].Len() { + op := hb.Schedule(tc)[0] + switch op.Len() { case 1: // balance by leader selected - testutil.CheckTransferLeaderFrom(c, op[0], operator.OpHotRegion, 1) + testutil.CheckTransferLeaderFrom(c, op, operator.OpHotRegion, 1) case 4: // balance by peer selected - if op[0].RegionID() == 2 { + if op.RegionID() == 2 { // peer in store 1 of the region 2 can transfer to store 5 or store 6 because of the label - testutil.CheckTransferPeerWithLeaderTransferFrom(c, op[0], operator.OpHotRegion, 1) + testutil.CheckTransferPeerWithLeaderTransferFrom(c, op, operator.OpHotRegion, 1) } else { - // peer in store 1 of the region 1,2 can only transfer to store 6 - testutil.CheckTransferPeerWithLeaderTransfer(c, op[0], operator.OpHotRegion, 1, 6) + // peer in store 1 of the region 1,3 can only transfer to store 6 + testutil.CheckTransferPeerWithLeaderTransfer(c, op, operator.OpHotRegion, 1, 6) } + default: + c.Fatalf("wrong op: %v", op) } // hot region scheduler is restricted by `hot-region-schedule-limit`. opt.HotRegionScheduleLimit = 0 c.Assert(hb.Schedule(tc), HasLen, 0) - // hot region scheduler is not affect by `balance-region-schedule-limit`. opt.HotRegionScheduleLimit = mockoption.NewScheduleOptions().HotRegionScheduleLimit + + // hot region scheduler is restricted by schedule limit. + opt.LeaderScheduleLimit = 0 + for i := 0; i < 20; i++ { + op := hb.Schedule(tc)[0] + c.Assert(op.Len(), Equals, 4) + if op.RegionID() == 2 { + // peer in store 1 of the region 2 can transfer to store 5 or store 6 because of the label + testutil.CheckTransferPeerWithLeaderTransferFrom(c, op, operator.OpHotRegion, 1) + } else { + // peer in store 1 of the region 1,3 can only transfer to store 6 + testutil.CheckTransferPeerWithLeaderTransfer(c, op, operator.OpHotRegion, 1, 6) + } + } + opt.LeaderScheduleLimit = mockoption.NewScheduleOptions().LeaderScheduleLimit + + // hot region scheduler is not affect by `balance-region-schedule-limit`. opt.RegionScheduleLimit = 0 c.Assert(hb.Schedule(tc), HasLen, 1) // Always produce operator c.Assert(hb.Schedule(tc), HasLen, 1) c.Assert(hb.Schedule(tc), HasLen, 1) + //| store_id | write_bytes_rate | + //|----------|------------------| + //| 1 | 6MB | + //| 2 | 5MB | + //| 3 | 6MB | + //| 4 | 3MB | + //| 5 | 0MB | + //| 6 | 3MB | + tc.UpdateStorageWrittenBytes(1, 6*MB*statistics.StoreHeartBeatReportInterval) + tc.UpdateStorageWrittenBytes(2, 5*MB*statistics.StoreHeartBeatReportInterval) + tc.UpdateStorageWrittenBytes(3, 6*MB*statistics.StoreHeartBeatReportInterval) + tc.UpdateStorageWrittenBytes(4, 3*MB*statistics.StoreHeartBeatReportInterval) + tc.UpdateStorageWrittenBytes(5, 0) + tc.UpdateStorageWrittenBytes(6, 3*MB*statistics.StoreHeartBeatReportInterval) + //| region_id | leader_store | follower_store | follower_store | written_bytes | //|-----------|--------------|----------------|----------------|---------------| //| 1 | 1 | 2 | 3 | 512KB | @@ -108,29 +147,41 @@ func (s *testHotWriteRegionSchedulerSuite) TestSchedule(c *C) { //| 3 | 6 | 1 | 4 | 512KB | //| 4 | 5 | 6 | 4 | 512KB | //| 5 | 3 | 4 | 5 | 512KB | - tc.UpdateStorageWrittenBytes(1, 60*1024*1024) - tc.UpdateStorageWrittenBytes(2, 30*1024*1024) - tc.UpdateStorageWrittenBytes(3, 60*1024*1024) - tc.UpdateStorageWrittenBytes(4, 30*1024*1024) - tc.UpdateStorageWrittenBytes(5, 0*1024*1024) - tc.UpdateStorageWrittenBytes(6, 30*1024*1024) - tc.AddLeaderRegionWithWriteInfo(1, 1, 512*1024*statistics.RegionHeartBeatReportInterval, statistics.RegionHeartBeatReportInterval, 2, 3) - tc.AddLeaderRegionWithWriteInfo(2, 1, 512*1024*statistics.RegionHeartBeatReportInterval, statistics.RegionHeartBeatReportInterval, 2, 3) - tc.AddLeaderRegionWithWriteInfo(3, 6, 512*1024*statistics.RegionHeartBeatReportInterval, statistics.RegionHeartBeatReportInterval, 1, 4) - tc.AddLeaderRegionWithWriteInfo(4, 5, 512*1024*statistics.RegionHeartBeatReportInterval, statistics.RegionHeartBeatReportInterval, 6, 4) - tc.AddLeaderRegionWithWriteInfo(5, 3, 512*1024*statistics.RegionHeartBeatReportInterval, statistics.RegionHeartBeatReportInterval, 4, 5) - // We can find that the leader of all hot regions are on store 1, - // so one of the leader will transfer to another store. - op = hb.Schedule(tc) - if op != nil { - testutil.CheckTransferLeaderFrom(c, op[0], operator.OpHotRegion, 1) + tc.AddLeaderRegionWithWriteInfo(1, 1, 512*KB*statistics.RegionHeartBeatReportInterval, statistics.RegionHeartBeatReportInterval, 2, 3) + tc.AddLeaderRegionWithWriteInfo(2, 1, 512*KB*statistics.RegionHeartBeatReportInterval, statistics.RegionHeartBeatReportInterval, 2, 3) + tc.AddLeaderRegionWithWriteInfo(3, 6, 512*KB*statistics.RegionHeartBeatReportInterval, statistics.RegionHeartBeatReportInterval, 1, 4) + tc.AddLeaderRegionWithWriteInfo(4, 5, 512*KB*statistics.RegionHeartBeatReportInterval, statistics.RegionHeartBeatReportInterval, 6, 4) + tc.AddLeaderRegionWithWriteInfo(5, 3, 512*KB*statistics.RegionHeartBeatReportInterval, statistics.RegionHeartBeatReportInterval, 4, 5) + + // 6 possible operator. + // Assuming different operators have the same possibility, + // if code has bug, at most 6/7 possibility to success, + // test 30 times, possibility of success < 0.1%. + // Cannot transfer leader because store 2 and store 3 are hot. + // Source store is 1 or 3. + // Region 1 and 2 are the same, cannot move peer to store 5 due to the label. + // Region 3 can only move peer to store 5. + // Region 5 can only move peer to store 6. + for i := 0; i < 30; i++ { + op := hb.Schedule(tc)[0] + switch op.RegionID() { + case 1, 2: + if op.Len() == 3 { + testutil.CheckTransferPeer(c, op, operator.OpHotRegion, 3, 6) + } else if op.Len() == 4 { + testutil.CheckTransferPeerWithLeaderTransfer(c, op, operator.OpHotRegion, 1, 6) + } else { + c.Fatalf("wrong operator: %v", op) + } + case 3: + testutil.CheckTransferPeer(c, op, operator.OpHotRegion, 1, 5) + case 5: + testutil.CheckTransferPeerWithLeaderTransfer(c, op, operator.OpHotRegion, 3, 6) + default: + c.Fatalf("wrong operator: %v", op) + } } - // hot region scheduler is restricted by schedule limit. - opt.LeaderScheduleLimit = 0 - c.Assert(hb.Schedule(tc), HasLen, 0) - opt.LeaderScheduleLimit = mockoption.NewScheduleOptions().LeaderScheduleLimit - // Should not panic if region not found. for i := uint64(1); i <= 3; i++ { tc.Regions.RemoveRegion(tc.GetRegion(i)) @@ -149,6 +200,7 @@ func (s *testHotReadRegionSchedulerSuite) TestSchedule(c *C) { tc := mockcluster.NewCluster(opt) hb, err := schedule.CreateScheduler("hot-read-region", schedule.NewOperatorController(ctx, nil, nil), core.NewStorage(kv.NewMemoryKV()), nil) c.Assert(err, IsNil) + opt.HotRegionCacheHitsThreshold = 0 // Add stores 1, 2, 3, 4, 5 with region counts 3, 2, 2, 2, 0. tc.AddRegionStore(1, 3) @@ -157,25 +209,32 @@ func (s *testHotReadRegionSchedulerSuite) TestSchedule(c *C) { tc.AddRegionStore(4, 2) tc.AddRegionStore(5, 0) - // Report store read bytes. - tc.UpdateStorageReadBytes(1, 75*1024*1024) - tc.UpdateStorageReadBytes(2, 45*1024*1024) - tc.UpdateStorageReadBytes(3, 45*1024*1024) - tc.UpdateStorageReadBytes(4, 60*1024*1024) + //| store_id | read_bytes_rate | + //|----------|-----------------| + //| 1 | 7.5MB | + //| 2 | 4.5MB | + //| 3 | 4.5MB | + //| 4 | 6MB | + //| 5 | 0MB | + tc.UpdateStorageReadBytes(1, 7.5*MB*statistics.StoreHeartBeatReportInterval) + tc.UpdateStorageReadBytes(2, 4.5*MB*statistics.StoreHeartBeatReportInterval) + tc.UpdateStorageReadBytes(3, 4.5*MB*statistics.StoreHeartBeatReportInterval) + tc.UpdateStorageReadBytes(4, 6*MB*statistics.StoreHeartBeatReportInterval) tc.UpdateStorageReadBytes(5, 0) + //| region_id | leader_store | follower_store | follower_store | read_bytes_rate | + //|-----------|--------------|----------------|----------------|--------------------| + //| 1 | 1 | 2 | 3 | 512KB | + //| 2 | 2 | 1 | 3 | 512KB | + //| 3 | 1 | 2 | 3 | 512KB | + //| 11 | 1 | 2 | 3 | 24KB | // Region 1, 2 and 3 are hot regions. - //| region_id | leader_store | follower_store | follower_store | read_bytes | - //|-----------|--------------|----------------|----------------|---------------| - //| 1 | 1 | 2 | 3 | 512KB | - //| 2 | 2 | 1 | 3 | 512KB | - //| 3 | 1 | 2 | 3 | 512KB | - tc.AddLeaderRegionWithReadInfo(1, 1, 512*1024*statistics.RegionHeartBeatReportInterval, statistics.RegionHeartBeatReportInterval, 2, 3) - tc.AddLeaderRegionWithReadInfo(2, 2, 512*1024*statistics.RegionHeartBeatReportInterval, statistics.RegionHeartBeatReportInterval, 1, 3) - tc.AddLeaderRegionWithReadInfo(3, 1, 512*1024*statistics.RegionHeartBeatReportInterval, statistics.RegionHeartBeatReportInterval, 2, 3) + tc.AddLeaderRegionWithReadInfo(1, 1, 512*KB*statistics.RegionHeartBeatReportInterval, statistics.RegionHeartBeatReportInterval, 2, 3) + tc.AddLeaderRegionWithReadInfo(2, 2, 512*KB*statistics.RegionHeartBeatReportInterval, statistics.RegionHeartBeatReportInterval, 1, 3) + tc.AddLeaderRegionWithReadInfo(3, 1, 512*KB*statistics.RegionHeartBeatReportInterval, statistics.RegionHeartBeatReportInterval, 2, 3) // lower than hot read flow rate, but higher than write flow rate - tc.AddLeaderRegionWithReadInfo(11, 1, 24*1024*statistics.RegionHeartBeatReportInterval, statistics.RegionHeartBeatReportInterval, 2, 3) - opt.HotRegionCacheHitsThreshold = 0 + tc.AddLeaderRegionWithReadInfo(11, 1, 24*KB*statistics.RegionHeartBeatReportInterval, statistics.RegionHeartBeatReportInterval, 2, 3) + c.Assert(tc.IsRegionHot(tc.GetRegion(1)), IsTrue) c.Assert(tc.IsRegionHot(tc.GetRegion(11)), IsFalse) // check randomly pick hot region @@ -187,27 +246,49 @@ func (s *testHotReadRegionSchedulerSuite) TestSchedule(c *C) { c.Assert(len(stats), Equals, 2) for _, ss := range stats { for _, s := range ss { - c.Assert(s.BytesRate, Equals, 512.0*1024) + c.Assert(s.BytesRate, Equals, 512.0*KB) } } - // Will transfer a hot region leader from store 1 to store 3, because the total count of peers - // which is hot for store 1 is more larger than other stores. + + // Will transfer a hot region leader from store 1 to store 3. + // bytes_rate[store 1] * 0.9 > bytes_rate[store 3] + region_bytes_rate + // hot_region_count[store 3] < hot_regin_count[store 2] testutil.CheckTransferLeader(c, hb.Schedule(tc)[0], operator.OpHotRegion, 1, 3) // assume handle the operator - tc.AddLeaderRegionWithReadInfo(3, 3, 512*1024*statistics.RegionHeartBeatReportInterval, statistics.RegionHeartBeatReportInterval, 1, 2) - + tc.AddLeaderRegionWithReadInfo(3, 3, 512*KB*statistics.RegionHeartBeatReportInterval, statistics.RegionHeartBeatReportInterval, 1, 2) // After transfer a hot region leader from store 1 to store 3 - // the tree region leader will be evenly distributed in three stores - tc.UpdateStorageReadBytes(1, 60*1024*1024) - tc.UpdateStorageReadBytes(2, 30*1024*1024) - tc.UpdateStorageReadBytes(3, 60*1024*1024) - tc.UpdateStorageReadBytes(4, 30*1024*1024) - tc.UpdateStorageReadBytes(5, 30*1024*1024) - tc.AddLeaderRegionWithReadInfo(4, 1, 512*1024*statistics.RegionHeartBeatReportInterval, statistics.RegionHeartBeatReportInterval, 2, 3) - tc.AddLeaderRegionWithReadInfo(5, 4, 512*1024*statistics.RegionHeartBeatReportInterval, statistics.RegionHeartBeatReportInterval, 2, 5) - - // Now appear two read hot region in store 1 and 4 - // We will Transfer peer from 1 to 5 + // the three region leader will be evenly distributed in three stores + + //| store_id | read_bytes_rate | + //|----------|-----------------| + //| 1 | 6MB | + //| 2 | 5MB | + //| 3 | 6MB | + //| 4 | 3MB | + //| 5 | 3MB | + tc.UpdateStorageReadBytes(1, 6*MB*statistics.StoreHeartBeatReportInterval) + tc.UpdateStorageReadBytes(2, 5*MB*statistics.StoreHeartBeatReportInterval) + tc.UpdateStorageReadBytes(3, 6*MB*statistics.StoreHeartBeatReportInterval) + tc.UpdateStorageReadBytes(4, 3*MB*statistics.StoreHeartBeatReportInterval) + tc.UpdateStorageReadBytes(5, 3*MB*statistics.StoreHeartBeatReportInterval) + + //| region_id | leader_store | follower_store | follower_store | read_bytes_rate | + //|-----------|--------------|----------------|----------------|--------------------| + //| 1 | 1 | 2 | 3 | 512KB | + //| 2 | 2 | 1 | 3 | 512KB | + //| 3 | 3 | 2 | 1 | 512KB | + //| 4 | 1 | 2 | 3 | 512KB | + //| 5 | 4 | 2 | 5 | 512KB | + //| 11 | 1 | 2 | 3 | 24KB | + tc.AddLeaderRegionWithReadInfo(4, 1, 512*KB*statistics.RegionHeartBeatReportInterval, statistics.RegionHeartBeatReportInterval, 2, 3) + tc.AddLeaderRegionWithReadInfo(5, 4, 512*KB*statistics.RegionHeartBeatReportInterval, statistics.RegionHeartBeatReportInterval, 2, 5) + + // We will move leader peer of region 1 from 1 to 5 + // Store 1 will be selected as source store (max rate, count > store 4 count). + // When trying to transfer leader: + // Store 2 and store 3 are also hot, failed. + // Trying to move leader peer: + // Store 5 is selected as destination because of less hot region count. testutil.CheckTransferPeerWithLeaderTransfer(c, hb.Schedule(tc)[0], operator.OpHotRegion, 1, 5) // Should not panic if region not found. @@ -233,26 +314,26 @@ func (s *testHotCacheSuite) TestUpdateCache(c *C) { tc.AddRegionStore(5, 0) // Report store read bytes. - tc.UpdateStorageReadBytes(1, 75*1024*1024) - tc.UpdateStorageReadBytes(2, 45*1024*1024) - tc.UpdateStorageReadBytes(3, 45*1024*1024) - tc.UpdateStorageReadBytes(4, 60*1024*1024) + tc.UpdateStorageReadBytes(1, 7.5*MB*statistics.StoreHeartBeatReportInterval) + tc.UpdateStorageReadBytes(2, 4.5*MB*statistics.StoreHeartBeatReportInterval) + tc.UpdateStorageReadBytes(3, 4.5*MB*statistics.StoreHeartBeatReportInterval) + tc.UpdateStorageReadBytes(4, 6*MB*statistics.StoreHeartBeatReportInterval) tc.UpdateStorageReadBytes(5, 0) /// For read flow - tc.AddLeaderRegionWithReadInfo(1, 1, 512*1024*statistics.RegionHeartBeatReportInterval, statistics.RegionHeartBeatReportInterval, 2, 3) - tc.AddLeaderRegionWithReadInfo(2, 2, 512*1024*statistics.RegionHeartBeatReportInterval, statistics.RegionHeartBeatReportInterval, 1, 3) - tc.AddLeaderRegionWithReadInfo(3, 1, 512*1024*statistics.RegionHeartBeatReportInterval, statistics.RegionHeartBeatReportInterval, 2, 3) + tc.AddLeaderRegionWithReadInfo(1, 1, 512*KB*statistics.RegionHeartBeatReportInterval, statistics.RegionHeartBeatReportInterval, 2, 3) + tc.AddLeaderRegionWithReadInfo(2, 2, 512*KB*statistics.RegionHeartBeatReportInterval, statistics.RegionHeartBeatReportInterval, 1, 3) + tc.AddLeaderRegionWithReadInfo(3, 1, 512*KB*statistics.RegionHeartBeatReportInterval, statistics.RegionHeartBeatReportInterval, 2, 3) // lower than hot read flow rate, but higher than write flow rate - tc.AddLeaderRegionWithReadInfo(11, 1, 24*1024*statistics.RegionHeartBeatReportInterval, statistics.RegionHeartBeatReportInterval, 2, 3) + tc.AddLeaderRegionWithReadInfo(11, 1, 24*KB*statistics.RegionHeartBeatReportInterval, statistics.RegionHeartBeatReportInterval, 2, 3) opt.HotRegionCacheHitsThreshold = 0 stats := tc.RegionStats(statistics.ReadFlow) c.Assert(len(stats[1]), Equals, 2) c.Assert(len(stats[2]), Equals, 1) c.Assert(len(stats[3]), Equals, 0) - tc.AddLeaderRegionWithReadInfo(3, 2, 512*1024*statistics.RegionHeartBeatReportInterval, statistics.RegionHeartBeatReportInterval, 2, 3) - tc.AddLeaderRegionWithReadInfo(11, 1, 24*1024*statistics.RegionHeartBeatReportInterval, statistics.RegionHeartBeatReportInterval, 2, 3) + tc.AddLeaderRegionWithReadInfo(3, 2, 512*KB*statistics.RegionHeartBeatReportInterval, statistics.RegionHeartBeatReportInterval, 2, 3) + tc.AddLeaderRegionWithReadInfo(11, 1, 24*KB*statistics.RegionHeartBeatReportInterval, statistics.RegionHeartBeatReportInterval, 2, 3) stats = tc.RegionStats(statistics.ReadFlow) c.Assert(len(stats[1]), Equals, 1) @@ -260,21 +341,21 @@ func (s *testHotCacheSuite) TestUpdateCache(c *C) { c.Assert(len(stats[3]), Equals, 0) // For write flow - tc.UpdateStorageWrittenBytes(1, 60*1024*1024) - tc.UpdateStorageWrittenBytes(2, 30*1024*1024) - tc.UpdateStorageWrittenBytes(3, 60*1024*1024) - tc.UpdateStorageWrittenBytes(4, 30*1024*1024) - tc.UpdateStorageWrittenBytes(5, 0*1024*1024) - tc.AddLeaderRegionWithWriteInfo(4, 1, 512*1024*statistics.RegionHeartBeatReportInterval, statistics.RegionHeartBeatReportInterval, 2, 3) - tc.AddLeaderRegionWithWriteInfo(5, 1, 512*1024*statistics.RegionHeartBeatReportInterval, statistics.RegionHeartBeatReportInterval, 2, 3) - tc.AddLeaderRegionWithWriteInfo(6, 1, 12*1024*statistics.RegionHeartBeatReportInterval, statistics.RegionHeartBeatReportInterval, 2, 3) + tc.UpdateStorageWrittenBytes(1, 6*MB*statistics.StoreHeartBeatReportInterval) + tc.UpdateStorageWrittenBytes(2, 3*MB*statistics.StoreHeartBeatReportInterval) + tc.UpdateStorageWrittenBytes(3, 6*MB*statistics.StoreHeartBeatReportInterval) + tc.UpdateStorageWrittenBytes(4, 3*MB*statistics.StoreHeartBeatReportInterval) + tc.UpdateStorageWrittenBytes(5, 0) + tc.AddLeaderRegionWithWriteInfo(4, 1, 512*KB*statistics.RegionHeartBeatReportInterval, statistics.RegionHeartBeatReportInterval, 2, 3) + tc.AddLeaderRegionWithWriteInfo(5, 1, 512*KB*statistics.RegionHeartBeatReportInterval, statistics.RegionHeartBeatReportInterval, 2, 3) + tc.AddLeaderRegionWithWriteInfo(6, 1, 12*KB*statistics.RegionHeartBeatReportInterval, statistics.RegionHeartBeatReportInterval, 2, 3) stats = tc.RegionStats(statistics.WriteFlow) c.Assert(len(stats[1]), Equals, 2) c.Assert(len(stats[2]), Equals, 2) c.Assert(len(stats[3]), Equals, 2) - tc.AddLeaderRegionWithWriteInfo(5, 1, 512*1024*statistics.RegionHeartBeatReportInterval, statistics.RegionHeartBeatReportInterval, 2, 5) + tc.AddLeaderRegionWithWriteInfo(5, 1, 512*KB*statistics.RegionHeartBeatReportInterval, statistics.RegionHeartBeatReportInterval, 2, 5) stats = tc.RegionStats(statistics.WriteFlow) c.Assert(len(stats[1]), Equals, 2) diff --git a/server/schedulers/scheduler_test.go b/server/schedulers/scheduler_test.go index 34ef9db6748..96a5ee5bc2c 100644 --- a/server/schedulers/scheduler_test.go +++ b/server/schedulers/scheduler_test.go @@ -353,10 +353,10 @@ func (s *testShuffleHotRegionSchedulerSuite) TestBalance(c *C) { tc.AddLabelsStore(6, 0, map[string]string{"zone": "z4", "host": "h6"}) // Report store written bytes. - tc.UpdateStorageWrittenBytes(1, 75*1024*1024) - tc.UpdateStorageWrittenBytes(2, 45*1024*1024) - tc.UpdateStorageWrittenBytes(3, 45*1024*1024) - tc.UpdateStorageWrittenBytes(4, 60*1024*1024) + tc.UpdateStorageWrittenBytes(1, 7.5*MB*statistics.StoreHeartBeatReportInterval) + tc.UpdateStorageWrittenBytes(2, 4.5*MB*statistics.StoreHeartBeatReportInterval) + tc.UpdateStorageWrittenBytes(3, 4.5*MB*statistics.StoreHeartBeatReportInterval) + tc.UpdateStorageWrittenBytes(4, 6*MB*statistics.StoreHeartBeatReportInterval) tc.UpdateStorageWrittenBytes(5, 0) tc.UpdateStorageWrittenBytes(6, 0) @@ -366,9 +366,9 @@ func (s *testShuffleHotRegionSchedulerSuite) TestBalance(c *C) { //| 1 | 1 | 2 | 3 | 512KB | //| 2 | 1 | 3 | 4 | 512KB | //| 3 | 1 | 2 | 4 | 512KB | - tc.AddLeaderRegionWithWriteInfo(1, 1, 512*1024*statistics.RegionHeartBeatReportInterval, statistics.RegionHeartBeatReportInterval, 2, 3) - tc.AddLeaderRegionWithWriteInfo(2, 1, 512*1024*statistics.RegionHeartBeatReportInterval, statistics.RegionHeartBeatReportInterval, 3, 4) - tc.AddLeaderRegionWithWriteInfo(3, 1, 512*1024*statistics.RegionHeartBeatReportInterval, statistics.RegionHeartBeatReportInterval, 2, 4) + tc.AddLeaderRegionWithWriteInfo(1, 1, 512*KB*statistics.RegionHeartBeatReportInterval, statistics.RegionHeartBeatReportInterval, 2, 3) + tc.AddLeaderRegionWithWriteInfo(2, 1, 512*KB*statistics.RegionHeartBeatReportInterval, statistics.RegionHeartBeatReportInterval, 3, 4) + tc.AddLeaderRegionWithWriteInfo(3, 1, 512*KB*statistics.RegionHeartBeatReportInterval, statistics.RegionHeartBeatReportInterval, 2, 4) opt.HotRegionCacheHitsThreshold = 0 // try to get an operator @@ -402,13 +402,13 @@ func (s *testHotRegionSchedulerSuite) TestAbnormalReplica(c *C) { tc.AddRegionStore(3, 2) // Report store read bytes. - tc.UpdateStorageReadBytes(1, 75*1024*1024) - tc.UpdateStorageReadBytes(2, 45*1024*1024) - tc.UpdateStorageReadBytes(3, 45*1024*1024) + tc.UpdateStorageReadBytes(1, 7.5*MB*statistics.StoreHeartBeatReportInterval) + tc.UpdateStorageReadBytes(2, 4.5*MB*statistics.StoreHeartBeatReportInterval) + tc.UpdateStorageReadBytes(3, 4.5*MB*statistics.StoreHeartBeatReportInterval) - tc.AddLeaderRegionWithReadInfo(1, 1, 512*1024*statistics.RegionHeartBeatReportInterval, statistics.RegionHeartBeatReportInterval, 2) - tc.AddLeaderRegionWithReadInfo(2, 2, 512*1024*statistics.RegionHeartBeatReportInterval, statistics.RegionHeartBeatReportInterval, 1, 3) - tc.AddLeaderRegionWithReadInfo(3, 1, 512*1024*statistics.RegionHeartBeatReportInterval, statistics.RegionHeartBeatReportInterval, 2, 3) + tc.AddLeaderRegionWithReadInfo(1, 1, 512*KB*statistics.RegionHeartBeatReportInterval, statistics.RegionHeartBeatReportInterval, 2) + tc.AddLeaderRegionWithReadInfo(2, 2, 512*KB*statistics.RegionHeartBeatReportInterval, statistics.RegionHeartBeatReportInterval, 1, 3) + tc.AddLeaderRegionWithReadInfo(3, 1, 512*KB*statistics.RegionHeartBeatReportInterval, statistics.RegionHeartBeatReportInterval, 2, 3) opt.HotRegionCacheHitsThreshold = 0 c.Assert(tc.IsRegionHot(tc.GetRegion(1)), IsTrue) c.Assert(hb.Schedule(tc), IsNil) diff --git a/server/schedulers/shuffle_hot_region.go b/server/schedulers/shuffle_hot_region.go index f8f9d632714..dbe73dc45de 100644 --- a/server/schedulers/shuffle_hot_region.go +++ b/server/schedulers/shuffle_hot_region.go @@ -107,21 +107,25 @@ func (s *shuffleHotRegionScheduler) Schedule(cluster opt.Cluster) []*operator.Op } func (s *shuffleHotRegionScheduler) dispatch(typ BalanceType, cluster opt.Cluster) []*operator.Operator { + storesStats := cluster.GetStoresStats() switch typ { case hotReadRegionBalance: - s.stats.readStatAsLeader = calcScore(cluster.RegionReadStats(), cluster, core.LeaderKind) + s.stats.readStatAsLeader = calcScore(cluster.RegionReadStats(), storesStats.GetStoresBytesReadStat(), cluster, core.LeaderKind) return s.randomSchedule(cluster, s.stats.readStatAsLeader) case hotWriteRegionBalance: - s.stats.writeStatAsLeader = calcScore(cluster.RegionWriteStats(), cluster, core.LeaderKind) + s.stats.writeStatAsLeader = calcScore(cluster.RegionWriteStats(), storesStats.GetStoresBytesWriteStat(), cluster, core.LeaderKind) return s.randomSchedule(cluster, s.stats.writeStatAsLeader) } return nil } -func (s *shuffleHotRegionScheduler) randomSchedule(cluster opt.Cluster, storeStats statistics.StoreHotRegionsStat) []*operator.Operator { +func (s *shuffleHotRegionScheduler) randomSchedule(cluster opt.Cluster, storeStats statistics.StoreHotPeersStat) []*operator.Operator { for _, stats := range storeStats { - i := s.r.Intn(len(stats.RegionsStat)) - r := stats.RegionsStat[i] + if len(stats.Stats) < 1 { + continue + } + i := s.r.Intn(len(stats.Stats)) + r := stats.Stats[i] // select src region srcRegion := cluster.GetRegion(r.RegionID) if srcRegion == nil || len(srcRegion.GetDownPeers()) != 0 || len(srcRegion.GetPendingPeers()) != 0 { diff --git a/server/schedulers/utils_test.go b/server/schedulers/utils_test.go index e24d69b30cb..9836ee0a54a 100644 --- a/server/schedulers/utils_test.go +++ b/server/schedulers/utils_test.go @@ -23,6 +23,11 @@ import ( "github.com/pingcap/pd/server/core" ) +const ( + KB = 1024 + MB = 1024 * KB +) + func TestSchedulers(t *testing.T) { TestingT(t) } diff --git a/server/statistics/hot_regions_stat.go b/server/statistics/hot_regions_stat.go index d56a247735e..d03457d9e49 100644 --- a/server/statistics/hot_regions_stat.go +++ b/server/statistics/hot_regions_stat.go @@ -13,9 +13,10 @@ package statistics -// HotRegionsStat records all hot regions statistics -type HotRegionsStat struct { +// HotPeersStat records all hot regions statistics +type HotPeersStat struct { + StoreBytesRate float64 `json:"-"` TotalBytesRate float64 `json:"total_flow_bytes"` - RegionsCount int `json:"regions_count"` - RegionsStat []HotPeerStat `json:"statistics"` + Count int `json:"regions_count"` + Stats []HotPeerStat `json:"statistics"` } diff --git a/server/statistics/store.go b/server/statistics/store.go index 99dd15738c9..f21e981b55a 100644 --- a/server/statistics/store.go +++ b/server/statistics/store.go @@ -61,24 +61,45 @@ func (s *StoresStats) GetRollingStoreStats(storeID uint64) *RollingStoreStats { return s.rollingStoresStats[storeID] } +// GetOrCreateRollingStoreStats gets or creates RollingStoreStats with a given store ID. +func (s *StoresStats) GetOrCreateRollingStoreStats(storeID uint64) *RollingStoreStats { + s.Lock() + defer s.Unlock() + ret, ok := s.rollingStoresStats[storeID] + if !ok { + ret = newRollingStoreStats() + s.rollingStoresStats[storeID] = ret + } + return ret +} + // Observe records the current store status with a given store. func (s *StoresStats) Observe(storeID uint64, stats *pdpb.StoreStats) { - s.RLock() - defer s.RUnlock() - s.rollingStoresStats[storeID].Observe(stats) + store := s.GetOrCreateRollingStoreStats(storeID) + store.Observe(stats) +} + +// Set sets the store statistics (for test). +func (s *StoresStats) Set(storeID uint64, stats *pdpb.StoreStats) { + store := s.GetOrCreateRollingStoreStats(storeID) + store.Set(stats) } // UpdateTotalBytesRate updates the total bytes write rate and read rate. func (s *StoresStats) UpdateTotalBytesRate(f func() []*core.StoreInfo) { - s.RLock() - defer s.RUnlock() var totalBytesWriteRate float64 var totalBytesReadRate float64 var writeRate, readRate float64 ss := f() + s.RLock() + defer s.RUnlock() for _, store := range ss { if store.IsUp() { - writeRate, readRate = s.rollingStoresStats[store.GetID()].GetBytesRate() + stats, ok := s.rollingStoresStats[store.GetID()] + if !ok { + continue + } + writeRate, readRate = stats.GetBytesRate() totalBytesWriteRate += writeRate totalBytesReadRate += readRate } @@ -107,6 +128,26 @@ func (s *StoresStats) GetStoreBytesRate(storeID uint64) (writeRate float64, read return 0, 0 } +// GetStoreBytesWriteRate returns the bytes write stat of the specified store. +func (s *StoresStats) GetStoreBytesWriteRate(storeID uint64) float64 { + s.RLock() + defer s.RUnlock() + if storeStat, ok := s.rollingStoresStats[storeID]; ok { + return storeStat.GetBytesWriteRate() + } + return 0 +} + +// GetStoreBytesReadRate returns the bytes read stat of the specified store. +func (s *StoresStats) GetStoreBytesReadRate(storeID uint64) float64 { + s.RLock() + defer s.RUnlock() + if storeStat, ok := s.rollingStoresStats[storeID]; ok { + return storeStat.GetBytesReadRate() + } + return 0 +} + // GetStoresBytesWriteStat returns the bytes write stat of all StoreInfo. func (s *StoresStats) GetStoresBytesWriteStat() map[uint64]float64 { s.RLock() @@ -189,6 +230,21 @@ func (r *RollingStoreStats) Observe(stats *pdpb.StoreStats) { r.keysReadRate.Add(float64(stats.KeysRead) / float64(interval)) } +// Set sets the statistics (for test). +func (r *RollingStoreStats) Set(stats *pdpb.StoreStats) { + statInterval := stats.GetInterval() + interval := statInterval.GetEndTimestamp() - statInterval.GetStartTimestamp() + if interval == 0 { + return + } + r.Lock() + defer r.Unlock() + r.bytesWriteRate.Set(float64(stats.BytesWritten) / float64(interval)) + r.bytesReadRate.Set(float64(stats.BytesRead) / float64(interval)) + r.keysWriteRate.Set(float64(stats.KeysWritten) / float64(interval)) + r.keysReadRate.Set(float64(stats.KeysRead) / float64(interval)) +} + // GetBytesRate returns the bytes write rate and the bytes read rate. func (r *RollingStoreStats) GetBytesRate() (writeRate float64, readRate float64) { r.RLock() @@ -196,6 +252,20 @@ func (r *RollingStoreStats) GetBytesRate() (writeRate float64, readRate float64) return r.bytesWriteRate.Get(), r.bytesReadRate.Get() } +// GetBytesWriteRate returns the bytes write rate. +func (r *RollingStoreStats) GetBytesWriteRate() float64 { + r.RLock() + defer r.RUnlock() + return r.bytesWriteRate.Get() +} + +// GetBytesReadRate returns the bytes read rate. +func (r *RollingStoreStats) GetBytesReadRate() float64 { + r.RLock() + defer r.RUnlock() + return r.bytesReadRate.Get() +} + // GetKeysWriteRate returns the keys write rate. func (r *RollingStoreStats) GetKeysWriteRate() float64 { r.RLock() diff --git a/server/statistics/store_hot_region_infos.go b/server/statistics/store_hot_peers_infos.go similarity index 63% rename from server/statistics/store_hot_region_infos.go rename to server/statistics/store_hot_peers_infos.go index a5f22a2ce35..a2b933565f4 100644 --- a/server/statistics/store_hot_region_infos.go +++ b/server/statistics/store_hot_peers_infos.go @@ -13,11 +13,11 @@ package statistics -// StoreHotRegionInfos : used to get human readable description for hot regions. -type StoreHotRegionInfos struct { - AsPeer StoreHotRegionsStat `json:"as_peer"` - AsLeader StoreHotRegionsStat `json:"as_leader"` +// StoreHotPeersInfos is used to get human-readable description for hot regions. +type StoreHotPeersInfos struct { + AsPeer StoreHotPeersStat `json:"as_peer"` + AsLeader StoreHotPeersStat `json:"as_leader"` } -// StoreHotRegionsStat used to record the hot region statistics group by store -type StoreHotRegionsStat map[uint64]*HotRegionsStat +// StoreHotPeersStat is used to record the hot region statistics group by store. +type StoreHotPeersStat map[uint64]*HotPeersStat diff --git a/server/statistics/store_stat_informer.go b/server/statistics/store_stat_informer.go new file mode 100644 index 00000000000..43e110ffd1b --- /dev/null +++ b/server/statistics/store_stat_informer.go @@ -0,0 +1,19 @@ +// Copyright 2019 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. + +package statistics + +// StoreStatInformer provides access to a shared informer of statistics. +type StoreStatInformer interface { + GetStoresStats() *StoresStats +} diff --git a/tests/pdctl/hot/hot_test.go b/tests/pdctl/hot/hot_test.go index 3440c729c1c..18a78dd442f 100644 --- a/tests/pdctl/hot/hot_test.go +++ b/tests/pdctl/hot/hot_test.go @@ -102,12 +102,12 @@ func (s *hotTestSuite) TestHot(c *C) { testHot := func(hotRegionID, hotStoreID uint64, hotType string) { args = []string{"-u", pdAddr, "hot", hotType} _, output, e := pdctl.ExecuteCommandC(cmd, args...) - hotRegion := statistics.StoreHotRegionInfos{} + hotRegion := statistics.StoreHotPeersInfos{} c.Assert(e, IsNil) c.Assert(json.Unmarshal(output, &hotRegion), IsNil) c.Assert(hotRegion.AsLeader, HasKey, hotStoreID) - c.Assert(hotRegion.AsLeader[hotStoreID].RegionsCount, Equals, 1) - c.Assert(hotRegion.AsLeader[hotStoreID].RegionsStat[0].RegionID, Equals, hotRegionID) + c.Assert(hotRegion.AsLeader[hotStoreID].Count, Equals, 1) + c.Assert(hotRegion.AsLeader[hotStoreID].Stats[0].RegionID, Equals, hotRegionID) } hotReadRegionID, hotWriteRegionID, hotStoreId := uint64(3), uint64(2), uint64(1)