From eaec2f5f2b4e3193de41655b84a1dc936b0e50a3 Mon Sep 17 00:00:00 2001 From: maryannxue Date: Fri, 13 Jul 2018 14:32:01 -0700 Subject: [PATCH 1/7] [SPARK-24802] Optimization Rule Exclusion --- .../sql/catalyst/optimizer/Optimizer.scala | 65 ++++++++++---- .../apache/spark/sql/internal/SQLConf.scala | 10 +++ .../OptimizerRuleExclusionSuite.scala | 84 +++++++++++++++++++ 3 files changed, 143 insertions(+), 16 deletions(-) create mode 100644 sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/OptimizerRuleExclusionSuite.scala diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala index 2cc27d82f7d20..24908a1ed8df5 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala @@ -19,6 +19,7 @@ package org.apache.spark.sql.catalyst.optimizer import scala.collection.mutable +import org.apache.spark.internal.Logging import org.apache.spark.sql.AnalysisException import org.apache.spark.sql.catalyst.analysis._ import org.apache.spark.sql.catalyst.catalog.{InMemoryCatalog, SessionCatalog} @@ -36,7 +37,7 @@ import org.apache.spark.util.Utils * Optimizers can override this. */ abstract class Optimizer(sessionCatalog: SessionCatalog) - extends RuleExecutor[LogicalPlan] { + extends RuleExecutor[LogicalPlan] with Logging { // Check for structural integrity of the plan in test mode. Currently we only check if a plan is // still resolved after the execution of each rule. @@ -46,7 +47,23 @@ abstract class Optimizer(sessionCatalog: SessionCatalog) protected def fixedPoint = FixedPoint(SQLConf.get.optimizerMaxIterations) - def batches: Seq[Batch] = { + protected def postAnalysisBatches: Seq[Batch] = { + Batch("Eliminate Distinct", Once, EliminateDistinct) :: + // Technically some of the rules in Finish Analysis are not optimizer rules and belong more + // in the analyzer, because they are needed for correctness (e.g. ComputeCurrentTime). + // However, because we also use the analyzer to canonicalized queries (for view definition), + // we do not eliminate subqueries or compute current time in the analyzer. + Batch("Finish Analysis", Once, + EliminateSubqueryAliases, + EliminateView, + ReplaceExpressions, + ComputeCurrentTime, + GetCurrentDatabase(sessionCatalog), + RewriteDistinctAggregates, + ReplaceDeduplicateWithAggregate) :: Nil + } + + protected def optimizationBatches: Seq[Batch] = { val operatorOptimizationRuleSet = Seq( // Operator push down @@ -100,19 +117,6 @@ abstract class Optimizer(sessionCatalog: SessionCatalog) rulesWithoutInferFiltersFromConstraints: _*) :: Nil } - (Batch("Eliminate Distinct", Once, EliminateDistinct) :: - // Technically some of the rules in Finish Analysis are not optimizer rules and belong more - // in the analyzer, because they are needed for correctness (e.g. ComputeCurrentTime). - // However, because we also use the analyzer to canonicalized queries (for view definition), - // we do not eliminate subqueries or compute current time in the analyzer. 
- Batch("Finish Analysis", Once, - EliminateSubqueryAliases, - EliminateView, - ReplaceExpressions, - ComputeCurrentTime, - GetCurrentDatabase(sessionCatalog), - RewriteDistinctAggregates, - ReplaceDeduplicateWithAggregate) :: ////////////////////////////////////////////////////////////////////////////////////////// // Optimizer rules start here ////////////////////////////////////////////////////////////////////////////////////////// @@ -121,7 +125,7 @@ abstract class Optimizer(sessionCatalog: SessionCatalog) // extra operators between two adjacent Union operators. // - Call CombineUnions again in Batch("Operator Optimizations"), // since the other rules might make two separate Unions operators adjacent. - Batch("Union", Once, + (Batch("Union", Once, CombineUnions) :: Batch("Pullup Correlated Expressions", Once, PullupCorrelatedPredicates) :: @@ -175,6 +179,35 @@ abstract class Optimizer(sessionCatalog: SessionCatalog) * Override to provide additional rules for the operator optimization batch. */ def extendedOperatorOptimizationRules: Seq[Rule[LogicalPlan]] = Nil + + override def batches: Seq[Batch] = { + val excludedRules = + SQLConf.get.optimizerExcludedRules.toSeq.flatMap(_.split(",").map(_.trim).filter(!_.isEmpty)) + val filteredOptimizationBatches = if (excludedRules.isEmpty) { + optimizationBatches + } else { + optimizationBatches.flatMap { batch => + val filteredRules = + batch.rules.filter { rule => + val exclude = excludedRules.contains(rule.ruleName) + if (exclude) { + logInfo(s"Optimization rule '${rule.ruleName}' is excluded from the optimizer.") + } + !exclude + } + if (batch.rules == filteredRules) { + Some(batch) + } else if (filteredRules.nonEmpty) { + Some(Batch(batch.name, batch.strategy, filteredRules: _*)) + } else { + logInfo(s"Optimization batch '${batch.name}' is excluded from the optimizer " + + s"as all enclosed rules have been excluded.") + None + } + } + } + postAnalysisBatches ++ filteredOptimizationBatches + } } /** diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala index e2c48e2d8a14c..dcd9d056bc06c 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala @@ -127,6 +127,14 @@ object SQLConf { } } + val OPTIMIZER_EXCLUDED_RULES = buildConf("spark.sql.optimizer.excludedRules") + .doc("Configures a list of rules to be disabled in the optimizer, in which the rules are " + + "specified by their rule names and separated by comma. It is not guaranteed that all the " + + "rules in this configuration will eventually be excluded, as some rules are necessary " + + "for correctness. 
The optimizer will log the rules that have indeed been excluded.") + .stringConf + .createOptional + val OPTIMIZER_MAX_ITERATIONS = buildConf("spark.sql.optimizer.maxIterations") .internal() .doc("The max number of iterations the optimizer and analyzer runs.") @@ -1383,6 +1391,8 @@ class SQLConf extends Serializable with Logging { /** ************************ Spark SQL Params/Hints ******************* */ + def optimizerExcludedRules: Option[String] = getConf(OPTIMIZER_EXCLUDED_RULES) + def optimizerMaxIterations: Int = getConf(OPTIMIZER_MAX_ITERATIONS) def optimizerInSetConversionThreshold: Int = getConf(OPTIMIZER_INSET_CONVERSION_THRESHOLD) diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/OptimizerRuleExclusionSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/OptimizerRuleExclusionSuite.scala new file mode 100644 index 0000000000000..5075d35589437 --- /dev/null +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/OptimizerRuleExclusionSuite.scala @@ -0,0 +1,84 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.catalyst.optimizer + +import org.apache.spark.sql.catalyst.dsl.expressions._ +import org.apache.spark.sql.catalyst.dsl.plans._ +import org.apache.spark.sql.catalyst.plans.PlanTest +import org.apache.spark.sql.catalyst.plans.logical.LocalRelation +import org.apache.spark.sql.internal.SQLConf.OPTIMIZER_EXCLUDED_RULES + + +class OptimizerRuleExclusionSuite extends PlanTest { + + val testRelation = LocalRelation('a.int, 'b.int, 'c.int) + + private def verifyExcludedRules(excludedRuleNames: Seq[String]) { + val optimizer = new SimpleTestOptimizer() + // Batches whose rules are all to be excluded should be removed as a whole. 
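+    // Note that `optimizer.batches` is read here, before the exclusion conf is
+    // set, so at this point it still reflects the unfiltered default batches.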
+ val excludedBatchNames = optimizer.batches + .filter(batch => batch.rules.forall(rule => excludedRuleNames.contains(rule.ruleName))) + .map(_.name) + + withSQLConf( + OPTIMIZER_EXCLUDED_RULES.key -> excludedRuleNames.foldLeft("")((l, r) => l + "," + r)) { + val batches = optimizer.batches + assert(batches.forall(batch => !excludedBatchNames.contains(batch.name))) + assert( + batches + .forall(batch => batch.rules.forall(rule => !excludedRuleNames.contains(rule.ruleName)))) + } + } + + test("Exclude a single rule from multiple batches") { + verifyExcludedRules( + Seq( + PushPredicateThroughJoin.ruleName)) + } + + test("Exclude multiple rules from single or multiple batches") { + verifyExcludedRules( + Seq( + CombineUnions.ruleName, + RemoveLiteralFromGroupExpressions.ruleName, + RemoveRepetitionFromGroupExpressions.ruleName)) + } + + test("Exclude non-existent rule with other valid rules") { + verifyExcludedRules( + Seq( + LimitPushDown.ruleName, + InferFiltersFromConstraints.ruleName, + "DummyRuleName")) + } + + test("Verify optimized plan after excluding CombineUnions rule") { + val excludedRules = Seq( + ConvertToLocalRelation.ruleName, + PropagateEmptyRelation.ruleName, + CombineUnions.ruleName) + + withSQLConf( + OPTIMIZER_EXCLUDED_RULES.key -> excludedRules.foldLeft("")((l, r) => l + "," + r)) { + val optimizer = new SimpleTestOptimizer() + val originalQuery = testRelation.union(testRelation.union(testRelation)).analyze + val optimized = optimizer.execute(originalQuery) + comparePlans(originalQuery, optimized) + } + } +} From 84f1a6b5cba08df8684179e9d7195545be655e76 Mon Sep 17 00:00:00 2001 From: maryannxue Date: Tue, 17 Jul 2018 23:13:50 -0700 Subject: [PATCH 2/7] Address review comments --- .../sql/catalyst/optimizer/Optimizer.scala | 84 +++++++++++-------- .../OptimizerRuleExclusionSuite.scala | 17 ++++ 2 files changed, 65 insertions(+), 36 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala index 24908a1ed8df5..c8257d48454b4 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala @@ -19,7 +19,6 @@ package org.apache.spark.sql.catalyst.optimizer import scala.collection.mutable -import org.apache.spark.internal.Logging import org.apache.spark.sql.AnalysisException import org.apache.spark.sql.catalyst.analysis._ import org.apache.spark.sql.catalyst.catalog.{InMemoryCatalog, SessionCatalog} @@ -37,7 +36,7 @@ import org.apache.spark.util.Utils * Optimizers can override this. */ abstract class Optimizer(sessionCatalog: SessionCatalog) - extends RuleExecutor[LogicalPlan] with Logging { + extends RuleExecutor[LogicalPlan] { // Check for structural integrity of the plan in test mode. Currently we only check if a plan is // still resolved after the execution of each rule. @@ -47,23 +46,7 @@ abstract class Optimizer(sessionCatalog: SessionCatalog) protected def fixedPoint = FixedPoint(SQLConf.get.optimizerMaxIterations) - protected def postAnalysisBatches: Seq[Batch] = { - Batch("Eliminate Distinct", Once, EliminateDistinct) :: - // Technically some of the rules in Finish Analysis are not optimizer rules and belong more - // in the analyzer, because they are needed for correctness (e.g. ComputeCurrentTime). 
- // However, because we also use the analyzer to canonicalized queries (for view definition), - // we do not eliminate subqueries or compute current time in the analyzer. - Batch("Finish Analysis", Once, - EliminateSubqueryAliases, - EliminateView, - ReplaceExpressions, - ComputeCurrentTime, - GetCurrentDatabase(sessionCatalog), - RewriteDistinctAggregates, - ReplaceDeduplicateWithAggregate) :: Nil - } - - protected def optimizationBatches: Seq[Batch] = { + def defaultBatches: Seq[Batch] = { val operatorOptimizationRuleSet = Seq( // Operator push down @@ -117,6 +100,19 @@ abstract class Optimizer(sessionCatalog: SessionCatalog) rulesWithoutInferFiltersFromConstraints: _*) :: Nil } + (Batch("Eliminate Distinct", Once, EliminateDistinct) :: + // Technically some of the rules in Finish Analysis are not optimizer rules and belong more + // in the analyzer, because they are needed for correctness (e.g. ComputeCurrentTime). + // However, because we also use the analyzer to canonicalized queries (for view definition), + // we do not eliminate subqueries or compute current time in the analyzer. + Batch("Finish Analysis", Once, + EliminateSubqueryAliases, + EliminateView, + ReplaceExpressions, + ComputeCurrentTime, + GetCurrentDatabase(sessionCatalog), + RewriteDistinctAggregates, + ReplaceDeduplicateWithAggregate) :: ////////////////////////////////////////////////////////////////////////////////////////// // Optimizer rules start here ////////////////////////////////////////////////////////////////////////////////////////// @@ -125,7 +121,7 @@ abstract class Optimizer(sessionCatalog: SessionCatalog) // extra operators between two adjacent Union operators. // - Call CombineUnions again in Batch("Operator Optimizations"), // since the other rules might make two separate Unions operators adjacent. - (Batch("Union", Once, + Batch("Union", Once, CombineUnions) :: Batch("Pullup Correlated Expressions", Once, PullupCorrelatedPredicates) :: @@ -164,6 +160,13 @@ abstract class Optimizer(sessionCatalog: SessionCatalog) UpdateNullabilityInAttributeReferences) } + def nonExcludableBatches: Seq[String] = + "Eliminate Distinct" :: + "Finish Analysis" :: + "Replace Operators" :: + "Pullup Correlated Expressions" :: + "RewriteSubquery" :: Nil + /** * Optimize all the subqueries inside expression. 
*/ @@ -183,30 +186,39 @@ abstract class Optimizer(sessionCatalog: SessionCatalog) override def batches: Seq[Batch] = { val excludedRules = SQLConf.get.optimizerExcludedRules.toSeq.flatMap(_.split(",").map(_.trim).filter(!_.isEmpty)) - val filteredOptimizationBatches = if (excludedRules.isEmpty) { - optimizationBatches + if (excludedRules.isEmpty) { + defaultBatches } else { - optimizationBatches.flatMap { batch => - val filteredRules = - batch.rules.filter { rule => - val exclude = excludedRules.contains(rule.ruleName) - if (exclude) { - logInfo(s"Optimization rule '${rule.ruleName}' is excluded from the optimizer.") + defaultBatches.flatMap { batch => + if (nonExcludableBatches.contains(batch.name)) { + batch.rules.foreach { rule => + if (excludedRules.contains(rule.ruleName)) { + logWarning(s"Optimization rule '${rule.ruleName}' cannot be excluded from the " + + s"non-excludable batch ${batch.name}.") } - !exclude } - if (batch.rules == filteredRules) { Some(batch) - } else if (filteredRules.nonEmpty) { - Some(Batch(batch.name, batch.strategy, filteredRules: _*)) } else { - logInfo(s"Optimization batch '${batch.name}' is excluded from the optimizer " + - s"as all enclosed rules have been excluded.") - None + val filteredRules = + batch.rules.filter { rule => + val exclude = excludedRules.contains(rule.ruleName) + if (exclude) { + logInfo(s"Optimization rule '${rule.ruleName}' is excluded from the optimizer.") + } + !exclude + } + if (batch.rules == filteredRules) { + Some(batch) + } else if (filteredRules.nonEmpty) { + Some(Batch(batch.name, batch.strategy, filteredRules: _*)) + } else { + logInfo(s"Optimization batch '${batch.name}' is excluded from the optimizer " + + s"as all enclosed rules have been excluded.") + None + } } } } - postAnalysisBatches ++ filteredOptimizationBatches } } diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/OptimizerRuleExclusionSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/OptimizerRuleExclusionSuite.scala index 5075d35589437..f48b0a557b275 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/OptimizerRuleExclusionSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/OptimizerRuleExclusionSuite.scala @@ -67,6 +67,23 @@ class OptimizerRuleExclusionSuite extends PlanTest { "DummyRuleName")) } + test("Exclude rules from a non-excluable batch") { + val excludedRules = Seq( + ReplaceIntersectWithSemiJoin.ruleName, + PullupCorrelatedPredicates.ruleName) + + val optimizer = new SimpleTestOptimizer() + + withSQLConf( + OPTIMIZER_EXCLUDED_RULES.key -> excludedRules.foldLeft("")((l, r) => l + "," + r)) { + excludedRules.foreach { excludedRule => + assert( + optimizer.batches + .exists(batch => batch.rules.exists(rule => rule.ruleName == excludedRule))) + } + } + } + test("Verify optimized plan after excluding CombineUnions rule") { val excludedRules = Seq( ConvertToLocalRelation.ruleName, From ff23edf81a4a78d1589ed582a1802b94a8ebf4c6 Mon Sep 17 00:00:00 2001 From: maryannxue Date: Fri, 20 Jul 2018 19:37:54 -0700 Subject: [PATCH 3/7] Address review comments --- .../sql/catalyst/optimizer/Optimizer.scala | 71 +++++++++++-------- 1 file changed, 40 insertions(+), 31 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala index c8257d48454b4..0d2d20ad1691f 100644 --- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala @@ -160,12 +160,24 @@ abstract class Optimizer(sessionCatalog: SessionCatalog) UpdateNullabilityInAttributeReferences) } - def nonExcludableBatches: Seq[String] = - "Eliminate Distinct" :: - "Finish Analysis" :: - "Replace Operators" :: - "Pullup Correlated Expressions" :: - "RewriteSubquery" :: Nil + def nonExcludableRules: Seq[String] = + EliminateDistinct.ruleName :: + EliminateSubqueryAliases.ruleName :: + EliminateView.ruleName :: + ReplaceExpressions.ruleName :: + ComputeCurrentTime.ruleName :: + GetCurrentDatabase(sessionCatalog).ruleName :: + RewriteDistinctAggregates.ruleName :: + ReplaceDeduplicateWithAggregate.ruleName :: + ReplaceIntersectWithSemiJoin.ruleName :: + ReplaceExceptWithFilter.ruleName :: + ReplaceExceptWithAntiJoin.ruleName :: + ReplaceDistinctWithAggregate.ruleName :: + PullupCorrelatedPredicates.ruleName :: + RewritePredicateSubquery.ruleName :: + ColumnPruning.ruleName :: + CollapseProject.ruleName :: + RemoveRedundantProject.ruleName :: Nil /** * Optimize all the subqueries inside expression. @@ -184,38 +196,35 @@ abstract class Optimizer(sessionCatalog: SessionCatalog) def extendedOperatorOptimizationRules: Seq[Rule[LogicalPlan]] = Nil override def batches: Seq[Batch] = { - val excludedRules = - SQLConf.get.optimizerExcludedRules.toSeq.flatMap(_.split(",").map(_.trim).filter(!_.isEmpty)) + val excludedRulesConf = + SQLConf.get.optimizerExcludedRules.toSeq.flatMap(_.split(",").map(_.trim).filter(_.nonEmpty)) + val excludedRules = excludedRulesConf.filter { ruleName => + val nonExcludable = nonExcludableRules.contains(ruleName) + if (nonExcludable) { + logWarning(s"Optimization rule '${ruleName}' was not excluded from the optimizer " + + s"because this rule is a non-excludable rule.") + } + !nonExcludable + } if (excludedRules.isEmpty) { defaultBatches } else { defaultBatches.flatMap { batch => - if (nonExcludableBatches.contains(batch.name)) { - batch.rules.foreach { rule => - if (excludedRules.contains(rule.ruleName)) { - logWarning(s"Optimization rule '${rule.ruleName}' cannot be excluded from the " + - s"non-excludable batch ${batch.name}.") - } + val filteredRules = batch.rules.filter { rule => + val exclude = excludedRules.contains(rule.ruleName) + if (exclude) { + logInfo(s"Optimization rule '${rule.ruleName}' is excluded from the optimizer.") } + !exclude + } + if (batch.rules == filteredRules) { Some(batch) + } else if (filteredRules.nonEmpty) { + Some(Batch(batch.name, batch.strategy, filteredRules: _*)) } else { - val filteredRules = - batch.rules.filter { rule => - val exclude = excludedRules.contains(rule.ruleName) - if (exclude) { - logInfo(s"Optimization rule '${rule.ruleName}' is excluded from the optimizer.") - } - !exclude - } - if (batch.rules == filteredRules) { - Some(batch) - } else if (filteredRules.nonEmpty) { - Some(Batch(batch.name, batch.strategy, filteredRules: _*)) - } else { - logInfo(s"Optimization batch '${batch.name}' is excluded from the optimizer " + - s"as all enclosed rules have been excluded.") - None - } + logInfo(s"Optimization batch '${batch.name}' is excluded from the optimizer " + + s"as all enclosed rules have been excluded.") + None } } } From b154979236e211dc7185ca8e450493f0c6b0f469 Mon Sep 17 00:00:00 2001 From: maryannxue Date: Fri, 20 Jul 2018 19:41:21 -0700 Subject: [PATCH 4/7] change test name --- 
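Aside from the rename below, it may help to see the exclusion arithmetic from the previous patch in isolation: effective batches = defaultBatches - (excludedRules - nonExcludableRules). The following standalone sketch reproduces that logic; MiniBatch and the sample rule names are illustrative stand-ins, not Spark APIs.

object ExclusionSketch {
  case class MiniBatch(name: String, rules: Seq[String])

  def filterBatches(
      defaultBatches: Seq[MiniBatch],
      excludedConf: Seq[String],
      nonExcludable: Seq[String]): Seq[MiniBatch] = {
    // Rules marked non-excludable are dropped from the exclusion list first
    // (the real code logs a warning for each of them).
    val excluded = excludedConf.filterNot(nonExcludable.contains)
    defaultBatches.flatMap { batch =>
      val kept = batch.rules.filterNot(excluded.contains)
      // A batch whose rules have all been excluded is removed as a whole.
      if (kept.isEmpty) None else Some(batch.copy(rules = kept))
    }
  }

  def main(args: Array[String]): Unit = {
    val batches = Seq(
      MiniBatch("Union", Seq("CombineUnions")),
      MiniBatch("Operator Optimizations", Seq("PushPredicateThroughJoin", "CombineUnions")))
    // Excluding CombineUnions empties the "Union" batch, so only the pruned
    // "Operator Optimizations" batch survives.
    println(filterBatches(batches, Seq("CombineUnions"), Seq("PullupCorrelatedPredicates")))
  }
}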
.../sql/catalyst/optimizer/OptimizerRuleExclusionSuite.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/OptimizerRuleExclusionSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/OptimizerRuleExclusionSuite.scala index f48b0a557b275..5a5396e6f58b0 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/OptimizerRuleExclusionSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/OptimizerRuleExclusionSuite.scala @@ -67,7 +67,7 @@ class OptimizerRuleExclusionSuite extends PlanTest { "DummyRuleName")) } - test("Exclude rules from a non-excluable batch") { + test("Try to exclude a non-excludable rule") { val excludedRules = Seq( ReplaceIntersectWithSemiJoin.ruleName, PullupCorrelatedPredicates.ruleName) From 87afe4fbcaf71d303b07612f9ceb9ad25dd3dcda Mon Sep 17 00:00:00 2001 From: maryannxue Date: Sun, 22 Jul 2018 17:35:01 -0700 Subject: [PATCH 5/7] address review comments --- .../org/apache/spark/sql/catalyst/optimizer/Optimizer.scala | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala index 0d2d20ad1691f..09ec63d045d76 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala @@ -174,10 +174,7 @@ abstract class Optimizer(sessionCatalog: SessionCatalog) ReplaceExceptWithAntiJoin.ruleName :: ReplaceDistinctWithAggregate.ruleName :: PullupCorrelatedPredicates.ruleName :: - RewritePredicateSubquery.ruleName :: - ColumnPruning.ruleName :: - CollapseProject.ruleName :: - RemoveRedundantProject.ruleName :: Nil + RewritePredicateSubquery.ruleName :: Nil /** * Optimize all the subqueries inside expression. From 39b6ce9548c99363e81cb246b4cbe5534d710f3e Mon Sep 17 00:00:00 2001 From: maryannxue Date: Sun, 22 Jul 2018 21:28:00 -0700 Subject: [PATCH 6/7] address review comments --- .../org/apache/spark/sql/catalyst/optimizer/Optimizer.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala index 09ec63d045d76..6faecd3efc40d 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala @@ -194,7 +194,7 @@ abstract class Optimizer(sessionCatalog: SessionCatalog) override def batches: Seq[Batch] = { val excludedRulesConf = - SQLConf.get.optimizerExcludedRules.toSeq.flatMap(_.split(",").map(_.trim).filter(_.nonEmpty)) + SQLConf.get.optimizerExcludedRules.toSeq.flatMap(Utils.stringToSeq) val excludedRules = excludedRulesConf.filter { ruleName => val nonExcludable = nonExcludableRules.contains(ruleName) if (nonExcludable) { From 3730053d7386188042b2f2d4bd6784c3de722df6 Mon Sep 17 00:00:00 2001 From: maryannxue Date: Wed, 25 Jul 2018 13:08:19 -0700 Subject: [PATCH 7/7] Extend rule-exclusion to Optimizer sub-classes, esp. 
SparkOptimizer --- .../sql/catalyst/optimizer/Optimizer.scala | 24 ++++++++- .../optimizer/OptimizerExtendableSuite.scala | 2 +- .../OptimizerRuleExclusionSuite.scala | 53 ++++++++++++++----- ...mizerStructuralIntegrityCheckerSuite.scala | 2 +- .../spark/sql/execution/SparkOptimizer.scala | 5 +- 5 files changed, 69 insertions(+), 17 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala index adb1350adc261..3c264eb8586b5 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala @@ -46,6 +46,13 @@ abstract class Optimizer(sessionCatalog: SessionCatalog) protected def fixedPoint = FixedPoint(SQLConf.get.optimizerMaxIterations) + /** + * Defines the default rule batches in the Optimizer. + * + * Implementations of this class should override this method, and [[nonExcludableRules]] if + * necessary, instead of [[batches]]. The rule batches that eventually run in the Optimizer, + * i.e., returned by [[batches]], will be (defaultBatches - (excludedRules - nonExcludableRules)). + */ def defaultBatches: Seq[Batch] = { val operatorOptimizationRuleSet = Seq( @@ -160,6 +167,14 @@ abstract class Optimizer(sessionCatalog: SessionCatalog) UpdateNullabilityInAttributeReferences) } + /** + * Defines rules that cannot be excluded from the Optimizer even if they are specified in + * SQL config "excludedRules". + * + * Implementations of this class can override this method if necessary. The rule batches + * that eventually run in the Optimizer, i.e., returned by [[batches]], will be + * (defaultBatches - (excludedRules - nonExcludableRules)). + */ def nonExcludableRules: Seq[String] = EliminateDistinct.ruleName :: EliminateSubqueryAliases.ruleName :: @@ -202,7 +217,14 @@ abstract class Optimizer(sessionCatalog: SessionCatalog) */ def extendedOperatorOptimizationRules: Seq[Rule[LogicalPlan]] = Nil - override def batches: Seq[Batch] = { + /** + * Returns (defaultBatches - (excludedRules - nonExcludableRules)), the rule batches that + * eventually run in the Optimizer. + * + * Implementations of this class should override [[defaultBatches]], and [[nonExcludableRules]] + * if necessary, instead of this method. 
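+   * The override below is declared final, so subclasses cannot override
+   * [[batches]] directly.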
+ */ + final override def batches: Seq[Batch] = { val excludedRulesConf = SQLConf.get.optimizerExcludedRules.toSeq.flatMap(Utils.stringToSeq) val excludedRules = excludedRulesConf.filter { ruleName => diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/OptimizerExtendableSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/OptimizerExtendableSuite.scala index 7112c033eabce..36b083a540c3c 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/OptimizerExtendableSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/OptimizerExtendableSuite.scala @@ -47,7 +47,7 @@ class OptimizerExtendableSuite extends SparkFunSuite { DummyRule) :: Nil } - override def batches: Seq[Batch] = super.batches ++ myBatches + override def defaultBatches: Seq[Batch] = super.defaultBatches ++ myBatches } test("Extending batches possible") { diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/OptimizerRuleExclusionSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/OptimizerRuleExclusionSuite.scala index 5a5396e6f58b0..30c80d26b67a1 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/OptimizerRuleExclusionSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/OptimizerRuleExclusionSuite.scala @@ -28,8 +28,10 @@ class OptimizerRuleExclusionSuite extends PlanTest { val testRelation = LocalRelation('a.int, 'b.int, 'c.int) - private def verifyExcludedRules(excludedRuleNames: Seq[String]) { - val optimizer = new SimpleTestOptimizer() + private def verifyExcludedRules(optimizer: Optimizer, rulesToExclude: Seq[String]) { + val nonExcludableRules = optimizer.nonExcludableRules + + val excludedRuleNames = rulesToExclude.filter(!nonExcludableRules.contains(_)) // Batches whose rules are all to be excluded should be removed as a whole. val excludedBatchNames = optimizer.batches .filter(batch => batch.rules.forall(rule => excludedRuleNames.contains(rule.ruleName))) @@ -38,21 +40,31 @@ class OptimizerRuleExclusionSuite extends PlanTest { withSQLConf( OPTIMIZER_EXCLUDED_RULES.key -> excludedRuleNames.foldLeft("")((l, r) => l + "," + r)) { val batches = optimizer.batches + // Verify removed batches. assert(batches.forall(batch => !excludedBatchNames.contains(batch.name))) + // Verify removed rules. assert( batches .forall(batch => batch.rules.forall(rule => !excludedRuleNames.contains(rule.ruleName)))) + // Verify non-excludable rules retained. 
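+      // Even when listed in the exclusion conf, each non-excludable rule must
+      // still appear in at least one of the resulting batches.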
+ nonExcludableRules.foreach { nonExcludableRule => + assert( + optimizer.batches + .exists(batch => batch.rules.exists(rule => rule.ruleName == nonExcludableRule))) + } } } test("Exclude a single rule from multiple batches") { verifyExcludedRules( + new SimpleTestOptimizer(), Seq( PushPredicateThroughJoin.ruleName)) } test("Exclude multiple rules from single or multiple batches") { verifyExcludedRules( + new SimpleTestOptimizer(), Seq( CombineUnions.ruleName, RemoveLiteralFromGroupExpressions.ruleName, @@ -61,6 +73,7 @@ class OptimizerRuleExclusionSuite extends PlanTest { test("Exclude non-existent rule with other valid rules") { verifyExcludedRules( + new SimpleTestOptimizer(), Seq( LimitPushDown.ruleName, InferFiltersFromConstraints.ruleName, @@ -68,20 +81,34 @@ class OptimizerRuleExclusionSuite extends PlanTest { } test("Try to exclude a non-excludable rule") { - val excludedRules = Seq( - ReplaceIntersectWithSemiJoin.ruleName, - PullupCorrelatedPredicates.ruleName) + verifyExcludedRules( + new SimpleTestOptimizer(), + Seq( + ReplaceIntersectWithSemiJoin.ruleName, + PullupCorrelatedPredicates.ruleName)) + } - val optimizer = new SimpleTestOptimizer() + test("Custom optimizer") { + val optimizer = new SimpleTestOptimizer() { + override def defaultBatches: Seq[Batch] = + Batch("push", Once, + PushDownPredicate, + PushPredicateThroughJoin, + PushProjectionThroughUnion) :: + Batch("pull", Once, + PullupCorrelatedPredicates) :: Nil - withSQLConf( - OPTIMIZER_EXCLUDED_RULES.key -> excludedRules.foldLeft("")((l, r) => l + "," + r)) { - excludedRules.foreach { excludedRule => - assert( - optimizer.batches - .exists(batch => batch.rules.exists(rule => rule.ruleName == excludedRule))) - } + override def nonExcludableRules: Seq[String] = + PushDownPredicate.ruleName :: + PullupCorrelatedPredicates.ruleName :: Nil } + + verifyExcludedRules( + optimizer, + Seq( + PushDownPredicate.ruleName, + PushProjectionThroughUnion.ruleName, + PullupCorrelatedPredicates.ruleName)) } test("Verify optimized plan after excluding CombineUnions rule") { diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/OptimizerStructuralIntegrityCheckerSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/OptimizerStructuralIntegrityCheckerSuite.scala index 6e183d81b7265..a22a81e9844d3 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/OptimizerStructuralIntegrityCheckerSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/OptimizerStructuralIntegrityCheckerSuite.scala @@ -44,7 +44,7 @@ class OptimizerStructuralIntegrityCheckerSuite extends PlanTest { EmptyFunctionRegistry, new SQLConf())) { val newBatch = Batch("OptimizeRuleBreakSI", Once, OptimizeRuleBreakSI) - override def batches: Seq[Batch] = Seq(newBatch) ++ super.batches + override def defaultBatches: Seq[Batch] = Seq(newBatch) ++ super.defaultBatches } test("check for invalid plan after execution of rule") { diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkOptimizer.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkOptimizer.scala index 00ff4c8ac310b..64d3f2cdbfa82 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkOptimizer.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkOptimizer.scala @@ -28,13 +28,16 @@ class SparkOptimizer( experimentalMethods: ExperimentalMethods) extends Optimizer(catalog) { - override def batches: Seq[Batch] = (preOptimizationBatches ++ 
super.batches :+ + override def defaultBatches: Seq[Batch] = (preOptimizationBatches ++ super.defaultBatches :+ Batch("Optimize Metadata Only Query", Once, OptimizeMetadataOnlyQuery(catalog)) :+ Batch("Extract Python UDF from Aggregate", Once, ExtractPythonUDFFromAggregate) :+ Batch("Prune File Source Table Partitions", Once, PruneFileSourcePartitions)) ++ postHocOptimizationBatches :+ Batch("User Provided Optimizers", fixedPoint, experimentalMethods.extraOptimizations: _*) + override def nonExcludableRules: Seq[String] = + super.nonExcludableRules :+ ExtractPythonUDFFromAggregate.ruleName + /** * Optimization batches that are executed before the regular optimization batches (also before * the finish analysis batch).
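Taken together, the series makes optimizer rule exclusion a per-session knob. Below is a minimal end-to-end sketch, assuming a local SparkSession; the DataFrame and the choice of ConvertToLocalRelation are illustrative, and whether a listed rule is honored is ultimately decided by nonExcludableRules.

import org.apache.spark.sql.SparkSession

object ExcludedRulesDemo {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[*]")
      .appName("rule-exclusion-demo")
      // Skip ConvertToLocalRelation for this session; rules are identified by
      // their fully qualified names, comma-separated.
      .config(
        "spark.sql.optimizer.excludedRules",
        "org.apache.spark.sql.catalyst.optimizer.ConvertToLocalRelation")
      .getOrCreate()
    import spark.implicits._

    // Normally the optimizer folds this Project into the LocalRelation; with
    // the rule excluded, the optimized plan should retain the Project node.
    val df = Seq(1, 2, 3).toDF("a").select($"a" + 1)
    df.explain(true)

    spark.stop()
  }
}

Rule names are the fully qualified class names reported by ruleName; any listed rule that is non-excludable is ignored with a warning rather than failing the query.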