
[SPARK-2423] Clean up SparkSubmit for readability #1349

Closed
wants to merge 5 commits into apache:master from andrewor14:submit-cleanup

Conversation

andrewor14
Contributor

It is currently non-trivial to trace through how different combinations of cluster managers (e.g. yarn) and deploy modes (e.g. cluster) are processed in SparkSubmit. Moving forward, it will be easier to extend SparkSubmit if we first re-organize the code by grouping related logic together.

This is a precursor to fixing standalone-cluster mode, which is currently broken (SPARK-2260).

This also moves the code block for python applications back to the
right place.
@AmplabJenkins

Merged build started.

@AmplabJenkins

Merged build triggered.

 * Main gateway of launching a Spark application.
 *
 * This script handles setting up the classpath with relevant Spark dependencies and provides
 * a layer over the different cluster managers and deploy modes that Spark supports.
Contributor

I think saying "this script" is a little confusing. If the class encapsulated all the logic behind the script, it might make a little more sense, but the script has some logic of its own.

Contributor Author

I agree. I'll call it a program or something.
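
For reference, the squash list in the merge commit below includes `8f99200 [Andrew Or] script -> program (minor)`, so the reworded scaladoc presumably ended up along these lines (the exact final text is an assumption):

```scala
/**
 * Main gateway of launching a Spark application.
 *
 * This program handles setting up the classpath with relevant Spark dependencies and provides
 * a layer over the different cluster managers and deploy modes that Spark supports.
 */
```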

@sryza
Contributor

sryza commented Jul 9, 2014

These changes seem reasonable to me.

@AmplabJenkins

Merged build finished. All automated tests passed.

@AmplabJenkins

All automated tests passed.
Refer to this link for build results: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/16483/

@andrewor14
Contributor Author

An important goal of this PR is to keep the functionality the same. I have reviewed my own changes closely to verify that the logic is preserved, but it would be best if others could also take a close look to make sure I didn't miss any case. I think @sryza and @pwendell are good candidates (though it looks like one of you already did).

I will do an end-to-end sanity test on each of the platforms to make sure this doesn't break anything.

In the yarn code block, we set the deploy mode to "cluster" if the master
is "yarn-cluster" and the deploy mode is not specified. However, by then we
have already passed the error checks that prevent users from launching
python and shell applications in cluster mode.

This is fixed by re-ordering the two code blocks.
@andrewor14
Contributor Author

I found a small bug by testing on YARN. If we run a shell or python application with --master yarn-cluster and no deploy mode, spark submit is supposed to get mad because cluster mode is not supported for either application. This is a simple ordering issue fixed in the latest commit.
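
A minimal, self-contained sketch of the ordering fix described above; the names and structure are illustrative stand-ins, not the actual SparkSubmit internals:

```scala
object DeployModeCheckSketch {
  // Hypothetical stand-in for the parsed submit arguments.
  case class Args(var master: String, var deployMode: String, isPython: Boolean)

  def validate(args: Args): Unit = {
    // Step 1: resolve the effective deploy mode first, as the fix does.
    if (args.master == "yarn-cluster" && args.deployMode == null) {
      args.deployMode = "cluster"
    }
    // Step 2: only now run the error checks, so "--master yarn-cluster"
    // with no explicit deploy mode is also rejected for python apps.
    if (args.deployMode == "cluster" && args.isPython) {
      sys.error("Cluster deploy mode is currently not supported for python applications.")
    }
  }

  def main(argv: Array[String]): Unit = {
    // With the old ordering (checks before resolution), this case slipped through.
    validate(Args("yarn-cluster", deployMode = null, isPython = true))
  }
}
```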

@SparkQA

SparkQA commented Jul 11, 2014

QA tests have started for PR 1349. This patch merges cleanly.
View progress: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/16539/consoleFull

@andrewor14
Contributor Author

As of this comment, I have tested these changes in local, standalone and yarn modes, running with additional python files and jars and various configs, and everything behaves as expected. I do not have access to a mesos cluster, but there is basically no mesos-specific code in spark submit, so I believe it's OK.

As expected, standalone cluster mode still doesn't work. This is to be fixed in a future PR that builds on top of this one.

@SparkQA

SparkQA commented Jul 11, 2014

QA tests have started for PR 1349. This patch merges cleanly.
View progress: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/16541/consoleFull

@SparkQA

SparkQA commented Jul 11, 2014

QA results for PR 1349:
- This patch PASSES unit tests.
- This patch merges cleanly
- This patch adds the following public classes (experimental):
* (4) the main class for the child

For more information see test output:
https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/16539/consoleFull

@SparkQA

SparkQA commented Jul 11, 2014

QA results for PR 1349:
- This patch PASSES unit tests.
- This patch merges cleanly
- This patch adds the following public classes (experimental):
* (4) the main class for the child

For more information see test output:
https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/16541/consoleFull

@pwendell
Contributor

Thanks Andrew, looks good!

@asfgit asfgit closed this in 9c73822 Jul 17, 2014
@andrewor14 andrewor14 deleted the submit-cleanup branch July 17, 2014 17:28
xiliu82 pushed a commit to xiliu82/spark that referenced this pull request Sep 4, 2014
It is currently non-trivial to trace through how different combinations of cluster managers (e.g. yarn) and deploy modes (e.g. cluster) are processed in SparkSubmit. Moving forward, it will be easier to extend SparkSubmit if we first re-organize the code by grouping related logic together.

This is a precursor to fixing standalone-cluster mode, which is currently broken (SPARK-2260).

Author: Andrew Or <[email protected]>

Closes apache#1349 from andrewor14/submit-cleanup and squashes the following commits:

8f99200 [Andrew Or] script -> program (minor)
30f2e65 [Andrew Or] Merge branch 'master' of github.com:apache/spark into submit-cleanup
fe484a1 [Andrew Or] Move deploy mode checks after yarn code
7167824 [Andrew Or] Re-order config options and update comments
0b01ff8 [Andrew Or] Clean up SparkSubmit for readability
maropu pushed a commit that referenced this pull request Nov 17, 2020
[SPARK-33399][SQL] Normalize output partitioning and sortorder with respect to aliases to avoid unneeded exchange/sort nodes

### What changes were proposed in this pull request?
This pull request tries to remove unneeded exchanges/sorts by normalizing the output partitioning and sortorder information correctly with respect to aliases.

Example: consider this join of three tables:

     |SELECT t2id, t3.id as t3id
     |FROM (
     |    SELECT t1.id as t1id, t2.id as t2id
     |    FROM t1, t2
     |    WHERE t1.id = t2.id
     |) t12, t3
     |WHERE t1id = t3.id

The plan for this looks like:

      *(9) Project [t2id#1034L, id#1004L AS t3id#1035L]
      +- *(9) SortMergeJoin [t1id#1033L], [id#1004L], Inner
         :- *(6) Sort [t1id#1033L ASC NULLS FIRST], false, 0
         :  +- Exchange hashpartitioning(t1id#1033L, 5), true, [id=#1343]   <------------------------------
         :     +- *(5) Project [id#996L AS t1id#1033L, id#1000L AS t2id#1034L]
         :        +- *(5) SortMergeJoin [id#996L], [id#1000L], Inner
         :           :- *(2) Sort [id#996L ASC NULLS FIRST], false, 0
         :           :  +- Exchange hashpartitioning(id#996L, 5), true, [id=#1329]
         :           :     +- *(1) Range (0, 10, step=1, splits=2)
         :           +- *(4) Sort [id#1000L ASC NULLS FIRST], false, 0
         :              +- Exchange hashpartitioning(id#1000L, 5), true, [id=#1335]
         :                 +- *(3) Range (0, 20, step=1, splits=2)
         +- *(8) Sort [id#1004L ASC NULLS FIRST], false, 0
            +- Exchange hashpartitioning(id#1004L, 5), true, [id=#1349]
               +- *(7) Range (0, 30, step=1, splits=2)

In this plan, the marked exchange could have been avoided as the data is already partitioned on "t1.id". This happens because the AliasAwareOutputPartitioning class handles aliases only for HashPartitioning. This change normalizes all output partitioning based on the aliases introduced in Project.
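
As a rough illustration of the normalization idea, here is a sketch using simplified stand-in types rather than Catalyst's actual expression classes:

```scala
object AliasAwarePartitioningSketch {
  sealed trait Expr
  case class Attr(name: String) extends Expr
  case class Aliased(child: Expr, name: String) extends Expr

  // After "SELECT id AS t1id", data hash-partitioned on id is equally
  // hash-partitioned on t1id, so rewriting the child's partitioning
  // expressions through the Project's aliases lets the planner see that
  // the downstream join's requirement is already met.
  def normalize(partitioningExprs: Seq[Expr], projectList: Seq[Expr]): Seq[Expr] = {
    val aliasMap = projectList.collect {
      case Aliased(child, name) => child -> (Attr(name): Expr)
    }.toMap
    partitioningExprs.map(e => aliasMap.getOrElse(e, e))
  }
}
```

Under this sketch, `normalize(Seq(Attr("id")), Seq(Aliased(Attr("id"), "t1id")))` yields `Seq(Attr("t1id"))`, mirroring how the marked Exchange above becomes unnecessary.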

### Why are the changes needed?
To remove unneeded exchanges.

### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?
New UT added.

On TPCDS 1000 scale, this change improves the performance of query 95 from 330 seconds to 170 seconds by removing the extra Exchange.

Closes #30300 from prakharjain09/SPARK-33399-outputpartitioning.

Authored-by: Prakhar Jain <[email protected]>
Signed-off-by: Takeshi Yamamuro <[email protected]>
wangyum pushed a commit that referenced this pull request May 26, 2023
[SPARK-33399][SQL] Normalize output partitioning and sortorder with respect to aliases to avoid unneeded exchange/sort nodes (#1092)

* [SPARK-31078][SQL] Respect aliases in output ordering

Currently, in the following scenario, an unnecessary `Sort` node is introduced:
```scala
withSQLConf(SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "0") {
  val df = (0 until 20).toDF("i").as("df")
  df.repartition(8, df("i")).write.format("parquet")
    .bucketBy(8, "i").sortBy("i").saveAsTable("t")
  val t1 = spark.table("t")
  val t2 = t1.selectExpr("i as ii")
  t1.join(t2, t1("i") === t2("ii")).explain
}
```
```
== Physical Plan ==
*(3) SortMergeJoin [i#8], [ii#10], Inner
:- *(1) Project [i#8]
:  +- *(1) Filter isnotnull(i#8)
:     +- *(1) ColumnarToRow
:        +- FileScan parquet default.t[i#8] Batched: true, DataFilters: [isnotnull(i#8)], Format: Parquet, Location: InMemoryFileIndex[file:/..., PartitionFilters: [], PushedFilters: [IsNotNull(i)], ReadSchema: struct<i:int>, SelectedBucketsCount: 8 out of 8
+- *(2) Sort [ii#10 ASC NULLS FIRST], false, 0    <==== UNNECESSARY
   +- *(2) Project [i#8 AS ii#10]
      +- *(2) Filter isnotnull(i#8)
         +- *(2) ColumnarToRow
            +- FileScan parquet default.t[i#8] Batched: true, DataFilters: [isnotnull(i#8)], Format: Parquet, Location: InMemoryFileIndex[file:/..., PartitionFilters: [], PushedFilters: [IsNotNull(i)], ReadSchema: struct<i:int>, SelectedBucketsCount: 8 out of 8
```
Notice that `Sort [ii#10 ASC NULLS FIRST], false, 0` is introduced even though the underlying data is already sorted. This is because `outputOrdering` doesn't handle aliases correctly. This PR proposes to fix this issue.
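
The same idea applies to sort order; below is a minimal sketch under the assumption that aliases can be modeled as a simple column rename map (not Catalyst's actual SortOrder machinery):

```scala
object AliasAwareOrderingSketch {
  case class SortKey(column: String)

  // Project [i#8 AS ii#10] renames i to ii: if the child is sorted on i,
  // the Project's output is equivalently sorted on ii, so the planner can
  // drop the redundant Sort [ii ASC] node.
  def projectedOrdering(childOrdering: Seq[SortKey],
                        aliases: Map[String, String]): Seq[SortKey] =
    childOrdering.map(k => SortKey(aliases.getOrElse(k.column, k.column)))

  def main(args: Array[String]): Unit = {
    println(projectedOrdering(Seq(SortKey("i")), Map("i" -> "ii")))
    // List(SortKey(ii)): the downstream requirement "sorted by ii" is satisfied.
  }
}
```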

To better handle aliases in `outputOrdering`.

Yes, now with the fix, the `explain` prints out the following:
```
== Physical Plan ==
*(3) SortMergeJoin [i#8], [ii#10], Inner
:- *(1) Project [i#8]
:  +- *(1) Filter isnotnull(i#8)
:     +- *(1) ColumnarToRow
:        +- FileScan parquet default.t[i#8] Batched: true, DataFilters: [isnotnull(i#8)], Format: Parquet, Location: InMemoryFileIndex[file:/..., PartitionFilters: [], PushedFilters: [IsNotNull(i)], ReadSchema: struct<i:int>, SelectedBucketsCount: 8 out of 8
+- *(2) Project [i#8 AS ii#10]
   +- *(2) Filter isnotnull(i#8)
      +- *(2) ColumnarToRow
         +- FileScan parquet default.t[i#8] Batched: true, DataFilters: [isnotnull(i#8)], Format: Parquet, Location: InMemoryFileIndex[file:/..., PartitionFilters: [], PushedFilters: [IsNotNull(i)], ReadSchema: struct<i:int>, SelectedBucketsCount: 8 out of 8
```

Tests added.

Closes #27842 from imback82/alias_aware_sort_order.

Authored-by: Terry Kim <[email protected]>
Signed-off-by: Wenchen Fan <[email protected]>

* [SPARK-33399][SQL] Normalize output partitioning and sortorder with respect to aliases to avoid unneeded exchange/sort nodes

This pull request tries to remove unneeded exchanges/sorts by normalizing the output partitioning and sortorder information correctly with respect to aliases.

Example: consider this join of three tables:

     |SELECT t2id, t3.id as t3id
     |FROM (
     |    SELECT t1.id as t1id, t2.id as t2id
     |    FROM t1, t2
     |    WHERE t1.id = t2.id
     |) t12, t3
     |WHERE t1id = t3.id

The plan for this looks like:

      *(9) Project [t2id#1034L, id#1004L AS t3id#1035L]
      +- *(9) SortMergeJoin [t1id#1033L], [id#1004L], Inner
         :- *(6) Sort [t1id#1033L ASC NULLS FIRST], false, 0
         :  +- Exchange hashpartitioning(t1id#1033L, 5), true, [id=#1343]   <------------------------------
         :     +- *(5) Project [id#996L AS t1id#1033L, id#1000L AS t2id#1034L]
         :        +- *(5) SortMergeJoin [id#996L], [id#1000L], Inner
         :           :- *(2) Sort [id#996L ASC NULLS FIRST], false, 0
         :           :  +- Exchange hashpartitioning(id#996L, 5), true, [id=#1329]
         :           :     +- *(1) Range (0, 10, step=1, splits=2)
         :           +- *(4) Sort [id#1000L ASC NULLS FIRST], false, 0
         :              +- Exchange hashpartitioning(id#1000L, 5), true, [id=#1335]
         :                 +- *(3) Range (0, 20, step=1, splits=2)
         +- *(8) Sort [id#1004L ASC NULLS FIRST], false, 0
            +- Exchange hashpartitioning(id#1004L, 5), true, [id=#1349]
               +- *(7) Range (0, 30, step=1, splits=2)

In this plan, the marked exchange could have been avoided as the data is already partitioned on "t1.id". This happens because the AliasAwareOutputPartitioning class handles aliases only for HashPartitioning. This change normalizes all output partitioning based on the aliases introduced in Project.

To remove unneeded exchanges.

No

New UT added.

On TPCDS 1000 scale, this change improves the performance of query 95 from 330 seconds to 170 seconds by removing the extra Exchange.

Closes #30300 from prakharjain09/SPARK-33399-outputpartitioning.

Authored-by: Prakhar Jain <[email protected]>
Signed-off-by: Takeshi Yamamuro <[email protected]>

* [CARMEL-6306] Fix ut

* [CARMEL-6306] Fix alias not compatible with ebay skew implementation

Co-authored-by: Terry Kim <[email protected]>
Co-authored-by: Prakhar Jain <[email protected]>