Add Kafka output plugin topic_suffix option #3196

Merged
Changes from 2 commits
32 changes: 30 additions & 2 deletions plugins/outputs/kafka/README.md
@@ -8,6 +8,33 @@ This plugin writes to a [Kafka Broker](http://kafka.apache.org/07/quickstart.htm
brokers = ["localhost:9092"]
## Kafka topic for producer messages
topic = "telegraf"

## Optional topic suffix configuration.
## If the section is omitted, no suffix is used.
## The following topic suffix methods are supported:
## measurement - suffix equals the measurement's name
## tag - suffix equals the specified tag's value
## tags - suffix equals the specified tags' values,
## joined with key_separator

## Suffix equals the measurement name
# [outputs.kafka.topic_suffix]
# method = "measurement"

## Suffix equals the value of the measurement's "foo" tag.
## If there is no such tag, the suffix is an empty string
# [outputs.kafka.topic_suffix]
# method = "tag"
# key = "foo"

## Suffix equals the values of the measurement's "foo" and "bar"
## tags, separated by "_". If a tag is missing,
## its value is treated as an empty string.
# [outputs.kafka.topic_suffix]
# method = "tags"
# keys = ["foo", "bar"]
# key_separator = "_"

## Telegraf tag to use as a routing key
## ie, if this tag exists, its value will be used as the routing key
routing_tag = "host"
@@ -57,10 +84,9 @@ This plugin writes to a [Kafka Broker](http://kafka.apache.org/07/quickstart.htm
* `brokers`: List of strings, this is for speaking to a cluster of `kafka` brokers. On each flush interval, Telegraf will randomly choose one of the urls to write to. Each URL should just include host and port e.g. -> `["{host}:{port}","{host2}:{port2}"]`
* `topic`: The `kafka` topic to publish to.


### Optional parameters:

* `routing_tag`: if this tag exists, its value will be used as the routing key
* `routing_tag`: If this tag exists, its value will be used as the routing key
* `compression_codec`: What level of compression to use: `0` -> no compression, `1` -> gzip compression, `2` -> snappy compression
* `required_acks`: How many `acks` are required from the `kafka` broker cluster.
* `max_retry`: Max number of times to retry failed write
@@ -69,3 +95,5 @@ This plugin writes to a [Kafka Broker](http://kafka.apache.org/07/quickstart.htm
* `ssl_key`: SSL key
* `insecure_skip_verify`: Use SSL but skip chain & host verification (default: false)
* `data_format`: [About Telegraf data formats](https://github.com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_OUTPUT.md)
* `topic_suffix`: Which method, if any, to use for calculating the `kafka` topic suffix.
For examples, refer to the sample configuration.
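
To illustrate how the three methods combine with `topic`, here is a minimal Go sketch of the derivation. It is not the plugin's code: `topicName` is a hypothetical stand-alone helper, the measurement name and tag values are made up, and a plain map stands in for a Telegraf metric's tags. In this revision the suffix is appended directly to `topic`, with no separator between the two.

```go
package main

import (
	"fmt"
	"strings"
)

// TopicSuffix mirrors the fields of the [outputs.kafka.topic_suffix] section.
type TopicSuffix struct {
	Method       string
	Key          string
	Keys         []string
	KeySeparator string
}

// topicName is a hypothetical helper used only for illustration.
// It appends the suffix selected by s.Method to the base topic.
func topicName(topic string, s TopicSuffix, measurement string, tags map[string]string) string {
	switch s.Method {
	case "measurement":
		return topic + measurement
	case "tag":
		// A missing tag yields "", so the base topic is returned unchanged.
		return topic + tags[s.Key]
	case "tags":
		values := make([]string, 0, len(s.Keys))
		for _, key := range s.Keys {
			values = append(values, tags[key])
		}
		return topic + strings.Join(values, s.KeySeparator)
	default:
		// No topic_suffix section: the topic is used as configured.
		return topic
	}
}

func main() {
	tags := map[string]string{"foo": "a", "bar": "b"} // made-up tag values
	tagsSuffix := TopicSuffix{Method: "tags", Keys: []string{"foo", "bar"}, KeySeparator: "_"}

	fmt.Println(topicName("telegraf", TopicSuffix{Method: "measurement"}, "cpu", tags))
	fmt.Println(topicName("telegraf", TopicSuffix{Method: "tag", Key: "foo"}, "cpu", tags))
	fmt.Println(topicName("telegraf", tagsSuffix, "cpu", tags))
	fmt.Println(topicName("telegraf", TopicSuffix{}, "cpu", tags))
}
```

With these sample values the four calls print `telegrafcpu`, `telegrafa`, `telegrafa_b` and `telegraf`.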
173 changes: 131 additions & 42 deletions plugins/outputs/kafka/kafka.go
@@ -3,6 +3,7 @@ package kafka
import (
"crypto/tls"
"fmt"
"strings"

"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/internal"
@@ -12,54 +13,107 @@ import (
"github.com/Shopify/sarama"
)

type Kafka struct {
// Kafka brokers to send metrics to
Brokers []string
// Kafka topic
Topic string
// Routing Key Tag
RoutingTag string `toml:"routing_tag"`
// Compression Codec Tag
CompressionCodec int
// RequiredAcks Tag
RequiredAcks int
// MaxRetry Tag
MaxRetry int

// Legacy SSL config options
// TLS client certificate
Certificate string
// TLS client key
Key string
// TLS certificate authority
CA string

// Path to CA file
SSLCA string `toml:"ssl_ca"`
// Path to host cert file
SSLCert string `toml:"ssl_cert"`
// Path to cert key file
SSLKey string `toml:"ssl_key"`

// Skip SSL verification
InsecureSkipVerify bool

// SASL Username
SASLUsername string `toml:"sasl_username"`
// SASL Password
SASLPassword string `toml:"sasl_password"`

tlsConfig tls.Config
producer sarama.SyncProducer

serializer serializers.Serializer
const (
TOPIC_SUFFIX_METHOD_EMPTY uint8 = iota
TOPIC_SUFFIX_METHOD_MEASUREMENT
TOPIC_SUFFIX_METHOD_TAG
TOPIC_SUFFIX_METHOD_TAGS
)

var TopicSuffixMethodStringToUID = map[string]uint8{
"": TOPIC_SUFFIX_METHOD_EMPTY,
"measurement": TOPIC_SUFFIX_METHOD_MEASUREMENT,
"tag": TOPIC_SUFFIX_METHOD_TAG,
"tags": TOPIC_SUFFIX_METHOD_TAGS,
}

I would remove the map/enums, they don't really buy us anything from a type safety point of view. You could just have a list of valid strings for validating the config file.
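
One way to read this suggestion, sketched in Go: keep the method as a plain string and validate it against a list of accepted values. The names `validTopicSuffixMethods` and `validateTopicSuffixMethod` are hypothetical and do not appear in the PR; the empty string is accepted on the assumption that an omitted `topic_suffix` section should remain valid.

```go
package main

import "fmt"

// validTopicSuffixMethods is a hypothetical name. The empty string is
// included so that omitting the topic_suffix section stays valid.
var validTopicSuffixMethods = []string{"", "measurement", "tag", "tags"}

// validateTopicSuffixMethod returns an error for any method that is not
// in the list above. It is a sketch, not the plugin's implementation.
func validateTopicSuffixMethod(method string) error {
	for _, valid := range validTopicSuffixMethods {
		if method == valid {
			return nil
		}
	}
	return fmt.Errorf("unknown topic suffix method provided: %s", method)
}

func main() {
	fmt.Println(validateTopicSuffixMethod("tags")) // accepted
	fmt.Println(validateTopicSuffixMethod("tgas")) // rejected: typo in the config
}
```

With this shape, the topic-name switch would compare against the same strings directly, so a typo in a config file is still caught at connect time, while a typo in the code is left to the tests to catch.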

@trueneu trueneu Sep 6, 2017

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You're probably right about having a separator between topic and topic_suffix, I think that follows the principle of least astonishment, good catch.
Renamed key_separator to just separator also.
I'm not sure about maps/enums though, because they kinda make the code less error-prone, it's easier to make a typo in a string than in a constant name. We have testcases covering that though. (And I'm not familiar with commonly accepted practices in Go).
Pushed! Any other thoughts?


type (
Kafka struct {
// Kafka brokers to send metrics to
Brokers []string
// Kafka topic
Topic string
// Kafka topic suffix option
TopicSuffix TopicSuffix `toml:"topic_suffix"`
// Routing Key Tag
RoutingTag string `toml:"routing_tag"`
// Compression Codec Tag
CompressionCodec int
// RequiredAcks Tag
RequiredAcks int
// MaxRetry Tag
MaxRetry int

// Legacy SSL config options
// TLS client certificate
Certificate string
// TLS client key
Key string
// TLS certificate authority
CA string

// Path to CA file
SSLCA string `toml:"ssl_ca"`
// Path to host cert file
SSLCert string `toml:"ssl_cert"`
// Path to cert key file
SSLKey string `toml:"ssl_key"`

// Skip SSL verification
InsecureSkipVerify bool

// SASL Username
SASLUsername string `toml:"sasl_username"`
// SASL Password
SASLPassword string `toml:"sasl_password"`

tlsConfig tls.Config
producer sarama.SyncProducer

serializer serializers.Serializer

topicSuffixMethodUID uint8
}
TopicSuffix struct {
Method string `toml:"method"`
Key string `toml:"key"`
Keys []string `toml:"keys"`
KeySeparator string `toml:"key_separator"`
}
)

var sampleConfig = `
## URLs of kafka brokers
brokers = ["localhost:9092"]
## Kafka topic for producer messages
topic = "telegraf"

## Optional topic suffix configuration.
## If the section is omitted, no suffix is used.
## The following topic suffix methods are supported:
## measurement - suffix equals the measurement's name
## tag - suffix equals the specified tag's value
## tags - suffix equals the specified tags' values,
## joined with key_separator

## Suffix equals the measurement name
# [outputs.kafka.topic_suffix]
# method = "measurement"

## Suffix equals the value of the measurement's "foo" tag.
## If there is no such tag, the suffix is an empty string
# [outputs.kafka.topic_suffix]
# method = "tag"
# key = "foo"

## Suffix equals the values of the measurement's "foo" and "bar"
## tags, separated by "_". If a tag is missing,
## its value is treated as an empty string.
# [outputs.kafka.topic_suffix]
# method = "tags"
# keys = ["foo", "bar"]
# key_separator = "_"

## Telegraf tag to use as a routing key
## ie, if this tag exists, its value will be used as the routing key
routing_tag = "host"
@@ -108,11 +162,44 @@ var sampleConfig = `
data_format = "influx"
`

func GetTopicSuffixMethodUID(method string) (uint8, error) {
methodUID, ok := TopicSuffixMethodStringToUID[method]
if !ok {
return 0, fmt.Errorf("Unknown topic suffix method provided: %s", method)
}
return methodUID, nil
}

func (k *Kafka) GetTopicName(metric telegraf.Metric) string {
var topicName string
switch k.topicSuffixMethodUID {
case TOPIC_SUFFIX_METHOD_MEASUREMENT:
topicName = k.Topic + metric.Name()
case TOPIC_SUFFIX_METHOD_TAG:
topicName = k.Topic + metric.Tags()[k.TopicSuffix.Key]
case TOPIC_SUFFIX_METHOD_TAGS:
var tagValues []string
for _, tag := range k.TopicSuffix.Keys {
tagValues = append(tagValues, metric.Tags()[tag])
}
topicName = k.Topic + strings.Join(tagValues, k.TopicSuffix.KeySeparator)
default:
topicName = k.Topic
}
return topicName
}

func (k *Kafka) SetSerializer(serializer serializers.Serializer) {
k.serializer = serializer
}

func (k *Kafka) Connect() error {
topicSuffixMethod, err := GetTopicSuffixMethodUID(k.TopicSuffix.Method)
if err != nil {
return err
}
k.topicSuffixMethodUID = topicSuffixMethod

config := sarama.NewConfig()

config.Producer.RequiredAcks = sarama.RequiredAcks(k.RequiredAcks)
@@ -175,8 +262,10 @@ func (k *Kafka) Write(metrics []telegraf.Metric) error {
return err
}

topicName := k.GetTopicName(metric)

m := &sarama.ProducerMessage{
Topic: k.Topic,
Topic: topicName,
Value: sarama.ByteEncoder(buf),
}
if h, ok := metric.Tags()[k.RoutingTag]; ok {
52 changes: 52 additions & 0 deletions plugins/outputs/kafka/kafka_test.go
@@ -8,6 +8,11 @@ import (
"github.com/stretchr/testify/require"
)

type topicSuffixTestpair struct {
topicSuffix TopicSuffix
expectedTopic string
}

func TestConnectAndWrite(t *testing.T) {
if testing.Short() {
t.Skip("Skipping integration test in short mode")
@@ -28,4 +33,51 @@ func TestConnectAndWrite(t *testing.T) {
// Verify that we can successfully write data to the kafka broker
err = k.Write(testutil.MockMetrics())
require.NoError(t, err)
k.Close()
}

func TestTopicSuffixes(t *testing.T) {
if testing.Short() {
t.Skip("Skipping integration test in short mode")
}

brokers := []string{testutil.GetLocalHost() + ":9092"}
s, _ := serializers.NewInfluxSerializer()

topic := "Test_"

metric := testutil.TestMetric(1)
metricTagName := "tag1"
metricTagValue := metric.Tags()[metricTagName]
metricName := metric.Name()

var testcases = []topicSuffixTestpair{
{TopicSuffix{Method: "measurement"},
topic + metricName},
{TopicSuffix{Method: "tag", Key: metricTagName},
topic + metricTagValue},
{TopicSuffix{Method: "tags", Keys: []string{metricTagName, metricTagName, metricTagName}, KeySeparator: "___"},
topic + metricTagValue + "___" + metricTagValue + "___" + metricTagValue},
// This ensures backward compatibility
{TopicSuffix{},
topic},
}

for _, testcase := range testcases {
topicSuffix := testcase.topicSuffix
expectedTopic := testcase.expectedTopic
k := &Kafka{
Brokers: brokers,
Topic: topic,
serializer: s,
TopicSuffix: topicSuffix,
}

err := k.Connect()
require.NoError(t, err)

topic := k.GetTopicName(metric)
require.Equal(t, expectedTopic, topic)
k.Close()
}
}
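
Because each case above calls `Connect()`, the test is skipped in short mode. The joining behavior of the `tags` method can also be checked without a broker. The sketch below is an assumption-laden illustration, not part of the PR: `tagsSuffix` is a hypothetical helper that reproduces only the lookup-and-join step of the `tags` branch, and the tag map is made up. It adds the missing-tag cases that the sample configuration describes but the test table does not exercise.

```go
package main

import (
	"fmt"
	"strings"
)

// tagsSuffix is a hypothetical helper that reproduces only the "tags"
// branch: look up each key, then join the values with the separator.
func tagsSuffix(tags map[string]string, keys []string, sep string) string {
	values := make([]string, 0, len(keys))
	for _, key := range keys {
		values = append(values, tags[key]) // a missing key contributes ""
	}
	return strings.Join(values, sep)
}

func main() {
	tags := map[string]string{"tag1": "value1"} // made-up tag set

	cases := []struct {
		keys []string
		sep  string
		want string
	}{
		{[]string{"tag1", "tag1"}, "___", "value1___value1"},
		{[]string{"tag1", "missing"}, "_", "value1_"}, // trailing separator remains
		{[]string{"missing", "missing"}, "_", "_"},    // only the separator remains
		{nil, "_", ""},                                // no keys, empty suffix
	}

	for _, c := range cases {
		got := tagsSuffix(tags, c.keys, c.sep)
		fmt.Printf("keys=%v sep=%q got=%q ok=%v\n", c.keys, c.sep, got, got == c.want)
	}
}
```

The second and third cases show that a missing tag still leaves its separator in the topic name, which may matter when topics are created ahead of time.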