Skip to content

Commit

Permalink
[Metricbeat]Combine metrics with no dimension into one event (#17345)
Browse files Browse the repository at this point in the history
* Combine metrics with no dimension
* Add unit tests for CreateEvents function w/o dimensions
  • Loading branch information
kaiyan-sheng authored Apr 1, 2020
1 parent 58a3ddc commit d853f27
Show file tree
Hide file tree
Showing 5 changed files with 257 additions and 25 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,7 @@ https://github.com/elastic/beats/compare/v7.0.0-alpha2...master[Check the HEAD d
- Use max in k8s overview dashboard aggregations. {pull}17015[17015]
- Fix Disk Used and Disk Usage visualizations in the Metricbeat System dashboards. {issue}12435[12435] {pull}17272[17272]
- Fix missing Accept header for Prometheus and OpenMetrics module. {issue}16870[16870] {pull}17291[17291]
- Combine cloudwatch aggregated metrics into single event. {pull}17345[17345]

*Packetbeat*

Expand Down
1 change: 1 addition & 0 deletions metricbeat/docs/modules/aws.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,7 @@ image::./images/metricbeat-aws-elb-overview.png[]
[float]
=== `lambda`
When an invocation completes, Lambda sends a set of metrics to CloudWatch for that invocation.
Default period in aws module configuration is set to `5m` for lambda metricset.
The lambda metricset comes with a predefined dashboard:

image::./images/metricbeat-aws-lambda-overview.png[]
Expand Down
1 change: 1 addition & 0 deletions x-pack/metricbeat/module/aws/_meta/docs.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,7 @@ image::./images/metricbeat-aws-elb-overview.png[]
[float]
=== `lambda`
When an invocation completes, Lambda sends a set of metrics to CloudWatch for that invocation.
Default period in aws module configuration is set to `5m` for lambda metricset.
The lambda metricset comes with a predefined dashboard:

image::./images/metricbeat-aws-lambda-overview.png[]
Expand Down
48 changes: 23 additions & 25 deletions x-pack/metricbeat/module/aws/cloudwatch/cloudwatch.go
Original file line number Diff line number Diff line change
Expand Up @@ -144,12 +144,12 @@ func (m *MetricSet) Fetch(report mb.ReporterV2) error {
svcResourceAPI := resourcegroupstaggingapi.New(awscommon.EnrichAWSConfigWithEndpoint(
m.Endpoint, "tagging", regionName, awsConfig))

eventsWithIdentifier, eventsNoIdentifier, err := m.createEvents(svcCloudwatch, svcResourceAPI, listMetricDetailTotal.metricsWithStats, listMetricDetailTotal.resourceTypeFilters, regionName, startTime, endTime)
eventsWithIdentifier, err := m.createEvents(svcCloudwatch, svcResourceAPI, listMetricDetailTotal.metricsWithStats, listMetricDetailTotal.resourceTypeFilters, regionName, startTime, endTime)
if err != nil {
return errors.Wrap(err, "createEvents failed for region "+regionName)
}

err = reportEvents(eventsWithIdentifier, eventsNoIdentifier, report)
err = reportEvents(eventsWithIdentifier, report)
if err != nil {
return errors.Wrap(err, "reportEvents failed")
}
Expand Down Expand Up @@ -183,12 +183,12 @@ func (m *MetricSet) Fetch(report mb.ReporterV2) error {
// get resource type filters and tags filters for each namespace
resourceTypeTagFilters := constructTagsFilters(namespaceDetails)

eventsWithIdentifier, eventsNoIdentifier, err := m.createEvents(svcCloudwatch, svcResourceAPI, filteredMetricWithStatsTotal, resourceTypeTagFilters, regionName, startTime, endTime)
eventsWithIdentifier, err := m.createEvents(svcCloudwatch, svcResourceAPI, filteredMetricWithStatsTotal, resourceTypeTagFilters, regionName, startTime, endTime)
if err != nil {
return errors.Wrap(err, "createEvents failed for region "+regionName)
}

err = reportEvents(eventsWithIdentifier, eventsNoIdentifier, report)
err = reportEvents(eventsWithIdentifier, report)
if err != nil {
return errors.Wrap(err, "reportEvents failed")
}
Expand Down Expand Up @@ -427,23 +427,20 @@ func insertRootFields(event mb.Event, metricValue float64, labels []string) mb.E
return event
}

func (m *MetricSet) createEvents(svcCloudwatch cloudwatchiface.ClientAPI, svcResourceAPI resourcegroupstaggingapiiface.ClientAPI, listMetricWithStatsTotal []metricsWithStatistics, resourceTypeTagFilters map[string][]aws.Tag, regionName string, startTime time.Time, endTime time.Time) (map[string]mb.Event, []mb.Event, error) {
func (m *MetricSet) createEvents(svcCloudwatch cloudwatchiface.ClientAPI, svcResourceAPI resourcegroupstaggingapiiface.ClientAPI, listMetricWithStatsTotal []metricsWithStatistics, resourceTypeTagFilters map[string][]aws.Tag, regionName string, startTime time.Time, endTime time.Time) (map[string]mb.Event, error) {
// Initialize events for each identifier.
events := map[string]mb.Event{}

// Initialize events for the ones without identifiers.
var eventsNoIdentifier []mb.Event

// Construct metricDataQueries
metricDataQueries := createMetricDataQueries(listMetricWithStatsTotal, m.Period)
if len(metricDataQueries) == 0 {
return events, eventsNoIdentifier, nil
return events, nil
}

// Use metricDataQueries to make GetMetricData API calls
metricDataResults, err := aws.GetMetricDataResults(metricDataQueries, svcCloudwatch, startTime, endTime)
if err != nil {
return events, eventsNoIdentifier, errors.Wrap(err, "GetMetricDataResults failed")
return events, errors.Wrap(err, "GetMetricDataResults failed")
}

// Find a timestamp for all metrics in output
Expand All @@ -459,9 +456,12 @@ func (m *MetricSet) createEvents(svcCloudwatch cloudwatchiface.ClientAPI, svcRes
if exists {
labels := strings.Split(*output.Label, labelSeperator)
if len(labels) != 5 {
eventNew := aws.InitEvent(regionName, m.AccountName, m.AccountID)
eventNew = insertRootFields(eventNew, output.Values[timestampIdx], labels)
eventsNoIdentifier = append(eventsNoIdentifier, eventNew)
// when there is no identifier value in label, use region+accountID+namespace instead
identifier := regionName + m.AccountID + labels[namespaceIdx]
if _, ok := events[identifier]; !ok {
events[identifier] = aws.InitEvent(regionName, m.AccountName, m.AccountID)
}
events[identifier] = insertRootFields(events[identifier], output.Values[timestampIdx], labels)
continue
}

Expand All @@ -473,7 +473,7 @@ func (m *MetricSet) createEvents(svcCloudwatch cloudwatchiface.ClientAPI, svcRes
}
}
}
return events, eventsNoIdentifier, nil
return events, nil
}

// Get tags
Expand Down Expand Up @@ -509,9 +509,13 @@ func (m *MetricSet) createEvents(svcCloudwatch cloudwatchiface.ClientAPI, svcRes
if len(tagsFilter) != 0 {
continue
}
eventNew := aws.InitEvent(regionName, m.AccountName, m.AccountID)
eventNew = insertRootFields(eventNew, output.Values[timestampIdx], labels)
eventsNoIdentifier = append(eventsNoIdentifier, eventNew)

// when there is no identifier value in label, use region+accountID+namespace instead
identifier := regionName + m.AccountID + labels[namespaceIdx]
if _, ok := events[identifier]; !ok {
events[identifier] = aws.InitEvent(regionName, m.AccountName, m.AccountID)
}
events[identifier] = insertRootFields(events[identifier], output.Values[timestampIdx], labels)
continue
}

Expand All @@ -534,21 +538,15 @@ func (m *MetricSet) createEvents(svcCloudwatch cloudwatchiface.ClientAPI, svcRes
}
}
}
return events, eventsNoIdentifier, nil
return events, nil
}

func reportEvents(eventsWithIdentifier map[string]mb.Event, eventsNoIdentifier []mb.Event, report mb.ReporterV2) error {
func reportEvents(eventsWithIdentifier map[string]mb.Event, report mb.ReporterV2) error {
for _, event := range eventsWithIdentifier {
if reported := report.Event(event); !reported {
return nil
}
}

for _, event := range eventsNoIdentifier {
if reported := report.Event(event); !reported {
return nil
}
}
return nil
}

Expand Down
231 changes: 231 additions & 0 deletions x-pack/metricbeat/module/aws/cloudwatch/cloudwatch_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,17 +7,37 @@
package cloudwatch

import (
"net/http"
"testing"
"time"

awssdk "github.com/aws/aws-sdk-go-v2/aws"
"github.com/aws/aws-sdk-go-v2/service/cloudwatch"
"github.com/aws/aws-sdk-go-v2/service/cloudwatch/cloudwatchiface"
"github.com/aws/aws-sdk-go-v2/service/resourcegroupstaggingapi"
"github.com/aws/aws-sdk-go-v2/service/resourcegroupstaggingapi/resourcegroupstaggingapiiface"
"github.com/pkg/errors"
"github.com/stretchr/testify/assert"

"github.com/elastic/beats/v7/x-pack/metricbeat/module/aws"
)

var (
regionName = "us-west-1"
timestamp = time.Now()
accountID = "123456789012"

id1 = "cpu"
value1 = 0.25
label1 = "CPUUtilization|AWS/EC2|Average|InstanceId|i-1"

id2 = "disk"
value2 = 5.0
label2 = "DiskReadOps|AWS/EC2|Average|InstanceId|i-1"

label3 = "CPUUtilization|AWS/EC2|Average"
label4 = "DiskReadOps|AWS/EC2|Average"

instanceID1 = "i-1"
instanceID2 = "i-2"
namespace = "AWS/EC2"
Expand Down Expand Up @@ -1075,3 +1095,214 @@ func TestCheckStatistics(t *testing.T) {
})
}
}

// MockCloudWatchClient struct is used for unit tests.
type MockCloudWatchClient struct {
cloudwatchiface.ClientAPI
}

// MockCloudWatchClientWithoutDim struct is used for unit tests.
type MockCloudWatchClientWithoutDim struct {
cloudwatchiface.ClientAPI
}

// MockResourceGroupsTaggingClient is used for unit tests.
type MockResourceGroupsTaggingClient struct {
resourcegroupstaggingapiiface.ClientAPI
}

func (m *MockCloudWatchClient) ListMetricsRequest(input *cloudwatch.ListMetricsInput) cloudwatch.ListMetricsRequest {
dim := cloudwatch.Dimension{
Name: &dimName,
Value: &instanceID1,
}
httpReq, _ := http.NewRequest("", "", nil)
return cloudwatch.ListMetricsRequest{
Request: &awssdk.Request{
Data: &cloudwatch.ListMetricsOutput{
Metrics: []cloudwatch.Metric{
{
MetricName: &metricName1,
Namespace: &namespace,
Dimensions: []cloudwatch.Dimension{dim},
},
},
},
HTTPRequest: httpReq,
},
}
}

func (m *MockCloudWatchClient) GetMetricDataRequest(input *cloudwatch.GetMetricDataInput) cloudwatch.GetMetricDataRequest {
httpReq, _ := http.NewRequest("", "", nil)

return cloudwatch.GetMetricDataRequest{
Request: &awssdk.Request{
Data: &cloudwatch.GetMetricDataOutput{
MetricDataResults: []cloudwatch.MetricDataResult{
{
Id: &id1,
Label: &label1,
Values: []float64{value1},
Timestamps: []time.Time{timestamp},
},
{
Id: &id2,
Label: &label2,
Values: []float64{value2},
Timestamps: []time.Time{timestamp},
},
},
},
HTTPRequest: httpReq,
},
}
}

func (m *MockCloudWatchClientWithoutDim) ListMetricsRequest(input *cloudwatch.ListMetricsInput) cloudwatch.ListMetricsRequest {
httpReq, _ := http.NewRequest("", "", nil)
return cloudwatch.ListMetricsRequest{
Request: &awssdk.Request{
Data: &cloudwatch.ListMetricsOutput{
Metrics: []cloudwatch.Metric{
{
MetricName: &metricName1,
Namespace: &namespace,
},
},
},
HTTPRequest: httpReq,
},
}
}

func (m *MockCloudWatchClientWithoutDim) GetMetricDataRequest(input *cloudwatch.GetMetricDataInput) cloudwatch.GetMetricDataRequest {
httpReq, _ := http.NewRequest("", "", nil)

return cloudwatch.GetMetricDataRequest{
Request: &awssdk.Request{
Data: &cloudwatch.GetMetricDataOutput{
MetricDataResults: []cloudwatch.MetricDataResult{
{
Id: &id1,
Label: &label3,
Values: []float64{value1},
Timestamps: []time.Time{timestamp},
},
{
Id: &id2,
Label: &label4,
Values: []float64{value2},
Timestamps: []time.Time{timestamp},
},
},
},
HTTPRequest: httpReq,
},
}
}

func (m *MockResourceGroupsTaggingClient) GetResourcesRequest(input *resourcegroupstaggingapi.GetResourcesInput) resourcegroupstaggingapi.GetResourcesRequest {
httpReq, _ := http.NewRequest("", "", nil)
return resourcegroupstaggingapi.GetResourcesRequest{
Request: &awssdk.Request{
Data: &resourcegroupstaggingapi.GetResourcesOutput{
PaginationToken: awssdk.String(""),
ResourceTagMappingList: []resourcegroupstaggingapi.ResourceTagMapping{
{
ResourceARN: awssdk.String("arn:aws:ec2:us-west-1:123456789012:instance:i-1"),
Tags: []resourcegroupstaggingapi.Tag{
{
Key: awssdk.String("name"),
Value: awssdk.String("test-ec2"),
},
},
},
},
},
HTTPRequest: httpReq,
},
}
}

func TestCreateEventsWithIdentifier(t *testing.T) {
m := MetricSet{}
m.CloudwatchConfigs = []Config{{Statistic: []string{"Average"}}}
m.MetricSet = &aws.MetricSet{Period: 5}

mockTaggingSvc := &MockResourceGroupsTaggingClient{}
mockCloudwatchSvc := &MockCloudWatchClient{}
listMetricWithStatsTotal := []metricsWithStatistics{{
cloudwatch.Metric{
Dimensions: []cloudwatch.Dimension{{
Name: awssdk.String("InstanceId"),
Value: awssdk.String("i-1"),
}},
MetricName: awssdk.String("CPUUtilization"),
Namespace: awssdk.String("AWS/EC2"),
},
[]string{"Average"},
nil,
}}
resourceTypeTagFilters := map[string][]aws.Tag{}
resourceTypeTagFilters["ec2:instance"] = []aws.Tag{
{
Key: "name",
Value: "test-ec2",
},
}
startTime, endTime := aws.GetStartTimeEndTime(m.MetricSet.Period)

events, err := m.createEvents(mockCloudwatchSvc, mockTaggingSvc, listMetricWithStatsTotal, resourceTypeTagFilters, regionName, startTime, endTime)
assert.NoError(t, err)

metricValue, err := events["i-1"].RootFields.GetValue("aws.ec2.metrics.CPUUtilization.avg")
assert.NoError(t, err)
assert.Equal(t, value1, metricValue)

dimension, err := events["i-1"].RootFields.GetValue("aws.dimensions.InstanceId")
assert.NoError(t, err)
assert.Equal(t, instanceID1, dimension)
}

func TestCreateEventsWithoutIdentifier(t *testing.T) {
m := MetricSet{}
m.CloudwatchConfigs = []Config{{Statistic: []string{"Average"}}}
m.MetricSet = &aws.MetricSet{Period: 5, AccountID: accountID}

mockTaggingSvc := &MockResourceGroupsTaggingClient{}
mockCloudwatchSvc := &MockCloudWatchClientWithoutDim{}
listMetricWithStatsTotal := []metricsWithStatistics{
{
cloudwatch.Metric{
MetricName: awssdk.String("CPUUtilization"),
Namespace: awssdk.String("AWS/EC2"),
},
[]string{"Average"},
nil,
},
{
cloudwatch.Metric{
MetricName: awssdk.String("DiskReadOps"),
Namespace: awssdk.String("AWS/EC2"),
},
[]string{"Average"},
nil,
},
}

resourceTypeTagFilters := map[string][]aws.Tag{}
startTime, endTime := aws.GetStartTimeEndTime(m.MetricSet.Period)

events, err := m.createEvents(mockCloudwatchSvc, mockTaggingSvc, listMetricWithStatsTotal, resourceTypeTagFilters, regionName, startTime, endTime)
assert.NoError(t, err)

expectedID := regionName + accountID + namespace
metricValue, err := events[expectedID].RootFields.GetValue("aws.ec2.metrics.CPUUtilization.avg")
assert.NoError(t, err)
assert.Equal(t, value1, metricValue)

dimension, err := events[expectedID].RootFields.GetValue("aws.ec2.metrics.DiskReadOps.avg")
assert.NoError(t, err)
assert.Equal(t, value2, dimension)
}

0 comments on commit d853f27

Please sign in to comment.