Skip to content

Commit

Permalink
Fix optimization conditions & update scala docs to explain.
Browse files Browse the repository at this point in the history
  • Loading branch information
concretevitamin committed Jul 29, 2014
1 parent 7d9216a commit e5bcf5b
Showing 1 changed file with 18 additions and 21 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -44,10 +44,16 @@ private[sql] abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
}
}

// TODO: add comments to explain optimization
/**
* Uses the ExtractEquiJoinKeys pattern to find joins where at least some of the predicates can be
* evaluated by matching hash keys.
*
* This strategy applies a simple optimization based on the estimates of the physical sizes of
* the two join sides. When planning a [[execution.BroadcastHashJoin]], if one side has an
* estimated physical size smaller than the user-settable threshold
* `spark.sql.auto.convert.join.size`, the planner would mark it as the ''build'' relation and
* mark the other relation as the ''stream'' side. If both estimates exceed the threshold,
* they will instead be used to decide the build side in a [[execution.ShuffledHashJoin]].
*/
object HashJoin extends Strategy with PredicateHelper {
private[this] def broadcastHashJoin(
Expand All @@ -63,29 +69,17 @@ private[sql] abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
}

def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
case ExtractEquiJoinKeys(
Inner,
leftKeys,
rightKeys,
condition,
left,
right)
case ExtractEquiJoinKeys(Inner, leftKeys, rightKeys, condition, left, right)
if right.statistics.sizeInBytes <= sqlContext.autoConvertJoinSize =>
broadcastHashJoin(leftKeys, rightKeys, left, right, condition, BuildRight)

case ExtractEquiJoinKeys(
Inner,
leftKeys,
rightKeys,
condition,
left,
right)
case ExtractEquiJoinKeys(Inner, leftKeys, rightKeys, condition, left, right)
if left.statistics.sizeInBytes <= sqlContext.autoConvertJoinSize =>
broadcastHashJoin(leftKeys, rightKeys, left, right, condition, BuildLeft)

case ExtractEquiJoinKeys(Inner, leftKeys, rightKeys, condition, left, right) =>
val buildSide =
if (right.statistics.sizeInBytes <= sqlContext.autoConvertJoinSize) BuildRight
if (right.statistics.sizeInBytes <= left.statistics.sizeInBytes) BuildRight
else BuildLeft
val hashJoin =
execution.ShuffledHashJoin(
Expand Down Expand Up @@ -150,16 +144,19 @@ private[sql] abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
}
}

// TODO: add comments to explain optimization
/**
* This strategy applies a simple optimization based on the estimates of the physical sizes of
* the two join sides: the planner would mark the relation with the smaller estimated physical
* size as the ''build'' (broadcast) relation and mark the other as the ''stream'' relation.
*/
object BroadcastNestedLoopJoin extends Strategy {
def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
case logical.Join(left, right, joinType, condition) =>
val (streamed, broadcast) =
if (right.statistics.sizeInBytes <= sqlContext.autoConvertJoinSize)
(planLater(left), planLater(right))
else (planLater(right), planLater(left))
if (right.statistics.sizeInBytes <= left.statistics.sizeInBytes) (left, right)
else (right, left)
execution.BroadcastNestedLoopJoin(
streamed, broadcast, joinType, condition)(sqlContext) :: Nil
planLater(streamed), planLater(broadcast), joinType, condition)(sqlContext) :: Nil
case _ => Nil
}
}
Expand Down

0 comments on commit e5bcf5b

Please sign in to comment.