From cb7fe4c47894d65b49a1ef33f011eb9b672c1838 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?No=C3=A9mi=20V=C3=A1nyi?= Date: Wed, 1 Jul 2020 15:03:59 +0200 Subject: [PATCH] Add support for Thread ID in Filebeat Kafka module (#19463) Closes #18164 --- CHANGELOG.asciidoc | 1 + filebeat/docs/fields.asciidoc | 10 +++++ filebeat/docs/modules/kafka.asciidoc | 2 + filebeat/module/kafka/_meta/docs.asciidoc | 2 + filebeat/module/kafka/fields.go | 2 +- filebeat/module/kafka/log/_meta/fields.yml | 4 ++ filebeat/module/kafka/log/ingest/pipeline.yml | 1 + .../kafka/log/test/state-change-2.2.2.log | 23 +++++++++++ .../test/state-change-2.2.2.log-expected.json | 39 +++++++++++++++++++ 9 files changed, 83 insertions(+), 1 deletion(-) create mode 100644 filebeat/module/kafka/log/test/state-change-2.2.2.log create mode 100644 filebeat/module/kafka/log/test/state-change-2.2.2.log-expected.json diff --git a/CHANGELOG.asciidoc b/CHANGELOG.asciidoc index d2274bb0080c..d1ab5687dc9c 100644 --- a/CHANGELOG.asciidoc +++ b/CHANGELOG.asciidoc @@ -156,6 +156,7 @@ processing events. (CVE-2019-17596) See https://www.elastic.co/community/securit - Add Kibana Dashboard for MISP module. {pull}14147[14147] - Add support for gzipped files in S3 input {pull}13980[13980] - Add Filebeat Azure Dashboards {pull}14127[14127] +- Add support for thread ID in Filebeat Kafka module. {pull}19463[19463] *Heartbeat* diff --git a/filebeat/docs/fields.asciidoc b/filebeat/docs/fields.asciidoc index a695efaab3e9..8562e3da0e2e 100644 --- a/filebeat/docs/fields.asciidoc +++ b/filebeat/docs/fields.asciidoc @@ -24755,6 +24755,16 @@ type: keyword Java class the log is coming from. +type: keyword + +-- + +*`kafka.log.thread`*:: ++ +-- +Thread name the log is coming from. + + type: keyword -- diff --git a/filebeat/docs/modules/kafka.asciidoc b/filebeat/docs/modules/kafka.asciidoc index d9319b43b508..5e2145a348e2 100644 --- a/filebeat/docs/modules/kafka.asciidoc +++ b/filebeat/docs/modules/kafka.asciidoc @@ -11,6 +11,8 @@ This file is generated! See scripts/docs_collector.py The +{modulename}+ module collects and parses the logs created by https://kafka.apache.org/[Kafka]. +The module has additional support for parsing thread ID from logs. + include::../include/what-happens.asciidoc[] include::../include/gs-link.asciidoc[] diff --git a/filebeat/module/kafka/_meta/docs.asciidoc b/filebeat/module/kafka/_meta/docs.asciidoc index 4e199f98b4be..4940b08d54e4 100644 --- a/filebeat/module/kafka/_meta/docs.asciidoc +++ b/filebeat/module/kafka/_meta/docs.asciidoc @@ -6,6 +6,8 @@ The +{modulename}+ module collects and parses the logs created by https://kafka.apache.org/[Kafka]. +The module has additional support for parsing thread ID from logs. + include::../include/what-happens.asciidoc[] include::../include/gs-link.asciidoc[] diff --git a/filebeat/module/kafka/fields.go b/filebeat/module/kafka/fields.go index 86950968bf0b..6d0aadfd8673 100644 --- a/filebeat/module/kafka/fields.go +++ b/filebeat/module/kafka/fields.go @@ -32,5 +32,5 @@ func init() { // AssetKafka returns asset data. // This is the base64 encoded gzipped contents of module/kafka. func AssetKafka() string { - return "eJykkjtuwzAQRHudYuDeOoCKNOkSpMsFFtZKJsQfyLUT3z6gGP8Y2paR7cQRZ55Gu8bEhw4TDRM1gCjR3GH1np5XDdBz3ATlRTnb4aUBgFmDcf1OcwMMinUfu1law5Lhs10aOXjuMAa3878nFc9rm0sr7cbTWc3spmGeDKvdCK0sx/ZCLBOvUnnP+ko5ZpNWFAvFk2xn0rZ2z6gxUGaTsONqnuEYaeQnE+u3buVVgzfOeGfZSjV64sOXC32h3ak7zevRErLluXoVU46yI4bgTFsH0RTLr/wHxBvtKXs+RSGBNvW/UO7cAgbgM9lB2RNDWsK2eLG2h4+aedTOIro/LUnGvdMTHu7smUz4u9yqhVjAR/aGpyBww5mubX4CAAD//40BKYM=" + return "eJykkztywyAQhnud4h/31gFUpEmXTDpfYMdayYwQMLB24ttngPhF8GtMZ9Z8/6eFXWLifYeJhokaQJRo7rD4jL8XDdBzWHvlRFnT4a0BgFTDbPut5gYYFOs+dKm0hKGZT7i4ZO+4w+jt1v3tVJiXmHOUtuNxrwa7Cswry2o7QivDoT0rlokXqbxjfVE5ZJNWFIqKI9kk07Z2blajp+wmfsvVvJlDoJGfTKyfupZXDV7b2VnDRqrRE++/re+L2o12x/V+QEI2nFqvQsxRZsTg7dzWRTSF8itfkPigHWXmUxay8Uxl1Asaq8RL8Oc8PK3rr6F8+w9IAKuIgzJHhzgMbfHH2jzgzg3da89Ddv9uS7LujT7h7uyczIR/ytf9oBbwldlw5AV2ONm1zW8AAAD//95/TD8=" } diff --git a/filebeat/module/kafka/log/_meta/fields.yml b/filebeat/module/kafka/log/_meta/fields.yml index c06f44d0bf49..32b9008f7483 100644 --- a/filebeat/module/kafka/log/_meta/fields.yml +++ b/filebeat/module/kafka/log/_meta/fields.yml @@ -20,6 +20,10 @@ type: keyword description: > Java class the log is coming from. + - name: thread + type: keyword + description: > + Thread name the log is coming from. - name: trace type: group description: > diff --git a/filebeat/module/kafka/log/ingest/pipeline.yml b/filebeat/module/kafka/log/ingest/pipeline.yml index 41db8f4f1973..a10724891225 100644 --- a/filebeat/module/kafka/log/ingest/pipeline.yml +++ b/filebeat/module/kafka/log/ingest/pipeline.yml @@ -6,6 +6,7 @@ processors: patterns: - (?m)%{TIMESTAMP_ISO8601:kafka.log.timestamp}. %{LOGLEVEL:log.level} +%{JAVALOGMESSAGE:message} \(%{JAVACLASS:kafka.log.class}\)$[ \n]*(?'kafka.log.trace.full'.*) + - (?m)\[%{TIMESTAMP_ISO8601:kafka.log.timestamp}\] \[%{LOGLEVEL:log.level} ?\] \[%{NOTSPACE:kafka.log.thread}\] \[%{NOTSPACE:kafka.log.class}\] \- %{GREEDYDATA:message} - grok: field: message pattern_definitions: diff --git a/filebeat/module/kafka/log/test/state-change-2.2.2.log b/filebeat/module/kafka/log/test/state-change-2.2.2.log new file mode 100644 index 000000000000..69d64c90d22a --- /dev/null +++ b/filebeat/module/kafka/log/test/state-change-2.2.2.log @@ -0,0 +1,23 @@ +[2020-04-23 00:28:21,796] [WARN ] [kafka-request-handler-7] [state.change.logger] - [Broker id=1] Ignoring LeaderAndIsr request from controller 1 with correlation id 1 epoch 27 for partition xxx.xxx.inventory.test.1-0-2 since its associated leader epoch 5 is not higher than the current leader epoch 5 +[2020-01-20 01:32:00,705] [ERROR] [controller-event-thread] [state.change.logger] - [Controller id=1 epoch=25] Controller 1 epoch 25 failed to change state for partition xxx.xxx.na.och.aud.1-0-2 from OfflinePartition to OnlinePartition +kafka.common.StateChangeFailedException: Failed to elect leader for partition xxx.xxx.na.och.aud.1-0-2 under strategy OfflinePartitionLeaderElectionStrategy + at kafka.controller.PartitionStateMachine.$anonfun$doElectLeaderForPartitions$9(PartitionStateMachine.scala:390) + at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62) + at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55) + at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49) + at kafka.controller.PartitionStateMachine.doElectLeaderForPartitions(PartitionStateMachine.scala:388) + at kafka.controller.PartitionStateMachine.electLeaderForPartitions(PartitionStateMachine.scala:315) + at kafka.controller.PartitionStateMachine.doHandleStateChanges(PartitionStateMachine.scala:225) + at kafka.controller.PartitionStateMachine.handleStateChanges(PartitionStateMachine.scala:141) + at kafka.controller.PartitionStateMachine.triggerOnlinePartitionStateChange(PartitionStateMachine.scala:123) + at kafka.controller.PartitionStateMachine.triggerOnlinePartitionStateChange(PartitionStateMachine.scala:109) + at kafka.controller.PartitionStateMachine.startup(PartitionStateMachine.scala:66) + at kafka.controller.KafkaController.onControllerFailover(KafkaController.scala:266) + at kafka.controller.KafkaController.kafka$controller$KafkaController$$elect(KafkaController.scala:1271) + at kafka.controller.KafkaController$Startup$.process(KafkaController.scala:1184) + at kafka.controller.ControllerEventManager$ControllerEventThread.$anonfun$doWork$1(ControllerEventManager.scala:94) + at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23) + at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:31) + at kafka.controller.ControllerEventManager$ControllerEventThread.doWork(ControllerEventManager.scala:94) + at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:82) + diff --git a/filebeat/module/kafka/log/test/state-change-2.2.2.log-expected.json b/filebeat/module/kafka/log/test/state-change-2.2.2.log-expected.json new file mode 100644 index 000000000000..a8ad3645ec70 --- /dev/null +++ b/filebeat/module/kafka/log/test/state-change-2.2.2.log-expected.json @@ -0,0 +1,39 @@ +[ + { + "@timestamp": "2020-04-23T00:28:21.796-02:00", + "event.dataset": "kafka.log", + "event.kind": "event", + "event.module": "kafka", + "event.timezone": "-02:00", + "event.type": "info", + "fileset.name": "log", + "input.type": "log", + "kafka.log.class": "state.change.logger", + "kafka.log.component": "Broker id=1", + "kafka.log.thread": "kafka-request-handler-7", + "log.level": "WARN", + "log.offset": 0, + "message": "Ignoring LeaderAndIsr request from controller 1 with correlation id 1 epoch 27 for partition xxx.xxx.inventory.test.1-0-2 since its associated leader epoch 5 is not higher than the current leader epoch 5 ", + "service.type": "kafka" + }, + { + "@timestamp": "2020-01-20T01:32:00.705-02:00", + "event.dataset": "kafka.log", + "event.kind": "event", + "event.module": "kafka", + "event.timezone": "-02:00", + "event.type": "error", + "fileset.name": "log", + "input.type": "log", + "kafka.log.class": "state.change.logger", + "kafka.log.component": "Controller id=1 epoch=25", + "kafka.log.thread": "controller-event-thread", + "log.flags": [ + "multiline" + ], + "log.level": "ERROR", + "log.offset": 303, + "message": "Controller 1 epoch 25 failed to change state for partition xxx.xxx.na.och.aud.1-0-2 from OfflinePartition to OnlinePartition", + "service.type": "kafka" + } +] \ No newline at end of file