
[SPARK-7871][SQL] Improve the outputPartitioning for HashOuterJoin #6413

Closed · 12 commits

Conversation

chenghao-intel (Contributor)

https://issues.apache.org/jira/browse/SPARK-7871
Optimize a full outer join that is followed by another join on the same equi-join key.
For example:

explain SELECT l.key, m.key, r.key from src l full outer join src m on l.key=m.key full outer join src r on l.key=r.key;

Before this PR (4 Exchanges, including one Exchange for repartitioning the intermediate result, which can cause a huge performance problem):
== Physical Plan ==
Project [key#1,key#3,key#5]
 HashOuterJoin [key#1], [key#5], FullOuter, None
  Exchange (HashPartitioning [key#1], 200), []
   Project [key#1,key#3]
    HashOuterJoin [key#1], [key#3], FullOuter, None
     Exchange (HashPartitioning [key#1], 200), []
      HiveTableScan [key#1], (MetastoreRelation default, src, Some(l)), None
     Exchange (HashPartitioning [key#3], 200), []
      HiveTableScan [key#3], (MetastoreRelation default, src, Some(m)), None
  Exchange (HashPartitioning [key#5], 200), []
   HiveTableScan [key#5], (MetastoreRelation default, src, Some(r)), None

With this PR applied (3 Exchanges, only for repartitioning the raw table data):
== Physical Plan ==
Project [key#1,key#3,key#5]
 HashOuterJoin [key#1], [key#5], FullOuter, None
  Project [key#1,key#3]
   HashOuterJoin [key#1], [key#3], FullOuter, None
    Exchange (HashPartitioning 200)
     HiveTableScan [key#1], (MetastoreRelation default, src, Some(l)), None
    Exchange (HashPartitioning 200)
     HiveTableScan [key#3], (MetastoreRelation default, src, Some(m)), None
  Exchange (HashPartitioning 200)
   HiveTableScan [key#5], (MetastoreRelation default, src, Some(r)), None
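
The key change is to let HashOuterJoin report a meaningful outputPartitioning instead of an unknown one, so the parent join's required distribution is already satisfied. Below is only a minimal sketch of that idea, written against the pre-refactor 1.4-era planner API (HashPartitioning, UnknownPartitioning); it is an illustration, not the exact code in this PR.

// Minimal sketch, intended to live inside HashOuterJoin: advertise that the output is
// still hash-partitioned by the join keys, so a parent join that requires clustering
// on the same keys does not trigger another Exchange.
override def outputPartitioning: Partitioning = joinType match {
  case LeftOuter  => left.outputPartitioning   // streamed side keeps its partitioning
  case RightOuter => right.outputPartitioning
  case FullOuter  =>
    // Both inputs were shuffled on their join keys, so equal keys are co-located;
    // reporting a hash partitioning on the left keys is an approximation (the null
    // keys generated by the outer join are handled separately, see below).
    HashPartitioning(leftKeys, left.outputPartitioning.numPartitions)
  case _ =>
    UnknownPartitioning(left.outputPartitioning.numPartitions)
}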

However, we don't want to change the existing behavior when the join is followed by a GROUP BY whose keys are exactly the same as the join keys:


explain SELECT l.key, count(m.value) from src l full outer join src m on l.key=m.key group by l.key;
== Physical Plan ==
Aggregate false, [key#9], [key#9,Coalesce(SUM(PartialCount#14L),0) AS _c1#7L]
 Exchange (HashPartitioning 200)
  Aggregate true, [key#9], [key#9,COUNT(value#12) AS PartialCount#14L]
   Project [key#9,value#12]
    HashOuterJoin [key#9], [key#11], FullOuter, None
     Exchange (HashPartitioning 200)
      HiveTableScan [key#9], (MetastoreRelation default, src, Some(l)), None
     Exchange (HashPartitioning 200)
      HiveTableScan [key#11,value#12], (MetastoreRelation default, src, Some(m)), None
Even though the GROUP BY key is exactly the same as the join key, the full outer join may produce null values for l.key, hence we still have to add another Exchange for the Aggregate.
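
A hypothetical sketch of that null-sensitivity check, using the field names introduced by the refactor described below (clusterKeys, additionalNullClusterKeyGenerated); the nullKeysSensitive flag stands for whether the parent operator (e.g. Aggregate) cares about where null keys end up. This is an illustration, not the PR's exact code.

// Hypothetical sketch: a clustering requirement is only satisfied when the keys match
// AND either the operator tolerates null keys scattered across partitions (a join
// never matches null keys, so it does not need them co-located) or the child cannot
// have generated extra null keys (e.g. it is not the output of a full outer join).
def satisfiesClustering(
    requiredKeys: Seq[Expression],
    nullKeysSensitive: Boolean,
    child: Partitioning): Boolean = {
  child.clusterKeys == requiredKeys &&
    (!nullKeysSensitive || !child.additionalNullClusterKeyGenerated)
}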

Since more factors will probably be involved in determining whether a data shuffle is needed, I've also refactored the entire EnsureRequirements code by introducing the Gap between the requiredDistribution and the child's outputPartitioning.

import org.apache.spark.sql.catalyst.expressions.{Expression, SortOrder}

// Describes the output data distribution of a physical operator.
sealed case class Partitioning(
  /** The number of partitions that the data is split across. */
  numPartitions: Option[Int] = None,

  /** The expressions that are used to key the partitioning. */
  clusterKeys: Seq[Expression] = Nil,

  /** The expressions that are used to sort the data. */
  sortKeys: Seq[SortOrder] = Nil,

  /** Works with `sortKeys`: whether the ordering is global or only within each partition. */
  globalOrdered: Boolean = false,

  /** Indicates whether additional null clustering keys may be generated (e.g. by an outer join). */
  additionalNullClusterKeyGenerated: Boolean = true)

// Describes the data distribution required from the child operator
// (UnspecifiedDistribution, ClusteredDistribution, OrderedDistribution).
trait Distribution

// Describes the additional operation needed on the child operator, derived from the
// associated `requiredDistribution` and the child's `outputPartitioning`
// (NoGap, SortKeyWithinPartition, GlobalOrdering, RepartitionKey, RepartitionKeyAndSort).
trait Gap
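
For illustration, a Gap could then be bridged by EnsureRequirements roughly as follows. This is a hypothetical sketch: the Gap cases come from the pseudocode above, while Sort, Exchange, and HashPartitioning refer to the 1.4-era physical operators; the actual mapping in this PR may differ.

// Hypothetical sketch of bridging a Gap above a child plan.
def bridgeGap(
    gap: Gap,
    keys: Seq[Expression],
    ordering: Seq[SortOrder],
    numPartitions: Int,
    child: SparkPlan): SparkPlan = gap match {
  case NoGap                  => child
  case SortKeyWithinPartition => Sort(ordering, global = false, child)
  case GlobalOrdering         => Sort(ordering, global = true, child)
  case RepartitionKey         => Exchange(HashPartitioning(keys, numPartitions), child)
  case RepartitionKeyAndSort  =>
    Sort(ordering, global = false, Exchange(HashPartitioning(keys, numPartitions), child))
}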

This PR contains the code refactoring for Exchange in Spark SQL. It's a WIP PR and still needs to:

  • Enable more unit tests
  • Add more Scaladoc
  • Review all of the existing physical plans for their requiredDistribution and outputPartitioning

A known issue (https://issues.apache.org/jira/browse/SPARK-2205) will be solved in another PR once this PR is merged.

@SparkQA commented May 26, 2015

Test build #33519 has finished for PR 6413 at commit 5e6516f.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA commented May 27, 2015

Test build #33589 has finished for PR 6413 at commit 970a2dc.

  • This patch fails Scala style tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
    • case class ClusteredDistribution(

.queryExecution.executedPlan
val exchanges = planned.collect { case n: Exchange => n }

assert(exchanges.size === 3)
Contributor:

Don't these changes affect

testData
  .join(testData2, testData("key") === testData2("a"), "outer")
  .join(testData2, testData("a") === testData3("a"), "outer")

?

Contributor Author (chenghao-intel):

I think this requires some further changes; @yhuai should have some ideas on this.

@SparkQA commented May 27, 2015

Test build #33593 has finished for PR 6413 at commit a69f2ae.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
    • case class ClusteredDistribution(

* as a valid value if `nullKeysSensitive` == true.
*
* For example:
* JOIN KEYS: values containing null will be considered as invalid values, which means
Contributor:

Does "values" here refer to the original values of the table or to the intermediate values of the join?
Are nulls in the original table data also considered invalid?

Contributor Author (chenghao-intel):

It should be the input value (either the original data from the table or an intermediate result, e.g. a join output).

Whether a null in the original table is valid depends on the semantics: in a join it should also be invalid, but it is valid for GROUP BY.

Repartitioning when the join keys contain nulls would be another optimization.

@chenghao-intel (Contributor, Author)

Sorry, it will cause a performance regression for cases like

left join a.key=b.key group by a.key; I will figure out how to fix it soon.

@chenghao-intel changed the title from "[SPARK-7871] [SQL] Improve the outputPartitioning for HashOuterJoin" to "[SPARK-7871][SQL][WIP] Improve the outputPartitioning for HashOuterJoin" on Jun 22, 2015
@SparkQA commented Jun 22, 2015
Test build #35457 has finished for PR 6413 at commit b17a74d.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
    • case class ClusteredDistribution(
    • sealed case class Partitioning(

@chenghao-intel (Contributor, Author)

retest this please

@SparkQA commented Jun 23, 2015

Test build #35498 has finished for PR 6413 at commit b17a74d.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
    • case class ClusteredDistribution(
    • sealed case class Partitioning(

@chenghao-intel changed the title from "[SPARK-7871][SQL][WIP] Improve the outputPartitioning for HashOuterJoin" to "[SPARK-7871][SQL] Improve the outputPartitioning for HashOuterJoin" on Jun 29, 2015
@SparkQA commented Jun 29, 2015

Test build #35961 has finished for PR 6413 at commit 32d8af0.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA commented Jun 29, 2015

Test build #35965 has finished for PR 6413 at commit e59b4d4.

  • This patch fails PySpark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@chenghao-intel (Contributor, Author)

retest this please

@SparkQA commented Jun 29, 2015

Test build #35975 has finished for PR 6413 at commit e59b4d4.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
    • case class ClusteredDistribution(
    • sealed case class Partitioning(

@chenghao-intel (Contributor, Author)

@yhuai Sorry, it's a big change. :) Can you review this for me?

@chenghao-intel (Contributor, Author)

Sorry, I found another bug, will solve it soon.

@SparkQA commented Jul 2, 2015

Test build #36368 has finished for PR 6413 at commit bd37778.

  • This patch fails Scala style tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
    • case class ClusteredDistribution(
    • sealed case class Partitioning(

@SparkQA commented Jul 2, 2015

Test build #36379 has finished for PR 6413 at commit bd4541d.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA commented Jul 2, 2015

Test build #36392 has finished for PR 6413 at commit fcb9aed.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
    • case class ClusteredDistribution(
    • sealed case class Partitioning(

@SparkQA commented Jul 3, 2015

Test build #36499 has finished for PR 6413 at commit ec4e5c2.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
    • case class ClusteredDistribution(
    • sealed case class Partitioning(

clusterKeys,
sortKeys,
globalOrdered,
additionalNullClusterKeyGenerated)
Contributor:

You may use the copy method that comes with all case classes:

this.copy(numPartitions = Some(num))
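
For instance, a hypothetical usage that only overrides the partition count while keeping the remaining fields unchanged:

// Hypothetical usage of the case-class copy method suggested above.
val repartitioned = partitioning.copy(numPartitions = Some(200))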

@JoshRosen (Contributor)

If I'm not mistaken, I think that this patch's changes will be subsumed by the combination of #7773 and the null-unsafe/safe parts of #7685. Are there any changes in this patch that are missed by the combination of those other two patches?

@yhuai (Contributor) commented Aug 1, 2015

@JoshRosen Right, #7773 and the null-unsafe/safe parts of #7685 will address the issue.

@chenghao-intel (Contributor, Author)

OK, I am closing this PR.
