Skip to content

Commit

Permalink
[SPARK-33400][SQL] Normalize sameOrderExpressions in SortOrder to avo…
Browse files Browse the repository at this point in the history
…id unnecessary sort operations

### What changes were proposed in this pull request?
This pull request tries to normalize the SortOrder properly to prevent unnecessary sort operators. Currently the sameOrderExpressions are not normalized as part of AliasAwareOutputOrdering.

Example: consider this join of three tables:

      """
        |SELECT t2id, t3.id as t3id
        |FROM (
        |    SELECT t1.id as t1id, t2.id as t2id
        |    FROM t1, t2
        |    WHERE t1.id = t2.id
        |) t12, t3
        |WHERE t1id = t3.id
      """.

The plan for this looks like:

      *(8) Project [t2id#1059L, id#1004L AS t3id#1060L]
      +- *(8) SortMergeJoin [t2id#1059L], [id#1004L], Inner
         :- *(5) Sort [t2id#1059L ASC NULLS FIRST ], false, 0         <-----------------------------
         :  +- *(5) Project [id#1000L AS t2id#1059L]
         :     +- *(5) SortMergeJoin [id#996L], [id#1000L], Inner
         :        :- *(2) Sort [id#996L ASC NULLS FIRST ], false, 0
         :        :  +- Exchange hashpartitioning(id#996L, 5), true, [id=#1426]
         :        :     +- *(1) Range (0, 10, step=1, splits=2)
         :        +- *(4) Sort [id#1000L ASC NULLS FIRST ], false, 0
         :           +- Exchange hashpartitioning(id#1000L, 5), true, [id=#1432]
         :              +- *(3) Range (0, 20, step=1, splits=2)
         +- *(7) Sort [id#1004L ASC NULLS FIRST ], false, 0
            +- Exchange hashpartitioning(id#1004L, 5), true, [id=#1443]
               +- *(6) Range (0, 30, step=1, splits=2)

In this plan, the marked sort node could have been avoided as the data is already sorted on "t2.id" by the lower SortMergeJoin.

### Why are the changes needed?
To remove unneeded Sort operators.

### Does this PR introduce any user-facing change?
No

### How was this patch tested?
New UT added.

Closes #30302 from prakharjain09/SPARK-33400-sortorder.

Authored-by: Prakhar Jain <[email protected]>
Signed-off-by: Wenchen Fan <[email protected]>
  • Loading branch information
prakharjain09 authored and cloud-fan committed Nov 19, 2020
1 parent 014e1fb commit 0b0fb70
Show file tree
Hide file tree
Showing 2 changed files with 36 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,11 @@ trait AliasAwareOutputOrdering extends AliasAwareOutputExpression {

final override def outputOrdering: Seq[SortOrder] = {
if (hasAlias) {
orderingExpressions.map(normalizeExpression(_).asInstanceOf[SortOrder])
orderingExpressions.map { sortOrder =>
val newSortOrder = normalizeExpression(sortOrder).asInstanceOf[SortOrder]
val newSameOrderExpressions = newSortOrder.sameOrderExpressions.map(normalizeExpression)
newSortOrder.copy(sameOrderExpressions = newSameOrderExpressions)
}
} else {
orderingExpressions
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1059,6 +1059,37 @@ class PlannerSuite extends SharedSparkSession with AdaptiveSparkPlanHelper {
}
}

test("SPARK-33400: Normalization of sortOrder should take care of sameOrderExprs") {
withSQLConf(SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "-1") {
withTempView("t1", "t2", "t3") {
spark.range(10).repartition($"id").createTempView("t1")
spark.range(20).repartition($"id").createTempView("t2")
spark.range(30).repartition($"id").createTempView("t3")
val planned = sql(
"""
|SELECT t2id, t3.id as t3id
|FROM (
| SELECT t1.id as t1id, t2.id as t2id
| FROM t1, t2
| WHERE t1.id = t2.id
|) t12, t3
|WHERE t2id = t3.id
""".stripMargin).queryExecution.executedPlan

val sortNodes = planned.collect { case s: SortExec => s }
assert(sortNodes.size == 3)

val projects = planned.collect { case p: ProjectExec => p }
assert(projects.exists(_.outputOrdering match {
case Seq(SortOrder(_, Ascending, NullsFirst, sameOrderExprs)) =>
sameOrderExprs.size == 1 && sameOrderExprs.head.isInstanceOf[AttributeReference] &&
sameOrderExprs.head.asInstanceOf[AttributeReference].name == "t2id"
case _ => false
}))
}
}
}

test("aliases to expressions should not be replaced") {
withSQLConf(SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "-1") {
withTempView("df1", "df2") {
Expand Down

0 comments on commit 0b0fb70

Please sign in to comment.