diff --git a/monitor/client.go b/monitor/client.go index 0176680..2841a11 100644 --- a/monitor/client.go +++ b/monitor/client.go @@ -244,10 +244,14 @@ func (client *KafkaClient) offsetFetchImport() { pmanager, _ := manager.ManagePartition(topic, parition) offset, _ := pmanager.NextOffset() - msg.partitionMap[parition] = LogOffset{ + logOffset := LogOffset{ Logsize: client.topicOffset[topic][parition], Offset: offset, } + if logOffset.Logsize < logOffset.Offset && logOffset.Logsize != 0 { + logOffset.Offset = logOffset.Logsize + } + msg.partitionMap[parition] = logOffset } if len(msg.partitionMap) > 0 { client.importer.saveMsg(msg) @@ -264,9 +268,7 @@ func (client *KafkaClient) MergeMaps(topicOffsetMap map[string]map[int32]int64) client.topicOffset[topic] = topicOffset } else { for partition, offset := range topicOffset { - if _, ok2 := client.topicOffset[topic][partition]; !ok2 { - client.topicOffset[topic][partition] = offset - } + client.topicOffset[topic][partition] = offset } } }