From a51fa9c6cdcd83685eb6f8a1f6f86a7425e0785f Mon Sep 17 00:00:00 2001 From: j-esse Date: Wed, 30 Jan 2019 13:24:14 -0800 Subject: [PATCH] Maximum repeatedly substituted alias size (#475) https://issues.apache.org/jira/browse/SPARK-26626 https://github.com/apache/spark/pull/23556 ## What changes were proposed in this pull request? This adds a `spark.sql.maxRepeatedAliasSize` config option, which specifies the maximum size of an aliased expression to be substituted (in CollapseProject and PhysicalOperation). This prevents large aliased expressions from being substituted multiple times and exploding the size of the expression tree, eventually OOMing the driver. The default config value of 100 was chosen through testing to find the optimally performant value: ![image](https://user-images.githubusercontent.com/17480705/51204201-dd285300-18b7-11e9-8781-dd698df00389.png) ## How was this patch tested? Added unit tests, and did manual testing --- FORK.md | 5 +++- .../sql/catalyst/optimizer/Optimizer.scala | 25 +++++++++++++++- .../sql/catalyst/planning/patterns.scala | 30 +++++++++++++++++-- .../spark/sql/catalyst/trees/TreeNode.scala | 2 ++ .../apache/spark/sql/internal/SQLConf.scala | 11 +++++++ .../optimizer/CollapseProjectSuite.scala | 18 +++++++++++ 6 files changed, 87 insertions(+), 4 deletions(-) diff --git a/FORK.md b/FORK.md index 0f99790529174..7efc0e7961237 100644 --- a/FORK.md +++ b/FORK.md @@ -18,6 +18,9 @@ * core: Broadcast, CoarseGrainedExecutorBackend, CoarseGrainedSchedulerBackend, Executor, MemoryStore, SparkContext, TorrentBroadcast * kubernetes: ExecutorPodsAllocator, ExecutorPodsLifecycleManager, ExecutorPodsPollingSnapshotSource, ExecutorPodsSnapshot, ExecutorPodsWatchSnapshotSource, KubernetesClusterSchedulerBackend * yarn: YarnClusterSchedulerBackend, YarnSchedulerBackend + +* [SPARK-26626](https://issues.apache.org/jira/browse/SPARK-26626) - Limited the maximum size of repeatedly substituted aliases + # Added * Gradle plugin to easily create custom docker images for use with k8s @@ -25,4 +28,4 @@ # Reverted * [SPARK-25908](https://issues.apache.org/jira/browse/SPARK-25908) - Removal of `monotonicall_increasing_id`, `toDegree`, `toRadians`, `approxCountDistinct`, `unionAll` -* [SPARK-25862](https://issues.apache.org/jira/browse/SPARK-25862) - Removal of `unboundedPreceding`, `unboundedFollowing`, `currentRow` \ No newline at end of file +* [SPARK-25862](https://issues.apache.org/jira/browse/SPARK-25862) - Removal of `unboundedPreceding`, `unboundedFollowing`, `currentRow` diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala index a330a84a3a24f..a50a94ea2765f 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala @@ -649,7 +649,8 @@ object CollapseProject extends Rule[LogicalPlan] { def apply(plan: LogicalPlan): LogicalPlan = plan transformUp { case p1 @ Project(_, p2: Project) => - if (haveCommonNonDeterministicOutput(p1.projectList, p2.projectList)) { + if (haveCommonNonDeterministicOutput(p1.projectList, p2.projectList) || + hasOversizedRepeatedAliases(p1.projectList, p2.projectList)) { p1 } else { p2.copy(projectList = buildCleanedProjectList(p1.projectList, p2.projectList)) @@ -682,6 +683,28 @@ object CollapseProject extends Rule[LogicalPlan] { }.exists(!_.deterministic)) } + private def hasOversizedRepeatedAliases( + upper: Seq[NamedExpression], lower: Seq[NamedExpression]): Boolean = { + val aliases = collectAliases(lower) + + // Count how many times each alias is used in the upper Project. + // If an alias is only used once, we can safely substitute it without increasing the overall + // tree size + val referenceCounts = AttributeMap( + upper + .flatMap(_.collect { case a: Attribute => a }) + .groupBy(identity) + .mapValues(_.size).toSeq + ) + + // Check for any aliases that are used more than once, and are larger than the configured + // maximum size + aliases.exists({ case (attribute, expression) => + referenceCounts.getOrElse(attribute, 0) > 1 && + expression.treeSize > SQLConf.get.maxRepeatedAliasSize + }) + } + private def buildCleanedProjectList( upper: Seq[NamedExpression], lower: Seq[NamedExpression]): Seq[NamedExpression] = { diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/planning/patterns.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/planning/patterns.scala index 84be677e438a6..4188e9976b806 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/planning/patterns.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/planning/patterns.scala @@ -23,6 +23,7 @@ import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.catalyst.expressions.aggregate.AggregateExpression import org.apache.spark.sql.catalyst.plans._ import org.apache.spark.sql.catalyst.plans.logical._ +import org.apache.spark.sql.internal.SQLConf /** * A pattern that matches any number of project or filter operations on top of another relational @@ -58,8 +59,13 @@ object PhysicalOperation extends PredicateHelper { plan match { case Project(fields, child) if fields.forall(_.deterministic) => val (_, filters, other, aliases) = collectProjectsAndFilters(child) - val substitutedFields = fields.map(substitute(aliases)).asInstanceOf[Seq[NamedExpression]] - (Some(substitutedFields), filters, other, collectAliases(substitutedFields)) + if (hasOversizedRepeatedAliases(fields, aliases)) { + // Skip substitution if it could overly increase the overall tree size and risk OOMs + (None, Nil, plan, Map.empty) + } else { + val substitutedFields = fields.map(substitute(aliases)).asInstanceOf[Seq[NamedExpression]] + (Some(substitutedFields), filters, other, collectAliases(substitutedFields)) + } case Filter(condition, child) if condition.deterministic => val (fields, filters, other, aliases) = collectProjectsAndFilters(child) @@ -77,6 +83,26 @@ object PhysicalOperation extends PredicateHelper { case a @ Alias(child, _) => a.toAttribute -> child }.toMap + private def hasOversizedRepeatedAliases(fields: Seq[Expression], + aliases: Map[Attribute, Expression]): Boolean = { + // Count how many times each alias is used in the fields. + // If an alias is only used once, we can safely substitute it without increasing the overall + // tree size + val referenceCounts = AttributeMap( + fields + .flatMap(_.collect { case a: Attribute => a }) + .groupBy(identity) + .mapValues(_.size).toSeq + ) + + // Check for any aliases that are used more than once, and are larger than the configured + // maximum size + aliases.exists({ case (attribute, expression) => + referenceCounts.getOrElse(attribute, 0) > 1 && + expression.treeSize > SQLConf.get.maxRepeatedAliasSize + }) + } + private def substitute(aliases: Map[Attribute, Expression])(expr: Expression): Expression = { expr.transform { case a @ Alias(ref: AttributeReference, name) => diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/trees/TreeNode.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/trees/TreeNode.scala index becfa8d982213..566b14d7d0e19 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/trees/TreeNode.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/trees/TreeNode.scala @@ -87,6 +87,8 @@ abstract class TreeNode[BaseType <: TreeNode[BaseType]] extends Product { lazy val containsChild: Set[TreeNode[_]] = children.toSet + lazy val treeSize: Long = children.map(_.treeSize).sum + 1 + private lazy val _hashCode: Int = scala.util.hashing.MurmurHash3.productHash(this) override def hashCode(): Int = _hashCode diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala index ef68b90f60228..cf67ac1ff813c 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala @@ -1610,6 +1610,15 @@ object SQLConf { "WHERE, which does not follow SQL standard.") .booleanConf .createWithDefault(false) + + val MAX_REPEATED_ALIAS_SIZE = + buildConf("spark.sql.maxRepeatedAliasSize") + .internal() + .doc("The maximum size of alias expression that will be substituted multiple times " + + "(size defined by the number of nodes in the expression tree). " + + "Used by the CollapseProject optimizer, and PhysicalOperation.") + .intConf + .createWithDefault(100) } /** @@ -2038,6 +2047,8 @@ class SQLConf extends Serializable with Logging { def integralDivideReturnLong: Boolean = getConf(SQLConf.LEGACY_INTEGRALDIVIDE_RETURN_LONG) + def maxRepeatedAliasSize: Int = getConf(SQLConf.MAX_REPEATED_ALIAS_SIZE) + /** ********************** SQLConf functionality methods ************ */ /** Set Spark SQL configuration properties. */ diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/CollapseProjectSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/CollapseProjectSuite.scala index e7a5bcee420f5..c41ad24550143 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/CollapseProjectSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/CollapseProjectSuite.scala @@ -138,4 +138,22 @@ class CollapseProjectSuite extends PlanTest { assert(projects.size === 1) assert(hasMetadata(optimized)) } + + test("ensure oversize aliases are not repeatedly substituted") { + var query: LogicalPlan = testRelation + for( a <- 1 to 100) { + query = query.select(('a + 'b).as('a), ('a - 'b).as('b)) + } + val projects = Optimize.execute(query.analyze).collect { case p: Project => p } + assert(projects.size >= 12) + } + + test("ensure oversize aliases are still substituted once") { + var query: LogicalPlan = testRelation + for( a <- 1 to 20) { + query = query.select(('a + 'b).as('a), 'b) + } + val projects = Optimize.execute(query.analyze).collect { case p: Project => p } + assert(projects.size === 1) + } }