From 22014e677136158fd1386833ee0f5f8fb86e30f7 Mon Sep 17 00:00:00 2001 From: kaiyan-sheng Date: Fri, 17 Jul 2020 07:35:32 -0600 Subject: [PATCH 1/2] [Filebeat] remove delimiter \n from log line in s3 input (#19972) * remove delimiter \n from log line in s3 input (cherry picked from commit 9ca82b4cb92c37d82c11e9283ea596232044d22f) --- CHANGELOG.next.asciidoc | 1 + x-pack/filebeat/input/s3/input.go | 23 ++++++++++-- x-pack/filebeat/input/s3/input_test.go | 49 +++++++++++++++++++++----- 3 files changed, 62 insertions(+), 11 deletions(-) diff --git a/CHANGELOG.next.asciidoc b/CHANGELOG.next.asciidoc index ad47bd52833f..612e7028e713 100644 --- a/CHANGELOG.next.asciidoc +++ b/CHANGELOG.next.asciidoc @@ -246,6 +246,7 @@ field. You can revert this change by configuring tags for the module and omittin - Fix memory leak in tcp and unix input sources. {pull}19459[19459] - Fix Cisco ASA dissect pattern for 313008 & 313009 messages. {pull}19149[19149] - Fix bug with empty filter values in system/service {pull}19812[19812] +- Fix S3 input to trim delimiter \n from each log line. 
{pull}19972[19972] *Heartbeat* diff --git a/x-pack/filebeat/input/s3/input.go b/x-pack/filebeat/input/s3/input.go index b52016865971..9f4db3d8a3c0 100644 --- a/x-pack/filebeat/input/s3/input.go +++ b/x-pack/filebeat/input/s3/input.go @@ -477,7 +477,13 @@ func (p *s3Input) createEventsFromS3Info(svc s3iface.ClientAPI, info s3Info, s3C // handle s3 objects that are not json content-type offset := 0 for { - log, err := reader.ReadString('\n') + log, err := readStringAndTrimDelimiter(reader) + if err != nil { + err = errors.Wrap(err, "readStringAndTrimDelimiter failed") + p.logger.Error(err) + return err + } + if log == "" { break } @@ -566,7 +572,8 @@ func (p *s3Input) decodeJSONWithKey(decoder *json.Decoder, objectHash string, s3 func (p *s3Input) convertJSONToEvent(jsonFields interface{}, offset int, objectHash string, s3Info s3Info, s3Ctx *s3Context) error { vJSON, err := json.Marshal(jsonFields) - log := string(vJSON) + logOriginal := string(vJSON) + log := trimLogDelimiter(logOriginal) offset += len([]byte(log)) event := createEvent(log, offset, s3Info, objectHash, s3Ctx) @@ -609,6 +616,18 @@ func (p *s3Input) deleteMessage(queueURL string, messagesReceiptHandle string, s return nil } +func trimLogDelimiter(log string) string { + return strings.TrimSuffix(log, "\n") +} + +func readStringAndTrimDelimiter(reader *bufio.Reader) (string, error) { + logOriginal, err := reader.ReadString('\n') + if err != nil { + return logOriginal, err + } + return trimLogDelimiter(logOriginal), nil +} + func createEvent(log string, offset int, info s3Info, objectHash string, s3Ctx *s3Context) beat.Event { s3Ctx.Inc() diff --git a/x-pack/filebeat/input/s3/input_test.go b/x-pack/filebeat/input/s3/input_test.go index 5eddbaad9564..c9ff86780dea 100644 --- a/x-pack/filebeat/input/s3/input_test.go +++ b/x-pack/filebeat/input/s3/input_test.go @@ -32,10 +32,12 @@ type MockS3Client struct { } var ( - s3LogString1 = "36c1f test-s3-ks [20/Jun/2019] 1.2.3.4 arn:aws:iam::1234:user/test@elastic.co 
5141F REST.HEAD.OBJECT Screen1.png \n" - s3LogString2 = "28kdg test-s3-ks [20/Jun/2019] 1.2.3.4 arn:aws:iam::1234:user/test@elastic.co 5A070 REST.HEAD.OBJECT Screen2.png \n" - mockSvc = &MockS3Client{} - info = s3Info{ + s3LogString1 = "36c1f test-s3-ks [20/Jun/2019] 1.2.3.4 arn:aws:iam::1234:user/test@elastic.co 5141F REST.HEAD.OBJECT Screen1.png\n" + s3LogString1Trimmed = "36c1f test-s3-ks [20/Jun/2019] 1.2.3.4 arn:aws:iam::1234:user/test@elastic.co 5141F REST.HEAD.OBJECT Screen1.png" + s3LogString2 = "28kdg test-s3-ks [20/Jun/2019] 1.2.3.4 arn:aws:iam::1234:user/test@elastic.co 5A070 REST.HEAD.OBJECT Screen2.png\n" + s3LogString2Trimmed = "28kdg test-s3-ks [20/Jun/2019] 1.2.3.4 arn:aws:iam::1234:user/test@elastic.co 5A070 REST.HEAD.OBJECT Screen2.png" + mockSvc = &MockS3Client{} + info = s3Info{ name: "test-s3-ks", key: "log2019-06-21-16-16-54", region: "us-west-1", @@ -182,15 +184,15 @@ func TestNewS3BucketReader(t *testing.T) { for i := 0; i < 3; i++ { switch i { case 0: - log, err := reader.ReadString('\n') + log, err := readStringAndTrimDelimiter(reader) assert.NoError(t, err) - assert.Equal(t, s3LogString1, log) + assert.Equal(t, s3LogString1Trimmed, log) case 1: - log, err := reader.ReadString('\n') + log, err := readStringAndTrimDelimiter(reader) assert.NoError(t, err) - assert.Equal(t, s3LogString2, log) + assert.Equal(t, s3LogString2Trimmed, log) case 2: - log, err := reader.ReadString('\n') + log, err := readStringAndTrimDelimiter(reader) assert.Error(t, io.EOF, err) assert.Equal(t, "", log) } @@ -373,3 +375,32 @@ May 28 03:03:29 Shaunaks-MacBook-Pro-Work VTDecoderXPCService[57953]: DEPRECATED }) } } + +func TestTrimLogDelimiter(t *testing.T) { + cases := []struct { + title string + logOriginal string + expectedLog string + }{ + {"string with delimiter", + `test +`, + "test", + }, + {"string without delimiter", + "test", + "test", + }, + {"string just with delimiter", + ` +`, + "", + }, + } + for _, c := range cases { + t.Run(c.title, func(t 
*testing.T) { + log := trimLogDelimiter(c.logOriginal) + assert.Equal(t, c.expectedLog, log) + }) + } +} From e50d805e355e8b33dcb6326ac77cd19064fa0d37 Mon Sep 17 00:00:00 2001 From: kaiyan-sheng Date: Mon, 20 Jul 2020 08:55:00 -0600 Subject: [PATCH 2/2] backport #20041 --- x-pack/filebeat/input/s3/input.go | 16 +++++----------- 1 file changed, 5 insertions(+), 11 deletions(-) diff --git a/x-pack/filebeat/input/s3/input.go b/x-pack/filebeat/input/s3/input.go index 9f4db3d8a3c0..e74800ae1277 100644 --- a/x-pack/filebeat/input/s3/input.go +++ b/x-pack/filebeat/input/s3/input.go @@ -478,16 +478,6 @@ func (p *s3Input) createEventsFromS3Info(svc s3iface.ClientAPI, info s3Info, s3C offset := 0 for { log, err := readStringAndTrimDelimiter(reader) - if err != nil { - err = errors.Wrap(err, "readStringAndTrimDelimiter failed") - p.logger.Error(err) - return err - } - - if log == "" { - break - } - if err == io.EOF { // create event for last line offset += len([]byte(log)) @@ -500,11 +490,15 @@ func (p *s3Input) createEventsFromS3Info(svc s3iface.ClientAPI, info s3Info, s3C } return nil } else if err != nil { - err = errors.Wrap(err, "ReadString failed") + err = errors.Wrap(err, "readStringAndTrimDelimiter failed") p.logger.Error(err) return err } + if log == "" { + break + } + // create event per log line offset += len([]byte(log)) event := createEvent(log, offset, info, objectHash, s3Ctx)