Skip to content

Commit

Permalink
kafka: add socks5 proxy support for kafka output plugin
Browse files Browse the repository at this point in the history
  • Loading branch information
alexole committed Oct 14, 2020
1 parent 57cd20a commit 9e49963
Show file tree
Hide file tree
Showing 2 changed files with 46 additions and 0 deletions.
25 changes: 25 additions & 0 deletions plugins/common/proxy/socks5.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
package proxy

import (
"golang.org/x/net/proxy"
)

type Socks5ProxyConfig struct {
address string `toml:"socks5_address"`
username string `toml:"socks5_username"`
password string `toml:"socks5_password"`
}

func (c* Socks5ProxyConfig) GetDialer() (*proxy.Dialer, error) {
var auth *proxy.Auth
if c.username != "" || c.password != "" {
auth = new(proxy.Auth)
auth.User = c.username
auth.Password = c.password
}
dialer, err := proxy.SOCKS5("tcp", c.address, auth, proxy.Direct)
if err != nil {
return nil, err
}
return &dialer, nil
}
21 changes: 21 additions & 0 deletions plugins/outputs/kafka/kafka.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/plugins/common/kafka"
tlsint "github.com/influxdata/telegraf/plugins/common/tls"
proxyint "github.com/influxdata/telegraf/plugins/common/proxy"
"github.com/influxdata/telegraf/plugins/outputs"
"github.com/influxdata/telegraf/plugins/serializers"
)
Expand Down Expand Up @@ -39,6 +40,9 @@ type (
MaxRetry int `toml:"max_retry"`
MaxMessageBytes int `toml:"max_message_bytes"`

EnableSocks5 *bool `toml:"enable_socks5"`
proxyint.Socks5ProxyConfig

Version string `toml:"version"`

// Legacy TLS config options
Expand Down Expand Up @@ -200,6 +204,12 @@ var sampleConfig = `
## Use TLS but skip chain & host verification
# insecure_skip_verify = false
## Optional SOCKS5 proxy to use when connecting to brokers
# enable_socks5 = true
# socks5_address = "127.0.0.1:1080"
# socks5_username = "alice"
# socks5_password = "pass123"
## Optional SASL Config
# sasl_username = "kafka"
# sasl_password = "secret"
Expand Down Expand Up @@ -320,6 +330,17 @@ func (k *Kafka) Connect() error {
}
}

if k.EnableSocks5 != nil && *k.EnableSocks5 {
config.Net.Proxy.Enable = true

dialer, err := k.Socks5ProxyConfig.GetDialer()
if err != nil {
log.Fatalf("Error while connecting to proxy server: %s", err)
return err
}
config.Net.Proxy.Dialer = *dialer
}

if k.SASLUsername != "" && k.SASLPassword != "" {
config.Net.SASL.User = k.SASLUsername
config.Net.SASL.Password = k.SASLPassword
Expand Down

0 comments on commit 9e49963

Please sign in to comment.