[Filebeat] Add timeout to GetObjectRequest for s3 input #15590

Merged
merged 22 commits into from
Jan 28, 2020
Changes from 9 commits
Commits
22 commits
445aaf2
Add timeout to GetObjectRequest which will cancel the request if it t…
kaiyan-sheng Jan 15, 2020
90350ff
Add changelog
kaiyan-sheng Jan 15, 2020
269ea66
Add context_timeout into config for all aws filesets
kaiyan-sheng Jan 16, 2020
32678e7
fix unit test
kaiyan-sheng Jan 16, 2020
3093ffa
Merge remote-tracking branch 'upstream/master' into handle_s3_objects
kaiyan-sheng Jan 16, 2020
dee7785
Close resp.Body from S3 GetObject API to prevent resource leak
kaiyan-sheng Jan 17, 2020
adf908a
fix unit test
kaiyan-sheng Jan 17, 2020
b211663
close resp.Body after getS3ObjectResponse function
kaiyan-sheng Jan 21, 2020
c0c511e
use context.WithTimeout for GetObjectRequest
kaiyan-sheng Jan 21, 2020
48e85f2
Add timeout to p.context
kaiyan-sheng Jan 22, 2020
5859549
Merge remote-tracking branch 'upstream/master' into handle_s3_objects
kaiyan-sheng Jan 22, 2020
bb5201f
Fix unit test for s3 input
kaiyan-sheng Jan 22, 2020
1a09c74
Add cancelFn for context.WithTimeout
kaiyan-sheng Jan 23, 2020
a1e4e18
Change context_timeout to aws_api_timeout
kaiyan-sheng Jan 23, 2020
fb07d96
create separate ctx with timeout for each request
kaiyan-sheng Jan 23, 2020
1c63a8f
Change AwsApiTimeout to AwsAPITimeout
kaiyan-sheng Jan 23, 2020
47b6fbe
fix typo
kaiyan-sheng Jan 23, 2020
8b7c544
Fix aws_api_timeout name in configs
kaiyan-sheng Jan 24, 2020
2b85538
Change back to use channelContext
kaiyan-sheng Jan 24, 2020
cbb92e7
Remove predefine cancelFn
kaiyan-sheng Jan 24, 2020
f4f8cbd
improve info message
kaiyan-sheng Jan 24, 2020
1cafa79
Change aws_api_timeout to api_timeout
kaiyan-sheng Jan 24, 2020
1 change: 1 addition & 0 deletions CHANGELOG.next.asciidoc
Original file line number Diff line number Diff line change
@@ -45,6 +45,7 @@ TLS or Beats that accept connections over TLS and validate client certificates.

*Filebeat*

- Fix s3 input hanging with GetObjectRequest API call by adding context_timeout config. {issue}15502[15502] {pull}15590[15590]

*Heartbeat*

12 changes: 10 additions & 2 deletions x-pack/filebeat/docs/inputs/input-aws-s3.asciidoc
@@ -45,9 +45,9 @@ URL of the AWS SQS queue that messages will be received from. Required.
[float]
==== `visibility_timeout`

The duration (in seconds) that the received messages are hidden from subsequent
The duration that the received messages are hidden from subsequent
retrieve requests after being retrieved by a ReceiveMessage request.
This value needs to be a lot bigger than filebeat collection frequency so
This value needs to be a lot bigger than Filebeat collection frequency so
if it took too long to read the s3 log, this sqs message will not be reprocessed.
The default visibility timeout for a message is 300 seconds. The minimum
is 0 seconds. The maximum is 12 hours.
@@ -61,6 +61,14 @@ can be assigned the name of the field. This setting will be able to split the
messages under the group value into separate events. For example, CloudTrail logs
are in JSON format and events are found under the JSON object "Records":

[float]
==== `context_timeout`

The maximum duration that the AWS GetObjectRequest API call can take. If the
call exceeds this timeout, GetObjectRequest will be interrupted.
The default context timeout is 120 seconds. The minimum is 0 seconds. The
maximum is half of the visibility timeout value.

["source","json"]
----
{
32 changes: 32 additions & 0 deletions x-pack/filebeat/filebeat.reference.yml
@@ -105,6 +105,14 @@ filebeat.modules:
# Profile name for aws credential
#var.credential_profile_name: fb-aws

# The duration that the received messages are hidden from subsequent ReceiveMessage requests
# Defaults to 300s
#var.visibility_timeout: 300s

# Maximum duration before the GetObject request is interrupted by its context
# Defaults to 120s
#var.context_timeout: 120s

elb:
enabled: false

@@ -114,6 +122,14 @@ filebeat.modules:
# Profile name for aws credential
#var.credential_profile_name: fb-aws

# The duration that the received messages are hidden from subsequent ReceiveMessage requests
# Defaults to 300s
#var.visibility_timeout: 300s

# Maximum duration before the GetObject request is interrupted by its context
# Defaults to 120s
#var.context_timeout: 120s

vpcflow:
enabled: false

@@ -123,6 +139,14 @@ filebeat.modules:
# Profile name for aws credential
#var.credential_profile_name: fb-aws

# The duration that the received messages are hidden from subsequent ReceiveMessage requests
# Defaults to 300s
#var.visibility_timeout: 300s

# Maximum duration before the GetObject request is interrupted by its context
# Defaults to 120s
#var.context_timeout: 120s

cloudtrail:
enabled: false

@@ -132,6 +156,14 @@ filebeat.modules:
# Profile name for aws credential
#var.credential_profile_name: fb-aws

# The duration that the received messages are hidden from subsequent ReceiveMessage requests
# Defaults to 300s
#var.visibility_timeout: 300s

# Maximum duration before the GetObject request is interrupted by its context
# Defaults to 120s
#var.context_timeout: 120s

#-------------------------------- Azure Module --------------------------------
- module: azure
# All logs
6 changes: 6 additions & 0 deletions x-pack/filebeat/input/s3/config.go
@@ -18,6 +18,7 @@ type config struct {
VisibilityTimeout time.Duration `config:"visibility_timeout"`
AwsConfig awscommon.ConfigAWS `config:",inline"`
ExpandEventListFromField string `config:"expand_event_list_from_field"`
ContextTimeout time.Duration `config:"context_timeout"`
}

func defaultConfig() config {
@@ -26,6 +27,7 @@ func defaultConfig() config {
Type: "s3",
},
VisibilityTimeout: 300 * time.Second,
ContextTimeout: 120 * time.Second,
}
}

@@ -34,5 +36,9 @@ func (c *config) Validate() error {
return fmt.Errorf("visibility timeout %v is not within the "+
"required range 0s to 12h", c.VisibilityTimeout)
}
if c.ContextTimeout < 0 || c.ContextTimeout > c.VisibilityTimeout/2 {
return fmt.Errorf("context timeout %v needs to be between"+
" 0s and half of the visibility timeout", c.ContextTimeout)
}
return nil
}
109 changes: 70 additions & 39 deletions x-pack/filebeat/input/s3/input.go
@@ -171,6 +171,9 @@ func NewInput(cfg *common.Config, connector channel.Connector, context input.Con
func (p *s3Input) Run() {
p.workerOnce.Do(func() {
visibilityTimeout := int64(p.config.VisibilityTimeout.Seconds())
p.logger.Infof("visibility timeout is set to %v seconds", visibilityTimeout)
p.logger.Infof("context timeout is set to %v", p.config.ContextTimeout)

regionName, err := getRegionFromQueueURL(p.config.QueueURL)
if err != nil {
p.logger.Errorf("failed to get region name from queueURL: %v", p.config.QueueURL)
@@ -198,7 +201,7 @@ func (p *s3Input) run(svcSQS sqsiface.ClientAPI, svcS3 s3iface.ClientAPI, visibi
if awsErr, ok := err.(awserr.Error); ok && awsErr.Code() == awssdk.ErrCodeRequestCanceled {
continue
}
p.logger.Error("failed to receive message from SQS:", err)
p.logger.Error("failed to receive message from SQS: ", err)
time.Sleep(time.Duration(waitTimeSecond) * time.Second)
continue
}
@@ -272,12 +275,12 @@ func (p *s3Input) processorKeepAlive(svcSQS sqsiface.ClientAPI, message sqs.Mess
return
case err := <-errC:
if err != nil {
p.logger.Warnf("Processing message failed: %v", err)
p.logger.Warn("Processing message failed, updating visibility timeout")
err := p.changeVisibilityTimeout(queueURL, visibilityTimeout, svcSQS, message.ReceiptHandle)
if err != nil {
p.logger.Error(errors.Wrap(err, "change message visibility failed"))
}
p.logger.Warnf("Message visibility timeout updated to %v", visibilityTimeout)
p.logger.Infof("Message visibility timeout updated to %v", visibilityTimeout)
} else {
// When ACK done, message will be deleted. Or when message is
// not s3 ObjectCreated event related(handleSQSMessage function
@@ -372,15 +375,49 @@ func (p *s3Input) handleS3Objects(svc s3iface.ClientAPI, s3Infos []s3Info, errC
objectHash := s3ObjectHash(s3Info)

// read from s3 object
reader, err := p.newS3BucketReader(svc, s3Info)
resp, err := p.getS3ObjectResponse(svc, s3Info)
if err != nil {
err = errors.Wrap(err, "newS3BucketReader failed")
p.logger.Error(err)
s3Context.setError(err)
return err
}

if reader == nil {
continue
if resp == nil {
// resp is nil here, so there is no Body to close.
return nil
}

reader := bufio.NewReader(resp.Body)
// Check content-type
if resp.ContentType != nil {
switch *resp.ContentType {
case "application/x-gzip":
gzipReader, err := gzip.NewReader(resp.Body)
if err != nil {
err = errors.Wrapf(err, "Failed to decompress application/x-gzip file %v", s3Info.key)
p.logger.Error(err)
s3Context.setError(err)
resp.Body.Close()
return err
}
reader = bufio.NewReader(gzipReader)
gzipReader.Close()
default:
reader = bufio.NewReader(resp.Body)
}
} else if strings.HasSuffix(s3Info.key, ".gz") {
// If there is no content-type, check file name instead.
gzipReader, err := gzip.NewReader(resp.Body)
if err != nil {
err = errors.Wrapf(err, "Failed to decompress file with .gz suffix %v", s3Info.key)
p.logger.Error(err)
s3Context.setError(err)
resp.Body.Close()
return err
}
reader = bufio.NewReader(gzipReader)
gzipReader.Close()
}

// Decode JSON documents when expand_event_list_from_field is given in config
@@ -389,9 +426,12 @@ func (p *s3Input) handleS3Objects(svc s3iface.ClientAPI, s3Infos []s3Info, errC
err := p.decodeJSONWithKey(decoder, objectHash, s3Info, s3Context)
if err != nil {
err = errors.Wrapf(err, "decodeJSONWithKey failed for %v", s3Info.key)
p.logger.Error(err)
s3Context.setError(err)
resp.Body.Close()
return err
}
resp.Body.Close()
return nil
}

@@ -410,13 +450,18 @@ func (p *s3Input) handleS3Objects(svc s3iface.ClientAPI, s3Infos []s3Info, errC
err = p.forwardEvent(event)
if err != nil {
err = errors.Wrapf(err, "forwardEvent failed for %v", s3Info.key)
p.logger.Error(err)
s3Context.setError(err)
resp.Body.Close()
return err
}
resp.Body.Close()
return nil
} else if err != nil {
err = errors.Wrapf(err, "ReadString failed for %v", s3Info.key)
p.logger.Error(err)
s3Context.setError(err)
resp.Body.Close()
return err
}

@@ -426,12 +471,13 @@ func (p *s3Input) handleS3Objects(svc s3iface.ClientAPI, s3Infos []s3Info, errC
err = p.forwardEvent(event)
if err != nil {
err = errors.Wrapf(err, "forwardEvent failed for %v", s3Info.key)
p.logger.Error(err)
s3Context.setError(err)
resp.Body.Close()
return err
}
}
}

return nil
}

@@ -501,18 +547,29 @@ func (p *s3Input) convertJSONToEvent(jsonFields interface{}, offset int, objectH
return nil
}

func (p *s3Input) newS3BucketReader(svc s3iface.ClientAPI, s3Info s3Info) (*bufio.Reader, error) {
func (p *s3Input) getS3ObjectResponse(svc s3iface.ClientAPI, s3Info s3Info) (*s3.GetObjectResponse, error) {
// Create a context with a timeout that will abort the download if it takes
// longer than the configured context_timeout (120 seconds by default).
ctx := context.Background()
ctx, _ = context.WithTimeout(ctx, p.config.ContextTimeout)

// Download the S3 object using GetObjectRequest. The Context will interrupt
// the request if the timeout expires.
s3GetObjectInput := &s3.GetObjectInput{
Bucket: awssdk.String(s3Info.name),
Key: awssdk.String(s3Info.key),
}
req := svc.GetObjectRequest(s3GetObjectInput)

resp, err := req.Send(p.context)
resp, err := req.Send(ctx)
if err != nil {
if awsErr, ok := err.(awserr.Error); ok {
// If the SDK can determine the request or retry delay was canceled
// by a context the ErrCodeRequestCanceled error will be returned.
if awsErr.Code() == awssdk.ErrCodeRequestCanceled {
return nil, nil
err = errors.Wrapf(err, "GetObject of s3 file with key %v failed due to timeout", s3Info.key)
p.logger.Error(err)
return nil, err
}

if awsErr.Code() == "NoSuchKey" {
@@ -522,34 +579,7 @@ func (p *s3Input) newS3BucketReader(svc s3iface.ClientAPI, s3Info s3Info) (*bufi
}
return nil, errors.Wrapf(err, "s3 get object request failed %v", s3Info.key)
}

if resp.Body == nil {
return nil, errors.New("s3 get object response body is empty")
}

// Check content-type
if resp.ContentType != nil {
switch *resp.ContentType {
case "application/x-gzip":
reader, err := gzip.NewReader(resp.Body)
if err != nil {
return nil, errors.Wrapf(err, "Failed to decompress gzipped file %v", s3Info.key)
}
return bufio.NewReader(reader), nil
default:
return bufio.NewReader(resp.Body), nil
}
}

// If there is no content-type, check file name instead.
if strings.HasSuffix(s3Info.key, ".gz") {
gzipReader, err := gzip.NewReader(resp.Body)
if err != nil {
return nil, errors.Wrapf(err, "Failed to decompress gzipped file %v", s3Info.key)
}
return bufio.NewReader(gzipReader), nil
}
return bufio.NewReader(resp.Body), nil
return resp, nil
}

func (p *s3Input) forwardEvent(event beat.Event) error {
@@ -567,7 +597,8 @@ func (p *s3Input) deleteMessage(queueURL string, messagesReceiptHandle string, s
}

req := svcSQS.DeleteMessageRequest(deleteMessageInput)
_, err := req.Send(p.context)
ctx := context.Background()
_, err := req.Send(ctx)
if err != nil {
if awsErr, ok := err.(awserr.Error); ok && awsErr.Code() == awssdk.ErrCodeRequestCanceled {
return nil