Skip to content

Commit

Permalink
Add latency config option for aws-cloudwatch input (elastic#28509)
Browse files Browse the repository at this point in the history
  • Loading branch information
kaiyan-sheng authored and wiwen committed Nov 1, 2021
1 parent 528b1e2 commit 6987cc8
Show file tree
Hide file tree
Showing 7 changed files with 68 additions and 3 deletions.
2 changes: 2 additions & 0 deletions CHANGELOG.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -437,6 +437,7 @@ for a few releases. Please use other tools provided by Elastic to fetch data fro
- `beat` module respects `basepath` config option. {pull}28162[28162]
- Fix list_docker.go {pull}28374[28374]
- Divide RDS metric cpu.total.pct by 100. {pull}28456[28456]

*Packetbeat*

- Handle truncated DNS records more gracefully. {issue}21495[21495] {pull}28297[28297]
Expand Down Expand Up @@ -772,6 +773,7 @@ for a few releases. Please use other tools provided by Elastic to fetch data fro
- Release zoom module as GA. {pull}28106[28106]
- Add support for secondary object attribute handling in ThreatIntel MISP module {pull}28124[28124]
- Add `base64Decode` and `base64DecodeNoPad` functions to `httpjson` templates. {pull}28385[28385]
- Add latency config option for aws-cloudwatch input. {pull}28509[28509]

*Heartbeat*

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -146,3 +146,7 @@
# This is used to sleep between AWS `FilterLogEvents` API calls inside the same
# collection period.
#api_sleep: 200ms

# This is used to shift collection start time and end time back in order to
# collect logs when there is a delay in CloudWatch.
#latency: 1m
7 changes: 7 additions & 0 deletions x-pack/filebeat/docs/inputs/input-aws-cloudwatch.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,13 @@ second (TPS)/account/Region. By default, `api_sleep` is 200 ms. This value shoul
only be adjusted when there are multiple Filebeats or multiple Filebeat inputs
collecting logs from the same region and AWS account.

[float]
==== `latency`
Some AWS services send logs to CloudWatch with a processing latency larger than
the `aws-cloudwatch` input `scan_frequency`. In this case, please specify a
`latency` parameter so that the collection start time and end time are shifted
back by the given latency amount.

[float]
==== `aws credentials`
In order to make AWS API calls, `aws-cloudwatch` input requires AWS credentials.
Expand Down
4 changes: 4 additions & 0 deletions x-pack/filebeat/filebeat.reference.yml
Original file line number Diff line number Diff line change
Expand Up @@ -3102,6 +3102,10 @@ filebeat.inputs:
# collection period.
#api_sleep: 200ms

# This is used to shift collection start time and end time back in order to
# collect logs when there is a delay in CloudWatch.
#latency: 1m

# =========================== Filebeat autodiscover ============================

# Autodiscover allows you to detect changes in the system and spawn new modules
Expand Down
1 change: 1 addition & 0 deletions x-pack/filebeat/input/awscloudwatch/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ type config struct {
ScanFrequency time.Duration `config:"scan_frequency" validate:"min=0,nonzero"`
APITimeout time.Duration `config:"api_timeout" validate:"min=0,nonzero"`
APISleep time.Duration `config:"api_sleep" validate:"min=0,nonzero"`
Latency time.Duration `config:"latency"`
AwsConfig awscommon.ConfigAWS `config:",inline"`
}

Expand Down
9 changes: 7 additions & 2 deletions x-pack/filebeat/input/awscloudwatch/input.go
Original file line number Diff line number Diff line change
Expand Up @@ -222,7 +222,7 @@ func (in *awsCloudWatchInput) getLogGroupNames(svc cloudwatchlogsiface.ClientAPI
// getLogEventsFromCloudWatch uses FilterLogEvents API to collect logs from CloudWatch
func (in *awsCloudWatchInput) getLogEventsFromCloudWatch(svc cloudwatchlogsiface.ClientAPI) error {
currentTime := time.Now()
startTime, endTime := getStartPosition(in.config.StartPosition, currentTime, in.prevEndTime, in.config.ScanFrequency)
startTime, endTime := getStartPosition(in.config.StartPosition, currentTime, in.prevEndTime, in.config.ScanFrequency, in.config.Latency)
in.logger.Debugf("start_position = %s, startTime = %v, endTime = %v", in.config.StartPosition, time.Unix(startTime/1000, 0), time.Unix(endTime/1000, 0))

// overwrite prevEndTime using new endTime
Expand Down Expand Up @@ -274,7 +274,12 @@ func (in *awsCloudWatchInput) constructFilterLogEventsInput(startTime int64, end
return filterLogEventsInput
}

func getStartPosition(startPosition string, currentTime time.Time, prevEndTime int64, scanFrequency time.Duration) (startTime int64, endTime int64) {
func getStartPosition(startPosition string, currentTime time.Time, prevEndTime int64, scanFrequency time.Duration, latency time.Duration) (startTime int64, endTime int64) {
if latency != 0 {
// shift currentTime back by latency if config is not 0
currentTime = currentTime.Add(latency * -1)
}

switch startPosition {
case "beginning":
if prevEndTime != int64(0) {
Expand Down
44 changes: 43 additions & 1 deletion x-pack/filebeat/input/awscloudwatch/input_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ func TestGetStartPosition(t *testing.T) {
startPosition string
prevEndTime int64
scanFrequency time.Duration
latency time.Duration
expectedStartTime int64
expectedEndTime int64
}{
Expand All @@ -34,6 +35,7 @@ func TestGetStartPosition(t *testing.T) {
"beginning",
int64(0),
30 * time.Second,
0,
int64(0),
int64(1590969600000),
},
Expand All @@ -42,6 +44,7 @@ func TestGetStartPosition(t *testing.T) {
"end",
int64(0),
30 * time.Second,
0,
int64(1590969570000),
int64(1590969600000),
},
Expand All @@ -50,6 +53,7 @@ func TestGetStartPosition(t *testing.T) {
"typo",
int64(0),
30 * time.Second,
0,
int64(0),
int64(0),
},
Expand All @@ -58,6 +62,7 @@ func TestGetStartPosition(t *testing.T) {
"beginning",
int64(1590000000000),
30 * time.Second,
0,
int64(1590000000000),
int64(1590969600000),
},
Expand All @@ -66,14 +71,51 @@ func TestGetStartPosition(t *testing.T) {
"end",
int64(1590000000000),
30 * time.Second,
0,
int64(1590000000000),
int64(1590969600000),
},
{
"startPosition=beginning with latency",
"beginning",
int64(0),
30 * time.Second,
10 * time.Minute,
int64(0),
int64(1590969000000),
},
{
"startPosition=beginning with prevEndTime and latency",
"beginning",
int64(1590000000000),
30 * time.Second,
10 * time.Minute,
int64(1590000000000),
int64(1590969000000),
},
{
"startPosition=end with latency",
"end",
int64(0),
30 * time.Second,
10 * time.Minute,
int64(1590968970000),
int64(1590969000000),
},
{
"startPosition=end with prevEndTime and latency",
"end",
int64(1590000000000),
30 * time.Second,
10 * time.Minute,
int64(1590000000000),
int64(1590969000000),
},
}

for _, c := range cases {
t.Run(c.title, func(t *testing.T) {
startTime, endTime := getStartPosition(c.startPosition, currentTime, c.prevEndTime, c.scanFrequency)
startTime, endTime := getStartPosition(c.startPosition, currentTime, c.prevEndTime, c.scanFrequency, c.latency)
assert.Equal(t, c.expectedStartTime, startTime)
assert.Equal(t, c.expectedEndTime, endTime)
})
Expand Down

0 comments on commit 6987cc8

Please sign in to comment.