diff --git a/CHANGELOG.asciidoc b/CHANGELOG.asciidoc index f56449eae86a..8442eb84a0cd 100644 --- a/CHANGELOG.asciidoc +++ b/CHANGELOG.asciidoc @@ -141,6 +141,7 @@ https://github.com/elastic/beats/compare/v6.2.3...master[Check the HEAD diff] - Add default_fields to Elasticsearch template when connecting to Elasticsearch >= 7.0. {pull}7015[7015] - Add support for loading a template.json file directly instead of using fields.yml. {pull}7039[7039] - Add support for keyword multifields in field.yml. {pull}7131[7131] +- Add dissect processor. {pull}6925[6925] *Auditbeat* diff --git a/auditbeat/auditbeat.reference.yml b/auditbeat/auditbeat.reference.yml index 8383283461b5..1d2daefc2863 100644 --- a/auditbeat/auditbeat.reference.yml +++ b/auditbeat/auditbeat.reference.yml @@ -250,6 +250,14 @@ auditbeat.modules: # - from: "a" # to: "b" # +# The following example tokenizes the string into fields: +# +#processors: +#- dissect: +# tokenizer: "%{key1} - %{key2}" +# field: "message" +# target_prefix: "dissect" +# # The following example enriches each event with metadata from the cloud # provider about the host machine. It works on EC2, GCE, DigitalOcean, # Tencent Cloud, and Alibaba Cloud. diff --git a/filebeat/filebeat.reference.yml b/filebeat/filebeat.reference.yml index 99c8e9e9da4f..6ef0073f868e 100644 --- a/filebeat/filebeat.reference.yml +++ b/filebeat/filebeat.reference.yml @@ -886,6 +886,14 @@ filebeat.inputs: # - from: "a" # to: "b" # +# The following example tokenizes the string into fields: +# +#processors: +#- dissect: +# tokenizer: "%{key1} - %{key2}" +# field: "message" +# target_prefix: "dissect" +# # The following example enriches each event with metadata from the cloud # provider about the host machine. It works on EC2, GCE, DigitalOcean, # Tencent Cloud, and Alibaba Cloud. 
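The commented-out dissect example above is easier to follow with a concrete input. Below is a minimal, hypothetical sketch that exercises the same `%{key1} - %{key2}` tokenizer through the `New`/`Dissect` API added by this patch (import path as registered later in this diff); with `target_prefix: "dissect"` the processor would publish the same values as `dissect.key1` and `dissect.key2`:

package main

import (
	"fmt"

	"github.com/elastic/beats/libbeat/processors/dissect"
)

func main() {
	// Build a dissector from the tokenizer used in the reference configs.
	d, err := dissect.New("%{key1} - %{key2}")
	if err != nil {
		panic(err)
	}

	// Dissect returns a Map (map[string]string) of the extracted keys.
	m, err := d.Dissect("left - right")
	if err != nil {
		panic(err)
	}

	fmt.Println(m) // map[key1:left key2:right]
}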
diff --git a/filebeat/tests/system/test_processors.py b/filebeat/tests/system/test_processors.py index e5c3a82a031d..b92b1132c460 100644 --- a/filebeat/tests/system/test_processors.py +++ b/filebeat/tests/system/test_processors.py @@ -115,3 +115,80 @@ def test_condition(self): assert "beat.name" in output assert "message" in output assert "test" in output["message"] + + def test_dissect_good_tokenizer(self): + """ + Check dissect with a good tokenizer + """ + self.render_config_template( + path=os.path.abspath(self.working_dir) + "/test.log", + processors=[{ + "dissect": { + "tokenizer": "\"%{key} world\"", + "field": "message", + "target_prefix": "extracted" + }, + }] + ) + with open(self.working_dir + "/test.log", "w") as f: + f.write("Hello world\n") + + filebeat = self.start_beat() + self.wait_until(lambda: self.output_has(lines=1)) + filebeat.check_kill_and_wait() + + output = self.read_output( + required_fields=["@timestamp"], + )[0] + assert output["extracted.key"] == "Hello" + + def test_dissect_defaults(self): + """ + Check dissect defaults + """ + self.render_config_template( + path=os.path.abspath(self.working_dir) + "/test.log", + processors=[{ + "dissect": { + "tokenizer": "\"%{key} world\"", + }, + }] + ) + with open(self.working_dir + "/test.log", "w") as f: + f.write("Hello world\n") + + filebeat = self.start_beat() + self.wait_until(lambda: self.output_has(lines=1)) + filebeat.check_kill_and_wait() + + output = self.read_output( + required_fields=["@timestamp"], + )[0] + assert output["dissect.key"] == "Hello" + + def test_dissect_bad_tokenizer(self): + """ + Check dissect with a bad tokenizer + """ + self.render_config_template( + path=os.path.abspath(self.working_dir) + "/test.log", + processors=[{ + "dissect": { + "tokenizer": "\"not %{key} world\"", + "field": "message", + "target_prefix": "extracted" + }, + }] + ) + with open(self.working_dir + "/test.log", "w") as f: + f.write("Hello world\n") + + filebeat = self.start_beat() + self.wait_until(lambda: self.output_has(lines=1)) + filebeat.check_kill_and_wait() + + output = self.read_output( + required_fields=["@timestamp"], + )[0] + assert "extracted.key" not in output + assert output["message"] == "Hello world" diff --git a/heartbeat/heartbeat.reference.yml b/heartbeat/heartbeat.reference.yml index 4ff38198ffed..c0aa5b4b30ee 100644 --- a/heartbeat/heartbeat.reference.yml +++ b/heartbeat/heartbeat.reference.yml @@ -359,6 +359,14 @@ heartbeat.scheduler: # - from: "a" # to: "b" # +# The following example tokenizes the string into fields: +# +#processors: +#- dissect: +# tokenizer: "%{key1} - %{key2}" +# field: "message" +# target_prefix: "dissect" +# # The following example enriches each event with metadata from the cloud # provider about the host machine. It works on EC2, GCE, DigitalOcean, # Tencent Cloud, and Alibaba Cloud. diff --git a/libbeat/_meta/config.reference.yml b/libbeat/_meta/config.reference.yml index 86015e64d7fc..be93f76b56d0 100644 --- a/libbeat/_meta/config.reference.yml +++ b/libbeat/_meta/config.reference.yml @@ -145,6 +145,14 @@ # - from: "a" # to: "b" # +# The following example tokenizes the string into fields: +# +#processors: +#- dissect: +# tokenizer: "%{key1} - %{key2}" +# field: "message" +# target_prefix: "dissect" +# # The following example enriches each event with metadata from the cloud # provider about the host machine. It works on EC2, GCE, DigitalOcean, # Tencent Cloud, and Alibaba Cloud. 
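The "bad tokenizer" system test above relies on the failure semantics of the dissector: when a literal delimiter is missing from the input, dissection fails and the event is left untouched. A small sketch of that behavior against the library itself (assuming the same import path as this patch):

package main

import (
	"fmt"

	"github.com/elastic/beats/libbeat/processors/dissect"
)

func main() {
	// Mirrors test_dissect_bad_tokenizer: the literal prefix "not " does
	// not appear in the input, so extraction must fail.
	d, err := dissect.New("not %{key} world")
	if err != nil {
		panic(err)
	}

	if _, err := d.Dissect("Hello world"); err != nil {
		// The processor logs this error and returns the original event
		// unchanged, which is why the test asserts `message` is intact.
		fmt.Println("dissect failed:", err)
	}
}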
diff --git a/libbeat/cmd/instance/beat.go b/libbeat/cmd/instance/beat.go index 1f5b4deb9d0c..568bf8205766 100644 --- a/libbeat/cmd/instance/beat.go +++ b/libbeat/cmd/instance/beat.go @@ -54,6 +54,7 @@ import ( _ "github.com/elastic/beats/libbeat/processors/add_host_metadata" _ "github.com/elastic/beats/libbeat/processors/add_kubernetes_metadata" _ "github.com/elastic/beats/libbeat/processors/add_locale" + _ "github.com/elastic/beats/libbeat/processors/dissect" // Register autodiscover providers _ "github.com/elastic/beats/libbeat/autodiscover/providers/docker" diff --git a/libbeat/docs/processors-using.asciidoc b/libbeat/docs/processors-using.asciidoc index 71a0874709a4..5e4c4333fef4 100644 --- a/libbeat/docs/processors-using.asciidoc +++ b/libbeat/docs/processors-using.asciidoc @@ -47,6 +47,7 @@ The supported processors are: * <> * <> * <> + * <<dissect,Dissect strings>> [[conditions]] ==== Conditions @@ -761,3 +762,34 @@ The fields added to the event are looking as following: ------------------------------------------------------------------------------- NOTE: The host information is refreshed every 5 minutes. + +[[dissect]] +=== Dissect strings + +The dissect processor tokenizes incoming strings using defined patterns. + +[source,yaml] +------- +processors: +- dissect: + tokenizer: "%{key1} %{key2}" + field: "message" + target_prefix: "dissect" +------- + +The `dissect` processor has the following configuration settings: + +`field`:: (Optional) The event field to tokenize. Default is `message`. + +`target_prefix`:: (Optional) The name of the field where the extracted values will be stored. When an empty +string is defined, the processor will create the keys at the root of the event. Default is +`dissect`. When the target key already exists in the event, the processor won't replace it and will log +an error; you need to either drop or rename the key before using dissect. + +For tokenization to be successful, all keys must be found and extracted; if one of them cannot be +found, an error is logged and the original event is not modified. + +NOTE: A key can contain any characters except the reserved suffix or prefix modifiers: `/`, `&`, `+` +and `?`. + +See <<conditions>> for a list of supported conditions. diff --git a/libbeat/processors/dissect/.gitignore b/libbeat/processors/dissect/.gitignore new file mode 100644 index 000000000000..f216c4184967 --- /dev/null +++ b/libbeat/processors/dissect/.gitignore @@ -0,0 +1 @@ +dissect_tests.json diff --git a/libbeat/processors/dissect/config.go b/libbeat/processors/dissect/config.go new file mode 100644 index 000000000000..1a61fbc4e2e7 --- /dev/null +++ b/libbeat/processors/dissect/config.go @@ -0,0 +1,25 @@ +package dissect + +type config struct { + Tokenizer *tokenizer `config:"tokenizer"` + Field string `config:"field"` + TargetPrefix string `config:"target_prefix"` +} + +var defaultConfig = config{ + Field: "message", + TargetPrefix: "dissect", +} + +// tokenizer adds validation at the unpack level for this specific field. +type tokenizer = Dissector + +// Unpack a tokenizer into a dissector; this triggers the normal validation of the dissector.
+func (t *tokenizer) Unpack(v string) error { + d, err := New(v) + if err != nil { + return err + } + *t = *d + return nil +} diff --git a/libbeat/processors/dissect/config_test.go b/libbeat/processors/dissect/config_test.go new file mode 100644 index 000000000000..96d048c7f28d --- /dev/null +++ b/libbeat/processors/dissect/config_test.go @@ -0,0 +1,43 @@ +package dissect + +import ( + "testing" + + "github.com/stretchr/testify/assert" + + "github.com/elastic/beats/libbeat/common" +) + +func TestTokenizerType(t *testing.T) { + t.Run("valid", func(t *testing.T) { + c, err := common.NewConfigFrom(map[string]interface{}{ + "tokenizer": "%{value1}", + "field": "message", + }) + if !assert.NoError(t, err) { + return + } + + cfg := config{} + err = c.Unpack(&cfg) + if !assert.NoError(t, err) { + return + } + }) + + t.Run("invalid", func(t *testing.T) { + c, err := common.NewConfigFrom(map[string]interface{}{ + "tokenizer": "%value1}", + "field": "message", + }) + if !assert.NoError(t, err) { + return + } + + cfg := config{} + err = c.Unpack(&cfg) + if !assert.Error(t, err) { + return + } + }) +} diff --git a/libbeat/processors/dissect/const.go b/libbeat/processors/dissect/const.go new file mode 100644 index 000000000000..079aa16560ea --- /dev/null +++ b/libbeat/processors/dissect/const.go @@ -0,0 +1,32 @@ +package dissect + +import ( + "errors" + "regexp" +) + +var ( + // delimiterRE tokenizes the following string into a walkable list of extracted delimiter + key pairs. + // string: + // ` %{key}, %{key/2}` + // into: + // [["", "key" ], [", ", "key/2"]] + delimiterRE = regexp.MustCompile("(?s)(.*?)%\\{([^}]*?)}") + suffixRE = regexp.MustCompile("(.+?)(/(\\d{1,2}))?(->)?$") + + skipFieldPrefix = "?" + appendFieldPrefix = "+" + indirectFieldPrefix = "&" + appendIndirectPrefix = "+&" + indirectAppendPrefix = "&+" + greedySuffix = "->" + + defaultJoinString = " " + + errParsingFailure = errors.New("parsing failure") + errInvalidTokenizer = errors.New("invalid dissect tokenizer") + errEmpty = errors.New("empty string provided") + errMixedPrefixIndirectAppend = errors.New("mixed prefix `+&`") + errMixedPrefixAppendIndirect = errors.New("mixed prefix `&+`") + errEmptyKey = errors.New("empty key") +) diff --git a/libbeat/processors/dissect/delimiter.go b/libbeat/processors/dissect/delimiter.go new file mode 100644 index 000000000000..b60978fb9717 --- /dev/null +++ b/libbeat/processors/dissect/delimiter.go @@ -0,0 +1,125 @@ +package dissect + +import ( + "fmt" + "strings" +) + +// delimiter represents a text section before or after a key; it keeps track of the needle and allows +// retrieving the position where it starts in a haystack. +type delimiter interface { + // IndexOf receives the haystack and an offset position and returns the absolute position where + // the needle is found. + IndexOf(haystack string, offset int) int + + // Len returns the length of the needle used to calculate boundaries. + Len() int + + // String displays debugging information. + String() string + + // Delimiter returns the actual delimiter string. + Delimiter() string + + // IsGreedy returns true if the next key should be greedy (end of string) or when explicitly + // configured. + IsGreedy() bool + + // MarkGreedy marks this delimiter as greedy. + MarkGreedy() + + // Next returns the next delimiter in the chain. + Next() delimiter + + // SetNext sets the next delimiter, or nil if the current delimiter is the last. + SetNext(d delimiter) +} + +// zeroByte represents a zero-length delimiter; it usually marks the start of the line.
+type zeroByte struct { + needle string + greedy bool + next delimiter +} + +func (z *zeroByte) IndexOf(haystack string, offset int) int { + return offset +} + +func (z *zeroByte) Len() int { + return 0 +} + +func (z *zeroByte) String() string { + return "delimiter: zerobyte" +} + +func (z *zeroByte) Delimiter() string { + return z.needle +} + +func (z *zeroByte) IsGreedy() bool { + return z.greedy +} + +func (z *zeroByte) MarkGreedy() { + z.greedy = true +} + +func (z *zeroByte) Next() delimiter { + return z.next +} + +func (z *zeroByte) SetNext(d delimiter) { + z.next = d +} + +// multiByte represents a delimiter with at least one byte. +type multiByte struct { + needle string + greedy bool + next delimiter +} + +func (m *multiByte) IndexOf(haystack string, offset int) int { + i := strings.Index(haystack[offset:], m.needle) + if i != -1 { + return i + offset + } + return -1 +} + +func (m *multiByte) Len() int { + return len(m.needle) +} + +func (m *multiByte) IsGreedy() bool { + return m.greedy +} + +func (m *multiByte) MarkGreedy() { + m.greedy = true +} + +func (m *multiByte) String() string { + return fmt.Sprintf("delimiter: multibyte (match: '%s', len: %d)", m.needle, m.Len()) +} + +func (m *multiByte) Delimiter() string { + return m.needle +} + +func (m *multiByte) Next() delimiter { + return m.next +} + +func (m *multiByte) SetNext(d delimiter) { + m.next = d +} + +func newDelimiter(needle string) delimiter { + if len(needle) == 0 { + return &zeroByte{} + } + return &multiByte{needle: needle} +} diff --git a/libbeat/processors/dissect/delimiter_test.go b/libbeat/processors/dissect/delimiter_test.go new file mode 100644 index 000000000000..ded44e236c10 --- /dev/null +++ b/libbeat/processors/dissect/delimiter_test.go @@ -0,0 +1,17 @@ +package dissect + +import ( + "testing" + + "github.com/stretchr/testify/assert" +) + +func TestMultiByte(t *testing.T) { + m := newDelimiter("needle") + assert.Equal(t, 3, m.IndexOf("   needle", 1)) +} + +func TestZeroByte(t *testing.T) { + m := newDelimiter("") + assert.Equal(t, 5, m.IndexOf("   needle", 5)) +} diff --git a/libbeat/processors/dissect/dissect.go b/libbeat/processors/dissect/dissect.go new file mode 100644 index 000000000000..74e096bb4b89 --- /dev/null +++ b/libbeat/processors/dissect/dissect.go @@ -0,0 +1,131 @@ +package dissect + +import "fmt" + +// Map represents the keys and their values extracted with the defined tokenizer. +type Map = map[string]string + +// positions represents the start and end position of the keys found in the string. +type positions []position + +type position struct { + start int + end int +} + +// Dissector is a tokenizer based on the Dissect syntax as defined at: +// https://www.elastic.co/guide/en/logstash/current/plugins-filters-dissect.html +type Dissector struct { + raw string + parser *parser +} + +// Dissect takes the raw string and uses the defined tokenizer to return a map with the +// extracted keys and their values. +// +// Dissect uses a 3-step process: +// - Find the key positions +// - Extract and resolve the keys (append / indirect) +// - Ignore namedSkipField +func (d *Dissector) Dissect(s string) (Map, error) { + if len(s) == 0 { + return nil, errEmpty + } + + positions, err := d.extract(s) + if err != nil { + return nil, err + } + + if len(positions) == 0 { + return nil, errParsingFailure + } + + return d.resolve(s, positions), nil +} + +// Raw returns the raw tokenizer used to generate the actual parser.
+func (d *Dissector) Raw() string { + return d.raw +} + +// extract navigates through the delimiters and saves the starting and ending positions +// of the keys. Afterwards, the positions are resolved against the defined fields and reordered. +func (d *Dissector) extract(s string) (positions, error) { + positions := make([]position, len(d.parser.fields)) + var i, start, lookahead, end int + + // Position on the first delimiter; we assume a hard match on the first delimiter. + // Previous versions of dissect did a lookahead in the string until the delimiter was found; + // LS and Beats now have the same behavior, which is consistent with the principle of least + // surprise. + dl := d.parser.delimiters[0] + offset := dl.IndexOf(s, 0) + if offset != 0 { + return nil, fmt.Errorf( + "could not find beginning delimiter: `%s` in remaining: `%s`, (offset: %d)", + dl.Delimiter(), s, 0, + ) + } + offset += dl.Len() + + // Move through all the other delimiters until we have consumed all of them. + for dl.Next() != nil { + start = offset + end = dl.Next().IndexOf(s, offset) + if end == -1 { + return nil, fmt.Errorf( + "could not find delimiter: `%s` in remaining: `%s`, (offset: %d)", + dl.Delimiter(), s[offset:], offset, + ) + } + + offset = end + + // Greedy consumes keys defined with padding. + // Keys are defined with the `->` suffix. + if dl.IsGreedy() { + for { + lookahead = dl.Next().IndexOf(s, offset+1) + if lookahead != offset+1 { + break + } else { + offset = lookahead + } + } + } + + positions[i] = position{start: start, end: end} + offset += dl.Next().Len() + i++ + dl = dl.Next() + } + + if offset < len(s) { + positions[i] = position{start: offset, end: len(s)} + } + return positions, nil +} + +// resolve takes the raw string and the extracted positions and applies the field syntax. +func (d *Dissector) resolve(s string, p positions) Map { + m := make(Map, len(p)) + for _, f := range d.parser.fields { + pos := p[f.ID()] + f.Apply(s[pos.start:pos.end], m) + } + + for _, f := range d.parser.skipFields { + delete(m, f.Key()) + } + return m +} + +// New creates a new Dissector from a tokenizer string. +func New(tokenizer string) (*Dissector, error) { + p, err := newParser(tokenizer) + if err != nil { + return nil, err + } + return &Dissector{parser: p, raw: tokenizer}, nil +} diff --git a/libbeat/processors/dissect/dissect_test.go b/libbeat/processors/dissect/dissect_test.go new file mode 100644 index 000000000000..6cd427a0533e --- /dev/null +++ b/libbeat/processors/dissect/dissect_test.go @@ -0,0 +1,336 @@ +package dissect + +import ( + "encoding/json" + "flag" + "io/ioutil" + "regexp" + "testing" + + "github.com/stretchr/testify/assert" +) + +var export = flag.Bool("test.export-dissect", false, "export dissect tests to JSON.") + +func TestNoToken(t *testing.T) { + _, err := New("hello") + assert.Equal(t, errInvalidTokenizer, err) +} + +func TestEmptyString(t *testing.T) { + d, _ := New("%{hello}") + _, err := d.Dissect("") + assert.Equal(t, errEmpty, err) +} + +// JSON tags are used to create a common test file for the `logstash-filter-dissect` and the +// Beats implementation.
+type dissectTest struct { + Name string `json:"name"` + Tok string `json:"tok"` + Msg string `json:"msg"` + Expected Map `json:"expected"` + Skip bool `json:"skip"` + Fail bool `json:"fail"` +} + +var tests = []dissectTest{ + { + Name: "Complex stack trace", + Tok: "%{day}-%{month}-%{year} %{hour} %{severity} [%{thread_id}] %{origin} %{message}", + Msg: `18-Apr-2018 06:53:20.411 INFO [http-nio-8080-exec-1] org.apache.coyote.http11.Http11Processor.service Error parsing HTTP request header + Note: further occurrences of HTTP header parsing errors will be logged at DEBUG level. + java.lang.IllegalArgumentException: Invalid character found in method name. HTTP method names must be tokens + at org.apache.coyote.http11.Http11InputBuffer.parseRequestLine(Http11InputBuffer.java:426) + at org.apache.coyote.http11.Http11Processor.service(Http11Processor.java:687) + at org.apache.coyote.AbstractProcessorLight.process(AbstractProcessorLight.java:66) + at org.apache.coyote.AbstractProtocol$ConnectionHandler.process(AbstractProtocol.java:790) + at org.apache.tomcat.util.net.NioEndpoint$SocketProcessor.doRun(NioEndpoint.java:1459) + at org.apache.tomcat.util.net.SocketProcessorBase.run(SocketProcessorBase.java:49) + at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) + at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) + at org.apache.tomcat.util.threads.TaskThread$WrappingRunnable.run(TaskThread.java:61) + at java.lang.Thread.run(Thread.java:748)`, + Expected: Map{ + "day": "18", + "month": "Apr", + "year": "2018", + "hour": "06:53:20.411", + "severity": "INFO", + "thread_id": "http-nio-8080-exec-1", + "origin": "org.apache.coyote.http11.Http11Processor.service", + "message": `Error parsing HTTP request header + Note: further occurrences of HTTP header parsing errors will be logged at DEBUG level. + java.lang.IllegalArgumentException: Invalid character found in method name. 
HTTP method names must be tokens + at org.apache.coyote.http11.Http11InputBuffer.parseRequestLine(Http11InputBuffer.java:426) + at org.apache.coyote.http11.Http11Processor.service(Http11Processor.java:687) + at org.apache.coyote.AbstractProcessorLight.process(AbstractProcessorLight.java:66) + at org.apache.coyote.AbstractProtocol$ConnectionHandler.process(AbstractProtocol.java:790) + at org.apache.tomcat.util.net.NioEndpoint$SocketProcessor.doRun(NioEndpoint.java:1459) + at org.apache.tomcat.util.net.SocketProcessorBase.run(SocketProcessorBase.java:49) + at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) + at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) + at org.apache.tomcat.util.threads.TaskThread$WrappingRunnable.run(TaskThread.java:61) + at java.lang.Thread.run(Thread.java:748)`, + }, + }, + { + Name: "fails when delimiter is not found at the beginning of the string", + Tok: "/var/log/%{key}.log", + Msg: "foobar", + Fail: true, + }, + { + Name: "fails when delimiter is not found after the key", + Tok: "/var/log/%{key}.log", + Msg: "/var/log/foobar", + Fail: true, + }, + { + Name: "simple dissect", + Tok: "%{key}", + Msg: "foobar", + Expected: Map{"key": "foobar"}, + }, + { + Name: "dissect two replacements", + Tok: "%{key1} %{key2}", + Msg: "foo bar", + Expected: Map{"key1": "foo", "key2": "bar"}, + }, + { + Name: "one level dissect not end of string", + Tok: "/var/%{key}/log", + Msg: "/var/foobar/log", + Expected: Map{"key": "foobar"}, + }, + { + Name: "one level dissect", + Tok: "/var/%{key}", + Msg: "/var/foobar/log", + Expected: Map{"key": "foobar/log"}, + }, + { + Name: "multiple keys dissect end of string", + Tok: "/var/%{key}/log/%{key1}", + Msg: "/var/foobar/log/apache", + Expected: Map{"key": "foobar", "key1": "apache"}, + }, + { + Name: "multiple keys not end of string", + Tok: "/var/%{key}/log/%{key1}.log", + Msg: "/var/foobar/log/apache.log", + Expected: Map{"key": "foobar", "key1": "apache"}, + }, + { + Name: "simple ordered", + Tok: "%{+key/3} %{+key/1} %{+key/2}", + Msg: "1 2 3", + Expected: Map{"key": "2 3 1"}, + }, + { + Name: "simple append", + Tok: "%{key}-%{+key}-%{+key}", + Msg: "1-2-3", + Expected: Map{"key": "1-2-3"}, + }, + { + Name: "indirect field", + Tok: "%{key} %{&key}", + Msg: "hello world", + Expected: Map{"key": "hello", "hello": "world"}, + }, + { + Name: "skip field", + Tok: "%{} %{key}", + Msg: "hello world", + Expected: Map{"key": "world"}, + }, + { + Name: "named skipped field with indirect", + Tok: "%{?key} %{&key}", + Msg: "hello world", + Expected: Map{"hello": "world"}, + }, + { + Name: "missing fields", + Tok: "%{name},%{addr1},%{addr2},%{addr3},%{city},%{zip}", + Msg: "Jane Doe,4321 Fifth Avenue,,,New York,87432", + Expected: Map{ + "name": "Jane Doe", + "addr1": "4321 Fifth Avenue", + "addr2": "", + "addr3": "", + "city": "New York", + "zip": "87432", + }, + }, + { + Name: "ignore right padding", + Tok: "%{id} %{function->} %{server}", + Msg: "00000043 ViewReceive machine-321", + Expected: Map{ + "id": "00000043", + "function": "ViewReceive", + "server": "machine-321", + }, + }, + { + Name: "padding on the last key needs a delimiter", + Tok: "%{id} %{function} %{server->} ", + Msg: "00000043 ViewReceive machine-321 ", + Expected: Map{ + "id": "00000043", + "function": "ViewReceive", + "server": "machine-321", + }, + }, + { + Name: "ignore left padding", + Tok: "%{id->} %{function} %{server}", + Msg: "00000043 ViewReceive machine-321", + Expected: Map{ + "id": "00000043", + "function": "ViewReceive", + "server": "machine-321", + }, + }, + { + Name: "when the delimiters contain `{` and `}`", + Tok: "{%{a}}{%{b}} %{rest}", + Msg: "{c}{d} anything", + Expected: Map{ + "a": "c", + "b": "d", + "rest": "anything", + }, + }, +} + +func TestDissect(t *testing.T) { + if export != nil && *export { + dumpJSON() + return + } + + for _, test := range tests { + if test.Skip { + continue + } + t.Run(test.Name, func(t *testing.T) { + d, err := New(test.Tok) + if !assert.NoError(t, err) { + return + } + + if test.Fail { + _, err := d.Dissect(test.Msg) + assert.Error(t, err) + return + } + + r, err := d.Dissect(test.Msg) + if !assert.NoError(t, err) { + return + } + + assert.Equal(t, test.Expected, r) + }) + } +} + +var results Map +var o [][]string + +func BenchmarkDissect(b *testing.B) { + for _, test := range tests { + if test.Skip { + continue + } + b.Run(test.Name, func(b *testing.B) { + tok := test.Tok + msg := test.Msg + d, err := New(tok) + if !assert.NoError(b, err) { + return + } + b.ReportAllocs() + for n := 0; n < b.N; n++ { + r, err := d.Dissect(msg) + if test.Fail { + assert.Error(b, err) + return + } + assert.NoError(b, err) + results = r + } + }) + } + + // Add a few regular expression matches against the same strings as the test suite; + // this gives us a baseline to compare to. Note that we only test a raw match against the string. + b.Run("Regular expression", func(b *testing.B) { + re := regexp.MustCompile("/var/log/([a-z]+)/log/([a-z]+)/apache/([a-b]+)") + by := "/var/log/docker/more/apache/super" + b.ReportAllocs() + for n := 0; n < b.N; n++ { + o = re.FindAllStringSubmatch(by, -1) + } + }) + + b.Run("Larger regular expression", func(b *testing.B) { + re := regexp.MustCompile("^(\\d{2})-(\\w{3})-(\\d{4})\\s([0-9:.]+)\\s(\\w+)\\s\\[([a-zA-Z0-9-]+)\\]\\s([a-zA-Z0-9.]+)\\s(.+)") + + by := `18-Apr-2018 06:53:20.411 INFO [http-nio-8080-exec-1] org.apache.coyote.http11.Http11Processor.service Error parsing HTTP request header + Note: further occurrences of HTTP header parsing errors will be logged at DEBUG level. + java.lang.IllegalArgumentException: Invalid character found in method name. HTTP method names must be tokens + at org.apache.coyote.http11.Http11InputBuffer.parseRequestLine(Http11InputBuffer.java:426) + at org.apache.coyote.http11.Http11Processor.service(Http11Processor.java:687) + at org.apache.coyote.AbstractProcessorLight.process(AbstractProcessorLight.java:66) + at org.apache.coyote.AbstractProtocol$ConnectionHandler.process(AbstractProtocol.java:790) + at org.apache.tomcat.util.net.NioEndpoint$SocketProcessor.doRun(NioEndpoint.java:1459) + at org.apache.tomcat.util.net.SocketProcessorBase.run(SocketProcessorBase.java:49) + at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) + at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) + at org.apache.tomcat.util.threads.TaskThread$WrappingRunnable.run(TaskThread.java:61) + at java.lang.Thread.run(Thread.java:748)` + b.ReportAllocs() + for n := 0; n < b.N; n++ { + o = re.FindAllStringSubmatch(by, -1) + } + }) + + b.Run("regular expression to match end of line", func(b *testing.B) { + re := regexp.MustCompile("MACHINE\\[(\\w+)\\]$") + + by := `18-Apr-2018 06:53:20.411 INFO [http-nio-8080-exec-1] org.apache.coyote.http11.Http11Processor.service Error parsing HTTP request header + Note: further occurrences of HTTP header parsing errors will be logged at DEBUG level. + java.lang.IllegalArgumentException: Invalid character found in method name.
HTTP method names must be tokens + at org.apache.coyote.http11.Http11InputBuffer.parseRequestLine(Http11InputBuffer.java:426) + at org.apache.coyote.http11.Http11Processor.service(Http11Processor.java:687) + at org.apache.coyote.AbstractProcessorLight.process(AbstractProcessorLight.java:66) + at org.apache.coyote.AbstractProtocol$ConnectionHandler.process(AbstractProtocol.java:790) + at org.apache.tomcat.util.net.NioEndpoint$SocketProcessor.doRun(NioEndpoint.java:1459) + at org.apache.tomcat.util.net.SocketProcessorBase.run(SocketProcessorBase.java:49) + at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) + at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) + at org.apache.tomcat.util.threads.TaskThread$WrappingRunnable.run(TaskThread.java:61) + at java.lang.Thread.run(Thread.java:748) MACHINE[hello]` + b.ReportAllocs() + for n := 0; n < b.N; n++ { + o = re.FindAllStringSubmatch(by, -1) + } + }) +} + +func dumpJSON() { + b, err := json.MarshalIndent(&tests, "", "\t") + if err != nil { + panic("could not marshal json") + } + + err = ioutil.WriteFile("dissect_tests.json", b, 0666) + if err != nil { + panic("could not write to file") + } +} diff --git a/libbeat/processors/dissect/field.go b/libbeat/processors/dissect/field.go new file mode 100644 index 000000000000..06b428688a75 --- /dev/null +++ b/libbeat/processors/dissect/field.go @@ -0,0 +1,242 @@ +package dissect + +import ( + "fmt" + "strconv" + "strings" +) + +type field interface { + MarkGreedy() + IsGreedy() bool + Ordinal() int + Key() string + ID() int + Apply(b string, m Map) + String() string + IsSaveable() bool +} + +type baseField struct { + id int + key string + ordinal int + greedy bool +} + +func (f baseField) IsGreedy() bool { + return f.greedy +} + +func (f baseField) MarkGreedy() { + f.greedy = true +} + +func (f baseField) Ordinal() int { + return f.ordinal +} + +func (f baseField) Key() string { + return f.key +} + +func (f baseField) ID() int { + return f.id +} + +func (f baseField) IsSaveable() bool { + return true +} + +func (f baseField) String() string { + return fmt.Sprintf("field: %s, ordinal: %d, greedy: %v", f.key, f.ordinal, f.IsGreedy()) +} + +// normalField is a simple key reference like this: `%{key}` +// +// dissect: %{key} +// message: hello +// result: +// key: hello +type normalField struct { + baseField +} + +func (f normalField) Apply(b string, m Map) { + m[f.Key()] = b +} + +// skipField is a skip field without a name, like this: `%{}`; it is often used to +// skip uninteresting parts of a string. +// +// dissect: %{} %{key} +// message: hello world +// result: +// key: world +type skipField struct { + baseField +} + +func (f skipField) Apply(b string, m Map) { +} + +func (f skipField) IsSaveable() bool { + return false +} + +// namedSkipField is a named skip field with the following syntax: `%{?key}`; it is used +// in conjunction with the indirect field to create a custom `key => value` pair. +// +// dissect: %{?key} %{&key} +// message: hello world +// result: +// hello: world +type namedSkipField struct { + baseField +} + +func (f namedSkipField) Apply(b string, m Map) { + m[f.Key()] = b +} + +func (f namedSkipField) IsSaveable() bool { + return false +} + +// indirectField is a value that will be extracted and saved under a previously defined namedSkipField. +// The field is defined with the following syntax: `%{&key}`.
+// +// dissect: %{?key} %{&key} +// message: hello world +// result: +// hello: world +type indirectField struct { + baseField +} + +func (f indirectField) Apply(b string, m Map) { + v, ok := m[f.Key()] + if ok { + m[v] = b + return + } +} + +// appendField allows an extracted field to be appended to previously extracted values. +// The field is defined with the following syntax: `%{+key} %{+key}`. +// +// dissect: %{+key} %{+key} +// message: hello world +// result: +// key: hello world +// +// dissect: %{+key/2} %{+key/1} +// message: hello world +// result: +// key: world hello +type appendField struct { + baseField + previous delimiter +} + +func (f appendField) Apply(b string, m Map) { + v, ok := m[f.Key()] + if ok { + m[f.Key()] = v + f.JoinString() + b + return + } + m[f.Key()] = b +} + +func (f appendField) JoinString() string { + if f.previous == nil || f.previous.Len() == 0 { + return defaultJoinString + } + return f.previous.Delimiter() +} + +func newField(id int, rawKey string, previous delimiter) (field, error) { + if len(rawKey) == 0 { + return newSkipField(id), nil + } + + key, ordinal, greedy := extractKeyParts(rawKey) + + // Conflicting prefix used. + if strings.HasPrefix(key, appendIndirectPrefix) { + return nil, errMixedPrefixIndirectAppend + } + + if strings.HasPrefix(key, indirectAppendPrefix) { + return nil, errMixedPrefixAppendIndirect + } + + if strings.HasPrefix(key, skipFieldPrefix) { + return newNamedSkipField(id, key[1:]), nil + } + + if strings.HasPrefix(key, appendFieldPrefix) { + return newAppendField(id, key[1:], ordinal, greedy, previous), nil + } + + if strings.HasPrefix(key, indirectFieldPrefix) { + return newIndirectField(id, key[1:]), nil + } + + return newNormalField(id, key, ordinal, greedy), nil +} + +func newSkipField(id int) skipField { + return skipField{baseField{id: id}} +} + +func newNamedSkipField(id int, key string) namedSkipField { + return namedSkipField{ + baseField{id: id, key: key}, + } +} + +func newAppendField(id int, key string, ordinal int, greedy bool, previous delimiter) appendField { + return appendField{ + baseField: baseField{ + id: id, + key: key, + ordinal: ordinal, + greedy: greedy, + }, + previous: previous, + } +} + +func newIndirectField(id int, key string) indirectField { + return indirectField{ + baseField{ + id: id, + key: key, + }, + } +} + +func newNormalField(id int, key string, ordinal int, greedy bool) normalField { + return normalField{ + baseField{ + id: id, + key: key, + ordinal: ordinal, + greedy: greedy, + }, + } +} + +func extractKeyParts(rawKey string) (key string, ordinal int, greedy bool) { + m := suffixRE.FindAllStringSubmatch(rawKey, -1) + + if m[0][3] != "" { + ordinal, _ = strconv.Atoi(m[0][3]) + } + + if strings.EqualFold(greedySuffix, m[0][4]) { + greedy = true + } + return m[0][1], ordinal, greedy +} diff --git a/libbeat/processors/dissect/parser.go b/libbeat/processors/dissect/parser.go new file mode 100644 index 000000000000..40237e428606 --- /dev/null +++ b/libbeat/processors/dissect/parser.go @@ -0,0 +1,72 @@ +package dissect + +import ( + "sort" +) + +// parser extracts the useful information from the raw tokenizer string: the fields, delimiters, and +// skip fields.
+type parser struct { + delimiters []delimiter + fields []field + skipFields []field +} + +func newParser(tokenizer string) (*parser, error) { + // Matches are pairs of delimiter + key. + matches := delimiterRE.FindAllStringSubmatchIndex(tokenizer, -1) + if len(matches) == 0 { + return nil, errInvalidTokenizer + } + + var delimiters []delimiter + var fields []field + + pos := 0 + for id, m := range matches { + d := newDelimiter(tokenizer[m[2]:m[3]]) + key := tokenizer[m[4]:m[5]] + field, err := newField(id, key, d) + if err != nil { + return nil, err + } + if field.IsGreedy() { + d.MarkGreedy() + } + fields = append(fields, field) + delimiters = append(delimiters, d) + pos = m[5] + 1 + } + + if pos < len(tokenizer) { + d := newDelimiter(tokenizer[pos:]) + delimiters = append(delimiters, d) + } + + // Chain the delimiters together to make it easier to match them against the string. + // Some delimiters also need information about their surroundings to make decisions. + for i := 0; i < len(delimiters); i++ { + if i+1 < len(delimiters) { + delimiters[i].SetNext(delimiters[i+1]) + } + } + + // Group and order append fields at the end so the string join is done from left to right. + sort.Slice(fields, func(i, j int) bool { + return fields[i].Ordinal() < fields[j].Ordinal() + }) + + // List of fields needed for indirection that don't need to appear in the final event. + var skipFields []field + for _, f := range fields { + if !f.IsSaveable() { + skipFields = append(skipFields, f) + } + } + + return &parser{ + delimiters: delimiters, + fields: fields, + skipFields: skipFields, + }, nil +} diff --git a/libbeat/processors/dissect/processor.go b/libbeat/processors/dissect/processor.go new file mode 100644 index 000000000000..d1db3749009f --- /dev/null +++ b/libbeat/processors/dissect/processor.go @@ -0,0 +1,103 @@ +package dissect + +import ( + "fmt" + + "github.com/elastic/beats/libbeat/beat" + "github.com/elastic/beats/libbeat/common" + "github.com/elastic/beats/libbeat/processors" +) + +type mapperFunc func(e *beat.Event, m common.MapStr) (*beat.Event, error) + +type processor struct { + config config + mapper mapperFunc +} + +func init() { + processors.RegisterPlugin("dissect", newProcessor) +} + +func newProcessor(c *common.Config) (processors.Processor, error) { + config := defaultConfig + err := c.Unpack(&config) + if err != nil { + return nil, err + } + p := &processor{config: config} + + if config.TargetPrefix == "" { + p.mapper = p.mapToRoot + } else { + p.mapper = p.mapToField + } + + return p, nil +} + +// Run takes the event and applies the tokenizer to the configured field.
+func (p *processor) Run(event *beat.Event) (*beat.Event, error) { + v, err := event.GetValue(p.config.Field) + if err != nil { + return event, err + } + + s, ok := v.(string) + if !ok { + return event, fmt.Errorf("field is not a string, value: `%v`, field: `%s`", v, p.config.Field) + } + + m, err := p.config.Tokenizer.Dissect(s) + if err != nil { + return event, err + } + + event, err = p.mapper(event, mapToMapStr(m)) + if err != nil { + return event, err + } + + return event, nil +} + +func (p *processor) mapToRoot(event *beat.Event, m common.MapStr) (*beat.Event, error) { + copy := event.Fields.Clone() + + for k, v := range m { + if _, err := event.GetValue(k); err == common.ErrKeyNotFound { + event.PutValue(k, v) + } else { + event.Fields = copy + return event, fmt.Errorf("cannot override existing key: `%s`", k) + } + } + + return event, nil +} + +func (p *processor) mapToField(event *beat.Event, m common.MapStr) (*beat.Event, error) { + if _, err := event.GetValue(p.config.TargetPrefix); err != nil && err != common.ErrKeyNotFound { + return event, fmt.Errorf("cannot override existing key: `%s`", p.config.TargetPrefix) + } + + _, err := event.PutValue(p.config.TargetPrefix, m) + if err != nil { + return event, err + } + return event, nil +} + +func (p *processor) String() string { + return "dissect=" + p.config.Tokenizer.Raw() + + ",field=" + p.config.Field + + ",target_prefix=" + p.config.TargetPrefix +} + +func mapToMapStr(m Map) common.MapStr { + newMap := make(common.MapStr, len(m)) + for k, v := range m { + newMap[k] = v + } + return newMap +} diff --git a/libbeat/processors/dissect/processor_test.go b/libbeat/processors/dissect/processor_test.go new file mode 100644 index 000000000000..d96a1a321279 --- /dev/null +++ b/libbeat/processors/dissect/processor_test.go @@ -0,0 +1,132 @@ +package dissect + +import ( + "testing" + + "github.com/stretchr/testify/assert" + + "github.com/elastic/beats/libbeat/beat" + "github.com/elastic/beats/libbeat/common" +) + +func TestProcessor(t *testing.T) { + tests := []struct { + name string + c map[string]interface{} + fields common.MapStr + values map[string]string + }{ + { + name: "default field/default target", + c: map[string]interface{}{"tokenizer": "hello %{key}"}, + fields: common.MapStr{"message": "hello world"}, + values: map[string]string{"dissect.key": "world"}, + }, + { + name: "default field/target root", + c: map[string]interface{}{"tokenizer": "hello %{key}", "target_prefix": ""}, + fields: common.MapStr{"message": "hello world"}, + values: map[string]string{"key": "world"}, + }, + { + name: "specific field/target root", + c: map[string]interface{}{ + "tokenizer": "hello %{key}", + "target_prefix": "", + "field": "new_field", + }, + fields: common.MapStr{"new_field": "hello world"}, + values: map[string]string{"key": "world"}, + }, + { + name: "specific field/specific target", + c: map[string]interface{}{ + "tokenizer": "hello %{key}", + "target_prefix": "new_target", + "field": "new_field", + }, + fields: common.MapStr{"new_field": "hello world"}, + values: map[string]string{"new_target.key": "world"}, + }, + { + name: "set map under a root key", + c: map[string]interface{}{ + "tokenizer": "hello %{key} %{key2}", + "target_prefix": "extracted", + "field": "message", + }, + fields: common.MapStr{"message": "hello world super", "extracted": "not hello"}, + values: map[string]string{"extracted.key": "world", "extracted.key2": "super"}, + }, + } + + for _, test := range tests { + t.Run(test.name, func(t *testing.T) { + c, err 
:= common.NewConfigFrom(test.c) + if !assert.NoError(t, err) { + return + } + + processor, err := newProcessor(c) + if !assert.NoError(t, err) { + return + } + + e := beat.Event{Fields: test.fields} + newEvent, err := processor.Run(&e) + if !assert.NoError(t, err) { + return + } + + for field, value := range test.values { + v, err := newEvent.GetValue(field) + if !assert.NoError(t, err) { + return + } + + assert.Equal(t, value, v) + } + }) + } +} + +func TestFieldDoesntExist(t *testing.T) { + c, err := common.NewConfigFrom(map[string]interface{}{"tokenizer": "hello %{key}"}) + if !assert.NoError(t, err) { + return + } + + processor, err := newProcessor(c) + if !assert.NoError(t, err) { + return + } + + e := beat.Event{Fields: common.MapStr{"hello": "world"}} + _, err = processor.Run(&e) + if !assert.Error(t, err) { + return + } +} + +func TestFieldAlreadyExist(t *testing.T) { + t.Run("root prefix", func(t *testing.T) { + c, err := common.NewConfigFrom(map[string]interface{}{ + "tokenizer": "hello %{key}", + "target_prefix": "", + }) + if !assert.NoError(t, err) { + return + } + + processor, err := newProcessor(c) + if !assert.NoError(t, err) { + return + } + + e := beat.Event{Fields: common.MapStr{"message": "hello world", "key": "exist"}} + _, err = processor.Run(&e) + if !assert.Error(t, err) { + return + } + }) +} diff --git a/metricbeat/metricbeat.reference.yml b/metricbeat/metricbeat.reference.yml index 56bf33fc6dd5..907cf6f2977d 100644 --- a/metricbeat/metricbeat.reference.yml +++ b/metricbeat/metricbeat.reference.yml @@ -763,6 +763,14 @@ metricbeat.modules: # - from: "a" # to: "b" # +# The following example tokenizes the string into fields: +# +#processors: +#- dissect: +# tokenizer: "%{key1} - %{key2}" +# field: "message" +# target_prefix: "dissect" +# # The following example enriches each event with metadata from the cloud # provider about the host machine. It works on EC2, GCE, DigitalOcean, # Tencent Cloud, and Alibaba Cloud. diff --git a/packetbeat/packetbeat.reference.yml b/packetbeat/packetbeat.reference.yml index 733e7a1ba5cf..0dce126ae8e4 100644 --- a/packetbeat/packetbeat.reference.yml +++ b/packetbeat/packetbeat.reference.yml @@ -622,6 +622,14 @@ packetbeat.protocols: # - from: "a" # to: "b" # +# The following example tokenizes the string into fields: +# +#processors: +#- dissect: +# tokenizer: "%{key1} - %{key2}" +# field: "message" +# target_prefix: "dissect" +# # The following example enriches each event with metadata from the cloud # provider about the host machine. It works on EC2, GCE, DigitalOcean, # Tencent Cloud, and Alibaba Cloud. diff --git a/winlogbeat/winlogbeat.reference.yml b/winlogbeat/winlogbeat.reference.yml index aaf2f826ed86..7164ab1972ee 100644 --- a/winlogbeat/winlogbeat.reference.yml +++ b/winlogbeat/winlogbeat.reference.yml @@ -174,6 +174,14 @@ winlogbeat.event_logs: # - from: "a" # to: "b" # +# The following example tokenizes the string into fields: +# +#processors: +#- dissect: +# tokenizer: "%{key1} - %{key2}" +# field: "message" +# target_prefix: "dissect" +# # The following example enriches each event with metadata from the cloud # provider about the host machine. It works on EC2, GCE, DigitalOcean, # Tencent Cloud, and Alibaba Cloud.
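Beyond plain keys, the test suite above documents the key modifiers (skip `%{}` / `%{?key}`, indirect `%{&key}`, append `%{+key}`). A short, hypothetical sketch of the two less obvious ones, using only the public API introduced by this patch and mirroring its test expectations:

package main

import (
	"fmt"

	"github.com/elastic/beats/libbeat/processors/dissect"
)

func main() {
	// Indirect: %{?key} captures a value that is used only as the field
	// name for the value captured by %{&key}.
	d, err := dissect.New("%{?key} %{&key}")
	if err != nil {
		panic(err)
	}
	m, _ := d.Dissect("hello world")
	fmt.Println(m) // map[hello:world]

	// Append: repeated %{+key} parts are joined using the delimiter that
	// precedes each part.
	d, err = dissect.New("%{key}-%{+key}-%{+key}")
	if err != nil {
		panic(err)
	}
	m, _ = d.Dissect("1-2-3")
	fmt.Println(m) // map[key:1-2-3]
}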