Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Filebeat] Add support for Cloudtrail digest files #21086

Merged
merged 3 commits into from
Sep 15, 2020
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -261,6 +261,7 @@ https://github.com/elastic/beats/compare/v7.0.0-alpha2...master[Check the HEAD d
- Provide backwards compatibility for the `set` processor when Elasticsearch is less than 7.9.0. {pull}20908[20908]
- Remove wrongly mapped `tls.client.server_name` from `fortinet/firewall` fileset. {pull}20983[20983]
- Fix an error updating file size being logged when EOF is reached. {pull}21048[21048]
- Fix error when processing AWS Cloudtrail Digest logs. {pull}21086[21086] {issue}20943[20943]

*Heartbeat*

Expand Down
114 changes: 114 additions & 0 deletions filebeat/docs/fields.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -1487,6 +1487,120 @@ type: flattened

--

[float]
=== digest

Fields from Cloudtrail Digest Logs


*`aws.cloudtrail.digest.log_files`*::
+
--
A list of Logfiles contained in the digest.
kaiyan-sheng marked this conversation as resolved.
Show resolved Hide resolved

type: nested

--

*`aws.cloudtrail.digest.start_time`*::
+
--
The starting UTC time range that the digest file covers, taking as a reference the time in which log files have been delivered by CloudTrail.

type: date

--

*`aws.cloudtrail.digest.end_time`*::
+
--
The ending UTC time range that the digest file covers, taking as a reference the time in which log files have been delivered by CloudTrail.

type: date

--

*`aws.cloudtrail.digest.s3_bucket`*::
+
--
The name of the Amazon S3 bucket to which the current digest file has been delivered.

type: keyword

--

*`aws.cloudtrail.digest.s3_object`*::
+
--
The Amazon S3 object key (that is, the Amazon S3 bucket location) of the current digest file.

type: keyword

--

*`aws.cloudtrail.digest.newest_event_time`*::
+
--
The UTC time of the most recent event among all of the events in the log files in the digest.

type: date

--

*`aws.cloudtrail.digest.oldest_event_time`*::
+
--
The UTC time of the oldest event among all of the events in the log files in the digest.

type: date

--

*`aws.cloudtrail.digest.previous_s3_bucket`*::
+
--
The Amazon S3 bucket to which the previous digest file was delivered.

type: keyword

--

*`aws.cloudtrail.digest.previous_hash_algorithm`*::
+
--
The name of the hash algorithm that was used to hash the previous digest file.

type: keyword

--

*`aws.cloudtrail.digest.public_key_fingerprint`*::
+
--
The hexadecimal encoded fingerprint of the public key that matches the private key used to sign this digest file.

type: keyword

--

*`aws.cloudtrail.digest.signature_algorithm`*::
+
--
The algorithm used to sign the digest file.

type: keyword

--

*`aws.cloudtrail.insight_details`*::
+
--
Shows information about the underlying triggers of an Insights event, such as event source, user agent, statistics, API name, and whether the event is the start or end of the Insights event.

type: flattened

--

[float]
=== cloudwatch

Expand Down
25 changes: 25 additions & 0 deletions x-pack/filebeat/docs/inputs/input-aws-s3.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,31 @@ type will not be checked.
If a file has "application/json" content-type, `expand_event_list_from_field`
becomes required to read the json file.

[float]
==== `file_selectors`

If the SQS queue will have events that correspond to files that
{beatname_uc} shouldn't process `file_selectors` can be used to limit
the files that are downloaded. This is a list of selectors which are
made up of `regex` and `expand_event_list_from_field` options. The
`regex` should match the S3 object key in the SQS message, and the
optional `expand_event_list_from_field` is the same as the global
setting. If `file_selectors` is given, then any global
`expand_event_list_from_field` value is ignored in favor of the ones
specified in the `file_selectors`. Regex syntax is the same as the Go
language. Files that don't match one of the regexes won't be
processed.

["source", "yml"]
----
file_selectors:
- regex: '^AWSLogs/\d+/CloudTrail/'
expand_event_list_from_field: 'Records'
- regex: '^AWSLogs/\d+/CloudTrail-Digest'
```
----


[float]
==== `api_timeout`

Expand Down
12 changes: 12 additions & 0 deletions x-pack/filebeat/filebeat.reference.yml
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,18 @@ filebeat.modules:
# AWS SQS queue url
#var.queue_url: https://sqs.myregion.amazonaws.com/123456/myqueue

# Process CloudTrail logs
# default is true, set to false to skip Cloudtrail logs
# var.process_cloudtrail_logs: false

# Process CloudTrail Digest logs
# default true, set to false to skip CloudTrail Digest logs
# var.process_digest_logs: false

# Process CloudTrail Insight logs
# default true, set to false to skip CloudTrail Insight logs
# var.process_insight_logs: false

# Filename of AWS credential file
# If not set "$HOME/.aws/credentials" is used on Linux/Mac
# "%UserProfile%\.aws\credentials" is used on Windows
Expand Down
16 changes: 16 additions & 0 deletions x-pack/filebeat/input/s3/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ package s3

import (
"fmt"
"regexp"
"time"

"github.com/elastic/beats/v7/filebeat/harvester"
Expand All @@ -19,6 +20,14 @@ type config struct {
AwsConfig awscommon.ConfigAWS `config:",inline"`
ExpandEventListFromField string `config:"expand_event_list_from_field"`
APITimeout time.Duration `config:"api_timeout"`
FileSelectors []FileSelectorCfg `config:"file_selectors"`
}

// FileSelectorCfg defines type and configuration of FileSelectors
type FileSelectorCfg struct {
RegexString string `config:"regex"`
Regex *regexp.Regexp `config:",ignore"`
ExpandEventListFromField string `config:"expand_event_list_from_field"`
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This looks good! One question: what happens if file_selectors are specified and also the global expand_event_list_from_field is given, which one takes priority? Maybe we should clarify this in the documentation 😄

}

func defaultConfig() config {
Expand All @@ -40,5 +49,12 @@ func (c *config) Validate() error {
return fmt.Errorf("api timeout %v needs to be larger than"+
" 0s and smaller than half of the visibility timeout", c.APITimeout)
}
for i := range c.FileSelectors {
r, err := regexp.Compile(c.FileSelectors[i].RegexString)
if err != nil {
return err
}
c.FileSelectors[i].Regex = r
}
return nil
}
70 changes: 45 additions & 25 deletions x-pack/filebeat/input/s3/input.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,10 +74,11 @@ type s3Input struct {
}

type s3Info struct {
name string
key string
region string
arn string
name string
key string
region string
arn string
expandEventListFromField string
}

type bucket struct {
Expand Down Expand Up @@ -252,7 +253,7 @@ func (p *s3Input) processor(queueURL string, messages []sqs.Message, visibilityT
func (p *s3Input) processMessage(svcS3 s3iface.ClientAPI, message sqs.Message, wg *sync.WaitGroup, errC chan error) {
defer wg.Done()

s3Infos, err := handleSQSMessage(message)
s3Infos, err := p.handleSQSMessage(message)
if err != nil {
p.logger.Error(errors.Wrap(err, "handleSQSMessage failed"))
return
Expand Down Expand Up @@ -352,7 +353,7 @@ func getRegionFromQueueURL(queueURL string) (string, error) {
}

// handle message
func handleSQSMessage(m sqs.Message) ([]s3Info, error) {
func (p *s3Input) handleSQSMessage(m sqs.Message) ([]s3Info, error) {
msg := sqsMessage{}
err := json.Unmarshal([]byte(*m.Body), &msg)
if err != nil {
Expand All @@ -361,21 +362,40 @@ func handleSQSMessage(m sqs.Message) ([]s3Info, error) {

var s3Infos []s3Info
for _, record := range msg.Records {
if record.EventSource == "aws:s3" && strings.HasPrefix(record.EventName, "ObjectCreated:") {
// Unescape substrings from s3 log name. For example, convert "%3D" back to "="
filename, err := url.QueryUnescape(record.S3.object.Key)
if err != nil {
return nil, errors.Wrapf(err, "url.QueryUnescape failed for '%s'", record.S3.object.Key)
}
if record.EventSource != "aws:s3" || !strings.HasPrefix(record.EventName, "ObjectCreated:") {
return nil, errors.New("this SQS queue should be dedicated to s3 ObjectCreated event notifications")
}
// Unescape substrings from s3 log name. For example, convert "%3D" back to "="
filename, err := url.QueryUnescape(record.S3.object.Key)
if err != nil {
return nil, errors.Wrapf(err, "url.QueryUnescape failed for '%s'", record.S3.object.Key)
}

if len(p.config.FileSelectors) == 0 {
s3Infos = append(s3Infos, s3Info{
region: record.AwsRegion,
name: record.S3.bucket.Name,
key: filename,
arn: record.S3.bucket.Arn,
region: record.AwsRegion,
name: record.S3.bucket.Name,
key: filename,
arn: record.S3.bucket.Arn,
expandEventListFromField: p.config.ExpandEventListFromField,
})
} else {
return nil, errors.New("this SQS queue should be dedicated to s3 ObjectCreated event notifications")
continue
}

for _, fs := range p.config.FileSelectors {
if fs.Regex == nil {
continue
}
if fs.Regex.MatchString(filename) {
s3Infos = append(s3Infos, s3Info{
region: record.AwsRegion,
name: record.S3.bucket.Name,
key: filename,
arn: record.S3.bucket.Arn,
expandEventListFromField: fs.ExpandEventListFromField,
})
break
}
}
}
return s3Infos, nil
Expand Down Expand Up @@ -456,7 +476,7 @@ func (p *s3Input) createEventsFromS3Info(svc s3iface.ClientAPI, info s3Info, s3C
}

// Decode JSON documents when content-type is "application/json" or expand_event_list_from_field is given in config
if resp.ContentType != nil && *resp.ContentType == "application/json" || p.config.ExpandEventListFromField != "" {
if resp.ContentType != nil && *resp.ContentType == "application/json" || info.expandEventListFromField != "" {
decoder := json.NewDecoder(reader)
err := p.decodeJSON(decoder, objectHash, info, s3Ctx)
if err != nil {
Expand Down Expand Up @@ -537,10 +557,10 @@ func (p *s3Input) decodeJSON(decoder *json.Decoder, objectHash string, s3Info s3
func (p *s3Input) jsonFieldsType(jsonFields interface{}, offset int, objectHash string, s3Info s3Info, s3Ctx *s3Context) (int, error) {
switch f := jsonFields.(type) {
case map[string][]interface{}:
if p.config.ExpandEventListFromField != "" {
textValues, ok := f[p.config.ExpandEventListFromField]
if s3Info.expandEventListFromField != "" {
textValues, ok := f[s3Info.expandEventListFromField]
if !ok {
err := errors.Errorf("key '%s' not found", p.config.ExpandEventListFromField)
err := errors.Errorf("key '%s' not found", s3Info.expandEventListFromField)
p.logger.Error(err)
return offset, err
}
Expand All @@ -555,10 +575,10 @@ func (p *s3Input) jsonFieldsType(jsonFields interface{}, offset int, objectHash
return offset, nil
}
case map[string]interface{}:
if p.config.ExpandEventListFromField != "" {
textValues, ok := f[p.config.ExpandEventListFromField]
if s3Info.expandEventListFromField != "" {
textValues, ok := f[s3Info.expandEventListFromField]
if !ok {
err := errors.Errorf("key '%s' not found", p.config.ExpandEventListFromField)
err := errors.Errorf("key '%s' not found", s3Info.expandEventListFromField)
p.logger.Error(err)
return offset, err
}
Expand Down
5 changes: 3 additions & 2 deletions x-pack/filebeat/input/s3/input_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -120,9 +120,10 @@ func TestHandleMessage(t *testing.T) {
},
}

p := &s3Input{context: &channelContext{}}
for _, c := range casesPositive {
t.Run(c.title, func(t *testing.T) {
s3Info, err := handleSQSMessage(c.message)
s3Info, err := p.handleSQSMessage(c.message)
assert.NoError(t, err)
assert.Equal(t, len(c.expectedS3Infos), len(s3Info))
if len(s3Info) > 0 {
Expand Down Expand Up @@ -155,7 +156,7 @@ func TestHandleMessage(t *testing.T) {

for _, c := range casesNegative {
t.Run(c.title, func(t *testing.T) {
s3Info, err := handleSQSMessage(c.message)
s3Info, err := p.handleSQSMessage(c.message)
assert.Error(t, err)
assert.Nil(t, s3Info)
})
Expand Down
12 changes: 12 additions & 0 deletions x-pack/filebeat/module/aws/_meta/config.yml
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,18 @@
# AWS SQS queue url
#var.queue_url: https://sqs.myregion.amazonaws.com/123456/myqueue

# Process CloudTrail logs
# default is true, set to false to skip Cloudtrail logs
# var.process_cloudtrail_logs: false

# Process CloudTrail Digest logs
# default true, set to false to skip CloudTrail Digest logs
# var.process_digest_logs: false

# Process CloudTrail Insight logs
# default true, set to false to skip CloudTrail Insight logs
# var.process_insight_logs: false

# Filename of AWS credential file
# If not set "$HOME/.aws/credentials" is used on Linux/Mac
# "%UserProfile%\.aws\credentials" is used on Windows
Expand Down
Loading