Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

kafka: Add support for using TLS authentication for the kafka output #541

Closed
wants to merge 1 commit into from
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
65 changes: 62 additions & 3 deletions plugins/outputs/kafka/kafka.go
Original file line number Diff line number Diff line change
@@ -1,12 +1,14 @@
package kafka

import (
"crypto/tls"
"crypto/x509"
"errors"
"fmt"

"github.com/Shopify/sarama"
"github.com/influxdb/influxdb/client/v2"
"github.com/influxdb/telegraf/plugins/outputs"
"io/ioutil"
)

type Kafka struct {
Expand All @@ -16,8 +18,17 @@ type Kafka struct {
Topic string
// Routing Key Tag
RoutingTag string `toml:"routing_tag"`
// TLS client certificate
Certificate string
// TLS client key
Key string
// TLS certificate authority
CA string
// Verfiy SSL certificate chain
VerifySsl bool

producer sarama.SyncProducer
tlsConfig tls.Config
producer sarama.SyncProducer
}

var sampleConfig = `
Expand All @@ -28,10 +39,58 @@ var sampleConfig = `
# Telegraf tag to use as a routing key
# ie, if this tag exists, it's value will be used as the routing key
routing_tag = "host"
# Client certificate
certificate = ""
# Client key
key = ""
# Certificate authority file
ca = ""
# Verify SSL certificate chain
verify_ssl = false
`

func createTlsConfiguration(k *Kafka) (t *tls.Config, err error) {
if k.Certificate != "" && k.Key != "" && k.CA != "" {
cert, err := tls.LoadX509KeyPair(k.Certificate, k.Key)
if err != nil {
return nil, errors.New(fmt.Sprintf("Cout not load Kafka TLS client key/certificate: %s",
err))
}

caCert, err := ioutil.ReadFile(k.CA)
if err != nil {
return nil, errors.New(fmt.Sprintf("Cout not load Kafka TLS CA: %s",
err))
}

caCertPool := x509.NewCertPool()
caCertPool.AppendCertsFromPEM(caCert)

t = &tls.Config{
Certificates: []tls.Certificate{cert},
RootCAs: caCertPool,
InsecureSkipVerify: k.VerifySsl,
}
}
// will be nil by default if nothing is provided
return t, nil
}

func (k *Kafka) Connect() error {
producer, err := sarama.NewSyncProducer(k.Brokers, nil)
config := sarama.NewConfig()
config.Producer.RequiredAcks = sarama.WaitForAll // Wait for all in-sync replicas to ack the message
config.Producer.Retry.Max = 10 // Retry up to 10 times to produce the message
tlsConfig, err := createTlsConfiguration(k)
if err != nil {
return err
}

if tlsConfig != nil {
config.Net.TLS.Config = tlsConfig
config.Net.TLS.Enable = true
}

producer, err := sarama.NewSyncProducer(k.Brokers, config)
if err != nil {
return err
}
Expand Down