[SPARK-2213] [SQL] sort merge join for spark sql #5208
Exchange.scala
@@ -19,24 +19,42 @@ package org.apache.spark.sql.execution

 import org.apache.spark.annotation.DeveloperApi
 import org.apache.spark.shuffle.sort.SortShuffleManager
 import org.apache.spark.sql.catalyst.expressions
 import org.apache.spark.{SparkEnv, HashPartitioner, RangePartitioner, SparkConf}
 import org.apache.spark.rdd.{RDD, ShuffledRDD}
 import org.apache.spark.sql.{SQLContext, Row}
 import org.apache.spark.sql.catalyst.errors.attachTree
-import org.apache.spark.sql.catalyst.expressions.{Attribute, RowOrdering}
+import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.plans.physical._
 import org.apache.spark.sql.catalyst.rules.Rule
 import org.apache.spark.util.MutablePair

+object Exchange {
+  /**
+   * Returns true when the ordering expressions are a subset of the key.
+   * If true, ShuffledRDD can use `setKeyOrdering(orderingKey)` to sort within [[Exchange]].
+   */
+  def canSortWithShuffle(partitioning: Partitioning, desiredOrdering: Seq[SortOrder]): Boolean = {
+    desiredOrdering.map(_.child).toSet.subsetOf(partitioning.keyExpressions.toSet)
+  }
+}
+
 /**
  * :: DeveloperApi ::
  * Performs a shuffle that will result in the desired `newPartitioning`. Optionally sorts each
  * resulting partition based on expressions from the partition key. It is invalid to construct an
  * exchange operator with a `newOrdering` that cannot be calculated using the partitioning key.
  */
 @DeveloperApi
-case class Exchange(newPartitioning: Partitioning, child: SparkPlan) extends UnaryNode {
+case class Exchange(
+    newPartitioning: Partitioning,
+    newOrdering: Seq[SortOrder],
+    child: SparkPlan)
+  extends UnaryNode {

   override def outputPartitioning: Partitioning = newPartitioning

+  override def outputOrdering: Seq[SortOrder] = newOrdering
+
   override def output: Seq[Attribute] = child.output

   /** We must copy rows when sort based shuffle is on */
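To make the subset rule above concrete, here is a tiny standalone sketch of the same check, using plain strings in place of Catalyst expressions (the names and the example values are illustrative, not the PR's code):

object CanSortWithShuffleExample {
  // Plain-string stand-ins for Catalyst expressions: the real check compares
  // expression trees, not column names.
  def canSortWithShuffle(keyExprs: Set[String], orderingExprs: Seq[String]): Boolean =
    orderingExprs.toSet.subsetOf(keyExprs)

  def main(args: Array[String]): Unit = {
    val key = Set("a", "b")                      // partitioning key expressions
    println(canSortWithShuffle(key, Seq("a")))   // true: the shuffle itself can sort
    println(canSortWithShuffle(key, Seq("c")))   // false: a separate Sort is needed
  }
}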
@@ -45,6 +63,20 @@ case class Exchange(newPartitioning: Partitioning, child: SparkPlan) extends Una
   private val bypassMergeThreshold =
     child.sqlContext.sparkContext.conf.getInt("spark.shuffle.sort.bypassMergeThreshold", 200)

+  private val keyOrdering = {
+    if (newOrdering.nonEmpty) {
+      val key = newPartitioning.keyExpressions
+      val boundOrdering = newOrdering.map { o =>
+        val ordinal = key.indexOf(o.child)
+        if (ordinal == -1) sys.error(s"Invalid ordering on $o requested for $newPartitioning")
+        o.copy(child = BoundReference(ordinal, o.child.dataType, o.child.nullable))
+      }
+      new RowOrdering(boundOrdering)
+    } else {
+      null // Ordering will not be used
+    }
+  }

   override def execute(): RDD[Row] = attachTree(this, "execute") {
     newPartitioning match {
       case HashPartitioning(expressions, numPartitions) =>
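The binding step in `keyOrdering` can be illustrated outside Spark. A minimal sketch, assuming rows are `Seq[Any]` with Int-valued fields and using column names in place of Catalyst expressions:

object KeyOrderingSketch {
  type Row = Seq[Any]

  // Bind each ordering expression (a column name here, a Catalyst expression
  // in the real code) to its ordinal in the shuffle key, then compare rows by
  // the bound positions. Int-valued fields are assumed for brevity.
  def boundOrdering(key: Seq[String], ordering: Seq[String]): Ordering[Row] = {
    val ordinals = ordering.map { expr =>
      val ordinal = key.indexOf(expr)
      if (ordinal == -1) sys.error(s"Invalid ordering on $expr requested for key $key")
      ordinal
    }
    new Ordering[Row] {
      def compare(x: Row, y: Row): Int = {
        var i = 0
        while (i < ordinals.length) {
          val c = x(ordinals(i)).asInstanceOf[Int] compare y(ordinals(i)).asInstanceOf[Int]
          if (c != 0) return c
          i += 1
        }
        0
      }
    }
  }

  def main(args: Array[String]): Unit = {
    val ord = boundOrdering(key = Seq("a", "b"), ordering = Seq("b"))
    // Sorts key rows (a, b) by the "b" field only:
    println(Seq[Row](Seq(1, 3), Seq(2, 1)).sorted(ord))  // List(List(2, 1), List(1, 3))
  }
}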
@@ -56,7 +88,9 @@ case class Exchange(newPartitioning: Partitioning, child: SparkPlan) extends Una
         // we can avoid the defensive copies to improve performance. In the long run, we probably
         // want to include information in shuffle dependencies to indicate whether elements in the
         // source RDD should be copied.
-        val rdd = if (sortBasedShuffleOn && numPartitions > bypassMergeThreshold) {
+        val willMergeSort = sortBasedShuffleOn && numPartitions > bypassMergeThreshold
+
+        val rdd = if (willMergeSort || newOrdering.nonEmpty) {
           child.execute().mapPartitions { iter =>
             val hashExpressions = newMutableProjection(expressions, child.output)()
             iter.map(r => (hashExpressions(r).copy(), r.copy()))
@@ -69,12 +103,17 @@ case class Exchange(newPartitioning: Partitioning, child: SparkPlan) extends Una
           }
         }
         val part = new HashPartitioner(numPartitions)
-        val shuffled = new ShuffledRDD[Row, Row, Row](rdd, part)
+        val shuffled =
+          if (newOrdering.nonEmpty) {
+            new ShuffledRDD[Row, Row, Row](rdd, part).setKeyOrdering(keyOrdering)
+          } else {
+            new ShuffledRDD[Row, Row, Row](rdd, part)
+          }
         shuffled.setSerializer(new SparkSqlSerializer(new SparkConf(false)))
         shuffled.map(_._2)

       case RangePartitioning(sortingExpressions, numPartitions) =>
-        val rdd = if (sortBasedShuffleOn) {
+        val rdd = if (sortBasedShuffleOn || newOrdering.nonEmpty) {
           child.execute().mapPartitions { iter => iter.map(row => (row.copy(), null))}
         } else {
           child.execute().mapPartitions { iter =>
@@ -87,7 +126,12 @@ case class Exchange(newPartitioning: Partitioning, child: SparkPlan) extends Una
         implicit val ordering = new RowOrdering(sortingExpressions, child.output)

Review comment: maybe this line is redundant?
Review comment: oh, I see... For RangePartitioner.

         val part = new RangePartitioner(numPartitions, rdd, ascending = true)
-        val shuffled = new ShuffledRDD[Row, Null, Null](rdd, part)
+        val shuffled =
+          if (newOrdering.nonEmpty) {
+            new ShuffledRDD[Row, Null, Null](rdd, part).setKeyOrdering(keyOrdering)
+          } else {
+            new ShuffledRDD[Row, Null, Null](rdd, part)
+          }
         shuffled.setSerializer(new SparkSqlSerializer(new SparkConf(false)))

         shuffled.map(_._1)
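Both partitioning branches rely on `ShuffledRDD.setKeyOrdering`, which makes the sort-based shuffle deliver each reduce partition's keys in sorted order (the same mechanism `sortByKey` uses). A minimal sketch of that pattern, assuming a local SparkContext and the default sort-based shuffle:

import org.apache.spark.{HashPartitioner, SparkConf, SparkContext}
import org.apache.spark.rdd.ShuffledRDD

object SetKeyOrderingSketch {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setMaster("local[2]").setAppName("sketch"))
    val pairs = sc.parallelize(Seq(3 -> "c", 1 -> "a", 2 -> "b"))
    // With a key ordering set, each reduce partition receives its keys in
    // sorted order straight out of the shuffle: no separate sort stage.
    val shuffled = new ShuffledRDD[Int, String, String](pairs, new HashPartitioner(2))
      .setKeyOrdering(Ordering.Int)
    shuffled.glom().collect().foreach(part => println(part.map(_._1).toSeq))
    sc.stop()
  }
}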
@@ -120,27 +164,34 @@ case class Exchange(newPartitioning: Partitioning, child: SparkPlan) extends Una
  * Ensures that the [[org.apache.spark.sql.catalyst.plans.physical.Partitioning Partitioning]]
  * of input data meets the
  * [[org.apache.spark.sql.catalyst.plans.physical.Distribution Distribution]] requirements for
- * each operator by inserting [[Exchange]] Operators where required.
+ * each operator by inserting [[Exchange]] Operators where required. Also ensures that the
+ * input partition ordering requirements are met.
  */
-private[sql] case class AddExchange(sqlContext: SQLContext) extends Rule[SparkPlan] {
+private[sql] case class EnsureRequirements(sqlContext: SQLContext) extends Rule[SparkPlan] {
   // TODO: Determine the number of partitions.
   def numPartitions: Int = sqlContext.conf.numShufflePartitions

   def apply(plan: SparkPlan): SparkPlan = plan.transformUp {
     case operator: SparkPlan =>
-      // Check if every child's outputPartitioning satisfies the corresponding
+      // True iff every child's outputPartitioning satisfies the corresponding
       // required data distribution.
       def meetsRequirements: Boolean =
-        !operator.requiredChildDistribution.zip(operator.children).map {
+        operator.requiredChildDistribution.zip(operator.children).forall {
           case (required, child) =>
             val valid = child.outputPartitioning.satisfies(required)
             logDebug(
               s"${if (valid) "Valid" else "Invalid"} distribution," +
                 s"required: $required current: ${child.outputPartitioning}")
             valid
-        }.exists(!_)
+        }

-      // Check if outputPartitionings of children are compatible with each other.
+      // True iff any of the children are incorrectly sorted.
+      def needsAnySort: Boolean =
+        operator.requiredChildOrdering.zip(operator.children).exists {
+          case (required, child) => required.nonEmpty && required != child.outputOrdering
+        }
+
+      // True iff outputPartitionings of children are compatible with each other.
       // It is possible that every child satisfies its required data distribution
       // but two children have incompatible outputPartitionings. For example,
       // A dataset is range partitioned by "a.asc" (RangePartitioning) and another
@@ -157,28 +208,69 @@ private[sql] case class AddExchange(sqlContext: SQLContext) extends Rule[SparkPl
           case Seq(a,b) => a compatibleWith b
         }.exists(!_)

-      // Check if the partitioning we want to ensure is the same as the child's output
-      // partitioning. If so, we do not need to add the Exchange operator.
-      def addExchangeIfNecessary(partitioning: Partitioning, child: SparkPlan): SparkPlan =
-        if (child.outputPartitioning != partitioning) Exchange(partitioning, child) else child
+      // Adds Exchange or Sort operators as required
+      def addOperatorsIfNecessary(
+          partitioning: Partitioning,
+          rowOrdering: Seq[SortOrder],
+          child: SparkPlan): SparkPlan = {
+        val needSort = rowOrdering.nonEmpty && child.outputOrdering != rowOrdering
+        val needsShuffle = child.outputPartitioning != partitioning
+        val canSortWithShuffle = Exchange.canSortWithShuffle(partitioning, rowOrdering)
+
+        if (needSort && needsShuffle && canSortWithShuffle) {
+          Exchange(partitioning, rowOrdering, child)
+        } else {
+          val withShuffle = if (needsShuffle) {
+            Exchange(partitioning, Nil, child)
+          } else {
+            child
+          }
+
+          val withSort = if (needSort) {
+            if (sqlContext.conf.externalSortEnabled) {
+              ExternalSort(rowOrdering, global = false, withShuffle)
+            } else {
+              Sort(rowOrdering, global = false, withShuffle)
+            }
+          } else {
+            withShuffle
+          }
+
+          withSort
+        }
+      }

-      if (meetsRequirements && compatible) {
+      if (meetsRequirements && compatible && !needsAnySort) {
         operator
       } else {
         // At least one child does not satisfy its required data distribution or
         // at least one child's outputPartitioning is not compatible with another child's
         // outputPartitioning. In this case, we need to add Exchange operators.
-        val repartitionedChildren = operator.requiredChildDistribution.zip(operator.children).map {
-          case (AllTuples, child) =>
-            addExchangeIfNecessary(SinglePartition, child)
-          case (ClusteredDistribution(clustering), child) =>
-            addExchangeIfNecessary(HashPartitioning(clustering, numPartitions), child)
-          case (OrderedDistribution(ordering), child) =>
-            addExchangeIfNecessary(RangePartitioning(ordering, numPartitions), child)
-          case (UnspecifiedDistribution, child) => child
-          case (dist, _) => sys.error(s"Don't know how to ensure $dist")
-        }
-        operator.withNewChildren(repartitionedChildren)
+        val requirements =
+          (operator.requiredChildDistribution, operator.requiredChildOrdering, operator.children)
+
+        val fixedChildren = requirements.zipped.map {
+          case (AllTuples, rowOrdering, child) =>
+            addOperatorsIfNecessary(SinglePartition, rowOrdering, child)
+          case (ClusteredDistribution(clustering), rowOrdering, child) =>
+            addOperatorsIfNecessary(HashPartitioning(clustering, numPartitions), rowOrdering, child)
+          case (OrderedDistribution(ordering), rowOrdering, child) =>
+            addOperatorsIfNecessary(RangePartitioning(ordering, numPartitions), rowOrdering, child)
+
+          case (UnspecifiedDistribution, Seq(), child) =>
+            child
+          case (UnspecifiedDistribution, rowOrdering, child) =>
+            if (sqlContext.conf.externalSortEnabled) {
+              ExternalSort(rowOrdering, global = false, child)
+            } else {
+              Sort(rowOrdering, global = false, child)
+            }
+
+          case (dist, ordering, _) =>
+            sys.error(s"Don't know how to ensure $dist with ordering $ordering")
+        }
+
+        operator.withNewChildren(fixedChildren)
       }
   }
}
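The decision made by addOperatorsIfNecessary can be summarized with plain booleans. A standalone sketch, where string "plans" stand in for physical operators (this is not the planner's API):

object AddOperatorsDecisionSketch {
  def plan(needSort: Boolean, needsShuffle: Boolean, canSortWithShuffle: Boolean): String =
    if (needSort && needsShuffle && canSortWithShuffle) {
      "Exchange(partitioning, ordering, child)"  // a single shuffle also sorts
    } else {
      val withShuffle = if (needsShuffle) "Exchange(partitioning, Nil, child)" else "child"
      if (needSort) s"Sort(ordering, $withShuffle)" else withShuffle
    }

  def main(args: Array[String]): Unit = {
    println(plan(needSort = true, needsShuffle = true, canSortWithShuffle = true))
    // -> Exchange(partitioning, ordering, child)
    println(plan(needSort = true, needsShuffle = true, canSortWithShuffle = false))
    // -> Sort(ordering, Exchange(partitioning, Nil, child))
    println(plan(needSort = true, needsShuffle = false, canSortWithShuffle = false))
    // -> Sort(ordering, child)
  }
}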
SparkPlan.scala
@@ -72,6 +72,12 @@ abstract class SparkPlan extends QueryPlan[SparkPlan] with Logging with Serializ
   def requiredChildDistribution: Seq[Distribution] =
     Seq.fill(children.size)(UnspecifiedDistribution)

+  /** Specifies how data is ordered in each partition. */
+  def outputOrdering: Seq[SortOrder] = Nil

Review comment: We probably need to think about the default ordering, e.g.
Review comment: Current implementation will not cause bugs, but may not optimize for some cases.
Review comment: Many unary nodes actually change the ordering. This seems like a safe option.

+  /** Specifies the required per-partition sort ordering of the input data for this operator. */
+  def requiredChildOrdering: Seq[Seq[SortOrder]] = Seq.fill(children.size)(Nil)

   /**
    * Runs this query returning the result as an RDD.
    */
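As a sketch of how an operator would use the two new hooks, here is a hypothetical node that demands sorted input and preserves that order. The name is invented for illustration, and it assumes compilation inside the org.apache.spark.sql.execution package, since UnaryNode is private[sql]:

import org.apache.spark.rdd.RDD
import org.apache.spark.sql.Row
import org.apache.spark.sql.catalyst.expressions.{Attribute, SortOrder}

// Hypothetical operator, not part of the PR.
case class AlreadySortedOpExample(sortExprs: Seq[SortOrder], child: SparkPlan)
  extends UnaryNode {

  override def output: Seq[Attribute] = child.output

  // Demand that each input partition arrive sorted on sortExprs; the
  // EnsureRequirements rule will insert a Sort (or a sorting Exchange) if the
  // child's outputOrdering does not already match.
  override def requiredChildOrdering: Seq[Seq[SortOrder]] = Seq(sortExprs)

  // This operator does not reorder rows, so the input ordering is preserved.
  override def outputOrdering: Seq[SortOrder] = sortExprs

  override def execute(): RDD[Row] = child.execute()
}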
SparkStrategies.scala
@@ -90,6 +90,14 @@ private[sql] abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
           left.statistics.sizeInBytes <= sqlContext.conf.autoBroadcastJoinThreshold =>
         makeBroadcastHashJoin(leftKeys, rightKeys, left, right, condition, joins.BuildLeft)

+      // If the sort merge join option is set, we want to use sort merge join prior to hash join.
+      // For now, support inner join first, then add outer join.

Review comment: Let's explicitly comment that we try SortMergeJoin first, but only if the config is set.

+      case ExtractEquiJoinKeys(Inner, leftKeys, rightKeys, condition, left, right)
+        if sqlContext.conf.sortMergeJoinEnabled =>
+        val mergeJoin =
+          joins.SortMergeJoin(leftKeys, rightKeys, planLater(left), planLater(right))
+        condition.map(Filter(_, mergeJoin)).getOrElse(mergeJoin) :: Nil

       case ExtractEquiJoinKeys(Inner, leftKeys, rightKeys, condition, left, right) =>
         val buildSide =
           if (right.statistics.sizeInBytes <= left.statistics.sizeInBytes) {
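For reference, the core algorithm behind a sort merge join of already-sorted, co-partitioned inputs looks like the following standalone sketch (illustrative only; it is not SortMergeJoin's actual iterator implementation):

object SortMergeJoinSketch {
  // Inner merge join of two sequences pre-sorted by key: advance the side with
  // the smaller key, and on equal keys emit the cross product of both runs.
  def mergeJoin[K, L, R](left: Seq[(K, L)], right: Seq[(K, R)])
                        (implicit ord: Ordering[K]): Seq[(K, L, R)] = {
    val out = scala.collection.mutable.ArrayBuffer.empty[(K, L, R)]
    var i = 0
    var j = 0
    while (i < left.length && j < right.length) {
      val c = ord.compare(left(i)._1, right(j)._1)
      if (c < 0) i += 1        // left key is smaller: advance left
      else if (c > 0) j += 1   // right key is smaller: advance right
      else {
        val k = left(i)._1
        var li = i             // find the end of the matching run on each side
        while (li < left.length && ord.equiv(left(li)._1, k)) li += 1
        var rj = j
        while (rj < right.length && ord.equiv(right(rj)._1, k)) rj += 1
        for (x <- i until li; y <- j until rj) out += ((k, left(x)._2, right(y)._2))
        i = li
        j = rj
      }
    }
    out.toSeq
  }

  def main(args: Array[String]): Unit = {
    val l = Seq(1 -> "a", 2 -> "b", 2 -> "c", 4 -> "d")
    val r = Seq(2 -> "x", 2 -> "y", 3 -> "z")
    println(mergeJoin(l, r))  // (2,b,x), (2,b,y), (2,c,x), (2,c,y)
  }
}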
@@ -299,7 +307,8 @@ private[sql] abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
       case logical.OneRowRelation =>
         execution.PhysicalRDD(Nil, singleRowRdd) :: Nil
       case logical.Repartition(expressions, child) =>
-        execution.Exchange(HashPartitioning(expressions, numPartitions), planLater(child)) :: Nil
+        execution.Exchange(
+          HashPartitioning(expressions, numPartitions), Nil, planLater(child)) :: Nil
       case e @ EvaluatePython(udf, child, _) =>
         BatchPythonEvaluation(udf, e.output, planLater(child)) :: Nil
       case LogicalRDD(output, rdd) => PhysicalRDD(output, rdd) :: Nil
Review comment: It will be good to also explain that we need the ordering expressions to be a subset of the key, because we are taking advantage of ShuffledRDD's keyOrdering for sorting.
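A REPL-style sketch of the explanation being requested, with hypothetical expression names: the shuffle's comparator only ever sees the key, so an ordering expression outside the key cannot be sorted during the shuffle, and EnsureRequirements must fall back to an explicit Sort:

// Hypothetical values mirroring Exchange.canSortWithShuffle's subset test:
val keyExprs  = Set("a", "b")  // what the shuffle partitions and can compare on
val sortExprs = Seq("c")       // desired output ordering
val canSortInShuffle = sortExprs.toSet.subsetOf(keyExprs)
// false: "c" is not part of the key, so a Sort must run after the Exchange.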