Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Push join build table values as filter incase of duplicates #12225

Merged
merged 6 commits into from
Jun 14, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -86,8 +86,11 @@ JoinMatcher makeJoinMatcher(
);

/**
* Returns all nonnull values from a particular column if they are all unique, if there are "maxNumValues" or fewer,
* and if the column exists and supports this operation. Otherwise, returns an empty Optional.
* Returns all non-null values from a particular column along with a flag to tell if they are all unique in the column.
* If the non-null values are greater than "maxNumValues" or if the column doesn't exists or doesn't supports this
* operation, returns an object with empty set for column values and false for uniqueness flag.
* The uniqueness flag will only be true if we've collected all non-null values in the column and found that they're
* all unique. In all other cases it will be false.
*
* The returned set may be passed to {@link org.apache.druid.query.filter.InDimFilter}. For efficiency,
* implementations should prefer creating the returned set with
Expand All @@ -96,7 +99,7 @@ JoinMatcher makeJoinMatcher(
* @param columnName name of the column
* @param maxNumValues maximum number of values to return
*/
Optional<Set<String>> getNonNullColumnValuesIfAllUnique(String columnName, int maxNumValues);
ColumnValuesWithUniqueFlag getNonNullColumnValues(String columnName, int maxNumValues);

/**
* Searches a column from this Joinable for a particular value, finds rows that match,
Expand Down Expand Up @@ -125,4 +128,27 @@ Optional<Set<String>> getCorrelatedColumnValues(
long maxCorrelationSetSize,
boolean allowNonKeyColumnSearch
);


class ColumnValuesWithUniqueFlag
{
final Set<String> columnValues;
final boolean allUnique;

public ColumnValuesWithUniqueFlag(Set<String> columnValues, boolean allUnique)
{
this.columnValues = columnValues;
this.allUnique = allUnique;
}

public Set<String> getColumnValues()
{
return columnValues;
}

public boolean isAllUnique()
{
return allUnique;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@
import com.google.common.collect.Sets;
import com.google.common.primitives.Ints;
import org.apache.druid.java.util.common.IAE;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.Pair;
import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.query.Query;
Expand All @@ -38,6 +37,7 @@
import org.apache.druid.query.planning.DataSourceAnalysis;
import org.apache.druid.query.planning.PreJoinableClause;
import org.apache.druid.segment.SegmentReference;
import org.apache.druid.segment.filter.FalseFilter;
import org.apache.druid.segment.filter.Filters;
import org.apache.druid.segment.join.filter.JoinFilterAnalyzer;
import org.apache.druid.segment.join.filter.JoinFilterPreAnalysis;
Expand Down Expand Up @@ -228,16 +228,19 @@ static Pair<List<Filter>, List<JoinableClause>> convertJoinsToFilters(
columnsRequiredByJoinClauses.remove(column, 1);
}

final Optional<Filter> filter =
final JoinClauseToFilterConversion joinClauseToFilterConversion =
convertJoinToFilter(
clause,
Sets.union(requiredColumns, columnsRequiredByJoinClauses.elementSet()),
maxNumFilterValues
);

if (filter.isPresent()) {
filterList.add(filter.get());
} else {
// add the converted filter to the filter list
if (joinClauseToFilterConversion.getConvertedFilter() != null) {
filterList.add(joinClauseToFilterConversion.getConvertedFilter());
}
// if the converted filter is partial, keep the join clause too
if (!joinClauseToFilterConversion.isJoinClauseFullyConverted()) {
clausesToUse.add(clause);
atStart = false;
}
Expand All @@ -246,11 +249,6 @@ static Pair<List<Filter>, List<JoinableClause>> convertJoinsToFilters(
}
}

// Sanity check. If this exception is ever thrown, it's a bug.
if (filterList.size() + clausesToUse.size() != clauses.size()) {
throw new ISE("Lost a join clause during planning");
}

return Pair.of(filterList, clausesToUse);
}

Expand All @@ -260,11 +258,17 @@ static Pair<List<Filter>, List<JoinableClause>> convertJoinsToFilters(
* The requirements are:
*
* - it must be an INNER equi-join
* - the right-hand columns referenced by the condition must not have any duplicate values
* - the right-hand columns referenced by the condition must not have any duplicate values. If there are duplicates
* values in the column, then the join is tried to be converted to a filter while maintaining the join clause on top
* as well for correct results.
* - no columns from the right-hand side can appear in "requiredColumns"
*
* @return {@link JoinClauseToFilterConversion} object which contains the converted filter for the clause and a boolean
* to represent whether the converted filter encapsulates the whole clause or not. More semantics of the object are
* present in the class level docs.
*/
@VisibleForTesting
static Optional<Filter> convertJoinToFilter(
static JoinClauseToFilterConversion convertJoinToFilter(
final JoinableClause clause,
final Set<String> requiredColumns,
final int maxNumFilterValues
Expand All @@ -276,28 +280,74 @@ static Optional<Filter> convertJoinToFilter(
&& clause.getCondition().getEquiConditions().size() > 0) {
final List<Filter> filters = new ArrayList<>();
int numValues = maxNumFilterValues;
boolean joinClauseFullyConverted = true;

for (final Equality condition : clause.getCondition().getEquiConditions()) {
final String leftColumn = condition.getLeftExpr().getBindingIfIdentifier();

if (leftColumn == null) {
return Optional.empty();
return new JoinClauseToFilterConversion(null, false);
}

final Optional<Set<String>> columnValuesForFilter =
clause.getJoinable().getNonNullColumnValuesIfAllUnique(condition.getRightColumn(), numValues);
Joinable.ColumnValuesWithUniqueFlag columnValuesWithUniqueFlag =
clause.getJoinable().getNonNullColumnValues(condition.getRightColumn(), numValues);
// For an empty values set, isAllUnique flag will be true only if the column had no non-null values.
if (columnValuesWithUniqueFlag.getColumnValues().isEmpty()) {
if (columnValuesWithUniqueFlag.isAllUnique()) {
return new JoinClauseToFilterConversion(FalseFilter.instance(), true);
} else {
joinClauseFullyConverted = false;
}
continue;
}

if (columnValuesForFilter.isPresent()) {
numValues -= columnValuesForFilter.get().size();
filters.add(Filters.toFilter(new InDimFilter(leftColumn, columnValuesForFilter.get())));
} else {
return Optional.empty();
numValues -= columnValuesWithUniqueFlag.getColumnValues().size();
filters.add(Filters.toFilter(new InDimFilter(leftColumn, columnValuesWithUniqueFlag.getColumnValues())));
if (!columnValuesWithUniqueFlag.isAllUnique()) {
joinClauseFullyConverted = false;
}
}

return Optional.of(Filters.and(filters));
return new JoinClauseToFilterConversion(Filters.maybeAnd(filters).orElse(null), joinClauseFullyConverted);
}

return Optional.empty();
return new JoinClauseToFilterConversion(null, false);
}

/**
* Encapsulates the conversion which happened for a joinable clause.
* convertedFilter represents the filter which got generated from the conversion.
* joinClauseFullyConverted represents whether convertedFilter fully encapsulated the joinable clause or not.
* Encapsulation of the clause means that the filter can replace the whole joinable clause.
*
* If convertedFilter is null and joinClauseFullyConverted is true, it means that all parts of the joinable clause can
* be broken into filters. Further, all the clause conditions are on columns where the right side is only null values.
* In that case, we replace joinable with a FalseFilter.
* If convertedFilter is null and joinClauseFullyConverted is false, it means that no parts of the joinable clause can
* be broken into filters.
* If convertedFilter is non-null, then joinClauseFullyConverted represents whether the filter encapsulates the clause
* which was converted.
*/
private static class JoinClauseToFilterConversion
{
private final @Nullable Filter convertedFilter;
private final boolean joinClauseFullyConverted;

public JoinClauseToFilterConversion(@Nullable Filter convertedFilter, boolean joinClauseFullyConverted)
{
this.convertedFilter = convertedFilter;
this.joinClauseFullyConverted = joinClauseFullyConverted;
}

@Nullable
public Filter getConvertedFilter()
{
return convertedFilter;
}

public boolean isJoinClauseFullyConverted()
{
return joinClauseFullyConverted;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ public JoinMatcher makeJoinMatcher(
}

@Override
public Optional<Set<String>> getNonNullColumnValuesIfAllUnique(String columnName, int maxNumValues)
public ColumnValuesWithUniqueFlag getNonNullColumnValues(String columnName, int maxNumValues)
{
if (LookupColumnSelectorFactory.KEY_COLUMN.equals(columnName) && extractor.canGetKeySet()) {
final Set<String> keys = extractor.keySet();
Expand All @@ -117,14 +117,14 @@ public Optional<Set<String>> getNonNullColumnValuesIfAllUnique(String columnName
}

if (nonNullKeys > maxNumValues) {
return Optional.empty();
return new ColumnValuesWithUniqueFlag(ImmutableSet.of(), false);
} else if (nonNullKeys == keys.size()) {
return Optional.of(keys);
return new ColumnValuesWithUniqueFlag(keys, true);
} else {
return Optional.of(Sets.difference(keys, nullEquivalentValues));
return new ColumnValuesWithUniqueFlag(Sets.difference(keys, nullEquivalentValues), true);
}
} else {
return Optional.empty();
return new ColumnValuesWithUniqueFlag(ImmutableSet.of(), false);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

package org.apache.druid.segment.join.table;

import com.google.common.collect.ImmutableSet;
import it.unimi.dsi.fastutil.ints.IntList;
import org.apache.druid.common.config.NullHandling;
import org.apache.druid.java.util.common.guava.Comparators;
Expand Down Expand Up @@ -92,35 +93,36 @@ public JoinMatcher makeJoinMatcher(
}

@Override
public Optional<Set<String>> getNonNullColumnValuesIfAllUnique(final String columnName, final int maxNumValues)
public ColumnValuesWithUniqueFlag getNonNullColumnValues(String columnName, final int maxNumValues)
{
final int columnPosition = table.rowSignature().indexOf(columnName);

if (columnPosition < 0) {
return Optional.empty();
return new ColumnValuesWithUniqueFlag(ImmutableSet.of(), false);
}

try (final IndexedTable.Reader reader = table.columnReader(columnPosition)) {
// Sorted set to encourage "in" filters that result from this method to do dictionary lookups in order.
// The hopes are that this will improve locality and therefore improve performance.
final Set<String> allValues = createValuesSet();
boolean allUnique = true;

for (int i = 0; i < table.numRows(); i++) {
final String s = DimensionHandlerUtils.convertObjectToString(reader.read(i));

if (!NullHandling.isNullOrEquivalent(s)) {
if (!allValues.add(s)) {
// Duplicate found. Since the values are not all unique, we must return an empty Optional.
return Optional.empty();
// Duplicate found
allUnique = false;
}

if (allValues.size() > maxNumValues) {
return Optional.empty();
return new ColumnValuesWithUniqueFlag(ImmutableSet.of(), false);
}
}
}

return Optional.of(allValues);
return new ColumnValuesWithUniqueFlag(allValues, allUnique);
}
catch (IOException e) {
throw new RuntimeException(e);
Expand Down
Loading