Skip to content

Commit

Permalink
Pass the indexer instance to NewPrometheusClient (kube-burner#566)
Browse files Browse the repository at this point in the history
* Pass the indexer instance to NewPrometheusClient

Signed-off-by: Raul Sevilla <[email protected]>

* Remove useless message

Signed-off-by: Raul Sevilla <[email protected]>

---------

Signed-off-by: Raul Sevilla <[email protected]>
  • Loading branch information
rsevilla87 authored Jan 11, 2024
1 parent 4c9c3f4 commit 8c41b85
Show file tree
Hide file tree
Showing 8 changed files with 29 additions and 33 deletions.
11 changes: 4 additions & 7 deletions cmd/kube-burner/kube-burner.go
Original file line number Diff line number Diff line change
Expand Up @@ -308,22 +308,19 @@ func indexCmd() *cobra.Command {
Username: username,
UserMetaData: userMetadata,
})
docsToIndex := make(map[string][]interface{})
for _, prometheusClients := range metricsScraper.PrometheusClients {
for _, prometheusClient := range metricsScraper.PrometheusClients {
prometheusJob := prometheus.Job{
Start: time.Unix(start, 0),
End: time.Unix(end, 0),
JobConfig: config.Job{
Name: jobName,
},
}
prometheusClients.JobList = append(prometheusClients.JobList, prometheusJob)
if err := prometheusClients.ScrapeJobsMetrics(docsToIndex); err != nil {
prometheusClient.JobList = append(prometheusClient.JobList, prometheusJob)
if err := prometheusClient.ScrapeJobsMetrics(); err != nil {
log.Fatal(err)
}
}
log.Infof("Indexing metrics with UUID %s", uuid)
metrics.IndexDatapoints(docsToIndex, metricsScraper.Indexer)
if configSpec.GlobalConfig.IndexerConfig.Type == indexers.LocalIndexer && tarballName != "" {
if err := metrics.CreateTarball(configSpec.GlobalConfig.IndexerConfig, tarballName); err != nil {
log.Fatal(err)
Expand Down Expand Up @@ -434,7 +431,7 @@ func alertCmd() *cobra.Command {
Token: token,
SkipTLSVerify: skipTLSVerify,
}
p, err := prometheus.NewPrometheusClient(configSpec, url, auth, prometheusStep, map[string]interface{}{}, false)
p, err := prometheus.NewPrometheusClient(configSpec, url, auth, prometheusStep, nil, indexer, false)
if err != nil {
log.Fatal(err)
}
Expand Down
5 changes: 1 addition & 4 deletions pkg/burner/job.go
Original file line number Diff line number Diff line change
Expand Up @@ -234,7 +234,6 @@ func Run(configSpec config.Spec, prometheusClients []*prometheus.Prometheus, ale
}
}
if len(prometheusClients) > 0 {
docsToIndex := make(map[string][]interface{})
for idx, prometheusClient := range prometheusClients {
// If alertManager is configured
if alertMs[idx] != nil {
Expand All @@ -246,12 +245,10 @@ func Run(configSpec config.Spec, prometheusClients []*prometheus.Prometheus, ale
prometheusClient.JobList = executedJobs
// If prometheus is enabled query metrics from the start of the first job to the end of the last one
if indexer != nil {
prometheusClient.ScrapeJobsMetrics(docsToIndex)
prometheusClient.ScrapeJobsMetrics()
}
}
if indexer != nil {
log.Infof("Indexing metrics with UUID %s", uuid)
metrics.IndexDatapoints(docsToIndex, indexer)
if globalConfig.IndexerConfig.Type == indexers.LocalIndexer && globalConfig.IndexerConfig.CreateTarball {
metrics.CreateTarball(globalConfig.IndexerConfig, globalConfig.IndexerConfig.TarballName)
}
Expand Down
21 changes: 19 additions & 2 deletions pkg/prometheus/prometheus.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"text/template"
"time"

"github.com/cloud-bulldozer/go-commons/indexers"
"github.com/cloud-bulldozer/go-commons/prometheus"
"github.com/kube-burner/kube-burner/pkg/config"
"github.com/kube-burner/kube-burner/pkg/util"
Expand All @@ -32,7 +33,7 @@ import (
)

// NewPrometheusClient creates a prometheus struct instance with the given parameters
func NewPrometheusClient(configSpec config.Spec, url string, auth Auth, step time.Duration, metadata map[string]interface{}, embedConfig bool) (*Prometheus, error) {
func NewPrometheusClient(configSpec config.Spec, url string, auth Auth, step time.Duration, metadata map[string]interface{}, indexer *indexers.Indexer, embedConfig bool) (*Prometheus, error) {
var err error
p := Prometheus{
Step: step,
Expand All @@ -41,6 +42,7 @@ func NewPrometheusClient(configSpec config.Spec, url string, auth Auth, step tim
Endpoint: url,
metadata: metadata,
embedConfig: embedConfig,
indexer: indexer,
}
log.Infof("👽 Initializing prometheus client with URL: %s", url)
p.Client, err = prometheus.NewClient(url, auth.Token, auth.Username, auth.Password, auth.SkipTLSVerify)
Expand All @@ -51,7 +53,8 @@ func NewPrometheusClient(configSpec config.Spec, url string, auth Auth, step tim
}

// ScrapeJobsMetrics gets all prometheus metrics required and handles them
func (p *Prometheus) ScrapeJobsMetrics(docsToIndex map[string][]interface{}) error {
func (p *Prometheus) ScrapeJobsMetrics() error {
docsToIndex := make(map[string][]interface{})
start := p.JobList[0].Start
end := p.JobList[len(p.JobList)-1].End
log.Infof("🔍 Scraping %v Profile: %v Start: %v End: %v",
Expand Down Expand Up @@ -92,6 +95,7 @@ func (p *Prometheus) ScrapeJobsMetrics(docsToIndex map[string][]interface{}) err
}
}
}
p.indexDatapoints(docsToIndex)
return nil
}

Expand Down Expand Up @@ -209,3 +213,16 @@ func (p *Prometheus) runRangeQuery(query, metricName string, jobStart, jobEnd ti
}
return datapoints
}

// Indexes datapoints to a specified indexer.
func (p *Prometheus) indexDatapoints(docsToIndex map[string][]interface{}) {
for metricName, docs := range docsToIndex {
log.Infof("Indexing [%d] documents from metric %s", len(docs), metricName)
resp, err := (*p.indexer).Index(docs, indexers.IndexingOpts{MetricName: metricName})
if err != nil {
log.Error(err.Error())
} else {
log.Info(resp)
}
}
}
2 changes: 2 additions & 0 deletions pkg/prometheus/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ package prometheus
import (
"time"

"github.com/cloud-bulldozer/go-commons/indexers"
"github.com/cloud-bulldozer/go-commons/prometheus"
"github.com/kube-burner/kube-burner/pkg/config"
)
Expand All @@ -40,6 +41,7 @@ type Prometheus struct {
JobList []Job
metadata map[string]interface{}
embedConfig bool
indexer *indexers.Indexer
}

type Job struct {
Expand Down
2 changes: 1 addition & 1 deletion pkg/util/metrics/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ func ProcessMetricsScraperConfig(metricsScraperConfig ScraperConfig) Scraper {
Token: metricsEndpoint.Token,
SkipTLSVerify: metricsScraperConfig.SkipTLSVerify,
}
p, err := prometheus.NewPrometheusClient(metricsScraperConfig.ConfigSpec, metricsEndpoint.Endpoint, auth, metricsScraperConfig.PrometheusStep, metadata, false)
p, err := prometheus.NewPrometheusClient(metricsScraperConfig.ConfigSpec, metricsEndpoint.Endpoint, auth, metricsScraperConfig.PrometheusStep, metadata, indexer, false)
if err != nil {
log.Fatal(err)
}
Expand Down
14 changes: 0 additions & 14 deletions pkg/util/metrics/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ import (
"bytes"
"io"

"github.com/cloud-bulldozer/go-commons/indexers"
"github.com/kube-burner/kube-burner/pkg/prometheus"
"github.com/kube-burner/kube-burner/pkg/util"
log "github.com/sirupsen/logrus"
Expand Down Expand Up @@ -52,16 +51,3 @@ func DecodeMetricsEndpoint(metricsEndpoint string, metricsEndpoints *[]prometheu
log.Fatalf("Error decoding metricsEndpoint %s: %s", metricsEndpoint, err)
}
}

// Indexes datapoints to a specified indexer.
func IndexDatapoints(docsToIndex map[string][]interface{}, indexer *indexers.Indexer) {
for metricName, docs := range docsToIndex {
log.Infof("Indexing [%d] documents from metric %s", len(docs), metricName)
resp, err := (*indexer).Index(docs, indexers.IndexingOpts{MetricName: metricName})
if err != nil {
log.Error(err.Error())
} else {
log.Info(resp)
}
}
}
2 changes: 1 addition & 1 deletion pkg/workloads/helpers.go
Original file line number Diff line number Diff line change
Expand Up @@ -200,7 +200,7 @@ func (wh *WorkloadHelper) run(workload, metricsProfile string) {
Token: metricsEndpoint.Token,
SkipTLSVerify: true,
}
p, err := prometheus.NewPrometheusClient(configSpec, metricsEndpoint.Endpoint, auth, stepSize, metadata, embedConfig)
p, err := prometheus.NewPrometheusClient(configSpec, metricsEndpoint.Endpoint, auth, stepSize, metadata, indexer, embedConfig)
if err != nil {
log.Fatal(err)
}
Expand Down
5 changes: 1 addition & 4 deletions pkg/workloads/index.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,6 @@ func NewIndex(metricsEndpoint *string, ocpMetaAgent *ocpmetadata.Metadata) *cobr
UserMetaData: userMetadata,
RawMetadata: metadata,
})
docsToIndex := make(map[string][]interface{})
for _, prometheusClients := range metricsScraper.PrometheusClients {
prometheusJob := prometheus.Job{
Start: time.Unix(start, 0),
Expand All @@ -106,12 +105,10 @@ func NewIndex(metricsEndpoint *string, ocpMetaAgent *ocpmetadata.Metadata) *cobr
},
}
prometheusClients.JobList = append(prometheusClients.JobList, prometheusJob)
if prometheusClients.ScrapeJobsMetrics(docsToIndex) != nil {
if prometheusClients.ScrapeJobsMetrics() != nil {
rc = 1
}
}
log.Infof("Indexing metrics with UUID %s", uuid)
metrics.IndexDatapoints(docsToIndex, metricsScraper.Indexer)
if configSpec.GlobalConfig.IndexerConfig.Type == indexers.LocalIndexer && tarballName != "" {
if err := metrics.CreateTarball(configSpec.GlobalConfig.IndexerConfig, tarballName); err != nil {
log.Fatal(err)
Expand Down

0 comments on commit 8c41b85

Please sign in to comment.