Commit a2f1bc1

address comments

cloud-fan committed Jan 8, 2018
1 parent 639e9b6 commit a2f1bc1

Showing 6 changed files with 11 additions and 12 deletions.
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/physical/partitioning.scala

@@ -92,14 +92,14 @@ case class ClusteredDistribution(
 }
 
 /**
- * Represents data where tuples have been partitioned according to the hash of the given
+ * Represents data where tuples have been clustered according to the hash of the given
  * `expressions`. The hash function is defined as `HashPartitioning.partitionIdExpression`, so only
  * [[HashPartitioning]] can satisfy this distribution.
  *
  * This is a strictly stronger guarantee than [[ClusteredDistribution]]. Given a tuple and the
  * number of partitions, this distribution strictly requires which partition the tuple should be in.
  */
-case class HashPartitionedDistribution(expressions: Seq[Expression]) extends Distribution {
+case class HashClusteredDistribution(expressions: Seq[Expression]) extends Distribution {
   require(
     expressions != Nil,
     "The expressions for hash of a HashPartitionedDistribution should not be Nil. " +

@@ -208,7 +208,7 @@ case class HashPartitioning(expressions: Seq[Expression], numPartitions: Int)
   override def satisfies(required: Distribution): Boolean = {
     super.satisfies(required) || {
       required match {
-        case h: HashPartitionedDistribution =>
+        case h: HashClusteredDistribution =>
           expressions.length == h.expressions.length && expressions.zip(h.expressions).forall {
             case (l, r) => l.semanticEquals(r)
           }
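For context on the renamed contract: in Spark, HashPartitioning.partitionIdExpression is a Pmod of a Murmur3 hash over the partitioning expressions, so HashClusteredDistribution fixes the exact partition index of every tuple. The following standalone Scala sketch illustrates this with toy types and a placeholder hash; none of these definitions are Spark's real classes.

sealed trait Distribution
case class HashClusteredDistribution(keys: Seq[String]) extends Distribution

case class HashPartitioning(keys: Seq[String], numPartitions: Int) {
  // Placeholder for Murmur3 in the real HashPartitioning.partitionIdExpression.
  private def keyHash(row: Map[String, Any]): Int = keys.map(row).hashCode()

  // Non-negative modulo picks the partition index, mirroring Pmod(hash, n).
  def partitionId(row: Map[String, Any]): Int =
    ((keyHash(row) % numPartitions) + numPartitions) % numPartitions

  // Satisfied only when the partitioning keys equal the required keys pairwise,
  // like the semanticEquals zip check in the hunk above.
  def satisfies(required: Distribution): Boolean = required match {
    case HashClusteredDistribution(ks) => ks == keys
    case _ => false
  }
}

object HashClusteredDemo extends App {
  val p = HashPartitioning(Seq("a", "b"), numPartitions = 4)
  assert(p.satisfies(HashClusteredDistribution(Seq("a", "b"))))
  assert(!p.satisfies(HashClusteredDistribution(Seq("b", "a")))) // pairwise, so order matters
  println(p.partitionId(Map("a" -> 1, "b" -> 2))) // deterministic partition index
}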
sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala

@@ -104,11 +104,10 @@ abstract class SparkPlan extends QueryPlan[SparkPlan] with Logging with Serializ
    * guarantees that the outputs of these children will have same number of partitions, so that the
    * operator can safely zip partitions of these children's result RDDs. Some operators can leverage
    * this guarantee to satisfy some interesting requirement, e.g., non-broadcast joins can specify
-   * HashPartitionedDistribution(a,b) for its left child, and specify
-   * HashPartitionedDistribution(c,d) for its right child, then it's guaranteed that left and right
-   * child are co-partitioned by a,b/c,d, which means tuples of same value are in the
-   * partitions of same index, e.g., (a=1,b=2) and (c=1,d=2) are both in the second partition of
-   * left and right child.
+   * HashClusteredDistribution(a,b) for its left child, and specify HashClusteredDistribution(c,d)
+   * for its right child, then it's guaranteed that left and right child are co-partitioned by
+   * a,b/c,d, which means tuples of same value are in the partitions of same index, e.g.,
+   * (a=1,b=2) and (c=1,d=2) are both in the second partition of left and right child.
    */
   def requiredChildDistribution: Seq[Distribution] =
     Seq.fill(children.size)(UnspecifiedDistribution)
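The co-partitioning guarantee described above can be demonstrated with a toy sketch (plain Scala, not Spark API): because only the key values feed the shared hash function, (a=1,b=2) on the left and (c=1,d=2) on the right always map to partitions with the same index.

object CoPartitionDemo extends App {
  val numPartitions = 8

  // Placeholder for the hash both children share; only key *values* are hashed.
  def partitionId(keyValues: Seq[Any]): Int =
    ((keyValues.hashCode() % numPartitions) + numPartitions) % numPartitions

  val leftRow  = Map("a" -> 1, "b" -> 2) // left child clustered by (a, b)
  val rightRow = Map("c" -> 1, "d" -> 2) // right child clustered by (c, d)

  val leftId  = partitionId(Seq(leftRow("a"), leftRow("b")))
  val rightId = partitionId(Seq(rightRow("c"), rightRow("d")))

  // Equal key values on both sides land in same-index partitions, so the join
  // can safely zip the two children's result partitions pairwise.
  assert(leftId == rightId)
}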
sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/EnsureRequirements.scala

@@ -68,7 +68,7 @@ case class EnsureRequirements(conf: SQLConf) extends Rule[SparkPlan] {
         // these children may not be partitioned in the same way.
         // Please see the comment in withCoordinator for more details.
         val supportsDistribution = requiredChildDistributions.forall { dist =>
-          dist.isInstanceOf[ClusteredDistribution] || dist.isInstanceOf[HashPartitionedDistribution]
+          dist.isInstanceOf[ClusteredDistribution] || dist.isInstanceOf[HashClusteredDistribution]
         }
         children.length > 1 && supportsDistribution
       }
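A minimal sketch of the forall check above, using assumed stand-in types rather than Spark's:

object CoordinatorCheckDemo extends App {
  sealed trait Dist
  case object AllTuples extends Dist
  case class ClusteredDistribution(keys: Seq[String]) extends Dist
  case class HashClusteredDistribution(keys: Seq[String]) extends Dist

  // The exchange coordinator is only considered when every required child
  // distribution is one of the two clustered kinds.
  def supportsCoordinator(required: Seq[Dist]): Boolean =
    required.forall {
      case _: ClusteredDistribution | _: HashClusteredDistribution => true
      case _ => false
    }

  assert(supportsCoordinator(
    Seq(HashClusteredDistribution(Seq("a")), HashClusteredDistribution(Seq("b")))))
  assert(!supportsCoordinator(Seq(AllTuples)))
}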
sql/core/src/main/scala/org/apache/spark/sql/execution/joins/ShuffledHashJoinExec.scala

@@ -46,7 +46,7 @@ case class ShuffledHashJoinExec(
     "avgHashProbe" -> SQLMetrics.createAverageMetric(sparkContext, "avg hash probe"))
 
   override def requiredChildDistribution: Seq[Distribution] =
-    HashPartitionedDistribution(leftKeys) :: HashPartitionedDistribution(rightKeys) :: Nil
+    HashClusteredDistribution(leftKeys) :: HashClusteredDistribution(rightKeys) :: Nil
 
   private def buildHashedRelation(iter: Iterator[InternalRow]): HashedRelation = {
     val buildDataSize = longMetric("buildDataSize")
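SortMergeJoinExec and CoGroupExec below make the same one-line change. The shared pattern is sketched here with stand-in types (not Spark's real classes): a binary operator requires one distribution per child, hash-clustered on that child's own keys, which co-partitions the children under SparkPlan's contract.

sealed trait Distribution
case object UnspecifiedDistribution extends Distribution
case class HashClusteredDistribution(keys: Seq[String]) extends Distribution

trait ToyPlan {
  def children: Seq[ToyPlan]
  // Default: no requirement, one entry per child.
  def requiredChildDistribution: Seq[Distribution] =
    Seq.fill(children.size)(UnspecifiedDistribution)
}

case class ToyLeaf(name: String) extends ToyPlan {
  override def children: Seq[ToyPlan] = Nil
}

case class ToyShuffledJoin(
    leftKeys: Seq[String],
    rightKeys: Seq[String],
    left: ToyPlan,
    right: ToyPlan) extends ToyPlan {
  override def children: Seq[ToyPlan] = Seq(left, right)
  override def requiredChildDistribution: Seq[Distribution] =
    HashClusteredDistribution(leftKeys) :: HashClusteredDistribution(rightKeys) :: Nil
}

object RequiredDistributionDemo extends App {
  val join = ToyShuffledJoin(Seq("a", "b"), Seq("c", "d"), ToyLeaf("l"), ToyLeaf("r"))
  println(join.requiredChildDistribution) // one HashClusteredDistribution per child
}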
sql/core/src/main/scala/org/apache/spark/sql/execution/joins/SortMergeJoinExec.scala

@@ -78,7 +78,7 @@ case class SortMergeJoinExec(
   }
 
   override def requiredChildDistribution: Seq[Distribution] =
-    HashPartitionedDistribution(leftKeys) :: HashPartitionedDistribution(rightKeys) :: Nil
+    HashClusteredDistribution(leftKeys) :: HashClusteredDistribution(rightKeys) :: Nil
 
   override def outputOrdering: Seq[SortOrder] = joinType match {
     // For inner join, orders of both sides keys should be kept.
sql/core/src/main/scala/org/apache/spark/sql/execution/objects.scala

@@ -460,7 +460,7 @@ case class CoGroupExec(
     right: SparkPlan) extends BinaryExecNode with ObjectProducerExec {
 
   override def requiredChildDistribution: Seq[Distribution] =
-    HashPartitionedDistribution(leftGroup) :: HashPartitionedDistribution(rightGroup) :: Nil
+    HashClusteredDistribution(leftGroup) :: HashClusteredDistribution(rightGroup) :: Nil
 
   override def requiredChildOrdering: Seq[Seq[SortOrder]] =
     leftGroup.map(SortOrder(_, Ascending)) :: rightGroup.map(SortOrder(_, Ascending)) :: Nil
