Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add Kafka output plugin topic_suffix option #3196

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
33 changes: 31 additions & 2 deletions plugins/outputs/kafka/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,34 @@ This plugin writes to a [Kafka Broker](http://kafka.apache.org/07/quickstart.htm
brokers = ["localhost:9092"]
## Kafka topic for producer messages
topic = "telegraf"

## Optional topic suffix configuration.
## If the section is omitted, no suffix is used.
## Following topic suffix methods are supported:
## measurement - suffix equals to separator + measurement's name
## tags - suffix equals to separator + specified tags' values
## interleaved with separator

## Suffix equals to "_" + measurement's name
# [outputs.kafka.topic_suffix]
# method = "measurement"
# separator = "_"

## Suffix equals to "__" + measurement's "foo" tag value.
## If there's no such a tag, suffix equals to an empty string
# [outputs.kafka.topic_suffix]
# method = "tags"
# keys = ["foo"]
# separator = "__"

## Suffix equals to "_" + measurement's "foo" and "bar"
## tag values, separated by "_". If there is no such tags,
## their values treated as empty strings.
# [outputs.kafka.topic_suffix]
# method = "tags"
# keys = ["foo", "bar"]
# separator = "_"

## Telegraf tag to use as a routing key
## ie, if this tag exists, its value will be used as the routing key
routing_tag = "host"
Expand Down Expand Up @@ -57,10 +85,9 @@ This plugin writes to a [Kafka Broker](http://kafka.apache.org/07/quickstart.htm
* `brokers`: List of strings, this is for speaking to a cluster of `kafka` brokers. On each flush interval, Telegraf will randomly choose one of the urls to write to. Each URL should just include host and port e.g. -> `["{host}:{port}","{host2}:{port2}"]`
* `topic`: The `kafka` topic to publish to.


### Optional parameters:

* `routing_tag`: if this tag exists, its value will be used as the routing key
* `routing_tag`: If this tag exists, its value will be used as the routing key
* `compression_codec`: What level of compression to use: `0` -> no compression, `1` -> gzip compression, `2` -> snappy compression
* `required_acks`: a setting for how may `acks` required from the `kafka` broker cluster.
* `max_retry`: Max number of times to retry failed write
Expand All @@ -69,3 +96,5 @@ This plugin writes to a [Kafka Broker](http://kafka.apache.org/07/quickstart.htm
* `ssl_key`: SSL key
* `insecure_skip_verify`: Use SSL but skip chain & host verification (default: false)
* `data_format`: [About Telegraf data formats](https://github.com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_OUTPUT.md)
* `topic_suffix`: Which, if any, method of calculating `kafka` topic suffix to use.
For examples, please refer to sample configuration.
164 changes: 122 additions & 42 deletions plugins/outputs/kafka/kafka.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package kafka
import (
"crypto/tls"
"fmt"
"strings"

"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/internal"
Expand All @@ -12,54 +13,97 @@ import (
"github.com/Shopify/sarama"
)

type Kafka struct {
// Kafka brokers to send metrics to
Brokers []string
// Kafka topic
Topic string
// Routing Key Tag
RoutingTag string `toml:"routing_tag"`
// Compression Codec Tag
CompressionCodec int
// RequiredAcks Tag
RequiredAcks int
// MaxRetry Tag
MaxRetry int

// Legacy SSL config options
// TLS client certificate
Certificate string
// TLS client key
Key string
// TLS certificate authority
CA string

// Path to CA file
SSLCA string `toml:"ssl_ca"`
// Path to host cert file
SSLCert string `toml:"ssl_cert"`
// Path to cert key file
SSLKey string `toml:"ssl_key"`

// Skip SSL verification
InsecureSkipVerify bool

// SASL Username
SASLUsername string `toml:"sasl_username"`
// SASL Password
SASLPassword string `toml:"sasl_password"`

tlsConfig tls.Config
producer sarama.SyncProducer

serializer serializers.Serializer
var ValidTopicSuffixMethods = []string{
"",
"measurement",
"tags",
}

type (
Kafka struct {
// Kafka brokers to send metrics to
Brokers []string
// Kafka topic
Topic string
// Kafka topic suffix option
TopicSuffix TopicSuffix `toml:"topic_suffix"`
// Routing Key Tag
RoutingTag string `toml:"routing_tag"`
// Compression Codec Tag
CompressionCodec int
// RequiredAcks Tag
RequiredAcks int
// MaxRetry Tag
MaxRetry int

// Legacy SSL config options
// TLS client certificate
Certificate string
// TLS client key
Key string
// TLS certificate authority
CA string

// Path to CA file
SSLCA string `toml:"ssl_ca"`
// Path to host cert file
SSLCert string `toml:"ssl_cert"`
// Path to cert key file
SSLKey string `toml:"ssl_key"`

// Skip SSL verification
InsecureSkipVerify bool

// SASL Username
SASLUsername string `toml:"sasl_username"`
// SASL Password
SASLPassword string `toml:"sasl_password"`

tlsConfig tls.Config
producer sarama.SyncProducer

serializer serializers.Serializer
}
TopicSuffix struct {
Method string `toml:"method"`
Keys []string `toml:"keys"`
Separator string `toml:"separator"`
}
)

var sampleConfig = `
## URLs of kafka brokers
brokers = ["localhost:9092"]
## Kafka topic for producer messages
topic = "telegraf"

## Optional topic suffix configuration.
## If the section is omitted, no suffix is used.
## Following topic suffix methods are supported:
## measurement - suffix equals to separator + measurement's name
## tags - suffix equals to separator + specified tags' values
## interleaved with separator

## Suffix equals to "_" + measurement name
# [outputs.kafka.topic_suffix]
# method = "measurement"
# separator = "_"

## Suffix equals to "__" + measurement's "foo" tag value.
## If there's no such a tag, suffix equals to an empty string
# [outputs.kafka.topic_suffix]
# method = "tags"
# keys = ["foo"]
# separator = "__"

## Suffix equals to "_" + measurement's "foo" and "bar"
## tag values, separated by "_". If there is no such tags,
## their values treated as empty strings.
# [outputs.kafka.topic_suffix]
# method = "tags"
# keys = ["foo", "bar"]
# separator = "_"

## Telegraf tag to use as a routing key
## ie, if this tag exists, its value will be used as the routing key
routing_tag = "host"
Expand Down Expand Up @@ -108,11 +152,45 @@ var sampleConfig = `
data_format = "influx"
`

func ValidateTopicSuffixMethod(method string) error {
for _, validMethod := range ValidTopicSuffixMethods {
if method == validMethod {
return nil
}
}
return fmt.Errorf("Unkown topic suffix method provided: %s", method)
}

func (k *Kafka) GetTopicName(metric telegraf.Metric) string {
var topicName string
switch k.TopicSuffix.Method {
case "measurement":
topicName = k.Topic + k.TopicSuffix.Separator + metric.Name()
case "tags":
var topicNameComponents []string
topicNameComponents = append(topicNameComponents, k.Topic)
for _, tag := range k.TopicSuffix.Keys {
tagValue := metric.Tags()[tag]
if tagValue != "" {
topicNameComponents = append(topicNameComponents, tagValue)
}
}
topicName = strings.Join(topicNameComponents, k.TopicSuffix.Separator)
default:
topicName = k.Topic
}
return topicName
}

func (k *Kafka) SetSerializer(serializer serializers.Serializer) {
k.serializer = serializer
}

func (k *Kafka) Connect() error {
err := ValidateTopicSuffixMethod(k.TopicSuffix.Method)
if err != nil {
return err
}
config := sarama.NewConfig()

config.Producer.RequiredAcks = sarama.RequiredAcks(k.RequiredAcks)
Expand Down Expand Up @@ -175,8 +253,10 @@ func (k *Kafka) Write(metrics []telegraf.Metric) error {
return err
}

topicName := k.GetTopicName(metric)

m := &sarama.ProducerMessage{
Topic: k.Topic,
Topic: topicName,
Value: sarama.ByteEncoder(buf),
}
if h, ok := metric.Tags()[k.RoutingTag]; ok {
Expand Down
67 changes: 67 additions & 0 deletions plugins/outputs/kafka/kafka_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,11 @@ import (
"github.com/stretchr/testify/require"
)

type topicSuffixTestpair struct {
topicSuffix TopicSuffix
expectedTopic string
}

func TestConnectAndWrite(t *testing.T) {
if testing.Short() {
t.Skip("Skipping integration test in short mode")
Expand All @@ -28,4 +33,66 @@ func TestConnectAndWrite(t *testing.T) {
// Verify that we can successfully write data to the kafka broker
err = k.Write(testutil.MockMetrics())
require.NoError(t, err)
k.Close()
}

func TestTopicSuffixes(t *testing.T) {
if testing.Short() {
t.Skip("Skipping integration test in short mode")
}

topic := "Test"

metric := testutil.TestMetric(1)
metricTagName := "tag1"
metricTagValue := metric.Tags()[metricTagName]
metricName := metric.Name()

var testcases = []topicSuffixTestpair{
// This ensures empty separator is okay
{TopicSuffix{Method: "measurement"},
topic + metricName},
{TopicSuffix{Method: "measurement", Separator: "sep"},
topic + "sep" + metricName},
{TopicSuffix{Method: "tags", Keys: []string{metricTagName}, Separator: "_"},
topic + "_" + metricTagValue},
{TopicSuffix{Method: "tags", Keys: []string{metricTagName, metricTagName, metricTagName}, Separator: "___"},
topic + "___" + metricTagValue + "___" + metricTagValue + "___" + metricTagValue},
{TopicSuffix{Method: "tags", Keys: []string{metricTagName, metricTagName, metricTagName}},
topic + metricTagValue + metricTagValue + metricTagValue},
// This ensures non-existing tags are ignored
{TopicSuffix{Method: "tags", Keys: []string{"non_existing_tag", "non_existing_tag"}, Separator: "___"},
topic},
{TopicSuffix{Method: "tags", Keys: []string{metricTagName, "non_existing_tag"}, Separator: "___"},
topic + "___" + metricTagValue},
// This ensures backward compatibility
{TopicSuffix{},
topic},
}

for _, testcase := range testcases {
topicSuffix := testcase.topicSuffix
expectedTopic := testcase.expectedTopic
k := &Kafka{
Topic: topic,
TopicSuffix: topicSuffix,
}

topic := k.GetTopicName(metric)
require.Equal(t, expectedTopic, topic)
}
}

func TestValidateTopicSuffixMethod(t *testing.T) {
if testing.Short() {
t.Skip("Skipping integration test in short mode")
}

err := ValidateTopicSuffixMethod("invalid_topic_suffix_method")
require.Error(t, err, "Topic suffix method used should be invalid.")

for _, method := range ValidTopicSuffixMethods {
err := ValidateTopicSuffixMethod(method)
require.NoError(t, err, "Topic suffix method used should be valid.")
}
}