Skip to content

Commit

Permalink
Implementation of kvcq message lag estimation
Browse files Browse the repository at this point in the history
  • Loading branch information
LetLetMe committed Oct 10, 2024
1 parent 4b1a949 commit cdd61f6
Showing 1 changed file with 12 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -228,24 +228,34 @@ public long estimateMessageCount(long from, long to, MessageFilter filter) {
if (to > getMaxOffsetInQueue()) {
to = getMaxOffsetInQueue();
}

int maxSampleSize = messageStore.getMessageStoreConfig().getMaxConsumeQueueScan();
int sampleSize = to - from > maxSampleSize ? maxSampleSize : (int) (to - from);

int matchThreshold = messageStore.getMessageStoreConfig().getSampleCountThreshold();
int matchSize = 0;

for (int i = 0; i < sampleSize; i++) {
long index = from + i;
Pair<CqUnit, Long> pair = getCqUnitAndStoreTime(index);
if (pair == null) {
continue;
break;
}
CqUnit cqUnit = pair.getObject1();
if (filter.isMatchedByConsumeQueue(cqUnit.getTagsCode(), cqUnit.getCqExtUnit())) {
matchSize++;
// if matchSize is plenty, early exit estimate
if (matchSize > matchThreshold) {
sampleSize = i;
break;
}
}
}
// Make sure the second half is a floating point number, otherwise it will be truncated to 0
return (long)((to - from) * (matchSize / (sampleSize * 1.0)));
return sampleSize == 0 ? 0 : (long) ((to - from) * (matchSize / (sampleSize * 1.0)));
}


@Override
public long getMinOffsetInQueue() {
return this.messageStore.getMinOffsetInQueue(this.topic, this.queueId);
Expand Down

0 comments on commit cdd61f6

Please sign in to comment.