
[SPARK-42049][SQL][FOLLOWUP] Always filter away invalid ordering/partitioning #40137

Closed · wants to merge 2 commits
@@ -95,7 +95,7 @@ trait AliasAwareQueryOutputOrdering[T <: QueryPlan[T]]
   }
 
   override final def outputOrdering: Seq[SortOrder] = {
-    if (hasAlias) {
+    val newOrdering: Iterator[Option[SortOrder]] = if (hasAlias) {
       // Take the first `SortOrder`s only until they can be projected.
       // E.g. we have child ordering `Seq(SortOrder(a), SortOrder(b))` then
       // if only `a AS x` can be projected then we can return `Seq(SortOrder(x))`
@@ -112,9 +112,21 @@ trait AliasAwareQueryOutputOrdering[T <: QueryPlan[T]]
         } else {
           None
         }
-      }.takeWhile(_.isDefined).flatten.toSeq
+      }
     } else {
-      orderingExpressions
+      // Make sure the returned orderings are valid (only reference output attributes of the current
@peter-toth (Contributor) commented on Feb 23, 2023:

Good catch! Thanks for this follow-up!

But I'm not sure this part is correct, as `sortOrder.children` should be filtered with `_.references.subsetOf(outputSet)` separately.

E.g. this test (it's a bit artificial, though) fails now:

  test("SPARK-42049: Improve AliasAwareOutputExpression - no alias but still prune expressions 2") {
    withSQLConf(SQLConf.OPTIMIZER_EXCLUDED_RULES.key ->
      Seq(CollapseProject.ruleName, ColumnPruning.ruleName).mkString(",")) {
      val df = spark.range(2).select($"id" as "a", $"id" as "b").select($"a")
      val outputOrdering = df.queryExecution.optimizedPlan.outputOrdering

      assert(outputOrdering.size == 1)
      assert(outputOrdering.head.child.asInstanceOf[Attribute].name == "a")
      assert(outputOrdering.head.sameOrderExpressions.size == 0)
    }
  }

The PR author (Contributor) replied:

Good point!

+      // plan node). Same as above (the if branch), we take the first ordering expressions that are
+      // all valid.
+      val outputSet = AttributeSet(outputExpressions.map(_.toAttribute))
+      orderingExpressions.iterator.map { order =>
+        val validChildren = order.children.filter(_.references.subsetOf(outputSet))
+        if (validChildren.nonEmpty) {
+          Some(order.copy(child = validChildren.head, sameOrderExpressions = validChildren.tail))
+        } else {
+          None
+        }
+      }
     }
+    newOrdering.takeWhile(_.isDefined).flatten.toSeq
   }
 }
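
After this change both branches produce an `Iterator[Option[SortOrder]]`, and the shared `newOrdering.takeWhile(_.isDefined).flatten.toSeq` keeps only the longest leading run of orderings that passed the check: once one ordering is invalid, everything after it is dropped as well, because rows sorted by `(a, b)` are still sorted by `a` but not by `b` alone. A minimal standalone sketch of that prefix pattern (plain strings instead of Spark's `SortOrder`, hypothetical attribute names, not the trait itself):

```scala
object OrderingPrefixSketch {
  def main(args: Array[String]): Unit = {
    val outputAttrs = Set("a")        // hypothetical output attributes of the plan node
    val childOrdering = Seq("a", "b") // hypothetical child ordering columns

    // Map each ordering column to Some(...) if it is still valid after the
    // projection, or None if it references a pruned attribute.
    val newOrdering: Iterator[Option[String]] = childOrdering.iterator.map { col =>
      if (outputAttrs.contains(col)) Some(col) else None
    }

    // takeWhile stops at the first None, so only a valid prefix survives:
    // ordering by (a, b) degrades to ordering by (a), never to (b) alone.
    val valid = newOrdering.takeWhile(_.isDefined).flatten.toSeq
    assert(valid == Seq("a"))
  }
}
```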
@@ -18,7 +18,7 @@ package org.apache.spark.sql.execution

 import scala.collection.mutable
 
-import org.apache.spark.sql.catalyst.expressions.Expression
+import org.apache.spark.sql.catalyst.expressions.{AttributeSet, Expression}
 import org.apache.spark.sql.catalyst.plans.{AliasAwareOutputExpression, AliasAwareQueryOutputOrdering}
 import org.apache.spark.sql.catalyst.plans.physical.{Partitioning, PartitioningCollection, UnknownPartitioning}

@@ -29,7 +29,7 @@ import org.apache.spark.sql.catalyst.plans.physical.{Partitioning, PartitioningC
 trait PartitioningPreservingUnaryExecNode extends UnaryExecNode
   with AliasAwareOutputExpression {
   final override def outputPartitioning: Partitioning = {
-    if (hasAlias) {
+    val partitionings: Seq[Partitioning] = if (hasAlias) {
       flattenPartitioning(child.outputPartitioning).flatMap {
         case e: Expression =>
           // We need unique partitionings but if the input partitioning is
@@ -44,13 +44,19 @@ trait PartitioningPreservingUnaryExecNode extends UnaryExecNode
           .take(aliasCandidateLimit)
           .asInstanceOf[Stream[Partitioning]]
         case o => Seq(o)
-      } match {
-        case Seq() => UnknownPartitioning(child.outputPartitioning.numPartitions)
-        case Seq(p) => p
-        case ps => PartitioningCollection(ps)
       }
     } else {
-      child.outputPartitioning
+      // Filter valid partitionings (only reference output attributes of the current plan node)
+      val outputSet = AttributeSet(outputExpressions.map(_.toAttribute))
+      flattenPartitioning(child.outputPartitioning).filter {
+        case e: Expression => e.references.subsetOf(outputSet)
+        case _ => true
+      }
     }
+    partitionings match {
+      case Seq() => UnknownPartitioning(child.outputPartitioning.numPartitions)
+      case Seq(p) => p
+      case ps => PartitioningCollection(ps)
+    }
   }
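
Hoisting the `match` out of the `if` means the empty-candidates fallback now covers both branches: if the no-alias filter removes every partitioning, the node reports `UnknownPartitioning` instead of leaking an invalid one. A simplified sketch of that fallback shape (the toy `Known`/`Unknown`/`Collection` types are made-up stand-ins for Spark's `Partitioning` hierarchy):

```scala
sealed trait Part
final case class Known(cols: Seq[String]) extends Part
final case class Unknown(numPartitions: Int) extends Part
final case class Collection(parts: Seq[Part]) extends Part

object PartitioningFallbackSketch {
  // Collapse the surviving candidates the same way the hoisted match does:
  // none left -> unknown, one -> itself, several -> a collection.
  def pick(candidates: Seq[Part], childNumPartitions: Int): Part = candidates match {
    case Seq()  => Unknown(childNumPartitions)
    case Seq(p) => p
    case ps     => Collection(ps)
  }

  def main(args: Array[String]): Unit = {
    assert(pick(Seq.empty, 4) == Unknown(4))                 // everything was filtered away
    assert(pick(Seq(Known(Seq("a"))), 4) == Known(Seq("a"))) // a single survivor passes through
  }
}
```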

@@ -1129,9 +1129,10 @@ class PlannerSuite extends SharedSparkSession with AdaptiveSparkPlanHelper {
     assert(sortNodes.size == 3)
     val outputOrdering = planned.outputOrdering
     assert(outputOrdering.size == 1)
-    // Sort order should have 3 children, not 4. This is because t1.id * 2 and 2 * t1.id are the same
-    assert(outputOrdering.head.children.size == 3)
-    assert(outputOrdering.head.children.count(_.isInstanceOf[AttributeReference]) == 2)
+    // Sort order should have 2 children, not 4: t1.id * 2 and 2 * t1.id are the same expression,
+    // and t2.id is not a valid ordering (the final plan doesn't output t2.id).
+    assert(outputOrdering.head.children.size == 2)
+    assert(outputOrdering.head.children.count(_.isInstanceOf[AttributeReference]) == 1)
     assert(outputOrdering.head.children.count(_.isInstanceOf[Multiply]) == 1)
   }
 }
@@ -177,4 +177,18 @@ class ProjectedOrderingAndPartitioningSuite
     val outputOrdering3 = df3.queryExecution.optimizedPlan.outputOrdering
     assert(outputOrdering3.size == 0)
   }
+
+  test("SPARK-42049: Improve AliasAwareOutputExpression - no alias but still prune expressions") {
+    val df = spark.range(2).select($"id" + 1 as "a", $"id" + 2 as "b")
+
+    val df1 = df.repartition($"a", $"b").selectExpr("a")
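+    // The hash partitioning references both a and b, but b is pruned by the projection,
+    // so the partitioning is filtered out entirely and we fall back to UnknownPartitioning.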
+    val outputPartitioning = stripAQEPlan(df1.queryExecution.executedPlan).outputPartitioning
+    assert(outputPartitioning.isInstanceOf[UnknownPartitioning])
+
+    val df2 = df.orderBy("a", "b").select("a")
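+    // The ordering is pruned to its valid prefix: the leading sort on a is kept, b is dropped.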
+    val outputOrdering = df2.queryExecution.optimizedPlan.outputOrdering
+    assert(outputOrdering.size == 1)
+    assert(outputOrdering.head.child.asInstanceOf[Attribute].name == "a")
+    assert(outputOrdering.head.sameOrderExpressions.size == 0)
+  }
 }