From 57238dde95672e6cb37105be53eb69267f3d5809 Mon Sep 17 00:00:00 2001 From: Ryan Leung Date: Thu, 20 Jun 2019 14:45:39 +0800 Subject: [PATCH] schedule: fix store maybe always overloaded (#1586) (#1590) * fix store maybe always overloaded Signed-off-by: Ryan Leung --- server/cluster_info.go | 13 ++---- server/coordinator_test.go | 60 +++++++++++++++++++++++++- server/core/store.go | 28 ++++-------- server/core/store_option.go | 21 +++------ server/schedule/basic_cluster.go | 11 ++--- server/schedule/operator_controller.go | 32 +++++++------- server/schedule/scheduler.go | 3 +- 7 files changed, 97 insertions(+), 71 deletions(-) diff --git a/server/cluster_info.go b/server/cluster_info.go index 8fad4f288a3..81d2f2eb065 100644 --- a/server/cluster_info.go +++ b/server/cluster_info.go @@ -233,18 +233,11 @@ func (c *clusterInfo) UnblockStore(storeID uint64) { c.core.UnblockStore(storeID) } -// SetStoreOverload stops balancer from selecting the store. -func (c *clusterInfo) SetStoreOverload(storeID uint64) { +// AttachOverloadStatus attaches the overload status to a store. +func (c *clusterInfo) AttachOverloadStatus(storeID uint64, f func() bool) { c.Lock() defer c.Unlock() - c.core.SetStoreOverload(storeID) -} - -// ResetStoreOverload allows balancer to select the store. -func (c *clusterInfo) ResetStoreOverload(storeID uint64) { - c.Lock() - defer c.Unlock() - c.core.ResetStoreOverload(storeID) + c.core.AttachOverloadStatus(storeID, f) } // GetStores returns all stores in the cluster. diff --git a/server/coordinator_test.go b/server/coordinator_test.go index 2b3fa951ab5..ca9587e3632 100644 --- a/server/coordinator_test.go +++ b/server/coordinator_test.go @@ -31,8 +31,8 @@ import ( "github.com/pkg/errors" ) -func newTestOperator(regionID uint64, regionEpoch *metapb.RegionEpoch, kind schedule.OperatorKind) *schedule.Operator { - return schedule.NewOperator("test", regionID, regionEpoch, kind) +func newTestOperator(regionID uint64, regionEpoch *metapb.RegionEpoch, kind schedule.OperatorKind, steps ...schedule.OperatorStep) *schedule.Operator { + return schedule.NewOperator("test", regionID, regionEpoch, kind, steps...) } func newTestScheduleConfig() (*ScheduleConfig, *scheduleOption, error) { @@ -792,6 +792,62 @@ func (s *testOperatorControllerSuite) TestOperatorCount(c *C) { c.Assert(oc.OperatorCount(schedule.OpLeader), Equals, uint64(0)) } +func (s *testOperatorControllerSuite) TestStoreOverloaded(c *C) { + _, opt, err := newTestScheduleConfig() + c.Assert(err, IsNil) + tc := newTestClusterInfo(opt) + hbStreams := getHeartBeatStreams(c, tc) + defer hbStreams.Close() + oc := schedule.NewOperatorController(tc.clusterInfo, hbStreams) + lb, err := schedule.CreateScheduler("balance-region", oc) + c.Assert(err, IsNil) + + c.Assert(tc.addRegionStore(4, 40), IsNil) + c.Assert(tc.addRegionStore(3, 40), IsNil) + c.Assert(tc.addRegionStore(2, 40), IsNil) + c.Assert(tc.addRegionStore(1, 10), IsNil) + c.Assert(tc.addLeaderRegion(1, 2, 3, 4), IsNil) + op1 := lb.Schedule(tc)[0] + c.Assert(op1, NotNil) + c.Assert(oc.AddOperator(op1), IsTrue) + for i := 0; i < 10; i++ { + c.Assert(lb.Schedule(tc), IsNil) + } + oc.RemoveOperator(op1) + time.Sleep(1 * time.Second) + for i := 0; i < 100; i++ { + c.Assert(lb.Schedule(tc), NotNil) + } +} + +func (s *testOperatorControllerSuite) TestStoreOverloadedWithReplace(c *C) { + _, opt, err := newTestScheduleConfig() + c.Assert(err, IsNil) + tc := newTestClusterInfo(opt) + hbStreams := getHeartBeatStreams(c, tc) + defer hbStreams.Close() + oc := schedule.NewOperatorController(tc.clusterInfo, hbStreams) + lb, err := schedule.CreateScheduler("balance-region", oc) + c.Assert(err, IsNil) + + c.Assert(tc.addRegionStore(4, 40), IsNil) + c.Assert(tc.addRegionStore(3, 40), IsNil) + c.Assert(tc.addRegionStore(2, 40), IsNil) + c.Assert(tc.addRegionStore(1, 10), IsNil) + c.Assert(tc.addLeaderRegion(1, 2, 3, 4), IsNil) + c.Assert(tc.addLeaderRegion(2, 1, 3, 4), IsNil) + op1 := newTestOperator(1, tc.GetRegion(1).GetRegionEpoch(), schedule.OpRegion, schedule.AddPeer{ToStore: 1, PeerID: 1}) + c.Assert(oc.AddOperator(op1), IsTrue) + op2 := newTestOperator(1, tc.GetRegion(1).GetRegionEpoch(), schedule.OpRegion, schedule.AddPeer{ToStore: 2, PeerID: 2}) + op2.SetPriorityLevel(core.HighPriority) + c.Assert(oc.AddOperator(op2), IsTrue) + op3 := newTestOperator(1, tc.GetRegion(2).GetRegionEpoch(), schedule.OpRegion, schedule.AddPeer{ToStore: 1, PeerID: 3}) + c.Assert(oc.AddOperator(op3), IsFalse) + c.Assert(lb.Schedule(tc), IsNil) + time.Sleep(1 * time.Second) + c.Assert(lb.Schedule(tc), NotNil) +} + var _ = Suite(&testScheduleControllerSuite{}) type testScheduleControllerSuite struct{} diff --git a/server/core/store.go b/server/core/store.go index dd67b627e79..7e630b6d0d0 100644 --- a/server/core/store.go +++ b/server/core/store.go @@ -42,7 +42,7 @@ type StoreInfo struct { leaderWeight float64 regionWeight float64 rollingStoreStats *RollingStoreStats - overloaded bool + overloaded func() bool } // NewStoreInfo creates StoreInfo with meta data. @@ -91,7 +91,10 @@ func (s *StoreInfo) IsBlocked() bool { // IsOverloaded returns if the store is overloaded. func (s *StoreInfo) IsOverloaded() bool { - return s.overloaded + if s.overloaded == nil { + return false + } + return s.overloaded() } // IsUp checks if the store's state is Up. @@ -542,24 +545,11 @@ func (s *StoresInfo) UnblockStore(storeID uint64) { s.stores[storeID] = store.Clone(SetStoreUnBlock()) } -// SetStoreOverload set a StoreInfo with storeID overload. -func (s *StoresInfo) SetStoreOverload(storeID uint64) { - store, ok := s.stores[storeID] - if !ok { - log.Fatal("store is overloaded, but it is not found", - zap.Uint64("store-id", storeID)) - } - s.stores[storeID] = store.Clone(SetStoreOverload()) -} - -// ResetStoreOverload reset a StoreInfo with storeID overload. -func (s *StoresInfo) ResetStoreOverload(storeID uint64) { - store, ok := s.stores[storeID] - if !ok { - log.Fatal("store is not overloaded anymore, but it is not found", - zap.Uint64("store-id", storeID)) +// AttachOverloadStatus attaches the overload status to a store. +func (s *StoresInfo) AttachOverloadStatus(storeID uint64, f func() bool) { + if store, ok := s.stores[storeID]; ok { + s.stores[storeID] = store.Clone(SetOverloadStatus(f)) } - s.stores[storeID] = store.Clone(ResetStoreOverload()) } // GetStores gets a complete set of StoreInfo. diff --git a/server/core/store_option.go b/server/core/store_option.go index 1b7401356bf..75a412a81cc 100644 --- a/server/core/store_option.go +++ b/server/core/store_option.go @@ -65,20 +65,6 @@ func SetStoreUnBlock() StoreCreateOption { } } -// SetStoreOverload stops balancer from selecting the store. -func SetStoreOverload() StoreCreateOption { - return func(store *StoreInfo) { - store.overloaded = true - } -} - -// ResetStoreOverload allows balancer to select the store. -func ResetStoreOverload() StoreCreateOption { - return func(store *StoreInfo) { - store.overloaded = false - } -} - // SetLeaderCount sets the leader count for the store. func SetLeaderCount(leaderCount int) StoreCreateOption { return func(store *StoreInfo) { @@ -141,3 +127,10 @@ func SetStoreStats(stats *pdpb.StoreStats) StoreCreateOption { store.stats = stats } } + +// SetOverloadStatus sets the overload status for the store. +func SetOverloadStatus(f func() bool) StoreCreateOption { + return func(store *StoreInfo) { + store.overloaded = f + } +} diff --git a/server/schedule/basic_cluster.go b/server/schedule/basic_cluster.go index 20587626172..403ddde29a9 100644 --- a/server/schedule/basic_cluster.go +++ b/server/schedule/basic_cluster.go @@ -102,14 +102,9 @@ func (bc *BasicCluster) UnblockStore(storeID uint64) { bc.Stores.UnblockStore(storeID) } -// SetStoreOverload stops balancer from selecting the store. -func (bc *BasicCluster) SetStoreOverload(storeID uint64) { - bc.Stores.SetStoreOverload(storeID) -} - -// ResetStoreOverload allows balancer to select the store. -func (bc *BasicCluster) ResetStoreOverload(storeID uint64) { - bc.Stores.ResetStoreOverload(storeID) +// AttachOverloadStatus attaches the overload status to a store. +func (bc *BasicCluster) AttachOverloadStatus(storeID uint64, f func() bool) { + bc.Stores.AttachOverloadStatus(storeID, f) } // RandFollowerRegion returns a random region that has a follower on the store. diff --git a/server/schedule/operator_controller.go b/server/schedule/operator_controller.go index 9e11924daef..1ccfb020797 100644 --- a/server/schedule/operator_controller.go +++ b/server/schedule/operator_controller.go @@ -278,16 +278,6 @@ func (oc *OperatorController) GetOperatorStatus(id uint64) *OperatorWithStatus { func (oc *OperatorController) removeOperatorLocked(op *Operator) { regionID := op.RegionID() delete(oc.operators, regionID) - opInfluence := NewTotalOpInfluence([]*Operator{op}, oc.cluster) - for storeID := range opInfluence.storesInfluence { - if opInfluence.GetStoreInfluence(storeID).StepCost == 0 { - continue - } - if oc.cluster.GetStore(storeID).IsOverloaded() && - oc.storesLimit[storeID].Available() >= RegionInfluence { - oc.cluster.ResetStoreOverload(storeID) - } - } oc.updateCounts(oc.operators) operatorCounter.WithLabelValues(op.Desc(), "remove").Inc() } @@ -627,18 +617,14 @@ func (o *OperatorRecords) Put(op *Operator, status pdpb.OperatorStatus) { func (oc *OperatorController) exceedStoreLimit(ops ...*Operator) bool { opInfluence := NewTotalOpInfluence(ops, oc.cluster) for storeID := range opInfluence.storesInfluence { - if oc.storesLimit[storeID] == nil { - rate := oc.cluster.GetStoreBalanceRate() - oc.newStoreLimit(storeID, rate) - } stepCost := opInfluence.GetStoreInfluence(storeID).StepCost if stepCost == 0 { continue } - available := oc.storesLimit[storeID].Available() + + available := oc.getOrCreateStoreLimit(storeID).Available() storeLimit.WithLabelValues(strconv.FormatUint(storeID, 10), "available").Set(float64(available) / float64(RegionInfluence)) if available < stepCost { - oc.cluster.SetStoreOverload(storeID) return true } } @@ -671,6 +657,20 @@ func (oc *OperatorController) newStoreLimit(storeID uint64, rate float64) { oc.storesLimit[storeID] = ratelimit.NewBucketWithRate(rate, capacity) } +// getOrCreateStoreLimit is used to get or create the limit of a store. +func (oc *OperatorController) getOrCreateStoreLimit(storeID uint64) *ratelimit.Bucket { + if oc.storesLimit[storeID] == nil { + rate := oc.cluster.GetStoreBalanceRate() + oc.newStoreLimit(storeID, rate) + oc.cluster.AttachOverloadStatus(storeID, func() bool { + oc.RLock() + defer oc.RUnlock() + return oc.storesLimit[storeID].Available() < RegionInfluence + }) + } + return oc.storesLimit[storeID] +} + // GetAllStoresLimit is used to get limit of all stores. func (oc *OperatorController) GetAllStoresLimit() map[uint64]float64 { oc.RLock() diff --git a/server/schedule/scheduler.go b/server/schedule/scheduler.go index 6db141683f1..e4da6e90871 100644 --- a/server/schedule/scheduler.go +++ b/server/schedule/scheduler.go @@ -42,8 +42,7 @@ type Cluster interface { BlockStore(id uint64) error UnblockStore(id uint64) - SetStoreOverload(id uint64) - ResetStoreOverload(id uint64) + AttachOverloadStatus(id uint64, f func() bool) IsRegionHot(id uint64) bool RegionWriteStats() []*core.RegionStat