Skip to content

Commit

Permalink
[SPARK-18189] [SQL] [Followup] Move test from ReplSuite to prevent ja…
Browse files Browse the repository at this point in the history
…va.lang.ClassCircularityError

closes #15774
  • Loading branch information
rxin committed Nov 5, 2016
1 parent 0e3312e commit 0f7c9e8
Show file tree
Hide file tree
Showing 2 changed files with 12 additions and 17 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -473,21 +473,4 @@ class ReplSuite extends SparkFunSuite {
assertDoesNotContain("AssertionError", output)
assertDoesNotContain("Exception", output)
}

test("SPARK-18189: Fix serialization issue in KeyValueGroupedDataset") {
val resultValue = 12345
val output = runInterpreter("local",
s"""
|val keyValueGrouped = Seq((1, 2), (3, 4)).toDS().groupByKey(_._1)
|val mapGroups = keyValueGrouped.mapGroups((k, v) => (k, 1))
|val broadcasted = sc.broadcast($resultValue)
|
|// Using broadcast triggers serialization issue in KeyValueGroupedDataset
|val dataset = mapGroups.map(_ => broadcasted.value)
|dataset.collect()
""".stripMargin)
assertDoesNotContain("error:", output)
assertDoesNotContain("Exception", output)
assertContains(s": Array[Int] = Array($resultValue, $resultValue)", output)
}
}
12 changes: 12 additions & 0 deletions sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala
Original file line number Diff line number Diff line change
Expand Up @@ -923,6 +923,18 @@ class DatasetSuite extends QueryTest with SharedSQLContext {
.groupByKey(_.a).flatMapGroups { case (x, iter) => List[Int]() })
}

test("SPARK-18189: Fix serialization issue in KeyValueGroupedDataset") {
val resultValue = 12345
val keyValueGrouped = Seq((1, 2), (3, 4)).toDS().groupByKey(_._1)
val mapGroups = keyValueGrouped.mapGroups((k, v) => (k, 1))
val broadcasted = spark.sparkContext.broadcast(resultValue)

// Using broadcast triggers serialization issue in KeyValueGroupedDataset
val dataset = mapGroups.map(_ => broadcasted.value)

assert(dataset.collect() sameElements Array(resultValue, resultValue))
}

Seq(true, false).foreach { eager =>
def testCheckpointing(testName: String)(f: => Unit): Unit = {
test(s"Dataset.checkpoint() - $testName (eager = $eager)") {
Expand Down

0 comments on commit 0f7c9e8

Please sign in to comment.