Skip to content

Commit

Permalink
Remove any Filters with DynamicPruning that didn't filter on partitio…
Browse files Browse the repository at this point in the history
…n column
  • Loading branch information
wangyum committed Aug 21, 2020
1 parent bcc81be commit 98f7275
Show file tree
Hide file tree
Showing 4 changed files with 24 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -44,10 +44,12 @@ class SparkOptimizer(
Batch("PartitionPruning", Once,
PartitionPruning,
OptimizeSubqueries) :+
Batch("Pushdown Filters from PartitionPruning", fixedPoint,
Batch("Pushdown Filters from PartitionPruning before Inferring Filters", fixedPoint,
PushDownPredicates) :+
Batch("Infer Filters from PartitionPruning", Once,
InferFiltersFromConstraints) :+
Batch("Pushdown Filters from PartitionPruning after Inferring Filters", fixedPoint,
PushDownPredicates) :+
Batch("Cleanup filters that cannot be pushed down", Once,
CleanupDynamicPruningFilters,
PruneFilters)) ++
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@

package org.apache.spark.sql.execution.dynamicpruning

import org.apache.spark.sql.catalyst.expressions.{DynamicPruning, PredicateHelper}
import org.apache.spark.sql.catalyst.expressions.{DynamicPruning, DynamicPruningSubquery, Expression, PredicateHelper}
import org.apache.spark.sql.catalyst.expressions.Literal.TrueLiteral
import org.apache.spark.sql.catalyst.planning.PhysicalOperation
import org.apache.spark.sql.catalyst.plans.logical.{Filter, LogicalPlan}
Expand All @@ -32,12 +32,29 @@ import org.apache.spark.sql.internal.SQLConf
*/
object CleanupDynamicPruningFilters extends Rule[LogicalPlan] with PredicateHelper {

/**
 * Tells whether `condition` contains, among its top-level conjuncts, at least one
 * [[DynamicPruningSubquery]] for which `PartitionPruning.getPartitionTableScan` returns
 * an empty result when looked up against `child`.
 *
 * Only the conjuncts produced by `splitConjunctivePredicates` are inspected; a
 * [[DynamicPruningSubquery]] nested deeper inside another expression is not considered.
 *
 * @param condition the filter condition to inspect
 * @param child     the plan underneath the filter, used to look up the partition table scan
 * @return true if any such dynamic pruning conjunct exists, false otherwise
 */
private def isFilterOnNonPartition(condition: Expression, child: LogicalPlan): Boolean = {
  // Gather the pruning key of every dynamic pruning conjunct; other conjuncts are ignored.
  val dynamicPruningKeys = splitConjunctivePredicates(condition).collect {
    case DynamicPruningSubquery(key, _, _, _, _, _) => key
  }
  dynamicPruningKeys.exists { key =>
    PartitionPruning.getPartitionTableScan(key, child).isEmpty
  }
}

override def apply(plan: LogicalPlan): LogicalPlan = {
if (!SQLConf.get.dynamicPartitionPruningEnabled) {
return plan
}

plan.transform {
// Remove any DynamicPruning predicates in a Filter that do not filter on a partition column.
// Such predicates are inferred by the "Infer Filters from PartitionPruning" batch.
case f @ Filter(condition, child) if isFilterOnNonPartition(condition, child) =>
val newCondition = condition.transform {
case DynamicPruningSubquery(pruningKey, _, _, _, _, _)
if PartitionPruning.getPartitionTableScan(pruningKey, child).isEmpty =>
TrueLiteral
}
f.copy(condition = newCondition)
// pass through anything that is pushed down into PhysicalOperation
case p @ PhysicalOperation(_, _, LogicalRelation(_: HadoopFsRelation, _, _, _)) => p
// remove any Filters with DynamicPruning that didn't get pushed down to PhysicalOperation.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -147,7 +147,7 @@ object PartitionPruning extends Rule[LogicalPlan] with PredicateHelper {
}

// the pruning overhead is the total size in bytes of all scan relations
val overhead = otherPlan.collectLeaves().map(_.stats.sizeInBytes).sum.toFloat
val overhead = otherPlan.collectLeaves().map(_.stats.sizeInBytes).sum

filterRatio * partPlan.stats.sizeInBytes.toFloat > overhead.toFloat
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1299,6 +1299,7 @@ abstract class DynamicPartitionPruningSuiteBase
assert(collectDynamicPruningExpressions(df.queryExecution.executedPlan).size === 2)
checkDistinctSubqueries(df, 1)
checkPartitionPruningPredicate(df, false, true)
assert(!checkUnpushedFilters(df), "Inferred dynamic pruning expression has been pushed down.")

checkAnswer(df,
Row(2, 20, 2) ::
Expand All @@ -1324,6 +1325,7 @@ abstract class DynamicPartitionPruningSuiteBase

assert(collectDynamicPruningExpressions(df.queryExecution.executedPlan).size === 1)
checkPartitionPruningPredicate(df, false, true)
assert(!checkUnpushedFilters(df), "Inferred dynamic pruning expression should be removed.")

checkAnswer(df,
Row(2, "NL", 2) ::
Expand Down

0 comments on commit 98f7275

Please sign in to comment.