Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Kafka output] removed regex validation to allow dynamic topic #40415

Merged
merged 11 commits into from
Aug 14, 2024
1 change: 1 addition & 0 deletions CHANGELOG.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -201,6 +201,7 @@ https://github.com/elastic/beats/compare/v8.8.1\...main[Check the HEAD diff]
- elasticsearch output now supports `idle_connection_timeout`. {issue}35616[35615] {pull}36843[36843]
- Enable early event encoding in the Elasticsearch output, improving cpu and memory use {pull}38572[38572]
- The environment variable `BEATS_ADD_CLOUD_METADATA_PROVIDERS` overrides configured/default `add_cloud_metadata` providers {pull}38669[38669]
- When running under Elastic-Agent Kafka output allows dynamic topic in `topic` field {pull}40415[40415]

*Auditbeat*

Expand Down
10 changes: 0 additions & 10 deletions libbeat/outputs/kafka/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@ import (
"fmt"
"math"
"math/rand"
"regexp"
"strings"
"time"

Expand Down Expand Up @@ -109,11 +108,6 @@ var compressionModes = map[string]sarama.CompressionCodec{
"snappy": sarama.CompressionSnappy,
}

// validTopicRegExp is used to validate the topic contains only valid characters
// when running under Elastic-Agent. The regexp is taken from:
// https://github.com/apache/kafka/blob/a126e3a622f2b7142f3543b9dbee54b6412ba9d8/clients/src/main/java/org/apache/kafka/common/internals/Topic.java#L33
var validTopicRegExp = regexp.MustCompile("^[a-zA-Z0-9._-]+$")

func defaultConfig() kafkaConfig {
return kafkaConfig{
Hosts: nil,
Expand Down Expand Up @@ -193,10 +187,6 @@ func (c *kafkaConfig) Validate() error {
if len(c.Topics) != 0 {
return errors.New("'topics' is not supported when running under Elastic-Agent")
}

if !validTopicRegExp.MatchString(c.Topic) {
return fmt.Errorf("topic '%s' is invalid, it must match '[a-zA-Z0-9._-]'", c.Topic)
}
}

return nil
Expand Down
19 changes: 2 additions & 17 deletions libbeat/outputs/kafka/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -142,25 +142,10 @@ func TestConfigUnderElasticAgent(t *testing.T) {
expectError: true,
},
{
name: "topic cannot contain invalid characters",
name: "valid topic with dynamic topic selection",
cfg: mapstr.M{
"topic": "foo bar",
"topic": "%{[event.field]}",
},
expectError: true,
},
{
name: "topic with invalid characters",
cfg: mapstr.M{
"topic": "foo + bar",
},
expectError: true,
},
{
name: "topic with invalid characters from dynamic topic selection",
cfg: mapstr.M{
"topic": "%{event.field}",
},
expectError: true,
},

// The default config does not set `topic` not `topics`.
Expand Down
17 changes: 3 additions & 14 deletions libbeat/outputs/kafka/kafka.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@ import (
"github.com/Shopify/sarama"

"github.com/elastic/beats/v7/libbeat/beat"
"github.com/elastic/beats/v7/libbeat/management"
"github.com/elastic/beats/v7/libbeat/outputs"
"github.com/elastic/beats/v7/libbeat/outputs/codec"
"github.com/elastic/beats/v7/libbeat/outputs/outil"
Expand Down Expand Up @@ -91,21 +90,11 @@ func makeKafka(
// running under Elastic-Agent based on cfg.
//
// When running standalone the topic selector works as expected and documented.
// When running under Elastic-Agent, dynamic topic selection is not supported,
// so a constant selector using the `topic` value is returned.
// When running under Elastic-Agent, dynamic topic selection is also supported
func buildTopicSelector(cfg *config.C) (outil.Selector, error) {
topicCfg := struct {
Topic string `config:"topic" yaml:"topic"`
}{}

if err := cfg.Unpack(&topicCfg); err != nil {
return outil.Selector{}, fmt.Errorf("cannot unpack Kafka config to read the topic: %w", err)
}

if management.UnderAgent() {
exprSelector := outil.ConstSelectorExpr(topicCfg.Topic, outil.SelectorKeepCase)
selector := outil.MakeSelector(exprSelector)
return selector, nil
if cfg == nil {
return outil.Selector{}, fmt.Errorf("Kafka config cannot be nil")
}

return outil.BuildSelectorFromConfig(cfg, outil.Settings{
Expand Down
2 changes: 1 addition & 1 deletion libbeat/outputs/kafka/kafka_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ func TestBuildTopicSelector(t *testing.T) {
{
name: "dynamic topic under agent",
topic: "%{[foo]}",
expected: "%{[foo]}",
expected: "bar",
underAgent: true,
},
{
Expand Down
Loading