Skip to content
This repository has been archived by the owner on Aug 26, 2020. It is now read-only.

Commit

Permalink
Fix map read and write concurrent crash
Browse files Browse the repository at this point in the history
  • Loading branch information
sundy-li committed Apr 27, 2018
1 parent ef044cc commit 3cd3aa0
Showing 1 changed file with 54 additions and 41 deletions.
95 changes: 54 additions & 41 deletions monitor/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -311,44 +311,46 @@ func (client *KafkaClient) CombineTopicAndConsumer() {
continue
}
result := make(map[string]*ConsumerFullOffset)
if _, ok := client.consumerOffset[topic]; ok {
for partition, partitionOffset := range offset.partitionMap {
if _, ok := client.consumerOffset[topic][partition]; ok {
for consumer, consumerOffset := range client.consumerOffset[topic][partition] {
if _, ok := result[consumer]; !ok {
result[consumer] = &ConsumerFullOffset{
Cluster: client.cluster,
Topic: topic,
Group: consumer,
partitionMap: make(map[int32]int64),
Timestamp: offset.Timestamp,
withReadLock(client.topicOffsetMapLock, func() {
if _, ok := client.consumerOffset[topic]; ok {
for partition, partitionOffset := range offset.partitionMap {
if _, ok := client.consumerOffset[topic][partition]; ok {
for consumer, consumerOffset := range client.consumerOffset[topic][partition] {
if _, ok := result[consumer]; !ok {
result[consumer] = &ConsumerFullOffset{
Cluster: client.cluster,
Topic: topic,
Group: consumer,
partitionMap: make(map[int32]int64),
Timestamp: offset.Timestamp,
}
}
}
//judge time diff
if offset.Timestamp-consumerOffset.Timestamp <= 60*1000 {
if _, ok := result[consumer].partitionMap[partition]; !ok {
result[consumer].partitionMap[partition] = consumerOffset.Offset

if consumerOffset.Offset < 0 {
consumerOffset.Offset = partitionOffset
//judge time diff
if offset.Timestamp-consumerOffset.Timestamp <= 60*1000 {
if _, ok := result[consumer].partitionMap[partition]; !ok {
result[consumer].partitionMap[partition] = consumerOffset.Offset

if consumerOffset.Offset < 0 {
consumerOffset.Offset = partitionOffset
}
result[consumer].Offset += consumerOffset.Offset
result[consumer].MaxOffset += partitionOffset
}
result[consumer].Offset += consumerOffset.Offset
result[consumer].MaxOffset += partitionOffset
} else {
//ignore
log.Debugf("Drop %s:%v offset by partition metric by time diff not match", topic, consumer)
}
} else {
//ingore
log.Debugf("Drop %s:%v offset by partition metric by time diff not match", topic, consumer)
}
} else {
log.Debugf("Drop %s:%d offset by partition metric match ", topic, partition)
}
} else {
log.Debugf("Drop %s:%d offset by partition metric match ", topic, partition)
}
} else {
//ignore
log.Debugf("Topic %s not found any consumer", topic)
}
})

} else {
//ignore
log.Debugf("Topic %s not found any consumer", topic)
}
for consumer, consumerFullOffset := range result {
if len(consumerFullOffset.partitionMap) == count {
client.importer.saveMsg(consumerFullOffset)
Expand Down Expand Up @@ -443,17 +445,16 @@ func (client *KafkaClient) RefreshConsumerOffset(msg *sarama.ConsumerMessage) {
Offset: int64(offset),
Timestamp: int64(timestamp),
}
client.consumerOffsetMapLock.Lock()
if _, ok := client.consumerOffset[topic]; !ok {
client.consumerOffset[topic] = make(map[int32]map[string]*ConsumerOffset)
}
if _, ok := client.consumerOffset[topic][int32(partition)]; !ok {
client.consumerOffset[topic][int32(partition)] = make(map[string]*ConsumerOffset)
}

client.consumerOffset[topic][int32(partition)][co.Group] = co
log.Debugf("setting %s:%v:%d offset:%d", topic, co.Group, partition, co.Offset)
client.consumerOffsetMapLock.Unlock()
withWriteLock(client.consumerOffsetMapLock, func() {
if _, ok := client.consumerOffset[topic]; !ok {
client.consumerOffset[topic] = make(map[int32]map[string]*ConsumerOffset)
}
if _, ok := client.consumerOffset[topic][int32(partition)]; !ok {
client.consumerOffset[topic][int32(partition)] = make(map[string]*ConsumerOffset)
}
client.consumerOffset[topic][int32(partition)][co.Group] = co
log.Debugf("setting %s:%v:%d offset:%d", topic, co.Group, partition, co.Offset)
})
return
}
func readString(buf *bytes.Buffer) (string, error) {
Expand All @@ -477,3 +478,15 @@ func genKey(topic, consumer string, partition int) string {
// getConsumerFromKey extracts the consumer (group) segment from a key
// produced by genKey, i.e. the second "_"-separated field.
//
// NOTE(review): this assumes the topic segment itself contains no "_";
// a topic with an underscore would shift the fields — confirm the key
// format against genKey's callers.
func getConsumerFromKey(key string) string {
	parts := strings.Split(key, "_")
	if len(parts) < 2 {
		// Malformed key with no separator: the old code would panic with
		// index out of range here; return an empty consumer instead.
		return ""
	}
	return parts[1]
}

func withWriteLock(lock sync.RWMutex, fn func()) {
lock.Lock()
defer lock.Unlock()
fn()
}

func withReadLock(lock sync.RWMutex, fn func()) {
lock.RLock()
defer lock.RUnlock()
fn()
}

0 comments on commit 3cd3aa0

Please sign in to comment.