From 6818032342de9e756e644b6477c993df2f353e16 Mon Sep 17 00:00:00 2001 From: Julia Bardi Date: Thu, 1 Aug 2024 14:58:50 +0200 Subject: [PATCH 1/9] removed regex validation to allow dynamic topic --- libbeat/outputs/kafka/config.go | 10 ---------- libbeat/outputs/kafka/config_test.go | 15 --------------- 2 files changed, 25 deletions(-) diff --git a/libbeat/outputs/kafka/config.go b/libbeat/outputs/kafka/config.go index 3e2c836a06f7..19055e0b3175 100644 --- a/libbeat/outputs/kafka/config.go +++ b/libbeat/outputs/kafka/config.go @@ -22,7 +22,6 @@ import ( "fmt" "math" "math/rand" - "regexp" "strings" "time" @@ -109,11 +108,6 @@ var compressionModes = map[string]sarama.CompressionCodec{ "snappy": sarama.CompressionSnappy, } -// validTopicRegExp is used to validate the topic contains only valid characters -// when running under Elastic-Agent. The regexp is taken from: -// https://github.com/apache/kafka/blob/a126e3a622f2b7142f3543b9dbee54b6412ba9d8/clients/src/main/java/org/apache/kafka/common/internals/Topic.java#L33 -var validTopicRegExp = regexp.MustCompile("^[a-zA-Z0-9._-]+$") - func defaultConfig() kafkaConfig { return kafkaConfig{ Hosts: nil, @@ -193,10 +187,6 @@ func (c *kafkaConfig) Validate() error { if len(c.Topics) != 0 { return errors.New("'topics' is not supported when running under Elastic-Agent") } - - if !validTopicRegExp.MatchString(c.Topic) { - return fmt.Errorf("topic '%s' is invalid, it must match '[a-zA-Z0-9._-]'", c.Topic) - } } return nil diff --git a/libbeat/outputs/kafka/config_test.go b/libbeat/outputs/kafka/config_test.go index 2435b274f6e9..78307e08d2cd 100644 --- a/libbeat/outputs/kafka/config_test.go +++ b/libbeat/outputs/kafka/config_test.go @@ -141,26 +141,11 @@ func TestConfigUnderElasticAgent(t *testing.T) { }, expectError: true, }, - { - name: "topic cannot contain invalid characters", - cfg: mapstr.M{ - "topic": "foo bar", - }, - expectError: true, - }, - { - name: "topic with invalid characters", - cfg: mapstr.M{ - "topic": "foo + bar", - }, - expectError: true, - }, { name: "topic with invalid characters from dynamic topic selection", cfg: mapstr.M{ "topic": "%{event.field}", }, - expectError: true, }, // The default config does not set `topic` not `topics`. From bc5da6e14b7db545c3390426e20e8406c5cdc735 Mon Sep 17 00:00:00 2001 From: Julia Bardi Date: Thu, 1 Aug 2024 15:02:48 +0200 Subject: [PATCH 2/9] added changelog --- CHANGELOG.next.asciidoc | 1 + 1 file changed, 1 insertion(+) diff --git a/CHANGELOG.next.asciidoc b/CHANGELOG.next.asciidoc index 8f74257434fd..351b4fc30e5d 100644 --- a/CHANGELOG.next.asciidoc +++ b/CHANGELOG.next.asciidoc @@ -220,6 +220,7 @@ https://github.com/elastic/beats/compare/v8.8.1\...main[Check the HEAD diff] - Add persistent volume claim name to volume if available {pull}38839[38839] - Raw events are now logged to a different file, this prevents potentially sensitive information from leaking into log files {pull}38767[38767] - Websocket input: Added runtime URL modification support based on state and cursor values {issue}39858[39858] {pull}39997[39997] +- Kafka output allows dynamic topic in `topic` field {pull}40415[40415] *Auditbeat* From 749a763728ca2d175bd176c53d228576cf0e47e2 Mon Sep 17 00:00:00 2001 From: Julia Bardi Date: Fri, 2 Aug 2024 09:52:33 +0200 Subject: [PATCH 3/9] added back validation to only allowed one field name as dynamic topic --- libbeat/outputs/kafka/config.go | 12 ++++++++++++ libbeat/outputs/kafka/config_test.go | 11 +++++++++-- 2 files changed, 21 insertions(+), 2 deletions(-) diff --git a/libbeat/outputs/kafka/config.go b/libbeat/outputs/kafka/config.go index 19055e0b3175..3d9405bc1098 100644 --- a/libbeat/outputs/kafka/config.go +++ b/libbeat/outputs/kafka/config.go @@ -22,6 +22,7 @@ import ( "fmt" "math" "math/rand" + "regexp" "strings" "time" @@ -108,6 +109,12 @@ var compressionModes = map[string]sarama.CompressionCodec{ "snappy": sarama.CompressionSnappy, } +// validTopicRegExp is used to validate the topic contains only valid characters +// when running under Elastic-Agent. The regexp is taken from: +// https://github.com/apache/kafka/blob/a126e3a622f2b7142f3543b9dbee54b6412ba9d8/clients/src/main/java/org/apache/kafka/common/internals/Topic.java#L33 +// also allowing a format string that contains a field name %{[FIELD]} +var validTopicRegExp = regexp.MustCompile("^(?:%\\{\\[)?[a-zA-Z0-9._-]+(?:\\]\\})?$") + func defaultConfig() kafkaConfig { return kafkaConfig{ Hosts: nil, @@ -187,6 +194,11 @@ func (c *kafkaConfig) Validate() error { if len(c.Topics) != 0 { return errors.New("'topics' is not supported when running under Elastic-Agent") } + + if !validTopicRegExp.MatchString(c.Topic) { + const regex = "(%{[)?[a-zA-Z0-9._-]+(]})?" + return fmt.Errorf("topic '%s' is invalid, it must match '%s'", c.Topic, regex) + } } return nil diff --git a/libbeat/outputs/kafka/config_test.go b/libbeat/outputs/kafka/config_test.go index 78307e08d2cd..166d2754f784 100644 --- a/libbeat/outputs/kafka/config_test.go +++ b/libbeat/outputs/kafka/config_test.go @@ -142,11 +142,18 @@ func TestConfigUnderElasticAgent(t *testing.T) { expectError: true, }, { - name: "topic with invalid characters from dynamic topic selection", + name: "valid topic with dynamic topic selection", cfg: mapstr.M{ - "topic": "%{event.field}", + "topic": "%{[event.field]}", }, }, + { + name: "invalid topic with prefix before field name", + cfg: mapstr.M{ + "topic": "test-%{[event.field]}", + }, + expectError: true, + }, // The default config does not set `topic` not `topics`. { From 7ffaacba136b711e3812863ba01a1b6e5a1ad74f Mon Sep 17 00:00:00 2001 From: Julia Bardi Date: Fri, 2 Aug 2024 09:53:46 +0200 Subject: [PATCH 4/9] updated changelog --- CHANGELOG.next.asciidoc | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/CHANGELOG.next.asciidoc b/CHANGELOG.next.asciidoc index 351b4fc30e5d..0b7038336bdb 100644 --- a/CHANGELOG.next.asciidoc +++ b/CHANGELOG.next.asciidoc @@ -220,7 +220,7 @@ https://github.com/elastic/beats/compare/v8.8.1\...main[Check the HEAD diff] - Add persistent volume claim name to volume if available {pull}38839[38839] - Raw events are now logged to a different file, this prevents potentially sensitive information from leaking into log files {pull}38767[38767] - Websocket input: Added runtime URL modification support based on state and cursor values {issue}39858[39858] {pull}39997[39997] -- Kafka output allows dynamic topic in `topic` field {pull}40415[40415] +- Kafka output allows one field name as dynamic topic in `topic` field {pull}40415[40415] *Auditbeat* From 22cc4516d35358032ad7f9b1dd751791aec95b31 Mon Sep 17 00:00:00 2001 From: Julia Bardi Date: Fri, 2 Aug 2024 10:10:15 +0200 Subject: [PATCH 5/9] fix lint --- libbeat/outputs/kafka/config.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/libbeat/outputs/kafka/config.go b/libbeat/outputs/kafka/config.go index 3d9405bc1098..f43333f63b9b 100644 --- a/libbeat/outputs/kafka/config.go +++ b/libbeat/outputs/kafka/config.go @@ -113,7 +113,7 @@ var compressionModes = map[string]sarama.CompressionCodec{ // when running under Elastic-Agent. The regexp is taken from: // https://github.com/apache/kafka/blob/a126e3a622f2b7142f3543b9dbee54b6412ba9d8/clients/src/main/java/org/apache/kafka/common/internals/Topic.java#L33 // also allowing a format string that contains a field name %{[FIELD]} -var validTopicRegExp = regexp.MustCompile("^(?:%\\{\\[)?[a-zA-Z0-9._-]+(?:\\]\\})?$") +var validTopicRegExp = regexp.MustCompile(`^(?:%\{\[)?[a-zA-Z0-9._-]+(?:\]\})?$`) func defaultConfig() kafkaConfig { return kafkaConfig{ From c9ec2534ce62e3198b8bd93b8275b4b99f0de2d2 Mon Sep 17 00:00:00 2001 From: Julia Bardi Date: Fri, 2 Aug 2024 12:28:47 +0200 Subject: [PATCH 6/9] resolve dynamic topic for managed agent --- libbeat/outputs/kafka/kafka.go | 17 +++-------------- libbeat/outputs/kafka/kafka_test.go | 2 +- 2 files changed, 4 insertions(+), 15 deletions(-) diff --git a/libbeat/outputs/kafka/kafka.go b/libbeat/outputs/kafka/kafka.go index cb23823a95a3..e8c0d75aa4d6 100644 --- a/libbeat/outputs/kafka/kafka.go +++ b/libbeat/outputs/kafka/kafka.go @@ -23,7 +23,6 @@ import ( "github.com/Shopify/sarama" "github.com/elastic/beats/v7/libbeat/beat" - "github.com/elastic/beats/v7/libbeat/management" "github.com/elastic/beats/v7/libbeat/outputs" "github.com/elastic/beats/v7/libbeat/outputs/codec" "github.com/elastic/beats/v7/libbeat/outputs/outil" @@ -91,21 +90,11 @@ func makeKafka( // running under Elastic-Agent based on cfg. // // When running standalone the topic selector works as expected and documented. -// When running under Elastic-Agent, dynamic topic selection is not supported, -// so a constant selector using the `topic` value is returned. +// When running under Elastic-Agent, dynamic topic selection is also supported func buildTopicSelector(cfg *config.C) (outil.Selector, error) { - topicCfg := struct { - Topic string `config:"topic" yaml:"topic"` - }{} - if err := cfg.Unpack(&topicCfg); err != nil { - return outil.Selector{}, fmt.Errorf("cannot unpack Kafka config to read the topic: %w", err) - } - - if management.UnderAgent() { - exprSelector := outil.ConstSelectorExpr(topicCfg.Topic, outil.SelectorKeepCase) - selector := outil.MakeSelector(exprSelector) - return selector, nil + if cfg == nil { + return outil.Selector{}, fmt.Errorf("Kafka config cannot be nil") } return outil.BuildSelectorFromConfig(cfg, outil.Settings{ diff --git a/libbeat/outputs/kafka/kafka_test.go b/libbeat/outputs/kafka/kafka_test.go index 0717f8a0d51b..9430235e4584 100644 --- a/libbeat/outputs/kafka/kafka_test.go +++ b/libbeat/outputs/kafka/kafka_test.go @@ -42,7 +42,7 @@ func TestBuildTopicSelector(t *testing.T) { { name: "dynamic topic under agent", topic: "%{[foo]}", - expected: "%{[foo]}", + expected: "bar", underAgent: true, }, { From 734942501fbc8d23a64a626ef696e8b1e5e64a95 Mon Sep 17 00:00:00 2001 From: Julia Bardi Date: Wed, 7 Aug 2024 13:19:28 +0200 Subject: [PATCH 7/9] removed regex validation --- libbeat/outputs/kafka/config.go | 12 ------------ libbeat/outputs/kafka/config_test.go | 7 ------- 2 files changed, 19 deletions(-) diff --git a/libbeat/outputs/kafka/config.go b/libbeat/outputs/kafka/config.go index f43333f63b9b..19055e0b3175 100644 --- a/libbeat/outputs/kafka/config.go +++ b/libbeat/outputs/kafka/config.go @@ -22,7 +22,6 @@ import ( "fmt" "math" "math/rand" - "regexp" "strings" "time" @@ -109,12 +108,6 @@ var compressionModes = map[string]sarama.CompressionCodec{ "snappy": sarama.CompressionSnappy, } -// validTopicRegExp is used to validate the topic contains only valid characters -// when running under Elastic-Agent. The regexp is taken from: -// https://github.com/apache/kafka/blob/a126e3a622f2b7142f3543b9dbee54b6412ba9d8/clients/src/main/java/org/apache/kafka/common/internals/Topic.java#L33 -// also allowing a format string that contains a field name %{[FIELD]} -var validTopicRegExp = regexp.MustCompile(`^(?:%\{\[)?[a-zA-Z0-9._-]+(?:\]\})?$`) - func defaultConfig() kafkaConfig { return kafkaConfig{ Hosts: nil, @@ -194,11 +187,6 @@ func (c *kafkaConfig) Validate() error { if len(c.Topics) != 0 { return errors.New("'topics' is not supported when running under Elastic-Agent") } - - if !validTopicRegExp.MatchString(c.Topic) { - const regex = "(%{[)?[a-zA-Z0-9._-]+(]})?" - return fmt.Errorf("topic '%s' is invalid, it must match '%s'", c.Topic, regex) - } } return nil diff --git a/libbeat/outputs/kafka/config_test.go b/libbeat/outputs/kafka/config_test.go index 166d2754f784..6d87a5ddc1ae 100644 --- a/libbeat/outputs/kafka/config_test.go +++ b/libbeat/outputs/kafka/config_test.go @@ -147,13 +147,6 @@ func TestConfigUnderElasticAgent(t *testing.T) { "topic": "%{[event.field]}", }, }, - { - name: "invalid topic with prefix before field name", - cfg: mapstr.M{ - "topic": "test-%{[event.field]}", - }, - expectError: true, - }, // The default config does not set `topic` not `topics`. { From 9aacb0801578b0dd29fe2d7d972c98dcee36250c Mon Sep 17 00:00:00 2001 From: Julia Bardi Date: Wed, 7 Aug 2024 13:38:34 +0200 Subject: [PATCH 8/9] updated changelog --- CHANGELOG.next.asciidoc | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/CHANGELOG.next.asciidoc b/CHANGELOG.next.asciidoc index 0b7038336bdb..351b4fc30e5d 100644 --- a/CHANGELOG.next.asciidoc +++ b/CHANGELOG.next.asciidoc @@ -220,7 +220,7 @@ https://github.com/elastic/beats/compare/v8.8.1\...main[Check the HEAD diff] - Add persistent volume claim name to volume if available {pull}38839[38839] - Raw events are now logged to a different file, this prevents potentially sensitive information from leaking into log files {pull}38767[38767] - Websocket input: Added runtime URL modification support based on state and cursor values {issue}39858[39858] {pull}39997[39997] -- Kafka output allows one field name as dynamic topic in `topic` field {pull}40415[40415] +- Kafka output allows dynamic topic in `topic` field {pull}40415[40415] *Auditbeat* From d436dd78b236c4231cb9e1b9db86ab12e323debd Mon Sep 17 00:00:00 2001 From: Julia Bardi <90178898+juliaElastic@users.noreply.github.com> Date: Fri, 9 Aug 2024 15:36:12 +0200 Subject: [PATCH 9/9] Update CHANGELOG.next.asciidoc Co-authored-by: Tiago Queiroz --- CHANGELOG.next.asciidoc | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/CHANGELOG.next.asciidoc b/CHANGELOG.next.asciidoc index 262a19365063..7a25afaf4c03 100644 --- a/CHANGELOG.next.asciidoc +++ b/CHANGELOG.next.asciidoc @@ -191,7 +191,7 @@ https://github.com/elastic/beats/compare/v8.8.1\...main[Check the HEAD diff] - elasticsearch output now supports `idle_connection_timeout`. {issue}35616[35615] {pull}36843[36843] - Enable early event encoding in the Elasticsearch output, improving cpu and memory use {pull}38572[38572] - The environment variable `BEATS_ADD_CLOUD_METADATA_PROVIDERS` overrides configured/default `add_cloud_metadata` providers {pull}38669[38669] -- Kafka output allows dynamic topic in `topic` field {pull}40415[40415] +- When running under Elastic-Agent Kafka output allows dynamic topic in `topic` field {pull}40415[40415] *Auditbeat*