Skip to content

Commit

Permalink
Non-leaf filter operators NULL support. (#11185)
Browse files Browse the repository at this point in the history
  • Loading branch information
shenyu0127 authored Jul 26, 2023
1 parent a0ff2e8 commit d83d1e8
Show file tree
Hide file tree
Showing 29 changed files with 458 additions and 217 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import javax.annotation.Nullable;
import org.apache.commons.collections.MapUtils;
import org.apache.pinot.common.utils.config.QueryOptionsUtils;
import org.apache.pinot.core.common.BlockDocIdIterator;
Expand Down Expand Up @@ -58,7 +59,7 @@ public final class AndDocIdSet implements BlockDocIdSet {
private final List<BlockDocIdSet> _docIdSets;
private final boolean _cardinalityBasedRankingForScan;

public AndDocIdSet(List<BlockDocIdSet> docIdSets, Map<String, String> queryOptions) {
public AndDocIdSet(List<BlockDocIdSet> docIdSets, @Nullable Map<String, String> queryOptions) {
_docIdSets = docIdSets;
_cardinalityBasedRankingForScan =
!MapUtils.isEmpty(queryOptions) && QueryOptionsUtils.isAndScanReorderingEnabled(queryOptions);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,12 @@
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import javax.annotation.Nullable;
import org.apache.pinot.core.common.BlockDocIdSet;
import org.apache.pinot.core.common.Operator;
import org.apache.pinot.core.operator.blocks.FilterBlock;
import org.apache.pinot.core.operator.docidsets.AndDocIdSet;
import org.apache.pinot.core.operator.docidsets.MatchAllDocIdSet;
import org.apache.pinot.core.operator.docidsets.OrDocIdSet;
import org.apache.pinot.spi.trace.Tracing;
import org.roaringbitmap.buffer.BufferFastAggregation;
import org.roaringbitmap.buffer.ImmutableRoaringBitmap;
Expand All @@ -36,23 +38,34 @@ public class AndFilterOperator extends BaseFilterOperator {
private final List<BaseFilterOperator> _filterOperators;
private final Map<String, String> _queryOptions;

public AndFilterOperator(List<BaseFilterOperator> filterOperators, Map<String, String> queryOptions) {
public AndFilterOperator(List<BaseFilterOperator> filterOperators, @Nullable Map<String, String> queryOptions,
int numDocs, boolean nullHandlingEnabled) {
super(numDocs, nullHandlingEnabled);
_filterOperators = filterOperators;
_queryOptions = queryOptions;
}

public AndFilterOperator(List<BaseFilterOperator> filterOperators) {
this(filterOperators, null);
@Override
protected BlockDocIdSet getTrues() {
Tracing.activeRecording().setNumChildren(_filterOperators.size());
List<BlockDocIdSet> blockDocIdSets = new ArrayList<>(_filterOperators.size());
for (BaseFilterOperator filterOperator : _filterOperators) {
blockDocIdSets.add(filterOperator.getTrues());
}
return new AndDocIdSet(blockDocIdSets, _queryOptions);
}

@Override
protected FilterBlock getNextBlock() {
Tracing.activeRecording().setNumChildren(_filterOperators.size());
protected BlockDocIdSet getFalses() {
List<BlockDocIdSet> blockDocIdSets = new ArrayList<>(_filterOperators.size());
for (BaseFilterOperator filterOperator : _filterOperators) {
blockDocIdSets.add(filterOperator.nextBlock().getBlockDocIdSet());
if (filterOperator.isResultEmpty()) {
blockDocIdSets.add(new MatchAllDocIdSet(_numDocs));
} else {
blockDocIdSets.add(filterOperator.getFalses());
}
}
return new FilterBlock(new AndDocIdSet(blockDocIdSets, _queryOptions));
return new OrDocIdSet(blockDocIdSets, _numDocs);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,14 +18,25 @@
*/
package org.apache.pinot.core.operator.filter;

import java.util.Arrays;
import org.apache.pinot.core.common.BlockDocIdSet;
import org.apache.pinot.core.operator.BaseOperator;
import org.apache.pinot.core.operator.blocks.FilterBlock;
import org.apache.pinot.core.operator.docidsets.NotDocIdSet;
import org.apache.pinot.core.operator.docidsets.OrDocIdSet;


/**
* The {@link BaseFilterOperator} class is the base class for all filter operators.
*/
public abstract class BaseFilterOperator extends BaseOperator<FilterBlock> {
protected final int _numDocs;
protected final boolean _nullHandlingEnabled;

public BaseFilterOperator(int numDocs, boolean nullHandlingEnabled) {
_numDocs = numDocs;
_nullHandlingEnabled = nullHandlingEnabled;
}

/**
* Returns {@code true} if the result is always empty, {@code false} otherwise.
Expand Down Expand Up @@ -68,4 +79,33 @@ public boolean canProduceBitmaps() {
public BitmapCollection getBitmaps() {
throw new UnsupportedOperationException();
}

@Override
protected FilterBlock getNextBlock() {
return new FilterBlock(getTrues());
}

/**
* @return document IDs in which the predicate evaluates to true.
*/
protected abstract BlockDocIdSet getTrues();

/**
* @return document IDs in which the predicate evaluates to NULL.
*/
protected BlockDocIdSet getNulls() {
throw new UnsupportedOperationException();
}

/**
* @return document IDs in which the predicate evaluates to false.
*/
protected BlockDocIdSet getFalses() {
if (_nullHandlingEnabled) {
return new NotDocIdSet(new OrDocIdSet(Arrays.asList(getTrues(), getNulls()), _numDocs),
_numDocs);
} else {
return new NotDocIdSet(getTrues(), _numDocs);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,8 @@

import java.util.Collections;
import java.util.List;
import org.apache.pinot.core.common.BlockDocIdSet;
import org.apache.pinot.core.common.Operator;
import org.apache.pinot.core.operator.blocks.FilterBlock;
import org.apache.pinot.core.operator.docidsets.BitmapDocIdSet;
import org.roaringbitmap.buffer.ImmutableRoaringBitmap;

Expand All @@ -31,20 +31,19 @@ public class BitmapBasedFilterOperator extends BaseFilterOperator {

private final ImmutableRoaringBitmap _docIds;
private final boolean _exclusive;
private final int _numDocs;

public BitmapBasedFilterOperator(ImmutableRoaringBitmap docIds, boolean exclusive, int numDocs) {
super(numDocs, false);
_docIds = docIds;
_exclusive = exclusive;
_numDocs = numDocs;
}

@Override
protected FilterBlock getNextBlock() {
protected BlockDocIdSet getTrues() {
if (_exclusive) {
return new FilterBlock(new BitmapDocIdSet(ImmutableRoaringBitmap.flip(_docIds, 0L, _numDocs), _numDocs));
return new BitmapDocIdSet(ImmutableRoaringBitmap.flip(_docIds, 0L, _numDocs), _numDocs);
} else {
return new FilterBlock(new BitmapDocIdSet(_docIds, _numDocs));
return new BitmapDocIdSet(_docIds, _numDocs);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@
import java.util.List;
import java.util.Map;
import org.apache.pinot.core.common.BlockDocIdSet;
import org.apache.pinot.core.operator.blocks.FilterBlock;
import org.apache.pinot.core.operator.docidsets.AndDocIdSet;
import org.apache.pinot.spi.trace.Tracing;

Expand All @@ -40,6 +39,8 @@ public class CombinedFilterOperator extends BaseFilterOperator {

public CombinedFilterOperator(BaseFilterOperator mainFilterOperator, BaseFilterOperator subFilterOperator,
Map<String, String> queryOptions) {
// This filter operator does not support AND/OR/NOT operations.
super(0, false);
assert !mainFilterOperator.isResultEmpty() && !mainFilterOperator.isResultMatchingAll()
&& !subFilterOperator.isResultEmpty() && !subFilterOperator.isResultMatchingAll();
_mainFilterOperator = mainFilterOperator;
Expand All @@ -58,10 +59,10 @@ public String toExplainString() {
}

@Override
protected FilterBlock getNextBlock() {
protected BlockDocIdSet getTrues() {
Tracing.activeRecording().setNumChildren(2);
BlockDocIdSet mainFilterDocIdSet = _mainFilterOperator.nextBlock().getNonScanFilterBLockDocIdSet();
BlockDocIdSet subFilterDocIdSet = _subFilterOperator.nextBlock().getBlockDocIdSet();
return new FilterBlock(new AndDocIdSet(Arrays.asList(mainFilterDocIdSet, subFilterDocIdSet), _queryOptions));
return new AndDocIdSet(Arrays.asList(mainFilterDocIdSet, subFilterDocIdSet), _queryOptions);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,17 +20,19 @@

import java.util.Collections;
import java.util.List;
import org.apache.pinot.core.common.BlockDocIdSet;
import org.apache.pinot.core.common.ExplainPlanRows;
import org.apache.pinot.core.common.Operator;
import org.apache.pinot.core.operator.blocks.EmptyFilterBlock;
import org.apache.pinot.core.operator.blocks.FilterBlock;
import org.apache.pinot.core.operator.docidsets.EmptyDocIdSet;


/**
* Singleton class which extends {@link BaseFilterOperator} that is empty, i.e. does not match any document.
*/
public final class EmptyFilterOperator extends BaseFilterOperator {
private EmptyFilterOperator() {
// We will never call its getFalses() method.
super(0, false);
}


Expand Down Expand Up @@ -58,8 +60,8 @@ public int getNumMatchingDocs() {
}

@Override
protected FilterBlock getNextBlock() {
return EmptyFilterBlock.getInstance();
protected BlockDocIdSet getTrues() {
return EmptyDocIdSet.getInstance();
}


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,9 +27,9 @@
import org.apache.pinot.common.request.context.ExpressionContext;
import org.apache.pinot.common.request.context.predicate.Predicate;
import org.apache.pinot.common.utils.HashUtil;
import org.apache.pinot.core.common.BlockDocIdSet;
import org.apache.pinot.core.common.Operator;
import org.apache.pinot.core.operator.ColumnContext;
import org.apache.pinot.core.operator.blocks.FilterBlock;
import org.apache.pinot.core.operator.docidsets.ExpressionDocIdSet;
import org.apache.pinot.core.operator.filter.predicate.PredicateEvaluator;
import org.apache.pinot.core.operator.filter.predicate.PredicateEvaluatorProvider;
Expand All @@ -43,13 +43,12 @@
public class ExpressionFilterOperator extends BaseFilterOperator {
private static final String EXPLAIN_NAME = "FILTER_EXPRESSION";

private final int _numDocs;
private final Map<String, DataSource> _dataSourceMap;
private final TransformFunction _transformFunction;
private final PredicateEvaluator _predicateEvaluator;

public ExpressionFilterOperator(IndexSegment segment, QueryContext queryContext, Predicate predicate, int numDocs) {
_numDocs = numDocs;
super(numDocs, queryContext.isNullHandlingEnabled());

Set<String> columns = new HashSet<>();
ExpressionContext lhs = predicate.getLhs();
Expand All @@ -69,8 +68,8 @@ public ExpressionFilterOperator(IndexSegment segment, QueryContext queryContext,
}

@Override
protected FilterBlock getNextBlock() {
return new FilterBlock(new ExpressionDocIdSet(_transformFunction, _predicateEvaluator, _dataSourceMap, _numDocs));
protected BlockDocIdSet getTrues() {
return new ExpressionDocIdSet(_transformFunction, _predicateEvaluator, _dataSourceMap, _numDocs);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -85,29 +85,29 @@ public BaseFilterOperator getLeafFilterOperator(PredicateEvaluator predicateEval
Predicate.Type predicateType = predicateEvaluator.getPredicateType();
if (predicateType == Predicate.Type.RANGE) {
if (dataSource.getDataSourceMetadata().isSorted() && dataSource.getDictionary() != null) {
return new SortedIndexBasedFilterOperator(predicateEvaluator, dataSource, numDocs);
return new SortedIndexBasedFilterOperator(predicateEvaluator, dataSource, numDocs, nullHandlingEnabled);
}
if (RangeIndexBasedFilterOperator.canEvaluate(predicateEvaluator, dataSource)) {
return new RangeIndexBasedFilterOperator(predicateEvaluator, dataSource, numDocs);
return new RangeIndexBasedFilterOperator(predicateEvaluator, dataSource, numDocs, nullHandlingEnabled);
}
return new ScanBasedFilterOperator(predicateEvaluator, dataSource, numDocs, nullHandlingEnabled);
} else if (predicateType == Predicate.Type.REGEXP_LIKE) {
if (dataSource.getFSTIndex() != null && dataSource.getDataSourceMetadata().isSorted()) {
return new SortedIndexBasedFilterOperator(predicateEvaluator, dataSource, numDocs);
return new SortedIndexBasedFilterOperator(predicateEvaluator, dataSource, numDocs, nullHandlingEnabled);
}
if (dataSource.getFSTIndex() != null && dataSource.getInvertedIndex() != null) {
return new InvertedIndexFilterOperator(predicateEvaluator, dataSource, numDocs);
return new InvertedIndexFilterOperator(predicateEvaluator, dataSource, numDocs, nullHandlingEnabled);
}
return new ScanBasedFilterOperator(predicateEvaluator, dataSource, numDocs, nullHandlingEnabled);
} else {
if (dataSource.getDataSourceMetadata().isSorted() && dataSource.getDictionary() != null) {
return new SortedIndexBasedFilterOperator(predicateEvaluator, dataSource, numDocs);
return new SortedIndexBasedFilterOperator(predicateEvaluator, dataSource, numDocs, nullHandlingEnabled);
}
if (dataSource.getInvertedIndex() != null) {
return new InvertedIndexFilterOperator(predicateEvaluator, dataSource, numDocs);
return new InvertedIndexFilterOperator(predicateEvaluator, dataSource, numDocs, nullHandlingEnabled);
}
if (RangeIndexBasedFilterOperator.canEvaluate(predicateEvaluator, dataSource)) {
return new RangeIndexBasedFilterOperator(predicateEvaluator, dataSource, numDocs);
return new RangeIndexBasedFilterOperator(predicateEvaluator, dataSource, numDocs, nullHandlingEnabled);
}
return new ScanBasedFilterOperator(predicateEvaluator, dataSource, numDocs, nullHandlingEnabled);
}
Expand All @@ -134,7 +134,8 @@ public BaseFilterOperator getAndFilterOperator(QueryContext queryContext, List<B
} else {
// Return the AND filter operator with re-ordered child filter operators
reorderAndFilterChildOperators(queryContext, childFilterOperators);
return new AndFilterOperator(childFilterOperators, queryContext.getQueryOptions());
return new AndFilterOperator(childFilterOperators, queryContext.getQueryOptions(), numDocs,
queryContext.isNullHandlingEnabled());
}
}

Expand All @@ -158,7 +159,8 @@ public BaseFilterOperator getOrFilterOperator(QueryContext queryContext,
return childFilterOperators.get(0);
} else {
// Return the OR filter operator with child filter operators
return new OrFilterOperator(childFilterOperators, numDocs);
return new OrFilterOperator(childFilterOperators, queryContext.getQueryOptions(), numDocs,
queryContext.isNullHandlingEnabled());
}
}

Expand All @@ -171,7 +173,7 @@ public BaseFilterOperator getNotFilterOperator(QueryContext queryContext, BaseFi
return new MatchAllFilterOperator(numDocs);
}

return new NotFilterOperator(filterOperator, numDocs);
return new NotFilterOperator(filterOperator, numDocs, queryContext.isNullHandlingEnabled());
}


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,8 @@
import org.apache.pinot.common.request.context.ExpressionContext;
import org.apache.pinot.common.request.context.predicate.EqPredicate;
import org.apache.pinot.common.request.context.predicate.Predicate;
import org.apache.pinot.core.common.BlockDocIdSet;
import org.apache.pinot.core.common.Operator;
import org.apache.pinot.core.operator.blocks.FilterBlock;
import org.apache.pinot.core.operator.dociditerators.ScanBasedDocIdIterator;
import org.apache.pinot.core.operator.docidsets.BitmapDocIdSet;
import org.apache.pinot.core.query.request.context.QueryContext;
Expand All @@ -53,17 +53,16 @@ public class H3InclusionIndexFilterOperator extends BaseFilterOperator {
private final IndexSegment _segment;
private final QueryContext _queryContext;
private final Predicate _predicate;
private final int _numDocs;
private final H3IndexReader _h3IndexReader;
private final Geometry _geometry;
private final boolean _isPositiveCheck;

public H3InclusionIndexFilterOperator(IndexSegment segment, QueryContext queryContext, Predicate predicate,
int numDocs) {
super(numDocs, false);
_segment = segment;
_queryContext = queryContext;
_predicate = predicate;
_numDocs = numDocs;

List<ExpressionContext> arguments = predicate.getLhs().getFunction().getArguments();
EqPredicate eqPredicate = (EqPredicate) predicate;
Expand All @@ -81,7 +80,7 @@ public H3InclusionIndexFilterOperator(IndexSegment segment, QueryContext queryCo
}

@Override
protected FilterBlock getNextBlock() {
protected BlockDocIdSet getTrues() {
// get the set of H3 cells at the specified resolution which completely cover the input shape and potential cover.
Pair<LongSet, LongSet> fullCoverAndPotentialCoverCells = _queryContext
.getOrComputeSharedValue(Pair.class, LITERAL_H3_CELLS_CACHE_NAME,
Expand Down Expand Up @@ -121,21 +120,20 @@ protected FilterBlock getNextBlock() {
}

/**
* Returns the filter block based on the given the partial match doc ids.
* Returns the filter block document IDs based on the given the partial match doc ids.
*/
private FilterBlock getFilterBlock(MutableRoaringBitmap fullMatchDocIds, MutableRoaringBitmap partialMatchDocIds) {
private BlockDocIdSet getFilterBlock(MutableRoaringBitmap fullMatchDocIds, MutableRoaringBitmap partialMatchDocIds) {
ExpressionFilterOperator expressionFilterOperator =
new ExpressionFilterOperator(_segment, _queryContext, _predicate, _numDocs);
ScanBasedDocIdIterator docIdIterator =
(ScanBasedDocIdIterator) expressionFilterOperator.getNextBlock().getBlockDocIdSet().iterator();
ScanBasedDocIdIterator docIdIterator = (ScanBasedDocIdIterator) expressionFilterOperator.getTrues().iterator();
MutableRoaringBitmap result = docIdIterator.applyAnd(partialMatchDocIds);
result.or(fullMatchDocIds);
return new FilterBlock(new BitmapDocIdSet(result, _numDocs) {
return new BitmapDocIdSet(result, _numDocs) {
@Override
public long getNumEntriesScannedInFilter() {
return docIdIterator.getNumEntriesScanned();
}
});
};
}

@Override
Expand Down
Loading

0 comments on commit d83d1e8

Please sign in to comment.