From 94fd641f08aed80619ee757fb65b9ca18ad5f116 Mon Sep 17 00:00:00 2001 From: gatorsmile Date: Mon, 21 Mar 2016 18:08:41 +0800 Subject: [PATCH] [SPARK-12789][SQL] Support Order By Ordinal in SQL #### What changes were proposed in this pull request? This PR is to support order by position in SQL, e.g. ```SQL select c1, c2, c3 from tbl order by 1 desc, 3 ``` should be equivalent to ```SQL select c1, c2, c3 from tbl order by c1 desc, c3 asc ``` This is controlled by config option `spark.sql.orderByOrdinal`. - When true, the ordinal numbers are treated as the position in the select list. - When false, the ordinal number in order/sort By clause are ignored. - Only convert integer literals (not foldable expressions). If found foldable expressions, ignore them - This also works with select *. **Question**: Do we still need sort by columns that contain zero reference? In this case, it will have no impact on the sorting results. IMO, we should not allow users do it. rxin cloud-fan marmbrus yhuai hvanhovell -- Update: In these cases, they are ignored in this case. **Note**: This PR is taken from https://github.com/apache/spark/pull/10731. When merging this PR, please give the credit to zhichao-li Also cc all the people who are involved in the previous discussion: adrian-wang chenghao-intel tejasapatil #### How was this patch tested? Added a few test cases for both positive and negative test cases. Author: gatorsmile Closes #11815 from gatorsmile/orderByPosition. --- .../spark/sql/catalyst/CatalystConf.scala | 10 ++++- .../sql/catalyst/analysis/Analyzer.scala | 31 ++++++++++++- .../sql/catalyst/planning/patterns.scala | 13 ++++++ .../sql/catalyst/analysis/AnalysisTest.scala | 4 +- .../analysis/DecimalPrecisionSuite.scala | 2 +- .../BooleanSimplificationSuite.scala | 5 ++- .../optimizer/EliminateSortsSuite.scala | 13 ++++-- .../apache/spark/sql/internal/SQLConf.scala | 7 +++ .../org/apache/spark/sql/SQLQuerySuite.scala | 43 +++++++++++++++++++ 9 files changed, 117 insertions(+), 11 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/CatalystConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/CatalystConf.scala index 35884139b6be8..e10ab9790d767 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/CatalystConf.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/CatalystConf.scala @@ -22,6 +22,8 @@ import org.apache.spark.sql.catalyst.analysis._ private[spark] trait CatalystConf { def caseSensitiveAnalysis: Boolean + def orderByOrdinal: Boolean + /** * Returns the [[Resolver]] for the current configuration, which can be used to determin if two * identifiers are equal. @@ -43,8 +45,14 @@ object EmptyConf extends CatalystConf { override def caseSensitiveAnalysis: Boolean = { throw new UnsupportedOperationException } + override def orderByOrdinal: Boolean = { + throw new UnsupportedOperationException + } } /** A CatalystConf that can be used for local testing. */ -case class SimpleCatalystConf(caseSensitiveAnalysis: Boolean) extends CatalystConf { +case class SimpleCatalystConf( + caseSensitiveAnalysis: Boolean, + orderByOrdinal: Boolean = true) + extends CatalystConf { } diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala index e4e934a01541d..333a54ee76c9a 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala @@ -27,6 +27,7 @@ import org.apache.spark.sql.catalyst.{CatalystConf, ScalaReflection, SimpleCatal import org.apache.spark.sql.catalyst.encoders.OuterScopes import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.catalyst.expressions.aggregate._ +import org.apache.spark.sql.catalyst.planning.IntegerIndex import org.apache.spark.sql.catalyst.plans._ import org.apache.spark.sql.catalyst.plans.logical._ import org.apache.spark.sql.catalyst.rules._ @@ -40,7 +41,10 @@ import org.apache.spark.sql.types._ * references. */ object SimpleAnalyzer - extends Analyzer(EmptyCatalog, EmptyFunctionRegistry, new SimpleCatalystConf(true)) + extends Analyzer( + EmptyCatalog, + EmptyFunctionRegistry, + new SimpleCatalystConf(caseSensitiveAnalysis = true)) /** * Provides a logical query plan analyzer, which translates [[UnresolvedAttribute]]s and @@ -618,13 +622,36 @@ class Analyzer( * clause. This rule detects such queries and adds the required attributes to the original * projection, so that they will be available during sorting. Another projection is added to * remove these attributes after sorting. + * + * This rule also resolves the position number in sort references. This support is introduced + * in Spark 2.0. Before Spark 2.0, the integers in Order By has no effect on output sorting. + * - When the sort references are not integer but foldable expressions, ignore them. + * - When spark.sql.orderByOrdinal is set to false, ignore the position numbers too. */ object ResolveSortReferences extends Rule[LogicalPlan] { def apply(plan: LogicalPlan): LogicalPlan = plan resolveOperators { + case s: Sort if !s.child.resolved => s + // Replace the index with the related attribute for ORDER BY + // which is a 1-base position of the projection list. + case s @ Sort(orders, global, child) + if conf.orderByOrdinal && orders.exists(o => IntegerIndex.unapply(o.child).nonEmpty) => + val newOrders = orders map { + case s @ SortOrder(IntegerIndex(index), direction) => + if (index > 0 && index <= child.output.size) { + SortOrder(child.output(index - 1), direction) + } else { + throw new UnresolvedException(s, + s"Order/sort By position: $index does not exist " + + s"The Select List is indexed from 1 to ${child.output.size}") + } + case o => o + } + Sort(newOrders, global, child) + // Skip sort with aggregate. This will be handled in ResolveAggregateFunctions case sa @ Sort(_, _, child: Aggregate) => sa - case s @ Sort(order, _, child) if !s.resolved && child.resolved => + case s @ Sort(order, _, child) if !s.resolved => try { val newOrder = order.map(resolveExpressionRecursively(_, child).asInstanceOf[SortOrder]) val requiredAttrs = AttributeSet(newOrder).filter(_.resolved) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/planning/patterns.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/planning/patterns.scala index 62d54df98ecc5..ef74504c661ae 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/planning/patterns.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/planning/patterns.scala @@ -24,6 +24,7 @@ import org.apache.spark.internal.Logging import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.catalyst.plans._ import org.apache.spark.sql.catalyst.plans.logical._ +import org.apache.spark.sql.types.IntegerType /** * A pattern that matches any number of project or filter operations on top of another relational @@ -202,3 +203,15 @@ object Unions { } } } + +/** + * Extractor for retrieving Int value. + */ +object IntegerIndex { + def unapply(a: Any): Option[Int] = a match { + case Literal(a: Int, IntegerType) => Some(a) + // When resolving ordinal in Sort, negative values are extracted for issuing error messages. + case UnaryMinus(IntegerLiteral(v)) => Some(-v) + case _ => None + } +} diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisTest.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisTest.scala index ef825e606202f..39166c4f8ef73 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisTest.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisTest.scala @@ -25,8 +25,8 @@ import org.apache.spark.sql.catalyst.plans.logical._ trait AnalysisTest extends PlanTest { val (caseSensitiveAnalyzer, caseInsensitiveAnalyzer) = { - val caseSensitiveConf = new SimpleCatalystConf(true) - val caseInsensitiveConf = new SimpleCatalystConf(false) + val caseSensitiveConf = new SimpleCatalystConf(caseSensitiveAnalysis = true) + val caseInsensitiveConf = new SimpleCatalystConf(caseSensitiveAnalysis = false) val caseSensitiveCatalog = new SimpleCatalog(caseSensitiveConf) val caseInsensitiveCatalog = new SimpleCatalog(caseInsensitiveConf) diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/DecimalPrecisionSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/DecimalPrecisionSuite.scala index b2613e4909288..9aa685e1e8f55 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/DecimalPrecisionSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/DecimalPrecisionSuite.scala @@ -30,7 +30,7 @@ import org.apache.spark.sql.types._ class DecimalPrecisionSuite extends PlanTest with BeforeAndAfter { - val conf = new SimpleCatalystConf(true) + val conf = new SimpleCatalystConf(caseSensitiveAnalysis = true) val catalog = new SimpleCatalog(conf) val analyzer = new Analyzer(catalog, EmptyFunctionRegistry, conf) diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/BooleanSimplificationSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/BooleanSimplificationSuite.scala index da43751b0a310..47b79fe462457 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/BooleanSimplificationSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/BooleanSimplificationSuite.scala @@ -110,7 +110,10 @@ class BooleanSimplificationSuite extends PlanTest with PredicateHelper { } private val caseInsensitiveAnalyzer = - new Analyzer(EmptyCatalog, EmptyFunctionRegistry, new SimpleCatalystConf(false)) + new Analyzer( + EmptyCatalog, + EmptyFunctionRegistry, + new SimpleCatalystConf(caseSensitiveAnalysis = false)) test("(a && b) || (a && c) => a && (b || c) when case insensitive") { val plan = caseInsensitiveAnalyzer.execute( diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/EliminateSortsSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/EliminateSortsSuite.scala index 27c15e856aba5..a4c8d1c6d2aa8 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/EliminateSortsSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/EliminateSortsSuite.scala @@ -17,6 +17,8 @@ package org.apache.spark.sql.catalyst.optimizer +import org.apache.spark.sql.catalyst.SimpleCatalystConf +import org.apache.spark.sql.catalyst.analysis.{Analyzer, EmptyFunctionRegistry, SimpleCatalog} import org.apache.spark.sql.catalyst.dsl.expressions._ import org.apache.spark.sql.catalyst.dsl.plans._ import org.apache.spark.sql.catalyst.expressions._ @@ -25,6 +27,9 @@ import org.apache.spark.sql.catalyst.plans.logical._ import org.apache.spark.sql.catalyst.rules._ class EliminateSortsSuite extends PlanTest { + val conf = new SimpleCatalystConf(caseSensitiveAnalysis = true, orderByOrdinal = false) + val catalog = new SimpleCatalog(conf) + val analyzer = new Analyzer(catalog, EmptyFunctionRegistry, conf) object Optimize extends RuleExecutor[LogicalPlan] { val batches = @@ -48,8 +53,8 @@ class EliminateSortsSuite extends PlanTest { val x = testRelation val query = x.orderBy(SortOrder(3, Ascending), SortOrder(-1, Ascending)) - val optimized = Optimize.execute(query.analyze) - val correctAnswer = x.analyze + val optimized = Optimize.execute(analyzer.execute(query)) + val correctAnswer = analyzer.execute(x) comparePlans(optimized, correctAnswer) } @@ -58,8 +63,8 @@ class EliminateSortsSuite extends PlanTest { val x = testRelation val query = x.orderBy(SortOrder(3, Ascending), 'a.asc) - val optimized = Optimize.execute(query.analyze) - val correctAnswer = x.orderBy('a.asc).analyze + val optimized = Optimize.execute(analyzer.execute(query)) + val correctAnswer = analyzer.execute(x.orderBy('a.asc)) comparePlans(optimized, correctAnswer) } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala index 9e0878a5145a0..3d1d5b120a783 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala @@ -435,6 +435,11 @@ object SQLConf { defaultValue = Some(true), doc = "When false, we will treat bucketed table as normal table.") + val ORDER_BY_ORDINAL = booleanConf("spark.sql.orderByOrdinal", + defaultValue = Some(true), + doc = "When true, the ordinal numbers are treated as the position in the select list. " + + "When false, the ordinal numbers in order/sort By clause are ignored.") + // The output committer class used by HadoopFsRelation. The specified class needs to be a // subclass of org.apache.hadoop.mapreduce.OutputCommitter. // @@ -634,6 +639,8 @@ class SQLConf extends Serializable with CatalystConf with ParserConf with Loggin def supportSQL11ReservedKeywords: Boolean = getConf(PARSER_SUPPORT_SQL11_RESERVED_KEYWORDS) + override def orderByOrdinal: Boolean = getConf(ORDER_BY_ORDINAL) + /** ********************** SQLConf functionality methods ************ */ /** Set Spark SQL configuration properties. */ diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala index 6716982118fed..b765fd8b66163 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala @@ -21,6 +21,8 @@ import java.math.MathContext import java.sql.Timestamp import org.apache.spark.AccumulatorSuite +import org.apache.spark.sql.catalyst.analysis.UnresolvedException +import org.apache.spark.sql.catalyst.expressions.SortOrder import org.apache.spark.sql.execution.aggregate import org.apache.spark.sql.execution.joins.{BroadcastHashJoin, CartesianProduct, SortMergeJoin} import org.apache.spark.sql.functions._ @@ -2156,6 +2158,47 @@ class SQLQuerySuite extends QueryTest with SharedSQLContext { } } + test("order by ordinal number") { + checkAnswer( + sql("SELECT * FROM testData2 ORDER BY 1 DESC"), + sql("SELECT * FROM testData2 ORDER BY a DESC")) + // If the position is not an integer, ignore it. + checkAnswer( + sql("SELECT * FROM testData2 ORDER BY 1 + 0 DESC, b ASC"), + sql("SELECT * FROM testData2 ORDER BY b ASC")) + + checkAnswer( + sql("SELECT * FROM testData2 ORDER BY 1 DESC, b ASC"), + sql("SELECT * FROM testData2 ORDER BY a DESC, b ASC")) + checkAnswer( + sql("SELECT * FROM testData2 SORT BY 1 DESC, 2"), + sql("SELECT * FROM testData2 SORT BY a DESC, b ASC")) + checkAnswer( + sql("SELECT * FROM testData2 ORDER BY 1 ASC, b ASC"), + Seq(Row(1, 1), Row(1, 2), Row(2, 1), Row(2, 2), Row(3, 1), Row(3, 2))) + } + + test("order by ordinal number - negative cases") { + intercept[UnresolvedException[SortOrder]] { + sql("SELECT * FROM testData2 ORDER BY 0") + } + intercept[UnresolvedException[SortOrder]] { + sql("SELECT * FROM testData2 ORDER BY -1 DESC, b ASC") + } + intercept[UnresolvedException[SortOrder]] { + sql("SELECT * FROM testData2 ORDER BY 3 DESC, b ASC") + } + } + + test("order by ordinal number with conf spark.sql.orderByOrdinal=false") { + withSQLConf(SQLConf.ORDER_BY_ORDINAL.key -> "false") { + // If spark.sql.orderByOrdinal=false, ignore the position number. + checkAnswer( + sql("SELECT * FROM testData2 ORDER BY 1 DESC, b ASC"), + sql("SELECT * FROM testData2 ORDER BY b ASC")) + } + } + test("natural join") { val df1 = Seq(("one", 1), ("two", 2), ("three", 3)).toDF("k", "v1") val df2 = Seq(("one", 1), ("two", 22), ("one", 5)).toDF("k", "v2")