From 3253ae7f722a996cf0af21608e1a27d5d2a12004 Mon Sep 17 00:00:00 2001 From: wangzhenhua Date: Wed, 2 Nov 2016 11:49:30 -0700 Subject: [PATCH] [SPARK-18111][SQL] Wrong approximate quantile answer when multiple records have the minimum value (for branch 2.0) ## What changes were proposed in this pull request? When multiple records have the minimum value, the answer of `StatFunctions.multipleApproxQuantiles` is wrong. Previously the minimum element of `currentSamples` was prepended to the result only when its value was strictly less than that of `head`, so it was not added when the two values were equal; the check now uses `<=` and additionally requires `currentSamples` to contain more than one element, so the same element is not added twice. ## How was this patch tested? Added a test case to `DataFrameStatSuite`. Author: wangzhenhua Closes #15732 from wzhfy/percentile2. --- .../spark/sql/execution/stat/StatFunctions.scala | 4 +++- .../org/apache/spark/sql/DataFrameStatSuite.scala | 13 +++++++++++++ 2 files changed, 16 insertions(+), 1 deletion(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/stat/StatFunctions.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/stat/StatFunctions.scala index 7e2ebe856bc89..acc42a00eb267 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/stat/StatFunctions.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/stat/StatFunctions.scala @@ -337,7 +337,9 @@ object StatFunctions extends Logging { res.prepend(head) // If necessary, add the minimum element: val currHead = currentSamples.head - if (currHead.value < head.value) { + // don't add the minimum element if `currentSamples` has only one element (both `currHead` and + // `head` point to the same element) + if (currHead.value <= head.value && currentSamples.length > 1) { res.prepend(currentSamples.head) } res.toArray diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameStatSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameStatSuite.scala index 73026c749db45..571e2addfa1c3 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameStatSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameStatSuite.scala @@ -152,6 +152,19 @@ class DataFrameStatSuite extends QueryTest with SharedSQLContext { } } + test("approximate 
quantile, multiple records with the minimum value in a partition") { + val data = Seq(1, 1, 2, 1, 1, 3, 1, 1, 4, 1, 1, 5) + val df = spark.sparkContext.makeRDD(data, 4).toDF("col") + val epsilons = List(0.1, 0.05, 0.001) + val quantile = 0.5 + val expected = 1 + for (epsilon <- epsilons) { + val Array(answer) = df.stat.approxQuantile("col", Array(quantile), epsilon) + val error = 2 * data.length * epsilon + assert(math.abs(answer - expected) < error) + } + } + test("crosstab") { val rng = new Random() val data = Seq.tabulate(25)(i => (rng.nextInt(5), rng.nextInt(10)))