Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix sorting terms by cardinality agg (backport of #67839) #67991

Merged
merged 1 commit into from
Jan 26, 2021
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Fix sorting terms by cardinality agg (#67839)
The cardinality agg delays calculating stuff until just before it is
needed. Before #64016 it used the `postCollect` phase to do this work
which was perfect for the `terms` agg but we decided that `postCollect`
was dangerous because some aggs, notably the `parent` and `child` aggs
need to know which children to build and they *can't* during
`postCollect`. After #64016 we built the cardinality agg results when we
built the buckets. But we if you sort on the cardinality agg then you
need to do the `postCollect` stuff in order to know which buckets
to build! So you have a chicken and egg problem. Sort of.

This change splits the difference by running the delayed cardinality agg
stuff as soon as you *either* try to build the buckets *or* read the
cardinality for use with sorting. This works, but is a little janky and
feels wrong. It feels like we could make a structural fix to the way we
read metric values from aggs before building the buckets that would make
this sort of bug much more difficult to cause. But any sort of solution
to this is a larger structural change. So this fixes the bug in the
quick and janky way and we hope to do a more structural fix to the way
we read metrics soon.

Closes #67782
  • Loading branch information
nik9000 committed Jan 26, 2021
commit c1b6dc5bc4669d4648283009619e7fb2090b0fb0
Original file line number Diff line number Diff line change
@@ -38,6 +38,7 @@
import org.elasticsearch.common.util.ObjectArray;
import org.elasticsearch.index.fielddata.SortedBinaryDocValues;
import org.elasticsearch.index.fielddata.SortedNumericDoubleValues;
import org.elasticsearch.search.aggregations.AggregationExecutionException;
import org.elasticsearch.search.aggregations.Aggregator;
import org.elasticsearch.search.aggregations.InternalAggregation;
import org.elasticsearch.search.aggregations.LeafBucketCollector;
@@ -152,6 +153,12 @@ protected void beforeBuildingResults(long[] ordsToCollect) throws IOException {

@Override
public double metric(long owningBucketOrd) {
try {
// Make sure all outstanding data has been synced down to the counts.
postCollectLastCollector();
} catch (IOException e) {
throw new AggregationExecutionException("error collecting data in last segment", e);
}
return counts == null ? 0 : counts.cardinality(owningBucketOrd);
}

Original file line number Diff line number Diff line change
@@ -31,6 +31,7 @@
import org.elasticsearch.common.util.BitArray;
import org.elasticsearch.common.util.LongArray;
import org.elasticsearch.common.util.ObjectArray;
import org.elasticsearch.search.aggregations.AggregationExecutionException;
import org.elasticsearch.search.aggregations.Aggregator;
import org.elasticsearch.search.aggregations.InternalAggregation;
import org.elasticsearch.search.aggregations.LeafBucketCollector;
@@ -103,6 +104,13 @@ public void collect(int doc, long bucketOrd) throws IOException {

@Override
protected void beforeBuildingResults(long[] ordsToCollect) throws IOException {
buildCountIfNeeded();
}

private void buildCountIfNeeded() throws IOException {
if (counts != null) {
return;
}
counts = new HyperLogLogPlusPlusSparse(precision, bigArrays, visitedOrds.size());
try (LongArray hashes = bigArrays.newLongArray(maxOrd, false)) {
try (BitArray allVisitedOrds = new BitArray(maxOrd, bigArrays)) {
@@ -141,12 +149,18 @@ protected void beforeBuildingResults(long[] ordsToCollect) throws IOException {

@Override
public double metric(long owningBucketOrd) {
return counts == null ? 0 : counts.cardinality(owningBucketOrd);
try {
// Make sure all outstanding data has been synced down to the counts.
buildCountIfNeeded();
} catch (IOException e) {
throw new AggregationExecutionException("error collecting data in last segment", e);
}
return counts.cardinality(owningBucketOrd);
}

@Override
public InternalAggregation buildAggregation(long owningBucketOrdinal) {
if (counts == null || owningBucketOrdinal >= counts.maxOrd() || counts.cardinality(owningBucketOrdinal) == 0) {
if (owningBucketOrdinal >= counts.maxOrd() || counts.cardinality(owningBucketOrdinal) == 0) {
return buildEmptyAggregation();
}
// We need to build a copy because the returned Aggregation needs remain usable after
Original file line number Diff line number Diff line change
@@ -86,6 +86,7 @@
import org.elasticsearch.search.aggregations.bucket.nested.InternalNested;
import org.elasticsearch.search.aggregations.bucket.nested.NestedAggregationBuilder;
import org.elasticsearch.search.aggregations.bucket.nested.NestedAggregatorTests;
import org.elasticsearch.search.aggregations.metrics.CardinalityAggregationBuilder;
import org.elasticsearch.search.aggregations.metrics.InternalTopHits;
import org.elasticsearch.search.aggregations.metrics.TopHitsAggregationBuilder;
import org.elasticsearch.search.aggregations.pipeline.BucketScriptPipelineAggregationBuilder;
@@ -114,6 +115,7 @@

import static java.util.Arrays.asList;
import static java.util.Collections.singleton;
import static java.util.stream.Collectors.toList;
import static org.elasticsearch.index.mapper.SeqNoFieldMapper.PRIMARY_TERM_NAME;
import static org.elasticsearch.search.aggregations.AggregationBuilders.terms;
import static org.elasticsearch.search.aggregations.PipelineAggregatorBuilders.bucketScript;
@@ -1418,6 +1420,71 @@ public void testOrderByPipelineAggregation() throws Exception {
}
}

public void testOrderByCardinality() throws IOException {
boolean bIsString = randomBoolean();
TermsAggregationBuilder aggregationBuilder = new TermsAggregationBuilder("a").field("a")
.size(3)
.shardSize(3)
.subAggregation(new CardinalityAggregationBuilder("b").field("b"))
.order(BucketOrder.aggregation("b", false));

/*
* Build documents where larger "a"s obviously have more distinct "b"s
* associated with them. But insert them into Lucene in a random
* order using Lucene's randomizeWriter so we'll bump into situations
* where documents in the last segment change the outcome of the
* cardinality agg. At least, right now the bug has to do with
* documents in the last segment. But randomize so we can catch
* new and strange bugs in the future. Finally, its important that
* we have few enough values that cardinality can be exact.
*/
List<List<IndexableField>> docs = new ArrayList<>();
for (int a = 0; a < 10; a++) {
for (int b = 0; b <= a; b++) {
docs.add(
org.elasticsearch.common.collect.List.of(
new NumericDocValuesField("a", a),
bIsString ? new SortedSetDocValuesField("b", new BytesRef(Integer.toString(b))) : new NumericDocValuesField("b", b)
)
);
}
}
Collections.shuffle(docs, random());
try (Directory directory = newDirectory()) {
RandomIndexWriter iw = new RandomIndexWriter(random(), directory);
for (List<IndexableField> doc : docs) {
iw.addDocument(doc);
}
iw.close();

try (DirectoryReader unwrapped = DirectoryReader.open(directory);
IndexReader indexReader = wrapDirectoryReader(unwrapped)) {
IndexSearcher indexSearcher = newIndexSearcher(indexReader);

LongTerms terms = searchAndReduce(
createIndexSettings(),
indexSearcher,
new MatchAllDocsQuery(),
aggregationBuilder,
Integer.MAX_VALUE,
false,
new NumberFieldMapper.NumberFieldType("a", NumberFieldMapper.NumberType.INTEGER),
bIsString
? new KeywordFieldMapper.KeywordFieldType("b")
: new NumberFieldMapper.NumberFieldType("b", NumberFieldMapper.NumberType.INTEGER)
);
assertThat(
terms.getBuckets().stream().map(MultiBucketsAggregation.Bucket::getKey).collect(toList()),
equalTo(org.elasticsearch.common.collect.List.of(9L, 8L, 7L))
);
assertThat(
terms.getBuckets().stream().map(MultiBucketsAggregation.Bucket::getDocCount).collect(toList()),
equalTo(org.elasticsearch.common.collect.List.of(10L, 9L, 8L))
);
}
}
}

private final SeqNoFieldMapper.SequenceIDFields sequenceIDFields = SeqNoFieldMapper.SequenceIDFields.emptySeqID();
private List<Document> generateDocsWithNested(String id, int value, int[] nestedValues) {
List<Document> documents = new ArrayList<>();
Original file line number Diff line number Diff line change
@@ -442,6 +442,24 @@ protected <A extends InternalAggregation, C extends Aggregator> A searchAndReduc
AggregationBuilder builder,
int maxBucket,
MappedFieldType... fieldTypes) throws IOException {
return searchAndReduce(indexSettings, searcher, query, builder, maxBucket, randomBoolean(), fieldTypes);
}

/**
* Collects all documents that match the provided query {@link Query} and
* returns the reduced {@link InternalAggregation}.
* <p>
* @param splitLeavesIntoSeparateAggregators If true this creates a new {@link Aggregator}
* for each leaf as though it were a separate index. If false this aggregates
* all leaves together, like we do in production.
*/
protected <A extends InternalAggregation, C extends Aggregator> A searchAndReduce(IndexSettings indexSettings,
IndexSearcher searcher,
Query query,
AggregationBuilder builder,
int maxBucket,
boolean splitLeavesIntoSeparateAggregators,
MappedFieldType... fieldTypes) throws IOException {
final IndexReaderContext ctx = searcher.getTopReaderContext();
final PipelineTree pipelines = builder.buildPipelineTree();
List<InternalAggregation> aggs = new ArrayList<>();
@@ -458,7 +476,7 @@ protected <A extends InternalAggregation, C extends Aggregator> A searchAndReduc
);
C root = createAggregator(builder, context);

if (randomBoolean() && searcher.getIndexReader().leaves().size() > 0) {
if (splitLeavesIntoSeparateAggregators && searcher.getIndexReader().leaves().size() > 0) {
assertThat(ctx, instanceOf(CompositeReaderContext.class));
final CompositeReaderContext compCTX = (CompositeReaderContext) ctx;
final int size = compCTX.leaves().size();