From 55a26e937430ff3cfd2642a596edaa4f9277beb8 Mon Sep 17 00:00:00 2001 From: mccheah Date: Tue, 4 Feb 2020 11:25:14 -0800 Subject: [PATCH] [SPARK-28699][SQL] Disable using radix sort for ShuffleExchangeExec in repartition case (#640) ## What changes were proposed in this pull request? Disable using radix sort in ShuffleExchangeExec when we do repartition. In #20393, we fixed the indeterministic result in the shuffle repartition case by performing a local sort before repartitioning. But for the newly added sort operation, we use radix sort which is wrong because binary data can't be compared by only the prefix. This makes the sort unstable and fails to solve the indeterminate shuffle output problem. ### Why are the changes needed? Fix the correctness bug caused by repartition after a shuffle. ### Does this PR introduce any user-facing change? Yes, user will get the right result in the case of repartition stage rerun. ## How was this patch tested? Test with `local-cluster[5, 2, 5120]`, use the integrated test below, it can return a right answer 100000000. ``` import scala.sys.process._ import org.apache.spark.TaskContext val res = spark.range(0, 10000 * 10000, 1).map{ x => (x % 1000, x)} // kill an executor in the stage that performs repartition(239) val df = res.repartition(113).map{ x => (x._1 + 1, x._2)}.repartition(239).map { x => if (TaskContext.get.attemptNumber == 0 && TaskContext.get.partitionId < 1 && TaskContext.get.stageAttemptNumber == 0) { throw new Exception("pkill -f -n java".!!) } x } val r2 = df.distinct.count() ``` Closes #25491 from xuanyuanking/SPARK-28699-fix. Authored-by: Yuanjian Li Signed-off-by: Dongjoon Hyun Co-authored-by: Li Yuanjian --- .../spark/sql/execution/exchange/ShuffleExchangeExec.scala | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/ShuffleExchangeExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/ShuffleExchangeExec.scala index fec05a76b4516..2f4c5734469f8 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/ShuffleExchangeExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/ShuffleExchangeExec.scala @@ -242,7 +242,7 @@ object ShuffleExchangeExec { } // The comparator for comparing row hashcode, which should always be Integer. val prefixComparator = PrefixComparators.LONG - val canUseRadixSort = SQLConf.get.enableRadixSort + // The prefix computer generates row hashcode as the prefix, so we may decrease the // probability that the prefixes are equal when input rows choose column values from a // limited range. @@ -264,7 +264,9 @@ object ShuffleExchangeExec { prefixComparator, prefixComputer, pageSize, - canUseRadixSort) + // We are comparing binary here, which does not support radix sort. + // See more details in SPARK-28699. + false) sorter.sort(iter.asInstanceOf[Iterator[UnsafeRow]]) } } else {