
Commit

Merge branch 'pendingrate' of github.com:lhy1024/pd into pendingrate
lhy1024 committed Aug 5, 2021
2 parents c09e578 + fc01f6f commit 18398a1
Showing 2 changed files with 70 additions and 41 deletions.
83 changes: 52 additions & 31 deletions server/schedulers/hot_region.go
@@ -368,7 +368,7 @@ func filterHotPeers(
return ret
}

func (h *hotScheduler) addPendingInfluence(op *operator.Operator, srcStore, dstStore uint64, infl Influence, maxZombieDur time.Duration) bool {
func (h *hotScheduler) tryAddPendingInfluence(op *operator.Operator, srcStore, dstStore uint64, infl Influence, maxZombieDur time.Duration) bool {
regionID := op.RegionID()
_, ok := h.regionPendings[regionID]
if ok {
@@ -384,19 +384,45 @@ func (h *hotScheduler) addPendingInfluence(op *operator.Operator, srcStore, dstS
}

func (h *hotScheduler) balanceHotReadRegions(cluster opt.Cluster) []*operator.Operator {
// prefer to balance by leader
leaderSolver := newBalanceSolver(h, cluster, read, transferLeader)
ops := leaderSolver.solve()
if len(ops) > 0 {
return ops
}

leaderOps := leaderSolver.solve()
peerSolver := newBalanceSolver(h, cluster, read, movePeer)
ops = peerSolver.solve()
if len(ops) > 0 {
return ops
peerOps := peerSolver.solve()
if len(leaderOps) == 0 && len(peerOps) == 0 {
schedulerCounter.WithLabelValues(h.GetName(), "skip").Inc()
return nil
}
if len(leaderOps) == 0 {
if peerSolver.tryAddPendingInfluence() {
return peerOps
}
schedulerCounter.WithLabelValues(h.GetName(), "skip").Inc()
return nil
}
if len(peerOps) == 0 {
if leaderSolver.tryAddPendingInfluence() {
return leaderOps
}
schedulerCounter.WithLabelValues(h.GetName(), "skip").Inc()
return nil
}
leaderSolver.cur = leaderSolver.best
if leaderSolver.betterThan(peerSolver.best) {
if leaderSolver.tryAddPendingInfluence() {
return leaderOps
}
if peerSolver.tryAddPendingInfluence() {
return peerOps
}

} else {
if peerSolver.tryAddPendingInfluence() {
return peerOps
}
if leaderSolver.tryAddPendingInfluence() {
return leaderOps
}
}
schedulerCounter.WithLabelValues(h.GetName(), "skip").Inc()
return nil
}
@@ -408,15 +434,15 @@ func (h *hotScheduler) balanceHotWriteRegions(cluster opt.Cluster) []*operator.O
case s < int(schedulePeerPr*100):
peerSolver := newBalanceSolver(h, cluster, write, movePeer)
ops := peerSolver.solve()
if len(ops) > 0 {
if len(ops) > 0 && peerSolver.tryAddPendingInfluence() {
return ops
}
default:
}

leaderSolver := newBalanceSolver(h, cluster, write, transferLeader)
ops := leaderSolver.solve()
if len(ops) > 0 {
if len(ops) > 0 && leaderSolver.tryAddPendingInfluence() {
return ops
}

@@ -431,7 +457,10 @@ type balanceSolver struct
rwTy rwType
opTy opType

cur *solution
cur *solution
best *solution
ops []*operator.Operator
infl Influence

maxSrc *storeLoad
minDst *storeLoad
@@ -568,11 +597,6 @@ func (bs *balanceSolver) solve() []*operator.Operator {
return nil
}
bs.cur = &solution{}
var (
best *solution
op *operator.Operator
infl Influence
)

for srcStoreID := range bs.filterSrcStores() {
bs.cur.srcStoreID = srcStoreID
@@ -586,22 +610,24 @@ func (bs *balanceSolver) solve() []*operator.Operator {
for dstStoreID := range bs.filterDstStores() {
bs.cur.dstStoreID = dstStoreID
bs.calcProgressiveRank()
if bs.cur.progressiveRank < 0 && bs.betterThan(best) {
if bs.cur.progressiveRank < 0 && bs.betterThan(bs.best) {
if newOp, newInfl := bs.buildOperator(); newOp != nil {
op = newOp
infl = *newInfl
bs.ops = []*operator.Operator{newOp}
bs.infl = *newInfl
clone := *bs.cur
best = &clone
bs.best = &clone
}
}
}
}
}
return bs.ops
}

if best == nil {
return nil
func (bs *balanceSolver) tryAddPendingInfluence() bool {
if bs.best == nil || len(bs.ops) == 0 {
return false
}

// Depending on the source of the statistics used, a different ZombieDuration will be used.
// If the statistics are from the sum of Regions, there will be a longer ZombieDuration.
var maxZombieDur time.Duration
Expand All @@ -611,12 +637,7 @@ func (bs *balanceSolver) solve() []*operator.Operator {
default:
maxZombieDur = bs.sche.conf.GetStoreStatZombieDuration()
}

if !bs.sche.addPendingInfluence(op, best.srcStoreID, best.dstStoreID, infl, maxZombieDur) {
return nil
}

return []*operator.Operator{op}
return bs.sche.tryAddPendingInfluence(bs.ops[0], bs.best.srcStoreID, bs.best.dstStoreID, bs.infl, maxZombieDur)
}

func (bs *balanceSolver) isForWriteLeader() bool {
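
For orientation, here is a minimal, self-contained sketch (not PD's actual code) of the selection flow that balanceHotReadRegions uses after this change: build both candidate plans, prefer the better one, and only return a plan whose pending influence was successfully registered, otherwise count the round as a skip. The types solver and plan, the score field, and pickReadPlan are hypothetical stand-ins for balanceSolver, operator.Operator, betterThan, and the real scheduler entry point.

// pending_flow_sketch.go - hypothetical stand-ins, not the PD implementation.
package main

import "fmt"

type plan struct {
	name  string
	score int // stand-in for the ordering decided by betterThan
}

type solver struct {
	best    *plan
	blocked bool // stand-in for a region that already has a pending influence
}

// solve returns the candidate plan, or nil if the solver found nothing.
func (s *solver) solve() *plan { return s.best }

// tryAddPendingInfluence registers the plan's pending influence; it can fail,
// e.g. when the region already has an unfinished operator.
func (s *solver) tryAddPendingInfluence() bool { return !s.blocked }

// pickReadPlan mirrors the new control flow: skip if neither solver produced a
// plan, otherwise prefer the better plan and fall back to the other one if
// registering its pending influence fails.
func pickReadPlan(leader, peer *solver) *plan {
	leaderPlan, peerPlan := leader.solve(), peer.solve()
	switch {
	case leaderPlan == nil && peerPlan == nil:
		return nil // counted as "skip" in the real scheduler
	case peerPlan == nil:
		if leader.tryAddPendingInfluence() {
			return leaderPlan
		}
		return nil
	case leaderPlan == nil:
		if peer.tryAddPendingInfluence() {
			return peerPlan
		}
		return nil
	}
	first, second := leader, peer
	if peerPlan.score > leaderPlan.score { // stand-in for leaderSolver.betterThan(peerSolver.best)
		first, second = peer, leader
	}
	if first.tryAddPendingInfluence() {
		return first.solve()
	}
	if second.tryAddPendingInfluence() {
		return second.solve()
	}
	return nil
}

func main() {
	leader := &solver{best: &plan{name: "transfer-leader", score: 1}, blocked: true}
	peer := &solver{best: &plan{name: "move-peer", score: 2}}
	fmt.Println(pickReadPlan(leader, peer).name) // move-peer: better and not blocked
}
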
28 changes: 18 additions & 10 deletions server/schedulers/hot_region_test.go
@@ -744,9 +744,14 @@ func (s *testHotReadRegionSchedulerSuite) TestByteRateOnly(c *C) {
}
}

testutil.CheckTransferLeader(c, hb.Schedule(tc)[0], operator.OpHotRegion, 1, 3)
op := hb.Schedule(tc)[0]

// move leader from store 1 to store 5
// it is better than transfer leader from store 1 to store 3
testutil.CheckTransferPeerWithLeaderTransfer(c, op, operator.OpHotRegion, 1, 5)
hb.(*hotScheduler).clearPendingInfluence()
// assume handle the operator

// assume handle the transfer leader operator rather than move leader
tc.AddRegionWithReadInfo(3, 3, 512*KB*statistics.ReadReportInterval, 0, statistics.ReadReportInterval, []uint64{1, 2})
// After transfer a hot region leader from store 1 to store 3
// the three region leader will be evenly distributed in three stores
@@ -903,12 +908,14 @@ func (s *testHotReadRegionSchedulerSuite) TestWithPendingInfluence(c *C) {
})
}

// Before schedule, store byte/key rate: 7.1 | 6.1 | 6 | 5
// Min and max from storeLoadPred. They will be generated in the comparison of current and future.
for i := 0; i < 20; i++ {
hb.(*hotScheduler).clearPendingInfluence()

op1 := hb.Schedule(tc)[0]
testutil.CheckTransferLeader(c, op1, operator.OpLeader, 1, 3)
// store byte/key rate (min, max): (6.6, 7.1) | 6.1 | (6, 6.5) | 5
testutil.CheckTransferPeer(c, op1, operator.OpHotRegion, 1, 4)
// After move-peer, store byte/key rate (min, max): (6.6, 7.1) | 6.1 | 6 | (5, 5.5)

pendingAmpFactor = old
ops := hb.Schedule(tc)
@@ -917,28 +924,29 @@ func (s *testHotReadRegionSchedulerSuite) TestWithPendingInfluence(c *C) {

op2 := hb.Schedule(tc)[0]
testutil.CheckTransferPeer(c, op2, operator.OpHotRegion, 1, 4)
// store byte/key rate (min, max): (6.1, 7.1) | 6.1 | (6, 6.5) | (5, 5.5)
// After move-peer, store byte/key rate (min, max): (6.1, 7.1) | 6.1 | 6 | (5, 6)

ops = hb.Schedule(tc)
c.Logf("%v", ops)
c.Assert(ops, HasLen, 0)
}

// Before schedule, store byte/key rate: 7.1 | 6.1 | 6 | 5
for i := 0; i < 20; i++ {
hb.(*hotScheduler).clearPendingInfluence()

op1 := hb.Schedule(tc)[0]
testutil.CheckTransferLeader(c, op1, operator.OpLeader, 1, 3)
// store byte/key rate (min, max): (6.6, 7.1) | 6.1 | (6, 6.5) | 5
testutil.CheckTransferPeer(c, op1, operator.OpHotRegion, 1, 4)
// After move-peer, store byte/key rate (min, max): (6.6, 7.1) | 6.1 | 6 | (5, 5.5)

op2 := hb.Schedule(tc)[0]
testutil.CheckTransferPeer(c, op2, operator.OpHotRegion, 1, 4)
// store byte/key rate (min, max): (6.1, 7.1) | 6.1 | (6, 6.5) | (5, 5.5)
// After move-peer, store byte/key rate (min, max): (6.1, 7.1) | 6.1 | 6 | (5, 6)
c.Assert(op2.Cancel(), IsTrue)
// store byte/key rate (min, max): (6.6, 7.1) | 6.1 | (6, 6.5) | 5

op2 = hb.Schedule(tc)[0]
testutil.CheckTransferPeer(c, op2, operator.OpHotRegion, 1, 4)
// store byte/key rate (min, max): (6.1, 7.1) | 6.1 | (6, 6.5) | (5, 5.5)
// After move-peer, store byte/key rate (min, max): (6.1, 7.1) | 6.1 | (6, 6.5) | (5, 5.5)

c.Assert(op1.Cancel(), IsTrue)
// store byte/key rate (min, max): (6.6, 7.1) | 6.1 | 6 | (5, 5.5)
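
As a small, self-contained illustration (not PD's storeLoadPred implementation) of the min/max bookkeeping the test comments above describe: a pending move-peer operator lowers the source store's expected rate and raises the destination's, so each store is tracked as a (min, max) range between its current and its possible future load. The names loadRange and applyPending are hypothetical, and the 0.5 MB/s figure is the approximate rate of the 512 KB hot region used in the test.

// load_range_sketch.go - hypothetical stand-ins, not the PD implementation.
package main

import "fmt"

type loadRange struct{ min, max float64 }

// applyPending widens the (min, max) ranges of the source and destination stores
// by the rate of the region being moved, mirroring the "After move-peer" comments.
func applyPending(stores map[uint64]*loadRange, src, dst uint64, regionRate float64) {
	stores[src].min -= regionRate // source may drop to current rate - region rate
	stores[dst].max += regionRate // destination may rise to current rate + region rate
}

func main() {
	// Before schedule, store byte rate: 7.1 | 6.1 | 6 | 5 (MB/s), as in the test setup.
	stores := map[uint64]*loadRange{
		1: {7.1, 7.1}, 2: {6.1, 6.1}, 3: {6.0, 6.0}, 4: {5.0, 5.0},
	}
	// op1: move a ~0.5 MB/s hot peer from store 1 to store 4.
	applyPending(stores, 1, 4, 0.5)
	fmt.Printf("store 1: (%.1f, %.1f), store 4: (%.1f, %.1f)\n",
		stores[1].min, stores[1].max, stores[4].min, stores[4].max)
	// -> store 1: (6.6, 7.1), store 4: (5.0, 5.5), matching the first comment.

	// op2: another ~0.5 MB/s move from store 1 to store 4.
	applyPending(stores, 1, 4, 0.5)
	fmt.Printf("store 1: (%.1f, %.1f), store 4: (%.1f, %.1f)\n",
		stores[1].min, stores[1].max, stores[4].min, stores[4].max)
	// -> store 1: (6.1, 7.1), store 4: (5.0, 6.0), matching the second comment.
}
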
