diff --git a/server/src/main/java/org/opensearch/search/aggregations/bucket/FilterRewriteHelper.java b/server/src/main/java/org/opensearch/search/aggregations/bucket/FilterRewriteHelper.java index 7d248589aef8e..57afe072c865c 100644 --- a/server/src/main/java/org/opensearch/search/aggregations/bucket/FilterRewriteHelper.java +++ b/server/src/main/java/org/opensearch/search/aggregations/bucket/FilterRewriteHelper.java @@ -165,30 +165,22 @@ private static Weight[] createFilterForAggregations( while (i < bucketCount) { // Calculate the lower bucket bound final byte[] lower = new byte[8]; - NumericUtils.longToSortableBytes( - i == 0 ? low : fieldType.convertRoundedMillisToNanos(roundedLow), - lower, 0 - ); + NumericUtils.longToSortableBytes(i == 0 ? low : fieldType.convertRoundedMillisToNanos(roundedLow), lower, 0); // Calculate the upper bucket bound roundedLow = preparedRounding.round(roundedLow + interval); final byte[] upper = new byte[8]; - NumericUtils.longToSortableBytes( - i + 1 == bucketCount ? high : - // Subtract -1 if the minimum is roundedLow as roundedLow itself - // is included in the next bucket - fieldType.convertRoundedMillisToNanos(roundedLow) - 1, - upper, - 0 - ); - - filters[i++] = context.searcher().createWeight( - new PointRangeQuery(field, lower, upper, 1) { - @Override - protected String toString(int dimension, byte[] value) { - return null; - } - }, ScoreMode.COMPLETE_NO_SCORES, 1); + NumericUtils.longToSortableBytes(i + 1 == bucketCount ? high : + // Subtract -1 if the minimum is roundedLow as roundedLow itself + // is included in the next bucket + fieldType.convertRoundedMillisToNanos(roundedLow) - 1, upper, 0); + + filters[i++] = context.searcher().createWeight(new PointRangeQuery(field, lower, upper, 1) { + @Override + protected String toString(int dimension, byte[] value) { + return null; + } + }, ScoreMode.COMPLETE_NO_SCORES, 1); } } @@ -305,8 +297,7 @@ public static boolean tryFastFilterAggregation( if (counts[i] > 0) { incrementDocCount.accept( fieldType.convertNanosToMillis( - NumericUtils.sortableBytesToLong( - ((PointRangeQuery) filters[i].getQuery()).getLowerPoint(), 0) + NumericUtils.sortableBytesToLong(((PointRangeQuery) filters[i].getQuery()).getLowerPoint(), 0) ), counts[i] ); diff --git a/server/src/main/java/org/opensearch/search/aggregations/bucket/composite/CompositeAggregator.java b/server/src/main/java/org/opensearch/search/aggregations/bucket/composite/CompositeAggregator.java index a58945118bbf3..53f40d0f91698 100644 --- a/server/src/main/java/org/opensearch/search/aggregations/bucket/composite/CompositeAggregator.java +++ b/server/src/main/java/org/opensearch/search/aggregations/bucket/composite/CompositeAggregator.java @@ -73,12 +73,12 @@ import org.opensearch.search.aggregations.MultiBucketCollector; import org.opensearch.search.aggregations.MultiBucketConsumerService; import org.opensearch.search.aggregations.bucket.BucketsAggregator; +import org.opensearch.search.aggregations.bucket.FilterRewriteHelper; import org.opensearch.search.aggregations.bucket.missing.MissingOrder; import org.opensearch.search.aggregations.bucket.terms.LongKeyedBucketOrds; import org.opensearch.search.internal.SearchContext; import org.opensearch.search.searchafter.SearchAfterBuilder; import org.opensearch.search.sort.SortAndFormats; -import org.opensearch.search.aggregations.bucket.FilterRewriteHelper; import java.io.IOException; import java.util.ArrayList; @@ -519,7 +519,10 @@ private void processLeafFromQuery(LeafReaderContext ctx, Sort indexSortPrefix) t if (scorer != null) { DocIdSetIterator docIt = scorer.iterator(); - final LeafBucketCollector inner = queue.getLeafCollector(ctx, getFirstPassCollector(docIdSetBuilder, indexSortPrefix.getSort().length)); + final LeafBucketCollector inner = queue.getLeafCollector( + ctx, + getFirstPassCollector(docIdSetBuilder, indexSortPrefix.getSort().length) + ); inner.setScorer(scorer); final Bits liveDocs = ctx.reader().getLiveDocs(); @@ -533,15 +536,9 @@ private void processLeafFromQuery(LeafReaderContext ctx, Sort indexSortPrefix) t @Override protected LeafBucketCollector getLeafCollector(LeafReaderContext ctx, LeafBucketCollector sub) throws IOException { - boolean optimized = FilterRewriteHelper.tryFastFilterAggregation(ctx, filters, fieldType, - (key, count) -> { - incrementBucketDocCount( - FilterRewriteHelper.getBucketOrd( - bucketOrds.add(0, preparedRounding.round(key)) - ), - count - ); - }, size); + boolean optimized = FilterRewriteHelper.tryFastFilterAggregation(ctx, filters, fieldType, (key, count) -> { + incrementBucketDocCount(FilterRewriteHelper.getBucketOrd(bucketOrds.add(0, preparedRounding.round(key))), count); + }, size); if (optimized) throw new CollectionTerminatedException(); finishLeaf(); @@ -673,7 +670,8 @@ private LeafBucketCollector getSecondPassCollector(LeafBucketCollector subCollec @Override public void collect(int doc, long zeroBucket) throws IOException { assert zeroBucket == 0; - Integer slot = queue.compareCurrent(); // TODO reading queue will make sure current value presents through collection mechanism + Integer slot = queue.compareCurrent(); // TODO reading queue will make sure current value presents through collection + // mechanism if (slot != null) { // The candidate key is a top bucket. // We can defer the collection of this document/bucket to the sub collector diff --git a/server/src/main/java/org/opensearch/search/aggregations/bucket/composite/CompositeValuesCollectorQueue.java b/server/src/main/java/org/opensearch/search/aggregations/bucket/composite/CompositeValuesCollectorQueue.java index ae74d5c72df88..e078ffca1597c 100644 --- a/server/src/main/java/org/opensearch/search/aggregations/bucket/composite/CompositeValuesCollectorQueue.java +++ b/server/src/main/java/org/opensearch/search/aggregations/bucket/composite/CompositeValuesCollectorQueue.java @@ -204,10 +204,7 @@ boolean equals(int slot1, int slot2) { int hashCode(int slot) { int result = 1; for (int i = 0; i < arrays.length; i++) { - result = 31 * result + - (slot == CANDIDATE_SLOT ? - arrays[i].hashCodeCurrent() : - arrays[i].hashCode(slot)); + result = 31 * result + (slot == CANDIDATE_SLOT ? arrays[i].hashCodeCurrent() : arrays[i].hashCode(slot)); } return result; } @@ -310,12 +307,14 @@ boolean addIfCompetitive(int indexSortSourcePrefix, long inc) { // TODO reading } if (size() >= maxSize) { // TODO reading when queue is full, can check competitiveness - // the tree map is full, check if the candidate key should be kept // TODO reading queue contain topN largest composite key/bucket/slot + // the tree map is full, check if the candidate key should be kept // TODO reading queue contain topN largest composite + // key/bucket/slot int cmp = compare(CANDIDATE_SLOT, top()); if (cmp > 0) { // TODO reading current large than queue if (cmp <= indexSortSourcePrefix) { // TODO reading the way of comparing current and queue uses sorted fields // index sort guarantees that there is no key greater or equal than the - // current one in the subsequent documents so we can early terminate. // TODO reading how to get the topN smallest items using heap? + // current one in the subsequent documents so we can early terminate. // TODO reading how to get the topN smallest items + // using heap? throw new CollectionTerminatedException(); } // the candidate key is not competitive, skip it. diff --git a/server/src/main/java/org/opensearch/search/aggregations/bucket/composite/DateHistogramValuesSourceBuilder.java b/server/src/main/java/org/opensearch/search/aggregations/bucket/composite/DateHistogramValuesSourceBuilder.java index 3fdd0b3c12a8e..3926ce9bbecb7 100644 --- a/server/src/main/java/org/opensearch/search/aggregations/bucket/composite/DateHistogramValuesSourceBuilder.java +++ b/server/src/main/java/org/opensearch/search/aggregations/bucket/composite/DateHistogramValuesSourceBuilder.java @@ -316,8 +316,7 @@ public static void register(ValuesSourceRegistry.Builder builder) { IndexReader reader, int size, LongConsumer addRequestCircuitBreakerBytes, - CompositeValuesSourceConfig compositeValuesSourceConfig - ) -> { + CompositeValuesSourceConfig compositeValuesSourceConfig) -> { final RoundingValuesSource roundingValuesSource = (RoundingValuesSource) compositeValuesSourceConfig.valuesSource(); return new LongValuesSource( bigArrays, diff --git a/server/src/main/java/org/opensearch/search/aggregations/bucket/composite/LongValuesSource.java b/server/src/main/java/org/opensearch/search/aggregations/bucket/composite/LongValuesSource.java index 445bbb831370a..48e080c1576dd 100644 --- a/server/src/main/java/org/opensearch/search/aggregations/bucket/composite/LongValuesSource.java +++ b/server/src/main/java/org/opensearch/search/aggregations/bucket/composite/LongValuesSource.java @@ -275,11 +275,13 @@ SortedDocsProducer createSortedDocsProducerOrNull(IndexReader reader, Query quer case "long": toBucketFunction = (value) -> rounding.applyAsLong(LongPoint.decodeDimension(value, 0)); break; + case "int": case "short": case "byte": toBucketFunction = (value) -> rounding.applyAsLong(IntPoint.decodeDimension(value, 0)); break; + default: return null; } diff --git a/server/src/main/java/org/opensearch/search/aggregations/bucket/composite/PointsSortedDocsProducer.java b/server/src/main/java/org/opensearch/search/aggregations/bucket/composite/PointsSortedDocsProducer.java index ae6bfeac05db1..628fab55b5411 100644 --- a/server/src/main/java/org/opensearch/search/aggregations/bucket/composite/PointsSortedDocsProducer.java +++ b/server/src/main/java/org/opensearch/search/aggregations/bucket/composite/PointsSortedDocsProducer.java @@ -151,7 +151,7 @@ public void visit(int docID, byte[] packedValue) throws IOException { if (first == false && bucket != lastBucket) { // process previous bucket when new bucket appears final DocIdSet docIdSet = bucketDocsBuilder.build(); if (processBucket(queue, context, docIdSet.iterator(), lastBucket, builder) && - // lower bucket is inclusive + // lower bucket is inclusive lowerBucket != lastBucket) { // this bucket does not have any competitive composite buckets, // we can early terminate the collection because the remaining buckets are guaranteed @@ -170,8 +170,7 @@ public void visit(int docID, byte[] packedValue) throws IOException { @Override public PointValues.Relation compare(byte[] minPackedValue, byte[] maxPackedValue) { - if ((upperPointQuery != null - && Arrays.compareUnsigned(minPackedValue, 0, bytesPerDim, upperPointQuery, 0, bytesPerDim) > 0) + if ((upperPointQuery != null && Arrays.compareUnsigned(minPackedValue, 0, bytesPerDim, upperPointQuery, 0, bytesPerDim) > 0) || (lowerPointQuery != null && Arrays.compareUnsigned(maxPackedValue, 0, bytesPerDim, lowerPointQuery, 0, bytesPerDim) < 0)) { // does not match the query diff --git a/server/src/main/java/org/opensearch/search/aggregations/bucket/histogram/AutoDateHistogramAggregator.java b/server/src/main/java/org/opensearch/search/aggregations/bucket/histogram/AutoDateHistogramAggregator.java index c6917b9a85e75..0ef546e98a5ba 100644 --- a/server/src/main/java/org/opensearch/search/aggregations/bucket/histogram/AutoDateHistogramAggregator.java +++ b/server/src/main/java/org/opensearch/search/aggregations/bucket/histogram/AutoDateHistogramAggregator.java @@ -162,7 +162,8 @@ private AutoDateHistogramAggregator( valuesSourceConfig.missing() != null, valuesSourceConfig.script() != null, valuesSourceConfig.fieldType(), - 0); + 0 + ); FilterRewriteHelper.FilterContext filterContext = FilterRewriteHelper.buildFastFilterContext( parent(), subAggregators.length, @@ -233,15 +234,9 @@ public final LeafBucketCollector getLeafCollector(LeafReaderContext ctx, LeafBuc return LeafBucketCollector.NO_OP_COLLECTOR; } - boolean optimized = FilterRewriteHelper.tryFastFilterAggregation(ctx, filters, fieldType, - (key, count) -> { - incrementBucketDocCount( - FilterRewriteHelper.getBucketOrd( - getBucketOrds().add(0, preparedRounding.round(key)) - ), - count - ); - }, Integer.MAX_VALUE); + boolean optimized = FilterRewriteHelper.tryFastFilterAggregation(ctx, filters, fieldType, (key, count) -> { + incrementBucketDocCount(FilterRewriteHelper.getBucketOrd(getBucketOrds().add(0, preparedRounding.round(key))), count); + }, Integer.MAX_VALUE); if (optimized) throw new CollectionTerminatedException(); final SortedNumericDocValues values = valuesSource.longValues(ctx); diff --git a/server/src/main/java/org/opensearch/search/aggregations/bucket/histogram/DateHistogramAggregator.java b/server/src/main/java/org/opensearch/search/aggregations/bucket/histogram/DateHistogramAggregator.java index da68826cbbba8..11fff40e0542b 100644 --- a/server/src/main/java/org/opensearch/search/aggregations/bucket/histogram/DateHistogramAggregator.java +++ b/server/src/main/java/org/opensearch/search/aggregations/bucket/histogram/DateHistogramAggregator.java @@ -121,7 +121,8 @@ class DateHistogramAggregator extends BucketsAggregator implements SizedBucketAg valuesSourceConfig.missing() != null, valuesSourceConfig.script() != null, valuesSourceConfig.fieldType(), - 0); + 0 + ); FilterRewriteHelper.FilterContext filterContext = FilterRewriteHelper.buildFastFilterContext( parent, subAggregators.length, @@ -166,15 +167,9 @@ public LeafBucketCollector getLeafCollector(LeafReaderContext ctx, LeafBucketCol return LeafBucketCollector.NO_OP_COLLECTOR; } - boolean optimized = FilterRewriteHelper.tryFastFilterAggregation(ctx, filters, fieldType, - (key, count) -> { - incrementBucketDocCount( - FilterRewriteHelper.getBucketOrd( - bucketOrds.add(0, preparedRounding.round(key)) - ), - count - ); - }, Integer.MAX_VALUE); + boolean optimized = FilterRewriteHelper.tryFastFilterAggregation(ctx, filters, fieldType, (key, count) -> { + incrementBucketDocCount(FilterRewriteHelper.getBucketOrd(bucketOrds.add(0, preparedRounding.round(key))), count); + }, Integer.MAX_VALUE); if (optimized) throw new CollectionTerminatedException(); SortedNumericDocValues values = valuesSource.longValues(ctx); @@ -210,33 +205,29 @@ public void collect(int doc, long owningBucketOrd) throws IOException { @Override public InternalAggregation[] buildAggregations(long[] owningBucketOrds) throws IOException { - return buildAggregationsForVariableBuckets( - owningBucketOrds, - bucketOrds, - (bucketValue, docCount, subAggregationResults) -> { - return new InternalDateHistogram.Bucket(bucketValue, docCount, keyed, formatter, subAggregationResults); - }, - (owningBucketOrd, buckets) -> { - // the contract of the histogram aggregation is that shards must return buckets ordered by key in ascending order - CollectionUtil.introSort(buckets, BucketOrder.key(true).comparator()); + return buildAggregationsForVariableBuckets(owningBucketOrds, bucketOrds, (bucketValue, docCount, subAggregationResults) -> { + return new InternalDateHistogram.Bucket(bucketValue, docCount, keyed, formatter, subAggregationResults); + }, (owningBucketOrd, buckets) -> { + // the contract of the histogram aggregation is that shards must return buckets ordered by key in ascending order + CollectionUtil.introSort(buckets, BucketOrder.key(true).comparator()); - // value source will be null for unmapped fields - // Important: use `rounding` here, not `shardRounding` - InternalDateHistogram.EmptyBucketInfo emptyBucketInfo = minDocCount == 0 - ? new InternalDateHistogram.EmptyBucketInfo(rounding.withoutOffset(), buildEmptySubAggregations(), extendedBounds) - : null; - return new InternalDateHistogram( - name, - buckets, - order, - minDocCount, - rounding.offset(), - emptyBucketInfo, - formatter, - keyed, - metadata() - ); - }); + // value source will be null for unmapped fields + // Important: use `rounding` here, not `shardRounding` + InternalDateHistogram.EmptyBucketInfo emptyBucketInfo = minDocCount == 0 + ? new InternalDateHistogram.EmptyBucketInfo(rounding.withoutOffset(), buildEmptySubAggregations(), extendedBounds) + : null; + return new InternalDateHistogram( + name, + buckets, + order, + minDocCount, + rounding.offset(), + emptyBucketInfo, + formatter, + keyed, + metadata() + ); + }); } @Override