diff --git a/CHANGELOG.next.asciidoc b/CHANGELOG.next.asciidoc index 1d5cafcb155b..14ab2ec89f7b 100644 --- a/CHANGELOG.next.asciidoc +++ b/CHANGELOG.next.asciidoc @@ -53,6 +53,7 @@ https://github.com/elastic/beats/compare/v7.0.0-alpha2...master[Check the HEAD d - Fix Elasticsearch license endpoint URL referenced in error message. {issue}17880[17880] {pull}18030[18030] - Change `decode_json_fields` processor, to merge parsed json objects with existing objects in the event instead of fully replacing them. {pull}17958[17958] - Explicitly detect missing variables in autodiscover configuration, log them at the debug level. {issue}20568[20568] {pull}20898[20898] +- Fix `libbeat.output.write.bytes` and `libbeat.output.read.bytes` metrics of the Elasticsearch output. {issue}20752[20752] {pull}21197[21197] *Auditbeat* diff --git a/libbeat/outputs/elasticsearch/client.go b/libbeat/outputs/elasticsearch/client.go index 3afa7084057d..0794ee1c13bd 100644 --- a/libbeat/outputs/elasticsearch/client.go +++ b/libbeat/outputs/elasticsearch/client.go @@ -90,6 +90,7 @@ func NewClient( Kerberos: s.Kerberos, Proxy: s.Proxy, ProxyDisable: s.ProxyDisable, + Observer: s.Observer, Parameters: s.Parameters, CompressionLevel: s.CompressionLevel, EscapeHTML: s.EscapeHTML, diff --git a/libbeat/outputs/elasticsearch/client_integration_test.go b/libbeat/outputs/elasticsearch/client_integration_test.go index 9abbbe398738..e243cb7d1e46 100644 --- a/libbeat/outputs/elasticsearch/client_integration_test.go +++ b/libbeat/outputs/elasticsearch/client_integration_test.go @@ -40,6 +40,7 @@ import ( "github.com/elastic/beats/v7/libbeat/esleg/eslegtest" "github.com/elastic/beats/v7/libbeat/idxmgmt" "github.com/elastic/beats/v7/libbeat/logp" + "github.com/elastic/beats/v7/libbeat/monitoring" "github.com/elastic/beats/v7/libbeat/outputs" "github.com/elastic/beats/v7/libbeat/outputs/outest" ) @@ -78,7 +79,7 @@ func TestClientPublishEventKerberosAware(t *testing.T) { } func testPublishEvent(t *testing.T, index string, cfg map[string]interface{}) { - output, client := connectTestEs(t, cfg) + output, client := connectTestEsWithStats(t, cfg, index) // drop old index preparing test client.conn.Delete(index, "", "", nil) @@ -107,6 +108,12 @@ func testPublishEvent(t *testing.T, index string, cfg map[string]interface{}) { } assert.Equal(t, 1, resp.Count) + + outputSnapshot := monitoring.CollectFlatSnapshot(monitoring.Default.GetRegistry("output-"+index), monitoring.Full, true) + assert.Greater(t, outputSnapshot.Ints["write.bytes"], int64(0), "output.events.write.bytes must be greater than 0") + assert.Greater(t, outputSnapshot.Ints["read.bytes"], int64(0), "output.events.read.bytes must be greater than 0") + assert.Equal(t, int64(0), outputSnapshot.Ints["write.errors"]) + assert.Equal(t, int64(0), outputSnapshot.Ints["read.errors"]) } func TestClientPublishEventWithPipeline(t *testing.T) { @@ -117,7 +124,7 @@ func TestClientPublishEventWithPipeline(t *testing.T) { index := "beat-int-pub-single-with-pipeline" pipeline := "beat-int-pub-single-pipeline" - output, client := connectTestEs(t, obj{ + output, client := connectTestEsWithoutStats(t, obj{ "index": index, "pipeline": "%{[pipeline]}", }) @@ -199,7 +206,7 @@ func TestClientBulkPublishEventsWithPipeline(t *testing.T) { index := "beat-int-pub-bulk-with-pipeline" pipeline := "beat-int-pub-bulk-pipeline" - output, client := connectTestEs(t, obj{ + output, client := connectTestEsWithoutStats(t, obj{ "index": index, "pipeline": "%{[pipeline]}", }) @@ -276,7 +283,7 @@ func TestClientBulkPublishEventsWithPipeline(t *testing.T) { func TestClientPublishTracer(t *testing.T) { index := "beat-apm-tracer-test" - output, client := connectTestEs(t, map[string]interface{}{ + output, client := connectTestEsWithoutStats(t, map[string]interface{}{ "index": index, }) @@ -314,7 +321,17 @@ func TestClientPublishTracer(t *testing.T) { assert.Equal(t, "/_bulk", secondSpan.Context.HTTP.URL.Path) } -func connectTestEs(t *testing.T, cfg interface{}) (outputs.Client, *Client) { +func connectTestEsWithStats(t *testing.T, cfg interface{}, suffix string) (outputs.Client, *Client) { + m := monitoring.Default.NewRegistry("output-" + suffix) + s := outputs.NewStats(m) + return connectTestEs(t, cfg, s) +} + +func connectTestEsWithoutStats(t *testing.T, cfg interface{}) (outputs.Client, *Client) { + return connectTestEs(t, cfg, outputs.NewNilObserver()) +} + +func connectTestEs(t *testing.T, cfg interface{}, stats outputs.Observer) (outputs.Client, *Client) { config, err := common.NewConfigFrom(map[string]interface{}{ "hosts": eslegtest.GetEsHost(), "username": eslegtest.GetUser(), @@ -337,7 +354,7 @@ func connectTestEs(t *testing.T, cfg interface{}) (outputs.Client, *Client) { info := beat.Info{Beat: "libbeat"} im, _ := idxmgmt.DefaultSupport(nil, info, nil) - output, err := makeES(im, info, outputs.NewNilObserver(), config) + output, err := makeES(im, info, stats, config) if err != nil { t.Fatal(err) } @@ -356,7 +373,7 @@ func connectTestEs(t *testing.T, cfg interface{}) (outputs.Client, *Client) { // setupRoleMapping sets up role mapping for the Kerberos user beats@ELASTIC func setupRoleMapping(t *testing.T, host string) error { - _, client := connectTestEs(t, map[string]interface{}{ + _, client := connectTestEsWithoutStats(t, map[string]interface{}{ "hosts": host, "username": "elastic", "password": "changeme",