Skip to content

Commit

Permalink
Improve the unit test according to comments
Browse files Browse the repository at this point in the history
  • Loading branch information
jerryshao committed Sep 28, 2014
1 parent 01911e6 commit 6f3c302
Showing 1 changed file with 30 additions and 36 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -712,60 +712,54 @@ class ExternalSorterSuite extends FunSuite with LocalSparkContext with PrivateMe

// Checks two things about ExternalSorter: that a comparator whose subtraction can overflow
// is reported as an IllegalArgumentException, and that an aggregating sorter created
// without an explicit ordering yields keys in non-decreasing hashCode order.
//
// NOTE(review): this text looks like a rendered diff whose +/- markers were lost, so what
// are presumably removed and added lines sit side by side (two memoryFraction settings, two
// `rand` / `wrongOrdering` / `testData` / `sorter1` definitions, ...). It will not compile
// as shown; confirm against the real file which variant of each pair is current.
test("sort without breaking sorting contracts") {
val conf = createSparkConf(true)
// The same key is set twice with different values; presumably one line replaces the other.
conf.set("spark.shuffle.memoryFraction", "0.001")
conf.set("spark.shuffle.memoryFraction", "0.01")
conf.set("spark.shuffle.manager", "sort")
sc = new SparkContext("local-cluster[1,1,512]", "test", conf)

// Use wrongHashOrdering to show the exception caused by integer overflow.
val rand = new Random
val wrongOrdering = new Ordering[Int] {
// `a - b` wraps around when the operands are far apart (e.g. Int.MaxValue - Int.MinValue),
// so the sign of the result can be wrong.
override def compare(a: Int, b: Int) = a - b
// Use wrongOrdering to show the exception caused by integer overflow.
// Seeded generator, so the produced sequence is reproducible between runs.
val rand = new Random(100L)
val wrongOrdering = new Ordering[String] {
// Compares strings by hashCode (null counts as 0); the subtraction can overflow Int.
override def compare(a: String, b: String) = {
val h1 = if (a == null) 0 else a.hashCode()
val h2 = if (b == null) 0 else b.hashCode()
h1 - h2
}
}

// Iterator variant of the input: 1000000 random ints followed by explicit extreme values.
val testIntData = new Iterator[Int] {
private var count = 0

def hasNext = count < 1000000
// Array variant of the input: 100000 random ints rendered as strings.
val testData = Array.tabulate(100000) { _ => rand.nextInt().toString }

def next(): Int = {
count += 1; rand.nextInt()
}
} ++ Iterator[Int](Int.MaxValue, Int.MinValue, Int.MaxValue, Int.MinValue)

// sorter1 receives only the overflowing ordering (third argument); the rest are None.
val sorter1 = new ExternalSorter[Int, Int, Int](
val sorter1 = new ExternalSorter[String, String, String](
None, None, Some(wrongOrdering), None)
// Inserting the data and requesting the iterator is expected to throw.
val thrown = intercept[IllegalArgumentException] {
sorter1.insertAll(testIntData.map(i => (i, i)))
sorter1.insertAll(testData.iterator.map(i => (i, i)))
sorter1.iterator
}

assert(thrown.getClass() === classOf[IllegalArgumentException])
assert(thrown.getMessage().contains("Comparison method violates its general contract"))
sorter1.stop()

// Use aggregation and external spilling to make sure ExternalSorter uses
// partitionKeyComparator.
// The trailing numbers are the hash codes noted by the original author for each key.
val testData = Array[String](
"hierarch", // -1732884796
"variants", // -1249574770
"inwork", // -1183663690
"isohel", // -1179291542
"misused") // 1069518484

// Combiner functions over Int values: collect every value for a key into an ArrayBuffer.
def createCombiner(i: Int) = ArrayBuffer(i)
def mergeValue(c: ArrayBuffer[Int], i: Int) = c += i
def mergeCombiners(c1: ArrayBuffer[Int], c2: ArrayBuffer[Int]) = c1 ++= c2

val agg = new Aggregator[String, Int, ArrayBuffer[Int]](
createCombiner, mergeValue, mergeCombiners)
// Combiner functions over String values: same shape as the Int variant.
def createCombiner(i: String) = ArrayBuffer(i)
def mergeValue(c: ArrayBuffer[String], i: String) = c += i
def mergeCombiners(c1: ArrayBuffer[String], c2: ArrayBuffer[String]) = c1 ++= c2

// Each key is paired with the count 25000.
val expected = testData.map(i => (i, 25000))
val agg = new Aggregator[String, String, ArrayBuffer[String]](
createCombiner, mergeValue, mergeCombiners)

// sorter2 receives only the aggregator (first argument); no ordering is passed.
val sorter2 = new ExternalSorter[String, Int, ArrayBuffer[Int]](
val sorter2 = new ExternalSorter[String, String, ArrayBuffer[String]](
Some(agg), None, None, None)
// Expands every (key, count) pair into `count` records of (key, 1).
sorter2.insertAll(expected.flatMap { case (k, v) =>
(0 until v).map(_ => (k, 1))
}.iterator)
sorter2.insertAll(testData.iterator.map(i => (i, i)))

// Validate the hash ordering of the keys: each key's hashCode must be greater than or
// equal to the previous one, starting from Int.MinValue.
var minKey = Int.MinValue
sorter2.iterator.foreach { case (k, v) =>
val h = k.hashCode()
assert(h >= minKey)
minKey = h
}

// Sums the aggregated values per key and compares the result with `expected`.
val expectedResults = sorter2.iterator.map(kv => (kv._1, kv._2.sum)).toArray
assert(expectedResults === expected)
sorter2.stop()
}
}

0 comments on commit 6f3c302

Please sign in to comment.