Skip to content

Commit

Permalink
Adding Kafka Origin Topic tag.
Browse files Browse the repository at this point in the history
The Kafka consumer input plugin has already the ability to
read from multiple topics, however it does not retain the
topic which the data was read from. That data is very useful
to introduce some smart routing down the line.

The change adds the topic as a tag. The tag name is configurable.

Added some basic tests, and tested on multiple kafka clusters.
  • Loading branch information
Sebastien Le Digabel committed Nov 28, 2018
1 parent 581772a commit eae4c95
Show file tree
Hide file tree
Showing 3 changed files with 57 additions and 1 deletion.
3 changes: 3 additions & 0 deletions plugins/inputs/kafka_consumer/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,9 @@ and use the old zookeeper connection method.
brokers = ["localhost:9092"]
## topic(s) to consume
topics = ["telegraf"]
## Add topic as tag
# add_topic_tag = false
# topic_tag_name = "topic"

## Optional Client id
# client_id = "Telegraf"
Expand Down
10 changes: 9 additions & 1 deletion plugins/inputs/kafka_consumer/kafka_consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,8 @@ type Kafka struct {
Offset string `toml:"offset"`
SASLUsername string `toml:"sasl_username"`
SASLPassword string `toml:"sasl_password"`
TopicTagName string `toml:"topic_tag_name"`

tls.ClientConfig

cluster Consumer
Expand All @@ -60,6 +62,8 @@ var sampleConfig = `
brokers = ["localhost:9092"]
## topic(s) to consume
topics = ["telegraf"]
## Add topic as tag if topic_tag_name is not empty
# topic_tag_name = ""
## Optional Client id
# client_id = "Telegraf"
Expand Down Expand Up @@ -256,7 +260,11 @@ func (k *Kafka) onMessage(acc telegraf.TrackingAccumulator, msg *sarama.Consumer
if err != nil {
return err
}

if len(k.TopicTagName) > 0 {
for _, metric := range metrics {
metric.AddTag(k.TopicTagName, msg.Topic)
}
}
id := acc.AddTrackingMetricGroup(metrics)
k.messages[id] = msg

Expand Down
45 changes: 45 additions & 0 deletions plugins/inputs/kafka_consumer/kafka_consumer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,25 @@ func newTestKafka() (*Kafka, *TestConsumer) {
return &k, consumer
}

func newTestKafkaWithTopicTag() (*Kafka, *TestConsumer) {
consumer := &TestConsumer{
errors: make(chan error),
messages: make(chan *sarama.ConsumerMessage, 1000),
}
k := Kafka{
cluster: consumer,
ConsumerGroup: "test",
Topics: []string{"telegraf"},
Brokers: []string{"localhost:9092"},
Offset: "oldest",
MaxUndeliveredMessages: defaultMaxUndeliveredMessages,
doNotCommitMsgs: true,
messages: make(map[telegraf.TrackingID]*sarama.ConsumerMessage),
TopicTagName: "topic",
}
return &k, consumer
}

// Test that the parser parses kafka messages into points
func TestRunParser(t *testing.T) {
k, consumer := newTestKafka()
Expand All @@ -75,6 +94,22 @@ func TestRunParser(t *testing.T) {
assert.Equal(t, acc.NFields(), 1)
}

// Test that the parser parses kafka messages into points
// and adds the topic tag
func TestRunParserWithTopic(t *testing.T) {
k, consumer := newTestKafkaWithTopicTag()
acc := testutil.Accumulator{}
ctx := context.Background()

k.parser, _ = parsers.NewInfluxParser()
go k.receiver(ctx, &acc)
consumer.Inject(saramaMsgWithTopic(testMsg, "test_topic"))
acc.Wait(1)

assert.Equal(t, acc.NFields(), 1)
assert.True(t, acc.HasTag("cpu_load_short", "topic"))
}

// Test that the parser ignores invalid messages
func TestRunParserInvalidMsg(t *testing.T) {
k, consumer := newTestKafka()
Expand Down Expand Up @@ -173,3 +208,13 @@ func saramaMsg(val string) *sarama.ConsumerMessage {
Partition: 0,
}
}

func saramaMsgWithTopic(val string, topic string) *sarama.ConsumerMessage {
return &sarama.ConsumerMessage{
Key: nil,
Value: []byte(val),
Offset: 0,
Partition: 0,
Topic: topic,
}
}

0 comments on commit eae4c95

Please sign in to comment.