Skip to content

Commit

Permalink
scheduler: filter unhealthy store in summaryStoresLoad (#2737)
Browse files Browse the repository at this point in the history
Signed-off-by: lhy1024 <[email protected]>
  • Loading branch information
lhy1024 authored Aug 20, 2020
1 parent 4b8ad99 commit 3a4cae5
Show file tree
Hide file tree
Showing 4 changed files with 51 additions and 15 deletions.
2 changes: 2 additions & 0 deletions server/cluster/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -509,6 +509,7 @@ func (c *RaftCluster) HandleStoreHeartbeat(stats *pdpb.StoreStats) error {
c.storesStats.Observe(newStore.GetID(), newStore.GetStoreStats())
c.storesStats.UpdateTotalBytesRate(c.core.GetStores)
c.storesStats.UpdateTotalKeysRate(c.core.GetStores)
c.storesStats.FilterUnhealthyStore(c)

// c.limiter is nil before "start" is called
if c.limiter != nil && c.opt.GetStoreLimitMode() == "auto" {
Expand Down Expand Up @@ -829,6 +830,7 @@ func (c *RaftCluster) GetRegionStats(startKey, endKey []byte) *statistics.Region
}

// GetStoresStats returns stores' statistics from cluster.
// There is no need to filter out unhealthy stores here: that is already done while processing store heartbeats.
func (c *RaftCluster) GetStoresStats() *statistics.StoresStats {
c.RLock()
defer c.RUnlock()
Expand Down
32 changes: 32 additions & 0 deletions server/cluster/cluster_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,38 @@ func (s *testClusterInfoSuite) TestStoreHeartbeat(c *C) {
}
}

// TestFilterUnhealthyStore verifies that rolling store statistics are present
// after a heartbeat for freshly added test stores, and are gone after a
// heartbeat once the same stores have been switched to the Tombstone state.
func (s *testClusterInfoSuite) TestFilterUnhealthyStore(c *C) {
	_, opt, err := newTestScheduleConfig()
	c.Assert(err, IsNil)
	cluster := newTestRaftCluster(mockid.NewIDAllocator(), opt, core.NewStorage(kv.NewMemoryKV()), core.NewBasicCluster())

	// heartbeatFor builds the heartbeat payload shared by both phases of the test.
	heartbeatFor := func(id uint64) *pdpb.StoreStats {
		return &pdpb.StoreStats{
			StoreId:     id,
			Capacity:    100,
			Available:   50,
			RegionCount: 1,
		}
	}

	stores := newTestStores(3)

	// Phase 1: each store is registered as-is, so its heartbeat must leave stats behind.
	for _, store := range stores {
		hb := heartbeatFor(store.GetID())
		c.Assert(cluster.putStoreLocked(store), IsNil)
		c.Assert(cluster.HandleStoreHeartbeat(hb), IsNil)
		c.Assert(cluster.storesStats.GetRollingStoreStats(store.GetID()), NotNil)
	}

	// Phase 2: each store is replaced by a Tombstone clone; the next heartbeat
	// must result in its stats being removed.
	for _, store := range stores {
		hb := heartbeatFor(store.GetID())
		tombstone := store.Clone(core.SetStoreState(metapb.StoreState_Tombstone))
		c.Assert(cluster.putStoreLocked(tombstone), IsNil)
		c.Assert(cluster.HandleStoreHeartbeat(hb), IsNil)
		c.Assert(cluster.storesStats.GetRollingStoreStats(store.GetID()), IsNil)
	}
}

func (s *testClusterInfoSuite) TestRegionHeartbeat(c *C) {
_, opt, err := newTestScheduleConfig()
c.Assert(err, IsNil)
Expand Down
16 changes: 1 addition & 15 deletions server/schedulers/hot_region.go
Original file line number Diff line number Diff line change
Expand Up @@ -536,9 +536,7 @@ func (bs *balanceSolver) init() {
case readLeader:
bs.stLoadDetail = bs.sche.stLoadInfos[readLeader]
}
for _, id := range getUnhealthyStores(bs.cluster) {
delete(bs.stLoadDetail, id)
}
// There is no need to filter out unhealthy stores here: that is already done while processing store heartbeats.

bs.maxSrc = &storeLoad{}
bs.minDst = &storeLoad{
Expand All @@ -561,18 +559,6 @@ func (bs *balanceSolver) init() {
}
}

// getUnhealthyStores returns the IDs of all stores in cluster that are either
// tombstones or have been down for longer than the cluster's maximum store
// down time. The result is never nil; it is empty when no store qualifies.
func getUnhealthyStores(cluster opt.Cluster) []uint64 {
	unhealthy := []uint64{}
	for _, st := range cluster.GetStores() {
		// Skip stores that are neither tombstones nor down past the limit.
		if !st.IsTombstone() && st.DownTime() <= cluster.GetMaxStoreDownTime() {
			continue
		}
		unhealthy = append(unhealthy, st.GetID())
	}
	return unhealthy
}

func newBalanceSolver(sche *hotScheduler, cluster opt.Cluster, rwTy rwType, opTy opType) *balanceSolver {
solver := &balanceSolver{
sche: sche,
Expand Down
16 changes: 16 additions & 0 deletions server/statistics/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -290,6 +290,22 @@ func (s *StoresStats) GetStoresKeysReadStat() map[uint64]float64 {
return res
}

// storeIsUnhealthy reports whether the statistics kept for storeID should be
// discarded: the store is a tombstone, reports itself as unhealthy, or can no
// longer be looked up in cluster.
func (s *StoresStats) storeIsUnhealthy(cluster core.StoreSetInformer, storeID uint64) bool {
	store := cluster.GetStore(storeID)
	// NOTE(review): GetStore is assumed to return nil for an ID the cluster does
	// not know — confirm. Without this guard the method calls below would run on
	// a missing store. A store that cannot be found is treated as unhealthy so
	// that its stale statistics get dropped rather than kept indefinitely.
	if store == nil {
		return true
	}
	return store.IsTombstone() || store.IsUnhealth()
}

// FilterUnhealthyStore removes the rolling statistics of every store that
// storeIsUnhealthy reports as unhealthy for the given cluster. The receiver's
// lock is held for the whole pass.
func (s *StoresStats) FilterUnhealthyStore(cluster core.StoreSetInformer) {
	s.Lock()
	defer s.Unlock()

	for id := range s.rollingStoresStats {
		if !s.storeIsUnhealthy(cluster, id) {
			continue
		}
		// Deleting the current key while ranging over a map is permitted in Go.
		delete(s.rollingStoresStats, id)
	}
}

// RollingStoreStats are multiple sets of recent historical records with specified windows size.
type RollingStoreStats struct {
sync.RWMutex
Expand Down

0 comments on commit 3a4cae5

Please sign in to comment.