
[SPARK-43838][SQL] Fix subquery on single table with having clause can't be optimized #41347

Closed
wants to merge 9 commits

Conversation

Hisoka-X
Member

@Hisoka-X Hisoka-X commented May 28, 2023

What changes were proposed in this pull request?

E.g.:

```scala
sql("create view t(c1, c2) as values (0, 1), (0, 2), (1, 2)")

sql("select c1, c2, (select count(*) cnt from t t2 where t1.c1 = t2.c1 " +
  "having cnt = 0) from t t1").show()
```

The following error is thrown:

```
[PLAN_VALIDATION_FAILED_RULE_IN_BATCH] Rule org.apache.spark.sql.catalyst.optimizer.RewriteCorrelatedScalarSubquery in batch Operator Optimization before Inferring Filters generated an invalid plan: The plan becomes unresolved: 'Project [toprettystring(c1#224, Some(America/Los_Angeles)) AS toprettystring(c1)#238, toprettystring(c2#225, Some(America/Los_Angeles)) AS toprettystring(c2)#239, toprettystring(cnt#246L, Some(America/Los_Angeles)) AS toprettystring(scalarsubquery(c1))#240]
+- 'Project [c1#224, c2#225, CASE WHEN isnull(alwaysTrue#245) THEN 0 WHEN NOT (cnt#222L = 0) THEN null ELSE cnt#222L END AS cnt#246L]
   +- 'Join LeftOuter, (c1#224 = c1#224#244)
      :- Project [col1#226 AS c1#224, col2#227 AS c2#225]
      :  +- LocalRelation [col1#226, col2#227]
      +- Project [cnt#222L, c1#224#244, cnt#222L, c1#224, true AS alwaysTrue#245]
         +- Project [cnt#222L, c1#224 AS c1#224#244, cnt#222L, c1#224]
            +- Aggregate [c1#224], [count(1) AS cnt#222L, c1#224]
               +- Project [col1#228 AS c1#224]
                  +- LocalRelation [col1#228, col2#229]

The previous plan: Project [toprettystring(c1#224, Some(America/Los_Angeles)) AS toprettystring(c1)#238, toprettystring(c2#225, Some(America/Los_Angeles)) AS toprettystring(c2)#239, toprettystring(scalar-subquery#223 [c1#224 && (c1#224 = c1#224#244)], Some(America/Los_Angeles)) AS toprettystring(scalarsubquery(c1))#240]
:  +- Project [cnt#222L, c1#224 AS c1#224#244]
:     +- Filter (cnt#222L = 0)
:        +- Aggregate [c1#224], [count(1) AS cnt#222L, c1#224]
:           +- Project [col1#228 AS c1#224]
:              +- LocalRelation [col1#228, col2#229]
+- Project [col1#226 AS c1#224, col2#227 AS c2#225]
   +- LocalRelation [col1#226, col2#227]
```

The cause of the error is an unresolved expression in the `Join` node generated by subquery decorrelation: `duplicateResolved` on the `Join` node is false, meaning the left and right sides of the `Join` share the same `Attribute`, in this example `c1#224`. The right-side `c1#224` attribute comes from the HAVING inputs, because the HAVING inputs are wrong.

This problem only occurs when the subquery contains a HAVING clause.

This PR also includes some code formatting fixes.
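To make the `duplicateResolved` condition concrete, here is a toy sketch of why a shared expression ID across join sides keeps the plan unresolved. The `Attr` and `ToyJoin` types below are hypothetical stand-ins, not Catalyst's real classes; Catalyst's `Join` applies the same disjoint-ID rule to its children's outputs.

```scala
// Toy model: an attribute is identified by name plus a unique expression ID.
case class Attr(name: String, exprId: Long)

case class ToyJoin(left: Seq[Attr], right: Seq[Attr]) {
  // Mirrors the idea behind Catalyst's Join.duplicateResolved:
  // no expression ID may appear in both children's outputs.
  def duplicateResolved: Boolean =
    left.map(_.exprId).toSet.intersect(right.map(_.exprId).toSet).isEmpty
}

object DuplicateDemo {
  // c1#224 from the outer query, and the same ID leaked into the subquery side
  // by the wrong HAVING inputs -> duplicateResolved is false, plan stays unresolved.
  val bad = ToyJoin(Seq(Attr("c1", 224L)), Seq(Attr("c1", 224L), Attr("cnt", 222L)))

  // After re-aliasing the subquery side with a fresh ID (like c1#224#244),
  // the sides are disjoint and the join resolves.
  val good = ToyJoin(Seq(Attr("c1", 224L)), Seq(Attr("c1", 244L), Attr("cnt", 222L)))
}
```

In the failing plan above, `c1#224` appears on both sides of the `LeftOuter` join, which is exactly the `bad` case in this sketch.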

Why are the changes needed?

Fix a subquery bug on a single table when a HAVING clause is used.

Does this PR introduce any user-facing change?

No

How was this patch tested?

Add new test

@github-actions github-actions bot added the SQL label May 28, 2023
@Hisoka-X
Member Author

cc @cloud-fan @jchen5

@cloud-fan
Contributor

I think a better fix is to let DeduplicateRelations handle Project with alias as well, which is also a source of conflicting attribute ids.
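A rough sketch of that direction, under the assumption that conflicting alias IDs are simply re-issued with fresh ones, the same way `DeduplicateRelations` re-keys conflicting leaf relations. The `AliasLike` type and `Dedup` helper are illustrative, not Spark's API:

```scala
import java.util.concurrent.atomic.AtomicLong

// Toy stand-in for an Alias in a Project list: a name plus an expression ID.
case class AliasLike(name: String, exprId: Long)

object Dedup {
  private val counter = new AtomicLong(1000L)

  // Hypothetical fresh-ID generator, analogous to Catalyst's NamedExpression.newExprId.
  def freshId(): Long = counter.incrementAndGet()

  // Re-issue any alias whose ID conflicts with one already seen in the plan,
  // so a Project can no longer introduce a duplicate attribute ID.
  def renewConflicting(projectList: Seq[AliasLike], seen: Set[Long]): Seq[AliasLike] =
    projectList.map { a =>
      if (seen.contains(a.exprId)) a.copy(exprId = freshId()) else a
    }
}
```

For example, an alias carrying the already-seen ID `224` would come back with a fresh ID, while non-conflicting aliases pass through unchanged.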

@Hisoka-X
Member Author

Hisoka-X commented Jun 7, 2023

I think a better fix is to let DeduplicateRelations handle Project with alias as well, which is also a source of conflicting attribute ids.

@cloud-fan Hi, I have now implemented `DeduplicateRelations` handling for `Project`. Should I revert the change in `RewriteCorrelatedScalarSubquery`? I think it's a hidden danger.

@allisonwang-db
Contributor

cc @jchen5

```scala
@@ -105,6 +105,21 @@ object DeduplicateRelations extends Rule[LogicalPlan] {
        (m, false)
      }

      case p @ Project(_, child) if p.resolved && p.projectList.forall(_.isInstanceOf[Alias]) =>
```
Contributor
I'm reading the doc of the `collectConflictPlans` function in this class. I think the problem we have now is that there are more plan nodes than just leaf nodes that can produce new attributes, and we need to handle all of them. `Project` is not the only one; let's follow the set of plan nodes `collectConflictPlans` handles.

Member Author

I added all the plan nodes that `collectConflictPlans` handles. Please check again. There is another problem, though: how do we test it? Producing a negative case does not seem easy.

@Hisoka-X Hisoka-X force-pushed the SPARK-43838_subquery_having branch from 3a29635 to 075ef46 Compare July 10, 2023 14:02
@Hisoka-X Hisoka-X force-pushed the SPARK-43838_subquery_having branch from 075ef46 to 0c12ef9 Compare July 10, 2023 14:05
```scala
@@ -437,25 +437,6 @@ class LeftSemiAntiJoinPushDownSuite extends PlanTest {
    }
  }

  Seq(LeftSemi, LeftAnti).foreach { case jt =>
```
Member Author
This test is unnecessary, because we can deduplicate those attributes when an anti-join / semi-join is a self-join. Please refer to #39131.

```scala
@@ -271,7 +271,10 @@ case class ScalarSubquery(
    mayHaveCountBug: Option[Boolean] = None)
  extends SubqueryExpression(plan, outerAttrs, exprId, joinCond, hint) with Unevaluable {
  override def dataType: DataType = {
    assert(plan.schema.fields.nonEmpty, "Scalar subquery should have only one column")
    if (!plan.schema.fields.nonEmpty) {
```
Member Author
Make sure an `AnalysisException` is thrown, not an `AssertionError`.

Contributor

Can we really reach this code branch?

Member Author

@Hisoka-X Hisoka-X Jul 13, 2023

Yes. Usually this error is thrown by `checkAnalysis`, but we may call `dataType` from `DeduplicateRelations`, which causes the exception to be thrown there instead. This change ensures the thrown exception is consistent.

Before the change:

```
Caused by: sbt.ForkMain$ForkError: java.lang.AssertionError: assertion failed: Scalar subquery should have only one column
	at scala.Predef$.assert(Predef.scala:223)
	at org.apache.spark.sql.catalyst.expressions.ScalarSubquery.dataType(subquery.scala:274)
	at org.apache.spark.sql.catalyst.expressions.Alias.toAttribute(namedExpressions.scala:194)
	at org.apache.spark.sql.catalyst.analysis.DeduplicateRelations$$anonfun$findAliases$1.applyOrElse(DeduplicateRelations.scala:530)
	at org.apache.spark.sql.catalyst.analysis.DeduplicateRelations$$anonfun$findAliases$1.applyOrElse(DeduplicateRelations.scala:530)
	at scala.PartialFunction.$anonfun$runWith$1$adapted(PartialFunction.scala:145)
	at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
	at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
	at scala.collection.TraversableLike.collect(TraversableLike.scala:407)
	at scala.collection.TraversableLike.collect$(TraversableLike.scala:405)
	at scala.collection.AbstractTraversable.collect(Traversable.scala:108)
	at org.apache.spark.sql.catalyst.analysis.DeduplicateRelations$.findAliases(DeduplicateRelations.scala:530)
	at org.apache.spark.sql.catalyst.analysis.DeduplicateRelations$.org$apache$spark$sql$catalyst$analysis$DeduplicateRelations$$renewDuplicatedRelations(DeduplicateRelations.scala:120)
	at org.apache.spark.sql.catalyst.analysis.DeduplicateRelations$.apply(DeduplicateRelations.scala:40)
	at org.apache.spark.sql.catalyst.analysis.DeduplicateRelations$.apply(DeduplicateRelations.scala:38)
```
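The pattern in this change can be sketched as follows. This is a minimal, self-contained illustration, not Spark's actual code: `AnalysisLikeException` and `Schema` are hypothetical stand-ins for `AnalysisException` and the plan's schema, and the point is only that an explicit check throws the user-facing exception type where an `assert` would raise an `AssertionError`.

```scala
// Illustrative stand-in for AnalysisException.
case class AnalysisLikeException(message: String) extends Exception(message)

// Illustrative stand-in for the subquery plan's schema.
case class Schema(fields: Seq[String])

object ScalarSubquerySketch {
  def dataTypeOf(schema: Schema): String = {
    // Before the change this was an assert, so early callers
    // (e.g. DeduplicateRelations via Alias.toAttribute) saw an AssertionError.
    if (schema.fields.isEmpty) {
      throw AnalysisLikeException("Scalar subquery must return exactly one column")
    }
    schema.fields.head
  }
}
```

With the explicit check, every caller that reaches this method with an empty schema gets the same, consistent exception type.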

@cloud-fan
Contributor

The k8s failure is unrelated, I'm merging it to master, thanks!

@cloud-fan cloud-fan closed this in e0c79c6 Jul 20, 2023
@Hisoka-X
Member Author

Thanks @cloud-fan for your help and @allisonwang-db

@Hisoka-X Hisoka-X deleted the SPARK-43838_subquery_having branch July 20, 2023 02:22
cloud-fan pushed a commit that referenced this pull request Jul 29, 2023
…dRelations`

### What changes were proposed in this pull request?
This is a follow up PR for #41347 , add missing aggregate case in `renewDuplicatedRelations`

### Why are the changes needed?
add missing case

### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?
exist test.

Closes #42160 from Hisoka-X/SPARK-43838_subquery_aggregate_follow_up.

Authored-by: Jia Fan <[email protected]>
Signed-off-by: Wenchen Fan <[email protected]>
ragnarok56 pushed a commit to ragnarok56/spark that referenced this pull request Mar 2, 2024
…n't be optimized

(Commit message body is a verbatim duplicate of the PR description above.)
Closes apache#41347 from Hisoka-X/SPARK-43838_subquery_having.

Lead-authored-by: Jia Fan <[email protected]>
Co-authored-by: Wenchen Fan <[email protected]>
Signed-off-by: Wenchen Fan <[email protected]>
ragnarok56 pushed a commit to ragnarok56/spark that referenced this pull request Mar 2, 2024
…dRelations`

(Commit message body is a verbatim duplicate of the follow-up commit above.)
Closes apache#42160 from Hisoka-X/SPARK-43838_subquery_aggregate_follow_up.

Authored-by: Jia Fan <[email protected]>
Signed-off-by: Wenchen Fan <[email protected]>