-
Notifications
You must be signed in to change notification settings - Fork 28.5k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[SPARK-12789] [SQL] Support Order By Ordinal in SQL #11815
Changes from 4 commits
aee1023
3342798
240aeb3
e0dce39
256814b
b04529b
d52e1c7
4d44b75
46e3f69
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -27,6 +27,7 @@ import org.apache.spark.sql.catalyst.{CatalystConf, ScalaReflection, SimpleCatal | |
import org.apache.spark.sql.catalyst.encoders.OuterScopes | ||
import org.apache.spark.sql.catalyst.expressions._ | ||
import org.apache.spark.sql.catalyst.expressions.aggregate._ | ||
import org.apache.spark.sql.catalyst.planning.IntegerIndex | ||
import org.apache.spark.sql.catalyst.plans._ | ||
import org.apache.spark.sql.catalyst.plans.logical._ | ||
import org.apache.spark.sql.catalyst.rules._ | ||
|
@@ -40,7 +41,10 @@ import org.apache.spark.sql.types._ | |
* references. | ||
*/ | ||
object SimpleAnalyzer | ||
extends Analyzer(EmptyCatalog, EmptyFunctionRegistry, new SimpleCatalystConf(true)) | ||
extends Analyzer( | ||
EmptyCatalog, | ||
EmptyFunctionRegistry, | ||
new SimpleCatalystConf(caseSensitiveAnalysis = true)) | ||
|
||
/** | ||
* Provides a logical query plan analyzer, which translates [[UnresolvedAttribute]]s and | ||
|
@@ -618,9 +622,33 @@ class Analyzer( | |
* clause. This rule detects such queries and adds the required attributes to the original | ||
* projection, so that they will be available during sorting. Another projection is added to | ||
* remove these attributes after sorting. | ||
* | ||
* This rule also resolves the position number in Sort references. This support is introduced | ||
* in Spark 2.0. Before Spark 2.0, the integers in ORDER BY had no effect on output sorting. ||
*/ | ||
object ResolveSortReferences extends Rule[LogicalPlan] { | ||
def apply(plan: LogicalPlan): LogicalPlan = plan resolveOperators { | ||
// Replace the index with the related attribute for ORDER BY | ||
// which is a 1-base position of the projection list. | ||
case s @ Sort(orders, global, child) | ||
if child.resolved && orders.exists(o => IntegerIndex.unapply(o.child).nonEmpty) => | ||
val newOrders = orders map { | ||
case s @ SortOrder(IntegerIndex(index), direction) => | ||
if (index > 0 && index <= child.output.size && conf.orderByOrdinal) { | ||
SortOrder(child.output(index - 1), direction) | ||
} else if (!conf.orderByOrdinal) { | ||
failAnalysis( | ||
"Integer in the Order/Sort By clause is not allowed " + | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. lower case order / sort There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. i'd say if the flag is set to false, we should just ignore the foldable constants. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Ok, I see. Will make the change. Thanks! |
||
"when spark.sql.orderByOrdinal is set to false") | ||
} else { | ||
throw new UnresolvedException(s, | ||
s"""Order/Sort By position: $index does not exist \n | ||
|The Select List is indexed from 1 to ${child.output.size}""".stripMargin) | ||
} | ||
case o => o | ||
} | ||
Sort(newOrders, global, child) | ||
|
||
// Skip sort with aggregate. This will be handled in ResolveAggregateFunctions | ||
case sa @ Sort(_, _, child: Aggregate) => sa | ||
|
||
|
Original file line number | Diff line number | Diff line change | ||
---|---|---|---|---|
|
@@ -24,6 +24,7 @@ import org.apache.spark.internal.Logging | |||
import org.apache.spark.sql.catalyst.expressions._ | ||||
import org.apache.spark.sql.catalyst.plans._ | ||||
import org.apache.spark.sql.catalyst.plans.logical._ | ||||
import org.apache.spark.sql.types.IntegerType | ||||
|
||||
/** | ||||
* A pattern that matches any number of project or filter operations on top of another relational | ||||
|
@@ -202,3 +203,14 @@ object Unions { | |||
} | ||||
} | ||||
} | ||||
|
||||
/** | ||||
* Extractor for retrieving Int value. | ||||
*/ | ||||
object IntegerIndex { | ||||
def unapply(a: Any): Option[Int] = a match { | ||||
case Literal(a: Int, IntegerType) => Some(a) | ||||
case UnaryMinus(IntegerLiteral(v)) => Some(-v) | ||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. i still don't get it - why are we matching unary minus ? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. As shown in the following link, our parser converts spark/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/CatalystQl.scala Line 641 in 637a78f
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. But why are we getting negative values? Ordinals can only be positive. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. When users input negative values, we just detect and then issue an exception message, like There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. ah ic so this is used to detect errors. i'd add some comment here explaining why we are having this. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. sure, will do. Thanks! |
||||
case _ => None | ||||
} | ||||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -435,6 +435,11 @@ object SQLConf { | |
defaultValue = Some(true), | ||
doc = "When false, we will treat bucketed table as normal table") | ||
|
||
val ORDER_BY_ORDINAL = booleanConf("spark.sql.orderByOrdinal", | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. There's a similar configuration value in hive since 0.11 and later, which is I think we also need to read from ConfVar for this config. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I don't mind just diverging from Hive over time and not support any of the config options. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Sounds good to me. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. @adrian-wang Thank you for your input! Glad to learn it. |
||
defaultValue = Some(true), | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Should we turn it off by default? It's not a standard feature and hive also turn it off by default. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. We are facing two options. Not sure which one is better. For RDBMS users, the default is true. |
||
doc = "When true, the ordinal number is treated as the position in the select list. " + | ||
"When false, the ordinal number in Order/Sort By clause is not allowed.") | ||
|
||
// The output committer class used by HadoopFsRelation. The specified class needs to be a | ||
// subclass of org.apache.hadoop.mapreduce.OutputCommitter. | ||
// | ||
|
@@ -631,6 +636,8 @@ class SQLConf extends Serializable with CatalystConf with ParserConf with Loggin | |
|
||
def supportSQL11ReservedKeywords: Boolean = getConf(PARSER_SUPPORT_SQL11_RESERVED_KEYWORDS) | ||
|
||
override def orderByOrdinal: Boolean = getConf(ORDER_BY_ORDINAL) | ||
|
||
/** ********************** SQLConf functionality methods ************ */ | ||
|
||
/** Set Spark SQL configuration properties. */ | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -21,6 +21,8 @@ import java.math.MathContext | |
import java.sql.Timestamp | ||
|
||
import org.apache.spark.AccumulatorSuite | ||
import org.apache.spark.sql.catalyst.analysis.UnresolvedException | ||
import org.apache.spark.sql.catalyst.expressions.SortOrder | ||
import org.apache.spark.sql.execution.aggregate | ||
import org.apache.spark.sql.execution.joins.{BroadcastHashJoin, CartesianProduct, SortMergeJoin} | ||
import org.apache.spark.sql.functions._ | ||
|
@@ -2156,6 +2158,48 @@ class SQLQuerySuite extends QueryTest with SharedSQLContext { | |
} | ||
} | ||
|
||
test("order by ordinal number") { | ||
checkAnswer( | ||
sql("SELECT * FROM testData2 ORDER BY 1 DESC"), | ||
Seq(Row(3, 1), Row(3, 2), Row(2, 1), Row(2, 2), Row(1, 1), Row(1, 2))) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. nit: change it to
which is more readable I think. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Sure, will do. Thanks! |
||
checkAnswer( | ||
sql("SELECT * FROM testData2 ORDER BY 1 DESC, b ASC"), | ||
sql("SELECT * FROM testData2 ORDER BY a DESC, b ASC")) | ||
checkAnswer( | ||
sql("SELECT * FROM testData2 SORT BY 1 DESC, 2"), | ||
sql("SELECT * FROM testData2 SORT BY a DESC, b ASC")) | ||
checkAnswer( | ||
sql("SELECT * FROM testData2 ORDER BY 1 ASC, b ASC"), | ||
Seq(Row(1, 1), Row(1, 2), Row(2, 1), Row(2, 2), Row(3, 1), Row(3, 2))) | ||
} | ||
|
||
test("order by ordinal number - negative cases") { | ||
val e = intercept[AnalysisException]( | ||
sql("SELECT * FROM testData2 ORDER BY 1 + 1 DESC, b ASC").collect()) | ||
assert(e.getMessage contains | ||
"cannot resolve '(1 + 1) DESC' due to data type mismatch: " + | ||
"sort position must be integer literals") | ||
intercept[UnresolvedException[SortOrder]] { | ||
sql("SELECT * FROM testData2 ORDER BY 0").collect() | ||
} | ||
intercept[UnresolvedException[SortOrder]] { | ||
sql("SELECT * FROM testData2 ORDER BY -1 DESC, b ASC").collect() | ||
} | ||
intercept[UnresolvedException[SortOrder]] { | ||
sql("SELECT * FROM testData2 ORDER BY 3 DESC, b ASC").collect() | ||
} | ||
} | ||
|
||
test("order by ordinal number with conf spark.sql.orderByOrdinal=false") { | ||
withSQLConf(SQLConf.ORDER_BY_ORDINAL.key -> "false") { | ||
val e = intercept[AnalysisException]( | ||
sql("SELECT * FROM testData2 ORDER BY 1 DESC, b ASC").collect()) | ||
assert(e.getMessage contains | ||
"Integer in the Order/Sort By clause is not allowed " + | ||
"when spark.sql.orderByOrdinal is set to false") | ||
} | ||
} | ||
|
||
test("natural join") { | ||
val df1 = Seq(("one", 1), ("two", 2), ("three", 3)).toDF("k", "v1") | ||
val df2 = Seq(("one", 1), ("two", 22), ("one", 5)).toDF("k", "v2") | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
last question: should we have 2 configs for order by and group by? Their behaviours are similar, maybe we can just use one config?
cc @rxin
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
do you have a good name?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
how about
positionAlias
? It's inspired by hive's config: hive.groupby.orderby.position.alias.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
yea the problem is that once you don't have the groupby or order by there, it is very unclear what position alias means.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Or how about simply call it
groupByOrderByOrdinal?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Doesn't seem like a great name ... if we can come up with a good name I think having a single config makes sense. If not, doesn't matter that much to have two.