Skip to content

Commit

Permalink
add watermark metrics
Browse files Browse the repository at this point in the history
Signed-off-by: Tomas Kadane <[email protected]>
  • Loading branch information
Tomáš Kadaně committed Aug 8, 2022
1 parent ca4b237 commit f30db1d
Show file tree
Hide file tree
Showing 5 changed files with 127 additions and 4 deletions.
98 changes: 94 additions & 4 deletions collector/cluster_settings.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,22 +21,52 @@ import (
"net/url"
"path"
"strconv"
"strings"

"github.com/c2h5oh/datasize"
"github.com/go-kit/log"
"github.com/go-kit/log/level"
"github.com/imdario/mergo"
"github.com/prometheus/client_golang/prometheus"
)

// watermarkMetric bundles everything needed to emit one disk-watermark
// sample as a constant metric.
type watermarkMetric struct {
	// Type is the Prometheus value type the sample is reported as.
	Type prometheus.ValueType

	// Desc is the descriptor the sample is emitted under.
	Desc *prometheus.Desc

	// ValueWithUnit extracts the sample from a cluster settings response.
	// It returns the numeric value together with the string that is used
	// as the sample's label value.
	ValueWithUnit func(clusterSettings ClusterSettingsResponse) (float64, string)
}

// getUnitAndValue converts a raw disk watermark setting into a numeric value
// and the unit that value is expressed in.
//
// Three notations are recognised:
//
//   - a percentage such as "85%" or "87.5%", returned as (87.5, "%");
//   - a byte size such as "500mb", parsed by datasize and returned as the
//     number of bytes with unit "bytes";
//   - a bare ratio between 0 and 1 such as "0.85", which is scaled to the
//     equivalent percentage and returned with unit "%".
//
// Leading and trailing whitespace is ignored. Anything that cannot be parsed,
// or that falls outside the 0-100% (0-1 ratio) range, yields (0, "unknown").
func getUnitAndValue(value string) (float64, string) {
	value = strings.TrimSpace(value)

	switch {
	case strings.HasSuffix(value, "%"):
		number := strings.TrimSpace(strings.TrimSuffix(value, "%"))

		// ParseFloat rather than ParseInt so fractional percentages are kept.
		// The range check also rejects the NaN/Inf spellings ParseFloat accepts.
		percent, err := strconv.ParseFloat(number, 64)
		if err == nil && percent >= 0 && percent <= 100 {
			return percent, "%"
		}

	case strings.HasSuffix(strings.ToLower(value), "b"):
		// Only the suffix test is case-insensitive here; whether a given
		// spelling of the unit is accepted is decided by datasize.
		var size datasize.ByteSize

		if err := size.UnmarshalText([]byte(value)); err == nil {
			return float64(size.Bytes()), "bytes"
		}

	default:
		// No suffix: treat the value as a ratio and report it on the same
		// scale as the percentage form so both notations are comparable.
		ratio, err := strconv.ParseFloat(value, 64)
		if err == nil && ratio >= 0 && ratio <= 1 {
			return ratio * 100, "%"
		}
	}

	return 0, "unknown"
}

// ClusterSettings is the collector state for cluster settings metrics: the
// logger, HTTP client and base URL it was constructed with, plus the metrics
// it reports.
type ClusterSettings struct {
	logger log.Logger
	client *http.Client
	url    *url.URL

	// Each gauge is declared exactly once; declaring a field name twice in
	// the same struct does not compile.
	up                     prometheus.Gauge
	shardAllocationEnabled prometheus.Gauge
	maxShardsPerNode       prometheus.Gauge
	// thresholdEnabled is set to 1 when the disk threshold_enabled setting
	// is "true".
	thresholdEnabled prometheus.Gauge
	// watermarkMetrics holds one entry per disk watermark that is exported
	// as a constant metric on every collection.
	watermarkMetrics []*watermarkMetric

	totalScrapes, jsonParseFailures prometheus.Counter
}

Expand Down Expand Up @@ -67,15 +97,59 @@ func NewClusterSettings(logger log.Logger, client *http.Client, url *url.URL) *C
Name: prometheus.BuildFQName(namespace, "clustersettings_stats", "json_parse_failures"),
Help: "Number of errors while parsing JSON.",
}),
thresholdEnabled: prometheus.NewGauge(prometheus.GaugeOpts{
Name: prometheus.BuildFQName(namespace, "clustersettings_stats_allocation", "threshold_enabled"),
Help: "Is disk allocation decider enabled.",
}),
watermarkMetrics: []*watermarkMetric{
{
Type: prometheus.GaugeValue,
Desc: prometheus.NewDesc(
prometheus.BuildFQName(namespace, "clustersettings_stats_allocation_watermark", "flood_stage"),
"Flood stage watermark.",
[]string{"unit"}, nil,
),
ValueWithUnit: func(clusterSettings ClusterSettingsResponse) (float64, string) {
return getUnitAndValue(clusterSettings.Cluster.Routing.Allocation.Disk.Watermark.FloodStage)
},
},
{
Type: prometheus.GaugeValue,
Desc: prometheus.NewDesc(
prometheus.BuildFQName(namespace, "clustersettings_stats_allocation_watermark", "high"),
"High watermark for disk usage.",
[]string{"unit"}, nil,
),
ValueWithUnit: func(clusterSettings ClusterSettingsResponse) (float64, string) {
return getUnitAndValue(clusterSettings.Cluster.Routing.Allocation.Disk.Watermark.High)
},
},
{
Type: prometheus.GaugeValue,
Desc: prometheus.NewDesc(
prometheus.BuildFQName(namespace, "clustersettings_stats_allocation_watermark", "low"),
"Low watermark for disk usage.",
[]string{"unit"}, nil,
),
ValueWithUnit: func(clusterSettings ClusterSettingsResponse) (float64, string) {
return getUnitAndValue(clusterSettings.Cluster.Routing.Allocation.Disk.Watermark.Low)
},
},
},
}
}

// Describe sends the descriptor of every cluster settings metric to ch: the
// watermark descriptors first, then the descriptors of the gauges and
// counters held on the collector.
func (cs *ClusterSettings) Describe(ch chan<- *prometheus.Desc) {
	for i := range cs.watermarkMetrics {
		ch <- cs.watermarkMetrics[i].Desc
	}

	// Listed in the order their descriptors are sent.
	fixed := []prometheus.Metric{
		cs.up,
		cs.totalScrapes,
		cs.shardAllocationEnabled,
		cs.maxShardsPerNode,
		cs.thresholdEnabled,
		cs.jsonParseFailures,
	}
	for _, m := range fixed {
		ch <- m.Desc()
	}
}

Expand Down Expand Up @@ -143,19 +217,20 @@ func (cs *ClusterSettings) fetchAndDecodeClusterSettingsStats() (ClusterSettings

// Collect gets cluster settings metric values
func (cs *ClusterSettings) Collect(ch chan<- prometheus.Metric) {

cs.totalScrapes.Inc()
defer func() {
ch <- cs.up
ch <- cs.totalScrapes
ch <- cs.jsonParseFailures
ch <- cs.shardAllocationEnabled
ch <- cs.maxShardsPerNode
ch <- cs.thresholdEnabled
}()

csr, err := cs.fetchAndDecodeClusterSettingsStats()
if err != nil {
cs.shardAllocationEnabled.Set(0)
cs.thresholdEnabled.Set(0)
cs.up.Set(0)
_ = level.Warn(cs.logger).Log(
"msg", "failed to fetch and decode cluster settings stats",
Expand All @@ -180,4 +255,19 @@ func (cs *ClusterSettings) Collect(ch chan<- prometheus.Metric) {
cs.maxShardsPerNode.Set(float64(maxShardsPerNode))
}
}

if csr.Cluster.Routing.Allocation.Disk.ThresholdEnabled == "true" {
cs.thresholdEnabled.Set(1)
}

for _, metric := range cs.watermarkMetrics {
value, unit := metric.ValueWithUnit(csr)

ch <- prometheus.MustNewConstMetric(
metric.Desc,
metric.Type,
value,
unit,
)
}
}
14 changes: 14 additions & 0 deletions collector/cluster_settings_response.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,4 +40,18 @@ type Routing struct {
// Allocation represents the shard routing allocation settings of an
// Elasticsearch cluster.
type Allocation struct {
	// Enabled is decoded from the "enable" key.
	Enabled string `json:"enable"`

	// Disk holds the settings nested under the "disk" key.
	Disk Disk `json:"disk"`
}

// Disk represents the disk-based shard allocation settings of an
// Elasticsearch cluster.
type Disk struct {
	// ThresholdEnabled is decoded from the "threshold_enabled" key and kept
	// as the raw string (for example "true").
	ThresholdEnabled string `json:"threshold_enabled"`

	// Watermark holds the settings nested under the "watermark" key.
	Watermark Watermark `json:"watermark"`
}

// Watermark represents the disk allocation watermark settings of an
// Elasticsearch cluster. Every value is kept as the raw string found in the
// response (for example "85%").
type Watermark struct {
	// FloodStage is decoded from the "flood_stage" key.
	FloodStage string `json:"flood_stage"`

	// High is decoded from the "high" key.
	High string `json:"high"`

	// Low is decoded from the "low" key.
	Low string `json:"low"`
}
16 changes: 16 additions & 0 deletions collector/cluster_settings_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,22 @@ func TestClusterMaxShardsPerNode(t *testing.T) {
if nsr.Cluster.MaxShardsPerNode != "1000" {
t.Errorf("Wrong value for MaxShardsPerNode")
}

if nsr.Cluster.Routing.Allocation.Disk.ThresholdEnabled != "true" {
t.Errorf("Wrong value for ThresholdEnabled")
}

if nsr.Cluster.Routing.Allocation.Disk.Watermark.Low != "85%" {
t.Errorf("Wrong value for Low Watermark")
}

if nsr.Cluster.Routing.Allocation.Disk.Watermark.High != "90%" {
t.Errorf("Wrong value for High Watermark")
}

if nsr.Cluster.Routing.Allocation.Disk.Watermark.FloodStage != "95%" {
t.Errorf("Wrong value for Flood stage Watermark")
}
}
}
}
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ require (
github.com/aws/aws-sdk-go-v2 v1.16.7
github.com/aws/aws-sdk-go-v2/config v1.15.14
github.com/blang/semver/v4 v4.0.0
github.com/c2h5oh/datasize v0.0.0-20220606134207-859f65c6625b
github.com/go-kit/log v0.2.1
github.com/imdario/mergo v0.3.13
github.com/prometheus/client_golang v1.12.2
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,8 @@ github.com/beorn7/perks v1.0.1 h1:VlbKKnNfV8bJzeqoa4cOKqO6bYr3WgKZxO8Z16+hsOM=
github.com/beorn7/perks v1.0.1/go.mod h1:G2ZrVWU2WbWT9wwq4/hrbKbnv/1ERSJQ0ibhJ6rlkpw=
github.com/blang/semver/v4 v4.0.0 h1:1PFHFE6yCCTv8C1TeyNNarDzntLi7wMI5i/pzqYIsAM=
github.com/blang/semver/v4 v4.0.0/go.mod h1:IbckMUScFkM3pff0VJDNKRiT6TG/YpiHIM2yvyW5YoQ=
github.com/c2h5oh/datasize v0.0.0-20220606134207-859f65c6625b h1:6+ZFm0flnudZzdSE0JxlhR2hKnGPcNB35BjQf4RYQDY=
github.com/c2h5oh/datasize v0.0.0-20220606134207-859f65c6625b/go.mod h1:S/7n9copUssQ56c7aAgHqftWO4LTf4xY6CGWt8Bc+3M=
github.com/census-instrumentation/opencensus-proto v0.2.1/go.mod h1:f6KPmirojxKA12rnyqOA5BBL4O983OfeGPqjHWSTneU=
github.com/cespare/xxhash/v2 v2.1.1/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs=
github.com/cespare/xxhash/v2 v2.1.2 h1:YRXhKfTDauu4ajMg1TPgFO5jnlC2HCbmLXMcTG5cbYE=
Expand Down

0 comments on commit f30db1d

Please sign in to comment.