[SPARK-12789] [SQL] Support Order By Ordinal in SQL #11815

Closed · wants to merge 9 commits
Changes from 7 commits
@@ -22,6 +22,8 @@ import org.apache.spark.sql.catalyst.analysis._
private[spark] trait CatalystConf {
def caseSensitiveAnalysis: Boolean

def orderByOrdinal: Boolean
Contributor: last question: should we have 2 configs for order by and group by? Their behaviours are similar, maybe we can just use one config?

cc @rxin

Contributor: do you have a good name?

Contributor: how about positionAlias? It's inspired by Hive's config: hive.groupby.orderby.position.alias.

Contributor: yea, the problem is that once you don't have the group by or order by there, it is very unclear what position alias means.

Contributor: Or how about simply calling it groupByOrderByOrdinal?

Contributor: Doesn't seem like a great name ... if we can come up with a good name, I think having a single config makes sense. If not, it doesn't matter that much to have two.


/**
* Returns the [[Resolver]] for the current configuration, which can be used to determine if two
* identifiers are equal.
@@ -43,8 +45,14 @@ object EmptyConf extends CatalystConf {
override def caseSensitiveAnalysis: Boolean = {
throw new UnsupportedOperationException
}
override def orderByOrdinal: Boolean = {
throw new UnsupportedOperationException
}
}

/** A CatalystConf that can be used for local testing. */
case class SimpleCatalystConf(caseSensitiveAnalysis: Boolean) extends CatalystConf {
case class SimpleCatalystConf(
caseSensitiveAnalysis: Boolean,
orderByOrdinal: Boolean = true)
extends CatalystConf {
}
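
For reference, a minimal usage sketch (not part of the patch) of constructing the test conf with the new flag, assuming only the SimpleCatalystConf signature shown above:

import org.apache.spark.sql.catalyst.SimpleCatalystConf

// orderByOrdinal defaults to true, per the case class definition in this diff.
val defaultConf = SimpleCatalystConf(caseSensitiveAnalysis = true)
val ordinalOff = SimpleCatalystConf(caseSensitiveAnalysis = true, orderByOrdinal = false)
assert(defaultConf.orderByOrdinal)  // true by default
assert(!ordinalOff.orderByOrdinal)  // explicitly disabled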
@@ -27,6 +27,7 @@ import org.apache.spark.sql.catalyst.{CatalystConf, ScalaReflection, SimpleCatal
import org.apache.spark.sql.catalyst.encoders.OuterScopes
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.expressions.aggregate._
import org.apache.spark.sql.catalyst.planning.IntegerIndex
import org.apache.spark.sql.catalyst.plans._
import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.catalyst.rules._
@@ -40,7 +41,10 @@ import org.apache.spark.sql.types._
* references.
*/
object SimpleAnalyzer
extends Analyzer(EmptyCatalog, EmptyFunctionRegistry, new SimpleCatalystConf(true))
extends Analyzer(
EmptyCatalog,
EmptyFunctionRegistry,
new SimpleCatalystConf(caseSensitiveAnalysis = true))

/**
* Provides a logical query plan analyzer, which translates [[UnresolvedAttribute]]s and
@@ -618,13 +622,36 @@ class Analyzer(
* clause. This rule detects such queries and adds the required attributes to the original
* projection, so that they will be available during sorting. Another projection is added to
* remove these attributes after sorting.
*
* This rule also resolves ordinal positions in sort references. This support was introduced
* in Spark 2.0. Before Spark 2.0, integers in ORDER BY had no effect on the output sorting.
* - When the sort references are foldable expressions rather than integer literals, ignore them.
* - When spark.sql.orderByOrdinal is set to false, ignore the position numbers too.
*/
object ResolveSortReferences extends Rule[LogicalPlan] {
def apply(plan: LogicalPlan): LogicalPlan = plan resolveOperators {
case s: Sort if !s.child.resolved => s
// Replace the index with the corresponding attribute for ORDER BY,
// where the index is a 1-based position in the projection list.
case s @ Sort(orders, global, child)
if conf.orderByOrdinal && orders.exists(o => IntegerIndex.unapply(o.child).nonEmpty) =>
val newOrders = orders map {
case s @ SortOrder(IntegerIndex(index), direction) =>
if (index > 0 && index <= child.output.size) {
SortOrder(child.output(index - 1), direction)
} else {
throw new UnresolvedException(s,
"Order/sort By position: $index does not exist " +
Contributor: seems you missed an s at the head? Should be s"... $index..."

Member Author: True. Let me correct it. Thanks!

"The Select List is indexed from 1 to ${child.output.size}")
}
case o => o
}
Sort(newOrders, global, child)

// Skip sort with aggregate. This will be handled in ResolveAggregateFunctions
case sa @ Sort(_, _, child: Aggregate) => sa

case s @ Sort(order, _, child) if !s.resolved && child.resolved =>
case s @ Sort(order, _, child) if !s.resolved =>
try {
val newOrder = order.map(resolveExpressionRecursively(_, child).asInstanceOf[SortOrder])
val requiredAttrs = AttributeSet(newOrder).filter(_.resolved)
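To make the rewrite concrete, here is a hedged sketch (illustrative, not from the patch) of what the rule does to an ordinal sort over a two-column relation, assuming the catalyst test DSL is on the classpath:

import org.apache.spark.sql.catalyst.dsl.expressions._
import org.apache.spark.sql.catalyst.expressions.{Ascending, Literal, SortOrder}
import org.apache.spark.sql.catalyst.plans.logical.{LocalRelation, Sort}

// SELECT a, b FROM t ORDER BY 2 parses to a Sort over the ordinal literal 2.
val relation = LocalRelation('a.int, 'b.int)
val unresolved = Sort(Seq(SortOrder(Literal(2), Ascending)), global = true, relation)
// With orderByOrdinal = true, ResolveSortReferences matches IntegerIndex(2),
// checks 0 < 2 <= relation.output.size, and substitutes the second output
// attribute, producing the equivalent of ORDER BY b ASC:
//   Sort(Seq(SortOrder(b, Ascending)), global = true, relation)
// Out-of-range ordinals (e.g. 0, -1, or 3 here) throw UnresolvedException.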
@@ -24,6 +24,7 @@ import org.apache.spark.internal.Logging
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.plans._
import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.types.IntegerType

/**
* A pattern that matches any number of project or filter operations on top of another relational
@@ -202,3 +203,15 @@ object Unions {
}
}
}

/**
* Extractor for retrieving an Int value.
*/
object IntegerIndex {
def unapply(a: Any): Option[Int] = a match {
case Literal(a: Int, IntegerType) => Some(a)
// When resolving ordinals in Sort, negative values are extracted so that error messages can report them.
case UnaryMinus(IntegerLiteral(v)) => Some(-v)
Contributor: i still don't get it - why are we matching unary minus?

Member Author: As shown in the following line, our parser converts -3 to UnaryMinus(Literal(3)). Thus, we have to use this to get negative values.

case Token("-", child :: Nil) => UnaryMinus(nodeToExpr(child))

Contributor: But why are we getting negative values? Ordinals can only be positive.

Contributor: ah I see, so this is used to detect errors. I'd add some comment here explaining why we have this.

Member Author: sure, will do. Thanks!

case _ => None
}
}
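
A small usage sketch (illustrative, not part of the patch) of the two literal shapes the extractor accepts:

import org.apache.spark.sql.catalyst.expressions.{Literal, UnaryMinus}
import org.apache.spark.sql.catalyst.planning.IntegerIndex

// ORDER BY 3 parses to Literal(3), which is matched directly.
assert(IntegerIndex.unapply(Literal(3)) == Some(3))
// ORDER BY -3 parses to UnaryMinus(Literal(3)); matching it lets the
// analyzer report the negative ordinal in its error message.
assert(IntegerIndex.unapply(UnaryMinus(Literal(3))) == Some(-3))
// Non-integer expressions do not match.
assert(IntegerIndex.unapply(Literal("3")).isEmpty)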
@@ -25,8 +25,8 @@ import org.apache.spark.sql.catalyst.plans.logical._
trait AnalysisTest extends PlanTest {

val (caseSensitiveAnalyzer, caseInsensitiveAnalyzer) = {
val caseSensitiveConf = new SimpleCatalystConf(true)
val caseInsensitiveConf = new SimpleCatalystConf(false)
val caseSensitiveConf = new SimpleCatalystConf(caseSensitiveAnalysis = true)
val caseInsensitiveConf = new SimpleCatalystConf(caseSensitiveAnalysis = false)

val caseSensitiveCatalog = new SimpleCatalog(caseSensitiveConf)
val caseInsensitiveCatalog = new SimpleCatalog(caseInsensitiveConf)
@@ -30,7 +30,7 @@ import org.apache.spark.sql.types._


class DecimalPrecisionSuite extends PlanTest with BeforeAndAfter {
val conf = new SimpleCatalystConf(true)
val conf = new SimpleCatalystConf(caseSensitiveAnalysis = true)
val catalog = new SimpleCatalog(conf)
val analyzer = new Analyzer(catalog, EmptyFunctionRegistry, conf)

@@ -110,7 +110,10 @@ class BooleanSimplificationSuite extends PlanTest with PredicateHelper {
}

private val caseInsensitiveAnalyzer =
new Analyzer(EmptyCatalog, EmptyFunctionRegistry, new SimpleCatalystConf(false))
new Analyzer(
EmptyCatalog,
EmptyFunctionRegistry,
new SimpleCatalystConf(caseSensitiveAnalysis = false))

test("(a && b) || (a && c) => a && (b || c) when case insensitive") {
val plan = caseInsensitiveAnalyzer.execute(
@@ -435,6 +435,11 @@ object SQLConf {
defaultValue = Some(true),
doc = "When false, we will treat bucketed table as normal table")

val ORDER_BY_ORDINAL = booleanConf("spark.sql.orderByOrdinal",
Contributor: There's a similar configuration value in Hive since 0.11 and later, hive.groupby.orderby.position.alias (default: false).
See: https://cwiki.apache.org/confluence/display/Hive/LanguageManual+SortBy#LanguageManualSortBy-SyntaxofOrderBy

I think we also need to read from ConfVar for this config.

Contributor: I don't mind just diverging from Hive over time and not supporting any of the config options.

Contributor: Sounds good to me.

Member Author: @adrian-wang Thank you for your input! Glad to learn it.

defaultValue = Some(true),
Contributor: Should we turn it off by default? It's not a standard feature, and Hive also turns it off by default.

Member Author: We are facing two options. Not sure which one is better. For RDBMS users, the default is true.

doc = "When true, the ordinal numbers are treated as the position in the select list. " +
"When false, the ordinal numbers in order/sort By clause are ignored.")

// The output committer class used by HadoopFsRelation. The specified class needs to be a
// subclass of org.apache.hadoop.mapreduce.OutputCommitter.
//
@@ -631,6 +636,8 @@ class SQLConf extends Serializable with CatalystConf with ParserConf with Loggin

def supportSQL11ReservedKeywords: Boolean = getConf(PARSER_SUPPORT_SQL11_RESERVED_KEYWORDS)

override def orderByOrdinal: Boolean = getConf(ORDER_BY_ORDINAL)

/** ********************** SQLConf functionality methods ************ */

/** Set Spark SQL configuration properties. */
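For illustration, a hedged sketch of toggling the new conf from a session; the sqlContext variable and the table t(a, b) are assumptions, not from the patch:

// Assumes a SQLContext named sqlContext and a registered table t(a, b).
sqlContext.setConf("spark.sql.orderByOrdinal", "true")
sqlContext.sql("SELECT a, b FROM t ORDER BY 2 DESC")  // sorts by column b
sqlContext.setConf("spark.sql.orderByOrdinal", "false")
sqlContext.sql("SELECT a, b FROM t ORDER BY 2 DESC")  // ordinal 2 is ignored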
43 changes: 43 additions & 0 deletions sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
@@ -21,6 +21,8 @@ import java.math.MathContext
import java.sql.Timestamp

import org.apache.spark.AccumulatorSuite
import org.apache.spark.sql.catalyst.analysis.UnresolvedException
import org.apache.spark.sql.catalyst.expressions.SortOrder
import org.apache.spark.sql.execution.aggregate
import org.apache.spark.sql.execution.joins.{BroadcastHashJoin, CartesianProduct, SortMergeJoin}
import org.apache.spark.sql.functions._
@@ -2156,6 +2158,47 @@ class SQLQuerySuite extends QueryTest with SharedSQLContext {
}
}

test("order by ordinal number") {
checkAnswer(
sql("SELECT * FROM testData2 ORDER BY 1 DESC"),
sql("SELECT * FROM testData2 ORDER BY a DESC"))
// If the sort reference is not an integer literal but a foldable expression, ignore it.
checkAnswer(
sql("SELECT * FROM testData2 ORDER BY 1 + 0 DESC, b ASC"),
sql("SELECT * FROM testData2 ORDER BY b ASC"))

checkAnswer(
sql("SELECT * FROM testData2 ORDER BY 1 DESC, b ASC"),
sql("SELECT * FROM testData2 ORDER BY a DESC, b ASC"))
checkAnswer(
sql("SELECT * FROM testData2 SORT BY 1 DESC, 2"),
sql("SELECT * FROM testData2 SORT BY a DESC, b ASC"))
checkAnswer(
sql("SELECT * FROM testData2 ORDER BY 1 ASC, b ASC"),
Seq(Row(1, 1), Row(1, 2), Row(2, 1), Row(2, 2), Row(3, 1), Row(3, 2)))
}

test("order by ordinal number - negative cases") {
intercept[UnresolvedException[SortOrder]] {
sql("SELECT * FROM testData2 ORDER BY 0")
}
intercept[UnresolvedException[SortOrder]] {
sql("SELECT * FROM testData2 ORDER BY -1 DESC, b ASC")
}
intercept[UnresolvedException[SortOrder]] {
sql("SELECT * FROM testData2 ORDER BY 3 DESC, b ASC")
}
}

test("order by ordinal number with conf spark.sql.orderByOrdinal=false") {
withSQLConf(SQLConf.ORDER_BY_ORDINAL.key -> "false") {
// If spark.sql.orderByOrdinal=false, ignore the position number.
checkAnswer(
sql("SELECT * FROM testData2 ORDER BY 1 DESC, b ASC"),
sql("SELECT * FROM testData2 ORDER BY b ASC"))
}
}

test("natural join") {
val df1 = Seq(("one", 1), ("two", 2), ("three", 3)).toDF("k", "v1")
val df2 = Seq(("one", 1), ("two", 22), ("one", 5)).toDF("k", "v2")