Skip to content

Commit

Permalink
address comments.
Browse files Browse the repository at this point in the history
  • Loading branch information
gatorsmile committed Mar 18, 2016
1 parent e0dce39 commit 256814b
Show file tree
Hide file tree
Showing 7 changed files with 41 additions and 27 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -623,27 +623,26 @@ class Analyzer(
* projection, so that they will be available during sorting. Another projection is added to
* remove these attributes after sorting.
*
* This rule also resolves the position number in Sort references. This support is introduced
* This rule also resolves the position number in sort references. This support is introduced
* in Spark 2.0. Before Spark 2.0, the integers in Order By has no effect on output sorting.
* - When the sort references are not integer but foldable expressions, ignore them.
* - When spark.sql.orderByOrdinal is set to false, ignore the position numbers too.
*/
object ResolveSortReferences extends Rule[LogicalPlan] {
def apply(plan: LogicalPlan): LogicalPlan = plan resolveOperators {
// Replace the index with the related attribute for ORDER BY
// which is a 1-base position of the projection list.
case s @ Sort(orders, global, child)
if child.resolved && orders.exists(o => IntegerIndex.unapply(o.child).nonEmpty) =>
if conf.orderByOrdinal && child.resolved &&
orders.exists(o => IntegerIndex.unapply(o.child).nonEmpty) =>
val newOrders = orders map {
case s @ SortOrder(IntegerIndex(index), direction) =>
if (index > 0 && index <= child.output.size && conf.orderByOrdinal) {
if (index > 0 && index <= child.output.size) {
SortOrder(child.output(index - 1), direction)
} else if (!conf.orderByOrdinal) {
failAnalysis(
"Integer in the Order/Sort By clause is not allowed " +
"when spark.sql.orderByOrdinal is set to false")
} else {
throw new UnresolvedException(s,
s"""Order/Sort By position: $index does not exist \n
|The Select List is indexed from 1 to ${child.output.size}""".stripMargin)
"Order/sort By position: $index does not exist " +
"The Select List is indexed from 1 to ${child.output.size}")
}
case o => o
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ package org.apache.spark.sql.catalyst.expressions
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.analysis.TypeCheckResult
import org.apache.spark.sql.catalyst.expressions.codegen.{CodegenContext, ExprCode}
import org.apache.spark.sql.catalyst.planning.IntegerIndex
import org.apache.spark.sql.types._
import org.apache.spark.util.collection.unsafe.sort.PrefixComparators.BinaryPrefixComparator
import org.apache.spark.util.collection.unsafe.sort.PrefixComparators.DoublePrefixComparator
Expand Down Expand Up @@ -48,12 +47,10 @@ case class SortOrder(child: Expression, direction: SortDirection)
override def foldable: Boolean = false

override def checkInputDataTypes(): TypeCheckResult = {
if (!RowOrdering.isOrderable(dataType)) {
TypeCheckResult.TypeCheckFailure(s"cannot sort data type ${dataType.simpleString}")
} else if (child.references.isEmpty && IntegerIndex.unapply(child).isEmpty) {
TypeCheckResult.TypeCheckFailure(s"sort position must be integer literals")
} else {
if (RowOrdering.isOrderable(dataType)) {
TypeCheckResult.TypeCheckSuccess
} else {
TypeCheckResult.TypeCheckFailure(s"cannot sort data type ${dataType.simpleString}")
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -210,6 +210,7 @@ object Unions {
object IntegerIndex {
def unapply(a: Any): Option[Int] = a match {
case Literal(a: Int, IntegerType) => Some(a)
// When resolving ordinal in Sort, negative values are extracted for issuing error messages.
case UnaryMinus(IntegerLiteral(v)) => Some(-v)
case _ => None
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -201,6 +201,16 @@ class AnalysisSuite extends AnalysisTest {
checkAnalysis(plan, expected)
}

test("pull out nondeterministic expressions from Sort") {
val plan = Sort(Seq(SortOrder(Rand(33), Ascending)), false, testRelation)
val projected = Alias(Rand(33), "_nondeterministic")()
val expected =
Project(testRelation.output,
Sort(Seq(SortOrder(projected.toAttribute, Ascending)), false,
Project(testRelation.output :+ projected, testRelation)))
checkAnalysis(plan, expected)
}

test("SPARK-9634: cleanup unnecessary Aliases in LogicalPlan") {
val a = testRelation.output.head
var plan = testRelation.select(((a + 1).as("a+1") + 2).as("col"))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -437,8 +437,8 @@ object SQLConf {

val ORDER_BY_ORDINAL = booleanConf("spark.sql.orderByOrdinal",
defaultValue = Some(true),
doc = "When true, the ordinal number is treated as the position in the select list. " +
"When false, the ordinal number in Order/Sort By clause is not allowed.")
doc = "When true, the ordinal numbers are treated as the position in the select list. " +
"When false, the ordinal numbers in order/sort By clause are ignored.")

// The output committer class used by HadoopFsRelation. The specified class needs to be a
// subclass of org.apache.hadoop.mapreduce.OutputCommitter.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -952,6 +952,14 @@ class DataFrameSuite extends QueryTest with SharedSQLContext {
assert(df1.showString(5) == df1.showString(5))
}

test("SPARK-8609: local DataFrame with random columns should return same value after sort") {
checkAnswer(testData.sort(rand(33)), testData.sort(rand(33)))

// We will reuse the same Expression object for LocalRelation.
val df = (1 to 10).map(Tuple1.apply).toDF()
checkAnswer(df.sort(rand(33)), df.sort(rand(33)))
}

test("SPARK-9083: sort with non-deterministic expressions") {
import org.apache.spark.util.random.XORShiftRandom

Expand Down
19 changes: 9 additions & 10 deletions sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
Original file line number Diff line number Diff line change
Expand Up @@ -2162,6 +2162,11 @@ class SQLQuerySuite extends QueryTest with SharedSQLContext {
checkAnswer(
sql("SELECT * FROM testData2 ORDER BY 1 DESC"),
Seq(Row(3, 1), Row(3, 2), Row(2, 1), Row(2, 2), Row(1, 1), Row(1, 2)))
// If the position is not an integer, ignore it.
checkAnswer(
sql("SELECT * FROM testData2 ORDER BY 1 + 0 DESC, b ASC"),
Seq(Row(1, 1), Row(2, 1), Row(3, 1), Row(1, 2), Row(2, 2), Row(3, 2)))

checkAnswer(
sql("SELECT * FROM testData2 ORDER BY 1 DESC, b ASC"),
sql("SELECT * FROM testData2 ORDER BY a DESC, b ASC"))
Expand All @@ -2174,11 +2179,6 @@ class SQLQuerySuite extends QueryTest with SharedSQLContext {
}

test("order by ordinal number - negative cases") {
val e = intercept[AnalysisException](
sql("SELECT * FROM testData2 ORDER BY 1 + 1 DESC, b ASC").collect())
assert(e.getMessage contains
"cannot resolve '(1 + 1) DESC' due to data type mismatch: " +
"sort position must be integer literals")
intercept[UnresolvedException[SortOrder]] {
sql("SELECT * FROM testData2 ORDER BY 0").collect()
}
Expand All @@ -2192,11 +2192,10 @@ class SQLQuerySuite extends QueryTest with SharedSQLContext {

test("order by ordinal number with conf spark.sql.orderByOrdinal=false") {
withSQLConf(SQLConf.ORDER_BY_ORDINAL.key -> "false") {
val e = intercept[AnalysisException](
sql("SELECT * FROM testData2 ORDER BY 1 DESC, b ASC").collect())
assert(e.getMessage contains
"Integer in the Order/Sort By clause is not allowed " +
"when spark.sql.orderByOrdinal is set to false")
// If spark.sql.orderByOrdinal=false, ignore the position number.
checkAnswer(
sql("SELECT * FROM testData2 ORDER BY 1 DESC, b ASC"),
Seq(Row(1, 1), Row(2, 1), Row(3, 1), Row(1, 2), Row(2, 2), Row(3, 2)))
}
}

Expand Down

0 comments on commit 256814b

Please sign in to comment.