SPARK-2787: Make sort-based shuffle write files directly when there's no sorting/aggregation and # partitions is small #1799

Closed
wants to merge 7 commits
27 changes: 18 additions & 9 deletions core/src/main/scala/org/apache/spark/SparkEnv.scala
@@ -156,11 +156,9 @@ object SparkEnv extends Logging {
conf.set("spark.driver.port", boundPort.toString)
}

// Create an instance of the class named by the given Java system property, or by
// defaultClassName if the property is not set, and return it as a T
def instantiateClass[T](propertyName: String, defaultClassName: String): T = {
val name = conf.get(propertyName, defaultClassName)
val cls = Class.forName(name, true, Utils.getContextOrSparkClassLoader)
// Create an instance of the class with the given name, possibly initializing it with our conf
def instantiateClass[T](className: String): T = {
val cls = Class.forName(className, true, Utils.getContextOrSparkClassLoader)
// Look for a constructor taking a SparkConf and a boolean isDriver, then one taking just
// SparkConf, then one taking no arguments
try {
@@ -178,11 +176,17 @@
}
}

val serializer = instantiateClass[Serializer](
// Create an instance of the class named by the given SparkConf property, or defaultClassName
// if the property is not set, possibly initializing it with our conf
def instantiateClassFromConf[T](propertyName: String, defaultClassName: String): T = {
instantiateClass[T](conf.get(propertyName, defaultClassName))
}

val serializer = instantiateClassFromConf[Serializer](
"spark.serializer", "org.apache.spark.serializer.JavaSerializer")
logDebug(s"Using serializer: ${serializer.getClass}")

val closureSerializer = instantiateClass[Serializer](
val closureSerializer = instantiateClassFromConf[Serializer](
"spark.closure.serializer", "org.apache.spark.serializer.JavaSerializer")

def registerOrLookup(name: String, newActor: => Actor): ActorRef = {
@@ -246,8 +250,13 @@
"."
}

val shuffleManager = instantiateClass[ShuffleManager](
"spark.shuffle.manager", "org.apache.spark.shuffle.hash.HashShuffleManager")
// Let the user specify short names for shuffle managers
val shortShuffleMgrNames = Map(
"hash" -> "org.apache.spark.shuffle.hash.HashShuffleManager",
"sort" -> "org.apache.spark.shuffle.sort.SortShuffleManager")
val shuffleMgrName = conf.get("spark.shuffle.manager", "hash")
Contributor
I ran into a problem using these short names: in ShuffleBlockManager, there's a line that looks at the spark.shuffle.manager property to see whether we're using sort-based shuffle:

  // Are we using sort-based shuffle?
  val sortBasedShuffle =
    conf.get("spark.shuffle.manager", "") == classOf[SortShuffleManager].getName

This won't work properly if the configuration property is set to one of the short names.

We can't just re-assign the property to the full name because the BlockManager will have already been created by this point and it will have created the ShuffleBlockManager with the wrong property value. Similarly, the ShuffleBlockManager can't access SparkEnv to inspect the actual ShuffleManager because it won't be fully initialized.

I think we should perform all configuration normalization / mutation at a single top-level location and then treat the configuration as immutable from that point forward, since that seems easier to reason about. What do you think about moving the aliasing / normalization to the top of SparkEnv?

Contributor Author
I'd rather not change the configuration under the user; that would be confusing if they later print it or look at it in the web UI. Instead, maybe add a SparkEnv.getShuffleManagerClass(conf: SparkConf) that can return the real class name.

Also I'd be fine initializing the ShuffleBlockManager after the ShuffleManager if that works, and using isInstanceOf. That would be the cleanest.
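
To make the aliasing behavior concrete, here is a minimal, self-contained sketch (not part of this PR) of the short-name lookup that the lines below add, together with a stand-in for the getShuffleManagerClass-style helper suggested above. The object and method names are illustrative; the suggested helper would presumably take a SparkConf rather than a plain string.

// Illustrative only: mirrors the getOrElse lookup added in SparkEnv below.
object ShuffleManagerAliasSketch {
  // Short names users may put in "spark.shuffle.manager", mapped to full class names.
  private val shortShuffleMgrNames = Map(
    "hash" -> "org.apache.spark.shuffle.hash.HashShuffleManager",
    "sort" -> "org.apache.spark.shuffle.sort.SortShuffleManager")

  // Expand a short name ("hash" / "sort") to a fully qualified class name;
  // anything else is assumed to already be a class name and is returned as-is.
  def resolveShuffleManagerClass(configured: String): String =
    shortShuffleMgrNames.getOrElse(configured.toLowerCase, configured)
}

// Both forms resolve to the same class name:
//   ShuffleManagerAliasSketch.resolveShuffleManagerClass("sort")
//   ShuffleManagerAliasSketch.resolveShuffleManagerClass(
//     "org.apache.spark.shuffle.sort.SortShuffleManager")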

val shuffleMgrClass = shortShuffleMgrNames.getOrElse(shuffleMgrName.toLowerCase, shuffleMgrName)
val shuffleManager = instantiateClass[ShuffleManager](shuffleMgrClass)

val shuffleMemoryManager = new ShuffleMemoryManager(conf)

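A side note on the SparkEnv.scala changes above: instantiateClass keeps the constructor-fallback behavior described in its comment (try a constructor taking a SparkConf and a boolean isDriver, then one taking just a SparkConf, then a no-arg one). Below is a minimal sketch of that reflection pattern, not the PR's code; FakeConf stands in for SparkConf since the surrounding Spark code is not shown in the hunk, and the real code also resolves the class through Spark's context class loader.

// Sketch of the constructor-fallback pattern; FakeConf is a stand-in for SparkConf.
class FakeConf(val settings: Map[String, String] = Map.empty)

object ReflectiveInstantiationSketch {
  def instantiateClass[T](className: String, conf: FakeConf, isDriver: Boolean): T = {
    val cls = Class.forName(className)
    try {
      // First choice: a (FakeConf, Boolean) constructor, mirroring (SparkConf, isDriver).
      cls.getConstructor(classOf[FakeConf], java.lang.Boolean.TYPE)
        .newInstance(conf, java.lang.Boolean.valueOf(isDriver))
        .asInstanceOf[T]
    } catch {
      case _: NoSuchMethodException =>
        try {
          // Second choice: a constructor taking just the conf.
          cls.getConstructor(classOf[FakeConf]).newInstance(conf).asInstanceOf[T]
        } catch {
          case _: NoSuchMethodException =>
            // Last resort: a no-arg constructor.
            cls.getConstructor().newInstance().asInstanceOf[T]
        }
    }
  }
}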
@@ -58,7 +58,7 @@ private[spark] class HashShuffleReader[K, C](
// Create an ExternalSorter to sort the data. Note that if spark.shuffle.spill is disabled,
// the ExternalSorter won't spill to disk.
val sorter = new ExternalSorter[K, C, C](ordering = Some(keyOrd), serializer = Some(ser))
sorter.write(aggregatedIter)
sorter.insertAll(aggregatedIter)
context.taskMetrics.memoryBytesSpilled += sorter.memoryBytesSpilled
context.taskMetrics.diskBytesSpilled += sorter.diskBytesSpilled
sorter.iterator
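For context on the one-line change above: the reduce side feeds all aggregated records into an ExternalSorter (now via insertAll), then reads them back in key order from its iterator while accumulating spill metrics. The toy class below illustrates only that calling pattern; it is not Spark's ExternalSorter and never spills, which the real sorter does unless spark.shuffle.spill is disabled.

import scala.collection.mutable.ArrayBuffer

// Toy stand-in for the insertAll-then-iterator usage shown above; in-memory only.
class ToySorter[K, V](keyOrdering: Ordering[K]) {
  private val buffer = ArrayBuffer.empty[(K, V)]
  val memoryBytesSpilled: Long = 0L  // the real ExternalSorter updates these as it spills
  val diskBytesSpilled: Long = 0L

  def insertAll(records: Iterator[Product2[K, V]]): Unit =
    records.foreach(kv => buffer += ((kv._1, kv._2)))

  // Return the buffered records sorted by key.
  def iterator: Iterator[(K, V)] = buffer.sortBy(_._1)(keyOrdering).iterator
}

object ToySorterExample extends App {
  val sorter = new ToySorter[Int, String](Ordering.Int)
  sorter.insertAll(Iterator((3, "c"), (1, "a"), (2, "b")))
  println(sorter.iterator.toList)  // List((1,a), (2,b), (3,c))
}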
@@ -44,6 +44,7 @@ private[spark] class SortShuffleWriter[K, V, C](

private var sorter: ExternalSorter[K, V, _] = null
private var outputFile: File = null
private var indexFile: File = null

// Are we in the process of stopping? Because map tasks can call stop() with success = true
// and then call stop() with success = false if they get an exception, we want to make sure
@@ -57,78 +58,36 @@

/** Write a bunch of records to this task's output */
override def write(records: Iterator[_ <: Product2[K, V]]): Unit = {
// Get an iterator with the elements for each partition ID
val partitions: Iterator[(Int, Iterator[Product2[K, _]])] = {
if (dep.mapSideCombine) {
if (!dep.aggregator.isDefined) {
throw new IllegalStateException("Aggregator is empty for map-side combine")
}
sorter = new ExternalSorter[K, V, C](
dep.aggregator, Some(dep.partitioner), dep.keyOrdering, dep.serializer)
sorter.write(records)
sorter.partitionedIterator
} else {
// In this case we pass neither an aggregator nor an ordering to the sorter, because we
// don't care whether the keys get sorted in each partition; that will be done on the
// reduce side if the operation being run is sortByKey.
sorter = new ExternalSorter[K, V, V](
None, Some(dep.partitioner), None, dep.serializer)
sorter.write(records)
sorter.partitionedIterator
if (dep.mapSideCombine) {
if (!dep.aggregator.isDefined) {
throw new IllegalStateException("Aggregator is empty for map-side combine")
}
sorter = new ExternalSorter[K, V, C](
dep.aggregator, Some(dep.partitioner), dep.keyOrdering, dep.serializer)
sorter.insertAll(records)
} else {
// In this case we pass neither an aggregator nor an ordering to the sorter, because we don't
// care whether the keys get sorted in each partition; that will be done on the reduce side
// if the operation being run is sortByKey.
sorter = new ExternalSorter[K, V, V](
None, Some(dep.partitioner), None, dep.serializer)
sorter.insertAll(records)
}

// Create a single shuffle file with reduce ID 0 that we'll write all results to. We'll later
// serve different ranges of this file using an index file that we create at the end.
val blockId = ShuffleBlockId(dep.shuffleId, mapId, 0)
outputFile = blockManager.diskBlockManager.getFile(blockId)

// Track location of each range in the output file
val offsets = new Array[Long](numPartitions + 1)
val lengths = new Array[Long](numPartitions)

for ((id, elements) <- partitions) {
if (elements.hasNext) {
val writer = blockManager.getDiskWriter(blockId, outputFile, ser, fileBufferSize,
writeMetrics)
for (elem <- elements) {
writer.write(elem)
}
writer.commitAndClose()
val segment = writer.fileSegment()
offsets(id + 1) = segment.offset + segment.length
lengths(id) = segment.length
} else {
// The partition is empty; don't create a new writer to avoid writing headers, etc
offsets(id + 1) = offsets(id)
}
}

context.taskMetrics.memoryBytesSpilled += sorter.memoryBytesSpilled
context.taskMetrics.diskBytesSpilled += sorter.diskBytesSpilled

// Write an index file with the offsets of each block, plus a final offset at the end for the
// end of the output file. This will be used by SortShuffleManager.getBlockLocation to figure
// out where each block begins and ends.
outputFile = blockManager.diskBlockManager.getFile(blockId)
indexFile = blockManager.diskBlockManager.getFile(blockId.name + ".index")

val diskBlockManager = blockManager.diskBlockManager
val indexFile = diskBlockManager.getFile(blockId.name + ".index")
val out = new DataOutputStream(new BufferedOutputStream(new FileOutputStream(indexFile)))
try {
var i = 0
while (i < numPartitions + 1) {
out.writeLong(offsets(i))
i += 1
}
} finally {
out.close()
}
val partitionLengths = sorter.writePartitionedFile(blockId, context)

// Register our map output with the ShuffleBlockManager, which handles cleaning it over time
blockManager.shuffleBlockManager.addCompletedMap(dep.shuffleId, mapId, numPartitions)

mapStatus = new MapStatus(blockManager.blockManagerId,
lengths.map(MapOutputTracker.compressSize))
partitionLengths.map(MapOutputTracker.compressSize))
}

/** Close this writer, passing along whether the map completed */
@@ -145,6 +104,9 @@ private[spark] class SortShuffleWriter[K, V, C](
if (outputFile != null) {
outputFile.delete()
}
if (indexFile != null) {
indexFile.delete()
}
return None
}
} finally {
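A note on the index file this diff refers to: the removed code above wrote numPartitions + 1 longs, where entry i is the byte offset at which partition i starts in the single output file and the last entry is the end of the file, and the comment says SortShuffleManager.getBlockLocation uses this to find where each block begins and ends. The sketch below shows that layout with plain java.io streams; it is an illustration based on the removed code, not the code this PR moves the logic into, and the file handling is deliberately simplified.

import java.io.{BufferedOutputStream, DataInputStream, DataOutputStream, File, FileInputStream, FileOutputStream}

// Index layout: offsets(0) .. offsets(numPartitions), each an 8-byte long.
object ShuffleIndexSketch {
  def writeIndex(indexFile: File, offsets: Array[Long]): Unit = {
    val out = new DataOutputStream(new BufferedOutputStream(new FileOutputStream(indexFile)))
    try {
      offsets.foreach(o => out.writeLong(o))
    } finally {
      out.close()
    }
  }

  // (startOffset, length) of one partition's byte range in the data file,
  // which is the information needed to serve that shuffle block.
  def readPartitionRange(indexFile: File, partitionId: Int): (Long, Long) = {
    val in = new DataInputStream(new FileInputStream(indexFile))
    try {
      in.skipBytes(partitionId * 8)  // skip the earlier offsets, 8 bytes each
      val start = in.readLong()
      val end = in.readLong()
      (start, end - start)
    } finally {
      in.close()
    }
  }
}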