scheduler: unify leaderSolver and peerSolver in hot read scheduler #3925

Merged: 10 commits, Aug 4, 2021
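In brief (per the diff and review thread below): balanceHotReadRegions previously preferred leader transfer and only fell back to moving a peer when no leader operator was found. After this change both solvers always run, their best solutions are compared with betterThan, and only the chosen solver commits its pending influence; to support this, the solver now caches best, ops, and infl instead of committing inside solve().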
71 changes: 45 additions & 26 deletions server/schedulers/hot_region.go
@@ -365,19 +365,37 @@ func (h *hotScheduler) addPendingInfluence(op *operator.Operator, srcStore, dstS
 }

 func (h *hotScheduler) balanceHotReadRegions(cluster opt.Cluster) []*operator.Operator {
-	// prefer to balance by leader
 	leaderSolver := newBalanceSolver(h, cluster, read, transferLeader)
-	ops := leaderSolver.solve()
-	if len(ops) > 0 {
-		return ops
-	}
-
+	leaderOps := leaderSolver.solve()
 	peerSolver := newBalanceSolver(h, cluster, read, movePeer)
-	ops = peerSolver.solve()
-	if len(ops) > 0 {
-		return ops
+	peerOps := peerSolver.solve()
+	if len(leaderOps) == 0 && len(peerOps) == 0 {
+		schedulerCounter.WithLabelValues(h.GetName(), "skip").Inc()
+		return nil
 	}
+	if len(leaderOps) == 0 && peerSolver.addPendingInfluence() {
[inline review thread on the line above]
Member: Previously, we preferred to balance by the leader. Does this PR change that?
Contributor (author): Yes, this PR compares the two solutions and then selects the better one.
+		return peerOps
+	}
+	if len(peerOps) == 0 && leaderSolver.addPendingInfluence() {
+		return leaderOps
+	}
+	leaderSolver.cur = leaderSolver.best
+	if leaderSolver.betterThan(peerSolver.best) {
+		if leaderSolver.addPendingInfluence() {
+			return leaderOps
+		}
+		if peerSolver.addPendingInfluence() {
+			return peerOps
+		}
+
+	} else {
+		if peerSolver.addPendingInfluence() {
+			return peerOps
+		}
+		if leaderSolver.addPendingInfluence() {
+			return leaderOps
+		}
+	}
 	schedulerCounter.WithLabelValues(h.GetName(), "skip").Inc()
 	return nil
 }
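The new selection flow can be condensed into a standalone sketch. Everything below is illustrative: op, solver, rank, and the commit callback are stand-ins for pd's *operator.Operator, *balanceSolver, progressiveRank, and addPendingInfluence, not the real APIs.

```go
package main

import "fmt"

// op and solver are simplified stand-ins for the PR's types;
// their names and fields are assumptions made for this sketch.
type op string

type solver struct {
	ops    []op
	rank   int         // lower is better, mirroring progressiveRank
	commit func() bool // stands in for balanceSolver.addPendingInfluence
}

// betterThan mirrors the spirit of the PR's comparison:
// prefer the solution with the better (lower) rank.
func (s *solver) betterThan(other *solver) bool {
	return s.rank < other.rank
}

// pick follows the selection order of the new balanceHotReadRegions:
// skip if both solvers are empty, short-circuit if only one produced
// operators, otherwise try the better solver first and fall back to
// the other if its pending influence cannot be committed.
func pick(leader, peer *solver) []op {
	if len(leader.ops) == 0 && len(peer.ops) == 0 {
		return nil // the "skip" counter in the real scheduler
	}
	if len(leader.ops) == 0 && peer.commit() {
		return peer.ops
	}
	if len(peer.ops) == 0 && leader.commit() {
		return leader.ops
	}
	first, second := peer, leader
	if leader.betterThan(peer) {
		first, second = leader, peer
	}
	if first.commit() {
		return first.ops
	}
	if second.commit() {
		return second.ops
	}
	return nil
}

func main() {
	leader := &solver{ops: []op{"transfer-leader 1->3"}, rank: -1, commit: func() bool { return true }}
	peer := &solver{ops: []op{"move-peer 1->5"}, rank: -2, commit: func() bool { return true }}
	fmt.Println(pick(leader, peer)) // [move-peer 1->5]: the peer solution ranks better
}
```

Trying the better solver first but still falling back to the other mirrors the PR's behavior when the preferred solution's pending influence cannot be committed.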
@@ -389,15 +407,15 @@ func (h *hotScheduler) balanceHotWriteRegions(cluster opt.Cluster) []*operator.O
 	case s < int(schedulePeerPr*100):
 		peerSolver := newBalanceSolver(h, cluster, write, movePeer)
 		ops := peerSolver.solve()
-		if len(ops) > 0 {
+		if len(ops) > 0 && peerSolver.addPendingInfluence() {
 			return ops
 		}
 	default:
 	}

 	leaderSolver := newBalanceSolver(h, cluster, write, transferLeader)
 	ops := leaderSolver.solve()
-	if len(ops) > 0 {
+	if len(ops) > 0 && leaderSolver.addPendingInfluence() {
 		return ops
 	}

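Note: the write path keeps its old try-peer-then-leader order; what changes is that committing the pending influence now happens through the solver's new addPendingInfluence method rather than inside solve().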
@@ -412,7 +430,10 @@ type balanceSolver struct {
 	rwTy rwType
 	opTy opType

-	cur *solution
+	cur  *solution
+	best *solution
+	ops  []*operator.Operator
+	infl Influence

 	maxSrc *storeLoad
 	minDst *storeLoad
@@ -436,6 +457,13 @@ type solution struct {
 	progressiveRank int64
 }

+func (bs *balanceSolver) addPendingInfluence() bool {
+	if bs.best == nil || len(bs.ops) == 0 {
+		return false
+	}
+	return bs.sche.addPendingInfluence(bs.ops[0], bs.best.srcStoreID, bs.best.dstStoreID, bs.infl)
+}
+
 func (bs *balanceSolver) init() {
 	switch toResourceType(bs.rwTy, bs.opTy) {
 	case writePeer:
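Note: addPendingInfluence returns false when solve() cached no best solution or built no operators, which is what lets balanceHotReadRegions unconditionally try to commit one solver and fall back to the other.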
@@ -513,11 +541,6 @@ func (bs *balanceSolver) solve() []*operator.Operator {
 		return nil
 	}
 	bs.cur = &solution{}
-	var (
-		best *solution
-		op   *operator.Operator
-		infl Influence
-	)

 	for srcStoreID := range bs.filterSrcStores() {
 		bs.cur.srcStoreID = srcStoreID
@@ -531,23 +554,19 @@ func (bs *balanceSolver) solve() []*operator.Operator {
 			for dstStoreID := range bs.filterDstStores() {
 				bs.cur.dstStoreID = dstStoreID
 				bs.calcProgressiveRank()
-				if bs.cur.progressiveRank < 0 && bs.betterThan(best) {
+				if bs.cur.progressiveRank < 0 && bs.betterThan(bs.best) {
 					if newOp, newInfl := bs.buildOperator(); newOp != nil {
-						op = newOp
-						infl = *newInfl
+						bs.ops = []*operator.Operator{newOp}
+						bs.infl = *newInfl
 						clone := *bs.cur
-						best = &clone
+						bs.best = &clone
 					}
 				}
 			}
 		}
 	}

-	if best == nil || !bs.sche.addPendingInfluence(op, best.srcStoreID, best.dstStoreID, infl) {
-		return nil
-	}
-
-	return []*operator.Operator{op}
+	return bs.ops
 }
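Note: solve() no longer commits the pending influence itself; it caches the winning solution in bs.best, bs.ops, and bs.infl and returns the operators, deferring the commit decision to the calling balance function.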

// filterSrcStores compare the min rate and the ratio * expectation rate, if both key and byte rate is greater than
22 changes: 13 additions & 9 deletions server/schedulers/hot_region_test.go
@@ -734,9 +734,14 @@ func (s *testHotReadRegionSchedulerSuite) TestByteRateOnly(c *C) {
 		}
 	}

-	testutil.CheckTransferLeader(c, hb.Schedule(tc)[0], operator.OpHotRegion, 1, 3)
+	op := hb.Schedule(tc)[0]
+
+	// move leader from store 1 to store 5
+	// it is better than transfer leader from store 1 to store 3
+	testutil.CheckTransferPeerWithLeaderTransfer(c, op, operator.OpHotRegion, 1, 5)
 	hb.(*hotScheduler).clearPendingInfluence()
-	// assume handle the operator
+
+	// assume handle the transfer leader operator rather than move leader
 	tc.AddRegionWithReadInfo(3, 3, 512*KB*statistics.ReadReportInterval, 0, statistics.ReadReportInterval, []uint64{1, 2})
 	// After transfer a hot region leader from store 1 to store 3
 	// the three region leader will be evenly distributed in three stores
@@ -891,12 +896,12 @@ func (s *testHotReadRegionSchedulerSuite) TestWithPendingInfluence(c *C) {
 		hb.(*hotScheduler).clearPendingInfluence()

 		op1 := hb.Schedule(tc)[0]
-		testutil.CheckTransferLeader(c, op1, operator.OpLeader, 1, 3)
-		// store byte/key rate (min, max): (6.6, 7.1) | 6.1 | (6, 6.5) | 5
+		testutil.CheckTransferPeer(c, op1, operator.OpHotRegion, 1, 4)
+		// store byte/key rate (min, max): (6.6, 7.1) | 6.1 | 6 | (5,5.5)

 		op2 := hb.Schedule(tc)[0]
 		testutil.CheckTransferPeer(c, op2, operator.OpHotRegion, 1, 4)
-		// store byte/key rate (min, max): (6.1, 7.1) | 6.1 | (6, 6.5) | (5, 5.5)
+		// store byte/key rate (min, max): (6.1, 7.1) | 6.1 | 6 | (5, 6)

 		ops := hb.Schedule(tc)
 		c.Logf("%v", ops)
@@ -906,14 +911,13 @@ func (s *testHotReadRegionSchedulerSuite) TestWithPendingInfluence(c *C) {
 		hb.(*hotScheduler).clearPendingInfluence()

 		op1 := hb.Schedule(tc)[0]
-		testutil.CheckTransferLeader(c, op1, operator.OpLeader, 1, 3)
-		// store byte/key rate (min, max): (6.6, 7.1) | 6.1 | (6, 6.5) | 5
+		testutil.CheckTransferPeer(c, op1, operator.OpHotRegion, 1, 4)
+		// store byte/key rate (min, max): (6.6, 7.1) | 6.1 | 6 | (5, 5.5)

 		op2 := hb.Schedule(tc)[0]
 		testutil.CheckTransferPeer(c, op2, operator.OpHotRegion, 1, 4)
-		// store bytekey rate (min, max): (6.1, 7.1) | 6.1 | (6, 6.5) | (5, 5.5)
+		// store byte/key rate (min, max): (6.1, 7.1) | 6.1 | 6 | (5, 6)
 		c.Assert(op2.Cancel(), IsTrue)
-		// store byte/key rate (min, max): (6.6, 7.1) | 6.1 | (6, 6.5) | 5

 		op2 = hb.Schedule(tc)[0]
 		testutil.CheckTransferPeer(c, op2, operator.OpHotRegion, 1, 4)