From 6987cc8a432e72ebd22c729c2372ae9f2714f009 Mon Sep 17 00:00:00 2001 From: kaiyan-sheng Date: Tue, 19 Oct 2021 07:43:06 -0600 Subject: [PATCH] Add latency config option for aws-cloudwatch input (#28509) --- CHANGELOG.next.asciidoc | 2 + .../filebeat.inputs.reference.xpack.yml.tmpl | 4 ++ .../docs/inputs/input-aws-cloudwatch.asciidoc | 7 +++ x-pack/filebeat/filebeat.reference.yml | 4 ++ x-pack/filebeat/input/awscloudwatch/config.go | 1 + x-pack/filebeat/input/awscloudwatch/input.go | 9 +++- .../input/awscloudwatch/input_test.go | 44 ++++++++++++++++++- 7 files changed, 68 insertions(+), 3 deletions(-) diff --git a/CHANGELOG.next.asciidoc b/CHANGELOG.next.asciidoc index ed74be0b4650..125945c910bd 100644 --- a/CHANGELOG.next.asciidoc +++ b/CHANGELOG.next.asciidoc @@ -437,6 +437,7 @@ for a few releases. Please use other tools provided by Elastic to fetch data fro - `beat` module respects `basepath` config option. {pull}28162[28162] - Fix list_docker.go {pull}28374[28374] - Divide RDS metric cpu.total.pct by 100. {pull}28456[28456] + *Packetbeat* - Handle truncated DNS records more gracefully. {issue}21495[21495] {pull}28297[28297] @@ -772,6 +773,7 @@ for a few releases. Please use other tools provided by Elastic to fetch data fro - Release zoom module as GA. {pull}28106[28106] - Add support for secondary object attribute handling in ThreatIntel MISP module {pull}28124[28124] - Add `base64Decode` and `base64DecodeNoPad` functions to `httpsjon` templates. {pull}28385[28385] +- Add latency config option for aws-cloudwatch input. 
{pull}28509[28509] *Heartbeat* diff --git a/x-pack/filebeat/_meta/config/filebeat.inputs.reference.xpack.yml.tmpl b/x-pack/filebeat/_meta/config/filebeat.inputs.reference.xpack.yml.tmpl index 4b68ef4bb0e1..74cba05676bc 100644 --- a/x-pack/filebeat/_meta/config/filebeat.inputs.reference.xpack.yml.tmpl +++ b/x-pack/filebeat/_meta/config/filebeat.inputs.reference.xpack.yml.tmpl @@ -146,3 +146,7 @@ # This is used to sleep between AWS `FilterLogEvents` API calls inside the same # collection period. #api_sleep: 200ms + + # This is used to shift collection start time and end time back in order to + # collect logs when there is a delay in CloudWatch. + #latency: 1m diff --git a/x-pack/filebeat/docs/inputs/input-aws-cloudwatch.asciidoc b/x-pack/filebeat/docs/inputs/input-aws-cloudwatch.asciidoc index 0306c7540790..71f666743879 100644 --- a/x-pack/filebeat/docs/inputs/input-aws-cloudwatch.asciidoc +++ b/x-pack/filebeat/docs/inputs/input-aws-cloudwatch.asciidoc @@ -104,6 +104,13 @@ second (TPS)/account/Region. By default, `api_sleep` is 200 ms. This value shoul only be adjusted when there are multiple Filebeats or multiple Filebeat inputs collecting logs from the same region and AWS account. +[float] +==== `latency` +Some AWS services send logs to CloudWatch with a latency to process larger than +`aws-cloudwatch` input `scan_frequency`. In this case, specify a `latency` +parameter so collection start time and end time will be shifted by the given +latency amount. + [float] ==== `aws credentials` In order to make AWS API calls, `aws-cloudwatch` input requires AWS credentials. diff --git a/x-pack/filebeat/filebeat.reference.yml b/x-pack/filebeat/filebeat.reference.yml index 94ea3c481ac9..91f8bf0bd2e9 100644 --- a/x-pack/filebeat/filebeat.reference.yml +++ b/x-pack/filebeat/filebeat.reference.yml @@ -3102,6 +3102,10 @@ filebeat.inputs: # collection period. 
#api_sleep: 200ms + # This is used to shift collection start time and end time back in order to + # collect logs when there is a delay in CloudWatch. + #latency: 1m + # =========================== Filebeat autodiscover ============================ # Autodiscover allows you to detect changes in the system and spawn new modules diff --git a/x-pack/filebeat/input/awscloudwatch/config.go b/x-pack/filebeat/input/awscloudwatch/config.go index 210e7bb9a319..3f04813e78c4 100644 --- a/x-pack/filebeat/input/awscloudwatch/config.go +++ b/x-pack/filebeat/input/awscloudwatch/config.go @@ -24,6 +24,7 @@ type config struct { ScanFrequency time.Duration `config:"scan_frequency" validate:"min=0,nonzero"` APITimeout time.Duration `config:"api_timeout" validate:"min=0,nonzero"` APISleep time.Duration `config:"api_sleep" validate:"min=0,nonzero"` + Latency time.Duration `config:"latency"` AwsConfig awscommon.ConfigAWS `config:",inline"` } diff --git a/x-pack/filebeat/input/awscloudwatch/input.go b/x-pack/filebeat/input/awscloudwatch/input.go index fbcfc8aff5c9..3001449378d2 100644 --- a/x-pack/filebeat/input/awscloudwatch/input.go +++ b/x-pack/filebeat/input/awscloudwatch/input.go @@ -222,7 +222,7 @@ func (in *awsCloudWatchInput) getLogGroupNames(svc cloudwatchlogsiface.ClientAPI // getLogEventsFromCloudWatch uses FilterLogEvents API to collect logs from CloudWatch func (in *awsCloudWatchInput) getLogEventsFromCloudWatch(svc cloudwatchlogsiface.ClientAPI) error { currentTime := time.Now() - startTime, endTime := getStartPosition(in.config.StartPosition, currentTime, in.prevEndTime, in.config.ScanFrequency) + startTime, endTime := getStartPosition(in.config.StartPosition, currentTime, in.prevEndTime, in.config.ScanFrequency, in.config.Latency) in.logger.Debugf("start_position = %s, startTime = %v, endTime = %v", in.config.StartPosition, time.Unix(startTime/1000, 0), time.Unix(endTime/1000, 0)) // overwrite prevEndTime using new endTime @@ -274,7 +274,12 @@ func (in *awsCloudWatchInput) 
constructFilterLogEventsInput(startTime int64, end return filterLogEventsInput } -func getStartPosition(startPosition string, currentTime time.Time, prevEndTime int64, scanFrequency time.Duration) (startTime int64, endTime int64) { +func getStartPosition(startPosition string, currentTime time.Time, prevEndTime int64, scanFrequency time.Duration, latency time.Duration) (startTime int64, endTime int64) { + if latency != 0 { + // add latency if config is not 0 + currentTime = currentTime.Add(latency * -1) + } + switch startPosition { case "beginning": if prevEndTime != int64(0) { diff --git a/x-pack/filebeat/input/awscloudwatch/input_test.go b/x-pack/filebeat/input/awscloudwatch/input_test.go index 223bed567981..7d8b45f7d443 100644 --- a/x-pack/filebeat/input/awscloudwatch/input_test.go +++ b/x-pack/filebeat/input/awscloudwatch/input_test.go @@ -26,6 +26,7 @@ func TestGetStartPosition(t *testing.T) { startPosition string prevEndTime int64 scanFrequency time.Duration + latency time.Duration expectedStartTime int64 expectedEndTime int64 }{ @@ -34,6 +35,7 @@ func TestGetStartPosition(t *testing.T) { "beginning", int64(0), 30 * time.Second, + 0, int64(0), int64(1590969600000), }, @@ -42,6 +44,7 @@ func TestGetStartPosition(t *testing.T) { "end", int64(0), 30 * time.Second, + 0, int64(1590969570000), int64(1590969600000), }, @@ -50,6 +53,7 @@ func TestGetStartPosition(t *testing.T) { "typo", int64(0), 30 * time.Second, + 0, int64(0), int64(0), }, @@ -58,6 +62,7 @@ func TestGetStartPosition(t *testing.T) { "beginning", int64(1590000000000), 30 * time.Second, + 0, int64(1590000000000), int64(1590969600000), }, @@ -66,14 +71,51 @@ func TestGetStartPosition(t *testing.T) { "end", int64(1590000000000), 30 * time.Second, + 0, int64(1590000000000), int64(1590969600000), }, + { + "startPosition=beginning with latency", + "beginning", + int64(0), + 30 * time.Second, + 10 * time.Minute, + int64(0), + int64(1590969000000), + }, + { + "startPosition=beginning with prevEndTime and 
latency", + "beginning", + int64(1590000000000), + 30 * time.Second, + 10 * time.Minute, + int64(1590000000000), + int64(1590969000000), + }, + { + "startPosition=end with latency", + "end", + int64(0), + 30 * time.Second, + 10 * time.Minute, + int64(1590968970000), + int64(1590969000000), + }, + { + "startPosition=end with prevEndTime and latency", + "end", + int64(1590000000000), + 30 * time.Second, + 10 * time.Minute, + int64(1590000000000), + int64(1590969000000), + }, } for _, c := range cases { t.Run(c.title, func(t *testing.T) { - startTime, endTime := getStartPosition(c.startPosition, currentTime, c.prevEndTime, c.scanFrequency) + startTime, endTime := getStartPosition(c.startPosition, currentTime, c.prevEndTime, c.scanFrequency, c.latency) assert.Equal(t, c.expectedStartTime, startTime) assert.Equal(t, c.expectedEndTime, endTime) })