Skip to content

Commit

Permalink
schedule: one operator only occupy one limit (tikv#3820)
Browse files Browse the repository at this point in the history
* use O(1) to get WMA

Signed-off-by: tongjian <[email protected]>

* add sonar file ignore code duplicated
Signed-off-by: tongjian <[email protected]>

Signed-off-by: buffer <[email protected]>

* one operator should occupy one limit

Signed-off-by: bufferflies <[email protected]>

* remove no need change

Signed-off-by: bufferflies <[email protected]>

* move scheduler OpKind to operator.go

Signed-off-by: bufferflies <[email protected]>

* replace opkind to get op producer by using LowBit

Signed-off-by: bufferflies <[email protected]>

* pass uint test

Signed-off-by: bufferflies <[email protected]>

* replace chinese link

Signed-off-by: bufferflies <[email protected]>

* OperatorCount use O(1)

Signed-off-by: bufferflies <[email protected]>
  • Loading branch information
bufferflies committed Jul 9, 2021
1 parent ca8b360 commit 97dc51c
Show file tree
Hide file tree
Showing 6 changed files with 66 additions and 27 deletions.
6 changes: 3 additions & 3 deletions server/cluster/coordinator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -172,19 +172,19 @@ func (s *testCoordinatorSuite) TestBasic(c *C) {

op1 := newTestOperator(1, tc.GetRegion(1).GetRegionEpoch(), operator.OpLeader)
oc.AddWaitingOperator(op1)
c.Assert(oc.OperatorCount(op1.Kind()), Equals, uint64(1))
c.Assert(oc.OperatorCount(operator.OpLeader), Equals, uint64(1))
c.Assert(oc.GetOperator(1).RegionID(), Equals, op1.RegionID())

// Region 1 already has an operator, cannot add another one.
op2 := newTestOperator(1, tc.GetRegion(1).GetRegionEpoch(), operator.OpRegion)
oc.AddWaitingOperator(op2)
c.Assert(oc.OperatorCount(op2.Kind()), Equals, uint64(0))
c.Assert(oc.OperatorCount(operator.OpRegion), Equals, uint64(0))

// Remove the operator manually, then we can add a new operator.
c.Assert(oc.RemoveOperator(op1), IsTrue)
op3 := newTestOperator(1, tc.GetRegion(1).GetRegionEpoch(), operator.OpRegion)
oc.AddWaitingOperator(op3)
c.Assert(oc.OperatorCount(op3.Kind()), Equals, uint64(1))
c.Assert(oc.OperatorCount(operator.OpRegion), Equals, uint64(1))
c.Assert(oc.GetOperator(1).RegionID(), Equals, op3.RegionID())
}

Expand Down
25 changes: 13 additions & 12 deletions server/schedule/operator/kind.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,22 +24,23 @@ type OpKind uint32

// Flags for operators.
const (
// Include leader transfer.
OpLeader OpKind = 1 << iota
// Include peer addition or removal. This means that this operator may take a long time.
OpRegion
// Initiated by merge checker or merge scheduler. Note that it may not include region merge.
// the order describe the operator's producer and is very helpful to decouple scheduler or checker limit
OpMerge OpKind = 1 << iota
// Initiated by range scheduler.
OpRange
// Initiated by replica checker.
OpReplica
// Include region split. Initiated by rule checker if `kind & OpAdmin == 0`.
OpSplit
// Initiated by admin.
OpAdmin
// Initiated by hot region scheduler.
OpHotRegion
// Initiated by replica checker.
OpReplica
// Initiated by merge checker or merge scheduler. Note that it may not include region merge.
OpMerge
// Initiated by range scheduler.
OpRange
// Include peer addition or removal. This means that this operator may take a long time.
OpRegion
// Include leader transfer.
OpLeader
// Initiated by admin.
OpAdmin
opMax
)

Expand Down
10 changes: 10 additions & 0 deletions server/schedule/operator/operator.go
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,16 @@ func (o *Operator) Kind() OpKind {
return o.kind
}

// SchedulerKind return the highest OpKind even if the operator has many OpKind
// fix #3778
func (o *Operator) SchedulerKind() OpKind {
// LowBit ref: https://en.wikipedia.org/wiki/Find_first_set
// 6(110) ==> 2(10)
// 5(101) ==> 1(01)
// 4(100) ==> 4(100)
return o.kind & (-o.kind)
}

// Status returns operator status.
func (o *Operator) Status() OpStatus {
return o.status.Status()
Expand Down
35 changes: 34 additions & 1 deletion server/schedule/operator/operator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -233,7 +233,7 @@ func (s *testOperatorSuite) TestInfluence(c *C) {
}

func (s *testOperatorSuite) TestOperatorKind(c *C) {
c.Assert((OpLeader | OpReplica).String(), Equals, "leader,replica")
c.Assert((OpLeader | OpReplica).String(), Equals, "replica,leader")
c.Assert(OpKind(0).String(), Equals, "unknown")
k, err := ParseOperatorKind("region,leader")
c.Assert(err, IsNil)
Expand Down Expand Up @@ -382,3 +382,36 @@ func (s *testOperatorSuite) TestCheck(c *C) {
c.Assert(op.Status(), Equals, SUCCESS)
}
}

func (s *testOperatorSuite) TestSchedulerKind(c *C) {
testdata := []struct {
op *Operator
expect OpKind
}{
{
op: s.newTestOperator(1, OpMerge|OpLeader|OpRegion),
expect: OpMerge,
}, {
op: s.newTestOperator(1, OpReplica|OpRegion),
expect: OpReplica,
}, {
op: s.newTestOperator(1, OpSplit|OpRegion),
expect: OpSplit,
}, {
op: s.newTestOperator(1, OpRange|OpRegion),
expect: OpRange,
}, {
op: s.newTestOperator(1, OpHotRegion|OpLeader|OpRegion),
expect: OpHotRegion,
}, {
op: s.newTestOperator(1, OpRegion|OpLeader),
expect: OpRegion,
}, {
op: s.newTestOperator(1, OpLeader),
expect: OpLeader,
},
}
for _, v := range testdata {
c.Assert(v.op.SchedulerKind(), Equals, v.expect)
}
}
15 changes: 5 additions & 10 deletions server/schedule/operator_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -781,21 +781,16 @@ func (oc *OperatorController) updateCounts(operators map[uint64]*operator.Operat
delete(oc.counts, k)
}
for _, op := range operators {
oc.counts[op.Kind()]++
oc.counts[op.SchedulerKind()]++
}
}

// OperatorCount gets the count of operators filtered by mask.
func (oc *OperatorController) OperatorCount(mask operator.OpKind) uint64 {
// OperatorCount gets the count of operators filtered by kind.
// kind only has one OpKind.
func (oc *OperatorController) OperatorCount(kind operator.OpKind) uint64 {
oc.RLock()
defer oc.RUnlock()
var total uint64
for k, count := range oc.counts {
if k&mask != 0 {
total += count
}
}
return total
return oc.counts[kind]
}

// GetOpInfluence gets OpInfluence.
Expand Down
2 changes: 1 addition & 1 deletion server/schedulers/balance_region.go
Original file line number Diff line number Diff line change
Expand Up @@ -126,7 +126,7 @@ func (s *balanceRegionScheduler) EncodeConfig() ([]byte, error) {
}

func (s *balanceRegionScheduler) IsScheduleAllowed(cluster opt.Cluster) bool {
allowed := s.opController.OperatorCount(operator.OpRegion)-s.opController.OperatorCount(operator.OpMerge) < cluster.GetOpts().GetRegionScheduleLimit()
allowed := s.opController.OperatorCount(operator.OpRegion) < cluster.GetOpts().GetRegionScheduleLimit()
if !allowed {
operator.OperatorLimitCounter.WithLabelValues(s.GetType(), operator.OpRegion.String()).Inc()
}
Expand Down

0 comments on commit 97dc51c

Please sign in to comment.