-
Notifications
You must be signed in to change notification settings - Fork 28.5k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[SPARK-12705] [SPARK-10777] [SQL] Analyzer Rule ResolveSortReferences #10678
Conversation
@@ -523,14 +523,37 @@ class Analyzer( | |||
def apply(plan: LogicalPlan): LogicalPlan = plan resolveOperators { | |||
case s @ Sort(ordering, global, p @ Project(projectList, child)) | |||
if !s.resolved && p.resolved => | |||
val (newOrdering, missing) = resolveAndFindMissing(ordering, p, child) | |||
val (newOrdering, missing, newChild): (Seq[SortOrder], Seq[Attribute], LogicalPlan) = |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
can you add some comment here about why we need two separate cases.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Sure, will do
Test build #49052 has finished for PR 10678 at commit
|
Test build #49054 has finished for PR 10678 at commit
|
retest this please. |
Test build #49058 has finished for PR 10678 at commit
|
retest this please. |
Test build #49060 has finished for PR 10678 at commit
|
Test build #49061 has finished for PR 10678 at commit
|
@davies Could you take a look if the fix covers the analysis resolution issue in TPCD? Thank you! |
p.child match { | ||
// Case 1: when WINDOW functions are used in the SELECT clause. | ||
// Example: SELECT sum(col1) OVER() FROM table1 ORDER BY col2 | ||
case p1 @ Project(_, w @ Window(_, _, _, _, p2: Project)) => |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
What if the child of Window is not a Project (a Aggregation or other)? Or there are multiple projections ?
@gatorsmile Thanks for working on this, but after this patch, TPCDS Q12/Q20/Q98 are still can't be resolved: " cannot resolve 'i_item_id' given input columns: [i_class, itemrevenue, i_category, i_current_price". You can see all these queries here: https://github.com/databricks/spark-sql-perf/blob/master/src/main/scala/com/databricks/spark/sql/perf/tpcds/TPCDS_1_4_Queries.scala#L612 |
Let me do more investigation tomorrow. @davies : ) |
I can reproduce the problem using a simple query now:
The logical plan is like
This does not match the existing two patterns. Thus, it is unable to resolve the sorting columns. I will try to find and write a general rule to handle the cases, as you suggested. |
I found the sorting columns could be partially resolved by different nodes. For example, I need extra time to write the fix. It becomes more complex than what I thought. |
Like the example I posted above, it works in RDBMS. However, supporting it in Spark SQL is not trivial. In this example, In the long term, maybe we can add I will check if the three TPCDS queries can be resolved without major code changes. Thanks! |
@gatorsmile There is a JIRA to have projectList in Join, we can do that later. For this PR, we may just push the missing attributes until any JOIN. |
Thank you! @davies Will upload the new version tonight. |
In this update, the code changes are trying to enhance the existing support for the following scenario: Now, between the top I am not sure if we should change the collection we used in this tail recursion function? Now, it is |
@@ -73,6 +74,7 @@ class Analyzer( | |||
ResolveGroupingAnalytics :: | |||
ResolvePivot :: | |||
ResolveUpCast :: | |||
ResolveAggregateFunctions :: |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why we need to change the order?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
In this rule ResolveAggregateFunctions
, it has a case for pushing down the expressions in Sort
operator to the underlying Aggregate
. This needs to be done earlier than the rule ResolveSortReferences
; Otherwise, we might add unnecessary missing sorting attributes.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
depending on rule orders of one batch is not a good idea. We should either add more if
conditions to only capture needed cases, or separate them into multiple batches.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yeah, you are right. Will do the change to exclude the cases that can be resolved by ResolveAggregateFunctions
. Thanks!
DISTINCT sounds good, can we use DISTRIBUTE BY, CLUSTER BY together with ORDER BY? they all change the final order. |
I saw a test case in
Do you think users could use it in this way? |
@gatorsmile Let's keep that (DISTRIBUTE BY, CLUSTER BY) for now (even the query does not make sense to me). |
Thank you for your reviews! @davies |
Since we do not plan to support sort reference resolution inside subquery, should we just close the following JIRA? https://issues.apache.org/jira/browse/SPARK-10777 |
@gatorsmile I think that JIRA still valid, because the subquery already output the required attribute, we don't need to add the missing attributes into the project list inside subquery. I believe this PR could fix that. |
Could you also include that in the title? |
Sure, let me add it. |
Test build #49765 has finished for PR 10678 at commit
|
@davies The code is ready to review, after addressing all the comments. @davies @cloud-fan The current implementation of Update: Tried to make it more general, but it is complex. Thus, I did not change the original algorithm in |
case w: Window => | ||
w.copy(projectList = w.projectList ++ | ||
missingResolvableAttrs.filter((w.inputSet -- w.outputSet).contains)) | ||
case a: Aggregate => |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This case can never happen right?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
In the following query, we can trigger this case. Actually, this query is based on the failed TPCDS queries. Thus, we added it as a test case. The column product
is in the group-by clause but not appeared in aggregateExpressions. Thus, we hit this error if we want to sort the results by product
.
select area, rank() over (partition by area order by month) as c1 from windowData group by product, area, month order by product, area
If we remove this case, we will get this error:
Failed to analyze query: org.apache.spark.sql.AnalysisException: resolved attribute(s) product#2 missing from area#1,c1#39 in operator !Sort [product#2 ASC,area#1 ASC], true;
Project [area#1,c1#48]
+- !Sort [product#2 ASC,area#1 ASC], true
+- Project [area#1,c1#48]
+- Project [area#1,month#0,c1#48,c1#48]
+- Window [area#1,month#0], [rank(month#0) windowspecdefinition(area#1,month#0 ASC,ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) AS c1#48], [area#1], [month#0 ASC]
+- Aggregate [product#2,area#1,month#0], [area#1,month#0]
+- Subquery windowdata
+- LogicalRDD [month#0,area#1,product#2], MapPartitionsRDD[1] at apply at Transformer.scala:22
Test build #50449 has finished for PR 10678 at commit
|
case s @ Sort(ordering, global, p @ Project(projectList, child)) | ||
if !s.resolved && p.resolved => | ||
val (newOrdering, missing) = resolveAndFindMissing(ordering, p, child) | ||
case s @ Sort(_, _, a: Aggregate) if a.resolved => |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Do we need to check that !s.resolved
? we have that in next case.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
to be more clear, ResolveSortReferences
tries to resolve attributes in SortOrder
s, and ResolveAggregateFunctions
tries to resolve aggregate functions in unexpected places(filter or sort), right?
So I think we should skip sort with aggregate functions here, i.e.
case s: Sort if s.order.exists(ResolveAggregateFunctions.containsAggregate) => s
, and add comment to say this case should be handled in ResolveAggregateFunctions
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
There could be missing attributes together with aggregate functions, will that work?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@davies The missing attributes are also handled in ResolveAggregateFunctions
. Thus it works. To answer your first question regarding !s.resolved
, this is part of the algorithm design in the rule ResolveAggregateFunctions
, as shown below: https://github.com/apache/spark/blob/master/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala#L706-L708
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@cloud-fan Sure, let me change it. Thanks!
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
So this seems that the rule in ResolveAggregateFunctions
does not really resolve the missing attributes, we could keep that rule unchanged in this PR.
If it's not trivial to fix this, we could create another JIRA for that.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Unfortunately, it is not trivial. : ( So far, the current rule ResolveSortReferences
only can handle the missing attributes. In this case, we have to push the aggregate function down to the underlying Aggregate
. Thus, it does not work.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@cloud-fan I will let ResolveAggregateFunctions
handle the missing attribute resolution as long as the child of Sort is Aggregate.
// Skip sort with aggregate. This will be handled in ResolveAggregateFunctions
case sa @ Sort(_, _, child: Aggregate) => sa
When rewriting ResolveSortReferences
in another PR, I will try to make the behaviors of both rules identical for resolving the missing attributes.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
can we put the problematic query in an ignored test? just in case we don't forget it...
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Sure, I will add it.
Left two comments, otherwise LGTM. @marmbrus Could you take another pass on it? |
Test build #50459 has finished for PR 10678 at commit
|
s // Nothing we can do here. Return original plan. | ||
} else { | ||
// Add the missing attributes into projectList of Project/Window or | ||
// aggregateExpressions of Aggregate, if they are in the inputSet |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: too many spaces
LGTM, pending tests |
Let's get @marmbrus to take a look at this one too. |
Test build #50479 has finished for PR 10678 at commit
|
LGTM, thanks for adding a bunch of tests! Merging to master. |
Thank you everyone! : ) |
JIRA: https://issues.apache.org/jira/browse/SPARK-12705
Scope:
This PR is a general fix for sorting reference resolution when the child's
outputSet
does not have the order-by attributes (called, missing attributes):Project
,Window
,Aggregate
,Distinct
,Filter
,RepartitionByExpression
.General Reference Resolution Rules:
Distinct
,Filter
,RepartitionByExpression
. Do not need to add missing attributes. The reason is theiroutputSet
is decided by theirinputSet
, which is theoutputSet
of their children.Aggregate
: missing order-by attributes are not allowed to be added into group-by expressions since it will change the query result. Thus, in RDBMS, it is not allowed.Aggregate
: if the group-by expressions inAggregate
contains the missing attributes but aggregate expressions do not have it, just add them into the aggregate expressions. This can resolve the analysisExceptions thrown by the three TCPDS queries.Project
andWindow
are special. We just need to add the missing attributes to theirprojectList
.Implementation:
inputSet
contains the attributes.Risk:
Low. This rule will be trigger iff
!s.resolved && child.resolved
is true. Thus, very few cases are affected.