Skip to content

Commit

Permalink
kafka: Add support for using TLS authentication for the kafka output
Browse files Browse the repository at this point in the history
With the advent of Kafka 0.9.0+ it is possible to set up TLS client
certificate based authentication to limit access to Kafka.

Four new configuration variables are specified for setting up the
authentication. If they're not set the behavior stays the same as
before the change.
  • Loading branch information
Ormod committed Jan 17, 2016
1 parent a712036 commit 0562ec1
Showing 1 changed file with 62 additions and 3 deletions.
65 changes: 62 additions & 3 deletions plugins/outputs/kafka/kafka.go
Original file line number Diff line number Diff line change
@@ -1,12 +1,14 @@
package kafka

import (
"crypto/tls"
"crypto/x509"
"errors"
"fmt"

"github.com/Shopify/sarama"
"github.com/influxdb/influxdb/client/v2"
"github.com/influxdb/telegraf/plugins/outputs"
"io/ioutil"
)

type Kafka struct {
Expand All @@ -16,8 +18,17 @@ type Kafka struct {
Topic string
// Routing Key Tag
RoutingTag string `toml:"routing_tag"`
// TLS client certificate
Certificate string
// TLS client key
Key string
// TLS certificate authority
CA string
// Verfiy SSL certificate chain
VerifySsl bool

producer sarama.SyncProducer
tlsConfig tls.Config
producer sarama.SyncProducer
}

var sampleConfig = `
Expand All @@ -28,10 +39,58 @@ var sampleConfig = `
# Telegraf tag to use as a routing key
# ie, if this tag exists, it's value will be used as the routing key
routing_tag = "host"
# Client certificate
certificate = ""
# Client key
key = ""
# Certificate authority file
ca = ""
# Verify SSL certificate chain
verify_ssl = false
`

func createTlsConfiguration(k *Kafka) (t *tls.Config, err error) {
if k.Certificate != "" && k.Key != "" && k.CA != "" {
cert, err := tls.LoadX509KeyPair(k.Certificate, k.Key)
if err != nil {
return nil, errors.New(fmt.Sprintf("Cout not load Kafka TLS client key/certificate: %s",
err))
}

caCert, err := ioutil.ReadFile(k.CA)
if err != nil {
return nil, errors.New(fmt.Sprintf("Cout not load Kafka TLS CA: %s",
err))
}

caCertPool := x509.NewCertPool()
caCertPool.AppendCertsFromPEM(caCert)

t = &tls.Config{
Certificates: []tls.Certificate{cert},
RootCAs: caCertPool,
InsecureSkipVerify: k.VerifySsl,
}
}
// will be nil by default if nothing is provided
return t, nil
}

func (k *Kafka) Connect() error {
producer, err := sarama.NewSyncProducer(k.Brokers, nil)
config := sarama.NewConfig()
config.Producer.RequiredAcks = sarama.WaitForAll // Wait for all in-sync replicas to ack the message
config.Producer.Retry.Max = 10 // Retry up to 10 times to produce the message
tlsConfig, err := createTlsConfiguration(k)
if err != nil {
return err
}

if tlsConfig != nil {
config.Net.TLS.Config = tlsConfig
config.Net.TLS.Enable = true
}

producer, err := sarama.NewSyncProducer(k.Brokers, config)
if err != nil {
return err
}
Expand Down

0 comments on commit 0562ec1

Please sign in to comment.