Skip to content

Commit

Permalink
Refactor test to use SQLTestUtils
Browse files Browse the repository at this point in the history
  • Loading branch information
JoshRosen committed Jul 31, 2015
1 parent 2963857 commit cd8269b
Showing 1 changed file with 44 additions and 37 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -23,14 +23,18 @@ import org.apache.spark.sql.catalyst.plans._
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.execution.joins.{BroadcastHashJoin, ShuffledHashJoin}
import org.apache.spark.sql.functions._
import org.apache.spark.sql.test.{SQLTestUtils, TestSQLContext}
import org.apache.spark.sql.test.TestSQLContext._
import org.apache.spark.sql.test.TestSQLContext.implicits._
import org.apache.spark.sql.test.TestSQLContext.planner._
import org.apache.spark.sql.types._
import org.apache.spark.sql.{Row, SQLConf, execution}
import org.apache.spark.sql.{SQLContext, Row, SQLConf, execution}


class PlannerSuite extends SparkFunSuite {
class PlannerSuite extends SparkFunSuite with SQLTestUtils {

override def sqlContext: SQLContext = TestSQLContext

private def testPartialAggregationPlan(query: LogicalPlan): Unit = {
val plannedOption = HashAggregation(query).headOption.orElse(Aggregation(query).headOption)
val planned =
Expand Down Expand Up @@ -159,40 +163,43 @@ class PlannerSuite extends SparkFunSuite {
}

test("PartitioningCollection") {
// First, we disable broadcast join.
val origThreshold = conf.autoBroadcastJoinThreshold
setConf(SQLConf.AUTO_BROADCASTJOIN_THRESHOLD, 0)

testData.registerTempTable("normal")
testData.limit(10).registerTempTable("small")
testData.limit(3).registerTempTable("tiny")
var numExchanges = sql(
"""
|SELECT *
|FROM
| normal JOIN small ON (normal.key = small.key)
| JOIN tiny ON (small.key = tiny.key)
""".stripMargin).queryExecution.executedPlan.collect {
case exchange: Exchange => exchange
}.length

assert(numExchanges === 3)

numExchanges = sql(
"""
|SELECT *
|FROM
| normal JOIN small ON (normal.key = small.key)
| JOIN tiny ON (normal.key = tiny.key)
""".stripMargin).queryExecution.executedPlan.collect {
case exchange: Exchange => exchange
}.length

assert(numExchanges === 3)

setConf(SQLConf.AUTO_BROADCASTJOIN_THRESHOLD, origThreshold)
dropTempTable("normal")
dropTempTable("small")
dropTempTable("tiny")
withTempTable("normal", "small", "tiny") {
testData.registerTempTable("normal")
testData.limit(10).registerTempTable("small")
testData.limit(3).registerTempTable("tiny")

// Disable broadcast join
withSQLConf(SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "-1") {
{
val numExchanges = sql(
"""
|SELECT *
|FROM
| normal JOIN small ON (normal.key = small.key)
| JOIN tiny ON (small.key = tiny.key)
""".stripMargin
).queryExecution.executedPlan.collect {
case exchange: Exchange => exchange
}.length
assert(numExchanges === 3)
}

{
// This second query joins on different keys:
val numExchanges = sql(
"""
|SELECT *
|FROM
| normal JOIN small ON (normal.key = small.key)
| JOIN tiny ON (normal.key = tiny.key)
""".stripMargin
).queryExecution.executedPlan.collect {
case exchange: Exchange => exchange
}.length
assert(numExchanges === 3)
}

}
}
}
}

0 comments on commit cd8269b

Please sign in to comment.