diff --git a/CHANGELOG.next.asciidoc b/CHANGELOG.next.asciidoc index 946b7c01ced6..c12c2a9574f3 100644 --- a/CHANGELOG.next.asciidoc +++ b/CHANGELOG.next.asciidoc @@ -62,6 +62,7 @@ https://github.com/elastic/beats/compare/v7.0.0-alpha2...master[Check the HEAD d - Adds Gsuite Drive support. {pull}19704[19704] - Adds Gsuite Groups support. {pull}19725[19725] - Move file metrics to dataset endpoint {pull}19977[19977] +- Add `while_pattern` type to multiline reader. {pull}19662[19662] *Heartbeat* diff --git a/filebeat/docs/multiline.asciidoc b/filebeat/docs/multiline.asciidoc index 546f71d3276c..b73d62178975 100644 --- a/filebeat/docs/multiline.asciidoc +++ b/filebeat/docs/multiline.asciidoc @@ -23,7 +23,7 @@ Also read <> and <> to avoid common mistakes. You can specify the following options in the +{beatname_lc}.inputs+ section of the +{beatname_lc}.yml+ config file to control how {beatname_uc} deals with messages -that span multiple lines. +that span multiple lines. The following example shows how to configure {beatname_uc} to handle a multiline message where the first line of the message begins with a bracket (`[`). @@ -47,8 +47,8 @@ multiline.match: after at org.elasticsearch.action.admin.indices.delete.TransportDeleteIndexAction.checkBlock(TransportDeleteIndexAction.java:75) ------------------------------------------------------------------------------------- -*`multiline.type`*:: Defines which aggregation method to use. The default is `pattern`. The other option -is `count` which lets you aggregate constant number of lines. +*`multiline.type`*:: Defines which aggregation method to use. The default is `pattern`. The other options +are `count` which lets you aggregate constant number of lines and `while_pattern` which aggregate lines by pattern without match option. *`multiline.pattern`*:: Specifies the regular expression pattern to match. Note that the regexp patterns supported by {beatname_uc} differ somewhat from the patterns supported by Logstash. See <> for a list of supported regexp patterns. @@ -71,7 +71,7 @@ the pattern. + NOTE: The `after` setting is equivalent to `previous` in https://www.elastic.co/guide/en/logstash/current/plugins-codecs-multiline.html[Logstash], and `before` is equivalent to `next`. -*`multiline.flush_pattern`*:: Specifies a regular expression, in which the current multiline will be flushed from memory, ending the multiline-message. +*`multiline.flush_pattern`*:: Specifies a regular expression, in which the current multiline will be flushed from memory, ending the multiline-message. Work only with `pattern` type. *`multiline.max_lines`*:: The maximum number of lines that can be combined into one event. If the multiline message contains more than `max_lines`, any additional diff --git a/libbeat/reader/multiline/multiline.go b/libbeat/reader/multiline/multiline.go index 04f5941c11d3..689ea1536f07 100644 --- a/libbeat/reader/multiline/multiline.go +++ b/libbeat/reader/multiline/multiline.go @@ -31,10 +31,14 @@ func New( maxBytes int, config *Config, ) (reader.Reader, error) { - if config.Type == patternMode { + switch config.Type { + case patternMode: return newMultilinePatternReader(r, separator, maxBytes, config) - } else if config.Type == countMode { + case countMode: return newMultilineCountReader(r, separator, maxBytes, config) + case whilePatternMode: + return newMultilineWhilePatternReader(r, separator, maxBytes, config) + default: + return nil, fmt.Errorf("unknown multiline type %d", config.Type) } - return nil, fmt.Errorf("unknown multiline type %d", config.Type) } diff --git a/libbeat/reader/multiline/multiline_config.go b/libbeat/reader/multiline/multiline_config.go index 586816c55e32..b2f54eb92c7b 100644 --- a/libbeat/reader/multiline/multiline_config.go +++ b/libbeat/reader/multiline/multiline_config.go @@ -29,15 +29,18 @@ type multilineType uint8 const ( patternMode multilineType = iota countMode + whilePatternMode - patternStr = "pattern" - countStr = "count" + patternStr = "pattern" + countStr = "count" + whilePatternStr = "while_pattern" ) var ( multilineTypes = map[string]multilineType{ - patternStr: patternMode, - countStr: countMode, + patternStr: patternMode, + countStr: countMode, + whilePatternStr: whilePatternMode, } ) @@ -69,6 +72,10 @@ func (c *Config) Validate() error { if c.LinesCount == 0 { return fmt.Errorf("multiline.count_lines cannot be zero when count based is selected") } + } else if c.Type == whilePatternMode { + if c.Pattern == nil { + return fmt.Errorf("multiline.pattern cannot be empty when pattern based matching is selected") + } } return nil } diff --git a/libbeat/reader/multiline/multiline_test.go b/libbeat/reader/multiline/multiline_test.go index 2297fbc98b53..2924177a63bb 100644 --- a/libbeat/reader/multiline/multiline_test.go +++ b/libbeat/reader/multiline/multiline_test.go @@ -241,6 +241,48 @@ func TestMultilineCount(t *testing.T) { ) } +func TestMultilineWhilePattern(t *testing.T) { + pattern := match.MustCompile(`^{`) + testMultilineOK(t, + Config{ + Type: whilePatternMode, + Pattern: &pattern, + Negate: false, + }, + 3, + "{line1\n{line1.1\n", + "not matched line\n", + "{line2\n{line2.1\n", + ) + // use negated + testMultilineOK(t, + Config{ + Type: whilePatternMode, + Pattern: &pattern, + Negate: true, + }, + 3, + "{line1\n", + "panic:\n~stacktrace~\n", + "{line2\n", + ) + // truncated + maxLines := 2 + testMultilineTruncated(t, + Config{ + Type: whilePatternMode, + Pattern: &pattern, + MaxLines: &maxLines, + }, + 1, + true, + []string{ + "{line1\n{line1.1\n{line1.2\n"}, + []string{ + "{line1\n{line1.1\n"}, + ) +} + func testMultilineOK(t *testing.T, cfg Config, events int, expected ...string) { _, buf := createLineBuffer(expected...) r := createMultilineTestReader(t, buf, cfg) diff --git a/libbeat/reader/multiline/while.go b/libbeat/reader/multiline/while.go new file mode 100644 index 000000000000..4a9681276519 --- /dev/null +++ b/libbeat/reader/multiline/while.go @@ -0,0 +1,225 @@ +// Licensed to Elasticsearch B.V. under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Elasticsearch B.V. licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package multiline + +import ( + "github.com/elastic/beats/v7/libbeat/common/match" + "github.com/elastic/beats/v7/libbeat/logp" + "github.com/elastic/beats/v7/libbeat/reader" + "github.com/elastic/beats/v7/libbeat/reader/readfile" +) + +// MultiLine reader combining multiple line events into one multi-line event. +// +// Consecutive lines that satisfy the regular expression will be combined. +// +// The maximum number of bytes and lines to be returned is fully configurable. +// Even if limits are reached subsequent lines are matched, until event is +// fully finished. +// +// Errors will force the multiline reader to return the currently active +// multiline event first and finally return the actual error on next call to Next. +type whilePatternReader struct { + reader reader.Reader + matcher lineMatcherFunc + logger *logp.Logger + msgBuffer *messageBuffer + state func(*whilePatternReader) (reader.Message, error) +} + +func newMultilineWhilePatternReader( + r reader.Reader, + separator string, + maxBytes int, + config *Config, +) (reader.Reader, error) { + maxLines := defaultMaxLines + if config.MaxLines != nil { + maxLines = *config.MaxLines + } + + tout := defaultMultilineTimeout + if config.Timeout != nil { + tout = *config.Timeout + } + + if tout > 0 { + r = readfile.NewTimeoutReader(r, sigMultilineTimeout, tout) + } + + matcherFunc := lineMatcher(*config.Pattern) + if config.Negate { + matcherFunc = negatedLineMatcher(matcherFunc) + } + + pr := &whilePatternReader{ + reader: r, + matcher: matcherFunc, + msgBuffer: newMessageBuffer(maxBytes, maxLines, []byte(separator), config.SkipNewLine), + logger: logp.NewLogger("reader_multiline"), + state: (*whilePatternReader).readFirst, + } + return pr, nil +} + +// Next returns next multi-line event. +func (pr *whilePatternReader) Next() (reader.Message, error) { + return pr.state(pr) +} + +func (pr *whilePatternReader) readFirst() (reader.Message, error) { + for { + message, err := pr.reader.Next() + if err != nil { + // no lines buffered -> ignore timeout + if err == sigMultilineTimeout { + continue + } + + pr.logger.Debug("Multiline event flushed because timeout reached.") + + // pass error to caller (next layer) for handling + return message, err + } + + if message.Bytes == 0 { + continue + } + + // no match, return message + if !pr.matcher(message.Content) { + return message, nil + } + + // Start new multiline event + pr.msgBuffer.startNewMessage(message) + pr.setState((*whilePatternReader).readNext) + return pr.readNext() + } +} + +func (pr *whilePatternReader) readNext() (reader.Message, error) { + for { + message, err := pr.reader.Next() + if err != nil { + // handle multiline timeout signal + if err == sigMultilineTimeout { + // no lines buffered -> ignore timeout + if pr.msgBuffer.isEmpty() { + continue + } + + pr.logger.Debug("Multiline event flushed because timeout reached.") + + // return collected multiline event and + // empty buffer for new multiline event + msg := pr.msgBuffer.finalize() + pr.resetState() + return msg, nil + } + + // handle error without any bytes returned from reader + if message.Bytes == 0 { + // no lines buffered -> return error + if pr.msgBuffer.isEmpty() { + return reader.Message{}, err + } + + // lines buffered, return multiline and error on next read + return pr.collectMessageAfterError(err) + } + + // handle error with some content being returned by reader and + // line matching multiline criteria or no multiline started yet + if pr.msgBuffer.isEmptyMessage() || pr.matcher(message.Content) { + pr.msgBuffer.addLine(message) + + // return multiline and error on next read + return pr.collectMessageAfterError(err) + } + + // no match, return current multiline and return current line on next + // call to readNext + msg := pr.msgBuffer.finalize() + pr.msgBuffer.load(message) + pr.setState((*whilePatternReader).notMatchedMessageLoad) + return msg, nil + } + + // no match, return message if buffer is empty, otherwise return current + // multiline and save message to buffer + if !pr.matcher(message.Content) { + if pr.msgBuffer.isEmptyMessage() { + return message, nil + } + msg := pr.msgBuffer.finalize() + pr.msgBuffer.load(message) + pr.setState((*whilePatternReader).notMatchedMessageLoad) + return msg, nil + } + + // add line to current multiline event + pr.msgBuffer.addLine(message) + } +} + +func (pr *whilePatternReader) collectMessageAfterError(err error) (reader.Message, error) { + msg := pr.msgBuffer.finalize() + pr.msgBuffer.setErr(err) + pr.setState((*whilePatternReader).readFailed) + return msg, nil +} + +// readFailed returns empty message and error and resets line reader +func (pr *whilePatternReader) readFailed() (reader.Message, error) { + err := pr.msgBuffer.err + pr.msgBuffer.setErr(nil) + pr.resetState() + return reader.Message{}, err +} + +// notMatchedMessageLoad returns not matched message from buffer +func (pr *whilePatternReader) notMatchedMessageLoad() (reader.Message, error) { + msg := pr.msgBuffer.finalize() + pr.resetState() + return msg, nil +} + +// resetState sets state of the reader to readFirst +func (pr *whilePatternReader) resetState() { + pr.setState((*whilePatternReader).readFirst) +} + +// setState sets state to the given function +func (pr *whilePatternReader) setState(next func(pr *whilePatternReader) (reader.Message, error)) { + pr.state = next +} + +type lineMatcherFunc func(content []byte) bool + +func lineMatcher(pat match.Matcher) lineMatcherFunc { + return func(content []byte) bool { + return pat.Match(content) + } +} + +func negatedLineMatcher(m lineMatcherFunc) lineMatcherFunc { + return func(content []byte) bool { + return !m(content) + } +}