Skip to content

Commit

Permalink
Merge branch 'master' into fix_424
Browse files (browse the repository at this point in the history)
  • Loading branch information
vishnuchalla authored Aug 22, 2023
2 parents 1cc1b63 + 624ad96 commit dbfe679
Show file tree
Hide file tree
Showing 18 changed files with 237 additions and 201 deletions.
97 changes: 60 additions & 37 deletions cmd/kube-burner/kube-burner.go
Original file line number | Diff line number | Diff line change
Expand Up @@ -36,7 +36,7 @@ import (

uid "github.com/satori/go.uuid"
"github.com/spf13/cobra"
v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/dynamic"
_ "k8s.io/client-go/plugin/pkg/client/auth"
)
Expand Down Expand Up @@ -107,7 +107,7 @@ func initCmd() *cobra.Command {
// We assume configFile is config.yml
configFile = "config.yml"
}
configSpec, err := config.Parse(uuid, configFile, false)
configSpec, err := config.Parse(uuid, configFile)
if err != nil {
log.Fatal(err.Error())
}
Expand All @@ -128,7 +128,8 @@ func initCmd() *cobra.Command {
}
rc, err = burner.Run(configSpec, metricsScraper.PrometheusClients, metricsScraper.AlertMs, metricsScraper.Indexer, timeout, metricsScraper.UserMetadataContent)
if err != nil {
log.Fatalf(err.Error())
log.Errorf(err.Error())
os.Exit(rc)
}
},
}
Expand All @@ -153,7 +154,7 @@ func initCmd() *cobra.Command {
}

func destroyCmd() *cobra.Command {
var uuid, configFile string
var uuid string
var timeout time.Duration
var rc int
cmd := &cobra.Command{
Expand All @@ -165,13 +166,7 @@ func destroyCmd() *cobra.Command {
},
Args: cobra.NoArgs,
Run: func(cmd *cobra.Command, args []string) {
if configFile != "" {
_, err := config.Parse(uuid, configFile, false)
if err != nil {
log.Fatal(err.Error())
}
}
listOptions := v1.ListOptions{LabelSelector: fmt.Sprintf("kube-burner-uuid=%s", uuid)}
listOptions := metav1.ListOptions{LabelSelector: fmt.Sprintf("kube-burner-uuid=%s", uuid)}
clientSet, restConfig, err := config.GetClientSet(0, 0)
if err != nil {
log.Fatalf("Error creating clientSet: %s", err)
Expand All @@ -191,22 +186,33 @@ func destroyCmd() *cobra.Command {
}

func indexCmd() *cobra.Command {
var url, metricsEndpoint, metricsProfile, configFile, jobName string
var url, metricsEndpoint, metricsProfile, jobName string
var start, end int64
var username, password, uuid, token, userMetadata string
var esServer, esIndex, metricsDirectory string
var configSpec config.Spec
var skipTLSVerify bool
var prometheusStep time.Duration
cmd := &cobra.Command{
Use: "index",
Short: "Index kube-burner metrics",
Long: "If no other indexer is specified, local indexer is used by default",
Args: cobra.NoArgs,
PostRun: func(cmd *cobra.Command, args []string) {
log.Info("👋 Exiting kube-burner ", uuid)
},
Run: func(cmd *cobra.Command, args []string) {
configSpec, err := config.Parse(uuid, configFile, false)
if err != nil {
log.Fatal(err.Error())
if esServer != "" && esIndex != "" {
configSpec.GlobalConfig.IndexerConfig = indexers.IndexerConfig{
Type: indexers.ElasticIndexer,
Servers: []string{esServer},
Index: esIndex,
}
} else {
configSpec.GlobalConfig.IndexerConfig = indexers.IndexerConfig{
Type: indexers.LocalIndexer,
MetricsDirectory: metricsDirectory,
}
}
_ = metrics.ProcessMetricsScraperConfig(metrics.ScraperConfig{
ConfigSpec: configSpec,
Expand Down Expand Up @@ -237,23 +243,34 @@ func indexCmd() *cobra.Command {
cmd.Flags().DurationVarP(&prometheusStep, "step", "s", 30*time.Second, "Prometheus step size")
cmd.Flags().Int64VarP(&start, "start", "", time.Now().Unix()-3600, "Epoch start time")
cmd.Flags().Int64VarP(&end, "end", "", time.Now().Unix(), "Epoch end time")
cmd.Flags().StringVarP(&configFile, "config", "c", "", "Config file path or URL")
cmd.Flags().StringVarP(&jobName, "job-name", "j", "kube-burner-indexing", "Indexing job name")
cmd.Flags().StringVar(&userMetadata, "user-metadata", "", "User provided metadata file, in YAML format")
cmd.MarkFlagRequired("config")
cmd.Flags().StringVar(&metricsDirectory, "metrics-directory", "collected-metrics", "Directory to dump the metrics files in, when using default local indexing")
cmd.Flags().StringVar(&esServer, "es-server", "", "Elastic Search endpoint")
cmd.Flags().StringVar(&esIndex, "es-index", "", "Elastic Search index")
cmd.Flags().SortFlags = false
return cmd
}

func importCmd() *cobra.Command {
var configFile, tarball string
var tarball string
var esServer, esIndex, metricsDirectory string
var configSpec config.Spec
cmd := &cobra.Command{
Use: "import",
Short: "Import metrics tarball",
Run: func(cmd *cobra.Command, args []string) {
configSpec, err := config.Parse("", configFile, false)
if err != nil {
log.Fatal(err.Error())
if esServer != "" && esIndex != "" {
configSpec.GlobalConfig.IndexerConfig = indexers.IndexerConfig{
Type: indexers.ElasticIndexer,
Servers: []string{esServer},
Index: esIndex,
}
} else {
configSpec.GlobalConfig.IndexerConfig = indexers.IndexerConfig{
Type: indexers.LocalIndexer,
MetricsDirectory: metricsDirectory,
}
}
indexerConfig := configSpec.GlobalConfig.IndexerConfig
log.Infof("📁 Creating indexer: %s", indexerConfig.Type)
Expand All @@ -267,17 +284,19 @@ func importCmd() *cobra.Command {
}
},
}
cmd.Flags().StringVarP(&configFile, "config", "c", "", "Config file path or URL")
cmd.Flags().StringVar(&tarball, "tarball", "", "Metrics tarball file")
cmd.MarkFlagRequired("config")
cmd.Flags().StringVar(&metricsDirectory, "metrics-directory", "collected-metrics", "Directory to dump the metrics files in, when using default local indexing")
cmd.Flags().StringVar(&esServer, "es-server", "", "Elastic Search endpoint")
cmd.Flags().StringVar(&esIndex, "es-index", "", "Elastic Search index")
cmd.MarkFlagRequired("tarball")
return cmd
}

func alertCmd() *cobra.Command {
var configSpec config.Spec
var err error
var url, alertProfile, configFile, username, password, uuid, token string
var url, alertProfile, username, password, uuid, token string
var esServer, esIndex, metricsDirectory string
var start, end int64
var skipTLSVerify bool
var alertM *alerting.AlertManager
Expand All @@ -291,10 +310,16 @@ func alertCmd() *cobra.Command {
log.Info("👋 Exiting kube-burner ", uuid)
},
Run: func(cmd *cobra.Command, args []string) {
if configFile != "" {
configSpec, err = config.Parse(uuid, configFile, false)
if err != nil {
log.Fatal(err.Error())
if esServer != "" && esIndex != "" {
configSpec.GlobalConfig.IndexerConfig = indexers.IndexerConfig{
Type: indexers.ElasticIndexer,
Servers: []string{esServer},
Index: esIndex,
}
} else if metricsDirectory != "" {
configSpec.GlobalConfig.IndexerConfig = indexers.IndexerConfig{
Type: indexers.LocalIndexer,
MetricsDirectory: metricsDirectory,
}
}
if configSpec.GlobalConfig.IndexerConfig.Type != "" {
Expand All @@ -305,12 +330,6 @@ func alertCmd() *cobra.Command {
log.Fatal(err.Error())
}
}
if url == "" {
url = configSpec.GlobalConfig.PrometheusURL
}
if token == "" {
token = configSpec.GlobalConfig.BearerToken
}
auth := prometheus.Auth{
Username: username,
Password: password,
Expand All @@ -326,9 +345,11 @@ func alertCmd() *cobra.Command {
if alertM, err = alerting.NewAlertManager(alertProfile, uuid, indexer, p); err != nil {
log.Fatalf("Error creating alert manager: %s", err)
}
rc := alertM.Evaluate(startTime, endTime)

log.Info("👋 Exiting kube-burner ", uuid)
os.Exit(rc)
if err := alertM.Evaluate(startTime, endTime); err != nil {
os.Exit(1)
}
},
}
cmd.Flags().StringVar(&uuid, "uuid", uid.NewV4().String(), "Benchmark UUID")
Expand All @@ -341,7 +362,9 @@ func alertCmd() *cobra.Command {
cmd.Flags().DurationVarP(&prometheusStep, "step", "s", 30*time.Second, "Prometheus step size")
cmd.Flags().Int64VarP(&start, "start", "", time.Now().Unix()-3600, "Epoch start time")
cmd.Flags().Int64VarP(&end, "end", "", time.Now().Unix(), "Epoch end time")
cmd.Flags().StringVarP(&configFile, "config", "c", "", "Config file path or URL")
cmd.Flags().StringVar(&metricsDirectory, "metrics-directory", "", "Directory to dump the alert files in, enables local indexing when specified")
cmd.Flags().StringVar(&esServer, "es-server", "", "Elastic Search endpoint")
cmd.Flags().StringVar(&esIndex, "es-index", "", "Elastic Search index")
cmd.MarkFlagRequired("prometheus-url")
cmd.MarkFlagRequired("alert-profile")
cmd.Flags().SortFlags = false
Expand Down
4 changes: 4 additions & 0 deletions cmd/kube-burner/ocp-config/metrics-aggregated.yml
Original file line number | Diff line number | Diff line change
Expand Up @@ -124,6 +124,10 @@
- query: sum(kube_node_status_condition{status="true"}) by (condition)
metricName: nodeStatus

- query: count(kube_replicaset_labels{})
metricName: replicaSetCount
instant: true

# Prometheus metrics

- query: openshift:prometheus_tsdb_head_series:sum{job="prometheus-k8s"}
Expand Down
4 changes: 4 additions & 0 deletions cmd/kube-burner/ocp-config/metrics.yml
Original file line number | Diff line number | Diff line change
Expand Up @@ -115,6 +115,10 @@
- query: sum(kube_node_status_condition{status="true"}) by (condition)
metricName: nodeStatus

- query: count(kube_replicaset_labels{})
metricName: replicaSetCount
instant: true

# Prometheus metrics

- query: openshift:prometheus_tsdb_head_series:sum{job="prometheus-k8s"}
Expand Down
1 change: 1 addition & 0 deletions docs/ocp.md
Original file line number | Diff line number | Diff line change
Expand Up @@ -188,6 +188,7 @@ type ClusterMetadata struct {
SDNType string `json:"sdnType"`
ClusterName string `json:"clusterName"`
Region string `json:"region"`
ExecutionErrors string `json:"executionErrors"`
}
```

Expand Down
46 changes: 21 additions & 25 deletions pkg/alerting/alert_manager.go
Original file line number | Diff line number | Diff line change
Expand Up @@ -28,13 +28,12 @@ import (
"github.com/prometheus/common/model"
log "github.com/sirupsen/logrus"
"gopkg.in/yaml.v3"
utilerrors "k8s.io/apimachinery/pkg/util/errors"
)

type severityLevel string

const (
Passed = 0
Failed = 1
sevWarn severityLevel = "warning"
sevError severityLevel = "error"
sevCritical severityLevel = "critical"
Expand Down Expand Up @@ -100,47 +99,45 @@ func (a *AlertManager) readProfile(alertProfileCfg string) error {
yamlDec := yaml.NewDecoder(f)
yamlDec.KnownFields(true)
if err = yamlDec.Decode(&a.alertProfile); err != nil {
return fmt.Errorf("Error decoding alert profile %s: %s", alertProfileCfg, err)
return fmt.Errorf("error decoding alert profile %s: %s", alertProfileCfg, err)
}
return a.validateTemplates()
}

// Evaluate evaluates expressions
func (a *AlertManager) Evaluate(start, end time.Time) int {
func (a *AlertManager) Evaluate(start, end time.Time) error {
errs := []error{}
log.Infof("Evaluating alerts for prometheus: %v", a.prometheus.Endpoint)
var alertList []interface{}
elapsed := int(end.Sub(start).Minutes())
var renderedQuery bytes.Buffer
result := Passed
vars := util.EnvToMap()
vars["elapsed"] = fmt.Sprintf("%dm", elapsed)
for _, alert := range a.alertProfile {
t, _ := template.New("").Parse(alert.Expr)
t.Execute(&renderedQuery, vars)
expr := renderedQuery.String()
renderedQuery.Reset()
log.Infof("Evaluating expression: '%s'", expr)
log.Debugf("Evaluating expression: '%s'", expr)
v, err := a.prometheus.Client.QueryRange(expr, start, end, a.prometheus.Step)
if err != nil {
log.Warnf("Error performing query %s: %s", expr, err)
continue
}
alarmResult, alertData, err := parseMatrix(v, alert.Description, alert.Severity)
alertData, err := parseMatrix(v, alert.Description, alert.Severity)
if err != nil {
log.Error(err)
log.Error(err.Error())
errs = append(errs, err)
}
for _, alertSet := range alertData {
alertSet.UUID = a.uuid
alertList = append(alertList, alertSet)
}
if alarmResult == Failed {
result = Failed
}
}
if len(alertList) > 0 && a.indexer != nil {
a.index(alertList)
}
return result
return utilerrors.NewAggregate(errs)
}

func (a *AlertManager) validateTemplates() error {
Expand All @@ -152,16 +149,16 @@ func (a *AlertManager) validateTemplates() error {
return nil
}

func parseMatrix(value model.Value, description string, severity severityLevel) (int, []alert, error) {
func parseMatrix(value model.Value, description string, severity severityLevel) ([]alert, error) {
var renderedDesc bytes.Buffer
var templateData descriptionTemplate
// The same query can fire multiple alerts, so we have to return an array of them
var alertSet []alert
result := Passed
errs := []error{}
t, _ := template.New("").Parse(strings.Join(append(baseTemplate, description), ""))
data, ok := value.(model.Matrix)
if !ok {
return result, alertSet, fmt.Errorf("unsupported result format: %s", value.Type().String())
return alertSet, fmt.Errorf("unsupported result format: %s", value.Type().String())
}
for _, v := range data {
templateData.Labels = make(map[string]string)
Expand All @@ -173,10 +170,11 @@ func parseMatrix(value model.Value, description string, severity severityLevel)
// Take 3 decimals
templateData.Value = math.Round(float64(val.Value)*1000) / 1000
if err := t.Execute(&renderedDesc, templateData); err != nil {
log.Errorf("Rendering error: %s", err)
result = Failed
msg := fmt.Errorf("alert rendering error: %s", err)
log.Error(msg.Error())
errs = append(errs, err)
}
msg := fmt.Sprintf("🚨 Alert at %v: '%s'", val.Timestamp.Time().Format(time.RFC3339), renderedDesc.String())
msg := fmt.Sprintf("alert at %v: '%s'", val.Timestamp.Time().Format(time.RFC3339), renderedDesc.String())
alertSet = append(alertSet, alert{
Timestamp: val.Timestamp.Time(),
Severity: string(severity),
Expand All @@ -185,24 +183,22 @@ func parseMatrix(value model.Value, description string, severity severityLevel)
})
switch severity {
case sevWarn:
log.Warn(msg)
log.Warnf("🚨 %s", msg)
case sevError:
result = Failed
log.Error(msg)
errs = append(errs, fmt.Errorf(msg))
case sevCritical:
log.Fatal(msg)
log.Fatalf("🚨 %s", msg)
default:
log.Info(msg)
log.Infof("🚨 %s", msg)
}
break
}
}
return result, alertSet, nil
return alertSet, utilerrors.NewAggregate(errs)
}

func (a *AlertManager) index(alertSet []interface{}) {
log.Info("Indexing alerts")
log.Infof("Indexing alert %s", alertMetricName)
log.Debugf("Indexing [%d] documents", len(alertSet))
resp, err := (*a.indexer).Index(alertSet, indexers.IndexingOpts{MetricName: alertMetricName})
if err != nil {
Expand Down
Loading

0 comments on commit dbfe679

Please sign in to comment.