From c1b6dc5bc4669d4648283009619e7fb2090b0fb0 Mon Sep 17 00:00:00 2001 From: Nik Everett Date: Tue, 26 Jan 2021 08:42:17 -0500 Subject: [PATCH] Fix sorting terms by cardinality agg (#67839) The cardinality agg delays calculating stuff until just before it is needed. Before #64016 it used the `postCollect` phase to do this work which was perfect for the `terms` agg but we decided that `postCollect` was dangerous because some aggs, notably the `parent` and `child` aggs need to know which children to build and they *can't* during `postCollect`. After #64016 we built the cardinality agg results when we built the buckets. But we if you sort on the cardinality agg then you need to do the `postCollect` stuff in order to know which buckets to build! So you have a chicken and egg problem. Sort of. This change splits the difference by running the delayed cardinality agg stuff as soon as you *either* try to build the buckets *or* read the cardinality for use with sorting. This works, but is a little janky and feels wrong. It feels like we could make a structural fix to the way we read metric values from aggs before building the buckets that would make this sort of bug much more difficult to cause. But any sort of solution to this is a larger structural change. So this fixes the bug in the quick and janky way and we hope to do a more structural fix to the way we read metrics soon. Closes #67782 --- .../metrics/CardinalityAggregator.java | 7 ++ .../GlobalOrdCardinalityAggregator.java | 18 ++++- .../bucket/terms/TermsAggregatorTests.java | 67 +++++++++++++++++++ .../aggregations/AggregatorTestCase.java | 20 +++++- 4 files changed, 109 insertions(+), 3 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/search/aggregations/metrics/CardinalityAggregator.java b/server/src/main/java/org/elasticsearch/search/aggregations/metrics/CardinalityAggregator.java index 253972bc8d876..6110dbda0742f 100644 --- a/server/src/main/java/org/elasticsearch/search/aggregations/metrics/CardinalityAggregator.java +++ b/server/src/main/java/org/elasticsearch/search/aggregations/metrics/CardinalityAggregator.java @@ -38,6 +38,7 @@ import org.elasticsearch.common.util.ObjectArray; import org.elasticsearch.index.fielddata.SortedBinaryDocValues; import org.elasticsearch.index.fielddata.SortedNumericDoubleValues; +import org.elasticsearch.search.aggregations.AggregationExecutionException; import org.elasticsearch.search.aggregations.Aggregator; import org.elasticsearch.search.aggregations.InternalAggregation; import org.elasticsearch.search.aggregations.LeafBucketCollector; @@ -152,6 +153,12 @@ protected void beforeBuildingResults(long[] ordsToCollect) throws IOException { @Override public double metric(long owningBucketOrd) { + try { + // Make sure all outstanding data has been synced down to the counts. + postCollectLastCollector(); + } catch (IOException e) { + throw new AggregationExecutionException("error collecting data in last segment", e); + } return counts == null ? 0 : counts.cardinality(owningBucketOrd); } diff --git a/server/src/main/java/org/elasticsearch/search/aggregations/metrics/GlobalOrdCardinalityAggregator.java b/server/src/main/java/org/elasticsearch/search/aggregations/metrics/GlobalOrdCardinalityAggregator.java index 86cb4ef1c925d..5a4ef6c4308b4 100644 --- a/server/src/main/java/org/elasticsearch/search/aggregations/metrics/GlobalOrdCardinalityAggregator.java +++ b/server/src/main/java/org/elasticsearch/search/aggregations/metrics/GlobalOrdCardinalityAggregator.java @@ -31,6 +31,7 @@ import org.elasticsearch.common.util.BitArray; import org.elasticsearch.common.util.LongArray; import org.elasticsearch.common.util.ObjectArray; +import org.elasticsearch.search.aggregations.AggregationExecutionException; import org.elasticsearch.search.aggregations.Aggregator; import org.elasticsearch.search.aggregations.InternalAggregation; import org.elasticsearch.search.aggregations.LeafBucketCollector; @@ -103,6 +104,13 @@ public void collect(int doc, long bucketOrd) throws IOException { @Override protected void beforeBuildingResults(long[] ordsToCollect) throws IOException { + buildCountIfNeeded(); + } + + private void buildCountIfNeeded() throws IOException { + if (counts != null) { + return; + } counts = new HyperLogLogPlusPlusSparse(precision, bigArrays, visitedOrds.size()); try (LongArray hashes = bigArrays.newLongArray(maxOrd, false)) { try (BitArray allVisitedOrds = new BitArray(maxOrd, bigArrays)) { @@ -141,12 +149,18 @@ protected void beforeBuildingResults(long[] ordsToCollect) throws IOException { @Override public double metric(long owningBucketOrd) { - return counts == null ? 0 : counts.cardinality(owningBucketOrd); + try { + // Make sure all outstanding data has been synced down to the counts. + buildCountIfNeeded(); + } catch (IOException e) { + throw new AggregationExecutionException("error collecting data in last segment", e); + } + return counts.cardinality(owningBucketOrd); } @Override public InternalAggregation buildAggregation(long owningBucketOrdinal) { - if (counts == null || owningBucketOrdinal >= counts.maxOrd() || counts.cardinality(owningBucketOrdinal) == 0) { + if (owningBucketOrdinal >= counts.maxOrd() || counts.cardinality(owningBucketOrdinal) == 0) { return buildEmptyAggregation(); } // We need to build a copy because the returned Aggregation needs remain usable after diff --git a/server/src/test/java/org/elasticsearch/search/aggregations/bucket/terms/TermsAggregatorTests.java b/server/src/test/java/org/elasticsearch/search/aggregations/bucket/terms/TermsAggregatorTests.java index 703d3d9c928f4..11a184157c95e 100644 --- a/server/src/test/java/org/elasticsearch/search/aggregations/bucket/terms/TermsAggregatorTests.java +++ b/server/src/test/java/org/elasticsearch/search/aggregations/bucket/terms/TermsAggregatorTests.java @@ -86,6 +86,7 @@ import org.elasticsearch.search.aggregations.bucket.nested.InternalNested; import org.elasticsearch.search.aggregations.bucket.nested.NestedAggregationBuilder; import org.elasticsearch.search.aggregations.bucket.nested.NestedAggregatorTests; +import org.elasticsearch.search.aggregations.metrics.CardinalityAggregationBuilder; import org.elasticsearch.search.aggregations.metrics.InternalTopHits; import org.elasticsearch.search.aggregations.metrics.TopHitsAggregationBuilder; import org.elasticsearch.search.aggregations.pipeline.BucketScriptPipelineAggregationBuilder; @@ -114,6 +115,7 @@ import static java.util.Arrays.asList; import static java.util.Collections.singleton; +import static java.util.stream.Collectors.toList; import static org.elasticsearch.index.mapper.SeqNoFieldMapper.PRIMARY_TERM_NAME; import static org.elasticsearch.search.aggregations.AggregationBuilders.terms; import static org.elasticsearch.search.aggregations.PipelineAggregatorBuilders.bucketScript; @@ -1418,6 +1420,71 @@ public void testOrderByPipelineAggregation() throws Exception { } } + public void testOrderByCardinality() throws IOException { + boolean bIsString = randomBoolean(); + TermsAggregationBuilder aggregationBuilder = new TermsAggregationBuilder("a").field("a") + .size(3) + .shardSize(3) + .subAggregation(new CardinalityAggregationBuilder("b").field("b")) + .order(BucketOrder.aggregation("b", false)); + + /* + * Build documents where larger "a"s obviously have more distinct "b"s + * associated with them. But insert them into Lucene in a random + * order using Lucene's randomizeWriter so we'll bump into situations + * where documents in the last segment change the outcome of the + * cardinality agg. At least, right now the bug has to do with + * documents in the last segment. But randomize so we can catch + * new and strange bugs in the future. Finally, its important that + * we have few enough values that cardinality can be exact. + */ + List> docs = new ArrayList<>(); + for (int a = 0; a < 10; a++) { + for (int b = 0; b <= a; b++) { + docs.add( + org.elasticsearch.common.collect.List.of( + new NumericDocValuesField("a", a), + bIsString ? new SortedSetDocValuesField("b", new BytesRef(Integer.toString(b))) : new NumericDocValuesField("b", b) + ) + ); + } + } + Collections.shuffle(docs, random()); + try (Directory directory = newDirectory()) { + RandomIndexWriter iw = new RandomIndexWriter(random(), directory); + for (List doc : docs) { + iw.addDocument(doc); + } + iw.close(); + + try (DirectoryReader unwrapped = DirectoryReader.open(directory); + IndexReader indexReader = wrapDirectoryReader(unwrapped)) { + IndexSearcher indexSearcher = newIndexSearcher(indexReader); + + LongTerms terms = searchAndReduce( + createIndexSettings(), + indexSearcher, + new MatchAllDocsQuery(), + aggregationBuilder, + Integer.MAX_VALUE, + false, + new NumberFieldMapper.NumberFieldType("a", NumberFieldMapper.NumberType.INTEGER), + bIsString + ? new KeywordFieldMapper.KeywordFieldType("b") + : new NumberFieldMapper.NumberFieldType("b", NumberFieldMapper.NumberType.INTEGER) + ); + assertThat( + terms.getBuckets().stream().map(MultiBucketsAggregation.Bucket::getKey).collect(toList()), + equalTo(org.elasticsearch.common.collect.List.of(9L, 8L, 7L)) + ); + assertThat( + terms.getBuckets().stream().map(MultiBucketsAggregation.Bucket::getDocCount).collect(toList()), + equalTo(org.elasticsearch.common.collect.List.of(10L, 9L, 8L)) + ); + } + } + } + private final SeqNoFieldMapper.SequenceIDFields sequenceIDFields = SeqNoFieldMapper.SequenceIDFields.emptySeqID(); private List generateDocsWithNested(String id, int value, int[] nestedValues) { List documents = new ArrayList<>(); diff --git a/test/framework/src/main/java/org/elasticsearch/search/aggregations/AggregatorTestCase.java b/test/framework/src/main/java/org/elasticsearch/search/aggregations/AggregatorTestCase.java index 6e09ac844e1be..48c6eb071fac7 100644 --- a/test/framework/src/main/java/org/elasticsearch/search/aggregations/AggregatorTestCase.java +++ b/test/framework/src/main/java/org/elasticsearch/search/aggregations/AggregatorTestCase.java @@ -442,6 +442,24 @@ protected A searchAndReduc AggregationBuilder builder, int maxBucket, MappedFieldType... fieldTypes) throws IOException { + return searchAndReduce(indexSettings, searcher, query, builder, maxBucket, randomBoolean(), fieldTypes); + } + + /** + * Collects all documents that match the provided query {@link Query} and + * returns the reduced {@link InternalAggregation}. + *

+ * @param splitLeavesIntoSeparateAggregators If true this creates a new {@link Aggregator} + * for each leaf as though it were a separate index. If false this aggregates + * all leaves together, like we do in production. + */ + protected A searchAndReduce(IndexSettings indexSettings, + IndexSearcher searcher, + Query query, + AggregationBuilder builder, + int maxBucket, + boolean splitLeavesIntoSeparateAggregators, + MappedFieldType... fieldTypes) throws IOException { final IndexReaderContext ctx = searcher.getTopReaderContext(); final PipelineTree pipelines = builder.buildPipelineTree(); List aggs = new ArrayList<>(); @@ -458,7 +476,7 @@ protected A searchAndReduc ); C root = createAggregator(builder, context); - if (randomBoolean() && searcher.getIndexReader().leaves().size() > 0) { + if (splitLeavesIntoSeparateAggregators && searcher.getIndexReader().leaves().size() > 0) { assertThat(ctx, instanceOf(CompositeReaderContext.class)); final CompositeReaderContext compCTX = (CompositeReaderContext) ctx; final int size = compCTX.leaves().size();