diff --git a/CHANGELOG.md b/CHANGELOG.md index 3b7f9702c16..6c513635708 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -63,6 +63,8 @@ ### Mimirtool * [ENHANCEMENT] Added `mimirtool rules delete-namespace` command to delete all of the rule groups in a namespace including the namespace itself. #3136 +* [ENHANCEMENT] Refactor `mimirtool analyze prometheus`: add concurrency and resiliency #3062 + * Add `--concurrency` flag. Default: number of logical CPUs * [BUGFIX] `--log.level=debug` now correctly prints the response from the remote endpoint when a request fails. #3180 ### Documentation diff --git a/pkg/mimirtool/analyze/grafana.go b/pkg/mimirtool/analyze/grafana.go index 20a2c870179..88dec23f6f9 100644 --- a/pkg/mimirtool/analyze/grafana.go +++ b/pkg/mimirtool/analyze/grafana.go @@ -13,6 +13,7 @@ import ( "github.com/grafana/regexp" "github.com/pkg/errors" + "github.com/prometheus/common/model" "github.com/prometheus/prometheus/promql/parser" log "github.com/sirupsen/logrus" @@ -20,7 +21,7 @@ import ( ) type MetricsInGrafana struct { - MetricsUsed []string `json:"metricsUsed"` + MetricsUsed model.LabelValues `json:"metricsUsed"` OverallMetrics map[string]struct{} `json:"-"` Dashboards []DashboardMetrics `json:"dashboards"` } @@ -80,7 +81,6 @@ func ParseMetricsInBoard(mig *MetricsInGrafana, board minisdk.Board) { Metrics: metricsInBoard, ParseErrors: parseErrs, }) - } func metricsFromTemplating(templating minisdk.Templating, metrics map[string]struct{}) []error { diff --git a/pkg/mimirtool/analyze/prometheus.go b/pkg/mimirtool/analyze/prometheus.go index 80b617cc6ea..74bcb19b1ca 100644 --- a/pkg/mimirtool/analyze/prometheus.go +++ b/pkg/mimirtool/analyze/prometheus.go @@ -6,9 +6,9 @@ package analyze type MetricsInPrometheus struct { - TotalActiveSeries int `json:"total_active_series"` - InUseActiveSeries int `json:"in_use_active_series"` - AdditionalActiveSeries int `json:"additional_active_series"` + TotalActiveSeries uint64 `json:"total_active_series"` + InUseActiveSeries uint64 `json:"in_use_active_series"` + AdditionalActiveSeries uint64 `json:"additional_active_series"` InUseMetricCounts []MetricCount `json:"in_use_metric_counts"` AdditionalMetricCounts []MetricCount `json:"additional_metric_counts"` diff --git a/pkg/mimirtool/analyze/ruler.go b/pkg/mimirtool/analyze/ruler.go index 71c62cab180..d8a2ae8c29f 100644 --- a/pkg/mimirtool/analyze/ruler.go +++ b/pkg/mimirtool/analyze/ruler.go @@ -9,6 +9,7 @@ import ( "sort" "github.com/pkg/errors" + "github.com/prometheus/common/model" "github.com/prometheus/prometheus/promql/parser" log "github.com/sirupsen/logrus" @@ -16,7 +17,7 @@ import ( ) type MetricsInRuler struct { - MetricsUsed []string `json:"metricsUsed"` + MetricsUsed model.LabelValues `json:"metricsUsed"` OverallMetrics map[string]struct{} `json:"-"` RuleGroups []RuleGroupMetrics `json:"ruleGroups"` } diff --git a/pkg/mimirtool/commands/analyse.go b/pkg/mimirtool/commands/analyse.go index 9aff1e56f46..35975b41b80 100644 --- a/pkg/mimirtool/commands/analyse.go +++ b/pkg/mimirtool/commands/analyse.go @@ -6,11 +6,13 @@ package commands import ( + "runtime" + "strconv" + "gopkg.in/alecthomas/kingpin.v2" ) -type AnalyzeCommand struct { -} +type AnalyzeCommand struct{} func (cmd *AnalyzeCommand) Register(app *kingpin.Application, envVars EnvVarNames) { analyzeCmd := app.Command("analyze", "Run analysis against your Prometheus, Grafana, and Grafana Mimir to see which metrics are being used and exported.") @@ -38,6 +40,9 @@ func (cmd *AnalyzeCommand) Register(app *kingpin.Application, envVars EnvVarName prometheusAnalyzeCmd.Flag("ruler-metrics-file", "The path for the input file containing the metrics from ruler-analyze command"). Default("metrics-in-ruler.json"). StringVar(&paCmd.rulerMetricsFile) + prometheusAnalyzeCmd.Flag("concurrency", "Concurrency (Default: runtime.NumCPU())"). + Default(strconv.Itoa(runtime.NumCPU())). + IntVar(&paCmd.concurrency) prometheusAnalyzeCmd.Flag("output", "The path for the output file"). Default("prometheus-metrics.json"). StringVar(&paCmd.outputFile) diff --git a/pkg/mimirtool/commands/analyse_grafana.go b/pkg/mimirtool/commands/analyse_grafana.go index 301f5190ff8..7917ce02abf 100644 --- a/pkg/mimirtool/commands/analyse_grafana.go +++ b/pkg/mimirtool/commands/analyse_grafana.go @@ -13,6 +13,8 @@ import ( "sort" "time" + "github.com/prometheus/common/model" + "github.com/grafana-tools/sdk" "gopkg.in/alecthomas/kingpin.v2" @@ -77,11 +79,11 @@ func unmarshalDashboard(data []byte, link sdk.FoundBoard) (minisdk.Board, error) } func writeOut(mig *analyze.MetricsInGrafana, outputFile string) error { - var metricsUsed []string + var metricsUsed model.LabelValues for metric := range mig.OverallMetrics { - metricsUsed = append(metricsUsed, metric) + metricsUsed = append(metricsUsed, model.LabelValue(metric)) } - sort.Strings(metricsUsed) + sort.Sort(metricsUsed) mig.MetricsUsed = metricsUsed out, err := json.MarshalIndent(mig, "", " ") @@ -89,7 +91,7 @@ func writeOut(mig *analyze.MetricsInGrafana, outputFile string) error { return err } - if err := os.WriteFile(outputFile, out, os.FileMode(int(0666))); err != nil { + if err := os.WriteFile(outputFile, out, os.FileMode(int(0o666))); err != nil { return err } diff --git a/pkg/mimirtool/commands/analyse_prometheus.go b/pkg/mimirtool/commands/analyse_prometheus.go index b217b91473b..12dd9f27234 100644 --- a/pkg/mimirtool/commands/analyse_prometheus.go +++ b/pkg/mimirtool/commands/analyse_prometheus.go @@ -11,8 +11,11 @@ import ( "fmt" "os" "sort" + "sync" "time" + "github.com/grafana/dskit/backoff" + "github.com/grafana/dskit/concurrency" "github.com/pkg/errors" "github.com/prometheus/client_golang/api" v1 "github.com/prometheus/client_golang/api/prometheus/v1" @@ -34,173 +37,211 @@ type PrometheusAnalyzeCommand struct { grafanaMetricsFile string rulerMetricsFile string outputFile string + concurrency int } func (cmd *PrometheusAnalyzeCommand) run(k *kingpin.ParseContext) error { + metricsUsed, err := cmd.parseUsedMetrics() + if err != nil { + return err + } + + v1api, err := cmd.newAPI() + if err != nil { + return err + } + + metricNames, err := cmd.queryMetricNames(v1api) + if err != nil { + return err + } + + log.Debugln("Starting to analyze metrics in use") + inUseMetricsAnalysisResult, err := cmd.analyze(v1api, metricsUsed, func(model.LabelValue) bool { return false }) + if err != nil { + return err + } + log.Infof("%d active series are being used in dashboards", inUseMetricsAnalysisResult.cardinality) + + log.Debugln("Starting to analyze metrics not in use") + metricsNotInUseResult, err := cmd.analyze(v1api, metricNames, func(metric model.LabelValue) bool { + return inUseMetricsAnalysisResult.stats[metric].totalCount > 0 + }) + if err != nil { + return err + } + log.Infof("%d active series are NOT being used in dashboards", metricsNotInUseResult.cardinality) + + metricsInPrometheus := cmd.result(inUseMetricsAnalysisResult, metricsNotInUseResult) + log.Infof("%d in use active series metric count", len(metricsInPrometheus.InUseMetricCounts)) + log.Infof("%d not in use active series metric count", len(metricsInPrometheus.AdditionalMetricCounts)) + + return cmd.write(metricsInPrometheus) +} + +func (cmd *PrometheusAnalyzeCommand) parseUsedMetrics() (model.LabelValues, error) { var ( - hasGrafanaMetrics, hasRulerMetrics = false, false - grafanaMetrics = analyze.MetricsInGrafana{} - rulerMetrics = analyze.MetricsInRuler{} - metricsUsed []string + metricsUsed model.LabelValues + grafanaMetrics = &analyze.MetricsInGrafana{} + rulerMetrics = &analyze.MetricsInRuler{} ) - if _, err := os.Stat(cmd.grafanaMetricsFile); err == nil { - hasGrafanaMetrics = true - byt, err := os.ReadFile(cmd.grafanaMetricsFile) - if err != nil { - return err - } - if err := json.Unmarshal(byt, &grafanaMetrics); err != nil { - return err - } - metricsUsed = append(metricsUsed, grafanaMetrics.MetricsUsed...) + if err := parseMetricFileIfExist(cmd.grafanaMetricsFile, grafanaMetrics); err != nil { + return nil, err } + metricsUsed = append(metricsUsed, grafanaMetrics.MetricsUsed...) - if _, err := os.Stat(cmd.rulerMetricsFile); err == nil { - hasRulerMetrics = true - byt, err := os.ReadFile(cmd.rulerMetricsFile) - if err != nil { - return err - } - if err := json.Unmarshal(byt, &rulerMetrics); err != nil { - return err - } - metricsUsed = append(metricsUsed, rulerMetrics.MetricsUsed...) + if err := parseMetricFileIfExist(cmd.rulerMetricsFile, rulerMetrics); err != nil { + return nil, err } + metricsUsed = append(metricsUsed, rulerMetrics.MetricsUsed...) - if !hasGrafanaMetrics && !hasRulerMetrics { - return errors.New("No Grafana or Ruler metrics files") + if len(metricsUsed) == 0 { + return nil, errors.New("no Grafana or Ruler metrics files") } + return metricsUsed, nil +} + +func (cmd *PrometheusAnalyzeCommand) newAPI() (v1.API, error) { rt := api.DefaultRoundTripper if cmd.username != "" { rt = config.NewBasicAuthRoundTripper(cmd.username, config.Secret(cmd.password), "", api.DefaultRoundTripper) } - promClient, err := api.NewClient(api.Config{ + + client, err := api.NewClient(api.Config{ Address: cmd.address, RoundTripper: rt, }) if err != nil { - return err + return nil, err } - v1api := v1.NewAPI(promClient) + return v1.NewAPI(client), nil +} +func (cmd *PrometheusAnalyzeCommand) queryMetricNames(api v1.API) (model.LabelValues, error) { ctx, cancel := context.WithTimeout(context.Background(), cmd.readTimeout) defer cancel() - metricNames, _, err := v1api.LabelValues(ctx, labels.MetricName, nil, time.Now().Add(-10*time.Minute), time.Now()) + + var metricNames model.LabelValues + err := withBackoff(ctx, func() error { + var err error + metricNames, _, err = api.LabelValues(ctx, labels.MetricName, nil, time.Now().Add(-10*time.Minute), time.Now()) + return err + }) if err != nil { - return errors.Wrap(err, "error querying for metric names") - } - log.Infof("Found %d metric names\n", len(metricNames)) - - inUseMetrics := map[string]struct { - totalCount int - jobCount map[string]int - }{} - inUseCardinality := 0 - var errorMetrics []string - for _, metric := range metricsUsed { - ctx, cancel := context.WithTimeout(context.Background(), cmd.readTimeout) + return nil, errors.Wrap(err, "error querying for metric names") + } + + log.Infof("Found %d metric names", len(metricNames)) + return metricNames, nil +} + +func (cmd *PrometheusAnalyzeCommand) analyze(api v1.API, metrics model.LabelValues, skip func(model.LabelValue) bool) (analyzeResult, error) { + var ( + stats = make(stats) + metricErrors []string + cardinality uint64 + mutex sync.Mutex + ) + + err := concurrency.ForEachJob(context.Background(), len(metrics), cmd.concurrency, func(ctx context.Context, idx int) error { + metric := metrics[idx] + if skip(metric) { + return nil + } + + ctx, cancel := context.WithTimeout(ctx, cmd.readTimeout) defer cancel() - query := "count by (job) (" + metric + ")" - result, _, err := v1api.Query(ctx, query, time.Now()) + var result model.Value + query := string("count by (job) (" + metric + ")") + err := withBackoff(ctx, func() error { + var err error + result, _, err = api.Query(ctx, query, time.Now()) + return err + }) if err != nil { errStr := fmt.Sprintf("skipped %s analysis because failed to run query %v: %s", metric, query, err.Error()) log.Warnln(errStr) - errorMetrics = append(errorMetrics, errStr) - continue + mutex.Lock() + metricErrors = append(metricErrors, errStr) + mutex.Unlock() + return nil } - vec := result.(model.Vector) - if len(vec) == 0 { - counts := inUseMetrics[metric] - counts.totalCount += 0 - inUseMetrics[metric] = counts - log.Debugln("in use", metric, 0) - - continue + vec, ok := result.(model.Vector) + if !ok || len(vec) == 0 { + log.Debugln("no active metrics found for", metric, 0) + return nil } for _, sample := range vec { - counts := inUseMetrics[metric] + mutex.Lock() + counts := stats[metric] if counts.jobCount == nil { counts.jobCount = make(map[string]int) } counts.totalCount += int(sample.Value) counts.jobCount[string(sample.Metric["job"])] += int(sample.Value) - inUseMetrics[metric] = counts - - inUseCardinality += int(sample.Value) + stats[metric] = counts + cardinality += uint64(sample.Value) + mutex.Unlock() } - log.Debugln("in use", metric, vec[0].Value) + log.Debugln("metric", metric, vec[0].Value) + return nil + }) + if err != nil { + return analyzeResult{}, err } - log.Infof("%d active series are being used in dashboards", inUseCardinality) - - // Count the not-in-use active series. - additionalMetrics := map[string]struct { - totalCount int - jobCount map[string]int - }{} - additionalMetricsCardinality := 0 - for _, metricName := range metricNames { - metric := string(metricName) - if _, ok := inUseMetrics[metric]; ok { - continue - } - - ctx, cancel := context.WithTimeout(context.Background(), cmd.readTimeout) - defer cancel() - - query := "count by (job) (" + metric + ")" - result, _, err := v1api.Query(ctx, query, time.Now()) - if err != nil { - errStr := fmt.Sprintf("skipped %s analysis because failed to run query %v: %s", metric, query, err.Error()) - log.Warnln(errStr) - errorMetrics = append(errorMetrics, errStr) - continue - } - - vec := result.(model.Vector) - if len(vec) == 0 { - counts := additionalMetrics[metric] - counts.totalCount += 0 - additionalMetrics[metric] = counts - - log.Debugln("additional", metric, 0) + return analyzeResult{ + stats: stats, + cardinality: cardinality, + metricErrors: metricErrors, + }, nil +} - continue - } +func (cmd *PrometheusAnalyzeCommand) result(inUse, notInUse analyzeResult) analyze.MetricsInPrometheus { + return analyze.MetricsInPrometheus{ + InUseActiveSeries: inUse.cardinality, + AdditionalActiveSeries: notInUse.cardinality, + InUseMetricCounts: inUse.metricsCounts(), + AdditionalMetricCounts: notInUse.metricsCounts(), + TotalActiveSeries: inUse.cardinality + notInUse.cardinality, + Errors: append(inUse.metricErrors, notInUse.metricErrors...), + } +} - for _, sample := range vec { - counts := additionalMetrics[metric] - if counts.jobCount == nil { - counts.jobCount = make(map[string]int) - } +func (cmd *PrometheusAnalyzeCommand) write(output analyze.MetricsInPrometheus) error { + buf, err := json.MarshalIndent(output, "", " ") + if err != nil { + return err + } - counts.totalCount += int(sample.Value) - counts.jobCount[string(sample.Metric["job"])] += int(sample.Value) - additionalMetrics[metric] = counts + return os.WriteFile(cmd.outputFile, buf, os.FileMode(0o666)) +} - additionalMetricsCardinality += int(sample.Value) - } +type stat struct { + totalCount int + jobCount map[string]int +} - log.Debugln("additional", metric, vec[0].Value) - } +type stats map[model.LabelValue]stat - log.Infof("%d active series are NOT being used in dashboards", additionalMetricsCardinality) +type analyzeResult struct { + stats stats + cardinality uint64 + metricErrors []string +} - output := analyze.MetricsInPrometheus{} - output.TotalActiveSeries = inUseCardinality + additionalMetricsCardinality - output.InUseActiveSeries = inUseCardinality - output.AdditionalActiveSeries = additionalMetricsCardinality - output.Errors = errorMetrics +func (a analyzeResult) metricsCounts() []analyze.MetricCount { + var metricCount []analyze.MetricCount - for metric, counts := range inUseMetrics { + for metric, counts := range a.stats { jobCounts := make([]analyze.JobCount, 0, len(counts.jobCount)) for job, count := range counts.jobCount { jobCounts = append(jobCounts, analyze.JobCount{ @@ -212,37 +253,42 @@ func (cmd *PrometheusAnalyzeCommand) run(k *kingpin.ParseContext) error { return jobCounts[i].Count > jobCounts[j].Count }) - output.InUseMetricCounts = append(output.InUseMetricCounts, analyze.MetricCount{Metric: metric, Count: counts.totalCount, JobCounts: jobCounts}) + metricCount = append(metricCount, analyze.MetricCount{Metric: string(metric), Count: counts.totalCount, JobCounts: jobCounts}) } - sort.Slice(output.InUseMetricCounts, func(i, j int) bool { - return output.InUseMetricCounts[i].Count > output.InUseMetricCounts[j].Count + + sort.Slice(metricCount, func(i, j int) bool { + return metricCount[i].Count > metricCount[j].Count }) - for metric, counts := range additionalMetrics { - jobCounts := make([]analyze.JobCount, 0, len(counts.jobCount)) - for job, count := range counts.jobCount { - jobCounts = append(jobCounts, analyze.JobCount{ - Job: job, - Count: count, - }) - } - sort.Slice(jobCounts, func(i, j int) bool { - return jobCounts[i].Count > jobCounts[j].Count - }) - output.AdditionalMetricCounts = append(output.AdditionalMetricCounts, analyze.MetricCount{Metric: metric, Count: counts.totalCount, JobCounts: jobCounts}) + return metricCount +} + +func parseMetricFileIfExist(path string, out any) error { + if _, err := os.Stat(path); err != nil { + // if given file does not exist, it's OK to skip. + return nil } - sort.Slice(output.AdditionalMetricCounts, func(i, j int) bool { - return output.AdditionalMetricCounts[i].Count > output.AdditionalMetricCounts[j].Count - }) - out, err := json.MarshalIndent(output, "", " ") + buf, err := os.ReadFile(path) if err != nil { return err } - if err := os.WriteFile(cmd.outputFile, out, os.FileMode(int(0666))); err != nil { - return err + return json.Unmarshal(buf, out) +} + +func withBackoff(ctx context.Context, fn func() error) error { + backoff := backoff.New(ctx, backoff.Config{ + MaxRetries: 10, + }) + + for backoff.Ongoing() { + if err := fn(); err == nil { + return nil + } + + backoff.Wait() } - return nil + return backoff.Err() } diff --git a/pkg/mimirtool/commands/analyse_ruler.go b/pkg/mimirtool/commands/analyse_ruler.go index 6ebf50a6ad0..ed436c60c0f 100644 --- a/pkg/mimirtool/commands/analyse_ruler.go +++ b/pkg/mimirtool/commands/analyse_ruler.go @@ -11,6 +11,8 @@ import ( "os" "sort" + "github.com/prometheus/common/model" + log "github.com/sirupsen/logrus" "gopkg.in/alecthomas/kingpin.v2" @@ -57,11 +59,11 @@ func (cmd *RulerAnalyzeCommand) run(k *kingpin.ParseContext) error { } func writeOutRuleMetrics(mir *analyze.MetricsInRuler, outputFile string) error { - var metricsUsed []string + var metricsUsed model.LabelValues for metric := range mir.OverallMetrics { - metricsUsed = append(metricsUsed, metric) + metricsUsed = append(metricsUsed, model.LabelValue(metric)) } - sort.Strings(metricsUsed) + sort.Sort(metricsUsed) mir.MetricsUsed = metricsUsed out, err := json.MarshalIndent(mir, "", " ") @@ -69,7 +71,7 @@ func writeOutRuleMetrics(mir *analyze.MetricsInRuler, outputFile string) error { return err } - if err := os.WriteFile(outputFile, out, os.FileMode(int(0666))); err != nil { + if err := os.WriteFile(outputFile, out, os.FileMode(int(0o666))); err != nil { return err }