*: reset metrics after the leader steps down #1790

Merged · 6 commits · Oct 15, 2019
Changes from 3 commits
23 changes: 23 additions & 0 deletions server/cluster.go
@@ -235,6 +235,8 @@ func (c *RaftCluster) runBackgroundJobs(interval time.Duration) {
for {
select {
case <-c.quit:
log.Info("metrics are reset")
c.resetMetrics()
log.Info("background jobs has been stopped")
return
case <-ticker.C:
@@ -1014,6 +1016,15 @@ func (c *RaftCluster) collectMetrics() {
c.collectHealthStatus()
}

func (c *RaftCluster) resetMetrics() {
statsMap := statistics.NewStoreStatisticsMap(c.opt, c.GetNamespaceClassifier())
statsMap.Reset()

c.coordinator.resetSchedulerMetrics()
c.coordinator.resetHotSpotMetrics()
c.resetClusterMetrics()
}

func (c *RaftCluster) collectClusterMetrics() {
c.RLock()
defer c.RUnlock()
@@ -1026,6 +1037,18 @@ func (c *RaftCluster) collectClusterMetrics() {
c.hotSpotCache.CollectMetrics(c.storesStats)
}

func (c *RaftCluster) resetClusterMetrics() {
c.RLock()
defer c.RUnlock()
if c.regionStats == nil {
return
}
c.regionStats.Reset()
c.labelLevelStats.Reset()
// reset hot cache metrics
c.hotSpotCache.ResetMetrics()
}

func (c *RaftCluster) collectHealthStatus() {
client := c.s.GetClient()
members, err := GetMembers(client)
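For context on the cluster.go hunk: when the leader steps down, the background-jobs goroutine receives on c.quit and, before returning, resets every gauge family so the old leader stops exporting stale series. Below is a minimal, self-contained sketch of that select-on-quit pattern using prometheus/client_golang; the gauge name, labels, and values are placeholders, not PD's real metric definitions.

```go
package main

import (
	"time"

	"github.com/prometheus/client_golang/prometheus"
)

// storeStatusGauge stands in for one of PD's gauge families; the name and
// labels here are illustrative only.
var storeStatusGauge = prometheus.NewGaugeVec(
	prometheus.GaugeOpts{
		Namespace: "pd",
		Name:      "example_store_status",
		Help:      "Illustrative store status gauge.",
	},
	[]string{"namespace", "address", "store", "type"},
)

// runBackgroundJobs mirrors the pattern in the diff: collect on every tick,
// reset the gauge family once the quit channel fires (leader stepped down).
func runBackgroundJobs(quit <-chan struct{}, interval time.Duration) {
	ticker := time.NewTicker(interval)
	defer ticker.Stop()
	for {
		select {
		case <-quit:
			storeStatusGauge.Reset() // drop every labelled series at once
			return
		case <-ticker.C:
			storeStatusGauge.WithLabelValues("global", "127.0.0.1:20160", "1", "region_count").Set(42)
		}
	}
}

func main() {
	quit := make(chan struct{})
	go runBackgroundJobs(quit, 50*time.Millisecond)
	time.Sleep(200 * time.Millisecond)
	close(quit) // simulate the leader stepping down
	time.Sleep(100 * time.Millisecond)
}
```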
7 changes: 7 additions & 0 deletions server/coordinator.go
@@ -311,6 +311,10 @@ func (c *coordinator) collectSchedulerMetrics() {
}
}

func (c *coordinator) resetSchedulerMetrics() {
schedulerStatusGauge.Reset()
}

func (c *coordinator) collectHotSpotMetrics() {
c.RLock()
defer c.RUnlock()
@@ -368,7 +372,10 @@ func (c *coordinator) collectHotSpotMetrics() {
hotSpotStatusGauge.WithLabelValues(storeAddress, storeLabel, "hot_read_region_as_leader").Set(0)
}
}
}

func (c *coordinator) resetHotSpotMetrics() {
hotSpotStatusGauge.Reset()
}

func (c *coordinator) shouldRun() bool {
5 changes: 5 additions & 0 deletions server/coordinator_test.go
@@ -35,6 +35,7 @@ import (
"github.com/pingcap/pd/server/schedule"
"github.com/pingcap/pd/server/schedule/operator"
"github.com/pingcap/pd/server/schedulers"
"github.com/pingcap/pd/server/statistics"
)

func newTestScheduleConfig() (*config.ScheduleConfig, *config.ScheduleOption, error) {
@@ -260,6 +261,7 @@ func (s *testCoordinatorSuite) TestCollectMetrics(c *C) {
defer cleanup()
defer hbStreams.Close()

tc.regionStats = statistics.NewRegionStatistics(tc.s.scheduleOpt, tc.s.classifier)
co := newCoordinator(tc.RaftCluster, hbStreams, namespace.DefaultClassifier)
co.run()
// Make sure there are no problem when concurrent write and read
@@ -275,6 +277,9 @@
co.collectSchedulerMetrics()
co.cluster.collectClusterMetrics()
}
co.resetHotSpotMetrics()
co.resetSchedulerMetrics()
co.cluster.resetClusterMetrics()
}

func (s *testCoordinatorSuite) TestCheckRegion(c *C) {
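The test change keeps the existing TestCollectMetrics shape: exercise the collect path repeatedly, then call the new reset functions and expect no panic and no leftover series. A rough, stand-alone approximation with a plain GaugeVec is shown below; the package, metric, and test names are invented for illustration and are not the PR's actual test code.

```go
package statistics_test // illustrative package name, not the PR's test file

import (
	"sync"
	"testing"

	"github.com/prometheus/client_golang/prometheus"
)

// TestResetAfterCollect sketches the idea behind the added calls:
// collect from several goroutines, reset once, and verify the gauge family
// exports nothing afterwards.
func TestResetAfterCollect(t *testing.T) {
	g := prometheus.NewGaugeVec(
		prometheus.GaugeOpts{Name: "example_status", Help: "illustrative"},
		[]string{"kind"},
	)

	var wg sync.WaitGroup
	for i := 0; i < 10; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for j := 0; j < 1000; j++ {
				g.WithLabelValues("leader_count").Inc() // concurrent "collect"
			}
		}()
	}
	wg.Wait()

	g.Reset() // the step the PR adds after the collect loop

	// After Reset the family should export no series at all.
	ch := make(chan prometheus.Metric, 16)
	g.Collect(ch)
	close(ch)
	if n := len(ch); n != 0 {
		t.Fatalf("expected 0 series after Reset, got %d", n)
	}
}
```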
7 changes: 6 additions & 1 deletion server/statistics/hot_cache.go
@@ -106,12 +106,17 @@ func (w *HotCache) IsRegionHot(region *core.RegionInfo, hotDegree int) bool {
w.readFlow.IsRegionHot(region, hotDegree)
}

// CollectMetrics collect the hot cache metrics
// CollectMetrics collects the hot cache metrics.
func (w *HotCache) CollectMetrics(stats *StoresStats) {
w.writeFlow.CollectMetrics(stats, "write")
w.readFlow.CollectMetrics(stats, "read")
}

// ResetMetrics resets the hot cache metrics.
func (w *HotCache) ResetMetrics() {
hotCacheStatusGauge.Reset()
}

func (w *HotCache) incMetrics(name string, storeID uint64, kind FlowKind) {
store := storeTag(storeID)
switch kind {
10 changes: 10 additions & 0 deletions server/statistics/region_collection.go
@@ -159,6 +159,11 @@ func (r *RegionStatistics) Collect() {
regionStatusGauge.WithLabelValues("empty-region-count").Set(float64(len(r.stats[EmptyRegion])))
}

// Reset resets the metrics of the regions' status.
func (r *RegionStatistics) Reset() {
regionStatusGauge.Reset()
}

// LabelStatistics is the statistics of the level of labels.
type LabelStatistics struct {
regionLabelStats map[uint64]string
@@ -194,6 +199,11 @@ }
}
}

// Reset resets the metrics of the label status.
func (l *LabelStatistics) Reset() {
regionLabelLevelGauge.Reset()
}

// ClearDefunctRegion is used to handle the overlap region.
func (l *LabelStatistics) ClearDefunctRegion(regionID uint64, labels []string) {
if label, ok := l.regionLabelStats[regionID]; ok {
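One detail worth noting in region_collection.go: Reset only clears the exported gauge families (regionStatusGauge, regionLabelLevelGauge); in-memory state such as r.stats and l.regionLabelStats is left untouched, so only the Prometheus series disappear. A small sketch of that split, with placeholder names:

```go
package main

import "github.com/prometheus/client_golang/prometheus"

// regionGauge stands in for regionStatusGauge; the name and label are illustrative.
var regionGauge = prometheus.NewGaugeVec(
	prometheus.GaugeOpts{Name: "example_region_status", Help: "illustrative"},
	[]string{"type"},
)

// regionStats mirrors the split in the diff: internal counters plus an
// exported gauge family.
type regionStats struct {
	counts map[string]int
}

// Collect pushes the internal counters into the gauge family.
func (r *regionStats) Collect() {
	for typ, n := range r.counts {
		regionGauge.WithLabelValues(typ).Set(float64(n))
	}
}

// Reset drops only the exported series; r.counts is deliberately untouched,
// matching RegionStatistics.Reset in the PR.
func (r *regionStats) Reset() {
	regionGauge.Reset()
}

func main() {
	r := &regionStats{counts: map[string]int{"miss-peer-region-count": 3}}
	r.Collect()
	r.Reset() // series gone from /metrics; the counts map is still intact
}
```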
27 changes: 18 additions & 9 deletions server/statistics/store_collection.go
@@ -177,15 +177,19 @@ func (s *storeStatistics) Collect() {
}

func (s *storeStatistics) resetStoreStatistics(storeAddress string, id string) {
storeStatusGauge.WithLabelValues(s.namespace, storeAddress, id, "region_score").Set(0)
storeStatusGauge.WithLabelValues(s.namespace, storeAddress, id, "leader_score").Set(0)
storeStatusGauge.WithLabelValues(s.namespace, storeAddress, id, "region_size").Set(0)
storeStatusGauge.WithLabelValues(s.namespace, storeAddress, id, "region_count").Set(0)
storeStatusGauge.WithLabelValues(s.namespace, storeAddress, id, "leader_size").Set(0)
storeStatusGauge.WithLabelValues(s.namespace, storeAddress, id, "leader_count").Set(0)
storeStatusGauge.WithLabelValues(s.namespace, storeAddress, id, "store_available").Set(0)
storeStatusGauge.WithLabelValues(s.namespace, storeAddress, id, "store_used").Set(0)
storeStatusGauge.WithLabelValues(s.namespace, storeAddress, id, "store_capacity").Set(0)
storeStatusGauge.DeleteLabelValues(s.namespace, storeAddress, id, "region_score")
storeStatusGauge.DeleteLabelValues(s.namespace, storeAddress, id, "leader_score")
storeStatusGauge.DeleteLabelValues(s.namespace, storeAddress, id, "region_size")
storeStatusGauge.DeleteLabelValues(s.namespace, storeAddress, id, "region_count")
storeStatusGauge.DeleteLabelValues(s.namespace, storeAddress, id, "leader_size")
storeStatusGauge.DeleteLabelValues(s.namespace, storeAddress, id, "leader_count")
storeStatusGauge.DeleteLabelValues(s.namespace, storeAddress, id, "store_available")
storeStatusGauge.DeleteLabelValues(s.namespace, storeAddress, id, "store_used")
storeStatusGauge.DeleteLabelValues(s.namespace, storeAddress, id, "store_capacity")
storeStatusGauge.DeleteLabelValues(s.namespace, storeAddress, id, "store_write_rate_bytes")
storeStatusGauge.DeleteLabelValues(s.namespace, storeAddress, id, "store_read_rate_bytes")
storeStatusGauge.DeleteLabelValues(s.namespace, storeAddress, id, "store_write_rate_keys")
storeStatusGauge.DeleteLabelValues(s.namespace, storeAddress, id, "store_read_rate_keys")
}

type storeStatisticsMap struct {
@@ -218,3 +222,8 @@ func (m *storeStatisticsMap) Collect() {
s.Collect()
}
}

func (m *storeStatisticsMap) Reset() {
storeStatusGauge.Reset()
clusterStatusGauge.Reset()
}