diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/stat/StatFunctions.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/stat/StatFunctions.scala index 7e2ebe856bc89..acc42a00eb267 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/stat/StatFunctions.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/stat/StatFunctions.scala @@ -337,7 +337,9 @@ object StatFunctions extends Logging { res.prepend(head) // If necessary, add the minimum element: val currHead = currentSamples.head - if (currHead.value < head.value) { + // don't add the minimum element if `currentSamples` has only one element (both `currHead` and + // `head` point to the same element) + if (currHead.value <= head.value && currentSamples.length > 1) { res.prepend(currentSamples.head) } res.toArray diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameStatSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameStatSuite.scala index 73026c749db45..571e2addfa1c3 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameStatSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameStatSuite.scala @@ -152,6 +152,19 @@ class DataFrameStatSuite extends QueryTest with SharedSQLContext { } } + test("approximate quantile, multiple records with the minimum value in a partition") { + val data = Seq(1, 1, 2, 1, 1, 3, 1, 1, 4, 1, 1, 5) + val df = spark.sparkContext.makeRDD(data, 4).toDF("col") + val epsilons = List(0.1, 0.05, 0.001) + val quantile = 0.5 + val expected = 1 + for (epsilon <- epsilons) { + val Array(answer) = df.stat.approxQuantile("col", Array(quantile), epsilon) + val error = 2 * data.length * epsilon + assert(math.abs(answer - expected) < error) + } + } + test("crosstab") { val rng = new Random() val data = Seq.tabulate(25)(i => (rng.nextInt(5), rng.nextInt(10)))