Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[SPARK-24802][SQL][FOLLOW-UP] Add a new config for Optimization Rule Exclusion #21876

Closed
wants to merge 8 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,13 @@ abstract class Optimizer(sessionCatalog: SessionCatalog)

protected def fixedPoint = FixedPoint(SQLConf.get.optimizerMaxIterations)

/**
* Defines the default rule batches in the Optimizer.
*
* Implementations of this class should override this method, and [[nonExcludableRules]] if
* necessary, instead of [[batches]]. The rule batches that eventually run in the Optimizer,
* i.e., returned by [[batches]], will be (defaultBatches - (excludedRules - nonExcludableRules)).
*/
def defaultBatches: Seq[Batch] = {
val operatorOptimizationRuleSet =
Seq(
Expand Down Expand Up @@ -160,6 +167,14 @@ abstract class Optimizer(sessionCatalog: SessionCatalog)
UpdateNullabilityInAttributeReferences)
}

/**
* Defines rules that cannot be excluded from the Optimizer even if they are specified in
* SQL config "excludedRules".
*
* Implementations of this class can override this method if necessary. The rule batches
* that eventually run in the Optimizer, i.e., returned by [[batches]], will be
* (defaultBatches - (excludedRules - nonExcludableRules)).
*/
def nonExcludableRules: Seq[String] =
EliminateDistinct.ruleName ::
EliminateSubqueryAliases.ruleName ::
Expand Down Expand Up @@ -202,7 +217,14 @@ abstract class Optimizer(sessionCatalog: SessionCatalog)
*/
def extendedOperatorOptimizationRules: Seq[Rule[LogicalPlan]] = Nil

override def batches: Seq[Batch] = {
/**
* Returns (defaultBatches - (excludedRules - nonExcludableRules)), the rule batches that
* eventually run in the Optimizer.
*
* Implementations of this class should override [[defaultBatches]], and [[nonExcludableRules]]
* if necessary, instead of this method.
*/
final override def batches: Seq[Batch] = {
val excludedRulesConf =
SQLConf.get.optimizerExcludedRules.toSeq.flatMap(Utils.stringToSeq)
val excludedRules = excludedRulesConf.filter { ruleName =>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ class OptimizerExtendableSuite extends SparkFunSuite {
DummyRule) :: Nil
}

override def batches: Seq[Batch] = super.batches ++ myBatches
override def defaultBatches: Seq[Batch] = super.defaultBatches ++ myBatches
}

test("Extending batches possible") {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,10 @@ class OptimizerRuleExclusionSuite extends PlanTest {

val testRelation = LocalRelation('a.int, 'b.int, 'c.int)

private def verifyExcludedRules(excludedRuleNames: Seq[String]) {
val optimizer = new SimpleTestOptimizer()
private def verifyExcludedRules(optimizer: Optimizer, rulesToExclude: Seq[String]) {
val nonExcludableRules = optimizer.nonExcludableRules

val excludedRuleNames = rulesToExclude.filter(!nonExcludableRules.contains(_))
// Batches whose rules are all to be excluded should be removed as a whole.
val excludedBatchNames = optimizer.batches
.filter(batch => batch.rules.forall(rule => excludedRuleNames.contains(rule.ruleName)))
Expand All @@ -38,21 +40,31 @@ class OptimizerRuleExclusionSuite extends PlanTest {
withSQLConf(
OPTIMIZER_EXCLUDED_RULES.key -> excludedRuleNames.foldLeft("")((l, r) => l + "," + r)) {
val batches = optimizer.batches
// Verify removed batches.
assert(batches.forall(batch => !excludedBatchNames.contains(batch.name)))
// Verify removed rules.
assert(
batches
.forall(batch => batch.rules.forall(rule => !excludedRuleNames.contains(rule.ruleName))))
// Verify non-excludable rules retained.
nonExcludableRules.foreach { nonExcludableRule =>
assert(
optimizer.batches
.exists(batch => batch.rules.exists(rule => rule.ruleName == nonExcludableRule)))
}
}
}

test("Exclude a single rule from multiple batches") {
verifyExcludedRules(
new SimpleTestOptimizer(),
Seq(
PushPredicateThroughJoin.ruleName))
}

test("Exclude multiple rules from single or multiple batches") {
verifyExcludedRules(
new SimpleTestOptimizer(),
Seq(
CombineUnions.ruleName,
RemoveLiteralFromGroupExpressions.ruleName,
Expand All @@ -61,27 +73,42 @@ class OptimizerRuleExclusionSuite extends PlanTest {

test("Exclude non-existent rule with other valid rules") {
verifyExcludedRules(
new SimpleTestOptimizer(),
Seq(
LimitPushDown.ruleName,
InferFiltersFromConstraints.ruleName,
"DummyRuleName"))
}

test("Try to exclude a non-excludable rule") {
val excludedRules = Seq(
ReplaceIntersectWithSemiJoin.ruleName,
PullupCorrelatedPredicates.ruleName)
verifyExcludedRules(
new SimpleTestOptimizer(),
Seq(
ReplaceIntersectWithSemiJoin.ruleName,
PullupCorrelatedPredicates.ruleName))
}

val optimizer = new SimpleTestOptimizer()
test("Custom optimizer") {
val optimizer = new SimpleTestOptimizer() {
override def defaultBatches: Seq[Batch] =
Batch("push", Once,
PushDownPredicate,
PushPredicateThroughJoin,
PushProjectionThroughUnion) ::
Batch("pull", Once,
PullupCorrelatedPredicates) :: Nil

withSQLConf(
OPTIMIZER_EXCLUDED_RULES.key -> excludedRules.foldLeft("")((l, r) => l + "," + r)) {
excludedRules.foreach { excludedRule =>
assert(
optimizer.batches
.exists(batch => batch.rules.exists(rule => rule.ruleName == excludedRule)))
}
override def nonExcludableRules: Seq[String] =
PushDownPredicate.ruleName ::
PullupCorrelatedPredicates.ruleName :: Nil
}

verifyExcludedRules(
optimizer,
Seq(
PushDownPredicate.ruleName,
PushProjectionThroughUnion.ruleName,
PullupCorrelatedPredicates.ruleName))
}

test("Verify optimized plan after excluding CombineUnions rule") {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ class OptimizerStructuralIntegrityCheckerSuite extends PlanTest {
EmptyFunctionRegistry,
new SQLConf())) {
val newBatch = Batch("OptimizeRuleBreakSI", Once, OptimizeRuleBreakSI)
override def batches: Seq[Batch] = Seq(newBatch) ++ super.batches
override def defaultBatches: Seq[Batch] = Seq(newBatch) ++ super.defaultBatches
}

test("check for invalid plan after execution of rule") {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,13 +28,16 @@ class SparkOptimizer(
experimentalMethods: ExperimentalMethods)
extends Optimizer(catalog) {

override def batches: Seq[Batch] = (preOptimizationBatches ++ super.batches :+
override def defaultBatches: Seq[Batch] = (preOptimizationBatches ++ super.defaultBatches :+
Batch("Optimize Metadata Only Query", Once, OptimizeMetadataOnlyQuery(catalog)) :+
Batch("Extract Python UDF from Aggregate", Once, ExtractPythonUDFFromAggregate) :+
Batch("Prune File Source Table Partitions", Once, PruneFileSourcePartitions)) ++
postHocOptimizationBatches :+
Batch("User Provided Optimizers", fixedPoint, experimentalMethods.extraOptimizations: _*)

override def nonExcludableRules: Seq[String] =
super.nonExcludableRules :+ ExtractPythonUDFFromAggregate.ruleName

/**
* Optimization batches that are executed before the regular optimization batches (also before
* the finish analysis batch).
Expand Down