[SPARK-3426] Fix sort-based shuffle error when spark.shuffle.compress and spark.shuffle.spill.compress settings are different

This PR fixes SPARK-3426, an issue where sort-based shuffle crashes if the
`spark.shuffle.spill.compress` and `spark.shuffle.compress` settings have
different values.

The problem is that sort-based shuffle's read and write paths use different
settings for determining whether to apply compression.  ExternalSorter writes
runs to files using `TempBlockId` ids, which causes
`spark.shuffle.spill.compress` to be used for enabling compression, but these
spilled files end up being shuffled over the network and read as shuffle files
using `ShuffleBlockId` by BlockStoreShuffleFetcher, which causes
`spark.shuffle.compress` to be used for enabling decompression. As a result,
the shuffle fails whenever the two settings disagree.
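
For concreteness, a minimal reproduction looks roughly like the sketch below. It mirrors the regression test added at the end of this PR; the object name is just for illustration, the tiny `spark.shuffle.memoryFraction` is only there to force spilling, and `spark.shuffle.manager` is set to `sort` explicitly in case sort-based shuffle is not the default in the Spark version being tested.

```scala
import org.apache.spark.{SparkConf, SparkContext}

object Spark3426Repro {
  def main(args: Array[String]): Unit = {
    // Mismatched settings: before this fix, ExternalSorter's spill files are
    // written according to spark.shuffle.spill.compress but read back as
    // shuffle blocks according to spark.shuffle.compress, so the job fails.
    val conf = new SparkConf()
      .setAppName("spark-3426-repro")
      .setMaster("local")
      .set("spark.shuffle.manager", "sort")          // use sort-based shuffle
      .set("spark.shuffle.spill.compress", "false")
      .set("spark.shuffle.compress", "true")
      .set("spark.shuffle.memoryFraction", "0.001")  // tiny buffer => force spills
    val sc = new SparkContext(conf)
    try {
      // Shuffle enough records that the sorter spills to disk.
      sc.parallelize(0 until 100000).map(i => (i / 4, i)).groupByKey().collect()
    } finally {
      sc.stop()
    }
  }
}
```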

Based on the discussions in #2247 and #2178, it sounds like we don't want to
remove the `spark.shuffle.spill.compress` setting.  Therefore, I've tried to
come up with a fix where `spark.shuffle.spill.compress` is used to compress
data that's read and written locally and `spark.shuffle.compress` is used to
compress any data that will be fetched / read as shuffle blocks.

To do this, I split `TempBlockId` into two new id types, `TempLocalBlockId` and
`TempShuffleBlockId`, which map to `spark.shuffle.spill.compress` and
`spark.shuffle.compress`, respectively.  ExternalAppendOnlyMap also spills to
temp blocks, but its spill files are only read back locally, so it now uses
`TempLocalBlockId` (and therefore `spark.shuffle.spill.compress`).  ExternalSorter
was designed to be a generic sorter, but its configuration is already tied to
sort-based shuffle, so I think it's fine to use `spark.shuffle.compress` for its
spills; if ExternalSorter ends up being used in other non-shuffle contexts where
we want different compression settings, we can move the compression configuration
into its constructor in a later commit.  To summarize (a condensed sketch of the
resulting mapping follows the tables):

**Before:**

|       | ExternalAppendOnlyMap        | ExternalSorter               |
|-------|------------------------------|------------------------------|
| Read  | spark.shuffle.spill.compress | spark.shuffle.compress       |
| Write | spark.shuffle.spill.compress | spark.shuffle.spill.compress |

**After:**

|       | ExternalAppendOnlyMap        | ExternalSorter         |
|-------|------------------------------|------------------------|
| Read  | spark.shuffle.spill.compress | spark.shuffle.compress |
| Write | spark.shuffle.spill.compress | spark.shuffle.compress |
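
In code, the **After** column reduces to a single mapping from block id type to setting. The sketch below is a condensed paraphrase, not the actual `BlockManager` code (the real change is in the `BlockManager.scala` hunk further down); it sits in the `org.apache.spark.storage` package only because the `Temp*BlockId` classes are `private[spark]`, and the object and method names here are made up for illustration.

```scala
package org.apache.spark.storage  // Temp*BlockId classes are private[spark]

import org.apache.spark.SparkConf

object CompressionSettingSketch {
  /** Condensed sketch of which setting governs a block's compression after this change. */
  def shouldCompress(conf: SparkConf, blockId: BlockId): Boolean = blockId match {
    // Anything that may be read as shuffle data follows spark.shuffle.compress...
    case _: ShuffleBlockId | _: TempShuffleBlockId =>
      conf.getBoolean("spark.shuffle.compress", true)
    // ...while purely local spill files follow spark.shuffle.spill.compress.
    case _: TempLocalBlockId =>
      conf.getBoolean("spark.shuffle.spill.compress", true)
    // Broadcast and RDD blocks have their own settings; elided from this sketch.
    case _ =>
      false
  }
}
```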

Thanks to andrewor14 for debugging this with me!

Author: Josh Rosen <[email protected]>

Closes #2890 from JoshRosen/SPARK-3426 and squashes the following commits:

1921cf6 [Josh Rosen] Minor edit for clarity.
c8dd8f2 [Josh Rosen] Add comment explaining use of createTempShuffleBlock().
2c687b9 [Josh Rosen] Fix SPARK-3426.
91e7e40 [Josh Rosen] Combine tests into single test of all combinations
76ca65e [Josh Rosen] Add regression test for SPARK-3426.
JoshRosen authored and andrewor14 committed Oct 22, 2014
1 parent 97cf19f commit 813effc
Showing 6 changed files with 61 additions and 11 deletions.
11 changes: 8 additions & 3 deletions core/src/main/scala/org/apache/spark/storage/BlockId.scala
```diff
@@ -83,9 +83,14 @@ case class StreamBlockId(streamId: Int, uniqueId: Long) extends BlockId {
   def name = "input-" + streamId + "-" + uniqueId
 }
 
-/** Id associated with temporary data managed as blocks. Not serializable. */
-private[spark] case class TempBlockId(id: UUID) extends BlockId {
-  def name = "temp_" + id
+/** Id associated with temporary local data managed as blocks. Not serializable. */
+private[spark] case class TempLocalBlockId(id: UUID) extends BlockId {
+  def name = "temp_local_" + id
+}
+
+/** Id associated with temporary shuffle data managed as blocks. Not serializable. */
+private[spark] case class TempShuffleBlockId(id: UUID) extends BlockId {
+  def name = "temp_shuffle_" + id
 }
 
 // Intended only for testing purposes
```
3 changes: 2 additions & 1 deletion core/src/main/scala/org/apache/spark/storage/BlockManager.scala

```diff
@@ -1071,7 +1071,8 @@ private[spark] class BlockManager(
       case _: ShuffleBlockId => compressShuffle
       case _: BroadcastBlockId => compressBroadcast
       case _: RDDBlockId => compressRdds
-      case _: TempBlockId => compressShuffleSpill
+      case _: TempLocalBlockId => compressShuffleSpill
+      case _: TempShuffleBlockId => compressShuffle
       case _ => false
     }
   }
```
17 changes: 13 additions & 4 deletions core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala

```diff
@@ -98,11 +98,20 @@ private[spark] class DiskBlockManager(blockManager: BlockManager, conf: SparkCon
     getAllFiles().map(f => BlockId(f.getName))
   }
 
-  /** Produces a unique block id and File suitable for intermediate results. */
-  def createTempBlock(): (TempBlockId, File) = {
-    var blockId = new TempBlockId(UUID.randomUUID())
+  /** Produces a unique block id and File suitable for storing local intermediate results. */
+  def createTempLocalBlock(): (TempLocalBlockId, File) = {
+    var blockId = new TempLocalBlockId(UUID.randomUUID())
     while (getFile(blockId).exists()) {
-      blockId = new TempBlockId(UUID.randomUUID())
+      blockId = new TempLocalBlockId(UUID.randomUUID())
     }
     (blockId, getFile(blockId))
   }
+
+  /** Produces a unique block id and File suitable for storing shuffled intermediate results. */
+  def createTempShuffleBlock(): (TempShuffleBlockId, File) = {
+    var blockId = new TempShuffleBlockId(UUID.randomUUID())
+    while (getFile(blockId).exists()) {
+      blockId = new TempShuffleBlockId(UUID.randomUUID())
+    }
+    (blockId, getFile(blockId))
+  }
```
2 changes: 1 addition & 1 deletion core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala

```diff
@@ -153,7 +153,7 @@ class ExternalAppendOnlyMap[K, V, C](
    * Sort the existing contents of the in-memory map and spill them to a temporary file on disk.
    */
   override protected[this] def spill(collection: SizeTracker): Unit = {
-    val (blockId, file) = diskBlockManager.createTempBlock()
+    val (blockId, file) = diskBlockManager.createTempLocalBlock()
     curWriteMetrics = new ShuffleWriteMetrics()
     var writer = blockManager.getDiskWriter(blockId, file, serializer, fileBufferSize,
       curWriteMetrics)
```
15 changes: 13 additions & 2 deletions core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala

```diff
@@ -38,6 +38,11 @@ import org.apache.spark.storage.{BlockObjectWriter, BlockId}
  *
  * If combining is disabled, the type C must equal V -- we'll cast the objects at the end.
  *
+ * Note: Although ExternalSorter is a fairly generic sorter, some of its configuration is tied
+ * to its use in sort-based shuffle (for example, its block compression is controlled by
+ * `spark.shuffle.compress`).  We may need to revisit this if ExternalSorter is used in other
+ * non-shuffle contexts where we might want to use different configuration settings.
+ *
  * @param aggregator optional Aggregator with combine functions to use for merging data
  * @param partitioner optional Partitioner; if given, sort by partition ID and then key
  * @param ordering optional Ordering to sort keys within each partition; should be a total ordering
@@ -259,7 +264,10 @@ private[spark] class ExternalSorter[K, V, C](
   private def spillToMergeableFile(collection: SizeTrackingPairCollection[(Int, K), C]): Unit = {
     assert(!bypassMergeSort)
 
-    val (blockId, file) = diskBlockManager.createTempBlock()
+    // Because these files may be read during shuffle, their compression must be controlled by
+    // spark.shuffle.compress instead of spark.shuffle.spill.compress, so we need to use
+    // createTempShuffleBlock here; see SPARK-3426 for more context.
+    val (blockId, file) = diskBlockManager.createTempShuffleBlock()
     curWriteMetrics = new ShuffleWriteMetrics()
     var writer = blockManager.getDiskWriter(blockId, file, ser, fileBufferSize, curWriteMetrics)
     var objectsWritten = 0 // Objects written since the last flush
@@ -338,7 +346,10 @@ private[spark] class ExternalSorter[K, V, C](
     if (partitionWriters == null) {
       curWriteMetrics = new ShuffleWriteMetrics()
       partitionWriters = Array.fill(numPartitions) {
-        val (blockId, file) = diskBlockManager.createTempBlock()
+        // Because these files may be read during shuffle, their compression must be controlled by
+        // spark.shuffle.compress instead of spark.shuffle.spill.compress, so we need to use
+        // createTempShuffleBlock here; see SPARK-3426 for more context.
+        val (blockId, file) = diskBlockManager.createTempShuffleBlock()
        blockManager.getDiskWriter(blockId, file, ser, fileBufferSize, curWriteMetrics).open()
       }
     }
```
24 changes: 24 additions & 0 deletions core/src/test/scala/org/apache/spark/ShuffleSuite.scala
```diff
@@ -242,6 +242,30 @@ abstract class ShuffleSuite extends FunSuite with Matchers with LocalSparkContex
     assert(thrown.getClass === classOf[SparkException])
     assert(thrown.getMessage.toLowerCase.contains("serializable"))
   }
+
+  test("shuffle with different compression settings (SPARK-3426)") {
+    for (
+      shuffleSpillCompress <- Set(true, false);
+      shuffleCompress <- Set(true, false)
+    ) {
+      val conf = new SparkConf()
+        .setAppName("test")
+        .setMaster("local")
+        .set("spark.shuffle.spill.compress", shuffleSpillCompress.toString)
+        .set("spark.shuffle.compress", shuffleCompress.toString)
+        .set("spark.shuffle.memoryFraction", "0.001")
+      resetSparkContext()
+      sc = new SparkContext(conf)
+      try {
+        sc.parallelize(0 until 100000).map(i => (i / 4, i)).groupByKey().collect()
+      } catch {
+        case e: Exception =>
+          val errMsg = s"Failed with spark.shuffle.spill.compress=$shuffleSpillCompress," +
+            s" spark.shuffle.compress=$shuffleCompress"
+          throw new Exception(errMsg, e)
+      }
+    }
+  }
 }
 
 object ShuffleSuite {
```
