Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[SPARK-12705] [SPARK-10777] [SQL] Analyzer Rule ResolveSortReferences #10678

Closed
wants to merge 19 commits into from

Conversation

gatorsmile
Copy link
Member

JIRA: https://issues.apache.org/jira/browse/SPARK-12705

Scope:
This PR is a general fix for sorting reference resolution when the child's outputSet does not have the order-by attributes (called, missing attributes):

  • UnaryNode support is limited to Project, Window, Aggregate, Distinct, Filter, RepartitionByExpression.
  • We will not try to resolve the missing references inside a subquery, unless the outputSet of this subquery contains it.

General Reference Resolution Rules:

  • Jump over the nodes with the following types: Distinct, Filter, RepartitionByExpression. Do not need to add missing attributes. The reason is their outputSet is decided by their inputSet, which is the outputSet of their children.
  • Group-by expressions in Aggregate: missing order-by attributes are not allowed to be added into group-by expressions since it will change the query result. Thus, in RDBMS, it is not allowed.
  • Aggregate expressions in Aggregate: if the group-by expressions in Aggregate contains the missing attributes but aggregate expressions do not have it, just add them into the aggregate expressions. This can resolve the analysisExceptions thrown by the three TCPDS queries.
  • Project and Window are special. We just need to add the missing attributes to their projectList.

Implementation:

  1. Traverse the whole tree in a pre-order manner to find all the resolvable missing order-by attributes.
  2. Traverse the whole tree in a post-order manner to add the found missing order-by attributes to the node if their inputSet contains the attributes.
  3. If the origins of the missing order-by attributes are different nodes, each pass only resolves the missing attributes that are from the same node.

Risk:
Low. This rule will be trigger iff !s.resolved && child.resolved is true. Thus, very few cases are affected.

@@ -523,14 +523,37 @@ class Analyzer(
def apply(plan: LogicalPlan): LogicalPlan = plan resolveOperators {
case s @ Sort(ordering, global, p @ Project(projectList, child))
if !s.resolved && p.resolved =>
val (newOrdering, missing) = resolveAndFindMissing(ordering, p, child)
val (newOrdering, missing, newChild): (Seq[SortOrder], Seq[Attribute], LogicalPlan) =
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can you add some comment here about why we need two separate cases.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sure, will do

@SparkQA
Copy link

SparkQA commented Jan 10, 2016

Test build #49052 has finished for PR 10678 at commit 5ca4630.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA
Copy link

SparkQA commented Jan 10, 2016

Test build #49054 has finished for PR 10678 at commit da6baf2.

  • This patch fails MiMa tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@gatorsmile
Copy link
Member Author

retest this please.

@SparkQA
Copy link

SparkQA commented Jan 10, 2016

Test build #49058 has finished for PR 10678 at commit da6baf2.

  • This patch fails MiMa tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@gatorsmile
Copy link
Member Author

retest this please.

@SparkQA
Copy link

SparkQA commented Jan 10, 2016

Test build #49060 has finished for PR 10678 at commit da6baf2.

  • This patch fails MiMa tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA
Copy link

SparkQA commented Jan 10, 2016

Test build #49061 has finished for PR 10678 at commit b5de079.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@gatorsmile
Copy link
Member Author

@davies Could you take a look if the fix covers the analysis resolution issue in TPCD? Thank you!

p.child match {
// Case 1: when WINDOW functions are used in the SELECT clause.
// Example: SELECT sum(col1) OVER() FROM table1 ORDER BY col2
case p1 @ Project(_, w @ Window(_, _, _, _, p2: Project)) =>
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What if the child of Window is not a Project (a Aggregation or other)? Or there are multiple projections ?

@davies
Copy link
Contributor

davies commented Jan 11, 2016

@gatorsmile Thanks for working on this, but after this patch, TPCDS Q12/Q20/Q98 are still can't be resolved: " cannot resolve 'i_item_id' given input columns: [i_class, itemrevenue, i_category, i_current_price".

You can see all these queries here: https://github.com/databricks/spark-sql-perf/blob/master/src/main/scala/com/databricks/spark/sql/perf/tpcds/TPCDS_1_4_Queries.scala#L612

@gatorsmile
Copy link
Member Author

Let me do more investigation tomorrow. @davies : )

@gatorsmile
Copy link
Member Author

I can reproduce the problem using a simple query now:

select area, rank() over (partition by area order by month) as c1
from windowData group by product, area, month order by product

The logical plan is like

'Sort ['product ASC], true
+- Project [area#1,c1#3]
   +- Project [area#1,month#0,c1#3,c1#3]
      +- Window [area#1,month#0], [rank() windowspecdefinition(area#1,month#0 ASC,ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) AS c1#3], [area#1], [month#0 ASC]
         +- Aggregate [product#2,area#1,month#0], [area#1,month#0]
            +- Subquery windowdata
               +- LogicalRDD [month#0,area#1,product#2], MapPartitionsRDD[1] at apply at Transformer.scala:22

This does not match the existing two patterns. Thus, it is unable to resolve the sorting columns. I will try to find and write a general rule to handle the cases, as you suggested.

@gatorsmile
Copy link
Member Author

I found the sorting columns could be partially resolved by different nodes. For example,
select a.c1, b.c2 from t1 a, t1 b order by b.c3, a.c3

I need extra time to write the fix. It becomes more complex than what I thought.

@gatorsmile
Copy link
Member Author

Like the example I posted above, it works in RDBMS. However, supporting it in Spark SQL is not trivial. In this example, Join operator is involved but it does not have a projectList attribute. Thus, we are unable to directly add missing attributes to it. Now, only two operators Project and Window are doable. That means, we might see more JIRAs to complain it.

In the long term, maybe we can add projectList to all the logical operators? That indicates we integrate Project into all the logical operators.

I will check if the three TPCDS queries can be resolved without major code changes. Thanks!

@davies
Copy link
Contributor

davies commented Jan 12, 2016

@gatorsmile There is a JIRA to have projectList in Join, we can do that later. For this PR, we may just push the missing attributes until any JOIN.

@gatorsmile
Copy link
Member Author

Thank you! @davies Will upload the new version tonight.

@gatorsmile
Copy link
Member Author

In this update, the code changes are trying to enhance the existing support for the following scenario: Sort-by attributes that are not present in the SELECT clause

Now, between the top Sort operator and the operator that can resolve the sorting columns, it can have one or multiple different operators of Aggregate, Project, Subquery and Window. The reason why we just support these four types is that only these four types we can add sorting attributes into their project list. In the future, we can easily extend the solution by supporting more types.

I am not sure if we should change the collection we used in this tail recursion function? Now, it is Seq. If the plan tree is huge, the performance could be not good. Thanks!

@gatorsmile gatorsmile changed the title [SPARK-12705] [SQL] AnalysisException: Sorting columns are not in Project of Window Function [SPARK-12705] [SQL] AnalysisException: Sorting columns are not in the child operators Jan 13, 2016
@@ -73,6 +74,7 @@ class Analyzer(
ResolveGroupingAnalytics ::
ResolvePivot ::
ResolveUpCast ::
ResolveAggregateFunctions ::
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why we need to change the order?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In this rule ResolveAggregateFunctions, it has a case for pushing down the expressions in Sort operator to the underlying Aggregate. This needs to be done earlier than the rule ResolveSortReferences; Otherwise, we might add unnecessary missing sorting attributes.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

depending on rule orders of one batch is not a good idea. We should either add more if conditions to only capture needed cases, or separate them into multiple batches.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, you are right. Will do the change to exclude the cases that can be resolved by ResolveAggregateFunctions. Thanks!

@davies
Copy link
Contributor

davies commented Jan 19, 2016

DISTINCT sounds good, can we use DISTRIBUTE BY, CLUSTER BY together with ORDER BY? they all change the final order.

@gatorsmile
Copy link
Member Author

I saw a test case in LogicalPlanToSQLSuite.scala.

  test("distribute by with sort by") {
    checkHiveQl("SELECT id FROM t0 DISTRIBUTE BY id SORT BY id")
  }

Do you think users could use it in this way?

@davies
Copy link
Contributor

davies commented Jan 19, 2016

@gatorsmile Let's keep that (DISTRIBUTE BY, CLUSTER BY) for now (even the query does not make sense to me).

@gatorsmile
Copy link
Member Author

Thank you for your reviews! @davies

@gatorsmile
Copy link
Member Author

Since we do not plan to support sort reference resolution inside subquery, should we just close the following JIRA? https://issues.apache.org/jira/browse/SPARK-10777

@davies
Copy link
Contributor

davies commented Jan 19, 2016

@gatorsmile I think that JIRA still valid, because the subquery already output the required attribute, we don't need to add the missing attributes into the project list inside subquery. I believe this PR could fix that.

@davies
Copy link
Contributor

davies commented Jan 19, 2016

Could you also include that in the title?

@gatorsmile
Copy link
Member Author

Sure, let me add it.

@gatorsmile gatorsmile changed the title [SPARK-12705] [SQL] Analyzer Rule ResolveSortReferences [SPARK-12705] [SPARK-10777] [SQL] Analyzer Rule ResolveSortReferences Jan 19, 2016
@SparkQA
Copy link

SparkQA commented Jan 20, 2016

Test build #49765 has finished for PR 10678 at commit 1964884.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@gatorsmile
Copy link
Member Author

@davies The code is ready to review, after addressing all the comments.

@davies @cloud-fan The current implementation of ResolveAggregateFunctions also tries to resolve the missing sort references, but it has a limit. The related logics is triggered iff Aggregate is the child of Sort. I am just thinking if we should move the related part of ResolveAggregateFunctions into the rule ResolveSortReferences? Thanks!

Update: Tried to make it more general, but it is complex. Thus, I did not change the original algorithm in ResolveAggregateFunctions. Just called it in the ResolveSortReferences

case w: Window =>
w.copy(projectList = w.projectList ++
missingResolvableAttrs.filter((w.inputSet -- w.outputSet).contains))
case a: Aggregate =>
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This case can never happen right?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In the following query, we can trigger this case. Actually, this query is based on the failed TPCDS queries. Thus, we added it as a test case. The column product is in the group-by clause but not appeared in aggregateExpressions. Thus, we hit this error if we want to sort the results by product.
select area, rank() over (partition by area order by month) as c1 from windowData group by product, area, month order by product, area

If we remove this case, we will get this error:

Failed to analyze query: org.apache.spark.sql.AnalysisException: resolved attribute(s) product#2 missing from area#1,c1#39 in operator !Sort [product#2 ASC,area#1 ASC], true;
Project [area#1,c1#48]
+- !Sort [product#2 ASC,area#1 ASC], true
   +- Project [area#1,c1#48]
      +- Project [area#1,month#0,c1#48,c1#48]
         +- Window [area#1,month#0], [rank(month#0) windowspecdefinition(area#1,month#0 ASC,ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) AS c1#48], [area#1], [month#0 ASC]
            +- Aggregate [product#2,area#1,month#0], [area#1,month#0]
               +- Subquery windowdata
                  +- LogicalRDD [month#0,area#1,product#2], MapPartitionsRDD[1] at apply at Transformer.scala:22

@SparkQA
Copy link

SparkQA commented Jan 31, 2016

Test build #50449 has finished for PR 10678 at commit ba02f46.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

case s @ Sort(ordering, global, p @ Project(projectList, child))
if !s.resolved && p.resolved =>
val (newOrdering, missing) = resolveAndFindMissing(ordering, p, child)
case s @ Sort(_, _, a: Aggregate) if a.resolved =>
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we need to check that !s.resolved? we have that in next case.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

to be more clear, ResolveSortReferences tries to resolve attributes in SortOrders, and ResolveAggregateFunctions tries to resolve aggregate functions in unexpected places(filter or sort), right?

So I think we should skip sort with aggregate functions here, i.e.
case s: Sort if s.order.exists(ResolveAggregateFunctions.containsAggregate) => s, and add comment to say this case should be handled in ResolveAggregateFunctions

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There could be missing attributes together with aggregate functions, will that work?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@davies The missing attributes are also handled in ResolveAggregateFunctions. Thus it works. To answer your first question regarding !s.resolved, this is part of the algorithm design in the rule ResolveAggregateFunctions, as shown below: https://github.com/apache/spark/blob/master/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala#L706-L708

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@cloud-fan Sure, let me change it. Thanks!

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So this seems that the rule in ResolveAggregateFunctions does not really resolve the missing attributes, we could keep that rule unchanged in this PR.

If it's not trivial to fix this, we could create another JIRA for that.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Unfortunately, it is not trivial. : ( So far, the current rule ResolveSortReferences only can handle the missing attributes. In this case, we have to push the aggregate function down to the underlying Aggregate. Thus, it does not work.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@cloud-fan I will let ResolveAggregateFunctions handle the missing attribute resolution as long as the child of Sort is Aggregate.

// Skip sort with aggregate. This will be handled in ResolveAggregateFunctions
case sa @ Sort(_, _, child: Aggregate) => sa

When rewriting ResolveSortReferences in another PR, I will try to make the behaviors of both rules identical for resolving the missing attributes.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can we put the problematic query in an ignored test? just in case we don't forget it...

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sure, I will add it.

@davies
Copy link
Contributor

davies commented Jan 31, 2016

Left two comments, otherwise LGTM.

@marmbrus Could you take another pass on it?

@SparkQA
Copy link

SparkQA commented Jan 31, 2016

Test build #50459 has finished for PR 10678 at commit ddfebbf.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

s // Nothing we can do here. Return original plan.
} else {
// Add the missing attributes into projectList of Project/Window or
// aggregateExpressions of Aggregate, if they are in the inputSet
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: too many spaces

@cloud-fan
Copy link
Contributor

LGTM, pending tests

@rxin
Copy link
Contributor

rxin commented Feb 1, 2016

Let's get @marmbrus to take a look at this one too.

@SparkQA
Copy link

SparkQA commented Feb 1, 2016

Test build #50479 has finished for PR 10678 at commit c2964da.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@marmbrus
Copy link
Contributor

marmbrus commented Feb 1, 2016

LGTM, thanks for adding a bunch of tests!

Merging to master.

@asfgit asfgit closed this in 8f26eb5 Feb 1, 2016
@gatorsmile gatorsmile deleted the sortWindows branch February 2, 2016 03:36
@gatorsmile
Copy link
Member Author

Thank you everyone! : )

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

7 participants