Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add new mode to multiline reader to aggregate constant number of lines #18352

Merged
merged 15 commits into from
Jun 17, 2020
1 change: 1 addition & 0 deletions CHANGELOG.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -404,6 +404,7 @@ https://github.com/elastic/beats/compare/v7.0.0-alpha2...master[Check the HEAD d
- Improved performance of PANW sample dashboards. {issue}19031[19031] {pull}19032[19032]
- Add support for v1 consumer API in Cloud Foundry input, use it by default. {pull}19125[19125]
- Explicitly set ECS version in all Filebeat modules. {pull}19198[19198]
- Add new mode to multiline reader to aggregate constant number of lines {pull}18352[18352]

*Heartbeat*

Expand Down
9 changes: 9 additions & 0 deletions filebeat/_meta/config/filebeat.inputs.reference.yml.tmpl
Original file line number Diff line number Diff line change
Expand Up @@ -144,6 +144,15 @@ filebeat.inputs:
# Default is 5s.
#multiline.timeout: 5s

# To aggregate constant number of lines into a single event use the count mode of multiline.
#multiline.type: count

# The number of lines to aggregate into a single event.
#multiline.count_lines: 3

# Do not add new line character when concatenating lines.
#multiline.skip_newline: false

# Setting tail_files to true means filebeat starts reading new files at the end
# instead of the beginning. If this is used in combination with log rotation
# this can mean that the first entries of a new file are skipped.
Expand Down
12 changes: 12 additions & 0 deletions filebeat/docs/multiline.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ The following example shows how to configure {beatname_uc} to handle a multiline

[source,yaml]
-------------------------------------------------------------------------------------
multiline.type: pattern
multiline.pattern: '^\['
multiline.negate: true
multiline.match: after
Expand All @@ -46,6 +47,8 @@ multiline.match: after
at org.elasticsearch.action.admin.indices.delete.TransportDeleteIndexAction.checkBlock(TransportDeleteIndexAction.java:75)
-------------------------------------------------------------------------------------

*`multiline.type`*:: Defines which aggregation method to use. The default is `pattern`. The other option
is `count` which lets you aggregate constant number of lines.

*`multiline.pattern`*:: Specifies the regular expression pattern to match. Note that the regexp patterns supported by {beatname_uc}
differ somewhat from the patterns supported by Logstash. See <<regexp-support>> for a list of supported regexp patterns.
Expand Down Expand Up @@ -76,6 +79,10 @@ lines are discarded. The default is 500.

*`multiline.timeout`*:: After the specified timeout, {beatname_uc} sends the multiline event even if no new pattern is found to start a new event. The default is 5s.

*`multiline.count_lines`*:: The number of lines to aggregate into a single event.

*`multiline.skip_newline`*:: When set, multiline events are concatenated without a line separator.


==== Examples of multiline configuration

Expand Down Expand Up @@ -103,6 +110,7 @@ To consolidate these lines into a single event in {beatname_uc}, use the followi

[source,yaml]
-------------------------------------------------------------------------------------
multiline.type: pattern
multiline.pattern: '^[[:space:]]'
multiline.negate: false
multiline.match: after
Expand All @@ -127,6 +135,7 @@ To consolidate these lines into a single event in {beatname_uc}, use the followi

[source,yaml]
-------------------------------------------------------------------------------------
multiline.type: pattern
multiline.pattern: '^[[:space:]]+(at|\.{3})[[:space:]]+\b|^Caused by:'
multiline.negate: false
multiline.match: after
Expand All @@ -153,6 +162,7 @@ To consolidate these lines into a single event in {beatname_uc}, use the followi

[source,yaml]
-------------------------------------------------------------------------------------
multiline.type: pattern
multiline.pattern: '\\$'
multiline.negate: false
multiline.match: before
Expand All @@ -176,6 +186,7 @@ To consolidate these lines into a single event in {beatname_uc}, use the followi

[source,yaml]
-------------------------------------------------------------------------------------
multiline.type: pattern
multiline.pattern: '^\[[0-9]{4}-[0-9]{2}-[0-9]{2}'
multiline.negate: true
multiline.match: after
Expand All @@ -200,6 +211,7 @@ To consolidate this as a single event in {beatname_uc}, use the following multil

[source,yaml]
-------------------------------------------------------------------------------------
multiline.type: pattern
multiline.pattern: 'Start new event'
multiline.negate: true
multiline.match: after
Expand Down
9 changes: 9 additions & 0 deletions filebeat/filebeat.reference.yml
Original file line number Diff line number Diff line change
Expand Up @@ -531,6 +531,15 @@ filebeat.inputs:
# Default is 5s.
#multiline.timeout: 5s

# To aggregate constant number of lines into a single event use the count mode of multiline.
#multiline.type: count

# The number of lines to aggregate into a single event.
#multiline.count_lines: 3

# Do not add new line character when concatenating lines.
#multiline.skip_newline: false

# Setting tail_files to true means filebeat starts reading new files at the end
# instead of the beginning. If this is used in combination with log rotation
# this can mean that the first entries of a new file are skipped.
Expand Down
1 change: 1 addition & 0 deletions filebeat/tests/system/config/filebeat.yml.j2
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,7 @@ filebeat.{{input_config | default("inputs")}}:

{% if multiline %}
multiline:
type: {{multiline_type}}
pattern: {{pattern}}
negate: {{negate}}
match: {{match}}
Expand Down
11 changes: 10 additions & 1 deletion filebeat/tests/system/test_multiline.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ def test_java_elasticsearch_log(self):
self.render_config_template(
path=os.path.abspath(self.working_dir) + "/log/*",
multiline=True,
multiline_type="pattern",
pattern="^\[",
negate="true",
match="after"
Expand Down Expand Up @@ -48,6 +49,7 @@ def test_c_style_log(self):
self.render_config_template(
path=os.path.abspath(self.working_dir) + "/log/*",
multiline=True,
multiline_type="pattern",
pattern="\\\\$",
match="before"
)
Expand Down Expand Up @@ -78,6 +80,7 @@ def test_rabbitmq_multiline_log(self):
self.render_config_template(
path=os.path.abspath(self.working_dir) + "/log/*",
multiline=True,
multiline_type="pattern",
pattern="^=[A-Z]+",
match="after",
negate="true",
Expand Down Expand Up @@ -122,6 +125,7 @@ def test_max_lines(self):
self.render_config_template(
path=os.path.abspath(self.working_dir) + "/log/*",
multiline=True,
multiline_type="pattern",
pattern="^\[",
negate="true",
match="after",
Expand Down Expand Up @@ -160,6 +164,7 @@ def test_timeout(self):
self.render_config_template(
path=os.path.abspath(self.working_dir) + "/log/*",
multiline=True,
multiline_type="pattern",
pattern="^\[",
negate="true",
match="after",
Expand Down Expand Up @@ -204,6 +209,7 @@ def test_max_bytes(self):
self.render_config_template(
path=os.path.abspath(self.working_dir) + "/log/*",
multiline=True,
multiline_type="pattern",
pattern="^\[",
negate="true",
match="after",
Expand Down Expand Up @@ -240,6 +246,7 @@ def test_close_timeout_with_multiline(self):
self.render_config_template(
path=os.path.abspath(self.working_dir) + "/log/*",
multiline=True,
multiline_type="pattern",
pattern="^\[",
negate="true",
match="after",
Expand Down Expand Up @@ -295,6 +302,7 @@ def test_consecutive_newline(self):
self.render_config_template(
path=os.path.abspath(self.working_dir) + "/log/*",
multiline=True,
multiline_type="pattern",
pattern="^\[",
negate="true",
match="after",
Expand Down Expand Up @@ -340,11 +348,12 @@ def test_invalid_config(self):
self.render_config_template(
path=os.path.abspath(self.working_dir + "/log/") + "*",
multiline=True,
multiline_type="pattern",
match="after",
)

proc = self.start_beat()

self.wait_until(lambda: self.log_contains("missing required field accessing") == 1)
self.wait_until(lambda: self.log_contains("multiline.pattern cannot be empty") == 1)

proc.check_kill_and_wait(exit_code=1)
133 changes: 133 additions & 0 deletions libbeat/reader/multiline/counter.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,133 @@
// Licensed to Elasticsearch B.V. under one or more contributor
// license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright
// ownership. Elasticsearch B.V. licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package multiline

import (
"github.com/elastic/beats/v7/libbeat/reader"
)

type counterReader struct {
reader reader.Reader
state func(*counterReader) (reader.Message, error)
linesCount int // number of lines to collect
msgBuffer *messageBuffer
}

func newMultilineCountReader(
r reader.Reader,
separator string,
maxBytes int,
config *Config,
) (reader.Reader, error) {
maxLines := config.LinesCount
if l := config.MaxLines; l != nil && 0 < *l {
maxLines = *l
}

return &counterReader{
reader: r,
state: (*counterReader).readFirst,
linesCount: config.LinesCount,
msgBuffer: newMessageBuffer(maxBytes, maxLines, []byte(separator), config.SkipNewLine),
}, nil
}

// Next returns next multi-line event.
func (cr *counterReader) Next() (reader.Message, error) {
return cr.state(cr)
}

func (cr *counterReader) readFirst() (reader.Message, error) {
for {
message, err := cr.reader.Next()
if err != nil {
return message, err
}

if message.Bytes == 0 {
continue
}

cr.msgBuffer.startNewMessage(message)
if cr.msgBuffer.processedLines == cr.linesCount {
msg := cr.msgBuffer.finalize()
return msg, nil
}

cr.setState((*counterReader).readNext)
return cr.readNext()
}
}

func (cr *counterReader) readNext() (reader.Message, error) {
for {
message, err := cr.reader.Next()
if err != nil {
// handle error without any bytes returned from reader
if message.Bytes == 0 {
// no lines buffered -> return error
if cr.msgBuffer.isEmpty() {
return reader.Message{}, err
}

// lines buffered, return multiline and error on next read
msg := cr.msgBuffer.finalize()
cr.msgBuffer.setErr(err)
cr.setState((*counterReader).readFailed)
return msg, nil
}

// handle error with some content being returned by reader and
// line matching multiline criteria or no multiline started yet
if cr.msgBuffer.isEmptyMessage() {
cr.msgBuffer.addLine(message)

// return multiline and error on next read
msg := cr.msgBuffer.finalize()
cr.msgBuffer.setErr(err)
cr.setState((*counterReader).readFailed)
return msg, nil
}
}

// add line to current multiline event
cr.msgBuffer.addLine(message)
if cr.msgBuffer.processedLines == cr.linesCount {
msg := cr.msgBuffer.finalize()
cr.setState((*counterReader).readFirst)
return msg, nil
urso marked this conversation as resolved.
Show resolved Hide resolved
}
}
}

func (cr *counterReader) readFailed() (reader.Message, error) {
err := cr.msgBuffer.err
cr.msgBuffer.setErr(nil)
cr.resetState()
return reader.Message{}, err
}

// resetState sets state of the reader to readFirst
func (cr *counterReader) resetState() {
cr.setState((*counterReader).readFirst)
}

// setState sets state to the given function
func (cr *counterReader) setState(next func(cr *counterReader) (reader.Message, error)) {
cr.state = next
}
Loading