diff --git a/server/schedulers/hot_region.go b/server/schedulers/hot_region.go
index defc59a6fed..8f9d26ec540 100644
--- a/server/schedulers/hot_region.go
+++ b/server/schedulers/hot_region.go
@@ -367,7 +367,7 @@ func filterHotPeers(
 	return ret
 }
 
-func (h *hotScheduler) addPendingInfluence(op *operator.Operator, srcStore, dstStore uint64, infl Influence, maxZombieDur time.Duration) bool {
+func (h *hotScheduler) tryAddPendingInfluence(op *operator.Operator, srcStore, dstStore uint64, infl Influence, maxZombieDur time.Duration) bool {
 	regionID := op.RegionID()
 	_, ok := h.regionPendings[regionID]
 	if ok {
@@ -383,19 +383,45 @@ func (h *hotScheduler) addPendingInfluence(op *operator.Operator, srcStore, dstS
 }
 
 func (h *hotScheduler) balanceHotReadRegions(cluster opt.Cluster) []*operator.Operator {
-	// prefer to balance by leader
 	leaderSolver := newBalanceSolver(h, cluster, read, transferLeader)
-	ops := leaderSolver.solve()
-	if len(ops) > 0 {
-		return ops
-	}
-
+	leaderOps := leaderSolver.solve()
 	peerSolver := newBalanceSolver(h, cluster, read, movePeer)
-	ops = peerSolver.solve()
-	if len(ops) > 0 {
-		return ops
+	peerOps := peerSolver.solve()
+	if len(leaderOps) == 0 && len(peerOps) == 0 {
+		schedulerCounter.WithLabelValues(h.GetName(), "skip").Inc()
+		return nil
+	}
+	if len(leaderOps) == 0 {
+		if peerSolver.tryAddPendingInfluence() {
+			return peerOps
+		}
+		schedulerCounter.WithLabelValues(h.GetName(), "skip").Inc()
+		return nil
 	}
+	if len(peerOps) == 0 {
+		if leaderSolver.tryAddPendingInfluence() {
+			return leaderOps
+		}
+		schedulerCounter.WithLabelValues(h.GetName(), "skip").Inc()
+		return nil
+	}
+	leaderSolver.cur = leaderSolver.best
+	if leaderSolver.betterThan(peerSolver.best) {
+		if leaderSolver.tryAddPendingInfluence() {
+			return leaderOps
+		}
+		if peerSolver.tryAddPendingInfluence() {
+			return peerOps
+		}
+	} else {
+		if peerSolver.tryAddPendingInfluence() {
+			return peerOps
+		}
+		if leaderSolver.tryAddPendingInfluence() {
+			return leaderOps
+		}
+	}
 	schedulerCounter.WithLabelValues(h.GetName(), "skip").Inc()
 	return nil
 }
@@ -407,7 +433,7 @@ func (h *hotScheduler) balanceHotWriteRegions(cluster opt.Cluster) []*operator.O
 	case s < int(schedulePeerPr*100):
 		peerSolver := newBalanceSolver(h, cluster, write, movePeer)
 		ops := peerSolver.solve()
-		if len(ops) > 0 {
+		if len(ops) > 0 && peerSolver.tryAddPendingInfluence() {
 			return ops
 		}
 	default:
@@ -415,7 +441,7 @@ func (h *hotScheduler) balanceHotWriteRegions(cluster opt.Cluster) []*operator.O
 
 	leaderSolver := newBalanceSolver(h, cluster, write, transferLeader)
 	ops := leaderSolver.solve()
-	if len(ops) > 0 {
+	if len(ops) > 0 && leaderSolver.tryAddPendingInfluence() {
 		return ops
 	}
 
@@ -430,7 +456,10 @@ type balanceSolver struct {
 	rwTy rwType
 	opTy opType
 
-	cur *solution
+	cur  *solution
+	best *solution
+	ops  []*operator.Operator
+	infl Influence
 
 	maxSrc *storeLoad
 	minDst *storeLoad
@@ -567,11 +596,6 @@ func (bs *balanceSolver) solve() []*operator.Operator {
 		return nil
 	}
 	bs.cur = &solution{}
-	var (
-		best *solution
-		op   *operator.Operator
-		infl Influence
-	)
 
 	for srcStoreID := range bs.filterSrcStores() {
 		bs.cur.srcStoreID = srcStoreID
@@ -585,22 +609,24 @@ func (bs *balanceSolver) solve() []*operator.Operator {
 			for dstStoreID := range bs.filterDstStores() {
 				bs.cur.dstStoreID = dstStoreID
 				bs.calcProgressiveRank()
-				if bs.cur.progressiveRank < 0 && bs.betterThan(best) {
+				if bs.cur.progressiveRank < 0 && bs.betterThan(bs.best) {
 					if newOp, newInfl := bs.buildOperator(); newOp != nil {
-						op = newOp
-						infl = *newInfl
+						bs.ops = []*operator.Operator{newOp}
+						bs.infl = *newInfl
 						clone := *bs.cur
-						best = &clone
+						bs.best = &clone
 					}
 				}
 			}
 		}
 	}
+	return bs.ops
+}
 
-	if best == nil {
-		return nil
+func (bs *balanceSolver) tryAddPendingInfluence() bool {
+	if bs.best == nil || len(bs.ops) == 0 {
+		return false
 	}
-
 	// Depending on the source of the statistics used, a different ZombieDuration will be used.
 	// If the statistics are from the sum of Regions, there will be a longer ZombieDuration.
 	var maxZombieDur time.Duration
@@ -610,12 +636,7 @@ func (bs *balanceSolver) solve() []*operator.Operator {
 	default:
 		maxZombieDur = bs.sche.conf.GetStoreStatZombieDuration()
 	}
-
-	if !bs.sche.addPendingInfluence(op, best.srcStoreID, best.dstStoreID, infl, maxZombieDur) {
-		return nil
-	}
-
-	return []*operator.Operator{op}
+	return bs.sche.tryAddPendingInfluence(bs.ops[0], bs.best.srcStoreID, bs.best.dstStoreID, bs.infl, maxZombieDur)
 }
 
 // filterSrcStores compare the min rate and the ratio * expectation rate, if two dim rate is greater than
diff --git a/server/schedulers/hot_region_test.go b/server/schedulers/hot_region_test.go
index 2ab66c6080a..6a9797819a8 100644
--- a/server/schedulers/hot_region_test.go
+++ b/server/schedulers/hot_region_test.go
@@ -739,9 +739,14 @@ func (s *testHotReadRegionSchedulerSuite) TestByteRateOnly(c *C) {
 		}
 	}
 
-	testutil.CheckTransferLeader(c, hb.Schedule(tc)[0], operator.OpHotRegion, 1, 3)
+	op := hb.Schedule(tc)[0]
+
+	// Moving the leader from store 1 to store 5 is better than
+	// transferring the leader from store 1 to store 3.
+	testutil.CheckTransferPeerWithLeaderTransfer(c, op, operator.OpHotRegion, 1, 5)
 	hb.(*hotScheduler).clearPendingInfluence()
-	// assume handle the operator
+
+	// Assume the transfer-leader operator is applied rather than the move-leader one.
 	tc.AddRegionWithReadInfo(3, 3, 512*KB*statistics.ReadReportInterval, 0, statistics.ReadReportInterval, []uint64{1, 2})
 	// After transfer a hot region leader from store 1 to store 3
 	// the three region leader will be evenly distributed in three stores
@@ -894,37 +899,40 @@ func (s *testHotReadRegionSchedulerSuite) TestWithPendingInfluence(c *C) {
 		})
 	}
 
+	// Before schedule, store byte/key rate: 7.1 | 6.1 | 6 | 5
+	// Min and max come from storeLoadPred; they are generated when comparing the current and future loads.
 	for i := 0; i < 20; i++ {
 		hb.(*hotScheduler).clearPendingInfluence()
 		op1 := hb.Schedule(tc)[0]
-		testutil.CheckTransferLeader(c, op1, operator.OpLeader, 1, 3)
-		// store byte/key rate (min, max): (6.6, 7.1) | 6.1 | (6, 6.5) | 5
+		testutil.CheckTransferPeer(c, op1, operator.OpHotRegion, 1, 4)
+		// After move-peer, store byte/key rate (min, max): (6.6, 7.1) | 6.1 | 6 | (5, 5.5)
 		op2 := hb.Schedule(tc)[0]
 		testutil.CheckTransferPeer(c, op2, operator.OpHotRegion, 1, 4)
-		// store byte/key rate (min, max): (6.1, 7.1) | 6.1 | (6, 6.5) | (5, 5.5)
+		// After move-peer, store byte/key rate (min, max): (6.1, 7.1) | 6.1 | 6 | (5, 6)
 		ops := hb.Schedule(tc)
 		c.Logf("%v", ops)
 		c.Assert(ops, HasLen, 0)
 	}
+
+	// Before schedule, store byte/key rate: 7.1 | 6.1 | 6 | 5
 	for i := 0; i < 20; i++ {
 		hb.(*hotScheduler).clearPendingInfluence()
 		op1 := hb.Schedule(tc)[0]
-		testutil.CheckTransferLeader(c, op1, operator.OpLeader, 1, 3)
-		// store byte/key rate (min, max): (6.6, 7.1) | 6.1 | (6, 6.5) | 5
+		testutil.CheckTransferPeer(c, op1, operator.OpHotRegion, 1, 4)
+		// After move-peer, store byte/key rate (min, max): (6.6, 7.1) | 6.1 | 6 | (5, 5.5)
 		op2 := hb.Schedule(tc)[0]
 		testutil.CheckTransferPeer(c, op2, operator.OpHotRegion, 1, 4)
-		// store bytekey rate (min, max): (6.1, 7.1) | 6.1 | (6, 6.5) | (5, 5.5)
+		// After move-peer, store byte/key rate (min, max): (6.1, 7.1) | 6.1 | 6 | (5, 6)
 		c.Assert(op2.Cancel(), IsTrue)
-		// store byte/key rate (min, max): (6.6, 7.1) | 6.1 | (6, 6.5) | 5
 		op2 = hb.Schedule(tc)[0]
 		testutil.CheckTransferPeer(c, op2, operator.OpHotRegion, 1, 4)
-		// store byte/key rate (min, max): (6.1, 7.1) | 6.1 | (6, 6.5) | (5, 5.5)
+		// After move-peer, store byte/key rate (min, max): (6.1, 7.1) | 6.1 | (6, 6.5) | (5, 5.5)
 		c.Assert(op1.Cancel(), IsTrue)
 		// store byte/key rate (min, max): (6.6, 7.1) | 6.1 | 6 | (5, 5.5)
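
For reference, the selection flow that balanceHotReadRegions now implements can be summarized as follows: both a transfer-leader and a move-peer solution are computed, the one that betterThan judges superior is committed first, and the scheduler falls back to the other if committing the pending influence fails. The sketch below is a minimal, self-contained illustration with simplified stand-in types, not PD's real API: solution, solver, pickOps, and the string operators are hypothetical, and PD's multi-dimensional load comparison is reduced to a single progressiveRank where lower is better.

```go
package main

import "fmt"

// solution is a stand-in for PD's solution struct; a more negative
// progressiveRank means a more beneficial schedule.
type solution struct {
	progressiveRank int64
}

// solver is a simplified stand-in for balanceSolver: it keeps the best
// solution found by solve() and the operators built for it.
type solver struct {
	best *solution
	ops  []string // stand-in for []*operator.Operator
}

// betterThan reduces PD's multi-dimensional comparison to a single
// rank; lower is better, and any solution beats a nil one.
func (s *solver) betterThan(other *solution) bool {
	return other == nil || s.best.progressiveRank < other.progressiveRank
}

// tryAddPendingInfluence mimics committing the chosen operator's
// influence; in PD this can fail (e.g. the region already has a
// pending operator), so callers must handle a false return.
func (s *solver) tryAddPendingInfluence() bool {
	return s.best != nil && len(s.ops) > 0
}

// pickOps mirrors the selection flow of balanceHotReadRegions: if only
// one solver produced operators, use it; otherwise prefer the better
// solution and fall back to the other when the commit fails.
func pickOps(leader, peer *solver) []string {
	switch {
	case len(leader.ops) == 0 && len(peer.ops) == 0:
		return nil
	case len(leader.ops) == 0:
		if peer.tryAddPendingInfluence() {
			return peer.ops
		}
		return nil
	case len(peer.ops) == 0:
		if leader.tryAddPendingInfluence() {
			return leader.ops
		}
		return nil
	}
	first, second := peer, leader
	if leader.betterThan(peer.best) {
		first, second = leader, peer
	}
	if first.tryAddPendingInfluence() {
		return first.ops
	}
	if second.tryAddPendingInfluence() {
		return second.ops
	}
	return nil
}

func main() {
	leader := &solver{best: &solution{progressiveRank: -2}, ops: []string{"transfer-leader 1->3"}}
	peer := &solver{best: &solution{progressiveRank: -3}, ops: []string{"move-peer 1->5"}}
	// The move-peer solution ranks better (-3 < -2), so it wins.
	fmt.Println(pickOps(leader, peer)) // [move-peer 1->5]
}
```

This matches the behavior exercised by the updated TestByteRateOnly assertion above: when a move-peer (with leader transfer) solution beats the plain transfer-leader one, the scheduler now emits the move-peer operator instead.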