From 74ac00e17c8f692921898b617290677b0747e290 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?No=C3=A9mi=20V=C3=A1nyi?= Date: Mon, 3 Sep 2018 21:09:32 +0200 Subject: [PATCH] Cherry-pick #7991 to 6.x: Add tag "truncated" to "log.flags" if incoming line is longer than configured limit (#8165) * Add tag "truncated" to "log.flags" if incoming line is longer than configured limit (#7991) A new field is added to store the flags of an event named "log.flags". If a message is truncated, "truncated" flag is added to the list. Example event with "truncated" flag: { "@timestamp": "2018-08-16T13:00:46.759Z", "@metadata": { "beat": "filebeat", "type": "doc", "version": "7.0.0-alpha1" }, "host": { "name": "sleipnir" }, "source": "/home/n/test.log", "offset": 33, "log": { "flags": [ "truncated" ] }, "message": "test line", "prospector": { "type": "log" }, "input": { "type": "log" }, "beat": { "hostname": "sleipnir", "version": "7.0.0-alpha1", "name": "sleipnir" } } Closes #7022 (cherry picked from commit 08842368ca744ee682e1e20f338774c8c69fe931) * fix changelog && rebase --- CHANGELOG-developer.asciidoc | 1 + CHANGELOG.asciidoc | 2 + filebeat/_meta/fields.common.yml | 4 + filebeat/docs/fields.asciidoc | 8 ++ filebeat/include/fields.go | 2 +- filebeat/reader/message.go | 23 +++++- filebeat/reader/multiline/multiline.go | 17 ++++ filebeat/reader/multiline/multiline_test.go | 83 +++++++++++++++++++ filebeat/reader/readfile/limit.go | 1 + filebeat/reader/readfile/limit_test.go | 88 +++++++++++++++++++++ libbeat/common/mapstr.go | 27 +++++-- libbeat/common/mapstr_test.go | 73 +++++++++++++++++ 12 files changed, 317 insertions(+), 12 deletions(-) create mode 100644 filebeat/reader/readfile/limit_test.go diff --git a/CHANGELOG-developer.asciidoc b/CHANGELOG-developer.asciidoc index cf844df59cfa..96f2db8ef2c2 100644 --- a/CHANGELOG-developer.asciidoc +++ b/CHANGELOG-developer.asciidoc @@ -39,3 +39,4 @@ The list below covers the major changes between 6.3.0 and master only. 
- Libbeat provides a global registry for beats developer that allow to register and retrieve plugin. {pull}7392[7392] - Added more options to control required and optional fields in schema.Apply(), error returned is a plain nil if no error happened {pull}7335[7335] - Packaging on MacOS now produces a .dmg file containing an installer (.pkg) and uninstaller for the Beat. {pull}7481[7481] +- New function `AddTagsWithKey` is added, so `common.MapStr` can be enriched with tags with an arbitrary key. {pull}7991[7991] diff --git a/CHANGELOG.asciidoc b/CHANGELOG.asciidoc index 6f2f2de98f1d..9579c669cc39 100644 --- a/CHANGELOG.asciidoc +++ b/CHANGELOG.asciidoc @@ -83,6 +83,8 @@ https://github.com/elastic/beats/compare/v6.4.0...6.x[Check the HEAD diff] *Filebeat* +- Add tag "truncated" to "log.flags" if incoming line is longer than configured limit. {pull}7991[7991] + *Heartbeat* *Metricbeat* diff --git a/filebeat/_meta/fields.common.yml b/filebeat/_meta/fields.common.yml index 5781e4978792..930e0f67f906 100644 --- a/filebeat/_meta/fields.common.yml +++ b/filebeat/_meta/fields.common.yml @@ -108,6 +108,10 @@ description: > Logging level. + - name: log.flags + description: > + This field contains the flags of the event. + - name: event.created type: date description: > diff --git a/filebeat/docs/fields.asciidoc b/filebeat/docs/fields.asciidoc index edd866198bc6..05ac09dce9a0 100644 --- a/filebeat/docs/fields.asciidoc +++ b/filebeat/docs/fields.asciidoc @@ -2652,6 +2652,14 @@ type: keyword Logging level. +-- + +*`log.flags`*:: ++ +-- +This field contains the flags of the event. 
+ + -- *`event.created`*:: diff --git a/filebeat/include/fields.go b/filebeat/include/fields.go index 54f7edfc95dc..fda0f3f48415 100644 --- a/filebeat/include/fields.go +++ b/filebeat/include/fields.go @@ -31,5 +31,5 @@ func init() { // Asset returns asset data func Asset() string { - return "" + return "" } diff --git a/filebeat/reader/message.go b/filebeat/reader/message.go index c5e965b10d04..a92bc9092ccf 100644 --- a/filebeat/reader/message.go +++ b/filebeat/reader/message.go @@ -49,13 +49,28 @@ func (m *Message) IsEmpty() bool { return false } -func (msg *Message) AddFields(fields common.MapStr) { +// AddFields adds fields to the message. +func (m *Message) AddFields(fields common.MapStr) { if fields == nil { return } - if msg.Fields == nil { - msg.Fields = common.MapStr{} + if m.Fields == nil { + m.Fields = common.MapStr{} } - msg.Fields.Update(fields) + m.Fields.Update(fields) +} + +// AddFlagsWithKey adds flags to the message with an arbitrary key. +// If the field does not exist, it is created. +func (m *Message) AddFlagsWithKey(key string, flags ...string) error { + if len(flags) == 0 { + return nil + } + + if m.Fields == nil { + m.Fields = common.MapStr{} + } + + return common.AddTagsWithKey(m.Fields, key, flags) } diff --git a/filebeat/reader/multiline/multiline.go b/filebeat/reader/multiline/multiline.go index 57209be94cdc..2e974c19b235 100644 --- a/filebeat/reader/multiline/multiline.go +++ b/filebeat/reader/multiline/multiline.go @@ -48,6 +48,7 @@ type Reader struct { separator []byte last []byte numLines int + truncated int err error // last seen error state func(*Reader) (reader.Message, error) message reader.Message @@ -262,13 +263,19 @@ func (mlr *Reader) clear() { mlr.message = reader.Message{} mlr.last = nil mlr.numLines = 0 + mlr.truncated = 0 mlr.err = nil } // finalize writes the existing content into the returned message and resets all reader variables. 
func (mlr *Reader) finalize() reader.Message { + if mlr.truncated > 0 { + mlr.message.AddFlagsWithKey("log.flags", "truncated") + } + // Copy message from existing content msg := mlr.message + mlr.clear() return msg } @@ -303,6 +310,16 @@ func (mlr *Reader) addLine(m reader.Message) { } mlr.message.Content = append(tmp, m.Content[:space]...) mlr.numLines++ + + // add number of truncated bytes to fields + diff := len(m.Content) - space + if diff > 0 { + mlr.truncated += diff + } + } else { + // increase the number of skipped bytes, if cannot add + mlr.truncated += len(m.Content) + } mlr.last = m.Content diff --git a/filebeat/reader/multiline/multiline_test.go b/filebeat/reader/multiline/multiline_test.go index 6fe05fde0d28..1c6ea96c6f38 100644 --- a/filebeat/reader/multiline/multiline_test.go +++ b/filebeat/reader/multiline/multiline_test.go @@ -150,6 +150,41 @@ func TestMultilineBeforeNegateOKWithEmptyLine(t *testing.T) { ) } +func TestMultilineAfterTruncated(t *testing.T) { + pattern := match.MustCompile(`^[ ]`) // next line is indented a space + maxLines := 2 + testMultilineTruncated(t, + Config{ + Pattern: &pattern, + Match: "after", + MaxLines: &maxLines, + }, + 2, + true, + []string{ + "line1\n line1.1\n line1.2\n", + "line2\n line2.1\n line2.2\n"}, + []string{ + "line1\n line1.1", + "line2\n line2.1"}, + ) + testMultilineTruncated(t, + Config{ + Pattern: &pattern, + Match: "after", + MaxLines: &maxLines, + }, + 2, + false, + []string{ + "line1\n line1.1\n", + "line2\n line2.1\n"}, + []string{ + "line1\n line1.1", + "line2\n line2.1"}, + ) +} + func testMultilineOK(t *testing.T, cfg Config, events int, expected ...string) { _, buf := createLineBuffer(expected...) r := createMultilineTestReader(t, buf, cfg) @@ -177,6 +212,54 @@ func testMultilineOK(t *testing.T, cfg Config, events int, expected ...string) { } } +func testMultilineTruncated(t *testing.T, cfg Config, events int, truncated bool, input, expected []string) { + _, buf := createLineBuffer(input...) 
+ r := createMultilineTestReader(t, buf, cfg) + + var messages []reader.Message + for { + message, err := r.Next() + if err != nil { + break + } + + messages = append(messages, message) + } + + if len(messages) != events { + t.Fatalf("expected %v lines, read only %v line(s)", len(expected), len(messages)) + } + + for _, message := range messages { + found := false + statusFlags, err := message.Fields.GetValue("log.flags") + if err != nil { + if !truncated { + assert.False(t, found) + return + } + t.Fatalf("error while getting log.status field: %v", err) + } + + switch flags := statusFlags.(type) { + case []string: + for _, f := range flags { + if f == "truncated" { + found = true + } + } + default: + t.Fatalf("incorrect type for log.flags") + } + + if truncated { + assert.True(t, found) + } else { + assert.False(t, found) + } + } +} + func createMultilineTestReader(t *testing.T, in *bytes.Buffer, cfg Config) reader.Reader { encFactory, ok := encoding.FindEncoding("plain") if !ok { diff --git a/filebeat/reader/readfile/limit.go b/filebeat/reader/readfile/limit.go index 1d7b46e2a471..42f9635ba124 100644 --- a/filebeat/reader/readfile/limit.go +++ b/filebeat/reader/readfile/limit.go @@ -38,6 +38,7 @@ func (r *LimitReader) Next() (reader.Message, error) { message, err := r.reader.Next() if len(message.Content) > r.maxBytes { message.Content = message.Content[:r.maxBytes] + message.AddFlagsWithKey("log.flags", "truncated") } return message, err } diff --git a/filebeat/reader/readfile/limit_test.go b/filebeat/reader/readfile/limit_test.go new file mode 100644 index 000000000000..c7096cb938f3 --- /dev/null +++ b/filebeat/reader/readfile/limit_test.go @@ -0,0 +1,88 @@ +// Licensed to Elasticsearch B.V. under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Elasticsearch B.V. 
licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +// +build !integration + +package readfile + +import ( + "testing" + + "github.com/stretchr/testify/assert" + + "github.com/elastic/beats/filebeat/reader" +) + +type mockReader struct { + line []byte +} + +func (m *mockReader) Next() (reader.Message, error) { + return reader.Message{ + Content: m.line, + }, nil +} + +var limitTests = []struct { + line string + maxBytes int + truncated bool +}{ + {"long-long-line", 5, true}, + {"long-long-line", 3, true}, + {"long-long-line", len("long-long-line"), false}, +} + +func TestLimitReader(t *testing.T) { + for _, test := range limitTests { + r := NewLimitReader(&mockReader{[]byte(test.line)}, test.maxBytes) + + msg, err := r.Next() + if err != nil { + t.Fatalf("Error reading from mock reader: %v", err) + } + + assert.Equal(t, test.maxBytes, len(msg.Content)) + + found := false + statusFlags, err := msg.Fields.GetValue("log.flags") + if err != nil { + if !test.truncated { + assert.False(t, found) + return + } + t.Fatalf("Error getting truncated value: %v", err) + } + + switch flags := statusFlags.(type) { + case []string: + for _, f := range flags { + if f == "truncated" { + found = true + } + } + default: + t.Fatalf("incorrect type for log.flags") + } + + if test.truncated { + assert.True(t, found) + } else { + assert.False(t, found) + } + } +} diff --git a/libbeat/common/mapstr.go b/libbeat/common/mapstr.go index 0bc2d4b2a161..2b825fd67e39 100644 
--- a/libbeat/common/mapstr.go +++ b/libbeat/common/mapstr.go @@ -305,25 +305,38 @@ func MergeFields(ms, fields MapStr, underRoot bool) error { // exist then it will be created. If the tags field exists and is not a []string // then an error will be returned. It does not deduplicate the list of tags. func AddTags(ms MapStr, tags []string) error { + return AddTagsWithKey(ms, TagsKey, tags) +} + +// AddTagsWithKey appends a tag to the key field of ms. If the field does not +// exist then it will be created. If the field exists and is not a []string +// then an error will be returned. It does not deduplicate the list. +func AddTagsWithKey(ms MapStr, key string, tags []string) error { if ms == nil || len(tags) == 0 { return nil } - eventTags, exists := ms[TagsKey] - if !exists { - ms[TagsKey] = tags + + k, subMap, oldTags, present, err := mapFind(key, ms, true) + if err != nil { + return err + } + + if !present { + subMap[k] = tags return nil } - switch arr := eventTags.(type) { + switch arr := oldTags.(type) { case []string: - ms[TagsKey] = append(arr, tags...) + subMap[k] = append(arr, tags...) 
case []interface{}: for _, tag := range tags { arr = append(arr, tag) } - ms[TagsKey] = arr + subMap[k] = arr default: - return errors.Errorf("expected string array by type is %T", eventTags) + return errors.Errorf("expected string array by type is %T", oldTags) + } return nil } diff --git a/libbeat/common/mapstr_test.go b/libbeat/common/mapstr_test.go index 9002e5dfe03c..bb582c53b761 100644 --- a/libbeat/common/mapstr_test.go +++ b/libbeat/common/mapstr_test.go @@ -546,6 +546,79 @@ func TestAddTag(t *testing.T) { } } +func TestAddTagsWithKey(t *testing.T) { + type io struct { + Event MapStr + Key string + Tags []string + Output MapStr + Err string + } + tests := []io{ + // No existing tags, creates new tag array + { + Event: MapStr{}, + Key: "tags", + Tags: []string{"json"}, + Output: MapStr{ + "tags": []string{"json"}, + }, + }, + // Existing tags is a []string, appends + { + Event: MapStr{ + "tags": []string{"json"}, + }, + Key: "tags", + Tags: []string{"docker"}, + Output: MapStr{ + "tags": []string{"json", "docker"}, + }, + }, + // Existing tags are in submap and is a []interface{}, appends + { + Event: MapStr{ + "log": MapStr{ + "flags": []interface{}{"json"}, + }, + }, + Key: "log.flags", + Tags: []string{"docker"}, + Output: MapStr{ + "log": MapStr{ + "flags": []interface{}{"json", "docker"}, + }, + }, + }, + // Existing tags are in a submap and is not a []string or []interface{} + { + Event: MapStr{ + "log": MapStr{ + "flags": "not a slice", + }, + }, + Key: "log.flags", + Tags: []string{"docker"}, + Output: MapStr{ + "log": MapStr{ + "flags": "not a slice", + }, + }, + Err: "expected string array", + }, + } + + for _, test := range tests { + err := AddTagsWithKey(test.Event, test.Key, test.Tags) + assert.Equal(t, test.Output, test.Event) + if test.Err != "" { + assert.Contains(t, err.Error(), test.Err) + } else { + assert.NoError(t, err) + } + } +} + func TestFlatten(t *testing.T) { type data struct { Event MapStr