From f0871c921285a05602cf566c9f2c23901224d73e Mon Sep 17 00:00:00 2001
From: Herman van Hovell
Date: Fri, 6 May 2016 15:39:43 +0200
Subject: [PATCH 1/3] Fix TPC-DS 41 - normalize predicates before pulling them out.

---
 .../sql/catalyst/analysis/Analyzer.scala      |  4 ++-
 .../sql/catalyst/optimizer/Optimizer.scala    | 28 +++++++++++++++++++
 .../org/apache/spark/sql/SubquerySuite.scala  | 12 ++++++++
 3 files changed, 43 insertions(+), 1 deletion(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
index 527d5b635a7f9..9e9a856286533 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
@@ -26,6 +26,7 @@ import org.apache.spark.sql.catalyst.catalog.{InMemoryCatalog, SessionCatalog}
 import org.apache.spark.sql.catalyst.encoders.OuterScopes
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.expressions.aggregate._
+import org.apache.spark.sql.catalyst.optimizer.BooleanSimplification
 import org.apache.spark.sql.catalyst.planning.IntegerIndex
 import org.apache.spark.sql.catalyst.plans._
 import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, _}
@@ -958,7 +959,8 @@ class Analyzer(
           localPredicateReferences -- p.outputSet
         }
 
-        val transformed = sub transformUp {
+        // Simplify the predicates before pulling them out.
+        val transformed = BooleanSimplification(sub) transformUp {
           case f @ Filter(cond, child) =>
             // Find all predicates with an outer reference.
             val (correlated, local) = splitConjunctivePredicates(cond).partition(containsOuter)
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
index a3ab89dc71145..3a4a428a78129 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
@@ -101,6 +101,7 @@ abstract class Optimizer(sessionCatalog: SessionCatalog, conf: CatalystConf)
       EliminateSorts,
       SimplifyCasts,
       SimplifyCaseConversionExpressions,
+      RewriteScalarSubqueriesInFilter,
       RewriteCorrelatedScalarSubquery,
       EliminateSerialization) ::
     Batch("Decimal Optimizations", fixedPoint,
@@ -1645,3 +1646,30 @@ object RewriteCorrelatedScalarSubquery extends Rule[LogicalPlan] {
     }
   }
 }
+
+/**
+ * Rewrite [[Filter]] plans that contain correlated [[ScalarSubquery]] expressions. When these
+ * correlated [[ScalarSubquery]] expressions are wrapped in a Predicate expression, we rewrite
+ * them into [[PredicateSubquery]] expressions.
+ */ +object RewriteScalarSubqueriesInFilter extends Rule[LogicalPlan] { + def apply(plan: LogicalPlan): LogicalPlan = plan transform { + case f @ Filter(condition, child) => + val newCond = f.condition.transformUp { + case e if e.dataType == BooleanType => + val scalars = ArrayBuffer.empty[ScalarSubquery] + val newExpr = e.transform { + case s: ScalarSubquery if s.children.nonEmpty => + scalars += s + s.query.output.head + } + scalars match { + case Seq(ScalarSubquery(query, conditions, exprId)) => + PredicateSubquery(query, conditions :+ newExpr, nullAware = false, exprId) + case _ => + e + } + } + Filter(newCond, f.child) + } +} diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SubquerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SubquerySuite.scala index 80bb4e05385f4..17ac0c8c6e496 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/SubquerySuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/SubquerySuite.scala @@ -281,4 +281,16 @@ class SubquerySuite extends QueryTest with SharedSQLContext { assert(msg1.getMessage.contains( "The correlated scalar subquery can only contain equality predicates")) } + + test("disjunctive correlated scalar subquery") { + checkAnswer( + sql(""" + |select a + |from l + |where (select count(*) + | from r + | where (a = c and d = 2.0) or (a = c and d = 1.0)) > 0 + """.stripMargin), + Row(3) :: Nil) + } } From b487925dea798a2ae624242f3d9a18aa84f0b6fc Mon Sep 17 00:00:00 2001 From: Herman van Hovell Date: Fri, 6 May 2016 23:13:01 +0200 Subject: [PATCH 2/3] Move boolean simplification & remove rewrite. --- .../sql/catalyst/analysis/Analyzer.scala | 1 - .../analysis/BooleanSimplification.scala | 124 ++++++++++++++++ .../sql/catalyst/optimizer/Optimizer.scala | 132 +----------------- .../optimizer/CombiningLimitsSuite.scala | 1 + .../optimizer/ConstantFoldingSuite.scala | 2 +- .../optimizer/FilterPushdownSuite.scala | 2 +- .../optimizer/JoinOptimizationSuite.scala | 2 +- .../optimizer/LimitPushdownSuite.scala | 2 +- .../catalyst/optimizer/OptimizeInSuite.scala | 2 +- 9 files changed, 131 insertions(+), 137 deletions(-) create mode 100644 sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/BooleanSimplification.scala diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala index 9e9a856286533..54033688f26e6 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala @@ -26,7 +26,6 @@ import org.apache.spark.sql.catalyst.catalog.{InMemoryCatalog, SessionCatalog} import org.apache.spark.sql.catalyst.encoders.OuterScopes import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.catalyst.expressions.aggregate._ -import org.apache.spark.sql.catalyst.optimizer.BooleanSimplification import org.apache.spark.sql.catalyst.planning.IntegerIndex import org.apache.spark.sql.catalyst.plans._ import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, _} diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/BooleanSimplification.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/BooleanSimplification.scala new file mode 100644 index 0000000000000..715b12032ffc3 --- /dev/null +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/BooleanSimplification.scala @@ -0,0 +1,124 @@ +/* + * Licensed to 
the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.sql.catalyst.analysis + +import org.apache.spark.sql.catalyst.expressions.{And, GreaterThan, GreaterThanOrEqual, LessThan, LessThanOrEqual, Not, Or, PredicateHelper} +import org.apache.spark.sql.catalyst.expressions.Literal._ +import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan +import org.apache.spark.sql.catalyst.rules.Rule + +/** + * Simplifies boolean expressions: + * 1. Simplifies expressions whose answer can be determined without evaluating both sides. + * 2. Eliminates / extracts common factors. + * 3. Merge same expressions + * 4. Removes `Not` operator. + */ +object BooleanSimplification extends Rule[LogicalPlan] with PredicateHelper { + def apply(plan: LogicalPlan): LogicalPlan = plan transform { + case q: LogicalPlan => q transformExpressionsUp { + case TrueLiteral And e => e + case e And TrueLiteral => e + case FalseLiteral Or e => e + case e Or FalseLiteral => e + + case FalseLiteral And _ => FalseLiteral + case _ And FalseLiteral => FalseLiteral + case TrueLiteral Or _ => TrueLiteral + case _ Or TrueLiteral => TrueLiteral + + case a And b if a.semanticEquals(b) => a + case a Or b if a.semanticEquals(b) => a + + case a And (b Or c) if Not(a).semanticEquals(b) => And(a, c) + case a And (b Or c) if Not(a).semanticEquals(c) => And(a, b) + case (a Or b) And c if a.semanticEquals(Not(c)) => And(b, c) + case (a Or b) And c if b.semanticEquals(Not(c)) => And(a, c) + + case a Or (b And c) if Not(a).semanticEquals(b) => Or(a, c) + case a Or (b And c) if Not(a).semanticEquals(c) => Or(a, b) + case (a And b) Or c if a.semanticEquals(Not(c)) => Or(b, c) + case (a And b) Or c if b.semanticEquals(Not(c)) => Or(a, c) + + // Common factor elimination for conjunction + case and @ (left And right) => + // 1. Split left and right to get the disjunctive predicates, + // i.e. lhs = (a, b), rhs = (a, c) + // 2. Find the common predict between lhsSet and rhsSet, i.e. common = (a) + // 3. Remove common predict from lhsSet and rhsSet, i.e. ldiff = (b), rdiff = (c) + // 4. Apply the formula, get the optimized predicate: common || (ldiff && rdiff) + val lhs = splitDisjunctivePredicates(left) + val rhs = splitDisjunctivePredicates(right) + val common = lhs.filter(e => rhs.exists(e.semanticEquals)) + if (common.isEmpty) { + // No common factors, return the original predicate + and + } else { + val ldiff = lhs.filterNot(e => common.exists(e.semanticEquals)) + val rdiff = rhs.filterNot(e => common.exists(e.semanticEquals)) + if (ldiff.isEmpty || rdiff.isEmpty) { + // (a || b || c || ...) && (a || b) => (a || b) + common.reduce(Or) + } else { + // (a || b || c || ...) && (a || b || d || ...) => + // ((c || ...) 
&& (d || ...)) || a || b + (common :+ And(ldiff.reduce(Or), rdiff.reduce(Or))).reduce(Or) + } + } + + // Common factor elimination for disjunction + case or @ (left Or right) => + // 1. Split left and right to get the conjunctive predicates, + // i.e. lhs = (a, b), rhs = (a, c) + // 2. Find the common predict between lhsSet and rhsSet, i.e. common = (a) + // 3. Remove common predict from lhsSet and rhsSet, i.e. ldiff = (b), rdiff = (c) + // 4. Apply the formula, get the optimized predicate: common && (ldiff || rdiff) + val lhs = splitConjunctivePredicates(left) + val rhs = splitConjunctivePredicates(right) + val common = lhs.filter(e => rhs.exists(e.semanticEquals)) + if (common.isEmpty) { + // No common factors, return the original predicate + or + } else { + val ldiff = lhs.filterNot(e => common.exists(e.semanticEquals)) + val rdiff = rhs.filterNot(e => common.exists(e.semanticEquals)) + if (ldiff.isEmpty || rdiff.isEmpty) { + // (a && b) || (a && b && c && ...) => a && b + common.reduce(And) + } else { + // (a && b && c && ...) || (a && b && d && ...) => + // ((c && ...) || (d && ...)) && a && b + (common :+ Or(ldiff.reduce(And), rdiff.reduce(And))).reduce(And) + } + } + + case Not(TrueLiteral) => FalseLiteral + case Not(FalseLiteral) => TrueLiteral + + case Not(a GreaterThan b) => LessThanOrEqual(a, b) + case Not(a GreaterThanOrEqual b) => LessThan(a, b) + + case Not(a LessThan b) => GreaterThanOrEqual(a, b) + case Not(a LessThanOrEqual b) => GreaterThan(a, b) + + case Not(a Or b) => And(Not(a), Not(b)) + case Not(a And b) => Or(Not(a), Not(b)) + + case Not(Not(e)) => e + } + } +} diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala index 3a4a428a78129..0c7fc349f2661 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala @@ -22,7 +22,7 @@ import scala.collection.immutable.HashSet import scala.collection.mutable.ArrayBuffer import org.apache.spark.sql.catalyst.{CatalystConf, SimpleCatalystConf} -import org.apache.spark.sql.catalyst.analysis.{CleanupAliases, DistinctAggregationRewriter, EliminateSubqueryAliases, EmptyFunctionRegistry} +import org.apache.spark.sql.catalyst.analysis.{BooleanSimplification, CleanupAliases, DistinctAggregationRewriter, EliminateSubqueryAliases, EmptyFunctionRegistry} import org.apache.spark.sql.catalyst.catalog.{InMemoryCatalog, SessionCatalog} import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.catalyst.expressions.aggregate._ @@ -101,7 +101,6 @@ abstract class Optimizer(sessionCatalog: SessionCatalog, conf: CatalystConf) EliminateSorts, SimplifyCasts, SimplifyCaseConversionExpressions, - RewriteScalarSubqueriesInFilter, RewriteCorrelatedScalarSubquery, EliminateSerialization) :: Batch("Decimal Optimizations", fixedPoint, @@ -689,108 +688,6 @@ case class OptimizeIn(conf: CatalystConf) extends Rule[LogicalPlan] { } } -/** - * Simplifies boolean expressions: - * 1. Simplifies expressions whose answer can be determined without evaluating both sides. - * 2. Eliminates / extracts common factors. - * 3. Merge same expressions - * 4. Removes `Not` operator. 
- */ -object BooleanSimplification extends Rule[LogicalPlan] with PredicateHelper { - def apply(plan: LogicalPlan): LogicalPlan = plan transform { - case q: LogicalPlan => q transformExpressionsUp { - case TrueLiteral And e => e - case e And TrueLiteral => e - case FalseLiteral Or e => e - case e Or FalseLiteral => e - - case FalseLiteral And _ => FalseLiteral - case _ And FalseLiteral => FalseLiteral - case TrueLiteral Or _ => TrueLiteral - case _ Or TrueLiteral => TrueLiteral - - case a And b if a.semanticEquals(b) => a - case a Or b if a.semanticEquals(b) => a - - case a And (b Or c) if Not(a).semanticEquals(b) => And(a, c) - case a And (b Or c) if Not(a).semanticEquals(c) => And(a, b) - case (a Or b) And c if a.semanticEquals(Not(c)) => And(b, c) - case (a Or b) And c if b.semanticEquals(Not(c)) => And(a, c) - - case a Or (b And c) if Not(a).semanticEquals(b) => Or(a, c) - case a Or (b And c) if Not(a).semanticEquals(c) => Or(a, b) - case (a And b) Or c if a.semanticEquals(Not(c)) => Or(b, c) - case (a And b) Or c if b.semanticEquals(Not(c)) => Or(a, c) - - // Common factor elimination for conjunction - case and @ (left And right) => - // 1. Split left and right to get the disjunctive predicates, - // i.e. lhs = (a, b), rhs = (a, c) - // 2. Find the common predict between lhsSet and rhsSet, i.e. common = (a) - // 3. Remove common predict from lhsSet and rhsSet, i.e. ldiff = (b), rdiff = (c) - // 4. Apply the formula, get the optimized predicate: common || (ldiff && rdiff) - val lhs = splitDisjunctivePredicates(left) - val rhs = splitDisjunctivePredicates(right) - val common = lhs.filter(e => rhs.exists(e.semanticEquals)) - if (common.isEmpty) { - // No common factors, return the original predicate - and - } else { - val ldiff = lhs.filterNot(e => common.exists(e.semanticEquals)) - val rdiff = rhs.filterNot(e => common.exists(e.semanticEquals)) - if (ldiff.isEmpty || rdiff.isEmpty) { - // (a || b || c || ...) && (a || b) => (a || b) - common.reduce(Or) - } else { - // (a || b || c || ...) && (a || b || d || ...) => - // ((c || ...) && (d || ...)) || a || b - (common :+ And(ldiff.reduce(Or), rdiff.reduce(Or))).reduce(Or) - } - } - - // Common factor elimination for disjunction - case or @ (left Or right) => - // 1. Split left and right to get the conjunctive predicates, - // i.e. lhs = (a, b), rhs = (a, c) - // 2. Find the common predict between lhsSet and rhsSet, i.e. common = (a) - // 3. Remove common predict from lhsSet and rhsSet, i.e. ldiff = (b), rdiff = (c) - // 4. Apply the formula, get the optimized predicate: common && (ldiff || rdiff) - val lhs = splitConjunctivePredicates(left) - val rhs = splitConjunctivePredicates(right) - val common = lhs.filter(e => rhs.exists(e.semanticEquals)) - if (common.isEmpty) { - // No common factors, return the original predicate - or - } else { - val ldiff = lhs.filterNot(e => common.exists(e.semanticEquals)) - val rdiff = rhs.filterNot(e => common.exists(e.semanticEquals)) - if (ldiff.isEmpty || rdiff.isEmpty) { - // (a && b) || (a && b && c && ...) => a && b - common.reduce(And) - } else { - // (a && b && c && ...) || (a && b && d && ...) => - // ((c && ...) 
|| (d && ...)) && a && b
-            (common :+ Or(ldiff.reduce(And), rdiff.reduce(And))).reduce(And)
-          }
-        }
-
-      case Not(TrueLiteral) => FalseLiteral
-      case Not(FalseLiteral) => TrueLiteral
-
-      case Not(a GreaterThan b) => LessThanOrEqual(a, b)
-      case Not(a GreaterThanOrEqual b) => LessThan(a, b)
-
-      case Not(a LessThan b) => GreaterThanOrEqual(a, b)
-      case Not(a LessThanOrEqual b) => GreaterThan(a, b)
-
-      case Not(a Or b) => And(Not(a), Not(b))
-      case Not(a And b) => Or(Not(a), Not(b))
-
-      case Not(Not(e)) => e
-    }
-  }
-}
-
 /**
  * Simplifies binary comparisons with semantically-equal expressions:
  * 1) Replace '<=>' with 'true' literal.
@@ -1646,30 +1543,3 @@ object RewriteCorrelatedScalarSubquery extends Rule[LogicalPlan] {
     }
   }
 }
-
-/**
- * Rewrite [[Filter]] plans that contain correlated [[ScalarSubquery]] expressions. When these
- * correlated [[ScalarSubquery]] expressions are wrapped in a Predicate expression, we rewrite
- * them into [[PredicateSubquery]] expressions.
- */
-object RewriteScalarSubqueriesInFilter extends Rule[LogicalPlan] {
-  def apply(plan: LogicalPlan): LogicalPlan = plan transform {
-    case f @ Filter(condition, child) =>
-      val newCond = f.condition.transformUp {
-        case e if e.dataType == BooleanType =>
-          val scalars = ArrayBuffer.empty[ScalarSubquery]
-          val newExpr = e.transform {
-            case s: ScalarSubquery if s.children.nonEmpty =>
-              scalars += s
-              s.query.output.head
-          }
-          scalars match {
-            case Seq(ScalarSubquery(query, conditions, exprId)) =>
-              PredicateSubquery(query, conditions :+ newExpr, nullAware = false, exprId)
-            case _ =>
-              e
-          }
-      }
-      Filter(newCond, f.child)
-  }
-}
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/CombiningLimitsSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/CombiningLimitsSuite.scala
index 87ad81db11b64..34151e5a8c0d7 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/CombiningLimitsSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/CombiningLimitsSuite.scala
@@ -17,6 +17,7 @@
 
 package org.apache.spark.sql.catalyst.optimizer
 
+import org.apache.spark.sql.catalyst.analysis.BooleanSimplification
 import org.apache.spark.sql.catalyst.dsl.expressions._
 import org.apache.spark.sql.catalyst.dsl.plans._
 import org.apache.spark.sql.catalyst.plans.logical._
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/ConstantFoldingSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/ConstantFoldingSuite.scala
index d9655bbcc2ce1..e9591da7988b3 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/ConstantFoldingSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/ConstantFoldingSuite.scala
@@ -18,7 +18,7 @@
 package org.apache.spark.sql.catalyst.optimizer
 
 import org.apache.spark.sql.catalyst.SimpleCatalystConf
-import org.apache.spark.sql.catalyst.analysis.{EliminateSubqueryAliases, UnresolvedExtractValue}
+import org.apache.spark.sql.catalyst.analysis.{BooleanSimplification, EliminateSubqueryAliases, UnresolvedExtractValue}
 import org.apache.spark.sql.catalyst.dsl.expressions._
 import org.apache.spark.sql.catalyst.dsl.plans._
 import org.apache.spark.sql.catalyst.expressions._
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FilterPushdownSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FilterPushdownSuite.scala
index fcc14a803beaa..6b696a3449965
100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FilterPushdownSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FilterPushdownSuite.scala @@ -18,7 +18,7 @@ package org.apache.spark.sql.catalyst.optimizer import org.apache.spark.sql.catalyst.analysis -import org.apache.spark.sql.catalyst.analysis.EliminateSubqueryAliases +import org.apache.spark.sql.catalyst.analysis.{BooleanSimplification, EliminateSubqueryAliases} import org.apache.spark.sql.catalyst.dsl.expressions._ import org.apache.spark.sql.catalyst.dsl.plans._ import org.apache.spark.sql.catalyst.expressions._ diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/JoinOptimizationSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/JoinOptimizationSuite.scala index c1ebf8b09e08d..dcdcafa17393e 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/JoinOptimizationSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/JoinOptimizationSuite.scala @@ -18,7 +18,7 @@ package org.apache.spark.sql.catalyst.optimizer import org.apache.spark.sql.catalyst.analysis -import org.apache.spark.sql.catalyst.analysis.EliminateSubqueryAliases +import org.apache.spark.sql.catalyst.analysis.{BooleanSimplification, EliminateSubqueryAliases} import org.apache.spark.sql.catalyst.dsl.expressions._ import org.apache.spark.sql.catalyst.dsl.plans._ import org.apache.spark.sql.catalyst.expressions.Expression diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/LimitPushdownSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/LimitPushdownSuite.scala index dcbc79365c3aa..1e4d974779154 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/LimitPushdownSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/LimitPushdownSuite.scala @@ -17,7 +17,7 @@ package org.apache.spark.sql.catalyst.optimizer -import org.apache.spark.sql.catalyst.analysis.EliminateSubqueryAliases +import org.apache.spark.sql.catalyst.analysis.{BooleanSimplification, EliminateSubqueryAliases} import org.apache.spark.sql.catalyst.dsl.expressions._ import org.apache.spark.sql.catalyst.dsl.plans._ import org.apache.spark.sql.catalyst.expressions.Add diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/OptimizeInSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/OptimizeInSuite.scala index f1a4ea8280ab7..16bd8694a9e86 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/OptimizeInSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/OptimizeInSuite.scala @@ -18,7 +18,7 @@ package org.apache.spark.sql.catalyst.optimizer import org.apache.spark.sql.catalyst.SimpleCatalystConf -import org.apache.spark.sql.catalyst.analysis.{EliminateSubqueryAliases, UnresolvedAttribute} +import org.apache.spark.sql.catalyst.analysis.{BooleanSimplification, EliminateSubqueryAliases, UnresolvedAttribute} import org.apache.spark.sql.catalyst.dsl.expressions._ import org.apache.spark.sql.catalyst.dsl.plans._ import org.apache.spark.sql.catalyst.expressions._ From e28bbb6c90aef0a17caab5db8072327fcf93e59d Mon Sep 17 00:00:00 2001 From: Herman van Hovell Date: Sat, 7 May 2016 04:35:00 +0200 Subject: [PATCH 3/3] Revert: Move boolean simplification & remove rewrite. 
--- .../sql/catalyst/analysis/Analyzer.scala | 1 + .../analysis/BooleanSimplification.scala | 124 ------------------ .../sql/catalyst/optimizer/Optimizer.scala | 104 ++++++++++++++- .../optimizer/CombiningLimitsSuite.scala | 1 - .../optimizer/ConstantFoldingSuite.scala | 2 +- .../optimizer/FilterPushdownSuite.scala | 2 +- .../optimizer/JoinOptimizationSuite.scala | 2 +- .../optimizer/LimitPushdownSuite.scala | 2 +- .../catalyst/optimizer/OptimizeInSuite.scala | 2 +- 9 files changed, 109 insertions(+), 131 deletions(-) delete mode 100644 sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/BooleanSimplification.scala diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala index 54033688f26e6..9e9a856286533 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala @@ -26,6 +26,7 @@ import org.apache.spark.sql.catalyst.catalog.{InMemoryCatalog, SessionCatalog} import org.apache.spark.sql.catalyst.encoders.OuterScopes import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.catalyst.expressions.aggregate._ +import org.apache.spark.sql.catalyst.optimizer.BooleanSimplification import org.apache.spark.sql.catalyst.planning.IntegerIndex import org.apache.spark.sql.catalyst.plans._ import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, _} diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/BooleanSimplification.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/BooleanSimplification.scala deleted file mode 100644 index 715b12032ffc3..0000000000000 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/BooleanSimplification.scala +++ /dev/null @@ -1,124 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.spark.sql.catalyst.analysis - -import org.apache.spark.sql.catalyst.expressions.{And, GreaterThan, GreaterThanOrEqual, LessThan, LessThanOrEqual, Not, Or, PredicateHelper} -import org.apache.spark.sql.catalyst.expressions.Literal._ -import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan -import org.apache.spark.sql.catalyst.rules.Rule - -/** - * Simplifies boolean expressions: - * 1. Simplifies expressions whose answer can be determined without evaluating both sides. - * 2. Eliminates / extracts common factors. - * 3. Merge same expressions - * 4. Removes `Not` operator. 
- */ -object BooleanSimplification extends Rule[LogicalPlan] with PredicateHelper { - def apply(plan: LogicalPlan): LogicalPlan = plan transform { - case q: LogicalPlan => q transformExpressionsUp { - case TrueLiteral And e => e - case e And TrueLiteral => e - case FalseLiteral Or e => e - case e Or FalseLiteral => e - - case FalseLiteral And _ => FalseLiteral - case _ And FalseLiteral => FalseLiteral - case TrueLiteral Or _ => TrueLiteral - case _ Or TrueLiteral => TrueLiteral - - case a And b if a.semanticEquals(b) => a - case a Or b if a.semanticEquals(b) => a - - case a And (b Or c) if Not(a).semanticEquals(b) => And(a, c) - case a And (b Or c) if Not(a).semanticEquals(c) => And(a, b) - case (a Or b) And c if a.semanticEquals(Not(c)) => And(b, c) - case (a Or b) And c if b.semanticEquals(Not(c)) => And(a, c) - - case a Or (b And c) if Not(a).semanticEquals(b) => Or(a, c) - case a Or (b And c) if Not(a).semanticEquals(c) => Or(a, b) - case (a And b) Or c if a.semanticEquals(Not(c)) => Or(b, c) - case (a And b) Or c if b.semanticEquals(Not(c)) => Or(a, c) - - // Common factor elimination for conjunction - case and @ (left And right) => - // 1. Split left and right to get the disjunctive predicates, - // i.e. lhs = (a, b), rhs = (a, c) - // 2. Find the common predict between lhsSet and rhsSet, i.e. common = (a) - // 3. Remove common predict from lhsSet and rhsSet, i.e. ldiff = (b), rdiff = (c) - // 4. Apply the formula, get the optimized predicate: common || (ldiff && rdiff) - val lhs = splitDisjunctivePredicates(left) - val rhs = splitDisjunctivePredicates(right) - val common = lhs.filter(e => rhs.exists(e.semanticEquals)) - if (common.isEmpty) { - // No common factors, return the original predicate - and - } else { - val ldiff = lhs.filterNot(e => common.exists(e.semanticEquals)) - val rdiff = rhs.filterNot(e => common.exists(e.semanticEquals)) - if (ldiff.isEmpty || rdiff.isEmpty) { - // (a || b || c || ...) && (a || b) => (a || b) - common.reduce(Or) - } else { - // (a || b || c || ...) && (a || b || d || ...) => - // ((c || ...) && (d || ...)) || a || b - (common :+ And(ldiff.reduce(Or), rdiff.reduce(Or))).reduce(Or) - } - } - - // Common factor elimination for disjunction - case or @ (left Or right) => - // 1. Split left and right to get the conjunctive predicates, - // i.e. lhs = (a, b), rhs = (a, c) - // 2. Find the common predict between lhsSet and rhsSet, i.e. common = (a) - // 3. Remove common predict from lhsSet and rhsSet, i.e. ldiff = (b), rdiff = (c) - // 4. Apply the formula, get the optimized predicate: common && (ldiff || rdiff) - val lhs = splitConjunctivePredicates(left) - val rhs = splitConjunctivePredicates(right) - val common = lhs.filter(e => rhs.exists(e.semanticEquals)) - if (common.isEmpty) { - // No common factors, return the original predicate - or - } else { - val ldiff = lhs.filterNot(e => common.exists(e.semanticEquals)) - val rdiff = rhs.filterNot(e => common.exists(e.semanticEquals)) - if (ldiff.isEmpty || rdiff.isEmpty) { - // (a && b) || (a && b && c && ...) => a && b - common.reduce(And) - } else { - // (a && b && c && ...) || (a && b && d && ...) => - // ((c && ...) 
|| (d && ...)) && a && b - (common :+ Or(ldiff.reduce(And), rdiff.reduce(And))).reduce(And) - } - } - - case Not(TrueLiteral) => FalseLiteral - case Not(FalseLiteral) => TrueLiteral - - case Not(a GreaterThan b) => LessThanOrEqual(a, b) - case Not(a GreaterThanOrEqual b) => LessThan(a, b) - - case Not(a LessThan b) => GreaterThanOrEqual(a, b) - case Not(a LessThanOrEqual b) => GreaterThan(a, b) - - case Not(a Or b) => And(Not(a), Not(b)) - case Not(a And b) => Or(Not(a), Not(b)) - - case Not(Not(e)) => e - } - } -} diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala index 0c7fc349f2661..a3ab89dc71145 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala @@ -22,7 +22,7 @@ import scala.collection.immutable.HashSet import scala.collection.mutable.ArrayBuffer import org.apache.spark.sql.catalyst.{CatalystConf, SimpleCatalystConf} -import org.apache.spark.sql.catalyst.analysis.{BooleanSimplification, CleanupAliases, DistinctAggregationRewriter, EliminateSubqueryAliases, EmptyFunctionRegistry} +import org.apache.spark.sql.catalyst.analysis.{CleanupAliases, DistinctAggregationRewriter, EliminateSubqueryAliases, EmptyFunctionRegistry} import org.apache.spark.sql.catalyst.catalog.{InMemoryCatalog, SessionCatalog} import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.catalyst.expressions.aggregate._ @@ -688,6 +688,108 @@ case class OptimizeIn(conf: CatalystConf) extends Rule[LogicalPlan] { } } +/** + * Simplifies boolean expressions: + * 1. Simplifies expressions whose answer can be determined without evaluating both sides. + * 2. Eliminates / extracts common factors. + * 3. Merge same expressions + * 4. Removes `Not` operator. + */ +object BooleanSimplification extends Rule[LogicalPlan] with PredicateHelper { + def apply(plan: LogicalPlan): LogicalPlan = plan transform { + case q: LogicalPlan => q transformExpressionsUp { + case TrueLiteral And e => e + case e And TrueLiteral => e + case FalseLiteral Or e => e + case e Or FalseLiteral => e + + case FalseLiteral And _ => FalseLiteral + case _ And FalseLiteral => FalseLiteral + case TrueLiteral Or _ => TrueLiteral + case _ Or TrueLiteral => TrueLiteral + + case a And b if a.semanticEquals(b) => a + case a Or b if a.semanticEquals(b) => a + + case a And (b Or c) if Not(a).semanticEquals(b) => And(a, c) + case a And (b Or c) if Not(a).semanticEquals(c) => And(a, b) + case (a Or b) And c if a.semanticEquals(Not(c)) => And(b, c) + case (a Or b) And c if b.semanticEquals(Not(c)) => And(a, c) + + case a Or (b And c) if Not(a).semanticEquals(b) => Or(a, c) + case a Or (b And c) if Not(a).semanticEquals(c) => Or(a, b) + case (a And b) Or c if a.semanticEquals(Not(c)) => Or(b, c) + case (a And b) Or c if b.semanticEquals(Not(c)) => Or(a, c) + + // Common factor elimination for conjunction + case and @ (left And right) => + // 1. Split left and right to get the disjunctive predicates, + // i.e. lhs = (a, b), rhs = (a, c) + // 2. Find the common predict between lhsSet and rhsSet, i.e. common = (a) + // 3. Remove common predict from lhsSet and rhsSet, i.e. ldiff = (b), rdiff = (c) + // 4. 
Apply the formula, get the optimized predicate: common || (ldiff && rdiff) + val lhs = splitDisjunctivePredicates(left) + val rhs = splitDisjunctivePredicates(right) + val common = lhs.filter(e => rhs.exists(e.semanticEquals)) + if (common.isEmpty) { + // No common factors, return the original predicate + and + } else { + val ldiff = lhs.filterNot(e => common.exists(e.semanticEquals)) + val rdiff = rhs.filterNot(e => common.exists(e.semanticEquals)) + if (ldiff.isEmpty || rdiff.isEmpty) { + // (a || b || c || ...) && (a || b) => (a || b) + common.reduce(Or) + } else { + // (a || b || c || ...) && (a || b || d || ...) => + // ((c || ...) && (d || ...)) || a || b + (common :+ And(ldiff.reduce(Or), rdiff.reduce(Or))).reduce(Or) + } + } + + // Common factor elimination for disjunction + case or @ (left Or right) => + // 1. Split left and right to get the conjunctive predicates, + // i.e. lhs = (a, b), rhs = (a, c) + // 2. Find the common predict between lhsSet and rhsSet, i.e. common = (a) + // 3. Remove common predict from lhsSet and rhsSet, i.e. ldiff = (b), rdiff = (c) + // 4. Apply the formula, get the optimized predicate: common && (ldiff || rdiff) + val lhs = splitConjunctivePredicates(left) + val rhs = splitConjunctivePredicates(right) + val common = lhs.filter(e => rhs.exists(e.semanticEquals)) + if (common.isEmpty) { + // No common factors, return the original predicate + or + } else { + val ldiff = lhs.filterNot(e => common.exists(e.semanticEquals)) + val rdiff = rhs.filterNot(e => common.exists(e.semanticEquals)) + if (ldiff.isEmpty || rdiff.isEmpty) { + // (a && b) || (a && b && c && ...) => a && b + common.reduce(And) + } else { + // (a && b && c && ...) || (a && b && d && ...) => + // ((c && ...) || (d && ...)) && a && b + (common :+ Or(ldiff.reduce(And), rdiff.reduce(And))).reduce(And) + } + } + + case Not(TrueLiteral) => FalseLiteral + case Not(FalseLiteral) => TrueLiteral + + case Not(a GreaterThan b) => LessThanOrEqual(a, b) + case Not(a GreaterThanOrEqual b) => LessThan(a, b) + + case Not(a LessThan b) => GreaterThanOrEqual(a, b) + case Not(a LessThanOrEqual b) => GreaterThan(a, b) + + case Not(a Or b) => And(Not(a), Not(b)) + case Not(a And b) => Or(Not(a), Not(b)) + + case Not(Not(e)) => e + } + } +} + /** * Simplifies binary comparisons with semantically-equal expressions: * 1) Replace '<=>' with 'true' literal. 
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/CombiningLimitsSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/CombiningLimitsSuite.scala index 34151e5a8c0d7..87ad81db11b64 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/CombiningLimitsSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/CombiningLimitsSuite.scala @@ -17,7 +17,6 @@ package org.apache.spark.sql.catalyst.optimizer -import org.apache.spark.sql.catalyst.analysis.BooleanSimplification import org.apache.spark.sql.catalyst.dsl.expressions._ import org.apache.spark.sql.catalyst.dsl.plans._ import org.apache.spark.sql.catalyst.plans.logical._ diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/ConstantFoldingSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/ConstantFoldingSuite.scala index e9591da7988b3..d9655bbcc2ce1 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/ConstantFoldingSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/ConstantFoldingSuite.scala @@ -18,7 +18,7 @@ package org.apache.spark.sql.catalyst.optimizer import org.apache.spark.sql.catalyst.SimpleCatalystConf -import org.apache.spark.sql.catalyst.analysis.{BooleanSimplification, EliminateSubqueryAliases, UnresolvedExtractValue} +import org.apache.spark.sql.catalyst.analysis.{EliminateSubqueryAliases, UnresolvedExtractValue} import org.apache.spark.sql.catalyst.dsl.expressions._ import org.apache.spark.sql.catalyst.dsl.plans._ import org.apache.spark.sql.catalyst.expressions._ diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FilterPushdownSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FilterPushdownSuite.scala index 6b696a3449965..fcc14a803beaa 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FilterPushdownSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FilterPushdownSuite.scala @@ -18,7 +18,7 @@ package org.apache.spark.sql.catalyst.optimizer import org.apache.spark.sql.catalyst.analysis -import org.apache.spark.sql.catalyst.analysis.{BooleanSimplification, EliminateSubqueryAliases} +import org.apache.spark.sql.catalyst.analysis.EliminateSubqueryAliases import org.apache.spark.sql.catalyst.dsl.expressions._ import org.apache.spark.sql.catalyst.dsl.plans._ import org.apache.spark.sql.catalyst.expressions._ diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/JoinOptimizationSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/JoinOptimizationSuite.scala index dcdcafa17393e..c1ebf8b09e08d 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/JoinOptimizationSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/JoinOptimizationSuite.scala @@ -18,7 +18,7 @@ package org.apache.spark.sql.catalyst.optimizer import org.apache.spark.sql.catalyst.analysis -import org.apache.spark.sql.catalyst.analysis.{BooleanSimplification, EliminateSubqueryAliases} +import org.apache.spark.sql.catalyst.analysis.EliminateSubqueryAliases import org.apache.spark.sql.catalyst.dsl.expressions._ import org.apache.spark.sql.catalyst.dsl.plans._ import org.apache.spark.sql.catalyst.expressions.Expression diff --git 
a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/LimitPushdownSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/LimitPushdownSuite.scala index 1e4d974779154..dcbc79365c3aa 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/LimitPushdownSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/LimitPushdownSuite.scala @@ -17,7 +17,7 @@ package org.apache.spark.sql.catalyst.optimizer -import org.apache.spark.sql.catalyst.analysis.{BooleanSimplification, EliminateSubqueryAliases} +import org.apache.spark.sql.catalyst.analysis.EliminateSubqueryAliases import org.apache.spark.sql.catalyst.dsl.expressions._ import org.apache.spark.sql.catalyst.dsl.plans._ import org.apache.spark.sql.catalyst.expressions.Add diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/OptimizeInSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/OptimizeInSuite.scala index 16bd8694a9e86..f1a4ea8280ab7 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/OptimizeInSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/OptimizeInSuite.scala @@ -18,7 +18,7 @@ package org.apache.spark.sql.catalyst.optimizer import org.apache.spark.sql.catalyst.SimpleCatalystConf -import org.apache.spark.sql.catalyst.analysis.{BooleanSimplification, EliminateSubqueryAliases, UnresolvedAttribute} +import org.apache.spark.sql.catalyst.analysis.{EliminateSubqueryAliases, UnresolvedAttribute} import org.apache.spark.sql.catalyst.dsl.expressions._ import org.apache.spark.sql.catalyst.dsl.plans._ import org.apache.spark.sql.catalyst.expressions._
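
Net effect of the series: PATCH 3/3 returns Optimizer.scala to its pre-series
state (its post-image index, a3ab89dc71145, matches the pre-image of PATCH 1/3),
so what remains is the Analyzer change from PATCH 1/3 plus the new SubquerySuite
test. The fix works because BooleanSimplification's common factor elimination
for disjunctions normalizes the predicate exercised by the test,

    (a = c and d = 2.0) or (a = c and d = 1.0)

into

    a = c and (d = 2.0 or d = 1.0)

which surfaces the correlated predicate a = c as a top-level conjunct that the
subquery handling in Analyzer.scala can pull out. Below is a minimal,
self-contained Scala sketch of just that normalization step. It uses a toy
expression type rather than Spark's Expression API; Expr, Atom, simplifyOr and
CommonFactorSketch are invented names for illustration, and plain structural
equality stands in for semanticEquals.

sealed trait Expr
case class Atom(name: String) extends Expr
case class And(left: Expr, right: Expr) extends Expr
case class Or(left: Expr, right: Expr) extends Expr

object CommonFactorSketch {
  // Flatten nested And nodes into their conjuncts, mirroring
  // PredicateHelper.splitConjunctivePredicates.
  def splitConjunctive(e: Expr): Seq[Expr] = e match {
    case And(l, r) => splitConjunctive(l) ++ splitConjunctive(r)
    case other => Seq(other)
  }

  // Common factor elimination for a disjunction, following the rule's
  // numbered comment: (a && b) || (a && c) => a && (b || c).
  def simplifyOr(left: Expr, right: Expr): Expr = {
    // 1. Split both sides into their conjunctive predicates.
    val lhs = splitConjunctive(left)
    val rhs = splitConjunctive(right)
    // 2. Find the common predicates (structural equality stands in for
    //    semanticEquals here).
    val common = lhs.filter(rhs.contains)
    if (common.isEmpty) {
      // No common factors: keep the original predicate.
      Or(left, right)
    } else {
      // 3. Remove the common predicates from both sides.
      val ldiff = lhs.filterNot(common.contains)
      val rdiff = rhs.filterNot(common.contains)
      if (ldiff.isEmpty || rdiff.isEmpty) {
        // (a && b) || (a && b && c) => a && b
        common.reduce(And)
      } else {
        // 4. Apply the formula: common && (ldiff || rdiff)
        (common :+ Or(ldiff.reduce(And), rdiff.reduce(And))).reduce(And)
      }
    }
  }

  def main(args: Array[String]): Unit = {
    // The predicate shape from the new test:
    //   (a = c and d = 2.0) or (a = c and d = 1.0)
    val aEqC = Atom("a = c")
    val simplified =
      simplifyOr(And(aEqC, Atom("d = 2.0")), And(aEqC, Atom("d = 1.0")))
    // Prints And(Atom(a = c),Or(Atom(d = 2.0),Atom(d = 1.0))):
    // a = c is now a top-level conjunct and can be pulled out.
    println(simplified)
  }
}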