Skip to content

Commit

Permalink
Apply the fast filter optimization to composite aggregation (opensear…
Browse files Browse the repository at this point in the history
…ch-project#11505)

We can do efficient DateHistogram aggregation under composite 
aggregations.

---------

Signed-off-by: bowenlan-amzn <[email protected]>
  • Loading branch information
bowenlan-amzn committed Jan 17, 2024
1 parent 9da8811 commit cc815d3
Show file tree
Hide file tree
Showing 14 changed files with 585 additions and 378 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- Performance improvement for MultiTerm Queries on Keyword fields ([#7057](https://github.com/opensearch-project/OpenSearch/issues/7057))
- Refactor common parts from the Rounding class into a separate 'round' package ([#11023](https://github.com/opensearch-project/OpenSearch/issues/11023))
- Performance improvement for date histogram aggregations without sub-aggregations ([#11083](https://github.com/opensearch-project/OpenSearch/pull/11083))
- Apply the fast filter optimization to composite aggregation of date histogram source ([#11505](https://github.com/opensearch-project/OpenSearch/pull/11083))
- Disable concurrent aggs for Diversified Sampler and Sampler aggs ([#11087](https://github.com/opensearch-project/OpenSearch/issues/11087))
- Made leader/follower check timeout setting dynamic ([#10528](https://github.com/opensearch-project/OpenSearch/pull/10528))
- Change error message when per shard document limit is breached ([#11312](https://github.com/opensearch-project/OpenSearch/pull/11312))
Expand Down

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,9 @@
import org.apache.lucene.search.Weight;
import org.apache.lucene.search.comparators.LongComparator;
import org.apache.lucene.util.Bits;
import org.apache.lucene.util.CollectionUtil;
import org.apache.lucene.util.RoaringDocIdSet;
import org.opensearch.common.Rounding;
import org.opensearch.common.lease.Releasables;
import org.opensearch.index.IndexSortConfig;
import org.opensearch.lucene.queries.SearchAfterSortedDocQuery;
Expand All @@ -70,7 +72,9 @@
import org.opensearch.search.aggregations.MultiBucketCollector;
import org.opensearch.search.aggregations.MultiBucketConsumerService;
import org.opensearch.search.aggregations.bucket.BucketsAggregator;
import org.opensearch.search.aggregations.bucket.FastFilterRewriteHelper;
import org.opensearch.search.aggregations.bucket.missing.MissingOrder;
import org.opensearch.search.aggregations.bucket.terms.LongKeyedBucketOrds;
import org.opensearch.search.internal.SearchContext;
import org.opensearch.search.searchafter.SearchAfterBuilder;
import org.opensearch.search.sort.SortAndFormats;
Expand All @@ -79,6 +83,7 @@
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.LongUnaryOperator;
Expand Down Expand Up @@ -110,6 +115,10 @@ final class CompositeAggregator extends BucketsAggregator {

private boolean earlyTerminated;

private final FastFilterRewriteHelper.FastFilterContext fastFilterContext;
private LongKeyedBucketOrds bucketOrds = null;
private Rounding.Prepared preparedRounding = null;

CompositeAggregator(
String name,
AggregatorFactories factories,
Expand Down Expand Up @@ -153,12 +162,33 @@ final class CompositeAggregator extends BucketsAggregator {
}
this.queue = new CompositeValuesCollectorQueue(context.bigArrays(), sources, size, rawAfterKey);
this.rawAfterKey = rawAfterKey;

fastFilterContext = new FastFilterRewriteHelper.FastFilterContext();
if (!FastFilterRewriteHelper.isCompositeAggRewriteable(sourceConfigs)) return;
fastFilterContext.setAggregationType(
new FastFilterRewriteHelper.CompositeAggregationType(sourceConfigs, rawAfterKey, formats, size)
);
if (fastFilterContext.isRewriteable(parent, subAggregators.length)) {
// bucketOrds is the data structure for saving date histogram results
bucketOrds = LongKeyedBucketOrds.build(context.bigArrays(), CardinalityUpperBound.ONE);
// Currently the filter rewrite is only supported for date histograms
FastFilterRewriteHelper.CompositeAggregationType aggregationType =
(FastFilterRewriteHelper.CompositeAggregationType) fastFilterContext.aggregationType;
preparedRounding = aggregationType.getRoundingPreparer();
fastFilterContext.buildFastFilter(
context,
fc -> FastFilterRewriteHelper.getAggregationBounds(context, fc.getFieldType().name()),
x -> aggregationType.getRounding(),
() -> preparedRounding
);
}
}

@Override
protected void doClose() {
try {
Releasables.close(queue);
Releasables.close(bucketOrds);
} finally {
Releasables.close(sources);
}
Expand Down Expand Up @@ -186,12 +216,14 @@ public InternalAggregation[] buildAggregations(long[] owningBucketOrds) throws I
}

int num = Math.min(size, queue.size());
final InternalComposite.InternalBucket[] buckets = new InternalComposite.InternalBucket[num];
InternalComposite.InternalBucket[] buckets = new InternalComposite.InternalBucket[num];

long[] bucketOrdsToCollect = new long[queue.size()];
for (int i = 0; i < queue.size(); i++) {
bucketOrdsToCollect[i] = i;
}
InternalAggregations[] subAggsForBuckets = buildSubAggsForBuckets(bucketOrdsToCollect);

while (queue.size() > 0) {
int slot = queue.pop();
CompositeKey key = queue.toCompositeKey(slot);
Expand All @@ -207,6 +239,43 @@ public InternalAggregation[] buildAggregations(long[] owningBucketOrds) throws I
aggs
);
}

// Build results from fast filters optimization
if (bucketOrds != null) {
// CompositeKey is the value of bucket key
final Map<CompositeKey, InternalComposite.InternalBucket> bucketMap = new HashMap<>();
// Some segments may not be optimized, so buckets may contain results from the queue.
for (InternalComposite.InternalBucket internalBucket : buckets) {
bucketMap.put(internalBucket.getRawKey(), internalBucket);
}
// Loop over the buckets in the bucketOrds, and populate the map accordingly
LongKeyedBucketOrds.BucketOrdsEnum ordsEnum = bucketOrds.ordsEnum(0);
while (ordsEnum.next()) {
Long bucketKeyValue = ordsEnum.value();
CompositeKey key = new CompositeKey(bucketKeyValue);
if (bucketMap.containsKey(key)) {
long docCount = bucketDocCount(ordsEnum.ord()) + bucketMap.get(key).getDocCount();
bucketMap.get(key).setDocCount(docCount);
} else {
InternalComposite.InternalBucket bucket = new InternalComposite.InternalBucket(
sourceNames,
formats,
key,
reverseMuls,
missingOrders,
bucketDocCount(ordsEnum.ord()),
buildEmptySubAggregations()
);
bucketMap.put(key, bucket);
}
}
// since a map is not sorted structure, sort it before transform back to buckets
List<InternalComposite.InternalBucket> bucketList = new ArrayList<>(bucketMap.values());
CollectionUtil.introSort(bucketList, InternalComposite.InternalBucket::compareKey);
buckets = bucketList.subList(0, Math.min(size, bucketList.size())).toArray(InternalComposite.InternalBucket[]::new);
num = buckets.length;
}

CompositeKey lastBucket = num > 0 ? buckets[num - 1].getRawKey() : null;
return new InternalAggregation[] {
new InternalComposite(
Expand Down Expand Up @@ -295,16 +364,16 @@ private Sort buildIndexSortPrefix(LeafReaderContext context) throws IOException

if (indexSortField.getReverse() != (source.reverseMul == -1)) {
if (i == 0) {
// the leading index sort matches the leading source field but the order is reversed
// the leading index sort matches the leading source field, but the order is reversed,
// so we don't check the other sources.
return new Sort(indexSortField);
}
break;
}
sortFields.add(indexSortField);
if (sourceConfig.valuesSource() instanceof RoundingValuesSource) {
// the rounding "squashes" many values together, that breaks the ordering of sub-values
// so we ignore subsequent source even if they match the index sort.
// the rounding "squashes" many values together, that breaks the ordering of sub-values,
// so we ignore the subsequent sources even if they match the index sort.
break;
}
}
Expand Down Expand Up @@ -447,6 +516,16 @@ private void processLeafFromQuery(LeafReaderContext ctx, Sort indexSortPrefix) t

@Override
protected LeafBucketCollector getLeafCollector(LeafReaderContext ctx, LeafBucketCollector sub) throws IOException {
boolean optimized = FastFilterRewriteHelper.tryFastFilterAggregation(
ctx,
fastFilterContext,
(key, count) -> incrementBucketDocCount(
FastFilterRewriteHelper.getBucketOrd(bucketOrds.add(0, preparedRounding.round(key))),
count
)
);
if (optimized) throw new CollectionTerminatedException();

finishLeaf();

boolean fillDocIdSet = deferredCollectors != NO_OP_COLLECTOR;
Expand Down Expand Up @@ -476,9 +555,10 @@ protected LeafBucketCollector getLeafCollector(LeafReaderContext ctx, LeafBucket
docIdSetBuilder = new RoaringDocIdSet.Builder(ctx.reader().maxDoc());
}
if (rawAfterKey != null && sortPrefixLen > 0) {
// We have an after key and index sort is applicable so we jump directly to the doc
// that is after the index sort prefix using the rawAfterKey and we start collecting
// document from there.
// We have an after key and index sort is applicable, so we jump directly to the doc
// after the index sort prefix using the rawAfterKey and we start collecting
// documents from there.
assert indexSortPrefix != null;
processLeafFromQuery(ctx, indexSortPrefix);
throw new CollectionTerminatedException();
} else {
Expand Down Expand Up @@ -506,6 +586,8 @@ public void collect(int doc, long bucket) throws IOException {
try {
long docCount = docCountProvider.getDocCount(doc);
if (queue.addIfCompetitive(indexSortPrefix, docCount)) {
// one doc may contain multiple values, we iterate over and collect one by one
// so the same doc can appear multiple times here
if (builder != null && lastDoc != doc) {
builder.add(doc);
lastDoc = doc;
Expand Down Expand Up @@ -568,7 +650,7 @@ private LeafBucketCollector getSecondPassCollector(LeafBucketCollector subCollec
@Override
public void collect(int doc, long zeroBucket) throws IOException {
assert zeroBucket == 0;
Integer slot = queue.compareCurrent();
Integer slot = queue.getCurrentSlot();
if (slot != null) {
// The candidate key is a top bucket.
// We can defer the collection of this document/bucket to the sub collector
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@
*
* @opensearch.internal
*/
class CompositeKey implements Writeable {
public class CompositeKey implements Writeable {
private final Comparable[] values;

CompositeKey(Comparable... values) {
Expand All @@ -64,11 +64,11 @@ Comparable[] values() {
return values;
}

int size() {
public int size() {
return values.length;
}

Comparable get(int pos) {
public Comparable get(int pos) {
assert pos < values.length;
return values[pos];
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,8 @@

/**
* A specialized {@link PriorityQueue} implementation for composite buckets.
* Can think of this as a max heap that holds the top small buckets slots in order.
* Each slot holds the values of the composite bucket key it represents.
*
* @opensearch.internal
*/
Expand Down Expand Up @@ -77,7 +79,7 @@ public int hashCode() {

private final BigArrays bigArrays;
private final int maxSize;
private final Map<Slot, Integer> map;
private final Map<Slot, Integer> map; // to quickly find the slot for a value
private final SingleDimensionValuesSource<?>[] arrays;

private LongArray docCounts;
Expand Down Expand Up @@ -108,7 +110,7 @@ public int hashCode() {

@Override
protected boolean lessThan(Integer a, Integer b) {
return compare(a, b) > 0;
return compare(a, b) > 0; // max heap
}

/**
Expand All @@ -119,10 +121,10 @@ boolean isFull() {
}

/**
* Compares the current candidate with the values in the queue and returns
* Try to get the slot of the current/candidate values in the queue and returns
* the slot if the candidate is already in the queue or null if the candidate is not present.
*/
Integer compareCurrent() {
Integer getCurrentSlot() {
return map.get(new Slot(CANDIDATE_SLOT));
}

Expand Down Expand Up @@ -281,32 +283,34 @@ boolean addIfCompetitive(long inc) {
*/
boolean addIfCompetitive(int indexSortSourcePrefix, long inc) {
// checks if the candidate key is competitive
Integer topSlot = compareCurrent();
if (topSlot != null) {
Integer curSlot = getCurrentSlot();
if (curSlot != null) {
// this key is already in the top N, skip it
docCounts.increment(topSlot, inc);
docCounts.increment(curSlot, inc);
return true;
}

if (afterKeyIsSet) {
int cmp = compareCurrentWithAfter();
if (cmp <= 0) {
if (indexSortSourcePrefix < 0 && cmp == indexSortSourcePrefix) {
// the leading index sort is in the reverse order of the leading source
// the leading index sort is and the leading source order are both reversed,
// so we can early terminate when we reach a document that is smaller
// than the after key (collected on a previous page).
throw new CollectionTerminatedException();
}
// key was collected on a previous page, skip it (>= afterKey).
// the key was collected on a previous page, skip it.
return false;
}
}

// the heap is full, check if the candidate key larger than max heap top
if (size() >= maxSize) {
// the tree map is full, check if the candidate key should be kept
int cmp = compare(CANDIDATE_SLOT, top());
if (cmp > 0) {
if (cmp <= indexSortSourcePrefix) {
// index sort guarantees that there is no key greater or equal than the
// current one in the subsequent documents so we can early terminate.
// index sort guarantees the following documents will have a key larger than the current candidate,
// so we can early terminate.
throw new CollectionTerminatedException();
}
// the candidate key is not competitive, skip it.
Expand All @@ -324,7 +328,7 @@ boolean addIfCompetitive(int indexSortSourcePrefix, long inc) {
} else {
newSlot = size();
}
// move the candidate key to its new slot
// move the candidate key to its new slot by copy its values to the new slot
copyCurrent(newSlot, inc);
map.put(new Slot(newSlot), newSlot);
add(newSlot);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -156,7 +156,7 @@ public MissingOrder missingOrder() {
/**
* Returns true if the source contains a script that can change the value.
*/
protected boolean hasScript() {
public boolean hasScript() {
return hasScript;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -303,7 +303,7 @@ public static void register(ValuesSourceRegistry.Builder builder) {
// TODO once composite is plugged in to the values source registry or at least understands Date values source types use it
// here
Rounding.Prepared preparedRounding = rounding.prepareForUnknown();
RoundingValuesSource vs = new RoundingValuesSource(numeric, preparedRounding);
RoundingValuesSource vs = new RoundingValuesSource(numeric, preparedRounding, rounding);
// is specified in the builder.
final DocValueFormat docValueFormat = format == null ? DocValueFormat.RAW : valuesSourceConfig.format();
final MappedFieldType fieldType = valuesSourceConfig.fieldType();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -351,7 +351,7 @@ public static class InternalBucket extends InternalMultiBucketAggregation.Intern
KeyComparable<InternalBucket> {

private final CompositeKey key;
private final long docCount;
private long docCount;
private final InternalAggregations aggregations;
private final transient int[] reverseMuls;
private final transient MissingOrder[] missingOrders;
Expand Down Expand Up @@ -448,6 +448,10 @@ public long getDocCount() {
return docCount;
}

public void setDocCount(long docCount) {
this.docCount = docCount;
}

@Override
public Aggregations getAggregations() {
return aggregations;
Expand Down
Loading

0 comments on commit cc815d3

Please sign in to comment.