Skip to content
This repository has been archived by the owner on Aug 26, 2020. It is now read-only.

Commit

Permalink
优化
Browse files Browse the repository at this point in the history
  • Loading branch information
mozengwen committed May 28, 2019
1 parent a99195a commit 0be7d2b
Show file tree
Hide file tree
Showing 2 changed files with 31 additions and 23 deletions.
53 changes: 30 additions & 23 deletions monitor/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -326,38 +326,45 @@ func (client *KafkaClient) RefreshMetaData() {

//group description
topic2Consumer := map[string]map[string]bool{}
for _, broker := range client.client.Brokers() {
if ok, _ := broker.Connected(); !ok {
broker.Open(client.client.Config())
groupsPerBroker := make(map[*sarama.Broker][]string)
for _, group := range groupList {
controller, err := client.client.Coordinator(group)
if err != nil {
log.Warnf("Coordinator group:%s error : %v", group, err)
return
}
resp, err := broker.DescribeGroups(&sarama.DescribeGroupsRequest{Groups: groupList})
groupsPerBroker[controller] = append(groupsPerBroker[controller], group)
}

for broker, brokerGroups := range groupsPerBroker {
response, err := broker.DescribeGroups(&sarama.DescribeGroupsRequest{
Groups: brokerGroups,
})
if err != nil {
log.Warnf("DescribeGroups error : %v", err)
log.Warnf("get groupDescribe fail:%v", err)
continue
} else {
for _, desc := range resp.Groups {
for _, gmd := range desc.Members {
metadata, err2 := gmd.GetMemberMetadata()
if err2 != nil {
log.Warnf("GetMemberMetadata error : %v", err)
continue
} else {
for _, topic := range metadata.Topics {
if _, ok := client.topicMap[topic]; !ok {
continue
}
if _, ok := topic2Consumer[topic]; !ok {
topic2Consumer[topic] = make(map[string]bool)
}
topic2Consumer[topic][desc.GroupId] = true
}
for _, desc := range response.Groups {
for _, gmd := range desc.Members {
metadata, err2 := gmd.GetMemberMetadata()
if err2 != nil {
log.Warnf("GetMemberMetadata error : %v", err)
continue
} else {
for _, topic := range metadata.Topics {
if _, ok := client.topicMap[topic]; !ok {
continue
}
if _, ok := topic2Consumer[topic]; !ok {
topic2Consumer[topic] = make(map[string]bool)
}
topic2Consumer[topic][desc.GroupId] = true
}
}
}
}
}

log.Debugf("topic2Consumer %v \n", topic2Consumer)
for topic, consumerMap := range topic2Consumer {
client.topic2Consumer[topic] = make([]string, 0, len(consumerMap))
for group := range consumerMap {
Expand All @@ -366,13 +373,13 @@ func (client *KafkaClient) RefreshMetaData() {
}
for _, reg := range client.groupFilterRegexps {
if reg.MatchString(group) {
groupList = append(groupList, group)
break
}
}
client.topic2Consumer[topic] = append(client.topic2Consumer[topic], group)
}
}
log.Debugf("topic2Consumer %v \n", client.topic2Consumer)
}

func withWriteLock(lock *sync.RWMutex, fn func()) {
Expand Down
1 change: 1 addition & 0 deletions monitor/importer.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@ func (i *Importer) start() {
}
if entry.Offset < 0 {
fields["lag"] = -1
continue
}

tm := time.Unix(msg.Timestamp/1000, 0)
Expand Down

0 comments on commit 0be7d2b

Please sign in to comment.