Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add nomad input plugin #10106

Merged
merged 14 commits into from
Dec 8, 2021
1 change: 1 addition & 0 deletions plugins/inputs/all/all.go
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,7 @@ import (
_ "github.com/influxdata/telegraf/plugins/inputs/nginx_sts"
_ "github.com/influxdata/telegraf/plugins/inputs/nginx_upstream_check"
_ "github.com/influxdata/telegraf/plugins/inputs/nginx_vts"
_ "github.com/influxdata/telegraf/plugins/inputs/nomad"
_ "github.com/influxdata/telegraf/plugins/inputs/nsd"
_ "github.com/influxdata/telegraf/plugins/inputs/nsq"
_ "github.com/influxdata/telegraf/plugins/inputs/nsq_consumer"
Expand Down
36 changes: 36 additions & 0 deletions plugins/inputs/nomad/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
# Hashicorp Nomad Input Plugin

The Nomad plugin must grab metrics from every Nomad agent of the cluster. Telegraf may be present in every node and connect to the agent locally. In this case should be something like `http://127.0.0.1:4646`.

> Tested on Nomad 1.1.6

### Configuration

```toml
[[inputs.nomad]]
## URL for the Nomad agent
url = "http://127.0.0.1:4646"
srebhan marked this conversation as resolved.
Show resolved Hide resolved
srebhan marked this conversation as resolved.
Show resolved Hide resolved

## Use auth token for authorization.
## If both are set, an error is thrown.
## If both are empty, no token will be used.
# auth_token = "/path/to/auth/token"
## OR
# auth_token_string = "a1234567-40c7-9048-7bae-378687048181"

## Set response_timeout (default 5 seconds)
# response_timeout = "5s"

## Optional TLS Config
# tls_ca = /path/to/cafile
# tls_cert = /path/to/certfile
# tls_key = /path/to/keyfile
```


### Metrics

Both Nomad servers and agents collect various metrics. For every details, please have a look at Nomad following documentation:

- [https://www.nomadproject.io/docs/operations/metrics](https://www.nomadproject.io/docs/operations/metrics)
- [https://www.nomadproject.io/docs/operations/telemetry](https://www.nomadproject.io/docs/operations/telemetry)
214 changes: 214 additions & 0 deletions plugins/inputs/nomad/nomad.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,214 @@
package nomad

import (
"encoding/json"
"fmt"
"net/http"
"os"
"strings"
"time"

"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/config"
"github.com/influxdata/telegraf/plugins/common/tls"
"github.com/influxdata/telegraf/plugins/inputs"
)

// Nomad configuration object
type Nomad struct {
URL string `toml:"url"`

AuthToken string `toml:"auth_token"`
AuthTokenString string `toml:"auth_token_string"`

ResponseTimeout config.Duration `toml:"response_timeout"`

tls.ClientConfig

roundTripper http.RoundTripper
}

const timeLayout = "2006-01-02 15:04:05 -0700 MST"

var sampleConfig = `
## URL for the Nomad agent
url = "http://127.0.0.1:4646"
srebhan marked this conversation as resolved.
Show resolved Hide resolved

## Use auth token for authorization.
## If both are set, an error is thrown.
## If both are empty, no token will be used.
srebhan marked this conversation as resolved.
Show resolved Hide resolved
# auth_token = "/path/to/auth/token"
## OR
# auth_token_string = "a1234567-40c7-9048-7bae-378687048181"

## Set response_timeout (default 5 seconds)
# response_timeout = "5s"

## Optional TLS Config
# tls_ca = /path/to/cafile
# tls_cert = /path/to/certfile
# tls_key = /path/to/keyfile
`

func init() {
inputs.Add("nomad", func() telegraf.Input {
return &Nomad{}
})
}

// SampleConfig returns a sample config
func (n *Nomad) SampleConfig() string {
return sampleConfig
}

// Description returns a description of the plugin
func (n *Nomad) Description() string {
return "Read metrics from the Nomad api"
srebhan marked this conversation as resolved.
Show resolved Hide resolved
}

func (n *Nomad) Init() error {

srebhan marked this conversation as resolved.
Show resolved Hide resolved
if n.AuthToken != "" && n.AuthTokenString != "" {
return fmt.Errorf("config error: both auth_token and auth_token_string are set")
}

if n.AuthToken == "" && n.AuthTokenString == "" {
n.AuthToken = ""
}

srebhan marked this conversation as resolved.
Show resolved Hide resolved
if n.AuthToken != "" {
token, err := os.ReadFile(n.AuthToken)
if err != nil {
return fmt.Errorf("reading file failed: %v", err)
}
n.AuthTokenString = strings.TrimSpace(string(token))
}

tlsCfg, err := n.ClientConfig.TLSConfig()
if err != nil {
return err
srebhan marked this conversation as resolved.
Show resolved Hide resolved
srebhan marked this conversation as resolved.
Show resolved Hide resolved
}
if n.roundTripper == nil {
srebhan marked this conversation as resolved.
Show resolved Hide resolved
if n.ResponseTimeout < config.Duration(time.Second) {
n.ResponseTimeout = config.Duration(time.Second * 5)
}
n.roundTripper = &http.Transport{
TLSHandshakeTimeout: 5 * time.Second,
TLSClientConfig: tlsCfg,
ResponseHeaderTimeout: time.Duration(n.ResponseTimeout),
}
}

return nil
}

// Gather, collects metrics from Nomad endpoint
func (n *Nomad) Gather(acc telegraf.Accumulator) error {
return n.gatherSummary(acc, n.URL)
}

// gatherSummary, decodes response from Nomad api and triggers the builder
func (n *Nomad) gatherSummary(acc telegraf.Accumulator, baseURL string) error {
summaryMetrics := &MetricsSummary{}
err := n.loadJSON(fmt.Sprintf("%s/v1/metrics", baseURL), summaryMetrics)
if err != nil {
return err
}

err = buildNomadMetrics(acc, summaryMetrics)
if err != nil {
return err
}

return nil
}
srebhan marked this conversation as resolved.
Show resolved Hide resolved

func (n *Nomad) loadJSON(url string, v interface{}) error {
var req, err = http.NewRequest("GET", url, nil)
srebhan marked this conversation as resolved.
Show resolved Hide resolved
if err != nil {
return err
}

var resp *http.Response

req.Header.Set("Authorization", "X-Nomad-Token "+n.AuthTokenString)
req.Header.Add("Accept", "application/json")

resp, err = n.roundTripper.RoundTrip(req)
srebhan marked this conversation as resolved.
Show resolved Hide resolved
if err != nil {
return fmt.Errorf("error making HTTP request to %s: %s", url, err)
}
defer resp.Body.Close()

if resp.StatusCode != http.StatusOK {
return fmt.Errorf("%s returned HTTP status %s", url, resp.Status)
}

err = json.NewDecoder(resp.Body).Decode(v)
if err != nil {
return fmt.Errorf("error parsing json response: %s", err)
}

return nil
}

// buildNomadMetrics, it builds all the metrics and adds them to the accumulator)
func buildNomadMetrics(acc telegraf.Accumulator, summaryMetrics *MetricsSummary) error {

srebhan marked this conversation as resolved.
Show resolved Hide resolved
t, err := time.Parse(timeLayout, summaryMetrics.Timestamp)
if err != nil {
return fmt.Errorf("error parsing time: %s", err)
}
sampledValueFields := make(map[string]interface{})

for _, counters := range summaryMetrics.Counters {
tags := counters.DisplayLabels

sampledValueFields["count"] = counters.Count
sampledValueFields["rate"] = counters.Rate
sampledValueFields["sum"] = counters.Sum
sampledValueFields["sumsq"] = counters.SumSq
sampledValueFields["min"] = counters.Min
sampledValueFields["max"] = counters.Max
sampledValueFields["mean"] = counters.Mean

acc.AddCounter(counters.Name, sampledValueFields, tags, t)
srebhan marked this conversation as resolved.
Show resolved Hide resolved
}

for _, gauges := range summaryMetrics.Gauges {
tags := gauges.DisplayLabels

fields := make(map[string]interface{})
fields["value"] = gauges.Value

acc.AddGauge(gauges.Name, fields, tags, t)

srebhan marked this conversation as resolved.
Show resolved Hide resolved
}

for _, points := range summaryMetrics.Points {
tags := make(map[string]string)

fields := make(map[string]interface{})
fields["value"] = points.Points
srebhan marked this conversation as resolved.
Show resolved Hide resolved

acc.AddFields(points.Name, fields, tags, t)
}

for _, samples := range summaryMetrics.Samples {
tags := samples.DisplayLabels

sampledValueFields := make(map[string]interface{})
sampledValueFields["count"] = samples.Count
sampledValueFields["rate"] = samples.Rate
sampledValueFields["sum"] = samples.Sum
sampledValueFields["sumsq"] = samples.SumSq
sampledValueFields["stddev"] = samples.Stddev
sampledValueFields["min"] = samples.Min
sampledValueFields["max"] = samples.Max
sampledValueFields["mean"] = samples.Mean
srebhan marked this conversation as resolved.
Show resolved Hide resolved

acc.AddCounter(samples.Name, sampledValueFields, tags, t)
}

return nil
}
53 changes: 53 additions & 0 deletions plugins/inputs/nomad/nomad_metrics.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
package nomad

import (
"time"
)

type MetricsSummary struct {
Timestamp string `json:"timestamp"`
Gauges []GaugeValue `json:"gauges"`
Points []PointValue `json:"points"`
Counters []SampledValue `json:"counters"`
Samples []SampledValue `json:"samples"`
}

type GaugeValue struct {
Name string `json:"name"`
Hash string `json:"-"`
Value float32 `json:"value"`

Labels []Label `json:"-"`
DisplayLabels map[string]string `json:"Labels"`
}

type PointValue struct {
Name string `json:"name"`
Points []float32 `json:"points"`
}

type SampledValue struct {
Name string `json:"name"`
Hash string `json:"-"`
*AggregateSample
Mean float64 `json:"mean"`
Stddev float64 `json:"stddev"`

Labels []Label `json:"-"`
DisplayLabels map[string]string `json:"Labels"`
}

type AggregateSample struct {
Count int `json:"count"`
Rate float64 `json:"rate"`
Sum float64 `json:"sum"`
SumSq float64 `json:"-"`
Min float64 `json:"min"`
Max float64 `json:"max"`
LastUpdated time.Time `json:"-"`
}

type Label struct {
Name string `json:"name"`
Value string `json:"value"`
}
Loading