Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Remove aggregation's postCollect phase #64016

Merged
merged 7 commits into from
Oct 28, 2020
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 3 additions & 9 deletions docs/reference/search/profile.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -796,9 +796,7 @@ This yields the following aggregation profile output:
"collect": 45786,
"collect_count": 4,
"build_leaf_collector": 18211,
"build_leaf_collector_count": 1,
"post_collection": 929,
"post_collection_count": 1
"build_leaf_collector_count": 1
},
"debug": {
"total_buckets": 1,
Expand All @@ -819,9 +817,7 @@ This yields the following aggregation profile output:
"collect": 69401,
"collect_count": 4,
"build_leaf_collector": 8150,
"build_leaf_collector_count": 1,
"post_collection": 1584,
"post_collection_count": 1
"build_leaf_collector_count": 1
},
"children": [
{
Expand All @@ -838,9 +834,7 @@ This yields the following aggregation profile output:
"collect": 61611,
"collect_count": 4,
"build_leaf_collector": 5564,
"build_leaf_collector_count": 1,
"post_collection": 471,
"post_collection_count": 1
"build_leaf_collector_count": 1
},
"debug": {
"total_buckets": 1,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -113,12 +113,7 @@ public void collect(int docId, long owningBucketOrd) throws IOException {
}

@Override
public void postCollection() throws IOException {
// Delaying until beforeBuildingBuckets
}

@Override
protected void beforeBuildingBuckets(long[] ordsToCollect) throws IOException {
protected void prepareSubAggs(long[] bucketOrdsToCollect) throws IOException {
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I renamed this method to make it a bit more clear what it is for.

IndexReader indexReader = context().searcher().getIndexReader();
for (LeafReaderContext ctx : indexReader.leaves()) {
Scorer childDocsScorer = outFilter.scorer(ctx);
Expand Down Expand Up @@ -160,14 +155,13 @@ public int docID() {
* structure that maps a primitive long to a list of primitive
* longs.
*/
for (long owningBucketOrd: ordsToCollect) {
if (collectionStrategy.exists(owningBucketOrd, globalOrdinal)) {
collectBucket(sub, docId, owningBucketOrd);
for (long o: bucketOrdsToCollect) {
if (collectionStrategy.exists(o, globalOrdinal)) {
collectBucket(sub, docId, o);
}
}
}
}
super.postCollection(); // Run post collection after collecting the sub-aggs
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -232,7 +232,6 @@ setup:
- gt: { profile.shards.0.aggregations.0.breakdown.build_leaf_collector: 0 }
- gt: { profile.shards.0.aggregations.0.breakdown.collect: 0 }
- gt: { profile.shards.0.aggregations.0.breakdown.build_aggregation: 0 }
- gt: { profile.shards.0.aggregations.0.breakdown.post_collection: 0 }
- match: { profile.shards.0.aggregations.0.debug.empty_collectors_used: 0 }
- gt: { profile.shards.0.aggregations.0.debug.numeric_collectors_used: 0 }
- match: { profile.shards.0.aggregations.0.debug.ordinals_collectors_used: 0 }
Expand All @@ -258,7 +257,6 @@ setup:
- gt: { profile.shards.0.aggregations.0.breakdown.build_leaf_collector: 0 }
- gt: { profile.shards.0.aggregations.0.breakdown.collect: 0 }
- gt: { profile.shards.0.aggregations.0.breakdown.build_aggregation: 0 }
- gt: { profile.shards.0.aggregations.0.breakdown.post_collection: 0 }
- match: { profile.shards.0.aggregations.0.debug.empty_collectors_used: 0 }
- gt: { profile.shards.0.aggregations.0.debug.numeric_collectors_used: 0 }
- match: { profile.shards.0.aggregations.0.debug.ordinals_collectors_used: 0 }
Expand All @@ -285,4 +283,3 @@ setup:
- gt: { profile.shards.0.aggregations.0.breakdown.build_leaf_collector: 0 }
- gt: { profile.shards.0.aggregations.0.breakdown.collect: 0 }
- gt: { profile.shards.0.aggregations.0.breakdown.build_aggregation: 0 }
- gt: { profile.shards.0.aggregations.0.breakdown.post_collection: 0 }
Original file line number Diff line number Diff line change
Expand Up @@ -597,8 +597,5 @@ public ScoreMode scoreMode() {

@Override
public void preCollection() throws IOException {}

@Override
public void postCollection() throws IOException {}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -52,21 +52,18 @@
public class AggregationProfilerIT extends ESIntegTestCase {
private static final String BUILD_LEAF_COLLECTOR = AggregationTimingType.BUILD_LEAF_COLLECTOR.toString();
private static final String COLLECT = AggregationTimingType.COLLECT.toString();
private static final String POST_COLLECTION = AggregationTimingType.POST_COLLECTION.toString();
private static final String INITIALIZE = AggregationTimingType.INITIALIZE.toString();
private static final String BUILD_AGGREGATION = AggregationTimingType.BUILD_AGGREGATION.toString();
private static final String REDUCE = AggregationTimingType.REDUCE.toString();
private static final Set<String> BREAKDOWN_KEYS = Set.of(
INITIALIZE,
BUILD_LEAF_COLLECTOR,
COLLECT,
POST_COLLECTION,
BUILD_AGGREGATION,
REDUCE,
INITIALIZE + "_count",
BUILD_LEAF_COLLECTOR + "_count",
COLLECT + "_count",
POST_COLLECTION + "_count",
BUILD_AGGREGATION + "_count",
REDUCE + "_count"
);
Expand Down Expand Up @@ -330,7 +327,6 @@ public void testDiversifiedAggProfile() {
assertThat(diversifyBreakdown.get(INITIALIZE), greaterThan(0L));
assertThat(diversifyBreakdown.get(BUILD_LEAF_COLLECTOR), greaterThan(0L));
assertThat(diversifyBreakdown.get(COLLECT), greaterThan(0L));
assertThat(diversifyBreakdown.get(POST_COLLECTION), greaterThan(0L));
assertThat(diversifyBreakdown.get(BUILD_AGGREGATION), greaterThan(0L));
assertThat(diversifyBreakdown.get(REDUCE), equalTo(0L));
assertThat(diversifyAggResult.getDebugInfo(), equalTo(Map.of(DEFERRED, List.of("max"))));
Expand All @@ -347,7 +343,6 @@ public void testDiversifiedAggProfile() {
assertThat(diversifyBreakdown.get(INITIALIZE), greaterThan(0L));
assertThat(diversifyBreakdown.get(BUILD_LEAF_COLLECTOR), greaterThan(0L));
assertThat(diversifyBreakdown.get(COLLECT), greaterThan(0L));
assertThat(diversifyBreakdown.get(POST_COLLECTION), greaterThan(0L));
assertThat(maxBreakdown.get(BUILD_AGGREGATION), greaterThan(0L));
assertThat(maxBreakdown.get(REDUCE), equalTo(0L));
assertThat(maxAggResult.getDebugInfo(), equalTo(Map.of()));
Expand Down Expand Up @@ -391,7 +386,6 @@ public void testComplexProfile() {
assertThat(histoBreakdown.get(INITIALIZE), greaterThan(0L));
assertThat(histoBreakdown.get(BUILD_LEAF_COLLECTOR), greaterThan(0L));
assertThat(histoBreakdown.get(COLLECT), greaterThan(0L));
assertThat(histoBreakdown.get(POST_COLLECTION), greaterThan(0L));
assertThat(histoBreakdown.get(BUILD_AGGREGATION), greaterThan(0L));
assertThat(histoBreakdown.get(REDUCE), equalTo(0L));
Map<String, Object> histoDebugInfo = histoAggResult.getDebugInfo();
Expand All @@ -413,7 +407,6 @@ public void testComplexProfile() {
assertThat(tagsBreakdown.get(INITIALIZE), greaterThan(0L));
assertThat(tagsBreakdown.get(BUILD_LEAF_COLLECTOR), greaterThan(0L));
assertThat(tagsBreakdown.get(COLLECT), greaterThan(0L));
assertThat(tagsBreakdown.get(POST_COLLECTION), greaterThan(0L));
assertThat(tagsBreakdown.get(BUILD_AGGREGATION), greaterThan(0L));
assertThat(tagsBreakdown.get(REDUCE), equalTo(0L));
assertRemapTermsDebugInfo(tagsAggResult);
Expand All @@ -432,7 +425,6 @@ public void testComplexProfile() {
assertThat(avgBreakdown.get(INITIALIZE), greaterThan(0L));
assertThat(avgBreakdown.get(BUILD_LEAF_COLLECTOR), greaterThan(0L));
assertThat(avgBreakdown.get(COLLECT), greaterThan(0L));
assertThat(avgBreakdown.get(POST_COLLECTION), greaterThan(0L));
assertThat(avgBreakdown.get(BUILD_AGGREGATION), greaterThan(0L));
assertThat(avgBreakdown.get(REDUCE), equalTo(0L));
assertThat(avgAggResult.getDebugInfo(), equalTo(Map.of()));
Expand All @@ -448,7 +440,6 @@ public void testComplexProfile() {
assertThat(maxBreakdown.get(INITIALIZE), greaterThan(0L));
assertThat(maxBreakdown.get(BUILD_LEAF_COLLECTOR), greaterThan(0L));
assertThat(maxBreakdown.get(COLLECT), greaterThan(0L));
assertThat(maxBreakdown.get(POST_COLLECTION), greaterThan(0L));
assertThat(maxBreakdown.get(BUILD_AGGREGATION), greaterThan(0L));
assertThat(maxBreakdown.get(REDUCE), equalTo(0L));
assertThat(maxAggResult.getDebugInfo(), equalTo(Map.of()));
Expand All @@ -464,7 +455,6 @@ public void testComplexProfile() {
assertThat(stringsBreakdown.get(INITIALIZE), greaterThan(0L));
assertThat(stringsBreakdown.get(BUILD_LEAF_COLLECTOR), greaterThan(0L));
assertThat(stringsBreakdown.get(COLLECT), greaterThan(0L));
assertThat(stringsBreakdown.get(POST_COLLECTION), greaterThan(0L));
assertThat(stringsBreakdown.get(BUILD_AGGREGATION), greaterThan(0L));
assertThat(stringsBreakdown.get(REDUCE), equalTo(0L));
assertRemapTermsDebugInfo(stringsAggResult);
Expand All @@ -483,7 +473,6 @@ public void testComplexProfile() {
assertThat(avgBreakdown.get(INITIALIZE), greaterThan(0L));
assertThat(avgBreakdown.get(BUILD_LEAF_COLLECTOR), greaterThan(0L));
assertThat(avgBreakdown.get(COLLECT), greaterThan(0L));
assertThat(avgBreakdown.get(POST_COLLECTION), greaterThan(0L));
assertThat(avgBreakdown.get(BUILD_AGGREGATION), greaterThan(0L));
assertThat(avgBreakdown.get(REDUCE), equalTo(0L));
assertThat(avgAggResult.getDebugInfo(), equalTo(Map.of()));
Expand All @@ -499,7 +488,6 @@ public void testComplexProfile() {
assertThat(maxBreakdown.get(INITIALIZE), greaterThan(0L));
assertThat(maxBreakdown.get(BUILD_LEAF_COLLECTOR), greaterThan(0L));
assertThat(maxBreakdown.get(COLLECT), greaterThan(0L));
assertThat(maxBreakdown.get(POST_COLLECTION), greaterThan(0L));
assertThat(maxBreakdown.get(BUILD_AGGREGATION), greaterThan(0L));
assertThat(maxBreakdown.get(REDUCE), equalTo(0L));
assertThat(maxAggResult.getDebugInfo(), equalTo(Map.of()));
Expand All @@ -516,7 +504,6 @@ public void testComplexProfile() {
assertThat(tagsBreakdown.get(INITIALIZE), greaterThan(0L));
assertThat(tagsBreakdown.get(BUILD_LEAF_COLLECTOR), greaterThan(0L));
assertThat(tagsBreakdown.get(COLLECT), greaterThan(0L));
assertThat(tagsBreakdown.get(POST_COLLECTION), greaterThan(0L));
assertThat(tagsBreakdown.get(BUILD_AGGREGATION), greaterThan(0L));
assertThat(tagsBreakdown.get(REDUCE), equalTo(0L));
assertRemapTermsDebugInfo(tagsAggResult);
Expand All @@ -535,7 +522,6 @@ public void testComplexProfile() {
assertThat(avgBreakdown.get(INITIALIZE), greaterThan(0L));
assertThat(avgBreakdown.get(BUILD_LEAF_COLLECTOR), greaterThan(0L));
assertThat(avgBreakdown.get(COLLECT), greaterThan(0L));
assertThat(avgBreakdown.get(POST_COLLECTION), greaterThan(0L));
assertThat(avgBreakdown.get(BUILD_AGGREGATION), greaterThan(0L));
assertThat(avgBreakdown.get(REDUCE), equalTo(0L));
assertThat(avgAggResult.getDebugInfo(), equalTo(Map.of()));
Expand All @@ -551,7 +537,6 @@ public void testComplexProfile() {
assertThat(maxBreakdown.get(INITIALIZE), greaterThan(0L));
assertThat(maxBreakdown.get(BUILD_LEAF_COLLECTOR), greaterThan(0L));
assertThat(maxBreakdown.get(COLLECT), greaterThan(0L));
assertThat(maxBreakdown.get(POST_COLLECTION), greaterThan(0L));
assertThat(maxBreakdown.get(BUILD_AGGREGATION), greaterThan(0L));
assertThat(maxBreakdown.get(REDUCE), equalTo(0L));
assertThat(maxAggResult.getDebugInfo(), equalTo(Map.of()));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,6 @@ public void execute(SearchContext context) {
context.aggregations().resetBucketMultiConsumer();
for (Aggregator aggregator : context.aggregations().aggregators()) {
try {
aggregator.postCollection();
aggregations.add(aggregator.buildTopLevel());
} catch (IOException e) {
throw new AggregationExecutionException("Failed to build aggregation [" + aggregator.name() + "]", e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -142,12 +142,12 @@ public interface BucketComparator {

/**
* Build the results of this aggregation.
* @param owningBucketOrds the ordinals of the buckets that we want to
* @param ordsToCollect the ordinals of the buckets that we want to
* collect from this aggregation
* @return the results for each ordinal, in the same order as the array
* of ordinals
*/
public abstract InternalAggregation[] buildAggregations(long[] owningBucketOrds) throws IOException;
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this is a more clear variable name. I made this change in a dead end when I was working on this change locally but I kind of like it.

public abstract InternalAggregation[] buildAggregations(long[] ordsToCollect) throws IOException;

/**
* Build the result of this aggregation if it is at the "top level"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -95,10 +95,6 @@ public void preCollection() throws IOException {
badState();
}

@Override
public void postCollection() throws IOException {
badState();
}
@Override
public ScoreMode scoreMode() {
badState();
Expand Down Expand Up @@ -248,19 +244,6 @@ public SearchContext context() {
return context;
}

/**
* Called after collection of all document is done.
* <p>
* Warning: this is not final only to allow the parent join aggregator
* to delay this until building buckets.
*/
@Override
public void postCollection() throws IOException {
// post-collect this agg before subs to make it possible to buffer and then replay in postCollection()
doPostCollection();
collectableSubAggregators.postCollection();
}

/** Called upon release of the aggregator. */
@Override
public void close() {
Expand All @@ -274,12 +257,6 @@ public void close() {
/** Release instance-specific data. */
protected void doClose() {}

/**
* Can be overridden by aggregator implementation to be called back when the collection phase ends.
*/
protected void doPostCollection() throws IOException {
}

protected final InternalAggregations buildEmptySubAggregations() {
List<InternalAggregation> aggs = new ArrayList<>();
for (Aggregator aggregator : subAggregators) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,10 +42,6 @@ public void preCollection() throws IOException {
// no-op
}
@Override
public void postCollection() throws IOException {
// no-op
}
@Override
public ScoreMode scoreMode() {
return ScoreMode.COMPLETE_NO_SCORES;
}
Expand All @@ -58,10 +54,4 @@ public ScoreMode scoreMode() {
* Pre collection callback.
*/
public abstract void preCollection() throws IOException;

/**
* Post-collection callback.
*/
public abstract void postCollection() throws IOException;

}
Original file line number Diff line number Diff line change
Expand Up @@ -126,13 +126,6 @@ public void preCollection() throws IOException {
}
}

@Override
public void postCollection() throws IOException {
for (BucketCollector collector : collectors) {
collector.postCollection();
}
}

@Override
public String toString() {
return Arrays.toString(collectors);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,6 @@ static class Entry {
protected PackedLongValues.Builder docDeltasBuilder;
protected PackedLongValues.Builder bucketsBuilder;
protected long maxBucket = -1;
protected boolean finished = false;
protected LongHash selectedBuckets;

/**
Expand Down Expand Up @@ -136,20 +135,12 @@ public void preCollection() throws IOException {
collector.preCollection();
}

@Override
public void postCollection() throws IOException {
finishLeaf();
finished = true;
}

/**
* Replay the wrapped collector, but only on a selection of buckets.
*/
@Override
public void prepareSelectedBuckets(long... selectedBuckets) throws IOException {
if (finished == false) {
throw new IllegalStateException("Cannot replay yet, collection is not finished: postCollect() has not been called");
}
finishLeaf();
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm pretty happy about how this turned out! No more finished to check on. It's just all built in now.

if (this.selectedBuckets != null) {
throw new IllegalStateException("Already been replayed");
}
Expand Down Expand Up @@ -201,7 +192,6 @@ public void prepareSelectedBuckets(long... selectedBuckets) throws IOException {
// continue with the following leaf
}
}
collector.postCollection();
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -168,9 +168,9 @@ public final int bucketDocCount(long bucketOrd) {
}

/**
* Hook to allow taking an action before building buckets.
* Hook to allow taking an action before building the sub agg results.
*/
protected void beforeBuildingBuckets(long[] ordsToCollect) throws IOException {}
protected void prepareSubAggs(long[] bucketOrdsToCollect) throws IOException {}

/**
* Build the results of the sub-aggregations of the buckets at each of
Expand All @@ -186,7 +186,7 @@ protected void beforeBuildingBuckets(long[] ordsToCollect) throws IOException {}
* array of ordinals
*/
protected final InternalAggregations[] buildSubAggsForBuckets(long[] bucketOrdsToCollect) throws IOException {
beforeBuildingBuckets(bucketOrdsToCollect);
prepareSubAggs(bucketOrdsToCollect);
InternalAggregation[][] aggregations = new InternalAggregation[subAggregators.length][];
for (int i = 0; i < subAggregators.length; i++) {
aggregations[i] = subAggregators[i].buildAggregations(bucketOrdsToCollect);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,8 @@

import org.elasticsearch.search.aggregations.Aggregator;
import org.elasticsearch.search.aggregations.AggregatorFactories;
import org.elasticsearch.search.aggregations.CardinalityUpperBound;
import org.elasticsearch.search.aggregations.BucketCollector;
import org.elasticsearch.search.aggregations.CardinalityUpperBound;
import org.elasticsearch.search.aggregations.MultiBucketCollector;
import org.elasticsearch.search.internal.SearchContext;

Expand Down Expand Up @@ -91,9 +91,9 @@ protected boolean shouldDefer(Aggregator aggregator) {
}

@Override
protected void beforeBuildingBuckets(long[] ordsToCollect) throws IOException {
protected final void prepareSubAggs(long[] bucketOrdsToCollect) throws IOException {
if (recordingWrapper != null) {
recordingWrapper.prepareSelectedBuckets(ordsToCollect);
recordingWrapper.prepareSelectedBuckets(bucketOrdsToCollect);
}
}

Expand Down
Loading