Skip to content

Commit

Permalink
Fixed false counters resets for aggregated metrics (#3506)
Browse files Browse the repository at this point in the history
* Fixed false counters resets for aggregated metrics

Signed-off-by: Marco Pracucci <[email protected]>

* Fixed linter

Signed-off-by: Marco Pracucci <[email protected]>

* Addressed review comments

Signed-off-by: Marco Pracucci <[email protected]>

* Cleanup

Signed-off-by: Marco Pracucci <[email protected]>
  • Loading branch information
pracucci authored Nov 18, 2020
1 parent a04b547 commit 666719c
Show file tree
Hide file tree
Showing 11 changed files with 290 additions and 235 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
* [BUGFIX] Blocks storage ingester: fixed some cases leading to a TSDB WAL corruption after a partial write to disk. #3423
* [BUGFIX] Blocks storage: Fix the race between ingestion and `/flush` call resulting in overlapping blocks. #3422
* [BUGFIX] Querier: fixed `-querier.max-query-into-future` which wasn't correctly enforced on range queries. #3452
* [BUGFIX] Fixed float64 precision stability when aggregating metrics before exposing them. This could have led to false counter resets when querying some metrics exposed by Cortex. #3506

## Blocksconvert

Expand Down
27 changes: 4 additions & 23 deletions pkg/alertmanager/alertmanager_metrics.go
Original file line number Diff line number Diff line change
@@ -1,8 +1,6 @@
package alertmanager

import (
"sync"

"github.com/prometheus/client_golang/prometheus"

"github.com/cortexproject/cortex/pkg/util"
Expand All @@ -11,9 +9,7 @@ import (
// This struct aggregates metrics exported by Alertmanager
// and re-exports those aggregates as Cortex metrics.
type alertmanagerMetrics struct {
// Maps userID -> registry
regsMu sync.Mutex
regs map[string]*prometheus.Registry
regs *util.UserRegistries

// exported metrics, gathered from Alertmanager API
alertsReceived *prometheus.Desc
Expand Down Expand Up @@ -52,8 +48,7 @@ type alertmanagerMetrics struct {

func newAlertmanagerMetrics() *alertmanagerMetrics {
return &alertmanagerMetrics{
regs: map[string]*prometheus.Registry{},
regsMu: sync.Mutex{},
regs: util.NewUserRegistries(),
alertsReceived: prometheus.NewDesc(
"cortex_alertmanager_alerts_received_total",
"The total number of received alerts.",
Expand Down Expand Up @@ -146,21 +141,7 @@ func newAlertmanagerMetrics() *alertmanagerMetrics {
}

func (m *alertmanagerMetrics) addUserRegistry(user string, reg *prometheus.Registry) {
m.regsMu.Lock()
m.regs[user] = reg
m.regsMu.Unlock()
}

func (m *alertmanagerMetrics) registries() map[string]*prometheus.Registry {
regs := map[string]*prometheus.Registry{}

m.regsMu.Lock()
defer m.regsMu.Unlock()
for uid, r := range m.regs {
regs[uid] = r
}

return regs
m.regs.AddUserRegistry(user, reg)
}

func (m *alertmanagerMetrics) Describe(out chan<- *prometheus.Desc) {
Expand Down Expand Up @@ -189,7 +170,7 @@ func (m *alertmanagerMetrics) Describe(out chan<- *prometheus.Desc) {
}

func (m *alertmanagerMetrics) Collect(out chan<- prometheus.Metric) {
data := util.BuildMetricFamiliesPerUserFromUserRegistries(m.registries())
data := m.regs.BuildMetricFamiliesPerUser()

data.SendSumOfCountersPerUser(out, m.alertsReceived, "alertmanager_alerts_received_total")
data.SendSumOfCountersPerUser(out, m.alertsInvalid, "alertmanager_alerts_invalid_total")
Expand Down
25 changes: 4 additions & 21 deletions pkg/ingester/metrics.go
Original file line number Diff line number Diff line change
@@ -1,8 +1,6 @@
package ingester

import (
"sync"

"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"

Expand Down Expand Up @@ -256,13 +254,12 @@ type tsdbMetrics struct {
memSeriesCreatedTotal *prometheus.Desc
memSeriesRemovedTotal *prometheus.Desc

regsMu sync.RWMutex // custom mutex for shipper registry, to avoid blocking main user state mutex on collection
regs map[string]*prometheus.Registry // One prometheus registry per tenant
regs *util.UserRegistries
}

func newTSDBMetrics(r prometheus.Registerer) *tsdbMetrics {
m := &tsdbMetrics{
regs: make(map[string]*prometheus.Registry),
regs: util.NewUserRegistries(),

dirSyncs: prometheus.NewDesc(
"cortex_ingester_shipper_dir_syncs_total",
Expand Down Expand Up @@ -419,7 +416,7 @@ func (sm *tsdbMetrics) Describe(out chan<- *prometheus.Desc) {
}

func (sm *tsdbMetrics) Collect(out chan<- prometheus.Metric) {
data := util.BuildMetricFamiliesPerUserFromUserRegistries(sm.registries())
data := sm.regs.BuildMetricFamiliesPerUser()

// OK, we have it all. Let's build results.
data.SendSumOfCounters(out, sm.dirSyncs, "thanos_shipper_dir_syncs_total")
Expand Down Expand Up @@ -455,20 +452,6 @@ func (sm *tsdbMetrics) Collect(out chan<- prometheus.Metric) {
data.SendSumOfCountersPerUser(out, sm.memSeriesRemovedTotal, "prometheus_tsdb_head_series_removed_total")
}

// make a copy of the map, so that metrics can be gathered while the new registry is being added.
func (sm *tsdbMetrics) registries() map[string]*prometheus.Registry {
sm.regsMu.RLock()
defer sm.regsMu.RUnlock()

regs := make(map[string]*prometheus.Registry, len(sm.regs))
for u, r := range sm.regs {
regs[u] = r
}
return regs
}

func (sm *tsdbMetrics) setRegistryForUser(userID string, registry *prometheus.Registry) {
sm.regsMu.Lock()
sm.regs[userID] = registry
sm.regsMu.Unlock()
sm.regs.AddUserRegistry(userID, registry)
}
2 changes: 1 addition & 1 deletion pkg/ruler/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,7 @@ func (r *DefaultMultiTenantManager) SyncRuleGroups(ctx context.Context, ruleGrou
r.lastReloadSuccessful.DeleteLabelValues(userID)
r.lastReloadSuccessfulTimestamp.DeleteLabelValues(userID)
r.configUpdatesTotal.DeleteLabelValues(userID)
r.userManagerMetrics.DeleteUserRegistry(userID)
r.userManagerMetrics.RemoveUserRegistry(userID)
level.Info(r.logger).Log("msg", "deleting rule manager", "user", userID)
}
}
Expand Down
42 changes: 9 additions & 33 deletions pkg/ruler/manager_metrics.go
Original file line number Diff line number Diff line change
@@ -1,8 +1,6 @@
package ruler

import (
"sync"

"github.com/prometheus/client_golang/prometheus"

"github.com/cortexproject/cortex/pkg/util"
Expand All @@ -11,9 +9,7 @@ import (
// ManagerMetrics aggregates metrics exported by the Prometheus
// rules package and returns them as Cortex metrics
type ManagerMetrics struct {
// Maps userID -> registry
regsMu sync.Mutex
regs map[string]*prometheus.Registry
regs *util.UserRegistries

EvalDuration *prometheus.Desc
IterationDuration *prometheus.Desc
Expand All @@ -30,8 +26,7 @@ type ManagerMetrics struct {
// NewManagerMetrics returns a ManagerMetrics struct
func NewManagerMetrics() *ManagerMetrics {
return &ManagerMetrics{
regs: map[string]*prometheus.Registry{},
regsMu: sync.Mutex{},
regs: util.NewUserRegistries(),

EvalDuration: prometheus.NewDesc(
"cortex_prometheus_rule_evaluation_duration_seconds",
Expand Down Expand Up @@ -96,33 +91,14 @@ func NewManagerMetrics() *ManagerMetrics {
}
}

// AddUserRegistry adds a Prometheus registry to the struct
// AddUserRegistry adds a user-specific Prometheus registry.
func (m *ManagerMetrics) AddUserRegistry(user string, reg *prometheus.Registry) {
m.regsMu.Lock()
defer m.regsMu.Unlock()

m.regs[user] = reg
}

// DeleteUserRegistry removes user-specific Prometheus registry.
func (m *ManagerMetrics) DeleteUserRegistry(user string) {
m.regsMu.Lock()
defer m.regsMu.Unlock()

delete(m.regs, user)
m.regs.AddUserRegistry(user, reg)
}

// Registries returns a map of prometheus registries managed by the struct
func (m *ManagerMetrics) Registries() map[string]*prometheus.Registry {
regs := map[string]*prometheus.Registry{}

m.regsMu.Lock()
defer m.regsMu.Unlock()
for uid, r := range m.regs {
regs[uid] = r
}

return regs
// RemoveUserRegistry removes a user-specific Prometheus registry.
func (m *ManagerMetrics) RemoveUserRegistry(user string) {
m.regs.RemoveUserRegistry(user)
}

// Describe implements the Collector interface
Expand All @@ -141,10 +117,10 @@ func (m *ManagerMetrics) Describe(out chan<- *prometheus.Desc) {

// Collect implements the Collector interface
func (m *ManagerMetrics) Collect(out chan<- prometheus.Metric) {
data := util.BuildMetricFamiliesPerUserFromUserRegistries(m.Registries())
data := m.regs.BuildMetricFamiliesPerUser()

// WARNING: It is important that all metrics generated in this method are "Per User".
// Thanks to that we can actually *remove* metrics for given user (see DeleteUserRegistry).
// Thanks to that we can actually *remove* metrics for a given user (see RemoveUserRegistry).
// If the same user is later re-added, all metrics will start from 0, which is fine.

data.SendSumOfSummariesPerUser(out, m.EvalDuration, "prometheus_rule_evaluation_duration_seconds")
Expand Down
2 changes: 1 addition & 1 deletion pkg/ruler/manager_metrics_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ func TestManagerMetrics(t *testing.T) {
managerMetrics.AddUserRegistry("user3", populateManager(100))

managerMetrics.AddUserRegistry("user4", populateManager(1000))
managerMetrics.DeleteUserRegistry("user4")
managerMetrics.RemoveUserRegistry("user4")

//noinspection ALL
err := testutil.GatherAndCompare(mainReg, bytes.NewBufferString(`
Expand Down
26 changes: 4 additions & 22 deletions pkg/storegateway/bucket_store_metrics.go
Original file line number Diff line number Diff line change
@@ -1,8 +1,6 @@
package storegateway

import (
"sync"

"github.com/prometheus/client_golang/prometheus"

"github.com/cortexproject/cortex/pkg/util"
Expand All @@ -11,9 +9,7 @@ import (
// BucketStoreMetrics aggregates metrics exported by Thanos Bucket Store
// and re-exports those aggregates as Cortex metrics.
type BucketStoreMetrics struct {
// Maps userID -> registry
regsMu sync.Mutex
regs map[string]*prometheus.Registry
regs *util.UserRegistries

// exported metrics, gathered from Thanos BucketStore
blockLoads *prometheus.Desc
Expand Down Expand Up @@ -50,7 +46,7 @@ type BucketStoreMetrics struct {

func NewBucketStoreMetrics() *BucketStoreMetrics {
return &BucketStoreMetrics{
regs: map[string]*prometheus.Registry{},
regs: util.NewUserRegistries(),

blockLoads: prometheus.NewDesc(
"cortex_bucket_store_block_loads_total",
Expand Down Expand Up @@ -168,21 +164,7 @@ func NewBucketStoreMetrics() *BucketStoreMetrics {
}

func (m *BucketStoreMetrics) AddUserRegistry(user string, reg *prometheus.Registry) {
m.regsMu.Lock()
m.regs[user] = reg
m.regsMu.Unlock()
}

func (m *BucketStoreMetrics) registries() map[string]*prometheus.Registry {
regs := map[string]*prometheus.Registry{}

m.regsMu.Lock()
defer m.regsMu.Unlock()
for uid, r := range m.regs {
regs[uid] = r
}

return regs
m.regs.AddUserRegistry(user, reg)
}

func (m *BucketStoreMetrics) Describe(out chan<- *prometheus.Desc) {
Expand Down Expand Up @@ -219,7 +201,7 @@ func (m *BucketStoreMetrics) Describe(out chan<- *prometheus.Desc) {
}

func (m *BucketStoreMetrics) Collect(out chan<- prometheus.Metric) {
data := util.BuildMetricFamiliesPerUserFromUserRegistries(m.registries())
data := m.regs.BuildMetricFamiliesPerUser()

data.SendSumOfCounters(out, m.blockLoads, "thanos_bucket_store_block_loads_total")
data.SendSumOfCounters(out, m.blockLoadFailures, "thanos_bucket_store_block_load_failures_total")
Expand Down
14 changes: 8 additions & 6 deletions pkg/storegateway/gateway_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -301,7 +301,7 @@ func TestStoreGateway_BlocksSharding(t *testing.T) {
// Start the configured number of gateways.
var gateways []*StoreGateway
var gatewayIds []string
registries := map[string]*prometheus.Registry{}
registries := util.NewUserRegistries()

for i := 1; i <= testData.numGateways; i++ {
instanceID := fmt.Sprintf("gateway-%d", i)
Expand Down Expand Up @@ -333,7 +333,7 @@ func TestStoreGateway_BlocksSharding(t *testing.T) {

gateways = append(gateways, g)
gatewayIds = append(gatewayIds, instanceID)
registries[instanceID] = reg
registries.AddUserRegistry(instanceID, reg)
}

// Wait until the ring client of each gateway has synced (to avoid flaky tests on subsequent assertions).
Expand All @@ -356,7 +356,7 @@ func TestStoreGateway_BlocksSharding(t *testing.T) {
}

// Assert on the number of blocks loaded extracting this information from metrics.
metrics := util.BuildMetricFamiliesPerUserFromUserRegistries(registries)
metrics := registries.BuildMetricFamiliesPerUser()
assert.Equal(t, float64(testData.expectedBlocksLoaded), metrics.GetSumOfGauges("cortex_bucket_store_blocks_loaded"))
assert.Equal(t, float64(2*testData.numGateways), metrics.GetSumOfGauges("cortex_bucket_stores_tenants_discovered"))

Expand Down Expand Up @@ -550,7 +550,9 @@ func TestStoreGateway_SyncOnRingTopologyChanged(t *testing.T) {
defer services.StopAndAwaitTerminated(ctx, g) //nolint:errcheck

// Assert on the initial state.
metrics := util.BuildMetricFamiliesPerUserFromUserRegistries(map[string]*prometheus.Registry{"test": reg})
regs := util.NewUserRegistries()
regs.AddUserRegistry("test", reg)
metrics := regs.BuildMetricFamiliesPerUser()
assert.Equal(t, float64(1), metrics.GetSumOfCounters("cortex_storegateway_bucket_sync_total"))

// Change the ring topology.
Expand All @@ -563,14 +565,14 @@ func TestStoreGateway_SyncOnRingTopologyChanged(t *testing.T) {
// Assert whether the sync triggered or not.
if testData.expectedSync {
test.Poll(t, time.Second, float64(2), func() interface{} {
metrics := util.BuildMetricFamiliesPerUserFromUserRegistries(map[string]*prometheus.Registry{"test": reg})
metrics := regs.BuildMetricFamiliesPerUser()
return metrics.GetSumOfCounters("cortex_storegateway_bucket_sync_total")
})
} else {
// Give some time to the store-gateway to trigger the sync (if any).
time.Sleep(250 * time.Millisecond)

metrics := util.BuildMetricFamiliesPerUserFromUserRegistries(map[string]*prometheus.Registry{"test": reg})
metrics := regs.BuildMetricFamiliesPerUser()
assert.Equal(t, float64(1), metrics.GetSumOfCounters("cortex_storegateway_bucket_sync_total"))
}
})
Expand Down
Loading

0 comments on commit 666719c

Please sign in to comment.