diff --git a/monitor/client.go b/monitor/client.go index 60d6f14..dfb8e63 100644 --- a/monitor/client.go +++ b/monitor/client.go @@ -326,38 +326,45 @@ func (client *KafkaClient) RefreshMetaData() { //group description topic2Consumer := map[string]map[string]bool{} - for _, broker := range client.client.Brokers() { - if ok, _ := broker.Connected(); !ok { - broker.Open(client.client.Config()) + groupsPerBroker := make(map[*sarama.Broker][]string) + for _, group := range groupList { + controller, err := client.client.Coordinator(group) + if err != nil { + log.Warnf("Coordinator group:%s error : %v", group, err) + return } - resp, err := broker.DescribeGroups(&sarama.DescribeGroupsRequest{Groups: groupList}) + groupsPerBroker[controller] = append(groupsPerBroker[controller], group) + } + + for broker, brokerGroups := range groupsPerBroker { + response, err := broker.DescribeGroups(&sarama.DescribeGroupsRequest{ + Groups: brokerGroups, + }) if err != nil { - log.Warnf("DescribeGroups error : %v", err) + log.Warnf("get groupDescribe fail:%v", err) continue - } else { - for _, desc := range resp.Groups { - for _, gmd := range desc.Members { - metadata, err2 := gmd.GetMemberMetadata() - if err2 != nil { - log.Warnf("GetMemberMetadata error : %v", err) - continue - } else { - for _, topic := range metadata.Topics { - if _, ok := client.topicMap[topic]; !ok { - continue - } - if _, ok := topic2Consumer[topic]; !ok { - topic2Consumer[topic] = make(map[string]bool) - } - topic2Consumer[topic][desc.GroupId] = true + } + for _, desc := range response.Groups { + for _, gmd := range desc.Members { + metadata, err2 := gmd.GetMemberMetadata() + if err2 != nil { + log.Warnf("GetMemberMetadata error : %v", err) + continue + } else { + for _, topic := range metadata.Topics { + if _, ok := client.topicMap[topic]; !ok { + continue + } + if _, ok := topic2Consumer[topic]; !ok { + topic2Consumer[topic] = make(map[string]bool) } + topic2Consumer[topic][desc.GroupId] = true } } } } } - log.Debugf("topic2Consumer %v \n", topic2Consumer) for topic, consumerMap := range topic2Consumer { client.topic2Consumer[topic] = make([]string, 0, len(consumerMap)) for group := range consumerMap { @@ -366,13 +373,13 @@ func (client *KafkaClient) RefreshMetaData() { } for _, reg := range client.groupFilterRegexps { if reg.MatchString(group) { - groupList = append(groupList, group) break } } client.topic2Consumer[topic] = append(client.topic2Consumer[topic], group) } } + log.Debugf("topic2Consumer %v \n", client.topic2Consumer) } func withWriteLock(lock *sync.RWMutex, fn func()) { diff --git a/monitor/importer.go b/monitor/importer.go index 9536902..c9d2aa6 100644 --- a/monitor/importer.go +++ b/monitor/importer.go @@ -70,6 +70,7 @@ func (i *Importer) start() { } if entry.Offset < 0 { fields["lag"] = -1 + continue } tm := time.Unix(msg.Timestamp/1000, 0)