Skip to content

Commit

Permalink
fix duplicate test
Browse files Browse the repository at this point in the history
  • Loading branch information
tejasapatil committed Jan 13, 2018
1 parent 3c367a0 commit d37eb8b
Show file tree
Hide file tree
Showing 2 changed files with 13 additions and 24 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -146,16 +146,6 @@ case class EnsureRequirements(conf: SQLConf) extends Rule[SparkPlan] {
assert(requiredChildDistributions.length == children.length)
assert(requiredChildOrderings.length == children.length)

val distinctNumPartitionsExpected =
requiredChildDistributions.flatMap(_.requiredNumPartitions).distinct

val numPreShufflePartitions =
if (distinctNumPartitionsExpected.isEmpty || distinctNumPartitionsExpected.size > 1) {
defaultNumPreShufflePartitions
} else {
distinctNumPartitionsExpected.head
}

// Ensure that the operator's children satisfy their output distribution requirements.
children = children.zip(requiredChildDistributions).map {
case (child, distribution) if child.outputPartitioning.satisfies(distribution) =>
Expand All @@ -164,7 +154,7 @@ case class EnsureRequirements(conf: SQLConf) extends Rule[SparkPlan] {
BroadcastExchangeExec(mode, child)
case (child, distribution) =>
val numPartitions = distribution.requiredNumPartitions
.getOrElse(numPreShufflePartitions)
.getOrElse(defaultNumPreShufflePartitions)
ShuffleExchangeExec(distribution.createPartitioning(numPartitions), child)
}

Expand Down Expand Up @@ -303,12 +293,22 @@ case class EnsureRequirements(conf: SQLConf) extends Rule[SparkPlan] {
}

/**
* TODO(tejasp) update the doc as per new edits
* Based on the type of join and the properties of the child nodes, adjust following:
*
* [A] Join keys
* -----------------------------
* When the physical operators are created for JOIN, the ordering of join keys is based on order
* in which the join keys appear in the user query. That might not match with the output
* partitioning of the join node's children (thus leading to extra sort / shuffle being
* introduced). This rule will change the ordering of the join keys to match with the
* introduced). This method will change the ordering of the join keys to match with the
* partitioning of the join nodes' children.
*
* [B] Hashing function class and required partitions for children
* --------------------------------------------------------------------
* In case when children of the join node are already shuffled using the same hash function and
* have the same number of partitions, then let the join node use the same values (and not the
* default number of shuffle partitions and hashing function). This saves shuffling of the join
* nodes' children.
*/
private def adjustJoinRequirements(plan: SparkPlan): SparkPlan = {
plan.transformUp {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -512,17 +512,6 @@ class InsertSuite extends QueryTest with TestHiveSingleton with BeforeAndAfter
}
}

test("SPARK-20594: hive.exec.stagingdir was deleted by Hive") {
// Set hive.exec.stagingdir under the table directory without start with ".".
withSQLConf("hive.exec.stagingdir" -> "./test") {
withTable("test_table") {
sql("CREATE TABLE test_table (key int)")
sql("INSERT OVERWRITE TABLE test_table SELECT 1")
checkAnswer(sql("SELECT * FROM test_table"), Row(1))
}
}
}

test("insert overwrite to dir from hive metastore table") {
withTempDir { dir =>
val path = dir.toURI.getPath
Expand Down

0 comments on commit d37eb8b

Please sign in to comment.