Skip to content
This repository has been archived by the owner on Aug 26, 2020. It is now read-only.

Commit

Permalink
add kafka username password auth
Browse files Browse the repository at this point in the history
  • Loading branch information
sundy-li committed Mar 15, 2018
1 parent e46bc72 commit ef044cc
Show file tree
Hide file tree
Showing 3 changed files with 31 additions and 0 deletions.
7 changes: 7 additions & 0 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@ type Config struct {
GroupBlacklist string `json:"groupBlacklist"`
Logconfig string `json:"logconfig"`
Pidfile string `json:"pidfile"`

TopicFilter string `json:"topicFilter"`
} `json:"general"`

Influxdb struct {
Expand All @@ -28,6 +30,11 @@ type Config struct {
OffsetTopic string `json:"offsetTopic"`
ClientProfile string `json:"ClientProfile"`
OffsetsTopic string `gcfg:"offsetsTopic"`

Sasl struct {
Username string
Password string
}
} `json:"kafka"`

Zookeeper struct {
Expand Down
2 changes: 2 additions & 0 deletions config/server.json.sample
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@
"clientId": "burrowx-lagchecker",
"groupBlacklist": "^(console-consumer-|python-kafka-consumer-).*$",

"topicFilter" : "*",

"clientProfile" : {
"some_key" : {
"clientId" : "xxxx",
Expand Down
22 changes: 22 additions & 0 deletions monitor/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"errors"
"fmt"
"io/ioutil"
"regexp"
"strings"
"sync"
"time"
Expand Down Expand Up @@ -38,6 +39,8 @@ type KafkaClient struct {
topicOffset map[string]*TopicFullOffset //topic => offset

importer *Importer

topicFilterRegexps []*regexp.Regexp
}

type BrokerTopicRequest struct {
Expand Down Expand Up @@ -72,6 +75,11 @@ func NewKafkaClient(cfg *config.Config, cluster string) (*KafkaClient, error) {
}
clientConfig.Net.TLS.Config.InsecureSkipVerify = profile.TLSNoVerify

if cfg.Kafka[cluster].Sasl.Username != "" {
clientConfig.Net.SASL.Enable = true
clientConfig.Net.SASL.User = cfg.Kafka[cluster].Sasl.Username
clientConfig.Net.SASL.Password = cfg.Kafka[cluster].Sasl.Password
}
sclient, err := sarama.NewClient(strings.Split(cfg.Kafka[cluster].Brokers, ","), clientConfig)
if err != nil {
return nil, err
Expand Down Expand Up @@ -111,6 +119,14 @@ func NewKafkaClient(cfg *config.Config, cluster string) (*KafkaClient, error) {
importer: importer,
}

if cfg.General.TopicFilter != "" {
patterns := strings.Split(cfg.General.TopicFilter, ",")
client.topicFilterRegexps = make([]*regexp.Regexp, 0, 10)
for _, p := range patterns {
client.topicFilterRegexps = append(client.topicFilterRegexps, regexp.MustCompile(p))
}
}

return client, nil
}

Expand Down Expand Up @@ -348,7 +364,13 @@ func (client *KafkaClient) CombineTopicAndConsumer() {
func (client *KafkaClient) RefreshTopicMap() {
client.topicMapLock.Lock()
topics, _ := client.client.Topics()
//filter topic by topicFilter
for _, topic := range topics {
for _, reg := range client.topicFilterRegexps {
if !reg.MatchString(topic) {
continue
}
}
partitions, _ := client.client.Partitions(topic)
client.topicMap[topic] = len(partitions)
}
Expand Down

0 comments on commit ef044cc

Please sign in to comment.