
[SPARK-47217][SQL] Fix deduplicated expression resolution #45552

Conversation

@peter-toth (Contributor) commented Mar 17, 2024

What changes were proposed in this pull request?

There seems to be a regression from Spark 3.5 to 4.0, caused by https://issues.apache.org/jira/browse/SPARK-43838 / #41347, as the following code no longer succeeds:

val df = Seq((1, 2)).toDF("a", "b")
val df2 = df.select(df("a").as("aa"), df("b").as("bb"))
val df3 = df2.join(df, df2("bb") === df("b")).select(df2("aa"), df("a"))
df3.show()

Please note that if we dig deeper, it turns out that when df is built without the extra project, the failure below is actually a bug that came in with the very first version of DeduplicateRelations, which deduplicated only MultiInstanceRelations:

import org.apache.spark.sql.Row
import org.apache.spark.sql.types.StructType

val schema = StructType.fromDDL("a int, b int")
val rows = Seq(Row(1, 2))
val rdd = sparkContext.parallelize(rows)
val df = spark.createDataFrame(rdd, schema)
val df2 = df.select(df("a").as("aa"), df("b").as("bb"))
val df3 = df2.join(df, df2("bb") === df("b")).select(df2("aa"), df("a"))
df3.show()

The root cause seems to be DeduplicateRelations, as it changes df("a") (a#7) coming from the right side when it runs on the join:

=== Applying Rule org.apache.spark.sql.catalyst.analysis.DeduplicateRelations ===
!'Join Inner, '`=`(bb#12, b#8)              'Join Inner, '`=`(bb#12, b#18)
 :- Project [a#7 AS aa#11, b#8 AS bb#12]    :- Project [a#7 AS aa#11, b#8 AS bb#12]
 :  +- Project [_1#2 AS a#7, _2#3 AS b#8]   :  +- Project [_1#2 AS a#7, _2#3 AS b#8]
 :     +- LocalRelation [_1#2, _2#3]        :     +- LocalRelation [_1#2, _2#3]
!+- Project [_1#2 AS a#7, _2#3 AS b#8]      +- Project [_1#15 AS a#17, _2#16 AS b#18]
!   +- LocalRelation [_1#2, _2#3]              +- LocalRelation [_1#15, _2#16]

and then when the .select() API adds a Project node containing df("a") above the join, it can't be resolved.

This is because DeduplicateRelations always keeps the attributes of the first occurrence of a node (Project [_1#2 AS a#7, _2#3 AS b#8] in this case) and creates new instances for other occurrences. The rule doesn't (and can't) take into account whether a top-level attribute can actually come from a node or not.
If spark.sql.analyzer.failAmbiguousSelfJoin is enabled, then the DetectAmbiguousSelfJoin rule catches the issue as:

Column a#7 are ambiguous. It's probably because you joined several Datasets together, and some of these Datasets are the same. This column points to one of the Datasets but Spark is unable to figure out which one. Please alias the Datasets with different names via Dataset.as before joining them, and specify the column using qualified name, e.g. df.as("a").join(df.as("b"), $"a.id" > $"b.id"). You can also set spark.sql.analyzer.failAmbiguousSelfJoin to false to disable this check.

If it is not enabled, then an "a#7 can't be resolved" error is thrown.

To fix the above regression, this PR:

  • Assigns LogicalPlan.PLAN_ID_TAGs to the logical plans of Datasets that don't have an id yet. (The Connect planner already does this.)
  • Changes resolved AttributeReferences to UnresolvedAttributes in certain Dataset APIs when an attribute doesn't seem valid based on the output of the underlying logical plan. The UnresolvedAttributes also get the tags necessary to be resolved later by the ResolveReferences rule (the ColumnResolutionHelper.tryResolveDataFrameColumns() logic); see the sketch after this list.
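
A minimal sketch of the second point. The helper name toUnresolvedIfInvalid and the planId parameter are illustrative assumptions, not the PR's actual code:

import org.apache.spark.sql.catalyst.analysis.UnresolvedAttribute
import org.apache.spark.sql.catalyst.expressions.{AttributeReference, NamedExpression}
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan

// Hypothetical helper: if a resolved reference is no longer found in the plan's
// output, fall back to an UnresolvedAttribute tagged with the originating plan's
// id, so that ResolveReferences (tryResolveDataFrameColumns) can re-resolve it.
def toUnresolvedIfInvalid(
    attr: AttributeReference,
    plan: LogicalPlan,
    planId: Long): NamedExpression = {
  if (plan.outputSet.contains(attr)) {
    attr // still a valid output of the underlying plan, keep it resolved
  } else {
    val u = UnresolvedAttribute(Seq(attr.name))
    // The tag records which Dataset's plan the column originally came from.
    u.setTagValue(LogicalPlan.PLAN_ID_TAG, planId)
    u
  }
}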

Why are the changes needed?

To fix the regression.

Does this PR introduce any user-facing change?

Yes, it fixes the regression.

How was this patch tested?

New and existing UTs.

Was this patch authored or co-authored using generative AI tooling?

No.

@dongjoon-hyun (Member) left a comment

Thank you, @peter-toth.

cc @Hisoka-X and @cloud-fan

@@ -227,6 +227,11 @@ class Dataset[T] private[sql](
     dsIds.add(id)
     plan.setTagValue(Dataset.DATASET_ID_TAG, dsIds)
   }
+  // A plan might get its PLAN_ID_TAG via connect or belong to multiple dataframes so only assign
+  // an id to a plan if it doesn't have any
+  if (plan.getTagValue(LogicalPlan.PLAN_ID_TAG).isEmpty) {
Contributor:

IIUC, this PR aims to fix a vanilla (non-Connect) Spark SQL issue with PLAN_ID_TAG.
However, currently PLAN_ID_TAG is dedicated to Spark Connect only.

@peter-toth (Contributor, Author):

Yes, that idea came up here: #45446 (comment)

@cloud-fan (Contributor):

Column references in the classic Spark SQL DataFrame API are very broken, and I really don't want to add more hacks here and there to fix certain cases. In Spark Connect, we've redesigned column references, and they're much more reliable and reasonable.

How about adding a config to let classic column references use the same implementation as Spark Connect's? Ideally, users should update their DataFrame queries to always use named columns, like the SQL API:

val df1 = abc.as("df1")
val df2 = xyz.as("df2")
df1.join(df2, $"df1.col" === $"df2.col")

But if users really want to stick with the old style, they can turn on the config.

@peter-toth (Contributor, Author) commented Mar 19, 2024

I think unifying the resolution logic in Spark 4.0 is a good idea, and the above example works well via Connect. But I don't fully get the last sentence:

But if users really want to stick with the old style, they can turn on the config.

Is there any known downside of the new Connect-style resolution? Why would someone switch back to the old style?

Also, what shall we do with Spark 3.5? It would be great to fix this regression without requiring users to change their code.

@cloud-fan (Contributor):

There is no known downside of the Connect-style resolution, but it sounds scary to completely replace an existing implementation (which has been there for many years) with a new one, especially since the old implementation is quite hacky and may work for some corner cases unintentionally.

I'd prefer to add a config that enables Connect-style resolution, but not by default.

@peter-toth (Contributor, Author) commented Mar 20, 2024

I had some time to play with Connect today, and as I said, it does work well with the query in the PR description, but it doesn't seem to support even the most basic self-joins when the join condition is ambiguous:

@ val df = Seq((1, 2)).toDF("a", "b")
Using Spark's default log4j profile: org/apache/spark/log4j2-defaults.properties
df: org.apache.spark.sql.package.DataFrame = [a: int, b: int]

@ val df2 = df.select(df("a").as("aa"), df("b"))
df2: org.apache.spark.sql.package.DataFrame = [aa: int, b: int]

@ val df3 = df2.join(df, df2("b") === df("b"))
df3: org.apache.spark.sql.package.DataFrame = Invalid Dataframe; [AMBIGUOUS_COLUMN_REFERENCE] Column "b" is ambiguous. It's because you joined several DataFrame together, and some of these DataFrames are the same.
This column points to one of the DataFrames but Spark is unable to figure out which one.
Please alias the DataFrames with different names via `DataFrame.alias` before joining them,
and specify the column using qualified name, e.g. `df.alias("a").join(df.alias("b"), col("a.id") > col("b.id"))`. SQLSTATE: 42702

Am I missing something?

@peter-toth (Contributor, Author) commented Mar 22, 2024

@cloud-fan, I wonder whether the above test should work in Connect, or whether Connect always requires aliasing conflicting columns, as in SQL?

@peter-toth (Contributor, Author):

@cloud-fan, since Connect-style resolution doesn't seem to work in all cases, I wonder if we can fix the non-Connect resolution with this PR?

@peter-toth force-pushed the SPARK-47217-fix-deduplicated-expression-resolution branch from c0bde7e to a4db113 on March 27, 2024, 13:38
@cloud-fan (Contributor):

@zhengruifeng, can you take a look? I think we should make this work. We can find df("b") on both join sides, but this is a special case, as df("b") comes directly from the right join side. I don't want to make the algorithm very complicated, like searching for column references on both sides and calculating the depth, but this simple special case is not ambiguous. A sketch of the idea follows.
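
For illustration, a hedged sketch of that special case, assuming the column reference carries the plan id of the Dataset it was selected from. The resolveDirectSide helper is hypothetical, not Connect's actual code:

import org.apache.spark.sql.catalyst.expressions.Attribute
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan

// Hypothetical sketch: if the tagged plan id matches the root of exactly one join
// side, the reference comes directly from that side and is not ambiguous, even
// when the same id also appears deeper inside the other side (df in df2.join(df, ...)).
def resolveDirectSide(
    name: String,
    planId: Long,
    left: LogicalPlan,
    right: LogicalPlan): Option[Attribute] = {
  def isRoot(p: LogicalPlan): Boolean =
    p.getTagValue(LogicalPlan.PLAN_ID_TAG).contains(planId)
  if (isRoot(right) && !isRoot(left)) right.output.find(_.name == name)
  else if (isRoot(left) && !isRoot(right)) left.output.find(_.name == name)
  else None // fall back to the regular, possibly ambiguous, resolution
}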


github-actions bot commented Jul 7, 2024

We're closing this PR because it hasn't been updated in a while. This isn't a judgement on the merit of the PR in any way. It's just a way of keeping the PR queue manageable.
If you'd like to revive this PR, please reopen it and ask a committer to remove the Stale tag!

@github-actions github-actions bot added the Stale label Jul 7, 2024
@github-actions github-actions bot closed this Jul 8, 2024