Skip to content

Commit

Permalink
Improve DynamicFiltersChecker to catch unsupported dynamic filters
Browse files Browse the repository at this point in the history
cherry pick of trinodb/trino@5b74033

Co-Authored-By: James Sun <[email protected]>
Co-Authored-By: Raunaq Morarka <[email protected]>
  • Loading branch information
3 people committed Aug 17, 2020
1 parent 08e260c commit 8338c0b
Show file tree
Hide file tree
Showing 2 changed files with 200 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -15,30 +15,34 @@

import com.facebook.presto.Session;
import com.facebook.presto.execution.warnings.WarningCollector;
import com.facebook.presto.expressions.DynamicFilters;
import com.facebook.presto.expressions.DynamicFilters.DynamicFilterPlaceholder;
import com.facebook.presto.metadata.Metadata;
import com.facebook.presto.spi.plan.FilterNode;
import com.facebook.presto.spi.plan.PlanNode;
import com.facebook.presto.spi.relation.RowExpression;
import com.facebook.presto.sql.parser.SqlParser;
import com.facebook.presto.sql.planner.TypeProvider;
import com.facebook.presto.sql.planner.plan.AbstractJoinNode;
import com.facebook.presto.sql.planner.plan.InternalPlanVisitor;
import com.facebook.presto.sql.planner.plan.JoinNode;
import com.facebook.presto.sql.planner.plan.OutputNode;
import com.facebook.presto.sql.planner.plan.SemiJoinNode;
import com.facebook.presto.sql.relational.Expressions;
import com.google.common.collect.ImmutableSet;

import java.util.HashSet;
import java.util.List;
import java.util.Optional;
import java.util.Set;

import static com.facebook.presto.expressions.DynamicFilters.extractDynamicFilters;
import static com.google.common.base.Verify.verify;
import static com.google.common.collect.ImmutableList.toImmutableList;
import static com.google.common.collect.Sets.difference;
import static com.google.common.collect.Sets.intersection;

/**
* When dynamic filter assignments are present on a Join node, they should be consumed by a Filter node on it's probe side
* When dynamic filter assignments are present on a Join node, they should be consumed by a Filter node on its probe side
*/
public class DynamicFiltersChecker
implements PlanChecker.Checker
Expand Down Expand Up @@ -84,10 +88,12 @@ private Set<String> extractUnmatchedDynamicFilters(AbstractJoinNode node, Void c
Set<String> consumedProbeSide = node.getProbe().accept(this, context);
verify(
difference(currentJoinDynamicFilters, consumedProbeSide).isEmpty(),
"Dynamic filters present in join were not fully consumed by it's probe side.");
"Dynamic filters present in join were not fully consumed by its probe side.");

Set<String> consumedBuildSide = node.getBuild().accept(this, context);
verify(intersection(currentJoinDynamicFilters, consumedBuildSide).isEmpty());
verify(
intersection(currentJoinDynamicFilters, consumedBuildSide).isEmpty(),
"Dynamic filters present in join were consumed by its build side.");

Set<String> unmatched = new HashSet<>(consumedBuildSide);
unmatched.addAll(consumedProbeSide);
Expand All @@ -99,13 +105,21 @@ private Set<String> extractUnmatchedDynamicFilters(AbstractJoinNode node, Void c
public Set<String> visitFilter(FilterNode node, Void context)
{
ImmutableSet.Builder<String> consumed = ImmutableSet.builder();
List<DynamicFilterPlaceholder> dynamicFilters = extractDynamicFilters(node.getPredicate()).getDynamicConjuncts();
dynamicFilters.stream()
extractDynamicPredicates(node.getPredicate()).stream()
.map(DynamicFilterPlaceholder::getId)
.forEach(consumed::add);
consumed.addAll(node.getSource().accept(this, context));
return consumed.build();
}
}, null);
}

private static List<DynamicFilterPlaceholder> extractDynamicPredicates(RowExpression expression)
{
return Expressions.uniqueSubExpressions(expression).stream()
.map(DynamicFilters::getPlaceholder)
.filter(Optional::isPresent)
.map(Optional::get)
.collect(toImmutableList());
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,180 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.facebook.presto.sql.planner.sanity;

import com.facebook.presto.execution.warnings.WarningCollector;
import com.facebook.presto.expressions.LogicalRowExpressions;
import com.facebook.presto.metadata.Metadata;
import com.facebook.presto.spi.ConnectorId;
import com.facebook.presto.spi.TableHandle;
import com.facebook.presto.spi.plan.PlanNode;
import com.facebook.presto.spi.plan.PlanNodeIdAllocator;
import com.facebook.presto.spi.plan.TableScanNode;
import com.facebook.presto.spi.relation.VariableReferenceExpression;
import com.facebook.presto.sql.parser.SqlParser;
import com.facebook.presto.sql.planner.TypeProvider;
import com.facebook.presto.sql.planner.assertions.BasePlanTest;
import com.facebook.presto.sql.planner.iterative.rule.test.PlanBuilder;
import com.facebook.presto.sql.planner.plan.JoinNode;
import com.facebook.presto.sql.relational.FunctionResolution;
import com.facebook.presto.sql.relational.RowExpressionDeterminismEvaluator;
import com.facebook.presto.testing.TestingTransactionHandle;
import com.facebook.presto.tpch.TpchColumnHandle;
import com.facebook.presto.tpch.TpchTableHandle;
import com.google.common.base.VerifyException;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;

import java.util.Optional;

import static com.facebook.presto.common.type.BigintType.BIGINT;
import static com.facebook.presto.sql.planner.optimizations.PredicatePushDown.createDynamicFilterExpression;
import static com.facebook.presto.sql.planner.plan.JoinNode.Type.INNER;

public class TestDynamicFiltersChecker
extends BasePlanTest
{
private Metadata metadata;
private LogicalRowExpressions logicalRowExpressions;
private PlanBuilder builder;
private VariableReferenceExpression lineitemOrderKeyVariable;
private TableScanNode lineitemTableScanNode;
private VariableReferenceExpression ordersOrderKeyVariable;
private TableScanNode ordersTableScanNode;

@BeforeClass
public void setup()
{
metadata = getQueryRunner().getMetadata();
logicalRowExpressions = new LogicalRowExpressions(
new RowExpressionDeterminismEvaluator(metadata.getFunctionManager()),
new FunctionResolution(metadata.getFunctionManager()),
metadata.getFunctionManager());
builder = new PlanBuilder(getQueryRunner().getDefaultSession(), new PlanNodeIdAllocator(), metadata);
ConnectorId connectorId = getCurrentConnectorId();
TableHandle lineitemTableHandle = new TableHandle(
connectorId,
new TpchTableHandle("lineitem", 1.0),
TestingTransactionHandle.create(),
Optional.empty());
lineitemOrderKeyVariable = builder.variable("LINEITEM_OK", BIGINT);
lineitemTableScanNode = builder.tableScan(lineitemTableHandle, ImmutableList.of(lineitemOrderKeyVariable), ImmutableMap.of(lineitemOrderKeyVariable, new TpchColumnHandle("orderkey", BIGINT)));

TableHandle ordersTableHandle = new TableHandle(
connectorId,
new TpchTableHandle("orders", 1.0),
TestingTransactionHandle.create(),
Optional.empty());
ordersOrderKeyVariable = builder.variable("ORDERS_OK", BIGINT);
ordersTableScanNode = builder.tableScan(ordersTableHandle, ImmutableList.of(ordersOrderKeyVariable), ImmutableMap.of(ordersOrderKeyVariable, new TpchColumnHandle("orderkey", BIGINT)));
}

@Test(expectedExceptions = VerifyException.class, expectedExceptionsMessageRegExp = "Dynamic filters present in join were not fully consumed by its probe side.")
public void testUnconsumedDynamicFilterInJoin()
{
PlanNode root = builder.join(
INNER,
builder.filter(builder.rowExpression("ORDERS_OK > 0"), ordersTableScanNode),
lineitemTableScanNode,
ImmutableList.of(new JoinNode.EquiJoinClause(ordersOrderKeyVariable, lineitemOrderKeyVariable)),
ImmutableList.of(ordersOrderKeyVariable),
Optional.empty(),
Optional.empty(),
Optional.empty(),
ImmutableMap.of("DF", lineitemOrderKeyVariable));
validatePlan(root);
}

@Test(expectedExceptions = VerifyException.class, expectedExceptionsMessageRegExp = "Dynamic filters present in join were consumed by its build side.")
public void testDynamicFilterConsumedOnBuildSide()
{
PlanNode root = builder.join(
INNER,
builder.filter(
createDynamicFilterExpression("DF", ordersOrderKeyVariable, metadata.getFunctionManager()),
ordersTableScanNode),
builder.filter(
createDynamicFilterExpression("DF", ordersOrderKeyVariable, metadata.getFunctionManager()),
lineitemTableScanNode),
ImmutableList.of(new JoinNode.EquiJoinClause(ordersOrderKeyVariable, lineitemOrderKeyVariable)),
ImmutableList.of(ordersOrderKeyVariable),
Optional.empty(),
Optional.empty(),
Optional.empty(),
ImmutableMap.of("DF", lineitemOrderKeyVariable));
validatePlan(root);
}

@Test(expectedExceptions = VerifyException.class, expectedExceptionsMessageRegExp = "All consumed dynamic filters could not be matched with a join.")
public void testUnmatchedDynamicFilter()
{
PlanNode root = builder.output(
ImmutableList.of(),
ImmutableList.of(),
builder.join(
INNER,
ordersTableScanNode,
builder.filter(
logicalRowExpressions.combineConjuncts(
builder.rowExpression("LINEITEM_OK > 0"),
createDynamicFilterExpression("DF", lineitemOrderKeyVariable, metadata.getFunctionManager())),
lineitemTableScanNode),
ImmutableList.of(new JoinNode.EquiJoinClause(ordersOrderKeyVariable, lineitemOrderKeyVariable)),
ImmutableList.of(ordersOrderKeyVariable),
Optional.empty(),
Optional.empty(),
Optional.empty(),
ImmutableMap.of()));
validatePlan(root);
}

@Test(expectedExceptions = VerifyException.class, expectedExceptionsMessageRegExp = "All consumed dynamic filters could not be matched with a join.")
public void testUnmatchedNestedDynamicFilter()
{
PlanNode root = builder.output(
ImmutableList.of(),
ImmutableList.of(),
builder.join(
INNER,
ordersTableScanNode,
builder.filter(
logicalRowExpressions.combineConjuncts(
logicalRowExpressions.combineDisjuncts(
builder.rowExpression("LINEITEM_OK IS NULL"),
createDynamicFilterExpression("DF", lineitemOrderKeyVariable, metadata.getFunctionManager())),
logicalRowExpressions.combineDisjuncts(
builder.rowExpression("LINEITEM_OK IS NOT NULL"),
createDynamicFilterExpression("DF", lineitemOrderKeyVariable, metadata.getFunctionManager()))),
lineitemTableScanNode),
ImmutableList.of(new JoinNode.EquiJoinClause(ordersOrderKeyVariable, lineitemOrderKeyVariable)),
ImmutableList.of(ordersOrderKeyVariable),
Optional.empty(),
Optional.empty(),
Optional.empty(),
ImmutableMap.of()));
validatePlan(root);
}

private void validatePlan(PlanNode root)
{
getQueryRunner().inTransaction(session -> {
// metadata.getCatalogHandle() registers the catalog for the transaction
session.getCatalog().ifPresent(catalog -> metadata.getCatalogHandle(session, catalog));
new DynamicFiltersChecker().validate(root, session, metadata, new SqlParser(), TypeProvider.empty(), WarningCollector.NOOP);
return null;
});
}
}

0 comments on commit 8338c0b

Please sign in to comment.