diff --git a/store/src/main/java/org/apache/rocketmq/store/queue/RocksDBConsumeQueue.java b/store/src/main/java/org/apache/rocketmq/store/queue/RocksDBConsumeQueue.java index 1c390cb49691..04178f8a8b5b 100644 --- a/store/src/main/java/org/apache/rocketmq/store/queue/RocksDBConsumeQueue.java +++ b/store/src/main/java/org/apache/rocketmq/store/queue/RocksDBConsumeQueue.java @@ -228,24 +228,34 @@ public long estimateMessageCount(long from, long to, MessageFilter filter) { if (to > getMaxOffsetInQueue()) { to = getMaxOffsetInQueue(); } + int maxSampleSize = messageStore.getMessageStoreConfig().getMaxConsumeQueueScan(); int sampleSize = to - from > maxSampleSize ? maxSampleSize : (int) (to - from); + + int matchThreshold = messageStore.getMessageStoreConfig().getSampleCountThreshold(); int matchSize = 0; + for (int i = 0; i < sampleSize; i++) { long index = from + i; Pair pair = getCqUnitAndStoreTime(index); if (pair == null) { - continue; + break; } CqUnit cqUnit = pair.getObject1(); if (filter.isMatchedByConsumeQueue(cqUnit.getTagsCode(), cqUnit.getCqExtUnit())) { matchSize++; + // if matchSize is plenty, early exit estimate + if (matchSize > matchThreshold) { + sampleSize = i; + break; + } } } // Make sure the second half is a floating point number, otherwise it will be truncated to 0 - return (long)((to - from) * (matchSize / (sampleSize * 1.0))); + return sampleSize == 0 ? 0 : (long) ((to - from) * (matchSize / (sampleSize * 1.0))); } + @Override public long getMinOffsetInQueue() { return this.messageStore.getMinOffsetInQueue(this.topic, this.queueId);