Skip to content

Commit

Permalink
Optimize numeric aggregation using quickselect for top-N when applicable
Browse files Browse the repository at this point in the history
Signed-off-by: Rishabh Maurya <[email protected]>
  • Loading branch information
rishabhmaurya committed Feb 11, 2025
1 parent c82fffe commit 130d890
Showing 1 changed file with 56 additions and 24 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
import org.apache.lucene.index.LeafReaderContext;
import org.apache.lucene.index.SortedNumericDocValues;
import org.apache.lucene.search.ScoreMode;
import org.apache.lucene.util.ArrayUtil;
import org.apache.lucene.util.NumericUtils;
import org.apache.lucene.util.PriorityQueue;
import org.opensearch.common.Numbers;
Expand Down Expand Up @@ -183,37 +184,68 @@ private InternalAggregation[] buildAggregations(long[] owningBucketOrds) throws
long bucketsInOrd = bucketOrds.bucketsInOrd(owningBucketOrds[ordIdx]);

int size = (int) Math.min(bucketsInOrd, localBucketCountThresholds.getRequiredSize());
PriorityQueue<B> ordered = buildPriorityQueue(size);
B spare = null;
BucketOrdsEnum ordsEnum = bucketOrds.ordsEnum(owningBucketOrds[ordIdx]);
Supplier<B> emptyBucketBuilder = emptyBucketBuilder(owningBucketOrds[ordIdx]);
while (ordsEnum.next()) {
long docCount = bucketDocCount(ordsEnum.ord());
otherDocCounts[ordIdx] += docCount;
if (docCount < localBucketCountThresholds.getMinDocCount()) {
continue;

if ((bucketsInOrd > (size * 2L)) || isKeyOrder(order)) {
// use heap sort
PriorityQueue<B> ordered = buildPriorityQueue(size);
while (ordsEnum.next()) {
long docCount = bucketDocCount(ordsEnum.ord());
otherDocCounts[ordIdx] += docCount;
if (docCount < localBucketCountThresholds.getMinDocCount()) {
continue;
}
if (spare == null) {
spare = emptyBucketBuilder.get();
}
updateBucket(spare, ordsEnum, docCount);
spare = ordered.insertWithOverflow(spare);
}
// Get the top buckets
B[] bucketsForOrd = buildBuckets(ordered.size());
topBucketsPerOrd[ordIdx] = bucketsForOrd;
if (isKeyOrder(order)) {
for (int b = ordered.size() - 1; b >= 0; --b) {
topBucketsPerOrd[ordIdx][b] = ordered.pop();
otherDocCounts[ordIdx] -= topBucketsPerOrd[ordIdx][b].getDocCount();
}
} else {
// Sorted buckets are not needed here: they will be sorted by key in buildResult(), which differs
// from the order maintained by the priority queue `ordered`.
Iterator<B> itr = ordered.iterator();
for (int b = ordered.size() - 1; b >= 0; --b) {
topBucketsPerOrd[ordIdx][b] = itr.next();
otherDocCounts[ordIdx] -= topBucketsPerOrd[ordIdx][b].getDocCount();
}
}
if (spare == null) {
} else {
B[] bucketsForOrd = buildBuckets((int) bucketOrds.size());
int tot = 0;
while (ordsEnum.next()) {
long docCount = bucketDocCount(ordsEnum.ord());
otherDocCounts[ordIdx] += docCount;
if (docCount < localBucketCountThresholds.getMinDocCount()) {
continue;
}
spare = emptyBucketBuilder.get();
updateBucket(spare, ordsEnum, docCount);
bucketsForOrd[tot++] = spare;
}
updateBucket(spare, ordsEnum, docCount);
spare = ordered.insertWithOverflow(spare);
}

// Get the top buckets
B[] bucketsForOrd = buildBuckets(ordered.size());
topBucketsPerOrd[ordIdx] = bucketsForOrd;
if (isKeyOrder(order)) {
for (int b = ordered.size() - 1; b >= 0; --b) {
topBucketsPerOrd[ordIdx][b] = ordered.pop();
otherDocCounts[ordIdx] -= topBucketsPerOrd[ordIdx][b].getDocCount();
if (tot > size & partiallyBuiltBucketComparator != null) {
// quick select topN
// TODO: do we need to handle the case for SignificantTerm Agg separately?
ArrayUtil.select(
bucketsForOrd,
0,
tot,
size,
((b1, b2) -> partiallyBuiltBucketComparator.compare((InternalTerms.Bucket<?>) b1, (InternalTerms.Bucket<?>) b2))
);
}
} else {
// Sorted buckets are not needed here: they will be sorted by key in buildResult(), which differs
// from the order maintained by the priority queue `ordered`.
Iterator<B> itr = ordered.iterator();
for (int b = ordered.size() - 1; b >= 0; --b) {
topBucketsPerOrd[ordIdx][b] = itr.next();
topBucketsPerOrd[ordIdx] = Arrays.copyOf(bucketsForOrd, size);
for (int b = 0; b < size; b++) {
otherDocCounts[ordIdx] -= topBucketsPerOrd[ordIdx][b].getDocCount();
}
}
Expand Down

0 comments on commit 130d890

Please sign in to comment.