From 26ced22044e45ba2e315fe639c3378880d61a208 Mon Sep 17 00:00:00 2001 From: Ryan Leung Date: Sun, 29 Sep 2024 18:11:30 +0800 Subject: [PATCH] *: unify the receiver naming style (#8677) ref tikv/pd#8379 Signed-off-by: Ryan Leung --- pkg/schedule/checker/learner_checker.go | 6 +- pkg/schedule/checker/merge_checker.go | 50 +++---- pkg/schedule/schedulers/balance_leader.go | 116 ++++++++-------- pkg/schedule/schedulers/balance_witness.go | 98 ++++++------- pkg/schedule/schedulers/hot_region.go | 152 ++++++++++----------- pkg/schedule/schedulers/scatter_range.go | 58 ++++---- 6 files changed, 240 insertions(+), 240 deletions(-) diff --git a/pkg/schedule/checker/learner_checker.go b/pkg/schedule/checker/learner_checker.go index f9c4f7efb2b..8590904a760 100644 --- a/pkg/schedule/checker/learner_checker.go +++ b/pkg/schedule/checker/learner_checker.go @@ -36,13 +36,13 @@ func NewLearnerChecker(cluster sche.CheckerCluster) *LearnerChecker { } // Check verifies a region's role, creating an Operator if need. -func (l *LearnerChecker) Check(region *core.RegionInfo) *operator.Operator { - if l.IsPaused() { +func (c *LearnerChecker) Check(region *core.RegionInfo) *operator.Operator { + if c.IsPaused() { learnerCheckerPausedCounter.Inc() return nil } for _, p := range region.GetLearners() { - op, err := operator.CreatePromoteLearnerOperator("promote-learner", l.cluster, region, p) + op, err := operator.CreatePromoteLearnerOperator("promote-learner", c.cluster, region, p) if err != nil { log.Debug("fail to create promote learner operator", errs.ZapError(err)) continue diff --git a/pkg/schedule/checker/merge_checker.go b/pkg/schedule/checker/merge_checker.go index d5a39da83ae..bf7fe4f2496 100644 --- a/pkg/schedule/checker/merge_checker.go +++ b/pkg/schedule/checker/merge_checker.go @@ -76,32 +76,32 @@ func (*MergeChecker) GetType() types.CheckerSchedulerType { // RecordRegionSplit put the recently split region into cache. MergeChecker // will skip check it for a while. -func (m *MergeChecker) RecordRegionSplit(regionIDs []uint64) { +func (c *MergeChecker) RecordRegionSplit(regionIDs []uint64) { for _, regionID := range regionIDs { - m.splitCache.PutWithTTL(regionID, nil, m.conf.GetSplitMergeInterval()) + c.splitCache.PutWithTTL(regionID, nil, c.conf.GetSplitMergeInterval()) } } // Check verifies a region's replicas, creating an Operator if need. -func (m *MergeChecker) Check(region *core.RegionInfo) []*operator.Operator { +func (c *MergeChecker) Check(region *core.RegionInfo) []*operator.Operator { mergeCheckerCounter.Inc() - if m.IsPaused() { + if c.IsPaused() { mergeCheckerPausedCounter.Inc() return nil } // update the split cache. // It must be called before the following merge checker logic. 
- m.splitCache.UpdateTTL(m.conf.GetSplitMergeInterval()) + c.splitCache.UpdateTTL(c.conf.GetSplitMergeInterval()) - expireTime := m.startTime.Add(m.conf.GetSplitMergeInterval()) + expireTime := c.startTime.Add(c.conf.GetSplitMergeInterval()) if time.Now().Before(expireTime) { mergeCheckerRecentlyStartCounter.Inc() return nil } - if m.splitCache.Exists(region.GetID()) { + if c.splitCache.Exists(region.GetID()) { mergeCheckerRecentlySplitCounter.Inc() return nil } @@ -113,7 +113,7 @@ func (m *MergeChecker) Check(region *core.RegionInfo) []*operator.Operator { } // region is not small enough - if !region.NeedMerge(int64(m.conf.GetMaxMergeRegionSize()), int64(m.conf.GetMaxMergeRegionKeys())) { + if !region.NeedMerge(int64(c.conf.GetMaxMergeRegionSize()), int64(c.conf.GetMaxMergeRegionKeys())) { mergeCheckerNoNeedCounter.Inc() return nil } @@ -124,24 +124,24 @@ func (m *MergeChecker) Check(region *core.RegionInfo) []*operator.Operator { return nil } - if !filter.IsRegionReplicated(m.cluster, region) { + if !filter.IsRegionReplicated(c.cluster, region) { mergeCheckerAbnormalReplicaCounter.Inc() return nil } // skip hot region - if m.cluster.IsRegionHot(region) { + if c.cluster.IsRegionHot(region) { mergeCheckerHotRegionCounter.Inc() return nil } - prev, next := m.cluster.GetAdjacentRegions(region) + prev, next := c.cluster.GetAdjacentRegions(region) var target *core.RegionInfo - if m.checkTarget(region, next) { + if c.checkTarget(region, next) { target = next } - if !m.conf.IsOneWayMergeEnabled() && m.checkTarget(region, prev) { // allow a region can be merged by two ways. + if !c.conf.IsOneWayMergeEnabled() && c.checkTarget(region, prev) { // allow a region can be merged by two ways. if target == nil || prev.GetApproximateSize() < next.GetApproximateSize() { // pick smaller target = prev } @@ -152,7 +152,7 @@ func (m *MergeChecker) Check(region *core.RegionInfo) []*operator.Operator { return nil } - regionMaxSize := m.cluster.GetStoreConfig().GetRegionMaxSize() + regionMaxSize := c.cluster.GetStoreConfig().GetRegionMaxSize() maxTargetRegionSizeThreshold := int64(float64(regionMaxSize) * float64(maxTargetRegionFactor)) if maxTargetRegionSizeThreshold < maxTargetRegionSize { maxTargetRegionSizeThreshold = maxTargetRegionSize @@ -161,14 +161,14 @@ func (m *MergeChecker) Check(region *core.RegionInfo) []*operator.Operator { mergeCheckerTargetTooLargeCounter.Inc() return nil } - if err := m.cluster.GetStoreConfig().CheckRegionSize(uint64(target.GetApproximateSize()+region.GetApproximateSize()), - m.conf.GetMaxMergeRegionSize()); err != nil { + if err := c.cluster.GetStoreConfig().CheckRegionSize(uint64(target.GetApproximateSize()+region.GetApproximateSize()), + c.conf.GetMaxMergeRegionSize()); err != nil { mergeCheckerSplitSizeAfterMergeCounter.Inc() return nil } - if err := m.cluster.GetStoreConfig().CheckRegionKeys(uint64(target.GetApproximateKeys()+region.GetApproximateKeys()), - m.conf.GetMaxMergeRegionKeys()); err != nil { + if err := c.cluster.GetStoreConfig().CheckRegionKeys(uint64(target.GetApproximateKeys()+region.GetApproximateKeys()), + c.conf.GetMaxMergeRegionKeys()); err != nil { mergeCheckerSplitKeysAfterMergeCounter.Inc() return nil } @@ -176,7 +176,7 @@ func (m *MergeChecker) Check(region *core.RegionInfo) []*operator.Operator { log.Debug("try to merge region", logutil.ZapRedactStringer("from", core.RegionToHexMeta(region.GetMeta())), logutil.ZapRedactStringer("to", core.RegionToHexMeta(target.GetMeta()))) - ops, err := operator.CreateMergeRegionOperator("merge-region", m.cluster, 
region, target, operator.OpMerge) + ops, err := operator.CreateMergeRegionOperator("merge-region", c.cluster, region, target, operator.OpMerge) if err != nil { log.Warn("create merge region operator failed", errs.ZapError(err)) return nil @@ -189,28 +189,28 @@ func (m *MergeChecker) Check(region *core.RegionInfo) []*operator.Operator { return ops } -func (m *MergeChecker) checkTarget(region, adjacent *core.RegionInfo) bool { +func (c *MergeChecker) checkTarget(region, adjacent *core.RegionInfo) bool { if adjacent == nil { mergeCheckerAdjNotExistCounter.Inc() return false } - if m.splitCache.Exists(adjacent.GetID()) { + if c.splitCache.Exists(adjacent.GetID()) { mergeCheckerAdjRecentlySplitCounter.Inc() return false } - if m.cluster.IsRegionHot(adjacent) { + if c.cluster.IsRegionHot(adjacent) { mergeCheckerAdjRegionHotCounter.Inc() return false } - if !AllowMerge(m.cluster, region, adjacent) { + if !AllowMerge(c.cluster, region, adjacent) { mergeCheckerAdjDisallowMergeCounter.Inc() return false } - if !checkPeerStore(m.cluster, region, adjacent) { + if !checkPeerStore(c.cluster, region, adjacent) { mergeCheckerAdjAbnormalPeerStoreCounter.Inc() return false } @@ -220,7 +220,7 @@ func (m *MergeChecker) checkTarget(region, adjacent *core.RegionInfo) bool { return false } - if !filter.IsRegionReplicated(m.cluster, adjacent) { + if !filter.IsRegionReplicated(c.cluster, adjacent) { mergeCheckerAdjAbnormalReplicaCounter.Inc() return false } diff --git a/pkg/schedule/schedulers/balance_leader.go b/pkg/schedule/schedulers/balance_leader.go index e1412c43b23..44605f9c5b8 100644 --- a/pkg/schedule/schedulers/balance_leader.go +++ b/pkg/schedule/schedulers/balance_leader.go @@ -184,8 +184,8 @@ func newBalanceLeaderScheduler(opController *operator.Controller, conf *balanceL } // ServeHTTP implements the http.Handler interface. -func (l *balanceLeaderScheduler) ServeHTTP(w http.ResponseWriter, r *http.Request) { - l.handler.ServeHTTP(w, r) +func (s *balanceLeaderScheduler) ServeHTTP(w http.ResponseWriter, r *http.Request) { + s.handler.ServeHTTP(w, r) } // BalanceLeaderCreateOption is used to create a scheduler with an option. @@ -199,31 +199,31 @@ func WithBalanceLeaderName(name string) BalanceLeaderCreateOption { } // EncodeConfig implements the Scheduler interface. -func (l *balanceLeaderScheduler) EncodeConfig() ([]byte, error) { - l.conf.RLock() - defer l.conf.RUnlock() - return EncodeConfig(l.conf) +func (s *balanceLeaderScheduler) EncodeConfig() ([]byte, error) { + s.conf.RLock() + defer s.conf.RUnlock() + return EncodeConfig(s.conf) } // ReloadConfig implements the Scheduler interface. -func (l *balanceLeaderScheduler) ReloadConfig() error { - l.conf.Lock() - defer l.conf.Unlock() +func (s *balanceLeaderScheduler) ReloadConfig() error { + s.conf.Lock() + defer s.conf.Unlock() newCfg := &balanceLeaderSchedulerConfig{} - if err := l.conf.load(newCfg); err != nil { + if err := s.conf.load(newCfg); err != nil { return err } - l.conf.Ranges = newCfg.Ranges - l.conf.Batch = newCfg.Batch + s.conf.Ranges = newCfg.Ranges + s.conf.Batch = newCfg.Batch return nil } // IsScheduleAllowed implements the Scheduler interface. 
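// The IsScheduleAllowed implementations in this patch share one shape: compare the
// number of in-flight operators of a kind against the configured limit and bump a
// throttle counter when the limit is hit. A minimal sketch of that gate with toy
// types (the counter and limits are stand-ins, not PD's real operator.Controller API):

package main

import "fmt"

type opLimiter struct {
	inFlight  map[string]int // operator kind -> currently running operators
	limits    map[string]int // operator kind -> configured schedule limit
	throttled int            // stands in for the operator-limit counter metric
}

func (s *opLimiter) isScheduleAllowed(kind string) bool {
	allowed := s.inFlight[kind] < s.limits[kind]
	if !allowed {
		s.throttled++
	}
	return allowed
}

func main() {
	s := &opLimiter{
		inFlight: map[string]int{"leader": 4},
		limits:   map[string]int{"leader": 4},
	}
	fmt.Println(s.isScheduleAllowed("leader"), s.throttled) // false 1
}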
-func (l *balanceLeaderScheduler) IsScheduleAllowed(cluster sche.SchedulerCluster) bool { - allowed := l.OpController.OperatorCount(operator.OpLeader) < cluster.GetSchedulerConfig().GetLeaderScheduleLimit() +func (s *balanceLeaderScheduler) IsScheduleAllowed(cluster sche.SchedulerCluster) bool { + allowed := s.OpController.OperatorCount(operator.OpLeader) < cluster.GetSchedulerConfig().GetLeaderScheduleLimit() if !allowed { - operator.IncOperatorLimitCounter(l.GetType(), operator.OpLeader) + operator.IncOperatorLimitCounter(s.GetType(), operator.OpLeader) } return allowed } @@ -321,18 +321,18 @@ func (cs *candidateStores) resortStoreWithPos(pos int) { } // Schedule implements the Scheduler interface. -func (l *balanceLeaderScheduler) Schedule(cluster sche.SchedulerCluster, dryRun bool) ([]*operator.Operator, []plan.Plan) { +func (s *balanceLeaderScheduler) Schedule(cluster sche.SchedulerCluster, dryRun bool) ([]*operator.Operator, []plan.Plan) { basePlan := plan.NewBalanceSchedulerPlan() var collector *plan.Collector if dryRun { collector = plan.NewCollector(basePlan) } - defer l.filterCounter.Flush() - batch := l.conf.getBatch() + defer s.filterCounter.Flush() + batch := s.conf.getBatch() balanceLeaderScheduleCounter.Inc() leaderSchedulePolicy := cluster.GetSchedulerConfig().GetLeaderSchedulePolicy() - opInfluence := l.OpController.GetOpInfluence(cluster.GetBasicCluster()) + opInfluence := s.OpController.GetOpInfluence(cluster.GetBasicCluster()) kind := constant.NewScheduleKind(constant.LeaderKind, leaderSchedulePolicy) solver := newSolver(basePlan, kind, cluster, opInfluence) @@ -340,15 +340,15 @@ func (l *balanceLeaderScheduler) Schedule(cluster sche.SchedulerCluster, dryRun scoreFunc := func(store *core.StoreInfo) float64 { return store.LeaderScore(solver.kind.Policy, solver.getOpInfluence(store.GetID())) } - sourceCandidate := newCandidateStores(filter.SelectSourceStores(stores, l.filters, cluster.GetSchedulerConfig(), collector, l.filterCounter), false, scoreFunc) - targetCandidate := newCandidateStores(filter.SelectTargetStores(stores, l.filters, cluster.GetSchedulerConfig(), nil, l.filterCounter), true, scoreFunc) + sourceCandidate := newCandidateStores(filter.SelectSourceStores(stores, s.filters, cluster.GetSchedulerConfig(), collector, s.filterCounter), false, scoreFunc) + targetCandidate := newCandidateStores(filter.SelectTargetStores(stores, s.filters, cluster.GetSchedulerConfig(), nil, s.filterCounter), true, scoreFunc) usedRegions := make(map[uint64]struct{}) result := make([]*operator.Operator, 0, batch) for sourceCandidate.hasStore() || targetCandidate.hasStore() { // first choose source if sourceCandidate.hasStore() { - op := createTransferLeaderOperator(sourceCandidate, transferOut, l, solver, usedRegions, collector) + op := createTransferLeaderOperator(sourceCandidate, transferOut, s, solver, usedRegions, collector) if op != nil { result = append(result, op) if len(result) >= batch { @@ -359,7 +359,7 @@ func (l *balanceLeaderScheduler) Schedule(cluster sche.SchedulerCluster, dryRun } // next choose target if targetCandidate.hasStore() { - op := createTransferLeaderOperator(targetCandidate, transferIn, l, solver, usedRegions, nil) + op := createTransferLeaderOperator(targetCandidate, transferIn, s, solver, usedRegions, nil) if op != nil { result = append(result, op) if len(result) >= batch { @@ -369,24 +369,24 @@ func (l *balanceLeaderScheduler) Schedule(cluster sche.SchedulerCluster, dryRun } } } - l.retryQuota.gc(append(sourceCandidate.stores, 
targetCandidate.stores...)) + s.retryQuota.gc(append(sourceCandidate.stores, targetCandidate.stores...)) return result, collector.GetPlans() } -func createTransferLeaderOperator(cs *candidateStores, dir string, l *balanceLeaderScheduler, +func createTransferLeaderOperator(cs *candidateStores, dir string, s *balanceLeaderScheduler, ssolver *solver, usedRegions map[uint64]struct{}, collector *plan.Collector) *operator.Operator { store := cs.getStore() ssolver.Step++ defer func() { ssolver.Step-- }() - retryLimit := l.retryQuota.getLimit(store) + retryLimit := s.retryQuota.getLimit(store) var creator func(*solver, *plan.Collector) *operator.Operator switch dir { case transferOut: ssolver.Source, ssolver.Target = store, nil - creator = l.transferLeaderOut + creator = s.transferLeaderOut case transferIn: ssolver.Source, ssolver.Target = nil, store - creator = l.transferLeaderIn + creator = s.transferLeaderIn } var op *operator.Operator for i := 0; i < retryLimit; i++ { @@ -398,10 +398,10 @@ func createTransferLeaderOperator(cs *candidateStores, dir string, l *balanceLea } } if op != nil { - l.retryQuota.resetLimit(store) + s.retryQuota.resetLimit(store) } else { - l.attenuate(store) - log.Debug("no operator created for selected stores", zap.String("scheduler", l.GetName()), zap.Uint64(dir, store.GetID())) + s.attenuate(store) + log.Debug("no operator created for selected stores", zap.String("scheduler", s.GetName()), zap.Uint64(dir, store.GetID())) cs.next() } return op @@ -425,16 +425,16 @@ func makeInfluence(op *operator.Operator, plan *solver, usedRegions map[uint64]s // transferLeaderOut transfers leader from the source store. // It randomly selects a health region from the source store, then picks // the best follower peer and transfers the leader. 
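// transferLeaderOut below ranks the candidate follower stores by leader score and
// takes the first one for which an operator can actually be built. A self-contained
// sketch of that selection step, using toy types and a hypothetical build callback
// in place of PD's real filters and solver:

package main

import (
	"fmt"
	"sort"
)

type store struct {
	id          uint64
	leaderScore float64
}

// pickTarget returns the least-loaded store that the build callback accepts,
// or nil if no candidate works.
func pickTarget(candidates []store, build func(store) bool) *store {
	sort.Slice(candidates, func(i, j int) bool {
		return candidates[i].leaderScore < candidates[j].leaderScore
	})
	for i := range candidates {
		if build(candidates[i]) {
			return &candidates[i]
		}
	}
	return nil
}

func main() {
	stores := []store{{1, 30}, {2, 10}, {3, 20}}
	// pretend store 2 is rejected by the filters
	target := pickTarget(stores, func(s store) bool { return s.id != 2 })
	fmt.Println(target.id) // 3: lowest score among the acceptable candidates
}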
-func (l *balanceLeaderScheduler) transferLeaderOut(solver *solver, collector *plan.Collector) *operator.Operator { - solver.Region = filter.SelectOneRegion(solver.RandLeaderRegions(solver.sourceStoreID(), l.conf.getRanges()), +func (s *balanceLeaderScheduler) transferLeaderOut(solver *solver, collector *plan.Collector) *operator.Operator { + solver.Region = filter.SelectOneRegion(solver.RandLeaderRegions(solver.sourceStoreID(), s.conf.getRanges()), collector, filter.NewRegionPendingFilter(), filter.NewRegionDownFilter()) if solver.Region == nil { - log.Debug("store has no leader", zap.String("scheduler", l.GetName()), zap.Uint64("store-id", solver.sourceStoreID())) + log.Debug("store has no leader", zap.String("scheduler", s.GetName()), zap.Uint64("store-id", solver.sourceStoreID())) balanceLeaderNoLeaderRegionCounter.Inc() return nil } if solver.IsRegionHot(solver.Region) { - log.Debug("region is hot region, ignore it", zap.String("scheduler", l.GetName()), zap.Uint64("region-id", solver.Region.GetID())) + log.Debug("region is hot region, ignore it", zap.String("scheduler", s.GetName()), zap.Uint64("region-id", solver.Region.GetID())) if collector != nil { collector.Collect(plan.SetResource(solver.Region), plan.SetStatus(plan.NewStatus(plan.StatusRegionHot))) } @@ -444,12 +444,12 @@ func (l *balanceLeaderScheduler) transferLeaderOut(solver *solver, collector *pl solver.Step++ defer func() { solver.Step-- }() targets := solver.GetFollowerStores(solver.Region) - finalFilters := l.filters + finalFilters := s.filters conf := solver.GetSchedulerConfig() - if leaderFilter := filter.NewPlacementLeaderSafeguard(l.GetName(), conf, solver.GetBasicCluster(), solver.GetRuleManager(), solver.Region, solver.Source, false /*allowMoveLeader*/); leaderFilter != nil { - finalFilters = append(l.filters, leaderFilter) + if leaderFilter := filter.NewPlacementLeaderSafeguard(s.GetName(), conf, solver.GetBasicCluster(), solver.GetRuleManager(), solver.Region, solver.Source, false /*allowMoveLeader*/); leaderFilter != nil { + finalFilters = append(s.filters, leaderFilter) } - targets = filter.SelectTargetStores(targets, finalFilters, conf, collector, l.filterCounter) + targets = filter.SelectTargetStores(targets, finalFilters, conf, collector, s.filterCounter) leaderSchedulePolicy := conf.GetLeaderSchedulePolicy() sort.Slice(targets, func(i, j int) bool { iOp := solver.getOpInfluence(targets[i].GetID()) @@ -457,11 +457,11 @@ func (l *balanceLeaderScheduler) transferLeaderOut(solver *solver, collector *pl return targets[i].LeaderScore(leaderSchedulePolicy, iOp) < targets[j].LeaderScore(leaderSchedulePolicy, jOp) }) for _, solver.Target = range targets { - if op := l.createOperator(solver, collector); op != nil { + if op := s.createOperator(solver, collector); op != nil { return op } } - log.Debug("region has no target store", zap.String("scheduler", l.GetName()), zap.Uint64("region-id", solver.Region.GetID())) + log.Debug("region has no target store", zap.String("scheduler", s.GetName()), zap.Uint64("region-id", solver.Region.GetID())) balanceLeaderNoTargetStoreCounter.Inc() return nil } @@ -469,16 +469,16 @@ func (l *balanceLeaderScheduler) transferLeaderOut(solver *solver, collector *pl // transferLeaderIn transfers leader to the target store. // It randomly selects a health region from the target store, then picks // the worst follower peer and transfers the leader. 
-func (l *balanceLeaderScheduler) transferLeaderIn(solver *solver, collector *plan.Collector) *operator.Operator { - solver.Region = filter.SelectOneRegion(solver.RandFollowerRegions(solver.targetStoreID(), l.conf.getRanges()), +func (s *balanceLeaderScheduler) transferLeaderIn(solver *solver, collector *plan.Collector) *operator.Operator { + solver.Region = filter.SelectOneRegion(solver.RandFollowerRegions(solver.targetStoreID(), s.conf.getRanges()), nil, filter.NewRegionPendingFilter(), filter.NewRegionDownFilter()) if solver.Region == nil { - log.Debug("store has no follower", zap.String("scheduler", l.GetName()), zap.Uint64("store-id", solver.targetStoreID())) + log.Debug("store has no follower", zap.String("scheduler", s.GetName()), zap.Uint64("store-id", solver.targetStoreID())) balanceLeaderNoFollowerRegionCounter.Inc() return nil } if solver.IsRegionHot(solver.Region) { - log.Debug("region is hot region, ignore it", zap.String("scheduler", l.GetName()), zap.Uint64("region-id", solver.Region.GetID())) + log.Debug("region is hot region, ignore it", zap.String("scheduler", s.GetName()), zap.Uint64("region-id", solver.Region.GetID())) balanceLeaderRegionHotCounter.Inc() return nil } @@ -486,38 +486,38 @@ func (l *balanceLeaderScheduler) transferLeaderIn(solver *solver, collector *pla solver.Source = solver.GetStore(leaderStoreID) if solver.Source == nil { log.Debug("region has no leader or leader store cannot be found", - zap.String("scheduler", l.GetName()), + zap.String("scheduler", s.GetName()), zap.Uint64("region-id", solver.Region.GetID()), zap.Uint64("store-id", leaderStoreID), ) balanceLeaderNoLeaderRegionCounter.Inc() return nil } - finalFilters := l.filters + finalFilters := s.filters conf := solver.GetSchedulerConfig() - if leaderFilter := filter.NewPlacementLeaderSafeguard(l.GetName(), conf, solver.GetBasicCluster(), solver.GetRuleManager(), solver.Region, solver.Source, false /*allowMoveLeader*/); leaderFilter != nil { - finalFilters = append(l.filters, leaderFilter) + if leaderFilter := filter.NewPlacementLeaderSafeguard(s.GetName(), conf, solver.GetBasicCluster(), solver.GetRuleManager(), solver.Region, solver.Source, false /*allowMoveLeader*/); leaderFilter != nil { + finalFilters = append(s.filters, leaderFilter) } - target := filter.NewCandidates(l.R, []*core.StoreInfo{solver.Target}). - FilterTarget(conf, nil, l.filterCounter, finalFilters...). + target := filter.NewCandidates(s.R, []*core.StoreInfo{solver.Target}). + FilterTarget(conf, nil, s.filterCounter, finalFilters...). PickFirst() if target == nil { - log.Debug("region has no target store", zap.String("scheduler", l.GetName()), zap.Uint64("region-id", solver.Region.GetID())) + log.Debug("region has no target store", zap.String("scheduler", s.GetName()), zap.Uint64("region-id", solver.Region.GetID())) balanceLeaderNoTargetStoreCounter.Inc() return nil } - return l.createOperator(solver, collector) + return s.createOperator(solver, collector) } // createOperator creates the operator according to the source and target store. // If the region is hot or the difference between the two stores is tolerable, then // no new operator need to be created, otherwise create an operator that transfers // the leader from the source store to the target store for the region. 
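// createOperator below only acts when the gap between the source and target scores
// is worth closing; solver.shouldBalance makes that call and is not part of this
// patch, so the following sketch assumes the simplest form of the rule: the score
// difference must exceed a tolerance before any operator is produced.

package main

import "fmt"

// shouldTransfer reports whether moving one leader is worthwhile given the current
// scores and a tolerance derived from the scheduler configuration.
func shouldTransfer(sourceScore, targetScore, tolerance float64) bool {
	return sourceScore-targetScore > tolerance
}

func main() {
	fmt.Println(shouldTransfer(105, 100, 10)) // false: difference is tolerable, skip
	fmt.Println(shouldTransfer(120, 100, 10)) // true: create a transfer-leader operator
}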
-func (l *balanceLeaderScheduler) createOperator(solver *solver, collector *plan.Collector) *operator.Operator { +func (s *balanceLeaderScheduler) createOperator(solver *solver, collector *plan.Collector) *operator.Operator { solver.Step++ defer func() { solver.Step-- }() - solver.sourceScore, solver.targetScore = solver.sourceStoreScore(l.GetName()), solver.targetStoreScore(l.GetName()) - if !solver.shouldBalance(l.GetName()) { + solver.sourceScore, solver.targetScore = solver.sourceStoreScore(s.GetName()), solver.targetStoreScore(s.GetName()) + if !solver.shouldBalance(s.GetName()) { balanceLeaderSkipCounter.Inc() if collector != nil { collector.Collect(plan.SetStatus(plan.NewStatus(plan.StatusStoreScoreDisallowed))) @@ -526,7 +526,7 @@ func (l *balanceLeaderScheduler) createOperator(solver *solver, collector *plan. } solver.Step++ defer func() { solver.Step-- }() - op, err := operator.CreateTransferLeaderOperator(l.GetName(), solver, solver.Region, solver.targetStoreID(), []uint64{}, operator.OpLeader) + op, err := operator.CreateTransferLeaderOperator(s.GetName(), solver, solver.Region, solver.targetStoreID(), []uint64{}, operator.OpLeader) if err != nil { log.Debug("fail to create balance leader operator", errs.ZapError(err)) if collector != nil { @@ -538,7 +538,7 @@ func (l *balanceLeaderScheduler) createOperator(solver *solver, collector *plan. balanceLeaderNewOpCounter, ) op.FinishedCounters = append(op.FinishedCounters, - balanceDirectionCounter.WithLabelValues(l.GetName(), solver.sourceMetricLabel(), solver.targetMetricLabel()), + balanceDirectionCounter.WithLabelValues(s.GetName(), solver.sourceMetricLabel(), solver.targetMetricLabel()), ) op.SetAdditionalInfo("sourceScore", strconv.FormatFloat(solver.sourceScore, 'f', 2, 64)) op.SetAdditionalInfo("targetScore", strconv.FormatFloat(solver.targetScore, 'f', 2, 64)) diff --git a/pkg/schedule/schedulers/balance_witness.go b/pkg/schedule/schedulers/balance_witness.go index 6953c7f7634..1fedb2769ee 100644 --- a/pkg/schedule/schedulers/balance_witness.go +++ b/pkg/schedule/schedulers/balance_witness.go @@ -180,8 +180,8 @@ func newBalanceWitnessScheduler(opController *operator.Controller, conf *balance } // ServeHTTP implements the http.Handler interface. -func (b *balanceWitnessScheduler) ServeHTTP(w http.ResponseWriter, r *http.Request) { - b.handler.ServeHTTP(w, r) +func (s *balanceWitnessScheduler) ServeHTTP(w http.ResponseWriter, r *http.Request) { + s.handler.ServeHTTP(w, r) } // BalanceWitnessCreateOption is used to create a scheduler with an option. @@ -195,46 +195,46 @@ func WithBalanceWitnessCounter(counter *prometheus.CounterVec) BalanceWitnessCre } // EncodeConfig implements the Scheduler interface. -func (b *balanceWitnessScheduler) EncodeConfig() ([]byte, error) { - b.conf.RLock() - defer b.conf.RUnlock() - return EncodeConfig(b.conf) +func (s *balanceWitnessScheduler) EncodeConfig() ([]byte, error) { + s.conf.RLock() + defer s.conf.RUnlock() + return EncodeConfig(s.conf) } // ReloadConfig implements the Scheduler interface. 
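// ReloadConfig is implemented the same way for every scheduler touched here: take
// the write lock, decode the persisted configuration into a fresh struct, then copy
// the reloadable fields onto the live config. A minimal sketch of that pattern with
// stand-in types (JSON decoding used here purely as a stand-in for PD's storage codec):

package main

import (
	"encoding/json"
	"fmt"
	"sync"
)

type witnessConfig struct {
	sync.RWMutex
	Ranges []string `json:"ranges"`
	Batch  int      `json:"batch"`
}

// reload mirrors the shape of ReloadConfig: the persisted bytes win for the fields
// they carry, everything else on the live config is left untouched.
func (c *witnessConfig) reload(persisted []byte) error {
	c.Lock()
	defer c.Unlock()
	newCfg := &witnessConfig{}
	if err := json.Unmarshal(persisted, newCfg); err != nil {
		return err
	}
	c.Ranges = newCfg.Ranges
	c.Batch = newCfg.Batch
	return nil
}

func main() {
	cfg := &witnessConfig{Batch: 4}
	_ = cfg.reload([]byte(`{"ranges":["a","b"],"batch":8}`))
	fmt.Println(cfg.Ranges, cfg.Batch) // [a b] 8
}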
-func (b *balanceWitnessScheduler) ReloadConfig() error { - b.conf.Lock() - defer b.conf.Unlock() +func (s *balanceWitnessScheduler) ReloadConfig() error { + s.conf.Lock() + defer s.conf.Unlock() newCfg := &balanceWitnessSchedulerConfig{} - if err := b.conf.load(newCfg); err != nil { + if err := s.conf.load(newCfg); err != nil { return err } - b.conf.Ranges = newCfg.Ranges - b.conf.Batch = newCfg.Batch + s.conf.Ranges = newCfg.Ranges + s.conf.Batch = newCfg.Batch return nil } // IsScheduleAllowed implements the Scheduler interface. -func (b *balanceWitnessScheduler) IsScheduleAllowed(cluster sche.SchedulerCluster) bool { - allowed := b.OpController.OperatorCount(operator.OpWitness) < cluster.GetSchedulerConfig().GetWitnessScheduleLimit() +func (s *balanceWitnessScheduler) IsScheduleAllowed(cluster sche.SchedulerCluster) bool { + allowed := s.OpController.OperatorCount(operator.OpWitness) < cluster.GetSchedulerConfig().GetWitnessScheduleLimit() if !allowed { - operator.IncOperatorLimitCounter(b.GetType(), operator.OpWitness) + operator.IncOperatorLimitCounter(s.GetType(), operator.OpWitness) } return allowed } // Schedule implements the Scheduler interface. -func (b *balanceWitnessScheduler) Schedule(cluster sche.SchedulerCluster, dryRun bool) ([]*operator.Operator, []plan.Plan) { +func (s *balanceWitnessScheduler) Schedule(cluster sche.SchedulerCluster, dryRun bool) ([]*operator.Operator, []plan.Plan) { basePlan := plan.NewBalanceSchedulerPlan() var collector *plan.Collector if dryRun { collector = plan.NewCollector(basePlan) } - batch := b.conf.getBatch() - schedulerCounter.WithLabelValues(b.GetName(), "schedule").Inc() + batch := s.conf.getBatch() + schedulerCounter.WithLabelValues(s.GetName(), "schedule").Inc() - opInfluence := b.OpController.GetOpInfluence(cluster.GetBasicCluster()) + opInfluence := s.OpController.GetOpInfluence(cluster.GetBasicCluster()) kind := constant.NewScheduleKind(constant.WitnessKind, constant.ByCount) solver := newSolver(basePlan, kind, cluster, opInfluence) @@ -242,12 +242,12 @@ func (b *balanceWitnessScheduler) Schedule(cluster sche.SchedulerCluster, dryRun scoreFunc := func(store *core.StoreInfo) float64 { return store.WitnessScore(solver.getOpInfluence(store.GetID())) } - sourceCandidate := newCandidateStores(filter.SelectSourceStores(stores, b.filters, cluster.GetSchedulerConfig(), collector, b.filterCounter), false, scoreFunc) + sourceCandidate := newCandidateStores(filter.SelectSourceStores(stores, s.filters, cluster.GetSchedulerConfig(), collector, s.filterCounter), false, scoreFunc) usedRegions := make(map[uint64]struct{}) result := make([]*operator.Operator, 0, batch) if sourceCandidate.hasStore() { - op := createTransferWitnessOperator(sourceCandidate, b, solver, usedRegions, collector) + op := createTransferWitnessOperator(sourceCandidate, s, solver, usedRegions, collector) if op != nil { result = append(result, op) if len(result) >= batch { @@ -256,21 +256,21 @@ func (b *balanceWitnessScheduler) Schedule(cluster sche.SchedulerCluster, dryRun makeInfluence(op, solver, usedRegions, sourceCandidate) } } - b.retryQuota.gc(sourceCandidate.stores) + s.retryQuota.gc(sourceCandidate.stores) return result, collector.GetPlans() } -func createTransferWitnessOperator(cs *candidateStores, b *balanceWitnessScheduler, +func createTransferWitnessOperator(cs *candidateStores, s *balanceWitnessScheduler, ssolver *solver, usedRegions map[uint64]struct{}, collector *plan.Collector) *operator.Operator { store := cs.getStore() ssolver.Step++ defer func() { 
ssolver.Step-- }() - retryLimit := b.retryQuota.getLimit(store) + retryLimit := s.retryQuota.getLimit(store) ssolver.Source, ssolver.Target = store, nil var op *operator.Operator for i := 0; i < retryLimit; i++ { - schedulerCounter.WithLabelValues(b.GetName(), "total").Inc() - if op = b.transferWitnessOut(ssolver, collector); op != nil { + schedulerCounter.WithLabelValues(s.GetName(), "total").Inc() + if op = s.transferWitnessOut(ssolver, collector); op != nil { if _, ok := usedRegions[op.RegionID()]; !ok { break } @@ -278,10 +278,10 @@ func createTransferWitnessOperator(cs *candidateStores, b *balanceWitnessSchedul } } if op != nil { - b.retryQuota.resetLimit(store) + s.retryQuota.resetLimit(store) } else { - b.attenuate(store) - log.Debug("no operator created for selected stores", zap.String("scheduler", b.GetName()), zap.Uint64("transfer-out", store.GetID())) + s.attenuate(store) + log.Debug("no operator created for selected stores", zap.String("scheduler", s.GetName()), zap.Uint64("transfer-out", store.GetID())) cs.next() } return op @@ -290,35 +290,35 @@ func createTransferWitnessOperator(cs *candidateStores, b *balanceWitnessSchedul // transferWitnessOut transfers witness from the source store. // It randomly selects a health region from the source store, then picks // the best follower peer and transfers the witness. -func (b *balanceWitnessScheduler) transferWitnessOut(solver *solver, collector *plan.Collector) *operator.Operator { - solver.Region = filter.SelectOneRegion(solver.RandWitnessRegions(solver.sourceStoreID(), b.conf.getRanges()), +func (s *balanceWitnessScheduler) transferWitnessOut(solver *solver, collector *plan.Collector) *operator.Operator { + solver.Region = filter.SelectOneRegion(solver.RandWitnessRegions(solver.sourceStoreID(), s.conf.getRanges()), collector, filter.NewRegionPendingFilter(), filter.NewRegionDownFilter()) if solver.Region == nil { - log.Debug("store has no witness", zap.String("scheduler", b.GetName()), zap.Uint64("store-id", solver.sourceStoreID())) - schedulerCounter.WithLabelValues(b.GetName(), "no-witness-region").Inc() + log.Debug("store has no witness", zap.String("scheduler", s.GetName()), zap.Uint64("store-id", solver.sourceStoreID())) + schedulerCounter.WithLabelValues(s.GetName(), "no-witness-region").Inc() return nil } solver.Step++ defer func() { solver.Step-- }() targets := solver.GetNonWitnessVoterStores(solver.Region) - finalFilters := b.filters + finalFilters := s.filters conf := solver.GetSchedulerConfig() - if witnessFilter := filter.NewPlacementWitnessSafeguard(b.GetName(), conf, solver.GetBasicCluster(), solver.GetRuleManager(), solver.Region, solver.Source, solver.fit); witnessFilter != nil { - finalFilters = append(b.filters, witnessFilter) + if witnessFilter := filter.NewPlacementWitnessSafeguard(s.GetName(), conf, solver.GetBasicCluster(), solver.GetRuleManager(), solver.Region, solver.Source, solver.fit); witnessFilter != nil { + finalFilters = append(s.filters, witnessFilter) } - targets = filter.SelectTargetStores(targets, finalFilters, conf, collector, b.filterCounter) + targets = filter.SelectTargetStores(targets, finalFilters, conf, collector, s.filterCounter) sort.Slice(targets, func(i, j int) bool { iOp := solver.getOpInfluence(targets[i].GetID()) jOp := solver.getOpInfluence(targets[j].GetID()) return targets[i].WitnessScore(iOp) < targets[j].WitnessScore(jOp) }) for _, solver.Target = range targets { - if op := b.createOperator(solver, collector); op != nil { + if op := s.createOperator(solver, collector); op != 
nil { return op } } - log.Debug("region has no target store", zap.String("scheduler", b.GetName()), zap.Uint64("region-id", solver.Region.GetID())) - schedulerCounter.WithLabelValues(b.GetName(), "no-target-store").Inc() + log.Debug("region has no target store", zap.String("scheduler", s.GetName()), zap.Uint64("region-id", solver.Region.GetID())) + schedulerCounter.WithLabelValues(s.GetName(), "no-target-store").Inc() return nil } @@ -326,12 +326,12 @@ func (b *balanceWitnessScheduler) transferWitnessOut(solver *solver, collector * // If the region is hot or the difference between the two stores is tolerable, then // no new operator need to be created, otherwise create an operator that transfers // the witness from the source store to the target store for the region. -func (b *balanceWitnessScheduler) createOperator(solver *solver, collector *plan.Collector) *operator.Operator { +func (s *balanceWitnessScheduler) createOperator(solver *solver, collector *plan.Collector) *operator.Operator { solver.Step++ defer func() { solver.Step-- }() - solver.sourceScore, solver.targetScore = solver.sourceStoreScore(b.GetName()), solver.targetStoreScore(b.GetName()) - if !solver.shouldBalance(b.GetName()) { - schedulerCounter.WithLabelValues(b.GetName(), "skip").Inc() + solver.sourceScore, solver.targetScore = solver.sourceStoreScore(s.GetName()), solver.targetStoreScore(s.GetName()) + if !solver.shouldBalance(s.GetName()) { + schedulerCounter.WithLabelValues(s.GetName(), "skip").Inc() if collector != nil { collector.Collect(plan.SetStatus(plan.NewStatus(plan.StatusStoreScoreDisallowed))) } @@ -339,18 +339,18 @@ func (b *balanceWitnessScheduler) createOperator(solver *solver, collector *plan } solver.Step++ defer func() { solver.Step-- }() - op, err := operator.CreateMoveWitnessOperator(b.GetName(), solver, solver.Region, solver.sourceStoreID(), solver.targetStoreID()) + op, err := operator.CreateMoveWitnessOperator(s.GetName(), solver, solver.Region, solver.sourceStoreID(), solver.targetStoreID()) if err != nil { log.Debug("fail to create balance witness operator", errs.ZapError(err)) return nil } op.Counters = append(op.Counters, - schedulerCounter.WithLabelValues(b.GetName(), "new-operator"), + schedulerCounter.WithLabelValues(s.GetName(), "new-operator"), ) op.FinishedCounters = append(op.FinishedCounters, - balanceDirectionCounter.WithLabelValues(b.GetName(), solver.sourceMetricLabel(), solver.targetMetricLabel()), - b.counter.WithLabelValues("move-witness", solver.sourceMetricLabel()+"-out"), - b.counter.WithLabelValues("move-witness", solver.targetMetricLabel()+"-in"), + balanceDirectionCounter.WithLabelValues(s.GetName(), solver.sourceMetricLabel(), solver.targetMetricLabel()), + s.counter.WithLabelValues("move-witness", solver.sourceMetricLabel()+"-out"), + s.counter.WithLabelValues("move-witness", solver.targetMetricLabel()+"-in"), ) op.SetAdditionalInfo("sourceScore", strconv.FormatFloat(solver.sourceScore, 'f', 2, 64)) op.SetAdditionalInfo("targetScore", strconv.FormatFloat(solver.targetScore, 'f', 2, 64)) diff --git a/pkg/schedule/schedulers/hot_region.go b/pkg/schedule/schedulers/hot_region.go index e9e369b68d4..eedbcfe4625 100644 --- a/pkg/schedule/schedulers/hot_region.go +++ b/pkg/schedule/schedulers/hot_region.go @@ -109,18 +109,18 @@ func newBaseHotScheduler( // prepareForBalance calculate the summary of pending Influence for each store and prepare the load detail for // each store, only update read or write load detail -func (h *baseHotScheduler) prepareForBalance(typ resourceType, 
cluster sche.SchedulerCluster) { +func (s *baseHotScheduler) prepareForBalance(typ resourceType, cluster sche.SchedulerCluster) { storeInfos := statistics.SummaryStoreInfos(cluster.GetStores()) - h.summaryPendingInfluence(storeInfos) + s.summaryPendingInfluence(storeInfos) storesLoads := cluster.GetStoresLoads() isTraceRegionFlow := cluster.GetSchedulerConfig().IsTraceRegionFlow() prepare := func(regionStats map[uint64][]*statistics.HotPeerStat, rw utils.RWType, resource constant.ResourceKind) { ty := buildResourceType(rw, resource) - h.stLoadInfos[ty] = statistics.SummaryStoresLoad( + s.stLoadInfos[ty] = statistics.SummaryStoresLoad( storeInfos, storesLoads, - h.stHistoryLoads, + s.stHistoryLoads, regionStats, isTraceRegionFlow, rw, resource) @@ -129,35 +129,35 @@ func (h *baseHotScheduler) prepareForBalance(typ resourceType, cluster sche.Sche case readLeader, readPeer: // update read statistics // avoid to update read statistics frequently - if time.Since(h.updateReadTime) >= statisticsInterval { + if time.Since(s.updateReadTime) >= statisticsInterval { regionRead := cluster.RegionReadStats() prepare(regionRead, utils.Read, constant.LeaderKind) prepare(regionRead, utils.Read, constant.RegionKind) - h.updateReadTime = time.Now() + s.updateReadTime = time.Now() } case writeLeader, writePeer: // update write statistics // avoid to update write statistics frequently - if time.Since(h.updateWriteTime) >= statisticsInterval { + if time.Since(s.updateWriteTime) >= statisticsInterval { regionWrite := cluster.RegionWriteStats() prepare(regionWrite, utils.Write, constant.LeaderKind) prepare(regionWrite, utils.Write, constant.RegionKind) - h.updateWriteTime = time.Now() + s.updateWriteTime = time.Now() } default: log.Error("invalid resource type", zap.String("type", typ.String())) } } -func (h *baseHotScheduler) updateHistoryLoadConfig(sampleDuration, sampleInterval time.Duration) { - h.stHistoryLoads = h.stHistoryLoads.UpdateConfig(sampleDuration, sampleInterval) +func (s *baseHotScheduler) updateHistoryLoadConfig(sampleDuration, sampleInterval time.Duration) { + s.stHistoryLoads = s.stHistoryLoads.UpdateConfig(sampleDuration, sampleInterval) } // summaryPendingInfluence calculate the summary of pending Influence for each store // and clean the region from regionInfluence if they have ended operator. // It makes each dim rate or count become `weight` times to the origin value. -func (h *baseHotScheduler) summaryPendingInfluence(storeInfos map[uint64]*statistics.StoreSummaryInfo) { - for id, p := range h.regionPendings { +func (s *baseHotScheduler) summaryPendingInfluence(storeInfos map[uint64]*statistics.StoreSummaryInfo) { + for id, p := range s.regionPendings { for _, from := range p.froms { from := storeInfos[from] to := storeInfos[p.to] @@ -165,7 +165,7 @@ func (h *baseHotScheduler) summaryPendingInfluence(storeInfos map[uint64]*statis weight, needGC := calcPendingInfluence(p.op, maxZombieDur) if needGC { - delete(h.regionPendings, id) + delete(s.regionPendings, id) continue } @@ -188,8 +188,8 @@ func (h *baseHotScheduler) summaryPendingInfluence(storeInfos map[uint64]*statis } } -func (h *baseHotScheduler) randomType() resourceType { - return h.types[h.r.Int()%len(h.types)] +func (s *baseHotScheduler) randomType() resourceType { + return s.types[s.r.Int()%len(s.types)] } type hotScheduler struct { @@ -214,48 +214,48 @@ func newHotScheduler(opController *operator.Controller, conf *hotRegionScheduler } // EncodeConfig implements the Scheduler interface. 
-func (h *hotScheduler) EncodeConfig() ([]byte, error) { - return h.conf.encodeConfig() +func (s *hotScheduler) EncodeConfig() ([]byte, error) { + return s.conf.encodeConfig() } // ReloadConfig impl -func (h *hotScheduler) ReloadConfig() error { - h.conf.Lock() - defer h.conf.Unlock() +func (s *hotScheduler) ReloadConfig() error { + s.conf.Lock() + defer s.conf.Unlock() newCfg := &hotRegionSchedulerConfig{} - if err := h.conf.load(newCfg); err != nil { + if err := s.conf.load(newCfg); err != nil { return err } - h.conf.MinHotByteRate = newCfg.MinHotByteRate - h.conf.MinHotKeyRate = newCfg.MinHotKeyRate - h.conf.MinHotQueryRate = newCfg.MinHotQueryRate - h.conf.MaxZombieRounds = newCfg.MaxZombieRounds - h.conf.MaxPeerNum = newCfg.MaxPeerNum - h.conf.ByteRateRankStepRatio = newCfg.ByteRateRankStepRatio - h.conf.KeyRateRankStepRatio = newCfg.KeyRateRankStepRatio - h.conf.QueryRateRankStepRatio = newCfg.QueryRateRankStepRatio - h.conf.CountRankStepRatio = newCfg.CountRankStepRatio - h.conf.GreatDecRatio = newCfg.GreatDecRatio - h.conf.MinorDecRatio = newCfg.MinorDecRatio - h.conf.SrcToleranceRatio = newCfg.SrcToleranceRatio - h.conf.DstToleranceRatio = newCfg.DstToleranceRatio - h.conf.WriteLeaderPriorities = newCfg.WriteLeaderPriorities - h.conf.WritePeerPriorities = newCfg.WritePeerPriorities - h.conf.ReadPriorities = newCfg.ReadPriorities - h.conf.StrictPickingStore = newCfg.StrictPickingStore - h.conf.EnableForTiFlash = newCfg.EnableForTiFlash - h.conf.RankFormulaVersion = newCfg.RankFormulaVersion - h.conf.ForbidRWType = newCfg.ForbidRWType - h.conf.SplitThresholds = newCfg.SplitThresholds - h.conf.HistorySampleDuration = newCfg.HistorySampleDuration - h.conf.HistorySampleInterval = newCfg.HistorySampleInterval + s.conf.MinHotByteRate = newCfg.MinHotByteRate + s.conf.MinHotKeyRate = newCfg.MinHotKeyRate + s.conf.MinHotQueryRate = newCfg.MinHotQueryRate + s.conf.MaxZombieRounds = newCfg.MaxZombieRounds + s.conf.MaxPeerNum = newCfg.MaxPeerNum + s.conf.ByteRateRankStepRatio = newCfg.ByteRateRankStepRatio + s.conf.KeyRateRankStepRatio = newCfg.KeyRateRankStepRatio + s.conf.QueryRateRankStepRatio = newCfg.QueryRateRankStepRatio + s.conf.CountRankStepRatio = newCfg.CountRankStepRatio + s.conf.GreatDecRatio = newCfg.GreatDecRatio + s.conf.MinorDecRatio = newCfg.MinorDecRatio + s.conf.SrcToleranceRatio = newCfg.SrcToleranceRatio + s.conf.DstToleranceRatio = newCfg.DstToleranceRatio + s.conf.WriteLeaderPriorities = newCfg.WriteLeaderPriorities + s.conf.WritePeerPriorities = newCfg.WritePeerPriorities + s.conf.ReadPriorities = newCfg.ReadPriorities + s.conf.StrictPickingStore = newCfg.StrictPickingStore + s.conf.EnableForTiFlash = newCfg.EnableForTiFlash + s.conf.RankFormulaVersion = newCfg.RankFormulaVersion + s.conf.ForbidRWType = newCfg.ForbidRWType + s.conf.SplitThresholds = newCfg.SplitThresholds + s.conf.HistorySampleDuration = newCfg.HistorySampleDuration + s.conf.HistorySampleInterval = newCfg.HistorySampleInterval return nil } // ServeHTTP implements the http.Handler interface. -func (h *hotScheduler) ServeHTTP(w http.ResponseWriter, r *http.Request) { - h.conf.ServeHTTP(w, r) +func (s *hotScheduler) ServeHTTP(w http.ResponseWriter, r *http.Request) { + s.conf.ServeHTTP(w, r) } // GetMinInterval implements the Scheduler interface. @@ -264,73 +264,73 @@ func (*hotScheduler) GetMinInterval() time.Duration { } // GetNextInterval implements the Scheduler interface. 
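// GetNextInterval below grows the schedule interval via intervalGrow with an
// exponential growth policy; that helper is not shown in this patch, so the sketch
// below assumes the simplest form of the idea: multiply the interval by a constant
// factor each round and clamp it at a maximum.

package main

import (
	"fmt"
	"time"
)

// growInterval is a hypothetical stand-in for intervalGrow(cur, max, exponentialGrowth).
func growInterval(cur, maxInterval time.Duration, factor float64) time.Duration {
	next := time.Duration(float64(cur) * factor)
	if next > maxInterval {
		return maxInterval
	}
	return next
}

func main() {
	interval := 10 * time.Second
	for i := 0; i < 5; i++ {
		interval = growInterval(interval, time.Minute, 1.5)
		fmt.Println(interval) // 15s, 22.5s, 33.75s, 50.625s, 1m0s
	}
}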
-func (h *hotScheduler) GetNextInterval(time.Duration) time.Duration { - return intervalGrow(h.GetMinInterval(), maxHotScheduleInterval, exponentialGrowth) +func (s *hotScheduler) GetNextInterval(time.Duration) time.Duration { + return intervalGrow(s.GetMinInterval(), maxHotScheduleInterval, exponentialGrowth) } // IsScheduleAllowed implements the Scheduler interface. -func (h *hotScheduler) IsScheduleAllowed(cluster sche.SchedulerCluster) bool { - allowed := h.OpController.OperatorCount(operator.OpHotRegion) < cluster.GetSchedulerConfig().GetHotRegionScheduleLimit() +func (s *hotScheduler) IsScheduleAllowed(cluster sche.SchedulerCluster) bool { + allowed := s.OpController.OperatorCount(operator.OpHotRegion) < cluster.GetSchedulerConfig().GetHotRegionScheduleLimit() if !allowed { - operator.IncOperatorLimitCounter(h.GetType(), operator.OpHotRegion) + operator.IncOperatorLimitCounter(s.GetType(), operator.OpHotRegion) } return allowed } // Schedule implements the Scheduler interface. -func (h *hotScheduler) Schedule(cluster sche.SchedulerCluster, _ bool) ([]*operator.Operator, []plan.Plan) { +func (s *hotScheduler) Schedule(cluster sche.SchedulerCluster, _ bool) ([]*operator.Operator, []plan.Plan) { hotSchedulerCounter.Inc() - typ := h.randomType() - return h.dispatch(typ, cluster), nil + typ := s.randomType() + return s.dispatch(typ, cluster), nil } -func (h *hotScheduler) dispatch(typ resourceType, cluster sche.SchedulerCluster) []*operator.Operator { - h.Lock() - defer h.Unlock() - h.updateHistoryLoadConfig(h.conf.getHistorySampleDuration(), h.conf.getHistorySampleInterval()) - h.prepareForBalance(typ, cluster) +func (s *hotScheduler) dispatch(typ resourceType, cluster sche.SchedulerCluster) []*operator.Operator { + s.Lock() + defer s.Unlock() + s.updateHistoryLoadConfig(s.conf.getHistorySampleDuration(), s.conf.getHistorySampleInterval()) + s.prepareForBalance(typ, cluster) // isForbidRWType can not be move earlier to support to use api and metrics. 
switch typ { case readLeader, readPeer: - if h.conf.isForbidRWType(utils.Read) { + if s.conf.isForbidRWType(utils.Read) { return nil } - return h.balanceHotReadRegions(cluster) + return s.balanceHotReadRegions(cluster) case writePeer: - if h.conf.isForbidRWType(utils.Write) { + if s.conf.isForbidRWType(utils.Write) { return nil } - return h.balanceHotWritePeers(cluster) + return s.balanceHotWritePeers(cluster) case writeLeader: - if h.conf.isForbidRWType(utils.Write) { + if s.conf.isForbidRWType(utils.Write) { return nil } - return h.balanceHotWriteLeaders(cluster) + return s.balanceHotWriteLeaders(cluster) } return nil } -func (h *hotScheduler) tryAddPendingInfluence(op *operator.Operator, srcStore []uint64, dstStore uint64, infl statistics.Influence, maxZombieDur time.Duration) bool { +func (s *hotScheduler) tryAddPendingInfluence(op *operator.Operator, srcStore []uint64, dstStore uint64, infl statistics.Influence, maxZombieDur time.Duration) bool { regionID := op.RegionID() - _, ok := h.regionPendings[regionID] + _, ok := s.regionPendings[regionID] if ok { pendingOpFailsStoreCounter.Inc() return false } influence := newPendingInfluence(op, srcStore, dstStore, infl, maxZombieDur) - h.regionPendings[regionID] = influence + s.regionPendings[regionID] = influence utils.ForeachRegionStats(func(rwTy utils.RWType, dim int, kind utils.RegionStatKind) { - hotPeerHist.WithLabelValues(h.GetName(), rwTy.String(), utils.DimToString(dim)).Observe(infl.Loads[kind]) + hotPeerHist.WithLabelValues(s.GetName(), rwTy.String(), utils.DimToString(dim)).Observe(infl.Loads[kind]) }) return true } -func (h *hotScheduler) balanceHotReadRegions(cluster sche.SchedulerCluster) []*operator.Operator { - leaderSolver := newBalanceSolver(h, cluster, utils.Read, transferLeader) +func (s *hotScheduler) balanceHotReadRegions(cluster sche.SchedulerCluster) []*operator.Operator { + leaderSolver := newBalanceSolver(s, cluster, utils.Read, transferLeader) leaderOps := leaderSolver.solve() - peerSolver := newBalanceSolver(h, cluster, utils.Read, movePeer) + peerSolver := newBalanceSolver(s, cluster, utils.Read, movePeer) peerOps := peerSolver.solve() if len(leaderOps) == 0 && len(peerOps) == 0 { hotSchedulerSkipCounter.Inc() @@ -370,8 +370,8 @@ func (h *hotScheduler) balanceHotReadRegions(cluster sche.SchedulerCluster) []*o return nil } -func (h *hotScheduler) balanceHotWritePeers(cluster sche.SchedulerCluster) []*operator.Operator { - peerSolver := newBalanceSolver(h, cluster, utils.Write, movePeer) +func (s *hotScheduler) balanceHotWritePeers(cluster sche.SchedulerCluster) []*operator.Operator { + peerSolver := newBalanceSolver(s, cluster, utils.Write, movePeer) ops := peerSolver.solve() if len(ops) > 0 && peerSolver.tryAddPendingInfluence() { return ops @@ -379,8 +379,8 @@ func (h *hotScheduler) balanceHotWritePeers(cluster sche.SchedulerCluster) []*op return nil } -func (h *hotScheduler) balanceHotWriteLeaders(cluster sche.SchedulerCluster) []*operator.Operator { - leaderSolver := newBalanceSolver(h, cluster, utils.Write, transferLeader) +func (s *hotScheduler) balanceHotWriteLeaders(cluster sche.SchedulerCluster) []*operator.Operator { + leaderSolver := newBalanceSolver(s, cluster, utils.Write, transferLeader) ops := leaderSolver.solve() if len(ops) > 0 && leaderSolver.tryAddPendingInfluence() { return ops diff --git a/pkg/schedule/schedulers/scatter_range.go b/pkg/schedule/schedulers/scatter_range.go index 2e062126fea..5ba303ad05a 100644 --- a/pkg/schedule/schedulers/scatter_range.go +++ 
b/pkg/schedule/schedulers/scatter_range.go @@ -131,62 +131,62 @@ func newScatterRangeScheduler(opController *operator.Controller, config *scatter } // ServeHTTP implements the http.Handler interface. -func (l *scatterRangeScheduler) ServeHTTP(w http.ResponseWriter, r *http.Request) { - l.handler.ServeHTTP(w, r) +func (s *scatterRangeScheduler) ServeHTTP(w http.ResponseWriter, r *http.Request) { + s.handler.ServeHTTP(w, r) } // EncodeConfig implements the Scheduler interface. -func (l *scatterRangeScheduler) EncodeConfig() ([]byte, error) { - l.config.RLock() - defer l.config.RUnlock() - return EncodeConfig(l.config) +func (s *scatterRangeScheduler) EncodeConfig() ([]byte, error) { + s.config.RLock() + defer s.config.RUnlock() + return EncodeConfig(s.config) } // ReloadConfig implements the Scheduler interface. -func (l *scatterRangeScheduler) ReloadConfig() error { - l.config.Lock() - defer l.config.Unlock() +func (s *scatterRangeScheduler) ReloadConfig() error { + s.config.Lock() + defer s.config.Unlock() newCfg := &scatterRangeSchedulerConfig{} - if err := l.config.load(newCfg); err != nil { + if err := s.config.load(newCfg); err != nil { return err } - l.config.RangeName = newCfg.RangeName - l.config.StartKey = newCfg.StartKey - l.config.EndKey = newCfg.EndKey + s.config.RangeName = newCfg.RangeName + s.config.StartKey = newCfg.StartKey + s.config.EndKey = newCfg.EndKey return nil } // IsScheduleAllowed implements the Scheduler interface. -func (l *scatterRangeScheduler) IsScheduleAllowed(cluster sche.SchedulerCluster) bool { - return l.allowBalanceLeader(cluster) || l.allowBalanceRegion(cluster) +func (s *scatterRangeScheduler) IsScheduleAllowed(cluster sche.SchedulerCluster) bool { + return s.allowBalanceLeader(cluster) || s.allowBalanceRegion(cluster) } -func (l *scatterRangeScheduler) allowBalanceLeader(cluster sche.SchedulerCluster) bool { - allowed := l.OpController.OperatorCount(operator.OpRange) < cluster.GetSchedulerConfig().GetLeaderScheduleLimit() +func (s *scatterRangeScheduler) allowBalanceLeader(cluster sche.SchedulerCluster) bool { + allowed := s.OpController.OperatorCount(operator.OpRange) < cluster.GetSchedulerConfig().GetLeaderScheduleLimit() if !allowed { - operator.IncOperatorLimitCounter(l.GetType(), operator.OpLeader) + operator.IncOperatorLimitCounter(s.GetType(), operator.OpLeader) } return allowed } -func (l *scatterRangeScheduler) allowBalanceRegion(cluster sche.SchedulerCluster) bool { - allowed := l.OpController.OperatorCount(operator.OpRange) < cluster.GetSchedulerConfig().GetRegionScheduleLimit() +func (s *scatterRangeScheduler) allowBalanceRegion(cluster sche.SchedulerCluster) bool { + allowed := s.OpController.OperatorCount(operator.OpRange) < cluster.GetSchedulerConfig().GetRegionScheduleLimit() if !allowed { - operator.IncOperatorLimitCounter(l.GetType(), operator.OpRegion) + operator.IncOperatorLimitCounter(s.GetType(), operator.OpRegion) } return allowed } // Schedule implements the Scheduler interface. 
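// scatterRangeScheduler.Schedule below isolates the regions of one key range into a
// sub-cluster (genRangeCluster, not shown in this patch) and lets the embedded
// balance schedulers work on that restricted view, tagging the result with the
// range name. A rough, self-contained sketch of the range-restriction idea with toy
// types and a hypothetical inRange helper:

package main

import "fmt"

type region struct{ startKey, endKey string }

// inRange keeps only the regions fully contained in [startKey, endKey); an empty
// endKey means "unbounded".
func inRange(regions []region, startKey, endKey string) []region {
	var out []region
	for _, r := range regions {
		if r.startKey >= startKey && (endKey == "" || r.endKey <= endKey) {
			out = append(out, r)
		}
	}
	return out
}

func main() {
	regions := []region{{"a", "c"}, {"c", "f"}, {"f", "k"}}
	sub := inRange(regions, "c", "k")                       // the isolated view handed to the balancers
	desc := fmt.Sprintf("scatter-range-leader-%s", "test")  // range name comes from the config
	fmt.Println(len(sub), desc)                             // 2 scatter-range-leader-test
}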
-func (l *scatterRangeScheduler) Schedule(cluster sche.SchedulerCluster, _ bool) ([]*operator.Operator, []plan.Plan) { +func (s *scatterRangeScheduler) Schedule(cluster sche.SchedulerCluster, _ bool) ([]*operator.Operator, []plan.Plan) { scatterRangeCounter.Inc() // isolate a new cluster according to the key range - c := genRangeCluster(cluster, l.config.getStartKey(), l.config.getEndKey()) + c := genRangeCluster(cluster, s.config.getStartKey(), s.config.getEndKey()) c.SetTolerantSizeRatio(2) - if l.allowBalanceLeader(cluster) { - ops, _ := l.balanceLeader.Schedule(c, false) + if s.allowBalanceLeader(cluster) { + ops, _ := s.balanceLeader.Schedule(c, false) if len(ops) > 0 { - ops[0].SetDesc(fmt.Sprintf("scatter-range-leader-%s", l.config.getRangeName())) + ops[0].SetDesc(fmt.Sprintf("scatter-range-leader-%s", s.config.getRangeName())) ops[0].AttachKind(operator.OpRange) ops[0].Counters = append(ops[0].Counters, scatterRangeNewOperatorCounter, @@ -195,10 +195,10 @@ func (l *scatterRangeScheduler) Schedule(cluster sche.SchedulerCluster, _ bool) } scatterRangeNoNeedBalanceLeaderCounter.Inc() } - if l.allowBalanceRegion(cluster) { - ops, _ := l.balanceRegion.Schedule(c, false) + if s.allowBalanceRegion(cluster) { + ops, _ := s.balanceRegion.Schedule(c, false) if len(ops) > 0 { - ops[0].SetDesc(fmt.Sprintf("scatter-range-region-%s", l.config.getRangeName())) + ops[0].SetDesc(fmt.Sprintf("scatter-range-region-%s", s.config.getRangeName())) ops[0].AttachKind(operator.OpRange) ops[0].Counters = append(ops[0].Counters, scatterRangeNewOperatorCounter,