Commit
Merge branch 'palantir-master' into rk/more-merge
Robert Kruszewski committed Feb 1, 2019
2 parents 1f9772d + a51fa9c commit a23eb35
Showing 23 changed files with 843 additions and 414 deletions.
5 changes: 4 additions & 1 deletion FORK.md
@@ -18,6 +18,9 @@
* core: Broadcast, CoarseGrainedExecutorBackend, CoarseGrainedSchedulerBackend, Executor, MemoryStore, SparkContext, TorrentBroadcast
* kubernetes: ExecutorPodsAllocator, ExecutorPodsLifecycleManager, ExecutorPodsPollingSnapshotSource, ExecutorPodsSnapshot, ExecutorPodsWatchSnapshotSource, KubernetesClusterSchedulerBackend
* yarn: YarnClusterSchedulerBackend, YarnSchedulerBackend

* [SPARK-26626](https://issues.apache.org/jira/browse/SPARK-26626) - Limited the maximum size of repeatedly substituted aliases

# Added

* Gradle plugin to easily create custom docker images for use with k8s
@@ -27,4 +30,4 @@
* [SPARK-25908](https://issues.apache.org/jira/browse/SPARK-25908) - Removal of `monotonicall_increasing_id`, `toDegree`, `toRadians`, `approxCountDistinct`, `unionAll`
* [SPARK-25862](https://issues.apache.org/jira/browse/SPARK-25862) - Removal of `unboundedPreceding`, `unboundedFollowing`, `currentRow`
* [SPARK-26127](https://issues.apache.org/jira/browse/SPARK-26127) - Removal of deprecated setters from tree regression and classification models
* [SPARK-25867](https://issues.apache.org/jira/browse/SPARK-25867) - Removal of KMeans computeCost
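The SPARK-26626 entry above corresponds to the CollapseProject, PhysicalOperation, TreeNode and SQLConf changes further down in this commit. A minimal sketch of the query shape that motivates the limit, assuming an active SparkSession named `spark` (column names mirror the new CollapseProjectSuite test and are illustrative only):

```scala
// Each chained select redefines a and b in terms of the previous projection.
// Collapsing all of these Projects into one would substitute both aliases at
// every use site, roughly doubling the expression tree at each step (on the
// order of 2^100 nodes for 100 steps); the new maxRepeatedAliasSize guard
// stops the substitution once a repeated alias grows past the configured size.
var df = spark.range(10).selectExpr("id as a", "id as b")
for (_ <- 1 to 100) {
  df = df.selectExpr("a + b as a", "a - b as b")
}
df.explain() // optimization now stays bounded instead of exhausting memory
```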
core/src/main/scala/org/apache/spark/MapOutputStatistics.scala
@@ -25,3 +25,4 @@ package org.apache.spark
* (may be inexact due to use of compressed map statuses)
*/
private[spark] class MapOutputStatistics(val shuffleId: Int, val bytesByPartitionId: Array[Long])
extends Serializable
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
@@ -649,7 +649,8 @@ object CollapseProject extends Rule[LogicalPlan] {

def apply(plan: LogicalPlan): LogicalPlan = plan transformUp {
case p1 @ Project(_, p2: Project) =>
if (haveCommonNonDeterministicOutput(p1.projectList, p2.projectList)) {
if (haveCommonNonDeterministicOutput(p1.projectList, p2.projectList) ||
hasOversizedRepeatedAliases(p1.projectList, p2.projectList)) {
p1
} else {
p2.copy(projectList = buildCleanedProjectList(p1.projectList, p2.projectList))
@@ -682,6 +683,28 @@
}.exists(!_.deterministic))
}

private def hasOversizedRepeatedAliases(
upper: Seq[NamedExpression], lower: Seq[NamedExpression]): Boolean = {
val aliases = collectAliases(lower)

// Count how many times each alias is used in the upper Project.
// If an alias is only used once, we can safely substitute it without increasing the overall
// tree size
val referenceCounts = AttributeMap(
upper
.flatMap(_.collect { case a: Attribute => a })
.groupBy(identity)
.mapValues(_.size).toSeq
)

// Check for any aliases that are used more than once, and are larger than the configured
// maximum size
aliases.exists({ case (attribute, expression) =>
referenceCounts.getOrElse(attribute, 0) > 1 &&
expression.treeSize > SQLConf.get.maxRepeatedAliasSize
})
}

private def buildCleanedProjectList(
upper: Seq[NamedExpression],
lower: Seq[NamedExpression]): Seq[NamedExpression] = {
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/planning/patterns.scala
@@ -23,6 +23,7 @@ import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.expressions.aggregate.AggregateExpression
import org.apache.spark.sql.catalyst.plans._
import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.internal.SQLConf

/**
* A pattern that matches any number of project or filter operations on top of another relational
@@ -58,8 +59,13 @@ object PhysicalOperation extends PredicateHelper {
plan match {
case Project(fields, child) if fields.forall(_.deterministic) =>
val (_, filters, other, aliases) = collectProjectsAndFilters(child)
val substitutedFields = fields.map(substitute(aliases)).asInstanceOf[Seq[NamedExpression]]
(Some(substitutedFields), filters, other, collectAliases(substitutedFields))
if (hasOversizedRepeatedAliases(fields, aliases)) {
// Skip substitution if it could overly increase the overall tree size and risk OOMs
(None, Nil, plan, Map.empty)
} else {
val substitutedFields = fields.map(substitute(aliases)).asInstanceOf[Seq[NamedExpression]]
(Some(substitutedFields), filters, other, collectAliases(substitutedFields))
}

case Filter(condition, child) if condition.deterministic =>
val (fields, filters, other, aliases) = collectProjectsAndFilters(child)
@@ -77,6 +83,26 @@
case a @ Alias(child, _) => a.toAttribute -> child
}.toMap

private def hasOversizedRepeatedAliases(fields: Seq[Expression],
aliases: Map[Attribute, Expression]): Boolean = {
// Count how many times each alias is used in the fields.
// If an alias is only used once, we can safely substitute it without increasing the overall
// tree size
val referenceCounts = AttributeMap(
fields
.flatMap(_.collect { case a: Attribute => a })
.groupBy(identity)
.mapValues(_.size).toSeq
)

// Check for any aliases that are used more than once, and are larger than the configured
// maximum size
aliases.exists({ case (attribute, expression) =>
referenceCounts.getOrElse(attribute, 0) > 1 &&
expression.treeSize > SQLConf.get.maxRepeatedAliasSize
})
}

private def substitute(aliases: Map[Attribute, Expression])(expr: Expression): Expression = {
expr.transform {
case a @ Alias(ref: AttributeReference, name) =>
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/trees/TreeNode.scala
@@ -89,6 +89,8 @@ abstract class TreeNode[BaseType <: TreeNode[BaseType]] extends Product {

lazy val containsChild: Set[TreeNode[_]] = children.toSet

lazy val treeSize: Long = children.map(_.treeSize).sum + 1

private lazy val _hashCode: Int = scala.util.hashing.MurmurHash3.productHash(this)
override def hashCode(): Int = _hashCode

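The new `treeSize` field is what the alias guard compares against `spark.sql.maxRepeatedAliasSize`. A small sketch of what it counts, assuming a build that includes this commit (the value is the total node count of the expression tree, root included):

```scala
import org.apache.spark.sql.functions.col

// (a + b) * c parses to Multiply(Add('a, 'b), 'c): two operators plus three
// attribute leaves, so treeSize == 5.
val product = (col("a") + col("b")) * col("c")
assert(product.expr.treeSize == 5)
```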
sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
@@ -280,14 +280,19 @@ object SQLConf {

val SHUFFLE_MIN_NUM_POSTSHUFFLE_PARTITIONS =
buildConf("spark.sql.adaptive.minNumPostShufflePartitions")
.internal()
.doc("The advisory minimal number of post-shuffle partitions provided to " +
"ExchangeCoordinator. This setting is used in our test to make sure we " +
"have enough parallelism to expose issues that will not be exposed with a " +
"single partition. When the value is a non-positive value, this setting will " +
"not be provided to ExchangeCoordinator.")
.doc("The advisory minimum number of post-shuffle partitions used in adaptive execution.")
.intConf
.checkValue(numPartitions => numPartitions > 0, "The minimum shuffle partition number " +
"must be a positive integer.")
.createWithDefault(1)

val SHUFFLE_MAX_NUM_POSTSHUFFLE_PARTITIONS =
buildConf("spark.sql.adaptive.maxNumPostShufflePartitions")
.doc("The advisory maximum number of post-shuffle partitions used in adaptive execution.")
.intConf
.createWithDefault(-1)
.checkValue(numPartitions => numPartitions > 0, "The maximum shuffle partition number " +
"must be a positive integer.")
.createWithDefault(500)

val SUBEXPRESSION_ELIMINATION_ENABLED =
buildConf("spark.sql.subexpressionElimination.enabled")
@@ -1621,6 +1626,15 @@
""" "... N more fields" placeholder.""")
.intConf
.createWithDefault(25)

val MAX_REPEATED_ALIAS_SIZE =
buildConf("spark.sql.maxRepeatedAliasSize")
.internal()
.doc("The maximum size of alias expression that will be substituted multiple times " +
"(size defined by the number of nodes in the expression tree). " +
"Used by the CollapseProject optimizer, and PhysicalOperation.")
.intConf
.createWithDefault(100)
}

/**
@@ -1726,8 +1740,9 @@ class SQLConf extends Serializable with Logging {

def adaptiveExecutionEnabled: Boolean = getConf(ADAPTIVE_EXECUTION_ENABLED)

def minNumPostShufflePartitions: Int =
getConf(SHUFFLE_MIN_NUM_POSTSHUFFLE_PARTITIONS)
def minNumPostShufflePartitions: Int = getConf(SHUFFLE_MIN_NUM_POSTSHUFFLE_PARTITIONS)

def maxNumPostShufflePartitions: Int = getConf(SHUFFLE_MAX_NUM_POSTSHUFFLE_PARTITIONS)

def minBatchesToRetain: Int = getConf(MIN_BATCHES_TO_RETAIN)

@@ -2053,6 +2068,8 @@

def maxToStringFields: Int = getConf(SQLConf.MAX_TO_STRING_FIELDS)

def maxRepeatedAliasSize: Int = getConf(SQLConf.MAX_REPEATED_ALIAS_SIZE)

/** ********************** SQLConf functionality methods ************ */

/** Set Spark SQL configuration properties. */
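Both new settings are ordinary SQL configs; the keys below come from the definitions above, while the values are only examples (again assuming a SparkSession named `spark`):

```scala
// Cap how many post-shuffle partitions adaptive execution may choose, and
// allow somewhat larger alias expressions to be substituted more than once.
spark.conf.set("spark.sql.adaptive.maxNumPostShufflePartitions", "1000")
spark.conf.set("spark.sql.maxRepeatedAliasSize", "200")
```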
sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/CollapseProjectSuite.scala
@@ -138,4 +138,22 @@ class CollapseProjectSuite extends PlanTest {
assert(projects.size === 1)
assert(hasMetadata(optimized))
}

test("ensure oversize aliases are not repeatedly substituted") {
var query: LogicalPlan = testRelation
for( a <- 1 to 100) {
query = query.select(('a + 'b).as('a), ('a - 'b).as('b))
}
val projects = Optimize.execute(query.analyze).collect { case p: Project => p }
assert(projects.size >= 12)
}

test("ensure oversize aliases are still substituted once") {
var query: LogicalPlan = testRelation
for( a <- 1 to 20) {
query = query.select(('a + 'b).as('a), 'b)
}
val projects = Optimize.execute(query.analyze).collect { case p: Project => p }
assert(projects.size === 1)
}
}
sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala
@@ -32,6 +32,7 @@ import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, ReturnAnswer}
import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.catalyst.util.DateTimeUtils
import org.apache.spark.sql.catalyst.util.truncatedString
import org.apache.spark.sql.execution.adaptive.PlanQueryStage
import org.apache.spark.sql.execution.command.{DescribeTableCommand, ExecutedCommandExec, ShowTablesCommand}
import org.apache.spark.sql.execution.exchange.{EnsureRequirements, ReuseExchange}
import org.apache.spark.sql.types.{BinaryType, DateType, DecimalType, TimestampType, _}
@@ -96,7 +97,11 @@ class QueryExecution(
* row format conversions as needed.
*/
protected def prepareForExecution(plan: SparkPlan): SparkPlan = {
preparations.foldLeft(plan) { case (sp, rule) => rule.apply(sp) }
if (sparkSession.sessionState.conf.adaptiveExecutionEnabled) {
adaptivePreparations.foldLeft(plan) { case (sp, rule) => rule.apply(sp)}
} else {
preparations.foldLeft(plan) { case (sp, rule) => rule.apply(sp)}
}
}

/** A sequence of rules that will be applied in order to the physical plan before execution. */
@@ -107,6 +112,15 @@
ReuseExchange(sparkSession.sessionState.conf),
ReuseSubquery(sparkSession.sessionState.conf))

protected def adaptivePreparations: Seq[Rule[SparkPlan]] = Seq(
PlanSubqueries(sparkSession),
EnsureRequirements(sparkSession.sessionState.conf),
ReuseSubquery(sparkSession.sessionState.conf),
// PlanQueryStage needs to be the last rule because it divides the plan into multiple sub-trees
// by inserting leaf node QueryStageInput. Transforming the plan after applying this rule will
// only transform node in a sub-tree.
PlanQueryStage(sparkSession.sessionState.conf))

protected def stringOrError[A](f: => A): String =
try f.toString catch { case e: AnalysisException => e.toString }

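The adaptive rule list is only taken when `adaptiveExecutionEnabled` is true; a minimal way to exercise that branch, assuming the stock `spark.sql.adaptive.enabled` key backs that flag:

```scala
// With adaptive execution on, prepareForExecution applies adaptivePreparations,
// so PlanQueryStage splits the physical plan into query stages at each shuffle.
spark.conf.set("spark.sql.adaptive.enabled", "true")
val counts = spark.range(1000).groupBy("id").count()
println(counts.queryExecution.executedPlan) // shuffles sit behind QueryStageInput leaves
```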
sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlanInfo.scala
@@ -18,6 +18,7 @@
package org.apache.spark.sql.execution

import org.apache.spark.annotation.DeveloperApi
import org.apache.spark.sql.execution.adaptive.QueryStageInput
import org.apache.spark.sql.execution.exchange.ReusedExchangeExec
import org.apache.spark.sql.execution.metric.SQLMetricInfo

@@ -51,6 +52,7 @@ private[execution] object SparkPlanInfo {
def fromSparkPlan(plan: SparkPlan): SparkPlanInfo = {
val children = plan match {
case ReusedExchangeExec(_, child) => child :: Nil
case i: QueryStageInput => i.childStage :: Nil
case _ => plan.children ++ plan.subqueries
}
val metrics = plan.metrics.toSeq.map { case (key, metric) =>
sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/PlanQueryStage.scala (new file)
@@ -0,0 +1,79 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql.execution.adaptive

import scala.collection.mutable
import scala.collection.mutable.ArrayBuffer

import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.execution.SparkPlan
import org.apache.spark.sql.execution.command.ExecutedCommandExec
import org.apache.spark.sql.execution.exchange.{BroadcastExchangeExec, Exchange, ShuffleExchangeExec}
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.types.StructType

/**
* Divide the spark plan into multiple QueryStages. For each Exchange in the plan, it adds a
* QueryStage and a QueryStageInput. If reusing Exchange is enabled, it finds duplicated exchanges
* and uses the same QueryStage for all the references.
*/
case class PlanQueryStage(conf: SQLConf) extends Rule[SparkPlan] {

def apply(plan: SparkPlan): SparkPlan = {

val newPlan = if (!conf.exchangeReuseEnabled) {
plan.transformUp {
case e: ShuffleExchangeExec =>
ShuffleQueryStageInput(ShuffleQueryStage(e), e.output)
case e: BroadcastExchangeExec =>
BroadcastQueryStageInput(BroadcastQueryStage(e), e.output)
}
} else {
// Build a hash map using schema of exchanges to avoid O(N*N) sameResult calls.
val stages = mutable.HashMap[StructType, ArrayBuffer[QueryStage]]()

plan.transformUp {
case exchange: Exchange =>
val sameSchema = stages.getOrElseUpdate(exchange.schema, ArrayBuffer[QueryStage]())
val samePlan = sameSchema.find { s =>
exchange.sameResult(s.child)
}
if (samePlan.isDefined) {
// Keep the output of this exchange, the following plans require that to resolve
// attributes.
exchange match {
case e: ShuffleExchangeExec => ShuffleQueryStageInput(
samePlan.get.asInstanceOf[ShuffleQueryStage], exchange.output)
case e: BroadcastExchangeExec => BroadcastQueryStageInput(
samePlan.get.asInstanceOf[BroadcastQueryStage], exchange.output)
}
} else {
val queryStageInput = exchange match {
case e: ShuffleExchangeExec =>
ShuffleQueryStageInput(ShuffleQueryStage(e), e.output)
case e: BroadcastExchangeExec =>
BroadcastQueryStageInput(BroadcastQueryStage(e), e.output)
}
sameSchema += queryStageInput.childStage
queryStageInput
}
}
}
ResultQueryStage(newPlan)
}
}
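As a rough usage sketch of the reuse path described in the scaladoc: duplicated exchanges under a self-join should resolve to one shared query stage when exchange reuse (on by default) and adaptive execution are both enabled. The query below is illustrative only, assuming a SparkSession named `spark`:

```scala
spark.conf.set("spark.sql.adaptive.enabled", "true")

// Both join branches aggregate the same input, producing two identical
// shuffles; PlanQueryStage should hand them the same ShuffleQueryStage
// instead of planning the exchange twice.
val agg = spark.range(1000).groupBy("id").count()
val joined = agg.as("l").join(agg.as("r"), "id")
println(joined.queryExecution.executedPlan)
```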