Skip to content

Commit

Permalink
Cherry-pick #18524 to 7.7: [Metricbeat] Fix tags_filter for cloudwatc…
Browse files Browse the repository at this point in the history
…h metricset in aws module (#18618)

* [Metricbeat] Fix tags_filter for cloudwatch metricset in aws module (#18524)

* Fix tags_filter for cloudwatch metricset
* if tags_filter is given, overwrite tags in cloudwatch specific config

(cherry picked from commit 5aa3d0c)
  • Loading branch information
kaiyan-sheng authored May 18, 2020
1 parent f34eeda commit d75329c
Show file tree
Hide file tree
Showing 4 changed files with 70 additions and 2 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -180,6 +180,7 @@ https://github.com/elastic/beats/compare/v7.0.0-alpha2...master[Check the HEAD d
- Add privileged option for Auditbeat in Openshift {pull}17637[17637]
- Fix storage metricset to allow config without region/zone. {issue}17623[17623] {pull}17624[17624]
- Fix overflow on Prometheus rates when new buckets are added on the go. {pull}17753[17753]
- Fix tags_filter for cloudwatch metricset in aws. {pull}18524[18524]

*Packetbeat*

Expand Down
5 changes: 5 additions & 0 deletions x-pack/metricbeat/module/aws/aws.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,9 @@ func NewMetricSet(base mb.BaseMetricSet) (*MetricSet, error) {
TagsFilter: config.TagsFilter,
}

base.Logger().Debug("Metricset level config for period: ", metricSet.Period)
base.Logger().Debug("Metricset level config for tags filter: ", metricSet.TagsFilter)

// Get IAM account name
awsConfig.Region = "us-east-1"
svcIam := iam.New(awscommon.EnrichAWSConfigWithEndpoint(
Expand Down Expand Up @@ -129,11 +132,13 @@ func NewMetricSet(base mb.BaseMetricSet) (*MetricSet, error) {
}

metricSet.RegionsList = completeRegionsList
base.Logger().Debug("Metricset level config for regions: ", metricSet.RegionsList)
return &metricSet, nil
}

// Construct MetricSet with specific regions list from config
metricSet.RegionsList = config.Regions
base.Logger().Debug("Metricset level config for regions: ", metricSet.RegionsList)
return &metricSet, nil
}

Expand Down
25 changes: 23 additions & 2 deletions x-pack/metricbeat/module/aws/cloudwatch/cloudwatch.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,7 @@ type namespaceDetail struct {
// New creates a new instance of the MetricSet. New is responsible for unpacking
// any MetricSet specific configuration options if there are any.
func New(base mb.BaseMetricSet) (mb.MetricSet, error) {
logger := logp.NewLogger(metricsetName)
metricSet, err := aws.NewMetricSet(base)
if err != nil {
return nil, errors.Wrap(err, "error creating aws metricset")
Expand All @@ -109,13 +110,14 @@ func New(base mb.BaseMetricSet) (mb.MetricSet, error) {
return nil, errors.Wrap(err, "error unpack raw module config using UnpackConfig")
}

logger.Debugf("cloudwatch config = %s", config)
if len(config.CloudwatchMetrics) == 0 {
return nil, errors.New("metrics in config is missing")
}

return &MetricSet{
MetricSet: metricSet,
logger: logp.NewLogger(metricsetName),
logger: logger,
CloudwatchConfigs: config.CloudwatchMetrics,
}, nil
}
Expand Down Expand Up @@ -288,6 +290,12 @@ func (m *MetricSet) readCloudwatchConfig() (listMetricWithDetail, map[string][]n
resourceTypesWithTags := map[string][]aws.Tag{}

for _, config := range m.CloudwatchConfigs {
// If tags_filter on metricset level is given, overwrite tags in
// cloudwatch metrics with tags_filter.
if m.MetricSet.TagsFilter != nil {
config.Tags = m.MetricSet.TagsFilter
}

// If there is no statistic method specified, then use the default.
if config.Statistic == nil {
config.Statistic = defaultStatistics
Expand Down Expand Up @@ -461,6 +469,8 @@ func (m *MetricSet) createEvents(svcCloudwatch cloudwatchiface.ClientAPI, svcRes

// Find a timestamp for all metrics in output
timestamp := aws.FindTimestamp(metricDataResults)

// Create events when there is no tags_filter or tags.resource_type_filter specified.
if len(resourceTypeTagFilters) == 0 {
if !timestamp.IsZero() {
for _, output := range metricDataResults {
Expand Down Expand Up @@ -492,8 +502,10 @@ func (m *MetricSet) createEvents(svcCloudwatch cloudwatchiface.ClientAPI, svcRes
return events, nil
}

// Get tags
// Create events with tags
for resourceType, tagsFilter := range resourceTypeTagFilters {
m.logger.Debugf("resourceType = %s", resourceType)
m.logger.Debugf("tagsFilter = %s", tagsFilter)
resourceTagMap, err := aws.GetResourcesTags(svcResourceAPI, []string{resourceType})
if err != nil {
// If GetResourcesTags failed, continue report event just without tags.
Expand All @@ -507,8 +519,11 @@ func (m *MetricSet) createEvents(svcCloudwatch cloudwatchiface.ClientAPI, svcRes
// filter resourceTagMap
for identifier, tags := range resourceTagMap {
if exists := aws.CheckTagFiltersExist(tagsFilter, tags); !exists {
m.logger.Debugf("In region %s, service %s tags does not match tags_filter", regionName, identifier)
delete(resourceTagMap, identifier)
continue
}
m.logger.Debugf("In region %s, service %s tags match tags_filter", regionName, identifier)
}

if !timestamp.IsZero() {
Expand Down Expand Up @@ -537,6 +552,12 @@ func (m *MetricSet) createEvents(svcCloudwatch cloudwatchiface.ClientAPI, svcRes

identifierValue := labels[identifierValueIdx]
if _, ok := events[identifierValue]; !ok {
// when tagsFilter is not empty but no entry in
// resourceTagMap for this identifier, do not initialize
// an event for this identifier.
if len(tagsFilter) != 0 && resourceTagMap[identifierValue] == nil {
continue
}
events[identifierValue] = aws.InitEvent(regionName, m.AccountName, m.AccountID)
}
events[identifierValue] = insertRootFields(events[identifierValue], output.Values[timestampIdx], labels)
Expand Down
41 changes: 41 additions & 0 deletions x-pack/metricbeat/module/aws/cloudwatch/cloudwatch_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -146,6 +146,7 @@ func TestConstructLabel(t *testing.T) {

func TestReadCloudwatchConfig(t *testing.T) {
m := MetricSet{}
m.MetricSet = &aws.MetricSet{Period: 5}
resourceTypeFiltersEC2 := map[string][]aws.Tag{}
resourceTypeFiltersEC2["ec2:instance"] = nil

Expand Down Expand Up @@ -1313,6 +1314,46 @@ func TestCreateEventsWithoutIdentifier(t *testing.T) {
assert.Equal(t, value2, dimension)
}

func TestCreateEventsWithTagsFilter(t *testing.T) {
m := MetricSet{}
m.CloudwatchConfigs = []Config{{Statistic: []string{"Average"}}}
m.MetricSet = &aws.MetricSet{Period: 5}
m.logger = logp.NewLogger("test")

mockTaggingSvc := &MockResourceGroupsTaggingClient{}
mockCloudwatchSvc := &MockCloudWatchClient{}
listMetricWithStatsTotal := []metricsWithStatistics{
{
cloudwatch.Metric{
Dimensions: []cloudwatch.Dimension{{
Name: awssdk.String("InstanceId"),
Value: awssdk.String("i-1"),
}},
MetricName: awssdk.String("CPUUtilization"),
Namespace: awssdk.String("AWS/EC2"),
},
[]string{"Average"},
[]aws.Tag{
{Key: "name", Value: "test-ec2"},
},
},
}

// Specify a tag filter that does not match the tag for i-1
resourceTypeTagFilters := map[string][]aws.Tag{}
resourceTypeTagFilters["ec2:instance"] = []aws.Tag{
{
Key: "name",
Value: "foo",
},
}
startTime, endTime := aws.GetStartTimeEndTime(m.MetricSet.Period)

events, err := m.createEvents(mockCloudwatchSvc, mockTaggingSvc, listMetricWithStatsTotal, resourceTypeTagFilters, regionName, startTime, endTime)
assert.NoError(t, err)
assert.Equal(t, 0, len(events))
}

func TestInsertTags(t *testing.T) {
identifier1 := "StandardStorage,test-s3-1"
identifier2 := "test-s3-2"
Expand Down

0 comments on commit d75329c

Please sign in to comment.