Skip to content

Commit

Permalink
Address comment
Browse files Browse the repository at this point in the history
- remove map of segment ranges, pass in by calling getRanges when needed
- use AtomicInteger for the debug info

Signed-off-by: bowenlan-amzn <[email protected]>
  • Loading branch information
bowenlan-amzn committed Aug 8, 2024
1 parent e896927 commit 8962ee3
Show file tree
Hide file tree
Showing 4 changed files with 40 additions and 47 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ void setRangesConsumer(Consumer<Ranges> setRanges) {
*
* @param leaf the leaf reader context for the segment
*/
abstract Ranges prepareFromSegment(LeafReaderContext leaf) throws IOException;
abstract Ranges tryBuildRangesFromSegment(LeafReaderContext leaf) throws IOException;

/**
* Attempts to build aggregation results for a segment
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ protected void buildRanges(SearchContext context) throws IOException {
}

@Override
final Ranges prepareFromSegment(LeafReaderContext leaf) throws IOException {
final Ranges tryBuildRangesFromSegment(LeafReaderContext leaf) throws IOException {
long[] bounds = Helper.getSegmentBounds(leaf, fieldType.name());
return buildRanges(bounds, maxRewriteFilters);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,7 @@
import org.opensearch.search.internal.SearchContext;

import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.BiConsumer;

import static org.apache.lucene.search.DocIdSetIterator.NO_MORE_DOCS;
Expand All @@ -41,14 +40,13 @@ public final class FilterRewriteOptimizationContext {
private final AggregatorBridge aggregatorBridge;
private String shardId;

private Ranges ranges;
private final Map<Integer, Ranges> rangesFromSegment = new HashMap<>(); // map of segment ordinal to its ranges
private Ranges ranges; // built at shard level

// debug info related fields
private int leafNodeVisited;
private int innerNodeVisited;
private int segments;
private int optimizedSegments;
private final AtomicInteger leafNodeVisited = new AtomicInteger();
private final AtomicInteger innerNodeVisited = new AtomicInteger();
private final AtomicInteger segments = new AtomicInteger();
private final AtomicInteger optimizedSegments = new AtomicInteger();

public FilterRewriteOptimizationContext(
AggregatorBridge aggregatorBridge,
Expand Down Expand Up @@ -90,19 +88,6 @@ void setRanges(Ranges ranges) {
this.ranges = ranges;
}

void setRangesFromSegment(int leafOrd, Ranges ranges) {
this.rangesFromSegment.put(leafOrd, ranges);
}

void clearRangesFromSegment(int leafOrd) {
this.rangesFromSegment.remove(leafOrd);
}

Ranges getRanges(int leafOrd) {
if (!preparedAtShardLevel) return rangesFromSegment.get(leafOrd);
return ranges;
}

/**
* Try to populate the bucket doc counts for aggregation
* <p>
Expand All @@ -113,7 +98,7 @@ Ranges getRanges(int leafOrd) {
*/
public boolean tryOptimize(final LeafReaderContext leafCtx, final BiConsumer<Long, Long> incrementDocCount, boolean segmentMatchAll)
throws IOException {
segments++;
segments.incrementAndGet();
if (!canOptimize) {
return false;
}
Expand All @@ -135,62 +120,70 @@ public boolean tryOptimize(final LeafReaderContext leafCtx, final BiConsumer<Lon
return false;
}

Ranges ranges = tryBuildRangesFromSegment(leafCtx, segmentMatchAll);
Ranges ranges = getRanges(leafCtx, segmentMatchAll);
if (ranges == null) return false;

consumeDebugInfo(aggregatorBridge.tryOptimize(values, incrementDocCount, getRanges(leafCtx.ord)));
consumeDebugInfo(aggregatorBridge.tryOptimize(values, incrementDocCount, ranges));

optimizedSegments++;
optimizedSegments.incrementAndGet();
logger.debug("Fast filter optimization applied to shard {} segment {}", shardId, leafCtx.ord);
logger.debug("Crossed leaf nodes: {}, inner nodes: {}", leafNodeVisited, innerNodeVisited);

clearRangesFromSegment(leafCtx.ord);
return true;
}

Ranges getRanges(LeafReaderContext leafCtx, boolean segmentMatchAll) {
if (!preparedAtShardLevel) {
try {
return getRangesFromSegment(leafCtx, segmentMatchAll);
} catch (IOException e) {
logger.warn("Failed to build ranges from segment.", e);
return null;
}
}
return ranges;
}

/**
* Even when ranges cannot be built at shard level, we can still build ranges
* at segment level when it's functionally match-all at segment level
*/
private Ranges tryBuildRangesFromSegment(LeafReaderContext leafCtx, boolean segmentMatchAll) throws IOException {
if (!preparedAtShardLevel && !segmentMatchAll) {
private Ranges getRangesFromSegment(LeafReaderContext leafCtx, boolean segmentMatchAll) throws IOException {
if (!segmentMatchAll) {
return null;
}

if (!preparedAtShardLevel) { // not built at shard level but segment match all
logger.debug("Shard {} segment {} functionally match all documents. Build the fast filter", shardId, leafCtx.ord);
setRangesFromSegment(leafCtx.ord, aggregatorBridge.prepareFromSegment(leafCtx));
}
return getRanges(leafCtx.ord);
logger.debug("Shard {} segment {} functionally match all documents. Build the fast filter", shardId, leafCtx.ord);
return aggregatorBridge.tryBuildRangesFromSegment(leafCtx);
}

/**
* Contains debug info of BKD traversal to show in profile
*/
static class DebugInfo {
private int leafNodeVisited = 0; // leaf node visited
private int innerNodeVisited = 0; // inner node visited
private final AtomicInteger leafNodeVisited = new AtomicInteger(); // leaf node visited
private final AtomicInteger innerNodeVisited = new AtomicInteger(); // inner node visited

void visitLeaf() {
leafNodeVisited++;
leafNodeVisited.incrementAndGet();
}

void visitInner() {
innerNodeVisited++;
innerNodeVisited.incrementAndGet();
}
}

void consumeDebugInfo(DebugInfo debug) {
leafNodeVisited += debug.leafNodeVisited;
innerNodeVisited += debug.innerNodeVisited;
leafNodeVisited.addAndGet(debug.leafNodeVisited.get());
innerNodeVisited.addAndGet(debug.innerNodeVisited.get());
}

public void populateDebugInfo(BiConsumer<String, Object> add) {
if (optimizedSegments > 0) {
add.accept("optimized_segments", optimizedSegments);
add.accept("unoptimized_segments", segments - optimizedSegments);
add.accept("leaf_visited", leafNodeVisited);
add.accept("inner_visited", innerNodeVisited);
if (optimizedSegments.get() > 0) {
add.accept("optimized_segments", optimizedSegments.get());
add.accept("unoptimized_segments", segments.get() - optimizedSegments.get());
add.accept("leaf_visited", leafNodeVisited.get());
add.accept("inner_visited", innerNodeVisited.get());
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ protected void buildRanges(RangeAggregator.Range[] ranges) {
}

@Override
final Ranges prepareFromSegment(LeafReaderContext leaf) {
final Ranges tryBuildRangesFromSegment(LeafReaderContext leaf) {
throw new UnsupportedOperationException("Range aggregation should not build ranges at segment level");
}

Expand Down

0 comments on commit 8962ee3

Please sign in to comment.