Guard against exponential increase of filters during CNF conversion #12314

Merged (2 commits) on Mar 9, 2022
Changes from all commits

========================================
@@ -66,6 +66,7 @@
import org.apache.druid.segment.filter.Filters;
import org.apache.druid.segment.filter.OrFilter;
import org.apache.druid.segment.filter.SelectorFilter;
+import org.apache.druid.segment.filter.cnf.CNFFilterExplosionException;
import org.apache.druid.segment.generator.DataGenerator;
import org.apache.druid.segment.generator.GeneratorBasicSchemas;
import org.apache.druid.segment.generator.GeneratorSchemaInfo;

@@ -370,7 +371,7 @@ public void readOrFilter(Blackhole blackhole)
@Benchmark
@BenchmarkMode(Mode.AverageTime)
@OutputTimeUnit(TimeUnit.MICROSECONDS)
-public void readOrFilterCNF(Blackhole blackhole)
+public void readOrFilterCNF(Blackhole blackhole) throws CNFFilterExplosionException
{
Filter filter = new NoBitmapSelectorFilter("dimSequential", "199");
Filter filter2 = new AndFilter(Arrays.asList(new SelectorFilter("dimMultivalEnumerated2", "Corundum"), new NoBitmapSelectorFilter("dimMultivalEnumerated", "Bar")));

@@ -421,7 +422,7 @@ public void readComplexOrFilter(Blackhole blackhole)
@Benchmark
@BenchmarkMode(Mode.AverageTime)
@OutputTimeUnit(TimeUnit.MICROSECONDS)
-public void readComplexOrFilterCNF(Blackhole blackhole)
+public void readComplexOrFilterCNF(Blackhole blackhole) throws CNFFilterExplosionException
{
DimFilter dimFilter1 = new OrDimFilter(Arrays.asList(
new SelectorDimFilter("dimSequential", "199", null),

========================================
@@ -45,6 +45,7 @@
import org.apache.druid.segment.column.ColumnHolder;
import org.apache.druid.segment.data.CloseableIndexed;
import org.apache.druid.segment.data.Indexed;
+import org.apache.druid.segment.filter.cnf.CNFFilterExplosionException;
import org.apache.druid.segment.filter.cnf.CalciteCnfHelper;
import org.apache.druid.segment.filter.cnf.HiveCnfHelper;
import org.apache.druid.segment.join.filter.AllNullColumnSelectorFactory;

@@ -433,10 +434,15 @@ public static Filter convertToCNFFromQueryContext(Query query, @Nullable Filter
return null;
}
boolean useCNF = query.getContextBoolean(QueryContexts.USE_FILTER_CNF_KEY, QueryContexts.DEFAULT_USE_FILTER_CNF);
-return useCNF ? Filters.toCnf(filter) : filter;
+try {
+return useCNF ? Filters.toCnf(filter) : filter;
+}
+catch (CNFFilterExplosionException cnfFilterExplosionException) {
+return filter; // cannot convert to CNF, return the filter as is

cryptoe (Contributor), Mar 9, 2022:
I would just put a log that we are not converting to CNF due to `CNFFilterExplosionException`, so that people debugging the query know what's happening.

Contributor:
For clusters where this happens regularly, that can generate a lot of log spew. You could argue that they should respond to the log spew by not doing those queries, but it would still be a sudden chunk of log spew. In general, CNF is a potential optimization done by the planner; we don't log other optimization choices that we make, we just make them. In this case, the planner is deeming that converting to CNF is more expensive than leaving the filter as is, which is yet another optimization decision. I don't think it needs a log, or if it does, maybe at debug level.

Contributor:
I understand where you're coming from. In such a case, I would then tend towards not throwing an exception, and instead refactoring Filters.toCnf(filter) to return an appropriate state that tells us whether the conversion was successful. Maybe use Optional.of() so that we avoid the performance overhead of throwing exceptions. We can take this up as future work, maybe when we make this limit configurable.
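
A minimal sketch of that Optional-based shape (hypothetical `tryToCnf` name; built on top of this PR's throwing `Filters.toCnf` for brevity, whereas a real refactor would thread the limit through `convertToCnfWithLimit` instead of catching):

```java
import java.util.Optional;

import org.apache.druid.query.filter.Filter;
import org.apache.druid.segment.filter.Filters;
import org.apache.druid.segment.filter.cnf.CNFFilterExplosionException;

public final class CnfConversion
{
  // Empty result means CNF conversion was abandoned because it would exceed
  // the clause limit; each caller picks its own fallback policy.
  public static Optional<Filter> tryToCnf(Filter filter)
  {
    try {
      return Optional.of(Filters.toCnf(filter));
    }
    catch (CNFFilterExplosionException e) {
      return Optional.empty();
    }
  }
}

// Usage: Filter effective = CnfConversion.tryToCnf(filter).orElse(filter);
```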

+}
}

-public static Filter toCnf(Filter current)
+public static Filter toCnf(Filter current) throws CNFFilterExplosionException
{
// Push down NOT filters to leaves if possible to remove NOT on NOT filters and reduce hierarchy.
// ex) ~(a OR ~b) => ~a AND b

@@ -578,7 +584,7 @@ public static Optional<Filter> maybeOr(final List<Filter> filters)
*
* @return The normalized or clauses for the provided filter.
*/
-public static List<Filter> toNormalizedOrClauses(Filter filter)
+public static List<Filter> toNormalizedOrClauses(Filter filter) throws CNFFilterExplosionException
{
Filter normalizedFilter = Filters.toCnf(filter);

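
Note that CNF conversion stays opt-in per query: convertToCNFFromQueryContext above only attempts it when the flag named by QueryContexts.USE_FILTER_CNF_KEY is set in the query context. A hedged sketch of opting in (assuming the constant resolves to the "useFilterCNF" key, as its name suggests):

```java
import com.google.common.collect.ImmutableMap;

import java.util.Map;

public class UseFilterCnfContext
{
  public static void main(String[] args)
  {
    // With this PR, opting in is safe even for pathological filters:
    // conversion falls back to the original filter rather than exploding.
    Map<String, Object> queryContext = ImmutableMap.of("useFilterCNF", true);
    System.out.println(queryContext); // {useFilterCNF=true}
  }
}
```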
========================================
@@ -0,0 +1,30 @@ (new file)
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.druid.segment.filter.cnf;

import org.apache.druid.java.util.common.StringUtils;

public class CNFFilterExplosionException extends Exception

Member:
Should this be a BadQueryException? Since this is user-controllable by setting a context flag, I think doing so would result in it being a user error when it eventually surfaces.

Member Author:
Yes, BadQueryException makes sense, but one problem is that it is an unchecked exception. I was trying to make this a checked exception so that callers must decide how to handle it. What do you think?

Member:
I think that makes sense. Looking closer, I see now that these exceptions are always eaten and don't make it to the user; I admittedly just did a drive-by scan of this PR earlier and thought they might make it to the surface 😅

Though thinking about it, maybe it is worth a log of some sort to know that this is happening, so that there is some indicator of why the expected query behavior isn't happening?

Member:
I guess a log might be kind of noisy if this was coming from the SQL planner, and it would need to be collected and printed at the end instead. We have a mechanism for collecting errors this way, but not for informative messages; this could be done at a later time, I think (if it is actually useful to have a way to know why the planner did or didn't do a thing that was asked for).

Member Author:
Yes, I agree with that; it would make more sense to have something in a per-query structure which is shown to the user.

{
public CNFFilterExplosionException(String formatText, Object... arguments)
{
super(StringUtils.nonStrictFormat(formatText, arguments));
}
}
========================================
@@ -19,6 +19,7 @@

package org.apache.druid.segment.filter.cnf;

+import org.apache.druid.java.util.common.NonnullPair;
import org.apache.druid.query.filter.BooleanFilter;
import org.apache.druid.query.filter.Filter;
import org.apache.druid.segment.filter.AndFilter;

@@ -37,6 +38,8 @@
*/
public class HiveCnfHelper
{
+private static final int CNF_MAX_FILTER_THRESHOLD = 10_000;
+
public static Filter pushDownNot(Filter current)
{
if (current instanceof NotFilter) {

@@ -78,17 +81,37 @@ public static Filter pushDownNot(Filter current)
return current;
}

-public static Filter convertToCnf(Filter current)
+public static Filter convertToCnf(Filter current) throws CNFFilterExplosionException
{
+return convertToCnfWithLimit(current, CNF_MAX_FILTER_THRESHOLD).lhs;
+}
+
+/**
+ * Converts a filter to CNF form, with a limit on the filter count.
+ *
+ * @param maxCNFFilterLimit the maximum number of filters allowed in the CNF conversion
+ * @return a pair of the CNF-converted filter and the new remaining filter limit
+ * @throws CNFFilterExplosionException if the number of filters in the CNF representation would go beyond maxCNFFilterLimit
+ */
+private static NonnullPair<Filter, Integer> convertToCnfWithLimit(
+Filter current,
+int maxCNFFilterLimit
+) throws CNFFilterExplosionException
+{
if (current instanceof NotFilter) {
-return new NotFilter(convertToCnf(((NotFilter) current).getBaseFilter()));
+NonnullPair<Filter, Integer> result = convertToCnfWithLimit(((NotFilter) current).getBaseFilter(), maxCNFFilterLimit);
+return new NonnullPair<>(new NotFilter(result.lhs), result.rhs);
}
if (current instanceof AndFilter) {
List<Filter> children = new ArrayList<>();
for (Filter child : ((AndFilter) current).getFilters()) {
-children.add(convertToCnf(child));
+NonnullPair<Filter, Integer> result = convertToCnfWithLimit(child, maxCNFFilterLimit);
+children.add(result.lhs);
+maxCNFFilterLimit = result.rhs;
+if (maxCNFFilterLimit < 0) {
+throw new CNFFilterExplosionException("Exceeded maximum allowed filters for CNF (conjunctive normal form) conversion");
+}
}
-return Filters.and(children);
+return new NonnullPair<>(Filters.and(children), maxCNFFilterLimit);
}
if (current instanceof OrFilter) {
// a list of leaves that weren't under AND expressions

@@ -107,11 +130,11 @@ public static Filter convertToCnf(Filter current)
}
if (!andList.isEmpty()) {
List<Filter> result = new ArrayList<>();
-generateAllCombinations(result, andList, nonAndList);
-return Filters.and(result);
+generateAllCombinations(result, andList, nonAndList, maxCNFFilterLimit);
+return new NonnullPair<>(Filters.and(result), maxCNFFilterLimit - result.size());
}
}
-return current;
+return new NonnullPair<>(current, maxCNFFilterLimit);
}

public static Filter flatten(Filter root)

@@ -158,8 +181,9 @@ public static Filter flatten(Filter root)
private static void generateAllCombinations(
List<Filter> result,
List<Filter> andList,
-List<Filter> nonAndList
-)
+List<Filter> nonAndList,
+int maxAllowedFilters
+) throws CNFFilterExplosionException
{
List<Filter> children = new ArrayList<>(((AndFilter) andList.get(0)).getFilters());
if (result.isEmpty()) {

@@ -168,6 +192,9 @@ private static void generateAllCombinations(
a.add(child);
// Result must receive an actual OrFilter, so wrap if Filters.or managed to un-OR it.
result.add(idempotentOr(Filters.or(a)));
+if (result.size() > maxAllowedFilters) {
+throw new CNFFilterExplosionException("Exceeded maximum allowed filters for CNF (conjunctive normal form) conversion");
+}
}
} else {
List<Filter> work = new ArrayList<>(result);

@@ -178,11 +205,14 @@ private static void generateAllCombinations(
a.add(child);
// Result must receive an actual OrFilter.
result.add(idempotentOr(Filters.or(a)));
+if (result.size() > maxAllowedFilters) {
+throw new CNFFilterExplosionException("Exceeded maximum allowed filters for CNF (conjunctive normal form) conversion");
+}
}
}
}
if (andList.size() > 1) {
-generateAllCombinations(result, andList.subList(1, andList.size()), nonAndList);
+generateAllCombinations(result, andList.subList(1, andList.size()), nonAndList, maxAllowedFilters);
}
}

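
Why the guard exists: distributing OR over AND is exponential. OR-ing n two-term AND filters yields 2^n CNF clauses, one for every way of choosing a single term from each disjunct. A standalone illustration of the growth (not part of the PR):

```java
public class CnfBlowupDemo
{
  public static void main(String[] args)
  {
    // (a1 AND b1) OR (a2 AND b2) OR ... OR (an AND bn) in CNF is the AND of
    // all clauses (x1 OR x2 OR ... OR xn), where each xi is either ai or bi.
    for (int n = 10; n <= 17; n++) {
      long clauses = 1L << n; // 2^n clauses, each with n terms
      System.out.printf("%2d two-term disjuncts -> %,d CNF clauses%n", n, clauses);
    }
    // 2^14 = 16,384 already exceeds CNF_MAX_FILTER_THRESHOLD (10_000), so
    // convertToCnfWithLimit gives up with CNFFilterExplosionException
    // instead of materializing the clauses.
  }
}
```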
========================================
@@ -31,6 +31,7 @@
import org.apache.druid.segment.filter.Filters;
import org.apache.druid.segment.filter.OrFilter;
import org.apache.druid.segment.filter.SelectorFilter;
+import org.apache.druid.segment.filter.cnf.CNFFilterExplosionException;
import org.apache.druid.segment.virtual.ExpressionVirtualColumn;

import javax.annotation.Nullable;

@@ -101,7 +102,13 @@ public static JoinFilterPreAnalysis computeJoinFilterPreAnalysis(final JoinFilte
return preAnalysisBuilder.build();
}

-List<Filter> normalizedOrClauses = Filters.toNormalizedOrClauses(key.getFilter());
+List<Filter> normalizedOrClauses;
+try {
+normalizedOrClauses = Filters.toNormalizedOrClauses(key.getFilter());
+}
+catch (CNFFilterExplosionException cnfFilterExplosionException) {
+return preAnalysisBuilder.build(); // disable the filter pushdown and rewrite optimization
+}

List<Filter> normalizedBaseTableClauses = new ArrayList<>();
List<Filter> normalizedJoinTableClauses = new ArrayList<>();

========================================
@@ -81,6 +81,7 @@
import org.apache.druid.segment.data.ConciseBitmapSerdeFactory;
import org.apache.druid.segment.data.IndexedInts;
import org.apache.druid.segment.data.RoaringBitmapSerdeFactory;
+import org.apache.druid.segment.filter.cnf.CNFFilterExplosionException;
import org.apache.druid.segment.incremental.IncrementalIndex;
import org.apache.druid.segment.incremental.IncrementalIndexSchema;
import org.apache.druid.segment.incremental.IncrementalIndexStorageAdapter;

@@ -367,7 +368,12 @@ private Filter makeFilter(final DimFilter dimFilter)

final DimFilter maybeOptimized = optimize ? dimFilter.optimize() : dimFilter;
final Filter filter = maybeOptimized.toFilter();
-return cnf ? Filters.toCnf(filter) : filter;
+try {
+return cnf ? Filters.toCnf(filter) : filter;
+}
+catch (CNFFilterExplosionException cnfFilterExplosionException) {
+throw new RuntimeException(cnfFilterExplosionException);
+}
}

private DimFilter maybeOptimize(final DimFilter dimFilter)

========================================
@@ -29,6 +29,7 @@
import org.apache.druid.segment.ColumnValueSelector;
import org.apache.druid.segment.DimensionSelector;
import org.apache.druid.segment.column.ColumnCapabilities;
+import org.apache.druid.segment.filter.cnf.CNFFilterExplosionException;
import org.apache.druid.segment.filter.cnf.CalciteCnfHelper;
import org.apache.druid.segment.filter.cnf.HiveCnfHelper;
import org.junit.Assert;

@@ -113,7 +114,7 @@ public void testFlattenUnflattenable()
}

@Test
-public void testToCnfWithMuchReducibleFilter()
+public void testToCnfWithMuchReducibleFilter() throws CNFFilterExplosionException
{
final Filter muchReducible = FilterTestUtils.and(
// should be flattened

@@ -158,7 +159,7 @@ public void testToCnfWithMuchReducibleFilter()
}

@Test
-public void testToNormalizedOrClausesWithMuchReducibleFilter()
+public void testToNormalizedOrClausesWithMuchReducibleFilter() throws CNFFilterExplosionException
{
final Filter muchReducible = FilterTestUtils.and(
// should be flattened

@@ -203,7 +204,7 @@
}

@Test
-public void testToCnfWithComplexFilterIncludingNotAndOr()
+public void testToCnfWithComplexFilterIncludingNotAndOr() throws CNFFilterExplosionException
{
final Filter filter = FilterTestUtils.and(
FilterTestUtils.or(

@@ -307,7 +308,7 @@ public void testToCnfWithComplexFilterIncludingNotAndOr()
}

@Test
-public void testToNormalizedOrClausesWithComplexFilterIncludingNotAndOr()
+public void testToNormalizedOrClausesWithComplexFilterIncludingNotAndOr() throws CNFFilterExplosionException
{
final Filter filter = FilterTestUtils.and(
FilterTestUtils.or(

@@ -411,7 +412,7 @@ public void testToNormalizedOrClausesWithComplexFilterIncludingNotAndOr()
}

@Test
-public void testToCnfCollapsibleBigFilter()
+public void testToCnfCollapsibleBigFilter() throws CNFFilterExplosionException
{
List<Filter> ands = new ArrayList<>();
List<Filter> ors = new ArrayList<>();

@@ -477,7 +478,7 @@ public void testPullNotPullableFilter()
}

@Test
-public void testToCnfFilterThatPullCannotConvertToCnfProperly()
+public void testToCnfFilterThatPullCannotConvertToCnfProperly() throws CNFFilterExplosionException
{
final Filter filter = FilterTestUtils.or(
FilterTestUtils.and(

@@ -507,7 +508,7 @@ public void testToCnfFilterThatPullCannotConvertToCnfProperly()
}

@Test
-public void testToNormalizedOrClausesNonAndFilterShouldReturnSingleton()
+public void testToNormalizedOrClausesNonAndFilterShouldReturnSingleton() throws CNFFilterExplosionException
{
Filter filter = FilterTestUtils.or(
FilterTestUtils.selector("col1", "val1"),

@@ -528,6 +529,78 @@ public void testTrueFalseFilterRequiredColumnRewrite()
Assert.assertEquals(FalseFilter.instance(), FalseFilter.instance().rewriteRequiredColumns(ImmutableMap.of()));
}

@Test(expected = CNFFilterExplosionException.class)
public void testExceptionOnCNFFilterExplosion() throws CNFFilterExplosionException
{
Filter filter = FilterTestUtils.or(
FilterTestUtils.and(
FilterTestUtils.selector("col1", "val1"),
FilterTestUtils.selector("col2", "val2")
),
FilterTestUtils.and(
FilterTestUtils.selector("col1", "val3"),
FilterTestUtils.selector("col2", "val4")
),
FilterTestUtils.and(
FilterTestUtils.selector("col1", "val1"),
FilterTestUtils.selector("col2", "val3")
),
FilterTestUtils.and(
FilterTestUtils.selector("col1", "val3"),
FilterTestUtils.selector("col2", "val2")
),
FilterTestUtils.and(
FilterTestUtils.selector("col1", "val5"),
FilterTestUtils.selector("col2", "val6")
),
FilterTestUtils.and(
FilterTestUtils.selector("col1", "val5"),
FilterTestUtils.selector("col2", "val7")
),
FilterTestUtils.and(
FilterTestUtils.selector("col1", "val6"),
FilterTestUtils.selector("col2", "val7")
),
FilterTestUtils.and(
FilterTestUtils.selector("col1", "val6"),
FilterTestUtils.selector("col2", "val8")
),
FilterTestUtils.and(
FilterTestUtils.selector("col1", "val7"),
FilterTestUtils.selector("col2", "val9")
),
FilterTestUtils.and(
FilterTestUtils.selector("col1", "val8"),
FilterTestUtils.selector("col2", "val9")
),
FilterTestUtils.and(
FilterTestUtils.selector("col1", "val4"),
FilterTestUtils.selector("col2", "val9")
),
FilterTestUtils.and(
FilterTestUtils.selector("col1", "val4"),
FilterTestUtils.selector("col2", "val8")
),
FilterTestUtils.and(
FilterTestUtils.selector("col1", "val5"),
FilterTestUtils.selector("col2", "val2")
),
FilterTestUtils.and(
FilterTestUtils.selector("col1", "val5"),
FilterTestUtils.selector("col2", "val1")
),
FilterTestUtils.and(
FilterTestUtils.selector("col1", "val7"),
FilterTestUtils.selector("col2", "val0")
),
FilterTestUtils.and(
FilterTestUtils.selector("col1", "val9"),
FilterTestUtils.selector("col2", "val8")
)
);
Filters.toNormalizedOrClauses(filter);
}
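
(The filter above ORs 16 two-term AND filters, so its full CNF form would contain 2^16 = 65,536 clauses, well beyond the 10,000-clause threshold; hence the expected CNFFilterExplosionException.)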

private void assertFilter(Filter original, Filter expectedConverted, Filter actualConverted)
{
assertEquivalent(original, expectedConverted);