From e5bcf5bcf9c650e8a33987c738c89a380ec022c9 Mon Sep 17 00:00:00 2001 From: Zongheng Yang Date: Wed, 9 Jul 2014 14:33:33 -0700 Subject: [PATCH] Fix optimization conditions & update scala docs to explain. --- .../spark/sql/execution/SparkStrategies.scala | 39 +++++++++---------- 1 file changed, 18 insertions(+), 21 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala index 4da2cc97a8fc4..cf689a80a6e67 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala @@ -44,10 +44,16 @@ private[sql] abstract class SparkStrategies extends QueryPlanner[SparkPlan] { } } - // TODO: add comments to explain optimization /** * Uses the ExtractEquiJoinKeys pattern to find joins where at least some of the predicates can be * evaluated by matching hash keys. + * + * This strategy applies a simple optimization based on the estimates of the physical sizes of + * the two join sides. When planning a [[execution.BroadcastHashJoin]], if one side has an + * estimated physical size no larger than the user-settable threshold + * `spark.sql.auto.convert.join.size`, the planner marks it as the ''build'' relation and + * marks the other relation as the ''stream'' side. If both estimates exceed the threshold, + * they will instead be used to decide the build side in a [[execution.ShuffledHashJoin]]. 
*/ object HashJoin extends Strategy with PredicateHelper { private[this] def broadcastHashJoin( @@ -63,29 +69,17 @@ private[sql] abstract class SparkStrategies extends QueryPlanner[SparkPlan] { } def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match { - case ExtractEquiJoinKeys( - Inner, - leftKeys, - rightKeys, - condition, - left, - right) + case ExtractEquiJoinKeys(Inner, leftKeys, rightKeys, condition, left, right) if right.statistics.sizeInBytes <= sqlContext.autoConvertJoinSize => broadcastHashJoin(leftKeys, rightKeys, left, right, condition, BuildRight) - case ExtractEquiJoinKeys( - Inner, - leftKeys, - rightKeys, - condition, - left, - right) + case ExtractEquiJoinKeys(Inner, leftKeys, rightKeys, condition, left, right) if left.statistics.sizeInBytes <= sqlContext.autoConvertJoinSize => broadcastHashJoin(leftKeys, rightKeys, left, right, condition, BuildLeft) case ExtractEquiJoinKeys(Inner, leftKeys, rightKeys, condition, left, right) => val buildSide = - if (right.statistics.sizeInBytes <= sqlContext.autoConvertJoinSize) BuildRight + if (right.statistics.sizeInBytes <= left.statistics.sizeInBytes) BuildRight else BuildLeft val hashJoin = execution.ShuffledHashJoin( @@ -150,16 +144,19 @@ private[sql] abstract class SparkStrategies extends QueryPlanner[SparkPlan] { } } - // TODO: add comments to explain optimization + /** + * This strategy applies a simple optimization based on the estimates of the physical sizes of + * the two join sides: the planner marks the relation with the smaller estimated physical + * size as the ''build'' (broadcast) relation and marks the other as the ''stream'' relation. 
+ */ object BroadcastNestedLoopJoin extends Strategy { def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match { case logical.Join(left, right, joinType, condition) => val (streamed, broadcast) = - if (right.statistics.sizeInBytes <= sqlContext.autoConvertJoinSize) - (planLater(left), planLater(right)) - else (planLater(right), planLater(left)) + if (right.statistics.sizeInBytes <= left.statistics.sizeInBytes) (left, right) + else (right, left) execution.BroadcastNestedLoopJoin( - streamed, broadcast, joinType, condition)(sqlContext) :: Nil + planLater(streamed), planLater(broadcast), joinType, condition)(sqlContext) :: Nil case _ => Nil } }