Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Push join build table values as filter incase of duplicates #12225

Merged
merged 6 commits into from
Jun 14, 2022
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -86,13 +86,14 @@ JoinMatcher makeJoinMatcher(
);

/**
* Returns all nonnull values from a particular column if they are all unique, if there are "maxNumValues" or fewer,
* and if the column exists and supports this operation. Otherwise, returns an empty Optional.
* Returns all nonnull values from a particular column along with a flag to tell if they are all unique,
* if there are "maxNumValues" or fewer, and if the column exists and supports this operation.
* Otherwise, returns an empty Optional.
*
* @param columnName name of the column
* @param maxNumValues maximum number of values to return
*/
Optional<Set<String>> getNonNullColumnValuesIfAllUnique(String columnName, int maxNumValues);
ColumnValuesWithUniqueFlag getNonNullColumnValues(String columnName, int maxNumValues);

/**
* Searches a column from this Joinable for a particular value, finds rows that match,
Expand All @@ -116,4 +117,27 @@ Optional<Set<String>> getCorrelatedColumnValues(
long maxCorrelationSetSize,
boolean allowNonKeyColumnSearch
);


class ColumnValuesWithUniqueFlag
{
final Set<String> columnValues;
final boolean allUnique;

public ColumnValuesWithUniqueFlag(Set<String> columnValues, boolean allUnique)
{
this.columnValues = columnValues;
this.allUnique = allUnique;
}

public Set<String> getColumnValues()
{
return columnValues;
}

public boolean isAllUnique()
{
return allUnique;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@
import com.google.common.collect.Sets;
import com.google.common.primitives.Ints;
import org.apache.druid.java.util.common.IAE;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.NonnullPair;
import org.apache.druid.java.util.common.Pair;
import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.query.Query;
Expand Down Expand Up @@ -228,16 +228,15 @@ static Pair<List<Filter>, List<JoinableClause>> convertJoinsToFilters(
columnsRequiredByJoinClauses.remove(column, 1);
}

final Optional<Filter> filter =
final NonnullPair<Optional<Filter>, Boolean> filter =
convertJoinToFilter(
clause,
Sets.union(requiredColumns, columnsRequiredByJoinClauses.elementSet()),
maxNumFilterValues
);

if (filter.isPresent()) {
filterList.add(filter.get());
} else {
filter.lhs.ifPresent(filterList::add);
if (!filter.rhs) {
clausesToUse.add(clause);
atStart = false;
}
Expand All @@ -246,11 +245,6 @@ static Pair<List<Filter>, List<JoinableClause>> convertJoinsToFilters(
}
}

// Sanity check. If this exception is ever thrown, it's a bug.
if (filterList.size() + clausesToUse.size() != clauses.size()) {
throw new ISE("Lost a join clause during planning");
}

return Pair.of(filterList, clausesToUse);
}

Expand All @@ -262,9 +256,11 @@ static Pair<List<Filter>, List<JoinableClause>> convertJoinsToFilters(
* - it must be an INNER equi-join
* - the right-hand columns referenced by the condition must not have any duplicate values
* - no columns from the right-hand side can appear in "requiredColumns"
* @return a pair of filter extracted from the joinable clause, and a boolean to indicate whether the filter encompasses
* the whole joinable clause
*/
@VisibleForTesting
static Optional<Filter> convertJoinToFilter(
static NonnullPair<Optional<Filter>, Boolean> convertJoinToFilter(
final JoinableClause clause,
final Set<String> requiredColumns,
final int maxNumFilterValues
Expand All @@ -276,28 +272,31 @@ static Optional<Filter> convertJoinToFilter(
&& clause.getCondition().getEquiConditions().size() > 0) {
final List<Filter> filters = new ArrayList<>();
int numValues = maxNumFilterValues;
boolean dropClause = true;

for (final Equality condition : clause.getCondition().getEquiConditions()) {
final String leftColumn = condition.getLeftExpr().getBindingIfIdentifier();

if (leftColumn == null) {
return Optional.empty();
return new NonnullPair<>(Optional.empty(), false);
}

final Optional<Set<String>> columnValuesForFilter =
clause.getJoinable().getNonNullColumnValuesIfAllUnique(condition.getRightColumn(), numValues);
Joinable.ColumnValuesWithUniqueFlag columnValuesWithUniqueFlag = clause.getJoinable().getNonNullColumnValues(condition.getRightColumn(), numValues);
if (columnValuesWithUniqueFlag.getColumnValues().isEmpty()) {
dropClause = false;
continue;
}

if (columnValuesForFilter.isPresent()) {
numValues -= columnValuesForFilter.get().size();
filters.add(Filters.toFilter(new InDimFilter(leftColumn, columnValuesForFilter.get())));
} else {
return Optional.empty();
numValues -= columnValuesWithUniqueFlag.getColumnValues().size();
filters.add(Filters.toFilter(new InDimFilter(leftColumn, columnValuesWithUniqueFlag.getColumnValues())));
if (!columnValuesWithUniqueFlag.isAllUnique()) {
dropClause = false;
}
}

return Optional.of(Filters.and(filters));
return new NonnullPair<>(filters.isEmpty() ? Optional.empty() : Optional.of(Filters.and(filters)), dropClause);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Will we ever get filters = Optional.empty() and dropClause = true? If so what is the expectation in that case?

IMO, the NonnullPair is tough to think about here, so a special class would be better. That way it could have some javadocs about the expectations. Its constructor should also do any relevant invariant checks.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Will we ever get filters = Optional.empty() and dropClause = true? If so what is the expectation in that case?

Yes, that can happen if the joinableClause has conditions on columns which only have nulls in it. Currently, in such a case I add a FalseFilter in place of the joinable clause - does that seem ok?

IMO, the NonnullPair is tough to think about here, so a special class would be better. That way it could have some javadocs about the expectations. Its constructor should also do any relevant invariant checks.

Yes, completely agree. 👍 Thanks for the suggestion - I've made a holder instead of a pair and added documentation regarding its semantics.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, that can happen if the joinableClause has conditions on columns which only have nulls in it. Currently, in such a case I add a FalseFilter in place of the joinable clause - does that seem ok?

It makes sense to me. If the behavior is described in javadocs for the new holder class, that's even better.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👍 thanks for suggestion - added to the javadocs for holder class

}

return Optional.empty();
return new NonnullPair<>(Optional.empty(), false);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ public JoinMatcher makeJoinMatcher(
}

@Override
public Optional<Set<String>> getNonNullColumnValuesIfAllUnique(String columnName, int maxNumValues)
public ColumnValuesWithUniqueFlag getNonNullColumnValues(String columnName, int maxNumValues)
{
if (LookupColumnSelectorFactory.KEY_COLUMN.equals(columnName) && extractor.canGetKeySet()) {
final Set<String> keys = extractor.keySet();
Expand All @@ -117,14 +117,14 @@ public Optional<Set<String>> getNonNullColumnValuesIfAllUnique(String columnName
}

if (nonNullKeys > maxNumValues) {
return Optional.empty();
return new ColumnValuesWithUniqueFlag(ImmutableSet.of(), true);
} else if (nonNullKeys == keys.size()) {
return Optional.of(keys);
return new ColumnValuesWithUniqueFlag(keys, true);
} else {
return Optional.of(Sets.difference(keys, nullEquivalentValues));
return new ColumnValuesWithUniqueFlag(Sets.difference(keys, nullEquivalentValues), true);
}
} else {
return Optional.empty();
return new ColumnValuesWithUniqueFlag(ImmutableSet.of(), true);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

package org.apache.druid.segment.join.table;

import com.google.common.collect.ImmutableSet;
import it.unimi.dsi.fastutil.ints.IntList;
import org.apache.druid.common.config.NullHandling;
import org.apache.druid.java.util.common.guava.Comparators;
Expand Down Expand Up @@ -93,12 +94,12 @@ public JoinMatcher makeJoinMatcher(
}

@Override
public Optional<Set<String>> getNonNullColumnValuesIfAllUnique(final String columnName, final int maxNumValues)
public ColumnValuesWithUniqueFlag getNonNullColumnValues(String columnName, final int maxNumValues)
{
final int columnPosition = table.rowSignature().indexOf(columnName);

if (columnPosition < 0) {
return Optional.empty();
return new ColumnValuesWithUniqueFlag(ImmutableSet.of(), true);
}

try (final IndexedTable.Reader reader = table.columnReader(columnPosition)) {
Expand All @@ -108,23 +109,24 @@ public Optional<Set<String>> getNonNullColumnValuesIfAllUnique(final String colu
// Note: we are using Comparators.naturalNullsFirst() because it prevents the need for lambda-wrapping in
// InDimFilter's "createStringPredicate" method.
final Set<String> allValues = new TreeSet<>(Comparators.naturalNullsFirst());
boolean allUnique = true;

for (int i = 0; i < table.numRows(); i++) {
final String s = DimensionHandlerUtils.convertObjectToString(reader.read(i));

if (!NullHandling.isNullOrEquivalent(s)) {
if (!allValues.add(s)) {
// Duplicate found. Since the values are not all unique, we must return an empty Optional.
return Optional.empty();
allUnique = false;
}

if (allValues.size() > maxNumValues) {
return Optional.empty();
return new ColumnValuesWithUniqueFlag(ImmutableSet.of(), true);
}
}
}

return Optional.of(allValues);
return new ColumnValuesWithUniqueFlag(allValues, allUnique);
}
catch (IOException e) {
throw new RuntimeException(e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,13 +26,15 @@
import com.google.common.collect.Sets;
import org.apache.druid.common.config.NullHandling;
import org.apache.druid.common.config.NullHandlingTest;
import org.apache.druid.java.util.common.DateTimes;
import org.apache.druid.java.util.common.IAE;
import org.apache.druid.java.util.common.Intervals;
import org.apache.druid.java.util.common.Pair;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.math.expr.ExprMacroTable;
import org.apache.druid.query.DataSource;
import org.apache.druid.query.GlobalTableDataSource;
import org.apache.druid.query.InlineDataSource;
import org.apache.druid.query.LookupDataSource;
import org.apache.druid.query.TableDataSource;
import org.apache.druid.query.TestQuery;
Expand All @@ -45,7 +47,12 @@
import org.apache.druid.query.planning.PreJoinableClause;
import org.apache.druid.query.spec.MultipleIntervalSegmentSpec;
import org.apache.druid.segment.SegmentReference;
import org.apache.druid.segment.column.ColumnType;
import org.apache.druid.segment.column.RowSignature;
import org.apache.druid.segment.join.lookup.LookupJoinable;
import org.apache.druid.segment.join.table.IndexedTable;
import org.apache.druid.segment.join.table.IndexedTableJoinable;
import org.apache.druid.segment.join.table.RowBasedIndexedTable;
import org.easymock.EasyMock;
import org.junit.Assert;
import org.junit.Rule;
Expand All @@ -61,6 +68,7 @@
import java.util.Set;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Function;
import java.util.stream.Collectors;

public class JoinableFactoryWrapperTest extends NullHandlingTest
{
Expand All @@ -81,6 +89,25 @@ public class JoinableFactoryWrapperTest extends NullHandlingTest
? TEST_LOOKUP.keySet()
: Sets.difference(TEST_LOOKUP.keySet(), Collections.singleton(""));

private static final InlineDataSource INDEXED_TABLE_DS = InlineDataSource.fromIterable(
ImmutableList.of(
new Object[]{"Mexico"},
new Object[]{"Norway"},
new Object[]{"El Salvador"},
new Object[]{"United States"},
new Object[]{"United States"}
),
RowSignature.builder().add("country", ColumnType.STRING).build()
);

private static final IndexedTable TEST_INDEXED_TABLE = new RowBasedIndexedTable<>(
INDEXED_TABLE_DS.getRowsAsList(),
INDEXED_TABLE_DS.rowAdapter(),
INDEXED_TABLE_DS.getRowSignature(),
ImmutableSet.of("country"),
DateTimes.nowUtc().toString()
);

@Rule
public ExpectedException expectedException = ExpectedException.none();

Expand Down Expand Up @@ -467,6 +494,33 @@ public void test_convertJoinsToFilters_convertInnerJoin()
);
}

@Test
public void test_convertJoinsToPartialFilters_convertInnerJoin()
{
JoinableClause joinableClause = new JoinableClause(
"j.",
new IndexedTableJoinable(TEST_INDEXED_TABLE),
JoinType.INNER,
JoinConditionAnalysis.forExpression("x == \"j.country\"", "j.", ExprMacroTable.nil())
);
final Pair<List<Filter>, List<JoinableClause>> conversion = JoinableFactoryWrapper.convertJoinsToFilters(
ImmutableList.of(joinableClause),
ImmutableSet.of("x"),
Integer.MAX_VALUE
);

Assert.assertEquals(
Pair.of(
ImmutableList.of(new InDimFilter(
"x",
INDEXED_TABLE_DS.getRowsAsList().stream().map(row -> row[0].toString()).collect(Collectors.toSet()))
),
ImmutableList.of(joinableClause) // the joinable clause remains intact since we've duplicates in country column
),
conversion
);
}

@Test
public void test_convertJoinsToFilters_convertTwoInnerJoins()
{
Expand Down Expand Up @@ -506,6 +560,51 @@ public void test_convertJoinsToFilters_convertTwoInnerJoins()
);
}

@Test
public void test_convertJoinsToPartialAndFullFilters_convertMultipleInnerJoins()
{
final ImmutableList<JoinableClause> clauses = ImmutableList.of(
new JoinableClause(
"j.",
LookupJoinable.wrap(new MapLookupExtractor(TEST_LOOKUP, false)),
JoinType.INNER,
JoinConditionAnalysis.forExpression("x == \"j.k\"", "j.", ExprMacroTable.nil())
), // this joinable will be fully converted to a filter
new JoinableClause(
"_j.",
new IndexedTableJoinable(TEST_INDEXED_TABLE),
JoinType.INNER,
JoinConditionAnalysis.forExpression("x == \"_j.country\"", "_j.", ExprMacroTable.nil())
), // this joinable will be partially converted to a filter since we've duplicates on country column
new JoinableClause(
"__j.",
new IndexedTableJoinable(TEST_INDEXED_TABLE),
JoinType.LEFT,
JoinConditionAnalysis.forExpression("x == \"__j.country\"", "__j.", ExprMacroTable.nil())
) // this joinable will not be converted to filter since its a LEFT join
);

final Pair<List<Filter>, List<JoinableClause>> conversion = JoinableFactoryWrapper.convertJoinsToFilters(
clauses,
ImmutableSet.of("x"),
Integer.MAX_VALUE
);

Assert.assertEquals(
Pair.of(
ImmutableList.of(
new InDimFilter("x", TEST_LOOKUP_KEYS),
new InDimFilter(
"x",
INDEXED_TABLE_DS.getRowsAsList().stream().map(row -> row[0].toString()).collect(Collectors.toSet())
)
),
ImmutableList.of(clauses.get(1), clauses.get(2))
),
conversion
);
}

@Test
public void test_convertJoinsToFilters_dontConvertTooManyValues()
{
Expand Down
Loading