Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[SPARK-16994][SQL] Whitelist operators for predicate pushdown #14713

Closed
wants to merge 2 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -1208,17 +1208,28 @@ object PushDownPredicate extends Rule[LogicalPlan] with PredicateHelper {
filter
}

// Two adjacent filters should be combined together by other rules.
case filter @ Filter(_, _: Filter) => filter
// Should not push predicates through Sample, or it will generate different results.
case filter @ Filter(_, _: Sample) => filter

case filter @ Filter(condition, u: UnaryNode) if u.expressions.forall(_.deterministic) =>
case filter @ Filter(condition, u: UnaryNode)
if canPushThrough(u) && u.expressions.forall(_.deterministic) =>
pushDownPredicate(filter, u.child) { predicate =>
u.withNewChildren(Seq(Filter(predicate, u.child)))
}
}

/**
 * Whitelist of unary operators through which a [[Filter]] may be pushed.
 * Returns true only for operators known to preserve filter semantics when
 * the predicate is evaluated below them.
 *
 * Note that some operators (e.g. project, aggregate, union) are being handled
 * separately (earlier in this rule).
 */
private def canPushThrough(p: UnaryNode): Boolean = p match {
  case _: AppendColumns
     | _: BroadcastHint
     | _: Distinct
     | _: Generate
     | _: Pivot
     | _: RedistributeData
     | _: Repartition
     | _: ScriptTransformation
     | _: Sort => true
  // Anything not explicitly whitelisted (e.g. Limit, Sample) blocks pushdown.
  case _ => false
}

private def pushDownPredicate(
filter: Filter,
grandchild: LogicalPlan)(insertFilter: Expression => LogicalPlan): LogicalPlan = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,12 @@ class FilterPushdownSuite extends PlanTest {
assert(optimized == correctAnswer)
}

test("SPARK-16994: filter should not be pushed through limit") {
  // A filter above a limit must stay above it: pushing it below would
  // change which rows the limit selects. The optimizer must be a no-op here.
  val query = testRelation.limit(10).where('a === 1).analyze
  comparePlans(Optimize.execute(query), query)
}

test("can't push without rewrite") {
val originalQuery =
testRelation
Expand Down
3 changes: 3 additions & 0 deletions sql/core/src/test/resources/sql-tests/inputs/limit.sql
Original file line number Diff line number Diff line change
Expand Up @@ -18,3 +18,6 @@ select * from testdata limit key > 3;
-- limit must be integer
select * from testdata limit true;
select * from testdata limit 'a';

-- limit within a subquery
select * from (select * from range(10) limit 5) where id > 3;
10 changes: 9 additions & 1 deletion sql/core/src/test/resources/sql-tests/results/limit.sql.out
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
-- Automatically generated by SQLQueryTestSuite
-- Number of queries: 9
-- Number of queries: 10


-- !query 0
Expand Down Expand Up @@ -81,3 +81,11 @@ struct<>
-- !query 8 output
org.apache.spark.sql.AnalysisException
The limit expression must be integer type, but got string;


-- !query 9
select * from (select * from range(10) limit 5) where id > 3
-- !query 9 schema
struct<id:bigint>
-- !query 9 output
4