From 8eb7e2e361da3f51f8cc8a94a600b0ecd02175b3 Mon Sep 17 00:00:00 2001 From: Francesco Bartolini <17676301+efbar@users.noreply.github.com> Date: Fri, 10 Dec 2021 21:10:26 +0100 Subject: [PATCH] feat: add Vault input plugin (#10198) --- plugins/inputs/all/all.go | 1 + plugins/inputs/vault/README.md | 35 +++ .../vault/testdata/response_key_metrics.json | 40 ++++ plugins/inputs/vault/vault.go | 214 ++++++++++++++++++ plugins/inputs/vault/vault_metrics.go | 40 ++++ plugins/inputs/vault/vault_test.go | 97 ++++++++ 6 files changed, 427 insertions(+) create mode 100644 plugins/inputs/vault/README.md create mode 100644 plugins/inputs/vault/testdata/response_key_metrics.json create mode 100644 plugins/inputs/vault/vault.go create mode 100644 plugins/inputs/vault/vault_metrics.go create mode 100644 plugins/inputs/vault/vault_test.go diff --git a/plugins/inputs/all/all.go b/plugins/inputs/all/all.go index 5fc1a79ca5345..b0a41447ea9f0 100644 --- a/plugins/inputs/all/all.go +++ b/plugins/inputs/all/all.go @@ -198,6 +198,7 @@ import ( _ "github.com/influxdata/telegraf/plugins/inputs/unbound" _ "github.com/influxdata/telegraf/plugins/inputs/uwsgi" _ "github.com/influxdata/telegraf/plugins/inputs/varnish" + _ "github.com/influxdata/telegraf/plugins/inputs/vault" _ "github.com/influxdata/telegraf/plugins/inputs/vsphere" _ "github.com/influxdata/telegraf/plugins/inputs/webhooks" _ "github.com/influxdata/telegraf/plugins/inputs/win_eventlog" diff --git a/plugins/inputs/vault/README.md b/plugins/inputs/vault/README.md new file mode 100644 index 0000000000000..0261efe6c1c34 --- /dev/null +++ b/plugins/inputs/vault/README.md @@ -0,0 +1,35 @@ +# Hashicorp Vault Input Plugin + +The Vault plugin can gather metrics from every Vault agent of the cluster. Telegraf may be present on every node and connect to the agent locally. In this case, the URL should be something like `http://127.0.0.1:8200`. 
+ +> Tested on Vault 1.8.5 + +## Configuration + +```toml +[[inputs.vault]] + ## URL for the Vault agent + # url = "http://127.0.0.1:8200" + + ## Use Vault token for authorization. + ## Vault token configuration is mandatory. + ## If both are empty or both are set, an error is thrown. + # token_file = "/path/to/auth/token" + ## OR + token = "s.CDDrgg5zPv5ssI0Z2P4qxJj2" + + ## Set response_timeout (default 5 seconds) + # response_timeout = "5s" + + ## Optional TLS Config + # tls_ca = /path/to/cafile + # tls_cert = /path/to/certfile + # tls_key = /path/to/keyfile +``` + +## Metrics + +For a deeper understanding of Vault monitoring, please have a look at the following Vault documentation: + +- [https://www.vaultproject.io/docs/internals/telemetry](https://www.vaultproject.io/docs/internals/telemetry) +- [https://learn.hashicorp.com/tutorials/vault/monitor-telemetry-audit-splunk?in=vault/monitoring](https://learn.hashicorp.com/tutorials/vault/monitor-telemetry-audit-splunk?in=vault/monitoring) diff --git a/plugins/inputs/vault/testdata/response_key_metrics.json b/plugins/inputs/vault/testdata/response_key_metrics.json new file mode 100644 index 0000000000000..845acb3690d6b --- /dev/null +++ b/plugins/inputs/vault/testdata/response_key_metrics.json @@ -0,0 +1,40 @@ +{ + "Gauges": [ + { + "Name": "vault.core.unsealed", + "Value": 1, + "Labels": { + "cluster": "vault-cluster-23b671c7" + } + } + ], + "Counters": [ + { + "Name": "vault.raft.replication.appendEntries.logs", + "Count": 130, + "Rate": 0.2, + "Sum": 2, + "Min": 0, + "Max": 1, + "Mean": 0.015384615384615385, + "Stddev": 0.12355304447984486, + "Labels": { + "peer_id": "clustnode-02" + } + } + ], + "Samples": [ + { + "Name": "vault.token.lookup", + "Count": 5135, + "Rate": 87.21228296905755, + "Sum": 872.1228296905756, + "Min": 0.06690400093793869, + "Max": 16.22449493408203, + "Mean": 0.1698389152269865, + "Stddev": 0.24637634000854705, + "Labels": {} + } + ], + "Timestamp": "2021-11-30 15:49:00 +0000 UTC" +} 
diff --git a/plugins/inputs/vault/vault.go b/plugins/inputs/vault/vault.go
new file mode 100644
index 0000000000000..04524c78ab28d
--- /dev/null
+++ b/plugins/inputs/vault/vault.go
@@ -0,0 +1,240 @@
+package vault
+
+import (
+	"context"
+	"encoding/json"
+	"fmt"
+	"net/http"
+	"os"
+	"strings"
+	"time"
+
+	"github.com/influxdata/telegraf"
+	"github.com/influxdata/telegraf/config"
+	"github.com/influxdata/telegraf/internal"
+	"github.com/influxdata/telegraf/plugins/common/tls"
+	"github.com/influxdata/telegraf/plugins/inputs"
+)
+
+// Vault holds the plugin configuration: the address of the Vault agent to
+// query, the token sent with every request and the HTTP/TLS client settings.
+type Vault struct {
+	URL string `toml:"url"`
+
+	TokenFile string `toml:"token_file"`
+	Token     string `toml:"token"`
+
+	ResponseTimeout config.Duration `toml:"response_timeout"`
+
+	tls.ClientConfig
+
+	// roundTripper is created by Init and used for every request.
+	roundTripper http.RoundTripper
+}
+
+// timeLayout is the layout used to parse the "Timestamp" value of the metrics
+// payload.
+const timeLayout = "2006-01-02 15:04:05 -0700 MST"
+
+const sampleConfig = `
+  ## URL for the Vault agent
+  # url = "http://127.0.0.1:8200"
+
+  ## Use Vault token for authorization.
+  ## Vault token configuration is mandatory.
+  ## If both are empty or both are set, an error is thrown.
+  # token_file = "/path/to/auth/token"
+  ## OR
+  token = "s.CDDrgg5zPv5ssI0Z2P4qxJj2"
+
+  ## Set response_timeout (default 5 seconds)
+  # response_timeout = "5s"
+
+  ## Optional TLS Config
+  # tls_ca = /path/to/cafile
+  # tls_cert = /path/to/certfile
+  # tls_key = /path/to/keyfile
+`
+
+func init() {
+	inputs.Add("vault", func() telegraf.Input {
+		return &Vault{
+			ResponseTimeout: config.Duration(5 * time.Second),
+		}
+	})
+}
+
+// SampleConfig returns a sample config
+func (v *Vault) SampleConfig() string {
+	return sampleConfig
+}
+
+// Description returns a description of the plugin
+func (v *Vault) Description() string {
+	return "Read metrics from the Vault API"
+}
+
+// Init validates the configuration and prepares the HTTP transport. The URL
+// defaults to http://127.0.0.1:8200, exactly one of token and token_file must
+// be set, and a token read from token_file is stripped of surrounding
+// whitespace.
+func (v *Vault) Init() error {
+	if v.URL == "" {
+		v.URL = "http://127.0.0.1:8200"
+	}
+
+	if v.TokenFile == "" && v.Token == "" {
+		return fmt.Errorf("token missing")
+	}
+
+	if v.TokenFile != "" && v.Token != "" {
+		return fmt.Errorf("both token_file and token are set")
+	}
+
+	if v.TokenFile != "" {
+		token, err := os.ReadFile(v.TokenFile)
+		if err != nil {
+			return fmt.Errorf("reading file failed: %w", err)
+		}
+		v.Token = strings.TrimSpace(string(token))
+	}
+
+	tlsCfg, err := v.ClientConfig.TLSConfig()
+	if err != nil {
+		return fmt.Errorf("setting up TLS configuration failed: %w", err)
+	}
+
+	v.roundTripper = &http.Transport{
+		TLSHandshakeTimeout:   5 * time.Second,
+		TLSClientConfig:       tlsCfg,
+		ResponseHeaderTimeout: time.Duration(v.ResponseTimeout),
+	}
+
+	return nil
+}
+
+// Gather collects the metrics exposed by the Vault sys/metrics endpoint and
+// adds them to the accumulator.
+func (v *Vault) Gather(acc telegraf.Accumulator) error {
+	// Tolerate a trailing slash in the configured URL.
+	sysMetrics, err := v.loadJSON(strings.TrimRight(v.URL, "/") + "/v1/sys/metrics")
+	if err != nil {
+		return err
+	}
+
+	return buildVaultMetrics(acc, sysMetrics)
+}
+
+// loadJSON requests url with the configured token and decodes the JSON body of
+// a 200 response into a SysMetrics value. Any other status is an error.
+func (v *Vault) loadJSON(url string) (*SysMetrics, error) {
+	// ResponseHeaderTimeout only covers the wait for the response headers.
+	// Bound the whole exchange (connect, headers and body) as well so that a
+	// stalled connection cannot block Gather forever.
+	ctx := context.Background()
+	if timeout := time.Duration(v.ResponseTimeout); timeout > 0 {
+		var cancel context.CancelFunc
+		ctx, cancel = context.WithTimeout(ctx, timeout)
+		defer cancel()
+	}
+
+	req, err := http.NewRequestWithContext(ctx, http.MethodGet, url, nil)
+	if err != nil {
+		return nil, err
+	}
+
+	req.Header.Set("X-Vault-Token", v.Token)
+	req.Header.Add("Accept", "application/json")
+
+	resp, err := v.roundTripper.RoundTrip(req)
+	if err != nil {
+		return nil, fmt.Errorf("error making HTTP request to %s: %w", url, err)
+	}
+	defer resp.Body.Close()
+
+	if resp.StatusCode != http.StatusOK {
+		return nil, fmt.Errorf("%s returned HTTP status %s", url, resp.Status)
+	}
+
+	var metrics SysMetrics
+	if err := json.NewDecoder(resp.Body).Decode(&metrics); err != nil {
+		return nil, fmt.Errorf("error parsing json response: %w", err)
+	}
+
+	return &metrics, nil
+}
+
+// buildVaultMetrics turns the decoded payload into Telegraf metrics and adds
+// them to the accumulator. Every metric carries the payload timestamp and its
+// labels as tags. Samples are added with AddCounter, like counters.
+func buildVaultMetrics(acc telegraf.Accumulator, sysMetrics *SysMetrics) error {
+	t, err := time.Parse(timeLayout, sysMetrics.Timestamp)
+	if err != nil {
+		return fmt.Errorf("error parsing time: %w", err)
+	}
+
+	for _, c := range sysMetrics.Counters {
+		tags, err := labelsToTags("counter", c.Labels)
+		if err != nil {
+			return err
+		}
+
+		fields := map[string]interface{}{
+			"count":  c.Count,
+			"rate":   c.Rate,
+			"sum":    c.Sum,
+			"min":    c.Min,
+			"max":    c.Max,
+			"mean":   c.Mean,
+			"stddev": c.Stddev,
+		}
+		acc.AddCounter(c.Name, fields, tags, t)
+	}
+
+	for _, g := range sysMetrics.Gauges {
+		tags, err := labelsToTags("gauges", g.Labels)
+		if err != nil {
+			return err
+		}
+
+		fields := map[string]interface{}{
+			"value": g.Value,
+		}
+		acc.AddGauge(g.Name, fields, tags, t)
+	}
+
+	for _, s := range sysMetrics.Summaries {
+		tags, err := labelsToTags("summary", s.Labels)
+		if err != nil {
+			return err
+		}
+
+		fields := map[string]interface{}{
+			"count":  s.Count,
+			"rate":   s.Rate,
+			"sum":    s.Sum,
+			"stddev": s.Stddev,
+			"min":    s.Min,
+			"max":    s.Max,
+			"mean":   s.Mean,
+		}
+		acc.AddCounter(s.Name, fields, tags, t)
+	}
+
+	return nil
+}
+
+// labelsToTags converts the labels of a metric into Telegraf tags. kind names
+// the metric family and is only used in the error message.
+func labelsToTags(kind string, labels map[string]interface{}) (map[string]string, error) {
+	tags := make(map[string]string, len(labels))
+	for key, val := range labels {
+		converted, err := internal.ToString(val)
+		if err != nil {
+			return nil, fmt.Errorf("converting %s %s=%v failed: %w", kind, key, val, err)
+		}
+		tags[key] = converted
+	}
+
+	return tags, nil
+}
diff --git a/plugins/inputs/vault/vault_metrics.go b/plugins/inputs/vault/vault_metrics.go
new file mode 100644
index 0000000000000..8100f98915d22
--- /dev/null
+++ b/plugins/inputs/vault/vault_metrics.go
@@ -0,0 +1,46 @@
+package vault
+
+// SysMetrics is the JSON document returned by the Vault sys/metrics endpoint.
+// The "Timestamp" key is matched case-insensitively by encoding/json.
+type SysMetrics struct {
+	Timestamp string    `json:"timestamp"`
+	Gauges    []gauge   `json:"Gauges"`
+	Counters  []counter `json:"Counters"`
+	Summaries []summary `json:"Samples"`
+}
+
+// baseInfo holds the name and labels shared by every metric family.
+type baseInfo struct {
+	Name   string                 `json:"Name"`
+	Labels map[string]interface{} `json:"Labels"`
+}
+
+// gauge is one entry of the "Gauges" list.
+type gauge struct {
+	baseInfo
+	Value int `json:"Value"`
+}
+
+// counter is one entry of the "Counters" list.
+type counter struct {
+	baseInfo
+	Count  int     `json:"Count"`
+	Rate   float64 `json:"Rate"`
+	Sum    int     `json:"Sum"`
+	Min    int     `json:"Min"`
+	Max    int     `json:"Max"`
+	Mean   float64 `json:"Mean"`
+	Stddev float64 `json:"Stddev"`
+}
+
+// summary is one entry of the "Samples" list.
+type summary struct {
+	baseInfo
+	Count  int     `json:"Count"`
+	Rate   float64 `json:"Rate"`
+	Sum    float64 `json:"Sum"`
+	Min    float64 `json:"Min"`
+	Max    float64 `json:"Max"`
+	Mean   float64 `json:"Mean"`
+	Stddev float64 `json:"Stddev"`
+}
diff --git a/plugins/inputs/vault/vault_test.go b/plugins/inputs/vault/vault_test.go
new file mode 100644
index 0000000000000..1cf72584d1cb4
--- /dev/null
+++ b/plugins/inputs/vault/vault_test.go
@@ -0,0 +1,105 @@
+package vault
+
+import (
+	"fmt"
+	"io/ioutil"
+	"net/http"
+	"net/http/httptest"
+	"testing"
+	"time"
+
+	"github.com/influxdata/telegraf"
+	"github.com/influxdata/telegraf/testutil"
+	"github.com/stretchr/testify/require"
+)
+
+// TestVaultStats serves the recorded sys/metrics payload from a local HTTP
+// server and checks the metrics produced by Gather.
+func TestVaultStats(t *testing.T) {
+	var applyTests = []struct {
+		name     string
+		expected []telegraf.Metric
+	}{
+		{
+			name: "Metrics",
+			expected: []telegraf.Metric{
+				testutil.MustMetric(
+					"vault.raft.replication.appendEntries.logs",
+					map[string]string{
+						"peer_id": "clustnode-02",
+					},
+					map[string]interface{}{
+						"count":  int(130),
+						"rate":   float64(0.2),
+						"sum":    int(2),
+						"min":    int(0),
+						"max":    int(1),
+						"mean":   float64(0.015384615384615385),
+						"stddev": float64(0.12355304447984486),
+					},
+					time.Unix(1638287340, 0),
+					telegraf.Counter,
+				),
+				testutil.MustMetric(
+					"vault.core.unsealed",
+					map[string]string{
+						"cluster": "vault-cluster-23b671c7",
+					},
+					map[string]interface{}{
+						"value": int(1),
+					},
+					time.Unix(1638287340, 0),
+					telegraf.Gauge,
+				),
+				testutil.MustMetric(
+					"vault.token.lookup",
+					map[string]string{},
+					map[string]interface{}{
+						"count":  int(5135),
+						"max":    float64(16.22449493408203),
+						"mean":   float64(0.1698389152269865),
+						"min":    float64(0.06690400093793869),
+						"rate":   float64(87.21228296905755),
+						"stddev": float64(0.24637634000854705),
+						"sum":    float64(872.1228296905756),
+					},
+					time.Unix(1638287340, 0),
+					telegraf.Counter,
+				),
+			},
+		},
+	}
+
+	for _, tt := range applyTests {
+		t.Run(tt.name, func(t *testing.T) {
+			// Read the fixture on the test goroutine: require calls t.FailNow,
+			// which must not be called from the handler goroutine.
+			responseKeyMetrics, err := ioutil.ReadFile("testdata/response_key_metrics.json")
+			require.NoError(t, err)
+
+			ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
+				if r.RequestURI != "/v1/sys/metrics" {
+					w.WriteHeader(http.StatusNotFound)
+					return
+				}
+
+				w.WriteHeader(http.StatusOK)
+				if _, err := fmt.Fprintln(w, string(responseKeyMetrics)); err != nil {
+					t.Errorf("writing response failed: %v", err)
+				}
+			}))
+			defer ts.Close()
+
+			plugin := &Vault{
+				URL:   ts.URL,
+				Token: "s.CDDrgg5zPv5ssI0Z2P4qxJj2",
+			}
+			require.NoError(t, plugin.Init())
+
+			acc := testutil.Accumulator{}
+			require.NoError(t, plugin.Gather(&acc))
+
+			testutil.RequireMetricsEqual(t, tt.expected, acc.GetTelegrafMetrics())
+		})
+	}
+}