Merge pull request #443 from palantir/juang/cherry-pick-ae-02
[AE2.3-02][SPARK-23128] Add QueryStage and the framework for adaptive execution (auto setting the number of reducers)
justinuang authored Jan 29, 2019
2 parents 8bfddaf + f477a24 commit 8a4a29b
Showing 18 changed files with 750 additions and 410 deletions.
@@ -25,3 +25,4 @@ package org.apache.spark
* (may be inexact due to use of compressed map statuses)
*/
private[spark] class MapOutputStatistics(val shuffleId: Int, val bytesByPartitionId: Array[Long])
extends Serializable
@@ -280,14 +280,19 @@ object SQLConf {
 
   val SHUFFLE_MIN_NUM_POSTSHUFFLE_PARTITIONS =
     buildConf("spark.sql.adaptive.minNumPostShufflePartitions")
-      .internal()
-      .doc("The advisory minimal number of post-shuffle partitions provided to " +
-        "ExchangeCoordinator. This setting is used in our test to make sure we " +
-        "have enough parallelism to expose issues that will not be exposed with a " +
-        "single partition. When the value is a non-positive value, this setting will " +
-        "not be provided to ExchangeCoordinator.")
+      .doc("The advisory minimum number of post-shuffle partitions used in adaptive execution.")
       .intConf
-      .createWithDefault(-1)
+      .checkValue(numPartitions => numPartitions > 0, "The minimum shuffle partition number " +
+        "must be a positive integer.")
+      .createWithDefault(1)
+
+  val SHUFFLE_MAX_NUM_POSTSHUFFLE_PARTITIONS =
+    buildConf("spark.sql.adaptive.maxNumPostShufflePartitions")
+      .doc("The advisory maximum number of post-shuffle partitions used in adaptive execution.")
+      .intConf
+      .checkValue(numPartitions => numPartitions > 0, "The maximum shuffle partition number " +
+        "must be a positive integer.")
+      .createWithDefault(500)
 
   val SUBEXPRESSION_ELIMINATION_ENABLED =
     buildConf("spark.sql.subexpressionElimination.enabled")
@@ -1710,8 +1715,9 @@ class SQLConf extends Serializable with Logging {
 
   def adaptiveExecutionEnabled: Boolean = getConf(ADAPTIVE_EXECUTION_ENABLED)
 
-  def minNumPostShufflePartitions: Int =
-    getConf(SHUFFLE_MIN_NUM_POSTSHUFFLE_PARTITIONS)
+  def minNumPostShufflePartitions: Int = getConf(SHUFFLE_MIN_NUM_POSTSHUFFLE_PARTITIONS)
+
+  def maxNumPostShufflePartitions: Int = getConf(SHUFFLE_MAX_NUM_POSTSHUFFLE_PARTITIONS)
 
   def minBatchesToRetain: Int = getConf(MIN_BATCHES_TO_RETAIN)
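For context, a minimal usage sketch of the two new knobs (not part of this commit; it assumes a local SparkSession named spark, and that spark.sql.adaptive.enabled is the key behind ADAPTIVE_EXECUTION_ENABLED):

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.col

val spark = SparkSession.builder().master("local[*]").appName("adaptive-sketch").getOrCreate()

// Route physical planning through the adaptive preparation rules (see prepareForExecution below).
spark.conf.set("spark.sql.adaptive.enabled", "true")
// Bound the automatically chosen number of post-shuffle partitions (reducers).
spark.conf.set("spark.sql.adaptive.minNumPostShufflePartitions", "1")
spark.conf.set("spark.sql.adaptive.maxNumPostShufflePartitions", "500")

// A shuffle-producing query; the reducer count is now picked at runtime within [min, max].
spark.range(0, 1000000).groupBy((col("id") % 10).as("k")).count().collect()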
@@ -27,6 +27,7 @@ import org.apache.spark.sql.catalyst.analysis.UnsupportedOperationChecker
import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, ReturnAnswer}
import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.catalyst.util.DateTimeUtils
import org.apache.spark.sql.execution.adaptive.PlanQueryStage
import org.apache.spark.sql.execution.command.{DescribeTableCommand, ExecutedCommandExec, ShowTablesCommand}
import org.apache.spark.sql.execution.exchange.{EnsureRequirements, ReuseExchange}
import org.apache.spark.sql.types.{BinaryType, DateType, DecimalType, TimestampType, _}
@@ -84,7 +85,11 @@ class QueryExecution(val sparkSession: SparkSession, val logical: LogicalPlan) {
    * row format conversions as needed.
    */
   protected def prepareForExecution(plan: SparkPlan): SparkPlan = {
-    preparations.foldLeft(plan) { case (sp, rule) => rule.apply(sp) }
+    if (sparkSession.sessionState.conf.adaptiveExecutionEnabled) {
+      adaptivePreparations.foldLeft(plan) { case (sp, rule) => rule.apply(sp)}
+    } else {
+      preparations.foldLeft(plan) { case (sp, rule) => rule.apply(sp)}
+    }
   }

/** A sequence of rules that will be applied in order to the physical plan before execution. */
@@ -95,6 +100,15 @@ class QueryExecution(val sparkSession: SparkSession, val logical: LogicalPlan) {
ReuseExchange(sparkSession.sessionState.conf),
ReuseSubquery(sparkSession.sessionState.conf))

protected def adaptivePreparations: Seq[Rule[SparkPlan]] = Seq(
PlanSubqueries(sparkSession),
EnsureRequirements(sparkSession.sessionState.conf),
ReuseSubquery(sparkSession.sessionState.conf),
// PlanQueryStage needs to be the last rule because it divides the plan into multiple sub-trees
// by inserting leaf node QueryStageInput. Transforming the plan after applying this rule will
// only transform nodes in a sub-tree.
PlanQueryStage(sparkSession.sessionState.conf))

protected def stringOrError[A](f: => A): String =
try f.toString catch { case e: AnalysisException => e.toString }

@@ -18,6 +18,7 @@
package org.apache.spark.sql.execution

import org.apache.spark.annotation.DeveloperApi
import org.apache.spark.sql.execution.adaptive.QueryStageInput
import org.apache.spark.sql.execution.exchange.ReusedExchangeExec
import org.apache.spark.sql.execution.metric.SQLMetricInfo

@@ -51,6 +52,7 @@ private[execution] object SparkPlanInfo {
def fromSparkPlan(plan: SparkPlan): SparkPlanInfo = {
val children = plan match {
case ReusedExchangeExec(_, child) => child :: Nil
case i: QueryStageInput => i.childStage :: Nil
case _ => plan.children ++ plan.subqueries
}
val metrics = plan.metrics.toSeq.map { case (key, metric) =>
@@ -0,0 +1,79 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql.execution.adaptive

import scala.collection.mutable
import scala.collection.mutable.ArrayBuffer

import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.execution.SparkPlan
import org.apache.spark.sql.execution.command.ExecutedCommandExec
import org.apache.spark.sql.execution.exchange.{BroadcastExchangeExec, Exchange, ShuffleExchangeExec}
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.types.StructType

/**
* Divide the spark plan into multiple QueryStages. For each Exchange in the plan, it adds a
* QueryStage and a QueryStageInput. If reusing Exchange is enabled, it finds duplicated exchanges
* and uses the same QueryStage for all the references.
*/
case class PlanQueryStage(conf: SQLConf) extends Rule[SparkPlan] {

def apply(plan: SparkPlan): SparkPlan = {

val newPlan = if (!conf.exchangeReuseEnabled) {
plan.transformUp {
case e: ShuffleExchangeExec =>
ShuffleQueryStageInput(ShuffleQueryStage(e), e.output)
case e: BroadcastExchangeExec =>
BroadcastQueryStageInput(BroadcastQueryStage(e), e.output)
}
} else {
// Build a hash map using schema of exchanges to avoid O(N*N) sameResult calls.
val stages = mutable.HashMap[StructType, ArrayBuffer[QueryStage]]()

plan.transformUp {
case exchange: Exchange =>
val sameSchema = stages.getOrElseUpdate(exchange.schema, ArrayBuffer[QueryStage]())
val samePlan = sameSchema.find { s =>
exchange.sameResult(s.child)
}
if (samePlan.isDefined) {
// Keep the output of this exchange, the following plans require that to resolve
// attributes.
exchange match {
case e: ShuffleExchangeExec => ShuffleQueryStageInput(
samePlan.get.asInstanceOf[ShuffleQueryStage], exchange.output)
case e: BroadcastExchangeExec => BroadcastQueryStageInput(
samePlan.get.asInstanceOf[BroadcastQueryStage], exchange.output)
}
} else {
val queryStageInput = exchange match {
case e: ShuffleExchangeExec =>
ShuffleQueryStageInput(ShuffleQueryStage(e), e.output)
case e: BroadcastExchangeExec =>
BroadcastQueryStageInput(BroadcastQueryStage(e), e.output)
}
sameSchema += queryStageInput.childStage
queryStageInput
}
}
}
ResultQueryStage(newPlan)
}
}
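The exchange-reuse branch above avoids O(N*N) sameResult calls by first bucketing exchanges on their schema and only running the expensive comparison inside a bucket. A simplified, self-contained sketch of that pattern (plain Scala; Plan, dedup, and the string fields are illustrative stand-ins for StructType, sameResult, and the exchange subtree, not Spark APIs):

import scala.collection.mutable
import scala.collection.mutable.ArrayBuffer

case class Plan(schema: String, body: String)

def dedup(plans: Seq[Plan]): Seq[Plan] = {
  // Bucket candidates by a cheap key (the schema), as the HashMap in PlanQueryStage does.
  val buckets = mutable.HashMap[String, ArrayBuffer[Plan]]()
  plans.map { p =>
    val sameSchema = buckets.getOrElseUpdate(p.schema, ArrayBuffer[Plan]())
    // Run the expensive equality check (sameResult in the rule) only within one bucket.
    sameSchema.find(_.body == p.body) match {
      case Some(existing) => existing  // reuse the previously seen equivalent plan
      case None =>
        sameSchema += p
        p
    }
  }
}

// dedup(Seq(Plan("int", "scan t"), Plan("int", "scan t"), Plan("string", "scan u")))
// returns the first Plan instance twice (the duplicate is reused) plus the distinct third plan.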