Skip to content

Commit

Permalink
[Filebeat] Handle error message in handleS3Objects function (elastic#…
Browse files Browse the repository at this point in the history
…15545)

* Handle error message in handleS3Objects function

* remove s3Context.Fail and use setError and done instead

* Add changelog
  • Loading branch information
kaiyan-sheng authored Jan 14, 2020
1 parent 8bf06e3 commit 2228af4
Show file tree
Hide file tree
Showing 2 changed files with 17 additions and 12 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -230,6 +230,7 @@ https://github.com/elastic/beats/compare/v7.0.0-alpha2...master[Check the HEAD d
- Fix SSL config in input.yml for Filebeat httpjson input in the MISP module. {pull}14767[14767]
- Check content-type when creating new reader in s3 input. {pull}15252[15252] {issue}15225[15225]
- Fix session reset detection and a crash in Netflow input. {pull}14904[14904]
- Handle errors in handleS3Objects function and add more debug messages for s3 input. {pull}15545[15545]
- netflow: Allow for options templates without scope fields. {pull}15449[15449]
- netflow: Fix bytes/packets counters on some devices (NSEL and Netstream). {pull}15449[15449]
- netflow: Fix compatibility with some Cisco devices by changing the field `class_id` from short to long. {pull}15449[15449]
Expand Down
28 changes: 16 additions & 12 deletions x-pack/filebeat/input/s3/input.go
Original file line number Diff line number Diff line change
Expand Up @@ -232,6 +232,7 @@ func (p *s3Input) Wait() {
func (p *s3Input) processor(queueURL string, messages []sqs.Message, visibilityTimeout int64, svcS3 s3iface.ClientAPI, svcSQS sqsiface.ClientAPI) {
var wg sync.WaitGroup
numMessages := len(messages)
p.logger.Debugf("Processing %v messages", numMessages)
wg.Add(numMessages * 2)

// process messages received from sqs
Expand All @@ -251,14 +252,16 @@ func (p *s3Input) processMessage(svcS3 s3iface.ClientAPI, message sqs.Message, w
p.logger.Error(errors.Wrap(err, "handleSQSMessage failed"))
return
}
p.logger.Debugf("handleSQSMessage succeed and returned %v sets of S3 log info", len(s3Infos))

// read from s3 object and create event for each log line
err = p.handleS3Objects(svcS3, s3Infos, errC)
if err != nil {
err = errors.Wrap(err, "handleS3Objects failed")
p.logger.Error(err)
errC <- err
return
}
p.logger.Debugf("handleS3Objects succeed")
}

func (p *s3Input) processorKeepAlive(svcSQS sqsiface.ClientAPI, message sqs.Message, queueURL string, visibilityTimeout int64, wg *sync.WaitGroup, errC chan error) {
Expand Down Expand Up @@ -288,13 +291,14 @@ func (p *s3Input) processorKeepAlive(svcSQS sqsiface.ClientAPI, message sqs.Mess
}
return
case <-time.After(time.Duration(visibilityTimeout/2) * time.Second):
p.logger.Warn("Half of the set visibilityTimeout passed, visibility timeout needs to be updated")
// If half of the set visibilityTimeout passed and this is
// still ongoing, then change visibility timeout.
err := p.changeVisibilityTimeout(queueURL, visibilityTimeout, svcSQS, message.ReceiptHandle)
if err != nil {
p.logger.Error(errors.Wrap(err, "change message visibility failed"))
}
p.logger.Infof("Message visibility timeout updated to %v", visibilityTimeout)
p.logger.Infof("Message visibility timeout updated to %v seconds", visibilityTimeout)
}
}
}
Expand Down Expand Up @@ -370,8 +374,11 @@ func (p *s3Input) handleS3Objects(svc s3iface.ClientAPI, s3Infos []s3Info, errC
// read from s3 object
reader, err := p.newS3BucketReader(svc, s3Info)
if err != nil {
return errors.Wrap(err, "newS3BucketReader failed")
err = errors.Wrap(err, "newS3BucketReader failed")
s3Context.setError(err)
return err
}

if reader == nil {
continue
}
Expand All @@ -382,7 +389,7 @@ func (p *s3Input) handleS3Objects(svc s3iface.ClientAPI, s3Infos []s3Info, errC
err := p.decodeJSONWithKey(decoder, objectHash, s3Info, s3Context)
if err != nil {
err = errors.Wrapf(err, "decodeJSONWithKey failed for %v", s3Info.key)
s3Context.Fail(err)
s3Context.setError(err)
return err
}
return nil
Expand All @@ -403,12 +410,14 @@ func (p *s3Input) handleS3Objects(svc s3iface.ClientAPI, s3Infos []s3Info, errC
err = p.forwardEvent(event)
if err != nil {
err = errors.Wrapf(err, "forwardEvent failed for %v", s3Info.key)
s3Context.Fail(err)
s3Context.setError(err)
return err
}
return nil
} else if err != nil {
return errors.Wrapf(err, "ReadString failed for %v", s3Info.key)
err = errors.Wrapf(err, "ReadString failed for %v", s3Info.key)
s3Context.setError(err)
return err
}

// create event per log line
Expand All @@ -417,7 +426,7 @@ func (p *s3Input) handleS3Objects(svc s3iface.ClientAPI, s3Infos []s3Info, errC
err = p.forwardEvent(event)
if err != nil {
err = errors.Wrapf(err, "forwardEvent failed for %v", s3Info.key)
s3Context.Fail(err)
s3Context.setError(err)
return err
}
}
Expand Down Expand Up @@ -610,11 +619,6 @@ func s3ObjectHash(s3Info s3Info) string {
return prefix[:10]
}

func (c *s3Context) Fail(err error) {
c.setError(err)
c.done()
}

func (c *s3Context) setError(err error) {
// only care about the last error for now
// TODO: add "Typed" error to error for context
Expand Down

0 comments on commit 2228af4

Please sign in to comment.