diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/CatalystConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/CatalystConf.scala
index 35884139b6be8..e10ab9790d767 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/CatalystConf.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/CatalystConf.scala
@@ -22,6 +22,8 @@ import org.apache.spark.sql.catalyst.analysis._
 private[spark] trait CatalystConf {
   def caseSensitiveAnalysis: Boolean
 
+  def orderByOrdinal: Boolean
+
   /**
    * Returns the [[Resolver]] for the current configuration, which can be used to determin if two
    * identifiers are equal.
@@ -43,8 +45,14 @@ object EmptyConf extends CatalystConf {
   override def caseSensitiveAnalysis: Boolean = {
     throw new UnsupportedOperationException
   }
+  override def orderByOrdinal: Boolean = {
+    throw new UnsupportedOperationException
+  }
 }
 
 /** A CatalystConf that can be used for local testing. */
-case class SimpleCatalystConf(caseSensitiveAnalysis: Boolean) extends CatalystConf {
+case class SimpleCatalystConf(
+    caseSensitiveAnalysis: Boolean,
+    orderByOrdinal: Boolean = true)
+  extends CatalystConf {
 }
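To illustrate the new trait member: `SimpleCatalystConf` defaults `orderByOrdinal` to `true`, so local analyzers resolve ordinals unless a test opts out. A minimal sketch, using only names from this diff (the asserts are illustrative):

```scala
import org.apache.spark.sql.catalyst.SimpleCatalystConf

// Defaults to true: integer literals in ORDER BY are treated as positions.
val withOrdinals = SimpleCatalystConf(caseSensitiveAnalysis = true)
assert(withOrdinals.orderByOrdinal)

// Suites that need the old behavior (e.g. EliminateSortsSuite below) opt out.
val withoutOrdinals = SimpleCatalystConf(caseSensitiveAnalysis = true, orderByOrdinal = false)
assert(!withoutOrdinals.orderByOrdinal)
```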
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
index e4e934a01541d..333a54ee76c9a 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
@@ -27,6 +27,7 @@ import org.apache.spark.sql.catalyst.{CatalystConf, ScalaReflection, SimpleCatal
 import org.apache.spark.sql.catalyst.encoders.OuterScopes
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.expressions.aggregate._
+import org.apache.spark.sql.catalyst.planning.IntegerIndex
 import org.apache.spark.sql.catalyst.plans._
 import org.apache.spark.sql.catalyst.plans.logical._
 import org.apache.spark.sql.catalyst.rules._
@@ -40,7 +41,10 @@ import org.apache.spark.sql.types._
 * references.
 */
 object SimpleAnalyzer
-  extends Analyzer(EmptyCatalog, EmptyFunctionRegistry, new SimpleCatalystConf(true))
+  extends Analyzer(
+    EmptyCatalog,
+    EmptyFunctionRegistry,
+    new SimpleCatalystConf(caseSensitiveAnalysis = true))
 
 /**
  * Provides a logical query plan analyzer, which translates [[UnresolvedAttribute]]s and
@@ -618,13 +622,36 @@ class Analyzer(
    * clause. This rule detects such queries and adds the required attributes to the original
    * projection, so that they will be available during sorting. Another projection is added to
    * remove these attributes after sorting.
+   *
+   * This rule also resolves ordinal positions in sort references. This support was introduced in
+   * Spark 2.0; before Spark 2.0, integer literals in ORDER BY had no effect on the output ordering.
+   * - When a sort reference is a foldable expression rather than an integer literal, ignore it.
+   * - When spark.sql.orderByOrdinal is set to false, ignore ordinal positions as well.
    */
   object ResolveSortReferences extends Rule[LogicalPlan] {
     def apply(plan: LogicalPlan): LogicalPlan = plan resolveOperators {
+      case s: Sort if !s.child.resolved => s
+      // Replace each integer index with the corresponding attribute for ORDER BY;
+      // an index is a 1-based position in the projection list.
+      case s @ Sort(orders, global, child)
+        if conf.orderByOrdinal && orders.exists(o => IntegerIndex.unapply(o.child).nonEmpty) =>
+        val newOrders = orders map {
+          case s @ SortOrder(IntegerIndex(index), direction) =>
+            if (index > 0 && index <= child.output.size) {
+              SortOrder(child.output(index - 1), direction)
+            } else {
+              throw new UnresolvedException(s,
+                s"Order/sort by position: $index does not exist. " +
+                s"The select list is indexed from 1 to ${child.output.size}.")
+            }
+          case o => o
+        }
+        Sort(newOrders, global, child)
+
       // Skip sort with aggregate. This will be handled in ResolveAggregateFunctions
       case sa @ Sort(_, _, child: Aggregate) => sa
 
-      case s @ Sort(order, _, child) if !s.resolved && child.resolved =>
+      case s @ Sort(order, _, child) if !s.resolved =>
         try {
           val newOrder = order.map(resolveExpressionRecursively(_, child).asInstanceOf[SortOrder])
           val requiredAttrs = AttributeSet(newOrder).filter(_.resolved)
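To make the rewrite concrete, here is a rough sketch of the transformation the new case performs, written with Catalyst's test DSL rather than the rule itself; `relation` is a stand-in child whose output is `Seq(a, b)`:

```scala
import org.apache.spark.sql.catalyst.dsl.expressions._
import org.apache.spark.sql.catalyst.dsl.plans._
import org.apache.spark.sql.catalyst.expressions.{Descending, Literal, SortOrder}
import org.apache.spark.sql.catalyst.plans.logical.LocalRelation

val relation = LocalRelation('a.int, 'b.int)

// ORDER BY 1 DESC over a resolved child whose output is Seq(a, b)...
val byOrdinal = relation.orderBy(SortOrder(Literal(1), Descending))

// ...is rewritten by ResolveSortReferences into the equivalent of ORDER BY a DESC:
val byAttribute = relation.orderBy(relation.output.head.desc)
```

An ordinal outside the range 1 to `child.output.size` falls into the else branch above and raises the `UnresolvedException`.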
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/planning/patterns.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/planning/patterns.scala
index 62d54df98ecc5..ef74504c661ae 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/planning/patterns.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/planning/patterns.scala
@@ -24,6 +24,7 @@ import org.apache.spark.internal.Logging
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.plans._
 import org.apache.spark.sql.catalyst.plans.logical._
+import org.apache.spark.sql.types.IntegerType
 
 /**
  * A pattern that matches any number of project or filter operations on top of another relational
@@ -202,3 +203,15 @@ object Unions {
     }
   }
 }
+
+/**
+ * An extractor that matches integer literals and returns their Int value.
+ */
+object IntegerIndex {
+  def unapply(a: Any): Option[Int] = a match {
+    case Literal(a: Int, IntegerType) => Some(a)
+    // When resolving ordinals in Sort, negative values are also extracted so a clear error can be issued.
+    case UnaryMinus(IntegerLiteral(v)) => Some(-v)
+    case _ => None
+  }
+}
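A rough illustration of what `IntegerIndex` accepts and rejects (plain Catalyst constructor calls; the comments show the expected results). The negative match exists only so `ResolveSortReferences` can report an out-of-range position instead of silently ignoring it:

```scala
import org.apache.spark.sql.catalyst.expressions.{Literal, UnaryMinus}
import org.apache.spark.sql.catalyst.planning.IntegerIndex

IntegerIndex.unapply(Literal(2))              // Some(2)  -- a candidate 1-based ordinal
IntegerIndex.unapply(UnaryMinus(Literal(1)))  // Some(-1) -- surfaced to raise a clear error
IntegerIndex.unapply(Literal(1L))             // None     -- LongType, not IntegerType
IntegerIndex.unapply(Literal("1"))            // None     -- not an integer literal
```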
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisTest.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisTest.scala
index ef825e606202f..39166c4f8ef73 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisTest.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisTest.scala
@@ -25,8 +25,8 @@ import org.apache.spark.sql.catalyst.plans.logical._
 
 trait AnalysisTest extends PlanTest {
   val (caseSensitiveAnalyzer, caseInsensitiveAnalyzer) = {
-    val caseSensitiveConf = new SimpleCatalystConf(true)
-    val caseInsensitiveConf = new SimpleCatalystConf(false)
+    val caseSensitiveConf = new SimpleCatalystConf(caseSensitiveAnalysis = true)
+    val caseInsensitiveConf = new SimpleCatalystConf(caseSensitiveAnalysis = false)
 
     val caseSensitiveCatalog = new SimpleCatalog(caseSensitiveConf)
     val caseInsensitiveCatalog = new SimpleCatalog(caseInsensitiveConf)
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/DecimalPrecisionSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/DecimalPrecisionSuite.scala
index b2613e4909288..9aa685e1e8f55 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/DecimalPrecisionSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/DecimalPrecisionSuite.scala
@@ -30,7 +30,7 @@ import org.apache.spark.sql.types._
 
 class DecimalPrecisionSuite extends PlanTest with BeforeAndAfter {
-  val conf = new SimpleCatalystConf(true)
+  val conf = new SimpleCatalystConf(caseSensitiveAnalysis = true)
   val catalog = new SimpleCatalog(conf)
   val analyzer = new Analyzer(catalog, EmptyFunctionRegistry, conf)
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/BooleanSimplificationSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/BooleanSimplificationSuite.scala
index da43751b0a310..47b79fe462457 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/BooleanSimplificationSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/BooleanSimplificationSuite.scala
@@ -110,7 +110,10 @@ class BooleanSimplificationSuite extends PlanTest with PredicateHelper {
   }
 
   private val caseInsensitiveAnalyzer =
-    new Analyzer(EmptyCatalog, EmptyFunctionRegistry, new SimpleCatalystConf(false))
+    new Analyzer(
+      EmptyCatalog,
+      EmptyFunctionRegistry,
+      new SimpleCatalystConf(caseSensitiveAnalysis = false))
 
   test("(a && b) || (a && c) => a && (b || c) when case insensitive") {
     val plan = caseInsensitiveAnalyzer.execute(
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/EliminateSortsSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/EliminateSortsSuite.scala
index 27c15e856aba5..a4c8d1c6d2aa8 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/EliminateSortsSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/EliminateSortsSuite.scala
@@ -17,6 +17,8 @@
 
 package org.apache.spark.sql.catalyst.optimizer
 
+import org.apache.spark.sql.catalyst.SimpleCatalystConf
+import org.apache.spark.sql.catalyst.analysis.{Analyzer, EmptyFunctionRegistry, SimpleCatalog}
 import org.apache.spark.sql.catalyst.dsl.expressions._
 import org.apache.spark.sql.catalyst.dsl.plans._
 import org.apache.spark.sql.catalyst.expressions._
@@ -25,6 +27,9 @@ import org.apache.spark.sql.catalyst.plans.logical._
 import org.apache.spark.sql.catalyst.rules._
 
 class EliminateSortsSuite extends PlanTest {
+  val conf = new SimpleCatalystConf(caseSensitiveAnalysis = true, orderByOrdinal = false)
+  val catalog = new SimpleCatalog(conf)
+  val analyzer = new Analyzer(catalog, EmptyFunctionRegistry, conf)
 
   object Optimize extends RuleExecutor[LogicalPlan] {
     val batches =
@@ -48,8 +53,8 @@ class EliminateSortsSuite extends PlanTest {
     val x = testRelation
 
     val query = x.orderBy(SortOrder(3, Ascending), SortOrder(-1, Ascending))
-    val optimized = Optimize.execute(query.analyze)
-    val correctAnswer = x.analyze
+    val optimized = Optimize.execute(analyzer.execute(query))
+    val correctAnswer = analyzer.execute(x)
 
     comparePlans(optimized, correctAnswer)
   }
@@ -58,8 +63,8 @@ class EliminateSortsSuite extends PlanTest {
     val x = testRelation
 
     val query = x.orderBy(SortOrder(3, Ascending), 'a.asc)
-    val optimized = Optimize.execute(query.analyze)
-    val correctAnswer = x.orderBy('a.asc).analyze
+    val optimized = Optimize.execute(analyzer.execute(query))
+    val correctAnswer = analyzer.execute(x.orderBy('a.asc))
 
     comparePlans(optimized, correctAnswer)
   }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
index 9e0878a5145a0..3d1d5b120a783 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
@@ -435,6 +435,11 @@
     defaultValue = Some(true),
     doc = "When false, we will treat bucketed table as normal table.")
 
+  val ORDER_BY_ORDINAL = booleanConf("spark.sql.orderByOrdinal",
+    defaultValue = Some(true),
+    doc = "When true, the ordinal numbers are treated as the position in the select list. " +
+      "When false, the ordinal numbers in order/sort by clauses are ignored.")
+
   // The output committer class used by HadoopFsRelation. The specified class needs to be a
   // subclass of org.apache.hadoop.mapreduce.OutputCommitter.
   //
@@ -634,6 +639,8 @@ class SQLConf extends Serializable with CatalystConf with ParserConf with Loggin
 
   def supportSQL11ReservedKeywords: Boolean = getConf(PARSER_SUPPORT_SQL11_RESERVED_KEYWORDS)
 
+  override def orderByOrdinal: Boolean = getConf(ORDER_BY_ORDINAL)
+
   /** ********************** SQLConf functionality methods ************ */
 
   /** Set Spark SQL configuration properties. */
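End to end, the knob can be flipped per session. A sketch, assuming a `sqlContext` in scope with `testData2` registered (as in the suite that follows):

```scala
// Default (true): "1" is resolved to the first column of the select list.
sqlContext.sql("SELECT * FROM testData2 ORDER BY 1 DESC").show()

// With the flag off, the integer is treated as a constant sort key and is
// effectively ignored, so only b determines the order here.
sqlContext.sql("SET spark.sql.orderByOrdinal=false")
sqlContext.sql("SELECT * FROM testData2 ORDER BY 1 DESC, b ASC").show()
```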
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
index 6716982118fed..b765fd8b66163 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
@@ -21,6 +21,8 @@ import java.math.MathContext
 import java.sql.Timestamp
 
 import org.apache.spark.AccumulatorSuite
+import org.apache.spark.sql.catalyst.analysis.UnresolvedException
+import org.apache.spark.sql.catalyst.expressions.SortOrder
 import org.apache.spark.sql.execution.aggregate
 import org.apache.spark.sql.execution.joins.{BroadcastHashJoin, CartesianProduct, SortMergeJoin}
 import org.apache.spark.sql.functions._
@@ -2156,6 +2158,47 @@ class SQLQuerySuite extends QueryTest with SharedSQLContext {
     }
   }
 
+  test("order by ordinal number") {
+    checkAnswer(
+      sql("SELECT * FROM testData2 ORDER BY 1 DESC"),
+      sql("SELECT * FROM testData2 ORDER BY a DESC"))
+    // If the position is not an integer literal, ignore it.
+    checkAnswer(
+      sql("SELECT * FROM testData2 ORDER BY 1 + 0 DESC, b ASC"),
+      sql("SELECT * FROM testData2 ORDER BY b ASC"))
+
+    checkAnswer(
+      sql("SELECT * FROM testData2 ORDER BY 1 DESC, b ASC"),
+      sql("SELECT * FROM testData2 ORDER BY a DESC, b ASC"))
+    checkAnswer(
+      sql("SELECT * FROM testData2 SORT BY 1 DESC, 2"),
+      sql("SELECT * FROM testData2 SORT BY a DESC, b ASC"))
+    checkAnswer(
+      sql("SELECT * FROM testData2 ORDER BY 1 ASC, b ASC"),
+      Seq(Row(1, 1), Row(1, 2), Row(2, 1), Row(2, 2), Row(3, 1), Row(3, 2)))
+  }
+
+  test("order by ordinal number - negative cases") {
+    intercept[UnresolvedException[SortOrder]] {
+      sql("SELECT * FROM testData2 ORDER BY 0")
+    }
+    intercept[UnresolvedException[SortOrder]] {
+      sql("SELECT * FROM testData2 ORDER BY -1 DESC, b ASC")
+    }
+    intercept[UnresolvedException[SortOrder]] {
+      sql("SELECT * FROM testData2 ORDER BY 3 DESC, b ASC")
+    }
+  }
+
+  test("order by ordinal number with conf spark.sql.orderByOrdinal=false") {
+    withSQLConf(SQLConf.ORDER_BY_ORDINAL.key -> "false") {
+      // If spark.sql.orderByOrdinal=false, ignore the position number.
+      checkAnswer(
+        sql("SELECT * FROM testData2 ORDER BY 1 DESC, b ASC"),
+        sql("SELECT * FROM testData2 ORDER BY b ASC"))
+    }
+  }
+
   test("natural join") {
     val df1 = Seq(("one", 1), ("two", 2), ("three", 3)).toDF("k", "v1")
     val df2 = Seq(("one", 1), ("two", 22), ("one", 5)).toDF("k", "v2")