Fix TPC-DS 41 - normalize predicates before pulling them out.
hvanhovell committed May 6, 2016
1 parent 0fd3a47 commit f0871c9
Showing 3 changed files with 43 additions and 1 deletion.
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
@@ -26,6 +26,7 @@ import org.apache.spark.sql.catalyst.catalog.{InMemoryCatalog, SessionCatalog}
 import org.apache.spark.sql.catalyst.encoders.OuterScopes
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.expressions.aggregate._
+import org.apache.spark.sql.catalyst.optimizer.BooleanSimplification
 import org.apache.spark.sql.catalyst.planning.IntegerIndex
 import org.apache.spark.sql.catalyst.plans._
 import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, _}
@@ -958,7 +959,8 @@ class Analyzer(
       localPredicateReferences -- p.outputSet
     }
 
-    val transformed = sub transformUp {
+    // Simplify the predicates before pulling them out.
+    val transformed = BooleanSimplification(sub) transformUp {
       case f @ Filter(cond, child) =>
         // Find all predicates with an outer reference.
         val (correlated, local) = splitConjunctivePredicates(cond).partition(containsOuter)
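Why the normalization matters (a sketch for context, not part of the commit): BooleanSimplification factors common conjuncts out of a disjunction, turning the TPC-DS 41-style predicate into a top-level conjunction whose correlated equality the pull-out logic can then lift. A minimal illustration using Catalyst's expression DSL; the relation and attribute names are hypothetical:

    import org.apache.spark.sql.catalyst.dsl.expressions._
    import org.apache.spark.sql.catalyst.dsl.plans._
    import org.apache.spark.sql.catalyst.optimizer.BooleanSimplification
    import org.apache.spark.sql.catalyst.plans.logical.LocalRelation

    // Hypothetical relation standing in for the TPC-DS tables.
    val a = 'a.int; val c = 'c.int; val d = 'd.double
    val rel = LocalRelation(a, c, d)

    // (a = c and d = 2.0) or (a = c and d = 1.0)
    val plan = rel.where((a === c && d === 2.0) || (a === c && d === 1.0))

    // BooleanSimplification factors the common conjunct out of the disjunction:
    //   a = c and (d = 2.0 or d = 1.0)
    // leaving `a = c` as a top-level conjunct the analyzer can pull out.
    val normalized = BooleanSimplification(plan)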
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
@@ -101,6 +101,7 @@ abstract class Optimizer(sessionCatalog: SessionCatalog, conf: CatalystConf)
       EliminateSorts,
       SimplifyCasts,
       SimplifyCaseConversionExpressions,
+      RewriteScalarSubqueriesInFilter,
       RewriteCorrelatedScalarSubquery,
       EliminateSerialization) ::
     Batch("Decimal Optimizations", fixedPoint,
@@ -1645,3 +1646,30 @@ object RewriteCorrelatedScalarSubquery extends Rule[LogicalPlan] {
     }
   }
 }
+
+/**
+ * Rewrite [[Filter]] plans that contain correlated [[ScalarSubquery]] expressions. When these
+ * correlated [[ScalarSubquery]] expressions are wrapped in a Predicate expression, we rewrite
+ * them into [[PredicateSubquery]] expressions.
+ */
+object RewriteScalarSubqueriesInFilter extends Rule[LogicalPlan] {
+  def apply(plan: LogicalPlan): LogicalPlan = plan transform {
+    case Filter(condition, child) =>
+      val newCond = condition.transformUp {
+        case e if e.dataType == BooleanType =>
+          // Collect the correlated scalar subqueries in this boolean expression,
+          // replacing each with the output attribute of its subquery plan.
+          val scalars = ArrayBuffer.empty[ScalarSubquery]
+          val newExpr = e.transform {
+            case s: ScalarSubquery if s.children.nonEmpty =>
+              scalars += s
+              s.query.output.head
+          }
+          scalars match {
+            case Seq(ScalarSubquery(query, conditions, exprId)) =>
+              PredicateSubquery(query, conditions :+ newExpr, nullAware = false, exprId)
+            case _ =>
+              e
+          }
+      }
+      Filter(newCond, child)
+  }
+}
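End-to-end, the rewrite can be observed in the optimized plan (a sketch, not part of the commit; assumes a Spark 2.0 SparkSession named spark with tables l(a, b) and r(c, d) registered as temp views):

    // After this patch, the Filter should carry a predicate-subquery
    // (subsequently planned as a join) instead of rejecting the
    // correlated scalar subquery during analysis.
    spark.sql("""
        |select a from l
        |where (select count(*) from r
        |       where (a = c and d = 2.0) or (a = c and d = 1.0)) > 0
      """.stripMargin).explain(true)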
12 changes: 12 additions & 0 deletions sql/core/src/test/scala/org/apache/spark/sql/SubquerySuite.scala
@@ -281,4 +281,16 @@ class SubquerySuite extends QueryTest with SharedSQLContext {
     assert(msg1.getMessage.contains(
       "The correlated scalar subquery can only contain equality predicates"))
   }
+
+  test("disjunctive correlated scalar subquery") {
+    checkAnswer(
+      sql("""
+        |select a
+        |from l
+        |where (select count(*)
+        |       from r
+        |       where (a = c and d = 2.0) or (a = c and d = 1.0)) > 0
+        """.stripMargin),
+      Row(3) :: Nil)
+  }
 }
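To run just the new test (a sketch; the exact sbt invocation is an assumption about the 2016-era build, using ScalaTest's -z substring selector):

    build/sbt "sql/test-only *SubquerySuite -- -z disjunctive"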
