Skip to content

Commit

Permalink
Refactor the filter rewrite optimization (#14464)
Browse files Browse the repository at this point in the history
* Refactor

Split the single Helper classes and move the classes into a new package for any optimization we introduced for search path.
Rename the class name to make it more straightforward and general

Signed-off-by: bowenlan-amzn <[email protected]>

* Refactor

refactor the canOptimize logic
sort out the basic rule about how to provide data from aggregator, and where to put common logic

Signed-off-by: bowenlan-amzn <[email protected]>

* Refactor

refactor the data provider and try optimize logic

Signed-off-by: bowenlan-amzn <[email protected]>

* Refactor

Signed-off-by: bowenlan-amzn <[email protected]>

* Refactor

extract segment match all logic

Signed-off-by: bowenlan-amzn <[email protected]>

* Refactor

Signed-off-by: bowenlan-amzn <[email protected]>

* Refactor

inline class

Signed-off-by: bowenlan-amzn <[email protected]>

* Fix a bug

Signed-off-by: bowenlan-amzn <[email protected]>

* address comment

Signed-off-by: bowenlan-amzn <[email protected]>

* prepareFromSegment now doesn't return Ranges

Signed-off-by: bowenlan-amzn <[email protected]>

* how it looks like when introduce interfaces

Signed-off-by: bowenlan-amzn <[email protected]>

* remove interface, clean up

Signed-off-by: bowenlan-amzn <[email protected]>

* improve doc

Signed-off-by: bowenlan-amzn <[email protected]>

* move multirangetraversal logic to helper

Signed-off-by: bowenlan-amzn <[email protected]>

* improve the refactor

package name -> filterrewrite
move tree traversal logic to new class
add documentation for important abstract methods
add sub class for composite aggregation bridge

Signed-off-by: bowenlan-amzn <[email protected]>

* Address Marc's comments

Signed-off-by: bowenlan-amzn <[email protected]>

* Address concurrent segment search concern

To save the ranges per segment, now change to a map that save ranges for segments separately.

The increment document function "incrementBucketDocCount" should already be thread safe, as it's the same method used by normal aggregation execution path

Signed-off-by: bowenlan-amzn <[email protected]>

* remove circular dependency

Signed-off-by: bowenlan-amzn <[email protected]>

* Address comment

- remove map of segment ranges, pass in by calling getRanges when needed
- use AtomicInteger for the debug info

Signed-off-by: bowenlan-amzn <[email protected]>

---------

Signed-off-by: bowenlan-amzn <[email protected]>
(cherry picked from commit 170ea27)
Signed-off-by: github-actions[bot] <github-actions[bot]@users.noreply.github.com>
  • Loading branch information
github-actions[bot] committed Aug 9, 2024
1 parent 6fbd079 commit 6c301f2
Show file tree
Hide file tree
Showing 14 changed files with 1,256 additions and 1,027 deletions.

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -73,8 +73,8 @@
import org.opensearch.search.aggregations.MultiBucketCollector;
import org.opensearch.search.aggregations.MultiBucketConsumerService;
import org.opensearch.search.aggregations.bucket.BucketsAggregator;
import org.opensearch.search.aggregations.bucket.FastFilterRewriteHelper;
import org.opensearch.search.aggregations.bucket.FastFilterRewriteHelper.AbstractDateHistogramAggregationType;
import org.opensearch.search.aggregations.bucket.filterrewrite.CompositeAggregatorBridge;
import org.opensearch.search.aggregations.bucket.filterrewrite.FilterRewriteOptimizationContext;
import org.opensearch.search.aggregations.bucket.missing.MissingOrder;
import org.opensearch.search.aggregations.bucket.terms.LongKeyedBucketOrds;
import org.opensearch.search.internal.SearchContext;
Expand All @@ -89,13 +89,15 @@
import java.util.List;
import java.util.Map;
import java.util.function.BiConsumer;
import java.util.function.Function;
import java.util.function.LongUnaryOperator;
import java.util.stream.Collectors;

import static org.opensearch.search.aggregations.MultiBucketConsumerService.MAX_BUCKET_SETTING;
import static org.opensearch.search.aggregations.bucket.filterrewrite.DateHistogramAggregatorBridge.segmentMatchAll;

/**
* Main aggregator that aggregates docs from mulitple aggregations
* Main aggregator that aggregates docs from multiple aggregations
*
* @opensearch.internal
*/
Expand All @@ -118,9 +120,8 @@ public final class CompositeAggregator extends BucketsAggregator {

private boolean earlyTerminated;

private final FastFilterRewriteHelper.FastFilterContext fastFilterContext;
private LongKeyedBucketOrds bucketOrds = null;
private Rounding.Prepared preparedRounding = null;
private final FilterRewriteOptimizationContext filterRewriteOptimizationContext;
private LongKeyedBucketOrds bucketOrds;

CompositeAggregator(
String name,
Expand Down Expand Up @@ -166,57 +167,62 @@ public final class CompositeAggregator extends BucketsAggregator {
this.queue = new CompositeValuesCollectorQueue(context.bigArrays(), sources, size, rawAfterKey);
this.rawAfterKey = rawAfterKey;

fastFilterContext = new FastFilterRewriteHelper.FastFilterContext(context);
if (!FastFilterRewriteHelper.isCompositeAggRewriteable(sourceConfigs)) {
return;
}
fastFilterContext.setAggregationType(new CompositeAggregationType());
if (fastFilterContext.isRewriteable(parent, subAggregators.length)) {
// bucketOrds is used for saving date histogram results
bucketOrds = LongKeyedBucketOrds.build(context.bigArrays(), CardinalityUpperBound.ONE);
preparedRounding = ((CompositeAggregationType) fastFilterContext.getAggregationType()).getRoundingPrepared();
fastFilterContext.buildRanges(sourceConfigs[0].fieldType());
}
}
CompositeAggregatorBridge bridge = new CompositeAggregatorBridge() {
private RoundingValuesSource valuesSource;
private long afterKey = -1L;

/**
* Currently the filter rewrite is only supported for date histograms
*/
public class CompositeAggregationType extends AbstractDateHistogramAggregationType {
private final RoundingValuesSource valuesSource;
private long afterKey = -1L;

public CompositeAggregationType() {
super(sourceConfigs[0].fieldType(), sourceConfigs[0].missingBucket(), sourceConfigs[0].hasScript());
this.valuesSource = (RoundingValuesSource) sourceConfigs[0].valuesSource();
if (rawAfterKey != null) {
assert rawAfterKey.size() == 1 && formats.size() == 1;
this.afterKey = formats.get(0).parseLong(rawAfterKey.get(0).toString(), false, () -> {
throw new IllegalArgumentException("now() is not supported in [after] key");
});
@Override
protected boolean canOptimize() {
if (canOptimize(sourceConfigs)) {
this.valuesSource = (RoundingValuesSource) sourceConfigs[0].valuesSource();
if (rawAfterKey != null) {
assert rawAfterKey.size() == 1 && formats.size() == 1;
this.afterKey = formats.get(0).parseLong(rawAfterKey.get(0).toString(), false, () -> {
throw new IllegalArgumentException("now() is not supported in [after] key");

Check warning on line 181 in server/src/main/java/org/opensearch/search/aggregations/bucket/composite/CompositeAggregator.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/search/aggregations/bucket/composite/CompositeAggregator.java#L181

Added line #L181 was not covered by tests
});
}

// bucketOrds is used for saving the date histogram results got from the optimization path
bucketOrds = LongKeyedBucketOrds.build(context.bigArrays(), CardinalityUpperBound.ONE);
return true;
}
return false;
}
}

public Rounding getRounding(final long low, final long high) {
return valuesSource.getRounding();
}
@Override
protected void prepare() throws IOException {
buildRanges(context);
}

public Rounding.Prepared getRoundingPrepared() {
return valuesSource.getPreparedRounding();
}
protected Rounding getRounding(final long low, final long high) {
return valuesSource.getRounding();
}

@Override
protected void processAfterKey(long[] bound, long interval) {
// afterKey is the last bucket key in previous response, and the bucket key
// is the minimum of all values in the bucket, so need to add the interval
if (afterKey != -1L) {
bound[0] = afterKey + interval;
protected Rounding.Prepared getRoundingPrepared() {
return valuesSource.getPreparedRounding();
}
}

public int getSize() {
return size;
}
@Override
protected long[] processAfterKey(long[] bounds, long interval) {
// afterKey is the last bucket key in previous response, and the bucket key
// is the minimum of all values in the bucket, so need to add the interval
if (afterKey != -1L) {
bounds[0] = afterKey + interval;
}
return bounds;
}

@Override
protected int getSize() {
return size;
}

@Override
protected Function<Long, Long> bucketOrdProducer() {
return (key) -> bucketOrds.add(0, getRoundingPrepared().round((long) key));
}
};
filterRewriteOptimizationContext = new FilterRewriteOptimizationContext(bridge, parent, subAggregators.length, context);
}

@Override
Expand Down Expand Up @@ -368,7 +374,7 @@ private boolean isMaybeMultivalued(LeafReaderContext context, SortField sortFiel
return v2 != null && DocValues.unwrapSingleton(v2) == null;

default:
// we have no clue whether the field is multi-valued or not so we assume it is.
// we have no clue whether the field is multivalued or not so we assume it is.
return true;
}
}
Expand Down Expand Up @@ -551,11 +557,7 @@ private void processLeafFromQuery(LeafReaderContext ctx, Sort indexSortPrefix) t

@Override
protected LeafBucketCollector getLeafCollector(LeafReaderContext ctx, LeafBucketCollector sub) throws IOException {
boolean optimized = fastFilterContext.tryFastFilterAggregation(
ctx,
this::incrementBucketDocCount,
(key) -> bucketOrds.add(0, preparedRounding.round((long) key))
);
boolean optimized = filterRewriteOptimizationContext.tryOptimize(ctx, this::incrementBucketDocCount, segmentMatchAll(context, ctx));
if (optimized) throw new CollectionTerminatedException();

finishLeaf();
Expand Down Expand Up @@ -709,11 +711,6 @@ private static class Entry {

@Override
public void collectDebugInfo(BiConsumer<String, Object> add) {
if (fastFilterContext.optimizedSegments > 0) {
add.accept("optimized_segments", fastFilterContext.optimizedSegments);
add.accept("unoptimized_segments", fastFilterContext.segments - fastFilterContext.optimizedSegments);
add.accept("leaf_visited", fastFilterContext.leaf);
add.accept("inner_visited", fastFilterContext.inner);
}
filterRewriteOptimizationContext.populateDebugInfo(add);

Check warning on line 714 in server/src/main/java/org/opensearch/search/aggregations/bucket/composite/CompositeAggregator.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/search/aggregations/bucket/composite/CompositeAggregator.java#L714

Added line #L714 was not covered by tests
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.search.aggregations.bucket.filterrewrite;

import org.apache.lucene.index.LeafReaderContext;
import org.apache.lucene.index.PointValues;
import org.opensearch.index.mapper.MappedFieldType;

import java.io.IOException;
import java.util.function.BiConsumer;
import java.util.function.Consumer;

/**
* This interface provides a bridge between an aggregator and the optimization context, allowing
* the aggregator to provide data and optimize the aggregation process.
*
* <p>The main purpose of this interface is to encapsulate the aggregator-specific optimization
* logic and provide access to the data in Aggregator that is required for optimization, while keeping the optimization
* business logic separate from the aggregator implementation.
*
* <p>To use this interface to optimize an aggregator, you should subclass this interface in this package
* and put any specific optimization business logic in it. Then implement this subclass in the aggregator
* to provide data that is needed for doing the optimization
*
* @opensearch.internal
*/
public abstract class AggregatorBridge {

/**
* The field type associated with this aggregator bridge.
*/
MappedFieldType fieldType;

Consumer<Ranges> setRanges;

void setRangesConsumer(Consumer<Ranges> setRanges) {
this.setRanges = setRanges;
}

/**
* Checks whether the aggregator can be optimized.
* <p>
* This method is supposed to be implemented in a specific aggregator to take in fields from there
*
* @return {@code true} if the aggregator can be optimized, {@code false} otherwise.
* The result will be saved in the optimization context.
*/
protected abstract boolean canOptimize();

/**
* Prepares the optimization at shard level after checking aggregator is optimizable.
* <p>
* For example, figure out what are the ranges from the aggregation to do the optimization later
* <p>
* This method is supposed to be implemented in a specific aggregator to take in fields from there
*/
protected abstract void prepare() throws IOException;

/**
* Prepares the optimization for a specific segment when the segment is functionally matching all docs
*
* @param leaf the leaf reader context for the segment
*/
abstract Ranges tryBuildRangesFromSegment(LeafReaderContext leaf) throws IOException;

/**
* Attempts to build aggregation results for a segment
*
* @param values the point values (index structure for numeric values) for a segment
* @param incrementDocCount a consumer to increment the document count for a range bucket. The First parameter is document count, the second is the key of the bucket
* @param ranges
*/
abstract FilterRewriteOptimizationContext.DebugInfo tryOptimize(
PointValues values,
BiConsumer<Long, Long> incrementDocCount,
Ranges ranges
) throws IOException;
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.search.aggregations.bucket.filterrewrite;

import org.opensearch.index.mapper.DateFieldMapper;
import org.opensearch.index.mapper.MappedFieldType;
import org.opensearch.search.aggregations.bucket.composite.CompositeValuesSourceConfig;
import org.opensearch.search.aggregations.bucket.composite.RoundingValuesSource;

/**
* For composite aggregation to do optimization when it only has a single date histogram source
*/
public abstract class CompositeAggregatorBridge extends DateHistogramAggregatorBridge {
protected boolean canOptimize(CompositeValuesSourceConfig[] sourceConfigs) {
if (sourceConfigs.length != 1 || !(sourceConfigs[0].valuesSource() instanceof RoundingValuesSource)) return false;
return canOptimize(sourceConfigs[0].missingBucket(), sourceConfigs[0].hasScript(), sourceConfigs[0].fieldType());
}

private boolean canOptimize(boolean missing, boolean hasScript, MappedFieldType fieldType) {
if (!missing && !hasScript) {
if (fieldType instanceof DateFieldMapper.DateFieldType) {
if (fieldType.isSearchable()) {
this.fieldType = fieldType;
return true;
}
}
}
return false;
}
}
Loading

0 comments on commit 6c301f2

Please sign in to comment.