diff --git a/FORK.md b/FORK.md index 0f99790529174..7efc0e7961237 100644 --- a/FORK.md +++ b/FORK.md @@ -18,6 +18,9 @@ * core: Broadcast, CoarseGrainedExecutorBackend, CoarseGrainedSchedulerBackend, Executor, MemoryStore, SparkContext, TorrentBroadcast * kubernetes: ExecutorPodsAllocator, ExecutorPodsLifecycleManager, ExecutorPodsPollingSnapshotSource, ExecutorPodsSnapshot, ExecutorPodsWatchSnapshotSource, KubernetesClusterSchedulerBackend * yarn: YarnClusterSchedulerBackend, YarnSchedulerBackend + +* [SPARK-26626](https://issues.apache.org/jira/browse/SPARK-26626) - Limited the maximum size of repeatedly substituted aliases + # Added * Gradle plugin to easily create custom docker images for use with k8s @@ -25,4 +28,4 @@ # Reverted * [SPARK-25908](https://issues.apache.org/jira/browse/SPARK-25908) - Removal of `monotonicall_increasing_id`, `toDegree`, `toRadians`, `approxCountDistinct`, `unionAll` -* [SPARK-25862](https://issues.apache.org/jira/browse/SPARK-25862) - Removal of `unboundedPreceding`, `unboundedFollowing`, `currentRow` \ No newline at end of file +* [SPARK-25862](https://issues.apache.org/jira/browse/SPARK-25862) - Removal of `unboundedPreceding`, `unboundedFollowing`, `currentRow` diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala index a330a84a3a24f..a50a94ea2765f 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala @@ -649,7 +649,8 @@ object CollapseProject extends Rule[LogicalPlan] { def apply(plan: LogicalPlan): LogicalPlan = plan transformUp { case p1 @ Project(_, p2: Project) => - if (haveCommonNonDeterministicOutput(p1.projectList, p2.projectList)) { + if (haveCommonNonDeterministicOutput(p1.projectList, p2.projectList) || + hasOversizedRepeatedAliases(p1.projectList, p2.projectList)) { p1 } else { p2.copy(projectList = buildCleanedProjectList(p1.projectList, p2.projectList)) @@ -682,6 +683,28 @@ object CollapseProject extends Rule[LogicalPlan] { }.exists(!_.deterministic)) } + private def hasOversizedRepeatedAliases( + upper: Seq[NamedExpression], lower: Seq[NamedExpression]): Boolean = { + val aliases = collectAliases(lower) + + // Count how many times each alias is used in the upper Project. + // If an alias is only used once, we can safely substitute it without increasing the overall + // tree size + val referenceCounts = AttributeMap( + upper + .flatMap(_.collect { case a: Attribute => a }) + .groupBy(identity) + .mapValues(_.size).toSeq + ) + + // Check for any aliases that are used more than once, and are larger than the configured + // maximum size + aliases.exists({ case (attribute, expression) => + referenceCounts.getOrElse(attribute, 0) > 1 && + expression.treeSize > SQLConf.get.maxRepeatedAliasSize + }) + } + private def buildCleanedProjectList( upper: Seq[NamedExpression], lower: Seq[NamedExpression]): Seq[NamedExpression] = { diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/planning/patterns.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/planning/patterns.scala index 84be677e438a6..4188e9976b806 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/planning/patterns.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/planning/patterns.scala @@ -23,6 +23,7 @@ import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.catalyst.expressions.aggregate.AggregateExpression import org.apache.spark.sql.catalyst.plans._ import org.apache.spark.sql.catalyst.plans.logical._ +import org.apache.spark.sql.internal.SQLConf /** * A pattern that matches any number of project or filter operations on top of another relational @@ -58,8 +59,13 @@ object PhysicalOperation extends PredicateHelper { plan match { case Project(fields, child) if fields.forall(_.deterministic) => val (_, filters, other, aliases) = collectProjectsAndFilters(child) - val substitutedFields = fields.map(substitute(aliases)).asInstanceOf[Seq[NamedExpression]] - (Some(substitutedFields), filters, other, collectAliases(substitutedFields)) + if (hasOversizedRepeatedAliases(fields, aliases)) { + // Skip substitution if it could overly increase the overall tree size and risk OOMs + (None, Nil, plan, Map.empty) + } else { + val substitutedFields = fields.map(substitute(aliases)).asInstanceOf[Seq[NamedExpression]] + (Some(substitutedFields), filters, other, collectAliases(substitutedFields)) + } case Filter(condition, child) if condition.deterministic => val (fields, filters, other, aliases) = collectProjectsAndFilters(child) @@ -77,6 +83,26 @@ object PhysicalOperation extends PredicateHelper { case a @ Alias(child, _) => a.toAttribute -> child }.toMap + private def hasOversizedRepeatedAliases(fields: Seq[Expression], + aliases: Map[Attribute, Expression]): Boolean = { + // Count how many times each alias is used in the fields. + // If an alias is only used once, we can safely substitute it without increasing the overall + // tree size + val referenceCounts = AttributeMap( + fields + .flatMap(_.collect { case a: Attribute => a }) + .groupBy(identity) + .mapValues(_.size).toSeq + ) + + // Check for any aliases that are used more than once, and are larger than the configured + // maximum size + aliases.exists({ case (attribute, expression) => + referenceCounts.getOrElse(attribute, 0) > 1 && + expression.treeSize > SQLConf.get.maxRepeatedAliasSize + }) + } + private def substitute(aliases: Map[Attribute, Expression])(expr: Expression): Expression = { expr.transform { case a @ Alias(ref: AttributeReference, name) => diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/trees/TreeNode.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/trees/TreeNode.scala index becfa8d982213..566b14d7d0e19 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/trees/TreeNode.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/trees/TreeNode.scala @@ -87,6 +87,8 @@ abstract class TreeNode[BaseType <: TreeNode[BaseType]] extends Product { lazy val containsChild: Set[TreeNode[_]] = children.toSet + lazy val treeSize: Long = children.map(_.treeSize).sum + 1 + private lazy val _hashCode: Int = scala.util.hashing.MurmurHash3.productHash(this) override def hashCode(): Int = _hashCode diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala index ef68b90f60228..cf67ac1ff813c 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala @@ -1610,6 +1610,15 @@ object SQLConf { "WHERE, which does not follow SQL standard.") .booleanConf .createWithDefault(false) + + val MAX_REPEATED_ALIAS_SIZE = + buildConf("spark.sql.maxRepeatedAliasSize") + .internal() + .doc("The maximum size of alias expression that will be substituted multiple times " + + "(size defined by the number of nodes in the expression tree). " + + "Used by the CollapseProject optimizer, and PhysicalOperation.") + .intConf + .createWithDefault(100) } /** @@ -2038,6 +2047,8 @@ class SQLConf extends Serializable with Logging { def integralDivideReturnLong: Boolean = getConf(SQLConf.LEGACY_INTEGRALDIVIDE_RETURN_LONG) + def maxRepeatedAliasSize: Int = getConf(SQLConf.MAX_REPEATED_ALIAS_SIZE) + /** ********************** SQLConf functionality methods ************ */ /** Set Spark SQL configuration properties. */ diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/CollapseProjectSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/CollapseProjectSuite.scala index e7a5bcee420f5..c41ad24550143 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/CollapseProjectSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/CollapseProjectSuite.scala @@ -138,4 +138,22 @@ class CollapseProjectSuite extends PlanTest { assert(projects.size === 1) assert(hasMetadata(optimized)) } + + test("ensure oversize aliases are not repeatedly substituted") { + var query: LogicalPlan = testRelation + for( a <- 1 to 100) { + query = query.select(('a + 'b).as('a), ('a - 'b).as('b)) + } + val projects = Optimize.execute(query.analyze).collect { case p: Project => p } + assert(projects.size >= 12) + } + + test("ensure oversize aliases are still substituted once") { + var query: LogicalPlan = testRelation + for( a <- 1 to 20) { + query = query.select(('a + 'b).as('a), 'b) + } + val projects = Optimize.execute(query.analyze).collect { case p: Project => p } + assert(projects.size === 1) + } }