Support left/right outer join in handling data skew feature (apache#60)
* Support left/right outer join in data skew feature

* Style

* Refactor & style

* Modify logic for non-split conditions (join type and skewed side)

* Refactor
Guo Chenzhao authored and carsonwang committed Aug 3, 2018
1 parent 34285fa commit bb39695
Showing 3 changed files with 262 additions and 9 deletions.
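The heart of the change is in the first file below: two new predicates decide which side of a skewed partition may be split for a given join type, and a partition is optimized only when a skewed side and the join type match. The following standalone sketch restates those rules outside the diff; JoinType is modeled as a plain ADT here only so that the snippet runs without Spark.

// Standalone restatement of the split-eligibility rules in this commit.
sealed trait JoinType
case object Inner extends JoinType
case object Cross extends JoinType
case object LeftSemi extends JoinType
case object LeftOuter extends JoinType
case object RightOuter extends JoinType

object SkewSplitRules {
  // The left side may be split unless unmatched right rows must be
  // preserved exactly once (right outer join).
  def supportSplitOnLeftPartition(joinType: JoinType): Boolean =
    joinType != RightOuter

  // The right side may not be split for left outer joins (unmatched left
  // rows would be null-extended once per split) or left semi joins
  // (matched left rows would be emitted once per matching split).
  def supportSplitOnRightPartition(joinType: JoinType): Boolean =
    joinType != LeftOuter && joinType != LeftSemi

  // A skewed partition is handled only if at least one skewed side may be
  // split under the given join type.
  def isSkewAndSupportsSplit(
      joinType: JoinType, isLeftSkew: Boolean, isRightSkew: Boolean): Boolean =
    (isLeftSkew && supportSplitOnLeftPartition(joinType)) ||
    (isRightSkew && supportSplitOnRightPartition(joinType))
}

The intuition: each row of the split side falls into exactly one mapper range, so its output is unaffected, while every row of the unsplit side is seen by every sub-join. Join types that must emit each row of a given side exactly once therefore forbid splitting the opposite side.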
@@ -17,9 +17,10 @@

package org.apache.spark.sql.execution.adaptive

+import scala.collection.immutable.Nil
import scala.collection.mutable

-import org.apache.spark.sql.catalyst.plans.{Cross, Inner, JoinType, LeftSemi}
+import org.apache.spark.sql.catalyst.plans._
import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.execution.{SortExec, SparkPlan, UnionExec}
import org.apache.spark.sql.execution.joins.SortMergeJoinExec
@@ -28,6 +29,8 @@ import org.apache.spark.sql.internal.SQLConf

case class HandleSkewedJoin(conf: SQLConf) extends Rule[SparkPlan] {

+private val supportedJoinTypes = Inner :: Cross :: LeftSemi :: LeftOuter :: RightOuter :: Nil

private def isSizeSkewed(size: Long, medianSize: Long): Boolean = {
size > medianSize * conf.adaptiveSkewedFactor &&
size > conf.adaptiveSkewedSizeThreshold
@@ -96,15 +99,26 @@ case class HandleSkewedJoin(conf: SQLConf) extends Rule[SparkPlan] {
equallyDivide(queryStageInput.numMapper, numSplits).toArray
}

+/**
+ * Base optimization support check: the join type is supported and plan statistics are available.
+ * Note that for some join types (like left outer), whether a certain partition can be optimized
+ * also depends on the field isSkewAndSupportsSplit.
+ */
private def supportOptimization(
joinType: JoinType,
left: QueryStageInput,
right: QueryStageInput): Boolean = {
-(joinType == Inner || joinType == Cross || joinType == LeftSemi) &&
+supportedJoinTypes.contains(joinType) &&
left.childStage.stats.getPartitionStatistics.isDefined &&
right.childStage.stats.getPartitionStatistics.isDefined
}

+private def supportSplitOnLeftPartition(joinType: JoinType) = joinType != RightOuter
+
+private def supportSplitOnRightPartition(joinType: JoinType) = {
+  joinType != LeftOuter && joinType != LeftSemi
+}

private def handleSkewedJoin(
operator: SparkPlan,
queryStage: QueryStage): SparkPlan = operator.transformUp {
@@ -131,18 +145,21 @@ case class HandleSkewedJoin(conf: SQLConf) extends Rule[SparkPlan] {
for (partitionId <- 0 until numPartitions) {
val isLeftSkew = isSkewed(leftStats, partitionId, leftMedSize, leftMedRowCount)
val isRightSkew = isSkewed(rightStats, partitionId, rightMedSize, rightMedRowCount)
-if (isLeftSkew || isRightSkew) {
+val isSkewAndSupportsSplit =
+  (isLeftSkew && supportSplitOnLeftPartition(joinType)) ||
+  (isRightSkew && supportSplitOnRightPartition(joinType))
+
+if (isSkewAndSupportsSplit) {
skewedPartitions += partitionId
-val leftMapIdStartIndices = if (isLeftSkew) {
+val leftMapIdStartIndices = if (isLeftSkew && supportSplitOnLeftPartition(joinType)) {
estimateMapIdStartIndices(left, partitionId, leftMedSize, leftMedRowCount)
} else {
Array(0)
}
-val rightMapIdStartIndices = if (!isRightSkew || joinType == LeftSemi) {
-  // For left semi join, we don't split the right partition
-  Array(0)
-} else {
+val rightMapIdStartIndices = if (isRightSkew && supportSplitOnRightPartition(joinType)) {
estimateMapIdStartIndices(right, partitionId, rightMedSize, rightMedRowCount)
+} else {
+  Array(0)
}

for (i <- 0 until leftMapIdStartIndices.length;
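To see why splitting the right side of a left outer join is rejected above, here is a toy demonstration with plain Scala collections (illustrative only, not Spark code): unioning per-split left outer joins fabricates a spurious null-extended row.

// Toy left outer join over plain Scala collections.
def leftOuter(
    left: Seq[(Int, String)],
    right: Seq[(Int, String)]): Seq[(Int, String, String)] =
  left.flatMap { case (k, v) =>
    val matches = right.filter(_._1 == k)
    if (matches.isEmpty) Seq((k, v, null))
    else matches.map { case (_, w) => (k, v, w) }
  }

val left = Seq(1 -> "a")
// Pretend the skewed right partition was split into two mapper ranges.
val rightSplits = Seq(Seq(1 -> "x"), Seq(2 -> "y"))

val unioned = rightSplits.flatMap(split => leftOuter(left, split))
// unioned == Seq((1, "a", "x"), (1, "a", null)): the second row is wrong,
// because (1, "a") matched overall but not within the second split, so the
// union emits a duplicate, null-extended copy.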
@@ -19,7 +19,8 @@ package org.apache.spark.sql.execution.adaptive

import scala.concurrent.{ExecutionContext, Future}
import scala.concurrent.duration.Duration
-import org.apache.spark.{MapOutputStatistics, SparkContext, broadcast}
+
+import org.apache.spark.{broadcast, MapOutputStatistics, SparkContext}
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions._
@@ -343,6 +343,241 @@ class QueryStageSuite extends SparkFunSuite with BeforeAndAfterAll {
}
}

test("adaptive skewed join: left/right outer join and skewed on right side") {
val spark = defaultSparkSession
spark.conf.set(SQLConf.ADAPTIVE_EXECUTION_JOIN_ENABLED.key, "false")
spark.conf.set(SQLConf.ADAPTIVE_EXECUTION_SKEWED_JOIN_ENABLED.key, "true")
spark.conf.set(SQLConf.ADAPTIVE_EXECUTION_SKEWED_PARTITION_ROW_COUNT_THRESHOLD.key, 10)
withSparkSession(spark) { spark: SparkSession =>
val df1 =
spark
.range(0, 10, 1, 2)
.selectExpr("id % 5 as key1", "id as value1")
val df2 =
spark
.range(0, 1000, 1, numInputPartitions)
.selectExpr("id % 1 as key2", "id as value2")

val leftOuterJoin =
df1.join(df2, col("key1") === col("key2"), "left").select(col("key1"), col("value2"))
val rightOuterJoin =
df1.join(df2, col("key1") === col("key2"), "right").select(col("key1"), col("value2"))

// Before execution, there is one SortMergeJoin
val smjBeforeExecutionForLeftOuter = leftOuterJoin.queryExecution.executedPlan.collect {
case smj: SortMergeJoinExec => smj
}
assert(smjBeforeExecutionForLeftOuter.length === 1)

val smjBeforeExecutionForRightOuter = rightOuterJoin.queryExecution.executedPlan.collect {
case smj: SortMergeJoinExec => smj
}
assert(smjBeforeExecutionForRightOuter.length === 1)

// Check the answer.
val expectedAnswerForLeftOuter =
spark
.range(0, 1000)
.selectExpr("0 as key", "id as value")
.union(spark.range(0, 1000).selectExpr("0 as key", "id as value"))
.union(spark.range(0, 10, 1).filter(_ % 5 != 0).selectExpr("id % 5 as key1", "null"))
checkAnswer(
leftOuterJoin,
expectedAnswerForLeftOuter.collect())

val expectedAnswerForRightOuter =
spark
.range(0, 1000)
.selectExpr("0 as key", "id as value")
.union(spark.range(0, 1000).selectExpr("0 as key", "id as value"))
checkAnswer(
rightOuterJoin,
expectedAnswerForRightOuter.collect())

// For the left outer join case: during execution, the SMJ cannot be split into any sub-joins,
// because the skewed side is the right side while the join type is left outer
// (the skewed side and the join type do not match)
val smjAfterExecutionForLeftOuter = leftOuterJoin.queryExecution.executedPlan.collect {
case smj: SortMergeJoinExec => smj
}
assert(smjAfterExecutionForLeftOuter.length === 1)

// For the right outer join case: during execution, the SMJ is changed to a Union of an SMJ
// plus 5 sub-SMJs, because the skewed side is the right side and the join type is right
// outer (the skewed side and the join type match)
val smjAfterExecutionForRightOuter = rightOuterJoin.queryExecution.executedPlan.collect {
case smj: SortMergeJoinExec => smj
}

assert(smjAfterExecutionForRightOuter.length === 6)
val queryStageInputs = rightOuterJoin.queryExecution.executedPlan.collect {
case q: ShuffleQueryStageInput => q
}
assert(queryStageInputs.length === 2)
assert(queryStageInputs(0).skewedPartitions === queryStageInputs(1).skewedPartitions)
assert(queryStageInputs(0).skewedPartitions === Some(Set(0)))

}
}

test("adaptive skewed join: left/right outer join and skewed on left side") {
val spark = defaultSparkSession
spark.conf.set(SQLConf.ADAPTIVE_EXECUTION_JOIN_ENABLED.key, "false")
spark.conf.set(SQLConf.ADAPTIVE_EXECUTION_SKEWED_JOIN_ENABLED.key, "true")
spark.conf.set(SQLConf.ADAPTIVE_EXECUTION_SKEWED_PARTITION_ROW_COUNT_THRESHOLD.key, 10)
withSparkSession(spark) { spark: SparkSession =>
val df1 =
spark
.range(0, 1000, 1, numInputPartitions)
.selectExpr("id % 1 as key1", "id as value1")
val df2 =
spark
.range(0, 10, 1, 2)
.selectExpr("id % 5 as key2", "id as value2")

val leftOuterJoin =
df1.join(df2, col("key1") === col("key2"), "left").select(col("key1"), col("value1"))
val rightOuterJoin =
df1.join(df2, col("key1") === col("key2"), "right").select(col("key1"), col("value1"))

// Before execution, there is one SortMergeJoin
val smjBeforeExecutionForLeftOuter = leftOuterJoin.queryExecution.executedPlan.collect {
case smj: SortMergeJoinExec => smj
}
assert(smjBeforeExecutionForLeftOuter.length === 1)

val smjBeforeExecutionForRightOuter = rightOuterJoin.queryExecution.executedPlan.collect {
case smj: SortMergeJoinExec => smj
}
assert(smjBeforeExecutionForRightOuter.length === 1)

// Check the answer.
val expectedAnswerForLeftOuter =
spark
.range(0, 1000)
.selectExpr("0 as key", "id as value")
.union(spark.range(0, 1000).selectExpr("0 as key", "id as value"))
checkAnswer(
leftOuterJoin,
expectedAnswerForLeftOuter.collect())

val expectedAnswerForRightOuter =
spark
.range(0, 1000)
.selectExpr("0 as key", "id as value")
.union(spark.range(0, 1000).selectExpr("0 as key", "id as value"))
.union(spark.range(0, 10, 1).filter(_ % 5 != 0).selectExpr("null", "null"))

checkAnswer(
rightOuterJoin,
expectedAnswerForRightOuter.collect())

// For the left outer join case: during execution, the SMJ is changed to a Union of an SMJ
// plus 5 sub-SMJs, because the skewed side is the left side and the join type is left outer
// (the skewed side and the join type match)
val smjAfterExecutionForLeftOuter = leftOuterJoin.queryExecution.executedPlan.collect {
case smj: SortMergeJoinExec => smj
}
assert(smjAfterExecutionForLeftOuter.length === 6)

// For the right outer join case: during execution, the SMJ cannot be split into any sub-joins,
// because the skewed side is the left side while the join type is right outer
// (the skewed side and the join type do not match)
val smjAfterExecutionForRightOuter = rightOuterJoin.queryExecution.executedPlan.collect {
case smj: SortMergeJoinExec => smj
}

assert(smjAfterExecutionForRightOuter.length === 1)
val queryStageInputs = leftOuterJoin.queryExecution.executedPlan.collect {
case q: ShuffleQueryStageInput => q
}
assert(queryStageInputs.length === 2)
assert(queryStageInputs(0).skewedPartitions === queryStageInputs(1).skewedPartitions)
assert(queryStageInputs(0).skewedPartitions === Some(Set(0)))

}
}

test("adaptive skewed join: left/right outer join and skewed on both sides") {
val spark = defaultSparkSession
spark.conf.set(SQLConf.ADAPTIVE_EXECUTION_JOIN_ENABLED.key, "false")
spark.conf.set(SQLConf.ADAPTIVE_EXECUTION_SKEWED_JOIN_ENABLED.key, "true")
spark.conf.set(SQLConf.ADAPTIVE_EXECUTION_SKEWED_PARTITION_ROW_COUNT_THRESHOLD.key, 10)
withSparkSession(spark) { spark: SparkSession =>
import spark.implicits._
val df1 =
spark
.range(0, 100, 1, numInputPartitions)
.selectExpr("id % 1 as key1", "id as value1")
val df2 =
spark
.range(0, 100, 1, numInputPartitions)
.selectExpr("id % 1 as key2", "id as value2")

val leftOuterJoin =
df1.join(df2, col("key1") === col("key2"), "left").select(col("key1"), col("value2"))
val rightOuterJoin =
df1.join(df2, col("key1") === col("key2"), "right").select(col("key1"), col("value2"))

// Before execution, there is one SortMergeJoin
val smjBeforeExecutionForLeftOuter = leftOuterJoin.queryExecution.executedPlan.collect {
case smj: SortMergeJoinExec => smj
}
assert(smjBeforeExecutionForLeftOuter.length === 1)

val smjBeforeExecutionForRightOuter = rightOuterJoin.queryExecution.executedPlan.collect {
case smj: SortMergeJoinExec => smj
}
assert(smjBeforeExecutionForRightOuter.length === 1)

// Check the answer.
val expectedAnswerForLeftOuter =
spark
.range(0, 100)
.flatMap(i => Seq.fill(100)(i))
.selectExpr("0 as key", "value")

checkAnswer(
leftOuterJoin,
expectedAnswerForLeftOuter.collect())

val expectedAnswerForRightOuter =
spark
.range(0, 100)
.flatMap(i => Seq.fill(100)(i))
.selectExpr("0 as key", "value")
checkAnswer(
rightOuterJoin,
expectedAnswerForRightOuter.collect())

// For the left outer join case: during execution, although the skewed sides include the
// right side, the SMJ is still changed to a Union of an SMJ plus 5 sub-SMJs, because the
// skewed sides also include the left side, so the left skewed partition is split
// (a matching side exists)
val smjAfterExecutionForLeftOuter = leftOuterJoin.queryExecution.executedPlan.collect {
case smj: SortMergeJoinExec => smj
}
assert(smjAfterExecutionForLeftOuter.length === 6)

// For the right outer join case: during execution, although the skewed sides include the
// left side, the SMJ is still changed to a Union of an SMJ plus 5 sub-SMJs, because the
// skewed sides also include the right side, so the right skewed partition is split
// (a matching side exists)
val smjAfterExecutionForRightOuter = rightOuterJoin.queryExecution.executedPlan.collect {
case smj: SortMergeJoinExec => smj
}

assert(smjAfterExecutionForRightOuter.length === 6)
val queryStageInputs = rightOuterJoin.queryExecution.executedPlan.collect {
case q: ShuffleQueryStageInput => q
}
assert(queryStageInputs.length === 2)
assert(queryStageInputs(0).skewedPartitions === queryStageInputs(1).skewedPartitions)
assert(queryStageInputs(0).skewedPartitions === Some(Set(0)))

}
}

test("row count statistics, compressed") {
val spark = defaultSparkSession
withSparkSession(spark) { spark: SparkSession =>
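A note on the "length === 6" assertions in the tests above: they count a Union of one SMJ (for the remaining, non-skewed partitions) plus 5 sub-SMJs, one per split of the skewed partition; since the other side stays unsplit (Array(0)), the nested loop over left and right start indices (its body is elided in this diff) presumably yields 1 x 5 = 5 sub-joins. The splits themselves are mapper ranges produced by equallyDivide(queryStageInput.numMapper, numSplits), whose implementation is not shown here; a plausible sketch, assuming it returns each split's starting map id, is:

// Hypothetical sketch of equallyDivide: partition numMapper map outputs into
// numSplits contiguous ranges and return each range's starting map id.
def equallyDivide(numMapper: Int, numSplits: Int): Seq[Int] = {
  val base = numMapper / numSplits
  val extra = numMapper % numSplits
  // The first `extra` ranges take one additional mapper each.
  (0 until numSplits).map(i => i * base + math.min(i, extra))
}

// Example: 10 mappers in 3 splits -> Seq(0, 4, 7),
// i.e. mapper ranges [0, 3], [4, 6], [7, 9].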
