From cc815d39fcb4ffdf615c1e26adf28917b57914e9 Mon Sep 17 00:00:00 2001 From: bowenlan-amzn Date: Wed, 17 Jan 2024 09:29:52 -0800 Subject: [PATCH] Apply the fast filter optimization to composite aggregation (#11505) We can do efficient DateHistogram aggregation under composite aggregations. --------- Signed-off-by: bowenlan-amzn --- CHANGELOG.md | 1 + .../bucket/FastFilterRewriteHelper.java | 385 ++++++++++++++++++ .../bucket/composite/CompositeAggregator.java | 98 ++++- .../bucket/composite/CompositeKey.java | 6 +- .../CompositeValuesCollectorQueue.java | 30 +- .../CompositeValuesSourceConfig.java | 2 +- .../DateHistogramValuesSourceBuilder.java | 2 +- .../bucket/composite/InternalComposite.java | 6 +- .../composite/PointsSortedDocsProducer.java | 6 +- .../composite/RoundingValuesSource.java | 24 +- .../AutoDateHistogramAggregator.java | 67 ++- .../histogram/DateHistogramAggregator.java | 60 ++- .../bucket/histogram/FilterRewriteHelper.java | 259 ------------ .../composite/CompositeAggregatorTests.java | 17 +- 14 files changed, 585 insertions(+), 378 deletions(-) create mode 100644 server/src/main/java/org/opensearch/search/aggregations/bucket/FastFilterRewriteHelper.java delete mode 100644 server/src/main/java/org/opensearch/search/aggregations/bucket/histogram/FilterRewriteHelper.java diff --git a/CHANGELOG.md b/CHANGELOG.md index b23128c64843a..2d01d1771d999 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -86,6 +86,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), - Performance improvement for MultiTerm Queries on Keyword fields ([#7057](https://github.com/opensearch-project/OpenSearch/issues/7057)) - Refactor common parts from the Rounding class into a separate 'round' package ([#11023](https://github.com/opensearch-project/OpenSearch/issues/11023)) - Performance improvement for date histogram aggregations without sub-aggregations ([#11083](https://github.com/opensearch-project/OpenSearch/pull/11083)) +- Apply the fast filter optimization to composite aggregation of date histogram source ([#11505](https://github.com/opensearch-project/OpenSearch/pull/11083)) - Disable concurrent aggs for Diversified Sampler and Sampler aggs ([#11087](https://github.com/opensearch-project/OpenSearch/issues/11087)) - Made leader/follower check timeout setting dynamic ([#10528](https://github.com/opensearch-project/OpenSearch/pull/10528)) - Change error message when per shard document limit is breached ([#11312](https://github.com/opensearch-project/OpenSearch/pull/11312)) diff --git a/server/src/main/java/org/opensearch/search/aggregations/bucket/FastFilterRewriteHelper.java b/server/src/main/java/org/opensearch/search/aggregations/bucket/FastFilterRewriteHelper.java new file mode 100644 index 0000000000000..f377287d0b3bd --- /dev/null +++ b/server/src/main/java/org/opensearch/search/aggregations/bucket/FastFilterRewriteHelper.java @@ -0,0 +1,385 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.search.aggregations.bucket; + +import org.apache.lucene.index.LeafReaderContext; +import org.apache.lucene.index.PointValues; +import org.apache.lucene.search.ConstantScoreQuery; +import org.apache.lucene.search.IndexOrDocValuesQuery; +import org.apache.lucene.search.MatchAllDocsQuery; +import org.apache.lucene.search.PointRangeQuery; +import org.apache.lucene.search.Query; +import org.apache.lucene.search.ScoreMode; +import org.apache.lucene.search.Weight; +import org.apache.lucene.util.NumericUtils; +import org.opensearch.common.CheckedFunction; +import org.opensearch.common.Rounding; +import org.opensearch.common.lucene.search.function.FunctionScoreQuery; +import org.opensearch.index.mapper.DateFieldMapper; +import org.opensearch.index.mapper.MappedFieldType; +import org.opensearch.index.query.DateRangeIncludingNowQuery; +import org.opensearch.search.DocValueFormat; +import org.opensearch.search.aggregations.bucket.composite.CompositeKey; +import org.opensearch.search.aggregations.bucket.composite.CompositeValuesSourceConfig; +import org.opensearch.search.aggregations.bucket.composite.RoundingValuesSource; +import org.opensearch.search.internal.SearchContext; + +import java.io.IOException; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.OptionalLong; +import java.util.function.BiConsumer; +import java.util.function.Function; +import java.util.function.Supplier; + +/** + * Utility class to help rewrite aggregations into filters. + * Instead of aggregation collects documents one by one, filter may count all documents that match in one pass. + *

+ * Currently supported rewrite: + *

+ * + * @opensearch.internal + */ +public final class FastFilterRewriteHelper { + + private FastFilterRewriteHelper() {} + + private static final int MAX_NUM_FILTER_BUCKETS = 1024; + private static final Map, Function> queryWrappers; + + // Initialize the wrapper map for unwrapping the query + static { + queryWrappers = new HashMap<>(); + queryWrappers.put(ConstantScoreQuery.class, q -> ((ConstantScoreQuery) q).getQuery()); + queryWrappers.put(FunctionScoreQuery.class, q -> ((FunctionScoreQuery) q).getSubQuery()); + queryWrappers.put(DateRangeIncludingNowQuery.class, q -> ((DateRangeIncludingNowQuery) q).getQuery()); + queryWrappers.put(IndexOrDocValuesQuery.class, q -> ((IndexOrDocValuesQuery) q).getIndexQuery()); + } + + /** + * Recursively unwraps query into the concrete form + * for applying the optimization + */ + private static Query unwrapIntoConcreteQuery(Query query) { + while (queryWrappers.containsKey(query.getClass())) { + query = queryWrappers.get(query.getClass()).apply(query); + } + + return query; + } + + /** + * Finds the min and max bounds of field values for the shard + */ + private static long[] getIndexBounds(final SearchContext context, final String fieldName) throws IOException { + final List leaves = context.searcher().getIndexReader().leaves(); + long min = Long.MAX_VALUE, max = Long.MIN_VALUE; + // Since the query does not specify bounds for aggregation, we can + // build the global min/max from local min/max within each segment + for (LeafReaderContext leaf : leaves) { + final PointValues values = leaf.reader().getPointValues(fieldName); + if (values != null) { + min = Math.min(min, NumericUtils.sortableBytesToLong(values.getMinPackedValue(), 0)); + max = Math.max(max, NumericUtils.sortableBytesToLong(values.getMaxPackedValue(), 0)); + } + } + + if (min == Long.MAX_VALUE || max == Long.MIN_VALUE) return null; + + return new long[] { min, max }; + } + + /** + * This method also acts as a pre-condition check for the optimization, + * returns null if the optimization cannot be applied + */ + public static long[] getAggregationBounds(final SearchContext context, final String fieldName) throws IOException { + final Query cq = unwrapIntoConcreteQuery(context.query()); + final long[] indexBounds = getIndexBounds(context, fieldName); + if (cq instanceof PointRangeQuery) { + final PointRangeQuery prq = (PointRangeQuery) cq; + // Ensure that the query and aggregation are on the same field + if (prq.getField().equals(fieldName)) { + return new long[] { + // Minimum bound for aggregation is the max between query and global + Math.max(NumericUtils.sortableBytesToLong(prq.getLowerPoint(), 0), indexBounds[0]), + // Maximum bound for aggregation is the min between query and global + Math.min(NumericUtils.sortableBytesToLong(prq.getUpperPoint(), 0), indexBounds[1]) }; + } + } else if (cq instanceof MatchAllDocsQuery) { + return indexBounds; + } + // Check if the top-level query (which may be a PRQ on another field) is functionally match-all + Weight weight = context.searcher().createWeight(context.query(), ScoreMode.COMPLETE_NO_SCORES, 1f); + for (LeafReaderContext ctx : context.searcher().getIndexReader().leaves()) { + if (weight.count(ctx) != ctx.reader().numDocs()) { + return null; + } + } + return indexBounds; + } + + /** + * Creates the date range filters for aggregations using the interval, min/max + * bounds and the rounding values + */ + private static Weight[] createFilterForAggregations( + final SearchContext context, + final long interval, + final Rounding.Prepared preparedRounding, + final String field, + final DateFieldMapper.DateFieldType fieldType, + long low, + final long high + ) throws IOException { + // Calculate the number of buckets using range and interval + long roundedLow = preparedRounding.round(fieldType.convertNanosToMillis(low)); + long prevRounded = roundedLow; + int bucketCount = 0; + while (roundedLow <= fieldType.convertNanosToMillis(high)) { + bucketCount++; + if (bucketCount > MAX_NUM_FILTER_BUCKETS) return null; + // Below rounding is needed as the interval could return in + // non-rounded values for something like calendar month + roundedLow = preparedRounding.round(roundedLow + interval); + if (prevRounded == roundedLow) break; // prevents getting into an infinite loop + prevRounded = roundedLow; + } + + Weight[] filters = null; + if (bucketCount > 0) { + filters = new Weight[bucketCount]; + roundedLow = preparedRounding.round(fieldType.convertNanosToMillis(low)); + + int i = 0; + while (i < bucketCount) { + // Calculate the lower bucket bound + final byte[] lower = new byte[8]; + NumericUtils.longToSortableBytes(i == 0 ? low : fieldType.convertRoundedMillisToNanos(roundedLow), lower, 0); + + // Calculate the upper bucket bound + roundedLow = preparedRounding.round(roundedLow + interval); + final byte[] upper = new byte[8]; + NumericUtils.longToSortableBytes(i + 1 == bucketCount ? high : + // Subtract -1 if the minimum is roundedLow as roundedLow itself + // is included in the next bucket + fieldType.convertRoundedMillisToNanos(roundedLow) - 1, upper, 0); + + filters[i++] = context.searcher().createWeight(new PointRangeQuery(field, lower, upper, 1) { + @Override + protected String toString(int dimension, byte[] value) { + return null; + } + }, ScoreMode.COMPLETE_NO_SCORES, 1); + } + } + + return filters; + } + + /** + * Context object to do fast filter optimization + */ + public static class FastFilterContext { + private Weight[] filters = null; + public AggregationType aggregationType; + + public FastFilterContext() {} + + private void setFilters(Weight[] filters) { + this.filters = filters; + } + + public void setAggregationType(AggregationType aggregationType) { + this.aggregationType = aggregationType; + } + + public boolean isRewriteable(final Object parent, final int subAggLength) { + return aggregationType.isRewriteable(parent, subAggLength); + } + + /** + * This filter build method is for date histogram aggregation type + * + * @param computeBounds get the lower and upper bound of the field in a shard search + * @param roundingFunction produce Rounding that contains interval of date range. + * Rounding is computed dynamically using the bounds in AutoDateHistogram + * @param preparedRoundingSupplier produce PreparedRounding to round values at call-time + */ + public void buildFastFilter( + SearchContext context, + CheckedFunction computeBounds, + Function roundingFunction, + Supplier preparedRoundingSupplier + ) throws IOException { + assert this.aggregationType instanceof DateHistogramAggregationType; + DateHistogramAggregationType aggregationType = (DateHistogramAggregationType) this.aggregationType; + DateFieldMapper.DateFieldType fieldType = aggregationType.getFieldType(); + final long[] bounds = computeBounds.apply(aggregationType); + if (bounds == null) return; + + final Rounding rounding = roundingFunction.apply(bounds); + final OptionalLong intervalOpt = Rounding.getInterval(rounding); + if (intervalOpt.isEmpty()) return; + final long interval = intervalOpt.getAsLong(); + + // afterKey is the last bucket key in previous response, while the bucket key + // is the start of the bucket values, so add the interval + if (aggregationType instanceof CompositeAggregationType && ((CompositeAggregationType) aggregationType).afterKey != -1) { + bounds[0] = ((CompositeAggregationType) aggregationType).afterKey + interval; + } + + final Weight[] filters = FastFilterRewriteHelper.createFilterForAggregations( + context, + interval, + preparedRoundingSupplier.get(), + fieldType.name(), + fieldType, + bounds[0], + bounds[1] + ); + this.setFilters(filters); + } + } + + /** + * Different types have different pre-conditions, filter building logic, etc. + */ + public interface AggregationType { + boolean isRewriteable(Object parent, int subAggLength); + } + + /** + * For date histogram aggregation + */ + public static class DateHistogramAggregationType implements AggregationType { + private final MappedFieldType fieldType; + private final boolean missing; + private final boolean hasScript; + + public DateHistogramAggregationType(MappedFieldType fieldType, boolean missing, boolean hasScript) { + this.fieldType = fieldType; + this.missing = missing; + this.hasScript = hasScript; + } + + @Override + public boolean isRewriteable(Object parent, int subAggLength) { + if (parent == null && subAggLength == 0 && !missing && !hasScript) { + return fieldType != null && fieldType instanceof DateFieldMapper.DateFieldType; + } + return false; + } + + public DateFieldMapper.DateFieldType getFieldType() { + assert fieldType instanceof DateFieldMapper.DateFieldType; + return (DateFieldMapper.DateFieldType) fieldType; + } + } + + /** + * For composite aggregation with date histogram as a source + */ + public static class CompositeAggregationType extends DateHistogramAggregationType { + private final RoundingValuesSource valuesSource; + private long afterKey = -1L; + private final int size; + + public CompositeAggregationType( + CompositeValuesSourceConfig[] sourceConfigs, + CompositeKey rawAfterKey, + List formats, + int size + ) { + super(sourceConfigs[0].fieldType(), sourceConfigs[0].missingBucket(), sourceConfigs[0].hasScript()); + this.valuesSource = (RoundingValuesSource) sourceConfigs[0].valuesSource(); + this.size = size; + if (rawAfterKey != null) { + assert rawAfterKey.size() == 1 && formats.size() == 1; + this.afterKey = formats.get(0).parseLong(rawAfterKey.get(0).toString(), false, () -> { + throw new IllegalArgumentException("now() is not supported in [after] key"); + }); + } + } + + public Rounding getRounding() { + return valuesSource.getRounding(); + } + + public Rounding.Prepared getRoundingPreparer() { + return valuesSource.getPreparedRounding(); + } + } + + public static boolean isCompositeAggRewriteable(CompositeValuesSourceConfig[] sourceConfigs) { + return sourceConfigs.length == 1 && sourceConfigs[0].valuesSource() instanceof RoundingValuesSource; + } + + public static long getBucketOrd(long bucketOrd) { + if (bucketOrd < 0) { // already seen + bucketOrd = -1 - bucketOrd; + } + + return bucketOrd; + } + + /** + * This is executed for each segment by passing the leaf reader context + * + * @param incrementDocCount takes in the bucket key value and the bucket count + */ + public static boolean tryFastFilterAggregation( + final LeafReaderContext ctx, + FastFilterContext fastFilterContext, + final BiConsumer incrementDocCount + ) throws IOException { + if (fastFilterContext == null) return false; + if (fastFilterContext.filters == null) return false; + + final Weight[] filters = fastFilterContext.filters; + final int[] counts = new int[filters.length]; + int i; + for (i = 0; i < filters.length; i++) { + counts[i] = filters[i].count(ctx); + if (counts[i] == -1) { + // Cannot use the optimization if any of the counts + // is -1 indicating the segment might have deleted documents + return false; + } + } + + int s = 0; + int size = Integer.MAX_VALUE; + for (i = 0; i < filters.length; i++) { + if (counts[i] > 0) { + long bucketKey = i; // the index of filters is the key for filters aggregation + if (fastFilterContext.aggregationType instanceof DateHistogramAggregationType) { + final DateFieldMapper.DateFieldType fieldType = ((DateHistogramAggregationType) fastFilterContext.aggregationType) + .getFieldType(); + bucketKey = fieldType.convertNanosToMillis( + NumericUtils.sortableBytesToLong(((PointRangeQuery) filters[i].getQuery()).getLowerPoint(), 0) + ); + if (fastFilterContext.aggregationType instanceof CompositeAggregationType) { + size = ((CompositeAggregationType) fastFilterContext.aggregationType).size; + } + } + incrementDocCount.accept(bucketKey, counts[i]); + s++; + if (s > size) return true; + } + } + + return true; + } +} diff --git a/server/src/main/java/org/opensearch/search/aggregations/bucket/composite/CompositeAggregator.java b/server/src/main/java/org/opensearch/search/aggregations/bucket/composite/CompositeAggregator.java index f2a4d5cd46127..36f94dc833a0b 100644 --- a/server/src/main/java/org/opensearch/search/aggregations/bucket/composite/CompositeAggregator.java +++ b/server/src/main/java/org/opensearch/search/aggregations/bucket/composite/CompositeAggregator.java @@ -55,7 +55,9 @@ import org.apache.lucene.search.Weight; import org.apache.lucene.search.comparators.LongComparator; import org.apache.lucene.util.Bits; +import org.apache.lucene.util.CollectionUtil; import org.apache.lucene.util.RoaringDocIdSet; +import org.opensearch.common.Rounding; import org.opensearch.common.lease.Releasables; import org.opensearch.index.IndexSortConfig; import org.opensearch.lucene.queries.SearchAfterSortedDocQuery; @@ -70,7 +72,9 @@ import org.opensearch.search.aggregations.MultiBucketCollector; import org.opensearch.search.aggregations.MultiBucketConsumerService; import org.opensearch.search.aggregations.bucket.BucketsAggregator; +import org.opensearch.search.aggregations.bucket.FastFilterRewriteHelper; import org.opensearch.search.aggregations.bucket.missing.MissingOrder; +import org.opensearch.search.aggregations.bucket.terms.LongKeyedBucketOrds; import org.opensearch.search.internal.SearchContext; import org.opensearch.search.searchafter.SearchAfterBuilder; import org.opensearch.search.sort.SortAndFormats; @@ -79,6 +83,7 @@ import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; +import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.function.LongUnaryOperator; @@ -110,6 +115,10 @@ final class CompositeAggregator extends BucketsAggregator { private boolean earlyTerminated; + private final FastFilterRewriteHelper.FastFilterContext fastFilterContext; + private LongKeyedBucketOrds bucketOrds = null; + private Rounding.Prepared preparedRounding = null; + CompositeAggregator( String name, AggregatorFactories factories, @@ -153,12 +162,33 @@ final class CompositeAggregator extends BucketsAggregator { } this.queue = new CompositeValuesCollectorQueue(context.bigArrays(), sources, size, rawAfterKey); this.rawAfterKey = rawAfterKey; + + fastFilterContext = new FastFilterRewriteHelper.FastFilterContext(); + if (!FastFilterRewriteHelper.isCompositeAggRewriteable(sourceConfigs)) return; + fastFilterContext.setAggregationType( + new FastFilterRewriteHelper.CompositeAggregationType(sourceConfigs, rawAfterKey, formats, size) + ); + if (fastFilterContext.isRewriteable(parent, subAggregators.length)) { + // bucketOrds is the data structure for saving date histogram results + bucketOrds = LongKeyedBucketOrds.build(context.bigArrays(), CardinalityUpperBound.ONE); + // Currently the filter rewrite is only supported for date histograms + FastFilterRewriteHelper.CompositeAggregationType aggregationType = + (FastFilterRewriteHelper.CompositeAggregationType) fastFilterContext.aggregationType; + preparedRounding = aggregationType.getRoundingPreparer(); + fastFilterContext.buildFastFilter( + context, + fc -> FastFilterRewriteHelper.getAggregationBounds(context, fc.getFieldType().name()), + x -> aggregationType.getRounding(), + () -> preparedRounding + ); + } } @Override protected void doClose() { try { Releasables.close(queue); + Releasables.close(bucketOrds); } finally { Releasables.close(sources); } @@ -186,12 +216,14 @@ public InternalAggregation[] buildAggregations(long[] owningBucketOrds) throws I } int num = Math.min(size, queue.size()); - final InternalComposite.InternalBucket[] buckets = new InternalComposite.InternalBucket[num]; + InternalComposite.InternalBucket[] buckets = new InternalComposite.InternalBucket[num]; + long[] bucketOrdsToCollect = new long[queue.size()]; for (int i = 0; i < queue.size(); i++) { bucketOrdsToCollect[i] = i; } InternalAggregations[] subAggsForBuckets = buildSubAggsForBuckets(bucketOrdsToCollect); + while (queue.size() > 0) { int slot = queue.pop(); CompositeKey key = queue.toCompositeKey(slot); @@ -207,6 +239,43 @@ public InternalAggregation[] buildAggregations(long[] owningBucketOrds) throws I aggs ); } + + // Build results from fast filters optimization + if (bucketOrds != null) { + // CompositeKey is the value of bucket key + final Map bucketMap = new HashMap<>(); + // Some segments may not be optimized, so buckets may contain results from the queue. + for (InternalComposite.InternalBucket internalBucket : buckets) { + bucketMap.put(internalBucket.getRawKey(), internalBucket); + } + // Loop over the buckets in the bucketOrds, and populate the map accordingly + LongKeyedBucketOrds.BucketOrdsEnum ordsEnum = bucketOrds.ordsEnum(0); + while (ordsEnum.next()) { + Long bucketKeyValue = ordsEnum.value(); + CompositeKey key = new CompositeKey(bucketKeyValue); + if (bucketMap.containsKey(key)) { + long docCount = bucketDocCount(ordsEnum.ord()) + bucketMap.get(key).getDocCount(); + bucketMap.get(key).setDocCount(docCount); + } else { + InternalComposite.InternalBucket bucket = new InternalComposite.InternalBucket( + sourceNames, + formats, + key, + reverseMuls, + missingOrders, + bucketDocCount(ordsEnum.ord()), + buildEmptySubAggregations() + ); + bucketMap.put(key, bucket); + } + } + // since a map is not sorted structure, sort it before transform back to buckets + List bucketList = new ArrayList<>(bucketMap.values()); + CollectionUtil.introSort(bucketList, InternalComposite.InternalBucket::compareKey); + buckets = bucketList.subList(0, Math.min(size, bucketList.size())).toArray(InternalComposite.InternalBucket[]::new); + num = buckets.length; + } + CompositeKey lastBucket = num > 0 ? buckets[num - 1].getRawKey() : null; return new InternalAggregation[] { new InternalComposite( @@ -295,7 +364,7 @@ private Sort buildIndexSortPrefix(LeafReaderContext context) throws IOException if (indexSortField.getReverse() != (source.reverseMul == -1)) { if (i == 0) { - // the leading index sort matches the leading source field but the order is reversed + // the leading index sort matches the leading source field, but the order is reversed, // so we don't check the other sources. return new Sort(indexSortField); } @@ -303,8 +372,8 @@ private Sort buildIndexSortPrefix(LeafReaderContext context) throws IOException } sortFields.add(indexSortField); if (sourceConfig.valuesSource() instanceof RoundingValuesSource) { - // the rounding "squashes" many values together, that breaks the ordering of sub-values - // so we ignore subsequent source even if they match the index sort. + // the rounding "squashes" many values together, that breaks the ordering of sub-values, + // so we ignore the subsequent sources even if they match the index sort. break; } } @@ -447,6 +516,16 @@ private void processLeafFromQuery(LeafReaderContext ctx, Sort indexSortPrefix) t @Override protected LeafBucketCollector getLeafCollector(LeafReaderContext ctx, LeafBucketCollector sub) throws IOException { + boolean optimized = FastFilterRewriteHelper.tryFastFilterAggregation( + ctx, + fastFilterContext, + (key, count) -> incrementBucketDocCount( + FastFilterRewriteHelper.getBucketOrd(bucketOrds.add(0, preparedRounding.round(key))), + count + ) + ); + if (optimized) throw new CollectionTerminatedException(); + finishLeaf(); boolean fillDocIdSet = deferredCollectors != NO_OP_COLLECTOR; @@ -476,9 +555,10 @@ protected LeafBucketCollector getLeafCollector(LeafReaderContext ctx, LeafBucket docIdSetBuilder = new RoaringDocIdSet.Builder(ctx.reader().maxDoc()); } if (rawAfterKey != null && sortPrefixLen > 0) { - // We have an after key and index sort is applicable so we jump directly to the doc - // that is after the index sort prefix using the rawAfterKey and we start collecting - // document from there. + // We have an after key and index sort is applicable, so we jump directly to the doc + // after the index sort prefix using the rawAfterKey and we start collecting + // documents from there. + assert indexSortPrefix != null; processLeafFromQuery(ctx, indexSortPrefix); throw new CollectionTerminatedException(); } else { @@ -506,6 +586,8 @@ public void collect(int doc, long bucket) throws IOException { try { long docCount = docCountProvider.getDocCount(doc); if (queue.addIfCompetitive(indexSortPrefix, docCount)) { + // one doc may contain multiple values, we iterate over and collect one by one + // so the same doc can appear multiple times here if (builder != null && lastDoc != doc) { builder.add(doc); lastDoc = doc; @@ -568,7 +650,7 @@ private LeafBucketCollector getSecondPassCollector(LeafBucketCollector subCollec @Override public void collect(int doc, long zeroBucket) throws IOException { assert zeroBucket == 0; - Integer slot = queue.compareCurrent(); + Integer slot = queue.getCurrentSlot(); if (slot != null) { // The candidate key is a top bucket. // We can defer the collection of this document/bucket to the sub collector diff --git a/server/src/main/java/org/opensearch/search/aggregations/bucket/composite/CompositeKey.java b/server/src/main/java/org/opensearch/search/aggregations/bucket/composite/CompositeKey.java index 5ddeb22d33a6f..338ebdc66eef7 100644 --- a/server/src/main/java/org/opensearch/search/aggregations/bucket/composite/CompositeKey.java +++ b/server/src/main/java/org/opensearch/search/aggregations/bucket/composite/CompositeKey.java @@ -44,7 +44,7 @@ * * @opensearch.internal */ -class CompositeKey implements Writeable { +public class CompositeKey implements Writeable { private final Comparable[] values; CompositeKey(Comparable... values) { @@ -64,11 +64,11 @@ Comparable[] values() { return values; } - int size() { + public int size() { return values.length; } - Comparable get(int pos) { + public Comparable get(int pos) { assert pos < values.length; return values[pos]; } diff --git a/server/src/main/java/org/opensearch/search/aggregations/bucket/composite/CompositeValuesCollectorQueue.java b/server/src/main/java/org/opensearch/search/aggregations/bucket/composite/CompositeValuesCollectorQueue.java index 6ee1682a7b196..2c4d451322bca 100644 --- a/server/src/main/java/org/opensearch/search/aggregations/bucket/composite/CompositeValuesCollectorQueue.java +++ b/server/src/main/java/org/opensearch/search/aggregations/bucket/composite/CompositeValuesCollectorQueue.java @@ -47,6 +47,8 @@ /** * A specialized {@link PriorityQueue} implementation for composite buckets. + * Can think of this as a max heap that holds the top small buckets slots in order. + * Each slot holds the values of the composite bucket key it represents. * * @opensearch.internal */ @@ -77,7 +79,7 @@ public int hashCode() { private final BigArrays bigArrays; private final int maxSize; - private final Map map; + private final Map map; // to quickly find the slot for a value private final SingleDimensionValuesSource[] arrays; private LongArray docCounts; @@ -108,7 +110,7 @@ public int hashCode() { @Override protected boolean lessThan(Integer a, Integer b) { - return compare(a, b) > 0; + return compare(a, b) > 0; // max heap } /** @@ -119,10 +121,10 @@ boolean isFull() { } /** - * Compares the current candidate with the values in the queue and returns + * Try to get the slot of the current/candidate values in the queue and returns * the slot if the candidate is already in the queue or null if the candidate is not present. */ - Integer compareCurrent() { + Integer getCurrentSlot() { return map.get(new Slot(CANDIDATE_SLOT)); } @@ -281,32 +283,34 @@ boolean addIfCompetitive(long inc) { */ boolean addIfCompetitive(int indexSortSourcePrefix, long inc) { // checks if the candidate key is competitive - Integer topSlot = compareCurrent(); - if (topSlot != null) { + Integer curSlot = getCurrentSlot(); + if (curSlot != null) { // this key is already in the top N, skip it - docCounts.increment(topSlot, inc); + docCounts.increment(curSlot, inc); return true; } + if (afterKeyIsSet) { int cmp = compareCurrentWithAfter(); if (cmp <= 0) { if (indexSortSourcePrefix < 0 && cmp == indexSortSourcePrefix) { - // the leading index sort is in the reverse order of the leading source + // the leading index sort is and the leading source order are both reversed, // so we can early terminate when we reach a document that is smaller // than the after key (collected on a previous page). throw new CollectionTerminatedException(); } - // key was collected on a previous page, skip it (>= afterKey). + // the key was collected on a previous page, skip it. return false; } } + + // the heap is full, check if the candidate key larger than max heap top if (size() >= maxSize) { - // the tree map is full, check if the candidate key should be kept int cmp = compare(CANDIDATE_SLOT, top()); if (cmp > 0) { if (cmp <= indexSortSourcePrefix) { - // index sort guarantees that there is no key greater or equal than the - // current one in the subsequent documents so we can early terminate. + // index sort guarantees the following documents will have a key larger than the current candidate, + // so we can early terminate. throw new CollectionTerminatedException(); } // the candidate key is not competitive, skip it. @@ -324,7 +328,7 @@ boolean addIfCompetitive(int indexSortSourcePrefix, long inc) { } else { newSlot = size(); } - // move the candidate key to its new slot + // move the candidate key to its new slot by copy its values to the new slot copyCurrent(newSlot, inc); map.put(new Slot(newSlot), newSlot); add(newSlot); diff --git a/server/src/main/java/org/opensearch/search/aggregations/bucket/composite/CompositeValuesSourceConfig.java b/server/src/main/java/org/opensearch/search/aggregations/bucket/composite/CompositeValuesSourceConfig.java index 788a4ddc15374..5289b3a34ab34 100644 --- a/server/src/main/java/org/opensearch/search/aggregations/bucket/composite/CompositeValuesSourceConfig.java +++ b/server/src/main/java/org/opensearch/search/aggregations/bucket/composite/CompositeValuesSourceConfig.java @@ -156,7 +156,7 @@ public MissingOrder missingOrder() { /** * Returns true if the source contains a script that can change the value. */ - protected boolean hasScript() { + public boolean hasScript() { return hasScript; } diff --git a/server/src/main/java/org/opensearch/search/aggregations/bucket/composite/DateHistogramValuesSourceBuilder.java b/server/src/main/java/org/opensearch/search/aggregations/bucket/composite/DateHistogramValuesSourceBuilder.java index 4ec309b819183..cf2f70a548913 100644 --- a/server/src/main/java/org/opensearch/search/aggregations/bucket/composite/DateHistogramValuesSourceBuilder.java +++ b/server/src/main/java/org/opensearch/search/aggregations/bucket/composite/DateHistogramValuesSourceBuilder.java @@ -303,7 +303,7 @@ public static void register(ValuesSourceRegistry.Builder builder) { // TODO once composite is plugged in to the values source registry or at least understands Date values source types use it // here Rounding.Prepared preparedRounding = rounding.prepareForUnknown(); - RoundingValuesSource vs = new RoundingValuesSource(numeric, preparedRounding); + RoundingValuesSource vs = new RoundingValuesSource(numeric, preparedRounding, rounding); // is specified in the builder. final DocValueFormat docValueFormat = format == null ? DocValueFormat.RAW : valuesSourceConfig.format(); final MappedFieldType fieldType = valuesSourceConfig.fieldType(); diff --git a/server/src/main/java/org/opensearch/search/aggregations/bucket/composite/InternalComposite.java b/server/src/main/java/org/opensearch/search/aggregations/bucket/composite/InternalComposite.java index fab9d11dd33e2..e4d9052f5c1a4 100644 --- a/server/src/main/java/org/opensearch/search/aggregations/bucket/composite/InternalComposite.java +++ b/server/src/main/java/org/opensearch/search/aggregations/bucket/composite/InternalComposite.java @@ -351,7 +351,7 @@ public static class InternalBucket extends InternalMultiBucketAggregation.Intern KeyComparable { private final CompositeKey key; - private final long docCount; + private long docCount; private final InternalAggregations aggregations; private final transient int[] reverseMuls; private final transient MissingOrder[] missingOrders; @@ -448,6 +448,10 @@ public long getDocCount() { return docCount; } + public void setDocCount(long docCount) { + this.docCount = docCount; + } + @Override public Aggregations getAggregations() { return aggregations; diff --git a/server/src/main/java/org/opensearch/search/aggregations/bucket/composite/PointsSortedDocsProducer.java b/server/src/main/java/org/opensearch/search/aggregations/bucket/composite/PointsSortedDocsProducer.java index 3d6730203b6ae..dc130eb54c0ea 100644 --- a/server/src/main/java/org/opensearch/search/aggregations/bucket/composite/PointsSortedDocsProducer.java +++ b/server/src/main/java/org/opensearch/search/aggregations/bucket/composite/PointsSortedDocsProducer.java @@ -68,6 +68,7 @@ DocIdSet processLeaf(Query query, CompositeValuesCollectorQueue queue, LeafReade // no value for the field return DocIdSet.EMPTY; } + long lowerBucket = Long.MIN_VALUE; Comparable lowerValue = queue.getLowerValueLeadSource(); if (lowerValue != null) { @@ -76,7 +77,6 @@ DocIdSet processLeaf(Query query, CompositeValuesCollectorQueue queue, LeafReade } lowerBucket = (Long) lowerValue; } - long upperBucket = Long.MAX_VALUE; Comparable upperValue = queue.getUpperValueLeadSource(); if (upperValue != null) { @@ -85,6 +85,7 @@ DocIdSet processLeaf(Query query, CompositeValuesCollectorQueue queue, LeafReade } upperBucket = (Long) upperValue; } + DocIdSetBuilder builder = fillDocIdSet ? new DocIdSetBuilder(context.reader().maxDoc(), values, field) : null; Visitor visitor = new Visitor(context, queue, builder, values.getBytesPerDimension(), lowerBucket, upperBucket); try { @@ -146,6 +147,7 @@ public void visit(int docID, byte[] packedValue) throws IOException { } long bucket = bucketFunction.applyAsLong(packedValue); + // process previous bucket when new bucket appears if (first == false && bucket != lastBucket) { final DocIdSet docIdSet = bucketDocsBuilder.build(); if (processBucket(queue, context, docIdSet.iterator(), lastBucket, builder) && @@ -182,13 +184,13 @@ public PointValues.Relation compare(byte[] minPackedValue, byte[] maxPackedValue return PointValues.Relation.CELL_OUTSIDE_QUERY; } } - if (upperBucket != Long.MAX_VALUE) { long minBucket = bucketFunction.applyAsLong(minPackedValue); if (minBucket > upperBucket) { return PointValues.Relation.CELL_OUTSIDE_QUERY; } } + return PointValues.Relation.CELL_CROSSES_QUERY; } diff --git a/server/src/main/java/org/opensearch/search/aggregations/bucket/composite/RoundingValuesSource.java b/server/src/main/java/org/opensearch/search/aggregations/bucket/composite/RoundingValuesSource.java index 89315724ff9ed..3f5cf919f1755 100644 --- a/server/src/main/java/org/opensearch/search/aggregations/bucket/composite/RoundingValuesSource.java +++ b/server/src/main/java/org/opensearch/search/aggregations/bucket/composite/RoundingValuesSource.java @@ -47,17 +47,19 @@ * * @opensearch.internal */ -class RoundingValuesSource extends ValuesSource.Numeric { +public class RoundingValuesSource extends ValuesSource.Numeric { private final ValuesSource.Numeric vs; - private final Rounding.Prepared rounding; + private final Rounding.Prepared preparedRounding; + private final Rounding rounding; /** - * - * @param vs The original values source - * @param rounding How to round the values + * @param vs The original values source + * @param preparedRounding How to round the values + * @param rounding The rounding strategy */ - RoundingValuesSource(Numeric vs, Rounding.Prepared rounding) { + RoundingValuesSource(Numeric vs, Rounding.Prepared preparedRounding, Rounding rounding) { this.vs = vs; + this.preparedRounding = preparedRounding; this.rounding = rounding; } @@ -71,8 +73,16 @@ public boolean isBigInteger() { return false; } + public Rounding.Prepared getPreparedRounding() { + return preparedRounding; + } + + public Rounding getRounding() { + return rounding; + } + public long round(long value) { - return rounding.round(value); + return preparedRounding.round(value); } @Override diff --git a/server/src/main/java/org/opensearch/search/aggregations/bucket/histogram/AutoDateHistogramAggregator.java b/server/src/main/java/org/opensearch/search/aggregations/bucket/histogram/AutoDateHistogramAggregator.java index a71c15d551927..0ea820abbedf4 100644 --- a/server/src/main/java/org/opensearch/search/aggregations/bucket/histogram/AutoDateHistogramAggregator.java +++ b/server/src/main/java/org/opensearch/search/aggregations/bucket/histogram/AutoDateHistogramAggregator.java @@ -33,8 +33,8 @@ import org.apache.lucene.index.LeafReaderContext; import org.apache.lucene.index.SortedNumericDocValues; +import org.apache.lucene.search.CollectionTerminatedException; import org.apache.lucene.search.ScoreMode; -import org.apache.lucene.search.Weight; import org.apache.lucene.util.CollectionUtil; import org.opensearch.common.Rounding; import org.opensearch.common.Rounding.Prepared; @@ -42,7 +42,6 @@ import org.opensearch.common.util.IntArray; import org.opensearch.common.util.LongArray; import org.opensearch.core.common.util.ByteArray; -import org.opensearch.index.mapper.DateFieldMapper; import org.opensearch.search.DocValueFormat; import org.opensearch.search.aggregations.Aggregator; import org.opensearch.search.aggregations.AggregatorFactories; @@ -53,6 +52,7 @@ import org.opensearch.search.aggregations.LeafBucketCollectorBase; import org.opensearch.search.aggregations.bucket.DeferableBucketAggregator; import org.opensearch.search.aggregations.bucket.DeferringBucketCollector; +import org.opensearch.search.aggregations.bucket.FastFilterRewriteHelper; import org.opensearch.search.aggregations.bucket.MergingBucketsDeferringCollector; import org.opensearch.search.aggregations.bucket.histogram.AutoDateHistogramAggregationBuilder.RoundingInfo; import org.opensearch.search.aggregations.bucket.terms.LongKeyedBucketOrds; @@ -127,14 +127,14 @@ static AutoDateHistogramAggregator build( * {@link MergingBucketsDeferringCollector#mergeBuckets(long[])}. */ private MergingBucketsDeferringCollector deferringCollector; - private final Weight[] filters; - private final DateFieldMapper.DateFieldType fieldType; protected final RoundingInfo[] roundingInfos; protected final int targetBuckets; protected int roundingIdx; protected Rounding.Prepared preparedRounding; + private final FastFilterRewriteHelper.FastFilterContext fastFilterContext; + private AutoDateHistogramAggregator( String name, AggregatorFactories factories, @@ -156,23 +156,23 @@ private AutoDateHistogramAggregator( this.roundingPreparer = roundingPreparer; this.preparedRounding = prepareRounding(0); - FilterRewriteHelper.FilterContext filterContext = FilterRewriteHelper.buildFastFilterContext( - parent(), - subAggregators.length, - context, - b -> getMinimumRounding(b[0], b[1]), - // Passing prepared rounding as supplier to ensure the correct prepared - // rounding is set as it is done during getMinimumRounding - () -> preparedRounding, - valuesSourceConfig, - fc -> FilterRewriteHelper.getAggregationBounds(context, fc.field()) + fastFilterContext = new FastFilterRewriteHelper.FastFilterContext(); + fastFilterContext.setAggregationType( + new FastFilterRewriteHelper.DateHistogramAggregationType( + valuesSourceConfig.fieldType(), + valuesSourceConfig.missing() != null, + valuesSourceConfig.script() != null + ) ); - if (filterContext != null) { - fieldType = filterContext.fieldType; - filters = filterContext.filters; - } else { - fieldType = null; - filters = null; + if (fastFilterContext.isRewriteable(parent, subAggregators.length)) { + fastFilterContext.buildFastFilter( + context, + fc -> FastFilterRewriteHelper.getAggregationBounds(context, fc.getFieldType().name()), + b -> getMinimumRounding(b[0], b[1]), + // Passing prepared rounding as supplier to ensure the correct prepared + // rounding is set as it is done during getMinimumRounding + () -> preparedRounding + ); } } @@ -226,28 +226,21 @@ public final LeafBucketCollector getLeafCollector(LeafReaderContext ctx, LeafBuc return LeafBucketCollector.NO_OP_COLLECTOR; } + boolean optimized = FastFilterRewriteHelper.tryFastFilterAggregation( + ctx, + fastFilterContext, + (key, count) -> incrementBucketDocCount( + FastFilterRewriteHelper.getBucketOrd(getBucketOrds().add(0, preparedRounding.round(key))), + count + ) + ); + if (optimized) throw new CollectionTerminatedException(); + final SortedNumericDocValues values = valuesSource.longValues(ctx); final LeafBucketCollector iteratingCollector = getLeafCollector(values, sub); - - // Need to be declared as final and array for usage within the - // LeafBucketCollectorBase subclass below - final boolean[] useOpt = new boolean[1]; - useOpt[0] = filters != null; - return new LeafBucketCollectorBase(sub, values) { @Override public void collect(int doc, long owningBucketOrd) throws IOException { - // Try fast filter aggregation if the filters have been created - // Skip if tried before and gave incorrect/incomplete results - if (useOpt[0]) { - useOpt[0] = FilterRewriteHelper.tryFastFilterAggregation(ctx, filters, fieldType, (key, count) -> { - incrementBucketDocCount( - FilterRewriteHelper.getBucketOrd(getBucketOrds().add(owningBucketOrd, preparedRounding.round(key))), - count - ); - }); - } - iteratingCollector.collect(doc, owningBucketOrd); } }; diff --git a/server/src/main/java/org/opensearch/search/aggregations/bucket/histogram/DateHistogramAggregator.java b/server/src/main/java/org/opensearch/search/aggregations/bucket/histogram/DateHistogramAggregator.java index 8437e1dce9fe0..b95bd093b82a6 100644 --- a/server/src/main/java/org/opensearch/search/aggregations/bucket/histogram/DateHistogramAggregator.java +++ b/server/src/main/java/org/opensearch/search/aggregations/bucket/histogram/DateHistogramAggregator.java @@ -33,13 +33,12 @@ import org.apache.lucene.index.LeafReaderContext; import org.apache.lucene.index.SortedNumericDocValues; +import org.apache.lucene.search.CollectionTerminatedException; import org.apache.lucene.search.ScoreMode; -import org.apache.lucene.search.Weight; import org.apache.lucene.util.CollectionUtil; import org.opensearch.common.Nullable; import org.opensearch.common.Rounding; import org.opensearch.common.lease.Releasables; -import org.opensearch.index.mapper.DateFieldMapper; import org.opensearch.search.DocValueFormat; import org.opensearch.search.aggregations.Aggregator; import org.opensearch.search.aggregations.AggregatorFactories; @@ -49,8 +48,8 @@ import org.opensearch.search.aggregations.LeafBucketCollector; import org.opensearch.search.aggregations.LeafBucketCollectorBase; import org.opensearch.search.aggregations.bucket.BucketsAggregator; +import org.opensearch.search.aggregations.bucket.FastFilterRewriteHelper; import org.opensearch.search.aggregations.bucket.terms.LongKeyedBucketOrds; -import org.opensearch.search.aggregations.support.FieldContext; import org.opensearch.search.aggregations.support.ValuesSource; import org.opensearch.search.aggregations.support.ValuesSourceConfig; import org.opensearch.search.internal.SearchContext; @@ -81,9 +80,9 @@ class DateHistogramAggregator extends BucketsAggregator implements SizedBucketAg private final long minDocCount; private final LongBounds extendedBounds; private final LongBounds hardBounds; - private final Weight[] filters; private final LongKeyedBucketOrds bucketOrds; - private final DateFieldMapper.DateFieldType fieldType; + + private final FastFilterRewriteHelper.FastFilterContext fastFilterContext; DateHistogramAggregator( String name, @@ -116,26 +115,21 @@ class DateHistogramAggregator extends BucketsAggregator implements SizedBucketAg bucketOrds = LongKeyedBucketOrds.build(context.bigArrays(), cardinality); - FilterRewriteHelper.FilterContext filterContext = FilterRewriteHelper.buildFastFilterContext( - parent, - subAggregators.length, - context, - x -> rounding, - () -> preparedRounding, - valuesSourceConfig, - this::computeBounds + fastFilterContext = new FastFilterRewriteHelper.FastFilterContext(); + fastFilterContext.setAggregationType( + new FastFilterRewriteHelper.DateHistogramAggregationType( + valuesSourceConfig.fieldType(), + valuesSourceConfig.missing() != null, + valuesSourceConfig.script() != null + ) ); - if (filterContext != null) { - fieldType = filterContext.fieldType; - filters = filterContext.filters; - } else { - filters = null; - fieldType = null; + if (fastFilterContext.isRewriteable(parent, subAggregators.length)) { + fastFilterContext.buildFastFilter(context, this::computeBounds, x -> rounding, () -> preparedRounding); } } - private long[] computeBounds(final FieldContext fieldContext) throws IOException { - final long[] bounds = FilterRewriteHelper.getAggregationBounds(context, fieldContext.field()); + private long[] computeBounds(final FastFilterRewriteHelper.DateHistogramAggregationType fieldContext) throws IOException { + final long[] bounds = FastFilterRewriteHelper.getAggregationBounds(context, fieldContext.getFieldType().name()); if (bounds != null) { // Update min/max limit if user specified any hard bounds if (hardBounds != null) { @@ -160,26 +154,20 @@ public LeafBucketCollector getLeafCollector(LeafReaderContext ctx, LeafBucketCol return LeafBucketCollector.NO_OP_COLLECTOR; } - // Need to be declared as final and array for usage within the - // LeafBucketCollectorBase subclass below - final boolean[] useOpt = new boolean[1]; - useOpt[0] = filters != null; + boolean optimized = FastFilterRewriteHelper.tryFastFilterAggregation( + ctx, + fastFilterContext, + (key, count) -> incrementBucketDocCount( + FastFilterRewriteHelper.getBucketOrd(bucketOrds.add(0, preparedRounding.round(key))), + count + ) + ); + if (optimized) throw new CollectionTerminatedException(); SortedNumericDocValues values = valuesSource.longValues(ctx); return new LeafBucketCollectorBase(sub, values) { @Override public void collect(int doc, long owningBucketOrd) throws IOException { - // Try fast filter aggregation if the filters have been created - // Skip if tried before and gave incorrect/incomplete results - if (useOpt[0]) { - useOpt[0] = FilterRewriteHelper.tryFastFilterAggregation(ctx, filters, fieldType, (key, count) -> { - incrementBucketDocCount( - FilterRewriteHelper.getBucketOrd(bucketOrds.add(owningBucketOrd, preparedRounding.round(key))), - count - ); - }); - } - if (values.advanceExact(doc)) { int valuesCount = values.docValueCount(); diff --git a/server/src/main/java/org/opensearch/search/aggregations/bucket/histogram/FilterRewriteHelper.java b/server/src/main/java/org/opensearch/search/aggregations/bucket/histogram/FilterRewriteHelper.java deleted file mode 100644 index 29cecd5b382cd..0000000000000 --- a/server/src/main/java/org/opensearch/search/aggregations/bucket/histogram/FilterRewriteHelper.java +++ /dev/null @@ -1,259 +0,0 @@ -/* - * SPDX-License-Identifier: Apache-2.0 - * - * The OpenSearch Contributors require contributions made to - * this file be licensed under the Apache-2.0 license or a - * compatible open source license. - */ - -package org.opensearch.search.aggregations.bucket.histogram; - -import org.apache.lucene.index.LeafReaderContext; -import org.apache.lucene.index.PointValues; -import org.apache.lucene.search.CollectionTerminatedException; -import org.apache.lucene.search.ConstantScoreQuery; -import org.apache.lucene.search.IndexOrDocValuesQuery; -import org.apache.lucene.search.MatchAllDocsQuery; -import org.apache.lucene.search.PointRangeQuery; -import org.apache.lucene.search.Query; -import org.apache.lucene.search.ScoreMode; -import org.apache.lucene.search.Weight; -import org.apache.lucene.util.NumericUtils; -import org.opensearch.common.CheckedFunction; -import org.opensearch.common.Rounding; -import org.opensearch.common.lucene.search.function.FunctionScoreQuery; -import org.opensearch.index.mapper.DateFieldMapper; -import org.opensearch.index.query.DateRangeIncludingNowQuery; -import org.opensearch.search.aggregations.support.FieldContext; -import org.opensearch.search.aggregations.support.ValuesSourceConfig; -import org.opensearch.search.internal.SearchContext; - -import java.io.IOException; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.OptionalLong; -import java.util.function.BiConsumer; -import java.util.function.Function; -import java.util.function.Supplier; - -/** - * Helpers functions to rewrite and optimize aggregations using - * range filter queries - * - * @opensearch.internal - */ -public class FilterRewriteHelper { - - static class FilterContext { - final DateFieldMapper.DateFieldType fieldType; - final Weight[] filters; - - public FilterContext(DateFieldMapper.DateFieldType fieldType, Weight[] filters) { - this.fieldType = fieldType; - this.filters = filters; - } - } - - private static final int MAX_NUM_FILTER_BUCKETS = 1024; - private static final Map, Function> queryWrappers; - - // Initialize the wrappers map for unwrapping the query - static { - queryWrappers = new HashMap<>(); - queryWrappers.put(ConstantScoreQuery.class, q -> ((ConstantScoreQuery) q).getQuery()); - queryWrappers.put(FunctionScoreQuery.class, q -> ((FunctionScoreQuery) q).getSubQuery()); - queryWrappers.put(DateRangeIncludingNowQuery.class, q -> ((DateRangeIncludingNowQuery) q).getQuery()); - queryWrappers.put(IndexOrDocValuesQuery.class, q -> ((IndexOrDocValuesQuery) q).getIndexQuery()); - } - - /** - * Recursively unwraps query into the concrete form - * for applying the optimization - */ - private static Query unwrapIntoConcreteQuery(Query query) { - while (queryWrappers.containsKey(query.getClass())) { - query = queryWrappers.get(query.getClass()).apply(query); - } - - return query; - } - - /** - * Finds the min and max bounds for segments within the passed search context - */ - private static long[] getIndexBoundsFromLeaves(final SearchContext context, final String fieldName) throws IOException { - final List leaves = context.searcher().getIndexReader().leaves(); - long min = Long.MAX_VALUE, max = Long.MIN_VALUE; - // Since the query does not specify bounds for aggregation, we can - // build the global min/max from local min/max within each segment - for (LeafReaderContext leaf : leaves) { - final PointValues values = leaf.reader().getPointValues(fieldName); - if (values != null) { - min = Math.min(min, NumericUtils.sortableBytesToLong(values.getMinPackedValue(), 0)); - max = Math.max(max, NumericUtils.sortableBytesToLong(values.getMaxPackedValue(), 0)); - } - } - - if (min == Long.MAX_VALUE || max == Long.MIN_VALUE) return null; - - return new long[] { min, max }; - } - - static long[] getAggregationBounds(final SearchContext context, final String fieldName) throws IOException { - final Query cq = unwrapIntoConcreteQuery(context.query()); - final long[] indexBounds = getIndexBoundsFromLeaves(context, fieldName); - if (cq instanceof PointRangeQuery) { - final PointRangeQuery prq = (PointRangeQuery) cq; - // Ensure that the query and aggregation are on the same field - if (prq.getField().equals(fieldName)) { - return new long[] { - // Minimum bound for aggregation is the max between query and global - Math.max(NumericUtils.sortableBytesToLong(prq.getLowerPoint(), 0), indexBounds[0]), - // Maximum bound for aggregation is the min between query and global - Math.min(NumericUtils.sortableBytesToLong(prq.getUpperPoint(), 0), indexBounds[1]) }; - } - } else if (cq instanceof MatchAllDocsQuery) { - return indexBounds; - } - - return null; - } - - /** - * Creates the range query filters for aggregations using the interval, min/max - * bounds and the rounding values - */ - private static Weight[] createFilterForAggregations( - final SearchContext context, - final Rounding rounding, - final Rounding.Prepared preparedRounding, - final String field, - final DateFieldMapper.DateFieldType fieldType, - final long low, - final long high - ) throws IOException { - final OptionalLong intervalOpt = Rounding.getInterval(rounding); - if (intervalOpt.isEmpty()) { - return null; - } - - final long interval = intervalOpt.getAsLong(); - // Calculate the number of buckets using range and interval - long roundedLow = preparedRounding.round(fieldType.convertNanosToMillis(low)); - long prevRounded = roundedLow; - int bucketCount = 0; - while (roundedLow <= fieldType.convertNanosToMillis(high)) { - bucketCount++; - // Below rounding is needed as the interval could return in - // non-rounded values for something like calendar month - roundedLow = preparedRounding.round(roundedLow + interval); - if (prevRounded == roundedLow) break; - prevRounded = roundedLow; - } - - Weight[] filters = null; - if (bucketCount > 0 && bucketCount <= MAX_NUM_FILTER_BUCKETS) { - int i = 0; - filters = new Weight[bucketCount]; - roundedLow = preparedRounding.round(fieldType.convertNanosToMillis(low)); - while (i < bucketCount) { - // Calculate the lower bucket bound - final byte[] lower = new byte[8]; - NumericUtils.longToSortableBytes(i == 0 ? low : fieldType.convertRoundedMillisToNanos(roundedLow), lower, 0); - // Calculate the upper bucket bound - final byte[] upper = new byte[8]; - roundedLow = preparedRounding.round(roundedLow + interval); - // Subtract -1 if the minimum is roundedLow as roundedLow itself - // is included in the next bucket - NumericUtils.longToSortableBytes( - i + 1 == bucketCount ? high : fieldType.convertRoundedMillisToNanos(roundedLow) - 1, - upper, - 0 - ); - filters[i++] = context.searcher().createWeight(new PointRangeQuery(field, lower, upper, 1) { - @Override - protected String toString(int dimension, byte[] value) { - return null; - } - }, ScoreMode.COMPLETE_NO_SCORES, 1); - } - } - - return filters; - } - - static FilterContext buildFastFilterContext( - final Object parent, - final int subAggLength, - SearchContext context, - Function roundingFunction, - Supplier preparedRoundingSupplier, - ValuesSourceConfig valuesSourceConfig, - CheckedFunction computeBounds - ) throws IOException { - // Create the filters for fast aggregation only if the query is instance - // of point range query and there aren't any parent/sub aggregations - if (parent == null && subAggLength == 0 && valuesSourceConfig.missing() == null && valuesSourceConfig.script() == null) { - final FieldContext fieldContext = valuesSourceConfig.fieldContext(); - if (fieldContext != null) { - final String fieldName = fieldContext.field(); - final long[] bounds = computeBounds.apply(fieldContext); - if (bounds != null) { - assert fieldContext.fieldType() instanceof DateFieldMapper.DateFieldType; - final DateFieldMapper.DateFieldType fieldType = (DateFieldMapper.DateFieldType) fieldContext.fieldType(); - final Rounding rounding = roundingFunction.apply(bounds); - final Weight[] filters = FilterRewriteHelper.createFilterForAggregations( - context, - rounding, - preparedRoundingSupplier.get(), - fieldName, - fieldType, - bounds[0], - bounds[1] - ); - return new FilterContext(fieldType, filters); - } - } - } - return null; - } - - static long getBucketOrd(long bucketOrd) { - if (bucketOrd < 0) { // already seen - bucketOrd = -1 - bucketOrd; - } - - return bucketOrd; - } - - static boolean tryFastFilterAggregation( - final LeafReaderContext ctx, - final Weight[] filters, - final DateFieldMapper.DateFieldType fieldType, - final BiConsumer incrementDocCount - ) throws IOException { - final int[] counts = new int[filters.length]; - int i; - for (i = 0; i < filters.length; i++) { - counts[i] = filters[i].count(ctx); - if (counts[i] == -1) { - // Cannot use the optimization if any of the counts - // is -1 indicating the segment might have deleted documents - return false; - } - } - - for (i = 0; i < filters.length; i++) { - if (counts[i] > 0) { - incrementDocCount.accept( - fieldType.convertNanosToMillis( - NumericUtils.sortableBytesToLong(((PointRangeQuery) filters[i].getQuery()).getLowerPoint(), 0) - ), - counts[i] - ); - } - } - throw new CollectionTerminatedException(); - } -} diff --git a/server/src/test/java/org/opensearch/search/aggregations/bucket/composite/CompositeAggregatorTests.java b/server/src/test/java/org/opensearch/search/aggregations/bucket/composite/CompositeAggregatorTests.java index eabc4b7764eed..b581e552fec4f 100644 --- a/server/src/test/java/org/opensearch/search/aggregations/bucket/composite/CompositeAggregatorTests.java +++ b/server/src/test/java/org/opensearch/search/aggregations/bucket/composite/CompositeAggregatorTests.java @@ -1279,7 +1279,7 @@ public void testWithDateHistogram() throws IOException { }, (result) -> { assertEquals(3, result.getBuckets().size()); - assertEquals("{date=1508457600000}", result.afterKey().toString()); + assertEquals("{date=1508457600000}", result.afterKey().toString()); // 2017-10-20T00:00:00 assertEquals("{date=1474329600000}", result.getBuckets().get(0).getKeyAsString()); assertEquals(2L, result.getBuckets().get(0).getDocCount()); assertEquals("{date=1508371200000}", result.getBuckets().get(1).getKeyAsString()); @@ -1300,9 +1300,8 @@ public void testWithDateHistogram() throws IOException { DateHistogramValuesSourceBuilder histo = new DateHistogramValuesSourceBuilder("date").field("date") .calendarInterval(DateHistogramInterval.days(1)); return new CompositeAggregationBuilder("name", Collections.singletonList(histo)).aggregateAfter( - createAfterKey("date", 1474329600000L) + createAfterKey("date", 1474329600000L) // 2016-09-20T00:00:00 ); - }, (result) -> { assertEquals(2, result.getBuckets().size()); @@ -2242,21 +2241,20 @@ private , V extends Comparable> void testRandomTerms( Function transformKey ) throws IOException { int numTerms = randomIntBetween(10, 500); - List terms = new ArrayList<>(); + List terms = new ArrayList<>(); // possible values for the terms for (int i = 0; i < numTerms; i++) { terms.add(randomSupplier.get()); } int numDocs = randomIntBetween(100, 200); List>> dataset = new ArrayList<>(); - - Set valuesSet = new HashSet<>(); - Map, AtomicLong> expectedDocCounts = new HashMap<>(); + Set valuesSet = new HashSet<>(); // how many different values + Map, AtomicLong> expectedDocCounts = new HashMap<>(); // how many docs for each value for (int i = 0; i < numDocs; i++) { int numValues = randomIntBetween(1, 5); Set values = new HashSet<>(); for (int j = 0; j < numValues; j++) { int rand = randomIntBetween(0, terms.size() - 1); - if (values.add(terms.get(rand))) { + if (values.add(terms.get(rand))) { // values are unique for one doc AtomicLong count = expectedDocCounts.computeIfAbsent(terms.get(rand), (k) -> new AtomicLong(0)); count.incrementAndGet(); valuesSet.add(terms.get(rand)); @@ -2264,9 +2262,8 @@ private , V extends Comparable> void testRandomTerms( } dataset.add(Collections.singletonMap(field, new ArrayList<>(values))); } - List expected = new ArrayList<>(valuesSet); + List expected = new ArrayList<>(valuesSet); // how many buckets expected Collections.sort(expected); - List> seen = new ArrayList<>(); AtomicBoolean finish = new AtomicBoolean(false); int size = randomIntBetween(1, expected.size());