diff --git a/plugins/inputs/kafka_consumer/kafka_consumer.go b/plugins/inputs/kafka_consumer/kafka_consumer.go index bc0d225c63c63..07c87199f3d28 100644 --- a/plugins/inputs/kafka_consumer/kafka_consumer.go +++ b/plugins/inputs/kafka_consumer/kafka_consumer.go @@ -14,10 +14,11 @@ import ( ) type Kafka struct { - ConsumerGroup string - Topics []string - ZookeeperPeers []string - Consumer *consumergroup.ConsumerGroup + ConsumerGroup string + Topics []string + ZookeeperPeers []string + ZookeeperChroot string + Consumer *consumergroup.ConsumerGroup // Legacy metric buffer support MetricBuffer int @@ -48,6 +49,8 @@ var sampleConfig = ` topics = ["telegraf"] ## an array of Zookeeper connection strings zookeeper_peers = ["localhost:2181"] + ## Zookeeper Chroot + zookeeper_chroot = "/" ## the name of the consumer group consumer_group = "telegraf_metrics_consumers" ## Offset (must be either "oldest" or "newest") @@ -80,6 +83,7 @@ func (k *Kafka) Start(acc telegraf.Accumulator) error { k.acc = acc config := consumergroup.NewConfig() + config.Zookeeper.Chroot = k.ZookeeperChroot switch strings.ToLower(k.Offset) { case "oldest", "": config.Offsets.Initial = sarama.OffsetOldest