Skip to content

Commit

Permalink
Adding integration test for elasticsearch Metricbeat module, xpack co…
Browse files Browse the repository at this point in the history
…de path (elastic#15975) (elastic#16008)

* Define x-pack metricsets for testing purposes

* Extract test setup into function

* Move test skipping to correct level

* Add integration test for xpack.enabled:true data path

* Remove debugging statement

* Replace usages of assert.* with require.*

* One more use of require.*

* Fixing method call args
  • Loading branch information
ycombinator authored Feb 11, 2020
1 parent f6822f1 commit 1f6ac35
Showing 1 changed file with 142 additions and 17 deletions.
159 changes: 142 additions & 17 deletions metricbeat/module/elasticsearch/elasticsearch_integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ import (
"github.com/pkg/errors"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

"github.com/elastic/beats/libbeat/common"
"github.com/elastic/beats/libbeat/tests/compose"
Expand Down Expand Up @@ -62,33 +63,32 @@ var metricSets = []string{
"shard",
}

var xpackMetricSets = []string{
"ccr",
"enrich",
"cluster_stats",
"index",
"index_recovery",
"index_summary",
"ml_job",
"node_stats",
"shard",
}

func TestFetch(t *testing.T) {
service := compose.EnsureUpWithTimeout(t, 300, "elasticsearch")

host := service.Host()
err := createIndex(host)
assert.NoError(t, err)

version, err := getElasticsearchVersion(host)
if err != nil {
t.Fatal("getting elasticsearch version", err)
}

err = enableTrialLicense(host, version)
assert.NoError(t, err)

err = createMLJob(host, version)
assert.NoError(t, err)

err = createCCRStats(host)
assert.NoError(t, err)

err = createEnrichStats(host)
assert.NoError(t, err)
setupTest(t, host, version)

for _, metricSet := range metricSets {
checkSkip(t, metricSet, version)
t.Run(metricSet, func(t *testing.T) {
checkSkip(t, metricSet, version)
f := mbtest.NewReportingMetricSetV2Error(t, getConfig(metricSet, host))
events, errs := mbtest.ReportingFetchV2Error(f)

Expand All @@ -104,7 +104,6 @@ func TestFetch(t *testing.T) {

func TestData(t *testing.T) {
service := compose.EnsureUpWithTimeout(t, 300, "elasticsearch")

host := service.Host()

version, err := getElasticsearchVersion(host)
Expand All @@ -113,8 +112,8 @@ func TestData(t *testing.T) {
}

for _, metricSet := range metricSets {
checkSkip(t, metricSet, version)
t.Run(metricSet, func(t *testing.T) {
checkSkip(t, metricSet, version)
f := mbtest.NewReportingMetricSetV2Error(t, getConfig(metricSet, host))
err := mbtest.WriteEventsReporterV2Error(f, t, metricSet)
if err != nil {
Expand All @@ -124,6 +123,76 @@ func TestData(t *testing.T) {
}
}

func TestXPackEnabled(t *testing.T) {
service := compose.EnsureUpWithTimeout(t, 300, "elasticsearch")
host := service.Host()

version, err := getElasticsearchVersion(host)
require.NoError(t, err)

setupTest(t, host, version)

metricSetToTypesMap := map[string][]string{
"ccr": []string{"ccr_stats", "ccr_auto_follow_stats"},
"cluster_stats": []string{"cluster_stats"},
"enrich": []string{"enrich_coordinator_stats"},
"index_recovery": []string{"index_recovery"},
"index_summary": []string{"indices_stats"},
"ml_job": []string{"job_stats"},
"node_stats": []string{"node_stats"},
}

config := getXPackConfig(host)

metricSets := mbtest.NewReportingMetricSetV2Errors(t, config)
for _, metricSet := range metricSets {
t.Run(metricSet.Name(), func(t *testing.T) {
checkSkip(t, metricSet.Name(), version)
events, errs := mbtest.ReportingFetchV2Error(metricSet)
require.Empty(t, errs)
require.NotEmpty(t, events)

// Special case: the `index` metricset generates as many events
// as there are distinct indices in Elasticsearch
if metricSet.Name() == "index" {
numIndices, err := countIndices(host)
require.NoError(t, err)
require.Len(t, events, numIndices)

for _, event := range events {
require.Equal(t, "index_stats", event.RootFields["type"])
require.Regexp(t, `^.monitoring-es-\d-mb`, event.Index)
}

return
}

// Special case: the `shard` metricset generates as many events
// as there are distinct shards in Elasticsearch
if metricSet.Name() == "shard" {
numShards, err := countShards(host)
require.NoError(t, err)
require.Len(t, events, numShards)

for _, event := range events {
require.Equal(t, "shards", event.RootFields["type"])
require.Regexp(t, `^.monitoring-es-\d-mb`, event.Index)
}

return
}

types := metricSetToTypesMap[metricSet.Name()]
require.Len(t, events, len(types))

for i, event := range events {
require.Equal(t, types[i], event.RootFields["type"])
require.Regexp(t, `^.monitoring-es-\d-mb`, event.Index)
}
})
}
}

// GetConfig returns config for elasticsearch module
func getConfig(metricset string, host string) map[string]interface{} {
return map[string]interface{}{
Expand All @@ -134,6 +203,32 @@ func getConfig(metricset string, host string) map[string]interface{} {
}
}

func getXPackConfig(host string) map[string]interface{} {
return map[string]interface{}{
"module": elasticsearch.ModuleName,
"metricsets": xpackMetricSets,
"hosts": []string{host},
"xpack.enabled": true,
}
}

func setupTest(t *testing.T, esHost string, esVersion *common.Version) {
err := createIndex(esHost)
require.NoError(t, err)

err = enableTrialLicense(esHost, esVersion)
require.NoError(t, err)

err = createMLJob(esHost, esVersion)
require.NoError(t, err)

err = createCCRStats(esHost)
require.NoError(t, err)

err = createEnrichStats(esHost)
require.NoError(t, err)
}

// createIndex creates and elasticsearch index in case it does not exit yet
func createIndex(host string) error {
client := &http.Client{}
Expand Down Expand Up @@ -451,6 +546,36 @@ func ingestAndEnrichDoc(host string) error {
return err
}

func countIndices(elasticsearchHostPort string) (int, error) {
return countCatItems(elasticsearchHostPort, "indices")

}

func countShards(elasticsearchHostPort string) (int, error) {
return countCatItems(elasticsearchHostPort, "shards")
}

func countCatItems(elasticsearchHostPort, catObject string) (int, error) {
resp, err := http.Get("http://" + elasticsearchHostPort + "/_cat/" + catObject + "?format=json")
if err != nil {
return 0, err
}
defer resp.Body.Close()

body, err := ioutil.ReadAll(resp.Body)
if err != nil {
return 0, err
}

var data []common.MapStr
err = json.Unmarshal(body, &data)
if err != nil {
return 0, err
}

return len(data), nil
}

func checkSkip(t *testing.T, metricset string, version *common.Version) {
checkSkipFeature := func(name string, availableVersion *common.Version) {
isAPIAvailable := elastic.IsFeatureAvailable(version, availableVersion)
Expand Down

0 comments on commit 1f6ac35

Please sign in to comment.