From 9e49963bf33c4afad73ab730c2fe9f4951b69afb Mon Sep 17 00:00:00 2001 From: Alexander Olekhnovich Date: Tue, 29 Sep 2020 16:02:41 +0200 Subject: [PATCH] kafka: add socks5 proxy support for kafka output plugin --- plugins/common/proxy/socks5.go | 25 +++++++++++++++++++++++++ plugins/outputs/kafka/kafka.go | 21 +++++++++++++++++++++ 2 files changed, 46 insertions(+) create mode 100644 plugins/common/proxy/socks5.go diff --git a/plugins/common/proxy/socks5.go b/plugins/common/proxy/socks5.go new file mode 100644 index 0000000000000..b0e299c293bcc --- /dev/null +++ b/plugins/common/proxy/socks5.go @@ -0,0 +1,25 @@ +package proxy + +import ( + "golang.org/x/net/proxy" +) + +type Socks5ProxyConfig struct { + address string `toml:"socks5_address"` + username string `toml:"socks5_username"` + password string `toml:"socks5_password"` +} + +func (c* Socks5ProxyConfig) GetDialer() (*proxy.Dialer, error) { + var auth *proxy.Auth + if c.username != "" || c.password != "" { + auth = new(proxy.Auth) + auth.User = c.username + auth.Password = c.password + } + dialer, err := proxy.SOCKS5("tcp", c.address, auth, proxy.Direct) + if err != nil { + return nil, err + } + return &dialer, nil +} diff --git a/plugins/outputs/kafka/kafka.go b/plugins/outputs/kafka/kafka.go index d7071f257babc..06ce629bb94ae 100644 --- a/plugins/outputs/kafka/kafka.go +++ b/plugins/outputs/kafka/kafka.go @@ -12,6 +12,7 @@ import ( "github.com/influxdata/telegraf" "github.com/influxdata/telegraf/plugins/common/kafka" tlsint "github.com/influxdata/telegraf/plugins/common/tls" + proxyint "github.com/influxdata/telegraf/plugins/common/proxy" "github.com/influxdata/telegraf/plugins/outputs" "github.com/influxdata/telegraf/plugins/serializers" ) @@ -39,6 +40,9 @@ type ( MaxRetry int `toml:"max_retry"` MaxMessageBytes int `toml:"max_message_bytes"` + EnableSocks5 *bool `toml:"enable_socks5"` + proxyint.Socks5ProxyConfig + Version string `toml:"version"` // Legacy TLS config options @@ -200,6 +204,12 @@ var sampleConfig = ` ## Use TLS but skip chain & host verification # insecure_skip_verify = false + ## Optional SOCKS5 proxy to use when connecting to brokers + # enable_socks5 = true + # socks5_address = "127.0.0.1:1080" + # socks5_username = "alice" + # socks5_password = "pass123" + ## Optional SASL Config # sasl_username = "kafka" # sasl_password = "secret" @@ -320,6 +330,17 @@ func (k *Kafka) Connect() error { } } + if k.EnableSocks5 != nil && *k.EnableSocks5 { + config.Net.Proxy.Enable = true + + dialer, err := k.Socks5ProxyConfig.GetDialer() + if err != nil { + log.Fatalf("Error while connecting to proxy server: %s", err) + return err + } + config.Net.Proxy.Dialer = *dialer + } + if k.SASLUsername != "" && k.SASLPassword != "" { config.Net.SASL.User = k.SASLUsername config.Net.SASL.Password = k.SASLPassword