[HOTFIX] Synchronize on SQLContext.settings in tests.
Let's see if this fixes the ongoing series of test failures in a master build machine (https://amplab.cs.berkeley.edu/jenkins/job/Spark-Master-SBT-pre-YARN/SPARK_HADOOP_VERSION=1.0.4,label=centos/81/).

pwendell marmbrus

Author: Zongheng Yang <[email protected]>

Closes apache#1277 from concretevitamin/test-fix and squashes the following commits:

28c88bd [Zongheng Yang] Synchronize on SQLContext.settings in tests.
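The pattern behind the fix, for context: the suites changed below all share the single TestSQLContext, and its settings map is a java.util.Collections.synchronizedMap, so individual get/set calls are thread-safe but a multi-step sequence such as clear-then-set-then-assert is not atomic if suites touching the same conf run concurrently. A minimal sketch of the client-side locking idiom the diffs below apply (the names SharedConf, withIsolatedConf, and Example are illustrative, not part of the commit):

    import java.util.{Collections, HashMap => JHashMap}

    // Illustrative stand-in for the shared configuration backing SQLConf.
    object SharedConf {
      // Same construction as SQLConf.settings: each call is thread-safe, and the
      // wrapper returned by synchronizedMap is also the monitor it locks on.
      val settings: java.util.Map[String, String] =
        Collections.synchronizedMap(new JHashMap[String, String]())

      // Holding settings' monitor for a whole clear/set/assert sequence keeps
      // concurrently running suites from interleaving their updates.
      def withIsolatedConf[T](body: => T): T = settings.synchronized { body }
    }

    // Usage sketch: a test body mutates and inspects the shared map while
    // holding the same lock that guards its individual operations.
    object Example extends App {
      SharedConf.withIsolatedConf {
        SharedConf.settings.clear()
        SharedConf.settings.put("spark.sql.join.broadcastTables", "testData2")
        assert(SharedConf.settings.get("spark.sql.join.broadcastTables") == "testData2")
      }
    }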
concretevitamin authored and marmbrus committed Jul 4, 2014
1 parent 731f683 commit d4c30cd
Showing 4 changed files with 91 additions and 83 deletions.
2 changes: 1 addition & 1 deletion sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala
@@ -50,7 +50,7 @@ trait SQLConf {
   /** ********************** SQLConf functionality methods ************ */
 
   @transient
-  private val settings = java.util.Collections.synchronizedMap(
+  protected[sql] val settings = java.util.Collections.synchronizedMap(
     new java.util.HashMap[String, String]())
 
   def set(props: Properties): Unit = {
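The single production change above only widens visibility: with settings private, test code would have nothing to lock on, while protected[sql] exposes the same monitor to code compiled under the org.apache.spark.sql package, which is where these suites live. A hedged sketch of what that enables (ExampleSuite and withSettingsLock are illustrative names, not part of the commit):

    package org.apache.spark.sql

    import org.apache.spark.sql.test.TestSQLContext

    // Illustrative only: a suite in the sql package can now reach
    // TestSQLContext.settings and use it to guard a whole test body.
    class ExampleSuite {
      def withSettingsLock[T](body: => T): T =
        TestSQLContext.settings.synchronized { body }
    }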
40 changes: 21 additions & 19 deletions sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala
@@ -39,25 +39,27 @@ class JoinSuite extends QueryTest {
   test("plans broadcast hash join, given hints") {
 
     def mkTest(buildSide: BuildSide, leftTable: String, rightTable: String) = {
-      TestSQLContext.set("spark.sql.join.broadcastTables",
-        s"${if (buildSide == BuildRight) rightTable else leftTable}")
-      val rdd = sql(s"""SELECT * FROM $leftTable JOIN $rightTable ON key = a""")
-      // Using `sparkPlan` because for relevant patterns in HashJoin to be
-      // matched, other strategies need to be applied.
-      val physical = rdd.queryExecution.sparkPlan
-      val bhj = physical.collect { case j: BroadcastHashJoin if j.buildSide == buildSide => j }
-
-      assert(bhj.size === 1, "planner does not pick up hint to generate broadcast hash join")
-      checkAnswer(
-        rdd,
-        Seq(
-          (1, "1", 1, 1),
-          (1, "1", 1, 2),
-          (2, "2", 2, 1),
-          (2, "2", 2, 2),
-          (3, "3", 3, 1),
-          (3, "3", 3, 2)
-        ))
+      TestSQLContext.settings.synchronized {
+        TestSQLContext.set("spark.sql.join.broadcastTables",
+          s"${if (buildSide == BuildRight) rightTable else leftTable}")
+        val rdd = sql( s"""SELECT * FROM $leftTable JOIN $rightTable ON key = a""")
+        // Using `sparkPlan` because for relevant patterns in HashJoin to be
+        // matched, other strategies need to be applied.
+        val physical = rdd.queryExecution.sparkPlan
+        val bhj = physical.collect { case j: BroadcastHashJoin if j.buildSide == buildSide => j}
+
+        assert(bhj.size === 1, "planner does not pick up hint to generate broadcast hash join")
+        checkAnswer(
+          rdd,
+          Seq(
+            (1, "1", 1, 1),
+            (1, "1", 1, 2),
+            (2, "2", 2, 1),
+            (2, "2", 2, 2),
+            (3, "3", 3, 1),
+            (3, "3", 3, 2)
+          ))
+      }
     }
 
     mkTest(BuildRight, "testData", "testData2")
64 changes: 34 additions & 30 deletions sql/core/src/test/scala/org/apache/spark/sql/SQLConfSuite.scala
@@ -28,46 +28,50 @@ class SQLConfSuite extends QueryTest {
   val testVal = "test.val.0"
 
   test("programmatic ways of basic setting and getting") {
-    clear()
-    assert(getOption(testKey).isEmpty)
-    assert(getAll.toSet === Set())
+    TestSQLContext.settings.synchronized {
+      clear()
+      assert(getOption(testKey).isEmpty)
+      assert(getAll.toSet === Set())
 
-    set(testKey, testVal)
-    assert(get(testKey) == testVal)
-    assert(get(testKey, testVal + "_") == testVal)
-    assert(getOption(testKey) == Some(testVal))
-    assert(contains(testKey))
+      set(testKey, testVal)
+      assert(get(testKey) == testVal)
+      assert(get(testKey, testVal + "_") == testVal)
+      assert(getOption(testKey) == Some(testVal))
+      assert(contains(testKey))
 
-    // Tests SQLConf as accessed from a SQLContext is mutable after
-    // the latter is initialized, unlike SparkConf inside a SparkContext.
-    assert(TestSQLContext.get(testKey) == testVal)
-    assert(TestSQLContext.get(testKey, testVal + "_") == testVal)
-    assert(TestSQLContext.getOption(testKey) == Some(testVal))
-    assert(TestSQLContext.contains(testKey))
+      // Tests SQLConf as accessed from a SQLContext is mutable after
+      // the latter is initialized, unlike SparkConf inside a SparkContext.
+      assert(TestSQLContext.get(testKey) == testVal)
+      assert(TestSQLContext.get(testKey, testVal + "_") == testVal)
+      assert(TestSQLContext.getOption(testKey) == Some(testVal))
+      assert(TestSQLContext.contains(testKey))
 
-    clear()
+      clear()
+    }
   }
 
   test("parse SQL set commands") {
-    clear()
-    sql(s"set $testKey=$testVal")
-    assert(get(testKey, testVal + "_") == testVal)
-    assert(TestSQLContext.get(testKey, testVal + "_") == testVal)
+    TestSQLContext.settings.synchronized {
+      clear()
+      sql(s"set $testKey=$testVal")
+      assert(get(testKey, testVal + "_") == testVal)
+      assert(TestSQLContext.get(testKey, testVal + "_") == testVal)
 
-    sql("set mapred.reduce.tasks=20")
-    assert(get("mapred.reduce.tasks", "0") == "20")
-    sql("set mapred.reduce.tasks = 40")
-    assert(get("mapred.reduce.tasks", "0") == "40")
+      sql("set mapred.reduce.tasks=20")
+      assert(get("mapred.reduce.tasks", "0") == "20")
+      sql("set mapred.reduce.tasks = 40")
+      assert(get("mapred.reduce.tasks", "0") == "40")
 
-    val key = "spark.sql.key"
-    val vs = "val0,val_1,val2.3,my_table"
-    sql(s"set $key=$vs")
-    assert(get(key, "0") == vs)
+      val key = "spark.sql.key"
+      val vs = "val0,val_1,val2.3,my_table"
+      sql(s"set $key=$vs")
+      assert(get(key, "0") == vs)
 
-    sql(s"set $key=")
-    assert(get(key, "0") == "")
+      sql(s"set $key=")
+      assert(get(key, "0") == "")
 
-    clear()
+      clear()
+    }
   }
 
 }
68 changes: 35 additions & 33 deletions sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
@@ -372,38 +372,40 @@ class SQLQuerySuite extends QueryTest {
   }
 
   test("SET commands semantics using sql()") {
-    clear()
-    val testKey = "test.key.0"
-    val testVal = "test.val.0"
-    val nonexistentKey = "nonexistent"
-
-    // "set" itself returns all config variables currently specified in SQLConf.
-    assert(sql("SET").collect().size == 0)
-
-    // "set key=val"
-    sql(s"SET $testKey=$testVal")
-    checkAnswer(
-      sql("SET"),
-      Seq(Seq(testKey, testVal))
-    )
-
-    sql(s"SET ${testKey + testKey}=${testVal + testVal}")
-    checkAnswer(
-      sql("set"),
-      Seq(
-        Seq(testKey, testVal),
-        Seq(testKey + testKey, testVal + testVal))
-    )
-
-    // "set key"
-    checkAnswer(
-      sql(s"SET $testKey"),
-      Seq(Seq(testKey, testVal))
-    )
-    checkAnswer(
-      sql(s"SET $nonexistentKey"),
-      Seq(Seq(nonexistentKey, "<undefined>"))
-    )
-    clear()
+    TestSQLContext.settings.synchronized {
+      clear()
+      val testKey = "test.key.0"
+      val testVal = "test.val.0"
+      val nonexistentKey = "nonexistent"
+
+      // "set" itself returns all config variables currently specified in SQLConf.
+      assert(sql("SET").collect().size == 0)
+
+      // "set key=val"
+      sql(s"SET $testKey=$testVal")
+      checkAnswer(
+        sql("SET"),
+        Seq(Seq(testKey, testVal))
+      )
+
+      sql(s"SET ${testKey + testKey}=${testVal + testVal}")
+      checkAnswer(
+        sql("set"),
+        Seq(
+          Seq(testKey, testVal),
+          Seq(testKey + testKey, testVal + testVal))
+      )
+
+      // "set key"
+      checkAnswer(
+        sql(s"SET $testKey"),
+        Seq(Seq(testKey, testVal))
+      )
+      checkAnswer(
+        sql(s"SET $nonexistentKey"),
+        Seq(Seq(nonexistentKey, "<undefined>"))
+      )
+      clear()
+    }
   }
 }
