[SPARK-2205] [SQL] Avoid unnecessary exchange operators in multi-way joins #7773
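For context, here is a minimal sketch of the kind of query this patch targets (the table names and the `sqlContext` setup are assumptions, not part of the patch): a multi-way equi-join where every join clusters on the same key. With the new `guarantees` check and `PartitioningCollection` introduced below, the planner can recognize that the output of the first join is already partitioned in a way that serves the second join, so it should not insert another Exchange between them.

```scala
// Hypothetical multi-way join; assumes tables a, b, c are registered with sqlContext.
val plan = sqlContext.sql("""
  SELECT *
  FROM a
  JOIN b ON a.key = b.key
  JOIN c ON a.key = c.key
""")

// Inspect the physical plan. With this patch, the first join's output can be described
// by a PartitioningCollection over hash partitionings on a.key and b.key, which already
// guarantees the hash partitioning the second join needs, so no extra Exchange should
// appear between the two join operators.
plan.explain()
```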
File: sql/catalyst/.../plans/physical/partitioning.scala
@@ -60,8 +60,9 @@ case class ClusteredDistribution(clustering: Seq[Expression]) extends Distributi
 /**
  * Represents data where tuples have been ordered according to the `ordering`
  * [[Expression Expressions]]. This is a strictly stronger guarantee than
- * [[ClusteredDistribution]] as an ordering will ensure that tuples that share the same value for
- * the ordering expressions are contiguous and will never be split across partitions.
+ * [[ClusteredDistribution]] as an ordering will ensure that tuples that share the
+ * same value for the ordering expressions are contiguous and will never be split across
+ * partitions.
  */
 case class OrderedDistribution(ordering: Seq[SortOrder]) extends Distribution {
   require(
@@ -86,8 +87,12 @@ sealed trait Partitioning {
    */
  def satisfies(required: Distribution): Boolean

-  /** Returns the expressions that are used to key the partitioning. */
-  def keyExpressions: Seq[Expression]
+  /**
+   * Returns true iff we can say that the partitioning scheme of this [[Partitioning]]
+   * guarantees the same partitioning scheme described by `other`.
+   */
+  // TODO: Add an example once we have the `nullSafe` concept.
+  def guarantees(other: Partitioning): Boolean
 }

 case class UnknownPartitioning(numPartitions: Int) extends Partitioning {

Review comments on the `guarantees` doc:
- One clarification: how does this property differ from
- This confusion has been significantly lessened by the removal of `keyExpressions`.

Review comments on the naming of `guarantees`:
- Do you think
- I think that we should only consider a name like
- yeah, makes sense. Then,
- Yep. Let's leave this for now.
@@ -96,23 +101,29 @@ case class UnknownPartitioning(numPartitions: Int) extends Partitioning {
     case _ => false
   }

-  override def keyExpressions: Seq[Expression] = Nil
+  override def guarantees(other: Partitioning): Boolean = false
 }

 case object SinglePartition extends Partitioning {
   val numPartitions = 1

   override def satisfies(required: Distribution): Boolean = true

-  override def keyExpressions: Seq[Expression] = Nil
+  override def guarantees(other: Partitioning): Boolean = other match {
+    case SinglePartition => true
+    case _ => false
+  }
 }

 case object BroadcastPartitioning extends Partitioning {
   val numPartitions = 1

   override def satisfies(required: Distribution): Boolean = true

-  override def keyExpressions: Seq[Expression] = Nil
+  override def guarantees(other: Partitioning): Boolean = other match {
+    case BroadcastPartitioning => true
+    case _ => false
+  }
 }

 /**
@@ -127,7 +138,7 @@ case class HashPartitioning(expressions: Seq[Expression], numPartitions: Int)
   override def nullable: Boolean = false
   override def dataType: DataType = IntegerType

-  private[this] lazy val clusteringSet = expressions.toSet
+  lazy val clusteringSet = expressions.toSet

   override def satisfies(required: Distribution): Boolean = required match {
     case UnspecifiedDistribution => true
@@ -136,7 +147,11 @@ case class HashPartitioning(expressions: Seq[Expression], numPartitions: Int)
     case _ => false
   }

-  override def keyExpressions: Seq[Expression] = expressions
+  override def guarantees(other: Partitioning): Boolean = other match {
+    case o: HashPartitioning =>
+      this.clusteringSet == o.clusteringSet && this.numPartitions == o.numPartitions
+    case _ => false
+  }
 }

 /**
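To make the new contract concrete, here is a small sketch of how `HashPartitioning.guarantees` behaves under this patch. The attribute names and partition counts are made up for illustration, and it assumes the patched spark-catalyst classes are on the classpath.

```scala
import org.apache.spark.sql.catalyst.expressions.AttributeReference
import org.apache.spark.sql.catalyst.plans.physical.HashPartitioning
import org.apache.spark.sql.types.IntegerType

// Hypothetical join-key attributes.
val key1 = AttributeReference("key1", IntegerType)()
val key2 = AttributeReference("key2", IntegerType)()

val p = HashPartitioning(Seq(key1), 200)

p.guarantees(HashPartitioning(Seq(key1), 200)) // true: same clustering expressions and numPartitions
p.guarantees(HashPartitioning(Seq(key1), 100)) // false: different numPartitions
p.guarantees(HashPartitioning(Seq(key2), 200)) // false: different clustering expressions
```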
@@ -170,5 +185,57 @@ case class RangePartitioning(ordering: Seq[SortOrder], numPartitions: Int)
     case _ => false
   }

-  override def keyExpressions: Seq[Expression] = ordering.map(_.child)
+  override def guarantees(other: Partitioning): Boolean = other match {
+    case o: RangePartitioning => this == o
+    case _ => false
+  }
 }
+
+/**
+ * A collection of [[Partitioning]]s that can be used to describe the partitioning
+ * scheme of the output of a physical operator. It is usually used for an operator
+ * that has multiple children. In this case, a [[Partitioning]] in this collection
+ * describes how this operator's output is partitioned based on expressions from
+ * a child. For example, for a Join operator on two tables `A` and `B`
+ * with a join condition `A.key1 = B.key2`, assuming we use the HashPartitioning scheme,
+ * there are two [[Partitioning]]s that can be used to describe how the output of
+ * this Join operator is partitioned, which are `HashPartitioning(A.key1)` and
+ * `HashPartitioning(B.key2)`. It is also worth noting that `partitionings`
+ * in this collection do not need to be equivalent, which is useful for
+ * Outer Join operators.
+ */
+case class PartitioningCollection(partitionings: Seq[Partitioning])
+  extends Expression with Partitioning with Unevaluable {
+
+  require(
+    partitionings.map(_.numPartitions).distinct.length == 1,
+    s"PartitioningCollection requires all of its partitionings have the same numPartitions.")
+
+  override def children: Seq[Expression] = partitionings.collect {
+    case expr: Expression => expr
+  }
+
+  override def nullable: Boolean = false
+
+  override def dataType: DataType = IntegerType
+
+  override val numPartitions = partitionings.map(_.numPartitions).distinct.head
+
+  /**
+   * Returns true if any `partitioning` of this collection satisfies the given
+   * [[Distribution]].
+   */
+  override def satisfies(required: Distribution): Boolean =
+    partitionings.exists(_.satisfies(required))
+
+  /**
+   * Returns true if any `partitioning` of this collection guarantees
+   * the given [[Partitioning]].
+   */
+  override def guarantees(other: Partitioning): Boolean =
+    partitionings.exists(_.guarantees(other))
+
+  override def toString: String = {
+    partitionings.map(_.toString).mkString("(", " or ", ")")
+  }
+}
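Following the example in the doc comment above, here is a short sketch of how a join's output could be described by a `PartitioningCollection` and why that helps a downstream operator. The attribute names and the partition count (200) are assumptions for illustration.

```scala
import org.apache.spark.sql.catalyst.expressions.AttributeReference
import org.apache.spark.sql.catalyst.plans.physical.{ClusteredDistribution, HashPartitioning, PartitioningCollection}
import org.apache.spark.sql.types.IntegerType

// Hypothetical join keys for a join with condition A.key1 = B.key2.
val aKey1 = AttributeReference("key1", IntegerType)()
val bKey2 = AttributeReference("key2", IntegerType)()

// The join's output is clustered on both keys at once, so it can be described
// by either side's hash partitioning.
val joinOutput = PartitioningCollection(Seq(
  HashPartitioning(Seq(aKey1), 200),
  HashPartitioning(Seq(bKey2), 200)))

// A parent operator that needs the data clustered (or hash-partitioned) on either
// key is already served, so no additional Exchange is required.
joinOutput.satisfies(ClusteredDistribution(Seq(aKey1)))  // true
joinOutput.guarantees(HashPartitioning(Seq(bKey2), 200)) // true
```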
File: sql/core/.../sql/execution/joins/BroadcastHashOuterJoin.scala
@@ -24,7 +24,7 @@ import org.apache.spark.annotation.DeveloperApi
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions._
-import org.apache.spark.sql.catalyst.plans.physical.{Distribution, UnspecifiedDistribution}
+import org.apache.spark.sql.catalyst.plans.physical.{Partitioning, Distribution, UnspecifiedDistribution}
 import org.apache.spark.sql.catalyst.plans.{JoinType, LeftOuter, RightOuter}
 import org.apache.spark.sql.execution.{BinaryNode, SparkPlan}
 import org.apache.spark.util.ThreadUtils

@@ -57,6 +57,8 @@ case class BroadcastHashOuterJoin(
   override def requiredChildDistribution: Seq[Distribution] =
     UnspecifiedDistribution :: UnspecifiedDistribution :: Nil

+  override def outputPartitioning: Partitioning = streamedPlan.outputPartitioning
+
   @transient
   private val broadcastFuture = future {
     // Note that we use .execute().collect() because we don't want to convert data to Scala types

Review comment on the new `outputPartitioning` override: This is a bug fix.
File: sql/core/.../sql/execution/joins/ShuffledHashOuterJoin.scala
@@ -23,7 +23,7 @@ import org.apache.spark.annotation.DeveloperApi
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions._
-import org.apache.spark.sql.catalyst.plans.physical.{Distribution, ClusteredDistribution}
+import org.apache.spark.sql.catalyst.plans.physical._
 import org.apache.spark.sql.catalyst.plans.{FullOuter, JoinType, LeftOuter, RightOuter}
 import org.apache.spark.sql.execution.{BinaryNode, SparkPlan}

@@ -44,6 +44,14 @@ case class ShuffledHashOuterJoin(
   override def requiredChildDistribution: Seq[Distribution] =
     ClusteredDistribution(leftKeys) :: ClusteredDistribution(rightKeys) :: Nil

+  override def outputPartitioning: Partitioning = joinType match {
+    case LeftOuter => left.outputPartitioning
+    case RightOuter => right.outputPartitioning
+    case FullOuter => UnknownPartitioning(left.outputPartitioning.numPartitions)
+    case x =>
+      throw new IllegalArgumentException(s"HashOuterJoin should not take $x as the JoinType")
+  }
+
   protected override def doExecute(): RDD[InternalRow] = {
     val joinedRow = new JoinedRow()
     left.execute().zipPartitions(right.execute()) { (leftIter, rightIter) =>

Review comment on the new `outputPartitioning` override: I will remove this change for now. Once we have the nullSafe concept, we can better describe how the result of this join operator is partitioned. For example, right now, it is not safe to say that the output of this operator is partitioned by the
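To illustrate the reviewer's concern above, here is a small hedged example (the sample data and the `sqlContext` setup are assumptions): in a full outer join, unmatched rows from one side come out with NULL for the other side's key, in whatever partition they happened to live in, so neither side's hash partitioning describes the output.

```scala
// Assumes an existing SQLContext named sqlContext with its implicits imported.
import sqlContext.implicits._

val a = Seq((1, "a1"), (2, "a2")).toDF("key", "aVal")
val b = Seq((2, "b2"), (3, "b3")).toDF("key2", "bVal")

// The row for key2 = 3 has no match in `a`, so it is emitted with a NULL `key`
// from the partition that held key2 = 3. The join output is therefore not
// hash-partitioned by `key`, which is why FullOuter reports UnknownPartitioning.
a.join(b, $"key" === $"key2", "outer").show()
```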
Review comment on removing `keyExpressions`: Seems `keyExpressions` is not used at all and I do not remember when we added it. So, I am removing it.