Skip to content

Commit

Permalink
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
tiny change
Browse files Browse the repository at this point in the history
Signed-off-by: Ryan Leung <[email protected]>
rleungx committed Mar 24, 2019

Verified

This commit was signed with the committer’s verified signature.
germa89 German
1 parent 411d51b commit b6bfd73
Showing 3 changed files with 30 additions and 22 deletions.
6 changes: 6 additions & 0 deletions server/coordinator.go
Original file line number Diff line number Diff line change
@@ -133,6 +133,9 @@ func (c *coordinator) checkRegion(region *core.RegionInfo) bool {
// If PD has restarted, it need to check learners added before and promote them.
// Don't check isRaftLearnerEnabled cause it maybe disable learner feature but there are still some learners to promote.
opController := c.opController
if c.opController.GetScheduleCost() >= c.cluster.GetMaxScheduleCost() {
return false
}
for _, p := range region.GetLearners() {
if region.GetPendingLearner(p.GetId()) != nil {
continue
@@ -469,5 +472,8 @@ func (s *scheduleController) GetInterval() time.Duration {

// AllowSchedule returns if a scheduler is allowed to schedule.
func (s *scheduleController) AllowSchedule() bool {
if s.opController.GetScheduleCost() >= s.cluster.GetMaxScheduleCost() {
return false
}
return s.Scheduler.IsScheduleAllowed(s.cluster)
}
44 changes: 23 additions & 21 deletions server/schedule/operator_controller.go
Original file line number Diff line number Diff line change
@@ -84,16 +84,13 @@ func (oc *OperatorController) AddOperator(ops ...*Operator) bool {
oc.Lock()
defer oc.Unlock()

if oc.exceedStoreCost(ops...) || oc.getScheduleCost() >= oc.cluster.GetMaxScheduleCost() {
return false
}

for _, op := range ops {
if !oc.checkAddOperator(op) {
if oc.exceedStoreCost(ops...) || !oc.checkAddOperator(ops...) {
for _, op := range ops {
operatorCounter.WithLabelValues(op.Desc(), "canceled").Inc()
return false
}
return false
}

for _, op := range ops {
oc.addOperatorLocked(op)
}
@@ -106,19 +103,21 @@ func (oc *OperatorController) AddOperator(ops ...*Operator) bool {
// - There is no such region in the cluster
// - The epoch of the operator and the epoch of the corresponding region are no longer consistent.
// - The region already has a higher priority or same priority operator.
func (oc *OperatorController) checkAddOperator(op *Operator) bool {
region := oc.cluster.GetRegion(op.RegionID())
if region == nil {
log.Debug("region not found, cancel add operator", zap.Uint64("region-id", op.RegionID()))
return false
}
if region.GetRegionEpoch().GetVersion() != op.RegionEpoch().GetVersion() || region.GetRegionEpoch().GetConfVer() != op.RegionEpoch().GetConfVer() {
log.Debug("region epoch not match, cancel add operator", zap.Uint64("region-id", op.RegionID()), zap.Reflect("old", region.GetRegionEpoch()), zap.Reflect("new", op.RegionEpoch()))
return false
}
if old := oc.operators[op.RegionID()]; old != nil && !isHigherPriorityOperator(op, old) {
log.Debug("already have operator, cancel add operator", zap.Uint64("region-id", op.RegionID()), zap.Reflect("old", old))
return false
func (oc *OperatorController) checkAddOperator(ops ...*Operator) bool {
for _, op := range ops {
region := oc.cluster.GetRegion(op.RegionID())
if region == nil {
log.Debug("region not found, cancel add operator", zap.Uint64("region-id", op.RegionID()))
return false
}
if region.GetRegionEpoch().GetVersion() != op.RegionEpoch().GetVersion() || region.GetRegionEpoch().GetConfVer() != op.RegionEpoch().GetConfVer() {
log.Debug("region epoch not match, cancel add operator", zap.Uint64("region-id", op.RegionID()), zap.Reflect("old", region.GetRegionEpoch()), zap.Reflect("new", op.RegionEpoch()))
return false
}
if old := oc.operators[op.RegionID()]; old != nil && !isHigherPriorityOperator(op, old) {
log.Debug("already have operator, cancel add operator", zap.Uint64("region-id", op.RegionID()), zap.Reflect("old", old))
return false
}
}
return true
}
@@ -446,7 +445,10 @@ func (oc *OperatorController) exceedStoreCost(ops ...*Operator) bool {
return false
}

func (oc *OperatorController) getScheduleCost() uint64 {
// GetScheduleCost gets the cost of running operators.
func (oc *OperatorController) GetScheduleCost() uint64 {
oc.RLock()
defer oc.RUnlock()
var scheduleCost uint64
for _, cost := range oc.storesCost {
scheduleCost += cost
2 changes: 1 addition & 1 deletion server/schedule/operator_controller_test.go
Original file line number Diff line number Diff line change
@@ -29,7 +29,7 @@ func (t *testOperatorControllerSuite) TestGetOpInfluence(c *C) {
opt := NewMockSchedulerOptions()
tc := NewMockCluster(opt)
oc := NewOperatorController(tc, nil)
tc.AddLeaderStore(2, 2)
tc.AddLeaderStore(2, 1)
tc.AddLeaderRegion(1, 1, 2)
tc.AddLeaderRegion(2, 1, 2)
steps := []OperatorStep{

0 comments on commit b6bfd73

Please sign in to comment.