diff --git a/server/src/main/java/org/elasticsearch/search/aggregations/metrics/CardinalityAggregator.java b/server/src/main/java/org/elasticsearch/search/aggregations/metrics/CardinalityAggregator.java
index 253972bc8d876..6110dbda0742f 100644
--- a/server/src/main/java/org/elasticsearch/search/aggregations/metrics/CardinalityAggregator.java
+++ b/server/src/main/java/org/elasticsearch/search/aggregations/metrics/CardinalityAggregator.java
@@ -38,6 +38,7 @@
 import org.elasticsearch.common.util.ObjectArray;
 import org.elasticsearch.index.fielddata.SortedBinaryDocValues;
 import org.elasticsearch.index.fielddata.SortedNumericDoubleValues;
+import org.elasticsearch.search.aggregations.AggregationExecutionException;
 import org.elasticsearch.search.aggregations.Aggregator;
 import org.elasticsearch.search.aggregations.InternalAggregation;
 import org.elasticsearch.search.aggregations.LeafBucketCollector;
@@ -152,6 +153,12 @@ protected void beforeBuildingResults(long[] ordsToCollect) throws IOException {
 
     @Override
     public double metric(long owningBucketOrd) {
+        try {
+            // Make sure all outstanding data has been synced down to the counts.
+            postCollectLastCollector();
+        } catch (IOException e) {
+            throw new AggregationExecutionException("error collecting data in last segment", e);
+        }
         return counts == null ? 0 : counts.cardinality(owningBucketOrd);
     }
 
diff --git a/server/src/main/java/org/elasticsearch/search/aggregations/metrics/GlobalOrdCardinalityAggregator.java b/server/src/main/java/org/elasticsearch/search/aggregations/metrics/GlobalOrdCardinalityAggregator.java
index 86cb4ef1c925d..5a4ef6c4308b4 100644
--- a/server/src/main/java/org/elasticsearch/search/aggregations/metrics/GlobalOrdCardinalityAggregator.java
+++ b/server/src/main/java/org/elasticsearch/search/aggregations/metrics/GlobalOrdCardinalityAggregator.java
@@ -31,6 +31,7 @@
 import org.elasticsearch.common.util.BitArray;
 import org.elasticsearch.common.util.LongArray;
 import org.elasticsearch.common.util.ObjectArray;
+import org.elasticsearch.search.aggregations.AggregationExecutionException;
 import org.elasticsearch.search.aggregations.Aggregator;
 import org.elasticsearch.search.aggregations.InternalAggregation;
 import org.elasticsearch.search.aggregations.LeafBucketCollector;
@@ -103,6 +104,13 @@ public void collect(int doc, long bucketOrd) throws IOException {
 
     @Override
     protected void beforeBuildingResults(long[] ordsToCollect) throws IOException {
+        buildCountIfNeeded();
+    }
+
+    private void buildCountIfNeeded() throws IOException {
+        if (counts != null) {
+            return;
+        }
         counts = new HyperLogLogPlusPlusSparse(precision, bigArrays, visitedOrds.size());
         try (LongArray hashes = bigArrays.newLongArray(maxOrd, false)) {
             try (BitArray allVisitedOrds = new BitArray(maxOrd, bigArrays)) {
@@ -141,12 +149,18 @@ protected void beforeBuildingResults(long[] ordsToCollect) throws IOException {
 
     @Override
     public double metric(long owningBucketOrd) {
-        return counts == null ? 0 : counts.cardinality(owningBucketOrd);
+        try {
+            // Make sure all outstanding data has been synced down to the counts.
+            buildCountIfNeeded();
+        } catch (IOException e) {
+            throw new AggregationExecutionException("error collecting data in last segment", e);
+        }
+        return counts.cardinality(owningBucketOrd);
     }
 
     @Override
     public InternalAggregation buildAggregation(long owningBucketOrdinal) {
-        if (counts == null || owningBucketOrdinal >= counts.maxOrd() || counts.cardinality(owningBucketOrdinal) == 0) {
+        if (owningBucketOrdinal >= counts.maxOrd() || counts.cardinality(owningBucketOrdinal) == 0) {
             return buildEmptyAggregation();
         }
         // We need to build a copy because the returned Aggregation needs remain usable after
diff --git a/server/src/test/java/org/elasticsearch/search/aggregations/bucket/terms/TermsAggregatorTests.java b/server/src/test/java/org/elasticsearch/search/aggregations/bucket/terms/TermsAggregatorTests.java
index 703d3d9c928f4..11a184157c95e 100644
--- a/server/src/test/java/org/elasticsearch/search/aggregations/bucket/terms/TermsAggregatorTests.java
+++ b/server/src/test/java/org/elasticsearch/search/aggregations/bucket/terms/TermsAggregatorTests.java
@@ -86,6 +86,7 @@
 import org.elasticsearch.search.aggregations.bucket.nested.InternalNested;
 import org.elasticsearch.search.aggregations.bucket.nested.NestedAggregationBuilder;
 import org.elasticsearch.search.aggregations.bucket.nested.NestedAggregatorTests;
+import org.elasticsearch.search.aggregations.metrics.CardinalityAggregationBuilder;
 import org.elasticsearch.search.aggregations.metrics.InternalTopHits;
 import org.elasticsearch.search.aggregations.metrics.TopHitsAggregationBuilder;
 import org.elasticsearch.search.aggregations.pipeline.BucketScriptPipelineAggregationBuilder;
@@ -114,6 +115,7 @@
 import static java.util.Arrays.asList;
 import static java.util.Collections.singleton;
+import static java.util.stream.Collectors.toList;
 import static org.elasticsearch.index.mapper.SeqNoFieldMapper.PRIMARY_TERM_NAME;
 import static org.elasticsearch.search.aggregations.AggregationBuilders.terms;
 import static org.elasticsearch.search.aggregations.PipelineAggregatorBuilders.bucketScript;
@@ -1418,6 +1420,71 @@ public void testOrderByPipelineAggregation() throws Exception {
         }
     }
 
+    public void testOrderByCardinality() throws IOException {
+        boolean bIsString = randomBoolean();
+        TermsAggregationBuilder aggregationBuilder = new TermsAggregationBuilder("a").field("a")
+            .size(3)
+            .shardSize(3)
+            .subAggregation(new CardinalityAggregationBuilder("b").field("b"))
+            .order(BucketOrder.aggregation("b", false));
+
+        /*
+         * Build documents where larger "a"s obviously have more distinct "b"s
+         * associated with them. But insert them into Lucene in a random
+         * order using Lucene's RandomIndexWriter so we'll bump into situations
+         * where documents in the last segment change the outcome of the
+         * cardinality agg. At least, right now the bug has to do with
+         * documents in the last segment. But randomize so we can catch
+         * new and strange bugs in the future. Finally, it's important that
+         * we have few enough values that cardinality can be exact.
+         */
+        List<List<IndexableField>> docs = new ArrayList<>();
+        for (int a = 0; a < 10; a++) {
+            for (int b = 0; b <= a; b++) {
+                docs.add(
+                    org.elasticsearch.common.collect.List.of(
+                        new NumericDocValuesField("a", a),
+                        bIsString
+                            ? new SortedSetDocValuesField("b", new BytesRef(Integer.toString(b)))
+                            : new NumericDocValuesField("b", b)
+                    )
+                );
+            }
+        }
+        Collections.shuffle(docs, random());
+        try (Directory directory = newDirectory()) {
+            RandomIndexWriter iw = new RandomIndexWriter(random(), directory);
+            for (List<IndexableField> doc : docs) {
+                iw.addDocument(doc);
+            }
+            iw.close();
+
+            try (DirectoryReader unwrapped = DirectoryReader.open(directory);
+                    IndexReader indexReader = wrapDirectoryReader(unwrapped)) {
+                IndexSearcher indexSearcher = newIndexSearcher(indexReader);
+
+                LongTerms terms = searchAndReduce(
+                    createIndexSettings(),
+                    indexSearcher,
+                    new MatchAllDocsQuery(),
+                    aggregationBuilder,
+                    Integer.MAX_VALUE,
+                    false,
+                    new NumberFieldMapper.NumberFieldType("a", NumberFieldMapper.NumberType.INTEGER),
+                    bIsString
+                        ? new KeywordFieldMapper.KeywordFieldType("b")
+                        : new NumberFieldMapper.NumberFieldType("b", NumberFieldMapper.NumberType.INTEGER)
+                );
+                assertThat(
+                    terms.getBuckets().stream().map(MultiBucketsAggregation.Bucket::getKey).collect(toList()),
+                    equalTo(org.elasticsearch.common.collect.List.of(9L, 8L, 7L))
+                );
+                assertThat(
+                    terms.getBuckets().stream().map(MultiBucketsAggregation.Bucket::getDocCount).collect(toList()),
+                    equalTo(org.elasticsearch.common.collect.List.of(10L, 9L, 8L))
+                );
+            }
+        }
+    }
+
     private final SeqNoFieldMapper.SequenceIDFields sequenceIDFields = SeqNoFieldMapper.SequenceIDFields.emptySeqID();
 
     private List<Document> generateDocsWithNested(String id, int value, int[] nestedValues) {
         List<Document> documents = new ArrayList<>();
diff --git a/test/framework/src/main/java/org/elasticsearch/search/aggregations/AggregatorTestCase.java b/test/framework/src/main/java/org/elasticsearch/search/aggregations/AggregatorTestCase.java
index 6e09ac844e1be..48c6eb071fac7 100644
--- a/test/framework/src/main/java/org/elasticsearch/search/aggregations/AggregatorTestCase.java
+++ b/test/framework/src/main/java/org/elasticsearch/search/aggregations/AggregatorTestCase.java
@@ -442,6 +442,24 @@ protected <A extends InternalAggregation, C extends Aggregator> A searchAndReduc
                                                AggregationBuilder builder,
                                                int maxBucket,
                                                MappedFieldType... fieldTypes) throws IOException {
+        return searchAndReduce(indexSettings, searcher, query, builder, maxBucket, randomBoolean(), fieldTypes);
+    }
+
+    /**
+     * Collects all documents that match the provided query {@link Query} and
+     * returns the reduced {@link InternalAggregation}.
+     * <p>
+     * @param splitLeavesIntoSeparateAggregators If true this creates a new {@link Aggregator}
+     *          for each leaf as though it were a separate index. If false this aggregates
+     *          all leaves together, like we do in production.
+     */
+    protected <A extends InternalAggregation, C extends Aggregator> A searchAndReduce(IndexSettings indexSettings,
+                                               IndexSearcher searcher,
+                                               Query query,
+                                               AggregationBuilder builder,
+                                               int maxBucket,
+                                               boolean splitLeavesIntoSeparateAggregators,
+                                               MappedFieldType... fieldTypes) throws IOException {
         final IndexReaderContext ctx = searcher.getTopReaderContext();
         final PipelineTree pipelines = builder.buildPipelineTree();
         List<InternalAggregation> aggs = new ArrayList<>();
@@ -458,7 +476,7 @@ protected <A extends InternalAggregation, C extends Aggregator> A searchAndReduc
         );
         C root = createAggregator(builder, context);
 
-        if (randomBoolean() && searcher.getIndexReader().leaves().size() > 0) {
+        if (splitLeavesIntoSeparateAggregators && searcher.getIndexReader().leaves().size() > 0) {
             assertThat(ctx, instanceOf(CompositeReaderContext.class));
             final CompositeReaderContext compCTX = (CompositeReaderContext) ctx;
             final int size = compCTX.leaves().size();
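
Both fixes above share one "build once" shape: the expensive post-collection sync that used to run only in beforeBuildingResults() now also runs from metric(), which the terms aggregation calls while ordering buckets, i.e. before results are built. The guard makes the sync idempotent, so whichever path arrives first pays the cost. Below is a minimal standalone sketch of that pattern; the names (LazyCounts, expensiveScan) are hypothetical stand-ins, not the HyperLogLog machinery in the diff.

    // Sketch of the guard-and-reuse shape behind buildCountIfNeeded().
    // Hypothetical names; only the control flow mirrors the diff above.
    class LazyCounts {
        private long[] counts; // null until pending segment data is synced

        // Safe to call from both the build path and the metric path:
        // the first caller does the work, later callers reuse the result.
        private void buildCountsIfNeeded() {
            if (counts != null) {
                return; // already synced, nothing outstanding
            }
            counts = expensiveScan();
        }

        double metric(int ord) {
            buildCountsIfNeeded(); // the fix: sync before reading
            return counts[ord];
        }

        private long[] expensiveScan() {
            return new long[] { 42 }; // stand-in for scanning the last segment
        }
    }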