Fixup rare terms (#64366)
In #64016 I caused a bug in rare_terms where we would rewrite the
current leaf and remove all hits from it, causing us to trip an
assertion. This didn't happen before because previously we never rewrote
the current leaf. The fix involves cleaning up the state that the
deferring collector uses if we end up removing all hits.

This requires some super intimate knowledge of how
`BestBucketsDeferringCollector` works so I decided to move the
implementation of that merging from `MergingBucketsDeferringCollector`
into `BestBucketsDeferringCollector`. This means that
`MergingBucketsDeferringCollector` is pretty much empty now. So I've
deprecated it and will remove it entirely in a subsequent change.

Closes #64356
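
For readers who want to see the moving parts, here is a standalone sketch of the kind of ordinal-rewrite map an aggregator hands to the deferring collector. It is not code from this commit and every name in it is invented; it only illustrates the convention that mapping an ordinal to -1 removes the bucket. When every doc collected in the current leaf maps to -1, the collector now clears its per-leaf builders instead of tripping the assertion described above.

    import java.util.function.LongUnaryOperator;

    // Hypothetical helper: keep only buckets with at least minDocCount docs,
    // renumber the survivors densely, and map pruned buckets to -1.
    class RewriteMapSketch {
        static LongUnaryOperator pruneBelow(long[] docCountPerOrdinal, long minDocCount) {
            long[] target = new long[docCountPerOrdinal.length];
            long next = 0;
            for (int ord = 0; ord < docCountPerOrdinal.length; ord++) {
                target[ord] = docCountPerOrdinal[ord] >= minDocCount ? next++ : -1;
            }
            return bucket -> target[Math.toIntExact(bucket)];
        }

        public static void main(String[] args) {
            LongUnaryOperator map = pruneBelow(new long[] {5, 1, 3}, 2);
            // ordinal 0 -> 0, ordinal 1 -> -1 (pruned), ordinal 2 -> 1
            System.out.println(map.applyAsLong(0) + " " + map.applyAsLong(1) + " " + map.applyAsLong(2));
        }
    }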
nik9000 authored Oct 29, 2020
1 parent 2c58841 commit b4b01ec
Showing 17 changed files with 282 additions and 304 deletions.
@@ -42,6 +42,7 @@
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
import java.util.function.LongUnaryOperator;

/**
* A specialization of {@link DeferringBucketCollector} that collects all
@@ -62,16 +63,16 @@ static class Entry {
}
}

protected List<Entry> entries = new ArrayList<>();
protected BucketCollector collector;
private final Query topLevelQuery;
private final IndexSearcher searcher;
protected final boolean isGlobal;
protected LeafReaderContext context;
protected PackedLongValues.Builder docDeltasBuilder;
protected PackedLongValues.Builder bucketsBuilder;
protected long maxBucket = -1;
protected LongHash selectedBuckets;
private final boolean isGlobal;

private List<Entry> entries = new ArrayList<>();
private BucketCollector collector;
private LeafReaderContext context;
private PackedLongValues.Builder docDeltasBuilder;
private PackedLongValues.Builder bucketsBuilder;
private LongHash selectedBuckets;

/**
* Sole constructor.
@@ -97,21 +98,30 @@ public void setDeferredCollector(Iterable<BucketCollector> deferredCollectors) {
this.collector = MultiBucketCollector.wrap(deferredCollectors);
}

/**
* Button up the builders for the current leaf.
*/
private void finishLeaf() {
if (context != null) {
assert docDeltasBuilder != null && bucketsBuilder != null;
assert docDeltasBuilder.size() > 0;
entries.add(new Entry(context, docDeltasBuilder.build(), bucketsBuilder.build()));
clearLeaf();
}
}

@Override
public LeafBucketCollector getLeafCollector(LeafReaderContext ctx) throws IOException {
finishLeaf();

/**
* Clear the status for the current leaf.
*/
private void clearLeaf() {
context = null;
// allocates the builder lazily in case this segment doesn't contain any match
docDeltasBuilder = null;
bucketsBuilder = null;
}

@Override
public LeafBucketCollector getLeafCollector(LeafReaderContext ctx) throws IOException {
finishLeaf();

return new LeafBucketCollector() {
int lastDoc = 0;
@@ -126,7 +136,6 @@ public void collect(int doc, long bucket) throws IOException {
docDeltasBuilder.add(doc - lastDoc);
bucketsBuilder.add(bucket);
lastDoc = doc;
maxBucket = Math.max(maxBucket, bucket);
}
};
}
@@ -201,7 +210,6 @@ public void prepareSelectedBuckets(long... selectedBuckets) throws IOException {
*/
@Override
public Aggregator wrap(final Aggregator in) {

return new WrappedAggregator(in) {
@Override
public InternalAggregation[] buildAggregations(long[] owningBucketOrds) throws IOException {
@@ -220,4 +228,87 @@ public InternalAggregation[] buildAggregations(long[] owningBucketOrds) throws I
};
}

/**
* Merge or prune the selected buckets.
* <p>
* This process rebuilds some packed structures and is O(number_of_collected_docs) so
* do your best to skip calling it unless you need it.
*
* @param howToRewrite a unary operator which maps a bucket's ordinal to the ordinal it has
* after this process. If a bucket's ordinal is mapped to -1 then the bucket is removed entirely.
*/
public void rewriteBuckets(LongUnaryOperator howToRewrite) {
List<Entry> newEntries = new ArrayList<>(entries.size());
for (Entry sourceEntry : entries) {
PackedLongValues.Builder newBuckets = PackedLongValues.packedBuilder(PackedInts.DEFAULT);
PackedLongValues.Builder newDocDeltas = PackedLongValues.packedBuilder(PackedInts.DEFAULT);
PackedLongValues.Iterator docDeltasItr = sourceEntry.docDeltas.iterator();

long lastGoodDelta = 0;
for (PackedLongValues.Iterator itr = sourceEntry.buckets.iterator(); itr.hasNext();) {
long bucket = itr.next();
assert docDeltasItr.hasNext();
long delta = docDeltasItr.next();

// Only merge in the ordinal if it hasn't been "removed", signified with -1
long ordinal = howToRewrite.applyAsLong(bucket);

if (ordinal != -1) {
newBuckets.add(ordinal);
newDocDeltas.add(delta + lastGoodDelta);
lastGoodDelta = 0;
} else {
// we are skipping this ordinal, which means we need to accumulate the
// doc deltas since the last "good" delta
lastGoodDelta += delta;
}
}
// Only create an entry if this segment has buckets after merging
if (newBuckets.size() > 0) {
assert newDocDeltas.size() > 0 : "docDeltas was empty but we had buckets";
newEntries.add(new Entry(sourceEntry.context, newDocDeltas.build(), newBuckets.build()));
}
}
entries = newEntries;

// if there are buckets that have been collected in the current segment
// we need to update the bucket ordinals there too
if (bucketsBuilder != null && bucketsBuilder.size() > 0) {
PackedLongValues currentBuckets = bucketsBuilder.build();
PackedLongValues.Builder newBuckets = PackedLongValues.packedBuilder(PackedInts.DEFAULT);
PackedLongValues.Builder newDocDeltas = PackedLongValues.packedBuilder(PackedInts.DEFAULT);

// The current segment's deltas aren't built yet, so build to a temp object
PackedLongValues currentDeltas = docDeltasBuilder.build();
PackedLongValues.Iterator docDeltasItr = currentDeltas.iterator();

long lastGoodDelta = 0;
for (PackedLongValues.Iterator itr = currentBuckets.iterator(); itr.hasNext();) {
long bucket = itr.next();
assert docDeltasItr.hasNext();
long delta = docDeltasItr.next();
long ordinal = howToRewrite.applyAsLong(bucket);

// Only merge in the ordinal if it hasn't been "removed", signified with -1
if (ordinal != -1) {
newBuckets.add(ordinal);
newDocDeltas.add(delta + lastGoodDelta);
lastGoodDelta = 0;
} else {
// we are skipping this ordinal, which means we need to accumulate the
// doc deltas since the last "good" delta.
// The first is skipped because the original deltas are stored as offsets from first doc,
// not offsets from 0
lastGoodDelta += delta;
}
}
if (newDocDeltas.size() == 0) {
// We've decided not to keep *anything* in the current leaf so we should just pitch our state.
clearLeaf();
} else {
docDeltasBuilder = newDocDeltas;
bucketsBuilder = newBuckets;
}
}
}
}
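
To make the delta bookkeeping in rewriteBuckets concrete, here is a standalone sketch of the same loop with plain arrays and lists in place of PackedLongValues; the input values are invented. Doc IDs are stored as deltas from the previously collected doc, so the delta of a pruned entry has to be folded into the next entry that survives.

    import java.util.ArrayList;
    import java.util.List;
    import java.util.function.LongUnaryOperator;

    class DeltaRewriteSketch {
        public static void main(String[] args) {
            // Invented input: docs 5, 8 and 10 were collected into buckets 0, 1 and 0.
            long[] buckets   = {0, 1, 0};
            long[] docDeltas = {5, 3, 2};
            LongUnaryOperator howToRewrite = b -> b == 1 ? -1 : b;   // prune bucket 1

            List<Long> newBuckets = new ArrayList<>();
            List<Long> newDeltas = new ArrayList<>();
            long lastGoodDelta = 0;
            for (int i = 0; i < buckets.length; i++) {
                long ordinal = howToRewrite.applyAsLong(buckets[i]);
                if (ordinal != -1) {
                    newBuckets.add(ordinal);
                    newDeltas.add(docDeltas[i] + lastGoodDelta);   // fold skipped deltas forward
                    lastGoodDelta = 0;
                } else {
                    lastGoodDelta += docDeltas[i];                 // doc 8 is dropped
                }
            }
            // Prints [0, 0] [5, 5]: the surviving docs are still 5 and 10.
            System.out.println(newBuckets + " " + newDeltas);
        }
    }

If howToRewrite mapped every bucket to -1, both lists would stay empty; that is the case the clearLeaf() call at the end of rewriteBuckets handles for the current, not-yet-finished segment.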
@@ -107,22 +107,22 @@ public final void collectExistingBucket(LeafBucketCollector subCollector, int do
*
* Refer to that method for documentation about the merge map.
*
* @deprecated use {@link mergeBuckets(long, LongUnaryOperator)}
* @deprecated use {@link rewriteBuckets(long, LongUnaryOperator)}
*/
@Deprecated
public final void mergeBuckets(long[] mergeMap, long newNumBuckets) {
mergeBuckets(newNumBuckets, bucket -> mergeMap[Math.toIntExact(bucket)]);
rewriteBuckets(newNumBuckets, bucket -> mergeMap[Math.toIntExact(bucket)]);
}

/**
*
* @param mergeMap a unary operator which maps a bucket's ordinal to the ordinal it should be merged with.
* If a bucket's ordinal is mapped to -1 then the bucket is removed entirely.
*
* This only tidies up doc counts. Call {@link MergingBucketsDeferringCollector#mergeBuckets(LongUnaryOperator)} to
* This only tidies up doc counts. Call {@link BestBucketsDeferringCollector#rewriteBuckets(LongUnaryOperator)} to
* merge the actual ordinals and doc ID deltas.
*/
public final void mergeBuckets(long newNumBuckets, LongUnaryOperator mergeMap){
public final void rewriteBuckets(long newNumBuckets, LongUnaryOperator mergeMap){
try (IntArray oldDocCounts = docCounts) {
docCounts = bigArrays.newIntArray(newNumBuckets, true);
docCounts.fill(0, newNumBuckets, 0);
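
As a rough standalone illustration of what this rewriteBuckets overload does to the per-bucket doc counts (a plain int[] stands in for the BigArrays-backed IntArray, and the map and counts are invented): counts of old ordinals that map to the same new ordinal are summed, and ordinals mapped to -1 are dropped.

    import java.util.Arrays;
    import java.util.function.LongUnaryOperator;

    class DocCountRewriteSketch {
        public static void main(String[] args) {
            int[] docCounts = {7, 1, 4, 2};   // counts for old ordinals 0..3
            // Invented map: merge ordinal 1 into new ordinal 0, keep 2 as new ordinal 1, drop 3.
            LongUnaryOperator mergeMap = b -> b == 3 ? -1 : (b == 2 ? 1 : 0);
            long newNumBuckets = 2;

            int[] newDocCounts = new int[(int) newNumBuckets];
            for (int oldOrd = 0; oldOrd < docCounts.length; oldOrd++) {
                long newOrd = mergeMap.applyAsLong(oldOrd);
                if (newOrd != -1) {
                    newDocCounts[(int) newOrd] += docCounts[oldOrd];
                }
            }
            System.out.println(Arrays.toString(newDocCounts));   // [8, 4]
        }
    }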
@@ -37,7 +37,7 @@ public abstract class DeferableBucketAggregator extends BucketsAggregator {
* Wrapper that records collections. Non-null if any aggregations have
* been deferred.
*/
private DeferringBucketCollector recordingWrapper;
private DeferringBucketCollector deferringCollector;
private List<String> deferredAggregationNames;

protected DeferableBucketAggregator(String name, AggregatorFactories factories, SearchContext context, Aggregator parent,
@@ -52,28 +52,38 @@ protected void doPreCollection() throws IOException {
List<BucketCollector> deferredAggregations = null;
for (int i = 0; i < subAggregators.length; ++i) {
if (shouldDefer(subAggregators[i])) {
if (recordingWrapper == null) {
recordingWrapper = getDeferringCollector();
if (deferringCollector == null) {
deferringCollector = buildDeferringCollector();
deferredAggregations = new ArrayList<>(subAggregators.length);
deferredAggregationNames = new ArrayList<>(subAggregators.length);
}
deferredAggregations.add(subAggregators[i]);
deferredAggregationNames.add(subAggregators[i].name());
subAggregators[i] = recordingWrapper.wrap(subAggregators[i]);
subAggregators[i] = deferringCollector.wrap(subAggregators[i]);
} else {
collectors.add(subAggregators[i]);
}
}
if (recordingWrapper != null) {
recordingWrapper.setDeferredCollector(deferredAggregations);
collectors.add(recordingWrapper);
if (deferringCollector != null) {
deferringCollector.setDeferredCollector(deferredAggregations);
collectors.add(deferringCollector);
}
collectableSubAggregators = MultiBucketCollector.wrap(collectors);
}

public DeferringBucketCollector getDeferringCollector() {
// Default impl is a collector that selects the best buckets
// but an alternative defer policy may be based on best docs.
/**
* Get the deferring collector.
*/
protected DeferringBucketCollector deferringCollector() {
return deferringCollector;
}

/**
* Build the {@link DeferringBucketCollector}. The default implementation
* replays all hits against the buckets selected by
* {@link DeferringBucketCollector#prepareSelectedBuckets(long...)}.
*/
protected DeferringBucketCollector buildDeferringCollector() {
return new BestBucketsDeferringCollector(topLevelQuery(), searcher(), descendsFromGlobalAggregator(parent()));
}

@@ -92,8 +102,8 @@ protected boolean shouldDefer(Aggregator aggregator) {

@Override
protected final void prepareSubAggs(long[] bucketOrdsToCollect) throws IOException {
if (recordingWrapper != null) {
recordingWrapper.prepareSelectedBuckets(bucketOrdsToCollect);
if (deferringCollector != null) {
deferringCollector.prepareSelectedBuckets(bucketOrdsToCollect);
}
}

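
The renaming above splits two concerns: buildDeferringCollector() is the overridable factory and deferringCollector() is how subclasses read back whatever was built during doPreCollection(). A standalone mini-sketch of that pattern, with hypothetical names and types in place of the Elasticsearch classes:

    // Hypothetical illustration of the accessor/factory split; not Elasticsearch code.
    class DeferralSketch {
        static class Collector {
            final String kind;
            Collector(String kind) { this.kind = kind; }
        }

        static class BaseAggregator {
            private Collector deferringCollector;

            protected final Collector deferringCollector() {       // accessor, like deferringCollector()
                return deferringCollector;
            }

            protected Collector buildDeferringCollector() {        // overridable factory
                return new Collector("best-buckets");
            }

            final void preCollection() {
                if (deferringCollector == null) {
                    deferringCollector = buildDeferringCollector();
                }
            }
        }

        static class MergingAggregator extends BaseAggregator {
            @Override
            protected Collector buildDeferringCollector() {
                return new Collector("merging");                   // like the now-deprecated merging collector
            }
        }

        public static void main(String[] args) {
            BaseAggregator agg = new MergingAggregator();
            agg.preCollection();
            System.out.println(agg.deferringCollector().kind);     // merging
        }
    }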
@@ -21,19 +21,17 @@

import org.apache.lucene.search.IndexSearcher;
import org.apache.lucene.search.Query;
import org.apache.lucene.util.packed.PackedInts;
import org.apache.lucene.util.packed.PackedLongValues;

import java.util.ArrayList;
import java.util.List;
import java.util.function.LongUnaryOperator;

/**
* A specialization of {@link BestBucketsDeferringCollector} that collects all
* matches and then is able to replay a given subset of buckets. Exposes
* mergeBuckets, which can be invoked by the aggregator when increasing the
* rounding interval.
* @deprecated Use {@link BestBucketsDeferringCollector}
*/
@Deprecated
public class MergingBucketsDeferringCollector extends BestBucketsDeferringCollector {
public MergingBucketsDeferringCollector(Query topLevelQuery, IndexSearcher searcher, boolean isGlobal) {
super(topLevelQuery, searcher, isGlobal);
@@ -54,89 +52,10 @@ public MergingBucketsDeferringCollector(Query topLevelQuery, IndexSearcher searc
* This process rebuilds the ordinals and docDeltas according to the mergeMap, so it should
* not be called unless there are actually changes to be made, to avoid unnecessary work.
*
* @deprecated use {@link mergeBuckets(LongUnaryOperator)}
* @deprecated use {@link BestBucketsDeferringCollector#rewriteBuckets(LongUnaryOperator)}
*/
@Deprecated
public void mergeBuckets(long[] mergeMap) {
mergeBuckets(bucket -> mergeMap[Math.toIntExact(bucket)]);
}

/**
* Merges/prunes the existing bucket ordinals and docDeltas according to the provided mergeMap.
*
* @param mergeMap a unary operator which maps a bucket's ordinal to the ordinal it should be merged with.
* If a bucket's ordinal is mapped to -1 then the bucket is removed entirely.
*
* This process rebuilds the ordinals and docDeltas according to the mergeMap, so it should
* not be called unless there are actually changes to be made, to avoid unnecessary work.
*/
public void mergeBuckets(LongUnaryOperator mergeMap){
List<Entry> newEntries = new ArrayList<>(entries.size());
for (Entry sourceEntry : entries) {
PackedLongValues.Builder newBuckets = PackedLongValues.packedBuilder(PackedInts.DEFAULT);
PackedLongValues.Builder newDocDeltas = PackedLongValues.packedBuilder(PackedInts.DEFAULT);
PackedLongValues.Iterator docDeltasItr = sourceEntry.docDeltas.iterator();

long lastGoodDelta = 0;
for (PackedLongValues.Iterator itr = sourceEntry.buckets.iterator(); itr.hasNext();) {
long bucket = itr.next();
assert docDeltasItr.hasNext();
long delta = docDeltasItr.next();

// Only merge in the ordinal if it hasn't been "removed", signified with -1
long ordinal = mergeMap.applyAsLong(bucket);

if (ordinal != -1) {
newBuckets.add(ordinal);
newDocDeltas.add(delta + lastGoodDelta);
lastGoodDelta = 0;
} else {
// we are skipping this ordinal, which means we need to accumulate the
// doc delta's since the last "good" delta
lastGoodDelta += delta;
}
}
// Only create an entry if this segment has buckets after merging
if (newBuckets.size() > 0) {
assert newDocDeltas.size() > 0 : "docDeltas was empty but we had buckets";
newEntries.add(new Entry(sourceEntry.context, newDocDeltas.build(), newBuckets.build()));
}
}
entries = newEntries;

// if there are buckets that have been collected in the current segment
// we need to update the bucket ordinals there too
if (bucketsBuilder != null && bucketsBuilder.size() > 0) {
PackedLongValues currentBuckets = bucketsBuilder.build();
PackedLongValues.Builder newBuckets = PackedLongValues.packedBuilder(PackedInts.DEFAULT);
PackedLongValues.Builder newDocDeltas = PackedLongValues.packedBuilder(PackedInts.DEFAULT);

// The current segment's deltas aren't built yet, so build to a temp object
PackedLongValues currentDeltas = docDeltasBuilder.build();
PackedLongValues.Iterator docDeltasItr = currentDeltas.iterator();

long lastGoodDelta = 0;
for (PackedLongValues.Iterator itr = currentBuckets.iterator(); itr.hasNext();) {
long bucket = itr.next();
assert docDeltasItr.hasNext();
long delta = docDeltasItr.next();
long ordinal = mergeMap.applyAsLong(bucket);

// Only merge in the ordinal if it hasn't been "removed", signified with -1
if (ordinal != -1) {
newBuckets.add(ordinal);
newDocDeltas.add(delta + lastGoodDelta);
lastGoodDelta = 0;
} else {
// we are skipping this ordinal, which means we need to accumulate the
// doc delta's since the last "good" delta.
// The first is skipped because the original deltas are stored as offsets from first doc,
// not offsets from 0
lastGoodDelta += delta;
}
}
docDeltasBuilder = newDocDeltas;
bucketsBuilder = newBuckets;
}
rewriteBuckets(bucket -> mergeMap[Math.toIntExact(bucket)]);
}
}
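
A minimal standalone sketch of the migration the deprecation points at: an old-style long[] merge map becomes the LongUnaryOperator that rewriteBuckets now takes, which is exactly what the deprecated mergeBuckets(long[]) delegation above does. The values are invented.

    import java.util.function.LongUnaryOperator;

    class MergeMapMigrationSketch {
        public static void main(String[] args) {
            long[] mergeMap = {0, -1, 1};   // old ordinal -> new ordinal, -1 removes the bucket
            LongUnaryOperator howToRewrite = bucket -> mergeMap[Math.toIntExact(bucket)];
            // collector.mergeBuckets(mergeMap) becomes collector.rewriteBuckets(howToRewrite)
            System.out.println(howToRewrite.applyAsLong(2));   // 1
        }
    }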
@@ -144,7 +144,7 @@ protected final boolean shouldDefer(Aggregator aggregator) {
}

@Override
public final DeferringBucketCollector getDeferringCollector() {
public final DeferringBucketCollector buildDeferringCollector() {
deferringCollector = new MergingBucketsDeferringCollector(topLevelQuery(), searcher(), descendsFromGlobalAggregator(parent()));
return deferringCollector;
}