[SPARK-16994][SQL] Whitelist operators for predicate pushdown
## What changes were proposed in this pull request?
This patch changes the predicate pushdown optimization rule (PushDownPredicate) from using a blacklist to using a whitelist; that is, operators must now be explicitly allowed before a filter is pushed through them. This approach is more future-proof: previously, introducing a new operator could silently render the optimization rule incorrect.
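
To make the blacklist-vs-whitelist distinction concrete, here is a minimal, self-contained Scala sketch (toy types for illustration, not Spark's actual operator classes):

```scala
// Toy operator tree, standing in for Catalyst's LogicalPlan hierarchy.
sealed trait Op
case class Sort(child: Op) extends Op
case class Limit(n: Int, child: Op) extends Op
case class Scan(table: String) extends Op

// Whitelist: filter pushdown is allowed only through operators explicitly
// listed here. A newly added operator hits the default case, so the
// optimizer stays correct (if conservative) until someone opts it in.
def canPushThrough(op: Op): Boolean = op match {
  case _: Sort => true  // filtering commutes with sorting
  case _       => false // Limit, Scan, and any future operator block pushdown
}
```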

This also fixes a bug: previously we allowed pushing a filter beneath a limit, which is incorrect. That is to say, before this patch, the optimizer would rewrite
```
select * from (select * from range(10) limit 5) where id > 3
```
into
```
select * from range(10) where id > 3 limit 5
```
even though the two queries are not equivalent.
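
For intuition, here is a quick spark-shell sketch (assuming a Spark 2.x `SparkSession` named `spark`; in local mode `range` typically yields rows in order, though `limit` itself makes no ordering guarantee):

```scala
// Correct plan: the subquery's limit keeps rows 0..4, then the filter runs.
spark.range(10).limit(5).where("id > 3").show()  // one row: 4

// The incorrect pushed-down rewrite filters first, then takes five rows.
spark.range(10).where("id > 3").limit(5).show()  // five rows: 4..8
```

Because the two plans can produce different results, Limit must not appear in the new whitelist.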

## How was this patch tested?
- a unit test case in FilterPushdownSuite
- an end-to-end test in limit.sql

Author: Reynold Xin <[email protected]>

Closes #14713 from rxin/SPARK-16994.
rxin authored and cloud-fan committed Aug 19, 2016
1 parent 072acf5 commit 67e59d4
Showing 4 changed files with 35 additions and 7 deletions.
In the PushDownPredicate rule:

```diff
@@ -1208,17 +1208,28 @@ object PushDownPredicate extends Rule[LogicalPlan] with PredicateHelper {
       filter
     }
 
-    // two filters should be combine together by other rules
-    case filter @ Filter(_, _: Filter) => filter
-    // should not push predicates through sample, or will generate different results.
-    case filter @ Filter(_, _: Sample) => filter
-
-    case filter @ Filter(condition, u: UnaryNode) if u.expressions.forall(_.deterministic) =>
+    case filter @ Filter(condition, u: UnaryNode)
+        if canPushThrough(u) && u.expressions.forall(_.deterministic) =>
       pushDownPredicate(filter, u.child) { predicate =>
         u.withNewChildren(Seq(Filter(predicate, u.child)))
       }
   }
 
+  private def canPushThrough(p: UnaryNode): Boolean = p match {
+    // Note that some operators (e.g. project, aggregate, union) are being handled separately
+    // (earlier in this rule).
+    case _: AppendColumns => true
+    case _: BroadcastHint => true
+    case _: Distinct => true
+    case _: Generate => true
+    case _: Pivot => true
+    case _: RedistributeData => true
+    case _: Repartition => true
+    case _: ScriptTransformation => true
+    case _: Sort => true
+    case _ => false
+  }
+
   private def pushDownPredicate(
       filter: Filter,
       grandchild: LogicalPlan)(insertFilter: Expression => LogicalPlan): LogicalPlan = {
```
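
A quick way to see why, for example, Sort belongs on this whitelist while Limit does not (plain Scala, independent of Spark):

```scala
val rows = Seq(5, 1, 9, 3, 7)

// A row-wise filter commutes with sorting: same rows either way.
assert(rows.sorted.filter(_ > 3) == rows.filter(_ > 3).sorted)

// It does not commute with limit: taking 3 rows before vs. after
// the filter yields different results.
assert(rows.take(3).filter(_ > 3) == Seq(5, 9))     // filter after limit
assert(rows.filter(_ > 3).take(3) == Seq(5, 9, 7))  // limit after filter
```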
In FilterPushdownSuite, a regression test:

```diff
@@ -111,6 +111,12 @@ class FilterPushdownSuite extends PlanTest {
     assert(optimized == correctAnswer)
   }
 
+  test("SPARK-16994: filter should not be pushed through limit") {
+    val originalQuery = testRelation.limit(10).where('a === 1).analyze
+    val optimized = Optimize.execute(originalQuery)
+    comparePlans(optimized, originalQuery)
+  }
+
   test("can't push without rewrite") {
     val originalQuery =
       testRelation
```
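
For contrast, a hypothetical positive test in the same style (not part of this patch; it assumes the suite's existing DSL, e.g. `orderBy`, `where`, and `comparePlans`) would assert that pushdown through Sort still fires:

```scala
test("filter is still pushed through sort") {
  val originalQuery = testRelation.orderBy('a.asc).where('a === 1).analyze
  val optimized = Optimize.execute(originalQuery)
  // The filter should end up beneath the sort after optimization.
  val correctAnswer = testRelation.where('a === 1).orderBy('a.asc).analyze
  comparePlans(optimized, correctAnswer)
}
```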
3 changes: 3 additions & 0 deletions sql/core/src/test/resources/sql-tests/inputs/limit.sql
```diff
@@ -18,3 +18,6 @@ select * from testdata limit key > 3;
 -- limit must be integer
 select * from testdata limit true;
 select * from testdata limit 'a';
+
+-- limit within a subquery
+select * from (select * from range(10) limit 5) where id > 3;
```
10 changes: 9 additions & 1 deletion sql/core/src/test/resources/sql-tests/results/limit.sql.out
```diff
@@ -1,5 +1,5 @@
 -- Automatically generated by SQLQueryTestSuite
--- Number of queries: 9
+-- Number of queries: 10
 
 
@@ -81,3 +81,11 @@ struct<>
 -- !query 8 output
 org.apache.spark.sql.AnalysisException
 The limit expression must be integer type, but got string;
+
+
+-- !query 9
+select * from (select * from range(10) limit 5) where id > 3
+-- !query 9 schema
+struct<id:bigint>
+-- !query 9 output
+4
```
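With the fix in place, the subquery's limit is applied before the outer filter: of the five rows the subquery keeps (0 through 4 here), only row 4 satisfies `id > 3`, hence the single-row output of query 9.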