From 863c5ec43defeaf99914f251fa1c351d6c136f80 Mon Sep 17 00:00:00 2001 From: Cheng Lian Date: Thu, 25 Feb 2016 02:23:46 +0800 Subject: [PATCH] Don't push predicate through project with nondeterministic field(s) --- .../spark/sql/catalyst/optimizer/Optimizer.scala | 4 +++- .../catalyst/optimizer/FilterPushdownSuite.scala | 13 ++++++++++++- 2 files changed, 15 insertions(+), 2 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala index 1554382840a48..37e68046588b4 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala @@ -804,7 +804,9 @@ object SimplifyFilters extends Rule[LogicalPlan] { */ object PushPredicateThroughProject extends Rule[LogicalPlan] with PredicateHelper { def apply(plan: LogicalPlan): LogicalPlan = plan transform { - case filter @ Filter(condition, project @ Project(fields, grandChild)) => + case filter @ Filter(condition, project @ Project(fields, grandChild)) + if fields.forall(_.deterministic) => + // Create a map of Aliases to their values from the child projection. // e.g., 'SELECT a + b AS c, d ...' produces Map(c -> a + b). val aliasMap = AttributeMap(fields.collect { diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FilterPushdownSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FilterPushdownSuite.scala index 7805723ec86e2..7fac56605ba98 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FilterPushdownSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FilterPushdownSuite.scala @@ -145,7 +145,7 @@ class FilterPushdownSuite extends PlanTest { comparePlans(optimized, correctAnswer) } - test("nondeterministic: can't push down filter through project") { + test("nondeterministic: can't push down filter with nondeterministic condition through project") { val originalQuery = testRelation .select(Rand(10).as('rand), 'a) .where('rand > 5 || 'a > 5) @@ -156,6 +156,17 @@ class FilterPushdownSuite extends PlanTest { comparePlans(optimized, originalQuery) } + test("nondeterministic: can't push down filter through project with nondeterministic field") { + val originalQuery = testRelation + .select(Rand(10).as('rand), 'a) + .where('a > 5) + .analyze + + val optimized = Optimize.execute(originalQuery) + + comparePlans(optimized, originalQuery) + } + test("nondeterministic: push down part of filter through project") { val originalQuery = testRelation .select(Rand(10).as('rand), 'a)