Skip to content

Commit

Permalink
Add support for TLS configuration in NSQ input (influxdata#3903)
Browse files Browse the repository at this point in the history
  • Loading branch information
Soulou authored and otherpirate committed Mar 15, 2019
1 parent fd9b8db commit bf5a302
Show file tree
Hide file tree
Showing 2 changed files with 43 additions and 16 deletions.
49 changes: 39 additions & 10 deletions plugins/inputs/nsq/nsq.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,17 +33,27 @@ import (
"time"

"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/internal/tls"
"github.com/influxdata/telegraf/plugins/inputs"
)

// Might add Lookupd endpoints for cluster discovery
type NSQ struct {
Endpoints []string
tls.ClientConfig
httpClient *http.Client
}

var sampleConfig = `
## An array of NSQD HTTP API endpoints
endpoints = ["http://localhost:4151"]
endpoints = ["http://localhost:4151"]
## Or using HTTPS endpoint
endpoints = ["https://localhost:4152"]
tls_cert = "/path/to/client-cert.pem"
tls_key = "/path/to/client-key.pem"
tls_ca = "/path/to/ca.pem"
insecure_skip_verify = false
`

const (
Expand All @@ -52,10 +62,14 @@ const (

func init() {
inputs.Add("nsq", func() telegraf.Input {
return &NSQ{}
return New()
})
}

func New() *NSQ {
return &NSQ{}
}

func (n *NSQ) SampleConfig() string {
return sampleConfig
}
Expand All @@ -65,6 +79,15 @@ func (n *NSQ) Description() string {
}

func (n *NSQ) Gather(acc telegraf.Accumulator) error {
var err error

if n.httpClient == nil {
n.httpClient, err = n.getHttpClient()
if err != nil {
return err
}
}

var wg sync.WaitGroup
for _, e := range n.Endpoints {
wg.Add(1)
Expand All @@ -78,21 +101,27 @@ func (n *NSQ) Gather(acc telegraf.Accumulator) error {
return nil
}

var tr = &http.Transport{
ResponseHeaderTimeout: time.Duration(3 * time.Second),
}

var client = &http.Client{
Transport: tr,
Timeout: time.Duration(4 * time.Second),
func (n *NSQ) getHttpClient() (*http.Client, error) {
tlsConfig, err := n.ClientConfig.TLSConfig()
if err != nil {
return nil, err
}
tr := &http.Transport{
TLSClientConfig: tlsConfig,
}
httpClient := &http.Client{
Transport: tr,
Timeout: time.Duration(4 * time.Second),
}
return httpClient, nil
}

func (n *NSQ) gatherEndpoint(e string, acc telegraf.Accumulator) error {
u, err := buildURL(e)
if err != nil {
return err
}
r, err := client.Get(u.String())
r, err := n.httpClient.Get(u.String())
if err != nil {
return fmt.Errorf("Error while polling %s: %s", u.String(), err)
}
Expand Down
10 changes: 4 additions & 6 deletions plugins/inputs/nsq/nsq_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,8 @@ func TestNSQStatsV1(t *testing.T) {
}))
defer ts.Close()

n := &NSQ{
Endpoints: []string{ts.URL},
}
n := New()
n.Endpoints = []string{ts.URL}

var acc testutil.Accumulator
err := acc.GatherError(n.Gather)
Expand Down Expand Up @@ -276,9 +275,8 @@ func TestNSQStatsPreV1(t *testing.T) {
}))
defer ts.Close()

n := &NSQ{
Endpoints: []string{ts.URL},
}
n := New()
n.Endpoints = []string{ts.URL}

var acc testutil.Accumulator
err := acc.GatherError(n.Gather)
Expand Down

0 comments on commit bf5a302

Please sign in to comment.