scheduler: refactor solve in hot-region-scheduler (#4952)
ref #4949

Signed-off-by: HunDunDM <[email protected]>

Co-authored-by: ShuNing <[email protected]>
HunDunDM and nolouch authored May 17, 2022
1 parent dac3836 commit 648a6ba
Showing 2 changed files with 143 additions and 102 deletions.
231 changes: 135 additions & 96 deletions server/schedulers/hot_region.go
@@ -379,6 +379,8 @@ type balanceSolver struct {

greatDecRatio float64
minorDecRatio float64
maxPeerNum int
minHotDegree int
}

func (bs *balanceSolver) init() {
@@ -416,6 +418,8 @@ func (bs *balanceSolver) init() {

bs.firstPriority, bs.secondPriority = prioritiesToDim(bs.getPriorities())
bs.greatDecRatio, bs.minorDecRatio = bs.sche.conf.GetGreatDecRatio(), bs.sche.conf.GetMinorDecRatio()
bs.maxPeerNum = bs.sche.conf.GetMaxPeerNumber()
bs.minHotDegree = bs.GetOpts().GetHotRegionCacheHitsThreshold()
}

func (bs *balanceSolver) isSelectedDim(dim int) bool {
@@ -465,27 +469,31 @@ func (bs *balanceSolver) solve() []*operator.Operator {
return nil
}
bs.cur = &solution{}
tryUpdateBestSolution := func() {
if bs.cur.progressiveRank < 0 && bs.betterThan(bs.best) {
if newOps, newInfl := bs.buildOperators(); len(newOps) > 0 {
bs.ops = newOps
bs.infl = *newInfl
clone := *bs.cur
bs.best = &clone
}
}
}

for _, srcStore := range bs.filterSrcStores() {
bs.cur.srcStore = srcStore
srcStoreID := srcStore.GetID()

for _, srcPeerStat := range bs.filterHotPeers() {
bs.cur.srcPeerStat = srcPeerStat
bs.cur.region = bs.getRegion()
if bs.cur.region == nil {
for _, srcPeerStat := range bs.filterHotPeers(srcStore) {
if bs.cur.region = bs.getRegion(srcPeerStat, srcStoreID); bs.cur.region == nil {
continue
}
bs.cur.srcPeerStat = srcPeerStat

for _, dstStore := range bs.filterDstStores() {
bs.cur.dstStore = dstStore
bs.calcProgressiveRank()
if bs.cur.progressiveRank < 0 && bs.betterThan(bs.best) {
if newOp, newInfl := bs.buildOperator(); newOp != nil {
bs.ops = []*operator.Operator{newOp}
bs.infl = *newInfl
clone := *bs.cur
bs.best = &clone
}
}
tryUpdateBestSolution()
}
}
}
@@ -563,37 +571,33 @@ func (bs *balanceSolver) checkSrcByDimPriorityAndTolerance(minLoad, expectLoad *

// filterHotPeers filters hot peers from statistics.HotPeerStat and drops a peer if its region is in pending status.
// The returned hot peer count is controlled by `max-peer-number`.
func (bs *balanceSolver) filterHotPeers() []*statistics.HotPeerStat {
ret := bs.cur.srcStore.HotPeers
// Return at most MaxPeerNum peers, to prevent balanceSolver.solve() too slow.
maxPeerNum := bs.sche.conf.GetMaxPeerNumber()

// filter pending region
appendItem := func(items []*statistics.HotPeerStat, item *statistics.HotPeerStat) []*statistics.HotPeerStat {
minHotDegree := bs.GetOpts().GetHotRegionCacheHitsThreshold()
if _, ok := bs.sche.regionPendings[item.ID()]; !ok && !item.IsNeedCoolDownTransferLeader(minHotDegree) {
func (bs *balanceSolver) filterHotPeers(storeLoad *statistics.StoreLoadDetail) (ret []*statistics.HotPeerStat) {
appendItem := func(item *statistics.HotPeerStat) {
if _, ok := bs.sche.regionPendings[item.ID()]; !ok && !item.IsNeedCoolDownTransferLeader(bs.minHotDegree) {
// not in a pending operator and no need to cool down after transferring the leader
items = append(items, item)
ret = append(ret, item)
}
return items
}
if len(ret) <= maxPeerNum {
nret := make([]*statistics.HotPeerStat, 0, len(ret))
for _, peer := range ret {
nret = appendItem(nret, peer)

src := storeLoad.HotPeers
// At most MaxPeerNum peers, to prevent balanceSolver.solve() from being too slow.
if len(src) <= bs.maxPeerNum {
ret = make([]*statistics.HotPeerStat, 0, len(src))
for _, peer := range src {
appendItem(peer)
}
} else {
union := bs.sortHotPeers(src)
ret = make([]*statistics.HotPeerStat, 0, len(union))
for peer := range union {
appendItem(peer)
}
return nret
}

union := bs.sortHotPeers(ret, maxPeerNum)
ret = make([]*statistics.HotPeerStat, 0, len(union))
for peer := range union {
ret = appendItem(ret, peer)
}
return ret
return
}

func (bs *balanceSolver) sortHotPeers(ret []*statistics.HotPeerStat, maxPeerNum int) map[*statistics.HotPeerStat]struct{} {
func (bs *balanceSolver) sortHotPeers(ret []*statistics.HotPeerStat) map[*statistics.HotPeerStat]struct{} {
firstSort := make([]*statistics.HotPeerStat, len(ret))
copy(firstSort, ret)
sort.Slice(firstSort, func(i, j int) bool {
@@ -606,8 +610,8 @@ func (bs *balanceSolver) sortHotPeers(ret []*statistics.HotPeerStat, maxPeerNum
k := statistics.GetRegionStatKind(bs.rwTy, bs.secondPriority)
return secondSort[i].GetLoad(k) > secondSort[j].GetLoad(k)
})
union := make(map[*statistics.HotPeerStat]struct{}, maxPeerNum)
for len(union) < maxPeerNum {
union := make(map[*statistics.HotPeerStat]struct{}, bs.maxPeerNum)
for len(union) < bs.maxPeerNum {
for len(firstSort) > 0 {
peer := firstSort[0]
firstSort = firstSort[1:]
@@ -616,7 +620,7 @@ func (bs *balanceSolver) sortHotPeers(ret []*statistics.HotPeerStat, maxPeerNum
break
}
}
for len(union) < maxPeerNum && len(secondSort) > 0 {
for len(union) < bs.maxPeerNum && len(secondSort) > 0 {
peer := secondSort[0]
secondSort = secondSort[1:]
if _, ok := union[peer]; !ok {
@@ -660,22 +664,26 @@ func (bs *balanceSolver) isRegionAvailable(region *core.RegionInfo) bool {
return true
}

func (bs *balanceSolver) getRegion() *core.RegionInfo {
region := bs.GetRegion(bs.cur.srcPeerStat.ID())
func (bs *balanceSolver) getRegion(peerStat *statistics.HotPeerStat, storeID uint64) *core.RegionInfo {
region := bs.GetRegion(peerStat.ID())
if !bs.isRegionAvailable(region) {
return nil
}

switch bs.opTy {
case movePeer:
srcPeer := region.GetStorePeer(bs.cur.srcStore.GetID())
srcPeer := region.GetStorePeer(storeID)
if srcPeer == nil {
log.Debug("region does not have a peer on source store, maybe stat out of date", zap.Uint64("region-id", bs.cur.srcPeerStat.ID()))
log.Debug("region does not have a peer on source store, maybe stat out of date",
zap.Uint64("region-id", peerStat.ID()),
zap.Uint64("leader-store-id", storeID))
return nil
}
case transferLeader:
if region.GetLeader().GetStoreId() != bs.cur.srcStore.GetID() {
log.Debug("region leader is not on source store, maybe stat out of date", zap.Uint64("region-id", bs.cur.srcPeerStat.ID()))
if region.GetLeader().GetStoreId() != storeID {
log.Debug("region leader is not on source store, maybe stat out of date",
zap.Uint64("region-id", peerStat.ID()),
zap.Uint64("leader-store-id", storeID))
return nil
}
default:
@@ -800,7 +808,7 @@ func (bs *balanceSolver) isTolerance(dim int) bool {
return false
}
srcPending, dstPending := bs.cur.getPendingLoad(dim)
pendingAmp := (1 + pendingAmpFactor*srcRate/(srcRate-dstRate))
pendingAmp := 1 + pendingAmpFactor*srcRate/(srcRate-dstRate)
hotPendingStatus.WithLabelValues(bs.rwTy.String(), strconv.FormatUint(bs.cur.srcStore.GetID(), 10), strconv.FormatUint(bs.cur.dstStore.GetID(), 10)).Set(pendingAmp)
return srcRate-pendingAmp*srcPending > dstRate+pendingAmp*dstPending
}
@@ -832,6 +840,7 @@ func (bs *balanceSolver) isBetter(dim int) bool {
return isHot && decRatio <= bs.greatDecRatio && bs.isTolerance(dim)
}

// isNotWorsened must be true if isBetter is true.
func (bs *balanceSolver) isNotWorsened(dim int) bool {
isHot, decRatio := bs.getHotDecRatioByPriorities(dim)
return !isHot || decRatio <= bs.minorDecRatio
@@ -1012,94 +1021,124 @@ func (bs *balanceSolver) isReadyToBuild() bool {
bs.cur.region != nil && bs.cur.region.GetID() == bs.cur.srcPeerStat.ID()
}

func (bs *balanceSolver) buildOperator() (op *operator.Operator, infl *statistics.Influence) {
func (bs *balanceSolver) buildOperators() (ops []*operator.Operator, infl *statistics.Influence) {
if !bs.isReadyToBuild() {
return nil, nil
}
var (
err error
typ string
sourceLabel string
targetLabel string
)

srcStoreID := bs.cur.srcStore.GetID()
dstStoreID := bs.cur.dstStore.GetID()
switch bs.opTy {
case movePeer:
srcPeer := bs.cur.region.GetStorePeer(srcStoreID) // checked in getRegionAndSrcPeer
dstPeer := &metapb.Peer{StoreId: dstStoreID, Role: srcPeer.Role}
sourceLabel = strconv.FormatUint(srcStoreID, 10)
targetLabel = strconv.FormatUint(dstPeer.GetStoreId(), 10)
sourceLabel := strconv.FormatUint(srcStoreID, 10)
targetLabel := strconv.FormatUint(dstStoreID, 10)
dim := ""
switch bs.cur.progressiveRank {
case -3:
dim = "all"
case -2:
dim = dimToString(bs.secondPriority)
case -1:
dim = dimToString(bs.firstPriority)
}

var createOperator func(region *core.RegionInfo, srcStoreID, dstStoreID uint64) (op *operator.Operator, typ string, err error)
switch bs.rwTy {
case statistics.Read:
createOperator = bs.createReadOperator
case statistics.Write:
createOperator = bs.createWriteOperator
}

currentOp, typ, err := createOperator(bs.cur.region, srcStoreID, dstStoreID)
if err == nil {
bs.decorateOperator(currentOp, sourceLabel, targetLabel, typ, dim)
ops = []*operator.Operator{currentOp}
infl = &statistics.Influence{
Loads: append(bs.cur.srcPeerStat.Loads[:0:0], bs.cur.srcPeerStat.Loads...),
Count: 1,
}
}

if err != nil {
log.Debug("fail to create operator", zap.Stringer("rw-type", bs.rwTy), zap.Stringer("op-type", bs.opTy), errs.ZapError(err))
schedulerCounter.WithLabelValues(bs.sche.GetName(), "create-operator-fail").Inc()
return nil, nil
}

return
}

if bs.rwTy == statistics.Read && bs.cur.region.GetLeader().StoreId == srcStoreID { // move read leader
func (bs *balanceSolver) createReadOperator(region *core.RegionInfo, srcStoreID, dstStoreID uint64) (op *operator.Operator, typ string, err error) {
if region.GetStorePeer(dstStoreID) != nil {
typ = "transfer-leader"
op, err = operator.CreateTransferLeaderOperator(
"transfer-hot-read-leader",
bs,
region,
srcStoreID,
dstStoreID,
[]uint64{},
operator.OpHotRegion)
} else {
srcPeer := region.GetStorePeer(srcStoreID) // checked in `filterHotPeers`
dstPeer := &metapb.Peer{StoreId: dstStoreID, Role: srcPeer.Role}
if region.GetLeader().GetStoreId() == srcStoreID {
typ = "move-leader"
op, err = operator.CreateMoveLeaderOperator(
"move-hot-read-leader",
bs,
bs.cur.region,
region,
operator.OpHotRegion,
srcStoreID,
dstPeer)
typ = "move-leader"
} else {
desc := "move-hot-" + bs.rwTy.String() + "-peer"
typ = "move-peer"
op, err = operator.CreateMovePeerOperator(
desc,
"move-hot-read-peer",
bs,
bs.cur.region,
region,
operator.OpHotRegion,
srcStoreID,
dstPeer)
}
case transferLeader:
if bs.cur.region.GetStoreVoter(dstStoreID) == nil {
return nil, nil
}
desc := "transfer-hot-" + bs.rwTy.String() + "-leader"
}
return
}

func (bs *balanceSolver) createWriteOperator(region *core.RegionInfo, srcStoreID, dstStoreID uint64) (op *operator.Operator, typ string, err error) {
if region.GetStorePeer(dstStoreID) != nil {
typ = "transfer-leader"
sourceLabel = strconv.FormatUint(srcStoreID, 10)
targetLabel = strconv.FormatUint(dstStoreID, 10)
op, err = operator.CreateTransferLeaderOperator(
desc,
"transfer-hot-write-leader",
bs,
bs.cur.region,
region,
srcStoreID,
dstStoreID,
[]uint64{},
operator.OpHotRegion)
} else {
srcPeer := region.GetStorePeer(srcStoreID) // checked in `filterHotPeers`
dstPeer := &metapb.Peer{StoreId: dstStoreID, Role: srcPeer.Role}
typ = "move-peer"
op, err = operator.CreateMovePeerOperator(
"move-hot-write-peer",
bs,
region,
operator.OpHotRegion,
srcStoreID,
dstPeer)
}
return
}

if err != nil {
log.Debug("fail to create operator", zap.Stringer("rw-type", bs.rwTy), zap.Stringer("op-type", bs.opTy), errs.ZapError(err))
schedulerCounter.WithLabelValues(bs.sche.GetName(), "create-operator-fail").Inc()
return nil, nil
}

dim := ""
switch bs.cur.progressiveRank {
case -3:
dim = "all"
case -2:
dim = dimToString(bs.secondPriority)
case -1:
dim = dimToString(bs.firstPriority)
}

func (bs *balanceSolver) decorateOperator(op *operator.Operator, sourceLabel, targetLabel, typ, dim string) {
op.SetPriorityLevel(core.HighPriority)
op.FinishedCounters = append(op.FinishedCounters,
hotDirectionCounter.WithLabelValues(typ, bs.rwTy.String(), sourceLabel, "out", dim),
hotDirectionCounter.WithLabelValues(typ, bs.rwTy.String(), targetLabel, "in", dim),
balanceDirectionCounter.WithLabelValues(bs.sche.GetName(), sourceLabel, targetLabel))
op.Counters = append(op.Counters,
schedulerCounter.WithLabelValues(bs.sche.GetName(), "new-operator"),
schedulerCounter.WithLabelValues(bs.sche.GetName(), bs.opTy.String()))

infl = &statistics.Influence{
Loads: append(bs.cur.srcPeerStat.Loads[:0:0], bs.cur.srcPeerStat.Loads...),
Count: 1,
}
return op, infl
schedulerCounter.WithLabelValues(bs.sche.GetName(), typ))
}

// calcPendingInfluence returns the calculated weight of one Operator; the value will be within [0, 1]
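The heart of this refactor is the new shape of solve(): the best-solution bookkeeping moves out of the innermost loop into a tryUpdateBestSolution closure, and filterHotPeers/getRegion now take the source store and peer explicitly instead of reading them from bs.cur. The sketch below illustrates only that control-flow shape; the solver and solution types, the toy ranking, and the helpers are simplified stand-ins, not PD's actual API.

```go
package main

import "fmt"

// solution and solver are simplified stand-ins for PD's types, not the real API.
type solution struct {
	srcStore, dstStore string
	peer               string
	progressiveRank    int
}

type solver struct {
	cur  *solution
	best *solution
	ops  []string
}

// betterThan and buildOperators stand in for the real ranking and
// operator-building logic.
func (s *solver) betterThan(old *solution) bool {
	return old == nil || s.cur.progressiveRank < old.progressiveRank
}

func (s *solver) buildOperators() []string {
	return []string{fmt.Sprintf("move %s: %s -> %s", s.cur.peer, s.cur.srcStore, s.cur.dstStore)}
}

// solve mirrors the refactored loop: every (srcStore, peer, dstStore) candidate
// is ranked, and the best one is recorded by a closure instead of by bookkeeping
// inlined in the innermost loop.
func (s *solver) solve(srcStores, peers, dstStores []string) []string {
	s.cur = &solution{}

	tryUpdateBestSolution := func() {
		if s.cur.progressiveRank < 0 && s.betterThan(s.best) {
			if newOps := s.buildOperators(); len(newOps) > 0 {
				s.ops = newOps
				clone := *s.cur // copy, because s.cur keeps being mutated
				s.best = &clone
			}
		}
	}

	for _, src := range srcStores {
		s.cur.srcStore = src
		for _, p := range peers {
			s.cur.peer = p
			for _, dst := range dstStores {
				s.cur.dstStore = dst
				s.cur.progressiveRank = -1 // toy ranking: every candidate qualifies
				tryUpdateBestSolution()
			}
		}
	}
	return s.ops
}

func main() {
	s := &solver{}
	fmt.Println(s.solve([]string{"store-1"}, []string{"region-42"}, []string{"store-2", "store-3"}))
}
```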
14 changes: 8 additions & 6 deletions server/schedulers/hot_region_test.go
@@ -1540,11 +1540,11 @@ func (s *testHotCacheSuite) TestCheckRegionFlow(c *C) {
hb.prepareForBalance(testcase.kind, tc)
leaderSolver := newBalanceSolver(hb, tc, testcase.kind, transferLeader)
leaderSolver.cur = &solution{srcStore: hb.stLoadInfos[toResourceType(testcase.kind, transferLeader)][2]}
c.Check(leaderSolver.filterHotPeers(), HasLen, 0) // skip schedule
c.Check(leaderSolver.filterHotPeers(leaderSolver.cur.srcStore), HasLen, 0) // skip schedule
threshold := tc.GetHotRegionCacheHitsThreshold()
tc.SetHotRegionCacheHitsThreshold(0)
c.Check(leaderSolver.filterHotPeers(), HasLen, 1)
tc.SetHotRegionCacheHitsThreshold(threshold)
leaderSolver.minHotDegree = 0
c.Check(leaderSolver.filterHotPeers(leaderSolver.cur.srcStore), HasLen, 1)
leaderSolver.minHotDegree = threshold
}

// move peer: add peer and remove peer
@@ -1628,10 +1628,12 @@ func (s *testHotCacheSuite) TestSortHotPeer(c *C) {
},
}}

u := leaderSolver.sortHotPeers(hotPeers, 1)
leaderSolver.maxPeerNum = 1
u := leaderSolver.sortHotPeers(hotPeers)
checkSortResult(c, []uint64{1}, u)

u = leaderSolver.sortHotPeers(hotPeers, 2)
leaderSolver.maxPeerNum = 2
u = leaderSolver.sortHotPeers(hotPeers)
checkSortResult(c, []uint64{1, 2}, u)
}

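The updated TestSortHotPeer now tunes the selection size by setting leaderSolver.maxPeerNum directly. For reference, the two-priority selection that sortHotPeers performs can be illustrated with a small self-contained sketch: peers are taken alternately from the first-priority and second-priority orderings until maxPeerNum distinct peers are collected. The peer struct and topByTwoPriorities name below are illustrative stand-ins rather than PD's statistics.HotPeerStat API, and an explicit exhaustion guard is added that the real code leaves to its caller.

```go
package main

import (
	"fmt"
	"sort"
)

// peer is a simplified stand-in for statistics.HotPeerStat.
type peer struct {
	id                uint64
	byteRate, keyRate float64 // the two priority dimensions
}

// topByTwoPriorities mirrors the sortHotPeers strategy: pick peers from the two
// priority orderings alternately until maxPeerNum distinct peers are chosen, so
// the hottest peers of both dimensions are kept.
func topByTwoPriorities(peers []*peer, maxPeerNum int) map[*peer]struct{} {
	first := append([]*peer(nil), peers...)
	sort.Slice(first, func(i, j int) bool { return first[i].byteRate > first[j].byteRate })
	second := append([]*peer(nil), peers...)
	sort.Slice(second, func(i, j int) bool { return second[i].keyRate > second[j].keyRate })

	union := make(map[*peer]struct{}, maxPeerNum)
	// Extra guard against exhausting both lists before reaching maxPeerNum.
	for len(union) < maxPeerNum && (len(first) > 0 || len(second) > 0) {
		for len(first) > 0 {
			p := first[0]
			first = first[1:]
			if _, ok := union[p]; !ok {
				union[p] = struct{}{}
				break
			}
		}
		for len(union) < maxPeerNum && len(second) > 0 {
			p := second[0]
			second = second[1:]
			if _, ok := union[p]; !ok {
				union[p] = struct{}{}
				break
			}
		}
	}
	return union
}

func main() {
	peers := []*peer{
		{id: 1, byteRate: 10, keyRate: 1},
		{id: 2, byteRate: 1, keyRate: 10},
		{id: 3, byteRate: 5, keyRate: 5},
	}
	for p := range topByTwoPriorities(peers, 2) {
		fmt.Println("picked region", p.id)
	}
}
```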
