Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

schedule: fix store maybe always overloaded #1586

Merged
merged 5 commits into from
Jun 20, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 3 additions & 10 deletions server/cluster_info.go
Original file line number Diff line number Diff line change
Expand Up @@ -240,18 +240,11 @@ func (c *clusterInfo) UnblockStore(storeID uint64) {
c.core.UnblockStore(storeID)
}

// SetStoreOverload stops balancer from selecting the store.
func (c *clusterInfo) SetStoreOverload(storeID uint64) {
// AttachOverloadStatus attaches the overload status to a store.
func (c *clusterInfo) AttachOverloadStatus(storeID uint64, f func() bool) {
c.Lock()
defer c.Unlock()
c.core.SetStoreOverload(storeID)
}

// ResetStoreOverload allows balancer to select the store.
func (c *clusterInfo) ResetStoreOverload(storeID uint64) {
c.Lock()
defer c.Unlock()
c.core.ResetStoreOverload(storeID)
c.core.AttachOverloadStatus(storeID, f)
}

// GetStores returns all stores in the cluster.
Expand Down
60 changes: 58 additions & 2 deletions server/coordinator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,8 @@ import (
"github.com/pingcap/pd/server/schedulers"
)

func newTestOperator(regionID uint64, regionEpoch *metapb.RegionEpoch, kind schedule.OperatorKind) *schedule.Operator {
return schedule.NewOperator("test", regionID, regionEpoch, kind)
func newTestOperator(regionID uint64, regionEpoch *metapb.RegionEpoch, kind schedule.OperatorKind, steps ...schedule.OperatorStep) *schedule.Operator {
return schedule.NewOperator("test", regionID, regionEpoch, kind, steps...)
}

func newTestScheduleConfig() (*ScheduleConfig, *scheduleOption, error) {
Expand Down Expand Up @@ -765,6 +765,62 @@ func (s *testOperatorControllerSuite) TestOperatorCount(c *C) {
c.Assert(oc.OperatorCount(schedule.OpLeader), Equals, uint64(0))
}

func (s *testOperatorControllerSuite) TestStoreOverloaded(c *C) {
_, opt, err := newTestScheduleConfig()
c.Assert(err, IsNil)
tc := newTestClusterInfo(opt)
hbStreams := getHeartBeatStreams(c, tc)
defer hbStreams.Close()
oc := schedule.NewOperatorController(tc.clusterInfo, hbStreams)
lb, err := schedule.CreateScheduler("balance-region", oc)
c.Assert(err, IsNil)

c.Assert(tc.addRegionStore(4, 40), IsNil)
c.Assert(tc.addRegionStore(3, 40), IsNil)
c.Assert(tc.addRegionStore(2, 40), IsNil)
c.Assert(tc.addRegionStore(1, 10), IsNil)
c.Assert(tc.addLeaderRegion(1, 2, 3, 4), IsNil)
op1 := lb.Schedule(tc)[0]
c.Assert(op1, NotNil)
c.Assert(oc.AddOperator(op1), IsTrue)
for i := 0; i < 10; i++ {
c.Assert(lb.Schedule(tc), IsNil)
}
oc.RemoveOperator(op1)
time.Sleep(1 * time.Second)
for i := 0; i < 100; i++ {
c.Assert(lb.Schedule(tc), NotNil)
}
}

func (s *testOperatorControllerSuite) TestStoreOverloadedWithReplace(c *C) {
_, opt, err := newTestScheduleConfig()
c.Assert(err, IsNil)
tc := newTestClusterInfo(opt)
hbStreams := getHeartBeatStreams(c, tc)
defer hbStreams.Close()
oc := schedule.NewOperatorController(tc.clusterInfo, hbStreams)
lb, err := schedule.CreateScheduler("balance-region", oc)
c.Assert(err, IsNil)

c.Assert(tc.addRegionStore(4, 40), IsNil)
c.Assert(tc.addRegionStore(3, 40), IsNil)
c.Assert(tc.addRegionStore(2, 40), IsNil)
c.Assert(tc.addRegionStore(1, 10), IsNil)
c.Assert(tc.addLeaderRegion(1, 2, 3, 4), IsNil)
c.Assert(tc.addLeaderRegion(2, 1, 3, 4), IsNil)
op1 := newTestOperator(1, tc.GetRegion(1).GetRegionEpoch(), schedule.OpRegion, schedule.AddPeer{ToStore: 1, PeerID: 1})
c.Assert(oc.AddOperator(op1), IsTrue)
op2 := newTestOperator(1, tc.GetRegion(1).GetRegionEpoch(), schedule.OpRegion, schedule.AddPeer{ToStore: 2, PeerID: 2})
op2.SetPriorityLevel(core.HighPriority)
c.Assert(oc.AddOperator(op2), IsTrue)
op3 := newTestOperator(1, tc.GetRegion(2).GetRegionEpoch(), schedule.OpRegion, schedule.AddPeer{ToStore: 1, PeerID: 3})
c.Assert(oc.AddOperator(op3), IsFalse)
c.Assert(lb.Schedule(tc), IsNil)
time.Sleep(1 * time.Second)
c.Assert(lb.Schedule(tc), NotNil)
}

var _ = Suite(&testScheduleControllerSuite{})

type testScheduleControllerSuite struct{}
Expand Down
11 changes: 3 additions & 8 deletions server/core/basic_cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,14 +84,9 @@ func (bc *BasicCluster) UnblockStore(storeID uint64) {
bc.Stores.UnblockStore(storeID)
}

// SetStoreOverload stops balancer from selecting the store.
func (bc *BasicCluster) SetStoreOverload(storeID uint64) {
bc.Stores.SetStoreOverload(storeID)
}

// ResetStoreOverload allows balancer to select the store.
func (bc *BasicCluster) ResetStoreOverload(storeID uint64) {
bc.Stores.ResetStoreOverload(storeID)
// AttachOverloadStatus attaches the overload status to a store.
func (bc *BasicCluster) AttachOverloadStatus(storeID uint64, f func() bool) {
bc.Stores.AttachOverloadStatus(storeID, f)
}

// RandFollowerRegion returns a random region that has a follower on the store.
Expand Down
28 changes: 9 additions & 19 deletions server/core/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ type StoreInfo struct {
lastHeartbeatTS time.Time
leaderWeight float64
regionWeight float64
overloaded bool
overloaded func() bool
}

// NewStoreInfo creates StoreInfo with meta data.
Expand Down Expand Up @@ -87,7 +87,10 @@ func (s *StoreInfo) IsBlocked() bool {

// IsOverloaded returns if the store is overloaded.
func (s *StoreInfo) IsOverloaded() bool {
return s.overloaded
if s.overloaded == nil {
return false
}
return s.overloaded()
}

// IsUp checks if the store's state is Up.
Expand Down Expand Up @@ -520,24 +523,11 @@ func (s *StoresInfo) UnblockStore(storeID uint64) {
s.stores[storeID] = store.Clone(SetStoreUnBlock())
}

// SetStoreOverload set a StoreInfo with storeID overload.
func (s *StoresInfo) SetStoreOverload(storeID uint64) {
store, ok := s.stores[storeID]
if !ok {
log.Fatal("store is overloaded, but it is not found",
zap.Uint64("store-id", storeID))
}
s.stores[storeID] = store.Clone(SetStoreOverload())
}

// ResetStoreOverload reset a StoreInfo with storeID overload.
func (s *StoresInfo) ResetStoreOverload(storeID uint64) {
store, ok := s.stores[storeID]
if !ok {
log.Fatal("store is not overloaded anymore, but it is not found",
zap.Uint64("store-id", storeID))
// AttachOverloadStatus attaches the overload status to a store.
func (s *StoresInfo) AttachOverloadStatus(storeID uint64, f func() bool) {
if store, ok := s.stores[storeID]; ok {
s.stores[storeID] = store.Clone(SetOverloadStatus(f))
}
s.stores[storeID] = store.Clone(ResetStoreOverload())
}

// GetStores gets a complete set of StoreInfo.
Expand Down
21 changes: 7 additions & 14 deletions server/core/store_option.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,20 +65,6 @@ func SetStoreUnBlock() StoreCreateOption {
}
}

// SetStoreOverload stops balancer from selecting the store.
func SetStoreOverload() StoreCreateOption {
return func(store *StoreInfo) {
store.overloaded = true
}
}

// ResetStoreOverload allows balancer to select the store.
func ResetStoreOverload() StoreCreateOption {
return func(store *StoreInfo) {
store.overloaded = false
}
}

// SetLeaderCount sets the leader count for the store.
func SetLeaderCount(leaderCount int) StoreCreateOption {
return func(store *StoreInfo) {
Expand Down Expand Up @@ -141,3 +127,10 @@ func SetStoreStats(stats *pdpb.StoreStats) StoreCreateOption {
store.stats = stats
}
}

// SetOverloadStatus sets the overload status for the store.
func SetOverloadStatus(f func() bool) StoreCreateOption {
return func(store *StoreInfo) {
store.overloaded = f
}
}
32 changes: 16 additions & 16 deletions server/schedule/operator_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -278,16 +278,6 @@ func (oc *OperatorController) GetOperatorStatus(id uint64) *OperatorWithStatus {
func (oc *OperatorController) removeOperatorLocked(op *Operator) {
regionID := op.RegionID()
delete(oc.operators, regionID)
opInfluence := NewTotalOpInfluence([]*Operator{op}, oc.cluster)
for storeID := range opInfluence.storesInfluence {
if opInfluence.GetStoreInfluence(storeID).StepCost == 0 {
continue
}
if oc.cluster.GetStore(storeID).IsOverloaded() &&
oc.storesLimit[storeID].Available() >= RegionInfluence {
oc.cluster.ResetStoreOverload(storeID)
}
}
oc.updateCounts(oc.operators)
operatorCounter.WithLabelValues(op.Desc(), "remove").Inc()
}
Expand Down Expand Up @@ -627,18 +617,14 @@ func (o *OperatorRecords) Put(op *Operator, status pdpb.OperatorStatus) {
func (oc *OperatorController) exceedStoreLimit(ops ...*Operator) bool {
opInfluence := NewTotalOpInfluence(ops, oc.cluster)
for storeID := range opInfluence.storesInfluence {
if oc.storesLimit[storeID] == nil {
rate := oc.cluster.GetStoreBalanceRate()
oc.newStoreLimit(storeID, rate)
}
stepCost := opInfluence.GetStoreInfluence(storeID).StepCost
if stepCost == 0 {
continue
}
available := oc.storesLimit[storeID].Available()

available := oc.getOrCreateStoreLimit(storeID).Available()
storeLimit.WithLabelValues(strconv.FormatUint(storeID, 10), "available").Set(float64(available) / float64(RegionInfluence))
if available < stepCost {
oc.cluster.SetStoreOverload(storeID)
return true
}
}
Expand Down Expand Up @@ -671,6 +657,20 @@ func (oc *OperatorController) newStoreLimit(storeID uint64, rate float64) {
oc.storesLimit[storeID] = ratelimit.NewBucketWithRate(rate, capacity)
}

// getOrCreateStoreLimit is used to get or create the limit of a store.
func (oc *OperatorController) getOrCreateStoreLimit(storeID uint64) *ratelimit.Bucket {
if oc.storesLimit[storeID] == nil {
rate := oc.cluster.GetStoreBalanceRate()
oc.newStoreLimit(storeID, rate)
oc.cluster.AttachOverloadStatus(storeID, func() bool {
oc.RLock()
defer oc.RUnlock()
return oc.storesLimit[storeID].Available() < RegionInfluence
})
}
return oc.storesLimit[storeID]
}

// GetAllStoresLimit is used to get limit of all stores.
func (oc *OperatorController) GetAllStoresLimit() map[uint64]float64 {
oc.RLock()
Expand Down
3 changes: 1 addition & 2 deletions server/schedule/scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,8 +44,7 @@ type Cluster interface {
BlockStore(id uint64) error
UnblockStore(id uint64)

SetStoreOverload(id uint64)
ResetStoreOverload(id uint64)
AttachOverloadStatus(id uint64, f func() bool)

IsRegionHot(id uint64) bool
RegionWriteStats() []*statistics.RegionStat
Expand Down