Skip to content

Commit

Permalink
Dynamic filtering integration with hive filter pushdown
Browse files Browse the repository at this point in the history
  • Loading branch information
kewang1024 authored and highker committed Sep 4, 2020
1 parent 76442b3 commit f74a84e
Show file tree
Hide file tree
Showing 5 changed files with 91 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,32 @@ public static DynamicFilterExtractResult extractDynamicFilters(RowExpression exp
return new DynamicFilterExtractResult(staticConjuncts.build(), dynamicConjuncts.build());
}

public static RowExpression extractDynamicConjuncts(List<RowExpression> conjuncts, LogicalRowExpressions logicalRowExpressions)
{
ImmutableList.Builder<RowExpression> dynamicConjuncts = ImmutableList.builder();
for (RowExpression conjunct : conjuncts) {
Optional<DynamicFilterPlaceholder> placeholder = getPlaceholder(conjunct);
if (placeholder.isPresent()) {
dynamicConjuncts.add(conjunct);
}
}

return logicalRowExpressions.combineConjuncts(dynamicConjuncts.build());
}

public static RowExpression extractStaticConjuncts(List<RowExpression> conjuncts, LogicalRowExpressions logicalRowExpressions)
{
ImmutableList.Builder<RowExpression> staticConjuncts = ImmutableList.builder();
for (RowExpression conjunct : conjuncts) {
Optional<DynamicFilterPlaceholder> placeholder = getPlaceholder(conjunct);
if (!placeholder.isPresent()) {
staticConjuncts.add(conjunct);
}
}

return logicalRowExpressions.combineConjuncts(staticConjuncts.build());
}

public static boolean isDynamicFilter(RowExpression expression)
{
return getPlaceholder(expression).isPresent();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,9 +73,12 @@
import java.util.function.Function;

import static com.facebook.presto.common.predicate.TupleDomain.withColumnDomains;
import static com.facebook.presto.expressions.DynamicFilters.extractDynamicConjuncts;
import static com.facebook.presto.expressions.DynamicFilters.extractStaticConjuncts;
import static com.facebook.presto.expressions.LogicalRowExpressions.FALSE_CONSTANT;
import static com.facebook.presto.expressions.LogicalRowExpressions.TRUE_CONSTANT;
import static com.facebook.presto.expressions.LogicalRowExpressions.and;
import static com.facebook.presto.expressions.LogicalRowExpressions.extractConjuncts;
import static com.facebook.presto.expressions.RowExpressionNodeInliner.replaceExpression;
import static com.facebook.presto.hive.HiveTableProperties.getHiveStorageFormat;
import static com.facebook.presto.spi.StandardErrorCode.DIVISION_BY_ZERO;
Expand Down Expand Up @@ -248,6 +251,12 @@ public static ConnectorPushdownFilterResult pushdownFilter(
.collect(toImmutableMap(HiveColumnHandle::getName, Functions.identity()));

SchemaTableName tableName = ((HiveTableHandle) tableHandle).getSchemaTableName();

LogicalRowExpressions logicalRowExpressions = new LogicalRowExpressions(rowExpressionService.getDeterminismEvaluator(), functionResolution, functionMetadataManager);
List<RowExpression> conjuncts = extractConjuncts(decomposedFilter.getRemainingExpression());
RowExpression dynamicFilterExpression = extractDynamicConjuncts(conjuncts, logicalRowExpressions);
RowExpression remainingExpression = extractStaticConjuncts(conjuncts, logicalRowExpressions);

return new ConnectorPushdownFilterResult(
metadata.getTableLayout(
session,
Expand All @@ -259,7 +268,7 @@ public static ConnectorPushdownFilterResult pushdownFilter(
hivePartitionResult.getTableParameters(),
hivePartitionResult.getPartitions(),
domainPredicate,
decomposedFilter.getRemainingExpression(),
remainingExpression,
predicateColumns,
hivePartitionResult.getEnforcedConstraint(),
hivePartitionResult.getBucketHandle(),
Expand All @@ -274,7 +283,7 @@ public static ConnectorPushdownFilterResult pushdownFilter(
decomposedFilter.getRemainingExpression(),
domainPredicate),
currentLayoutHandle.map(layout -> ((HiveTableLayoutHandle) layout).getRequestedColumns()).orElse(Optional.empty()))),
TRUE_CONSTANT);
dynamicFilterExpression);
}

public static class ConnectorPushdownFilterResult
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.facebook.presto.hive;

import com.facebook.presto.Session;
import com.facebook.presto.tests.AbstractTestJoinQueries;

import static com.facebook.presto.SystemSessionProperties.ENABLE_DYNAMIC_FILTERING;
import static com.facebook.presto.SystemSessionProperties.JOIN_DISTRIBUTION_TYPE;
import static com.facebook.presto.SystemSessionProperties.PUSHDOWN_SUBFIELDS_ENABLED;
import static com.facebook.presto.hive.HiveQueryRunner.HIVE_CATALOG;
import static com.facebook.presto.hive.HiveQueryRunner.createQueryRunner;
import static com.facebook.presto.hive.HiveSessionProperties.PUSHDOWN_FILTER_ENABLED;
import static com.facebook.presto.sql.analyzer.FeaturesConfig.JoinDistributionType.BROADCAST;
import static io.airlift.tpch.TpchTable.getTables;

public class TestHiveDistributedJoinQueriesWithDynamicFilteringAndFilterPushdown
extends AbstractTestJoinQueries
{
public TestHiveDistributedJoinQueriesWithDynamicFilteringAndFilterPushdown()
{
super(() -> createQueryRunner(getTables()));
}

@Override
protected Session getSession()
{
return Session.builder(super.getSession())
.setSystemProperty(ENABLE_DYNAMIC_FILTERING, "true")
.setSystemProperty(PUSHDOWN_SUBFIELDS_ENABLED, "true")
.setSystemProperty(JOIN_DISTRIBUTION_TYPE, BROADCAST.name())
.setCatalogSessionProperty(HIVE_CATALOG, PUSHDOWN_FILTER_ENABLED, "true")
.build();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,12 @@ private Set<String> extractUnmatchedDynamicFilters(AbstractJoinNode node, Void c
Set<String> currentJoinDynamicFilters = node.getDynamicFilters().keySet();
Set<String> consumedProbeSide = node.getProbe().accept(this, context);
Set<String> unconsumedByProbeSide = difference(currentJoinDynamicFilters, consumedProbeSide);
verify(unconsumedByProbeSide.isEmpty(), "Dynamic filters %s present in join were not fully consumed by its probe side.", unconsumedByProbeSide);
verify(
unconsumedByProbeSide.isEmpty(),
"Dynamic filters %s present in join were not fully consumed by its probe side, currentJoinDynamicFilters is: %s, consumedProbeSide is: %s",
unconsumedByProbeSide,
currentJoinDynamicFilters,
consumedProbeSide);

Set<String> consumedBuildSide = node.getBuild().accept(this, context);
Set<String> unconsumedByBuildSide = intersection(currentJoinDynamicFilters, consumedBuildSide);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,8 @@ public void setup()
ordersTableScanNode = builder.tableScan(ordersTableHandle, ImmutableList.of(ordersOrderKeyVariable), ImmutableMap.of(ordersOrderKeyVariable, new TpchColumnHandle("orderkey", BIGINT)));
}

@Test(expectedExceptions = VerifyException.class, expectedExceptionsMessageRegExp = "Dynamic filters \\[DF\\] present in join were not fully consumed by its probe side.")
@Test(expectedExceptions = VerifyException.class, expectedExceptionsMessageRegExp = "Dynamic filters \\[DF\\] present in join were not fully consumed by its probe side," +
" currentJoinDynamicFilters is: \\[DF\\], consumedProbeSide is: \\[\\]")
public void testUnconsumedDynamicFilterInJoin()
{
PlanNode root = builder.join(
Expand Down

0 comments on commit f74a84e

Please sign in to comment.