Skip to content

Commit

Permalink
Fix sorting exception caused by integer overflow in key comparison
Browse files Browse the repository at this point in the history
  • Loading branch information
jerryshao committed Sep 24, 2014
1 parent f9d6220 commit fa2a08f
Show file tree
Hide file tree
Showing 2 changed files with 50 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -152,7 +152,7 @@ private[spark] class ExternalSorter[K, V, C](
/**
 * Orders two keys by their hash codes, treating a null key as hash code 0.
 *
 * @param a first key (may be null)
 * @param b second key (may be null)
 * @return a negative value, zero, or a positive value when the hash code of `a` is
 *         respectively less than, equal to, or greater than the hash code of `b`
 */
override def compare(a: K, b: K): Int = {
  val h1 = if (a == null) 0 else a.hashCode()
  val h2 = if (b == null) 0 else b.hashCode()
  // Do not return `h1 - h2`: the difference of two Ints can overflow and flip sign
  // (e.g. a large negative hash minus a large positive hash wraps to a positive Int),
  // which yields an inconsistent ordering. Integer.compare has no such overflow.
  Integer.compare(h1, h2)
}
})

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -707,4 +707,53 @@ class ExternalSorterSuite extends FunSuite with LocalSparkContext with PrivateMe
Some(agg), Some(new HashPartitioner(FEW_PARTITIONS)), None, None)
assertDidNotBypassMergeSort(sorter4)
}

test("sort without breaking sorting contracts") {
  // Tiny shuffle memory fraction plus the sort-based shuffle manager, as configured by
  // the original test (its stated intent is to force external spilling).
  val conf = createSparkConf(true)
    .set("spark.shuffle.memoryFraction", "0.001")
    .set("spark.shuffle.manager", "sort")
  sc = new SparkContext("local-cluster[1,1,512]", "test", conf)

  // Keys annotated with their String.hashCode values; the annotated hashes are in
  // ascending order, and the first and last differ by more than Int.MaxValue.
  val keys = Array[String](
    "hierarch", // -1732884796
    "variants", // -1249574770
    "inwork", // -1183663690
    "isohel", // -1179291542
    "misused" // 1069518484
  )
  val expected = keys.map(key => key -> 200000)

  // Combiner functions that simply collect every value for a key into a buffer.
  def createCombiner(i: Int): ArrayBuffer[Int] = {
    ArrayBuffer(i)
  }
  def mergeValue(c: ArrayBuffer[Int], i: Int): ArrayBuffer[Int] = {
    c += i
  }
  def mergeCombiners(c1: ArrayBuffer[Int], c2: ArrayBuffer[Int]): ArrayBuffer[Int] = {
    c1 ++= c2
  }

  val agg = new Aggregator[String, Int, ArrayBuffer[Int]](
    createCombiner, mergeValue, mergeCombiners)

  // Part 1: an ordering that subtracts hash codes. The subtraction can overflow, so
  // sorting with it is expected to give a result that differs from `expected`.
  val wrongHashOrdering = new Ordering[String] {
    override def compare(a: String, b: String): Int = a.hashCode() - b.hashCode()
  }
  val overflowingSorter = new ExternalSorter[String, Int, ArrayBuffer[Int]](
    None, None, Some(wrongHashOrdering), None)
  overflowingSorter.insertAll(expected.iterator)

  val missorted = overflowingSorter.iterator.toArray
  assert(missorted !== expected)

  // Part 2: use aggregation and external spill so that ExternalSorter goes through
  // its partitionKeyComparator. Each key is inserted `count` times with value 1.
  val aggregatingSorter = new ExternalSorter[String, Int, ArrayBuffer[Int]](
    Some(agg), None, None, None)
  val unitRecords = expected.flatMap { case (key, count) =>
    Seq.fill(count)(key -> 1)
  }
  aggregatingSorter.insertAll(unitRecords.iterator)

  // Summing each key's collected values must reproduce the original (key, count) pairs
  // in the original order.
  val aggregated = aggregatingSorter.iterator.map { kv =>
    kv._1 -> kv._2.sum
  }.toArray
  assert(aggregated === expected)
}
}

0 comments on commit fa2a08f

Please sign in to comment.