Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactor the filter rewrite optimization #14464

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
24 commits
Select commit Hold shift + click to select a range
3f43898
Refactor
bowenlan-amzn Jun 19, 2024
7d9d57e
Refactor
bowenlan-amzn Jun 19, 2024
1a067ba
Refactor
bowenlan-amzn Jun 19, 2024
e8e9ad3
Refactor
bowenlan-amzn Jun 20, 2024
7c491b9
Refactor
bowenlan-amzn Jun 20, 2024
a158f78
Refactor
bowenlan-amzn Jun 21, 2024
8f10faf
Refactor
bowenlan-amzn Jun 21, 2024
6c097f3
Fix a bug
bowenlan-amzn Jun 21, 2024
40c42fa
address comment
bowenlan-amzn Jul 2, 2024
5fde041
prepareFromSegment now doesn't return Ranges
bowenlan-amzn Jul 2, 2024
510a052
how it looks like when introduce interfaces
bowenlan-amzn Jul 2, 2024
4a333af
remove interface, clean up
bowenlan-amzn Jul 2, 2024
9325061
improve doc
bowenlan-amzn Jul 2, 2024
bf958ea
Merge branch 'main' into 14435-refactor-range-agg-optimization
bowenlan-amzn Jul 2, 2024
5439401
Merge branch 'main' into 14435-refactor-range-agg-optimization
bowenlan-amzn Jul 10, 2024
3a60a81
move multirangetraversal logic to helper
bowenlan-amzn Jul 10, 2024
094c25b
improve the refactor
bowenlan-amzn Jul 17, 2024
85009ad
Merge branch 'main' into 14435-refactor-range-agg-optimization
bowenlan-amzn Aug 1, 2024
afc9bbd
Merge branch 'main' into 14435-refactor-range-agg-optimization
bowenlan-amzn Aug 5, 2024
778f1ce
Address Marc's comments
bowenlan-amzn Aug 7, 2024
234eb44
Address concurrent segment search concern
bowenlan-amzn Aug 7, 2024
e896927
remove circular dependency
bowenlan-amzn Aug 7, 2024
8962ee3
Address comment
bowenlan-amzn Aug 8, 2024
86cacab
Merge branch 'main' into 14435-refactor-range-agg-optimization
bowenlan-amzn Aug 8, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -73,8 +73,8 @@
import org.opensearch.search.aggregations.MultiBucketCollector;
import org.opensearch.search.aggregations.MultiBucketConsumerService;
import org.opensearch.search.aggregations.bucket.BucketsAggregator;
import org.opensearch.search.aggregations.bucket.FastFilterRewriteHelper;
import org.opensearch.search.aggregations.bucket.FastFilterRewriteHelper.AbstractDateHistogramAggregationType;
import org.opensearch.search.aggregations.bucket.filterrewrite.CompositeAggregatorBridge;
import org.opensearch.search.aggregations.bucket.filterrewrite.FilterRewriteOptimizationContext;
import org.opensearch.search.aggregations.bucket.missing.MissingOrder;
import org.opensearch.search.aggregations.bucket.terms.LongKeyedBucketOrds;
import org.opensearch.search.internal.SearchContext;
Expand All @@ -89,13 +89,15 @@
import java.util.List;
import java.util.Map;
import java.util.function.BiConsumer;
import java.util.function.Function;
import java.util.function.LongUnaryOperator;
import java.util.stream.Collectors;

import static org.opensearch.search.aggregations.MultiBucketConsumerService.MAX_BUCKET_SETTING;
import static org.opensearch.search.aggregations.bucket.filterrewrite.DateHistogramAggregatorBridge.segmentMatchAll;

/**
* Main aggregator that aggregates docs from mulitple aggregations
* Main aggregator that aggregates docs from multiple aggregations
*
* @opensearch.internal
*/
Expand All @@ -118,9 +120,8 @@

private boolean earlyTerminated;

private final FastFilterRewriteHelper.FastFilterContext fastFilterContext;
private LongKeyedBucketOrds bucketOrds = null;
private Rounding.Prepared preparedRounding = null;
private final FilterRewriteOptimizationContext filterRewriteOptimizationContext;
private LongKeyedBucketOrds bucketOrds;

CompositeAggregator(
String name,
Expand Down Expand Up @@ -166,57 +167,62 @@
this.queue = new CompositeValuesCollectorQueue(context.bigArrays(), sources, size, rawAfterKey);
this.rawAfterKey = rawAfterKey;

fastFilterContext = new FastFilterRewriteHelper.FastFilterContext(context);
if (!FastFilterRewriteHelper.isCompositeAggRewriteable(sourceConfigs)) {
return;
}
fastFilterContext.setAggregationType(new CompositeAggregationType());
if (fastFilterContext.isRewriteable(parent, subAggregators.length)) {
// bucketOrds is used for saving date histogram results
bucketOrds = LongKeyedBucketOrds.build(context.bigArrays(), CardinalityUpperBound.ONE);
preparedRounding = ((CompositeAggregationType) fastFilterContext.getAggregationType()).getRoundingPrepared();
fastFilterContext.buildRanges(sourceConfigs[0].fieldType());
}
}
CompositeAggregatorBridge bridge = new CompositeAggregatorBridge() {
private RoundingValuesSource valuesSource;
private long afterKey = -1L;

/**
* Currently the filter rewrite is only supported for date histograms
*/
public class CompositeAggregationType extends AbstractDateHistogramAggregationType {
private final RoundingValuesSource valuesSource;
private long afterKey = -1L;

public CompositeAggregationType() {
super(sourceConfigs[0].fieldType(), sourceConfigs[0].missingBucket(), sourceConfigs[0].hasScript());
this.valuesSource = (RoundingValuesSource) sourceConfigs[0].valuesSource();
if (rawAfterKey != null) {
assert rawAfterKey.size() == 1 && formats.size() == 1;
this.afterKey = formats.get(0).parseLong(rawAfterKey.get(0).toString(), false, () -> {
throw new IllegalArgumentException("now() is not supported in [after] key");
});
@Override
protected boolean canOptimize() {
if (canOptimize(sourceConfigs)) {
this.valuesSource = (RoundingValuesSource) sourceConfigs[0].valuesSource();
if (rawAfterKey != null) {
assert rawAfterKey.size() == 1 && formats.size() == 1;
this.afterKey = formats.get(0).parseLong(rawAfterKey.get(0).toString(), false, () -> {
throw new IllegalArgumentException("now() is not supported in [after] key");

Check warning on line 181 in server/src/main/java/org/opensearch/search/aggregations/bucket/composite/CompositeAggregator.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/search/aggregations/bucket/composite/CompositeAggregator.java#L181

Added line #L181 was not covered by tests
});
}

// bucketOrds is used for saving the date histogram results got from the optimization path
bucketOrds = LongKeyedBucketOrds.build(context.bigArrays(), CardinalityUpperBound.ONE);
return true;
}
return false;
}
}

public Rounding getRounding(final long low, final long high) {
return valuesSource.getRounding();
}
@Override
protected void prepare() throws IOException {
mch2 marked this conversation as resolved.
Show resolved Hide resolved
buildRanges(context);
}

public Rounding.Prepared getRoundingPrepared() {
return valuesSource.getPreparedRounding();
}
protected Rounding getRounding(final long low, final long high) {
return valuesSource.getRounding();
}

@Override
protected void processAfterKey(long[] bound, long interval) {
// afterKey is the last bucket key in previous response, and the bucket key
// is the minimum of all values in the bucket, so need to add the interval
if (afterKey != -1L) {
bound[0] = afterKey + interval;
protected Rounding.Prepared getRoundingPrepared() {
return valuesSource.getPreparedRounding();
}
}

public int getSize() {
return size;
}
@Override
protected long[] processAfterKey(long[] bounds, long interval) {
// afterKey is the last bucket key in previous response, and the bucket key
// is the minimum of all values in the bucket, so need to add the interval
if (afterKey != -1L) {
bounds[0] = afterKey + interval;
}
return bounds;
}

@Override
protected int getSize() {
return size;
}

@Override
protected Function<Long, Long> bucketOrdProducer() {
return (key) -> bucketOrds.add(0, getRoundingPrepared().round((long) key));
}
};
filterRewriteOptimizationContext = new FilterRewriteOptimizationContext(bridge, parent, subAggregators.length, context);
}

@Override
Expand Down Expand Up @@ -368,7 +374,7 @@
return v2 != null && DocValues.unwrapSingleton(v2) == null;

default:
// we have no clue whether the field is multi-valued or not so we assume it is.
// we have no clue whether the field is multivalued or not so we assume it is.
return true;
}
}
Expand Down Expand Up @@ -551,11 +557,7 @@

@Override
protected LeafBucketCollector getLeafCollector(LeafReaderContext ctx, LeafBucketCollector sub) throws IOException {
boolean optimized = fastFilterContext.tryFastFilterAggregation(
ctx,
this::incrementBucketDocCount,
(key) -> bucketOrds.add(0, preparedRounding.round((long) key))
);
boolean optimized = filterRewriteOptimizationContext.tryOptimize(ctx, this::incrementBucketDocCount, segmentMatchAll(context, ctx));
if (optimized) throw new CollectionTerminatedException();

finishLeaf();
Expand Down Expand Up @@ -709,11 +711,6 @@

@Override
public void collectDebugInfo(BiConsumer<String, Object> add) {
if (fastFilterContext.optimizedSegments > 0) {
add.accept("optimized_segments", fastFilterContext.optimizedSegments);
add.accept("unoptimized_segments", fastFilterContext.segments - fastFilterContext.optimizedSegments);
add.accept("leaf_visited", fastFilterContext.leaf);
add.accept("inner_visited", fastFilterContext.inner);
}
filterRewriteOptimizationContext.populateDebugInfo(add);

Check warning on line 714 in server/src/main/java/org/opensearch/search/aggregations/bucket/composite/CompositeAggregator.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/search/aggregations/bucket/composite/CompositeAggregator.java#L714

Added line #L714 was not covered by tests
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.search.aggregations.bucket.filterrewrite;

import org.apache.lucene.index.LeafReaderContext;
import org.apache.lucene.index.PointValues;
import org.opensearch.index.mapper.MappedFieldType;

import java.io.IOException;
import java.util.function.BiConsumer;
import java.util.function.Consumer;

/**
* This interface provides a bridge between an aggregator and the optimization context, allowing
* the aggregator to provide data and optimize the aggregation process.
*
* <p>The main purpose of this interface is to encapsulate the aggregator-specific optimization
* logic and provide access to the data in Aggregator that is required for optimization, while keeping the optimization
* business logic separate from the aggregator implementation.
*
* <p>To use this interface to optimize an aggregator, you should subclass this interface in this package
* and put any specific optimization business logic in it. Then implement this subclass in the aggregator
* to provide data that is needed for doing the optimization
*
* @opensearch.internal
*/
public abstract class AggregatorBridge {

/**
* The field type associated with this aggregator bridge.
*/
MappedFieldType fieldType;

Consumer<Ranges> setRanges;

void setRangesConsumer(Consumer<Ranges> setRanges) {
mch2 marked this conversation as resolved.
Show resolved Hide resolved
this.setRanges = setRanges;
}

/**
* Checks whether the aggregator can be optimized.
* <p>
* This method is supposed to be implemented in a specific aggregator to take in fields from there
*
* @return {@code true} if the aggregator can be optimized, {@code false} otherwise.
* The result will be saved in the optimization context.
*/
protected abstract boolean canOptimize();

/**
* Prepares the optimization at shard level after checking aggregator is optimizable.
* <p>
* For example, figure out what are the ranges from the aggregation to do the optimization later
* <p>
* This method is supposed to be implemented in a specific aggregator to take in fields from there
*/
protected abstract void prepare() throws IOException;

/**
* Prepares the optimization for a specific segment when the segment is functionally matching all docs
*
* @param leaf the leaf reader context for the segment
*/
abstract Ranges tryBuildRangesFromSegment(LeafReaderContext leaf) throws IOException;

/**
* Attempts to build aggregation results for a segment
*
* @param values the point values (index structure for numeric values) for a segment
* @param incrementDocCount a consumer to increment the document count for a range bucket. The First parameter is document count, the second is the key of the bucket
* @param ranges
*/
abstract FilterRewriteOptimizationContext.DebugInfo tryOptimize(
PointValues values,
BiConsumer<Long, Long> incrementDocCount,
Ranges ranges
) throws IOException;
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.search.aggregations.bucket.filterrewrite;

import org.opensearch.index.mapper.DateFieldMapper;
import org.opensearch.index.mapper.MappedFieldType;
import org.opensearch.search.aggregations.bucket.composite.CompositeValuesSourceConfig;
import org.opensearch.search.aggregations.bucket.composite.RoundingValuesSource;

/**
* For composite aggregation to do optimization when it only has a single date histogram source
*/
public abstract class CompositeAggregatorBridge extends DateHistogramAggregatorBridge {

    /**
     * A composite aggregation qualifies for the filter rewrite optimization only when it is
     * driven by exactly one source and that source is a rounding-based (date histogram) one.
     *
     * @param sourceConfigs the composite aggregation's source configurations
     * @return {@code true} when the single date-histogram source is eligible for optimization
     */
    protected boolean canOptimize(CompositeValuesSourceConfig[] sourceConfigs) {
        if (sourceConfigs.length != 1) {
            return false;
        }
        CompositeValuesSourceConfig config = sourceConfigs[0];
        if (!(config.valuesSource() instanceof RoundingValuesSource)) {
            return false;
        }
        return checkEligibility(config.missingBucket(), config.hasScript(), config.fieldType());
    }

    // Accept only searchable date fields with no missing-bucket handling and no script;
    // on success, remember the field type for building ranges later.
    private boolean checkEligibility(boolean missingBucket, boolean hasScript, MappedFieldType fieldType) {
        if (missingBucket || hasScript) {
            return false;
        }
        if (!(fieldType instanceof DateFieldMapper.DateFieldType) || !fieldType.isSearchable()) {
            return false;
        }
        this.fieldType = fieldType;
        return true;
    }
}
Loading
Loading