Skip to content

Commit

Permalink
[Kafka output] removed regex validation to allow dynamic topic (elast…
Browse files Browse the repository at this point in the history
…ic#40415)

* removed regex validation to allow dynamic topic

* added changelog

* added back validation to only allowed one field name as dynamic topic

* updated changelog

* fix lint

* resolve dynamic topic for managed agent

* removed regex validation

* updated changelog

* Update CHANGELOG.next.asciidoc

Co-authored-by: Tiago Queiroz <[email protected]>

---------

Co-authored-by: Tiago Queiroz <[email protected]>
  • Loading branch information
juliaElastic and belimawr authored Aug 14, 2024
1 parent c4c402d commit 5efae2b
Show file tree
Hide file tree
Showing 5 changed files with 7 additions and 42 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -202,6 +202,7 @@ https://github.com/elastic/beats/compare/v8.8.1\...main[Check the HEAD diff]
- elasticsearch output now supports `idle_connection_timeout`. {issue}35616[35615] {pull}36843[36843]
- Enable early event encoding in the Elasticsearch output, improving cpu and memory use {pull}38572[38572]
- The environment variable `BEATS_ADD_CLOUD_METADATA_PROVIDERS` overrides configured/default `add_cloud_metadata` providers {pull}38669[38669]
- When running under Elastic-Agent Kafka output allows dynamic topic in `topic` field {pull}40415[40415]

*Auditbeat*

Expand Down
10 changes: 0 additions & 10 deletions libbeat/outputs/kafka/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@ import (
"fmt"
"math"
"math/rand"
"regexp"
"strings"
"time"

Expand Down Expand Up @@ -109,11 +108,6 @@ var compressionModes = map[string]sarama.CompressionCodec{
"snappy": sarama.CompressionSnappy,
}

// validTopicRegExp is used to validate the topic contains only valid characters
// when running under Elastic-Agent. The regexp is taken from:
// https://github.com/apache/kafka/blob/a126e3a622f2b7142f3543b9dbee54b6412ba9d8/clients/src/main/java/org/apache/kafka/common/internals/Topic.java#L33
var validTopicRegExp = regexp.MustCompile("^[a-zA-Z0-9._-]+$")

func defaultConfig() kafkaConfig {
return kafkaConfig{
Hosts: nil,
Expand Down Expand Up @@ -193,10 +187,6 @@ func (c *kafkaConfig) Validate() error {
if len(c.Topics) != 0 {
return errors.New("'topics' is not supported when running under Elastic-Agent")
}

if !validTopicRegExp.MatchString(c.Topic) {
return fmt.Errorf("topic '%s' is invalid, it must match '[a-zA-Z0-9._-]'", c.Topic)
}
}

return nil
Expand Down
19 changes: 2 additions & 17 deletions libbeat/outputs/kafka/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -142,25 +142,10 @@ func TestConfigUnderElasticAgent(t *testing.T) {
expectError: true,
},
{
name: "topic cannot contain invalid characters",
name: "valid topic with dynamic topic selection",
cfg: mapstr.M{
"topic": "foo bar",
"topic": "%{[event.field]}",
},
expectError: true,
},
{
name: "topic with invalid characters",
cfg: mapstr.M{
"topic": "foo + bar",
},
expectError: true,
},
{
name: "topic with invalid characters from dynamic topic selection",
cfg: mapstr.M{
"topic": "%{event.field}",
},
expectError: true,
},

// The default config does not set `topic` not `topics`.
Expand Down
17 changes: 3 additions & 14 deletions libbeat/outputs/kafka/kafka.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@ import (
"github.com/Shopify/sarama"

"github.com/elastic/beats/v7/libbeat/beat"
"github.com/elastic/beats/v7/libbeat/management"
"github.com/elastic/beats/v7/libbeat/outputs"
"github.com/elastic/beats/v7/libbeat/outputs/codec"
"github.com/elastic/beats/v7/libbeat/outputs/outil"
Expand Down Expand Up @@ -91,21 +90,11 @@ func makeKafka(
// running under Elastic-Agent based on cfg.
//
// When running standalone the topic selector works as expected and documented.
// When running under Elastic-Agent, dynamic topic selection is not supported,
// so a constant selector using the `topic` value is returned.
// When running under Elastic-Agent, dynamic topic selection is also supported
func buildTopicSelector(cfg *config.C) (outil.Selector, error) {
topicCfg := struct {
Topic string `config:"topic" yaml:"topic"`
}{}

if err := cfg.Unpack(&topicCfg); err != nil {
return outil.Selector{}, fmt.Errorf("cannot unpack Kafka config to read the topic: %w", err)
}

if management.UnderAgent() {
exprSelector := outil.ConstSelectorExpr(topicCfg.Topic, outil.SelectorKeepCase)
selector := outil.MakeSelector(exprSelector)
return selector, nil
if cfg == nil {
return outil.Selector{}, fmt.Errorf("Kafka config cannot be nil")
}

return outil.BuildSelectorFromConfig(cfg, outil.Settings{
Expand Down
2 changes: 1 addition & 1 deletion libbeat/outputs/kafka/kafka_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ func TestBuildTopicSelector(t *testing.T) {
{
name: "dynamic topic under agent",
topic: "%{[foo]}",
expected: "%{[foo]}",
expected: "bar",
underAgent: true,
},
{
Expand Down

0 comments on commit 5efae2b

Please sign in to comment.