Commit 3c367a0: Reader side changes for hive bucketing

tejasapatil committed Jan 13, 2018
1 parent 7460770 commit 3c367a0
Showing 11 changed files with 902 additions and 73 deletions.
18 changes: 18 additions & 0 deletions core/src/main/scala/org/apache/spark/rdd/CoalescedRDD.scala
@@ -123,6 +123,24 @@ private[spark] class CoalescedRDD[T: ClassTag](
partition.asInstanceOf[CoalescedRDDPartition].preferredLocation.toSeq
}
}
/**
 * Coalesce the partitions of a parent RDD into fewer partitions, so that each partition of
 * this RDD computes one or more of the parent ones. The i'th partition of the parent RDD is
 * mapped to the (i % targetPartitions)'th partition of the output RDD.
 */
private[spark] class RoundRobinPartitionCoalescer() extends PartitionCoalescer with Serializable {
  def coalesce(targetPartitions: Int, parent: RDD[_]): Array[PartitionGroup] = {
    val partitionGroups = ArrayBuffer[PartitionGroup]()
    for (_ <- 0 until targetPartitions) {
      partitionGroups += new PartitionGroup(None)
    }

    for ((p, i) <- parent.partitions.zipWithIndex) {
      partitionGroups(i % targetPartitions).partitions += p
    }
    partitionGroups.toArray
  }
}
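The round-robin mapping is easiest to see with concrete numbers. The following standalone Scala sketch (plain collections only, no Spark types; the object and method names are invented for this illustration and are not part of the diff) reproduces the i % targetPartitions grouping:

// Parent partition i lands in group (i % targetPartitions), mirroring the coalescer above.
object RoundRobinMappingDemo {
  def groupIndices(numParentPartitions: Int, targetPartitions: Int): Seq[(Int, Seq[Int])] =
    (0 until numParentPartitions).groupBy(_ % targetPartitions).toSeq.sortBy(_._1)

  def main(args: Array[String]): Unit = {
    // 10 parent partitions coalesced into 4 groups:
    // group 0 <- 0, 4, 8; group 1 <- 1, 5, 9; group 2 <- 2, 6; group 3 <- 3, 7
    groupIndices(numParentPartitions = 10, targetPartitions = 4).foreach { case (group, parents) =>
      println(s"group $group <- parent partitions ${parents.mkString(", ")}")
    }
  }
}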

/**
* Coalesce the partitions of a parent RDD (`prev`) into fewer partitions, so that each partition of
@@ -19,7 +19,6 @@ package org.apache.spark.sql.catalyst.plans.physical

import scala.language.existentials

import org.apache.spark.SparkException
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.types.{DataType, IntegerType}

@@ -109,7 +108,7 @@ case class ClusteredDistribution(
*/
case class HashClusteredDistribution(
expressions: Seq[Expression],
requiredNumPartitions: Option[Int],
requiredNumPartitions: Option[Int] = None,
hashingFunctionClass: Class[_ <: HashExpression[Int]] = classOf[Murmur3Hash])
extends Distribution {

@@ -231,8 +230,8 @@ case class HashPartitioning(
super.satisfies(required) || {
required match {
case h: HashClusteredDistribution =>
h.hashingFunctionClass == hashingFunctionClass &&
(h.requiredNumPartitions.isEmpty || h.requiredNumPartitions.get == numPartitions) &&
h.hashingFunctionClass == hashingFunctionClass &&
expressions.length == h.expressions.length &&
expressions.zip(h.expressions).forall { case (l, r) => l.semanticEquals(r) }
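Restating the satisfies rule in this hunk: a HashPartitioning satisfies a HashClusteredDistribution only if the hashing function matches, any required partition count matches, and the clustering expressions line up pairwise. Below is a simplified, standalone Scala sketch of that rule (Partitioning, Requirement and the string-typed hash names are stand-ins invented for this illustration, not Spark classes):

object SatisfiesRuleSketch {
  final case class Partitioning(hashClass: String, numPartitions: Int, exprs: Seq[String])
  final case class Requirement(hashClass: String, requiredNumPartitions: Option[Int], exprs: Seq[String])

  def satisfies(p: Partitioning, req: Requirement): Boolean =
    req.hashClass == p.hashClass &&
      req.requiredNumPartitions.forall(_ == p.numPartitions) &&
      p.exprs.length == req.exprs.length &&
      p.exprs.zip(req.exprs).forall { case (l, r) => l == r }

  def main(args: Array[String]): Unit = {
    val p = Partitioning("HiveHash", numPartitions = 8, exprs = Seq("a", "b"))
    println(satisfies(p, Requirement("HiveHash", Some(8), Seq("a", "b"))))    // true
    println(satisfies(p, Requirement("HiveHash", None, Seq("a", "b"))))       // true: no count required
    println(satisfies(p, Requirement("Murmur3Hash", Some(8), Seq("a", "b")))) // false: hash function differs
    println(satisfies(p, Requirement("HiveHash", Some(16), Seq("a", "b"))))   // false: partition count differs
  }
}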

@@ -82,22 +82,22 @@ class DistributionSuite extends SparkFunSuite {

checkSatisfied(
HashPartitioning(Seq('a, 'b, 'c), 10),
ClusteredDistribution(Seq('a, 'b, 'c), Some(10), Some(classOf[Murmur3Hash])),
HashClusteredDistribution(Seq('a, 'b, 'c), Some(10), classOf[Murmur3Hash]),
true)

checkSatisfied(
HashPartitioning(Seq('a, 'b, 'c), 10),
ClusteredDistribution(Seq('a, 'b, 'c), Some(12), Some(classOf[Murmur3Hash])),
HashClusteredDistribution(Seq('a, 'b, 'c), Some(12), classOf[Murmur3Hash]),
false)

checkSatisfied(
HashPartitioning(Seq('a, 'b, 'c), 10),
ClusteredDistribution(Seq('d, 'e), Some(10), Some(classOf[Murmur3Hash])),
HashClusteredDistribution(Seq('d, 'e), Some(10), classOf[Murmur3Hash]),
false)

checkSatisfied(
HashPartitioning(Seq('a, 'b, 'c), 10),
ClusteredDistribution(Seq('a, 'b, 'c), Some(10), Some(classOf[HiveHash])),
HashClusteredDistribution(Seq('a, 'b, 'c), Some(10), classOf[HiveHash]),
false)

checkSatisfied(
@@ -146,36 +146,6 @@ class DistributionSuite extends SparkFunSuite {
OrderedDistribution(Seq('a.asc, 'b.asc, 'c.asc, 'd.desc)),
true)

checkSatisfied(
RangePartitioning(Seq('a.asc, 'b.asc, 'c.asc), 10),
ClusteredDistribution(Seq('a, 'b, 'c), Some(10), None),
true)

checkSatisfied(
RangePartitioning(Seq('a.asc, 'b.asc, 'c.asc), 10),
ClusteredDistribution(Seq('c, 'b, 'a), Some(10), None),
true)

checkSatisfied(
RangePartitioning(Seq('a.asc, 'b.asc, 'c.asc), 10),
ClusteredDistribution(Seq('b, 'c, 'a, 'd), Some(10), None),
true)

checkSatisfied(
RangePartitioning(Seq('a.asc, 'b.asc, 'c.asc), 10),
ClusteredDistribution(Seq('b, 'c, 'a, 'd), Some(10), Some(classOf[Murmur3Hash])),
false)

checkSatisfied(
RangePartitioning(Seq('a.asc, 'b.asc, 'c.asc), 10),
ClusteredDistribution(Seq('b, 'c, 'a, 'd), Some(12), Some(classOf[Murmur3Hash])),
false)

checkSatisfied(
RangePartitioning(Seq('a.asc, 'b.asc, 'c.asc), 10),
ClusteredDistribution(Seq('b, 'c, 'a, 'd), Some(10), Some(classOf[HiveHash])),
false)

// Cases which need an exchange between two data properties.
// TODO: We can have an optimization to first sort the dataset
// by a.asc and then sort b, and c in a partition. This optimization
@@ -18,6 +18,7 @@
package org.apache.spark.sql.execution.exchange

import scala.collection.mutable.ArrayBuffer
import scala.language.existentials

import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.plans.physical._
@@ -145,6 +146,16 @@ case class EnsureRequirements(conf: SQLConf) extends Rule[SparkPlan] {
assert(requiredChildDistributions.length == children.length)
assert(requiredChildOrderings.length == children.length)

val distinctNumPartitionsExpected =
  requiredChildDistributions.flatMap(_.requiredNumPartitions).distinct

val numPreShufflePartitions =
  if (distinctNumPartitionsExpected.isEmpty || distinctNumPartitionsExpected.size > 1) {
    defaultNumPreShufflePartitions
  } else {
    distinctNumPartitionsExpected.head
  }

// Ensure that the operator's children satisfy their output distribution requirements.
children = children.zip(requiredChildDistributions).map {
case (child, distribution) if child.outputPartitioning.satisfies(distribution) =>
@@ -153,7 +164,7 @@ case class EnsureRequirements(conf: SQLConf) extends Rule[SparkPlan] {
BroadcastExchangeExec(mode, child)
case (child, distribution) =>
val numPartitions = distribution.requiredNumPartitions
.getOrElse(defaultNumPreShufflePartitions)
.getOrElse(numPreShufflePartitions)
ShuffleExchangeExec(distribution.createPartitioning(numPartitions), child)
}
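The two changes in this hunk work together: the pre-shuffle partition count honours the children's required partition count when exactly one value is required, and falls back to the session default otherwise. A standalone sketch of that selection (plain Scala; the object and method names are invented for this illustration):

object PreShufflePartitionsSketch {
  // Honour the children's required partition count only when it is unambiguous; otherwise
  // fall back to the configured default (e.g. spark.sql.shuffle.partitions).
  def choose(requiredByChildren: Seq[Option[Int]], defaultNumPartitions: Int): Int = {
    val distinct = requiredByChildren.flatten.distinct
    if (distinct.size == 1) distinct.head else defaultNumPartitions
  }

  def main(args: Array[String]): Unit = {
    println(choose(Seq(Some(8), Some(8), None), 200)) // 8   -> all stated requirements agree
    println(choose(Seq(Some(8), Some(16)), 200))      // 200 -> conflicting requirements
    println(choose(Seq(None, None), 200))             // 200 -> nothing required
  }
}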

@@ -262,14 +273,44 @@ case class EnsureRequirements(conf: SQLConf) extends Rule[SparkPlan] {
}
}

private def adjustHashingFunctionAndNumPartitions(
    plan: SparkPlan,
    requiredNumPartitions: Option[Int],
    hashingFunctionClass: Class[_ <: HashExpression[Int]]):
    (Option[Int], Class[_ <: HashExpression[Int]]) = {

  val childHashPartitionings = plan.children.map(_.outputPartitioning)
    .filter(_.isInstanceOf[HashPartitioning])
    .map(_.asInstanceOf[HashPartitioning])

  val distinctRequiredNumPartitions = childHashPartitionings.map(_.numPartitions).distinct
  val newRequiredNumPartitions =
    if (distinctRequiredNumPartitions.nonEmpty && distinctRequiredNumPartitions.size == 1) {
      Some(distinctRequiredNumPartitions.head)
    } else {
      requiredNumPartitions
    }

  val distinctHashingFunctions = childHashPartitionings.map(_.hashingFunctionClass).distinct
  val newHashingFunctionClass =
    if (distinctHashingFunctions.nonEmpty && distinctHashingFunctions.size == 1) {
      distinctHashingFunctions.head
    } else {
      hashingFunctionClass
    }

  (newRequiredNumPartitions, newHashingFunctionClass)
}
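In other words, the join adopts a partition count or hashing function from its children only when every hash-partitioned child reports the same value; otherwise the previously required values are kept. A standalone reduction of that decision over plain values (ChildPartitioning and the string hash names are invented for this sketch; the method above works on real HashPartitioning nodes):

object AdjustRequirementsSketch {
  final case class ChildPartitioning(hashClass: String, numPartitions: Int)

  def adjust(
      children: Seq[ChildPartitioning],
      requiredNumPartitions: Option[Int],
      requiredHashClass: String): (Option[Int], String) = {
    val counts = children.map(_.numPartitions).distinct
    val hashes = children.map(_.hashClass).distinct
    val newCount = if (counts.size == 1) Some(counts.head) else requiredNumPartitions
    val newHash = if (hashes.size == 1) hashes.head else requiredHashClass
    (newCount, newHash)
  }

  def main(args: Array[String]): Unit = {
    // Both children are Hive-bucketed scans exposing HiveHash over 8 buckets: the join
    // requires (Some(8), HiveHash), which the existing bucketing can already satisfy.
    println(adjust(Seq(ChildPartitioning("HiveHash", 8), ChildPartitioning("HiveHash", 8)),
      None, "Murmur3Hash"))
    // The children disagree on the bucket count: keep the original requirement.
    println(adjust(Seq(ChildPartitioning("HiveHash", 8), ChildPartitioning("HiveHash", 16)),
      None, "Murmur3Hash"))
  }
}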

/**
 * When the physical operators are created for a JOIN, the ordering of the join keys follows the
 * order in which they appear in the user query. That might not match the output partitioning of
 * the join node's children (leading to an extra sort / shuffle being introduced). This rule
 * reorders the join keys to match the children's partitioning and, when the children agree on a
 * single hashing function and number of partitions, adopts those for the join's requirements.
 */
private def reorderJoinPredicates(plan: SparkPlan): SparkPlan = {
private def adjustJoinRequirements(plan: SparkPlan): SparkPlan = {
plan.transformUp {
case BroadcastHashJoinExec(leftKeys, rightKeys, joinType, buildSide, condition, left,
right) =>
@@ -278,16 +319,25 @@ case class EnsureRequirements(conf: SQLConf) extends Rule[SparkPlan] {
BroadcastHashJoinExec(reorderedLeftKeys, reorderedRightKeys, joinType, buildSide, condition,
left, right)

case ShuffledHashJoinExec(leftKeys, rightKeys, joinType, buildSide, condition, left, right) =>
case ShuffledHashJoinExec(leftKeys, rightKeys, joinType, buildSide, condition, left, right,
requiredNumPartitions, hashingFunctionClass) =>

val (reorderedLeftKeys, reorderedRightKeys) =
reorderJoinKeys(leftKeys, rightKeys, left.outputPartitioning, right.outputPartitioning)
val (newRequiredNumPartitions, newHashingFunctionClass) =
adjustHashingFunctionAndNumPartitions(plan, requiredNumPartitions, hashingFunctionClass)
ShuffledHashJoinExec(reorderedLeftKeys, reorderedRightKeys, joinType, buildSide, condition,
left, right)
left, right, newRequiredNumPartitions, newHashingFunctionClass)

case SortMergeJoinExec(leftKeys, rightKeys, joinType, condition, left, right,
requiredNumPartitions, hashingFunctionClass) =>

case SortMergeJoinExec(leftKeys, rightKeys, joinType, condition, left, right) =>
val (reorderedLeftKeys, reorderedRightKeys) =
reorderJoinKeys(leftKeys, rightKeys, left.outputPartitioning, right.outputPartitioning)
SortMergeJoinExec(reorderedLeftKeys, reorderedRightKeys, joinType, condition, left, right)
val (newRequiredNumPartitions, newHashingFunctionClass) =
adjustHashingFunctionAndNumPartitions(plan, requiredNumPartitions, hashingFunctionClass)
SortMergeJoinExec(reorderedLeftKeys, reorderedRightKeys, joinType, condition,
left, right, newRequiredNumPartitions, newHashingFunctionClass)
}
}
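The reorderJoinKeys calls above implement the scaladoc's idea: emit the join keys in the order of an existing child partitioning so that the partitioning can be reused. A hedged sketch of that idea over plain strings (reorderKeys is an invented helper for illustration, not Spark's reorderJoinKeys):

object ReorderJoinKeysSketch {
  // If the left side is already hash-partitioned on a permutation of the join keys, emit the
  // keys in the partitioning's order, keeping each right key paired with its left key.
  def reorderKeys(
      leftKeys: Seq[String],
      rightKeys: Seq[String],
      leftPartitioningExprs: Seq[String]): (Seq[String], Seq[String]) = {
    val pairs = leftKeys.zip(rightKeys)
    if (leftPartitioningExprs.nonEmpty && leftPartitioningExprs.toSet == leftKeys.toSet) {
      val reordered = leftPartitioningExprs.flatMap(k => pairs.find(_._1 == k))
      (reordered.map(_._1), reordered.map(_._2))
    } else {
      (leftKeys, rightKeys)
    }
  }

  def main(args: Array[String]): Unit = {
    // The query joins on (y, x) but the left table is partitioned / bucketed by (x, y);
    // reordering to (x, y) lets the join reuse the existing partitioning.
    println(reorderKeys(Seq("y", "x"), Seq("t2.y", "t2.x"), leftPartitioningExprs = Seq("x", "y")))
    // prints (List(x, y),List(t2.x, t2.y))
  }
}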

@@ -299,6 +349,6 @@ case class EnsureRequirements(conf: SQLConf) extends Rule[SparkPlan] {
case _ => operator
}
case operator: SparkPlan =>
ensureDistributionAndOrdering(reorderJoinPredicates(operator))
ensureDistributionAndOrdering(adjustJoinRequirements(operator))
}
}
@@ -17,10 +17,12 @@

package org.apache.spark.sql.execution.joins

import scala.language.existentials

import org.apache.spark.TaskContext
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.Expression
import org.apache.spark.sql.catalyst.expressions.{Expression, HashExpression, Murmur3Hash}
import org.apache.spark.sql.catalyst.plans._
import org.apache.spark.sql.catalyst.plans.physical._
import org.apache.spark.sql.execution.{BinaryExecNode, SparkPlan}
@@ -36,7 +38,9 @@ case class ShuffledHashJoinExec(
buildSide: BuildSide,
condition: Option[Expression],
left: SparkPlan,
right: SparkPlan)
right: SparkPlan,
requiredNumPartitions: Option[Int] = None,
hashingFunctionClass: Class[_ <: HashExpression[Int]] = classOf[Murmur3Hash])
extends BinaryExecNode with HashJoin {

override lazy val metrics = Map(
@@ -45,8 +49,11 @@ case class ShuffledHashJoinExec(
"buildTime" -> SQLMetrics.createTimingMetric(sparkContext, "time to build hash map"),
"avgHashProbe" -> SQLMetrics.createAverageMetric(sparkContext, "avg hash probe"))

override def requiredChildDistribution: Seq[Distribution] =
HashClusteredDistribution(leftKeys) :: HashClusteredDistribution(rightKeys) :: Nil
override def requiredChildDistribution: Seq[Distribution] = {
HashClusteredDistribution(leftKeys, requiredNumPartitions, hashingFunctionClass) ::
HashClusteredDistribution(rightKeys, requiredNumPartitions, hashingFunctionClass) ::
Nil
}
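Together with the HashClusteredDistribution changes earlier in this commit, the two new constructor parameters let a shuffled hash join over Hive-bucketed tables require HiveHash and the bucket count from its children instead of a generic Murmur3 shuffle. A hypothetical end-to-end usage sketch (the table names db.orders_bucketed / db.customers_bucketed and the column key are made up, and both tables are assumed to be bucketed identically on key):

import org.apache.spark.sql.SparkSession

object BucketedJoinReadPath {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("hive-bucketed-join")
      .enableHiveSupport()
      .getOrCreate()

    // Both tables are assumed to be Hive-bucketed on `key` with the same bucket count.
    val joined = spark.table("db.orders_bucketed")
      .join(spark.table("db.customers_bucketed"), "key")

    // Inspect the physical plan: when both scans expose a compatible HiveHash partitioning,
    // the join's required distribution can be satisfied without an extra Exchange.
    joined.explain()

    spark.stop()
  }
}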

private def buildHashedRelation(iter: Iterator[InternalRow]): HashedRelation = {
val buildDataSize = longMetric("buildDataSize")
@@ -18,6 +18,7 @@
package org.apache.spark.sql.execution.joins

import scala.collection.mutable.ArrayBuffer
import scala.language.existentials

import org.apache.spark.rdd.RDD
import org.apache.spark.sql.catalyst.InternalRow
@@ -39,7 +40,10 @@ case class SortMergeJoinExec(
joinType: JoinType,
condition: Option[Expression],
left: SparkPlan,
right: SparkPlan) extends BinaryExecNode with CodegenSupport {
right: SparkPlan,
requiredNumPartitions: Option[Int] = None,
hashingFunctionClass: Class[_ <: HashExpression[Int]] = classOf[Murmur3Hash])
extends BinaryExecNode with CodegenSupport {

override lazy val metrics = Map(
"numOutputRows" -> SQLMetrics.createMetric(sparkContext, "number of output rows"))
@@ -77,8 +81,11 @@ case class SortMergeJoinExec(
s"${getClass.getSimpleName} should not take $x as the JoinType")
}

override def requiredChildDistribution: Seq[Distribution] =
HashClusteredDistribution(leftKeys) :: HashClusteredDistribution(rightKeys) :: Nil
override def requiredChildDistribution: Seq[Distribution] = {
HashClusteredDistribution(leftKeys, requiredNumPartitions, hashingFunctionClass) ::
HashClusteredDistribution(rightKeys, requiredNumPartitions, hashingFunctionClass) ::
Nil
}

override def outputOrdering: Seq[SortOrder] = joinType match {
// For inner join, orders of both sides keys should be kept.