From 256814b5674bc187896614abfb8611cba34445fb Mon Sep 17 00:00:00 2001 From: gatorsmile Date: Fri, 18 Mar 2016 08:54:38 -0700 Subject: [PATCH] address comments. --- .../sql/catalyst/analysis/Analyzer.scala | 17 ++++++++--------- .../sql/catalyst/expressions/SortOrder.scala | 9 +++------ .../sql/catalyst/planning/patterns.scala | 1 + .../sql/catalyst/analysis/AnalysisSuite.scala | 10 ++++++++++ .../apache/spark/sql/internal/SQLConf.scala | 4 ++-- .../org/apache/spark/sql/DataFrameSuite.scala | 8 ++++++++ .../org/apache/spark/sql/SQLQuerySuite.scala | 19 +++++++++---------- 7 files changed, 41 insertions(+), 27 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala index 677db0c26be1f..1df15b459ac62 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala @@ -623,27 +623,26 @@ class Analyzer( * projection, so that they will be available during sorting. Another projection is added to * remove these attributes after sorting. * - * This rule also resolves the position number in Sort references. This support is introduced + * This rule also resolves the position number in sort references. This support is introduced * in Spark 2.0. Before Spark 2.0, the integers in Order By has no effect on output sorting. + * - When the sort references are not integer but foldable expressions, ignore them. + * - When spark.sql.orderByOrdinal is set to false, ignore the position numbers too. */ object ResolveSortReferences extends Rule[LogicalPlan] { def apply(plan: LogicalPlan): LogicalPlan = plan resolveOperators { // Replace the index with the related attribute for ORDER BY // which is a 1-base position of the projection list. case s @ Sort(orders, global, child) - if child.resolved && orders.exists(o => IntegerIndex.unapply(o.child).nonEmpty) => + if conf.orderByOrdinal && child.resolved && + orders.exists(o => IntegerIndex.unapply(o.child).nonEmpty) => val newOrders = orders map { case s @ SortOrder(IntegerIndex(index), direction) => - if (index > 0 && index <= child.output.size && conf.orderByOrdinal) { + if (index > 0 && index <= child.output.size) { SortOrder(child.output(index - 1), direction) - } else if (!conf.orderByOrdinal) { - failAnalysis( - "Integer in the Order/Sort By clause is not allowed " + - "when spark.sql.orderByOrdinal is set to false") } else { throw new UnresolvedException(s, - s"""Order/Sort By position: $index does not exist \n - |The Select List is indexed from 1 to ${child.output.size}""".stripMargin) + "Order/sort By position: $index does not exist " + + "The Select List is indexed from 1 to ${child.output.size}") } case o => o } diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/SortOrder.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/SortOrder.scala index f98348647c508..b739361937b6b 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/SortOrder.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/SortOrder.scala @@ -20,7 +20,6 @@ package org.apache.spark.sql.catalyst.expressions import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.analysis.TypeCheckResult import org.apache.spark.sql.catalyst.expressions.codegen.{CodegenContext, ExprCode} -import org.apache.spark.sql.catalyst.planning.IntegerIndex import org.apache.spark.sql.types._ import org.apache.spark.util.collection.unsafe.sort.PrefixComparators.BinaryPrefixComparator import org.apache.spark.util.collection.unsafe.sort.PrefixComparators.DoublePrefixComparator @@ -48,12 +47,10 @@ case class SortOrder(child: Expression, direction: SortDirection) override def foldable: Boolean = false override def checkInputDataTypes(): TypeCheckResult = { - if (!RowOrdering.isOrderable(dataType)) { - TypeCheckResult.TypeCheckFailure(s"cannot sort data type ${dataType.simpleString}") - } else if (child.references.isEmpty && IntegerIndex.unapply(child).isEmpty) { - TypeCheckResult.TypeCheckFailure(s"sort position must be integer literals") - } else { + if (RowOrdering.isOrderable(dataType)) { TypeCheckResult.TypeCheckSuccess + } else { + TypeCheckResult.TypeCheckFailure(s"cannot sort data type ${dataType.simpleString}") } } diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/planning/patterns.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/planning/patterns.scala index aec25beb3e7fb..ef74504c661ae 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/planning/patterns.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/planning/patterns.scala @@ -210,6 +210,7 @@ object Unions { object IntegerIndex { def unapply(a: Any): Option[Int] = a match { case Literal(a: Int, IntegerType) => Some(a) + // When resolving ordinal in Sort, negative values are extracted for issuing error messages. case UnaryMinus(IntegerLiteral(v)) => Some(-v) case _ => None } diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisSuite.scala index b16927bd7525e..8b568b6dd6acd 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisSuite.scala @@ -201,6 +201,16 @@ class AnalysisSuite extends AnalysisTest { checkAnalysis(plan, expected) } + test("pull out nondeterministic expressions from Sort") { + val plan = Sort(Seq(SortOrder(Rand(33), Ascending)), false, testRelation) + val projected = Alias(Rand(33), "_nondeterministic")() + val expected = + Project(testRelation.output, + Sort(Seq(SortOrder(projected.toAttribute, Ascending)), false, + Project(testRelation.output :+ projected, testRelation))) + checkAnalysis(plan, expected) + } + test("SPARK-9634: cleanup unnecessary Aliases in LogicalPlan") { val a = testRelation.output.head var plan = testRelation.select(((a + 1).as("a+1") + 2).as("col")) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala index afa9466ecab50..037ceea2faea3 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala @@ -437,8 +437,8 @@ object SQLConf { val ORDER_BY_ORDINAL = booleanConf("spark.sql.orderByOrdinal", defaultValue = Some(true), - doc = "When true, the ordinal number is treated as the position in the select list. " + - "When false, the ordinal number in Order/Sort By clause is not allowed.") + doc = "When true, the ordinal numbers are treated as the position in the select list. " + + "When false, the ordinal numbers in order/sort By clause are ignored.") // The output committer class used by HadoopFsRelation. The specified class needs to be a // subclass of org.apache.hadoop.mapreduce.OutputCommitter. diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala index 8c24c92b901c0..199e138abfdc2 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala @@ -952,6 +952,14 @@ class DataFrameSuite extends QueryTest with SharedSQLContext { assert(df1.showString(5) == df1.showString(5)) } + test("SPARK-8609: local DataFrame with random columns should return same value after sort") { + checkAnswer(testData.sort(rand(33)), testData.sort(rand(33))) + + // We will reuse the same Expression object for LocalRelation. + val df = (1 to 10).map(Tuple1.apply).toDF() + checkAnswer(df.sort(rand(33)), df.sort(rand(33))) + } + test("SPARK-9083: sort with non-deterministic expressions") { import org.apache.spark.util.random.XORShiftRandom diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala index d39e9661ea0c0..8bc127438f02c 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala @@ -2162,6 +2162,11 @@ class SQLQuerySuite extends QueryTest with SharedSQLContext { checkAnswer( sql("SELECT * FROM testData2 ORDER BY 1 DESC"), Seq(Row(3, 1), Row(3, 2), Row(2, 1), Row(2, 2), Row(1, 1), Row(1, 2))) + // If the position is not an integer, ignore it. + checkAnswer( + sql("SELECT * FROM testData2 ORDER BY 1 + 0 DESC, b ASC"), + Seq(Row(1, 1), Row(2, 1), Row(3, 1), Row(1, 2), Row(2, 2), Row(3, 2))) + checkAnswer( sql("SELECT * FROM testData2 ORDER BY 1 DESC, b ASC"), sql("SELECT * FROM testData2 ORDER BY a DESC, b ASC")) @@ -2174,11 +2179,6 @@ class SQLQuerySuite extends QueryTest with SharedSQLContext { } test("order by ordinal number - negative cases") { - val e = intercept[AnalysisException]( - sql("SELECT * FROM testData2 ORDER BY 1 + 1 DESC, b ASC").collect()) - assert(e.getMessage contains - "cannot resolve '(1 + 1) DESC' due to data type mismatch: " + - "sort position must be integer literals") intercept[UnresolvedException[SortOrder]] { sql("SELECT * FROM testData2 ORDER BY 0").collect() } @@ -2192,11 +2192,10 @@ class SQLQuerySuite extends QueryTest with SharedSQLContext { test("order by ordinal number with conf spark.sql.orderByOrdinal=false") { withSQLConf(SQLConf.ORDER_BY_ORDINAL.key -> "false") { - val e = intercept[AnalysisException]( - sql("SELECT * FROM testData2 ORDER BY 1 DESC, b ASC").collect()) - assert(e.getMessage contains - "Integer in the Order/Sort By clause is not allowed " + - "when spark.sql.orderByOrdinal is set to false") + // If spark.sql.orderByOrdinal=false, ignore the position number. + checkAnswer( + sql("SELECT * FROM testData2 ORDER BY 1 DESC, b ASC"), + Seq(Row(1, 1), Row(2, 1), Row(3, 1), Row(1, 2), Row(2, 2), Row(3, 2))) } }