From aee1023abd8e7d0e0d0cdc3c21c4d500522926c2 Mon Sep 17 00:00:00 2001 From: gatorsmile Date: Thu, 17 Mar 2016 23:15:54 -0700 Subject: [PATCH 1/7] Order by ordinal number. --- .../spark/sql/catalyst/CatalystConf.scala | 10 ++++- .../sql/catalyst/analysis/Analyzer.scala | 31 ++++++++++++- .../sql/catalyst/expressions/SortOrder.scala | 9 ++-- .../sql/catalyst/planning/patterns.scala | 12 +++++ .../sql/catalyst/analysis/AnalysisSuite.scala | 10 ----- .../sql/catalyst/analysis/AnalysisTest.scala | 4 +- .../analysis/DecimalPrecisionSuite.scala | 2 +- .../BooleanSimplificationSuite.scala | 5 ++- .../apache/spark/sql/internal/SQLConf.scala | 7 +++ .../org/apache/spark/sql/DataFrameSuite.scala | 8 ---- .../org/apache/spark/sql/SQLQuerySuite.scala | 44 +++++++++++++++++++ 11 files changed, 115 insertions(+), 27 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/CatalystConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/CatalystConf.scala index 35884139b6be8..e10ab9790d767 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/CatalystConf.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/CatalystConf.scala @@ -22,6 +22,8 @@ import org.apache.spark.sql.catalyst.analysis._ private[spark] trait CatalystConf { def caseSensitiveAnalysis: Boolean + def orderByOrdinal: Boolean + /** * Returns the [[Resolver]] for the current configuration, which can be used to determin if two * identifiers are equal. @@ -43,8 +45,14 @@ object EmptyConf extends CatalystConf { override def caseSensitiveAnalysis: Boolean = { throw new UnsupportedOperationException } + override def orderByOrdinal: Boolean = { + throw new UnsupportedOperationException + } } /** A CatalystConf that can be used for local testing. */ -case class SimpleCatalystConf(caseSensitiveAnalysis: Boolean) extends CatalystConf { +case class SimpleCatalystConf( + caseSensitiveAnalysis: Boolean, + orderByOrdinal: Boolean = true) + extends CatalystConf { } diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala index 53ea3cfef6786..6594b69d43698 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala @@ -27,6 +27,7 @@ import org.apache.spark.sql.catalyst.{CatalystConf, ScalaReflection, SimpleCatal import org.apache.spark.sql.catalyst.encoders.OuterScopes import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.catalyst.expressions.aggregate._ +import org.apache.spark.sql.catalyst.planning.IntegerIndex import org.apache.spark.sql.catalyst.plans._ import org.apache.spark.sql.catalyst.plans.logical._ import org.apache.spark.sql.catalyst.rules._ @@ -40,7 +41,10 @@ import org.apache.spark.sql.types._ * references. */ object SimpleAnalyzer - extends Analyzer(EmptyCatalog, EmptyFunctionRegistry, new SimpleCatalystConf(true)) + extends Analyzer( + EmptyCatalog, + EmptyFunctionRegistry, + new SimpleCatalystConf(caseSensitiveAnalysis = true)) /** * Provides a logical query plan analyzer, which translates [[UnresolvedAttribute]]s and @@ -618,9 +622,34 @@ class Analyzer( * clause. This rule detects such queries and adds the required attributes to the original * projection, so that they will be available during sorting. Another projection is added to * remove these attributes after sorting. + * + * This rule also resolves the position number in Sort references. This support is introduced + * in Spark 2.0. Before Spark 2.0, the integers in Order By has no effect on output sorting. */ object ResolveSortReferences extends Rule[LogicalPlan] { def apply(plan: LogicalPlan): LogicalPlan = plan resolveOperators { + // Replace the index with the related attribute for ORDER BY + // which is a 1-base position of the projection list. + case s @ Sort(orders, global, child) + if child.resolved && orders.exists(o => IntegerIndex.unapply(o.child).nonEmpty) => + val newOrders = orders map { + case s @ SortOrder(IntegerIndex(index), direction) => + if (index > 0 && index <= child.output.size && conf.orderByOrdinal) { + SortOrder(child.output(index - 1), direction) + } else if (!conf.orderByOrdinal) { + failAnalysis( + "Integer in the Order/Sort By clause is not allowed " + + "when spark.sql.orderByOrdinal is set to false") + } + else { + throw new UnresolvedException(s, + s"""Order/Sort By position: $index does not exist \n + |The Select List is indexed from 1 to ${child.output.size}""".stripMargin) + } + case o => o + } + Sort(newOrders, global, child) + // Skip sort with aggregate. This will be handled in ResolveAggregateFunctions case sa @ Sort(_, _, child: Aggregate) => sa diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/SortOrder.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/SortOrder.scala index b739361937b6b..f98348647c508 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/SortOrder.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/SortOrder.scala @@ -20,6 +20,7 @@ package org.apache.spark.sql.catalyst.expressions import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.analysis.TypeCheckResult import org.apache.spark.sql.catalyst.expressions.codegen.{CodegenContext, ExprCode} +import org.apache.spark.sql.catalyst.planning.IntegerIndex import org.apache.spark.sql.types._ import org.apache.spark.util.collection.unsafe.sort.PrefixComparators.BinaryPrefixComparator import org.apache.spark.util.collection.unsafe.sort.PrefixComparators.DoublePrefixComparator @@ -47,10 +48,12 @@ case class SortOrder(child: Expression, direction: SortDirection) override def foldable: Boolean = false override def checkInputDataTypes(): TypeCheckResult = { - if (RowOrdering.isOrderable(dataType)) { - TypeCheckResult.TypeCheckSuccess - } else { + if (!RowOrdering.isOrderable(dataType)) { TypeCheckResult.TypeCheckFailure(s"cannot sort data type ${dataType.simpleString}") + } else if (child.references.isEmpty && IntegerIndex.unapply(child).isEmpty) { + TypeCheckResult.TypeCheckFailure(s"sort position must be integer literals") + } else { + TypeCheckResult.TypeCheckSuccess } } diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/planning/patterns.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/planning/patterns.scala index 62d54df98ecc5..aec25beb3e7fb 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/planning/patterns.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/planning/patterns.scala @@ -24,6 +24,7 @@ import org.apache.spark.internal.Logging import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.catalyst.plans._ import org.apache.spark.sql.catalyst.plans.logical._ +import org.apache.spark.sql.types.IntegerType /** * A pattern that matches any number of project or filter operations on top of another relational @@ -202,3 +203,14 @@ object Unions { } } } + +/** + * Extractor for retrieving Int value. + */ +object IntegerIndex { + def unapply(a: Any): Option[Int] = a match { + case Literal(a: Int, IntegerType) => Some(a) + case UnaryMinus(IntegerLiteral(v)) => Some(-v) + case _ => None + } +} diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisSuite.scala index 8b568b6dd6acd..b16927bd7525e 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisSuite.scala @@ -201,16 +201,6 @@ class AnalysisSuite extends AnalysisTest { checkAnalysis(plan, expected) } - test("pull out nondeterministic expressions from Sort") { - val plan = Sort(Seq(SortOrder(Rand(33), Ascending)), false, testRelation) - val projected = Alias(Rand(33), "_nondeterministic")() - val expected = - Project(testRelation.output, - Sort(Seq(SortOrder(projected.toAttribute, Ascending)), false, - Project(testRelation.output :+ projected, testRelation))) - checkAnalysis(plan, expected) - } - test("SPARK-9634: cleanup unnecessary Aliases in LogicalPlan") { val a = testRelation.output.head var plan = testRelation.select(((a + 1).as("a+1") + 2).as("col")) diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisTest.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisTest.scala index ef825e606202f..39166c4f8ef73 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisTest.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisTest.scala @@ -25,8 +25,8 @@ import org.apache.spark.sql.catalyst.plans.logical._ trait AnalysisTest extends PlanTest { val (caseSensitiveAnalyzer, caseInsensitiveAnalyzer) = { - val caseSensitiveConf = new SimpleCatalystConf(true) - val caseInsensitiveConf = new SimpleCatalystConf(false) + val caseSensitiveConf = new SimpleCatalystConf(caseSensitiveAnalysis = true) + val caseInsensitiveConf = new SimpleCatalystConf(caseSensitiveAnalysis = false) val caseSensitiveCatalog = new SimpleCatalog(caseSensitiveConf) val caseInsensitiveCatalog = new SimpleCatalog(caseInsensitiveConf) diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/DecimalPrecisionSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/DecimalPrecisionSuite.scala index b2613e4909288..9aa685e1e8f55 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/DecimalPrecisionSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/DecimalPrecisionSuite.scala @@ -30,7 +30,7 @@ import org.apache.spark.sql.types._ class DecimalPrecisionSuite extends PlanTest with BeforeAndAfter { - val conf = new SimpleCatalystConf(true) + val conf = new SimpleCatalystConf(caseSensitiveAnalysis = true) val catalog = new SimpleCatalog(conf) val analyzer = new Analyzer(catalog, EmptyFunctionRegistry, conf) diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/BooleanSimplificationSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/BooleanSimplificationSuite.scala index da43751b0a310..47b79fe462457 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/BooleanSimplificationSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/BooleanSimplificationSuite.scala @@ -110,7 +110,10 @@ class BooleanSimplificationSuite extends PlanTest with PredicateHelper { } private val caseInsensitiveAnalyzer = - new Analyzer(EmptyCatalog, EmptyFunctionRegistry, new SimpleCatalystConf(false)) + new Analyzer( + EmptyCatalog, + EmptyFunctionRegistry, + new SimpleCatalystConf(caseSensitiveAnalysis = false)) test("(a && b) || (a && c) => a && (b || c) when case insensitive") { val plan = caseInsensitiveAnalyzer.execute( diff --git a/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala index 9aabe2d0abe1c..afa9466ecab50 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala @@ -435,6 +435,11 @@ object SQLConf { defaultValue = Some(true), doc = "When false, we will treat bucketed table as normal table") + val ORDER_BY_ORDINAL = booleanConf("spark.sql.orderByOrdinal", + defaultValue = Some(true), + doc = "When true, the ordinal number is treated as the position in the select list. " + + "When false, the ordinal number in Order/Sort By clause is not allowed.") + // The output committer class used by HadoopFsRelation. The specified class needs to be a // subclass of org.apache.hadoop.mapreduce.OutputCommitter. // @@ -631,6 +636,8 @@ class SQLConf extends Serializable with CatalystConf with ParserConf with Loggin def supportSQL11ReservedKeywords: Boolean = getConf(PARSER_SUPPORT_SQL11_RESERVED_KEYWORDS) + override def orderByOrdinal: Boolean = getConf(ORDER_BY_ORDINAL) + /** ********************** SQLConf functionality methods ************ */ /** Set Spark SQL configuration properties. */ diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala index 199e138abfdc2..8c24c92b901c0 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala @@ -952,14 +952,6 @@ class DataFrameSuite extends QueryTest with SharedSQLContext { assert(df1.showString(5) == df1.showString(5)) } - test("SPARK-8609: local DataFrame with random columns should return same value after sort") { - checkAnswer(testData.sort(rand(33)), testData.sort(rand(33))) - - // We will reuse the same Expression object for LocalRelation. - val df = (1 to 10).map(Tuple1.apply).toDF() - checkAnswer(df.sort(rand(33)), df.sort(rand(33))) - } - test("SPARK-9083: sort with non-deterministic expressions") { import org.apache.spark.util.random.XORShiftRandom diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala index 3efe984c09eb8..3baee8dfaa7ee 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala @@ -21,6 +21,8 @@ import java.math.MathContext import java.sql.Timestamp import org.apache.spark.AccumulatorSuite +import org.apache.spark.sql.catalyst.analysis.UnresolvedException +import org.apache.spark.sql.catalyst.expressions.SortOrder import org.apache.spark.sql.execution.aggregate import org.apache.spark.sql.execution.joins.{BroadcastHashJoin, CartesianProduct, SortMergeJoin} import org.apache.spark.sql.functions._ @@ -2156,6 +2158,48 @@ class SQLQuerySuite extends QueryTest with SharedSQLContext { } } + test("order by ordinal number") { + checkAnswer( + sql("SELECT * FROM testData2 ORDER BY 1 DESC"), + Seq(Row(3, 1), Row(3, 2), Row(2, 1), Row(2, 2), Row(1, 1), Row(1, 2))) + checkAnswer( + sql("SELECT * FROM testData2 ORDER BY 1 DESC, b ASC"), + sql("SELECT * FROM testData2 ORDER BY a DESC, b ASC")) + checkAnswer( + sql("SELECT * FROM testData2 SORT BY 1 DESC, 2 ASC"), + sql("SELECT * FROM testData2 SORT BY a DESC, b ASC")) + checkAnswer( + sql("SELECT * FROM testData2 ORDER BY 1 ASC, b ASC"), + Seq(Row(1, 1), Row(1, 2), Row(2, 1), Row(2, 2), Row(3, 1), Row(3, 2))) + } + + test("order by ordinal number - negative cases") { + val e = intercept[AnalysisException]( + sql("SELECT * FROM testData2 ORDER BY 1 + 1 DESC, b ASC").collect()) + assert(e.getMessage contains + "cannot resolve '(1 + 1) DESC' due to data type mismatch: " + + "sort position must be integer literals") + intercept[UnresolvedException[SortOrder]] { + sql("SELECT * FROM testData2 ORDER BY 0").collect() + } + intercept[UnresolvedException[SortOrder]] { + sql("SELECT * FROM testData2 ORDER BY -1 DESC, b ASC").collect() + } + intercept[UnresolvedException[SortOrder]] { + sql("SELECT * FROM testData2 ORDER BY 3 DESC, b ASC").collect() + } + } + + test("order by ordinal number with conf spark.sql.orderByOrdinal=false") { + withSQLConf(SQLConf.ORDER_BY_ORDINAL.key -> "false") { + val e = intercept[AnalysisException]( + sql("SELECT * FROM testData2 ORDER BY 1 DESC, b ASC").collect()) + assert(e.getMessage contains + "Integer in the Order/Sort By clause is not allowed " + + "when spark.sql.orderByOrdinal is set to false") + } + } + test("natural join") { val df1 = Seq(("one", 1), ("two", 2), ("three", 3)).toDF("k", "v1") val df2 = Seq(("one", 1), ("two", 22), ("one", 5)).toDF("k", "v2") From 240aeb3c1d099ebcc4321cbf0987aeccc8e1cbd6 Mon Sep 17 00:00:00 2001 From: gatorsmile Date: Thu, 17 Mar 2016 23:21:03 -0700 Subject: [PATCH 2/7] style fix. --- .../org/apache/spark/sql/catalyst/analysis/Analyzer.scala | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala index 7fa5a9138d446..677db0c26be1f 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala @@ -640,8 +640,7 @@ class Analyzer( failAnalysis( "Integer in the Order/Sort By clause is not allowed " + "when spark.sql.orderByOrdinal is set to false") - } - else { + } else { throw new UnresolvedException(s, s"""Order/Sort By position: $index does not exist \n |The Select List is indexed from 1 to ${child.output.size}""".stripMargin) From e0dce39806fe4c9e9bec9f85727456820a990b99 Mon Sep 17 00:00:00 2001 From: gatorsmile Date: Thu, 17 Mar 2016 23:42:26 -0700 Subject: [PATCH 3/7] improve the test cases. --- .../src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala index 5428c6e90188c..d39e9661ea0c0 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala @@ -2166,7 +2166,7 @@ class SQLQuerySuite extends QueryTest with SharedSQLContext { sql("SELECT * FROM testData2 ORDER BY 1 DESC, b ASC"), sql("SELECT * FROM testData2 ORDER BY a DESC, b ASC")) checkAnswer( - sql("SELECT * FROM testData2 SORT BY 1 DESC, 2 ASC"), + sql("SELECT * FROM testData2 SORT BY 1 DESC, 2"), sql("SELECT * FROM testData2 SORT BY a DESC, b ASC")) checkAnswer( sql("SELECT * FROM testData2 ORDER BY 1 ASC, b ASC"), From 256814b5674bc187896614abfb8611cba34445fb Mon Sep 17 00:00:00 2001 From: gatorsmile Date: Fri, 18 Mar 2016 08:54:38 -0700 Subject: [PATCH 4/7] address comments. --- .../sql/catalyst/analysis/Analyzer.scala | 17 ++++++++--------- .../sql/catalyst/expressions/SortOrder.scala | 9 +++------ .../sql/catalyst/planning/patterns.scala | 1 + .../sql/catalyst/analysis/AnalysisSuite.scala | 10 ++++++++++ .../apache/spark/sql/internal/SQLConf.scala | 4 ++-- .../org/apache/spark/sql/DataFrameSuite.scala | 8 ++++++++ .../org/apache/spark/sql/SQLQuerySuite.scala | 19 +++++++++---------- 7 files changed, 41 insertions(+), 27 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala index 677db0c26be1f..1df15b459ac62 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala @@ -623,27 +623,26 @@ class Analyzer( * projection, so that they will be available during sorting. Another projection is added to * remove these attributes after sorting. * - * This rule also resolves the position number in Sort references. This support is introduced + * This rule also resolves the position number in sort references. This support is introduced * in Spark 2.0. Before Spark 2.0, the integers in Order By has no effect on output sorting. + * - When the sort references are not integer but foldable expressions, ignore them. + * - When spark.sql.orderByOrdinal is set to false, ignore the position numbers too. */ object ResolveSortReferences extends Rule[LogicalPlan] { def apply(plan: LogicalPlan): LogicalPlan = plan resolveOperators { // Replace the index with the related attribute for ORDER BY // which is a 1-base position of the projection list. case s @ Sort(orders, global, child) - if child.resolved && orders.exists(o => IntegerIndex.unapply(o.child).nonEmpty) => + if conf.orderByOrdinal && child.resolved && + orders.exists(o => IntegerIndex.unapply(o.child).nonEmpty) => val newOrders = orders map { case s @ SortOrder(IntegerIndex(index), direction) => - if (index > 0 && index <= child.output.size && conf.orderByOrdinal) { + if (index > 0 && index <= child.output.size) { SortOrder(child.output(index - 1), direction) - } else if (!conf.orderByOrdinal) { - failAnalysis( - "Integer in the Order/Sort By clause is not allowed " + - "when spark.sql.orderByOrdinal is set to false") } else { throw new UnresolvedException(s, - s"""Order/Sort By position: $index does not exist \n - |The Select List is indexed from 1 to ${child.output.size}""".stripMargin) + "Order/sort By position: $index does not exist " + + "The Select List is indexed from 1 to ${child.output.size}") } case o => o } diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/SortOrder.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/SortOrder.scala index f98348647c508..b739361937b6b 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/SortOrder.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/SortOrder.scala @@ -20,7 +20,6 @@ package org.apache.spark.sql.catalyst.expressions import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.analysis.TypeCheckResult import org.apache.spark.sql.catalyst.expressions.codegen.{CodegenContext, ExprCode} -import org.apache.spark.sql.catalyst.planning.IntegerIndex import org.apache.spark.sql.types._ import org.apache.spark.util.collection.unsafe.sort.PrefixComparators.BinaryPrefixComparator import org.apache.spark.util.collection.unsafe.sort.PrefixComparators.DoublePrefixComparator @@ -48,12 +47,10 @@ case class SortOrder(child: Expression, direction: SortDirection) override def foldable: Boolean = false override def checkInputDataTypes(): TypeCheckResult = { - if (!RowOrdering.isOrderable(dataType)) { - TypeCheckResult.TypeCheckFailure(s"cannot sort data type ${dataType.simpleString}") - } else if (child.references.isEmpty && IntegerIndex.unapply(child).isEmpty) { - TypeCheckResult.TypeCheckFailure(s"sort position must be integer literals") - } else { + if (RowOrdering.isOrderable(dataType)) { TypeCheckResult.TypeCheckSuccess + } else { + TypeCheckResult.TypeCheckFailure(s"cannot sort data type ${dataType.simpleString}") } } diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/planning/patterns.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/planning/patterns.scala index aec25beb3e7fb..ef74504c661ae 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/planning/patterns.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/planning/patterns.scala @@ -210,6 +210,7 @@ object Unions { object IntegerIndex { def unapply(a: Any): Option[Int] = a match { case Literal(a: Int, IntegerType) => Some(a) + // When resolving ordinal in Sort, negative values are extracted for issuing error messages. case UnaryMinus(IntegerLiteral(v)) => Some(-v) case _ => None } diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisSuite.scala index b16927bd7525e..8b568b6dd6acd 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisSuite.scala @@ -201,6 +201,16 @@ class AnalysisSuite extends AnalysisTest { checkAnalysis(plan, expected) } + test("pull out nondeterministic expressions from Sort") { + val plan = Sort(Seq(SortOrder(Rand(33), Ascending)), false, testRelation) + val projected = Alias(Rand(33), "_nondeterministic")() + val expected = + Project(testRelation.output, + Sort(Seq(SortOrder(projected.toAttribute, Ascending)), false, + Project(testRelation.output :+ projected, testRelation))) + checkAnalysis(plan, expected) + } + test("SPARK-9634: cleanup unnecessary Aliases in LogicalPlan") { val a = testRelation.output.head var plan = testRelation.select(((a + 1).as("a+1") + 2).as("col")) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala index afa9466ecab50..037ceea2faea3 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala @@ -437,8 +437,8 @@ object SQLConf { val ORDER_BY_ORDINAL = booleanConf("spark.sql.orderByOrdinal", defaultValue = Some(true), - doc = "When true, the ordinal number is treated as the position in the select list. " + - "When false, the ordinal number in Order/Sort By clause is not allowed.") + doc = "When true, the ordinal numbers are treated as the position in the select list. " + + "When false, the ordinal numbers in order/sort By clause are ignored.") // The output committer class used by HadoopFsRelation. The specified class needs to be a // subclass of org.apache.hadoop.mapreduce.OutputCommitter. diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala index 8c24c92b901c0..199e138abfdc2 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala @@ -952,6 +952,14 @@ class DataFrameSuite extends QueryTest with SharedSQLContext { assert(df1.showString(5) == df1.showString(5)) } + test("SPARK-8609: local DataFrame with random columns should return same value after sort") { + checkAnswer(testData.sort(rand(33)), testData.sort(rand(33))) + + // We will reuse the same Expression object for LocalRelation. + val df = (1 to 10).map(Tuple1.apply).toDF() + checkAnswer(df.sort(rand(33)), df.sort(rand(33))) + } + test("SPARK-9083: sort with non-deterministic expressions") { import org.apache.spark.util.random.XORShiftRandom diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala index d39e9661ea0c0..8bc127438f02c 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala @@ -2162,6 +2162,11 @@ class SQLQuerySuite extends QueryTest with SharedSQLContext { checkAnswer( sql("SELECT * FROM testData2 ORDER BY 1 DESC"), Seq(Row(3, 1), Row(3, 2), Row(2, 1), Row(2, 2), Row(1, 1), Row(1, 2))) + // If the position is not an integer, ignore it. + checkAnswer( + sql("SELECT * FROM testData2 ORDER BY 1 + 0 DESC, b ASC"), + Seq(Row(1, 1), Row(2, 1), Row(3, 1), Row(1, 2), Row(2, 2), Row(3, 2))) + checkAnswer( sql("SELECT * FROM testData2 ORDER BY 1 DESC, b ASC"), sql("SELECT * FROM testData2 ORDER BY a DESC, b ASC")) @@ -2174,11 +2179,6 @@ class SQLQuerySuite extends QueryTest with SharedSQLContext { } test("order by ordinal number - negative cases") { - val e = intercept[AnalysisException]( - sql("SELECT * FROM testData2 ORDER BY 1 + 1 DESC, b ASC").collect()) - assert(e.getMessage contains - "cannot resolve '(1 + 1) DESC' due to data type mismatch: " + - "sort position must be integer literals") intercept[UnresolvedException[SortOrder]] { sql("SELECT * FROM testData2 ORDER BY 0").collect() } @@ -2192,11 +2192,10 @@ class SQLQuerySuite extends QueryTest with SharedSQLContext { test("order by ordinal number with conf spark.sql.orderByOrdinal=false") { withSQLConf(SQLConf.ORDER_BY_ORDINAL.key -> "false") { - val e = intercept[AnalysisException]( - sql("SELECT * FROM testData2 ORDER BY 1 DESC, b ASC").collect()) - assert(e.getMessage contains - "Integer in the Order/Sort By clause is not allowed " + - "when spark.sql.orderByOrdinal is set to false") + // If spark.sql.orderByOrdinal=false, ignore the position number. + checkAnswer( + sql("SELECT * FROM testData2 ORDER BY 1 DESC, b ASC"), + Seq(Row(1, 1), Row(2, 1), Row(3, 1), Row(1, 2), Row(2, 2), Row(3, 2))) } } From b04529bcd3ed9055b85ebf88122adf4ec097357d Mon Sep 17 00:00:00 2001 From: gatorsmile Date: Fri, 18 Mar 2016 22:43:28 -0700 Subject: [PATCH 5/7] address comments. --- .../org/apache/spark/sql/catalyst/analysis/Analyzer.scala | 6 +++--- .../src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala | 4 ++-- 2 files changed, 5 insertions(+), 5 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala index 1df15b459ac62..a2cc226787e32 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala @@ -630,11 +630,11 @@ class Analyzer( */ object ResolveSortReferences extends Rule[LogicalPlan] { def apply(plan: LogicalPlan): LogicalPlan = plan resolveOperators { + case s: Sort if !s.child.resolved => s // Replace the index with the related attribute for ORDER BY // which is a 1-base position of the projection list. case s @ Sort(orders, global, child) - if conf.orderByOrdinal && child.resolved && - orders.exists(o => IntegerIndex.unapply(o.child).nonEmpty) => + if conf.orderByOrdinal && orders.exists(o => IntegerIndex.unapply(o.child).nonEmpty) => val newOrders = orders map { case s @ SortOrder(IntegerIndex(index), direction) => if (index > 0 && index <= child.output.size) { @@ -651,7 +651,7 @@ class Analyzer( // Skip sort with aggregate. This will be handled in ResolveAggregateFunctions case sa @ Sort(_, _, child: Aggregate) => sa - case s @ Sort(order, _, child) if !s.resolved && child.resolved => + case s @ Sort(order, _, child) if !s.resolved => try { val newOrder = order.map(resolveExpressionRecursively(_, child).asInstanceOf[SortOrder]) val requiredAttrs = AttributeSet(newOrder).filter(_.resolved) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala index 8bc127438f02c..4885ed42da05e 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala @@ -2161,11 +2161,11 @@ class SQLQuerySuite extends QueryTest with SharedSQLContext { test("order by ordinal number") { checkAnswer( sql("SELECT * FROM testData2 ORDER BY 1 DESC"), - Seq(Row(3, 1), Row(3, 2), Row(2, 1), Row(2, 2), Row(1, 1), Row(1, 2))) + sql("SELECT * FROM testData2 ORDER BY a DESC")) // If the position is not an integer, ignore it. checkAnswer( sql("SELECT * FROM testData2 ORDER BY 1 + 0 DESC, b ASC"), - Seq(Row(1, 1), Row(2, 1), Row(3, 1), Row(1, 2), Row(2, 2), Row(3, 2))) + sql("SELECT * FROM testData2 ORDER BY b ASC")) checkAnswer( sql("SELECT * FROM testData2 ORDER BY 1 DESC, b ASC"), From d52e1c7bfcae03b12430426e5d6b9edd87d4a2a6 Mon Sep 17 00:00:00 2001 From: gatorsmile Date: Sun, 20 Mar 2016 22:16:24 -0700 Subject: [PATCH 6/7] address comments. --- .../test/scala/org/apache/spark/sql/SQLQuerySuite.scala | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala index 4885ed42da05e..b765fd8b66163 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala @@ -2180,13 +2180,13 @@ class SQLQuerySuite extends QueryTest with SharedSQLContext { test("order by ordinal number - negative cases") { intercept[UnresolvedException[SortOrder]] { - sql("SELECT * FROM testData2 ORDER BY 0").collect() + sql("SELECT * FROM testData2 ORDER BY 0") } intercept[UnresolvedException[SortOrder]] { - sql("SELECT * FROM testData2 ORDER BY -1 DESC, b ASC").collect() + sql("SELECT * FROM testData2 ORDER BY -1 DESC, b ASC") } intercept[UnresolvedException[SortOrder]] { - sql("SELECT * FROM testData2 ORDER BY 3 DESC, b ASC").collect() + sql("SELECT * FROM testData2 ORDER BY 3 DESC, b ASC") } } @@ -2195,7 +2195,7 @@ class SQLQuerySuite extends QueryTest with SharedSQLContext { // If spark.sql.orderByOrdinal=false, ignore the position number. checkAnswer( sql("SELECT * FROM testData2 ORDER BY 1 DESC, b ASC"), - Seq(Row(1, 1), Row(2, 1), Row(3, 1), Row(1, 2), Row(2, 2), Row(3, 2))) + sql("SELECT * FROM testData2 ORDER BY b ASC")) } } From 46e3f69103efd036a55775b6e5e3f998567df3c0 Mon Sep 17 00:00:00 2001 From: gatorsmile Date: Mon, 21 Mar 2016 00:24:24 -0700 Subject: [PATCH 7/7] address comments. --- .../spark/sql/catalyst/analysis/Analyzer.scala | 4 ++-- .../catalyst/optimizer/EliminateSortsSuite.scala | 13 +++++++++---- 2 files changed, 11 insertions(+), 6 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala index a2cc226787e32..333a54ee76c9a 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala @@ -641,8 +641,8 @@ class Analyzer( SortOrder(child.output(index - 1), direction) } else { throw new UnresolvedException(s, - "Order/sort By position: $index does not exist " + - "The Select List is indexed from 1 to ${child.output.size}") + s"Order/sort By position: $index does not exist " + + s"The Select List is indexed from 1 to ${child.output.size}") } case o => o } diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/EliminateSortsSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/EliminateSortsSuite.scala index 27c15e856aba5..a4c8d1c6d2aa8 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/EliminateSortsSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/EliminateSortsSuite.scala @@ -17,6 +17,8 @@ package org.apache.spark.sql.catalyst.optimizer +import org.apache.spark.sql.catalyst.SimpleCatalystConf +import org.apache.spark.sql.catalyst.analysis.{Analyzer, EmptyFunctionRegistry, SimpleCatalog} import org.apache.spark.sql.catalyst.dsl.expressions._ import org.apache.spark.sql.catalyst.dsl.plans._ import org.apache.spark.sql.catalyst.expressions._ @@ -25,6 +27,9 @@ import org.apache.spark.sql.catalyst.plans.logical._ import org.apache.spark.sql.catalyst.rules._ class EliminateSortsSuite extends PlanTest { + val conf = new SimpleCatalystConf(caseSensitiveAnalysis = true, orderByOrdinal = false) + val catalog = new SimpleCatalog(conf) + val analyzer = new Analyzer(catalog, EmptyFunctionRegistry, conf) object Optimize extends RuleExecutor[LogicalPlan] { val batches = @@ -48,8 +53,8 @@ class EliminateSortsSuite extends PlanTest { val x = testRelation val query = x.orderBy(SortOrder(3, Ascending), SortOrder(-1, Ascending)) - val optimized = Optimize.execute(query.analyze) - val correctAnswer = x.analyze + val optimized = Optimize.execute(analyzer.execute(query)) + val correctAnswer = analyzer.execute(x) comparePlans(optimized, correctAnswer) } @@ -58,8 +63,8 @@ class EliminateSortsSuite extends PlanTest { val x = testRelation val query = x.orderBy(SortOrder(3, Ascending), 'a.asc) - val optimized = Optimize.execute(query.analyze) - val correctAnswer = x.orderBy('a.asc).analyze + val optimized = Optimize.execute(analyzer.execute(query)) + val correctAnswer = analyzer.execute(x.orderBy('a.asc)) comparePlans(optimized, correctAnswer) }