From d0f598921501edcfa0f6855e45008feb913b50b4 Mon Sep 17 00:00:00 2001
From: bowenlan-amzn
Date: Wed, 22 Nov 2023 17:14:43 -0800
Subject: [PATCH 01/18] reading

Signed-off-by: bowenlan-amzn
---
 .../bucket/composite/CompositeAggregator.java | 48 ++++++++++++-------
 .../CompositeValuesCollectorQueue.java        | 39 +++++++++------
 .../DateHistogramValuesSourceBuilder.java     |  3 +-
 .../bucket/composite/LongValuesSource.java    |  5 +-
 .../composite/PointsSortedDocsProducer.java   | 14 +++---
 .../SingleDimensionValuesSource.java          |  3 +-
 .../bucket/composite/SortedDocsProducer.java  | 13 +++--
 7 files changed, 77 insertions(+), 48 deletions(-)

diff --git a/server/src/main/java/org/opensearch/search/aggregations/bucket/composite/CompositeAggregator.java b/server/src/main/java/org/opensearch/search/aggregations/bucket/composite/CompositeAggregator.java
index f2a4d5cd46127..d8bbf42b396fd 100644
--- a/server/src/main/java/org/opensearch/search/aggregations/bucket/composite/CompositeAggregator.java
+++ b/server/src/main/java/org/opensearch/search/aggregations/bucket/composite/CompositeAggregator.java
@@ -187,11 +187,13 @@ public InternalAggregation[] buildAggregations(long[] owningBucketOrds) throws I
         int num = Math.min(size, queue.size());
         final InternalComposite.InternalBucket[] buckets = new InternalComposite.InternalBucket[num];
+
         long[] bucketOrdsToCollect = new long[queue.size()];
         for (int i = 0; i < queue.size(); i++) {
-            bucketOrdsToCollect[i] = i;
+            bucketOrdsToCollect[i] = i; // TODO reading meaning the queue is indexed by bucket key and contains the doc count
         }
         InternalAggregations[] subAggsForBuckets = buildSubAggsForBuckets(bucketOrdsToCollect);
+
         while (queue.size() > 0) {
             int slot = queue.pop();
             CompositeKey key = queue.toCompositeKey(slot);
@@ -207,6 +209,7 @@ public InternalAggregation[] buildAggregations(long[] owningBucketOrds) throws I
                 aggs
             );
         }
+
         CompositeKey lastBucket = num > 0 ? buckets[num - 1].getRawKey() : null;
         return new InternalAggregation[] {
             new InternalComposite(
@@ -283,7 +286,7 @@ private Sort buildIndexSortPrefix(LeafReaderContext context) throws IOException
         for (int i = 0; i < end; i++) {
             CompositeValuesSourceConfig sourceConfig = sourceConfigs[i];
             SingleDimensionValuesSource<?> source = sources[i];
-            SortField indexSortField = indexSort.getSort()[i];
+            SortField indexSortField = indexSort.getSort()[i]; // TODO reading requires the index sort order to match the source order
             if (source.fieldType == null
                 // TODO: can we handle missing bucket when using index sort optimization ?
                || source.missingBucket
@@ -324,7 +327,8 @@ private int computeSortPrefixLen(Sort indexSortPrefix) {
         if (indexSortPrefix == null) {
             return 0;
         }
-        if (indexSortPrefix.getSort()[0].getReverse() != (sources[0].reverseMul == -1)) {
+        if (indexSortPrefix.getSort()[0].getReverse() // TODO reading index sort is reversed
+            != (sources[0].reverseMul == -1)) { // TODO reading aggregation sort param is desc
             assert indexSortPrefix.getSort().length == 1;
             return -1;
         } else {
@@ -416,6 +420,7 @@ private void processLeafFromQuery(LeafReaderContext ctx, Sort indexSortPrefix) t
         for (int i = 0; i < formats.length; i++) {
             formats[i] = sources[i].format;
         }
+        // TODO reading sort and search after with criteria
         FieldDoc fieldDoc = SearchAfterBuilder.buildFieldDoc(
             new SortAndFormats(indexSortPrefix, formats),
             Arrays.copyOfRange(rawAfterKey.values(), 0, formats.length)
         );
@@ -429,13 +434,12 @@ private void processLeafFromQuery(LeafReaderContext ctx, Sort indexSortPrefix) t
             .build();
         Weight weight = context.searcher().createWeight(context.searcher().rewrite(newQuery), ScoreMode.COMPLETE_NO_SCORES, 1f);
         Scorer scorer = weight.scorer(ctx);
+
         if (scorer != null) {
             DocIdSetIterator docIt = scorer.iterator();
-            final LeafBucketCollector inner = queue.getLeafCollector(
-                ctx,
-                getFirstPassCollector(docIdSetBuilder, indexSortPrefix.getSort().length)
-            );
+            final LeafBucketCollector inner = queue.getLeafCollector(ctx, getFirstPassCollector(docIdSetBuilder, indexSortPrefix.getSort().length));
             inner.setScorer(scorer);
+
             final Bits liveDocs = ctx.reader().getLiveDocs();
             while (docIt.nextDoc() != DocIdSetIterator.NO_MORE_DOCS) {
                 if (liveDocs == null || liveDocs.get(docIt.docID())) {
@@ -449,13 +453,14 @@ protected LeafBucketCollector getLeafCollector(LeafReaderContext ctx, LeafBucket
         finishLeaf();

-        boolean fillDocIdSet = deferredCollectors != NO_OP_COLLECTOR;
+        boolean fillDocIdSet = deferredCollectors != NO_OP_COLLECTOR; // TODO reading subAggs are deferred

         Sort indexSortPrefix = buildIndexSortPrefix(ctx);
-        int sortPrefixLen = computeSortPrefixLen(indexSortPrefix);
+        int sortPrefixLen = computeSortPrefixLen(indexSortPrefix); // TODO reading an ascending index sort exists
+        // is index sort enabled? see sortPrefixLen

         SortedDocsProducer sortedDocsProducer = sortPrefixLen == 0
-            ? sources[0].createSortedDocsProducerOrNull(ctx.reader(), context.query())
+            ? sources[0].createSortedDocsProducerOrNull(ctx.reader(), context.query()) // TODO reading only using the first field
             : null;
         if (sortedDocsProducer != null) {
             // Visit documents sorted by the leading source of the composite definition and terminates
             // when the leading source value is guaranteed to be greater than the lowest composite bucket
             // in the queue.
             DocIdSet docIdSet = sortedDocsProducer.processLeaf(context.query(), queue, ctx, fillDocIdSet);
             if (fillDocIdSet) {
-                entries.add(new Entry(ctx, docIdSet));
+                entries.add(new Entry(ctx, docIdSet)); // TODO reading add entries
             }
             // We can bypass search entirely for this segment, the processing is done in the previous call.
             // Throwing this exception will terminate the execution of the search for this root aggregation,
             // see {@link MultiCollector} for more details on how we handle early termination in aggregations.
            earlyTerminated = true;
             throw new CollectionTerminatedException();
-        } else {
+        } else { // TODO reading index sort not enabled
             if (fillDocIdSet) {
                 currentLeaf = ctx;
                 docIdSetBuilder = new RoaringDocIdSet.Builder(ctx.reader().maxDoc());
             }
             if (rawAfterKey != null && sortPrefixLen > 0) {
-                // We have an after key and index sort is applicable so we jump directly to the doc
+                // We have an after key and index sort is applicable, so we jump directly to the doc
                 // that is after the index sort prefix using the rawAfterKey and we start collecting
                 // document from there.
                 processLeafFromQuery(ctx, indexSortPrefix);
                 throw new CollectionTerminatedException();
             } else {
+                // rawAfterKey == null || sort order is reversed
                 final LeafBucketCollector inner = queue.getLeafCollector(ctx, getFirstPassCollector(docIdSetBuilder, sortPrefixLen));
                 return new LeafBucketCollector() {
                     @Override
@@ -506,7 +512,7 @@ public void collect(int doc, long bucket) throws IOException {
                 try {
                     long docCount = docCountProvider.getDocCount(doc);
                     if (queue.addIfCompetitive(indexSortPrefix, docCount)) {
-                        if (builder != null && lastDoc != doc) {
+                        if (builder != null && lastDoc != doc) { // TODO reading how can lastDoc == doc?
                             builder.add(doc);
                             lastDoc = doc;
                         }
@@ -530,14 +536,18 @@ private void runDeferredCollections() throws IOException {
             Query query = context.query();
             weight = context.searcher().createWeight(context.searcher().rewrite(query), ScoreMode.COMPLETE, 1f);
         }
+
         deferredCollectors.preCollection();
-        for (Entry entry : entries) {
+
+        for (Entry entry : entries) { // TODO reading each entry is one leaf
             DocIdSetIterator docIdSetIterator = entry.docIdSet.iterator();
             if (docIdSetIterator == null) {
                 continue;
             }
+
             final LeafBucketCollector subCollector = deferredCollectors.getLeafCollector(entry.context);
             final LeafBucketCollector collector = queue.getLeafCollector(entry.context, getSecondPassCollector(subCollector));
+
             DocIdSetIterator scorerIt = null;
             if (needsScores) {
                 Scorer scorer = weight.scorer(entry.context);
@@ -546,9 +556,10 @@ private void runDeferredCollections() throws IOException {
                     subCollector.setScorer(scorer);
                 }
             }
+
             int docID;
             while ((docID = docIdSetIterator.nextDoc()) != DocIdSetIterator.NO_MORE_DOCS) {
-                if (needsScores) {
+                if (needsScores) { // TODO reading not sure what needsScores does here?
                     assert scorerIt != null && scorerIt.docID() < docID;
                     scorerIt.advance(docID);
                     // aggregations should only be replayed on matching documents
                 }
                 collector.collect(docID);
             }
         }
+
         deferredCollectors.postCollection();
     }

@@ -568,11 +580,11 @@ private LeafBucketCollector getSecondPassCollector(LeafBucketCollector subCollec
             @Override
             public void collect(int doc, long zeroBucket) throws IOException {
                 assert zeroBucket == 0;
-                Integer slot = queue.compareCurrent();
+                Integer slot = queue.compareCurrent(); // TODO reading the queue makes sure the current value is present through the collection mechanism
                 if (slot != null) {
                     // The candidate key is a top bucket.
                    // We can defer the collection of this document/bucket to the sub collector
-                    subCollector.collect(doc, slot);
+                    subCollector.collect(doc, slot); // TODO reading slot is the same as owning bucket ordinal
                 }
             }
         };
diff --git a/server/src/main/java/org/opensearch/search/aggregations/bucket/composite/CompositeValuesCollectorQueue.java b/server/src/main/java/org/opensearch/search/aggregations/bucket/composite/CompositeValuesCollectorQueue.java
index 6ee1682a7b196..4d75a46bbb203 100644
--- a/server/src/main/java/org/opensearch/search/aggregations/bucket/composite/CompositeValuesCollectorQueue.java
+++ b/server/src/main/java/org/opensearch/search/aggregations/bucket/composite/CompositeValuesCollectorQueue.java
@@ -108,7 +108,7 @@ public int hashCode() {

     @Override
     protected boolean lessThan(Integer a, Integer b) {
-        return compare(a, b) > 0;
+        return compare(a, b) > 0; // TODO reading a > b is true, this is a max heap?
     }

     /**
@@ -123,7 +123,7 @@ boolean isFull() {
      * the slot if the candidate is already in the queue or null if the candidate is not present.
      */
     Integer compareCurrent() {
-        return map.get(new Slot(CANDIDATE_SLOT));
+        return map.get(new Slot(CANDIDATE_SLOT)); // TODO reading this checks the slot/bucket of the current value?
     }

     /**
@@ -152,7 +152,7 @@ long getDocCount(int slot) {
      */
     private void copyCurrent(int slot, long value) {
         for (int i = 0; i < arrays.length; i++) {
-            arrays[i].copyCurrent(slot);
+            arrays[i].copyCurrent(slot); // TODO reading the values source knows the current value; set it into this slot/index
         }
         docCounts = bigArrays.grow(docCounts, slot + 1);
         docCounts.set(slot, value);
@@ -165,11 +165,13 @@ int compare(int slot1, int slot2) {
         assert slot2 != CANDIDATE_SLOT;
         for (int i = 0; i < arrays.length; i++) {
             final int cmp;
+
             if (slot1 == CANDIDATE_SLOT) {
                 cmp = arrays[i].compareCurrent(slot2);
             } else {
                 cmp = arrays[i].compare(slot1, slot2);
             }
+
             if (cmp != 0) {
                 return cmp > 0 ? i + 1 : -(i + 1);
             }
@@ -202,7 +204,10 @@ boolean equals(int slot1, int slot2) {
     int hashCode(int slot) {
         int result = 1;
         for (int i = 0; i < arrays.length; i++) {
-            result = 31 * result + (slot == CANDIDATE_SLOT ? arrays[i].hashCodeCurrent() : arrays[i].hashCode(slot));
+            result = 31 * result + // TODO reading why 31 here? For each array, it multiplies the running result by 31. Multiplying by a prime number like 31 helps distribute the hash codes more evenly.
+                (slot == CANDIDATE_SLOT ?
+                    arrays[i].hashCodeCurrent() :
+                    arrays[i].hashCode(slot));
         }
         return result;
     }
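Reading note on the "why 31" question above: the accumulation is the standard polynomial hash, the same scheme java.util.Arrays.hashCode uses. A minimal standalone sketch of the idea (hypothetical helper, not part of the patch):

    class CompositeHashSketch {
        // Fold each per-source hash into a running result. Multiplying by an odd prime
        // spreads the bits of earlier components; 31 * x also JITs to the cheap (x << 5) - x.
        static int hash(int[] componentHashes) {
            int result = 1;
            for (int h : componentHashes) {
                result = 31 * result + h;
            }
            return result;
        }
    }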
@@ -251,13 +256,15 @@ LeafBucketCollector getLeafCollector(Comparable forceLeadSourceValue, LeafReader
         int last = arrays.length - 1;
         LeafBucketCollector collector = in;
         while (last > 0) {
-            collector = arrays[last--].getLeafCollector(context, collector);
+            collector = arrays[last--].getLeafCollector(context, collector); // TODO reading the passed-in collector runs after the current one
         }
+
         if (forceLeadSourceValue != null) {
             collector = arrays[last].getLeafCollector(forceLeadSourceValue, context, collector);
         } else {
             collector = arrays[last].getLeafCollector(context, collector);
         }
+
         return collector;
     }

@@ -279,14 +286,15 @@ boolean addIfCompetitive(long inc) {
      *
      * @throws CollectionTerminatedException if the current collection can be terminated early due to index sorting.
      */
-    boolean addIfCompetitive(int indexSortSourcePrefix, long inc) {
+    boolean addIfCompetitive(int indexSortSourcePrefix, long inc) { // TODO reading indexSortSourcePrefix can only be -1
         // checks if the candidate key is competitive
-        Integer topSlot = compareCurrent();
-        if (topSlot != null) {
+        Integer curSlot = compareCurrent();
+        if (curSlot != null) {
             // this key is already in the top N, skip it
-            docCounts.increment(topSlot, inc);
+            docCounts.increment(curSlot, inc);
             return true;
         }
+
         if (afterKeyIsSet) {
             int cmp = compareCurrentWithAfter();
             if (cmp <= 0) {
@@ -300,13 +308,14 @@ boolean addIfCompetitive(int indexSortSourcePrefix, long inc) {
                 return false;
             }
         }
-        if (size() >= maxSize) {
-            // the tree map is full, check if the candidate key should be kept
+
+        if (size() >= maxSize) { // TODO reading when the queue is full, we can check competitiveness
+            // the tree map is full, check if the candidate key should be kept // TODO reading the queue contains the topN largest composite keys/buckets/slots
             int cmp = compare(CANDIDATE_SLOT, top());
-            if (cmp > 0) {
-                if (cmp <= indexSortSourcePrefix) {
+            if (cmp > 0) { // TODO reading the current key is larger than the queue top
+                if (cmp <= indexSortSourcePrefix) { // TODO reading the way of comparing current and the queue top uses the sorted fields
                     // index sort guarantees that there is no key greater or equal than the
-                    // current one in the subsequent documents so we can early terminate.
+                    // current one in the subsequent documents so we can early terminate. // TODO reading how to get the topN smallest items using a heap?
                     throw new CollectionTerminatedException();
                 }
                 // the candidate key is not competitive, skip it.
@@ -322,7 +331,7 @@ boolean addIfCompetitive(int indexSortSourcePrefix, long inc) {
             // and we recycle the deleted slot
             newSlot = slot;
         } else {
-            newSlot = size();
+            newSlot = size(); // TODO reading seems we don't care about the slot number here?
         }
         // move the candidate key to its new slot
         copyCurrent(newSlot, inc);
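Reading note on the "topN smallest items using a heap" question above: bound a max-heap at N, so the root is the largest retained key; any candidate smaller than the root evicts it, which is exactly what lessThan(a, b) := compare(a, b) > 0 arranges for this queue. A hedged standalone sketch with java.util.PriorityQueue (an illustration, not the actual CompositeValuesCollectorQueue):

    import java.util.Comparator;
    import java.util.PriorityQueue;

    class TopNSmallest {
        private final int maxSize;
        // The reversed comparator makes peek() return the largest retained key.
        private final PriorityQueue<Long> heap = new PriorityQueue<>(Comparator.reverseOrder());

        TopNSmallest(int maxSize) {
            this.maxSize = maxSize;
        }

        // Returns false when the candidate is not competitive.
        boolean addIfCompetitive(long candidate) {
            if (heap.size() < maxSize) {
                heap.offer(candidate);
                return true;
            }
            if (candidate >= heap.peek()) {
                return false; // not smaller than the current worst retained key
            }
            heap.poll(); // evict the largest
            heap.offer(candidate);
            return true;
        }
    }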
diff --git a/server/src/main/java/org/opensearch/search/aggregations/bucket/composite/DateHistogramValuesSourceBuilder.java b/server/src/main/java/org/opensearch/search/aggregations/bucket/composite/DateHistogramValuesSourceBuilder.java
index fd94ba355238a..3f6aca2ad9c07 100644
--- a/server/src/main/java/org/opensearch/search/aggregations/bucket/composite/DateHistogramValuesSourceBuilder.java
+++ b/server/src/main/java/org/opensearch/search/aggregations/bucket/composite/DateHistogramValuesSourceBuilder.java
@@ -316,7 +316,8 @@ public static void register(ValuesSourceRegistry.Builder builder) {
             IndexReader reader,
             int size,
             LongConsumer addRequestCircuitBreakerBytes,
-            CompositeValuesSourceConfig compositeValuesSourceConfig) -> {
+            CompositeValuesSourceConfig compositeValuesSourceConfig
+        ) -> {
             final RoundingValuesSource roundingValuesSource = (RoundingValuesSource) compositeValuesSourceConfig.valuesSource();
             return new LongValuesSource(
                 bigArrays,
diff --git a/server/src/main/java/org/opensearch/search/aggregations/bucket/composite/LongValuesSource.java b/server/src/main/java/org/opensearch/search/aggregations/bucket/composite/LongValuesSource.java
index 48e080c1576dd..ee3b1d252fa25 100644
--- a/server/src/main/java/org/opensearch/search/aggregations/bucket/composite/LongValuesSource.java
+++ b/server/src/main/java/org/opensearch/search/aggregations/bucket/composite/LongValuesSource.java
@@ -253,7 +253,8 @@ static boolean checkMatchAllOrRangeQuery(Query query, String fieldName) {
     @Override
     SortedDocsProducer createSortedDocsProducerOrNull(IndexReader reader, Query query) {
         query = extractQuery(query);
-        if (checkIfSortedDocsIsApplicable(reader, fieldType) == false || checkMatchAllOrRangeQuery(query, fieldType.name()) == false) {
+        if (checkIfSortedDocsIsApplicable(reader, fieldType) == false
+            || checkMatchAllOrRangeQuery(query, fieldType.name()) == false) {
             return null;
         }
         final byte[] lowerPoint;
@@ -275,13 +276,11 @@ SortedDocsProducer createSortedDocsProducerOrNull(IndexReader reader, Query quer
             case "long":
                 toBucketFunction = (value) -> rounding.applyAsLong(LongPoint.decodeDimension(value, 0));
                 break;
-
             case "int":
             case "short":
             case "byte":
                 toBucketFunction = (value) -> rounding.applyAsLong(IntPoint.decodeDimension(value, 0));
                 break;
-
             default:
                 return null;
         }
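The switch above builds a function from a point's packed bytes to a bucket key. In isolation the long case looks like the sketch below (class and method names are illustrative):

    import org.apache.lucene.document.LongPoint;
    import java.util.function.LongUnaryOperator;
    import java.util.function.ToLongFunction;

    class BucketFunctionSketch {
        // Decode the BKD-encoded long at dimension 0, then round it down to its bucket key.
        static ToLongFunction<byte[]> longToBucket(LongUnaryOperator rounding) {
            return packedValue -> rounding.applyAsLong(LongPoint.decodeDimension(packedValue, 0));
        }
    }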
diff --git a/server/src/main/java/org/opensearch/search/aggregations/bucket/composite/PointsSortedDocsProducer.java b/server/src/main/java/org/opensearch/search/aggregations/bucket/composite/PointsSortedDocsProducer.java
index 3d6730203b6ae..391d810d6bb37 100644
--- a/server/src/main/java/org/opensearch/search/aggregations/bucket/composite/PointsSortedDocsProducer.java
+++ b/server/src/main/java/org/opensearch/search/aggregations/bucket/composite/PointsSortedDocsProducer.java
@@ -68,6 +68,7 @@ DocIdSet processLeaf(Query query, CompositeValuesCollectorQueue queue, LeafReade
             // no value for the field
             return DocIdSet.EMPTY;
         }
+
         long lowerBucket = Long.MIN_VALUE;
         Comparable lowerValue = queue.getLowerValueLeadSource();
         if (lowerValue != null) {
@@ -85,6 +86,7 @@ DocIdSet processLeaf(Query query, CompositeValuesCollectorQueue queue, LeafReade
             }
             upperBucket = (Long) upperValue;
         }
+
         DocIdSetBuilder builder = fillDocIdSet ? new DocIdSetBuilder(context.reader().maxDoc(), values, field) : null;
         Visitor visitor = new Visitor(context, queue, builder, values.getBytesPerDimension(), lowerBucket, upperBucket);
         try {
@@ -146,10 +148,9 @@ public void visit(int docID, byte[] packedValue) throws IOException {
             }

             long bucket = bucketFunction.applyAsLong(packedValue);
-            if (first == false && bucket != lastBucket) {
-                final DocIdSet docIdSet = bucketDocsBuilder.build();
-                if (processBucket(queue, context, docIdSet.iterator(), lastBucket, builder) &&
-                    // lower bucket is inclusive
+            if (first == false && bucket != lastBucket) { // TODO reading process the previous bucket when a new bucket appears
+                if (processBucket(queue, context, bucketDocsBuilder.build().iterator(), lastBucket, builder) &&
+                    // lower bucket is inclusive
                     lowerBucket != lastBucket) {
                     // this bucket does not have any competitive composite buckets,
                     // we can early terminate the collection because the remaining buckets are guaranteed
@@ -168,7 +169,8 @@ public void visit(int docID, byte[] packedValue) throws IOException {

         @Override
         public PointValues.Relation compare(byte[] minPackedValue, byte[] maxPackedValue) {
-            if ((upperPointQuery != null && Arrays.compareUnsigned(minPackedValue, 0, bytesPerDim, upperPointQuery, 0, bytesPerDim) > 0)
+            if ((upperPointQuery != null
+                && Arrays.compareUnsigned(minPackedValue, 0, bytesPerDim, upperPointQuery, 0, bytesPerDim) > 0)
                 || (lowerPointQuery != null
                     && Arrays.compareUnsigned(maxPackedValue, 0, bytesPerDim, lowerPointQuery, 0, bytesPerDim) < 0)) {
                 // does not match the query
@@ -182,13 +184,13 @@ public PointValues.Relation compare(byte[] minPackedValue, byte[] maxPackedValue
                     return PointValues.Relation.CELL_OUTSIDE_QUERY;
                 }
             }
-
             if (upperBucket != Long.MAX_VALUE) {
                 long minBucket = bucketFunction.applyAsLong(minPackedValue);
                 if (minBucket > upperBucket) {
                     return PointValues.Relation.CELL_OUTSIDE_QUERY;
                 }
             }
+
             return PointValues.Relation.CELL_CROSSES_QUERY;
         }

diff --git a/server/src/main/java/org/opensearch/search/aggregations/bucket/composite/SingleDimensionValuesSource.java b/server/src/main/java/org/opensearch/search/aggregations/bucket/composite/SingleDimensionValuesSource.java
index fe0801d6d230e..8eabd6e552f10 100644
--- a/server/src/main/java/org/opensearch/search/aggregations/bucket/composite/SingleDimensionValuesSource.java
+++ b/server/src/main/java/org/opensearch/search/aggregations/bucket/composite/SingleDimensionValuesSource.java
@@ -185,7 +185,8 @@ protected boolean checkIfSortedDocsIsApplicable(IndexReader reader, MappedFieldT
             return false;
         }

-        if (reader.hasDeletions() && (reader.numDocs() == 0 || (double) reader.numDocs() / (double) reader.maxDoc() < 0.5)) {
+        if (reader.hasDeletions()
+            && (reader.numDocs() == 0 || (double) reader.numDocs() / (double) reader.maxDoc() < 0.5)) {
             // do not use the index if it has more than 50% of deleted docs
             return false;
         }
diff --git a/server/src/main/java/org/opensearch/search/aggregations/bucket/composite/SortedDocsProducer.java b/server/src/main/java/org/opensearch/search/aggregations/bucket/composite/SortedDocsProducer.java
index 9442529bf9342..0c9b1b945f4cf 100644
--- a/server/src/main/java/org/opensearch/search/aggregations/bucket/composite/SortedDocsProducer.java
+++ b/server/src/main/java/org/opensearch/search/aggregations/bucket/composite/SortedDocsProducer.java
@@ -75,8 +75,10 @@ protected boolean processBucket(
     ) throws IOException {
         final int[] topCompositeCollected = new int[1];
         final boolean[] hasCollected = new boolean[1];
+
        final DocCountProvider docCountProvider = new DocCountProvider();
         docCountProvider.setLeafReaderContext(context);
+
         final LeafBucketCollector queueCollector = new LeafBucketCollector() {
             int lastDoc = -1;
@@ -89,10 +91,10 @@ protected boolean processBucket(
             @Override
             public void collect(int doc, long bucket) throws IOException {
                 hasCollected[0] = true;
-                long docCount = docCountProvider.getDocCount(doc);
+                long docCount = docCountProvider.getDocCount(doc); // TODO reading _doc_count can be >1
                 if (queue.addIfCompetitive(docCount)) {
                     topCompositeCollected[0]++;
-                    if (adder != null && doc != lastDoc) {
+                    if (adder != null && doc != lastDoc) { // TODO reading why can doc == lastDoc?
                         if (remainingBits == 0) {
                             // the cost approximation was lower than the real size, we need to grow the adder
                             // by some numbers (128) to ensure that we can add the extra documents
@@ -106,13 +108,16 @@ public void collect(int doc, long bucket) throws IOException {
                 }
             }
         };
-        final Bits liveDocs = context.reader().getLiveDocs();
+        final LeafBucketCollector collector = queue.getLeafCollector(leadSourceBucket, context, queueCollector);
+
+        final Bits liveDocs = context.reader().getLiveDocs();
         while (iterator.nextDoc() != DocIdSetIterator.NO_MORE_DOCS) {
-            if (liveDocs == null || liveDocs.get(iterator.docID())) {
+            if (liveDocs == null || liveDocs.get(iterator.docID())) { // TODO reading the doc is not deleted
                 collector.collect(iterator.docID());
             }
         }
+
         if (queue.isFull() && hasCollected[0] && topCompositeCollected[0] == 0) {
             return true;
         }
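One plausible answer to the "why can doc == lastDoc" note above: with a multi-valued field, the queue's collector can fire more than once for the same document (once per competitive value), so the doc id has to be deduplicated before it enters the doc id set. A sketch of that guard in isolation (hypothetical class name):

    import org.apache.lucene.util.DocIdSetBuilder;

    class DedupingAdder {
        private final DocIdSetBuilder.BulkAdder adder;
        private int lastDoc = -1;

        DedupingAdder(DocIdSetBuilder.BulkAdder adder) {
            this.adder = adder;
        }

        // The collector may call back once per value of a multi-valued field;
        // each doc id should be added to the builder only once.
        void addIfNew(int doc) {
            if (doc != lastDoc) {
                adder.add(doc);
                lastDoc = doc;
            }
        }
    }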
From 2c7e55b5fce561ac3837c9a2dcacea6ec861b46f Mon Sep 17 00:00:00 2001
From: bowenlan-amzn
Date: Tue, 5 Dec 2023 10:31:04 -0800
Subject: [PATCH 02/18] First draft implementation

Signed-off-by: bowenlan-amzn
---
 .../bucket/BucketsAggregator.java             |   5 +-
 .../{histogram => }/FilterRewriteHelper.java  | 101 ++++++++++------
 .../bucket/composite/CompositeAggregator.java | 113 +++++++++++++++---
 .../DateHistogramValuesSourceBuilder.java     |   2 +-
 .../bucket/composite/InternalComposite.java   |   6 +-
 .../composite/RoundingValuesSource.java       |  21 +++-
 .../AutoDateHistogramAggregator.java          |  24 ++--
 .../histogram/DateHistogramAggregator.java    |  76 +++++++-----
 .../AutoDateHistogramAggregatorTests.java     |  10 +-
 9 files changed, 254 insertions(+), 104 deletions(-)
 rename server/src/main/java/org/opensearch/search/aggregations/bucket/{histogram => }/FilterRewriteHelper.java (73%)

diff --git a/server/src/main/java/org/opensearch/search/aggregations/bucket/BucketsAggregator.java b/server/src/main/java/org/opensearch/search/aggregations/bucket/BucketsAggregator.java
index eef427754f535..8e43bf6ec6dd9 100644
--- a/server/src/main/java/org/opensearch/search/aggregations/bucket/BucketsAggregator.java
+++ b/server/src/main/java/org/opensearch/search/aggregations/bucket/BucketsAggregator.java
@@ -422,7 +422,10 @@ protected final InternalAggregation[] buildAggregationsForVariableBuckets(
                         + "]"
                 );
             }
-            buckets.add(bucketBuilder.build(ordsEnum.value(), bucketDocCount(ordsEnum.ord()), subAggregationResults[b++]));
+            buckets.add(bucketBuilder.build(
+                ordsEnum.value(),
+                bucketDocCount(ordsEnum.ord()),
+                subAggregationResults[b++]));
         }
         results[ordIdx] = resultBuilder.build(owningBucketOrds[ordIdx], buckets);
     }
diff --git a/server/src/main/java/org/opensearch/search/aggregations/bucket/histogram/FilterRewriteHelper.java b/server/src/main/java/org/opensearch/search/aggregations/bucket/FilterRewriteHelper.java
similarity index 73%
rename from server/src/main/java/org/opensearch/search/aggregations/bucket/histogram/FilterRewriteHelper.java
rename to server/src/main/java/org/opensearch/search/aggregations/bucket/FilterRewriteHelper.java
index 29cecd5b382cd..a1c4f16ce2a97 100644
--- a/server/src/main/java/org/opensearch/search/aggregations/bucket/histogram/FilterRewriteHelper.java
+++ b/server/src/main/java/org/opensearch/search/aggregations/bucket/FilterRewriteHelper.java
@@ -6,7 +6,7 @@
  * compatible open source license.
  */

-package org.opensearch.search.aggregations.bucket.histogram;
+package org.opensearch.search.aggregations.bucket;

 import org.apache.lucene.index.LeafReaderContext;
 import org.apache.lucene.index.PointValues;
@@ -23,9 +23,8 @@
 import org.opensearch.common.Rounding;
 import org.opensearch.common.lucene.search.function.FunctionScoreQuery;
 import org.opensearch.index.mapper.DateFieldMapper;
+import org.opensearch.index.mapper.MappedFieldType;
 import org.opensearch.index.query.DateRangeIncludingNowQuery;
-import org.opensearch.search.aggregations.support.FieldContext;
-import org.opensearch.search.aggregations.support.ValuesSourceConfig;
 import org.opensearch.search.internal.SearchContext;

 import java.io.IOException;
@@ -45,9 +44,9 @@
  */
 public class FilterRewriteHelper {

-    static class FilterContext {
-        final DateFieldMapper.DateFieldType fieldType;
-        final Weight[] filters;
+    public static class FilterContext {
+        public final DateFieldMapper.DateFieldType fieldType;
+        public final Weight[] filters;

         public FilterContext(DateFieldMapper.DateFieldType fieldType, Weight[] filters) {
             this.fieldType = fieldType;
@@ -100,7 +99,7 @@ private static long[] getIndexBoundsFromLeaves(final SearchContext context, fina
         return new long[] { min, max };
     }

-    static long[] getAggregationBounds(final SearchContext context, final String fieldName) throws IOException {
+    public static long[] getAggregationBounds(final SearchContext context, final String fieldName) throws IOException {
         final Query cq = unwrapIntoConcreteQuery(context.query());
         final long[] indexBounds = getIndexBoundsFromLeaves(context, fieldName);
         if (cq instanceof PointRangeQuery) {
@@ -148,86 +147,118 @@ private static Weight[] createFilterForAggregations(
             // Below rounding is needed as the interval could return in
             // non-rounded values for something like calendar month
             roundedLow = preparedRounding.round(roundedLow + interval);
-            if (prevRounded == roundedLow) break;
+            if (prevRounded == roundedLow) break; // TODO reading prevents getting into an infinite loop?
             prevRounded = roundedLow;
         }

         Weight[] filters = null;
         if (bucketCount > 0 && bucketCount <= MAX_NUM_FILTER_BUCKETS) {
-            int i = 0;
             filters = new Weight[bucketCount];
             roundedLow = preparedRounding.round(fieldType.convertNanosToMillis(low));
+
+            int i = 0;
             while (i < bucketCount) {
                 // Calculate the lower bucket bound
                 final byte[] lower = new byte[8];
-                NumericUtils.longToSortableBytes(i == 0 ? low : fieldType.convertRoundedMillisToNanos(roundedLow), lower, 0);
+                NumericUtils.longToSortableBytes(
+                    i == 0 ? low : fieldType.convertRoundedMillisToNanos(roundedLow),
+                    lower, 0
+                );
+
                 // Calculate the upper bucket bound
-                final byte[] upper = new byte[8];
                 roundedLow = preparedRounding.round(roundedLow + interval);
-                // Subtract -1 if the minimum is roundedLow as roundedLow itself
-                // is included in the next bucket
+                final byte[] upper = new byte[8];
                 NumericUtils.longToSortableBytes(
-                    i + 1 == bucketCount ? high : fieldType.convertRoundedMillisToNanos(roundedLow) - 1,
+                    i + 1 == bucketCount ?
+                        high :
+                            // Subtract -1 if the minimum is roundedLow as roundedLow itself
+                            // is included in the next bucket
+                            fieldType.convertRoundedMillisToNanos(roundedLow) - 1,
                     upper, 0
                 );
-                filters[i++] = context.searcher().createWeight(new PointRangeQuery(field, lower, upper, 1) { @Override protected String toString(int dimension, byte[] value) { return null; } }, ScoreMode.COMPLETE_NO_SCORES, 1);
+
+                filters[i++] = context.searcher().createWeight(
+                    new PointRangeQuery(field, lower, upper, 1) {
+                        @Override
+                        protected String toString(int dimension, byte[] value) {
+                            return null;
+                        }
+                    }, ScoreMode.COMPLETE_NO_SCORES, 1);
             }
         }
         return filters;
     }
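The loop above is the heart of the rewrite: one range filter per rounded interval, each covering [bucketStart, nextBucketStart - 1], so counting the matches of a filter yields the doc count of that bucket. A simplified, hedged restatement of the construction (LongPoint.newRangeQuery stands in for the patch's anonymous PointRangeQuery, and the millis/nanos conversions are elided):

    import org.apache.lucene.document.LongPoint;
    import org.apache.lucene.search.Query;
    import java.util.ArrayList;
    import java.util.List;
    import java.util.function.LongUnaryOperator;

    class BucketFiltersSketch {
        static List<Query> bucketFilters(String field, long low, long high, long interval, LongUnaryOperator round) {
            List<Query> filters = new ArrayList<>();
            long bucketStart = round.applyAsLong(low);
            while (bucketStart <= high) {
                long nextBucketStart = round.applyAsLong(bucketStart + interval);
                // Clamp the first bucket to the actual minimum and the last one to the maximum;
                // the upper bound is nextBucketStart - 1 because nextBucketStart belongs to the next bucket.
                long from = Math.max(bucketStart, low);
                long to = Math.min(nextBucketStart - 1, high);
                filters.add(LongPoint.newRangeQuery(field, from, to));
                if (nextBucketStart == bucketStart) break; // defensive: rounding made no progress
                bucketStart = nextBucketStart;
            }
            return filters;
        }
    }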
-    static FilterContext buildFastFilterContext(
+    public static FilterContext buildFastFilterContext(
         final Object parent,
         final int subAggLength,
         SearchContext context,
         Function<long[], Rounding> roundingFunction,
         Supplier<Rounding.Prepared> preparedRoundingSupplier,
-        ValuesSourceConfig valuesSourceConfig,
-        CheckedFunction<FieldContext, long[], IOException> computeBounds
+        ValueSourceContext valueSourceContext,
+        CheckedFunction<ValueSourceContext, long[], IOException> computeBounds
     ) throws IOException {
         // Create the filters for fast aggregation only if the query is instance
         // of point range query and there aren't any parent/sub aggregations
-        if (parent == null && subAggLength == 0 && valuesSourceConfig.missing() == null && valuesSourceConfig.script() == null) {
-            final FieldContext fieldContext = valuesSourceConfig.fieldContext();
-            if (fieldContext != null) {
-                final String fieldName = fieldContext.field();
-                final long[] bounds = computeBounds.apply(fieldContext);
+        if (parent == null && subAggLength == 0 && !valueSourceContext.missing && !valueSourceContext.hasScript) {
+            MappedFieldType fieldType = valueSourceContext.getFieldType();
+            if (fieldType != null) {
+                final String fieldName = valueSourceContext.getFieldType().name();
+                final long[] bounds = computeBounds.apply(valueSourceContext);
                 if (bounds != null) {
-                    assert fieldContext.fieldType() instanceof DateFieldMapper.DateFieldType;
-                    final DateFieldMapper.DateFieldType fieldType = (DateFieldMapper.DateFieldType) fieldContext.fieldType();
+                    assert fieldType instanceof DateFieldMapper.DateFieldType;
                     final Rounding rounding = roundingFunction.apply(bounds);
                     final Weight[] filters = FilterRewriteHelper.createFilterForAggregations(
                         context,
                         rounding,
                         preparedRoundingSupplier.get(),
                         fieldName,
-                        fieldType,
+                        (DateFieldMapper.DateFieldType) fieldType,
                         bounds[0],
                         bounds[1]
                     );
-                    return new FilterContext(fieldType, filters);
+                    return new FilterContext((DateFieldMapper.DateFieldType) fieldType, filters);
                 }
             }
         }
         return null;
     }

-    static long getBucketOrd(long bucketOrd) {
-        if (bucketOrd < 0) { // already seen
+    /**
+     * Encapsulates metadata about a value source needed to rewrite
+     */
+    public static class ValueSourceContext {
+        private final boolean missing;
+        private final boolean hasScript;
+        private final MappedFieldType fieldType;
+
+        /**
+         * @param missing   whether missing value/bucket is set
+         * @param hasScript whether a script is used
+         * @param fieldType null if the field doesn't exist
+         */
+        public ValueSourceContext(boolean missing, boolean hasScript, MappedFieldType fieldType) {
+            this.missing = missing;
+            this.hasScript = hasScript;
+            this.fieldType = fieldType;
+        }
+
+        // TODO reading why don't the booleans need getters?
+        public MappedFieldType getFieldType() {
+            return fieldType;
+        }
+    }
+
+    public static long getBucketOrd(long bucketOrd) {
+        if (bucketOrd < 0) { // already seen // TODO reading theoretically, for one segment there cannot be a duplicate bucket?
             bucketOrd = -1 - bucketOrd;
         }
         return bucketOrd;
     }

-    static boolean tryFastFilterAggregation(
+    public static boolean tryFastFilterAggregation(
         final LeafReaderContext ctx,
         final Weight[] filters,
         final DateFieldMapper.DateFieldType fieldType,
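The per-segment counting inside tryFastFilterAggregation is elided by the hunk above. One way it can work, sketched under the assumption that Lucene's Weight#count is usable: a point-range count can often be answered from the BKD tree without visiting documents, and a -1 return means the count is not cheaply available, so the optimization must fall back to normal collection.

    import java.io.IOException;
    import org.apache.lucene.index.LeafReaderContext;
    import org.apache.lucene.search.Weight;

    class FilterCountSketch {
        // Returns one count per filter, or null when any count cannot be computed cheaply.
        static int[] countPerFilter(Weight[] filters, LeafReaderContext ctx) throws IOException {
            int[] counts = new int[filters.length];
            for (int i = 0; i < filters.length; i++) {
                int count = filters[i].count(ctx);
                if (count == -1) {
                    return null; // fall back to normal collection for this segment
                }
                counts[i] = count;
            }
            return counts;
        }
    }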
diff --git a/server/src/main/java/org/opensearch/search/aggregations/bucket/composite/CompositeAggregator.java b/server/src/main/java/org/opensearch/search/aggregations/bucket/composite/CompositeAggregator.java
index d8bbf42b396fd..d4c01234a3e72 100644
--- a/server/src/main/java/org/opensearch/search/aggregations/bucket/composite/CompositeAggregator.java
+++ b/server/src/main/java/org/opensearch/search/aggregations/bucket/composite/CompositeAggregator.java
@@ -55,32 +55,25 @@
 import org.apache.lucene.search.Weight;
 import org.apache.lucene.search.comparators.LongComparator;
 import org.apache.lucene.util.Bits;
+import org.apache.lucene.util.CollectionUtil;
 import org.apache.lucene.util.RoaringDocIdSet;
+import org.opensearch.common.Rounding;
 import org.opensearch.common.lease.Releasables;
 import org.opensearch.index.IndexSortConfig;
+import org.opensearch.index.mapper.DateFieldMapper;
 import org.opensearch.lucene.queries.SearchAfterSortedDocQuery;
 import org.opensearch.search.DocValueFormat;
-import org.opensearch.search.aggregations.Aggregator;
-import org.opensearch.search.aggregations.AggregatorFactories;
-import org.opensearch.search.aggregations.BucketCollector;
-import org.opensearch.search.aggregations.CardinalityUpperBound;
-import org.opensearch.search.aggregations.InternalAggregation;
-import org.opensearch.search.aggregations.InternalAggregations;
-import org.opensearch.search.aggregations.LeafBucketCollector;
-import org.opensearch.search.aggregations.MultiBucketCollector;
-import org.opensearch.search.aggregations.MultiBucketConsumerService;
+import org.opensearch.search.aggregations.*;
 import org.opensearch.search.aggregations.bucket.BucketsAggregator;
 import org.opensearch.search.aggregations.bucket.missing.MissingOrder;
+import org.opensearch.search.aggregations.bucket.terms.LongKeyedBucketOrds;
 import org.opensearch.search.internal.SearchContext;
 import org.opensearch.search.searchafter.SearchAfterBuilder;
 import org.opensearch.search.sort.SortAndFormats;
+import org.opensearch.search.aggregations.bucket.FilterRewriteHelper;

 import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.List;
-import java.util.Map;
+import java.util.*;
 import java.util.function.LongUnaryOperator;
 import java.util.stream.Collectors;
@@ -110,6 +103,11 @@ final class CompositeAggregator extends BucketsAggregator {

     private boolean earlyTerminated;

+    private final Weight[] filters;
+    private final LongKeyedBucketOrds bucketOrds;
+    private final DateFieldMapper.DateFieldType fieldType;
+    private final Rounding.Prepared preparedRounding;
+
     CompositeAggregator(
         String name,
         AggregatorFactories factories,
@@ -153,6 +151,35 @@ final class CompositeAggregator extends BucketsAggregator {
         }
         this.queue = new CompositeValuesCollectorQueue(context.bigArrays(), sources, size, rawAfterKey);
         this.rawAfterKey = rawAfterKey;
+
+        bucketOrds = LongKeyedBucketOrds.build(context.bigArrays(), CardinalityUpperBound.ONE);
+
+        CompositeValuesSourceConfig dateHistogramSourceConfig = sourceConfigs[0];
+        RoundingValuesSource dateHistogramSource = (RoundingValuesSource) dateHistogramSourceConfig.valuesSource();
+        preparedRounding = dateHistogramSource.getPreparedRounding();
+        FilterRewriteHelper.ValueSourceContext dateHistogramSourceContext = new FilterRewriteHelper.ValueSourceContext(
+            dateHistogramSourceConfig.missingBucket(),
+            dateHistogramSourceConfig.hasScript(),
+            // TODO reading this can be null, and that's why we support missing
+            dateHistogramSourceConfig.fieldType()
+        );
+        FilterRewriteHelper.FilterContext filterContext = FilterRewriteHelper.buildFastFilterContext(
+            parent,
+            subAggregators.length,
+            context,
+            x -> dateHistogramSource.getRounding(),
+            () -> preparedRounding,
+            dateHistogramSourceContext,
+            // TODO reading need to consider afterKey in this
+            fc -> FilterRewriteHelper.getAggregationBounds(context, fc.getFieldType().name())
+        );
+        if (filterContext != null) {
+            fieldType = filterContext.fieldType;
+            filters = filterContext.filters;
+        } else {
+            filters = null;
+            fieldType = null;
+        }
     }

     @Override
@@ -186,11 +213,11 @@ public InternalAggregation[] buildAggregations(long[] owningBucketOrds) throws I
         }

         int num = Math.min(size, queue.size());
-        final InternalComposite.InternalBucket[] buckets = new InternalComposite.InternalBucket[num];
+        InternalComposite.InternalBucket[] buckets = new InternalComposite.InternalBucket[num];

         long[] bucketOrdsToCollect = new long[queue.size()];
         for (int i = 0; i < queue.size(); i++) {
-            bucketOrdsToCollect[i] = i; // TODO reading meaning the queue is indexed by bucket key and contains the doc count
+            bucketOrdsToCollect[i] = i;
         }
         InternalAggregations[] subAggsForBuckets = buildSubAggsForBuckets(bucketOrdsToCollect);
@@ -210,6 +237,42 @@ public InternalAggregation[] buildAggregations(long[] owningBucketOrds) throws I
             );
         }

+        if (bucketOrds.size() != 0) {
+            // transform existing buckets into map if not 0
+            // this is for the case where we have duplicate buckets, we need to add bucketOrds content into buckets
+            Map<CompositeKey, InternalComposite.InternalBucket> bucketMap = new HashMap<>();
+            for (InternalComposite.InternalBucket internalBucket : buckets) {
+                bucketMap.put(internalBucket.getRawKey(), internalBucket);
+            }
+            // need to add bucketOrds content into buckets
+            LongKeyedBucketOrds.BucketOrdsEnum ordsEnum = bucketOrds.ordsEnum(0);
+            // if duplicate, add to existing
+            while (ordsEnum.next()) {
+                Long bucketValue = ordsEnum.value();
+                CompositeKey key = new CompositeKey(bucketValue);
+                if (bucketMap.containsKey(key)) {
+                    long docCount = bucketDocCount(ordsEnum.ord()) + bucketMap.get(key).getDocCount();
+                    bucketMap.get(key).setDocCount(docCount);
+                } else {
+                    InternalComposite.InternalBucket bucket = new InternalComposite.InternalBucket(
+                        sourceNames,
+                        formats,
+                        key,
+                        reverseMuls,
+                        missingOrders,
+                        bucketDocCount(ordsEnum.ord()),
+                        buildEmptySubAggregations()
+                    );
+                    bucketMap.put(key, bucket);
+                }
+            }
+            List<InternalComposite.InternalBucket> bucketList = new ArrayList<>(bucketMap.values());
+            CollectionUtil.introSort(bucketList, InternalComposite.InternalBucket::compareKey);
+            buckets = bucketList.toArray(InternalComposite.InternalBucket[]::new);
+            num = buckets.length;
+        }
+
        CompositeKey lastBucket = num > 0 ? buckets[num - 1].getRawKey() : null;
         return new InternalAggregation[] {
             new InternalComposite(
@@ -451,6 +514,24 @@ private void processLeafFromQuery(LeafReaderContext ctx, Sort indexSortPrefix) t

     @Override
     protected LeafBucketCollector getLeafCollector(LeafReaderContext ctx, LeafBucketCollector sub) throws IOException {
+        // Need to be declared as final and array for usage within the
+        // LeafBucketCollectorBase subclass below
+        final boolean[] useOpt = new boolean[1];
+        useOpt[0] = filters != null;
+
+        // Try fast filter aggregation if the filters have been created
+        // Skip if tried before and gave incorrect/incomplete results
+        if (useOpt[0]) {
+            useOpt[0] = FilterRewriteHelper.tryFastFilterAggregation(ctx, filters, fieldType,
+                (key, count) -> {
+                    incrementBucketDocCount(
+                        FilterRewriteHelper.getBucketOrd(
+                            bucketOrds.add(0, preparedRounding.round(key))),
+                        count
+                    );
+                });
+        }
+
         finishLeaf();

         boolean fillDocIdSet = deferredCollectors != NO_OP_COLLECTOR; // TODO reading subAggs are deferred
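The "final and array" comment above points at a Java constraint worth spelling out: an anonymous subclass can only capture effectively final locals, so a one-element array is the usual mutable cell that the inner collector can still flip off after a failed attempt. A minimal illustration (hypothetical, outside the patch):

    class MutableCaptureSketch {
        static Runnable makeToggle() {
            final boolean[] enabled = { true };
            // The lambda captures the (final) array reference but may mutate its contents.
            return () -> {
                if (enabled[0]) {
                    enabled[0] = false;
                }
            };
        }
    }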
diff --git a/server/src/main/java/org/opensearch/search/aggregations/bucket/composite/DateHistogramValuesSourceBuilder.java b/server/src/main/java/org/opensearch/search/aggregations/bucket/composite/DateHistogramValuesSourceBuilder.java
index 3f6aca2ad9c07..3fdd0b3c12a8e 100644
--- a/server/src/main/java/org/opensearch/search/aggregations/bucket/composite/DateHistogramValuesSourceBuilder.java
+++ b/server/src/main/java/org/opensearch/search/aggregations/bucket/composite/DateHistogramValuesSourceBuilder.java
@@ -298,7 +298,7 @@ public static void register(ValuesSourceRegistry.Builder builder) {
             // TODO once composite is plugged in to the values source registry or at least understands Date values source types use it
             // here
             Rounding.Prepared preparedRounding = rounding.prepareForUnknown();
-            RoundingValuesSource vs = new RoundingValuesSource(numeric, preparedRounding);
+            RoundingValuesSource vs = new RoundingValuesSource(numeric, preparedRounding, rounding);
             // is specified in the builder.
             final DocValueFormat docValueFormat = format == null ? DocValueFormat.RAW : valuesSourceConfig.format();
             final MappedFieldType fieldType = valuesSourceConfig.fieldType();
diff --git a/server/src/main/java/org/opensearch/search/aggregations/bucket/composite/InternalComposite.java b/server/src/main/java/org/opensearch/search/aggregations/bucket/composite/InternalComposite.java
index 9f8a4cff5f3fc..43f1ad32a66f4 100644
--- a/server/src/main/java/org/opensearch/search/aggregations/bucket/composite/InternalComposite.java
+++ b/server/src/main/java/org/opensearch/search/aggregations/bucket/composite/InternalComposite.java
@@ -339,7 +339,7 @@ public static class InternalBucket extends InternalMultiBucketAggregation.Intern
         KeyComparable<InternalBucket> {

         private final CompositeKey key;
-        private final long docCount;
+        private long docCount;
         private final InternalAggregations aggregations;
         private final transient int[] reverseMuls;
         private final transient MissingOrder[] missingOrders;
@@ -436,6 +436,10 @@ public long getDocCount() {
         return docCount;
     }

+    public void setDocCount(long docCount) {
+        this.docCount = docCount;
+    }
+
     @Override
     public Aggregations getAggregations() {
         return aggregations;
diff --git a/server/src/main/java/org/opensearch/search/aggregations/bucket/composite/RoundingValuesSource.java b/server/src/main/java/org/opensearch/search/aggregations/bucket/composite/RoundingValuesSource.java
index 89315724ff9ed..87217e8f5bc45 100644
--- a/server/src/main/java/org/opensearch/search/aggregations/bucket/composite/RoundingValuesSource.java
+++ b/server/src/main/java/org/opensearch/search/aggregations/bucket/composite/RoundingValuesSource.java
@@ -49,15 +49,16 @@
  */
 class RoundingValuesSource extends ValuesSource.Numeric {
     private final ValuesSource.Numeric vs;
-    private final Rounding.Prepared rounding;
+    private final Rounding.Prepared preparedRounding;
+    private final Rounding rounding;

     /**
-     *
-     * @param vs The original values source
-     * @param rounding How to round the values
+     * @param vs The original values source
+     * @param preparedRounding How to round the values
      */
-    RoundingValuesSource(Numeric vs, Rounding.Prepared rounding) {
+    RoundingValuesSource(Numeric vs, Rounding.Prepared preparedRounding, Rounding rounding) {
         this.vs = vs;
+        this.preparedRounding = preparedRounding;
         this.rounding = rounding;
     }

@@ -71,8 +72,16 @@ public boolean isBigInteger() {
         return false;
     }

+    public Rounding.Prepared getPreparedRounding() {
+        return preparedRounding;
+    }
+
+    public Rounding getRounding() {
+        return rounding;
+    }
+
     public long round(long value) {
-        return rounding.round(value);
+        return preparedRounding.round(value);
     }

     @Override
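To make the new getRounding/getPreparedRounding split concrete: the Rounding carries the interval definition, while a Rounding.Prepared is an instance optimized for an expected range of input values; both round a timestamp down to its bucket key. A usage sketch (day-of-month interval chosen arbitrarily):

    import org.opensearch.common.Rounding;

    class RoundingSketch {
        static long bucketKeyOf(long timestampMillis) {
            Rounding rounding = Rounding.builder(Rounding.DateTimeUnit.DAY_OF_MONTH).build();
            Rounding.Prepared prepared = rounding.prepareForUnknown();
            return prepared.round(timestampMillis);
        }
    }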
diff --git a/server/src/main/java/org/opensearch/search/aggregations/bucket/histogram/AutoDateHistogramAggregator.java b/server/src/main/java/org/opensearch/search/aggregations/bucket/histogram/AutoDateHistogramAggregator.java
index a71c15d551927..6fbbdda84bf05 100644
--- a/server/src/main/java/org/opensearch/search/aggregations/bucket/histogram/AutoDateHistogramAggregator.java
+++ b/server/src/main/java/org/opensearch/search/aggregations/bucket/histogram/AutoDateHistogramAggregator.java
@@ -53,6 +53,7 @@
 import org.opensearch.search.aggregations.LeafBucketCollectorBase;
 import org.opensearch.search.aggregations.bucket.DeferableBucketAggregator;
 import org.opensearch.search.aggregations.bucket.DeferringBucketCollector;
+import org.opensearch.search.aggregations.bucket.FilterRewriteHelper;
 import org.opensearch.search.aggregations.bucket.MergingBucketsDeferringCollector;
 import org.opensearch.search.aggregations.bucket.histogram.AutoDateHistogramAggregationBuilder.RoundingInfo;
 import org.opensearch.search.aggregations.bucket.terms.LongKeyedBucketOrds;
@@ -156,6 +157,11 @@ private AutoDateHistogramAggregator(
         this.roundingPreparer = roundingPreparer;
         this.preparedRounding = prepareRounding(0);

+        FilterRewriteHelper.ValueSourceContext dateHistogramSourceContext = new FilterRewriteHelper.ValueSourceContext(
+            valuesSourceConfig.missing() != null,
+            valuesSourceConfig.script() != null,
+            valuesSourceConfig.fieldType()
+        );
         FilterRewriteHelper.FilterContext filterContext = FilterRewriteHelper.buildFastFilterContext(
             parent(),
             subAggregators.length,
             context,
@@ -164,8 +170,8 @@ private AutoDateHistogramAggregator(
             // Passing prepared rounding as supplier to ensure the correct prepared
             // rounding is set as it is done during getMinimumRounding
             () -> preparedRounding,
-            valuesSourceConfig,
-            fc -> FilterRewriteHelper.getAggregationBounds(context, fc.field())
+            dateHistogramSourceContext,
+            fc -> FilterRewriteHelper.getAggregationBounds(context, fc.getFieldType().name())
         );
         if (filterContext != null) {
             fieldType = filterContext.fieldType;
@@ -240,12 +246,14 @@ public void collect(int doc, long owningBucketOrd) throws IOException {
                 // Try fast filter aggregation if the filters have been created
                 // Skip if tried before and gave incorrect/incomplete results
                 if (useOpt[0]) {
-                    useOpt[0] = FilterRewriteHelper.tryFastFilterAggregation(ctx, filters, fieldType, (key, count) -> {
-                        incrementBucketDocCount(
-                            FilterRewriteHelper.getBucketOrd(getBucketOrds().add(owningBucketOrd, preparedRounding.round(key))),
-                            count
-                        );
-                    });
+                    useOpt[0] = FilterRewriteHelper.tryFastFilterAggregation(ctx, filters, fieldType,
+                        (key, count) -> {
+                            incrementBucketDocCount(
+                                FilterRewriteHelper.getBucketOrd(
+                                    getBucketOrds().add(owningBucketOrd, preparedRounding.round(key))),
+                                count
+                            );
+                        });
                 }

                 iteratingCollector.collect(doc, owningBucketOrd);
diff --git a/server/src/main/java/org/opensearch/search/aggregations/bucket/histogram/DateHistogramAggregator.java b/server/src/main/java/org/opensearch/search/aggregations/bucket/histogram/DateHistogramAggregator.java
index 8437e1dce9fe0..372fddee37401 100644
--- a/server/src/main/java/org/opensearch/search/aggregations/bucket/histogram/DateHistogramAggregator.java
+++ b/server/src/main/java/org/opensearch/search/aggregations/bucket/histogram/DateHistogramAggregator.java
@@ -49,8 +49,8 @@
 import org.opensearch.search.aggregations.LeafBucketCollector;
 import org.opensearch.search.aggregations.LeafBucketCollectorBase;
 import org.opensearch.search.aggregations.bucket.BucketsAggregator;
+import org.opensearch.search.aggregations.bucket.FilterRewriteHelper;
 import org.opensearch.search.aggregations.bucket.terms.LongKeyedBucketOrds;
-import org.opensearch.search.aggregations.support.FieldContext;
 import org.opensearch.search.aggregations.support.ValuesSource;
 import org.opensearch.search.aggregations.support.ValuesSourceConfig;
 import org.opensearch.search.internal.SearchContext;
@@ -116,13 +116,18 @@ class DateHistogramAggregator extends BucketsAggregator implements SizedBucketAg

         bucketOrds = LongKeyedBucketOrds.build(context.bigArrays(), cardinality);

+        FilterRewriteHelper.ValueSourceContext dateHistogramSourceContext = new FilterRewriteHelper.ValueSourceContext(
+            valuesSourceConfig.missing() != null,
+            valuesSourceConfig.script() != null,
+            valuesSourceConfig.fieldType()
+        );
         FilterRewriteHelper.FilterContext filterContext = FilterRewriteHelper.buildFastFilterContext(
            parent,
             subAggregators.length,
             context,
             x -> rounding,
             () -> preparedRounding,
-            valuesSourceConfig,
+            dateHistogramSourceContext,
             this::computeBounds
         );
         if (filterContext != null) {
             fieldType = filterContext.fieldType;
             filters = filterContext.filters;
         } else {
             filters = null;
             fieldType = null;
         }
     }

-    private long[] computeBounds(final FieldContext fieldContext) throws IOException {
-        final long[] bounds = FilterRewriteHelper.getAggregationBounds(context, fieldContext.field());
+    private long[] computeBounds(final FilterRewriteHelper.ValueSourceContext fieldContext) throws IOException {
+        final long[] bounds = FilterRewriteHelper.getAggregationBounds(context, fieldContext.getFieldType().name());
         if (bounds != null) {
             // Update min/max limit if user specified any hard bounds
             if (hardBounds != null) {
@@ -172,12 +177,15 @@ public void collect(int doc, long owningBucketOrd) throws IOException {
                 // Try fast filter aggregation if the filters have been created
                 // Skip if tried before and gave incorrect/incomplete results
                 if (useOpt[0]) {
-                    useOpt[0] = FilterRewriteHelper.tryFastFilterAggregation(ctx, filters, fieldType, (key, count) -> {
-                        incrementBucketDocCount(
-                            FilterRewriteHelper.getBucketOrd(bucketOrds.add(owningBucketOrd, preparedRounding.round(key))),
-                            count
-                        );
-                    });
+                    useOpt[0] = FilterRewriteHelper.tryFastFilterAggregation(ctx, filters, fieldType,
+                        (key, count) -> {
+                            incrementBucketDocCount(
+                                FilterRewriteHelper.getBucketOrd( // TODO reading not possible to see a duplicate bucket here
+                                    bucketOrds.add(owningBucketOrd, preparedRounding.round(key))
+                                ),
+                                count
+                            );
+                        });
                 }

                 if (values.advanceExact(doc)) {
@@ -209,29 +217,33 @@ public void collect(int doc, long owningBucketOrd) throws IOException {

     @Override
     public InternalAggregation[] buildAggregations(long[] owningBucketOrds) throws IOException {
-        return buildAggregationsForVariableBuckets(owningBucketOrds, bucketOrds, (bucketValue, docCount, subAggregationResults) -> {
-            return new InternalDateHistogram.Bucket(bucketValue, docCount, keyed, formatter, subAggregationResults);
-        }, (owningBucketOrd, buckets) -> {
-            // the contract of the histogram aggregation is that shards must return buckets ordered by key in ascending order
-            CollectionUtil.introSort(buckets, BucketOrder.key(true).comparator());
+        return buildAggregationsForVariableBuckets(
+            owningBucketOrds,
+            bucketOrds,
+            (bucketValue, docCount, subAggregationResults) -> {
+                return new InternalDateHistogram.Bucket(bucketValue, docCount, keyed, formatter, subAggregationResults);
+            },
+            (owningBucketOrd, buckets) -> {
+                // the contract of the histogram aggregation is that shards must return buckets ordered by key in ascending order
+                CollectionUtil.introSort(buckets, BucketOrder.key(true).comparator());

-            // value source will be null for unmapped fields
-            // Important: use `rounding` here, not `shardRounding`
-            InternalDateHistogram.EmptyBucketInfo emptyBucketInfo = minDocCount == 0
-                ? new InternalDateHistogram.EmptyBucketInfo(rounding.withoutOffset(), buildEmptySubAggregations(), extendedBounds)
-                : null;
-            return new InternalDateHistogram(
-                name,
-                buckets,
-                order,
-                minDocCount,
-                rounding.offset(),
-                emptyBucketInfo,
-                formatter,
-                keyed,
-                metadata()
-            );
-        });
+                // value source will be null for unmapped fields
+                // Important: use `rounding` here, not `shardRounding`
+                InternalDateHistogram.EmptyBucketInfo emptyBucketInfo = minDocCount == 0
+                    ? new InternalDateHistogram.EmptyBucketInfo(rounding.withoutOffset(), buildEmptySubAggregations(), extendedBounds)
+                    : null;
+                return new InternalDateHistogram(
+                    name,
+                    buckets,
+                    order,
+                    minDocCount,
+                    rounding.offset(),
+                    emptyBucketInfo,
+                    formatter,
+                    keyed,
+                    metadata()
+                );
+            });
     }

     @Override
diff --git a/server/src/test/java/org/opensearch/search/aggregations/bucket/histogram/AutoDateHistogramAggregatorTests.java b/server/src/test/java/org/opensearch/search/aggregations/bucket/histogram/AutoDateHistogramAggregatorTests.java
index dda053af78b30..e8aab0b143108 100644
--- a/server/src/test/java/org/opensearch/search/aggregations/bucket/histogram/AutoDateHistogramAggregatorTests.java
+++ b/server/src/test/java/org/opensearch/search/aggregations/bucket/histogram/AutoDateHistogramAggregatorTests.java
@@ -441,10 +441,12 @@ public void testUnmappedMissing() throws IOException {

         final DateFieldMapper.DateFieldType fieldType = new DateFieldMapper.DateFieldType("date_field");

-        testCase(aggregation, DEFAULT_QUERY, iw -> {}, (Consumer<InternalAutoDateHistogram>) histogram -> {
-            assertEquals(0, histogram.getBuckets().size());
-            assertFalse(AggregationInspectionHelper.hasValue(histogram));
-        }, fieldType);
+        testCase(aggregation, DEFAULT_QUERY, iw -> {},
+            (Consumer<InternalAutoDateHistogram>) histogram -> {
+                assertEquals(0, histogram.getBuckets().size());
+                assertFalse(AggregationInspectionHelper.hasValue(histogram));
+            },
+            fieldType);
     }

     public void testIntervalYear() throws IOException {
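One detail that recurs in the collect paths above deserves a note before patch 03: LongKeyedBucketOrds#add hands out a fresh ordinal for a new key and returns (-1 - existingOrd) for a key already seen, which is what getBucketOrd undoes. The convention simulated with a plain map (a sketch, not the real class):

    import java.util.HashMap;
    import java.util.Map;

    class OrdAssignerSketch {
        private final Map<Long, Long> ords = new HashMap<>();

        long add(long key) {
            Long existing = ords.get(key);
            if (existing != null) {
                return -1 - existing; // negative means "already seen"; recover with -1 - result
            }
            long fresh = ords.size();
            ords.put(key, fresh);
            return fresh;
        }
    }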
From 5100ae025285f0e05d5201754c322e97e7c4f2ba Mon Sep 17 00:00:00 2001
From: bowenlan-amzn
Date: Wed, 6 Dec 2023 09:15:00 -0800
Subject: [PATCH 03/18] complete implementation on small test set

Signed-off-by: bowenlan-amzn
---
 .../bucket/FilterRewriteHelper.java           | 39 ++++++++++---
 .../bucket/composite/CompositeAggregator.java | 57 +++++++++++--------
 .../AutoDateHistogramAggregator.java          | 31 ++++------
 .../histogram/DateHistogramAggregator.java    | 29 ++++------
 4 files changed, 85 insertions(+), 71 deletions(-)

diff --git a/server/src/main/java/org/opensearch/search/aggregations/bucket/FilterRewriteHelper.java b/server/src/main/java/org/opensearch/search/aggregations/bucket/FilterRewriteHelper.java
index a1c4f16ce2a97..78418d25d5217 100644
--- a/server/src/main/java/org/opensearch/search/aggregations/bucket/FilterRewriteHelper.java
+++ b/server/src/main/java/org/opensearch/search/aggregations/bucket/FilterRewriteHelper.java
@@ -10,7 +10,6 @@

 import org.apache.lucene.index.LeafReaderContext;
 import org.apache.lucene.index.PointValues;
-import org.apache.lucene.search.CollectionTerminatedException;
 import org.apache.lucene.search.ConstantScoreQuery;
 import org.apache.lucene.search.IndexOrDocValuesQuery;
 import org.apache.lucene.search.MatchAllDocsQuery;
@@ -37,8 +36,8 @@
 import java.util.function.Supplier;

 /**
- * Helpers functions to rewrite and optimize aggregations using
- * range filter queries
+ * Helps rewrite and optimize aggregations using range filter queries
+ * Currently supported types of aggregations are: DateHistogramAggregator, AutoDateHistogramAggregator, CompositeAggregator
  *
  * @opensearch.internal
  */
@@ -190,6 +189,17 @@ protected String toString(int dimension, byte[] value) {
         return filters;
     }

+    /**
+     * The pre-conditions to initiate fast filter optimization on aggregations are:
+     * 1. The query with aggregation has to be PointRangeQuery on the same date field
+     * 2. No parent/sub aggregations
+     * 3. No missing value/bucket
+     * 4. No script
+     *
+     * @param computeBounds            gets the lower and upper bound of the field in a shard search
+     * @param roundingFunction         produces the Rounding that will provide the interval
+     * @param preparedRoundingSupplier produces the PreparedRounding that will do the rounding
+     */
     public static FilterContext buildFastFilterContext(
         final Object parent,
         final int subAggLength,
         SearchContext context,
         Function<long[], Rounding> roundingFunction,
         Supplier<Rounding.Prepared> preparedRoundingSupplier,
         ValueSourceContext valueSourceContext,
         CheckedFunction<ValueSourceContext, long[], IOException> computeBounds
     ) throws IOException {
-        // Create the filters for fast aggregation only if the query is instance
-        // of point range query and there aren't any parent/sub aggregations
         if (parent == null && subAggLength == 0 && !valueSourceContext.missing && !valueSourceContext.hasScript) {
@@ -251,19 +259,27 @@ public MappedFieldType getFieldType() {
         }
     }

     public static long getBucketOrd(long bucketOrd) {
-        if (bucketOrd < 0) { // already seen // TODO reading theoretically, for one segment there cannot be a duplicate bucket?
+        if (bucketOrd < 0) { // already seen
             bucketOrd = -1 - bucketOrd;
         }
         return bucketOrd;
     }

+    /**
+     * This should be executed for each segment
+     *
+     * @param size the maximum number of buckets needed
+     */
     public static boolean tryFastFilterAggregation(
         final LeafReaderContext ctx,
         final Weight[] filters,
         final DateFieldMapper.DateFieldType fieldType,
-        final BiConsumer<Long, Integer> incrementDocCount
+        final BiConsumer<Long, Integer> incrementDocCount,
+        final int size
     ) throws IOException {
+        if (filters == null) return false;
+
         final int[] counts = new int[filters.length];
         int i;
         for (i = 0; i < filters.length; i++) {
@@ -275,16 +291,21 @@ public static boolean tryFastFilterAggregation(
             }
         }

+        int s = 0;
         for (i = 0; i < filters.length; i++) {
             if (counts[i] > 0) {
                 incrementDocCount.accept(
                     fieldType.convertNanosToMillis(
-                        NumericUtils.sortableBytesToLong(((PointRangeQuery) filters[i].getQuery()).getLowerPoint(), 0)
+                        NumericUtils.sortableBytesToLong(
+                            ((PointRangeQuery) filters[i].getQuery()).getLowerPoint(), 0)
                     ),
                     counts[i]
                 );
+                s++;
+                if (s > size) return true;
             }
         }
-        throw new CollectionTerminatedException();
+
+        return true;
     }
 }
diff --git a/server/src/main/java/org/opensearch/search/aggregations/bucket/composite/CompositeAggregator.java b/server/src/main/java/org/opensearch/search/aggregations/bucket/composite/CompositeAggregator.java index d4c01234a3e72..bbb9c6167e6f7 100644 --- a/server/src/main/java/org/opensearch/search/aggregations/bucket/composite/CompositeAggregator.java +++ b/server/src/main/java/org/opensearch/search/aggregations/bucket/composite/CompositeAggregator.java @@ -73,7 +73,12 @@ import org.opensearch.search.aggregations.bucket.FilterRewriteHelper; import java.io.IOException; -import java.util.*; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; import java.util.function.LongUnaryOperator; import java.util.stream.Collectors; @@ -160,7 +165,6 @@ final class CompositeAggregator extends BucketsAggregator { FilterRewriteHelper.ValueSourceContext dateHistogramSourceContext = new FilterRewriteHelper.ValueSourceContext( dateHistogramSourceConfig.missingBucket(), dateHistogramSourceConfig.hasScript(), - // TODO reading this can be null, and that's why we support missing dateHistogramSourceConfig.fieldType() ); FilterRewriteHelper.FilterContext filterContext = FilterRewriteHelper.buildFastFilterContext( @@ -182,6 +186,18 @@ final class CompositeAggregator extends BucketsAggregator { } } + // private long[] computeBounds(final FilterRewriteHelper.ValueSourceContext fieldContext) { + // final long[] bounds = FilterRewriteHelper.getAggregationBounds(context, fieldContext.getFieldType().name()); + // if (bounds != null) { + // // Update min/max limit if user specified any hard bounds + // if (hardBounds != null) { + // bounds[0] = Math.max(bounds[0], hardBounds.getMin()); + // bounds[1] = Math.min(bounds[1], hardBounds.getMax() - 1); // hard bounds max is exclusive + // } + // } + // return bounds; + // } + @Override protected void doClose() { try { @@ -237,17 +253,14 @@ public InternalAggregation[] buildAggregations(long[] owningBucketOrds) throws I ); } - + // For the fast filters optimization if (bucketOrds.size() != 0) { - // transform existing buckets into map if not 0 - // this is for the case where we have duplicate buckets, we need to add bucketOrds content into buckets Map<CompositeKey, InternalComposite.InternalBucket> bucketMap = new HashMap<>(); for (InternalComposite.InternalBucket internalBucket : buckets) { bucketMap.put(internalBucket.getRawKey(), internalBucket); } - // need to add bucketOrds content into buckets + LongKeyedBucketOrds.BucketOrdsEnum ordsEnum = bucketOrds.ordsEnum(0); - // if duplicate, add to existing while (ordsEnum.next()) { Long bucketValue = ordsEnum.value(); CompositeKey key = new CompositeKey(bucketValue); @@ -267,9 +280,10 @@ public InternalAggregation[] buildAggregations(long[] owningBucketOrds) throws I bucketMap.put(key, bucket); } } + List<InternalComposite.InternalBucket> bucketList = new ArrayList<>(bucketMap.values()); CollectionUtil.introSort(bucketList, InternalComposite.InternalBucket::compareKey); - buckets = bucketList.toArray(InternalComposite.InternalBucket[]::new); + buckets = bucketList.subList(0, Math.min(size, bucketList.size())).toArray(InternalComposite.InternalBucket[]::new); num = buckets.length; } @@ -514,23 +528,16 @@ private void processLeafFromQuery(LeafReaderContext ctx, Sort indexSortPrefix) t @Override protected LeafBucketCollector getLeafCollector(LeafReaderContext ctx, LeafBucketCollector sub) throws IOException { - // Need to be declared as final and array for usage within the - // LeafBucketCollectorBase subclass below - final boolean[] useOpt = new boolean[1]; - useOpt[0] = filters != null; - - // Try fast filter aggregation if the filters have been created - // Skip if tried before and gave incorrect/incomplete results - if (useOpt[0]) { - useOpt[0] = FilterRewriteHelper.tryFastFilterAggregation(ctx, filters, fieldType, - (key, count) -> { - incrementBucketDocCount( - FilterRewriteHelper.getBucketOrd( - bucketOrds.add(0, preparedRounding.round(key))), - count - ); - }); - } + boolean optimized = FilterRewriteHelper.tryFastFilterAggregation(ctx, filters, fieldType, + (key, count) -> { + incrementBucketDocCount( + FilterRewriteHelper.getBucketOrd( + bucketOrds.add(0, preparedRounding.round(key)) + ), + count + ); + }, size); + if (optimized) throw new CollectionTerminatedException(); finishLeaf(); diff --git a/server/src/main/java/org/opensearch/search/aggregations/bucket/histogram/AutoDateHistogramAggregator.java b/server/src/main/java/org/opensearch/search/aggregations/bucket/histogram/AutoDateHistogramAggregator.java index 6fbbdda84bf05..a2677d6f82768 100644 --- a/server/src/main/java/org/opensearch/search/aggregations/bucket/histogram/AutoDateHistogramAggregator.java +++ b/server/src/main/java/org/opensearch/search/aggregations/bucket/histogram/AutoDateHistogramAggregator.java @@ -33,6 +33,7 @@ import
org.apache.lucene.index.LeafReaderContext; import org.apache.lucene.index.SortedNumericDocValues; +import org.apache.lucene.search.CollectionTerminatedException; import org.apache.lucene.search.ScoreMode; import org.apache.lucene.search.Weight; import org.apache.lucene.util.CollectionUtil; @@ -232,30 +233,22 @@ public final LeafBucketCollector getLeafCollector(LeafReaderContext ctx, LeafBuc return LeafBucketCollector.NO_OP_COLLECTOR; } + boolean optimized = FilterRewriteHelper.tryFastFilterAggregation(ctx, filters, fieldType, + (key, count) -> { + incrementBucketDocCount( + FilterRewriteHelper.getBucketOrd( + getBucketOrds().add(0, preparedRounding.round(key)) + ), + count + ); + }, Integer.MAX_VALUE); + if (optimized) throw new CollectionTerminatedException(); + final SortedNumericDocValues values = valuesSource.longValues(ctx); final LeafBucketCollector iteratingCollector = getLeafCollector(values, sub); - - // Need to be declared as final and array for usage within the - // LeafBucketCollectorBase subclass below - final boolean[] useOpt = new boolean[1]; - useOpt[0] = filters != null; - return new LeafBucketCollectorBase(sub, values) { @Override public void collect(int doc, long owningBucketOrd) throws IOException { - // Try fast filter aggregation if the filters have been created - // Skip if tried before and gave incorrect/incomplete results - if (useOpt[0]) { - useOpt[0] = FilterRewriteHelper.tryFastFilterAggregation(ctx, filters, fieldType, - (key, count) -> { - incrementBucketDocCount( - FilterRewriteHelper.getBucketOrd( - getBucketOrds().add(owningBucketOrd, preparedRounding.round(key))), - count - ); - }); - } - iteratingCollector.collect(doc, owningBucketOrd); } }; diff --git a/server/src/main/java/org/opensearch/search/aggregations/bucket/histogram/DateHistogramAggregator.java b/server/src/main/java/org/opensearch/search/aggregations/bucket/histogram/DateHistogramAggregator.java index 372fddee37401..24ba87e8a12ae 100644 --- a/server/src/main/java/org/opensearch/search/aggregations/bucket/histogram/DateHistogramAggregator.java +++ b/server/src/main/java/org/opensearch/search/aggregations/bucket/histogram/DateHistogramAggregator.java @@ -33,6 +33,7 @@ import org.apache.lucene.index.LeafReaderContext; import org.apache.lucene.index.SortedNumericDocValues; +import org.apache.lucene.search.CollectionTerminatedException; import org.apache.lucene.search.ScoreMode; import org.apache.lucene.search.Weight; import org.apache.lucene.util.CollectionUtil; @@ -165,29 +166,21 @@ public LeafBucketCollector getLeafCollector(LeafReaderContext ctx, LeafBucketCol return LeafBucketCollector.NO_OP_COLLECTOR; } - // Need to be declared as final and array for usage within the - // LeafBucketCollectorBase subclass below - final boolean[] useOpt = new boolean[1]; - useOpt[0] = filters != null; + boolean optimized = FilterRewriteHelper.tryFastFilterAggregation(ctx, filters, fieldType, + (key, count) -> { + incrementBucketDocCount( + FilterRewriteHelper.getBucketOrd( + bucketOrds.add(0, preparedRounding.round(key)) + ), + count + ); + }, Integer.MAX_VALUE); + if (optimized) throw new CollectionTerminatedException(); SortedNumericDocValues values = valuesSource.longValues(ctx); return new LeafBucketCollectorBase(sub, values) { @Override public void collect(int doc, long owningBucketOrd) throws IOException { - // Try fast filter aggregation if the filters have been created - // Skip if tried before and gave incorrect/incomplete results - if (useOpt[0]) { - useOpt[0] = 
FilterRewriteHelper.tryFastFilterAggregation(ctx, filters, fieldType, - (key, count) -> { - incrementBucketDocCount( - FilterRewriteHelper.getBucketOrd( // TODO reading not possible to see duplicate bucket - bucketOrds.add(owningBucketOrd, preparedRounding.round(key)) - ), - count - ); - }); - } - if (values.advanceExact(doc)) { int valuesCount = values.docValueCount(); From 25157e8fc06c1fa26b72f5a96991c1d4e74c1e13 Mon Sep 17 00:00:00 2001 From: bowenlan-amzn Date: Wed, 6 Dec 2023 09:40:49 -0800 Subject: [PATCH 04/18] Cleaning Signed-off-by: bowenlan-amzn --- .../search/aggregations/bucket/BucketsAggregator.java | 5 +---- .../aggregations/bucket/FilterRewriteHelper.java | 7 +++---- .../bucket/composite/CompositeAggregator.java | 10 +++++++++- .../composite/CompositeValuesCollectorQueue.java | 10 +++++----- .../bucket/composite/LongValuesSource.java | 3 +-- .../bucket/composite/PointsSortedDocsProducer.java | 5 +++-- .../bucket/composite/SingleDimensionValuesSource.java | 3 +-- .../bucket/composite/SortedDocsProducer.java | 7 +++---- .../histogram/AutoDateHistogramAggregatorTests.java | 10 ++++------ 9 files changed, 30 insertions(+), 30 deletions(-) diff --git a/server/src/main/java/org/opensearch/search/aggregations/bucket/BucketsAggregator.java b/server/src/main/java/org/opensearch/search/aggregations/bucket/BucketsAggregator.java index 8e43bf6ec6dd9..eef427754f535 100644 --- a/server/src/main/java/org/opensearch/search/aggregations/bucket/BucketsAggregator.java +++ b/server/src/main/java/org/opensearch/search/aggregations/bucket/BucketsAggregator.java @@ -422,10 +422,7 @@ protected final InternalAggregation[] buildAggregationsForVariableBuckets( + "]" ); } - buckets.add(bucketBuilder.build( - ordsEnum.value(), - bucketDocCount(ordsEnum.ord()), - subAggregationResults[b++])); + buckets.add(bucketBuilder.build(ordsEnum.value(), bucketDocCount(ordsEnum.ord()), subAggregationResults[b++])); } results[ordIdx] = resultBuilder.build(owningBucketOrds[ordIdx], buckets); } diff --git a/server/src/main/java/org/opensearch/search/aggregations/bucket/FilterRewriteHelper.java b/server/src/main/java/org/opensearch/search/aggregations/bucket/FilterRewriteHelper.java index 78418d25d5217..f5ccc4e03c166 100644 --- a/server/src/main/java/org/opensearch/search/aggregations/bucket/FilterRewriteHelper.java +++ b/server/src/main/java/org/opensearch/search/aggregations/bucket/FilterRewriteHelper.java @@ -146,7 +146,7 @@ private static Weight[] createFilterForAggregations( // Below rounding is needed as the interval could return in // non-rounded values for something like calendar month roundedLow = preparedRounding.round(roundedLow + interval); - if (prevRounded == roundedLow) break; // TODO reading prevents getting into an infinite loop? 
+ if (prevRounded == roundedLow) break; // prevents getting into an infinite loop prevRounded = roundedLow; } @@ -210,9 +210,9 @@ public static FilterContext buildFastFilterContext( CheckedFunction computeBounds ) throws IOException { if (parent == null && subAggLength == 0 && !valueSourceContext.missing && !valueSourceContext.hasScript) { - MappedFieldType fieldType = valueSourceContext.getFieldType(); + MappedFieldType fieldType = valueSourceContext.fieldType; if (fieldType != null) { - final String fieldName = valueSourceContext.getFieldType().name(); + final String fieldName = fieldType.name(); final long[] bounds = computeBounds.apply(valueSourceContext); if (bounds != null) { assert fieldType instanceof DateFieldMapper.DateFieldType; @@ -252,7 +252,6 @@ public ValueSourceContext(boolean missing, boolean hasScript, MappedFieldType fi this.fieldType = fieldType; } - // TODO reading why boolean doesn't need getter? public MappedFieldType getFieldType() { return fieldType; } diff --git a/server/src/main/java/org/opensearch/search/aggregations/bucket/composite/CompositeAggregator.java b/server/src/main/java/org/opensearch/search/aggregations/bucket/composite/CompositeAggregator.java index bbb9c6167e6f7..519de6712a80c 100644 --- a/server/src/main/java/org/opensearch/search/aggregations/bucket/composite/CompositeAggregator.java +++ b/server/src/main/java/org/opensearch/search/aggregations/bucket/composite/CompositeAggregator.java @@ -63,7 +63,15 @@ import org.opensearch.index.mapper.DateFieldMapper; import org.opensearch.lucene.queries.SearchAfterSortedDocQuery; import org.opensearch.search.DocValueFormat; -import org.opensearch.search.aggregations.*; +import org.opensearch.search.aggregations.Aggregator; +import org.opensearch.search.aggregations.AggregatorFactories; +import org.opensearch.search.aggregations.BucketCollector; +import org.opensearch.search.aggregations.CardinalityUpperBound; +import org.opensearch.search.aggregations.InternalAggregation; +import org.opensearch.search.aggregations.InternalAggregations; +import org.opensearch.search.aggregations.LeafBucketCollector; +import org.opensearch.search.aggregations.MultiBucketCollector; +import org.opensearch.search.aggregations.MultiBucketConsumerService; import org.opensearch.search.aggregations.bucket.BucketsAggregator; import org.opensearch.search.aggregations.bucket.missing.MissingOrder; import org.opensearch.search.aggregations.bucket.terms.LongKeyedBucketOrds; diff --git a/server/src/main/java/org/opensearch/search/aggregations/bucket/composite/CompositeValuesCollectorQueue.java b/server/src/main/java/org/opensearch/search/aggregations/bucket/composite/CompositeValuesCollectorQueue.java index 4d75a46bbb203..ae74d5c72df88 100644 --- a/server/src/main/java/org/opensearch/search/aggregations/bucket/composite/CompositeValuesCollectorQueue.java +++ b/server/src/main/java/org/opensearch/search/aggregations/bucket/composite/CompositeValuesCollectorQueue.java @@ -108,7 +108,7 @@ public int hashCode() { @Override protected boolean lessThan(Integer a, Integer b) { - return compare(a, b) > 0; // TODO reading a > b is true, this is a max heap? + return compare(a, b) > 0; // max heap } /** @@ -123,7 +123,7 @@ boolean isFull() { * the slot if the candidate is already in the queue or null if the candidate is not present. */ Integer compareCurrent() { - return map.get(new Slot(CANDIDATE_SLOT)); // TODO reading this check the slot/bucket? 
of the current value + return map.get(new Slot(CANDIDATE_SLOT)); } /** @@ -152,7 +152,7 @@ long getDocCount(int slot) { */ private void copyCurrent(int slot, long value) { for (int i = 0; i < arrays.length; i++) { - arrays[i].copyCurrent(slot); // TODO reading valueSource knows current value, set the value to this slot/index + arrays[i].copyCurrent(slot); } docCounts = bigArrays.grow(docCounts, slot + 1); docCounts.set(slot, value); @@ -204,7 +204,7 @@ boolean equals(int slot1, int slot2) { int hashCode(int slot) { int result = 1; for (int i = 0; i < arrays.length; i++) { - result = 31 * result + // TODO reading why 31 here? For each array, it multiplies the running result by 31. Multiplying by a prime number like 31 helps distribute the hash codes more evenly. + result = 31 * result + (slot == CANDIDATE_SLOT ? arrays[i].hashCodeCurrent() : arrays[i].hashCode(slot)); @@ -256,7 +256,7 @@ LeafBucketCollector getLeafCollector(Comparable forceLeadSourceValue, LeafReader int last = arrays.length - 1; LeafBucketCollector collector = in; while (last > 0) { - collector = arrays[last--].getLeafCollector(context, collector); // TODO reading the pass-in collect will work after current + collector = arrays[last--].getLeafCollector(context, collector); } if (forceLeadSourceValue != null) { diff --git a/server/src/main/java/org/opensearch/search/aggregations/bucket/composite/LongValuesSource.java b/server/src/main/java/org/opensearch/search/aggregations/bucket/composite/LongValuesSource.java index ee3b1d252fa25..445bbb831370a 100644 --- a/server/src/main/java/org/opensearch/search/aggregations/bucket/composite/LongValuesSource.java +++ b/server/src/main/java/org/opensearch/search/aggregations/bucket/composite/LongValuesSource.java @@ -253,8 +253,7 @@ static boolean checkMatchAllOrRangeQuery(Query query, String fieldName) { @Override SortedDocsProducer createSortedDocsProducerOrNull(IndexReader reader, Query query) { query = extractQuery(query); - if (checkIfSortedDocsIsApplicable(reader, fieldType) == false - || checkMatchAllOrRangeQuery(query, fieldType.name()) == false) { + if (checkIfSortedDocsIsApplicable(reader, fieldType) == false || checkMatchAllOrRangeQuery(query, fieldType.name()) == false) { return null; } final byte[] lowerPoint; diff --git a/server/src/main/java/org/opensearch/search/aggregations/bucket/composite/PointsSortedDocsProducer.java b/server/src/main/java/org/opensearch/search/aggregations/bucket/composite/PointsSortedDocsProducer.java index 391d810d6bb37..ae6bfeac05db1 100644 --- a/server/src/main/java/org/opensearch/search/aggregations/bucket/composite/PointsSortedDocsProducer.java +++ b/server/src/main/java/org/opensearch/search/aggregations/bucket/composite/PointsSortedDocsProducer.java @@ -148,8 +148,9 @@ public void visit(int docID, byte[] packedValue) throws IOException { } long bucket = bucketFunction.applyAsLong(packedValue); - if (first == false && bucket != lastBucket) { // TODO reading process previous bucket when new bucket appears - if (processBucket(queue, context, bucketDocsBuilder.build().iterator(), lastBucket, builder) && + if (first == false && bucket != lastBucket) { // process previous bucket when new bucket appears + final DocIdSet docIdSet = bucketDocsBuilder.build(); + if (processBucket(queue, context, docIdSet.iterator(), lastBucket, builder) && // lower bucket is inclusive lowerBucket != lastBucket) { // this bucket does not have any competitive composite buckets, diff --git 
a/server/src/main/java/org/opensearch/search/aggregations/bucket/composite/SingleDimensionValuesSource.java b/server/src/main/java/org/opensearch/search/aggregations/bucket/composite/SingleDimensionValuesSource.java index 8eabd6e552f10..fe0801d6d230e 100644 --- a/server/src/main/java/org/opensearch/search/aggregations/bucket/composite/SingleDimensionValuesSource.java +++ b/server/src/main/java/org/opensearch/search/aggregations/bucket/composite/SingleDimensionValuesSource.java @@ -185,8 +185,7 @@ protected boolean checkIfSortedDocsIsApplicable(IndexReader reader, MappedFieldT return false; } - if (reader.hasDeletions() - && (reader.numDocs() == 0 || (double) reader.numDocs() / (double) reader.maxDoc() < 0.5)) { + if (reader.hasDeletions() && (reader.numDocs() == 0 || (double) reader.numDocs() / (double) reader.maxDoc() < 0.5)) { // do not use the index if it has more than 50% of deleted docs return false; } diff --git a/server/src/main/java/org/opensearch/search/aggregations/bucket/composite/SortedDocsProducer.java b/server/src/main/java/org/opensearch/search/aggregations/bucket/composite/SortedDocsProducer.java index 0c9b1b945f4cf..95d3ecad31669 100644 --- a/server/src/main/java/org/opensearch/search/aggregations/bucket/composite/SortedDocsProducer.java +++ b/server/src/main/java/org/opensearch/search/aggregations/bucket/composite/SortedDocsProducer.java @@ -91,7 +91,7 @@ protected boolean processBucket( @Override public void collect(int doc, long bucket) throws IOException { hasCollected[0] = true; - long docCount = docCountProvider.getDocCount(doc); // TODO reading _doc_count can be >1 + long docCount = docCountProvider.getDocCount(doc); if (queue.addIfCompetitive(docCount)) { topCompositeCollected[0]++; if (adder != null && doc != lastDoc) { // TODO reading why doc can be == lastDoc? 
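The TODO just above — why can doc equal lastDoc — is answered later in this series, when patch 09 adds a comment in the composite first-pass collector: with a multi-valued field the collector runs once per value, so consecutive calls can carry the same docID, and the lastDoc check keeps the DocIdSet builder from recording duplicates. A small sketch of that dedup pattern, assuming values arrive grouped by document in increasing doc order (names are illustrative):

    import org.apache.lucene.util.RoaringDocIdSet;

    final class MatchingDocRecorder {
        private int lastDoc = -1;

        // invoked once per (doc, value) pair
        void onCompetitiveValue(int doc, RoaringDocIdSet.Builder builder) {
            if (doc != lastDoc) { // first competitive value seen for this doc
                builder.add(doc); // RoaringDocIdSet.Builder requires increasing docIDs
                lastDoc = doc;
            }
        }
    }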
@@ -109,11 +109,10 @@ public void collect(int doc, long bucket) throws IOException { } }; - final LeafBucketCollector collector = queue.getLeafCollector(leadSourceBucket, context, queueCollector); - final Bits liveDocs = context.reader().getLiveDocs(); + final LeafBucketCollector collector = queue.getLeafCollector(leadSourceBucket, context, queueCollector); while (iterator.nextDoc() != DocIdSetIterator.NO_MORE_DOCS) { - if (liveDocs == null || liveDocs.get(iterator.docID())) { // TODO reading doc exists + if (liveDocs == null || liveDocs.get(iterator.docID())) { collector.collect(iterator.docID()); } } diff --git a/server/src/test/java/org/opensearch/search/aggregations/bucket/histogram/AutoDateHistogramAggregatorTests.java b/server/src/test/java/org/opensearch/search/aggregations/bucket/histogram/AutoDateHistogramAggregatorTests.java index e8aab0b143108..dda053af78b30 100644 --- a/server/src/test/java/org/opensearch/search/aggregations/bucket/histogram/AutoDateHistogramAggregatorTests.java +++ b/server/src/test/java/org/opensearch/search/aggregations/bucket/histogram/AutoDateHistogramAggregatorTests.java @@ -441,12 +441,10 @@ public void testUnmappedMissing() throws IOException { final DateFieldMapper.DateFieldType fieldType = new DateFieldMapper.DateFieldType("date_field"); - testCase(aggregation, DEFAULT_QUERY, iw -> {}, - (Consumer<InternalAutoDateHistogram>) histogram -> { - assertEquals(0, histogram.getBuckets().size()); - assertFalse(AggregationInspectionHelper.hasValue(histogram)); - }, - fieldType); + testCase(aggregation, DEFAULT_QUERY, iw -> {}, (Consumer<InternalAutoDateHistogram>) histogram -> { + assertEquals(0, histogram.getBuckets().size()); + assertFalse(AggregationInspectionHelper.hasValue(histogram)); + }, fieldType); } public void testIntervalYear() throws IOException { From 5f03c729cb9565286e562295f420e3ee76ea83b5 Mon Sep 17 00:00:00 2001 From: bowenlan-amzn Date: Wed, 6 Dec 2023 16:24:45 -0800 Subject: [PATCH 05/18] Fixing unit tests Signed-off-by: bowenlan-amzn --- .../bucket/FilterRewriteHelper.java | 18 +++- .../bucket/composite/CompositeAggregator.java | 97 +++++++++---------- .../AutoDateHistogramAggregator.java | 4 +- .../histogram/DateHistogramAggregator.java | 4 +- .../composite/CompositeAggregatorTests.java | 5 +- 5 files changed, 67 insertions(+), 61 deletions(-) diff --git a/server/src/main/java/org/opensearch/search/aggregations/bucket/FilterRewriteHelper.java b/server/src/main/java/org/opensearch/search/aggregations/bucket/FilterRewriteHelper.java index f5ccc4e03c166..7d248589aef8e 100644 --- a/server/src/main/java/org/opensearch/search/aggregations/bucket/FilterRewriteHelper.java +++ b/server/src/main/java/org/opensearch/search/aggregations/bucket/FilterRewriteHelper.java @@ -128,8 +128,9 @@ private static Weight[] createFilterForAggregations( final Rounding.Prepared preparedRounding, final String field, final DateFieldMapper.DateFieldType fieldType, - final long low, - final long high + long low, + final long high, + long afterKey ) throws IOException { final OptionalLong intervalOpt = Rounding.getInterval(rounding); if (intervalOpt.isEmpty()) { @@ -137,6 +138,11 @@ private static Weight[] createFilterForAggregations( } final long interval = intervalOpt.getAsLong(); + // afterKey is the last bucket key in previous response, while the bucket key + // is the start of the bucket values, so add the interval + if (afterKey != 0) { + low = afterKey + interval; + } // Calculate the number of buckets using range and interval long roundedLow = preparedRounding.round(fieldType.convertNanosToMillis(low)); long
prevRounded = roundedLow; @@ -224,7 +230,8 @@ public static FilterContext buildFastFilterContext( fieldName, (DateFieldMapper.DateFieldType) fieldType, bounds[0], - bounds[1] + bounds[1], + valueSourceContext.afterKey ); return new FilterContext((DateFieldMapper.DateFieldType) fieldType, filters); } @@ -240,16 +247,19 @@ public static class ValueSourceContext { private final boolean missing; private final boolean hasScript; private final MappedFieldType fieldType; + private final long afterKey; /** * @param missing whether missing value/bucket is set * @param hasScript whether script is used * @param fieldType null if the field doesn't exist + * @param afterKey */ - public ValueSourceContext(boolean missing, boolean hasScript, MappedFieldType fieldType) { + public ValueSourceContext(boolean missing, boolean hasScript, MappedFieldType fieldType, long afterKey) { this.missing = missing; this.hasScript = hasScript; this.fieldType = fieldType; + this.afterKey = afterKey; } public MappedFieldType getFieldType() { diff --git a/server/src/main/java/org/opensearch/search/aggregations/bucket/composite/CompositeAggregator.java b/server/src/main/java/org/opensearch/search/aggregations/bucket/composite/CompositeAggregator.java index 519de6712a80c..a58945118bbf3 100644 --- a/server/src/main/java/org/opensearch/search/aggregations/bucket/composite/CompositeAggregator.java +++ b/server/src/main/java/org/opensearch/search/aggregations/bucket/composite/CompositeAggregator.java @@ -116,10 +116,10 @@ final class CompositeAggregator extends BucketsAggregator { private boolean earlyTerminated; - private final Weight[] filters; - private final LongKeyedBucketOrds bucketOrds; - private final DateFieldMapper.DateFieldType fieldType; - private final Rounding.Prepared preparedRounding; + private Weight[] filters = null; + private LongKeyedBucketOrds bucketOrds = null; + private DateFieldMapper.DateFieldType fieldType = null; + private Rounding.Prepared preparedRounding = null; CompositeAggregator( String name, @@ -165,51 +165,48 @@ final class CompositeAggregator extends BucketsAggregator { this.queue = new CompositeValuesCollectorQueue(context.bigArrays(), sources, size, rawAfterKey); this.rawAfterKey = rawAfterKey; - bucketOrds = LongKeyedBucketOrds.build(context.bigArrays(), CardinalityUpperBound.ONE); - - CompositeValuesSourceConfig dateHistogramSourceConfig = sourceConfigs[0]; - RoundingValuesSource dateHistogramSource = (RoundingValuesSource) dateHistogramSourceConfig.valuesSource(); - preparedRounding = dateHistogramSource.getPreparedRounding(); - FilterRewriteHelper.ValueSourceContext dateHistogramSourceContext = new FilterRewriteHelper.ValueSourceContext( - dateHistogramSourceConfig.missingBucket(), - dateHistogramSourceConfig.hasScript(), - dateHistogramSourceConfig.fieldType() - ); - FilterRewriteHelper.FilterContext filterContext = FilterRewriteHelper.buildFastFilterContext( - parent, - subAggregators.length, - context, - x -> dateHistogramSource.getRounding(), - () -> preparedRounding, - dateHistogramSourceContext, - // TODO reading need to consider afterKey in this - fc -> FilterRewriteHelper.getAggregationBounds(context, fc.getFieldType().name()) - ); - if (filterContext != null) { - fieldType = filterContext.fieldType; - filters = filterContext.filters; - } else { - filters = null; - fieldType = null; + // Try fast filter optimization + if (sourceConfigs.length == 1 && sourceConfigs[0].valuesSource() instanceof RoundingValuesSource) { + RoundingValuesSource dateHistogramSource = 
(RoundingValuesSource) sourceConfigs[0].valuesSource(); + bucketOrds = LongKeyedBucketOrds.build(context.bigArrays(), CardinalityUpperBound.ONE); + preparedRounding = dateHistogramSource.getPreparedRounding(); + long afterValue = 0; + if (rawAfterKey != null) { + assert rawAfterKey.size() == 1 && formats.size() == 1; + afterValue = formats.get(0).parseLong(rawAfterKey.get(0).toString(), false, () -> { + throw new IllegalArgumentException("now() is not supported in [after] key"); + }); + } + FilterRewriteHelper.ValueSourceContext dateHistogramSourceContext = new FilterRewriteHelper.ValueSourceContext( + sourceConfigs[0].missingBucket(), + sourceConfigs[0].hasScript(), + sourceConfigs[0].fieldType(), + afterValue + ); + FilterRewriteHelper.FilterContext filterContext = FilterRewriteHelper.buildFastFilterContext( + parent, + subAggregators.length, + context, + x -> dateHistogramSource.getRounding(), + () -> preparedRounding, + dateHistogramSourceContext, + fc -> FilterRewriteHelper.getAggregationBounds(context, fc.getFieldType().name()) + ); + if (filterContext != null) { + fieldType = filterContext.fieldType; + filters = filterContext.filters; + } else { + filters = null; + fieldType = null; + } } } - // private long[] computeBounds(final FilterRewriteHelper.ValueSourceContext fieldContext) { - // final long[] bounds = FilterRewriteHelper.getAggregationBounds(context, fieldContext.getFieldType().name()); - // if (bounds != null) { - // // Update min/max limit if user specified any hard bounds - // if (hardBounds != null) { - // bounds[0] = Math.max(bounds[0], hardBounds.getMin()); - // bounds[1] = Math.min(bounds[1], hardBounds.getMax() - 1); // hard bounds max is exclusive - // } - // } - // return bounds; - // } - @Override protected void doClose() { try { Releasables.close(queue); + Releasables.close(bucketOrds); } finally { Releasables.close(sources); } @@ -261,8 +258,8 @@ public InternalAggregation[] buildAggregations(long[] owningBucketOrds) throws I ); } - // For the fast filters optimization - if (bucketOrds.size() != 0) { + // Fast filters optimization + if (bucketOrds != null) { Map bucketMap = new HashMap<>(); for (InternalComposite.InternalBucket internalBucket : buckets) { bucketMap.put(internalBucket.getRawKey(), internalBucket); @@ -549,14 +546,14 @@ protected LeafBucketCollector getLeafCollector(LeafReaderContext ctx, LeafBucket finishLeaf(); - boolean fillDocIdSet = deferredCollectors != NO_OP_COLLECTOR; // TODO reading subAggs are deferred + boolean fillDocIdSet = deferredCollectors != NO_OP_COLLECTOR; Sort indexSortPrefix = buildIndexSortPrefix(ctx); - int sortPrefixLen = computeSortPrefixLen(indexSortPrefix); // TODO reading asc index sort exists + int sortPrefixLen = computeSortPrefixLen(indexSortPrefix); // are there index sort enabled? sortPrefixLen SortedDocsProducer sortedDocsProducer = sortPrefixLen == 0 - ? sources[0].createSortedDocsProducerOrNull(ctx.reader(), context.query()) // TODO reading only using the first field + ? sources[0].createSortedDocsProducerOrNull(ctx.reader(), context.query()) : null; if (sortedDocsProducer != null) { // Visit documents sorted by the leading source of the composite definition and terminates @@ -564,7 +561,7 @@ protected LeafBucketCollector getLeafCollector(LeafReaderContext ctx, LeafBucket // in the queue. 
DocIdSet docIdSet = sortedDocsProducer.processLeaf(context.query(), queue, ctx, fillDocIdSet); if (fillDocIdSet) { - entries.add(new Entry(ctx, docIdSet)); // TODO reading add entries + entries.add(new Entry(ctx, docIdSet)); } // We can bypass search entirely for this segment, the processing is done in the previous call. // Throwing this exception will terminate the execution of the search for this root aggregation, @@ -635,7 +632,7 @@ private void runDeferredCollections() throws IOException { deferredCollectors.preCollection(); - for (Entry entry : entries) { // TODO reading entry is the leaf + for (Entry entry : entries) { DocIdSetIterator docIdSetIterator = entry.docIdSet.iterator(); if (docIdSetIterator == null) { continue; @@ -680,7 +677,7 @@ public void collect(int doc, long zeroBucket) throws IOException { if (slot != null) { // The candidate key is a top bucket. // We can defer the collection of this document/bucket to the sub collector - subCollector.collect(doc, slot); // TODO reading slot is the same as owning bucket ordinal + subCollector.collect(doc, slot); } } }; diff --git a/server/src/main/java/org/opensearch/search/aggregations/bucket/histogram/AutoDateHistogramAggregator.java b/server/src/main/java/org/opensearch/search/aggregations/bucket/histogram/AutoDateHistogramAggregator.java index a2677d6f82768..c6917b9a85e75 100644 --- a/server/src/main/java/org/opensearch/search/aggregations/bucket/histogram/AutoDateHistogramAggregator.java +++ b/server/src/main/java/org/opensearch/search/aggregations/bucket/histogram/AutoDateHistogramAggregator.java @@ -161,8 +161,8 @@ private AutoDateHistogramAggregator( FilterRewriteHelper.ValueSourceContext dateHistogramSourceContext = new FilterRewriteHelper.ValueSourceContext( valuesSourceConfig.missing() != null, valuesSourceConfig.script() != null, - valuesSourceConfig.fieldType() - ); + valuesSourceConfig.fieldType(), + 0); FilterRewriteHelper.FilterContext filterContext = FilterRewriteHelper.buildFastFilterContext( parent(), subAggregators.length, diff --git a/server/src/main/java/org/opensearch/search/aggregations/bucket/histogram/DateHistogramAggregator.java b/server/src/main/java/org/opensearch/search/aggregations/bucket/histogram/DateHistogramAggregator.java index 24ba87e8a12ae..da68826cbbba8 100644 --- a/server/src/main/java/org/opensearch/search/aggregations/bucket/histogram/DateHistogramAggregator.java +++ b/server/src/main/java/org/opensearch/search/aggregations/bucket/histogram/DateHistogramAggregator.java @@ -120,8 +120,8 @@ class DateHistogramAggregator extends BucketsAggregator implements SizedBucketAg FilterRewriteHelper.ValueSourceContext dateHistogramSourceContext = new FilterRewriteHelper.ValueSourceContext( valuesSourceConfig.missing() != null, valuesSourceConfig.script() != null, - valuesSourceConfig.fieldType() - ); + valuesSourceConfig.fieldType(), + 0); FilterRewriteHelper.FilterContext filterContext = FilterRewriteHelper.buildFastFilterContext( parent, subAggregators.length, diff --git a/server/src/test/java/org/opensearch/search/aggregations/bucket/composite/CompositeAggregatorTests.java b/server/src/test/java/org/opensearch/search/aggregations/bucket/composite/CompositeAggregatorTests.java index eabc4b7764eed..ef845c8435295 100644 --- a/server/src/test/java/org/opensearch/search/aggregations/bucket/composite/CompositeAggregatorTests.java +++ b/server/src/test/java/org/opensearch/search/aggregations/bucket/composite/CompositeAggregatorTests.java @@ -1279,7 +1279,7 @@ public void testWithDateHistogram() throws 
IOException { }, (result) -> { assertEquals(3, result.getBuckets().size()); - assertEquals("{date=1508457600000}", result.afterKey().toString()); + assertEquals("{date=1508457600000}", result.afterKey().toString()); // 2017-10-20T00:00:00 assertEquals("{date=1474329600000}", result.getBuckets().get(0).getKeyAsString()); assertEquals(2L, result.getBuckets().get(0).getDocCount()); assertEquals("{date=1508371200000}", result.getBuckets().get(1).getKeyAsString()); @@ -1300,9 +1300,8 @@ public void testWithDateHistogram() throws IOException { DateHistogramValuesSourceBuilder histo = new DateHistogramValuesSourceBuilder("date").field("date") .calendarInterval(DateHistogramInterval.days(1)); return new CompositeAggregationBuilder("name", Collections.singletonList(histo)).aggregateAfter( - createAfterKey("date", 1474329600000L) + createAfterKey("date", 1474329600000L) // 2016-09-20T00:00:00 ); - }, (result) -> { assertEquals(2, result.getBuckets().size()); From c506e2125d35067eee8dd15150f9e758d6f2862e Mon Sep 17 00:00:00 2001 From: bowenlan-amzn Date: Wed, 6 Dec 2023 16:28:25 -0800 Subject: [PATCH 06/18] spotless Signed-off-by: bowenlan-amzn --- .../bucket/FilterRewriteHelper.java | 35 ++++------- .../bucket/composite/CompositeAggregator.java | 22 +++---- .../CompositeValuesCollectorQueue.java | 11 ++-- .../DateHistogramValuesSourceBuilder.java | 3 +- .../bucket/composite/LongValuesSource.java | 2 + .../composite/PointsSortedDocsProducer.java | 5 +- .../AutoDateHistogramAggregator.java | 15 ++--- .../histogram/DateHistogramAggregator.java | 63 ++++++++----------- 8 files changed, 65 insertions(+), 91 deletions(-) diff --git a/server/src/main/java/org/opensearch/search/aggregations/bucket/FilterRewriteHelper.java b/server/src/main/java/org/opensearch/search/aggregations/bucket/FilterRewriteHelper.java index 7d248589aef8e..57afe072c865c 100644 --- a/server/src/main/java/org/opensearch/search/aggregations/bucket/FilterRewriteHelper.java +++ b/server/src/main/java/org/opensearch/search/aggregations/bucket/FilterRewriteHelper.java @@ -165,30 +165,22 @@ private static Weight[] createFilterForAggregations( while (i < bucketCount) { // Calculate the lower bucket bound final byte[] lower = new byte[8]; - NumericUtils.longToSortableBytes( - i == 0 ? low : fieldType.convertRoundedMillisToNanos(roundedLow), - lower, 0 - ); + NumericUtils.longToSortableBytes(i == 0 ? low : fieldType.convertRoundedMillisToNanos(roundedLow), lower, 0); // Calculate the upper bucket bound roundedLow = preparedRounding.round(roundedLow + interval); final byte[] upper = new byte[8]; - NumericUtils.longToSortableBytes( - i + 1 == bucketCount ? high : - // Subtract -1 if the minimum is roundedLow as roundedLow itself - // is included in the next bucket - fieldType.convertRoundedMillisToNanos(roundedLow) - 1, - upper, - 0 - ); - - filters[i++] = context.searcher().createWeight( - new PointRangeQuery(field, lower, upper, 1) { - @Override - protected String toString(int dimension, byte[] value) { - return null; - } - }, ScoreMode.COMPLETE_NO_SCORES, 1); + NumericUtils.longToSortableBytes(i + 1 == bucketCount ? 
high : + // Subtract -1 if the minimum is roundedLow as roundedLow itself + // is included in the next bucket + fieldType.convertRoundedMillisToNanos(roundedLow) - 1, upper, 0); + + filters[i++] = context.searcher().createWeight(new PointRangeQuery(field, lower, upper, 1) { + @Override + protected String toString(int dimension, byte[] value) { + return null; + } + }, ScoreMode.COMPLETE_NO_SCORES, 1); } } @@ -305,8 +297,7 @@ public static boolean tryFastFilterAggregation( if (counts[i] > 0) { incrementDocCount.accept( fieldType.convertNanosToMillis( - NumericUtils.sortableBytesToLong( - ((PointRangeQuery) filters[i].getQuery()).getLowerPoint(), 0) + NumericUtils.sortableBytesToLong(((PointRangeQuery) filters[i].getQuery()).getLowerPoint(), 0) ), counts[i] ); diff --git a/server/src/main/java/org/opensearch/search/aggregations/bucket/composite/CompositeAggregator.java b/server/src/main/java/org/opensearch/search/aggregations/bucket/composite/CompositeAggregator.java index a58945118bbf3..53f40d0f91698 100644 --- a/server/src/main/java/org/opensearch/search/aggregations/bucket/composite/CompositeAggregator.java +++ b/server/src/main/java/org/opensearch/search/aggregations/bucket/composite/CompositeAggregator.java @@ -73,12 +73,12 @@ import org.opensearch.search.aggregations.MultiBucketCollector; import org.opensearch.search.aggregations.MultiBucketConsumerService; import org.opensearch.search.aggregations.bucket.BucketsAggregator; +import org.opensearch.search.aggregations.bucket.FilterRewriteHelper; import org.opensearch.search.aggregations.bucket.missing.MissingOrder; import org.opensearch.search.aggregations.bucket.terms.LongKeyedBucketOrds; import org.opensearch.search.internal.SearchContext; import org.opensearch.search.searchafter.SearchAfterBuilder; import org.opensearch.search.sort.SortAndFormats; -import org.opensearch.search.aggregations.bucket.FilterRewriteHelper; import java.io.IOException; import java.util.ArrayList; @@ -519,7 +519,10 @@ private void processLeafFromQuery(LeafReaderContext ctx, Sort indexSortPrefix) t if (scorer != null) { DocIdSetIterator docIt = scorer.iterator(); - final LeafBucketCollector inner = queue.getLeafCollector(ctx, getFirstPassCollector(docIdSetBuilder, indexSortPrefix.getSort().length)); + final LeafBucketCollector inner = queue.getLeafCollector( + ctx, + getFirstPassCollector(docIdSetBuilder, indexSortPrefix.getSort().length) + ); inner.setScorer(scorer); final Bits liveDocs = ctx.reader().getLiveDocs(); @@ -533,15 +536,9 @@ private void processLeafFromQuery(LeafReaderContext ctx, Sort indexSortPrefix) t @Override protected LeafBucketCollector getLeafCollector(LeafReaderContext ctx, LeafBucketCollector sub) throws IOException { - boolean optimized = FilterRewriteHelper.tryFastFilterAggregation(ctx, filters, fieldType, - (key, count) -> { - incrementBucketDocCount( - FilterRewriteHelper.getBucketOrd( - bucketOrds.add(0, preparedRounding.round(key)) - ), - count - ); - }, size); + boolean optimized = FilterRewriteHelper.tryFastFilterAggregation(ctx, filters, fieldType, (key, count) -> { + incrementBucketDocCount(FilterRewriteHelper.getBucketOrd(bucketOrds.add(0, preparedRounding.round(key))), count); + }, size); if (optimized) throw new CollectionTerminatedException(); finishLeaf(); @@ -673,7 +670,8 @@ private LeafBucketCollector getSecondPassCollector(LeafBucketCollector subCollec @Override public void collect(int doc, long zeroBucket) throws IOException { assert zeroBucket == 0; - Integer slot = queue.compareCurrent(); // TODO reading queue will 
make sure current value presents through collection mechanism + Integer slot = queue.compareCurrent(); // TODO reading queue will make sure current value presents through collection + // mechanism if (slot != null) { // The candidate key is a top bucket. // We can defer the collection of this document/bucket to the sub collector diff --git a/server/src/main/java/org/opensearch/search/aggregations/bucket/composite/CompositeValuesCollectorQueue.java b/server/src/main/java/org/opensearch/search/aggregations/bucket/composite/CompositeValuesCollectorQueue.java index ae74d5c72df88..e078ffca1597c 100644 --- a/server/src/main/java/org/opensearch/search/aggregations/bucket/composite/CompositeValuesCollectorQueue.java +++ b/server/src/main/java/org/opensearch/search/aggregations/bucket/composite/CompositeValuesCollectorQueue.java @@ -204,10 +204,7 @@ boolean equals(int slot1, int slot2) { int hashCode(int slot) { int result = 1; for (int i = 0; i < arrays.length; i++) { - result = 31 * result + - (slot == CANDIDATE_SLOT ? - arrays[i].hashCodeCurrent() : - arrays[i].hashCode(slot)); + result = 31 * result + (slot == CANDIDATE_SLOT ? arrays[i].hashCodeCurrent() : arrays[i].hashCode(slot)); } return result; } @@ -310,12 +307,14 @@ boolean addIfCompetitive(int indexSortSourcePrefix, long inc) { // TODO reading } if (size() >= maxSize) { // TODO reading when queue is full, can check competitiveness - // the tree map is full, check if the candidate key should be kept // TODO reading queue contain topN largest composite key/bucket/slot + // the tree map is full, check if the candidate key should be kept // TODO reading queue contain topN largest composite + // key/bucket/slot int cmp = compare(CANDIDATE_SLOT, top()); if (cmp > 0) { // TODO reading current large than queue if (cmp <= indexSortSourcePrefix) { // TODO reading the way of comparing current and queue uses sorted fields // index sort guarantees that there is no key greater or equal than the - // current one in the subsequent documents so we can early terminate. // TODO reading how to get the topN smallest items using heap? + // current one in the subsequent documents so we can early terminate. // TODO reading how to get the topN smallest items + // using heap? throw new CollectionTerminatedException(); } // the candidate key is not competitive, skip it. 
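The max-heap comments above are the heart of the composite queue: to keep the N smallest composite keys, the heap is ordered so that the largest key kept so far sits on top, and a new candidate only has to beat that single worst entry; when documents arrive in index-sort order, the first non-competitive candidate proves nothing better can follow, which is what the CollectionTerminatedException in addIfCompetitive expresses. A plain-Java sketch of the same top-N-smallest pattern (illustrative only, not the queue's actual slot machinery):

    import java.util.Comparator;
    import java.util.PriorityQueue;

    int maxSize = 3;
    PriorityQueue<Long> topN = new PriorityQueue<>(Comparator.reverseOrder()); // max heap
    for (long key : new long[] { 9, 1, 7, 3, 5 }) {
        if (topN.size() < maxSize) {
            topN.offer(key);
        } else if (key < topN.peek()) { // competitive: beats the current worst kept key
            topN.poll();
            topN.offer(key);
        }
        // else: not competitive; with index-sorted input we could stop right here
    }
    // topN now holds {1, 3, 5}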
diff --git a/server/src/main/java/org/opensearch/search/aggregations/bucket/composite/DateHistogramValuesSourceBuilder.java b/server/src/main/java/org/opensearch/search/aggregations/bucket/composite/DateHistogramValuesSourceBuilder.java index 3fdd0b3c12a8e..3926ce9bbecb7 100644 --- a/server/src/main/java/org/opensearch/search/aggregations/bucket/composite/DateHistogramValuesSourceBuilder.java +++ b/server/src/main/java/org/opensearch/search/aggregations/bucket/composite/DateHistogramValuesSourceBuilder.java @@ -316,8 +316,7 @@ public static void register(ValuesSourceRegistry.Builder builder) { IndexReader reader, int size, LongConsumer addRequestCircuitBreakerBytes, - CompositeValuesSourceConfig compositeValuesSourceConfig - ) -> { + CompositeValuesSourceConfig compositeValuesSourceConfig) -> { final RoundingValuesSource roundingValuesSource = (RoundingValuesSource) compositeValuesSourceConfig.valuesSource(); return new LongValuesSource( bigArrays, diff --git a/server/src/main/java/org/opensearch/search/aggregations/bucket/composite/LongValuesSource.java b/server/src/main/java/org/opensearch/search/aggregations/bucket/composite/LongValuesSource.java index 445bbb831370a..48e080c1576dd 100644 --- a/server/src/main/java/org/opensearch/search/aggregations/bucket/composite/LongValuesSource.java +++ b/server/src/main/java/org/opensearch/search/aggregations/bucket/composite/LongValuesSource.java @@ -275,11 +275,13 @@ SortedDocsProducer createSortedDocsProducerOrNull(IndexReader reader, Query quer case "long": toBucketFunction = (value) -> rounding.applyAsLong(LongPoint.decodeDimension(value, 0)); break; + case "int": case "short": case "byte": toBucketFunction = (value) -> rounding.applyAsLong(IntPoint.decodeDimension(value, 0)); break; + default: return null; } diff --git a/server/src/main/java/org/opensearch/search/aggregations/bucket/composite/PointsSortedDocsProducer.java b/server/src/main/java/org/opensearch/search/aggregations/bucket/composite/PointsSortedDocsProducer.java index ae6bfeac05db1..628fab55b5411 100644 --- a/server/src/main/java/org/opensearch/search/aggregations/bucket/composite/PointsSortedDocsProducer.java +++ b/server/src/main/java/org/opensearch/search/aggregations/bucket/composite/PointsSortedDocsProducer.java @@ -151,7 +151,7 @@ public void visit(int docID, byte[] packedValue) throws IOException { if (first == false && bucket != lastBucket) { // process previous bucket when new bucket appears final DocIdSet docIdSet = bucketDocsBuilder.build(); if (processBucket(queue, context, docIdSet.iterator(), lastBucket, builder) && - // lower bucket is inclusive + // lower bucket is inclusive lowerBucket != lastBucket) { // this bucket does not have any competitive composite buckets, // we can early terminate the collection because the remaining buckets are guaranteed @@ -170,8 +170,7 @@ public void visit(int docID, byte[] packedValue) throws IOException { @Override public PointValues.Relation compare(byte[] minPackedValue, byte[] maxPackedValue) { - if ((upperPointQuery != null - && Arrays.compareUnsigned(minPackedValue, 0, bytesPerDim, upperPointQuery, 0, bytesPerDim) > 0) + if ((upperPointQuery != null && Arrays.compareUnsigned(minPackedValue, 0, bytesPerDim, upperPointQuery, 0, bytesPerDim) > 0) || (lowerPointQuery != null && Arrays.compareUnsigned(maxPackedValue, 0, bytesPerDim, lowerPointQuery, 0, bytesPerDim) < 0)) { // does not match the query diff --git a/server/src/main/java/org/opensearch/search/aggregations/bucket/histogram/AutoDateHistogramAggregator.java 
b/server/src/main/java/org/opensearch/search/aggregations/bucket/histogram/AutoDateHistogramAggregator.java index c6917b9a85e75..0ef546e98a5ba 100644 --- a/server/src/main/java/org/opensearch/search/aggregations/bucket/histogram/AutoDateHistogramAggregator.java +++ b/server/src/main/java/org/opensearch/search/aggregations/bucket/histogram/AutoDateHistogramAggregator.java @@ -162,7 +162,8 @@ private AutoDateHistogramAggregator( valuesSourceConfig.missing() != null, valuesSourceConfig.script() != null, valuesSourceConfig.fieldType(), - 0); + 0 + ); FilterRewriteHelper.FilterContext filterContext = FilterRewriteHelper.buildFastFilterContext( parent(), subAggregators.length, @@ -233,15 +234,9 @@ public final LeafBucketCollector getLeafCollector(LeafReaderContext ctx, LeafBuc return LeafBucketCollector.NO_OP_COLLECTOR; } - boolean optimized = FilterRewriteHelper.tryFastFilterAggregation(ctx, filters, fieldType, - (key, count) -> { - incrementBucketDocCount( - FilterRewriteHelper.getBucketOrd( - getBucketOrds().add(0, preparedRounding.round(key)) - ), - count - ); - }, Integer.MAX_VALUE); + boolean optimized = FilterRewriteHelper.tryFastFilterAggregation(ctx, filters, fieldType, (key, count) -> { + incrementBucketDocCount(FilterRewriteHelper.getBucketOrd(getBucketOrds().add(0, preparedRounding.round(key))), count); + }, Integer.MAX_VALUE); if (optimized) throw new CollectionTerminatedException(); final SortedNumericDocValues values = valuesSource.longValues(ctx); diff --git a/server/src/main/java/org/opensearch/search/aggregations/bucket/histogram/DateHistogramAggregator.java b/server/src/main/java/org/opensearch/search/aggregations/bucket/histogram/DateHistogramAggregator.java index da68826cbbba8..11fff40e0542b 100644 --- a/server/src/main/java/org/opensearch/search/aggregations/bucket/histogram/DateHistogramAggregator.java +++ b/server/src/main/java/org/opensearch/search/aggregations/bucket/histogram/DateHistogramAggregator.java @@ -121,7 +121,8 @@ class DateHistogramAggregator extends BucketsAggregator implements SizedBucketAg valuesSourceConfig.missing() != null, valuesSourceConfig.script() != null, valuesSourceConfig.fieldType(), - 0); + 0 + ); FilterRewriteHelper.FilterContext filterContext = FilterRewriteHelper.buildFastFilterContext( parent, subAggregators.length, @@ -166,15 +167,9 @@ public LeafBucketCollector getLeafCollector(LeafReaderContext ctx, LeafBucketCol return LeafBucketCollector.NO_OP_COLLECTOR; } - boolean optimized = FilterRewriteHelper.tryFastFilterAggregation(ctx, filters, fieldType, - (key, count) -> { - incrementBucketDocCount( - FilterRewriteHelper.getBucketOrd( - bucketOrds.add(0, preparedRounding.round(key)) - ), - count - ); - }, Integer.MAX_VALUE); + boolean optimized = FilterRewriteHelper.tryFastFilterAggregation(ctx, filters, fieldType, (key, count) -> { + incrementBucketDocCount(FilterRewriteHelper.getBucketOrd(bucketOrds.add(0, preparedRounding.round(key))), count); + }, Integer.MAX_VALUE); if (optimized) throw new CollectionTerminatedException(); SortedNumericDocValues values = valuesSource.longValues(ctx); @@ -210,33 +205,29 @@ public void collect(int doc, long owningBucketOrd) throws IOException { @Override public InternalAggregation[] buildAggregations(long[] owningBucketOrds) throws IOException { - return buildAggregationsForVariableBuckets( - owningBucketOrds, - bucketOrds, - (bucketValue, docCount, subAggregationResults) -> { - return new InternalDateHistogram.Bucket(bucketValue, docCount, keyed, formatter, subAggregationResults); - }, - 
(owningBucketOrd, buckets) -> { - // the contract of the histogram aggregation is that shards must return buckets ordered by key in ascending order - CollectionUtil.introSort(buckets, BucketOrder.key(true).comparator()); + return buildAggregationsForVariableBuckets(owningBucketOrds, bucketOrds, (bucketValue, docCount, subAggregationResults) -> { + return new InternalDateHistogram.Bucket(bucketValue, docCount, keyed, formatter, subAggregationResults); + }, (owningBucketOrd, buckets) -> { + // the contract of the histogram aggregation is that shards must return buckets ordered by key in ascending order + CollectionUtil.introSort(buckets, BucketOrder.key(true).comparator()); - // value source will be null for unmapped fields - // Important: use `rounding` here, not `shardRounding` - InternalDateHistogram.EmptyBucketInfo emptyBucketInfo = minDocCount == 0 - ? new InternalDateHistogram.EmptyBucketInfo(rounding.withoutOffset(), buildEmptySubAggregations(), extendedBounds) - : null; - return new InternalDateHistogram( - name, - buckets, - order, - minDocCount, - rounding.offset(), - emptyBucketInfo, - formatter, - keyed, - metadata() - ); - }); + // value source will be null for unmapped fields + // Important: use `rounding` here, not `shardRounding` + InternalDateHistogram.EmptyBucketInfo emptyBucketInfo = minDocCount == 0 + ? new InternalDateHistogram.EmptyBucketInfo(rounding.withoutOffset(), buildEmptySubAggregations(), extendedBounds) + : null; + return new InternalDateHistogram( + name, + buckets, + order, + minDocCount, + rounding.offset(), + emptyBucketInfo, + formatter, + keyed, + metadata() + ); + }); } @Override From 6428e9bb47b7d97d7a7ef9746aa0c14a999adfe5 Mon Sep 17 00:00:00 2001 From: bowenlan-amzn Date: Wed, 6 Dec 2023 17:44:03 -0800 Subject: [PATCH 07/18] Missing java doc Signed-off-by: bowenlan-amzn --- .../bucket/FilterRewriteHelper.java | 3 +++ .../bucket/composite/CompositeAggregator.java | 27 +++++++++---------- 2 files changed, 16 insertions(+), 14 deletions(-) diff --git a/server/src/main/java/org/opensearch/search/aggregations/bucket/FilterRewriteHelper.java b/server/src/main/java/org/opensearch/search/aggregations/bucket/FilterRewriteHelper.java index 57afe072c865c..e34d9e480cdac 100644 --- a/server/src/main/java/org/opensearch/search/aggregations/bucket/FilterRewriteHelper.java +++ b/server/src/main/java/org/opensearch/search/aggregations/bucket/FilterRewriteHelper.java @@ -43,6 +43,9 @@ */ public class FilterRewriteHelper { + /** + * Saves the objects that will be used to try fast filter optimization + */ public static class FilterContext { public final DateFieldMapper.DateFieldType fieldType; public final Weight[] filters; diff --git a/server/src/main/java/org/opensearch/search/aggregations/bucket/composite/CompositeAggregator.java b/server/src/main/java/org/opensearch/search/aggregations/bucket/composite/CompositeAggregator.java index 53f40d0f91698..0fe61532a5de5 100644 --- a/server/src/main/java/org/opensearch/search/aggregations/bucket/composite/CompositeAggregator.java +++ b/server/src/main/java/org/opensearch/search/aggregations/bucket/composite/CompositeAggregator.java @@ -368,7 +368,7 @@ private Sort buildIndexSortPrefix(LeafReaderContext context) throws IOException for (int i = 0; i < end; i++) { CompositeValuesSourceConfig sourceConfig = sourceConfigs[i]; SingleDimensionValuesSource source = sources[i]; - SortField indexSortField = indexSort.getSort()[i]; // TODO reading requiring the order should match + SortField indexSortField = indexSort.getSort()[i]; if 
(source.fieldType == null // TODO: can we handle missing bucket when using index sort optimization ? || source.missingBucket @@ -380,16 +380,18 @@ private Sort buildIndexSortPrefix(LeafReaderContext context) throws IOException if (indexSortField.getReverse() != (source.reverseMul == -1)) { if (i == 0) { - // the leading index sort matches the leading source field but the order is reversed + // the leading index sort matches the leading source field, but the order is reversed, // so we don't check the other sources. return new Sort(indexSortField); } break; } + sortFields.add(indexSortField); + if (sourceConfig.valuesSource() instanceof RoundingValuesSource) { - // the rounding "squashes" many values together, that breaks the ordering of sub-values - // so we ignore subsequent source even if they match the index sort. + // the rounding "squashes" many values together, that breaks the ordering of sub-values, + // so we ignore the subsequent sources even if they match the index sort. break; } } @@ -409,8 +411,7 @@ private int computeSortPrefixLen(Sort indexSortPrefix) { if (indexSortPrefix == null) { return 0; } - if (indexSortPrefix.getSort()[0].getReverse() // TODO reading sort optimization is reversed - != (sources[0].reverseMul == -1)) { // TODO reading aggregation sort param is desc + if (indexSortPrefix.getSort()[0].getReverse() != (sources[0].reverseMul == -1)) { assert indexSortPrefix.getSort().length == 1; return -1; } else { @@ -502,7 +503,6 @@ private void processLeafFromQuery(LeafReaderContext ctx, Sort indexSortPrefix) t for (int i = 0; i < formats.length; i++) { formats[i] = sources[i].format; } - // TODO reading sort and search after with criteria FieldDoc fieldDoc = SearchAfterBuilder.buildFieldDoc( new SortAndFormats(indexSortPrefix, formats), Arrays.copyOfRange(rawAfterKey.values(), 0, formats.length) @@ -565,19 +565,19 @@ protected LeafBucketCollector getLeafCollector(LeafReaderContext ctx, LeafBucket // see {@link MultiCollector} for more details on how we handle early termination in aggregations. earlyTerminated = true; throw new CollectionTerminatedException(); - } else { // TODO reading index sort not enabled + } else { if (fillDocIdSet) { currentLeaf = ctx; docIdSetBuilder = new RoaringDocIdSet.Builder(ctx.reader().maxDoc()); } if (rawAfterKey != null && sortPrefixLen > 0) { // We have an after key and index sort is applicable, so we jump directly to the doc - // that is after the index sort prefix using the rawAfterKey and we start collecting - // document from there. + // after the index sort prefix using the rawAfterKey and we start collecting + // documents from there. + assert indexSortPrefix != null; processLeafFromQuery(ctx, indexSortPrefix); throw new CollectionTerminatedException(); } else { - // rawAfterKey == null || sort order is reversed final LeafBucketCollector inner = queue.getLeafCollector(ctx, getFirstPassCollector(docIdSetBuilder, sortPrefixLen)); return new LeafBucketCollector() { @Override @@ -649,7 +649,7 @@ private void runDeferredCollections() throws IOException { int docID; while ((docID = docIdSetIterator.nextDoc()) != DocIdSetIterator.NO_MORE_DOCS) { - if (needsScores) { // TODO reading not sure what need score does here? 
+ if (needsScores) { assert scorerIt != null && scorerIt.docID() < docID; scorerIt.advance(docID); // aggregations should only be replayed on matching documents @@ -670,8 +670,7 @@ private LeafBucketCollector getSecondPassCollector(LeafBucketCollector subCollec @Override public void collect(int doc, long zeroBucket) throws IOException { assert zeroBucket == 0; - Integer slot = queue.compareCurrent(); // TODO reading queue will make sure current value presents through collection - // mechanism + Integer slot = queue.compareCurrent(); if (slot != null) { // The candidate key is a top bucket. // We can defer the collection of this document/bucket to the sub collector From 1507a654525d625362a569771f0b43bc05221778 Mon Sep 17 00:00:00 2001 From: bowenlan-amzn Date: Thu, 7 Dec 2023 08:35:02 -0800 Subject: [PATCH 08/18] add changelog Signed-off-by: bowenlan-amzn --- .../search/aggregations/bucket/FilterRewriteHelper.java | 2 +- .../aggregations/bucket/composite/CompositeAggregator.java | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/server/src/main/java/org/opensearch/search/aggregations/bucket/FilterRewriteHelper.java b/server/src/main/java/org/opensearch/search/aggregations/bucket/FilterRewriteHelper.java index e34d9e480cdac..804f179a0990e 100644 --- a/server/src/main/java/org/opensearch/search/aggregations/bucket/FilterRewriteHelper.java +++ b/server/src/main/java/org/opensearch/search/aggregations/bucket/FilterRewriteHelper.java @@ -248,7 +248,7 @@ public static class ValueSourceContext { * @param missing whether missing value/bucket is set * @param hasScript whether script is used * @param fieldType null if the field doesn't exist - * @param afterKey + * @param afterKey for composite aggregation, the key of the last bucket in the previous response */ public ValueSourceContext(boolean missing, boolean hasScript, MappedFieldType fieldType, long afterKey) { this.missing = missing; diff --git a/server/src/main/java/org/opensearch/search/aggregations/bucket/composite/CompositeAggregator.java b/server/src/main/java/org/opensearch/search/aggregations/bucket/composite/CompositeAggregator.java index 0fe61532a5de5..23e875645d344 100644 --- a/server/src/main/java/org/opensearch/search/aggregations/bucket/composite/CompositeAggregator.java +++ b/server/src/main/java/org/opensearch/search/aggregations/bucket/composite/CompositeAggregator.java @@ -602,7 +602,7 @@ public void collect(int doc, long bucket) throws IOException { try { long docCount = docCountProvider.getDocCount(doc); if (queue.addIfCompetitive(indexSortPrefix, docCount)) { - if (builder != null && lastDoc != doc) { // TODO reading how can lastDoc == doc? 

From 1507a654525d625362a569771f0b43bc05221778 Mon Sep 17 00:00:00 2001
From: bowenlan-amzn
Date: Thu, 7 Dec 2023 08:35:02 -0800
Subject: [PATCH 08/18] add changelog

Signed-off-by: bowenlan-amzn
---
 .../search/aggregations/bucket/FilterRewriteHelper.java    | 2 +-
 .../aggregations/bucket/composite/CompositeAggregator.java | 2 +-
 2 files changed, 2 insertions(+), 2 deletions(-)

diff --git a/server/src/main/java/org/opensearch/search/aggregations/bucket/FilterRewriteHelper.java b/server/src/main/java/org/opensearch/search/aggregations/bucket/FilterRewriteHelper.java
index e34d9e480cdac..804f179a0990e 100644
--- a/server/src/main/java/org/opensearch/search/aggregations/bucket/FilterRewriteHelper.java
+++ b/server/src/main/java/org/opensearch/search/aggregations/bucket/FilterRewriteHelper.java
@@ -248,7 +248,7 @@ public static class ValueSourceContext {
      * @param missing whether missing value/bucket is set
      * @param hasScript whether script is used
      * @param fieldType null if the field doesn't exist
-     * @param afterKey
+     * @param afterKey for composite aggregation, the key of the last bucket in the previous response
      */
     public ValueSourceContext(boolean missing, boolean hasScript, MappedFieldType fieldType, long afterKey) {
         this.missing = missing;
diff --git a/server/src/main/java/org/opensearch/search/aggregations/bucket/composite/CompositeAggregator.java b/server/src/main/java/org/opensearch/search/aggregations/bucket/composite/CompositeAggregator.java
index 0fe61532a5de5..23e875645d344 100644
--- a/server/src/main/java/org/opensearch/search/aggregations/bucket/composite/CompositeAggregator.java
+++ b/server/src/main/java/org/opensearch/search/aggregations/bucket/composite/CompositeAggregator.java
@@ -602,7 +602,7 @@ public void collect(int doc, long bucket) throws IOException {
                 try {
                     long docCount = docCountProvider.getDocCount(doc);
                     if (queue.addIfCompetitive(indexSortPrefix, docCount)) {
-                        if (builder != null && lastDoc != doc) { // TODO reading how can lastDoc == doc?
+                        if (builder != null && lastDoc != doc) {
                             builder.add(doc);
                             lastDoc = doc;
                         }
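
The afterKey parameter documented above feeds the date-histogram fast-filter rewrite: each bucket becomes one range filter, and a page that follows an afterKey starts one interval later. A rough standalone sketch of that range computation (illustrative names and simplified bounds, not the real createFilterForAggregations signature):

import java.util.ArrayList;
import java.util.List;

// Illustrative sketch of the fast-filter idea: one [start, end] range per date
// histogram bucket, resuming after the last bucket key (afterKey) of the
// previous composite page.
public final class BucketRangeSketch {
    static final long NO_AFTER_KEY = -1; // hypothetical sentinel; 0 is a valid bucket key

    static List<long[]> bucketRanges(long low, long high, long interval, long afterKey) {
        if (afterKey != NO_AFTER_KEY) {
            // afterKey is the start of the last bucket already returned,
            // so the next page begins one full interval later
            low = afterKey + interval;
        }
        List<long[]> ranges = new ArrayList<>();
        for (long start = low - Math.floorMod(low, interval); start <= high; start += interval) {
            ranges.add(new long[] { Math.max(start, low), Math.min(start + interval - 1, high) });
        }
        return ranges;
    }

    public static void main(String[] args) {
        for (long[] r : bucketRanges(0, 55, 10, 20)) { // previous page ended at bucket 20
            System.out.println(r[0] + ".." + r[1]);    // 30..39, 40..49, 50..55
        }
    }
}
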

From 19c632d52e214496fa16f7730e11dc9827e21645 Mon Sep 17 00:00:00 2001
From: bowenlan-amzn
Date: Thu, 7 Dec 2023 10:17:35 -0800
Subject: [PATCH 09/18] cleaning up

Signed-off-by: bowenlan-amzn
---
 .../bucket/FilterRewriteHelper.java           |  4 +-
 .../bucket/composite/CompositeAggregator.java | 19 +++------
 .../CompositeValuesCollectorQueue.java        | 38 +++++++++----------
 .../composite/PointsSortedDocsProducer.java   |  4 +-
 .../bucket/composite/SortedDocsProducer.java  |  6 +--
 .../AutoDateHistogramAggregator.java          |  2 +-
 .../histogram/DateHistogramAggregator.java    |  2 +-
 .../composite/CompositeAggregatorTests.java   | 12 +++---
 8 files changed, 34 insertions(+), 53 deletions(-)

diff --git a/server/src/main/java/org/opensearch/search/aggregations/bucket/FilterRewriteHelper.java b/server/src/main/java/org/opensearch/search/aggregations/bucket/FilterRewriteHelper.java
index 804f179a0990e..a647f82548e33 100644
--- a/server/src/main/java/org/opensearch/search/aggregations/bucket/FilterRewriteHelper.java
+++ b/server/src/main/java/org/opensearch/search/aggregations/bucket/FilterRewriteHelper.java
@@ -143,7 +143,7 @@ private static Weight[] createFilterForAggregations(
         final long interval = intervalOpt.getAsLong();
         // afterKey is the last bucket key in previous response, while the bucket key
         // is the start of the bucket values, so add the interval
-        if (afterKey != 0) {
+        if (afterKey != -1) {
             low = afterKey + interval;
         }
         // Calculate the number of buckets using range and interval
@@ -248,7 +248,7 @@ public static class ValueSourceContext {
      * @param missing whether missing value/bucket is set
      * @param hasScript whether script is used
      * @param fieldType null if the field doesn't exist
-     * @param afterKey for composite aggregation, the key of the last bucket in the previous response
+     * @param afterKey used to paginate for composite aggregation, pass in -1 if not used
      */
     public ValueSourceContext(boolean missing, boolean hasScript, MappedFieldType fieldType, long afterKey) {
         this.missing = missing;
diff --git a/server/src/main/java/org/opensearch/search/aggregations/bucket/composite/CompositeAggregator.java b/server/src/main/java/org/opensearch/search/aggregations/bucket/composite/CompositeAggregator.java
index 23e875645d344..e391422d7a9a2 100644
--- a/server/src/main/java/org/opensearch/search/aggregations/bucket/composite/CompositeAggregator.java
+++ b/server/src/main/java/org/opensearch/search/aggregations/bucket/composite/CompositeAggregator.java
@@ -165,7 +165,7 @@ final class CompositeAggregator extends BucketsAggregator {
         this.queue = new CompositeValuesCollectorQueue(context.bigArrays(), sources, size, rawAfterKey);
         this.rawAfterKey = rawAfterKey;

-        // Try fast filter optimization
+        // Try fast filter optimization when the only source is date histogram
         if (sourceConfigs.length == 1 && sourceConfigs[0].valuesSource() instanceof RoundingValuesSource) {
             RoundingValuesSource dateHistogramSource = (RoundingValuesSource) sourceConfigs[0].valuesSource();
             bucketOrds = LongKeyedBucketOrds.build(context.bigArrays(), CardinalityUpperBound.ONE);
@@ -258,7 +258,7 @@ public InternalAggregation[] buildAggregations(long[] owningBucketOrds) throws I
             );
         }

-        // Fast filters optimization
+        // Build results from fast filters optimization
         if (bucketOrds != null) {
             Map bucketMap = new HashMap<>();
             for (InternalComposite.InternalBucket internalBucket : buckets) {
@@ -386,9 +386,7 @@ private Sort buildIndexSortPrefix(LeafReaderContext context) throws IOException
             }
             break;
         }
-
         sortFields.add(indexSortField);
-
         if (sourceConfig.valuesSource() instanceof RoundingValuesSource) {
             // the rounding "squashes" many values together, that breaks the ordering of sub-values,
             // so we ignore the subsequent sources even if they match the index sort.
             break;
         }
     }
@@ -516,7 +514,6 @@ private void processLeafFromQuery(LeafReaderContext ctx, Sort indexSortPrefix) t
             .build();
         Weight weight = context.searcher().createWeight(context.searcher().rewrite(newQuery), ScoreMode.COMPLETE_NO_SCORES, 1f);
         Scorer scorer = weight.scorer(ctx);
-
         if (scorer != null) {
             DocIdSetIterator docIt = scorer.iterator();
             final LeafBucketCollector inner = queue.getLeafCollector(
@@ -524,7 +521,6 @@ private void processLeafFromQuery(LeafReaderContext ctx, Sort indexSortPrefix) t
                 getFirstPassCollector(docIdSetBuilder, indexSortPrefix.getSort().length)
             );
             inner.setScorer(scorer);
-
             final Bits liveDocs = ctx.reader().getLiveDocs();
             while (docIt.nextDoc() != DocIdSetIterator.NO_MORE_DOCS) {
                 if (liveDocs == null || liveDocs.get(docIt.docID())) {
@@ -548,7 +544,6 @@ protected LeafBucketCollector getLeafCollector(LeafReaderContext ctx, LeafBucket
         Sort indexSortPrefix = buildIndexSortPrefix(ctx);
         int sortPrefixLen = computeSortPrefixLen(indexSortPrefix);
-        // are there index sort enabled? sortPrefixLen
         SortedDocsProducer sortedDocsProducer = sortPrefixLen == 0
             ? sources[0].createSortedDocsProducerOrNull(ctx.reader(), context.query())
             : null;
@@ -602,6 +597,8 @@ public void collect(int doc, long bucket) throws IOException {
                 try {
                     long docCount = docCountProvider.getDocCount(doc);
                     if (queue.addIfCompetitive(indexSortPrefix, docCount)) {
+                        // one doc may contain multiple values; we iterate over and collect them one by one,
+                        // so the same doc can appear multiple times here
                         if (builder != null && lastDoc != doc) {
                             builder.add(doc);
                             lastDoc = doc;
@@ -626,18 +623,14 @@ private void runDeferredCollections() throws IOException {
             Query query = context.query();
             weight = context.searcher().createWeight(context.searcher().rewrite(query), ScoreMode.COMPLETE, 1f);
         }
-
         deferredCollectors.preCollection();
-
         for (Entry entry : entries) {
             DocIdSetIterator docIdSetIterator = entry.docIdSet.iterator();
             if (docIdSetIterator == null) {
                 continue;
             }
-
             final LeafBucketCollector subCollector = deferredCollectors.getLeafCollector(entry.context);
             final LeafBucketCollector collector = queue.getLeafCollector(entry.context, getSecondPassCollector(subCollector));
-
             DocIdSetIterator scorerIt = null;
             if (needsScores) {
                 Scorer scorer = weight.scorer(entry.context);
@@ -646,7 +639,6 @@ private void runDeferredCollections() throws IOException {
                     subCollector.setScorer(scorer);
                 }
             }
-
             int docID;
             while ((docID = docIdSetIterator.nextDoc()) != DocIdSetIterator.NO_MORE_DOCS) {
                 if (needsScores) {
@@ -658,7 +650,6 @@ private void runDeferredCollections() throws IOException {
                 collector.collect(docID);
             }
         }
-
         deferredCollectors.postCollection();
     }

@@ -670,7 +661,7 @@ private LeafBucketCollector getSecondPassCollector(LeafBucketCollector subCollec
         @Override
         public void collect(int doc, long zeroBucket) throws IOException {
             assert zeroBucket == 0;
-            Integer slot = queue.compareCurrent();
+            Integer slot = queue.getCurrentSlot();
             if (slot != null) {
                 // The candidate key is a top bucket.
                 // We can defer the collection of this document/bucket to the sub collector
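
The second-pass collector above only replays deferred sub-aggregations for documents whose composite key survived the first pass. The two-pass flow in miniature (hypothetical names; the real code works per segment with DocIdSets and LeafBucketCollectors):

import java.util.ArrayList;
import java.util.List;
import java.util.TreeSet;

// Two-pass model: pass 1 selects the top-N keys while remembering candidate
// docs; pass 2 replays an expensive per-doc action only for surviving keys.
public final class TwoPassSketch {
    public static void main(String[] args) {
        long[] docKeys = { 5, 1, 3, 5, 2, 1, 4 }; // composite key of each doc, indexed by doc id
        int topN = 2;

        // Pass 1: keep the N smallest keys (composite pages ascending by default)
        TreeSet<Long> topKeys = new TreeSet<>();
        List<Integer> candidateDocs = new ArrayList<>();
        for (int doc = 0; doc < docKeys.length; doc++) {
            topKeys.add(docKeys[doc]);
            if (topKeys.size() > topN) {
                topKeys.pollLast(); // evict the current largest key
            }
            candidateDocs.add(doc); // the real code records these in a per-segment DocIdSet
        }

        // Pass 2: replay "sub-aggregation" work only for docs whose key is still a top bucket
        for (int doc : candidateDocs) {
            if (topKeys.contains(docKeys[doc])) { // analogous to queue.getCurrentSlot() != null
                System.out.println("replay doc " + doc + " for bucket " + docKeys[doc]);
            }
        }
    }
}
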
diff --git a/server/src/main/java/org/opensearch/search/aggregations/bucket/composite/CompositeValuesCollectorQueue.java b/server/src/main/java/org/opensearch/search/aggregations/bucket/composite/CompositeValuesCollectorQueue.java
index e078ffca1597c..2c4d451322bca 100644
--- a/server/src/main/java/org/opensearch/search/aggregations/bucket/composite/CompositeValuesCollectorQueue.java
+++ b/server/src/main/java/org/opensearch/search/aggregations/bucket/composite/CompositeValuesCollectorQueue.java
@@ -47,6 +47,8 @@

 /**
  * A specialized {@link PriorityQueue} implementation for composite buckets.
+ * Can think of this as a max heap that holds the slots of the top-N smallest buckets in order.
+ * Each slot holds the values of the composite bucket key it represents.
  *
  * @opensearch.internal
  */
@@ -77,7 +79,7 @@ public int hashCode() {

     private final BigArrays bigArrays;
     private final int maxSize;
-    private final Map map;
+    private final Map map; // to quickly find the slot for a value
     private final SingleDimensionValuesSource[] arrays;

     private LongArray docCounts;
@@ -119,10 +121,10 @@ boolean isFull() {
     }

     /**
-     * Compares the current candidate with the values in the queue and returns
+     * Tries to get the slot of the current/candidate values in the queue and returns
      * the slot if the candidate is already in the queue or null if the candidate is not present.
      */
-    Integer compareCurrent() {
+    Integer getCurrentSlot() {
         return map.get(new Slot(CANDIDATE_SLOT));
     }

@@ -165,13 +167,11 @@ int compare(int slot1, int slot2) {
         assert slot2 != CANDIDATE_SLOT;
         for (int i = 0; i < arrays.length; i++) {
             final int cmp;
-
             if (slot1 == CANDIDATE_SLOT) {
                 cmp = arrays[i].compareCurrent(slot2);
             } else {
                 cmp = arrays[i].compare(slot1, slot2);
             }
-
             if (cmp != 0) {
                 return cmp > 0 ? i + 1 : -(i + 1);
             }
@@ -255,13 +255,11 @@ LeafBucketCollector getLeafCollector(Comparable forceLeadSourceValue, LeafReader
         while (last > 0) {
             collector = arrays[last--].getLeafCollector(context, collector);
         }
-
         if (forceLeadSourceValue != null) {
             collector = arrays[last].getLeafCollector(forceLeadSourceValue, context, collector);
         } else {
             collector = arrays[last].getLeafCollector(context, collector);
         }
-
         return collector;
     }

@@ -283,9 +281,9 @@ boolean addIfCompetitive(long inc) {
      *
      * @throws CollectionTerminatedException if the current collection can be terminated early due to index sorting.
      */
-    boolean addIfCompetitive(int indexSortSourcePrefix, long inc) { // TODO reading indexSortSourcePrefix can only be -1
+    boolean addIfCompetitive(int indexSortSourcePrefix, long inc) {
         // checks if the candidate key is competitive
-        Integer curSlot = compareCurrent();
+        Integer curSlot = getCurrentSlot();
         if (curSlot != null) {
             // this key is already in the top N, skip it
             docCounts.increment(curSlot, inc);
@@ -296,25 +294,23 @@ boolean addIfCompetitive(int indexSortSourcePrefix, long inc) { // TODO reading
             int cmp = compareCurrentWithAfter();
             if (cmp <= 0) {
                 if (indexSortSourcePrefix < 0 && cmp == indexSortSourcePrefix) {
-                    // the leading index sort is in the reverse order of the leading source
+                    // the leading index sort and the leading source order are both reversed,
                     // so we can early terminate when we reach a document that is smaller
                     // than the after key (collected on a previous page).
                     throw new CollectionTerminatedException();
                 }
-                // key was collected on a previous page, skip it (>= afterKey).
+                // the key was collected on a previous page, skip it.
                 return false;
             }
         }

-        if (size() >= maxSize) { // TODO reading when queue is full, can check competitiveness
-            // the tree map is full, check if the candidate key should be kept // TODO reading queue contain topN largest composite
-            // key/bucket/slot
+        // the heap is full, check if the candidate key is larger than the max heap top
+        if (size() >= maxSize) {
             int cmp = compare(CANDIDATE_SLOT, top());
-            if (cmp > 0) { // TODO reading current large than queue
-                if (cmp <= indexSortSourcePrefix) { // TODO reading the way of comparing current and queue uses sorted fields
-                    // index sort guarantees that there is no key greater or equal than the
-                    // current one in the subsequent documents so we can early terminate. // TODO reading how to get the topN smallest items
-                    // using heap?
+            if (cmp > 0) {
+                if (cmp <= indexSortSourcePrefix) {
+                    // index sort guarantees the following documents will have a key larger than the current candidate,
+                    // so we can early terminate.
                     throw new CollectionTerminatedException();
                 }
                 // the candidate key is not competitive, skip it.
@@ -330,9 +326,9 @@ boolean addIfCompetitive(int indexSortSourcePrefix, long inc) { // TODO reading
             // and we recycle the deleted slot
             newSlot = slot;
         } else {
-            newSlot = size(); // TODO reading seems we don't care the number of slot here?
+            newSlot = size();
        }
-        // move the candidate key to its new slot
+        // move the candidate key to its new slot by copying its values to the new slot
         copyCurrent(newSlot, inc);
         map.put(new Slot(newSlot), newSlot);
         add(newSlot);
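
The new class javadoc describes the queue as a max heap of the top-N smallest buckets plus a map for O(1) membership checks. A minimal standalone version of that shape (illustrative; the real queue stores per-source values in slots rather than plain long keys):

import java.util.Comparator;
import java.util.HashMap;
import java.util.Map;
import java.util.PriorityQueue;

// Keeps the N smallest keys seen so far. The heap top is the largest retained
// key, so one comparison decides competitiveness; the map gives O(1)
// "is this key already a bucket" checks, like getCurrentSlot().
public final class TopNKeys {
    private final int maxSize;
    private final PriorityQueue<Long> heap = new PriorityQueue<>(Comparator.reverseOrder());
    private final Map<Long, Long> docCounts = new HashMap<>();

    TopNKeys(int maxSize) {
        this.maxSize = maxSize;
    }

    boolean addIfCompetitive(long key, long inc) {
        Long current = docCounts.get(key);
        if (current != null) { // already a top bucket: just bump its doc count
            docCounts.put(key, current + inc);
            return true;
        }
        if (heap.size() >= maxSize) {
            if (key >= heap.peek()) {
                return false; // not smaller than the retained maximum: not competitive
            }
            docCounts.remove(heap.poll()); // evict the largest, recycling its slot
        }
        heap.add(key);
        docCounts.put(key, inc);
        return true;
    }

    public static void main(String[] args) {
        TopNKeys q = new TopNKeys(2);
        for (long k : new long[] { 7, 3, 9, 3, 1 }) {
            q.addIfCompetitive(k, 1);
        }
        System.out.println(q.docCounts); // keys 1 and 3 survive, with counts 1 and 2
    }
}
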
diff --git a/server/src/main/java/org/opensearch/search/aggregations/bucket/composite/PointsSortedDocsProducer.java b/server/src/main/java/org/opensearch/search/aggregations/bucket/composite/PointsSortedDocsProducer.java
index 628fab55b5411..dc130eb54c0ea 100644
--- a/server/src/main/java/org/opensearch/search/aggregations/bucket/composite/PointsSortedDocsProducer.java
+++ b/server/src/main/java/org/opensearch/search/aggregations/bucket/composite/PointsSortedDocsProducer.java
@@ -77,7 +77,6 @@ DocIdSet processLeaf(Query query, CompositeValuesCollectorQueue queue, LeafReade
             }
             lowerBucket = (Long) lowerValue;
         }
-
         long upperBucket = Long.MAX_VALUE;
         Comparable upperValue = queue.getUpperValueLeadSource();
         if (upperValue != null) {
@@ -148,7 +147,8 @@ public void visit(int docID, byte[] packedValue) throws IOException {
                 }

                 long bucket = bucketFunction.applyAsLong(packedValue);
-                if (first == false && bucket != lastBucket) { // process previous bucket when new bucket appears
+                // process previous bucket when new bucket appears
+                if (first == false && bucket != lastBucket) {
                     final DocIdSet docIdSet = bucketDocsBuilder.build();
                     if (processBucket(queue, context, docIdSet.iterator(), lastBucket, builder) &&
                         // lower bucket is inclusive
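
PointsSortedDocsProducer visits point values in sorted order, so all documents of one bucket arrive contiguously: it buffers doc ids and processes the finished bucket as soon as a value rounds to a new bucket. The same pattern in miniature over a pre-sorted array instead of a BKD tree (illustrative names):

import java.util.ArrayList;
import java.util.List;

// Values arrive in sorted order, so all docs of one bucket are contiguous:
// buffer doc ids until the bucket value changes, then flush the finished bucket.
public final class SortedBucketFlush {
    public static void main(String[] args) {
        long[] sortedValues = { 10, 10, 20, 20, 20, 30 }; // already rounded to bucket keys
        int[] docIds = { 4, 8, 1, 3, 9, 2 };              // doc id carried with each value

        long lastBucket = 0;
        boolean first = true;
        List<Integer> pending = new ArrayList<>();
        for (int i = 0; i < sortedValues.length; i++) {
            long bucket = sortedValues[i];
            if (!first && bucket != lastBucket) {
                System.out.println("bucket " + lastBucket + " -> docs " + pending);
                pending.clear(); // processBucket() would feed these to the queue
            }
            pending.add(docIds[i]);
            lastBucket = bucket;
            first = false;
        }
        if (!first) {
            System.out.println("bucket " + lastBucket + " -> docs " + pending);
        }
    }
}
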
diff --git a/server/src/main/java/org/opensearch/search/aggregations/bucket/composite/SortedDocsProducer.java b/server/src/main/java/org/opensearch/search/aggregations/bucket/composite/SortedDocsProducer.java
index 95d3ecad31669..9442529bf9342 100644
--- a/server/src/main/java/org/opensearch/search/aggregations/bucket/composite/SortedDocsProducer.java
+++ b/server/src/main/java/org/opensearch/search/aggregations/bucket/composite/SortedDocsProducer.java
@@ -75,10 +75,8 @@ protected boolean processBucket(
     ) throws IOException {
         final int[] topCompositeCollected = new int[1];
         final boolean[] hasCollected = new boolean[1];
-
         final DocCountProvider docCountProvider = new DocCountProvider();
         docCountProvider.setLeafReaderContext(context);
-
         final LeafBucketCollector queueCollector = new LeafBucketCollector() {
             int lastDoc = -1;

@@ -94,7 +92,7 @@ public void collect(int doc, long bucket) throws IOException {
                 long docCount = docCountProvider.getDocCount(doc);
                 if (queue.addIfCompetitive(docCount)) {
                     topCompositeCollected[0]++;
-                    if (adder != null && doc != lastDoc) { // TODO reading why doc can be == lastDoc?
+                    if (adder != null && doc != lastDoc) {
                         if (remainingBits == 0) {
                             // the cost approximation was lower than the real size, we need to grow the adder
                             // by some numbers (128) to ensure that we can add the extra documents
@@ -108,7 +106,6 @@ public void collect(int doc, long bucket) throws IOException {
                 }
             }
         };
-
         final Bits liveDocs = context.reader().getLiveDocs();
         final LeafBucketCollector collector = queue.getLeafCollector(leadSourceBucket, context, queueCollector);
         while (iterator.nextDoc() != DocIdSetIterator.NO_MORE_DOCS) {
@@ -116,7 +113,6 @@ public void collect(int doc, long bucket) throws IOException {
                 collector.collect(iterator.docID());
             }
         }
-
         if (queue.isFull() && hasCollected[0] && topCompositeCollected[0] == 0) {
             return true;
         }
diff --git a/server/src/main/java/org/opensearch/search/aggregations/bucket/histogram/AutoDateHistogramAggregator.java b/server/src/main/java/org/opensearch/search/aggregations/bucket/histogram/AutoDateHistogramAggregator.java
index 0ef546e98a5ba..5a0b6bffb4f38 100644
--- a/server/src/main/java/org/opensearch/search/aggregations/bucket/histogram/AutoDateHistogramAggregator.java
+++ b/server/src/main/java/org/opensearch/search/aggregations/bucket/histogram/AutoDateHistogramAggregator.java
@@ -162,7 +162,7 @@ private AutoDateHistogramAggregator(
             valuesSourceConfig.missing() != null,
             valuesSourceConfig.script() != null,
             valuesSourceConfig.fieldType(),
-            0
+            -1
         );
         FilterRewriteHelper.FilterContext filterContext = FilterRewriteHelper.buildFastFilterContext(
             parent(),
diff --git a/server/src/main/java/org/opensearch/search/aggregations/bucket/histogram/DateHistogramAggregator.java b/server/src/main/java/org/opensearch/search/aggregations/bucket/histogram/DateHistogramAggregator.java
index 11fff40e0542b..74ed12b8d759d 100644
--- a/server/src/main/java/org/opensearch/search/aggregations/bucket/histogram/DateHistogramAggregator.java
+++ b/server/src/main/java/org/opensearch/search/aggregations/bucket/histogram/DateHistogramAggregator.java
@@ -121,7 +121,7 @@ class DateHistogramAggregator extends BucketsAggregator implements SizedBucketAg
             valuesSourceConfig.missing() != null,
             valuesSourceConfig.script() != null,
             valuesSourceConfig.fieldType(),
-            0
+            -1
         );
         FilterRewriteHelper.FilterContext filterContext = FilterRewriteHelper.buildFastFilterContext(
             parent,
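
Both histogram aggregators above now pass -1 rather than 0 for the unused afterKey. The reason is visible in the createFilterForAggregations hunk earlier in this patch: 0 is a legitimate rounded bucket key (the epoch), so it cannot double as an "unset" marker. A tiny demonstration, isolating just the guard (hypothetical helper, not the real API):

public final class AfterKeySentinel {
    static final long NO_AFTER_KEY = -1; // 0 is a real bucket key (epoch millis), -1 is not a rounded key

    // Mirrors the guard: only shift the lower bound when an afterKey was really given.
    static long lowerBound(long low, long interval, long afterKey) {
        return afterKey != NO_AFTER_KEY ? afterKey + interval : low;
    }

    public static void main(String[] args) {
        // A composite page that genuinely ended at bucket 0 must advance to bucket 10 ...
        System.out.println(lowerBound(0, 10, 0));            // 10
        // ... while "no afterKey" keeps low; the old 0 sentinel conflated these two cases.
        System.out.println(lowerBound(0, 10, NO_AFTER_KEY)); // 0
    }
}
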
diff --git a/server/src/test/java/org/opensearch/search/aggregations/bucket/composite/CompositeAggregatorTests.java b/server/src/test/java/org/opensearch/search/aggregations/bucket/composite/CompositeAggregatorTests.java
index ef845c8435295..b581e552fec4f 100644
--- a/server/src/test/java/org/opensearch/search/aggregations/bucket/composite/CompositeAggregatorTests.java
+++ b/server/src/test/java/org/opensearch/search/aggregations/bucket/composite/CompositeAggregatorTests.java
@@ -2241,21 +2241,20 @@ private , V extends Comparable> void testRandomTerms(
         Function transformKey
     ) throws IOException {
         int numTerms = randomIntBetween(10, 500);
-        List terms = new ArrayList<>();
+        List terms = new ArrayList<>(); // possible values for the terms
         for (int i = 0; i < numTerms; i++) {
             terms.add(randomSupplier.get());
         }
         int numDocs = randomIntBetween(100, 200);
         List>> dataset = new ArrayList<>();
-
-        Set valuesSet = new HashSet<>();
-        Map, AtomicLong> expectedDocCounts = new HashMap<>();
+        Set valuesSet = new HashSet<>(); // how many different values
+        Map, AtomicLong> expectedDocCounts = new HashMap<>(); // how many docs for each value
         for (int i = 0; i < numDocs; i++) {
             int numValues = randomIntBetween(1, 5);
             Set values = new HashSet<>();
             for (int j = 0; j < numValues; j++) {
                 int rand = randomIntBetween(0, terms.size() - 1);
-                if (values.add(terms.get(rand))) {
+                if (values.add(terms.get(rand))) { // values are unique for one doc
                     AtomicLong count = expectedDocCounts.computeIfAbsent(terms.get(rand), (k) -> new AtomicLong(0));
                     count.incrementAndGet();
                     valuesSet.add(terms.get(rand));
@@ -2263,9 +2262,8 @@ private , V extends Comparable> void testRandomTerms(
             }
             dataset.add(Collections.singletonMap(field, new ArrayList<>(values)));
         }
-        List expected = new ArrayList<>(valuesSet);
+        List expected = new ArrayList<>(valuesSet); // how many buckets expected
         Collections.sort(expected);
-
         List> seen = new ArrayList<>();
         AtomicBoolean finish = new AtomicBoolean(false);
         int size = randomIntBetween(1, expected.size());

From 6cfefb726f759cfbb1cb2ec4d313b82d2d3936e5 Mon Sep 17 00:00:00 2001
From: bowenlan-amzn
Date: Wed, 3 Jan 2024 10:05:15 -0800
Subject: [PATCH 10/18] to merge

Signed-off-by: bowenlan-amzn
---
 .idea/runConfigurations/Debug_OpenSearch.xml  |   6 +-
 ...lper.java => FastFilterRewriteHelper.java} | 230 ++++++++++++------
 .../bucket/composite/CompositeAggregator.java | 100 +++++++-
 .../CompositeValuesCollectorQueue.java        |  30 ++-
 .../DateHistogramValuesSourceBuilder.java     |   2 +-
 .../bucket/composite/InternalComposite.java   |   6 +-
 .../composite/PointsSortedDocsProducer.java   |   6 +-
 .../composite/RoundingValuesSource.java       |  21 +-
 .../AutoDateHistogramAggregator.java          |  59 ++---
 .../histogram/DateHistogramAggregator.java    |  57 ++---
 .../composite/CompositeAggregatorTests.java   |  17 +-
 11 files changed, 346 insertions(+), 188 deletions(-)
 rename server/src/main/java/org/opensearch/search/aggregations/bucket/{histogram/FilterRewriteHelper.java => FastFilterRewriteHelper.java} (55%)

diff --git a/.idea/runConfigurations/Debug_OpenSearch.xml b/.idea/runConfigurations/Debug_OpenSearch.xml
index 0d8bf59823acf..c18046f873477 100644
--- a/.idea/runConfigurations/Debug_OpenSearch.xml
+++ b/.idea/runConfigurations/Debug_OpenSearch.xml
@@ -6,6 +6,10 @@