
[SPARK-3367] Remove spark.shuffle.spill.compress #2247

Closed
wants to merge 1 commit

Conversation

rxin
Contributor

@rxin rxin commented Sep 3, 2014

Replaced it with existing spark.shuffle.compress.

See discussion in #2178 (comment)

Commit 2029d60: Replaced it with existing spark.shuffle.compress.
@SparkQA

SparkQA commented Sep 3, 2014

QA tests have started for PR 2247 at commit 2029d60.

  • This patch merges cleanly.

@SparkQA

SparkQA commented Sep 3, 2014

QA tests have finished for PR 2247 at commit 2029d60.

  • This patch fails unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@rxin
Contributor Author

rxin commented Sep 3, 2014

Jenkins, retest this please.

@SparkQA

SparkQA commented Sep 3, 2014

QA tests have started for PR 2247 at commit 2029d60.

  • This patch merges cleanly.

@mridulm
Contributor

mridulm commented Sep 3, 2014

Is this being done to work around the bug (sort shuffle breakage)? The way we use it is to not enable shuffle compression, but only compress data if it spills to disk (a corner case), minimizing compression overhead for the normal case while still handling degenerate cases.
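
For concreteness, the setup described above would presumably look something like the following (a hypothetical sketch of the configuration, not taken from the thread):

```scala
import org.apache.spark.SparkConf

// Hypothetical configuration matching the use case above: leave ordinary
// shuffle output uncompressed, but compress the (rare) files spilled to disk.
val conf = new SparkConf()
  .set("spark.shuffle.compress", "false")       // shuffle files served to reducers
  .set("spark.shuffle.spill.compress", "true")  // spill files written by external sorters/maps
```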

@rxin
Contributor Author

rxin commented Sep 3, 2014

What compression overhead are you talking about?

@mridulm
Contributor

mridulm commented Sep 3, 2014

The overhead of compressing?

@rxin
Contributor Author

rxin commented Sep 3, 2014

I don't get it. Are you saying compression is turned on for spill files, but not for general shuffle files?

And in response to your original question about the sort shuffle breakage: yes, this is what is breaking it. See discussion in #2178 (comment)

@mridulm
Contributor

mridulm commented Sep 3, 2014

Yes, exactly: sometimes with a higher block size configured. Most tasks will not incur (de)compression overhead, except for the very few that spill due to skew.

@rxin
Contributor Author

rxin commented Sep 3, 2014

Do you have any data on a job in which disabling shuffle compression helps with performance (especially with sort-based shuffle in which the number of concurrent streams is low)? In all cases I've seen, compression improves performance. I'm not sure if we should keep two options to satisfy an extremely rare corner case.

@SparkQA

SparkQA commented Sep 3, 2014

QA tests have finished for PR 2247 at commit 2029d60.

  • This patch passes unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@mridulm
Contributor

mridulm commented Sep 3, 2014

I think we are looking at it the wrong way: this is an exposed interface which is being removed in a minor release.

It might also break direct usage of ExternalAppendOnlyMap, but since that is a DeveloperApi, that is less of a direct concern (though potentially more disruptive).

@andrewor14
Contributor

Yes, unfortunately we currently document spark.shuffle.spill.compress in addition to spark.shuffle.compress. I guess we just can't backport this into 1.1.

@mateiz
Contributor

mateiz commented Sep 3, 2014

It's not an interface in the sense that program output doesn't change. It's only a config setting for an optimization.

Anyway, I agree with Reynold -- I'd like to see an example where spark.shuffle.spill.compress and spark.shuffle.compress need to be at different values, and see performance numbers for that. It seems to me that you're spilling the same kind of objects in both, so there will be the same tradeoff between I/O and compute time.

@tgravescs
Contributor

I haven't looked at all the details of this, but to me configs are public interfaces. Removing one that someone had used in the past could cause their program to behave differently between minor versions, which might be unexpected. In general I would try to keep things backwards compatible. If this is a direct mapping, why not just have the code check if either is set?
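
A backward-compatible check along these lines might look roughly like the following (a hypothetical sketch; the helper name and fallback behavior are illustrative, not the actual Spark code):

```scala
import org.apache.spark.SparkConf

// Hypothetical fallback: honor the old key if the user explicitly set it,
// otherwise defer to the unified spark.shuffle.compress setting.
def shouldCompressSpills(conf: SparkConf): Boolean =
  conf.getOption("spark.shuffle.spill.compress")
    .map(_.toBoolean)
    .getOrElse(conf.getBoolean("spark.shuffle.compress", true))
```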

@mridulm
Contributor

mridulm commented Sep 4, 2014

I am not sure I follow... We compress shuffle output with the first while we compress spills with the second. How would that be the same? The perf characteristics would be different.

The team has decided to use it, and I don't want to second-guess their reasons, whatever they might be (that just leads to unnecessary discussions).

Given it is a public interface available in 1.0 (IIRC since much before?), we can't remove support for it until the next major release. We can of course deprecate it and warn against its use, but we will need to continue supporting it until the next major release.

@mateiz
Contributor

mateiz commented Sep 4, 2014

The same data that gets spilled gets shuffled in most operations. Anyway, I'd be okay keeping it, but I'd like to understand why it's important. In general we should be aiming to reduce the number of config settings. Maybe this team is doing something wrong by using it. It's important to know the reason if we want to make a decision about reducing the # of config settings.

@tgravescs
Contributor

Perhaps we should have a separate discussion on the dev mailing list as to what we consider public interfaces? That way everyone is in agreement, or at least is following the same rules, when things get committed.

@mateiz
Contributor

mateiz commented Sep 4, 2014

Sure, I'd be happy to codify that. In general my approach to this has been that we keep all config settings and defaults that affect correctness (whether the program gives the right result), but things that affect only performance are fair game to change. For example, we've kept spark.serializer defaulting to Java serialization, since changing that default would break a lot of apps. But we've tweaked things like the % of memory used for caching and buffer sizes, and we've also made some settings largely irrelevant (e.g. the Akka frame size used to matter a lot, but we moved first task results and then tasks themselves to not be sent through Akka).

Anyway, in this case I'm not arguing that this should be removed in 1.X if there are concerns, I'm just trying to understand whether this is a useful setting to have longer-term. In particular, should we deprecate it and remove it in 2.X? So far I haven't seen a strong case that this is a useful setting, but I might be missing some use case.

@rxin
Contributor Author

rxin commented Sep 6, 2014

Ok I'm going to close this for now given the debate.

@mridulm it would be great to hear more about the performance difference. It is hard for me to imagine a scenario in which you would want different settings for the two.

@andrewor14 can you fix the bug?

@rxin rxin closed this Sep 6, 2014
@andrewor14
Contributor

Ok. I have filed a JIRA at https://issues.apache.org/jira/browse/SPARK-3426

asfgit pushed a commit that referenced this pull request Oct 22, 2014
… and spark.shuffle.spill.compress settings are different

This PR fixes SPARK-3426, an issue where sort-based shuffle crashes if the
`spark.shuffle.spill.compress` and `spark.shuffle.compress` settings have
different values.

The problem is that sort-based shuffle's read and write paths use different
settings for determining whether to apply compression.  ExternalSorter writes
runs to files using `TempBlockId` ids, which causes
`spark.shuffle.spill.compress` to be used for enabling compression, but these
spilled files end up being shuffled over the network and read as shuffle files
using `ShuffleBlockId` by BlockStoreShuffleFetcher, which causes
`spark.shuffle.compress` to be used for enabling decompression.  As a result,
this leads to errors when these settings disagree.

Based on the discussions in #2247 and #2178, it sounds like we don't want to
remove the `spark.shuffle.spill.compress` setting.  Therefore, I've tried to
come up with a fix where `spark.shuffle.spill.compress` is used to compress
data that's read and written locally and `spark.shuffle.compress` is used to
compress any data that will be fetched / read as shuffle blocks.

To do this, I split `TempBlockId` into two new id types, `TempLocalBlockId` and
`TempShuffleBlockId`, which map to `spark.shuffle.spill.compress` and
`spark.shuffle.compress`, respectively.  ExternalAppendOnlyMap also used temp
blocks for spilling data.  It looks like ExternalSorter was designed to be
a generic sorter but its configuration already happens to be tied to sort-based
shuffle, so I think it's fine if we use `spark.shuffle.compress` to compress
its spills; we can move the compression configuration to the constructor in
a later commit if we find that ExternalSorter is being used in other contexts
where we want different configuration options to control compression.  To
summarize:

**Before:**

|       | ExternalAppendOnlyMap        | ExternalSorter               |
|-------|------------------------------|------------------------------|
| Read  | spark.shuffle.spill.compress | spark.shuffle.compress       |
| Write | spark.shuffle.spill.compress | spark.shuffle.spill.compress |

**After:**

|       | ExternalAppendOnlyMap        | ExternalSorter         |
|-------|------------------------------|------------------------|
| Read  | spark.shuffle.spill.compress | spark.shuffle.compress |
| Write | spark.shuffle.spill.compress | spark.shuffle.compress |

Thanks to andrewor14 for debugging this with me!

Author: Josh Rosen <[email protected]>

Closes #2890 from JoshRosen/SPARK-3426 and squashes the following commits:

1921cf6 [Josh Rosen] Minor edit for clarity.
c8dd8f2 [Josh Rosen] Add comment explaining use of createTempShuffleBlock().
2c687b9 [Josh Rosen] Fix SPARK-3426.
91e7e40 [Josh Rosen] Combine tests into single test of all combinations
76ca65e [Josh Rosen] Add regression test for SPARK-3426.

Conflicts:
	core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala
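
To make the before/after mapping above concrete, the per-block-id compression decision described in the commit message could be sketched roughly as follows (assuming the block id classes named in the commit; the helper shown here is illustrative, not verbatim from the codebase):

```scala
import org.apache.spark.SparkConf
import org.apache.spark.storage.{BlockId, ShuffleBlockId, TempLocalBlockId, TempShuffleBlockId}

// Simplified sketch: after the fix, temp shuffle blocks follow
// spark.shuffle.compress, so spilled runs that are later fetched as shuffle
// blocks are written and read with the same setting, while purely local temp
// blocks keep following spark.shuffle.spill.compress.
def shouldCompress(blockId: BlockId, conf: SparkConf): Boolean = blockId match {
  case _: ShuffleBlockId     => conf.getBoolean("spark.shuffle.compress", true)
  case _: TempShuffleBlockId => conf.getBoolean("spark.shuffle.compress", true)
  case _: TempLocalBlockId   => conf.getBoolean("spark.shuffle.spill.compress", true)
  case _                     => false
}
```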