
[SPARK-15122][SQL] Fix TPC-DS 41 - Normalize predicates before pulling them out #12954

Closed · wants to merge 5 commits

Conversation

hvanhovell
Contributor

@hvanhovell hvanhovell commented May 6, 2016

What changes were proposed in this pull request?

The official TPC-DS 41 query currently fails because it contains a scalar subquery with a disjunctive correlated predicate (the correlated predicates are nested in ORs). This makes the Analyzer pull out the entire predicate, which is wrong, and causes the following (correct) analysis exception: `The correlated scalar subquery can only contain equality predicates`

This PR fixes this by first simplifying (or normalizing) the correlated predicates before pulling them out of the subquery.
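The normalization idea can be illustrated with a toy sketch (plain Scala, not Catalyst code — the `Expr` ADT and `factorCommon` below are illustrative, not Spark APIs). Q41's correlated filter has the shape `(m && a) || (m && b)`, where `m` is the correlated equality `i_manufact = i1.i_manufact`. Factoring the common conjunct out gives `m && (a || b)`, so only the equality `m` needs to be pulled out of the subquery:

```scala
// Toy boolean-expression ADT standing in for Catalyst expressions.
sealed trait Expr
case class Pred(name: String) extends Expr
case class And(left: Expr, right: Expr) extends Expr
case class Or(left: Expr, right: Expr) extends Expr

// Split a tree of ANDs / ORs into its top-level conjuncts / disjuncts.
def conjuncts(e: Expr): List[Expr] = e match {
  case And(l, r) => conjuncts(l) ++ conjuncts(r)
  case other     => List(other)
}
def disjuncts(e: Expr): List[Expr] = e match {
  case Or(l, r) => disjuncts(l) ++ disjuncts(r)
  case other    => List(other)
}

// Factor conjuncts shared by every disjunct up to the top level:
// (m && a) || (m && b)  ==>  m && (a || b)
def factorCommon(e: Expr): Expr = disjuncts(e).map(conjuncts) match {
  case first :: rest if rest.nonEmpty =>
    val common = rest.foldLeft(first.toSet)(_ intersect _.toSet)
    if (common.isEmpty) e
    else {
      val residual = disjuncts(e)
        .map(d => conjuncts(d).filterNot(common))
        .map(cs => cs.reduceOption(And(_, _)).getOrElse(Pred("TRUE")))
        .reduce(Or(_, _))
      And(common.toList.reduce(And(_, _)), residual)
    }
  case _ => e
}
```

After this rewrite, the analyzer sees the correlated equality as a plain top-level conjunct and the remaining disjunction stays inside the subquery.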

How was this patch tested?

Manual testing on TPC-DS 41, and added a test to SubquerySuite.

@hvanhovell
Contributor Author

cc @rxin @davies

@SparkQA

SparkQA commented May 6, 2016

Test build #57994 has finished for PR 12954 at commit f0871c9.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@@ -26,6 +26,7 @@ import org.apache.spark.sql.catalyst.catalog.{InMemoryCatalog, SessionCatalog}
import org.apache.spark.sql.catalyst.encoders.OuterScopes
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.expressions.aggregate._
import org.apache.spark.sql.catalyst.optimizer.BooleanSimplification
Contributor

It's weird to use an optimizer rule in the analyzer. We could have this workaround for now; could you create a JIRA for doing a proper fix later?

Contributor Author

I totally agree on that. I could move the rule from optimizer to analysis.

Contributor Author

I moved the rule in the latest commit.

@davies
Contributor

davies commented May 6, 2016

I still can't run Q41 after this PR:

 select distinct(i_product_name)
 from item i1
 where i_manufact_id between 738 and 738+40
   and (select count(*) as item_cnt
        from item
        where (i_manufact = i1.i_manufact and
        (i_category = 'Women' and
        (i_color = 'powder' or i_color = 'khaki') and
        (i_units = 'Ounce' or i_units = 'Oz') and
        (i_size = 'medium' or i_size = 'extra large')
        ) or
        (i_category = 'Women' and
        (i_color = 'brown' or i_color = 'honeydew') and
        (i_units = 'Bunch' or i_units = 'Ton') and
        (i_size = 'N/A' or i_size = 'small')
        ) or
        (i_category = 'Men' and
        (i_color = 'floral' or i_color = 'deep') and
        (i_units = 'N/A' or i_units = 'Dozen') and
        (i_size = 'petite' or i_size = 'large')
        ) or
        (i_category = 'Men' and
        (i_color = 'light' or i_color = 'cornflower') and
        (i_units = 'Box' or i_units = 'Pound') and
        (i_size = 'medium' or i_size = 'extra large')
        ))
        or
        (i_manufact = i1.i_manufact and
        ((i_category = 'Women' and
        (i_color = 'midnight' or i_color = 'snow') and
        (i_units = 'Pallet' or i_units = 'Gross') and
        (i_size = 'medium' or i_size = 'extra large')
        ) or
        (i_category = 'Women' and
        (i_color = 'cyan' or i_color = 'papaya') and
        (i_units = 'Cup' or i_units = 'Dram') and
        (i_size = 'N/A' or i_size = 'small')
        ) or
        (i_category = 'Men' and
        (i_color = 'orange' or i_color = 'frosted') and
        (i_units = 'Each' or i_units = 'Tbl') and
        (i_size = 'petite' or i_size = 'large')
        ) or
        (i_category = 'Men' and
        (i_color = 'forest' or i_color = 'ghost') and
        (i_units = 'Lb' or i_units = 'Bundle') and
        (i_size = 'medium' or i_size = 'extra large')
        )))) > 0
 order by i_product_name
 limit 100


Traceback (most recent call last):
  File "tpcds.py", line 117, in <module>
    test(sys.argv[1], 2)
  File "tpcds.py", line 83, in test
    sqlContext.sql(q).explain(False)
  File "/Users/davies/work/spark/python/pyspark/sql/context.py", line 345, in sql
    return self.sparkSession.sql(sqlQuery)
  File "/Users/davies/work/spark/python/pyspark/sql/session.py", line 505, in sql
    return DataFrame(self._jsparkSession.sql(sqlQuery), self._wrapped)
  File "//anaconda/lib/python2.7/site-packages/py4j/java_gateway.py", line 538, in __call__
    self.target_id, self.name)
  File "/Users/davies/work/spark/python/pyspark/sql/utils.py", line 63, in deco
    raise AnalysisException(s.split(': ', 1)[1], stackTrace)
pyspark.sql.utils.AnalysisException: u'The correlated scalar subquery can only contain equality predicates: ((((i_category#215#484 = Women) && (((((i_manufact#217#486 = i_manufact#217) && ((i_color#220#485 = powder) || (i_color#220#485 = khaki))) && ((i_units#221#488 = Ounce) || (i_units#221#488 = Oz))) && ((i_size#218#487 = medium) || (i_size#218#487 = extra large))) || ((((i_color#220#485 = brown) || (i_color#220#485 = honeydew)) && ((i_units#221#488 = Bunch) || (i_units#221#488 = Ton))) && ((i_size#218#487 = N/A) || (i_size#218#487 = small))))) || ((i_category#215#484 = Men) && (((((i_color#220#485 = floral) || (i_color#220#485 = deep)) && ((i_units#221#488 = N/A) || (i_units#221#488 = Dozen))) && ((i_size#218#487 = petite) || (i_size#218#487 = large))) || ((((i_color#220#485 = light) || (i_color#220#485 = cornflower)) && ((i_units#221#488 = Box) || (i_units#221#488 = Pound))) && ((i_size#218#487 = medium) || (i_size#218#487 = extra large)))))) || ((i_manufact#217#486 = i_manufact#217) && (((i_category#215#484 = Women) && (((((i_color#220#485 = midnight) || (i_color#220#485 = snow)) && ((i_units#221#488 = Pallet) || (i_units#221#488 = Gross))) && ((i_size#218#487 = medium) || (i_size#218#487 = extra large))) || ((((i_color#220#485 = cyan) || (i_color#220#485 = papaya)) && ((i_units#221#488 = Cup) || (i_units#221#488 = Dram))) && ((i_size#218#487 = N/A) || (i_size#218#487 = small))))) || ((i_category#215#484 = Men) && (((((i_color#220#485 = orange) || (i_color#220#485 = frosted)) && ((i_units#221#488 = Each) || (i_units#221#488 = Tbl))) && ((i_size#218#487 = petite) || (i_size#218#487 = large))) || ((((i_color#220#485 = forest) || (i_color#220#485 = ghost)) && ((i_units#221#488 = Lb) || (i_units#221#488 = Bundle))) && ((i_size#218#487 = medium) || (i_size#218#487 = extra large))))))));'

@davies
Contributor

davies commented May 6, 2016

@hvanhovell never mind, I made a mistake in the query

* correlated [[ScalarSubquery]] expressions are wrapped in some Predicate expression, we rewrite
* them into [[PredicateSubquery]] expressions.
*/
object RewriteScalarSubqueriesInFilter extends Rule[LogicalPlan] {
Contributor

This rule is not necessary for this fix, right?

Also, we have a rule to turn an outer join into a semi join if possible; does that one not work in this case?

Contributor Author

It is not necessary for this fix. It is left over from the initial attempt. I do think it adds value, since it allows us to use queries which might return multiple values for one key.

We do have a rule that promotes an outer join into an inner join, but we do not have one that turns an outer/inner join into a semi join.

Contributor Author

I'll remove it if you want to.

Contributor

With an inner join, the predicate could actually be pushed down into the subquery, but it cannot with a semi join. So it's not obvious which one is better.

After looking at it for a long time, I'm not convinced it's correct (or the best way to do it). Could you keep it out of this PR for now?

Contributor Author

The advantage of a semi join is that the subquery can return multiple results for one row without causing correctness problems. My initial approach was to relax the rules for scalar subqueries (allow disjunctive predicates) and prevent possible duplicates by using left semi joins. This didn't work because I was also pulling non-correlated predicates through the aggregate (which makes its results invalid).

I am not sure which predicates you want to push through the left semi, since all predicates that should be pushed down the left-hand side are already in the join condition. But I might be missing something here.

I have removed this in my last commit. I do feel that it might be a small improvement over the current situation. Let's revisit this after Spark 2.0.
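The duplicate-safety point can be seen with a minimal sketch (plain Scala collections, not Spark plans — the keys and values here are made up for illustration). If the subquery side yields several rows per key, an inner join duplicates the outer row once per match, while a semi join keeps each qualifying outer row exactly once:

```scala
// Outer side: one row per key. Subquery side: duplicate matches for key m1.
val outer = List(("m1", "itemA"), ("m2", "itemB"))
val subquery = List("m1", "m1", "m2")

// Inner join: outer row ("m1", "itemA") appears once per matching subquery row.
val innerJoin = for { (k, v) <- outer; s <- subquery if s == k } yield (k, v)

// Semi join: each outer row appears at most once, regardless of match count.
val semiJoin = outer.filter { case (k, _) => subquery.contains(k) }
```

This is why a left semi join tolerates subqueries that return multiple rows per key, whereas rewriting to an inner join would require the subquery output to be deduplicated first.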

@SparkQA

SparkQA commented May 6, 2016

Test build #58031 has finished for PR 12954 at commit b487925.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

* 3. Merge same expressions
* 4. Removes `Not` operator.
*/
object BooleanSimplification extends Rule[LogicalPlan] with PredicateHelper {
Contributor

Moving this one into analysis is even worse ...

Contributor Author

Ok, what do you suggest then?

Contributor

Import it into the analyzer as a temporary fix; we can think of a proper way later.

@SparkQA

SparkQA commented May 7, 2016

Test build #58047 has finished for PR 12954 at commit e28bbb6.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@davies
Contributor

davies commented May 7, 2016

LGTM,
Merging this into master and 2.0 branch, thanks!

@asfgit asfgit closed this in df89f1d May 7, 2016
asfgit pushed a commit that referenced this pull request May 7, 2016
…ng them out

## What changes were proposed in this pull request?
The official TPC-DS 41 query currently fails because it contains a scalar subquery with a disjunctive correlated predicate (the correlated predicates were nested in ORs). This makes the `Analyzer` pull out the entire predicate which is wrong and causes the following (correct) analysis exception: `The correlated scalar subquery can only contain equality predicates`

This PR fixes this by first simplifying (or normalizing) the correlated predicates before pulling them out of the subquery.

## How was this patch tested?
Manual testing on TPC-DS 41, and added a test to SubquerySuite.

Author: Herman van Hovell <[email protected]>

Closes #12954 from hvanhovell/SPARK-15122.

(cherry picked from commit df89f1d)
Signed-off-by: Davies Liu <[email protected]>