Skip to content

Commit

Permalink
kafka: add socks5 proxy support for kafka output plugin
Browse files Browse the repository at this point in the history
  • Loading branch information
alexole authored and mhoffm-aiven committed Feb 7, 2022
1 parent 8433a46 commit 333990c
Show file tree
Hide file tree
Showing 5 changed files with 112 additions and 0 deletions.
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -186,6 +186,7 @@ require (
github.com/apache/arrow/go/arrow v0.0.0-20211006091945-a69884db78f4 // indirect
github.com/aristanetworks/glog v0.0.0-20191112221043-67e8567f59f3 // indirect
github.com/armon/go-metrics v0.3.3 // indirect
github.com/armon/go-socks5 v0.0.0-20160902184237-e75332964ef5 // indirect
github.com/aws/aws-sdk-go-v2/feature/dynamodb/attributevalue v1.2.0 // indirect
github.com/aws/aws-sdk-go-v2/feature/s3/manager v1.5.3 // indirect
github.com/aws/aws-sdk-go-v2/internal/configsources v1.1.3 // indirect
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -287,6 +287,8 @@ github.com/armon/go-metrics v0.3.3 h1:a9F4rlj7EWWrbj7BYw8J8+x+ZZkJeqzNyRk8hdPF+r
github.com/armon/go-metrics v0.3.3/go.mod h1:4O98XIr/9W0sxpJ8UaYkvjk10Iff7SnFrb4QAOwNTFc=
github.com/armon/go-radix v0.0.0-20180808171621-7fddfc383310/go.mod h1:ufUuZ+zHj4x4TnLV4JWEpy2hxWSpsRywHrMgIH9cCH8=
github.com/armon/go-radix v1.0.0/go.mod h1:ufUuZ+zHj4x4TnLV4JWEpy2hxWSpsRywHrMgIH9cCH8=
github.com/armon/go-socks5 v0.0.0-20160902184237-e75332964ef5 h1:0CwZNZbxp69SHPdPJAN/hZIm0C4OItdklCFmMRWYpio=
github.com/armon/go-socks5 v0.0.0-20160902184237-e75332964ef5/go.mod h1:wHh0iHkYZB8zMSxRWpUBQtwG5a7fFgvEO+odwuTv2gs=
github.com/aryann/difflib v0.0.0-20170710044230-e206f873d14a/go.mod h1:DAHtR1m6lCRdSC2Tm3DSWRPvIPr6xNKyeHdqDQSQT+A=
github.com/asaskevich/govalidator v0.0.0-20180720115003-f9ffefc3facf/go.mod h1:lB+ZfQJz7igIIfQNfa7Ml4HSf2uFQQRzpGGRXenZAgY=
github.com/asaskevich/govalidator v0.0.0-20190424111038-f61b66f89f4a/go.mod h1:lB+ZfQJz7igIIfQNfa7Ml4HSf2uFQQRzpGGRXenZAgY=
Expand Down
22 changes: 22 additions & 0 deletions plugins/common/proxy/socks5.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
package proxy

import (
"golang.org/x/net/proxy"
)

type Socks5ProxyConfig struct {
Socks5ProxyEnable *bool `toml:"socks5_enable"`
Socks5ProxyAddress string `toml:"socks5_address"`
Socks5ProxyUsername string `toml:"socks5_username"`
Socks5ProxyPassword string `toml:"socks5_password"`
}

func (c *Socks5ProxyConfig) GetDialer() (proxy.Dialer, error) {
var auth *proxy.Auth
if c.Socks5ProxyPassword != "" || c.Socks5ProxyUsername != "" {
auth = new(proxy.Auth)
auth.User = c.Socks5ProxyUsername
auth.Password = c.Socks5ProxyPassword
}
return proxy.SOCKS5("tcp", c.Socks5ProxyAddress, auth, proxy.Direct)
}
68 changes: 68 additions & 0 deletions plugins/common/proxy/socks5_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
package proxy

import (
"net"
"testing"
"time"

"github.com/armon/go-socks5"
"github.com/stretchr/testify/require"
)

func TestSocks5ProxyConfig(t *testing.T) {
const (
proxyAddress = "0.0.0.0:12345"
proxyUsername = "user"
proxyPassword = "password"
)

l, err := net.Listen("tcp", "0.0.0.0:0")
require.NoError(t, err)

server, err := socks5.New(&socks5.Config{
AuthMethods: []socks5.Authenticator{socks5.UserPassAuthenticator{
Credentials: socks5.StaticCredentials{
proxyUsername: proxyPassword,
},
}},
})
require.NoError(t, err)

go func() { require.NoError(t, server.ListenAndServe("tcp", proxyAddress)) }()

enabled := true
conf := Socks5ProxyConfig{
Socks5ProxyEnable: &enabled,
Socks5ProxyAddress: proxyAddress,
Socks5ProxyUsername: proxyUsername,
Socks5ProxyPassword: proxyPassword,
}
dialer, err := conf.GetDialer()
require.NoError(t, err)

var proxyConn net.Conn
for i := 0; i < 10; i++ {
proxyConn, err = dialer.Dial("tcp", l.Addr().String())
if err != nil {
time.Sleep(10 * time.Millisecond)
continue
}
break
}
require.NotNil(t, proxyConn)
defer func() { proxyConn.Close() }()

serverConn, err := l.Accept()
require.NoError(t, err)
defer func() { serverConn.Close() }()

writePayload := []byte("test")
_, err = proxyConn.Write(writePayload)
require.NoError(t, err)

receivePayload := make([]byte, 4)
_, err = serverConn.Read(receivePayload)
require.NoError(t, err)

require.Equal(t, writePayload, receivePayload)
}
19 changes: 19 additions & 0 deletions plugins/outputs/kafka/kafka.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (

"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/plugins/common/kafka"
"github.com/influxdata/telegraf/plugins/common/proxy"
"github.com/influxdata/telegraf/plugins/outputs"
"github.com/influxdata/telegraf/plugins/serializers"
)
Expand All @@ -32,6 +33,8 @@ type Kafka struct {
RoutingTag string `toml:"routing_tag"`
RoutingKey string `toml:"routing_key"`

proxy.Socks5ProxyConfig

// Legacy TLS config options
// TLS client certificate
Certificate string
Expand Down Expand Up @@ -189,6 +192,12 @@ var sampleConfig = `
## Use TLS but skip chain & host verification
# insecure_skip_verify = false
## Optional SOCKS5 proxy to use when connecting to brokers
# socks5_enable = true
# socks5_address = "127.0.0.1:1080"
# socks5_username = "alice"
# socks5_password = "pass123"
## Optional SASL Config
# sasl_username = "kafka"
# sasl_password = "secret"
Expand Down Expand Up @@ -292,6 +301,16 @@ func (k *Kafka) Init() error {
k.TLSKey = k.Key
}

if k.Socks5ProxyEnable != nil && *k.Socks5ProxyEnable {
config.Net.Proxy.Enable = true

dialer, err := k.Socks5ProxyConfig.GetDialer()
if err != nil {
return fmt.Errorf("Error while connecting to proxy server: %s", err)
}
config.Net.Proxy.Dialer = dialer
}

return nil
}

Expand Down

0 comments on commit 333990c

Please sign in to comment.