[SPARK-12789] [SQL] Support Order By Ordinal in SQL #11815

Closed · wants to merge 9 commits
Changes from 4 commits
@@ -22,6 +22,8 @@ import org.apache.spark.sql.catalyst.analysis._
private[spark] trait CatalystConf {
def caseSensitiveAnalysis: Boolean

def orderByOrdinal: Boolean
Contributor:
last question: should we have 2 configs for order by and group by? Their behaviours are similar, maybe we can just use one config?

cc @rxin

Contributor:
do you have a good name?

Contributor:
how about positionAlias? It's inspired by hive's config: hive.groupby.orderby.position.alias.

Contributor:
yea the problem is that once you don't have the groupby or order by there, it is very unclear what position alias means.

Contributor:
Or how about simply call it groupByOrderByOrdinal?

Contributor:
Doesn't seem like a great name ... if we can come up with a good name I think having a single config makes sense. If not, doesn't matter that much to have two.


/**
* Returns the [[Resolver]] for the current configuration, which can be used to determine if two
* identifiers are equal.
@@ -43,8 +45,14 @@ object EmptyConf extends CatalystConf {
override def caseSensitiveAnalysis: Boolean = {
throw new UnsupportedOperationException
}
override def orderByOrdinal: Boolean = {
throw new UnsupportedOperationException
}
}

/** A CatalystConf that can be used for local testing. */
case class SimpleCatalystConf(caseSensitiveAnalysis: Boolean) extends CatalystConf {
case class SimpleCatalystConf(
caseSensitiveAnalysis: Boolean,
orderByOrdinal: Boolean = true)
extends CatalystConf {
}
@@ -27,6 +27,7 @@ import org.apache.spark.sql.catalyst.{CatalystConf, ScalaReflection, SimpleCatalystConf}
import org.apache.spark.sql.catalyst.encoders.OuterScopes
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.expressions.aggregate._
import org.apache.spark.sql.catalyst.planning.IntegerIndex
import org.apache.spark.sql.catalyst.plans._
import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.catalyst.rules._
Expand All @@ -40,7 +41,10 @@ import org.apache.spark.sql.types._
* references.
*/
object SimpleAnalyzer
extends Analyzer(EmptyCatalog, EmptyFunctionRegistry, new SimpleCatalystConf(true))
extends Analyzer(
EmptyCatalog,
EmptyFunctionRegistry,
new SimpleCatalystConf(caseSensitiveAnalysis = true))

/**
* Provides a logical query plan analyzer, which translates [[UnresolvedAttribute]]s and
@@ -618,9 +622,33 @@
* clause. This rule detects such queries and adds the required attributes to the original
* projection, so that they will be available during sorting. Another projection is added to
* remove these attributes after sorting.
*
* This rule also resolves ordinal positions in Sort references. This support is introduced
* in Spark 2.0. Before Spark 2.0, integers in ORDER BY had no effect on the output ordering.
*/
object ResolveSortReferences extends Rule[LogicalPlan] {
def apply(plan: LogicalPlan): LogicalPlan = plan resolveOperators {
// Replace the ordinal with the corresponding attribute for ORDER BY,
// where the ordinal is a 1-based position into the projection list.
case s @ Sort(orders, global, child)
if child.resolved && orders.exists(o => IntegerIndex.unapply(o.child).nonEmpty) =>
val newOrders = orders map {
case s @ SortOrder(IntegerIndex(index), direction) =>
if (index > 0 && index <= child.output.size && conf.orderByOrdinal) {
SortOrder(child.output(index - 1), direction)
} else if (!conf.orderByOrdinal) {
failAnalysis(
"Integer in the Order/Sort By clause is not allowed " +
Contributor:
lower case order / sort

Contributor:
i'd say if the flag is set to false, we should just ignore the foldable constants.

Member Author:
Ok, I see. Will make the change. Thanks!

"when spark.sql.orderByOrdinal is set to false")
} else {
throw new UnresolvedException(s,
s"""Order/Sort By position: $index does not exist \n
|The Select List is indexed from 1 to ${child.output.size}""".stripMargin)
}
case o => o
}
Sort(newOrders, global, child)

// Skip sort with aggregate. This will be handled in ResolveAggregateFunctions
case sa @ Sort(_, _, child: Aggregate) => sa

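A minimal self-contained Scala sketch of the resolution step above; the SortKey, ByColumn, ByOrdinal, and resolveOrdinals names are hypothetical, not Catalyst's API. An in-range 1-based ordinal is swapped for the output attribute at that position, and an out-of-range one is rejected with a position error, mirroring the rule's two branches.

object OrdinalResolutionSketch {
  sealed trait SortKey
  case class ByColumn(name: String) extends SortKey
  case class ByOrdinal(pos: Int) extends SortKey // 1-based, as in SQL

  // Replaces in-range ordinals with column references; rejects out-of-range ones.
  def resolveOrdinals(output: Seq[String], keys: Seq[SortKey]): Seq[SortKey] =
    keys.map {
      case ByOrdinal(i) if i >= 1 && i <= output.size => ByColumn(output(i - 1))
      case ByOrdinal(i) =>
        throw new IllegalArgumentException(
          s"Order/Sort By position $i does not exist; the select list is indexed from 1 to ${output.size}")
      case other => other
    }

  def main(args: Array[String]): Unit = {
    // SELECT a, b FROM t ORDER BY 1 DESC, b  resolves to  ORDER BY a DESC, b
    println(resolveOrdinals(Seq("a", "b"), Seq(ByOrdinal(1), ByColumn("b"))))
  }
}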
@@ -20,6 +20,7 @@ package org.apache.spark.sql.catalyst.expressions
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.analysis.TypeCheckResult
import org.apache.spark.sql.catalyst.expressions.codegen.{CodegenContext, ExprCode}
import org.apache.spark.sql.catalyst.planning.IntegerIndex
import org.apache.spark.sql.types._
import org.apache.spark.util.collection.unsafe.sort.PrefixComparators.BinaryPrefixComparator
import org.apache.spark.util.collection.unsafe.sort.PrefixComparators.DoublePrefixComparator
@@ -47,10 +48,12 @@ case class SortOrder(child: Expression, direction: SortDirection)
override def foldable: Boolean = false

override def checkInputDataTypes(): TypeCheckResult = {
if (RowOrdering.isOrderable(dataType)) {
TypeCheckResult.TypeCheckSuccess
} else {
if (!RowOrdering.isOrderable(dataType)) {
TypeCheckResult.TypeCheckFailure(s"cannot sort data type ${dataType.simpleString}")
} else if (child.references.isEmpty && IntegerIndex.unapply(child).isEmpty) {
TypeCheckResult.TypeCheckFailure(s"sort position must be integer literals")
} else {
TypeCheckResult.TypeCheckSuccess
}
}

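A toy sketch of the check order introduced above, with hypothetical stand-in types (Key, check); this is not the Catalyst code, only its decision logic: orderability is tested first, and a sort key that references no input columns must be a bare integer literal (an ordinal candidate), otherwise it fails with the same message the tests below assert on.

object SortKeyCheckSketch {
  sealed trait Result
  case object Success extends Result
  case class Failure(message: String) extends Result

  // Stand-in for a sort key: is its type orderable, does it reference any
  // input columns, and is it a bare integer literal?
  case class Key(orderable: Boolean, referencesColumns: Boolean, intLiteral: Boolean)

  def check(k: Key): Result =
    if (!k.orderable) Failure("cannot sort data type")
    else if (!k.referencesColumns && !k.intLiteral) Failure("sort position must be integer literals")
    else Success

  def main(args: Array[String]): Unit = {
    // e.g. ORDER BY 1 + 1: orderable, references nothing, not a bare literal
    println(check(Key(orderable = true, referencesColumns = false, intLiteral = false)))
  }
}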
@@ -24,6 +24,7 @@ import org.apache.spark.internal.Logging
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.plans._
import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.types.IntegerType

/**
* A pattern that matches any number of project or filter operations on top of another relational
@@ -202,3 +203,14 @@ object Unions {
}
}
}

/**
* Extractor for retrieving an Int value.
*/
object IntegerIndex {
def unapply(a: Any): Option[Int] = a match {
case Literal(a: Int, IntegerType) => Some(a)
case UnaryMinus(IntegerLiteral(v)) => Some(-v)
Contributor:
i still don't get it - why are we matching unary minus?

Member Author:
As shown in the snippet below, our parser converts -3 to UnaryMinus(Literal(3)). Thus, we have to use this to get negative values.

case Token("-", child :: Nil) => UnaryMinus(nodeToExpr(child))

Contributor:
But why are we getting negative values? Ordinals can only be positive.

Contributor:
ah ic so this is used to detect errors. i'd add some comment here explaining why we are having this.

Member Author:
sure, will do. Thanks!

case _ => None
}
}
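The thread above hinges on how Scala extractor objects behave; below is a self-contained sketch over a hypothetical toy Expr hierarchy (Lit, Neg, and IntIndex are illustration-only names). An object whose unapply returns Option[Int] can be used directly in match patterns, and a separate negation case is needed because the parser represents -3 as UnaryMinus(Literal(3)) rather than Literal(-3); matching it lets negative ordinals surface so they can be rejected with a clear position error instead of silently failing to match.

object ExtractorSketch {
  sealed trait Expr
  case class Lit(value: Int) extends Expr
  case class Neg(child: Expr) extends Expr

  object IntIndex {
    def unapply(e: Expr): Option[Int] = e match {
      case Lit(v)      => Some(v)
      case Neg(Lit(v)) => Some(-v) // surfaces -3 so callers can reject it explicitly
      case _           => None
    }
  }

  def main(args: Array[String]): Unit = {
    Seq[Expr](Lit(2), Neg(Lit(3)), Neg(Neg(Lit(4)))).foreach {
      case IntIndex(i) => println(s"integer index candidate: $i") // prints 2, then -3
      case _           => println("not an integer index")         // Neg(Neg(Lit(4)))
    }
  }
}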
@@ -201,16 +201,6 @@ class AnalysisSuite extends AnalysisTest {
checkAnalysis(plan, expected)
}

test("pull out nondeterministic expressions from Sort") {
val plan = Sort(Seq(SortOrder(Rand(33), Ascending)), false, testRelation)
val projected = Alias(Rand(33), "_nondeterministic")()
val expected =
Project(testRelation.output,
Sort(Seq(SortOrder(projected.toAttribute, Ascending)), false,
Project(testRelation.output :+ projected, testRelation)))
checkAnalysis(plan, expected)
}

test("SPARK-9634: cleanup unnecessary Aliases in LogicalPlan") {
val a = testRelation.output.head
var plan = testRelation.select(((a + 1).as("a+1") + 2).as("col"))
@@ -25,8 +25,8 @@ import org.apache.spark.sql.catalyst.plans.logical._
trait AnalysisTest extends PlanTest {

val (caseSensitiveAnalyzer, caseInsensitiveAnalyzer) = {
val caseSensitiveConf = new SimpleCatalystConf(true)
val caseInsensitiveConf = new SimpleCatalystConf(false)
val caseSensitiveConf = new SimpleCatalystConf(caseSensitiveAnalysis = true)
val caseInsensitiveConf = new SimpleCatalystConf(caseSensitiveAnalysis = false)

val caseSensitiveCatalog = new SimpleCatalog(caseSensitiveConf)
val caseInsensitiveCatalog = new SimpleCatalog(caseInsensitiveConf)
@@ -30,7 +30,7 @@ import org.apache.spark.sql.types._


class DecimalPrecisionSuite extends PlanTest with BeforeAndAfter {
val conf = new SimpleCatalystConf(true)
val conf = new SimpleCatalystConf(caseSensitiveAnalysis = true)
val catalog = new SimpleCatalog(conf)
val analyzer = new Analyzer(catalog, EmptyFunctionRegistry, conf)

@@ -110,7 +110,10 @@ class BooleanSimplificationSuite extends PlanTest with PredicateHelper {
}

private val caseInsensitiveAnalyzer =
new Analyzer(EmptyCatalog, EmptyFunctionRegistry, new SimpleCatalystConf(false))
new Analyzer(
EmptyCatalog,
EmptyFunctionRegistry,
new SimpleCatalystConf(caseSensitiveAnalysis = false))

test("(a && b) || (a && c) => a && (b || c) when case insensitive") {
val plan = caseInsensitiveAnalyzer.execute(
@@ -435,6 +435,11 @@ object SQLConf {
defaultValue = Some(true),
doc = "When false, we will treat bucketed table as normal table")

val ORDER_BY_ORDINAL = booleanConf("spark.sql.orderByOrdinal",
Contributor:
There's a similar configuration value in Hive since 0.11, hive.groupby.orderby.position.alias (false by default).
See: https://cwiki.apache.org/confluence/display/Hive/LanguageManual+SortBy#LanguageManualSortBy-SyntaxofOrderBy

I think we also need to read from ConfVar for this config.

Contributor:
I don't mind just diverging from Hive over time and not supporting any of the config options.

Contributor:
Sounds good to me.

Member Author:
@adrian-wang Thank you for your input! Glad to learn it.

defaultValue = Some(true),
Contributor:
Should we turn it off by default? It's not a standard feature, and Hive also turns it off by default.

Member Author:
We are facing two options. Not sure which one is better. For RDBMS users, the default is true.

doc = "When true, the ordinal number is treated as the position in the select list. " +
"When false, the ordinal number in Order/Sort By clause is not allowed.")

// The output committer class used by HadoopFsRelation. The specified class needs to be a
// subclass of org.apache.hadoop.mapreduce.OutputCommitter.
//
@@ -631,6 +636,8 @@ class SQLConf extends Serializable with CatalystConf with ParserConf with Logging

def supportSQL11ReservedKeywords: Boolean = getConf(PARSER_SUPPORT_SQL11_RESERVED_KEYWORDS)

override def orderByOrdinal: Boolean = getConf(ORDER_BY_ORDINAL)

/** ********************** SQLConf functionality methods ************ */

/** Set Spark SQL configuration properties. */
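A hedged usage sketch of the new flag, assuming a Spark 2.0-era SQLContext named sqlContext and the testData2 table (columns a, b) from the tests below:

sqlContext.setConf("spark.sql.orderByOrdinal", "true")        // the default
sqlContext.sql("SELECT a, b FROM testData2 ORDER BY 1 DESC")  // sorts by column a
sqlContext.setConf("spark.sql.orderByOrdinal", "false")
// Now ORDER BY 1 DESC fails analysis with the message added in this patch.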
@@ -952,14 +952,6 @@ class DataFrameSuite extends QueryTest with SharedSQLContext {
assert(df1.showString(5) == df1.showString(5))
}

test("SPARK-8609: local DataFrame with random columns should return same value after sort") {
checkAnswer(testData.sort(rand(33)), testData.sort(rand(33)))

// We will reuse the same Expression object for LocalRelation.
val df = (1 to 10).map(Tuple1.apply).toDF()
checkAnswer(df.sort(rand(33)), df.sort(rand(33)))
}

test("SPARK-9083: sort with non-deterministic expressions") {
import org.apache.spark.util.random.XORShiftRandom

sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala (44 additions, 0 deletions)
@@ -21,6 +21,8 @@ import java.math.MathContext
import java.sql.Timestamp

import org.apache.spark.AccumulatorSuite
import org.apache.spark.sql.catalyst.analysis.UnresolvedException
import org.apache.spark.sql.catalyst.expressions.SortOrder
import org.apache.spark.sql.execution.aggregate
import org.apache.spark.sql.execution.joins.{BroadcastHashJoin, CartesianProduct, SortMergeJoin}
import org.apache.spark.sql.functions._
@@ -2156,6 +2158,48 @@ class SQLQuerySuite extends QueryTest with SharedSQLContext {
}
}

test("order by ordinal number") {
checkAnswer(
sql("SELECT * FROM testData2 ORDER BY 1 DESC"),
Seq(Row(3, 1), Row(3, 2), Row(2, 1), Row(2, 2), Row(1, 1), Row(1, 2)))
Contributor:
nit: change it to

checkAnswer(
  sql("SELECT * FROM testData2 ORDER BY 1 DESC"),
  sql("SELECT * FROM testData2 ORDER BY a"))

which is more readable I think.

Member Author:
Sure, will do. Thanks!

checkAnswer(
sql("SELECT * FROM testData2 ORDER BY 1 DESC, b ASC"),
sql("SELECT * FROM testData2 ORDER BY a DESC, b ASC"))
checkAnswer(
sql("SELECT * FROM testData2 SORT BY 1 DESC, 2"),
sql("SELECT * FROM testData2 SORT BY a DESC, b ASC"))
checkAnswer(
sql("SELECT * FROM testData2 ORDER BY 1 ASC, b ASC"),
Seq(Row(1, 1), Row(1, 2), Row(2, 1), Row(2, 2), Row(3, 1), Row(3, 2)))
}

test("order by ordinal number - negative cases") {
val e = intercept[AnalysisException](
sql("SELECT * FROM testData2 ORDER BY 1 + 1 DESC, b ASC").collect())
assert(e.getMessage contains
"cannot resolve '(1 + 1) DESC' due to data type mismatch: " +
"sort position must be integer literals")
intercept[UnresolvedException[SortOrder]] {
sql("SELECT * FROM testData2 ORDER BY 0").collect()
}
intercept[UnresolvedException[SortOrder]] {
sql("SELECT * FROM testData2 ORDER BY -1 DESC, b ASC").collect()
}
intercept[UnresolvedException[SortOrder]] {
sql("SELECT * FROM testData2 ORDER BY 3 DESC, b ASC").collect()
}
}

test("order by ordinal number with conf spark.sql.orderByOrdinal=false") {
withSQLConf(SQLConf.ORDER_BY_ORDINAL.key -> "false") {
val e = intercept[AnalysisException](
sql("SELECT * FROM testData2 ORDER BY 1 DESC, b ASC").collect())
assert(e.getMessage contains
"Integer in the Order/Sort By clause is not allowed " +
"when spark.sql.orderByOrdinal is set to false")
}
}

test("natural join") {
val df1 = Seq(("one", 1), ("two", 2), ("three", 3)).toDF("k", "v1")
val df2 = Seq(("one", 1), ("two", 22), ("one", 5)).toDF("k", "v2")